xref: /spdk/lib/thread/thread.c (revision afaabcce2388835082b8653b595898f9ca8c3c24)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/string.h"
37 #include "spdk/thread.h"
38 
39 #include "spdk_internal/log.h"
40 
41 #ifdef __linux__
42 #include <sys/prctl.h>
43 #endif
44 
45 #ifdef __FreeBSD__
46 #include <pthread_np.h>
47 #endif
48 
/*
 * Single global lock for this file.  It is held while walking or modifying
 *  g_io_devices, g_threads and each thread's io_channels list, and while
 *  updating the refcnt / for_each_count fields of an io_device.
 */
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;
50 
/*
 * Internal bookkeeping for one registered I/O device.  Instances are created
 *  by spdk_io_device_register() and stay linked on g_io_devices until
 *  spdk_io_device_unregister() removes them.
 */
struct io_device {
	/* Caller-supplied pointer; used as the lookup key for this device. */
	void				*io_device;
	/* Called by spdk_get_io_channel() to set up a newly allocated channel. */
	spdk_io_channel_create_cb	create_cb;
	/* Copied into each channel at creation and called when it is released. */
	spdk_io_channel_destroy_cb	destroy_cb;
	/* Optional callback stored by spdk_io_device_unregister(); may be NULL. */
	spdk_io_device_unregister_cb	unregister_cb;
	/* Thread that called spdk_io_device_unregister(); unregister_cb is sent to it. */
	struct spdk_thread		*unregister_thread;
	/* Extra bytes allocated directly after each struct spdk_io_channel. */
	uint32_t			ctx_size;
	/* spdk_for_each_channel() iterations in flight; unregister is refused while > 0. */
	uint32_t			for_each_count;
	/* Link in g_io_devices. */
	TAILQ_ENTRY(io_device)		tailq;

	/* Number of I/O channels that currently reference this device. */
	uint32_t			refcnt;

	/* Set by spdk_io_device_unregister(); the free is deferred while refcnt > 0. */
	bool				unregistered;
};
65 
/* All currently registered io_devices.  Accessed under g_devlist_mutex. */
static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
67 
/*
 * Per-thread state created by spdk_allocate_thread() and released by
 *  spdk_free_thread().
 */
struct spdk_thread {
	/* pthread id of the creating thread; _get_thread() matches against it. */
	pthread_t			thread_id;
	/* Callback used by spdk_thread_send_msg() to deliver a message. */
	spdk_thread_pass_msg		msg_fn;
	/* Callback used by spdk_poller_register(); may be NULL. */
	spdk_start_poller		start_poller_fn;
	/* Callback used by spdk_poller_unregister(); may be NULL. */
	spdk_stop_poller		stop_poller_fn;
	/* Opaque pointer handed back to msg_fn and the poller callbacks. */
	void				*thread_ctx;
	/* I/O channels created on this thread. */
	TAILQ_HEAD(, spdk_io_channel)	io_channels;
	/* Link in g_threads. */
	TAILQ_ENTRY(spdk_thread)	tailq;
	/* Heap-allocated name; freed in spdk_free_thread(). */
	char				*name;
};
78 
/* All allocated SPDK threads.  Accessed under g_devlist_mutex. */
static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
/* Number of entries on g_threads; updated under g_devlist_mutex. */
static uint32_t g_thread_count = 0;
81 
82 static struct spdk_thread *
83 _get_thread(void)
84 {
85 	pthread_t thread_id;
86 	struct spdk_thread *thread;
87 
88 	thread_id = pthread_self();
89 
90 	thread = NULL;
91 	TAILQ_FOREACH(thread, &g_threads, tailq) {
92 		if (thread->thread_id == thread_id) {
93 			return thread;
94 		}
95 	}
96 
97 	return NULL;
98 }
99 
/*
 * Apply thread_name to the calling OS thread using the platform-specific
 *  call.  The result of that call is not checked.  Building on a platform
 *  other than Linux or FreeBSD is a compile-time error.
 */
static void
_set_thread_name(const char *thread_name)
{
#if defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#elif defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#else
#error missing platform support for thread name
#endif
}
111 
/*
 * Initialize the thread library.  Currently there is nothing to set up.
 *
 * Returns 0 always.
 */
int
spdk_thread_lib_init(void)
{
	return 0;
}
117 
/* Tear down the thread library.  Currently a no-op. */
void
spdk_thread_lib_fini(void)
{
}
122 
123 struct spdk_thread *
124 spdk_allocate_thread(spdk_thread_pass_msg msg_fn,
125 		     spdk_start_poller start_poller_fn,
126 		     spdk_stop_poller stop_poller_fn,
127 		     void *thread_ctx, const char *name)
128 {
129 	struct spdk_thread *thread;
130 
131 	pthread_mutex_lock(&g_devlist_mutex);
132 
133 	thread = _get_thread();
134 	if (thread) {
135 		SPDK_ERRLOG("Double allocated SPDK thread\n");
136 		pthread_mutex_unlock(&g_devlist_mutex);
137 		return NULL;
138 	}
139 
140 	thread = calloc(1, sizeof(*thread));
141 	if (!thread) {
142 		SPDK_ERRLOG("Unable to allocate memory for thread\n");
143 		pthread_mutex_unlock(&g_devlist_mutex);
144 		return NULL;
145 	}
146 
147 	thread->thread_id = pthread_self();
148 	thread->msg_fn = msg_fn;
149 	thread->start_poller_fn = start_poller_fn;
150 	thread->stop_poller_fn = stop_poller_fn;
151 	thread->thread_ctx = thread_ctx;
152 	TAILQ_INIT(&thread->io_channels);
153 	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
154 	g_thread_count++;
155 	if (name) {
156 		_set_thread_name(name);
157 		thread->name = strdup(name);
158 	} else {
159 		thread->name = spdk_sprintf_alloc("%p", thread);
160 	}
161 
162 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name);
163 
164 	pthread_mutex_unlock(&g_devlist_mutex);
165 
166 	return thread;
167 }
168 
169 void
170 spdk_free_thread(void)
171 {
172 	struct spdk_thread *thread;
173 
174 	pthread_mutex_lock(&g_devlist_mutex);
175 
176 	thread = _get_thread();
177 	if (!thread) {
178 		SPDK_ERRLOG("No thread allocated\n");
179 		pthread_mutex_unlock(&g_devlist_mutex);
180 		return;
181 	}
182 
183 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name);
184 
185 	assert(g_thread_count > 0);
186 	g_thread_count--;
187 	TAILQ_REMOVE(&g_threads, thread, tailq);
188 	free(thread->name);
189 	free(thread);
190 
191 	pthread_mutex_unlock(&g_devlist_mutex);
192 }
193 
/*
 * Report how many SPDK threads are currently allocated.  The counter is read
 *  without taking g_devlist_mutex, so the value is only a snapshot.
 */
uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count.  We could acquire the
	 *  lock and iterate through the TAILQ of threads to count them, but that
	 *  count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}
204 
205 struct spdk_thread *
206 spdk_get_thread(void)
207 {
208 	struct spdk_thread *thread;
209 
210 	pthread_mutex_lock(&g_devlist_mutex);
211 
212 	thread = _get_thread();
213 	if (!thread) {
214 		SPDK_ERRLOG("No thread allocated\n");
215 	}
216 
217 	pthread_mutex_unlock(&g_devlist_mutex);
218 
219 	return thread;
220 }
221 
/*
 * Return the name stored in thread.  The string is owned by the thread and
 *  is freed by spdk_free_thread().  thread must not be NULL.
 */
const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}
227 
/*
 * Hand fn/ctx to the target thread by invoking the msg_fn callback that was
 *  supplied to spdk_allocate_thread(), together with that thread's
 *  thread_ctx.  thread must not be NULL.
 */
void
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_thread_fn fn, void *ctx)
{
	thread->msg_fn(fn, ctx, thread->thread_ctx);
}
233 
234 
235 struct spdk_poller *
236 spdk_poller_register(spdk_poller_fn fn,
237 		     void *arg,
238 		     uint64_t period_microseconds)
239 {
240 	struct spdk_thread *thread;
241 	struct spdk_poller *poller;
242 
243 	thread = spdk_get_thread();
244 	if (!thread) {
245 		assert(false);
246 		return NULL;
247 	}
248 
249 	if (!thread->start_poller_fn || !thread->stop_poller_fn) {
250 		SPDK_ERRLOG("No related functions to start requested poller\n");
251 		assert(false);
252 		return NULL;
253 	}
254 
255 	poller = thread->start_poller_fn(thread->thread_ctx, fn, arg, period_microseconds);
256 	if (!poller) {
257 		SPDK_ERRLOG("Unable to start requested poller\n");
258 		assert(false);
259 		return NULL;
260 	}
261 
262 	return poller;
263 }
264 
265 void
266 spdk_poller_unregister(struct spdk_poller **ppoller)
267 {
268 	struct spdk_thread *thread;
269 	struct spdk_poller *poller;
270 
271 	poller = *ppoller;
272 	if (poller == NULL) {
273 		return;
274 	}
275 
276 	*ppoller = NULL;
277 
278 	thread = spdk_get_thread();
279 
280 	if (thread) {
281 		thread->stop_poller_fn(poller, thread->thread_ctx);
282 	}
283 }
284 
/* Iteration state for spdk_for_each_thread(). */
struct call_thread {
	/* Thread the iteration is currently visiting. */
	struct spdk_thread *cur_thread;
	/* Function to run on every thread. */
	spdk_thread_fn fn;
	/* Argument passed to both fn and cpl. */
	void *ctx;

	/* Thread that started the iteration; cpl is sent to it at the end. */
	struct spdk_thread *orig_thread;
	/* Completion callback. */
	spdk_thread_fn cpl;
};
293 
294 static void
295 spdk_on_thread(void *ctx)
296 {
297 	struct call_thread *ct = ctx;
298 
299 	ct->fn(ct->ctx);
300 
301 	pthread_mutex_lock(&g_devlist_mutex);
302 	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
303 	pthread_mutex_unlock(&g_devlist_mutex);
304 
305 	if (!ct->cur_thread) {
306 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration");
307 
308 		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
309 		free(ctx);
310 	} else {
311 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
312 			      ct->cur_thread->name);
313 
314 		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
315 	}
316 }
317 
318 void
319 spdk_for_each_thread(spdk_thread_fn fn, void *ctx, spdk_thread_fn cpl)
320 {
321 	struct call_thread *ct;
322 
323 	ct = calloc(1, sizeof(*ct));
324 	if (!ct) {
325 		SPDK_ERRLOG("Unable to perform thread iteration\n");
326 		cpl(ctx);
327 		return;
328 	}
329 
330 	ct->fn = fn;
331 	ct->ctx = ctx;
332 	ct->cpl = cpl;
333 
334 	pthread_mutex_lock(&g_devlist_mutex);
335 	ct->orig_thread = _get_thread();
336 	ct->cur_thread = TAILQ_FIRST(&g_threads);
337 	pthread_mutex_unlock(&g_devlist_mutex);
338 
339 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
340 		      ct->orig_thread->name);
341 
342 	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
343 }
344 
345 void
346 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
347 			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size)
348 {
349 	struct io_device *dev, *tmp;
350 
351 	assert(io_device != NULL);
352 	assert(create_cb != NULL);
353 	assert(destroy_cb != NULL);
354 
355 	dev = calloc(1, sizeof(struct io_device));
356 	if (dev == NULL) {
357 		SPDK_ERRLOG("could not allocate io_device\n");
358 		return;
359 	}
360 
361 	dev->io_device = io_device;
362 	dev->create_cb = create_cb;
363 	dev->destroy_cb = destroy_cb;
364 	dev->unregister_cb = NULL;
365 	dev->ctx_size = ctx_size;
366 	dev->for_each_count = 0;
367 	dev->unregistered = false;
368 	dev->refcnt = 0;
369 
370 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %p on thread %s\n",
371 		      dev, spdk_get_thread()->name);
372 
373 	pthread_mutex_lock(&g_devlist_mutex);
374 	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
375 		if (tmp->io_device == io_device) {
376 			SPDK_ERRLOG("io_device %p already registered\n", io_device);
377 			free(dev);
378 			pthread_mutex_unlock(&g_devlist_mutex);
379 			return;
380 		}
381 	}
382 	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
383 	pthread_mutex_unlock(&g_devlist_mutex);
384 }
385 
/*
 * Message handler sent to dev->unregister_thread by _spdk_io_device_free().
 *  Calls the user's unregister_cb with the original io_device pointer and
 *  then frees the internal io_device record.
 */
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %p on thread %s\n",
		      dev, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev);
}
397 
398 static void
399 _spdk_io_device_free(struct io_device *dev)
400 {
401 	if (dev->unregister_cb == NULL) {
402 		free(dev);
403 	} else {
404 		assert(dev->unregister_thread != NULL);
405 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %p needs to unregister from thread %s\n",
406 			      dev, dev->unregister_thread->name);
407 		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
408 	}
409 }
410 
411 void
412 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
413 {
414 	struct io_device *dev;
415 	uint32_t refcnt;
416 	struct spdk_thread *thread;
417 
418 	thread = spdk_get_thread();
419 
420 	pthread_mutex_lock(&g_devlist_mutex);
421 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
422 		if (dev->io_device == io_device) {
423 			break;
424 		}
425 	}
426 
427 	if (!dev) {
428 		SPDK_ERRLOG("io_device %p not found\n", io_device);
429 		assert(false);
430 		pthread_mutex_unlock(&g_devlist_mutex);
431 		return;
432 	}
433 
434 	if (dev->for_each_count > 0) {
435 		SPDK_ERRLOG("io_device %p has %u for_each calls outstanding\n", io_device, dev->for_each_count);
436 		pthread_mutex_unlock(&g_devlist_mutex);
437 		return;
438 	}
439 
440 	dev->unregister_cb = unregister_cb;
441 	dev->unregistered = true;
442 	TAILQ_REMOVE(&g_io_devices, dev, tailq);
443 	refcnt = dev->refcnt;
444 	dev->unregister_thread = thread;
445 	pthread_mutex_unlock(&g_devlist_mutex);
446 
447 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %p from thread %s\n",
448 		      dev, thread->name);
449 
450 	if (refcnt > 0) {
451 		/* defer deletion */
452 		return;
453 	}
454 
455 	_spdk_io_device_free(dev);
456 }
457 
458 struct spdk_io_channel *
459 spdk_get_io_channel(void *io_device)
460 {
461 	struct spdk_io_channel *ch;
462 	struct spdk_thread *thread;
463 	struct io_device *dev;
464 	int rc;
465 
466 	pthread_mutex_lock(&g_devlist_mutex);
467 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
468 		if (dev->io_device == io_device) {
469 			break;
470 		}
471 	}
472 	if (dev == NULL) {
473 		SPDK_ERRLOG("could not find io_device %p\n", io_device);
474 		pthread_mutex_unlock(&g_devlist_mutex);
475 		return NULL;
476 	}
477 
478 	thread = _get_thread();
479 	if (!thread) {
480 		SPDK_ERRLOG("No thread allocated\n");
481 		pthread_mutex_unlock(&g_devlist_mutex);
482 		return NULL;
483 	}
484 
485 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
486 		if (ch->dev == dev) {
487 			ch->ref++;
488 
489 			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %p on thread %s refcnt %u\n",
490 				      ch, dev, thread->name, ch->ref);
491 
492 			/*
493 			 * An I/O channel already exists for this device on this
494 			 *  thread, so return it.
495 			 */
496 			pthread_mutex_unlock(&g_devlist_mutex);
497 			return ch;
498 		}
499 	}
500 
501 	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
502 	if (ch == NULL) {
503 		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
504 		pthread_mutex_unlock(&g_devlist_mutex);
505 		return NULL;
506 	}
507 
508 	ch->dev = dev;
509 	ch->destroy_cb = dev->destroy_cb;
510 	ch->thread = thread;
511 	ch->ref = 1;
512 	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);
513 
514 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %p on thread %s refcnt %u\n",
515 		      ch, dev, thread->name, ch->ref);
516 
517 	dev->refcnt++;
518 
519 	pthread_mutex_unlock(&g_devlist_mutex);
520 
521 	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
522 	if (rc == -1) {
523 		pthread_mutex_lock(&g_devlist_mutex);
524 		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
525 		dev->refcnt--;
526 		free(ch);
527 		pthread_mutex_unlock(&g_devlist_mutex);
528 		return NULL;
529 	}
530 
531 	return ch;
532 }
533 
534 static void
535 _spdk_put_io_channel(void *arg)
536 {
537 	struct spdk_io_channel *ch = arg;
538 	bool do_remove_dev = true;
539 
540 	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
541 		      "Releasing io_channel %p for io_device %p. Channel thread %p. Current thread %s\n",
542 		      ch, ch->dev, ch->thread, spdk_get_thread()->name);
543 
544 	assert(ch->thread == spdk_get_thread());
545 
546 	if (ch->ref > 0) {
547 		/*
548 		 * Another reference to the associated io_device was requested
549 		 *  after this message was sent but before it had a chance to
550 		 *  execute.
551 		 */
552 		return;
553 	}
554 
555 	pthread_mutex_lock(&g_devlist_mutex);
556 	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
557 	pthread_mutex_unlock(&g_devlist_mutex);
558 
559 	/* Don't hold the devlist mutex while the destroy_cb is called. */
560 	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));
561 
562 	pthread_mutex_lock(&g_devlist_mutex);
563 	ch->dev->refcnt--;
564 
565 	if (!ch->dev->unregistered) {
566 		do_remove_dev = false;
567 	}
568 
569 	if (ch->dev->refcnt > 0) {
570 		do_remove_dev = false;
571 	}
572 
573 	pthread_mutex_unlock(&g_devlist_mutex);
574 
575 	if (do_remove_dev) {
576 		_spdk_io_device_free(ch->dev);
577 	}
578 	free(ch);
579 }
580 
581 void
582 spdk_put_io_channel(struct spdk_io_channel *ch)
583 {
584 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Putting io_channel %p for io_device %p on thread %s refcnt %u\n",
585 		      ch, ch->dev, ch->thread->name, ch->ref);
586 
587 	ch->ref--;
588 
589 	if (ch->ref == 0) {
590 		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
591 	}
592 }
593 
594 struct spdk_io_channel *
595 spdk_io_channel_from_ctx(void *ctx)
596 {
597 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
598 }
599 
/* Return the thread recorded in ch when the channel was created. */
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}
605 
/* Iteration state for spdk_for_each_channel(). */
struct spdk_io_channel_iter {
	/* Caller-supplied device key whose channels are being visited. */
	void *io_device;
	/* Device whose for_each_count was bumped; set only once a channel is found. */
	struct io_device *dev;
	/* Function to run on each thread that has a channel for the device. */
	spdk_channel_msg fn;
	/* Last status given to spdk_for_each_channel_continue(); forwarded to cpl. */
	int status;
	/* Caller context, available via spdk_io_channel_iter_get_ctx(). */
	void *ctx;
	/* Channel selected for the current step; NULL once the iteration ends. */
	struct spdk_io_channel *ch;

	/* Thread the current step was sent to. */
	struct spdk_thread *cur_thread;

	/* Thread that started the iteration; cpl is sent to it. */
	struct spdk_thread *orig_thread;
	/* Completion callback; may be NULL. */
	spdk_channel_for_each_cpl cpl;
};
619 
/* Return the io_device pointer that was passed to spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}
625 
/*
 * Return the channel selected for the current iteration step.  This is NULL
 *  once the iteration has finished.
 */
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}
631 
/* Return the ctx pointer that was passed to spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}
637 
638 static void
639 _call_completion(void *ctx)
640 {
641 	struct spdk_io_channel_iter *i = ctx;
642 
643 	if (i->cpl != NULL) {
644 		i->cpl(i, i->status);
645 	}
646 	free(i);
647 }
648 
649 static void
650 _call_channel(void *ctx)
651 {
652 	struct spdk_io_channel_iter *i = ctx;
653 	struct spdk_io_channel *ch;
654 
655 	/*
656 	 * It is possible that the channel was deleted before this
657 	 *  message had a chance to execute.  If so, skip calling
658 	 *  the fn() on this thread.
659 	 */
660 	pthread_mutex_lock(&g_devlist_mutex);
661 	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
662 		if (ch->dev->io_device == i->io_device) {
663 			break;
664 		}
665 	}
666 	pthread_mutex_unlock(&g_devlist_mutex);
667 
668 	if (ch) {
669 		i->fn(i);
670 	} else {
671 		spdk_for_each_channel_continue(i, 0);
672 	}
673 }
674 
675 void
676 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
677 		      spdk_channel_for_each_cpl cpl)
678 {
679 	struct spdk_thread *thread;
680 	struct spdk_io_channel *ch;
681 	struct spdk_io_channel_iter *i;
682 
683 	i = calloc(1, sizeof(*i));
684 	if (!i) {
685 		SPDK_ERRLOG("Unable to allocate iterator\n");
686 		return;
687 	}
688 
689 	i->io_device = io_device;
690 	i->fn = fn;
691 	i->ctx = ctx;
692 	i->cpl = cpl;
693 
694 	pthread_mutex_lock(&g_devlist_mutex);
695 	i->orig_thread = _get_thread();
696 
697 	TAILQ_FOREACH(thread, &g_threads, tailq) {
698 		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
699 			if (ch->dev->io_device == io_device) {
700 				ch->dev->for_each_count++;
701 				i->dev = ch->dev;
702 				i->cur_thread = thread;
703 				i->ch = ch;
704 				pthread_mutex_unlock(&g_devlist_mutex);
705 				spdk_thread_send_msg(thread, _call_channel, i);
706 				return;
707 			}
708 		}
709 	}
710 
711 	pthread_mutex_unlock(&g_devlist_mutex);
712 
713 	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
714 }
715 
716 void
717 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
718 {
719 	struct spdk_thread *thread;
720 	struct spdk_io_channel *ch;
721 
722 	assert(i->cur_thread == spdk_get_thread());
723 
724 	i->status = status;
725 
726 	pthread_mutex_lock(&g_devlist_mutex);
727 	if (status) {
728 		goto end;
729 	}
730 	thread = TAILQ_NEXT(i->cur_thread, tailq);
731 	while (thread) {
732 		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
733 			if (ch->dev->io_device == i->io_device) {
734 				i->cur_thread = thread;
735 				i->ch = ch;
736 				pthread_mutex_unlock(&g_devlist_mutex);
737 				spdk_thread_send_msg(thread, _call_channel, i);
738 				return;
739 			}
740 		}
741 		thread = TAILQ_NEXT(thread, tailq);
742 	}
743 
744 end:
745 	i->dev->for_each_count--;
746 	i->ch = NULL;
747 	pthread_mutex_unlock(&g_devlist_mutex);
748 
749 	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
750 }
751 
752 
/* Defines the "thread" log component behind the SPDK_LOG_THREAD flag used by SPDK_DEBUGLOG above. */
SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)
754