xref: /spdk/lib/thread/thread.c (revision ae7b5890ef728af40bd233a5011b924c482603bf)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/queue.h"
38 #include "spdk/string.h"
39 #include "spdk/thread.h"
40 #include "spdk/util.h"
41 
42 #include "spdk_internal/log.h"
43 #include "spdk_internal/thread.h"
44 
45 #define SPDK_MSG_BATCH_SIZE		8
46 #define SPDK_MAX_DEVICE_NAME_LEN	256
47 #define SPDK_MAX_THREAD_NAME_LEN	256
48 
49 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;
50 
51 static spdk_new_thread_fn g_new_thread_fn = NULL;
52 static size_t g_ctx_sz = 0;
53 
/* Internal bookkeeping for a registered I/O device.  Keyed by the opaque
 * io_device pointer the user passed to spdk_io_device_register(). */
struct io_device {
	void				*io_device;	/* Unique key supplied at registration. */
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];	/* Human-readable name for logs. */
	spdk_io_channel_create_cb	create_cb;	/* Builds per-thread channel context. */
	spdk_io_channel_destroy_cb	destroy_cb;	/* Tears down per-thread channel context. */
	spdk_io_device_unregister_cb	unregister_cb;	/* Optional; called when unregistration completes. */
	struct spdk_thread		*unregister_thread;	/* Thread that requested unregistration. */
	uint32_t			ctx_size;	/* Extra bytes allocated after each spdk_io_channel. */
	uint32_t			for_each_count;	/* Outstanding spdk_for_each_channel() iterations. */
	TAILQ_ENTRY(io_device)		tailq;	/* Link in g_io_devices. */

	uint32_t			refcnt;	/* Number of live channels; guarded by g_devlist_mutex. */

	bool				unregistered;	/* Set once spdk_io_device_unregister() has run. */
};
69 
70 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
71 
/* A single cross-thread message: a function plus its argument.  Allocated
 * from g_spdk_msg_mempool (or a thread's local cache) and passed through
 * the target thread's message ring. */
struct spdk_msg {
	spdk_msg_fn		fn;	/* Callback to execute on the target thread. */
	void			*arg;	/* Opaque argument forwarded to fn. */

	SLIST_ENTRY(spdk_msg)	link;	/* Link in a thread's local msg_cache. */
};
78 
79 #define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
80 static struct spdk_mempool *g_spdk_msg_mempool = NULL;
81 
82 enum spdk_poller_state {
83 	/* The poller is registered with a thread but not currently executing its fn. */
84 	SPDK_POLLER_STATE_WAITING,
85 
86 	/* The poller is currently running its fn. */
87 	SPDK_POLLER_STATE_RUNNING,
88 
89 	/* The poller was unregistered during the execution of its fn. */
90 	SPDK_POLLER_STATE_UNREGISTERED,
91 };
92 
struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;	/* Link in active_pollers or timer_pollers. */

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	uint64_t			period_ticks;	/* 0 for active pollers; > 0 for timed pollers. */
	uint64_t			next_run_tick;	/* Next scheduled tick (timed pollers only). */
	spdk_poller_fn			fn;	/* User callback. */
	void				*arg;	/* Opaque argument forwarded to fn. */
};
104 
struct spdk_thread {
	TAILQ_HEAD(, spdk_io_channel)	io_channels;	/* Channels owned by this thread. */
	TAILQ_ENTRY(spdk_thread)	tailq;	/* Link in g_threads; guarded by g_devlist_mutex. */
	char				name[SPDK_MAX_THREAD_NAME_LEN + 1];

	/* Set by spdk_thread_exit(); checked in message/poller loops to stop early. */
	bool				exit;

	struct spdk_cpuset		cpumask;	/* Cores this thread may be scheduled on. */

	uint64_t			tsc_last;	/* Timestamp of the previous spdk_thread_poll(). */
	struct spdk_thread_stats	stats;	/* Accumulated busy/idle tick counters. */

	/*
	 * Contains pollers actively running on this thread.  Pollers
	 *  are run round-robin. The thread takes one poller from the head
	 *  of the ring, executes it, then puts it back at the tail of
	 *  the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;

	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller)	timer_pollers;

	/* Incoming message queue (MP/SC): any thread enqueues, only this thread dequeues. */
	struct spdk_ring		*messages;

	/* Local free-list of spdk_msg objects to avoid mempool round trips. */
	SLIST_HEAD(, spdk_msg)		msg_cache;
	size_t				msg_cache_count;

	/* User context allocated at the end */
	uint8_t				ctx[0];
};
138 
139 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
140 static uint32_t g_thread_count = 0;
141 
142 static __thread struct spdk_thread *tls_thread = NULL;
143 
/* Return the calling OS thread's current SPDK thread (may be NULL). */
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}
149 
150 int
151 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
152 {
153 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
154 
155 	assert(g_new_thread_fn == NULL);
156 	g_new_thread_fn = new_thread_fn;
157 
158 	g_ctx_sz = ctx_sz;
159 
160 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
161 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
162 			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
163 			     sizeof(struct spdk_msg),
164 			     0, /* No cache. We do our own. */
165 			     SPDK_ENV_SOCKET_ID_ANY);
166 
167 	if (!g_spdk_msg_mempool) {
168 		return -1;
169 	}
170 
171 	return 0;
172 }
173 
/* Release library-wide state.  Complains (but does not free) any io_device
 * still registered — those are leaked by their owners, not by us. */
void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}

	/* Reset init-time globals so the library can be initialized again. */
	g_new_thread_fn = NULL;
	g_ctx_sz = 0;
}
191 
192 static void
193 _free_thread(struct spdk_thread *thread)
194 {
195 	struct spdk_io_channel *ch;
196 	struct spdk_msg *msg;
197 	struct spdk_poller *poller, *ptmp;
198 
199 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
200 		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
201 			    thread->name, ch->dev->name);
202 	}
203 
204 	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
205 		if (poller->state == SPDK_POLLER_STATE_WAITING) {
206 			SPDK_WARNLOG("poller %p still registered at thread exit\n",
207 				     poller);
208 		}
209 
210 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
211 		free(poller);
212 	}
213 
214 
215 	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) {
216 		if (poller->state == SPDK_POLLER_STATE_WAITING) {
217 			SPDK_WARNLOG("poller %p still registered at thread exit\n",
218 				     poller);
219 		}
220 
221 		TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
222 		free(poller);
223 	}
224 
225 	pthread_mutex_lock(&g_devlist_mutex);
226 	assert(g_thread_count > 0);
227 	g_thread_count--;
228 	TAILQ_REMOVE(&g_threads, thread, tailq);
229 	pthread_mutex_unlock(&g_devlist_mutex);
230 
231 	msg = SLIST_FIRST(&thread->msg_cache);
232 	while (msg != NULL) {
233 		SLIST_REMOVE_HEAD(&thread->msg_cache, link);
234 
235 		assert(thread->msg_cache_count > 0);
236 		thread->msg_cache_count--;
237 		spdk_mempool_put(g_spdk_msg_mempool, msg);
238 
239 		msg = SLIST_FIRST(&thread->msg_cache);
240 	}
241 
242 	assert(thread->msg_cache_count == 0);
243 
244 	spdk_ring_free(thread->messages);
245 	free(thread);
246 }
247 
/*
 * Allocate and register a new lightweight SPDK thread.
 *
 * \param name Optional display name; defaults to the pointer value.
 * \param cpumask Optional affinity mask; defaults to "all cores" (negated empty set).
 * \return the new thread, or NULL on allocation failure or if the
 *         g_new_thread_fn hook rejects it.
 */
struct spdk_thread *
spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc, i;

	/* Single allocation covers the thread plus g_ctx_sz bytes of user context. */
	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		/* Negating the zeroed set yields an all-cores mask. */
		spdk_cpuset_negate(&thread->cpumask);
	}

	TAILQ_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	TAILQ_INIT(&thread->timer_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	/* Multi-producer/single-consumer: any thread may send, only this one receives. */
	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Give the application scheduler a chance to veto the new thread. */
	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
		if (rc != 0) {
			_free_thread(thread);
			return NULL;
		}
	}

	return thread;
}
316 
/* Bind the given SPDK thread to the calling OS thread (thread-local). */
void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}
322 
/* Mark the current thread as exiting.  Actual teardown happens later in
 * spdk_thread_destroy(); the flag just stops message/poller processing. */
void
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name);

	/* Must be called on the thread being exited. */
	assert(tls_thread == thread);

	thread->exit = true;
}
332 
/* Free a thread previously marked with spdk_thread_exit(). */
void
spdk_thread_destroy(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name);

	assert(thread->exit == true);

	/* Clear the TLS binding if we are destroying our own thread. */
	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}
346 
347 void *
348 spdk_thread_get_ctx(struct spdk_thread *thread)
349 {
350 	if (g_ctx_sz > 0) {
351 		return thread->ctx;
352 	}
353 
354 	return NULL;
355 }
356 
/* Return a pointer to the thread's affinity mask (owned by the thread). */
struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}
362 
/* Inverse of spdk_thread_get_ctx(): recover the spdk_thread that owns a
 * user-context pointer.  Only meaningful when g_ctx_sz > 0. */
struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	/* ctx is the trailing flexible member, so step back to the container. */
	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}
375 
/*
 * Dequeue and run up to max_msgs (capped at SPDK_MSG_BATCH_SIZE) messages
 * from the thread's ring.  Returns the number of messages dequeued.
 * max_msgs == 0 means "use the full batch size".
 */
static inline uint32_t
_spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		/* A message may have requested thread exit; stop immediately.
		 * NOTE(review): messages[i+1..count-1] are neither run nor
		 * recycled on this path — confirm intentional. */
		if (thread->exit) {
			break;
		}

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}
424 
/* Schedule a timed poller: compute its next run tick and insert it into the
 * thread's timer list, which is kept sorted ascending by next_run_tick. */
static void
_spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled
	 * run time.
	 */
	/* Scan from the tail since a freshly rescheduled poller usually
	 * expires later than most entries already queued. */
	TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
}
446 
/*
 * Run one iteration of a thread: drain up to max_msgs messages, run every
 * active poller once, then run any timed pollers whose deadline has passed.
 *
 * \param max_msgs 0 means "default batch size".
 * \param now 0 means "read the TSC here".
 * \return 0 if nothing ran (idle), > 0 if any work was done (busy),
 *         or the largest positive poller return value.
 */
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_thread *orig_thread;
	struct spdk_poller *poller, *tmp;
	int rc = 0;

	/* Temporarily make `thread` current so callbacks see the right TLS. */
	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		if (thread->exit) {
			break;
		}

		/* Lazily reap pollers unregistered on a previous iteration. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

		/* The poller may have unregistered itself inside fn(). */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_WAITING;

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
		}
#endif

		/* Report the most "busy" status seen across all pollers. */
		if (poller_rc > rc) {
			rc = poller_rc;
		}

	}

	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (thread->exit) {
			break;
		}

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		}

		/* List is sorted by deadline, so the first unexpired poller
		 * means all remaining ones are unexpired too. */
		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_WAITING;
		/* Re-insert with a fresh deadline relative to `now`. */
		TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		_spdk_poller_insert_timer(thread, poller, now);

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller);
		}
#endif

		if (timer_rc > rc) {
			rc = timer_rc;

		}
	}

	/* Attribute the elapsed ticks since the last poll to idle or busy time. */
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += now - thread->tsc_last;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += now - thread->tsc_last;
	}
	thread->tsc_last = now;

	tls_thread = orig_thread;

	return rc;
}
559 
560 uint64_t
561 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
562 {
563 	struct spdk_poller *poller;
564 
565 	poller = TAILQ_FIRST(&thread->timer_pollers);
566 	if (poller) {
567 		return poller->next_run_tick;
568 	}
569 
570 	return 0;
571 }
572 
573 int
574 spdk_thread_has_active_pollers(struct spdk_thread *thread)
575 {
576 	return !TAILQ_EMPTY(&thread->active_pollers);
577 }
578 
579 bool
580 spdk_thread_has_pollers(struct spdk_thread *thread)
581 {
582 	if (TAILQ_EMPTY(&thread->active_pollers) &&
583 	    TAILQ_EMPTY(&thread->timer_pollers)) {
584 		return false;
585 	}
586 
587 	return true;
588 }
589 
590 bool
591 spdk_thread_is_idle(struct spdk_thread *thread)
592 {
593 	if (spdk_ring_count(thread->messages) ||
594 	    spdk_thread_has_pollers(thread)) {
595 		return false;
596 	}
597 
598 	return true;
599 }
600 
/* Return the number of spdk_threads currently registered. */
uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count.  We could acquire the
	 *  lock and iterate through the TAILQ of threads to count them, but that
	 *  count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}
611 
612 struct spdk_thread *
613 spdk_get_thread(void)
614 {
615 	struct spdk_thread *thread;
616 
617 	thread = _get_thread();
618 	if (!thread) {
619 		SPDK_ERRLOG("No thread allocated\n");
620 	}
621 
622 	return thread;
623 }
624 
/* Return the thread's display name (owned by the thread, NUL-terminated). */
const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}
630 
631 int
632 spdk_thread_get_stats(struct spdk_thread_stats *stats)
633 {
634 	struct spdk_thread *thread;
635 
636 	thread = _get_thread();
637 	if (!thread) {
638 		SPDK_ERRLOG("No thread allocated\n");
639 		return -EINVAL;
640 	}
641 
642 	if (stats == NULL) {
643 		return -EINVAL;
644 	}
645 
646 	*stats = thread->stats;
647 
648 	return 0;
649 }
650 
651 void
652 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
653 {
654 	struct spdk_thread *local_thread;
655 	struct spdk_msg *msg;
656 	int rc;
657 
658 	if (!thread) {
659 		assert(false);
660 		return;
661 	}
662 
663 	local_thread = _get_thread();
664 
665 	msg = NULL;
666 	if (local_thread != NULL) {
667 		if (local_thread->msg_cache_count > 0) {
668 			msg = SLIST_FIRST(&local_thread->msg_cache);
669 			assert(msg != NULL);
670 			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
671 			local_thread->msg_cache_count--;
672 		}
673 	}
674 
675 	if (msg == NULL) {
676 		msg = spdk_mempool_get(g_spdk_msg_mempool);
677 		if (!msg) {
678 			assert(false);
679 			return;
680 		}
681 	}
682 
683 	msg->fn = fn;
684 	msg->arg = ctx;
685 
686 	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
687 	if (rc != 1) {
688 		assert(false);
689 		spdk_mempool_put(g_spdk_msg_mempool, msg);
690 		return;
691 	}
692 }
693 
/*
 * Register a poller on the calling thread.
 *
 * \param period_microseconds 0 registers an active (run-every-iteration)
 *        poller; nonzero registers a timed poller with that period.
 * \return the new poller, or NULL if allocation fails or no SPDK thread
 *         is bound to the caller.
 */
struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;
	uint64_t quotient, remainder, ticks;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;

	if (period_microseconds) {
		/* Split the conversion into whole seconds and the sub-second
		 * remainder to avoid overflowing ticks * microseconds for
		 * large periods. */
		quotient = period_microseconds / SPDK_SEC_TO_USEC;
		remainder = period_microseconds % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		poller->period_ticks = 0;
	}

	/* Timed pollers go in the sorted timer list; others run every iteration. */
	if (poller->period_ticks) {
		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}

	return poller;
}
737 
738 void
739 spdk_poller_unregister(struct spdk_poller **ppoller)
740 {
741 	struct spdk_thread *thread;
742 	struct spdk_poller *poller;
743 
744 	poller = *ppoller;
745 	if (poller == NULL) {
746 		return;
747 	}
748 
749 	*ppoller = NULL;
750 
751 	thread = spdk_get_thread();
752 	if (!thread) {
753 		assert(false);
754 		return;
755 	}
756 
757 	/* Simply set the state to unregistered. The poller will get cleaned up
758 	 * in a subsequent call to spdk_thread_poll().
759 	 */
760 	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
761 }
762 
/* Iteration state for spdk_for_each_thread(): fn runs once per thread, then
 * cpl runs back on the originating thread. */
struct call_thread {
	struct spdk_thread *cur_thread;	/* Thread currently executing fn. */
	spdk_msg_fn fn;	/* Per-thread callback. */
	void *ctx;	/* Shared user context. */

	struct spdk_thread *orig_thread;	/* Thread that started the iteration. */
	spdk_msg_fn cpl;	/* Completion callback, run on orig_thread. */
};
771 
/* One step of the thread iteration: run fn on the current thread, then hop
 * to the next thread in g_threads, or complete back on the origin thread. */
static void
spdk_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;

	ct->fn(ct->ctx);

	/* Advance under the lock; the list may change concurrently. */
	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");

		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
	}
}
795 
/*
 * Run fn(ctx) once on every registered thread (sequentially, via message
 * passing), then run cpl(ctx) on the calling thread.  On setup failure,
 * cpl is invoked immediately on the caller.
 */
void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	/* Must be called from an SPDK thread so the completion has a home. */
	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	/* The caller is itself registered in g_threads, so the list is
	 * non-empty here and cur_thread is valid. */
	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
}
831 
/*
 * Register io_device as a target for spdk_get_io_channel().
 *
 * \param io_device Opaque unique key; duplicate registration is rejected.
 * \param create_cb/destroy_cb Build/tear down per-thread channel context.
 * \param ctx_size Extra bytes allocated after each spdk_io_channel.
 * \param name Optional display name; defaults to the device pointer.
 */
void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	/* Duplicate check and insertion are done atomically under the lock. */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}
887 
/* Runs on the thread that called spdk_io_device_unregister(): invoke the
 * user's unregister callback, then free the device record. */
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev);
}
899 
/* Free a fully-released io_device.  If the user asked for an unregister
 * callback, bounce to the unregistering thread so the callback runs there. */
static void
_spdk_io_device_free(struct io_device *dev)
{
	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
	}
}
912 
/*
 * Begin unregistration of io_device.  The device is removed from the global
 * list immediately; the final free (and optional unregister_cb) is deferred
 * until the last channel releases its reference.  Fails (with an error log)
 * while spdk_for_each_channel() iterations are outstanding.
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	/* Snapshot the refcount under the lock; it decides whether we free
	 * now or let the last _spdk_put_io_channel() do it. */
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	_spdk_io_device_free(dev);
}
965 
/*
 * Get (or lazily create) the calling thread's channel for io_device.
 * Channels are per-(device, thread) and reference counted; repeated calls
 * on the same thread return the same channel with ref bumped.
 *
 * \return the channel, or NULL if the device is unknown, the caller is not
 *         an SPDK thread, allocation fails, or create_cb fails.
 */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 *  thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* ctx_size extra bytes follow the channel for the device's context. */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	/* Drop the lock before calling into user code (create_cb). */
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		/* Roll back the insertion and refcount taken above. */
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
1042 
/*
 * Deferred channel release, run as a message on the channel's owning thread.
 * Destroys the channel only when no new references appeared between the
 * spdk_put_io_channel() that queued this message and its execution, and
 * frees the io_device too if this was its last channel after unregistration.
 */
static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 *  after this message was sent but before it had a chance to
		 *  execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	/* The device is only freed here if it was unregistered AND this was
	 * its last channel. */
	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}
1099 
/* Drop one reference to a channel.  When the count hits zero, the actual
 * destruction is deferred to the channel's owning thread via a message
 * (_spdk_put_io_channel), guarded by destroy_ref against re-acquisition. */
void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
	}
}
1114 
/* Recover the spdk_io_channel from its per-device context pointer; the
 * context is allocated immediately after the channel struct. */
struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}
1120 
/* Return the thread that owns this channel. */
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}
1126 
/* State for one spdk_for_each_channel() traversal.  Hops thread-to-thread,
 * visiting one channel of io_device per thread, then completes with cpl on
 * the originating thread. */
struct spdk_io_channel_iter {
	void *io_device;	/* Key of the device being iterated. */
	struct io_device *dev;	/* Resolved device record (for_each_count holder). */
	spdk_channel_msg fn;	/* Per-channel callback. */
	int status;	/* First nonzero status aborts the iteration. */
	void *ctx;	/* Shared user context. */
	struct spdk_io_channel *ch;	/* Channel currently being visited. */

	struct spdk_thread *cur_thread;	/* Thread currently being visited. */

	struct spdk_thread *orig_thread;	/* Thread that started the iteration. */
	spdk_channel_for_each_cpl cpl;	/* Completion callback (may be NULL). */
};
1140 
/* Accessor: the io_device key the iteration was started with. */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}
1146 
/* Accessor: the channel currently being visited. */
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}
1152 
/* Accessor: the user context passed to spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}
1158 
/* Runs on the originating thread: fire the optional completion callback
 * with the final status, then release the iterator. */
static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}
1169 
/* Runs on the thread being visited: re-validate that the target channel
 * still exists before invoking fn, since it may have been destroyed while
 * this message was in flight. */
static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 *  message had a chance to execute.  If so, skip calling
	 *  the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		/* fn is responsible for calling spdk_for_each_channel_continue(). */
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}
1195 
/*
 * Invoke fn once per existing channel of io_device, each call on the
 * channel's owning thread, then cpl on the calling thread.  fn must call
 * spdk_for_each_channel_continue() to advance the iteration.
 *
 * NOTE(review): if the iterator allocation fails, this returns without
 * ever invoking cpl — callers waiting on the completion would stall.
 * Confirm whether that is acceptable or should at least be asserted.
 */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread that holds a channel for this device and
	 * start the traversal there; for_each_count blocks unregistration
	 * while the iteration is in flight. */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist: complete immediately on the caller. */
	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
1236 
/*
 * Advance a spdk_for_each_channel() iteration.  Must be called from fn, on
 * the thread currently being visited.  A nonzero status aborts the
 * traversal and is reported to cpl.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		/* Error: skip remaining threads and finish the iteration. */
		goto end;
	}
	/* Find the next thread (after the current one) holding a channel for
	 * this device and hop to it. */
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	/* Iteration done: allow the device to be unregistered again and
	 * complete back on the originating thread. */
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
1272 
1273 
1274 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)
1275