xref: /spdk/test/event/scheduler/scheduler.c (revision e1d06d9954b871531c9b376069d620d2c6cee854)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/event.h"
38 #include "spdk/likely.h"
39 #include "spdk/json.h"
40 #include "spdk/jsonrpc.h"
41 #include "spdk/rpc.h"
42 #include "spdk/string.h"
43 #include "spdk/thread.h"
44 #include "spdk/util.h"
45 
/* Cleared by test_shutdown(); pollers observe it and tear their thread down. */
static bool g_is_running = true;
/* Guards g_sched_threads. File-local: nothing outside this file uses it. */
static pthread_mutex_t g_sched_list_mutex = PTHREAD_MUTEX_INITIALIZER;
#define TIMESLICE_US 100 /* Execution time of single busy thread poll in us. */
/* Length, in ticks, of the period over which active_percent is applied. */
static uint64_t g_core_time_period;
/* TIMESLICE_US converted to ticks. */
static uint64_t g_timeslice_tsc;
51 
/* Per-thread state for one simulated-load SPDK thread created through RPC. */
struct sched_thread {
	/* SPDK thread created for this entry. */
	struct spdk_thread *thread;
	/* Poller running poller_run() on that thread. */
	struct spdk_poller *poller;
	/* Share of each period the thread should spend busy, 0-100. */
	int active_percent;
	/* Tick value at which the current period ends and the budget is reset. */
	uint64_t next_period_tsc;
	/* Busy ticks still to be consumed in the current period. */
	uint64_t remaining_tsc;
	/* Pending "create" RPC request; set to NULL once it has been answered. */
	struct spdk_jsonrpc_request *request;
	/* Linkage in g_sched_threads. */
	TAILQ_ENTRY(sched_thread) link;
};
61 
/* All threads created by this application; accessed under g_sched_list_mutex. */
static TAILQ_HEAD(, sched_thread) g_sched_threads = TAILQ_HEAD_INITIALIZER(g_sched_threads);
63 
/* Decoded parameters of the "scheduler_thread_create" RPC. */
struct rpc_thread_create {
	/* Requested busy percentage; the handler accepts 0-100. */
	int active_percent;
	/* Thread name; heap-allocated by the decoder, may be NULL. */
	char *name;
	/* CPU mask string; heap-allocated by the decoder, may be NULL. */
	char *cpu_mask;
};
69 
70 static void
71 free_rpc_thread_create(struct rpc_thread_create *req)
72 {
73 	free(req->name);
74 	free(req->cpu_mask);
75 }
76 
77 static const struct spdk_json_object_decoder rpc_thread_create_decoders[] = {
78 	{"active", offsetof(struct rpc_thread_create, active_percent), spdk_json_decode_uint64},
79 	{"name", offsetof(struct rpc_thread_create, name), spdk_json_decode_string, true},
80 	{"cpu_mask", offsetof(struct rpc_thread_create, cpu_mask), spdk_json_decode_string, true},
81 };
82 
/* Complete a "scheduler_thread_create" request, returning the new thread id. */
static void
rpc_scheduler_thread_create_cb(struct spdk_jsonrpc_request *request, uint64_t thread_id)
{
	struct spdk_json_write_ctx *writer = spdk_jsonrpc_begin_result(request);

	spdk_json_write_uint64(writer, thread_id);
	spdk_jsonrpc_end_result(request, writer);
}
92 
93 static void
94 thread_delete(struct sched_thread *sched_thread)
95 {
96 	spdk_poller_unregister(&sched_thread->poller);
97 	spdk_thread_exit(sched_thread->thread);
98 
99 	TAILQ_REMOVE(&g_sched_threads, sched_thread, link);
100 	free(sched_thread);
101 
102 	if (!g_is_running && TAILQ_EMPTY(&g_sched_threads)) {
103 		spdk_app_stop(0);
104 	}
105 }
106 
107 static int
108 poller_run(void *arg)
109 {
110 	struct sched_thread *sched_thread = arg;
111 	uint64_t now;
112 
113 	if (spdk_unlikely(!g_is_running)) {
114 		pthread_mutex_lock(&g_sched_list_mutex);
115 		thread_delete(sched_thread);
116 		pthread_mutex_unlock(&g_sched_list_mutex);
117 		return SPDK_POLLER_IDLE;
118 	}
119 
120 	now = spdk_get_ticks();
121 
122 	/* Reset the timers once we go over single core time period */
123 	if (sched_thread->next_period_tsc <= now) {
124 		sched_thread->next_period_tsc = now + g_core_time_period;
125 		sched_thread->remaining_tsc = (g_core_time_period / 100) * sched_thread->active_percent;
126 	}
127 
128 	if (sched_thread->remaining_tsc > 0) {
129 		spdk_delay_us(TIMESLICE_US);
130 		sched_thread->remaining_tsc -= spdk_min(sched_thread->remaining_tsc, g_timeslice_tsc);
131 		return SPDK_POLLER_BUSY;
132 	}
133 
134 	return SPDK_POLLER_IDLE;
135 }
136 
137 static void
138 rpc_register_poller(void *arg)
139 {
140 	struct sched_thread *sched_thread = arg;
141 
142 	sched_thread->poller = spdk_poller_register_named(poller_run, sched_thread, 0,
143 			       spdk_thread_get_name(sched_thread->thread));
144 	assert(sched_thread->poller != NULL);
145 
146 	if (sched_thread->request != NULL) {
147 		rpc_scheduler_thread_create_cb(sched_thread->request, spdk_thread_get_id(sched_thread->thread));
148 		sched_thread->request = NULL;
149 	}
150 }
151 
152 static void
153 rpc_scheduler_thread_create(struct spdk_jsonrpc_request *request,
154 			    const struct spdk_json_val *params)
155 {
156 	struct sched_thread *sched_thread;
157 	struct rpc_thread_create req = {0};
158 	struct spdk_cpuset *cpu_set = NULL;
159 	int rc = 0;
160 
161 	if (spdk_json_decode_object(params, rpc_thread_create_decoders,
162 				    SPDK_COUNTOF(rpc_thread_create_decoders),
163 				    &req)) {
164 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
165 						 "Invalid parameters provided");
166 		return;
167 	}
168 
169 	if (req.active_percent < 0 || req.active_percent > 100) {
170 		SPDK_ERRLOG("invalid percent value %d\n", req.active_percent);
171 		spdk_jsonrpc_send_error_response(request, -EINVAL, spdk_strerror(EINVAL));
172 		free_rpc_thread_create(&req);
173 		return;
174 	}
175 
176 	if (req.cpu_mask != NULL) {
177 		cpu_set = calloc(1, sizeof(*cpu_set));
178 		assert(cpu_set != NULL);
179 		rc = spdk_cpuset_parse(cpu_set, req.cpu_mask);
180 		if (rc < 0) {
181 			SPDK_ERRLOG("invalid cpumask %s\n", req.cpu_mask);
182 			spdk_jsonrpc_send_error_response(request, -EINVAL, spdk_strerror(EINVAL));
183 			free_rpc_thread_create(&req);
184 			free(cpu_set);
185 			return;
186 		}
187 	}
188 
189 	sched_thread = calloc(1, sizeof(*sched_thread));
190 	assert(sched_thread != NULL);
191 
192 	sched_thread->thread = spdk_thread_create(req.name, cpu_set);
193 	assert(sched_thread->thread != NULL);
194 	free(cpu_set);
195 
196 	sched_thread->request = request;
197 	sched_thread->active_percent = req.active_percent;
198 	sched_thread->next_period_tsc = 0;
199 
200 	spdk_thread_send_msg(sched_thread->thread, rpc_register_poller, sched_thread);
201 
202 	free_rpc_thread_create(&req);
203 
204 	pthread_mutex_lock(&g_sched_list_mutex);
205 	TAILQ_INSERT_TAIL(&g_sched_threads, sched_thread, link);
206 	pthread_mutex_unlock(&g_sched_list_mutex);
207 
208 	return;
209 }
210 
211 SPDK_RPC_REGISTER("scheduler_thread_create", rpc_scheduler_thread_create, SPDK_RPC_RUNTIME)
212 
/* Context carried to the target thread by the "scheduler_thread_set_active" RPC. */
struct rpc_thread_set_active_ctx {
	/* New busy percentage to apply. */
	int active_percent;
	/* Request to answer once the change has been applied (or failed). */
	struct spdk_jsonrpc_request *request;
};
217 
/* Decoded parameters of the "scheduler_thread_set_active" RPC. */
struct rpc_thread_set_active {
	/* Id of the SPDK thread to modify. */
	uint64_t thread_id;
	/* Requested busy percentage; the handler accepts 0-100. */
	int active_percent;
};
222 
223 static const struct spdk_json_object_decoder rpc_thread_set_active_decoders[] = {
224 	{"thread_id", offsetof(struct rpc_thread_set_active, thread_id), spdk_json_decode_uint64},
225 	{"active", offsetof(struct rpc_thread_set_active, active_percent), spdk_json_decode_uint64},
226 };
227 
228 static void
229 rpc_scheduler_thread_set_active_cb(void *arg)
230 {
231 	struct rpc_thread_set_active_ctx *ctx = arg;
232 	uint64_t thread_id;
233 	struct sched_thread *sched_thread;
234 
235 	thread_id = spdk_thread_get_id(spdk_get_thread());
236 
237 	pthread_mutex_lock(&g_sched_list_mutex);
238 	TAILQ_FOREACH(sched_thread, &g_sched_threads, link) {
239 		if (spdk_thread_get_id(sched_thread->thread) == thread_id) {
240 			/* Reset next_period_tsc to force recalculation of remaining_tsc. */
241 			sched_thread->next_period_tsc = 0;
242 			sched_thread->active_percent = ctx->active_percent;
243 			pthread_mutex_unlock(&g_sched_list_mutex);
244 			spdk_jsonrpc_send_bool_response(ctx->request, true);
245 			free(ctx);
246 			return;
247 		}
248 	}
249 	pthread_mutex_unlock(&g_sched_list_mutex);
250 
251 	spdk_jsonrpc_send_error_response(ctx->request, -ENOENT, spdk_strerror(ENOENT));
252 	free(ctx);
253 	return;
254 }
255 
256 static void
257 rpc_scheduler_thread_set_active(struct spdk_jsonrpc_request *request,
258 				const struct spdk_json_val *params)
259 {
260 	struct spdk_thread *thread;
261 	struct rpc_thread_set_active req = {0};
262 	struct rpc_thread_set_active_ctx *ctx;
263 
264 	if (spdk_json_decode_object(params, rpc_thread_set_active_decoders,
265 				    SPDK_COUNTOF(rpc_thread_set_active_decoders),
266 				    &req)) {
267 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
268 						 "Invalid parameters provided");
269 		return;
270 	}
271 
272 	if (req.active_percent < 0 || req.active_percent > 100) {
273 		SPDK_ERRLOG("invalid percent value %d\n", req.active_percent);
274 		spdk_jsonrpc_send_error_response(request, -EINVAL, spdk_strerror(EINVAL));
275 		return;
276 	}
277 
278 	thread = spdk_thread_get_by_id(req.thread_id);
279 	if (thread == NULL) {
280 		spdk_jsonrpc_send_error_response(request, -ENOENT, spdk_strerror(ENOENT));
281 		return;
282 	}
283 
284 	ctx = calloc(1, sizeof(*ctx));
285 	if (ctx == NULL) {
286 		spdk_jsonrpc_send_error_response(request, -ENOMEM, spdk_strerror(-ENOMEM));
287 		return;
288 	}
289 	ctx->request = request;
290 	ctx->active_percent = req.active_percent;
291 
292 	spdk_thread_send_msg(thread, rpc_scheduler_thread_set_active_cb, ctx);
293 }
294 
295 SPDK_RPC_REGISTER("scheduler_thread_set_active", rpc_scheduler_thread_set_active, SPDK_RPC_RUNTIME)
296 
/* Context carried to the target thread by the "scheduler_thread_delete" RPC. */
struct rpc_thread_delete_ctx {
	/* Request to answer once the thread has been deleted (or lookup failed). */
	struct spdk_jsonrpc_request *request;
};
300 
/* Decoded parameters of the "scheduler_thread_delete" RPC. */
struct rpc_thread_delete {
	/* Id of the SPDK thread to delete. */
	uint64_t thread_id;
};
304 
/* JSON decoders for the "scheduler_thread_delete" parameters; "thread_id" is required. */
static const struct spdk_json_object_decoder rpc_thread_delete_decoders[] = {
	{"thread_id", offsetof(struct rpc_thread_delete, thread_id), spdk_json_decode_uint64},
};
308 
309 static void
310 rpc_scheduler_thread_delete_cb(void *arg)
311 {
312 	struct rpc_thread_delete_ctx *ctx = arg;
313 	struct sched_thread *sched_thread;
314 	uint64_t thread_id;
315 
316 	thread_id = spdk_thread_get_id(spdk_get_thread());
317 
318 	pthread_mutex_lock(&g_sched_list_mutex);
319 	TAILQ_FOREACH(sched_thread, &g_sched_threads, link) {
320 		if (spdk_thread_get_id(sched_thread->thread) == thread_id) {
321 			thread_delete(sched_thread);
322 			pthread_mutex_unlock(&g_sched_list_mutex);
323 			spdk_jsonrpc_send_bool_response(ctx->request, true);
324 			free(ctx);
325 			return;
326 		}
327 	}
328 	pthread_mutex_unlock(&g_sched_list_mutex);
329 
330 	spdk_jsonrpc_send_error_response(ctx->request, -ENOENT, spdk_strerror(ENOENT));
331 	free(ctx);
332 	return;
333 }
334 
335 static void
336 rpc_scheduler_thread_delete(struct spdk_jsonrpc_request *request,
337 			    const struct spdk_json_val *params)
338 {
339 	struct spdk_thread *thread;
340 	struct rpc_thread_delete req = {0};
341 	struct rpc_thread_delete_ctx *ctx;
342 
343 	if (spdk_json_decode_object(params, rpc_thread_delete_decoders,
344 				    SPDK_COUNTOF(rpc_thread_delete_decoders),
345 				    &req)) {
346 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
347 						 "Invalid parameters provided");
348 		return;
349 	}
350 
351 	thread = spdk_thread_get_by_id(req.thread_id);
352 	if (thread == NULL) {
353 		spdk_jsonrpc_send_error_response(request, -ENOENT, spdk_strerror(ENOENT));
354 		return;
355 	}
356 
357 	ctx = calloc(1, sizeof(*ctx));
358 	if (ctx == NULL) {
359 		spdk_jsonrpc_send_error_response(request, -ENOMEM, spdk_strerror(-ENOMEM));
360 		return;
361 	}
362 	ctx->request = request;
363 
364 	spdk_thread_send_msg(thread, rpc_scheduler_thread_delete_cb, ctx);
365 }
366 
367 SPDK_RPC_REGISTER("scheduler_thread_delete", rpc_scheduler_thread_delete, SPDK_RPC_RUNTIME)
368 
369 static void
370 test_shutdown(void)
371 {
372 	g_is_running = false;
373 	SPDK_NOTICELOG("Scheduler test application stopped.\n");
374 	pthread_mutex_lock(&g_sched_list_mutex);
375 	if (TAILQ_EMPTY(&g_sched_threads)) {
376 		spdk_app_stop(0);
377 	}
378 	pthread_mutex_unlock(&g_sched_list_mutex);
379 }
380 
381 static void
382 test_start(void *arg1)
383 {
384 	/* Hardcode g_core_time_period as 100ms. */
385 	g_core_time_period = spdk_get_ticks_hz() / 10;
386 	/* Hardcode g_timeslice_tsc as 100us. */
387 	g_timeslice_tsc = spdk_get_ticks_hz() / SPDK_SEC_TO_USEC * TIMESLICE_US;
388 
389 	SPDK_NOTICELOG("Scheduler test application started.\n");
390 }
391 
392 int
393 main(int argc, char **argv)
394 {
395 	struct spdk_app_opts opts;
396 	int rc = 0;
397 
398 	spdk_app_opts_init(&opts, sizeof(opts));
399 	opts.name = "scheduler";
400 	opts.shutdown_cb = test_shutdown;
401 
402 	if ((rc = spdk_app_parse_args(argc, argv, &opts,
403 				      NULL, NULL, NULL, NULL)) != SPDK_APP_PARSE_ARGS_SUCCESS) {
404 		return rc;
405 	}
406 
407 	rc = spdk_app_start(&opts, test_start, NULL);
408 
409 	spdk_app_fini();
410 
411 	return rc;
412 }
413