xref: /spdk/lib/thread/thread.c (revision 0ed85362c8132a2d1927757fbcade66b6660d26a)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/likely.h"
38 #include "spdk/queue.h"
39 #include "spdk/string.h"
40 #include "spdk/thread.h"
41 #include "spdk/util.h"
42 
43 #include "spdk_internal/log.h"
44 #include "spdk_internal/thread.h"
45 
/* Max messages drained from a thread's ring per msg_queue_run_batch() call. */
#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
/* Grace period before an exiting thread is forced into the exited state. */
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5

/* Protects g_io_devices and g_threads (and g_thread_count/g_thread_id). */
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Framework callbacks registered via spdk_thread_lib_init()/_ext(). */
static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
/* Size of the per-thread user context appended to each spdk_thread. */
static size_t g_ctx_sz = 0;
/* Monotonic increasing ID is set to each created thread beginning at 1. Once the
 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting
 * SPDK application is required.
 */
static uint64_t g_thread_id = 1;
61 
/* Internal bookkeeping for an I/O device registered with spdk_io_device_register(). */
struct io_device {
	void				*io_device;	/* caller-provided key; must be unique */
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb	create_cb;	/* builds per-thread channel ctx */
	spdk_io_channel_destroy_cb	destroy_cb;	/* tears down per-thread channel ctx */
	spdk_io_device_unregister_cb	unregister_cb;
	struct spdk_thread		*unregister_thread;	/* NOTE(review): presumably the thread that requested unregistration - confirm against unregister path */
	uint32_t			ctx_size;	/* extra bytes allocated per channel */
	uint32_t			for_each_count;
	TAILQ_ENTRY(io_device)		tailq;		/* link on g_io_devices */

	uint32_t			refcnt;

	bool				unregistered;
};
77 
/* All registered io_devices; guarded by g_devlist_mutex. */
static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);

/* A single cross-thread message: callback plus argument, linkable into a cache. */
struct spdk_msg {
	spdk_msg_fn		fn;
	void			*arg;

	SLIST_ENTRY(spdk_msg)	link;
};

/* Number of spdk_msg objects each thread keeps in its private free cache. */
#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
/* Shared pool backing all message allocations; created in _thread_lib_init(). */
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

/* All live SPDK threads; guarded by g_devlist_mutex. */
static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

/* SPDK thread currently bound to this OS thread (see spdk_set_thread()). */
static __thread struct spdk_thread *tls_thread = NULL;
94 
95 static inline struct spdk_thread *
96 _get_thread(void)
97 {
98 	return tls_thread;
99 }
100 
101 static int
102 _thread_lib_init(size_t ctx_sz)
103 {
104 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
105 
106 	g_ctx_sz = ctx_sz;
107 
108 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
109 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
110 			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
111 			     sizeof(struct spdk_msg),
112 			     0, /* No cache. We do our own. */
113 			     SPDK_ENV_SOCKET_ID_ANY);
114 
115 	if (!g_spdk_msg_mempool) {
116 		return -1;
117 	}
118 
119 	return 0;
120 }
121 
122 int
123 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
124 {
125 	assert(g_new_thread_fn == NULL);
126 	assert(g_thread_op_fn == NULL);
127 
128 	if (new_thread_fn == NULL) {
129 		SPDK_INFOLOG(SPDK_LOG_THREAD, "new_thread_fn was not specified at spdk_thread_lib_init\n");
130 	} else {
131 		g_new_thread_fn = new_thread_fn;
132 	}
133 
134 	return _thread_lib_init(ctx_sz);
135 }
136 
137 int
138 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
139 			 spdk_thread_op_supported_fn thread_op_supported_fn,
140 			 size_t ctx_sz)
141 {
142 	assert(g_new_thread_fn == NULL);
143 	assert(g_thread_op_fn == NULL);
144 	assert(g_thread_op_supported_fn == NULL);
145 
146 	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
147 		SPDK_ERRLOG("Both must be defined or undefined together.\n");
148 		return -EINVAL;
149 	}
150 
151 	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
152 		SPDK_INFOLOG(SPDK_LOG_THREAD, "thread_op_fn and thread_op_supported_fn were not specified\n");
153 	} else {
154 		g_thread_op_fn = thread_op_fn;
155 		g_thread_op_supported_fn = thread_op_supported_fn;
156 	}
157 
158 	return _thread_lib_init(ctx_sz);
159 }
160 
161 void
162 spdk_thread_lib_fini(void)
163 {
164 	struct io_device *dev;
165 
166 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
167 		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
168 	}
169 
170 	if (g_spdk_msg_mempool) {
171 		spdk_mempool_free(g_spdk_msg_mempool);
172 		g_spdk_msg_mempool = NULL;
173 	}
174 
175 	g_new_thread_fn = NULL;
176 	g_thread_op_fn = NULL;
177 	g_thread_op_supported_fn = NULL;
178 	g_ctx_sz = 0;
179 }
180 
/* Release all resources owned by a thread and remove it from g_threads.
 * The thread must already be on g_threads (see spdk_thread_create()).
 * Leaked channels/pollers are logged but freed/ignored rather than fatal.
 */
static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	/* Channels cannot be freed here; their owners leaked them. */
	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	/* Pollers, by contrast, are owned by the thread and are freed now. */
	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
		free(poller);
	}

	/* A paused poller can never be in the unregistered state, so always warn. */
	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	/* Unpublish the thread before freeing its memory. */
	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Return every cached message object to the shared pool. */
	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	spdk_ring_free(thread->messages);
	free(thread);
}
239 
240 struct spdk_thread *
241 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
242 {
243 	struct spdk_thread *thread;
244 	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
245 	int rc = 0, i;
246 
247 	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
248 	if (!thread) {
249 		SPDK_ERRLOG("Unable to allocate memory for thread\n");
250 		return NULL;
251 	}
252 
253 	if (cpumask) {
254 		spdk_cpuset_copy(&thread->cpumask, cpumask);
255 	} else {
256 		spdk_cpuset_negate(&thread->cpumask);
257 	}
258 
259 	TAILQ_INIT(&thread->io_channels);
260 	TAILQ_INIT(&thread->active_pollers);
261 	TAILQ_INIT(&thread->timed_pollers);
262 	TAILQ_INIT(&thread->paused_pollers);
263 	SLIST_INIT(&thread->msg_cache);
264 	thread->msg_cache_count = 0;
265 
266 	thread->tsc_last = spdk_get_ticks();
267 
268 	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
269 	if (!thread->messages) {
270 		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
271 		free(thread);
272 		return NULL;
273 	}
274 
275 	/* Fill the local message pool cache. */
276 	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
277 	if (rc == 0) {
278 		/* If we can't populate the cache it's ok. The cache will get filled
279 		 * up organically as messages are passed to the thread. */
280 		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
281 			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
282 			thread->msg_cache_count++;
283 		}
284 	}
285 
286 	if (name) {
287 		snprintf(thread->name, sizeof(thread->name), "%s", name);
288 	} else {
289 		snprintf(thread->name, sizeof(thread->name), "%p", thread);
290 	}
291 
292 	pthread_mutex_lock(&g_devlist_mutex);
293 	if (g_thread_id == 0) {
294 		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
295 		pthread_mutex_unlock(&g_devlist_mutex);
296 		_free_thread(thread);
297 		return NULL;
298 	}
299 	thread->id = g_thread_id++;
300 	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
301 	g_thread_count++;
302 	pthread_mutex_unlock(&g_devlist_mutex);
303 
304 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread (%" PRIu64 ", %s)\n",
305 		      thread->id, thread->name);
306 
307 	if (g_new_thread_fn) {
308 		rc = g_new_thread_fn(thread);
309 	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
310 		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
311 	}
312 
313 	if (rc != 0) {
314 		_free_thread(thread);
315 		return NULL;
316 	}
317 
318 	thread->state = SPDK_THREAD_STATE_RUNNING;
319 
320 	return thread;
321 }
322 
323 void
324 spdk_set_thread(struct spdk_thread *thread)
325 {
326 	tls_thread = thread;
327 }
328 
/* Try to move an EXITING thread into the EXITED state.  The transition is
 * refused while any registered poller or io_channel remains, unless the exit
 * grace period (exit_timeout_tsc) has elapsed, in which case it is forced.
 */
static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	/* Grace period expired: exit regardless of remaining resources. */
	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n",
			    thread->name);
		goto exited;
	}

	/* Any poller not yet unregistered blocks the transition. */
	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(SPDK_LOG_THREAD,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(SPDK_LOG_THREAD,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	/* Paused pollers are never in the unregistered state, so any entry blocks. */
	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(SPDK_LOG_THREAD,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	/* Outstanding io_channels also keep the thread alive. */
	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_INFOLOG(SPDK_LOG_THREAD,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
}
376 
377 int
378 spdk_thread_exit(struct spdk_thread *thread)
379 {
380 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name);
381 
382 	assert(tls_thread == thread);
383 
384 	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
385 		SPDK_INFOLOG(SPDK_LOG_THREAD,
386 			     "thread %s is already exiting\n",
387 			     thread->name);
388 		return 0;
389 	}
390 
391 	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
392 				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
393 	thread->state = SPDK_THREAD_STATE_EXITING;
394 	return 0;
395 }
396 
397 bool
398 spdk_thread_is_exited(struct spdk_thread *thread)
399 {
400 	return thread->state == SPDK_THREAD_STATE_EXITED;
401 }
402 
403 void
404 spdk_thread_destroy(struct spdk_thread *thread)
405 {
406 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name);
407 
408 	assert(thread->state == SPDK_THREAD_STATE_EXITED);
409 
410 	if (tls_thread == thread) {
411 		tls_thread = NULL;
412 	}
413 
414 	_free_thread(thread);
415 }
416 
417 void *
418 spdk_thread_get_ctx(struct spdk_thread *thread)
419 {
420 	if (g_ctx_sz > 0) {
421 		return thread->ctx;
422 	}
423 
424 	return NULL;
425 }
426 
427 struct spdk_cpuset *
428 spdk_thread_get_cpumask(struct spdk_thread *thread)
429 {
430 	return &thread->cpumask;
431 }
432 
433 int
434 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
435 {
436 	struct spdk_thread *thread;
437 
438 	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
439 		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
440 		assert(false);
441 		return -ENOTSUP;
442 	}
443 
444 	thread = spdk_get_thread();
445 	if (!thread) {
446 		SPDK_ERRLOG("Called from non-SPDK thread\n");
447 		assert(false);
448 		return -EINVAL;
449 	}
450 
451 	spdk_cpuset_copy(&thread->cpumask, cpumask);
452 
453 	/* Invoke framework's reschedule operation. If this function is called multiple times
454 	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
455 	 * reschedule operation.
456 	 */
457 	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
458 
459 	return 0;
460 }
461 
462 struct spdk_thread *
463 spdk_thread_get_from_ctx(void *ctx)
464 {
465 	if (ctx == NULL) {
466 		assert(false);
467 		return NULL;
468 	}
469 
470 	assert(g_ctx_sz > 0);
471 
472 	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
473 }
474 
/* Dequeue up to max_msgs messages (0 means the default batch size) from the
 * thread's ring and run each callback.  Spent message objects are recycled
 * into the thread's local cache when there is room, otherwise returned to
 * the shared pool.  Returns the number of messages processed.
 */
static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	/* Clamp the caller's request to the batch size; 0 selects the default. */
	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}
519 
/* Compute the poller's next deadline and insert it into the thread's
 * timed_pollers list, which is kept sorted by ascending next_run_tick so
 * thread_poll() can stop scanning at the first not-yet-due entry.
 */
static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled
	 * run time.
	 */
	/* Walk from the tail since new deadlines usually land near the end. */
	TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq);
}
541 
542 static void
543 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
544 {
545 	if (poller->period_ticks) {
546 		poller_insert_timer(thread, poller, spdk_get_ticks());
547 	} else {
548 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
549 	}
550 }
551 
552 static inline void
553 thread_update_stats(struct spdk_thread *thread, uint64_t end,
554 		    uint64_t start, int rc)
555 {
556 	if (rc == 0) {
557 		/* Poller status idle */
558 		thread->stats.idle_tsc += end - start;
559 	} else if (rc > 0) {
560 		/* Poller status busy */
561 		thread->stats.busy_tsc += end - start;
562 	}
563 	/* Store end time to use it as start time of the next spdk_thread_poll(). */
564 	thread->tsc_last = end;
565 }
566 
/* One poll iteration for a thread: run any pending critical message, drain a
 * batch of regular messages, then run the active pollers and the due timed
 * pollers.  Returns the largest poller return code seen, or 1 if only
 * messages were processed, or 0 if nothing was done (used for busy/idle
 * accounting in thread_update_stats()).
 */
static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	/* The single critical message slot runs first, with a NULL argument. */
	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	/* Iterate in reverse so pollers registered during this loop (appended at
	 * the tail) are not run until the next iteration.
	 */
	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		/* Deferred unregister/pause requested before this iteration. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

		poller->run_count++;
		if (poller_rc > 0) {
			poller->busy_count++;
		}

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %s returned -1\n", poller->name);
		}
#endif

		/* The poller may have unregistered or paused itself while running. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
		}

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		/* The list is sorted by deadline; the first not-yet-due poller ends
		 * the scan.
		 */
		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

		poller->run_count++;
		if (timer_rc > 0) {
			poller->busy_count++;
		}

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %s returned -1\n", poller->name);
		}
#endif

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
			/* Re-insert in sorted position for its next deadline. */
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			poller_insert_timer(thread, poller, now);
		}

		if (timer_rc > rc) {
			rc = timer_rc;
		}
	}

	return rc;
}
675 
676 int
677 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
678 {
679 	struct spdk_thread *orig_thread;
680 	int rc;
681 
682 	orig_thread = _get_thread();
683 	tls_thread = thread;
684 
685 	if (now == 0) {
686 		now = spdk_get_ticks();
687 	}
688 
689 	rc = thread_poll(thread, max_msgs, now);
690 
691 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
692 		thread_exit(thread, now);
693 	}
694 
695 	thread_update_stats(thread, spdk_get_ticks(), now, rc);
696 
697 	tls_thread = orig_thread;
698 
699 	return rc;
700 }
701 
702 uint64_t
703 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
704 {
705 	struct spdk_poller *poller;
706 
707 	poller = TAILQ_FIRST(&thread->timed_pollers);
708 	if (poller) {
709 		return poller->next_run_tick;
710 	}
711 
712 	return 0;
713 }
714 
715 int
716 spdk_thread_has_active_pollers(struct spdk_thread *thread)
717 {
718 	return !TAILQ_EMPTY(&thread->active_pollers);
719 }
720 
721 static bool
722 thread_has_unpaused_pollers(struct spdk_thread *thread)
723 {
724 	if (TAILQ_EMPTY(&thread->active_pollers) &&
725 	    TAILQ_EMPTY(&thread->timed_pollers)) {
726 		return false;
727 	}
728 
729 	return true;
730 }
731 
732 bool
733 spdk_thread_has_pollers(struct spdk_thread *thread)
734 {
735 	if (!thread_has_unpaused_pollers(thread) &&
736 	    TAILQ_EMPTY(&thread->paused_pollers)) {
737 		return false;
738 	}
739 
740 	return true;
741 }
742 
743 bool
744 spdk_thread_is_idle(struct spdk_thread *thread)
745 {
746 	if (spdk_ring_count(thread->messages) ||
747 	    thread_has_unpaused_pollers(thread) ||
748 	    thread->critical_msg != NULL) {
749 		return false;
750 	}
751 
752 	return true;
753 }
754 
755 uint32_t
756 spdk_thread_get_count(void)
757 {
758 	/*
759 	 * Return cached value of the current thread count.  We could acquire the
760 	 *  lock and iterate through the TAILQ of threads to count them, but that
761 	 *  count could still be invalidated after we release the lock.
762 	 */
763 	return g_thread_count;
764 }
765 
/* Public accessor for the SPDK thread bound to the calling OS thread. */
struct spdk_thread *
spdk_get_thread(void)
{
	return _get_thread();
}
771 
772 const char *
773 spdk_thread_get_name(const struct spdk_thread *thread)
774 {
775 	return thread->name;
776 }
777 
778 uint64_t
779 spdk_thread_get_id(const struct spdk_thread *thread)
780 {
781 	return thread->id;
782 }
783 
784 struct spdk_thread *
785 spdk_thread_get_by_id(uint64_t id)
786 {
787 	struct spdk_thread *thread;
788 
789 	pthread_mutex_lock(&g_devlist_mutex);
790 	TAILQ_FOREACH(thread, &g_threads, tailq) {
791 		if (thread->id == id) {
792 			pthread_mutex_unlock(&g_devlist_mutex);
793 
794 			return thread;
795 		}
796 	}
797 	pthread_mutex_unlock(&g_devlist_mutex);
798 
799 	return NULL;
800 }
801 
802 int
803 spdk_thread_get_stats(struct spdk_thread_stats *stats)
804 {
805 	struct spdk_thread *thread;
806 
807 	thread = _get_thread();
808 	if (!thread) {
809 		SPDK_ERRLOG("No thread allocated\n");
810 		return -EINVAL;
811 	}
812 
813 	if (stats == NULL) {
814 		return -EINVAL;
815 	}
816 
817 	*stats = thread->stats;
818 
819 	return 0;
820 }
821 
822 uint64_t
823 spdk_thread_get_last_tsc(struct spdk_thread *thread)
824 {
825 	return thread->tsc_last;
826 }
827 
828 int
829 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
830 {
831 	struct spdk_thread *local_thread;
832 	struct spdk_msg *msg;
833 	int rc;
834 
835 	assert(thread != NULL);
836 
837 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
838 		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
839 		return -EIO;
840 	}
841 
842 	local_thread = _get_thread();
843 
844 	msg = NULL;
845 	if (local_thread != NULL) {
846 		if (local_thread->msg_cache_count > 0) {
847 			msg = SLIST_FIRST(&local_thread->msg_cache);
848 			assert(msg != NULL);
849 			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
850 			local_thread->msg_cache_count--;
851 		}
852 	}
853 
854 	if (msg == NULL) {
855 		msg = spdk_mempool_get(g_spdk_msg_mempool);
856 		if (!msg) {
857 			SPDK_ERRLOG("msg could not be allocated\n");
858 			return -ENOMEM;
859 		}
860 	}
861 
862 	msg->fn = fn;
863 	msg->arg = ctx;
864 
865 	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
866 	if (rc != 1) {
867 		SPDK_ERRLOG("msg could not be enqueued\n");
868 		spdk_mempool_put(g_spdk_msg_mempool, msg);
869 		return -EIO;
870 	}
871 
872 	return 0;
873 }
874 
875 int
876 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
877 {
878 	spdk_msg_fn expected = NULL;
879 
880 	if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
881 					__ATOMIC_SEQ_CST)) {
882 		return 0;
883 	}
884 
885 	return -EIO;
886 }
887 
888 static struct spdk_poller *
889 poller_register(spdk_poller_fn fn,
890 		void *arg,
891 		uint64_t period_microseconds,
892 		const char *name)
893 {
894 	struct spdk_thread *thread;
895 	struct spdk_poller *poller;
896 	uint64_t quotient, remainder, ticks;
897 
898 	thread = spdk_get_thread();
899 	if (!thread) {
900 		assert(false);
901 		return NULL;
902 	}
903 
904 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
905 		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
906 		return NULL;
907 	}
908 
909 	poller = calloc(1, sizeof(*poller));
910 	if (poller == NULL) {
911 		SPDK_ERRLOG("Poller memory allocation failed\n");
912 		return NULL;
913 	}
914 
915 	if (name) {
916 		snprintf(poller->name, sizeof(poller->name), "%s", name);
917 	} else {
918 		snprintf(poller->name, sizeof(poller->name), "%p", fn);
919 	}
920 
921 	poller->state = SPDK_POLLER_STATE_WAITING;
922 	poller->fn = fn;
923 	poller->arg = arg;
924 	poller->thread = thread;
925 
926 	if (period_microseconds) {
927 		quotient = period_microseconds / SPDK_SEC_TO_USEC;
928 		remainder = period_microseconds % SPDK_SEC_TO_USEC;
929 		ticks = spdk_get_ticks_hz();
930 
931 		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
932 	} else {
933 		poller->period_ticks = 0;
934 	}
935 
936 	thread_insert_poller(thread, poller);
937 
938 	return poller;
939 }
940 
941 struct spdk_poller *
942 spdk_poller_register(spdk_poller_fn fn,
943 		     void *arg,
944 		     uint64_t period_microseconds)
945 {
946 	return poller_register(fn, arg, period_microseconds, NULL);
947 }
948 
949 struct spdk_poller *
950 spdk_poller_register_named(spdk_poller_fn fn,
951 			   void *arg,
952 			   uint64_t period_microseconds,
953 			   const char *name)
954 {
955 	return poller_register(fn, arg, period_microseconds, name);
956 }
957 
958 void
959 spdk_poller_unregister(struct spdk_poller **ppoller)
960 {
961 	struct spdk_thread *thread;
962 	struct spdk_poller *poller;
963 
964 	poller = *ppoller;
965 	if (poller == NULL) {
966 		return;
967 	}
968 
969 	*ppoller = NULL;
970 
971 	thread = spdk_get_thread();
972 	if (!thread) {
973 		assert(false);
974 		return;
975 	}
976 
977 	if (poller->thread != thread) {
978 		SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n");
979 		assert(false);
980 		return;
981 	}
982 
983 	/* If the poller was paused, put it on the active_pollers list so that
984 	 * its unregistration can be processed by spdk_thread_poll().
985 	 */
986 	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
987 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
988 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
989 		poller->period_ticks = 0;
990 	}
991 
992 	/* Simply set the state to unregistered. The poller will get cleaned up
993 	 * in a subsequent call to spdk_thread_poll().
994 	 */
995 	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
996 }
997 
/* Pause a poller.  Already-paused (or pausing) pollers are left alone.
 * Must be called on the poller's owning thread (it manipulates that
 * thread's lists via spdk_get_thread()).
 */
void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state == SPDK_POLLER_STATE_PAUSED ||
	    poller->state == SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused from within itself, we can immediately move it
	 * on the paused_pollers list.  Otherwise we just set its state to
	 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it.  It
	 * allows a poller to be paused from another one's context without
	 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration.
	 */
	if (poller->state != SPDK_POLLER_STATE_RUNNING) {
		poller->state = SPDK_POLLER_STATE_PAUSING;
	} else {
		/* RUNNING means we are inside this poller's own callback. */
		if (poller->period_ticks > 0) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
		} else {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		}

		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
	}
}
1033 
/* Resume a paused (or pausing) poller.  Pollers in any other state are left
 * alone.  Must be called on the poller's owning thread.
 */
void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state != SPDK_POLLER_STATE_PAUSED &&
	    poller->state != SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused it has to be removed from the paused pollers
	 * list and put on the active / timer list depending on its
	 * period_ticks.  If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list.
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		thread_insert_poller(thread, poller);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
}
1063 
1064 const char *
1065 spdk_poller_state_str(enum spdk_poller_state state)
1066 {
1067 	switch (state) {
1068 	case SPDK_POLLER_STATE_WAITING:
1069 		return "waiting";
1070 	case SPDK_POLLER_STATE_RUNNING:
1071 		return "running";
1072 	case SPDK_POLLER_STATE_UNREGISTERED:
1073 		return "unregistered";
1074 	case SPDK_POLLER_STATE_PAUSING:
1075 		return "pausing";
1076 	case SPDK_POLLER_STATE_PAUSED:
1077 		return "paused";
1078 	default:
1079 		return NULL;
1080 	}
1081 }
1082 
/* Iteration state for spdk_for_each_thread(): the per-thread callback, its
 * argument, the completion, and the cursor that hops from thread to thread.
 */
struct call_thread {
	struct spdk_thread *cur_thread;	/* next thread to visit */
	spdk_msg_fn fn;			/* per-thread callback */
	void *ctx;			/* argument passed to fn and cpl */

	struct spdk_thread *orig_thread;	/* thread that started the walk */
	spdk_msg_fn cpl;		/* completion, run back on orig_thread */
};
1091 
1092 static void
1093 _on_thread(void *ctx)
1094 {
1095 	struct call_thread *ct = ctx;
1096 	int rc __attribute__((unused));
1097 
1098 	ct->fn(ct->ctx);
1099 
1100 	pthread_mutex_lock(&g_devlist_mutex);
1101 	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
1102 	pthread_mutex_unlock(&g_devlist_mutex);
1103 
1104 	if (!ct->cur_thread) {
1105 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");
1106 
1107 		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
1108 		free(ctx);
1109 	} else {
1110 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
1111 			      ct->cur_thread->name);
1112 
1113 		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
1114 	}
1115 	assert(rc == 0);
1116 }
1117 
/*
 * Run fn(ctx) once on every registered SPDK thread, then invoke cpl(ctx)
 * on the calling thread.  Threads are visited in g_threads list order via
 * chained messages (see _on_thread).  On allocation failure, or when
 * called from a non-SPDK thread, cpl is invoked directly on the caller
 * without visiting any thread.
 */
void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	/* Start at the head of the thread list; the lock protects g_threads. */
	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}
1155 
1156 void
1157 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
1158 			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
1159 			const char *name)
1160 {
1161 	struct io_device *dev, *tmp;
1162 	struct spdk_thread *thread;
1163 
1164 	assert(io_device != NULL);
1165 	assert(create_cb != NULL);
1166 	assert(destroy_cb != NULL);
1167 
1168 	thread = spdk_get_thread();
1169 	if (!thread) {
1170 		SPDK_ERRLOG("called from non-SPDK thread\n");
1171 		assert(false);
1172 		return;
1173 	}
1174 
1175 	dev = calloc(1, sizeof(struct io_device));
1176 	if (dev == NULL) {
1177 		SPDK_ERRLOG("could not allocate io_device\n");
1178 		return;
1179 	}
1180 
1181 	dev->io_device = io_device;
1182 	if (name) {
1183 		snprintf(dev->name, sizeof(dev->name), "%s", name);
1184 	} else {
1185 		snprintf(dev->name, sizeof(dev->name), "%p", dev);
1186 	}
1187 	dev->create_cb = create_cb;
1188 	dev->destroy_cb = destroy_cb;
1189 	dev->unregister_cb = NULL;
1190 	dev->ctx_size = ctx_size;
1191 	dev->for_each_count = 0;
1192 	dev->unregistered = false;
1193 	dev->refcnt = 0;
1194 
1195 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
1196 		      dev->name, dev->io_device, thread->name);
1197 
1198 	pthread_mutex_lock(&g_devlist_mutex);
1199 	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
1200 		if (tmp->io_device == io_device) {
1201 			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
1202 				    io_device, tmp->name, dev->name);
1203 			free(dev);
1204 			pthread_mutex_unlock(&g_devlist_mutex);
1205 			return;
1206 		}
1207 	}
1208 	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
1209 	pthread_mutex_unlock(&g_devlist_mutex);
1210 }
1211 
1212 static void
1213 _finish_unregister(void *arg)
1214 {
1215 	struct io_device *dev = arg;
1216 
1217 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
1218 		      dev->name, dev->io_device, dev->unregister_thread->name);
1219 
1220 	dev->unregister_cb(dev->io_device);
1221 	free(dev);
1222 }
1223 
1224 static void
1225 io_device_free(struct io_device *dev)
1226 {
1227 	int rc __attribute__((unused));
1228 
1229 	if (dev->unregister_cb == NULL) {
1230 		free(dev);
1231 	} else {
1232 		assert(dev->unregister_thread != NULL);
1233 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
1234 			      dev->name, dev->io_device, dev->unregister_thread->name);
1235 		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
1236 		assert(rc == 0);
1237 	}
1238 }
1239 
/*
 * Remove io_device from the global registry.  The descriptor is only
 * freed once every I/O channel referencing it has been released: if the
 * snapshot of refcnt taken under the lock is non-zero, deletion is
 * deferred to the final put_io_channel().  unregister_cb, when non-NULL,
 * runs on the calling thread once the device is finally freed.
 * Unregistering while spdk_for_each_channel() iterations are outstanding
 * is rejected.
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* An in-flight spdk_for_each_channel() still needs this device. */
	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	/* Snapshot refcnt while still holding the lock: put_io_channel()
	 * may be racing to drop the last reference. */
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}
1292 
/* Return the name assigned to this io_device at registration time. */
const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}
1298 
/*
 * Get (or create) the calling thread's I/O channel for io_device.
 * Channels are per-thread and reference counted: if this thread already
 * has one for the device, its refcount is bumped and it is returned.
 * Otherwise a new channel plus dev->ctx_size bytes of per-channel context
 * is allocated, linked into the thread's channel list, and initialized
 * via the device's create_cb (called without holding g_devlist_mutex).
 * Returns NULL if the device is unknown, the caller is not an SPDK
 * thread, the thread has exited, allocation fails, or create_cb fails.
 */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	/* Exited threads must not acquire new channels. */
	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 *  thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* Per-channel context lives directly after the channel struct. */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	/* Drop the lock before calling into user code. */
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		/* create_cb failed: undo the list insertion and refcount. */
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
1381 
/*
 * Deferred-destruction handler queued by spdk_put_io_channel(); runs on
 * the channel's owning thread.  If the channel was re-acquired (ref > 0)
 * or another destroy message is still pending (destroy_ref > 0), it does
 * nothing.  Otherwise it unlinks the channel, invokes the destroy
 * callback outside g_devlist_mutex, drops the device reference, and also
 * frees the device when it was already unregistered and this was its
 * last channel.
 */
static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 *  after this message was sent but before it had a chance to
		 *  execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	/* Only free the device once it is both unregistered and unreferenced. */
	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}
1438 
1439 void
1440 spdk_put_io_channel(struct spdk_io_channel *ch)
1441 {
1442 	struct spdk_thread *thread;
1443 	int rc __attribute__((unused));
1444 
1445 	thread = spdk_get_thread();
1446 	if (!thread) {
1447 		SPDK_ERRLOG("called from non-SPDK thread\n");
1448 		assert(false);
1449 		return;
1450 	}
1451 
1452 	if (ch->thread != thread) {
1453 		SPDK_ERRLOG("different from the thread that called get_io_channel()\n");
1454 		assert(false);
1455 		return;
1456 	}
1457 
1458 	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
1459 		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
1460 		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);
1461 
1462 	ch->ref--;
1463 
1464 	if (ch->ref == 0) {
1465 		ch->destroy_ref++;
1466 		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
1467 		assert(rc == 0);
1468 	}
1469 }
1470 
1471 struct spdk_io_channel *
1472 spdk_io_channel_from_ctx(void *ctx)
1473 {
1474 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
1475 }
1476 
/* Return the thread on which this channel was created (set by
 * spdk_get_io_channel). */
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}
1482 
/*
 * State for spdk_for_each_channel(): the iterator hops from thread to
 * thread by message, visiting at most one channel per thread for the
 * target io_device.
 */
struct spdk_io_channel_iter {
	void *io_device;	/* key used to match channels (ch->dev->io_device) */
	struct io_device *dev;	/* resolved device; holds a for_each_count */
	spdk_channel_msg fn;	/* per-channel callback */
	int status;		/* status from the latest continue() call */
	void *ctx;		/* user context */
	struct spdk_io_channel *ch;	/* channel currently being visited */

	struct spdk_thread *cur_thread;	/* thread currently being visited */

	struct spdk_thread *orig_thread;	/* thread that started the iteration */
	spdk_channel_for_each_cpl cpl;	/* completion, run on orig_thread */
};
1496 
/* Accessor: the io_device pointer this iteration targets. */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}
1502 
/* Accessor: the channel currently being visited by the iteration. */
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}
1508 
/* Accessor: the user context passed to spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}
1514 
1515 static void
1516 _call_completion(void *ctx)
1517 {
1518 	struct spdk_io_channel_iter *i = ctx;
1519 
1520 	if (i->cpl != NULL) {
1521 		i->cpl(i, i->status);
1522 	}
1523 	free(i);
1524 }
1525 
1526 static void
1527 _call_channel(void *ctx)
1528 {
1529 	struct spdk_io_channel_iter *i = ctx;
1530 	struct spdk_io_channel *ch;
1531 
1532 	/*
1533 	 * It is possible that the channel was deleted before this
1534 	 *  message had a chance to execute.  If so, skip calling
1535 	 *  the fn() on this thread.
1536 	 */
1537 	pthread_mutex_lock(&g_devlist_mutex);
1538 	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
1539 		if (ch->dev->io_device == i->io_device) {
1540 			break;
1541 		}
1542 	}
1543 	pthread_mutex_unlock(&g_devlist_mutex);
1544 
1545 	if (ch) {
1546 		i->fn(i);
1547 	} else {
1548 		spdk_for_each_channel_continue(i, 0);
1549 	}
1550 }
1551 
/*
 * Invoke fn once on each thread that holds an I/O channel for io_device,
 * hopping thread to thread via messages (_call_channel /
 * spdk_for_each_channel_continue).  Once every matching channel has been
 * visited — or none exist — cpl runs on the calling thread with the
 * final status.  The device's for_each_count is bumped so it cannot be
 * unregistered while the iteration is in flight.
 *
 * NOTE(review): on allocation failure this returns without invoking
 * either fn or cpl — callers waiting on cpl would stall; verify callers
 * tolerate this.
 */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread with a channel for this device and send the
	 * iterator there; the lock is released just before sending. */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist: go straight to the completion. */
	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
1595 
/*
 * Advance a spdk_for_each_channel() iteration.  Must be called from the
 * thread currently being visited (i->cur_thread), typically from within
 * fn.  A non-zero status aborts the iteration immediately; otherwise the
 * iterator is forwarded to the next thread that holds a channel for the
 * device.  When no further thread matches (or on abort), for_each_count
 * is dropped and the completion is sent back to the originating thread.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	/* Non-zero status short-circuits straight to completion. */
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	/* Iteration over: release the unregister guard taken by
	 * spdk_for_each_channel(). */
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
1634 
1635 
/* Register the "thread" log flag so SPDK_DEBUGLOG(SPDK_LOG_THREAD, ...)
 * output in this file can be enabled at runtime. */
SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)
1637