xref: /spdk/lib/event/reactor.c (revision cc6920a4763d4b9a43aa40583c8397d8f14fa100)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/likely.h"
36 
37 #include "spdk_internal/event.h"
38 
39 #include "spdk/log.h"
40 #include "spdk/thread.h"
41 #include "spdk/env.h"
42 #include "spdk/util.h"
43 #include "spdk/scheduler.h"
44 #include "spdk/string.h"
45 #include "spdk/fd_group.h"
46 
47 #ifdef __linux__
48 #include <sys/prctl.h>
49 #include <sys/eventfd.h>
50 #endif
51 
52 #ifdef __FreeBSD__
53 #include <pthread_np.h>
54 #endif
55 
/* Maximum number of events dequeued and executed per event_queue_run_batch() call. */
#define SPDK_EVENT_BATCH_SIZE		8

/* Array of per-core reactors indexed by lcore; allocated in spdk_reactors_init(). */
static struct spdk_reactor *g_reactors;
/* Number of entries in g_reactors (last core id + 1). */
static uint32_t g_reactor_count;
/* Set of cores on which reactors have been started (see spdk_reactors_start()). */
static struct spdk_cpuset g_reactor_core_mask;
/* Lifecycle state of the reactor framework. */
static enum spdk_reactor_state	g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;

/* When true, reactors periodically log context-switch counts via get_rusage(). */
static bool g_framework_context_switch_monitor_enabled = true;

/* Pool of struct spdk_event objects shared by all reactors. */
static struct spdk_mempool *g_spdk_event_mempool = NULL;

/* All schedulers registered via spdk_scheduler_register(). */
TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);

/* Currently active scheduler; NULL means no dynamic scheduling. */
static struct spdk_scheduler *g_scheduler = NULL;
/* Reactor that drives scheduling rounds (the reactor of the core that ran init). */
static struct spdk_reactor *g_scheduling_reactor;
/* True while a scheduling round (gather -> balance -> apply) is in flight. */
bool g_scheduling_in_progress = false;
/* Scheduling period in TSC ticks; 0 disables periodic scheduling (see reactor_run()). */
static uint64_t g_scheduler_period = 0;
/* Core to resume from when applying per-core interrupt-mode changes after balancing. */
static uint32_t g_scheduler_core_number;
/* Per-core metrics handed to the scheduler's balance() callback. */
static struct spdk_scheduler_core_info *g_core_infos = NULL;

/* Governors registered with the framework (registration code not in this chunk). */
TAILQ_HEAD(, spdk_governor) g_governor_list
	= TAILQ_HEAD_INITIALIZER(g_governor_list);

/* Currently active governor, if any. */
static struct spdk_governor *g_governor = NULL;

static int reactor_interrupt_init(struct spdk_reactor *reactor);
static void reactor_interrupt_fini(struct spdk_reactor *reactor);

/* Guard the one-shot transition into the stopping state; g_stopping_reactors is
 * reset in spdk_reactors_start() for app-framework restarts in the same process. */
static pthread_mutex_t g_stopping_reactors_mtx = PTHREAD_MUTEX_INITIALIZER;
static bool g_stopping_reactors = false;
87 
88 static struct spdk_scheduler *
89 _scheduler_find(const char *name)
90 {
91 	struct spdk_scheduler *tmp;
92 
93 	TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
94 		if (strcmp(name, tmp->name) == 0) {
95 			return tmp;
96 		}
97 	}
98 
99 	return NULL;
100 }
101 
102 int
103 spdk_scheduler_set(const char *name)
104 {
105 	struct spdk_scheduler *scheduler;
106 	int rc = 0;
107 
108 	/* NULL scheduler was specifically requested */
109 	if (name == NULL) {
110 		if (g_scheduler) {
111 			g_scheduler->deinit();
112 		}
113 		g_scheduler = NULL;
114 		return 0;
115 	}
116 
117 	scheduler = _scheduler_find(name);
118 	if (scheduler == NULL) {
119 		SPDK_ERRLOG("Requested scheduler is missing\n");
120 		return -EINVAL;
121 	}
122 
123 	if (g_scheduler == scheduler) {
124 		return 0;
125 	}
126 
127 	rc = scheduler->init();
128 	if (rc == 0) {
129 		if (g_scheduler) {
130 			g_scheduler->deinit();
131 		}
132 		g_scheduler = scheduler;
133 	}
134 
135 	return rc;
136 }
137 
/* Return the currently active scheduler, or NULL if none is set. */
struct spdk_scheduler *
spdk_scheduler_get(void)
{
	return g_scheduler;
}
143 
/* Return the scheduling period in microseconds (stored internally in TSC ticks). */
uint64_t
spdk_scheduler_get_period(void)
{
	/* Convert from ticks to microseconds */
	return (g_scheduler_period * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
}
150 
/* Set the scheduling period from microseconds; 0 disables periodic scheduling
 * (reactor_run() only starts a round when g_scheduler_period > 0). */
void
spdk_scheduler_set_period(uint64_t period)
{
	/* Convert microseconds to ticks */
	g_scheduler_period = period * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
}
157 
158 void
159 spdk_scheduler_register(struct spdk_scheduler *scheduler)
160 {
161 	if (_scheduler_find(scheduler->name)) {
162 		SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
163 		assert(false);
164 		return;
165 	}
166 
167 	TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
168 }
169 
/*
 * Initialize one reactor structure in place: its lcore binding, thread list,
 * event ring, and interrupt facilities. Called once per core from
 * spdk_reactors_init().
 */
static void
reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
{
	reactor->lcore = lcore;
	reactor->flags.is_valid = true;

	TAILQ_INIT(&reactor->threads);
	reactor->thread_count = 0;
	spdk_cpuset_zero(&reactor->notify_cpuset);

	/* MP/SC ring: any core may enqueue events, only this reactor dequeues. */
	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (reactor->events == NULL) {
		SPDK_ERRLOG("Failed to allocate events ring\n");
		assert(false);
	}

	/* Always initialize interrupt facilities for reactor */
	if (reactor_interrupt_init(reactor) != 0) {
		/* Reactor interrupt facilities are necessary if setting app to interrupt mode. */
		if (spdk_interrupt_mode_is_enabled()) {
			SPDK_ERRLOG("Failed to prepare intr facilities\n");
			assert(false);
		}
		return;
	}

	/* If application runs with full interrupt ability,
	 * all reactors are going to run in interrupt mode.
	 */
	if (spdk_interrupt_mode_is_enabled()) {
		uint32_t i;

		/* Every core must kick this reactor's eventfd when sending it events. */
		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true);
		}
		reactor->in_interrupt = true;
	}
}
208 
209 struct spdk_reactor *
210 spdk_reactor_get(uint32_t lcore)
211 {
212 	struct spdk_reactor *reactor;
213 
214 	if (g_reactors == NULL) {
215 		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
216 		return NULL;
217 	}
218 
219 	if (lcore >= g_reactor_count) {
220 		return NULL;
221 	}
222 
223 	reactor = &g_reactors[lcore];
224 
225 	if (reactor->flags.is_valid == false) {
226 		return NULL;
227 	}
228 
229 	return reactor;
230 }
231 
232 static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
233 static bool reactor_thread_op_supported(enum spdk_thread_op op);
234 
235 int
236 spdk_reactors_init(void)
237 {
238 	struct spdk_reactor *reactor;
239 	int rc;
240 	uint32_t i, current_core;
241 	char mempool_name[32];
242 
243 	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
244 	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
245 			       262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
246 			       sizeof(struct spdk_event),
247 			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
248 			       SPDK_ENV_SOCKET_ID_ANY);
249 
250 	if (g_spdk_event_mempool == NULL) {
251 		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
252 		return -1;
253 	}
254 
255 	/* struct spdk_reactor must be aligned on 64 byte boundary */
256 	g_reactor_count = spdk_env_get_last_core() + 1;
257 	rc = posix_memalign((void **)&g_reactors, 64,
258 			    g_reactor_count * sizeof(struct spdk_reactor));
259 	if (rc != 0) {
260 		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
261 			    g_reactor_count);
262 		spdk_mempool_free(g_spdk_event_mempool);
263 		return -1;
264 	}
265 
266 	g_core_infos = calloc(g_reactor_count, sizeof(*g_core_infos));
267 	if (g_core_infos == NULL) {
268 		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
269 		spdk_mempool_free(g_spdk_event_mempool);
270 		free(g_reactors);
271 		return -ENOMEM;
272 	}
273 
274 	memset(g_reactors, 0, (g_reactor_count) * sizeof(struct spdk_reactor));
275 
276 	spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
277 				 sizeof(struct spdk_lw_thread));
278 
279 	SPDK_ENV_FOREACH_CORE(i) {
280 		reactor_construct(&g_reactors[i], i);
281 	}
282 
283 	current_core = spdk_env_get_current_core();
284 	reactor = spdk_reactor_get(current_core);
285 	assert(reactor != NULL);
286 	g_scheduling_reactor = reactor;
287 
288 	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
289 
290 	return 0;
291 }
292 
/*
 * Tear down everything created by spdk_reactors_init(): per-reactor event
 * rings and interrupt facilities, leftover scheduler thread_infos, the event
 * mempool, and the reactor/core-info arrays. Safe to call when init never ran
 * (state still UNINITIALIZED).
 */
void
spdk_reactors_fini(void)
{
	uint32_t i;
	struct spdk_reactor *reactor;

	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
		return;
	}

	spdk_thread_lib_fini();

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		/* All lightweight threads must already be gone at this point. */
		assert(reactor->thread_count == 0);
		if (reactor->events != NULL) {
			spdk_ring_free(reactor->events);
		}

		reactor_interrupt_fini(reactor);

		/* Free any metrics left over from an interrupted scheduling round. */
		if (g_core_infos != NULL) {
			free(g_core_infos[i].thread_infos);
		}
	}

	spdk_mempool_free(g_spdk_event_mempool);

	free(g_reactors);
	g_reactors = NULL;
	free(g_core_infos);
	g_core_infos = NULL;
}
327 
328 static void _reactor_set_interrupt_mode(void *arg1, void *arg2);
329 
330 static void
331 _reactor_set_notify_cpuset(void *arg1, void *arg2)
332 {
333 	struct spdk_reactor *target = arg1;
334 	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());
335 
336 	assert(reactor != NULL);
337 	spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt);
338 }
339 
/*
 * Completion callback after all reactors updated their notify_cpuset for
 * 'target'. When leaving interrupt mode, the mode switch already happened
 * first, so the operation is done and the user callback fires. When entering
 * interrupt mode, the cpuset update must come first, so the actual mode
 * switch is dispatched to the target reactor only now.
 */
static void
_reactor_set_notify_cpuset_cpl(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;

	if (target->new_in_interrupt == false) {
		target->set_interrupt_mode_in_progress = false;
		spdk_thread_send_msg(_spdk_get_app_thread(), target->set_interrupt_mode_cb_fn,
				     target->set_interrupt_mode_cb_arg);
	} else {
		struct spdk_event *ev;

		ev = spdk_event_allocate(target->lcore, _reactor_set_interrupt_mode, target, NULL);
		assert(ev);
		spdk_event_call(ev);
	}
}
357 
358 static void
359 _reactor_set_thread_interrupt_mode(void *ctx)
360 {
361 	struct spdk_reactor *reactor = ctx;
362 
363 	spdk_thread_set_interrupt_mode(reactor->in_interrupt);
364 }
365 
/*
 * Runs on the target reactor itself: flip in_interrupt to the requested mode,
 * propagate the new mode to every spdk_thread on this reactor, then finish the
 * operation. The ordering relative to the notify_cpuset broadcast differs per
 * direction (see spdk_reactor_set_interrupt_mode()).
 */
static void
_reactor_set_interrupt_mode(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread, *tmp;

	assert(target == spdk_reactor_get(spdk_env_get_current_core()));
	assert(target != NULL);
	assert(target->in_interrupt != target->new_in_interrupt);
	SPDK_DEBUGLOG(reactor, "Do reactor set on core %u from %s to state %s\n",
		      target->lcore, target->in_interrupt ? "intr" : "poll", target->new_in_interrupt ? "intr" : "poll");

	target->in_interrupt = target->new_in_interrupt;

	/* Align spdk_thread with reactor to interrupt mode or poll mode */
	TAILQ_FOREACH_SAFE(lw_thread, &target->threads, link, tmp) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, target);
	}

	if (target->new_in_interrupt == false) {
		/* Reactor is no longer in interrupt mode. Refresh the tsc_last to accurately
		 * track reactor stats. */
		target->tsc_last = spdk_get_ticks();
		/* Now that this reactor polls, tell all reactors to stop kicking its eventfd. */
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	} else {
		uint64_t notify = 1;
		int rc = 0;

		/* Always trigger spdk_event and resched event in case of race condition */
		rc = write(target->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
		rc = write(target->resched_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}

		target->set_interrupt_mode_in_progress = false;
		spdk_thread_send_msg(_spdk_get_app_thread(), target->set_interrupt_mode_cb_fn,
				     target->set_interrupt_mode_cb_arg);
	}
}
411 
/*
 * Asynchronously switch the reactor on 'lcore' between poll and interrupt
 * mode; cb_fn(cb_arg) runs on the app thread when the switch completes.
 *
 * Must be called from the SPDK application thread. Returns 0 on success (or
 * no-op when already in the requested mode), -EINVAL for an unknown core,
 * -ENOTSUP when interrupt facilities (eventfd) are unavailable, -EPERM when
 * called off the app thread, -EBUSY when a switch is already in flight.
 */
int
spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt,
				spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg)
{
	struct spdk_reactor *target;

	target = spdk_reactor_get(lcore);
	if (target == NULL) {
		return -EINVAL;
	}

	/* Eventfd has to be supported in order to use interrupt functionality. */
	if (target->fgrp == NULL) {
		return -ENOTSUP;
	}

	if (spdk_get_thread() != _spdk_get_app_thread()) {
		SPDK_ERRLOG("It is only permitted within spdk application thread.\n");
		return -EPERM;
	}

	/* Already in the requested mode: complete immediately. */
	if (target->in_interrupt == new_in_interrupt) {
		cb_fn(cb_arg);
		return 0;
	}

	if (target->set_interrupt_mode_in_progress) {
		SPDK_NOTICELOG("Reactor(%u) is already in progress to set interrupt mode\n", lcore);
		return -EBUSY;
	}
	target->set_interrupt_mode_in_progress = true;

	target->new_in_interrupt = new_in_interrupt;
	target->set_interrupt_mode_cb_fn = cb_fn;
	target->set_interrupt_mode_cb_arg = cb_arg;

	SPDK_DEBUGLOG(reactor, "Starting reactor event from %d to %d\n",
		      spdk_env_get_current_core(), lcore);

	if (new_in_interrupt == false) {
		/* For potential race cases, when setting the reactor to poll mode,
		 * first change the mode of the reactor and then clear the corresponding
		 * bit of the notify_cpuset of each reactor.
		 */
		struct spdk_event *ev;

		ev = spdk_event_allocate(lcore, _reactor_set_interrupt_mode, target, NULL);
		assert(ev);
		spdk_event_call(ev);
	} else {
		/* For race cases, when setting the reactor to interrupt mode, first set the
		 * corresponding bit of the notify_cpuset of each reactor and then change the mode.
		 */
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	}

	return 0;
}
470 
471 struct spdk_event *
472 spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
473 {
474 	struct spdk_event *event = NULL;
475 	struct spdk_reactor *reactor = spdk_reactor_get(lcore);
476 
477 	if (!reactor) {
478 		assert(false);
479 		return NULL;
480 	}
481 
482 	event = spdk_mempool_get(g_spdk_event_mempool);
483 	if (event == NULL) {
484 		assert(false);
485 		return NULL;
486 	}
487 
488 	event->lcore = lcore;
489 	event->fn = fn;
490 	event->arg1 = arg1;
491 	event->arg2 = arg2;
492 
493 	return event;
494 }
495 
/*
 * Enqueue 'event' on its destination reactor's event ring and, when required,
 * kick that reactor's eventfd so an interrupt-mode reactor wakes up to
 * process it. Ownership of the event passes to the destination reactor.
 */
void
spdk_event_call(struct spdk_event *event)
{
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_core = spdk_env_get_current_core();

	reactor = spdk_reactor_get(event->lcore);

	assert(reactor != NULL);
	assert(reactor->events != NULL);

	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
	if (rc != 1) {
		assert(false);
	}

	/* SPDK_ENV_LCORE_ID_ANY means the caller is not an SPDK-managed core. */
	if (current_core != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_core);
	}

	/* If spdk_event_call isn't called on a reactor, always send a notification.
	 * If it is called on a reactor, send a notification if the destination reactor
	 * is indicated in interrupt mode state.
	 */
	if (spdk_unlikely(local_reactor == NULL) ||
	    spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) {
		uint64_t notify = 1;

		rc = write(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
	}
}
532 
/*
 * Dequeue and execute up to SPDK_EVENT_BATCH_SIZE events from this reactor's
 * ring. In interrupt mode the eventfd is acknowledged before dequeuing and
 * re-armed if events remain, to avoid losing wakeups. Returns the number of
 * events executed, or -errno on eventfd read/write failure.
 */
static inline int
event_queue_run_batch(void *arg)
{
	struct spdk_reactor *reactor = arg;
	size_t count, i;
	void *events[SPDK_EVENT_BATCH_SIZE];
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(events, 0, sizeof(events));
#endif

	/* Operate event notification if this reactor currently runs in interrupt state */
	if (spdk_unlikely(reactor->in_interrupt)) {
		uint64_t notify = 1;
		int rc;

		/* There may be race between event_acknowledge and another producer's event_notify,
		 * so event_acknowledge should be applied ahead. And then check for self's event_notify.
		 * This can avoid event notification missing.
		 */
		rc = read(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno));
			return -errno;
		}

		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);

		if (spdk_ring_count(reactor->events) != 0) {
			/* Trigger new notification if there are still events in event-queue waiting for processing. */
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
				return -errno;
			}
		}
	} else {
		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
	}

	if (count == 0) {
		return 0;
	}

	/* Execute the events. There are still some remaining events
	 * that must occur on an SPDK thread. To accommodate those, try to
	 * run them on the first thread in the list, if it exists. */
	lw_thread = TAILQ_FIRST(&reactor->threads);
	if (lw_thread) {
		thread = spdk_thread_get_from_ctx(lw_thread);
	} else {
		thread = NULL;
	}

	for (i = 0; i < count; i++) {
		struct spdk_event *event = events[i];

		assert(event != NULL);
		/* Run each callback with the chosen thread context set (may be NULL). */
		spdk_set_thread(thread);
		event->fn(event->arg1, event->arg2);
		spdk_set_thread(NULL);
	}

	/* Events are pooled objects; return the whole batch at once. */
	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);

	return (int)count;
}
607 
608 /* 1s */
609 #define CONTEXT_SWITCH_MONITOR_PERIOD 1000000
610 
611 static int
612 get_rusage(struct spdk_reactor *reactor)
613 {
614 	struct rusage		rusage;
615 
616 	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
617 		return -1;
618 	}
619 
620 	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
621 		SPDK_INFOLOG(reactor,
622 			     "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
623 			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
624 			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
625 	}
626 	reactor->rusage = rusage;
627 
628 	return -1;
629 }
630 
631 void
632 spdk_framework_enable_context_switch_monitor(bool enable)
633 {
634 	/* This global is being read by multiple threads, so this isn't
635 	 * strictly thread safe. However, we're toggling between true and
636 	 * false here, and if a thread sees the value update later than it
637 	 * should, it's no big deal. */
638 	g_framework_context_switch_monitor_enabled = enable;
639 }
640 
/* Return whether context-switch monitoring is currently enabled. */
bool
spdk_framework_context_switch_monitor_enabled(void)
{
	return g_framework_context_switch_monitor_enabled;
}
646 
/* Set the OS-visible name of the calling thread using the platform-specific API. */
static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
	/* Other POSIX platforms: assume glibc-style pthread_setname_np. */
	pthread_setname_np(pthread_self(), thread_name);
#endif
}
658 
659 static void
660 _init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
661 {
662 	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
663 	struct spdk_thread_stats prev_total_stats;
664 
665 	/* Read total_stats before updating it to calculate stats during the last scheduling period. */
666 	prev_total_stats = lw_thread->total_stats;
667 
668 	spdk_set_thread(thread);
669 	spdk_thread_get_stats(&lw_thread->total_stats);
670 	spdk_set_thread(NULL);
671 
672 	lw_thread->current_stats.busy_tsc = lw_thread->total_stats.busy_tsc - prev_total_stats.busy_tsc;
673 	lw_thread->current_stats.idle_tsc = lw_thread->total_stats.idle_tsc - prev_total_stats.idle_tsc;
674 }
675 
676 static void
677 _threads_reschedule_thread(struct spdk_scheduler_thread_info *thread_info)
678 {
679 	struct spdk_lw_thread *lw_thread;
680 	struct spdk_thread *thread;
681 
682 	thread = spdk_thread_get_by_id(thread_info->thread_id);
683 	if (thread == NULL) {
684 		/* Thread no longer exists. */
685 		return;
686 	}
687 	lw_thread = spdk_thread_get_ctx(thread);
688 	assert(lw_thread != NULL);
689 
690 	lw_thread->lcore = thread_info->lcore;
691 	lw_thread->resched = true;
692 }
693 
694 static void
695 _threads_reschedule(struct spdk_scheduler_core_info *cores_info)
696 {
697 	struct spdk_scheduler_core_info *core;
698 	struct spdk_scheduler_thread_info *thread_info;
699 	uint32_t i, j;
700 
701 	SPDK_ENV_FOREACH_CORE(i) {
702 		core = &cores_info[i];
703 		for (j = 0; j < core->threads_count; j++) {
704 			thread_info = &core->thread_infos[j];
705 			if (thread_info->lcore != i) {
706 				_threads_reschedule_thread(thread_info);
707 			}
708 		}
709 		core->threads_count = 0;
710 		free(core->thread_infos);
711 		core->thread_infos = NULL;
712 	}
713 }
714 
715 static void
716 _reactors_scheduler_fini(void)
717 {
718 	/* Reschedule based on the balancing output */
719 	_threads_reschedule(g_core_infos);
720 
721 	g_scheduling_in_progress = false;
722 }
723 
/*
 * Apply per-core interrupt-mode decisions produced by the scheduler. This is
 * an asynchronous loop: each successful spdk_reactor_set_interrupt_mode()
 * returns here as its completion callback, with g_scheduler_core_number
 * remembering where to resume. When all cores match the requested mode, the
 * scheduling round finishes.
 */
static void
_reactors_scheduler_update_core_mode(void *ctx)
{
	struct spdk_reactor *reactor;
	uint32_t i;
	int rc = 0;

	for (i = g_scheduler_core_number; i < SPDK_ENV_LCORE_ID_ANY; i = spdk_env_get_next_core(i)) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		if (reactor->in_interrupt != g_core_infos[i].interrupt_mode) {
			/* Switch next found reactor to new state */
			rc = spdk_reactor_set_interrupt_mode(i, g_core_infos[i].interrupt_mode,
							     _reactors_scheduler_update_core_mode, NULL);
			if (rc == 0) {
				/* Set core to start with after callback completes */
				g_scheduler_core_number = spdk_env_get_next_core(i);
				return;
			}
		}
	}
	_reactors_scheduler_fini();
}
747 
748 static void
749 _reactors_scheduler_cancel(void *arg1, void *arg2)
750 {
751 	struct spdk_scheduler_core_info *core;
752 	uint32_t i;
753 
754 	SPDK_ENV_FOREACH_CORE(i) {
755 		core = &g_core_infos[i];
756 		core->threads_count = 0;
757 		free(core->thread_infos);
758 		core->thread_infos = NULL;
759 	}
760 
761 	g_scheduling_in_progress = false;
762 }
763 
764 static void
765 _reactors_scheduler_balance(void *arg1, void *arg2)
766 {
767 	struct spdk_scheduler *scheduler = spdk_scheduler_get();
768 
769 	if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING || scheduler == NULL) {
770 		_reactors_scheduler_cancel(NULL, NULL);
771 		return;
772 	}
773 
774 	scheduler->balance(g_core_infos, g_reactor_count);
775 
776 	g_scheduler_core_number = spdk_env_get_first_core();
777 	_reactors_scheduler_update_core_mode(NULL);
778 }
779 
/* Phase 1 of thread scheduling is to gather metrics on the existing threads.
 * This event hops from core to core: each invocation records the local
 * reactor's busy/idle deltas and per-thread stats into g_core_infos, then
 * forwards itself to the next core. Once it loops back to the scheduling
 * reactor, phase 2 (balancing) is dispatched. */
static void
_reactors_scheduler_gather_metrics(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core_info;
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread *thread;
	struct spdk_reactor *reactor;
	struct spdk_event *evt;
	uint32_t next_core;
	uint32_t i = 0;

	reactor = spdk_reactor_get(spdk_env_get_current_core());
	assert(reactor != NULL);
	core_info = &g_core_infos[reactor->lcore];
	core_info->lcore = reactor->lcore;
	/* current_* are deltas since the previous round; total_* the running sums. */
	core_info->current_idle_tsc = reactor->idle_tsc - core_info->total_idle_tsc;
	core_info->total_idle_tsc = reactor->idle_tsc;
	core_info->current_busy_tsc = reactor->busy_tsc - core_info->total_busy_tsc;
	core_info->total_busy_tsc = reactor->busy_tsc;
	core_info->interrupt_mode = reactor->in_interrupt;
	core_info->threads_count = 0;

	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);

	if (reactor->thread_count > 0) {
		core_info->thread_infos = calloc(reactor->thread_count, sizeof(*core_info->thread_infos));
		if (core_info->thread_infos == NULL) {
			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);

			/* Cancel this round of schedule work */
			evt = spdk_event_allocate(g_scheduling_reactor->lcore, _reactors_scheduler_cancel, NULL, NULL);
			spdk_event_call(evt);
			return;
		}

		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
			_init_thread_stats(reactor, lw_thread);

			core_info->thread_infos[i].lcore = lw_thread->lcore;
			thread = spdk_thread_get_from_ctx(lw_thread);
			assert(thread != NULL);
			core_info->thread_infos[i].thread_id = spdk_thread_get_id(thread);
			core_info->thread_infos[i].total_stats = lw_thread->total_stats;
			core_info->thread_infos[i].current_stats = lw_thread->current_stats;
			core_info->threads_count++;
			assert(core_info->threads_count <= reactor->thread_count);
			i++;
		}
	}

	next_core = spdk_env_get_next_core(reactor->lcore);
	if (next_core == UINT32_MAX) {
		next_core = spdk_env_get_first_core();
	}

	/* If we've looped back around to the scheduler thread, move to the next phase */
	if (next_core == g_scheduling_reactor->lcore) {
		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
		evt = spdk_event_allocate(next_core, _reactors_scheduler_balance, NULL, NULL);
		spdk_event_call(evt);
		return;
	}

	evt = spdk_event_allocate(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
	spdk_event_call(evt);
}
847 
848 static int _reactor_schedule_thread(struct spdk_thread *thread);
849 static uint64_t g_rusage_period;
850 
/*
 * Detach a lightweight thread from its reactor: unlink it from the thread
 * list, drop the count, and (in full interrupt mode) unregister the thread's
 * interrupt fd from the reactor's fd group.
 */
static void
_reactor_remove_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread	*thread = spdk_thread_get_from_ctx(lw_thread);
	int efd;

	TAILQ_REMOVE(&reactor->threads, lw_thread, link);
	assert(reactor->thread_count > 0);
	reactor->thread_count--;

	/* Operate thread intr if running with full interrupt ability */
	if (spdk_interrupt_mode_is_enabled()) {
		efd = spdk_thread_get_interrupt_fd(thread);
		spdk_fd_group_remove(reactor->fgrp, efd);
	}
}
867 
868 static bool
869 reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
870 {
871 	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
872 
873 	if (spdk_unlikely(lw_thread->resched)) {
874 		lw_thread->resched = false;
875 		_reactor_remove_lw_thread(reactor, lw_thread);
876 		_reactor_schedule_thread(thread);
877 		return true;
878 	}
879 
880 	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
881 			  spdk_thread_is_idle(thread))) {
882 		_reactor_remove_lw_thread(reactor, lw_thread);
883 		spdk_thread_destroy(thread);
884 		return true;
885 	}
886 
887 	return false;
888 }
889 
890 static void
891 reactor_interrupt_run(struct spdk_reactor *reactor)
892 {
893 	int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */
894 
895 	spdk_fd_group_wait(reactor->fgrp, block_timeout);
896 }
897 
/*
 * One iteration of the poll-mode loop: drain a batch of events, then poll
 * every lightweight thread once, attributing elapsed TSC to busy or idle time
 * based on each poll's return value.
 */
static void
_reactor_run(struct spdk_reactor *reactor)
{
	struct spdk_thread	*thread;
	struct spdk_lw_thread	*lw_thread, *tmp;
	uint64_t		now;
	int			rc;

	event_queue_run_batch(reactor);

	/* If no threads are present on the reactor,
	 * tsc_last gets outdated. Update it to track
	 * thread execution time correctly. */
	if (spdk_unlikely(TAILQ_EMPTY(&reactor->threads))) {
		now = spdk_get_ticks();
		reactor->idle_tsc += now - reactor->tsc_last;
		reactor->tsc_last = now;
		return;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);

		/* rc == 0: nothing ran (idle); rc > 0: work was done (busy). */
		now = spdk_thread_get_last_tsc(thread);
		if (rc == 0) {
			reactor->idle_tsc += now - reactor->tsc_last;
		} else if (rc > 0) {
			reactor->busy_tsc += now - reactor->tsc_last;
		}
		reactor->tsc_last = now;

		/* Handle reschedule requests and reap exited threads. */
		reactor_post_process_lw_thread(reactor, lw_thread);
	}
}
933 
/*
 * Main entry point of a reactor, pinned to its core. Loops in poll or
 * interrupt mode until the framework leaves the RUNNING state, then asks all
 * of its lightweight threads to exit and keeps polling until every one has
 * been destroyed. Always returns 0.
 */
static int
reactor_run(void *arg)
{
	struct spdk_reactor	*reactor = arg;
	struct spdk_thread	*thread;
	struct spdk_lw_thread	*lw_thread, *tmp;
	char			thread_name[32];
	uint64_t		last_sched = 0;

	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);

	/* Rename the POSIX thread because the reactor is tied to the POSIX
	 * thread in the SPDK event library.
	 */
	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
	_set_thread_name(thread_name);

	reactor->tsc_last = spdk_get_ticks();

	while (1) {
		/* Execute interrupt process fn if this reactor currently runs in interrupt state */
		if (spdk_unlikely(reactor->in_interrupt)) {
			reactor_interrupt_run(reactor);
		} else {
			_reactor_run(reactor);
		}

		/* Periodically log context-switch counts when monitoring is on. */
		if (g_framework_context_switch_monitor_enabled) {
			if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
				get_rusage(reactor);
				reactor->last_rusage = reactor->tsc_last;
			}
		}

		/* Only the scheduling reactor kicks off a scheduling round, and
		 * only when no previous round is still in flight. */
		if (spdk_unlikely(g_scheduler_period > 0 &&
				  (reactor->tsc_last - last_sched) > g_scheduler_period &&
				  reactor == g_scheduling_reactor &&
				  !g_scheduling_in_progress)) {
			last_sched = reactor->tsc_last;
			g_scheduling_in_progress = true;
			_reactors_scheduler_gather_metrics(NULL, NULL);
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}
	}

	/* Shutdown: request exit from every thread still on this reactor... */
	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		spdk_set_thread(thread);
		spdk_thread_exit(thread);
	}

	/* ...then keep polling until each one finishes exiting and is destroyed. */
	while (!TAILQ_EMPTY(&reactor->threads)) {
		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			spdk_set_thread(thread);
			if (spdk_thread_is_exited(thread)) {
				_reactor_remove_lw_thread(reactor, lw_thread);
				spdk_thread_destroy(thread);
			} else {
				spdk_thread_poll(thread, 0, 0);
			}
		}
	}

	return 0;
}
1003 
/* Parse a core-mask string into 'cpumask' and intersect it with the set of
 * cores the application is actually using. Returns 0 on success or the
 * negative error from spdk_cpuset_parse(). */
int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int rc;

	rc = spdk_cpuset_parse(cpumask, mask);
	if (rc < 0) {
		return rc;
	}

	spdk_cpuset_and(cpumask, spdk_app_get_core_mask());

	return 0;
}
1020 
/* Return the cpuset of cores on which reactors were started
 * (populated in spdk_reactors_start()).
 */
const struct spdk_cpuset *
spdk_app_get_core_mask(void)
{
	return &g_reactor_core_mask;
}
1026 
1027 void
1028 spdk_reactors_start(void)
1029 {
1030 	struct spdk_reactor *reactor;
1031 	uint32_t i, current_core;
1032 	int rc;
1033 
1034 	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
1035 	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
1036 	/* Reinitialize to false, in case the app framework is restarting in the same process. */
1037 	g_stopping_reactors = false;
1038 
1039 	current_core = spdk_env_get_current_core();
1040 	SPDK_ENV_FOREACH_CORE(i) {
1041 		if (i != current_core) {
1042 			reactor = spdk_reactor_get(i);
1043 			if (reactor == NULL) {
1044 				continue;
1045 			}
1046 
1047 			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
1048 			if (rc < 0) {
1049 				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
1050 				assert(false);
1051 				return;
1052 			}
1053 		}
1054 		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
1055 	}
1056 
1057 	/* Start the main reactor */
1058 	reactor = spdk_reactor_get(current_core);
1059 	assert(reactor != NULL);
1060 	reactor_run(reactor);
1061 
1062 	spdk_env_thread_wait_all();
1063 
1064 	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
1065 }
1066 
1067 static void
1068 _reactors_stop(void *arg1, void *arg2)
1069 {
1070 	uint32_t i;
1071 	int rc;
1072 	struct spdk_reactor *reactor;
1073 	struct spdk_reactor *local_reactor;
1074 	uint64_t notify = 1;
1075 
1076 	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
1077 	local_reactor = spdk_reactor_get(spdk_env_get_current_core());
1078 
1079 	SPDK_ENV_FOREACH_CORE(i) {
1080 		/* If spdk_event_call isn't called  on a reactor, always send a notification.
1081 		 * If it is called on a reactor, send a notification if the destination reactor
1082 		 * is indicated in interrupt mode state.
1083 		 */
1084 		if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) {
1085 			reactor = spdk_reactor_get(i);
1086 			assert(reactor != NULL);
1087 			rc = write(reactor->events_fd, &notify, sizeof(notify));
1088 			if (rc < 0) {
1089 				SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno));
1090 				continue;
1091 			}
1092 		}
1093 	}
1094 }
1095 
/* No-op event function; used as the per-reactor step of the final flush
 * issued by spdk_reactors_stop().
 */
static void
nop(void *arg1, void *arg2)
{
}
1100 
/* Begin framework shutdown: run a no-op on every reactor so all in-flight
 * for_each_reactor iterations drain, then invoke _reactors_stop as the
 * completion to transition the state and wake interrupt-mode reactors.
 */
void
spdk_reactors_stop(void *arg1)
{
	spdk_for_each_reactor(nop, NULL, NULL, _reactors_stop);
}
1106 
/* Serializes core selection in _reactor_schedule_thread(), which may be
 * invoked concurrently from multiple threads.
 */
static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
/* Next candidate lcore for round-robin thread placement; UINT32_MAX forces
 * a reset to the first core on first use.
 */
static uint32_t g_next_core = UINT32_MAX;
1109 
1110 static int
1111 thread_process_interrupts(void *arg)
1112 {
1113 	struct spdk_thread *thread = arg;
1114 	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());
1115 	uint64_t now;
1116 	int rc;
1117 
1118 	assert(reactor != NULL);
1119 
1120 	/* Update idle_tsc between the end of last intr_fn and the start of this intr_fn. */
1121 	now = spdk_get_ticks();
1122 	reactor->idle_tsc += now - reactor->tsc_last;
1123 	reactor->tsc_last = now;
1124 
1125 	rc = spdk_thread_poll(thread, 0, now);
1126 
1127 	/* Update tsc between the start and the end of this intr_fn. */
1128 	now = spdk_thread_get_last_tsc(thread);
1129 	if (rc == 0) {
1130 		reactor->idle_tsc += now - reactor->tsc_last;
1131 	} else if (rc > 0) {
1132 		reactor->busy_tsc += now - reactor->tsc_last;
1133 	}
1134 	reactor->tsc_last = now;
1135 
1136 	return rc;
1137 }
1138 
1139 static void
1140 _schedule_thread(void *arg1, void *arg2)
1141 {
1142 	struct spdk_lw_thread *lw_thread = arg1;
1143 	struct spdk_thread *thread;
1144 	struct spdk_reactor *reactor;
1145 	uint32_t current_core;
1146 	int efd;
1147 
1148 	current_core = spdk_env_get_current_core();
1149 	reactor = spdk_reactor_get(current_core);
1150 	assert(reactor != NULL);
1151 
1152 	/* Update total_stats to reflect state of thread
1153 	* at the end of the move. */
1154 	thread = spdk_thread_get_from_ctx(lw_thread);
1155 	spdk_set_thread(thread);
1156 	spdk_thread_get_stats(&lw_thread->total_stats);
1157 	spdk_set_thread(NULL);
1158 
1159 	lw_thread->lcore = current_core;
1160 
1161 	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
1162 	reactor->thread_count++;
1163 
1164 	/* Operate thread intr if running with full interrupt ability */
1165 	if (spdk_interrupt_mode_is_enabled()) {
1166 		int rc;
1167 
1168 		efd = spdk_thread_get_interrupt_fd(thread);
1169 		rc = SPDK_FD_GROUP_ADD(reactor->fgrp, efd,
1170 				       thread_process_interrupts, thread);
1171 		if (rc < 0) {
1172 			SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
1173 		}
1174 
1175 		/* Align spdk_thread with reactor to interrupt mode or poll mode */
1176 		spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, reactor);
1177 	}
1178 }
1179 
/* Choose a destination core for an spdk_thread and dispatch a
 * _schedule_thread() event to it.
 *
 * Placement honors the thread's cpumask. When interrupt support is not
 * enabled and we are on a reactor core, reactors currently in interrupt
 * mode are excluded from consideration. If no specific core was requested
 * (SPDK_ENV_LCORE_ID_ANY), a round-robin scan under g_scheduler_mtx picks
 * the next allowed core.
 *
 * \param thread the spdk_thread to place
 * \return 0 on success, -1 if the scheduling event could not be allocated
 */
static int
_reactor_schedule_thread(struct spdk_thread *thread)
{
	uint32_t core;
	struct spdk_lw_thread *lw_thread;
	struct spdk_event *evt = NULL;
	struct spdk_cpuset *cpumask;
	uint32_t i;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_lcore = spdk_env_get_current_core();
	struct spdk_cpuset polling_cpumask;
	struct spdk_cpuset valid_cpumask;

	cpumask = spdk_thread_get_cpumask(thread);

	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);
	/* Remember the requested core, then reset the per-thread scheduling
	 * context; _schedule_thread() repopulates it on arrival.
	 */
	core = lw_thread->lcore;
	memset(lw_thread, 0, sizeof(*lw_thread));

	/* current_lcore is SPDK_ENV_LCORE_ID_ANY when called from a
	 * non-reactor (e.g. app) thread, in which case there is no local
	 * reactor to consult for interrupt state.
	 */
	if (current_lcore != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_lcore);
		assert(local_reactor);
	}

	/* When interrupt ability of spdk_thread is not enabled and the current
	 * reactor runs on DPDK thread, skip reactors which are in interrupt mode.
	 */
	if (!spdk_interrupt_mode_is_enabled() && local_reactor != NULL) {
		/* Get the cpumask of all reactors in polling */
		spdk_cpuset_zero(&polling_cpumask);
		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&polling_cpumask, i, true);
		}
		/* XOR with notify_cpuset clears the bits of interrupt-mode
		 * reactors, leaving only polling ones set.
		 */
		spdk_cpuset_xor(&polling_cpumask, &local_reactor->notify_cpuset);

		if (core == SPDK_ENV_LCORE_ID_ANY) {
			/* Get the cpumask of all valid reactors which are suggested and also in polling */
			spdk_cpuset_copy(&valid_cpumask, &polling_cpumask);
			spdk_cpuset_and(&valid_cpumask, spdk_thread_get_cpumask(thread));

			/* If there are any valid reactors, spdk_thread should be scheduled
			 * into one of the valid reactors.
			 * If there is no valid reactors, spdk_thread should be scheduled
			 * into one of the polling reactors.
			 */
			if (spdk_cpuset_count(&valid_cpumask) != 0) {
				cpumask = &valid_cpumask;
			} else {
				cpumask = &polling_cpumask;
			}
		} else if (!spdk_cpuset_get_cpu(&polling_cpumask, core)) {
			/* If specified reactor is not in polling, spdk_thread should be scheduled
			 * into one of the polling reactors.
			 */
			core = SPDK_ENV_LCORE_ID_ANY;
			cpumask = &polling_cpumask;
		}
	}

	/* Round-robin over all cores under the scheduler lock until a core
	 * allowed by cpumask is found; g_next_core persists across calls.
	 */
	pthread_mutex_lock(&g_scheduler_mtx);
	if (core == SPDK_ENV_LCORE_ID_ANY) {
		for (i = 0; i < spdk_env_get_core_count(); i++) {
			if (g_next_core >= g_reactor_count) {
				g_next_core = spdk_env_get_first_core();
			}
			core = g_next_core;
			g_next_core = spdk_env_get_next_core(g_next_core);

			if (spdk_cpuset_get_cpu(cpumask, core)) {
				break;
			}
		}
	}

	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);

	pthread_mutex_unlock(&g_scheduler_mtx);

	assert(evt != NULL);
	if (evt == NULL) {
		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
		return -1;
	}

	/* Stamp the start of the migration for later stats accounting. */
	lw_thread->tsc_start = spdk_get_ticks();

	spdk_event_call(evt);

	return 0;
}
1271 
1272 static void
1273 _reactor_request_thread_reschedule(struct spdk_thread *thread)
1274 {
1275 	struct spdk_lw_thread *lw_thread;
1276 	struct spdk_reactor *reactor;
1277 	uint32_t current_core;
1278 
1279 	assert(thread == spdk_get_thread());
1280 
1281 	lw_thread = spdk_thread_get_ctx(thread);
1282 
1283 	assert(lw_thread != NULL);
1284 	lw_thread->resched = true;
1285 	lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
1286 
1287 	current_core = spdk_env_get_current_core();
1288 	reactor = spdk_reactor_get(current_core);
1289 	assert(reactor != NULL);
1290 
1291 	/* Send a notification if the destination reactor is indicated in intr mode state */
1292 	if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) {
1293 		uint64_t notify = 1;
1294 
1295 		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
1296 			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
1297 		}
1298 	}
1299 }
1300 
1301 static int
1302 reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
1303 {
1304 	struct spdk_lw_thread *lw_thread;
1305 
1306 	switch (op) {
1307 	case SPDK_THREAD_OP_NEW:
1308 		lw_thread = spdk_thread_get_ctx(thread);
1309 		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
1310 		return _reactor_schedule_thread(thread);
1311 	case SPDK_THREAD_OP_RESCHED:
1312 		_reactor_request_thread_reschedule(thread);
1313 		return 0;
1314 	default:
1315 		return -ENOTSUP;
1316 	}
1317 }
1318 
1319 static bool
1320 reactor_thread_op_supported(enum spdk_thread_op op)
1321 {
1322 	switch (op) {
1323 	case SPDK_THREAD_OP_NEW:
1324 	case SPDK_THREAD_OP_RESCHED:
1325 		return true;
1326 	default:
1327 		return false;
1328 	}
1329 }
1330 
/* Context carried across cores by a spdk_for_each_reactor() iteration. */
struct call_reactor {
	uint32_t cur_core;	/* core the iteration is currently visiting */
	spdk_event_fn fn;	/* function executed on each reactor */
	void *arg1;
	void *arg2;

	uint32_t orig_core;	/* core that started the iteration; cpl runs there */
	spdk_event_fn cpl;	/* completion callback invoked after the last core */
};
1340 
1341 static void
1342 on_reactor(void *arg1, void *arg2)
1343 {
1344 	struct call_reactor *cr = arg1;
1345 	struct spdk_event *evt;
1346 
1347 	cr->fn(cr->arg1, cr->arg2);
1348 
1349 	cr->cur_core = spdk_env_get_next_core(cr->cur_core);
1350 
1351 	if (cr->cur_core >= g_reactor_count) {
1352 		SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");
1353 
1354 		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
1355 		free(cr);
1356 	} else {
1357 		SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %d\n",
1358 			      cr->cur_core);
1359 
1360 		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
1361 	}
1362 	assert(evt != NULL);
1363 	spdk_event_call(evt);
1364 }
1365 
1366 void
1367 spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
1368 {
1369 	struct call_reactor *cr;
1370 	struct spdk_event *evt;
1371 
1372 	/* When the application framework is shutting down, we will send one
1373 	 * final for_each_reactor operation with completion callback _reactors_stop,
1374 	 * to flush any existing for_each_reactor operations to avoid any memory
1375 	 * leaks. We use a mutex here to protect a boolean flag that will ensure
1376 	 * we don't start any more operations once we've started shutting down.
1377 	 */
1378 	pthread_mutex_lock(&g_stopping_reactors_mtx);
1379 	if (g_stopping_reactors) {
1380 		pthread_mutex_unlock(&g_stopping_reactors_mtx);
1381 		return;
1382 	} else if (cpl == _reactors_stop) {
1383 		g_stopping_reactors = true;
1384 	}
1385 	pthread_mutex_unlock(&g_stopping_reactors_mtx);
1386 
1387 	cr = calloc(1, sizeof(*cr));
1388 	if (!cr) {
1389 		SPDK_ERRLOG("Unable to perform reactor iteration\n");
1390 		cpl(arg1, arg2);
1391 		return;
1392 	}
1393 
1394 	cr->fn = fn;
1395 	cr->arg1 = arg1;
1396 	cr->arg2 = arg2;
1397 	cr->cpl = cpl;
1398 	cr->orig_core = spdk_env_get_current_core();
1399 	cr->cur_core = spdk_env_get_first_core();
1400 
1401 	SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %d\n", cr->orig_core);
1402 
1403 	evt = spdk_event_allocate(cr->cur_core, on_reactor, cr, NULL);
1404 	assert(evt != NULL);
1405 
1406 	spdk_event_call(evt);
1407 }
1408 
1409 #ifdef __linux__
1410 static int
1411 reactor_schedule_thread_event(void *arg)
1412 {
1413 	struct spdk_reactor *reactor = arg;
1414 	struct spdk_lw_thread *lw_thread, *tmp;
1415 	uint32_t count = 0;
1416 	uint64_t notify = 1;
1417 
1418 	assert(reactor->in_interrupt);
1419 
1420 	if (read(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
1421 		SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno));
1422 		return -errno;
1423 	}
1424 
1425 	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
1426 		count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
1427 	}
1428 
1429 	return count;
1430 }
1431 
1432 static int
1433 reactor_interrupt_init(struct spdk_reactor *reactor)
1434 {
1435 	int rc;
1436 
1437 	rc = spdk_fd_group_create(&reactor->fgrp);
1438 	if (rc != 0) {
1439 		return rc;
1440 	}
1441 
1442 	reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1443 	if (reactor->resched_fd < 0) {
1444 		rc = -EBADF;
1445 		goto err;
1446 	}
1447 
1448 	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event,
1449 			       reactor);
1450 	if (rc) {
1451 		close(reactor->resched_fd);
1452 		goto err;
1453 	}
1454 
1455 	reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1456 	if (reactor->events_fd < 0) {
1457 		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
1458 		close(reactor->resched_fd);
1459 
1460 		rc = -EBADF;
1461 		goto err;
1462 	}
1463 
1464 	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->events_fd,
1465 			       event_queue_run_batch, reactor);
1466 	if (rc) {
1467 		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
1468 		close(reactor->resched_fd);
1469 		close(reactor->events_fd);
1470 		goto err;
1471 	}
1472 
1473 	return 0;
1474 
1475 err:
1476 	spdk_fd_group_destroy(reactor->fgrp);
1477 	reactor->fgrp = NULL;
1478 	return rc;
1479 }
1480 #else
/* Interrupt mode relies on eventfd(2) and is only supported on Linux;
 * report unsupported elsewhere.
 */
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	return -ENOTSUP;
}
1486 #endif
1487 
1488 static void
1489 reactor_interrupt_fini(struct spdk_reactor *reactor)
1490 {
1491 	struct spdk_fd_group *fgrp = reactor->fgrp;
1492 
1493 	if (!fgrp) {
1494 		return;
1495 	}
1496 
1497 	spdk_fd_group_remove(fgrp, reactor->events_fd);
1498 	spdk_fd_group_remove(fgrp, reactor->resched_fd);
1499 
1500 	close(reactor->events_fd);
1501 	close(reactor->resched_fd);
1502 
1503 	spdk_fd_group_destroy(fgrp);
1504 	reactor->fgrp = NULL;
1505 }
1506 
1507 static struct spdk_governor *
1508 _governor_find(const char *name)
1509 {
1510 	struct spdk_governor *governor, *tmp;
1511 
1512 	TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
1513 		if (strcmp(name, governor->name) == 0) {
1514 			return governor;
1515 		}
1516 	}
1517 
1518 	return NULL;
1519 }
1520 
1521 int
1522 spdk_governor_set(const char *name)
1523 {
1524 	struct spdk_governor *governor;
1525 	int rc = 0;
1526 
1527 	/* NULL governor was specifically requested */
1528 	if (name == NULL) {
1529 		if (g_governor) {
1530 			g_governor->deinit();
1531 		}
1532 		g_governor = NULL;
1533 		return 0;
1534 	}
1535 
1536 	governor = _governor_find(name);
1537 	if (governor == NULL) {
1538 		return -EINVAL;
1539 	}
1540 
1541 	if (g_governor == governor) {
1542 		return 0;
1543 	}
1544 
1545 	rc = governor->init();
1546 	if (rc == 0) {
1547 		if (g_governor) {
1548 			g_governor->deinit();
1549 		}
1550 		g_governor = governor;
1551 	}
1552 
1553 	return rc;
1554 }
1555 
/* Return the currently active governor, or NULL if none is set. */
struct spdk_governor *
spdk_governor_get(void)
{
	return g_governor;
}
1561 
1562 void
1563 spdk_governor_register(struct spdk_governor *governor)
1564 {
1565 	if (_governor_find(governor->name)) {
1566 		SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name);
1567 		assert(false);
1568 		return;
1569 	}
1570 
1571 	TAILQ_INSERT_TAIL(&g_governor_list, governor, link);
1572 }
1573 
1574 SPDK_LOG_REGISTER_COMPONENT(reactor)
1575