xref: /spdk/lib/thread/thread.c (revision 9b32f4858c035134b5680b27678716d63370c678)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/likely.h"
38 #include "spdk/queue.h"
39 #include "spdk/string.h"
40 #include "spdk/thread.h"
41 #include "spdk/util.h"
42 
43 #include "spdk/log.h"
44 #include "spdk_internal/thread.h"
45 
46 #define SPDK_MSG_BATCH_SIZE		8
47 #define SPDK_MAX_DEVICE_NAME_LEN	256
48 #define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
49 
50 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;
51 
52 static spdk_new_thread_fn g_new_thread_fn = NULL;
53 static spdk_thread_op_fn g_thread_op_fn = NULL;
54 static spdk_thread_op_supported_fn g_thread_op_supported_fn;
55 static size_t g_ctx_sz = 0;
56 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the
57  * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting
58  * SPDK application is required.
59  */
60 static uint64_t g_thread_id = 1;
61 
/* Bookkeeping for one registered I/O device.  Lives on g_io_devices from
 * spdk_io_device_register() until its unregistration completes. */
struct io_device {
	void				*io_device;	/* Caller's opaque key; unique within g_io_devices. */
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb	create_cb;	/* Builds per-thread channel ctx. */
	spdk_io_channel_destroy_cb	destroy_cb;	/* Tears down per-thread channel ctx. */
	spdk_io_device_unregister_cb	unregister_cb;	/* Invoked when unregistration finishes. */
	struct spdk_thread		*unregister_thread;	/* Thread that requested unregistration. */
	uint32_t			ctx_size;	/* Bytes of per-channel context after spdk_io_channel. */
	uint32_t			for_each_count;	/* In-flight spdk_for_each_channel iterations. */
	TAILQ_ENTRY(io_device)		tailq;

	uint32_t			refcnt;		/* Outstanding channel references. */

	bool				unregistered;	/* Set once unregistration has started. */
};
77 
78 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
79 
/* A single cross-thread message: a function pointer plus its argument.
 * Allocated from g_spdk_msg_mempool (or a thread's local msg_cache). */
struct spdk_msg {
	spdk_msg_fn		fn;	/* Callback executed on the target thread. */
	void			*arg;	/* Opaque argument passed to fn. */

	SLIST_ENTRY(spdk_msg)	link;	/* Linkage for the per-thread msg_cache. */
};
86 
87 #define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
88 static struct spdk_mempool *g_spdk_msg_mempool = NULL;
89 
90 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
91 static uint32_t g_thread_count = 0;
92 
93 static __thread struct spdk_thread *tls_thread = NULL;
94 
/* Return the SPDK thread bound to the calling OS thread via TLS, or NULL
 * if spdk_set_thread()/spdk_thread_poll() has not installed one. */
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}
100 
101 static int
102 _thread_lib_init(size_t ctx_sz)
103 {
104 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
105 
106 	g_ctx_sz = ctx_sz;
107 
108 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
109 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
110 			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
111 			     sizeof(struct spdk_msg),
112 			     0, /* No cache. We do our own. */
113 			     SPDK_ENV_SOCKET_ID_ANY);
114 
115 	if (!g_spdk_msg_mempool) {
116 		return -1;
117 	}
118 
119 	return 0;
120 }
121 
122 int
123 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
124 {
125 	assert(g_new_thread_fn == NULL);
126 	assert(g_thread_op_fn == NULL);
127 
128 	if (new_thread_fn == NULL) {
129 		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
130 	} else {
131 		g_new_thread_fn = new_thread_fn;
132 	}
133 
134 	return _thread_lib_init(ctx_sz);
135 }
136 
137 int
138 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
139 			 spdk_thread_op_supported_fn thread_op_supported_fn,
140 			 size_t ctx_sz)
141 {
142 	assert(g_new_thread_fn == NULL);
143 	assert(g_thread_op_fn == NULL);
144 	assert(g_thread_op_supported_fn == NULL);
145 
146 	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
147 		SPDK_ERRLOG("Both must be defined or undefined together.\n");
148 		return -EINVAL;
149 	}
150 
151 	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
152 		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
153 	} else {
154 		g_thread_op_fn = thread_op_fn;
155 		g_thread_op_supported_fn = thread_op_supported_fn;
156 	}
157 
158 	return _thread_lib_init(ctx_sz);
159 }
160 
161 void
162 spdk_thread_lib_fini(void)
163 {
164 	struct io_device *dev;
165 
166 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
167 		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
168 	}
169 
170 	if (g_spdk_msg_mempool) {
171 		spdk_mempool_free(g_spdk_msg_mempool);
172 		g_spdk_msg_mempool = NULL;
173 	}
174 
175 	g_new_thread_fn = NULL;
176 	g_thread_op_fn = NULL;
177 	g_thread_op_supported_fn = NULL;
178 	g_ctx_sz = 0;
179 }
180 
181 static void
182 _free_thread(struct spdk_thread *thread)
183 {
184 	struct spdk_io_channel *ch;
185 	struct spdk_msg *msg;
186 	struct spdk_poller *poller, *ptmp;
187 
188 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
189 		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
190 			    thread->name, ch->dev->name);
191 	}
192 
193 	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
194 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
195 			SPDK_WARNLOG("poller %s still registered at thread exit\n",
196 				     poller->name);
197 		}
198 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
199 		free(poller);
200 	}
201 
202 	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) {
203 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
204 			SPDK_WARNLOG("poller %s still registered at thread exit\n",
205 				     poller->name);
206 		}
207 		TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
208 		free(poller);
209 	}
210 
211 	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
212 		SPDK_WARNLOG("poller %s still registered at thread exit\n", poller->name);
213 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
214 		free(poller);
215 	}
216 
217 	pthread_mutex_lock(&g_devlist_mutex);
218 	assert(g_thread_count > 0);
219 	g_thread_count--;
220 	TAILQ_REMOVE(&g_threads, thread, tailq);
221 	pthread_mutex_unlock(&g_devlist_mutex);
222 
223 	msg = SLIST_FIRST(&thread->msg_cache);
224 	while (msg != NULL) {
225 		SLIST_REMOVE_HEAD(&thread->msg_cache, link);
226 
227 		assert(thread->msg_cache_count > 0);
228 		thread->msg_cache_count--;
229 		spdk_mempool_put(g_spdk_msg_mempool, msg);
230 
231 		msg = SLIST_FIRST(&thread->msg_cache);
232 	}
233 
234 	assert(thread->msg_cache_count == 0);
235 
236 	spdk_ring_free(thread->messages);
237 	free(thread);
238 }
239 
240 struct spdk_thread *
241 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
242 {
243 	struct spdk_thread *thread;
244 	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
245 	int rc = 0, i;
246 
247 	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
248 	if (!thread) {
249 		SPDK_ERRLOG("Unable to allocate memory for thread\n");
250 		return NULL;
251 	}
252 
253 	if (cpumask) {
254 		spdk_cpuset_copy(&thread->cpumask, cpumask);
255 	} else {
256 		spdk_cpuset_negate(&thread->cpumask);
257 	}
258 
259 	TAILQ_INIT(&thread->io_channels);
260 	TAILQ_INIT(&thread->active_pollers);
261 	TAILQ_INIT(&thread->timed_pollers);
262 	TAILQ_INIT(&thread->paused_pollers);
263 	SLIST_INIT(&thread->msg_cache);
264 	thread->msg_cache_count = 0;
265 
266 	thread->tsc_last = spdk_get_ticks();
267 
268 	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
269 	if (!thread->messages) {
270 		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
271 		free(thread);
272 		return NULL;
273 	}
274 
275 	/* Fill the local message pool cache. */
276 	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
277 	if (rc == 0) {
278 		/* If we can't populate the cache it's ok. The cache will get filled
279 		 * up organically as messages are passed to the thread. */
280 		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
281 			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
282 			thread->msg_cache_count++;
283 		}
284 	}
285 
286 	if (name) {
287 		snprintf(thread->name, sizeof(thread->name), "%s", name);
288 	} else {
289 		snprintf(thread->name, sizeof(thread->name), "%p", thread);
290 	}
291 
292 	pthread_mutex_lock(&g_devlist_mutex);
293 	if (g_thread_id == 0) {
294 		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
295 		pthread_mutex_unlock(&g_devlist_mutex);
296 		_free_thread(thread);
297 		return NULL;
298 	}
299 	thread->id = g_thread_id++;
300 	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
301 	g_thread_count++;
302 	pthread_mutex_unlock(&g_devlist_mutex);
303 
304 	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
305 		      thread->id, thread->name);
306 
307 	if (g_new_thread_fn) {
308 		rc = g_new_thread_fn(thread);
309 	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
310 		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
311 	}
312 
313 	if (rc != 0) {
314 		_free_thread(thread);
315 		return NULL;
316 	}
317 
318 	thread->state = SPDK_THREAD_STATE_RUNNING;
319 
320 	return thread;
321 }
322 
/* Bind the given SPDK thread to the calling OS thread's TLS slot.  Pass
 * NULL to clear the binding.  Subsequent spdk_get_thread() calls on this
 * OS thread return this pointer. */
void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}
328 
329 static void
330 thread_exit(struct spdk_thread *thread, uint64_t now)
331 {
332 	struct spdk_poller *poller;
333 	struct spdk_io_channel *ch;
334 
335 	if (now >= thread->exit_timeout_tsc) {
336 		SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n",
337 			    thread->name);
338 		goto exited;
339 	}
340 
341 	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
342 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
343 			SPDK_INFOLOG(thread,
344 				     "thread %s still has active poller %s\n",
345 				     thread->name, poller->name);
346 			return;
347 		}
348 	}
349 
350 	TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) {
351 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
352 			SPDK_INFOLOG(thread,
353 				     "thread %s still has active timed poller %s\n",
354 				     thread->name, poller->name);
355 			return;
356 		}
357 	}
358 
359 	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
360 		SPDK_INFOLOG(thread,
361 			     "thread %s still has paused poller %s\n",
362 			     thread->name, poller->name);
363 		return;
364 	}
365 
366 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
367 		SPDK_INFOLOG(thread,
368 			     "thread %s still has channel for io_device %s\n",
369 			     thread->name, ch->dev->name);
370 		return;
371 	}
372 
373 exited:
374 	thread->state = SPDK_THREAD_STATE_EXITED;
375 }
376 
377 int
378 spdk_thread_exit(struct spdk_thread *thread)
379 {
380 	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);
381 
382 	assert(tls_thread == thread);
383 
384 	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
385 		SPDK_INFOLOG(thread,
386 			     "thread %s is already exiting\n",
387 			     thread->name);
388 		return 0;
389 	}
390 
391 	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
392 				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
393 	thread->state = SPDK_THREAD_STATE_EXITING;
394 	return 0;
395 }
396 
/* Return true once the thread has fully reached the EXITED state (i.e. it is
 * safe to call spdk_thread_destroy() on it). */
bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_EXITED;
}
402 
/* Free an EXITED thread.  Clears the TLS binding first if the thread being
 * destroyed is the one bound to the calling OS thread. */
void
spdk_thread_destroy(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);

	assert(thread->state == SPDK_THREAD_STATE_EXITED);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}
416 
417 void *
418 spdk_thread_get_ctx(struct spdk_thread *thread)
419 {
420 	if (g_ctx_sz > 0) {
421 		return thread->ctx;
422 	}
423 
424 	return NULL;
425 }
426 
/* Return a pointer to the thread's CPU affinity mask (owned by the thread;
 * valid for the thread's lifetime). */
struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}
432 
433 int
434 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
435 {
436 	struct spdk_thread *thread;
437 
438 	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
439 		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
440 		assert(false);
441 		return -ENOTSUP;
442 	}
443 
444 	thread = spdk_get_thread();
445 	if (!thread) {
446 		SPDK_ERRLOG("Called from non-SPDK thread\n");
447 		assert(false);
448 		return -EINVAL;
449 	}
450 
451 	spdk_cpuset_copy(&thread->cpumask, cpumask);
452 
453 	/* Invoke framework's reschedule operation. If this function is called multiple times
454 	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
455 	 * reschedule operation.
456 	 */
457 	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
458 
459 	return 0;
460 }
461 
462 struct spdk_thread *
463 spdk_thread_get_from_ctx(void *ctx)
464 {
465 	if (ctx == NULL) {
466 		assert(false);
467 		return NULL;
468 	}
469 
470 	assert(g_ctx_sz > 0);
471 
472 	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
473 }
474 
475 static inline uint32_t
476 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
477 {
478 	unsigned count, i;
479 	void *messages[SPDK_MSG_BATCH_SIZE];
480 
481 #ifdef DEBUG
482 	/*
483 	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
484 	 * so we will never actually read uninitialized data from events, but just to be sure
485 	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
486 	 */
487 	memset(messages, 0, sizeof(messages));
488 #endif
489 
490 	if (max_msgs > 0) {
491 		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
492 	} else {
493 		max_msgs = SPDK_MSG_BATCH_SIZE;
494 	}
495 
496 	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
497 	if (count == 0) {
498 		return 0;
499 	}
500 
501 	for (i = 0; i < count; i++) {
502 		struct spdk_msg *msg = messages[i];
503 
504 		assert(msg != NULL);
505 		msg->fn(msg->arg);
506 
507 		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
508 			/* Insert the messages at the head. We want to re-use the hot
509 			 * ones. */
510 			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
511 			thread->msg_cache_count++;
512 		} else {
513 			spdk_mempool_put(g_spdk_msg_mempool, msg);
514 		}
515 	}
516 
517 	return count;
518 }
519 
/* Schedule a periodic poller's next expiration at now + period_ticks and
 * insert it into the thread's timed_pollers list, which is kept sorted in
 * ascending next_run_tick order so thread_poll() can stop scanning at the
 * first non-expired entry. */
static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled
	 * run time.
	 */
	/* Walk backwards: new entries usually expire latest, so the matching
	 * position is typically found near the tail. */
	TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq);
}
541 
542 static void
543 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
544 {
545 	if (poller->period_ticks) {
546 		poller_insert_timer(thread, poller, spdk_get_ticks());
547 	} else {
548 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
549 	}
550 }
551 
552 static inline void
553 thread_update_stats(struct spdk_thread *thread, uint64_t end,
554 		    uint64_t start, int rc)
555 {
556 	if (rc == 0) {
557 		/* Poller status idle */
558 		thread->stats.idle_tsc += end - start;
559 	} else if (rc > 0) {
560 		/* Poller status busy */
561 		thread->stats.busy_tsc += end - start;
562 	}
563 	/* Store end time to use it as start time of the next spdk_thread_poll(). */
564 	thread->tsc_last = end;
565 }
566 
/* One polling pass over a thread: run the pending critical message (if any),
 * drain up to max_msgs queued messages, then run every active poller and all
 * expired timed pollers.  Poller state transitions (unregister/pause) are
 * processed both before and after each callback because the callback itself
 * may change the poller's state.  Returns the maximum positive poller return
 * value (busy), 0 if everything was idle, and never surfaces negative poller
 * return codes. */
static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	thread->tsc_last = now;

	/* The critical message slot is written by other threads; clear it
	 * only after running it. */
	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	/* Reverse iteration so pollers registered from within a callback
	 * (appended at the tail) are not run until the next pass. */
	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			/* Deferred free requested by spdk_poller_unregister(). */
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			/* Deferred pause requested by spdk_poller_pause(). */
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

		poller->run_count++;
		if (poller_rc > 0) {
			poller->busy_count++;
		}

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
		}
#endif

		/* The callback may have unregistered or paused its own poller. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
		}

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		/* The list is sorted by next_run_tick, so nothing after this
		 * entry has expired either. */
		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

		poller->run_count++;
		if (timer_rc > 0) {
			poller->busy_count++;
		}

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
		}
#endif

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
			/* Re-insert in sorted position for its next expiration. */
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			poller_insert_timer(thread, poller, now);
		}

		if (timer_rc > rc) {
			rc = timer_rc;
		}
	}

	return rc;
}
677 
678 int
679 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
680 {
681 	struct spdk_thread *orig_thread;
682 	int rc;
683 
684 	orig_thread = _get_thread();
685 	tls_thread = thread;
686 
687 	if (now == 0) {
688 		now = spdk_get_ticks();
689 	}
690 
691 	rc = thread_poll(thread, max_msgs, now);
692 
693 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
694 		thread_exit(thread, now);
695 	}
696 
697 	thread_update_stats(thread, spdk_get_ticks(), now, rc);
698 
699 	tls_thread = orig_thread;
700 
701 	return rc;
702 }
703 
704 uint64_t
705 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
706 {
707 	struct spdk_poller *poller;
708 
709 	poller = TAILQ_FIRST(&thread->timed_pollers);
710 	if (poller) {
711 		return poller->next_run_tick;
712 	}
713 
714 	return 0;
715 }
716 
/* Return non-zero if the thread has any pollers on its active (non-timed)
 * list, regardless of their state. */
int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}
722 
723 static bool
724 thread_has_unpaused_pollers(struct spdk_thread *thread)
725 {
726 	if (TAILQ_EMPTY(&thread->active_pollers) &&
727 	    TAILQ_EMPTY(&thread->timed_pollers)) {
728 		return false;
729 	}
730 
731 	return true;
732 }
733 
734 bool
735 spdk_thread_has_pollers(struct spdk_thread *thread)
736 {
737 	if (!thread_has_unpaused_pollers(thread) &&
738 	    TAILQ_EMPTY(&thread->paused_pollers)) {
739 		return false;
740 	}
741 
742 	return true;
743 }
744 
745 bool
746 spdk_thread_is_idle(struct spdk_thread *thread)
747 {
748 	if (spdk_ring_count(thread->messages) ||
749 	    thread_has_unpaused_pollers(thread) ||
750 	    thread->critical_msg != NULL) {
751 		return false;
752 	}
753 
754 	return true;
755 }
756 
/* Return the number of currently allocated SPDK threads. */
uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count.  We could acquire the
	 *  lock and iterate through the TAILQ of threads to count them, but that
	 *  count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}
767 
/* Return the SPDK thread bound to the calling OS thread, or NULL if the
 * caller is not running in an SPDK thread context. */
struct spdk_thread *
spdk_get_thread(void)
{
	return _get_thread();
}
773 
/* Return the thread's name (set at creation; owned by the thread). */
const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}
779 
/* Return the thread's unique, monotonically assigned ID (starts at 1). */
uint64_t
spdk_thread_get_id(const struct spdk_thread *thread)
{
	return thread->id;
}
785 
786 struct spdk_thread *
787 spdk_thread_get_by_id(uint64_t id)
788 {
789 	struct spdk_thread *thread;
790 
791 	pthread_mutex_lock(&g_devlist_mutex);
792 	TAILQ_FOREACH(thread, &g_threads, tailq) {
793 		if (thread->id == id) {
794 			pthread_mutex_unlock(&g_devlist_mutex);
795 
796 			return thread;
797 		}
798 	}
799 	pthread_mutex_unlock(&g_devlist_mutex);
800 
801 	return NULL;
802 }
803 
804 int
805 spdk_thread_get_stats(struct spdk_thread_stats *stats)
806 {
807 	struct spdk_thread *thread;
808 
809 	thread = _get_thread();
810 	if (!thread) {
811 		SPDK_ERRLOG("No thread allocated\n");
812 		return -EINVAL;
813 	}
814 
815 	if (stats == NULL) {
816 		return -EINVAL;
817 	}
818 
819 	*stats = thread->stats;
820 
821 	return 0;
822 }
823 
/* Return the TSC recorded at the end of the thread's most recent poll.
 * A NULL argument means "the calling thread"; note that passing NULL from a
 * non-SPDK thread would dereference NULL. */
uint64_t
spdk_thread_get_last_tsc(struct spdk_thread *thread)
{
	if (thread == NULL) {
		thread = _get_thread();
	}

	return thread->tsc_last;
}
833 
/* Queue fn(ctx) for execution on the target thread.  The message object is
 * taken from the sender's local cache when available (avoiding the global
 * pool), otherwise from g_spdk_msg_mempool.  Safe to call from any thread;
 * the ring is multi-producer.  Returns 0 on success, -EIO if the target has
 * exited or the ring is full, -ENOMEM if no message can be allocated. */
int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	/* Prefer the sender's local cache; it is only touched from the
	 * sender's own context, so no locking is needed. */
	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		/* Returning to the global pool is correct even for messages
		 * taken from the local cache - they originate from the same
		 * mempool. */
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return 0;
}
880 
/* Set the thread's single critical-message slot (run before normal messages
 * on the next poll).  The CAS ensures only one critical message can be
 * pending at a time; intended to be safe from signal handlers where
 * spdk_thread_send_msg() (which may allocate) is not.  Returns 0 on success,
 * -EIO if a critical message is already pending. */
int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
	spdk_msg_fn expected = NULL;

	if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
					__ATOMIC_SEQ_CST)) {
		return 0;
	}

	return -EIO;
}
893 
894 static struct spdk_poller *
895 poller_register(spdk_poller_fn fn,
896 		void *arg,
897 		uint64_t period_microseconds,
898 		const char *name)
899 {
900 	struct spdk_thread *thread;
901 	struct spdk_poller *poller;
902 	uint64_t quotient, remainder, ticks;
903 
904 	thread = spdk_get_thread();
905 	if (!thread) {
906 		assert(false);
907 		return NULL;
908 	}
909 
910 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
911 		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
912 		return NULL;
913 	}
914 
915 	poller = calloc(1, sizeof(*poller));
916 	if (poller == NULL) {
917 		SPDK_ERRLOG("Poller memory allocation failed\n");
918 		return NULL;
919 	}
920 
921 	if (name) {
922 		snprintf(poller->name, sizeof(poller->name), "%s", name);
923 	} else {
924 		snprintf(poller->name, sizeof(poller->name), "%p", fn);
925 	}
926 
927 	poller->state = SPDK_POLLER_STATE_WAITING;
928 	poller->fn = fn;
929 	poller->arg = arg;
930 	poller->thread = thread;
931 
932 	if (period_microseconds) {
933 		quotient = period_microseconds / SPDK_SEC_TO_USEC;
934 		remainder = period_microseconds % SPDK_SEC_TO_USEC;
935 		ticks = spdk_get_ticks_hz();
936 
937 		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
938 	} else {
939 		poller->period_ticks = 0;
940 	}
941 
942 	thread_insert_poller(thread, poller);
943 
944 	return poller;
945 }
946 
/* Register an unnamed poller on the calling thread; period 0 means "run every
 * poll".  See poller_register(). */
struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}
954 
/* Register a named poller on the calling thread; the name appears in logs and
 * stats.  See poller_register(). */
struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}
963 
/* Unregister a poller and clear the caller's handle.  The poller is not freed
 * here - it is flagged UNREGISTERED and reclaimed by the next
 * spdk_thread_poll() pass, which makes unregistration safe from within the
 * poller's own callback.  Must be called on the thread that registered the
 * poller. */
void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	/* Clear the handle first so callers cannot double-unregister. */
	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n");
		assert(false);
		return;
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}
1003 
/* Pause a poller on the calling thread.  No-op if already paused/pausing.
 * A poller pausing itself (state RUNNING) is moved to paused_pollers
 * immediately; otherwise the move is deferred to spdk_thread_poll() via the
 * PAUSING state to avoid corrupting an in-progress list iteration. */
void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state == SPDK_POLLER_STATE_PAUSED ||
	    poller->state == SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused from within itself, we can immediately move it
	 * on the paused_pollers list.  Otherwise we just set its state to
	 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it.  It
	 * allows a poller to be paused from another one's context without
	 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration.
	 */
	if (poller->state != SPDK_POLLER_STATE_RUNNING) {
		poller->state = SPDK_POLLER_STATE_PAUSING;
	} else {
		/* period_ticks selects which list the poller currently lives on. */
		if (poller->period_ticks > 0) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
		} else {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		}

		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
	}
}
1039 
/* Resume a paused (or still-pausing) poller on the calling thread.  No-op for
 * pollers in any other state. */
void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state != SPDK_POLLER_STATE_PAUSED &&
	    poller->state != SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused it has to be removed from the paused pollers
	 * list and put on the active / timer list depending on its
	 * period_ticks.  If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list.
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		thread_insert_poller(thread, poller);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
}
1069 
1070 const char *
1071 spdk_poller_state_str(enum spdk_poller_state state)
1072 {
1073 	switch (state) {
1074 	case SPDK_POLLER_STATE_WAITING:
1075 		return "waiting";
1076 	case SPDK_POLLER_STATE_RUNNING:
1077 		return "running";
1078 	case SPDK_POLLER_STATE_UNREGISTERED:
1079 		return "unregistered";
1080 	case SPDK_POLLER_STATE_PAUSING:
1081 		return "pausing";
1082 	case SPDK_POLLER_STATE_PAUSED:
1083 		return "paused";
1084 	default:
1085 		return NULL;
1086 	}
1087 }
1088 
/* Context for spdk_for_each_thread(): hops from thread to thread running fn,
 * then delivers cpl back on the originating thread. */
struct call_thread {
	struct spdk_thread *cur_thread;	/* Thread currently executing fn. */
	spdk_msg_fn fn;			/* Function run once on every thread. */
	void *ctx;			/* User context passed to fn and cpl. */

	struct spdk_thread *orig_thread;	/* Where the iteration started; cpl runs here. */
	spdk_msg_fn cpl;		/* Completion callback. */
};
1097 
/* One step of spdk_for_each_thread(): run fn on the current thread, then
 * either forward the context to the next thread on g_threads or, when the
 * list is exhausted, send the completion back to the originating thread and
 * free the context. */
static void
_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	/* Advancing through g_threads requires the list lock. */
	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	assert(rc == 0);
}
1123 
/* Run fn(ctx) once on every SPDK thread (by message-hopping through
 * g_threads), then invoke cpl(ctx) on the calling thread.  Must be called
 * from an SPDK thread.  On setup failure, cpl is invoked directly on the
 * caller. */
void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		/* Fall back to completing immediately so the caller still
		 * gets its callback. */
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	/* _on_thread() takes over ownership of ct from here. */
	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}
1161 
1162 void
1163 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
1164 			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
1165 			const char *name)
1166 {
1167 	struct io_device *dev, *tmp;
1168 	struct spdk_thread *thread;
1169 
1170 	assert(io_device != NULL);
1171 	assert(create_cb != NULL);
1172 	assert(destroy_cb != NULL);
1173 
1174 	thread = spdk_get_thread();
1175 	if (!thread) {
1176 		SPDK_ERRLOG("called from non-SPDK thread\n");
1177 		assert(false);
1178 		return;
1179 	}
1180 
1181 	dev = calloc(1, sizeof(struct io_device));
1182 	if (dev == NULL) {
1183 		SPDK_ERRLOG("could not allocate io_device\n");
1184 		return;
1185 	}
1186 
1187 	dev->io_device = io_device;
1188 	if (name) {
1189 		snprintf(dev->name, sizeof(dev->name), "%s", name);
1190 	} else {
1191 		snprintf(dev->name, sizeof(dev->name), "%p", dev);
1192 	}
1193 	dev->create_cb = create_cb;
1194 	dev->destroy_cb = destroy_cb;
1195 	dev->unregister_cb = NULL;
1196 	dev->ctx_size = ctx_size;
1197 	dev->for_each_count = 0;
1198 	dev->unregistered = false;
1199 	dev->refcnt = 0;
1200 
1201 	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
1202 		      dev->name, dev->io_device, thread->name);
1203 
1204 	pthread_mutex_lock(&g_devlist_mutex);
1205 	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
1206 		if (tmp->io_device == io_device) {
1207 			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
1208 				    io_device, tmp->name, dev->name);
1209 			free(dev);
1210 			pthread_mutex_unlock(&g_devlist_mutex);
1211 			return;
1212 		}
1213 	}
1214 	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
1215 	pthread_mutex_unlock(&g_devlist_mutex);
1216 }
1217 
1218 static void
1219 _finish_unregister(void *arg)
1220 {
1221 	struct io_device *dev = arg;
1222 
1223 	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
1224 		      dev->name, dev->io_device, dev->unregister_thread->name);
1225 
1226 	dev->unregister_cb(dev->io_device);
1227 	free(dev);
1228 }
1229 
1230 static void
1231 io_device_free(struct io_device *dev)
1232 {
1233 	int rc __attribute__((unused));
1234 
1235 	if (dev->unregister_cb == NULL) {
1236 		free(dev);
1237 	} else {
1238 		assert(dev->unregister_thread != NULL);
1239 		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
1240 			      dev->name, dev->io_device, dev->unregister_thread->name);
1241 		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
1242 		assert(rc == 0);
1243 	}
1244 }
1245 
/*
 * Remove an io_device from the global registry.  Must be called from an
 * SPDK thread.
 *
 * The device is unlinked from g_io_devices immediately, but its memory
 * (and the optional unregister_cb) is released only once all outstanding
 * channel references are gone: if refcnt is non-zero here, the final
 * put_io_channel() performs the deferred io_device_free().
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* An active spdk_for_each_channel() still references this device;
	 * refuse to unregister until those iterations finish. */
	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	/* Snapshot refcnt while holding the lock; channels released after the
	 * unlock will see dev->unregistered and handle the free themselves. */
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}
1298 
/*
 * Return the name recorded at spdk_io_device_register() time (either the
 * caller-supplied string or a pointer-based default).
 */
const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}
1304 
/*
 * Get (or lazily create) the calling thread's I/O channel for io_device.
 *
 * If a channel already exists on this thread, its refcount is bumped and
 * it is returned.  Otherwise a new channel is allocated with the device's
 * per-channel context appended inline, linked into the thread's channel
 * list, and initialized via the device's create_cb.  Returns NULL on any
 * failure (unknown device, no SPDK thread, exited thread, allocation or
 * create_cb failure).
 */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	/* An exited thread must not acquire new channels. */
	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 *  thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* The device's per-channel context is allocated inline, immediately
	 * after the channel header (see spdk_io_channel_from_ctx()). */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	/* Call the user's create_cb without holding the devlist mutex; on
	 * failure, undo the list insertion and the device refcount bump. */
	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
1387 
/*
 * Deferred channel teardown, sent as a message by spdk_put_io_channel()
 * when the last reference is dropped.  Deferral allows a racing
 * spdk_get_io_channel() on the same thread to resurrect the channel before
 * this runs; if that happened (ref or destroy_ref became non-zero again),
 * teardown is abandoned.  Otherwise the channel is unlinked, the user's
 * destroy_cb is invoked, and the io_device itself is freed if it was
 * unregistered and this was its last channel.
 */
static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 *  after this message was sent but before it had a chance to
		 *  execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	/* Only free the device once it has been unregistered... */
	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	/* ...and no other thread still holds a channel on it. */
	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}
1444 
1445 void
1446 spdk_put_io_channel(struct spdk_io_channel *ch)
1447 {
1448 	struct spdk_thread *thread;
1449 	int rc __attribute__((unused));
1450 
1451 	thread = spdk_get_thread();
1452 	if (!thread) {
1453 		SPDK_ERRLOG("called from non-SPDK thread\n");
1454 		assert(false);
1455 		return;
1456 	}
1457 
1458 	if (ch->thread != thread) {
1459 		SPDK_ERRLOG("different from the thread that called get_io_channel()\n");
1460 		assert(false);
1461 		return;
1462 	}
1463 
1464 	SPDK_DEBUGLOG(thread,
1465 		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
1466 		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);
1467 
1468 	ch->ref--;
1469 
1470 	if (ch->ref == 0) {
1471 		ch->destroy_ref++;
1472 		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
1473 		assert(rc == 0);
1474 	}
1475 }
1476 
1477 struct spdk_io_channel *
1478 spdk_io_channel_from_ctx(void *ctx)
1479 {
1480 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
1481 }
1482 
/* Return the SPDK thread that owns this channel (the one that created it). */
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}
1488 
/*
 * State for an in-flight spdk_for_each_channel() iteration; allocated in
 * spdk_for_each_channel() and freed by _call_completion().
 */
struct spdk_io_channel_iter {
	void *io_device;              /* io_device key being iterated */
	struct io_device *dev;        /* resolved device (for_each_count holder) */
	spdk_channel_msg fn;          /* callback run once per channel */
	int status;                   /* first non-zero status aborts iteration */
	void *ctx;                    /* user context */
	struct spdk_io_channel *ch;   /* channel for the current step */

	struct spdk_thread *cur_thread;  /* thread currently executing fn */

	struct spdk_thread *orig_thread; /* thread that started the iteration */
	spdk_channel_for_each_cpl cpl;   /* completion, run on orig_thread */
};
1502 
/* Accessor: the io_device being iterated by spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}
1508 
/* Accessor: the channel for the current iteration step. */
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}
1514 
/* Accessor: the user context passed to spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}
1520 
1521 static void
1522 _call_completion(void *ctx)
1523 {
1524 	struct spdk_io_channel_iter *i = ctx;
1525 
1526 	if (i->cpl != NULL) {
1527 		i->cpl(i, i->status);
1528 	}
1529 	free(i);
1530 }
1531 
1532 static void
1533 _call_channel(void *ctx)
1534 {
1535 	struct spdk_io_channel_iter *i = ctx;
1536 	struct spdk_io_channel *ch;
1537 
1538 	/*
1539 	 * It is possible that the channel was deleted before this
1540 	 *  message had a chance to execute.  If so, skip calling
1541 	 *  the fn() on this thread.
1542 	 */
1543 	pthread_mutex_lock(&g_devlist_mutex);
1544 	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
1545 		if (ch->dev->io_device == i->io_device) {
1546 			break;
1547 		}
1548 	}
1549 	pthread_mutex_unlock(&g_devlist_mutex);
1550 
1551 	if (ch) {
1552 		i->fn(i);
1553 	} else {
1554 		spdk_for_each_channel_continue(i, 0);
1555 	}
1556 }
1557 
/*
 * Run fn once on every thread that holds a channel for io_device, executing
 * on that channel's owning thread, then run cpl on the calling thread.
 *
 * The device's for_each_count is bumped while the iteration is in flight,
 * which blocks spdk_io_device_unregister() until completion; it is dropped
 * in spdk_for_each_channel_continue().  If no thread holds a channel, the
 * completion is sent immediately.  On allocation failure the iteration is
 * silently dropped (cpl is NOT called).
 */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread with a channel for this device and start there. */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist for this device; complete immediately. */
	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
1601 
/*
 * Advance a spdk_for_each_channel() iteration.  Called (directly or via the
 * user's fn) on the thread of the current step.  A non-zero status aborts
 * the iteration and is reported to the completion callback.  Otherwise the
 * next thread holding a channel for the device is messaged; when none
 * remains, the device's for_each_count is dropped and the completion is
 * sent back to the originating thread.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		/* Error: stop iterating and complete with this status. */
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	/* Iteration finished (or aborted): allow the device to unregister. */
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
1640 
1641 
/* Register the "thread" component with SPDK's log framework so the
 * SPDK_DEBUGLOG(thread, ...) calls in this file can be enabled at runtime. */
SPDK_LOG_REGISTER_COMPONENT(thread)
1643