/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2016 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/likely.h"

#include "event_internal.h"

#include "spdk_internal/event.h"
#include "spdk_internal/usdt.h"

#include "spdk/log.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/scheduler.h"
#include "spdk/string.h"
#include "spdk/fd_group.h"

#ifdef __linux__
#include <sys/prctl.h>
#include <sys/eventfd.h>
#endif

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

#define SPDK_EVENT_BATCH_SIZE		8

static struct spdk_reactor *g_reactors;
static uint32_t g_reactor_count;
static struct spdk_cpuset g_reactor_core_mask;
static enum spdk_reactor_state	g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;

static bool g_framework_context_switch_monitor_enabled = true;

static struct spdk_mempool *g_spdk_event_mempool = NULL;

TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);

static struct spdk_scheduler *g_scheduler = NULL;
static struct spdk_reactor *g_scheduling_reactor;
bool g_scheduling_in_progress = false;
static uint64_t g_scheduler_period = 0; /* in TSC ticks */
static uint32_t g_scheduler_core_number;
static struct spdk_scheduler_core_info *g_core_infos = NULL;
static struct spdk_cpuset g_scheduler_isolated_core_mask;

TAILQ_HEAD(, spdk_governor) g_governor_list
	= TAILQ_HEAD_INITIALIZER(g_governor_list);

static struct spdk_governor *g_governor = NULL;

static int reactor_interrupt_init(struct spdk_reactor *reactor);
static void reactor_interrupt_fini(struct spdk_reactor *reactor);

static pthread_mutex_t g_stopping_reactors_mtx = PTHREAD_MUTEX_INITIALIZER;
static bool g_stopping_reactors = false;

static struct spdk_scheduler *
_scheduler_find(const char *name)
{
	struct spdk_scheduler *tmp;

	TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
		if (strcmp(name, tmp->name) == 0) {
			return tmp;
		}
	}

	return NULL;
}

int
spdk_scheduler_set(const char *name)
{
	struct spdk_scheduler *scheduler;
	int rc = 0;

	/* NULL scheduler was specifically requested */
	if (name == NULL) {
		if (g_scheduler) {
			g_scheduler->deinit();
		}
		g_scheduler = NULL;
		return 0;
	}

	scheduler = _scheduler_find(name);
	if (scheduler == NULL) {
		SPDK_ERRLOG("Requested scheduler is missing\n");
		return -EINVAL;
	}

	if (g_scheduler == scheduler) {
		return 0;
	}

	if (g_scheduler) {
		g_scheduler->deinit();
	}

	rc = scheduler->init();
	if (rc == 0) {
		g_scheduler = scheduler;
	} else {
		/* Could not switch to the new scheduler, so keep the old one.
		 * It may be NULL, so check before calling ->init() on it again.
		 */
		if (g_scheduler) {
			SPDK_ERRLOG("Could not ->init() '%s' scheduler, reverting to '%s'\n",
				    name, g_scheduler->name);
			g_scheduler->init();
		} else {
			SPDK_ERRLOG("Could not ->init() '%s' scheduler.\n", name);
		}
	}

	return rc;
}
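
/*
 * How a pluggable scheduler hooks into the APIs above, as a minimal sketch
 * (not compiled in). The "example" name and the no-op callbacks are
 * illustrative assumptions; upstream schedulers typically register via the
 * SPDK_SCHEDULER_REGISTER() macro in spdk/scheduler.h, and a plain
 * constructor, as below, has the same effect.
 */
#if 0
static int
example_scheduler_init(void)
{
	/* Allocate private state here; a non-zero return aborts the switch. */
	return 0;
}

static void
example_scheduler_deinit(void)
{
	/* Release anything allocated in init(). */
}

static void
example_scheduler_balance(struct spdk_scheduler_core_info *cores_info, uint32_t core_count)
{
	/* Leave every thread where it is. A real scheduler would rewrite
	 * cores_info[].thread_infos[].lcore and cores_info[].interrupt_mode here. */
}

static struct spdk_scheduler example_scheduler = {
	.name = "example",
	.init = example_scheduler_init,
	.deinit = example_scheduler_deinit,
	.balance = example_scheduler_balance,
};

static void __attribute__((constructor))
register_example_scheduler(void)
{
	spdk_scheduler_register(&example_scheduler);
}

/* Later, e.g. from an RPC handler on the scheduling reactor:
 * spdk_scheduler_set("example");
 */
#endif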

struct spdk_scheduler *
spdk_scheduler_get(void)
{
	return g_scheduler;
}

uint64_t
spdk_scheduler_get_period(void)
{
	/* Convert from ticks to microseconds */
	return (g_scheduler_period * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
}

void
spdk_scheduler_set_period(uint64_t period)
{
	/* Convert microseconds to ticks */
	g_scheduler_period = period * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
}
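
/*
 * A short usage sketch for the period conversion above, assuming a 1-second
 * scheduling round; not compiled in. Note that both directions use integer
 * division, so a round-trip may lose a few microseconds.
 */
#if 0
static void
scheduler_period_example(void)
{
	/* Request a 1-second round: 1,000,000 us are stored as TSC ticks. */
	spdk_scheduler_set_period(SPDK_SEC_TO_USEC);

	/* Reads back in microseconds, modulo integer rounding. */
	SPDK_NOTICELOG("period: %" PRIu64 " us\n", spdk_scheduler_get_period());

	/* A period of 0 disables periodic scheduling (see reactor_run()). */
	spdk_scheduler_set_period(0);
}
#endif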

void
spdk_scheduler_register(struct spdk_scheduler *scheduler)
{
	if (_scheduler_find(scheduler->name)) {
		SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
}

uint32_t
spdk_scheduler_get_scheduling_lcore(void)
{
	return g_scheduling_reactor->lcore;
}

bool
spdk_scheduler_set_scheduling_lcore(uint32_t core)
{
	struct spdk_reactor *reactor = spdk_reactor_get(core);

	if (reactor == NULL) {
		SPDK_ERRLOG("Failed to set scheduling reactor. Reactor(lcore:%u) does not exist.\n", core);
		return false;
	}

	g_scheduling_reactor = reactor;
	return true;
}

bool
scheduler_set_isolated_core_mask(struct spdk_cpuset isolated_core_mask)
{
	struct spdk_cpuset tmp_mask;

	spdk_cpuset_copy(&tmp_mask, spdk_app_get_core_mask());
	spdk_cpuset_or(&tmp_mask, &isolated_core_mask);
	if (spdk_cpuset_equal(&tmp_mask, spdk_app_get_core_mask()) == false) {
		SPDK_ERRLOG("Isolated core mask is not included in app core mask.\n");
		return false;
	}
	spdk_cpuset_copy(&g_scheduler_isolated_core_mask, &isolated_core_mask);
	return true;
}

const char *
scheduler_get_isolated_core_mask(void)
{
	return spdk_cpuset_fmt(&g_scheduler_isolated_core_mask);
}

static bool
scheduler_is_isolated_core(uint32_t core)
{
	return spdk_cpuset_get_cpu(&g_scheduler_isolated_core_mask, core);
}

static void
reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
{
	reactor->lcore = lcore;
	reactor->flags.is_valid = true;

	TAILQ_INIT(&reactor->threads);
	reactor->thread_count = 0;
	spdk_cpuset_zero(&reactor->notify_cpuset);

	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (reactor->events == NULL) {
		SPDK_ERRLOG("Failed to allocate events ring\n");
		assert(false);
	}

	/* Always initialize interrupt facilities for the reactor */
	if (reactor_interrupt_init(reactor) != 0) {
		/* Reactor interrupt facilities are mandatory if the app is set to interrupt mode. */
		if (spdk_interrupt_mode_is_enabled()) {
			SPDK_ERRLOG("Failed to prepare intr facilities\n");
			assert(false);
		}
		return;
	}

	/* If the application runs with full interrupt ability,
	 * all reactors are going to run in interrupt mode.
	 */
	if (spdk_interrupt_mode_is_enabled()) {
		uint32_t i;

		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true);
		}
		reactor->in_interrupt = true;
	}
}

struct spdk_reactor *
spdk_reactor_get(uint32_t lcore)
{
	struct spdk_reactor *reactor;

	if (g_reactors == NULL) {
		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
		return NULL;
	}

	if (lcore >= g_reactor_count) {
		return NULL;
	}

	reactor = &g_reactors[lcore];

	if (reactor->flags.is_valid == false) {
		return NULL;
	}

	return reactor;
}

static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
static bool reactor_thread_op_supported(enum spdk_thread_op op);

int
spdk_reactors_init(size_t msg_mempool_size)
{
	struct spdk_reactor *reactor;
	int rc;
	uint32_t i, current_core;
	char mempool_name[32];

	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
			       262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			       sizeof(struct spdk_event),
			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
			       SPDK_ENV_SOCKET_ID_ANY);

	if (g_spdk_event_mempool == NULL) {
		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
		return -1;
	}

	/* struct spdk_reactor must be aligned on a 64-byte boundary */
	g_reactor_count = spdk_env_get_last_core() + 1;
	rc = posix_memalign((void **)&g_reactors, 64,
			    g_reactor_count * sizeof(struct spdk_reactor));
	if (rc != 0) {
		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
			    g_reactor_count);
		spdk_mempool_free(g_spdk_event_mempool);
		return -1;
	}

	g_core_infos = calloc(g_reactor_count, sizeof(*g_core_infos));
	if (g_core_infos == NULL) {
		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		return -ENOMEM;
	}

	memset(g_reactors, 0, g_reactor_count * sizeof(struct spdk_reactor));

	rc = spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
				      sizeof(struct spdk_lw_thread), msg_mempool_size);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to initialize the SPDK thread library\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		free(g_core_infos);
		return rc;
	}

	SPDK_ENV_FOREACH_CORE(i) {
		reactor_construct(&g_reactors[i], i);
	}

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	g_scheduling_reactor = reactor;

	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;

	return 0;
}
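
/*
 * A condensed sketch of the reactor lifecycle driven by the functions in this
 * file (spdk_app_start() normally does all of this for you); not compiled in,
 * error handling elided, and SPDK_DEFAULT_MSG_MEMPOOL_SIZE is assumed to come
 * from spdk/thread.h.
 */
#if 0
static void
reactor_lifecycle_example(void)
{
	/* Once, after spdk_env_init(): sets up per-core reactors and the event mempool. */
	spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE);

	/* ... create the app thread and queue initial events ... */

	/* Blocks on the current core until spdk_reactors_stop() is processed. */
	spdk_reactors_start();

	/* Frees event rings, interrupt fd groups and the event mempool. */
	spdk_reactors_fini();
}
#endif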

void
spdk_reactors_fini(void)
{
	uint32_t i;
	struct spdk_reactor *reactor;

	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
		return;
	}

	spdk_thread_lib_fini();

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		assert(reactor->thread_count == 0);
		if (reactor->events != NULL) {
			spdk_ring_free(reactor->events);
		}

		reactor_interrupt_fini(reactor);

		if (g_core_infos != NULL) {
			free(g_core_infos[i].thread_infos);
		}
	}

	spdk_mempool_free(g_spdk_event_mempool);

	free(g_reactors);
	g_reactors = NULL;
	free(g_core_infos);
	g_core_infos = NULL;
}

static void _reactor_set_interrupt_mode(void *arg1, void *arg2);

static void
_reactor_set_notify_cpuset(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;
	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());

	assert(reactor != NULL);
	spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt);
}

static void
_event_call(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
{
	struct spdk_event *ev;

	ev = spdk_event_allocate(lcore, fn, arg1, arg2);
	assert(ev);
	spdk_event_call(ev);
}

static void
_reactor_set_notify_cpuset_cpl(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;

	if (target->new_in_interrupt == false) {
		target->set_interrupt_mode_in_progress = false;
		_event_call(spdk_scheduler_get_scheduling_lcore(), target->set_interrupt_mode_cb_fn,
			    target->set_interrupt_mode_cb_arg, NULL);
	} else {
		_event_call(target->lcore, _reactor_set_interrupt_mode, target, NULL);
	}
}

static void
_reactor_set_thread_interrupt_mode(void *ctx)
{
	struct spdk_reactor *reactor = ctx;

	spdk_thread_set_interrupt_mode(reactor->in_interrupt);
}

static void
_reactor_set_interrupt_mode(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;
	struct spdk_thread *thread;
	struct spdk_fd_group *grp;
	struct spdk_lw_thread *lw_thread, *tmp;

	assert(target == spdk_reactor_get(spdk_env_get_current_core()));
	assert(target != NULL);
	assert(target->in_interrupt != target->new_in_interrupt);
	SPDK_DEBUGLOG(reactor, "Setting reactor %u from %s mode to %s mode\n",
		      target->lcore, target->in_interrupt ? "intr" : "poll", target->new_in_interrupt ? "intr" : "poll");

	target->in_interrupt = target->new_in_interrupt;

	if (spdk_interrupt_mode_is_enabled()) {
		/* Align each spdk_thread with the reactor's interrupt or poll mode */
		TAILQ_FOREACH_SAFE(lw_thread, &target->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			grp = spdk_thread_get_interrupt_fd_group(thread);
			if (target->in_interrupt) {
				spdk_fd_group_nest(target->fgrp, grp);
			} else {
				spdk_fd_group_unnest(target->fgrp, grp);
			}

			spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, target);
		}
	}

	if (target->new_in_interrupt == false) {
		/* Reactor is no longer in interrupt mode. Refresh tsc_last to accurately
		 * track reactor stats. */
		target->tsc_last = spdk_get_ticks();
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	} else {
		uint64_t notify = 1;
		int rc = 0;

		/* Always trigger the spdk_event and resched eventfds, in case of a race condition */
		rc = write(target->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
		rc = write(target->resched_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}

		target->set_interrupt_mode_in_progress = false;
		_event_call(spdk_scheduler_get_scheduling_lcore(), target->set_interrupt_mode_cb_fn,
			    target->set_interrupt_mode_cb_arg, NULL);
	}
}

int
spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt,
				spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg)
{
	struct spdk_reactor *target;

	target = spdk_reactor_get(lcore);
	if (target == NULL) {
		return -EINVAL;
	}

	/* Eventfd has to be supported in order to use interrupt functionality. */
	if (target->fgrp == NULL) {
		return -ENOTSUP;
	}

	if (spdk_env_get_current_core() != g_scheduling_reactor->lcore) {
		SPDK_ERRLOG("Setting the interrupt mode is only permitted from the scheduling reactor.\n");
		return -EPERM;
	}

	if (target->in_interrupt == new_in_interrupt) {
		cb_fn(cb_arg, NULL);
		return 0;
	}

	if (target->set_interrupt_mode_in_progress) {
		SPDK_NOTICELOG("Reactor(%u) already has an interrupt mode switch in progress\n", lcore);
		return -EBUSY;
	}
	target->set_interrupt_mode_in_progress = true;

	target->new_in_interrupt = new_in_interrupt;
	target->set_interrupt_mode_cb_fn = cb_fn;
	target->set_interrupt_mode_cb_arg = cb_arg;

	SPDK_DEBUGLOG(reactor, "Starting reactor event from %u to %u\n",
		      spdk_env_get_current_core(), lcore);

	if (new_in_interrupt == false) {
		/* For potential race cases, when setting the reactor to poll mode,
		 * first change the mode of the reactor and then clear the corresponding
		 * bit of the notify_cpuset of each reactor.
		 */
		_event_call(lcore, _reactor_set_interrupt_mode, target, NULL);
	} else {
		/* For race cases, when setting the reactor to interrupt mode, first set the
		 * corresponding bit of the notify_cpuset of each reactor and then change the mode.
		 */
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	}

	return 0;
}
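
/*
 * Usage sketch for the mode switch above; not compiled in. It must be invoked
 * from the scheduling reactor, and completion is reported asynchronously
 * through the callback. The function and parameter names are illustrative
 * assumptions.
 */
#if 0
static void
intr_mode_switched(void *cb_arg, void *unused)
{
	SPDK_NOTICELOG("interrupt mode switch completed\n");
}

static void
request_interrupt_mode(uint32_t lcore)
{
	int rc;

	rc = spdk_reactor_set_interrupt_mode(lcore, true, intr_mode_switched, NULL);
	if (rc == -EBUSY) {
		/* A previous switch for this reactor is still in flight; retry later. */
	} else if (rc != 0) {
		SPDK_ERRLOG("mode switch request failed: %s\n", spdk_strerror(-rc));
	}
}
#endif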

struct spdk_event *
spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
{
	struct spdk_event *event = NULL;
	struct spdk_reactor *reactor = spdk_reactor_get(lcore);

	if (!reactor) {
		assert(false);
		return NULL;
	}

	event = spdk_mempool_get(g_spdk_event_mempool);
	if (event == NULL) {
		assert(false);
		return NULL;
	}

	event->lcore = lcore;
	event->fn = fn;
	event->arg1 = arg1;
	event->arg2 = arg2;

	return event;
}

void
spdk_event_call(struct spdk_event *event)
{
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_core = spdk_env_get_current_core();

	reactor = spdk_reactor_get(event->lcore);

	assert(reactor != NULL);
	assert(reactor->events != NULL);

	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
	if (rc != 1) {
		assert(false);
	}

	if (current_core != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_core);
	}

	/* If spdk_event_call() isn't called on a reactor, always send a notification.
	 * If it is called on a reactor, send a notification if the destination reactor
	 * is marked as being in interrupt mode in the local reactor's notify_cpuset.
	 */
	if (spdk_unlikely(local_reactor == NULL) ||
	    spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) {
		uint64_t notify = 1;

		rc = write(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
	}
}
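
/*
 * Events are the one-shot mechanism for running a function on another
 * reactor. A small sketch of the allocate/call pair above; not compiled in,
 * and the function names and target core are assumptions.
 */
#if 0
static void
say_hello(void *arg1, void *arg2)
{
	SPDK_NOTICELOG("hello from core %u\n", spdk_env_get_current_core());
}

static void
send_hello(uint32_t dst_lcore)
{
	struct spdk_event *ev = spdk_event_allocate(dst_lcore, say_hello, NULL, NULL);

	assert(ev != NULL);
	/* Enqueues onto the destination reactor's ring and, if that reactor is
	 * in interrupt mode (or we are not on a reactor), writes to its events_fd. */
	spdk_event_call(ev);
}
#endif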

static inline int
event_queue_run_batch(void *arg)
{
	struct spdk_reactor *reactor = arg;
	size_t count, i;
	void *events[SPDK_EVENT_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(events, 0, sizeof(events));
#endif

	/* Handle event notification if this reactor currently runs in interrupt mode */
	if (spdk_unlikely(reactor->in_interrupt)) {
		uint64_t notify = 1;
		int rc;

		/* There may be a race between this reactor's acknowledgement and another
		 * producer's notification, so acknowledge first and then dequeue. If events
		 * remain afterwards, re-notify below so that no notification is missed.
		 */
		rc = read(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno));
			return -errno;
		}

		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);

		if (spdk_ring_count(reactor->events) != 0) {
			/* Trigger a new notification if there are still events in the queue waiting for processing. */
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
				return -errno;
			}
		}
	} else {
		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
	}

	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_event *event = events[i];

		assert(event != NULL);
		assert(spdk_get_thread() == NULL);
		SPDK_DTRACE_PROBE3(event_exec, event->fn,
				   event->arg1, event->arg2);
		event->fn(event->arg1, event->arg2);
	}

	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);

	return (int)count;
}

/* 1s (in microseconds) */
#define CONTEXT_SWITCH_MONITOR_PERIOD 1000000

static int
get_rusage(struct spdk_reactor *reactor)
{
	struct rusage		rusage;

	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
		return -1;
	}

	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
		SPDK_INFOLOG(reactor,
			     "Reactor %u: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
	}
	reactor->rusage = rusage;

	return 0;
}

void
spdk_framework_enable_context_switch_monitor(bool enable)
{
	/* This global is being read by multiple threads, so this isn't
	 * strictly thread safe. However, we're toggling between true and
	 * false here, and if a thread sees the value update later than it
	 * should, it's no big deal. */
	g_framework_context_switch_monitor_enabled = enable;
}

bool
spdk_framework_context_switch_monitor_enabled(void)
{
	return g_framework_context_switch_monitor_enabled;
}

static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
	pthread_setname_np(pthread_self(), thread_name);
#endif
}

static void
_init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
	struct spdk_thread_stats prev_total_stats;

	/* Read total_stats before updating it to calculate stats during the last scheduling period. */
	prev_total_stats = lw_thread->total_stats;

	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->total_stats);
	spdk_set_thread(NULL);

	lw_thread->current_stats.busy_tsc = lw_thread->total_stats.busy_tsc - prev_total_stats.busy_tsc;
	lw_thread->current_stats.idle_tsc = lw_thread->total_stats.idle_tsc - prev_total_stats.idle_tsc;
}

static void
_threads_reschedule_thread(struct spdk_scheduler_thread_info *thread_info)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread *thread;

	thread = spdk_thread_get_by_id(thread_info->thread_id);
	if (thread == NULL) {
		/* Thread no longer exists. */
		return;
	}
	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);

	lw_thread->lcore = thread_info->lcore;
	lw_thread->resched = true;
}

static void
_threads_reschedule(struct spdk_scheduler_core_info *cores_info)
{
	struct spdk_scheduler_core_info *core;
	struct spdk_scheduler_thread_info *thread_info;
	uint32_t i, j;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &cores_info[i];
		for (j = 0; j < core->threads_count; j++) {
			thread_info = &core->thread_infos[j];
			if (thread_info->lcore != i) {
				if (core->isolated || cores_info[thread_info->lcore].isolated) {
					SPDK_ERRLOG("A thread cannot be moved from or to an isolated core. "
						    "Skip rescheduling thread\n");
					continue;
				}
				_threads_reschedule_thread(thread_info);
			}
		}
		core->threads_count = 0;
		free(core->thread_infos);
		core->thread_infos = NULL;
	}
}

static void
_reactors_scheduler_fini(void)
{
	/* Reschedule based on the balancing output */
	_threads_reschedule(g_core_infos);

	g_scheduling_in_progress = false;
}

static void
_reactors_scheduler_update_core_mode(void *ctx1, void *ctx2)
{
	struct spdk_reactor *reactor;
	uint32_t i;
	int rc = 0;

	for (i = g_scheduler_core_number; i < SPDK_ENV_LCORE_ID_ANY; i = spdk_env_get_next_core(i)) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		if (reactor->in_interrupt != g_core_infos[i].interrupt_mode) {
			/* Switch the next found reactor to the new state */
			rc = spdk_reactor_set_interrupt_mode(i, g_core_infos[i].interrupt_mode,
							     _reactors_scheduler_update_core_mode, NULL);
			if (rc == 0) {
				/* Set the core to start with after the callback completes */
				g_scheduler_core_number = spdk_env_get_next_core(i);
				return;
			}
		}
	}
	_reactors_scheduler_fini();
}

static void
_reactors_scheduler_cancel(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core;
	uint32_t i;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &g_core_infos[i];
		core->threads_count = 0;
		free(core->thread_infos);
		core->thread_infos = NULL;
	}

	g_scheduling_in_progress = false;
}

static void
_reactors_scheduler_balance(void *arg1, void *arg2)
{
	struct spdk_scheduler *scheduler = spdk_scheduler_get();

	if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING || scheduler == NULL) {
		_reactors_scheduler_cancel(NULL, NULL);
		return;
	}

	scheduler->balance(g_core_infos, g_reactor_count);

	g_scheduler_core_number = spdk_env_get_first_core();
	_reactors_scheduler_update_core_mode(NULL, NULL);
}

/* Phase 1 of thread scheduling is to gather metrics on the existing threads */
static void
_reactors_scheduler_gather_metrics(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core_info;
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread *thread;
	struct spdk_reactor *reactor;
	uint32_t next_core;
	uint32_t i = 0;

	reactor = spdk_reactor_get(spdk_env_get_current_core());
	assert(reactor != NULL);
	core_info = &g_core_infos[reactor->lcore];
	core_info->lcore = reactor->lcore;
	core_info->current_idle_tsc = reactor->idle_tsc - core_info->total_idle_tsc;
	core_info->total_idle_tsc = reactor->idle_tsc;
	core_info->current_busy_tsc = reactor->busy_tsc - core_info->total_busy_tsc;
	core_info->total_busy_tsc = reactor->busy_tsc;
	core_info->interrupt_mode = reactor->in_interrupt;
	core_info->threads_count = 0;
	core_info->isolated = scheduler_is_isolated_core(reactor->lcore);

	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);

	if (reactor->thread_count > 0) {
		core_info->thread_infos = calloc(reactor->thread_count, sizeof(*core_info->thread_infos));
		if (core_info->thread_infos == NULL) {
			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);

			/* Cancel this round of scheduling work */
			_event_call(spdk_scheduler_get_scheduling_lcore(), _reactors_scheduler_cancel, NULL, NULL);
			return;
		}

		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
			_init_thread_stats(reactor, lw_thread);

			core_info->thread_infos[i].lcore = lw_thread->lcore;
			thread = spdk_thread_get_from_ctx(lw_thread);
			assert(thread != NULL);
			core_info->thread_infos[i].thread_id = spdk_thread_get_id(thread);
			core_info->thread_infos[i].total_stats = lw_thread->total_stats;
			core_info->thread_infos[i].current_stats = lw_thread->current_stats;
			core_info->threads_count++;
			assert(core_info->threads_count <= reactor->thread_count);
			i++;
		}
	}

	next_core = spdk_env_get_next_core(reactor->lcore);
	if (next_core == UINT32_MAX) {
		next_core = spdk_env_get_first_core();
	}

	/* If we've looped back around to the scheduler thread, move to the next phase */
	if (next_core == spdk_scheduler_get_scheduling_lcore()) {
		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
		_event_call(next_core, _reactors_scheduler_balance, NULL, NULL);
		return;
	}

	_event_call(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
}
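
/*
 * Putting the pieces above together, one scheduling round flows as follows:
 *
 * 1. reactor_run() on the scheduling reactor notices that g_scheduler_period
 *    has elapsed and calls _reactors_scheduler_gather_metrics().
 * 2. Metric gathering hops from core to core via events, filling
 *    g_core_infos[], until it wraps back around to the scheduling lcore.
 * 3. _reactors_scheduler_balance() invokes the active scheduler's balance()
 *    callback, which picks new lcores and interrupt modes.
 * 4. _reactors_scheduler_update_core_mode() applies interrupt-mode changes one
 *    reactor at a time, after which _reactors_scheduler_fini() marks threads
 *    for rescheduling and clears g_scheduling_in_progress.
 */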

static int _reactor_schedule_thread(struct spdk_thread *thread);
static uint64_t g_rusage_period;

static void
_reactor_remove_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread	*thread = spdk_thread_get_from_ctx(lw_thread);
	struct spdk_fd_group	*grp;

	TAILQ_REMOVE(&reactor->threads, lw_thread, link);
	assert(reactor->thread_count > 0);
	reactor->thread_count--;

	/* Detach the thread's interrupt fd_group if running with full interrupt ability */
	if (spdk_interrupt_mode_is_enabled()) {
		if (reactor->in_interrupt) {
			grp = spdk_thread_get_interrupt_fd_group(thread);
			spdk_fd_group_unnest(reactor->fgrp, grp);
		}
	}
}

static bool
reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);

	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
			  spdk_thread_is_idle(thread))) {
		_reactor_remove_lw_thread(reactor, lw_thread);
		spdk_thread_destroy(thread);
		return true;
	}

	if (spdk_unlikely(lw_thread->resched && !spdk_thread_is_bound(thread))) {
		lw_thread->resched = false;
		_reactor_remove_lw_thread(reactor, lw_thread);
		_reactor_schedule_thread(thread);
		return true;
	}

	return false;
}

static void
reactor_interrupt_run(struct spdk_reactor *reactor)
{
	int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */

	spdk_fd_group_wait(reactor->fgrp, block_timeout);
}

static void
_reactor_run(struct spdk_reactor *reactor)
{
	struct spdk_thread	*thread;
	struct spdk_lw_thread	*lw_thread, *tmp;
	uint64_t		now;
	int			rc;

	event_queue_run_batch(reactor);

	/* If no threads are present on the reactor,
	 * tsc_last gets outdated. Update it to track
	 * thread execution time correctly. */
	if (spdk_unlikely(TAILQ_EMPTY(&reactor->threads))) {
		now = spdk_get_ticks();
		reactor->idle_tsc += now - reactor->tsc_last;
		reactor->tsc_last = now;
		return;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);

		now = spdk_thread_get_last_tsc(thread);
		if (rc == 0) {
			reactor->idle_tsc += now - reactor->tsc_last;
		} else if (rc > 0) {
			reactor->busy_tsc += now - reactor->tsc_last;
		}
		reactor->tsc_last = now;

		reactor_post_process_lw_thread(reactor, lw_thread);
	}
}

static int
reactor_run(void *arg)
{
	struct spdk_reactor	*reactor = arg;
	struct spdk_thread	*thread;
	struct spdk_lw_thread	*lw_thread, *tmp;
	char			thread_name[32];
	uint64_t		last_sched = 0;

	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);

	/* Rename the POSIX thread because the reactor is tied to the POSIX
	 * thread in the SPDK event library.
	 */
	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
	_set_thread_name(thread_name);

	reactor->tsc_last = spdk_get_ticks();

	while (1) {
		/* Execute the interrupt process fn if this reactor currently runs in interrupt mode */
		if (spdk_unlikely(reactor->in_interrupt)) {
			reactor_interrupt_run(reactor);
		} else {
			_reactor_run(reactor);
		}

		if (g_framework_context_switch_monitor_enabled) {
			if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
				get_rusage(reactor);
				reactor->last_rusage = reactor->tsc_last;
			}
		}

		if (spdk_unlikely(g_scheduler_period > 0 &&
				  (reactor->tsc_last - last_sched) > g_scheduler_period &&
				  reactor == g_scheduling_reactor &&
				  !g_scheduling_in_progress)) {
			last_sched = reactor->tsc_last;
			g_scheduling_in_progress = true;
			_reactors_scheduler_gather_metrics(NULL, NULL);
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}
	}

	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		/* All threads should have already had spdk_thread_exit() called on them, except
		 * for the app thread.
		 */
		if (spdk_thread_is_running(thread)) {
			if (!spdk_thread_is_app_thread(thread)) {
				SPDK_ERRLOG("spdk_thread_exit() was not called on thread '%s'\n",
					    spdk_thread_get_name(thread));
				SPDK_ERRLOG("This will result in a non-zero exit code in a future release.\n");
			}
			spdk_set_thread(thread);
			spdk_thread_exit(thread);
		}
	}

	while (!TAILQ_EMPTY(&reactor->threads)) {
		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			spdk_set_thread(thread);
			if (spdk_thread_is_exited(thread)) {
				_reactor_remove_lw_thread(reactor, lw_thread);
				spdk_thread_destroy(thread);
			} else {
				if (spdk_unlikely(reactor->in_interrupt)) {
					reactor_interrupt_run(reactor);
				} else {
					spdk_thread_poll(thread, 0, 0);
				}
			}
		}
	}

	return 0;
}

int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int ret;
	const struct spdk_cpuset *validmask;

	ret = spdk_cpuset_parse(cpumask, mask);
	if (ret < 0) {
		return ret;
	}

	validmask = spdk_app_get_core_mask();
	spdk_cpuset_and(cpumask, validmask);

	return 0;
}
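
/*
 * A small sketch of parsing a user-supplied cpumask with the helper above;
 * not compiled in, and the "0x3" mask string is an assumed example.
 */
#if 0
static void
parse_mask_example(void)
{
	struct spdk_cpuset cpumask;

	if (spdk_app_parse_core_mask("0x3", &cpumask) == 0) {
		/* cpumask now holds the requested cores, clamped to the app core mask. */
		SPDK_NOTICELOG("parsed mask: %s\n", spdk_cpuset_fmt(&cpumask));
	}
}
#endif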

const struct spdk_cpuset *
spdk_app_get_core_mask(void)
{
	return &g_reactor_core_mask;
}

void
spdk_reactors_start(void)
{
	struct spdk_reactor *reactor;
	uint32_t i, current_core;
	int rc;

	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
	/* Reinitialize to false, in case the app framework is restarting in the same process. */
	g_stopping_reactors = false;

	current_core = spdk_env_get_current_core();
	SPDK_ENV_FOREACH_CORE(i) {
		if (i != current_core) {
			reactor = spdk_reactor_get(i);
			if (reactor == NULL) {
				continue;
			}

			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
			if (rc < 0) {
				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
				assert(false);
				return;
			}
		}
		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
	}

	/* Start the main reactor */
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	reactor_run(reactor);

	spdk_env_thread_wait_all();

	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
}

static void
_reactors_stop(void *arg1, void *arg2)
{
	uint32_t i;
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor;
	uint64_t notify = 1;

	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
	local_reactor = spdk_reactor_get(spdk_env_get_current_core());

	SPDK_ENV_FOREACH_CORE(i) {
		/* If spdk_event_call() isn't called on a reactor, always send a notification.
		 * If it is called on a reactor, send a notification if the destination reactor
		 * is marked as being in interrupt mode in the local reactor's notify_cpuset.
		 */
		if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) {
			reactor = spdk_reactor_get(i);
			assert(reactor != NULL);
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno));
				continue;
			}
		}
	}
}

static void
nop(void *arg1, void *arg2)
{
}

void
spdk_reactors_stop(void *arg1)
{
	spdk_for_each_reactor(nop, NULL, NULL, _reactors_stop);
}

static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
static uint32_t g_next_core = UINT32_MAX;

static void
_schedule_thread(void *arg1, void *arg2)
{
	struct spdk_lw_thread *lw_thread = arg1;
	struct spdk_thread *thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;
	struct spdk_fd_group *grp;

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	/* Update total_stats to reflect the state of the thread
	 * at the end of the move. */
	thread = spdk_thread_get_from_ctx(lw_thread);
	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->total_stats);
	spdk_set_thread(NULL);

	lw_thread->lcore = current_core;

	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
	reactor->thread_count++;

	/* Attach the thread's interrupt fd_group if running with full interrupt ability */
	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (reactor->in_interrupt) {
			grp = spdk_thread_get_interrupt_fd_group(thread);
			rc = spdk_fd_group_nest(reactor->fgrp, grp);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
			}
		}

		/* Align the spdk_thread with the reactor to interrupt mode or poll mode */
		spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, reactor);
	}
}

static int
_reactor_schedule_thread(struct spdk_thread *thread)
{
	uint32_t core;
	struct spdk_lw_thread *lw_thread;
	struct spdk_event *evt = NULL;
	struct spdk_cpuset *cpumask;
	uint32_t i;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_lcore = spdk_env_get_current_core();
	struct spdk_cpuset polling_cpumask;
	struct spdk_cpuset valid_cpumask;

	cpumask = spdk_thread_get_cpumask(thread);

	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);
	core = lw_thread->lcore;
	memset(lw_thread, 0, sizeof(*lw_thread));

	if (current_lcore != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_lcore);
		assert(local_reactor);
	}

	/* When the interrupt ability of the spdk_thread is not enabled and the current
	 * reactor runs on a DPDK thread, skip reactors which are in interrupt mode.
	 */
	if (!spdk_interrupt_mode_is_enabled() && local_reactor != NULL) {
		/* Get the cpumask of all reactors in polling */
		spdk_cpuset_zero(&polling_cpumask);
		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&polling_cpumask, i, true);
		}
		spdk_cpuset_xor(&polling_cpumask, &local_reactor->notify_cpuset);

		if (core == SPDK_ENV_LCORE_ID_ANY) {
			/* Get the cpumask of all valid reactors which are suggested and also in polling */
			spdk_cpuset_copy(&valid_cpumask, &polling_cpumask);
			spdk_cpuset_and(&valid_cpumask, spdk_thread_get_cpumask(thread));

			/* If there are any valid reactors, the spdk_thread should be scheduled
			 * onto one of them.
			 * If there are no valid reactors, the spdk_thread should be scheduled
			 * onto one of the polling reactors.
			 */
			if (spdk_cpuset_count(&valid_cpumask) != 0) {
				cpumask = &valid_cpumask;
			} else {
				cpumask = &polling_cpumask;
			}
		} else if (!spdk_cpuset_get_cpu(&polling_cpumask, core)) {
			/* If the specified reactor is not in polling, the spdk_thread should be
			 * scheduled onto one of the polling reactors.
			 */
			core = SPDK_ENV_LCORE_ID_ANY;
			cpumask = &polling_cpumask;
		}
	}

	pthread_mutex_lock(&g_scheduler_mtx);
	if (core == SPDK_ENV_LCORE_ID_ANY) {
		for (i = 0; i < spdk_env_get_core_count(); i++) {
			if (g_next_core >= g_reactor_count) {
				g_next_core = spdk_env_get_first_core();
			}
			core = g_next_core;
			g_next_core = spdk_env_get_next_core(g_next_core);

			if (spdk_cpuset_get_cpu(cpumask, core)) {
				break;
			}
		}
	}

	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);

	pthread_mutex_unlock(&g_scheduler_mtx);

	assert(evt != NULL);
	if (evt == NULL) {
		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
		return -1;
	}

	lw_thread->tsc_start = spdk_get_ticks();

	spdk_event_call(evt);

	return 0;
}

static void
_reactor_request_thread_reschedule(struct spdk_thread *thread)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;

	assert(thread == spdk_get_thread());

	lw_thread = spdk_thread_get_ctx(thread);

	assert(lw_thread != NULL);
	lw_thread->resched = true;
	lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	/* Send a notification if the destination reactor is indicated as being in interrupt mode */
	if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) {
		uint64_t notify = 1;

		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}
	}
}

static int
reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
{
	struct spdk_lw_thread *lw_thread;

	switch (op) {
	case SPDK_THREAD_OP_NEW:
		lw_thread = spdk_thread_get_ctx(thread);
		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
		return _reactor_schedule_thread(thread);
	case SPDK_THREAD_OP_RESCHED:
		_reactor_request_thread_reschedule(thread);
		return 0;
	default:
		return -ENOTSUP;
	}
}

static bool
reactor_thread_op_supported(enum spdk_thread_op op)
{
	switch (op) {
	case SPDK_THREAD_OP_NEW:
	case SPDK_THREAD_OP_RESCHED:
		return true;
	default:
		return false;
	}
}

struct call_reactor {
	uint32_t cur_core;
	spdk_event_fn fn;
	void *arg1;
	void *arg2;

	uint32_t orig_core;
	spdk_event_fn cpl;
};

static void
on_reactor(void *arg1, void *arg2)
{
	struct call_reactor *cr = arg1;
	struct spdk_event *evt;

	cr->fn(cr->arg1, cr->arg2);

	cr->cur_core = spdk_env_get_next_core(cr->cur_core);

	if (cr->cur_core >= g_reactor_count) {
		SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");

		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
		free(cr);
	} else {
		SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %u\n",
			      cr->cur_core);

		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
	}
	assert(evt != NULL);
	spdk_event_call(evt);
}

void
spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
{
	struct call_reactor *cr;

	/* When the application framework is shutting down, we will send one
	 * final for_each_reactor operation with completion callback _reactors_stop,
	 * to flush any existing for_each_reactor operations to avoid any memory
	 * leaks. We use a mutex here to protect a boolean flag that will ensure
	 * we don't start any more operations once we've started shutting down.
	 */
	pthread_mutex_lock(&g_stopping_reactors_mtx);
	if (g_stopping_reactors) {
		pthread_mutex_unlock(&g_stopping_reactors_mtx);
		return;
	} else if (cpl == _reactors_stop) {
		g_stopping_reactors = true;
	}
	pthread_mutex_unlock(&g_stopping_reactors_mtx);

	cr = calloc(1, sizeof(*cr));
	if (!cr) {
		SPDK_ERRLOG("Unable to perform reactor iteration\n");
		cpl(arg1, arg2);
		return;
	}

	cr->fn = fn;
	cr->arg1 = arg1;
	cr->arg2 = arg2;
	cr->cpl = cpl;
	cr->orig_core = spdk_env_get_current_core();
	cr->cur_core = spdk_env_get_first_core();

	SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %u\n", cr->orig_core);

	_event_call(cr->cur_core, on_reactor, cr, NULL);
}
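
/*
 * Broadcast sketch for spdk_for_each_reactor(): fn runs once on every reactor
 * in turn, then cpl runs on the originating core. Not compiled in; the counter
 * and function names are illustrative assumptions.
 */
#if 0
static uint64_t g_visited;

static void
bump_counter(void *arg1, void *arg2)
{
	__atomic_fetch_add((uint64_t *)arg1, 1, __ATOMIC_RELAXED);
}

static void
bump_done(void *arg1, void *arg2)
{
	SPDK_NOTICELOG("visited %" PRIu64 " reactors\n", *(uint64_t *)arg1);
}

static void
broadcast_example(void)
{
	spdk_for_each_reactor(bump_counter, &g_visited, NULL, bump_done);
}
#endif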

#ifdef __linux__
static int
reactor_schedule_thread_event(void *arg)
{
	struct spdk_reactor *reactor = arg;
	struct spdk_lw_thread *lw_thread, *tmp;
	uint32_t count = 0;
	uint64_t notify = 1;

	assert(reactor->in_interrupt);

	if (read(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
		SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno));
		return -errno;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
	}

	return count;
}

static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	int rc;

	rc = spdk_fd_group_create(&reactor->fgrp);
	if (rc != 0) {
		return rc;
	}

	reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->resched_fd < 0) {
		rc = -EBADF;
		goto err;
	}

	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event,
			       reactor);
	if (rc) {
		close(reactor->resched_fd);
		goto err;
	}

	reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->events_fd < 0) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);

		rc = -EBADF;
		goto err;
	}

	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->events_fd,
			       event_queue_run_batch, reactor);
	if (rc) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);
		close(reactor->events_fd);
		goto err;
	}

	return 0;

err:
	spdk_fd_group_destroy(reactor->fgrp);
	reactor->fgrp = NULL;
	return rc;
}
#else
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	return -ENOTSUP;
}
#endif

static void
reactor_interrupt_fini(struct spdk_reactor *reactor)
{
	struct spdk_fd_group *fgrp = reactor->fgrp;

	if (!fgrp) {
		return;
	}

	spdk_fd_group_remove(fgrp, reactor->events_fd);
	spdk_fd_group_remove(fgrp, reactor->resched_fd);

	close(reactor->events_fd);
	close(reactor->resched_fd);

	spdk_fd_group_destroy(fgrp);
	reactor->fgrp = NULL;
}

static struct spdk_governor *
_governor_find(const char *name)
{
	struct spdk_governor *governor, *tmp;

	TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
		if (strcmp(name, governor->name) == 0) {
			return governor;
		}
	}

	return NULL;
}

int
spdk_governor_set(const char *name)
{
	struct spdk_governor *governor;
	int rc = 0;

	/* NULL governor was specifically requested */
	if (name == NULL) {
		if (g_governor) {
			g_governor->deinit();
		}
		g_governor = NULL;
		return 0;
	}

	governor = _governor_find(name);
	if (governor == NULL) {
		return -EINVAL;
	}

	if (g_governor == governor) {
		return 0;
	}

	rc = governor->init();
	if (rc == 0) {
		if (g_governor) {
			g_governor->deinit();
		}
		g_governor = governor;
	}

	return rc;
}

struct spdk_governor *
spdk_governor_get(void)
{
	return g_governor;
}

void
spdk_governor_register(struct spdk_governor *governor)
{
	if (_governor_find(governor->name)) {
		SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_governor_list, governor, link);
}
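
/*
 * Governors follow the same find/register/set pattern as schedulers. A
 * skeleton, not compiled in, with the name and no-op callbacks as
 * assumptions; a real governor also fills in the frequency-control
 * callbacks declared in struct spdk_governor.
 */
#if 0
static int
example_governor_init(void)
{
	return 0;
}

static void
example_governor_deinit(void)
{
}

static struct spdk_governor example_governor = {
	.name = "example_governor",
	.init = example_governor_init,
	.deinit = example_governor_deinit,
};

static void __attribute__((constructor))
register_example_governor(void)
{
	spdk_governor_register(&example_governor);
}

/* Select it later with: spdk_governor_set("example_governor"); */
#endif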

SPDK_LOG_REGISTER_COMPONENT(reactor)