xref: /spdk/lib/event/reactor.c (revision 16d862d0380886f6fc765f68a87e240bb4295595)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 #include "spdk/likely.h"
8 
9 #include "spdk_internal/event.h"
10 #include "spdk_internal/usdt.h"
11 
12 #include "spdk/log.h"
13 #include "spdk/thread.h"
14 #include "spdk/env.h"
15 #include "spdk/util.h"
16 #include "spdk/scheduler.h"
17 #include "spdk/string.h"
18 #include "spdk/fd_group.h"
19 
20 #ifdef __linux__
21 #include <sys/prctl.h>
22 #include <sys/eventfd.h>
23 #endif
24 
25 #ifdef __FreeBSD__
26 #include <pthread_np.h>
27 #endif
28 
29 #define SPDK_EVENT_BATCH_SIZE		8
30 
31 static struct spdk_reactor *g_reactors;
32 static uint32_t g_reactor_count;
33 static struct spdk_cpuset g_reactor_core_mask;
34 static enum spdk_reactor_state	g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;
35 
36 static bool g_framework_context_switch_monitor_enabled = true;
37 
38 static struct spdk_mempool *g_spdk_event_mempool = NULL;
39 
40 TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
41 	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);
42 
43 static struct spdk_scheduler *g_scheduler = NULL;
44 static struct spdk_reactor *g_scheduling_reactor;
45 bool g_scheduling_in_progress = false;
46 static uint64_t g_scheduler_period = 0;
47 static uint32_t g_scheduler_core_number;
48 static struct spdk_scheduler_core_info *g_core_infos = NULL;
49 
50 TAILQ_HEAD(, spdk_governor) g_governor_list
51 	= TAILQ_HEAD_INITIALIZER(g_governor_list);
52 
53 static struct spdk_governor *g_governor = NULL;
54 
55 static int reactor_interrupt_init(struct spdk_reactor *reactor);
56 static void reactor_interrupt_fini(struct spdk_reactor *reactor);
57 
58 static pthread_mutex_t g_stopping_reactors_mtx = PTHREAD_MUTEX_INITIALIZER;
59 static bool g_stopping_reactors = false;
60 
61 static struct spdk_scheduler *
62 _scheduler_find(const char *name)
63 {
64 	struct spdk_scheduler *tmp;
65 
66 	TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
67 		if (strcmp(name, tmp->name) == 0) {
68 			return tmp;
69 		}
70 	}
71 
72 	return NULL;
73 }
74 
75 int
76 spdk_scheduler_set(const char *name)
77 {
78 	struct spdk_scheduler *scheduler;
79 	int rc = 0;
80 
81 	/* NULL scheduler was specifically requested */
82 	if (name == NULL) {
83 		if (g_scheduler) {
84 			g_scheduler->deinit();
85 		}
86 		g_scheduler = NULL;
87 		return 0;
88 	}
89 
90 	scheduler = _scheduler_find(name);
91 	if (scheduler == NULL) {
92 		SPDK_ERRLOG("Requested scheduler is missing\n");
93 		return -EINVAL;
94 	}
95 
96 	if (g_scheduler == scheduler) {
97 		return 0;
98 	}
99 
100 	rc = scheduler->init();
101 	if (rc == 0) {
102 		if (g_scheduler) {
103 			g_scheduler->deinit();
104 		}
105 		g_scheduler = scheduler;
106 	}
107 
108 	return rc;
109 }
110 
111 struct spdk_scheduler *
112 spdk_scheduler_get(void)
113 {
114 	return g_scheduler;
115 }
116 
117 uint64_t
118 spdk_scheduler_get_period(void)
119 {
120 	/* Convert from ticks to microseconds */
121 	return (g_scheduler_period * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
122 }
123 
124 void
125 spdk_scheduler_set_period(uint64_t period)
126 {
127 	/* Convert microseconds to ticks */
128 	g_scheduler_period = period * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
129 }
130 
131 void
132 spdk_scheduler_register(struct spdk_scheduler *scheduler)
133 {
134 	if (_scheduler_find(scheduler->name)) {
135 		SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
136 		assert(false);
137 		return;
138 	}
139 
140 	TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
141 }
142 
143 uint32_t
144 spdk_scheduler_get_scheduling_lcore(void)
145 {
146 	return g_scheduling_reactor->lcore;
147 }
148 
/*
 * Initialize one reactor structure for the given lcore: thread list,
 * event ring, and interrupt facilities. If the app runs fully in
 * interrupt mode, the reactor starts in interrupt state with every
 * core's bit set in its notify_cpuset. Allocation failures are fatal
 * (assert) except that interrupt-facility setup may fail silently
 * when interrupt mode is not enabled.
 */
static void
reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
{
	reactor->lcore = lcore;
	reactor->flags.is_valid = true;

	TAILQ_INIT(&reactor->threads);
	reactor->thread_count = 0;
	spdk_cpuset_zero(&reactor->notify_cpuset);

	/* Multi-producer (any core may post events), single-consumer (this reactor). */
	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (reactor->events == NULL) {
		SPDK_ERRLOG("Failed to allocate events ring\n");
		assert(false);
	}

	/* Always initialize interrupt facilities for reactor */
	if (reactor_interrupt_init(reactor) != 0) {
		/* Reactor interrupt facilities are necessary if setting app to interrupt mode. */
		if (spdk_interrupt_mode_is_enabled()) {
			SPDK_ERRLOG("Failed to prepare intr facilities\n");
			assert(false);
		}
		return;
	}

	/* If application runs with full interrupt ability,
	 * all reactors are going to run in interrupt mode.
	 */
	if (spdk_interrupt_mode_is_enabled()) {
		uint32_t i;

		/* Every peer must notify this reactor via eventfd while it sleeps. */
		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true);
		}
		reactor->in_interrupt = true;
	}
}
187 
188 struct spdk_reactor *
189 spdk_reactor_get(uint32_t lcore)
190 {
191 	struct spdk_reactor *reactor;
192 
193 	if (g_reactors == NULL) {
194 		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
195 		return NULL;
196 	}
197 
198 	if (lcore >= g_reactor_count) {
199 		return NULL;
200 	}
201 
202 	reactor = &g_reactors[lcore];
203 
204 	if (reactor->flags.is_valid == false) {
205 		return NULL;
206 	}
207 
208 	return reactor;
209 }
210 
211 static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
212 static bool reactor_thread_op_supported(enum spdk_thread_op op);
213 
/*
 * Allocate global reactor state: the shared event mempool, the
 * 64-byte-aligned per-core reactor array, the scheduler core-info
 * array, and the thread-library hooks; then construct one reactor per
 * core in the app's mask. The calling core's reactor becomes the
 * scheduling reactor. Returns 0 on success; on failure, everything
 * allocated so far is released and a negative value (or -1) returned.
 */
int
spdk_reactors_init(size_t msg_mempool_size)
{
	struct spdk_reactor *reactor;
	int rc;
	uint32_t i, current_core;
	char mempool_name[32];

	/* Include the pid so concurrent SPDK processes get unique mempool names. */
	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
			       262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			       sizeof(struct spdk_event),
			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
			       SPDK_ENV_SOCKET_ID_ANY);

	if (g_spdk_event_mempool == NULL) {
		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
		return -1;
	}

	/* struct spdk_reactor must be aligned on 64 byte boundary */
	g_reactor_count = spdk_env_get_last_core() + 1;
	rc = posix_memalign((void **)&g_reactors, 64,
			    g_reactor_count * sizeof(struct spdk_reactor));
	if (rc != 0) {
		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
			    g_reactor_count);
		spdk_mempool_free(g_spdk_event_mempool);
		return -1;
	}

	g_core_infos = calloc(g_reactor_count, sizeof(*g_core_infos));
	if (g_core_infos == NULL) {
		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		return -ENOMEM;
	}

	/* posix_memalign does not zero the allocation. */
	memset(g_reactors, 0, (g_reactor_count) * sizeof(struct spdk_reactor));

	rc = spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
				      sizeof(struct spdk_lw_thread), msg_mempool_size);
	if (rc != 0) {
		SPDK_ERRLOG("Initialize spdk thread lib failed\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		free(g_core_infos);
		return rc;
	}

	SPDK_ENV_FOREACH_CORE(i) {
		reactor_construct(&g_reactors[i], i);
	}

	/* The core that initializes the library hosts the scheduling reactor. */
	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	g_scheduling_reactor = reactor;

	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;

	return 0;
}
278 
/*
 * Tear down everything spdk_reactors_init() created: per-reactor
 * event rings and interrupt facilities, per-core scheduler thread
 * info, the reactor array, and the event mempool. A no-op if init
 * never ran. All lightweight threads must already be destroyed.
 */
void
spdk_reactors_fini(void)
{
	uint32_t i;
	struct spdk_reactor *reactor;

	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
		return;
	}

	spdk_thread_lib_fini();

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		/* Reactors must be drained of threads before fini. */
		assert(reactor->thread_count == 0);
		if (reactor->events != NULL) {
			spdk_ring_free(reactor->events);
		}

		reactor_interrupt_fini(reactor);

		if (g_core_infos != NULL) {
			free(g_core_infos[i].thread_infos);
		}
	}

	spdk_mempool_free(g_spdk_event_mempool);

	free(g_reactors);
	g_reactors = NULL;
	free(g_core_infos);
	g_core_infos = NULL;
}
313 
314 static void _reactor_set_interrupt_mode(void *arg1, void *arg2);
315 
316 static void
317 _reactor_set_notify_cpuset(void *arg1, void *arg2)
318 {
319 	struct spdk_reactor *target = arg1;
320 	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());
321 
322 	assert(reactor != NULL);
323 	spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt);
324 }
325 
326 static void
327 _event_call(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
328 {
329 	struct spdk_event *ev;
330 
331 	ev = spdk_event_allocate(lcore, fn, arg1, arg2);
332 	assert(ev);
333 	spdk_event_call(ev);
334 }
335 
336 static void
337 _reactor_set_notify_cpuset_cpl(void *arg1, void *arg2)
338 {
339 	struct spdk_reactor *target = arg1;
340 
341 	if (target->new_in_interrupt == false) {
342 		target->set_interrupt_mode_in_progress = false;
343 		spdk_thread_send_msg(spdk_thread_get_app_thread(), target->set_interrupt_mode_cb_fn,
344 				     target->set_interrupt_mode_cb_arg);
345 	} else {
346 		_event_call(target->lcore, _reactor_set_interrupt_mode, target, NULL);
347 	}
348 }
349 
350 static void
351 _reactor_set_thread_interrupt_mode(void *ctx)
352 {
353 	struct spdk_reactor *reactor = ctx;
354 
355 	spdk_thread_set_interrupt_mode(reactor->in_interrupt);
356 }
357 
358 static void
359 _reactor_set_interrupt_mode(void *arg1, void *arg2)
360 {
361 	struct spdk_reactor *target = arg1;
362 	struct spdk_thread *thread;
363 	struct spdk_fd_group *grp;
364 	struct spdk_lw_thread *lw_thread, *tmp;
365 
366 	assert(target == spdk_reactor_get(spdk_env_get_current_core()));
367 	assert(target != NULL);
368 	assert(target->in_interrupt != target->new_in_interrupt);
369 	SPDK_DEBUGLOG(reactor, "Do reactor set on core %u from %s to state %s\n",
370 		      target->lcore, target->in_interrupt ? "intr" : "poll", target->new_in_interrupt ? "intr" : "poll");
371 
372 	target->in_interrupt = target->new_in_interrupt;
373 
374 	if (spdk_interrupt_mode_is_enabled()) {
375 		/* Align spdk_thread with reactor to interrupt mode or poll mode */
376 		TAILQ_FOREACH_SAFE(lw_thread, &target->threads, link, tmp) {
377 			thread = spdk_thread_get_from_ctx(lw_thread);
378 			if (target->in_interrupt) {
379 				grp = spdk_thread_get_interrupt_fd_group(thread);
380 				spdk_fd_group_nest(target->fgrp, grp);
381 			} else {
382 				grp = spdk_thread_get_interrupt_fd_group(thread);
383 				spdk_fd_group_unnest(target->fgrp, grp);
384 			}
385 
386 			spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, target);
387 		}
388 	}
389 
390 	if (target->new_in_interrupt == false) {
391 		/* Reactor is no longer in interrupt mode. Refresh the tsc_last to accurately
392 		 * track reactor stats. */
393 		target->tsc_last = spdk_get_ticks();
394 		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
395 	} else {
396 		uint64_t notify = 1;
397 		int rc = 0;
398 
399 		/* Always trigger spdk_event and resched event in case of race condition */
400 		rc = write(target->events_fd, &notify, sizeof(notify));
401 		if (rc < 0) {
402 			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
403 		}
404 		rc = write(target->resched_fd, &notify, sizeof(notify));
405 		if (rc < 0) {
406 			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
407 		}
408 
409 		target->set_interrupt_mode_in_progress = false;
410 		spdk_thread_send_msg(spdk_thread_get_app_thread(), target->set_interrupt_mode_cb_fn,
411 				     target->set_interrupt_mode_cb_arg);
412 	}
413 }
414 
/*
 * Request that the reactor on 'lcore' switch to interrupt (true) or
 * poll (false) mode; cb_fn(cb_arg) runs on the app thread when the
 * transition completes. Only callable from the scheduling reactor's
 * core. Returns 0 on success (including the no-op case, where the
 * callback is invoked synchronously), -EINVAL for an unknown lcore,
 * -ENOTSUP without eventfd support, -EPERM off the scheduling core,
 * or -EBUSY if a transition is already in flight.
 */
int
spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt,
				spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg)
{
	struct spdk_reactor *target;

	target = spdk_reactor_get(lcore);
	if (target == NULL) {
		return -EINVAL;
	}

	/* Eventfd has to be supported in order to use interrupt functionality. */
	if (target->fgrp == NULL) {
		return -ENOTSUP;
	}

	if (spdk_env_get_current_core() != g_scheduling_reactor->lcore) {
		SPDK_ERRLOG("It is only permitted within scheduling reactor.\n");
		return -EPERM;
	}

	/* Already in the requested mode - complete immediately. */
	if (target->in_interrupt == new_in_interrupt) {
		cb_fn(cb_arg);
		return 0;
	}

	if (target->set_interrupt_mode_in_progress) {
		SPDK_NOTICELOG("Reactor(%u) is already in progress to set interrupt mode\n", lcore);
		return -EBUSY;
	}
	target->set_interrupt_mode_in_progress = true;

	target->new_in_interrupt = new_in_interrupt;
	target->set_interrupt_mode_cb_fn = cb_fn;
	target->set_interrupt_mode_cb_arg = cb_arg;

	SPDK_DEBUGLOG(reactor, "Starting reactor event from %d to %d\n",
		      spdk_env_get_current_core(), lcore);

	if (new_in_interrupt == false) {
		/* For potential race cases, when setting the reactor to poll mode,
		 * first change the mode of the reactor and then clear the corresponding
		 * bit of the notify_cpuset of each reactor.
		 */
		_event_call(lcore, _reactor_set_interrupt_mode, target, NULL);
	} else {
		/* For race cases, when setting the reactor to interrupt mode, first set the
		 * corresponding bit of the notify_cpuset of each reactor and then change the mode.
		 */
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	}

	return 0;
}
469 
470 struct spdk_event *
471 spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
472 {
473 	struct spdk_event *event = NULL;
474 	struct spdk_reactor *reactor = spdk_reactor_get(lcore);
475 
476 	if (!reactor) {
477 		assert(false);
478 		return NULL;
479 	}
480 
481 	event = spdk_mempool_get(g_spdk_event_mempool);
482 	if (event == NULL) {
483 		assert(false);
484 		return NULL;
485 	}
486 
487 	event->lcore = lcore;
488 	event->fn = fn;
489 	event->arg1 = arg1;
490 	event->arg2 = arg2;
491 
492 	return event;
493 }
494 
/*
 * Post a previously allocated event to its destination reactor's
 * event ring. If the caller is not on a reactor, or the destination
 * is marked in the local reactor's notify_cpuset (i.e. it may be
 * sleeping in interrupt mode), kick its eventfd so it wakes up.
 * Ownership of 'event' passes to the destination reactor.
 */
void
spdk_event_call(struct spdk_event *event)
{
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_core = spdk_env_get_current_core();

	reactor = spdk_reactor_get(event->lcore);

	assert(reactor != NULL);
	assert(reactor->events != NULL);

	/* MP ring: safe to enqueue from any core. Failure means the ring
	 * is full, which is treated as fatal in debug builds. */
	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
	if (rc != 1) {
		assert(false);
	}

	if (current_core != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_core);
	}

	/* If spdk_event_call isn't called on a reactor, always send a notification.
	 * If it is called on a reactor, send a notification if the destination reactor
	 * is indicated in interrupt mode state.
	 */
	if (spdk_unlikely(local_reactor == NULL) ||
	    spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) {
		uint64_t notify = 1;

		rc = write(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
	}
}
531 
/*
 * Dequeue and execute up to SPDK_EVENT_BATCH_SIZE events from this
 * reactor's ring. In interrupt mode the eventfd is acknowledged
 * (read) before dequeuing and re-armed (written) if events remain, so
 * no notification is lost. Returns the number of events executed, or
 * -errno on eventfd failure.
 */
static inline int
event_queue_run_batch(void *arg)
{
	struct spdk_reactor *reactor = arg;
	size_t count, i;
	void *events[SPDK_EVENT_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(events, 0, sizeof(events));
#endif

	/* Operate event notification if this reactor currently runs in interrupt state */
	if (spdk_unlikely(reactor->in_interrupt)) {
		uint64_t notify = 1;
		int rc;

		/* There may be race between event_acknowledge and another producer's event_notify,
		 * so event_acknowledge should be applied ahead. And then check for self's event_notify.
		 * This can avoid event notification missing.
		 */
		rc = read(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno));
			return -errno;
		}

		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);

		if (spdk_ring_count(reactor->events) != 0) {
			/* Trigger new notification if there are still events in event-queue waiting for processing. */
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
				return -errno;
			}
		}
	} else {
		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
	}

	if (count == 0) {
		return 0;
	}

	/* Execute the batch outside any spdk_thread context, then return
	 * the event objects to the shared pool in one bulk operation. */
	for (i = 0; i < count; i++) {
		struct spdk_event *event = events[i];

		assert(event != NULL);
		assert(spdk_get_thread() == NULL);
		SPDK_DTRACE_PROBE3(event_exec, event->fn,
				   event->arg1, event->arg2);
		event->fn(event->arg1, event->arg2);
	}

	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);

	return (int)count;
}
595 
596 /* 1s */
597 #define CONTEXT_SWITCH_MONITOR_PERIOD 1000000
598 
599 static int
600 get_rusage(struct spdk_reactor *reactor)
601 {
602 	struct rusage		rusage;
603 
604 	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
605 		return -1;
606 	}
607 
608 	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
609 		SPDK_INFOLOG(reactor,
610 			     "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
611 			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
612 			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
613 	}
614 	reactor->rusage = rusage;
615 
616 	return -1;
617 }
618 
619 void
620 spdk_framework_enable_context_switch_monitor(bool enable)
621 {
622 	/* This global is being read by multiple threads, so this isn't
623 	 * strictly thread safe. However, we're toggling between true and
624 	 * false here, and if a thread sees the value update later than it
625 	 * should, it's no big deal. */
626 	g_framework_context_switch_monitor_enabled = enable;
627 }
628 
629 bool
630 spdk_framework_context_switch_monitor_enabled(void)
631 {
632 	return g_framework_context_switch_monitor_enabled;
633 }
634 
/*
 * Set the calling POSIX thread's name (best effort), using whichever
 * API the platform provides: prctl on Linux, pthread_set_name_np on
 * FreeBSD, pthread_setname_np elsewhere. Return values are ignored;
 * naming is diagnostic only.
 */
static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
	pthread_setname_np(pthread_self(), thread_name);
#endif
}
646 
647 static void
648 _init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
649 {
650 	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
651 	struct spdk_thread_stats prev_total_stats;
652 
653 	/* Read total_stats before updating it to calculate stats during the last scheduling period. */
654 	prev_total_stats = lw_thread->total_stats;
655 
656 	spdk_set_thread(thread);
657 	spdk_thread_get_stats(&lw_thread->total_stats);
658 	spdk_set_thread(NULL);
659 
660 	lw_thread->current_stats.busy_tsc = lw_thread->total_stats.busy_tsc - prev_total_stats.busy_tsc;
661 	lw_thread->current_stats.idle_tsc = lw_thread->total_stats.idle_tsc - prev_total_stats.idle_tsc;
662 }
663 
664 static void
665 _threads_reschedule_thread(struct spdk_scheduler_thread_info *thread_info)
666 {
667 	struct spdk_lw_thread *lw_thread;
668 	struct spdk_thread *thread;
669 
670 	thread = spdk_thread_get_by_id(thread_info->thread_id);
671 	if (thread == NULL) {
672 		/* Thread no longer exists. */
673 		return;
674 	}
675 	lw_thread = spdk_thread_get_ctx(thread);
676 	assert(lw_thread != NULL);
677 
678 	lw_thread->lcore = thread_info->lcore;
679 	lw_thread->resched = true;
680 }
681 
682 static void
683 _threads_reschedule(struct spdk_scheduler_core_info *cores_info)
684 {
685 	struct spdk_scheduler_core_info *core;
686 	struct spdk_scheduler_thread_info *thread_info;
687 	uint32_t i, j;
688 
689 	SPDK_ENV_FOREACH_CORE(i) {
690 		core = &cores_info[i];
691 		for (j = 0; j < core->threads_count; j++) {
692 			thread_info = &core->thread_infos[j];
693 			if (thread_info->lcore != i) {
694 				_threads_reschedule_thread(thread_info);
695 			}
696 		}
697 		core->threads_count = 0;
698 		free(core->thread_infos);
699 		core->thread_infos = NULL;
700 	}
701 }
702 
703 static void
704 _reactors_scheduler_fini(void)
705 {
706 	/* Reschedule based on the balancing output */
707 	_threads_reschedule(g_core_infos);
708 
709 	g_scheduling_in_progress = false;
710 }
711 
/*
 * Phase 3 of scheduling: walk the cores starting at
 * g_scheduler_core_number and switch any reactor whose interrupt/poll
 * mode disagrees with the balancer's decision. Mode switches are
 * asynchronous, so after issuing one this function returns and is
 * re-entered as the switch's completion callback, resuming at the
 * next core. When the walk completes, the round is finalized.
 */
static void
_reactors_scheduler_update_core_mode(void *ctx)
{
	struct spdk_reactor *reactor;
	uint32_t i;
	int rc = 0;

	for (i = g_scheduler_core_number; i < SPDK_ENV_LCORE_ID_ANY; i = spdk_env_get_next_core(i)) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		if (reactor->in_interrupt != g_core_infos[i].interrupt_mode) {
			/* Switch next found reactor to new state */
			rc = spdk_reactor_set_interrupt_mode(i, g_core_infos[i].interrupt_mode,
							     _reactors_scheduler_update_core_mode, NULL);
			if (rc == 0) {
				/* Set core to start with after callback completes */
				g_scheduler_core_number = spdk_env_get_next_core(i);
				return;
			}
		}
	}
	_reactors_scheduler_fini();
}
735 
736 static void
737 _reactors_scheduler_cancel(void *arg1, void *arg2)
738 {
739 	struct spdk_scheduler_core_info *core;
740 	uint32_t i;
741 
742 	SPDK_ENV_FOREACH_CORE(i) {
743 		core = &g_core_infos[i];
744 		core->threads_count = 0;
745 		free(core->thread_infos);
746 		core->thread_infos = NULL;
747 	}
748 
749 	g_scheduling_in_progress = false;
750 }
751 
752 static void
753 _reactors_scheduler_balance(void *arg1, void *arg2)
754 {
755 	struct spdk_scheduler *scheduler = spdk_scheduler_get();
756 
757 	if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING || scheduler == NULL) {
758 		_reactors_scheduler_cancel(NULL, NULL);
759 		return;
760 	}
761 
762 	scheduler->balance(g_core_infos, g_reactor_count);
763 
764 	g_scheduler_core_number = spdk_env_get_first_core();
765 	_reactors_scheduler_update_core_mode(NULL);
766 }
767 
/* Phase 1 of thread scheduling is to gather metrics on the existing threads.
 * Runs as an event on each reactor in turn: records the local core's
 * busy/idle tsc deltas and a snapshot of every thread's stats into
 * g_core_infos, then forwards itself to the next core. When the walk
 * wraps back to the scheduling reactor, phase 2 (balancing) starts.
 * An allocation failure cancels the whole round. */
static void
_reactors_scheduler_gather_metrics(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core_info;
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread *thread;
	struct spdk_reactor *reactor;
	uint32_t next_core;
	uint32_t i = 0;

	reactor = spdk_reactor_get(spdk_env_get_current_core());
	assert(reactor != NULL);
	core_info = &g_core_infos[reactor->lcore];
	core_info->lcore = reactor->lcore;
	/* Deltas since the previous round, derived from the stored totals. */
	core_info->current_idle_tsc = reactor->idle_tsc - core_info->total_idle_tsc;
	core_info->total_idle_tsc = reactor->idle_tsc;
	core_info->current_busy_tsc = reactor->busy_tsc - core_info->total_busy_tsc;
	core_info->total_busy_tsc = reactor->busy_tsc;
	core_info->interrupt_mode = reactor->in_interrupt;
	core_info->threads_count = 0;

	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);

	if (reactor->thread_count > 0) {
		core_info->thread_infos = calloc(reactor->thread_count, sizeof(*core_info->thread_infos));
		if (core_info->thread_infos == NULL) {
			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);

			/* Cancel this round of schedule work */
			_event_call(g_scheduling_reactor->lcore, _reactors_scheduler_cancel, NULL, NULL);
			return;
		}

		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
			_init_thread_stats(reactor, lw_thread);

			core_info->thread_infos[i].lcore = lw_thread->lcore;
			thread = spdk_thread_get_from_ctx(lw_thread);
			assert(thread != NULL);
			core_info->thread_infos[i].thread_id = spdk_thread_get_id(thread);
			core_info->thread_infos[i].total_stats = lw_thread->total_stats;
			core_info->thread_infos[i].current_stats = lw_thread->current_stats;
			core_info->threads_count++;
			assert(core_info->threads_count <= reactor->thread_count);
			i++;
		}
	}

	next_core = spdk_env_get_next_core(reactor->lcore);
	if (next_core == UINT32_MAX) {
		next_core = spdk_env_get_first_core();
	}

	/* If we've looped back around to the scheduler thread, move to the next phase */
	if (next_core == g_scheduling_reactor->lcore) {
		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
		_event_call(next_core, _reactors_scheduler_balance, NULL, NULL);
		return;
	}

	_event_call(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
}
831 
832 static int _reactor_schedule_thread(struct spdk_thread *thread);
833 static uint64_t g_rusage_period;
834 
835 static void
836 _reactor_remove_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
837 {
838 	struct spdk_thread	*thread = spdk_thread_get_from_ctx(lw_thread);
839 	struct spdk_fd_group	*grp;
840 
841 	TAILQ_REMOVE(&reactor->threads, lw_thread, link);
842 	assert(reactor->thread_count > 0);
843 	reactor->thread_count--;
844 
845 	/* Operate thread intr if running with full interrupt ability */
846 	if (spdk_interrupt_mode_is_enabled()) {
847 		if (reactor->in_interrupt) {
848 			grp = spdk_thread_get_interrupt_fd_group(thread);
849 			spdk_fd_group_unnest(reactor->fgrp, grp);
850 		}
851 	}
852 }
853 
854 static bool
855 reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
856 {
857 	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
858 
859 	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
860 			  spdk_thread_is_idle(thread))) {
861 		_reactor_remove_lw_thread(reactor, lw_thread);
862 		spdk_thread_destroy(thread);
863 		return true;
864 	}
865 
866 	if (spdk_unlikely(lw_thread->resched && !spdk_thread_is_bound(thread))) {
867 		lw_thread->resched = false;
868 		_reactor_remove_lw_thread(reactor, lw_thread);
869 		_reactor_schedule_thread(thread);
870 		return true;
871 	}
872 
873 	return false;
874 }
875 
876 static void
877 reactor_interrupt_run(struct spdk_reactor *reactor)
878 {
879 	int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */
880 
881 	spdk_fd_group_wait(reactor->fgrp, block_timeout);
882 }
883 
/*
 * One iteration of the poll-mode loop: drain a batch of events, then
 * poll each lightweight thread once, attributing the elapsed tsc to
 * busy (poller did work) or idle time, and finally run post-poll
 * housekeeping (destroy/migrate) on each thread.
 */
static void
_reactor_run(struct spdk_reactor *reactor)
{
	struct spdk_thread	*thread;
	struct spdk_lw_thread	*lw_thread, *tmp;
	uint64_t		now;
	int			rc;

	event_queue_run_batch(reactor);

	/* If no threads are present on the reactor,
	 * tsc_last gets outdated. Update it to track
	 * thread execution time correctly. */
	if (spdk_unlikely(TAILQ_EMPTY(&reactor->threads))) {
		now = spdk_get_ticks();
		reactor->idle_tsc += now - reactor->tsc_last;
		reactor->tsc_last = now;
		return;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);

		/* rc > 0: work was done (busy); rc == 0: nothing ready (idle). */
		now = spdk_thread_get_last_tsc(thread);
		if (rc == 0) {
			reactor->idle_tsc += now - reactor->tsc_last;
		} else if (rc > 0) {
			reactor->busy_tsc += now - reactor->tsc_last;
		}
		reactor->tsc_last = now;

		reactor_post_process_lw_thread(reactor, lw_thread);
	}
}
919 
/*
 * Main loop of a reactor, pinned to its core. Iterates in either
 * interrupt or poll mode, periodically samples rusage and (on the
 * scheduling reactor only) kicks off scheduling rounds, until the
 * framework leaves the RUNNING state. On exit, requests spdk_thread_exit()
 * on any still-running thread, then keeps polling until every thread
 * has exited and been destroyed. Always returns 0.
 */
static int
reactor_run(void *arg)
{
	struct spdk_reactor	*reactor = arg;
	struct spdk_thread	*thread;
	struct spdk_lw_thread	*lw_thread, *tmp;
	char			thread_name[32];
	uint64_t		last_sched = 0;

	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);

	/* Rename the POSIX thread because the reactor is tied to the POSIX
	 * thread in the SPDK event library.
	 */
	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
	_set_thread_name(thread_name);

	reactor->tsc_last = spdk_get_ticks();

	while (1) {
		/* Execute interrupt process fn if this reactor currently runs in interrupt state */
		if (spdk_unlikely(reactor->in_interrupt)) {
			reactor_interrupt_run(reactor);
		} else {
			_reactor_run(reactor);
		}

		/* Sample context switches once per g_rusage_period. */
		if (g_framework_context_switch_monitor_enabled) {
			if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
				get_rusage(reactor);
				reactor->last_rusage = reactor->tsc_last;
			}
		}

		/* Only the scheduling reactor starts rounds, and only one
		 * round may be in flight at a time. */
		if (spdk_unlikely(g_scheduler_period > 0 &&
				  (reactor->tsc_last - last_sched) > g_scheduler_period &&
				  reactor == g_scheduling_reactor &&
				  !g_scheduling_in_progress)) {
			last_sched = reactor->tsc_last;
			g_scheduling_in_progress = true;
			_reactors_scheduler_gather_metrics(NULL, NULL);
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}
	}

	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		/* All threads should have already had spdk_thread_exit() called on them, except
		 * for the app thread.
		 */
		if (spdk_thread_is_running(thread)) {
			if (!spdk_thread_is_app_thread(thread)) {
				SPDK_ERRLOG("spdk_thread_exit() was not called on thread '%s'\n",
					    spdk_thread_get_name(thread));
				SPDK_ERRLOG("This will result in a non-zero exit code in a future release.\n");
			}
			spdk_set_thread(thread);
			spdk_thread_exit(thread);
		}
	}

	/* Drain: keep polling until every exiting thread finishes and is destroyed. */
	while (!TAILQ_EMPTY(&reactor->threads)) {
		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			spdk_set_thread(thread);
			if (spdk_thread_is_exited(thread)) {
				_reactor_remove_lw_thread(reactor, lw_thread);
				spdk_thread_destroy(thread);
			} else {
				if (spdk_unlikely(reactor->in_interrupt)) {
					reactor_interrupt_run(reactor);
				} else {
					spdk_thread_poll(thread, 0, 0);
				}
			}
		}
	}

	return 0;
}
1003 
1004 int
1005 spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
1006 {
1007 	int ret;
1008 	const struct spdk_cpuset *validmask;
1009 
1010 	ret = spdk_cpuset_parse(cpumask, mask);
1011 	if (ret < 0) {
1012 		return ret;
1013 	}
1014 
1015 	validmask = spdk_app_get_core_mask();
1016 	spdk_cpuset_and(cpumask, validmask);
1017 
1018 	return 0;
1019 }
1020 
1021 const struct spdk_cpuset *
1022 spdk_app_get_core_mask(void)
1023 {
1024 	return &g_reactor_core_mask;
1025 }
1026 
/*
 * Launch every reactor: pinned env threads for all cores except the
 * caller's, which runs its reactor inline. Blocks until all reactors
 * finish (i.e. the framework is stopped), then moves the state to
 * SHUTDOWN. Also populates g_reactor_core_mask with the active cores.
 */
void
spdk_reactors_start(void)
{
	struct spdk_reactor *reactor;
	uint32_t i, current_core;
	int rc;

	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
	/* Reinitialize to false, in case the app framework is restarting in the same process. */
	g_stopping_reactors = false;

	current_core = spdk_env_get_current_core();
	SPDK_ENV_FOREACH_CORE(i) {
		if (i != current_core) {
			reactor = spdk_reactor_get(i);
			if (reactor == NULL) {
				continue;
			}

			/* Failure to launch a reactor thread is fatal. */
			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
			if (rc < 0) {
				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
				assert(false);
				return;
			}
		}
		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
	}

	/* Start the main reactor */
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	reactor_run(reactor);

	/* reactor_run() returned, so the framework is stopping; wait for the rest. */
	spdk_env_thread_wait_all();

	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
}
1066 
/* Completion callback for the final spdk_for_each_reactor() pass started by
 * spdk_reactors_stop().  Flips the framework state to EXITING and writes to
 * the events_fd of every reactor that is flagged for notification, waking
 * any reactor waiting in interrupt mode so it observes the state change.
 */
static void
_reactors_stop(void *arg1, void *arg2)
{
	uint32_t i;
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor;
	uint64_t notify = 1;

	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
	local_reactor = spdk_reactor_get(spdk_env_get_current_core());

	SPDK_ENV_FOREACH_CORE(i) {
		/* If spdk_event_call isn't called  on a reactor, always send a notification.
		 * If it is called on a reactor, send a notification if the destination reactor
		 * is indicated in interrupt mode state.
		 */
		if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) {
			reactor = spdk_reactor_get(i);
			assert(reactor != NULL);
			/* Kick the eventfd; best effort — log and keep notifying the rest. */
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno));
				continue;
			}
		}
	}
}
1095 
/* Intentionally empty per-reactor callback.  Used by spdk_reactors_stop() so
 * that only the completion callback (_reactors_stop) performs any work.
 */
static void
nop(void *arg1, void *arg2)
{
}
1100 
/* Begin framework shutdown: run one final no-op pass over every reactor,
 * with _reactors_stop as the completion to transition the state machine to
 * EXITING.  The pass also flushes any in-flight for_each_reactor operations
 * (see the comment in spdk_for_each_reactor()).
 */
void
spdk_reactors_stop(void *arg1)
{
	spdk_for_each_reactor(nop, NULL, NULL, _reactors_stop);
}
1106 
/* Protects the round-robin state below when placing a new thread on a core. */
static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
/* Next candidate core for round-robin placement; UINT32_MAX (>= g_reactor_count)
 * forces a reset to the first core on first use in _reactor_schedule_thread(). */
static uint32_t g_next_core = UINT32_MAX;
1109 
/* Event handler that lands a thread on its newly assigned reactor.  Runs on
 * the destination core, dispatched via spdk_event_call() from
 * _reactor_schedule_thread().  arg1 is the thread's spdk_lw_thread context;
 * arg2 is unused.
 */
static void
_schedule_thread(void *arg1, void *arg2)
{
	struct spdk_lw_thread *lw_thread = arg1;
	struct spdk_thread *thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;
	struct spdk_fd_group *grp;

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	/* Update total_stats to reflect state of thread
	 * at the end of the move. */
	thread = spdk_thread_get_from_ctx(lw_thread);
	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->total_stats);
	spdk_set_thread(NULL);

	lw_thread->lcore = current_core;

	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
	reactor->thread_count++;

	/* Operate thread intr if running with full interrupt ability */
	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (reactor->in_interrupt) {
			/* Nest the thread's fd group into the reactor's so the reactor's
			 * poll loop also services this thread's interrupts.
			 */
			grp = spdk_thread_get_interrupt_fd_group(thread);
			rc = spdk_fd_group_nest(reactor->fgrp, grp);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
			}
		}

		/* Align spdk_thread with reactor to interrupt mode or poll mode */
		spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, reactor);
	}
}
1151 
/* Pick a destination core for @thread and send a _schedule_thread event to
 * that core.  Core selection honors the thread's cpumask, avoids reactors in
 * interrupt mode when interrupt ability is disabled, and otherwise falls back
 * to a mutex-protected round-robin over all cores.
 *
 * Returns 0 on success, -1 if the scheduling event could not be allocated.
 */
static int
_reactor_schedule_thread(struct spdk_thread *thread)
{
	uint32_t core;
	struct spdk_lw_thread *lw_thread;
	struct spdk_event *evt = NULL;
	struct spdk_cpuset *cpumask;
	uint32_t i;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_lcore = spdk_env_get_current_core();
	struct spdk_cpuset polling_cpumask;
	struct spdk_cpuset valid_cpumask;

	cpumask = spdk_thread_get_cpumask(thread);

	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);
	core = lw_thread->lcore;
	/* Reset the lw_thread context; _schedule_thread repopulates it on arrival. */
	memset(lw_thread, 0, sizeof(*lw_thread));

	if (current_lcore != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_lcore);
		assert(local_reactor);
	}

	/* When interrupt ability of spdk_thread is not enabled and the current
	 * reactor runs on DPDK thread, skip reactors which are in interrupt mode.
	 */
	if (!spdk_interrupt_mode_is_enabled() && local_reactor != NULL) {
		/* Get the cpumask of all reactors in polling */
		spdk_cpuset_zero(&polling_cpumask);
		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&polling_cpumask, i, true);
		}
		/* notify_cpuset marks reactors needing notification (interrupt mode);
		 * XOR against the full core set leaves only the polling reactors. */
		spdk_cpuset_xor(&polling_cpumask, &local_reactor->notify_cpuset);

		if (core == SPDK_ENV_LCORE_ID_ANY) {
			/* Get the cpumask of all valid reactors which are suggested and also in polling */
			spdk_cpuset_copy(&valid_cpumask, &polling_cpumask);
			spdk_cpuset_and(&valid_cpumask, spdk_thread_get_cpumask(thread));

			/* If there are any valid reactors, spdk_thread should be scheduled
			 * into one of the valid reactors.
			 * If there is no valid reactors, spdk_thread should be scheduled
			 * into one of the polling reactors.
			 */
			if (spdk_cpuset_count(&valid_cpumask) != 0) {
				cpumask = &valid_cpumask;
			} else {
				cpumask = &polling_cpumask;
			}
		} else if (!spdk_cpuset_get_cpu(&polling_cpumask, core)) {
			/* If specified reactor is not in polling, spdk_thread should be scheduled
			 * into one of the polling reactors.
			 */
			core = SPDK_ENV_LCORE_ID_ANY;
			cpumask = &polling_cpumask;
		}
	}

	pthread_mutex_lock(&g_scheduler_mtx);
	if (core == SPDK_ENV_LCORE_ID_ANY) {
		/* Round-robin: advance g_next_core until a core in cpumask is found,
		 * trying at most one full pass over the configured cores. */
		for (i = 0; i < spdk_env_get_core_count(); i++) {
			if (g_next_core >= g_reactor_count) {
				g_next_core = spdk_env_get_first_core();
			}
			core = g_next_core;
			g_next_core = spdk_env_get_next_core(g_next_core);

			if (spdk_cpuset_get_cpu(cpumask, core)) {
				break;
			}
		}
	}

	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);

	pthread_mutex_unlock(&g_scheduler_mtx);

	assert(evt != NULL);
	if (evt == NULL) {
		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
		return -1;
	}

	/* Timestamp the start of the move; consumed by the scheduler stats. */
	lw_thread->tsc_start = spdk_get_ticks();

	spdk_event_call(evt);

	return 0;
}
1243 
/* Mark the calling thread for rescheduling by its reactor's main loop.  Must
 * be invoked from the thread itself (asserted).  If this reactor is flagged
 * in its notify_cpuset (interrupt mode), kick resched_fd so
 * reactor_schedule_thread_event() runs and processes the move.
 */
static void
_reactor_request_thread_reschedule(struct spdk_thread *thread)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;

	assert(thread == spdk_get_thread());

	lw_thread = spdk_thread_get_ctx(thread);

	assert(lw_thread != NULL);
	lw_thread->resched = true;
	/* Let the scheduler pick the destination core. */
	lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	/* Send a notification if the destination reactor is indicated in intr mode state */
	if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) {
		uint64_t notify = 1;

		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}
	}
}
1272 
1273 static int
1274 reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
1275 {
1276 	struct spdk_lw_thread *lw_thread;
1277 
1278 	switch (op) {
1279 	case SPDK_THREAD_OP_NEW:
1280 		lw_thread = spdk_thread_get_ctx(thread);
1281 		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
1282 		return _reactor_schedule_thread(thread);
1283 	case SPDK_THREAD_OP_RESCHED:
1284 		_reactor_request_thread_reschedule(thread);
1285 		return 0;
1286 	default:
1287 		return -ENOTSUP;
1288 	}
1289 }
1290 
1291 static bool
1292 reactor_thread_op_supported(enum spdk_thread_op op)
1293 {
1294 	switch (op) {
1295 	case SPDK_THREAD_OP_NEW:
1296 	case SPDK_THREAD_OP_RESCHED:
1297 		return true;
1298 	default:
1299 		return false;
1300 	}
1301 }
1302 
/* Context for one in-flight spdk_for_each_reactor() iteration. */
struct call_reactor {
	uint32_t cur_core;	/* core currently being visited */
	spdk_event_fn fn;	/* function invoked on each reactor */
	void *arg1;		/* passed to both fn and cpl */
	void *arg2;		/* passed to both fn and cpl */

	uint32_t orig_core;	/* core that started the iteration; cpl runs there */
	spdk_event_fn cpl;	/* completion callback fired after the last core */
};
1312 
/* Event handler implementing spdk_for_each_reactor(): runs cr->fn on the
 * current core, then forwards itself to the next core, or — once every core
 * has been visited — fires the completion callback back on the originating
 * core and frees the iteration context.
 */
static void
on_reactor(void *arg1, void *arg2)
{
	struct call_reactor *cr = arg1;
	struct spdk_event *evt;

	cr->fn(cr->arg1, cr->arg2);

	cr->cur_core = spdk_env_get_next_core(cr->cur_core);

	if (cr->cur_core >= g_reactor_count) {
		SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");

		/* Capture the args before freeing cr; cpl runs on orig_core. */
		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
		free(cr);
	} else {
		SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %d\n",
			      cr->cur_core);

		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
	}
	assert(evt != NULL);
	spdk_event_call(evt);
}
1337 
/* Run @fn once on every reactor core in turn, then invoke @cpl on the core
 * that initiated the iteration.  If allocation fails, @cpl is invoked
 * immediately on the current core.  New iterations are refused once shutdown
 * has begun (see below).
 */
void
spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
{
	struct call_reactor *cr;

	/* When the application framework is shutting down, we will send one
	 * final for_each_reactor operation with completion callback _reactors_stop,
	 * to flush any existing for_each_reactor operations to avoid any memory
	 * leaks. We use a mutex here to protect a boolean flag that will ensure
	 * we don't start any more operations once we've started shutting down.
	 */
	pthread_mutex_lock(&g_stopping_reactors_mtx);
	if (g_stopping_reactors) {
		pthread_mutex_unlock(&g_stopping_reactors_mtx);
		return;
	} else if (cpl == _reactors_stop) {
		g_stopping_reactors = true;
	}
	pthread_mutex_unlock(&g_stopping_reactors_mtx);

	cr = calloc(1, sizeof(*cr));
	if (!cr) {
		SPDK_ERRLOG("Unable to perform reactor iteration\n");
		cpl(arg1, arg2);
		return;
	}

	cr->fn = fn;
	cr->arg1 = arg1;
	cr->arg2 = arg2;
	cr->cpl = cpl;
	cr->orig_core = spdk_env_get_current_core();
	cr->cur_core = spdk_env_get_first_core();

	SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %d\n", cr->orig_core);

	/* Kick off the chain; on_reactor forwards itself core to core. */
	_event_call(cr->cur_core, on_reactor, cr, NULL);
}
1376 
1377 #ifdef __linux__
1378 static int
1379 reactor_schedule_thread_event(void *arg)
1380 {
1381 	struct spdk_reactor *reactor = arg;
1382 	struct spdk_lw_thread *lw_thread, *tmp;
1383 	uint32_t count = 0;
1384 	uint64_t notify = 1;
1385 
1386 	assert(reactor->in_interrupt);
1387 
1388 	if (read(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
1389 		SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno));
1390 		return -errno;
1391 	}
1392 
1393 	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
1394 		count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
1395 	}
1396 
1397 	return count;
1398 }
1399 
1400 static int
1401 reactor_interrupt_init(struct spdk_reactor *reactor)
1402 {
1403 	int rc;
1404 
1405 	rc = spdk_fd_group_create(&reactor->fgrp);
1406 	if (rc != 0) {
1407 		return rc;
1408 	}
1409 
1410 	reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1411 	if (reactor->resched_fd < 0) {
1412 		rc = -EBADF;
1413 		goto err;
1414 	}
1415 
1416 	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event,
1417 			       reactor);
1418 	if (rc) {
1419 		close(reactor->resched_fd);
1420 		goto err;
1421 	}
1422 
1423 	reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1424 	if (reactor->events_fd < 0) {
1425 		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
1426 		close(reactor->resched_fd);
1427 
1428 		rc = -EBADF;
1429 		goto err;
1430 	}
1431 
1432 	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->events_fd,
1433 			       event_queue_run_batch, reactor);
1434 	if (rc) {
1435 		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
1436 		close(reactor->resched_fd);
1437 		close(reactor->events_fd);
1438 		goto err;
1439 	}
1440 
1441 	return 0;
1442 
1443 err:
1444 	spdk_fd_group_destroy(reactor->fgrp);
1445 	reactor->fgrp = NULL;
1446 	return rc;
1447 }
1448 #else
/* Non-Linux stub: interrupt mode relies on Linux eventfd, so it is
 * unsupported on other platforms.
 */
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	return -ENOTSUP;
}
1453 }
1454 #endif
1455 
1456 static void
1457 reactor_interrupt_fini(struct spdk_reactor *reactor)
1458 {
1459 	struct spdk_fd_group *fgrp = reactor->fgrp;
1460 
1461 	if (!fgrp) {
1462 		return;
1463 	}
1464 
1465 	spdk_fd_group_remove(fgrp, reactor->events_fd);
1466 	spdk_fd_group_remove(fgrp, reactor->resched_fd);
1467 
1468 	close(reactor->events_fd);
1469 	close(reactor->resched_fd);
1470 
1471 	spdk_fd_group_destroy(fgrp);
1472 	reactor->fgrp = NULL;
1473 }
1474 
1475 static struct spdk_governor *
1476 _governor_find(const char *name)
1477 {
1478 	struct spdk_governor *governor, *tmp;
1479 
1480 	TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
1481 		if (strcmp(name, governor->name) == 0) {
1482 			return governor;
1483 		}
1484 	}
1485 
1486 	return NULL;
1487 }
1488 
1489 int
1490 spdk_governor_set(const char *name)
1491 {
1492 	struct spdk_governor *governor;
1493 	int rc = 0;
1494 
1495 	/* NULL governor was specifically requested */
1496 	if (name == NULL) {
1497 		if (g_governor) {
1498 			g_governor->deinit();
1499 		}
1500 		g_governor = NULL;
1501 		return 0;
1502 	}
1503 
1504 	governor = _governor_find(name);
1505 	if (governor == NULL) {
1506 		return -EINVAL;
1507 	}
1508 
1509 	if (g_governor == governor) {
1510 		return 0;
1511 	}
1512 
1513 	rc = governor->init();
1514 	if (rc == 0) {
1515 		if (g_governor) {
1516 			g_governor->deinit();
1517 		}
1518 		g_governor = governor;
1519 	}
1520 
1521 	return rc;
1522 }
1523 
/* Return the currently active governor, or NULL when none is set. */
struct spdk_governor *
spdk_governor_get(void)
{
	return g_governor;
}
1529 
/* Register a governor implementation.  Names must be unique: a duplicate
 * registration is logged, asserted against in debug builds, and otherwise
 * ignored.  The governor struct is linked into the global list, so it must
 * outlive the framework (typically a static object).
 */
void
spdk_governor_register(struct spdk_governor *governor)
{
	if (_governor_find(governor->name)) {
		SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_governor_list, governor, link);
}
1541 
/* Register the "reactor" log component used by SPDK_DEBUGLOG() in this file. */
SPDK_LOG_REGISTER_COMPONENT(reactor)
1543