xref: /spdk/lib/thread/thread.c (revision 63b3b8fd8f7e6cb6a348c2fcdd189aa62baf44f6)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/queue.h"
38 #include "spdk/string.h"
39 #include "spdk/thread.h"
40 #include "spdk/util.h"
41 
42 #include "spdk_internal/log.h"
43 #include "spdk_internal/thread.h"
44 
45 #ifdef __linux__
46 #include <sys/prctl.h>
47 #endif
48 
49 #ifdef __FreeBSD__
50 #include <pthread_np.h>
51 #endif
52 
#define SPDK_MSG_BATCH_SIZE		8

/* Protects g_io_devices and g_threads (and the per-thread io_channels lists). */
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Optional hook invoked once for each newly created thread; set by spdk_thread_lib_init(). */
static spdk_new_thread_fn g_new_thread_fn = NULL;
58 
/*
 * Internal bookkeeping for a registered I/O device
 * (see spdk_io_device_register / spdk_io_device_unregister).
 */
struct io_device {
	void				*io_device;	/* caller-supplied unique key */
	char				*name;		/* heap-allocated; freed with the device */
	spdk_io_channel_create_cb	create_cb;	/* initializes per-channel context */
	spdk_io_channel_destroy_cb	destroy_cb;	/* tears down per-channel context */
	spdk_io_device_unregister_cb	unregister_cb;	/* optional; runs on unregister_thread */
	struct spdk_thread		*unregister_thread;
	uint32_t			ctx_size;	/* bytes allocated after each spdk_io_channel */
	uint32_t			for_each_count;	/* outstanding spdk_for_each_channel walks */
	TAILQ_ENTRY(io_device)		tailq;

	uint32_t			refcnt;		/* number of io_channels referencing this device */

	bool				unregistered;	/* set once unregistration has been requested */
};

/* Global list of all registered io_devices; guarded by g_devlist_mutex. */
static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
76 
/* One message queued to a thread's ring by spdk_thread_send_msg(). */
struct spdk_msg {
	spdk_msg_fn		fn;	/* function to run on the target thread */
	void			*arg;	/* user context passed to fn */

	SLIST_ENTRY(spdk_msg)	link;	/* linkage for the per-thread msg_cache */
};

#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
/* Shared pool backing spdk_msg allocations; created in spdk_thread_lib_init(). */
static struct spdk_mempool *g_spdk_msg_mempool = NULL;
86 
enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	uint64_t			period_ticks;	/* 0 for an active (run-every-poll) poller */
	uint64_t			next_run_tick;	/* absolute deadline for timed pollers */
	spdk_poller_fn			fn;
	void				*arg;
};
109 
struct spdk_thread {
	/* Channels opened on this thread; list membership guarded by g_devlist_mutex. */
	TAILQ_HEAD(, spdk_io_channel)	io_channels;
	TAILQ_ENTRY(spdk_thread)	tailq;
	char				*name;

	/*
	 * Contains pollers actively running on this thread.  Pollers
	 *  are run round-robin. The thread takes one poller from the head
	 *  of the ring, executes it, then puts it back at the tail of
	 *  the ring.
	 */
	TAILQ_HEAD(, spdk_poller)	active_pollers;

	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers;

	/* Incoming message queue (multi-producer, single-consumer ring). */
	struct spdk_ring		*messages;

	/* Local cache of spdk_msg objects, avoiding mempool round trips on the hot path. */
	SLIST_HEAD(, spdk_msg)		msg_cache;
	size_t				msg_cache_count;
};

/* All live threads; guarded by g_devlist_mutex. */
static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

/* The spdk_thread currently bound to the calling OS thread, if any. */
static __thread struct spdk_thread *tls_thread = NULL;
138 
/* Return the spdk_thread bound to the calling OS thread, or NULL if none. */
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}
144 
/* Set the OS-level name of the calling thread (platform-specific mechanism). */
static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
#error missing platform support for thread name
#endif
}
156 
157 int
158 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn)
159 {
160 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
161 
162 	assert(g_new_thread_fn == NULL);
163 	g_new_thread_fn = new_thread_fn;
164 
165 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
166 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
167 			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
168 			     sizeof(struct spdk_msg),
169 			     0, /* No cache. We do our own. */
170 			     SPDK_ENV_SOCKET_ID_ANY);
171 
172 	if (!g_spdk_msg_mempool) {
173 		return -1;
174 	}
175 
176 	return 0;
177 }
178 
179 void
180 spdk_thread_lib_fini(void)
181 {
182 	struct io_device *dev;
183 
184 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
185 		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
186 	}
187 
188 	if (g_spdk_msg_mempool) {
189 		spdk_mempool_free(g_spdk_msg_mempool);
190 	}
191 }
192 
193 struct spdk_thread *
194 spdk_thread_create(const char *name)
195 {
196 	struct spdk_thread *thread;
197 	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
198 	int rc, i;
199 
200 	thread = calloc(1, sizeof(*thread));
201 	if (!thread) {
202 		SPDK_ERRLOG("Unable to allocate memory for thread\n");
203 		return NULL;
204 	}
205 
206 	TAILQ_INIT(&thread->io_channels);
207 	TAILQ_INIT(&thread->active_pollers);
208 	TAILQ_INIT(&thread->timer_pollers);
209 	SLIST_INIT(&thread->msg_cache);
210 	thread->msg_cache_count = 0;
211 
212 	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
213 	if (!thread->messages) {
214 		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
215 		free(thread);
216 		return NULL;
217 	}
218 
219 	/* Fill the local message pool cache. */
220 	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
221 	if (rc == 0) {
222 		/* If we can't populate the cache it's ok. The cache will get filled
223 		 * up organically as messages are passed to the thread. */
224 		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
225 			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
226 			thread->msg_cache_count++;
227 		}
228 	}
229 
230 	if (name) {
231 		_set_thread_name(name);
232 		thread->name = strdup(name);
233 	} else {
234 		thread->name = spdk_sprintf_alloc("%p", thread);
235 	}
236 
237 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name);
238 
239 	pthread_mutex_lock(&g_devlist_mutex);
240 	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
241 	g_thread_count++;
242 	pthread_mutex_unlock(&g_devlist_mutex);
243 
244 	if (g_new_thread_fn) {
245 		g_new_thread_fn(thread);
246 	}
247 
248 	return thread;
249 }
250 
/* Bind 'thread' as the calling OS thread's current SPDK thread (NULL to clear). */
void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}
256 
257 void
258 spdk_thread_exit(struct spdk_thread *thread)
259 {
260 	struct spdk_io_channel *ch;
261 	struct spdk_msg *msg;
262 
263 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name);
264 
265 	if (tls_thread == thread) {
266 		tls_thread = NULL;
267 	}
268 
269 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
270 		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
271 			    thread->name, ch->dev->name);
272 	}
273 
274 	pthread_mutex_lock(&g_devlist_mutex);
275 	assert(g_thread_count > 0);
276 	g_thread_count--;
277 	TAILQ_REMOVE(&g_threads, thread, tailq);
278 	pthread_mutex_unlock(&g_devlist_mutex);
279 
280 	free(thread->name);
281 
282 	msg = SLIST_FIRST(&thread->msg_cache);
283 	while (msg != NULL) {
284 		SLIST_REMOVE_HEAD(&thread->msg_cache, link);
285 
286 		assert(thread->msg_cache_count > 0);
287 		thread->msg_cache_count--;
288 		spdk_mempool_put(g_spdk_msg_mempool, msg);
289 
290 		msg = SLIST_FIRST(&thread->msg_cache);
291 	}
292 
293 	assert(thread->msg_cache_count == 0);
294 
295 	if (thread->messages) {
296 		spdk_ring_free(thread->messages);
297 	}
298 
299 	free(thread);
300 }
301 
302 static inline uint32_t
303 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
304 {
305 	unsigned count, i;
306 	void *messages[SPDK_MSG_BATCH_SIZE];
307 
308 #ifdef DEBUG
309 	/*
310 	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
311 	 * so we will never actually read uninitialized data from events, but just to be sure
312 	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
313 	 */
314 	memset(messages, 0, sizeof(messages));
315 #endif
316 
317 	if (max_msgs > 0) {
318 		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
319 	} else {
320 		max_msgs = SPDK_MSG_BATCH_SIZE;
321 	}
322 
323 	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
324 	if (count == 0) {
325 		return 0;
326 	}
327 
328 	for (i = 0; i < count; i++) {
329 		struct spdk_msg *msg = messages[i];
330 
331 		assert(msg != NULL);
332 		msg->fn(msg->arg);
333 
334 		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
335 			/* Insert the messages at the head. We want to re-use the hot
336 			 * ones. */
337 			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
338 			thread->msg_cache_count++;
339 		} else {
340 			spdk_mempool_put(g_spdk_msg_mempool, msg);
341 		}
342 	}
343 
344 	return count;
345 }
346 
347 static void
348 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
349 {
350 	struct spdk_poller *iter;
351 
352 	poller->next_run_tick = now + poller->period_ticks;
353 
354 	/*
355 	 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled
356 	 * run time.
357 	 */
358 	TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) {
359 		if (iter->next_run_tick <= poller->next_run_tick) {
360 			TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq);
361 			return;
362 		}
363 	}
364 
365 	/* No earlier pollers were found, so this poller must be the new head */
366 	TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
367 }
368 
/*
 * Perform one iteration of the given thread's event loop:
 *  - drain up to max_msgs queued messages (0 = default batch size),
 *  - run the active poller at the head of the round-robin list,
 *  - run the earliest timed poller if its deadline has passed.
 *
 * Returns the largest status observed: 1 if any message ran, a poller's
 * positive return value if greater, otherwise 0 (no work done).
 */
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs)
{
	uint32_t msg_count;
	struct spdk_thread *orig_thread;
	struct spdk_poller *poller;
	int rc = 0;

	/* Temporarily make 'thread' current so message and poller callbacks see
	 * the right spdk_get_thread(); restored before returning. */
	orig_thread = _get_thread();
	tls_thread = thread;

	msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	/* Active pollers run round-robin: take the head, run it, re-queue at the tail. */
	poller = TAILQ_FIRST(&thread->active_pollers);
	if (poller) {
		int poller_rc;

		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			/* The poller unregistered itself from within its own fn. */
			free(poller);
		} else {
			poller->state = SPDK_POLLER_STATE_WAITING;
			TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		}

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
		}
#endif

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	/* timer_pollers is sorted by next_run_tick, so only the head can be due. */
	poller = TAILQ_FIRST(&thread->timer_pollers);
	if (poller) {
		uint64_t now = spdk_get_ticks();

		if (now >= poller->next_run_tick) {
			int timer_rc = 0;

			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_RUNNING;
			timer_rc = poller->fn(poller->arg);
			if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
				/* Unregistered from within its own fn. */
				free(poller);
			} else {
				poller->state = SPDK_POLLER_STATE_WAITING;
				/* Re-arm relative to 'now', keeping the list sorted. */
				_spdk_poller_insert_timer(thread, poller, now);
			}

#ifdef DEBUG
			if (timer_rc == -1) {
				SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller);
			}
#endif

			if (timer_rc > rc) {
				rc = timer_rc;

			}
		}
	}

	tls_thread = orig_thread;

	return rc;
}
444 
445 uint64_t
446 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
447 {
448 	struct spdk_poller *poller;
449 
450 	poller = TAILQ_FIRST(&thread->timer_pollers);
451 	if (poller) {
452 		return poller->next_run_tick;
453 	}
454 
455 	return 0;
456 }
457 
458 int
459 spdk_thread_has_active_pollers(struct spdk_thread *thread)
460 {
461 	return !TAILQ_EMPTY(&thread->active_pollers);
462 }
463 
464 uint32_t
465 spdk_thread_get_count(void)
466 {
467 	/*
468 	 * Return cached value of the current thread count.  We could acquire the
469 	 *  lock and iterate through the TAILQ of threads to count them, but that
470 	 *  count could still be invalidated after we release the lock.
471 	 */
472 	return g_thread_count;
473 }
474 
475 struct spdk_thread *
476 spdk_get_thread(void)
477 {
478 	struct spdk_thread *thread;
479 
480 	thread = _get_thread();
481 	if (!thread) {
482 		SPDK_ERRLOG("No thread allocated\n");
483 	}
484 
485 	return thread;
486 }
487 
/* Return the thread's name; the string is owned by the thread and freed in spdk_thread_exit(). */
const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}
493 
494 void
495 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
496 {
497 	struct spdk_thread *local_thread;
498 	struct spdk_msg *msg;
499 	int rc;
500 
501 	if (!thread) {
502 		assert(false);
503 		return;
504 	}
505 
506 	local_thread = _get_thread();
507 
508 	msg = NULL;
509 	if (local_thread != NULL) {
510 		if (local_thread->msg_cache_count > 0) {
511 			msg = SLIST_FIRST(&local_thread->msg_cache);
512 			assert(msg != NULL);
513 			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
514 			local_thread->msg_cache_count--;
515 		}
516 	}
517 
518 	if (msg == NULL) {
519 		msg = spdk_mempool_get(g_spdk_msg_mempool);
520 		if (!msg) {
521 			assert(false);
522 			return;
523 		}
524 	}
525 
526 	msg->fn = fn;
527 	msg->arg = ctx;
528 
529 	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1);
530 	if (rc != 1) {
531 		assert(false);
532 		spdk_mempool_put(g_spdk_msg_mempool, msg);
533 		return;
534 	}
535 }
536 
537 struct spdk_poller *
538 spdk_poller_register(spdk_poller_fn fn,
539 		     void *arg,
540 		     uint64_t period_microseconds)
541 {
542 	struct spdk_thread *thread;
543 	struct spdk_poller *poller;
544 	uint64_t quotient, remainder, ticks;
545 
546 	thread = spdk_get_thread();
547 	if (!thread) {
548 		assert(false);
549 		return NULL;
550 	}
551 
552 	poller = calloc(1, sizeof(*poller));
553 	if (poller == NULL) {
554 		SPDK_ERRLOG("Poller memory allocation failed\n");
555 		return NULL;
556 	}
557 
558 	poller->state = SPDK_POLLER_STATE_WAITING;
559 	poller->fn = fn;
560 	poller->arg = arg;
561 
562 	if (period_microseconds) {
563 		quotient = period_microseconds / SPDK_SEC_TO_USEC;
564 		remainder = period_microseconds % SPDK_SEC_TO_USEC;
565 		ticks = spdk_get_ticks_hz();
566 
567 		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
568 	} else {
569 		poller->period_ticks = 0;
570 	}
571 
572 	if (poller->period_ticks) {
573 		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
574 	} else {
575 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
576 	}
577 
578 	return poller;
579 }
580 
/*
 * Unregister *ppoller and clear the caller's pointer.  Safe to call from
 * within the poller's own callback: in that case the poller is only marked
 * SPDK_POLLER_STATE_UNREGISTERED and spdk_thread_poll() frees it after the
 * callback returns.  Must be called on the poller's owning thread.
 */
void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	/* Clear the caller's handle first so it cannot be reused. */
	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->state == SPDK_POLLER_STATE_RUNNING) {
		/*
		 * We are being called from the poller_fn, so set the state to unregistered
		 * and let the thread poll loop free the poller.
		 */
		poller->state = SPDK_POLLER_STATE_UNREGISTERED;
	} else {
		/* Poller is not running currently, so just free it. */
		if (poller->period_ticks) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		} else {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		}

		free(poller);
	}
}
617 
/* Iteration state for spdk_for_each_thread(): one hop per thread in g_threads. */
struct call_thread {
	struct spdk_thread *cur_thread;	/* thread currently executing fn */
	spdk_msg_fn fn;			/* per-thread callback */
	void *ctx;			/* user context passed to fn and cpl */

	struct spdk_thread *orig_thread;	/* thread that started the iteration */
	spdk_msg_fn cpl;		/* completion callback, run on orig_thread */
};
626 
/*
 * Message trampoline for spdk_for_each_thread(): run ct->fn on the current
 * thread, then hop to the next thread in g_threads, or — once the list is
 * exhausted — send the completion back to the originating thread and free
 * the iteration context.
 */
static void
spdk_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;

	ct->fn(ct->ctx);

	/* Advance under the lock; the thread list may change concurrently. */
	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");

		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
	}
}
650 
651 void
652 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
653 {
654 	struct call_thread *ct;
655 	struct spdk_thread *thread;
656 
657 	ct = calloc(1, sizeof(*ct));
658 	if (!ct) {
659 		SPDK_ERRLOG("Unable to perform thread iteration\n");
660 		cpl(ctx);
661 		return;
662 	}
663 
664 	ct->fn = fn;
665 	ct->ctx = ctx;
666 	ct->cpl = cpl;
667 
668 	pthread_mutex_lock(&g_devlist_mutex);
669 	thread = _get_thread();
670 	if (!thread) {
671 		SPDK_ERRLOG("No thread allocated\n");
672 		free(ct);
673 		cpl(ctx);
674 		return;
675 	}
676 	ct->orig_thread = thread;
677 	ct->cur_thread = TAILQ_FIRST(&g_threads);
678 	pthread_mutex_unlock(&g_devlist_mutex);
679 
680 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
681 		      ct->orig_thread->name);
682 
683 	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
684 }
685 
686 void
687 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
688 			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
689 			const char *name)
690 {
691 	struct io_device *dev, *tmp;
692 	struct spdk_thread *thread;
693 
694 	assert(io_device != NULL);
695 	assert(create_cb != NULL);
696 	assert(destroy_cb != NULL);
697 
698 	thread = spdk_get_thread();
699 	if (!thread) {
700 		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
701 		assert(false);
702 		return;
703 	}
704 
705 	dev = calloc(1, sizeof(struct io_device));
706 	if (dev == NULL) {
707 		SPDK_ERRLOG("could not allocate io_device\n");
708 		return;
709 	}
710 
711 	dev->io_device = io_device;
712 	if (name) {
713 		dev->name = strdup(name);
714 	} else {
715 		dev->name = spdk_sprintf_alloc("%p", dev);
716 	}
717 	dev->create_cb = create_cb;
718 	dev->destroy_cb = destroy_cb;
719 	dev->unregister_cb = NULL;
720 	dev->ctx_size = ctx_size;
721 	dev->for_each_count = 0;
722 	dev->unregistered = false;
723 	dev->refcnt = 0;
724 
725 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
726 		      dev->name, dev->io_device, thread->name);
727 
728 	pthread_mutex_lock(&g_devlist_mutex);
729 	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
730 		if (tmp->io_device == io_device) {
731 			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
732 				    io_device, tmp->name, dev->name);
733 			free(dev->name);
734 			free(dev);
735 			pthread_mutex_unlock(&g_devlist_mutex);
736 			return;
737 		}
738 	}
739 	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
740 	pthread_mutex_unlock(&g_devlist_mutex);
741 }
742 
/*
 * Runs (as a message) on the thread that called spdk_io_device_unregister()
 * once the last channel reference is gone: invoke the user's unregister_cb
 * and release the device.
 */
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev->name);
	free(dev);
}
755 
756 static void
757 _spdk_io_device_free(struct io_device *dev)
758 {
759 	if (dev->unregister_cb == NULL) {
760 		free(dev->name);
761 		free(dev);
762 	} else {
763 		assert(dev->unregister_thread != NULL);
764 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
765 			      dev->name, dev->io_device, dev->unregister_thread->name);
766 		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
767 	}
768 }
769 
/*
 * Begin unregistering an io_device.  The device is removed from the global
 * list immediately, but freeing it (and invoking the optional unregister_cb
 * on this thread) is deferred until the last channel reference is released.
 * Must be called from an SPDK thread; refuses while spdk_for_each_channel()
 * walks on this device are outstanding.
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* Mark the device unregistered and snapshot the refcount under the lock;
	 * channel releases after this point will see 'unregistered' and free the
	 * device when the count hits zero. */
	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	_spdk_io_device_free(dev);
}
822 
/*
 * Get (or create) the calling SPDK thread's I/O channel for io_device.
 * An existing channel just gets its refcount bumped; otherwise a new channel
 * plus dev->ctx_size bytes of per-channel context is allocated and
 * initialized via dev->create_cb.  Returns NULL if the device is unknown,
 * the caller is not an SPDK thread, allocation fails, or create_cb fails.
 */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 *  thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* The per-channel context lives directly after the channel struct. */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	/* Drop the lock before calling into user code. */
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		/* Initialization failed; undo the registration performed above. */
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
899 
/*
 * Message handler that performs the actual channel destruction on the
 * channel's owning thread.  If new references were taken after the destroy
 * message was queued, destruction is abandoned.  Also frees the io_device
 * itself when this was the last channel of an already-unregistered device.
 */
static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 *  after this message was sent but before it had a chance to
		 *  execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	/* Free the device only once it is both unregistered and unreferenced. */
	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}
956 
957 void
958 spdk_put_io_channel(struct spdk_io_channel *ch)
959 {
960 	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
961 		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
962 		      ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref);
963 
964 	ch->ref--;
965 
966 	if (ch->ref == 0) {
967 		ch->destroy_ref++;
968 		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
969 	}
970 }
971 
972 struct spdk_io_channel *
973 spdk_io_channel_from_ctx(void *ctx)
974 {
975 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
976 }
977 
/* Return the thread on which this channel was created. */
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}
983 
/* State for one spdk_for_each_channel() iteration. */
struct spdk_io_channel_iter {
	void *io_device;		/* key whose channels are being visited */
	struct io_device *dev;		/* resolved device; holds a for_each_count */
	spdk_channel_msg fn;		/* per-channel callback */
	int status;			/* last status passed to _continue() */
	void *ctx;			/* user context */
	struct spdk_io_channel *ch;	/* channel on the thread currently being visited */

	struct spdk_thread *cur_thread;	/* thread currently being visited */

	struct spdk_thread *orig_thread;	/* thread that started the iteration */
	spdk_channel_for_each_cpl cpl;	/* completion callback, run on orig_thread */
};
997 
/* Accessors for use inside spdk_for_each_channel() callbacks. */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}
1015 
1016 static void
1017 _call_completion(void *ctx)
1018 {
1019 	struct spdk_io_channel_iter *i = ctx;
1020 
1021 	if (i->cpl != NULL) {
1022 		i->cpl(i, i->status);
1023 	}
1024 	free(i);
1025 }
1026 
/*
 * Message trampoline for spdk_for_each_channel(): confirm the target thread
 * still has a channel for the io_device, then run i->fn; otherwise skip
 * straight to advancing the iteration.
 */
static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 *  message had a chance to execute.  If so, skip calling
	 *  the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}
1052 
/*
 * Start an iteration that runs fn(i) once on every thread that currently has
 * a channel for io_device.  Each fn must call spdk_for_each_channel_continue()
 * to advance the walk; cpl runs on the originating thread when it finishes.
 * dev->for_each_count is bumped to block device unregistration while the
 * walk is in flight.  If no channel exists, cpl is dispatched immediately.
 */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread holding a channel for this device and start there. */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist for this device; complete right away. */
	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
1093 
/*
 * Advance an spdk_for_each_channel() iteration.  Must be called on the
 * thread whose channel was just processed.  A non-zero status aborts the
 * walk.  When no further channels exist (or on abort), the device's
 * for_each_count is dropped and the completion is dispatched back to the
 * originating thread.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		/* Non-zero status aborts the iteration. */
		goto end;
	}
	/* Look for the next thread (after the current one) that has a channel
	 * for this device. */
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	/* Iteration finished (or aborted): allow unregistration again. */
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
1129 
1130 
1131 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)
1132