xref: /spdk/lib/thread/thread.c (revision c4d9daeb7bf491bc0eb6e8d417b75d44773cb009)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/queue.h"
38 #include "spdk/string.h"
39 #include "spdk/thread.h"
40 #include "spdk/util.h"
41 
42 #include "spdk_internal/log.h"
43 #include "spdk_internal/thread.h"
44 
/* Maximum number of messages drained per _spdk_msg_queue_run_batch() call. */
#define SPDK_MSG_BATCH_SIZE		8

/* Protects g_io_devices, g_threads, and g_thread_count. */
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Optional callback run for each newly created thread (set by spdk_thread_lib_init). */
static spdk_new_thread_fn g_new_thread_fn = NULL;
/* Size of the user context area appended to each struct spdk_thread. */
static size_t g_ctx_sz = 0;
51 
/* Bookkeeping for one registered I/O device.  List membership and the
 * refcnt/for_each_count fields are guarded by g_devlist_mutex. */
struct io_device {
	void				*io_device;	/* user-supplied unique key */
	char				*name;
	spdk_io_channel_create_cb	create_cb;
	spdk_io_channel_destroy_cb	destroy_cb;
	spdk_io_device_unregister_cb	unregister_cb;
	struct spdk_thread		*unregister_thread;	/* thread that called spdk_io_device_unregister() */
	uint32_t			ctx_size;	/* per-channel context bytes allocated after spdk_io_channel */
	uint32_t			for_each_count;	/* outstanding spdk_for_each_channel() iterations */
	TAILQ_ENTRY(io_device)		tailq;

	uint32_t			refcnt;	/* number of io_channels currently open on this device */

	bool				unregistered;	/* true once removed from g_io_devices */
};

/* All registered io_devices; guarded by g_devlist_mutex. */
static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
69 
/* One cross-thread message: a function pointer plus its argument. */
struct spdk_msg {
	spdk_msg_fn		fn;
	void			*arg;

	SLIST_ENTRY(spdk_msg)	link;	/* link on a thread's msg_cache free list */
};

/* Per-thread cache capacity for spdk_msg objects (see spdk_thread::msg_cache). */
#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
/* Shared pool backing all spdk_msg allocations; created in spdk_thread_lib_init(). */
static struct spdk_mempool *g_spdk_msg_mempool = NULL;
79 
/* Lifecycle states of a registered poller. */
enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,
};
90 
/* A function registered to run repeatedly on one thread: either every
 * spdk_thread_poll() pass (active) or on a tick-based period (timer). */
struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	uint64_t			period_ticks;	/* 0 for active pollers, >0 for timer pollers */
	uint64_t			next_run_tick;	/* next deadline (timer pollers only) */
	spdk_poller_fn			fn;
	void				*arg;
};
102 
/* Per-thread state: io_channels, pollers, message ring, and statistics. */
struct spdk_thread {
	TAILQ_HEAD(, spdk_io_channel)	io_channels;	/* channels owned by this thread */
	TAILQ_ENTRY(spdk_thread)	tailq;	/* link on g_threads */
	char				*name;

	struct spdk_cpuset		*cpumask;

	uint64_t			tsc_last;	/* tick value at the end of the last poll */
	struct spdk_thread_stats	stats;

	/*
	 * Contains pollers actively running on this thread.  Pollers
	 *  are run round-robin. The thread takes one poller from the head
	 *  of the ring, executes it, then puts it back at the tail of
	 *  the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;

	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller)	timer_pollers;

	/* MP/SC ring carrying spdk_msg pointers sent to this thread. */
	struct spdk_ring		*messages;

	/* Local free list of spdk_msg objects to avoid mempool round-trips. */
	SLIST_HEAD(, spdk_msg)		msg_cache;
	size_t				msg_cache_count;

	/* User context allocated at the end */
	uint8_t				ctx[0];
};
134 
/* All live spdk_threads; guarded by g_devlist_mutex. */
static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

/* The spdk_thread currently bound to this OS thread, if any. */
static __thread struct spdk_thread *tls_thread = NULL;
139 
/* Return the spdk_thread bound to the calling OS thread (NULL if unset). */
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}
145 
146 int
147 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
148 {
149 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
150 
151 	assert(g_new_thread_fn == NULL);
152 	g_new_thread_fn = new_thread_fn;
153 
154 	g_ctx_sz = ctx_sz;
155 
156 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
157 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
158 			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
159 			     sizeof(struct spdk_msg),
160 			     0, /* No cache. We do our own. */
161 			     SPDK_ENV_SOCKET_ID_ANY);
162 
163 	if (!g_spdk_msg_mempool) {
164 		return -1;
165 	}
166 
167 	return 0;
168 }
169 
170 void
171 spdk_thread_lib_fini(void)
172 {
173 	struct io_device *dev;
174 
175 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
176 		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
177 	}
178 
179 	if (g_spdk_msg_mempool) {
180 		spdk_mempool_free(g_spdk_msg_mempool);
181 		g_spdk_msg_mempool = NULL;
182 	}
183 
184 	g_new_thread_fn = NULL;
185 	g_ctx_sz = 0;
186 }
187 
188 static void
189 _free_thread(struct spdk_thread *thread)
190 {
191 	struct spdk_io_channel *ch;
192 	struct spdk_msg *msg;
193 	struct spdk_poller *poller, *ptmp;
194 
195 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
196 		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
197 			    thread->name, ch->dev->name);
198 	}
199 
200 	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
201 		if (poller->state == SPDK_POLLER_STATE_WAITING) {
202 			SPDK_WARNLOG("poller %p still registered at thread exit\n",
203 				     poller);
204 		}
205 
206 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
207 		free(poller);
208 	}
209 
210 
211 	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) {
212 		if (poller->state == SPDK_POLLER_STATE_WAITING) {
213 			SPDK_WARNLOG("poller %p still registered at thread exit\n",
214 				     poller);
215 		}
216 
217 		TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
218 		free(poller);
219 	}
220 
221 	pthread_mutex_lock(&g_devlist_mutex);
222 	assert(g_thread_count > 0);
223 	g_thread_count--;
224 	TAILQ_REMOVE(&g_threads, thread, tailq);
225 	pthread_mutex_unlock(&g_devlist_mutex);
226 
227 	spdk_cpuset_free(thread->cpumask);
228 	free(thread->name);
229 
230 	msg = SLIST_FIRST(&thread->msg_cache);
231 	while (msg != NULL) {
232 		SLIST_REMOVE_HEAD(&thread->msg_cache, link);
233 
234 		assert(thread->msg_cache_count > 0);
235 		thread->msg_cache_count--;
236 		spdk_mempool_put(g_spdk_msg_mempool, msg);
237 
238 		msg = SLIST_FIRST(&thread->msg_cache);
239 	}
240 
241 	assert(thread->msg_cache_count == 0);
242 
243 	spdk_ring_free(thread->messages);
244 	free(thread);
245 }
246 
247 struct spdk_thread *
248 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
249 {
250 	struct spdk_thread *thread;
251 	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
252 	int rc, i;
253 
254 	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
255 	if (!thread) {
256 		SPDK_ERRLOG("Unable to allocate memory for thread\n");
257 		return NULL;
258 	}
259 
260 	thread->cpumask = spdk_cpuset_alloc();
261 	if (!thread->cpumask) {
262 		free(thread);
263 		SPDK_ERRLOG("Unable to allocate memory for CPU mask\n");
264 		return NULL;
265 	}
266 
267 	if (cpumask) {
268 		spdk_cpuset_copy(thread->cpumask, cpumask);
269 	} else {
270 		spdk_cpuset_negate(thread->cpumask);
271 	}
272 
273 	TAILQ_INIT(&thread->io_channels);
274 	TAILQ_INIT(&thread->active_pollers);
275 	TAILQ_INIT(&thread->timer_pollers);
276 	SLIST_INIT(&thread->msg_cache);
277 	thread->msg_cache_count = 0;
278 
279 	thread->tsc_last = spdk_get_ticks();
280 
281 	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
282 	if (!thread->messages) {
283 		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
284 		spdk_cpuset_free(thread->cpumask);
285 		free(thread);
286 		return NULL;
287 	}
288 
289 	/* Fill the local message pool cache. */
290 	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
291 	if (rc == 0) {
292 		/* If we can't populate the cache it's ok. The cache will get filled
293 		 * up organically as messages are passed to the thread. */
294 		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
295 			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
296 			thread->msg_cache_count++;
297 		}
298 	}
299 
300 	if (name) {
301 		thread->name = strdup(name);
302 	} else {
303 		thread->name = spdk_sprintf_alloc("%p", thread);
304 	}
305 
306 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name);
307 
308 	pthread_mutex_lock(&g_devlist_mutex);
309 	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
310 	g_thread_count++;
311 	pthread_mutex_unlock(&g_devlist_mutex);
312 
313 	if (g_new_thread_fn) {
314 		rc = g_new_thread_fn(thread);
315 		if (rc != 0) {
316 			_free_thread(thread);
317 			return NULL;
318 		}
319 	}
320 
321 	return thread;
322 }
323 
/* Bind the given spdk_thread to the calling OS thread (NULL to unbind). */
void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}
329 
/* Destroy a thread, releasing all of its resources via _free_thread().
 * Clears the TLS binding first if the exiting thread is the current one. */
void
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}
341 
342 void *
343 spdk_thread_get_ctx(struct spdk_thread *thread)
344 {
345 	if (g_ctx_sz > 0) {
346 		return thread->ctx;
347 	}
348 
349 	return NULL;
350 }
351 
/* Return the thread's CPU affinity mask (owned by the thread). */
struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return thread->cpumask;
}
357 
358 struct spdk_thread *
359 spdk_thread_get_from_ctx(void *ctx)
360 {
361 	if (ctx == NULL) {
362 		assert(false);
363 		return NULL;
364 	}
365 
366 	assert(g_ctx_sz > 0);
367 
368 	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
369 }
370 
371 static inline uint32_t
372 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
373 {
374 	unsigned count, i;
375 	void *messages[SPDK_MSG_BATCH_SIZE];
376 
377 #ifdef DEBUG
378 	/*
379 	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
380 	 * so we will never actually read uninitialized data from events, but just to be sure
381 	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
382 	 */
383 	memset(messages, 0, sizeof(messages));
384 #endif
385 
386 	if (max_msgs > 0) {
387 		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
388 	} else {
389 		max_msgs = SPDK_MSG_BATCH_SIZE;
390 	}
391 
392 	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
393 	if (count == 0) {
394 		return 0;
395 	}
396 
397 	for (i = 0; i < count; i++) {
398 		struct spdk_msg *msg = messages[i];
399 
400 		assert(msg != NULL);
401 		msg->fn(msg->arg);
402 
403 		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
404 			/* Insert the messages at the head. We want to re-use the hot
405 			 * ones. */
406 			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
407 			thread->msg_cache_count++;
408 		} else {
409 			spdk_mempool_put(g_spdk_msg_mempool, msg);
410 		}
411 	}
412 
413 	return count;
414 }
415 
416 static void
417 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
418 {
419 	struct spdk_poller *iter;
420 
421 	poller->next_run_tick = now + poller->period_ticks;
422 
423 	/*
424 	 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled
425 	 * run time.
426 	 */
427 	TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) {
428 		if (iter->next_run_tick <= poller->next_run_tick) {
429 			TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq);
430 			return;
431 		}
432 	}
433 
434 	/* No earlier pollers were found, so this poller must be the new head */
435 	TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
436 }
437 
/*
 * Drive one iteration of the given thread: drain up to max_msgs queued
 * messages, run each active poller once, then run every timer poller whose
 * deadline has passed.  The thread is temporarily installed as the TLS
 * current thread for the duration of the call.
 *
 * \param thread Thread to poll.
 * \param max_msgs Cap on messages processed this call (0 = default batch).
 * \param now Current tick count, or 0 to sample spdk_get_ticks() here.
 * \return >0 if any work was done, 0 if idle, <0 if a poller reported an error.
 */
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_thread *orig_thread;
	struct spdk_poller *poller, *tmp;
	int rc = 0;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		/* Messages were processed, so this pass counts as busy. */
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		/* Reap pollers unregistered since the previous pass. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

		/* fn() may have unregistered its own poller; free it now. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_WAITING;

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
		}
#endif

		/* Keep the most "positive" result: busy beats idle beats error. */
		if (poller_rc > rc) {
			rc = poller_rc;
		}

	}

	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		}

		/* The list is sorted by deadline; stop at the first not-yet-due poller. */
		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_WAITING;
		/* Re-insert at the position matching the poller's next deadline. */
		TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		_spdk_poller_insert_timer(thread, poller, now);

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller);
		}
#endif

		if (timer_rc > rc) {
			rc = timer_rc;

		}
	}

	/* Attribute the ticks since the last poll to idle/busy/unknown time. */
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += now - thread->tsc_last;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += now - thread->tsc_last;
	} else {
		/* Poller status unknown */
		thread->stats.unknown_tsc += now - thread->tsc_last;
	}
	thread->tsc_last = now;

	tls_thread = orig_thread;

	return rc;
}
545 
546 uint64_t
547 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
548 {
549 	struct spdk_poller *poller;
550 
551 	poller = TAILQ_FIRST(&thread->timer_pollers);
552 	if (poller) {
553 		return poller->next_run_tick;
554 	}
555 
556 	return 0;
557 }
558 
/* Return non-zero if the thread has any active (non-timer) pollers. */
int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}
564 
565 bool
566 spdk_thread_has_pollers(struct spdk_thread *thread)
567 {
568 	if (TAILQ_EMPTY(&thread->active_pollers) &&
569 	    TAILQ_EMPTY(&thread->timer_pollers)) {
570 		return false;
571 	}
572 
573 	return true;
574 }
575 
576 bool
577 spdk_thread_is_idle(struct spdk_thread *thread)
578 {
579 	if (spdk_ring_count(thread->messages) ||
580 	    spdk_thread_has_pollers(thread)) {
581 		return false;
582 	}
583 
584 	return true;
585 }
586 
/* Return the number of spdk_threads currently in existence. */
uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count.  We could acquire the
	 *  lock and iterate through the TAILQ of threads to count them, but that
	 *  count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}
597 
/* Return the calling OS thread's spdk_thread, logging an error if none is bound. */
struct spdk_thread *
spdk_get_thread(void)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
	}

	return thread;
}
610 
/* Return the thread's name (owned by the thread; do not free). */
const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}
616 
617 int
618 spdk_thread_get_stats(struct spdk_thread_stats *stats)
619 {
620 	struct spdk_thread *thread;
621 
622 	thread = _get_thread();
623 	if (!thread) {
624 		SPDK_ERRLOG("No thread allocated\n");
625 		return -EINVAL;
626 	}
627 
628 	if (stats == NULL) {
629 		return -EINVAL;
630 	}
631 
632 	*stats = thread->stats;
633 
634 	return 0;
635 }
636 
637 void
638 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
639 {
640 	struct spdk_thread *local_thread;
641 	struct spdk_msg *msg;
642 	int rc;
643 
644 	if (!thread) {
645 		assert(false);
646 		return;
647 	}
648 
649 	local_thread = _get_thread();
650 
651 	msg = NULL;
652 	if (local_thread != NULL) {
653 		if (local_thread->msg_cache_count > 0) {
654 			msg = SLIST_FIRST(&local_thread->msg_cache);
655 			assert(msg != NULL);
656 			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
657 			local_thread->msg_cache_count--;
658 		}
659 	}
660 
661 	if (msg == NULL) {
662 		msg = spdk_mempool_get(g_spdk_msg_mempool);
663 		if (!msg) {
664 			assert(false);
665 			return;
666 		}
667 	}
668 
669 	msg->fn = fn;
670 	msg->arg = ctx;
671 
672 	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1);
673 	if (rc != 1) {
674 		assert(false);
675 		spdk_mempool_put(g_spdk_msg_mempool, msg);
676 		return;
677 	}
678 }
679 
680 struct spdk_poller *
681 spdk_poller_register(spdk_poller_fn fn,
682 		     void *arg,
683 		     uint64_t period_microseconds)
684 {
685 	struct spdk_thread *thread;
686 	struct spdk_poller *poller;
687 	uint64_t quotient, remainder, ticks;
688 
689 	thread = spdk_get_thread();
690 	if (!thread) {
691 		assert(false);
692 		return NULL;
693 	}
694 
695 	poller = calloc(1, sizeof(*poller));
696 	if (poller == NULL) {
697 		SPDK_ERRLOG("Poller memory allocation failed\n");
698 		return NULL;
699 	}
700 
701 	poller->state = SPDK_POLLER_STATE_WAITING;
702 	poller->fn = fn;
703 	poller->arg = arg;
704 
705 	if (period_microseconds) {
706 		quotient = period_microseconds / SPDK_SEC_TO_USEC;
707 		remainder = period_microseconds % SPDK_SEC_TO_USEC;
708 		ticks = spdk_get_ticks_hz();
709 
710 		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
711 	} else {
712 		poller->period_ticks = 0;
713 	}
714 
715 	if (poller->period_ticks) {
716 		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
717 	} else {
718 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
719 	}
720 
721 	return poller;
722 }
723 
724 void
725 spdk_poller_unregister(struct spdk_poller **ppoller)
726 {
727 	struct spdk_thread *thread;
728 	struct spdk_poller *poller;
729 
730 	poller = *ppoller;
731 	if (poller == NULL) {
732 		return;
733 	}
734 
735 	*ppoller = NULL;
736 
737 	thread = spdk_get_thread();
738 	if (!thread) {
739 		assert(false);
740 		return;
741 	}
742 
743 	/* Simply set the state to unregistered. The poller will get cleaned up
744 	 * in a subsequent call to spdk_thread_poll().
745 	 */
746 	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
747 }
748 
/* Iteration state for spdk_for_each_thread(). */
struct call_thread {
	struct spdk_thread *cur_thread;	/* thread currently executing fn */
	spdk_msg_fn fn;	/* callback run once per thread */
	void *ctx;

	struct spdk_thread *orig_thread;	/* initiator; receives cpl when done */
	spdk_msg_fn cpl;
};
757 
758 static void
759 spdk_on_thread(void *ctx)
760 {
761 	struct call_thread *ct = ctx;
762 
763 	ct->fn(ct->ctx);
764 
765 	pthread_mutex_lock(&g_devlist_mutex);
766 	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
767 	pthread_mutex_unlock(&g_devlist_mutex);
768 
769 	if (!ct->cur_thread) {
770 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");
771 
772 		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
773 		free(ctx);
774 	} else {
775 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
776 			      ct->cur_thread->name);
777 
778 		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
779 	}
780 }
781 
782 void
783 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
784 {
785 	struct call_thread *ct;
786 	struct spdk_thread *thread;
787 
788 	ct = calloc(1, sizeof(*ct));
789 	if (!ct) {
790 		SPDK_ERRLOG("Unable to perform thread iteration\n");
791 		cpl(ctx);
792 		return;
793 	}
794 
795 	ct->fn = fn;
796 	ct->ctx = ctx;
797 	ct->cpl = cpl;
798 
799 	thread = _get_thread();
800 	if (!thread) {
801 		SPDK_ERRLOG("No thread allocated\n");
802 		free(ct);
803 		cpl(ctx);
804 		return;
805 	}
806 	ct->orig_thread = thread;
807 
808 	pthread_mutex_lock(&g_devlist_mutex);
809 	ct->cur_thread = TAILQ_FIRST(&g_threads);
810 	pthread_mutex_unlock(&g_devlist_mutex);
811 
812 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
813 		      ct->orig_thread->name);
814 
815 	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
816 }
817 
818 void
819 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
820 			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
821 			const char *name)
822 {
823 	struct io_device *dev, *tmp;
824 	struct spdk_thread *thread;
825 
826 	assert(io_device != NULL);
827 	assert(create_cb != NULL);
828 	assert(destroy_cb != NULL);
829 
830 	thread = spdk_get_thread();
831 	if (!thread) {
832 		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
833 		assert(false);
834 		return;
835 	}
836 
837 	dev = calloc(1, sizeof(struct io_device));
838 	if (dev == NULL) {
839 		SPDK_ERRLOG("could not allocate io_device\n");
840 		return;
841 	}
842 
843 	dev->io_device = io_device;
844 	if (name) {
845 		dev->name = strdup(name);
846 	} else {
847 		dev->name = spdk_sprintf_alloc("%p", dev);
848 	}
849 	dev->create_cb = create_cb;
850 	dev->destroy_cb = destroy_cb;
851 	dev->unregister_cb = NULL;
852 	dev->ctx_size = ctx_size;
853 	dev->for_each_count = 0;
854 	dev->unregistered = false;
855 	dev->refcnt = 0;
856 
857 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
858 		      dev->name, dev->io_device, thread->name);
859 
860 	pthread_mutex_lock(&g_devlist_mutex);
861 	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
862 		if (tmp->io_device == io_device) {
863 			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
864 				    io_device, tmp->name, dev->name);
865 			free(dev->name);
866 			free(dev);
867 			pthread_mutex_unlock(&g_devlist_mutex);
868 			return;
869 		}
870 	}
871 	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
872 	pthread_mutex_unlock(&g_devlist_mutex);
873 }
874 
/* Final step of device unregistration, run as a message on the thread that
 * called spdk_io_device_unregister(): invoke the user's callback and free
 * the io_device. */
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev->name);
	free(dev);
}
887 
888 static void
889 _spdk_io_device_free(struct io_device *dev)
890 {
891 	if (dev->unregister_cb == NULL) {
892 		free(dev->name);
893 		free(dev);
894 	} else {
895 		assert(dev->unregister_thread != NULL);
896 		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
897 			      dev->name, dev->io_device, dev->unregister_thread->name);
898 		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
899 	}
900 }
901 
/*
 * Begin unregistering an io_device.  The device is removed from the global
 * list immediately; the actual free (and the optional unregister_cb) is
 * deferred until the last io_channel referencing it is released.
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* Refuse to unregister while spdk_for_each_channel() iterations are
	 * still outstanding on this device. */
	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	/* Snapshot the refcount under the lock; channel teardown decides the
	 * final free once it drops to zero (see _spdk_put_io_channel). */
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	_spdk_io_device_free(dev);
}
954 
/*
 * Get (or lazily create) the calling thread's io_channel for io_device.
 * If a channel already exists on this thread, its refcount is bumped and it
 * is returned; otherwise a new channel plus ctx_size bytes of per-channel
 * context are allocated and the device's create_cb is invoked.
 *
 * \return the channel, or NULL if the device is unknown, the caller is not
 *         an SPDK thread, allocation fails, or create_cb reports an error.
 */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 *  thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* Per-channel context is allocated contiguously after the channel struct. */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	/* Invoke the user's create_cb without holding the devlist lock. */
	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		/* Roll back the list insertion and refcount taken above. */
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
1031 
/*
 * Deferred channel teardown, run as a message on the channel's owning
 * thread (queued by spdk_put_io_channel when ref hit zero).  If the channel
 * was re-acquired in the meantime, teardown is abandoned; otherwise the
 * destroy_cb runs and, when this was the device's last channel after
 * unregistration, the io_device itself is freed.
 */
static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 *  after this message was sent but before it had a chance to
		 *  execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	/* Only free the device once it is both unregistered and unreferenced. */
	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}
1088 
1089 void
1090 spdk_put_io_channel(struct spdk_io_channel *ch)
1091 {
1092 	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
1093 		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
1094 		      ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref);
1095 
1096 	ch->ref--;
1097 
1098 	if (ch->ref == 0) {
1099 		ch->destroy_ref++;
1100 		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
1101 	}
1102 }
1103 
/* Recover the spdk_io_channel from a per-channel context pointer; the
 * context is laid out immediately after the channel struct. */
struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}
1109 
/* Return the thread that owns the given channel. */
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}
1115 
/* State for one in-flight spdk_for_each_channel() iteration. */
struct spdk_io_channel_iter {
	void *io_device;	/* key the caller passed in */
	struct io_device *dev;	/* resolved device; holds a for_each_count */
	spdk_channel_msg fn;	/* callback run on each thread owning a channel */
	int status;	/* last status given to spdk_for_each_channel_continue() */
	void *ctx;	/* opaque caller context */
	struct spdk_io_channel *ch;	/* channel for the thread currently visited */

	struct spdk_thread *cur_thread;	/* thread currently being visited */

	struct spdk_thread *orig_thread;	/* initiator; receives cpl */
	spdk_channel_for_each_cpl cpl;
};
1129 
/* Accessor: the io_device key this iteration was started with. */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

/* Accessor: the channel belonging to the thread currently being visited. */
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

/* Accessor: the caller-supplied context pointer. */
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}
1147 
1148 static void
1149 _call_completion(void *ctx)
1150 {
1151 	struct spdk_io_channel_iter *i = ctx;
1152 
1153 	if (i->cpl != NULL) {
1154 		i->cpl(i, i->status);
1155 	}
1156 	free(i);
1157 }
1158 
1159 static void
1160 _call_channel(void *ctx)
1161 {
1162 	struct spdk_io_channel_iter *i = ctx;
1163 	struct spdk_io_channel *ch;
1164 
1165 	/*
1166 	 * It is possible that the channel was deleted before this
1167 	 *  message had a chance to execute.  If so, skip calling
1168 	 *  the fn() on this thread.
1169 	 */
1170 	pthread_mutex_lock(&g_devlist_mutex);
1171 	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
1172 		if (ch->dev->io_device == i->io_device) {
1173 			break;
1174 		}
1175 	}
1176 	pthread_mutex_unlock(&g_devlist_mutex);
1177 
1178 	if (ch) {
1179 		i->fn(i);
1180 	} else {
1181 		spdk_for_each_channel_continue(i, 0);
1182 	}
1183 }
1184 
/*
 * Call fn once on each thread that currently holds a channel for io_device
 * (the per-thread hop is driven by spdk_for_each_channel_continue), then
 * deliver cpl on the originating thread.
 *
 * NOTE(review): if the iterator allocation fails, cpl is never invoked —
 * unlike spdk_for_each_thread, which calls its completion on failure.
 * Callers waiting on cpl would stall; confirm whether this is intended.
 */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread holding a channel for this device and start
	 * the iteration there; for_each_count pins the device until done. */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist for this device: complete immediately. */
	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
1225 
/*
 * Advance a spdk_for_each_channel() iteration: record status, then either
 * hop to the next thread holding a channel for the device or, when none
 * remain (or status is non-zero), finish by sending the completion to the
 * originating thread.  Must be called on the thread currently being visited.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	/* A non-zero status aborts the iteration early. */
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	/* Iteration over: release the pin taken by spdk_for_each_channel(). */
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
1261 
1262 
/* Register the "thread" log component used by the SPDK_DEBUGLOG calls above. */
SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)
1264