/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2016 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/likely.h"

#include "event_internal.h"

#include "spdk_internal/event.h"
#include "spdk_internal/usdt.h"

#include "spdk/log.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/scheduler.h"
#include "spdk/string.h"
#include "spdk/fd_group.h"
#include "spdk/trace.h"
#include "spdk_internal/trace_defs.h"

#ifdef __linux__
#include <sys/prctl.h>
#include <sys/eventfd.h>
#endif

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

#define SPDK_EVENT_BATCH_SIZE		8

static struct spdk_reactor *g_reactors;
static uint32_t g_reactor_count;
static struct spdk_cpuset g_reactor_core_mask;
static enum spdk_reactor_state	g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;

static bool g_framework_context_switch_monitor_enabled = true;

static struct spdk_mempool *g_spdk_event_mempool = NULL;

TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);

static struct spdk_scheduler *g_scheduler = NULL;
static struct spdk_reactor *g_scheduling_reactor;
bool g_scheduling_in_progress = false;
static uint64_t g_scheduler_period = 0;
static uint32_t g_scheduler_core_number;
static struct spdk_scheduler_core_info *g_core_infos = NULL;
static struct spdk_cpuset g_scheduler_isolated_core_mask;

TAILQ_HEAD(, spdk_governor) g_governor_list
	= TAILQ_HEAD_INITIALIZER(g_governor_list);

static struct spdk_governor *g_governor = NULL;

static int reactor_interrupt_init(struct spdk_reactor *reactor);
static void reactor_interrupt_fini(struct spdk_reactor *reactor);

static pthread_mutex_t g_stopping_reactors_mtx = PTHREAD_MUTEX_INITIALIZER;
static bool g_stopping_reactors = false;

static struct spdk_scheduler *
_scheduler_find(const char *name)
{
	struct spdk_scheduler *tmp;

	TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
		if (strcmp(name, tmp->name) == 0) {
			return tmp;
		}
	}

	return NULL;
}

int
spdk_scheduler_set(const char *name)
{
	struct spdk_scheduler *scheduler;
	int rc = 0;

	/* NULL scheduler was specifically requested */
	if (name == NULL) {
		if (g_scheduler) {
			g_scheduler->deinit();
		}
		g_scheduler = NULL;
		return 0;
	}

	scheduler = _scheduler_find(name);
	if (scheduler == NULL) {
97 		SPDK_ERRLOG("Requested scheduler is missing\n");
		return -EINVAL;
	}

	if (g_scheduler == scheduler) {
		return 0;
	}

	if (g_scheduler) {
		g_scheduler->deinit();
	}

	rc = scheduler->init();
	if (rc == 0) {
		g_scheduler = scheduler;
	} else {
		/* Could not initialize the new scheduler, so keep the old one.
		 * Since the old scheduler (if any) was already deinitialized
		 * above, it has to be ->init()'ed again.
		 */
		if (g_scheduler) {
			SPDK_ERRLOG("Could not ->init() '%s' scheduler, reverting to '%s'\n",
				    name, g_scheduler->name);
			g_scheduler->init();
		} else {
			SPDK_ERRLOG("Could not ->init() '%s' scheduler.\n", name);
		}
	}

	return rc;
}
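
/*
 * Minimal usage sketch: a scheduler module defines a struct spdk_scheduler
 * and registers it at load time, after which an application can select it by
 * name. The "my_scheduler" name and callbacks below are hypothetical
 * placeholders, assuming the SPDK_SCHEDULER_REGISTER() helper from
 * spdk/scheduler.h:
 *
 *   static struct spdk_scheduler my_scheduler = {
 *       .name = "my_scheduler",
 *       .init = my_sched_init,
 *       .deinit = my_sched_deinit,
 *       .balance = my_sched_balance,
 *   };
 *   SPDK_SCHEDULER_REGISTER(my_scheduler);
 *
 *   rc = spdk_scheduler_set("my_scheduler");
 */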

struct spdk_scheduler *
spdk_scheduler_get(void)
{
	return g_scheduler;
}

uint64_t
spdk_scheduler_get_period(void)
{
	/* Convert from ticks to microseconds */
	return (g_scheduler_period * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
}

void
spdk_scheduler_set_period(uint64_t period)
{
	/* Convert microseconds to ticks */
	g_scheduler_period = period * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
}
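
/*
 * For example, with a 2.3 GHz TSC (spdk_get_ticks_hz() == 2300000000),
 * spdk_scheduler_set_period(1000000) stores 1000000 * 2300000000 / 1000000
 * == 2300000000 ticks, i.e. one second, and spdk_scheduler_get_period()
 * converts that back to 1000000 microseconds.
 */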

void
spdk_scheduler_register(struct spdk_scheduler *scheduler)
{
	if (_scheduler_find(scheduler->name)) {
		SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
}

uint32_t
spdk_scheduler_get_scheduling_lcore(void)
{
	return g_scheduling_reactor->lcore;
}

bool
spdk_scheduler_set_scheduling_lcore(uint32_t core)
{
	struct spdk_reactor *reactor = spdk_reactor_get(core);
	if (reactor == NULL) {
171 		SPDK_ERRLOG("Failed to set scheduling reactor. Reactor(lcore:%d) does not exist", core);
		return false;
	}

	g_scheduling_reactor = reactor;
	return true;
}

bool
scheduler_set_isolated_core_mask(struct spdk_cpuset isolated_core_mask)
{
	struct spdk_cpuset tmp_mask;

	spdk_cpuset_copy(&tmp_mask, spdk_app_get_core_mask());
	spdk_cpuset_or(&tmp_mask, &isolated_core_mask);
	if (spdk_cpuset_equal(&tmp_mask, spdk_app_get_core_mask()) == false) {
		SPDK_ERRLOG("Isolated core mask is not included in app core mask.\n");
		return false;
	}
	spdk_cpuset_copy(&g_scheduler_isolated_core_mask, &isolated_core_mask);
	return true;
}

const char *
scheduler_get_isolated_core_mask(void)
{
	return spdk_cpuset_fmt(&g_scheduler_isolated_core_mask);
}

static bool
scheduler_is_isolated_core(uint32_t core)
{
	return spdk_cpuset_get_cpu(&g_scheduler_isolated_core_mask, core);
}

static void
reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
{
	reactor->lcore = lcore;
	reactor->flags.is_valid = true;

	TAILQ_INIT(&reactor->threads);
	reactor->thread_count = 0;
	spdk_cpuset_zero(&reactor->notify_cpuset);

	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_NUMA_ID_ANY);
	if (reactor->events == NULL) {
		SPDK_ERRLOG("Failed to allocate events ring\n");
		assert(false);
	}

	/* Always initialize interrupt facilities for reactor */
	if (reactor_interrupt_init(reactor) != 0) {
		/* Reactor interrupt facilities are required when the app runs in interrupt mode. */
		if (spdk_interrupt_mode_is_enabled()) {
			SPDK_ERRLOG("Failed to prepare intr facilities\n");
			assert(false);
		}
		return;
	}

	/* If application runs with full interrupt ability,
	 * all reactors are going to run in interrupt mode.
	 */
	if (spdk_interrupt_mode_is_enabled()) {
		uint32_t i;

		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true);
		}
		reactor->in_interrupt = true;
	}
}

struct spdk_reactor *
spdk_reactor_get(uint32_t lcore)
{
	struct spdk_reactor *reactor;

	if (g_reactors == NULL) {
		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
		return NULL;
	}

	if (lcore >= g_reactor_count) {
		return NULL;
	}

	reactor = &g_reactors[lcore];

	if (reactor->flags.is_valid == false) {
		return NULL;
	}

	return reactor;
}

static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
static bool reactor_thread_op_supported(enum spdk_thread_op op);

/* Power of 2 minus 1 is optimal for memory consumption */
#define EVENT_MSG_MEMPOOL_SHIFT 14 /* 2^14 = 16384 */
#define EVENT_MSG_MEMPOOL_SIZE ((1 << EVENT_MSG_MEMPOOL_SHIFT) - 1)
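
/*
 * Rationale: SPDK mempools are typically backed by a DPDK rte_ring sized to a
 * power of two, and a ring of size 2^n can hold at most 2^n - 1 elements.
 * Requesting exactly 2^n - 1 elements therefore uses the ring fully without
 * forcing it to round up to the next power of two.
 */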

int
spdk_reactors_init(size_t msg_mempool_size)
{
	struct spdk_reactor *reactor;
	int rc;
	uint32_t i, current_core;
	char mempool_name[32];

	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
			       EVENT_MSG_MEMPOOL_SIZE,
			       sizeof(struct spdk_event),
			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
			       SPDK_ENV_NUMA_ID_ANY);

	if (g_spdk_event_mempool == NULL) {
		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
		return -1;
	}
	/* struct spdk_reactor must be aligned on a 64-byte (cache line) boundary */
	g_reactor_count = spdk_env_get_last_core() + 1;
	rc = posix_memalign((void **)&g_reactors, 64,
			    g_reactor_count * sizeof(struct spdk_reactor));
	if (rc != 0) {
		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
			    g_reactor_count);
		spdk_mempool_free(g_spdk_event_mempool);
		return -1;
	}

	g_core_infos = calloc(g_reactor_count, sizeof(*g_core_infos));
	if (g_core_infos == NULL) {
		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		return -ENOMEM;
	}

	memset(g_reactors, 0, (g_reactor_count) * sizeof(struct spdk_reactor));

	rc = spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
				      sizeof(struct spdk_lw_thread), msg_mempool_size);
	if (rc != 0) {
319 		SPDK_ERRLOG("Initialize spdk thread lib failed\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		free(g_core_infos);
		return rc;
	}

	SPDK_ENV_FOREACH_CORE(i) {
		reactor_construct(&g_reactors[i], i);
	}

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	g_scheduling_reactor = reactor;

	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;

	return 0;
}

void
spdk_reactors_fini(void)
{
	uint32_t i;
	struct spdk_reactor *reactor;

	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
		return;
	}

	spdk_thread_lib_fini();

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		assert(reactor->thread_count == 0);
		if (reactor->events != NULL) {
			spdk_ring_free(reactor->events);
		}

		reactor_interrupt_fini(reactor);

		if (g_core_infos != NULL) {
			free(g_core_infos[i].thread_infos);
		}
	}

	spdk_mempool_free(g_spdk_event_mempool);

	free(g_reactors);
	g_reactors = NULL;
	free(g_core_infos);
	g_core_infos = NULL;
}

static void _reactor_set_interrupt_mode(void *arg1, void *arg2);
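
/*
 * Each reactor keeps a notify_cpuset: bit i is set when the reactor on lcore i
 * is (or is about to be) in interrupt mode and therefore must be woken up via
 * its events_fd whenever an event is queued for it. _reactor_set_notify_cpuset()
 * below runs on every reactor to update that local view when a target reactor
 * changes mode.
 */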

static void
_reactor_set_notify_cpuset(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;
	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());

	assert(reactor != NULL);
	spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt);
}

static void
_event_call(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
{
	struct spdk_event *ev;

	ev = spdk_event_allocate(lcore, fn, arg1, arg2);
	assert(ev);
	spdk_event_call(ev);
}

static void
_reactor_set_notify_cpuset_cpl(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;

	if (target->new_in_interrupt == false) {
		target->set_interrupt_mode_in_progress = false;
		_event_call(spdk_scheduler_get_scheduling_lcore(), target->set_interrupt_mode_cb_fn,
			    target->set_interrupt_mode_cb_arg, NULL);
	} else {
		_event_call(target->lcore, _reactor_set_interrupt_mode, target, NULL);
	}
}

static void
_reactor_set_thread_interrupt_mode(void *ctx)
{
	struct spdk_reactor *reactor = ctx;

	spdk_thread_set_interrupt_mode(reactor->in_interrupt);
}

static void
_reactor_set_interrupt_mode(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;
	struct spdk_thread *thread;
	struct spdk_fd_group *grp;
	struct spdk_lw_thread *lw_thread, *tmp;

	assert(target == spdk_reactor_get(spdk_env_get_current_core()));
	assert(target != NULL);
	assert(target->in_interrupt != target->new_in_interrupt);
430 	SPDK_DEBUGLOG(reactor, "Do reactor set on core %u from %s to state %s\n",
431 		      target->lcore, target->in_interrupt ? "intr" : "poll", target->new_in_interrupt ? "intr" : "poll");

	target->in_interrupt = target->new_in_interrupt;

	if (spdk_interrupt_mode_is_enabled()) {
		/* Align spdk_thread with reactor to interrupt mode or poll mode */
		TAILQ_FOREACH_SAFE(lw_thread, &target->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			if (target->in_interrupt) {
				grp = spdk_thread_get_interrupt_fd_group(thread);
				spdk_fd_group_nest(target->fgrp, grp);
			} else {
				grp = spdk_thread_get_interrupt_fd_group(thread);
				spdk_fd_group_unnest(target->fgrp, grp);
			}

			spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, target);
		}
	}

	if (target->new_in_interrupt == false) {
		/* Reactor is no longer in interrupt mode. Refresh the tsc_last to accurately
		 * track reactor stats. */
		target->tsc_last = spdk_get_ticks();
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	} else {
		uint64_t notify = 1;
		int rc = 0;

		/* Always re-trigger the event and resched notifications to close any race window. */
		rc = write(target->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
		rc = write(target->resched_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}

		target->set_interrupt_mode_in_progress = false;
		_event_call(spdk_scheduler_get_scheduling_lcore(), target->set_interrupt_mode_cb_fn,
			    target->set_interrupt_mode_cb_arg, NULL);
	}
}

int
spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt,
				spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg)
{
	struct spdk_reactor *target;

	target = spdk_reactor_get(lcore);
	if (target == NULL) {
		return -EINVAL;
	}

	/* Eventfd has to be supported in order to use interrupt functionality. */
	if (target->fgrp == NULL) {
		return -ENOTSUP;
	}

	if (spdk_env_get_current_core() != g_scheduling_reactor->lcore) {
493 		SPDK_ERRLOG("It is only permitted within scheduling reactor.\n");
		return -EPERM;
	}

	if (target->in_interrupt == new_in_interrupt) {
		cb_fn(cb_arg, NULL);
		return 0;
	}

	if (target->set_interrupt_mode_in_progress) {
		SPDK_NOTICELOG("Reactor(%u) already has an interrupt mode change in progress\n", lcore);
		return -EBUSY;
	}
	target->set_interrupt_mode_in_progress = true;

	target->new_in_interrupt = new_in_interrupt;
	target->set_interrupt_mode_cb_fn = cb_fn;
	target->set_interrupt_mode_cb_arg = cb_arg;

	SPDK_DEBUGLOG(reactor, "Starting reactor event from %u to %u\n",
		      spdk_env_get_current_core(), lcore);

	if (new_in_interrupt == false) {
		/* For potential race cases, when setting the reactor to poll mode,
		 * first change the mode of the reactor and then clear the corresponding
		 * bit of the notify_cpuset of each reactor.
		 */
		_event_call(lcore, _reactor_set_interrupt_mode, target, NULL);
	} else {
		/* For race cases, when setting the reactor to interrupt mode, first set the
		 * corresponding bit of the notify_cpuset of each reactor and then change the mode.
		 */
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	}

	return 0;
}
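
/*
 * Typical use: the scheduler, running on the scheduling reactor, toggles a
 * core between poll and interrupt mode and resumes in the completion callback
 * (see _reactors_scheduler_update_core_mode() below), e.g.:
 *
 *   rc = spdk_reactor_set_interrupt_mode(lcore, true, cb_fn, cb_arg);
 *
 * cb_fn(cb_arg, NULL) is invoked on the scheduling lcore once every reactor's
 * notify_cpuset and the target reactor's mode have been updated.
 */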

struct spdk_event *
spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
{
	struct spdk_event *event = NULL;
	struct spdk_reactor *reactor = spdk_reactor_get(lcore);

	if (!reactor) {
		assert(false);
		return NULL;
	}

	event = spdk_mempool_get(g_spdk_event_mempool);
	if (event == NULL) {
		assert(false);
		return NULL;
	}

	event->lcore = lcore;
	event->fn = fn;
	event->arg1 = arg1;
	event->arg2 = arg2;

	return event;
}

void
spdk_event_call(struct spdk_event *event)
{
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_core = spdk_env_get_current_core();

	reactor = spdk_reactor_get(event->lcore);

	assert(reactor != NULL);
	assert(reactor->events != NULL);

	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
	if (rc != 1) {
		assert(false);
	}

	if (current_core != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_core);
	}

	/* If spdk_event_call() isn't called on a reactor, always send a notification.
	 * If it is called on a reactor, send a notification only if the destination
	 * reactor is marked as being in interrupt mode in the local notify_cpuset.
	 */
	if (spdk_unlikely(local_reactor == NULL) ||
	    spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) {
		uint64_t notify = 1;

		rc = write(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
	}
}

static inline int
event_queue_run_batch(void *arg)
{
	struct spdk_reactor *reactor = arg;
	size_t count, i;
	void *events[SPDK_EVENT_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(events, 0, sizeof(events));
#endif

	/* Handle event notification if this reactor is currently running in interrupt mode */
	if (spdk_unlikely(reactor->in_interrupt)) {
		uint64_t notify = 1;
		int rc;

		/* There may be a race between this acknowledgement read and another
		 * producer's notification write, so acknowledge first and then re-notify
		 * below if events are still pending. This avoids missing a notification.
		 */
		rc = read(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno));
			return -errno;
		}

		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);

		if (spdk_ring_count(reactor->events) != 0) {
			/* Trigger new notification if there are still events in event-queue waiting for processing. */
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
				return -errno;
			}
		}
	} else {
		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
	}

	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_event *event = events[i];

		assert(event != NULL);
		assert(spdk_get_thread() == NULL);
		SPDK_DTRACE_PROBE3(event_exec, event->fn,
				   event->arg1, event->arg2);
		event->fn(event->arg1, event->arg2);
	}

	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);

	return (int)count;
}
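
/*
 * Note on the eventfd protocol used above: a producer's write() increments the
 * eventfd counter to wake the epoll loop, and the read() here consumes (resets)
 * the counter. Since at most SPDK_EVENT_BATCH_SIZE events are dequeued per
 * invocation, the write-back re-arms the eventfd so the fd_group wakes up again
 * for the remaining events.
 */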

/* 1s, expressed in microseconds; converted to ticks in spdk_reactors_start() */
#define CONTEXT_SWITCH_MONITOR_PERIOD 1000000

static int
get_rusage(struct spdk_reactor *reactor)
{
	struct rusage		rusage;

	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
		return -1;
	}

	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
		SPDK_INFOLOG(reactor,
671 			     "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
	}
	reactor->rusage = rusage;

	return -1;
}

void
spdk_framework_enable_context_switch_monitor(bool enable)
{
	/* This global is being read by multiple threads, so this isn't
	 * strictly thread safe. However, we're toggling between true and
	 * false here, and if a thread sees the value update later than it
	 * should, it's no big deal. */
	g_framework_context_switch_monitor_enabled = enable;
}

bool
spdk_framework_context_switch_monitor_enabled(void)
{
	return g_framework_context_switch_monitor_enabled;
}

static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
	pthread_setname_np(pthread_self(), thread_name);
#endif
}

static void
_init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
	struct spdk_thread_stats prev_total_stats;

	/* Read total_stats before updating it to calculate stats during the last scheduling period. */
	prev_total_stats = lw_thread->total_stats;

	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->total_stats);
	spdk_set_thread(NULL);

	lw_thread->current_stats.busy_tsc = lw_thread->total_stats.busy_tsc - prev_total_stats.busy_tsc;
	lw_thread->current_stats.idle_tsc = lw_thread->total_stats.idle_tsc - prev_total_stats.idle_tsc;
}

static void
_threads_reschedule_thread(struct spdk_scheduler_thread_info *thread_info)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread *thread;

	thread = spdk_thread_get_by_id(thread_info->thread_id);
	if (thread == NULL) {
		/* Thread no longer exists. */
		return;
	}
	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);

	lw_thread->lcore = thread_info->lcore;
	lw_thread->resched = true;
}

static void
_threads_reschedule(struct spdk_scheduler_core_info *cores_info)
{
	struct spdk_scheduler_core_info *core;
	struct spdk_scheduler_thread_info *thread_info;
	uint32_t i, j;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &cores_info[i];
		for (j = 0; j < core->threads_count; j++) {
			thread_info = &core->thread_infos[j];
			if (thread_info->lcore != i) {
				if (core->isolated || cores_info[thread_info->lcore].isolated) {
756 					SPDK_ERRLOG("A thread cannot be moved from an isolated core or \
757 								moved to an isolated core. Skip rescheduling thread\n");
					continue;
				}
				_threads_reschedule_thread(thread_info);
			}
		}
		core->threads_count = 0;
		free(core->thread_infos);
		core->thread_infos = NULL;
	}
}

static void
_reactors_scheduler_fini(void)
{
	/* Reschedule based on the balancing output */
	_threads_reschedule(g_core_infos);

	g_scheduling_in_progress = false;
}

static void
_reactors_scheduler_update_core_mode(void *ctx1, void *ctx2)
{
	struct spdk_reactor *reactor;
	uint32_t i;
	int rc = 0;

	for (i = g_scheduler_core_number; i < SPDK_ENV_LCORE_ID_ANY; i = spdk_env_get_next_core(i)) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		if (reactor->in_interrupt != g_core_infos[i].interrupt_mode) {
			/* Switch next found reactor to new state */
			rc = spdk_reactor_set_interrupt_mode(i, g_core_infos[i].interrupt_mode,
							     _reactors_scheduler_update_core_mode, NULL);
			if (rc == 0) {
				/* Set core to start with after callback completes */
				g_scheduler_core_number = spdk_env_get_next_core(i);
				return;
			}
		}
	}
	_reactors_scheduler_fini();
}

static void
_reactors_scheduler_cancel(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core;
	uint32_t i;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &g_core_infos[i];
		core->threads_count = 0;
		free(core->thread_infos);
		core->thread_infos = NULL;
	}

	g_scheduling_in_progress = false;
}

static void
_reactors_scheduler_balance(void *arg1, void *arg2)
{
	struct spdk_scheduler *scheduler = spdk_scheduler_get();

	if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING || scheduler == NULL) {
		_reactors_scheduler_cancel(NULL, NULL);
		return;
	}

	scheduler->balance(g_core_infos, g_reactor_count);

	g_scheduler_core_number = spdk_env_get_first_core();
	_reactors_scheduler_update_core_mode(NULL, NULL);
}

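/*
 * A full scheduling round proceeds in three phases, hopping between cores via
 * events:
 *   1. _reactors_scheduler_gather_metrics() visits every reactor in core order
 *      and snapshots per-core and per-thread busy/idle TSC counters.
 *   2. _reactors_scheduler_balance() runs the active scheduler's balance()
 *      callback on the scheduling reactor to pick new thread placements and
 *      core modes.
 *   3. _reactors_scheduler_update_core_mode() applies interrupt/poll mode
 *      changes core by core, then _reactors_scheduler_fini() marks the moved
 *      threads for rescheduling.
 */
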
/* Phase 1 of thread scheduling is to gather metrics on the existing threads */
static void
_reactors_scheduler_gather_metrics(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core_info;
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread *thread;
	struct spdk_reactor *reactor;
	uint32_t next_core;
	uint32_t i = 0;

	reactor = spdk_reactor_get(spdk_env_get_current_core());
	assert(reactor != NULL);
	core_info = &g_core_infos[reactor->lcore];
	core_info->lcore = reactor->lcore;
	core_info->current_idle_tsc = reactor->idle_tsc - core_info->total_idle_tsc;
	core_info->total_idle_tsc = reactor->idle_tsc;
	core_info->current_busy_tsc = reactor->busy_tsc - core_info->total_busy_tsc;
	core_info->total_busy_tsc = reactor->busy_tsc;
	core_info->interrupt_mode = reactor->in_interrupt;
	core_info->threads_count = 0;
	core_info->isolated = scheduler_is_isolated_core(reactor->lcore);

	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);

	spdk_trace_record(TRACE_SCHEDULER_CORE_STATS, reactor->trace_id, 0, 0,
			  core_info->current_busy_tsc,
			  core_info->current_idle_tsc);

	if (reactor->thread_count > 0) {
		core_info->thread_infos = calloc(reactor->thread_count, sizeof(*core_info->thread_infos));
		if (core_info->thread_infos == NULL) {
			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);

			/* Cancel this round of scheduling work */
			_event_call(spdk_scheduler_get_scheduling_lcore(), _reactors_scheduler_cancel, NULL, NULL);
			return;
		}

		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
			_init_thread_stats(reactor, lw_thread);

			core_info->thread_infos[i].lcore = lw_thread->lcore;
			thread = spdk_thread_get_from_ctx(lw_thread);
			assert(thread != NULL);
			core_info->thread_infos[i].thread_id = spdk_thread_get_id(thread);
			core_info->thread_infos[i].total_stats = lw_thread->total_stats;
			core_info->thread_infos[i].current_stats = lw_thread->current_stats;
			core_info->threads_count++;
			assert(core_info->threads_count <= reactor->thread_count);

			spdk_trace_record(TRACE_SCHEDULER_THREAD_STATS, spdk_thread_get_trace_id(thread), 0, 0,
					  lw_thread->current_stats.busy_tsc,
					  lw_thread->current_stats.idle_tsc);

			i++;
		}
	}

	next_core = spdk_env_get_next_core(reactor->lcore);
	if (next_core == UINT32_MAX) {
		next_core = spdk_env_get_first_core();
	}

	/* If we've looped back around to the scheduler thread, move to the next phase */
	if (next_core == spdk_scheduler_get_scheduling_lcore()) {
		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
		_event_call(next_core, _reactors_scheduler_balance, NULL, NULL);
		return;
	}

	_event_call(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
}

static int _reactor_schedule_thread(struct spdk_thread *thread);
static uint64_t g_rusage_period;

static void
_reactor_remove_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread	*thread = spdk_thread_get_from_ctx(lw_thread);
	struct spdk_fd_group	*grp;

	TAILQ_REMOVE(&reactor->threads, lw_thread, link);
	assert(reactor->thread_count > 0);
	reactor->thread_count--;

	/* Unnest the thread's interrupt fd group if running with full interrupt capability */
	if (spdk_interrupt_mode_is_enabled()) {
		if (reactor->in_interrupt) {
			grp = spdk_thread_get_interrupt_fd_group(thread);
			spdk_fd_group_unnest(reactor->fgrp, grp);
		}
	}
}

static bool
reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);

	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
			  spdk_thread_is_idle(thread))) {
		_reactor_remove_lw_thread(reactor, lw_thread);
		spdk_thread_destroy(thread);
		return true;
	}

	if (spdk_unlikely(lw_thread->resched && !spdk_thread_is_bound(thread))) {
		lw_thread->resched = false;
		_reactor_remove_lw_thread(reactor, lw_thread);
		_reactor_schedule_thread(thread);
		return true;
	}

	return false;
}

static void
reactor_interrupt_run(struct spdk_reactor *reactor)
{
	int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */

	spdk_fd_group_wait(reactor->fgrp, block_timeout);
}

static void
_reactor_run(struct spdk_reactor *reactor)
{
	struct spdk_thread	*thread;
	struct spdk_lw_thread	*lw_thread, *tmp;
	uint64_t		now;
	int			rc;

	event_queue_run_batch(reactor);

	/* If no threads are present on the reactor,
	 * tsc_last gets outdated. Update it to track
	 * thread execution time correctly. */
	if (spdk_unlikely(TAILQ_EMPTY(&reactor->threads))) {
		now = spdk_get_ticks();
		reactor->idle_tsc += now - reactor->tsc_last;
		reactor->tsc_last = now;
		return;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);

		now = spdk_thread_get_last_tsc(thread);
		if (rc == 0) {
			reactor->idle_tsc += now - reactor->tsc_last;
		} else if (rc > 0) {
			reactor->busy_tsc += now - reactor->tsc_last;
		}
		reactor->tsc_last = now;

		reactor_post_process_lw_thread(reactor, lw_thread);
	}
}
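
/*
 * TSC accounting above: spdk_thread_poll() returns > 0 when the thread did
 * work, so the elapsed ticks are charged to busy_tsc, and 0 when it was idle,
 * charging them to idle_tsc. These per-core counters feed the scheduler's
 * current_busy_tsc/current_idle_tsc deltas in
 * _reactors_scheduler_gather_metrics().
 */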

static int
reactor_run(void *arg)
{
	struct spdk_reactor	*reactor = arg;
	struct spdk_thread	*thread;
	struct spdk_lw_thread	*lw_thread, *tmp;
	char			thread_name[32];
	uint64_t		last_sched = 0;

	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);

	/* Rename the POSIX thread because the reactor is tied to the POSIX
	 * thread in the SPDK event library.
	 */
	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
	_set_thread_name(thread_name);

	reactor->trace_id = spdk_trace_register_owner(OWNER_TYPE_REACTOR, thread_name);

	reactor->tsc_last = spdk_get_ticks();

	while (1) {
		/* Wait for and process interrupt events if this reactor is currently in interrupt mode */
		if (spdk_unlikely(reactor->in_interrupt)) {
			reactor_interrupt_run(reactor);
		} else {
			_reactor_run(reactor);
		}

		if (g_framework_context_switch_monitor_enabled) {
			if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
				get_rusage(reactor);
				reactor->last_rusage = reactor->tsc_last;
			}
		}

		if (spdk_unlikely(g_scheduler_period > 0 &&
				  (reactor->tsc_last - last_sched) > g_scheduler_period &&
				  reactor == g_scheduling_reactor &&
				  !g_scheduling_in_progress)) {
			last_sched = reactor->tsc_last;
			g_scheduling_in_progress = true;
			spdk_trace_record(TRACE_SCHEDULER_PERIOD_START, 0, 0, 0);
			_reactors_scheduler_gather_metrics(NULL, NULL);
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}
	}

	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		/* All threads should have already had spdk_thread_exit() called on them, except
		 * for the app thread.
		 */
		if (spdk_thread_is_running(thread)) {
			if (!spdk_thread_is_app_thread(thread)) {
				SPDK_ERRLOG("spdk_thread_exit() was not called on thread '%s'\n",
					    spdk_thread_get_name(thread));
				SPDK_ERRLOG("This will result in a non-zero exit code in a future release.\n");
			}
			spdk_set_thread(thread);
			spdk_thread_exit(thread);
		}
	}

	while (!TAILQ_EMPTY(&reactor->threads)) {
		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			spdk_set_thread(thread);
			if (spdk_thread_is_exited(thread)) {
				_reactor_remove_lw_thread(reactor, lw_thread);
				spdk_thread_destroy(thread);
			} else {
				if (spdk_unlikely(reactor->in_interrupt)) {
					reactor_interrupt_run(reactor);
				} else {
					spdk_thread_poll(thread, 0, 0);
				}
			}
		}
	}

	return 0;
}

int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int ret;
	const struct spdk_cpuset *validmask;

	ret = spdk_cpuset_parse(cpumask, mask);
	if (ret < 0) {
		return ret;
	}

	validmask = spdk_app_get_core_mask();
	spdk_cpuset_and(cpumask, validmask);

	return 0;
}

const struct spdk_cpuset *
spdk_app_get_core_mask(void)
{
	return &g_reactor_core_mask;
}

void
spdk_reactors_start(void)
{
	struct spdk_reactor *reactor;
	uint32_t i, current_core;
	int rc;

	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
	/* Reinitialize to false, in case the app framework is restarting in the same process. */
	g_stopping_reactors = false;

	current_core = spdk_env_get_current_core();
	SPDK_ENV_FOREACH_CORE(i) {
		if (i != current_core) {
			reactor = spdk_reactor_get(i);
			if (reactor == NULL) {
				continue;
			}

			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
			if (rc < 0) {
				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
				assert(false);
				return;
			}
		}
		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
	}

	/* Start the main reactor */
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	reactor_run(reactor);

	spdk_env_thread_wait_all();

	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
}

static void
_reactors_stop(void *arg1, void *arg2)
{
	uint32_t i;
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor;
	uint64_t notify = 1;

	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
	local_reactor = spdk_reactor_get(spdk_env_get_current_core());

	SPDK_ENV_FOREACH_CORE(i) {
		/* If this isn't running on a reactor, always send a notification.
		 * Otherwise, send a notification only if the destination reactor
		 * is marked as being in interrupt mode in the local notify_cpuset.
		 */
		if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) {
			reactor = spdk_reactor_get(i);
			assert(reactor != NULL);
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno));
				continue;
			}
		}
	}
}

static void
nop(void *arg1, void *arg2)
{
}

void
spdk_reactors_stop(void *arg1)
{
	spdk_for_each_reactor(nop, NULL, NULL, _reactors_stop);
}

static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
static uint32_t g_next_core = UINT32_MAX;

static void
_schedule_thread(void *arg1, void *arg2)
{
	struct spdk_lw_thread *lw_thread = arg1;
	struct spdk_thread *thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;
	struct spdk_fd_group *grp;

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	/* Update total_stats to reflect state of thread
	 * at the end of the move. */
	thread = spdk_thread_get_from_ctx(lw_thread);
	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->total_stats);
	spdk_set_thread(NULL);

	lw_thread->lcore = current_core;

	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
	reactor->thread_count++;

	/* Nest the thread's interrupt fd group if running with full interrupt capability */
	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (reactor->in_interrupt) {
			grp = spdk_thread_get_interrupt_fd_group(thread);
			rc = spdk_fd_group_nest(reactor->fgrp, grp);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
			}
		}

		/* Align spdk_thread with reactor to interrupt mode or poll mode */
		spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, reactor);
	}
}

static int
_reactor_schedule_thread(struct spdk_thread *thread)
{
	uint32_t core;
	struct spdk_lw_thread *lw_thread;
	struct spdk_event *evt = NULL;
	struct spdk_cpuset *cpumask;
	uint32_t i;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_lcore = spdk_env_get_current_core();
	struct spdk_cpuset polling_cpumask;
	struct spdk_cpuset valid_cpumask;

	cpumask = spdk_thread_get_cpumask(thread);

	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);
	core = lw_thread->lcore;
	memset(lw_thread, 0, sizeof(*lw_thread));

	if (current_lcore != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_lcore);
		assert(local_reactor);
	}

	/* When spdk_thread interrupt support is not enabled and the current
	 * reactor runs on a DPDK lcore, skip reactors that are in interrupt mode.
	 */
	if (!spdk_interrupt_mode_is_enabled() && local_reactor != NULL) {
		/* Get the cpumask of all reactors in polling */
		spdk_cpuset_zero(&polling_cpumask);
		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&polling_cpumask, i, true);
		}
		spdk_cpuset_xor(&polling_cpumask, &local_reactor->notify_cpuset);

		if (core == SPDK_ENV_LCORE_ID_ANY) {
			/* Get the cpumask of all valid reactors which are suggested and also in polling */
			spdk_cpuset_copy(&valid_cpumask, &polling_cpumask);
			spdk_cpuset_and(&valid_cpumask, spdk_thread_get_cpumask(thread));

			/* If there are any valid reactors, spdk_thread should be scheduled
			 * into one of the valid reactors.
			 * If there are no valid reactors, spdk_thread should be scheduled
			 * into one of the polling reactors.
			 */
			if (spdk_cpuset_count(&valid_cpumask) != 0) {
				cpumask = &valid_cpumask;
			} else {
				cpumask = &polling_cpumask;
			}
		} else if (!spdk_cpuset_get_cpu(&polling_cpumask, core)) {
			/* If the specified reactor is not in polling, spdk_thread should be
			 * scheduled into one of the polling reactors.
			 */
			core = SPDK_ENV_LCORE_ID_ANY;
			cpumask = &polling_cpumask;
		}
	}

	pthread_mutex_lock(&g_scheduler_mtx);
	if (core == SPDK_ENV_LCORE_ID_ANY) {
		for (i = 0; i < spdk_env_get_core_count(); i++) {
			if (g_next_core >= g_reactor_count) {
				g_next_core = spdk_env_get_first_core();
			}
			core = g_next_core;
			g_next_core = spdk_env_get_next_core(g_next_core);

			if (spdk_cpuset_get_cpu(cpumask, core)) {
				break;
			}
		}
	}

	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);

	if (current_lcore != core) {
		spdk_trace_record(TRACE_SCHEDULER_MOVE_THREAD, spdk_thread_get_trace_id(thread), 0, 0,
				  current_lcore, core);
	}

	pthread_mutex_unlock(&g_scheduler_mtx);

	assert(evt != NULL);
	if (evt == NULL) {
		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
		return -1;
	}

	lw_thread->tsc_start = spdk_get_ticks();

	spdk_event_call(evt);

	return 0;
}

static void
_reactor_request_thread_reschedule(struct spdk_thread *thread)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;

	assert(thread == spdk_get_thread());

	lw_thread = spdk_thread_get_ctx(thread);

	assert(lw_thread != NULL);
	lw_thread->resched = true;
	lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	/* Send a notification if the destination reactor is marked as being in interrupt mode */
	if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) {
		uint64_t notify = 1;

		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}
	}
}

static int
reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
{
	struct spdk_lw_thread *lw_thread;

	switch (op) {
	case SPDK_THREAD_OP_NEW:
		lw_thread = spdk_thread_get_ctx(thread);
		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
		return _reactor_schedule_thread(thread);
	case SPDK_THREAD_OP_RESCHED:
		_reactor_request_thread_reschedule(thread);
		return 0;
	default:
		return -ENOTSUP;
	}
}

static bool
reactor_thread_op_supported(enum spdk_thread_op op)
{
	switch (op) {
	case SPDK_THREAD_OP_NEW:
	case SPDK_THREAD_OP_RESCHED:
		return true;
	default:
		return false;
	}
}

struct call_reactor {
	uint32_t cur_core;
	spdk_event_fn fn;
	void *arg1;
	void *arg2;

	uint32_t orig_core;
	spdk_event_fn cpl;
};

static void
on_reactor(void *arg1, void *arg2)
{
	struct call_reactor *cr = arg1;
	struct spdk_event *evt;

	cr->fn(cr->arg1, cr->arg2);

	cr->cur_core = spdk_env_get_next_core(cr->cur_core);

	if (cr->cur_core >= g_reactor_count) {
		SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");

		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
		free(cr);
	} else {
1413 		SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %d\n",
			      cr->cur_core);

		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
	}
	assert(evt != NULL);
	spdk_event_call(evt);
}

void
spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
{
	struct call_reactor *cr;

	/* When the application framework is shutting down, we will send one
	 * final for_each_reactor operation with completion callback _reactors_stop,
	 * to flush any existing for_each_reactor operations to avoid any memory
	 * leaks. We use a mutex here to protect a boolean flag that will ensure
	 * we don't start any more operations once we've started shutting down.
	 */
	pthread_mutex_lock(&g_stopping_reactors_mtx);
	if (g_stopping_reactors) {
		pthread_mutex_unlock(&g_stopping_reactors_mtx);
		return;
	} else if (cpl == _reactors_stop) {
		g_stopping_reactors = true;
	}
	pthread_mutex_unlock(&g_stopping_reactors_mtx);

	cr = calloc(1, sizeof(*cr));
	if (!cr) {
		SPDK_ERRLOG("Unable to perform reactor iteration\n");
		cpl(arg1, arg2);
		return;
	}

	cr->fn = fn;
	cr->arg1 = arg1;
	cr->arg2 = arg2;
	cr->cpl = cpl;
	cr->orig_core = spdk_env_get_current_core();
	cr->cur_core = spdk_env_get_first_core();

1456 	SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %d\n", cr->orig_core);

	_event_call(cr->cur_core, on_reactor, cr, NULL);
}
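
/*
 * Usage sketch: run fn(arg1, arg2) on every reactor in core order, then invoke
 * cpl(arg1, arg2) back on the originating core. The callback names below are
 * hypothetical and only illustrate the calling convention:
 *
 *   static void
 *   set_flag(void *arg1, void *arg2)
 *   {
 *       ... runs once on each reactor's core ...
 *   }
 *
 *   static void
 *   set_flag_done(void *arg1, void *arg2)
 *   {
 *       ... runs on the core that started the iteration ...
 *   }
 *
 *   spdk_for_each_reactor(set_flag, ctx, NULL, set_flag_done);
 */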

#ifdef __linux__
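/*
 * On Linux, each reactor owns an fd_group (an epoll-based set) with two
 * eventfds: events_fd wakes the reactor when events are enqueued to its ring
 * (handled by event_queue_run_batch()), and resched_fd wakes it when a local
 * thread requests rescheduling (handled by reactor_schedule_thread_event()).
 * reactor_interrupt_run() blocks in spdk_fd_group_wait() on this group while
 * the reactor is in interrupt mode.
 */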
static int
reactor_schedule_thread_event(void *arg)
{
	struct spdk_reactor *reactor = arg;
	struct spdk_lw_thread *lw_thread, *tmp;
	uint32_t count = 0;
	uint64_t notify = 1;

	assert(reactor->in_interrupt);

	if (read(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
		SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno));
		return -errno;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
	}

	return count;
}

static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	int rc;

	rc = spdk_fd_group_create(&reactor->fgrp);
	if (rc != 0) {
		return rc;
	}

	reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->resched_fd < 0) {
		rc = -EBADF;
		goto err;
	}

	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event,
			       reactor);
	if (rc) {
		close(reactor->resched_fd);
		goto err;
	}

	reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->events_fd < 0) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);

		rc = -EBADF;
		goto err;
	}

	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->events_fd,
			       event_queue_run_batch, reactor);
	if (rc) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);
		close(reactor->events_fd);
		goto err;
	}

	return 0;

err:
	spdk_fd_group_destroy(reactor->fgrp);
	reactor->fgrp = NULL;
	return rc;
}
#else
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	return -ENOTSUP;
}
#endif

static void
reactor_interrupt_fini(struct spdk_reactor *reactor)
{
	struct spdk_fd_group *fgrp = reactor->fgrp;

	if (!fgrp) {
		return;
	}

	spdk_fd_group_remove(fgrp, reactor->events_fd);
	spdk_fd_group_remove(fgrp, reactor->resched_fd);

	close(reactor->events_fd);
	close(reactor->resched_fd);

	spdk_fd_group_destroy(fgrp);
	reactor->fgrp = NULL;
}

static struct spdk_governor *
_governor_find(const char *name)
{
	struct spdk_governor *governor, *tmp;

	TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
		if (strcmp(name, governor->name) == 0) {
			return governor;
		}
	}

	return NULL;
}

int
spdk_governor_set(const char *name)
{
	struct spdk_governor *governor;
	int rc = 0;

	/* NULL governor was specifically requested */
	if (name == NULL) {
		if (g_governor) {
			g_governor->deinit();
		}
		g_governor = NULL;
		return 0;
	}

	governor = _governor_find(name);
	if (governor == NULL) {
		return -EINVAL;
	}

	if (g_governor == governor) {
		return 0;
	}

	rc = governor->init();
	if (rc == 0) {
		if (g_governor) {
			g_governor->deinit();
		}
		g_governor = governor;
	}

	return rc;
}
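
/*
 * Analogous to schedulers, a governor module defines a struct spdk_governor
 * and registers it, after which it can be activated by name. A minimal sketch
 * with hypothetical callbacks, assuming the SPDK_GOVERNOR_REGISTER() helper
 * from spdk/scheduler.h:
 *
 *   static struct spdk_governor my_governor = {
 *       .name = "my_governor",
 *       .init = my_gov_init,
 *       .deinit = my_gov_deinit,
 *   };
 *   SPDK_GOVERNOR_REGISTER(my_governor);
 *
 *   spdk_governor_set("my_governor");
 */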

struct spdk_governor *
spdk_governor_get(void)
{
	return g_governor;
}

void
spdk_governor_register(struct spdk_governor *governor)
{
	if (_governor_find(governor->name)) {
		SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_governor_list, governor, link);
}

SPDK_LOG_REGISTER_COMPONENT(reactor)

static void
scheduler_trace(void)
{
	struct spdk_trace_tpoint_opts opts[] = {
		{
			"SCHEDULER_PERIOD_START", TRACE_SCHEDULER_PERIOD_START,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{
			}
		},
		{
			"SCHEDULER_CORE_STATS", TRACE_SCHEDULER_CORE_STATS,
			OWNER_TYPE_REACTOR, OBJECT_NONE, 0,
			{
				{ "busy", SPDK_TRACE_ARG_TYPE_INT, 8},
				{ "idle", SPDK_TRACE_ARG_TYPE_INT, 8}
			}
		},
		{
			"SCHEDULER_THREAD_STATS", TRACE_SCHEDULER_THREAD_STATS,
			OWNER_TYPE_THREAD, OBJECT_NONE, 0,
			{
				{ "busy", SPDK_TRACE_ARG_TYPE_INT, 8},
				{ "idle", SPDK_TRACE_ARG_TYPE_INT, 8}
			}
		},
		{
			"SCHEDULER_MOVE_THREAD", TRACE_SCHEDULER_MOVE_THREAD,
			OWNER_TYPE_THREAD, OBJECT_NONE, 0,
			{
				{ "src", SPDK_TRACE_ARG_TYPE_INT, 8 },
				{ "dst", SPDK_TRACE_ARG_TYPE_INT, 8 }
			}
		}
	};

	spdk_trace_register_owner_type(OWNER_TYPE_REACTOR, 'r');
	spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
}

SPDK_TRACE_REGISTER_FN(scheduler_trace, "scheduler", TRACE_GROUP_SCHEDULER)