xref: /spdk/lib/thread/thread.c (revision df6b55fd8cb22b0172c2daf39561439f4a162e55)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/queue.h"
38 #include "spdk/string.h"
39 #include "spdk/thread.h"
40 #include "spdk/util.h"
41 
42 #include "spdk_internal/log.h"
43 #include "spdk_internal/thread.h"
44 
45 #ifdef __linux__
46 #include <sys/prctl.h>
47 #endif
48 
49 #ifdef __FreeBSD__
50 #include <pthread_np.h>
51 #endif
52 
53 #define SPDK_MSG_BATCH_SIZE		8
54 
55 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;
56 
57 static spdk_new_thread_fn g_new_thread_fn = NULL;
58 
59 struct io_device {
60 	void				*io_device;
61 	char				*name;
62 	spdk_io_channel_create_cb	create_cb;
63 	spdk_io_channel_destroy_cb	destroy_cb;
64 	spdk_io_device_unregister_cb	unregister_cb;
65 	struct spdk_thread		*unregister_thread;
66 	uint32_t			ctx_size;
67 	uint32_t			for_each_count;
68 	TAILQ_ENTRY(io_device)		tailq;
69 
70 	uint32_t			refcnt;
71 
72 	bool				unregistered;
73 };
74 
75 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
76 
77 struct spdk_msg {
78 	spdk_msg_fn		fn;
79 	void			*arg;
80 
81 	SLIST_ENTRY(spdk_msg)	link;
82 };
83 
84 #define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
85 static struct spdk_mempool *g_spdk_msg_mempool = NULL;
86 
87 enum spdk_poller_state {
88 	/* The poller is registered with a thread but not currently executing its fn. */
89 	SPDK_POLLER_STATE_WAITING,
90 
91 	/* The poller is currently running its fn. */
92 	SPDK_POLLER_STATE_RUNNING,
93 
94 	/* The poller was unregistered during the execution of its fn. */
95 	SPDK_POLLER_STATE_UNREGISTERED,
96 };
97 
98 struct spdk_poller {
99 	TAILQ_ENTRY(spdk_poller)	tailq;
100 
101 	/* Current state of the poller; should only be accessed from the poller's thread. */
102 	enum spdk_poller_state		state;
103 
104 	uint64_t			period_ticks;
105 	uint64_t			next_run_tick;
106 	spdk_poller_fn			fn;
107 	void				*arg;
108 };
109 
110 struct spdk_thread {
111 	TAILQ_HEAD(, spdk_io_channel)	io_channels;
112 	TAILQ_ENTRY(spdk_thread)	tailq;
113 	char				*name;
114 
115 	uint64_t			tsc_last;
116 	struct spdk_thread_stats	stats;
117 
118 	/*
119 	 * Contains pollers actively running on this thread.  Pollers
120 	 *  are run round-robin. The thread takes one poller from the head
121 	 *  of the ring, executes it, then puts it back at the tail of
122 	 *  the ring.
123 	 */
124 	TAILQ_HEAD(, spdk_poller)	active_pollers;
125 
126 	/**
127 	 * Contains pollers running on this thread with a periodic timer.
128 	 */
129 	TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers;
130 
131 	struct spdk_ring		*messages;
132 
133 	SLIST_HEAD(, spdk_msg)		msg_cache;
134 	size_t				msg_cache_count;
135 };
136 
137 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
138 static uint32_t g_thread_count = 0;
139 
140 static __thread struct spdk_thread *tls_thread = NULL;
141 
142 static inline struct spdk_thread *
143 _get_thread(void)
144 {
145 	return tls_thread;
146 }
147 
148 static void
149 _set_thread_name(const char *thread_name)
150 {
151 #if defined(__linux__)
152 	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
153 #elif defined(__FreeBSD__)
154 	pthread_set_name_np(pthread_self(), thread_name);
155 #else
156 #error missing platform support for thread name
157 #endif
158 }
159 
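/*
 * One-time library initialization.  Must be called before any spdk_thread is
 * created.  This creates the global spdk_msg mempool (262143 entries) that
 * backs cross-thread messaging and records the optional new_thread_fn, which
 * is invoked for every thread subsequently created via spdk_thread_create()
 * so the hosting framework can schedule it.
 */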
160 int
161 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn)
162 {
163 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
164 
165 	assert(g_new_thread_fn == NULL);
166 	g_new_thread_fn = new_thread_fn;
167 
168 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
169 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
170 			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
171 			     sizeof(struct spdk_msg),
172 			     0, /* No cache. We do our own. */
173 			     SPDK_ENV_SOCKET_ID_ANY);
174 
175 	if (!g_spdk_msg_mempool) {
176 		return -1;
177 	}
178 
179 	return 0;
180 }
181 
182 void
183 spdk_thread_lib_fini(void)
184 {
185 	struct io_device *dev;
186 
187 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
188 		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
189 	}
190 
191 	if (g_spdk_msg_mempool) {
192 		spdk_mempool_free(g_spdk_msg_mempool);
193 	}
194 }
195 
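/*
 * Allocate and register a new lightweight SPDK thread.  This sets up the
 * thread's channel and poller lists, a 65536-entry MP/SC message ring, and a
 * local cache of up to SPDK_MSG_MEMPOOL_CACHE_SIZE message objects taken from
 * the global pool.  If a name is given, it is also applied to the calling OS
 * thread; otherwise a name is generated from the thread pointer.  The new
 * thread is added to the global list and, if registered, g_new_thread_fn is
 * notified.
 */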
196 struct spdk_thread *
197 spdk_thread_create(const char *name)
198 {
199 	struct spdk_thread *thread;
200 	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
201 	int rc, i;
202 
203 	thread = calloc(1, sizeof(*thread));
204 	if (!thread) {
205 		SPDK_ERRLOG("Unable to allocate memory for thread\n");
206 		return NULL;
207 	}
208 
209 	TAILQ_INIT(&thread->io_channels);
210 	TAILQ_INIT(&thread->active_pollers);
211 	TAILQ_INIT(&thread->timer_pollers);
212 	SLIST_INIT(&thread->msg_cache);
213 	thread->msg_cache_count = 0;
214 
215 	thread->tsc_last = spdk_get_ticks();
216 
217 	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
218 	if (!thread->messages) {
219 		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
220 		free(thread);
221 		return NULL;
222 	}
223 
224 	/* Fill the local message pool cache. */
225 	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
226 	if (rc == 0) {
227 		/* If we can't populate the cache, that's OK. The cache will fill
228 		 * up organically as messages are passed to the thread. */
229 		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
230 			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
231 			thread->msg_cache_count++;
232 		}
233 	}
234 
235 	if (name) {
236 		_set_thread_name(name);
237 		thread->name = strdup(name);
238 	} else {
239 		thread->name = spdk_sprintf_alloc("%p", thread);
240 	}
241 
242 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name);
243 
244 	pthread_mutex_lock(&g_devlist_mutex);
245 	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
246 	g_thread_count++;
247 	pthread_mutex_unlock(&g_devlist_mutex);
248 
249 	if (g_new_thread_fn) {
250 		g_new_thread_fn(thread);
251 	}
252 
253 	return thread;
254 }
255 
256 void
257 spdk_set_thread(struct spdk_thread *thread)
258 {
259 	tls_thread = thread;
260 }
261 
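/*
 * Tear down a thread created by spdk_thread_create().  Any I/O channels still
 * held are only reported as errors; callers are expected to release them
 * first.  Cached messages are returned to the global pool, the thread is
 * removed from the global list, and its message ring and name are freed.
 */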
262 void
263 spdk_thread_exit(struct spdk_thread *thread)
264 {
265 	struct spdk_io_channel *ch;
266 	struct spdk_msg *msg;
267 
268 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name);
269 
270 	if (tls_thread == thread) {
271 		tls_thread = NULL;
272 	}
273 
274 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
275 		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
276 			    thread->name, ch->dev->name);
277 	}
278 
279 	pthread_mutex_lock(&g_devlist_mutex);
280 	assert(g_thread_count > 0);
281 	g_thread_count--;
282 	TAILQ_REMOVE(&g_threads, thread, tailq);
283 	pthread_mutex_unlock(&g_devlist_mutex);
284 
285 	free(thread->name);
286 
287 	msg = SLIST_FIRST(&thread->msg_cache);
288 	while (msg != NULL) {
289 		SLIST_REMOVE_HEAD(&thread->msg_cache, link);
290 
291 		assert(thread->msg_cache_count > 0);
292 		thread->msg_cache_count--;
293 		spdk_mempool_put(g_spdk_msg_mempool, msg);
294 
295 		msg = SLIST_FIRST(&thread->msg_cache);
296 	}
297 
298 	assert(thread->msg_cache_count == 0);
299 
300 	if (thread->messages) {
301 		spdk_ring_free(thread->messages);
302 	}
303 
304 	free(thread);
305 }
306 
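/*
 * Drain up to max_msgs messages (capped at SPDK_MSG_BATCH_SIZE) from the
 * thread's ring and run each callback.  Processed message objects are
 * recycled into the per-thread cache when there is room, otherwise returned
 * to the global mempool.  Returns the number of messages processed.
 */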
307 static inline uint32_t
308 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
309 {
310 	unsigned count, i;
311 	void *messages[SPDK_MSG_BATCH_SIZE];
312 
313 #ifdef DEBUG
314 	/*
315 	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
316 	 * so we will never actually read uninitialized data from events, but just to be sure
317 	 * so we will never actually read uninitialized data from messages, but just to be sure
318 	 */
319 	memset(messages, 0, sizeof(messages));
320 #endif
321 
322 	if (max_msgs > 0) {
323 		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
324 	} else {
325 		max_msgs = SPDK_MSG_BATCH_SIZE;
326 	}
327 
328 	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
329 	if (count == 0) {
330 		return 0;
331 	}
332 
333 	for (i = 0; i < count; i++) {
334 		struct spdk_msg *msg = messages[i];
335 
336 		assert(msg != NULL);
337 		msg->fn(msg->arg);
338 
339 		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
340 			/* Insert the message at the head. We want to re-use the hot
341 			 * ones. */
342 			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
343 			thread->msg_cache_count++;
344 		} else {
345 			spdk_mempool_put(g_spdk_msg_mempool, msg);
346 		}
347 	}
348 
349 	return count;
350 }
351 
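/*
 * Compute the poller's next expiration and insert it into timer_pollers,
 * which is kept sorted by next_run_tick.  Walking the list from the tail
 * keeps insertion cheap in the common case where pollers share a period and
 * therefore expire in roughly FIFO order.
 */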
352 static void
353 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
354 {
355 	struct spdk_poller *iter;
356 
357 	poller->next_run_tick = now + poller->period_ticks;
358 
359 	/*
360 	 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled
361 	 * run time.
362 	 */
363 	TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) {
364 		if (iter->next_run_tick <= poller->next_run_tick) {
365 			TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq);
366 			return;
367 		}
368 	}
369 
370 	/* No earlier pollers were found, so this poller must be the new head */
371 	TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
372 }
373 
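/*
 * Perform one iteration of work on this thread: run a batch of queued
 * messages, run the next active poller (round-robin), and run the
 * earliest-expiring timed poller if its expiration time has passed.  The
 * ticks elapsed since the previous call are charged to thread->stats as
 * busy (rc > 0), idle (rc == 0), or unknown (rc < 0).
 *
 * A minimal polling loop built on this API might look like the sketch below
 * (illustrative only; "keep_running" and the surrounding setup and teardown
 * are supplied by the hosting framework):
 *
 *     struct spdk_thread *thread = spdk_thread_create("worker");
 *
 *     while (keep_running) {
 *         // max_msgs == 0 uses the default batch size; now == 0 reads the TSC internally
 *         spdk_thread_poll(thread, 0, 0);
 *     }
 */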
374 int
375 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
376 {
377 	uint32_t msg_count;
378 	struct spdk_thread *orig_thread;
379 	struct spdk_poller *poller;
380 	int rc = 0;
381 
382 	orig_thread = _get_thread();
383 	tls_thread = thread;
384 
385 	if (now == 0) {
386 		now = spdk_get_ticks();
387 	}
388 
389 	msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
390 	if (msg_count) {
391 		rc = 1;
392 	}
393 
394 	poller = TAILQ_FIRST(&thread->active_pollers);
395 	if (poller) {
396 		int poller_rc;
397 
398 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
399 		poller->state = SPDK_POLLER_STATE_RUNNING;
400 		poller_rc = poller->fn(poller->arg);
401 		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
402 			free(poller);
403 		} else {
404 			poller->state = SPDK_POLLER_STATE_WAITING;
405 			TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
406 		}
407 
408 #ifdef DEBUG
409 		if (poller_rc == -1) {
410 			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
411 		}
412 #endif
413 
414 		if (poller_rc > rc) {
415 			rc = poller_rc;
416 		}
417 	}
418 
419 	poller = TAILQ_FIRST(&thread->timer_pollers);
420 	if (poller) {
421 		if (now >= poller->next_run_tick) {
422 			int timer_rc = 0;
423 
424 			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
425 			poller->state = SPDK_POLLER_STATE_RUNNING;
426 			timer_rc = poller->fn(poller->arg);
427 			if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
428 				free(poller);
429 			} else {
430 				poller->state = SPDK_POLLER_STATE_WAITING;
431 				_spdk_poller_insert_timer(thread, poller, now);
432 			}
433 
434 #ifdef DEBUG
435 			if (timer_rc == -1) {
436 				SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller);
437 			}
438 #endif
439 
440 			if (timer_rc > rc) {
441 				rc = timer_rc;
443 			}
444 		}
445 	}
446 
447 	if (rc == 0) {
448 		/* Poller status idle */
449 		thread->stats.idle_tsc += now - thread->tsc_last;
450 	} else if (rc > 0) {
451 		/* Poller status busy */
452 		thread->stats.busy_tsc += now - thread->tsc_last;
453 	} else {
454 		/* Poller status unknown */
455 		thread->stats.unknown_tsc += now - thread->tsc_last;
456 	}
457 	thread->tsc_last = now;
458 
459 	tls_thread = orig_thread;
460 
461 	return rc;
462 }
463 
464 uint64_t
465 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
466 {
467 	struct spdk_poller *poller;
468 
469 	poller = TAILQ_FIRST(&thread->timer_pollers);
470 	if (poller) {
471 		return poller->next_run_tick;
472 	}
473 
474 	return 0;
475 }
476 
477 int
478 spdk_thread_has_active_pollers(struct spdk_thread *thread)
479 {
480 	return !TAILQ_EMPTY(&thread->active_pollers);
481 }
482 
483 uint32_t
484 spdk_thread_get_count(void)
485 {
486 	/*
487 	 * Return cached value of the current thread count.  We could acquire the
488 	 *  lock and iterate through the TAILQ of threads to count them, but that
489 	 *  count could still be invalidated after we release the lock.
490 	 */
491 	return g_thread_count;
492 }
493 
494 struct spdk_thread *
495 spdk_get_thread(void)
496 {
497 	struct spdk_thread *thread;
498 
499 	thread = _get_thread();
500 	if (!thread) {
501 		SPDK_ERRLOG("No thread allocated\n");
502 	}
503 
504 	return thread;
505 }
506 
507 const char *
508 spdk_thread_get_name(const struct spdk_thread *thread)
509 {
510 	return thread->name;
511 }
512 
513 int
514 spdk_thread_get_stats(struct spdk_thread_stats *stats)
515 {
516 	struct spdk_thread *thread;
517 
518 	thread = _get_thread();
519 	if (!thread) {
520 		SPDK_ERRLOG("No thread allocated\n");
521 		return -EINVAL;
522 	}
523 
524 	if (stats == NULL) {
525 		return -EINVAL;
526 	}
527 
528 	*stats = thread->stats;
529 
530 	return 0;
531 }
532 
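/*
 * Queue fn(ctx) for execution on the given thread.  The message object comes
 * from the sending thread's local cache when the caller is itself an SPDK
 * thread, otherwise from the global mempool, and is enqueued on the target
 * thread's multi-producer ring.  The callback runs the next time the target
 * thread is polled.
 */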
533 void
534 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
535 {
536 	struct spdk_thread *local_thread;
537 	struct spdk_msg *msg;
538 	int rc;
539 
540 	if (!thread) {
541 		assert(false);
542 		return;
543 	}
544 
545 	local_thread = _get_thread();
546 
547 	msg = NULL;
548 	if (local_thread != NULL) {
549 		if (local_thread->msg_cache_count > 0) {
550 			msg = SLIST_FIRST(&local_thread->msg_cache);
551 			assert(msg != NULL);
552 			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
553 			local_thread->msg_cache_count--;
554 		}
555 	}
556 
557 	if (msg == NULL) {
558 		msg = spdk_mempool_get(g_spdk_msg_mempool);
559 		if (!msg) {
560 			assert(false);
561 			return;
562 		}
563 	}
564 
565 	msg->fn = fn;
566 	msg->arg = ctx;
567 
568 	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1);
569 	if (rc != 1) {
570 		assert(false);
571 		spdk_mempool_put(g_spdk_msg_mempool, msg);
572 		return;
573 	}
574 }
575 
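/*
 * Register a poller on the calling SPDK thread.  A period of 0 places the
 * poller on the active list, where it runs on every spdk_thread_poll()
 * iteration; a non-zero period (in microseconds) is converted to ticks and
 * the poller is placed on the sorted timer list instead.
 *
 * Illustrative sketch of registration from code running on an SPDK thread
 * (my_poller_fn, my_ctx and do_some_work are hypothetical):
 *
 *     static int
 *     my_poller_fn(void *arg)
 *     {
 *         struct my_ctx *ctx = arg;
 *
 *         return do_some_work(ctx) ? 1 : 0;  // > 0 reports that work was done
 *     }
 *
 *     struct spdk_poller *poller = spdk_poller_register(my_poller_fn, my_ctx, 1000);  // every 1 ms
 */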
576 struct spdk_poller *
577 spdk_poller_register(spdk_poller_fn fn,
578 		     void *arg,
579 		     uint64_t period_microseconds)
580 {
581 	struct spdk_thread *thread;
582 	struct spdk_poller *poller;
583 	uint64_t quotient, remainder, ticks;
584 
585 	thread = spdk_get_thread();
586 	if (!thread) {
587 		assert(false);
588 		return NULL;
589 	}
590 
591 	poller = calloc(1, sizeof(*poller));
592 	if (poller == NULL) {
593 		SPDK_ERRLOG("Poller memory allocation failed\n");
594 		return NULL;
595 	}
596 
597 	poller->state = SPDK_POLLER_STATE_WAITING;
598 	poller->fn = fn;
599 	poller->arg = arg;
600 
601 	if (period_microseconds) {
602 		quotient = period_microseconds / SPDK_SEC_TO_USEC;
603 		remainder = period_microseconds % SPDK_SEC_TO_USEC;
604 		ticks = spdk_get_ticks_hz();
605 
606 		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
607 	} else {
608 		poller->period_ticks = 0;
609 	}
610 
611 	if (poller->period_ticks) {
612 		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
613 	} else {
614 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
615 	}
616 
617 	return poller;
618 }
619 
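/*
 * Unregister a poller and clear the caller's pointer to it.  This must be
 * called on the poller's own thread.  If it is invoked from within the
 * poller's callback (state == RUNNING), the poller is only marked
 * UNREGISTERED and the poll loop frees it after the callback returns;
 * otherwise it is removed from its list and freed immediately.
 */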
620 void
621 spdk_poller_unregister(struct spdk_poller **ppoller)
622 {
623 	struct spdk_thread *thread;
624 	struct spdk_poller *poller;
625 
626 	poller = *ppoller;
627 	if (poller == NULL) {
628 		return;
629 	}
630 
631 	*ppoller = NULL;
632 
633 	thread = spdk_get_thread();
634 	if (!thread) {
635 		assert(false);
636 		return;
637 	}
638 
639 	if (poller->state == SPDK_POLLER_STATE_RUNNING) {
640 		/*
641 		 * We are being called from the poller_fn, so set the state to unregistered
642 		 * and let the thread poll loop free the poller.
643 		 */
644 		poller->state = SPDK_POLLER_STATE_UNREGISTERED;
645 	} else {
646 		/* Poller is not running currently, so just free it. */
647 		if (poller->period_ticks) {
648 			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
649 		} else {
650 			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
651 		}
652 
653 		free(poller);
654 	}
655 }
656 
657 struct call_thread {
658 	struct spdk_thread *cur_thread;
659 	spdk_msg_fn fn;
660 	void *ctx;
661 
662 	struct spdk_thread *orig_thread;
663 	spdk_msg_fn cpl;
664 };
665 
666 static void
667 spdk_on_thread(void *ctx)
668 {
669 	struct call_thread *ct = ctx;
670 
671 	ct->fn(ct->ctx);
672 
673 	pthread_mutex_lock(&g_devlist_mutex);
674 	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
675 	pthread_mutex_unlock(&g_devlist_mutex);
676 
677 	if (!ct->cur_thread) {
678 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");
679 
680 		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
681 		free(ctx);
682 	} else {
683 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
684 			      ct->cur_thread->name);
685 
686 		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
687 	}
688 }
689 
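/*
 * Run fn(ctx) once on every registered thread, one thread at a time, by
 * chaining messages through spdk_on_thread().  When the last thread has been
 * visited, cpl(ctx) is sent back to the originating thread.
 */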
690 void
691 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
692 {
693 	struct call_thread *ct;
694 	struct spdk_thread *thread;
695 
696 	ct = calloc(1, sizeof(*ct));
697 	if (!ct) {
698 		SPDK_ERRLOG("Unable to perform thread iteration\n");
699 		cpl(ctx);
700 		return;
701 	}
702 
703 	ct->fn = fn;
704 	ct->ctx = ctx;
705 	ct->cpl = cpl;
706 
707 	pthread_mutex_lock(&g_devlist_mutex);
708 	thread = _get_thread();
709 	if (!thread) {
710 		SPDK_ERRLOG("No thread allocated\n");
711 		free(ct);
712 		cpl(ctx);
713 		return;
714 	}
715 	ct->orig_thread = thread;
716 	ct->cur_thread = TAILQ_FIRST(&g_threads);
717 	pthread_mutex_unlock(&g_devlist_mutex);
718 
719 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
720 		      ct->orig_thread->name);
721 
722 	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
723 }
724 
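/*
 * Register an io_device keyed by an arbitrary pointer.  create_cb/destroy_cb
 * construct and tear down the per-thread channel context (ctx_size bytes
 * allocated immediately after the spdk_io_channel), and registration fails
 * noisily if the same pointer is already registered.
 *
 * Typical usage is sketched below (g_my_dev, struct my_channel_ctx,
 * my_create_cb and my_destroy_cb are hypothetical):
 *
 *     spdk_io_device_register(&g_my_dev, my_create_cb, my_destroy_cb,
 *                             sizeof(struct my_channel_ctx), "my_dev");
 *
 *     // later, on each thread that performs I/O:
 *     struct spdk_io_channel *ch = spdk_get_io_channel(&g_my_dev);
 *     struct my_channel_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *     // ... use ctx ...
 *     spdk_put_io_channel(ch);
 */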
725 void
726 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
727 			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
728 			const char *name)
729 {
730 	struct io_device *dev, *tmp;
731 	struct spdk_thread *thread;
732 
733 	assert(io_device != NULL);
734 	assert(create_cb != NULL);
735 	assert(destroy_cb != NULL);
736 
737 	thread = spdk_get_thread();
738 	if (!thread) {
739 		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
740 		assert(false);
741 		return;
742 	}
743 
744 	dev = calloc(1, sizeof(struct io_device));
745 	if (dev == NULL) {
746 		SPDK_ERRLOG("could not allocate io_device\n");
747 		return;
748 	}
749 
750 	dev->io_device = io_device;
751 	if (name) {
752 		dev->name = strdup(name);
753 	} else {
754 		dev->name = spdk_sprintf_alloc("%p", dev);
755 	}
756 	dev->create_cb = create_cb;
757 	dev->destroy_cb = destroy_cb;
758 	dev->unregister_cb = NULL;
759 	dev->ctx_size = ctx_size;
760 	dev->for_each_count = 0;
761 	dev->unregistered = false;
762 	dev->refcnt = 0;
763 
764 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
765 		      dev->name, dev->io_device, thread->name);
766 
767 	pthread_mutex_lock(&g_devlist_mutex);
768 	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
769 		if (tmp->io_device == io_device) {
770 			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
771 				    io_device, tmp->name, dev->name);
772 			free(dev->name);
773 			free(dev);
774 			pthread_mutex_unlock(&g_devlist_mutex);
775 			return;
776 		}
777 	}
778 	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
779 	pthread_mutex_unlock(&g_devlist_mutex);
780 }
781 
782 static void
783 _finish_unregister(void *arg)
784 {
785 	struct io_device *dev = arg;
786 
787 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
788 		      dev->name, dev->io_device, dev->unregister_thread->name);
789 
790 	dev->unregister_cb(dev->io_device);
791 	free(dev->name);
792 	free(dev);
793 }
794 
795 static void
796 _spdk_io_device_free(struct io_device *dev)
797 {
798 	if (dev->unregister_cb == NULL) {
799 		free(dev->name);
800 		free(dev);
801 	} else {
802 		assert(dev->unregister_thread != NULL);
803 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
804 			      dev->name, dev->io_device, dev->unregister_thread->name);
805 		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
806 	}
807 }
808 
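/*
 * Unregister an io_device.  The device is removed from the global list
 * immediately, but the final free (and the optional unregister_cb, which runs
 * on the unregistering thread) is deferred until the last channel reference
 * is released.  Unregistration is refused while spdk_for_each_channel()
 * iterations are outstanding.
 */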
809 void
810 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
811 {
812 	struct io_device *dev;
813 	uint32_t refcnt;
814 	struct spdk_thread *thread;
815 
816 	thread = spdk_get_thread();
817 	if (!thread) {
818 		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
819 		assert(false);
820 		return;
821 	}
822 
823 	pthread_mutex_lock(&g_devlist_mutex);
824 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
825 		if (dev->io_device == io_device) {
826 			break;
827 		}
828 	}
829 
830 	if (!dev) {
831 		SPDK_ERRLOG("io_device %p not found\n", io_device);
832 		assert(false);
833 		pthread_mutex_unlock(&g_devlist_mutex);
834 		return;
835 	}
836 
837 	if (dev->for_each_count > 0) {
838 		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
839 			    dev->name, io_device, dev->for_each_count);
840 		pthread_mutex_unlock(&g_devlist_mutex);
841 		return;
842 	}
843 
844 	dev->unregister_cb = unregister_cb;
845 	dev->unregistered = true;
846 	TAILQ_REMOVE(&g_io_devices, dev, tailq);
847 	refcnt = dev->refcnt;
848 	dev->unregister_thread = thread;
849 	pthread_mutex_unlock(&g_devlist_mutex);
850 
851 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
852 		      dev->name, dev->io_device, thread->name);
853 
854 	if (refcnt > 0) {
855 		/* defer deletion */
856 		return;
857 	}
858 
859 	_spdk_io_device_free(dev);
860 }
861 
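/*
 * Get (or create) the calling thread's channel for the given io_device.  If
 * a channel already exists on this thread, its reference count is bumped and
 * it is returned; otherwise a new channel plus ctx_size bytes of per-channel
 * context is allocated, the device's create_cb is invoked, and the device's
 * refcnt is incremented.
 */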
862 struct spdk_io_channel *
863 spdk_get_io_channel(void *io_device)
864 {
865 	struct spdk_io_channel *ch;
866 	struct spdk_thread *thread;
867 	struct io_device *dev;
868 	int rc;
869 
870 	pthread_mutex_lock(&g_devlist_mutex);
871 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
872 		if (dev->io_device == io_device) {
873 			break;
874 		}
875 	}
876 	if (dev == NULL) {
877 		SPDK_ERRLOG("could not find io_device %p\n", io_device);
878 		pthread_mutex_unlock(&g_devlist_mutex);
879 		return NULL;
880 	}
881 
882 	thread = _get_thread();
883 	if (!thread) {
884 		SPDK_ERRLOG("No thread allocated\n");
885 		pthread_mutex_unlock(&g_devlist_mutex);
886 		return NULL;
887 	}
888 
889 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
890 		if (ch->dev == dev) {
891 			ch->ref++;
892 
893 			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
894 				      ch, dev->name, dev->io_device, thread->name, ch->ref);
895 
896 			/*
897 			 * An I/O channel already exists for this device on this
898 			 *  thread, so return it.
899 			 */
900 			pthread_mutex_unlock(&g_devlist_mutex);
901 			return ch;
902 		}
903 	}
904 
905 	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
906 	if (ch == NULL) {
907 		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
908 		pthread_mutex_unlock(&g_devlist_mutex);
909 		return NULL;
910 	}
911 
912 	ch->dev = dev;
913 	ch->destroy_cb = dev->destroy_cb;
914 	ch->thread = thread;
915 	ch->ref = 1;
916 	ch->destroy_ref = 0;
917 	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);
918 
919 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
920 		      ch, dev->name, dev->io_device, thread->name, ch->ref);
921 
922 	dev->refcnt++;
923 
924 	pthread_mutex_unlock(&g_devlist_mutex);
925 
926 	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
927 	if (rc != 0) {
928 		pthread_mutex_lock(&g_devlist_mutex);
929 		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
930 		dev->refcnt--;
931 		free(ch);
932 		pthread_mutex_unlock(&g_devlist_mutex);
933 		return NULL;
934 	}
935 
936 	return ch;
937 }
938 
939 static void
940 _spdk_put_io_channel(void *arg)
941 {
942 	struct spdk_io_channel *ch = arg;
943 	bool do_remove_dev = true;
944 	struct spdk_thread *thread;
945 
946 	thread = spdk_get_thread();
947 	if (!thread) {
948 		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
949 		assert(false);
950 		return;
951 	}
952 
953 	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
954 		      "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n",
955 		      ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name);
956 
957 	assert(ch->thread == thread);
958 
959 	ch->destroy_ref--;
960 
961 	if (ch->ref > 0 || ch->destroy_ref > 0) {
962 		/*
963 		 * Another reference to the associated io_device was requested
964 		 *  after this message was sent but before it had a chance to
965 		 *  execute.
966 		 */
967 		return;
968 	}
969 
970 	pthread_mutex_lock(&g_devlist_mutex);
971 	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
972 	pthread_mutex_unlock(&g_devlist_mutex);
973 
974 	/* Don't hold the devlist mutex while the destroy_cb is called. */
975 	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));
976 
977 	pthread_mutex_lock(&g_devlist_mutex);
978 	ch->dev->refcnt--;
979 
980 	if (!ch->dev->unregistered) {
981 		do_remove_dev = false;
982 	}
983 
984 	if (ch->dev->refcnt > 0) {
985 		do_remove_dev = false;
986 	}
987 
988 	pthread_mutex_unlock(&g_devlist_mutex);
989 
990 	if (do_remove_dev) {
991 		_spdk_io_device_free(ch->dev);
992 	}
993 	free(ch);
994 }
995 
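/*
 * Drop a reference to a channel.  When the count reaches zero, destruction
 * is deferred by sending _spdk_put_io_channel() to the channel's owning
 * thread, which calls destroy_cb and, if the io_device has been unregistered
 * and this was its last channel, frees the device as well.
 */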
996 void
997 spdk_put_io_channel(struct spdk_io_channel *ch)
998 {
999 	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
1000 		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
1001 		      ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref);
1002 
1003 	ch->ref--;
1004 
1005 	if (ch->ref == 0) {
1006 		ch->destroy_ref++;
1007 		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
1008 	}
1009 }
1010 
1011 struct spdk_io_channel *
1012 spdk_io_channel_from_ctx(void *ctx)
1013 {
1014 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
1015 }
1016 
1017 struct spdk_thread *
1018 spdk_io_channel_get_thread(struct spdk_io_channel *ch)
1019 {
1020 	return ch->thread;
1021 }
1022 
1023 struct spdk_io_channel_iter {
1024 	void *io_device;
1025 	struct io_device *dev;
1026 	spdk_channel_msg fn;
1027 	int status;
1028 	void *ctx;
1029 	struct spdk_io_channel *ch;
1030 
1031 	struct spdk_thread *cur_thread;
1032 
1033 	struct spdk_thread *orig_thread;
1034 	spdk_channel_for_each_cpl cpl;
1035 };
1036 
1037 void *
1038 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
1039 {
1040 	return i->io_device;
1041 }
1042 
1043 struct spdk_io_channel *
1044 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
1045 {
1046 	return i->ch;
1047 }
1048 
1049 void *
1050 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
1051 {
1052 	return i->ctx;
1053 }
1054 
1055 static void
1056 _call_completion(void *ctx)
1057 {
1058 	struct spdk_io_channel_iter *i = ctx;
1059 
1060 	if (i->cpl != NULL) {
1061 		i->cpl(i, i->status);
1062 	}
1063 	free(i);
1064 }
1065 
1066 static void
1067 _call_channel(void *ctx)
1068 {
1069 	struct spdk_io_channel_iter *i = ctx;
1070 	struct spdk_io_channel *ch;
1071 
1072 	/*
1073 	 * It is possible that the channel was deleted before this
1074 	 *  message had a chance to execute.  If so, skip calling
1075 	 *  the fn() on this thread.
1076 	 */
1077 	pthread_mutex_lock(&g_devlist_mutex);
1078 	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
1079 		if (ch->dev->io_device == i->io_device) {
1080 			break;
1081 		}
1082 	}
1083 	pthread_mutex_unlock(&g_devlist_mutex);
1084 
1085 	if (ch) {
1086 		i->fn(i);
1087 	} else {
1088 		spdk_for_each_channel_continue(i, 0);
1089 	}
1090 }
1091 
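/*
 * Call fn(i) once for each existing channel of io_device, hopping from
 * thread to thread via messages.  Each fn must finish by calling
 * spdk_for_each_channel_continue(); once every channel has been visited (or
 * a non-zero status stops the iteration early), cpl runs on the originating
 * thread with the final status.
 */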
1092 void
1093 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
1094 		      spdk_channel_for_each_cpl cpl)
1095 {
1096 	struct spdk_thread *thread;
1097 	struct spdk_io_channel *ch;
1098 	struct spdk_io_channel_iter *i;
1099 
1100 	i = calloc(1, sizeof(*i));
1101 	if (!i) {
1102 		SPDK_ERRLOG("Unable to allocate iterator\n");
1103 		return;
1104 	}
1105 
1106 	i->io_device = io_device;
1107 	i->fn = fn;
1108 	i->ctx = ctx;
1109 	i->cpl = cpl;
1110 
1111 	pthread_mutex_lock(&g_devlist_mutex);
1112 	i->orig_thread = _get_thread();
1113 
1114 	TAILQ_FOREACH(thread, &g_threads, tailq) {
1115 		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
1116 			if (ch->dev->io_device == io_device) {
1117 				ch->dev->for_each_count++;
1118 				i->dev = ch->dev;
1119 				i->cur_thread = thread;
1120 				i->ch = ch;
1121 				pthread_mutex_unlock(&g_devlist_mutex);
1122 				spdk_thread_send_msg(thread, _call_channel, i);
1123 				return;
1124 			}
1125 		}
1126 	}
1127 
1128 	pthread_mutex_unlock(&g_devlist_mutex);
1129 
1130 	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
1131 }
1132 
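/*
 * Advance a channel iteration started by spdk_for_each_channel().  A
 * non-zero status aborts the remaining hops; otherwise the next thread that
 * holds a channel for the io_device is messaged.  When no more channels
 * remain, for_each_count is dropped and the completion callback is sent to
 * the originating thread.
 */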
1133 void
1134 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
1135 {
1136 	struct spdk_thread *thread;
1137 	struct spdk_io_channel *ch;
1138 
1139 	assert(i->cur_thread == spdk_get_thread());
1140 
1141 	i->status = status;
1142 
1143 	pthread_mutex_lock(&g_devlist_mutex);
1144 	if (status) {
1145 		goto end;
1146 	}
1147 	thread = TAILQ_NEXT(i->cur_thread, tailq);
1148 	while (thread) {
1149 		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
1150 			if (ch->dev->io_device == i->io_device) {
1151 				i->cur_thread = thread;
1152 				i->ch = ch;
1153 				pthread_mutex_unlock(&g_devlist_mutex);
1154 				spdk_thread_send_msg(thread, _call_channel, i);
1155 				return;
1156 			}
1157 		}
1158 		thread = TAILQ_NEXT(thread, tailq);
1159 	}
1160 
1161 end:
1162 	i->dev->for_each_count--;
1163 	i->ch = NULL;
1164 	pthread_mutex_unlock(&g_devlist_mutex);
1165 
1166 	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
1167 }
1168 
1169 
1170 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)
1171