1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/likely.h"
38 #include "spdk/queue.h"
39 #include "spdk/string.h"
40 #include "spdk/thread.h"
41 #include "spdk/util.h"
42 #include "spdk/fd_group.h"
43 
44 #include "spdk/log.h"
45 #include "spdk_internal/thread.h"
46 
47 #ifdef __linux__
48 #include <sys/timerfd.h>
49 #include <sys/eventfd.h>
50 #endif
51 
52 #define SPDK_MSG_BATCH_SIZE		8
53 #define SPDK_MAX_DEVICE_NAME_LEN	256
54 #define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
55 
56 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;
57 
58 static spdk_new_thread_fn g_new_thread_fn = NULL;
59 static spdk_thread_op_fn g_thread_op_fn = NULL;
60 static spdk_thread_op_supported_fn g_thread_op_supported_fn;
61 static size_t g_ctx_sz = 0;
62 /* A monotonically increasing ID is assigned to each created thread, starting at 1.
63  * If the ID wraps back around to 0, further thread creation is not allowed and the
64  * SPDK application must be restarted.
65  */
66 static uint64_t g_thread_id = 1;
67 
68 struct io_device {
69 	void				*io_device;
70 	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
71 	spdk_io_channel_create_cb	create_cb;
72 	spdk_io_channel_destroy_cb	destroy_cb;
73 	spdk_io_device_unregister_cb	unregister_cb;
74 	struct spdk_thread		*unregister_thread;
75 	uint32_t			ctx_size;
76 	uint32_t			for_each_count;
77 	TAILQ_ENTRY(io_device)		tailq;
78 
79 	uint32_t			refcnt;
80 
81 	bool				unregistered;
82 };
83 
84 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
85 
86 struct spdk_msg {
87 	spdk_msg_fn		fn;
88 	void			*arg;
89 
90 	SLIST_ENTRY(spdk_msg)	link;
91 };
92 
93 #define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
94 static struct spdk_mempool *g_spdk_msg_mempool = NULL;
95 
96 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
97 static uint32_t g_thread_count = 0;
98 
99 static __thread struct spdk_thread *tls_thread = NULL;
100 
101 static inline struct spdk_thread *
102 _get_thread(void)
103 {
104 	return tls_thread;
105 }
106 
107 static int
108 _thread_lib_init(size_t ctx_sz)
109 {
110 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
111 
112 	g_ctx_sz = ctx_sz;
113 
114 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
115 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
116 			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
117 			     sizeof(struct spdk_msg),
118 			     0, /* No cache. We do our own. */
119 			     SPDK_ENV_SOCKET_ID_ANY);
120 
121 	if (!g_spdk_msg_mempool) {
122 		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
123 		return -1;
124 	}
125 
126 	return 0;
127 }
128 
129 int
130 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
131 {
132 	assert(g_new_thread_fn == NULL);
133 	assert(g_thread_op_fn == NULL);
134 
135 	if (new_thread_fn == NULL) {
136 		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
137 	} else {
138 		g_new_thread_fn = new_thread_fn;
139 	}
140 
141 	return _thread_lib_init(ctx_sz);
142 }
143 
144 int
145 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
146 			 spdk_thread_op_supported_fn thread_op_supported_fn,
147 			 size_t ctx_sz)
148 {
149 	assert(g_new_thread_fn == NULL);
150 	assert(g_thread_op_fn == NULL);
151 	assert(g_thread_op_supported_fn == NULL);
152 
153 	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
154 		SPDK_ERRLOG("thread_op_fn and thread_op_supported_fn must be defined or undefined together.\n");
155 		return -EINVAL;
156 	}
157 
158 	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
159 		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
160 	} else {
161 		g_thread_op_fn = thread_op_fn;
162 		g_thread_op_supported_fn = thread_op_supported_fn;
163 	}
164 
165 	return _thread_lib_init(ctx_sz);
166 }
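
/*
 * Usage sketch (illustrative only, not part of this file): a minimal
 * lifecycle for the thread library, assuming the application provides its
 * own poll loop and something eventually calls spdk_thread_exit() from the
 * thread. Error handling is elided.
 *
 *	spdk_thread_lib_init(NULL, 0);
 *
 *	struct spdk_thread *thread = spdk_thread_create("app_thread", NULL);
 *
 *	while (!spdk_thread_is_exited(thread)) {
 *		spdk_thread_poll(thread, 0, 0);
 *	}
 *
 *	spdk_thread_destroy(thread);
 *	spdk_thread_lib_fini();
 */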
167 
168 void
169 spdk_thread_lib_fini(void)
170 {
171 	struct io_device *dev;
172 
173 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
174 		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
175 	}
176 
177 	if (g_spdk_msg_mempool) {
178 		spdk_mempool_free(g_spdk_msg_mempool);
179 		g_spdk_msg_mempool = NULL;
180 	}
181 
182 	g_new_thread_fn = NULL;
183 	g_thread_op_fn = NULL;
184 	g_thread_op_supported_fn = NULL;
185 	g_ctx_sz = 0;
186 }
187 
188 static void thread_interrupt_destroy(struct spdk_thread *thread);
189 static int thread_interrupt_create(struct spdk_thread *thread);
190 
191 static void
192 _free_thread(struct spdk_thread *thread)
193 {
194 	struct spdk_io_channel *ch;
195 	struct spdk_msg *msg;
196 	struct spdk_poller *poller, *ptmp;
197 
198 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
199 		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
200 			    thread->name, ch->dev->name);
201 	}
202 
203 	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
204 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
205 			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
206 				     poller->name);
207 		}
208 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
209 		free(poller);
210 	}
211 
212 	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) {
213 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
214 			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
215 				     poller->name);
216 		}
217 		TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
218 		free(poller);
219 	}
220 
221 	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
222 		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
223 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
224 		free(poller);
225 	}
226 
227 	pthread_mutex_lock(&g_devlist_mutex);
228 	assert(g_thread_count > 0);
229 	g_thread_count--;
230 	TAILQ_REMOVE(&g_threads, thread, tailq);
231 	pthread_mutex_unlock(&g_devlist_mutex);
232 
233 	msg = SLIST_FIRST(&thread->msg_cache);
234 	while (msg != NULL) {
235 		SLIST_REMOVE_HEAD(&thread->msg_cache, link);
236 
237 		assert(thread->msg_cache_count > 0);
238 		thread->msg_cache_count--;
239 		spdk_mempool_put(g_spdk_msg_mempool, msg);
240 
241 		msg = SLIST_FIRST(&thread->msg_cache);
242 	}
243 
244 	assert(thread->msg_cache_count == 0);
245 
246 	if (thread->interrupt_mode) {
247 		thread_interrupt_destroy(thread);
248 	}
249 
250 	spdk_ring_free(thread->messages);
251 	free(thread);
252 }
253 
254 struct spdk_thread *
255 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
256 {
257 	struct spdk_thread *thread;
258 	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
259 	int rc = 0, i;
260 
261 	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
262 	if (!thread) {
263 		SPDK_ERRLOG("Unable to allocate memory for thread\n");
264 		return NULL;
265 	}
266 
267 	if (cpumask) {
268 		spdk_cpuset_copy(&thread->cpumask, cpumask);
269 	} else {
270 		spdk_cpuset_negate(&thread->cpumask);
271 	}
272 
273 	TAILQ_INIT(&thread->io_channels);
274 	TAILQ_INIT(&thread->active_pollers);
275 	TAILQ_INIT(&thread->timed_pollers);
276 	TAILQ_INIT(&thread->paused_pollers);
277 	SLIST_INIT(&thread->msg_cache);
278 	thread->msg_cache_count = 0;
279 
280 	thread->tsc_last = spdk_get_ticks();
281 
282 	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
283 	if (!thread->messages) {
284 		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
285 		free(thread);
286 		return NULL;
287 	}
288 
289 	/* Fill the local message pool cache. */
290 	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
291 	if (rc == 0) {
292 		/* If we can't fully populate the cache, that's okay. It will fill
293 		 * up organically as messages are passed to the thread. */
294 		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
295 			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
296 			thread->msg_cache_count++;
297 		}
298 	}
299 
300 	if (name) {
301 		snprintf(thread->name, sizeof(thread->name), "%s", name);
302 	} else {
303 		snprintf(thread->name, sizeof(thread->name), "%p", thread);
304 	}
305 
306 	pthread_mutex_lock(&g_devlist_mutex);
307 	if (g_thread_id == 0) {
308 		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
309 		pthread_mutex_unlock(&g_devlist_mutex);
310 		_free_thread(thread);
311 		return NULL;
312 	}
313 	thread->id = g_thread_id++;
314 	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
315 	g_thread_count++;
316 	pthread_mutex_unlock(&g_devlist_mutex);
317 
318 	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
319 		      thread->id, thread->name);
320 
321 	if (spdk_interrupt_mode_is_enabled()) {
322 		thread->interrupt_mode = true;
323 		rc = thread_interrupt_create(thread);
324 		if (rc != 0) {
325 			_free_thread(thread);
326 			return NULL;
327 		}
328 	}
329 
330 	if (g_new_thread_fn) {
331 		rc = g_new_thread_fn(thread);
332 	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
333 		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
334 	}
335 
336 	if (rc != 0) {
337 		_free_thread(thread);
338 		return NULL;
339 	}
340 
341 	thread->state = SPDK_THREAD_STATE_RUNNING;
342 
343 	return thread;
344 }
345 
346 void
347 spdk_set_thread(struct spdk_thread *thread)
348 {
349 	tls_thread = thread;
350 }
351 
352 static void
353 thread_exit(struct spdk_thread *thread, uint64_t now)
354 {
355 	struct spdk_poller *poller;
356 	struct spdk_io_channel *ch;
357 
358 	if (now >= thread->exit_timeout_tsc) {
359 		SPDK_ERRLOG("thread %s exit timed out; forcibly moving it to the exited state\n",
360 			    thread->name);
361 		goto exited;
362 	}
363 
364 	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
365 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
366 			SPDK_INFOLOG(thread,
367 				     "thread %s still has active poller %s\n",
368 				     thread->name, poller->name);
369 			return;
370 		}
371 	}
372 
373 	TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) {
374 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
375 			SPDK_INFOLOG(thread,
376 				     "thread %s still has active timed poller %s\n",
377 				     thread->name, poller->name);
378 			return;
379 		}
380 	}
381 
382 	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
383 		SPDK_INFOLOG(thread,
384 			     "thread %s still has paused poller %s\n",
385 			     thread->name, poller->name);
386 		return;
387 	}
388 
389 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
390 		SPDK_INFOLOG(thread,
391 			     "thread %s still has channel for io_device %s\n",
392 			     thread->name, ch->dev->name);
393 		return;
394 	}
395 
396 exited:
397 	thread->state = SPDK_THREAD_STATE_EXITED;
398 }
399 
400 int
401 spdk_thread_exit(struct spdk_thread *thread)
402 {
403 	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);
404 
405 	assert(tls_thread == thread);
406 
407 	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
408 		SPDK_INFOLOG(thread,
409 			     "thread %s is already exiting\n",
410 			     thread->name);
411 		return 0;
412 	}
413 
414 	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
415 				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
416 	thread->state = SPDK_THREAD_STATE_EXITING;
417 	return 0;
418 }
419 
420 bool
421 spdk_thread_is_exited(struct spdk_thread *thread)
422 {
423 	return thread->state == SPDK_THREAD_STATE_EXITED;
424 }
425 
426 void
427 spdk_thread_destroy(struct spdk_thread *thread)
428 {
429 	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);
430 
431 	assert(thread->state == SPDK_THREAD_STATE_EXITED);
432 
433 	if (tls_thread == thread) {
434 		tls_thread = NULL;
435 	}
436 
437 	_free_thread(thread);
438 }
439 
440 void *
441 spdk_thread_get_ctx(struct spdk_thread *thread)
442 {
443 	if (g_ctx_sz > 0) {
444 		return thread->ctx;
445 	}
446 
447 	return NULL;
448 }
449 
450 struct spdk_cpuset *
451 spdk_thread_get_cpumask(struct spdk_thread *thread)
452 {
453 	return &thread->cpumask;
454 }
455 
456 int
457 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
458 {
459 	struct spdk_thread *thread;
460 
461 	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
462 		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
463 		assert(false);
464 		return -ENOTSUP;
465 	}
466 
467 	thread = spdk_get_thread();
468 	if (!thread) {
469 		SPDK_ERRLOG("Called from non-SPDK thread\n");
470 		assert(false);
471 		return -EINVAL;
472 	}
473 
474 	spdk_cpuset_copy(&thread->cpumask, cpumask);
475 
476 	/* Invoke framework's reschedule operation. If this function is called multiple times
477 	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
478 	 * reschedule operation.
479 	 */
480 	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
481 
482 	return 0;
483 }
484 
485 struct spdk_thread *
486 spdk_thread_get_from_ctx(void *ctx)
487 {
488 	if (ctx == NULL) {
489 		assert(false);
490 		return NULL;
491 	}
492 
493 	assert(g_ctx_sz > 0);
494 
495 	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
496 }
497 
498 static inline uint32_t
499 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
500 {
501 	unsigned count, i;
502 	void *messages[SPDK_MSG_BATCH_SIZE];
503 	uint64_t notify = 1;
504 	int rc;
505 
506 #ifdef DEBUG
507 	/*
508 	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
509 	 * so we will never actually read uninitialized data from messages, but just to be sure
510 	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
511 	 */
512 	memset(messages, 0, sizeof(messages));
513 #endif
514 
515 	if (max_msgs > 0) {
516 		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
517 	} else {
518 		max_msgs = SPDK_MSG_BATCH_SIZE;
519 	}
520 	if (thread->interrupt_mode) {
521 		/* There may be a race between this consumer's acknowledgement (the eventfd
522 		 * read below) and another producer's notification (an eventfd write), so
523 		 * acknowledge first, then re-notify after the dequeue if messages remain.
524 		 * This avoids missing a message notification. */
525 		rc = read(thread->msg_fd, &notify, sizeof(notify));
526 		if (rc < 0) {
527 			SPDK_ERRLOG("failed to acknowledge msg_queue: %s.\n", spdk_strerror(errno));
528 		}
529 	}
530 
531 	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
532 	if (thread->interrupt_mode && spdk_ring_count(thread->messages) != 0) {
533 		rc = write(thread->msg_fd, &notify, sizeof(notify));
534 		if (rc < 0) {
535 			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
536 		}
537 	}
538 	if (count == 0) {
539 		return 0;
540 	}
541 
542 	for (i = 0; i < count; i++) {
543 		struct spdk_msg *msg = messages[i];
544 
545 		assert(msg != NULL);
546 		msg->fn(msg->arg);
547 
548 		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
549 			/* Insert the message at the head. We want to re-use the hot
550 			 * ones. */
551 			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
552 			thread->msg_cache_count++;
553 		} else {
554 			spdk_mempool_put(g_spdk_msg_mempool, msg);
555 		}
556 	}
557 
558 	return count;
559 }
560 
561 static void
562 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
563 {
564 	struct spdk_poller *iter;
565 
566 	poller->next_run_tick = now + poller->period_ticks;
567 
568 	/*
569 	 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled
570 	 * run time.
571 	 */
572 	TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) {
573 		if (iter->next_run_tick <= poller->next_run_tick) {
574 			TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq);
575 			return;
576 		}
577 	}
578 
579 	/* No earlier pollers were found, so this poller must be the new head */
580 	TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq);
581 }
582 
583 static void
584 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
585 {
586 	if (poller->period_ticks) {
587 		poller_insert_timer(thread, poller, spdk_get_ticks());
588 	} else {
589 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
590 	}
591 }
592 
593 static inline void
594 thread_update_stats(struct spdk_thread *thread, uint64_t end,
595 		    uint64_t start, int rc)
596 {
597 	if (rc == 0) {
598 		/* Poller status idle */
599 		thread->stats.idle_tsc += end - start;
600 	} else if (rc > 0) {
601 		/* Poller status busy */
602 		thread->stats.busy_tsc += end - start;
603 	}
604 	/* Store end time to use it as start time of the next spdk_thread_poll(). */
605 	thread->tsc_last = end;
606 }
607 
608 static int
609 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
610 {
611 	uint32_t msg_count;
612 	struct spdk_poller *poller, *tmp;
613 	spdk_msg_fn critical_msg;
614 	int rc = 0;
615 
616 	thread->tsc_last = now;
617 
618 	critical_msg = thread->critical_msg;
619 	if (spdk_unlikely(critical_msg != NULL)) {
620 		critical_msg(NULL);
621 		thread->critical_msg = NULL;
622 	}
623 
624 	msg_count = msg_queue_run_batch(thread, max_msgs);
625 	if (msg_count) {
626 		rc = 1;
627 	}
628 
629 	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
630 				   active_pollers_head, tailq, tmp) {
631 		int poller_rc;
632 
633 		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
634 			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
635 			free(poller);
636 			continue;
637 		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
638 			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
639 			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
640 			poller->state = SPDK_POLLER_STATE_PAUSED;
641 			continue;
642 		}
643 
644 		poller->state = SPDK_POLLER_STATE_RUNNING;
645 		poller_rc = poller->fn(poller->arg);
646 
647 		poller->run_count++;
648 		if (poller_rc > 0) {
649 			poller->busy_count++;
650 		}
651 
652 #ifdef DEBUG
653 		if (poller_rc == -1) {
654 			SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
655 		}
656 #endif
657 
658 		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
659 			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
660 			free(poller);
661 		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
662 			poller->state = SPDK_POLLER_STATE_WAITING;
663 		}
664 
665 		if (poller_rc > rc) {
666 			rc = poller_rc;
667 		}
668 	}
669 
670 	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) {
671 		int timer_rc = 0;
672 
673 		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
674 			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
675 			free(poller);
676 			continue;
677 		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
678 			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
679 			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
680 			poller->state = SPDK_POLLER_STATE_PAUSED;
681 			continue;
682 		}
683 
684 		if (now < poller->next_run_tick) {
685 			break;
686 		}
687 
688 		poller->state = SPDK_POLLER_STATE_RUNNING;
689 		timer_rc = poller->fn(poller->arg);
690 
691 		poller->run_count++;
692 		if (timer_rc > 0) {
693 			poller->busy_count++;
694 		}
695 
696 #ifdef DEBUG
697 		if (timer_rc == -1) {
698 			SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
699 		}
700 #endif
701 
702 		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
703 			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
704 			free(poller);
705 		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
706 			poller->state = SPDK_POLLER_STATE_WAITING;
707 			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
708 			poller_insert_timer(thread, poller, now);
709 		}
710 
711 		if (timer_rc > rc) {
712 			rc = timer_rc;
713 		}
714 	}
715 
716 	return rc;
717 }
718 
719 int
720 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
721 {
722 	struct spdk_thread *orig_thread;
723 	int rc;
724 
725 	orig_thread = _get_thread();
726 	tls_thread = thread;
727 
728 	if (now == 0) {
729 		now = spdk_get_ticks();
730 	}
731 
732 	if (!thread->interrupt_mode) {
733 		rc = thread_poll(thread, max_msgs, now);
734 	} else {
735 		/* Non-blocking wait on the thread's fd_group */
736 		rc = spdk_fd_group_wait(thread->fgrp, 0);
737 	}
738 
740 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
741 		thread_exit(thread, now);
742 	}
743 
744 	thread_update_stats(thread, spdk_get_ticks(), now, rc);
745 
746 	tls_thread = orig_thread;
747 
748 	return rc;
749 }
750 
751 uint64_t
752 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
753 {
754 	struct spdk_poller *poller;
755 
756 	poller = TAILQ_FIRST(&thread->timed_pollers);
757 	if (poller) {
758 		return poller->next_run_tick;
759 	}
760 
761 	return 0;
762 }
763 
764 int
765 spdk_thread_has_active_pollers(struct spdk_thread *thread)
766 {
767 	return !TAILQ_EMPTY(&thread->active_pollers);
768 }
769 
770 static bool
771 thread_has_unpaused_pollers(struct spdk_thread *thread)
772 {
773 	if (TAILQ_EMPTY(&thread->active_pollers) &&
774 	    TAILQ_EMPTY(&thread->timed_pollers)) {
775 		return false;
776 	}
777 
778 	return true;
779 }
780 
781 bool
782 spdk_thread_has_pollers(struct spdk_thread *thread)
783 {
784 	if (!thread_has_unpaused_pollers(thread) &&
785 	    TAILQ_EMPTY(&thread->paused_pollers)) {
786 		return false;
787 	}
788 
789 	return true;
790 }
791 
792 bool
793 spdk_thread_is_idle(struct spdk_thread *thread)
794 {
795 	if (spdk_ring_count(thread->messages) ||
796 	    thread_has_unpaused_pollers(thread) ||
797 	    thread->critical_msg != NULL) {
798 		return false;
799 	}
800 
801 	return true;
802 }
803 
804 uint32_t
805 spdk_thread_get_count(void)
806 {
807 	/*
808 	 * Return cached value of the current thread count.  We could acquire the
809 	 *  lock and iterate through the TAILQ of threads to count them, but that
810 	 *  count could still be invalidated after we release the lock.
811 	 */
812 	return g_thread_count;
813 }
814 
815 struct spdk_thread *
816 spdk_get_thread(void)
817 {
818 	return _get_thread();
819 }
820 
821 const char *
822 spdk_thread_get_name(const struct spdk_thread *thread)
823 {
824 	return thread->name;
825 }
826 
827 uint64_t
828 spdk_thread_get_id(const struct spdk_thread *thread)
829 {
830 	return thread->id;
831 }
832 
833 struct spdk_thread *
834 spdk_thread_get_by_id(uint64_t id)
835 {
836 	struct spdk_thread *thread;
837 
838 	pthread_mutex_lock(&g_devlist_mutex);
839 	TAILQ_FOREACH(thread, &g_threads, tailq) {
840 		if (thread->id == id) {
841 			pthread_mutex_unlock(&g_devlist_mutex);
842 
843 			return thread;
844 		}
845 	}
846 	pthread_mutex_unlock(&g_devlist_mutex);
847 
848 	return NULL;
849 }
850 
851 int
852 spdk_thread_get_stats(struct spdk_thread_stats *stats)
853 {
854 	struct spdk_thread *thread;
855 
856 	thread = _get_thread();
857 	if (!thread) {
858 		SPDK_ERRLOG("No thread allocated\n");
859 		return -EINVAL;
860 	}
861 
862 	if (stats == NULL) {
863 		return -EINVAL;
864 	}
865 
866 	*stats = thread->stats;
867 
868 	return 0;
869 }
870 
871 uint64_t
872 spdk_thread_get_last_tsc(struct spdk_thread *thread)
873 {
874 	if (thread == NULL) {
875 		thread = _get_thread();
876 	}
877 
878 	return thread->tsc_last;
879 }
880 
881 int
882 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
883 {
884 	struct spdk_thread *local_thread;
885 	struct spdk_msg *msg;
886 	int rc;
887 
888 	assert(thread != NULL);
889 
890 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
891 		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
892 		return -EIO;
893 	}
894 
895 	local_thread = _get_thread();
896 
897 	msg = NULL;
898 	if (local_thread != NULL) {
899 		if (local_thread->msg_cache_count > 0) {
900 			msg = SLIST_FIRST(&local_thread->msg_cache);
901 			assert(msg != NULL);
902 			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
903 			local_thread->msg_cache_count--;
904 		}
905 	}
906 
907 	if (msg == NULL) {
908 		msg = spdk_mempool_get(g_spdk_msg_mempool);
909 		if (!msg) {
910 			SPDK_ERRLOG("msg could not be allocated\n");
911 			return -ENOMEM;
912 		}
913 	}
914 
915 	msg->fn = fn;
916 	msg->arg = ctx;
917 
918 	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
919 	if (rc != 1) {
920 		SPDK_ERRLOG("msg could not be enqueued\n");
921 		spdk_mempool_put(g_spdk_msg_mempool, msg);
922 		return -EIO;
923 	}
924 
925 	if (thread->interrupt_mode) {
926 		uint64_t notify = 1;
927 
928 		rc = write(thread->msg_fd, &notify, sizeof(notify));
929 		if (rc < 0) {
930 			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
931 			return -EIO;
932 		}
933 	}
934 
935 	return 0;
936 }
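
/*
 * Usage sketch (illustrative only, not part of this file): handing work to
 * another SPDK thread. The callback runs later, inside the target thread's
 * spdk_thread_poll(), never inline on the sender. 'target' is assumed to
 * have been obtained elsewhere, e.g. via spdk_thread_get_by_id().
 *
 *	static void
 *	say_hello(void *ctx)
 *	{
 *		printf("hello from %s\n", spdk_thread_get_name(spdk_get_thread()));
 *	}
 *
 *	rc = spdk_thread_send_msg(target, say_hello, NULL);
 *	assert(rc == 0);
 */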
937 
938 int
939 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
940 {
941 	spdk_msg_fn expected = NULL;
942 
943 	if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
944 					__ATOMIC_SEQ_CST)) {
945 		if (thread->interrupt_mode) {
946 			uint64_t notify = 1;
947 			int rc;
948 
949 			rc = write(thread->msg_fd, &notify, sizeof(notify));
950 			if (rc < 0) {
951 				SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
952 				return -EIO;
953 			}
954 		}
955 
956 		return 0;
957 	}
958 
959 	return -EIO;
960 }
961 
962 #ifdef __linux__
963 static int
964 interrupt_timerfd_prepare(uint64_t period_microseconds)
965 {
966 	int timerfd;
967 	int ret;
968 	struct itimerspec new_tv;
969 	uint64_t period_seconds;
970 	uint64_t period_nanoseconds;
971 
972 	if (period_microseconds == 0) {
973 		return -EINVAL;
974 	}
975 
976 	period_seconds = period_microseconds / SPDK_SEC_TO_USEC;
977 	period_nanoseconds = period_microseconds % SPDK_SEC_TO_USEC * 1000;
978 
979 	new_tv.it_value.tv_sec = period_seconds;
980 	new_tv.it_value.tv_nsec = period_nanoseconds;
981 
982 	new_tv.it_interval.tv_sec = period_seconds;
983 	new_tv.it_interval.tv_nsec = period_nanoseconds;
984 
985 	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
986 	if (timerfd < 0) {
987 		return -errno;
988 	}
989 
990 	ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
991 	if (ret < 0) {
992 		close(timerfd);
993 		return -errno;
994 	}
995 
996 	return timerfd;
997 }
998 #else
999 static int
1000 interrupt_timerfd_prepare(uint64_t period_microseconds)
1001 {
1002 	return -ENOTSUP;
1003 }
1004 #endif
1005 
1006 static int
1007 interrupt_timerfd_process(void *arg)
1008 {
1009 	struct spdk_poller *poller = arg;
1010 	uint64_t exp;
1011 	int rc;
1012 
1013 	/* Consume the expiration count to clear the timerfd's level-triggered state. */
1014 	rc = read(poller->timerfd, &exp, sizeof(exp));
1015 	if (rc < 0) {
1016 		if (errno == EAGAIN) {	/* read() sets errno; it doesn't return -EAGAIN */
1017 			return 0;
1018 		}
1019 
1020 		return -errno;
1021 	}
1022 
1023 	return poller->fn(poller->arg);
1024 }
1025 
1026 static int
1027 thread_interrupt_register_timerfd(struct spdk_fd_group *fgrp,
1028 				  uint64_t period_microseconds,
1029 				  struct spdk_poller *poller)
1030 {
1031 	int timerfd;
1032 	int rc;
1033 
1034 	timerfd = interrupt_timerfd_prepare(period_microseconds);
1035 	if (timerfd < 0) {
1036 		return timerfd;
1037 	}
1038 
1039 	rc = spdk_fd_group_add(fgrp, timerfd,
1040 			       interrupt_timerfd_process, poller);
1041 	if (rc < 0) {
1042 		close(timerfd);
1043 		return rc;
1044 	}
1045 
1046 	return timerfd;
1047 }
1048 
1049 static struct spdk_poller *
1050 poller_register(spdk_poller_fn fn,
1051 		void *arg,
1052 		uint64_t period_microseconds,
1053 		const char *name)
1054 {
1055 	struct spdk_thread *thread;
1056 	struct spdk_poller *poller;
1057 	uint64_t quotient, remainder, ticks;
1058 
1059 	thread = spdk_get_thread();
1060 	if (!thread) {
1061 		assert(false);
1062 		return NULL;
1063 	}
1064 
1065 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
1066 		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
1067 		return NULL;
1068 	}
1069 
1070 	poller = calloc(1, sizeof(*poller));
1071 	if (poller == NULL) {
1072 		SPDK_ERRLOG("Poller memory allocation failed\n");
1073 		return NULL;
1074 	}
1075 
1076 	if (name) {
1077 		snprintf(poller->name, sizeof(poller->name), "%s", name);
1078 	} else {
1079 		snprintf(poller->name, sizeof(poller->name), "%p", fn);
1080 	}
1081 
1082 	poller->state = SPDK_POLLER_STATE_WAITING;
1083 	poller->fn = fn;
1084 	poller->arg = arg;
1085 	poller->thread = thread;
1086 
1087 	if (period_microseconds) {
1088 		quotient = period_microseconds / SPDK_SEC_TO_USEC;
1089 		remainder = period_microseconds % SPDK_SEC_TO_USEC;
1090 		ticks = spdk_get_ticks_hz();
1091 
1092 		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
1093 	} else {
1094 		poller->period_ticks = 0;
1095 	}
1096 
1097 	if (thread->interrupt_mode && period_microseconds != 0) {
1098 		int rc;
1099 
1100 		rc = thread_interrupt_register_timerfd(thread->fgrp, period_microseconds, poller);
1101 		if (rc < 0) {
1102 			SPDK_ERRLOG("Failed to register timerfd for periodic poller: %s\n", spdk_strerror(-rc));
1103 			free(poller);
1104 			return NULL;
1105 		}
1106 		poller->timerfd = rc;
1107 	}
1108 
1109 	thread_insert_poller(thread, poller);
1110 
1111 	return poller;
1112 }
1113 
1114 struct spdk_poller *
1115 spdk_poller_register(spdk_poller_fn fn,
1116 		     void *arg,
1117 		     uint64_t period_microseconds)
1118 {
1119 	return poller_register(fn, arg, period_microseconds, NULL);
1120 }
1121 
1122 struct spdk_poller *
1123 spdk_poller_register_named(spdk_poller_fn fn,
1124 			   void *arg,
1125 			   uint64_t period_microseconds,
1126 			   const char *name)
1127 {
1128 	return poller_register(fn, arg, period_microseconds, name);
1129 }
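
/*
 * Usage sketch (illustrative only, not part of this file): registering a
 * poller that runs every 100 microseconds on the current thread. A period of
 * 0 would instead place the poller on the active (run-every-iteration) list.
 * check_completions(), poll_hw() and 'dev' are hypothetical.
 *
 *	static int
 *	check_completions(void *arg)
 *	{
 *		// Return >0 if work was done (busy), 0 if idle, <0 on error.
 *		return poll_hw(arg) ? 1 : 0;
 *	}
 *
 *	struct spdk_poller *poller =
 *		spdk_poller_register_named(check_completions, dev, 100, "check_completions");
 *	...
 *	spdk_poller_unregister(&poller);	// also resets 'poller' to NULL
 */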
1130 
1131 void
1132 spdk_poller_unregister(struct spdk_poller **ppoller)
1133 {
1134 	struct spdk_thread *thread;
1135 	struct spdk_poller *poller;
1136 
1137 	poller = *ppoller;
1138 	if (poller == NULL) {
1139 		return;
1140 	}
1141 
1142 	*ppoller = NULL;
1143 
1144 	thread = spdk_get_thread();
1145 	if (!thread) {
1146 		assert(false);
1147 		return;
1148 	}
1149 
1150 	if (poller->thread != thread) {
1151 		SPDK_ERRLOG("spdk_poller_unregister() called from a different thread than spdk_poller_register()\n");
1152 		assert(false);
1153 		return;
1154 	}
1155 
1156 	if (thread->interrupt_mode && poller->timerfd) {
1157 		spdk_fd_group_remove(thread->fgrp, poller->timerfd);
1158 		close(poller->timerfd);
1159 		poller->timerfd = 0;
1160 	}
1161 
1162 	/* If the poller was paused, put it on the active_pollers list so that
1163 	 * its unregistration can be processed by spdk_thread_poll().
1164 	 */
1165 	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
1166 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
1167 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
1168 		poller->period_ticks = 0;
1169 	}
1170 
1171 	/* Simply set the state to unregistered. The poller will get cleaned up
1172 	 * in a subsequent call to spdk_thread_poll().
1173 	 */
1174 	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
1175 }
1176 
1177 void
1178 spdk_poller_pause(struct spdk_poller *poller)
1179 {
1180 	struct spdk_thread *thread;
1181 
1182 	if (poller->state == SPDK_POLLER_STATE_PAUSED ||
1183 	    poller->state == SPDK_POLLER_STATE_PAUSING) {
1184 		return;
1185 	}
1186 
1187 	thread = spdk_get_thread();
1188 	if (!thread) {
1189 		assert(false);
1190 		return;
1191 	}
1192 
1193 	/* If a poller is paused from within itself, we can immediately move it
1194 	 * onto the paused_pollers list.  Otherwise we just set its state to
1195 	 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it.  This
1196 	 * allows a poller to be paused from another poller's context without
1197 	 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration.
1198 	 */
1199 	if (poller->state != SPDK_POLLER_STATE_RUNNING) {
1200 		poller->state = SPDK_POLLER_STATE_PAUSING;
1201 	} else {
1202 		if (poller->period_ticks > 0) {
1203 			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
1204 		} else {
1205 			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
1206 		}
1207 
1208 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
1209 		poller->state = SPDK_POLLER_STATE_PAUSED;
1210 	}
1211 }
1212 
1213 void
1214 spdk_poller_resume(struct spdk_poller *poller)
1215 {
1216 	struct spdk_thread *thread;
1217 
1218 	if (poller->state != SPDK_POLLER_STATE_PAUSED &&
1219 	    poller->state != SPDK_POLLER_STATE_PAUSING) {
1220 		return;
1221 	}
1222 
1223 	thread = spdk_get_thread();
1224 	if (!thread) {
1225 		assert(false);
1226 		return;
1227 	}
1228 
1229 	/* If a poller is paused it has to be removed from the paused pollers
1230 	 * list and put on the active / timer list depending on its
1231 	 * period_ticks.  If a poller is still in the process of being paused,
1232 	 * we just need to flip its state back to waiting, as it's already on
1233 	 * the appropriate list.
1234 	 */
1235 	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
1236 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
1237 		thread_insert_poller(thread, poller);
1238 	}
1239 
1240 	poller->state = SPDK_POLLER_STATE_WAITING;
1241 }
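
/*
 * Usage sketch (illustrative only, not part of this file): quiescing a poller
 * around a hypothetical reset. Both calls must come from the poller's owning
 * thread. A pause from within the poller itself takes effect immediately;
 * otherwise it takes effect on the next spdk_thread_poll() iteration.
 *
 *	spdk_poller_pause(poller);
 *	do_reset();
 *	spdk_poller_resume(poller);
 */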
1242 
1243 const char *
1244 spdk_poller_state_str(enum spdk_poller_state state)
1245 {
1246 	switch (state) {
1247 	case SPDK_POLLER_STATE_WAITING:
1248 		return "waiting";
1249 	case SPDK_POLLER_STATE_RUNNING:
1250 		return "running";
1251 	case SPDK_POLLER_STATE_UNREGISTERED:
1252 		return "unregistered";
1253 	case SPDK_POLLER_STATE_PAUSING:
1254 		return "pausing";
1255 	case SPDK_POLLER_STATE_PAUSED:
1256 		return "paused";
1257 	default:
1258 		return NULL;
1259 	}
1260 }
1261 
1262 struct call_thread {
1263 	struct spdk_thread *cur_thread;
1264 	spdk_msg_fn fn;
1265 	void *ctx;
1266 
1267 	struct spdk_thread *orig_thread;
1268 	spdk_msg_fn cpl;
1269 };
1270 
1271 static void
1272 _on_thread(void *ctx)
1273 {
1274 	struct call_thread *ct = ctx;
1275 	int rc __attribute__((unused));
1276 
1277 	ct->fn(ct->ctx);
1278 
1279 	pthread_mutex_lock(&g_devlist_mutex);
1280 	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
1281 	pthread_mutex_unlock(&g_devlist_mutex);
1282 
1283 	if (!ct->cur_thread) {
1284 		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");
1285 
1286 		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
1287 		free(ctx);
1288 	} else {
1289 		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
1290 			      ct->cur_thread->name);
1291 
1292 		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
1293 	}
1294 	assert(rc == 0);
1295 }
1296 
1297 void
1298 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
1299 {
1300 	struct call_thread *ct;
1301 	struct spdk_thread *thread;
1302 	int rc __attribute__((unused));
1303 
1304 	ct = calloc(1, sizeof(*ct));
1305 	if (!ct) {
1306 		SPDK_ERRLOG("Unable to perform thread iteration\n");
1307 		cpl(ctx);
1308 		return;
1309 	}
1310 
1311 	ct->fn = fn;
1312 	ct->ctx = ctx;
1313 	ct->cpl = cpl;
1314 
1315 	thread = _get_thread();
1316 	if (!thread) {
1317 		SPDK_ERRLOG("No thread allocated\n");
1318 		free(ct);
1319 		cpl(ctx);
1320 		return;
1321 	}
1322 	ct->orig_thread = thread;
1323 
1324 	pthread_mutex_lock(&g_devlist_mutex);
1325 	ct->cur_thread = TAILQ_FIRST(&g_threads);
1326 	pthread_mutex_unlock(&g_devlist_mutex);
1327 
1328 	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
1329 		      ct->orig_thread->name);
1330 
1331 	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
1332 	assert(rc == 0);
1333 }
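
/*
 * Usage sketch (illustrative only, not part of this file): running a function
 * once on every SPDK thread, followed by a completion on the caller's thread.
 *
 *	static void
 *	drop_caches(void *ctx)
 *	{
 *		// Runs on each thread in turn, with that thread set as current.
 *	}
 *
 *	static void
 *	drop_caches_done(void *ctx)
 *	{
 *		// Runs back on the thread that called spdk_for_each_thread().
 *	}
 *
 *	spdk_for_each_thread(drop_caches, NULL, drop_caches_done);
 */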
1334 
1335 void
1336 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
1337 			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
1338 			const char *name)
1339 {
1340 	struct io_device *dev, *tmp;
1341 	struct spdk_thread *thread;
1342 
1343 	assert(io_device != NULL);
1344 	assert(create_cb != NULL);
1345 	assert(destroy_cb != NULL);
1346 
1347 	thread = spdk_get_thread();
1348 	if (!thread) {
1349 		SPDK_ERRLOG("called from non-SPDK thread\n");
1350 		assert(false);
1351 		return;
1352 	}
1353 
1354 	dev = calloc(1, sizeof(struct io_device));
1355 	if (dev == NULL) {
1356 		SPDK_ERRLOG("could not allocate io_device\n");
1357 		return;
1358 	}
1359 
1360 	dev->io_device = io_device;
1361 	if (name) {
1362 		snprintf(dev->name, sizeof(dev->name), "%s", name);
1363 	} else {
1364 		snprintf(dev->name, sizeof(dev->name), "%p", dev);
1365 	}
1366 	dev->create_cb = create_cb;
1367 	dev->destroy_cb = destroy_cb;
1368 	dev->unregister_cb = NULL;
1369 	dev->ctx_size = ctx_size;
1370 	dev->for_each_count = 0;
1371 	dev->unregistered = false;
1372 	dev->refcnt = 0;
1373 
1374 	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
1375 		      dev->name, dev->io_device, thread->name);
1376 
1377 	pthread_mutex_lock(&g_devlist_mutex);
1378 	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
1379 		if (tmp->io_device == io_device) {
1380 			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
1381 				    io_device, tmp->name, dev->name);
1382 			free(dev);
1383 			pthread_mutex_unlock(&g_devlist_mutex);
1384 			return;
1385 		}
1386 	}
1387 	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
1388 	pthread_mutex_unlock(&g_devlist_mutex);
1389 }
1390 
1391 static void
1392 _finish_unregister(void *arg)
1393 {
1394 	struct io_device *dev = arg;
1395 
1396 	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
1397 		      dev->name, dev->io_device, dev->unregister_thread->name);
1398 
1399 	dev->unregister_cb(dev->io_device);
1400 	free(dev);
1401 }
1402 
1403 static void
1404 io_device_free(struct io_device *dev)
1405 {
1406 	int rc __attribute__((unused));
1407 
1408 	if (dev->unregister_cb == NULL) {
1409 		free(dev);
1410 	} else {
1411 		assert(dev->unregister_thread != NULL);
1412 		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
1413 			      dev->name, dev->io_device, dev->unregister_thread->name);
1414 		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
1415 		assert(rc == 0);
1416 	}
1417 }
1418 
1419 void
1420 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
1421 {
1422 	struct io_device *dev;
1423 	uint32_t refcnt;
1424 	struct spdk_thread *thread;
1425 
1426 	thread = spdk_get_thread();
1427 	if (!thread) {
1428 		SPDK_ERRLOG("called from non-SPDK thread\n");
1429 		assert(false);
1430 		return;
1431 	}
1432 
1433 	pthread_mutex_lock(&g_devlist_mutex);
1434 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
1435 		if (dev->io_device == io_device) {
1436 			break;
1437 		}
1438 	}
1439 
1440 	if (!dev) {
1441 		SPDK_ERRLOG("io_device %p not found\n", io_device);
1442 		assert(false);
1443 		pthread_mutex_unlock(&g_devlist_mutex);
1444 		return;
1445 	}
1446 
1447 	if (dev->for_each_count > 0) {
1448 		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
1449 			    dev->name, io_device, dev->for_each_count);
1450 		pthread_mutex_unlock(&g_devlist_mutex);
1451 		return;
1452 	}
1453 
1454 	dev->unregister_cb = unregister_cb;
1455 	dev->unregistered = true;
1456 	TAILQ_REMOVE(&g_io_devices, dev, tailq);
1457 	refcnt = dev->refcnt;
1458 	dev->unregister_thread = thread;
1459 	pthread_mutex_unlock(&g_devlist_mutex);
1460 
1461 	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
1462 		      dev->name, dev->io_device, thread->name);
1463 
1464 	if (refcnt > 0) {
1465 		/* defer deletion */
1466 		return;
1467 	}
1468 
1469 	io_device_free(dev);
1470 }
1471 
1472 const char *
1473 spdk_io_device_get_name(struct io_device *dev)
1474 {
1475 	return dev->name;
1476 }
1477 
1478 struct spdk_io_channel *
1479 spdk_get_io_channel(void *io_device)
1480 {
1481 	struct spdk_io_channel *ch;
1482 	struct spdk_thread *thread;
1483 	struct io_device *dev;
1484 	int rc;
1485 
1486 	pthread_mutex_lock(&g_devlist_mutex);
1487 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
1488 		if (dev->io_device == io_device) {
1489 			break;
1490 		}
1491 	}
1492 	if (dev == NULL) {
1493 		SPDK_ERRLOG("could not find io_device %p\n", io_device);
1494 		pthread_mutex_unlock(&g_devlist_mutex);
1495 		return NULL;
1496 	}
1497 
1498 	thread = _get_thread();
1499 	if (!thread) {
1500 		SPDK_ERRLOG("No thread allocated\n");
1501 		pthread_mutex_unlock(&g_devlist_mutex);
1502 		return NULL;
1503 	}
1504 
1505 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
1506 		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
1507 		pthread_mutex_unlock(&g_devlist_mutex);
1508 		return NULL;
1509 	}
1510 
1511 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
1512 		if (ch->dev == dev) {
1513 			ch->ref++;
1514 
1515 			SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
1516 				      ch, dev->name, dev->io_device, thread->name, ch->ref);
1517 
1518 			/*
1519 			 * An I/O channel already exists for this device on this
1520 			 *  thread, so return it.
1521 			 */
1522 			pthread_mutex_unlock(&g_devlist_mutex);
1523 			return ch;
1524 		}
1525 	}
1526 
1527 	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
1528 	if (ch == NULL) {
1529 		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
1530 		pthread_mutex_unlock(&g_devlist_mutex);
1531 		return NULL;
1532 	}
1533 
1534 	ch->dev = dev;
1535 	ch->destroy_cb = dev->destroy_cb;
1536 	ch->thread = thread;
1537 	ch->ref = 1;
1538 	ch->destroy_ref = 0;
1539 	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);
1540 
1541 	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
1542 		      ch, dev->name, dev->io_device, thread->name, ch->ref);
1543 
1544 	dev->refcnt++;
1545 
1546 	pthread_mutex_unlock(&g_devlist_mutex);
1547 
1548 	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
1549 	if (rc != 0) {
1550 		pthread_mutex_lock(&g_devlist_mutex);
1551 		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
1552 		dev->refcnt--;
1553 		free(ch);
1554 		pthread_mutex_unlock(&g_devlist_mutex);
1555 		return NULL;
1556 	}
1557 
1558 	return ch;
1559 }
1560 
1561 static void
1562 put_io_channel(void *arg)
1563 {
1564 	struct spdk_io_channel *ch = arg;
1565 	bool do_remove_dev = true;
1566 	struct spdk_thread *thread;
1567 
1568 	thread = spdk_get_thread();
1569 	if (!thread) {
1570 		SPDK_ERRLOG("called from non-SPDK thread\n");
1571 		assert(false);
1572 		return;
1573 	}
1574 
1575 	SPDK_DEBUGLOG(thread,
1576 		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
1577 		      ch, ch->dev->name, ch->dev->io_device, thread->name);
1578 
1579 	assert(ch->thread == thread);
1580 
1581 	ch->destroy_ref--;
1582 
1583 	if (ch->ref > 0 || ch->destroy_ref > 0) {
1584 		/*
1585 		 * Another reference to the associated io_device was requested
1586 		 *  after this message was sent but before it had a chance to
1587 		 *  execute.
1588 		 */
1589 		return;
1590 	}
1591 
1592 	pthread_mutex_lock(&g_devlist_mutex);
1593 	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
1594 	pthread_mutex_unlock(&g_devlist_mutex);
1595 
1596 	/* Don't hold the devlist mutex while the destroy_cb is called. */
1597 	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));
1598 
1599 	pthread_mutex_lock(&g_devlist_mutex);
1600 	ch->dev->refcnt--;
1601 
1602 	if (!ch->dev->unregistered) {
1603 		do_remove_dev = false;
1604 	}
1605 
1606 	if (ch->dev->refcnt > 0) {
1607 		do_remove_dev = false;
1608 	}
1609 
1610 	pthread_mutex_unlock(&g_devlist_mutex);
1611 
1612 	if (do_remove_dev) {
1613 		io_device_free(ch->dev);
1614 	}
1615 	free(ch);
1616 }
1617 
1618 void
1619 spdk_put_io_channel(struct spdk_io_channel *ch)
1620 {
1621 	struct spdk_thread *thread;
1622 	int rc __attribute__((unused));
1623 
1624 	thread = spdk_get_thread();
1625 	if (!thread) {
1626 		SPDK_ERRLOG("called from non-SPDK thread\n");
1627 		assert(false);
1628 		return;
1629 	}
1630 
1631 	if (ch->thread != thread) {
1632 		SPDK_ERRLOG("spdk_put_io_channel() called from a different thread than spdk_get_io_channel()\n");
1633 		assert(false);
1634 		return;
1635 	}
1636 
1637 	SPDK_DEBUGLOG(thread,
1638 		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
1639 		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);
1640 
1641 	ch->ref--;
1642 
1643 	if (ch->ref == 0) {
1644 		ch->destroy_ref++;
1645 		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
1646 		assert(rc == 0);
1647 	}
1648 }
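
/*
 * Usage sketch (illustrative only, not part of this file): the io_device /
 * io_channel pattern. The device is registered once; each thread that calls
 * spdk_get_io_channel() gets (or takes another reference on) its own
 * per-thread channel, with ctx_size bytes of context trailing the
 * spdk_io_channel itself. g_my_dev, allocate_qpair() and the callbacks are
 * hypothetical.
 *
 *	struct my_ctx { int qpair_id; };
 *
 *	static int
 *	my_create_cb(void *io_device, void *ctx_buf)
 *	{
 *		struct my_ctx *ctx = ctx_buf;
 *
 *		ctx->qpair_id = allocate_qpair();
 *		return 0;
 *	}
 *
 *	static void
 *	my_destroy_cb(void *io_device, void *ctx_buf)
 *	{
 *		// Mirror image of my_create_cb().
 *	}
 *
 *	spdk_io_device_register(&g_my_dev, my_create_cb, my_destroy_cb,
 *				sizeof(struct my_ctx), "my_dev");
 *
 *	// On each thread that performs I/O:
 *	struct spdk_io_channel *ch = spdk_get_io_channel(&g_my_dev);
 *	struct my_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *	...
 *	spdk_put_io_channel(ch);
 */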
1649 
1650 struct spdk_io_channel *
1651 spdk_io_channel_from_ctx(void *ctx)
1652 {
1653 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
1654 }
1655 
1656 struct spdk_thread *
1657 spdk_io_channel_get_thread(struct spdk_io_channel *ch)
1658 {
1659 	return ch->thread;
1660 }
1661 
1662 struct spdk_io_channel_iter {
1663 	void *io_device;
1664 	struct io_device *dev;
1665 	spdk_channel_msg fn;
1666 	int status;
1667 	void *ctx;
1668 	struct spdk_io_channel *ch;
1669 
1670 	struct spdk_thread *cur_thread;
1671 
1672 	struct spdk_thread *orig_thread;
1673 	spdk_channel_for_each_cpl cpl;
1674 };
1675 
1676 void *
1677 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
1678 {
1679 	return i->io_device;
1680 }
1681 
1682 struct spdk_io_channel *
1683 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
1684 {
1685 	return i->ch;
1686 }
1687 
1688 void *
1689 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
1690 {
1691 	return i->ctx;
1692 }
1693 
1694 static void
1695 _call_completion(void *ctx)
1696 {
1697 	struct spdk_io_channel_iter *i = ctx;
1698 
1699 	if (i->cpl != NULL) {
1700 		i->cpl(i, i->status);
1701 	}
1702 	free(i);
1703 }
1704 
1705 static void
1706 _call_channel(void *ctx)
1707 {
1708 	struct spdk_io_channel_iter *i = ctx;
1709 	struct spdk_io_channel *ch;
1710 
1711 	/*
1712 	 * It is possible that the channel was deleted before this
1713 	 *  message had a chance to execute.  If so, skip calling
1714 	 *  the fn() on this thread.
1715 	 */
1716 	pthread_mutex_lock(&g_devlist_mutex);
1717 	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
1718 		if (ch->dev->io_device == i->io_device) {
1719 			break;
1720 		}
1721 	}
1722 	pthread_mutex_unlock(&g_devlist_mutex);
1723 
1724 	if (ch) {
1725 		i->fn(i);
1726 	} else {
1727 		spdk_for_each_channel_continue(i, 0);
1728 	}
1729 }
1730 
1731 void
1732 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
1733 		      spdk_channel_for_each_cpl cpl)
1734 {
1735 	struct spdk_thread *thread;
1736 	struct spdk_io_channel *ch;
1737 	struct spdk_io_channel_iter *i;
1738 	int rc __attribute__((unused));
1739 
1740 	i = calloc(1, sizeof(*i));
1741 	if (!i) {
1742 		SPDK_ERRLOG("Unable to allocate iterator\n");
1743 		return;
1744 	}
1745 
1746 	i->io_device = io_device;
1747 	i->fn = fn;
1748 	i->ctx = ctx;
1749 	i->cpl = cpl;
1750 
1751 	pthread_mutex_lock(&g_devlist_mutex);
1752 	i->orig_thread = _get_thread();
1753 
1754 	TAILQ_FOREACH(thread, &g_threads, tailq) {
1755 		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
1756 			if (ch->dev->io_device == io_device) {
1757 				ch->dev->for_each_count++;
1758 				i->dev = ch->dev;
1759 				i->cur_thread = thread;
1760 				i->ch = ch;
1761 				pthread_mutex_unlock(&g_devlist_mutex);
1762 				rc = spdk_thread_send_msg(thread, _call_channel, i);
1763 				assert(rc == 0);
1764 				return;
1765 			}
1766 		}
1767 	}
1768 
1769 	pthread_mutex_unlock(&g_devlist_mutex);
1770 
1771 	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
1772 	assert(rc == 0);
1773 }
1774 
1775 void
1776 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
1777 {
1778 	struct spdk_thread *thread;
1779 	struct spdk_io_channel *ch;
1780 	int rc __attribute__((unused));
1781 
1782 	assert(i->cur_thread == spdk_get_thread());
1783 
1784 	i->status = status;
1785 
1786 	pthread_mutex_lock(&g_devlist_mutex);
1787 	if (status) {
1788 		goto end;
1789 	}
1790 	thread = TAILQ_NEXT(i->cur_thread, tailq);
1791 	while (thread) {
1792 		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
1793 			if (ch->dev->io_device == i->io_device) {
1794 				i->cur_thread = thread;
1795 				i->ch = ch;
1796 				pthread_mutex_unlock(&g_devlist_mutex);
1797 				rc = spdk_thread_send_msg(thread, _call_channel, i);
1798 				assert(rc == 0);
1799 				return;
1800 			}
1801 		}
1802 		thread = TAILQ_NEXT(thread, tailq);
1803 	}
1804 
1805 end:
1806 	i->dev->for_each_count--;
1807 	i->ch = NULL;
1808 	pthread_mutex_unlock(&g_devlist_mutex);
1809 
1810 	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
1811 	assert(rc == 0);
1812 }
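
/*
 * Usage sketch (illustrative only, not part of this file): visiting every
 * existing channel of an io_device, one thread at a time. Each visit must
 * call spdk_for_each_channel_continue() exactly once, possibly
 * asynchronously. g_my_dev is hypothetical.
 *
 *	static void
 *	flush_one_channel(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *
 *		// ... operate on ch's per-thread context ...
 *		spdk_for_each_channel_continue(i, 0);
 *	}
 *
 *	static void
 *	flush_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		// Runs on the originating thread. status is 0, or the first
 *		// non-zero value passed to spdk_for_each_channel_continue().
 *	}
 *
 *	spdk_for_each_channel(&g_my_dev, flush_one_channel, NULL, flush_done);
 */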
1813 
1814 struct spdk_interrupt {
1815 	int			efd;
1816 	struct spdk_thread	*thread;
1817 	char			name[SPDK_MAX_POLLER_NAME_LEN + 1];
1818 };
1819 
1820 static void
1821 thread_interrupt_destroy(struct spdk_thread *thread)
1822 {
1823 	struct spdk_fd_group *fgrp = thread->fgrp;
1824 
1825 	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);
1826 
1827 	if (thread->msg_fd <= 0) {
1828 		return;
1829 	}
1830 
1831 	spdk_fd_group_remove(fgrp, thread->msg_fd);
1832 	close(thread->msg_fd);
1833 
1834 	spdk_fd_group_destroy(fgrp);
1835 	thread->fgrp = NULL;
1836 }
1837 
1838 #ifdef __linux__
1839 static int
1840 thread_interrupt_msg_process(void *arg)
1841 {
1842 	struct spdk_thread *thread = arg;
1843 	struct spdk_thread *orig_thread;
1844 	uint32_t msg_count;
1845 	spdk_msg_fn critical_msg;
1846 	int rc = 0;
1847 	uint64_t now = spdk_get_ticks();
1848 
1849 	orig_thread = _get_thread();
1850 	tls_thread = thread;
1851 
1852 	critical_msg = thread->critical_msg;
1853 	if (spdk_unlikely(critical_msg != NULL)) {
1854 		critical_msg(NULL);
1855 		thread->critical_msg = NULL;
1856 	}
1857 
1858 	msg_count = msg_queue_run_batch(thread, 0);
1859 	if (msg_count) {
1860 		rc = 1;
1861 	}
1862 
1863 	thread_update_stats(thread, spdk_get_ticks(), now, rc);
1864 
1865 	tls_thread = orig_thread;
1866 
1867 	return rc;
1868 }
1869 
1870 static int
1871 thread_interrupt_create(struct spdk_thread *thread)
1872 {
1873 	int rc;
1874 
1875 	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);
1876 
1877 	rc = spdk_fd_group_create(&thread->fgrp);
1878 	if (rc) {
1879 		return rc;
1880 	}
1881 
1882 	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1883 	if (thread->msg_fd < 0) {
1884 		rc = -errno;
1885 		spdk_fd_group_destroy(thread->fgrp);
1886 		thread->fgrp = NULL;
1887 
1888 		return rc;
1889 	}
1890 
1891 	return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread);
1892 }
1893 #else
1894 static int
1895 thread_interrupt_create(struct spdk_thread *thread)
1896 {
1897 	return -ENOTSUP;
1898 }
1899 #endif
1900 
1901 struct spdk_interrupt *
1902 spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
1903 			void *arg, const char *name)
1904 {
1905 	struct spdk_thread *thread;
1906 	struct spdk_interrupt *intr;
1907 
1908 	thread = spdk_get_thread();
1909 	if (!thread) {
1910 		assert(false);
1911 		return NULL;
1912 	}
1913 
1914 	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
1915 		SPDK_ERRLOG("thread %s is not in the running state\n", thread->name);
1916 		return NULL;
1917 	}
1918 
1919 	/* Allocate before registering with the fd_group so a calloc() failure
1920 	 * doesn't leave efd registered with no spdk_interrupt tracking it. */
1921 	intr = calloc(1, sizeof(*intr));
1922 	if (intr == NULL) {
1923 		SPDK_ERRLOG("Interrupt handler allocation failed\n");
1924 		return NULL;
1925 	}
1926 
1927 	if (spdk_fd_group_add(thread->fgrp, efd, fn, arg)) {
1928 		free(intr);
1929 		return NULL;
1930 	}
1928 
1929 	if (name) {
1930 		snprintf(intr->name, sizeof(intr->name), "%s", name);
1931 	} else {
1932 		snprintf(intr->name, sizeof(intr->name), "%p", fn);
1933 	}
1934 
1935 	intr->efd = efd;
1936 	intr->thread = thread;
1937 
1938 	return intr;
1939 }
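
/*
 * Usage sketch (illustrative only, not part of this file): wiring an eventfd
 * into the current thread's fd_group while running in interrupt mode.
 * my_event_fn() and my_arg are hypothetical; the caller retains ownership
 * of efd.
 *
 *	int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 *	struct spdk_interrupt *intr =
 *		spdk_interrupt_register(efd, my_event_fn, my_arg, "my_intr");
 *	...
 *	spdk_interrupt_unregister(&intr);
 *	close(efd);
 */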
1940 
1941 void
1942 spdk_interrupt_unregister(struct spdk_interrupt **pintr)
1943 {
1944 	struct spdk_thread *thread;
1945 	struct spdk_interrupt *intr;
1946 
1947 	intr = *pintr;
1948 	if (intr == NULL) {
1949 		return;
1950 	}
1951 
1952 	*pintr = NULL;
1953 
1954 	thread = spdk_get_thread();
1955 	if (!thread) {
1956 		assert(false);
1957 		return;
1958 	}
1959 
1960 	if (intr->thread != thread) {
1961 		SPDK_ERRLOG("spdk_interrupt_unregister() called from a different thread than spdk_interrupt_register()\n");
1962 		assert(false);
1963 		return;
1964 	}
1965 
1966 	spdk_fd_group_remove(thread->fgrp, intr->efd);
1967 	free(intr);
1968 }
1969 
1970 int
1971 spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
1972 			       enum spdk_interrupt_event_types event_types)
1973 {
1974 	struct spdk_thread *thread;
1975 
1976 	thread = spdk_get_thread();
1977 	if (!thread) {
1978 		assert(false);
1979 		return -EINVAL;
1980 	}
1981 
1982 	if (intr->thread != thread) {
1983 		SPDK_ERRLOG("spdk_interrupt_set_event_types() called from a different thread than spdk_interrupt_register()\n");
1984 		assert(false);
1985 		return -EINVAL;
1986 	}
1987 
1988 	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
1989 }
1990 
1991 int
1992 spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
1993 {
1994 	return spdk_fd_group_get_fd(thread->fgrp);
1995 }
1996 
1997 static bool g_interrupt_mode = false;
1998 
1999 int
2000 spdk_interrupt_mode_enable(void)
2001 {
2002 #ifdef __linux__
2003 	SPDK_NOTICELOG("Setting SPDK to run in interrupt mode.\n");
2004 	g_interrupt_mode = true;
2005 	return 0;
2006 #else
2007 	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
2008 	g_interrupt_mode = false;
2009 	return -ENOTSUP;
2010 #endif
2011 }
2012 
2013 bool
2014 spdk_interrupt_mode_is_enabled(void)
2015 {
2016 	return g_interrupt_mode;
2017 }
2018 
2019 SPDK_LOG_REGISTER_COMPONENT(thread)
2020