xref: /spdk/lib/event/reactor.c (revision f6866117acb32c78d5ea7bd76ba330284655af35)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 #include "spdk/likely.h"
8 
9 #include "spdk_internal/event.h"
10 #include "spdk_internal/usdt.h"
11 
12 #include "spdk/log.h"
13 #include "spdk/thread.h"
14 #include "spdk/env.h"
15 #include "spdk/util.h"
16 #include "spdk/scheduler.h"
17 #include "spdk/string.h"
18 #include "spdk/fd_group.h"
19 
20 #ifdef __linux__
21 #include <sys/prctl.h>
22 #include <sys/eventfd.h>
23 #endif
24 
25 #ifdef __FreeBSD__
26 #include <pthread_np.h>
27 #endif
28 
29 #define SPDK_EVENT_BATCH_SIZE		8
30 
31 static struct spdk_reactor *g_reactors;
32 static uint32_t g_reactor_count;
33 static struct spdk_cpuset g_reactor_core_mask;
34 static enum spdk_reactor_state	g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;
35 
36 static bool g_framework_context_switch_monitor_enabled = true;
37 
38 static struct spdk_mempool *g_spdk_event_mempool = NULL;
39 
40 TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
41 	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);
42 
43 static struct spdk_scheduler *g_scheduler = NULL;
44 static struct spdk_reactor *g_scheduling_reactor;
45 bool g_scheduling_in_progress = false;
46 static uint64_t g_scheduler_period = 0;
47 static uint32_t g_scheduler_core_number;
48 static struct spdk_scheduler_core_info *g_core_infos = NULL;
49 
50 TAILQ_HEAD(, spdk_governor) g_governor_list
51 	= TAILQ_HEAD_INITIALIZER(g_governor_list);
52 
53 static struct spdk_governor *g_governor = NULL;
54 
55 static int reactor_interrupt_init(struct spdk_reactor *reactor);
56 static void reactor_interrupt_fini(struct spdk_reactor *reactor);
57 
58 static pthread_mutex_t g_stopping_reactors_mtx = PTHREAD_MUTEX_INITIALIZER;
59 static bool g_stopping_reactors = false;
60 
61 static struct spdk_scheduler *
62 _scheduler_find(const char *name)
63 {
64 	struct spdk_scheduler *tmp;
65 
66 	TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
67 		if (strcmp(name, tmp->name) == 0) {
68 			return tmp;
69 		}
70 	}
71 
72 	return NULL;
73 }
74 
75 int
76 spdk_scheduler_set(const char *name)
77 {
78 	struct spdk_scheduler *scheduler;
79 	int rc = 0;
80 
81 	/* NULL scheduler was specifically requested */
82 	if (name == NULL) {
83 		if (g_scheduler) {
84 			g_scheduler->deinit();
85 		}
86 		g_scheduler = NULL;
87 		return 0;
88 	}
89 
90 	scheduler = _scheduler_find(name);
91 	if (scheduler == NULL) {
92 		SPDK_ERRLOG("Requested scheduler is missing\n");
93 		return -EINVAL;
94 	}
95 
96 	if (g_scheduler == scheduler) {
97 		return 0;
98 	}
99 
100 	rc = scheduler->init();
101 	if (rc == 0) {
102 		if (g_scheduler) {
103 			g_scheduler->deinit();
104 		}
105 		g_scheduler = scheduler;
106 	}
107 
108 	return rc;
109 }
110 
111 struct spdk_scheduler *
112 spdk_scheduler_get(void)
113 {
114 	return g_scheduler;
115 }
116 
117 uint64_t
118 spdk_scheduler_get_period(void)
119 {
120 	/* Convert from ticks to microseconds */
121 	return (g_scheduler_period * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
122 }
123 
124 void
125 spdk_scheduler_set_period(uint64_t period)
126 {
127 	/* Convert microseconds to ticks */
128 	g_scheduler_period = period * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
129 }
130 
131 void
132 spdk_scheduler_register(struct spdk_scheduler *scheduler)
133 {
134 	if (_scheduler_find(scheduler->name)) {
135 		SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
136 		assert(false);
137 		return;
138 	}
139 
140 	TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
141 }
142 
143 static void
144 reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
145 {
146 	reactor->lcore = lcore;
147 	reactor->flags.is_valid = true;
148 
149 	TAILQ_INIT(&reactor->threads);
150 	reactor->thread_count = 0;
151 	spdk_cpuset_zero(&reactor->notify_cpuset);
152 
153 	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
154 	if (reactor->events == NULL) {
155 		SPDK_ERRLOG("Failed to allocate events ring\n");
156 		assert(false);
157 	}
158 
159 	/* Always initialize interrupt facilities for reactor */
160 	if (reactor_interrupt_init(reactor) != 0) {
161 		/* Reactor interrupt facilities are necessary if seting app to interrupt mode. */
162 		if (spdk_interrupt_mode_is_enabled()) {
163 			SPDK_ERRLOG("Failed to prepare intr facilities\n");
164 			assert(false);
165 		}
166 		return;
167 	}
168 
169 	/* If application runs with full interrupt ability,
170 	 * all reactors are going to run in interrupt mode.
171 	 */
172 	if (spdk_interrupt_mode_is_enabled()) {
173 		uint32_t i;
174 
175 		SPDK_ENV_FOREACH_CORE(i) {
176 			spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true);
177 		}
178 		reactor->in_interrupt = true;
179 	}
180 }
181 
182 struct spdk_reactor *
183 spdk_reactor_get(uint32_t lcore)
184 {
185 	struct spdk_reactor *reactor;
186 
187 	if (g_reactors == NULL) {
188 		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
189 		return NULL;
190 	}
191 
192 	if (lcore >= g_reactor_count) {
193 		return NULL;
194 	}
195 
196 	reactor = &g_reactors[lcore];
197 
198 	if (reactor->flags.is_valid == false) {
199 		return NULL;
200 	}
201 
202 	return reactor;
203 }
204 
205 static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
206 static bool reactor_thread_op_supported(enum spdk_thread_op op);
207 
208 int
209 spdk_reactors_init(size_t msg_mempool_size)
210 {
211 	struct spdk_reactor *reactor;
212 	int rc;
213 	uint32_t i, current_core;
214 	char mempool_name[32];
215 
216 	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
217 	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
218 			       262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
219 			       sizeof(struct spdk_event),
220 			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
221 			       SPDK_ENV_SOCKET_ID_ANY);
222 
223 	if (g_spdk_event_mempool == NULL) {
224 		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
225 		return -1;
226 	}
227 
228 	/* struct spdk_reactor must be aligned on 64 byte boundary */
229 	g_reactor_count = spdk_env_get_last_core() + 1;
230 	rc = posix_memalign((void **)&g_reactors, 64,
231 			    g_reactor_count * sizeof(struct spdk_reactor));
232 	if (rc != 0) {
233 		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
234 			    g_reactor_count);
235 		spdk_mempool_free(g_spdk_event_mempool);
236 		return -1;
237 	}
238 
239 	g_core_infos = calloc(g_reactor_count, sizeof(*g_core_infos));
240 	if (g_core_infos == NULL) {
241 		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
242 		spdk_mempool_free(g_spdk_event_mempool);
243 		free(g_reactors);
244 		return -ENOMEM;
245 	}
246 
247 	memset(g_reactors, 0, (g_reactor_count) * sizeof(struct spdk_reactor));
248 
249 	rc = spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
250 				      sizeof(struct spdk_lw_thread), msg_mempool_size);
251 	if (rc != 0) {
252 		SPDK_ERRLOG("Initialize spdk thread lib failed\n");
253 		spdk_mempool_free(g_spdk_event_mempool);
254 		free(g_reactors);
255 		free(g_core_infos);
256 		return rc;
257 	}
258 
259 	SPDK_ENV_FOREACH_CORE(i) {
260 		reactor_construct(&g_reactors[i], i);
261 	}
262 
263 	current_core = spdk_env_get_current_core();
264 	reactor = spdk_reactor_get(current_core);
265 	assert(reactor != NULL);
266 	g_scheduling_reactor = reactor;
267 
268 	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
269 
270 	return 0;
271 }
272 
/*
 * Tear down everything spdk_reactors_init() set up: the thread library,
 * each reactor's event ring and interrupt facilities, any leftover per-core
 * scheduler snapshots, the event mempool and the global arrays.
 */
void
spdk_reactors_fini(void)
{
	uint32_t i;
	struct spdk_reactor *reactor;

	/* Nothing to tear down if init never ran (or failed before completing). */
	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
		return;
	}

	spdk_thread_lib_fini();

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		/* All lightweight threads must already be gone by this point. */
		assert(reactor->thread_count == 0);
		if (reactor->events != NULL) {
			spdk_ring_free(reactor->events);
		}

		reactor_interrupt_fini(reactor);

		if (g_core_infos != NULL) {
			/* Free any thread snapshot left over from a scheduling round. */
			free(g_core_infos[i].thread_infos);
		}
	}

	spdk_mempool_free(g_spdk_event_mempool);

	free(g_reactors);
	g_reactors = NULL;
	free(g_core_infos);
	g_core_infos = NULL;
}
307 
308 static void _reactor_set_interrupt_mode(void *arg1, void *arg2);
309 
310 static void
311 _reactor_set_notify_cpuset(void *arg1, void *arg2)
312 {
313 	struct spdk_reactor *target = arg1;
314 	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());
315 
316 	assert(reactor != NULL);
317 	spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt);
318 }
319 
320 static void
321 _event_call(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
322 {
323 	struct spdk_event *ev;
324 
325 	ev = spdk_event_allocate(lcore, fn, arg1, arg2);
326 	assert(ev);
327 	spdk_event_call(ev);
328 }
329 
330 static void
331 _reactor_set_notify_cpuset_cpl(void *arg1, void *arg2)
332 {
333 	struct spdk_reactor *target = arg1;
334 
335 	if (target->new_in_interrupt == false) {
336 		target->set_interrupt_mode_in_progress = false;
337 		spdk_thread_send_msg(spdk_thread_get_app_thread(), target->set_interrupt_mode_cb_fn,
338 				     target->set_interrupt_mode_cb_arg);
339 	} else {
340 		_event_call(target->lcore, _reactor_set_interrupt_mode, target, NULL);
341 	}
342 }
343 
344 static void
345 _reactor_set_thread_interrupt_mode(void *ctx)
346 {
347 	struct spdk_reactor *reactor = ctx;
348 
349 	spdk_thread_set_interrupt_mode(reactor->in_interrupt);
350 }
351 
/*
 * Event handler that runs on the target reactor itself and flips it between
 * poll and interrupt mode. 'arg1' is the target reactor; the requested mode
 * was stored in target->new_in_interrupt by spdk_reactor_set_interrupt_mode().
 */
static void
_reactor_set_interrupt_mode(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread, *tmp;

	/* Must execute on the target reactor's own core. */
	assert(target == spdk_reactor_get(spdk_env_get_current_core()));
	assert(target != NULL);
	assert(target->in_interrupt != target->new_in_interrupt);
	SPDK_DEBUGLOG(reactor, "Do reactor set on core %u from %s to state %s\n",
		      target->lcore, target->in_interrupt ? "intr" : "poll", target->new_in_interrupt ? "intr" : "poll");

	target->in_interrupt = target->new_in_interrupt;

	if (spdk_interrupt_mode_is_enabled()) {
		/* Align spdk_thread with reactor to interrupt mode or poll mode */
		TAILQ_FOREACH_SAFE(lw_thread, &target->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, target);
		}
	}

	if (target->new_in_interrupt == false) {
		/* Reactor is no longer in interrupt mode. Refresh the tsc_last to accurately
		 * track reactor stats. */
		target->tsc_last = spdk_get_ticks();
		/* Now clear this reactor's bit in every reactor's notify_cpuset;
		 * the completion callback reports back to the app thread. */
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	} else {
		uint64_t notify = 1;
		int rc = 0;

		/* Always trigger spdk_event and resched event in case of race condition */
		rc = write(target->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
		rc = write(target->resched_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}

		/* Transition to interrupt mode is complete; notify the app thread. */
		target->set_interrupt_mode_in_progress = false;
		spdk_thread_send_msg(spdk_thread_get_app_thread(), target->set_interrupt_mode_cb_fn,
				     target->set_interrupt_mode_cb_arg);
	}
}
399 
/*
 * Request that the reactor on 'lcore' switch to interrupt mode
 * (new_in_interrupt == true) or poll mode (false). Must be called from the
 * app thread; 'cb_fn(cb_arg)' is invoked on the app thread once the switch
 * completes (immediately if the reactor is already in the requested mode).
 *
 * Returns 0 on success, -EINVAL for an unknown core, -ENOTSUP if the reactor
 * has no fd group (eventfd unsupported), -EPERM if not on the app thread, or
 * -EBUSY if a mode switch is already in flight for that reactor.
 */
int
spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt,
				spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg)
{
	struct spdk_reactor *target;

	target = spdk_reactor_get(lcore);
	if (target == NULL) {
		return -EINVAL;
	}

	/* Eventfd has to be supported in order to use interrupt functionality. */
	if (target->fgrp == NULL) {
		return -ENOTSUP;
	}

	if (spdk_get_thread() != spdk_thread_get_app_thread()) {
		SPDK_ERRLOG("It is only permitted within spdk application thread.\n");
		return -EPERM;
	}

	if (target->in_interrupt == new_in_interrupt) {
		/* Already in the requested mode - complete immediately. */
		cb_fn(cb_arg);
		return 0;
	}

	if (target->set_interrupt_mode_in_progress) {
		SPDK_NOTICELOG("Reactor(%u) is already in progress to set interrupt mode\n", lcore);
		return -EBUSY;
	}
	target->set_interrupt_mode_in_progress = true;

	target->new_in_interrupt = new_in_interrupt;
	target->set_interrupt_mode_cb_fn = cb_fn;
	target->set_interrupt_mode_cb_arg = cb_arg;

	SPDK_DEBUGLOG(reactor, "Starting reactor event from %d to %d\n",
		      spdk_env_get_current_core(), lcore);

	if (new_in_interrupt == false) {
		/* For potential race cases, when setting the reactor to poll mode,
		 * first change the mode of the reactor and then clear the corresponding
		 * bit of the notify_cpuset of each reactor.
		 */
		_event_call(lcore, _reactor_set_interrupt_mode, target, NULL);
	} else {
		/* For race cases, when setting the reactor to interrupt mode, first set the
		 * corresponding bit of the notify_cpuset of each reactor and then change the mode.
		 */
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	}

	return 0;
}
454 
455 struct spdk_event *
456 spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
457 {
458 	struct spdk_event *event = NULL;
459 	struct spdk_reactor *reactor = spdk_reactor_get(lcore);
460 
461 	if (!reactor) {
462 		assert(false);
463 		return NULL;
464 	}
465 
466 	event = spdk_mempool_get(g_spdk_event_mempool);
467 	if (event == NULL) {
468 		assert(false);
469 		return NULL;
470 	}
471 
472 	event->lcore = lcore;
473 	event->fn = fn;
474 	event->arg1 = arg1;
475 	event->arg2 = arg2;
476 
477 	return event;
478 }
479 
/*
 * Post 'event' to its destination reactor's event ring, then kick the
 * destination's eventfd if it may be sleeping in interrupt mode. The event
 * must have been obtained from spdk_event_allocate(); ownership passes to
 * the destination reactor.
 */
void
spdk_event_call(struct spdk_event *event)
{
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_core = spdk_env_get_current_core();

	reactor = spdk_reactor_get(event->lcore);

	assert(reactor != NULL);
	assert(reactor->events != NULL);

	/* Enqueue before notifying so the consumer always finds the event. */
	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
	if (rc != 1) {
		assert(false);
	}

	if (current_core != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_core);
	}

	/* If spdk_event_call isn't called on a reactor, always send a notification.
	 * If it is called on a reactor, send a notification if the destination reactor
	 * is indicated in interrupt mode state.
	 */
	if (spdk_unlikely(local_reactor == NULL) ||
	    spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) {
		uint64_t notify = 1;

		rc = write(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
	}
}
516 
/*
 * Dequeue up to SPDK_EVENT_BATCH_SIZE events from this reactor's ring and
 * execute them, setting the first lightweight thread (if any) as the current
 * SPDK thread while each callback runs. In interrupt mode the eventfd is
 * acknowledged before dequeuing and re-armed if events remain, so no
 * notification can be lost. Returns the number of events executed, 0 if the
 * ring was empty, or -errno on eventfd read/write failure.
 */
static inline int
event_queue_run_batch(void *arg)
{
	struct spdk_reactor *reactor = arg;
	size_t count, i;
	void *events[SPDK_EVENT_BATCH_SIZE];
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(events, 0, sizeof(events));
#endif

	/* Operate event notification if this reactor currently runs in interrupt state */
	if (spdk_unlikely(reactor->in_interrupt)) {
		uint64_t notify = 1;
		int rc;

		/* There may be race between event_acknowledge and another producer's event_notify,
		 * so event_acknowledge should be applied ahead. And then check for self's event_notify.
		 * This can avoid event notification missing.
		 */
		rc = read(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno));
			return -errno;
		}

		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);

		if (spdk_ring_count(reactor->events) != 0) {
			/* Trigger new notification if there are still events in event-queue waiting for processing. */
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
				return -errno;
			}
		}
	} else {
		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
	}

	if (count == 0) {
		return 0;
	}

	/* Execute the events. There are still some remaining events
	 * that must occur on an SPDK thread. To accommodate those, try to
	 * run them on the first thread in the list, if it exists. */
	lw_thread = TAILQ_FIRST(&reactor->threads);
	if (lw_thread) {
		thread = spdk_thread_get_from_ctx(lw_thread);
	} else {
		thread = NULL;
	}

	for (i = 0; i < count; i++) {
		struct spdk_event *event = events[i];

		assert(event != NULL);
		spdk_set_thread(thread);

		SPDK_DTRACE_PROBE3(event_exec, event->fn,
				   event->arg1, event->arg2);
		event->fn(event->arg1, event->arg2);
		spdk_set_thread(NULL);
	}

	/* Return the whole batch to the mempool in one call. */
	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);

	return (int)count;
}
594 
595 /* 1s */
596 #define CONTEXT_SWITCH_MONITOR_PERIOD 1000000
597 
598 static int
599 get_rusage(struct spdk_reactor *reactor)
600 {
601 	struct rusage		rusage;
602 
603 	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
604 		return -1;
605 	}
606 
607 	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
608 		SPDK_INFOLOG(reactor,
609 			     "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
610 			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
611 			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
612 	}
613 	reactor->rusage = rusage;
614 
615 	return -1;
616 }
617 
618 void
619 spdk_framework_enable_context_switch_monitor(bool enable)
620 {
621 	/* This global is being read by multiple threads, so this isn't
622 	 * strictly thread safe. However, we're toggling between true and
623 	 * false here, and if a thread sees the value update later than it
624 	 * should, it's no big deal. */
625 	g_framework_context_switch_monitor_enabled = enable;
626 }
627 
628 bool
629 spdk_framework_context_switch_monitor_enabled(void)
630 {
631 	return g_framework_context_switch_monitor_enabled;
632 }
633 
/* Rename the calling POSIX thread using whichever API the platform provides:
 * prctl() on Linux, pthread_set_name_np() on FreeBSD, pthread_setname_np()
 * elsewhere. */
static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
	pthread_setname_np(pthread_self(), thread_name);
#endif
}
645 
646 static void
647 _init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
648 {
649 	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
650 	struct spdk_thread_stats prev_total_stats;
651 
652 	/* Read total_stats before updating it to calculate stats during the last scheduling period. */
653 	prev_total_stats = lw_thread->total_stats;
654 
655 	spdk_set_thread(thread);
656 	spdk_thread_get_stats(&lw_thread->total_stats);
657 	spdk_set_thread(NULL);
658 
659 	lw_thread->current_stats.busy_tsc = lw_thread->total_stats.busy_tsc - prev_total_stats.busy_tsc;
660 	lw_thread->current_stats.idle_tsc = lw_thread->total_stats.idle_tsc - prev_total_stats.idle_tsc;
661 }
662 
663 static void
664 _threads_reschedule_thread(struct spdk_scheduler_thread_info *thread_info)
665 {
666 	struct spdk_lw_thread *lw_thread;
667 	struct spdk_thread *thread;
668 
669 	thread = spdk_thread_get_by_id(thread_info->thread_id);
670 	if (thread == NULL) {
671 		/* Thread no longer exists. */
672 		return;
673 	}
674 	lw_thread = spdk_thread_get_ctx(thread);
675 	assert(lw_thread != NULL);
676 
677 	lw_thread->lcore = thread_info->lcore;
678 	lw_thread->resched = true;
679 }
680 
681 static void
682 _threads_reschedule(struct spdk_scheduler_core_info *cores_info)
683 {
684 	struct spdk_scheduler_core_info *core;
685 	struct spdk_scheduler_thread_info *thread_info;
686 	uint32_t i, j;
687 
688 	SPDK_ENV_FOREACH_CORE(i) {
689 		core = &cores_info[i];
690 		for (j = 0; j < core->threads_count; j++) {
691 			thread_info = &core->thread_infos[j];
692 			if (thread_info->lcore != i) {
693 				_threads_reschedule_thread(thread_info);
694 			}
695 		}
696 		core->threads_count = 0;
697 		free(core->thread_infos);
698 		core->thread_infos = NULL;
699 	}
700 }
701 
702 static void
703 _reactors_scheduler_fini(void)
704 {
705 	/* Reschedule based on the balancing output */
706 	_threads_reschedule(g_core_infos);
707 
708 	g_scheduling_in_progress = false;
709 }
710 
/*
 * Apply the interrupt/poll mode the balancer chose for each core. The walk
 * restarts from g_scheduler_core_number each time: whenever one core needs a
 * mode switch, this function returns and is re-invoked as that switch's
 * completion callback, continuing from the next core. When no more switches
 * are needed the round finishes via _reactors_scheduler_fini().
 */
static void
_reactors_scheduler_update_core_mode(void *ctx)
{
	struct spdk_reactor *reactor;
	uint32_t i;
	int rc = 0;

	for (i = g_scheduler_core_number; i < SPDK_ENV_LCORE_ID_ANY; i = spdk_env_get_next_core(i)) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		if (reactor->in_interrupt != g_core_infos[i].interrupt_mode) {
			/* Switch next found reactor to new state */
			rc = spdk_reactor_set_interrupt_mode(i, g_core_infos[i].interrupt_mode,
							     _reactors_scheduler_update_core_mode, NULL);
			if (rc == 0) {
				/* Set core to start with after callback completes */
				g_scheduler_core_number = spdk_env_get_next_core(i);
				return;
			}
		}
	}
	_reactors_scheduler_fini();
}
734 
735 static void
736 _reactors_scheduler_cancel(void *arg1, void *arg2)
737 {
738 	struct spdk_scheduler_core_info *core;
739 	uint32_t i;
740 
741 	SPDK_ENV_FOREACH_CORE(i) {
742 		core = &g_core_infos[i];
743 		core->threads_count = 0;
744 		free(core->thread_infos);
745 		core->thread_infos = NULL;
746 	}
747 
748 	g_scheduling_in_progress = false;
749 }
750 
751 static void
752 _reactors_scheduler_balance(void *arg1, void *arg2)
753 {
754 	struct spdk_scheduler *scheduler = spdk_scheduler_get();
755 
756 	if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING || scheduler == NULL) {
757 		_reactors_scheduler_cancel(NULL, NULL);
758 		return;
759 	}
760 
761 	scheduler->balance(g_core_infos, g_reactor_count);
762 
763 	g_scheduler_core_number = spdk_env_get_first_core();
764 	_reactors_scheduler_update_core_mode(NULL);
765 }
766 
/* Phase 1 of thread scheduling is to gather metrics on the existing threads.
 * This runs as an event that hops from core to core: each invocation records
 * the local reactor's busy/idle deltas and a snapshot of its threads into
 * g_core_infos, then forwards itself to the next core. Once the walk wraps
 * back to the scheduling reactor, phase 2 (balancing) is kicked off there. */
static void
_reactors_scheduler_gather_metrics(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core_info;
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread *thread;
	struct spdk_reactor *reactor;
	uint32_t next_core;
	uint32_t i = 0;

	reactor = spdk_reactor_get(spdk_env_get_current_core());
	assert(reactor != NULL);
	core_info = &g_core_infos[reactor->lcore];
	core_info->lcore = reactor->lcore;
	/* Deltas since the previous round, then refresh the stored totals. */
	core_info->current_idle_tsc = reactor->idle_tsc - core_info->total_idle_tsc;
	core_info->total_idle_tsc = reactor->idle_tsc;
	core_info->current_busy_tsc = reactor->busy_tsc - core_info->total_busy_tsc;
	core_info->total_busy_tsc = reactor->busy_tsc;
	core_info->interrupt_mode = reactor->in_interrupt;
	core_info->threads_count = 0;

	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);

	if (reactor->thread_count > 0) {
		core_info->thread_infos = calloc(reactor->thread_count, sizeof(*core_info->thread_infos));
		if (core_info->thread_infos == NULL) {
			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);

			/* Cancel this round of schedule work */
			_event_call(g_scheduling_reactor->lcore, _reactors_scheduler_cancel, NULL, NULL);
			return;
		}

		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
			_init_thread_stats(reactor, lw_thread);

			core_info->thread_infos[i].lcore = lw_thread->lcore;
			thread = spdk_thread_get_from_ctx(lw_thread);
			assert(thread != NULL);
			core_info->thread_infos[i].thread_id = spdk_thread_get_id(thread);
			core_info->thread_infos[i].total_stats = lw_thread->total_stats;
			core_info->thread_infos[i].current_stats = lw_thread->current_stats;
			core_info->threads_count++;
			assert(core_info->threads_count <= reactor->thread_count);
			i++;
		}
	}

	next_core = spdk_env_get_next_core(reactor->lcore);
	if (next_core == UINT32_MAX) {
		/* Wrap from the last core back to the first. */
		next_core = spdk_env_get_first_core();
	}

	/* If we've looped back around to the scheduler thread, move to the next phase */
	if (next_core == g_scheduling_reactor->lcore) {
		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
		_event_call(next_core, _reactors_scheduler_balance, NULL, NULL);
		return;
	}

	_event_call(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
}
830 
831 static int _reactor_schedule_thread(struct spdk_thread *thread);
832 static uint64_t g_rusage_period;
833 
834 static void
835 _reactor_remove_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
836 {
837 	struct spdk_thread	*thread = spdk_thread_get_from_ctx(lw_thread);
838 	int efd;
839 
840 	TAILQ_REMOVE(&reactor->threads, lw_thread, link);
841 	assert(reactor->thread_count > 0);
842 	reactor->thread_count--;
843 
844 	/* Operate thread intr if running with full interrupt ability */
845 	if (spdk_interrupt_mode_is_enabled()) {
846 		efd = spdk_thread_get_interrupt_fd(thread);
847 		spdk_fd_group_remove(reactor->fgrp, efd);
848 	}
849 }
850 
851 static bool
852 reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
853 {
854 	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
855 
856 	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
857 			  spdk_thread_is_idle(thread))) {
858 		_reactor_remove_lw_thread(reactor, lw_thread);
859 		spdk_thread_destroy(thread);
860 		return true;
861 	}
862 
863 	if (spdk_unlikely(lw_thread->resched)) {
864 		lw_thread->resched = false;
865 		_reactor_remove_lw_thread(reactor, lw_thread);
866 		_reactor_schedule_thread(thread);
867 		return true;
868 	}
869 
870 	return false;
871 }
872 
873 static void
874 reactor_interrupt_run(struct spdk_reactor *reactor)
875 {
876 	int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */
877 
878 	spdk_fd_group_wait(reactor->fgrp, block_timeout);
879 }
880 
/*
 * One poll-mode iteration: drain a batch of events, then poll every
 * lightweight thread once, attributing the elapsed TSC to busy or idle time
 * and post-processing each thread (destroy/migrate) afterwards.
 */
static void
_reactor_run(struct spdk_reactor *reactor)
{
	struct spdk_thread	*thread;
	struct spdk_lw_thread	*lw_thread, *tmp;
	uint64_t		now;
	int			rc;

	event_queue_run_batch(reactor);

	/* If no threads are present on the reactor,
	 * tsc_last gets outdated. Update it to track
	 * thread execution time correctly. */
	if (spdk_unlikely(TAILQ_EMPTY(&reactor->threads))) {
		now = spdk_get_ticks();
		reactor->idle_tsc += now - reactor->tsc_last;
		reactor->tsc_last = now;
		return;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);

		/* rc == 0: nothing ran (idle); rc > 0: work was done (busy). */
		now = spdk_thread_get_last_tsc(thread);
		if (rc == 0) {
			reactor->idle_tsc += now - reactor->tsc_last;
		} else if (rc > 0) {
			reactor->busy_tsc += now - reactor->tsc_last;
		}
		reactor->tsc_last = now;

		reactor_post_process_lw_thread(reactor, lw_thread);
	}
}
916 
/*
 * Main loop for one reactor, pinned to its core. Runs either the interrupt
 * or poll path each iteration, samples rusage periodically, and (on the
 * scheduling reactor only) kicks off scheduling rounds. On shutdown it asks
 * every remaining thread to exit, then polls until all threads have exited
 * and been destroyed. Always returns 0.
 */
static int
reactor_run(void *arg)
{
	struct spdk_reactor	*reactor = arg;
	struct spdk_thread	*thread;
	struct spdk_lw_thread	*lw_thread, *tmp;
	char			thread_name[32];
	uint64_t		last_sched = 0;

	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);

	/* Rename the POSIX thread because the reactor is tied to the POSIX
	 * thread in the SPDK event library.
	 */
	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
	_set_thread_name(thread_name);

	reactor->tsc_last = spdk_get_ticks();

	while (1) {
		/* Execute interrupt process fn if this reactor currently runs in interrupt state */
		if (spdk_unlikely(reactor->in_interrupt)) {
			reactor_interrupt_run(reactor);
		} else {
			_reactor_run(reactor);
		}

		/* Sample context-switch counters once per g_rusage_period. */
		if (g_framework_context_switch_monitor_enabled) {
			if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
				get_rusage(reactor);
				reactor->last_rusage = reactor->tsc_last;
			}
		}

		/* Only the scheduling reactor starts a round, and only one
		 * round may be in flight at a time. */
		if (spdk_unlikely(g_scheduler_period > 0 &&
				  (reactor->tsc_last - last_sched) > g_scheduler_period &&
				  reactor == g_scheduling_reactor &&
				  !g_scheduling_in_progress)) {
			last_sched = reactor->tsc_last;
			g_scheduling_in_progress = true;
			_reactors_scheduler_gather_metrics(NULL, NULL);
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}
	}

	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		/* All threads should have already had spdk_thread_exit() called on them, except
		 * for the app thread.
		 */
		if (spdk_thread_is_running(thread)) {
			if (thread != spdk_thread_get_app_thread()) {
				SPDK_ERRLOG("spdk_thread_exit() was not called on thread '%s'\n",
					    spdk_thread_get_name(thread));
				SPDK_ERRLOG("This will result in a non-zero exit code in a future release.\n");
			}
			spdk_set_thread(thread);
			spdk_thread_exit(thread);
		}
	}

	/* Keep polling until every thread has finished exiting and been destroyed. */
	while (!TAILQ_EMPTY(&reactor->threads)) {
		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			spdk_set_thread(thread);
			if (spdk_thread_is_exited(thread)) {
				_reactor_remove_lw_thread(reactor, lw_thread);
				spdk_thread_destroy(thread);
			} else {
				spdk_thread_poll(thread, 0, 0);
			}
		}
	}

	return 0;
}
996 
/* Parse a core-mask string into 'cpumask' and intersect it with the cores
 * the application actually owns. Returns 0 on success or the negative error
 * from spdk_cpuset_parse(). */
int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int rc;

	rc = spdk_cpuset_parse(cpumask, mask);
	if (rc < 0) {
		return rc;
	}

	/* Restrict the parsed mask to cores the app was started with. */
	spdk_cpuset_and(cpumask, spdk_app_get_core_mask());

	return 0;
}
1013 
1014 const struct spdk_cpuset *
1015 spdk_app_get_core_mask(void)
1016 {
1017 	return &g_reactor_core_mask;
1018 }
1019 
/*
 * Launch one reactor per core in the app's core mask: pinned env threads for
 * every secondary core, then reactor_run() directly on the calling (main)
 * core. Does not return until all reactors have stopped; the final state is
 * SPDK_REACTOR_STATE_SHUTDOWN.
 */
void
spdk_reactors_start(void)
{
	struct spdk_reactor *reactor;
	uint32_t i, current_core;
	int rc;

	/* Convert the rusage sampling period from microseconds to ticks. */
	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
	/* Reinitialize to false, in case the app framework is restarting in the same process. */
	g_stopping_reactors = false;

	current_core = spdk_env_get_current_core();
	SPDK_ENV_FOREACH_CORE(i) {
		if (i != current_core) {
			reactor = spdk_reactor_get(i);
			if (reactor == NULL) {
				continue;
			}

			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
			if (rc < 0) {
				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
				assert(false);
				return;
			}
		}
		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
	}

	/* Start the main reactor */
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	reactor_run(reactor);

	/* Block until every secondary reactor thread has returned. */
	spdk_env_thread_wait_all();

	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
}
1059 
1060 static void
1061 _reactors_stop(void *arg1, void *arg2)
1062 {
1063 	uint32_t i;
1064 	int rc;
1065 	struct spdk_reactor *reactor;
1066 	struct spdk_reactor *local_reactor;
1067 	uint64_t notify = 1;
1068 
1069 	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
1070 	local_reactor = spdk_reactor_get(spdk_env_get_current_core());
1071 
1072 	SPDK_ENV_FOREACH_CORE(i) {
1073 		/* If spdk_event_call isn't called  on a reactor, always send a notification.
1074 		 * If it is called on a reactor, send a notification if the destination reactor
1075 		 * is indicated in interrupt mode state.
1076 		 */
1077 		if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) {
1078 			reactor = spdk_reactor_get(i);
1079 			assert(reactor != NULL);
1080 			rc = write(reactor->events_fd, &notify, sizeof(notify));
1081 			if (rc < 0) {
1082 				SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno));
1083 				continue;
1084 			}
1085 		}
1086 	}
1087 }
1088 
/* Intentionally empty per-reactor callback; used by spdk_reactors_stop() to
 * flush outstanding for_each_reactor operations before shutdown. */
static void
nop(void *arg1, void *arg2)
{
}
1093 
/*
 * Initiate framework shutdown: run one final for_each_reactor pass with a
 * no-op per-reactor function, whose completion callback (_reactors_stop)
 * moves the reactors into the EXITING state.
 */
void
spdk_reactors_stop(void *arg1)
{
	spdk_for_each_reactor(nop, NULL, NULL, _reactors_stop);
}
1099 
/* Protects the round-robin placement state (g_next_core) below. */
static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
/* Next candidate core for round-robin thread placement; UINT32_MAX makes the
 * first use reset it to the first configured core. */
static uint32_t g_next_core = UINT32_MAX;
1102 
1103 static int
1104 thread_process_interrupts(void *arg)
1105 {
1106 	struct spdk_thread *thread = arg;
1107 	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());
1108 	uint64_t now;
1109 	int rc;
1110 
1111 	assert(reactor != NULL);
1112 
1113 	/* Update idle_tsc between the end of last intr_fn and the start of this intr_fn. */
1114 	now = spdk_get_ticks();
1115 	reactor->idle_tsc += now - reactor->tsc_last;
1116 	reactor->tsc_last = now;
1117 
1118 	rc = spdk_thread_poll(thread, 0, now);
1119 
1120 	/* Update tsc between the start and the end of this intr_fn. */
1121 	now = spdk_thread_get_last_tsc(thread);
1122 	if (rc == 0) {
1123 		reactor->idle_tsc += now - reactor->tsc_last;
1124 	} else if (rc > 0) {
1125 		reactor->busy_tsc += now - reactor->tsc_last;
1126 	}
1127 	reactor->tsc_last = now;
1128 
1129 	return rc;
1130 }
1131 
1132 static void
1133 _schedule_thread(void *arg1, void *arg2)
1134 {
1135 	struct spdk_lw_thread *lw_thread = arg1;
1136 	struct spdk_thread *thread;
1137 	struct spdk_reactor *reactor;
1138 	uint32_t current_core;
1139 	int efd;
1140 
1141 	current_core = spdk_env_get_current_core();
1142 	reactor = spdk_reactor_get(current_core);
1143 	assert(reactor != NULL);
1144 
1145 	/* Update total_stats to reflect state of thread
1146 	* at the end of the move. */
1147 	thread = spdk_thread_get_from_ctx(lw_thread);
1148 	spdk_set_thread(thread);
1149 	spdk_thread_get_stats(&lw_thread->total_stats);
1150 	spdk_set_thread(NULL);
1151 
1152 	lw_thread->lcore = current_core;
1153 
1154 	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
1155 	reactor->thread_count++;
1156 
1157 	/* Operate thread intr if running with full interrupt ability */
1158 	if (spdk_interrupt_mode_is_enabled()) {
1159 		int rc;
1160 
1161 		efd = spdk_thread_get_interrupt_fd(thread);
1162 		rc = SPDK_FD_GROUP_ADD(reactor->fgrp, efd,
1163 				       thread_process_interrupts, thread);
1164 		if (rc < 0) {
1165 			SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
1166 		}
1167 
1168 		/* Align spdk_thread with reactor to interrupt mode or poll mode */
1169 		spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, reactor);
1170 	}
1171 }
1172 
/*
 * Pick a core for a lightweight thread and dispatch a _schedule_thread event
 * to it.  The thread's cpumask constrains the choice; when no specific core
 * is requested, placement is round-robin across the configured cores under
 * g_scheduler_mtx.  Returns 0 on success, -1 if no event could be allocated.
 */
static int
_reactor_schedule_thread(struct spdk_thread *thread)
{
	uint32_t core;
	struct spdk_lw_thread *lw_thread;
	struct spdk_event *evt = NULL;
	struct spdk_cpuset *cpumask;
	uint32_t i;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_lcore = spdk_env_get_current_core();
	struct spdk_cpuset polling_cpumask;
	struct spdk_cpuset valid_cpumask;

	cpumask = spdk_thread_get_cpumask(thread);

	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);
	/* Remember the requested core, then wipe the context clean; it is
	 * repopulated by _schedule_thread on the destination reactor. */
	core = lw_thread->lcore;
	memset(lw_thread, 0, sizeof(*lw_thread));

	/* current_lcore is SPDK_ENV_LCORE_ID_ANY when called from a
	 * non-reactor (non-DPDK) thread, in which case there is no local
	 * reactor to consult for interrupt-mode state. */
	if (current_lcore != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_lcore);
		assert(local_reactor);
	}

	/* When interrupt ability of spdk_thread is not enabled and the current
	 * reactor runs on DPDK thread, skip reactors which are in interrupt mode.
	 */
	if (!spdk_interrupt_mode_is_enabled() && local_reactor != NULL) {
		/* Get the cpumask of all reactors in polling */
		spdk_cpuset_zero(&polling_cpumask);
		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&polling_cpumask, i, true);
		}
		/* notify_cpuset marks interrupt-mode reactors; XOR clears them,
		 * leaving only the polling reactors set. */
		spdk_cpuset_xor(&polling_cpumask, &local_reactor->notify_cpuset);

		if (core == SPDK_ENV_LCORE_ID_ANY) {
			/* Get the cpumask of all valid reactors which are suggested and also in polling */
			spdk_cpuset_copy(&valid_cpumask, &polling_cpumask);
			spdk_cpuset_and(&valid_cpumask, spdk_thread_get_cpumask(thread));

			/* If there are any valid reactors, spdk_thread should be scheduled
			 * into one of the valid reactors.
			 * If there is no valid reactors, spdk_thread should be scheduled
			 * into one of the polling reactors.
			 */
			if (spdk_cpuset_count(&valid_cpumask) != 0) {
				cpumask = &valid_cpumask;
			} else {
				cpumask = &polling_cpumask;
			}
		} else if (!spdk_cpuset_get_cpu(&polling_cpumask, core)) {
			/* If specified reactor is not in polling, spdk_thread should be scheduled
			 * into one of the polling reactors.
			 */
			core = SPDK_ENV_LCORE_ID_ANY;
			cpumask = &polling_cpumask;
		}
	}

	pthread_mutex_lock(&g_scheduler_mtx);
	if (core == SPDK_ENV_LCORE_ID_ANY) {
		/* Round-robin over the configured cores, taking the first one
		 * that is present in cpumask.  NOTE(review): if no core in the
		 * mask matches, the last candidate is used as-is. */
		for (i = 0; i < spdk_env_get_core_count(); i++) {
			if (g_next_core >= g_reactor_count) {
				g_next_core = spdk_env_get_first_core();
			}
			core = g_next_core;
			g_next_core = spdk_env_get_next_core(g_next_core);

			if (spdk_cpuset_get_cpu(cpumask, core)) {
				break;
			}
		}
	}

	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);

	pthread_mutex_unlock(&g_scheduler_mtx);

	assert(evt != NULL);
	if (evt == NULL) {
		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
		return -1;
	}

	lw_thread->tsc_start = spdk_get_ticks();

	spdk_event_call(evt);

	return 0;
}
1264 
1265 static void
1266 _reactor_request_thread_reschedule(struct spdk_thread *thread)
1267 {
1268 	struct spdk_lw_thread *lw_thread;
1269 	struct spdk_reactor *reactor;
1270 	uint32_t current_core;
1271 
1272 	assert(thread == spdk_get_thread());
1273 
1274 	lw_thread = spdk_thread_get_ctx(thread);
1275 
1276 	assert(lw_thread != NULL);
1277 	lw_thread->resched = true;
1278 	lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
1279 
1280 	current_core = spdk_env_get_current_core();
1281 	reactor = spdk_reactor_get(current_core);
1282 	assert(reactor != NULL);
1283 
1284 	/* Send a notification if the destination reactor is indicated in intr mode state */
1285 	if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) {
1286 		uint64_t notify = 1;
1287 
1288 		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
1289 			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
1290 		}
1291 	}
1292 }
1293 
1294 static int
1295 reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
1296 {
1297 	struct spdk_lw_thread *lw_thread;
1298 
1299 	switch (op) {
1300 	case SPDK_THREAD_OP_NEW:
1301 		lw_thread = spdk_thread_get_ctx(thread);
1302 		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
1303 		return _reactor_schedule_thread(thread);
1304 	case SPDK_THREAD_OP_RESCHED:
1305 		_reactor_request_thread_reschedule(thread);
1306 		return 0;
1307 	default:
1308 		return -ENOTSUP;
1309 	}
1310 }
1311 
1312 static bool
1313 reactor_thread_op_supported(enum spdk_thread_op op)
1314 {
1315 	switch (op) {
1316 	case SPDK_THREAD_OP_NEW:
1317 	case SPDK_THREAD_OP_RESCHED:
1318 		return true;
1319 	default:
1320 		return false;
1321 	}
1322 }
1323 
/* Iteration context handed from core to core by spdk_for_each_reactor(). */
struct call_reactor {
	uint32_t cur_core;	/* core currently executing fn */
	spdk_event_fn fn;	/* function invoked once on every reactor */
	void *arg1;		/* first argument passed to fn and cpl */
	void *arg2;		/* second argument passed to fn and cpl */

	uint32_t orig_core;	/* core that started the iteration; cpl runs there */
	spdk_event_fn cpl;	/* completion callback fired after the last reactor */
};
1333 
1334 static void
1335 on_reactor(void *arg1, void *arg2)
1336 {
1337 	struct call_reactor *cr = arg1;
1338 	struct spdk_event *evt;
1339 
1340 	cr->fn(cr->arg1, cr->arg2);
1341 
1342 	cr->cur_core = spdk_env_get_next_core(cr->cur_core);
1343 
1344 	if (cr->cur_core >= g_reactor_count) {
1345 		SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");
1346 
1347 		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
1348 		free(cr);
1349 	} else {
1350 		SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %d\n",
1351 			      cr->cur_core);
1352 
1353 		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
1354 	}
1355 	assert(evt != NULL);
1356 	spdk_event_call(evt);
1357 }
1358 
1359 void
1360 spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
1361 {
1362 	struct call_reactor *cr;
1363 
1364 	/* When the application framework is shutting down, we will send one
1365 	 * final for_each_reactor operation with completion callback _reactors_stop,
1366 	 * to flush any existing for_each_reactor operations to avoid any memory
1367 	 * leaks. We use a mutex here to protect a boolean flag that will ensure
1368 	 * we don't start any more operations once we've started shutting down.
1369 	 */
1370 	pthread_mutex_lock(&g_stopping_reactors_mtx);
1371 	if (g_stopping_reactors) {
1372 		pthread_mutex_unlock(&g_stopping_reactors_mtx);
1373 		return;
1374 	} else if (cpl == _reactors_stop) {
1375 		g_stopping_reactors = true;
1376 	}
1377 	pthread_mutex_unlock(&g_stopping_reactors_mtx);
1378 
1379 	cr = calloc(1, sizeof(*cr));
1380 	if (!cr) {
1381 		SPDK_ERRLOG("Unable to perform reactor iteration\n");
1382 		cpl(arg1, arg2);
1383 		return;
1384 	}
1385 
1386 	cr->fn = fn;
1387 	cr->arg1 = arg1;
1388 	cr->arg2 = arg2;
1389 	cr->cpl = cpl;
1390 	cr->orig_core = spdk_env_get_current_core();
1391 	cr->cur_core = spdk_env_get_first_core();
1392 
1393 	SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %d\n", cr->orig_core);
1394 
1395 	_event_call(cr->cur_core, on_reactor, cr, NULL);
1396 }
1397 
1398 #ifdef __linux__
1399 static int
1400 reactor_schedule_thread_event(void *arg)
1401 {
1402 	struct spdk_reactor *reactor = arg;
1403 	struct spdk_lw_thread *lw_thread, *tmp;
1404 	uint32_t count = 0;
1405 	uint64_t notify = 1;
1406 
1407 	assert(reactor->in_interrupt);
1408 
1409 	if (read(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
1410 		SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno));
1411 		return -errno;
1412 	}
1413 
1414 	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
1415 		count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
1416 	}
1417 
1418 	return count;
1419 }
1420 
/*
 * Set up the interrupt-mode plumbing for one reactor: an fd_group plus two
 * eventfds (resched notifications and incoming events), each registered with
 * the group.  On any failure, everything acquired so far is unwound and the
 * fd_group pointer is cleared.  Returns 0 on success or a negative errno.
 */
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	int rc;

	rc = spdk_fd_group_create(&reactor->fgrp);
	if (rc != 0) {
		return rc;
	}

	reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->resched_fd < 0) {
		rc = -EBADF;
		goto err;
	}

	/* Wake the reactor when another thread requests a reschedule. */
	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event,
			       reactor);
	if (rc) {
		close(reactor->resched_fd);
		goto err;
	}

	reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->events_fd < 0) {
		/* Unwind the resched_fd registration before destroying the group. */
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);

		rc = -EBADF;
		goto err;
	}

	/* Wake the reactor when events are queued to it via spdk_event_call(). */
	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->events_fd,
			       event_queue_run_batch, reactor);
	if (rc) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);
		close(reactor->events_fd);
		goto err;
	}

	return 0;

err:
	spdk_fd_group_destroy(reactor->fgrp);
	reactor->fgrp = NULL;
	return rc;
}
1469 #else
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	/* Interrupt mode relies on Linux eventfd; unsupported on other platforms. */
	return -ENOTSUP;
}
1475 #endif
1476 
1477 static void
1478 reactor_interrupt_fini(struct spdk_reactor *reactor)
1479 {
1480 	struct spdk_fd_group *fgrp = reactor->fgrp;
1481 
1482 	if (!fgrp) {
1483 		return;
1484 	}
1485 
1486 	spdk_fd_group_remove(fgrp, reactor->events_fd);
1487 	spdk_fd_group_remove(fgrp, reactor->resched_fd);
1488 
1489 	close(reactor->events_fd);
1490 	close(reactor->resched_fd);
1491 
1492 	spdk_fd_group_destroy(fgrp);
1493 	reactor->fgrp = NULL;
1494 }
1495 
1496 static struct spdk_governor *
1497 _governor_find(const char *name)
1498 {
1499 	struct spdk_governor *governor, *tmp;
1500 
1501 	TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
1502 		if (strcmp(name, governor->name) == 0) {
1503 			return governor;
1504 		}
1505 	}
1506 
1507 	return NULL;
1508 }
1509 
1510 int
1511 spdk_governor_set(const char *name)
1512 {
1513 	struct spdk_governor *governor;
1514 	int rc = 0;
1515 
1516 	/* NULL governor was specifically requested */
1517 	if (name == NULL) {
1518 		if (g_governor) {
1519 			g_governor->deinit();
1520 		}
1521 		g_governor = NULL;
1522 		return 0;
1523 	}
1524 
1525 	governor = _governor_find(name);
1526 	if (governor == NULL) {
1527 		return -EINVAL;
1528 	}
1529 
1530 	if (g_governor == governor) {
1531 		return 0;
1532 	}
1533 
1534 	rc = governor->init();
1535 	if (rc == 0) {
1536 		if (g_governor) {
1537 			g_governor->deinit();
1538 		}
1539 		g_governor = governor;
1540 	}
1541 
1542 	return rc;
1543 }
1544 
/* Return the currently active governor, or NULL when none is set. */
struct spdk_governor *
spdk_governor_get(void)
{
	return g_governor;
}
1550 
1551 void
1552 spdk_governor_register(struct spdk_governor *governor)
1553 {
1554 	if (_governor_find(governor->name)) {
1555 		SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name);
1556 		assert(false);
1557 		return;
1558 	}
1559 
1560 	TAILQ_INSERT_TAIL(&g_governor_list, governor, link);
1561 }
1562 
1563 SPDK_LOG_REGISTER_COMPONENT(reactor)
1564