/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/likely.h"

#include "spdk_internal/event.h"

#include "spdk/log.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/string.h"
#include "spdk/fd_group.h"

#ifdef __linux__
#include <sys/prctl.h>
#include <sys/eventfd.h>
#endif

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

#define SPDK_EVENT_BATCH_SIZE		8

static struct spdk_reactor *g_reactors;
static uint32_t g_reactor_count;
static struct spdk_cpuset g_reactor_core_mask;
static enum spdk_reactor_state	g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;

static bool g_framework_context_switch_monitor_enabled = true;

static struct spdk_mempool *g_spdk_event_mempool = NULL;

TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);

static struct spdk_scheduler *g_scheduler;
static struct spdk_scheduler *g_new_scheduler;
static struct spdk_reactor *g_scheduling_reactor;
static uint64_t g_scheduler_period;
static uint32_t g_scheduler_core_number;
static struct spdk_scheduler_core_info *g_core_infos = NULL;

TAILQ_HEAD(, spdk_governor) g_governor_list
	= TAILQ_HEAD_INITIALIZER(g_governor_list);

static int _governor_get_capabilities(uint32_t lcore_id,
				      struct spdk_governor_capabilities *capabilities);

static struct spdk_governor g_governor = {
	.name = "default",
	.get_core_capabilities = _governor_get_capabilities,
};

static int reactor_interrupt_init(struct spdk_reactor *reactor);
static void reactor_interrupt_fini(struct spdk_reactor *reactor);

static struct spdk_scheduler *
_scheduler_find(char *name)
{
	struct spdk_scheduler *tmp;

	TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
		if (strcmp(name, tmp->name) == 0) {
			return tmp;
		}
	}

	return NULL;
}

int
_spdk_scheduler_set(char *name)
{
	struct spdk_scheduler *scheduler;

	scheduler = _scheduler_find(name);
	if (scheduler == NULL) {
		SPDK_ERRLOG("Requested scheduler '%s' is not registered\n", name);
		return -ENOENT;
	}

	if (g_scheduling_reactor->flags.is_scheduling) {
		if (g_scheduler != g_new_scheduler) {
			/* Scheduler already changed, cannot defer multiple deinits */
			return -EBUSY;
		}
	} else {
		if (g_scheduler != NULL && g_scheduler->deinit != NULL) {
			g_scheduler->deinit(&g_governor);
		}
		g_scheduler = scheduler;
	}

	g_new_scheduler = scheduler;

	if (scheduler->init != NULL) {
		scheduler->init(&g_governor);
	}

	return 0;
}

struct spdk_scheduler *
_spdk_scheduler_get(void)
{
	return g_scheduler;
}

uint64_t
_spdk_scheduler_period_get(void)
{
	/* Convert from ticks to microseconds */
	return (g_scheduler_period * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
}

void
_spdk_scheduler_period_set(uint64_t period)
{
	/* Convert microseconds to ticks */
	g_scheduler_period = period * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
}
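
/*
 * Worked example (illustrative): with a 2.4 GHz TSC, spdk_get_ticks_hz()
 * returns 2400000000 and SPDK_SEC_TO_USEC is 1000000, so
 * _spdk_scheduler_period_set(1000) stores
 * 1000 * 2400000000 / 1000000 = 2400000 ticks, and
 * _spdk_scheduler_period_get() converts that back to 1000 microseconds.
 */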

void
_spdk_scheduler_disable(void)
{
	g_scheduler_period = 0;
}

void
_spdk_scheduler_list_add(struct spdk_scheduler *scheduler)
{
	if (_scheduler_find(scheduler->name)) {
		SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
}
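
/*
 * Illustrative sketch of registering and selecting a custom scheduler;
 * `my_balance` and the "my_scheduler" name are hypothetical. Only the
 * spdk_scheduler callbacks exercised in this file (init/deinit/balance)
 * are assumed:
 *
 *     static struct spdk_scheduler g_my_scheduler = {
 *         .name = "my_scheduler",
 *         .balance = my_balance,   // invoked with g_core_infos and the governor
 *     };
 *
 *     _spdk_scheduler_list_add(&g_my_scheduler);   // e.g. from a constructor
 *     _spdk_scheduler_set("my_scheduler");
 */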

static void
reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
{
	reactor->lcore = lcore;
	reactor->flags.is_valid = true;

	TAILQ_INIT(&reactor->threads);
	reactor->thread_count = 0;
	spdk_cpuset_zero(&reactor->notify_cpuset);

	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (reactor->events == NULL) {
		SPDK_ERRLOG("Failed to allocate events ring\n");
		assert(false);
	}

	/* Always initialize interrupt facilities for the reactor */
	if (reactor_interrupt_init(reactor) != 0) {
		/* Reactor interrupt facilities are required when setting the app to interrupt mode. */
		if (spdk_interrupt_mode_is_enabled()) {
			SPDK_ERRLOG("Failed to prepare intr facilities\n");
			assert(false);
		}
		return;
	}

	/* If the application runs with full interrupt capability,
	 * all reactors run in interrupt mode.
	 */
	if (spdk_interrupt_mode_is_enabled()) {
		uint32_t i;

		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true);
		}
		reactor->in_interrupt = true;
	}
}

struct spdk_reactor *
spdk_reactor_get(uint32_t lcore)
{
	struct spdk_reactor *reactor;

	if (g_reactors == NULL) {
		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
		return NULL;
	}

	if (lcore >= g_reactor_count) {
		return NULL;
	}

	reactor = &g_reactors[lcore];

	if (reactor->flags.is_valid == false) {
		return NULL;
	}

	return reactor;
}

struct spdk_reactor *
_spdk_get_scheduling_reactor(void)
{
	return g_scheduling_reactor;
}

static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
static bool reactor_thread_op_supported(enum spdk_thread_op op);

int
spdk_reactors_init(void)
{
	struct spdk_reactor *reactor;
	int rc;
	uint32_t i, current_core;
	char mempool_name[32];

	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
			       262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			       sizeof(struct spdk_event),
			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
			       SPDK_ENV_SOCKET_ID_ANY);

	if (g_spdk_event_mempool == NULL) {
		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
		return -1;
	}

	/* struct spdk_reactor must be aligned on 64 byte boundary */
	g_reactor_count = spdk_env_get_last_core() + 1;
	rc = posix_memalign((void **)&g_reactors, 64,
			    g_reactor_count * sizeof(struct spdk_reactor));
	if (rc != 0) {
		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
			    g_reactor_count);
		spdk_mempool_free(g_spdk_event_mempool);
		return -1;
	}

	g_core_infos = calloc(g_reactor_count, sizeof(*g_core_infos));
	if (g_core_infos == NULL) {
		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		return -ENOMEM;
	}

	memset(g_reactors, 0, (g_reactor_count) * sizeof(struct spdk_reactor));

	spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
				 sizeof(struct spdk_lw_thread));

	SPDK_ENV_FOREACH_CORE(i) {
		reactor_construct(&g_reactors[i], i);
	}

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	g_scheduling_reactor = reactor;

	/* set default scheduling period to one second */
	g_scheduler_period = spdk_get_ticks_hz();

	rc = _spdk_scheduler_set("static");
	assert(rc == 0);

	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;

	return 0;
}
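
/*
 * Typical lifecycle (illustrative; normally driven by the SPDK app framework
 * rather than called directly): spdk_reactors_init() is followed by
 * spdk_reactors_start(), which blocks on the current core until
 * spdk_reactors_stop() wakes the reactors, after which spdk_reactors_fini()
 * releases the resources allocated here.
 */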

void
spdk_reactors_fini(void)
{
	uint32_t i;
	struct spdk_reactor *reactor;

	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
		return;
	}

	if (g_scheduler->deinit != NULL) {
		g_scheduler->deinit(&g_governor);
	}

	spdk_thread_lib_fini();

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		assert(reactor->thread_count == 0);
		if (reactor->events != NULL) {
			spdk_ring_free(reactor->events);
		}

		reactor_interrupt_fini(reactor);

		if (g_core_infos != NULL) {
			free(g_core_infos[i].threads);
		}
	}

	spdk_mempool_free(g_spdk_event_mempool);

	free(g_reactors);
	g_reactors = NULL;
	free(g_core_infos);
	g_core_infos = NULL;
}

static void _reactor_set_interrupt_mode(void *arg1, void *arg2);

static void
_reactor_set_notify_cpuset(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;
	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());

	spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt);
}

static void
_reactor_set_notify_cpuset_cpl(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;

	if (target->new_in_interrupt == false) {
		target->set_interrupt_mode_in_progress = false;
		spdk_thread_send_msg(_spdk_get_app_thread(), target->set_interrupt_mode_cb_fn,
				     target->set_interrupt_mode_cb_arg);
	} else {
		struct spdk_event *ev;

		ev = spdk_event_allocate(target->lcore, _reactor_set_interrupt_mode, target, NULL);
		assert(ev);
		spdk_event_call(ev);
	}
}

static void
_reactor_set_interrupt_mode(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;

	assert(target != NULL);
	assert(target == spdk_reactor_get(spdk_env_get_current_core()));
	assert(target->in_interrupt != target->new_in_interrupt);
	assert(TAILQ_EMPTY(&target->threads));
	SPDK_DEBUGLOG(reactor, "Do reactor set on core %u from %s to state %s\n",
		      target->lcore, target->in_interrupt ? "intr" : "poll", target->new_in_interrupt ? "intr" : "poll");

	target->in_interrupt = target->new_in_interrupt;

	if (target->new_in_interrupt == false) {
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	} else {
		uint64_t notify = 1;
		int rc = 0;

		/* Always kick both the event queue and the resched eventfd, in case
		 * either raced with the mode switch. */
		rc = write(target->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
		rc = write(target->resched_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}

		target->set_interrupt_mode_in_progress = false;
		spdk_thread_send_msg(_spdk_get_app_thread(), target->set_interrupt_mode_cb_fn,
				     target->set_interrupt_mode_cb_arg);
	}
}

int
spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt,
				spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg)
{
	struct spdk_reactor *target;

	target = spdk_reactor_get(lcore);
	if (target == NULL) {
		return -EINVAL;
	}

	if (spdk_get_thread() != _spdk_get_app_thread()) {
		SPDK_ERRLOG("This function may only be called from the SPDK application thread.\n");
		return -EPERM;
	}

	if (target->in_interrupt == new_in_interrupt) {
		return 0;
	}

	if (target->set_interrupt_mode_in_progress) {
		SPDK_NOTICELOG("Reactor(%u) is already in the process of changing interrupt mode\n", lcore);
		return -EBUSY;
	}
	target->set_interrupt_mode_in_progress = true;

	target->new_in_interrupt = new_in_interrupt;
	target->set_interrupt_mode_cb_fn = cb_fn;
	target->set_interrupt_mode_cb_arg = cb_arg;

	SPDK_DEBUGLOG(reactor, "Starting reactor event from %u to %u\n",
		      spdk_env_get_current_core(), lcore);

	if (new_in_interrupt == false) {
		/* For potential race cases, when setting the reactor to poll mode,
		 * first change the mode of the reactor and then clear the corresponding
		 * bit of the notify_cpuset of each reactor.
		 */
		struct spdk_event *ev;

		ev = spdk_event_allocate(lcore, _reactor_set_interrupt_mode, target, NULL);
		assert(ev);
		spdk_event_call(ev);
	} else {
		/* For race cases, when setting the reactor to interrupt mode, first set the
		 * corresponding bit of the notify_cpuset of each reactor and then change the mode.
		 */
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	}

	return 0;
}
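
/*
 * Example usage (illustrative; must run on the SPDK application thread).
 * `on_mode_set` and `ctx` are hypothetical:
 *
 *     static void on_mode_set(void *ctx) { ... }
 *
 *     int rc = spdk_reactor_set_interrupt_mode(lcore, true, on_mode_set, ctx);
 *     if (rc != 0) { ... }   // e.g. -EBUSY if a change is already in progress
 */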

struct spdk_event *
spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
{
	struct spdk_event *event = NULL;
	struct spdk_reactor *reactor = spdk_reactor_get(lcore);

	if (!reactor) {
		assert(false);
		return NULL;
	}

	event = spdk_mempool_get(g_spdk_event_mempool);
	if (event == NULL) {
		assert(false);
		return NULL;
	}

	event->lcore = lcore;
	event->fn = fn;
	event->arg1 = arg1;
	event->arg2 = arg2;

	return event;
}

void
spdk_event_call(struct spdk_event *event)
{
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_core = spdk_env_get_current_core();

	reactor = spdk_reactor_get(event->lcore);

	assert(reactor != NULL);
	assert(reactor->events != NULL);

	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
	if (rc != 1) {
		assert(false);
	}

	if (current_core != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_core);
	}

	/* If spdk_event_call isn't called on a reactor, always send a notification.
	 * If it is called on a reactor, send a notification only if the destination
	 * reactor is marked as being in interrupt mode.
	 */
	if (spdk_unlikely(local_reactor == NULL) ||
	    spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) {
		uint64_t notify = 1;

		rc = write(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
	}
}
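
/*
 * Example (illustrative): running a function on a specific core via the
 * event API. `my_fn` and `my_ctx` are hypothetical:
 *
 *     static void my_fn(void *arg1, void *arg2) { ... }
 *
 *     struct spdk_event *ev = spdk_event_allocate(lcore, my_fn, my_ctx, NULL);
 *     assert(ev != NULL);
 *     spdk_event_call(ev);   // the event returns to the mempool after it runs
 */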

static inline uint32_t
event_queue_run_batch(struct spdk_reactor *reactor)
{
	unsigned count, i;
	void *events[SPDK_EVENT_BATCH_SIZE];
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(events, 0, sizeof(events));
#endif

	/* Handle event notification if this reactor is currently running in interrupt mode */
	if (spdk_unlikely(reactor->in_interrupt)) {
		uint64_t notify = 1;
		int rc;

		/* There may be a race between this reactor's event_acknowledge (the read)
		 * and another producer's event_notify (a write), so acknowledge first and
		 * then re-check the ring. This avoids missing an event notification.
		 */
		rc = read(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno));
			return -errno;
		}

		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);

		if (spdk_ring_count(reactor->events) != 0) {
			/* Trigger a new notification if there are still events in the queue waiting to be processed. */
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
				return -errno;
			}
		}
	} else {
		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
	}

	if (count == 0) {
		return 0;
	}

	/* Execute the events. There are still some remaining events
	 * that must occur on an SPDK thread. To accommodate those, try to
	 * run them on the first thread in the list, if it exists. */
	lw_thread = TAILQ_FIRST(&reactor->threads);
	if (lw_thread) {
		thread = spdk_thread_get_from_ctx(lw_thread);
	} else {
		thread = NULL;
	}

	spdk_set_thread(thread);

	for (i = 0; i < count; i++) {
		struct spdk_event *event = events[i];

		assert(event != NULL);
		event->fn(event->arg1, event->arg2);
	}

	spdk_set_thread(NULL);

	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);

	return count;
}

/* 1s */
#define CONTEXT_SWITCH_MONITOR_PERIOD 1000000

static int
get_rusage(struct spdk_reactor *reactor)
{
	struct rusage		rusage;

	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
		return -1;
	}

	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
		SPDK_INFOLOG(reactor,
			     "Reactor %u: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
	}
	reactor->rusage = rusage;

	return 0;
}

void
spdk_framework_enable_context_switch_monitor(bool enable)
{
	/* This global is being read by multiple threads, so this isn't
	 * strictly thread safe. However, we're toggling between true and
	 * false here, and if a thread sees the value update later than it
	 * should, it's no big deal. */
	g_framework_context_switch_monitor_enabled = enable;
}

bool
spdk_framework_context_switch_monitor_enabled(void)
{
	return g_framework_context_switch_monitor_enabled;
}

static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
	pthread_setname_np(pthread_self(), thread_name);
#endif
}

static void
_init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);

	lw_thread->lcore = reactor->lcore;

	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->current_stats);
}

static void
_threads_reschedule(struct spdk_scheduler_core_info *cores_info)
{
	struct spdk_scheduler_core_info *core;
	struct spdk_lw_thread *lw_thread;
	uint32_t i, j;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &cores_info[i];
		for (j = 0; j < core->threads_count; j++) {
			lw_thread = core->threads[j];
			if (lw_thread->lcore != lw_thread->new_lcore) {
				_spdk_lw_thread_set_core(lw_thread, lw_thread->new_lcore);
			}
		}
	}
}

static void
_reactors_scheduler_fini(void)
{
	struct spdk_reactor *reactor;
	uint32_t i;

	/* Reschedule based on the balancing output */
	_threads_reschedule(g_core_infos);

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		reactor->flags.is_scheduling = false;
	}
}

static void
_reactors_scheduler_update_core_mode(void *ctx)
{
	struct spdk_reactor *reactor;
	int rc = 0;

	if (g_scheduler_core_number == SPDK_ENV_LCORE_ID_ANY) {
		g_scheduler_core_number = spdk_env_get_first_core();
	} else {
		g_scheduler_core_number = spdk_env_get_next_core(g_scheduler_core_number);
	}

	if (g_scheduler_core_number == SPDK_ENV_LCORE_ID_ANY) {
		_reactors_scheduler_fini();
		return;
	}

	reactor = spdk_reactor_get(g_scheduler_core_number);
	if (reactor->in_interrupt != g_core_infos[g_scheduler_core_number].interrupt_mode) {
		/* Switch next found reactor to new state */
		rc = spdk_reactor_set_interrupt_mode(g_scheduler_core_number,
						     g_core_infos[g_scheduler_core_number].interrupt_mode, _reactors_scheduler_update_core_mode, NULL);
		if (rc == 0) {
			return;
		}
	}

	_reactors_scheduler_update_core_mode(NULL);
}

static void
_reactors_scheduler_balance(void *arg1, void *arg2)
{
	if (g_reactor_state == SPDK_REACTOR_STATE_RUNNING) {
		g_scheduler->balance(g_core_infos, g_reactor_count, &g_governor);

		g_scheduler_core_number = SPDK_ENV_LCORE_ID_ANY;
		_reactors_scheduler_update_core_mode(NULL);
	}
}

static void
_reactors_scheduler_cancel(void *arg1, void *arg2)
{
	struct spdk_reactor *reactor;
	uint32_t i;

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		reactor->flags.is_scheduling = false;
	}
}
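
/*
 * Scheduling proceeds in phases: _reactors_scheduler_gather_metrics() walks
 * every core collecting thread stats (phase 1), _reactors_scheduler_balance()
 * runs the active scheduler's balance callback on the scheduling reactor
 * (phase 2), _reactors_scheduler_update_core_mode() then applies any
 * interrupt/poll mode changes core by core, and _reactors_scheduler_fini()
 * reschedules threads whose target core changed.
 */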

/* Phase 1 of thread scheduling is to gather metrics on the existing threads */
static void
_reactors_scheduler_gather_metrics(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core_info;
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	struct spdk_event *evt;
	uint32_t next_core;
	uint32_t i;

	reactor = spdk_reactor_get(spdk_env_get_current_core());
	assert(reactor != NULL);
	reactor->flags.is_scheduling = true;
	core_info = &g_core_infos[reactor->lcore];
	core_info->lcore = reactor->lcore;
	core_info->core_idle_tsc = reactor->idle_tsc;
	core_info->core_busy_tsc = reactor->busy_tsc;
	core_info->interrupt_mode = reactor->in_interrupt;

	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);

	free(core_info->threads);
	core_info->threads = NULL;

	i = 0;

	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		_init_thread_stats(reactor, lw_thread);
		i++;
	}

	core_info->threads_count = i;

	if (core_info->threads_count > 0) {
		core_info->threads = calloc(core_info->threads_count, sizeof(struct spdk_lw_thread *));
		if (core_info->threads == NULL) {
			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);

			/* Cancel this round of schedule work */
			evt = spdk_event_allocate(g_scheduling_reactor->lcore, _reactors_scheduler_cancel, NULL, NULL);
			spdk_event_call(evt);
			return;
		}

		i = 0;
		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
			core_info->threads[i] = lw_thread;
			_spdk_lw_thread_get_current_stats(lw_thread, &lw_thread->snapshot_stats);
			i++;
		}
	}

	next_core = spdk_env_get_next_core(reactor->lcore);
	if (next_core == UINT32_MAX) {
		next_core = spdk_env_get_first_core();
	}

	/* If we've looped back around to the scheduler thread, move to the next phase */
	if (next_core == g_scheduling_reactor->lcore) {
		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
		evt = spdk_event_allocate(next_core, _reactors_scheduler_balance, NULL, NULL);
		spdk_event_call(evt);
		return;
	}

	evt = spdk_event_allocate(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
	spdk_event_call(evt);
}

static int _reactor_schedule_thread(struct spdk_thread *thread);
static uint64_t g_rusage_period;

static void
_reactor_remove_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread	*thread = spdk_thread_get_from_ctx(lw_thread);
	int efd;

	TAILQ_REMOVE(&reactor->threads, lw_thread, link);
	assert(reactor->thread_count > 0);
	reactor->thread_count--;

	/* Tear down thread interrupt handling if running with full interrupt capability */
	if (spdk_interrupt_mode_is_enabled()) {
		efd = spdk_thread_get_interrupt_fd(thread);
		spdk_fd_group_remove(reactor->fgrp, efd);
	}
}

static bool
reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);

	if (spdk_unlikely(lw_thread->resched)) {
		lw_thread->resched = false;
		_reactor_remove_lw_thread(reactor, lw_thread);
		_reactor_schedule_thread(thread);
		return true;
	}

	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
			  spdk_thread_is_idle(thread))) {
		if (reactor->flags.is_scheduling == false) {
			_reactor_remove_lw_thread(reactor, lw_thread);
			spdk_thread_destroy(thread);
			return true;
		}
	}

	return false;
}

static void
reactor_interrupt_run(struct spdk_reactor *reactor)
{
	int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */

	spdk_fd_group_wait(reactor->fgrp, block_timeout);
}

static void
_reactor_run(struct spdk_reactor *reactor)
{
	struct spdk_thread	*thread;
	struct spdk_lw_thread	*lw_thread, *tmp;
	uint64_t		now;
	int			rc;

	event_queue_run_batch(reactor);

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);

		now = spdk_thread_get_last_tsc(thread);
		if (rc == 0) {
			reactor->idle_tsc += now - reactor->tsc_last;
		} else if (rc > 0) {
			reactor->busy_tsc += now - reactor->tsc_last;
		}
		reactor->tsc_last = now;

		reactor_post_process_lw_thread(reactor, lw_thread);
	}
}

static int
reactor_run(void *arg)
{
	struct spdk_reactor	*reactor = arg;
	struct spdk_thread	*thread;
	struct spdk_lw_thread	*lw_thread, *tmp;
	char			thread_name[32];
	uint64_t		last_sched = 0;

	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);

	/* Rename the POSIX thread because the reactor is tied to the POSIX
	 * thread in the SPDK event library.
	 */
	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
	_set_thread_name(thread_name);

	reactor->tsc_last = spdk_get_ticks();

	while (1) {
		/* Run the interrupt-mode processing if this reactor is currently in interrupt mode */
		if (spdk_unlikely(reactor->in_interrupt)) {
			reactor_interrupt_run(reactor);
		} else {
			_reactor_run(reactor);
		}

		if (g_framework_context_switch_monitor_enabled) {
			if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
				get_rusage(reactor);
				reactor->last_rusage = reactor->tsc_last;
			}
		}

		if (spdk_unlikely(g_scheduler_period > 0 &&
				  (reactor->tsc_last - last_sched) > g_scheduler_period &&
				  reactor == g_scheduling_reactor &&
				  !reactor->flags.is_scheduling)) {
			if (spdk_unlikely(g_scheduler != g_new_scheduler)) {
				if (g_scheduler->deinit != NULL) {
					g_scheduler->deinit(&g_governor);
				}
				g_scheduler = g_new_scheduler;
			}

			if (spdk_unlikely(g_scheduler->balance != NULL)) {
				last_sched = reactor->tsc_last;
				_reactors_scheduler_gather_metrics(NULL, NULL);
			}
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}
	}

	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		spdk_set_thread(thread);
		spdk_thread_exit(thread);
	}

	while (!TAILQ_EMPTY(&reactor->threads)) {
		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			spdk_set_thread(thread);
			if (spdk_thread_is_exited(thread)) {
				_reactor_remove_lw_thread(reactor, lw_thread);
				spdk_thread_destroy(thread);
			} else {
				spdk_thread_poll(thread, 0, 0);
			}
		}
	}

	return 0;
}

int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int ret;
	const struct spdk_cpuset *validmask;

	ret = spdk_cpuset_parse(cpumask, mask);
	if (ret < 0) {
		return ret;
	}

	validmask = spdk_app_get_core_mask();
	spdk_cpuset_and(cpumask, validmask);

	return 0;
}
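
/*
 * Example (illustrative): restricting a parsed mask to the cores the app was
 * started with. "0x3" is a hypothetical mask selecting cores 0 and 1:
 *
 *     struct spdk_cpuset cpumask;
 *
 *     if (spdk_app_parse_core_mask("0x3", &cpumask) == 0) {
 *         // cpumask now holds cores 0 and 1 that are also in the app core mask
 *     }
 */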

const struct spdk_cpuset *
spdk_app_get_core_mask(void)
{
	return &g_reactor_core_mask;
}

void
spdk_reactors_start(void)
{
	struct spdk_reactor *reactor;
	uint32_t i, current_core;
	int rc;

	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;

	current_core = spdk_env_get_current_core();
	SPDK_ENV_FOREACH_CORE(i) {
		if (i != current_core) {
			reactor = spdk_reactor_get(i);
			if (reactor == NULL) {
				continue;
			}

			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
			if (rc < 0) {
				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
				assert(false);
				return;
			}
		}
		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
	}

	/* Start the main reactor */
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	reactor_run(reactor);

	spdk_env_thread_wait_all();

	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
}

void
spdk_reactors_stop(void *arg1)
{
	uint32_t i;
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor;
	uint64_t notify = 1;

	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
	local_reactor = spdk_reactor_get(spdk_env_get_current_core());

	SPDK_ENV_FOREACH_CORE(i) {
		/* If spdk_event_call isn't called on a reactor, always send a notification.
		 * If it is called on a reactor, send a notification only if the destination
		 * reactor is marked as being in interrupt mode.
		 */
		if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) {
			reactor = spdk_reactor_get(i);
			assert(reactor != NULL);
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno));
				continue;
			}
		}
	}
}

static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
static uint32_t g_next_core = UINT32_MAX;

static int
thread_process_interrupts(void *arg)
{
	struct spdk_thread *thread = arg;
	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());
	uint64_t now;
	int rc;

	/* Update idle_tsc between the end of last intr_fn and the start of this intr_fn. */
	now = spdk_get_ticks();
	reactor->idle_tsc += now - reactor->tsc_last;
	reactor->tsc_last = now;

	rc = spdk_thread_poll(thread, 0, now);

	/* Update tsc between the start and the end of this intr_fn. */
	now = spdk_thread_get_last_tsc(thread);
	if (rc == 0) {
		reactor->idle_tsc += now - reactor->tsc_last;
	} else if (rc > 0) {
		reactor->busy_tsc += now - reactor->tsc_last;
	}
	reactor->tsc_last = now;

	return rc;
}

static void
_schedule_thread(void *arg1, void *arg2)
{
	struct spdk_lw_thread *lw_thread = arg1;
	struct spdk_reactor *reactor;
	uint32_t current_core;
	int efd;

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
	reactor->thread_count++;

	/* Set up thread interrupt handling if running with full interrupt capability */
	if (spdk_interrupt_mode_is_enabled()) {
		int rc;
		struct spdk_thread *thread;

		thread = spdk_thread_get_from_ctx(lw_thread);
		efd = spdk_thread_get_interrupt_fd(thread);
		rc = spdk_fd_group_add(reactor->fgrp, efd, thread_process_interrupts, thread);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
		}
	}
}

static int
_reactor_schedule_thread(struct spdk_thread *thread)
{
	uint32_t core;
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread_stats last_stats;
	struct spdk_event *evt = NULL;
	struct spdk_cpuset *cpumask;
	uint32_t i;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_lcore = spdk_env_get_current_core();
	struct spdk_cpuset polling_cpumask;
	struct spdk_cpuset valid_cpumask;

	cpumask = spdk_thread_get_cpumask(thread);

	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);
	core = lw_thread->lcore;
	last_stats = lw_thread->last_stats;
	memset(lw_thread, 0, sizeof(*lw_thread));
	lw_thread->last_stats = last_stats;

	if (current_lcore != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_lcore);
		assert(local_reactor);
	}

	/* When spdk_thread interrupt support is not enabled and the current
	 * reactor runs on a DPDK thread, skip reactors that are in interrupt mode.
	 */
	if (!spdk_interrupt_mode_is_enabled() && local_reactor != NULL) {
		/* Get the cpumask of all reactors in polling */
		spdk_cpuset_zero(&polling_cpumask);
		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&polling_cpumask, i, true);
		}
		spdk_cpuset_xor(&polling_cpumask, &local_reactor->notify_cpuset);

		if (core == SPDK_ENV_LCORE_ID_ANY) {
			/* Get the cpumask of all valid reactors which are suggested and also in polling */
			spdk_cpuset_copy(&valid_cpumask, &polling_cpumask);
			spdk_cpuset_and(&valid_cpumask, spdk_thread_get_cpumask(thread));

			/* If there are any valid reactors, spdk_thread should be scheduled
			 * into one of the valid reactors.
			 * If there are no valid reactors, spdk_thread should be scheduled
			 * into one of the polling reactors.
			 */
			if (spdk_cpuset_count(&valid_cpumask) != 0) {
				cpumask = &valid_cpumask;
			} else {
				cpumask = &polling_cpumask;
			}
		} else if (!spdk_cpuset_get_cpu(&polling_cpumask, core)) {
			/* If the specified reactor is not in polling, spdk_thread should be
			 * scheduled into one of the polling reactors.
			 */
			core = SPDK_ENV_LCORE_ID_ANY;
			cpumask = &polling_cpumask;
		}
	}

	pthread_mutex_lock(&g_scheduler_mtx);
	if (core == SPDK_ENV_LCORE_ID_ANY) {
		for (i = 0; i < spdk_env_get_core_count(); i++) {
			if (g_next_core >= g_reactor_count) {
				g_next_core = spdk_env_get_first_core();
			}
			core = g_next_core;
			g_next_core = spdk_env_get_next_core(g_next_core);

			if (spdk_cpuset_get_cpu(cpumask, core)) {
				break;
			}
		}
	}

	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);

	pthread_mutex_unlock(&g_scheduler_mtx);

	assert(evt != NULL);
	if (evt == NULL) {
		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
		return -1;
	}

	lw_thread->tsc_start = spdk_get_ticks();

	spdk_event_call(evt);

	return 0;
}
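
/*
 * Note on placement: when no core is specified, the framework round-robins
 * new threads across cores via g_next_core (protected by g_scheduler_mtx),
 * taking the first candidate core that is present in the thread's cpumask.
 */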

static void
_reactor_request_thread_reschedule(struct spdk_thread *thread)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;

	assert(thread == spdk_get_thread());

	lw_thread = spdk_thread_get_ctx(thread);

	_spdk_lw_thread_set_core(lw_thread, SPDK_ENV_LCORE_ID_ANY);

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	/* Send a notification if the local reactor is marked as being in interrupt mode */
	if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) {
		uint64_t notify = 1;

		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}
	}
}

static int
reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
{
	struct spdk_lw_thread *lw_thread;

	switch (op) {
	case SPDK_THREAD_OP_NEW:
		lw_thread = spdk_thread_get_ctx(thread);
		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
		return _reactor_schedule_thread(thread);
	case SPDK_THREAD_OP_RESCHED:
		_reactor_request_thread_reschedule(thread);
		return 0;
	default:
		return -ENOTSUP;
	}
}

static bool
reactor_thread_op_supported(enum spdk_thread_op op)
{
	switch (op) {
	case SPDK_THREAD_OP_NEW:
	case SPDK_THREAD_OP_RESCHED:
		return true;
	default:
		return false;
	}
}

struct call_reactor {
	uint32_t cur_core;
	spdk_event_fn fn;
	void *arg1;
	void *arg2;

	uint32_t orig_core;
	spdk_event_fn cpl;
};

static void
on_reactor(void *arg1, void *arg2)
{
	struct call_reactor *cr = arg1;
	struct spdk_event *evt;

	cr->fn(cr->arg1, cr->arg2);

	cr->cur_core = spdk_env_get_next_core(cr->cur_core);

	if (cr->cur_core >= g_reactor_count) {
		SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");

		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
		free(cr);
	} else {
		SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %u\n",
			      cr->cur_core);

		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
	}
	assert(evt != NULL);
	spdk_event_call(evt);
}

void
spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
{
	struct call_reactor *cr;
	struct spdk_event *evt;

	cr = calloc(1, sizeof(*cr));
	if (!cr) {
		SPDK_ERRLOG("Unable to perform reactor iteration\n");
		cpl(arg1, arg2);
		return;
	}

	cr->fn = fn;
	cr->arg1 = arg1;
	cr->arg2 = arg2;
	cr->cpl = cpl;
	cr->orig_core = spdk_env_get_current_core();
	cr->cur_core = spdk_env_get_first_core();

	SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %u\n", cr->orig_core);

	evt = spdk_event_allocate(cr->cur_core, on_reactor, cr, NULL);
	assert(evt != NULL);

	spdk_event_call(evt);
}
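
/*
 * Example (illustrative): broadcasting a function to every reactor.
 * `set_flag`, `done`, and `my_ctx` are hypothetical; `set_flag` runs on each
 * reactor in turn, and `done` runs on the originating core once the
 * iteration completes:
 *
 *     static void set_flag(void *arg1, void *arg2) { ... }
 *     static void done(void *arg1, void *arg2) { ... }
 *
 *     spdk_for_each_reactor(set_flag, my_ctx, NULL, done);
 */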

#ifdef __linux__
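/*
 * On Linux, each reactor owns two eventfds grouped into an fd_group that
 * reactor_interrupt_run() waits on: events_fd wakes the reactor to drain its
 * event ring (event_queue_run_batch), and resched_fd wakes it to re-examine
 * its lightweight threads (reactor_schedule_thread_event).
 */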
static int
reactor_schedule_thread_event(void *arg)
{
	struct spdk_reactor *reactor = arg;
	struct spdk_lw_thread *lw_thread, *tmp;
	uint32_t count = 0;
	uint64_t notify = 1;

	assert(reactor->in_interrupt);

	if (read(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
		SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno));
		return -errno;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
	}

	return count;
}

static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	int rc;

	rc = spdk_fd_group_create(&reactor->fgrp);
	if (rc != 0) {
		return rc;
	}

	reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->resched_fd < 0) {
		rc = -EBADF;
		goto err;
	}

	rc = spdk_fd_group_add(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event,
			       reactor);
	if (rc) {
		close(reactor->resched_fd);
		goto err;
	}

	reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->events_fd < 0) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);

		rc = -EBADF;
		goto err;
	}

	rc = spdk_fd_group_add(reactor->fgrp, reactor->events_fd,
			       (spdk_fd_fn)event_queue_run_batch, reactor);
	if (rc) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);
		close(reactor->events_fd);
		goto err;
	}

	return 0;

err:
	spdk_fd_group_destroy(reactor->fgrp);
	reactor->fgrp = NULL;
	return rc;
}
#else
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	return -ENOTSUP;
}
#endif

static void
reactor_interrupt_fini(struct spdk_reactor *reactor)
{
	struct spdk_fd_group *fgrp = reactor->fgrp;

	if (!fgrp) {
		return;
	}

	spdk_fd_group_remove(fgrp, reactor->events_fd);
	spdk_fd_group_remove(fgrp, reactor->resched_fd);

	close(reactor->events_fd);
	close(reactor->resched_fd);

	spdk_fd_group_destroy(fgrp);
	reactor->fgrp = NULL;
}

void
_spdk_lw_thread_set_core(struct spdk_lw_thread *thread, uint32_t lcore)
{
	assert(thread != NULL);
	thread->lcore = lcore;
	thread->resched = true;
}

void
_spdk_lw_thread_get_current_stats(struct spdk_lw_thread *thread, struct spdk_thread_stats *stats)
{
	assert(thread != NULL);
	*stats = thread->current_stats;
}

static int
_governor_get_capabilities(uint32_t lcore_id, struct spdk_governor_capabilities *capabilities)
{
	capabilities->freq_change = false;
	capabilities->freq_getset = false;
	capabilities->freq_up = false;
	capabilities->freq_down = false;
	capabilities->freq_max = false;
	capabilities->freq_min = false;
	capabilities->turbo_set = false;
	capabilities->priority = false;
	capabilities->turbo_available = false;

	return 0;
}

static struct spdk_governor *
_governor_find(char *name)
{
	struct spdk_governor *governor, *tmp;

	TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
		if (strcmp(name, governor->name) == 0) {
			return governor;
		}
	}

	return NULL;
}

int
_spdk_governor_set(char *name)
{
	struct spdk_governor *governor;
	uint32_t i;
	int rc;

	governor = _governor_find(name);
	if (governor == NULL) {
		return -EINVAL;
	}

	g_governor = *governor;

	if (g_governor.init) {
		rc = g_governor.init();
		if (rc != 0) {
			return rc;
		}
	}

	SPDK_ENV_FOREACH_CORE(i) {
		if (g_governor.init_core) {
			rc = g_governor.init_core(i);
			if (rc != 0) {
				return rc;
			}
		}
	}

	return 0;
}

struct spdk_governor *
_spdk_governor_get(void)
{
	return &g_governor;
}

void
_spdk_governor_list_add(struct spdk_governor *governor)
{
	if (_governor_find(governor->name)) {
		SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_governor_list, governor, link);
}
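
/*
 * Illustrative sketch of registering a governor; `my_governor` and its
 * callbacks are hypothetical. Only the spdk_governor fields exercised in
 * this file (name, get_core_capabilities, init, init_core) are assumed:
 *
 *     static struct spdk_governor my_governor = {
 *         .name = "my_governor",
 *         .get_core_capabilities = my_get_capabilities,
 *         .init = my_init,
 *         .init_core = my_init_core,
 *     };
 *
 *     _spdk_governor_list_add(&my_governor);
 *     _spdk_governor_set("my_governor");
 */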

SPDK_LOG_REGISTER_COMPONENT(reactor)
1539