xref: /spdk/lib/thread/thread.c (revision 4fe4040a14d6752bb631bffddc10ea53ed68e150)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/likely.h"
38 #include "spdk/queue.h"
39 #include "spdk/string.h"
40 #include "spdk/thread.h"
41 #include "spdk/trace.h"
42 #include "spdk/tree.h"
43 #include "spdk/util.h"
44 #include "spdk/fd_group.h"
45 
46 #include "spdk/log.h"
47 #include "spdk_internal/thread.h"
48 #include "thread_internal.h"
49 
50 #ifdef __linux__
51 #include <sys/timerfd.h>
52 #include <sys/eventfd.h>
53 #endif
54 
55 #define SPDK_MSG_BATCH_SIZE		8
56 #define SPDK_MAX_DEVICE_NAME_LEN	256
57 #define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
58 #define SPDK_MAX_POLLER_NAME_LEN	256
59 #define SPDK_MAX_THREAD_NAME_LEN	256
60 
61 enum spdk_poller_state {
62 	/* The poller is registered with a thread but not currently executing its fn. */
63 	SPDK_POLLER_STATE_WAITING,
64 
65 	/* The poller is currently running its fn. */
66 	SPDK_POLLER_STATE_RUNNING,
67 
68 	/* The poller was unregistered during the execution of its fn. */
69 	SPDK_POLLER_STATE_UNREGISTERED,
70 
71 	/* The poller is in the process of being paused.  It will be paused
72 	 * during the next time it's supposed to be executed.
73 	 */
74 	SPDK_POLLER_STATE_PAUSING,
75 
76 	/* The poller is registered but currently paused.  It's on the
77 	 * paused_pollers list.
78 	 */
79 	SPDK_POLLER_STATE_PAUSED,
80 };
81 
82 struct spdk_poller {
83 	TAILQ_ENTRY(spdk_poller)	tailq;
84 	RB_ENTRY(spdk_poller)		node;
85 
86 	/* Current state of the poller; should only be accessed from the poller's thread. */
87 	enum spdk_poller_state		state;
88 
89 	uint64_t			period_ticks;
90 	uint64_t			next_run_tick;
91 	uint64_t			run_count;
92 	uint64_t			busy_count;
93 	spdk_poller_fn			fn;
94 	void				*arg;
95 	struct spdk_thread		*thread;
96 	int				interruptfd;
97 	spdk_poller_set_interrupt_mode_cb set_intr_cb_fn;
98 	void				*set_intr_cb_arg;
99 
100 	char				name[SPDK_MAX_POLLER_NAME_LEN + 1];
101 };
102 
103 enum spdk_thread_state {
104 	/* The thread is pocessing poller and message by spdk_thread_poll(). */
105 	SPDK_THREAD_STATE_RUNNING,
106 
107 	/* The thread is in the process of termination. It reaps unregistering
108 	 * poller are releasing I/O channel.
109 	 */
110 	SPDK_THREAD_STATE_EXITING,
111 
112 	/* The thread is exited. It is ready to call spdk_thread_destroy(). */
113 	SPDK_THREAD_STATE_EXITED,
114 };
115 
116 struct spdk_thread {
117 	uint64_t			tsc_last;
118 	struct spdk_thread_stats	stats;
119 	/*
120 	 * Contains pollers actively running on this thread.  Pollers
121 	 *  are run round-robin. The thread takes one poller from the head
122 	 *  of the ring, executes it, then puts it back at the tail of
123 	 *  the ring.
124 	 */
125 	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;
126 	/**
127 	 * Contains pollers running on this thread with a periodic timer.
128 	 */
129 	RB_HEAD(timed_pollers_tree, spdk_poller)	timed_pollers;
130 	struct spdk_poller				*first_timed_poller;
131 	/*
132 	 * Contains paused pollers.  Pollers on this queue are waiting until
133 	 * they are resumed (in which case they're put onto the active/timer
134 	 * queues) or unregistered.
135 	 */
136 	TAILQ_HEAD(paused_pollers_head, spdk_poller)	paused_pollers;
137 	struct spdk_ring		*messages;
138 	int				msg_fd;
139 	SLIST_HEAD(, spdk_msg)		msg_cache;
140 	size_t				msg_cache_count;
141 	spdk_msg_fn			critical_msg;
142 	uint64_t			id;
143 	enum spdk_thread_state		state;
144 	int				pending_unregister_count;
145 
146 	TAILQ_HEAD(, spdk_io_channel)	io_channels;
147 	TAILQ_ENTRY(spdk_thread)	tailq;
148 
149 	char				name[SPDK_MAX_THREAD_NAME_LEN + 1];
150 	struct spdk_cpuset		cpumask;
151 	uint64_t			exit_timeout_tsc;
152 
153 	/* Indicates whether this spdk_thread currently runs in interrupt. */
154 	bool				in_interrupt;
155 	struct spdk_fd_group		*fgrp;
156 
157 	/* User context allocated at the end */
158 	uint8_t				ctx[0];
159 };
160 
161 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;
162 
163 static spdk_new_thread_fn g_new_thread_fn = NULL;
164 static spdk_thread_op_fn g_thread_op_fn = NULL;
165 static spdk_thread_op_supported_fn g_thread_op_supported_fn;
166 static size_t g_ctx_sz = 0;
167 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the
168  * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting
169  * SPDK application is required.
170  */
171 static uint64_t g_thread_id = 1;
172 
173 struct io_device {
174 	void				*io_device;
175 	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
176 	spdk_io_channel_create_cb	create_cb;
177 	spdk_io_channel_destroy_cb	destroy_cb;
178 	spdk_io_device_unregister_cb	unregister_cb;
179 	struct spdk_thread		*unregister_thread;
180 	uint32_t			ctx_size;
181 	uint32_t			for_each_count;
182 	TAILQ_ENTRY(io_device)		tailq;
183 
184 	uint32_t			refcnt;
185 
186 	bool				unregistered;
187 };
188 
189 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
190 
191 struct spdk_msg {
192 	spdk_msg_fn		fn;
193 	void			*arg;
194 
195 	SLIST_ENTRY(spdk_msg)	link;
196 };
197 
198 #define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
199 static struct spdk_mempool *g_spdk_msg_mempool = NULL;
200 
201 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
202 static uint32_t g_thread_count = 0;
203 
204 static __thread struct spdk_thread *tls_thread = NULL;
205 
206 #define TRACE_GROUP_THREAD		0xa
207 #define TRACE_THREAD_IOCH_GET   SPDK_TPOINT_ID(TRACE_GROUP_THREAD, 0x0)
208 #define TRACE_THREAD_IOCH_PUT   SPDK_TPOINT_ID(TRACE_GROUP_THREAD, 0x1)
209 
210 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD)
211 {
212 	spdk_trace_register_description("THREAD_IOCH_GET",
213 					TRACE_THREAD_IOCH_GET,
214 					OWNER_NONE, OBJECT_NONE, 0,
215 					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
216 	spdk_trace_register_description("THREAD_IOCH_PUT",
217 					TRACE_THREAD_IOCH_PUT,
218 					OWNER_NONE, OBJECT_NONE, 0,
219 					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
220 }
221 
222 /*
223  * If this compare function returns zero when two next_run_ticks are equal,
224  * the macro RB_INSERT() returns a pointer to the element with the same
225  * next_run_tick.
226  *
227  * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element
228  * to remove as a parameter.
229  *
230  * Hence we allow RB_INSERT() to insert elements with the same keys on the right
231  * side by returning 1 when two next_run_ticks are equal.
232  */
233 static inline int
234 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2)
235 {
236 	if (poller1->next_run_tick < poller2->next_run_tick) {
237 		return -1;
238 	} else {
239 		return 1;
240 	}
241 }
242 
243 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare);
244 
245 static inline struct spdk_thread *
246 _get_thread(void)
247 {
248 	return tls_thread;
249 }
250 
251 static int
252 _thread_lib_init(size_t ctx_sz)
253 {
254 	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
255 
256 	g_ctx_sz = ctx_sz;
257 
258 	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
259 	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
260 			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
261 			     sizeof(struct spdk_msg),
262 			     0, /* No cache. We do our own. */
263 			     SPDK_ENV_SOCKET_ID_ANY);
264 
265 	if (!g_spdk_msg_mempool) {
266 		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
267 		return -1;
268 	}
269 
270 	return 0;
271 }
272 
273 int
274 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
275 {
276 	assert(g_new_thread_fn == NULL);
277 	assert(g_thread_op_fn == NULL);
278 
279 	if (new_thread_fn == NULL) {
280 		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
281 	} else {
282 		g_new_thread_fn = new_thread_fn;
283 	}
284 
285 	return _thread_lib_init(ctx_sz);
286 }
287 
288 int
289 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
290 			 spdk_thread_op_supported_fn thread_op_supported_fn,
291 			 size_t ctx_sz)
292 {
293 	assert(g_new_thread_fn == NULL);
294 	assert(g_thread_op_fn == NULL);
295 	assert(g_thread_op_supported_fn == NULL);
296 
297 	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
298 		SPDK_ERRLOG("Both must be defined or undefined together.\n");
299 		return -EINVAL;
300 	}
301 
302 	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
303 		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
304 	} else {
305 		g_thread_op_fn = thread_op_fn;
306 		g_thread_op_supported_fn = thread_op_supported_fn;
307 	}
308 
309 	return _thread_lib_init(ctx_sz);
310 }
311 
312 void
313 spdk_thread_lib_fini(void)
314 {
315 	struct io_device *dev;
316 
317 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
318 		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
319 	}
320 
321 	if (g_spdk_msg_mempool) {
322 		spdk_mempool_free(g_spdk_msg_mempool);
323 		g_spdk_msg_mempool = NULL;
324 	}
325 
326 	g_new_thread_fn = NULL;
327 	g_thread_op_fn = NULL;
328 	g_thread_op_supported_fn = NULL;
329 	g_ctx_sz = 0;
330 }
331 
332 static void thread_interrupt_destroy(struct spdk_thread *thread);
333 static int thread_interrupt_create(struct spdk_thread *thread);
334 
335 static void
336 _free_thread(struct spdk_thread *thread)
337 {
338 	struct spdk_io_channel *ch;
339 	struct spdk_msg *msg;
340 	struct spdk_poller *poller, *ptmp;
341 
342 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
343 		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
344 			    thread->name, ch->dev->name);
345 	}
346 
347 	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
348 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
349 			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
350 				     poller->name);
351 		}
352 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
353 		free(poller);
354 	}
355 
356 	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
357 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
358 			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
359 				     poller->name);
360 		}
361 		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
362 		free(poller);
363 	}
364 
365 	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
366 		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
367 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
368 		free(poller);
369 	}
370 
371 	pthread_mutex_lock(&g_devlist_mutex);
372 	assert(g_thread_count > 0);
373 	g_thread_count--;
374 	TAILQ_REMOVE(&g_threads, thread, tailq);
375 	pthread_mutex_unlock(&g_devlist_mutex);
376 
377 	msg = SLIST_FIRST(&thread->msg_cache);
378 	while (msg != NULL) {
379 		SLIST_REMOVE_HEAD(&thread->msg_cache, link);
380 
381 		assert(thread->msg_cache_count > 0);
382 		thread->msg_cache_count--;
383 		spdk_mempool_put(g_spdk_msg_mempool, msg);
384 
385 		msg = SLIST_FIRST(&thread->msg_cache);
386 	}
387 
388 	assert(thread->msg_cache_count == 0);
389 
390 	if (spdk_interrupt_mode_is_enabled()) {
391 		thread_interrupt_destroy(thread);
392 	}
393 
394 	spdk_ring_free(thread->messages);
395 	free(thread);
396 }
397 
398 struct spdk_thread *
399 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
400 {
401 	struct spdk_thread *thread;
402 	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
403 	int rc = 0, i;
404 
405 	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
406 	if (!thread) {
407 		SPDK_ERRLOG("Unable to allocate memory for thread\n");
408 		return NULL;
409 	}
410 
411 	if (cpumask) {
412 		spdk_cpuset_copy(&thread->cpumask, cpumask);
413 	} else {
414 		spdk_cpuset_negate(&thread->cpumask);
415 	}
416 
417 	TAILQ_INIT(&thread->io_channels);
418 	TAILQ_INIT(&thread->active_pollers);
419 	RB_INIT(&thread->timed_pollers);
420 	TAILQ_INIT(&thread->paused_pollers);
421 	SLIST_INIT(&thread->msg_cache);
422 	thread->msg_cache_count = 0;
423 
424 	thread->tsc_last = spdk_get_ticks();
425 
426 	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
427 	if (!thread->messages) {
428 		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
429 		free(thread);
430 		return NULL;
431 	}
432 
433 	/* Fill the local message pool cache. */
434 	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
435 	if (rc == 0) {
436 		/* If we can't populate the cache it's ok. The cache will get filled
437 		 * up organically as messages are passed to the thread. */
438 		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
439 			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
440 			thread->msg_cache_count++;
441 		}
442 	}
443 
444 	if (name) {
445 		snprintf(thread->name, sizeof(thread->name), "%s", name);
446 	} else {
447 		snprintf(thread->name, sizeof(thread->name), "%p", thread);
448 	}
449 
450 	pthread_mutex_lock(&g_devlist_mutex);
451 	if (g_thread_id == 0) {
452 		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
453 		pthread_mutex_unlock(&g_devlist_mutex);
454 		_free_thread(thread);
455 		return NULL;
456 	}
457 	thread->id = g_thread_id++;
458 	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
459 	g_thread_count++;
460 	pthread_mutex_unlock(&g_devlist_mutex);
461 
462 	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
463 		      thread->id, thread->name);
464 
465 	if (spdk_interrupt_mode_is_enabled()) {
466 		thread->in_interrupt = true;
467 		rc = thread_interrupt_create(thread);
468 		if (rc != 0) {
469 			_free_thread(thread);
470 			return NULL;
471 		}
472 	}
473 
474 	if (g_new_thread_fn) {
475 		rc = g_new_thread_fn(thread);
476 	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
477 		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
478 	}
479 
480 	if (rc != 0) {
481 		_free_thread(thread);
482 		return NULL;
483 	}
484 
485 	thread->state = SPDK_THREAD_STATE_RUNNING;
486 
487 	return thread;
488 }
489 
490 void
491 spdk_set_thread(struct spdk_thread *thread)
492 {
493 	tls_thread = thread;
494 }
495 
496 static void
497 thread_exit(struct spdk_thread *thread, uint64_t now)
498 {
499 	struct spdk_poller *poller;
500 	struct spdk_io_channel *ch;
501 
502 	if (now >= thread->exit_timeout_tsc) {
503 		SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n",
504 			    thread->name);
505 		goto exited;
506 	}
507 
508 	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
509 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
510 			SPDK_INFOLOG(thread,
511 				     "thread %s still has active poller %s\n",
512 				     thread->name, poller->name);
513 			return;
514 		}
515 	}
516 
517 	RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
518 		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
519 			SPDK_INFOLOG(thread,
520 				     "thread %s still has active timed poller %s\n",
521 				     thread->name, poller->name);
522 			return;
523 		}
524 	}
525 
526 	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
527 		SPDK_INFOLOG(thread,
528 			     "thread %s still has paused poller %s\n",
529 			     thread->name, poller->name);
530 		return;
531 	}
532 
533 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
534 		SPDK_INFOLOG(thread,
535 			     "thread %s still has channel for io_device %s\n",
536 			     thread->name, ch->dev->name);
537 		return;
538 	}
539 
540 	if (thread->pending_unregister_count > 0) {
541 		SPDK_INFOLOG(thread,
542 			     "thread %s is still unregistering io_devices\n",
543 			     thread->name);
544 		return;
545 	}
546 
547 exited:
548 	thread->state = SPDK_THREAD_STATE_EXITED;
549 }
550 
551 int
552 spdk_thread_exit(struct spdk_thread *thread)
553 {
554 	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);
555 
556 	assert(tls_thread == thread);
557 
558 	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
559 		SPDK_INFOLOG(thread,
560 			     "thread %s is already exiting\n",
561 			     thread->name);
562 		return 0;
563 	}
564 
565 	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
566 				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
567 	thread->state = SPDK_THREAD_STATE_EXITING;
568 	return 0;
569 }
570 
571 bool
572 spdk_thread_is_exited(struct spdk_thread *thread)
573 {
574 	return thread->state == SPDK_THREAD_STATE_EXITED;
575 }
576 
577 void
578 spdk_thread_destroy(struct spdk_thread *thread)
579 {
580 	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);
581 
582 	assert(thread->state == SPDK_THREAD_STATE_EXITED);
583 
584 	if (tls_thread == thread) {
585 		tls_thread = NULL;
586 	}
587 
588 	_free_thread(thread);
589 }
590 
591 void *
592 spdk_thread_get_ctx(struct spdk_thread *thread)
593 {
594 	if (g_ctx_sz > 0) {
595 		return thread->ctx;
596 	}
597 
598 	return NULL;
599 }
600 
601 struct spdk_cpuset *
602 spdk_thread_get_cpumask(struct spdk_thread *thread)
603 {
604 	return &thread->cpumask;
605 }
606 
607 int
608 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
609 {
610 	struct spdk_thread *thread;
611 
612 	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
613 		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
614 		assert(false);
615 		return -ENOTSUP;
616 	}
617 
618 	thread = spdk_get_thread();
619 	if (!thread) {
620 		SPDK_ERRLOG("Called from non-SPDK thread\n");
621 		assert(false);
622 		return -EINVAL;
623 	}
624 
625 	spdk_cpuset_copy(&thread->cpumask, cpumask);
626 
627 	/* Invoke framework's reschedule operation. If this function is called multiple times
628 	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
629 	 * reschedule operation.
630 	 */
631 	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
632 
633 	return 0;
634 }
635 
636 struct spdk_thread *
637 spdk_thread_get_from_ctx(void *ctx)
638 {
639 	if (ctx == NULL) {
640 		assert(false);
641 		return NULL;
642 	}
643 
644 	assert(g_ctx_sz > 0);
645 
646 	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
647 }
648 
649 static inline uint32_t
650 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
651 {
652 	unsigned count, i;
653 	void *messages[SPDK_MSG_BATCH_SIZE];
654 	uint64_t notify = 1;
655 	int rc;
656 
657 #ifdef DEBUG
658 	/*
659 	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
660 	 * so we will never actually read uninitialized data from events, but just to be sure
661 	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
662 	 */
663 	memset(messages, 0, sizeof(messages));
664 #endif
665 
666 	if (max_msgs > 0) {
667 		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
668 	} else {
669 		max_msgs = SPDK_MSG_BATCH_SIZE;
670 	}
671 
672 	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
673 	if (spdk_unlikely(thread->in_interrupt) &&
674 	    spdk_ring_count(thread->messages) != 0) {
675 		rc = write(thread->msg_fd, &notify, sizeof(notify));
676 		if (rc < 0) {
677 			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
678 		}
679 	}
680 	if (count == 0) {
681 		return 0;
682 	}
683 
684 	for (i = 0; i < count; i++) {
685 		struct spdk_msg *msg = messages[i];
686 
687 		assert(msg != NULL);
688 		msg->fn(msg->arg);
689 
690 		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
691 			/* Insert the messages at the head. We want to re-use the hot
692 			 * ones. */
693 			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
694 			thread->msg_cache_count++;
695 		} else {
696 			spdk_mempool_put(g_spdk_msg_mempool, msg);
697 		}
698 	}
699 
700 	return count;
701 }
702 
703 static void
704 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
705 {
706 	struct spdk_poller *tmp __attribute__((unused));
707 
708 	poller->next_run_tick = now + poller->period_ticks;
709 
710 	/*
711 	 * Insert poller in the thread's timed_pollers tree by next scheduled run time
712 	 * as its key.
713 	 */
714 	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
715 	assert(tmp == NULL);
716 
717 	/* Update the cache only if it is empty or the inserted poller is earlier than it.
718 	 * RB_MIN() is not necessary here because all pollers, which has exactly the same
719 	 * next_run_tick as the existing poller, are inserted on the right side.
720 	 */
721 	if (thread->first_timed_poller == NULL ||
722 	    poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
723 		thread->first_timed_poller = poller;
724 	}
725 }
726 
727 #ifdef __linux__
728 static inline void
729 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
730 {
731 	struct spdk_poller *tmp __attribute__((unused));
732 
733 	tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
734 	assert(tmp != NULL);
735 
736 	/* This function is not used in any case that is performance critical.
737 	 * Update the cache simply by RB_MIN() if it needs to be changed.
738 	 */
739 	if (thread->first_timed_poller == poller) {
740 		thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
741 	}
742 }
743 #endif
744 
745 static void
746 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
747 {
748 	if (poller->period_ticks) {
749 		poller_insert_timer(thread, poller, spdk_get_ticks());
750 	} else {
751 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
752 	}
753 }
754 
755 static inline void
756 thread_update_stats(struct spdk_thread *thread, uint64_t end,
757 		    uint64_t start, int rc)
758 {
759 	if (rc == 0) {
760 		/* Poller status idle */
761 		thread->stats.idle_tsc += end - start;
762 	} else if (rc > 0) {
763 		/* Poller status busy */
764 		thread->stats.busy_tsc += end - start;
765 	}
766 	/* Store end time to use it as start time of the next spdk_thread_poll(). */
767 	thread->tsc_last = end;
768 }
769 
770 static inline int
771 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
772 {
773 	int rc;
774 
775 	switch (poller->state) {
776 	case SPDK_POLLER_STATE_UNREGISTERED:
777 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
778 		free(poller);
779 		return 0;
780 	case SPDK_POLLER_STATE_PAUSING:
781 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
782 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
783 		poller->state = SPDK_POLLER_STATE_PAUSED;
784 		return 0;
785 	case SPDK_POLLER_STATE_WAITING:
786 		break;
787 	default:
788 		assert(false);
789 		break;
790 	}
791 
792 	poller->state = SPDK_POLLER_STATE_RUNNING;
793 	rc = poller->fn(poller->arg);
794 
795 	poller->run_count++;
796 	if (rc > 0) {
797 		poller->busy_count++;
798 	}
799 
800 #ifdef DEBUG
801 	if (rc == -1) {
802 		SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
803 	}
804 #endif
805 
806 	switch (poller->state) {
807 	case SPDK_POLLER_STATE_UNREGISTERED:
808 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
809 		free(poller);
810 		break;
811 	case SPDK_POLLER_STATE_PAUSING:
812 		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
813 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
814 		poller->state = SPDK_POLLER_STATE_PAUSED;
815 		break;
816 	case SPDK_POLLER_STATE_PAUSED:
817 	case SPDK_POLLER_STATE_WAITING:
818 		break;
819 	case SPDK_POLLER_STATE_RUNNING:
820 		poller->state = SPDK_POLLER_STATE_WAITING;
821 		break;
822 	default:
823 		assert(false);
824 		break;
825 	}
826 
827 	return rc;
828 }
829 
830 static inline int
831 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
832 			    uint64_t now)
833 {
834 	int rc;
835 
836 	switch (poller->state) {
837 	case SPDK_POLLER_STATE_UNREGISTERED:
838 		free(poller);
839 		return 0;
840 	case SPDK_POLLER_STATE_PAUSING:
841 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
842 		poller->state = SPDK_POLLER_STATE_PAUSED;
843 		return 0;
844 	case SPDK_POLLER_STATE_WAITING:
845 		break;
846 	default:
847 		assert(false);
848 		break;
849 	}
850 
851 	poller->state = SPDK_POLLER_STATE_RUNNING;
852 	rc = poller->fn(poller->arg);
853 
854 	poller->run_count++;
855 	if (rc > 0) {
856 		poller->busy_count++;
857 	}
858 
859 #ifdef DEBUG
860 	if (rc == -1) {
861 		SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
862 	}
863 #endif
864 
865 	switch (poller->state) {
866 	case SPDK_POLLER_STATE_UNREGISTERED:
867 		free(poller);
868 		break;
869 	case SPDK_POLLER_STATE_PAUSING:
870 		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
871 		poller->state = SPDK_POLLER_STATE_PAUSED;
872 		break;
873 	case SPDK_POLLER_STATE_PAUSED:
874 		break;
875 	case SPDK_POLLER_STATE_RUNNING:
876 		poller->state = SPDK_POLLER_STATE_WAITING;
877 	/* fallthrough */
878 	case SPDK_POLLER_STATE_WAITING:
879 		poller_insert_timer(thread, poller, now);
880 		break;
881 	default:
882 		assert(false);
883 		break;
884 	}
885 
886 	return rc;
887 }
888 
889 static int
890 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
891 {
892 	uint32_t msg_count;
893 	struct spdk_poller *poller, *tmp;
894 	spdk_msg_fn critical_msg;
895 	int rc = 0;
896 
897 	thread->tsc_last = now;
898 
899 	critical_msg = thread->critical_msg;
900 	if (spdk_unlikely(critical_msg != NULL)) {
901 		critical_msg(NULL);
902 		thread->critical_msg = NULL;
903 		rc = 1;
904 	}
905 
906 	msg_count = msg_queue_run_batch(thread, max_msgs);
907 	if (msg_count) {
908 		rc = 1;
909 	}
910 
911 	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
912 				   active_pollers_head, tailq, tmp) {
913 		int poller_rc;
914 
915 		poller_rc = thread_execute_poller(thread, poller);
916 		if (poller_rc > rc) {
917 			rc = poller_rc;
918 		}
919 	}
920 
921 	poller = thread->first_timed_poller;
922 	while (poller != NULL) {
923 		int timer_rc = 0;
924 
925 		if (now < poller->next_run_tick) {
926 			break;
927 		}
928 
929 		tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
930 		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
931 
932 		/* Update the cache to the next timed poller in the list
933 		 * only if the current poller is still the closest, otherwise,
934 		 * do nothing because the cache has been already updated.
935 		 */
936 		if (thread->first_timed_poller == poller) {
937 			thread->first_timed_poller = tmp;
938 		}
939 
940 		timer_rc = thread_execute_timed_poller(thread, poller, now);
941 		if (timer_rc > rc) {
942 			rc = timer_rc;
943 		}
944 
945 		poller = tmp;
946 	}
947 
948 	return rc;
949 }
950 
951 int
952 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
953 {
954 	struct spdk_thread *orig_thread;
955 	int rc;
956 	uint64_t notify = 1;
957 
958 	orig_thread = _get_thread();
959 	tls_thread = thread;
960 
961 	if (now == 0) {
962 		now = spdk_get_ticks();
963 	}
964 
965 	if (spdk_likely(!thread->in_interrupt)) {
966 		rc = thread_poll(thread, max_msgs, now);
967 		if (spdk_unlikely(thread->in_interrupt)) {
968 			/* The thread transitioned to interrupt mode during the above poll.
969 			 * Poll it one more time in case that during the transition time
970 			 * there is msg received without notification.
971 			 */
972 			rc = thread_poll(thread, max_msgs, now);
973 		}
974 	} else {
975 		/* Non-block wait on thread's fd_group */
976 		rc = spdk_fd_group_wait(thread->fgrp, 0);
977 		if (spdk_unlikely(!thread->in_interrupt)) {
978 			/* The thread transitioned to poll mode in a msg during the above processing.
979 			 * Clear msg_fd since thread messages will be polled directly in poll mode.
980 			 */
981 			rc = read(thread->msg_fd, &notify, sizeof(notify));
982 			if (rc < 0 && errno != EAGAIN) {
983 				SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
984 			}
985 		}
986 	}
987 
988 
989 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
990 		thread_exit(thread, now);
991 	}
992 
993 	thread_update_stats(thread, spdk_get_ticks(), now, rc);
994 
995 	tls_thread = orig_thread;
996 
997 	return rc;
998 }
999 
1000 uint64_t
1001 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
1002 {
1003 	struct spdk_poller *poller;
1004 
1005 	poller = thread->first_timed_poller;
1006 	if (poller) {
1007 		return poller->next_run_tick;
1008 	}
1009 
1010 	return 0;
1011 }
1012 
1013 int
1014 spdk_thread_has_active_pollers(struct spdk_thread *thread)
1015 {
1016 	return !TAILQ_EMPTY(&thread->active_pollers);
1017 }
1018 
1019 static bool
1020 thread_has_unpaused_pollers(struct spdk_thread *thread)
1021 {
1022 	if (TAILQ_EMPTY(&thread->active_pollers) &&
1023 	    RB_EMPTY(&thread->timed_pollers)) {
1024 		return false;
1025 	}
1026 
1027 	return true;
1028 }
1029 
1030 bool
1031 spdk_thread_has_pollers(struct spdk_thread *thread)
1032 {
1033 	if (!thread_has_unpaused_pollers(thread) &&
1034 	    TAILQ_EMPTY(&thread->paused_pollers)) {
1035 		return false;
1036 	}
1037 
1038 	return true;
1039 }
1040 
1041 bool
1042 spdk_thread_is_idle(struct spdk_thread *thread)
1043 {
1044 	if (spdk_ring_count(thread->messages) ||
1045 	    thread_has_unpaused_pollers(thread) ||
1046 	    thread->critical_msg != NULL) {
1047 		return false;
1048 	}
1049 
1050 	return true;
1051 }
1052 
1053 uint32_t
1054 spdk_thread_get_count(void)
1055 {
1056 	/*
1057 	 * Return cached value of the current thread count.  We could acquire the
1058 	 *  lock and iterate through the TAILQ of threads to count them, but that
1059 	 *  count could still be invalidated after we release the lock.
1060 	 */
1061 	return g_thread_count;
1062 }
1063 
1064 struct spdk_thread *
1065 spdk_get_thread(void)
1066 {
1067 	return _get_thread();
1068 }
1069 
1070 const char *
1071 spdk_thread_get_name(const struct spdk_thread *thread)
1072 {
1073 	return thread->name;
1074 }
1075 
1076 uint64_t
1077 spdk_thread_get_id(const struct spdk_thread *thread)
1078 {
1079 	return thread->id;
1080 }
1081 
1082 struct spdk_thread *
1083 spdk_thread_get_by_id(uint64_t id)
1084 {
1085 	struct spdk_thread *thread;
1086 
1087 	if (id == 0 || id >= g_thread_id) {
1088 		SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
1089 		return NULL;
1090 	}
1091 	pthread_mutex_lock(&g_devlist_mutex);
1092 	TAILQ_FOREACH(thread, &g_threads, tailq) {
1093 		if (thread->id == id) {
1094 			break;
1095 		}
1096 	}
1097 	pthread_mutex_unlock(&g_devlist_mutex);
1098 	return thread;
1099 }
1100 
1101 int
1102 spdk_thread_get_stats(struct spdk_thread_stats *stats)
1103 {
1104 	struct spdk_thread *thread;
1105 
1106 	thread = _get_thread();
1107 	if (!thread) {
1108 		SPDK_ERRLOG("No thread allocated\n");
1109 		return -EINVAL;
1110 	}
1111 
1112 	if (stats == NULL) {
1113 		return -EINVAL;
1114 	}
1115 
1116 	*stats = thread->stats;
1117 
1118 	return 0;
1119 }
1120 
1121 uint64_t
1122 spdk_thread_get_last_tsc(struct spdk_thread *thread)
1123 {
1124 	if (thread == NULL) {
1125 		thread = _get_thread();
1126 	}
1127 
1128 	return thread->tsc_last;
1129 }
1130 
1131 static inline int
1132 thread_send_msg_notification(const struct spdk_thread *target_thread)
1133 {
1134 	uint64_t notify = 1;
1135 	int rc;
1136 
1137 	/* Not necessary to do notification if interrupt facility is not enabled */
1138 	if (spdk_likely(!spdk_interrupt_mode_is_enabled())) {
1139 		return 0;
1140 	}
1141 
1142 	/* When each spdk_thread can switch between poll and interrupt mode dynamically,
1143 	 * after sending thread msg, it is necessary to check whether target thread runs in
1144 	 * interrupt mode and then decide whether do event notification.
1145 	 */
1146 	if (spdk_unlikely(target_thread->in_interrupt)) {
1147 		rc = write(target_thread->msg_fd, &notify, sizeof(notify));
1148 		if (rc < 0) {
1149 			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
1150 			return -EIO;
1151 		}
1152 	}
1153 
1154 	return 0;
1155 }
1156 
1157 int
1158 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
1159 {
1160 	struct spdk_thread *local_thread;
1161 	struct spdk_msg *msg;
1162 	int rc;
1163 
1164 	assert(thread != NULL);
1165 
1166 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
1167 		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
1168 		return -EIO;
1169 	}
1170 
1171 	local_thread = _get_thread();
1172 
1173 	msg = NULL;
1174 	if (local_thread != NULL) {
1175 		if (local_thread->msg_cache_count > 0) {
1176 			msg = SLIST_FIRST(&local_thread->msg_cache);
1177 			assert(msg != NULL);
1178 			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
1179 			local_thread->msg_cache_count--;
1180 		}
1181 	}
1182 
1183 	if (msg == NULL) {
1184 		msg = spdk_mempool_get(g_spdk_msg_mempool);
1185 		if (!msg) {
1186 			SPDK_ERRLOG("msg could not be allocated\n");
1187 			return -ENOMEM;
1188 		}
1189 	}
1190 
1191 	msg->fn = fn;
1192 	msg->arg = ctx;
1193 
1194 	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
1195 	if (rc != 1) {
1196 		SPDK_ERRLOG("msg could not be enqueued\n");
1197 		spdk_mempool_put(g_spdk_msg_mempool, msg);
1198 		return -EIO;
1199 	}
1200 
1201 	return thread_send_msg_notification(thread);
1202 }
1203 
1204 int
1205 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
1206 {
1207 	spdk_msg_fn expected = NULL;
1208 
1209 	if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
1210 					 __ATOMIC_SEQ_CST)) {
1211 		return -EIO;
1212 	}
1213 
1214 	return thread_send_msg_notification(thread);
1215 }
1216 
1217 #ifdef __linux__
1218 static int
1219 interrupt_timerfd_process(void *arg)
1220 {
1221 	struct spdk_poller *poller = arg;
1222 	uint64_t exp;
1223 	int rc;
1224 
1225 	/* clear the level of interval timer */
1226 	rc = read(poller->interruptfd, &exp, sizeof(exp));
1227 	if (rc < 0) {
1228 		if (rc == -EAGAIN) {
1229 			return 0;
1230 		}
1231 
1232 		return rc;
1233 	}
1234 
1235 	return poller->fn(poller->arg);
1236 }
1237 
1238 static int
1239 period_poller_interrupt_init(struct spdk_poller *poller)
1240 {
1241 	struct spdk_fd_group *fgrp = poller->thread->fgrp;
1242 	int timerfd;
1243 	int rc;
1244 
1245 	SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
1246 	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
1247 	if (timerfd < 0) {
1248 		return -errno;
1249 	}
1250 
1251 	rc = spdk_fd_group_add(fgrp, timerfd,
1252 			       interrupt_timerfd_process, poller);
1253 	if (rc < 0) {
1254 		close(timerfd);
1255 		return rc;
1256 	}
1257 
1258 	poller->interruptfd = timerfd;
1259 	return 0;
1260 }
1261 
1262 static void
1263 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1264 {
1265 	int timerfd = poller->interruptfd;
1266 	uint64_t now_tick = spdk_get_ticks();
1267 	uint64_t ticks = spdk_get_ticks_hz();
1268 	int ret;
1269 	struct itimerspec new_tv = {};
1270 	struct itimerspec old_tv = {};
1271 
1272 	assert(poller->period_ticks != 0);
1273 	assert(timerfd >= 0);
1274 
1275 	SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
1276 		      interrupt_mode ? "interrupt" : "poll");
1277 
1278 	if (interrupt_mode) {
1279 		/* Set repeated timer expiration */
1280 		new_tv.it_interval.tv_sec = poller->period_ticks / ticks;
1281 		new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks;
1282 
1283 		/* Update next timer expiration */
1284 		if (poller->next_run_tick == 0) {
1285 			poller->next_run_tick = now_tick + poller->period_ticks;
1286 		} else if (poller->next_run_tick < now_tick) {
1287 			poller->next_run_tick = now_tick;
1288 		}
1289 
1290 		new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks;
1291 		new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks;
1292 
1293 		ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
1294 		if (ret < 0) {
1295 			SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno);
1296 			assert(false);
1297 		}
1298 	} else {
1299 		/* Disarm the timer */
1300 		ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv);
1301 		if (ret < 0) {
1302 			/* timerfd_settime's failure indicates that the timerfd is in error */
1303 			SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno);
1304 			assert(false);
1305 		}
1306 
1307 		/* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be
1308 		 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC
1309 		 */
1310 		now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \
1311 			   (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC;
1312 		poller_remove_timer(poller->thread, poller);
1313 		poller_insert_timer(poller->thread, poller, now_tick);
1314 	}
1315 }
1316 
1317 static void
1318 poller_interrupt_fini(struct spdk_poller *poller)
1319 {
1320 	SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name);
1321 	assert(poller->interruptfd >= 0);
1322 	spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd);
1323 	close(poller->interruptfd);
1324 	poller->interruptfd = -1;
1325 }
1326 
1327 static int
1328 busy_poller_interrupt_init(struct spdk_poller *poller)
1329 {
1330 	int busy_efd;
1331 	int rc;
1332 
1333 	SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name);
1334 	busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1335 	if (busy_efd < 0) {
1336 		SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name);
1337 		return -errno;
1338 	}
1339 
1340 	rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, poller->fn, poller->arg);
1341 	if (rc < 0) {
1342 		close(busy_efd);
1343 		return rc;
1344 	}
1345 
1346 	poller->interruptfd = busy_efd;
1347 	return 0;
1348 }
1349 
1350 static void
1351 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1352 {
1353 	int busy_efd = poller->interruptfd;
1354 	uint64_t notify = 1;
1355 	int rc __attribute__((unused));
1356 
1357 	assert(busy_efd >= 0);
1358 
1359 	if (interrupt_mode) {
1360 		/* Write without read on eventfd will get it repeatedly triggered. */
1361 		if (write(busy_efd, &notify, sizeof(notify)) < 0) {
1362 			SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name);
1363 		}
1364 	} else {
1365 		/* Read on eventfd will clear its level triggering. */
1366 		rc = read(busy_efd, &notify, sizeof(notify));
1367 	}
1368 }
1369 
1370 #else
1371 
1372 static int
1373 period_poller_interrupt_init(struct spdk_poller *poller)
1374 {
1375 	return -ENOTSUP;
1376 }
1377 
1378 static void
1379 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1380 {
1381 }
1382 
1383 static void
1384 poller_interrupt_fini(struct spdk_poller *poller)
1385 {
1386 }
1387 
1388 static int
1389 busy_poller_interrupt_init(struct spdk_poller *poller)
1390 {
1391 	return -ENOTSUP;
1392 }
1393 
1394 static void
1395 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1396 {
1397 }
1398 
1399 #endif
1400 
1401 void
1402 spdk_poller_register_interrupt(struct spdk_poller *poller,
1403 			       spdk_poller_set_interrupt_mode_cb cb_fn,
1404 			       void *cb_arg)
1405 {
1406 	assert(poller != NULL);
1407 	assert(cb_fn != NULL);
1408 	assert(spdk_get_thread() == poller->thread);
1409 
1410 	if (!spdk_interrupt_mode_is_enabled()) {
1411 		return;
1412 	}
1413 
1414 	/* when a poller is created we don't know if the user is ever going to
1415 	 * enable interrupts on it by calling this function, so the poller
1416 	 * registration function has to immediately create a interruptfd.
1417 	 * When this function does get called by user, we have to then destroy
1418 	 * that interruptfd.
1419 	 */
1420 	if (poller->set_intr_cb_fn && poller->interruptfd >= 0) {
1421 		poller_interrupt_fini(poller);
1422 	}
1423 
1424 	poller->set_intr_cb_fn = cb_fn;
1425 	poller->set_intr_cb_arg = cb_arg;
1426 
1427 	/* Set poller into interrupt mode if thread is in interrupt. */
1428 	if (poller->thread->in_interrupt) {
1429 		poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
1430 	}
1431 }
1432 
1433 static uint64_t
1434 convert_us_to_ticks(uint64_t us)
1435 {
1436 	uint64_t quotient, remainder, ticks;
1437 
1438 	if (us) {
1439 		quotient = us / SPDK_SEC_TO_USEC;
1440 		remainder = us % SPDK_SEC_TO_USEC;
1441 		ticks = spdk_get_ticks_hz();
1442 
1443 		return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
1444 	} else {
1445 		return 0;
1446 	}
1447 }
1448 
1449 static struct spdk_poller *
1450 poller_register(spdk_poller_fn fn,
1451 		void *arg,
1452 		uint64_t period_microseconds,
1453 		const char *name)
1454 {
1455 	struct spdk_thread *thread;
1456 	struct spdk_poller *poller;
1457 
1458 	thread = spdk_get_thread();
1459 	if (!thread) {
1460 		assert(false);
1461 		return NULL;
1462 	}
1463 
1464 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
1465 		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
1466 		return NULL;
1467 	}
1468 
1469 	poller = calloc(1, sizeof(*poller));
1470 	if (poller == NULL) {
1471 		SPDK_ERRLOG("Poller memory allocation failed\n");
1472 		return NULL;
1473 	}
1474 
1475 	if (name) {
1476 		snprintf(poller->name, sizeof(poller->name), "%s", name);
1477 	} else {
1478 		snprintf(poller->name, sizeof(poller->name), "%p", fn);
1479 	}
1480 
1481 	poller->state = SPDK_POLLER_STATE_WAITING;
1482 	poller->fn = fn;
1483 	poller->arg = arg;
1484 	poller->thread = thread;
1485 	poller->interruptfd = -1;
1486 
1487 	poller->period_ticks = convert_us_to_ticks(period_microseconds);
1488 
1489 	if (spdk_interrupt_mode_is_enabled()) {
1490 		int rc;
1491 
1492 		if (period_microseconds) {
1493 			rc = period_poller_interrupt_init(poller);
1494 			if (rc < 0) {
1495 				SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
1496 				free(poller);
1497 				return NULL;
1498 			}
1499 
1500 			spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL);
1501 		} else {
1502 			/* If the poller doesn't have a period, create interruptfd that's always
1503 			 * busy automatically when runnning in interrupt mode.
1504 			 */
1505 			rc = busy_poller_interrupt_init(poller);
1506 			if (rc > 0) {
1507 				SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
1508 				free(poller);
1509 				return NULL;
1510 			}
1511 
1512 			spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL);
1513 		}
1514 	}
1515 
1516 	thread_insert_poller(thread, poller);
1517 
1518 	return poller;
1519 }
1520 
1521 struct spdk_poller *
1522 spdk_poller_register(spdk_poller_fn fn,
1523 		     void *arg,
1524 		     uint64_t period_microseconds)
1525 {
1526 	return poller_register(fn, arg, period_microseconds, NULL);
1527 }
1528 
1529 struct spdk_poller *
1530 spdk_poller_register_named(spdk_poller_fn fn,
1531 			   void *arg,
1532 			   uint64_t period_microseconds,
1533 			   const char *name)
1534 {
1535 	return poller_register(fn, arg, period_microseconds, name);
1536 }
1537 
1538 void
1539 spdk_poller_unregister(struct spdk_poller **ppoller)
1540 {
1541 	struct spdk_thread *thread;
1542 	struct spdk_poller *poller;
1543 
1544 	poller = *ppoller;
1545 	if (poller == NULL) {
1546 		return;
1547 	}
1548 
1549 	*ppoller = NULL;
1550 
1551 	thread = spdk_get_thread();
1552 	if (!thread) {
1553 		assert(false);
1554 		return;
1555 	}
1556 
1557 	if (poller->thread != thread) {
1558 		SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n");
1559 		assert(false);
1560 		return;
1561 	}
1562 
1563 	if (spdk_interrupt_mode_is_enabled() && poller->interruptfd >= 0) {
1564 		poller_interrupt_fini(poller);
1565 	}
1566 
1567 	/* If the poller was paused, put it on the active_pollers list so that
1568 	 * its unregistration can be processed by spdk_thread_poll().
1569 	 */
1570 	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
1571 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
1572 		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
1573 		poller->period_ticks = 0;
1574 	}
1575 
1576 	/* Simply set the state to unregistered. The poller will get cleaned up
1577 	 * in a subsequent call to spdk_thread_poll().
1578 	 */
1579 	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
1580 }
1581 
1582 void
1583 spdk_poller_pause(struct spdk_poller *poller)
1584 {
1585 	struct spdk_thread *thread;
1586 
1587 	thread = spdk_get_thread();
1588 	if (!thread) {
1589 		assert(false);
1590 		return;
1591 	}
1592 
1593 	if (poller->thread != thread) {
1594 		SPDK_ERRLOG("different from the thread that called spdk_poller_pause()\n");
1595 		assert(false);
1596 		return;
1597 	}
1598 
1599 	/* We just set its state to SPDK_POLLER_STATE_PAUSING and let
1600 	 * spdk_thread_poll() move it. It allows a poller to be paused from
1601 	 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE
1602 	 * iteration, or from within itself without breaking the logic to always
1603 	 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration.
1604 	 */
1605 	switch (poller->state) {
1606 	case SPDK_POLLER_STATE_PAUSED:
1607 	case SPDK_POLLER_STATE_PAUSING:
1608 		break;
1609 	case SPDK_POLLER_STATE_RUNNING:
1610 	case SPDK_POLLER_STATE_WAITING:
1611 		poller->state = SPDK_POLLER_STATE_PAUSING;
1612 		break;
1613 	default:
1614 		assert(false);
1615 		break;
1616 	}
1617 }
1618 
1619 void
1620 spdk_poller_resume(struct spdk_poller *poller)
1621 {
1622 	struct spdk_thread *thread;
1623 
1624 	thread = spdk_get_thread();
1625 	if (!thread) {
1626 		assert(false);
1627 		return;
1628 	}
1629 
1630 	if (poller->thread != thread) {
1631 		SPDK_ERRLOG("different from the thread that called spdk_poller_resume()\n");
1632 		assert(false);
1633 		return;
1634 	}
1635 
1636 	/* If a poller is paused it has to be removed from the paused pollers
1637 	 * list and put on the active list or timer tree depending on its
1638 	 * period_ticks.  If a poller is still in the process of being paused,
1639 	 * we just need to flip its state back to waiting, as it's already on
1640 	 * the appropriate list or tree.
1641 	 */
1642 	switch (poller->state) {
1643 	case SPDK_POLLER_STATE_PAUSED:
1644 		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
1645 		thread_insert_poller(thread, poller);
1646 	/* fallthrough */
1647 	case SPDK_POLLER_STATE_PAUSING:
1648 		poller->state = SPDK_POLLER_STATE_WAITING;
1649 		break;
1650 	case SPDK_POLLER_STATE_RUNNING:
1651 	case SPDK_POLLER_STATE_WAITING:
1652 		break;
1653 	default:
1654 		assert(false);
1655 		break;
1656 	}
1657 }
1658 
1659 const char *
1660 spdk_poller_get_name(struct spdk_poller *poller)
1661 {
1662 	return poller->name;
1663 }
1664 
1665 const char *
1666 spdk_poller_get_state_str(struct spdk_poller *poller)
1667 {
1668 	switch (poller->state) {
1669 	case SPDK_POLLER_STATE_WAITING:
1670 		return "waiting";
1671 	case SPDK_POLLER_STATE_RUNNING:
1672 		return "running";
1673 	case SPDK_POLLER_STATE_UNREGISTERED:
1674 		return "unregistered";
1675 	case SPDK_POLLER_STATE_PAUSING:
1676 		return "pausing";
1677 	case SPDK_POLLER_STATE_PAUSED:
1678 		return "paused";
1679 	default:
1680 		return NULL;
1681 	}
1682 }
1683 
1684 uint64_t
1685 spdk_poller_get_period_ticks(struct spdk_poller *poller)
1686 {
1687 	return poller->period_ticks;
1688 }
1689 
1690 void
1691 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats)
1692 {
1693 	stats->run_count = poller->run_count;
1694 	stats->busy_count = poller->busy_count;
1695 }
1696 
1697 struct spdk_poller *
1698 spdk_thread_get_first_active_poller(struct spdk_thread *thread)
1699 {
1700 	return TAILQ_FIRST(&thread->active_pollers);
1701 }
1702 
1703 struct spdk_poller *
1704 spdk_thread_get_next_active_poller(struct spdk_poller *prev)
1705 {
1706 	return TAILQ_NEXT(prev, tailq);
1707 }
1708 
1709 struct spdk_poller *
1710 spdk_thread_get_first_timed_poller(struct spdk_thread *thread)
1711 {
1712 	return RB_MIN(timed_pollers_tree, &thread->timed_pollers);
1713 }
1714 
1715 struct spdk_poller *
1716 spdk_thread_get_next_timed_poller(struct spdk_poller *prev)
1717 {
1718 	return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev);
1719 }
1720 
1721 struct spdk_poller *
1722 spdk_thread_get_first_paused_poller(struct spdk_thread *thread)
1723 {
1724 	return TAILQ_FIRST(&thread->paused_pollers);
1725 }
1726 
1727 struct spdk_poller *
1728 spdk_thread_get_next_paused_poller(struct spdk_poller *prev)
1729 {
1730 	return TAILQ_NEXT(prev, tailq);
1731 }
1732 
1733 struct spdk_io_channel *
1734 spdk_thread_get_first_io_channel(struct spdk_thread *thread)
1735 {
1736 	return TAILQ_FIRST(&thread->io_channels);
1737 }
1738 
1739 struct spdk_io_channel *
1740 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev)
1741 {
1742 	return TAILQ_NEXT(prev, tailq);
1743 }
1744 
1745 struct call_thread {
1746 	struct spdk_thread *cur_thread;
1747 	spdk_msg_fn fn;
1748 	void *ctx;
1749 
1750 	struct spdk_thread *orig_thread;
1751 	spdk_msg_fn cpl;
1752 };
1753 
1754 static void
1755 _on_thread(void *ctx)
1756 {
1757 	struct call_thread *ct = ctx;
1758 	int rc __attribute__((unused));
1759 
1760 	ct->fn(ct->ctx);
1761 
1762 	pthread_mutex_lock(&g_devlist_mutex);
1763 	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
1764 	pthread_mutex_unlock(&g_devlist_mutex);
1765 
1766 	if (!ct->cur_thread) {
1767 		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");
1768 
1769 		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
1770 		free(ctx);
1771 	} else {
1772 		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
1773 			      ct->cur_thread->name);
1774 
1775 		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
1776 	}
1777 	assert(rc == 0);
1778 }
1779 
1780 void
1781 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
1782 {
1783 	struct call_thread *ct;
1784 	struct spdk_thread *thread;
1785 	int rc __attribute__((unused));
1786 
1787 	ct = calloc(1, sizeof(*ct));
1788 	if (!ct) {
1789 		SPDK_ERRLOG("Unable to perform thread iteration\n");
1790 		cpl(ctx);
1791 		return;
1792 	}
1793 
1794 	ct->fn = fn;
1795 	ct->ctx = ctx;
1796 	ct->cpl = cpl;
1797 
1798 	thread = _get_thread();
1799 	if (!thread) {
1800 		SPDK_ERRLOG("No thread allocated\n");
1801 		free(ct);
1802 		cpl(ctx);
1803 		return;
1804 	}
1805 	ct->orig_thread = thread;
1806 
1807 	pthread_mutex_lock(&g_devlist_mutex);
1808 	ct->cur_thread = TAILQ_FIRST(&g_threads);
1809 	pthread_mutex_unlock(&g_devlist_mutex);
1810 
1811 	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
1812 		      ct->orig_thread->name);
1813 
1814 	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
1815 	assert(rc == 0);
1816 }
1817 
1818 static inline void
1819 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode)
1820 {
1821 	if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
1822 		return;
1823 	}
1824 
1825 	if (!poller->set_intr_cb_fn) {
1826 		SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name);
1827 		assert(false);
1828 		return;
1829 	}
1830 
1831 	poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode);
1832 }
1833 
1834 void
1835 spdk_thread_set_interrupt_mode(bool enable_interrupt)
1836 {
1837 	struct spdk_thread *thread = _get_thread();
1838 	struct spdk_poller *poller, *tmp;
1839 
1840 	assert(thread);
1841 	assert(spdk_interrupt_mode_is_enabled());
1842 
1843 	if (thread->in_interrupt == enable_interrupt) {
1844 		return;
1845 	}
1846 
1847 	/* Set pollers to expected mode */
1848 	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
1849 		poller_set_interrupt_mode(poller, enable_interrupt);
1850 	}
1851 	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) {
1852 		poller_set_interrupt_mode(poller, enable_interrupt);
1853 	}
1854 	/* All paused pollers will go to work in interrupt mode */
1855 	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) {
1856 		poller_set_interrupt_mode(poller, enable_interrupt);
1857 	}
1858 
1859 	thread->in_interrupt = enable_interrupt;
1860 	return;
1861 }
1862 
1863 void
1864 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
1865 			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
1866 			const char *name)
1867 {
1868 	struct io_device *dev, *tmp;
1869 	struct spdk_thread *thread;
1870 
1871 	assert(io_device != NULL);
1872 	assert(create_cb != NULL);
1873 	assert(destroy_cb != NULL);
1874 
1875 	thread = spdk_get_thread();
1876 	if (!thread) {
1877 		SPDK_ERRLOG("called from non-SPDK thread\n");
1878 		assert(false);
1879 		return;
1880 	}
1881 
1882 	dev = calloc(1, sizeof(struct io_device));
1883 	if (dev == NULL) {
1884 		SPDK_ERRLOG("could not allocate io_device\n");
1885 		return;
1886 	}
1887 
1888 	dev->io_device = io_device;
1889 	if (name) {
1890 		snprintf(dev->name, sizeof(dev->name), "%s", name);
1891 	} else {
1892 		snprintf(dev->name, sizeof(dev->name), "%p", dev);
1893 	}
1894 	dev->create_cb = create_cb;
1895 	dev->destroy_cb = destroy_cb;
1896 	dev->unregister_cb = NULL;
1897 	dev->ctx_size = ctx_size;
1898 	dev->for_each_count = 0;
1899 	dev->unregistered = false;
1900 	dev->refcnt = 0;
1901 
1902 	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
1903 		      dev->name, dev->io_device, thread->name);
1904 
1905 	pthread_mutex_lock(&g_devlist_mutex);
1906 	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
1907 		if (tmp->io_device == io_device) {
1908 			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
1909 				    io_device, tmp->name, dev->name);
1910 			free(dev);
1911 			pthread_mutex_unlock(&g_devlist_mutex);
1912 			return;
1913 		}
1914 	}
1915 	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
1916 	pthread_mutex_unlock(&g_devlist_mutex);
1917 }
1918 
1919 static void
1920 _finish_unregister(void *arg)
1921 {
1922 	struct io_device *dev = arg;
1923 	struct spdk_thread *thread;
1924 
1925 	thread = spdk_get_thread();
1926 	assert(thread == dev->unregister_thread);
1927 
1928 	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
1929 		      dev->name, dev->io_device, thread->name);
1930 
1931 	assert(thread->pending_unregister_count > 0);
1932 	thread->pending_unregister_count--;
1933 
1934 	dev->unregister_cb(dev->io_device);
1935 	free(dev);
1936 }
1937 
1938 static void
1939 io_device_free(struct io_device *dev)
1940 {
1941 	int rc __attribute__((unused));
1942 
1943 	if (dev->unregister_cb == NULL) {
1944 		free(dev);
1945 	} else {
1946 		assert(dev->unregister_thread != NULL);
1947 		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
1948 			      dev->name, dev->io_device, dev->unregister_thread->name);
1949 		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
1950 		assert(rc == 0);
1951 	}
1952 }
1953 
1954 void
1955 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
1956 {
1957 	struct io_device *dev;
1958 	uint32_t refcnt;
1959 	struct spdk_thread *thread;
1960 
1961 	thread = spdk_get_thread();
1962 	if (!thread) {
1963 		SPDK_ERRLOG("called from non-SPDK thread\n");
1964 		assert(false);
1965 		return;
1966 	}
1967 
1968 	pthread_mutex_lock(&g_devlist_mutex);
1969 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
1970 		if (dev->io_device == io_device) {
1971 			break;
1972 		}
1973 	}
1974 
1975 	if (!dev) {
1976 		SPDK_ERRLOG("io_device %p not found\n", io_device);
1977 		assert(false);
1978 		pthread_mutex_unlock(&g_devlist_mutex);
1979 		return;
1980 	}
1981 
1982 	if (dev->for_each_count > 0) {
1983 		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
1984 			    dev->name, io_device, dev->for_each_count);
1985 		pthread_mutex_unlock(&g_devlist_mutex);
1986 		return;
1987 	}
1988 
1989 	dev->unregister_cb = unregister_cb;
1990 	dev->unregistered = true;
1991 	TAILQ_REMOVE(&g_io_devices, dev, tailq);
1992 	refcnt = dev->refcnt;
1993 	dev->unregister_thread = thread;
1994 	pthread_mutex_unlock(&g_devlist_mutex);
1995 
1996 	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
1997 		      dev->name, dev->io_device, thread->name);
1998 
1999 	if (unregister_cb) {
2000 		thread->pending_unregister_count++;
2001 	}
2002 
2003 	if (refcnt > 0) {
2004 		/* defer deletion */
2005 		return;
2006 	}
2007 
2008 	io_device_free(dev);
2009 }
2010 
2011 const char *
2012 spdk_io_device_get_name(struct io_device *dev)
2013 {
2014 	return dev->name;
2015 }
2016 
2017 struct spdk_io_channel *
2018 spdk_get_io_channel(void *io_device)
2019 {
2020 	struct spdk_io_channel *ch;
2021 	struct spdk_thread *thread;
2022 	struct io_device *dev;
2023 	int rc;
2024 
2025 	pthread_mutex_lock(&g_devlist_mutex);
2026 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
2027 		if (dev->io_device == io_device) {
2028 			break;
2029 		}
2030 	}
2031 	if (dev == NULL) {
2032 		SPDK_ERRLOG("could not find io_device %p\n", io_device);
2033 		pthread_mutex_unlock(&g_devlist_mutex);
2034 		return NULL;
2035 	}
2036 
2037 	thread = _get_thread();
2038 	if (!thread) {
2039 		SPDK_ERRLOG("No thread allocated\n");
2040 		pthread_mutex_unlock(&g_devlist_mutex);
2041 		return NULL;
2042 	}
2043 
2044 	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
2045 		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
2046 		pthread_mutex_unlock(&g_devlist_mutex);
2047 		return NULL;
2048 	}
2049 
2050 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
2051 		if (ch->dev == dev) {
2052 			ch->ref++;
2053 
2054 			SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2055 				      ch, dev->name, dev->io_device, thread->name, ch->ref);
2056 
2057 			/*
2058 			 * An I/O channel already exists for this device on this
2059 			 *  thread, so return it.
2060 			 */
2061 			pthread_mutex_unlock(&g_devlist_mutex);
2062 			spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0,
2063 					  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
2064 			return ch;
2065 		}
2066 	}
2067 
2068 	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
2069 	if (ch == NULL) {
2070 		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
2071 		pthread_mutex_unlock(&g_devlist_mutex);
2072 		return NULL;
2073 	}
2074 
2075 	ch->dev = dev;
2076 	ch->destroy_cb = dev->destroy_cb;
2077 	ch->thread = thread;
2078 	ch->ref = 1;
2079 	ch->destroy_ref = 0;
2080 	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);
2081 
2082 	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2083 		      ch, dev->name, dev->io_device, thread->name, ch->ref);
2084 
2085 	dev->refcnt++;
2086 
2087 	pthread_mutex_unlock(&g_devlist_mutex);
2088 
2089 	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
2090 	if (rc != 0) {
2091 		pthread_mutex_lock(&g_devlist_mutex);
2092 		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
2093 		dev->refcnt--;
2094 		free(ch);
2095 		pthread_mutex_unlock(&g_devlist_mutex);
2096 		return NULL;
2097 	}
2098 
2099 	spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1);
2100 	return ch;
2101 }
2102 
2103 static void
2104 put_io_channel(void *arg)
2105 {
2106 	struct spdk_io_channel *ch = arg;
2107 	bool do_remove_dev = true;
2108 	struct spdk_thread *thread;
2109 
2110 	thread = spdk_get_thread();
2111 	if (!thread) {
2112 		SPDK_ERRLOG("called from non-SPDK thread\n");
2113 		assert(false);
2114 		return;
2115 	}
2116 
2117 	SPDK_DEBUGLOG(thread,
2118 		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
2119 		      ch, ch->dev->name, ch->dev->io_device, thread->name);
2120 
2121 	assert(ch->thread == thread);
2122 
2123 	ch->destroy_ref--;
2124 
2125 	if (ch->ref > 0 || ch->destroy_ref > 0) {
2126 		/*
2127 		 * Another reference to the associated io_device was requested
2128 		 *  after this message was sent but before it had a chance to
2129 		 *  execute.
2130 		 */
2131 		return;
2132 	}
2133 
2134 	pthread_mutex_lock(&g_devlist_mutex);
2135 	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
2136 	pthread_mutex_unlock(&g_devlist_mutex);
2137 
2138 	/* Don't hold the devlist mutex while the destroy_cb is called. */
2139 	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));
2140 
2141 	pthread_mutex_lock(&g_devlist_mutex);
2142 	ch->dev->refcnt--;
2143 
2144 	if (!ch->dev->unregistered) {
2145 		do_remove_dev = false;
2146 	}
2147 
2148 	if (ch->dev->refcnt > 0) {
2149 		do_remove_dev = false;
2150 	}
2151 
2152 	pthread_mutex_unlock(&g_devlist_mutex);
2153 
2154 	if (do_remove_dev) {
2155 		io_device_free(ch->dev);
2156 	}
2157 	free(ch);
2158 }
2159 
2160 void
2161 spdk_put_io_channel(struct spdk_io_channel *ch)
2162 {
2163 	struct spdk_thread *thread;
2164 	int rc __attribute__((unused));
2165 
2166 	spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0,
2167 			  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
2168 
2169 	thread = spdk_get_thread();
2170 	if (!thread) {
2171 		SPDK_ERRLOG("called from non-SPDK thread\n");
2172 		assert(false);
2173 		return;
2174 	}
2175 
2176 	if (ch->thread != thread) {
2177 		SPDK_ERRLOG("different from the thread that called get_io_channel()\n");
2178 		assert(false);
2179 		return;
2180 	}
2181 
2182 	SPDK_DEBUGLOG(thread,
2183 		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2184 		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);
2185 
2186 	ch->ref--;
2187 
2188 	if (ch->ref == 0) {
2189 		ch->destroy_ref++;
2190 		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
2191 		assert(rc == 0);
2192 	}
2193 }
2194 
2195 struct spdk_io_channel *
2196 spdk_io_channel_from_ctx(void *ctx)
2197 {
2198 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
2199 }
2200 
2201 struct spdk_thread *
2202 spdk_io_channel_get_thread(struct spdk_io_channel *ch)
2203 {
2204 	return ch->thread;
2205 }
2206 
2207 void *
2208 spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
2209 {
2210 	return ch->dev->io_device;
2211 }
2212 
2213 const char *
2214 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch)
2215 {
2216 	return spdk_io_device_get_name(ch->dev);
2217 }
2218 
2219 int
2220 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
2221 {
2222 	return ch->ref;
2223 }
2224 
2225 struct spdk_io_channel_iter {
2226 	void *io_device;
2227 	struct io_device *dev;
2228 	spdk_channel_msg fn;
2229 	int status;
2230 	void *ctx;
2231 	struct spdk_io_channel *ch;
2232 
2233 	struct spdk_thread *cur_thread;
2234 
2235 	struct spdk_thread *orig_thread;
2236 	spdk_channel_for_each_cpl cpl;
2237 };
2238 
2239 void *
2240 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
2241 {
2242 	return i->io_device;
2243 }
2244 
2245 struct spdk_io_channel *
2246 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
2247 {
2248 	return i->ch;
2249 }
2250 
2251 void *
2252 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
2253 {
2254 	return i->ctx;
2255 }
2256 
2257 static void
2258 _call_completion(void *ctx)
2259 {
2260 	struct spdk_io_channel_iter *i = ctx;
2261 
2262 	if (i->cpl != NULL) {
2263 		i->cpl(i, i->status);
2264 	}
2265 	free(i);
2266 }
2267 
2268 static void
2269 _call_channel(void *ctx)
2270 {
2271 	struct spdk_io_channel_iter *i = ctx;
2272 	struct spdk_io_channel *ch;
2273 
2274 	/*
2275 	 * It is possible that the channel was deleted before this
2276 	 *  message had a chance to execute.  If so, skip calling
2277 	 *  the fn() on this thread.
2278 	 */
2279 	pthread_mutex_lock(&g_devlist_mutex);
2280 	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
2281 		if (ch->dev->io_device == i->io_device) {
2282 			break;
2283 		}
2284 	}
2285 	pthread_mutex_unlock(&g_devlist_mutex);
2286 
2287 	if (ch) {
2288 		i->fn(i);
2289 	} else {
2290 		spdk_for_each_channel_continue(i, 0);
2291 	}
2292 }
2293 
2294 void
2295 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
2296 		      spdk_channel_for_each_cpl cpl)
2297 {
2298 	struct spdk_thread *thread;
2299 	struct spdk_io_channel *ch;
2300 	struct spdk_io_channel_iter *i;
2301 	int rc __attribute__((unused));
2302 
2303 	i = calloc(1, sizeof(*i));
2304 	if (!i) {
2305 		SPDK_ERRLOG("Unable to allocate iterator\n");
2306 		return;
2307 	}
2308 
2309 	i->io_device = io_device;
2310 	i->fn = fn;
2311 	i->ctx = ctx;
2312 	i->cpl = cpl;
2313 
2314 	pthread_mutex_lock(&g_devlist_mutex);
2315 	i->orig_thread = _get_thread();
2316 
2317 	TAILQ_FOREACH(thread, &g_threads, tailq) {
2318 		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
2319 			if (ch->dev->io_device == io_device) {
2320 				ch->dev->for_each_count++;
2321 				i->dev = ch->dev;
2322 				i->cur_thread = thread;
2323 				i->ch = ch;
2324 				pthread_mutex_unlock(&g_devlist_mutex);
2325 				rc = spdk_thread_send_msg(thread, _call_channel, i);
2326 				assert(rc == 0);
2327 				return;
2328 			}
2329 		}
2330 	}
2331 
2332 	pthread_mutex_unlock(&g_devlist_mutex);
2333 
2334 	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
2335 	assert(rc == 0);
2336 }
2337 
2338 void
2339 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
2340 {
2341 	struct spdk_thread *thread;
2342 	struct spdk_io_channel *ch;
2343 	int rc __attribute__((unused));
2344 
2345 	assert(i->cur_thread == spdk_get_thread());
2346 
2347 	i->status = status;
2348 
2349 	pthread_mutex_lock(&g_devlist_mutex);
2350 	if (status) {
2351 		goto end;
2352 	}
2353 	thread = TAILQ_NEXT(i->cur_thread, tailq);
2354 	while (thread) {
2355 		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
2356 			if (ch->dev->io_device == i->io_device) {
2357 				i->cur_thread = thread;
2358 				i->ch = ch;
2359 				pthread_mutex_unlock(&g_devlist_mutex);
2360 				rc = spdk_thread_send_msg(thread, _call_channel, i);
2361 				assert(rc == 0);
2362 				return;
2363 			}
2364 		}
2365 		thread = TAILQ_NEXT(thread, tailq);
2366 	}
2367 
2368 end:
2369 	i->dev->for_each_count--;
2370 	i->ch = NULL;
2371 	pthread_mutex_unlock(&g_devlist_mutex);
2372 
2373 	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
2374 	assert(rc == 0);
2375 }
2376 
2377 struct spdk_interrupt {
2378 	int			efd;
2379 	struct spdk_thread	*thread;
2380 	char			name[SPDK_MAX_POLLER_NAME_LEN + 1];
2381 };
2382 
2383 static void
2384 thread_interrupt_destroy(struct spdk_thread *thread)
2385 {
2386 	struct spdk_fd_group *fgrp = thread->fgrp;
2387 
2388 	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);
2389 
2390 	if (thread->msg_fd < 0) {
2391 		return;
2392 	}
2393 
2394 	spdk_fd_group_remove(fgrp, thread->msg_fd);
2395 	close(thread->msg_fd);
2396 	thread->msg_fd = -1;
2397 
2398 	spdk_fd_group_destroy(fgrp);
2399 	thread->fgrp = NULL;
2400 }
2401 
2402 #ifdef __linux__
2403 static int
2404 thread_interrupt_msg_process(void *arg)
2405 {
2406 	struct spdk_thread *thread = arg;
2407 	uint32_t msg_count;
2408 	spdk_msg_fn critical_msg;
2409 	int rc = 0;
2410 	uint64_t notify = 1;
2411 
2412 	assert(spdk_interrupt_mode_is_enabled());
2413 
2414 	/* There may be race between msg_acknowledge and another producer's msg_notify,
2415 	 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify.
2416 	 * This can avoid msg notification missing.
2417 	 */
2418 	rc = read(thread->msg_fd, &notify, sizeof(notify));
2419 	if (rc < 0 && errno != EAGAIN) {
2420 		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
2421 	}
2422 
2423 	critical_msg = thread->critical_msg;
2424 	if (spdk_unlikely(critical_msg != NULL)) {
2425 		critical_msg(NULL);
2426 		thread->critical_msg = NULL;
2427 		rc = 1;
2428 	}
2429 
2430 	msg_count = msg_queue_run_batch(thread, 0);
2431 	if (msg_count) {
2432 		rc = 1;
2433 	}
2434 
2435 	return rc;
2436 }
2437 
2438 static int
2439 thread_interrupt_create(struct spdk_thread *thread)
2440 {
2441 	int rc;
2442 
2443 	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);
2444 
2445 	rc = spdk_fd_group_create(&thread->fgrp);
2446 	if (rc) {
2447 		return rc;
2448 	}
2449 
2450 	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
2451 	if (thread->msg_fd < 0) {
2452 		rc = -errno;
2453 		spdk_fd_group_destroy(thread->fgrp);
2454 		thread->fgrp = NULL;
2455 
2456 		return rc;
2457 	}
2458 
2459 	return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread);
2460 }
2461 #else
2462 static int
2463 thread_interrupt_create(struct spdk_thread *thread)
2464 {
2465 	return -ENOTSUP;
2466 }
2467 #endif
2468 
2469 struct spdk_interrupt *
2470 spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
2471 			void *arg, const char *name)
2472 {
2473 	struct spdk_thread *thread;
2474 	struct spdk_interrupt *intr;
2475 
2476 	thread = spdk_get_thread();
2477 	if (!thread) {
2478 		assert(false);
2479 		return NULL;
2480 	}
2481 
2482 	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
2483 		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
2484 		return NULL;
2485 	}
2486 
2487 	if (spdk_fd_group_add(thread->fgrp, efd, fn, arg)) {
2488 		return NULL;
2489 	}
2490 
2491 	intr = calloc(1, sizeof(*intr));
2492 	if (intr == NULL) {
2493 		SPDK_ERRLOG("Interrupt handler allocation failed\n");
2494 		return NULL;
2495 	}
2496 
2497 	if (name) {
2498 		snprintf(intr->name, sizeof(intr->name), "%s", name);
2499 	} else {
2500 		snprintf(intr->name, sizeof(intr->name), "%p", fn);
2501 	}
2502 
2503 	intr->efd = efd;
2504 	intr->thread = thread;
2505 
2506 	return intr;
2507 }
2508 
2509 void
2510 spdk_interrupt_unregister(struct spdk_interrupt **pintr)
2511 {
2512 	struct spdk_thread *thread;
2513 	struct spdk_interrupt *intr;
2514 
2515 	intr = *pintr;
2516 	if (intr == NULL) {
2517 		return;
2518 	}
2519 
2520 	*pintr = NULL;
2521 
2522 	thread = spdk_get_thread();
2523 	if (!thread) {
2524 		assert(false);
2525 		return;
2526 	}
2527 
2528 	if (intr->thread != thread) {
2529 		SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n");
2530 		assert(false);
2531 		return;
2532 	}
2533 
2534 	spdk_fd_group_remove(thread->fgrp, intr->efd);
2535 	free(intr);
2536 }
2537 
2538 int
2539 spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
2540 			       enum spdk_interrupt_event_types event_types)
2541 {
2542 	struct spdk_thread *thread;
2543 
2544 	thread = spdk_get_thread();
2545 	if (!thread) {
2546 		assert(false);
2547 		return -EINVAL;
2548 	}
2549 
2550 	if (intr->thread != thread) {
2551 		SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n");
2552 		assert(false);
2553 		return -EINVAL;
2554 	}
2555 
2556 	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
2557 }
2558 
2559 int
2560 spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
2561 {
2562 	return spdk_fd_group_get_fd(thread->fgrp);
2563 }
2564 
2565 static bool g_interrupt_mode = false;
2566 
2567 int
2568 spdk_interrupt_mode_enable(void)
2569 {
2570 	/* It must be called once prior to initializing the threading library.
2571 	 * g_spdk_msg_mempool will be valid if thread library is initialized.
2572 	 */
2573 	if (g_spdk_msg_mempool) {
2574 		SPDK_ERRLOG("Failed due to threading library is already initailzied.\n");
2575 		return -1;
2576 	}
2577 
2578 #ifdef __linux__
2579 	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
2580 	g_interrupt_mode = true;
2581 	return 0;
2582 #else
2583 	SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n");
2584 	g_interrupt_mode = false;
2585 	return -ENOTSUP;
2586 #endif
2587 }
2588 
2589 bool
2590 spdk_interrupt_mode_is_enabled(void)
2591 {
2592 	return g_interrupt_mode;
2593 }
2594 
2595 SPDK_LOG_REGISTER_COMPONENT(thread)
2596