xref: /spdk/lib/thread/thread.c (revision dd1c38cc680e4e8ca2642e93bf289072bff7fc3d)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/queue.h"
38 #include "spdk/string.h"
39 #include "spdk/thread.h"
40 #include "spdk/util.h"
41 
42 #include "spdk_internal/log.h"
43 #include "spdk_internal/thread.h"
44 
#define SPDK_MSG_BATCH_SIZE		8

/* Protects g_io_devices, g_threads and g_thread_count. */
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Optional app callback invoked for every thread created by spdk_thread_create();
 * set in spdk_thread_lib_init(). */
static spdk_new_thread_fn g_new_thread_fn = NULL;
/* Size of the per-thread user context appended to struct spdk_thread. */
static size_t g_ctx_sz = 0;
51 
/*
 * Internal bookkeeping for a registered io_device.  One instance per
 * spdk_io_device_register() call; kept on g_io_devices until unregistered
 * and freed once refcnt (outstanding channels) drops to zero.
 */
struct io_device {
	void				*io_device;	/* Caller's unique identity pointer. */
	char				*name;		/* Owned copy of the device name. */
	spdk_io_channel_create_cb	create_cb;	/* Builds per-thread channel context. */
	spdk_io_channel_destroy_cb	destroy_cb;	/* Tears down per-thread channel context. */
	spdk_io_device_unregister_cb	unregister_cb;	/* Optional unregister completion. */
	struct spdk_thread		*unregister_thread;	/* Thread that called unregister. */
	uint32_t			ctx_size;	/* Bytes appended to each spdk_io_channel. */
	uint32_t			for_each_count;	/* In-flight spdk_for_each_channel() iterations. */
	TAILQ_ENTRY(io_device)		tailq;

	uint32_t			refcnt;		/* Number of existing channels for this device. */

	bool				unregistered;	/* Set once removed from g_io_devices. */
};

/* All registered io_devices; guarded by g_devlist_mutex. */
static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
69 
/* A queued cross-thread message: a callback and its argument. */
struct spdk_msg {
	spdk_msg_fn		fn;
	void			*arg;

	SLIST_ENTRY(spdk_msg)	link;
};

/* Maximum number of spdk_msg objects each thread caches locally so the hot
 * path avoids touching the shared mempool. */
#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
/* Global pool backing all spdk_msg allocations; created in spdk_thread_lib_init(). */
static struct spdk_mempool *g_spdk_msg_mempool = NULL;
79 
/* Lifecycle states of a poller; transitions happen only on the owning thread. */
enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,
};
90 
/* A registered poller.  Owned by exactly one thread and manipulated only from
 * that thread (see spdk_thread_poll()). */
struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	uint64_t			period_ticks;	/* 0 for active pollers; timer period otherwise. */
	uint64_t			next_run_tick;	/* Next expiration (timed pollers only). */
	spdk_poller_fn			fn;
	void				*arg;
};
102 
/* A lightweight SPDK thread: message ring, pollers, channels and stats. */
struct spdk_thread {
	TAILQ_HEAD(, spdk_io_channel)	io_channels;	/* Channels owned by this thread. */
	TAILQ_ENTRY(spdk_thread)	tailq;		/* Link on g_threads. */
	char				*name;		/* Owned; always non-NULL on success paths. */

	struct spdk_cpuset		*cpumask;	/* Allowed CPU set (owned). */

	uint64_t			tsc_last;	/* Tick of the previous spdk_thread_poll(). */
	struct spdk_thread_stats	stats;		/* idle/busy/unknown tick accounting. */

	/*
	 * Contains pollers actively running on this thread.  Pollers
	 *  are run round-robin. The thread takes one poller from the head
	 *  of the ring, executes it, then puts it back at the tail of
	 *  the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;

	/**
	 * Contains pollers running on this thread with a periodic timer.
	 * Kept sorted by next_run_tick, earliest first.
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller)	timer_pollers;

	/* MP/SC ring: any thread enqueues, only this thread dequeues. */
	struct spdk_ring		*messages;

	/* Local free-list of spdk_msg objects, refilled from g_spdk_msg_mempool. */
	SLIST_HEAD(, spdk_msg)		msg_cache;
	size_t				msg_cache_count;

	/* User context allocated at the end (g_ctx_sz bytes; zero-length
	 * array used as a flexible array member). */
	uint8_t				ctx[0];
};
134 
/* All live SPDK threads; guarded by g_devlist_mutex. */
static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
/* Cached count of entries on g_threads. */
static uint32_t g_thread_count = 0;

/* The SPDK thread currently bound to this OS thread, if any. */
static __thread struct spdk_thread *tls_thread = NULL;
139 
/* Return the SPDK thread bound to the calling OS thread (may be NULL). */
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}
145 
146 int
147 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
148 {
149 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
150 
151 	assert(g_new_thread_fn == NULL);
152 	g_new_thread_fn = new_thread_fn;
153 
154 	g_ctx_sz = ctx_sz;
155 
156 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
157 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
158 			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
159 			     sizeof(struct spdk_msg),
160 			     0, /* No cache. We do our own. */
161 			     SPDK_ENV_SOCKET_ID_ANY);
162 
163 	if (!g_spdk_msg_mempool) {
164 		return -1;
165 	}
166 
167 	return 0;
168 }
169 
170 void
171 spdk_thread_lib_fini(void)
172 {
173 	struct io_device *dev;
174 
175 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
176 		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
177 	}
178 
179 	if (g_spdk_msg_mempool) {
180 		spdk_mempool_free(g_spdk_msg_mempool);
181 	}
182 }
183 
/*
 * Create and register a new SPDK lightweight thread.
 *
 * \param name Optional human-readable name (copied); if NULL a name is
 *             generated from the thread's address.
 * \param cpumask Optional CPU affinity mask (copied); if NULL the mask is
 *                set to all CPUs by negating the freshly allocated
 *                (all-zero) set.
 * \return the new thread on success, NULL on allocation failure.
 */
struct spdk_thread *
spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc, i;

	/* g_ctx_sz extra bytes back the user context returned by
	 * spdk_thread_get_ctx(). */
	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	thread->cpumask = spdk_cpuset_alloc();
	if (!thread->cpumask) {
		free(thread);
		SPDK_ERRLOG("Unable to allocate memory for CPU mask\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(thread->cpumask, cpumask);
	} else {
		/* The fresh set is all zeroes; negating it selects every CPU. */
		spdk_cpuset_negate(thread->cpumask);
	}

	TAILQ_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	TAILQ_INIT(&thread->timer_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	/* Multi-producer/single-consumer: any thread may enqueue messages,
	 * only this thread dequeues them (in spdk_thread_poll()). */
	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		spdk_cpuset_free(thread->cpumask);
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		/* NOTE(review): strdup()/spdk_sprintf_alloc() results are not
		 * checked here; an allocation failure leaves thread->name NULL. */
		thread->name = strdup(name);
	} else {
		thread->name = spdk_sprintf_alloc("%p", thread);
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Notify the application only after the thread is fully visible on
	 * g_threads. */
	if (g_new_thread_fn) {
		g_new_thread_fn(thread);
	}

	return thread;
}
256 
/* Bind (or, with NULL, unbind) an SPDK thread to the calling OS thread. */
void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}
262 
/*
 * Destroy a thread created with spdk_thread_create().  Remaining channels
 * are only logged (they leak); remaining pollers are logged and freed; the
 * local message cache is drained back to the global pool.  Teardown order
 * matters: the name is still needed for the log messages above, and the
 * cache must be drained before the ring is freed.
 */
void
spdk_thread_exit(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name);

	/* Drop the TLS binding if this OS thread was bound to the thread
	 * being destroyed. */
	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		/* UNREGISTERED pollers are expected leftovers awaiting reap;
		 * only warn about ones the app never unregistered. */
		if (poller->state == SPDK_POLLER_STATE_WAITING) {
			SPDK_WARNLOG("poller %p still registered at thread exit\n",
				     poller);
		}

		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}


	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) {
		if (poller->state == SPDK_POLLER_STATE_WAITING) {
			SPDK_WARNLOG("poller %p still registered at thread exit\n",
				     poller);
		}

		TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		free(poller);
	}

	spdk_cpuset_free(thread->cpumask);

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	free(thread->name);

	/* Return every cached message object to the global pool. */
	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (thread->messages) {
		spdk_ring_free(thread->messages);
	}

	free(thread);
}
331 
332 void *
333 spdk_thread_get_ctx(struct spdk_thread *thread)
334 {
335 	if (g_ctx_sz > 0) {
336 		return thread->ctx;
337 	}
338 
339 	return NULL;
340 }
341 
342 struct spdk_thread *
343 spdk_thread_get_from_ctx(void *ctx)
344 {
345 	if (ctx == NULL) {
346 		assert(false);
347 		return NULL;
348 	}
349 
350 	assert(g_ctx_sz > 0);
351 
352 	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
353 }
354 
355 static inline uint32_t
356 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
357 {
358 	unsigned count, i;
359 	void *messages[SPDK_MSG_BATCH_SIZE];
360 
361 #ifdef DEBUG
362 	/*
363 	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
364 	 * so we will never actually read uninitialized data from events, but just to be sure
365 	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
366 	 */
367 	memset(messages, 0, sizeof(messages));
368 #endif
369 
370 	if (max_msgs > 0) {
371 		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
372 	} else {
373 		max_msgs = SPDK_MSG_BATCH_SIZE;
374 	}
375 
376 	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
377 	if (count == 0) {
378 		return 0;
379 	}
380 
381 	for (i = 0; i < count; i++) {
382 		struct spdk_msg *msg = messages[i];
383 
384 		assert(msg != NULL);
385 		msg->fn(msg->arg);
386 
387 		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
388 			/* Insert the messages at the head. We want to re-use the hot
389 			 * ones. */
390 			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
391 			thread->msg_cache_count++;
392 		} else {
393 			spdk_mempool_put(g_spdk_msg_mempool, msg);
394 		}
395 	}
396 
397 	return count;
398 }
399 
400 static void
401 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
402 {
403 	struct spdk_poller *iter;
404 
405 	poller->next_run_tick = now + poller->period_ticks;
406 
407 	/*
408 	 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled
409 	 * run time.
410 	 */
411 	TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) {
412 		if (iter->next_run_tick <= poller->next_run_tick) {
413 			TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq);
414 			return;
415 		}
416 	}
417 
418 	/* No earlier pollers were found, so this poller must be the new head */
419 	TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
420 }
421 
/*
 * Perform one iteration of a thread's work: drain up to max_msgs queued
 * messages, run every active poller once, then run all expired timed
 * pollers.
 *
 * \param thread Thread to poll; temporarily installed as tls_thread for the
 *               duration of the call so callbacks see the right thread.
 * \param max_msgs Message batch limit (0 means the default batch size).
 * \param now Current tick count, or 0 to sample spdk_get_ticks().
 * \return >0 if any work was done (busy), 0 if idle, <0 if a poller
 *         reported an error.
 */
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_thread *orig_thread;
	struct spdk_poller *poller, *tmp;
	int rc = 0;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		/* Lazily reap pollers that were unregistered on an earlier pass. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

		/* The callback may have unregistered its own poller. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_WAITING;

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
		}
#endif

		/* Keep the most "positive" result: busy beats idle beats error. */
		if (poller_rc > rc) {
			rc = poller_rc;
		}

	}

	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		}

		/* The list is sorted by expiration, so the first unexpired
		 * poller ends the scan. */
		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_WAITING;
		/* Re-insert in sorted position for its next expiration. */
		TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		_spdk_poller_insert_timer(thread, poller, now);

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller);
		}
#endif

		if (timer_rc > rc) {
			rc = timer_rc;

		}
	}

	/* Attribute the ticks elapsed since the previous poll to the counter
	 * matching this pass's overall result. */
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += now - thread->tsc_last;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += now - thread->tsc_last;
	} else {
		/* Poller status unknown */
		thread->stats.unknown_tsc += now - thread->tsc_last;
	}
	thread->tsc_last = now;

	tls_thread = orig_thread;

	return rc;
}
529 
530 uint64_t
531 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
532 {
533 	struct spdk_poller *poller;
534 
535 	poller = TAILQ_FIRST(&thread->timer_pollers);
536 	if (poller) {
537 		return poller->next_run_tick;
538 	}
539 
540 	return 0;
541 }
542 
543 int
544 spdk_thread_has_active_pollers(struct spdk_thread *thread)
545 {
546 	return !TAILQ_EMPTY(&thread->active_pollers);
547 }
548 
549 bool
550 spdk_thread_has_pollers(struct spdk_thread *thread)
551 {
552 	if (TAILQ_EMPTY(&thread->active_pollers) &&
553 	    TAILQ_EMPTY(&thread->timer_pollers)) {
554 		return false;
555 	}
556 
557 	return true;
558 }
559 
560 bool
561 spdk_thread_is_idle(struct spdk_thread *thread)
562 {
563 	if (spdk_ring_count(thread->messages) ||
564 	    spdk_thread_has_pollers(thread)) {
565 		return false;
566 	}
567 
568 	return true;
569 }
570 
/* Return the number of live SPDK threads (a snapshot; see below). */
uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count.  We could acquire the
	 *  lock and iterate through the TAILQ of threads to count them, but that
	 *  count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}
581 
582 struct spdk_thread *
583 spdk_get_thread(void)
584 {
585 	struct spdk_thread *thread;
586 
587 	thread = _get_thread();
588 	if (!thread) {
589 		SPDK_ERRLOG("No thread allocated\n");
590 	}
591 
592 	return thread;
593 }
594 
/* Return the thread's name; the string is owned by the thread. */
const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}
600 
601 int
602 spdk_thread_get_stats(struct spdk_thread_stats *stats)
603 {
604 	struct spdk_thread *thread;
605 
606 	thread = _get_thread();
607 	if (!thread) {
608 		SPDK_ERRLOG("No thread allocated\n");
609 		return -EINVAL;
610 	}
611 
612 	if (stats == NULL) {
613 		return -EINVAL;
614 	}
615 
616 	*stats = thread->stats;
617 
618 	return 0;
619 }
620 
/*
 * Queue (fn, ctx) for execution on the target thread.  The message object
 * is taken from the sender's thread-local cache when possible, falling back
 * to the global pool.  Failures (NULL target, pool exhausted, ring full)
 * assert in debug builds and silently drop the message in release builds.
 */
void
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	if (!thread) {
		assert(false);
		return;
	}

	/* The sender may be a non-SPDK thread, in which case there is no
	 * local cache to draw from. */
	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			assert(false);
			return;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1);
	if (rc != 1) {
		assert(false);
		/* Return the message to the global pool regardless of where it
		 * came from; the pool is the common backing store. */
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return;
	}
}
663 
664 struct spdk_poller *
665 spdk_poller_register(spdk_poller_fn fn,
666 		     void *arg,
667 		     uint64_t period_microseconds)
668 {
669 	struct spdk_thread *thread;
670 	struct spdk_poller *poller;
671 	uint64_t quotient, remainder, ticks;
672 
673 	thread = spdk_get_thread();
674 	if (!thread) {
675 		assert(false);
676 		return NULL;
677 	}
678 
679 	poller = calloc(1, sizeof(*poller));
680 	if (poller == NULL) {
681 		SPDK_ERRLOG("Poller memory allocation failed\n");
682 		return NULL;
683 	}
684 
685 	poller->state = SPDK_POLLER_STATE_WAITING;
686 	poller->fn = fn;
687 	poller->arg = arg;
688 
689 	if (period_microseconds) {
690 		quotient = period_microseconds / SPDK_SEC_TO_USEC;
691 		remainder = period_microseconds % SPDK_SEC_TO_USEC;
692 		ticks = spdk_get_ticks_hz();
693 
694 		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
695 	} else {
696 		poller->period_ticks = 0;
697 	}
698 
699 	if (poller->period_ticks) {
700 		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
701 	} else {
702 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
703 	}
704 
705 	return poller;
706 }
707 
708 void
709 spdk_poller_unregister(struct spdk_poller **ppoller)
710 {
711 	struct spdk_thread *thread;
712 	struct spdk_poller *poller;
713 
714 	poller = *ppoller;
715 	if (poller == NULL) {
716 		return;
717 	}
718 
719 	*ppoller = NULL;
720 
721 	thread = spdk_get_thread();
722 	if (!thread) {
723 		assert(false);
724 		return;
725 	}
726 
727 	/* Simply set the state to unregistered. The poller will get cleaned up
728 	 * in a subsequent call to spdk_thread_poll().
729 	 */
730 	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
731 }
732 
/* State for one spdk_for_each_thread() iteration; heap-allocated and freed
 * when the completion fires. */
struct call_thread {
	struct spdk_thread *cur_thread;	/* Thread currently running fn. */
	spdk_msg_fn fn;			/* Per-thread callback. */
	void *ctx;			/* User context for fn and cpl. */

	struct spdk_thread *orig_thread;	/* Thread to run cpl on. */
	spdk_msg_fn cpl;			/* Completion callback. */
};
741 
/*
 * Message handler driving spdk_for_each_thread(): run the user fn on the
 * current thread, then forward this message to the next thread on
 * g_threads or, when the list is exhausted, send the completion back to
 * the originating thread.
 *
 * NOTE(review): cur_thread is advanced under g_devlist_mutex, but the
 * iteration assumes threads are not destroyed while it is in flight —
 * confirm against callers' lifecycle guarantees.
 */
static void
spdk_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");

		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
	}
}
765 
766 void
767 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
768 {
769 	struct call_thread *ct;
770 	struct spdk_thread *thread;
771 
772 	ct = calloc(1, sizeof(*ct));
773 	if (!ct) {
774 		SPDK_ERRLOG("Unable to perform thread iteration\n");
775 		cpl(ctx);
776 		return;
777 	}
778 
779 	ct->fn = fn;
780 	ct->ctx = ctx;
781 	ct->cpl = cpl;
782 
783 	pthread_mutex_lock(&g_devlist_mutex);
784 	thread = _get_thread();
785 	if (!thread) {
786 		SPDK_ERRLOG("No thread allocated\n");
787 		free(ct);
788 		cpl(ctx);
789 		return;
790 	}
791 	ct->orig_thread = thread;
792 	ct->cur_thread = TAILQ_FIRST(&g_threads);
793 	pthread_mutex_unlock(&g_devlist_mutex);
794 
795 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
796 		      ct->orig_thread->name);
797 
798 	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
799 }
800 
/*
 * Register io_device as an I/O channel provider.
 *
 * \param io_device Unique identity pointer for the device.
 * \param create_cb Constructor for each thread's channel context.
 * \param destroy_cb Destructor for each thread's channel context.
 * \param ctx_size Bytes of per-channel context appended to spdk_io_channel.
 * \param name Optional name (copied); defaults to the io_device pointer.
 *
 * Duplicate registrations are logged and ignored.  Must be called from an
 * SPDK thread.
 */
void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		/* NOTE(review): strdup()/spdk_sprintf_alloc() results are not
		 * checked; a failure leaves dev->name NULL. */
		dev->name = strdup(name);
	} else {
		dev->name = spdk_sprintf_alloc("%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	/* Reject duplicate identity pointers; the new dev is discarded. */
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}
857 
/* Message handler run on the unregistering thread: invoke the user's
 * unregister callback and release the io_device. */
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev->name);
	free(dev);
}
870 
871 static void
872 _spdk_io_device_free(struct io_device *dev)
873 {
874 	if (dev->unregister_cb == NULL) {
875 		free(dev->name);
876 		free(dev);
877 	} else {
878 		assert(dev->unregister_thread != NULL);
879 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
880 			      dev->name, dev->io_device, dev->unregister_thread->name);
881 		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
882 	}
883 }
884 
/*
 * Unregister an io_device.  The device is removed from the global list
 * immediately; the actual free (and unregister_cb) is deferred until the
 * last outstanding channel releases its reference.  Must be called from an
 * SPDK thread; fails (with an error log) while spdk_for_each_channel()
 * iterations on the device are still in flight.
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	/* Snapshot the refcnt under the lock; channels released afterwards
	 * will see dev->unregistered and trigger the deferred free. */
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	_spdk_io_device_free(dev);
}
937 
/*
 * Get (or create) the calling thread's I/O channel for io_device.  An
 * existing channel is returned with its refcount bumped; otherwise a new
 * channel plus ctx_size bytes of per-channel context is allocated and the
 * device's create_cb is invoked (outside the lock) to initialize it.
 *
 * \return the channel, or NULL if the device is unknown, the caller is not
 *         an SPDK thread, allocation fails, or create_cb fails.
 */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 *  thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* The per-channel context lives immediately after the channel header
	 * (see spdk_io_channel_get_ctx()/spdk_io_channel_from_ctx()). */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	/* Hold a device reference while the channel exists so unregister is
	 * deferred until the last channel goes away. */
	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	/* Run the user constructor without holding the devlist lock. */
	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
1014 
/*
 * Deferred channel teardown, always executed on the channel's owning
 * thread (sent from spdk_put_io_channel()).  Destruction is abandoned if
 * the channel was re-referenced while the message was in flight.  After
 * the user destroy_cb runs, the device itself is freed when this was the
 * last channel of an already-unregistered device.
 */
static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 *  after this message was sent but before it had a chance to
		 *  execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	/* Free the device only if it was unregistered and this was its last
	 * remaining channel. */
	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}
1071 
/*
 * Drop one reference to a channel.  When the count hits zero the actual
 * teardown is deferred to a message on the channel's owning thread so the
 * destroy callback always runs there.  NOTE(review): ch->ref is modified
 * without a lock, which implies this must be called on ch->thread —
 * confirm against callers.
 */
void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		/* destroy_ref keeps the channel alive until the deferred
		 * teardown message actually runs. */
		ch->destroy_ref++;
		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
	}
}
1086 
1087 struct spdk_io_channel *
1088 spdk_io_channel_from_ctx(void *ctx)
1089 {
1090 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
1091 }
1092 
/* Return the thread that owns this channel. */
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}
1098 
/* State for one spdk_for_each_channel() iteration; heap-allocated and freed
 * by _call_completion(). */
struct spdk_io_channel_iter {
	void *io_device;		/* Identity pointer being iterated. */
	struct io_device *dev;		/* Resolved device (for_each_count holder). */
	spdk_channel_msg fn;		/* Per-channel callback. */
	int status;			/* First non-zero status aborts the iteration. */
	void *ctx;			/* User context. */
	struct spdk_io_channel *ch;	/* Channel for the current step. */

	struct spdk_thread *cur_thread;	/* Thread for the current step. */

	struct spdk_thread *orig_thread;	/* Thread to run cpl on. */
	spdk_channel_for_each_cpl cpl;		/* Completion callback. */
};
1112 
/* Accessor: the io_device identity pointer this iteration targets. */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

/* Accessor: the channel for the current iteration step. */
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

/* Accessor: the user context passed to spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}
1130 
1131 static void
1132 _call_completion(void *ctx)
1133 {
1134 	struct spdk_io_channel_iter *i = ctx;
1135 
1136 	if (i->cpl != NULL) {
1137 		i->cpl(i, i->status);
1138 	}
1139 	free(i);
1140 }
1141 
/* Message handler run on each target thread during spdk_for_each_channel(). */
static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 *  message had a chance to execute.  If so, skip calling
	 *  the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		/* fn() is expected to finish with spdk_for_each_channel_continue(). */
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}
1167 
1168 void
1169 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
1170 		      spdk_channel_for_each_cpl cpl)
1171 {
1172 	struct spdk_thread *thread;
1173 	struct spdk_io_channel *ch;
1174 	struct spdk_io_channel_iter *i;
1175 
1176 	i = calloc(1, sizeof(*i));
1177 	if (!i) {
1178 		SPDK_ERRLOG("Unable to allocate iterator\n");
1179 		return;
1180 	}
1181 
1182 	i->io_device = io_device;
1183 	i->fn = fn;
1184 	i->ctx = ctx;
1185 	i->cpl = cpl;
1186 
1187 	pthread_mutex_lock(&g_devlist_mutex);
1188 	i->orig_thread = _get_thread();
1189 
1190 	TAILQ_FOREACH(thread, &g_threads, tailq) {
1191 		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
1192 			if (ch->dev->io_device == io_device) {
1193 				ch->dev->for_each_count++;
1194 				i->dev = ch->dev;
1195 				i->cur_thread = thread;
1196 				i->ch = ch;
1197 				pthread_mutex_unlock(&g_devlist_mutex);
1198 				spdk_thread_send_msg(thread, _call_channel, i);
1199 				return;
1200 			}
1201 		}
1202 	}
1203 
1204 	pthread_mutex_unlock(&g_devlist_mutex);
1205 
1206 	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
1207 }
1208 
/*
 * Advance an spdk_for_each_channel() iteration.  Called by the user's fn
 * (on the current thread) with the step's status; a non-zero status aborts
 * the iteration.  When a next thread with a matching channel exists, the
 * iterator is forwarded there; otherwise the completion is sent back to
 * the originating thread.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		/* Abort: skip the remaining threads. */
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	/* Iteration finished (or aborted): allow the device to unregister
	 * again and notify the originator. */
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
1244 
1245 
1246 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)
1247