xref: /spdk/lib/thread/thread.c (revision 1e58cb9e988012ba99ab859061a04335141a9524)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/likely.h"
38 #include "spdk/queue.h"
39 #include "spdk/string.h"
40 #include "spdk/thread.h"
41 #include "spdk/util.h"
42 #include "spdk/fd_group.h"
43 
44 #include "spdk/log.h"
45 #include "spdk_internal/thread.h"
46 
47 #ifdef __linux__
48 #include <sys/timerfd.h>
49 #include <sys/eventfd.h>
50 #endif
51 
52 #define SPDK_MSG_BATCH_SIZE		8
53 #define SPDK_MAX_DEVICE_NAME_LEN	256
54 #define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
55 
56 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;
57 
58 static spdk_new_thread_fn g_new_thread_fn = NULL;
59 static spdk_thread_op_fn g_thread_op_fn = NULL;
60 static spdk_thread_op_supported_fn g_thread_op_supported_fn;
61 static size_t g_ctx_sz = 0;
62 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the
63  * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting
64  * SPDK application is required.
65  */
66 static uint64_t g_thread_id = 1;
67 
68 struct io_device {
69 	void				*io_device;
70 	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
71 	spdk_io_channel_create_cb	create_cb;
72 	spdk_io_channel_destroy_cb	destroy_cb;
73 	spdk_io_device_unregister_cb	unregister_cb;
74 	struct spdk_thread		*unregister_thread;
75 	uint32_t			ctx_size;
76 	uint32_t			for_each_count;
77 	TAILQ_ENTRY(io_device)		tailq;
78 
79 	uint32_t			refcnt;
80 
81 	bool				unregistered;
82 };
83 
84 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
85 
86 struct spdk_msg {
87 	spdk_msg_fn		fn;
88 	void			*arg;
89 
90 	SLIST_ENTRY(spdk_msg)	link;
91 };
92 
93 #define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
94 static struct spdk_mempool *g_spdk_msg_mempool = NULL;
95 
96 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
97 static uint32_t g_thread_count = 0;
98 
99 static __thread struct spdk_thread *tls_thread = NULL;
100 
101 static inline struct spdk_thread *
102 _get_thread(void)
103 {
104 	return tls_thread;
105 }
106 
107 static int
108 _thread_lib_init(size_t ctx_sz)
109 {
110 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
111 
112 	g_ctx_sz = ctx_sz;
113 
114 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
115 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
116 			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
117 			     sizeof(struct spdk_msg),
118 			     0, /* No cache. We do our own. */
119 			     SPDK_ENV_SOCKET_ID_ANY);
120 
121 	if (!g_spdk_msg_mempool) {
122 		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
123 		return -1;
124 	}
125 
126 	return 0;
127 }
128 
129 int
130 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
131 {
132 	assert(g_new_thread_fn == NULL);
133 	assert(g_thread_op_fn == NULL);
134 
135 	if (new_thread_fn == NULL) {
136 		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
137 	} else {
138 		g_new_thread_fn = new_thread_fn;
139 	}
140 
141 	return _thread_lib_init(ctx_sz);
142 }
143 
144 int
145 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
146 			 spdk_thread_op_supported_fn thread_op_supported_fn,
147 			 size_t ctx_sz)
148 {
149 	assert(g_new_thread_fn == NULL);
150 	assert(g_thread_op_fn == NULL);
151 	assert(g_thread_op_supported_fn == NULL);
152 
153 	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
154 		SPDK_ERRLOG("Both must be defined or undefined together.\n");
155 		return -EINVAL;
156 	}
157 
158 	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
159 		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
160 	} else {
161 		g_thread_op_fn = thread_op_fn;
162 		g_thread_op_supported_fn = thread_op_supported_fn;
163 	}
164 
165 	return _thread_lib_init(ctx_sz);
166 }
167 
168 void
169 spdk_thread_lib_fini(void)
170 {
171 	struct io_device *dev;
172 
173 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
174 		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
175 	}
176 
177 	if (g_spdk_msg_mempool) {
178 		spdk_mempool_free(g_spdk_msg_mempool);
179 		g_spdk_msg_mempool = NULL;
180 	}
181 
182 	g_new_thread_fn = NULL;
183 	g_thread_op_fn = NULL;
184 	g_thread_op_supported_fn = NULL;
185 	g_ctx_sz = 0;
186 }
187 
188 static void thread_interrupt_destroy(struct spdk_thread *thread);
189 static int thread_interrupt_create(struct spdk_thread *thread);
190 
/* Release all resources owned by a thread object: warn about channels and
 * pollers that are still present, unlink the thread from the global list,
 * return locally cached messages to the shared mempool, and free the ring
 * and the thread itself.  The caller must guarantee the thread is no longer
 * being polled.
 */
static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	/* Channels are owned by their users and cannot be freed here - just
	 * report them as leaks. */
	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		/* An UNREGISTERED poller is just awaiting deferred cleanup;
		 * anything else is still live and worth a warning. */
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
		free(poller);
	}

	/* Paused pollers were never unregistered, so always warn. */
	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	/* Unlink from the global thread list under the global lock. */
	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Drain the local message cache back into the shared mempool. */
	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (spdk_interrupt_mode_is_enabled()) {
		thread_interrupt_destroy(thread);
	}

	spdk_ring_free(thread->messages);
	free(thread);
}
253 
254 struct spdk_thread *
255 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
256 {
257 	struct spdk_thread *thread;
258 	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
259 	int rc = 0, i;
260 
261 	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
262 	if (!thread) {
263 		SPDK_ERRLOG("Unable to allocate memory for thread\n");
264 		return NULL;
265 	}
266 
267 	if (cpumask) {
268 		spdk_cpuset_copy(&thread->cpumask, cpumask);
269 	} else {
270 		spdk_cpuset_negate(&thread->cpumask);
271 	}
272 
273 	TAILQ_INIT(&thread->io_channels);
274 	TAILQ_INIT(&thread->active_pollers);
275 	TAILQ_INIT(&thread->timed_pollers);
276 	TAILQ_INIT(&thread->paused_pollers);
277 	SLIST_INIT(&thread->msg_cache);
278 	thread->msg_cache_count = 0;
279 
280 	thread->tsc_last = spdk_get_ticks();
281 
282 	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
283 	if (!thread->messages) {
284 		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
285 		free(thread);
286 		return NULL;
287 	}
288 
289 	/* Fill the local message pool cache. */
290 	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
291 	if (rc == 0) {
292 		/* If we can't populate the cache it's ok. The cache will get filled
293 		 * up organically as messages are passed to the thread. */
294 		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
295 			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
296 			thread->msg_cache_count++;
297 		}
298 	}
299 
300 	if (name) {
301 		snprintf(thread->name, sizeof(thread->name), "%s", name);
302 	} else {
303 		snprintf(thread->name, sizeof(thread->name), "%p", thread);
304 	}
305 
306 	pthread_mutex_lock(&g_devlist_mutex);
307 	if (g_thread_id == 0) {
308 		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
309 		pthread_mutex_unlock(&g_devlist_mutex);
310 		_free_thread(thread);
311 		return NULL;
312 	}
313 	thread->id = g_thread_id++;
314 	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
315 	g_thread_count++;
316 	pthread_mutex_unlock(&g_devlist_mutex);
317 
318 	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
319 		      thread->id, thread->name);
320 
321 	if (spdk_interrupt_mode_is_enabled()) {
322 		thread->in_interrupt = true;
323 		rc = thread_interrupt_create(thread);
324 		if (rc != 0) {
325 			_free_thread(thread);
326 			return NULL;
327 		}
328 	}
329 
330 	if (g_new_thread_fn) {
331 		rc = g_new_thread_fn(thread);
332 	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
333 		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
334 	}
335 
336 	if (rc != 0) {
337 		_free_thread(thread);
338 		return NULL;
339 	}
340 
341 	thread->state = SPDK_THREAD_STATE_RUNNING;
342 
343 	return thread;
344 }
345 
346 void
347 spdk_set_thread(struct spdk_thread *thread)
348 {
349 	tls_thread = thread;
350 }
351 
/* Try to complete the EXITING -> EXITED transition for a thread.  The
 * transition happens only when the thread has no live pollers, no paused
 * pollers, no io_channels, and no pending io_device unregistrations - or
 * unconditionally once the exit timeout set by spdk_thread_exit() expires.
 */
static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	/* Grace period elapsed: force the transition regardless of state. */
	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n",
			    thread->name);
		goto exited;
	}

	/* Any poller not already marked UNREGISTERED keeps the thread alive. */
	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	/* Paused pollers always block the exit. */
	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	/* Open io_channels also block the exit. */
	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

	if (thread->pending_unregister_count > 0) {
		SPDK_INFOLOG(thread,
			     "thread %s is still unregistering io_devices\n",
			     thread->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
}
406 
407 int
408 spdk_thread_exit(struct spdk_thread *thread)
409 {
410 	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);
411 
412 	assert(tls_thread == thread);
413 
414 	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
415 		SPDK_INFOLOG(thread,
416 			     "thread %s is already exiting\n",
417 			     thread->name);
418 		return 0;
419 	}
420 
421 	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
422 				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
423 	thread->state = SPDK_THREAD_STATE_EXITING;
424 	return 0;
425 }
426 
427 bool
428 spdk_thread_is_exited(struct spdk_thread *thread)
429 {
430 	return thread->state == SPDK_THREAD_STATE_EXITED;
431 }
432 
433 void
434 spdk_thread_destroy(struct spdk_thread *thread)
435 {
436 	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);
437 
438 	assert(thread->state == SPDK_THREAD_STATE_EXITED);
439 
440 	if (tls_thread == thread) {
441 		tls_thread = NULL;
442 	}
443 
444 	_free_thread(thread);
445 }
446 
447 void *
448 spdk_thread_get_ctx(struct spdk_thread *thread)
449 {
450 	if (g_ctx_sz > 0) {
451 		return thread->ctx;
452 	}
453 
454 	return NULL;
455 }
456 
457 struct spdk_cpuset *
458 spdk_thread_get_cpumask(struct spdk_thread *thread)
459 {
460 	return &thread->cpumask;
461 }
462 
463 int
464 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
465 {
466 	struct spdk_thread *thread;
467 
468 	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
469 		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
470 		assert(false);
471 		return -ENOTSUP;
472 	}
473 
474 	thread = spdk_get_thread();
475 	if (!thread) {
476 		SPDK_ERRLOG("Called from non-SPDK thread\n");
477 		assert(false);
478 		return -EINVAL;
479 	}
480 
481 	spdk_cpuset_copy(&thread->cpumask, cpumask);
482 
483 	/* Invoke framework's reschedule operation. If this function is called multiple times
484 	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
485 	 * reschedule operation.
486 	 */
487 	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
488 
489 	return 0;
490 }
491 
492 struct spdk_thread *
493 spdk_thread_get_from_ctx(void *ctx)
494 {
495 	if (ctx == NULL) {
496 		assert(false);
497 		return NULL;
498 	}
499 
500 	assert(g_ctx_sz > 0);
501 
502 	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
503 }
504 
/* Dequeue up to max_msgs messages (capped at SPDK_MSG_BATCH_SIZE; 0 means
 * "use the full batch size") from the thread's ring and run them.  Freed
 * messages are recycled into the thread-local cache when there is room.
 * Returns the number of messages executed.
 */
static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];
	uint64_t notify = 1;
	int rc;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	/* Clamp the caller's limit to the batch buffer size. */
	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	/* In interrupt mode, if messages remain beyond this batch, re-signal
	 * msg_fd so the thread is woken again to drain the rest. */
	if (spdk_unlikely(thread->in_interrupt) &&
	    spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}
558 
559 static void
560 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
561 {
562 	struct spdk_poller *iter;
563 
564 	poller->next_run_tick = now + poller->period_ticks;
565 
566 	/*
567 	 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled
568 	 * run time.
569 	 */
570 	TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) {
571 		if (iter->next_run_tick <= poller->next_run_tick) {
572 			TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq);
573 			return;
574 		}
575 	}
576 
577 	/* No earlier pollers were found, so this poller must be the new head */
578 	TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq);
579 }
580 
581 static void
582 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
583 {
584 	if (poller->period_ticks) {
585 		poller_insert_timer(thread, poller, spdk_get_ticks());
586 	} else {
587 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
588 	}
589 }
590 
591 static inline void
592 thread_update_stats(struct spdk_thread *thread, uint64_t end,
593 		    uint64_t start, int rc)
594 {
595 	if (rc == 0) {
596 		/* Poller status idle */
597 		thread->stats.idle_tsc += end - start;
598 	} else if (rc > 0) {
599 		/* Poller status busy */
600 		thread->stats.busy_tsc += end - start;
601 	}
602 	/* Store end time to use it as start time of the next spdk_thread_poll(). */
603 	thread->tsc_last = end;
604 }
605 
/* Execute one full iteration of a thread's work: the pending critical
 * message (if any), a batch of queued messages, every active poller, and
 * all timed pollers that are due at 'now'.  Returns > 0 if any work was
 * done (busy), 0 if the iteration was entirely idle.
 */
static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	thread->tsc_last = now;

	/* A critical message runs first and exactly once. */
	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	/* Iterate tail-to-head; pollers registered from inside a callback are
	 * appended at the tail, so they are not executed until the next
	 * iteration. */
	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			/* Deferred cleanup requested by spdk_poller_unregister(). */
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			/* Deferred pause requested by spdk_poller_pause(). */
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

		poller->run_count++;
		if (poller_rc > 0) {
			poller->busy_count++;
		}

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
		}
#endif

		/* The callback may have unregistered or paused its own poller;
		 * honor that state change here. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
		}

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		/* timed_pollers is sorted by next_run_tick, so once one poller
		 * is not yet due, none of the later ones are either. */
		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

		poller->run_count++;
		if (timer_rc > 0) {
			poller->busy_count++;
		}

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
		}
#endif

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			/* Re-arm the poller at its next deadline (keeps the
			 * list sorted). */
			poller->state = SPDK_POLLER_STATE_WAITING;
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			poller_insert_timer(thread, poller, now);
		}

		if (timer_rc > rc) {
			rc = timer_rc;
		}
	}

	return rc;
}
717 
718 int
719 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
720 {
721 	struct spdk_thread *orig_thread;
722 	int rc;
723 
724 	orig_thread = _get_thread();
725 	tls_thread = thread;
726 
727 	if (now == 0) {
728 		now = spdk_get_ticks();
729 	}
730 
731 	if (spdk_likely(!thread->in_interrupt)) {
732 		rc = thread_poll(thread, max_msgs, now);
733 	} else {
734 		/* Non-block wait on thread's fd_group */
735 		rc = spdk_fd_group_wait(thread->fgrp, 0);
736 	}
737 
738 
739 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
740 		thread_exit(thread, now);
741 	}
742 
743 	thread_update_stats(thread, spdk_get_ticks(), now, rc);
744 
745 	tls_thread = orig_thread;
746 
747 	return rc;
748 }
749 
750 uint64_t
751 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
752 {
753 	struct spdk_poller *poller;
754 
755 	poller = TAILQ_FIRST(&thread->timed_pollers);
756 	if (poller) {
757 		return poller->next_run_tick;
758 	}
759 
760 	return 0;
761 }
762 
763 int
764 spdk_thread_has_active_pollers(struct spdk_thread *thread)
765 {
766 	return !TAILQ_EMPTY(&thread->active_pollers);
767 }
768 
769 static bool
770 thread_has_unpaused_pollers(struct spdk_thread *thread)
771 {
772 	if (TAILQ_EMPTY(&thread->active_pollers) &&
773 	    TAILQ_EMPTY(&thread->timed_pollers)) {
774 		return false;
775 	}
776 
777 	return true;
778 }
779 
780 bool
781 spdk_thread_has_pollers(struct spdk_thread *thread)
782 {
783 	if (!thread_has_unpaused_pollers(thread) &&
784 	    TAILQ_EMPTY(&thread->paused_pollers)) {
785 		return false;
786 	}
787 
788 	return true;
789 }
790 
791 bool
792 spdk_thread_is_idle(struct spdk_thread *thread)
793 {
794 	if (spdk_ring_count(thread->messages) ||
795 	    thread_has_unpaused_pollers(thread) ||
796 	    thread->critical_msg != NULL) {
797 		return false;
798 	}
799 
800 	return true;
801 }
802 
803 uint32_t
804 spdk_thread_get_count(void)
805 {
806 	/*
807 	 * Return cached value of the current thread count.  We could acquire the
808 	 *  lock and iterate through the TAILQ of threads to count them, but that
809 	 *  count could still be invalidated after we release the lock.
810 	 */
811 	return g_thread_count;
812 }
813 
/* Return the SPDK thread bound to the caller, or NULL on a non-SPDK thread. */
struct spdk_thread *
spdk_get_thread(void)
{
	return _get_thread();
}
819 
820 const char *
821 spdk_thread_get_name(const struct spdk_thread *thread)
822 {
823 	return thread->name;
824 }
825 
826 uint64_t
827 spdk_thread_get_id(const struct spdk_thread *thread)
828 {
829 	return thread->id;
830 }
831 
832 struct spdk_thread *
833 spdk_thread_get_by_id(uint64_t id)
834 {
835 	struct spdk_thread *thread;
836 
837 	if (id == 0 || id >= g_thread_id) {
838 		SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
839 		return NULL;
840 	}
841 	pthread_mutex_lock(&g_devlist_mutex);
842 	TAILQ_FOREACH(thread, &g_threads, tailq) {
843 		if (thread->id == id) {
844 			break;
845 		}
846 	}
847 	pthread_mutex_unlock(&g_devlist_mutex);
848 	return thread;
849 }
850 
851 int
852 spdk_thread_get_stats(struct spdk_thread_stats *stats)
853 {
854 	struct spdk_thread *thread;
855 
856 	thread = _get_thread();
857 	if (!thread) {
858 		SPDK_ERRLOG("No thread allocated\n");
859 		return -EINVAL;
860 	}
861 
862 	if (stats == NULL) {
863 		return -EINVAL;
864 	}
865 
866 	*stats = thread->stats;
867 
868 	return 0;
869 }
870 
871 uint64_t
872 spdk_thread_get_last_tsc(struct spdk_thread *thread)
873 {
874 	if (thread == NULL) {
875 		thread = _get_thread();
876 	}
877 
878 	return thread->tsc_last;
879 }
880 
881 static inline int
882 thread_send_msg_notification(const struct spdk_thread *target_thread)
883 {
884 	uint64_t notify = 1;
885 	int rc;
886 
887 	/* Not necessary to do notification if interrupt facility is not enabled */
888 	if (spdk_likely(!spdk_interrupt_mode_is_enabled())) {
889 		return 0;
890 	}
891 
892 	if (spdk_unlikely(target_thread->in_interrupt)) {
893 		rc = write(target_thread->msg_fd, &notify, sizeof(notify));
894 		if (rc < 0) {
895 			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
896 			return -EIO;
897 		}
898 	}
899 
900 	return 0;
901 }
902 
/* Queue fn(ctx) for execution on 'thread'.  The spdk_msg carrier is taken
 * from the calling thread's local cache when available, otherwise from the
 * shared mempool.  Safe to call from any thread (the ring is multi-producer).
 * Returns 0 on success, -EIO if the target has exited or the enqueue/notify
 * fails, -ENOMEM if no message object is available.
 */
int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	/* Prefer the calling thread's local cache - no mempool contention. */
	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		/* Return the carrier to the shared pool (even if it came from
		 * the local cache). */
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}
949 
950 int
951 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
952 {
953 	spdk_msg_fn expected = NULL;
954 
955 	if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
956 					 __ATOMIC_SEQ_CST)) {
957 		return -EIO;
958 	}
959 
960 	return thread_send_msg_notification(thread);
961 }
962 
963 #ifdef __linux__
964 static int
965 interrupt_timerfd_prepare(uint64_t period_microseconds)
966 {
967 	int timerfd;
968 	int ret;
969 	struct itimerspec new_tv;
970 	uint64_t period_seconds;
971 	uint64_t period_nanoseconds;
972 
973 	if (period_microseconds == 0) {
974 		return -EINVAL;
975 	}
976 
977 	period_seconds = period_microseconds / SPDK_SEC_TO_USEC;
978 	period_nanoseconds = period_microseconds % SPDK_SEC_TO_USEC * 1000;
979 
980 	new_tv.it_value.tv_sec = period_seconds;
981 	new_tv.it_value.tv_nsec = period_nanoseconds;
982 
983 	new_tv.it_interval.tv_sec = period_seconds;
984 	new_tv.it_interval.tv_nsec = period_nanoseconds;
985 
986 	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK & TFD_CLOEXEC);
987 	if (timerfd < 0) {
988 		return -errno;
989 	}
990 
991 	ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
992 	if (ret < 0) {
993 		close(timerfd);
994 		return -errno;
995 	}
996 
997 	return timerfd;
998 }
999 #else
/* Non-Linux stub: periodic pollers in interrupt mode require timerfd,
 * which is Linux-only, so report the operation as unsupported. */
static int
interrupt_timerfd_prepare(uint64_t period_microseconds)
{
	(void)period_microseconds;

	return -ENOTSUP;
}
1005 #endif
1006 
1007 static int
1008 interrupt_timerfd_process(void *arg)
1009 {
1010 	struct spdk_poller *poller = arg;
1011 	uint64_t exp;
1012 	int rc;
1013 
1014 	/* clear the level of interval timer */
1015 	rc = read(poller->timerfd, &exp, sizeof(exp));
1016 	if (rc < 0) {
1017 		if (rc == -EAGAIN) {
1018 			return 0;
1019 		}
1020 
1021 		return rc;
1022 	}
1023 
1024 	return poller->fn(poller->arg);
1025 }
1026 
1027 static int
1028 thread_interrupt_register_timerfd(struct spdk_fd_group *fgrp,
1029 				  uint64_t period_microseconds,
1030 				  struct spdk_poller *poller)
1031 {
1032 	int timerfd;
1033 	int rc;
1034 
1035 	timerfd = interrupt_timerfd_prepare(period_microseconds);
1036 	if (timerfd < 0) {
1037 		return timerfd;
1038 	}
1039 
1040 	rc = spdk_fd_group_add(fgrp, timerfd,
1041 			       interrupt_timerfd_process, poller);
1042 	if (rc < 0) {
1043 		close(timerfd);
1044 		return rc;
1045 	}
1046 
1047 	return timerfd;
1048 }
1049 
/* Common implementation behind spdk_poller_register(_named)().  Creates a
 * poller running fn(arg) on the current thread: on every poll iteration
 * when period_microseconds == 0, otherwise at the given period.  Returns
 * the new poller, or NULL on error (non-SPDK thread, exited thread,
 * allocation or timerfd failure).
 */
static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;
	uint64_t quotient, remainder, ticks;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	/* Fall back to the function pointer as the name when none is given. */
	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;

	if (period_microseconds) {
		/* Convert the period to ticks, handling the whole seconds and
		 * the microsecond remainder separately to limit overflow. */
		quotient = period_microseconds / SPDK_SEC_TO_USEC;
		remainder = period_microseconds % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		poller->period_ticks = 0;
	}

	/* In interrupt mode a periodic poller is driven by a timerfd in the
	 * thread's fd_group rather than the tick-based timed_pollers list. */
	if (spdk_interrupt_mode_is_enabled() && period_microseconds != 0) {
		int rc;

		poller->timerfd = -1;
		rc = thread_interrupt_register_timerfd(thread->fgrp, period_microseconds, poller);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to register timerfd for periodic poller: %s\n", spdk_strerror(-rc));
			free(poller);
			return NULL;
		}
		poller->timerfd = rc;
	}

	thread_insert_poller(thread, poller);

	return poller;
}
1115 
1116 struct spdk_poller *
1117 spdk_poller_register(spdk_poller_fn fn,
1118 		     void *arg,
1119 		     uint64_t period_microseconds)
1120 {
1121 	return poller_register(fn, arg, period_microseconds, NULL);
1122 }
1123 
1124 struct spdk_poller *
1125 spdk_poller_register_named(spdk_poller_fn fn,
1126 			   void *arg,
1127 			   uint64_t period_microseconds,
1128 			   const char *name)
1129 {
1130 	return poller_register(fn, arg, period_microseconds, name);
1131 }
1132 
/* Unregister a poller and clear the caller's handle.  Must be called from
 * the thread that registered the poller.  The poller object itself is not
 * freed here; it is marked UNREGISTERED and reclaimed on the next
 * spdk_thread_poll() iteration.
 */
void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	/* Clear the caller's handle immediately so it cannot be reused. */
	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n");
		assert(false);
		return;
	}

	/* Periodic pollers in interrupt mode own a timerfd registered with
	 * the thread's fd_group - detach and close it now. */
	if (spdk_interrupt_mode_is_enabled() && poller->timerfd >= 0) {
		spdk_fd_group_remove(thread->fgrp, poller->timerfd);
		close(poller->timerfd);
		poller->timerfd = -1;
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}
1178 
/*
 * Pause a poller so spdk_thread_poll() stops invoking it until
 * spdk_poller_resume() is called.  Safe to call from the poller's own
 * callback or from another poller on the same thread.
 */
void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	/* Already paused or on its way to being paused: nothing to do. */
	if (poller->state == SPDK_POLLER_STATE_PAUSED ||
	    poller->state == SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused from within itself, we can immediately move it
	 * on the paused_pollers list.  Otherwise we just set its state to
	 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it.  It
	 * allows a poller to be paused from another one's context without
	 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration.
	 */
	if (poller->state != SPDK_POLLER_STATE_RUNNING) {
		poller->state = SPDK_POLLER_STATE_PAUSING;
	} else {
		/* RUNNING means we are inside this poller's callback right
		 * now, so it is safe to unlink it from its list immediately.
		 */
		if (poller->period_ticks > 0) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
		} else {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		}

		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
	}
}
1214 
/*
 * Resume a previously paused (or currently pausing) poller so it runs
 * again from spdk_thread_poll().  No-op for pollers in any other state.
 */
void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state != SPDK_POLLER_STATE_PAUSED &&
	    poller->state != SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused it has to be removed from the paused pollers
	 * list and put on the active / timer list depending on its
	 * period_ticks.  If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list.
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		thread_insert_poller(thread, poller);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
}
1244 
1245 const char *
1246 spdk_poller_state_str(enum spdk_poller_state state)
1247 {
1248 	switch (state) {
1249 	case SPDK_POLLER_STATE_WAITING:
1250 		return "waiting";
1251 	case SPDK_POLLER_STATE_RUNNING:
1252 		return "running";
1253 	case SPDK_POLLER_STATE_UNREGISTERED:
1254 		return "unregistered";
1255 	case SPDK_POLLER_STATE_PAUSING:
1256 		return "pausing";
1257 	case SPDK_POLLER_STATE_PAUSED:
1258 		return "paused";
1259 	default:
1260 		return NULL;
1261 	}
1262 }
1263 
/* Context carried through a spdk_for_each_thread() iteration:
 * the user callback hops from thread to thread, then the completion
 * is delivered back on the originating thread.
 */
struct call_thread {
	struct spdk_thread *cur_thread;		/* thread currently running fn */
	spdk_msg_fn fn;				/* callback invoked on every thread */
	void *ctx;				/* user context for fn and cpl */

	struct spdk_thread *orig_thread;	/* thread that started the iteration */
	spdk_msg_fn cpl;			/* completion run on orig_thread */
};
1272 
/*
 * One step of spdk_for_each_thread(): run the user callback on the
 * current thread, then forward the iteration to the next thread on
 * g_threads, or deliver the completion to the originating thread when
 * the list is exhausted.
 */
static void
_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	/* g_threads is shared; advancing the cursor needs the devlist lock. */
	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

		/* ct is freed here; the completion only receives the user ctx. */
		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	assert(rc == 0);
}
1298 
/*
 * Run fn(ctx) once on every SPDK thread, one thread at a time, then run
 * cpl(ctx) on the calling thread.  On allocation failure or when called
 * from a non-SPDK thread, cpl(ctx) is invoked immediately in-line.
 */
void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	/* g_threads must be read under the devlist lock; the calling thread
	 * is on the list, so the list cannot be empty here.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}
1336 
/*
 * Register an io_device keyed by its unique pointer.  Channels created
 * later via spdk_get_io_channel(io_device) use create_cb/destroy_cb and
 * carry ctx_size bytes of per-channel context.  Must be called from an
 * SPDK thread.  Duplicate registration is logged and ignored.
 */
void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	/* Fall back to the device struct's address as its display name. */
	if (name) {
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	/* Duplicate check and insertion must be atomic under the devlist lock. */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}
1392 
/*
 * Final stage of io_device teardown: runs on the thread that called
 * spdk_io_device_unregister(), invokes the user's unregister callback,
 * and frees the device.
 */
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	assert(thread == dev->unregister_thread);

	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	/* Balances the increment made in spdk_io_device_unregister(). */
	assert(thread->pending_unregister_count > 0);
	thread->pending_unregister_count--;

	dev->unregister_cb(dev->io_device);
	free(dev);
}
1411 
/*
 * Release an io_device whose refcount has dropped to zero.  If an
 * unregister callback was requested, defer the free to the thread that
 * initiated the unregister so the callback runs in that context.
 */
static void
io_device_free(struct io_device *dev)
{
	int rc __attribute__((unused));

	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		/* _finish_unregister() calls unregister_cb and frees dev. */
		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
		assert(rc == 0);
	}
}
1427 
/*
 * Unregister an io_device.  The device is removed from the global list
 * immediately, but freeing (and the optional unregister_cb) is deferred
 * until the last io_channel referencing it is released.  Must be called
 * from an SPDK thread.
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* An in-flight spdk_for_each_channel() still holds the device. */
	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	/* Snapshot refcnt under the lock; channel releases may change it
	 * concurrently afterwards.
	 */
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	/* Decremented again in _finish_unregister() once the callback runs. */
	if (unregister_cb) {
		thread->pending_unregister_count++;
	}

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}
1484 
/* Return the display name assigned at io_device registration time. */
const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}
1490 
/*
 * Get (or create) the calling thread's io_channel for a registered
 * io_device.  Channels are per-thread singletons: an existing channel
 * has its reference count bumped; otherwise a new one is allocated with
 * the device's per-channel context and initialized via create_cb.
 * Returns NULL if the device is unknown, the thread is invalid/exited,
 * allocation fails, or create_cb reports an error.
 */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 *  thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* Per-channel context lives immediately after the channel header. */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	/* Run the user's create callback without holding the devlist lock. */
	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		/* Roll back the insertion and refcount taken above. */
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
1573 
/*
 * Deferred destructor for an io_channel, scheduled by
 * spdk_put_io_channel() when the ref count reaches zero.  Re-checks the
 * counts (a new reference may have been taken in the meantime), runs the
 * destroy callback, and frees the owning io_device if it was unregistered
 * and this was its last channel.
 */
static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 *  after this message was sent but before it had a chance to
		 *  execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	/* Only free the device once it is both unregistered and unreferenced. */
	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}
1630 
/*
 * Release one reference to an io_channel.  Must be called from the
 * thread that obtained the channel.  When the last reference is dropped,
 * destruction is deferred via a message (put_io_channel) so it never
 * runs inside the caller's stack frame.
 */
void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		SPDK_ERRLOG("different from the thread that called get_io_channel()\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		/* destroy_ref tracks in-flight destroy messages; see
		 * put_io_channel() for the matching decrement and re-check.
		 */
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}
1662 
1663 struct spdk_io_channel *
1664 spdk_io_channel_from_ctx(void *ctx)
1665 {
1666 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
1667 }
1668 
/* Return the SPDK thread on which this io_channel was created. */
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}
1674 
/* Return the io_device pointer this channel was created for. */
void *
spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
{
	return ch->dev->io_device;
}
1680 
/* State for spdk_for_each_channel(): visits every channel of one
 * io_device across all threads, then completes on the originator.
 */
struct spdk_io_channel_iter {
	void *io_device;			/* key used to match channels */
	struct io_device *dev;			/* resolved device (for_each_count holder) */
	spdk_channel_msg fn;			/* callback run for each channel */
	int status;				/* first non-zero status aborts the walk */
	void *ctx;				/* user context */
	struct spdk_io_channel *ch;		/* channel currently being visited */

	struct spdk_thread *cur_thread;		/* thread owning ch */

	struct spdk_thread *orig_thread;	/* thread that started the walk */
	spdk_channel_for_each_cpl cpl;		/* completion run on orig_thread */
};
1694 
/* Return the io_device being iterated over. */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}
1700 
/* Return the channel currently being visited by the iteration. */
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}
1706 
/* Return the user context supplied to spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}
1712 
1713 static void
1714 _call_completion(void *ctx)
1715 {
1716 	struct spdk_io_channel_iter *i = ctx;
1717 
1718 	if (i->cpl != NULL) {
1719 		i->cpl(i, i->status);
1720 	}
1721 	free(i);
1722 }
1723 
/*
 * Runs on each visited thread: re-resolve the channel (it may have been
 * destroyed since the message was queued) and either invoke the user
 * callback or skip ahead to the next thread.
 */
static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 *  message had a chance to execute.  If so, skip calling
	 *  the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		/* fn() must eventually call spdk_for_each_channel_continue(). */
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}
1749 
/*
 * Invoke fn on every io_channel of io_device, hopping thread to thread;
 * each invocation must call spdk_for_each_channel_continue() to advance.
 * When all channels are visited (or none exist), cpl runs on the calling
 * thread.  for_each_count pins the device against unregistration while
 * the walk is in flight.
 */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		/* NOTE(review): cpl is never invoked on this path, unlike
		 * spdk_for_each_thread() which completes in-line on failure;
		 * confirm callers do not wait indefinitely on cpl.
		 */
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first channel matching io_device and start the walk there. */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist for this device; complete immediately. */
	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
1793 
/*
 * Advance a for-each-channel walk from within fn (or _call_channel).
 * A non-zero status aborts the walk; otherwise the next thread owning a
 * matching channel is messaged.  When no more channels remain, the
 * device's for_each_count is dropped and completion is sent back to the
 * originating thread.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	int rc __attribute__((unused));

	/* Continuation must come from the thread currently being visited. */
	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	/* Walk finished or aborted: unpin the device and complete. */
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
1832 
/* A file-descriptor event source registered on an SPDK thread's fd group. */
struct spdk_interrupt {
	int			efd;				/* fd being monitored */
	struct spdk_thread	*thread;			/* owning thread */
	char			name[SPDK_MAX_POLLER_NAME_LEN + 1];	/* for logs/stats */
};
1838 
/*
 * Tear down a thread's interrupt infrastructure: detach and close the
 * message eventfd, then destroy the fd group.
 */
static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	/* NOTE(review): when msg_fd is already invalid this returns without
	 * destroying fgrp; confirm msg_fd < 0 implies fgrp was never created
	 * (see thread_interrupt_create() error paths).
	 */
	if (thread->msg_fd < 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);
	thread->msg_fd = -1;

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}
1857 
1858 #ifdef __linux__
/*
 * fd-group handler for a thread's message eventfd: drain the event,
 * run any pending critical message, then process a batch of queued
 * messages.  Returns non-zero if any work was done.
 */
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;
	uint64_t notify = 1;

	assert(spdk_interrupt_mode_is_enabled());

	/* There may be race between msg_acknowledge and another producer's msg_notify,
	 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify.
	 * This can avoid msg notification missing.
	 */
	rc = read(thread->msg_fd, &notify, sizeof(notify));
	if (rc < 0 && errno != EAGAIN) {
		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
	}

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	/* 0 = no batch-size cap; drain everything currently queued. */
	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	return rc;
}
1893 
1894 static int
1895 thread_interrupt_create(struct spdk_thread *thread)
1896 {
1897 	int rc;
1898 
1899 	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);
1900 
1901 	rc = spdk_fd_group_create(&thread->fgrp);
1902 	if (rc) {
1903 		return rc;
1904 	}
1905 
1906 	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1907 	if (thread->msg_fd < 0) {
1908 		rc = -errno;
1909 		spdk_fd_group_destroy(thread->fgrp);
1910 		thread->fgrp = NULL;
1911 
1912 		return rc;
1913 	}
1914 
1915 	return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread);
1916 }
1917 #else
/* Interrupt mode relies on eventfd/fd groups, which are Linux-only. */
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
1923 #endif
1924 
1925 struct spdk_interrupt *
1926 spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
1927 			void *arg, const char *name)
1928 {
1929 	struct spdk_thread *thread;
1930 	struct spdk_interrupt *intr;
1931 
1932 	thread = spdk_get_thread();
1933 	if (!thread) {
1934 		assert(false);
1935 		return NULL;
1936 	}
1937 
1938 	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
1939 		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
1940 		return NULL;
1941 	}
1942 
1943 	if (spdk_fd_group_add(thread->fgrp, efd, fn, arg)) {
1944 		return NULL;
1945 	}
1946 
1947 	intr = calloc(1, sizeof(*intr));
1948 	if (intr == NULL) {
1949 		SPDK_ERRLOG("Interrupt handler allocation failed\n");
1950 		return NULL;
1951 	}
1952 
1953 	if (name) {
1954 		snprintf(intr->name, sizeof(intr->name), "%s", name);
1955 	} else {
1956 		snprintf(intr->name, sizeof(intr->name), "%p", fn);
1957 	}
1958 
1959 	intr->efd = efd;
1960 	intr->thread = thread;
1961 
1962 	return intr;
1963 }
1964 
/*
 * Unregister an interrupt event source and free its handle.  Must be
 * called from the thread that registered it.  The caller's pointer is
 * cleared to prevent double-unregistration.  The underlying fd is NOT
 * closed; it belongs to the caller.
 */
void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	*pintr = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n");
		assert(false);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}
1993 
/*
 * Change which event types (e.g. read/write readiness) trigger the
 * interrupt's callback.  Must be called from the registering thread.
 * Returns 0 on success or a negative errno.
 */
int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n");
		assert(false);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}
2014 
/* Return the fd an external event loop can poll to drive this thread's
 * interrupt-mode fd group.
 */
int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}
2020 
/* Process-wide flag: true once spdk_interrupt_mode_enable() succeeds. */
static bool g_interrupt_mode = false;
2022 
2023 int
2024 spdk_interrupt_mode_enable(void)
2025 {
2026 	/* It must be called once prior to initializing the threading library.
2027 	 * g_spdk_msg_mempool will be valid if thread library is initialized.
2028 	 */
2029 	if (g_spdk_msg_mempool) {
2030 		SPDK_ERRLOG("Failed due to threading library is already initailzied.\n");
2031 		return -1;
2032 	}
2033 
2034 #ifdef __linux__
2035 	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
2036 	g_interrupt_mode = true;
2037 	return 0;
2038 #else
2039 	SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n");
2040 	g_interrupt_mode = false;
2041 	return -ENOTSUP;
2042 #endif
2043 }
2044 
/* Query whether interrupt mode was enabled via spdk_interrupt_mode_enable(). */
bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}
2050 
2051 SPDK_LOG_REGISTER_COMPONENT(thread)
2052