xref: /spdk/test/event/scheduler/scheduler.c (revision cc6920a4763d4b9a43aa40583c8397d8f14fa100)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/event.h"
38 #include "spdk/likely.h"
39 #include "spdk/json.h"
40 #include "spdk/jsonrpc.h"
41 #include "spdk/rpc.h"
42 #include "spdk/string.h"
43 #include "spdk/thread.h"
44 #include "spdk/util.h"
45 
46 #include "spdk_internal/event.h"
47 
/* Cleared by test_shutdown(); the pollers check it on every run and tear
 * their thread down once it is false. */
static bool g_is_running = true;
/* Taken around every access to g_sched_threads in this file. */
pthread_mutex_t g_sched_list_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Period of the busy poller, in microseconds.  Parenthesized so the macro
 * expands as a single value inside any expression (e.g. as a divisor). */
#define TIMESLICE_US (100 * 1000)
51 
/* Bookkeeping for one SPDK thread created through the scheduler test RPCs. */
struct sched_thread {
	/* The SPDK thread this entry describes. */
	struct spdk_thread *thread;
	/* Timed poller that simulates load; registered only while active_percent > 0. */
	struct spdk_poller *poller;
	/* Zero-period poller that always reports itself as idle. */
	struct spdk_poller *idle_poller;
	/* Share (0-100) of each timeslice the busy poller spends spinning. */
	int active_percent;
	/* Pending scheduler_thread_create request; reset to NULL once answered. */
	struct spdk_jsonrpc_request *request;
	/* Linkage in g_sched_threads. */
	TAILQ_ENTRY(sched_thread) link;
};
60 
/* All scheduler test threads that currently exist.  Every access in this file
 * is made with g_sched_list_mutex held. */
static TAILQ_HEAD(, sched_thread) g_sched_threads = TAILQ_HEAD_INITIALIZER(g_sched_threads);
62 
/* Decoded parameters of the "scheduler_thread_create" RPC. */
struct rpc_thread_create {
	/* Requested busy share of each timeslice; the handler accepts 0-100. */
	int active_percent;
	/* Name for the new thread; released by free_rpc_thread_create(). */
	char *name;
	/* CPU mask string for the new thread; released by free_rpc_thread_create(). */
	char *cpu_mask;
};
68 
69 static void
70 free_rpc_thread_create(struct rpc_thread_create *req)
71 {
72 	free(req->name);
73 	free(req->cpu_mask);
74 }
75 
76 static const struct spdk_json_object_decoder rpc_thread_create_decoders[] = {
77 	{"active", offsetof(struct rpc_thread_create, active_percent), spdk_json_decode_uint64},
78 	{"name", offsetof(struct rpc_thread_create, name), spdk_json_decode_string, true},
79 	{"cpu_mask", offsetof(struct rpc_thread_create, cpu_mask), spdk_json_decode_string, true},
80 };
81 
/* Answer a scheduler_thread_create request with the id of the new thread. */
static void
rpc_scheduler_thread_create_cb(struct spdk_jsonrpc_request *request, uint64_t thread_id)
{
	struct spdk_json_write_ctx *result = spdk_jsonrpc_begin_result(request);

	spdk_json_write_uint64(result, thread_id);
	spdk_jsonrpc_end_result(request, result);
}
91 
92 static void
93 thread_delete(struct sched_thread *sched_thread)
94 {
95 	spdk_poller_unregister(&sched_thread->poller);
96 	spdk_poller_unregister(&sched_thread->idle_poller);
97 	spdk_thread_exit(sched_thread->thread);
98 
99 	TAILQ_REMOVE(&g_sched_threads, sched_thread, link);
100 	free(sched_thread);
101 
102 	if (!g_is_running && TAILQ_EMPTY(&g_sched_threads)) {
103 		spdk_app_stop(0);
104 	}
105 }
106 
107 static int
108 poller_run_busy(void *arg)
109 {
110 	struct sched_thread *sched_thread = arg;
111 
112 	if (spdk_unlikely(!g_is_running)) {
113 		pthread_mutex_lock(&g_sched_list_mutex);
114 		thread_delete(sched_thread);
115 		pthread_mutex_unlock(&g_sched_list_mutex);
116 		return SPDK_POLLER_IDLE;
117 	}
118 
119 	spdk_delay_us(TIMESLICE_US * sched_thread->active_percent / 100);
120 	return SPDK_POLLER_BUSY;
121 }
122 
123 static int
124 poller_run_idle(void *arg)
125 {
126 	struct sched_thread *sched_thread = arg;
127 
128 	if (spdk_unlikely(!g_is_running)) {
129 		pthread_mutex_lock(&g_sched_list_mutex);
130 		thread_delete(sched_thread);
131 		pthread_mutex_unlock(&g_sched_list_mutex);
132 		return SPDK_POLLER_IDLE;
133 	}
134 
135 	spdk_delay_us(10);
136 	return SPDK_POLLER_IDLE;
137 }
138 
139 static void
140 update_pollers(struct sched_thread *sched_thread)
141 {
142 	spdk_poller_unregister(&sched_thread->poller);
143 	if (sched_thread->active_percent > 0) {
144 		sched_thread->poller = spdk_poller_register_named(poller_run_busy, sched_thread, TIMESLICE_US,
145 				       spdk_thread_get_name(sched_thread->thread));
146 		assert(sched_thread->poller != NULL);
147 	}
148 	if (sched_thread->idle_poller == NULL) {
149 		sched_thread->idle_poller = spdk_poller_register_named(poller_run_idle, sched_thread, 0,
150 					    "idle_poller");
151 		assert(sched_thread->idle_poller != NULL);
152 	}
153 }
154 
155 static void
156 rpc_register_poller(void *arg)
157 {
158 	struct sched_thread *sched_thread = arg;
159 
160 	update_pollers(sched_thread);
161 
162 	if (sched_thread->request != NULL) {
163 		rpc_scheduler_thread_create_cb(sched_thread->request, spdk_thread_get_id(sched_thread->thread));
164 		sched_thread->request = NULL;
165 	}
166 }
167 
168 static void
169 rpc_scheduler_thread_create(struct spdk_jsonrpc_request *request,
170 			    const struct spdk_json_val *params)
171 {
172 	struct sched_thread *sched_thread;
173 	struct rpc_thread_create req = {0};
174 	struct spdk_cpuset *cpu_set = NULL;
175 	int rc = 0;
176 
177 	if (spdk_json_decode_object(params, rpc_thread_create_decoders,
178 				    SPDK_COUNTOF(rpc_thread_create_decoders),
179 				    &req)) {
180 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
181 						 "Invalid parameters provided");
182 		return;
183 	}
184 
185 	if (req.active_percent < 0 || req.active_percent > 100) {
186 		SPDK_ERRLOG("invalid percent value %d\n", req.active_percent);
187 		spdk_jsonrpc_send_error_response(request, -EINVAL, spdk_strerror(EINVAL));
188 		free_rpc_thread_create(&req);
189 		return;
190 	}
191 
192 	if (req.cpu_mask != NULL) {
193 		cpu_set = calloc(1, sizeof(*cpu_set));
194 		assert(cpu_set != NULL);
195 		rc = spdk_cpuset_parse(cpu_set, req.cpu_mask);
196 		if (rc < 0) {
197 			SPDK_ERRLOG("invalid cpumask %s\n", req.cpu_mask);
198 			spdk_jsonrpc_send_error_response(request, -EINVAL, spdk_strerror(EINVAL));
199 			free_rpc_thread_create(&req);
200 			free(cpu_set);
201 			return;
202 		}
203 	}
204 
205 	sched_thread = calloc(1, sizeof(*sched_thread));
206 	assert(sched_thread != NULL);
207 
208 	sched_thread->thread = spdk_thread_create(req.name, cpu_set);
209 	assert(sched_thread->thread != NULL);
210 	free(cpu_set);
211 
212 	sched_thread->request = request;
213 	sched_thread->active_percent = req.active_percent;
214 
215 	spdk_thread_send_msg(sched_thread->thread, rpc_register_poller, sched_thread);
216 
217 	free_rpc_thread_create(&req);
218 
219 	pthread_mutex_lock(&g_sched_list_mutex);
220 	TAILQ_INSERT_TAIL(&g_sched_threads, sched_thread, link);
221 	pthread_mutex_unlock(&g_sched_list_mutex);
222 
223 	return;
224 }
225 
226 SPDK_RPC_REGISTER("scheduler_thread_create", rpc_scheduler_thread_create, SPDK_RPC_RUNTIME)
227 
/* State handed from the scheduler_thread_set_active handler to the callback
 * that runs on the target thread. */
struct rpc_thread_set_active_ctx {
	/* New active percentage to apply. */
	int active_percent;
	/* Request to answer once the update has been attempted. */
	struct spdk_jsonrpc_request *request;
};
232 
/* Decoded parameters of the "scheduler_thread_set_active" RPC. */
struct rpc_thread_set_active {
	/* Id of the SPDK thread to update. */
	uint64_t thread_id;
	/* Requested busy share of each timeslice; the handler accepts 0-100. */
	int active_percent;
};
237 
238 static const struct spdk_json_object_decoder rpc_thread_set_active_decoders[] = {
239 	{"thread_id", offsetof(struct rpc_thread_set_active, thread_id), spdk_json_decode_uint64},
240 	{"active", offsetof(struct rpc_thread_set_active, active_percent), spdk_json_decode_uint64},
241 };
242 
243 static void
244 rpc_scheduler_thread_set_active_cb(void *arg)
245 {
246 	struct rpc_thread_set_active_ctx *ctx = arg;
247 	uint64_t thread_id;
248 	struct sched_thread *sched_thread;
249 
250 	thread_id = spdk_thread_get_id(spdk_get_thread());
251 
252 	pthread_mutex_lock(&g_sched_list_mutex);
253 	TAILQ_FOREACH(sched_thread, &g_sched_threads, link) {
254 		if (spdk_thread_get_id(sched_thread->thread) == thread_id) {
255 			sched_thread->active_percent = ctx->active_percent;
256 			update_pollers(sched_thread);
257 			pthread_mutex_unlock(&g_sched_list_mutex);
258 			spdk_jsonrpc_send_bool_response(ctx->request, true);
259 			free(ctx);
260 			return;
261 		}
262 	}
263 	pthread_mutex_unlock(&g_sched_list_mutex);
264 
265 	spdk_jsonrpc_send_error_response(ctx->request, -ENOENT, spdk_strerror(ENOENT));
266 	free(ctx);
267 	return;
268 }
269 
270 static void
271 rpc_scheduler_thread_set_active(struct spdk_jsonrpc_request *request,
272 				const struct spdk_json_val *params)
273 {
274 	struct spdk_thread *thread;
275 	struct rpc_thread_set_active req = {0};
276 	struct rpc_thread_set_active_ctx *ctx;
277 
278 	if (spdk_json_decode_object(params, rpc_thread_set_active_decoders,
279 				    SPDK_COUNTOF(rpc_thread_set_active_decoders),
280 				    &req)) {
281 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
282 						 "Invalid parameters provided");
283 		return;
284 	}
285 
286 	if (req.active_percent < 0 || req.active_percent > 100) {
287 		SPDK_ERRLOG("invalid percent value %d\n", req.active_percent);
288 		spdk_jsonrpc_send_error_response(request, -EINVAL, spdk_strerror(EINVAL));
289 		return;
290 	}
291 
292 	thread = spdk_thread_get_by_id(req.thread_id);
293 	if (thread == NULL) {
294 		spdk_jsonrpc_send_error_response(request, -ENOENT, spdk_strerror(ENOENT));
295 		return;
296 	}
297 
298 	ctx = calloc(1, sizeof(*ctx));
299 	if (ctx == NULL) {
300 		spdk_jsonrpc_send_error_response(request, -ENOMEM, spdk_strerror(-ENOMEM));
301 		return;
302 	}
303 	ctx->request = request;
304 	ctx->active_percent = req.active_percent;
305 
306 	spdk_thread_send_msg(thread, rpc_scheduler_thread_set_active_cb, ctx);
307 }
308 
309 SPDK_RPC_REGISTER("scheduler_thread_set_active", rpc_scheduler_thread_set_active, SPDK_RPC_RUNTIME)
310 
/* State handed from the scheduler_thread_delete handler to the callback that
 * runs on the target thread. */
struct rpc_thread_delete_ctx {
	/* Request to answer once the deletion has been attempted. */
	struct spdk_jsonrpc_request *request;
};
314 
/* Decoded parameters of the "scheduler_thread_delete" RPC. */
struct rpc_thread_delete {
	/* Id of the SPDK thread to delete. */
	uint64_t thread_id;
};
318 
319 static const struct spdk_json_object_decoder rpc_thread_delete_decoders[] = {
320 	{"thread_id", offsetof(struct rpc_thread_delete, thread_id), spdk_json_decode_uint64},
321 };
322 
323 static void
324 rpc_scheduler_thread_delete_cb(void *arg)
325 {
326 	struct rpc_thread_delete_ctx *ctx = arg;
327 	struct sched_thread *sched_thread;
328 	uint64_t thread_id;
329 
330 	thread_id = spdk_thread_get_id(spdk_get_thread());
331 
332 	pthread_mutex_lock(&g_sched_list_mutex);
333 	TAILQ_FOREACH(sched_thread, &g_sched_threads, link) {
334 		if (spdk_thread_get_id(sched_thread->thread) == thread_id) {
335 			thread_delete(sched_thread);
336 			pthread_mutex_unlock(&g_sched_list_mutex);
337 			spdk_jsonrpc_send_bool_response(ctx->request, true);
338 			free(ctx);
339 			return;
340 		}
341 	}
342 	pthread_mutex_unlock(&g_sched_list_mutex);
343 
344 	spdk_jsonrpc_send_error_response(ctx->request, -ENOENT, spdk_strerror(ENOENT));
345 	free(ctx);
346 	return;
347 }
348 
349 static void
350 rpc_scheduler_thread_delete(struct spdk_jsonrpc_request *request,
351 			    const struct spdk_json_val *params)
352 {
353 	struct spdk_thread *thread;
354 	struct rpc_thread_delete req = {0};
355 	struct rpc_thread_delete_ctx *ctx;
356 
357 	if (spdk_json_decode_object(params, rpc_thread_delete_decoders,
358 				    SPDK_COUNTOF(rpc_thread_delete_decoders),
359 				    &req)) {
360 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
361 						 "Invalid parameters provided");
362 		return;
363 	}
364 
365 	thread = spdk_thread_get_by_id(req.thread_id);
366 	if (thread == NULL) {
367 		spdk_jsonrpc_send_error_response(request, -ENOENT, spdk_strerror(ENOENT));
368 		return;
369 	}
370 
371 	ctx = calloc(1, sizeof(*ctx));
372 	if (ctx == NULL) {
373 		spdk_jsonrpc_send_error_response(request, -ENOMEM, spdk_strerror(-ENOMEM));
374 		return;
375 	}
376 	ctx->request = request;
377 
378 	spdk_thread_send_msg(thread, rpc_scheduler_thread_delete_cb, ctx);
379 }
380 
381 SPDK_RPC_REGISTER("scheduler_thread_delete", rpc_scheduler_thread_delete, SPDK_RPC_RUNTIME)
382 
383 static void
384 test_shutdown(void)
385 {
386 	g_is_running = false;
387 	SPDK_NOTICELOG("Scheduler test application stopped.\n");
388 	pthread_mutex_lock(&g_sched_list_mutex);
389 	if (TAILQ_EMPTY(&g_sched_threads)) {
390 		spdk_app_stop(0);
391 	}
392 	pthread_mutex_unlock(&g_sched_list_mutex);
393 }
394 
/* Per-reactor callback that intentionally does nothing; used by
 * for_each_done() to keep an spdk_for_each_reactor() operation in flight. */
static void
for_each_nop(void *arg1, void *arg2)
{
	(void)arg1;
	(void)arg2;
}
399 
400 static void
401 for_each_done(void *arg1, void *arg2)
402 {
403 	spdk_for_each_reactor(for_each_nop, NULL, NULL, for_each_done);
404 }
405 
/* Application start callback passed to spdk_app_start(). */
static void
test_start(void *arg1)
{
	(void)arg1;

	SPDK_NOTICELOG("Scheduler test application started.\n");

	/*
	 * Kick off an spdk_for_each_reactor operation that re-issues itself
	 * from its completion callback, so one is always in flight until the
	 * app exits.  This is a regression test for SPDK issue #2206: pending
	 * spdk_for_each_reactor operations must complete before the reactors
	 * are shut down.
	 */
	for_each_done(NULL, NULL);
}
418 
419 int
420 main(int argc, char **argv)
421 {
422 	struct spdk_app_opts opts;
423 	int rc = 0;
424 
425 	spdk_app_opts_init(&opts, sizeof(opts));
426 	opts.name = "scheduler";
427 	opts.shutdown_cb = test_shutdown;
428 
429 	if ((rc = spdk_app_parse_args(argc, argv, &opts,
430 				      NULL, NULL, NULL, NULL)) != SPDK_APP_PARSE_ARGS_SUCCESS) {
431 		return rc;
432 	}
433 
434 	rc = spdk_app_start(&opts, test_start, NULL);
435 
436 	spdk_app_fini();
437 
438 	return rc;
439 }
440