xref: /spdk/lib/event/reactor.c (revision f93b6fb0a4ebcee203e7c44c9e170c20bbce96cc)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/likely.h"
36 
37 #include "spdk_internal/event.h"
38 #include "spdk_internal/log.h"
39 #include "spdk_internal/thread.h"
40 
41 #include "spdk/log.h"
42 #include "spdk/thread.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 
/* Maximum number of events pulled off a reactor's event ring in one batch. */
#define SPDK_EVENT_BATCH_SIZE		8
47 
/*
 * Global lifecycle state of the reactor framework (held in g_reactor_state).
 */
enum spdk_reactor_state {
	/* Initial value, before spdk_reactors_init() has completed. */
	SPDK_REACTOR_STATE_INVALID	= 0,
	/* Set at the end of a successful spdk_reactors_init(). */
	SPDK_REACTOR_STATE_INITIALIZED	= 1,
	/* Set by spdk_reactors_start(); reactor loops keep spinning in this state. */
	SPDK_REACTOR_STATE_RUNNING	= 2,
	/* Set by spdk_reactors_stop(); reactor loops break out on their next pass. */
	SPDK_REACTOR_STATE_EXITING	= 3,
	/* Set by spdk_reactors_start() once every reactor thread has been waited for. */
	SPDK_REACTOR_STATE_SHUTDOWN	= 4,
};
55 
/*
 * Per-thread context stored alongside each spdk_thread (its size is handed to
 * spdk_thread_lib_init()). It only carries the linkage needed to chain the
 * thread onto a reactor's thread list.
 */
struct spdk_lw_thread {
	/* Entry in spdk_reactor::threads. */
	TAILQ_ENTRY(spdk_lw_thread)	link;
};
59 
/*
 * Per-core reactor state. One instance exists per index of the g_reactors
 * array, which is indexed by logical core number.
 */
struct spdk_reactor {
	/* Logical core this reactor runs on. */
	uint32_t					lcore;

	/* Lightweight threads currently assigned to this reactor. */
	TAILQ_HEAD(, spdk_lw_thread)			threads;

	/* Poller used to collect rusage for this reactor. */
	struct spdk_poller				*rusage_poller;

	/* Most recent rusage snapshot, used to compute context-switch deltas. */
	struct rusage					rusage;

	/* Ring of pending events destined for this reactor. */
	struct spdk_ring				*events;
} __attribute__((aligned(64)));
75 
76 static struct spdk_reactor *g_reactors;
77 
78 static enum spdk_reactor_state	g_reactor_state = SPDK_REACTOR_STATE_INVALID;
79 
80 static bool g_context_switch_monitor_enabled = true;
81 
82 static void spdk_reactor_construct(struct spdk_reactor *w, uint32_t lcore);
83 
84 static struct spdk_mempool *g_spdk_event_mempool = NULL;
85 
86 static struct spdk_cpuset *g_spdk_app_core_mask;
87 
88 static struct spdk_reactor *
89 spdk_reactor_get(uint32_t lcore)
90 {
91 	struct spdk_reactor *reactor;
92 	reactor = spdk_likely(g_reactors) ? &g_reactors[lcore] : NULL;
93 	return reactor;
94 }
95 
96 struct spdk_event *
97 spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
98 {
99 	struct spdk_event *event = NULL;
100 	struct spdk_reactor *reactor = spdk_reactor_get(lcore);
101 
102 	if (!reactor) {
103 		assert(false);
104 		return NULL;
105 	}
106 
107 	event = spdk_mempool_get(g_spdk_event_mempool);
108 	if (event == NULL) {
109 		assert(false);
110 		return NULL;
111 	}
112 
113 	event->lcore = lcore;
114 	event->fn = fn;
115 	event->arg1 = arg1;
116 	event->arg2 = arg2;
117 
118 	return event;
119 }
120 
121 void
122 spdk_event_call(struct spdk_event *event)
123 {
124 	int rc;
125 	struct spdk_reactor *reactor;
126 
127 	reactor = spdk_reactor_get(event->lcore);
128 
129 	assert(reactor->events != NULL);
130 	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1);
131 	if (rc != 1) {
132 		assert(false);
133 	}
134 }
135 
136 static inline uint32_t
137 _spdk_event_queue_run_batch(struct spdk_reactor *reactor)
138 {
139 	unsigned count, i;
140 	void *events[SPDK_EVENT_BATCH_SIZE];
141 	struct spdk_thread *thread;
142 	struct spdk_lw_thread *lw_thread;
143 
144 #ifdef DEBUG
145 	/*
146 	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
147 	 * so we will never actually read uninitialized data from events, but just to be sure
148 	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
149 	 */
150 	memset(events, 0, sizeof(events));
151 #endif
152 
153 	count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
154 	if (count == 0) {
155 		return 0;
156 	}
157 
158 	/* Execute the events. There are still some remaining events
159 	 * that must occur on an SPDK thread. To accomodate those, try to
160 	 * run them on the first thread in the list, if it exists. */
161 	lw_thread = TAILQ_FIRST(&reactor->threads);
162 	if (lw_thread) {
163 		thread = spdk_thread_get_from_ctx(lw_thread);
164 	} else {
165 		thread = NULL;
166 	}
167 
168 	spdk_set_thread(thread);
169 
170 	for (i = 0; i < count; i++) {
171 		struct spdk_event *event = events[i];
172 
173 		assert(event != NULL);
174 		event->fn(event->arg1, event->arg2);
175 	}
176 
177 	spdk_set_thread(NULL);
178 
179 	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);
180 
181 	return count;
182 }
183 
/*
 * Interval between context-switch (rusage) samples taken by the reactor loop.
 * NOTE(review): the log message in get_rusage() speaks of "the last second",
 * which suggests this value is meant as microseconds - confirm against how
 * the reactor loop compares it with spdk_get_ticks().
 */
#define CONTEXT_SWITCH_MONITOR_PERIOD 1000000
185 
186 static int
187 get_rusage(struct spdk_reactor *reactor)
188 {
189 	struct rusage		rusage;
190 
191 	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
192 		return -1;
193 	}
194 
195 	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
196 		SPDK_INFOLOG(SPDK_LOG_REACTOR,
197 			     "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
198 			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
199 			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
200 	}
201 	reactor->rusage = rusage;
202 
203 	return -1;
204 }
205 
206 void
207 spdk_reactor_enable_context_switch_monitor(bool enable)
208 {
209 	/* This global is being read by multiple threads, so this isn't
210 	 * strictly thread safe. However, we're toggling between true and
211 	 * false here, and if a thread sees the value update later than it
212 	 * should, it's no big deal. */
213 	g_context_switch_monitor_enabled = enable;
214 }
215 
216 bool
217 spdk_reactor_context_switch_monitor_enabled(void)
218 {
219 	return g_context_switch_monitor_enabled;
220 }
221 
222 static int
223 _spdk_reactor_run(void *arg)
224 {
225 	struct spdk_reactor	*reactor = arg;
226 	struct spdk_thread	*thread;
227 	uint64_t		last_rusage = 0;
228 	struct spdk_lw_thread	*lw_thread, *tmp;
229 
230 	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);
231 
232 	while (1) {
233 		uint64_t now;
234 
235 		/* For each loop through the reactor, capture the time. This time
236 		 * is used for all threads. */
237 		now = spdk_get_ticks();
238 
239 		_spdk_event_queue_run_batch(reactor);
240 
241 		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
242 			thread = spdk_thread_get_from_ctx(lw_thread);
243 
244 			spdk_thread_poll(thread, 0, now);
245 		}
246 
247 		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
248 			break;
249 		}
250 
251 		if (g_context_switch_monitor_enabled) {
252 			if ((last_rusage + CONTEXT_SWITCH_MONITOR_PERIOD) < now) {
253 				get_rusage(reactor);
254 				last_rusage = now;
255 			}
256 		}
257 	}
258 
259 	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
260 		thread = spdk_thread_get_from_ctx(lw_thread);
261 		TAILQ_REMOVE(&reactor->threads, lw_thread, link);
262 		spdk_thread_exit(thread);
263 	}
264 
265 	return 0;
266 }
267 
268 static void
269 spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
270 {
271 	reactor->lcore = lcore;
272 
273 	TAILQ_INIT(&reactor->threads);
274 
275 	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
276 	assert(reactor->events != NULL);
277 }
278 
/*
 * Parse a textual core mask into cpumask and restrict it to the cores the
 * application is actually running on.
 *
 * Returns 0 on success, or the negative value from spdk_cpuset_parse() when
 * the mask string cannot be parsed.
 */
int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int rc = spdk_cpuset_parse(cpumask, mask);

	if (rc < 0) {
		return rc;
	}

	/* Drop any cores that are not part of the application's core mask. */
	spdk_cpuset_and(cpumask, spdk_app_get_core_mask());

	return 0;
}
295 
296 struct spdk_cpuset *
297 spdk_app_get_core_mask(void)
298 {
299 	return g_spdk_app_core_mask;
300 }
301 
302 void
303 spdk_reactors_start(void)
304 {
305 	struct spdk_reactor *reactor;
306 	uint32_t i, current_core;
307 	int rc;
308 	char thread_name[32];
309 
310 	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
311 	g_spdk_app_core_mask = spdk_cpuset_alloc();
312 
313 	current_core = spdk_env_get_current_core();
314 	SPDK_ENV_FOREACH_CORE(i) {
315 		if (i != current_core) {
316 			reactor = spdk_reactor_get(i);
317 			rc = spdk_env_thread_launch_pinned(reactor->lcore, _spdk_reactor_run, reactor);
318 			if (rc < 0) {
319 				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
320 				assert(false);
321 				return;
322 			}
323 
324 			/* For now, for each reactor spawn one thread. */
325 			snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
326 			spdk_thread_create(thread_name);
327 		}
328 		spdk_cpuset_set_cpu(g_spdk_app_core_mask, i, true);
329 	}
330 
331 	/* Start the master reactor */
332 	reactor = spdk_reactor_get(current_core);
333 	_spdk_reactor_run(reactor);
334 
335 	spdk_env_thread_wait_all();
336 
337 	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
338 	spdk_cpuset_free(g_spdk_app_core_mask);
339 	g_spdk_app_core_mask = NULL;
340 }
341 
342 void
343 spdk_reactors_stop(void *arg1)
344 {
345 	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
346 }
347 
/* Guards g_next_core while a new thread is being assigned to a core. */
static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
/* Core the next new thread will be placed on; UINT32_MAX means "not chosen yet". */
static uint32_t g_next_core = UINT32_MAX;
350 
351 static void
352 _schedule_thread(void *arg1, void *arg2)
353 {
354 	struct spdk_lw_thread *lw_thread = arg1;
355 	struct spdk_reactor *reactor;
356 
357 	reactor = spdk_reactor_get(spdk_env_get_current_core());
358 
359 	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
360 }
361 
362 static void
363 spdk_reactor_schedule_thread(struct spdk_thread *thread)
364 {
365 	uint32_t core;
366 	struct spdk_lw_thread *lw_thread;
367 	struct spdk_event *evt;
368 
369 	lw_thread = spdk_thread_get_ctx(thread);
370 	assert(lw_thread != NULL);
371 	memset(lw_thread, 0, sizeof(*lw_thread));
372 
373 	pthread_mutex_lock(&g_scheduler_mtx);
374 	if (g_next_core > spdk_env_get_last_core()) {
375 		g_next_core = spdk_env_get_first_core();
376 	}
377 	core = g_next_core;
378 	g_next_core = spdk_env_get_next_core(g_next_core);
379 	pthread_mutex_unlock(&g_scheduler_mtx);
380 
381 	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);
382 	spdk_event_call(evt);
383 }
384 
385 int
386 spdk_reactors_init(void)
387 {
388 	int rc;
389 	uint32_t i, last_core;
390 	struct spdk_reactor *reactor;
391 	char mempool_name[32];
392 
393 	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
394 	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
395 			       262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
396 			       sizeof(struct spdk_event),
397 			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
398 			       SPDK_ENV_SOCKET_ID_ANY);
399 
400 	if (g_spdk_event_mempool == NULL) {
401 		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
402 		return -1;
403 	}
404 
405 	/* struct spdk_reactor must be aligned on 64 byte boundary */
406 	last_core = spdk_env_get_last_core();
407 	rc = posix_memalign((void **)&g_reactors, 64,
408 			    (last_core + 1) * sizeof(struct spdk_reactor));
409 	if (rc != 0) {
410 		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
411 			    last_core + 1);
412 		spdk_mempool_free(g_spdk_event_mempool);
413 		return -1;
414 	}
415 
416 	memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor));
417 
418 	spdk_thread_lib_init(spdk_reactor_schedule_thread, sizeof(struct spdk_lw_thread));
419 
420 	SPDK_ENV_FOREACH_CORE(i) {
421 		reactor = spdk_reactor_get(i);
422 		spdk_reactor_construct(reactor, i);
423 	}
424 
425 	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
426 
427 	return 0;
428 }
429 
430 void
431 spdk_reactors_fini(void)
432 {
433 	uint32_t i;
434 	struct spdk_reactor *reactor;
435 
436 	spdk_thread_lib_fini();
437 
438 	SPDK_ENV_FOREACH_CORE(i) {
439 		reactor = spdk_reactor_get(i);
440 		if (spdk_likely(reactor != NULL) && reactor->events != NULL) {
441 			spdk_ring_free(reactor->events);
442 		}
443 	}
444 
445 	spdk_mempool_free(g_spdk_event_mempool);
446 
447 	free(g_reactors);
448 	g_reactors = NULL;
449 }
450 
451 SPDK_LOG_REGISTER_COMPONENT("reactor", SPDK_LOG_REACTOR)
452