xref: /spdk/lib/thread/thread.c (revision 784b9d48746955f210926648a0131f84f58de76f)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "spdk/env.h"
10 #include "spdk/likely.h"
11 #include "spdk/queue.h"
12 #include "spdk/string.h"
13 #include "spdk/thread.h"
14 #include "spdk/trace.h"
15 #include "spdk/util.h"
16 #include "spdk/fd_group.h"
17 
18 #include "spdk/log.h"
19 #include "spdk_internal/thread.h"
20 #include "spdk_internal/usdt.h"
21 #include "thread_internal.h"
22 
23 #include "spdk_internal/trace_defs.h"
24 
25 #ifdef __linux__
26 #include <sys/timerfd.h>
27 #include <sys/eventfd.h>
28 #endif
29 
30 #define SPDK_MSG_BATCH_SIZE		8
31 #define SPDK_MAX_DEVICE_NAME_LEN	256
32 #define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
33 #define SPDK_MAX_POLLER_NAME_LEN	256
34 #define SPDK_MAX_THREAD_NAME_LEN	256
35 
36 static struct spdk_thread *g_app_thread;
37 
38 enum spdk_poller_state {
39 	/* The poller is registered with a thread but not currently executing its fn. */
40 	SPDK_POLLER_STATE_WAITING,
41 
42 	/* The poller is currently running its fn. */
43 	SPDK_POLLER_STATE_RUNNING,
44 
45 	/* The poller was unregistered during the execution of its fn. */
46 	SPDK_POLLER_STATE_UNREGISTERED,
47 
48 	/* The poller is in the process of being paused.  It will be paused
49 	 * during the next time it's supposed to be executed.
50 	 */
51 	SPDK_POLLER_STATE_PAUSING,
52 
53 	/* The poller is registered but currently paused.  It's on the
54 	 * paused_pollers list.
55 	 */
56 	SPDK_POLLER_STATE_PAUSED,
57 };
58 
59 struct spdk_poller {
60 	TAILQ_ENTRY(spdk_poller)	tailq;
61 	RB_ENTRY(spdk_poller)		node;
62 
63 	/* Current state of the poller; should only be accessed from the poller's thread. */
64 	enum spdk_poller_state		state;
65 
66 	uint64_t			period_ticks;
67 	uint64_t			next_run_tick;
68 	uint64_t			run_count;
69 	uint64_t			busy_count;
70 	uint64_t			id;
71 	spdk_poller_fn			fn;
72 	void				*arg;
73 	struct spdk_thread		*thread;
74 	/* Native interruptfd for periodic or busy poller */
75 	int				interruptfd;
76 	spdk_poller_set_interrupt_mode_cb set_intr_cb_fn;
77 	void				*set_intr_cb_arg;
78 
79 	char				name[SPDK_MAX_POLLER_NAME_LEN + 1];
80 };
81 
82 enum spdk_thread_state {
83 	/* The thread is processing pollers and messages via spdk_thread_poll(). */
84 	SPDK_THREAD_STATE_RUNNING,
85 
86 	/* The thread is in the process of terminating. It reaps unregistered
87 	 * pollers and releases I/O channels.
88 	 */
89 	SPDK_THREAD_STATE_EXITING,
90 
91 	/* The thread is exited. It is ready to call spdk_thread_destroy(). */
92 	SPDK_THREAD_STATE_EXITED,
93 };
94 
95 struct spdk_thread {
96 	uint64_t			tsc_last;
97 	struct spdk_thread_stats	stats;
98 	/*
99 	 * Contains pollers actively running on this thread.  Pollers
100 	 *  are run round-robin. The thread takes one poller from the head
101 	 *  of the ring, executes it, then puts it back at the tail of
102 	 *  the ring.
103 	 */
104 	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;
105 	/**
106 	 * Contains pollers running on this thread with a periodic timer.
107 	 */
108 	RB_HEAD(timed_pollers_tree, spdk_poller)	timed_pollers;
109 	struct spdk_poller				*first_timed_poller;
110 	/*
111 	 * Contains paused pollers.  Pollers on this queue are waiting until
112 	 * they are resumed (in which case they're put onto the active/timer
113 	 * queues) or unregistered.
114 	 */
115 	TAILQ_HEAD(paused_pollers_head, spdk_poller)	paused_pollers;
116 	struct spdk_ring		*messages;
117 	int				msg_fd;
118 	SLIST_HEAD(, spdk_msg)		msg_cache;
119 	size_t				msg_cache_count;
120 	spdk_msg_fn			critical_msg;
121 	uint64_t			id;
122 	uint64_t			next_poller_id;
123 	enum spdk_thread_state		state;
124 	int				pending_unregister_count;
125 
126 	RB_HEAD(io_channel_tree, spdk_io_channel)	io_channels;
127 	TAILQ_ENTRY(spdk_thread)			tailq;
128 
129 	char				name[SPDK_MAX_THREAD_NAME_LEN + 1];
130 	struct spdk_cpuset		cpumask;
131 	uint64_t			exit_timeout_tsc;
132 
133 	int32_t				lock_count;
134 
135 	/* Indicates whether this spdk_thread currently runs in interrupt mode. */
136 	bool				in_interrupt;
137 	bool				poller_unregistered;
138 	struct spdk_fd_group		*fgrp;
139 
140 	/* User context allocated at the end */
141 	uint8_t				ctx[0];
142 };
143 
144 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;
145 
146 static spdk_new_thread_fn g_new_thread_fn = NULL;
147 static spdk_thread_op_fn g_thread_op_fn = NULL;
148 static spdk_thread_op_supported_fn g_thread_op_supported_fn;
149 static size_t g_ctx_sz = 0;
150 /* A monotonically increasing ID, starting at 1, is assigned to each created
151  * thread. Once the ID wraps back to 0 (i.e. would exceed UINT64_MAX), further
152  * thread creation is not allowed and the SPDK application must be restarted.
153  */
154 static uint64_t g_thread_id = 1;
155 
156 enum spin_error {
157 	SPIN_ERR_NONE,
158 	/* Trying to use an SPDK lock while not on an SPDK thread */
159 	SPIN_ERR_NOT_SPDK_THREAD,
160 	/* Trying to lock a lock already held by this SPDK thread */
161 	SPIN_ERR_DEADLOCK,
162 	/* Trying to unlock a lock not held by this SPDK thread */
163 	SPIN_ERR_WRONG_THREAD,
164 	/* pthread_spin_*() returned an error */
165 	SPIN_ERR_PTHREAD,
166 	/* Trying to destroy a lock that is held */
167 	SPIN_ERR_LOCK_HELD,
168 	/* lock_count is invalid */
169 	SPIN_ERR_LOCK_COUNT,
170 	/*
171 	 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to
172 	 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to
173 	 * deadlock when another SPDK thread on the same pthread tries to take that lock.
174 	 */
175 	SPIN_ERR_HOLD_DURING_SWITCH,
176 	/* Must be last, not an actual error code */
177 	SPIN_ERR_LAST
178 };
179 
180 static const char *spin_error_strings[] = {
181 	[SPIN_ERR_NONE]			= "No error",
182 	[SPIN_ERR_NOT_SPDK_THREAD]	= "Not an SPDK thread",
183 	[SPIN_ERR_DEADLOCK]		= "Deadlock detected",
184 	[SPIN_ERR_WRONG_THREAD]		= "Unlock on wrong SPDK thread",
185 	[SPIN_ERR_PTHREAD]		= "Error from pthread_spinlock",
186 	[SPIN_ERR_LOCK_HELD]		= "Destroying a held spinlock",
187 	[SPIN_ERR_LOCK_COUNT]		= "Lock count is invalid",
188 	[SPIN_ERR_HOLD_DURING_SWITCH]	= "Lock(s) held while SPDK thread going off CPU",
189 };
190 
191 #define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \
192 				? "Unknown error" : spin_error_strings[err]
193 
194 static void
195 __posix_abort(enum spin_error err)
196 {
197 	abort();
198 }
199 
200 typedef void (*spin_abort)(enum spin_error err);
201 spin_abort g_spin_abort_fn = __posix_abort;
202 
203 #define SPIN_ASSERT_IMPL(cond, err, ret) \
204 	do { \
205 		if (spdk_unlikely(!(cond))) { \
206 			SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \
207 				    SPIN_ERROR_STRING(err), #cond); \
208 			g_spin_abort_fn(err); \
209 			ret; \
210 		} \
211 	} while (0)
212 #define SPIN_ASSERT_RETURN_VOID(cond, err)	SPIN_ASSERT_IMPL(cond, err, return)
213 #define SPIN_ASSERT_RETURN(cond, err, ret)	SPIN_ASSERT_IMPL(cond, err, return ret)
214 #define SPIN_ASSERT(cond, err)			SPIN_ASSERT_IMPL(cond, err,)
215 
216 struct io_device {
217 	void				*io_device;
218 	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
219 	spdk_io_channel_create_cb	create_cb;
220 	spdk_io_channel_destroy_cb	destroy_cb;
221 	spdk_io_device_unregister_cb	unregister_cb;
222 	struct spdk_thread		*unregister_thread;
223 	uint32_t			ctx_size;
224 	uint32_t			for_each_count;
225 	RB_ENTRY(io_device)		node;
226 
227 	uint32_t			refcnt;
228 
229 	bool				pending_unregister;
230 	bool				unregistered;
231 };
232 
233 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices);
234 
235 static int
236 io_device_cmp(struct io_device *dev1, struct io_device *dev2)
237 {
238 	return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device);
239 }
240 
241 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp);
242 
243 static int
244 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2)
245 {
246 	return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev);
247 }
248 
249 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp);
250 
251 struct spdk_msg {
252 	spdk_msg_fn		fn;
253 	void			*arg;
254 
255 	SLIST_ENTRY(spdk_msg)	link;
256 };
257 
258 #define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
259 static struct spdk_mempool *g_spdk_msg_mempool = NULL;
260 
261 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
262 static uint32_t g_thread_count = 0;
263 
264 static __thread struct spdk_thread *tls_thread = NULL;
265 
266 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD)
267 {
268 	spdk_trace_register_description("THREAD_IOCH_GET",
269 					TRACE_THREAD_IOCH_GET,
270 					OWNER_NONE, OBJECT_NONE, 0,
271 					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
272 	spdk_trace_register_description("THREAD_IOCH_PUT",
273 					TRACE_THREAD_IOCH_PUT,
274 					OWNER_NONE, OBJECT_NONE, 0,
275 					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
276 }
277 
278 /*
279  * If this compare function returned zero when two next_run_ticks are equal,
280  * the macro RB_INSERT() would reject the new element and return a pointer to
281  * the existing element with the same next_run_tick.
282  *
283  * Fortunately, the macro RB_REMOVE() takes a pointer to the element to remove
284  * as a parameter, not a key.
285  *
286  * Hence we allow RB_INSERT() to insert elements with equal keys on the right
287  * side by returning 1 when two next_run_ticks are equal.
288  */
289 static inline int
290 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2)
291 {
292 	if (poller1->next_run_tick < poller2->next_run_tick) {
293 		return -1;
294 	} else {
295 		return 1;
296 	}
297 }
298 
299 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare);
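/*
 * Usage sketch (illustrative only): two pollers sharing a next_run_tick both
 * end up in the tree because the comparator never returns 0.  Assumes a
 * thread with an initialized timed_pollers tree.
 *
 *	struct spdk_poller a = { .next_run_tick = 100 };
 *	struct spdk_poller b = { .next_run_tick = 100 };
 *
 *	RB_INSERT(timed_pollers_tree, &thread->timed_pollers, &a);
 *	RB_INSERT(timed_pollers_tree, &thread->timed_pollers, &b);	// lands in a's right subtree
 *
 *	// Removal is by element pointer, not by key, so each duplicate can
 *	// still be removed unambiguously.
 *	RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, &a);
 */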
300 
301 static inline struct spdk_thread *
302 _get_thread(void)
303 {
304 	return tls_thread;
305 }
306 
307 static int
308 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz)
309 {
310 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
311 
312 	g_ctx_sz = ctx_sz;
313 
314 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
315 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz,
316 			     sizeof(struct spdk_msg),
317 			     0, /* No cache. We do our own. */
318 			     SPDK_ENV_SOCKET_ID_ANY);
319 
320 	SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n",
321 		      msg_mempool_sz);
322 
323 	if (!g_spdk_msg_mempool) {
324 		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
325 		return -ENOMEM;
326 	}
327 
328 	return 0;
329 }
330 
331 static void thread_interrupt_destroy(struct spdk_thread *thread);
332 static int thread_interrupt_create(struct spdk_thread *thread);
333 
334 static void
335 _free_thread(struct spdk_thread *thread)
336 {
337 	struct spdk_io_channel *ch;
338 	struct spdk_msg *msg;
339 	struct spdk_poller *poller, *ptmp;
340 
341 	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
342 		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
343 			    thread->name, ch->dev->name);
344 	}
345 
346 	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
347 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
348 			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
349 				     poller->name);
350 		}
351 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
352 		free(poller);
353 	}
354 
355 	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
356 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
357 			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
358 				     poller->name);
359 		}
360 		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
361 		free(poller);
362 	}
363 
364 	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
365 		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
366 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
367 		free(poller);
368 	}
369 
370 	pthread_mutex_lock(&g_devlist_mutex);
371 	assert(g_thread_count > 0);
372 	g_thread_count--;
373 	TAILQ_REMOVE(&g_threads, thread, tailq);
374 	pthread_mutex_unlock(&g_devlist_mutex);
375 
376 	msg = SLIST_FIRST(&thread->msg_cache);
377 	while (msg != NULL) {
378 		SLIST_REMOVE_HEAD(&thread->msg_cache, link);
379 
380 		assert(thread->msg_cache_count > 0);
381 		thread->msg_cache_count--;
382 		spdk_mempool_put(g_spdk_msg_mempool, msg);
383 
384 		msg = SLIST_FIRST(&thread->msg_cache);
385 	}
386 
387 	assert(thread->msg_cache_count == 0);
388 
389 	if (spdk_interrupt_mode_is_enabled()) {
390 		thread_interrupt_destroy(thread);
391 	}
392 
393 	spdk_ring_free(thread->messages);
394 	free(thread);
395 }
396 
397 int
398 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
399 {
400 	assert(g_new_thread_fn == NULL);
401 	assert(g_thread_op_fn == NULL);
402 
403 	if (new_thread_fn == NULL) {
404 		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
405 	} else {
406 		g_new_thread_fn = new_thread_fn;
407 	}
408 
409 	return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE);
410 }
411 
412 int
413 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
414 			 spdk_thread_op_supported_fn thread_op_supported_fn,
415 			 size_t ctx_sz, size_t msg_mempool_sz)
416 {
417 	assert(g_new_thread_fn == NULL);
418 	assert(g_thread_op_fn == NULL);
419 	assert(g_thread_op_supported_fn == NULL);
420 
421 	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
422 		SPDK_ERRLOG("Both must be defined or undefined together.\n");
423 		return -EINVAL;
424 	}
425 
426 	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
427 		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
428 	} else {
429 		g_thread_op_fn = thread_op_fn;
430 		g_thread_op_supported_fn = thread_op_supported_fn;
431 	}
432 
433 	return _thread_lib_init(ctx_sz, msg_mempool_sz);
434 }
435 
436 void
437 spdk_thread_lib_fini(void)
438 {
439 	struct io_device *dev;
440 
441 	RB_FOREACH(dev, io_device_tree, &g_io_devices) {
442 		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
443 	}
444 
445 	g_new_thread_fn = NULL;
446 	g_thread_op_fn = NULL;
447 	g_thread_op_supported_fn = NULL;
448 	g_ctx_sz = 0;
449 	if (g_app_thread != NULL) {
450 		_free_thread(g_app_thread);
451 		g_app_thread = NULL;
452 	}
453 
454 	if (g_spdk_msg_mempool) {
455 		spdk_mempool_free(g_spdk_msg_mempool);
456 		g_spdk_msg_mempool = NULL;
457 	}
458 }
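/*
 * Usage sketch (illustrative only) of the library lifecycle: init, create a
 * thread, drive it, tear everything down.  Assumes the SPDK environment
 * (spdk_env_opts/spdk_env_init) has already been initialized so that
 * spdk_mempool_create() and spdk_ring_create() work.
 *
 *	spdk_thread_lib_init(NULL, 0);
 *
 *	struct spdk_thread *thread = spdk_thread_create("app", NULL);
 *
 *	spdk_set_thread(thread);
 *	spdk_thread_poll(thread, 0, 0);			// run messages and pollers once
 *
 *	spdk_thread_exit(thread);			// must be called on the thread itself
 *	while (!spdk_thread_is_exited(thread)) {
 *		spdk_thread_poll(thread, 0, 0);		// drain until EXITED
 *	}
 *	spdk_thread_destroy(thread);
 *
 *	spdk_thread_lib_fini();				// also frees the app thread
 */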
459 
460 struct spdk_thread *
461 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask)
462 {
463 	struct spdk_thread *thread, *null_thread;
464 	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
465 	int rc = 0, i;
466 
467 	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
468 	if (!thread) {
469 		SPDK_ERRLOG("Unable to allocate memory for thread\n");
470 		return NULL;
471 	}
472 
473 	if (cpumask) {
474 		spdk_cpuset_copy(&thread->cpumask, cpumask);
475 	} else {
476 		spdk_cpuset_negate(&thread->cpumask);
477 	}
478 
479 	RB_INIT(&thread->io_channels);
480 	TAILQ_INIT(&thread->active_pollers);
481 	RB_INIT(&thread->timed_pollers);
482 	TAILQ_INIT(&thread->paused_pollers);
483 	SLIST_INIT(&thread->msg_cache);
484 	thread->msg_cache_count = 0;
485 
486 	thread->tsc_last = spdk_get_ticks();
487 
488 	/* A monotonically increasing ID, starting at 1, is assigned to each poller
489 	 * created on this thread. Once the ID wraps around, a warning is logged.
490 	 */
491 	thread->next_poller_id = 1;
492 
493 	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
494 	if (!thread->messages) {
495 		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
496 		free(thread);
497 		return NULL;
498 	}
499 
500 	/* Fill the local message pool cache. */
501 	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
502 	if (rc == 0) {
503 		/* If we can't populate the cache it's ok. The cache will get filled
504 		 * up organically as messages are passed to the thread. */
505 		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
506 			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
507 			thread->msg_cache_count++;
508 		}
509 	}
510 
511 	if (name) {
512 		snprintf(thread->name, sizeof(thread->name), "%s", name);
513 	} else {
514 		snprintf(thread->name, sizeof(thread->name), "%p", thread);
515 	}
516 
517 	pthread_mutex_lock(&g_devlist_mutex);
518 	if (g_thread_id == 0) {
519 		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
520 		pthread_mutex_unlock(&g_devlist_mutex);
521 		_free_thread(thread);
522 		return NULL;
523 	}
524 	thread->id = g_thread_id++;
525 	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
526 	g_thread_count++;
527 	pthread_mutex_unlock(&g_devlist_mutex);
528 
529 	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
530 		      thread->id, thread->name);
531 
532 	if (spdk_interrupt_mode_is_enabled()) {
533 		thread->in_interrupt = true;
534 		rc = thread_interrupt_create(thread);
535 		if (rc != 0) {
536 			_free_thread(thread);
537 			return NULL;
538 		}
539 	}
540 
541 	if (g_new_thread_fn) {
542 		rc = g_new_thread_fn(thread);
543 	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
544 		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
545 	}
546 
547 	if (rc != 0) {
548 		_free_thread(thread);
549 		return NULL;
550 	}
551 
552 	thread->state = SPDK_THREAD_STATE_RUNNING;
553 
554 	/* If this is the first thread, save it as the app thread.  Use an atomic
555 	 * compare + exchange to guard against crazy users who might try to
556 	 * call spdk_thread_create() simultaneously on multiple threads.
557 	 */
558 	null_thread = NULL;
559 	__atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false,
560 				    __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
561 
562 	return thread;
563 }
564 
565 struct spdk_thread *
566 spdk_thread_get_app_thread(void)
567 {
568 	return g_app_thread;
569 }
570 
571 void
572 spdk_set_thread(struct spdk_thread *thread)
573 {
574 	tls_thread = thread;
575 }
576 
577 static void
578 thread_exit(struct spdk_thread *thread, uint64_t now)
579 {
580 	struct spdk_poller *poller;
581 	struct spdk_io_channel *ch;
582 
583 	if (now >= thread->exit_timeout_tsc) {
584 		SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n",
585 			    thread->name);
586 		goto exited;
587 	}
588 
589 	if (spdk_ring_count(thread->messages) > 0) {
590 		SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name);
591 		return;
592 	}
593 
594 	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
595 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
596 			SPDK_INFOLOG(thread,
597 				     "thread %s still has active poller %s\n",
598 				     thread->name, poller->name);
599 			return;
600 		}
601 	}
602 
603 	RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
604 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
605 			SPDK_INFOLOG(thread,
606 				     "thread %s still has active timed poller %s\n",
607 				     thread->name, poller->name);
608 			return;
609 		}
610 	}
611 
612 	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
613 		SPDK_INFOLOG(thread,
614 			     "thread %s still has paused poller %s\n",
615 			     thread->name, poller->name);
616 		return;
617 	}
618 
619 	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
620 		SPDK_INFOLOG(thread,
621 			     "thread %s still has channel for io_device %s\n",
622 			     thread->name, ch->dev->name);
623 		return;
624 	}
625 
626 	if (thread->pending_unregister_count > 0) {
627 		SPDK_INFOLOG(thread,
628 			     "thread %s is still unregistering io_devices\n",
629 			     thread->name);
630 		return;
631 	}
632 
633 exited:
634 	thread->state = SPDK_THREAD_STATE_EXITED;
635 	if (spdk_unlikely(thread->in_interrupt)) {
636 		g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
637 	}
638 }
639 
640 int
641 spdk_thread_exit(struct spdk_thread *thread)
642 {
643 	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);
644 
645 	assert(tls_thread == thread);
646 
647 	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
648 		SPDK_INFOLOG(thread,
649 			     "thread %s is already exiting\n",
650 			     thread->name);
651 		return 0;
652 	}
653 
654 	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
655 				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
656 	thread->state = SPDK_THREAD_STATE_EXITING;
657 	return 0;
658 }
659 
660 bool
661 spdk_thread_is_running(struct spdk_thread *thread)
662 {
663 	return thread->state == SPDK_THREAD_STATE_RUNNING;
664 }
665 
666 bool
667 spdk_thread_is_exited(struct spdk_thread *thread)
668 {
669 	return thread->state == SPDK_THREAD_STATE_EXITED;
670 }
671 
672 void
673 spdk_thread_destroy(struct spdk_thread *thread)
674 {
675 	assert(thread != NULL);
676 	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);
677 
678 	assert(thread->state == SPDK_THREAD_STATE_EXITED);
679 
680 	if (tls_thread == thread) {
681 		tls_thread = NULL;
682 	}
683 
684 	/* To be safe, do not free the app thread until spdk_thread_lib_fini(). */
685 	if (thread != g_app_thread) {
686 		_free_thread(thread);
687 	}
688 }
689 
690 void *
691 spdk_thread_get_ctx(struct spdk_thread *thread)
692 {
693 	if (g_ctx_sz > 0) {
694 		return thread->ctx;
695 	}
696 
697 	return NULL;
698 }
699 
700 struct spdk_cpuset *
701 spdk_thread_get_cpumask(struct spdk_thread *thread)
702 {
703 	return &thread->cpumask;
704 }
705 
706 int
707 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
708 {
709 	struct spdk_thread *thread;
710 
711 	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
712 		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
713 		assert(false);
714 		return -ENOTSUP;
715 	}
716 
717 	thread = spdk_get_thread();
718 	if (!thread) {
719 		SPDK_ERRLOG("Called from non-SPDK thread\n");
720 		assert(false);
721 		return -EINVAL;
722 	}
723 
724 	spdk_cpuset_copy(&thread->cpumask, cpumask);
725 
726 	/* Invoke framework's reschedule operation. If this function is called multiple times
727 	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
728 	 * reschedule operation.
729 	 */
730 	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
731 
732 	return 0;
733 }
734 
735 struct spdk_thread *
736 spdk_thread_get_from_ctx(void *ctx)
737 {
738 	if (ctx == NULL) {
739 		assert(false);
740 		return NULL;
741 	}
742 
743 	assert(g_ctx_sz > 0);
744 
745 	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
746 }
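/*
 * Usage sketch (illustrative only) of the ctx round trip; "struct my_ctx" is
 * a hypothetical user type whose size was passed as ctx_sz to
 * spdk_thread_lib_init().
 *
 *	struct my_ctx { int core; };
 *
 *	spdk_thread_lib_init(NULL, sizeof(struct my_ctx));
 *	struct spdk_thread *thread = spdk_thread_create("worker", NULL);
 *
 *	struct my_ctx *ctx = spdk_thread_get_ctx(thread);
 *	ctx->core = 3;
 *
 *	assert(spdk_thread_get_from_ctx(ctx) == thread);	// recover the owner
 */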
747 
748 static inline uint32_t
749 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
750 {
751 	unsigned count, i;
752 	void *messages[SPDK_MSG_BATCH_SIZE];
753 	uint64_t notify = 1;
754 	int rc;
755 
756 #ifdef DEBUG
757 	/*
758 	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
759 	 * so we will never actually read uninitialized data from events, but just to be sure
760 	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
761 	 */
762 	memset(messages, 0, sizeof(messages));
763 #endif
764 
765 	if (max_msgs > 0) {
766 		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
767 	} else {
768 		max_msgs = SPDK_MSG_BATCH_SIZE;
769 	}
770 
771 	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
772 	if (spdk_unlikely(thread->in_interrupt) &&
773 	    spdk_ring_count(thread->messages) != 0) {
774 		rc = write(thread->msg_fd, &notify, sizeof(notify));
775 		if (rc < 0) {
776 			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
777 		}
778 	}
779 	if (count == 0) {
780 		return 0;
781 	}
782 
783 	for (i = 0; i < count; i++) {
784 		struct spdk_msg *msg = messages[i];
785 
786 		assert(msg != NULL);
787 
788 		SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg);
789 
790 		msg->fn(msg->arg);
791 
792 		SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
793 
794 		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
795 			/* Insert the messages at the head. We want to re-use the hot
796 			 * ones. */
797 			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
798 			thread->msg_cache_count++;
799 		} else {
800 			spdk_mempool_put(g_spdk_msg_mempool, msg);
801 		}
802 	}
803 
804 	return count;
805 }
806 
807 static void
808 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
809 {
810 	struct spdk_poller *tmp __attribute__((unused));
811 
812 	poller->next_run_tick = now + poller->period_ticks;
813 
814 	/*
815 	 * Insert poller in the thread's timed_pollers tree by next scheduled run time
816 	 * as its key.
817 	 */
818 	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
819 	assert(tmp == NULL);
820 
821 	/* Update the cache only if it is empty or the inserted poller is earlier than it.
822 	 * RB_MIN() is not necessary here because all pollers that have exactly the same
823 	 * next_run_tick as an existing poller are inserted on its right side.
824 	 */
825 	if (thread->first_timed_poller == NULL ||
826 	    poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
827 		thread->first_timed_poller = poller;
828 	}
829 }
830 
831 static inline void
832 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
833 {
834 	struct spdk_poller *tmp __attribute__((unused));
835 
836 	tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
837 	assert(tmp != NULL);
838 
839 	/* This function is not used in any performance-critical path.
840 	 * Simply update the cache with RB_MIN() if it needs to change.
841 	 */
842 	if (thread->first_timed_poller == poller) {
843 		thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
844 	}
845 }
846 
847 static void
848 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
849 {
850 	if (poller->period_ticks) {
851 		poller_insert_timer(thread, poller, spdk_get_ticks());
852 	} else {
853 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
854 	}
855 }
856 
857 static inline void
858 thread_update_stats(struct spdk_thread *thread, uint64_t end,
859 		    uint64_t start, int rc)
860 {
861 	if (rc == 0) {
862 		/* Poller status idle */
863 		thread->stats.idle_tsc += end - start;
864 	} else if (rc > 0) {
865 		/* Poller status busy */
866 		thread->stats.busy_tsc += end - start;
867 	}
868 	/* Store end time to use it as start time of the next spdk_thread_poll(). */
869 	thread->tsc_last = end;
870 }
871 
872 static inline int
873 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
874 {
875 	int rc;
876 
877 	switch (poller->state) {
878 	case SPDK_POLLER_STATE_UNREGISTERED:
879 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
880 		free(poller);
881 		return 0;
882 	case SPDK_POLLER_STATE_PAUSING:
883 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
884 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
885 		poller->state = SPDK_POLLER_STATE_PAUSED;
886 		return 0;
887 	case SPDK_POLLER_STATE_WAITING:
888 		break;
889 	default:
890 		assert(false);
891 		break;
892 	}
893 
894 	poller->state = SPDK_POLLER_STATE_RUNNING;
895 	rc = poller->fn(poller->arg);
896 
897 	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
898 
899 	poller->run_count++;
900 	if (rc > 0) {
901 		poller->busy_count++;
902 	}
903 
904 #ifdef DEBUG
905 	if (rc == -1) {
906 		SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
907 	}
908 #endif
909 
910 	switch (poller->state) {
911 	case SPDK_POLLER_STATE_UNREGISTERED:
912 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
913 		free(poller);
914 		break;
915 	case SPDK_POLLER_STATE_PAUSING:
916 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
917 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
918 		poller->state = SPDK_POLLER_STATE_PAUSED;
919 		break;
920 	case SPDK_POLLER_STATE_PAUSED:
921 	case SPDK_POLLER_STATE_WAITING:
922 		break;
923 	case SPDK_POLLER_STATE_RUNNING:
924 		poller->state = SPDK_POLLER_STATE_WAITING;
925 		break;
926 	default:
927 		assert(false);
928 		break;
929 	}
930 
931 	return rc;
932 }
933 
934 static inline int
935 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
936 			    uint64_t now)
937 {
938 	int rc;
939 
940 	switch (poller->state) {
941 	case SPDK_POLLER_STATE_UNREGISTERED:
942 		free(poller);
943 		return 0;
944 	case SPDK_POLLER_STATE_PAUSING:
945 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
946 		poller->state = SPDK_POLLER_STATE_PAUSED;
947 		return 0;
948 	case SPDK_POLLER_STATE_WAITING:
949 		break;
950 	default:
951 		assert(false);
952 		break;
953 	}
954 
955 	poller->state = SPDK_POLLER_STATE_RUNNING;
956 	rc = poller->fn(poller->arg);
957 
958 	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
959 
960 	poller->run_count++;
961 	if (rc > 0) {
962 		poller->busy_count++;
963 	}
964 
965 #ifdef DEBUG
966 	if (rc == -1) {
967 		SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
968 	}
969 #endif
970 
971 	switch (poller->state) {
972 	case SPDK_POLLER_STATE_UNREGISTERED:
973 		free(poller);
974 		break;
975 	case SPDK_POLLER_STATE_PAUSING:
976 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
977 		poller->state = SPDK_POLLER_STATE_PAUSED;
978 		break;
979 	case SPDK_POLLER_STATE_PAUSED:
980 		break;
981 	case SPDK_POLLER_STATE_RUNNING:
982 		poller->state = SPDK_POLLER_STATE_WAITING;
983 	/* fallthrough */
984 	case SPDK_POLLER_STATE_WAITING:
985 		poller_insert_timer(thread, poller, now);
986 		break;
987 	default:
988 		assert(false);
989 		break;
990 	}
991 
992 	return rc;
993 }
994 
995 static int
996 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
997 {
998 	uint32_t msg_count;
999 	struct spdk_poller *poller, *tmp;
1000 	spdk_msg_fn critical_msg;
1001 	int rc = 0;
1002 
1003 	thread->tsc_last = now;
1004 
1005 	critical_msg = thread->critical_msg;
1006 	if (spdk_unlikely(critical_msg != NULL)) {
1007 		critical_msg(NULL);
1008 		thread->critical_msg = NULL;
1009 		rc = 1;
1010 	}
1011 
1012 	msg_count = msg_queue_run_batch(thread, max_msgs);
1013 	if (msg_count) {
1014 		rc = 1;
1015 	}
1016 
1017 	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
1018 				   active_pollers_head, tailq, tmp) {
1019 		int poller_rc;
1020 
1021 		poller_rc = thread_execute_poller(thread, poller);
1022 		if (poller_rc > rc) {
1023 			rc = poller_rc;
1024 		}
1025 	}
1026 
1027 	poller = thread->first_timed_poller;
1028 	while (poller != NULL) {
1029 		int timer_rc = 0;
1030 
1031 		if (now < poller->next_run_tick) {
1032 			break;
1033 		}
1034 
1035 		tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
1036 		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
1037 
1038 		/* Update the cache to the next timed poller in the list
1039 		 * only if the current poller is still the closest, otherwise,
1040 		 * do nothing because the cache has already been updated.
1041 		 */
1042 		if (thread->first_timed_poller == poller) {
1043 			thread->first_timed_poller = tmp;
1044 		}
1045 
1046 		timer_rc = thread_execute_timed_poller(thread, poller, now);
1047 		if (timer_rc > rc) {
1048 			rc = timer_rc;
1049 		}
1050 
1051 		poller = tmp;
1052 	}
1053 
1054 	return rc;
1055 }
1056 
1057 int
1058 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
1059 {
1060 	struct spdk_thread *orig_thread;
1061 	int rc;
1062 	uint64_t notify = 1;
1063 
1064 	orig_thread = _get_thread();
1065 	tls_thread = thread;
1066 
1067 	if (now == 0) {
1068 		now = spdk_get_ticks();
1069 	}
1070 
1071 	if (spdk_likely(!thread->in_interrupt)) {
1072 		rc = thread_poll(thread, max_msgs, now);
1073 		if (spdk_unlikely(thread->in_interrupt)) {
1074 			/* The thread transitioned to interrupt mode during the above poll.
1075 			 * Poll it one more time in case a msg was received without
1076 			 * notification during the transition.
1077 			 */
1078 			rc = thread_poll(thread, max_msgs, now);
1079 		}
1080 	} else {
1081 		/* Non-blocking wait on the thread's fd_group */
1082 		rc = spdk_fd_group_wait(thread->fgrp, 0);
1083 		SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
1084 		if (spdk_unlikely(!thread->in_interrupt)) {
1085 			/* The thread transitioned to poll mode via a msg during the above processing.
1086 			 * Clear msg_fd since thread messages will be polled directly in poll mode.
1087 			 */
1088 			rc = read(thread->msg_fd, &notify, sizeof(notify));
1089 			if (rc < 0 && errno != EAGAIN) {
1090 				SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
1091 			}
1092 		}
1093 
1094 		/* Reap unregistered pollers outside of poller execution in interrupt mode */
1095 		if (spdk_unlikely(thread->poller_unregistered)) {
1096 			struct spdk_poller *poller, *tmp;
1097 
1098 			TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
1099 						   active_pollers_head, tailq, tmp) {
1100 				if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
1101 					TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
1102 					free(poller);
1103 				}
1104 			}
1105 
1106 			RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
1107 				if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
1108 					poller_remove_timer(thread, poller);
1109 					free(poller);
1110 				}
1111 			}
1112 
1113 			thread->poller_unregistered = false;
1114 		}
1115 	}
1116 
1117 
1118 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
1119 		thread_exit(thread, now);
1120 	}
1121 
1122 	thread_update_stats(thread, spdk_get_ticks(), now, rc);
1123 
1124 	tls_thread = orig_thread;
1125 
1126 	return rc;
1127 }
1128 
1129 uint64_t
1130 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
1131 {
1132 	struct spdk_poller *poller;
1133 
1134 	poller = thread->first_timed_poller;
1135 	if (poller) {
1136 		return poller->next_run_tick;
1137 	}
1138 
1139 	return 0;
1140 }
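/*
 * Usage sketch (illustrative only): a framework can combine spdk_thread_poll()
 * with the next timer expiration to avoid spinning while a thread is idle.
 * The usleep()-based strategy below is hypothetical.
 *
 *	uint64_t now = spdk_get_ticks();
 *
 *	spdk_thread_poll(thread, 0, now);
 *
 *	uint64_t next = spdk_thread_next_poller_expiration(thread);
 *	if (spdk_thread_is_idle(thread) && next > now) {
 *		// Sleep until the earliest timed poller is due.
 *		usleep((next - now) * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
 *	}
 */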
1141 
1142 int
1143 spdk_thread_has_active_pollers(struct spdk_thread *thread)
1144 {
1145 	return !TAILQ_EMPTY(&thread->active_pollers);
1146 }
1147 
1148 static bool
1149 thread_has_unpaused_pollers(struct spdk_thread *thread)
1150 {
1151 	if (TAILQ_EMPTY(&thread->active_pollers) &&
1152 	    RB_EMPTY(&thread->timed_pollers)) {
1153 		return false;
1154 	}
1155 
1156 	return true;
1157 }
1158 
1159 bool
1160 spdk_thread_has_pollers(struct spdk_thread *thread)
1161 {
1162 	if (!thread_has_unpaused_pollers(thread) &&
1163 	    TAILQ_EMPTY(&thread->paused_pollers)) {
1164 		return false;
1165 	}
1166 
1167 	return true;
1168 }
1169 
1170 bool
1171 spdk_thread_is_idle(struct spdk_thread *thread)
1172 {
1173 	if (spdk_ring_count(thread->messages) ||
1174 	    thread_has_unpaused_pollers(thread) ||
1175 	    thread->critical_msg != NULL) {
1176 		return false;
1177 	}
1178 
1179 	return true;
1180 }
1181 
1182 uint32_t
1183 spdk_thread_get_count(void)
1184 {
1185 	/*
1186 	 * Return cached value of the current thread count.  We could acquire the
1187 	 *  lock and iterate through the TAILQ of threads to count them, but that
1188 	 *  count could still be invalidated after we release the lock.
1189 	 */
1190 	return g_thread_count;
1191 }
1192 
1193 struct spdk_thread *
1194 spdk_get_thread(void)
1195 {
1196 	return _get_thread();
1197 }
1198 
1199 const char *
1200 spdk_thread_get_name(const struct spdk_thread *thread)
1201 {
1202 	return thread->name;
1203 }
1204 
1205 uint64_t
1206 spdk_thread_get_id(const struct spdk_thread *thread)
1207 {
1208 	return thread->id;
1209 }
1210 
1211 struct spdk_thread *
1212 spdk_thread_get_by_id(uint64_t id)
1213 {
1214 	struct spdk_thread *thread;
1215 
1216 	if (id == 0 || id >= g_thread_id) {
1217 		SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
1218 		return NULL;
1219 	}
1220 	pthread_mutex_lock(&g_devlist_mutex);
1221 	TAILQ_FOREACH(thread, &g_threads, tailq) {
1222 		if (thread->id == id) {
1223 			break;
1224 		}
1225 	}
1226 	pthread_mutex_unlock(&g_devlist_mutex);
1227 	return thread;
1228 }
1229 
1230 int
1231 spdk_thread_get_stats(struct spdk_thread_stats *stats)
1232 {
1233 	struct spdk_thread *thread;
1234 
1235 	thread = _get_thread();
1236 	if (!thread) {
1237 		SPDK_ERRLOG("No thread allocated\n");
1238 		return -EINVAL;
1239 	}
1240 
1241 	if (stats == NULL) {
1242 		return -EINVAL;
1243 	}
1244 
1245 	*stats = thread->stats;
1246 
1247 	return 0;
1248 }
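/*
 * Usage sketch (illustrative only): read the busy/idle counters maintained by
 * thread_update_stats().  spdk_thread_get_stats() reports on the calling
 * thread, so deliver it as a message to the thread of interest;
 * "report_stats" is hypothetical.
 *
 *	static void
 *	report_stats(void *arg)
 *	{
 *		struct spdk_thread_stats stats;
 *
 *		if (spdk_thread_get_stats(&stats) == 0) {
 *			printf("busy=%ju idle=%ju ticks\n",
 *			       (uintmax_t)stats.busy_tsc, (uintmax_t)stats.idle_tsc);
 *		}
 *	}
 *
 *	spdk_thread_send_msg(thread, report_stats, NULL);
 */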
1249 
1250 uint64_t
1251 spdk_thread_get_last_tsc(struct spdk_thread *thread)
1252 {
1253 	if (thread == NULL) {
1254 		thread = _get_thread();
1255 	}
1256 
1257 	return thread->tsc_last;
1258 }
1259 
1260 static inline int
1261 thread_send_msg_notification(const struct spdk_thread *target_thread)
1262 {
1263 	uint64_t notify = 1;
1264 	int rc;
1265 
1266 	/* Notification is not necessary if the interrupt facility is not enabled */
1267 	if (spdk_likely(!spdk_interrupt_mode_is_enabled())) {
1268 		return 0;
1269 	}
1270 
1271 	/* Since each spdk_thread can switch between poll and interrupt mode dynamically,
1272 	 * after sending a thread msg it is necessary to check whether the target thread
1273 	 * runs in interrupt mode and then decide whether to do event notification.
1274 	 */
1275 	if (spdk_unlikely(target_thread->in_interrupt)) {
1276 		rc = write(target_thread->msg_fd, &notify, sizeof(notify));
1277 		if (rc < 0) {
1278 			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
1279 			return -EIO;
1280 		}
1281 	}
1282 
1283 	return 0;
1284 }
1285 
1286 int
1287 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
1288 {
1289 	struct spdk_thread *local_thread;
1290 	struct spdk_msg *msg;
1291 	int rc;
1292 
1293 	assert(thread != NULL);
1294 
1295 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
1296 		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
1297 		return -EIO;
1298 	}
1299 
1300 	local_thread = _get_thread();
1301 
1302 	msg = NULL;
1303 	if (local_thread != NULL) {
1304 		if (local_thread->msg_cache_count > 0) {
1305 			msg = SLIST_FIRST(&local_thread->msg_cache);
1306 			assert(msg != NULL);
1307 			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
1308 			local_thread->msg_cache_count--;
1309 		}
1310 	}
1311 
1312 	if (msg == NULL) {
1313 		msg = spdk_mempool_get(g_spdk_msg_mempool);
1314 		if (!msg) {
1315 			SPDK_ERRLOG("msg could not be allocated\n");
1316 			return -ENOMEM;
1317 		}
1318 	}
1319 
1320 	msg->fn = fn;
1321 	msg->arg = ctx;
1322 
1323 	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
1324 	if (rc != 1) {
1325 		SPDK_ERRLOG("msg could not be enqueued\n");
1326 		spdk_mempool_put(g_spdk_msg_mempool, msg);
1327 		return -EIO;
1328 	}
1329 
1330 	return thread_send_msg_notification(thread);
1331 }
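/*
 * Usage sketch (illustrative only) of cross-thread messaging; "hello" is a
 * hypothetical callback that runs later on the target thread, inside its
 * spdk_thread_poll() loop.
 *
 *	static void
 *	hello(void *ctx)
 *	{
 *		printf("running on %s\n", spdk_thread_get_name(spdk_get_thread()));
 *	}
 *
 *	int rc = spdk_thread_send_msg(target, hello, NULL);
 *	if (rc != 0) {
 *		// -ENOMEM: no msg objects available; -EIO: enqueue failed or
 *		// the target thread already exited.
 *	}
 */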
1332 
1333 int
1334 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
1335 {
1336 	spdk_msg_fn expected = NULL;
1337 
1338 	if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
1339 					 __ATOMIC_SEQ_CST)) {
1340 		return -EIO;
1341 	}
1342 
1343 	return thread_send_msg_notification(thread);
1344 }
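/*
 * Usage sketch (illustrative only): because spdk_thread_send_critical_msg()
 * only does an atomic compare-and-swap plus an eventfd write, it can be used
 * where spdk_thread_send_msg() (which may touch the mempool) cannot, e.g. a
 * signal handler.  "g_app" and "handle_sigusr1" are hypothetical; the fn is
 * invoked later with a NULL argument.
 *
 *	static void
 *	handle_sigusr1(void *ctx)
 *	{
 *		// runs on g_app during its next spdk_thread_poll()
 *	}
 *
 *	static void
 *	sig_handler(int signum)
 *	{
 *		spdk_thread_send_critical_msg(g_app, handle_sigusr1);
 *	}
 */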
1345 
1346 #ifdef __linux__
1347 static int
1348 interrupt_timerfd_process(void *arg)
1349 {
1350 	struct spdk_poller *poller = arg;
1351 	uint64_t exp;
1352 	int rc;
1353 
1354 	/* clear the level of interval timer */
1355 	rc = read(poller->interruptfd, &exp, sizeof(exp));
1356 	if (rc < 0) {
1357 		if (errno == EAGAIN) {	/* read() returns -1 and sets errno on failure */
1358 			return 0;
1359 		}
1360 
1361 		return -errno;
1362 	}
1363 
1364 	SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg);
1365 
1366 	return poller->fn(poller->arg);
1367 }
1368 
1369 static int
1370 period_poller_interrupt_init(struct spdk_poller *poller)
1371 {
1372 	struct spdk_fd_group *fgrp = poller->thread->fgrp;
1373 	int timerfd;
1374 	int rc;
1375 
1376 	SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
1377 	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
1378 	if (timerfd < 0) {
1379 		return -errno;
1380 	}
1381 
1382 	rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller);
1383 	if (rc < 0) {
1384 		close(timerfd);
1385 		return rc;
1386 	}
1387 
1388 	poller->interruptfd = timerfd;
1389 	return 0;
1390 }
1391 
1392 static void
1393 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1394 {
1395 	int timerfd = poller->interruptfd;
1396 	uint64_t now_tick = spdk_get_ticks();
1397 	uint64_t ticks = spdk_get_ticks_hz();
1398 	int ret;
1399 	struct itimerspec new_tv = {};
1400 	struct itimerspec old_tv = {};
1401 
1402 	assert(poller->period_ticks != 0);
1403 	assert(timerfd >= 0);
1404 
1405 	SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
1406 		      interrupt_mode ? "interrupt" : "poll");
1407 
1408 	if (interrupt_mode) {
1409 		/* Set repeated timer expiration */
1410 		new_tv.it_interval.tv_sec = poller->period_ticks / ticks;
1411 		new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks;
1412 
1413 		/* Update next timer expiration */
1414 		if (poller->next_run_tick == 0) {
1415 			poller->next_run_tick = now_tick + poller->period_ticks;
1416 		} else if (poller->next_run_tick < now_tick) {
1417 			poller->next_run_tick = now_tick;
1418 		}
1419 
1420 		new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks;
1421 		new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks;
1422 
1423 		ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
1424 		if (ret < 0) {
1425 			SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno);
1426 			assert(false);
1427 		}
1428 	} else {
1429 		/* Disarm the timer */
1430 		ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv);
1431 		if (ret < 0) {
1432 			/* timerfd_settime's failure indicates that the timerfd is in error */
1433 			SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno);
1434 			assert(false);
1435 		}
1436 
1437 		/* In order to reuse poller_insert_timer, adjust now_tick so that next_run_tick becomes
1438 		 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC
1439 		 */
1440 		now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \
1441 			   (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC;
1442 		poller_remove_timer(poller->thread, poller);
1443 		poller_insert_timer(poller->thread, poller, now_tick);
1444 	}
1445 }
1446 
1447 static void
1448 poller_interrupt_fini(struct spdk_poller *poller)
1449 {
1450 	SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name);
1451 	assert(poller->interruptfd >= 0);
1452 	spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd);
1453 	close(poller->interruptfd);
1454 	poller->interruptfd = -1;
1455 }
1456 
1457 static int
1458 busy_poller_interrupt_init(struct spdk_poller *poller)
1459 {
1460 	int busy_efd;
1461 	int rc;
1462 
1463 	SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name);
1464 	busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1465 	if (busy_efd < 0) {
1466 		SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name);
1467 		return -errno;
1468 	}
1469 
1470 	rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd,
1471 			       poller->fn, poller->arg, poller->name);
1472 	if (rc < 0) {
1473 		close(busy_efd);
1474 		return rc;
1475 	}
1476 
1477 	poller->interruptfd = busy_efd;
1478 	return 0;
1479 }
1480 
1481 static void
1482 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1483 {
1484 	int busy_efd = poller->interruptfd;
1485 	uint64_t notify = 1;
1486 	int rc __attribute__((unused));
1487 
1488 	assert(busy_efd >= 0);
1489 
1490 	if (interrupt_mode) {
1491 		/* A write without a read leaves the eventfd level-triggered, so it fires repeatedly. */
1492 		if (write(busy_efd, &notify, sizeof(notify)) < 0) {
1493 			SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name);
1494 		}
1495 	} else {
1496 		/* Reading the eventfd clears its level triggering. */
1497 		rc = read(busy_efd, &notify, sizeof(notify));
1498 	}
1499 }
1500 
1501 #else
1502 
1503 static int
1504 period_poller_interrupt_init(struct spdk_poller *poller)
1505 {
1506 	return -ENOTSUP;
1507 }
1508 
1509 static void
1510 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1511 {
1512 }
1513 
1514 static void
1515 poller_interrupt_fini(struct spdk_poller *poller)
1516 {
1517 }
1518 
1519 static int
1520 busy_poller_interrupt_init(struct spdk_poller *poller)
1521 {
1522 	return -ENOTSUP;
1523 }
1524 
1525 static void
1526 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1527 {
1528 }
1529 
1530 #endif
1531 
1532 void
1533 spdk_poller_register_interrupt(struct spdk_poller *poller,
1534 			       spdk_poller_set_interrupt_mode_cb cb_fn,
1535 			       void *cb_arg)
1536 {
1537 	assert(poller != NULL);
1538 	assert(cb_fn != NULL);
1539 	assert(spdk_get_thread() == poller->thread);
1540 
1541 	if (!spdk_interrupt_mode_is_enabled()) {
1542 		return;
1543 	}
1544 
1545 	/* When a poller is created, we don't know if the user is ever going to
1546 	 * enable interrupts on it by calling this function, so the poller
1547 	 * registration function has to immediately create an interruptfd.
1548 	 * When this function does get called by the user, we then have to
1549 	 * destroy that interruptfd.
1550 	 */
1551 	if (poller->set_intr_cb_fn && poller->interruptfd >= 0) {
1552 		poller_interrupt_fini(poller);
1553 	}
1554 
1555 	poller->set_intr_cb_fn = cb_fn;
1556 	poller->set_intr_cb_arg = cb_arg;
1557 
1558 	/* Set poller into interrupt mode if thread is in interrupt. */
1559 	if (poller->thread->in_interrupt) {
1560 		poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
1561 	}
1562 }
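/*
 * Usage sketch (illustrative only): a poller that manages its own event
 * source across mode switches.  "my_poll_fn", "my_dev" and
 * "my_set_interrupt_mode" are hypothetical; the callback mirrors
 * period_poller_set_interrupt_mode() above.
 *
 *	static void
 *	my_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg,
 *			      bool interrupt_mode)
 *	{
 *		// arm or disarm a device-specific interrupt here
 *	}
 *
 *	poller = SPDK_POLLER_REGISTER(my_poll_fn, my_dev, 0);
 *	spdk_poller_register_interrupt(poller, my_set_interrupt_mode, my_dev);
 */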
1563 
1564 static uint64_t
1565 convert_us_to_ticks(uint64_t us)
1566 {
1567 	uint64_t quotient, remainder, ticks;
1568 
1569 	if (us) {
1570 		quotient = us / SPDK_SEC_TO_USEC;
1571 		remainder = us % SPDK_SEC_TO_USEC;
1572 		ticks = spdk_get_ticks_hz();
1573 
1574 		return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
1575 	} else {
1576 		return 0;
1577 	}
1578 }
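/*
 * Worked example (illustrative only) of the split quotient/remainder
 * arithmetic above, which avoids the overflow a naive
 * "us * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC" could hit for very large us.
 * With ticks_hz = 2,300,000,000 (a 2.3 GHz TSC) and us = 1,500,000 (1.5 s):
 *
 *	quotient  = 1,500,000 / 1,000,000 = 1
 *	remainder = 1,500,000 % 1,000,000 = 500,000
 *	result    = 2,300,000,000 * 1
 *	          + (2,300,000,000 * 500,000) / 1,000,000
 *	          = 3,450,000,000 ticks
 */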
1579 
1580 static struct spdk_poller *
1581 poller_register(spdk_poller_fn fn,
1582 		void *arg,
1583 		uint64_t period_microseconds,
1584 		const char *name)
1585 {
1586 	struct spdk_thread *thread;
1587 	struct spdk_poller *poller;
1588 
1589 	thread = spdk_get_thread();
1590 	if (!thread) {
1591 		assert(false);
1592 		return NULL;
1593 	}
1594 
1595 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
1596 		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
1597 		return NULL;
1598 	}
1599 
1600 	poller = calloc(1, sizeof(*poller));
1601 	if (poller == NULL) {
1602 		SPDK_ERRLOG("Poller memory allocation failed\n");
1603 		return NULL;
1604 	}
1605 
1606 	if (name) {
1607 		snprintf(poller->name, sizeof(poller->name), "%s", name);
1608 	} else {
1609 		snprintf(poller->name, sizeof(poller->name), "%p", fn);
1610 	}
1611 
1612 	poller->state = SPDK_POLLER_STATE_WAITING;
1613 	poller->fn = fn;
1614 	poller->arg = arg;
1615 	poller->thread = thread;
1616 	poller->interruptfd = -1;
1617 	if (thread->next_poller_id == 0) {
1618 		SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n");
1619 		thread->next_poller_id = 1;
1620 	}
1621 	poller->id = thread->next_poller_id++;
1622 
1623 	poller->period_ticks = convert_us_to_ticks(period_microseconds);
1624 
1625 	if (spdk_interrupt_mode_is_enabled()) {
1626 		int rc;
1627 
1628 		if (period_microseconds) {
1629 			rc = period_poller_interrupt_init(poller);
1630 			if (rc < 0) {
1631 				SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
1632 				free(poller);
1633 				return NULL;
1634 			}
1635 
1636 			spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL);
1637 		} else {
1638 			/* If the poller doesn't have a period, create an interruptfd that's
1639 			 * automatically always busy when running in interrupt mode.
1640 			 */
1641 			rc = busy_poller_interrupt_init(poller);
1642 			if (rc < 0) {
1643 				SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
1644 				free(poller);
1645 				return NULL;
1646 			}
1647 
1648 			spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL);
1649 		}
1650 	}
1651 
1652 	thread_insert_poller(thread, poller);
1653 
1654 	return poller;
1655 }
1656 
1657 struct spdk_poller *
1658 spdk_poller_register(spdk_poller_fn fn,
1659 		     void *arg,
1660 		     uint64_t period_microseconds)
1661 {
1662 	return poller_register(fn, arg, period_microseconds, NULL);
1663 }
1664 
1665 struct spdk_poller *
1666 spdk_poller_register_named(spdk_poller_fn fn,
1667 			   void *arg,
1668 			   uint64_t period_microseconds,
1669 			   const char *name)
1670 {
1671 	return poller_register(fn, arg, period_microseconds, name);
1672 }
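/*
 * Usage sketch (illustrative only) of registering both poller flavors from
 * the owning thread; "poll_rings", "process" and "my_dev" are hypothetical.
 *
 *	static int
 *	poll_rings(void *arg)
 *	{
 *		int completions = process(arg);
 *
 *		return completions > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
 *	}
 *
 *	// period 0: runs every iteration from the active_pollers list
 *	struct spdk_poller *p = spdk_poller_register(poll_rings, my_dev, 0);
 *
 *	// period > 0: runs at most every 10 ms from the timed_pollers tree
 *	struct spdk_poller *t =
 *		spdk_poller_register_named(poll_rings, my_dev, 10000, "ring_timer");
 */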
1673 
1674 static void
1675 wrong_thread(const char *func, const char *name, struct spdk_thread *thread,
1676 	     struct spdk_thread *curthread)
1677 {
1678 	if (thread == NULL) {
1679 		SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name);
1680 		abort();
1681 	}
1682 	SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be "
1683 		    "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id,
1684 		    thread->name, thread->id);
1685 	assert(false);
1686 }
1687 
1688 void
1689 spdk_poller_unregister(struct spdk_poller **ppoller)
1690 {
1691 	struct spdk_thread *thread;
1692 	struct spdk_poller *poller;
1693 
1694 	poller = *ppoller;
1695 	if (poller == NULL) {
1696 		return;
1697 	}
1698 
1699 	*ppoller = NULL;
1700 
1701 	thread = spdk_get_thread();
1702 	if (!thread) {
1703 		assert(false);
1704 		return;
1705 	}
1706 
1707 	if (poller->thread != thread) {
1708 		wrong_thread(__func__, poller->name, poller->thread, thread);
1709 		return;
1710 	}
1711 
1712 	if (spdk_interrupt_mode_is_enabled()) {
1713 		/* Release the interrupt resource for period or busy poller */
1714 		if (poller->interruptfd >= 0) {
1715 			poller_interrupt_fini(poller);
1716 		}
1717 
1718 		/* Mark that a poller was unregistered, so unregistered pollers also
1719 		 * get reaped by spdk_thread_poll() in interrupt mode.
1720 		 */
1721 		thread->poller_unregistered = true;
1722 	}
1723 
1724 	/* If the poller was paused, put it on the active_pollers list so that
1725 	 * its unregistration can be processed by spdk_thread_poll().
1726 	 */
1727 	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
1728 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
1729 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
1730 		poller->period_ticks = 0;
1731 	}
1732 
1733 	/* Simply set the state to unregistered. The poller will get cleaned up
1734 	 * in a subsequent call to spdk_thread_poll().
1735 	 */
1736 	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
1737 }
1738 
1739 void
1740 spdk_poller_pause(struct spdk_poller *poller)
1741 {
1742 	struct spdk_thread *thread;
1743 
1744 	thread = spdk_get_thread();
1745 	if (!thread) {
1746 		assert(false);
1747 		return;
1748 	}
1749 
1750 	if (poller->thread != thread) {
1751 		wrong_thread(__func__, poller->name, poller->thread, thread);
1752 		return;
1753 	}
1754 
1755 	/* We just set its state to SPDK_POLLER_STATE_PAUSING and let
1756 	 * spdk_thread_poll() move it. It allows a poller to be paused from
1757 	 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE
1758 	 * iteration, or from within itself without breaking the logic to always
1759 	 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration.
1760 	 */
1761 	switch (poller->state) {
1762 	case SPDK_POLLER_STATE_PAUSED:
1763 	case SPDK_POLLER_STATE_PAUSING:
1764 		break;
1765 	case SPDK_POLLER_STATE_RUNNING:
1766 	case SPDK_POLLER_STATE_WAITING:
1767 		poller->state = SPDK_POLLER_STATE_PAUSING;
1768 		break;
1769 	default:
1770 		assert(false);
1771 		break;
1772 	}
1773 }
1774 
1775 void
1776 spdk_poller_resume(struct spdk_poller *poller)
1777 {
1778 	struct spdk_thread *thread;
1779 
1780 	thread = spdk_get_thread();
1781 	if (!thread) {
1782 		assert(false);
1783 		return;
1784 	}
1785 
1786 	if (poller->thread != thread) {
1787 		wrong_thread(__func__, poller->name, poller->thread, thread);
1788 		return;
1789 	}
1790 
1791 	/* If a poller is paused it has to be removed from the paused pollers
1792 	 * list and put on the active list or timer tree depending on its
1793 	 * period_ticks.  If a poller is still in the process of being paused,
1794 	 * we just need to flip its state back to waiting, as it's already on
1795 	 * the appropriate list or tree.
1796 	 */
1797 	switch (poller->state) {
1798 	case SPDK_POLLER_STATE_PAUSED:
1799 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
1800 		thread_insert_poller(thread, poller);
1801 	/* fallthrough */
1802 	case SPDK_POLLER_STATE_PAUSING:
1803 		poller->state = SPDK_POLLER_STATE_WAITING;
1804 		break;
1805 	case SPDK_POLLER_STATE_RUNNING:
1806 	case SPDK_POLLER_STATE_WAITING:
1807 		break;
1808 	default:
1809 		assert(false);
1810 		break;
1811 	}
1812 }
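/*
 * Usage sketch (illustrative only) of the pause/resume flow, on the poller's
 * owning thread.  Pausing is deferred: the poller moves to paused_pollers
 * only when spdk_thread_poll() next sees it in the PAUSING state.
 *
 *	spdk_poller_pause(poller);		// state -> PAUSING
 *	spdk_thread_poll(thread, 0, 0);		// state -> PAUSED; fn not called
 *
 *	spdk_poller_resume(poller);		// back on the active list/timer tree
 *	spdk_thread_poll(thread, 0, 0);		// fn runs again
 */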
1813 
1814 const char *
1815 spdk_poller_get_name(struct spdk_poller *poller)
1816 {
1817 	return poller->name;
1818 }
1819 
1820 uint64_t
1821 spdk_poller_get_id(struct spdk_poller *poller)
1822 {
1823 	return poller->id;
1824 }
1825 
1826 const char *
1827 spdk_poller_get_state_str(struct spdk_poller *poller)
1828 {
1829 	switch (poller->state) {
1830 	case SPDK_POLLER_STATE_WAITING:
1831 		return "waiting";
1832 	case SPDK_POLLER_STATE_RUNNING:
1833 		return "running";
1834 	case SPDK_POLLER_STATE_UNREGISTERED:
1835 		return "unregistered";
1836 	case SPDK_POLLER_STATE_PAUSING:
1837 		return "pausing";
1838 	case SPDK_POLLER_STATE_PAUSED:
1839 		return "paused";
1840 	default:
1841 		return NULL;
1842 	}
1843 }
1844 
1845 uint64_t
1846 spdk_poller_get_period_ticks(struct spdk_poller *poller)
1847 {
1848 	return poller->period_ticks;
1849 }
1850 
1851 void
1852 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats)
1853 {
1854 	stats->run_count = poller->run_count;
1855 	stats->busy_count = poller->busy_count;
1856 }
1857 
1858 struct spdk_poller *
1859 spdk_thread_get_first_active_poller(struct spdk_thread *thread)
1860 {
1861 	return TAILQ_FIRST(&thread->active_pollers);
1862 }
1863 
1864 struct spdk_poller *
1865 spdk_thread_get_next_active_poller(struct spdk_poller *prev)
1866 {
1867 	return TAILQ_NEXT(prev, tailq);
1868 }
1869 
1870 struct spdk_poller *
1871 spdk_thread_get_first_timed_poller(struct spdk_thread *thread)
1872 {
1873 	return RB_MIN(timed_pollers_tree, &thread->timed_pollers);
1874 }
1875 
1876 struct spdk_poller *
1877 spdk_thread_get_next_timed_poller(struct spdk_poller *prev)
1878 {
1879 	return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev);
1880 }
1881 
1882 struct spdk_poller *
1883 spdk_thread_get_first_paused_poller(struct spdk_thread *thread)
1884 {
1885 	return TAILQ_FIRST(&thread->paused_pollers);
1886 }
1887 
1888 struct spdk_poller *
1889 spdk_thread_get_next_paused_poller(struct spdk_poller *prev)
1890 {
1891 	return TAILQ_NEXT(prev, tailq);
1892 }
1893 
1894 struct spdk_io_channel *
1895 spdk_thread_get_first_io_channel(struct spdk_thread *thread)
1896 {
1897 	return RB_MIN(io_channel_tree, &thread->io_channels);
1898 }
1899 
1900 struct spdk_io_channel *
1901 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev)
1902 {
1903 	return RB_NEXT(io_channel_tree, &thread->io_channels, prev);
1904 }
1905 
1906 struct call_thread {
1907 	struct spdk_thread *cur_thread;
1908 	spdk_msg_fn fn;
1909 	void *ctx;
1910 
1911 	struct spdk_thread *orig_thread;
1912 	spdk_msg_fn cpl;
1913 };
1914 
1915 static void
1916 _on_thread(void *ctx)
1917 {
1918 	struct call_thread *ct = ctx;
1919 	int rc __attribute__((unused));
1920 
1921 	ct->fn(ct->ctx);
1922 
1923 	pthread_mutex_lock(&g_devlist_mutex);
1924 	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
1925 	while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) {
1926 		SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n",
1927 			      ct->cur_thread->name);
1928 		ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
1929 	}
1930 	pthread_mutex_unlock(&g_devlist_mutex);
1931 
1932 	if (!ct->cur_thread) {
1933 		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");
1934 
1935 		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
1936 		free(ctx);
1937 	} else {
1938 		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
1939 			      ct->cur_thread->name);
1940 
1941 		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
1942 	}
1943 	assert(rc == 0);
1944 }
1945 
1946 void
1947 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
1948 {
1949 	struct call_thread *ct;
1950 	struct spdk_thread *thread;
1951 	int rc __attribute__((unused));
1952 
1953 	ct = calloc(1, sizeof(*ct));
1954 	if (!ct) {
1955 		SPDK_ERRLOG("Unable to perform thread iteration\n");
1956 		cpl(ctx);
1957 		return;
1958 	}
1959 
1960 	ct->fn = fn;
1961 	ct->ctx = ctx;
1962 	ct->cpl = cpl;
1963 
1964 	thread = _get_thread();
1965 	if (!thread) {
1966 		SPDK_ERRLOG("No thread allocated\n");
1967 		free(ct);
1968 		cpl(ctx);
1969 		return;
1970 	}
1971 	ct->orig_thread = thread;
1972 
1973 	pthread_mutex_lock(&g_devlist_mutex);
1974 	ct->cur_thread = TAILQ_FIRST(&g_threads);
1975 	pthread_mutex_unlock(&g_devlist_mutex);
1976 
1977 	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
1978 		      ct->orig_thread->name);
1979 
1980 	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
1981 	assert(rc == 0);
1982 }
1983 
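/*
 * Example (sketch with hypothetical callbacks): run a function once on every
 * running SPDK thread, then receive a completion on the calling thread.  The
 * per-thread callbacks execute one thread at a time (each hop is a message to
 * the next thread), so no extra locking is needed in this sketch.
 *
 *	static int g_visited;
 *
 *	static void
 *	count_thread(void *ctx)
 *	{
 *		g_visited++;
 *	}
 *
 *	static void
 *	count_done(void *ctx)
 *	{
 *		printf("visited %d threads\n", g_visited);
 *	}
 *
 *	spdk_for_each_thread(count_thread, NULL, count_done);
 */
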
1984 static inline void
1985 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode)
1986 {
1987 	if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
1988 		return;
1989 	}
1990 
1991 	if (!poller->set_intr_cb_fn) {
1992 		SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name);
1993 		assert(false);
1994 		return;
1995 	}
1996 
1997 	poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode);
1998 }
1999 
2000 void
2001 spdk_thread_set_interrupt_mode(bool enable_interrupt)
2002 {
2003 	struct spdk_thread *thread = _get_thread();
2004 	struct spdk_poller *poller, *tmp;
2005 
2006 	assert(thread);
2007 	assert(spdk_interrupt_mode_is_enabled());
2008 
2009 	SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n",
2010 		       thread->name, enable_interrupt ? "intr" : "poll",
2011 		       thread->in_interrupt ? "intr" : "poll");
2012 
2013 	if (thread->in_interrupt == enable_interrupt) {
2014 		return;
2015 	}
2016 
2017 	/* Set pollers to expected mode */
2018 	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
2019 		poller_set_interrupt_mode(poller, enable_interrupt);
2020 	}
2021 	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) {
2022 		poller_set_interrupt_mode(poller, enable_interrupt);
2023 	}
2024 	/* Paused pollers are switched as well, so they are in the expected mode when resumed */
2025 	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) {
2026 		poller_set_interrupt_mode(poller, enable_interrupt);
2027 	}
2028 
2029 	thread->in_interrupt = enable_interrupt;
2031 }
2032 
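/*
 * Example (sketch): a poller opts in to interrupt-mode switching by providing
 * a set_intr_cb_fn, which poller_set_interrupt_mode() above invokes.  This
 * assumes spdk_poller_register_interrupt() from include/spdk/thread.h;
 * my_poll, my_set_mode, my_dev_enable_irq and struct my_dev are hypothetical.
 *
 *	static void
 *	my_set_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
 *	{
 *		struct my_dev *dev = cb_arg;
 *
 *		my_dev_enable_irq(dev, interrupt_mode);	(hypothetical helper)
 *	}
 *
 *	poller = SPDK_POLLER_REGISTER(my_poll, dev, 0);
 *	spdk_poller_register_interrupt(poller, my_set_mode, dev);
 */
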
2033 static struct io_device *
2034 io_device_get(void *io_device)
2035 {
2036 	struct io_device find = {};
2037 
2038 	find.io_device = io_device;
2039 	return RB_FIND(io_device_tree, &g_io_devices, &find);
2040 }
2041 
2042 void
2043 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
2044 			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
2045 			const char *name)
2046 {
2047 	struct io_device *dev, *tmp;
2048 	struct spdk_thread *thread;
2049 
2050 	assert(io_device != NULL);
2051 	assert(create_cb != NULL);
2052 	assert(destroy_cb != NULL);
2053 
2054 	thread = spdk_get_thread();
2055 	if (!thread) {
2056 		SPDK_ERRLOG("called from non-SPDK thread\n");
2057 		assert(false);
2058 		return;
2059 	}
2060 
2061 	dev = calloc(1, sizeof(struct io_device));
2062 	if (dev == NULL) {
2063 		SPDK_ERRLOG("could not allocate io_device\n");
2064 		return;
2065 	}
2066 
2067 	dev->io_device = io_device;
2068 	if (name) {
2069 		snprintf(dev->name, sizeof(dev->name), "%s", name);
2070 	} else {
2071 		snprintf(dev->name, sizeof(dev->name), "%p", dev);
2072 	}
2073 	dev->create_cb = create_cb;
2074 	dev->destroy_cb = destroy_cb;
2075 	dev->unregister_cb = NULL;
2076 	dev->ctx_size = ctx_size;
2077 	dev->for_each_count = 0;
2078 	dev->unregistered = false;
2079 	dev->refcnt = 0;
2080 
2081 	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
2082 		      dev->name, dev->io_device, thread->name);
2083 
2084 	pthread_mutex_lock(&g_devlist_mutex);
2085 	tmp = RB_INSERT(io_device_tree, &g_io_devices, dev);
2086 	if (tmp != NULL) {
2087 		SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
2088 			    io_device, tmp->name, dev->name);
2089 		free(dev);
2090 	}
2091 
2092 	pthread_mutex_unlock(&g_devlist_mutex);
2093 }
2094 
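/*
 * Example (sketch): registering an io_device whose per-thread channel context
 * is a hypothetical struct my_channel.  create_cb/destroy_cb run on each
 * thread that gets or releases a channel for this device.
 *
 *	static int
 *	my_create_cb(void *io_device, void *ctx_buf)
 *	{
 *		struct my_channel *ch = ctx_buf;
 *
 *		TAILQ_INIT(&ch->pending);
 *		return 0;
 *	}
 *
 *	static void
 *	my_destroy_cb(void *io_device, void *ctx_buf)
 *	{
 *		(release whatever my_create_cb() set up)
 *	}
 *
 *	spdk_io_device_register(&g_my_dev, my_create_cb, my_destroy_cb,
 *				sizeof(struct my_channel), "my_dev");
 */
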
2095 static void
2096 _finish_unregister(void *arg)
2097 {
2098 	struct io_device *dev = arg;
2099 	struct spdk_thread *thread;
2100 
2101 	thread = spdk_get_thread();
2102 	assert(thread == dev->unregister_thread);
2103 
2104 	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
2105 		      dev->name, dev->io_device, thread->name);
2106 
2107 	assert(thread->pending_unregister_count > 0);
2108 	thread->pending_unregister_count--;
2109 
2110 	dev->unregister_cb(dev->io_device);
2111 	free(dev);
2112 }
2113 
2114 static void
2115 io_device_free(struct io_device *dev)
2116 {
2117 	int rc __attribute__((unused));
2118 
2119 	if (dev->unregister_cb == NULL) {
2120 		free(dev);
2121 	} else {
2122 		assert(dev->unregister_thread != NULL);
2123 		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
2124 			      dev->name, dev->io_device, dev->unregister_thread->name);
2125 		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
2126 		assert(rc == 0);
2127 	}
2128 }
2129 
2130 void
2131 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
2132 {
2133 	struct io_device *dev;
2134 	uint32_t refcnt;
2135 	struct spdk_thread *thread;
2136 
2137 	thread = spdk_get_thread();
2138 	if (!thread) {
2139 		SPDK_ERRLOG("called from non-SPDK thread\n");
2140 		assert(false);
2141 		return;
2142 	}
2143 
2144 	pthread_mutex_lock(&g_devlist_mutex);
2145 	dev = io_device_get(io_device);
2146 	if (!dev) {
2147 		SPDK_ERRLOG("io_device %p not found\n", io_device);
2148 		assert(false);
2149 		pthread_mutex_unlock(&g_devlist_mutex);
2150 		return;
2151 	}
2152 
2153 	/* The for_each_count check differentiates the user attempting to unregister the
2154 	 * device a second time, from the internal call to this function that occurs
2155 	 * after the for_each_count reaches 0.
2156 	 */
2157 	if (dev->pending_unregister && dev->for_each_count > 0) {
2158 		SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device);
2159 		assert(false);
2160 		pthread_mutex_unlock(&g_devlist_mutex);
2161 		return;
2162 	}
2163 
2164 	dev->unregister_cb = unregister_cb;
2165 	dev->unregister_thread = thread;
2166 
2167 	if (dev->for_each_count > 0) {
2168 		SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n",
2169 			     dev->name, io_device, dev->for_each_count);
2170 		dev->pending_unregister = true;
2171 		pthread_mutex_unlock(&g_devlist_mutex);
2172 		return;
2173 	}
2174 
2175 	dev->unregistered = true;
2176 	RB_REMOVE(io_device_tree, &g_io_devices, dev);
2177 	refcnt = dev->refcnt;
2178 	pthread_mutex_unlock(&g_devlist_mutex);
2179 
2180 	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
2181 		      dev->name, dev->io_device, thread->name);
2182 
2183 	if (unregister_cb) {
2184 		thread->pending_unregister_count++;
2185 	}
2186 
2187 	if (refcnt > 0) {
2188 		/* defer deletion */
2189 		return;
2190 	}
2191 
2192 	io_device_free(dev);
2193 }
2194 
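/*
 * Example (sketch, hypothetical callback): unregistering with a completion
 * callback.  The callback runs on the unregistering thread once the last
 * channel reference to the device has been released.
 *
 *	static void
 *	my_dev_unregistered(void *io_device)
 *	{
 *		(it is now safe to free resources tied to io_device)
 *	}
 *
 *	spdk_io_device_unregister(&g_my_dev, my_dev_unregistered);
 */
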
2195 const char *
2196 spdk_io_device_get_name(struct io_device *dev)
2197 {
2198 	return dev->name;
2199 }
2200 
2201 static struct spdk_io_channel *
2202 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev)
2203 {
2204 	struct spdk_io_channel find = {};
2205 
2206 	find.dev = dev;
2207 	return RB_FIND(io_channel_tree, &thread->io_channels, &find);
2208 }
2209 
2210 struct spdk_io_channel *
2211 spdk_get_io_channel(void *io_device)
2212 {
2213 	struct spdk_io_channel *ch;
2214 	struct spdk_thread *thread;
2215 	struct io_device *dev;
2216 	int rc;
2217 
2218 	pthread_mutex_lock(&g_devlist_mutex);
2219 	dev = io_device_get(io_device);
2220 	if (dev == NULL) {
2221 		SPDK_ERRLOG("could not find io_device %p\n", io_device);
2222 		pthread_mutex_unlock(&g_devlist_mutex);
2223 		return NULL;
2224 	}
2225 
2226 	thread = _get_thread();
2227 	if (!thread) {
2228 		SPDK_ERRLOG("No thread allocated\n");
2229 		pthread_mutex_unlock(&g_devlist_mutex);
2230 		return NULL;
2231 	}
2232 
2233 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
2234 		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
2235 		pthread_mutex_unlock(&g_devlist_mutex);
2236 		return NULL;
2237 	}
2238 
2239 	ch = thread_get_io_channel(thread, dev);
2240 	if (ch != NULL) {
2241 		ch->ref++;
2242 
2243 		SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2244 			      ch, dev->name, dev->io_device, thread->name, ch->ref);
2245 
2246 		/*
2247 		 * An I/O channel already exists for this device on this
2248 		 *  thread, so return it.
2249 		 */
2250 		pthread_mutex_unlock(&g_devlist_mutex);
2251 		spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0,
2252 				  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
2253 		return ch;
2254 	}
2255 
2256 	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
2257 	if (ch == NULL) {
2258 		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
2259 		pthread_mutex_unlock(&g_devlist_mutex);
2260 		return NULL;
2261 	}
2262 
2263 	ch->dev = dev;
2264 	ch->destroy_cb = dev->destroy_cb;
2265 	ch->thread = thread;
2266 	ch->ref = 1;
2267 	ch->destroy_ref = 0;
2268 	RB_INSERT(io_channel_tree, &thread->io_channels, ch);
2269 
2270 	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2271 		      ch, dev->name, dev->io_device, thread->name, ch->ref);
2272 
2273 	dev->refcnt++;
2274 
2275 	pthread_mutex_unlock(&g_devlist_mutex);
2276 
2277 	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
2278 	if (rc != 0) {
2279 		pthread_mutex_lock(&g_devlist_mutex);
2280 		RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
2281 		dev->refcnt--;
2282 		free(ch);
2283 		pthread_mutex_unlock(&g_devlist_mutex);
2284 		return NULL;
2285 	}
2286 
2287 	spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1);
2288 	return ch;
2289 }
2290 
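/*
 * Example (sketch): obtaining the calling thread's channel for a registered
 * device and recovering the per-thread context that create_cb initialized.
 * struct my_channel, its submit_count field, and g_my_dev are hypothetical.
 *
 *	struct spdk_io_channel *ch;
 *	struct my_channel *my_ch;
 *
 *	ch = spdk_get_io_channel(&g_my_dev);
 *	if (ch == NULL) {
 *		return -ENOMEM;
 *	}
 *	my_ch = spdk_io_channel_get_ctx(ch);
 *	my_ch->submit_count++;
 *	spdk_put_io_channel(ch);
 */
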
2291 static void
2292 put_io_channel(void *arg)
2293 {
2294 	struct spdk_io_channel *ch = arg;
2295 	bool do_remove_dev = true;
2296 	struct spdk_thread *thread;
2297 
2298 	thread = spdk_get_thread();
2299 	if (!thread) {
2300 		SPDK_ERRLOG("called from non-SPDK thread\n");
2301 		assert(false);
2302 		return;
2303 	}
2304 
2305 	SPDK_DEBUGLOG(thread,
2306 		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
2307 		      ch, ch->dev->name, ch->dev->io_device, thread->name);
2308 
2309 	assert(ch->thread == thread);
2310 
2311 	ch->destroy_ref--;
2312 
2313 	if (ch->ref > 0 || ch->destroy_ref > 0) {
2314 		/*
2315 		 * Another reference to the associated io_device was requested
2316 		 *  after this message was sent but before it had a chance to
2317 		 *  execute.
2318 		 */
2319 		return;
2320 	}
2321 
2322 	pthread_mutex_lock(&g_devlist_mutex);
2323 	RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
2324 	pthread_mutex_unlock(&g_devlist_mutex);
2325 
2326 	/* Don't hold the devlist mutex while the destroy_cb is called. */
2327 	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));
2328 
2329 	pthread_mutex_lock(&g_devlist_mutex);
2330 	ch->dev->refcnt--;
2331 
2332 	if (!ch->dev->unregistered) {
2333 		do_remove_dev = false;
2334 	}
2335 
2336 	if (ch->dev->refcnt > 0) {
2337 		do_remove_dev = false;
2338 	}
2339 
2340 	pthread_mutex_unlock(&g_devlist_mutex);
2341 
2342 	if (do_remove_dev) {
2343 		io_device_free(ch->dev);
2344 	}
2345 	free(ch);
2346 }
2347 
2348 void
2349 spdk_put_io_channel(struct spdk_io_channel *ch)
2350 {
2351 	struct spdk_thread *thread;
2352 	int rc __attribute__((unused));
2353 
2354 	spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0,
2355 			  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
2356 
2357 	thread = spdk_get_thread();
2358 	if (!thread) {
2359 		SPDK_ERRLOG("called from non-SPDK thread\n");
2360 		assert(false);
2361 		return;
2362 	}
2363 
2364 	if (ch->thread != thread) {
2365 		wrong_thread(__func__, "ch", ch->thread, thread);
2366 		return;
2367 	}
2368 
2369 	SPDK_DEBUGLOG(thread,
2370 		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2371 		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);
2372 
2373 	ch->ref--;
2374 
2375 	if (ch->ref == 0) {
2376 		ch->destroy_ref++;
2377 		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
2378 		assert(rc == 0);
2379 	}
2380 }
2381 
2382 struct spdk_io_channel *
2383 spdk_io_channel_from_ctx(void *ctx)
2384 {
2385 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
2386 }
2387 
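/*
 * The per-thread context passed to create_cb/destroy_cb is allocated
 * immediately after the struct spdk_io_channel header (see the calloc in
 * spdk_get_io_channel()), so the header can be recovered from a context
 * pointer by subtracting sizeof(struct spdk_io_channel), as above:
 *
 *	struct my_channel *my_ch = ctx_buf;	(hypothetical context type)
 *	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(my_ch);
 */
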
2388 struct spdk_thread *
2389 spdk_io_channel_get_thread(struct spdk_io_channel *ch)
2390 {
2391 	return ch->thread;
2392 }
2393 
2394 void *
2395 spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
2396 {
2397 	return ch->dev->io_device;
2398 }
2399 
2400 const char *
2401 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch)
2402 {
2403 	return spdk_io_device_get_name(ch->dev);
2404 }
2405 
2406 int
2407 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
2408 {
2409 	return ch->ref;
2410 }
2411 
2412 struct spdk_io_channel_iter {
2413 	void *io_device;
2414 	struct io_device *dev;
2415 	spdk_channel_msg fn;
2416 	int status;
2417 	void *ctx;
2418 	struct spdk_io_channel *ch;
2419 
2420 	struct spdk_thread *cur_thread;
2421 
2422 	struct spdk_thread *orig_thread;
2423 	spdk_channel_for_each_cpl cpl;
2424 };
2425 
2426 void *
2427 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
2428 {
2429 	return i->io_device;
2430 }
2431 
2432 struct spdk_io_channel *
2433 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
2434 {
2435 	return i->ch;
2436 }
2437 
2438 void *
2439 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
2440 {
2441 	return i->ctx;
2442 }
2443 
2444 static void
2445 _call_completion(void *ctx)
2446 {
2447 	struct spdk_io_channel_iter *i = ctx;
2448 
2449 	if (i->cpl != NULL) {
2450 		i->cpl(i, i->status);
2451 	}
2452 	free(i);
2453 }
2454 
2455 static void
2456 _call_channel(void *ctx)
2457 {
2458 	struct spdk_io_channel_iter *i = ctx;
2459 	struct spdk_io_channel *ch;
2460 
2461 	/*
2462 	 * It is possible that the channel was deleted before this
2463 	 *  message had a chance to execute.  If so, skip calling
2464 	 *  the fn() on this thread.
2465 	 */
2466 	pthread_mutex_lock(&g_devlist_mutex);
2467 	ch = thread_get_io_channel(i->cur_thread, i->dev);
2468 	pthread_mutex_unlock(&g_devlist_mutex);
2469 
2470 	if (ch) {
2471 		i->fn(i);
2472 	} else {
2473 		spdk_for_each_channel_continue(i, 0);
2474 	}
2475 }
2476 
2477 void
2478 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
2479 		      spdk_channel_for_each_cpl cpl)
2480 {
2481 	struct spdk_thread *thread;
2482 	struct spdk_io_channel *ch;
2483 	struct spdk_io_channel_iter *i;
2484 	int rc __attribute__((unused));
2485 
2486 	i = calloc(1, sizeof(*i));
2487 	if (!i) {
2488 		SPDK_ERRLOG("Unable to allocate iterator\n");
2489 		assert(false);
2490 		return;
2491 	}
2492 
2493 	i->io_device = io_device;
2494 	i->fn = fn;
2495 	i->ctx = ctx;
2496 	i->cpl = cpl;
2497 	i->orig_thread = _get_thread();
2498 
2499 	pthread_mutex_lock(&g_devlist_mutex);
2500 	i->dev = io_device_get(io_device);
2501 	if (i->dev == NULL) {
2502 		SPDK_ERRLOG("could not find io_device %p\n", io_device);
2503 		assert(false);
2504 		i->status = -ENODEV;
2505 		goto end;
2506 	}
2507 
2508 	/* Do not allow new for_each operations if we are already waiting for
2509 	 * outstanding for_each operations to complete so that the device can be unregistered.
2510 	 */
2511 	if (i->dev->pending_unregister) {
2512 		SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device);
2513 		i->status = -ENODEV;
2514 		goto end;
2515 	}
2516 
2517 	TAILQ_FOREACH(thread, &g_threads, tailq) {
2518 		ch = thread_get_io_channel(thread, i->dev);
2519 		if (ch != NULL) {
2520 			ch->dev->for_each_count++;
2521 			i->cur_thread = thread;
2522 			i->ch = ch;
2523 			pthread_mutex_unlock(&g_devlist_mutex);
2524 			rc = spdk_thread_send_msg(thread, _call_channel, i);
2525 			assert(rc == 0);
2526 			return;
2527 		}
2528 	}
2529 
2530 end:
2531 	pthread_mutex_unlock(&g_devlist_mutex);
2532 
2533 	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
2534 	assert(rc == 0);
2535 }
2536 
2537 static void
2538 __pending_unregister(void *arg)
2539 {
2540 	struct io_device *dev = arg;
2541 
2542 	assert(dev->pending_unregister);
2543 	assert(dev->for_each_count == 0);
2544 	spdk_io_device_unregister(dev->io_device, dev->unregister_cb);
2545 }
2546 
2547 void
2548 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
2549 {
2550 	struct spdk_thread *thread;
2551 	struct spdk_io_channel *ch;
2552 	struct io_device *dev;
2553 	int rc __attribute__((unused));
2554 
2555 	assert(i->cur_thread == spdk_get_thread());
2556 
2557 	i->status = status;
2558 
2559 	pthread_mutex_lock(&g_devlist_mutex);
2560 	dev = i->dev;
2561 	if (status) {
2562 		goto end;
2563 	}
2564 
2565 	thread = TAILQ_NEXT(i->cur_thread, tailq);
2566 	while (thread) {
2567 		ch = thread_get_io_channel(thread, dev);
2568 		if (ch != NULL) {
2569 			i->cur_thread = thread;
2570 			i->ch = ch;
2571 			pthread_mutex_unlock(&g_devlist_mutex);
2572 			rc = spdk_thread_send_msg(thread, _call_channel, i);
2573 			assert(rc == 0);
2574 			return;
2575 		}
2576 		thread = TAILQ_NEXT(thread, tailq);
2577 	}
2578 
2579 end:
2580 	dev->for_each_count--;
2581 	i->ch = NULL;
2582 	pthread_mutex_unlock(&g_devlist_mutex);
2583 
2584 	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
2585 	assert(rc == 0);
2586 
2587 	pthread_mutex_lock(&g_devlist_mutex);
2588 	if (dev->pending_unregister && dev->for_each_count == 0) {
2589 		rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev);
2590 		assert(rc == 0);
2591 	}
2592 	pthread_mutex_unlock(&g_devlist_mutex);
2593 }
2594 
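/*
 * Example (sketch, hypothetical callbacks): visiting the channel of every
 * thread that holds one for a device.  Each per-channel callback must
 * eventually call spdk_for_each_channel_continue() to advance the iteration;
 * the completion callback then runs on the thread that started it.
 *
 *	static void
 *	flush_one(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *		struct my_channel *my_ch = spdk_io_channel_get_ctx(ch);
 *
 *		my_channel_flush(my_ch);	(hypothetical helper)
 *		spdk_for_each_channel_continue(i, 0);
 *	}
 *
 *	static void
 *	flush_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		SPDK_NOTICELOG("flush finished: %d\n", status);
 *	}
 *
 *	spdk_for_each_channel(&g_my_dev, flush_one, NULL, flush_done);
 */
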
2595 struct spdk_interrupt {
2596 	int			efd;
2597 	struct spdk_thread	*thread;
2598 	char			name[SPDK_MAX_POLLER_NAME_LEN + 1];
2599 };
2600 
2601 static void
2602 thread_interrupt_destroy(struct spdk_thread *thread)
2603 {
2604 	struct spdk_fd_group *fgrp = thread->fgrp;
2605 
2606 	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);
2607 
2608 	if (thread->msg_fd < 0) {
2609 		return;
2610 	}
2611 
2612 	spdk_fd_group_remove(fgrp, thread->msg_fd);
2613 	close(thread->msg_fd);
2614 	thread->msg_fd = -1;
2615 
2616 	spdk_fd_group_destroy(fgrp);
2617 	thread->fgrp = NULL;
2618 }
2619 
2620 #ifdef __linux__
2621 static int
2622 thread_interrupt_msg_process(void *arg)
2623 {
2624 	struct spdk_thread *thread = arg;
2625 	uint32_t msg_count;
2626 	spdk_msg_fn critical_msg;
2627 	int rc = 0;
2628 	uint64_t notify = 1;
2629 
2630 	assert(spdk_interrupt_mode_is_enabled());
2631 
2632 	/* There may be a race between this consumer's msg_acknowledge (the read below)
2633 	 * and another producer's msg_notify, so the acknowledge must happen first and
2634 	 * only then may this thread's messages be checked. This avoids missing a notification.
2635 	 */
2636 	rc = read(thread->msg_fd, &notify, sizeof(notify));
2637 	if (rc < 0 && errno != EAGAIN) {
2638 		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
2639 	}
2640 
2641 	critical_msg = thread->critical_msg;
2642 	if (spdk_unlikely(critical_msg != NULL)) {
2643 		critical_msg(NULL);
2644 		thread->critical_msg = NULL;
2645 		rc = 1;
2646 	}
2647 
2648 	msg_count = msg_queue_run_batch(thread, 0);
2649 	if (msg_count) {
2650 		rc = 1;
2651 	}
2652 
2653 	return rc;
2654 }
2655 
2656 static int
2657 thread_interrupt_create(struct spdk_thread *thread)
2658 {
2659 	int rc;
2660 
2661 	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);
2662 
2663 	rc = spdk_fd_group_create(&thread->fgrp);
2664 	if (rc) {
2665 		return rc;
2666 	}
2667 
2668 	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
2669 	if (thread->msg_fd < 0) {
2670 		rc = -errno;
2671 		spdk_fd_group_destroy(thread->fgrp);
2672 		thread->fgrp = NULL;
2673 
2674 		return rc;
2675 	}
2676 
2677 	return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd,
2678 				 thread_interrupt_msg_process, thread);
2679 }
2680 #else
2681 static int
2682 thread_interrupt_create(struct spdk_thread *thread)
2683 {
2684 	return -ENOTSUP;
2685 }
2686 #endif
2687 
2688 struct spdk_interrupt *
2689 spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
2690 			void *arg, const char *name)
2691 {
2692 	struct spdk_thread *thread;
2693 	struct spdk_interrupt *intr;
2694 	int ret;
2695 
2696 	thread = spdk_get_thread();
2697 	if (!thread) {
2698 		assert(false);
2699 		return NULL;
2700 	}
2701 
2702 	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
2703 		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
2704 		return NULL;
2705 	}
2706 
2707 	intr = calloc(1, sizeof(*intr));
2708 	if (intr == NULL) {
2709 		SPDK_ERRLOG("Interrupt handler allocation failed\n");
2710 		return NULL;
2711 	}
2712 
2713 	if (name) {
2714 		snprintf(intr->name, sizeof(intr->name), "%s", name);
2715 	} else {
2716 		snprintf(intr->name, sizeof(intr->name), "%p", fn);
2717 	}
2718 
2719 	intr->efd = efd;
2720 	intr->thread = thread;
2721 
2722 	ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name);
2723 
2724 	if (ret != 0) {
2725 		SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n",
2726 			    thread->name, efd, spdk_strerror(-ret));
2727 		free(intr);
2728 		return NULL;
2729 	}
2730 
2731 	return intr;
2732 }
2733 
2734 void
2735 spdk_interrupt_unregister(struct spdk_interrupt **pintr)
2736 {
2737 	struct spdk_thread *thread;
2738 	struct spdk_interrupt *intr;
2739 
2740 	intr = *pintr;
2741 	if (intr == NULL) {
2742 		return;
2743 	}
2744 
2745 	*pintr = NULL;
2746 
2747 	thread = spdk_get_thread();
2748 	if (!thread) {
2749 		assert(false);
2750 		return;
2751 	}
2752 
2753 	if (intr->thread != thread) {
2754 		wrong_thread(__func__, intr->name, intr->thread, thread);
2755 		return;
2756 	}
2757 
2758 	spdk_fd_group_remove(thread->fgrp, intr->efd);
2759 	free(intr);
2760 }
2761 
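/*
 * Example (sketch; Linux-only, interrupt mode enabled): wiring an eventfd
 * into the current thread's fd group, mirroring thread_interrupt_create()
 * above.  my_efd and my_event are hypothetical.
 *
 *	static int
 *	my_event(void *arg)
 *	{
 *		uint64_t val;
 *
 *		read(my_efd, &val, sizeof(val));	(acknowledge the event)
 *		return 0;
 *	}
 *
 *	my_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 *	intr = spdk_interrupt_register(my_efd, my_event, NULL, "my_event");
 *	(later, from the same thread:)
 *	spdk_interrupt_unregister(&intr);
 *	close(my_efd);
 */
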
2762 int
2763 spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
2764 			       enum spdk_interrupt_event_types event_types)
2765 {
2766 	struct spdk_thread *thread;
2767 
2768 	thread = spdk_get_thread();
2769 	if (!thread) {
2770 		assert(false);
2771 		return -EINVAL;
2772 	}
2773 
2774 	if (intr->thread != thread) {
2775 		wrong_thread(__func__, intr->name, intr->thread, thread);
2776 		return -EINVAL;
2777 	}
2778 
2779 	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
2780 }
2781 
2782 int
2783 spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
2784 {
2785 	return spdk_fd_group_get_fd(thread->fgrp);
2786 }
2787 
2788 static bool g_interrupt_mode = false;
2789 
2790 int
2791 spdk_interrupt_mode_enable(void)
2792 {
2793 	/* This must be called once, prior to initializing the threading library.
2794 	 * g_spdk_msg_mempool will be valid if the thread library has been initialized.
2795 	 */
2796 	if (g_spdk_msg_mempool) {
2797 		SPDK_ERRLOG("Failed due to threading library is already initialized.\n");
2798 		return -1;
2799 	}
2800 
2801 #ifdef __linux__
2802 	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
2803 	g_interrupt_mode = true;
2804 	return 0;
2805 #else
2806 	SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n");
2807 	g_interrupt_mode = false;
2808 	return -ENOTSUP;
2809 #endif
2810 }
2811 
2812 bool
2813 spdk_interrupt_mode_is_enabled(void)
2814 {
2815 	return g_interrupt_mode;
2816 }
2817 
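/*
 * Example (sketch): interrupt mode must be selected before the threading
 * library is initialized, per the g_spdk_msg_mempool check above.  Assumes
 * spdk_thread_lib_init() from include/spdk/thread.h.
 *
 *	int rc;
 *
 *	if (spdk_interrupt_mode_enable() != 0) {
 *		return -1;
 *	}
 *	rc = spdk_thread_lib_init(NULL, 0);
 */
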
2818 void
2819 spdk_spin_init(struct spdk_spinlock *sspin)
2820 {
2821 	int rc;
2822 
2823 	memset(sspin, 0, sizeof(*sspin));
2824 	rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE);
2825 	SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD);
2826 }
2827 
2828 void
2829 spdk_spin_destroy(struct spdk_spinlock *sspin)
2830 {
2831 	int rc;
2832 
2833 	SPIN_ASSERT_RETURN_VOID(sspin->thread == NULL, SPIN_ERR_LOCK_HELD);
2834 
2835 	rc = pthread_spin_destroy(&sspin->spinlock);
2836 	SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD);
2837 }
2838 
2839 void
2840 spdk_spin_lock(struct spdk_spinlock *sspin)
2841 {
2842 	struct spdk_thread *thread = spdk_get_thread();
2843 	int rc;
2844 
2845 	SPIN_ASSERT_RETURN_VOID(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD);
2846 	SPIN_ASSERT_RETURN_VOID(thread != sspin->thread, SPIN_ERR_DEADLOCK);
2847 
2848 	rc = pthread_spin_lock(&sspin->spinlock);
2849 	SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD);
2850 
2851 	sspin->thread = thread;
2852 	sspin->thread->lock_count++;
2853 }
2854 
2855 void
2856 spdk_spin_unlock(struct spdk_spinlock *sspin)
2857 {
2858 	struct spdk_thread *thread = spdk_get_thread();
2859 	int rc;
2860 
2861 	SPIN_ASSERT_RETURN_VOID(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD);
2862 	SPIN_ASSERT_RETURN_VOID(thread == sspin->thread, SPIN_ERR_WRONG_THREAD);
2863 
2864 	SPIN_ASSERT_RETURN_VOID(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT);
2865 	thread->lock_count--;
2866 	sspin->thread = NULL;
2867 
2868 	rc = pthread_spin_unlock(&sspin->spinlock);
2869 	SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD);
2870 }
2871 
2872 bool
2873 spdk_spin_held(struct spdk_spinlock *sspin)
2874 {
2875 	struct spdk_thread *thread = spdk_get_thread();
2876 
2877 	SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false);
2878 
2879 	return sspin->thread == thread;
2880 }
2881 
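/*
 * Example (sketch): protecting state shared between SPDK threads.  These
 * locks may only be taken on an SPDK thread and are non-recursive, as the
 * SPIN_ASSERTs above enforce.  g_my_lock and g_shared_count are hypothetical.
 *
 *	static struct spdk_spinlock g_my_lock;
 *	static int g_shared_count;
 *
 *	spdk_spin_init(&g_my_lock);
 *
 *	spdk_spin_lock(&g_my_lock);
 *	assert(spdk_spin_held(&g_my_lock));
 *	g_shared_count++;
 *	spdk_spin_unlock(&g_my_lock);
 *
 *	spdk_spin_destroy(&g_my_lock);
 */
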
2882 SPDK_LOG_REGISTER_COMPONENT(thread)
2883