xref: /spdk/lib/thread/thread.c (revision 4b8db27b2aee39a750d22d30e3f57e2a7f35f88d)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/likely.h"
38 #include "spdk/queue.h"
39 #include "spdk/string.h"
40 #include "spdk/thread.h"
41 #include "spdk/util.h"
42 
43 #include "spdk_internal/log.h"
44 #include "spdk_internal/thread.h"
45 
/* Max number of messages processed per _spdk_msg_queue_run_batch() call. */
#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
#define SPDK_MAX_THREAD_NAME_LEN	256

/* Protects g_io_devices, g_threads and g_thread_count. */
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Optional per-thread hook installed by spdk_thread_lib_init(); invoked for every new thread. */
static spdk_new_thread_fn g_new_thread_fn = NULL;
/* Size of the user context area allocated at the end of each spdk_thread. */
static size_t g_ctx_sz = 0;
54 
/*
 * Bookkeeping for one registered I/O device.  An entry is added to
 * g_io_devices by spdk_io_device_register() and removed by
 * spdk_io_device_unregister().
 */
struct io_device {
	void				*io_device;	/* Caller-supplied pointer used as the unique key. */
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb	create_cb;	/* Initializes per-channel context on channel creation. */
	spdk_io_channel_destroy_cb	destroy_cb;	/* Tears down per-channel context on channel release. */
	spdk_io_device_unregister_cb	unregister_cb;	/* Optional; runs once unregistration completes. */
	struct spdk_thread		*unregister_thread;	/* Thread that called spdk_io_device_unregister(). */
	uint32_t			ctx_size;	/* Bytes of per-channel context appended to each spdk_io_channel. */
	uint32_t			for_each_count;	/* Unregistration is refused while this is non-zero. */
	TAILQ_ENTRY(io_device)		tailq;	/* Linkage on g_io_devices. */

	uint32_t			refcnt;	/* Number of io_channels referencing this device. */

	bool				unregistered;	/* Set once spdk_io_device_unregister() has run. */
};

/* All registered devices; protected by g_devlist_mutex. */
static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
72 
/* A single message queued to a thread via spdk_thread_send_msg(). */
struct spdk_msg {
	spdk_msg_fn		fn;	/* Function executed on the target thread. */
	void			*arg;	/* Argument passed to fn. */

	SLIST_ENTRY(spdk_msg)	link;	/* Linkage on a thread's local msg_cache. */
};

/* Upper bound on each thread's local cache of spdk_msg objects. */
#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
/* Global pool backing all spdk_msg allocations; created in spdk_thread_lib_init(). */
static struct spdk_mempool *g_spdk_msg_mempool = NULL;
82 
/*
 * Lifecycle states of an spdk_poller.  State transitions only happen on the
 * poller's owning thread (see the note on spdk_poller::state).
 */
enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,

	/* The poller is in the process of being paused.  It will be paused
	 * during the next time it's supposed to be executed.
	 */
	SPDK_POLLER_STATE_PAUSING,

	/* The poller is registered but currently paused.  It's on the
	 * paused_pollers list.
	 */
	SPDK_POLLER_STATE_PAUSED,
};
103 
/* A registered poller; lives on exactly one of a thread's three poller lists. */
struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;	/* Linkage on active/timer/paused list. */

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	uint64_t			period_ticks;	/* 0 for a non-timed (active) poller. */
	uint64_t			next_run_tick;	/* Next deadline; meaningful for timed pollers only. */
	spdk_poller_fn			fn;	/* Poller callback. */
	void				*arg;	/* Argument passed to fn. */
};
115 
/* Per-lightweight-thread state.  Allocated by spdk_thread_create(). */
struct spdk_thread {
	/* I/O channels held by this thread (see spdk_get_io_channel()). */
	TAILQ_HEAD(, spdk_io_channel)	io_channels;
	/* Linkage on the global g_threads list. */
	TAILQ_ENTRY(spdk_thread)	tailq;
	char				name[SPDK_MAX_THREAD_NAME_LEN + 1];

	/* Set by spdk_thread_exit(); the poll loop checks it to stop early. */
	bool				exit;

	/* CPU mask recorded at creation time (copied from the caller's mask). */
	struct spdk_cpuset		cpumask;

	/* Tick count at the end of the previous spdk_thread_poll() pass. */
	uint64_t			tsc_last;
	/* Accumulated busy/idle tick statistics (see spdk_thread_get_stats()). */
	struct spdk_thread_stats	stats;

	/*
	 * Contains pollers actively running on this thread.  Pollers
	 *  are run round-robin. The thread takes one poller from the head
	 *  of the ring, executes it, then puts it back at the tail of
	 *  the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;

	/**
	 * Contains pollers running on this thread with a periodic timer.
	 * Kept sorted by ascending next_run_tick.
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller)	timer_pollers;

	/*
	 * Contains paused pollers.  Pollers on this queue are waiting until
	 * they are resumed (in which case they're put onto the active/timer
	 * queues) or unregistered.
	 */
	TAILQ_HEAD(paused_pollers_head, spdk_poller)	paused_pollers;

	/* Incoming message queue (multi-producer / single-consumer ring). */
	struct spdk_ring		*messages;

	/* Local cache of spdk_msg objects, capped at SPDK_MSG_MEMPOOL_CACHE_SIZE. */
	SLIST_HEAD(, spdk_msg)		msg_cache;
	size_t				msg_cache_count;

	/* Single out-of-band message slot set via spdk_thread_send_critical_msg(). */
	spdk_msg_fn			critical_msg;

	/* User context allocated at the end */
	uint8_t				ctx[0];
};

/* All live threads; protected by g_devlist_mutex. */
static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
/* Cached length of g_threads (see spdk_thread_get_count()). */
static uint32_t g_thread_count = 0;

/* The spdk_thread currently bound to the calling OS thread, if any. */
static __thread struct spdk_thread *tls_thread = NULL;
163 
/* Return the spdk_thread bound to the calling OS thread, or NULL if none. */
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}
169 
170 int
171 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
172 {
173 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
174 
175 	assert(g_new_thread_fn == NULL);
176 	g_new_thread_fn = new_thread_fn;
177 
178 	g_ctx_sz = ctx_sz;
179 
180 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
181 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
182 			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
183 			     sizeof(struct spdk_msg),
184 			     0, /* No cache. We do our own. */
185 			     SPDK_ENV_SOCKET_ID_ANY);
186 
187 	if (!g_spdk_msg_mempool) {
188 		return -1;
189 	}
190 
191 	return 0;
192 }
193 
194 void
195 spdk_thread_lib_fini(void)
196 {
197 	struct io_device *dev;
198 
199 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
200 		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
201 	}
202 
203 	if (g_spdk_msg_mempool) {
204 		spdk_mempool_free(g_spdk_msg_mempool);
205 		g_spdk_msg_mempool = NULL;
206 	}
207 
208 	g_new_thread_fn = NULL;
209 	g_ctx_sz = 0;
210 }
211 
/*
 * Release all resources owned by a thread: warn about leftover channels and
 * pollers, unlink the thread from g_threads, drain its local message cache
 * back into the global pool, and finally free the thread itself.
 */
static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	/* Channels are owned by their users; any still present at exit is a leak. */
	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("poller %p still registered at thread exit\n",
				     poller);
		}

		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}


	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("poller %p still registered at thread exit\n",
				     poller);
		}

		TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		free(poller);
	}

	/* Paused pollers are never UNREGISTERED (unregister moves them to the
	 * active list first), so any entry here is still registered - warn.
	 */
	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("poller %p still registered at thread exit\n", poller);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Return every cached message object to the global mempool. */
	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	spdk_ring_free(thread->messages);
	free(thread);
}
273 
/*
 * Create and register a new spdk_thread.
 *
 * \param name Optional human-readable name; the thread's address is used
 *        when NULL.
 * \param cpumask Optional CPU mask to copy.  When NULL the zeroed mask is
 *        negated (presumably selecting all CPUs -- depends on
 *        spdk_cpuset_negate semantics; confirm in spdk/cpuset.h).
 * \return the new thread, or NULL on allocation failure or if the
 *         module-level new-thread callback (g_new_thread_fn) fails.
 */
struct spdk_thread *
spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc, i;

	/* The g_ctx_sz-byte user context area lives inline after the struct. */
	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	TAILQ_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	TAILQ_INIT(&thread->timer_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
		if (rc != 0) {
			/* _free_thread() also unlinks the thread from g_threads. */
			_free_thread(thread);
			return NULL;
		}
	}

	return thread;
}
343 
/* Bind the given spdk_thread (or NULL to unbind) to the calling OS thread. */
void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}
349 
/*
 * Mark a thread for exit.  Must be called on the thread itself (asserted).
 * Resources are released later by spdk_thread_destroy().
 */
void
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	thread->exit = true;
}
359 
/*
 * Free a thread previously marked for exit via spdk_thread_exit().  If the
 * thread is bound to the calling OS thread, the binding is cleared first.
 */
void
spdk_thread_destroy(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name);

	assert(thread->exit == true);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}
373 
374 void *
375 spdk_thread_get_ctx(struct spdk_thread *thread)
376 {
377 	if (g_ctx_sz > 0) {
378 		return thread->ctx;
379 	}
380 
381 	return NULL;
382 }
383 
/* Return a pointer to the thread's CPU mask (owned by the thread). */
struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}
389 
/*
 * Recover the spdk_thread from a pointer previously returned by
 * spdk_thread_get_ctx().  Only valid when a non-zero context size was
 * configured in spdk_thread_lib_init() (asserted).
 */
struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}
402 
/*
 * Dequeue up to max_msgs messages from the thread's ring and execute them.
 * max_msgs is capped at SPDK_MSG_BATCH_SIZE; 0 selects the full batch.
 * Completed message objects are recycled into the thread's local cache when
 * there is room, otherwise returned to the global mempool.
 *
 * \return the number of messages dequeued from the ring.
 */
static inline uint32_t
_spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		/* NOTE(review): if a message sets thread->exit, the remaining
		 * dequeued messages are neither run nor recycled; they appear
		 * to leak from the mempool -- confirm this is intended.
		 */
		if (thread->exit) {
			break;
		}

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}
451 
/*
 * Schedule a timed poller: compute its next deadline from 'now' and insert
 * it into timer_pollers, which is kept sorted by ascending next_run_tick.
 * Scanning from the tail is cheap for the common case of the latest
 * deadline, and places equal deadlines after existing ones (FIFO order).
 */
static void
_spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled
	 * run time.
	 */
	TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
}
473 
474 static void
475 _spdk_thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
476 {
477 	if (poller->period_ticks) {
478 		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
479 	} else {
480 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
481 	}
482 }
483 
/*
 * Run one pass of a thread's work: an optional critical message, a batch of
 * queued messages, every active poller, and any timed pollers whose deadline
 * has arrived.  Busy/idle ticks are accumulated into thread->stats.
 *
 * \param thread Thread to poll; temporarily bound to the calling OS thread.
 * \param max_msgs Cap on messages processed this pass (0 = default batch).
 * \param now Current tick count; 0 means sample spdk_get_ticks() here.
 * \return 0 if the thread was idle, positive if any message or poller
 *         reported work (rc only ever increases; negative poller returns do
 *         not lower it).
 */
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_thread *orig_thread;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	/* A critical message runs at most once per pass, ahead of the queue. */
	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
	}

	msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	/* Iterate from tail to head so pollers registered during this pass
	 * (inserted at the tail) are not run until the next pass.
	 */
	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		if (thread->exit) {
			break;
		}

		/* Deferred cleanup/pausing requested while the poller was not running. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
		}
#endif

		/* The fn may have unregistered or paused its own poller. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
		}

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (thread->exit) {
			break;
		}

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		/* The list is sorted by next_run_tick, so nothing later is due. */
		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller);
		}
#endif

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
			/* Re-insert at the position matching the new deadline. */
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			_spdk_poller_insert_timer(thread, poller, now);
		}

		if (timer_rc > rc) {
			rc = timer_rc;
		}
	}

	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += now - thread->tsc_last;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += now - thread->tsc_last;
	}
	thread->tsc_last = now;

	tls_thread = orig_thread;

	return rc;
}
609 
610 uint64_t
611 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
612 {
613 	struct spdk_poller *poller;
614 
615 	poller = TAILQ_FIRST(&thread->timer_pollers);
616 	if (poller) {
617 		return poller->next_run_tick;
618 	}
619 
620 	return 0;
621 }
622 
623 int
624 spdk_thread_has_active_pollers(struct spdk_thread *thread)
625 {
626 	return !TAILQ_EMPTY(&thread->active_pollers);
627 }
628 
629 static bool
630 _spdk_thread_has_unpaused_pollers(struct spdk_thread *thread)
631 {
632 	if (TAILQ_EMPTY(&thread->active_pollers) &&
633 	    TAILQ_EMPTY(&thread->timer_pollers)) {
634 		return false;
635 	}
636 
637 	return true;
638 }
639 
640 bool
641 spdk_thread_has_pollers(struct spdk_thread *thread)
642 {
643 	if (!_spdk_thread_has_unpaused_pollers(thread) &&
644 	    TAILQ_EMPTY(&thread->paused_pollers)) {
645 		return false;
646 	}
647 
648 	return true;
649 }
650 
651 bool
652 spdk_thread_is_idle(struct spdk_thread *thread)
653 {
654 	if (spdk_ring_count(thread->messages) ||
655 	    _spdk_thread_has_unpaused_pollers(thread) ||
656 	    thread->critical_msg != NULL) {
657 		return false;
658 	}
659 
660 	return true;
661 }
662 
663 uint32_t
664 spdk_thread_get_count(void)
665 {
666 	/*
667 	 * Return cached value of the current thread count.  We could acquire the
668 	 *  lock and iterate through the TAILQ of threads to count them, but that
669 	 *  count could still be invalidated after we release the lock.
670 	 */
671 	return g_thread_count;
672 }
673 
/*
 * Return the spdk_thread bound to the calling OS thread.  Logs an error and
 * returns NULL when the caller is not an SPDK thread.
 */
struct spdk_thread *
spdk_get_thread(void)
{
	struct spdk_thread *thread = _get_thread();

	if (thread == NULL) {
		SPDK_ERRLOG("No thread allocated\n");
	}

	return thread;
}
686 
/* Return the thread's name (storage owned by the thread). */
const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}
692 
693 int
694 spdk_thread_get_stats(struct spdk_thread_stats *stats)
695 {
696 	struct spdk_thread *thread;
697 
698 	thread = _get_thread();
699 	if (!thread) {
700 		SPDK_ERRLOG("No thread allocated\n");
701 		return -EINVAL;
702 	}
703 
704 	if (stats == NULL) {
705 		return -EINVAL;
706 	}
707 
708 	*stats = thread->stats;
709 
710 	return 0;
711 }
712 
/*
 * Queue fn(ctx) for execution on the given thread.  The spdk_msg object is
 * taken from the calling thread's local cache when one is available,
 * otherwise from the global mempool; on enqueue failure it is returned to
 * the global mempool (even if it originally came from a local cache).
 *
 * \return 0 on success, -ENOMEM if no message object could be obtained,
 *         -EIO if the target thread's ring rejected the enqueue.
 */
int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	/* Multi-producer enqueue; the target thread dequeues in its poll loop. */
	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return 0;
}
754 
755 int
756 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
757 {
758 	spdk_msg_fn expected = NULL;
759 
760 	if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
761 					__ATOMIC_SEQ_CST)) {
762 		return 0;
763 	}
764 
765 	return -EIO;
766 }
767 
768 struct spdk_poller *
769 spdk_poller_register(spdk_poller_fn fn,
770 		     void *arg,
771 		     uint64_t period_microseconds)
772 {
773 	struct spdk_thread *thread;
774 	struct spdk_poller *poller;
775 	uint64_t quotient, remainder, ticks;
776 
777 	thread = spdk_get_thread();
778 	if (!thread) {
779 		assert(false);
780 		return NULL;
781 	}
782 
783 	poller = calloc(1, sizeof(*poller));
784 	if (poller == NULL) {
785 		SPDK_ERRLOG("Poller memory allocation failed\n");
786 		return NULL;
787 	}
788 
789 	poller->state = SPDK_POLLER_STATE_WAITING;
790 	poller->fn = fn;
791 	poller->arg = arg;
792 
793 	if (period_microseconds) {
794 		quotient = period_microseconds / SPDK_SEC_TO_USEC;
795 		remainder = period_microseconds % SPDK_SEC_TO_USEC;
796 		ticks = spdk_get_ticks_hz();
797 
798 		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
799 	} else {
800 		poller->period_ticks = 0;
801 	}
802 
803 	_spdk_thread_insert_poller(thread, poller);
804 
805 	return poller;
806 }
807 
/*
 * Request unregistration of a poller and clear the caller's handle.  The
 * poller object itself is freed later by spdk_thread_poll().
 * NOTE(review): the lists manipulated here belong to the calling thread,
 * so this assumes it runs on the poller's own thread -- confirm callers.
 */
void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}
841 
/*
 * Pause a poller.  A poller pausing itself (state RUNNING) is moved to the
 * paused list immediately; otherwise it is flagged PAUSING and moved by
 * spdk_thread_poll() on its next pass.  No-op if already paused/pausing.
 */
void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state == SPDK_POLLER_STATE_PAUSED ||
	    poller->state == SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused from within itself, we can immediately move it
	 * on the paused_pollers list.  Otherwise we just set its state to
	 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it.  It
	 * allows a poller to be paused from another one's context without
	 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration.
	 */
	if (poller->state != SPDK_POLLER_STATE_RUNNING) {
		poller->state = SPDK_POLLER_STATE_PAUSING;
	} else {
		/* period_ticks decides which list the poller currently lives on. */
		if (poller->period_ticks > 0) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		} else {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		}

		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
	}
}
877 
/*
 * Resume a paused (or pausing) poller.  No-op for any other state.  A fully
 * paused poller is re-inserted on the active or timer list; a poller still
 * in the PAUSING state just has its state flipped back since it never left
 * its original list.
 */
void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state != SPDK_POLLER_STATE_PAUSED &&
	    poller->state != SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused it has to be removed from the paused pollers
	 * list and put on the active / timer list depending on its
	 * period_ticks.  If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list.
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		_spdk_thread_insert_poller(thread, poller);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
}
907 
/* Iteration context passed between threads by spdk_for_each_thread(). */
struct call_thread {
	struct spdk_thread *cur_thread;	/* Thread currently executing fn. */
	spdk_msg_fn fn;			/* Function run once on every thread. */
	void *ctx;			/* User argument for both fn and cpl. */

	struct spdk_thread *orig_thread;	/* Thread that started the iteration. */
	spdk_msg_fn cpl;		/* Completion callback run on orig_thread. */
};
916 
/*
 * Message handler driving spdk_for_each_thread(): run ct->fn on the current
 * thread, then forward the context to the next thread on g_threads, or send
 * the completion back to the originating thread when the list is exhausted.
 */
static void
spdk_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;

	ct->fn(ct->ctx);

	/* NOTE(review): assumes ct->cur_thread is still on g_threads here; a
	 * thread destroyed mid-iteration would invalidate TAILQ_NEXT -- confirm
	 * callers guarantee thread lifetime during iteration.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");

		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
	}
}
940 
/*
 * Run fn(ctx) once on every registered thread, one thread at a time starting
 * from the head of g_threads; when all threads have run it, invoke cpl(ctx)
 * on the calling thread.  On setup failure (no memory, or the caller is not
 * an SPDK thread) cpl(ctx) is invoked synchronously instead.
 */
void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	/* The calling thread is on g_threads, so the list is never empty here. */
	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
}
976 
/*
 * Register io_device as an I/O device, keyed by its pointer value.  Must be
 * called on an SPDK thread.  Registration is silently refused (error log
 * only -- there is no return code) if the pointer is already registered or
 * allocation fails.
 *
 * \param ctx_size Bytes of per-channel context allocated behind each
 *        spdk_io_channel created for this device.
 * \param name Optional display name; the bookkeeping struct's address is
 *        used when NULL.
 */
void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	/* Reject duplicate registration of the same io_device pointer. */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}
1031 }
1032 
1033 static void
1034 _finish_unregister(void *arg)
1035 {
1036 	struct io_device *dev = arg;
1037 
1038 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
1039 		      dev->name, dev->io_device, dev->unregister_thread->name);
1040 
1041 	dev->unregister_cb(dev->io_device);
1042 	free(dev);
1043 }
1044 
1045 static void
1046 _spdk_io_device_free(struct io_device *dev)
1047 {
1048 	if (dev->unregister_cb == NULL) {
1049 		free(dev);
1050 	} else {
1051 		assert(dev->unregister_thread != NULL);
1052 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
1053 			      dev->name, dev->io_device, dev->unregister_thread->name);
1054 		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
1055 	}
1056 }
1057 
/*
 * Unregister an io_device.  The device is removed from g_io_devices
 * immediately, but freeing (and the optional unregister_cb) is deferred
 * until the last channel reference drops when refcnt > 0.  Refused while
 * for_each iterations are outstanding (for_each_count > 0) or if the
 * device is unknown.  Must be called on an SPDK thread.
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* Record who asked for unregistration so _finish_unregister() can run there. */
	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	_spdk_io_device_free(dev);
}
1110 
/*
 * Get (or create) the calling thread's I/O channel for io_device.  If the
 * thread already holds a channel for the device, its refcount is bumped and
 * the same channel is returned; otherwise a new channel with dev->ctx_size
 * bytes of trailing per-channel context is allocated and initialized via
 * dev->create_cb (called without g_devlist_mutex held).
 *
 * \return the channel, or NULL when the device is unknown, the caller is
 *         not an SPDK thread, allocation fails, or create_cb fails.
 */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 *  thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* Per-channel context of dev->ctx_size bytes is allocated inline. */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	/* Drop the lock before calling into user code; roll back on failure. */
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
1187 
/*
 * Deferred channel teardown, run as a message on the channel's owning thread
 * (scheduled by spdk_put_io_channel when the refcount hit zero).
 *
 * Drops one destroy_ref; if new references arrived in the meantime the
 * channel survives.  Otherwise the channel is unlinked, its destroy_cb is
 * invoked, and the owning io_device is freed as well when it is both
 * unregistered and out of references.
 */
static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name);

	/* Teardown must execute on the thread that owns the channel. */
	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 *  after this message was sent but before it had a chance to
		 *  execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	/* The device is freed only once it is unregistered... */
	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	/* ...and this was its last outstanding channel reference. */
	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}
1244 
1245 void
1246 spdk_put_io_channel(struct spdk_io_channel *ch)
1247 {
1248 	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
1249 		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
1250 		      ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref);
1251 
1252 	ch->ref--;
1253 
1254 	if (ch->ref == 0) {
1255 		ch->destroy_ref++;
1256 		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
1257 	}
1258 }
1259 
1260 struct spdk_io_channel *
1261 spdk_io_channel_from_ctx(void *ctx)
1262 {
1263 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
1264 }
1265 
1266 struct spdk_thread *
1267 spdk_io_channel_get_thread(struct spdk_io_channel *ch)
1268 {
1269 	return ch->thread;
1270 }
1271 
/*
 * State for one spdk_for_each_channel() traversal, visiting every channel
 * of an io_device across all threads, one thread at a time.
 */
struct spdk_io_channel_iter {
	void *io_device;		/* io_device whose channels are visited */
	struct io_device *dev;		/* resolved registry entry; its for_each_count is held while iterating */
	spdk_channel_msg fn;		/* callback run on each channel's thread */
	int status;			/* status from the latest _continue(); non-zero aborts */
	void *ctx;			/* caller-supplied context */
	struct spdk_io_channel *ch;	/* channel currently being visited (NULL when done) */

	struct spdk_thread *cur_thread;	/* thread whose channel is currently being visited */

	struct spdk_thread *orig_thread;	/* thread that started the iteration; receives cpl */
	spdk_channel_for_each_cpl cpl;	/* completion callback; may be NULL */
};
1285 
1286 void *
1287 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
1288 {
1289 	return i->io_device;
1290 }
1291 
1292 struct spdk_io_channel *
1293 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
1294 {
1295 	return i->ch;
1296 }
1297 
1298 void *
1299 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
1300 {
1301 	return i->ctx;
1302 }
1303 
1304 static void
1305 _call_completion(void *ctx)
1306 {
1307 	struct spdk_io_channel_iter *i = ctx;
1308 
1309 	if (i->cpl != NULL) {
1310 		i->cpl(i, i->status);
1311 	}
1312 	free(i);
1313 }
1314 
1315 static void
1316 _call_channel(void *ctx)
1317 {
1318 	struct spdk_io_channel_iter *i = ctx;
1319 	struct spdk_io_channel *ch;
1320 
1321 	/*
1322 	 * It is possible that the channel was deleted before this
1323 	 *  message had a chance to execute.  If so, skip calling
1324 	 *  the fn() on this thread.
1325 	 */
1326 	pthread_mutex_lock(&g_devlist_mutex);
1327 	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
1328 		if (ch->dev->io_device == i->io_device) {
1329 			break;
1330 		}
1331 	}
1332 	pthread_mutex_unlock(&g_devlist_mutex);
1333 
1334 	if (ch) {
1335 		i->fn(i);
1336 	} else {
1337 		spdk_for_each_channel_continue(i, 0);
1338 	}
1339 }
1340 
/*
 * Start an asynchronous iteration that runs fn() once on each thread that
 * holds a channel for io_device, then calls cpl() on the calling thread.
 *
 * While the iteration is in flight the device's for_each_count is elevated,
 * which blocks spdk_io_device_unregister from proceeding.  If no thread has
 * a channel for the device, cpl() is still invoked (with status 0) via a
 * message back to the calling thread.
 *
 * NOTE(review): if the iterator allocation fails, this returns silently and
 * cpl() is never called — callers cannot distinguish this from a slow
 * iteration.  Verify callers tolerate that before relying on cpl().
 */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread with a channel for this device and start there;
	 * spdk_for_each_channel_continue advances to subsequent threads.
	 */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				/* Hold off device unregistration until the iteration completes. */
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist for this device: complete immediately. */
	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
1384 
1385 void
1386 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
1387 {
1388 	struct spdk_thread *thread;
1389 	struct spdk_io_channel *ch;
1390 
1391 	assert(i->cur_thread == spdk_get_thread());
1392 
1393 	i->status = status;
1394 
1395 	pthread_mutex_lock(&g_devlist_mutex);
1396 	if (status) {
1397 		goto end;
1398 	}
1399 	thread = TAILQ_NEXT(i->cur_thread, tailq);
1400 	while (thread) {
1401 		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
1402 			if (ch->dev->io_device == i->io_device) {
1403 				i->cur_thread = thread;
1404 				i->ch = ch;
1405 				pthread_mutex_unlock(&g_devlist_mutex);
1406 				spdk_thread_send_msg(thread, _call_channel, i);
1407 				return;
1408 			}
1409 		}
1410 		thread = TAILQ_NEXT(thread, tailq);
1411 	}
1412 
1413 end:
1414 	i->dev->for_each_count--;
1415 	i->ch = NULL;
1416 	pthread_mutex_unlock(&g_devlist_mutex);
1417 
1418 	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
1419 }
1420 
1421 
1422 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)
1423