/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2016 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/likely.h"

#include "event_internal.h"

#include "spdk_internal/event.h"
#include "spdk_internal/usdt.h"

#include "spdk/log.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/scheduler.h"
#include "spdk/string.h"
#include "spdk/fd_group.h"
#include "spdk/trace.h"
#include "spdk_internal/trace_defs.h"

#ifdef __linux__
#include <sys/prctl.h>
#include <sys/eventfd.h>
#endif

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

#define SPDK_EVENT_BATCH_SIZE		8

static struct spdk_reactor *g_reactors;
static uint32_t g_reactor_count;
static struct spdk_cpuset g_reactor_core_mask;
static enum spdk_reactor_state	g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;

static bool g_framework_context_switch_monitor_enabled = true;

static struct spdk_mempool *g_spdk_event_mempool = NULL;

TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);

static struct spdk_scheduler *g_scheduler = NULL;
static struct spdk_reactor *g_scheduling_reactor;
bool g_scheduling_in_progress = false;
static uint64_t g_scheduler_period_in_tsc = 0;
static uint64_t g_scheduler_period_in_us;
static uint32_t g_scheduler_core_number;
static struct spdk_scheduler_core_info *g_core_infos = NULL;
static struct spdk_cpuset g_scheduler_isolated_core_mask;

TAILQ_HEAD(, spdk_governor) g_governor_list
	= TAILQ_HEAD_INITIALIZER(g_governor_list);

static struct spdk_governor *g_governor = NULL;

static int reactor_interrupt_init(struct spdk_reactor *reactor);
static void reactor_interrupt_fini(struct spdk_reactor *reactor);

static pthread_mutex_t g_stopping_reactors_mtx = PTHREAD_MUTEX_INITIALIZER;
static bool g_stopping_reactors = false;

static struct spdk_scheduler *
_scheduler_find(const char *name)
{
	struct spdk_scheduler *tmp;

	TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
		if (strcmp(name, tmp->name) == 0) {
			return tmp;
		}
	}

	return NULL;
}

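/*
 * Switch the active scheduler. The previous scheduler (if any) is deinitialized
 * first; if the new scheduler's init() fails, the previous one is re-initialized
 * so the framework is never left without a working scheduler.
 */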
int
spdk_scheduler_set(const char *name)
{
	struct spdk_scheduler *scheduler;
	int rc = 0;

	/* NULL scheduler was specifically requested */
	if (name == NULL) {
		if (g_scheduler) {
			g_scheduler->deinit();
		}
		g_scheduler = NULL;
		return 0;
	}

	scheduler = _scheduler_find(name);
	if (scheduler == NULL) {
		SPDK_ERRLOG("Requested scheduler '%s' does not exist\n", name);
		return -EINVAL;
	}

	if (g_scheduler == scheduler) {
		return 0;
	}

	if (g_scheduler) {
		g_scheduler->deinit();
	}

	rc = scheduler->init();
	if (rc == 0) {
		g_scheduler = scheduler;
	} else {
		/* Could not switch to the new scheduler, so keep the old one.
		 * If it wasn't NULL, it needs to be ->init()'ed again.
		 */
		if (g_scheduler) {
			SPDK_ERRLOG("Could not ->init() '%s' scheduler, reverting to '%s'\n",
				    name, g_scheduler->name);
			g_scheduler->init();
		} else {
			SPDK_ERRLOG("Could not ->init() '%s' scheduler.\n", name);
		}
	}

	return rc;
}

struct spdk_scheduler *
spdk_scheduler_get(void)
{
	return g_scheduler;
}

uint64_t
spdk_scheduler_get_period(void)
{
	return g_scheduler_period_in_us;
}

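/*
 * The scheduler period is supplied in microseconds but compared against the
 * TSC in the reactor loop, so it is converted once here:
 *
 *	period_tsc = period_us * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC
 *
 * e.g. on a 2.4 GHz TSC, a 1000 us period becomes 2,400,000 ticks
 * (illustrative numbers, not taken from this file).
 */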
void
spdk_scheduler_set_period(uint64_t period)
{
	g_scheduler_period_in_us = period;
	g_scheduler_period_in_tsc = period * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
}

void
spdk_scheduler_register(struct spdk_scheduler *scheduler)
{
	if (_scheduler_find(scheduler->name)) {
		SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
}

uint32_t
spdk_scheduler_get_scheduling_lcore(void)
{
	return g_scheduling_reactor->lcore;
}

bool
spdk_scheduler_set_scheduling_lcore(uint32_t core)
{
	struct spdk_reactor *reactor = spdk_reactor_get(core);
	if (reactor == NULL) {
		SPDK_ERRLOG("Failed to set scheduling reactor. Reactor(lcore:%d) does not exist\n", core);
		return false;
	}

	g_scheduling_reactor = reactor;
	return true;
}

bool
scheduler_set_isolated_core_mask(struct spdk_cpuset isolated_core_mask)
{
	struct spdk_cpuset tmp_mask;

	spdk_cpuset_copy(&tmp_mask, spdk_app_get_core_mask());
	spdk_cpuset_or(&tmp_mask, &isolated_core_mask);
	if (spdk_cpuset_equal(&tmp_mask, spdk_app_get_core_mask()) == false) {
		SPDK_ERRLOG("Isolated core mask is not included in app core mask.\n");
		return false;
	}
	spdk_cpuset_copy(&g_scheduler_isolated_core_mask, &isolated_core_mask);
	return true;
}

const char *
scheduler_get_isolated_core_mask(void)
{
	return spdk_cpuset_fmt(&g_scheduler_isolated_core_mask);
}

static bool
scheduler_is_isolated_core(uint32_t core)
{
	return spdk_cpuset_get_cpu(&g_scheduler_isolated_core_mask, core);
}

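/*
 * Set up a single reactor: its thread list, event ring, and (where supported)
 * interrupt facilities. The 64k-entry event ring is multi-producer /
 * single-consumer because any core may post events to a reactor, but only the
 * reactor itself drains them.
 */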
static void
reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
{
	reactor->lcore = lcore;
	reactor->flags.is_valid = true;

	TAILQ_INIT(&reactor->threads);
	reactor->thread_count = 0;
	spdk_cpuset_zero(&reactor->notify_cpuset);

	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_NUMA_ID_ANY);
	if (reactor->events == NULL) {
		SPDK_ERRLOG("Failed to allocate events ring\n");
		assert(false);
	}

	/* Always initialize interrupt facilities for the reactor */
	if (reactor_interrupt_init(reactor) != 0) {
		/* Interrupt facilities are mandatory when the app is set to interrupt mode. */
		if (spdk_interrupt_mode_is_enabled()) {
			SPDK_ERRLOG("Failed to prepare intr facilities\n");
			assert(false);
		}
		return;
	}

	/* If the application runs with full interrupt ability,
	 * all reactors run in interrupt mode.
	 */
	if (spdk_interrupt_mode_is_enabled()) {
		uint32_t i;

		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true);
		}
		reactor->in_interrupt = true;
	}
}

struct spdk_reactor *
spdk_reactor_get(uint32_t lcore)
{
	struct spdk_reactor *reactor;

	if (g_reactors == NULL) {
		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
		return NULL;
	}

	if (lcore >= g_reactor_count) {
		return NULL;
	}

	reactor = &g_reactors[lcore];

	if (reactor->flags.is_valid == false) {
		return NULL;
	}

	return reactor;
}

static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
static bool reactor_thread_op_supported(enum spdk_thread_op op);

/* Power of 2 minus 1 is optimal for memory consumption */
#define EVENT_MSG_MEMPOOL_SHIFT 14 /* 2^14 = 16384 */
#define EVENT_MSG_MEMPOOL_SIZE ((1 << EVENT_MSG_MEMPOOL_SHIFT) - 1)

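/*
 * Allocate the global event mempool and one spdk_reactor per lcore (the array
 * is sized by the highest lcore id, so sparse core masks leave unused slots),
 * then register the reactor thread-op callbacks with the thread library.
 * Everything is unwound on failure.
 */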
int
spdk_reactors_init(size_t msg_mempool_size)
{
	struct spdk_reactor *reactor;
	int rc;
	uint32_t i, current_core;
	char mempool_name[32];

	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
			       EVENT_MSG_MEMPOOL_SIZE,
			       sizeof(struct spdk_event),
			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
			       SPDK_ENV_NUMA_ID_ANY);

	if (g_spdk_event_mempool == NULL) {
		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
		return -1;
	}

	/* struct spdk_reactor must be aligned on a 64-byte boundary */
	g_reactor_count = spdk_env_get_last_core() + 1;
	rc = posix_memalign((void **)&g_reactors, 64,
			    g_reactor_count * sizeof(struct spdk_reactor));
	if (rc != 0) {
		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
			    g_reactor_count);
		spdk_mempool_free(g_spdk_event_mempool);
		return -1;
	}

	g_core_infos = calloc(g_reactor_count, sizeof(*g_core_infos));
	if (g_core_infos == NULL) {
		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		return -ENOMEM;
	}

	memset(g_reactors, 0, (g_reactor_count) * sizeof(struct spdk_reactor));

	rc = spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
				      sizeof(struct spdk_lw_thread), msg_mempool_size);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to initialize spdk thread lib\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		free(g_core_infos);
		return rc;
	}

	SPDK_ENV_FOREACH_CORE(i) {
		reactor_construct(&g_reactors[i], i);
	}

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	g_scheduling_reactor = reactor;

	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;

	return 0;
}

void
spdk_reactors_fini(void)
{
	uint32_t i;
	struct spdk_reactor *reactor;

	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
		return;
	}

	spdk_thread_lib_fini();

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		assert(reactor->thread_count == 0);
		if (reactor->events != NULL) {
			spdk_ring_free(reactor->events);
		}

		reactor_interrupt_fini(reactor);

		if (g_core_infos != NULL) {
			free(g_core_infos[i].thread_infos);
		}
	}

	spdk_mempool_free(g_spdk_event_mempool);

	free(g_reactors);
	g_reactors = NULL;
	free(g_core_infos);
	g_core_infos = NULL;
}

static void _reactor_set_interrupt_mode(void *arg1, void *arg2);

static void
_reactor_set_notify_cpuset(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;
	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());

	assert(reactor != NULL);
	spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt);
}

static void
_event_call(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
{
	struct spdk_event *ev;

	ev = spdk_event_allocate(lcore, fn, arg1, arg2);
	assert(ev);
	spdk_event_call(ev);
}

static void
_reactor_set_notify_cpuset_cpl(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;

	if (target->new_in_interrupt == false) {
		target->set_interrupt_mode_in_progress = false;
		_event_call(spdk_scheduler_get_scheduling_lcore(), target->set_interrupt_mode_cb_fn,
			    target->set_interrupt_mode_cb_arg, NULL);
	} else {
		_event_call(target->lcore, _reactor_set_interrupt_mode, target, NULL);
	}
}

static void
_reactor_set_thread_interrupt_mode(void *ctx)
{
	struct spdk_reactor *reactor = ctx;

	spdk_thread_set_interrupt_mode(reactor->in_interrupt);
}

static void
_reactor_set_interrupt_mode(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;
	struct spdk_thread *thread;
	struct spdk_fd_group *grp;
	struct spdk_lw_thread *lw_thread, *tmp;

	assert(target == spdk_reactor_get(spdk_env_get_current_core()));
	assert(target != NULL);
	assert(target->in_interrupt != target->new_in_interrupt);
	SPDK_DEBUGLOG(reactor, "Setting reactor on core %u from %s mode to %s mode\n",
		      target->lcore, target->in_interrupt ? "intr" : "poll", target->new_in_interrupt ? "intr" : "poll");

	target->in_interrupt = target->new_in_interrupt;

	if (spdk_interrupt_mode_is_enabled()) {
		/* Align each spdk_thread with the reactor's interrupt or poll mode */
		TAILQ_FOREACH_SAFE(lw_thread, &target->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			grp = spdk_thread_get_interrupt_fd_group(thread);
			if (target->in_interrupt) {
				spdk_fd_group_nest(target->fgrp, grp);
			} else {
				spdk_fd_group_unnest(target->fgrp, grp);
			}

			spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, target);
		}
	}

	if (target->new_in_interrupt == false) {
		/* The reactor is no longer in interrupt mode. Refresh tsc_last so that
		 * reactor stats remain accurate. */
		target->tsc_last = spdk_get_ticks();
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	} else {
		uint64_t notify = 1;
		int rc = 0;

		/* Always trigger the spdk_event and resched events, in case of a race condition */
		rc = write(target->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
		rc = write(target->resched_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}

		target->set_interrupt_mode_in_progress = false;
		_event_call(spdk_scheduler_get_scheduling_lcore(), target->set_interrupt_mode_cb_fn,
			    target->set_interrupt_mode_cb_arg, NULL);
	}
}

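/*
 * Switch a reactor between poll and interrupt mode. Only the scheduling
 * reactor may initiate the switch. Ordering matters to avoid losing
 * notifications: when leaving interrupt mode, the target reactor changes mode
 * first and the notify_cpuset bits are cleared afterwards; when entering
 * interrupt mode, the notify_cpuset bits are set on every reactor first and
 * the mode is changed last.
 */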
int
spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt,
				spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg)
{
	struct spdk_reactor *target;

	target = spdk_reactor_get(lcore);
	if (target == NULL) {
		return -EINVAL;
	}

	/* Eventfd has to be supported in order to use interrupt functionality. */
	if (target->fgrp == NULL) {
		return -ENOTSUP;
	}

	if (spdk_env_get_current_core() != g_scheduling_reactor->lcore) {
		SPDK_ERRLOG("Setting the interrupt mode is only permitted from the scheduling reactor.\n");
		return -EPERM;
	}

	if (target->in_interrupt == new_in_interrupt) {
		cb_fn(cb_arg, NULL);
		return 0;
	}

	if (target->set_interrupt_mode_in_progress) {
		SPDK_NOTICELOG("Reactor(%u) already has an interrupt mode change in progress\n", lcore);
		return -EBUSY;
	}
	target->set_interrupt_mode_in_progress = true;

	target->new_in_interrupt = new_in_interrupt;
	target->set_interrupt_mode_cb_fn = cb_fn;
	target->set_interrupt_mode_cb_arg = cb_arg;

	SPDK_DEBUGLOG(reactor, "Starting reactor event from %d to %d\n",
		      spdk_env_get_current_core(), lcore);

	if (new_in_interrupt == false) {
		/* For potential race cases, when setting the reactor to poll mode,
		 * first change the mode of the reactor and then clear the corresponding
		 * bit of the notify_cpuset of each reactor.
		 */
		_event_call(lcore, _reactor_set_interrupt_mode, target, NULL);
	} else {
		/* For race cases, when setting the reactor to interrupt mode, first set the
		 * corresponding bit of the notify_cpuset of each reactor and then change the mode.
		 */
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	}

	return 0;
}

struct spdk_event *
spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
{
	struct spdk_event *event = NULL;
	struct spdk_reactor *reactor = spdk_reactor_get(lcore);

	if (!reactor) {
		assert(false);
		return NULL;
	}

	event = spdk_mempool_get(g_spdk_event_mempool);
	if (event == NULL) {
		assert(false);
		return NULL;
	}

	event->lcore = lcore;
	event->fn = fn;
	event->arg1 = arg1;
	event->arg2 = arg2;

	return event;
}

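/*
 * Enqueue an event on the destination reactor's ring and, if that reactor is
 * in interrupt mode (or the caller is not a reactor at all), wake it up via
 * its eventfd. Typical usage is allocate-then-call (illustrative sketch):
 *
 *	struct spdk_event *ev = spdk_event_allocate(lcore, fn, arg1, arg2);
 *	if (ev != NULL) {
 *		spdk_event_call(ev);
 *	}
 */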
void
spdk_event_call(struct spdk_event *event)
{
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_core = spdk_env_get_current_core();

	reactor = spdk_reactor_get(event->lcore);

	assert(reactor != NULL);
	assert(reactor->events != NULL);

	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
	if (rc != 1) {
		assert(false);
	}

	if (current_core != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_core);
	}

	/* If spdk_event_call() isn't called on a reactor, always send a notification.
	 * If it is called on a reactor, send a notification only if the destination
	 * reactor is marked as being in interrupt mode in the local notify_cpuset.
	 */
	if (spdk_unlikely(local_reactor == NULL) ||
	    spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) {
		uint64_t notify = 1;

		rc = write(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
	}
}

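/*
 * Drain up to SPDK_EVENT_BATCH_SIZE events from the reactor's ring and execute
 * them. In interrupt mode, if events remain after the batch, the eventfd is
 * re-armed so epoll fires again rather than looping here.
 */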
static inline int
event_queue_run_batch(void *arg)
{
	struct spdk_reactor *reactor = arg;
	size_t count, i;
	void *events[SPDK_EVENT_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(events, 0, sizeof(events));
#endif

	/* Handle event notification if this reactor currently runs in interrupt mode */
	if (spdk_unlikely(reactor->in_interrupt)) {
		uint64_t notify = 1;
		int rc;

		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);

		if (spdk_ring_count(reactor->events) != 0) {
			/* Trigger a new notification if there are still events in the queue waiting for processing. */
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
				return -errno;
			}
		}
	} else {
		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
	}

	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_event *event = events[i];

		assert(event != NULL);
		assert(spdk_get_thread() == NULL);
		SPDK_DTRACE_PROBE3(event_exec, event->fn,
				   event->arg1, event->arg2);
		event->fn(event->arg1, event->arg2);
	}

	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);

	return (int)count;
}

/* 1s */
#define CONTEXT_SWITCH_MONITOR_PERIOD 1000000

static int
get_rusage(struct spdk_reactor *reactor)
{
	struct rusage		rusage;

	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
		return -1;
	}

	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
		SPDK_INFOLOG(reactor,
			     "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
	}
	reactor->rusage = rusage;

	return 0;
}

void
spdk_framework_enable_context_switch_monitor(bool enable)
{
	/* This global is read by multiple threads, so this isn't strictly
	 * thread safe. However, we're only toggling between true and false
	 * here, and if a thread sees the value update later than it should,
	 * it's no big deal. */
	g_framework_context_switch_monitor_enabled = enable;
}

bool
spdk_framework_context_switch_monitor_enabled(void)
{
	return g_framework_context_switch_monitor_enabled;
}

static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
	pthread_setname_np(pthread_self(), thread_name);
#endif
}

static void
_init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
	struct spdk_thread_stats prev_total_stats;

	/* Read total_stats before updating it to calculate stats during the last scheduling period. */
	prev_total_stats = lw_thread->total_stats;

	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->total_stats);
	spdk_set_thread(NULL);

	lw_thread->current_stats.busy_tsc = lw_thread->total_stats.busy_tsc - prev_total_stats.busy_tsc;
	lw_thread->current_stats.idle_tsc = lw_thread->total_stats.idle_tsc - prev_total_stats.idle_tsc;
}

static void
_threads_reschedule_thread(struct spdk_scheduler_thread_info *thread_info)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread *thread;

	thread = spdk_thread_get_by_id(thread_info->thread_id);
	if (thread == NULL) {
		/* Thread no longer exists. */
		return;
	}
	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);

	lw_thread->lcore = thread_info->lcore;
	lw_thread->resched = true;
}

static void
_threads_reschedule(struct spdk_scheduler_core_info *cores_info)
{
	struct spdk_scheduler_core_info *core;
	struct spdk_scheduler_thread_info *thread_info;
	uint32_t i, j;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &cores_info[i];
		for (j = 0; j < core->threads_count; j++) {
			thread_info = &core->thread_infos[j];
			if (thread_info->lcore != i) {
				if (core->isolated || cores_info[thread_info->lcore].isolated) {
					SPDK_ERRLOG("A thread cannot be moved from or to an isolated core. "
						    "Skipping rescheduling of this thread\n");
					continue;
				}
				_threads_reschedule_thread(thread_info);
			}
		}
		core->threads_count = 0;
		free(core->thread_infos);
		core->thread_infos = NULL;
	}
}

static void
_reactors_scheduler_fini(void)
{
	/* Reschedule based on the balancing output */
	_threads_reschedule(g_core_infos);

	g_scheduling_in_progress = false;
}

static void
_reactors_scheduler_update_core_mode(void *ctx1, void *ctx2)
{
	struct spdk_reactor *reactor;
	uint32_t i;
	int rc = 0;

	for (i = g_scheduler_core_number; i < SPDK_ENV_LCORE_ID_ANY; i = spdk_env_get_next_core(i)) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		if (reactor->in_interrupt != g_core_infos[i].interrupt_mode) {
			/* Switch the next reactor found to its new state */
			rc = spdk_reactor_set_interrupt_mode(i, g_core_infos[i].interrupt_mode,
							     _reactors_scheduler_update_core_mode, NULL);
			if (rc == 0) {
				/* Set the core to start with after the callback completes */
				g_scheduler_core_number = spdk_env_get_next_core(i);
				return;
			}
		}
	}
	_reactors_scheduler_fini();
}

static void
_reactors_scheduler_cancel(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core;
	uint32_t i;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &g_core_infos[i];
		core->threads_count = 0;
		free(core->thread_infos);
		core->thread_infos = NULL;
	}

	g_scheduling_in_progress = false;
}

static void
_reactors_scheduler_balance(void *arg1, void *arg2)
{
	struct spdk_scheduler *scheduler = spdk_scheduler_get();

	if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING || scheduler == NULL) {
		_reactors_scheduler_cancel(NULL, NULL);
		return;
	}

	scheduler->balance(g_core_infos, g_reactor_count);

	g_scheduler_core_number = spdk_env_get_first_core();
	_reactors_scheduler_update_core_mode(NULL, NULL);
}

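/*
 * A scheduling round is a chain of events that hops across the cores: metrics
 * are gathered on each reactor in turn (phase 1), the scheduler's balance()
 * callback runs on the scheduling reactor (phase 2), core interrupt modes are
 * updated one reactor at a time, and finally the threads are rescheduled to
 * their new cores.
 */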
/* Phase 1 of thread scheduling is to gather metrics on the existing threads */
static void
_reactors_scheduler_gather_metrics(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core_info;
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread *thread;
	struct spdk_reactor *reactor;
	uint32_t next_core;
	uint32_t i = 0;

	reactor = spdk_reactor_get(spdk_env_get_current_core());
	assert(reactor != NULL);
	core_info = &g_core_infos[reactor->lcore];
	core_info->lcore = reactor->lcore;
	core_info->current_idle_tsc = reactor->idle_tsc - core_info->total_idle_tsc;
	core_info->total_idle_tsc = reactor->idle_tsc;
	core_info->current_busy_tsc = reactor->busy_tsc - core_info->total_busy_tsc;
	core_info->total_busy_tsc = reactor->busy_tsc;
	core_info->interrupt_mode = reactor->in_interrupt;
	core_info->threads_count = 0;
	core_info->isolated = scheduler_is_isolated_core(reactor->lcore);

	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);

	spdk_trace_record(TRACE_SCHEDULER_CORE_STATS, reactor->trace_id, 0, 0,
			  core_info->current_busy_tsc,
			  core_info->current_idle_tsc);

	if (reactor->thread_count > 0) {
		core_info->thread_infos = calloc(reactor->thread_count, sizeof(*core_info->thread_infos));
		if (core_info->thread_infos == NULL) {
			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);

			/* Cancel this round of schedule work */
			_event_call(spdk_scheduler_get_scheduling_lcore(), _reactors_scheduler_cancel, NULL, NULL);
			return;
		}

		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
			_init_thread_stats(reactor, lw_thread);

			core_info->thread_infos[i].lcore = lw_thread->lcore;
			thread = spdk_thread_get_from_ctx(lw_thread);
			assert(thread != NULL);
			core_info->thread_infos[i].thread_id = spdk_thread_get_id(thread);
			core_info->thread_infos[i].total_stats = lw_thread->total_stats;
			core_info->thread_infos[i].current_stats = lw_thread->current_stats;
			core_info->threads_count++;
			assert(core_info->threads_count <= reactor->thread_count);

			spdk_trace_record(TRACE_SCHEDULER_THREAD_STATS, spdk_thread_get_trace_id(thread), 0, 0,
					  lw_thread->current_stats.busy_tsc,
					  lw_thread->current_stats.idle_tsc);

			i++;
		}
	}

	next_core = spdk_env_get_next_core(reactor->lcore);
	if (next_core == UINT32_MAX) {
		next_core = spdk_env_get_first_core();
	}

	/* If we've looped back around to the scheduler thread, move to the next phase */
	if (next_core == spdk_scheduler_get_scheduling_lcore()) {
		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
		_event_call(next_core, _reactors_scheduler_balance, NULL, NULL);
		return;
	}

	_event_call(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
}

static int _reactor_schedule_thread(struct spdk_thread *thread);
static uint64_t g_rusage_period;

static void
_reactor_remove_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread	*thread = spdk_thread_get_from_ctx(lw_thread);
	struct spdk_fd_group	*grp;

	TAILQ_REMOVE(&reactor->threads, lw_thread, link);
	assert(reactor->thread_count > 0);
	reactor->thread_count--;

	/* Unnest the thread's interrupt fd_group if running with full interrupt ability */
	if (spdk_interrupt_mode_is_enabled()) {
		if (reactor->in_interrupt) {
			grp = spdk_thread_get_interrupt_fd_group(thread);
			spdk_fd_group_unnest(reactor->fgrp, grp);
		}
	}
}

static bool
reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);

	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
			  spdk_thread_is_idle(thread))) {
		_reactor_remove_lw_thread(reactor, lw_thread);
		spdk_thread_destroy(thread);
		return true;
	}

	if (spdk_unlikely(lw_thread->resched && !spdk_thread_is_bound(thread))) {
		lw_thread->resched = false;
		_reactor_remove_lw_thread(reactor, lw_thread);
		_reactor_schedule_thread(thread);
		return true;
	}

	return false;
}

static void
reactor_interrupt_run(struct spdk_reactor *reactor)
{
	int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */

	spdk_fd_group_wait(reactor->fgrp, block_timeout);
}

static void
_reactor_run(struct spdk_reactor *reactor)
{
	struct spdk_thread	*thread;
	struct spdk_lw_thread	*lw_thread, *tmp;
	uint64_t		now;
	int			rc;

	event_queue_run_batch(reactor);

	/* If no threads are present on the reactor,
	 * tsc_last gets outdated. Update it to track
	 * thread execution time correctly. */
	if (spdk_unlikely(TAILQ_EMPTY(&reactor->threads))) {
		now = spdk_get_ticks();
		reactor->idle_tsc += now - reactor->tsc_last;
		reactor->tsc_last = now;
		return;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);

		now = spdk_thread_get_last_tsc(thread);
		if (rc == 0) {
			reactor->idle_tsc += now - reactor->tsc_last;
		} else if (rc > 0) {
			reactor->busy_tsc += now - reactor->tsc_last;
		}
		reactor->tsc_last = now;

		reactor_post_process_lw_thread(reactor, lw_thread);
	}
}

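/*
 * Main loop of a reactor: poll (or epoll-wait) until the framework leaves the
 * RUNNING state, periodically sampling rusage and kicking off a scheduling
 * round on the scheduling reactor, then drain and destroy any remaining
 * threads on the way out.
 */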
static int
reactor_run(void *arg)
{
	struct spdk_reactor	*reactor = arg;
	struct spdk_thread	*thread;
	struct spdk_lw_thread	*lw_thread, *tmp;
	char			thread_name[32];
	uint64_t		last_sched = 0;

	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);

	/* Rename the POSIX thread because the reactor is tied to the POSIX
	 * thread in the SPDK event library.
	 */
	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
	_set_thread_name(thread_name);

	reactor->trace_id = spdk_trace_register_owner(OWNER_TYPE_REACTOR, thread_name);

	reactor->tsc_last = spdk_get_ticks();

	while (1) {
		/* Execute the interrupt process fn if this reactor currently runs in interrupt mode */
		if (spdk_unlikely(reactor->in_interrupt)) {
			reactor_interrupt_run(reactor);
		} else {
			_reactor_run(reactor);
		}

		if (g_framework_context_switch_monitor_enabled) {
			if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
				get_rusage(reactor);
				reactor->last_rusage = reactor->tsc_last;
			}
		}

		if (spdk_unlikely(g_scheduler_period_in_tsc > 0 &&
				  (reactor->tsc_last - last_sched) > g_scheduler_period_in_tsc &&
				  reactor == g_scheduling_reactor &&
				  !g_scheduling_in_progress)) {
			last_sched = reactor->tsc_last;
			g_scheduling_in_progress = true;
			spdk_trace_record(TRACE_SCHEDULER_PERIOD_START, 0, 0, 0);
			_reactors_scheduler_gather_metrics(NULL, NULL);
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}
	}

	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		/* All threads should have already had spdk_thread_exit() called on them, except
		 * for the app thread.
		 */
		if (spdk_thread_is_running(thread)) {
			if (!spdk_thread_is_app_thread(thread)) {
				SPDK_ERRLOG("spdk_thread_exit() was not called on thread '%s'\n",
					    spdk_thread_get_name(thread));
				SPDK_ERRLOG("This will result in a non-zero exit code in a future release.\n");
			}
			spdk_set_thread(thread);
			spdk_thread_exit(thread);
		}
	}

	while (!TAILQ_EMPTY(&reactor->threads)) {
		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			spdk_set_thread(thread);
			if (spdk_thread_is_exited(thread)) {
				_reactor_remove_lw_thread(reactor, lw_thread);
				spdk_thread_destroy(thread);
			} else {
				if (spdk_unlikely(reactor->in_interrupt)) {
					reactor_interrupt_run(reactor);
				} else {
					spdk_thread_poll(thread, 0, 0);
				}
			}
		}
	}

	return 0;
}

int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int ret;
	const struct spdk_cpuset *validmask;

	ret = spdk_cpuset_parse(cpumask, mask);
	if (ret < 0) {
		return ret;
	}

	validmask = spdk_app_get_core_mask();
	spdk_cpuset_and(cpumask, validmask);

	return 0;
}

const struct spdk_cpuset *
spdk_app_get_core_mask(void)
{
	return &g_reactor_core_mask;
}

void
spdk_reactors_start(void)
{
	struct spdk_reactor *reactor;
	uint32_t i, current_core;
	int rc;

	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
	/* Reinitialize to false, in case the app framework is restarting in the same process. */
	g_stopping_reactors = false;

	current_core = spdk_env_get_current_core();
	SPDK_ENV_FOREACH_CORE(i) {
		if (i != current_core) {
			reactor = spdk_reactor_get(i);
			if (reactor == NULL) {
				continue;
			}

			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
			if (rc < 0) {
				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
				assert(false);
				return;
			}
		}
		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
	}

	/* Start the main reactor */
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	reactor_run(reactor);

	spdk_env_thread_wait_all();

	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
}

static void
_reactors_stop(void *arg1, void *arg2)
{
	uint32_t i;
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor;
	uint64_t notify = 1;

	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
	local_reactor = spdk_reactor_get(spdk_env_get_current_core());

	SPDK_ENV_FOREACH_CORE(i) {
		/* If spdk_event_call() isn't called on a reactor, always send a notification.
		 * If it is called on a reactor, send a notification only if the destination
		 * reactor is marked as being in interrupt mode in the local notify_cpuset.
		 */
		if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) {
			reactor = spdk_reactor_get(i);
			assert(reactor != NULL);
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno));
				continue;
			}
		}
	}
}

static void
nop(void *arg1, void *arg2)
{
}

void
spdk_reactors_stop(void *arg1)
{
	spdk_for_each_reactor(nop, NULL, NULL, _reactors_stop);
}

static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
static uint32_t g_next_core = UINT32_MAX;

static void
_schedule_thread(void *arg1, void *arg2)
{
	struct spdk_lw_thread *lw_thread = arg1;
	struct spdk_thread *thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;
	struct spdk_fd_group *grp;

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	/* Update total_stats to reflect the state of the thread
	 * at the end of the move. */
	thread = spdk_thread_get_from_ctx(lw_thread);
	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->total_stats);
	spdk_set_thread(NULL);

	if (lw_thread->initial_lcore == SPDK_ENV_LCORE_ID_ANY) {
		lw_thread->initial_lcore = current_core;
	}
	lw_thread->lcore = current_core;

	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
	reactor->thread_count++;

	/* Nest the thread's interrupt fd_group if running with full interrupt ability */
	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (reactor->in_interrupt) {
			grp = spdk_thread_get_interrupt_fd_group(thread);
			rc = spdk_fd_group_nest(reactor->fgrp, grp);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
			}
		}

		/* Align the spdk_thread with the reactor's interrupt or poll mode */
		spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, reactor);
	}
}

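/*
 * Pick a core for a thread and send it there as an event. Cores are assigned
 * round-robin via g_next_core among the cores permitted by the thread's
 * cpumask; when interrupt ability is disabled, reactors currently in interrupt
 * mode are skipped as well.
 */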
static int
_reactor_schedule_thread(struct spdk_thread *thread)
{
	uint32_t core, initial_core;
	struct spdk_lw_thread *lw_thread;
	struct spdk_event *evt = NULL;
	struct spdk_cpuset *cpumask;
	uint32_t i;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_lcore = spdk_env_get_current_core();
	struct spdk_cpuset polling_cpumask;
	struct spdk_cpuset valid_cpumask;

	cpumask = spdk_thread_get_cpumask(thread);

	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);
	core = lw_thread->lcore;
	initial_core = lw_thread->initial_lcore;
	memset(lw_thread, 0, sizeof(*lw_thread));
	lw_thread->initial_lcore = initial_core;

	if (current_lcore != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_lcore);
		assert(local_reactor);
	}

	/* When the interrupt ability of the spdk_thread is not enabled and the current
	 * reactor runs on a DPDK thread, skip reactors which are in interrupt mode.
	 */
	if (!spdk_interrupt_mode_is_enabled() && local_reactor != NULL) {
		/* Get the cpumask of all reactors in polling */
		spdk_cpuset_zero(&polling_cpumask);
		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&polling_cpumask, i, true);
		}
		spdk_cpuset_xor(&polling_cpumask, &local_reactor->notify_cpuset);

		if (core == SPDK_ENV_LCORE_ID_ANY) {
			/* Get the cpumask of all valid reactors which are suggested and also in polling */
			spdk_cpuset_copy(&valid_cpumask, &polling_cpumask);
			spdk_cpuset_and(&valid_cpumask, spdk_thread_get_cpumask(thread));

			/* If there are any valid reactors, the spdk_thread should be scheduled
			 * onto one of them. If there are no valid reactors, it should be
			 * scheduled onto one of the polling reactors instead.
			 */
			if (spdk_cpuset_count(&valid_cpumask) != 0) {
				cpumask = &valid_cpumask;
			} else {
				cpumask = &polling_cpumask;
			}
		} else if (!spdk_cpuset_get_cpu(&polling_cpumask, core)) {
			/* If the specified reactor is not in polling, the spdk_thread should be
			 * scheduled onto one of the polling reactors.
			 */
			core = SPDK_ENV_LCORE_ID_ANY;
			cpumask = &polling_cpumask;
		}
	}

	pthread_mutex_lock(&g_scheduler_mtx);
	if (core == SPDK_ENV_LCORE_ID_ANY) {
		for (i = 0; i < spdk_env_get_core_count(); i++) {
			if (g_next_core >= g_reactor_count) {
				g_next_core = spdk_env_get_first_core();
			}
			core = g_next_core;
			g_next_core = spdk_env_get_next_core(g_next_core);

			if (spdk_cpuset_get_cpu(cpumask, core)) {
				break;
			}
		}
	}

	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);

	if (current_lcore != core) {
		spdk_trace_record(TRACE_SCHEDULER_MOVE_THREAD, spdk_thread_get_trace_id(thread), 0, 0,
				  current_lcore, core);
	}

	pthread_mutex_unlock(&g_scheduler_mtx);

	assert(evt != NULL);
	if (evt == NULL) {
		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
		return -1;
	}

	lw_thread->tsc_start = spdk_get_ticks();

	spdk_event_call(evt);

	return 0;
}

static void
_reactor_request_thread_reschedule(struct spdk_thread *thread)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;

	assert(thread == spdk_get_thread());

	lw_thread = spdk_thread_get_ctx(thread);

	assert(lw_thread != NULL);
	lw_thread->resched = true;
	lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	/* Send a notification if the local reactor is marked as being in interrupt mode */
	if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) {
		uint64_t notify = 1;

		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}
	}
}

static int
reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
{
	struct spdk_lw_thread *lw_thread;

	switch (op) {
	case SPDK_THREAD_OP_NEW:
		lw_thread = spdk_thread_get_ctx(thread);
		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
		lw_thread->initial_lcore = SPDK_ENV_LCORE_ID_ANY;
		return _reactor_schedule_thread(thread);
	case SPDK_THREAD_OP_RESCHED:
		_reactor_request_thread_reschedule(thread);
		return 0;
	default:
		return -ENOTSUP;
	}
}

static bool
reactor_thread_op_supported(enum spdk_thread_op op)
{
	switch (op) {
	case SPDK_THREAD_OP_NEW:
	case SPDK_THREAD_OP_RESCHED:
		return true;
	default:
		return false;
	}
}

struct call_reactor {
	uint32_t cur_core;
	spdk_event_fn fn;
	void *arg1;
	void *arg2;

	uint32_t orig_core;
	spdk_event_fn cpl;
};

static void
on_reactor(void *arg1, void *arg2)
{
	struct call_reactor *cr = arg1;
	struct spdk_event *evt;

	cr->fn(cr->arg1, cr->arg2);

	cr->cur_core = spdk_env_get_next_core(cr->cur_core);

	if (cr->cur_core >= g_reactor_count) {
		SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");

		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
		free(cr);
	} else {
		SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %d\n",
			      cr->cur_core);

		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
	}
	assert(evt != NULL);
	spdk_event_call(evt);
}

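/*
 * Run fn on every reactor in lcore order, then run cpl back on the calling
 * core. Illustrative usage, assuming a hypothetical per-core stats dump:
 *
 *	static void dump_stats(void *arg1, void *arg2) { ... runs on each core ... }
 *	static void dump_done(void *arg1, void *arg2) { ... runs on the caller ... }
 *
 *	spdk_for_each_reactor(dump_stats, NULL, NULL, dump_done);
 */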
void
spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
{
	struct call_reactor *cr;

	/* When the application framework is shutting down, we will send one
	 * final for_each_reactor operation with completion callback _reactors_stop,
	 * to flush any existing for_each_reactor operations to avoid any memory
	 * leaks. We use a mutex here to protect a boolean flag that will ensure
	 * we don't start any more operations once we've started shutting down.
	 */
	pthread_mutex_lock(&g_stopping_reactors_mtx);
	if (g_stopping_reactors) {
		pthread_mutex_unlock(&g_stopping_reactors_mtx);
		return;
	} else if (cpl == _reactors_stop) {
		g_stopping_reactors = true;
	}
	pthread_mutex_unlock(&g_stopping_reactors_mtx);

	cr = calloc(1, sizeof(*cr));
	if (!cr) {
		SPDK_ERRLOG("Unable to perform reactor iteration\n");
		cpl(arg1, arg2);
		return;
	}

	cr->fn = fn;
	cr->arg1 = arg1;
	cr->arg2 = arg2;
	cr->cpl = cpl;
	cr->orig_core = spdk_env_get_current_core();
	cr->cur_core = spdk_env_get_first_core();

	SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %d\n", cr->orig_core);

	_event_call(cr->cur_core, on_reactor, cr, NULL);
}

#ifdef __linux__
static int
reactor_schedule_thread_event(void *arg)
{
	struct spdk_reactor *reactor = arg;
	struct spdk_lw_thread *lw_thread, *tmp;
	uint32_t count = 0;

	assert(reactor->in_interrupt);

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
	}

	return count;
}

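/*
 * Interrupt-mode plumbing (Linux only, since it relies on eventfd): each
 * reactor gets an fd_group with two eventfds, events_fd to wake the reactor
 * for incoming events and resched_fd to wake it for thread reschedules. On
 * other platforms this reports -ENOTSUP and the reactor stays in poll mode.
 */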
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	struct spdk_event_handler_opts opts = {};
	int rc;

	rc = spdk_fd_group_create(&reactor->fgrp);
	if (rc != 0) {
		return rc;
	}

	reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->resched_fd < 0) {
		rc = -EBADF;
		goto err;
	}

	spdk_fd_group_get_default_event_handler_opts(&opts, sizeof(opts));
	opts.fd_type = SPDK_FD_TYPE_EVENTFD;

	rc = SPDK_FD_GROUP_ADD_EXT(reactor->fgrp, reactor->resched_fd,
				   reactor_schedule_thread_event, reactor, &opts);
	if (rc) {
		close(reactor->resched_fd);
		goto err;
	}

	reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->events_fd < 0) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);

		rc = -EBADF;
		goto err;
	}

	rc = SPDK_FD_GROUP_ADD_EXT(reactor->fgrp, reactor->events_fd,
				   event_queue_run_batch, reactor, &opts);
	if (rc) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);
		close(reactor->events_fd);
		goto err;
	}

	return 0;

err:
	spdk_fd_group_destroy(reactor->fgrp);
	reactor->fgrp = NULL;
	return rc;
}
#else
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	return -ENOTSUP;
}
#endif

static void
reactor_interrupt_fini(struct spdk_reactor *reactor)
{
	struct spdk_fd_group *fgrp = reactor->fgrp;

	if (!fgrp) {
		return;
	}

	spdk_fd_group_remove(fgrp, reactor->events_fd);
	spdk_fd_group_remove(fgrp, reactor->resched_fd);

	close(reactor->events_fd);
	close(reactor->resched_fd);

	spdk_fd_group_destroy(fgrp);
	reactor->fgrp = NULL;
}

static struct spdk_governor *
_governor_find(const char *name)
{
	struct spdk_governor *governor, *tmp;

	TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
		if (strcmp(name, governor->name) == 0) {
			return governor;
		}
	}

	return NULL;
}

int
spdk_governor_set(const char *name)
{
	struct spdk_governor *governor;
	int rc = 0;

	/* NULL governor was specifically requested */
	if (name == NULL) {
		if (g_governor) {
			g_governor->deinit();
		}
		g_governor = NULL;
		return 0;
	}

	governor = _governor_find(name);
	if (governor == NULL) {
		return -EINVAL;
	}

	if (g_governor == governor) {
		return 0;
	}

	rc = governor->init();
	if (rc == 0) {
		if (g_governor) {
			g_governor->deinit();
		}
		g_governor = governor;
	}

	return rc;
}

struct spdk_governor *
spdk_governor_get(void)
{
	return g_governor;
}

void
spdk_governor_register(struct spdk_governor *governor)
{
	if (_governor_find(governor->name)) {
		SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_governor_list, governor, link);
}

SPDK_LOG_REGISTER_COMPONENT(reactor)

static void
scheduler_trace(void)
{
	struct spdk_trace_tpoint_opts opts[] = {
		{
			"SCHEDULER_PERIOD_START", TRACE_SCHEDULER_PERIOD_START,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{
			}
		},
		{
			"SCHEDULER_CORE_STATS", TRACE_SCHEDULER_CORE_STATS,
			OWNER_TYPE_REACTOR, OBJECT_NONE, 0,
			{
				{ "busy", SPDK_TRACE_ARG_TYPE_INT, 8},
				{ "idle", SPDK_TRACE_ARG_TYPE_INT, 8}
			}
		},
		{
			"SCHEDULER_THREAD_STATS", TRACE_SCHEDULER_THREAD_STATS,
			OWNER_TYPE_THREAD, OBJECT_NONE, 0,
			{
				{ "busy", SPDK_TRACE_ARG_TYPE_INT, 8},
				{ "idle", SPDK_TRACE_ARG_TYPE_INT, 8}
			}
		},
		{
			"SCHEDULER_MOVE_THREAD", TRACE_SCHEDULER_MOVE_THREAD,
			OWNER_TYPE_THREAD, OBJECT_NONE, 0,
			{
				{ "src", SPDK_TRACE_ARG_TYPE_INT, 8 },
				{ "dst", SPDK_TRACE_ARG_TYPE_INT, 8 }
			}
		}
	};

	spdk_trace_register_owner_type(OWNER_TYPE_REACTOR, 'r');
	spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
}

SPDK_TRACE_REGISTER_FN(scheduler_trace, "scheduler", TRACE_GROUP_SCHEDULER)