xref: /spdk/lib/thread/thread.c (revision 2ad757339358ac2c75123d7f397456580be0c525)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/queue.h"
38 #include "spdk/string.h"
39 #include "spdk/thread.h"
40 #include "spdk/util.h"
41 
42 #include "spdk_internal/log.h"
43 #include "spdk_internal/thread.h"
44 
45 #define SPDK_MSG_BATCH_SIZE		8
46 #define SPDK_MAX_DEVICE_NAME_LEN	256
47 #define SPDK_MAX_THREAD_NAME_LEN	256
48 
49 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;
50 
51 static spdk_new_thread_fn g_new_thread_fn = NULL;
52 static size_t g_ctx_sz = 0;
53 
/* Bookkeeping for one registered I/O device (see spdk_io_device_register()). */
struct io_device {
	/* Opaque pointer supplied by the user; the lookup key in g_io_devices. */
	void				*io_device;
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	/* Callbacks that create/destroy the per-thread channel context. */
	spdk_io_channel_create_cb	create_cb;
	spdk_io_channel_destroy_cb	destroy_cb;
	/* Optional callback invoked once unregistration fully completes. */
	spdk_io_device_unregister_cb	unregister_cb;
	/* Thread that called spdk_io_device_unregister(); unregister_cb runs there. */
	struct spdk_thread		*unregister_thread;
	/* Size of the per-channel user context allocated after each spdk_io_channel. */
	uint32_t			ctx_size;
	/* Number of spdk_for_each_channel() iterations currently in flight. */
	uint32_t			for_each_count;
	TAILQ_ENTRY(io_device)		tailq;

	/* Number of io_channels currently open against this device (g_devlist_mutex). */
	uint32_t			refcnt;

	/* True once spdk_io_device_unregister() has been called. */
	bool				unregistered;
};
69 
70 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
71 
/* A single message queued to a thread via spdk_thread_send_msg(). */
struct spdk_msg {
	spdk_msg_fn		fn;
	void			*arg;

	/* Linkage for the per-thread msg_cache free list. */
	SLIST_ENTRY(spdk_msg)	link;
};
78 
79 #define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
80 static struct spdk_mempool *g_spdk_msg_mempool = NULL;
81 
/* Lifecycle states of a poller; transitions happen only on the owning thread. */
enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,
};
92 
/* A registered poller; owned and executed exclusively by one thread. */
struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	/* Interval between runs in ticks; 0 for active (every-iteration) pollers. */
	uint64_t			period_ticks;
	/* Next scheduled run time in ticks; meaningful only for timed pollers. */
	uint64_t			next_run_tick;
	spdk_poller_fn			fn;
	void				*arg;
};
104 
/* A lightweight SPDK thread; driven externally via spdk_thread_poll(). */
struct spdk_thread {
	/* I/O channels created on this thread (protected by g_devlist_mutex). */
	TAILQ_HEAD(, spdk_io_channel)	io_channels;
	/* Linkage in the global g_threads list. */
	TAILQ_ENTRY(spdk_thread)	tailq;
	char				name[SPDK_MAX_THREAD_NAME_LEN + 1];

	/* Set by spdk_thread_exit(); polling loops stop early once true. */
	bool				exit;

	struct spdk_cpuset		cpumask;

	/* Tick count at the end of the last spdk_thread_poll() iteration. */
	uint64_t			tsc_last;
	/* Accumulated busy/idle tick statistics. */
	struct spdk_thread_stats	stats;

	/*
	 * Contains pollers actively running on this thread.  Pollers
	 *  are run round-robin. The thread takes one poller from the head
	 *  of the ring, executes it, then puts it back at the tail of
	 *  the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;

	/**
	 * Contains pollers running on this thread with a periodic timer.
	 * Kept sorted by next_run_tick (earliest first).
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller)	timer_pollers;

	/* Multi-producer/single-consumer ring of inbound spdk_msg pointers. */
	struct spdk_ring		*messages;

	/* Local free list of spdk_msg objects to avoid mempool contention. */
	SLIST_HEAD(, spdk_msg)		msg_cache;
	size_t				msg_cache_count;

	/* User context allocated at the end */
	uint8_t				ctx[0];
};
138 
139 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
140 static uint32_t g_thread_count = 0;
141 
142 static __thread struct spdk_thread *tls_thread = NULL;
143 
/* Return the SPDK thread bound to the calling OS thread, or NULL. */
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}
149 
150 int
151 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
152 {
153 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
154 
155 	assert(g_new_thread_fn == NULL);
156 	g_new_thread_fn = new_thread_fn;
157 
158 	g_ctx_sz = ctx_sz;
159 
160 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
161 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
162 			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
163 			     sizeof(struct spdk_msg),
164 			     0, /* No cache. We do our own. */
165 			     SPDK_ENV_SOCKET_ID_ANY);
166 
167 	if (!g_spdk_msg_mempool) {
168 		return -1;
169 	}
170 
171 	return 0;
172 }
173 
174 void
175 spdk_thread_lib_fini(void)
176 {
177 	struct io_device *dev;
178 
179 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
180 		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
181 	}
182 
183 	if (g_spdk_msg_mempool) {
184 		spdk_mempool_free(g_spdk_msg_mempool);
185 		g_spdk_msg_mempool = NULL;
186 	}
187 
188 	g_new_thread_fn = NULL;
189 	g_ctx_sz = 0;
190 }
191 
/* Release every resource owned by a thread and remove it from g_threads.
 * Channels or waiting pollers still present indicate callers that did not
 * clean up before the thread was destroyed, so they are logged.
 */
static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	/* Free all pollers; only WAITING ones are unexpected enough to warn about. */
	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state == SPDK_POLLER_STATE_WAITING) {
			SPDK_WARNLOG("poller %p still registered at thread exit\n",
				     poller);
		}

		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}


	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) {
		if (poller->state == SPDK_POLLER_STATE_WAITING) {
			SPDK_WARNLOG("poller %p still registered at thread exit\n",
				     poller);
		}

		TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		free(poller);
	}

	/* g_devlist_mutex also guards the global thread list and count. */
	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Drain the local message cache back into the global mempool. */
	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	spdk_ring_free(thread->messages);
	free(thread);
}
247 
/* Allocate and register a new SPDK thread.
 *
 * name    - optional human-readable name; defaults to the pointer value.
 * cpumask - optional CPU affinity; defaults to all CPUs (negate of empty set).
 *
 * Returns the new thread, or NULL if memory/ring allocation fails or the
 * g_new_thread_fn hook rejects the thread.
 */
struct spdk_thread *
spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc, i;

	/* Extra g_ctx_sz bytes back the flexible ctx[] member. */
	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	TAILQ_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	TAILQ_INIT(&thread->timer_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name);

	/* Publish the thread before invoking the hook, so the hook can see it. */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
		if (rc != 0) {
			/* _free_thread also removes it from g_threads again. */
			_free_thread(thread);
			return NULL;
		}
	}

	return thread;
}
316 
/* Bind the given SPDK thread to the calling OS thread (thread-local). */
void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}
322 
/* Mark a thread as exiting.  Must be called on the thread itself; the
 * thread is freed later by spdk_thread_destroy(), not here.
 */
void
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	thread->exit = true;
}
332 
333 void
334 spdk_thread_destroy(struct spdk_thread *thread)
335 {
336 	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name);
337 
338 	assert(thread->exit == true);
339 
340 	if (tls_thread == thread) {
341 		tls_thread = NULL;
342 	}
343 
344 	_free_thread(thread);
345 }
346 
347 void *
348 spdk_thread_get_ctx(struct spdk_thread *thread)
349 {
350 	if (g_ctx_sz > 0) {
351 		return thread->ctx;
352 	}
353 
354 	return NULL;
355 }
356 
/* Return the thread's CPU affinity mask (owned by the thread; do not free). */
struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}
362 
363 struct spdk_thread *
364 spdk_thread_get_from_ctx(void *ctx)
365 {
366 	if (ctx == NULL) {
367 		assert(false);
368 		return NULL;
369 	}
370 
371 	assert(g_ctx_sz > 0);
372 
373 	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
374 }
375 
/* Dequeue up to max_msgs messages (0 selects the default batch size) from
 * the thread's ring and execute them.  Returns the number dequeued.
 */
static inline uint32_t
_spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		if (thread->exit) {
			/* NOTE(review): breaking here means this msg and any
			 * remaining dequeued messages (i+1..count-1) are never
			 * recycled to the cache or mempool — verify this leak
			 * on thread exit is intentional.
			 */
			break;
		}

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}
424 
/* Schedule a timed poller: compute its next run tick relative to 'now' and
 * insert it into the thread's timer_pollers list, which is kept sorted by
 * next_run_tick.  Reverse iteration places equal deadlines after existing
 * entries, preserving FIFO order among ties.
 */
static void
_spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled
	 * run time.
	 */
	TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
}
446 
/* Run one iteration of a thread: drain a batch of messages, run every
 * active poller once, and run any expired timed pollers.
 *
 * max_msgs - max messages to process (0 = default batch size).
 * now      - current tick count, or 0 to sample spdk_get_ticks().
 *
 * Returns the highest positive poller return value (busy), 1 if only
 * messages were processed, 0 if idle, or a negative poller value.
 * Also accumulates busy/idle tick statistics against tsc_last.
 */
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_thread *orig_thread;
	struct spdk_poller *poller, *tmp;
	int rc = 0;

	/* Temporarily make 'thread' current so callbacks see the right TLS. */
	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		if (thread->exit) {
			break;
		}

		/* Pollers unregistered outside their own fn are reaped here. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

		/* fn may have unregistered its own poller; free it now. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_WAITING;

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
		}
#endif

		if (poller_rc > rc) {
			rc = poller_rc;
		}

	}

	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (thread->exit) {
			break;
		}

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		}

		/* List is sorted by deadline, so the first unexpired entry ends the scan. */
		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		}

		/* Re-arm: move the poller to its new sorted position. */
		poller->state = SPDK_POLLER_STATE_WAITING;
		TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		_spdk_poller_insert_timer(thread, poller, now);

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller);
		}
#endif

		if (timer_rc > rc) {
			rc = timer_rc;

		}
	}

	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += now - thread->tsc_last;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += now - thread->tsc_last;
	}
	thread->tsc_last = now;

	tls_thread = orig_thread;

	return rc;
}
559 
560 uint64_t
561 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
562 {
563 	struct spdk_poller *poller;
564 
565 	poller = TAILQ_FIRST(&thread->timer_pollers);
566 	if (poller) {
567 		return poller->next_run_tick;
568 	}
569 
570 	return 0;
571 }
572 
573 int
574 spdk_thread_has_active_pollers(struct spdk_thread *thread)
575 {
576 	return !TAILQ_EMPTY(&thread->active_pollers);
577 }
578 
579 bool
580 spdk_thread_has_pollers(struct spdk_thread *thread)
581 {
582 	if (TAILQ_EMPTY(&thread->active_pollers) &&
583 	    TAILQ_EMPTY(&thread->timer_pollers)) {
584 		return false;
585 	}
586 
587 	return true;
588 }
589 
590 bool
591 spdk_thread_is_idle(struct spdk_thread *thread)
592 {
593 	if (spdk_ring_count(thread->messages) ||
594 	    spdk_thread_has_pollers(thread)) {
595 		return false;
596 	}
597 
598 	return true;
599 }
600 
/* Return the number of spdk_threads currently in existence. */
uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count.  We could acquire the
	 *  lock and iterate through the TAILQ of threads to count them, but that
	 *  count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}
611 
612 struct spdk_thread *
613 spdk_get_thread(void)
614 {
615 	struct spdk_thread *thread;
616 
617 	thread = _get_thread();
618 	if (!thread) {
619 		SPDK_ERRLOG("No thread allocated\n");
620 	}
621 
622 	return thread;
623 }
624 
/* Return the thread's name (valid for the lifetime of the thread). */
const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}
630 
631 int
632 spdk_thread_get_stats(struct spdk_thread_stats *stats)
633 {
634 	struct spdk_thread *thread;
635 
636 	thread = _get_thread();
637 	if (!thread) {
638 		SPDK_ERRLOG("No thread allocated\n");
639 		return -EINVAL;
640 	}
641 
642 	if (stats == NULL) {
643 		return -EINVAL;
644 	}
645 
646 	*stats = thread->stats;
647 
648 	return 0;
649 }
650 
/* Queue fn(ctx) for execution on the given thread.  Safe to call from any
 * thread: the message object is taken from the caller's local cache when
 * possible, falling back to the global mempool.
 *
 * Returns 0 on success, -ENOMEM if no message object is available, or
 * -EIO if the destination ring rejects the enqueue.
 */
int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	local_thread = _get_thread();

	/* Prefer the calling thread's lock-free cache over the shared mempool. */
	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		/* Cache-sourced messages originate from the mempool, so this is safe. */
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return 0;
}
692 
/* Register a poller on the calling thread.
 *
 * period_microseconds == 0 registers an active poller that runs on every
 * spdk_thread_poll() iteration; a nonzero period registers a timed poller.
 *
 * Returns the new poller, or NULL on allocation failure or when called
 * off an SPDK thread.  Free it only via spdk_poller_unregister().
 */
struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;
	uint64_t quotient, remainder, ticks;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;

	if (period_microseconds) {
		/* Split the period into whole seconds and the remainder to avoid
		 * overflowing the microseconds-to-ticks conversion.
		 */
		quotient = period_microseconds / SPDK_SEC_TO_USEC;
		remainder = period_microseconds % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		poller->period_ticks = 0;
	}

	if (poller->period_ticks) {
		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}

	return poller;
}
736 
737 void
738 spdk_poller_unregister(struct spdk_poller **ppoller)
739 {
740 	struct spdk_thread *thread;
741 	struct spdk_poller *poller;
742 
743 	poller = *ppoller;
744 	if (poller == NULL) {
745 		return;
746 	}
747 
748 	*ppoller = NULL;
749 
750 	thread = spdk_get_thread();
751 	if (!thread) {
752 		assert(false);
753 		return;
754 	}
755 
756 	/* Simply set the state to unregistered. The poller will get cleaned up
757 	 * in a subsequent call to spdk_thread_poll().
758 	 */
759 	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
760 }
761 
/* Iterator state for spdk_for_each_thread(): fn runs once on each thread
 * in g_threads, then cpl runs on the thread that started the iteration.
 */
struct call_thread {
	/* Thread currently executing fn; advanced under g_devlist_mutex. */
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	/* Thread that started the iteration; receives cpl when done. */
	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};
770 
/* Message handler for spdk_for_each_thread(): run ct->fn on the current
 * thread, then hop to the next thread, or — when the list is exhausted —
 * send the completion back to the originating thread and free the iterator.
 */
static void
spdk_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;

	ct->fn(ct->ctx);

	/* The lock protects traversal of the global thread list. */
	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");

		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
	}
}
794 
/* Run fn(ctx) once on every SPDK thread, then cpl(ctx) on the calling
 * thread.  On allocation failure or when called off an SPDK thread, cpl
 * is invoked synchronously instead and fn never runs.
 */
void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	/* The caller is itself on g_threads, so the list is non-empty here. */
	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
}
830 
/* Register an I/O device so threads can create channels against it.
 *
 * io_device  - opaque key; must be unique among registered devices.
 * create_cb  - fills the per-thread channel context (ctx_size bytes).
 * destroy_cb - tears that context down.
 * name       - optional label for logging; defaults to the pointer value.
 *
 * Must be called from an SPDK thread.  Errors (duplicate key, allocation
 * failure) are logged and the registration is silently dropped.
 */
void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	/* Check for duplicates and insert atomically under the same lock. */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}
886 
/* Runs on the thread that called spdk_io_device_unregister(): invoke the
 * user's unregister callback, then free the device bookkeeping.
 */
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev);
}
898 
/* Final teardown of an io_device: free it immediately when no unregister
 * callback was supplied, otherwise defer the callback (and the free) to
 * the thread that requested the unregister.
 */
static void
_spdk_io_device_free(struct io_device *dev)
{
	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
	}
}
911 
/* Unregister an I/O device.  The device is removed from the global list
 * immediately, but if channels are still open its final teardown (and the
 * optional unregister_cb) is deferred until the last channel is released.
 * Must be called from an SPDK thread; refuses to proceed while
 * spdk_for_each_channel() iterations are outstanding.
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* Snapshot refcnt under the lock; new channels can no longer be
	 * created once the device leaves g_io_devices.
	 */
	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion: _spdk_put_io_channel() frees the device
		 * when the last channel is destroyed.
		 */
		return;
	}

	_spdk_io_device_free(dev);
}
964 
/* Get (or create) the calling thread's I/O channel for a registered
 * io_device.  A channel already open on this thread is returned with its
 * reference count bumped; otherwise a new channel plus ctx_size bytes of
 * user context is allocated and initialized via the device's create_cb.
 * Returns NULL if the device is unknown, the caller is not an SPDK
 * thread, allocation fails, or create_cb reports an error.
 */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 *  thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* User channel context is allocated contiguously after the channel. */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	/* Drop the lock before calling into user code. */
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		/* Roll back the insertion and refcnt taken above. */
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
1041 
/* Deferred channel destruction, executed on the channel's owning thread.
 * If the channel was re-referenced after the destroy message was sent,
 * the teardown is abandoned.  Otherwise the channel is removed, its
 * destroy_cb runs (without the devlist lock), and — when this was the
 * last channel of an unregistered device — the device itself is freed.
 */
static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 *  after this message was sent but before it had a chance to
		 *  execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	/* Only free the device if it is both unregistered and unreferenced. */
	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}
1098 
/* Drop a reference to an I/O channel.  When the last reference is
 * released, destruction is deferred via a message to the channel's owning
 * thread so that destroy_cb always runs there.
 */
void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		/* destroy_ref keeps the teardown message and any racing
		 * re-references coordinated in _spdk_put_io_channel(). */
		ch->destroy_ref++;
		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
	}
}
1113 
1114 struct spdk_io_channel *
1115 spdk_io_channel_from_ctx(void *ctx)
1116 {
1117 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
1118 }
1119 
/* Return the thread on which this channel was created. */
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}
1125 
/* Iterator state for spdk_for_each_channel(): visits each thread that has
 * a channel for io_device, then reports the final status back to the
 * originating thread.
 */
struct spdk_io_channel_iter {
	/* Registration key of the device whose channels are visited. */
	void *io_device;
	struct io_device *dev;
	/* Callback run once per channel. */
	spdk_channel_msg fn;
	/* Status reported by spdk_for_each_channel_continue(); nonzero aborts. */
	int status;
	void *ctx;
	/* Channel being visited on the current thread. */
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	/* Thread that started the iteration; receives cpl at the end. */
	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};
1139 
/* Return the io_device key the iteration was started with. */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}
1145 
/* Return the channel currently being visited by the iteration. */
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}
1151 
/* Return the user context passed to spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}
1157 
1158 static void
1159 _call_completion(void *ctx)
1160 {
1161 	struct spdk_io_channel_iter *i = ctx;
1162 
1163 	if (i->cpl != NULL) {
1164 		i->cpl(i, i->status);
1165 	}
1166 	free(i);
1167 }
1168 
/* Per-thread step of a channel iteration: run fn on this thread's channel
 * if the channel still exists, otherwise advance past this thread.
 */
static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 *  message had a chance to execute.  If so, skip calling
	 *  the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		/* fn is responsible for calling continue; do it on its behalf. */
		spdk_for_each_channel_continue(i, 0);
	}
}
1194 
/* Run fn once on every existing channel of io_device, each on the
 * channel's owning thread, then run cpl on the calling thread.  fn must
 * call spdk_for_each_channel_continue() to advance the iteration.  If no
 * channel exists (or the iterator cannot be allocated, in which case cpl
 * is silently skipped), the completion is delivered immediately.
 */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread holding a channel for this device. */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				/* for_each_count blocks device unregistration mid-iteration. */
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist: complete immediately on the calling thread. */
	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
1238 
/* Advance a channel iteration from inside fn: record status, then either
 * hop to the next thread holding a channel for the io_device or finish by
 * sending the completion to the originating thread.  A nonzero status
 * aborts the iteration early.  Must be called on the current thread.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	/* Iteration over (or aborted): allow the device to unregister again. */
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
1274 
1275 
1276 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)
1277