xref: /spdk/lib/event/reactor.c (revision 0be5557cad778ce88a8836b8595f94352eec0600)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/likely.h"
36 
37 #include "spdk_internal/event.h"
38 #include "spdk_internal/log.h"
39 #include "spdk_internal/thread.h"
40 
41 #include "spdk/log.h"
42 #include "spdk/thread.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 
46 #ifdef __linux__
47 #include <sys/prctl.h>
48 #endif
49 
50 #ifdef __FreeBSD__
51 #include <pthread_np.h>
52 #endif
53 
54 #define SPDK_EVENT_BATCH_SIZE		8
55 
56 static struct spdk_reactor *g_reactors;
57 static struct spdk_cpuset g_reactor_core_mask;
58 static enum spdk_reactor_state	g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;
59 
60 static bool g_framework_context_switch_monitor_enabled = true;
61 
62 static struct spdk_mempool *g_spdk_event_mempool = NULL;
63 
64 static void
65 reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
66 {
67 	reactor->lcore = lcore;
68 	reactor->flags.is_valid = true;
69 
70 	TAILQ_INIT(&reactor->threads);
71 	reactor->thread_count = 0;
72 
73 	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
74 	assert(reactor->events != NULL);
75 }
76 
77 struct spdk_reactor *
78 spdk_reactor_get(uint32_t lcore)
79 {
80 	struct spdk_reactor *reactor;
81 
82 	if (g_reactors == NULL) {
83 		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
84 		return NULL;
85 	}
86 
87 	reactor = &g_reactors[lcore];
88 
89 	if (reactor->flags.is_valid == false) {
90 		return NULL;
91 	}
92 
93 	return reactor;
94 }
95 
96 static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
97 static bool reactor_thread_op_supported(enum spdk_thread_op op);
98 
99 int
100 spdk_reactors_init(void)
101 {
102 	int rc;
103 	uint32_t i, last_core;
104 	char mempool_name[32];
105 
106 	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
107 	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
108 			       262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
109 			       sizeof(struct spdk_event),
110 			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
111 			       SPDK_ENV_SOCKET_ID_ANY);
112 
113 	if (g_spdk_event_mempool == NULL) {
114 		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
115 		return -1;
116 	}
117 
118 	/* struct spdk_reactor must be aligned on 64 byte boundary */
119 	last_core = spdk_env_get_last_core();
120 	rc = posix_memalign((void **)&g_reactors, 64,
121 			    (last_core + 1) * sizeof(struct spdk_reactor));
122 	if (rc != 0) {
123 		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
124 			    last_core + 1);
125 		spdk_mempool_free(g_spdk_event_mempool);
126 		return -1;
127 	}
128 
129 	memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor));
130 
131 	spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
132 				 sizeof(struct spdk_lw_thread));
133 
134 	SPDK_ENV_FOREACH_CORE(i) {
135 		reactor_construct(&g_reactors[i], i);
136 	}
137 
138 	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
139 
140 	return 0;
141 }
142 
143 void
144 spdk_reactors_fini(void)
145 {
146 	uint32_t i;
147 	struct spdk_reactor *reactor;
148 
149 	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
150 		return;
151 	}
152 
153 	spdk_thread_lib_fini();
154 
155 	SPDK_ENV_FOREACH_CORE(i) {
156 		reactor = spdk_reactor_get(i);
157 		assert(reactor != NULL);
158 		assert(reactor->thread_count == 0);
159 		if (reactor->events != NULL) {
160 			spdk_ring_free(reactor->events);
161 		}
162 	}
163 
164 	spdk_mempool_free(g_spdk_event_mempool);
165 
166 	free(g_reactors);
167 	g_reactors = NULL;
168 }
169 
170 struct spdk_event *
171 spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
172 {
173 	struct spdk_event *event = NULL;
174 	struct spdk_reactor *reactor = spdk_reactor_get(lcore);
175 
176 	if (!reactor) {
177 		assert(false);
178 		return NULL;
179 	}
180 
181 	event = spdk_mempool_get(g_spdk_event_mempool);
182 	if (event == NULL) {
183 		assert(false);
184 		return NULL;
185 	}
186 
187 	event->lcore = lcore;
188 	event->fn = fn;
189 	event->arg1 = arg1;
190 	event->arg2 = arg2;
191 
192 	return event;
193 }
194 
195 void
196 spdk_event_call(struct spdk_event *event)
197 {
198 	int rc;
199 	struct spdk_reactor *reactor;
200 
201 	reactor = spdk_reactor_get(event->lcore);
202 
203 	assert(reactor != NULL);
204 	assert(reactor->events != NULL);
205 
206 	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
207 	if (rc != 1) {
208 		assert(false);
209 	}
210 }
211 
212 static inline uint32_t
213 _spdk_event_queue_run_batch(struct spdk_reactor *reactor)
214 {
215 	unsigned count, i;
216 	void *events[SPDK_EVENT_BATCH_SIZE];
217 	struct spdk_thread *thread;
218 	struct spdk_lw_thread *lw_thread;
219 
220 #ifdef DEBUG
221 	/*
222 	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
223 	 * so we will never actually read uninitialized data from events, but just to be sure
224 	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
225 	 */
226 	memset(events, 0, sizeof(events));
227 #endif
228 
229 	count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
230 	if (count == 0) {
231 		return 0;
232 	}
233 
234 	/* Execute the events. There are still some remaining events
235 	 * that must occur on an SPDK thread. To accomodate those, try to
236 	 * run them on the first thread in the list, if it exists. */
237 	lw_thread = TAILQ_FIRST(&reactor->threads);
238 	if (lw_thread) {
239 		thread = spdk_thread_get_from_ctx(lw_thread);
240 	} else {
241 		thread = NULL;
242 	}
243 
244 	spdk_set_thread(thread);
245 
246 	for (i = 0; i < count; i++) {
247 		struct spdk_event *event = events[i];
248 
249 		assert(event != NULL);
250 		event->fn(event->arg1, event->arg2);
251 	}
252 
253 	spdk_set_thread(NULL);
254 
255 	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);
256 
257 	return count;
258 }
259 
/* Interval between context switch samples, in microseconds (1 second). */
#define CONTEXT_SWITCH_MONITOR_PERIOD 1000000
262 
263 static int
264 get_rusage(struct spdk_reactor *reactor)
265 {
266 	struct rusage		rusage;
267 
268 	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
269 		return -1;
270 	}
271 
272 	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
273 		SPDK_INFOLOG(SPDK_LOG_REACTOR,
274 			     "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
275 			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
276 			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
277 	}
278 	reactor->rusage = rusage;
279 
280 	return -1;
281 }
282 
283 void
284 spdk_framework_enable_context_switch_monitor(bool enable)
285 {
286 	/* This global is being read by multiple threads, so this isn't
287 	 * strictly thread safe. However, we're toggling between true and
288 	 * false here, and if a thread sees the value update later than it
289 	 * should, it's no big deal. */
290 	g_framework_context_switch_monitor_enabled = enable;
291 }
292 
293 bool
294 spdk_framework_context_switch_monitor_enabled(void)
295 {
296 	return g_framework_context_switch_monitor_enabled;
297 }
298 
/*
 * Give the calling POSIX thread a name, using prctl(PR_SET_NAME) on Linux
 * and pthread_set_name_np() on FreeBSD. Any other platform is rejected at
 * compile time.
 */
static void
_set_thread_name(const char *thread_name)
{
#if !defined(__linux__) && !defined(__FreeBSD__)
#error missing platform support for thread name
#endif

#ifdef __linux__
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#else
	pthread_set_name_np(pthread_self(), thread_name);
#endif
}
310 
/* Defined further down; reactor_run() needs it to reschedule threads. */
static int _reactor_schedule_thread(struct spdk_thread *thread);
/* Context switch sampling period in TSC ticks; set by spdk_reactors_start(). */
static uint64_t g_rusage_period;
313 
314 static void
315 reactor_run(struct spdk_reactor *reactor)
316 {
317 	struct spdk_thread	*thread;
318 	struct spdk_lw_thread	*lw_thread, *tmp;
319 	uint64_t		now;
320 	int			rc;
321 
322 	_spdk_event_queue_run_batch(reactor);
323 
324 	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
325 		thread = spdk_thread_get_from_ctx(lw_thread);
326 		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);
327 
328 		now = spdk_thread_get_last_tsc(thread);
329 		if (rc == 0) {
330 			reactor->idle_tsc += now - reactor->tsc_last;
331 		} else if (rc > 0) {
332 			reactor->busy_tsc += now - reactor->tsc_last;
333 		}
334 		reactor->tsc_last = now;
335 
336 		if (spdk_unlikely(lw_thread->resched)) {
337 			lw_thread->resched = false;
338 			TAILQ_REMOVE(&reactor->threads, lw_thread, link);
339 			assert(reactor->thread_count > 0);
340 			reactor->thread_count--;
341 			_reactor_schedule_thread(thread);
342 			continue;
343 		}
344 
345 		if (spdk_unlikely(spdk_thread_is_exited(thread) &&
346 				  spdk_thread_is_idle(thread))) {
347 			TAILQ_REMOVE(&reactor->threads, lw_thread, link);
348 			assert(reactor->thread_count > 0);
349 			reactor->thread_count--;
350 			spdk_thread_destroy(thread);
351 			continue;
352 		}
353 	}
354 
355 	if (g_framework_context_switch_monitor_enabled) {
356 		if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
357 			get_rusage(reactor);
358 			reactor->last_rusage = reactor->tsc_last;
359 		}
360 	}
361 }
362 
363 static int
364 _spdk_reactor_run(void *arg)
365 {
366 	struct spdk_reactor	*reactor = arg;
367 	struct spdk_thread	*thread;
368 	struct spdk_lw_thread	*lw_thread, *tmp;
369 	char			thread_name[32];
370 
371 	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);
372 
373 	/* Rename the POSIX thread because the reactor is tied to the POSIX
374 	 * thread in the SPDK event library.
375 	 */
376 	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
377 	_set_thread_name(thread_name);
378 
379 	reactor->tsc_last = spdk_get_ticks();
380 
381 	while (1) {
382 		reactor_run(reactor);
383 
384 		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
385 			break;
386 		}
387 	}
388 
389 	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
390 		thread = spdk_thread_get_from_ctx(lw_thread);
391 		spdk_set_thread(thread);
392 		spdk_thread_exit(thread);
393 	}
394 
395 	while (!TAILQ_EMPTY(&reactor->threads)) {
396 		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
397 			thread = spdk_thread_get_from_ctx(lw_thread);
398 			spdk_set_thread(thread);
399 			if (spdk_thread_is_exited(thread)) {
400 				TAILQ_REMOVE(&reactor->threads, lw_thread, link);
401 				assert(reactor->thread_count > 0);
402 				reactor->thread_count--;
403 				spdk_thread_destroy(thread);
404 			} else {
405 				spdk_thread_poll(thread, 0, 0);
406 			}
407 		}
408 	}
409 
410 	return 0;
411 }
412 
/*
 * Parse a core mask string into cpumask and restrict the result to the cores
 * present in the application core mask.
 *
 * Returns 0 on success, or the negative value reported by
 * spdk_cpuset_parse() when parsing fails.
 */
int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int rc = spdk_cpuset_parse(cpumask, mask);

	if (rc < 0) {
		return rc;
	}

	spdk_cpuset_and(cpumask, spdk_app_get_core_mask());

	return 0;
}
429 
430 struct spdk_cpuset *
431 spdk_app_get_core_mask(void)
432 {
433 	return &g_reactor_core_mask;
434 }
435 
436 void
437 spdk_reactors_start(void)
438 {
439 	struct spdk_reactor *reactor;
440 	struct spdk_cpuset tmp_cpumask = {};
441 	uint32_t i, current_core;
442 	int rc;
443 	char thread_name[32];
444 
445 	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
446 	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
447 
448 	current_core = spdk_env_get_current_core();
449 	SPDK_ENV_FOREACH_CORE(i) {
450 		if (i != current_core) {
451 			reactor = spdk_reactor_get(i);
452 			if (reactor == NULL) {
453 				continue;
454 			}
455 
456 			rc = spdk_env_thread_launch_pinned(reactor->lcore, _spdk_reactor_run, reactor);
457 			if (rc < 0) {
458 				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
459 				assert(false);
460 				return;
461 			}
462 
463 			/* For now, for each reactor spawn one thread. */
464 			snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
465 
466 			spdk_cpuset_zero(&tmp_cpumask);
467 			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
468 
469 			spdk_thread_create(thread_name, &tmp_cpumask);
470 		}
471 		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
472 	}
473 
474 	/* Start the master reactor */
475 	reactor = spdk_reactor_get(current_core);
476 	assert(reactor != NULL);
477 	_spdk_reactor_run(reactor);
478 
479 	spdk_env_thread_wait_all();
480 
481 	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
482 }
483 
484 void
485 spdk_reactors_stop(void *arg1)
486 {
487 	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
488 }
489 
/* Serializes the round-robin core selection in _reactor_schedule_thread(). */
static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
/* Next core the scheduler will consider. It is reset to the first core
 * whenever it is larger than the last core, which is how the initial
 * UINT32_MAX is handled on first use. */
static uint32_t g_next_core = UINT32_MAX;
492 
493 static void
494 _schedule_thread(void *arg1, void *arg2)
495 {
496 	struct spdk_lw_thread *lw_thread = arg1;
497 	struct spdk_thread *thread;
498 	struct spdk_cpuset *cpumask;
499 	struct spdk_reactor *reactor;
500 	uint32_t current_core;
501 
502 	current_core = spdk_env_get_current_core();
503 
504 	thread = spdk_thread_get_from_ctx(lw_thread);
505 	cpumask = spdk_thread_get_cpumask(thread);
506 	if (!spdk_cpuset_get_cpu(cpumask, current_core)) {
507 		SPDK_ERRLOG("Thread was scheduled to the wrong core %d\n", current_core);
508 		assert(false);
509 	}
510 
511 	reactor = spdk_reactor_get(current_core);
512 	assert(reactor != NULL);
513 
514 	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
515 	reactor->thread_count++;
516 }
517 
518 static int
519 _reactor_schedule_thread(struct spdk_thread *thread)
520 {
521 	uint32_t core;
522 	struct spdk_lw_thread *lw_thread;
523 	struct spdk_event *evt = NULL;
524 	struct spdk_cpuset *cpumask;
525 	uint32_t i;
526 
527 	cpumask = spdk_thread_get_cpumask(thread);
528 
529 	lw_thread = spdk_thread_get_ctx(thread);
530 	assert(lw_thread != NULL);
531 	memset(lw_thread, 0, sizeof(*lw_thread));
532 
533 	pthread_mutex_lock(&g_scheduler_mtx);
534 	for (i = 0; i < spdk_env_get_core_count(); i++) {
535 		if (g_next_core > spdk_env_get_last_core()) {
536 			g_next_core = spdk_env_get_first_core();
537 		}
538 		core = g_next_core;
539 		g_next_core = spdk_env_get_next_core(g_next_core);
540 
541 		if (spdk_cpuset_get_cpu(cpumask, core)) {
542 			evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);
543 			break;
544 		}
545 	}
546 	pthread_mutex_unlock(&g_scheduler_mtx);
547 
548 	assert(evt != NULL);
549 	if (evt == NULL) {
550 		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
551 		return -1;
552 	}
553 
554 	lw_thread->tsc_start = spdk_get_ticks();
555 
556 	spdk_event_call(evt);
557 
558 	return 0;
559 }
560 
561 static void
562 _reactor_request_thread_reschedule(struct spdk_thread *thread)
563 {
564 	struct spdk_lw_thread *lw_thread;
565 
566 	assert(thread == spdk_get_thread());
567 
568 	lw_thread = spdk_thread_get_ctx(thread);
569 
570 	assert(lw_thread != NULL);
571 
572 	lw_thread->resched = true;
573 }
574 
575 static int
576 reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
577 {
578 	switch (op) {
579 	case SPDK_THREAD_OP_NEW:
580 		return _reactor_schedule_thread(thread);
581 	case SPDK_THREAD_OP_RESCHED:
582 		_reactor_request_thread_reschedule(thread);
583 		return 0;
584 	default:
585 		return -ENOTSUP;
586 	}
587 }
588 
589 static bool
590 reactor_thread_op_supported(enum spdk_thread_op op)
591 {
592 	switch (op) {
593 	case SPDK_THREAD_OP_NEW:
594 	case SPDK_THREAD_OP_RESCHED:
595 		return true;
596 	default:
597 		return false;
598 	}
599 }
600 
601 struct call_reactor {
602 	uint32_t cur_core;
603 	spdk_event_fn fn;
604 	void *arg1;
605 	void *arg2;
606 
607 	uint32_t orig_core;
608 	spdk_event_fn cpl;
609 };
610 
611 static void
612 on_reactor(void *arg1, void *arg2)
613 {
614 	struct call_reactor *cr = arg1;
615 	struct spdk_event *evt;
616 
617 	cr->fn(cr->arg1, cr->arg2);
618 
619 	cr->cur_core = spdk_env_get_next_core(cr->cur_core);
620 
621 	if (cr->cur_core > spdk_env_get_last_core()) {
622 		SPDK_DEBUGLOG(SPDK_LOG_REACTOR, "Completed reactor iteration\n");
623 
624 		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
625 		free(cr);
626 	} else {
627 		SPDK_DEBUGLOG(SPDK_LOG_REACTOR, "Continuing reactor iteration to %d\n",
628 			      cr->cur_core);
629 
630 		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
631 	}
632 	assert(evt != NULL);
633 	spdk_event_call(evt);
634 }
635 
636 void
637 spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
638 {
639 	struct call_reactor *cr;
640 	struct spdk_event *evt;
641 
642 	cr = calloc(1, sizeof(*cr));
643 	if (!cr) {
644 		SPDK_ERRLOG("Unable to perform reactor iteration\n");
645 		cpl(arg1, arg2);
646 		return;
647 	}
648 
649 	cr->fn = fn;
650 	cr->arg1 = arg1;
651 	cr->arg2 = arg2;
652 	cr->cpl = cpl;
653 	cr->orig_core = spdk_env_get_current_core();
654 	cr->cur_core = spdk_env_get_first_core();
655 
656 	SPDK_DEBUGLOG(SPDK_LOG_REACTOR, "Starting reactor iteration from %d\n", cr->orig_core);
657 
658 	evt = spdk_event_allocate(cr->cur_core, on_reactor, cr, NULL);
659 	assert(evt != NULL);
660 
661 	spdk_event_call(evt);
662 }
663 
664 SPDK_LOG_REGISTER_COMPONENT("reactor", SPDK_LOG_REACTOR)
665