xref: /spdk/lib/thread/thread.c (revision 49c6afbf128a130cd17d695170723fa4ff7699d0)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/likely.h"
38 #include "spdk/queue.h"
39 #include "spdk/string.h"
40 #include "spdk/thread.h"
41 #include "spdk/trace.h"
42 #include "spdk/tree.h"
43 #include "spdk/util.h"
44 #include "spdk/fd_group.h"
45 
46 #include "spdk/log.h"
47 #include "spdk_internal/thread.h"
48 #include "thread_internal.h"
49 
50 #ifdef __linux__
51 #include <sys/timerfd.h>
52 #include <sys/eventfd.h>
53 #endif
54 
55 #define SPDK_MSG_BATCH_SIZE		8
56 #define SPDK_MAX_DEVICE_NAME_LEN	256
57 #define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
58 #define SPDK_MAX_POLLER_NAME_LEN	256
59 #define SPDK_MAX_THREAD_NAME_LEN	256
60 
61 enum spdk_poller_state {
62 	/* The poller is registered with a thread but not currently executing its fn. */
63 	SPDK_POLLER_STATE_WAITING,
64 
65 	/* The poller is currently running its fn. */
66 	SPDK_POLLER_STATE_RUNNING,
67 
68 	/* The poller was unregistered during the execution of its fn. */
69 	SPDK_POLLER_STATE_UNREGISTERED,
70 
71 	/* The poller is in the process of being paused.  It will be paused
72 	 * the next time it is due to be executed.
73 	 */
74 	SPDK_POLLER_STATE_PAUSING,
75 
76 	/* The poller is registered but currently paused.  It's on the
77 	 * paused_pollers list.
78 	 */
79 	SPDK_POLLER_STATE_PAUSED,
80 };
81 
82 struct spdk_poller {
83 	TAILQ_ENTRY(spdk_poller)	tailq;
84 	RB_ENTRY(spdk_poller)		node;
85 
86 	/* Current state of the poller; should only be accessed from the poller's thread. */
87 	enum spdk_poller_state		state;
88 
89 	uint64_t			period_ticks;
90 	uint64_t			next_run_tick;
91 	uint64_t			run_count;
92 	uint64_t			busy_count;
93 	spdk_poller_fn			fn;
94 	void				*arg;
95 	struct spdk_thread		*thread;
96 	int				interruptfd;
97 	spdk_poller_set_interrupt_mode_cb set_intr_cb_fn;
98 	void				*set_intr_cb_arg;
99 
100 	char				name[SPDK_MAX_POLLER_NAME_LEN + 1];
101 };
102 
103 enum spdk_thread_state {
104 	/* The thread is processing pollers and messages by spdk_thread_poll(). */
105 	SPDK_THREAD_STATE_RUNNING,
106 
107 	/* The thread is in the process of terminating. It reaps unregistering
108 	 * pollers and releases I/O channels.
109 	 */
110 	SPDK_THREAD_STATE_EXITING,
111 
112 	/* The thread has exited. It is ready to be destroyed by spdk_thread_destroy(). */
113 	SPDK_THREAD_STATE_EXITED,
114 };
115 
116 struct spdk_thread {
117 	uint64_t			tsc_last;
118 	struct spdk_thread_stats	stats;
119 	/*
120 	 * Contains pollers actively running on this thread.  Pollers
121 	 *  are run round-robin. The thread takes one poller from the head
122 	 *  of the ring, executes it, then puts it back at the tail of
123 	 *  the ring.
124 	 */
125 	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;
126 	/**
127 	 * Contains pollers running on this thread with a periodic timer.
128 	 */
129 	RB_HEAD(timed_pollers_tree, spdk_poller)	timed_pollers;
130 	struct spdk_poller				*first_timed_poller;
131 	/*
132 	 * Contains paused pollers.  Pollers on this queue are waiting until
133 	 * they are resumed (in which case they're put onto the active/timer
134 	 * queues) or unregistered.
135 	 */
136 	TAILQ_HEAD(paused_pollers_head, spdk_poller)	paused_pollers;
137 	struct spdk_ring		*messages;
138 	int				msg_fd;
139 	SLIST_HEAD(, spdk_msg)		msg_cache;
140 	size_t				msg_cache_count;
141 	spdk_msg_fn			critical_msg;
142 	uint64_t			id;
143 	enum spdk_thread_state		state;
144 	int				pending_unregister_count;
145 
146 	TAILQ_HEAD(, spdk_io_channel)	io_channels;
147 	TAILQ_ENTRY(spdk_thread)	tailq;
148 
149 	char				name[SPDK_MAX_THREAD_NAME_LEN + 1];
150 	struct spdk_cpuset		cpumask;
151 	uint64_t			exit_timeout_tsc;
152 
153 	/* Indicates whether this spdk_thread currently runs in interrupt mode. */
154 	bool				in_interrupt;
155 	struct spdk_fd_group		*fgrp;
156 
157 	/* User context allocated at the end */
158 	uint8_t				ctx[0];
159 };
160 
161 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;
162 
163 static spdk_new_thread_fn g_new_thread_fn = NULL;
164 static spdk_thread_op_fn g_thread_op_fn = NULL;
165 static spdk_thread_op_supported_fn g_thread_op_supported_fn;
166 static size_t g_ctx_sz = 0;
167 /* A monotonically increasing ID is assigned to each created thread, beginning at 1.
168  * Once the ID wraps around to 0 (it can never exceed UINT64_MAX), further thread
169  * creation is not allowed and the SPDK application must be restarted.
170  */
171 static uint64_t g_thread_id = 1;
172 
173 struct io_device {
174 	void				*io_device;
175 	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
176 	spdk_io_channel_create_cb	create_cb;
177 	spdk_io_channel_destroy_cb	destroy_cb;
178 	spdk_io_device_unregister_cb	unregister_cb;
179 	struct spdk_thread		*unregister_thread;
180 	uint32_t			ctx_size;
181 	uint32_t			for_each_count;
182 	RB_ENTRY(io_device)		node;
183 
184 	uint32_t			refcnt;
185 
186 	bool				unregistered;
187 };
188 
189 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices);
190 
191 static int
192 io_device_cmp(struct io_device *dev1, struct io_device *dev2)
193 {
194 	return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device);
195 }
196 
197 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp);
198 
199 struct spdk_msg {
200 	spdk_msg_fn		fn;
201 	void			*arg;
202 
203 	SLIST_ENTRY(spdk_msg)	link;
204 };
205 
206 #define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
207 static struct spdk_mempool *g_spdk_msg_mempool = NULL;
208 
209 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
210 static uint32_t g_thread_count = 0;
211 
212 static __thread struct spdk_thread *tls_thread = NULL;
213 
214 #define TRACE_GROUP_THREAD		0xa
215 #define TRACE_THREAD_IOCH_GET   SPDK_TPOINT_ID(TRACE_GROUP_THREAD, 0x0)
216 #define TRACE_THREAD_IOCH_PUT   SPDK_TPOINT_ID(TRACE_GROUP_THREAD, 0x1)
217 
218 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD)
219 {
220 	spdk_trace_register_description("THREAD_IOCH_GET",
221 					TRACE_THREAD_IOCH_GET,
222 					OWNER_NONE, OBJECT_NONE, 0,
223 					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
224 	spdk_trace_register_description("THREAD_IOCH_PUT",
225 					TRACE_THREAD_IOCH_PUT,
226 					OWNER_NONE, OBJECT_NONE, 0,
227 					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
228 }
229 
230 /*
231  * If this compare function returns zero when two next_run_ticks are equal,
232  * the macro RB_INSERT() returns a pointer to the element with the same
233  * next_run_tick.
234  *
235  * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element
236  * to remove as a parameter.
237  *
238  * Hence we allow RB_INSERT() to insert elements with the same keys on the right
239  * side by returning 1 when two next_run_ticks are equal.
240  */
241 static inline int
242 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2)
243 {
244 	if (poller1->next_run_tick < poller2->next_run_tick) {
245 		return -1;
246 	} else {
247 		return 1;
248 	}
249 }
250 
251 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare);
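/*
 * Illustrative note (added commentary, not in the original source): because
 * timed_poller_compare() never returns 0, pollers that share a next_run_tick
 * keep their insertion order.  E.g. if pollers A, B and C are registered in
 * that order with next_run_tick == 100, in-order iteration visits A, B, C,
 * so equally scheduled pollers are serviced FIFO.
 */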
252 
253 static inline struct spdk_thread *
254 _get_thread(void)
255 {
256 	return tls_thread;
257 }
258 
259 static int
260 _thread_lib_init(size_t ctx_sz)
261 {
262 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
263 
264 	g_ctx_sz = ctx_sz;
265 
266 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
267 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
268 			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
269 			     sizeof(struct spdk_msg),
270 			     0, /* No cache. We do our own. */
271 			     SPDK_ENV_SOCKET_ID_ANY);
272 
273 	if (!g_spdk_msg_mempool) {
274 		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
275 		return -1;
276 	}
277 
278 	return 0;
279 }
280 
281 int
282 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
283 {
284 	assert(g_new_thread_fn == NULL);
285 	assert(g_thread_op_fn == NULL);
286 
287 	if (new_thread_fn == NULL) {
288 		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
289 	} else {
290 		g_new_thread_fn = new_thread_fn;
291 	}
292 
293 	return _thread_lib_init(ctx_sz);
294 }
295 
296 int
297 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
298 			 spdk_thread_op_supported_fn thread_op_supported_fn,
299 			 size_t ctx_sz)
300 {
301 	assert(g_new_thread_fn == NULL);
302 	assert(g_thread_op_fn == NULL);
303 	assert(g_thread_op_supported_fn == NULL);
304 
305 	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
306 		SPDK_ERRLOG("Both must be defined or undefined together.\n");
307 		return -EINVAL;
308 	}
309 
310 	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
311 		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
312 	} else {
313 		g_thread_op_fn = thread_op_fn;
314 		g_thread_op_supported_fn = thread_op_supported_fn;
315 	}
316 
317 	return _thread_lib_init(ctx_sz);
318 }
319 
320 void
321 spdk_thread_lib_fini(void)
322 {
323 	struct io_device *dev;
324 
325 	RB_FOREACH(dev, io_device_tree, &g_io_devices) {
326 		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
327 	}
328 
329 	if (g_spdk_msg_mempool) {
330 		spdk_mempool_free(g_spdk_msg_mempool);
331 		g_spdk_msg_mempool = NULL;
332 	}
333 
334 	g_new_thread_fn = NULL;
335 	g_thread_op_fn = NULL;
336 	g_thread_op_supported_fn = NULL;
337 	g_ctx_sz = 0;
338 }
339 
340 static void thread_interrupt_destroy(struct spdk_thread *thread);
341 static int thread_interrupt_create(struct spdk_thread *thread);
342 
343 static void
344 _free_thread(struct spdk_thread *thread)
345 {
346 	struct spdk_io_channel *ch;
347 	struct spdk_msg *msg;
348 	struct spdk_poller *poller, *ptmp;
349 
350 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
351 		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
352 			    thread->name, ch->dev->name);
353 	}
354 
355 	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
356 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
357 			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
358 				     poller->name);
359 		}
360 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
361 		free(poller);
362 	}
363 
364 	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
365 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
366 			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
367 				     poller->name);
368 		}
369 		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
370 		free(poller);
371 	}
372 
373 	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
374 		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
375 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
376 		free(poller);
377 	}
378 
379 	pthread_mutex_lock(&g_devlist_mutex);
380 	assert(g_thread_count > 0);
381 	g_thread_count--;
382 	TAILQ_REMOVE(&g_threads, thread, tailq);
383 	pthread_mutex_unlock(&g_devlist_mutex);
384 
385 	msg = SLIST_FIRST(&thread->msg_cache);
386 	while (msg != NULL) {
387 		SLIST_REMOVE_HEAD(&thread->msg_cache, link);
388 
389 		assert(thread->msg_cache_count > 0);
390 		thread->msg_cache_count--;
391 		spdk_mempool_put(g_spdk_msg_mempool, msg);
392 
393 		msg = SLIST_FIRST(&thread->msg_cache);
394 	}
395 
396 	assert(thread->msg_cache_count == 0);
397 
398 	if (spdk_interrupt_mode_is_enabled()) {
399 		thread_interrupt_destroy(thread);
400 	}
401 
402 	spdk_ring_free(thread->messages);
403 	free(thread);
404 }
405 
406 struct spdk_thread *
407 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
408 {
409 	struct spdk_thread *thread;
410 	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
411 	int rc = 0, i;
412 
413 	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
414 	if (!thread) {
415 		SPDK_ERRLOG("Unable to allocate memory for thread\n");
416 		return NULL;
417 	}
418 
419 	if (cpumask) {
420 		spdk_cpuset_copy(&thread->cpumask, cpumask);
421 	} else {
422 		spdk_cpuset_negate(&thread->cpumask);
423 	}
424 
425 	TAILQ_INIT(&thread->io_channels);
426 	TAILQ_INIT(&thread->active_pollers);
427 	RB_INIT(&thread->timed_pollers);
428 	TAILQ_INIT(&thread->paused_pollers);
429 	SLIST_INIT(&thread->msg_cache);
430 	thread->msg_cache_count = 0;
431 
432 	thread->tsc_last = spdk_get_ticks();
433 
434 	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
435 	if (!thread->messages) {
436 		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
437 		free(thread);
438 		return NULL;
439 	}
440 
441 	/* Fill the local message pool cache. */
442 	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
443 	if (rc == 0) {
444 		/* If we can't populate the cache it's ok. The cache will get filled
445 		 * up organically as messages are passed to the thread. */
446 		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
447 			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
448 			thread->msg_cache_count++;
449 		}
450 	}
451 
452 	if (name) {
453 		snprintf(thread->name, sizeof(thread->name), "%s", name);
454 	} else {
455 		snprintf(thread->name, sizeof(thread->name), "%p", thread);
456 	}
457 
458 	pthread_mutex_lock(&g_devlist_mutex);
459 	if (g_thread_id == 0) {
460 		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
461 		pthread_mutex_unlock(&g_devlist_mutex);
462 		_free_thread(thread);
463 		return NULL;
464 	}
465 	thread->id = g_thread_id++;
466 	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
467 	g_thread_count++;
468 	pthread_mutex_unlock(&g_devlist_mutex);
469 
470 	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
471 		      thread->id, thread->name);
472 
473 	if (spdk_interrupt_mode_is_enabled()) {
474 		thread->in_interrupt = true;
475 		rc = thread_interrupt_create(thread);
476 		if (rc != 0) {
477 			_free_thread(thread);
478 			return NULL;
479 		}
480 	}
481 
482 	if (g_new_thread_fn) {
483 		rc = g_new_thread_fn(thread);
484 	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
485 		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
486 	}
487 
488 	if (rc != 0) {
489 		_free_thread(thread);
490 		return NULL;
491 	}
492 
493 	thread->state = SPDK_THREAD_STATE_RUNNING;
494 
495 	return thread;
496 }
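/*
 * Illustrative lifecycle sketch (hypothetical caller code, not part of this
 * file): the framework that owns the pthread is expected to drive the
 * spdk_thread roughly like this.
 *
 *	spdk_thread_lib_init(NULL, 0);
 *	struct spdk_thread *thread = spdk_thread_create("worker", NULL);
 *
 *	while (!done) {                       // "done" is a hypothetical flag
 *		spdk_thread_poll(thread, 0, 0);   // 0 msgs = default batch; 0 = sample TSC now
 *	}
 *
 *	spdk_thread_exit(thread);
 *	while (!spdk_thread_is_exited(thread)) {
 *		spdk_thread_poll(thread, 0, 0);   // lets the EXITING state drain pollers/channels
 *	}
 *	spdk_thread_destroy(thread);
 *	spdk_thread_lib_fini();
 */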
497 
498 void
499 spdk_set_thread(struct spdk_thread *thread)
500 {
501 	tls_thread = thread;
502 }
503 
504 static void
505 thread_exit(struct spdk_thread *thread, uint64_t now)
506 {
507 	struct spdk_poller *poller;
508 	struct spdk_io_channel *ch;
509 
510 	if (now >= thread->exit_timeout_tsc) {
511 		SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n",
512 			    thread->name);
513 		goto exited;
514 	}
515 
516 	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
517 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
518 			SPDK_INFOLOG(thread,
519 				     "thread %s still has active poller %s\n",
520 				     thread->name, poller->name);
521 			return;
522 		}
523 	}
524 
525 	RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
526 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
527 			SPDK_INFOLOG(thread,
528 				     "thread %s still has active timed poller %s\n",
529 				     thread->name, poller->name);
530 			return;
531 		}
532 	}
533 
534 	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
535 		SPDK_INFOLOG(thread,
536 			     "thread %s still has paused poller %s\n",
537 			     thread->name, poller->name);
538 		return;
539 	}
540 
541 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
542 		SPDK_INFOLOG(thread,
543 			     "thread %s still has channel for io_device %s\n",
544 			     thread->name, ch->dev->name);
545 		return;
546 	}
547 
548 	if (thread->pending_unregister_count > 0) {
549 		SPDK_INFOLOG(thread,
550 			     "thread %s is still unregistering io_devices\n",
551 			     thread->name);
552 		return;
553 	}
554 
555 exited:
556 	thread->state = SPDK_THREAD_STATE_EXITED;
557 }
558 
559 int
560 spdk_thread_exit(struct spdk_thread *thread)
561 {
562 	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);
563 
564 	assert(tls_thread == thread);
565 
566 	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
567 		SPDK_INFOLOG(thread,
568 			     "thread %s is already exiting\n",
569 			     thread->name);
570 		return 0;
571 	}
572 
573 	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
574 				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
575 	thread->state = SPDK_THREAD_STATE_EXITING;
576 	return 0;
577 }
578 
579 bool
580 spdk_thread_is_exited(struct spdk_thread *thread)
581 {
582 	return thread->state == SPDK_THREAD_STATE_EXITED;
583 }
584 
585 void
586 spdk_thread_destroy(struct spdk_thread *thread)
587 {
588 	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);
589 
590 	assert(thread->state == SPDK_THREAD_STATE_EXITED);
591 
592 	if (tls_thread == thread) {
593 		tls_thread = NULL;
594 	}
595 
596 	_free_thread(thread);
597 }
598 
599 void *
600 spdk_thread_get_ctx(struct spdk_thread *thread)
601 {
602 	if (g_ctx_sz > 0) {
603 		return thread->ctx;
604 	}
605 
606 	return NULL;
607 }
608 
609 struct spdk_cpuset *
610 spdk_thread_get_cpumask(struct spdk_thread *thread)
611 {
612 	return &thread->cpumask;
613 }
614 
615 int
616 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
617 {
618 	struct spdk_thread *thread;
619 
620 	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
621 		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
622 		assert(false);
623 		return -ENOTSUP;
624 	}
625 
626 	thread = spdk_get_thread();
627 	if (!thread) {
628 		SPDK_ERRLOG("Called from non-SPDK thread\n");
629 		assert(false);
630 		return -EINVAL;
631 	}
632 
633 	spdk_cpuset_copy(&thread->cpumask, cpumask);
634 
635 	/* Invoke framework's reschedule operation. If this function is called multiple times
636 	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
637 	 * reschedule operation.
638 	 */
639 	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
640 
641 	return 0;
642 }
643 
644 struct spdk_thread *
645 spdk_thread_get_from_ctx(void *ctx)
646 {
647 	if (ctx == NULL) {
648 		assert(false);
649 		return NULL;
650 	}
651 
652 	assert(g_ctx_sz > 0);
653 
654 	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
655 }
656 
657 static inline uint32_t
658 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
659 {
660 	unsigned count, i;
661 	void *messages[SPDK_MSG_BATCH_SIZE];
662 	uint64_t notify = 1;
663 	int rc;
664 
665 #ifdef DEBUG
666 	/*
667 	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
668 	 * so we will never actually read uninitialized data from messages, but just to be sure
669 	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
670 	 */
671 	memset(messages, 0, sizeof(messages));
672 #endif
673 
674 	if (max_msgs > 0) {
675 		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
676 	} else {
677 		max_msgs = SPDK_MSG_BATCH_SIZE;
678 	}
679 
680 	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
681 	if (spdk_unlikely(thread->in_interrupt) &&
682 	    spdk_ring_count(thread->messages) != 0) {
683 		rc = write(thread->msg_fd, &notify, sizeof(notify));
684 		if (rc < 0) {
685 			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
686 		}
687 	}
688 	if (count == 0) {
689 		return 0;
690 	}
691 
692 	for (i = 0; i < count; i++) {
693 		struct spdk_msg *msg = messages[i];
694 
695 		assert(msg != NULL);
696 		msg->fn(msg->arg);
697 
698 		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
699 			/* Insert the message at the head. We want to re-use the hot
700 			 * ones. */
701 			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
702 			thread->msg_cache_count++;
703 		} else {
704 			spdk_mempool_put(g_spdk_msg_mempool, msg);
705 		}
706 	}
707 
708 	return count;
709 }
710 
711 static void
712 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
713 {
714 	struct spdk_poller *tmp __attribute__((unused));
715 
716 	poller->next_run_tick = now + poller->period_ticks;
717 
718 	/*
719 	 * Insert poller in the thread's timed_pollers tree by next scheduled run time
720 	 * as its key.
721 	 */
722 	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
723 	assert(tmp == NULL);
724 
725 	/* Update the cache only if it is empty or the inserted poller is earlier than it.
726 	 * RB_MIN() is not necessary here because any poller that has exactly the same
727 	 * next_run_tick as an existing poller is inserted on the right side.
728 	 */
729 	if (thread->first_timed_poller == NULL ||
730 	    poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
731 		thread->first_timed_poller = poller;
732 	}
733 }
734 
735 #ifdef __linux__
736 static inline void
737 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
738 {
739 	struct spdk_poller *tmp __attribute__((unused));
740 
741 	tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
742 	assert(tmp != NULL);
743 
744 	/* This function is not used on any performance-critical path.
745 	 * Simply update the cache via RB_MIN() if it needs to change.
746 	 */
747 	if (thread->first_timed_poller == poller) {
748 		thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
749 	}
750 }
751 #endif
752 
753 static void
754 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
755 {
756 	if (poller->period_ticks) {
757 		poller_insert_timer(thread, poller, spdk_get_ticks());
758 	} else {
759 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
760 	}
761 }
762 
763 static inline void
764 thread_update_stats(struct spdk_thread *thread, uint64_t end,
765 		    uint64_t start, int rc)
766 {
767 	if (rc == 0) {
768 		/* Poller status idle */
769 		thread->stats.idle_tsc += end - start;
770 	} else if (rc > 0) {
771 		/* Poller status busy */
772 		thread->stats.busy_tsc += end - start;
773 	}
774 	/* Store end time to use it as start time of the next spdk_thread_poll(). */
775 	thread->tsc_last = end;
776 }
777 
778 static inline int
779 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
780 {
781 	int rc;
782 
783 	switch (poller->state) {
784 	case SPDK_POLLER_STATE_UNREGISTERED:
785 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
786 		free(poller);
787 		return 0;
788 	case SPDK_POLLER_STATE_PAUSING:
789 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
790 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
791 		poller->state = SPDK_POLLER_STATE_PAUSED;
792 		return 0;
793 	case SPDK_POLLER_STATE_WAITING:
794 		break;
795 	default:
796 		assert(false);
797 		break;
798 	}
799 
800 	poller->state = SPDK_POLLER_STATE_RUNNING;
801 	rc = poller->fn(poller->arg);
802 
803 	poller->run_count++;
804 	if (rc > 0) {
805 		poller->busy_count++;
806 	}
807 
808 #ifdef DEBUG
809 	if (rc == -1) {
810 		SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
811 	}
812 #endif
813 
814 	switch (poller->state) {
815 	case SPDK_POLLER_STATE_UNREGISTERED:
816 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
817 		free(poller);
818 		break;
819 	case SPDK_POLLER_STATE_PAUSING:
820 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
821 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
822 		poller->state = SPDK_POLLER_STATE_PAUSED;
823 		break;
824 	case SPDK_POLLER_STATE_PAUSED:
825 	case SPDK_POLLER_STATE_WAITING:
826 		break;
827 	case SPDK_POLLER_STATE_RUNNING:
828 		poller->state = SPDK_POLLER_STATE_WAITING;
829 		break;
830 	default:
831 		assert(false);
832 		break;
833 	}
834 
835 	return rc;
836 }
837 
838 static inline int
839 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
840 			    uint64_t now)
841 {
842 	int rc;
843 
844 	switch (poller->state) {
845 	case SPDK_POLLER_STATE_UNREGISTERED:
846 		free(poller);
847 		return 0;
848 	case SPDK_POLLER_STATE_PAUSING:
849 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
850 		poller->state = SPDK_POLLER_STATE_PAUSED;
851 		return 0;
852 	case SPDK_POLLER_STATE_WAITING:
853 		break;
854 	default:
855 		assert(false);
856 		break;
857 	}
858 
859 	poller->state = SPDK_POLLER_STATE_RUNNING;
860 	rc = poller->fn(poller->arg);
861 
862 	poller->run_count++;
863 	if (rc > 0) {
864 		poller->busy_count++;
865 	}
866 
867 #ifdef DEBUG
868 	if (rc == -1) {
869 		SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
870 	}
871 #endif
872 
873 	switch (poller->state) {
874 	case SPDK_POLLER_STATE_UNREGISTERED:
875 		free(poller);
876 		break;
877 	case SPDK_POLLER_STATE_PAUSING:
878 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
879 		poller->state = SPDK_POLLER_STATE_PAUSED;
880 		break;
881 	case SPDK_POLLER_STATE_PAUSED:
882 		break;
883 	case SPDK_POLLER_STATE_RUNNING:
884 		poller->state = SPDK_POLLER_STATE_WAITING;
885 	/* fallthrough */
886 	case SPDK_POLLER_STATE_WAITING:
887 		poller_insert_timer(thread, poller, now);
888 		break;
889 	default:
890 		assert(false);
891 		break;
892 	}
893 
894 	return rc;
895 }
896 
897 static int
898 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
899 {
900 	uint32_t msg_count;
901 	struct spdk_poller *poller, *tmp;
902 	spdk_msg_fn critical_msg;
903 	int rc = 0;
904 
905 	thread->tsc_last = now;
906 
907 	critical_msg = thread->critical_msg;
908 	if (spdk_unlikely(critical_msg != NULL)) {
909 		critical_msg(NULL);
910 		thread->critical_msg = NULL;
911 		rc = 1;
912 	}
913 
914 	msg_count = msg_queue_run_batch(thread, max_msgs);
915 	if (msg_count) {
916 		rc = 1;
917 	}
918 
919 	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
920 				   active_pollers_head, tailq, tmp) {
921 		int poller_rc;
922 
923 		poller_rc = thread_execute_poller(thread, poller);
924 		if (poller_rc > rc) {
925 			rc = poller_rc;
926 		}
927 	}
928 
929 	poller = thread->first_timed_poller;
930 	while (poller != NULL) {
931 		int timer_rc = 0;
932 
933 		if (now < poller->next_run_tick) {
934 			break;
935 		}
936 
937 		tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
938 		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
939 
940 		/* Update the cache to the next timed poller in the list
941 		 * only if the current poller is still the closest; otherwise,
942 		 * do nothing because the cache has already been updated.
943 		 */
944 		if (thread->first_timed_poller == poller) {
945 			thread->first_timed_poller = tmp;
946 		}
947 
948 		timer_rc = thread_execute_timed_poller(thread, poller, now);
949 		if (timer_rc > rc) {
950 			rc = timer_rc;
951 		}
952 
953 		poller = tmp;
954 	}
955 
956 	return rc;
957 }
958 
959 int
960 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
961 {
962 	struct spdk_thread *orig_thread;
963 	int rc;
964 	uint64_t notify = 1;
965 
966 	orig_thread = _get_thread();
967 	tls_thread = thread;
968 
969 	if (now == 0) {
970 		now = spdk_get_ticks();
971 	}
972 
973 	if (spdk_likely(!thread->in_interrupt)) {
974 		rc = thread_poll(thread, max_msgs, now);
975 		if (spdk_unlikely(thread->in_interrupt)) {
976 			/* The thread transitioned to interrupt mode during the above poll.
977 			 * Poll it one more time in case a msg was received during the
978 			 * transition without notification.
979 			 */
980 			rc = thread_poll(thread, max_msgs, now);
981 		}
982 	} else {
983 		/* Non-blocking wait on the thread's fd_group */
984 		rc = spdk_fd_group_wait(thread->fgrp, 0);
985 		if (spdk_unlikely(!thread->in_interrupt)) {
986 			/* The thread transitioned to poll mode via a msg handled during the above processing.
987 			 * Drain msg_fd since thread messages will be polled directly in poll mode.
988 			 */
989 			rc = read(thread->msg_fd, &notify, sizeof(notify));
990 			if (rc < 0 && errno != EAGAIN) {
991 				SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
992 			}
993 		}
994 	}
995 
996 
997 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
998 		thread_exit(thread, now);
999 	}
1000 
1001 	thread_update_stats(thread, spdk_get_ticks(), now, rc);
1002 
1003 	tls_thread = orig_thread;
1004 
1005 	return rc;
1006 }
1007 
1008 uint64_t
1009 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
1010 {
1011 	struct spdk_poller *poller;
1012 
1013 	poller = thread->first_timed_poller;
1014 	if (poller) {
1015 		return poller->next_run_tick;
1016 	}
1017 
1018 	return 0;
1019 }
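/*
 * Illustrative sketch (hypothetical framework code, not part of this file):
 * in interrupt mode, the next expiration can be turned into an epoll timeout.
 *
 *	uint64_t next = spdk_thread_next_poller_expiration(thread);
 *	uint64_t now = spdk_get_ticks();
 *	int timeout_ms = -1;                      // no timed poller: block indefinitely
 *
 *	if (next != 0) {
 *		timeout_ms = next > now ?
 *			     (next - now) * 1000 / spdk_get_ticks_hz() : 0;
 *	}
 */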
1020 
1021 int
1022 spdk_thread_has_active_pollers(struct spdk_thread *thread)
1023 {
1024 	return !TAILQ_EMPTY(&thread->active_pollers);
1025 }
1026 
1027 static bool
1028 thread_has_unpaused_pollers(struct spdk_thread *thread)
1029 {
1030 	if (TAILQ_EMPTY(&thread->active_pollers) &&
1031 	    RB_EMPTY(&thread->timed_pollers)) {
1032 		return false;
1033 	}
1034 
1035 	return true;
1036 }
1037 
1038 bool
1039 spdk_thread_has_pollers(struct spdk_thread *thread)
1040 {
1041 	if (!thread_has_unpaused_pollers(thread) &&
1042 	    TAILQ_EMPTY(&thread->paused_pollers)) {
1043 		return false;
1044 	}
1045 
1046 	return true;
1047 }
1048 
1049 bool
1050 spdk_thread_is_idle(struct spdk_thread *thread)
1051 {
1052 	if (spdk_ring_count(thread->messages) ||
1053 	    thread_has_unpaused_pollers(thread) ||
1054 	    thread->critical_msg != NULL) {
1055 		return false;
1056 	}
1057 
1058 	return true;
1059 }
1060 
1061 uint32_t
1062 spdk_thread_get_count(void)
1063 {
1064 	/*
1065 	 * Return cached value of the current thread count.  We could acquire the
1066 	 *  lock and iterate through the TAILQ of threads to count them, but that
1067 	 *  count could still be invalidated after we release the lock.
1068 	 */
1069 	return g_thread_count;
1070 }
1071 
1072 struct spdk_thread *
1073 spdk_get_thread(void)
1074 {
1075 	return _get_thread();
1076 }
1077 
1078 const char *
1079 spdk_thread_get_name(const struct spdk_thread *thread)
1080 {
1081 	return thread->name;
1082 }
1083 
1084 uint64_t
1085 spdk_thread_get_id(const struct spdk_thread *thread)
1086 {
1087 	return thread->id;
1088 }
1089 
1090 struct spdk_thread *
1091 spdk_thread_get_by_id(uint64_t id)
1092 {
1093 	struct spdk_thread *thread;
1094 
1095 	if (id == 0 || id >= g_thread_id) {
1096 		SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
1097 		return NULL;
1098 	}
1099 	pthread_mutex_lock(&g_devlist_mutex);
1100 	TAILQ_FOREACH(thread, &g_threads, tailq) {
1101 		if (thread->id == id) {
1102 			break;
1103 		}
1104 	}
1105 	pthread_mutex_unlock(&g_devlist_mutex);
1106 	return thread;
1107 }
1108 
1109 int
1110 spdk_thread_get_stats(struct spdk_thread_stats *stats)
1111 {
1112 	struct spdk_thread *thread;
1113 
1114 	thread = _get_thread();
1115 	if (!thread) {
1116 		SPDK_ERRLOG("No thread allocated\n");
1117 		return -EINVAL;
1118 	}
1119 
1120 	if (stats == NULL) {
1121 		return -EINVAL;
1122 	}
1123 
1124 	*stats = thread->stats;
1125 
1126 	return 0;
1127 }
1128 
1129 uint64_t
1130 spdk_thread_get_last_tsc(struct spdk_thread *thread)
1131 {
1132 	if (thread == NULL) {
1133 		thread = _get_thread();
1134 	}
1135 
1136 	return thread->tsc_last;
1137 }
1138 
1139 static inline int
1140 thread_send_msg_notification(const struct spdk_thread *target_thread)
1141 {
1142 	uint64_t notify = 1;
1143 	int rc;
1144 
1145 	/* Not necessary to do notification if interrupt facility is not enabled */
1146 	if (spdk_likely(!spdk_interrupt_mode_is_enabled())) {
1147 		return 0;
1148 	}
1149 
1150 	/* Since each spdk_thread can switch between poll and interrupt mode dynamically,
1151 	 * after sending a thread msg it is necessary to check whether the target thread runs
1152 	 * in interrupt mode and then decide whether to do the event notification.
1153 	 */
1154 	if (spdk_unlikely(target_thread->in_interrupt)) {
1155 		rc = write(target_thread->msg_fd, &notify, sizeof(notify));
1156 		if (rc < 0) {
1157 			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
1158 			return -EIO;
1159 		}
1160 	}
1161 
1162 	return 0;
1163 }
1164 
1165 int
1166 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
1167 {
1168 	struct spdk_thread *local_thread;
1169 	struct spdk_msg *msg;
1170 	int rc;
1171 
1172 	assert(thread != NULL);
1173 
1174 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
1175 		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
1176 		return -EIO;
1177 	}
1178 
1179 	local_thread = _get_thread();
1180 
1181 	msg = NULL;
1182 	if (local_thread != NULL) {
1183 		if (local_thread->msg_cache_count > 0) {
1184 			msg = SLIST_FIRST(&local_thread->msg_cache);
1185 			assert(msg != NULL);
1186 			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
1187 			local_thread->msg_cache_count--;
1188 		}
1189 	}
1190 
1191 	if (msg == NULL) {
1192 		msg = spdk_mempool_get(g_spdk_msg_mempool);
1193 		if (!msg) {
1194 			SPDK_ERRLOG("msg could not be allocated\n");
1195 			return -ENOMEM;
1196 		}
1197 	}
1198 
1199 	msg->fn = fn;
1200 	msg->arg = ctx;
1201 
1202 	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
1203 	if (rc != 1) {
1204 		SPDK_ERRLOG("msg could not be enqueued\n");
1205 		spdk_mempool_put(g_spdk_msg_mempool, msg);
1206 		return -EIO;
1207 	}
1208 
1209 	return thread_send_msg_notification(thread);
1210 }
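/*
 * Illustrative usage sketch (hypothetical callback and types, not part of
 * this file):
 *
 *	static void
 *	close_device(void *ctx)
 *	{
 *		struct my_device *dev = ctx;      // hypothetical type
 *		// ... runs on "target" the next time it is polled ...
 *	}
 *
 *	int rc = spdk_thread_send_msg(target, close_device, dev);
 *	if (rc != 0) {
 *		// -ENOMEM: msg mempool exhausted; -EIO: target already exited
 *		// or its message ring is full.
 *	}
 */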
1211 
1212 int
1213 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
1214 {
1215 	spdk_msg_fn expected = NULL;
1216 
1217 	if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
1218 					 __ATOMIC_SEQ_CST)) {
1219 		return -EIO;
1220 	}
1221 
1222 	return thread_send_msg_notification(thread);
1223 }
1224 
1225 #ifdef __linux__
1226 static int
1227 interrupt_timerfd_process(void *arg)
1228 {
1229 	struct spdk_poller *poller = arg;
1230 	uint64_t exp;
1231 	int rc;
1232 
1233 	/* Drain the expiration count to clear the timerfd's level trigger. */
1234 	rc = read(poller->interruptfd, &exp, sizeof(exp));
1235 	if (rc < 0) {
1236 		if (errno == EAGAIN) {
1237 			return 0;
1238 		}
1239 
1240 		return -errno;
1241 	}
1242 
1243 	return poller->fn(poller->arg);
1244 }
1245 
1246 static int
1247 period_poller_interrupt_init(struct spdk_poller *poller)
1248 {
1249 	struct spdk_fd_group *fgrp = poller->thread->fgrp;
1250 	int timerfd;
1251 	int rc;
1252 
1253 	SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
1254 	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
1255 	if (timerfd < 0) {
1256 		return -errno;
1257 	}
1258 
1259 	rc = spdk_fd_group_add(fgrp, timerfd,
1260 			       interrupt_timerfd_process, poller);
1261 	if (rc < 0) {
1262 		close(timerfd);
1263 		return rc;
1264 	}
1265 
1266 	poller->interruptfd = timerfd;
1267 	return 0;
1268 }
1269 
1270 static void
1271 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1272 {
1273 	int timerfd = poller->interruptfd;
1274 	uint64_t now_tick = spdk_get_ticks();
1275 	uint64_t ticks = spdk_get_ticks_hz();
1276 	int ret;
1277 	struct itimerspec new_tv = {};
1278 	struct itimerspec old_tv = {};
1279 
1280 	assert(poller->period_ticks != 0);
1281 	assert(timerfd >= 0);
1282 
1283 	SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
1284 		      interrupt_mode ? "interrupt" : "poll");
1285 
1286 	if (interrupt_mode) {
1287 		/* Set repeated timer expiration */
1288 		new_tv.it_interval.tv_sec = poller->period_ticks / ticks;
1289 		new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks;
1290 
1291 		/* Update next timer expiration */
1292 		if (poller->next_run_tick == 0) {
1293 			poller->next_run_tick = now_tick + poller->period_ticks;
1294 		} else if (poller->next_run_tick < now_tick) {
1295 			poller->next_run_tick = now_tick;
1296 		}
1297 
1298 		new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks;
1299 		new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks;
1300 
1301 		ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
1302 		if (ret < 0) {
1303 			SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno);
1304 			assert(false);
1305 		}
1306 	} else {
1307 		/* Disarm the timer */
1308 		ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv);
1309 		if (ret < 0) {
1310 			/* timerfd_settime's failure indicates that the timerfd is in error */
1311 			SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno);
1312 			assert(false);
1313 		}
1314 
1315 		/* In order to reuse poller_insert_timer, adjust now_tick so that next_run_tick becomes
1316 		 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC
1317 		 */
1318 		now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \
1319 			   (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC;
1320 		poller_remove_timer(poller->thread, poller);
1321 		poller_insert_timer(poller->thread, poller, now_tick);
1322 	}
1323 }
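/*
 * Worked example for the conversion above (assuming a 2.3 GHz TSC, i.e.
 * ticks == 2300000000): a poller with period_ticks == 2300000 (1 ms) yields
 * it_interval.tv_sec = 2300000 / 2300000000 = 0 and it_interval.tv_nsec =
 * (2300000 % 2300000000) * SPDK_SEC_TO_NSEC / 2300000000 = 1000000 ns, so the
 * timerfd fires every millisecond, matching the poll-mode period.
 */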
1324 
1325 static void
1326 poller_interrupt_fini(struct spdk_poller *poller)
1327 {
1328 	SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name);
1329 	assert(poller->interruptfd >= 0);
1330 	spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd);
1331 	close(poller->interruptfd);
1332 	poller->interruptfd = -1;
1333 }
1334 
1335 static int
1336 busy_poller_interrupt_init(struct spdk_poller *poller)
1337 {
1338 	int busy_efd;
1339 	int rc;
1340 
1341 	SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name);
1342 	busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1343 	if (busy_efd < 0) {
1344 		SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name);
1345 		return -errno;
1346 	}
1347 
1348 	rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, poller->fn, poller->arg);
1349 	if (rc < 0) {
1350 		close(busy_efd);
1351 		return rc;
1352 	}
1353 
1354 	poller->interruptfd = busy_efd;
1355 	return 0;
1356 }
1357 
1358 static void
1359 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1360 {
1361 	int busy_efd = poller->interruptfd;
1362 	uint64_t notify = 1;
1363 	int rc __attribute__((unused));
1364 
1365 	assert(busy_efd >= 0);
1366 
1367 	if (interrupt_mode) {
1368 		/* Write without read on eventfd will get it repeatedly triggered. */
1369 		if (write(busy_efd, &notify, sizeof(notify)) < 0) {
1370 			SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name);
1371 		}
1372 	} else {
1373 		/* Read on eventfd will clear its level triggering. */
1374 		rc = read(busy_efd, &notify, sizeof(notify));
1375 	}
1376 }
1377 
1378 #else
1379 
1380 static int
1381 period_poller_interrupt_init(struct spdk_poller *poller)
1382 {
1383 	return -ENOTSUP;
1384 }
1385 
1386 static void
1387 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1388 {
1389 }
1390 
1391 static void
1392 poller_interrupt_fini(struct spdk_poller *poller)
1393 {
1394 }
1395 
1396 static int
1397 busy_poller_interrupt_init(struct spdk_poller *poller)
1398 {
1399 	return -ENOTSUP;
1400 }
1401 
1402 static void
1403 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1404 {
1405 }
1406 
1407 #endif
1408 
1409 void
1410 spdk_poller_register_interrupt(struct spdk_poller *poller,
1411 			       spdk_poller_set_interrupt_mode_cb cb_fn,
1412 			       void *cb_arg)
1413 {
1414 	assert(poller != NULL);
1415 	assert(cb_fn != NULL);
1416 	assert(spdk_get_thread() == poller->thread);
1417 
1418 	if (!spdk_interrupt_mode_is_enabled()) {
1419 		return;
1420 	}
1421 
1422 	/* When a poller is created, we don't know if the user is ever going to
1423 	 * enable interrupts on it by calling this function, so the poller
1424 	 * registration function has to immediately create an interruptfd.
1425 	 * When this function does get called by the user, we have to then destroy
1426 	 * that interruptfd.
1427 	 */
1428 	if (poller->set_intr_cb_fn && poller->interruptfd >= 0) {
1429 		poller_interrupt_fini(poller);
1430 	}
1431 
1432 	poller->set_intr_cb_fn = cb_fn;
1433 	poller->set_intr_cb_arg = cb_arg;
1434 
1435 	/* Set poller into interrupt mode if thread is in interrupt. */
1436 	if (poller->thread->in_interrupt) {
1437 		poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
1438 	}
1439 }
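/*
 * Illustrative sketch (hypothetical callback and helpers, not part of this
 * file): a poller backed by its own device fd can arm/disarm that fd as the
 * thread switches modes.
 *
 *	static void
 *	my_set_intr_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
 *	{
 *		struct my_dev *dev = cb_arg;      // hypothetical type
 *
 *		if (interrupt_mode) {
 *			arm_device_irq(dev);          // hypothetical helper
 *		} else {
 *			disarm_device_irq(dev);       // hypothetical helper
 *		}
 *	}
 *
 *	spdk_poller_register_interrupt(poller, my_set_intr_mode, dev);
 */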
1440 
1441 static uint64_t
1442 convert_us_to_ticks(uint64_t us)
1443 {
1444 	uint64_t quotient, remainder, ticks;
1445 
1446 	if (us) {
1447 		quotient = us / SPDK_SEC_TO_USEC;
1448 		remainder = us % SPDK_SEC_TO_USEC;
1449 		ticks = spdk_get_ticks_hz();
1450 
1451 		return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
1452 	} else {
1453 		return 0;
1454 	}
1455 }
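/*
 * Worked example: with spdk_get_ticks_hz() == 2300000000 (a 2.3 GHz TSC) and
 * us == 1500, quotient = 0 and remainder = 1500, giving
 * 2300000000 * 0 + (2300000000 * 1500) / 1000000 = 3450000 ticks.  Splitting
 * into quotient and remainder first keeps the intermediate product from
 * overflowing uint64_t for large microsecond values.
 */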
1456 
1457 static struct spdk_poller *
1458 poller_register(spdk_poller_fn fn,
1459 		void *arg,
1460 		uint64_t period_microseconds,
1461 		const char *name)
1462 {
1463 	struct spdk_thread *thread;
1464 	struct spdk_poller *poller;
1465 
1466 	thread = spdk_get_thread();
1467 	if (!thread) {
1468 		assert(false);
1469 		return NULL;
1470 	}
1471 
1472 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
1473 		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
1474 		return NULL;
1475 	}
1476 
1477 	poller = calloc(1, sizeof(*poller));
1478 	if (poller == NULL) {
1479 		SPDK_ERRLOG("Poller memory allocation failed\n");
1480 		return NULL;
1481 	}
1482 
1483 	if (name) {
1484 		snprintf(poller->name, sizeof(poller->name), "%s", name);
1485 	} else {
1486 		snprintf(poller->name, sizeof(poller->name), "%p", fn);
1487 	}
1488 
1489 	poller->state = SPDK_POLLER_STATE_WAITING;
1490 	poller->fn = fn;
1491 	poller->arg = arg;
1492 	poller->thread = thread;
1493 	poller->interruptfd = -1;
1494 
1495 	poller->period_ticks = convert_us_to_ticks(period_microseconds);
1496 
1497 	if (spdk_interrupt_mode_is_enabled()) {
1498 		int rc;
1499 
1500 		if (period_microseconds) {
1501 			rc = period_poller_interrupt_init(poller);
1502 			if (rc < 0) {
1503 				SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
1504 				free(poller);
1505 				return NULL;
1506 			}
1507 
1508 			spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL);
1509 		} else {
1510 			/* If the poller doesn't have a period, create an interruptfd that's
1511 			 * automatically always busy when running in interrupt mode.
1512 			 */
1513 			rc = busy_poller_interrupt_init(poller);
1514 			if (rc < 0) {
1515 				SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
1516 				free(poller);
1517 				return NULL;
1518 			}
1519 
1520 			spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL);
1521 		}
1522 	}
1523 
1524 	thread_insert_poller(thread, poller);
1525 
1526 	return poller;
1527 }
1528 
1529 struct spdk_poller *
1530 spdk_poller_register(spdk_poller_fn fn,
1531 		     void *arg,
1532 		     uint64_t period_microseconds)
1533 {
1534 	return poller_register(fn, arg, period_microseconds, NULL);
1535 }
1536 
1537 struct spdk_poller *
1538 spdk_poller_register_named(spdk_poller_fn fn,
1539 			   void *arg,
1540 			   uint64_t period_microseconds,
1541 			   const char *name)
1542 {
1543 	return poller_register(fn, arg, period_microseconds, name);
1544 }
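/*
 * Illustrative usage sketch (hypothetical poller fn and helpers, not part of
 * this file): a poller returns > 0 when it did work (busy) and 0 when idle,
 * which feeds the busy/idle accounting above.
 *
 *	static int
 *	nvme_poll(void *arg)
 *	{
 *		struct my_ctrlr *ctrlr = arg;                 // hypothetical type
 *		int n = process_completions(ctrlr);           // hypothetical helper
 *
 *		return n;    // > 0 busy, 0 idle, < 0 error
 *	}
 *
 *	// Run on every iteration (period 0), or as a 100 us timed poller:
 *	poller = spdk_poller_register(nvme_poll, ctrlr, 0);
 *	timer  = spdk_poller_register_named(nvme_poll, ctrlr, 100, "nvme_poll");
 */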
1545 
1546 void
1547 spdk_poller_unregister(struct spdk_poller **ppoller)
1548 {
1549 	struct spdk_thread *thread;
1550 	struct spdk_poller *poller;
1551 
1552 	poller = *ppoller;
1553 	if (poller == NULL) {
1554 		return;
1555 	}
1556 
1557 	*ppoller = NULL;
1558 
1559 	thread = spdk_get_thread();
1560 	if (!thread) {
1561 		assert(false);
1562 		return;
1563 	}
1564 
1565 	if (poller->thread != thread) {
1566 		SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n");
1567 		assert(false);
1568 		return;
1569 	}
1570 
1571 	if (spdk_interrupt_mode_is_enabled() && poller->interruptfd >= 0) {
1572 		poller_interrupt_fini(poller);
1573 	}
1574 
1575 	/* If the poller was paused, put it on the active_pollers list so that
1576 	 * its unregistration can be processed by spdk_thread_poll().
1577 	 */
1578 	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
1579 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
1580 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
1581 		poller->period_ticks = 0;
1582 	}
1583 
1584 	/* Simply set the state to unregistered. The poller will get cleaned up
1585 	 * in a subsequent call to spdk_thread_poll().
1586 	 */
1587 	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
1588 }
1589 
1590 void
1591 spdk_poller_pause(struct spdk_poller *poller)
1592 {
1593 	struct spdk_thread *thread;
1594 
1595 	thread = spdk_get_thread();
1596 	if (!thread) {
1597 		assert(false);
1598 		return;
1599 	}
1600 
1601 	if (poller->thread != thread) {
1602 		SPDK_ERRLOG("different from the thread that called spdk_poller_pause()\n");
1603 		assert(false);
1604 		return;
1605 	}
1606 
1607 	/* We just set its state to SPDK_POLLER_STATE_PAUSING and let
1608 	 * spdk_thread_poll() move it. It allows a poller to be paused from
1609 	 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE
1610 	 * iteration, or from within itself without breaking the logic to always
1611 	 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration.
1612 	 */
1613 	switch (poller->state) {
1614 	case SPDK_POLLER_STATE_PAUSED:
1615 	case SPDK_POLLER_STATE_PAUSING:
1616 		break;
1617 	case SPDK_POLLER_STATE_RUNNING:
1618 	case SPDK_POLLER_STATE_WAITING:
1619 		poller->state = SPDK_POLLER_STATE_PAUSING;
1620 		break;
1621 	default:
1622 		assert(false);
1623 		break;
1624 	}
1625 }
1626 
1627 void
1628 spdk_poller_resume(struct spdk_poller *poller)
1629 {
1630 	struct spdk_thread *thread;
1631 
1632 	thread = spdk_get_thread();
1633 	if (!thread) {
1634 		assert(false);
1635 		return;
1636 	}
1637 
1638 	if (poller->thread != thread) {
1639 		SPDK_ERRLOG("different from the thread that called spdk_poller_resume()\n");
1640 		assert(false);
1641 		return;
1642 	}
1643 
1644 	/* If a poller is paused it has to be removed from the paused pollers
1645 	 * list and put on the active list or timer tree depending on its
1646 	 * period_ticks.  If a poller is still in the process of being paused,
1647 	 * we just need to flip its state back to waiting, as it's already on
1648 	 * the appropriate list or tree.
1649 	 */
1650 	switch (poller->state) {
1651 	case SPDK_POLLER_STATE_PAUSED:
1652 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
1653 		thread_insert_poller(thread, poller);
1654 	/* fallthrough */
1655 	case SPDK_POLLER_STATE_PAUSING:
1656 		poller->state = SPDK_POLLER_STATE_WAITING;
1657 		break;
1658 	case SPDK_POLLER_STATE_RUNNING:
1659 	case SPDK_POLLER_STATE_WAITING:
1660 		break;
1661 	default:
1662 		assert(false);
1663 		break;
1664 	}
1665 }
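/*
 * Illustrative sketch (hypothetical reconfiguration flow, not part of this
 * file): both calls must be made on the poller's own thread, but may come
 * from another poller's callback or from the poller itself.
 *
 *	spdk_poller_pause(poller);
 *	reset_queues(ctx);              // hypothetical helper; poller is paused before its next run
 *	spdk_poller_resume(poller);
 */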
1666 
1667 const char *
1668 spdk_poller_get_name(struct spdk_poller *poller)
1669 {
1670 	return poller->name;
1671 }
1672 
1673 const char *
1674 spdk_poller_get_state_str(struct spdk_poller *poller)
1675 {
1676 	switch (poller->state) {
1677 	case SPDK_POLLER_STATE_WAITING:
1678 		return "waiting";
1679 	case SPDK_POLLER_STATE_RUNNING:
1680 		return "running";
1681 	case SPDK_POLLER_STATE_UNREGISTERED:
1682 		return "unregistered";
1683 	case SPDK_POLLER_STATE_PAUSING:
1684 		return "pausing";
1685 	case SPDK_POLLER_STATE_PAUSED:
1686 		return "paused";
1687 	default:
1688 		return NULL;
1689 	}
1690 }
1691 
1692 uint64_t
1693 spdk_poller_get_period_ticks(struct spdk_poller *poller)
1694 {
1695 	return poller->period_ticks;
1696 }
1697 
1698 void
1699 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats)
1700 {
1701 	stats->run_count = poller->run_count;
1702 	stats->busy_count = poller->busy_count;
1703 }
1704 
1705 struct spdk_poller *
1706 spdk_thread_get_first_active_poller(struct spdk_thread *thread)
1707 {
1708 	return TAILQ_FIRST(&thread->active_pollers);
1709 }
1710 
1711 struct spdk_poller *
1712 spdk_thread_get_next_active_poller(struct spdk_poller *prev)
1713 {
1714 	return TAILQ_NEXT(prev, tailq);
1715 }
1716 
1717 struct spdk_poller *
1718 spdk_thread_get_first_timed_poller(struct spdk_thread *thread)
1719 {
1720 	return RB_MIN(timed_pollers_tree, &thread->timed_pollers);
1721 }
1722 
1723 struct spdk_poller *
1724 spdk_thread_get_next_timed_poller(struct spdk_poller *prev)
1725 {
1726 	return RB_NEXT(timed_pollers_tree, &prev->thread->timed_pollers, prev);
1727 }
1728 
1729 struct spdk_poller *
1730 spdk_thread_get_first_paused_poller(struct spdk_thread *thread)
1731 {
1732 	return TAILQ_FIRST(&thread->paused_pollers);
1733 }
1734 
1735 struct spdk_poller *
1736 spdk_thread_get_next_paused_poller(struct spdk_poller *prev)
1737 {
1738 	return TAILQ_NEXT(prev, tailq);
1739 }
1740 
1741 struct spdk_io_channel *
1742 spdk_thread_get_first_io_channel(struct spdk_thread *thread)
1743 {
1744 	return TAILQ_FIRST(&thread->io_channels);
1745 }
1746 
1747 struct spdk_io_channel *
1748 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev)
1749 {
1750 	return TAILQ_NEXT(prev, tailq);
1751 }
1752 
1753 struct call_thread {
1754 	struct spdk_thread *cur_thread;
1755 	spdk_msg_fn fn;
1756 	void *ctx;
1757 
1758 	struct spdk_thread *orig_thread;
1759 	spdk_msg_fn cpl;
1760 };
1761 
1762 static void
1763 _on_thread(void *ctx)
1764 {
1765 	struct call_thread *ct = ctx;
1766 	int rc __attribute__((unused));
1767 
1768 	ct->fn(ct->ctx);
1769 
1770 	pthread_mutex_lock(&g_devlist_mutex);
1771 	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
1772 	pthread_mutex_unlock(&g_devlist_mutex);
1773 
1774 	if (!ct->cur_thread) {
1775 		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");
1776 
1777 		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
1778 		free(ctx);
1779 	} else {
1780 		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
1781 			      ct->cur_thread->name);
1782 
1783 		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
1784 	}
1785 	assert(rc == 0);
1786 }
1787 
1788 void
1789 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
1790 {
1791 	struct call_thread *ct;
1792 	struct spdk_thread *thread;
1793 	int rc __attribute__((unused));
1794 
1795 	ct = calloc(1, sizeof(*ct));
1796 	if (!ct) {
1797 		SPDK_ERRLOG("Unable to perform thread iteration\n");
1798 		cpl(ctx);
1799 		return;
1800 	}
1801 
1802 	ct->fn = fn;
1803 	ct->ctx = ctx;
1804 	ct->cpl = cpl;
1805 
1806 	thread = _get_thread();
1807 	if (!thread) {
1808 		SPDK_ERRLOG("No thread allocated\n");
1809 		free(ct);
1810 		cpl(ctx);
1811 		return;
1812 	}
1813 	ct->orig_thread = thread;
1814 
1815 	pthread_mutex_lock(&g_devlist_mutex);
1816 	ct->cur_thread = TAILQ_FIRST(&g_threads);
1817 	pthread_mutex_unlock(&g_devlist_mutex);
1818 
1819 	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
1820 		      ct->orig_thread->name);
1821 
1822 	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
1823 	assert(rc == 0);
1824 }
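/*
 * Illustrative usage sketch (hypothetical callbacks, not part of this file):
 *
 *	static void
 *	check_thread(void *ctx)
 *	{
 *		// Executes once on each spdk_thread, in creation order.
 *		SPDK_NOTICELOG("%s idle: %d\n",
 *			       spdk_thread_get_name(spdk_get_thread()),
 *			       spdk_thread_is_idle(spdk_get_thread()));
 *	}
 *
 *	static void
 *	check_done(void *ctx)
 *	{
 *		// Runs back on the thread that started the iteration.
 *	}
 *
 *	spdk_for_each_thread(check_thread, NULL, check_done);
 */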
1825 
1826 static inline void
1827 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode)
1828 {
1829 	if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
1830 		return;
1831 	}
1832 
1833 	if (!poller->set_intr_cb_fn) {
1834 		SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name);
1835 		assert(false);
1836 		return;
1837 	}
1838 
1839 	poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode);
1840 }
1841 
1842 void
1843 spdk_thread_set_interrupt_mode(bool enable_interrupt)
1844 {
1845 	struct spdk_thread *thread = _get_thread();
1846 	struct spdk_poller *poller, *tmp;
1847 
1848 	assert(thread);
1849 	assert(spdk_interrupt_mode_is_enabled());
1850 
1851 	if (thread->in_interrupt == enable_interrupt) {
1852 		return;
1853 	}
1854 
1855 	/* Set pollers to expected mode */
1856 	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
1857 		poller_set_interrupt_mode(poller, enable_interrupt);
1858 	}
1859 	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) {
1860 		poller_set_interrupt_mode(poller, enable_interrupt);
1861 	}
1862 	/* All paused pollers will go to work in interrupt mode */
1863 	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) {
1864 		poller_set_interrupt_mode(poller, enable_interrupt);
1865 	}
1866 
1867 	thread->in_interrupt = enable_interrupt;
1868 	return;
1869 }
1870 
1871 static struct io_device *
1872 io_device_get(void *io_device)
1873 {
1874 	struct io_device find = {};
1875 
1876 	find.io_device = io_device;
1877 	return RB_FIND(io_device_tree, &g_io_devices, &find);
1878 }
1879 
1880 void
1881 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
1882 			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
1883 			const char *name)
1884 {
1885 	struct io_device *dev, *tmp;
1886 	struct spdk_thread *thread;
1887 
1888 	assert(io_device != NULL);
1889 	assert(create_cb != NULL);
1890 	assert(destroy_cb != NULL);
1891 
1892 	thread = spdk_get_thread();
1893 	if (!thread) {
1894 		SPDK_ERRLOG("called from non-SPDK thread\n");
1895 		assert(false);
1896 		return;
1897 	}
1898 
1899 	dev = calloc(1, sizeof(struct io_device));
1900 	if (dev == NULL) {
1901 		SPDK_ERRLOG("could not allocate io_device\n");
1902 		return;
1903 	}
1904 
1905 	dev->io_device = io_device;
1906 	if (name) {
1907 		snprintf(dev->name, sizeof(dev->name), "%s", name);
1908 	} else {
1909 		snprintf(dev->name, sizeof(dev->name), "%p", dev);
1910 	}
1911 	dev->create_cb = create_cb;
1912 	dev->destroy_cb = destroy_cb;
1913 	dev->unregister_cb = NULL;
1914 	dev->ctx_size = ctx_size;
1915 	dev->for_each_count = 0;
1916 	dev->unregistered = false;
1917 	dev->refcnt = 0;
1918 
1919 	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
1920 		      dev->name, dev->io_device, thread->name);
1921 
1922 	pthread_mutex_lock(&g_devlist_mutex);
1923 	tmp = RB_INSERT(io_device_tree, &g_io_devices, dev);
1924 	if (tmp != NULL) {
1925 		SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
1926 			    io_device, tmp->name, dev->name);
1928 		free(dev);	/* fall through to the single unlock below; unlocking here too would double-unlock */
1929 	}
1930 
1931 	pthread_mutex_unlock(&g_devlist_mutex);
1932 }
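/*
 * Illustrative usage sketch (hypothetical device and channel types, not part
 * of this file):
 *
 *	static int
 *	my_ch_create(void *io_device, void *ctx_buf)
 *	{
 *		struct my_channel *ch = ctx_buf;    // ctx_size bytes, per thread
 *		// ... allocate per-thread resources ...
 *		return 0;
 *	}
 *
 *	static void
 *	my_ch_destroy(void *io_device, void *ctx_buf)
 *	{
 *		// ... release per-thread resources ...
 *	}
 *
 *	spdk_io_device_register(dev, my_ch_create, my_ch_destroy,
 *				sizeof(struct my_channel), "my_dev");
 *
 *	// Later, on any SPDK thread:
 *	struct spdk_io_channel *ch = spdk_get_io_channel(dev);
 *	struct my_channel *my_ch = spdk_io_channel_get_ctx(ch);
 */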
1933 
1934 static void
1935 _finish_unregister(void *arg)
1936 {
1937 	struct io_device *dev = arg;
1938 	struct spdk_thread *thread;
1939 
1940 	thread = spdk_get_thread();
1941 	assert(thread == dev->unregister_thread);
1942 
1943 	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
1944 		      dev->name, dev->io_device, thread->name);
1945 
1946 	assert(thread->pending_unregister_count > 0);
1947 	thread->pending_unregister_count--;
1948 
1949 	dev->unregister_cb(dev->io_device);
1950 	free(dev);
1951 }
1952 
1953 static void
1954 io_device_free(struct io_device *dev)
1955 {
1956 	int rc __attribute__((unused));
1957 
1958 	if (dev->unregister_cb == NULL) {
1959 		free(dev);
1960 	} else {
1961 		assert(dev->unregister_thread != NULL);
1962 		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
1963 			      dev->name, dev->io_device, dev->unregister_thread->name);
1964 		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
1965 		assert(rc == 0);
1966 	}
1967 }
1968 
1969 void
1970 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
1971 {
1972 	struct io_device *dev;
1973 	uint32_t refcnt;
1974 	struct spdk_thread *thread;
1975 
1976 	thread = spdk_get_thread();
1977 	if (!thread) {
1978 		SPDK_ERRLOG("called from non-SPDK thread\n");
1979 		assert(false);
1980 		return;
1981 	}
1982 
1983 	pthread_mutex_lock(&g_devlist_mutex);
1984 	dev = io_device_get(io_device);
1985 	if (!dev) {
1986 		SPDK_ERRLOG("io_device %p not found\n", io_device);
1987 		assert(false);
1988 		pthread_mutex_unlock(&g_devlist_mutex);
1989 		return;
1990 	}
1991 
1992 	if (dev->for_each_count > 0) {
1993 		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
1994 			    dev->name, io_device, dev->for_each_count);
1995 		pthread_mutex_unlock(&g_devlist_mutex);
1996 		return;
1997 	}
1998 
1999 	dev->unregister_cb = unregister_cb;
2000 	dev->unregistered = true;
2001 	RB_REMOVE(io_device_tree, &g_io_devices, dev);
2002 	refcnt = dev->refcnt;
2003 	dev->unregister_thread = thread;
2004 	pthread_mutex_unlock(&g_devlist_mutex);
2005 
2006 	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
2007 		      dev->name, dev->io_device, thread->name);
2008 
2009 	if (unregister_cb) {
2010 		thread->pending_unregister_count++;
2011 	}
2012 
2013 	if (refcnt > 0) {
2014 		/* defer deletion */
2015 		return;
2016 	}
2017 
2018 	io_device_free(dev);
2019 }
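
/*
 * Example (sketch, continuing the hypothetical g_my_dev above): unregistration
 * is deferred while channels remain open; the optional callback runs on this
 * thread once the last channel has been released.
 *
 *	static void
 *	my_dev_unregistered(void *io_device)
 *	{
 *		// all channels are gone; safe to free state tied to io_device
 *	}
 *
 *	spdk_io_device_unregister(&g_my_dev, my_dev_unregistered);
 */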
2020 
2021 const char *
2022 spdk_io_device_get_name(struct io_device *dev)
2023 {
2024 	return dev->name;
2025 }
2026 
2027 struct spdk_io_channel *
2028 spdk_get_io_channel(void *io_device)
2029 {
2030 	struct spdk_io_channel *ch;
2031 	struct spdk_thread *thread;
2032 	struct io_device *dev;
2033 	int rc;
2034 
2035 	pthread_mutex_lock(&g_devlist_mutex);
2036 	dev = io_device_get(io_device);
2037 	if (dev == NULL) {
2038 		SPDK_ERRLOG("could not find io_device %p\n", io_device);
2039 		pthread_mutex_unlock(&g_devlist_mutex);
2040 		return NULL;
2041 	}
2042 
2043 	thread = _get_thread();
2044 	if (!thread) {
2045 		SPDK_ERRLOG("No thread allocated\n");
2046 		pthread_mutex_unlock(&g_devlist_mutex);
2047 		return NULL;
2048 	}
2049 
2050 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
2051 		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
2052 		pthread_mutex_unlock(&g_devlist_mutex);
2053 		return NULL;
2054 	}
2055 
2056 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
2057 		if (ch->dev == dev) {
2058 			ch->ref++;
2059 
2060 			SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2061 				      ch, dev->name, dev->io_device, thread->name, ch->ref);
2062 
2063 			/*
2064 			 * An I/O channel already exists for this device on this
2065 			 *  thread, so return it.
2066 			 */
2067 			pthread_mutex_unlock(&g_devlist_mutex);
2068 			spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0,
2069 					  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
2070 			return ch;
2071 		}
2072 	}
2073 
2074 	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
2075 	if (ch == NULL) {
2076 		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
2077 		pthread_mutex_unlock(&g_devlist_mutex);
2078 		return NULL;
2079 	}
2080 
2081 	ch->dev = dev;
2082 	ch->destroy_cb = dev->destroy_cb;
2083 	ch->thread = thread;
2084 	ch->ref = 1;
2085 	ch->destroy_ref = 0;
2086 	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);
2087 
2088 	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2089 		      ch, dev->name, dev->io_device, thread->name, ch->ref);
2090 
2091 	dev->refcnt++;
2092 
2093 	pthread_mutex_unlock(&g_devlist_mutex);
2094 
2095 	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
2096 	if (rc != 0) {
2097 		pthread_mutex_lock(&g_devlist_mutex);
2098 		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
2099 		dev->refcnt--;
2100 		free(ch);
2101 		pthread_mutex_unlock(&g_devlist_mutex);
2102 		return NULL;
2103 	}
2104 
2105 	spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1);
2106 	return ch;
2107 }
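
/*
 * Example (sketch): a thread holds at most one channel per io_device; repeated
 * calls from the same thread return the same channel with its refcount bumped.
 * The per-thread context sits immediately after the channel header.
 *
 *	struct spdk_io_channel *ch = spdk_get_io_channel(&g_my_dev);
 *	struct my_channel_ctx *ctx;
 *
 *	if (ch == NULL) {
 *		return -ENOMEM;	// unknown device, exited thread, or allocation failure
 *	}
 *	ctx = spdk_io_channel_get_ctx(ch);
 *	ctx->stat_count++;
 *	...
 *	spdk_put_io_channel(ch);	// must be called on this same thread
 */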
2108 
2109 static void
2110 put_io_channel(void *arg)
2111 {
2112 	struct spdk_io_channel *ch = arg;
2113 	bool do_remove_dev = true;
2114 	struct spdk_thread *thread;
2115 
2116 	thread = spdk_get_thread();
2117 	if (!thread) {
2118 		SPDK_ERRLOG("called from non-SPDK thread\n");
2119 		assert(false);
2120 		return;
2121 	}
2122 
2123 	SPDK_DEBUGLOG(thread,
2124 		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
2125 		      ch, ch->dev->name, ch->dev->io_device, thread->name);
2126 
2127 	assert(ch->thread == thread);
2128 
2129 	ch->destroy_ref--;
2130 
2131 	if (ch->ref > 0 || ch->destroy_ref > 0) {
2132 		/*
2133 		 * Another reference to the associated io_device was requested
2134 		 *  after this message was sent but before it had a chance to
2135 		 *  execute.
2136 		 */
2137 		return;
2138 	}
2139 
2140 	pthread_mutex_lock(&g_devlist_mutex);
2141 	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
2142 	pthread_mutex_unlock(&g_devlist_mutex);
2143 
2144 	/* Don't hold the devlist mutex while the destroy_cb is called. */
2145 	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));
2146 
2147 	pthread_mutex_lock(&g_devlist_mutex);
2148 	ch->dev->refcnt--;
2149 
2150 	if (!ch->dev->unregistered) {
2151 		do_remove_dev = false;
2152 	}
2153 
2154 	if (ch->dev->refcnt > 0) {
2155 		do_remove_dev = false;
2156 	}
2157 
2158 	pthread_mutex_unlock(&g_devlist_mutex);
2159 
2160 	if (do_remove_dev) {
2161 		io_device_free(ch->dev);
2162 	}
2163 	free(ch);
2164 }
2165 
2166 void
2167 spdk_put_io_channel(struct spdk_io_channel *ch)
2168 {
2169 	struct spdk_thread *thread;
2170 	int rc __attribute__((unused));
2171 
2172 	spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0,
2173 			  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
2174 
2175 	thread = spdk_get_thread();
2176 	if (!thread) {
2177 		SPDK_ERRLOG("called from non-SPDK thread\n");
2178 		assert(false);
2179 		return;
2180 	}
2181 
2182 	if (ch->thread != thread) {
		SPDK_ERRLOG("io_channel is being put from a thread different from the one that called spdk_get_io_channel()\n");
2184 		assert(false);
2185 		return;
2186 	}
2187 
2188 	SPDK_DEBUGLOG(thread,
2189 		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2190 		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);
2191 
2192 	ch->ref--;
2193 
2194 	if (ch->ref == 0) {
2195 		ch->destroy_ref++;
2196 		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
2197 		assert(rc == 0);
2198 	}
2199 }
2200 
2201 struct spdk_io_channel *
2202 spdk_io_channel_from_ctx(void *ctx)
2203 {
2204 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
2205 }
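
/*
 * Example (sketch): because spdk_get_io_channel() allocates the context
 * buffer immediately after the channel header, this conversion and
 * spdk_io_channel_get_ctx() are exact inverses.
 *
 *	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(ctx);
 *
 *	assert(spdk_io_channel_get_ctx(ch) == ctx);
 */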
2206 
2207 struct spdk_thread *
2208 spdk_io_channel_get_thread(struct spdk_io_channel *ch)
2209 {
2210 	return ch->thread;
2211 }
2212 
2213 void *
2214 spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
2215 {
2216 	return ch->dev->io_device;
2217 }
2218 
2219 const char *
2220 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch)
2221 {
2222 	return spdk_io_device_get_name(ch->dev);
2223 }
2224 
2225 int
2226 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
2227 {
2228 	return ch->ref;
2229 }
2230 
2231 struct spdk_io_channel_iter {
2232 	void *io_device;
2233 	struct io_device *dev;
2234 	spdk_channel_msg fn;
2235 	int status;
2236 	void *ctx;
2237 	struct spdk_io_channel *ch;
2238 
2239 	struct spdk_thread *cur_thread;
2240 
2241 	struct spdk_thread *orig_thread;
2242 	spdk_channel_for_each_cpl cpl;
2243 };
2244 
2245 void *
2246 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
2247 {
2248 	return i->io_device;
2249 }
2250 
2251 struct spdk_io_channel *
2252 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
2253 {
2254 	return i->ch;
2255 }
2256 
2257 void *
2258 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
2259 {
2260 	return i->ctx;
2261 }
2262 
2263 static void
2264 _call_completion(void *ctx)
2265 {
2266 	struct spdk_io_channel_iter *i = ctx;
2267 
2268 	if (i->cpl != NULL) {
2269 		i->cpl(i, i->status);
2270 	}
2271 	free(i);
2272 }
2273 
2274 static void
2275 _call_channel(void *ctx)
2276 {
2277 	struct spdk_io_channel_iter *i = ctx;
2278 	struct spdk_io_channel *ch;
2279 
2280 	/*
2281 	 * It is possible that the channel was deleted before this
2282 	 *  message had a chance to execute.  If so, skip calling
2283 	 *  the fn() on this thread.
2284 	 */
2285 	pthread_mutex_lock(&g_devlist_mutex);
2286 	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
2287 		if (ch->dev->io_device == i->io_device) {
2288 			break;
2289 		}
2290 	}
2291 	pthread_mutex_unlock(&g_devlist_mutex);
2292 
2293 	if (ch) {
2294 		i->fn(i);
2295 	} else {
2296 		spdk_for_each_channel_continue(i, 0);
2297 	}
2298 }
2299 
2300 void
2301 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
2302 		      spdk_channel_for_each_cpl cpl)
2303 {
2304 	struct spdk_thread *thread;
2305 	struct spdk_io_channel *ch;
2306 	struct spdk_io_channel_iter *i;
2307 	int rc __attribute__((unused));
2308 
2309 	i = calloc(1, sizeof(*i));
2310 	if (!i) {
2311 		SPDK_ERRLOG("Unable to allocate iterator\n");
2312 		return;
2313 	}
2314 
2315 	i->io_device = io_device;
2316 	i->fn = fn;
2317 	i->ctx = ctx;
2318 	i->cpl = cpl;
2319 
2320 	pthread_mutex_lock(&g_devlist_mutex);
2321 	i->orig_thread = _get_thread();
2322 
2323 	TAILQ_FOREACH(thread, &g_threads, tailq) {
2324 		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
2325 			if (ch->dev->io_device == io_device) {
2326 				ch->dev->for_each_count++;
2327 				i->dev = ch->dev;
2328 				i->cur_thread = thread;
2329 				i->ch = ch;
2330 				pthread_mutex_unlock(&g_devlist_mutex);
2331 				rc = spdk_thread_send_msg(thread, _call_channel, i);
2332 				assert(rc == 0);
2333 				return;
2334 			}
2335 		}
2336 	}
2337 
2338 	pthread_mutex_unlock(&g_devlist_mutex);
2339 
2340 	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
2341 	assert(rc == 0);
2342 }
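
/*
 * Example (sketch, hypothetical callbacks): visit every channel of a device,
 * one thread at a time.  Each visit runs on the thread owning that channel
 * and must call spdk_for_each_channel_continue() to advance the iteration;
 * the completion callback then runs back on the initiating thread.
 *
 *	static void
 *	my_channel_visit(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *		struct my_channel_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *
 *		ctx->stat_count = 0;	// executes on the channel's owning thread
 *		spdk_for_each_channel_continue(i, 0);
 *	}
 *
 *	static void
 *	my_visit_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		// executes on the thread that called spdk_for_each_channel()
 *	}
 *
 *	spdk_for_each_channel(&g_my_dev, my_channel_visit, NULL, my_visit_done);
 */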
2343 
2344 void
2345 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
2346 {
2347 	struct spdk_thread *thread;
2348 	struct spdk_io_channel *ch;
2349 	int rc __attribute__((unused));
2350 
2351 	assert(i->cur_thread == spdk_get_thread());
2352 
2353 	i->status = status;
2354 
2355 	pthread_mutex_lock(&g_devlist_mutex);
2356 	if (status) {
2357 		goto end;
2358 	}
2359 	thread = TAILQ_NEXT(i->cur_thread, tailq);
2360 	while (thread) {
2361 		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
2362 			if (ch->dev->io_device == i->io_device) {
2363 				i->cur_thread = thread;
2364 				i->ch = ch;
2365 				pthread_mutex_unlock(&g_devlist_mutex);
2366 				rc = spdk_thread_send_msg(thread, _call_channel, i);
2367 				assert(rc == 0);
2368 				return;
2369 			}
2370 		}
2371 		thread = TAILQ_NEXT(thread, tailq);
2372 	}
2373 
2374 end:
2375 	i->dev->for_each_count--;
2376 	i->ch = NULL;
2377 	pthread_mutex_unlock(&g_devlist_mutex);
2378 
2379 	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
2380 	assert(rc == 0);
2381 }
2382 
2383 struct spdk_interrupt {
2384 	int			efd;
2385 	struct spdk_thread	*thread;
2386 	char			name[SPDK_MAX_POLLER_NAME_LEN + 1];
2387 };
2388 
2389 static void
2390 thread_interrupt_destroy(struct spdk_thread *thread)
2391 {
2392 	struct spdk_fd_group *fgrp = thread->fgrp;
2393 
	SPDK_INFOLOG(thread, "Destroy fgrp for thread (%s)\n", thread->name);
2395 
2396 	if (thread->msg_fd < 0) {
2397 		return;
2398 	}
2399 
2400 	spdk_fd_group_remove(fgrp, thread->msg_fd);
2401 	close(thread->msg_fd);
2402 	thread->msg_fd = -1;
2403 
2404 	spdk_fd_group_destroy(fgrp);
2405 	thread->fgrp = NULL;
2406 }
2407 
2408 #ifdef __linux__
2409 static int
2410 thread_interrupt_msg_process(void *arg)
2411 {
2412 	struct spdk_thread *thread = arg;
2413 	uint32_t msg_count;
2414 	spdk_msg_fn critical_msg;
2415 	int rc = 0;
2416 	uint64_t notify = 1;
2417 
2418 	assert(spdk_interrupt_mode_is_enabled());
2419 
	/* There may be a race between this consumer's msg_acknowledge (the eventfd
	 * read) and another producer's msg_notify (the eventfd write).  Acknowledge
	 * first and only then drain the message queue, so that a notification posted
	 * in between is never missed.
	 */
2424 	rc = read(thread->msg_fd, &notify, sizeof(notify));
2425 	if (rc < 0 && errno != EAGAIN) {
2426 		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
2427 	}
2428 
2429 	critical_msg = thread->critical_msg;
2430 	if (spdk_unlikely(critical_msg != NULL)) {
2431 		critical_msg(NULL);
2432 		thread->critical_msg = NULL;
2433 		rc = 1;
2434 	}
2435 
2436 	msg_count = msg_queue_run_batch(thread, 0);
2437 	if (msg_count) {
2438 		rc = 1;
2439 	}
2440 
2441 	return rc;
2442 }
2443 
2444 static int
2445 thread_interrupt_create(struct spdk_thread *thread)
2446 {
2447 	int rc;
2448 
2449 	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);
2450 
2451 	rc = spdk_fd_group_create(&thread->fgrp);
2452 	if (rc) {
2453 		return rc;
2454 	}
2455 
2456 	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
2457 	if (thread->msg_fd < 0) {
2458 		rc = -errno;
2459 		spdk_fd_group_destroy(thread->fgrp);
2460 		thread->fgrp = NULL;
2461 
2462 		return rc;
2463 	}
2464 
2465 	return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread);
2466 }
2467 #else
2468 static int
2469 thread_interrupt_create(struct spdk_thread *thread)
2470 {
2471 	return -ENOTSUP;
2472 }
2473 #endif
2474 
2475 struct spdk_interrupt *
2476 spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
2477 			void *arg, const char *name)
2478 {
2479 	struct spdk_thread *thread;
2480 	struct spdk_interrupt *intr;
2481 
2482 	thread = spdk_get_thread();
2483 	if (!thread) {
2484 		assert(false);
2485 		return NULL;
2486 	}
2487 
2488 	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
		SPDK_ERRLOG("thread %s is not in a running state\n", thread->name);
2490 		return NULL;
2491 	}
2492 
	intr = calloc(1, sizeof(*intr));
	if (intr == NULL) {
		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		return NULL;
	}

	/* Add the fd only after allocation has succeeded, so that a failed
	 * allocation cannot leave efd registered with no handle to remove it.
	 */
	if (spdk_fd_group_add(thread->fgrp, efd, fn, arg)) {
		free(intr);
		return NULL;
	}
2502 
2503 	if (name) {
2504 		snprintf(intr->name, sizeof(intr->name), "%s", name);
2505 	} else {
2506 		snprintf(intr->name, sizeof(intr->name), "%p", fn);
2507 	}
2508 
2509 	intr->efd = efd;
2510 	intr->thread = thread;
2511 
2512 	return intr;
2513 }
2514 
2515 void
2516 spdk_interrupt_unregister(struct spdk_interrupt **pintr)
2517 {
2518 	struct spdk_thread *thread;
2519 	struct spdk_interrupt *intr;
2520 
2521 	intr = *pintr;
2522 	if (intr == NULL) {
2523 		return;
2524 	}
2525 
2526 	*pintr = NULL;
2527 
2528 	thread = spdk_get_thread();
2529 	if (!thread) {
2530 		assert(false);
2531 		return;
2532 	}
2533 
2534 	if (intr->thread != thread) {
		SPDK_ERRLOG("unregistering from a thread different from the one that called spdk_interrupt_register()\n");
2536 		assert(false);
2537 		return;
2538 	}
2539 
2540 	spdk_fd_group_remove(thread->fgrp, intr->efd);
2541 	free(intr);
2542 }
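
/*
 * Example (sketch, Linux-only; my_timer_cb is hypothetical): any pollable fd,
 * such as a timerfd, can be hooked into the current SPDK thread's fd group.
 * The handler returns a positive value when it did work, 0 when idle, and a
 * negative value on error.
 *
 *	static int
 *	my_timer_cb(void *arg)
 *	{
 *		int *fd = arg;
 *		uint64_t expirations;
 *
 *		if (read(*fd, &expirations, sizeof(expirations)) < 0) {
 *			return -errno;
 *		}
 *		return 1;	// work was done
 *	}
 *
 *	int fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
 *	// ... arm the timer with timerfd_settime() ...
 *	struct spdk_interrupt *intr = spdk_interrupt_register(fd, my_timer_cb,
 *							      &fd, "my_timer");
 *	...
 *	spdk_interrupt_unregister(&intr);	// on the registering thread
 *	close(fd);
 */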
2543 
2544 int
2545 spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
2546 			       enum spdk_interrupt_event_types event_types)
2547 {
2548 	struct spdk_thread *thread;
2549 
2550 	thread = spdk_get_thread();
2551 	if (!thread) {
2552 		assert(false);
2553 		return -EINVAL;
2554 	}
2555 
2556 	if (intr->thread != thread) {
		SPDK_ERRLOG("modifying event types from a thread different from the one that called spdk_interrupt_register()\n");
2558 		assert(false);
2559 		return -EINVAL;
2560 	}
2561 
2562 	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
2563 }
2564 
2565 int
2566 spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
2567 {
2568 	return spdk_fd_group_get_fd(thread->fgrp);
2569 }
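
/*
 * Example (sketch, assumes <sys/epoll.h> is available): in interrupt mode an
 * application can sleep in its own epoll loop on this fd and run the thread
 * only when something is pending, much like the SPDK reactor does.
 *
 *	int efd = epoll_create1(EPOLL_CLOEXEC);
 *	struct epoll_event ev = { .events = EPOLLIN, .data.ptr = thread };
 *
 *	epoll_ctl(efd, EPOLL_CTL_ADD, spdk_thread_get_interrupt_fd(thread), &ev);
 *	for (;;) {
 *		struct epoll_event event;
 *
 *		if (epoll_wait(efd, &event, 1, -1) > 0) {
 *			spdk_thread_poll(event.data.ptr, 0, 0);
 *		}
 *	}
 */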
2570 
2571 static bool g_interrupt_mode = false;
2572 
2573 int
2574 spdk_interrupt_mode_enable(void)
2575 {
	/* This must be called once, prior to initializing the threading library.
	 * g_spdk_msg_mempool is non-NULL once the thread library is initialized.
	 */
2579 	if (g_spdk_msg_mempool) {
		SPDK_ERRLOG("Failed because the threading library is already initialized.\n");
2581 		return -1;
2582 	}
2583 
2584 #ifdef __linux__
	SPDK_NOTICELOG("SPDK is set to run in interrupt mode.\n");
2586 	g_interrupt_mode = true;
2587 	return 0;
2588 #else
	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
2590 	g_interrupt_mode = false;
2591 	return -ENOTSUP;
2592 #endif
2593 }
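
/*
 * Example (sketch): interrupt mode has to be selected before the threading
 * library allocates g_spdk_msg_mempool, i.e. before spdk_thread_lib_init().
 *
 *	int rc;
 *
 *	if (spdk_interrupt_mode_enable() == 0) {
 *		rc = spdk_thread_lib_init(NULL, 0);
 *	}
 */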
2594 
2595 bool
2596 spdk_interrupt_mode_is_enabled(void)
2597 {
2598 	return g_interrupt_mode;
2599 }
2600 
2601 SPDK_LOG_REGISTER_COMPONENT(thread)
2602