xref: /spdk/lib/event/reactor.c (revision 06b537bfdb4393dea857e204b85d8df46a351d8a)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/likely.h"
36 
37 #include "spdk_internal/event.h"
38 #include "spdk_internal/log.h"
39 #include "spdk_internal/thread.h"
40 
41 #include "spdk/log.h"
42 #include "spdk/thread.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 
46 #ifdef __linux__
47 #include <sys/prctl.h>
48 #endif
49 
50 #ifdef __FreeBSD__
51 #include <pthread_np.h>
52 #endif
53 
54 #define SPDK_EVENT_BATCH_SIZE		8
55 
56 static struct spdk_reactor *g_reactors;
57 static struct spdk_cpuset g_reactor_core_mask;
58 static enum spdk_reactor_state	g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;
59 
60 static bool g_framework_context_switch_monitor_enabled = true;
61 
62 static struct spdk_mempool *g_spdk_event_mempool = NULL;
63 
64 static void
65 reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
66 {
67 	reactor->lcore = lcore;
68 	reactor->flags.is_valid = true;
69 
70 	TAILQ_INIT(&reactor->threads);
71 	reactor->thread_count = 0;
72 
73 	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
74 	assert(reactor->events != NULL);
75 }
76 
77 struct spdk_reactor *
78 spdk_reactor_get(uint32_t lcore)
79 {
80 	struct spdk_reactor *reactor;
81 
82 	if (g_reactors == NULL) {
83 		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
84 		return NULL;
85 	}
86 
87 	reactor = &g_reactors[lcore];
88 
89 	if (reactor->flags.is_valid == false) {
90 		return NULL;
91 	}
92 
93 	return reactor;
94 }
95 
96 static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
97 static bool reactor_thread_op_supported(enum spdk_thread_op op);
98 
99 int
100 spdk_reactors_init(void)
101 {
102 	int rc;
103 	uint32_t i, last_core;
104 	char mempool_name[32];
105 
106 	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
107 	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
108 			       262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
109 			       sizeof(struct spdk_event),
110 			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
111 			       SPDK_ENV_SOCKET_ID_ANY);
112 
113 	if (g_spdk_event_mempool == NULL) {
114 		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
115 		return -1;
116 	}
117 
118 	/* struct spdk_reactor must be aligned on 64 byte boundary */
119 	last_core = spdk_env_get_last_core();
120 	rc = posix_memalign((void **)&g_reactors, 64,
121 			    (last_core + 1) * sizeof(struct spdk_reactor));
122 	if (rc != 0) {
123 		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
124 			    last_core + 1);
125 		spdk_mempool_free(g_spdk_event_mempool);
126 		return -1;
127 	}
128 
129 	memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor));
130 
131 	spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
132 				 sizeof(struct spdk_lw_thread));
133 
134 	SPDK_ENV_FOREACH_CORE(i) {
135 		reactor_construct(&g_reactors[i], i);
136 	}
137 
138 	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
139 
140 	return 0;
141 }
142 
143 void
144 spdk_reactors_fini(void)
145 {
146 	uint32_t i;
147 	struct spdk_reactor *reactor;
148 
149 	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
150 		return;
151 	}
152 
153 	spdk_thread_lib_fini();
154 
155 	SPDK_ENV_FOREACH_CORE(i) {
156 		reactor = spdk_reactor_get(i);
157 		assert(reactor != NULL);
158 		assert(reactor->thread_count == 0);
159 		if (reactor->events != NULL) {
160 			spdk_ring_free(reactor->events);
161 		}
162 	}
163 
164 	spdk_mempool_free(g_spdk_event_mempool);
165 
166 	free(g_reactors);
167 	g_reactors = NULL;
168 }
169 
170 struct spdk_event *
171 spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
172 {
173 	struct spdk_event *event = NULL;
174 	struct spdk_reactor *reactor = spdk_reactor_get(lcore);
175 
176 	if (!reactor) {
177 		assert(false);
178 		return NULL;
179 	}
180 
181 	event = spdk_mempool_get(g_spdk_event_mempool);
182 	if (event == NULL) {
183 		assert(false);
184 		return NULL;
185 	}
186 
187 	event->lcore = lcore;
188 	event->fn = fn;
189 	event->arg1 = arg1;
190 	event->arg2 = arg2;
191 
192 	return event;
193 }
194 
195 void
196 spdk_event_call(struct spdk_event *event)
197 {
198 	int rc;
199 	struct spdk_reactor *reactor;
200 
201 	reactor = spdk_reactor_get(event->lcore);
202 
203 	assert(reactor != NULL);
204 	assert(reactor->events != NULL);
205 
206 	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
207 	if (rc != 1) {
208 		assert(false);
209 	}
210 }
211 
212 static inline uint32_t
213 event_queue_run_batch(struct spdk_reactor *reactor)
214 {
215 	unsigned count, i;
216 	void *events[SPDK_EVENT_BATCH_SIZE];
217 	struct spdk_thread *thread;
218 	struct spdk_lw_thread *lw_thread;
219 
220 #ifdef DEBUG
221 	/*
222 	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
223 	 * so we will never actually read uninitialized data from events, but just to be sure
224 	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
225 	 */
226 	memset(events, 0, sizeof(events));
227 #endif
228 
229 	count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
230 	if (count == 0) {
231 		return 0;
232 	}
233 
234 	/* Execute the events. There are still some remaining events
235 	 * that must occur on an SPDK thread. To accomodate those, try to
236 	 * run them on the first thread in the list, if it exists. */
237 	lw_thread = TAILQ_FIRST(&reactor->threads);
238 	if (lw_thread) {
239 		thread = spdk_thread_get_from_ctx(lw_thread);
240 	} else {
241 		thread = NULL;
242 	}
243 
244 	spdk_set_thread(thread);
245 
246 	for (i = 0; i < count; i++) {
247 		struct spdk_event *event = events[i];
248 
249 		assert(event != NULL);
250 		event->fn(event->arg1, event->arg2);
251 	}
252 
253 	spdk_set_thread(NULL);
254 
255 	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);
256 
257 	return count;
258 }
259 
260 /* 1s */
261 #define CONTEXT_SWITCH_MONITOR_PERIOD 1000000
262 
263 static int
264 get_rusage(struct spdk_reactor *reactor)
265 {
266 	struct rusage		rusage;
267 
268 	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
269 		return -1;
270 	}
271 
272 	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
273 		SPDK_INFOLOG(SPDK_LOG_REACTOR,
274 			     "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
275 			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
276 			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
277 	}
278 	reactor->rusage = rusage;
279 
280 	return -1;
281 }
282 
283 void
284 spdk_framework_enable_context_switch_monitor(bool enable)
285 {
286 	/* This global is being read by multiple threads, so this isn't
287 	 * strictly thread safe. However, we're toggling between true and
288 	 * false here, and if a thread sees the value update later than it
289 	 * should, it's no big deal. */
290 	g_framework_context_switch_monitor_enabled = enable;
291 }
292 
293 bool
294 spdk_framework_context_switch_monitor_enabled(void)
295 {
296 	return g_framework_context_switch_monitor_enabled;
297 }
298 
/*
 * Give the calling OS thread a human-readable name. This uses
 * pthread_set_name_np() on FreeBSD and prctl(PR_SET_NAME) on Linux.
 * Any other platform is rejected at compile time.
 */
static void
_set_thread_name(const char *thread_name)
{
#if defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#elif defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#else
#error missing platform support for thread name
#endif
}
310 
311 static int _reactor_schedule_thread(struct spdk_thread *thread);
312 static uint64_t g_rusage_period;
313 
314 static void
315 _reactor_run(struct spdk_reactor *reactor)
316 {
317 	struct spdk_thread	*thread;
318 	struct spdk_lw_thread	*lw_thread, *tmp;
319 	uint64_t		now;
320 	int			rc;
321 
322 	event_queue_run_batch(reactor);
323 
324 	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
325 		thread = spdk_thread_get_from_ctx(lw_thread);
326 		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);
327 
328 		now = spdk_thread_get_last_tsc(thread);
329 		if (rc == 0) {
330 			reactor->idle_tsc += now - reactor->tsc_last;
331 		} else if (rc > 0) {
332 			reactor->busy_tsc += now - reactor->tsc_last;
333 		}
334 		reactor->tsc_last = now;
335 
336 		if (spdk_unlikely(lw_thread->resched)) {
337 			lw_thread->resched = false;
338 			TAILQ_REMOVE(&reactor->threads, lw_thread, link);
339 			assert(reactor->thread_count > 0);
340 			reactor->thread_count--;
341 			_reactor_schedule_thread(thread);
342 			continue;
343 		}
344 
345 		if (spdk_unlikely(spdk_thread_is_exited(thread) &&
346 				  spdk_thread_is_idle(thread))) {
347 			TAILQ_REMOVE(&reactor->threads, lw_thread, link);
348 			assert(reactor->thread_count > 0);
349 			reactor->thread_count--;
350 			spdk_thread_destroy(thread);
351 			continue;
352 		}
353 	}
354 
355 	if (g_framework_context_switch_monitor_enabled) {
356 		if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
357 			get_rusage(reactor);
358 			reactor->last_rusage = reactor->tsc_last;
359 		}
360 	}
361 }
362 
363 static int
364 reactor_run(void *arg)
365 {
366 	struct spdk_reactor	*reactor = arg;
367 	struct spdk_thread	*thread;
368 	struct spdk_lw_thread	*lw_thread, *tmp;
369 	char			thread_name[32];
370 
371 	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);
372 
373 	/* Rename the POSIX thread because the reactor is tied to the POSIX
374 	 * thread in the SPDK event library.
375 	 */
376 	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
377 	_set_thread_name(thread_name);
378 
379 	reactor->tsc_last = spdk_get_ticks();
380 
381 	while (1) {
382 		_reactor_run(reactor);
383 
384 		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
385 			break;
386 		}
387 	}
388 
389 	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
390 		thread = spdk_thread_get_from_ctx(lw_thread);
391 		spdk_set_thread(thread);
392 		spdk_thread_exit(thread);
393 	}
394 
395 	while (!TAILQ_EMPTY(&reactor->threads)) {
396 		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
397 			thread = spdk_thread_get_from_ctx(lw_thread);
398 			spdk_set_thread(thread);
399 			if (spdk_thread_is_exited(thread)) {
400 				TAILQ_REMOVE(&reactor->threads, lw_thread, link);
401 				assert(reactor->thread_count > 0);
402 				reactor->thread_count--;
403 				spdk_thread_destroy(thread);
404 			} else {
405 				spdk_thread_poll(thread, 0, 0);
406 			}
407 		}
408 	}
409 
410 	return 0;
411 }
412 
/*
 * Parse a textual core mask into cpumask, then intersect it with the
 * application's reactor core mask (spdk_app_get_core_mask()).
 *
 * Returns 0 on success, or the negative value from spdk_cpuset_parse().
 */
int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int parse_rc = spdk_cpuset_parse(cpumask, mask);

	if (parse_rc < 0) {
		return parse_rc;
	}

	spdk_cpuset_and(cpumask, spdk_app_get_core_mask());
	return 0;
}
429 
430 const struct spdk_cpuset *
431 spdk_app_get_core_mask(void)
432 {
433 	return &g_reactor_core_mask;
434 }
435 
436 void
437 spdk_reactors_start(void)
438 {
439 	struct spdk_reactor *reactor;
440 	struct spdk_cpuset tmp_cpumask = {};
441 	uint32_t i, current_core;
442 	int rc;
443 	char thread_name[32];
444 
445 	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
446 	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
447 
448 	current_core = spdk_env_get_current_core();
449 	SPDK_ENV_FOREACH_CORE(i) {
450 		if (i != current_core) {
451 			reactor = spdk_reactor_get(i);
452 			if (reactor == NULL) {
453 				continue;
454 			}
455 
456 			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
457 			if (rc < 0) {
458 				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
459 				assert(false);
460 				return;
461 			}
462 
463 			/* For now, for each reactor spawn one thread. */
464 			snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
465 
466 			spdk_cpuset_zero(&tmp_cpumask);
467 			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
468 
469 			spdk_thread_create(thread_name, &tmp_cpumask);
470 		}
471 		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
472 	}
473 
474 	/* Start the master reactor */
475 	reactor = spdk_reactor_get(current_core);
476 	assert(reactor != NULL);
477 	reactor_run(reactor);
478 
479 	spdk_env_thread_wait_all();
480 
481 	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
482 }
483 
484 void
485 spdk_reactors_stop(void *arg1)
486 {
487 	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
488 }
489 
490 static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
491 static uint32_t g_next_core = UINT32_MAX;
492 
493 static void
494 _schedule_thread(void *arg1, void *arg2)
495 {
496 	struct spdk_lw_thread *lw_thread = arg1;
497 	struct spdk_reactor *reactor;
498 	uint32_t current_core;
499 
500 	current_core = spdk_env_get_current_core();
501 
502 	reactor = spdk_reactor_get(current_core);
503 	assert(reactor != NULL);
504 
505 	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
506 	reactor->thread_count++;
507 }
508 
509 static int
510 _reactor_schedule_thread(struct spdk_thread *thread)
511 {
512 	uint32_t core;
513 	struct spdk_lw_thread *lw_thread;
514 	struct spdk_event *evt = NULL;
515 	struct spdk_cpuset *cpumask;
516 	uint32_t i;
517 
518 	cpumask = spdk_thread_get_cpumask(thread);
519 
520 	lw_thread = spdk_thread_get_ctx(thread);
521 	assert(lw_thread != NULL);
522 	memset(lw_thread, 0, sizeof(*lw_thread));
523 
524 	pthread_mutex_lock(&g_scheduler_mtx);
525 	for (i = 0; i < spdk_env_get_core_count(); i++) {
526 		if (g_next_core > spdk_env_get_last_core()) {
527 			g_next_core = spdk_env_get_first_core();
528 		}
529 		core = g_next_core;
530 		g_next_core = spdk_env_get_next_core(g_next_core);
531 
532 		if (spdk_cpuset_get_cpu(cpumask, core)) {
533 			evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);
534 			break;
535 		}
536 	}
537 	pthread_mutex_unlock(&g_scheduler_mtx);
538 
539 	assert(evt != NULL);
540 	if (evt == NULL) {
541 		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
542 		return -1;
543 	}
544 
545 	lw_thread->tsc_start = spdk_get_ticks();
546 
547 	spdk_event_call(evt);
548 
549 	return 0;
550 }
551 
552 static void
553 _reactor_request_thread_reschedule(struct spdk_thread *thread)
554 {
555 	struct spdk_lw_thread *lw_thread;
556 
557 	assert(thread == spdk_get_thread());
558 
559 	lw_thread = spdk_thread_get_ctx(thread);
560 
561 	assert(lw_thread != NULL);
562 
563 	lw_thread->resched = true;
564 }
565 
566 static int
567 reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
568 {
569 	switch (op) {
570 	case SPDK_THREAD_OP_NEW:
571 		return _reactor_schedule_thread(thread);
572 	case SPDK_THREAD_OP_RESCHED:
573 		_reactor_request_thread_reschedule(thread);
574 		return 0;
575 	default:
576 		return -ENOTSUP;
577 	}
578 }
579 
580 static bool
581 reactor_thread_op_supported(enum spdk_thread_op op)
582 {
583 	switch (op) {
584 	case SPDK_THREAD_OP_NEW:
585 	case SPDK_THREAD_OP_RESCHED:
586 		return true;
587 	default:
588 		return false;
589 	}
590 }
591 
592 struct call_reactor {
593 	uint32_t cur_core;
594 	spdk_event_fn fn;
595 	void *arg1;
596 	void *arg2;
597 
598 	uint32_t orig_core;
599 	spdk_event_fn cpl;
600 };
601 
602 static void
603 on_reactor(void *arg1, void *arg2)
604 {
605 	struct call_reactor *cr = arg1;
606 	struct spdk_event *evt;
607 
608 	cr->fn(cr->arg1, cr->arg2);
609 
610 	cr->cur_core = spdk_env_get_next_core(cr->cur_core);
611 
612 	if (cr->cur_core > spdk_env_get_last_core()) {
613 		SPDK_DEBUGLOG(SPDK_LOG_REACTOR, "Completed reactor iteration\n");
614 
615 		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
616 		free(cr);
617 	} else {
618 		SPDK_DEBUGLOG(SPDK_LOG_REACTOR, "Continuing reactor iteration to %d\n",
619 			      cr->cur_core);
620 
621 		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
622 	}
623 	assert(evt != NULL);
624 	spdk_event_call(evt);
625 }
626 
627 void
628 spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
629 {
630 	struct call_reactor *cr;
631 	struct spdk_event *evt;
632 
633 	cr = calloc(1, sizeof(*cr));
634 	if (!cr) {
635 		SPDK_ERRLOG("Unable to perform reactor iteration\n");
636 		cpl(arg1, arg2);
637 		return;
638 	}
639 
640 	cr->fn = fn;
641 	cr->arg1 = arg1;
642 	cr->arg2 = arg2;
643 	cr->cpl = cpl;
644 	cr->orig_core = spdk_env_get_current_core();
645 	cr->cur_core = spdk_env_get_first_core();
646 
647 	SPDK_DEBUGLOG(SPDK_LOG_REACTOR, "Starting reactor iteration from %d\n", cr->orig_core);
648 
649 	evt = spdk_event_allocate(cr->cur_core, on_reactor, cr, NULL);
650 	assert(evt != NULL);
651 
652 	spdk_event_call(evt);
653 }
654 
655 SPDK_LOG_REGISTER_COMPONENT("reactor", SPDK_LOG_REACTOR)
656