/* xref: /spdk/lib/thread/thread.c (revision d9e3ffea488025798a892048c1c3211cbb5a42c6) */
/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
static size_t g_ctx_sz = 0;
/* A monotonically increasing ID, starting at 1, is assigned to each created
 * thread. If the ID wraps around past UINT64_MAX (back to 0), further thread
 * creation is not allowed and the SPDK application must be restarted.
 */
static uint64_t g_thread_id = 1;

struct io_device {
	void				*io_device;
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb	create_cb;
	spdk_io_channel_destroy_cb	destroy_cb;
	spdk_io_device_unregister_cb	unregister_cb;
	struct spdk_thread		*unregister_thread;
	uint32_t			ctx_size;
	uint32_t			for_each_count;
	TAILQ_ENTRY(io_device)		tailq;

	uint32_t			refcnt;

	bool				unregistered;
};

static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);

struct spdk_msg {
	spdk_msg_fn		fn;
	void			*arg;

	SLIST_ENTRY(spdk_msg)	link;
};

#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static __thread struct spdk_thread *tls_thread = NULL;

static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}

static int
_thread_lib_init(size_t ctx_sz)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	g_ctx_sz = ctx_sz;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			     sizeof(struct spdk_msg),
			     0, /* No cache. We do our own. */
			     SPDK_ENV_SOCKET_ID_ANY);

	if (!g_spdk_msg_mempool) {
		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
		return -1;
	}

	return 0;
}

int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);

	if (new_thread_fn == NULL) {
		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
	} else {
		g_new_thread_fn = new_thread_fn;
	}

	return _thread_lib_init(ctx_sz);
}

int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
			 spdk_thread_op_supported_fn thread_op_supported_fn,
			 size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);
	assert(g_thread_op_supported_fn == NULL);

	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
		SPDK_ERRLOG("thread_op_fn and thread_op_supported_fn must both be defined or both be NULL.\n");
		return -EINVAL;
	}

	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
	} else {
		g_thread_op_fn = thread_op_fn;
		g_thread_op_supported_fn = thread_op_supported_fn;
	}

	return _thread_lib_init(ctx_sz);
}

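/*
 * Minimal usage sketch (illustration only, not part of this library): a single
 * application thread that initializes the library, creates one SPDK thread,
 * and drives it with spdk_thread_poll() until it exits.  This assumes the SPDK
 * env has already been initialized (e.g. via spdk_env_init()); error handling
 * is elided and the function name run_single_thread() is hypothetical.
 *
 *	static void
 *	run_single_thread(void)
 *	{
 *		struct spdk_thread *thread;
 *
 *		spdk_thread_lib_init(NULL, 0);
 *		thread = spdk_thread_create("app_thread", NULL);
 *
 *		// ... send messages / register pollers, polling as needed ...
 *
 *		spdk_set_thread(thread);
 *		spdk_thread_exit(thread);
 *		while (!spdk_thread_is_exited(thread)) {
 *			spdk_thread_poll(thread, 0, 0);
 *		}
 *
 *		spdk_thread_destroy(thread);
 *		spdk_thread_lib_fini();
 *	}
 */
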
void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}

	g_new_thread_fn = NULL;
	g_thread_op_fn = NULL;
	g_thread_op_supported_fn = NULL;
	g_ctx_sz = 0;
}

static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (thread->interrupt_mode) {
		thread_interrupt_destroy(thread);
	}

	spdk_ring_free(thread->messages);
	free(thread);
}

struct spdk_thread *
spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc = 0, i;

	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	TAILQ_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	TAILQ_INIT(&thread->timed_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	if (g_thread_id == 0) {
		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		_free_thread(thread);
		return NULL;
	}
	thread->id = g_thread_id++;
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
		      thread->id, thread->name);

	if (spdk_interrupt_mode_is_enabled()) {
		thread->interrupt_mode = true;
		rc = thread_interrupt_create(thread);
		if (rc != 0) {
			_free_thread(thread);
			return NULL;
		}
	}

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
	}

	if (rc != 0) {
		_free_thread(thread);
		return NULL;
	}

	thread->state = SPDK_THREAD_STATE_RUNNING;

	return thread;
}

void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}

static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s timed out while exiting; moving it to the exited state forcefully\n",
			    thread->name);
		goto exited;
	}

	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

	if (thread->pending_unregister_count > 0) {
		SPDK_INFOLOG(thread,
			     "thread %s is still unregistering io_devices\n",
			     thread->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
}

int
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
		SPDK_INFOLOG(thread,
			     "thread %s is already exiting\n",
			     thread->name);
		return 0;
	}

	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
	thread->state = SPDK_THREAD_STATE_EXITING;
	return 0;
}

bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_EXITED;
}

void
spdk_thread_destroy(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);

	assert(thread->state == SPDK_THREAD_STATE_EXITED);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}

void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
	if (g_ctx_sz > 0) {
		return thread->ctx;
	}

	return NULL;
}

struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}

int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;

	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
		assert(false);
		return -ENOTSUP;
	}

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Called from non-SPDK thread\n");
		assert(false);
		return -EINVAL;
	}

	spdk_cpuset_copy(&thread->cpumask, cpumask);

	/* Invoke framework's reschedule operation. If this function is called multiple times
	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
	 * reschedule operation.
	 */
	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

	return 0;
}

struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}

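/*
 * Illustration (not part of this file): when a non-zero ctx_sz was passed to
 * spdk_thread_lib_init(), spdk_thread_get_ctx() and spdk_thread_get_from_ctx()
 * form a round trip between a thread and its per-thread context area.  The
 * struct name my_thread_ctx is hypothetical.
 *
 *	struct my_thread_ctx { uint64_t stat; };
 *
 *	spdk_thread_lib_init(NULL, sizeof(struct my_thread_ctx));
 *	struct spdk_thread *thread = spdk_thread_create("t0", NULL);
 *	struct my_thread_ctx *ctx = spdk_thread_get_ctx(thread);
 *
 *	assert(spdk_thread_get_from_ctx(ctx) == thread);
 */
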
static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];
	uint64_t notify = 1;
	int rc;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}
	if (thread->interrupt_mode) {
		/* There may be a race between this consumer's acknowledgement (the read
		 * below) and another producer's notification (its write to msg_fd), so
		 * acknowledge first, then dequeue, and re-notify ourselves if messages
		 * remain in the ring. This avoids losing a message notification.
		 */
		rc = read(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0 && errno != EAGAIN) {
			SPDK_ERRLOG("failed to acknowledge msg_queue: %s.\n", spdk_strerror(errno));
		}
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (thread->interrupt_mode && spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}

static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled
	 * run time.
	 */
	TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq);
}

static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store end time to use it as start time of the next spdk_thread_poll(). */
	thread->tsc_last = end;
}

static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	thread->tsc_last = now;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

		poller->run_count++;
		if (poller_rc > 0) {
			poller->busy_count++;
		}

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
		}
#endif

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
		}

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

		poller->run_count++;
		if (timer_rc > 0) {
			poller->busy_count++;
		}

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
		}
#endif

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			poller_insert_timer(thread, poller, now);
		}

		if (timer_rc > rc) {
			rc = timer_rc;
		}
	}

	return rc;
}

int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	struct spdk_thread *orig_thread;
	int rc;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	if (!thread->interrupt_mode) {
		rc = thread_poll(thread, max_msgs, now);
	} else {
		/* Non-blocking wait on the thread's fd_group */
		rc = spdk_fd_group_wait(thread->fgrp, 0);
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
		thread_exit(thread, now);
	}

	thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}

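/*
 * Illustration (not part of this file): a reactor-style loop that a framework
 * might run on an OS thread to drive one SPDK thread until something calls
 * spdk_thread_exit() on it.  A positive return from spdk_thread_poll() means
 * the iteration did work (busy); 0 means it was idle.  The function name
 * reactor_run() is hypothetical.
 *
 *	static void
 *	reactor_run(struct spdk_thread *thread)
 *	{
 *		do {
 *			spdk_thread_poll(thread, 0, 0);
 *		} while (!spdk_thread_is_exited(thread));
 *
 *		spdk_thread_destroy(thread);
 *	}
 */
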
uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = TAILQ_FIRST(&thread->timed_pollers);
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}

int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}

static bool
thread_has_unpaused_pollers(struct spdk_thread *thread)
{
	if (TAILQ_EMPTY(&thread->active_pollers) &&
	    TAILQ_EMPTY(&thread->timed_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
	if (!thread_has_unpaused_pollers(thread) &&
	    TAILQ_EMPTY(&thread->paused_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
	if (spdk_ring_count(thread->messages) ||
	    thread_has_unpaused_pollers(thread) ||
	    thread->critical_msg != NULL) {
		return false;
	}

	return true;
}

uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count.  We could acquire the
	 *  lock and iterate through the TAILQ of threads to count them, but that
	 *  count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}

struct spdk_thread *
spdk_get_thread(void)
{
	return _get_thread();
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

uint64_t
spdk_thread_get_id(const struct spdk_thread *thread)
{
	return thread->id;
}

struct spdk_thread *
spdk_thread_get_by_id(uint64_t id)
{
	struct spdk_thread *thread;

	if (id == 0 || id >= g_thread_id) {
		SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
		return NULL;
	}
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		if (thread->id == id) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);
	return thread;
}

int
spdk_thread_get_stats(struct spdk_thread_stats *stats)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		return -EINVAL;
	}

	if (stats == NULL) {
		return -EINVAL;
	}

	*stats = thread->stats;

	return 0;
}

uint64_t
spdk_thread_get_last_tsc(struct spdk_thread *thread)
{
	if (thread == NULL) {
		thread = _get_thread();
	}

	return thread->tsc_last;
}

int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	if (thread->interrupt_mode) {
		uint64_t notify = 1;

		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
			return -EIO;
		}
	}

	return 0;
}

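/*
 * Illustration (not part of this file): passing work to another SPDK thread.
 * A message carries a function pointer plus one context argument; the function
 * runs on the target thread during a future spdk_thread_poll() call.  The
 * names say_hello() and target are hypothetical.
 *
 *	static void
 *	say_hello(void *ctx)
 *	{
 *		printf("hello from thread %s\n", spdk_thread_get_name(spdk_get_thread()));
 *	}
 *
 *	int rc = spdk_thread_send_msg(target, say_hello, NULL);
 *	if (rc != 0) {
 *		// -ENOMEM or -EIO; the message was not queued
 *	}
 */
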
int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
	spdk_msg_fn expected = NULL;

	if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
					__ATOMIC_SEQ_CST)) {
		if (thread->interrupt_mode) {
			uint64_t notify = 1;
			int rc;

			rc = write(thread->msg_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
				return -EIO;
			}
		}

		return 0;
	}

	return -EIO;
}

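/*
 * Illustration (not part of this file): unlike spdk_thread_send_msg(), the
 * critical message path allocates nothing and uses a single atomic slot, so
 * it is intended to be usable even from a signal handler.  Only one critical
 * message can be outstanding per thread; -EIO means the slot was occupied.
 * The handler names handle_crash(), on_sigusr1() and the global g_app_thread
 * are hypothetical.
 *
 *	static void
 *	handle_crash(void *ctx)
 *	{
 *		// runs on g_app_thread (with ctx == NULL) during its next poll
 *	}
 *
 *	static void
 *	on_sigusr1(int signum)
 *	{
 *		spdk_thread_send_critical_msg(g_app_thread, handle_crash);
 *	}
 */
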
#ifdef __linux__
static int
interrupt_timerfd_prepare(uint64_t period_microseconds)
{
	int timerfd;
	int ret;
	struct itimerspec new_tv;
	uint64_t period_seconds;
	uint64_t period_nanoseconds;

	if (period_microseconds == 0) {
		return -EINVAL;
	}

	period_seconds = period_microseconds / SPDK_SEC_TO_USEC;
	period_nanoseconds = period_microseconds % SPDK_SEC_TO_USEC * 1000;

	new_tv.it_value.tv_sec = period_seconds;
	new_tv.it_value.tv_nsec = period_nanoseconds;

	new_tv.it_interval.tv_sec = period_seconds;
	new_tv.it_interval.tv_nsec = period_nanoseconds;

	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
	if (timerfd < 0) {
		return -errno;
	}

	ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
	if (ret < 0) {
		close(timerfd);
		return -errno;
	}

	return timerfd;
}
#else
static int
interrupt_timerfd_prepare(uint64_t period_microseconds)
{
	return -ENOTSUP;
}
#endif

static int
interrupt_timerfd_process(void *arg)
{
	struct spdk_poller *poller = arg;
	uint64_t exp;
	int rc;

	/* Consume the expiration count to clear the timerfd's level-triggered readiness. */
	rc = read(poller->timerfd, &exp, sizeof(exp));
	if (rc < 0) {
		if (errno == EAGAIN) {
			return 0;
		}

		return -errno;
	}

	return poller->fn(poller->arg);
}

static int
thread_interrupt_register_timerfd(struct spdk_fd_group *fgrp,
				  uint64_t period_microseconds,
				  struct spdk_poller *poller)
{
	int timerfd;
	int rc;

	timerfd = interrupt_timerfd_prepare(period_microseconds);
	if (timerfd < 0) {
		return timerfd;
	}

	rc = spdk_fd_group_add(fgrp, timerfd,
			       interrupt_timerfd_process, poller);
	if (rc < 0) {
		close(timerfd);
		return rc;
	}

	return timerfd;
}

static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;
	uint64_t quotient, remainder, ticks;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;

	if (period_microseconds) {
		quotient = period_microseconds / SPDK_SEC_TO_USEC;
		remainder = period_microseconds % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		poller->period_ticks = 0;
	}

	if (thread->interrupt_mode && period_microseconds != 0) {
		int rc;

		poller->timerfd = -1;
		rc = thread_interrupt_register_timerfd(thread->fgrp, period_microseconds, poller);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to register timerfd for periodic poller: %s\n", spdk_strerror(-rc));
			free(poller);
			return NULL;
		}
		poller->timerfd = rc;
	}

	thread_insert_poller(thread, poller);

	return poller;
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}

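/*
 * Illustration (not part of this file): registering a 100 ms periodic poller
 * from code already running on an SPDK thread.  Returning a value > 0 marks
 * the iteration busy in the thread's stats; 0 marks it idle.  The names
 * heartbeat() and g_hb_poller are hypothetical.
 *
 *	static struct spdk_poller *g_hb_poller;
 *
 *	static int
 *	heartbeat(void *arg)
 *	{
 *		// ... do periodic work ...
 *		return 1; // busy
 *	}
 *
 *	g_hb_poller = spdk_poller_register_named(heartbeat, NULL, 100 * 1000, "heartbeat");
 *	// ...
 *	spdk_poller_unregister(&g_hb_poller);
 */
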
void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		SPDK_ERRLOG("unregistering from a different thread than the one that called spdk_poller_register()\n");
		assert(false);
		return;
	}

	if (thread->interrupt_mode && poller->timerfd >= 0) {
		spdk_fd_group_remove(thread->fgrp, poller->timerfd);
		close(poller->timerfd);
		poller->timerfd = -1;
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}

void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state == SPDK_POLLER_STATE_PAUSED ||
	    poller->state == SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused from within itself, we can immediately move it
	 * onto the paused_pollers list.  Otherwise we just set its state to
	 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it.  This
	 * allows a poller to be paused from another poller's context without
	 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration.
	 */
	if (poller->state != SPDK_POLLER_STATE_RUNNING) {
		poller->state = SPDK_POLLER_STATE_PAUSING;
	} else {
		if (poller->period_ticks > 0) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
		} else {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		}

		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
	}
}

void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state != SPDK_POLLER_STATE_PAUSED &&
	    poller->state != SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused it has to be removed from the paused pollers
	 * list and put on the active / timer list depending on its
	 * period_ticks.  If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list.
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		thread_insert_poller(thread, poller);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
}

const char *
spdk_poller_state_str(enum spdk_poller_state state)
{
	switch (state) {
	case SPDK_POLLER_STATE_WAITING:
		return "waiting";
	case SPDK_POLLER_STATE_RUNNING:
		return "running";
	case SPDK_POLLER_STATE_UNREGISTERED:
		return "unregistered";
	case SPDK_POLLER_STATE_PAUSING:
		return "pausing";
	case SPDK_POLLER_STATE_PAUSED:
		return "paused";
	default:
		return NULL;
	}
}

struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};

static void
_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	assert(rc == 0);
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}

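/*
 * Illustration (not part of this file): broadcasting a function to every SPDK
 * thread, with a completion callback on the originating thread once all
 * threads have run it.  Must be called from an SPDK thread.  The names
 * bump_counter() and all_done() are hypothetical.
 *
 *	static void bump_counter(void *ctx) { ... } // runs once on each thread
 *	static void all_done(void *ctx) { ... }     // runs on the caller's thread
 *
 *	spdk_for_each_thread(bump_counter, NULL, all_done);
 */
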
void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}

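/*
 * Illustration (not part of this file): the typical io_device pattern.  A
 * subsystem registers one io_device, and each SPDK thread that needs it gets
 * a per-thread channel with ctx_size bytes of private state.  All names here
 * (struct dev, struct dev_channel, dev_ch_create/destroy) are hypothetical.
 *
 *	struct dev { ... };
 *	struct dev_channel { uint64_t io_outstanding; };
 *
 *	static int
 *	dev_ch_create(void *io_device, void *ctx_buf)
 *	{
 *		struct dev_channel *dch = ctx_buf;
 *
 *		dch->io_outstanding = 0;
 *		return 0;
 *	}
 *
 *	static void
 *	dev_ch_destroy(void *io_device, void *ctx_buf) { }
 *
 *	spdk_io_device_register(dev, dev_ch_create, dev_ch_destroy,
 *				sizeof(struct dev_channel), "dev");
 *
 *	// later, on each SPDK thread that performs I/O:
 *	struct spdk_io_channel *ch = spdk_get_io_channel(dev);
 *	struct dev_channel *dch = spdk_io_channel_get_ctx(ch);
 *	// ...
 *	spdk_put_io_channel(ch);
 */
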
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	assert(thread == dev->unregister_thread);

	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	assert(thread->pending_unregister_count > 0);
	thread->pending_unregister_count--;

	dev->unregister_cb(dev->io_device);
	free(dev);
}

static void
io_device_free(struct io_device *dev)
{
	int rc __attribute__((unused));

	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
		assert(rc == 0);
	}
}

void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (unregister_cb) {
		thread->pending_unregister_count++;
	}

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}

const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}

struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 *  thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}

static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 *  after this message was sent but before it had a chance to
		 *  execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		SPDK_ERRLOG("called from a different thread than the one that called spdk_get_io_channel()\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

void *
spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
{
	return ch->dev->io_device;
}

struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 *  message had a chance to execute.  If so, skip calling
	 *  the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

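/*
 * Illustration (not part of this file): visiting every existing channel of an
 * io_device, one thread at a time.  Each visit must end with
 * spdk_for_each_channel_continue(); a non-zero status short-circuits the walk
 * and is reported to the completion callback.  The names reset_channel() and
 * reset_done() are hypothetical, reusing the dev_channel sketch from above
 * spdk_io_device_register().
 *
 *	static void
 *	reset_channel(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *		struct dev_channel *dch = spdk_io_channel_get_ctx(ch);
 *
 *		dch->io_outstanding = 0;
 *		spdk_for_each_channel_continue(i, 0);
 *	}
 *
 *	static void
 *	reset_done(struct spdk_io_channel_iter *i, int status) { ... }
 *
 *	spdk_for_each_channel(dev, reset_channel, NULL, reset_done);
 */
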
struct spdk_interrupt {
	int			efd;
	struct spdk_thread	*thread;
	char			name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	if (thread->msg_fd < 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);
	thread->msg_fd = -1;

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}

#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	return rc;
}

static int
thread_interrupt_create(struct spdk_thread *thread)
{
	int rc;

	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);

	rc = spdk_fd_group_create(&thread->fgrp);
	if (rc) {
		return rc;
	}

	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (thread->msg_fd < 0) {
		rc = -errno;
		spdk_fd_group_destroy(thread->fgrp);
		thread->fgrp = NULL;

		return rc;
	}

	return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
#endif

struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
			void *arg, const char *name)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
		SPDK_ERRLOG("thread %s is no longer running\n", thread->name);
		return NULL;
	}

	/* Allocate before adding to the fd_group so that an allocation failure
	 * does not leave efd registered.
	 */
	intr = calloc(1, sizeof(*intr));
	if (intr == NULL) {
		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		return NULL;
	}

	if (spdk_fd_group_add(thread->fgrp, efd, fn, arg)) {
		free(intr);
		return NULL;
	}

	if (name) {
		snprintf(intr->name, sizeof(intr->name), "%s", name);
	} else {
		snprintf(intr->name, sizeof(intr->name), "%p", fn);
	}

	intr->efd = efd;
	intr->thread = thread;

	return intr;
}

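/*
 * Illustration (not part of this file): hooking an application eventfd into an
 * SPDK thread's interrupt fd_group.  Must run on the owning SPDK thread with
 * interrupt mode enabled.  The names on_event(), g_efd and g_intr are
 * hypothetical.
 *
 *	static int g_efd;
 *	static struct spdk_interrupt *g_intr;
 *
 *	static int
 *	on_event(void *arg)
 *	{
 *		uint64_t val;
 *
 *		(void)read(g_efd, &val, sizeof(val)); // drain the eventfd
 *		return 0;
 *	}
 *
 *	g_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 *	g_intr = spdk_interrupt_register(g_efd, on_event, NULL, "app_event");
 *	// ...
 *	spdk_interrupt_unregister(&g_intr);
 */
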
void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	*pintr = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		SPDK_ERRLOG("unregistering from a different thread than the one that called spdk_interrupt_register()\n");
		assert(false);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}

int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		SPDK_ERRLOG("called from a different thread than the one that called spdk_interrupt_register()\n");
		assert(false);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}

static bool g_interrupt_mode = false;

int
spdk_interrupt_mode_enable(void)
{
#ifdef __linux__
	SPDK_NOTICELOG("Setting SPDK to run in interrupt mode.\n");
	g_interrupt_mode = true;
	return 0;
#else
	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
	g_interrupt_mode = false;
	return -ENOTSUP;
#endif
}

bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}

SPDK_LOG_REGISTER_COMPONENT(thread)