xref: /spdk/lib/event/reactor.c (revision a474889bc6cae9a5525f8f519714cf0b07ca836b)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/likely.h"
36 
37 #include "spdk_internal/event.h"
38 
39 #include "spdk/log.h"
40 #include "spdk/thread.h"
41 #include "spdk/env.h"
42 #include "spdk/util.h"
43 #include "spdk/string.h"
44 #include "spdk/fd_group.h"
45 
46 #ifdef __linux__
47 #include <sys/prctl.h>
48 #include <sys/eventfd.h>
49 #endif
50 
51 #ifdef __FreeBSD__
52 #include <pthread_np.h>
53 #endif
54 
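/* Maximum number of events dequeued from a reactor's event ring per event_queue_run_batch() pass. */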
55 #define SPDK_EVENT_BATCH_SIZE		8
56 
57 static struct spdk_reactor *g_reactors;
58 static struct spdk_cpuset g_reactor_core_mask;
59 static enum spdk_reactor_state	g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;
60 
61 static bool g_framework_context_switch_monitor_enabled = true;
62 
63 static struct spdk_mempool *g_spdk_event_mempool = NULL;
64 
65 TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
66 	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);
67 
68 static struct spdk_scheduler *g_scheduler;
69 static struct spdk_scheduler *g_new_scheduler;
70 static struct spdk_reactor *g_scheduling_reactor;
71 static uint32_t g_scheduler_period;
72 static struct spdk_scheduler_core_info *g_core_infos = NULL;
73 
74 TAILQ_HEAD(, spdk_governor) g_governor_list
75 	= TAILQ_HEAD_INITIALIZER(g_governor_list);
76 
77 static int _governor_get_capabilities(uint32_t lcore_id,
78 				      struct spdk_governor_capabilities *capabilities);
79 
80 static struct spdk_governor g_governor = {
81 	.name = "default",
82 	.get_core_capabilities = _governor_get_capabilities,
83 };
84 
85 static int reactor_interrupt_init(struct spdk_reactor *reactor);
86 static void reactor_interrupt_fini(struct spdk_reactor *reactor);
87 
88 static struct spdk_scheduler *
89 _scheduler_find(char *name)
90 {
91 	struct spdk_scheduler *tmp;
92 
93 	TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
94 		if (strcmp(name, tmp->name) == 0) {
95 			return tmp;
96 		}
97 	}
98 
99 	return NULL;
100 }
101 
102 int
103 _spdk_scheduler_set(char *name)
104 {
105 	struct spdk_scheduler *scheduler;
106 
107 	scheduler = _scheduler_find(name);
108 	if (scheduler == NULL) {
109 		SPDK_ERRLOG("Requested scheduler is missing\n");
110 		return -ENOENT;
111 	}
112 
113 	if (g_reactors == NULL || g_scheduling_reactor == NULL) {
114 		g_new_scheduler = scheduler;
115 		g_scheduler = scheduler;
116 		return 0;
117 	}
118 
119 	if (g_scheduling_reactor->flags.is_scheduling) {
120 		g_new_scheduler = scheduler;
121 	} else {
122 		if (g_scheduler->deinit != NULL) {
123 			g_scheduler->deinit(&g_governor);
124 		}
125 
126 		g_new_scheduler = scheduler;
127 		g_scheduler = scheduler;
128 	}
129 
130 	if (scheduler->init != NULL) {
131 		scheduler->init(&g_governor);
132 	}
133 
134 	return 0;
135 }
136 
137 void
138 _spdk_scheduler_period_set(uint32_t period)
139 {
140 	g_scheduler_period = period;
141 }
142 
143 void
144 _spdk_scheduler_list_add(struct spdk_scheduler *scheduler)
145 {
146 	if (_scheduler_find(scheduler->name)) {
147 		SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
148 		assert(false);
149 		return;
150 	}
151 
152 	TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
153 }
154 
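/*
 * Illustrative sketch (not part of the original file): how a scheduler
 * implementation would typically make itself known to this framework. A real
 * scheduler would also fill in the init/deinit/balance callbacks declared in
 * spdk_internal/event.h; they are omitted here, since this file tolerates
 * NULL callbacks and only the registration mechanism is being shown. The
 * "example" names are hypothetical.
 */
static struct spdk_scheduler g_example_scheduler = {
	.name = "example",
};

static void __attribute__((constructor))
register_example_scheduler(void)
{
	/* Runs before main(); adds the scheduler to g_scheduler_list. */
	_spdk_scheduler_list_add(&g_example_scheduler);
}

/* An RPC or subsystem could later activate it with _spdk_scheduler_set("example"). */
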
155 static void
156 reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
157 {
158 	reactor->lcore = lcore;
159 	reactor->flags.is_valid = true;
160 
161 	TAILQ_INIT(&reactor->threads);
162 	reactor->thread_count = 0;
163 
164 	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
165 	assert(reactor->events != NULL);
166 
167 	if (spdk_interrupt_mode_is_enabled()) {
168 		reactor_interrupt_init(reactor);
169 	}
170 }
171 
172 struct spdk_reactor *
173 spdk_reactor_get(uint32_t lcore)
174 {
175 	struct spdk_reactor *reactor;
176 
177 	if (g_reactors == NULL) {
178 		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
179 		return NULL;
180 	}
181 
182 	reactor = &g_reactors[lcore];
183 
184 	if (reactor->flags.is_valid == false) {
185 		return NULL;
186 	}
187 
188 	return reactor;
189 }
190 
191 static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
192 static bool reactor_thread_op_supported(enum spdk_thread_op op);
193 
194 int
195 spdk_reactors_init(void)
196 {
197 	int rc;
198 	uint32_t i, last_core;
199 	char mempool_name[32];
200 
201 	rc = _spdk_scheduler_set("static");
202 	if (rc != 0) {
203 		SPDK_ERRLOG("Failed setting up scheduler\n");
204 		return rc;
205 	}
206 
207 	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
208 	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
209 			       262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
210 			       sizeof(struct spdk_event),
211 			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
212 			       SPDK_ENV_SOCKET_ID_ANY);
213 
214 	if (g_spdk_event_mempool == NULL) {
215 		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
216 		return -1;
217 	}
218 
219 	/* struct spdk_reactor must be aligned on a 64-byte boundary */
220 	last_core = spdk_env_get_last_core();
221 	rc = posix_memalign((void **)&g_reactors, 64,
222 			    (last_core + 1) * sizeof(struct spdk_reactor));
223 	if (rc != 0) {
224 		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
225 			    last_core + 1);
226 		spdk_mempool_free(g_spdk_event_mempool);
227 		return -1;
228 	}
229 
230 	g_core_infos = calloc(last_core + 1, sizeof(*g_core_infos));
231 	if (g_core_infos == NULL) {
232 		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
233 		spdk_mempool_free(g_spdk_event_mempool);
234 		free(g_reactors);
235 		return -ENOMEM;
236 	}
237 
238 	memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor));
239 
240 	spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
241 				 sizeof(struct spdk_lw_thread));
242 
243 	SPDK_ENV_FOREACH_CORE(i) {
244 		reactor_construct(&g_reactors[i], i);
245 	}
246 
247 	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
248 
249 	return 0;
250 }
251 
252 void
253 spdk_reactors_fini(void)
254 {
255 	uint32_t i;
256 	struct spdk_reactor *reactor;
257 
258 	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
259 		return;
260 	}
261 
262 	if (g_scheduler->deinit != NULL) {
263 		g_scheduler->deinit(&g_governor);
264 	}
265 
266 	spdk_thread_lib_fini();
267 
268 	SPDK_ENV_FOREACH_CORE(i) {
269 		reactor = spdk_reactor_get(i);
270 		assert(reactor != NULL);
271 		assert(reactor->thread_count == 0);
272 		if (reactor->events != NULL) {
273 			spdk_ring_free(reactor->events);
274 		}
275 
276 		if (reactor->interrupt_mode) {
277 			reactor_interrupt_fini(reactor);
278 		}
279 
280 		if (g_core_infos != NULL) {
281 			free(g_core_infos[i].threads);
282 		}
283 	}
284 
285 	spdk_mempool_free(g_spdk_event_mempool);
286 
287 	free(g_reactors);
288 	g_reactors = NULL;
289 	free(g_core_infos);
290 	g_core_infos = NULL;
291 }
292 
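/*
 * Descriptive note (not part of the original file): the usual lifecycle is
 * spdk_reactors_init() to allocate the reactor array and event mempool,
 * spdk_reactors_start() to launch one reactor per configured core (the
 * current core's reactor runs inline, so the call only returns after
 * shutdown), spdk_reactors_stop() (typically posted as an event) to move the
 * state to EXITING so the reactor loops exit, and finally
 * spdk_reactors_fini() to release the remaining resources.
 */
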
293 struct spdk_event *
294 spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
295 {
296 	struct spdk_event *event = NULL;
297 	struct spdk_reactor *reactor = spdk_reactor_get(lcore);
298 
299 	if (!reactor) {
300 		assert(false);
301 		return NULL;
302 	}
303 
304 	event = spdk_mempool_get(g_spdk_event_mempool);
305 	if (event == NULL) {
306 		assert(false);
307 		return NULL;
308 	}
309 
310 	event->lcore = lcore;
311 	event->fn = fn;
312 	event->arg1 = arg1;
313 	event->arg2 = arg2;
314 
315 	return event;
316 }
317 
318 void
319 spdk_event_call(struct spdk_event *event)
320 {
321 	int rc;
322 	struct spdk_reactor *reactor;
323 
324 	reactor = spdk_reactor_get(event->lcore);
325 
326 	assert(reactor != NULL);
327 	assert(reactor->events != NULL);
328 
329 	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
330 	if (rc != 1) {
331 		assert(false);
332 	}
333 
334 	if (reactor->interrupt_mode) {
335 		uint64_t notify = 1;
336 
337 		rc = write(reactor->events_fd, &notify, sizeof(notify));
338 		if (rc < 0) {
339 			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
340 		}
341 	}
342 }
343 
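/*
 * Usage sketch (illustrative, not part of the original file): handing a
 * function call to a specific core with the event API defined above. The
 * example_* names are hypothetical.
 */
static void
example_event_fn(void *arg1, void *arg2)
{
	SPDK_NOTICELOG("event executed on core %u\n", spdk_env_get_current_core());
}

static void __attribute__((unused))
example_send_event(uint32_t lcore)
{
	struct spdk_event *evt;

	evt = spdk_event_allocate(lcore, example_event_fn, NULL, NULL);
	if (evt == NULL) {
		SPDK_ERRLOG("could not allocate an event\n");
		return;
	}

	/* Enqueues the event on the target reactor's ring; it runs on that core. */
	spdk_event_call(evt);
}
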
344 static inline uint32_t
345 event_queue_run_batch(struct spdk_reactor *reactor)
346 {
347 	unsigned count, i;
348 	void *events[SPDK_EVENT_BATCH_SIZE];
349 	struct spdk_thread *thread;
350 	struct spdk_lw_thread *lw_thread;
351 
352 #ifdef DEBUG
353 	/*
354 	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
355 	 * so we will never actually read uninitialized data from events, but just to be sure
356 	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
357 	 */
358 	memset(events, 0, sizeof(events));
359 #endif
360 
361 	if (reactor->interrupt_mode) {
362 		uint64_t notify = 1;
363 		int rc;
364 
365 		/* There may be a race between this acknowledgement (the read below) and another
366 		 * producer's notification (the write to events_fd in spdk_event_call), so acknowledge
367 		 * first, then drain the ring, and re-notify if events remain so no notification is lost.
368 		 */
369 		rc = read(reactor->events_fd, &notify, sizeof(notify));
370 		if (rc < 0) {
371 			SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno));
372 			return -errno;
373 		}
374 
375 		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
376 
377 		if (spdk_ring_count(reactor->events) != 0) {
378 			/* Re-notify ourselves if there are still events in the event queue waiting to be processed. */
379 			rc = write(reactor->events_fd, &notify, sizeof(notify));
380 			if (rc < 0) {
381 				SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
382 				return -errno;
383 			}
384 		}
385 	} else {
386 		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
387 	}
388 
389 	if (count == 0) {
390 		return 0;
391 	}
392 
393 	/* Execute the events. Some event functions must run in the context of
394 	 * an SPDK thread. To accommodate those, run the batch with the first
395 	 * thread in this reactor's list set as the current thread, if one exists. */
396 	lw_thread = TAILQ_FIRST(&reactor->threads);
397 	if (lw_thread) {
398 		thread = spdk_thread_get_from_ctx(lw_thread);
399 	} else {
400 		thread = NULL;
401 	}
402 
403 	spdk_set_thread(thread);
404 
405 	for (i = 0; i < count; i++) {
406 		struct spdk_event *event = events[i];
407 
408 		assert(event != NULL);
409 		event->fn(event->arg1, event->arg2);
410 	}
411 
412 	spdk_set_thread(NULL);
413 
414 	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);
415 
416 	return count;
417 }
418 
419 /* 1s, in microseconds */
420 #define CONTEXT_SWITCH_MONITOR_PERIOD 1000000
421 
422 static int
423 get_rusage(struct spdk_reactor *reactor)
424 {
425 	struct rusage		rusage;
426 
427 	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
428 		return -1;
429 	}
430 
431 	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
432 		SPDK_INFOLOG(reactor,
433 			     "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
434 			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
435 			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
436 	}
437 	reactor->rusage = rusage;
438 
439 	return 0;
440 }
441 
442 void
443 spdk_framework_enable_context_switch_monitor(bool enable)
444 {
445 	/* This global is being read by multiple threads, so this isn't
446 	 * strictly thread safe. However, we're toggling between true and
447 	 * false here, and if a thread sees the value update later than it
448 	 * should, it's no big deal. */
449 	g_framework_context_switch_monitor_enabled = enable;
450 }
451 
452 bool
453 spdk_framework_context_switch_monitor_enabled(void)
454 {
455 	return g_framework_context_switch_monitor_enabled;
456 }
457 
458 static void
459 _set_thread_name(const char *thread_name)
460 {
461 #if defined(__linux__)
462 	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
463 #elif defined(__FreeBSD__)
464 	pthread_set_name_np(pthread_self(), thread_name);
465 #else
466 #error missing platform support for thread name
467 #endif
468 }
469 
470 static void
471 _init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
472 {
473 	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
474 
475 	lw_thread->lcore = reactor->lcore;
476 
477 	spdk_set_thread(thread);
478 	spdk_thread_get_stats(&lw_thread->current_stats);
479 }
480 
481 static void
482 _threads_reschedule(struct spdk_scheduler_core_info *cores_info)
483 {
484 	struct spdk_scheduler_core_info *core;
485 	struct spdk_lw_thread *lw_thread;
486 	uint32_t i, j;
487 
488 	SPDK_ENV_FOREACH_CORE(i) {
489 		core = &cores_info[i];
490 		for (j = 0; j < core->threads_count; j++) {
491 			lw_thread = core->threads[j];
492 			if (lw_thread->lcore != lw_thread->new_lcore) {
493 				_spdk_lw_thread_set_core(lw_thread, lw_thread->new_lcore);
494 			}
495 		}
496 	}
497 }
498 
499 static void
500 _reactors_scheduler_fini(void *arg1, void *arg2)
501 {
502 	struct spdk_reactor *reactor;
503 	uint32_t last_core;
504 	uint32_t i;
505 
506 	if (g_reactor_state == SPDK_REACTOR_STATE_RUNNING) {
507 		last_core = spdk_env_get_last_core();
508 		g_scheduler->balance(g_core_infos, last_core + 1, &g_governor);
509 
510 		/* Reschedule based on the balancing output */
511 		_threads_reschedule(g_core_infos);
512 
513 		SPDK_ENV_FOREACH_CORE(i) {
514 			reactor = spdk_reactor_get(i);
515 			reactor->flags.is_scheduling = false;
516 		}
517 	}
518 }
519 
520 static void
521 _reactors_scheduler_cancel(void *arg1, void *arg2)
522 {
523 	struct spdk_reactor *reactor;
524 	uint32_t i;
525 
526 	SPDK_ENV_FOREACH_CORE(i) {
527 		reactor = spdk_reactor_get(i);
528 		reactor->flags.is_scheduling = false;
529 	}
530 }
531 
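/*
 * Overview of the scheduling event chain below (descriptive note, not part
 * of the original file): the scheduling reactor starts a round by calling
 * _reactors_scheduler_gather_metrics() locally; each reactor then records
 * its idle/busy TSC counters and thread list into g_core_infos and forwards
 * the event to the next core. When the chain wraps back to the scheduling
 * reactor, _reactors_scheduler_fini() asks the active scheduler to balance
 * the load and reschedules every thread whose target core changed.
 * _reactors_scheduler_cancel() aborts the round if an allocation fails.
 */
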
532 /* Phase 1 of thread scheduling is to gather metrics on the existing threads */
533 static void
534 _reactors_scheduler_gather_metrics(void *arg1, void *arg2)
535 {
536 	struct spdk_scheduler_core_info *core_info;
537 	struct spdk_lw_thread *lw_thread;
538 	struct spdk_reactor *reactor;
539 	struct spdk_event *evt;
540 	uint32_t next_core;
541 	uint32_t i;
542 
543 	reactor = spdk_reactor_get(spdk_env_get_current_core());
544 	reactor->flags.is_scheduling = true;
545 	core_info = &g_core_infos[reactor->lcore];
546 	core_info->lcore = reactor->lcore;
547 	core_info->core_idle_tsc = reactor->idle_tsc;
548 	core_info->core_busy_tsc = reactor->busy_tsc;
549 
550 	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);
551 
552 	free(core_info->threads);
553 	core_info->threads = NULL;
554 
555 	i = 0;
556 
557 	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
558 		_init_thread_stats(reactor, lw_thread);
559 		i++;
560 	}
561 
562 	core_info->threads_count = i;
563 
564 	if (core_info->threads_count > 0) {
565 		core_info->threads = calloc(core_info->threads_count, sizeof(struct spdk_lw_thread *));
566 		if (core_info->threads == NULL) {
567 			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);
568 
569 			/* Cancel this round of scheduling work */
570 			evt = spdk_event_allocate(g_scheduling_reactor->lcore, _reactors_scheduler_cancel, NULL, NULL);
571 			spdk_event_call(evt);
572 			return;
573 		}
574 
575 		i = 0;
576 		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
577 			core_info->threads[i] = lw_thread;
578 			i++;
579 		}
580 	}
581 
582 	next_core = spdk_env_get_next_core(reactor->lcore);
583 	if (next_core == UINT32_MAX) {
584 		next_core = spdk_env_get_first_core();
585 	}
586 
587 	/* If we've looped back around to the scheduler thread, move to the next phase */
588 	if (next_core == g_scheduling_reactor->lcore) {
589 		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
590 		evt = spdk_event_allocate(next_core, _reactors_scheduler_fini, NULL, NULL);
591 		spdk_event_call(evt);
592 		return;
593 	}
594 
595 	evt = spdk_event_allocate(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
596 	spdk_event_call(evt);
597 }
598 
599 static int _reactor_schedule_thread(struct spdk_thread *thread);
600 static uint64_t g_rusage_period;
601 
602 static bool
603 reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
604 {
605 	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
606 	int efd;
607 
608 	if (spdk_unlikely(lw_thread->resched)) {
609 		lw_thread->resched = false;
610 		TAILQ_REMOVE(&reactor->threads, lw_thread, link);
611 		assert(reactor->thread_count > 0);
612 		reactor->thread_count--;
613 
614 		if (reactor->interrupt_mode) {
615 			efd = spdk_thread_get_interrupt_fd(thread);
616 			spdk_fd_group_remove(reactor->fgrp, efd);
617 		}
618 		_reactor_schedule_thread(thread);
619 		return true;
620 	}
621 
622 	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
623 			  spdk_thread_is_idle(thread))) {
624 		if (reactor->flags.is_scheduling == false) {
625 			TAILQ_REMOVE(&reactor->threads, lw_thread, link);
626 			assert(reactor->thread_count > 0);
627 			reactor->thread_count--;
628 
629 			if (reactor->interrupt_mode) {
630 				efd = spdk_thread_get_interrupt_fd(thread);
631 				spdk_fd_group_remove(reactor->fgrp, efd);
632 			}
633 			spdk_thread_destroy(thread);
634 			return true;
635 		}
636 	}
637 
638 	return false;
639 }
640 
641 static void
642 reactor_interrupt_run(struct spdk_reactor *reactor)
643 {
644 	int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */
645 
646 	spdk_fd_group_wait(reactor->fgrp, block_timeout);
647 
648 	/* TODO: add tsc records and g_framework_context_switch_monitor_enabled */
649 }
650 
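/*
 * One poll-mode iteration of a reactor (descriptive note, not part of the
 * original file): drain a batch from the event ring, poll each SPDK thread
 * once, attribute the elapsed TSC of each poll to busy_tsc when the poll did
 * work (rc > 0) or to idle_tsc when it did not (rc == 0), and, if enabled,
 * log context-switch statistics roughly once per second.
 */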
651 static void
652 _reactor_run(struct spdk_reactor *reactor)
653 {
654 	struct spdk_thread	*thread;
655 	struct spdk_lw_thread	*lw_thread, *tmp;
656 	uint64_t		now;
657 	int			rc;
658 
659 	event_queue_run_batch(reactor);
660 
661 	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
662 		thread = spdk_thread_get_from_ctx(lw_thread);
663 		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);
664 
665 		now = spdk_thread_get_last_tsc(thread);
666 		if (rc == 0) {
667 			reactor->idle_tsc += now - reactor->tsc_last;
668 		} else if (rc > 0) {
669 			reactor->busy_tsc += now - reactor->tsc_last;
670 		}
671 		reactor->tsc_last = now;
672 
673 		reactor_post_process_lw_thread(reactor, lw_thread);
674 	}
675 
676 	if (g_framework_context_switch_monitor_enabled) {
677 		if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
678 			get_rusage(reactor);
679 			reactor->last_rusage = reactor->tsc_last;
680 		}
681 	}
682 }
683 
684 static int
685 reactor_run(void *arg)
686 {
687 	struct spdk_reactor	*reactor = arg;
688 	struct spdk_thread	*thread;
689 	struct spdk_lw_thread	*lw_thread, *tmp;
690 	char			thread_name[32];
691 	uint64_t		last_sched = 0;
692 
693 	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);
694 
695 	/* Rename the POSIX thread because the reactor is tied to the POSIX
696 	 * thread in the SPDK event library.
697 	 */
698 	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
699 	_set_thread_name(thread_name);
700 
701 	reactor->tsc_last = spdk_get_ticks();
702 
703 	while (1) {
704 		if (spdk_unlikely(reactor->interrupt_mode)) {
705 			reactor_interrupt_run(reactor);
706 		} else {
707 			_reactor_run(reactor);
708 		}
709 
710 		if (spdk_unlikely((reactor->tsc_last - last_sched) > g_scheduler_period &&
711 				  reactor == g_scheduling_reactor &&
712 				  !reactor->flags.is_scheduling)) {
713 			if (spdk_unlikely(g_scheduler != g_new_scheduler)) {
714 				if (g_scheduler->deinit != NULL) {
715 					g_scheduler->deinit(&g_governor);
716 				}
717 				g_scheduler = g_new_scheduler;
718 			}
719 
720 			if (spdk_unlikely(g_scheduler->balance != NULL)) {
721 				last_sched = reactor->tsc_last;
722 				_reactors_scheduler_gather_metrics(NULL, NULL);
723 			}
724 		}
725 
726 		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
727 			break;
728 		}
729 	}
730 
731 	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
732 		thread = spdk_thread_get_from_ctx(lw_thread);
733 		spdk_set_thread(thread);
734 		spdk_thread_exit(thread);
735 	}
736 
737 	while (!TAILQ_EMPTY(&reactor->threads)) {
738 		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
739 			thread = spdk_thread_get_from_ctx(lw_thread);
740 			spdk_set_thread(thread);
741 			if (spdk_thread_is_exited(thread)) {
742 				TAILQ_REMOVE(&reactor->threads, lw_thread, link);
743 				assert(reactor->thread_count > 0);
744 				reactor->thread_count--;
745 				if (reactor->interrupt_mode) {
746 					int efd = spdk_thread_get_interrupt_fd(thread);
747 
748 					spdk_fd_group_remove(reactor->fgrp, efd);
749 				}
750 				spdk_thread_destroy(thread);
751 			} else {
752 				spdk_thread_poll(thread, 0, 0);
753 			}
754 		}
755 	}
756 
757 	return 0;
758 }
759 
760 int
761 spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
762 {
763 	int ret;
764 	const struct spdk_cpuset *validmask;
765 
766 	ret = spdk_cpuset_parse(cpumask, mask);
767 	if (ret < 0) {
768 		return ret;
769 	}
770 
771 	validmask = spdk_app_get_core_mask();
772 	spdk_cpuset_and(cpumask, validmask);
773 
774 	return 0;
775 }
776 
777 const struct spdk_cpuset *
778 spdk_app_get_core_mask(void)
779 {
780 	return &g_reactor_core_mask;
781 }
782 
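/*
 * Usage sketch (illustrative, not part of the original file): validating a
 * user-supplied core mask string, e.g. an RPC parameter, against the cores
 * the application was started with. The example_* name is hypothetical.
 */
static int __attribute__((unused))
example_parse_user_cpumask(const char *mask_str, struct spdk_cpuset *cpumask)
{
	int rc;

	/* Parse the string and intersect it with spdk_app_get_core_mask(). */
	rc = spdk_app_parse_core_mask(mask_str, cpumask);
	if (rc != 0) {
		SPDK_ERRLOG("invalid cpumask '%s'\n", mask_str);
		return rc;
	}

	if (spdk_cpuset_count(cpumask) == 0) {
		SPDK_ERRLOG("cpumask '%s' selects none of the configured cores\n", mask_str);
		return -EINVAL;
	}

	return 0;
}
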
783 void
784 spdk_reactors_start(void)
785 {
786 	struct spdk_reactor *reactor;
787 	struct spdk_cpuset tmp_cpumask = {};
788 	uint32_t i, current_core;
789 	int rc;
790 	char thread_name[32];
791 
792 	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
793 	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
794 
795 	current_core = spdk_env_get_current_core();
796 	SPDK_ENV_FOREACH_CORE(i) {
797 		if (i != current_core) {
798 			reactor = spdk_reactor_get(i);
799 			if (reactor == NULL) {
800 				continue;
801 			}
802 
803 			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
804 			if (rc < 0) {
805 				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
806 				assert(false);
807 				return;
808 			}
809 
810 			/* For now, for each reactor spawn one thread. */
811 			snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
812 
813 			spdk_cpuset_zero(&tmp_cpumask);
814 			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
815 
816 			spdk_thread_create(thread_name, &tmp_cpumask);
817 		}
818 		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
819 	}
820 
821 	/* Start the master reactor */
822 	reactor = spdk_reactor_get(current_core);
823 	assert(reactor != NULL);
824 	g_scheduling_reactor = reactor;
825 	reactor_run(reactor);
826 
827 	spdk_env_thread_wait_all();
828 
829 	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
830 }
831 
832 void
833 spdk_reactors_stop(void *arg1)
834 {
835 	uint32_t i;
836 	int rc;
837 	struct spdk_reactor *reactor;
838 	uint64_t notify = 1;
839 
840 	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
841 
842 	if (spdk_interrupt_mode_is_enabled()) {
843 		SPDK_ENV_FOREACH_CORE(i) {
844 			reactor = spdk_reactor_get(i);
845 
846 			rc = write(reactor->events_fd, &notify, sizeof(notify));
847 			if (rc < 0) {
848 				SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno));
849 				continue;
850 			}
851 		}
852 	}
853 }
854 
855 static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
856 static uint32_t g_next_core = UINT32_MAX;
857 
858 static int
859 thread_process_interrupts(void *arg)
860 {
861 	struct spdk_thread *thread = arg;
862 
863 	return spdk_thread_poll(thread, 0, 0);
864 }
865 
866 static void
867 _schedule_thread(void *arg1, void *arg2)
868 {
869 	struct spdk_lw_thread *lw_thread = arg1;
870 	struct spdk_reactor *reactor;
871 	uint32_t current_core;
872 	int efd;
873 
874 	current_core = spdk_env_get_current_core();
875 
876 	reactor = spdk_reactor_get(current_core);
877 	assert(reactor != NULL);
878 
879 	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
880 	reactor->thread_count++;
881 
882 	if (reactor->interrupt_mode) {
883 		int rc;
884 		struct spdk_thread *thread;
885 
886 		thread = spdk_thread_get_from_ctx(lw_thread);
887 		efd = spdk_thread_get_interrupt_fd(thread);
888 		rc = spdk_fd_group_add(reactor->fgrp, efd, thread_process_interrupts, thread);
889 		if (rc < 0) {
890 			SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
891 		}
892 	}
893 }
894 
895 static int
896 _reactor_schedule_thread(struct spdk_thread *thread)
897 {
898 	uint32_t core;
899 	struct spdk_lw_thread *lw_thread;
900 	struct spdk_event *evt = NULL;
901 	struct spdk_cpuset *cpumask;
902 	uint32_t i;
903 
904 	cpumask = spdk_thread_get_cpumask(thread);
905 
906 	lw_thread = spdk_thread_get_ctx(thread);
907 	assert(lw_thread != NULL);
908 	core = lw_thread->lcore;
909 	memset(lw_thread, 0, sizeof(*lw_thread));
910 
911 	pthread_mutex_lock(&g_scheduler_mtx);
912 	if (core == SPDK_ENV_LCORE_ID_ANY) {
913 		for (i = 0; i < spdk_env_get_core_count(); i++) {
914 			if (g_next_core > spdk_env_get_last_core()) {
915 				g_next_core = spdk_env_get_first_core();
916 			}
917 			core = g_next_core;
918 			g_next_core = spdk_env_get_next_core(g_next_core);
919 
920 			if (spdk_cpuset_get_cpu(cpumask, core)) {
921 				break;
922 			}
923 		}
924 	}
925 
926 	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);
927 
928 	pthread_mutex_unlock(&g_scheduler_mtx);
929 
930 	assert(evt != NULL);
931 	if (evt == NULL) {
932 		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
933 		return -1;
934 	}
935 
936 	lw_thread->tsc_start = spdk_get_ticks();
937 
938 	spdk_event_call(evt);
939 
940 	return 0;
941 }
942 
943 static void
944 _reactor_request_thread_reschedule(struct spdk_thread *thread)
945 {
946 	struct spdk_lw_thread *lw_thread;
947 	struct spdk_reactor *reactor;
948 	uint32_t current_core;
949 
950 	assert(thread == spdk_get_thread());
951 
952 	lw_thread = spdk_thread_get_ctx(thread);
953 
954 	_spdk_lw_thread_set_core(lw_thread, SPDK_ENV_LCORE_ID_ANY);
955 
956 	current_core = spdk_env_get_current_core();
957 	reactor = spdk_reactor_get(current_core);
958 	assert(reactor != NULL);
959 	if (reactor->interrupt_mode) {
960 		uint64_t notify = 1;
961 
962 		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
963 			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
964 		}
965 	}
966 }
967 
968 static int
969 reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
970 {
971 	struct spdk_lw_thread *lw_thread;
972 
973 	switch (op) {
974 	case SPDK_THREAD_OP_NEW:
975 		lw_thread = spdk_thread_get_ctx(thread);
976 		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
977 		return _reactor_schedule_thread(thread);
978 	case SPDK_THREAD_OP_RESCHED:
979 		_reactor_request_thread_reschedule(thread);
980 		return 0;
981 	default:
982 		return -ENOTSUP;
983 	}
984 }
985 
986 static bool
987 reactor_thread_op_supported(enum spdk_thread_op op)
988 {
989 	switch (op) {
990 	case SPDK_THREAD_OP_NEW:
991 	case SPDK_THREAD_OP_RESCHED:
992 		return true;
993 	default:
994 		return false;
995 	}
996 }
997 
998 struct call_reactor {
999 	uint32_t cur_core;
1000 	spdk_event_fn fn;
1001 	void *arg1;
1002 	void *arg2;
1003 
1004 	uint32_t orig_core;
1005 	spdk_event_fn cpl;
1006 };
1007 
1008 static void
1009 on_reactor(void *arg1, void *arg2)
1010 {
1011 	struct call_reactor *cr = arg1;
1012 	struct spdk_event *evt;
1013 
1014 	cr->fn(cr->arg1, cr->arg2);
1015 
1016 	cr->cur_core = spdk_env_get_next_core(cr->cur_core);
1017 
1018 	if (cr->cur_core > spdk_env_get_last_core()) {
1019 		SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");
1020 
1021 		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
1022 		free(cr);
1023 	} else {
1024 		SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %d\n",
1025 			      cr->cur_core);
1026 
1027 		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
1028 	}
1029 	assert(evt != NULL);
1030 	spdk_event_call(evt);
1031 }
1032 
1033 void
1034 spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
1035 {
1036 	struct call_reactor *cr;
1037 	struct spdk_event *evt;
1038 
1039 	cr = calloc(1, sizeof(*cr));
1040 	if (!cr) {
1041 		SPDK_ERRLOG("Unable to perform reactor iteration\n");
1042 		cpl(arg1, arg2);
1043 		return;
1044 	}
1045 
1046 	cr->fn = fn;
1047 	cr->arg1 = arg1;
1048 	cr->arg2 = arg2;
1049 	cr->cpl = cpl;
1050 	cr->orig_core = spdk_env_get_current_core();
1051 	cr->cur_core = spdk_env_get_first_core();
1052 
1053 	SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %d\n", cr->orig_core);
1054 
1055 	evt = spdk_event_allocate(cr->cur_core, on_reactor, cr, NULL);
1056 	assert(evt != NULL);
1057 
1058 	spdk_event_call(evt);
1059 }
1060 
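/*
 * Usage sketch (illustrative, not part of the original file): running a
 * function on every reactor in turn, with a completion callback delivered to
 * the originating core. The example_* names are hypothetical.
 */
static void
example_on_each_reactor(void *arg1, void *arg2)
{
	SPDK_NOTICELOG("visiting reactor on core %u\n", spdk_env_get_current_core());
}

static void
example_reactor_iteration_done(void *arg1, void *arg2)
{
	SPDK_NOTICELOG("visited every reactor\n");
}

static void __attribute__((unused))
example_visit_all_reactors(void)
{
	spdk_for_each_reactor(example_on_each_reactor, NULL, NULL, example_reactor_iteration_done);
}
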
1061 #ifdef __linux__
1062 static int
1063 reactor_schedule_thread_event(void *arg)
1064 {
1065 	struct spdk_reactor *reactor = arg;
1066 	struct spdk_lw_thread *lw_thread, *tmp;
1067 	uint32_t count = 0;
1068 	uint64_t notify = 1;
1069 
1070 	assert(reactor->interrupt_mode);
1071 
1072 	if (read(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
1073 		SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno));
1074 		return -errno;
1075 	}
1076 
1077 	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
1078 		count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
1079 	}
1080 
1081 	return count;
1082 }
1083 
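/*
 * Descriptive note (not part of the original file): in interrupt mode a
 * reactor sleeps in spdk_fd_group_wait() instead of spinning. Two eventfds
 * are registered with its fd group: events_fd wakes the reactor when
 * spdk_event_call() enqueues an event from another core, and resched_fd
 * wakes it when one of its threads requests a reschedule. Each SPDK thread's
 * own interrupt fd is added in _schedule_thread() and removed when the
 * thread is moved away or destroyed.
 */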
1084 static int
1085 reactor_interrupt_init(struct spdk_reactor *reactor)
1086 {
1087 	int rc;
1088 
1089 	rc = spdk_fd_group_create(&reactor->fgrp);
1090 	if (rc != 0) {
1091 		return rc;
1092 	}
1093 
1094 	reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1095 	if (reactor->resched_fd < 0) {
1096 		rc = -EBADF;
1097 		goto err;
1098 	}
1099 
1100 	rc = spdk_fd_group_add(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event,
1101 			       reactor);
1102 	if (rc) {
1103 		close(reactor->resched_fd);
1104 		goto err;
1105 	}
1106 
1107 	reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1108 	if (reactor->events_fd < 0) {
1109 		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
1110 		close(reactor->resched_fd);
1111 
1112 		rc = -EBADF;
1113 		goto err;
1114 	}
1115 
1116 	rc = spdk_fd_group_add(reactor->fgrp, reactor->events_fd,
1117 			       (spdk_fd_fn)event_queue_run_batch, reactor);
1118 	if (rc) {
1119 		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
1120 		close(reactor->resched_fd);
1121 		close(reactor->events_fd);
1122 		goto err;
1123 	}
1124 
1125 	reactor->interrupt_mode = true;
1126 	return 0;
1127 
1128 err:
1129 	spdk_fd_group_destroy(reactor->fgrp);
1130 	return rc;
1131 }
1132 #else
1133 static int
1134 reactor_interrupt_init(struct spdk_reactor *reactor)
1135 {
1136 	return -ENOTSUP;
1137 }
1138 #endif
1139 
1140 static void
1141 reactor_interrupt_fini(struct spdk_reactor *reactor)
1142 {
1143 	struct spdk_fd_group *fgrp = reactor->fgrp;
1144 
1145 	if (!fgrp) {
1146 		return;
1147 	}
1148 
1149 	spdk_fd_group_remove(fgrp, reactor->events_fd);
1150 	spdk_fd_group_remove(fgrp, reactor->resched_fd);
1151 
1152 	close(reactor->events_fd);
1153 	close(reactor->resched_fd);
1154 
1155 	spdk_fd_group_destroy(fgrp);
1156 	reactor->fgrp = NULL;
1157 }
1158 
1159 void
1160 _spdk_lw_thread_set_core(struct spdk_lw_thread *thread, uint32_t lcore)
1161 {
1162 	assert(thread != NULL);
1163 	thread->lcore = lcore;
1164 	thread->resched = true;
1165 }
1166 
1167 void
1168 _spdk_lw_thread_get_current_stats(struct spdk_lw_thread *thread, struct spdk_thread_stats *stats)
1169 {
1170 	assert(thread != NULL);
1171 	*stats = thread->current_stats;
1172 }
1173 
1174 static int
1175 _governor_get_capabilities(uint32_t lcore_id, struct spdk_governor_capabilities *capabilities)
1176 {
1177 	capabilities->freq_change = false;
1178 	capabilities->freq_getset = false;
1179 	capabilities->freq_up = false;
1180 	capabilities->freq_down = false;
1181 	capabilities->freq_max = false;
1182 	capabilities->freq_min = false;
1183 	capabilities->turbo_set = false;
1184 	capabilities->priority = false;
1185 	capabilities->turbo_available = false;
1186 
1187 	return 0;
1188 }
1189 
1190 static struct spdk_governor *
1191 _governor_find(char *name)
1192 {
1193 	struct spdk_governor *governor, *tmp;
1194 
1195 	TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
1196 		if (strcmp(name, governor->name) == 0) {
1197 			return governor;
1198 		}
1199 	}
1200 
1201 	return NULL;
1202 }
1203 
1204 int
1205 _spdk_governor_set(char *name)
1206 {
1207 	struct spdk_governor *governor;
1208 	uint32_t i;
1209 	int rc;
1210 
1211 	governor = _governor_find(name);
1212 	if (governor == NULL) {
1213 		return -EINVAL;
1214 	}
1215 
1216 	g_governor = *governor;
1217 
1218 	if (g_governor.init) {
1219 		rc = g_governor.init();
1220 		if (rc != 0) {
1221 			return rc;
1222 		}
1223 	}
1224 
1225 	SPDK_ENV_FOREACH_CORE(i) {
1226 		if (g_governor.init_core) {
1227 			rc = g_governor.init_core(i);
1228 			if (rc != 0) {
1229 				return rc;
1230 			}
1231 		}
1232 	}
1233 
1234 	return 0;
1235 }
1236 
1237 void
1238 _spdk_governor_list_add(struct spdk_governor *governor)
1239 {
1240 	if (_governor_find(governor->name)) {
1241 		SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name);
1242 		assert(false);
1243 		return;
1244 	}
1245 
1246 	TAILQ_INSERT_TAIL(&g_governor_list, governor, link);
1247 }
1248 
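/*
 * Illustrative sketch (not part of the original file): how a governor
 * implementation would typically register itself so _spdk_governor_set() can
 * find it by name. Only get_core_capabilities is filled in; a real governor
 * would also provide the frequency-control callbacks declared in
 * spdk_internal/event.h. The "example" names are hypothetical.
 */
static int
example_governor_capabilities(uint32_t lcore_id, struct spdk_governor_capabilities *capabilities)
{
	/* Report no capabilities, like the default governor above. */
	memset(capabilities, 0, sizeof(*capabilities));
	return 0;
}

static struct spdk_governor g_example_governor = {
	.name = "example",
	.get_core_capabilities = example_governor_capabilities,
};

static void __attribute__((constructor))
register_example_governor(void)
{
	_spdk_governor_list_add(&g_example_governor);
}
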
1249 SPDK_LOG_REGISTER_COMPONENT(reactor)
1250