xref: /spdk/lib/event/reactor.c (revision d73077b84a71985da1db1c9847ea7c042189bae2)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/likely.h"
36 
37 #include "spdk_internal/event.h"
38 
39 #include "spdk/log.h"
40 #include "spdk/thread.h"
41 #include "spdk/env.h"
42 #include "spdk/util.h"
43 #include "spdk/string.h"
44 #include "spdk/fd_group.h"
45 
46 #ifdef __linux__
47 #include <sys/prctl.h>
48 #include <sys/eventfd.h>
49 #endif
50 
51 #ifdef __FreeBSD__
52 #include <pthread_np.h>
53 #endif
54 
/* Maximum number of events dequeued from a reactor's ring in one batch. */
#define SPDK_EVENT_BATCH_SIZE		8

/* Per-core reactor array, indexed by lcore; allocated in spdk_reactors_init(). */
static struct spdk_reactor *g_reactors;
/* Set of cores on which a reactor was started (see spdk_reactors_start()). */
static struct spdk_cpuset g_reactor_core_mask;
/* Lifecycle state of the reactor framework. */
static enum spdk_reactor_state	g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;

/* When true, reactors periodically log context-switch counts via getrusage(). */
static bool g_framework_context_switch_monitor_enabled = true;

/* Pool backing struct spdk_event allocations. */
static struct spdk_mempool *g_spdk_event_mempool = NULL;

/* All registered schedulers (see _spdk_scheduler_list_add()). */
TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);

/* Currently active scheduler. */
static struct spdk_scheduler *g_scheduler;
/* Scheduler to switch to once the in-progress scheduling round completes;
 * equals g_scheduler when no switch is pending. */
static struct spdk_scheduler *g_new_scheduler;
/* Reactor (master core) that drives scheduling rounds. */
static struct spdk_reactor *g_scheduling_reactor;
/* Interval between scheduling rounds; compared against TSC deltas in
 * reactor_run(). NOTE(review): confirm the intended unit with callers. */
static uint32_t g_scheduler_period;
/* Per-core metrics gathered for the scheduler; indexed by lcore. */
static struct spdk_scheduler_core_info *g_core_infos = NULL;

/* All registered governors (see _spdk_governor_list_add()). */
TAILQ_HEAD(, spdk_governor) g_governor_list
	= TAILQ_HEAD_INITIALIZER(g_governor_list);

static int _governor_get_capabilities(uint32_t lcore_id,
				      struct spdk_governor_capabilities *capabilities);

/* Active governor; starts as a no-op "default" until _spdk_governor_set(). */
static struct spdk_governor g_governor = {
	.name = "default",
	.get_core_capabilities = _governor_get_capabilities,
};

static int reactor_interrupt_init(struct spdk_reactor *reactor);
static void reactor_interrupt_fini(struct spdk_reactor *reactor);
87 
88 static struct spdk_scheduler *
89 _scheduler_find(char *name)
90 {
91 	struct spdk_scheduler *tmp;
92 
93 	TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
94 		if (strcmp(name, tmp->name) == 0) {
95 			return tmp;
96 		}
97 	}
98 
99 	return NULL;
100 }
101 
/*
 * Select the active scheduler by name.
 *
 * Before reactors exist the switch takes effect immediately.  If a
 * scheduling round is in progress, only g_new_scheduler is updated and
 * reactor_run() completes the switch (deinit of the old scheduler) once
 * the round finishes.
 *
 * Returns 0 on success, -ENOENT if no scheduler with that name is registered.
 */
int
_spdk_scheduler_set(char *name)
{
	struct spdk_scheduler *scheduler;

	scheduler = _scheduler_find(name);
	if (scheduler == NULL) {
		SPDK_ERRLOG("Requested scheduler is missing\n");
		return -ENOENT;
	}

	/* No reactors yet: nothing to deinit, and init is deferred too. */
	if (g_reactors == NULL || g_scheduling_reactor == NULL) {
		g_new_scheduler = scheduler;
		g_scheduler = scheduler;
		return 0;
	}

	if (g_scheduling_reactor->flags.is_scheduling) {
		/* Defer the switch; reactor_run() deinits the old scheduler later. */
		g_new_scheduler = scheduler;
	} else {
		if (g_scheduler->deinit != NULL) {
			g_scheduler->deinit(&g_governor);
		}

		g_new_scheduler = scheduler;
		g_scheduler = scheduler;
	}

	/* NOTE(review): init runs even when the switch was deferred above, so the
	 * new scheduler may be initialized while the old one is still active. */
	if (scheduler->init != NULL) {
		scheduler->init(&g_governor);
	}

	return 0;
}
136 
/* Set the interval between scheduler rounds; reactor_run() compares this
 * against the TSC ticks elapsed since the previous round.
 * NOTE(review): stored as uint32_t but compared to a uint64_t tick delta -
 * confirm the intended unit with callers. */
void
_spdk_scheduler_period_set(uint32_t period)
{
	g_scheduler_period = period;
}
142 
143 void
144 _spdk_scheduler_list_add(struct spdk_scheduler *scheduler)
145 {
146 	if (_scheduler_find(scheduler->name)) {
147 		SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
148 		assert(false);
149 		return;
150 	}
151 
152 	TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
153 }
154 
155 static void
156 reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
157 {
158 	reactor->lcore = lcore;
159 	reactor->flags.is_valid = true;
160 
161 	TAILQ_INIT(&reactor->threads);
162 	reactor->thread_count = 0;
163 
164 	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
165 	if (reactor->events == NULL) {
166 		SPDK_ERRLOG("Failed to allocate events ring\n");
167 		assert(false);
168 	}
169 
170 	if (spdk_interrupt_mode_is_enabled()) {
171 		reactor_interrupt_init(reactor);
172 	}
173 }
174 
175 struct spdk_reactor *
176 spdk_reactor_get(uint32_t lcore)
177 {
178 	struct spdk_reactor *reactor;
179 
180 	if (g_reactors == NULL) {
181 		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
182 		return NULL;
183 	}
184 
185 	reactor = &g_reactors[lcore];
186 
187 	if (reactor->flags.is_valid == false) {
188 		return NULL;
189 	}
190 
191 	return reactor;
192 }
193 
194 static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
195 static bool reactor_thread_op_supported(enum spdk_thread_op op);
196 
197 int
198 spdk_reactors_init(void)
199 {
200 	int rc;
201 	uint32_t i, last_core;
202 	char mempool_name[32];
203 
204 	rc = _spdk_scheduler_set("static");
205 	if (rc != 0) {
206 		SPDK_ERRLOG("Failed setting up scheduler\n");
207 		return rc;
208 	}
209 
210 	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
211 	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
212 			       262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
213 			       sizeof(struct spdk_event),
214 			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
215 			       SPDK_ENV_SOCKET_ID_ANY);
216 
217 	if (g_spdk_event_mempool == NULL) {
218 		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
219 		return -1;
220 	}
221 
222 	/* struct spdk_reactor must be aligned on 64 byte boundary */
223 	last_core = spdk_env_get_last_core();
224 	rc = posix_memalign((void **)&g_reactors, 64,
225 			    (last_core + 1) * sizeof(struct spdk_reactor));
226 	if (rc != 0) {
227 		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
228 			    last_core + 1);
229 		spdk_mempool_free(g_spdk_event_mempool);
230 		return -1;
231 	}
232 
233 	g_core_infos = calloc(last_core + 1, sizeof(*g_core_infos));
234 	if (g_core_infos == NULL) {
235 		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
236 		spdk_mempool_free(g_spdk_event_mempool);
237 		free(g_reactors);
238 		return -ENOMEM;
239 	}
240 
241 	memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor));
242 
243 	spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
244 				 sizeof(struct spdk_lw_thread));
245 
246 	SPDK_ENV_FOREACH_CORE(i) {
247 		reactor_construct(&g_reactors[i], i);
248 	}
249 
250 	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
251 
252 	return 0;
253 }
254 
255 void
256 spdk_reactors_fini(void)
257 {
258 	uint32_t i;
259 	struct spdk_reactor *reactor;
260 
261 	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
262 		return;
263 	}
264 
265 	if (g_scheduler->deinit != NULL) {
266 		g_scheduler->deinit(&g_governor);
267 	}
268 
269 	spdk_thread_lib_fini();
270 
271 	SPDK_ENV_FOREACH_CORE(i) {
272 		reactor = spdk_reactor_get(i);
273 		assert(reactor != NULL);
274 		assert(reactor->thread_count == 0);
275 		if (reactor->events != NULL) {
276 			spdk_ring_free(reactor->events);
277 		}
278 
279 		if (reactor->interrupt_mode) {
280 			reactor_interrupt_fini(reactor);
281 		}
282 
283 		if (g_core_infos != NULL) {
284 			free(g_core_infos[i].threads);
285 		}
286 	}
287 
288 	spdk_mempool_free(g_spdk_event_mempool);
289 
290 	free(g_reactors);
291 	g_reactors = NULL;
292 	free(g_core_infos);
293 	g_core_infos = NULL;
294 }
295 
296 struct spdk_event *
297 spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
298 {
299 	struct spdk_event *event = NULL;
300 	struct spdk_reactor *reactor = spdk_reactor_get(lcore);
301 
302 	if (!reactor) {
303 		assert(false);
304 		return NULL;
305 	}
306 
307 	event = spdk_mempool_get(g_spdk_event_mempool);
308 	if (event == NULL) {
309 		assert(false);
310 		return NULL;
311 	}
312 
313 	event->lcore = lcore;
314 	event->fn = fn;
315 	event->arg1 = arg1;
316 	event->arg2 = arg2;
317 
318 	return event;
319 }
320 
321 void
322 spdk_event_call(struct spdk_event *event)
323 {
324 	int rc;
325 	struct spdk_reactor *reactor;
326 
327 	reactor = spdk_reactor_get(event->lcore);
328 
329 	assert(reactor != NULL);
330 	assert(reactor->events != NULL);
331 
332 	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
333 	if (rc != 1) {
334 		assert(false);
335 	}
336 
337 	if (reactor->interrupt_mode) {
338 		uint64_t notify = 1;
339 
340 		rc = write(reactor->events_fd, &notify, sizeof(notify));
341 		if (rc < 0) {
342 			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
343 		}
344 	}
345 }
346 
/*
 * Dequeue up to SPDK_EVENT_BATCH_SIZE events from this reactor's ring and
 * execute them, using the reactor's first SPDK thread as the thread context
 * if one exists.  Returns the number of events executed.
 *
 * NOTE(review): the return type is uint32_t, yet the interrupt-mode error
 * paths return -errno, which wraps to a large positive value.  Callers in
 * this file either ignore the return or use the function as an fd callback,
 * but the signedness mismatch should be fixed (e.g. return int).
 */
static inline uint32_t
event_queue_run_batch(struct spdk_reactor *reactor)
{
	unsigned count, i;
	void *events[SPDK_EVENT_BATCH_SIZE];
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(events, 0, sizeof(events));
#endif

	if (reactor->interrupt_mode) {
		uint64_t notify = 1;
		int rc;

		/* There may be race between event_acknowledge and another producer's event_notify,
		 * so event_acknowledge should be applied ahead. And then check for self's event_notify.
		 * This can avoid event notification missing.
		 */
		rc = read(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno));
			/* NOTE(review): errno may already have been clobbered by the
			 * logging call above; it should be saved before logging. */
			return -errno;
		}

		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);

		if (spdk_ring_count(reactor->events) != 0) {
			/* Trigger new notification if there are still events in event-queue waiting for processing. */
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
				return -errno;
			}
		}
	} else {
		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
	}

	if (count == 0) {
		return 0;
	}

	/* Execute the events. There are still some remaining events
	 * that must occur on an SPDK thread. To accomodate those, try to
	 * run them on the first thread in the list, if it exists. */
	lw_thread = TAILQ_FIRST(&reactor->threads);
	if (lw_thread) {
		thread = spdk_thread_get_from_ctx(lw_thread);
	} else {
		thread = NULL;
	}

	spdk_set_thread(thread);

	for (i = 0; i < count; i++) {
		struct spdk_event *event = events[i];

		assert(event != NULL);
		event->fn(event->arg1, event->arg2);
	}

	/* Clear the thread context again before returning to the reactor loop. */
	spdk_set_thread(NULL);

	/* Return all executed events to the pool in one bulk operation. */
	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);

	return count;
}
421 
422 /* 1s */
423 #define CONTEXT_SWITCH_MONITOR_PERIOD 1000000
424 
425 static int
426 get_rusage(struct spdk_reactor *reactor)
427 {
428 	struct rusage		rusage;
429 
430 	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
431 		return -1;
432 	}
433 
434 	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
435 		SPDK_INFOLOG(reactor,
436 			     "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
437 			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
438 			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
439 	}
440 	reactor->rusage = rusage;
441 
442 	return -1;
443 }
444 
/* Enable or disable periodic context-switch logging (see get_rusage()). */
void
spdk_framework_enable_context_switch_monitor(bool enable)
{
	/* This global is being read by multiple threads, so this isn't
	 * strictly thread safe. However, we're toggling between true and
	 * false here, and if a thread sees the value update later than it
	 * should, it's no big deal. */
	g_framework_context_switch_monitor_enabled = enable;
}
454 
/* Query whether context-switch monitoring is currently enabled. */
bool
spdk_framework_context_switch_monitor_enabled(void)
{
	return g_framework_context_switch_monitor_enabled;
}
460 
/* Set the OS-visible name of the calling thread (platform-specific API). */
static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
#error missing platform support for thread name
#endif
}
472 
/* Record lw_thread's current core and snapshot its thread stats into
 * lw_thread->current_stats.  Sets the thread context first - presumably
 * required by spdk_thread_get_stats(); confirm against the thread library. */
static void
_init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);

	lw_thread->lcore = reactor->lcore;

	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->current_stats);
}
483 
484 static void
485 _threads_reschedule(struct spdk_scheduler_core_info *cores_info)
486 {
487 	struct spdk_scheduler_core_info *core;
488 	struct spdk_lw_thread *lw_thread;
489 	uint32_t i, j;
490 
491 	SPDK_ENV_FOREACH_CORE(i) {
492 		core = &cores_info[i];
493 		for (j = 0; j < core->threads_count; j++) {
494 			lw_thread = core->threads[j];
495 			if (lw_thread->lcore != lw_thread->new_lcore) {
496 				_spdk_lw_thread_set_core(lw_thread, lw_thread->new_lcore);
497 			}
498 		}
499 	}
500 }
501 
502 static void
503 _reactors_scheduler_fini(void *arg1, void *arg2)
504 {
505 	struct spdk_reactor *reactor;
506 	uint32_t last_core;
507 	uint32_t i;
508 
509 	if (g_reactor_state == SPDK_REACTOR_STATE_RUNNING) {
510 		last_core = spdk_env_get_last_core();
511 		g_scheduler->balance(g_core_infos, last_core + 1, &g_governor);
512 
513 		/* Reschedule based on the balancing output */
514 		_threads_reschedule(g_core_infos);
515 
516 		SPDK_ENV_FOREACH_CORE(i) {
517 			reactor = spdk_reactor_get(i);
518 			reactor->flags.is_scheduling = false;
519 		}
520 	}
521 }
522 
523 static void
524 _reactors_scheduler_cancel(void *arg1, void *arg2)
525 {
526 	struct spdk_reactor *reactor;
527 	uint32_t i;
528 
529 	SPDK_ENV_FOREACH_CORE(i) {
530 		reactor = spdk_reactor_get(i);
531 		reactor->flags.is_scheduling = false;
532 	}
533 }
534 
/* Phase 1 of thread scheduling is to gather metrics on the existing threads.
 *
 * This handler runs on each core in turn: it fills g_core_infos[lcore] with
 * the reactor's idle/busy TSC counters and its current thread list, then
 * event-chains itself to the next core.  When the chain wraps back to the
 * scheduling reactor it posts _reactors_scheduler_fini (phase 2) instead. */
static void
_reactors_scheduler_gather_metrics(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core_info;
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	struct spdk_event *evt;
	uint32_t next_core;
	uint32_t i;

	reactor = spdk_reactor_get(spdk_env_get_current_core());
	reactor->flags.is_scheduling = true;
	core_info = &g_core_infos[reactor->lcore];
	core_info->lcore = reactor->lcore;
	core_info->core_idle_tsc = reactor->idle_tsc;
	core_info->core_busy_tsc = reactor->busy_tsc;

	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);

	/* Drop the thread array from the previous round before rebuilding it. */
	free(core_info->threads);
	core_info->threads = NULL;

	i = 0;

	/* First pass: snapshot per-thread stats and count the threads. */
	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		_init_thread_stats(reactor, lw_thread);
		i++;
	}

	core_info->threads_count = i;

	if (core_info->threads_count > 0) {
		core_info->threads = calloc(core_info->threads_count, sizeof(struct spdk_lw_thread *));
		if (core_info->threads == NULL) {
			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);

			/* Cancel this round of schedule work */
			evt = spdk_event_allocate(g_scheduling_reactor->lcore, _reactors_scheduler_cancel, NULL, NULL);
			spdk_event_call(evt);
			return;
		}

		/* Second pass: record the thread pointers for the balancer. */
		i = 0;
		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
			core_info->threads[i] = lw_thread;
			i++;
		}
	}

	next_core = spdk_env_get_next_core(reactor->lcore);
	if (next_core == UINT32_MAX) {
		next_core = spdk_env_get_first_core();
	}

	/* If we've looped back around to the scheduler thread, move to the next phase */
	if (next_core == g_scheduling_reactor->lcore) {
		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
		evt = spdk_event_allocate(next_core, _reactors_scheduler_fini, NULL, NULL);
		spdk_event_call(evt);
		return;
	}

	/* Chain to the next core so it gathers its own metrics. */
	evt = spdk_event_allocate(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
	spdk_event_call(evt);
}
601 
602 static int _reactor_schedule_thread(struct spdk_thread *thread);
603 static uint64_t g_rusage_period;
604 
605 static bool
606 reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
607 {
608 	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
609 	int efd;
610 
611 	if (spdk_unlikely(lw_thread->resched)) {
612 		lw_thread->resched = false;
613 		TAILQ_REMOVE(&reactor->threads, lw_thread, link);
614 		assert(reactor->thread_count > 0);
615 		reactor->thread_count--;
616 
617 		if (reactor->interrupt_mode) {
618 			efd = spdk_thread_get_interrupt_fd(thread);
619 			spdk_fd_group_remove(reactor->fgrp, efd);
620 		}
621 		_reactor_schedule_thread(thread);
622 		return true;
623 	}
624 
625 	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
626 			  spdk_thread_is_idle(thread))) {
627 		if (reactor->flags.is_scheduling == false) {
628 			TAILQ_REMOVE(&reactor->threads, lw_thread, link);
629 			assert(reactor->thread_count > 0);
630 			reactor->thread_count--;
631 
632 			if (reactor->interrupt_mode) {
633 				efd = spdk_thread_get_interrupt_fd(thread);
634 				spdk_fd_group_remove(reactor->fgrp, efd);
635 			}
636 			spdk_thread_destroy(thread);
637 			return true;
638 		}
639 	}
640 
641 	return false;
642 }
643 
/* Interrupt-mode loop body: block on the reactor's fd group until any
 * registered fd fires; events, thread polls, and resched requests are all
 * handled by the callbacks registered in reactor_interrupt_init(). */
static void
reactor_interrupt_run(struct spdk_reactor *reactor)
{
	int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */

	spdk_fd_group_wait(reactor->fgrp, block_timeout);

	/* TODO: add tsc records and g_framework_context_switch_monitor_enabled */
}
653 
654 static void
655 _reactor_run(struct spdk_reactor *reactor)
656 {
657 	struct spdk_thread	*thread;
658 	struct spdk_lw_thread	*lw_thread, *tmp;
659 	uint64_t		now;
660 	int			rc;
661 
662 	event_queue_run_batch(reactor);
663 
664 	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
665 		thread = spdk_thread_get_from_ctx(lw_thread);
666 		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);
667 
668 		now = spdk_thread_get_last_tsc(thread);
669 		if (rc == 0) {
670 			reactor->idle_tsc += now - reactor->tsc_last;
671 		} else if (rc > 0) {
672 			reactor->busy_tsc += now - reactor->tsc_last;
673 		}
674 		reactor->tsc_last = now;
675 
676 		reactor_post_process_lw_thread(reactor, lw_thread);
677 	}
678 
679 	if (g_framework_context_switch_monitor_enabled) {
680 		if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
681 			get_rusage(reactor);
682 			reactor->last_rusage = reactor->tsc_last;
683 		}
684 	}
685 }
686 
/*
 * Main loop of a reactor, pinned to its core.  Runs until the framework
 * leaves SPDK_REACTOR_STATE_RUNNING, then exits and drains all of its
 * lightweight threads before returning.
 */
static int
reactor_run(void *arg)
{
	struct spdk_reactor	*reactor = arg;
	struct spdk_thread	*thread;
	struct spdk_lw_thread	*lw_thread, *tmp;
	char			thread_name[32];
	uint64_t		last_sched = 0;

	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);

	/* Rename the POSIX thread because the reactor is tied to the POSIX
	 * thread in the SPDK event library.
	 */
	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
	_set_thread_name(thread_name);

	reactor->tsc_last = spdk_get_ticks();

	while (1) {
		if (spdk_unlikely(reactor->interrupt_mode)) {
			reactor_interrupt_run(reactor);
		} else {
			_reactor_run(reactor);
		}

		/* Only the scheduling reactor starts a scheduling round, and only
		 * after the period elapsed and the previous round finished. */
		if (spdk_unlikely((reactor->tsc_last - last_sched) > g_scheduler_period &&
				  reactor == g_scheduling_reactor &&
				  !reactor->flags.is_scheduling)) {
			/* Complete a switch deferred by _spdk_scheduler_set(). */
			if (spdk_unlikely(g_scheduler != g_new_scheduler)) {
				if (g_scheduler->deinit != NULL) {
					g_scheduler->deinit(&g_governor);
				}
				g_scheduler = g_new_scheduler;
			}

			if (spdk_unlikely(g_scheduler->balance != NULL)) {
				last_sched = reactor->tsc_last;
				_reactors_scheduler_gather_metrics(NULL, NULL);
			}
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}
	}

	/* Shutdown: request exit from every thread still on this reactor... */
	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		spdk_set_thread(thread);
		spdk_thread_exit(thread);
	}

	/* ...then keep polling until each one reports exited and is destroyed. */
	while (!TAILQ_EMPTY(&reactor->threads)) {
		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			spdk_set_thread(thread);
			if (spdk_thread_is_exited(thread)) {
				TAILQ_REMOVE(&reactor->threads, lw_thread, link);
				assert(reactor->thread_count > 0);
				reactor->thread_count--;
				if (reactor->interrupt_mode) {
					int efd = spdk_thread_get_interrupt_fd(thread);

					spdk_fd_group_remove(reactor->fgrp, efd);
				}
				spdk_thread_destroy(thread);
			} else {
				spdk_thread_poll(thread, 0, 0);
			}
		}
	}

	return 0;
}
762 
/* Parse mask into cpumask and intersect it with the cores the app actually
 * runs on.  Returns 0 on success or the negative parse error. */
int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int rc = spdk_cpuset_parse(cpumask, mask);

	if (rc < 0) {
		return rc;
	}

	spdk_cpuset_and(cpumask, spdk_app_get_core_mask());

	return 0;
}
779 
/* Return the set of cores on which reactors were started. */
const struct spdk_cpuset *
spdk_app_get_core_mask(void)
{
	return &g_reactor_core_mask;
}
785 
/*
 * Launch a pinned reactor on every core in the environment's core set and
 * run the current (master) core's reactor in the calling thread.  Blocks
 * until all reactors stop, then marks the framework SHUTDOWN.
 */
void
spdk_reactors_start(void)
{
	struct spdk_reactor *reactor;
	struct spdk_cpuset tmp_cpumask = {};
	uint32_t i, current_core;
	int rc;
	char thread_name[32];

	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;

	current_core = spdk_env_get_current_core();
	SPDK_ENV_FOREACH_CORE(i) {
		if (i != current_core) {
			reactor = spdk_reactor_get(i);
			if (reactor == NULL) {
				continue;
			}

			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
			if (rc < 0) {
				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
				assert(false);
				return;
			}

			/* For now, for each reactor spawn one thread. */
			snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);

			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);

			/* NOTE(review): spdk_thread_create() return value is unchecked -
			 * a failure here is silently ignored. */
			spdk_thread_create(thread_name, &tmp_cpumask);
		}
		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
	}

	/* Start the master reactor */
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	g_scheduling_reactor = reactor;
	reactor_run(reactor);

	spdk_env_thread_wait_all();

	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
}
834 
835 void
836 spdk_reactors_stop(void *arg1)
837 {
838 	uint32_t i;
839 	int rc;
840 	struct spdk_reactor *reactor;
841 	uint64_t notify = 1;
842 
843 	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
844 
845 	if (spdk_interrupt_mode_is_enabled()) {
846 		SPDK_ENV_FOREACH_CORE(i) {
847 			reactor = spdk_reactor_get(i);
848 
849 			rc = write(reactor->events_fd, &notify, sizeof(notify));
850 			if (rc < 0) {
851 				SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno));
852 				continue;
853 			}
854 		}
855 	}
856 }
857 
858 static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
859 static uint32_t g_next_core = UINT32_MAX;
860 
/* fd_group callback for a thread's interrupt fd: poll that SPDK thread once
 * and return spdk_thread_poll()'s result. */
static int
thread_process_interrupts(void *arg)
{
	struct spdk_thread *thread = arg;

	return spdk_thread_poll(thread, 0, 0);
}
868 
869 static void
870 _schedule_thread(void *arg1, void *arg2)
871 {
872 	struct spdk_lw_thread *lw_thread = arg1;
873 	struct spdk_reactor *reactor;
874 	uint32_t current_core;
875 	int efd;
876 
877 	current_core = spdk_env_get_current_core();
878 
879 	reactor = spdk_reactor_get(current_core);
880 	assert(reactor != NULL);
881 
882 	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
883 	reactor->thread_count++;
884 
885 	if (reactor->interrupt_mode) {
886 		int rc;
887 		struct spdk_thread *thread;
888 
889 		thread = spdk_thread_get_from_ctx(lw_thread);
890 		efd = spdk_thread_get_interrupt_fd(thread);
891 		rc = spdk_fd_group_add(reactor->fgrp, efd, thread_process_interrupts, thread);
892 		if (rc < 0) {
893 			SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
894 		}
895 	}
896 }
897 
/*
 * Pick a destination core for thread and post a _schedule_thread event there.
 *
 * When the thread has no core assigned (SPDK_ENV_LCORE_ID_ANY), cores are
 * tried round-robin via g_next_core under g_scheduler_mtx, preferring one
 * present in the thread's cpumask.  NOTE(review): if no core in the mask is
 * found after one full pass, the last core tried is used even though it is
 * outside the mask.
 *
 * Returns 0 on success, -1 if no event could be allocated.
 */
static int
_reactor_schedule_thread(struct spdk_thread *thread)
{
	uint32_t core;
	struct spdk_lw_thread *lw_thread;
	struct spdk_event *evt = NULL;
	struct spdk_cpuset *cpumask;
	uint32_t i;

	cpumask = spdk_thread_get_cpumask(thread);

	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);
	core = lw_thread->lcore;
	/* Reset the per-thread context; _schedule_thread repopulates it. */
	memset(lw_thread, 0, sizeof(*lw_thread));

	pthread_mutex_lock(&g_scheduler_mtx);
	if (core == SPDK_ENV_LCORE_ID_ANY) {
		for (i = 0; i < spdk_env_get_core_count(); i++) {
			if (g_next_core > spdk_env_get_last_core()) {
				g_next_core = spdk_env_get_first_core();
			}
			core = g_next_core;
			g_next_core = spdk_env_get_next_core(g_next_core);

			if (spdk_cpuset_get_cpu(cpumask, core)) {
				break;
			}
		}
	}

	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);

	pthread_mutex_unlock(&g_scheduler_mtx);

	assert(evt != NULL);
	if (evt == NULL) {
		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
		return -1;
	}

	lw_thread->tsc_start = spdk_get_ticks();

	spdk_event_call(evt);

	return 0;
}
945 
946 static void
947 _reactor_request_thread_reschedule(struct spdk_thread *thread)
948 {
949 	struct spdk_lw_thread *lw_thread;
950 	struct spdk_reactor *reactor;
951 	uint32_t current_core;
952 
953 	assert(thread == spdk_get_thread());
954 
955 	lw_thread = spdk_thread_get_ctx(thread);
956 
957 	_spdk_lw_thread_set_core(lw_thread, SPDK_ENV_LCORE_ID_ANY);
958 
959 	current_core = spdk_env_get_current_core();
960 	reactor = spdk_reactor_get(current_core);
961 	assert(reactor != NULL);
962 	if (reactor->interrupt_mode) {
963 		uint64_t notify = 1;
964 
965 		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
966 			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
967 		}
968 	}
969 }
970 
971 static int
972 reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
973 {
974 	struct spdk_lw_thread *lw_thread;
975 
976 	switch (op) {
977 	case SPDK_THREAD_OP_NEW:
978 		lw_thread = spdk_thread_get_ctx(thread);
979 		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
980 		return _reactor_schedule_thread(thread);
981 	case SPDK_THREAD_OP_RESCHED:
982 		_reactor_request_thread_reschedule(thread);
983 		return 0;
984 	default:
985 		return -ENOTSUP;
986 	}
987 }
988 
989 static bool
990 reactor_thread_op_supported(enum spdk_thread_op op)
991 {
992 	switch (op) {
993 	case SPDK_THREAD_OP_NEW:
994 	case SPDK_THREAD_OP_RESCHED:
995 		return true;
996 	default:
997 		return false;
998 	}
999 }
1000 
/* Context for spdk_for_each_reactor(): visits every core in order, running
 * fn on each, then runs cpl back on the originating core. */
struct call_reactor {
	uint32_t cur_core;	/* core currently being visited */
	spdk_event_fn fn;	/* function invoked on every reactor */
	void *arg1;
	void *arg2;

	uint32_t orig_core;	/* core that started the iteration */
	spdk_event_fn cpl;	/* completion invoked on orig_core at the end */
};
1010 
1011 static void
1012 on_reactor(void *arg1, void *arg2)
1013 {
1014 	struct call_reactor *cr = arg1;
1015 	struct spdk_event *evt;
1016 
1017 	cr->fn(cr->arg1, cr->arg2);
1018 
1019 	cr->cur_core = spdk_env_get_next_core(cr->cur_core);
1020 
1021 	if (cr->cur_core > spdk_env_get_last_core()) {
1022 		SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");
1023 
1024 		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
1025 		free(cr);
1026 	} else {
1027 		SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %d\n",
1028 			      cr->cur_core);
1029 
1030 		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
1031 	}
1032 	assert(evt != NULL);
1033 	spdk_event_call(evt);
1034 }
1035 
1036 void
1037 spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
1038 {
1039 	struct call_reactor *cr;
1040 	struct spdk_event *evt;
1041 
1042 	cr = calloc(1, sizeof(*cr));
1043 	if (!cr) {
1044 		SPDK_ERRLOG("Unable to perform reactor iteration\n");
1045 		cpl(arg1, arg2);
1046 		return;
1047 	}
1048 
1049 	cr->fn = fn;
1050 	cr->arg1 = arg1;
1051 	cr->arg2 = arg2;
1052 	cr->cpl = cpl;
1053 	cr->orig_core = spdk_env_get_current_core();
1054 	cr->cur_core = spdk_env_get_first_core();
1055 
1056 	SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %d\n", cr->orig_core);
1057 
1058 	evt = spdk_event_allocate(cr->cur_core, on_reactor, cr, NULL);
1059 	assert(evt != NULL);
1060 
1061 	spdk_event_call(evt);
1062 }
1063 
1064 #ifdef __linux__
1065 static int
1066 reactor_schedule_thread_event(void *arg)
1067 {
1068 	struct spdk_reactor *reactor = arg;
1069 	struct spdk_lw_thread *lw_thread, *tmp;
1070 	uint32_t count = 0;
1071 	uint64_t notify = 1;
1072 
1073 	assert(reactor->interrupt_mode);
1074 
1075 	if (read(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
1076 		SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno));
1077 		return -errno;
1078 	}
1079 
1080 	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
1081 		count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
1082 	}
1083 
1084 	return count;
1085 }
1086 
1087 static int
1088 reactor_interrupt_init(struct spdk_reactor *reactor)
1089 {
1090 	int rc;
1091 
1092 	rc = spdk_fd_group_create(&reactor->fgrp);
1093 	if (rc != 0) {
1094 		return rc;
1095 	}
1096 
1097 	reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1098 	if (reactor->resched_fd < 0) {
1099 		rc = -EBADF;
1100 		goto err;
1101 	}
1102 
1103 	rc = spdk_fd_group_add(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event,
1104 			       reactor);
1105 	if (rc) {
1106 		close(reactor->resched_fd);
1107 		goto err;
1108 	}
1109 
1110 	reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1111 	if (reactor->events_fd < 0) {
1112 		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
1113 		close(reactor->resched_fd);
1114 
1115 		rc = -EBADF;
1116 		goto err;
1117 	}
1118 
1119 	rc = spdk_fd_group_add(reactor->fgrp, reactor->events_fd,
1120 			       (spdk_fd_fn)event_queue_run_batch, reactor);
1121 	if (rc) {
1122 		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
1123 		close(reactor->resched_fd);
1124 		close(reactor->events_fd);
1125 		goto err;
1126 	}
1127 
1128 	reactor->interrupt_mode = true;
1129 	return 0;
1130 
1131 err:
1132 	spdk_fd_group_destroy(reactor->fgrp);
1133 	return rc;
1134 }
1135 #else
/* Non-Linux stub: interrupt mode relies on Linux eventfd support. */
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	return -ENOTSUP;
}
1141 #endif
1142 
1143 static void
1144 reactor_interrupt_fini(struct spdk_reactor *reactor)
1145 {
1146 	struct spdk_fd_group *fgrp = reactor->fgrp;
1147 
1148 	if (!fgrp) {
1149 		return;
1150 	}
1151 
1152 	spdk_fd_group_remove(fgrp, reactor->events_fd);
1153 	spdk_fd_group_remove(fgrp, reactor->resched_fd);
1154 
1155 	close(reactor->events_fd);
1156 	close(reactor->resched_fd);
1157 
1158 	spdk_fd_group_destroy(fgrp);
1159 	reactor->fgrp = NULL;
1160 }
1161 
/* Request that the lightweight thread move to lcore; the owning reactor
 * acts on the resched flag in reactor_post_process_lw_thread(). */
void
_spdk_lw_thread_set_core(struct spdk_lw_thread *thread, uint32_t lcore)
{
	assert(thread != NULL);
	thread->lcore = lcore;
	thread->resched = true;
}
1169 
/* Copy out the stats snapshot taken by _init_thread_stats(). */
void
_spdk_lw_thread_get_current_stats(struct spdk_lw_thread *thread, struct spdk_thread_stats *stats)
{
	assert(thread != NULL);
	*stats = thread->current_stats;
}
1176 
1177 static int
1178 _governor_get_capabilities(uint32_t lcore_id, struct spdk_governor_capabilities *capabilities)
1179 {
1180 	capabilities->freq_change = false;
1181 	capabilities->freq_getset = false;
1182 	capabilities->freq_up = false;
1183 	capabilities->freq_down = false;
1184 	capabilities->freq_max = false;
1185 	capabilities->freq_min = false;
1186 	capabilities->turbo_set = false;
1187 	capabilities->priority = false;
1188 	capabilities->turbo_available = false;
1189 
1190 	return 0;
1191 }
1192 
1193 static struct spdk_governor *
1194 _governor_find(char *name)
1195 {
1196 	struct spdk_governor *governor, *tmp;
1197 
1198 	TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
1199 		if (strcmp(name, governor->name) == 0) {
1200 			return governor;
1201 		}
1202 	}
1203 
1204 	return NULL;
1205 }
1206 
/*
 * Activate the governor registered under name: copy it into g_governor and
 * run its init hooks (global first, then per-core).
 *
 * Returns 0 on success, -EINVAL if no such governor is registered, or the
 * error from an init hook.  NOTE(review): g_governor is overwritten before
 * init runs, so a failing init leaves a partially initialized governor
 * installed.
 */
int
_spdk_governor_set(char *name)
{
	struct spdk_governor *governor;
	uint32_t i;
	int rc;

	governor = _governor_find(name);
	if (governor == NULL) {
		return -EINVAL;
	}

	g_governor = *governor;

	if (g_governor.init) {
		rc = g_governor.init();
		if (rc != 0) {
			return rc;
		}
	}

	SPDK_ENV_FOREACH_CORE(i) {
		if (g_governor.init_core) {
			rc = g_governor.init_core(i);
			if (rc != 0) {
				return rc;
			}
		}
	}

	return 0;
}
1239 
1240 void
1241 _spdk_governor_list_add(struct spdk_governor *governor)
1242 {
1243 	if (_governor_find(governor->name)) {
1244 		SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name);
1245 		assert(false);
1246 		return;
1247 	}
1248 
1249 	TAILQ_INSERT_TAIL(&g_governor_list, governor, link);
1250 }
1251 
1252 SPDK_LOG_REGISTER_COMPONENT(reactor)
1253