xref: /spdk/lib/thread/thread.c (revision 73204fe2e526f20c3fe7ea2066c0a4ed9dc60272)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/queue.h"
38 #include "spdk/string.h"
39 #include "spdk/thread.h"
40 #include "spdk/util.h"
41 
42 #include "spdk_internal/log.h"
43 #include "spdk_internal/thread.h"
44 
45 #define SPDK_MSG_BATCH_SIZE		8
46 
47 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;
48 
49 static spdk_new_thread_fn g_new_thread_fn = NULL;
50 static size_t g_ctx_sz = 0;
51 
/*
 * Internal bookkeeping for a registered I/O device.  One instance exists per
 * spdk_io_device_register() call; it lives on g_io_devices until unregistered
 * and is finally freed once its channel refcount drops to zero.
 */
struct io_device {
	void				*io_device;	/* Caller-supplied unique key. */
	char				*name;		/* Heap-allocated; freed with the device. */
	spdk_io_channel_create_cb	create_cb;
	spdk_io_channel_destroy_cb	destroy_cb;
	spdk_io_channel_unregister_cb	unregister_cb;
	struct spdk_thread		*unregister_thread;	/* Thread that called unregister; cb runs there. */
	uint32_t			ctx_size;	/* Per-channel context bytes appended to spdk_io_channel. */
	uint32_t			for_each_count;	/* Outstanding spdk_for_each_channel() iterations. */
	TAILQ_ENTRY(io_device)		tailq;

	/* Number of spdk_io_channel instances referencing this device
	 * (guarded by g_devlist_mutex). */
	uint32_t			refcnt;

	/* Set once spdk_io_device_unregister() has run; final free is
	 * deferred until refcnt reaches zero. */
	bool				unregistered;
};
67 
68 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
69 
/* A single cross-thread message: function + argument, queued on the target
 * thread's ring.  Allocated from g_spdk_msg_mempool or a per-thread cache. */
struct spdk_msg {
	spdk_msg_fn		fn;
	void			*arg;

	SLIST_ENTRY(spdk_msg)	link;	/* Linkage for the per-thread msg_cache free list. */
};
76 
77 #define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
78 static struct spdk_mempool *g_spdk_msg_mempool = NULL;
79 
/* Poller lifecycle.  State is only read/written on the owning thread, so no
 * locking is needed. */
enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;	/* On active_pollers or timer_pollers. */

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	uint64_t			period_ticks;	/* 0 => active poller, non-zero => timed poller. */
	uint64_t			next_run_tick;	/* Next expiration, valid for timed pollers only. */
	spdk_poller_fn			fn;
	void				*arg;
};
102 
/*
 * A lightweight SPDK thread.  Created by spdk_thread_create(); driven by
 * repeated spdk_thread_poll() calls from the hosting framework.
 */
struct spdk_thread {
	TAILQ_HEAD(, spdk_io_channel)	io_channels;	/* Channels owned by this thread. */
	TAILQ_ENTRY(spdk_thread)	tailq;		/* Linkage on g_threads. */
	char				*name;

	/* Set by spdk_thread_exit(); polling loops stop early once true. */
	bool				exit;

	struct spdk_cpuset		*cpumask;

	uint64_t			tsc_last;	/* Timestamp of the previous poll, for stats. */
	struct spdk_thread_stats	stats;		/* Accumulated busy/idle tick counters. */

	/*
	 * Contains pollers actively running on this thread.  Pollers
	 *  are run round-robin. The thread takes one poller from the head
	 *  of the ring, executes it, then puts it back at the tail of
	 *  the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;

	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller)	timer_pollers;

	/* Multi-producer/single-consumer ring of inbound spdk_msg pointers. */
	struct spdk_ring		*messages;

	/* Local free list of spdk_msg objects to avoid mempool round-trips. */
	SLIST_HEAD(, spdk_msg)		msg_cache;
	size_t				msg_cache_count;

	/* User context allocated at the end */
	uint8_t				ctx[0];
};
136 
137 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
138 static uint32_t g_thread_count = 0;
139 
140 static __thread struct spdk_thread *tls_thread = NULL;
141 
/* Return the spdk_thread bound to the calling OS thread (may be NULL). */
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}
147 
148 int
149 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
150 {
151 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
152 
153 	assert(g_new_thread_fn == NULL);
154 	g_new_thread_fn = new_thread_fn;
155 
156 	g_ctx_sz = ctx_sz;
157 
158 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
159 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
160 			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
161 			     sizeof(struct spdk_msg),
162 			     0, /* No cache. We do our own. */
163 			     SPDK_ENV_SOCKET_ID_ANY);
164 
165 	if (!g_spdk_msg_mempool) {
166 		return -1;
167 	}
168 
169 	return 0;
170 }
171 
172 void
173 spdk_thread_lib_fini(void)
174 {
175 	struct io_device *dev;
176 
177 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
178 		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
179 	}
180 
181 	if (g_spdk_msg_mempool) {
182 		spdk_mempool_free(g_spdk_msg_mempool);
183 		g_spdk_msg_mempool = NULL;
184 	}
185 
186 	g_new_thread_fn = NULL;
187 	g_ctx_sz = 0;
188 }
189 
/*
 * Release everything a thread owns and drop it from the global list.
 * Channels or WAITING pollers still present at this point indicate a caller
 * bug and are logged before being torn down.
 */
static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	/* Channels are owned by their users; any left here were leaked. */
	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		/* UNREGISTERED pollers are expected to linger until cleanup;
		 * only WAITING ones indicate a missing unregister call. */
		if (poller->state == SPDK_POLLER_STATE_WAITING) {
			SPDK_WARNLOG("poller %p still registered at thread exit\n",
				     poller);
		}

		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}


	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) {
		if (poller->state == SPDK_POLLER_STATE_WAITING) {
			SPDK_WARNLOG("poller %p still registered at thread exit\n",
				     poller);
		}

		TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		free(poller);
	}

	/* Removal from g_threads and the count must be atomic w.r.t. iterators
	 * such as spdk_for_each_thread(). */
	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_cpuset_free(thread->cpumask);
	free(thread->name);

	/* Return every locally cached message to the global mempool. */
	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	spdk_ring_free(thread->messages);
	free(thread);
}
248 
249 struct spdk_thread *
250 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
251 {
252 	struct spdk_thread *thread;
253 	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
254 	int rc, i;
255 
256 	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
257 	if (!thread) {
258 		SPDK_ERRLOG("Unable to allocate memory for thread\n");
259 		return NULL;
260 	}
261 
262 	thread->cpumask = spdk_cpuset_alloc();
263 	if (!thread->cpumask) {
264 		free(thread);
265 		SPDK_ERRLOG("Unable to allocate memory for CPU mask\n");
266 		return NULL;
267 	}
268 
269 	if (cpumask) {
270 		spdk_cpuset_copy(thread->cpumask, cpumask);
271 	} else {
272 		spdk_cpuset_negate(thread->cpumask);
273 	}
274 
275 	TAILQ_INIT(&thread->io_channels);
276 	TAILQ_INIT(&thread->active_pollers);
277 	TAILQ_INIT(&thread->timer_pollers);
278 	SLIST_INIT(&thread->msg_cache);
279 	thread->msg_cache_count = 0;
280 
281 	thread->tsc_last = spdk_get_ticks();
282 
283 	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
284 	if (!thread->messages) {
285 		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
286 		spdk_cpuset_free(thread->cpumask);
287 		free(thread);
288 		return NULL;
289 	}
290 
291 	/* Fill the local message pool cache. */
292 	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
293 	if (rc == 0) {
294 		/* If we can't populate the cache it's ok. The cache will get filled
295 		 * up organically as messages are passed to the thread. */
296 		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
297 			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
298 			thread->msg_cache_count++;
299 		}
300 	}
301 
302 	if (name) {
303 		thread->name = strdup(name);
304 	} else {
305 		thread->name = spdk_sprintf_alloc("%p", thread);
306 	}
307 
308 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name);
309 
310 	pthread_mutex_lock(&g_devlist_mutex);
311 	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
312 	g_thread_count++;
313 	pthread_mutex_unlock(&g_devlist_mutex);
314 
315 	if (g_new_thread_fn) {
316 		rc = g_new_thread_fn(thread);
317 		if (rc != 0) {
318 			_free_thread(thread);
319 			return NULL;
320 		}
321 	}
322 
323 	return thread;
324 }
325 
/* Bind (or unbind, with NULL) an spdk_thread to the calling OS thread. */
void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}
331 
/*
 * Mark a thread as exiting.  Must be called from the thread itself; the
 * actual teardown happens later in spdk_thread_destroy().
 */
void
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	/* Polling loops check this flag and stop processing early. */
	thread->exit = true;
}
341 
/*
 * Free a thread previously marked as exiting via spdk_thread_exit().
 * Clears the TLS binding if the calling OS thread was bound to it.
 */
void
spdk_thread_destroy(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name);

	assert(thread->exit == true);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}
355 
356 void *
357 spdk_thread_get_ctx(struct spdk_thread *thread)
358 {
359 	if (g_ctx_sz > 0) {
360 		return thread->ctx;
361 	}
362 
363 	return NULL;
364 }
365 
/* Return the thread's CPU affinity mask (owned by the thread; do not free). */
struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return thread->cpumask;
}
371 
/*
 * Inverse of spdk_thread_get_ctx(): recover the spdk_thread from a pointer
 * to its trailing context area.  Only valid when g_ctx_sz > 0.
 */
struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	/* A context pointer can only exist if contexts were configured. */
	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}
384 
/*
 * Dequeue up to max_msgs (capped at SPDK_MSG_BATCH_SIZE; 0 means "use the
 * cap") messages from the thread's ring and execute them.  Returns the
 * number of messages dequeued.
 */
static inline uint32_t
_spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		/* NOTE(review): breaking here abandons messages[i+1..count-1]
		 * without running or recycling them — apparently acceptable
		 * during thread exit, but worth confirming. */
		if (thread->exit) {
			break;
		}

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}
433 
434 static void
435 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
436 {
437 	struct spdk_poller *iter;
438 
439 	poller->next_run_tick = now + poller->period_ticks;
440 
441 	/*
442 	 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled
443 	 * run time.
444 	 */
445 	TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) {
446 		if (iter->next_run_tick <= poller->next_run_tick) {
447 			TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq);
448 			return;
449 		}
450 	}
451 
452 	/* No earlier pollers were found, so this poller must be the new head */
453 	TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
454 }
455 
/*
 * Run one iteration of a thread's event loop: drain up to max_msgs messages,
 * run every active poller once, then run all expired timed pollers.
 *
 * \param max_msgs Message batch limit; 0 selects the default batch size.
 * \param now Current tick count; 0 means "read the TSC here".
 * \return 0 if nothing was done (idle), >0 if any work was done (busy),
 *         where the value is the max of the poller return codes and 1.
 */
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_thread *orig_thread;
	struct spdk_poller *poller, *tmp;
	int rc = 0;

	/* Temporarily bind TLS to the polled thread so callbacks see the
	 * right spdk_get_thread(); restored before returning. */
	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	/* Reverse iteration gives round-robin fairness relative to the
	 * head-insert done elsewhere. */
	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		if (thread->exit) {
			break;
		}

		/* Lazily reap pollers unregistered on a previous iteration. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

		/* The poller may have unregistered itself inside fn(). */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_WAITING;

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
		}
#endif

		if (poller_rc > rc) {
			rc = poller_rc;
		}

	}

	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (thread->exit) {
			break;
		}

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		}

		/* List is sorted by expiration, so the first unexpired poller
		 * ends the scan. */
		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_WAITING;
		/* Re-insert at the position of its next expiration. */
		TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		_spdk_poller_insert_timer(thread, poller, now);

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller);
		}
#endif

		if (timer_rc > rc) {
			rc = timer_rc;

		}
	}

	/* Attribute the elapsed ticks since the previous poll to idle or busy. */
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += now - thread->tsc_last;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += now - thread->tsc_last;
	}
	thread->tsc_last = now;

	tls_thread = orig_thread;

	return rc;
}
568 
569 uint64_t
570 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
571 {
572 	struct spdk_poller *poller;
573 
574 	poller = TAILQ_FIRST(&thread->timer_pollers);
575 	if (poller) {
576 		return poller->next_run_tick;
577 	}
578 
579 	return 0;
580 }
581 
/* Return non-zero if the thread has any (non-timed) active pollers. */
int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}
587 
588 bool
589 spdk_thread_has_pollers(struct spdk_thread *thread)
590 {
591 	if (TAILQ_EMPTY(&thread->active_pollers) &&
592 	    TAILQ_EMPTY(&thread->timer_pollers)) {
593 		return false;
594 	}
595 
596 	return true;
597 }
598 
599 bool
600 spdk_thread_is_idle(struct spdk_thread *thread)
601 {
602 	if (spdk_ring_count(thread->messages) ||
603 	    spdk_thread_has_pollers(thread)) {
604 		return false;
605 	}
606 
607 	return true;
608 }
609 
/* Return the number of spdk_threads currently registered. */
uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count.  We could acquire the
	 *  lock and iterate through the TAILQ of threads to count them, but that
	 *  count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}
620 
/*
 * Return the spdk_thread bound to the calling OS thread, logging an error
 * (but not asserting) when none is bound.  May return NULL.
 */
struct spdk_thread *
spdk_get_thread(void)
{
	struct spdk_thread *thread = _get_thread();

	if (thread == NULL) {
		SPDK_ERRLOG("No thread allocated\n");
	}

	return thread;
}
633 
/* Return the thread's name (owned by the thread; valid until it is destroyed). */
const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}
639 
640 int
641 spdk_thread_get_stats(struct spdk_thread_stats *stats)
642 {
643 	struct spdk_thread *thread;
644 
645 	thread = _get_thread();
646 	if (!thread) {
647 		SPDK_ERRLOG("No thread allocated\n");
648 		return -EINVAL;
649 	}
650 
651 	if (stats == NULL) {
652 		return -EINVAL;
653 	}
654 
655 	*stats = thread->stats;
656 
657 	return 0;
658 }
659 
/*
 * Queue (fn, ctx) to run on the target thread.  The message object is taken
 * from the *sender's* local cache when possible, falling back to the global
 * mempool.  Failures (no message available, full ring) only assert in debug
 * builds — in release builds the message is silently dropped.
 */
void
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	if (!thread) {
		assert(false);
		return;
	}

	local_thread = _get_thread();

	/* Prefer the per-thread cache; it avoids mempool contention.  The
	 * cache may be absent when sending from a non-SPDK thread. */
	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			assert(false);
			return;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	/* MP/SC ring: safe for concurrent senders, consumed only by the
	 * target thread's poll loop. */
	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		assert(false);
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return;
	}
}
702 
703 struct spdk_poller *
704 spdk_poller_register(spdk_poller_fn fn,
705 		     void *arg,
706 		     uint64_t period_microseconds)
707 {
708 	struct spdk_thread *thread;
709 	struct spdk_poller *poller;
710 	uint64_t quotient, remainder, ticks;
711 
712 	thread = spdk_get_thread();
713 	if (!thread) {
714 		assert(false);
715 		return NULL;
716 	}
717 
718 	poller = calloc(1, sizeof(*poller));
719 	if (poller == NULL) {
720 		SPDK_ERRLOG("Poller memory allocation failed\n");
721 		return NULL;
722 	}
723 
724 	poller->state = SPDK_POLLER_STATE_WAITING;
725 	poller->fn = fn;
726 	poller->arg = arg;
727 
728 	if (period_microseconds) {
729 		quotient = period_microseconds / SPDK_SEC_TO_USEC;
730 		remainder = period_microseconds % SPDK_SEC_TO_USEC;
731 		ticks = spdk_get_ticks_hz();
732 
733 		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
734 	} else {
735 		poller->period_ticks = 0;
736 	}
737 
738 	if (poller->period_ticks) {
739 		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
740 	} else {
741 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
742 	}
743 
744 	return poller;
745 }
746 
747 void
748 spdk_poller_unregister(struct spdk_poller **ppoller)
749 {
750 	struct spdk_thread *thread;
751 	struct spdk_poller *poller;
752 
753 	poller = *ppoller;
754 	if (poller == NULL) {
755 		return;
756 	}
757 
758 	*ppoller = NULL;
759 
760 	thread = spdk_get_thread();
761 	if (!thread) {
762 		assert(false);
763 		return;
764 	}
765 
766 	/* Simply set the state to unregistered. The poller will get cleaned up
767 	 * in a subsequent call to spdk_thread_poll().
768 	 */
769 	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
770 }
771 
/* State carried across the spdk_for_each_thread() message chain. */
struct call_thread {
	struct spdk_thread *cur_thread;	/* Thread currently executing fn. */
	spdk_msg_fn fn;			/* Per-thread callback. */
	void *ctx;			/* User context passed to fn and cpl. */

	struct spdk_thread *orig_thread;	/* Where cpl runs when iteration ends. */
	spdk_msg_fn cpl;
};
780 
/*
 * One step of spdk_for_each_thread(): run fn on the current thread, then
 * either advance to the next thread on g_threads or, when the list is
 * exhausted, send the completion back to the originating thread.
 */
static void
spdk_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;

	ct->fn(ct->ctx);

	/* Advance under the lock so a concurrent thread create/destroy can't
	 * corrupt the walk. */
	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");

		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
	}
}
804 
/*
 * Run fn(ctx) once on every registered thread (in list order), then run
 * cpl(ctx) on the calling thread.  On allocation failure, or when called
 * from a non-SPDK thread, cpl is invoked immediately inline.
 */
void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	/* The calling thread is on g_threads, so the list is non-empty. */
	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
}
840 
841 void
842 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
843 			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
844 			const char *name)
845 {
846 	struct io_device *dev, *tmp;
847 	struct spdk_thread *thread;
848 
849 	assert(io_device != NULL);
850 	assert(create_cb != NULL);
851 	assert(destroy_cb != NULL);
852 
853 	thread = spdk_get_thread();
854 	if (!thread) {
855 		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
856 		assert(false);
857 		return;
858 	}
859 
860 	dev = calloc(1, sizeof(struct io_device));
861 	if (dev == NULL) {
862 		SPDK_ERRLOG("could not allocate io_device\n");
863 		return;
864 	}
865 
866 	dev->io_device = io_device;
867 	if (name) {
868 		dev->name = strdup(name);
869 	} else {
870 		dev->name = spdk_sprintf_alloc("%p", dev);
871 	}
872 	dev->create_cb = create_cb;
873 	dev->destroy_cb = destroy_cb;
874 	dev->unregister_cb = NULL;
875 	dev->ctx_size = ctx_size;
876 	dev->for_each_count = 0;
877 	dev->unregistered = false;
878 	dev->refcnt = 0;
879 
880 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
881 		      dev->name, dev->io_device, thread->name);
882 
883 	pthread_mutex_lock(&g_devlist_mutex);
884 	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
885 		if (tmp->io_device == io_device) {
886 			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
887 				    io_device, tmp->name, dev->name);
888 			free(dev->name);
889 			free(dev);
890 			pthread_mutex_unlock(&g_devlist_mutex);
891 			return;
892 		}
893 	}
894 	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
895 	pthread_mutex_unlock(&g_devlist_mutex);
896 }
897 
/*
 * Final step of deferred device unregistration: runs on the thread that
 * called spdk_io_device_unregister(), invokes the user callback, then frees
 * the device.
 */
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev->name);
	free(dev);
}
910 
/*
 * Free a fully-released io_device.  With no unregister callback the memory
 * is freed inline; otherwise the free is routed through the unregistering
 * thread so the callback runs in the expected context.
 */
static void
_spdk_io_device_free(struct io_device *dev)
{
	if (dev->unregister_cb == NULL) {
		free(dev->name);
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
	}
}
924 
/*
 * Unregister an io_device.  The device is removed from the global list
 * immediately (no new channels can be created), but the actual free — and
 * the optional unregister_cb, which runs on the calling thread — is deferred
 * until the last channel reference is released.
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* Outstanding for_each iterations hold the device; refuse to
	 * unregister underneath them. */
	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	/* Snapshot refcnt under the lock; channel releases after unlock will
	 * see unregistered == true and handle the final free themselves. */
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	_spdk_io_device_free(dev);
}
977 
/*
 * Get (or create) the calling thread's channel for io_device.  Channels are
 * per (device, thread): an existing channel just gains a reference; a new
 * one is allocated with dev->ctx_size context bytes and initialized via the
 * device's create_cb.
 *
 * \return The channel, or NULL if the device is unknown, the caller is not
 *         an SPDK thread, allocation fails, or create_cb fails.
 */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 *  thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* Per-channel context area is allocated contiguously after the
	 * channel struct; see spdk_io_channel_get_ctx(). */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	/* Pin the device while this channel exists. */
	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	/* Run the user callback outside the lock; unwind on failure. */
	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
1054 
/*
 * Deferred channel destruction, executed as a message on the channel's
 * owning thread.  Destroys the channel unless it was re-referenced after the
 * destroy message was queued, and frees the device if this was the last
 * channel of an unregistered device.
 */
static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 *  after this message was sent but before it had a chance to
		 *  execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	/* The device is freed only when it has been unregistered AND this was
	 * its last channel reference. */
	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}
1111 
1112 void
1113 spdk_put_io_channel(struct spdk_io_channel *ch)
1114 {
1115 	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
1116 		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
1117 		      ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref);
1118 
1119 	ch->ref--;
1120 
1121 	if (ch->ref == 0) {
1122 		ch->destroy_ref++;
1123 		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
1124 	}
1125 }
1126 
1127 struct spdk_io_channel *
1128 spdk_io_channel_from_ctx(void *ctx)
1129 {
1130 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
1131 }
1132 
/* Return the thread that owns (and must destroy) this channel. */
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}
1138 
/* State carried across the spdk_for_each_channel() message chain. */
struct spdk_io_channel_iter {
	void *io_device;		/* Lookup key for channels. */
	struct io_device *dev;		/* Resolved device; holds for_each_count. */
	spdk_channel_msg fn;		/* Per-channel callback. */
	int status;			/* Last status from spdk_for_each_channel_continue(). */
	void *ctx;			/* User context. */
	struct spdk_io_channel *ch;	/* Channel for the current step; NULL when done. */

	struct spdk_thread *cur_thread;	/* Thread whose channel is being visited. */

	struct spdk_thread *orig_thread;	/* Where cpl runs at the end. */
	spdk_channel_for_each_cpl cpl;
};
1152 
/* Accessor: the io_device key this iteration targets. */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

/* Accessor: the channel for the current iteration step. */
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

/* Accessor: the user context passed to spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}
1170 
/* Run the optional for_each completion on the originating thread, then free
 * the iterator. */
static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}
1181 
/*
 * One step of spdk_for_each_channel(), executed on the target channel's
 * thread.  Re-validates that the channel still exists before invoking fn;
 * if it was deleted in the meantime, advances the iteration instead.
 */
static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 *  message had a chance to execute.  If so, skip calling
	 *  the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		/* fn() is responsible for eventually calling
		 * spdk_for_each_channel_continue(). */
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}
1207 
/*
 * Invoke fn once on every thread that holds a channel for io_device (each
 * call runs on the channel's own thread), then invoke cpl on the calling
 * thread.  fn must call spdk_for_each_channel_continue() to advance.
 *
 * NOTE(review): on iterator allocation failure this only logs — cpl is never
 * invoked, so callers waiting on it will hang; worth confirming intent.
 */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread holding a channel for this device and start
	 * the message chain there.  for_each_count blocks unregistration for
	 * the duration of the iteration. */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist; complete immediately (status 0). */
	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
1248 
/*
 * Advance a spdk_for_each_channel() iteration.  Must be called from fn on
 * the current channel's thread.  A non-zero status aborts the iteration and
 * is reported to cpl; otherwise the next thread with a matching channel is
 * visited, or the completion is sent when none remain.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		/* Abort: skip straight to completion. */
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	/* Iteration finished (or aborted): release the unregistration hold
	 * taken by spdk_for_each_channel(). */
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
1284 
1285 
1286 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)
1287