/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2016 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/trace.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"
#include "spdk_internal/usdt.h"
#include "thread_internal.h"

#include "spdk_internal/trace_defs.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
#define SPDK_MAX_POLLER_NAME_LEN	256
#define SPDK_MAX_THREAD_NAME_LEN	256

enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,

	/* The poller is in the process of being paused.  It will be paused
	 * during the next time it's supposed to be executed.
	 */
	SPDK_POLLER_STATE_PAUSING,

	/* The poller is registered but currently paused.  It's on the
	 * paused_pollers list.
	 */
	SPDK_POLLER_STATE_PAUSED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;
	RB_ENTRY(spdk_poller)		node;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	uint64_t			period_ticks;
	uint64_t			next_run_tick;
	uint64_t			run_count;
	uint64_t			busy_count;
	uint64_t			id;
	spdk_poller_fn			fn;
	void				*arg;
	struct spdk_thread		*thread;
	/* Native interrupt fd for a periodic or busy poller */
	int				interruptfd;
	spdk_poller_set_interrupt_mode_cb set_intr_cb_fn;
	void				*set_intr_cb_arg;

	char				name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_thread_state {
	/* The thread is processing pollers and messages via spdk_thread_poll(). */
	SPDK_THREAD_STATE_RUNNING,

	/* The thread is in the process of termination. It reaps unregistering
	 * pollers and releases I/O channels.
	 */
	SPDK_THREAD_STATE_EXITING,

	/* The thread has exited. It is ready for spdk_thread_destroy(). */
	SPDK_THREAD_STATE_EXITED,
};

struct spdk_thread {
	uint64_t			tsc_last;
	struct spdk_thread_stats	stats;
	/*
	 * Contains pollers actively running on this thread.  Pollers
	 *  are run round-robin. The thread takes one poller from the head
	 *  of the ring, executes it, then puts it back at the tail of
	 *  the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;
	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	RB_HEAD(timed_pollers_tree, spdk_poller)	timed_pollers;
	struct spdk_poller				*first_timed_poller;
	/*
	 * Contains paused pollers.  Pollers on this queue are waiting until
	 * they are resumed (in which case they're put onto the active/timer
	 * queues) or unregistered.
	 */
	TAILQ_HEAD(paused_pollers_head, spdk_poller)	paused_pollers;
	struct spdk_ring		*messages;
	int				msg_fd;
	SLIST_HEAD(, spdk_msg)		msg_cache;
	size_t				msg_cache_count;
	spdk_msg_fn			critical_msg;
	uint64_t			id;
	uint64_t			next_poller_id;
	enum spdk_thread_state		state;
	int				pending_unregister_count;

	RB_HEAD(io_channel_tree, spdk_io_channel)	io_channels;
	TAILQ_ENTRY(spdk_thread)			tailq;

	char				name[SPDK_MAX_THREAD_NAME_LEN + 1];
	struct spdk_cpuset		cpumask;
	uint64_t			exit_timeout_tsc;

	/* Indicates whether this spdk_thread currently runs in interrupt. */
	bool				in_interrupt;
	bool				poller_unregistered;
	struct spdk_fd_group		*fgrp;

	/* User context allocated at the end */
	uint8_t				ctx[0];
};

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
static size_t g_ctx_sz = 0;
/* A monotonically increasing ID, starting at 1, is assigned to each created
 * thread. Once the ID counter wraps around to 0 (i.e. UINT64_MAX IDs have been
 * handed out), further thread creation is not allowed and the SPDK application
 * must be restarted.
 */
static uint64_t g_thread_id = 1;

struct io_device {
	void				*io_device;
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb	create_cb;
	spdk_io_channel_destroy_cb	destroy_cb;
	spdk_io_device_unregister_cb	unregister_cb;
	struct spdk_thread		*unregister_thread;
	uint32_t			ctx_size;
	uint32_t			for_each_count;
	RB_ENTRY(io_device)		node;

	uint32_t			refcnt;

	bool				pending_unregister;
	bool				unregistered;
};

static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices);

static int
io_device_cmp(struct io_device *dev1, struct io_device *dev2)
{
	return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device);
}

RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp);

static int
io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2)
{
	return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev);
}

RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp);

struct spdk_msg {
	spdk_msg_fn		fn;
	void			*arg;

	SLIST_ENTRY(spdk_msg)	link;
};

#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static __thread struct spdk_thread *tls_thread = NULL;

SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD)
{
	spdk_trace_register_description("THREAD_IOCH_GET",
					TRACE_THREAD_IOCH_GET,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
	spdk_trace_register_description("THREAD_IOCH_PUT",
					TRACE_THREAD_IOCH_PUT,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
}

/*
 * If this compare function returned zero when two next_run_ticks were equal,
 * the macro RB_INSERT() would return a pointer to the already-inserted element
 * with the same next_run_tick instead of inserting the new one.
 *
 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the
 * element to remove as a parameter.
 *
 * Hence we allow RB_INSERT() to insert elements with the same key on the
 * right side by returning 1 when two next_run_ticks are equal.
 */
static inline int
timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2)
{
	if (poller1->next_run_tick < poller2->next_run_tick) {
		return -1;
	} else {
		return 1;
	}
}

RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare);

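/*
 * Illustrative sketch (hypothetical tree and values, not part of this file):
 * because timed_poller_compare() never returns 0, RB_INSERT() always succeeds,
 * even for pollers that share a next_run_tick; the later insert lands on the
 * right side and the earlier one stays leftmost.
 *
 *	struct timed_pollers_tree tree = RB_INITIALIZER(&tree);
 *	struct spdk_poller a = { .next_run_tick = 100 };
 *	struct spdk_poller b = { .next_run_tick = 100 };
 *
 *	assert(RB_INSERT(timed_pollers_tree, &tree, &a) == NULL);
 *	assert(RB_INSERT(timed_pollers_tree, &tree, &b) == NULL);	// equal key, still inserted
 *	assert(RB_MIN(timed_pollers_tree, &tree) == &a);		// earlier insert stays leftmost
 */
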
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}

static int
_thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	g_ctx_sz = ctx_sz;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz,
			     sizeof(struct spdk_msg),
			     0, /* No cache. We do our own. */
			     SPDK_ENV_SOCKET_ID_ANY);

	SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n",
		      msg_mempool_sz);

	if (!g_spdk_msg_mempool) {
		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
		return -ENOMEM;
	}

	return 0;
}

int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);

	if (new_thread_fn == NULL) {
		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
	} else {
		g_new_thread_fn = new_thread_fn;
	}

	return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE);
}

int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
			 spdk_thread_op_supported_fn thread_op_supported_fn,
			 size_t ctx_sz, size_t msg_mempool_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);
	assert(g_thread_op_supported_fn == NULL);

	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
		SPDK_ERRLOG("Both must be defined or undefined together.\n");
		return -EINVAL;
	}

	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
	} else {
		g_thread_op_fn = thread_op_fn;
		g_thread_op_supported_fn = thread_op_supported_fn;
	}

	return _thread_lib_init(ctx_sz, msg_mempool_sz);
}

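/*
 * Minimal initialization sketch for a custom framework (all scheduler_*
 * names and my_thread_ctx are hypothetical):
 *
 *	static int
 *	scheduler_op(struct spdk_thread *thread, enum spdk_thread_op op)
 *	{
 *		// e.g. hand the new thread to a framework core on SPDK_THREAD_OP_NEW
 *		return 0;
 *	}
 *
 *	static bool
 *	scheduler_op_supported(enum spdk_thread_op op)
 *	{
 *		return op == SPDK_THREAD_OP_NEW;
 *	}
 *
 *	rc = spdk_thread_lib_init_ext(scheduler_op, scheduler_op_supported,
 *				      sizeof(struct my_thread_ctx),
 *				      SPDK_DEFAULT_MSG_MEMPOOL_SIZE);
 */
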
void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	RB_FOREACH(dev, io_device_tree, &g_io_devices) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}

	g_new_thread_fn = NULL;
	g_thread_op_fn = NULL;
	g_thread_op_supported_fn = NULL;
	g_ctx_sz = 0;
}

static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
				     poller->name);
		}
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (spdk_interrupt_mode_is_enabled()) {
		thread_interrupt_destroy(thread);
	}

	spdk_ring_free(thread->messages);
	free(thread);
}

struct spdk_thread *
spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc = 0, i;

	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	RB_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	RB_INIT(&thread->timed_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	/* A monotonically increasing ID, starting at 1, is assigned to each created
	 * poller. Once the ID counter wraps around to 0, a warning message is logged
	 * and IDs restart at 1, so poller IDs may be duplicated.
	 */
	thread->next_poller_id = 1;

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache.  If we can't populate it, that's ok;
	 * the cache will get filled up organically as messages are passed to the
	 * thread. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	if (g_thread_id == 0) {
		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		_free_thread(thread);
		return NULL;
	}
	thread->id = g_thread_id++;
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
		      thread->id, thread->name);

	if (spdk_interrupt_mode_is_enabled()) {
		thread->in_interrupt = true;
		rc = thread_interrupt_create(thread);
		if (rc != 0) {
			_free_thread(thread);
			return NULL;
		}
	}

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
	}

	if (rc != 0) {
		_free_thread(thread);
		return NULL;
	}

	thread->state = SPDK_THREAD_STATE_RUNNING;

	return thread;
}

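/*
 * Usage sketch (name hypothetical): create a thread that is allowed to run
 * only on core 0.  Passing a NULL cpumask instead would allow all cores.
 *
 *	struct spdk_cpuset cpumask;
 *
 *	spdk_cpuset_zero(&cpumask);
 *	spdk_cpuset_set_cpu(&cpumask, 0, true);
 *	thread = spdk_thread_create("worker_0", &cpumask);
 */
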
void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}

static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s timed out, moving it to the exited state forcefully\n",
			    thread->name);
		goto exited;
	}

	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
		SPDK_INFOLOG(thread,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

	if (thread->pending_unregister_count > 0) {
		SPDK_INFOLOG(thread,
			     "thread %s is still unregistering io_devices\n",
			     thread->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
	if (spdk_unlikely(thread->in_interrupt)) {
		g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
	}
}

int
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
		SPDK_INFOLOG(thread,
			     "thread %s is already exiting\n",
			     thread->name);
		return 0;
	}

	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
	thread->state = SPDK_THREAD_STATE_EXITING;
	return 0;
}

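/*
 * Sketch of the teardown sequence a framework drives: spdk_thread_exit()
 * only marks the thread EXITING; the framework keeps polling until the
 * thread drains (or the 5 second timeout forces it) and then destroys it.
 *
 *	spdk_set_thread(thread);
 *	spdk_thread_exit(thread);		// must run on the thread itself
 *	while (!spdk_thread_is_exited(thread)) {
 *		spdk_thread_poll(thread, 0, 0);
 *	}
 *	spdk_thread_destroy(thread);
 */
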
bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_EXITED;
}

void
spdk_thread_destroy(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);

	assert(thread->state == SPDK_THREAD_STATE_EXITED);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}

void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
	if (g_ctx_sz > 0) {
		return thread->ctx;
	}

	return NULL;
}

struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}

int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;

	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
		assert(false);
		return -ENOTSUP;
	}

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Called from non-SPDK thread\n");
		assert(false);
		return -EINVAL;
	}

	spdk_cpuset_copy(&thread->cpumask, cpumask);

	/* Invoke framework's reschedule operation. If this function is called multiple times
	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
	 * reschedule operation.
	 */
	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

	return 0;
}

struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}

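/*
 * Sketch of the ctx round trip, assuming the library was initialized with
 * ctx_sz == sizeof(struct my_thread_ctx) (a hypothetical type):
 *
 *	struct my_thread_ctx *ctx = spdk_thread_get_ctx(thread);
 *
 *	assert(spdk_thread_get_from_ctx(ctx) == thread);
 */
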
static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];
	uint64_t notify = 1;
	int rc;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (spdk_unlikely(thread->in_interrupt) &&
	    spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);

		SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg);

		msg->fn(msg->arg);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}

static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *tmp __attribute__((unused));

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers tree by next scheduled run time
	 * as its key.
	 */
	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp == NULL);

	/* Update the cache only if it is empty or the inserted poller is earlier than it.
	 * RB_MIN() is not necessary here because all pollers, which have exactly the same
	 * next_run_tick as the existing poller, are inserted on the right side.
	 */
	if (thread->first_timed_poller == NULL ||
	    poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
		thread->first_timed_poller = poller;
	}
}

static inline void
poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
{
	struct spdk_poller *tmp __attribute__((unused));

	tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp != NULL);

	/* This function is not used in any case that is performance critical.
	 * Update the cache simply by RB_MIN() if it needs to be changed.
	 */
	if (thread->first_timed_poller == poller) {
		thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
	}
}

static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store end time to use it as start time of the next spdk_thread_poll(). */
	thread->tsc_last = end;
}

static inline int
thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_WAITING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}

static inline int
thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
			    uint64_t now)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
	/* fallthrough */
	case SPDK_POLLER_STATE_WAITING:
		poller_insert_timer(thread, poller, now);
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}

static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	thread->tsc_last = now;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		poller_rc = thread_execute_poller(thread, poller);
		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	poller = thread->first_timed_poller;
	while (poller != NULL) {
		int timer_rc = 0;

		if (now < poller->next_run_tick) {
			break;
		}

		tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);

		/* Update the cache to the next timed poller in the list
		 * only if the current poller is still the closest, otherwise,
		 * do nothing because the cache has already been updated.
		 */
		if (thread->first_timed_poller == poller) {
			thread->first_timed_poller = tmp;
		}

		timer_rc = thread_execute_timed_poller(thread, poller, now);
		if (timer_rc > rc) {
			rc = timer_rc;
		}

		poller = tmp;
	}

	return rc;
}

int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	struct spdk_thread *orig_thread;
	int rc;
	uint64_t notify = 1;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	if (spdk_likely(!thread->in_interrupt)) {
		rc = thread_poll(thread, max_msgs, now);
		if (spdk_unlikely(thread->in_interrupt)) {
			/* The thread transitioned to interrupt mode during the above poll.
			 * Poll it one more time in case a msg arrived without notification
			 * during the transition.
			 */
			rc = thread_poll(thread, max_msgs, now);
		}
	} else {
		/* Non-blocking wait on the thread's fd_group */
		rc = spdk_fd_group_wait(thread->fgrp, 0);
		if (spdk_unlikely(!thread->in_interrupt)) {
			/* The thread transitioned to poll mode via a msg during the above
			 * processing.  Drain msg_fd, since thread messages will be polled
			 * directly in poll mode.
			 */
			rc = read(thread->msg_fd, &notify, sizeof(notify));
			if (rc < 0 && errno != EAGAIN) {
				SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
			}
		}

		/* Reap unregistered pollers here, since they are not reaped during
		 * poller execution in interrupt mode.
		 */
		if (spdk_unlikely(thread->poller_unregistered)) {
			struct spdk_poller *poller, *tmp;

			TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
						   active_pollers_head, tailq, tmp) {
				if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
					TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
					free(poller);
				}
			}

			RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
				if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
					poller_remove_timer(thread, poller);
					free(poller);
				}
			}

			thread->poller_unregistered = false;
		}
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
		thread_exit(thread, now);
	}

	thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}

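/*
 * Sketch of a minimal framework loop driving one spdk_thread in poll mode
 * (the surrounding reactor and its `running` flag are assumed, not shown):
 *
 *	while (running) {
 *		int rc = spdk_thread_poll(thread, 0, 0);	// 0 msgs: default batch; 0 now: read ticks
 *
 *		if (rc == 0) {
 *			// idle iteration: a real reactor might yield here
 *		}
 *	}
 */
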
uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = thread->first_timed_poller;
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}

int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}

static bool
thread_has_unpaused_pollers(struct spdk_thread *thread)
{
	if (TAILQ_EMPTY(&thread->active_pollers) &&
	    RB_EMPTY(&thread->timed_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
	if (!thread_has_unpaused_pollers(thread) &&
	    TAILQ_EMPTY(&thread->paused_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
	if (spdk_ring_count(thread->messages) ||
	    thread_has_unpaused_pollers(thread) ||
	    thread->critical_msg != NULL) {
		return false;
	}

	return true;
}

uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count.  We could acquire the
	 *  lock and iterate through the TAILQ of threads to count them, but that
	 *  count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}

struct spdk_thread *
spdk_get_thread(void)
{
	return _get_thread();
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

uint64_t
spdk_thread_get_id(const struct spdk_thread *thread)
{
	return thread->id;
}

struct spdk_thread *
spdk_thread_get_by_id(uint64_t id)
{
	struct spdk_thread *thread;

	if (id == 0 || id >= g_thread_id) {
		SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
		return NULL;
	}
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		if (thread->id == id) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);
	return thread;
}

int
spdk_thread_get_stats(struct spdk_thread_stats *stats)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		return -EINVAL;
	}

	if (stats == NULL) {
		return -EINVAL;
	}

	*stats = thread->stats;

	return 0;
}

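/*
 * Example (sketch): derive a busy percentage for the calling thread from the
 * cumulative tick counters filled in by thread_update_stats() above.
 *
 *	struct spdk_thread_stats stats;
 *
 *	if (spdk_thread_get_stats(&stats) == 0) {
 *		uint64_t total = stats.busy_tsc + stats.idle_tsc;
 *		double busy_pct = total ? 100.0 * stats.busy_tsc / total : 0.0;
 *	}
 */
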
uint64_t
spdk_thread_get_last_tsc(struct spdk_thread *thread)
{
	if (thread == NULL) {
		thread = _get_thread();
	}

	return thread->tsc_last;
}

static inline int
thread_send_msg_notification(const struct spdk_thread *target_thread)
{
	uint64_t notify = 1;
	int rc;

	/* No notification is necessary if the interrupt facility is not enabled. */
	if (spdk_likely(!spdk_interrupt_mode_is_enabled())) {
		return 0;
	}

	/* Since each spdk_thread can switch between poll and interrupt mode
	 * dynamically, after sending a thread msg it is necessary to check whether
	 * the target thread runs in interrupt mode and then decide whether to do
	 * event notification.
	 */
	if (spdk_unlikely(target_thread->in_interrupt)) {
		rc = write(target_thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
			return -EIO;
		}
	}

	return 0;
}

int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

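/*
 * Usage sketch (callback and variables hypothetical): hand work to another
 * thread; fn runs later inside that thread's spdk_thread_poll() context.
 *
 *	static void
 *	set_flag(void *ctx)
 *	{
 *		*(bool *)ctx = true;
 *	}
 *
 *	rc = spdk_thread_send_msg(target, set_flag, &flag);
 *	if (rc != 0) {
 *		// -ENOMEM or -EIO: the message was not queued
 *	}
 */
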
int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
	spdk_msg_fn expected = NULL;

	if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
					 __ATOMIC_SEQ_CST)) {
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

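/*
 * Unlike spdk_thread_send_msg(), this path allocates nothing and claims a
 * single per-thread slot with an atomic compare-and-swap, so at most one
 * critical message can be outstanding; a second call returns -EIO until the
 * first has run.  Sketch (handler name hypothetical):
 *
 *	if (spdk_thread_send_critical_msg(thread, handle_fatal_event) != 0) {
 *		// a critical message is already pending on this thread
 *	}
 */
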
#ifdef __linux__
static int
interrupt_timerfd_process(void *arg)
{
	struct spdk_poller *poller = arg;
	uint64_t exp;
	int rc;

	/* clear the level of the interval timer */
	rc = read(poller->interruptfd, &exp, sizeof(exp));
	if (rc < 0) {
		/* read() returns -1 and sets errno on failure, so check errno
		 * rather than the return value for EAGAIN. */
		if (errno == EAGAIN) {
			return 0;
		}

		return rc;
	}

	SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg);

	return poller->fn(poller->arg);
}

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	struct spdk_fd_group *fgrp = poller->thread->fgrp;
	int timerfd;
	int rc;

	SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
	if (timerfd < 0) {
		return -errno;
	}

	rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller);
	if (rc < 0) {
		close(timerfd);
		return rc;
	}

	poller->interruptfd = timerfd;
	return 0;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	int timerfd = poller->interruptfd;
	uint64_t now_tick = spdk_get_ticks();
	uint64_t ticks = spdk_get_ticks_hz();
	int ret;
	struct itimerspec new_tv = {};
	struct itimerspec old_tv = {};

	assert(poller->period_ticks != 0);
	assert(timerfd >= 0);

	SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
		      interrupt_mode ? "interrupt" : "poll");

	if (interrupt_mode) {
		/* Set repeated timer expiration */
		new_tv.it_interval.tv_sec = poller->period_ticks / ticks;
		new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks;

		/* Update next timer expiration */
		if (poller->next_run_tick == 0) {
			poller->next_run_tick = now_tick + poller->period_ticks;
		} else if (poller->next_run_tick < now_tick) {
			poller->next_run_tick = now_tick;
		}

		new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks;
		new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks;

		ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno);
			assert(false);
		}
	} else {
		/* Disarm the timer */
		ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv);
		if (ret < 0) {
			/* timerfd_settime's failure indicates that the timerfd is in error */
			SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno);
			assert(false);
		}

		/* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be
		 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC
		 */
		now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \
			   (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC;
		poller_remove_timer(poller->thread, poller);
		poller_insert_timer(poller->thread, poller, now_tick);
	}
}

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
	SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name);
	assert(poller->interruptfd >= 0);
	spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd);
	close(poller->interruptfd);
	poller->interruptfd = -1;
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	int busy_efd;
	int rc;

	SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name);
	busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (busy_efd < 0) {
		SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name);
		return -errno;
	}

	rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd,
			       poller->fn, poller->arg, poller->name);
	if (rc < 0) {
		close(busy_efd);
		return rc;
	}

	poller->interruptfd = busy_efd;
	return 0;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	int busy_efd = poller->interruptfd;
	uint64_t notify = 1;
	int rc __attribute__((unused));

	assert(busy_efd >= 0);

	if (interrupt_mode) {
		/* Writing to the eventfd without reading it keeps it level-triggered,
		 * so the poller fires on every fd_group wait. */
		if (write(busy_efd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name);
		}
	} else {
		/* Reading the eventfd clears its level-triggered state. */
		rc = read(busy_efd, &notify, sizeof(notify));
	}
}

#else

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

#endif

void
spdk_poller_register_interrupt(struct spdk_poller *poller,
			       spdk_poller_set_interrupt_mode_cb cb_fn,
			       void *cb_arg)
{
	assert(poller != NULL);
	assert(cb_fn != NULL);
	assert(spdk_get_thread() == poller->thread);

	if (!spdk_interrupt_mode_is_enabled()) {
		return;
	}

	/* When a poller is created we don't know if the user is ever going to
	 * enable interrupts on it by calling this function, so the poller
	 * registration function has to immediately create an interruptfd.
	 * When this function does get called by the user, we have to then
	 * destroy that interruptfd.
	 */
	if (poller->set_intr_cb_fn && poller->interruptfd >= 0) {
		poller_interrupt_fini(poller);
	}

	poller->set_intr_cb_fn = cb_fn;
	poller->set_intr_cb_arg = cb_arg;

	/* Set the poller into interrupt mode if the thread is in interrupt. */
	if (poller->thread->in_interrupt) {
		poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
	}
}

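/*
 * Sketch (names hypothetical): a poller that drives its own event source can
 * supply a mode-switch callback instead of the built-in timerfd/eventfd one.
 *
 *	static void
 *	my_set_intr_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
 *	{
 *		// arm the poller's own fd for interrupt mode, disarm for poll mode
 *	}
 *
 *	poller = SPDK_POLLER_REGISTER(my_poll, ctx, 0);
 *	spdk_poller_register_interrupt(poller, my_set_intr_mode, ctx);
 */
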
static uint64_t
convert_us_to_ticks(uint64_t us)
{
	uint64_t quotient, remainder, ticks;

	if (us) {
		quotient = us / SPDK_SEC_TO_USEC;
		remainder = us % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		return 0;
	}
}

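/*
 * The quotient/remainder split above avoids the 64-bit overflow that a naive
 * (us * ticks_hz) / SPDK_SEC_TO_USEC would hit for very large values of us.
 * Worked example, assuming ticks_hz = 2,300,000,000 (2.3 GHz) and
 * us = 1,500,000: quotient = 1, remainder = 500,000, giving
 * 2,300,000,000 * 1 + (2,300,000,000 * 500,000) / 1,000,000 = 3,450,000,000 ticks.
 */
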
static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;
	poller->interruptfd = -1;
	if (thread->next_poller_id == 0) {
		SPDK_WARNLOG("Poller ID rolled over. Poller IDs may now be duplicated.\n");
		thread->next_poller_id = 1;
	}
	poller->id = thread->next_poller_id++;

	poller->period_ticks = convert_us_to_ticks(period_microseconds);

	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (period_microseconds) {
			rc = period_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL);
		} else {
			/* If the poller doesn't have a period, create an interruptfd that is
			 * automatically always busy when running in interrupt mode.
			 */
			rc = busy_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL);
		}
	}

	thread_insert_poller(thread, poller);

	return poller;
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}

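/*
 * Usage sketch (poller fn and ctx hypothetical): run stats_poll() once per
 * second on the calling thread.  Returning a positive value (SPDK_POLLER_BUSY)
 * counts the iteration toward busy_tsc; zero (SPDK_POLLER_IDLE) toward idle_tsc.
 *
 *	static int
 *	stats_poll(void *arg)
 *	{
 *		bool did_work = collect_stats(arg);	// hypothetical helper
 *
 *		return did_work ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
 *	}
 *
 *	poller = spdk_poller_register_named(stats_poll, ctx, 1000 * 1000, "stats_poll");
 */
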
static void
wrong_thread(const char *func, const char *name, struct spdk_thread *thread,
	     struct spdk_thread *curthread)
{
	if (thread == NULL) {
		SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name);
		abort();
	}
	SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be "
		    "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id,
		    thread->name, thread->id);
	assert(false);
}

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		/* Release the interrupt resource for a periodic or busy poller */
		if (poller->interruptfd >= 0) {
			poller_interrupt_fini(poller);
		}

		/* Mark that a poller was unregistered, so that unregistered pollers
		 * also get reaped by spdk_thread_poll() in interrupt mode.
		 */
		thread->poller_unregistered = true;
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}

void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	/* We just set its state to SPDK_POLLER_STATE_PAUSING and let
	 * spdk_thread_poll() move it. This allows a poller to be paused from
	 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE
	 * iteration, or from within itself without breaking the logic to always
	 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration.
	 */
	switch (poller->state) {
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_PAUSING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
	case SPDK_POLLER_STATE_WAITING:
		poller->state = SPDK_POLLER_STATE_PAUSING;
		break;
	default:
		assert(false);
		break;
	}
}

void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	/* If a poller is paused it has to be removed from the paused pollers
	 * list and put on the active list or timer tree depending on its
	 * period_ticks.  If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list or tree.
	 */
	switch (poller->state) {
	case SPDK_POLLER_STATE_PAUSED:
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		thread_insert_poller(thread, poller);
	/* fallthrough */
	case SPDK_POLLER_STATE_PAUSING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	case SPDK_POLLER_STATE_RUNNING:
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}
}

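/*
 * Sketch: pausing and resuming keep the poller registered; only its state
 * (and, once fully paused, its queue) changes.
 *
 *	spdk_poller_pause(poller);	// PAUSING now, PAUSED after its next turn
 *	...
 *	spdk_poller_resume(poller);	// back to WAITING on the active list or timer tree
 */
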
const char *
spdk_poller_get_name(struct spdk_poller *poller)
{
	return poller->name;
}

uint64_t
spdk_poller_get_id(struct spdk_poller *poller)
{
	return poller->id;
}

const char *
spdk_poller_get_state_str(struct spdk_poller *poller)
{
	switch (poller->state) {
	case SPDK_POLLER_STATE_WAITING:
		return "waiting";
	case SPDK_POLLER_STATE_RUNNING:
		return "running";
	case SPDK_POLLER_STATE_UNREGISTERED:
		return "unregistered";
	case SPDK_POLLER_STATE_PAUSING:
		return "pausing";
	case SPDK_POLLER_STATE_PAUSED:
		return "paused";
	default:
		return NULL;
	}
}

uint64_t
spdk_poller_get_period_ticks(struct spdk_poller *poller)
{
	return poller->period_ticks;
}

void
spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats)
{
	stats->run_count = poller->run_count;
	stats->busy_count = poller->busy_count;
}

struct spdk_poller *
spdk_thread_get_first_active_poller(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->active_pollers);
}

struct spdk_poller *
spdk_thread_get_next_active_poller(struct spdk_poller *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

struct spdk_poller *
spdk_thread_get_first_timed_poller(struct spdk_thread *thread)
{
	return RB_MIN(timed_pollers_tree, &thread->timed_pollers);
}

struct spdk_poller *
spdk_thread_get_next_timed_poller(struct spdk_poller *prev)
{
	return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev);
}

struct spdk_poller *
spdk_thread_get_first_paused_poller(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->paused_pollers);
}

struct spdk_poller *
spdk_thread_get_next_paused_poller(struct spdk_poller *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

struct spdk_io_channel *
spdk_thread_get_first_io_channel(struct spdk_thread *thread)
{
	return RB_MIN(io_channel_tree, &thread->io_channels);
}

struct spdk_io_channel *
spdk_thread_get_next_io_channel(struct spdk_io_channel *prev)
{
	return RB_NEXT(io_channel_tree, &thread->io_channels, prev);
}

struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};

static void
_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) {
		SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n",
			      ct->cur_thread->name);
		ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	assert(rc == 0);
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}

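/*
 * Usage sketch (callbacks hypothetical): run update_cfg() once on every
 * running thread, then run cfg_done() back on the calling thread.
 *
 *	spdk_for_each_thread(update_cfg, cfg, cfg_done);
 */
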
1879 static inline void
1880 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode)
1881 {
1882 	if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
1883 		return;
1884 	}
1885 
1886 	if (!poller->set_intr_cb_fn) {
1887 		SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name);
1888 		assert(false);
1889 		return;
1890 	}
1891 
1892 	poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode);
1893 }
1894 
1895 void
1896 spdk_thread_set_interrupt_mode(bool enable_interrupt)
1897 {
1898 	struct spdk_thread *thread = _get_thread();
1899 	struct spdk_poller *poller, *tmp;
1900 
1901 	assert(thread);
1902 	assert(spdk_interrupt_mode_is_enabled());
1903 
1904 	SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n",
1905 		       thread->name,  enable_interrupt ? "intr" : "poll",
1906 		       thread->in_interrupt ? "intr" : "poll");
1907 
1908 	if (thread->in_interrupt == enable_interrupt) {
1909 		return;
1910 	}
1911 
1912 	/* Set pollers to expected mode */
1913 	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
1914 		poller_set_interrupt_mode(poller, enable_interrupt);
1915 	}
1916 	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) {
1917 		poller_set_interrupt_mode(poller, enable_interrupt);
1918 	}
1919 	/* All paused pollers will go to work in interrupt mode */
1920 	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) {
1921 		poller_set_interrupt_mode(poller, enable_interrupt);
1922 	}
1923 
1924 	thread->in_interrupt = enable_interrupt;
1925 	return;
1926 }
1927 
1928 static struct io_device *
1929 io_device_get(void *io_device)
1930 {
1931 	struct io_device find = {};
1932 
1933 	find.io_device = io_device;
1934 	return RB_FIND(io_device_tree, &g_io_devices, &find);
1935 }

void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	tmp = RB_INSERT(io_device_tree, &g_io_devices, dev);
	if (tmp != NULL) {
		SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
			    io_device, tmp->name, dev->name);
		free(dev);
	}

	pthread_mutex_unlock(&g_devlist_mutex);
}
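
/*
 * Usage sketch for spdk_io_device_register() (illustrative only; the type
 * and callback names below are hypothetical).  ctx_size reserves space for a
 * per-channel context that create_cb initializes on each thread:
 *
 *	struct my_dev_channel {
 *		uint64_t io_count;
 *	};
 *
 *	static int
 *	my_dev_create_cb(void *io_device, void *ctx_buf)
 *	{
 *		struct my_dev_channel *ch = ctx_buf;
 *
 *		// Called on each thread that requests a channel for this device.
 *		ch->io_count = 0;
 *		return 0;
 *	}
 *
 *	static void
 *	my_dev_destroy_cb(void *io_device, void *ctx_buf)
 *	{
 *		// Called on the owning thread when the channel's last
 *		// reference is put.
 *	}
 *
 *	// spdk_io_device_register(&g_my_dev, my_dev_create_cb, my_dev_destroy_cb,
 *	//			   sizeof(struct my_dev_channel), "my_dev");
 */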

static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	assert(thread == dev->unregister_thread);

	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	assert(thread->pending_unregister_count > 0);
	thread->pending_unregister_count--;

	dev->unregister_cb(dev->io_device);
	free(dev);
}

static void
io_device_free(struct io_device *dev)
{
	int rc __attribute__((unused));

	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
		assert(rc == 0);
	}
}

void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* The for_each_count check differentiates the user attempting to unregister the
	 * device a second time, from the internal call to this function that occurs
	 * after the for_each_count reaches 0.
	 */
	if (dev->pending_unregister && dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregister_thread = thread;

	if (dev->for_each_count > 0) {
		SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			     dev->name, io_device, dev->for_each_count);
		dev->pending_unregister = true;
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregistered = true;
	RB_REMOVE(io_device_tree, &g_io_devices, dev);
	refcnt = dev->refcnt;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (unregister_cb) {
		thread->pending_unregister_count++;
	}

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}
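
/*
 * Usage sketch for spdk_io_device_unregister() (illustrative only; the
 * callback name is hypothetical).  The optional callback fires on the
 * unregistering thread once every channel for the device has been released:
 *
 *	static void
 *	my_dev_unregister_done(void *io_device)
 *	{
 *		// Safe to free any state associated with io_device here.
 *	}
 *
 *	// spdk_io_device_unregister(&g_my_dev, my_dev_unregister_done);
 */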

const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}

static struct spdk_io_channel *
thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev)
{
	struct spdk_io_channel find = {};

	find.dev = dev;
	return RB_FIND(io_channel_tree, &thread->io_channels, &find);
}

struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch = thread_get_io_channel(thread, dev);
	if (ch != NULL) {
		ch->ref++;

		SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
			      ch, dev->name, dev->io_device, thread->name, ch->ref);

		/*
		 * An I/O channel already exists for this device on this
		 *  thread, so return it.
		 */
		pthread_mutex_unlock(&g_devlist_mutex);
		spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0,
				  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
		return ch;
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	RB_INSERT(io_channel_tree, &thread->io_channels, ch);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1);
	return ch;
}
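
/*
 * Usage sketch for spdk_get_io_channel() (illustrative only; struct
 * my_dev_channel is the hypothetical per-channel context registered with
 * ctx_size above):
 *
 *	struct spdk_io_channel *ch;
 *	struct my_dev_channel *dev_ch;
 *
 *	ch = spdk_get_io_channel(&g_my_dev);
 *	if (ch == NULL) {
 *		return -ENOMEM;
 *	}
 *
 *	// The per-channel context lives immediately after the channel header.
 *	dev_ch = spdk_io_channel_get_ctx(ch);
 *	dev_ch->io_count++;
 *
 *	// Release the reference on the same thread when done.
 *	spdk_put_io_channel(ch);
 */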

static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 *  after this message was sent but before it had a chance to
		 *  execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0,
			  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		wrong_thread(__func__, "ch", ch->thread, thread);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

void *
spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
{
	return ch->dev->io_device;
}

const char *
spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch)
{
	return spdk_io_device_get_name(ch->dev);
}

int
spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
{
	return ch->ref;
}
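
/*
 * Round-trip sketch (illustrative only): spdk_io_channel_from_ctx() is the
 * inverse of spdk_io_channel_get_ctx(), relying on the per-channel context
 * being laid out directly after the channel header:
 *
 *	struct my_dev_channel *dev_ch = spdk_io_channel_get_ctx(ch);
 *	struct spdk_io_channel *same_ch = spdk_io_channel_from_ctx(dev_ch);
 *
 *	assert(same_ch == ch);
 */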

struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 *  message had a chance to execute.  If so, skip calling
	 *  the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	ch = thread_get_io_channel(i->cur_thread, i->dev);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		assert(false);
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;
	i->orig_thread = _get_thread();

	pthread_mutex_lock(&g_devlist_mutex);
	i->dev = io_device_get(io_device);
	if (i->dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		assert(false);
		i->status = -ENODEV;
		goto end;
	}

	/* Do not allow new for_each operations while we are already waiting for
	 * outstanding for_each operations to complete, so that the pending
	 * unregister of the device can proceed.
	 */
	if (i->dev->pending_unregister) {
		SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device);
		i->status = -ENODEV;
		goto end;
	}

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		ch = thread_get_io_channel(thread, i->dev);
		if (ch != NULL) {
			ch->dev->for_each_count++;
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
	}

end:
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
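
/*
 * Usage sketch for spdk_for_each_channel() (illustrative only; the function
 * names are hypothetical).  fn runs once per channel, each time on the
 * channel's owning thread, and must call spdk_for_each_channel_continue() to
 * advance the iteration; cpl runs on the original thread at the end:
 *
 *	static void
 *	reset_channel(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *		struct my_dev_channel *dev_ch = spdk_io_channel_get_ctx(ch);
 *
 *		dev_ch->io_count = 0;
 *		spdk_for_each_channel_continue(i, 0);
 *	}
 *
 *	static void
 *	reset_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		// status is 0, or the first nonzero status passed to
 *		// spdk_for_each_channel_continue(), which stops the iteration.
 *	}
 *
 *	// spdk_for_each_channel(&g_my_dev, reset_channel, NULL, reset_done);
 */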

static void
__pending_unregister(void *arg)
{
	struct io_device *dev = arg;

	assert(dev->pending_unregister);
	assert(dev->for_each_count == 0);
	spdk_io_device_unregister(dev->io_device, dev->unregister_cb);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct io_device *dev;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	dev = i->dev;
	if (status) {
		goto end;
	}

	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		ch = thread_get_io_channel(thread, dev);
		if (ch != NULL) {
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);

	pthread_mutex_lock(&g_devlist_mutex);
	if (dev->pending_unregister && dev->for_each_count == 0) {
		rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev);
		assert(rc == 0);
	}
	pthread_mutex_unlock(&g_devlist_mutex);
}

struct spdk_interrupt {
	int			efd;
	struct spdk_thread	*thread;
	char			name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	if (thread->msg_fd < 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);
	thread->msg_fd = -1;

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}

#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;
	uint64_t notify = 1;

	assert(spdk_interrupt_mode_is_enabled());

	/* There may be a race between acknowledging this msg event and a
	 * producer sending another notification, so acknowledge first by
	 * reading the eventfd, and only then drain the message queue.  This
	 * ordering ensures that no notification is missed.
	 */
	rc = read(thread->msg_fd, &notify, sizeof(notify));
	if (rc < 0 && errno != EAGAIN) {
		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
	}

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	return rc;
}
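
/*
 * Producer-side sketch (illustrative only): after enqueuing a message to the
 * thread's ring, a sender wakes the target thread by writing to the same
 * eventfd that thread_interrupt_msg_process() reads.  Roughly:
 *
 *	uint64_t notify = 1;
 *
 *	// enqueue the message to thread->messages, then:
 *	write(thread->msg_fd, &notify, sizeof(notify));
 *
 * Because the consumer reads (acknowledges) the eventfd before draining the
 * ring, a write that races with the drain leaves the eventfd readable again,
 * so the fd group will invoke thread_interrupt_msg_process() once more
 * rather than losing the wakeup.
 */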

static int
thread_interrupt_create(struct spdk_thread *thread)
{
	int rc;

	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);

	rc = spdk_fd_group_create(&thread->fgrp);
	if (rc) {
		return rc;
	}

	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (thread->msg_fd < 0) {
		rc = -errno;
		spdk_fd_group_destroy(thread->fgrp);
		thread->fgrp = NULL;

		return rc;
	}

	return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd,
				 thread_interrupt_msg_process, thread);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
#endif

struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
			void *arg, const char *name)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;
	int ret;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name);
	if (ret != 0) {
		SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n",
			    thread->name, efd, spdk_strerror(-ret));
		return NULL;
	}

	intr = calloc(1, sizeof(*intr));
	if (intr == NULL) {
		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		/* Don't leave the fd registered with the fd group if we
		 * cannot track it.
		 */
		spdk_fd_group_remove(thread->fgrp, efd);
		return NULL;
	}

	if (name) {
		snprintf(intr->name, sizeof(intr->name), "%s", name);
	} else {
		snprintf(intr->name, sizeof(intr->name), "%p", fn);
	}

	intr->efd = efd;
	intr->thread = thread;

	return intr;
}
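
/*
 * Usage sketch for spdk_interrupt_register() (illustrative only; the handler
 * and variable names are hypothetical).  Typical flow on Linux:
 *
 *	static int
 *	my_event_handler(void *arg)
 *	{
 *		uint64_t val;
 *
 *		// Acknowledge the eventfd, then do the actual work.
 *		read(g_my_efd, &val, sizeof(val));
 *		return 0;
 *	}
 *
 *	// g_my_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 *	// g_my_intr = spdk_interrupt_register(g_my_efd, my_event_handler,
 *	//				       NULL, "my_event");
 *	// ...
 *	// spdk_interrupt_unregister(&g_my_intr);
 */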

void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	*pintr = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}

int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}

static bool g_interrupt_mode = false;

int
spdk_interrupt_mode_enable(void)
{
	/* This must be called once, before the threading library is initialized.
	 * g_spdk_msg_mempool is only valid after the thread library has been
	 * initialized, so use it to detect that case.
	 */
	if (g_spdk_msg_mempool) {
		SPDK_ERRLOG("Failed because the threading library is already initialized.\n");
		return -1;
	}

#ifdef __linux__
	SPDK_NOTICELOG("Setting SPDK to run in interrupt mode.\n");
	g_interrupt_mode = true;
	return 0;
#else
	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
	g_interrupt_mode = false;
	return -ENOTSUP;
#endif
}
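
/*
 * Ordering sketch (illustrative only): interrupt mode must be enabled before
 * the thread library is brought up, e.g. during application bootstrap:
 *
 *	if (spdk_interrupt_mode_enable() != 0) {
 *		// Not supported on this platform, or called too late.
 *	}
 *
 *	// Only then initialize the threading library, e.g.:
 *	// spdk_thread_lib_init(NULL, 0);
 */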

bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}

SPDK_LOG_REGISTER_COMPONENT(thread)