Lines Matching defs:thread

13 #include "spdk/thread.h"
19 #include "spdk_internal/thread.h"
45 struct spdk_thread *thread;
52 /* The poller is registered with a thread but not currently executing its fn. */
76 /* Current state of the poller; should only be accessed from the poller's thread. */
86 struct spdk_thread *thread;
95 /* The thread is processing poller and message by spdk_thread_poll(). */
98 /* The thread is in the process of termination. It reaps unregistering
103 /* The thread is exited. It is ready to call spdk_thread_destroy(). */
118 * Contains pollers actively running on this thread. Pollers
119 * are run round-robin. The thread takes one poller from the head
125 * Contains pollers running on this thread with a periodic timer.
185 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the
186 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting
193 /* Trying to use an SPDK lock while not on an SPDK thread */
195 /* Trying to lock a lock already held by this SPDK thread */
197 /* Trying to unlock a lock not held by this SPDK thread */
207 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to
208 * deadlock when another SPDK thread on the same pthread tries to take that lock.
222 [SPIN_ERR_NOT_SPDK_THREAD] = "Not an SPDK thread",
224 [SPIN_ERR_WRONG_THREAD] = "Unlock on wrong SPDK thread",
228 [SPIN_ERR_HOLD_DURING_SWITCH] = "Lock(s) held while SPDK thread going off CPU",
328 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD)
372 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n",
383 static void thread_interrupt_destroy(struct spdk_thread *thread);
384 static int thread_interrupt_create(struct spdk_thread *thread);
387 _free_thread(struct spdk_thread *thread)
393 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
394 SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
395 thread->name, ch->dev->name);
398 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
400 SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
403 TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
407 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
409 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
412 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
416 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
417 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
418 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
425 TAILQ_REMOVE(&g_threads, thread, tailq);
428 msg = SLIST_FIRST(&thread->msg_cache);
430 SLIST_REMOVE_HEAD(&thread->msg_cache, link);
432 assert(thread->msg_cache_count > 0);
433 thread->msg_cache_count--;
436 msg = SLIST_FIRST(&thread->msg_cache);
439 assert(thread->msg_cache_count == 0);
442 thread_interrupt_destroy(thread);
445 spdk_ring_free(thread->messages);
446 free(thread);
456 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
479 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
515 struct spdk_thread *thread, *null_thread;
516 size_t size = SPDK_ALIGN_CEIL(sizeof(*thread) + g_ctx_sz, SPDK_CACHE_LINE_SIZE);
522 rc = posix_memalign((void **)&thread, SPDK_CACHE_LINE_SIZE, size);
524 SPDK_ERRLOG("Unable to allocate memory for thread\n");
527 memset(thread, 0, size);
530 spdk_cpuset_copy(&thread->cpumask, cpumask);
532 spdk_cpuset_negate(&thread->cpumask);
535 RB_INIT(&thread->io_channels);
536 TAILQ_INIT(&thread->active_pollers);
537 RB_INIT(&thread->timed_pollers);
538 TAILQ_INIT(&thread->paused_pollers);
539 SLIST_INIT(&thread->msg_cache);
540 thread->msg_cache_count = 0;
542 thread->tsc_last = spdk_get_ticks();
547 thread->next_poller_id = 1;
549 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_NUMA_ID_ANY);
550 if (!thread->messages) {
552 free(thread);
560 * up organically as messages are passed to the thread. */
562 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
563 thread->msg_cache_count++;
568 snprintf(thread->name, sizeof(thread->name), "%s", name);
570 snprintf(thread->name, sizeof(thread->name), "%p", thread);
573 thread->trace_id = spdk_trace_register_owner(OWNER_TYPE_THREAD, thread->name);
577 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
579 _free_thread(thread);
582 thread->id = g_thread_id++;
583 TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
587 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
588 thread->id, thread->name);
591 thread->in_interrupt = true;
592 rc = thread_interrupt_create(thread);
594 _free_thread(thread);
600 rc = g_new_thread_fn(thread);
602 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
606 _free_thread(thread);
610 thread->state = SPDK_THREAD_STATE_RUNNING;
612 /* If this is the first thread, save it as the app thread. Use an atomic
617 __atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false,
620 return thread;
630 spdk_thread_is_app_thread(struct spdk_thread *thread)
632 if (thread == NULL) {
633 thread = _get_thread();
636 return g_app_thread == thread;
640 spdk_thread_bind(struct spdk_thread *thread, bool bind)
642 thread->is_bound = bind;
646 spdk_thread_is_bound(struct spdk_thread *thread)
648 return thread->is_bound;
652 spdk_set_thread(struct spdk_thread *thread)
654 tls_thread = thread;
658 thread_exit(struct spdk_thread *thread, uint64_t now)
663 if (now >= thread->exit_timeout_tsc) {
664 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n",
665 thread->name);
669 if (spdk_ring_count(thread->messages) > 0) {
670 SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name);
674 if (thread->for_each_count > 0) {
675 SPDK_INFOLOG(thread, "thread %s is still executing %u for_each_channels/threads\n",
676 thread->name, thread->for_each_count);
680 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
682 SPDK_INFOLOG(thread,
683 "thread %s still has active poller %s\n",
684 thread->name, poller->name);
689 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
691 SPDK_INFOLOG(thread,
692 "thread %s still has active timed poller %s\n",
693 thread->name, poller->name);
698 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
699 SPDK_INFOLOG(thread,
700 "thread %s still has paused poller %s\n",
701 thread->name, poller->name);
705 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
706 SPDK_INFOLOG(thread,
707 "thread %s still has channel for io_device %s\n",
708 thread->name, ch->dev->name);
712 if (thread->pending_unregister_count > 0) {
713 SPDK_INFOLOG(thread,
714 "thread %s is still unregistering io_devices\n",
715 thread->name);
720 thread->state = SPDK_THREAD_STATE_EXITED;
721 if (spdk_unlikely(thread->in_interrupt)) {
722 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
729 spdk_thread_exit(struct spdk_thread *thread)
731 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);
733 assert(tls_thread == thread);
735 if (thread->state >= SPDK_THREAD_STATE_EXITING) {
736 SPDK_INFOLOG(thread,
737 "thread %s is already exiting\n",
738 thread->name);
742 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
744 thread->state = SPDK_THREAD_STATE_EXITING;
747 spdk_thread_send_msg(thread, _thread_exit, thread);
754 spdk_thread_is_running(struct spdk_thread *thread)
756 return thread->state == SPDK_THREAD_STATE_RUNNING;
760 spdk_thread_is_exited(struct spdk_thread *thread)
762 return thread->state == SPDK_THREAD_STATE_EXITED;
766 spdk_thread_destroy(struct spdk_thread *thread)
768 assert(thread != NULL);
769 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);
771 assert(thread->state == SPDK_THREAD_STATE_EXITED);
773 if (tls_thread == thread) {
777 /* To be safe, do not free the app thread until spdk_thread_lib_fini(). */
778 if (thread != g_app_thread) {
779 _free_thread(thread);
784 spdk_thread_get_ctx(struct spdk_thread *thread)
787 return thread->ctx;
794 spdk_thread_get_cpumask(struct spdk_thread *thread)
796 return &thread->cpumask;
802 struct spdk_thread *thread;
810 thread = spdk_get_thread();
811 if (!thread) {
812 SPDK_ERRLOG("Called from non-SPDK thread\n");
817 spdk_cpuset_copy(&thread->cpumask, cpumask);
823 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
842 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
864 count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
865 if (spdk_unlikely(thread->in_interrupt) &&
866 spdk_ring_count(thread->messages) != 0) {
867 rc = write(thread->msg_fd, &notify, sizeof(notify));
885 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
887 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
890 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
891 thread->msg_cache_count++;
901 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
908 * Insert poller in the thread's timed_pollers tree by next scheduled run time
911 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
918 if (thread->first_timed_poller == NULL ||
919 poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
920 thread->first_timed_poller = poller;
925 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
929 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
935 if (thread->first_timed_poller == poller) {
936 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
941 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
944 poller_insert_timer(thread, poller, spdk_get_ticks());
946 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
951 thread_update_stats(struct spdk_thread *thread, uint64_t end,
956 thread->stats.idle_tsc += end - start;
959 thread->stats.busy_tsc += end - start;
962 thread->tsc_last = end;
966 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
972 TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
976 TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
977 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
990 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
999 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
1005 TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
1009 TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
1010 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
1028 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
1038 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
1051 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
1060 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
1069 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
1078 poller_insert_timer(thread, poller, now);
1089 thread_run_pp_handlers(struct spdk_thread *thread)
1091 uint8_t i, count = thread->num_pp_handlers;
1094 thread->num_pp_handlers = SPDK_THREAD_MAX_POST_POLLER_HANDLERS;
1097 thread->pp_handlers[i].fn(thread->pp_handlers[i].fn_arg);
1098 thread->pp_handlers[i].fn = NULL;
1101 thread->num_pp_handlers = 0;
1105 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
1112 thread->tsc_last = now;
1114 critical_msg = thread->critical_msg;
1117 thread->critical_msg = NULL;
1121 msg_count = msg_queue_run_batch(thread, max_msgs);
1126 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
1130 poller_rc = thread_execute_poller(thread, poller);
1134 if (thread->num_pp_handlers) {
1135 thread_run_pp_handlers(thread);
1139 poller = thread->first_timed_poller;
1147 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
1148 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
1154 if (thread->first_timed_poller == poller) {
1155 thread->first_timed_poller = tmp;
1158 timer_rc = thread_execute_timed_poller(thread, poller, now);
1172 struct spdk_thread *thread = ctx;
1175 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
1178 TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
1183 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
1185 poller_remove_timer(thread, poller);
1190 thread->poller_unregistered = false;
1196 struct spdk_thread *thread = ctx;
1198 assert(thread->state == SPDK_THREAD_STATE_EXITING);
1200 thread_exit(thread, spdk_get_ticks());
1202 if (thread->state != SPDK_THREAD_STATE_EXITED) {
1203 spdk_thread_send_msg(thread, _thread_exit, thread);
1208 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
1214 tls_thread = thread;
1220 if (spdk_likely(!thread->in_interrupt)) {
1221 rc = thread_poll(thread, max_msgs, now);
1222 if (spdk_unlikely(thread->in_interrupt)) {
1223 /* The thread transitioned to interrupt mode during the above poll.
1227 rc = thread_poll(thread, max_msgs, now);
1230 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
1231 thread_exit(thread, now);
1234 /* Non-block wait on thread's fd_group */
1235 rc = spdk_fd_group_wait(thread->fgrp, 0);
1238 thread_update_stats(thread, spdk_get_ticks(), now, rc);
1246 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
1250 poller = thread->first_timed_poller;
1259 spdk_thread_has_active_pollers(struct spdk_thread *thread)
1261 return !TAILQ_EMPTY(&thread->active_pollers);
1265 thread_has_unpaused_pollers(struct spdk_thread *thread)
1267 if (TAILQ_EMPTY(&thread->active_pollers) &&
1268 RB_EMPTY(&thread->timed_pollers)) {
1276 spdk_thread_has_pollers(struct spdk_thread *thread)
1278 if (!thread_has_unpaused_pollers(thread) &&
1279 TAILQ_EMPTY(&thread->paused_pollers)) {
1287 spdk_thread_is_idle(struct spdk_thread *thread)
1289 if (spdk_ring_count(thread->messages) ||
1290 thread_has_unpaused_pollers(thread) ||
1291 thread->critical_msg != NULL) {
1302 * Return cached value of the current thread count. We could acquire the
1316 spdk_thread_get_name(const struct spdk_thread *thread)
1318 return thread->name;
1322 spdk_thread_get_id(const struct spdk_thread *thread)
1324 return thread->id;
1330 struct spdk_thread *thread;
1333 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
1337 TAILQ_FOREACH(thread, &g_threads, tailq) {
1338 if (thread->id == id) {
1343 return thread;
1349 struct spdk_thread *thread;
1351 thread = _get_thread();
1352 if (!thread) {
1353 SPDK_ERRLOG("No thread allocated\n");
1361 *stats = thread->stats;
1367 spdk_thread_get_last_tsc(struct spdk_thread *thread)
1369 if (thread == NULL) {
1370 thread = _get_thread();
1373 return thread->tsc_last;
1388 * after sending thread msg, it is necessary to check whether target thread runs in
1403 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
1409 assert(thread != NULL);
1411 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
1412 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
1439 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
1446 return thread_send_msg_notification(thread);
1450 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
1454 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
1459 return thread_send_msg_notification(thread);
1490 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
1522 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
1559 poller_remove_timer(poller->thread, poller);
1560 poller_insert_timer(poller->thread, poller, now_tick);
1569 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name);
1581 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name);
1654 assert(spdk_get_thread() == poller->thread);
1668 /* Set poller into interrupt mode if thread is in interrupt. */
1669 if (poller->thread->in_interrupt && poller->set_intr_cb_fn) {
1696 struct spdk_thread *thread;
1699 thread = spdk_get_thread();
1700 if (!thread) {
1705 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
1706 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
1725 poller->thread = thread;
1727 if (thread->next_poller_id == 0) {
1729 thread->next_poller_id = 1;
1731 poller->id = thread->next_poller_id++;
1764 /* Set poller into interrupt mode if thread is in interrupt. */
1765 if (poller->thread->in_interrupt) {
1770 thread_insert_poller(thread, poller);
1793 wrong_thread(const char *func, const char *name, struct spdk_thread *thread,
1796 if (thread == NULL) {
1797 SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name);
1800 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be "
1802 thread->name, thread->id);
1809 struct spdk_thread *thread;
1819 thread = spdk_get_thread();
1820 if (!thread) {
1825 if (poller->thread != thread) {
1826 wrong_thread(__func__, poller->name, poller->thread, thread);
1838 if (!thread->poller_unregistered) {
1839 thread->poller_unregistered = true;
1840 spdk_thread_send_msg(thread, _thread_remove_pollers, thread);
1848 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
1849 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
1862 struct spdk_thread *thread;
1864 thread = spdk_get_thread();
1865 if (!thread) {
1870 if (poller->thread != thread) {
1871 wrong_thread(__func__, poller->name, poller->thread, thread);
1898 struct spdk_thread *thread;
1900 thread = spdk_get_thread();
1901 if (!thread) {
1906 if (poller->thread != thread) {
1907 wrong_thread(__func__, poller->name, poller->thread, thread);
1919 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
1920 thread_insert_poller(thread, poller);
1979 spdk_thread_get_first_active_poller(struct spdk_thread *thread)
1981 return TAILQ_FIRST(&thread->active_pollers);
1991 spdk_thread_get_first_timed_poller(struct spdk_thread *thread)
1993 return RB_MIN(timed_pollers_tree, &thread->timed_pollers);
1999 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev);
2003 spdk_thread_get_first_paused_poller(struct spdk_thread *thread)
2005 return TAILQ_FIRST(&thread->paused_pollers);
2015 spdk_thread_get_first_io_channel(struct spdk_thread *thread)
2017 return RB_MIN(io_channel_tree, &thread->io_channels);
2023 return RB_NEXT(io_channel_tree, &thread->io_channels, prev);
2027 spdk_thread_get_trace_id(struct spdk_thread *thread)
2029 return thread->trace_id;
2064 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n",
2071 SPDK_DEBUGLOG(thread, "Completed thread iteration\n");
2075 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
2087 struct spdk_thread *thread;
2092 SPDK_ERRLOG("Unable to perform thread iteration\n");
2101 thread = _get_thread();
2102 if (!thread) {
2103 SPDK_ERRLOG("No thread allocated\n");
2108 ct->orig_thread = thread;
2116 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
2138 struct spdk_thread *thread = _get_thread();
2141 assert(thread);
2145 thread->name, enable_interrupt ? "intr" : "poll",
2146 thread->in_interrupt ? "intr" : "poll");
2148 if (thread->in_interrupt == enable_interrupt) {
2153 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
2156 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) {
2160 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) {
2164 thread->in_interrupt = enable_interrupt;
2183 struct spdk_thread *thread;
2189 thread = spdk_get_thread();
2190 if (!thread) {
2191 SPDK_ERRLOG("called from non-SPDK thread\n");
2216 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
2217 dev->name, dev->io_device, thread->name);
2234 struct spdk_thread *thread;
2236 thread = spdk_get_thread();
2237 assert(thread == dev->unregister_thread);
2239 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
2240 dev->name, dev->io_device, thread->name);
2242 assert(thread->pending_unregister_count > 0);
2243 thread->pending_unregister_count--;
2258 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
2270 struct spdk_thread *thread;
2272 thread = spdk_get_thread();
2273 if (!thread) {
2274 SPDK_ERRLOG("called from non-SPDK thread\n");
2300 dev->unregister_thread = thread;
2315 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
2316 dev->name, dev->io_device, thread->name);
2319 thread->pending_unregister_count++;
2337 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev)
2342 return RB_FIND(io_channel_tree, &thread->io_channels, &find);
2349 struct spdk_thread *thread;
2361 thread = _get_thread();
2362 if (!thread) {
2363 SPDK_ERRLOG("No thread allocated\n");
2368 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
2369 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
2374 ch = thread_get_io_channel(thread, dev);
2378 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2379 ch, dev->name, dev->io_device, thread->name, ch->ref);
2383 * thread, so return it.
2400 ch->thread = thread;
2403 RB_INSERT(io_channel_tree, &thread->io_channels, ch);
2405 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2406 ch, dev->name, dev->io_device, thread->name, ch->ref);
2415 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
2433 struct spdk_thread *thread;
2435 thread = spdk_get_thread();
2436 if (!thread) {
2437 SPDK_ERRLOG("called from non-SPDK thread\n");
2442 SPDK_DEBUGLOG(thread,
2443 "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
2444 ch, ch->dev->name, ch->dev->io_device, thread->name);
2446 assert(ch->thread == thread);
2460 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
2488 struct spdk_thread *thread;
2494 thread = spdk_get_thread();
2495 if (!thread) {
2496 SPDK_ERRLOG("called from non-SPDK thread\n");
2501 if (ch->thread != thread) {
2502 wrong_thread(__func__, "ch", ch->thread, thread);
2506 SPDK_DEBUGLOG(thread,
2507 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
2508 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);
2514 rc = spdk_thread_send_msg(thread, put_io_channel, ch);
2528 return ch->thread;
2604 * the fn() on this thread.
2621 struct spdk_thread *thread;
2659 TAILQ_FOREACH(thread, &g_threads, tailq) {
2660 ch = thread_get_io_channel(thread, i->dev);
2663 i->cur_thread = thread;
2666 rc = spdk_thread_send_msg(thread, _call_channel, i);
2692 struct spdk_thread *thread;
2707 thread = TAILQ_NEXT(i->cur_thread, tailq);
2708 while (thread) {
2709 ch = thread_get_io_channel(thread, dev);
2711 i->cur_thread = thread;
2714 rc = spdk_thread_send_msg(thread, _call_channel, i);
2718 thread = TAILQ_NEXT(thread, tailq);
2738 thread_interrupt_destroy(struct spdk_thread *thread)
2740 struct spdk_fd_group *fgrp = thread->fgrp;
2742 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);
2744 if (thread->msg_fd < 0) {
2748 spdk_fd_group_remove(fgrp, thread->msg_fd);
2749 close(thread->msg_fd);
2750 thread->msg_fd = -1;
2753 thread->fgrp = NULL;
2760 struct spdk_thread *thread = arg;
2770 spdk_set_thread(thread);
2772 critical_msg = thread->critical_msg;
2775 thread->critical_msg = NULL;
2779 msg_count = msg_queue_run_batch(thread, 0);
2784 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
2785 if (spdk_unlikely(!thread->in_interrupt)) {
2786 /* The thread transitioned to poll mode in a msg during the above processing.
2787 * Clear msg_fd since thread messages will be polled directly in poll mode.
2789 rc = read(thread->msg_fd, &notify, sizeof(notify));
2800 thread_interrupt_create(struct spdk_thread *thread)
2805 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);
2807 rc = spdk_fd_group_create(&thread->fgrp);
2809 thread->msg_fd = -1;
2813 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
2814 if (thread->msg_fd < 0) {
2816 spdk_fd_group_destroy(thread->fgrp);
2817 thread->fgrp = NULL;
2825 return SPDK_FD_GROUP_ADD_EXT(thread->fgrp, thread->msg_fd,
2826 thread_interrupt_msg_process, thread, &opts);
2830 thread_interrupt_create(struct spdk_thread *thread)
2840 struct spdk_thread *orig_thread, *thread;
2844 thread = intr->thread;
2846 spdk_set_thread(thread);
2853 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
2884 struct spdk_thread *thread;
2887 thread = spdk_get_thread();
2888 if (!thread) {
2893 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
2894 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
2913 intr->thread = thread;
2932 ret = spdk_fd_group_add_ext(intr->thread->fgrp, efd,
2935 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n",
2936 intr->thread->name, efd, spdk_strerror(-ret));
2948 struct spdk_thread *orig_thread, *thread;
2952 thread = intr->thread;
2954 spdk_set_thread(thread);
2956 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
2975 SPDK_ERRLOG("thread %s: failed to set wrapper for fd_group %d: %s\n",
2976 intr->thread->name, spdk_fd_group_get_fd(fgrp), spdk_strerror(-rc));
2981 rc = spdk_fd_group_nest(intr->thread->fgrp, fgrp);
2983 SPDK_ERRLOG("thread %s: failed to nest fd_group %d: %s\n",
2984 intr->thread->name, spdk_fd_group_get_fd(fgrp), spdk_strerror(-rc));
2996 struct spdk_thread *thread;
3006 thread = spdk_get_thread();
3007 if (!thread) {
3012 if (intr->thread != thread) {
3013 wrong_thread(__func__, intr->name, intr->thread, thread);
3019 spdk_fd_group_unnest(thread->fgrp, intr->fgrp);
3020 spdk_fd_group_set_wrapper(thread->fgrp, NULL, NULL);
3022 spdk_fd_group_remove(thread->fgrp, intr->efd);
3032 struct spdk_thread *thread;
3034 thread = spdk_get_thread();
3035 if (!thread) {
3040 if (intr->thread != thread) {
3041 wrong_thread(__func__, intr->name, intr->thread, thread);
3050 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
3054 spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
3056 return spdk_fd_group_get_fd(thread->fgrp);
3060 spdk_thread_get_interrupt_fd_group(struct spdk_thread *thread)
3062 return thread->fgrp;
3071 * g_spdk_msg_mempool will be valid if thread library is initialized.
3195 SPIN_ASSERT_LOG_STACKS(sspin->thread == NULL, SPIN_ERR_LOCK_HELD, sspin);
3208 struct spdk_thread *thread = spdk_get_thread();
3213 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin);
3214 SPIN_ASSERT_LOG_STACKS(thread != sspin->thread, SPIN_ERR_DEADLOCK, sspin);
3219 sspin->thread = thread;
3220 sspin->thread->lock_count++;
3228 struct spdk_thread *thread = spdk_get_thread();
3233 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin);
3234 SPIN_ASSERT_LOG_STACKS(thread == sspin->thread, SPIN_ERR_WRONG_THREAD, sspin);
3236 SPIN_ASSERT_LOG_STACKS(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT, sspin);
3237 thread->lock_count--;
3238 sspin->thread = NULL;
3249 struct spdk_thread *thread = spdk_get_thread();
3251 SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false);
3253 return sspin->thread == thread;
3273 SPDK_LOG_REGISTER_COMPONENT(thread)