xref: /spdk/lib/thread/thread.c (revision fecffda6ecf8853b82edccde429b68252f0a62c5)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "spdk/env.h"
10 #include "spdk/bdev.h"
11 #include "spdk/likely.h"
12 #include "spdk/queue.h"
13 #include "spdk/string.h"
14 #include "spdk/thread.h"
15 #include "spdk/trace.h"
16 #include "spdk/util.h"
17 #include "spdk/fd_group.h"
18 
19 #include "spdk/log.h"
20 #include "spdk_internal/thread.h"
21 #include "spdk_internal/usdt.h"
22 #include "thread_internal.h"
23 
24 #include "spdk_internal/trace_defs.h"
25 
26 #ifdef __linux__
27 #include <sys/timerfd.h>
28 #include <sys/eventfd.h>
29 #endif
30 
31 #define SPDK_MSG_BATCH_SIZE		8
32 #define SPDK_MAX_DEVICE_NAME_LEN	256
33 #define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
34 #define SPDK_MAX_POLLER_NAME_LEN	256
35 #define SPDK_MAX_THREAD_NAME_LEN	256
36 #define IOBUF_MIN_SMALL_POOL_SIZE	8191
37 #define IOBUF_MIN_LARGE_POOL_SIZE	1023
38 #define IOBUF_ALIGNMENT			512
39 #define IOBUF_MIN_SMALL_BUFSIZE		(SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + \
40 					 IOBUF_ALIGNMENT)
41 #define IOBUF_MIN_LARGE_BUFSIZE		(SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + \
42 					 IOBUF_ALIGNMENT)
43 
44 static struct spdk_thread *g_app_thread;
45 
46 enum spdk_poller_state {
47 	/* The poller is registered with a thread but not currently executing its fn. */
48 	SPDK_POLLER_STATE_WAITING,
49 
50 	/* The poller is currently running its fn. */
51 	SPDK_POLLER_STATE_RUNNING,
52 
53 	/* The poller was unregistered during the execution of its fn. */
54 	SPDK_POLLER_STATE_UNREGISTERED,
55 
56 	/* The poller is in the process of being paused.  It will be paused
57 	 * the next time it is supposed to be executed.
58 	 */
59 	SPDK_POLLER_STATE_PAUSING,
60 
61 	/* The poller is registered but currently paused.  It's on the
62 	 * paused_pollers list.
63 	 */
64 	SPDK_POLLER_STATE_PAUSED,
65 };
66 
67 struct spdk_poller {
68 	TAILQ_ENTRY(spdk_poller)	tailq;
69 	RB_ENTRY(spdk_poller)		node;
70 
71 	/* Current state of the poller; should only be accessed from the poller's thread. */
72 	enum spdk_poller_state		state;
73 
74 	uint64_t			period_ticks;
75 	uint64_t			next_run_tick;
76 	uint64_t			run_count;
77 	uint64_t			busy_count;
78 	uint64_t			id;
79 	spdk_poller_fn			fn;
80 	void				*arg;
81 	struct spdk_thread		*thread;
82 	/* Native interruptfd for period or busy poller */
83 	int				interruptfd;
84 	spdk_poller_set_interrupt_mode_cb set_intr_cb_fn;
85 	void				*set_intr_cb_arg;
86 
87 	char				name[SPDK_MAX_POLLER_NAME_LEN + 1];
88 };
89 
90 enum spdk_thread_state {
91 	/* The thread is processing pollers and messages via spdk_thread_poll(). */
92 	SPDK_THREAD_STATE_RUNNING,
93 
94 	/* The thread is in the process of termination. It reaps unregistering
95 	 * pollers and releases I/O channels.
96 	 */
97 	SPDK_THREAD_STATE_EXITING,
98 
99 	/* The thread has exited. It is ready for spdk_thread_destroy() to be called. */
100 	SPDK_THREAD_STATE_EXITED,
101 };
102 
103 struct spdk_thread {
104 	uint64_t			tsc_last;
105 	struct spdk_thread_stats	stats;
106 	/*
107 	 * Contains pollers actively running on this thread.  Pollers
108 	 *  are run round-robin. The thread takes one poller from the head
109 	 *  of the ring, executes it, then puts it back at the tail of
110 	 *  the ring.
111 	 */
112 	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;
113 	/**
114 	 * Contains pollers running on this thread with a periodic timer.
115 	 */
116 	RB_HEAD(timed_pollers_tree, spdk_poller)	timed_pollers;
117 	struct spdk_poller				*first_timed_poller;
118 	/*
119 	 * Contains paused pollers.  Pollers on this queue are waiting until
120 	 * they are resumed (in which case they're put onto the active/timer
121 	 * queues) or unregistered.
122 	 */
123 	TAILQ_HEAD(paused_pollers_head, spdk_poller)	paused_pollers;
124 	struct spdk_ring		*messages;
125 	int				msg_fd;
126 	SLIST_HEAD(, spdk_msg)		msg_cache;
127 	size_t				msg_cache_count;
128 	spdk_msg_fn			critical_msg;
129 	uint64_t			id;
130 	uint64_t			next_poller_id;
131 	enum spdk_thread_state		state;
132 	int				pending_unregister_count;
133 
134 	RB_HEAD(io_channel_tree, spdk_io_channel)	io_channels;
135 	TAILQ_ENTRY(spdk_thread)			tailq;
136 
137 	char				name[SPDK_MAX_THREAD_NAME_LEN + 1];
138 	struct spdk_cpuset		cpumask;
139 	uint64_t			exit_timeout_tsc;
140 
141 	int32_t				lock_count;
142 
143 	/* Indicates whether this spdk_thread currently runs in interrupt mode. */
144 	bool				in_interrupt;
145 	bool				poller_unregistered;
146 	struct spdk_fd_group		*fgrp;
147 
148 	/* User context allocated at the end */
149 	uint8_t				ctx[0];
150 };
151 
152 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;
153 
154 static spdk_new_thread_fn g_new_thread_fn = NULL;
155 static spdk_thread_op_fn g_thread_op_fn = NULL;
156 static spdk_thread_op_supported_fn g_thread_op_supported_fn;
157 static size_t g_ctx_sz = 0;
158 /* A monotonically increasing ID is assigned to each created thread, beginning at 1. Once
159  * the ID wraps around after UINT64_MAX, further thread creation is not allowed and the
160  * SPDK application must be restarted.
161  */
162 static uint64_t g_thread_id = 1;
163 
164 enum spin_error {
165 	SPIN_ERR_NONE,
166 	/* Trying to use an SPDK lock while not on an SPDK thread */
167 	SPIN_ERR_NOT_SPDK_THREAD,
168 	/* Trying to lock a lock already held by this SPDK thread */
169 	SPIN_ERR_DEADLOCK,
170 	/* Trying to unlock a lock not held by this SPDK thread */
171 	SPIN_ERR_WRONG_THREAD,
172 	/* pthread_spin_*() returned an error */
173 	SPIN_ERR_PTHREAD,
174 	/* Trying to destroy a lock that is held */
175 	SPIN_ERR_LOCK_HELD,
176 	/* lock_count is invalid */
177 	SPIN_ERR_LOCK_COUNT,
178 	/*
179 	 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to
180 	 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to
181 	 * deadlock when another SPDK thread on the same pthread tries to take that lock.
182 	 */
183 	SPIN_ERR_HOLD_DURING_SWITCH,
184 	/* Must be last, not an actual error code */
185 	SPIN_ERR_LAST
186 };
187 
188 static const char *spin_error_strings[] = {
189 	[SPIN_ERR_NONE]			= "No error",
190 	[SPIN_ERR_NOT_SPDK_THREAD]	= "Not an SPDK thread",
191 	[SPIN_ERR_DEADLOCK]		= "Deadlock detected",
192 	[SPIN_ERR_WRONG_THREAD]		= "Unlock on wrong SPDK thread",
193 	[SPIN_ERR_PTHREAD]		= "Error from pthread_spinlock",
194 	[SPIN_ERR_LOCK_HELD]		= "Destroying a held spinlock",
195 	[SPIN_ERR_LOCK_COUNT]		= "Lock count is invalid",
196 	[SPIN_ERR_HOLD_DURING_SWITCH]	= "Lock(s) held while SPDK thread going off CPU",
197 };
198 
199 #define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \
200 				? "Unknown error" : spin_error_strings[err]
201 
202 static void
203 __posix_abort(enum spin_error err)
204 {
205 	abort();
206 }
207 
208 typedef void (*spin_abort)(enum spin_error err);
209 spin_abort g_spin_abort_fn = __posix_abort;
210 
211 #define SPIN_ASSERT_IMPL(cond, err, ret) \
212 	do { \
213 		if (spdk_unlikely(!(cond))) { \
214 			SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \
215 				    SPIN_ERROR_STRING(err), #cond); \
216 			g_spin_abort_fn(err); \
217 			ret; \
218 		} \
219 	} while (0)
220 #define SPIN_ASSERT_RETURN_VOID(cond, err)	SPIN_ASSERT_IMPL(cond, err, return)
221 #define SPIN_ASSERT_RETURN(cond, err, ret)	SPIN_ASSERT_IMPL(cond, err, return ret)
222 #define SPIN_ASSERT(cond, err)			SPIN_ASSERT_IMPL(cond, err,)
223 
224 struct io_device {
225 	void				*io_device;
226 	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
227 	spdk_io_channel_create_cb	create_cb;
228 	spdk_io_channel_destroy_cb	destroy_cb;
229 	spdk_io_device_unregister_cb	unregister_cb;
230 	struct spdk_thread		*unregister_thread;
231 	uint32_t			ctx_size;
232 	uint32_t			for_each_count;
233 	RB_ENTRY(io_device)		node;
234 
235 	uint32_t			refcnt;
236 
237 	bool				pending_unregister;
238 	bool				unregistered;
239 };
240 
241 struct iobuf_channel {
242 	spdk_iobuf_entry_stailq_t small_queue;
243 	spdk_iobuf_entry_stailq_t large_queue;
244 };
245 
246 struct iobuf_module {
247 	char				*name;
248 	TAILQ_ENTRY(iobuf_module)	tailq;
249 };
250 
251 struct iobuf {
252 	struct spdk_mempool		*small_pool;
253 	struct spdk_mempool		*large_pool;
254 	struct spdk_iobuf_opts		opts;
255 	TAILQ_HEAD(, iobuf_module)	modules;
256 	spdk_iobuf_finish_cb		finish_cb;
257 	void				*finish_arg;
258 };
259 
260 static struct iobuf g_iobuf = {
261 	.modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
262 	.opts = {
263 		.small_pool_count = IOBUF_MIN_SMALL_POOL_SIZE,
264 		.large_pool_count = IOBUF_MIN_LARGE_POOL_SIZE,
265 		.small_bufsize = IOBUF_MIN_SMALL_BUFSIZE,
266 		.large_bufsize = IOBUF_MIN_LARGE_BUFSIZE,
267 	},
268 };
269 
270 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices);
271 
272 static int
273 io_device_cmp(struct io_device *dev1, struct io_device *dev2)
274 {
275 	return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device);
276 }
277 
278 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp);
279 
280 static int
281 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2)
282 {
283 	return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev);
284 }
285 
286 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp);
287 
288 struct spdk_msg {
289 	spdk_msg_fn		fn;
290 	void			*arg;
291 
292 	SLIST_ENTRY(spdk_msg)	link;
293 };
294 
295 #define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
296 static struct spdk_mempool *g_spdk_msg_mempool = NULL;
297 
298 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
299 static uint32_t g_thread_count = 0;
300 
301 static __thread struct spdk_thread *tls_thread = NULL;
302 
303 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD)
304 {
305 	spdk_trace_register_description("THREAD_IOCH_GET",
306 					TRACE_THREAD_IOCH_GET,
307 					OWNER_NONE, OBJECT_NONE, 0,
308 					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
309 	spdk_trace_register_description("THREAD_IOCH_PUT",
310 					TRACE_THREAD_IOCH_PUT,
311 					OWNER_NONE, OBJECT_NONE, 0,
312 					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
313 }
314 
315 /*
316  * If this compare function returns zero when two next_run_ticks are equal,
317  * the macro RB_INSERT() returns a pointer to the element with the same
318  * next_run_tick.
319  *
320  * Fortunately, the macro RB_REMOVE() takes a pointer to the element to remove
321  * as its parameter, not a key.
322  *
323  * Hence we allow RB_INSERT() to insert elements with the same keys on the right
324  * side by returning 1 when two next_run_ticks are equal.
325  */
326 static inline int
327 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2)
328 {
329 	if (poller1->next_run_tick < poller2->next_run_tick) {
330 		return -1;
331 	} else {
332 		return 1;
333 	}
334 }
335 
336 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare);
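/*
 * Illustrative sketch (not part of this file's logic; the poller names are hypothetical):
 * if pollers A and B are both scheduled with next_run_tick == 1000, the second
 * RB_INSERT() sees timed_poller_compare() return 1 and places B to the right of A
 * instead of rejecting it as a duplicate key.  A later
 * RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, A) removes exactly A,
 * because RB_REMOVE() is given the element pointer rather than the key, so B is
 * left in the tree.
 */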
337 
338 static inline struct spdk_thread *
339 _get_thread(void)
340 {
341 	return tls_thread;
342 }
343 
344 static int
345 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz)
346 {
347 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
348 
349 	g_ctx_sz = ctx_sz;
350 
351 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
352 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz,
353 			     sizeof(struct spdk_msg),
354 			     0, /* No cache. We do our own. */
355 			     SPDK_ENV_SOCKET_ID_ANY);
356 
357 	SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n",
358 		      msg_mempool_sz);
359 
360 	if (!g_spdk_msg_mempool) {
361 		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
362 		return -ENOMEM;
363 	}
364 
365 	return 0;
366 }
367 
368 static void thread_interrupt_destroy(struct spdk_thread *thread);
369 static int thread_interrupt_create(struct spdk_thread *thread);
370 
371 static void
372 _free_thread(struct spdk_thread *thread)
373 {
374 	struct spdk_io_channel *ch;
375 	struct spdk_msg *msg;
376 	struct spdk_poller *poller, *ptmp;
377 
378 	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
379 		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
380 			    thread->name, ch->dev->name);
381 	}
382 
383 	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
384 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
385 			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
386 				     poller->name);
387 		}
388 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
389 		free(poller);
390 	}
391 
392 	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
393 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
394 			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
395 				     poller->name);
396 		}
397 		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
398 		free(poller);
399 	}
400 
401 	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
402 		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
403 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
404 		free(poller);
405 	}
406 
407 	pthread_mutex_lock(&g_devlist_mutex);
408 	assert(g_thread_count > 0);
409 	g_thread_count--;
410 	TAILQ_REMOVE(&g_threads, thread, tailq);
411 	pthread_mutex_unlock(&g_devlist_mutex);
412 
413 	msg = SLIST_FIRST(&thread->msg_cache);
414 	while (msg != NULL) {
415 		SLIST_REMOVE_HEAD(&thread->msg_cache, link);
416 
417 		assert(thread->msg_cache_count > 0);
418 		thread->msg_cache_count--;
419 		spdk_mempool_put(g_spdk_msg_mempool, msg);
420 
421 		msg = SLIST_FIRST(&thread->msg_cache);
422 	}
423 
424 	assert(thread->msg_cache_count == 0);
425 
426 	if (spdk_interrupt_mode_is_enabled()) {
427 		thread_interrupt_destroy(thread);
428 	}
429 
430 	spdk_ring_free(thread->messages);
431 	free(thread);
432 }
433 
434 int
435 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
436 {
437 	assert(g_new_thread_fn == NULL);
438 	assert(g_thread_op_fn == NULL);
439 
440 	if (new_thread_fn == NULL) {
441 		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
442 	} else {
443 		g_new_thread_fn = new_thread_fn;
444 	}
445 
446 	return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE);
447 }
448 
449 int
450 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
451 			 spdk_thread_op_supported_fn thread_op_supported_fn,
452 			 size_t ctx_sz, size_t msg_mempool_sz)
453 {
454 	assert(g_new_thread_fn == NULL);
455 	assert(g_thread_op_fn == NULL);
456 	assert(g_thread_op_supported_fn == NULL);
457 
458 	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
459 		SPDK_ERRLOG("Both must be defined or undefined together.\n");
460 		return -EINVAL;
461 	}
462 
463 	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
464 		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
465 	} else {
466 		g_thread_op_fn = thread_op_fn;
467 		g_thread_op_supported_fn = thread_op_supported_fn;
468 	}
469 
470 	return _thread_lib_init(ctx_sz, msg_mempool_sz);
471 }
472 
473 void
474 spdk_thread_lib_fini(void)
475 {
476 	struct io_device *dev;
477 
478 	RB_FOREACH(dev, io_device_tree, &g_io_devices) {
479 		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
480 	}
481 
482 	g_new_thread_fn = NULL;
483 	g_thread_op_fn = NULL;
484 	g_thread_op_supported_fn = NULL;
485 	g_ctx_sz = 0;
486 	if (g_app_thread != NULL) {
487 		_free_thread(g_app_thread);
488 		g_app_thread = NULL;
489 	}
490 
491 	if (g_spdk_msg_mempool) {
492 		spdk_mempool_free(g_spdk_msg_mempool);
493 		g_spdk_msg_mempool = NULL;
494 	}
495 }
496 
497 struct spdk_thread *
498 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask)
499 {
500 	struct spdk_thread *thread, *null_thread;
501 	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
502 	int rc = 0, i;
503 
504 	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
505 	if (!thread) {
506 		SPDK_ERRLOG("Unable to allocate memory for thread\n");
507 		return NULL;
508 	}
509 
510 	if (cpumask) {
511 		spdk_cpuset_copy(&thread->cpumask, cpumask);
512 	} else {
513 		spdk_cpuset_negate(&thread->cpumask);
514 	}
515 
516 	RB_INIT(&thread->io_channels);
517 	TAILQ_INIT(&thread->active_pollers);
518 	RB_INIT(&thread->timed_pollers);
519 	TAILQ_INIT(&thread->paused_pollers);
520 	SLIST_INIT(&thread->msg_cache);
521 	thread->msg_cache_count = 0;
522 
523 	thread->tsc_last = spdk_get_ticks();
524 
525 	/* A monotonically increasing ID is assigned to each created poller, beginning at 1.
526 	 * Once the ID wraps around after UINT64_MAX, a warning message is logged.
527 	 */
528 	thread->next_poller_id = 1;
529 
530 	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
531 	if (!thread->messages) {
532 		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
533 		free(thread);
534 		return NULL;
535 	}
536 
537 	/* Fill the local message pool cache. */
538 	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
539 	if (rc == 0) {
540 		/* If we can't populate the cache it's ok. The cache will get filled
541 		 * up organically as messages are passed to the thread. */
542 		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
543 			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
544 			thread->msg_cache_count++;
545 		}
546 	}
547 
548 	if (name) {
549 		snprintf(thread->name, sizeof(thread->name), "%s", name);
550 	} else {
551 		snprintf(thread->name, sizeof(thread->name), "%p", thread);
552 	}
553 
554 	pthread_mutex_lock(&g_devlist_mutex);
555 	if (g_thread_id == 0) {
556 		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
557 		pthread_mutex_unlock(&g_devlist_mutex);
558 		_free_thread(thread);
559 		return NULL;
560 	}
561 	thread->id = g_thread_id++;
562 	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
563 	g_thread_count++;
564 	pthread_mutex_unlock(&g_devlist_mutex);
565 
566 	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
567 		      thread->id, thread->name);
568 
569 	if (spdk_interrupt_mode_is_enabled()) {
570 		thread->in_interrupt = true;
571 		rc = thread_interrupt_create(thread);
572 		if (rc != 0) {
573 			_free_thread(thread);
574 			return NULL;
575 		}
576 	}
577 
578 	if (g_new_thread_fn) {
579 		rc = g_new_thread_fn(thread);
580 	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
581 		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
582 	}
583 
584 	if (rc != 0) {
585 		_free_thread(thread);
586 		return NULL;
587 	}
588 
589 	thread->state = SPDK_THREAD_STATE_RUNNING;
590 
591 	/* If this is the first thread, save it as the app thread.  Use an atomic
592 	 * compare + exchange to guard against crazy users who might try to
593 	 * call spdk_thread_create() simultaneously on multiple threads.
594 	 */
595 	null_thread = NULL;
596 	__atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false,
597 				    __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
598 
599 	return thread;
600 }
601 
602 struct spdk_thread *
603 spdk_thread_get_app_thread(void)
604 {
605 	return g_app_thread;
606 }
607 
608 void
609 spdk_set_thread(struct spdk_thread *thread)
610 {
611 	tls_thread = thread;
612 }
613 
614 static void
615 thread_exit(struct spdk_thread *thread, uint64_t now)
616 {
617 	struct spdk_poller *poller;
618 	struct spdk_io_channel *ch;
619 
620 	if (now >= thread->exit_timeout_tsc) {
621 		SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n",
622 			    thread->name);
623 		goto exited;
624 	}
625 
626 	if (spdk_ring_count(thread->messages) > 0) {
627 		SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name);
628 		return;
629 	}
630 
631 	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
632 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
633 			SPDK_INFOLOG(thread,
634 				     "thread %s still has active poller %s\n",
635 				     thread->name, poller->name);
636 			return;
637 		}
638 	}
639 
640 	RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
641 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
642 			SPDK_INFOLOG(thread,
643 				     "thread %s still has active timed poller %s\n",
644 				     thread->name, poller->name);
645 			return;
646 		}
647 	}
648 
649 	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
650 		SPDK_INFOLOG(thread,
651 			     "thread %s still has paused poller %s\n",
652 			     thread->name, poller->name);
653 		return;
654 	}
655 
656 	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
657 		SPDK_INFOLOG(thread,
658 			     "thread %s still has channel for io_device %s\n",
659 			     thread->name, ch->dev->name);
660 		return;
661 	}
662 
663 	if (thread->pending_unregister_count > 0) {
664 		SPDK_INFOLOG(thread,
665 			     "thread %s is still unregistering io_devices\n",
666 			     thread->name);
667 		return;
668 	}
669 
670 exited:
671 	thread->state = SPDK_THREAD_STATE_EXITED;
672 	if (spdk_unlikely(thread->in_interrupt)) {
673 		g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
674 	}
675 }
676 
677 int
678 spdk_thread_exit(struct spdk_thread *thread)
679 {
680 	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);
681 
682 	assert(tls_thread == thread);
683 
684 	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
685 		SPDK_INFOLOG(thread,
686 			     "thread %s is already exiting\n",
687 			     thread->name);
688 		return 0;
689 	}
690 
691 	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
692 				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
693 	thread->state = SPDK_THREAD_STATE_EXITING;
694 	return 0;
695 }
696 
697 bool
698 spdk_thread_is_running(struct spdk_thread *thread)
699 {
700 	return thread->state == SPDK_THREAD_STATE_RUNNING;
701 }
702 
703 bool
704 spdk_thread_is_exited(struct spdk_thread *thread)
705 {
706 	return thread->state == SPDK_THREAD_STATE_EXITED;
707 }
708 
709 void
710 spdk_thread_destroy(struct spdk_thread *thread)
711 {
712 	assert(thread != NULL);
713 	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);
714 
715 	assert(thread->state == SPDK_THREAD_STATE_EXITED);
716 
717 	if (tls_thread == thread) {
718 		tls_thread = NULL;
719 	}
720 
721 	/* To be safe, do not free the app thread until spdk_thread_lib_fini(). */
722 	if (thread != g_app_thread) {
723 		_free_thread(thread);
724 	}
725 }
726 
727 void *
728 spdk_thread_get_ctx(struct spdk_thread *thread)
729 {
730 	if (g_ctx_sz > 0) {
731 		return thread->ctx;
732 	}
733 
734 	return NULL;
735 }
736 
737 struct spdk_cpuset *
738 spdk_thread_get_cpumask(struct spdk_thread *thread)
739 {
740 	return &thread->cpumask;
741 }
742 
743 int
744 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
745 {
746 	struct spdk_thread *thread;
747 
748 	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
749 		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
750 		assert(false);
751 		return -ENOTSUP;
752 	}
753 
754 	thread = spdk_get_thread();
755 	if (!thread) {
756 		SPDK_ERRLOG("Called from non-SPDK thread\n");
757 		assert(false);
758 		return -EINVAL;
759 	}
760 
761 	spdk_cpuset_copy(&thread->cpumask, cpumask);
762 
763 	/* Invoke framework's reschedule operation. If this function is called multiple times
764 	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
765 	 * reschedule operation.
766 	 */
767 	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
768 
769 	return 0;
770 }
771 
772 struct spdk_thread *
773 spdk_thread_get_from_ctx(void *ctx)
774 {
775 	if (ctx == NULL) {
776 		assert(false);
777 		return NULL;
778 	}
779 
780 	assert(g_ctx_sz > 0);
781 
782 	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
783 }
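/*
 * Illustrative usage sketch (a minimal sketch, not part of this file's logic):
 * "struct my_thread_ctx" is a hypothetical per-thread context type registered via
 * spdk_thread_lib_init(NULL, sizeof(struct my_thread_ctx)):
 *
 *     struct my_thread_ctx *my_ctx = spdk_thread_get_ctx(thread);
 *     ...
 *     struct spdk_thread *same_thread = spdk_thread_get_from_ctx(my_ctx);
 *
 * Because ctx[] is allocated at the tail of struct spdk_thread, SPDK_CONTAINEROF()
 * recovers the owning thread from the context pointer.
 */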
784 
785 static inline uint32_t
786 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
787 {
788 	unsigned count, i;
789 	void *messages[SPDK_MSG_BATCH_SIZE];
790 	uint64_t notify = 1;
791 	int rc;
792 
793 #ifdef DEBUG
794 	/*
795 	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
796 	 * so we will never actually read uninitialized data from events, but just to be sure
797 	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
798 	 */
799 	memset(messages, 0, sizeof(messages));
800 #endif
801 
802 	if (max_msgs > 0) {
803 		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
804 	} else {
805 		max_msgs = SPDK_MSG_BATCH_SIZE;
806 	}
807 
808 	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
809 	if (spdk_unlikely(thread->in_interrupt) &&
810 	    spdk_ring_count(thread->messages) != 0) {
811 		rc = write(thread->msg_fd, &notify, sizeof(notify));
812 		if (rc < 0) {
813 			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
814 		}
815 	}
816 	if (count == 0) {
817 		return 0;
818 	}
819 
820 	for (i = 0; i < count; i++) {
821 		struct spdk_msg *msg = messages[i];
822 
823 		assert(msg != NULL);
824 
825 		SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg);
826 
827 		msg->fn(msg->arg);
828 
829 		SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
830 
831 		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
832 			/* Insert the message at the head. We want to re-use the hot
833 			 * ones. */
834 			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
835 			thread->msg_cache_count++;
836 		} else {
837 			spdk_mempool_put(g_spdk_msg_mempool, msg);
838 		}
839 	}
840 
841 	return count;
842 }
843 
844 static void
845 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
846 {
847 	struct spdk_poller *tmp __attribute__((unused));
848 
849 	poller->next_run_tick = now + poller->period_ticks;
850 
851 	/*
852 	 * Insert poller in the thread's timed_pollers tree by next scheduled run time
853 	 * as its key.
854 	 */
855 	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
856 	assert(tmp == NULL);
857 
858 	/* Update the cache only if it is empty or the inserted poller runs earlier than
859 	 * the cached one. RB_MIN() is not necessary here because all pollers that have
860 	 * exactly the same next_run_tick as an existing poller are inserted on the right side.
861 	 */
862 	if (thread->first_timed_poller == NULL ||
863 	    poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
864 		thread->first_timed_poller = poller;
865 	}
866 }
867 
868 static inline void
869 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
870 {
871 	struct spdk_poller *tmp __attribute__((unused));
872 
873 	tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
874 	assert(tmp != NULL);
875 
876 	/* This function is not used on any performance-critical path.
877 	 * If the cache needs to change, simply recompute it with RB_MIN().
878 	 */
879 	if (thread->first_timed_poller == poller) {
880 		thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
881 	}
882 }
883 
884 static void
885 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
886 {
887 	if (poller->period_ticks) {
888 		poller_insert_timer(thread, poller, spdk_get_ticks());
889 	} else {
890 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
891 	}
892 }
893 
894 static inline void
895 thread_update_stats(struct spdk_thread *thread, uint64_t end,
896 		    uint64_t start, int rc)
897 {
898 	if (rc == 0) {
899 		/* Poller status idle */
900 		thread->stats.idle_tsc += end - start;
901 	} else if (rc > 0) {
902 		/* Poller status busy */
903 		thread->stats.busy_tsc += end - start;
904 	}
905 	/* Store end time to use it as start time of the next spdk_thread_poll(). */
906 	thread->tsc_last = end;
907 }
908 
909 static inline int
910 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
911 {
912 	int rc;
913 
914 	switch (poller->state) {
915 	case SPDK_POLLER_STATE_UNREGISTERED:
916 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
917 		free(poller);
918 		return 0;
919 	case SPDK_POLLER_STATE_PAUSING:
920 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
921 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
922 		poller->state = SPDK_POLLER_STATE_PAUSED;
923 		return 0;
924 	case SPDK_POLLER_STATE_WAITING:
925 		break;
926 	default:
927 		assert(false);
928 		break;
929 	}
930 
931 	poller->state = SPDK_POLLER_STATE_RUNNING;
932 	rc = poller->fn(poller->arg);
933 
934 	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
935 
936 	poller->run_count++;
937 	if (rc > 0) {
938 		poller->busy_count++;
939 	}
940 
941 #ifdef DEBUG
942 	if (rc == -1) {
943 		SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
944 	}
945 #endif
946 
947 	switch (poller->state) {
948 	case SPDK_POLLER_STATE_UNREGISTERED:
949 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
950 		free(poller);
951 		break;
952 	case SPDK_POLLER_STATE_PAUSING:
953 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
954 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
955 		poller->state = SPDK_POLLER_STATE_PAUSED;
956 		break;
957 	case SPDK_POLLER_STATE_PAUSED:
958 	case SPDK_POLLER_STATE_WAITING:
959 		break;
960 	case SPDK_POLLER_STATE_RUNNING:
961 		poller->state = SPDK_POLLER_STATE_WAITING;
962 		break;
963 	default:
964 		assert(false);
965 		break;
966 	}
967 
968 	return rc;
969 }
970 
971 static inline int
972 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
973 			    uint64_t now)
974 {
975 	int rc;
976 
977 	switch (poller->state) {
978 	case SPDK_POLLER_STATE_UNREGISTERED:
979 		free(poller);
980 		return 0;
981 	case SPDK_POLLER_STATE_PAUSING:
982 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
983 		poller->state = SPDK_POLLER_STATE_PAUSED;
984 		return 0;
985 	case SPDK_POLLER_STATE_WAITING:
986 		break;
987 	default:
988 		assert(false);
989 		break;
990 	}
991 
992 	poller->state = SPDK_POLLER_STATE_RUNNING;
993 	rc = poller->fn(poller->arg);
994 
995 	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
996 
997 	poller->run_count++;
998 	if (rc > 0) {
999 		poller->busy_count++;
1000 	}
1001 
1002 #ifdef DEBUG
1003 	if (rc == -1) {
1004 		SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
1005 	}
1006 #endif
1007 
1008 	switch (poller->state) {
1009 	case SPDK_POLLER_STATE_UNREGISTERED:
1010 		free(poller);
1011 		break;
1012 	case SPDK_POLLER_STATE_PAUSING:
1013 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
1014 		poller->state = SPDK_POLLER_STATE_PAUSED;
1015 		break;
1016 	case SPDK_POLLER_STATE_PAUSED:
1017 		break;
1018 	case SPDK_POLLER_STATE_RUNNING:
1019 		poller->state = SPDK_POLLER_STATE_WAITING;
1020 	/* fallthrough */
1021 	case SPDK_POLLER_STATE_WAITING:
1022 		poller_insert_timer(thread, poller, now);
1023 		break;
1024 	default:
1025 		assert(false);
1026 		break;
1027 	}
1028 
1029 	return rc;
1030 }
1031 
1032 static int
1033 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
1034 {
1035 	uint32_t msg_count;
1036 	struct spdk_poller *poller, *tmp;
1037 	spdk_msg_fn critical_msg;
1038 	int rc = 0;
1039 
1040 	thread->tsc_last = now;
1041 
1042 	critical_msg = thread->critical_msg;
1043 	if (spdk_unlikely(critical_msg != NULL)) {
1044 		critical_msg(NULL);
1045 		thread->critical_msg = NULL;
1046 		rc = 1;
1047 	}
1048 
1049 	msg_count = msg_queue_run_batch(thread, max_msgs);
1050 	if (msg_count) {
1051 		rc = 1;
1052 	}
1053 
1054 	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
1055 				   active_pollers_head, tailq, tmp) {
1056 		int poller_rc;
1057 
1058 		poller_rc = thread_execute_poller(thread, poller);
1059 		if (poller_rc > rc) {
1060 			rc = poller_rc;
1061 		}
1062 	}
1063 
1064 	poller = thread->first_timed_poller;
1065 	while (poller != NULL) {
1066 		int timer_rc = 0;
1067 
1068 		if (now < poller->next_run_tick) {
1069 			break;
1070 		}
1071 
1072 		tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
1073 		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
1074 
1075 		/* Update the cache to the next timed poller in the list
1076 		 * only if the current poller is still the closest, otherwise,
1077 		 * do nothing because the cache has already been updated.
1078 		 */
1079 		if (thread->first_timed_poller == poller) {
1080 			thread->first_timed_poller = tmp;
1081 		}
1082 
1083 		timer_rc = thread_execute_timed_poller(thread, poller, now);
1084 		if (timer_rc > rc) {
1085 			rc = timer_rc;
1086 		}
1087 
1088 		poller = tmp;
1089 	}
1090 
1091 	return rc;
1092 }
1093 
1094 int
1095 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
1096 {
1097 	struct spdk_thread *orig_thread;
1098 	int rc;
1099 	uint64_t notify = 1;
1100 
1101 	orig_thread = _get_thread();
1102 	tls_thread = thread;
1103 
1104 	if (now == 0) {
1105 		now = spdk_get_ticks();
1106 	}
1107 
1108 	if (spdk_likely(!thread->in_interrupt)) {
1109 		rc = thread_poll(thread, max_msgs, now);
1110 		if (spdk_unlikely(thread->in_interrupt)) {
1111 			/* The thread transitioned to interrupt mode during the above poll.
1112 			 * Poll it one more time in case a message was received without
1113 			 * notification during the transition.
1114 			 */
1115 			rc = thread_poll(thread, max_msgs, now);
1116 		}
1117 	} else {
1118 		/* Non-block wait on thread's fd_group */
1119 		rc = spdk_fd_group_wait(thread->fgrp, 0);
1120 		SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
1121 		if (spdk_unlikely(!thread->in_interrupt)) {
1122 			/* The thread transitioned to poll mode while handling a message above.
1123 			 * Clear msg_fd since thread messages will be polled directly in poll mode.
1124 			 */
1125 			rc = read(thread->msg_fd, &notify, sizeof(notify));
1126 			if (rc < 0 && errno != EAGAIN) {
1127 				SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
1128 			}
1129 		}
1130 
1131 		/* Reap unregistered pollers outside of poller execution while in interrupt mode */
1132 		if (spdk_unlikely(thread->poller_unregistered)) {
1133 			struct spdk_poller *poller, *tmp;
1134 
1135 			TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
1136 						   active_pollers_head, tailq, tmp) {
1137 				if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
1138 					TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
1139 					free(poller);
1140 				}
1141 			}
1142 
1143 			RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
1144 				if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
1145 					poller_remove_timer(thread, poller);
1146 					free(poller);
1147 				}
1148 			}
1149 
1150 			thread->poller_unregistered = false;
1151 		}
1152 	}
1153 
1154 
1155 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
1156 		thread_exit(thread, now);
1157 	}
1158 
1159 	thread_update_stats(thread, spdk_get_ticks(), now, rc);
1160 
1161 	tls_thread = orig_thread;
1162 
1163 	return rc;
1164 }
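/*
 * Illustrative usage sketch (a minimal sketch, not part of this file's logic): a
 * framework pthread that owns an spdk_thread might drive it roughly as follows,
 * assuming spdk_thread_lib_init() has already been called:
 *
 *     struct spdk_thread *thread = spdk_thread_create("worker", NULL);
 *
 *     while (!spdk_thread_is_exited(thread)) {
 *             spdk_thread_poll(thread, 0, 0);
 *     }
 *     spdk_thread_destroy(thread);
 *
 * Passing 0 for max_msgs uses the default message batch size, and passing 0 for
 * now makes spdk_thread_poll() read the tick counter itself.
 */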
1165 
1166 uint64_t
1167 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
1168 {
1169 	struct spdk_poller *poller;
1170 
1171 	poller = thread->first_timed_poller;
1172 	if (poller) {
1173 		return poller->next_run_tick;
1174 	}
1175 
1176 	return 0;
1177 }
1178 
1179 int
1180 spdk_thread_has_active_pollers(struct spdk_thread *thread)
1181 {
1182 	return !TAILQ_EMPTY(&thread->active_pollers);
1183 }
1184 
1185 static bool
1186 thread_has_unpaused_pollers(struct spdk_thread *thread)
1187 {
1188 	if (TAILQ_EMPTY(&thread->active_pollers) &&
1189 	    RB_EMPTY(&thread->timed_pollers)) {
1190 		return false;
1191 	}
1192 
1193 	return true;
1194 }
1195 
1196 bool
1197 spdk_thread_has_pollers(struct spdk_thread *thread)
1198 {
1199 	if (!thread_has_unpaused_pollers(thread) &&
1200 	    TAILQ_EMPTY(&thread->paused_pollers)) {
1201 		return false;
1202 	}
1203 
1204 	return true;
1205 }
1206 
1207 bool
1208 spdk_thread_is_idle(struct spdk_thread *thread)
1209 {
1210 	if (spdk_ring_count(thread->messages) ||
1211 	    thread_has_unpaused_pollers(thread) ||
1212 	    thread->critical_msg != NULL) {
1213 		return false;
1214 	}
1215 
1216 	return true;
1217 }
1218 
1219 uint32_t
1220 spdk_thread_get_count(void)
1221 {
1222 	/*
1223 	 * Return cached value of the current thread count.  We could acquire the
1224 	 *  lock and iterate through the TAILQ of threads to count them, but that
1225 	 *  count could still be invalidated after we release the lock.
1226 	 */
1227 	return g_thread_count;
1228 }
1229 
1230 struct spdk_thread *
1231 spdk_get_thread(void)
1232 {
1233 	return _get_thread();
1234 }
1235 
1236 const char *
1237 spdk_thread_get_name(const struct spdk_thread *thread)
1238 {
1239 	return thread->name;
1240 }
1241 
1242 uint64_t
1243 spdk_thread_get_id(const struct spdk_thread *thread)
1244 {
1245 	return thread->id;
1246 }
1247 
1248 struct spdk_thread *
1249 spdk_thread_get_by_id(uint64_t id)
1250 {
1251 	struct spdk_thread *thread;
1252 
1253 	if (id == 0 || id >= g_thread_id) {
1254 		SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
1255 		return NULL;
1256 	}
1257 	pthread_mutex_lock(&g_devlist_mutex);
1258 	TAILQ_FOREACH(thread, &g_threads, tailq) {
1259 		if (thread->id == id) {
1260 			break;
1261 		}
1262 	}
1263 	pthread_mutex_unlock(&g_devlist_mutex);
1264 	return thread;
1265 }
1266 
1267 int
1268 spdk_thread_get_stats(struct spdk_thread_stats *stats)
1269 {
1270 	struct spdk_thread *thread;
1271 
1272 	thread = _get_thread();
1273 	if (!thread) {
1274 		SPDK_ERRLOG("No thread allocated\n");
1275 		return -EINVAL;
1276 	}
1277 
1278 	if (stats == NULL) {
1279 		return -EINVAL;
1280 	}
1281 
1282 	*stats = thread->stats;
1283 
1284 	return 0;
1285 }
1286 
1287 uint64_t
1288 spdk_thread_get_last_tsc(struct spdk_thread *thread)
1289 {
1290 	if (thread == NULL) {
1291 		thread = _get_thread();
1292 	}
1293 
1294 	return thread->tsc_last;
1295 }
1296 
1297 static inline int
1298 thread_send_msg_notification(const struct spdk_thread *target_thread)
1299 {
1300 	uint64_t notify = 1;
1301 	int rc;
1302 
1303 	/* No notification is necessary if the interrupt facility is not enabled */
1304 	if (spdk_likely(!spdk_interrupt_mode_is_enabled())) {
1305 		return 0;
1306 	}
1307 
1308 	/* Since each spdk_thread can switch between poll and interrupt mode dynamically,
1309 	 * after sending a thread message it is necessary to check whether the target thread
1310 	 * runs in interrupt mode and then decide whether to do event notification.
1311 	 */
1312 	if (spdk_unlikely(target_thread->in_interrupt)) {
1313 		rc = write(target_thread->msg_fd, &notify, sizeof(notify));
1314 		if (rc < 0) {
1315 			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
1316 			return -EIO;
1317 		}
1318 	}
1319 
1320 	return 0;
1321 }
1322 
1323 int
1324 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
1325 {
1326 	struct spdk_thread *local_thread;
1327 	struct spdk_msg *msg;
1328 	int rc;
1329 
1330 	assert(thread != NULL);
1331 
1332 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
1333 		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
1334 		return -EIO;
1335 	}
1336 
1337 	local_thread = _get_thread();
1338 
1339 	msg = NULL;
1340 	if (local_thread != NULL) {
1341 		if (local_thread->msg_cache_count > 0) {
1342 			msg = SLIST_FIRST(&local_thread->msg_cache);
1343 			assert(msg != NULL);
1344 			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
1345 			local_thread->msg_cache_count--;
1346 		}
1347 	}
1348 
1349 	if (msg == NULL) {
1350 		msg = spdk_mempool_get(g_spdk_msg_mempool);
1351 		if (!msg) {
1352 			SPDK_ERRLOG("msg could not be allocated\n");
1353 			return -ENOMEM;
1354 		}
1355 	}
1356 
1357 	msg->fn = fn;
1358 	msg->arg = ctx;
1359 
1360 	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
1361 	if (rc != 1) {
1362 		SPDK_ERRLOG("msg could not be enqueued\n");
1363 		spdk_mempool_put(g_spdk_msg_mempool, msg);
1364 		return -EIO;
1365 	}
1366 
1367 	return thread_send_msg_notification(thread);
1368 }
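/*
 * Illustrative usage sketch (a minimal sketch, not part of this file's logic):
 * "process_request" and "req" are hypothetical names.  A caller hands work to
 * another spdk_thread by sending it a message:
 *
 *     static void
 *     process_request(void *ctx)
 *     {
 *             struct request *req = ctx;
 *             ...
 *     }
 *
 *     rc = spdk_thread_send_msg(target_thread, process_request, req);
 *
 * The callback matches spdk_msg_fn and is executed later from the target thread's
 * spdk_thread_poll() context, never inline in the sender.
 */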
1369 
1370 int
1371 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
1372 {
1373 	spdk_msg_fn expected = NULL;
1374 
1375 	if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
1376 					 __ATOMIC_SEQ_CST)) {
1377 		return -EIO;
1378 	}
1379 
1380 	return thread_send_msg_notification(thread);
1381 }
1382 
1383 #ifdef __linux__
1384 static int
1385 interrupt_timerfd_process(void *arg)
1386 {
1387 	struct spdk_poller *poller = arg;
1388 	uint64_t exp;
1389 	int rc;
1390 
1391 	/* clear the level of interval timer */
1392 	rc = read(poller->interruptfd, &exp, sizeof(exp));
1393 	if (rc < 0) {
1394 		if (errno == EAGAIN) {
1395 			return 0;
1396 		}
1397 
1398 		return rc;
1399 	}
1400 
1401 	SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg);
1402 
1403 	return poller->fn(poller->arg);
1404 }
1405 
1406 static int
1407 period_poller_interrupt_init(struct spdk_poller *poller)
1408 {
1409 	struct spdk_fd_group *fgrp = poller->thread->fgrp;
1410 	int timerfd;
1411 	int rc;
1412 
1413 	SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
1414 	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
1415 	if (timerfd < 0) {
1416 		return -errno;
1417 	}
1418 
1419 	rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller);
1420 	if (rc < 0) {
1421 		close(timerfd);
1422 		return rc;
1423 	}
1424 
1425 	poller->interruptfd = timerfd;
1426 	return 0;
1427 }
1428 
1429 static void
1430 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1431 {
1432 	int timerfd = poller->interruptfd;
1433 	uint64_t now_tick = spdk_get_ticks();
1434 	uint64_t ticks = spdk_get_ticks_hz();
1435 	int ret;
1436 	struct itimerspec new_tv = {};
1437 	struct itimerspec old_tv = {};
1438 
1439 	assert(poller->period_ticks != 0);
1440 	assert(timerfd >= 0);
1441 
1442 	SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
1443 		      interrupt_mode ? "interrupt" : "poll");
1444 
1445 	if (interrupt_mode) {
1446 		/* Set repeated timer expiration */
1447 		new_tv.it_interval.tv_sec = poller->period_ticks / ticks;
1448 		new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks;
1449 
1450 		/* Update next timer expiration */
1451 		if (poller->next_run_tick == 0) {
1452 			poller->next_run_tick = now_tick + poller->period_ticks;
1453 		} else if (poller->next_run_tick < now_tick) {
1454 			poller->next_run_tick = now_tick;
1455 		}
1456 
1457 		new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks;
1458 		new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks;
1459 
1460 		ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
1461 		if (ret < 0) {
1462 			SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno);
1463 			assert(false);
1464 		}
1465 	} else {
1466 		/* Disarm the timer */
1467 		ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv);
1468 		if (ret < 0) {
1469 			/* timerfd_settime's failure indicates that the timerfd is in error */
1470 			SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno);
1471 			assert(false);
1472 		}
1473 
1474 		/* In order to reuse poller_insert_timer(), adjust now_tick so that next_run_tick becomes
1475 		 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC
1476 		 */
1477 		now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \
1478 			   (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC;
1479 		poller_remove_timer(poller->thread, poller);
1480 		poller_insert_timer(poller->thread, poller, now_tick);
1481 	}
1482 }
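/*
 * Worked example (a sketch with assumed numbers): if spdk_get_ticks_hz() returns
 * 2,300,000,000 (a 2.3 GHz tick rate) and the poller period is 1500 us, then
 * period_ticks is 3,450,000 and the arming math above yields
 *
 *     it_interval.tv_sec  = 3,450,000 / 2,300,000,000 = 0
 *     it_interval.tv_nsec = (3,450,000 % 2,300,000,000) * SPDK_SEC_TO_NSEC / 2,300,000,000
 *                         = 1,500,000 ns
 *
 * so the timerfd fires every 1.5 ms, matching the poller's period.
 */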
1483 
1484 static void
1485 poller_interrupt_fini(struct spdk_poller *poller)
1486 {
1487 	SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name);
1488 	assert(poller->interruptfd >= 0);
1489 	spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd);
1490 	close(poller->interruptfd);
1491 	poller->interruptfd = -1;
1492 }
1493 
1494 static int
1495 busy_poller_interrupt_init(struct spdk_poller *poller)
1496 {
1497 	int busy_efd;
1498 	int rc;
1499 
1500 	SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name);
1501 	busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1502 	if (busy_efd < 0) {
1503 		SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name);
1504 		return -errno;
1505 	}
1506 
1507 	rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd,
1508 			       poller->fn, poller->arg, poller->name);
1509 	if (rc < 0) {
1510 		close(busy_efd);
1511 		return rc;
1512 	}
1513 
1514 	poller->interruptfd = busy_efd;
1515 	return 0;
1516 }
1517 
1518 static void
1519 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1520 {
1521 	int busy_efd = poller->interruptfd;
1522 	uint64_t notify = 1;
1523 	int rc __attribute__((unused));
1524 
1525 	assert(busy_efd >= 0);
1526 
1527 	if (interrupt_mode) {
1528 		/* Writing to the eventfd without reading it keeps it level-triggered, so it fires repeatedly. */
1529 		if (write(busy_efd, &notify, sizeof(notify)) < 0) {
1530 			SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name);
1531 		}
1532 	} else {
1533 		/* Reading from the eventfd clears its level-triggered state. */
1534 		rc = read(busy_efd, &notify, sizeof(notify));
1535 	}
1536 }
1537 
1538 #else
1539 
1540 static int
1541 period_poller_interrupt_init(struct spdk_poller *poller)
1542 {
1543 	return -ENOTSUP;
1544 }
1545 
1546 static void
1547 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1548 {
1549 }
1550 
1551 static void
1552 poller_interrupt_fini(struct spdk_poller *poller)
1553 {
1554 }
1555 
1556 static int
1557 busy_poller_interrupt_init(struct spdk_poller *poller)
1558 {
1559 	return -ENOTSUP;
1560 }
1561 
1562 static void
1563 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1564 {
1565 }
1566 
1567 #endif
1568 
1569 void
1570 spdk_poller_register_interrupt(struct spdk_poller *poller,
1571 			       spdk_poller_set_interrupt_mode_cb cb_fn,
1572 			       void *cb_arg)
1573 {
1574 	assert(poller != NULL);
1575 	assert(cb_fn != NULL);
1576 	assert(spdk_get_thread() == poller->thread);
1577 
1578 	if (!spdk_interrupt_mode_is_enabled()) {
1579 		return;
1580 	}
1581 
1582 	/* when a poller is created we don't know if the user is ever going to
1583 	 * enable interrupts on it by calling this function, so the poller
1584 	 * registration function has to immediately create an interruptfd.
1585 	 * When this function does get called by the user, we then have to destroy
1586 	 * that interruptfd.
1587 	 */
1588 	if (poller->set_intr_cb_fn && poller->interruptfd >= 0) {
1589 		poller_interrupt_fini(poller);
1590 	}
1591 
1592 	poller->set_intr_cb_fn = cb_fn;
1593 	poller->set_intr_cb_arg = cb_arg;
1594 
1595 	/* Set poller into interrupt mode if thread is in interrupt. */
1596 	if (poller->thread->in_interrupt) {
1597 		poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
1598 	}
1599 }
1600 
1601 static uint64_t
1602 convert_us_to_ticks(uint64_t us)
1603 {
1604 	uint64_t quotient, remainder, ticks;
1605 
1606 	if (us) {
1607 		quotient = us / SPDK_SEC_TO_USEC;
1608 		remainder = us % SPDK_SEC_TO_USEC;
1609 		ticks = spdk_get_ticks_hz();
1610 
1611 		return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
1612 	} else {
1613 		return 0;
1614 	}
1615 }
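/*
 * Worked example (a sketch with assumed numbers): with spdk_get_ticks_hz() returning
 * 2,300,000,000 and us = 1500, quotient = 0 and remainder = 1500, so the result is
 * 2,300,000,000 * 0 + (2,300,000,000 * 1500) / 1,000,000 = 3,450,000 ticks.
 * Splitting us into quotient and remainder keeps the intermediate multiplication
 * from overflowing for large microsecond values.
 */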
1616 
1617 static struct spdk_poller *
1618 poller_register(spdk_poller_fn fn,
1619 		void *arg,
1620 		uint64_t period_microseconds,
1621 		const char *name)
1622 {
1623 	struct spdk_thread *thread;
1624 	struct spdk_poller *poller;
1625 
1626 	thread = spdk_get_thread();
1627 	if (!thread) {
1628 		assert(false);
1629 		return NULL;
1630 	}
1631 
1632 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
1633 		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
1634 		return NULL;
1635 	}
1636 
1637 	poller = calloc(1, sizeof(*poller));
1638 	if (poller == NULL) {
1639 		SPDK_ERRLOG("Poller memory allocation failed\n");
1640 		return NULL;
1641 	}
1642 
1643 	if (name) {
1644 		snprintf(poller->name, sizeof(poller->name), "%s", name);
1645 	} else {
1646 		snprintf(poller->name, sizeof(poller->name), "%p", fn);
1647 	}
1648 
1649 	poller->state = SPDK_POLLER_STATE_WAITING;
1650 	poller->fn = fn;
1651 	poller->arg = arg;
1652 	poller->thread = thread;
1653 	poller->interruptfd = -1;
1654 	if (thread->next_poller_id == 0) {
1655 		SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n");
1656 		thread->next_poller_id = 1;
1657 	}
1658 	poller->id = thread->next_poller_id++;
1659 
1660 	poller->period_ticks = convert_us_to_ticks(period_microseconds);
1661 
1662 	if (spdk_interrupt_mode_is_enabled()) {
1663 		int rc;
1664 
1665 		if (period_microseconds) {
1666 			rc = period_poller_interrupt_init(poller);
1667 			if (rc < 0) {
1668 				SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
1669 				free(poller);
1670 				return NULL;
1671 			}
1672 
1673 			spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL);
1674 		} else {
1675 			/* If the poller doesn't have a period, create an interruptfd that is
1676 			 * automatically always busy when running in interrupt mode.
1677 			 */
1678 			rc = busy_poller_interrupt_init(poller);
1679 			if (rc < 0) {
1680 				SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
1681 				free(poller);
1682 				return NULL;
1683 			}
1684 
1685 			spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL);
1686 		}
1687 	}
1688 
1689 	thread_insert_poller(thread, poller);
1690 
1691 	return poller;
1692 }
1693 
1694 struct spdk_poller *
1695 spdk_poller_register(spdk_poller_fn fn,
1696 		     void *arg,
1697 		     uint64_t period_microseconds)
1698 {
1699 	return poller_register(fn, arg, period_microseconds, NULL);
1700 }
1701 
1702 struct spdk_poller *
1703 spdk_poller_register_named(spdk_poller_fn fn,
1704 			   void *arg,
1705 			   uint64_t period_microseconds,
1706 			   const char *name)
1707 {
1708 	return poller_register(fn, arg, period_microseconds, name);
1709 }
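/*
 * Illustrative usage sketch (a minimal sketch, not part of this file's logic):
 * "my_poll_fn" and "my_dev" are hypothetical names.  Registering a periodic poller
 * that runs every 1000 microseconds on the current thread:
 *
 *     static int
 *     my_poll_fn(void *arg)
 *     {
 *             struct my_dev *dev = arg;
 *             ...
 *             return did_work ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
 *     }
 *
 *     struct spdk_poller *poller;
 *
 *     poller = spdk_poller_register_named(my_poll_fn, dev, 1000, "my_poll_fn");
 *
 * Passing 0 for period_microseconds registers an active poller that runs on every
 * iteration of spdk_thread_poll() instead of a timed one.
 */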
1710 
1711 static void
1712 wrong_thread(const char *func, const char *name, struct spdk_thread *thread,
1713 	     struct spdk_thread *curthread)
1714 {
1715 	if (thread == NULL) {
1716 		SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name);
1717 		abort();
1718 	}
1719 	SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be "
1720 		    "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id,
1721 		    thread->name, thread->id);
1722 	assert(false);
1723 }
1724 
1725 void
1726 spdk_poller_unregister(struct spdk_poller **ppoller)
1727 {
1728 	struct spdk_thread *thread;
1729 	struct spdk_poller *poller;
1730 
1731 	poller = *ppoller;
1732 	if (poller == NULL) {
1733 		return;
1734 	}
1735 
1736 	*ppoller = NULL;
1737 
1738 	thread = spdk_get_thread();
1739 	if (!thread) {
1740 		assert(false);
1741 		return;
1742 	}
1743 
1744 	if (poller->thread != thread) {
1745 		wrong_thread(__func__, poller->name, poller->thread, thread);
1746 		return;
1747 	}
1748 
1749 	if (spdk_interrupt_mode_is_enabled()) {
1750 		/* Release the interrupt resource for period or busy poller */
1751 		if (poller->interruptfd >= 0) {
1752 			poller_interrupt_fini(poller);
1753 		}
1754 
1755 		/* Mark that a poller was unregistered, so that unregistered pollers also
1756 		 * get reaped by spdk_thread_poll() in interrupt mode.
1757 		 */
1758 		thread->poller_unregistered = true;
1759 	}
1760 
1761 	/* If the poller was paused, put it on the active_pollers list so that
1762 	 * its unregistration can be processed by spdk_thread_poll().
1763 	 */
1764 	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
1765 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
1766 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
1767 		poller->period_ticks = 0;
1768 	}
1769 
1770 	/* Simply set the state to unregistered. The poller will get cleaned up
1771 	 * in a subsequent call to spdk_thread_poll().
1772 	 */
1773 	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
1774 }
1775 
1776 void
1777 spdk_poller_pause(struct spdk_poller *poller)
1778 {
1779 	struct spdk_thread *thread;
1780 
1781 	thread = spdk_get_thread();
1782 	if (!thread) {
1783 		assert(false);
1784 		return;
1785 	}
1786 
1787 	if (poller->thread != thread) {
1788 		wrong_thread(__func__, poller->name, poller->thread, thread);
1789 		return;
1790 	}
1791 
1792 	/* We just set its state to SPDK_POLLER_STATE_PAUSING and let
1793 	 * spdk_thread_poll() move it. It allows a poller to be paused from
1794 	 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE
1795 	 * iteration, or from within itself without breaking the logic to always
1796 	 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration.
1797 	 */
1798 	switch (poller->state) {
1799 	case SPDK_POLLER_STATE_PAUSED:
1800 	case SPDK_POLLER_STATE_PAUSING:
1801 		break;
1802 	case SPDK_POLLER_STATE_RUNNING:
1803 	case SPDK_POLLER_STATE_WAITING:
1804 		poller->state = SPDK_POLLER_STATE_PAUSING;
1805 		break;
1806 	default:
1807 		assert(false);
1808 		break;
1809 	}
1810 }
1811 
1812 void
1813 spdk_poller_resume(struct spdk_poller *poller)
1814 {
1815 	struct spdk_thread *thread;
1816 
1817 	thread = spdk_get_thread();
1818 	if (!thread) {
1819 		assert(false);
1820 		return;
1821 	}
1822 
1823 	if (poller->thread != thread) {
1824 		wrong_thread(__func__, poller->name, poller->thread, thread);
1825 		return;
1826 	}
1827 
1828 	/* If a poller is paused it has to be removed from the paused pollers
1829 	 * list and put on the active list or timer tree depending on its
1830 	 * period_ticks.  If a poller is still in the process of being paused,
1831 	 * we just need to flip its state back to waiting, as it's already on
1832 	 * the appropriate list or tree.
1833 	 */
1834 	switch (poller->state) {
1835 	case SPDK_POLLER_STATE_PAUSED:
1836 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
1837 		thread_insert_poller(thread, poller);
1838 	/* fallthrough */
1839 	case SPDK_POLLER_STATE_PAUSING:
1840 		poller->state = SPDK_POLLER_STATE_WAITING;
1841 		break;
1842 	case SPDK_POLLER_STATE_RUNNING:
1843 	case SPDK_POLLER_STATE_WAITING:
1844 		break;
1845 	default:
1846 		assert(false);
1847 		break;
1848 	}
1849 }
1850 
1851 const char *
1852 spdk_poller_get_name(struct spdk_poller *poller)
1853 {
1854 	return poller->name;
1855 }
1856 
1857 uint64_t
1858 spdk_poller_get_id(struct spdk_poller *poller)
1859 {
1860 	return poller->id;
1861 }
1862 
1863 const char *
1864 spdk_poller_get_state_str(struct spdk_poller *poller)
1865 {
1866 	switch (poller->state) {
1867 	case SPDK_POLLER_STATE_WAITING:
1868 		return "waiting";
1869 	case SPDK_POLLER_STATE_RUNNING:
1870 		return "running";
1871 	case SPDK_POLLER_STATE_UNREGISTERED:
1872 		return "unregistered";
1873 	case SPDK_POLLER_STATE_PAUSING:
1874 		return "pausing";
1875 	case SPDK_POLLER_STATE_PAUSED:
1876 		return "paused";
1877 	default:
1878 		return NULL;
1879 	}
1880 }
1881 
1882 uint64_t
1883 spdk_poller_get_period_ticks(struct spdk_poller *poller)
1884 {
1885 	return poller->period_ticks;
1886 }
1887 
1888 void
1889 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats)
1890 {
1891 	stats->run_count = poller->run_count;
1892 	stats->busy_count = poller->busy_count;
1893 }
1894 
1895 struct spdk_poller *
1896 spdk_thread_get_first_active_poller(struct spdk_thread *thread)
1897 {
1898 	return TAILQ_FIRST(&thread->active_pollers);
1899 }
1900 
1901 struct spdk_poller *
1902 spdk_thread_get_next_active_poller(struct spdk_poller *prev)
1903 {
1904 	return TAILQ_NEXT(prev, tailq);
1905 }
1906 
1907 struct spdk_poller *
1908 spdk_thread_get_first_timed_poller(struct spdk_thread *thread)
1909 {
1910 	return RB_MIN(timed_pollers_tree, &thread->timed_pollers);
1911 }
1912 
1913 struct spdk_poller *
1914 spdk_thread_get_next_timed_poller(struct spdk_poller *prev)
1915 {
1916 	return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev);
1917 }
1918 
1919 struct spdk_poller *
1920 spdk_thread_get_first_paused_poller(struct spdk_thread *thread)
1921 {
1922 	return TAILQ_FIRST(&thread->paused_pollers);
1923 }
1924 
1925 struct spdk_poller *
1926 spdk_thread_get_next_paused_poller(struct spdk_poller *prev)
1927 {
1928 	return TAILQ_NEXT(prev, tailq);
1929 }
1930 
1931 struct spdk_io_channel *
1932 spdk_thread_get_first_io_channel(struct spdk_thread *thread)
1933 {
1934 	return RB_MIN(io_channel_tree, &thread->io_channels);
1935 }
1936 
1937 struct spdk_io_channel *
1938 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev)
1939 {
1940 	return RB_NEXT(io_channel_tree, &thread->io_channels, prev);
1941 }
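
/*
 * Illustrative sketch (hypothetical helper, not part of this file): walking a
 * thread's active pollers with the accessors above and printing their stats.
 * The accessors take no locks, so this is typically done from the owning
 * thread's own context.
 *
 *	static void
 *	dump_active_pollers(struct spdk_thread *thread)
 *	{
 *		struct spdk_poller *poller;
 *		struct spdk_poller_stats stats;
 *
 *		for (poller = spdk_thread_get_first_active_poller(thread); poller != NULL;
 *		     poller = spdk_thread_get_next_active_poller(poller)) {
 *			spdk_poller_get_stats(poller, &stats);
 *			printf("%s: run_count=%" PRIu64 " busy_count=%" PRIu64 "\n",
 *			       spdk_poller_get_name(poller), stats.run_count, stats.busy_count);
 *		}
 *	}
 */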
1942 
1943 struct call_thread {
1944 	struct spdk_thread *cur_thread;
1945 	spdk_msg_fn fn;
1946 	void *ctx;
1947 
1948 	struct spdk_thread *orig_thread;
1949 	spdk_msg_fn cpl;
1950 };
1951 
1952 static void
1953 _on_thread(void *ctx)
1954 {
1955 	struct call_thread *ct = ctx;
1956 	int rc __attribute__((unused));
1957 
1958 	ct->fn(ct->ctx);
1959 
1960 	pthread_mutex_lock(&g_devlist_mutex);
1961 	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
1962 	while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) {
1963 		SPDK_DEBUGLOG(thread, "thread %s is not running but has not been destroyed yet.\n",
1964 			      ct->cur_thread->name);
1965 		ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
1966 	}
1967 	pthread_mutex_unlock(&g_devlist_mutex);
1968 
1969 	if (!ct->cur_thread) {
1970 		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");
1971 
1972 		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
1973 		free(ctx);
1974 	} else {
1975 		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
1976 			      ct->cur_thread->name);
1977 
1978 		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
1979 	}
1980 	assert(rc == 0);
1981 }
1982 
1983 void
1984 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
1985 {
1986 	struct call_thread *ct;
1987 	struct spdk_thread *thread;
1988 	int rc __attribute__((unused));
1989 
1990 	ct = calloc(1, sizeof(*ct));
1991 	if (!ct) {
1992 		SPDK_ERRLOG("Unable to perform thread iteration\n");
1993 		cpl(ctx);
1994 		return;
1995 	}
1996 
1997 	ct->fn = fn;
1998 	ct->ctx = ctx;
1999 	ct->cpl = cpl;
2000 
2001 	thread = _get_thread();
2002 	if (!thread) {
2003 		SPDK_ERRLOG("No thread allocated\n");
2004 		free(ct);
2005 		cpl(ctx);
2006 		return;
2007 	}
2008 	ct->orig_thread = thread;
2009 
2010 	pthread_mutex_lock(&g_devlist_mutex);
2011 	ct->cur_thread = TAILQ_FIRST(&g_threads);
2012 	pthread_mutex_unlock(&g_devlist_mutex);
2013 
2014 	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
2015 		      ct->orig_thread->name);
2016 
2017 	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
2018 	assert(rc == 0);
2019 }
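
/*
 * Illustrative usage sketch (hypothetical callbacks, not part of this file):
 * run a function once on every running SPDK thread and get a completion
 * callback on the calling thread after the last one has finished.
 *
 *	static void
 *	flush_on_thread(void *ctx)
 *	{
 *		my_flush_local_caches();
 *	}
 *
 *	static void
 *	flush_done(void *ctx)
 *	{
 *		SPDK_NOTICELOG("flush completed on all threads\n");
 *	}
 *
 *	spdk_for_each_thread(flush_on_thread, NULL, flush_done);
 */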
2020 
2021 static inline void
2022 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode)
2023 {
2024 	if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
2025 		return;
2026 	}
2027 
2028 	if (!poller->set_intr_cb_fn) {
2029 		SPDK_ERRLOG("Poller(%s) doesn't support setting interrupt mode.\n", poller->name);
2030 		assert(false);
2031 		return;
2032 	}
2033 
2034 	poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode);
2035 }
2036 
2037 void
2038 spdk_thread_set_interrupt_mode(bool enable_interrupt)
2039 {
2040 	struct spdk_thread *thread = _get_thread();
2041 	struct spdk_poller *poller, *tmp;
2042 
2043 	assert(thread);
2044 	assert(spdk_interrupt_mode_is_enabled());
2045 
2046 	SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n",
2047 		       thread->name,  enable_interrupt ? "intr" : "poll",
2048 		       thread->in_interrupt ? "intr" : "poll");
2049 
2050 	if (thread->in_interrupt == enable_interrupt) {
2051 		return;
2052 	}
2053 
2054 	/* Set pollers to expected mode */
2055 	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
2056 		poller_set_interrupt_mode(poller, enable_interrupt);
2057 	}
2058 	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) {
2059 		poller_set_interrupt_mode(poller, enable_interrupt);
2060 	}
2061 	/* Paused pollers are switched as well, so they run in the expected mode once resumed */
2062 	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) {
2063 		poller_set_interrupt_mode(poller, enable_interrupt);
2064 	}
2065 
2066 	thread->in_interrupt = enable_interrupt;
2067 	return;
2068 }
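
/*
 * Illustrative sketch (hypothetical device, not part of this file): a poller
 * that cooperates with the per-poller mode switch above.  The callback has the
 * spdk_poller_set_interrupt_mode_cb signature and is assumed to be registered
 * with spdk_poller_register_interrupt() from spdk/thread.h; it arms a
 * hypothetical hardware interrupt when the thread enters interrupt mode and
 * disarms it when the thread goes back to polling.
 *
 *	static void
 *	my_poller_set_intr(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
 *	{
 *		struct my_dev *dev = cb_arg;
 *
 *		if (interrupt_mode) {
 *			my_dev_arm_irq(dev);
 *		} else {
 *			my_dev_disarm_irq(dev);
 *		}
 *	}
 *
 *	spdk_poller_register_interrupt(dev->poller, my_poller_set_intr, dev);
 */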
2069 
2070 static struct io_device *
2071 io_device_get(void *io_device)
2072 {
2073 	struct io_device find = {};
2074 
2075 	find.io_device = io_device;
2076 	return RB_FIND(io_device_tree, &g_io_devices, &find);
2077 }
2078 
2079 void
2080 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
2081 			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
2082 			const char *name)
2083 {
2084 	struct io_device *dev, *tmp;
2085 	struct spdk_thread *thread;
2086 
2087 	assert(io_device != NULL);
2088 	assert(create_cb != NULL);
2089 	assert(destroy_cb != NULL);
2090 
2091 	thread = spdk_get_thread();
2092 	if (!thread) {
2093 		SPDK_ERRLOG("called from non-SPDK thread\n");
2094 		assert(false);
2095 		return;
2096 	}
2097 
2098 	dev = calloc(1, sizeof(struct io_device));
2099 	if (dev == NULL) {
2100 		SPDK_ERRLOG("could not allocate io_device\n");
2101 		return;
2102 	}
2103 
2104 	dev->io_device = io_device;
2105 	if (name) {
2106 		snprintf(dev->name, sizeof(dev->name), "%s", name);
2107 	} else {
2108 		snprintf(dev->name, sizeof(dev->name), "%p", dev);
2109 	}
2110 	dev->create_cb = create_cb;
2111 	dev->destroy_cb = destroy_cb;
2112 	dev->unregister_cb = NULL;
2113 	dev->ctx_size = ctx_size;
2114 	dev->for_each_count = 0;
2115 	dev->unregistered = false;
2116 	dev->refcnt = 0;
2117 
2118 	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
2119 		      dev->name, dev->io_device, thread->name);
2120 
2121 	pthread_mutex_lock(&g_devlist_mutex);
2122 	tmp = RB_INSERT(io_device_tree, &g_io_devices, dev);
2123 	if (tmp != NULL) {
2124 		SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
2125 			    io_device, tmp->name, dev->name);
2126 		free(dev);
2127 	}
2128 
2129 	pthread_mutex_unlock(&g_devlist_mutex);
2130 }
2131 
2132 static void
2133 _finish_unregister(void *arg)
2134 {
2135 	struct io_device *dev = arg;
2136 	struct spdk_thread *thread;
2137 
2138 	thread = spdk_get_thread();
2139 	assert(thread == dev->unregister_thread);
2140 
2141 	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
2142 		      dev->name, dev->io_device, thread->name);
2143 
2144 	assert(thread->pending_unregister_count > 0);
2145 	thread->pending_unregister_count--;
2146 
2147 	dev->unregister_cb(dev->io_device);
2148 	free(dev);
2149 }
2150 
2151 static void
2152 io_device_free(struct io_device *dev)
2153 {
2154 	int rc __attribute__((unused));
2155 
2156 	if (dev->unregister_cb == NULL) {
2157 		free(dev);
2158 	} else {
2159 		assert(dev->unregister_thread != NULL);
2160 		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
2161 			      dev->name, dev->io_device, dev->unregister_thread->name);
2162 		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
2163 		assert(rc == 0);
2164 	}
2165 }
2166 
2167 void
2168 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
2169 {
2170 	struct io_device *dev;
2171 	uint32_t refcnt;
2172 	struct spdk_thread *thread;
2173 
2174 	thread = spdk_get_thread();
2175 	if (!thread) {
2176 		SPDK_ERRLOG("called from non-SPDK thread\n");
2177 		assert(false);
2178 		return;
2179 	}
2180 
2181 	pthread_mutex_lock(&g_devlist_mutex);
2182 	dev = io_device_get(io_device);
2183 	if (!dev) {
2184 		SPDK_ERRLOG("io_device %p not found\n", io_device);
2185 		assert(false);
2186 		pthread_mutex_unlock(&g_devlist_mutex);
2187 		return;
2188 	}
2189 
2190 	/* The for_each_count check differentiates the user attempting to unregister the
2191 	 * device a second time, from the internal call to this function that occurs
2192 	 * after the for_each_count reaches 0.
2193 	 */
2194 	if (dev->pending_unregister && dev->for_each_count > 0) {
2195 		SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device);
2196 		assert(false);
2197 		pthread_mutex_unlock(&g_devlist_mutex);
2198 		return;
2199 	}
2200 
2201 	dev->unregister_cb = unregister_cb;
2202 	dev->unregister_thread = thread;
2203 
2204 	if (dev->for_each_count > 0) {
2205 		SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n",
2206 			     dev->name, io_device, dev->for_each_count);
2207 		dev->pending_unregister = true;
2208 		pthread_mutex_unlock(&g_devlist_mutex);
2209 		return;
2210 	}
2211 
2212 	dev->unregistered = true;
2213 	RB_REMOVE(io_device_tree, &g_io_devices, dev);
2214 	refcnt = dev->refcnt;
2215 	pthread_mutex_unlock(&g_devlist_mutex);
2216 
2217 	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
2218 		      dev->name, dev->io_device, thread->name);
2219 
2220 	if (unregister_cb) {
2221 		thread->pending_unregister_count++;
2222 	}
2223 
2224 	if (refcnt > 0) {
2225 		/* defer deletion */
2226 		return;
2227 	}
2228 
2229 	io_device_free(dev);
2230 }
2231 
2232 const char *
2233 spdk_io_device_get_name(struct io_device *dev)
2234 {
2235 	return dev->name;
2236 }
2237 
2238 static struct spdk_io_channel *
2239 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev)
2240 {
2241 	struct spdk_io_channel find = {};
2242 
2243 	find.dev = dev;
2244 	return RB_FIND(io_channel_tree, &thread->io_channels, &find);
2245 }
2246 
2247 struct spdk_io_channel *
2248 spdk_get_io_channel(void *io_device)
2249 {
2250 	struct spdk_io_channel *ch;
2251 	struct spdk_thread *thread;
2252 	struct io_device *dev;
2253 	int rc;
2254 
2255 	pthread_mutex_lock(&g_devlist_mutex);
2256 	dev = io_device_get(io_device);
2257 	if (dev == NULL) {
2258 		SPDK_ERRLOG("could not find io_device %p\n", io_device);
2259 		pthread_mutex_unlock(&g_devlist_mutex);
2260 		return NULL;
2261 	}
2262 
2263 	thread = _get_thread();
2264 	if (!thread) {
2265 		SPDK_ERRLOG("No thread allocated\n");
2266 		pthread_mutex_unlock(&g_devlist_mutex);
2267 		return NULL;
2268 	}
2269 
2270 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
2271 		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
2272 		pthread_mutex_unlock(&g_devlist_mutex);
2273 		return NULL;
2274 	}
2275 
2276 	ch = thread_get_io_channel(thread, dev);
2277 	if (ch != NULL) {
2278 		ch->ref++;
2279 
2280 		SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2281 			      ch, dev->name, dev->io_device, thread->name, ch->ref);
2282 
2283 		/*
2284 		 * An I/O channel already exists for this device on this
2285 		 *  thread, so return it.
2286 		 */
2287 		pthread_mutex_unlock(&g_devlist_mutex);
2288 		spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0,
2289 				  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
2290 		return ch;
2291 	}
2292 
2293 	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
2294 	if (ch == NULL) {
2295 		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
2296 		pthread_mutex_unlock(&g_devlist_mutex);
2297 		return NULL;
2298 	}
2299 
2300 	ch->dev = dev;
2301 	ch->destroy_cb = dev->destroy_cb;
2302 	ch->thread = thread;
2303 	ch->ref = 1;
2304 	ch->destroy_ref = 0;
2305 	RB_INSERT(io_channel_tree, &thread->io_channels, ch);
2306 
2307 	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2308 		      ch, dev->name, dev->io_device, thread->name, ch->ref);
2309 
2310 	dev->refcnt++;
2311 
2312 	pthread_mutex_unlock(&g_devlist_mutex);
2313 
2314 	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
2315 	if (rc != 0) {
2316 		pthread_mutex_lock(&g_devlist_mutex);
2317 		RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
2318 		dev->refcnt--;
2319 		free(ch);
2320 		pthread_mutex_unlock(&g_devlist_mutex);
2321 		return NULL;
2322 	}
2323 
2324 	spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1);
2325 	return ch;
2326 }
2327 
2328 static void
2329 put_io_channel(void *arg)
2330 {
2331 	struct spdk_io_channel *ch = arg;
2332 	bool do_remove_dev = true;
2333 	struct spdk_thread *thread;
2334 
2335 	thread = spdk_get_thread();
2336 	if (!thread) {
2337 		SPDK_ERRLOG("called from non-SPDK thread\n");
2338 		assert(false);
2339 		return;
2340 	}
2341 
2342 	SPDK_DEBUGLOG(thread,
2343 		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
2344 		      ch, ch->dev->name, ch->dev->io_device, thread->name);
2345 
2346 	assert(ch->thread == thread);
2347 
2348 	ch->destroy_ref--;
2349 
2350 	if (ch->ref > 0 || ch->destroy_ref > 0) {
2351 		/*
2352 		 * Another reference to the associated io_device was requested
2353 		 *  after this message was sent but before it had a chance to
2354 		 *  execute.
2355 		 */
2356 		return;
2357 	}
2358 
2359 	pthread_mutex_lock(&g_devlist_mutex);
2360 	RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
2361 	pthread_mutex_unlock(&g_devlist_mutex);
2362 
2363 	/* Don't hold the devlist mutex while the destroy_cb is called. */
2364 	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));
2365 
2366 	pthread_mutex_lock(&g_devlist_mutex);
2367 	ch->dev->refcnt--;
2368 
2369 	if (!ch->dev->unregistered) {
2370 		do_remove_dev = false;
2371 	}
2372 
2373 	if (ch->dev->refcnt > 0) {
2374 		do_remove_dev = false;
2375 	}
2376 
2377 	pthread_mutex_unlock(&g_devlist_mutex);
2378 
2379 	if (do_remove_dev) {
2380 		io_device_free(ch->dev);
2381 	}
2382 	free(ch);
2383 }
2384 
2385 void
2386 spdk_put_io_channel(struct spdk_io_channel *ch)
2387 {
2388 	struct spdk_thread *thread;
2389 	int rc __attribute__((unused));
2390 
2391 	spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0,
2392 			  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
2393 
2394 	thread = spdk_get_thread();
2395 	if (!thread) {
2396 		SPDK_ERRLOG("called from non-SPDK thread\n");
2397 		assert(false);
2398 		return;
2399 	}
2400 
2401 	if (ch->thread != thread) {
2402 		wrong_thread(__func__, "ch", ch->thread, thread);
2403 		return;
2404 	}
2405 
2406 	SPDK_DEBUGLOG(thread,
2407 		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2408 		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);
2409 
2410 	ch->ref--;
2411 
2412 	if (ch->ref == 0) {
2413 		ch->destroy_ref++;
2414 		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
2415 		assert(rc == 0);
2416 	}
2417 }
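
/*
 * Illustrative usage sketch (hypothetical device and per-channel context, not
 * part of this file): the usual io_device lifecycle.  The device is registered
 * once, each thread gets its own channel (and context) via spdk_get_io_channel(),
 * releases it with spdk_put_io_channel(), and the device is unregistered once
 * all channels have been put.
 *
 *	struct my_channel { uint64_t io_count; };
 *
 *	static int
 *	my_channel_create(void *io_device, void *ctx_buf)
 *	{
 *		struct my_channel *ch = ctx_buf;
 *
 *		ch->io_count = 0;
 *		return 0;
 *	}
 *
 *	static void
 *	my_channel_destroy(void *io_device, void *ctx_buf)
 *	{
 *	}
 *
 *	spdk_io_device_register(&g_my_dev, my_channel_create, my_channel_destroy,
 *				sizeof(struct my_channel), "my_dev");
 *
 *	struct spdk_io_channel *ioch = spdk_get_io_channel(&g_my_dev);
 *	struct my_channel *ch = spdk_io_channel_get_ctx(ioch);
 *	...
 *	spdk_put_io_channel(ioch);
 *	spdk_io_device_unregister(&g_my_dev, NULL);
 */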
2418 
2419 struct spdk_io_channel *
2420 spdk_io_channel_from_ctx(void *ctx)
2421 {
2422 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
2423 }
2424 
2425 struct spdk_thread *
2426 spdk_io_channel_get_thread(struct spdk_io_channel *ch)
2427 {
2428 	return ch->thread;
2429 }
2430 
2431 void *
2432 spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
2433 {
2434 	return ch->dev->io_device;
2435 }
2436 
2437 const char *
2438 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch)
2439 {
2440 	return spdk_io_device_get_name(ch->dev);
2441 }
2442 
2443 int
2444 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
2445 {
2446 	return ch->ref;
2447 }
2448 
2449 struct spdk_io_channel_iter {
2450 	void *io_device;
2451 	struct io_device *dev;
2452 	spdk_channel_msg fn;
2453 	int status;
2454 	void *ctx;
2455 	struct spdk_io_channel *ch;
2456 
2457 	struct spdk_thread *cur_thread;
2458 
2459 	struct spdk_thread *orig_thread;
2460 	spdk_channel_for_each_cpl cpl;
2461 };
2462 
2463 void *
2464 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
2465 {
2466 	return i->io_device;
2467 }
2468 
2469 struct spdk_io_channel *
2470 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
2471 {
2472 	return i->ch;
2473 }
2474 
2475 void *
2476 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
2477 {
2478 	return i->ctx;
2479 }
2480 
2481 static void
2482 _call_completion(void *ctx)
2483 {
2484 	struct spdk_io_channel_iter *i = ctx;
2485 
2486 	if (i->cpl != NULL) {
2487 		i->cpl(i, i->status);
2488 	}
2489 	free(i);
2490 }
2491 
2492 static void
2493 _call_channel(void *ctx)
2494 {
2495 	struct spdk_io_channel_iter *i = ctx;
2496 	struct spdk_io_channel *ch;
2497 
2498 	/*
2499 	 * It is possible that the channel was deleted before this
2500 	 *  message had a chance to execute.  If so, skip calling
2501 	 *  the fn() on this thread.
2502 	 */
2503 	pthread_mutex_lock(&g_devlist_mutex);
2504 	ch = thread_get_io_channel(i->cur_thread, i->dev);
2505 	pthread_mutex_unlock(&g_devlist_mutex);
2506 
2507 	if (ch) {
2508 		i->fn(i);
2509 	} else {
2510 		spdk_for_each_channel_continue(i, 0);
2511 	}
2512 }
2513 
2514 void
2515 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
2516 		      spdk_channel_for_each_cpl cpl)
2517 {
2518 	struct spdk_thread *thread;
2519 	struct spdk_io_channel *ch;
2520 	struct spdk_io_channel_iter *i;
2521 	int rc __attribute__((unused));
2522 
2523 	i = calloc(1, sizeof(*i));
2524 	if (!i) {
2525 		SPDK_ERRLOG("Unable to allocate iterator\n");
2526 		assert(false);
2527 		return;
2528 	}
2529 
2530 	i->io_device = io_device;
2531 	i->fn = fn;
2532 	i->ctx = ctx;
2533 	i->cpl = cpl;
2534 	i->orig_thread = _get_thread();
2535 
2536 	pthread_mutex_lock(&g_devlist_mutex);
2537 	i->dev = io_device_get(io_device);
2538 	if (i->dev == NULL) {
2539 		SPDK_ERRLOG("could not find io_device %p\n", io_device);
2540 		assert(false);
2541 		i->status = -ENODEV;
2542 		goto end;
2543 	}
2544 
2545 	/* Do not allow new for_each operations while we are already waiting for
2546 	 * outstanding for_each operations to complete so that the device can be unregistered.
2547 	 */
2548 	if (i->dev->pending_unregister) {
2549 		SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device);
2550 		i->status = -ENODEV;
2551 		goto end;
2552 	}
2553 
2554 	TAILQ_FOREACH(thread, &g_threads, tailq) {
2555 		ch = thread_get_io_channel(thread, i->dev);
2556 		if (ch != NULL) {
2557 			ch->dev->for_each_count++;
2558 			i->cur_thread = thread;
2559 			i->ch = ch;
2560 			pthread_mutex_unlock(&g_devlist_mutex);
2561 			rc = spdk_thread_send_msg(thread, _call_channel, i);
2562 			assert(rc == 0);
2563 			return;
2564 		}
2565 	}
2566 
2567 end:
2568 	pthread_mutex_unlock(&g_devlist_mutex);
2569 
2570 	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
2571 	assert(rc == 0);
2572 }
2573 
2574 static void
2575 __pending_unregister(void *arg)
2576 {
2577 	struct io_device *dev = arg;
2578 
2579 	assert(dev->pending_unregister);
2580 	assert(dev->for_each_count == 0);
2581 	spdk_io_device_unregister(dev->io_device, dev->unregister_cb);
2582 }
2583 
2584 void
2585 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
2586 {
2587 	struct spdk_thread *thread;
2588 	struct spdk_io_channel *ch;
2589 	struct io_device *dev;
2590 	int rc __attribute__((unused));
2591 
2592 	assert(i->cur_thread == spdk_get_thread());
2593 
2594 	i->status = status;
2595 
2596 	pthread_mutex_lock(&g_devlist_mutex);
2597 	dev = i->dev;
2598 	if (status) {
2599 		goto end;
2600 	}
2601 
2602 	thread = TAILQ_NEXT(i->cur_thread, tailq);
2603 	while (thread) {
2604 		ch = thread_get_io_channel(thread, dev);
2605 		if (ch != NULL) {
2606 			i->cur_thread = thread;
2607 			i->ch = ch;
2608 			pthread_mutex_unlock(&g_devlist_mutex);
2609 			rc = spdk_thread_send_msg(thread, _call_channel, i);
2610 			assert(rc == 0);
2611 			return;
2612 		}
2613 		thread = TAILQ_NEXT(thread, tailq);
2614 	}
2615 
2616 end:
2617 	dev->for_each_count--;
2618 	i->ch = NULL;
2619 	pthread_mutex_unlock(&g_devlist_mutex);
2620 
2621 	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
2622 	assert(rc == 0);
2623 
2624 	pthread_mutex_lock(&g_devlist_mutex);
2625 	if (dev->pending_unregister && dev->for_each_count == 0) {
2626 		rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev);
2627 		assert(rc == 0);
2628 	}
2629 	pthread_mutex_unlock(&g_devlist_mutex);
2630 }
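
/*
 * Illustrative usage sketch (hypothetical names, not part of this file):
 * visiting every existing channel of an io_device.  fn runs on each channel's
 * owning thread and must finish by calling spdk_for_each_channel_continue();
 * cpl then runs on the originating thread with 0, or with the first non-zero
 * status at which the iteration stopped.
 *
 *	static void
 *	reset_channel(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ioch = spdk_io_channel_iter_get_channel(i);
 *		struct my_channel *ch = spdk_io_channel_get_ctx(ioch);
 *
 *		ch->io_count = 0;
 *		spdk_for_each_channel_continue(i, 0);
 *	}
 *
 *	static void
 *	reset_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		SPDK_NOTICELOG("reset %s\n", status == 0 ? "succeeded" : "failed");
 *	}
 *
 *	spdk_for_each_channel(&g_my_dev, reset_channel, NULL, reset_done);
 */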
2631 
2632 struct spdk_interrupt {
2633 	int			efd;
2634 	struct spdk_thread	*thread;
2635 	char			name[SPDK_MAX_POLLER_NAME_LEN + 1];
2636 };
2637 
2638 static void
2639 thread_interrupt_destroy(struct spdk_thread *thread)
2640 {
2641 	struct spdk_fd_group *fgrp = thread->fgrp;
2642 
2643 	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);
2644 
2645 	if (thread->msg_fd < 0) {
2646 		return;
2647 	}
2648 
2649 	spdk_fd_group_remove(fgrp, thread->msg_fd);
2650 	close(thread->msg_fd);
2651 	thread->msg_fd = -1;
2652 
2653 	spdk_fd_group_destroy(fgrp);
2654 	thread->fgrp = NULL;
2655 }
2656 
2657 #ifdef __linux__
2658 static int
2659 thread_interrupt_msg_process(void *arg)
2660 {
2661 	struct spdk_thread *thread = arg;
2662 	uint32_t msg_count;
2663 	spdk_msg_fn critical_msg;
2664 	int rc = 0;
2665 	uint64_t notify = 1;
2666 
2667 	assert(spdk_interrupt_mode_is_enabled());
2668 
2669 	/* There may be a race between this acknowledgement and another producer's
2670 	 * notification, so the acknowledgement must be done first and pending messages
2671 	 * checked afterwards.  This avoids missing a message notification.
2672 	 */
2673 	rc = read(thread->msg_fd, &notify, sizeof(notify));
2674 	if (rc < 0 && errno != EAGAIN) {
2675 		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
2676 	}
2677 
2678 	critical_msg = thread->critical_msg;
2679 	if (spdk_unlikely(critical_msg != NULL)) {
2680 		critical_msg(NULL);
2681 		thread->critical_msg = NULL;
2682 		rc = 1;
2683 	}
2684 
2685 	msg_count = msg_queue_run_batch(thread, 0);
2686 	if (msg_count) {
2687 		rc = 1;
2688 	}
2689 
2690 	return rc;
2691 }
2692 
2693 static int
2694 thread_interrupt_create(struct spdk_thread *thread)
2695 {
2696 	int rc;
2697 
2698 	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);
2699 
2700 	rc = spdk_fd_group_create(&thread->fgrp);
2701 	if (rc) {
2702 		return rc;
2703 	}
2704 
2705 	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
2706 	if (thread->msg_fd < 0) {
2707 		rc = -errno;
2708 		spdk_fd_group_destroy(thread->fgrp);
2709 		thread->fgrp = NULL;
2710 
2711 		return rc;
2712 	}
2713 
2714 	return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd,
2715 				 thread_interrupt_msg_process, thread);
2716 }
2717 #else
2718 static int
2719 thread_interrupt_create(struct spdk_thread *thread)
2720 {
2721 	return -ENOTSUP;
2722 }
2723 #endif
2724 
2725 struct spdk_interrupt *
2726 spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
2727 			void *arg, const char *name)
2728 {
2729 	struct spdk_thread *thread;
2730 	struct spdk_interrupt *intr;
2731 	int ret;
2732 
2733 	thread = spdk_get_thread();
2734 	if (!thread) {
2735 		assert(false);
2736 		return NULL;
2737 	}
2738 
2739 	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
2740 		SPDK_ERRLOG("thread %s is exiting or already exited\n", thread->name);
2741 		return NULL;
2742 	}
2743 
2744 	intr = calloc(1, sizeof(*intr));
2745 	if (intr == NULL) {
2746 		SPDK_ERRLOG("Interrupt handler allocation failed\n");
2747 		return NULL;
2748 	}
2749 
2750 	if (name) {
2751 		snprintf(intr->name, sizeof(intr->name), "%s", name);
2752 	} else {
2753 		snprintf(intr->name, sizeof(intr->name), "%p", fn);
2754 	}
2755 
2756 	intr->efd = efd;
2757 	intr->thread = thread;
2758 
2759 	ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name);
2760 
2761 	if (ret != 0) {
2762 		SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n",
2763 			    thread->name, efd, spdk_strerror(-ret));
2764 		free(intr);
2765 		return NULL;
2766 	}
2767 
2768 	return intr;
2769 }
2770 
2771 void
2772 spdk_interrupt_unregister(struct spdk_interrupt **pintr)
2773 {
2774 	struct spdk_thread *thread;
2775 	struct spdk_interrupt *intr;
2776 
2777 	intr = *pintr;
2778 	if (intr == NULL) {
2779 		return;
2780 	}
2781 
2782 	*pintr = NULL;
2783 
2784 	thread = spdk_get_thread();
2785 	if (!thread) {
2786 		assert(false);
2787 		return;
2788 	}
2789 
2790 	if (intr->thread != thread) {
2791 		wrong_thread(__func__, intr->name, intr->thread, thread);
2792 		return;
2793 	}
2794 
2795 	spdk_fd_group_remove(thread->fgrp, intr->efd);
2796 	free(intr);
2797 }
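
/*
 * Illustrative usage sketch (hypothetical event source, not part of this file):
 * registering a file descriptor with the current thread's fd group so it is
 * serviced while the thread sleeps in interrupt mode.  The handler follows the
 * same convention as thread_interrupt_msg_process() above: return non-zero if
 * any work was done.
 *
 *	static int
 *	my_event_handler(void *arg)
 *	{
 *		struct my_dev *dev = arg;
 *
 *		return my_dev_drain_events(dev) > 0 ? 1 : 0;
 *	}
 *
 *	dev->intr = spdk_interrupt_register(dev->event_fd, my_event_handler, dev, "my_dev");
 *	...
 *	spdk_interrupt_unregister(&dev->intr);
 */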
2798 
2799 int
2800 spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
2801 			       enum spdk_interrupt_event_types event_types)
2802 {
2803 	struct spdk_thread *thread;
2804 
2805 	thread = spdk_get_thread();
2806 	if (!thread) {
2807 		assert(false);
2808 		return -EINVAL;
2809 	}
2810 
2811 	if (intr->thread != thread) {
2812 		wrong_thread(__func__, intr->name, intr->thread, thread);
2813 		return -EINVAL;
2814 	}
2815 
2816 	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
2817 }
2818 
2819 int
2820 spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
2821 {
2822 	return spdk_fd_group_get_fd(thread->fgrp);
2823 }
2824 
2825 static bool g_interrupt_mode = false;
2826 
2827 int
2828 spdk_interrupt_mode_enable(void)
2829 {
2830 	/* This must be called once, prior to initializing the threading library.
2831 	 * g_spdk_msg_mempool is only valid once the thread library has been initialized.
2832 	 */
2833 	if (g_spdk_msg_mempool) {
2834 		SPDK_ERRLOG("Failed to enable interrupt mode: the threading library is already initialized.\n");
2835 		return -1;
2836 	}
2837 
2838 #ifdef __linux__
2839 	SPDK_NOTICELOG("Setting SPDK to run in interrupt mode.\n");
2840 	g_interrupt_mode = true;
2841 	return 0;
2842 #else
2843 	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
2844 	g_interrupt_mode = false;
2845 	return -ENOTSUP;
2846 #endif
2847 }
2848 
2849 bool
2850 spdk_interrupt_mode_is_enabled(void)
2851 {
2852 	return g_interrupt_mode;
2853 }
2854 
2855 void
2856 spdk_spin_init(struct spdk_spinlock *sspin)
2857 {
2858 	int rc;
2859 
2860 	memset(sspin, 0, sizeof(*sspin));
2861 	rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE);
2862 	SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD);
2863 }
2864 
2865 void
2866 spdk_spin_destroy(struct spdk_spinlock *sspin)
2867 {
2868 	int rc;
2869 
2870 	SPIN_ASSERT_RETURN_VOID(sspin->thread == NULL, SPIN_ERR_LOCK_HELD);
2871 
2872 	rc = pthread_spin_destroy(&sspin->spinlock);
2873 	SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD);
2874 }
2875 
2876 void
2877 spdk_spin_lock(struct spdk_spinlock *sspin)
2878 {
2879 	struct spdk_thread *thread = spdk_get_thread();
2880 	int rc;
2881 
2882 	SPIN_ASSERT_RETURN_VOID(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD);
2883 	SPIN_ASSERT_RETURN_VOID(thread != sspin->thread, SPIN_ERR_DEADLOCK);
2884 
2885 	rc = pthread_spin_lock(&sspin->spinlock);
2886 	SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD);
2887 
2888 	sspin->thread = thread;
2889 	sspin->thread->lock_count++;
2890 }
2891 
2892 void
2893 spdk_spin_unlock(struct spdk_spinlock *sspin)
2894 {
2895 	struct spdk_thread *thread = spdk_get_thread();
2896 	int rc;
2897 
2898 	SPIN_ASSERT_RETURN_VOID(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD);
2899 	SPIN_ASSERT_RETURN_VOID(thread == sspin->thread, SPIN_ERR_WRONG_THREAD);
2900 
2901 	SPIN_ASSERT_RETURN_VOID(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT);
2902 	thread->lock_count--;
2903 	sspin->thread = NULL;
2904 
2905 	rc = pthread_spin_unlock(&sspin->spinlock);
2906 	SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD);
2907 }
2908 
2909 bool
2910 spdk_spin_held(struct spdk_spinlock *sspin)
2911 {
2912 	struct spdk_thread *thread = spdk_get_thread();
2913 
2914 	SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false);
2915 
2916 	return sspin->thread == thread;
2917 }
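
/*
 * Illustrative usage sketch (hypothetical shared structure, not part of this
 * file): protecting state shared between SPDK threads.  The lock may only be
 * taken from an SPDK thread, is not recursive, and spdk_spin_held() is useful
 * in assertions.
 *
 *	struct my_shared {
 *		struct spdk_spinlock	lock;
 *		uint64_t		counter;
 *	};
 *
 *	spdk_spin_init(&shared->lock);
 *
 *	spdk_spin_lock(&shared->lock);
 *	shared->counter++;
 *	spdk_spin_unlock(&shared->lock);
 *
 *	assert(!spdk_spin_held(&shared->lock));
 *	spdk_spin_destroy(&shared->lock);
 */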
2918 
2919 static int
2920 iobuf_channel_create_cb(void *io_device, void *ctx)
2921 {
2922 	struct iobuf_channel *ch = ctx;
2923 
2924 	STAILQ_INIT(&ch->small_queue);
2925 	STAILQ_INIT(&ch->large_queue);
2926 
2927 	return 0;
2928 }
2929 
2930 static void
2931 iobuf_channel_destroy_cb(void *io_device, void *ctx)
2932 {
2933 	struct iobuf_channel *ch __attribute__((unused)) = ctx;
2934 
2935 	assert(STAILQ_EMPTY(&ch->small_queue));
2936 	assert(STAILQ_EMPTY(&ch->large_queue));
2937 }
2938 
2939 int
2940 spdk_iobuf_initialize(void)
2941 {
2942 	struct spdk_iobuf_opts *opts = &g_iobuf.opts;
2943 	int rc = 0;
2944 
2945 	g_iobuf.small_pool = spdk_mempool_create("iobuf_small_pool", opts->small_pool_count,
2946 			     opts->small_bufsize + SPDK_IOBUF_DATA_OFFSET, 0,
2947 			     SPDK_ENV_SOCKET_ID_ANY);
2948 	if (!g_iobuf.small_pool) {
2949 		SPDK_ERRLOG("Failed to create small iobuf pool\n");
2950 		rc = -ENOMEM;
2951 		goto error;
2952 	}
2953 
2954 	g_iobuf.large_pool = spdk_mempool_create("iobuf_large_pool", opts->large_pool_count,
2955 			     opts->large_bufsize + SPDK_IOBUF_DATA_OFFSET, 0,
2956 			     SPDK_ENV_SOCKET_ID_ANY);
2957 	if (!g_iobuf.large_pool) {
2958 		SPDK_ERRLOG("Failed to create large iobuf pool\n");
2959 		rc = -ENOMEM;
2960 		goto error;
2961 	}
2962 
2963 	spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
2964 				sizeof(struct iobuf_channel), "iobuf");
2965 
2966 	return 0;
2967 error:
2968 	spdk_mempool_free(g_iobuf.small_pool);
2969 	return rc;
2970 }
2971 
2972 static void
2973 iobuf_unregister_cb(void *io_device)
2974 {
2975 	struct iobuf_module *module;
2976 
2977 	while (!TAILQ_EMPTY(&g_iobuf.modules)) {
2978 		module = TAILQ_FIRST(&g_iobuf.modules);
2979 		TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
2980 		free(module->name);
2981 		free(module);
2982 	}
2983 
2984 	if (spdk_mempool_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) {
2985 		SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n",
2986 			    spdk_mempool_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count);
2987 	}
2988 
2989 	if (spdk_mempool_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) {
2990 		SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n",
2991 			    spdk_mempool_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count);
2992 	}
2993 
2994 	spdk_mempool_free(g_iobuf.small_pool);
2995 	spdk_mempool_free(g_iobuf.large_pool);
2996 
2997 	if (g_iobuf.finish_cb != NULL) {
2998 		g_iobuf.finish_cb(g_iobuf.finish_arg);
2999 	}
3000 }
3001 
3002 void
3003 spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg)
3004 {
3005 	g_iobuf.finish_cb = cb_fn;
3006 	g_iobuf.finish_arg = cb_arg;
3007 
3008 	spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb);
3009 }
3010 
3011 int
3012 spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts)
3013 {
3014 	if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) {
3015 		SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n",
3016 			    IOBUF_MIN_SMALL_POOL_SIZE);
3017 		return -EINVAL;
3018 	}
3019 	if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) {
3020 		SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n",
3021 			    IOBUF_MIN_LARGE_POOL_SIZE);
3022 		return -EINVAL;
3023 	}
3024 	if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) {
3025 		SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 "\n",
3026 			    IOBUF_MIN_SMALL_BUFSIZE);
3027 		return -EINVAL;
3028 	}
3029 	if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) {
3030 		SPDK_ERRLOG("large_bufsize must be at least %" PRIu32 "\n",
3031 			    IOBUF_MIN_LARGE_BUFSIZE);
3032 		return -EINVAL;
3033 	}
3034 
3035 	g_iobuf.opts = *opts;
3036 
3037 	return 0;
3038 }
3039 
3040 void
3041 spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts)
3042 {
3043 	*opts = g_iobuf.opts;
3044 }
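
/*
 * Illustrative usage sketch (not part of this file): growing the iobuf pools
 * before spdk_iobuf_initialize() is called.  Values below the IOBUF_MIN_*
 * limits are rejected with -EINVAL.
 *
 *	struct spdk_iobuf_opts opts;
 *
 *	spdk_iobuf_get_opts(&opts);
 *	opts.small_pool_count = 16383;
 *	opts.large_pool_count = 2047;
 *	if (spdk_iobuf_set_opts(&opts) != 0) {
 *		SPDK_ERRLOG("invalid iobuf options\n");
 *	}
 */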
3045 
3046 int
3047 spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
3048 			uint32_t small_cache_size, uint32_t large_cache_size)
3049 {
3050 	struct spdk_io_channel *ioch;
3051 	struct iobuf_channel *iobuf_ch;
3052 	struct iobuf_module *module;
3053 	struct spdk_iobuf_buffer *buf;
3054 	uint32_t i;
3055 
3056 	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
3057 		if (strcmp(name, module->name) == 0) {
3058 			break;
3059 		}
3060 	}
3061 
3062 	if (module == NULL) {
3063 		SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name);
3064 		return -ENODEV;
3065 	}
3066 
3067 	ioch = spdk_get_io_channel(&g_iobuf);
3068 	if (ioch == NULL) {
3069 		SPDK_ERRLOG("Couldn't get iobuf IO channel\n");
3070 		return -ENOMEM;
3071 	}
3072 
3073 	iobuf_ch = spdk_io_channel_get_ctx(ioch);
3074 
3075 	ch->small.queue = &iobuf_ch->small_queue;
3076 	ch->large.queue = &iobuf_ch->large_queue;
3077 	ch->small.pool = g_iobuf.small_pool;
3078 	ch->large.pool = g_iobuf.large_pool;
3079 	ch->small.bufsize = g_iobuf.opts.small_bufsize;
3080 	ch->large.bufsize = g_iobuf.opts.large_bufsize;
3081 	ch->parent = ioch;
3082 	ch->module = module;
3083 	ch->small.cache_size = small_cache_size;
3084 	ch->large.cache_size = large_cache_size;
3085 	ch->small.cache_count = 0;
3086 	ch->large.cache_count = 0;
3087 
3088 	STAILQ_INIT(&ch->small.cache);
3089 	STAILQ_INIT(&ch->large.cache);
3090 
3091 	for (i = 0; i < small_cache_size; ++i) {
3092 		buf = spdk_mempool_get(g_iobuf.small_pool);
3093 		if (buf == NULL) {
3094 			SPDK_ERRLOG("Failed to populate iobuf small buffer cache. "
3095 				    "You may need to increase spdk_iobuf_opts.small_pool_count\n");
3096 			goto error;
3097 		}
3098 		STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq);
3099 		ch->small.cache_count++;
3100 	}
3101 	for (i = 0; i < large_cache_size; ++i) {
3102 		buf = spdk_mempool_get(g_iobuf.large_pool);
3103 		if (buf == NULL) {
3104 			SPDK_ERRLOG("Failed to populate iobuf large buffer cache. "
3105 				    "You may need to increase spdk_iobuf_opts.large_pool_count\n");
3106 			goto error;
3107 		}
3108 		STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq);
3109 		ch->large.cache_count++;
3110 	}
3111 
3112 	return 0;
3113 error:
3114 	spdk_iobuf_channel_fini(ch);
3115 
3116 	return -ENOMEM;
3117 }
3118 
3119 void
3120 spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
3121 {
3122 	struct spdk_iobuf_entry *entry __attribute__((unused));
3123 	struct spdk_iobuf_buffer *buf;
3124 
3125 	/* Make sure none of the wait queue entries are coming from this module */
3126 	STAILQ_FOREACH(entry, ch->small.queue, stailq) {
3127 		assert(entry->module != ch->module);
3128 	}
3129 	STAILQ_FOREACH(entry, ch->large.queue, stailq) {
3130 		assert(entry->module != ch->module);
3131 	}
3132 
3133 	/* Release cached buffers back to the pool */
3134 	while (!STAILQ_EMPTY(&ch->small.cache)) {
3135 		buf = STAILQ_FIRST(&ch->small.cache);
3136 		STAILQ_REMOVE_HEAD(&ch->small.cache, stailq);
3137 		spdk_mempool_put(ch->small.pool, buf);
3138 		ch->small.cache_count--;
3139 	}
3140 	while (!STAILQ_EMPTY(&ch->large.cache)) {
3141 		buf = STAILQ_FIRST(&ch->large.cache);
3142 		STAILQ_REMOVE_HEAD(&ch->large.cache, stailq);
3143 		spdk_mempool_put(ch->large.pool, buf);
3144 		ch->large.cache_count--;
3145 	}
3146 
3147 	assert(ch->small.cache_count == 0);
3148 	assert(ch->large.cache_count == 0);
3149 
3150 	spdk_put_io_channel(ch->parent);
3151 	ch->parent = NULL;
3152 }
3153 
3154 int
3155 spdk_iobuf_register_module(const char *name)
3156 {
3157 	struct iobuf_module *module;
3158 
3159 	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
3160 		if (strcmp(name, module->name) == 0) {
3161 			return -EEXIST;
3162 		}
3163 	}
3164 
3165 	module = calloc(1, sizeof(*module));
3166 	if (module == NULL) {
3167 		return -ENOMEM;
3168 	}
3169 
3170 	module->name = strdup(name);
3171 	if (module->name == NULL) {
3172 		free(module);
3173 		return -ENOMEM;
3174 	}
3175 
3176 	TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq);
3177 
3178 	return 0;
3179 }
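
/*
 * Illustrative usage sketch (hypothetical module name, not part of this file):
 * a module registers itself once at startup and then creates a per-thread
 * iobuf channel whose small/large caches are carved out of the global pools.
 *
 *	spdk_iobuf_register_module("my_module");
 *
 *	struct spdk_iobuf_channel ch;
 *
 *	if (spdk_iobuf_channel_init(&ch, "my_module", 128, 32) != 0) {
 *		SPDK_ERRLOG("failed to create iobuf channel\n");
 *	}
 *	...
 *	spdk_iobuf_channel_fini(&ch);
 */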
3180 
3181 int
3182 spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool,
3183 			  spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
3184 {
3185 	struct spdk_iobuf_entry *entry, *tmp;
3186 	int rc;
3187 
3188 	STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) {
3189 		/* We only want to iterate over the entries requested by the module which owns ch */
3190 		if (entry->module != ch->module) {
3191 			continue;
3192 		}
3193 
3194 		rc = cb_fn(ch, entry, cb_ctx);
3195 		if (rc != 0) {
3196 			return rc;
3197 		}
3198 	}
3199 
3200 	return 0;
3201 }
3202 
3203 void
3204 spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
3205 		       uint64_t len)
3206 {
3207 	struct spdk_iobuf_pool *pool;
3208 
3209 	if (len <= ch->small.bufsize) {
3210 		pool = &ch->small;
3211 	} else {
3212 		assert(len <= ch->large.bufsize);
3213 		pool = &ch->large;
3214 	}
3215 
3216 	STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
3217 }
3218 
3219 SPDK_LOG_REGISTER_COMPONENT(thread)
3220