xref: /spdk/lib/thread/thread.c (revision 3225f86bc248c13417669e566b99d5ca41bc6a1c)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/queue.h"
38 #include "spdk/string.h"
39 #include "spdk/thread.h"
40 #include "spdk/util.h"
41 
42 #include "spdk_internal/log.h"
43 #include "spdk_internal/thread.h"
44 
/* Max messages drained from a thread's ring per _spdk_msg_queue_run_batch() call. */
#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
#define SPDK_MAX_THREAD_NAME_LEN	256

/* Protects g_io_devices, g_threads and g_thread_count. */
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Optional hook run for each newly created thread; set by spdk_thread_lib_init(). */
static spdk_new_thread_fn g_new_thread_fn = NULL;
/* Size of the user context area appended to every struct spdk_thread. */
static size_t g_ctx_sz = 0;
53 
/* Internal bookkeeping for a device registered via spdk_io_device_register(). */
struct io_device {
	void				*io_device;	/* Caller-supplied unique key. */
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb	create_cb;	/* Builds the per-thread channel ctx. */
	spdk_io_channel_destroy_cb	destroy_cb;	/* Tears down the per-thread channel ctx. */
	spdk_io_device_unregister_cb	unregister_cb;	/* Optional unregister completion. */
	struct spdk_thread		*unregister_thread;	/* Thread unregister_cb must run on. */
	uint32_t			ctx_size;	/* Bytes appended to each spdk_io_channel. */
	uint32_t			for_each_count;	/* Outstanding for_each iterations; blocks unregister. */
	TAILQ_ENTRY(io_device)		tailq;	/* Link on g_io_devices. */

	uint32_t			refcnt;	/* Channels currently referencing this device. */

	bool				unregistered;	/* Set once removed from g_io_devices. */
};
69 
/* Global registry of all io_devices; protected by g_devlist_mutex. */
static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
71 
/* One message queued to a thread via spdk_thread_send_msg(). */
struct spdk_msg {
	spdk_msg_fn		fn;	/* Function invoked on the target thread. */
	void			*arg;	/* Opaque argument passed to fn. */

	SLIST_ENTRY(spdk_msg)	link;	/* Link on a thread's msg_cache free list. */
};
78 
/* Max free spdk_msg objects kept in each thread's local cache. */
#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
/* Shared pool of spdk_msg objects; created in spdk_thread_lib_init(). */
static struct spdk_mempool *g_spdk_msg_mempool = NULL;
81 
/* Lifecycle states of an spdk_poller; transitions happen on the owning thread. */
enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,

	/* The poller is in the process of being paused.  It will be paused
	 * during the next time it's supposed to be executed.
	 */
	SPDK_POLLER_STATE_PAUSING,

	/* The poller is registered but currently paused.  It's on the
	 * paused_pollers list.
	 */
	SPDK_POLLER_STATE_PAUSED,
};
102 
/* A function registered to run repeatedly on a thread (see spdk_poller_register()). */
struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;	/* Link on the active/timer/paused list. */

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	uint64_t			period_ticks;	/* 0 for non-timed (run-every-poll) pollers. */
	uint64_t			next_run_tick;	/* Next deadline; timed pollers only. */
	spdk_poller_fn			fn;	/* User callback. */
	void				*arg;	/* Opaque argument passed to fn. */
};
114 
/* Per-thread execution context: message ring, pollers, channels and stats. */
struct spdk_thread {
	TAILQ_HEAD(, spdk_io_channel)	io_channels;	/* Channels owned by this thread. */
	TAILQ_ENTRY(spdk_thread)	tailq;		/* Link on g_threads. */
	char				name[SPDK_MAX_THREAD_NAME_LEN + 1];

	bool				exit;	/* Set by spdk_thread_exit(); stops poll loops early. */

	struct spdk_cpuset		cpumask;	/* Affinity mask given at creation (negated empty set otherwise). */

	uint64_t			tsc_last;	/* Tick count recorded at the end of the last poll. */
	struct spdk_thread_stats	stats;	/* Accumulated busy/idle tick counters. */

	/*
	 * Contains pollers actively running on this thread.  Pollers
	 *  are run round-robin. The thread takes one poller from the head
	 *  of the ring, executes it, then puts it back at the tail of
	 *  the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;

	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller)	timer_pollers;

	/*
	 * Contains paused pollers.  Pollers on this queue are waiting until
	 * they are resumed (in which case they're put onto the active/timer
	 * queues) or unregistered.
	 */
	TAILQ_HEAD(paused_pollers_head, spdk_poller)	paused_pollers;

	struct spdk_ring		*messages;	/* MP/SC ring of incoming spdk_msg pointers. */

	SLIST_HEAD(, spdk_msg)		msg_cache;	/* Local free list of spdk_msg objects. */
	size_t				msg_cache_count;	/* Entries on msg_cache (<= SPDK_MSG_MEMPOOL_CACHE_SIZE). */

	/* User context allocated at the end */
	uint8_t				ctx[0];
};
155 
/* Global list and count of all threads; protected by g_devlist_mutex. */
static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

/* The spdk_thread currently bound to this POSIX thread, if any. */
static __thread struct spdk_thread *tls_thread = NULL;
160 
161 static inline struct spdk_thread *
162 _get_thread(void)
163 {
164 	return tls_thread;
165 }
166 
167 int
168 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
169 {
170 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
171 
172 	assert(g_new_thread_fn == NULL);
173 	g_new_thread_fn = new_thread_fn;
174 
175 	g_ctx_sz = ctx_sz;
176 
177 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
178 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
179 			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
180 			     sizeof(struct spdk_msg),
181 			     0, /* No cache. We do our own. */
182 			     SPDK_ENV_SOCKET_ID_ANY);
183 
184 	if (!g_spdk_msg_mempool) {
185 		return -1;
186 	}
187 
188 	return 0;
189 }
190 
191 void
192 spdk_thread_lib_fini(void)
193 {
194 	struct io_device *dev;
195 
196 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
197 		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
198 	}
199 
200 	if (g_spdk_msg_mempool) {
201 		spdk_mempool_free(g_spdk_msg_mempool);
202 		g_spdk_msg_mempool = NULL;
203 	}
204 
205 	g_new_thread_fn = NULL;
206 	g_ctx_sz = 0;
207 }
208 
209 static void
210 _free_thread(struct spdk_thread *thread)
211 {
212 	struct spdk_io_channel *ch;
213 	struct spdk_msg *msg;
214 	struct spdk_poller *poller, *ptmp;
215 
216 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
217 		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
218 			    thread->name, ch->dev->name);
219 	}
220 
221 	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
222 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
223 			SPDK_WARNLOG("poller %p still registered at thread exit\n",
224 				     poller);
225 		}
226 
227 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
228 		free(poller);
229 	}
230 
231 
232 	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) {
233 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
234 			SPDK_WARNLOG("poller %p still registered at thread exit\n",
235 				     poller);
236 		}
237 
238 		TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
239 		free(poller);
240 	}
241 
242 	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
243 		SPDK_WARNLOG("poller %p still registered at thread exit\n", poller);
244 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
245 		free(poller);
246 	}
247 
248 	pthread_mutex_lock(&g_devlist_mutex);
249 	assert(g_thread_count > 0);
250 	g_thread_count--;
251 	TAILQ_REMOVE(&g_threads, thread, tailq);
252 	pthread_mutex_unlock(&g_devlist_mutex);
253 
254 	msg = SLIST_FIRST(&thread->msg_cache);
255 	while (msg != NULL) {
256 		SLIST_REMOVE_HEAD(&thread->msg_cache, link);
257 
258 		assert(thread->msg_cache_count > 0);
259 		thread->msg_cache_count--;
260 		spdk_mempool_put(g_spdk_msg_mempool, msg);
261 
262 		msg = SLIST_FIRST(&thread->msg_cache);
263 	}
264 
265 	assert(thread->msg_cache_count == 0);
266 
267 	spdk_ring_free(thread->messages);
268 	free(thread);
269 }
270 
271 struct spdk_thread *
272 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
273 {
274 	struct spdk_thread *thread;
275 	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
276 	int rc, i;
277 
278 	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
279 	if (!thread) {
280 		SPDK_ERRLOG("Unable to allocate memory for thread\n");
281 		return NULL;
282 	}
283 
284 	if (cpumask) {
285 		spdk_cpuset_copy(&thread->cpumask, cpumask);
286 	} else {
287 		spdk_cpuset_negate(&thread->cpumask);
288 	}
289 
290 	TAILQ_INIT(&thread->io_channels);
291 	TAILQ_INIT(&thread->active_pollers);
292 	TAILQ_INIT(&thread->timer_pollers);
293 	TAILQ_INIT(&thread->paused_pollers);
294 	SLIST_INIT(&thread->msg_cache);
295 	thread->msg_cache_count = 0;
296 
297 	thread->tsc_last = spdk_get_ticks();
298 
299 	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
300 	if (!thread->messages) {
301 		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
302 		free(thread);
303 		return NULL;
304 	}
305 
306 	/* Fill the local message pool cache. */
307 	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
308 	if (rc == 0) {
309 		/* If we can't populate the cache it's ok. The cache will get filled
310 		 * up organically as messages are passed to the thread. */
311 		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
312 			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
313 			thread->msg_cache_count++;
314 		}
315 	}
316 
317 	if (name) {
318 		snprintf(thread->name, sizeof(thread->name), "%s", name);
319 	} else {
320 		snprintf(thread->name, sizeof(thread->name), "%p", thread);
321 	}
322 
323 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name);
324 
325 	pthread_mutex_lock(&g_devlist_mutex);
326 	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
327 	g_thread_count++;
328 	pthread_mutex_unlock(&g_devlist_mutex);
329 
330 	if (g_new_thread_fn) {
331 		rc = g_new_thread_fn(thread);
332 		if (rc != 0) {
333 			_free_thread(thread);
334 			return NULL;
335 		}
336 	}
337 
338 	return thread;
339 }
340 
341 void
342 spdk_set_thread(struct spdk_thread *thread)
343 {
344 	tls_thread = thread;
345 }
346 
347 void
348 spdk_thread_exit(struct spdk_thread *thread)
349 {
350 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name);
351 
352 	assert(tls_thread == thread);
353 
354 	thread->exit = true;
355 }
356 
357 void
358 spdk_thread_destroy(struct spdk_thread *thread)
359 {
360 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name);
361 
362 	assert(thread->exit == true);
363 
364 	if (tls_thread == thread) {
365 		tls_thread = NULL;
366 	}
367 
368 	_free_thread(thread);
369 }
370 
371 void *
372 spdk_thread_get_ctx(struct spdk_thread *thread)
373 {
374 	if (g_ctx_sz > 0) {
375 		return thread->ctx;
376 	}
377 
378 	return NULL;
379 }
380 
381 struct spdk_cpuset *
382 spdk_thread_get_cpumask(struct spdk_thread *thread)
383 {
384 	return &thread->cpumask;
385 }
386 
387 struct spdk_thread *
388 spdk_thread_get_from_ctx(void *ctx)
389 {
390 	if (ctx == NULL) {
391 		assert(false);
392 		return NULL;
393 	}
394 
395 	assert(g_ctx_sz > 0);
396 
397 	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
398 }
399 
400 static inline uint32_t
401 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
402 {
403 	unsigned count, i;
404 	void *messages[SPDK_MSG_BATCH_SIZE];
405 
406 #ifdef DEBUG
407 	/*
408 	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
409 	 * so we will never actually read uninitialized data from events, but just to be sure
410 	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
411 	 */
412 	memset(messages, 0, sizeof(messages));
413 #endif
414 
415 	if (max_msgs > 0) {
416 		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
417 	} else {
418 		max_msgs = SPDK_MSG_BATCH_SIZE;
419 	}
420 
421 	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
422 	if (count == 0) {
423 		return 0;
424 	}
425 
426 	for (i = 0; i < count; i++) {
427 		struct spdk_msg *msg = messages[i];
428 
429 		assert(msg != NULL);
430 		msg->fn(msg->arg);
431 
432 		if (thread->exit) {
433 			break;
434 		}
435 
436 		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
437 			/* Insert the messages at the head. We want to re-use the hot
438 			 * ones. */
439 			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
440 			thread->msg_cache_count++;
441 		} else {
442 			spdk_mempool_put(g_spdk_msg_mempool, msg);
443 		}
444 	}
445 
446 	return count;
447 }
448 
449 static void
450 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
451 {
452 	struct spdk_poller *iter;
453 
454 	poller->next_run_tick = now + poller->period_ticks;
455 
456 	/*
457 	 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled
458 	 * run time.
459 	 */
460 	TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) {
461 		if (iter->next_run_tick <= poller->next_run_tick) {
462 			TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq);
463 			return;
464 		}
465 	}
466 
467 	/* No earlier pollers were found, so this poller must be the new head */
468 	TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
469 }
470 
471 static void
472 _spdk_thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
473 {
474 	if (poller->period_ticks) {
475 		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
476 	} else {
477 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
478 	}
479 }
480 
/*
 * Perform one iteration of work on `thread`: drain up to max_msgs queued
 * messages (0 selects the default batch size), run each active poller once,
 * and run timer pollers whose deadline has passed.  `now` is the current
 * tick count; pass 0 to have it sampled internally.  Returns the most
 * positive poller return code seen, 1 if only messages ran, 0 if idle.
 */
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_thread *orig_thread;
	struct spdk_poller *poller, *tmp;
	int rc = 0;

	/* Make `thread` current for the duration of the call; restored at the end. */
	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	/* NOTE(review): iterates in reverse -- presumably so pollers appended
	 * during this loop are not run until the next call; confirm intent.
	 */
	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		if (thread->exit) {
			break;
		}

		/* Deferred unregistration: the poller is freed here, on its thread. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			/* Complete a pause requested while the poller was running. */
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
		}
#endif

		/* The callback may have unregistered or paused its own poller. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
		}

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (thread->exit) {
			break;
		}

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		/* The list is sorted by deadline; stop at the first future one. */
		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller);
		}
#endif

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
			/* Re-insert in sorted order with its next deadline. */
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			_spdk_poller_insert_timer(thread, poller, now);
		}

		if (timer_rc > rc) {
			rc = timer_rc;
		}
	}

	/* Charge the ticks since the previous poll to busy or idle time. */
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += now - thread->tsc_last;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += now - thread->tsc_last;
	}
	thread->tsc_last = now;

	tls_thread = orig_thread;

	return rc;
}
599 
600 uint64_t
601 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
602 {
603 	struct spdk_poller *poller;
604 
605 	poller = TAILQ_FIRST(&thread->timer_pollers);
606 	if (poller) {
607 		return poller->next_run_tick;
608 	}
609 
610 	return 0;
611 }
612 
613 int
614 spdk_thread_has_active_pollers(struct spdk_thread *thread)
615 {
616 	return !TAILQ_EMPTY(&thread->active_pollers);
617 }
618 
619 static bool
620 _spdk_thread_has_unpaused_pollers(struct spdk_thread *thread)
621 {
622 	if (TAILQ_EMPTY(&thread->active_pollers) &&
623 	    TAILQ_EMPTY(&thread->timer_pollers)) {
624 		return false;
625 	}
626 
627 	return true;
628 }
629 
630 bool
631 spdk_thread_has_pollers(struct spdk_thread *thread)
632 {
633 	if (!_spdk_thread_has_unpaused_pollers(thread) &&
634 	    TAILQ_EMPTY(&thread->paused_pollers)) {
635 		return false;
636 	}
637 
638 	return true;
639 }
640 
641 bool
642 spdk_thread_is_idle(struct spdk_thread *thread)
643 {
644 	if (spdk_ring_count(thread->messages) ||
645 	    _spdk_thread_has_unpaused_pollers(thread)) {
646 		return false;
647 	}
648 
649 	return true;
650 }
651 
652 uint32_t
653 spdk_thread_get_count(void)
654 {
655 	/*
656 	 * Return cached value of the current thread count.  We could acquire the
657 	 *  lock and iterate through the TAILQ of threads to count them, but that
658 	 *  count could still be invalidated after we release the lock.
659 	 */
660 	return g_thread_count;
661 }
662 
663 struct spdk_thread *
664 spdk_get_thread(void)
665 {
666 	struct spdk_thread *thread;
667 
668 	thread = _get_thread();
669 	if (!thread) {
670 		SPDK_ERRLOG("No thread allocated\n");
671 	}
672 
673 	return thread;
674 }
675 
676 const char *
677 spdk_thread_get_name(const struct spdk_thread *thread)
678 {
679 	return thread->name;
680 }
681 
682 int
683 spdk_thread_get_stats(struct spdk_thread_stats *stats)
684 {
685 	struct spdk_thread *thread;
686 
687 	thread = _get_thread();
688 	if (!thread) {
689 		SPDK_ERRLOG("No thread allocated\n");
690 		return -EINVAL;
691 	}
692 
693 	if (stats == NULL) {
694 		return -EINVAL;
695 	}
696 
697 	*stats = thread->stats;
698 
699 	return 0;
700 }
701 
702 int
703 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
704 {
705 	struct spdk_thread *local_thread;
706 	struct spdk_msg *msg;
707 	int rc;
708 
709 	assert(thread != NULL);
710 
711 	local_thread = _get_thread();
712 
713 	msg = NULL;
714 	if (local_thread != NULL) {
715 		if (local_thread->msg_cache_count > 0) {
716 			msg = SLIST_FIRST(&local_thread->msg_cache);
717 			assert(msg != NULL);
718 			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
719 			local_thread->msg_cache_count--;
720 		}
721 	}
722 
723 	if (msg == NULL) {
724 		msg = spdk_mempool_get(g_spdk_msg_mempool);
725 		if (!msg) {
726 			SPDK_ERRLOG("msg could not be allocated\n");
727 			return -ENOMEM;
728 		}
729 	}
730 
731 	msg->fn = fn;
732 	msg->arg = ctx;
733 
734 	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
735 	if (rc != 1) {
736 		SPDK_ERRLOG("msg could not be enqueued\n");
737 		spdk_mempool_put(g_spdk_msg_mempool, msg);
738 		return -EIO;
739 	}
740 
741 	return 0;
742 }
743 
744 struct spdk_poller *
745 spdk_poller_register(spdk_poller_fn fn,
746 		     void *arg,
747 		     uint64_t period_microseconds)
748 {
749 	struct spdk_thread *thread;
750 	struct spdk_poller *poller;
751 	uint64_t quotient, remainder, ticks;
752 
753 	thread = spdk_get_thread();
754 	if (!thread) {
755 		assert(false);
756 		return NULL;
757 	}
758 
759 	poller = calloc(1, sizeof(*poller));
760 	if (poller == NULL) {
761 		SPDK_ERRLOG("Poller memory allocation failed\n");
762 		return NULL;
763 	}
764 
765 	poller->state = SPDK_POLLER_STATE_WAITING;
766 	poller->fn = fn;
767 	poller->arg = arg;
768 
769 	if (period_microseconds) {
770 		quotient = period_microseconds / SPDK_SEC_TO_USEC;
771 		remainder = period_microseconds % SPDK_SEC_TO_USEC;
772 		ticks = spdk_get_ticks_hz();
773 
774 		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
775 	} else {
776 		poller->period_ticks = 0;
777 	}
778 
779 	_spdk_thread_insert_poller(thread, poller);
780 
781 	return poller;
782 }
783 
784 void
785 spdk_poller_unregister(struct spdk_poller **ppoller)
786 {
787 	struct spdk_thread *thread;
788 	struct spdk_poller *poller;
789 
790 	poller = *ppoller;
791 	if (poller == NULL) {
792 		return;
793 	}
794 
795 	*ppoller = NULL;
796 
797 	thread = spdk_get_thread();
798 	if (!thread) {
799 		assert(false);
800 		return;
801 	}
802 
803 	/* If the poller was paused, put it on the active_pollers list so that
804 	 * its unregistration can be processed by spdk_thread_poll().
805 	 */
806 	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
807 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
808 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
809 		poller->period_ticks = 0;
810 	}
811 
812 	/* Simply set the state to unregistered. The poller will get cleaned up
813 	 * in a subsequent call to spdk_thread_poll().
814 	 */
815 	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
816 }
817 
/* Pause a registered poller on the current thread.  No-op if it is already
 * paused or pausing.
 */
void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state == SPDK_POLLER_STATE_PAUSED ||
	    poller->state == SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused from within itself, we can immediately move it
	 * on the paused_pollers list.  Otherwise we just set its state to
	 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it.  It
	 * allows a poller to be paused from another one's context without
	 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration.
	 */
	if (poller->state != SPDK_POLLER_STATE_RUNNING) {
		poller->state = SPDK_POLLER_STATE_PAUSING;
	} else {
		/* RUNNING means the poller's own fn is on the stack right now. */
		if (poller->period_ticks > 0) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		} else {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		}

		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
	}
}
853 
854 void
855 spdk_poller_resume(struct spdk_poller *poller)
856 {
857 	struct spdk_thread *thread;
858 
859 	if (poller->state != SPDK_POLLER_STATE_PAUSED &&
860 	    poller->state != SPDK_POLLER_STATE_PAUSING) {
861 		return;
862 	}
863 
864 	thread = spdk_get_thread();
865 	if (!thread) {
866 		assert(false);
867 		return;
868 	}
869 
870 	/* If a poller is paused it has to be removed from the paused pollers
871 	 * list and put on the active / timer list depending on its
872 	 * period_ticks.  If a poller is still in the process of being paused,
873 	 * we just need to flip its state back to waiting, as it's already on
874 	 * the appropriate list.
875 	 */
876 	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
877 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
878 		_spdk_thread_insert_poller(thread, poller);
879 	}
880 
881 	poller->state = SPDK_POLLER_STATE_WAITING;
882 }
883 
/* Iteration context for spdk_for_each_thread(). */
struct call_thread {
	struct spdk_thread *cur_thread;	/* Thread currently being visited. */
	spdk_msg_fn fn;			/* Function run on each thread. */
	void *ctx;			/* Argument passed to fn and cpl. */

	struct spdk_thread *orig_thread;	/* Thread that started the iteration. */
	spdk_msg_fn cpl;		/* Completion, run back on orig_thread. */
};
892 
893 static void
894 spdk_on_thread(void *ctx)
895 {
896 	struct call_thread *ct = ctx;
897 
898 	ct->fn(ct->ctx);
899 
900 	pthread_mutex_lock(&g_devlist_mutex);
901 	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
902 	pthread_mutex_unlock(&g_devlist_mutex);
903 
904 	if (!ct->cur_thread) {
905 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");
906 
907 		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
908 		free(ctx);
909 	} else {
910 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
911 			      ct->cur_thread->name);
912 
913 		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
914 	}
915 }
916 
917 void
918 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
919 {
920 	struct call_thread *ct;
921 	struct spdk_thread *thread;
922 
923 	ct = calloc(1, sizeof(*ct));
924 	if (!ct) {
925 		SPDK_ERRLOG("Unable to perform thread iteration\n");
926 		cpl(ctx);
927 		return;
928 	}
929 
930 	ct->fn = fn;
931 	ct->ctx = ctx;
932 	ct->cpl = cpl;
933 
934 	thread = _get_thread();
935 	if (!thread) {
936 		SPDK_ERRLOG("No thread allocated\n");
937 		free(ct);
938 		cpl(ctx);
939 		return;
940 	}
941 	ct->orig_thread = thread;
942 
943 	pthread_mutex_lock(&g_devlist_mutex);
944 	ct->cur_thread = TAILQ_FIRST(&g_threads);
945 	pthread_mutex_unlock(&g_devlist_mutex);
946 
947 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
948 		      ct->orig_thread->name);
949 
950 	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
951 }
952 
953 void
954 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
955 			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
956 			const char *name)
957 {
958 	struct io_device *dev, *tmp;
959 	struct spdk_thread *thread;
960 
961 	assert(io_device != NULL);
962 	assert(create_cb != NULL);
963 	assert(destroy_cb != NULL);
964 
965 	thread = spdk_get_thread();
966 	if (!thread) {
967 		SPDK_ERRLOG("called from non-SPDK thread\n");
968 		assert(false);
969 		return;
970 	}
971 
972 	dev = calloc(1, sizeof(struct io_device));
973 	if (dev == NULL) {
974 		SPDK_ERRLOG("could not allocate io_device\n");
975 		return;
976 	}
977 
978 	dev->io_device = io_device;
979 	if (name) {
980 		snprintf(dev->name, sizeof(dev->name), "%s", name);
981 	} else {
982 		snprintf(dev->name, sizeof(dev->name), "%p", dev);
983 	}
984 	dev->create_cb = create_cb;
985 	dev->destroy_cb = destroy_cb;
986 	dev->unregister_cb = NULL;
987 	dev->ctx_size = ctx_size;
988 	dev->for_each_count = 0;
989 	dev->unregistered = false;
990 	dev->refcnt = 0;
991 
992 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
993 		      dev->name, dev->io_device, thread->name);
994 
995 	pthread_mutex_lock(&g_devlist_mutex);
996 	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
997 		if (tmp->io_device == io_device) {
998 			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
999 				    io_device, tmp->name, dev->name);
1000 			free(dev);
1001 			pthread_mutex_unlock(&g_devlist_mutex);
1002 			return;
1003 		}
1004 	}
1005 	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
1006 	pthread_mutex_unlock(&g_devlist_mutex);
1007 }
1008 
1009 static void
1010 _finish_unregister(void *arg)
1011 {
1012 	struct io_device *dev = arg;
1013 
1014 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
1015 		      dev->name, dev->io_device, dev->unregister_thread->name);
1016 
1017 	dev->unregister_cb(dev->io_device);
1018 	free(dev);
1019 }
1020 
1021 static void
1022 _spdk_io_device_free(struct io_device *dev)
1023 {
1024 	if (dev->unregister_cb == NULL) {
1025 		free(dev);
1026 	} else {
1027 		assert(dev->unregister_thread != NULL);
1028 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
1029 			      dev->name, dev->io_device, dev->unregister_thread->name);
1030 		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
1031 	}
1032 }
1033 
/*
 * Unregister an io_device.  The device is removed from the registry
 * immediately; its memory (and unregister_cb, if given) is released once the
 * last channel referencing it is destroyed.  Fails (with a log message) if
 * for_each iterations are still outstanding.  Must run on an SPDK thread.
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* Outstanding for_each iterations still walk the channel list. */
	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* Unpublish the device; snapshot refcnt under the lock so the
	 * deferred-free decision below is race-free.
	 */
	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	_spdk_io_device_free(dev);
}
1086 
/*
 * Get (or lazily create) the calling thread's I/O channel for the given
 * io_device.  An existing channel just has its refcount bumped; a new one is
 * allocated with the device's ctx_size appended and initialized via the
 * device's create_cb.  Returns NULL if the device is unknown, no thread is
 * bound, allocation fails, or create_cb reports an error.
 */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 *  thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* Extra ctx_size bytes after the channel hold the device's context. */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	/* Run the user's create callback outside the registry lock. */
	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		/* Roll back: unlink the channel and drop the device reference. */
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
1163 
/*
 * Deferred channel teardown, executed as a message on the channel's owning
 * thread (queued by spdk_put_io_channel).  Invokes the destroy callback and
 * frees the channel, and also frees the io_device if this was the last
 * channel of a device that has already been unregistered.
 */
static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name);

	/* This message must run on the thread that owns the channel. */
	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 *  after this message was sent but before it had a chance to
		 *  execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	/* Re-take the lock to drop the device refcount and decide whether this
	 * was the last channel of an already-unregistered device.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* Free the device outside the lock; _spdk_io_device_free may call back. */
	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}
1220 
1221 void
1222 spdk_put_io_channel(struct spdk_io_channel *ch)
1223 {
1224 	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
1225 		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
1226 		      ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref);
1227 
1228 	ch->ref--;
1229 
1230 	if (ch->ref == 0) {
1231 		ch->destroy_ref++;
1232 		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
1233 	}
1234 }
1235 
1236 struct spdk_io_channel *
1237 spdk_io_channel_from_ctx(void *ctx)
1238 {
1239 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
1240 }
1241 
1242 struct spdk_thread *
1243 spdk_io_channel_get_thread(struct spdk_io_channel *ch)
1244 {
1245 	return ch->thread;
1246 }
1247 
/*
 * State for a spdk_for_each_channel() iteration.  One iterator is allocated
 * per call and freed by _call_completion once the user's completion callback
 * has run on the originating thread.
 */
struct spdk_io_channel_iter {
	void *io_device;		/* user io_device key being iterated */
	struct io_device *dev;		/* resolved device; for_each_count held on it */
	spdk_channel_msg fn;		/* per-channel callback */
	int status;			/* first non-zero status aborts the iteration */
	void *ctx;			/* opaque user context */
	struct spdk_io_channel *ch;	/* channel currently being visited */

	struct spdk_thread *cur_thread;	/* thread currently being visited */

	struct spdk_thread *orig_thread;	/* thread that started the iteration */
	spdk_channel_for_each_cpl cpl;	/* completion callback (may be NULL) */
};
1261 
1262 void *
1263 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
1264 {
1265 	return i->io_device;
1266 }
1267 
1268 struct spdk_io_channel *
1269 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
1270 {
1271 	return i->ch;
1272 }
1273 
1274 void *
1275 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
1276 {
1277 	return i->ctx;
1278 }
1279 
1280 static void
1281 _call_completion(void *ctx)
1282 {
1283 	struct spdk_io_channel_iter *i = ctx;
1284 
1285 	if (i->cpl != NULL) {
1286 		i->cpl(i, i->status);
1287 	}
1288 	free(i);
1289 }
1290 
1291 static void
1292 _call_channel(void *ctx)
1293 {
1294 	struct spdk_io_channel_iter *i = ctx;
1295 	struct spdk_io_channel *ch;
1296 
1297 	/*
1298 	 * It is possible that the channel was deleted before this
1299 	 *  message had a chance to execute.  If so, skip calling
1300 	 *  the fn() on this thread.
1301 	 */
1302 	pthread_mutex_lock(&g_devlist_mutex);
1303 	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
1304 		if (ch->dev->io_device == i->io_device) {
1305 			break;
1306 		}
1307 	}
1308 	pthread_mutex_unlock(&g_devlist_mutex);
1309 
1310 	if (ch) {
1311 		i->fn(i);
1312 	} else {
1313 		spdk_for_each_channel_continue(i, 0);
1314 	}
1315 }
1316 
/*
 * Invoke fn once for each existing channel of io_device, hopping from thread
 * to thread via messages (one channel at a time; fn must call
 * spdk_for_each_channel_continue to advance).  When all channels have been
 * visited -- or immediately, if no channel exists -- cpl is invoked on the
 * calling thread.
 *
 * NOTE(review): if the iterator allocation fails, this returns without ever
 * invoking cpl, so the caller's completion never fires in that case.
 */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread holding a channel for this device and start
	 * the iteration there.  for_each_count pins the device while the
	 * iteration is in flight; it is dropped in
	 * spdk_for_each_channel_continue when the iteration ends.
	 */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				/* Drop the lock before sending; the message may
				 * execute immediately on some frameworks.
				 */
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist: deliver the completion on the calling thread. */
	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
1360 
/*
 * Advance a spdk_for_each_channel() iteration.  Must be called from fn on
 * the thread currently being visited (i->cur_thread).  A non-zero status
 * aborts the iteration and is reported to the completion callback; status 0
 * moves on to the next thread holding a channel for the io_device, or, if
 * none remain, finishes the iteration.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		/* Abort: skip straight to completion. */
		goto end;
	}
	/* Scan the remaining threads (in g_threads order) for the next one
	 * holding a channel for this io_device.
	 */
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				/* Release the lock before sending the next step. */
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	/* Iteration finished (or aborted): drop the device pin taken in
	 * spdk_for_each_channel and report completion on the original thread.
	 */
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
1396 
1397 
/* Register this file's debug log flag under the name "thread" (enables the
 * SPDK_DEBUGLOG(SPDK_LOG_THREAD, ...) calls above).
 */
SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)
1399