xref: /spdk/lib/event/reactor.c (revision 18c8b52afa69f39481ebb75711b2f30b11693f9d)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 #include "spdk/likely.h"
8 
9 #include "spdk_internal/event.h"
10 #include "spdk_internal/usdt.h"
11 
12 #include "spdk/log.h"
13 #include "spdk/thread.h"
14 #include "spdk/env.h"
15 #include "spdk/util.h"
16 #include "spdk/scheduler.h"
17 #include "spdk/string.h"
18 #include "spdk/fd_group.h"
19 
20 #ifdef __linux__
21 #include <sys/prctl.h>
22 #include <sys/eventfd.h>
23 #endif
24 
25 #ifdef __FreeBSD__
26 #include <pthread_np.h>
27 #endif
28 
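/* Maximum number of events dequeued from a reactor's event ring and executed
 * per call to event_queue_run_batch().
 */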
29 #define SPDK_EVENT_BATCH_SIZE		8
30 
31 static struct spdk_reactor *g_reactors;
32 static uint32_t g_reactor_count;
33 static struct spdk_cpuset g_reactor_core_mask;
34 static enum spdk_reactor_state	g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;
35 
36 static bool g_framework_context_switch_monitor_enabled = true;
37 
38 static struct spdk_mempool *g_spdk_event_mempool = NULL;
39 
40 TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
41 	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);
42 
43 static struct spdk_scheduler *g_scheduler = NULL;
44 static struct spdk_reactor *g_scheduling_reactor;
45 bool g_scheduling_in_progress = false;
46 static uint64_t g_scheduler_period = 0;	/* stored in TSC ticks */
47 static uint32_t g_scheduler_core_number;
48 static struct spdk_scheduler_core_info *g_core_infos = NULL;
49 
50 TAILQ_HEAD(, spdk_governor) g_governor_list
51 	= TAILQ_HEAD_INITIALIZER(g_governor_list);
52 
53 static struct spdk_governor *g_governor = NULL;
54 
55 static int reactor_interrupt_init(struct spdk_reactor *reactor);
56 static void reactor_interrupt_fini(struct spdk_reactor *reactor);
57 
58 static pthread_mutex_t g_stopping_reactors_mtx = PTHREAD_MUTEX_INITIALIZER;
59 static bool g_stopping_reactors = false;
60 
61 static struct spdk_scheduler *
62 _scheduler_find(const char *name)
63 {
64 	struct spdk_scheduler *tmp;
65 
66 	TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
67 		if (strcmp(name, tmp->name) == 0) {
68 			return tmp;
69 		}
70 	}
71 
72 	return NULL;
73 }
74 
75 int
76 spdk_scheduler_set(const char *name)
77 {
78 	struct spdk_scheduler *scheduler;
79 	int rc = 0;
80 
81 	/* NULL scheduler was specifically requested */
82 	if (name == NULL) {
83 		if (g_scheduler) {
84 			g_scheduler->deinit();
85 		}
86 		g_scheduler = NULL;
87 		return 0;
88 	}
89 
90 	scheduler = _scheduler_find(name);
91 	if (scheduler == NULL) {
92 		SPDK_ERRLOG("Requested scheduler '%s' is missing\n", name);
93 		return -EINVAL;
94 	}
95 
96 	if (g_scheduler == scheduler) {
97 		return 0;
98 	}
99 
100 	rc = scheduler->init();
101 	if (rc == 0) {
102 		if (g_scheduler) {
103 			g_scheduler->deinit();
104 		}
105 		g_scheduler = scheduler;
106 	}
107 
108 	return rc;
109 }
110 
111 struct spdk_scheduler *
112 spdk_scheduler_get(void)
113 {
114 	return g_scheduler;
115 }
116 
117 uint64_t
118 spdk_scheduler_get_period(void)
119 {
120 	/* Convert from ticks to microseconds */
121 	return (g_scheduler_period * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
122 }
123 
124 void
125 spdk_scheduler_set_period(uint64_t period)
126 {
127 	/* Convert microseconds to ticks */
128 	g_scheduler_period = period * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
129 }
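
/* Worked example (illustrative clock rate, not queried from the source): with
 * spdk_get_ticks_hz() == 2400000000 (a 2.4 GHz TSC),
 * spdk_scheduler_set_period(1000) stores 1000 * 2400000000 / 1000000 =
 * 2400000 ticks, and spdk_scheduler_get_period() converts that back to
 * 1000 microseconds.
 */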
130 
131 void
132 spdk_scheduler_register(struct spdk_scheduler *scheduler)
133 {
134 	if (_scheduler_find(scheduler->name)) {
135 		SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
136 		assert(false);
137 		return;
138 	}
139 
140 	TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
141 }
142 
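/* Initialize a reactor: bind it to an lcore, create its multi-producer/
 * single-consumer event ring, and always attempt to set up its interrupt
 * facilities (these are mandatory only when the app enables interrupt mode).
 * With full interrupt mode enabled, every core is set in notify_cpuset and
 * the reactor starts out in interrupt mode.
 */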
143 static void
144 reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
145 {
146 	reactor->lcore = lcore;
147 	reactor->flags.is_valid = true;
148 
149 	TAILQ_INIT(&reactor->threads);
150 	reactor->thread_count = 0;
151 	spdk_cpuset_zero(&reactor->notify_cpuset);
152 
153 	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
154 	if (reactor->events == NULL) {
155 		SPDK_ERRLOG("Failed to allocate events ring\n");
156 		assert(false);
157 	}
158 
159 	/* Always initialize interrupt facilities for reactor */
160 	if (reactor_interrupt_init(reactor) != 0) {
161 		/* Reactor interrupt facilities are necessary when setting the app to interrupt mode. */
162 		if (spdk_interrupt_mode_is_enabled()) {
163 			SPDK_ERRLOG("Failed to prepare intr facilities\n");
164 			assert(false);
165 		}
166 		return;
167 	}
168 
169 	/* If the application runs with full interrupt capability,
170 	 * all reactors will run in interrupt mode.
171 	 */
172 	if (spdk_interrupt_mode_is_enabled()) {
173 		uint32_t i;
174 
175 		SPDK_ENV_FOREACH_CORE(i) {
176 			spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true);
177 		}
178 		reactor->in_interrupt = true;
179 	}
180 }
181 
182 struct spdk_reactor *
183 spdk_reactor_get(uint32_t lcore)
184 {
185 	struct spdk_reactor *reactor;
186 
187 	if (g_reactors == NULL) {
188 		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
189 		return NULL;
190 	}
191 
192 	if (lcore >= g_reactor_count) {
193 		return NULL;
194 	}
195 
196 	reactor = &g_reactors[lcore];
197 
198 	if (reactor->flags.is_valid == false) {
199 		return NULL;
200 	}
201 
202 	return reactor;
203 }
204 
205 static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
206 static bool reactor_thread_op_supported(enum spdk_thread_op op);
207 
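/* One-time setup for all reactors: create the global event mempool, allocate
 * the 64-byte-aligned g_reactors array (one entry per core index up to the
 * last core in the env core mask) and g_core_infos, register the thread-op
 * callbacks with the thread library, construct a reactor per core, and record
 * the current core's reactor as the scheduling reactor.
 */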
208 int
209 spdk_reactors_init(size_t msg_mempool_size)
210 {
211 	struct spdk_reactor *reactor;
212 	int rc;
213 	uint32_t i, current_core;
214 	char mempool_name[32];
215 
216 	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
217 	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
218 			       262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
219 			       sizeof(struct spdk_event),
220 			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
221 			       SPDK_ENV_SOCKET_ID_ANY);
222 
223 	if (g_spdk_event_mempool == NULL) {
224 		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
225 		return -1;
226 	}
227 
228 	/* struct spdk_reactor must be aligned on a 64-byte boundary */
229 	g_reactor_count = spdk_env_get_last_core() + 1;
230 	rc = posix_memalign((void **)&g_reactors, 64,
231 			    g_reactor_count * sizeof(struct spdk_reactor));
232 	if (rc != 0) {
233 		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
234 			    g_reactor_count);
235 		spdk_mempool_free(g_spdk_event_mempool);
236 		return -1;
237 	}
238 
239 	g_core_infos = calloc(g_reactor_count, sizeof(*g_core_infos));
240 	if (g_core_infos == NULL) {
241 		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
242 		spdk_mempool_free(g_spdk_event_mempool);
243 		free(g_reactors);
244 		return -ENOMEM;
245 	}
246 
247 	memset(g_reactors, 0, (g_reactor_count) * sizeof(struct spdk_reactor));
248 
249 	rc = spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
250 				      sizeof(struct spdk_lw_thread), msg_mempool_size);
251 	if (rc != 0) {
252 		SPDK_ERRLOG("Failed to initialize the SPDK thread lib\n");
253 		spdk_mempool_free(g_spdk_event_mempool);
254 		free(g_reactors);
255 		free(g_core_infos);
256 		return rc;
257 	}
258 
259 	SPDK_ENV_FOREACH_CORE(i) {
260 		reactor_construct(&g_reactors[i], i);
261 	}
262 
263 	current_core = spdk_env_get_current_core();
264 	reactor = spdk_reactor_get(current_core);
265 	assert(reactor != NULL);
266 	g_scheduling_reactor = reactor;
267 
268 	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
269 
270 	return 0;
271 }
272 
273 void
274 spdk_reactors_fini(void)
275 {
276 	uint32_t i;
277 	struct spdk_reactor *reactor;
278 
279 	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
280 		return;
281 	}
282 
283 	spdk_thread_lib_fini();
284 
285 	SPDK_ENV_FOREACH_CORE(i) {
286 		reactor = spdk_reactor_get(i);
287 		assert(reactor != NULL);
288 		assert(reactor->thread_count == 0);
289 		if (reactor->events != NULL) {
290 			spdk_ring_free(reactor->events);
291 		}
292 
293 		reactor_interrupt_fini(reactor);
294 
295 		if (g_core_infos != NULL) {
296 			free(g_core_infos[i].thread_infos);
297 		}
298 	}
299 
300 	spdk_mempool_free(g_spdk_event_mempool);
301 
302 	free(g_reactors);
303 	g_reactors = NULL;
304 	free(g_core_infos);
305 	g_core_infos = NULL;
306 }
307 
308 static void _reactor_set_interrupt_mode(void *arg1, void *arg2);
309 
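/* Runs on every reactor via spdk_for_each_reactor(): records in the local
 * reactor's notify_cpuset whether the target reactor is entering interrupt
 * mode, so that event producers know when to kick the target's events_fd.
 */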
310 static void
311 _reactor_set_notify_cpuset(void *arg1, void *arg2)
312 {
313 	struct spdk_reactor *target = arg1;
314 	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());
315 
316 	assert(reactor != NULL);
317 	spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt);
318 }
319 
320 static void
321 _event_call(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
322 {
323 	struct spdk_event *ev;
324 
325 	ev = spdk_event_allocate(lcore, fn, arg1, arg2);
326 	assert(ev);
327 	spdk_event_call(ev);
328 }
329 
330 static void
331 _reactor_set_notify_cpuset_cpl(void *arg1, void *arg2)
332 {
333 	struct spdk_reactor *target = arg1;
334 
335 	if (target->new_in_interrupt == false) {
336 		target->set_interrupt_mode_in_progress = false;
337 		spdk_thread_send_msg(_spdk_get_app_thread(), target->set_interrupt_mode_cb_fn,
338 				     target->set_interrupt_mode_cb_arg);
339 	} else {
340 		_event_call(target->lcore, _reactor_set_interrupt_mode, target, NULL);
341 	}
342 }
343 
344 static void
345 _reactor_set_thread_interrupt_mode(void *ctx)
346 {
347 	struct spdk_reactor *reactor = ctx;
348 
349 	spdk_thread_set_interrupt_mode(reactor->in_interrupt);
350 }
351 
352 static void
353 _reactor_set_interrupt_mode(void *arg1, void *arg2)
354 {
355 	struct spdk_reactor *target = arg1;
356 	struct spdk_thread *thread;
357 	struct spdk_lw_thread *lw_thread, *tmp;
358 
359 	assert(target == spdk_reactor_get(spdk_env_get_current_core()));
360 	assert(target != NULL);
361 	assert(target->in_interrupt != target->new_in_interrupt);
362 	SPDK_DEBUGLOG(reactor, "Switching reactor on core %u from %s mode to %s mode\n",
363 		      target->lcore, target->in_interrupt ? "intr" : "poll", target->new_in_interrupt ? "intr" : "poll");
364 
365 	target->in_interrupt = target->new_in_interrupt;
366 
367 	/* Align each spdk_thread with the reactor's interrupt or poll mode */
368 	TAILQ_FOREACH_SAFE(lw_thread, &target->threads, link, tmp) {
369 		thread = spdk_thread_get_from_ctx(lw_thread);
370 		spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, target);
371 	}
372 
373 	if (target->new_in_interrupt == false) {
374 		/* Reactor is no longer in interrupt mode. Refresh the tsc_last to accurately
375 		 * track reactor stats. */
376 		target->tsc_last = spdk_get_ticks();
377 		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
378 	} else {
379 		uint64_t notify = 1;
380 		int rc = 0;
381 
382 		/* Always kick both the event and resched eventfds, in case of a race condition */
383 		rc = write(target->events_fd, &notify, sizeof(notify));
384 		if (rc < 0) {
385 			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
386 		}
387 		rc = write(target->resched_fd, &notify, sizeof(notify));
388 		if (rc < 0) {
389 			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
390 		}
391 
392 		target->set_interrupt_mode_in_progress = false;
393 		spdk_thread_send_msg(_spdk_get_app_thread(), target->set_interrupt_mode_cb_fn,
394 				     target->set_interrupt_mode_cb_arg);
395 	}
396 }
397 
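/* Switch one reactor between poll and interrupt mode. Must be called from the
 * SPDK application thread; cb_fn is invoked on that thread once the switch,
 * including the notify_cpuset update on every reactor, has completed.
 *
 * Illustrative usage (hypothetical callback name):
 *
 *	static void mode_switched(void *cb_arg) { ... }
 *	...
 *	rc = spdk_reactor_set_interrupt_mode(1, true, mode_switched, NULL);
 */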
398 int
399 spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt,
400 				spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg)
401 {
402 	struct spdk_reactor *target;
403 
404 	target = spdk_reactor_get(lcore);
405 	if (target == NULL) {
406 		return -EINVAL;
407 	}
408 
409 	/* Eventfd has to be supported in order to use interrupt functionality. */
410 	if (target->fgrp == NULL) {
411 		return -ENOTSUP;
412 	}
413 
414 	if (spdk_get_thread() != _spdk_get_app_thread()) {
415 		SPDK_ERRLOG("This is only permitted from the SPDK application thread.\n");
416 		return -EPERM;
417 	}
418 
419 	if (target->in_interrupt == new_in_interrupt) {
420 		cb_fn(cb_arg);
421 		return 0;
422 	}
423 
424 	if (target->set_interrupt_mode_in_progress) {
425 		SPDK_NOTICELOG("Reactor(%u) is already in the process of setting the interrupt mode\n", lcore);
426 		return -EBUSY;
427 	}
428 	target->set_interrupt_mode_in_progress = true;
429 
430 	target->new_in_interrupt = new_in_interrupt;
431 	target->set_interrupt_mode_cb_fn = cb_fn;
432 	target->set_interrupt_mode_cb_arg = cb_arg;
433 
434 	SPDK_DEBUGLOG(reactor, "Starting reactor event from %u to %u\n",
435 		      spdk_env_get_current_core(), lcore);
436 
437 	if (new_in_interrupt == false) {
438 		/* For potential race cases, when setting the reactor to poll mode,
439 		 * first change the mode of the reactor and then clear the corresponding
440 		 * bit of the notify_cpuset of each reactor.
441 		 */
442 		_event_call(lcore, _reactor_set_interrupt_mode, target, NULL);
443 	} else {
444 		/* For race cases, when setting the reactor to interrupt mode, first set the
445 		 * corresponding bit of the notify_cpuset of each reactor and then change the mode.
446 		 */
447 		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
448 	}
449 
450 	return 0;
451 }
452 
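/* Events are the reactors' cross-core messaging mechanism: allocate an event
 * bound to a destination lcore, then hand it off with spdk_event_call(),
 * which enqueues it on that reactor's ring and kicks the reactor's events_fd
 * if the destination is in interrupt mode. A minimal sketch (hypothetical
 * event_fn and ctx):
 *
 *	static void event_fn(void *arg1, void *arg2) { ... }
 *	...
 *	struct spdk_event *ev = spdk_event_allocate(lcore, event_fn, ctx, NULL);
 *	spdk_event_call(ev);
 */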
453 struct spdk_event *
454 spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
455 {
456 	struct spdk_event *event = NULL;
457 	struct spdk_reactor *reactor = spdk_reactor_get(lcore);
458 
459 	if (!reactor) {
460 		assert(false);
461 		return NULL;
462 	}
463 
464 	event = spdk_mempool_get(g_spdk_event_mempool);
465 	if (event == NULL) {
466 		assert(false);
467 		return NULL;
468 	}
469 
470 	event->lcore = lcore;
471 	event->fn = fn;
472 	event->arg1 = arg1;
473 	event->arg2 = arg2;
474 
475 	return event;
476 }
477 
478 void
479 spdk_event_call(struct spdk_event *event)
480 {
481 	int rc;
482 	struct spdk_reactor *reactor;
483 	struct spdk_reactor *local_reactor = NULL;
484 	uint32_t current_core = spdk_env_get_current_core();
485 
486 	reactor = spdk_reactor_get(event->lcore);
487 
488 	assert(reactor != NULL);
489 	assert(reactor->events != NULL);
490 
491 	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
492 	if (rc != 1) {
493 		assert(false);
494 	}
495 
496 	if (current_core != SPDK_ENV_LCORE_ID_ANY) {
497 		local_reactor = spdk_reactor_get(current_core);
498 	}
499 
500 	/* If spdk_event_call isn't called on a reactor, always send a notification.
501 	 * If it is called on a reactor, send a notification only if the destination
502 	 * reactor is marked as being in interrupt mode in the local notify_cpuset.
503 	 */
504 	if (spdk_unlikely(local_reactor == NULL) ||
505 	    spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) {
506 		uint64_t notify = 1;
507 
508 		rc = write(reactor->events_fd, &notify, sizeof(notify));
509 		if (rc < 0) {
510 			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
511 		}
512 	}
513 }
514 
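/* Dequeue and execute up to SPDK_EVENT_BATCH_SIZE events from this reactor's
 * ring. In interrupt mode, the events_fd is acknowledged first and re-armed
 * if events remain queued afterwards. Returns the number of events executed,
 * or a negative errno value on an eventfd failure.
 */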
515 static inline int
516 event_queue_run_batch(void *arg)
517 {
518 	struct spdk_reactor *reactor = arg;
519 	size_t count, i;
520 	void *events[SPDK_EVENT_BATCH_SIZE];
521 	struct spdk_thread *thread;
522 	struct spdk_lw_thread *lw_thread;
523 
524 #ifdef DEBUG
525 	/*
526 	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
527 	 * so we will never actually read uninitialized data from events, but just to be sure
528 	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
529 	 */
530 	memset(events, 0, sizeof(events));
531 #endif
532 
533 	/* Handle event notification if this reactor is currently running in interrupt mode */
534 	if (spdk_unlikely(reactor->in_interrupt)) {
535 		uint64_t notify = 1;
536 		int rc;
537 
538 		/* There may be a race between this event_acknowledge (the read) and another
539 		 * producer's event_notify (a write), so acknowledge first and only then drain
540 		 * the ring. This avoids missing an event notification.
541 		 */
542 		rc = read(reactor->events_fd, &notify, sizeof(notify));
543 		if (rc < 0) {
544 			SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno));
545 			return -errno;
546 		}
547 
548 		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
549 
550 		if (spdk_ring_count(reactor->events) != 0) {
551 			/* Trigger a new notification if there are still events in the queue waiting to be processed. */
552 			rc = write(reactor->events_fd, &notify, sizeof(notify));
553 			if (rc < 0) {
554 				SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
555 				return -errno;
556 			}
557 		}
558 	} else {
559 		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
560 	}
561 
562 	if (count == 0) {
563 		return 0;
564 	}
565 
566 	/* Execute the events. Some events must occur on an SPDK thread.
567 	 * To accommodate those, run them on the first thread in the list,
568 	 * if one exists. */
569 	lw_thread = TAILQ_FIRST(&reactor->threads);
570 	if (lw_thread) {
571 		thread = spdk_thread_get_from_ctx(lw_thread);
572 	} else {
573 		thread = NULL;
574 	}
575 
576 	for (i = 0; i < count; i++) {
577 		struct spdk_event *event = events[i];
578 
579 		assert(event != NULL);
580 		spdk_set_thread(thread);
581 
582 		SPDK_DTRACE_PROBE3(event_exec, event->fn,
583 				   event->arg1, event->arg2);
584 		event->fn(event->arg1, event->arg2);
585 		spdk_set_thread(NULL);
586 	}
587 
588 	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);
589 
590 	return (int)count;
591 }
592 
593 /* 1 second, expressed in microseconds */
594 #define CONTEXT_SWITCH_MONITOR_PERIOD 1000000
595 
596 static int
597 get_rusage(struct spdk_reactor *reactor)
598 {
599 	struct rusage		rusage;
600 
601 	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
602 		return -1;
603 	}
604 
605 	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
606 		SPDK_INFOLOG(reactor,
607 			     "Reactor %u: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
608 			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
609 			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
610 	}
611 	reactor->rusage = rusage;
612 
613 	return 0;
614 }
615 
616 void
617 spdk_framework_enable_context_switch_monitor(bool enable)
618 {
619 	/* This global is being read by multiple threads, so this isn't
620 	 * strictly thread safe. However, we're toggling between true and
621 	 * false here, and if a thread sees the value update later than it
622 	 * should, it's no big deal. */
623 	g_framework_context_switch_monitor_enabled = enable;
624 }
625 
626 bool
627 spdk_framework_context_switch_monitor_enabled(void)
628 {
629 	return g_framework_context_switch_monitor_enabled;
630 }
631 
632 static void
633 _set_thread_name(const char *thread_name)
634 {
635 #if defined(__linux__)
636 	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
637 #elif defined(__FreeBSD__)
638 	pthread_set_name_np(pthread_self(), thread_name);
639 #else
640 	pthread_setname_np(pthread_self(), thread_name);
641 #endif
642 }
643 
644 static void
645 _init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
646 {
647 	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
648 	struct spdk_thread_stats prev_total_stats;
649 
650 	/* Read total_stats before updating it to calculate stats during the last scheduling period. */
651 	prev_total_stats = lw_thread->total_stats;
652 
653 	spdk_set_thread(thread);
654 	spdk_thread_get_stats(&lw_thread->total_stats);
655 	spdk_set_thread(NULL);
656 
657 	lw_thread->current_stats.busy_tsc = lw_thread->total_stats.busy_tsc - prev_total_stats.busy_tsc;
658 	lw_thread->current_stats.idle_tsc = lw_thread->total_stats.idle_tsc - prev_total_stats.idle_tsc;
659 }
660 
661 static void
662 _threads_reschedule_thread(struct spdk_scheduler_thread_info *thread_info)
663 {
664 	struct spdk_lw_thread *lw_thread;
665 	struct spdk_thread *thread;
666 
667 	thread = spdk_thread_get_by_id(thread_info->thread_id);
668 	if (thread == NULL) {
669 		/* Thread no longer exists. */
670 		return;
671 	}
672 	lw_thread = spdk_thread_get_ctx(thread);
673 	assert(lw_thread != NULL);
674 
675 	lw_thread->lcore = thread_info->lcore;
676 	lw_thread->resched = true;
677 }
678 
679 static void
680 _threads_reschedule(struct spdk_scheduler_core_info *cores_info)
681 {
682 	struct spdk_scheduler_core_info *core;
683 	struct spdk_scheduler_thread_info *thread_info;
684 	uint32_t i, j;
685 
686 	SPDK_ENV_FOREACH_CORE(i) {
687 		core = &cores_info[i];
688 		for (j = 0; j < core->threads_count; j++) {
689 			thread_info = &core->thread_infos[j];
690 			if (thread_info->lcore != i) {
691 				_threads_reschedule_thread(thread_info);
692 			}
693 		}
694 		core->threads_count = 0;
695 		free(core->thread_infos);
696 		core->thread_infos = NULL;
697 	}
698 }
699 
700 static void
701 _reactors_scheduler_fini(void)
702 {
703 	/* Reschedule based on the balancing output */
704 	_threads_reschedule(g_core_infos);
705 
706 	g_scheduling_in_progress = false;
707 }
708 
709 static void
710 _reactors_scheduler_update_core_mode(void *ctx)
711 {
712 	struct spdk_reactor *reactor;
713 	uint32_t i;
714 	int rc = 0;
715 
716 	for (i = g_scheduler_core_number; i < SPDK_ENV_LCORE_ID_ANY; i = spdk_env_get_next_core(i)) {
717 		reactor = spdk_reactor_get(i);
718 		assert(reactor != NULL);
719 		if (reactor->in_interrupt != g_core_infos[i].interrupt_mode) {
720 			/* Switch the next reactor found to the new state */
721 			rc = spdk_reactor_set_interrupt_mode(i, g_core_infos[i].interrupt_mode,
722 							     _reactors_scheduler_update_core_mode, NULL);
723 			if (rc == 0) {
724 				/* Set core to start with after callback completes */
725 				g_scheduler_core_number = spdk_env_get_next_core(i);
726 				return;
727 			}
728 		}
729 	}
730 	_reactors_scheduler_fini();
731 }
732 
733 static void
734 _reactors_scheduler_cancel(void *arg1, void *arg2)
735 {
736 	struct spdk_scheduler_core_info *core;
737 	uint32_t i;
738 
739 	SPDK_ENV_FOREACH_CORE(i) {
740 		core = &g_core_infos[i];
741 		core->threads_count = 0;
742 		free(core->thread_infos);
743 		core->thread_infos = NULL;
744 	}
745 
746 	g_scheduling_in_progress = false;
747 }
748 
749 static void
750 _reactors_scheduler_balance(void *arg1, void *arg2)
751 {
752 	struct spdk_scheduler *scheduler = spdk_scheduler_get();
753 
754 	if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING || scheduler == NULL) {
755 		_reactors_scheduler_cancel(NULL, NULL);
756 		return;
757 	}
758 
759 	scheduler->balance(g_core_infos, g_reactor_count);
760 
761 	g_scheduler_core_number = spdk_env_get_first_core();
762 	_reactors_scheduler_update_core_mode(NULL);
763 }
764 
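/* Thread scheduling runs in three phases, each hopping between cores via
 * events: (1) gather per-core and per-thread busy/idle metrics, visiting
 * every reactor in turn; (2) run the active scheduler's balance() callback
 * on the scheduling reactor; (3) apply the result by switching core
 * interrupt modes and marking threads for reschedule onto their new cores.
 */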
765 /* Phase 1 of thread scheduling is to gather metrics on the existing threads */
766 static void
767 _reactors_scheduler_gather_metrics(void *arg1, void *arg2)
768 {
769 	struct spdk_scheduler_core_info *core_info;
770 	struct spdk_lw_thread *lw_thread;
771 	struct spdk_thread *thread;
772 	struct spdk_reactor *reactor;
773 	uint32_t next_core;
774 	uint32_t i = 0;
775 
776 	reactor = spdk_reactor_get(spdk_env_get_current_core());
777 	assert(reactor != NULL);
778 	core_info = &g_core_infos[reactor->lcore];
779 	core_info->lcore = reactor->lcore;
780 	core_info->current_idle_tsc = reactor->idle_tsc - core_info->total_idle_tsc;
781 	core_info->total_idle_tsc = reactor->idle_tsc;
782 	core_info->current_busy_tsc = reactor->busy_tsc - core_info->total_busy_tsc;
783 	core_info->total_busy_tsc = reactor->busy_tsc;
784 	core_info->interrupt_mode = reactor->in_interrupt;
785 	core_info->threads_count = 0;
786 
787 	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);
788 
789 	if (reactor->thread_count > 0) {
790 		core_info->thread_infos = calloc(reactor->thread_count, sizeof(*core_info->thread_infos));
791 		if (core_info->thread_infos == NULL) {
792 			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);
793 
794 			/* Cancel this round of scheduling work */
795 			_event_call(g_scheduling_reactor->lcore, _reactors_scheduler_cancel, NULL, NULL);
796 			return;
797 		}
798 
799 		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
800 			_init_thread_stats(reactor, lw_thread);
801 
802 			core_info->thread_infos[i].lcore = lw_thread->lcore;
803 			thread = spdk_thread_get_from_ctx(lw_thread);
804 			assert(thread != NULL);
805 			core_info->thread_infos[i].thread_id = spdk_thread_get_id(thread);
806 			core_info->thread_infos[i].total_stats = lw_thread->total_stats;
807 			core_info->thread_infos[i].current_stats = lw_thread->current_stats;
808 			core_info->threads_count++;
809 			assert(core_info->threads_count <= reactor->thread_count);
810 			i++;
811 		}
812 	}
813 
814 	next_core = spdk_env_get_next_core(reactor->lcore);
815 	if (next_core == UINT32_MAX) {
816 		next_core = spdk_env_get_first_core();
817 	}
818 
819 	/* If we've looped back around to the scheduler thread, move to the next phase */
820 	if (next_core == g_scheduling_reactor->lcore) {
821 		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
822 		_event_call(next_core, _reactors_scheduler_balance, NULL, NULL);
823 		return;
824 	}
825 
826 	_event_call(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
827 }
828 
829 static int _reactor_schedule_thread(struct spdk_thread *thread);
830 static uint64_t g_rusage_period;
831 
832 static void
833 _reactor_remove_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
834 {
835 	struct spdk_thread	*thread = spdk_thread_get_from_ctx(lw_thread);
836 	int efd;
837 
838 	TAILQ_REMOVE(&reactor->threads, lw_thread, link);
839 	assert(reactor->thread_count > 0);
840 	reactor->thread_count--;
841 
842 	/* Detach the thread's interrupt fd if running with full interrupt capability */
843 	if (spdk_interrupt_mode_is_enabled()) {
844 		efd = spdk_thread_get_interrupt_fd(thread);
845 		spdk_fd_group_remove(reactor->fgrp, efd);
846 	}
847 }
848 
849 static bool
850 reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
851 {
852 	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
853 
854 	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
855 			  spdk_thread_is_idle(thread))) {
856 		_reactor_remove_lw_thread(reactor, lw_thread);
857 		spdk_thread_destroy(thread);
858 		return true;
859 	}
860 
861 	if (spdk_unlikely(lw_thread->resched)) {
862 		lw_thread->resched = false;
863 		_reactor_remove_lw_thread(reactor, lw_thread);
864 		_reactor_schedule_thread(thread);
865 		return true;
866 	}
867 
868 	return false;
869 }
870 
871 static void
872 reactor_interrupt_run(struct spdk_reactor *reactor)
873 {
874 	int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */
875 
876 	spdk_fd_group_wait(reactor->fgrp, block_timeout);
877 }
878 
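/* One pass of the poll-mode loop: run a batch of events, then poll every
 * thread on this reactor once, attributing the elapsed ticks to busy_tsc or
 * idle_tsc based on each thread's poll result.
 */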
879 static void
880 _reactor_run(struct spdk_reactor *reactor)
881 {
882 	struct spdk_thread	*thread;
883 	struct spdk_lw_thread	*lw_thread, *tmp;
884 	uint64_t		now;
885 	int			rc;
886 
887 	event_queue_run_batch(reactor);
888 
889 	/* If no threads are present on the reactor,
890 	 * tsc_last gets outdated. Update it to track
891 	 * thread execution time correctly. */
892 	if (spdk_unlikely(TAILQ_EMPTY(&reactor->threads))) {
893 		now = spdk_get_ticks();
894 		reactor->idle_tsc += now - reactor->tsc_last;
895 		reactor->tsc_last = now;
896 		return;
897 	}
898 
899 	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
900 		thread = spdk_thread_get_from_ctx(lw_thread);
901 		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);
902 
903 		now = spdk_thread_get_last_tsc(thread);
904 		if (rc == 0) {
905 			reactor->idle_tsc += now - reactor->tsc_last;
906 		} else if (rc > 0) {
907 			reactor->busy_tsc += now - reactor->tsc_last;
908 		}
909 		reactor->tsc_last = now;
910 
911 		reactor_post_process_lw_thread(reactor, lw_thread);
912 	}
913 }
914 
915 static int
916 reactor_run(void *arg)
917 {
918 	struct spdk_reactor	*reactor = arg;
919 	struct spdk_thread	*thread;
920 	struct spdk_lw_thread	*lw_thread, *tmp;
921 	char			thread_name[32];
922 	uint64_t		last_sched = 0;
923 
924 	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);
925 
926 	/* Rename the POSIX thread because the reactor is tied to the POSIX
927 	 * thread in the SPDK event library.
928 	 */
929 	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
930 	_set_thread_name(thread_name);
931 
932 	reactor->tsc_last = spdk_get_ticks();
933 
934 	while (1) {
935 		/* Run the interrupt processing path if this reactor is currently in interrupt mode */
936 		if (spdk_unlikely(reactor->in_interrupt)) {
937 			reactor_interrupt_run(reactor);
938 		} else {
939 			_reactor_run(reactor);
940 		}
941 
942 		if (g_framework_context_switch_monitor_enabled) {
943 			if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
944 				get_rusage(reactor);
945 				reactor->last_rusage = reactor->tsc_last;
946 			}
947 		}
948 
949 		if (spdk_unlikely(g_scheduler_period > 0 &&
950 				  (reactor->tsc_last - last_sched) > g_scheduler_period &&
951 				  reactor == g_scheduling_reactor &&
952 				  !g_scheduling_in_progress)) {
953 			last_sched = reactor->tsc_last;
954 			g_scheduling_in_progress = true;
955 			_reactors_scheduler_gather_metrics(NULL, NULL);
956 		}
957 
958 		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
959 			break;
960 		}
961 	}
962 
963 	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
964 		thread = spdk_thread_get_from_ctx(lw_thread);
965 		spdk_set_thread(thread);
966 		spdk_thread_exit(thread);
967 	}
968 
969 	while (!TAILQ_EMPTY(&reactor->threads)) {
970 		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
971 			thread = spdk_thread_get_from_ctx(lw_thread);
972 			spdk_set_thread(thread);
973 			if (spdk_thread_is_exited(thread)) {
974 				_reactor_remove_lw_thread(reactor, lw_thread);
975 				spdk_thread_destroy(thread);
976 			} else {
977 				spdk_thread_poll(thread, 0, 0);
978 			}
979 		}
980 	}
981 
982 	return 0;
983 }
984 
985 int
986 spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
987 {
988 	int ret;
989 	const struct spdk_cpuset *validmask;
990 
991 	ret = spdk_cpuset_parse(cpumask, mask);
992 	if (ret < 0) {
993 		return ret;
994 	}
995 
996 	validmask = spdk_app_get_core_mask();
997 	spdk_cpuset_and(cpumask, validmask);
998 
999 	return 0;
1000 }
1001 
1002 const struct spdk_cpuset *
1003 spdk_app_get_core_mask(void)
1004 {
1005 	return &g_reactor_core_mask;
1006 }
1007 
1008 void
1009 spdk_reactors_start(void)
1010 {
1011 	struct spdk_reactor *reactor;
1012 	uint32_t i, current_core;
1013 	int rc;
1014 
1015 	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
1016 	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
1017 	/* Reinitialize to false, in case the app framework is restarting in the same process. */
1018 	g_stopping_reactors = false;
1019 
1020 	current_core = spdk_env_get_current_core();
1021 	SPDK_ENV_FOREACH_CORE(i) {
1022 		if (i != current_core) {
1023 			reactor = spdk_reactor_get(i);
1024 			if (reactor == NULL) {
1025 				continue;
1026 			}
1027 
1028 			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
1029 			if (rc < 0) {
1030 				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
1031 				assert(false);
1032 				return;
1033 			}
1034 		}
1035 		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
1036 	}
1037 
1038 	/* Start the main reactor */
1039 	reactor = spdk_reactor_get(current_core);
1040 	assert(reactor != NULL);
1041 	reactor_run(reactor);
1042 
1043 	spdk_env_thread_wait_all();
1044 
1045 	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
1046 }
1047 
1048 static void
1049 _reactors_stop(void *arg1, void *arg2)
1050 {
1051 	uint32_t i;
1052 	int rc;
1053 	struct spdk_reactor *reactor;
1054 	struct spdk_reactor *local_reactor;
1055 	uint64_t notify = 1;
1056 
1057 	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
1058 	local_reactor = spdk_reactor_get(spdk_env_get_current_core());
1059 
1060 	SPDK_ENV_FOREACH_CORE(i) {
1061 		/* If spdk_event_call isn't called on a reactor, always send a notification.
1062 		 * If it is called on a reactor, send a notification only if the destination
1063 		 * reactor is marked as being in interrupt mode in the local notify_cpuset.
1064 		 */
1065 		if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) {
1066 			reactor = spdk_reactor_get(i);
1067 			assert(reactor != NULL);
1068 			rc = write(reactor->events_fd, &notify, sizeof(notify));
1069 			if (rc < 0) {
1070 				SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno));
1071 				continue;
1072 			}
1073 		}
1074 	}
1075 }
1076 
1077 static void
1078 nop(void *arg1, void *arg2)
1079 {
1080 }
1081 
1082 void
1083 spdk_reactors_stop(void *arg1)
1084 {
1085 	spdk_for_each_reactor(nop, NULL, NULL, _reactors_stop);
1086 }
1087 
1088 static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
1089 static uint32_t g_next_core = UINT32_MAX;
1090 
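/* fd_group callback for a thread's interrupt fd: poll that single thread and
 * charge the elapsed ticks to the reactor's busy/idle counters, mirroring the
 * accounting done in _reactor_run() for poll mode.
 */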
1091 static int
1092 thread_process_interrupts(void *arg)
1093 {
1094 	struct spdk_thread *thread = arg;
1095 	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());
1096 	uint64_t now;
1097 	int rc;
1098 
1099 	assert(reactor != NULL);
1100 
1101 	/* Update idle_tsc between the end of the last intr_fn and the start of this one. */
1102 	now = spdk_get_ticks();
1103 	reactor->idle_tsc += now - reactor->tsc_last;
1104 	reactor->tsc_last = now;
1105 
1106 	rc = spdk_thread_poll(thread, 0, now);
1107 
1108 	/* Update tsc between the start and the end of this intr_fn. */
1109 	now = spdk_thread_get_last_tsc(thread);
1110 	if (rc == 0) {
1111 		reactor->idle_tsc += now - reactor->tsc_last;
1112 	} else if (rc > 0) {
1113 		reactor->busy_tsc += now - reactor->tsc_last;
1114 	}
1115 	reactor->tsc_last = now;
1116 
1117 	return rc;
1118 }
1119 
1120 static void
1121 _schedule_thread(void *arg1, void *arg2)
1122 {
1123 	struct spdk_lw_thread *lw_thread = arg1;
1124 	struct spdk_thread *thread;
1125 	struct spdk_reactor *reactor;
1126 	uint32_t current_core;
1127 	int efd;
1128 
1129 	current_core = spdk_env_get_current_core();
1130 	reactor = spdk_reactor_get(current_core);
1131 	assert(reactor != NULL);
1132 
1133 	/* Update total_stats to reflect the state of the thread
1134 	 * at the end of the move. */
1135 	thread = spdk_thread_get_from_ctx(lw_thread);
1136 	spdk_set_thread(thread);
1137 	spdk_thread_get_stats(&lw_thread->total_stats);
1138 	spdk_set_thread(NULL);
1139 
1140 	lw_thread->lcore = current_core;
1141 
1142 	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
1143 	reactor->thread_count++;
1144 
1145 	/* Attach the thread's interrupt fd if running with full interrupt capability */
1146 	if (spdk_interrupt_mode_is_enabled()) {
1147 		int rc;
1148 
1149 		efd = spdk_thread_get_interrupt_fd(thread);
1150 		rc = SPDK_FD_GROUP_ADD(reactor->fgrp, efd,
1151 				       thread_process_interrupts, thread);
1152 		if (rc < 0) {
1153 			SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
1154 		}
1155 
1156 		/* Align the spdk_thread with the reactor's interrupt or poll mode */
1157 		spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, reactor);
1158 	}
1159 }
1160 
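/* Pick a destination core for a thread and send it there via a
 * _schedule_thread event. Without full interrupt support, reactors that are
 * in interrupt mode are excluded from the candidate set; the thread's cpumask
 * is honored with a round-robin walk over the remaining cores.
 */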
1161 static int
1162 _reactor_schedule_thread(struct spdk_thread *thread)
1163 {
1164 	uint32_t core;
1165 	struct spdk_lw_thread *lw_thread;
1166 	struct spdk_event *evt = NULL;
1167 	struct spdk_cpuset *cpumask;
1168 	uint32_t i;
1169 	struct spdk_reactor *local_reactor = NULL;
1170 	uint32_t current_lcore = spdk_env_get_current_core();
1171 	struct spdk_cpuset polling_cpumask;
1172 	struct spdk_cpuset valid_cpumask;
1173 
1174 	cpumask = spdk_thread_get_cpumask(thread);
1175 
1176 	lw_thread = spdk_thread_get_ctx(thread);
1177 	assert(lw_thread != NULL);
1178 	core = lw_thread->lcore;
1179 	memset(lw_thread, 0, sizeof(*lw_thread));
1180 
1181 	if (current_lcore != SPDK_ENV_LCORE_ID_ANY) {
1182 		local_reactor = spdk_reactor_get(current_lcore);
1183 		assert(local_reactor);
1184 	}
1185 
1186 	/* When spdk_thread interrupt support is not enabled and the current
1187 	 * reactor runs on a DPDK thread, skip reactors that are in interrupt mode.
1188 	 */
1189 	if (!spdk_interrupt_mode_is_enabled() && local_reactor != NULL) {
1190 		/* Get the cpumask of all reactors that are currently polling */
1191 		spdk_cpuset_zero(&polling_cpumask);
1192 		SPDK_ENV_FOREACH_CORE(i) {
1193 			spdk_cpuset_set_cpu(&polling_cpumask, i, true);
1194 		}
1195 		spdk_cpuset_xor(&polling_cpumask, &local_reactor->notify_cpuset);
1196 
1197 		if (core == SPDK_ENV_LCORE_ID_ANY) {
1198 			/* Get the cpumask of all valid reactors, i.e. those suggested by the thread's cpumask that are also polling */
1199 			spdk_cpuset_copy(&valid_cpumask, &polling_cpumask);
1200 			spdk_cpuset_and(&valid_cpumask, spdk_thread_get_cpumask(thread));
1201 
1202 			/* If there are any valid reactors, the spdk_thread should be scheduled
1203 			 * onto one of them.
1204 			 * If there are no valid reactors, the spdk_thread should be scheduled
1205 			 * onto one of the polling reactors.
1206 			 */
1207 			if (spdk_cpuset_count(&valid_cpumask) != 0) {
1208 				cpumask = &valid_cpumask;
1209 			} else {
1210 				cpumask = &polling_cpumask;
1211 			}
1212 		} else if (!spdk_cpuset_get_cpu(&polling_cpumask, core)) {
1213 			/* If the specified reactor is not polling, the spdk_thread should be
1214 			 * scheduled onto one of the polling reactors.
1215 			 */
1216 			core = SPDK_ENV_LCORE_ID_ANY;
1217 			cpumask = &polling_cpumask;
1218 		}
1219 	}
1220 
1221 	pthread_mutex_lock(&g_scheduler_mtx);
1222 	if (core == SPDK_ENV_LCORE_ID_ANY) {
1223 		for (i = 0; i < spdk_env_get_core_count(); i++) {
1224 			if (g_next_core >= g_reactor_count) {
1225 				g_next_core = spdk_env_get_first_core();
1226 			}
1227 			core = g_next_core;
1228 			g_next_core = spdk_env_get_next_core(g_next_core);
1229 
1230 			if (spdk_cpuset_get_cpu(cpumask, core)) {
1231 				break;
1232 			}
1233 		}
1234 	}
1235 
1236 	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);
1237 
1238 	pthread_mutex_unlock(&g_scheduler_mtx);
1239 
1240 	assert(evt != NULL);
1241 	if (evt == NULL) {
1242 		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
1243 		return -1;
1244 	}
1245 
1246 	lw_thread->tsc_start = spdk_get_ticks();
1247 
1248 	spdk_event_call(evt);
1249 
1250 	return 0;
1251 }
1252 
1253 static void
1254 _reactor_request_thread_reschedule(struct spdk_thread *thread)
1255 {
1256 	struct spdk_lw_thread *lw_thread;
1257 	struct spdk_reactor *reactor;
1258 	uint32_t current_core;
1259 
1260 	assert(thread == spdk_get_thread());
1261 
1262 	lw_thread = spdk_thread_get_ctx(thread);
1263 
1264 	assert(lw_thread != NULL);
1265 	lw_thread->resched = true;
1266 	lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
1267 
1268 	current_core = spdk_env_get_current_core();
1269 	reactor = spdk_reactor_get(current_core);
1270 	assert(reactor != NULL);
1271 
1272 	/* Send a notification if this reactor is marked as being in interrupt mode */
1273 	if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) {
1274 		uint64_t notify = 1;
1275 
1276 		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
1277 			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
1278 		}
1279 	}
1280 }
1281 
1282 static int
1283 reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
1284 {
1285 	struct spdk_lw_thread *lw_thread;
1286 
1287 	switch (op) {
1288 	case SPDK_THREAD_OP_NEW:
1289 		lw_thread = spdk_thread_get_ctx(thread);
1290 		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
1291 		return _reactor_schedule_thread(thread);
1292 	case SPDK_THREAD_OP_RESCHED:
1293 		_reactor_request_thread_reschedule(thread);
1294 		return 0;
1295 	default:
1296 		return -ENOTSUP;
1297 	}
1298 }
1299 
1300 static bool
1301 reactor_thread_op_supported(enum spdk_thread_op op)
1302 {
1303 	switch (op) {
1304 	case SPDK_THREAD_OP_NEW:
1305 	case SPDK_THREAD_OP_RESCHED:
1306 		return true;
1307 	default:
1308 		return false;
1309 	}
1310 }
1311 
1312 struct call_reactor {
1313 	uint32_t cur_core;
1314 	spdk_event_fn fn;
1315 	void *arg1;
1316 	void *arg2;
1317 
1318 	uint32_t orig_core;
1319 	spdk_event_fn cpl;
1320 };
1321 
1322 static void
1323 on_reactor(void *arg1, void *arg2)
1324 {
1325 	struct call_reactor *cr = arg1;
1326 	struct spdk_event *evt;
1327 
1328 	cr->fn(cr->arg1, cr->arg2);
1329 
1330 	cr->cur_core = spdk_env_get_next_core(cr->cur_core);
1331 
1332 	if (cr->cur_core >= g_reactor_count) {
1333 		SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");
1334 
1335 		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
1336 		free(cr);
1337 	} else {
1338 		SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %u\n",
1339 			      cr->cur_core);
1340 
1341 		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
1342 	}
1343 	assert(evt != NULL);
1344 	spdk_event_call(evt);
1345 }
1346 
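/* Run fn on every reactor in core order, then run cpl on the originating
 * core. Illustrative usage (hypothetical callback names):
 *
 *	static void per_reactor(void *arg1, void *arg2) { ... }
 *	static void done(void *arg1, void *arg2) { ... }
 *	...
 *	spdk_for_each_reactor(per_reactor, ctx, NULL, done);
 */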
1347 void
1348 spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
1349 {
1350 	struct call_reactor *cr;
1351 
1352 	/* When the application framework is shutting down, we will send one
1353 	 * final for_each_reactor operation with completion callback _reactors_stop,
1354 	 * to flush any existing for_each_reactor operations to avoid any memory
1355 	 * leaks. We use a mutex here to protect a boolean flag that will ensure
1356 	 * we don't start any more operations once we've started shutting down.
1357 	 */
1358 	pthread_mutex_lock(&g_stopping_reactors_mtx);
1359 	if (g_stopping_reactors) {
1360 		pthread_mutex_unlock(&g_stopping_reactors_mtx);
1361 		return;
1362 	} else if (cpl == _reactors_stop) {
1363 		g_stopping_reactors = true;
1364 	}
1365 	pthread_mutex_unlock(&g_stopping_reactors_mtx);
1366 
1367 	cr = calloc(1, sizeof(*cr));
1368 	if (!cr) {
1369 		SPDK_ERRLOG("Unable to perform reactor iteration\n");
1370 		cpl(arg1, arg2);
1371 		return;
1372 	}
1373 
1374 	cr->fn = fn;
1375 	cr->arg1 = arg1;
1376 	cr->arg2 = arg2;
1377 	cr->cpl = cpl;
1378 	cr->orig_core = spdk_env_get_current_core();
1379 	cr->cur_core = spdk_env_get_first_core();
1380 
1381 	SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %u\n", cr->orig_core);
1382 
1383 	_event_call(cr->cur_core, on_reactor, cr, NULL);
1384 }
1385 
1386 #ifdef __linux__
1387 static int
1388 reactor_schedule_thread_event(void *arg)
1389 {
1390 	struct spdk_reactor *reactor = arg;
1391 	struct spdk_lw_thread *lw_thread, *tmp;
1392 	uint32_t count = 0;
1393 	uint64_t notify = 1;
1394 
1395 	assert(reactor->in_interrupt);
1396 
1397 	if (read(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
1398 		SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno));
1399 		return -errno;
1400 	}
1401 
1402 	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
1403 		count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
1404 	}
1405 
1406 	return count;
1407 }
1408 
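/* Linux-only: create the reactor's fd_group and its two eventfds, resched_fd
 * (kicked when a thread asks to move cores) and events_fd (kicked when events
 * are enqueued from another core), and register their handlers with the
 * group. On other platforms, interrupt mode is reported as unsupported.
 */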
1409 static int
1410 reactor_interrupt_init(struct spdk_reactor *reactor)
1411 {
1412 	int rc;
1413 
1414 	rc = spdk_fd_group_create(&reactor->fgrp);
1415 	if (rc != 0) {
1416 		return rc;
1417 	}
1418 
1419 	reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1420 	if (reactor->resched_fd < 0) {
1421 		rc = -EBADF;
1422 		goto err;
1423 	}
1424 
1425 	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event,
1426 			       reactor);
1427 	if (rc) {
1428 		close(reactor->resched_fd);
1429 		goto err;
1430 	}
1431 
1432 	reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1433 	if (reactor->events_fd < 0) {
1434 		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
1435 		close(reactor->resched_fd);
1436 
1437 		rc = -EBADF;
1438 		goto err;
1439 	}
1440 
1441 	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->events_fd,
1442 			       event_queue_run_batch, reactor);
1443 	if (rc) {
1444 		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
1445 		close(reactor->resched_fd);
1446 		close(reactor->events_fd);
1447 		goto err;
1448 	}
1449 
1450 	return 0;
1451 
1452 err:
1453 	spdk_fd_group_destroy(reactor->fgrp);
1454 	reactor->fgrp = NULL;
1455 	return rc;
1456 }
1457 #else
1458 static int
1459 reactor_interrupt_init(struct spdk_reactor *reactor)
1460 {
1461 	return -ENOTSUP;
1462 }
1463 #endif
1464 
1465 static void
1466 reactor_interrupt_fini(struct spdk_reactor *reactor)
1467 {
1468 	struct spdk_fd_group *fgrp = reactor->fgrp;
1469 
1470 	if (!fgrp) {
1471 		return;
1472 	}
1473 
1474 	spdk_fd_group_remove(fgrp, reactor->events_fd);
1475 	spdk_fd_group_remove(fgrp, reactor->resched_fd);
1476 
1477 	close(reactor->events_fd);
1478 	close(reactor->resched_fd);
1479 
1480 	spdk_fd_group_destroy(fgrp);
1481 	reactor->fgrp = NULL;
1482 }
1483 
1484 static struct spdk_governor *
1485 _governor_find(const char *name)
1486 {
1487 	struct spdk_governor *governor, *tmp;
1488 
1489 	TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
1490 		if (strcmp(name, governor->name) == 0) {
1491 			return governor;
1492 		}
1493 	}
1494 
1495 	return NULL;
1496 }
1497 
1498 int
1499 spdk_governor_set(const char *name)
1500 {
1501 	struct spdk_governor *governor;
1502 	int rc = 0;
1503 
1504 	/* NULL governor was specifically requested */
1505 	if (name == NULL) {
1506 		if (g_governor) {
1507 			g_governor->deinit();
1508 		}
1509 		g_governor = NULL;
1510 		return 0;
1511 	}
1512 
1513 	governor = _governor_find(name);
1514 	if (governor == NULL) {
1515 		return -EINVAL;
1516 	}
1517 
1518 	if (g_governor == governor) {
1519 		return 0;
1520 	}
1521 
1522 	rc = governor->init();
1523 	if (rc == 0) {
1524 		if (g_governor) {
1525 			g_governor->deinit();
1526 		}
1527 		g_governor = governor;
1528 	}
1529 
1530 	return rc;
1531 }
1532 
1533 struct spdk_governor *
1534 spdk_governor_get(void)
1535 {
1536 	return g_governor;
1537 }
1538 
1539 void
1540 spdk_governor_register(struct spdk_governor *governor)
1541 {
1542 	if (_governor_find(governor->name)) {
1543 		SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name);
1544 		assert(false);
1545 		return;
1546 	}
1547 
1548 	TAILQ_INSERT_TAIL(&g_governor_list, governor, link);
1549 }
1550 
1551 SPDK_LOG_REGISTER_COMPONENT(reactor)
1552