/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2021 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/likely.h"
#include "spdk/json.h"
#include "spdk/jsonrpc.h"
#include "spdk/rpc.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"

#include "spdk_internal/event.h"

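/*
 * Scheduler test application: exposes JSON-RPC methods to create, tune, and
 * delete SPDK threads whose pollers simulate a configurable busy/idle load,
 * so that scheduler behavior can be exercised and observed.
 */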
static bool g_is_running = true;
pthread_mutex_t g_sched_list_mutex = PTHREAD_MUTEX_INITIALIZER;
#define TIMESLICE_US (100 * 1000)
static bool g_for_each_reactor = false;

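/* Per-thread state: the SPDK thread itself, its busy and idle pollers, the
 * requested busy percentage, and the JSON-RPC request to complete once the
 * pollers are registered. Entries live on g_sched_threads, protected by
 * g_sched_list_mutex.
 */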
struct sched_thread {
        struct spdk_thread *thread;
        struct spdk_poller *poller;
        struct spdk_poller *idle_poller;
        int active_percent;
        struct spdk_jsonrpc_request *request;
        TAILQ_ENTRY(sched_thread) link;
};

static TAILQ_HEAD(, sched_thread) g_sched_threads = TAILQ_HEAD_INITIALIZER(g_sched_threads);

struct rpc_thread_create {
        int active_percent;
        char *name;
        char *cpu_mask;
};

static void
free_rpc_thread_create(struct rpc_thread_create *req)
{
        free(req->name);
        free(req->cpu_mask);
}

static const struct spdk_json_object_decoder rpc_thread_create_decoders[] = {
        {"active", offsetof(struct rpc_thread_create, active_percent), spdk_json_decode_int32},
        {"name", offsetof(struct rpc_thread_create, name), spdk_json_decode_string, true},
        {"cpu_mask", offsetof(struct rpc_thread_create, cpu_mask), spdk_json_decode_string, true},
};

static void
rpc_scheduler_thread_create_cb(struct spdk_jsonrpc_request *request, uint64_t thread_id)
{
        struct spdk_json_write_ctx *w;

        w = spdk_jsonrpc_begin_result(request);
        spdk_json_write_uint64(w, thread_id);
        spdk_jsonrpc_end_result(request, w);
}

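/* Tear down a test thread. Callers hold g_sched_list_mutex and run in the
 * context of sched_thread->thread itself, since spdk_thread_exit() must be
 * called from the thread that is exiting. Once the last thread is gone after
 * shutdown has been requested, the application is stopped.
 */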
static void
thread_delete(struct sched_thread *sched_thread)
{
        spdk_poller_unregister(&sched_thread->poller);
        spdk_poller_unregister(&sched_thread->idle_poller);
        spdk_thread_exit(sched_thread->thread);

        TAILQ_REMOVE(&g_sched_threads, sched_thread, link);
        free(sched_thread);

        if (!g_is_running && TAILQ_EMPTY(&g_sched_threads)) {
                spdk_app_stop(0);
        }
}

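/* Busy poller: burns CPU with spdk_delay_us() (a busy-wait) for the requested
 * share of the 100 ms timeslice, making the thread appear "active_percent"
 * busy to the scheduler. Also handles self-deletion once shutdown starts.
 */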
static int
poller_run_busy(void *arg)
{
        struct sched_thread *sched_thread = arg;

        if (spdk_unlikely(!g_is_running)) {
                pthread_mutex_lock(&g_sched_list_mutex);
                thread_delete(sched_thread);
                pthread_mutex_unlock(&g_sched_list_mutex);
                return SPDK_POLLER_IDLE;
        }

        spdk_delay_us(TIMESLICE_US * sched_thread->active_percent / 100);
        return SPDK_POLLER_BUSY;
}

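/* Idle poller: spins briefly but always reports SPDK_POLLER_IDLE, so every
 * thread keeps a registered poller and contributes no busy time even when
 * active_percent is 0. Also handles self-deletion once shutdown starts.
 */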
static int
poller_run_idle(void *arg)
{
        struct sched_thread *sched_thread = arg;

        if (spdk_unlikely(!g_is_running)) {
                pthread_mutex_lock(&g_sched_list_mutex);
                thread_delete(sched_thread);
                pthread_mutex_unlock(&g_sched_list_mutex);
                return SPDK_POLLER_IDLE;
        }

        spdk_delay_us(10);
        return SPDK_POLLER_IDLE;
}

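/* (Re)register the pollers to match the current active_percent: the busy
 * poller is recreated only when active_percent > 0, while the idle poller is
 * registered once and kept for the lifetime of the thread. Must run on the
 * thread that owns the pollers, since spdk_poller_register() binds a poller
 * to the calling thread.
 */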
static void
update_pollers(struct sched_thread *sched_thread)
{
        spdk_poller_unregister(&sched_thread->poller);
        if (sched_thread->active_percent > 0) {
                sched_thread->poller = spdk_poller_register_named(poller_run_busy, sched_thread, TIMESLICE_US,
                                                                  spdk_thread_get_name(sched_thread->thread));
                assert(sched_thread->poller != NULL);
        }
        if (sched_thread->idle_poller == NULL) {
                sched_thread->idle_poller = spdk_poller_register_named(poller_run_idle, sched_thread, 0,
                                                                       "idle_poller");
                assert(sched_thread->idle_poller != NULL);
        }
}

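/* Runs on the newly created thread (sent via spdk_thread_send_msg): registers
 * the pollers there and, if a JSON-RPC request is still pending, completes it
 * with the new thread's ID.
 */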
static void
rpc_register_poller(void *arg)
{
        struct sched_thread *sched_thread = arg;

        update_pollers(sched_thread);

        if (sched_thread->request != NULL) {
                rpc_scheduler_thread_create_cb(sched_thread->request, spdk_thread_get_id(sched_thread->thread));
                sched_thread->request = NULL;
        }
}

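/* scheduler_thread_create RPC: creates an SPDK thread with an optional name
 * and CPU mask and a busy percentage between 0 and 100. The response (the new
 * thread's ID) is sent from rpc_register_poller() once the pollers are running.
 * Illustrative request shape (parameter names follow rpc_thread_create_decoders;
 * the values are examples only):
 *   {"method": "scheduler_thread_create",
 *    "params": {"name": "thread0", "cpu_mask": "0x3", "active": 50}}
 */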
static void
rpc_scheduler_thread_create(struct spdk_jsonrpc_request *request,
                            const struct spdk_json_val *params)
{
        struct sched_thread *sched_thread;
        struct rpc_thread_create req = {0};
        struct spdk_cpuset *cpu_set = NULL;
        int rc = 0;

        if (spdk_json_decode_object(params, rpc_thread_create_decoders,
                                    SPDK_COUNTOF(rpc_thread_create_decoders),
                                    &req)) {
                spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
                                                 "Invalid parameters provided");
                return;
        }

        if (req.active_percent < 0 || req.active_percent > 100) {
                SPDK_ERRLOG("invalid percent value %d\n", req.active_percent);
                spdk_jsonrpc_send_error_response(request, -EINVAL, spdk_strerror(EINVAL));
                free_rpc_thread_create(&req);
                return;
        }

        if (req.cpu_mask != NULL) {
                cpu_set = calloc(1, sizeof(*cpu_set));
                assert(cpu_set != NULL);
                rc = spdk_cpuset_parse(cpu_set, req.cpu_mask);
                if (rc < 0) {
                        SPDK_ERRLOG("invalid cpumask %s\n", req.cpu_mask);
                        spdk_jsonrpc_send_error_response(request, -EINVAL, spdk_strerror(EINVAL));
                        free_rpc_thread_create(&req);
                        free(cpu_set);
                        return;
                }
        }

        sched_thread = calloc(1, sizeof(*sched_thread));
        assert(sched_thread != NULL);

        sched_thread->thread = spdk_thread_create(req.name, cpu_set);
        assert(sched_thread->thread != NULL);
        free(cpu_set);

        sched_thread->request = request;
        sched_thread->active_percent = req.active_percent;

        spdk_thread_send_msg(sched_thread->thread, rpc_register_poller, sched_thread);

        free_rpc_thread_create(&req);

        pthread_mutex_lock(&g_sched_list_mutex);
        TAILQ_INSERT_TAIL(&g_sched_threads, sched_thread, link);
        pthread_mutex_unlock(&g_sched_list_mutex);
}

SPDK_RPC_REGISTER("scheduler_thread_create", rpc_scheduler_thread_create, SPDK_RPC_RUNTIME)

struct rpc_thread_set_active_ctx {
        int active_percent;
        struct spdk_jsonrpc_request *request;
};

struct rpc_thread_set_active {
        uint64_t thread_id;
        int active_percent;
};

static const struct spdk_json_object_decoder rpc_thread_set_active_decoders[] = {
        {"thread_id", offsetof(struct rpc_thread_set_active, thread_id), spdk_json_decode_uint64},
        {"active", offsetof(struct rpc_thread_set_active, active_percent), spdk_json_decode_int32},
};

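/* Runs on the target thread (delivered via spdk_thread_send_msg) so that
 * update_pollers() re-registers the busy poller on the thread that owns it.
 * The thread is looked up in g_sched_threads by ID; a thread that was not
 * created by this application fails with ENOENT.
 */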
static void
rpc_scheduler_thread_set_active_cb(void *arg)
{
        struct rpc_thread_set_active_ctx *ctx = arg;
        uint64_t thread_id;
        struct sched_thread *sched_thread;

        thread_id = spdk_thread_get_id(spdk_get_thread());

        pthread_mutex_lock(&g_sched_list_mutex);
        TAILQ_FOREACH(sched_thread, &g_sched_threads, link) {
                if (spdk_thread_get_id(sched_thread->thread) == thread_id) {
                        sched_thread->active_percent = ctx->active_percent;
                        update_pollers(sched_thread);
                        pthread_mutex_unlock(&g_sched_list_mutex);
                        spdk_jsonrpc_send_bool_response(ctx->request, true);
                        free(ctx);
                        return;
                }
        }
        pthread_mutex_unlock(&g_sched_list_mutex);

        spdk_jsonrpc_send_error_response(ctx->request, -ENOENT, spdk_strerror(ENOENT));
        free(ctx);
}

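/* scheduler_thread_set_active RPC: changes the busy percentage of an existing
 * test thread, identified by thread ID. Illustrative request shape (parameter
 * names follow rpc_thread_set_active_decoders; the values are examples only):
 *   {"method": "scheduler_thread_set_active", "params": {"thread_id": 2, "active": 80}}
 */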
static void
rpc_scheduler_thread_set_active(struct spdk_jsonrpc_request *request,
                                const struct spdk_json_val *params)
{
        struct spdk_thread *thread;
        struct rpc_thread_set_active req = {0};
        struct rpc_thread_set_active_ctx *ctx;

        if (spdk_json_decode_object(params, rpc_thread_set_active_decoders,
                                    SPDK_COUNTOF(rpc_thread_set_active_decoders),
                                    &req)) {
                spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
                                                 "Invalid parameters provided");
                return;
        }

        if (req.active_percent < 0 || req.active_percent > 100) {
                SPDK_ERRLOG("invalid percent value %d\n", req.active_percent);
                spdk_jsonrpc_send_error_response(request, -EINVAL, spdk_strerror(EINVAL));
                return;
        }

        thread = spdk_thread_get_by_id(req.thread_id);
        if (thread == NULL) {
                spdk_jsonrpc_send_error_response(request, -ENOENT, spdk_strerror(ENOENT));
                return;
        }

        ctx = calloc(1, sizeof(*ctx));
        if (ctx == NULL) {
                spdk_jsonrpc_send_error_response(request, -ENOMEM, spdk_strerror(ENOMEM));
                return;
        }
        ctx->request = request;
        ctx->active_percent = req.active_percent;

        spdk_thread_send_msg(thread, rpc_scheduler_thread_set_active_cb, ctx);
}

SPDK_RPC_REGISTER("scheduler_thread_set_active", rpc_scheduler_thread_set_active, SPDK_RPC_RUNTIME)

struct rpc_thread_delete_ctx {
        struct spdk_jsonrpc_request *request;
};

struct rpc_thread_delete {
        uint64_t thread_id;
};

static const struct spdk_json_object_decoder rpc_thread_delete_decoders[] = {
        {"thread_id", offsetof(struct rpc_thread_delete, thread_id), spdk_json_decode_uint64},
};

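/* Runs on the thread being deleted (delivered via spdk_thread_send_msg), since
 * thread_delete() unregisters that thread's pollers and calls spdk_thread_exit().
 */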
static void
rpc_scheduler_thread_delete_cb(void *arg)
{
        struct rpc_thread_delete_ctx *ctx = arg;
        struct sched_thread *sched_thread;
        uint64_t thread_id;

        thread_id = spdk_thread_get_id(spdk_get_thread());

        pthread_mutex_lock(&g_sched_list_mutex);
        TAILQ_FOREACH(sched_thread, &g_sched_threads, link) {
                if (spdk_thread_get_id(sched_thread->thread) == thread_id) {
                        thread_delete(sched_thread);
                        pthread_mutex_unlock(&g_sched_list_mutex);
                        spdk_jsonrpc_send_bool_response(ctx->request, true);
                        free(ctx);
                        return;
                }
        }
        pthread_mutex_unlock(&g_sched_list_mutex);

        spdk_jsonrpc_send_error_response(ctx->request, -ENOENT, spdk_strerror(ENOENT));
        free(ctx);
}

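/* scheduler_thread_delete RPC: removes a test thread by ID. Illustrative
 * request shape (parameter name follows rpc_thread_delete_decoders; the value
 * is an example only):
 *   {"method": "scheduler_thread_delete", "params": {"thread_id": 2}}
 */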
static void
rpc_scheduler_thread_delete(struct spdk_jsonrpc_request *request,
                            const struct spdk_json_val *params)
{
        struct spdk_thread *thread;
        struct rpc_thread_delete req = {0};
        struct rpc_thread_delete_ctx *ctx;

        if (spdk_json_decode_object(params, rpc_thread_delete_decoders,
                                    SPDK_COUNTOF(rpc_thread_delete_decoders),
                                    &req)) {
                spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
                                                 "Invalid parameters provided");
                return;
        }

        thread = spdk_thread_get_by_id(req.thread_id);
        if (thread == NULL) {
                spdk_jsonrpc_send_error_response(request, -ENOENT, spdk_strerror(ENOENT));
                return;
        }

        ctx = calloc(1, sizeof(*ctx));
        if (ctx == NULL) {
                spdk_jsonrpc_send_error_response(request, -ENOMEM, spdk_strerror(ENOMEM));
                return;
        }
        ctx->request = request;

        spdk_thread_send_msg(thread, rpc_scheduler_thread_delete_cb, ctx);
}

SPDK_RPC_REGISTER("scheduler_thread_delete", rpc_scheduler_thread_delete, SPDK_RPC_RUNTIME)

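/* Shutdown callback: flag the pollers so each test thread deletes itself on
 * its next iteration. If no test threads exist, stop the application right
 * away; otherwise the last thread_delete() call does it.
 */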
static void
test_shutdown(void)
{
        g_is_running = false;
        SPDK_NOTICELOG("Scheduler test application stopped.\n");
        pthread_mutex_lock(&g_sched_list_mutex);
        if (TAILQ_EMPTY(&g_sched_threads)) {
                spdk_app_stop(0);
        }
        pthread_mutex_unlock(&g_sched_list_mutex);
}

static void
for_each_nop(void *arg1, void *arg2)
{
}

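/* Keep an spdk_for_each_reactor operation permanently in flight: the
 * completion callback immediately starts the next round.
 */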
static void
for_each_reactor_start(void *arg1, void *arg2)
{
        spdk_for_each_reactor(for_each_nop, NULL, NULL, for_each_reactor_start);
}

static void
test_start(void *arg1)
{
        SPDK_NOTICELOG("Scheduler test application started.\n");
        /* Start an spdk_for_each_reactor operation that just keeps
         * running over and over again until the app exits. This
         * serves as a regression test for SPDK issue #2206, ensuring
         * that any pending spdk_for_each_reactor operations are
         * completed before reactors are shut down.
         */
        if (g_for_each_reactor) {
                for_each_reactor_start(NULL, NULL);
        }
}

static void
scheduler_usage(void)
{
        printf(" -f                        Enable spdk_for_each_reactor regression test\n");
}

static int
scheduler_parse_arg(int ch, char *arg)
{
        switch (ch) {
        case 'f':
                g_for_each_reactor = true;
                break;
        default:
                return -EINVAL;
        }
        return 0;
}

int
main(int argc, char **argv)
{
        struct spdk_app_opts opts;
        int rc = 0;

        spdk_app_opts_init(&opts, sizeof(opts));
        opts.name = "scheduler";
        opts.shutdown_cb = test_shutdown;

        if ((rc = spdk_app_parse_args(argc, argv, &opts,
                                      "f", NULL, scheduler_parse_arg, scheduler_usage)) != SPDK_APP_PARSE_ARGS_SUCCESS) {
                return rc;
        }

        rc = spdk_app_start(&opts, test_start, NULL);

        spdk_app_fini();

        return rc;
}