/* xref: /spdk/lib/thread/thread.c (revision 8c2738a8fe4ab9b91f9b417054c9e2498fa8184b) */
/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/tree.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
#define SPDK_MAX_POLLER_NAME_LEN	256
#define SPDK_MAX_THREAD_NAME_LEN	256

enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,

	/* The poller is in the process of being paused.  It will be paused
	 * the next time it is due to execute.
	 */
	SPDK_POLLER_STATE_PAUSING,

	/* The poller is registered but currently paused.  It's on the
	 * paused_pollers list.
	 */
	SPDK_POLLER_STATE_PAUSED,
};
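
/*
 * Poller state machine, as implemented by the functions below (an illustrative
 * summary derived from this file, not an exhaustive list of transitions):
 *
 *   WAITING -> RUNNING -> WAITING     normal execution in thread_execute_poller()
 *   RUNNING or WAITING -> PAUSING     spdk_poller_pause()
 *   PAUSING -> PAUSED                 at the poller's next scheduled execution
 *   PAUSED or PAUSING -> WAITING      spdk_poller_resume()
 *   any state -> UNREGISTERED         spdk_poller_unregister(); freed on next poll
 */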

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;
	RB_ENTRY(spdk_poller)		node;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	uint64_t			period_ticks;
	uint64_t			next_run_tick;
	uint64_t			run_count;
	uint64_t			busy_count;
	spdk_poller_fn			fn;
	void				*arg;
	struct spdk_thread		*thread;
	int				interruptfd;
	spdk_poller_set_interrupt_mode_cb set_intr_cb_fn;
	void				*set_intr_cb_arg;

	char				name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_thread_state {
	/* The thread is processing pollers and messages via spdk_thread_poll(). */
	SPDK_THREAD_STATE_RUNNING,

	/* The thread is in the process of termination. It reaps unregistering
	 * pollers and releases I/O channels.
	 */
	SPDK_THREAD_STATE_EXITING,

	/* The thread has exited. It is ready for spdk_thread_destroy() to be called. */
	SPDK_THREAD_STATE_EXITED,
};
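
/*
 * Thread lifecycle: spdk_thread_create() puts a thread into RUNNING.
 * spdk_thread_exit() moves it to EXITING; subsequent spdk_thread_poll() calls
 * invoke thread_exit(), which moves it to EXITED once no pollers, I/O channels,
 * or pending io_device unregistrations remain (or forcefully after
 * SPDK_THREAD_EXIT_TIMEOUT_SEC).  Only an EXITED thread may be passed to
 * spdk_thread_destroy().
 */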

struct spdk_thread {
	uint64_t			tsc_last;
	struct spdk_thread_stats	stats;
	/*
	 * Contains pollers actively running on this thread.  Pollers
	 *  are run round-robin. The thread takes one poller from the head
	 *  of the ring, executes it, then puts it back at the tail of
	 *  the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;
	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	RB_HEAD(timed_pollers_tree, spdk_poller)	timed_pollers;
	struct spdk_poller				*first_timed_poller;
	/*
	 * Contains paused pollers.  Pollers on this queue are waiting until
	 * they are resumed (in which case they're put onto the active/timer
	 * queues) or unregistered.
	 */
	TAILQ_HEAD(paused_pollers_head, spdk_poller)	paused_pollers;
	struct spdk_ring		*messages;
	int				msg_fd;
	SLIST_HEAD(, spdk_msg)		msg_cache;
	size_t				msg_cache_count;
	spdk_msg_fn			critical_msg;
	uint64_t			id;
	enum spdk_thread_state		state;
	int				pending_unregister_count;

	TAILQ_HEAD(, spdk_io_channel)	io_channels;
	TAILQ_ENTRY(spdk_thread)	tailq;

	char				name[SPDK_MAX_THREAD_NAME_LEN + 1];
	struct spdk_cpuset		cpumask;
	uint64_t			exit_timeout_tsc;

	/* Indicates whether this spdk_thread currently runs in interrupt mode. */
	bool				in_interrupt;
	struct spdk_fd_group		*fgrp;

	/* User context allocated at the end */
	uint8_t				ctx[0];
};

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
static size_t g_ctx_sz = 0;
/* A monotonically increasing ID is assigned to each created thread, beginning
 * at 1. Once the ID wraps around to 0 (i.e., after UINT64_MAX threads have been
 * created), further thread creation is not allowed and the SPDK application
 * must be restarted.
 */
static uint64_t g_thread_id = 1;

struct io_device {
	void				*io_device;
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb	create_cb;
	spdk_io_channel_destroy_cb	destroy_cb;
	spdk_io_device_unregister_cb	unregister_cb;
	struct spdk_thread		*unregister_thread;
	uint32_t			ctx_size;
	uint32_t			for_each_count;
	TAILQ_ENTRY(io_device)		tailq;

	uint32_t			refcnt;

	bool				unregistered;
};

static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);

struct spdk_msg {
	spdk_msg_fn		fn;
	void			*arg;

	SLIST_ENTRY(spdk_msg)	link;
};

#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static __thread struct spdk_thread *tls_thread = NULL;

/*
 * If this compare function returned 0 when two next_run_ticks were equal,
 * the macro RB_INSERT() would return a pointer to the existing element with
 * the same next_run_tick.
 *
 * Fortunately, the macro RB_REMOVE() takes a pointer to the element to remove
 * as a parameter, not a key.
 *
 * Hence we allow RB_INSERT() to insert elements with duplicate keys on the
 * right side by returning 1 when two next_run_ticks are equal.
 */
static inline int
timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2)
{
	if (poller1->next_run_tick < poller2->next_run_tick) {
		return -1;
	} else {
		return 1;
	}
}

RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare);
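
/*
 * Example (illustrative): if poller A with next_run_tick == 100 is already in
 * the tree and poller B with the same tick is inserted, timed_poller_compare(B, A)
 * returns 1, so B lands in A's right subtree.  RB_MIN() therefore still yields
 * A first, and pollers with equal ticks execute in insertion order.
 */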

static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}

static int
_thread_lib_init(size_t ctx_sz)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	g_ctx_sz = ctx_sz;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			     sizeof(struct spdk_msg),
			     0, /* No cache. We do our own. */
			     SPDK_ENV_SOCKET_ID_ANY);

	if (!g_spdk_msg_mempool) {
		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
		return -1;
	}

	return 0;
}

int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);

	if (new_thread_fn == NULL) {
		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
	} else {
		g_new_thread_fn = new_thread_fn;
	}

	return _thread_lib_init(ctx_sz);
}

int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
			 spdk_thread_op_supported_fn thread_op_supported_fn,
			 size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);
	assert(g_thread_op_supported_fn == NULL);

	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
		SPDK_ERRLOG("thread_op_fn and thread_op_supported_fn must be specified together or not at all.\n");
		return -EINVAL;
	}

	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
	} else {
		g_thread_op_fn = thread_op_fn;
		g_thread_op_supported_fn = thread_op_supported_fn;
	}

	return _thread_lib_init(ctx_sz);
}

void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}

	g_new_thread_fn = NULL;
	g_thread_op_fn = NULL;
	g_thread_op_supported_fn = NULL;
	g_ctx_sz = 0;
}

static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
				     poller->name);
		}
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (spdk_interrupt_mode_is_enabled()) {
		thread_interrupt_destroy(thread);
	}

	spdk_ring_free(thread->messages);
	free(thread);
}

struct spdk_thread *
spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc = 0, i;

	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	TAILQ_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	RB_INIT(&thread->timed_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache.  If the bulk get fails, that's ok:
	 * the cache will get filled up organically as messages are passed to the thread.
	 */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	if (g_thread_id == 0) {
		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		_free_thread(thread);
		return NULL;
	}
	thread->id = g_thread_id++;
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
		      thread->id, thread->name);

	if (spdk_interrupt_mode_is_enabled()) {
		thread->in_interrupt = true;
		rc = thread_interrupt_create(thread);
		if (rc != 0) {
			_free_thread(thread);
			return NULL;
		}
	}

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
	}

	if (rc != 0) {
		_free_thread(thread);
		return NULL;
	}

	thread->state = SPDK_THREAD_STATE_RUNNING;

	return thread;
}
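
/*
 * Example (an illustrative sketch of how a framework might drive a lightweight
 * thread created above; error handling omitted):
 *
 *	spdk_thread_lib_init(NULL, 0);
 *	struct spdk_thread *thread = spdk_thread_create("worker", NULL);
 *
 *	while (!spdk_thread_is_exited(thread)) {
 *		spdk_thread_poll(thread, 0, 0);
 *	}
 *
 *	spdk_thread_destroy(thread);
 *	spdk_thread_lib_fini();
 *
 * Note that spdk_thread_exit() must be called from the thread's own context
 * (it asserts tls_thread == thread), e.g. from a message sent to it.
 */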

void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}

static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s exit timed out; moving it to the exited state forcefully\n",
			    thread->name);
		goto exited;
	}

	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

	if (thread->pending_unregister_count > 0) {
		SPDK_INFOLOG(thread,
			     "thread %s is still unregistering io_devices\n",
			     thread->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
}

int
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
		SPDK_INFOLOG(thread,
			     "thread %s is already exiting\n",
			     thread->name);
		return 0;
	}

	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
	thread->state = SPDK_THREAD_STATE_EXITING;
	return 0;
}

bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_EXITED;
}

void
spdk_thread_destroy(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);

	assert(thread->state == SPDK_THREAD_STATE_EXITED);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}

void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
	if (g_ctx_sz > 0) {
		return thread->ctx;
	}

	return NULL;
}

struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}

int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;

	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
		assert(false);
		return -ENOTSUP;
	}

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Called from non-SPDK thread\n");
		assert(false);
		return -EINVAL;
	}

	spdk_cpuset_copy(&thread->cpumask, cpumask);

	/* Invoke framework's reschedule operation. If this function is called multiple times
	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
	 * reschedule operation.
	 */
	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

	return 0;
}

struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}

static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];
	uint64_t notify = 1;
	int rc;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	/* In interrupt mode, re-raise the notification if messages remain in the
	 * ring so that the next spdk_fd_group_wait() wakes up again.
	 */
	if (spdk_unlikely(thread->in_interrupt) &&
	    spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the message at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}

static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *tmp __attribute__((unused));

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers tree by next scheduled run time
	 * as its key.
	 */
	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp == NULL);

	/* Update the cache only if it is empty or the inserted poller is earlier than it.
	 * RB_MIN() is not necessary here because any poller that has exactly the same
	 * next_run_tick as an existing poller is inserted on the right side.
	 */
	if (thread->first_timed_poller == NULL ||
	    poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
		thread->first_timed_poller = poller;
	}
}

#ifdef __linux__
static inline void
poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
{
	struct spdk_poller *tmp __attribute__((unused));

	tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp != NULL);

	/* This function is not used on any performance-critical path.
	 * If the cache needs to change, simply recompute it with RB_MIN().
	 */
	if (thread->first_timed_poller == poller) {
		thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
	}
}
#endif

static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store end time to use it as start time of the next spdk_thread_poll(). */
	thread->tsc_last = end;
}

static inline int
thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_WAITING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}

static inline int
thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
			    uint64_t now)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
	/* fallthrough */
	case SPDK_POLLER_STATE_WAITING:
		poller_insert_timer(thread, poller, now);
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}

static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	thread->tsc_last = now;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		poller_rc = thread_execute_poller(thread, poller);
		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	poller = thread->first_timed_poller;
	while (poller != NULL) {
		int timer_rc = 0;

		if (now < poller->next_run_tick) {
			break;
		}

		tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);

		/* Update the cache to the next timed poller in the tree
		 * only if the current poller is still the closest; otherwise,
		 * do nothing because the cache has already been updated.
		 */
		if (thread->first_timed_poller == poller) {
			thread->first_timed_poller = tmp;
		}

		timer_rc = thread_execute_timed_poller(thread, poller, now);
		if (timer_rc > rc) {
			rc = timer_rc;
		}

		poller = tmp;
	}

	return rc;
}
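
/*
 * Summary of the polling order implemented above: a critical message (if any)
 * runs first, then up to max_msgs queued messages, then every active poller
 * (round-robin), and finally all timed pollers whose next_run_tick has expired,
 * in ascending next_run_tick order starting from the cached first_timed_poller.
 * The return value is positive if any work was done (busy) and 0 if idle.
 */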

int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	struct spdk_thread *orig_thread;
	int rc;
	uint64_t notify = 1;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	if (spdk_likely(!thread->in_interrupt)) {
		rc = thread_poll(thread, max_msgs, now);
		if (spdk_unlikely(thread->in_interrupt)) {
			/* The thread transitioned to interrupt mode during the above poll.
			 * Poll it one more time in case a message was received without
			 * notification during the transition.
			 */
			rc = thread_poll(thread, max_msgs, now);
		}
	} else {
		/* Non-blocking wait on the thread's fd_group */
		rc = spdk_fd_group_wait(thread->fgrp, 0);
		if (spdk_unlikely(!thread->in_interrupt)) {
			/* The thread transitioned to poll mode while processing a message above.
			 * Clear msg_fd since thread messages will be polled directly in poll mode.
			 */
			rc = read(thread->msg_fd, &notify, sizeof(notify));
			if (rc < 0 && errno != EAGAIN) {
				SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
			}
		}
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
		thread_exit(thread, now);
	}

	thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}

uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = thread->first_timed_poller;
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}

int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}

static bool
thread_has_unpaused_pollers(struct spdk_thread *thread)
{
	if (TAILQ_EMPTY(&thread->active_pollers) &&
	    RB_EMPTY(&thread->timed_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
	if (!thread_has_unpaused_pollers(thread) &&
	    TAILQ_EMPTY(&thread->paused_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
	if (spdk_ring_count(thread->messages) ||
	    thread_has_unpaused_pollers(thread) ||
	    thread->critical_msg != NULL) {
		return false;
	}

	return true;
}

uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count.  We could acquire the
	 *  lock and iterate through the TAILQ of threads to count them, but that
	 *  count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}

struct spdk_thread *
spdk_get_thread(void)
{
	return _get_thread();
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

uint64_t
spdk_thread_get_id(const struct spdk_thread *thread)
{
	return thread->id;
}

struct spdk_thread *
spdk_thread_get_by_id(uint64_t id)
{
	struct spdk_thread *thread;

	if (id == 0 || id >= g_thread_id) {
		SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
		return NULL;
	}
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		if (thread->id == id) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);
	return thread;
}

int
spdk_thread_get_stats(struct spdk_thread_stats *stats)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		return -EINVAL;
	}

	if (stats == NULL) {
		return -EINVAL;
	}

	*stats = thread->stats;

	return 0;
}

uint64_t
spdk_thread_get_last_tsc(struct spdk_thread *thread)
{
	if (thread == NULL) {
		thread = _get_thread();
	}

	return thread->tsc_last;
}

static inline int
thread_send_msg_notification(const struct spdk_thread *target_thread)
{
	uint64_t notify = 1;
	int rc;

	/* Notification is not necessary if the interrupt facility is not enabled. */
	if (spdk_likely(!spdk_interrupt_mode_is_enabled())) {
		return 0;
	}

	/* Since each spdk_thread can switch between poll and interrupt mode dynamically,
	 * after sending a thread message we must check whether the target thread runs
	 * in interrupt mode and, only if so, do the event notification.
	 */
	if (spdk_unlikely(target_thread->in_interrupt)) {
		rc = write(target_thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
			return -EIO;
		}
	}

	return 0;
}

int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}
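
/*
 * Example (illustrative): handing work to another thread.  The callback runs
 * later, on the target thread's context, when that thread is polled:
 *
 *	static void
 *	hello_fn(void *ctx)
 *	{
 *		SPDK_NOTICELOG("running on %s\n", spdk_thread_get_name(spdk_get_thread()));
 *	}
 *
 *	rc = spdk_thread_send_msg(target_thread, hello_fn, NULL);
 *	assert(rc == 0);  (-ENOMEM or -EIO on failure, see above)
 */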

int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
	spdk_msg_fn expected = NULL;

	if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
					 __ATOMIC_SEQ_CST)) {
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

#ifdef __linux__
static int
interrupt_timerfd_process(void *arg)
{
	struct spdk_poller *poller = arg;
	uint64_t exp;
	int rc;

	/* Clear the level-triggered state of the interval timer. */
	rc = read(poller->interruptfd, &exp, sizeof(exp));
	if (rc < 0) {
		/* read() returns -1 and sets errno on failure, so check errno
		 * rather than the return value for EAGAIN. */
		if (errno == EAGAIN) {
			return 0;
		}

		return rc;
	}

	return poller->fn(poller->arg);
}

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	struct spdk_fd_group *fgrp = poller->thread->fgrp;
	int timerfd;
	int rc;

	SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
	if (timerfd < 0) {
		return -errno;
	}

	rc = spdk_fd_group_add(fgrp, timerfd,
			       interrupt_timerfd_process, poller);
	if (rc < 0) {
		close(timerfd);
		return rc;
	}

	poller->interruptfd = timerfd;
	return 0;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	int timerfd = poller->interruptfd;
	uint64_t now_tick = spdk_get_ticks();
	uint64_t ticks = spdk_get_ticks_hz();
	int ret;
	struct itimerspec new_tv = {};
	struct itimerspec old_tv = {};

	assert(poller->period_ticks != 0);
	assert(timerfd >= 0);

	SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
		      interrupt_mode ? "interrupt" : "poll");

	if (interrupt_mode) {
		/* Set repeated timer expiration */
		new_tv.it_interval.tv_sec = poller->period_ticks / ticks;
		new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks;

		/* Update next timer expiration */
		if (poller->next_run_tick == 0) {
			poller->next_run_tick = now_tick + poller->period_ticks;
		} else if (poller->next_run_tick < now_tick) {
			poller->next_run_tick = now_tick;
		}

		new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks;
		new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks;

		ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno);
			assert(false);
		}
	} else {
		/* Disarm the timer */
		ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv);
		if (ret < 0) {
			/* timerfd_settime's failure indicates that the timerfd is in error */
			SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno);
			assert(false);
		}

		/* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be
		 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC
		 */
		now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \
			   (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC;
		poller_remove_timer(poller->thread, poller);
		poller_insert_timer(poller->thread, poller, now_tick);
	}
}
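
/*
 * Worked example (illustrative) of the ticks-to-itimerspec conversion above:
 * with ticks = spdk_get_ticks_hz() = 1,000,000,000 (a hypothetical 1 GHz TSC)
 * and period_ticks = 2,500,000,000, the repeat interval becomes
 * it_interval.tv_sec = 2 and it_interval.tv_nsec = 500,000,000, i.e. 2.5 s.
 * Doing the modulo before multiplying by SPDK_SEC_TO_NSEC keeps the
 * intermediate product within 64 bits.
 */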

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
	SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name);
	assert(poller->interruptfd >= 0);
	spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd);
	close(poller->interruptfd);
	poller->interruptfd = -1;
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	int busy_efd;
	int rc;

	SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name);
	busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (busy_efd < 0) {
		SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name);
		return -errno;
	}

	rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, poller->fn, poller->arg);
	if (rc < 0) {
		close(busy_efd);
		return rc;
	}

	poller->interruptfd = busy_efd;
	return 0;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	int busy_efd = poller->interruptfd;
	uint64_t notify = 1;
	int rc __attribute__((unused));

	assert(busy_efd >= 0);

	if (interrupt_mode) {
		/* An eventfd that is written but never read stays readable, so the
		 * poller keeps getting triggered. */
		if (write(busy_efd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name);
		}
	} else {
		/* Reading the eventfd clears its level-triggered readable state. */
		rc = read(busy_efd, &notify, sizeof(notify));
	}
}

#else

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

#endif

void
spdk_poller_register_interrupt(struct spdk_poller *poller,
			       spdk_poller_set_interrupt_mode_cb cb_fn,
			       void *cb_arg)
{
	assert(poller != NULL);
	assert(cb_fn != NULL);
	assert(spdk_get_thread() == poller->thread);

	if (!spdk_interrupt_mode_is_enabled()) {
		return;
	}

	/* When a poller is created we don't know if the user is ever going to
	 * enable interrupts on it by calling this function, so the poller
	 * registration function has to immediately create an interruptfd.
	 * When this function does get called by the user, we have to then
	 * destroy that interruptfd.
	 */
	if (poller->set_intr_cb_fn && poller->interruptfd >= 0) {
		poller_interrupt_fini(poller);
	}

	poller->set_intr_cb_fn = cb_fn;
	poller->set_intr_cb_arg = cb_arg;

	/* Set the poller into interrupt mode if the thread is in interrupt mode. */
	if (poller->thread->in_interrupt) {
		poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
	}
}

static uint64_t
convert_us_to_ticks(uint64_t us)
{
	uint64_t quotient, remainder, ticks;

	if (us) {
		quotient = us / SPDK_SEC_TO_USEC;
		remainder = us % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		return 0;
	}
}
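
/*
 * Worked example (illustrative): with spdk_get_ticks_hz() = 2,300,000,000
 * (a hypothetical 2.3 GHz TSC) and us = 1500, quotient = 0 and remainder = 1500,
 * so the result is 0 + (2,300,000,000 * 1500) / 1,000,000 = 3,450,000 ticks.
 * Splitting into quotient and remainder avoids the 64-bit overflow that a
 * plain (ticks * us) / SPDK_SEC_TO_USEC would hit for large us values.
 */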

static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;
	poller->interruptfd = -1;

	poller->period_ticks = convert_us_to_ticks(period_microseconds);

	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (period_microseconds) {
			rc = period_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL);
		} else {
			/* If the poller doesn't have a period, create an interruptfd that's always
			 * busy automatically when running in interrupt mode.
			 */
			rc = busy_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL);
		}
	}

	thread_insert_poller(thread, poller);

	return poller;
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}
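
/*
 * Example (illustrative): registering a 10 ms periodic poller.  Poller
 * callbacks conventionally return a positive value when they did work and
 * 0 when idle:
 *
 *	static int
 *	check_queue(void *ctx)
 *	{
 *		...
 *		return did_work ? 1 : 0;
 *	}
 *
 *	struct spdk_poller *p = spdk_poller_register_named(check_queue, ctx,
 *							   10 * 1000, "check_queue");
 *	...
 *	spdk_poller_unregister(&p);
 */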

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n");
		assert(false);
		return;
	}

	if (spdk_interrupt_mode_is_enabled() && poller->interruptfd >= 0) {
		poller_interrupt_fini(poller);
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}

void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_poller_pause()\n");
		assert(false);
		return;
	}

	/* We just set its state to SPDK_POLLER_STATE_PAUSING and let
	 * spdk_thread_poll() move it. It allows a poller to be paused from
	 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE
	 * iteration, or from within itself without breaking the logic to always
	 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration.
	 */
	switch (poller->state) {
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_PAUSING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
	case SPDK_POLLER_STATE_WAITING:
		poller->state = SPDK_POLLER_STATE_PAUSING;
		break;
	default:
		assert(false);
		break;
	}
}

void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_poller_resume()\n");
		assert(false);
		return;
	}

	/* If a poller is paused it has to be removed from the paused pollers
	 * list and put on the active list or timer tree depending on its
	 * period_ticks.  If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list or tree.
	 */
	switch (poller->state) {
	case SPDK_POLLER_STATE_PAUSED:
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		thread_insert_poller(thread, poller);
	/* fallthrough */
	case SPDK_POLLER_STATE_PAUSING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	case SPDK_POLLER_STATE_RUNNING:
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}
}

const char *
spdk_poller_get_name(struct spdk_poller *poller)
{
	return poller->name;
}

const char *
spdk_poller_get_state_str(struct spdk_poller *poller)
{
	switch (poller->state) {
	case SPDK_POLLER_STATE_WAITING:
		return "waiting";
	case SPDK_POLLER_STATE_RUNNING:
		return "running";
	case SPDK_POLLER_STATE_UNREGISTERED:
		return "unregistered";
	case SPDK_POLLER_STATE_PAUSING:
		return "pausing";
	case SPDK_POLLER_STATE_PAUSED:
		return "paused";
	default:
		return NULL;
	}
}

uint64_t
spdk_poller_get_period_ticks(struct spdk_poller *poller)
{
	return poller->period_ticks;
}

void
spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats)
{
	stats->run_count = poller->run_count;
	stats->busy_count = poller->busy_count;
}

struct spdk_poller *
spdk_thread_get_first_active_poller(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->active_pollers);
}

struct spdk_poller *
spdk_thread_get_next_active_poller(struct spdk_poller *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

struct spdk_poller *
spdk_thread_get_first_timed_poller(struct spdk_thread *thread)
{
	return RB_MIN(timed_pollers_tree, &thread->timed_pollers);
}

struct spdk_poller *
spdk_thread_get_next_timed_poller(struct spdk_poller *prev)
{
	/* Note: RB_NEXT() ignores its head argument, so the out-of-scope
	 * 'thread' token below is discarded by the preprocessor and never
	 * evaluated. */
	return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev);
}

struct spdk_poller *
spdk_thread_get_first_paused_poller(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->paused_pollers);
}

struct spdk_poller *
spdk_thread_get_next_paused_poller(struct spdk_poller *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

struct spdk_io_channel *
spdk_thread_get_first_io_channel(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->io_channels);
}

struct spdk_io_channel *
spdk_thread_get_next_io_channel(struct spdk_io_channel *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};

static void
_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	assert(rc == 0);
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}
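
/*
 * Example (illustrative): broadcasting a configuration change to every thread
 * and getting a completion callback on the calling thread afterwards:
 *
 *	static void
 *	update_on_thread(void *ctx)
 *	{
 *		...apply the change for the current thread...
 *	}
 *
 *	static void
 *	update_done(void *ctx)
 *	{
 *		...all threads have run update_on_thread()...
 *	}
 *
 *	spdk_for_each_thread(update_on_thread, cfg, update_done);
 */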

static inline void
poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode)
{
	if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
		return;
	}

	if (!poller->set_intr_cb_fn) {
		SPDK_ERRLOG("Poller(%s) doesn't support setting interrupt mode.\n", poller->name);
		assert(false);
		return;
	}

	poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode);
}

void
spdk_thread_set_interrupt_mode(bool enable_interrupt)
{
	struct spdk_thread *thread = _get_thread();
	struct spdk_poller *poller, *tmp;

	assert(thread);
	assert(spdk_interrupt_mode_is_enabled());

	if (thread->in_interrupt == enable_interrupt) {
		return;
	}

	/* Set pollers to expected mode */
	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
		poller_set_interrupt_mode(poller, enable_interrupt);
	}
	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) {
		poller_set_interrupt_mode(poller, enable_interrupt);
	}
	/* All paused pollers will go to work in interrupt mode */
	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) {
		poller_set_interrupt_mode(poller, enable_interrupt);
	}

	thread->in_interrupt = enable_interrupt;
}

void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}
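
/*
 * Example (illustrative): the usual register / get / put pattern for an
 * io_device.  The callbacks and channel context type are hypothetical;
 * spdk_put_io_channel() and spdk_io_channel_get_ctx() come from the rest of
 * the spdk_thread API declared in spdk/thread.h:
 *
 *	spdk_io_device_register(&g_dev, dev_create_cb, dev_destroy_cb,
 *				sizeof(struct dev_channel_ctx), "my_dev");
 *
 *	struct spdk_io_channel *ch = spdk_get_io_channel(&g_dev);
 *	struct dev_channel_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *	...
 *	spdk_put_io_channel(ch);
 *	spdk_io_device_unregister(&g_dev, NULL);
 */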

static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	assert(thread == dev->unregister_thread);

	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	assert(thread->pending_unregister_count > 0);
	thread->pending_unregister_count--;

	dev->unregister_cb(dev->io_device);
	free(dev);
}

static void
io_device_free(struct io_device *dev)
{
	int rc __attribute__((unused));

	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
		assert(rc == 0);
	}
}
1935 
1936 void
1937 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
1938 {
1939 	struct io_device *dev;
1940 	uint32_t refcnt;
1941 	struct spdk_thread *thread;
1942 
1943 	thread = spdk_get_thread();
1944 	if (!thread) {
1945 		SPDK_ERRLOG("called from non-SPDK thread\n");
1946 		assert(false);
1947 		return;
1948 	}
1949 
1950 	pthread_mutex_lock(&g_devlist_mutex);
1951 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
1952 		if (dev->io_device == io_device) {
1953 			break;
1954 		}
1955 	}
1956 
1957 	if (!dev) {
1958 		SPDK_ERRLOG("io_device %p not found\n", io_device);
1959 		assert(false);
1960 		pthread_mutex_unlock(&g_devlist_mutex);
1961 		return;
1962 	}
1963 
1964 	if (dev->for_each_count > 0) {
1965 		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
1966 			    dev->name, io_device, dev->for_each_count);
1967 		pthread_mutex_unlock(&g_devlist_mutex);
1968 		return;
1969 	}
1970 
1971 	dev->unregister_cb = unregister_cb;
1972 	dev->unregistered = true;
1973 	TAILQ_REMOVE(&g_io_devices, dev, tailq);
1974 	refcnt = dev->refcnt;
1975 	dev->unregister_thread = thread;
1976 	pthread_mutex_unlock(&g_devlist_mutex);
1977 
1978 	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
1979 		      dev->name, dev->io_device, thread->name);
1980 
1981 	if (unregister_cb) {
1982 		thread->pending_unregister_count++;
1983 	}
1984 
1985 	if (refcnt > 0) {
1986 		/* defer deletion */
1987 		return;
1988 	}
1989 
1990 	io_device_free(dev);
1991 }
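
/*
 * Usage sketch (illustrative; my_dev_unregistered is hypothetical):
 * unregistration is asynchronous when channels still exist, and the optional
 * callback runs on this thread once the last channel has been destroyed.
 *
 *	static void
 *	my_dev_unregistered(void *io_device)
 *	{
 *		SPDK_NOTICELOG("io_device %p fully unregistered\n", io_device);
 *	}
 *
 *	spdk_io_device_unregister(&g_my_dev, my_dev_unregistered);
 */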
1992 
1993 const char *
1994 spdk_io_device_get_name(struct io_device *dev)
1995 {
1996 	return dev->name;
1997 }
1998 
1999 struct spdk_io_channel *
2000 spdk_get_io_channel(void *io_device)
2001 {
2002 	struct spdk_io_channel *ch;
2003 	struct spdk_thread *thread;
2004 	struct io_device *dev;
2005 	int rc;
2006 
2007 	pthread_mutex_lock(&g_devlist_mutex);
2008 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
2009 		if (dev->io_device == io_device) {
2010 			break;
2011 		}
2012 	}
2013 	if (dev == NULL) {
2014 		SPDK_ERRLOG("could not find io_device %p\n", io_device);
2015 		pthread_mutex_unlock(&g_devlist_mutex);
2016 		return NULL;
2017 	}
2018 
2019 	thread = _get_thread();
2020 	if (!thread) {
2021 		SPDK_ERRLOG("No thread allocated\n");
2022 		pthread_mutex_unlock(&g_devlist_mutex);
2023 		return NULL;
2024 	}
2025 
2026 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
2027 		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
2028 		pthread_mutex_unlock(&g_devlist_mutex);
2029 		return NULL;
2030 	}
2031 
2032 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
2033 		if (ch->dev == dev) {
2034 			ch->ref++;
2035 
2036 			SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2037 				      ch, dev->name, dev->io_device, thread->name, ch->ref);
2038 
2039 			/*
2040 			 * An I/O channel already exists for this device on this
2041 			 *  thread, so return it.
2042 			 */
2043 			pthread_mutex_unlock(&g_devlist_mutex);
2044 			return ch;
2045 		}
2046 	}
2047 
2048 	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
2049 	if (ch == NULL) {
2050 		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
2051 		pthread_mutex_unlock(&g_devlist_mutex);
2052 		return NULL;
2053 	}
2054 
2055 	ch->dev = dev;
2056 	ch->destroy_cb = dev->destroy_cb;
2057 	ch->thread = thread;
2058 	ch->ref = 1;
2059 	ch->destroy_ref = 0;
2060 	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);
2061 
2062 	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2063 		      ch, dev->name, dev->io_device, thread->name, ch->ref);
2064 
2065 	dev->refcnt++;
2066 
2067 	pthread_mutex_unlock(&g_devlist_mutex);
2068 
2069 	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
2070 	if (rc != 0) {
2071 		pthread_mutex_lock(&g_devlist_mutex);
2072 		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
2073 		dev->refcnt--;
2074 		free(ch);
2075 		pthread_mutex_unlock(&g_devlist_mutex);
2076 		return NULL;
2077 	}
2078 
2079 	return ch;
2080 }
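
/*
 * Usage sketch (illustrative, reusing the hypothetical g_my_dev and
 * my_channel_ctx from above): each SPDK thread obtains its own channel and
 * reaches its private per-thread context via spdk_io_channel_get_ctx().
 *
 *	struct spdk_io_channel *ch;
 *	struct my_channel_ctx *ctx;
 *
 *	ch = spdk_get_io_channel(&g_my_dev);
 *	if (ch == NULL) {
 *		return -ENOMEM;
 *	}
 *	ctx = spdk_io_channel_get_ctx(ch);
 *	ctx->io_outstanding++;
 */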
2081 
2082 static void
2083 put_io_channel(void *arg)
2084 {
2085 	struct spdk_io_channel *ch = arg;
2086 	bool do_remove_dev = true;
2087 	struct spdk_thread *thread;
2088 
2089 	thread = spdk_get_thread();
2090 	if (!thread) {
2091 		SPDK_ERRLOG("called from non-SPDK thread\n");
2092 		assert(false);
2093 		return;
2094 	}
2095 
2096 	SPDK_DEBUGLOG(thread,
2097 		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
2098 		      ch, ch->dev->name, ch->dev->io_device, thread->name);
2099 
2100 	assert(ch->thread == thread);
2101 
2102 	ch->destroy_ref--;
2103 
2104 	if (ch->ref > 0 || ch->destroy_ref > 0) {
2105 		/*
2106 		 * Another reference to the associated io_device was requested
2107 		 *  after this message was sent but before it had a chance to
2108 		 *  execute.
2109 		 */
2110 		return;
2111 	}
2112 
2113 	pthread_mutex_lock(&g_devlist_mutex);
2114 	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
2115 	pthread_mutex_unlock(&g_devlist_mutex);
2116 
2117 	/* Don't hold the devlist mutex while the destroy_cb is called. */
2118 	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));
2119 
2120 	pthread_mutex_lock(&g_devlist_mutex);
2121 	ch->dev->refcnt--;
2122 
2123 	if (!ch->dev->unregistered) {
2124 		do_remove_dev = false;
2125 	}
2126 
2127 	if (ch->dev->refcnt > 0) {
2128 		do_remove_dev = false;
2129 	}
2130 
2131 	pthread_mutex_unlock(&g_devlist_mutex);
2132 
2133 	if (do_remove_dev) {
2134 		io_device_free(ch->dev);
2135 	}
2136 	free(ch);
2137 }
2138 
2139 void
2140 spdk_put_io_channel(struct spdk_io_channel *ch)
2141 {
2142 	struct spdk_thread *thread;
2143 	int rc __attribute__((unused));
2144 
2145 	thread = spdk_get_thread();
2146 	if (!thread) {
2147 		SPDK_ERRLOG("called from non-SPDK thread\n");
2148 		assert(false);
2149 		return;
2150 	}
2151 
2152 	if (ch->thread != thread) {
2153 		SPDK_ERRLOG("io_channel is being put from a thread other than the one that called spdk_get_io_channel()\n");
2154 		assert(false);
2155 		return;
2156 	}
2157 
2158 	SPDK_DEBUGLOG(thread,
2159 		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2160 		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);
2161 
2162 	ch->ref--;
2163 
2164 	if (ch->ref == 0) {
2165 		ch->destroy_ref++;
2166 		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
2167 		assert(rc == 0);
2168 	}
2169 }
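
/*
 * Usage sketch (illustrative): every spdk_get_io_channel() must be balanced
 * by a put on the same thread.  Because destruction is deferred to a message,
 * the channel pointer must not be used after the final put.
 *
 *	spdk_put_io_channel(ch);
 */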
2170 
2171 struct spdk_io_channel *
2172 spdk_io_channel_from_ctx(void *ctx)
2173 {
2174 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
2175 }
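
/*
 * Sketch: the ctx buffer is allocated immediately after the channel itself
 * (see spdk_get_io_channel() above), so this is the exact inverse of
 * spdk_io_channel_get_ctx():
 *
 *	assert(spdk_io_channel_from_ctx(spdk_io_channel_get_ctx(ch)) == ch);
 */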
2176 
2177 struct spdk_thread *
2178 spdk_io_channel_get_thread(struct spdk_io_channel *ch)
2179 {
2180 	return ch->thread;
2181 }
2182 
2183 void *
2184 spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
2185 {
2186 	return ch->dev->io_device;
2187 }
2188 
2189 int
2190 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
2191 {
2192 	return ch->ref;
2193 }
2194 
2195 struct spdk_io_channel_iter {
2196 	void *io_device;
2197 	struct io_device *dev;
2198 	spdk_channel_msg fn;
2199 	int status;
2200 	void *ctx;
2201 	struct spdk_io_channel *ch;
2202 
2203 	struct spdk_thread *cur_thread;
2204 
2205 	struct spdk_thread *orig_thread;
2206 	spdk_channel_for_each_cpl cpl;
2207 };
2208 
2209 void *
2210 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
2211 {
2212 	return i->io_device;
2213 }
2214 
2215 struct spdk_io_channel *
2216 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
2217 {
2218 	return i->ch;
2219 }
2220 
2221 void *
2222 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
2223 {
2224 	return i->ctx;
2225 }
2226 
2227 static void
2228 _call_completion(void *ctx)
2229 {
2230 	struct spdk_io_channel_iter *i = ctx;
2231 
2232 	if (i->cpl != NULL) {
2233 		i->cpl(i, i->status);
2234 	}
2235 	free(i);
2236 }
2237 
2238 static void
2239 _call_channel(void *ctx)
2240 {
2241 	struct spdk_io_channel_iter *i = ctx;
2242 	struct spdk_io_channel *ch;
2243 
2244 	/*
2245 	 * It is possible that the channel was deleted before this
2246 	 *  message had a chance to execute.  If so, skip calling
2247 	 *  the fn() on this thread.
2248 	 */
2249 	pthread_mutex_lock(&g_devlist_mutex);
2250 	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
2251 		if (ch->dev->io_device == i->io_device) {
2252 			break;
2253 		}
2254 	}
2255 	pthread_mutex_unlock(&g_devlist_mutex);
2256 
2257 	if (ch) {
2258 		i->fn(i);
2259 	} else {
2260 		spdk_for_each_channel_continue(i, 0);
2261 	}
2262 }
2263 
2264 void
2265 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
2266 		      spdk_channel_for_each_cpl cpl)
2267 {
2268 	struct spdk_thread *thread;
2269 	struct spdk_io_channel *ch;
2270 	struct spdk_io_channel_iter *i;
2271 	int rc __attribute__((unused));
2272 
2273 	i = calloc(1, sizeof(*i));
2274 	if (!i) {
2275 		SPDK_ERRLOG("Unable to allocate iterator\n");
2276 		return;
2277 	}
2278 
2279 	i->io_device = io_device;
2280 	i->fn = fn;
2281 	i->ctx = ctx;
2282 	i->cpl = cpl;
2283 
2284 	pthread_mutex_lock(&g_devlist_mutex);
2285 	i->orig_thread = _get_thread();
2286 
2287 	TAILQ_FOREACH(thread, &g_threads, tailq) {
2288 		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
2289 			if (ch->dev->io_device == io_device) {
2290 				ch->dev->for_each_count++;
2291 				i->dev = ch->dev;
2292 				i->cur_thread = thread;
2293 				i->ch = ch;
2294 				pthread_mutex_unlock(&g_devlist_mutex);
2295 				rc = spdk_thread_send_msg(thread, _call_channel, i);
2296 				assert(rc == 0);
2297 				return;
2298 			}
2299 		}
2300 	}
2301 
2302 	pthread_mutex_unlock(&g_devlist_mutex);
2303 
2304 	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
2305 	assert(rc == 0);
2306 }
2307 
2308 void
2309 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
2310 {
2311 	struct spdk_thread *thread;
2312 	struct spdk_io_channel *ch;
2313 	int rc __attribute__((unused));
2314 
2315 	assert(i->cur_thread == spdk_get_thread());
2316 
2317 	i->status = status;
2318 
2319 	pthread_mutex_lock(&g_devlist_mutex);
2320 	if (status) {
2321 		goto end;
2322 	}
2323 	thread = TAILQ_NEXT(i->cur_thread, tailq);
2324 	while (thread) {
2325 		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
2326 			if (ch->dev->io_device == i->io_device) {
2327 				i->cur_thread = thread;
2328 				i->ch = ch;
2329 				pthread_mutex_unlock(&g_devlist_mutex);
2330 				rc = spdk_thread_send_msg(thread, _call_channel, i);
2331 				assert(rc == 0);
2332 				return;
2333 			}
2334 		}
2335 		thread = TAILQ_NEXT(thread, tailq);
2336 	}
2337 
2338 end:
2339 	i->dev->for_each_count--;
2340 	i->ch = NULL;
2341 	pthread_mutex_unlock(&g_devlist_mutex);
2342 
2343 	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
2344 	assert(rc == 0);
2345 }
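
/*
 * Usage sketch (illustrative; count_outstanding, count_done and g_total are
 * hypothetical): the message fn runs once on each thread owning a channel for
 * the io_device and must call spdk_for_each_channel_continue() exactly once;
 * a non-zero status stops the walk early.  The accumulator passed as ctx must
 * stay valid until the completion callback runs.
 *
 *	static void
 *	count_outstanding(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *		struct my_channel_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *		uint32_t *total = spdk_io_channel_iter_get_ctx(i);
 *
 *		*total += ctx->io_outstanding;
 *		spdk_for_each_channel_continue(i, 0);
 *	}
 *
 *	static void
 *	count_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		uint32_t *total = spdk_io_channel_iter_get_ctx(i);
 *
 *		SPDK_NOTICELOG("outstanding: %u (status %d)\n", *total, status);
 *	}
 *
 *	spdk_for_each_channel(&g_my_dev, count_outstanding, &g_total, count_done);
 */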
2346 
2347 struct spdk_interrupt {
2348 	int			efd;
2349 	struct spdk_thread	*thread;
2350 	char			name[SPDK_MAX_POLLER_NAME_LEN + 1];
2351 };
2352 
2353 static void
2354 thread_interrupt_destroy(struct spdk_thread *thread)
2355 {
2356 	struct spdk_fd_group *fgrp = thread->fgrp;
2357 
2358 	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);
2359 
2360 	if (thread->msg_fd < 0) {
2361 		return;
2362 	}
2363 
2364 	spdk_fd_group_remove(fgrp, thread->msg_fd);
2365 	close(thread->msg_fd);
2366 	thread->msg_fd = -1;
2367 
2368 	spdk_fd_group_destroy(fgrp);
2369 	thread->fgrp = NULL;
2370 }
2371 
2372 #ifdef __linux__
2373 static int
2374 thread_interrupt_msg_process(void *arg)
2375 {
2376 	struct spdk_thread *thread = arg;
2377 	uint32_t msg_count;
2378 	spdk_msg_fn critical_msg;
2379 	int rc = 0;
2380 	uint64_t notify = 1;
2381 
2382 	assert(spdk_interrupt_mode_is_enabled());
2383 
2384 	/* There may be a race between msg_acknowledge and another producer's msg_notify,
2385 	 * so msg_acknowledge is performed first, and only then is this thread's own
2386 	 * msg_notify checked.  This ordering avoids missing a msg notification.
2387 	 */
2388 	rc = read(thread->msg_fd, &notify, sizeof(notify));
2389 	if (rc < 0 && errno != EAGAIN) {
2390 		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
2391 	}
2392 
2393 	critical_msg = thread->critical_msg;
2394 	if (spdk_unlikely(critical_msg != NULL)) {
2395 		critical_msg(NULL);
2396 		thread->critical_msg = NULL;
2397 		rc = 1;
2398 	}
2399 
2400 	msg_count = msg_queue_run_batch(thread, 0);
2401 	if (msg_count) {
2402 		rc = 1;
2403 	}
2404 
2405 	return rc;
2406 }
2407 
2408 static int
2409 thread_interrupt_create(struct spdk_thread *thread)
2410 {
2411 	int rc;
2412 
2413 	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);
2414 
2415 	rc = spdk_fd_group_create(&thread->fgrp);
2416 	if (rc) {
2417 		return rc;
2418 	}
2419 
2420 	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
2421 	if (thread->msg_fd < 0) {
2422 		rc = -errno;
2423 		spdk_fd_group_destroy(thread->fgrp);
2424 		thread->fgrp = NULL;
2425 
2426 		return rc;
2427 	}
2428 
2429 	return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread);
2430 }
2431 #else
2432 static int
2433 thread_interrupt_create(struct spdk_thread *thread)
2434 {
2435 	return -ENOTSUP;
2436 }
2437 #endif
2438 
2439 struct spdk_interrupt *
2440 spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
2441 			void *arg, const char *name)
2442 {
2443 	struct spdk_thread *thread;
2444 	struct spdk_interrupt *intr;
2445 
2446 	thread = spdk_get_thread();
2447 	if (!thread) {
2448 		assert(false);
2449 		return NULL;
2450 	}
2451 
2452 	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
2453 		SPDK_ERRLOG("thread %s is not in a running state\n", thread->name);
2454 		return NULL;
2455 	}
2456 
2457 	if (spdk_fd_group_add(thread->fgrp, efd, fn, arg)) {
2458 		return NULL;
2459 	}
2460 
2461 	intr = calloc(1, sizeof(*intr));
2462 	if (intr == NULL) {
2463 		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		/* Drop the fd added above so a failed registration leaves no stale handler. */
		spdk_fd_group_remove(thread->fgrp, efd);
2464 		return NULL;
2465 	}
2466 
2467 	if (name) {
2468 		snprintf(intr->name, sizeof(intr->name), "%s", name);
2469 	} else {
2470 		snprintf(intr->name, sizeof(intr->name), "%p", fn);
2471 	}
2472 
2473 	intr->efd = efd;
2474 	intr->thread = thread;
2475 
2476 	return intr;
2477 }
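
/*
 * Usage sketch (illustrative; my_fd_handler is hypothetical): an eventfd
 * driven handler, registered on the current SPDK thread and later released
 * from that same thread.
 *
 *	static int
 *	my_fd_handler(void *arg)
 *	{
 *		int *efd = arg;
 *		uint64_t val;
 *
 *		read(*efd, &val, sizeof(val));
 *		return 1;
 *	}
 *
 *	int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 *	struct spdk_interrupt *intr;
 *
 *	intr = spdk_interrupt_register(efd, my_fd_handler, &efd, "my_intr");
 *	...
 *	spdk_interrupt_unregister(&intr);
 *	close(efd);
 */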
2478 
2479 void
2480 spdk_interrupt_unregister(struct spdk_interrupt **pintr)
2481 {
2482 	struct spdk_thread *thread;
2483 	struct spdk_interrupt *intr;
2484 
2485 	intr = *pintr;
2486 	if (intr == NULL) {
2487 		return;
2488 	}
2489 
2490 	*pintr = NULL;
2491 
2492 	thread = spdk_get_thread();
2493 	if (!thread) {
2494 		assert(false);
2495 		return;
2496 	}
2497 
2498 	if (intr->thread != thread) {
2499 		SPDK_ERRLOG("interrupt is being unregistered from a thread other than the one that called spdk_interrupt_register()\n");
2500 		assert(false);
2501 		return;
2502 	}
2503 
2504 	spdk_fd_group_remove(thread->fgrp, intr->efd);
2505 	free(intr);
2506 }
2507 
2508 int
2509 spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
2510 			       enum spdk_interrupt_event_types event_types)
2511 {
2512 	struct spdk_thread *thread;
2513 
2514 	thread = spdk_get_thread();
2515 	if (!thread) {
2516 		assert(false);
2517 		return -EINVAL;
2518 	}
2519 
2520 	if (intr->thread != thread) {
2521 		SPDK_ERRLOG("event types are being set from a thread other than the one that called spdk_interrupt_register()\n");
2522 		assert(false);
2523 		return -EINVAL;
2524 	}
2525 
2526 	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
2527 }
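
/*
 * Usage sketch (assumes the SPDK_INTERRUPT_EVENT_* values declared in
 * spdk/thread.h): a registered fd starts out watching read events; a handler
 * that also needs write readiness can widen the set.
 *
 *	spdk_interrupt_set_event_types(intr,
 *				       SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
 */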
2528 
2529 int
2530 spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
2531 {
2532 	return spdk_fd_group_get_fd(thread->fgrp);
2533 }
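
/*
 * Usage sketch (illustrative, Linux only; reactor_epfd is hypothetical): an
 * interrupt-mode reactor can sleep in its own epoll set by adding each
 * thread's fd-group fd, then polling a thread only when its fd fires.
 *
 *	struct epoll_event ev = { .events = EPOLLIN, .data.ptr = thread };
 *
 *	epoll_ctl(reactor_epfd, EPOLL_CTL_ADD,
 *		  spdk_thread_get_interrupt_fd(thread), &ev);
 */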
2534 
2535 static bool g_interrupt_mode = false;
2536 
2537 int
2538 spdk_interrupt_mode_enable(void)
2539 {
2540 	/* This must be called once, prior to initializing the threading library.
2541 	 * A non-NULL g_spdk_msg_mempool indicates the thread library is already initialized.
2542 	 */
2543 	if (g_spdk_msg_mempool) {
2544 		SPDK_ERRLOG("Failed: the threading library is already initialized.\n");
2545 		return -1;
2546 	}
2547 
2548 #ifdef __linux__
2549 	SPDK_NOTICELOG("Setting SPDK to run in interrupt mode.\n");
2550 	g_interrupt_mode = true;
2551 	return 0;
2552 #else
2553 	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
2554 	g_interrupt_mode = false;
2555 	return -ENOTSUP;
2556 #endif
2557 }
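
/*
 * Usage sketch (illustrative): interrupt mode has to be selected before the
 * threading library comes up.
 *
 *	if (spdk_interrupt_mode_enable() == 0) {
 *		assert(spdk_interrupt_mode_is_enabled());
 *	}
 *	spdk_thread_lib_init(NULL, 0);
 */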
2558 
2559 bool
2560 spdk_interrupt_mode_is_enabled(void)
2561 {
2562 	return g_interrupt_mode;
2563 }
2564 
2565 SPDK_LOG_REGISTER_COMPONENT(thread)
2566