xref: /spdk/test/event/scheduler/scheduler.c (revision a6dbe3721eb3b5990707fc3e378c95e505dd8ab5)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2021 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 
8 #include "spdk/env.h"
9 #include "spdk/event.h"
10 #include "spdk/likely.h"
11 #include "spdk/json.h"
12 #include "spdk/jsonrpc.h"
13 #include "spdk/rpc.h"
14 #include "spdk/string.h"
15 #include "spdk/thread.h"
16 #include "spdk/util.h"
17 
18 #include "spdk_internal/event.h"
19 
20 static bool g_is_running = true;
21 pthread_mutex_t g_sched_list_mutex = PTHREAD_MUTEX_INITIALIZER;
22 #define TIMESLICE_US 100 * 1000
23 static bool g_for_each_reactor = false;
24 
25 struct sched_thread {
26 	struct spdk_thread *thread;
27 	struct spdk_poller *poller;
28 	struct spdk_poller *idle_poller;
29 	int active_percent;
30 	struct spdk_jsonrpc_request *request;
31 	TAILQ_ENTRY(sched_thread) link;
32 };
33 
34 static TAILQ_HEAD(, sched_thread) g_sched_threads = TAILQ_HEAD_INITIALIZER(g_sched_threads);
35 
36 struct rpc_thread_create {
37 	int active_percent;
38 	char *name;
39 	char *cpu_mask;
40 };
41 
42 static void
free_rpc_thread_create(struct rpc_thread_create * req)43 free_rpc_thread_create(struct rpc_thread_create *req)
44 {
45 	free(req->name);
46 	free(req->cpu_mask);
47 }
48 
49 static const struct spdk_json_object_decoder rpc_thread_create_decoders[] = {
50 	{"active", offsetof(struct rpc_thread_create, active_percent), spdk_json_decode_uint64},
51 	{"name", offsetof(struct rpc_thread_create, name), spdk_json_decode_string, true},
52 	{"cpu_mask", offsetof(struct rpc_thread_create, cpu_mask), spdk_json_decode_string, true},
53 };
54 
55 static void
rpc_scheduler_thread_create_cb(struct spdk_jsonrpc_request * request,uint64_t thread_id)56 rpc_scheduler_thread_create_cb(struct spdk_jsonrpc_request *request, uint64_t thread_id)
57 {
58 	struct spdk_json_write_ctx *w;
59 
60 	w = spdk_jsonrpc_begin_result(request);
61 	spdk_json_write_uint64(w, thread_id);
62 	spdk_jsonrpc_end_result(request, w);
63 }
64 
65 static void
thread_delete(struct sched_thread * sched_thread)66 thread_delete(struct sched_thread *sched_thread)
67 {
68 	spdk_poller_unregister(&sched_thread->poller);
69 	spdk_poller_unregister(&sched_thread->idle_poller);
70 	spdk_thread_exit(sched_thread->thread);
71 
72 	TAILQ_REMOVE(&g_sched_threads, sched_thread, link);
73 	free(sched_thread);
74 
75 	if (!g_is_running && TAILQ_EMPTY(&g_sched_threads)) {
76 		spdk_app_stop(0);
77 	}
78 }
79 
80 static int
poller_run_busy(void * arg)81 poller_run_busy(void *arg)
82 {
83 	struct sched_thread *sched_thread = arg;
84 
85 	if (spdk_unlikely(!g_is_running)) {
86 		pthread_mutex_lock(&g_sched_list_mutex);
87 		thread_delete(sched_thread);
88 		pthread_mutex_unlock(&g_sched_list_mutex);
89 		return SPDK_POLLER_IDLE;
90 	}
91 
92 	spdk_delay_us(TIMESLICE_US * sched_thread->active_percent / 100);
93 	return SPDK_POLLER_BUSY;
94 }
95 
96 static int
poller_run_idle(void * arg)97 poller_run_idle(void *arg)
98 {
99 	struct sched_thread *sched_thread = arg;
100 
101 	if (spdk_unlikely(!g_is_running)) {
102 		pthread_mutex_lock(&g_sched_list_mutex);
103 		thread_delete(sched_thread);
104 		pthread_mutex_unlock(&g_sched_list_mutex);
105 		return SPDK_POLLER_IDLE;
106 	}
107 
108 	spdk_delay_us(10);
109 	return SPDK_POLLER_IDLE;
110 }
111 
112 static void
update_pollers(struct sched_thread * sched_thread)113 update_pollers(struct sched_thread *sched_thread)
114 {
115 	spdk_poller_unregister(&sched_thread->poller);
116 	if (sched_thread->active_percent > 0) {
117 		sched_thread->poller = spdk_poller_register_named(poller_run_busy, sched_thread, TIMESLICE_US,
118 				       spdk_thread_get_name(sched_thread->thread));
119 		assert(sched_thread->poller != NULL);
120 	}
121 	if (sched_thread->idle_poller == NULL) {
122 		sched_thread->idle_poller = spdk_poller_register_named(poller_run_idle, sched_thread, 0,
123 					    "idle_poller");
124 		assert(sched_thread->idle_poller != NULL);
125 	}
126 }
127 
128 static void
rpc_register_poller(void * arg)129 rpc_register_poller(void *arg)
130 {
131 	struct sched_thread *sched_thread = arg;
132 
133 	update_pollers(sched_thread);
134 
135 	if (sched_thread->request != NULL) {
136 		rpc_scheduler_thread_create_cb(sched_thread->request, spdk_thread_get_id(sched_thread->thread));
137 		sched_thread->request = NULL;
138 	}
139 }
140 
141 static void
rpc_scheduler_thread_create(struct spdk_jsonrpc_request * request,const struct spdk_json_val * params)142 rpc_scheduler_thread_create(struct spdk_jsonrpc_request *request,
143 			    const struct spdk_json_val *params)
144 {
145 	struct sched_thread *sched_thread;
146 	struct rpc_thread_create req = {0};
147 	struct spdk_cpuset *cpu_set = NULL;
148 	int rc = 0;
149 
150 	if (spdk_json_decode_object(params, rpc_thread_create_decoders,
151 				    SPDK_COUNTOF(rpc_thread_create_decoders),
152 				    &req)) {
153 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
154 						 "Invalid parameters provided");
155 		return;
156 	}
157 
158 	if (req.active_percent < 0 || req.active_percent > 100) {
159 		SPDK_ERRLOG("invalid percent value %d\n", req.active_percent);
160 		spdk_jsonrpc_send_error_response(request, -EINVAL, spdk_strerror(EINVAL));
161 		free_rpc_thread_create(&req);
162 		return;
163 	}
164 
165 	if (req.cpu_mask != NULL) {
166 		cpu_set = calloc(1, sizeof(*cpu_set));
167 		assert(cpu_set != NULL);
168 		rc = spdk_cpuset_parse(cpu_set, req.cpu_mask);
169 		if (rc < 0) {
170 			SPDK_ERRLOG("invalid cpumask %s\n", req.cpu_mask);
171 			spdk_jsonrpc_send_error_response(request, -EINVAL, spdk_strerror(EINVAL));
172 			free_rpc_thread_create(&req);
173 			free(cpu_set);
174 			return;
175 		}
176 	}
177 
178 	sched_thread = calloc(1, sizeof(*sched_thread));
179 	assert(sched_thread != NULL);
180 
181 	sched_thread->thread = spdk_thread_create(req.name, cpu_set);
182 	assert(sched_thread->thread != NULL);
183 	free(cpu_set);
184 
185 	sched_thread->request = request;
186 	sched_thread->active_percent = req.active_percent;
187 
188 	spdk_thread_send_msg(sched_thread->thread, rpc_register_poller, sched_thread);
189 
190 	free_rpc_thread_create(&req);
191 
192 	pthread_mutex_lock(&g_sched_list_mutex);
193 	TAILQ_INSERT_TAIL(&g_sched_threads, sched_thread, link);
194 	pthread_mutex_unlock(&g_sched_list_mutex);
195 
196 	return;
197 }
198 
199 SPDK_RPC_REGISTER("scheduler_thread_create", rpc_scheduler_thread_create, SPDK_RPC_RUNTIME)
200 
201 struct rpc_thread_set_active_ctx {
202 	int active_percent;
203 	struct spdk_jsonrpc_request *request;
204 };
205 
206 struct rpc_thread_set_active {
207 	uint64_t thread_id;
208 	int active_percent;
209 };
210 
211 static const struct spdk_json_object_decoder rpc_thread_set_active_decoders[] = {
212 	{"thread_id", offsetof(struct rpc_thread_set_active, thread_id), spdk_json_decode_uint64},
213 	{"active", offsetof(struct rpc_thread_set_active, active_percent), spdk_json_decode_uint64},
214 };
215 
216 static void
rpc_scheduler_thread_set_active_cb(void * arg)217 rpc_scheduler_thread_set_active_cb(void *arg)
218 {
219 	struct rpc_thread_set_active_ctx *ctx = arg;
220 	uint64_t thread_id;
221 	struct sched_thread *sched_thread;
222 
223 	thread_id = spdk_thread_get_id(spdk_get_thread());
224 
225 	pthread_mutex_lock(&g_sched_list_mutex);
226 	TAILQ_FOREACH(sched_thread, &g_sched_threads, link) {
227 		if (spdk_thread_get_id(sched_thread->thread) == thread_id) {
228 			sched_thread->active_percent = ctx->active_percent;
229 			update_pollers(sched_thread);
230 			pthread_mutex_unlock(&g_sched_list_mutex);
231 			spdk_jsonrpc_send_bool_response(ctx->request, true);
232 			free(ctx);
233 			return;
234 		}
235 	}
236 	pthread_mutex_unlock(&g_sched_list_mutex);
237 
238 	spdk_jsonrpc_send_error_response(ctx->request, -ENOENT, spdk_strerror(ENOENT));
239 	free(ctx);
240 	return;
241 }
242 
243 static void
rpc_scheduler_thread_set_active(struct spdk_jsonrpc_request * request,const struct spdk_json_val * params)244 rpc_scheduler_thread_set_active(struct spdk_jsonrpc_request *request,
245 				const struct spdk_json_val *params)
246 {
247 	struct spdk_thread *thread;
248 	struct rpc_thread_set_active req = {0};
249 	struct rpc_thread_set_active_ctx *ctx;
250 
251 	if (spdk_json_decode_object(params, rpc_thread_set_active_decoders,
252 				    SPDK_COUNTOF(rpc_thread_set_active_decoders),
253 				    &req)) {
254 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
255 						 "Invalid parameters provided");
256 		return;
257 	}
258 
259 	if (req.active_percent < 0 || req.active_percent > 100) {
260 		SPDK_ERRLOG("invalid percent value %d\n", req.active_percent);
261 		spdk_jsonrpc_send_error_response(request, -EINVAL, spdk_strerror(EINVAL));
262 		return;
263 	}
264 
265 	thread = spdk_thread_get_by_id(req.thread_id);
266 	if (thread == NULL) {
267 		spdk_jsonrpc_send_error_response(request, -ENOENT, spdk_strerror(ENOENT));
268 		return;
269 	}
270 
271 	ctx = calloc(1, sizeof(*ctx));
272 	if (ctx == NULL) {
273 		spdk_jsonrpc_send_error_response(request, -ENOMEM, spdk_strerror(-ENOMEM));
274 		return;
275 	}
276 	ctx->request = request;
277 	ctx->active_percent = req.active_percent;
278 
279 	spdk_thread_send_msg(thread, rpc_scheduler_thread_set_active_cb, ctx);
280 }
281 
282 SPDK_RPC_REGISTER("scheduler_thread_set_active", rpc_scheduler_thread_set_active, SPDK_RPC_RUNTIME)
283 
284 struct rpc_thread_delete_ctx {
285 	struct spdk_jsonrpc_request *request;
286 };
287 
288 struct rpc_thread_delete {
289 	uint64_t thread_id;
290 };
291 
292 static const struct spdk_json_object_decoder rpc_thread_delete_decoders[] = {
293 	{"thread_id", offsetof(struct rpc_thread_delete, thread_id), spdk_json_decode_uint64},
294 };
295 
296 static void
rpc_scheduler_thread_delete_cb(void * arg)297 rpc_scheduler_thread_delete_cb(void *arg)
298 {
299 	struct rpc_thread_delete_ctx *ctx = arg;
300 	struct sched_thread *sched_thread;
301 	uint64_t thread_id;
302 
303 	thread_id = spdk_thread_get_id(spdk_get_thread());
304 
305 	pthread_mutex_lock(&g_sched_list_mutex);
306 	TAILQ_FOREACH(sched_thread, &g_sched_threads, link) {
307 		if (spdk_thread_get_id(sched_thread->thread) == thread_id) {
308 			thread_delete(sched_thread);
309 			pthread_mutex_unlock(&g_sched_list_mutex);
310 			spdk_jsonrpc_send_bool_response(ctx->request, true);
311 			free(ctx);
312 			return;
313 		}
314 	}
315 	pthread_mutex_unlock(&g_sched_list_mutex);
316 
317 	spdk_jsonrpc_send_error_response(ctx->request, -ENOENT, spdk_strerror(ENOENT));
318 	free(ctx);
319 	return;
320 }
321 
322 static void
rpc_scheduler_thread_delete(struct spdk_jsonrpc_request * request,const struct spdk_json_val * params)323 rpc_scheduler_thread_delete(struct spdk_jsonrpc_request *request,
324 			    const struct spdk_json_val *params)
325 {
326 	struct spdk_thread *thread;
327 	struct rpc_thread_delete req = {0};
328 	struct rpc_thread_delete_ctx *ctx;
329 
330 	if (spdk_json_decode_object(params, rpc_thread_delete_decoders,
331 				    SPDK_COUNTOF(rpc_thread_delete_decoders),
332 				    &req)) {
333 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
334 						 "Invalid parameters provided");
335 		return;
336 	}
337 
338 	thread = spdk_thread_get_by_id(req.thread_id);
339 	if (thread == NULL) {
340 		spdk_jsonrpc_send_error_response(request, -ENOENT, spdk_strerror(ENOENT));
341 		return;
342 	}
343 
344 	ctx = calloc(1, sizeof(*ctx));
345 	if (ctx == NULL) {
346 		spdk_jsonrpc_send_error_response(request, -ENOMEM, spdk_strerror(-ENOMEM));
347 		return;
348 	}
349 	ctx->request = request;
350 
351 	spdk_thread_send_msg(thread, rpc_scheduler_thread_delete_cb, ctx);
352 }
353 
354 SPDK_RPC_REGISTER("scheduler_thread_delete", rpc_scheduler_thread_delete, SPDK_RPC_RUNTIME)
355 
356 static void
test_shutdown(void)357 test_shutdown(void)
358 {
359 	g_is_running = false;
360 	SPDK_NOTICELOG("Scheduler test application stopped.\n");
361 	pthread_mutex_lock(&g_sched_list_mutex);
362 	if (TAILQ_EMPTY(&g_sched_threads)) {
363 		spdk_app_stop(0);
364 	}
365 	pthread_mutex_unlock(&g_sched_list_mutex);
366 }
367 
368 static void
for_each_nop(void * arg1,void * arg2)369 for_each_nop(void *arg1, void *arg2)
370 {
371 }
372 
373 static void
for_each_reactor_start(void * arg1,void * arg2)374 for_each_reactor_start(void *arg1, void *arg2)
375 {
376 	spdk_for_each_reactor(for_each_nop, NULL, NULL, for_each_reactor_start);
377 }
378 
379 static void
test_start(void * arg1)380 test_start(void *arg1)
381 {
382 	SPDK_NOTICELOG("Scheduler test application started.\n");
383 	/* Start an spdk_for_each_reactor operation that just keeps
384 	 * running over and over again until the app exits.  This
385 	 * serves as a regression test for SPDK issue #2206, ensuring
386 	 * that any pending spdk_for_each_reactor operations are
387 	 * completed before reactors are shut down.
388 	 */
389 	if (g_for_each_reactor) {
390 		for_each_reactor_start(NULL, NULL);
391 	}
392 }
393 
394 static void
scheduler_usage(void)395 scheduler_usage(void)
396 {
397 	printf(" -f                        Enable spdk_for_each_reactor regression test\n");
398 }
399 
400 static int
scheduler_parse_arg(int ch,char * arg)401 scheduler_parse_arg(int ch, char *arg)
402 {
403 	switch (ch) {
404 	case 'f':
405 		g_for_each_reactor = true;
406 		break;
407 	default:
408 		return -EINVAL;
409 	}
410 	return 0;
411 }
412 
413 int
main(int argc,char ** argv)414 main(int argc, char **argv)
415 {
416 	struct spdk_app_opts opts;
417 	int rc = 0;
418 
419 	spdk_app_opts_init(&opts, sizeof(opts));
420 	opts.name = "scheduler";
421 	opts.shutdown_cb = test_shutdown;
422 
423 	if ((rc = spdk_app_parse_args(argc, argv, &opts,
424 				      "f", NULL, scheduler_parse_arg, scheduler_usage)) != SPDK_APP_PARSE_ARGS_SUCCESS) {
425 		return rc;
426 	}
427 
428 	rc = spdk_app_start(&opts, test_start, NULL);
429 
430 	spdk_app_fini();
431 
432 	return rc;
433 }
434