/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2016 Intel Corporation.
 * All rights reserved.
 * Copyright (c) 2022, 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/trace.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"
#include "spdk_internal/usdt.h"
#include "thread_internal.h"

#include "spdk_internal/trace_defs.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

#ifdef SPDK_HAVE_EXECINFO_H
#include <execinfo.h>
#endif

#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
#define SPDK_MAX_POLLER_NAME_LEN	256
#define SPDK_MAX_THREAD_NAME_LEN	256

static struct spdk_thread *g_app_thread;

struct spdk_interrupt {
	int			efd;
	struct spdk_thread	*thread;
	spdk_interrupt_fn	fn;
	void			*arg;
	char			name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,

	/* The poller is in the process of being paused. It will be paused
	 * the next time it is due to execute.
	 */
	SPDK_POLLER_STATE_PAUSING,

	/* The poller is registered but currently paused. It's on the
	 * paused_pollers list.
	 */
	SPDK_POLLER_STATE_PAUSED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;
	RB_ENTRY(spdk_poller)		node;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	uint64_t			period_ticks;
	uint64_t			next_run_tick;
	uint64_t			run_count;
	uint64_t			busy_count;
	uint64_t			id;
	spdk_poller_fn			fn;
	void				*arg;
	struct spdk_thread		*thread;
	struct spdk_interrupt		*intr;
	spdk_poller_set_interrupt_mode_cb	set_intr_cb_fn;
	void				*set_intr_cb_arg;

	char				name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_thread_state {
	/* The thread is processing pollers and messages via spdk_thread_poll(). */
	SPDK_THREAD_STATE_RUNNING,

	/* The thread is in the process of termination. It is reaping unregistering
	 * pollers and releasing I/O channels.
	 */
	SPDK_THREAD_STATE_EXITING,

	/* The thread has exited. It is ready to call spdk_thread_destroy(). */
	SPDK_THREAD_STATE_EXITED,
};

struct spdk_thread_post_poller_handler {
	spdk_post_poller_fn	fn;
	void			*fn_arg;
};

#define SPDK_THREAD_MAX_POST_POLLER_HANDLERS (4)
struct spdk_thread {
	uint64_t			tsc_last;
	struct spdk_thread_stats	stats;
	/*
	 * Contains pollers actively running on this thread. Pollers
	 * are run round-robin. The thread takes one poller from the head
	 * of the ring, executes it, then puts it back at the tail of
	 * the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;
	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	RB_HEAD(timed_pollers_tree, spdk_poller)	timed_pollers;
	struct spdk_poller				*first_timed_poller;
	/*
	 * Contains paused pollers. Pollers on this queue are waiting until
	 * they are resumed (in which case they're put onto the active/timer
	 * queues) or unregistered.
	 */
	TAILQ_HEAD(paused_pollers_head, spdk_poller)	paused_pollers;
	struct spdk_thread_post_poller_handler	pp_handlers[SPDK_THREAD_MAX_POST_POLLER_HANDLERS];
	struct spdk_ring		*messages;
	uint8_t				num_pp_handlers;
	int				msg_fd;
	SLIST_HEAD(, spdk_msg)		msg_cache;
	size_t				msg_cache_count;
	spdk_msg_fn			critical_msg;
	uint64_t			id;
	uint64_t			next_poller_id;
	enum spdk_thread_state		state;
	int				pending_unregister_count;
	uint32_t			for_each_count;

	RB_HEAD(io_channel_tree, spdk_io_channel)	io_channels;
	TAILQ_ENTRY(spdk_thread)			tailq;

	char				name[SPDK_MAX_THREAD_NAME_LEN + 1];
	struct spdk_cpuset		cpumask;
	uint64_t			exit_timeout_tsc;

	int32_t				lock_count;

	/* spdk_thread is bound to current CPU core. */
	bool				is_bound;

	/* Indicates whether this spdk_thread currently runs in interrupt. */
	bool				in_interrupt;
	bool				poller_unregistered;
	struct spdk_fd_group		*fgrp;

	uint16_t			trace_id;

	uint8_t				reserved[6];

	/* User context allocated at the end */
	uint8_t				ctx[0];
};

/*
 * Assert that spdk_thread struct is 8-byte aligned to ensure
 * the user ctx is also 8-byte aligned.
 */
SPDK_STATIC_ASSERT((sizeof(struct spdk_thread)) % 8 == 0, "Incorrect size");
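/*
 * Illustrative sketch (not part of SPDK) of how the trailing ctx[] area is
 * typically used: the framework requests per-thread context space at library
 * init and retrieves it per thread later. "struct my_sched_ctx" is a
 * hypothetical name.
 *
 *	spdk_thread_lib_init(NULL, sizeof(struct my_sched_ctx));
 *	struct spdk_thread *t = spdk_thread_create("worker_0", NULL);
 *	struct my_sched_ctx *sc = spdk_thread_get_ctx(t);
 *	struct spdk_thread *back = spdk_thread_get_from_ctx(sc);	// back == t
 */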
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
static size_t g_ctx_sz = 0;
/* A monotonically increasing ID, beginning at 1, is assigned to each created
 * thread. If the ID wraps around past UINT64_MAX, further thread creation is
 * not allowed and the SPDK application must be restarted.
 */
static uint64_t g_thread_id = 1;

enum spin_error {
	SPIN_ERR_NONE,
	/* Trying to use an SPDK lock while not on an SPDK thread */
	SPIN_ERR_NOT_SPDK_THREAD,
	/* Trying to lock a lock already held by this SPDK thread */
	SPIN_ERR_DEADLOCK,
	/* Trying to unlock a lock not held by this SPDK thread */
	SPIN_ERR_WRONG_THREAD,
	/* pthread_spin_*() returned an error */
	SPIN_ERR_PTHREAD,
	/* Trying to destroy a lock that is held */
	SPIN_ERR_LOCK_HELD,
	/* lock_count is invalid */
	SPIN_ERR_LOCK_COUNT,
	/*
	 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to
	 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to
	 * deadlock when another SPDK thread on the same pthread tries to take that lock.
	 */
	SPIN_ERR_HOLD_DURING_SWITCH,
	/* Trying to use a lock that was destroyed (but not re-initialized) */
	SPIN_ERR_DESTROYED,
	/* Trying to use a lock that is not initialized */
	SPIN_ERR_NOT_INITIALIZED,

	/* Must be last, not an actual error code */
	SPIN_ERR_LAST
};

static const char *spin_error_strings[] = {
	[SPIN_ERR_NONE]			= "No error",
	[SPIN_ERR_NOT_SPDK_THREAD]	= "Not an SPDK thread",
	[SPIN_ERR_DEADLOCK]		= "Deadlock detected",
	[SPIN_ERR_WRONG_THREAD]		= "Unlock on wrong SPDK thread",
	[SPIN_ERR_PTHREAD]		= "Error from pthread_spinlock",
	[SPIN_ERR_LOCK_HELD]		= "Destroying a held spinlock",
	[SPIN_ERR_LOCK_COUNT]		= "Lock count is invalid",
	[SPIN_ERR_HOLD_DURING_SWITCH]	= "Lock(s) held while SPDK thread going off CPU",
	[SPIN_ERR_DESTROYED]		= "Lock has been destroyed",
	[SPIN_ERR_NOT_INITIALIZED]	= "Lock has not been initialized",
};

#define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \
					? "Unknown error" : spin_error_strings[err]

static void
__posix_abort(enum spin_error err)
{
	abort();
}

typedef void (*spin_abort)(enum spin_error err);
spin_abort g_spin_abort_fn = __posix_abort;

#define SPIN_ASSERT_IMPL(cond, err, extra_log, ret) \
	do { \
		if (spdk_unlikely(!(cond))) { \
			SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \
				    SPIN_ERROR_STRING(err), #cond); \
			extra_log; \
			g_spin_abort_fn(err); \
			ret; \
		} \
	} while (0)
#define SPIN_ASSERT_LOG_STACKS(cond, err, sspin) \
	SPIN_ASSERT_IMPL(cond, err, sspin_stacks_print(sspin), return)
#define SPIN_ASSERT_RETURN(cond, err, ret)	SPIN_ASSERT_IMPL(cond, err, , return ret)
#define SPIN_ASSERT(cond, err)			SPIN_ASSERT_IMPL(cond, err, ,)
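/*
 * Illustrative sketch (hypothetical; not part of SPDK): because the abort
 * handler is an overridable function pointer, a unit test can observe
 * spinlock misuse instead of terminating the process:
 *
 *	static enum spin_error g_last_spin_err;
 *
 *	static void
 *	test_spin_abort(enum spin_error err)
 *	{
 *		g_last_spin_err = err;
 *	}
 *
 *	...
 *	g_spin_abort_fn = test_spin_abort;
 */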
struct io_device {
	void				*io_device;
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb	create_cb;
	spdk_io_channel_destroy_cb	destroy_cb;
	spdk_io_device_unregister_cb	unregister_cb;
	struct spdk_thread		*unregister_thread;
	uint32_t			ctx_size;
	uint32_t			for_each_count;
	RB_ENTRY(io_device)		node;

	uint32_t			refcnt;

	bool				pending_unregister;
	bool				unregistered;
};

static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices);

static int
io_device_cmp(struct io_device *dev1, struct io_device *dev2)
{
	return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device);
}

RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp);

static int
io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2)
{
	return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev);
}

RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp);

struct spdk_msg {
	spdk_msg_fn		fn;
	void			*arg;

	SLIST_ENTRY(spdk_msg)	link;
};

static struct spdk_mempool *g_spdk_msg_mempool = NULL;

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static __thread struct spdk_thread *tls_thread = NULL;

static void
thread_trace(void)
{
	struct spdk_trace_tpoint_opts opts[] = {
		{
			"THREAD_IOCH_GET", TRACE_THREAD_IOCH_GET,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "refcnt", SPDK_TRACE_ARG_TYPE_INT, 4 }}
		},
		{
			"THREAD_IOCH_PUT", TRACE_THREAD_IOCH_PUT,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "refcnt", SPDK_TRACE_ARG_TYPE_INT, 4 }}
		}
	};

	spdk_trace_register_owner_type(OWNER_TYPE_THREAD, 't');
	spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
}
SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD)

/*
 * If this compare function returned zero when two next_run_ticks are equal,
 * the macro RB_INSERT() would return a pointer to the existing element with
 * the same next_run_tick instead of inserting the new one.
 *
 * Fortunately, the macro RB_REMOVE() takes a pointer to the element to
 * remove, not a key.
 *
 * Hence we allow RB_INSERT() to insert elements with the same keys on the right
 * side by returning 1 when two next_run_ticks are equal.
 */
static inline int
timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2)
{
	if (poller1->next_run_tick < poller2->next_run_tick) {
		return -1;
	} else {
		return 1;
	}
}

RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare);
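/*
 * Worked example (hypothetical values) of the duplicate-key behavior described
 * above: two pollers that are due at the same tick can both live in the tree,
 * and removal is still exact because RB_REMOVE() is given the element itself.
 *
 *	a.next_run_tick = 100;
 *	b.next_run_tick = 100;
 *	RB_INSERT(timed_pollers_tree, &tree, &a);	// returns NULL: inserted
 *	RB_INSERT(timed_pollers_tree, &tree, &b);	// also NULL: b lands to a's right
 *	RB_REMOVE(timed_pollers_tree, &tree, &a);	// removes exactly a, b stays
 */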
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}

static int
_thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	g_ctx_sz = ctx_sz;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz,
			     sizeof(struct spdk_msg),
			     0, /* No cache. We do our own. */
			     SPDK_ENV_NUMA_ID_ANY);

	if (!g_spdk_msg_mempool) {
		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
		return -ENOMEM;
	}

	SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n",
		      msg_mempool_sz);

	return 0;
}

static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
				     poller->name);
		}
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (spdk_interrupt_mode_is_enabled()) {
		thread_interrupt_destroy(thread);
	}

	spdk_ring_free(thread->messages);
	free(thread);
}

int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);

	if (new_thread_fn == NULL) {
		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
	} else {
		g_new_thread_fn = new_thread_fn;
	}

	return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE);
}

int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
			 spdk_thread_op_supported_fn thread_op_supported_fn,
			 size_t ctx_sz, size_t msg_mempool_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);
	assert(g_thread_op_supported_fn == NULL);

	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
		SPDK_ERRLOG("thread_op_fn and thread_op_supported_fn must be defined or undefined together\n");
		return -EINVAL;
	}

	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
	} else {
		g_thread_op_fn = thread_op_fn;
		g_thread_op_supported_fn = thread_op_supported_fn;
	}

	return _thread_lib_init(ctx_sz, msg_mempool_sz);
}
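/*
 * Typical initialization order for a framework embedding this library
 * (illustrative sketch; my_thread_op and my_thread_op_supported are
 * hypothetical):
 *
 *	spdk_thread_lib_init_ext(my_thread_op, my_thread_op_supported,
 *				 sizeof(struct my_sched_ctx),
 *				 SPDK_DEFAULT_MSG_MEMPOOL_SIZE);
 *	... create threads, run the application ...
 *	spdk_thread_lib_fini();
 */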
void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	RB_FOREACH(dev, io_device_tree, &g_io_devices) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	g_new_thread_fn = NULL;
	g_thread_op_fn = NULL;
	g_thread_op_supported_fn = NULL;
	g_ctx_sz = 0;
	if (g_app_thread != NULL) {
		_free_thread(g_app_thread);
		g_app_thread = NULL;
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}
}

struct spdk_thread *
spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread, *null_thread;
	size_t size = SPDK_ALIGN_CEIL(sizeof(*thread) + g_ctx_sz, SPDK_CACHE_LINE_SIZE);
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc = 0, i;

	/* Since this spdk_thread object will be used by another core, ensure that it won't share a
	 * cache line with any other object allocated on this core */
	rc = posix_memalign((void **)&thread, SPDK_CACHE_LINE_SIZE, size);
	if (rc != 0) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}
	memset(thread, 0, size);

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	RB_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	RB_INIT(&thread->timed_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	/* A monotonically increasing ID, beginning at 1, is assigned to each
	 * registered poller. If the ID rolls over, a warning is logged and IDs
	 * restart at 1, so duplicates become possible.
	 */
	thread->next_poller_id = 1;

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_NUMA_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	thread->trace_id = spdk_trace_register_owner(OWNER_TYPE_THREAD, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	if (g_thread_id == 0) {
		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		_free_thread(thread);
		return NULL;
	}
	thread->id = g_thread_id++;
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
		      thread->id, thread->name);

	if (spdk_interrupt_mode_is_enabled()) {
		thread->in_interrupt = true;
		rc = thread_interrupt_create(thread);
		if (rc != 0) {
			_free_thread(thread);
			return NULL;
		}
	}

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
	}

	if (rc != 0) {
		_free_thread(thread);
		return NULL;
	}

	thread->state = SPDK_THREAD_STATE_RUNNING;

	/* If this is the first thread, save it as the app thread. Use an atomic
	 * compare + exchange to guard against users who might try to call
	 * spdk_thread_create() simultaneously on multiple threads.
	 */
	null_thread = NULL;
	__atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false,
				    __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);

	return thread;
}
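/*
 * Minimal event-loop sketch for a framework that drives this thread
 * (illustrative; not part of SPDK). Each spdk_thread must be polled by
 * exactly one system thread at a time; some message handler running on t
 * eventually calls spdk_thread_exit(t), flipping it to the exiting state:
 *
 *	struct spdk_thread *t = spdk_thread_create("app", NULL);
 *
 *	while (spdk_thread_is_running(t)) {
 *		spdk_thread_poll(t, 0, 0);
 *	}
 *	while (!spdk_thread_is_exited(t)) {
 *		spdk_thread_poll(t, 0, 0);
 *	}
 *	spdk_thread_destroy(t);
 */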
struct spdk_thread *
spdk_thread_get_app_thread(void)
{
	return g_app_thread;
}

bool
spdk_thread_is_app_thread(struct spdk_thread *thread)
{
	if (thread == NULL) {
		thread = _get_thread();
	}

	return g_app_thread == thread;
}

void
spdk_thread_bind(struct spdk_thread *thread, bool bind)
{
	thread->is_bound = bind;
}

bool
spdk_thread_is_bound(struct spdk_thread *thread)
{
	return thread->is_bound;
}

void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}

static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s timed out waiting to exit; moving it to the exited state forcefully\n",
			    thread->name);
		goto exited;
	}

	if (spdk_ring_count(thread->messages) > 0) {
		SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name);
		return;
	}

	if (thread->for_each_count > 0) {
		SPDK_INFOLOG(thread, "thread %s is still executing %u for_each_channels/threads\n",
			     thread->name, thread->for_each_count);
		return;
	}

	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
		SPDK_INFOLOG(thread,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

	if (thread->pending_unregister_count > 0) {
		SPDK_INFOLOG(thread,
			     "thread %s is still unregistering io_devices\n",
			     thread->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
	if (spdk_unlikely(thread->in_interrupt)) {
		g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
	}
}

static void _thread_exit(void *ctx);

int
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
		SPDK_INFOLOG(thread,
			     "thread %s is already exiting\n",
			     thread->name);
		return 0;
	}

	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
	thread->state = SPDK_THREAD_STATE_EXITING;

	if (spdk_interrupt_mode_is_enabled()) {
		spdk_thread_send_msg(thread, _thread_exit, thread);
	}

	return 0;
}

bool
spdk_thread_is_running(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_RUNNING;
}

bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_EXITED;
}

void
spdk_thread_destroy(struct spdk_thread *thread)
{
	assert(thread != NULL);
	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);

	assert(thread->state == SPDK_THREAD_STATE_EXITED);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	/* To be safe, do not free the app thread until spdk_thread_lib_fini(). */
	if (thread != g_app_thread) {
		_free_thread(thread);
	}
}
void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
	if (g_ctx_sz > 0) {
		return thread->ctx;
	}

	return NULL;
}

struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}

int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;

	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
		assert(false);
		return -ENOTSUP;
	}

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Called from non-SPDK thread\n");
		assert(false);
		return -EINVAL;
	}

	spdk_cpuset_copy(&thread->cpumask, cpumask);

	/* Invoke framework's reschedule operation. If this function is called multiple times
	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
	 * reschedule operation.
	 */
	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

	return 0;
}
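/*
 * Illustrative sketch (hypothetical core numbers): restrict the calling SPDK
 * thread to cores 2 and 3. The framework applies the change via its
 * SPDK_THREAD_OP_RESCHED handler.
 *
 *	struct spdk_cpuset mask;
 *
 *	spdk_cpuset_zero(&mask);
 *	spdk_cpuset_set_cpu(&mask, 2, true);
 *	spdk_cpuset_set_cpu(&mask, 3, true);
 *	spdk_thread_set_cpumask(&mask);
 */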
struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}

static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];
	uint64_t notify = 1;
	int rc;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (spdk_unlikely(thread->in_interrupt) &&
	    spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);

		SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg);

		msg->fn(msg->arg);

		SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}

static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *tmp __attribute__((unused));

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers tree by next scheduled run time
	 * as its key.
	 */
	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp == NULL);

	/* Update the cache only if it is empty or the inserted poller is earlier than it.
	 * RB_MIN() is not necessary here because all pollers that have exactly the same
	 * next_run_tick as an existing poller are inserted on the right side.
	 */
	if (thread->first_timed_poller == NULL ||
	    poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
		thread->first_timed_poller = poller;
	}
}

static inline void
poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
{
	struct spdk_poller *tmp __attribute__((unused));

	tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp != NULL);

	/* This function is not used in any case that is performance critical.
	 * Update the cache simply by RB_MIN() if it needs to be changed.
	 */
	if (thread->first_timed_poller == poller) {
		thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
	}
}

static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store end time to use it as start time of the next spdk_thread_poll(). */
	thread->tsc_last = end;
}
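/*
 * Worked example (hypothetical tick values): if a spdk_thread_poll() iteration
 * starts at tsc 1000, ends at tsc 1400, and thread_poll() returned a positive
 * value, busy_tsc grows by 400; had it returned 0, idle_tsc would grow by 400
 * instead. tsc_last becomes 1400 and seeds the next iteration.
 */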
static inline int
thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_WAITING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}

static inline int
thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
			    uint64_t now)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
	/* fallthrough */
	case SPDK_POLLER_STATE_WAITING:
		poller_insert_timer(thread, poller, now);
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}
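/*
 * Illustrative poller callback (hypothetical my_poller/my_dev names): the
 * return value feeds the run/busy accounting above. SPDK_POLLER_BUSY (> 0)
 * counts the iteration as busy, SPDK_POLLER_IDLE (0) as idle, and -1 is
 * only logged in debug builds.
 *
 *	static int
 *	my_poller(void *arg)
 *	{
 *		struct my_dev *dev = arg;
 *		int completions = my_dev_process_completions(dev);
 *
 *		return completions > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
 *	}
 */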
static inline void
thread_run_pp_handlers(struct spdk_thread *thread)
{
	uint8_t i, count = thread->num_pp_handlers;

	/* Set to max value to prevent new handlers registration within the callback */
	thread->num_pp_handlers = SPDK_THREAD_MAX_POST_POLLER_HANDLERS;

	for (i = 0; i < count; i++) {
		thread->pp_handlers[i].fn(thread->pp_handlers[i].fn_arg);
		thread->pp_handlers[i].fn = NULL;
	}

	thread->num_pp_handlers = 0;
}

static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	thread->tsc_last = now;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		poller_rc = thread_execute_poller(thread, poller);
		if (poller_rc > rc) {
			rc = poller_rc;
		}
		if (thread->num_pp_handlers) {
			thread_run_pp_handlers(thread);
		}
	}

	poller = thread->first_timed_poller;
	while (poller != NULL) {
		int timer_rc = 0;

		if (now < poller->next_run_tick) {
			break;
		}

		tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);

		/* Update the cache to the next timed poller in the list
		 * only if the current poller is still the closest, otherwise,
		 * do nothing because the cache has been already updated.
		 */
		if (thread->first_timed_poller == poller) {
			thread->first_timed_poller = tmp;
		}

		timer_rc = thread_execute_timed_poller(thread, poller, now);
		if (timer_rc > rc) {
			rc = timer_rc;
		}

		poller = tmp;
	}

	return rc;
}

static void
_thread_remove_pollers(void *ctx)
{
	struct spdk_thread *thread = ctx;
	struct spdk_poller *poller, *tmp;

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
		}
	}

	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			poller_remove_timer(thread, poller);
			free(poller);
		}
	}

	thread->poller_unregistered = false;
}

static void
_thread_exit(void *ctx)
{
	struct spdk_thread *thread = ctx;

	assert(thread->state == SPDK_THREAD_STATE_EXITING);

	thread_exit(thread, spdk_get_ticks());

	if (thread->state != SPDK_THREAD_STATE_EXITED) {
		spdk_thread_send_msg(thread, _thread_exit, thread);
	}
}
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	struct spdk_thread *orig_thread;
	int rc;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	if (spdk_likely(!thread->in_interrupt)) {
		rc = thread_poll(thread, max_msgs, now);
		if (spdk_unlikely(thread->in_interrupt)) {
			/* The thread transitioned to interrupt mode during the above poll.
			 * Poll it one more time in case a message was received without
			 * notification during the transition.
			 */
			rc = thread_poll(thread, max_msgs, now);
		}

		if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
			thread_exit(thread, now);
		}
	} else {
		/* Non-blocking wait on the thread's fd_group */
		rc = spdk_fd_group_wait(thread->fgrp, 0);
	}

	thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}

uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = thread->first_timed_poller;
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}

int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}

static bool
thread_has_unpaused_pollers(struct spdk_thread *thread)
{
	if (TAILQ_EMPTY(&thread->active_pollers) &&
	    RB_EMPTY(&thread->timed_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
	if (!thread_has_unpaused_pollers(thread) &&
	    TAILQ_EMPTY(&thread->paused_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
	if (spdk_ring_count(thread->messages) ||
	    thread_has_unpaused_pollers(thread) ||
	    thread->critical_msg != NULL) {
		return false;
	}

	return true;
}
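/*
 * Illustrative sketch (hypothetical framework logic): a polling loop can use
 * the next timed-poller expiration to sleep instead of spinning when the
 * thread has nothing else to do:
 *
 *	uint64_t now = spdk_get_ticks();
 *	uint64_t next = spdk_thread_next_poller_expiration(thread);
 *
 *	if (spdk_thread_is_idle(thread) && next > now) {
 *		usleep((next - now) * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
 *	}
 */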
uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count. We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}

struct spdk_thread *
spdk_get_thread(void)
{
	return _get_thread();
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

uint64_t
spdk_thread_get_id(const struct spdk_thread *thread)
{
	return thread->id;
}

struct spdk_thread *
spdk_thread_get_by_id(uint64_t id)
{
	struct spdk_thread *thread;

	if (id == 0 || id >= g_thread_id) {
		SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
		return NULL;
	}
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		if (thread->id == id) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);
	return thread;
}

int
spdk_thread_get_stats(struct spdk_thread_stats *stats)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		return -EINVAL;
	}

	if (stats == NULL) {
		return -EINVAL;
	}

	*stats = thread->stats;

	return 0;
}

uint64_t
spdk_thread_get_last_tsc(struct spdk_thread *thread)
{
	if (thread == NULL) {
		thread = _get_thread();
	}

	return thread->tsc_last;
}

static inline int
thread_send_msg_notification(const struct spdk_thread *target_thread)
{
	uint64_t notify = 1;
	int rc;

	/* Not necessary to do notification if interrupt facility is not enabled */
	if (spdk_likely(!spdk_interrupt_mode_is_enabled())) {
		return 0;
	}

	/* Since each spdk_thread can switch between poll and interrupt mode
	 * dynamically, after sending a thread message we must check whether the
	 * target thread currently runs in interrupt mode and decide whether an
	 * event notification is needed.
	 */
	if (spdk_unlikely(target_thread->in_interrupt)) {
		rc = write(target_thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
			return -EIO;
		}
	}

	return 0;
}

int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}
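/*
 * Illustrative cross-thread call (hypothetical do_work/my_ctx names):
 * delivery is asynchronous; do_work() runs later, inside the target
 * thread's spdk_thread_poll().
 *
 *	static void
 *	do_work(void *ctx)
 *	{
 *		... runs on the target thread ...
 *	}
 *
 *	rc = spdk_thread_send_msg(target, do_work, my_ctx);
 *	if (rc != 0) {
 *		... -ENOMEM (no msg objects) or -EIO (target exited) ...
 *	}
 */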
int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
	spdk_msg_fn expected = NULL;

	if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
					 __ATOMIC_SEQ_CST)) {
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

#ifdef __linux__
static int
interrupt_timerfd_process(void *arg)
{
	struct spdk_poller *poller = arg;
	uint64_t exp;
	int rc;

	/* clear the level of interval timer */
	rc = read(poller->intr->efd, &exp, sizeof(exp));
	if (rc < 0) {
		/* read() returns -1 and sets errno; EAGAIN just means the
		 * timer has not expired yet. */
		if (errno == EAGAIN) {
			return 0;
		}

		return rc;
	}

	SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg);

	return poller->fn(poller->arg);
}

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	int timerfd;

	SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
	if (timerfd < 0) {
		return -errno;
	}

	poller->intr = spdk_interrupt_register(timerfd, interrupt_timerfd_process, poller, poller->name);
	if (poller->intr == NULL) {
		close(timerfd);
		return -1;
	}

	return 0;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	int timerfd;
	uint64_t now_tick = spdk_get_ticks();
	uint64_t ticks = spdk_get_ticks_hz();
	int ret;
	struct itimerspec new_tv = {};
	struct itimerspec old_tv = {};

	assert(poller->intr != NULL);
	assert(poller->period_ticks != 0);

	timerfd = poller->intr->efd;

	assert(timerfd >= 0);

	SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
		      interrupt_mode ? "interrupt" : "poll");

	if (interrupt_mode) {
		/* Set repeated timer expiration */
		new_tv.it_interval.tv_sec = poller->period_ticks / ticks;
		new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks;

		/* Update next timer expiration */
		if (poller->next_run_tick == 0) {
			poller->next_run_tick = now_tick + poller->period_ticks;
		} else if (poller->next_run_tick < now_tick) {
			poller->next_run_tick = now_tick;
		}

		new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks;
		new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks;

		ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno);
			assert(false);
		}
	} else {
		/* Disarm the timer */
		ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv);
		if (ret < 0) {
			/* timerfd_settime's failure indicates that the timerfd is in error */
			SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno);
			assert(false);
		}

		/* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be
		 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC
		 */
		now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \
			   (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC;
		poller_remove_timer(poller->thread, poller);
		poller_insert_timer(poller->thread, poller, now_tick);
	}
}
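/*
 * Worked example (hypothetical tick rate) for the arming math above: with
 * spdk_get_ticks_hz() == 2000000000 and period_ticks == 500000000 (a 250 ms
 * period), it_interval.tv_sec = 500000000 / 2000000000 = 0 and
 * it_interval.tv_nsec = 500000000 * 1000000000 / 2000000000 = 250000000.
 */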
"interrupt" : "poll"); 1523 1524 if (interrupt_mode) { 1525 /* Set repeated timer expiration */ 1526 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1527 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1528 1529 /* Update next timer expiration */ 1530 if (poller->next_run_tick == 0) { 1531 poller->next_run_tick = now_tick + poller->period_ticks; 1532 } else if (poller->next_run_tick < now_tick) { 1533 poller->next_run_tick = now_tick; 1534 } 1535 1536 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1537 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1538 1539 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1540 if (ret < 0) { 1541 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1542 assert(false); 1543 } 1544 } else { 1545 /* Disarm the timer */ 1546 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1547 if (ret < 0) { 1548 /* timerfd_settime's failure indicates that the timerfd is in error */ 1549 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1550 assert(false); 1551 } 1552 1553 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1554 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1555 */ 1556 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1557 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1558 poller_remove_timer(poller->thread, poller); 1559 poller_insert_timer(poller->thread, poller, now_tick); 1560 } 1561 } 1562 1563 static void 1564 poller_interrupt_fini(struct spdk_poller *poller) 1565 { 1566 int fd; 1567 1568 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1569 assert(poller->intr != NULL); 1570 fd = poller->intr->efd; 1571 spdk_interrupt_unregister(&poller->intr); 1572 close(fd); 1573 } 1574 1575 static int 1576 busy_poller_interrupt_init(struct spdk_poller *poller) 1577 { 1578 int busy_efd; 1579 1580 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1581 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1582 if (busy_efd < 0) { 1583 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1584 return -errno; 1585 } 1586 1587 poller->intr = spdk_interrupt_register(busy_efd, poller->fn, poller->arg, poller->name); 1588 if (poller->intr == NULL) { 1589 close(busy_efd); 1590 return -1; 1591 } 1592 1593 return 0; 1594 } 1595 1596 static void 1597 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1598 { 1599 int busy_efd = poller->intr->efd; 1600 uint64_t notify = 1; 1601 int rc __attribute__((unused)); 1602 1603 assert(busy_efd >= 0); 1604 1605 if (interrupt_mode) { 1606 /* Write without read on eventfd will get it repeatedly triggered. */ 1607 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1608 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1609 } 1610 } else { 1611 /* Read on eventfd will clear its level triggering. 
#else

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

#endif

void
spdk_poller_register_interrupt(struct spdk_poller *poller,
			       spdk_poller_set_interrupt_mode_cb cb_fn,
			       void *cb_arg)
{
	assert(poller != NULL);
	assert(spdk_get_thread() == poller->thread);

	if (!spdk_interrupt_mode_is_enabled()) {
		return;
	}

	/* If this poller already had an interrupt, clean the old one up. */
	if (poller->intr != NULL) {
		poller_interrupt_fini(poller);
	}

	poller->set_intr_cb_fn = cb_fn;
	poller->set_intr_cb_arg = cb_arg;

	/* Set poller into interrupt mode if thread is in interrupt. */
	if (poller->thread->in_interrupt && poller->set_intr_cb_fn) {
		poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
	}
}

static uint64_t
convert_us_to_ticks(uint64_t us)
{
	uint64_t quotient, remainder, ticks;

	if (us) {
		quotient = us / SPDK_SEC_TO_USEC;
		remainder = us % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		return 0;
	}
}
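/*
 * Worked example (hypothetical tick rate): with spdk_get_ticks_hz() ==
 * 3000000000, convert_us_to_ticks(1500000) = 3000000000 * 1 +
 * (3000000000 * 500000) / 1000000 = 4500000000 ticks. Splitting the
 * microseconds into quotient and remainder keeps the intermediate
 * multiplication from overflowing for large periods.
 */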
static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;
	poller->intr = NULL;
	if (thread->next_poller_id == 0) {
		SPDK_WARNLOG("Poller ID rolled over. Poller IDs may be duplicated.\n");
		thread->next_poller_id = 1;
	}
	poller->id = thread->next_poller_id++;

	poller->period_ticks = convert_us_to_ticks(period_microseconds);

	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (period_microseconds) {
			rc = period_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			poller->set_intr_cb_fn = period_poller_set_interrupt_mode;
			poller->set_intr_cb_arg = NULL;

		} else {
			/* If the poller doesn't have a period, create interruptfd that's always
			 * busy automatically when running in interrupt mode.
			 */
			rc = busy_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			poller->set_intr_cb_fn = busy_poller_set_interrupt_mode;
			poller->set_intr_cb_arg = NULL;
		}

		/* Set poller into interrupt mode if thread is in interrupt. */
		if (poller->thread->in_interrupt) {
			poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
		}
	}

	thread_insert_poller(thread, poller);

	return poller;
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}
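/*
 * Illustrative lifecycle (hypothetical my_poller/dev names): register a
 * poller that runs every 100 microseconds; a period of 0 would instead run
 * it on every spdk_thread_poll() iteration. Unregistration clears the
 * caller's pointer and defers the free to a later poll.
 *
 *	struct spdk_poller *p;
 *
 *	p = spdk_poller_register_named(my_poller, dev, 100, "my_poller");
 *	...
 *	spdk_poller_unregister(&p);	// p is now NULL
 */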
static void
wrong_thread(const char *func, const char *name, struct spdk_thread *thread,
	     struct spdk_thread *curthread)
{
	if (thread == NULL) {
		SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name);
		abort();
	}
	SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be "
		    "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id,
		    thread->name, thread->id);
	assert(false);
}

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		/* Release the interrupt resource for period or busy poller */
		if (poller->intr != NULL) {
			poller_interrupt_fini(poller);
		}

		/* If there is not already a pending poller removal, generate
		 * a message to go process removals. */
		if (!thread->poller_unregistered) {
			thread->poller_unregistered = true;
			spdk_thread_send_msg(thread, _thread_remove_pollers, thread);
		}
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}

void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	/* We just set its state to SPDK_POLLER_STATE_PAUSING and let
	 * spdk_thread_poll() move it. It allows a poller to be paused from
	 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE
	 * iteration, or from within itself without breaking the logic to always
	 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration.
	 */
	switch (poller->state) {
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_PAUSING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
	case SPDK_POLLER_STATE_WAITING:
		poller->state = SPDK_POLLER_STATE_PAUSING;
		break;
	default:
		assert(false);
		break;
	}
}
void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	/* If a poller is paused it has to be removed from the paused pollers
	 * list and put on the active list or timer tree depending on its
	 * period_ticks. If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list or tree.
	 */
	switch (poller->state) {
	case SPDK_POLLER_STATE_PAUSED:
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		thread_insert_poller(thread, poller);
	/* fallthrough */
	case SPDK_POLLER_STATE_PAUSING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	case SPDK_POLLER_STATE_RUNNING:
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}
}

const char *
spdk_poller_get_name(struct spdk_poller *poller)
{
	return poller->name;
}

uint64_t
spdk_poller_get_id(struct spdk_poller *poller)
{
	return poller->id;
}

const char *
spdk_poller_get_state_str(struct spdk_poller *poller)
{
	switch (poller->state) {
	case SPDK_POLLER_STATE_WAITING:
		return "waiting";
	case SPDK_POLLER_STATE_RUNNING:
		return "running";
	case SPDK_POLLER_STATE_UNREGISTERED:
		return "unregistered";
	case SPDK_POLLER_STATE_PAUSING:
		return "pausing";
	case SPDK_POLLER_STATE_PAUSED:
		return "paused";
	default:
		return NULL;
	}
}

uint64_t
spdk_poller_get_period_ticks(struct spdk_poller *poller)
{
	return poller->period_ticks;
}

void
spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats)
{
	stats->run_count = poller->run_count;
	stats->busy_count = poller->busy_count;
}

struct spdk_poller *
spdk_thread_get_first_active_poller(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->active_pollers);
}

struct spdk_poller *
spdk_thread_get_next_active_poller(struct spdk_poller *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

struct spdk_poller *
spdk_thread_get_first_timed_poller(struct spdk_thread *thread)
{
	return RB_MIN(timed_pollers_tree, &thread->timed_pollers);
}

struct spdk_poller *
spdk_thread_get_next_timed_poller(struct spdk_poller *prev)
{
	return RB_NEXT(timed_pollers_tree, &prev->thread->timed_pollers, prev);
}

struct spdk_poller *
spdk_thread_get_first_paused_poller(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->paused_pollers);
}

struct spdk_poller *
spdk_thread_get_next_paused_poller(struct spdk_poller *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

struct spdk_io_channel *
spdk_thread_get_first_io_channel(struct spdk_thread *thread)
{
	return RB_MIN(io_channel_tree, &thread->io_channels);
}

struct spdk_io_channel *
spdk_thread_get_next_io_channel(struct spdk_io_channel *prev)
{
	return RB_NEXT(io_channel_tree, &prev->thread->io_channels, prev);
}

uint16_t
spdk_thread_get_trace_id(struct spdk_thread *thread)
{
	return thread->trace_id;
}

struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};

static void
_back_to_orig_thread(void *ctx)
{
	struct call_thread *ct = ctx;

	assert(ct->orig_thread->for_each_count > 0);
	ct->orig_thread->for_each_count--;

	ct->cpl(ct->ctx);
	free(ctx);
}

static void
_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) {
		SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n",
			      ct->cur_thread->name);
		ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, _back_to_orig_thread, ctx);
	} else {
		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	assert(rc == 0);
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	ct->orig_thread->for_each_count++;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}
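/*
 * Illustrative broadcast (hypothetical callbacks): fn runs once on every
 * running thread, one thread at a time, and cpl runs back on the calling
 * thread after the iteration completes:
 *
 *	static void
 *	flush_cache(void *ctx)
 *	{
 *		... runs on each spdk_thread in turn ...
 *	}
 *
 *	static void
 *	flush_done(void *ctx)
 *	{
 *		... runs back on the originating thread ...
 *	}
 *
 *	spdk_for_each_thread(flush_cache, my_ctx, flush_done);
 */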
pthread_mutex_lock(&g_devlist_mutex); 2061 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 2062 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 2063 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 2064 ct->cur_thread->name); 2065 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 2066 } 2067 pthread_mutex_unlock(&g_devlist_mutex); 2068 2069 if (!ct->cur_thread) { 2070 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 2071 2072 rc = spdk_thread_send_msg(ct->orig_thread, _back_to_orig_thread, ctx); 2073 } else { 2074 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 2075 ct->cur_thread->name); 2076 2077 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 2078 } 2079 assert(rc == 0); 2080 } 2081 2082 void 2083 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 2084 { 2085 struct call_thread *ct; 2086 struct spdk_thread *thread; 2087 int rc __attribute__((unused)); 2088 2089 ct = calloc(1, sizeof(*ct)); 2090 if (!ct) { 2091 SPDK_ERRLOG("Unable to perform thread iteration\n"); 2092 cpl(ctx); 2093 return; 2094 } 2095 2096 ct->fn = fn; 2097 ct->ctx = ctx; 2098 ct->cpl = cpl; 2099 2100 thread = _get_thread(); 2101 if (!thread) { 2102 SPDK_ERRLOG("No thread allocated\n"); 2103 free(ct); 2104 cpl(ctx); 2105 return; 2106 } 2107 ct->orig_thread = thread; 2108 2109 ct->orig_thread->for_each_count++; 2110 2111 pthread_mutex_lock(&g_devlist_mutex); 2112 ct->cur_thread = TAILQ_FIRST(&g_threads); 2113 pthread_mutex_unlock(&g_devlist_mutex); 2114 2115 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 2116 ct->orig_thread->name); 2117 2118 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 2119 assert(rc == 0); 2120 } 2121 2122 static inline void 2123 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 2124 { 2125 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 2126 return; 2127 } 2128 2129 if (poller->set_intr_cb_fn) { 2130 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 2131 } 2132 } 2133 2134 void 2135 spdk_thread_set_interrupt_mode(bool enable_interrupt) 2136 { 2137 struct spdk_thread *thread = _get_thread(); 2138 struct spdk_poller *poller, *tmp; 2139 2140 assert(thread); 2141 assert(spdk_interrupt_mode_is_enabled()); 2142 2143 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 2144 thread->name, enable_interrupt ? "intr" : "poll", 2145 thread->in_interrupt ? 
"intr" : "poll"); 2146 2147 if (thread->in_interrupt == enable_interrupt) { 2148 return; 2149 } 2150 2151 /* Set pollers to expected mode */ 2152 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 2153 poller_set_interrupt_mode(poller, enable_interrupt); 2154 } 2155 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 2156 poller_set_interrupt_mode(poller, enable_interrupt); 2157 } 2158 /* All paused pollers will go to work in interrupt mode */ 2159 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 2160 poller_set_interrupt_mode(poller, enable_interrupt); 2161 } 2162 2163 thread->in_interrupt = enable_interrupt; 2164 return; 2165 } 2166 2167 static struct io_device * 2168 io_device_get(void *io_device) 2169 { 2170 struct io_device find = {}; 2171 2172 find.io_device = io_device; 2173 return RB_FIND(io_device_tree, &g_io_devices, &find); 2174 } 2175 2176 void 2177 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 2178 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 2179 const char *name) 2180 { 2181 struct io_device *dev, *tmp; 2182 struct spdk_thread *thread; 2183 2184 assert(io_device != NULL); 2185 assert(create_cb != NULL); 2186 assert(destroy_cb != NULL); 2187 2188 thread = spdk_get_thread(); 2189 if (!thread) { 2190 SPDK_ERRLOG("called from non-SPDK thread\n"); 2191 assert(false); 2192 return; 2193 } 2194 2195 dev = calloc(1, sizeof(struct io_device)); 2196 if (dev == NULL) { 2197 SPDK_ERRLOG("could not allocate io_device\n"); 2198 return; 2199 } 2200 2201 dev->io_device = io_device; 2202 if (name) { 2203 snprintf(dev->name, sizeof(dev->name), "%s", name); 2204 } else { 2205 snprintf(dev->name, sizeof(dev->name), "%p", dev); 2206 } 2207 dev->create_cb = create_cb; 2208 dev->destroy_cb = destroy_cb; 2209 dev->unregister_cb = NULL; 2210 dev->ctx_size = ctx_size; 2211 dev->for_each_count = 0; 2212 dev->unregistered = false; 2213 dev->refcnt = 0; 2214 2215 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 2216 dev->name, dev->io_device, thread->name); 2217 2218 pthread_mutex_lock(&g_devlist_mutex); 2219 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 2220 if (tmp != NULL) { 2221 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 2222 io_device, tmp->name, dev->name); 2223 free(dev); 2224 } 2225 2226 pthread_mutex_unlock(&g_devlist_mutex); 2227 } 2228 2229 static void 2230 _finish_unregister(void *arg) 2231 { 2232 struct io_device *dev = arg; 2233 struct spdk_thread *thread; 2234 2235 thread = spdk_get_thread(); 2236 assert(thread == dev->unregister_thread); 2237 2238 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2239 dev->name, dev->io_device, thread->name); 2240 2241 assert(thread->pending_unregister_count > 0); 2242 thread->pending_unregister_count--; 2243 2244 dev->unregister_cb(dev->io_device); 2245 free(dev); 2246 } 2247 2248 static void 2249 io_device_free(struct io_device *dev) 2250 { 2251 int rc __attribute__((unused)); 2252 2253 if (dev->unregister_cb == NULL) { 2254 free(dev); 2255 } else { 2256 assert(dev->unregister_thread != NULL); 2257 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2258 dev->name, dev->io_device, dev->unregister_thread->name); 2259 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2260 assert(rc == 0); 2261 } 2262 } 2263 2264 void 2265 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2266 { 2267 
struct io_device *dev; 2268 uint32_t refcnt; 2269 struct spdk_thread *thread; 2270 2271 thread = spdk_get_thread(); 2272 if (!thread) { 2273 SPDK_ERRLOG("called from non-SPDK thread\n"); 2274 assert(false); 2275 return; 2276 } 2277 2278 pthread_mutex_lock(&g_devlist_mutex); 2279 dev = io_device_get(io_device); 2280 if (!dev) { 2281 SPDK_ERRLOG("io_device %p not found\n", io_device); 2282 assert(false); 2283 pthread_mutex_unlock(&g_devlist_mutex); 2284 return; 2285 } 2286 2287 /* The for_each_count check differentiates the user attempting to unregister the 2288 * device a second time, from the internal call to this function that occurs 2289 * after the for_each_count reaches 0. 2290 */ 2291 if (dev->pending_unregister && dev->for_each_count > 0) { 2292 SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device); 2293 assert(false); 2294 pthread_mutex_unlock(&g_devlist_mutex); 2295 return; 2296 } 2297 2298 dev->unregister_cb = unregister_cb; 2299 dev->unregister_thread = thread; 2300 2301 if (dev->for_each_count > 0) { 2302 SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2303 dev->name, io_device, dev->for_each_count); 2304 dev->pending_unregister = true; 2305 pthread_mutex_unlock(&g_devlist_mutex); 2306 return; 2307 } 2308 2309 dev->unregistered = true; 2310 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2311 refcnt = dev->refcnt; 2312 pthread_mutex_unlock(&g_devlist_mutex); 2313 2314 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2315 dev->name, dev->io_device, thread->name); 2316 2317 if (unregister_cb) { 2318 thread->pending_unregister_count++; 2319 } 2320 2321 if (refcnt > 0) { 2322 /* defer deletion */ 2323 return; 2324 } 2325 2326 io_device_free(dev); 2327 } 2328 2329 const char * 2330 spdk_io_device_get_name(struct io_device *dev) 2331 { 2332 return dev->name; 2333 } 2334 2335 static struct spdk_io_channel * 2336 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2337 { 2338 struct spdk_io_channel find = {}; 2339 2340 find.dev = dev; 2341 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2342 } 2343 2344 struct spdk_io_channel * 2345 spdk_get_io_channel(void *io_device) 2346 { 2347 struct spdk_io_channel *ch; 2348 struct spdk_thread *thread; 2349 struct io_device *dev; 2350 int rc; 2351 2352 pthread_mutex_lock(&g_devlist_mutex); 2353 dev = io_device_get(io_device); 2354 if (dev == NULL) { 2355 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2356 pthread_mutex_unlock(&g_devlist_mutex); 2357 return NULL; 2358 } 2359 2360 thread = _get_thread(); 2361 if (!thread) { 2362 SPDK_ERRLOG("No thread allocated\n"); 2363 pthread_mutex_unlock(&g_devlist_mutex); 2364 return NULL; 2365 } 2366 2367 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2368 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2369 pthread_mutex_unlock(&g_devlist_mutex); 2370 return NULL; 2371 } 2372 2373 ch = thread_get_io_channel(thread, dev); 2374 if (ch != NULL) { 2375 ch->ref++; 2376 2377 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2378 ch, dev->name, dev->io_device, thread->name, ch->ref); 2379 2380 /* 2381 * An I/O channel already exists for this device on this 2382 * thread, so return it. 
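	 * create_cb is not invoked again; all users on this thread share the
	 * same per-thread channel context until the last reference is put.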
2383 */ 2384 pthread_mutex_unlock(&g_devlist_mutex); 2385 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2386 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2387 return ch; 2388 } 2389 2390 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2391 if (ch == NULL) { 2392 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2393 pthread_mutex_unlock(&g_devlist_mutex); 2394 return NULL; 2395 } 2396 2397 ch->dev = dev; 2398 ch->destroy_cb = dev->destroy_cb; 2399 ch->thread = thread; 2400 ch->ref = 1; 2401 ch->destroy_ref = 0; 2402 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2403 2404 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2405 ch, dev->name, dev->io_device, thread->name, ch->ref); 2406 2407 dev->refcnt++; 2408 2409 pthread_mutex_unlock(&g_devlist_mutex); 2410 2411 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2412 if (rc != 0) { 2413 pthread_mutex_lock(&g_devlist_mutex); 2414 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2415 dev->refcnt--; 2416 free(ch); 2417 SPDK_ERRLOG("could not create io_channel for io_device %s (%p): %s (rc=%d)\n", 2418 dev->name, io_device, spdk_strerror(-rc), rc); 2419 pthread_mutex_unlock(&g_devlist_mutex); 2420 return NULL; 2421 } 2422 2423 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2424 return ch; 2425 } 2426 2427 static void 2428 put_io_channel(void *arg) 2429 { 2430 struct spdk_io_channel *ch = arg; 2431 bool do_remove_dev = true; 2432 struct spdk_thread *thread; 2433 2434 thread = spdk_get_thread(); 2435 if (!thread) { 2436 SPDK_ERRLOG("called from non-SPDK thread\n"); 2437 assert(false); 2438 return; 2439 } 2440 2441 SPDK_DEBUGLOG(thread, 2442 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2443 ch, ch->dev->name, ch->dev->io_device, thread->name); 2444 2445 assert(ch->thread == thread); 2446 2447 ch->destroy_ref--; 2448 2449 if (ch->ref > 0 || ch->destroy_ref > 0) { 2450 /* 2451 * Another reference to the associated io_device was requested 2452 * after this message was sent but before it had a chance to 2453 * execute. 2454 */ 2455 return; 2456 } 2457 2458 pthread_mutex_lock(&g_devlist_mutex); 2459 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2460 pthread_mutex_unlock(&g_devlist_mutex); 2461 2462 /* Don't hold the devlist mutex while the destroy_cb is called. 
*/ 2463 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2464 2465 pthread_mutex_lock(&g_devlist_mutex); 2466 ch->dev->refcnt--; 2467 2468 if (!ch->dev->unregistered) { 2469 do_remove_dev = false; 2470 } 2471 2472 if (ch->dev->refcnt > 0) { 2473 do_remove_dev = false; 2474 } 2475 2476 pthread_mutex_unlock(&g_devlist_mutex); 2477 2478 if (do_remove_dev) { 2479 io_device_free(ch->dev); 2480 } 2481 free(ch); 2482 } 2483 2484 void 2485 spdk_put_io_channel(struct spdk_io_channel *ch) 2486 { 2487 struct spdk_thread *thread; 2488 int rc __attribute__((unused)); 2489 2490 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2491 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2492 2493 thread = spdk_get_thread(); 2494 if (!thread) { 2495 SPDK_ERRLOG("called from non-SPDK thread\n"); 2496 assert(false); 2497 return; 2498 } 2499 2500 if (ch->thread != thread) { 2501 wrong_thread(__func__, "ch", ch->thread, thread); 2502 return; 2503 } 2504 2505 SPDK_DEBUGLOG(thread, 2506 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2507 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2508 2509 ch->ref--; 2510 2511 if (ch->ref == 0) { 2512 ch->destroy_ref++; 2513 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2514 assert(rc == 0); 2515 } 2516 } 2517 2518 struct spdk_io_channel * 2519 spdk_io_channel_from_ctx(void *ctx) 2520 { 2521 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2522 } 2523 2524 struct spdk_thread * 2525 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2526 { 2527 return ch->thread; 2528 } 2529 2530 void * 2531 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2532 { 2533 return ch->dev->io_device; 2534 } 2535 2536 const char * 2537 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2538 { 2539 return spdk_io_device_get_name(ch->dev); 2540 } 2541 2542 int 2543 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2544 { 2545 return ch->ref; 2546 } 2547 2548 struct spdk_io_channel_iter { 2549 void *io_device; 2550 struct io_device *dev; 2551 spdk_channel_msg fn; 2552 int status; 2553 void *ctx; 2554 struct spdk_io_channel *ch; 2555 2556 struct spdk_thread *cur_thread; 2557 2558 struct spdk_thread *orig_thread; 2559 spdk_channel_for_each_cpl cpl; 2560 }; 2561 2562 void * 2563 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2564 { 2565 return i->io_device; 2566 } 2567 2568 struct spdk_io_channel * 2569 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2570 { 2571 return i->ch; 2572 } 2573 2574 void * 2575 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2576 { 2577 return i->ctx; 2578 } 2579 2580 static void 2581 _call_completion(void *ctx) 2582 { 2583 struct spdk_io_channel_iter *i = ctx; 2584 2585 assert(i->orig_thread->for_each_count > 0); 2586 i->orig_thread->for_each_count--; 2587 2588 if (i->cpl != NULL) { 2589 i->cpl(i, i->status); 2590 } 2591 free(i); 2592 } 2593 2594 static void 2595 _call_channel(void *ctx) 2596 { 2597 struct spdk_io_channel_iter *i = ctx; 2598 struct spdk_io_channel *ch; 2599 2600 /* 2601 * It is possible that the channel was deleted before this 2602 * message had a chance to execute. If so, skip calling 2603 * the fn() on this thread. 
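	 * Instead, fall through to spdk_for_each_channel_continue() with status 0,
	 * as if fn() had completed on this thread.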
2604 */ 2605 pthread_mutex_lock(&g_devlist_mutex); 2606 ch = thread_get_io_channel(i->cur_thread, i->dev); 2607 pthread_mutex_unlock(&g_devlist_mutex); 2608 2609 if (ch) { 2610 i->fn(i); 2611 } else { 2612 spdk_for_each_channel_continue(i, 0); 2613 } 2614 } 2615 2616 void 2617 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2618 spdk_channel_for_each_cpl cpl) 2619 { 2620 struct spdk_thread *thread; 2621 struct spdk_io_channel *ch; 2622 struct spdk_io_channel_iter *i; 2623 int rc __attribute__((unused)); 2624 2625 i = calloc(1, sizeof(*i)); 2626 if (!i) { 2627 SPDK_ERRLOG("Unable to allocate iterator\n"); 2628 assert(false); 2629 return; 2630 } 2631 2632 i->io_device = io_device; 2633 i->fn = fn; 2634 i->ctx = ctx; 2635 i->cpl = cpl; 2636 i->orig_thread = _get_thread(); 2637 2638 i->orig_thread->for_each_count++; 2639 2640 pthread_mutex_lock(&g_devlist_mutex); 2641 i->dev = io_device_get(io_device); 2642 if (i->dev == NULL) { 2643 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2644 assert(false); 2645 i->status = -ENODEV; 2646 goto end; 2647 } 2648 2649 /* Do not allow new for_each operations if we are already waiting to unregister 2650 * the device for other for_each operations to complete. 2651 */ 2652 if (i->dev->pending_unregister) { 2653 SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device); 2654 i->status = -ENODEV; 2655 goto end; 2656 } 2657 2658 TAILQ_FOREACH(thread, &g_threads, tailq) { 2659 ch = thread_get_io_channel(thread, i->dev); 2660 if (ch != NULL) { 2661 ch->dev->for_each_count++; 2662 i->cur_thread = thread; 2663 i->ch = ch; 2664 pthread_mutex_unlock(&g_devlist_mutex); 2665 rc = spdk_thread_send_msg(thread, _call_channel, i); 2666 assert(rc == 0); 2667 return; 2668 } 2669 } 2670 2671 end: 2672 pthread_mutex_unlock(&g_devlist_mutex); 2673 2674 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2675 assert(rc == 0); 2676 } 2677 2678 static void 2679 __pending_unregister(void *arg) 2680 { 2681 struct io_device *dev = arg; 2682 2683 assert(dev->pending_unregister); 2684 assert(dev->for_each_count == 0); 2685 spdk_io_device_unregister(dev->io_device, dev->unregister_cb); 2686 } 2687 2688 void 2689 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2690 { 2691 struct spdk_thread *thread; 2692 struct spdk_io_channel *ch; 2693 struct io_device *dev; 2694 int rc __attribute__((unused)); 2695 2696 assert(i->cur_thread == spdk_get_thread()); 2697 2698 i->status = status; 2699 2700 pthread_mutex_lock(&g_devlist_mutex); 2701 dev = i->dev; 2702 if (status) { 2703 goto end; 2704 } 2705 2706 thread = TAILQ_NEXT(i->cur_thread, tailq); 2707 while (thread) { 2708 ch = thread_get_io_channel(thread, dev); 2709 if (ch != NULL) { 2710 i->cur_thread = thread; 2711 i->ch = ch; 2712 pthread_mutex_unlock(&g_devlist_mutex); 2713 rc = spdk_thread_send_msg(thread, _call_channel, i); 2714 assert(rc == 0); 2715 return; 2716 } 2717 thread = TAILQ_NEXT(thread, tailq); 2718 } 2719 2720 end: 2721 dev->for_each_count--; 2722 i->ch = NULL; 2723 pthread_mutex_unlock(&g_devlist_mutex); 2724 2725 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2726 assert(rc == 0); 2727 2728 pthread_mutex_lock(&g_devlist_mutex); 2729 if (dev->pending_unregister && dev->for_each_count == 0) { 2730 rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev); 2731 assert(rc == 0); 2732 } 2733 pthread_mutex_unlock(&g_devlist_mutex); 2734 } 2735 2736 static void 2737 thread_interrupt_destroy(struct spdk_thread *thread) 
2738 { 2739 struct spdk_fd_group *fgrp = thread->fgrp; 2740 2741 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2742 2743 if (thread->msg_fd < 0) { 2744 return; 2745 } 2746 2747 spdk_fd_group_remove(fgrp, thread->msg_fd); 2748 close(thread->msg_fd); 2749 thread->msg_fd = -1; 2750 2751 spdk_fd_group_destroy(fgrp); 2752 thread->fgrp = NULL; 2753 } 2754 2755 #ifdef __linux__ 2756 static int 2757 thread_interrupt_msg_process(void *arg) 2758 { 2759 struct spdk_thread *thread = arg; 2760 struct spdk_thread *orig_thread; 2761 uint32_t msg_count; 2762 spdk_msg_fn critical_msg; 2763 int rc = 0; 2764 uint64_t notify = 1; 2765 2766 assert(spdk_interrupt_mode_is_enabled()); 2767 2768 orig_thread = spdk_get_thread(); 2769 spdk_set_thread(thread); 2770 2771 /* There may be race between msg_acknowledge and another producer's msg_notify, 2772 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2773 * This can avoid msg notification missing. 2774 */ 2775 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2776 if (rc < 0 && errno != EAGAIN) { 2777 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2778 } 2779 2780 critical_msg = thread->critical_msg; 2781 if (spdk_unlikely(critical_msg != NULL)) { 2782 critical_msg(NULL); 2783 thread->critical_msg = NULL; 2784 rc = 1; 2785 } 2786 2787 msg_count = msg_queue_run_batch(thread, 0); 2788 if (msg_count) { 2789 rc = 1; 2790 } 2791 2792 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2793 if (spdk_unlikely(!thread->in_interrupt)) { 2794 /* The thread transitioned to poll mode in a msg during the above processing. 2795 * Clear msg_fd since thread messages will be polled directly in poll mode. 2796 */ 2797 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2798 if (rc < 0 && errno != EAGAIN) { 2799 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 2800 } 2801 } 2802 2803 spdk_set_thread(orig_thread); 2804 return rc; 2805 } 2806 2807 static int 2808 thread_interrupt_create(struct spdk_thread *thread) 2809 { 2810 int rc; 2811 2812 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2813 2814 rc = spdk_fd_group_create(&thread->fgrp); 2815 if (rc) { 2816 return rc; 2817 } 2818 2819 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2820 if (thread->msg_fd < 0) { 2821 rc = -errno; 2822 spdk_fd_group_destroy(thread->fgrp); 2823 thread->fgrp = NULL; 2824 2825 return rc; 2826 } 2827 2828 return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd, 2829 thread_interrupt_msg_process, thread); 2830 } 2831 #else 2832 static int 2833 thread_interrupt_create(struct spdk_thread *thread) 2834 { 2835 return -ENOTSUP; 2836 } 2837 #endif 2838 2839 static int 2840 _interrupt_wrapper(void *ctx) 2841 { 2842 struct spdk_interrupt *intr = ctx; 2843 struct spdk_thread *orig_thread, *thread; 2844 int rc; 2845 2846 orig_thread = spdk_get_thread(); 2847 thread = intr->thread; 2848 2849 spdk_set_thread(thread); 2850 2851 SPDK_DTRACE_PROBE4(interrupt_fd_process, intr->name, intr->efd, 2852 intr->fn, intr->arg); 2853 2854 rc = intr->fn(intr->arg); 2855 2856 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2857 2858 spdk_set_thread(orig_thread); 2859 2860 return rc; 2861 } 2862 2863 struct spdk_interrupt * 2864 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2865 void *arg, const char *name) 2866 { 2867 return spdk_interrupt_register_for_events(efd, SPDK_INTERRUPT_EVENT_IN, fn, arg, name); 2868 } 2869 2870 struct spdk_interrupt * 2871 
spdk_interrupt_register_for_events(int efd, uint32_t events, spdk_interrupt_fn fn, void *arg, 2872 const char *name) 2873 { 2874 struct spdk_event_handler_opts opts = {}; 2875 2876 spdk_fd_group_get_default_event_handler_opts(&opts, sizeof(opts)); 2877 opts.events = events; 2878 opts.fd_type = SPDK_FD_TYPE_DEFAULT; 2879 2880 return spdk_interrupt_register_ext(efd, fn, arg, name, &opts); 2881 } 2882 2883 struct spdk_interrupt * 2884 spdk_interrupt_register_ext(int efd, spdk_interrupt_fn fn, void *arg, const char *name, 2885 struct spdk_event_handler_opts *opts) 2886 { 2887 struct spdk_thread *thread; 2888 struct spdk_interrupt *intr; 2889 int ret; 2890 2891 thread = spdk_get_thread(); 2892 if (!thread) { 2893 assert(false); 2894 return NULL; 2895 } 2896 2897 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2898 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2899 return NULL; 2900 } 2901 2902 intr = calloc(1, sizeof(*intr)); 2903 if (intr == NULL) { 2904 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2905 return NULL; 2906 } 2907 2908 if (name) { 2909 snprintf(intr->name, sizeof(intr->name), "%s", name); 2910 } else { 2911 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2912 } 2913 2914 intr->efd = efd; 2915 intr->thread = thread; 2916 intr->fn = fn; 2917 intr->arg = arg; 2918 2919 ret = spdk_fd_group_add_ext(thread->fgrp, efd, _interrupt_wrapper, intr, intr->name, opts); 2920 2921 if (ret != 0) { 2922 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2923 thread->name, efd, spdk_strerror(-ret)); 2924 free(intr); 2925 return NULL; 2926 } 2927 2928 return intr; 2929 } 2930 2931 void 2932 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2933 { 2934 struct spdk_thread *thread; 2935 struct spdk_interrupt *intr; 2936 2937 intr = *pintr; 2938 if (intr == NULL) { 2939 return; 2940 } 2941 2942 *pintr = NULL; 2943 2944 thread = spdk_get_thread(); 2945 if (!thread) { 2946 assert(false); 2947 return; 2948 } 2949 2950 if (intr->thread != thread) { 2951 wrong_thread(__func__, intr->name, intr->thread, thread); 2952 return; 2953 } 2954 2955 spdk_fd_group_remove(thread->fgrp, intr->efd); 2956 free(intr); 2957 } 2958 2959 int 2960 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2961 enum spdk_interrupt_event_types event_types) 2962 { 2963 struct spdk_thread *thread; 2964 2965 thread = spdk_get_thread(); 2966 if (!thread) { 2967 assert(false); 2968 return -EINVAL; 2969 } 2970 2971 if (intr->thread != thread) { 2972 wrong_thread(__func__, intr->name, intr->thread, thread); 2973 return -EINVAL; 2974 } 2975 2976 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2977 } 2978 2979 int 2980 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2981 { 2982 return spdk_fd_group_get_fd(thread->fgrp); 2983 } 2984 2985 struct spdk_fd_group * 2986 spdk_thread_get_interrupt_fd_group(struct spdk_thread *thread) 2987 { 2988 return thread->fgrp; 2989 } 2990 2991 static bool g_interrupt_mode = false; 2992 2993 int 2994 spdk_interrupt_mode_enable(void) 2995 { 2996 /* It must be called once prior to initializing the threading library. 2997 * g_spdk_msg_mempool will be valid if thread library is initialized. 
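	 * (The mempool is created during spdk_thread_lib_init(), so a non-NULL
	 * pool means initialization has already happened and it is too late to
	 * switch to interrupt mode.)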
2998 */ 2999 if (g_spdk_msg_mempool) { 3000 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 3001 return -1; 3002 } 3003 3004 #ifdef __linux__ 3005 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 3006 g_interrupt_mode = true; 3007 return 0; 3008 #else 3009 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 3010 g_interrupt_mode = false; 3011 return -ENOTSUP; 3012 #endif 3013 } 3014 3015 bool 3016 spdk_interrupt_mode_is_enabled(void) 3017 { 3018 return g_interrupt_mode; 3019 } 3020 3021 #define SSPIN_DEBUG_STACK_FRAMES 16 3022 3023 struct sspin_stack { 3024 void *addrs[SSPIN_DEBUG_STACK_FRAMES]; 3025 uint32_t depth; 3026 }; 3027 3028 struct spdk_spinlock_internal { 3029 struct sspin_stack init_stack; 3030 struct sspin_stack lock_stack; 3031 struct sspin_stack unlock_stack; 3032 }; 3033 3034 static void 3035 sspin_init_internal(struct spdk_spinlock *sspin) 3036 { 3037 #ifdef DEBUG 3038 sspin->internal = calloc(1, sizeof(*sspin->internal)); 3039 #endif 3040 } 3041 3042 static void 3043 sspin_fini_internal(struct spdk_spinlock *sspin) 3044 { 3045 #ifdef DEBUG 3046 free(sspin->internal); 3047 sspin->internal = NULL; 3048 #endif 3049 } 3050 3051 #if defined(DEBUG) && defined(SPDK_HAVE_EXECINFO_H) 3052 #define SSPIN_GET_STACK(sspin, which) \ 3053 do { \ 3054 if (sspin->internal != NULL) { \ 3055 struct sspin_stack *stack = &sspin->internal->which ## _stack; \ 3056 stack->depth = backtrace(stack->addrs, SPDK_COUNTOF(stack->addrs)); \ 3057 } \ 3058 } while (0) 3059 #else 3060 #define SSPIN_GET_STACK(sspin, which) do { } while (0) 3061 #endif 3062 3063 static void 3064 sspin_stack_print(const char *title, const struct sspin_stack *sspin_stack) 3065 { 3066 #ifdef SPDK_HAVE_EXECINFO_H 3067 char **stack; 3068 size_t i; 3069 3070 stack = backtrace_symbols(sspin_stack->addrs, sspin_stack->depth); 3071 if (stack == NULL) { 3072 SPDK_ERRLOG("Out of memory while allocate stack for %s\n", title); 3073 return; 3074 } 3075 SPDK_ERRLOG(" %s:\n", title); 3076 for (i = 0; i < sspin_stack->depth; i++) { 3077 /* 3078 * This does not print line numbers. In gdb, use something like "list *0x444b6b" or 3079 * "list *sspin_stack->addrs[0]". Or more conveniently, load the spdk gdb macros 3080 * and use use "print *sspin" or "print sspin->internal.lock_stack". See 3081 * gdb_macros.md in the docs directory for details. 
3082 */ 3083 SPDK_ERRLOG(" #%" PRIu64 ": %s\n", i, stack[i]); 3084 } 3085 free(stack); 3086 #endif /* SPDK_HAVE_EXECINFO_H */ 3087 } 3088 3089 static void 3090 sspin_stacks_print(const struct spdk_spinlock *sspin) 3091 { 3092 if (sspin->internal == NULL) { 3093 return; 3094 } 3095 SPDK_ERRLOG("spinlock %p\n", sspin); 3096 sspin_stack_print("Lock initialized at", &sspin->internal->init_stack); 3097 sspin_stack_print("Last locked at", &sspin->internal->lock_stack); 3098 sspin_stack_print("Last unlocked at", &sspin->internal->unlock_stack); 3099 } 3100 3101 void 3102 spdk_spin_init(struct spdk_spinlock *sspin) 3103 { 3104 int rc; 3105 3106 memset(sspin, 0, sizeof(*sspin)); 3107 rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE); 3108 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3109 sspin_init_internal(sspin); 3110 SSPIN_GET_STACK(sspin, init); 3111 sspin->initialized = true; 3112 } 3113 3114 void 3115 spdk_spin_destroy(struct spdk_spinlock *sspin) 3116 { 3117 int rc; 3118 3119 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3120 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3121 SPIN_ASSERT_LOG_STACKS(sspin->thread == NULL, SPIN_ERR_LOCK_HELD, sspin); 3122 3123 rc = pthread_spin_destroy(&sspin->spinlock); 3124 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3125 3126 sspin_fini_internal(sspin); 3127 sspin->initialized = false; 3128 sspin->destroyed = true; 3129 } 3130 3131 void 3132 spdk_spin_lock(struct spdk_spinlock *sspin) 3133 { 3134 struct spdk_thread *thread = spdk_get_thread(); 3135 int rc; 3136 3137 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3138 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3139 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3140 SPIN_ASSERT_LOG_STACKS(thread != sspin->thread, SPIN_ERR_DEADLOCK, sspin); 3141 3142 rc = pthread_spin_lock(&sspin->spinlock); 3143 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3144 3145 sspin->thread = thread; 3146 sspin->thread->lock_count++; 3147 3148 SSPIN_GET_STACK(sspin, lock); 3149 } 3150 3151 void 3152 spdk_spin_unlock(struct spdk_spinlock *sspin) 3153 { 3154 struct spdk_thread *thread = spdk_get_thread(); 3155 int rc; 3156 3157 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3158 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3159 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3160 SPIN_ASSERT_LOG_STACKS(thread == sspin->thread, SPIN_ERR_WRONG_THREAD, sspin); 3161 3162 SPIN_ASSERT_LOG_STACKS(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT, sspin); 3163 thread->lock_count--; 3164 sspin->thread = NULL; 3165 3166 SSPIN_GET_STACK(sspin, unlock); 3167 3168 rc = pthread_spin_unlock(&sspin->spinlock); 3169 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3170 } 3171 3172 bool 3173 spdk_spin_held(struct spdk_spinlock *sspin) 3174 { 3175 struct spdk_thread *thread = spdk_get_thread(); 3176 3177 SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false); 3178 3179 return sspin->thread == thread; 3180 } 3181 3182 void 3183 spdk_thread_register_post_poller_handler(spdk_post_poller_fn fn, void *fn_arg) 3184 { 3185 struct spdk_thread *thr; 3186 3187 thr = _get_thread(); 3188 assert(thr); 3189 if (spdk_unlikely(thr->num_pp_handlers == SPDK_THREAD_MAX_POST_POLLER_HANDLERS)) { 3190 SPDK_ERRLOG("Too many handlers registered"); 3191 return; 3192 } 3193 3194 
thr->pp_handlers[thr->num_pp_handlers].fn = fn; 3195 thr->pp_handlers[thr->num_pp_handlers].fn_arg = fn_arg; 3196 thr->num_pp_handlers++; 3197 } 3198 3199 SPDK_LOG_REGISTER_COMPONENT(thread) 3200