/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2016 Intel Corporation.
 * All rights reserved.
 * Copyright (c) 2022, 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/trace.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"
#include "spdk_internal/usdt.h"
#include "thread_internal.h"

#include "spdk_internal/trace_defs.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

#ifdef SPDK_HAVE_EXECINFO_H
#include <execinfo.h>
#endif

#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
#define SPDK_MAX_POLLER_NAME_LEN	256
#define SPDK_MAX_THREAD_NAME_LEN	256

static struct spdk_thread *g_app_thread;

struct spdk_interrupt {
	int efd;
	struct spdk_fd_group *fgrp;
	struct spdk_thread *thread;
	spdk_interrupt_fn fn;
	void *arg;
	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,

	/* The poller is in the process of being paused. It will be paused
	 * the next time it is supposed to be executed.
	 */
	SPDK_POLLER_STATE_PAUSING,

	/* The poller is registered but currently paused. It's on the
	 * paused_pollers list.
	 */
	SPDK_POLLER_STATE_PAUSED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller) tailq;
	RB_ENTRY(spdk_poller) node;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state state;

	uint64_t period_ticks;
	uint64_t next_run_tick;
	uint64_t run_count;
	uint64_t busy_count;
	uint64_t id;
	spdk_poller_fn fn;
	void *arg;
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;
	spdk_poller_set_interrupt_mode_cb set_intr_cb_fn;
	void *set_intr_cb_arg;

	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_thread_state {
	/* The thread is processing pollers and messages via spdk_thread_poll(). */
	SPDK_THREAD_STATE_RUNNING,

	/* The thread is in the process of termination. It reaps unregistering
	 * pollers and releases I/O channels.
	 */
	SPDK_THREAD_STATE_EXITING,

	/* The thread has exited. It is ready for spdk_thread_destroy(). */
	SPDK_THREAD_STATE_EXITED,
};

struct spdk_thread_post_poller_handler {
	spdk_post_poller_fn fn;
	void *fn_arg;
};

#define SPDK_THREAD_MAX_POST_POLLER_HANDLERS (4)

struct spdk_thread {
	uint64_t tsc_last;
	struct spdk_thread_stats stats;
	/*
	 * Contains pollers actively running on this thread. Pollers
	 * are run round-robin. The thread takes one poller from the head
	 * of the ring, executes it, then puts it back at the tail of
	 * the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers;
	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers;
	struct spdk_poller *first_timed_poller;
	/*
	 * Contains paused pollers. Pollers on this queue are waiting until
	 * they are resumed (in which case they're put onto the active/timer
	 * queues) or unregistered.
	 */
	TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers;
	struct spdk_thread_post_poller_handler pp_handlers[SPDK_THREAD_MAX_POST_POLLER_HANDLERS];
	struct spdk_ring *messages;
	uint8_t num_pp_handlers;
	int msg_fd;
	SLIST_HEAD(, spdk_msg) msg_cache;
	size_t msg_cache_count;
	spdk_msg_fn critical_msg;
	uint64_t id;
	uint64_t next_poller_id;
	enum spdk_thread_state state;
	int pending_unregister_count;
	uint32_t for_each_count;

	RB_HEAD(io_channel_tree, spdk_io_channel) io_channels;
	TAILQ_ENTRY(spdk_thread) tailq;

	char name[SPDK_MAX_THREAD_NAME_LEN + 1];
	struct spdk_cpuset cpumask;
	uint64_t exit_timeout_tsc;

	int32_t lock_count;

	/* spdk_thread is bound to the current CPU core. */
	bool is_bound;

	/* Indicates whether this spdk_thread currently runs in interrupt mode. */
	bool in_interrupt;
	bool poller_unregistered;
	struct spdk_fd_group *fgrp;

	uint16_t trace_id;

	uint8_t reserved[6];

	/* User context allocated at the end */
	uint8_t ctx[0];
};

/*
 * Assert that the spdk_thread struct is 8-byte aligned to ensure
 * that the user ctx is also 8-byte aligned.
 */
SPDK_STATIC_ASSERT((sizeof(struct spdk_thread)) % 8 == 0, "Incorrect size");

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
static size_t g_ctx_sz = 0;
/* A monotonically increasing ID, starting at 1, is assigned to each created
 * thread. If the ID ever wraps around to 0 (i.e. after UINT64_MAX threads),
 * further thread creation is not allowed and the SPDK application must be
 * restarted.
 */
static uint64_t g_thread_id = 1;

enum spin_error {
	SPIN_ERR_NONE,
	/* Trying to use an SPDK lock while not on an SPDK thread */
	SPIN_ERR_NOT_SPDK_THREAD,
	/* Trying to lock a lock already held by this SPDK thread */
	SPIN_ERR_DEADLOCK,
	/* Trying to unlock a lock not held by this SPDK thread */
	SPIN_ERR_WRONG_THREAD,
	/* pthread_spin_*() returned an error */
	SPIN_ERR_PTHREAD,
	/* Trying to destroy a lock that is held */
	SPIN_ERR_LOCK_HELD,
	/* lock_count is invalid */
	SPIN_ERR_LOCK_COUNT,
	/*
	 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to
	 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to
	 * deadlock when another SPDK thread on the same pthread tries to take that lock.
	 */
	SPIN_ERR_HOLD_DURING_SWITCH,
	/* Trying to use a lock that was destroyed (but not re-initialized) */
	SPIN_ERR_DESTROYED,
	/* Trying to use a lock that is not initialized */
	SPIN_ERR_NOT_INITIALIZED,

	/* Must be last, not an actual error code */
	SPIN_ERR_LAST
};

static const char *spin_error_strings[] = {
	[SPIN_ERR_NONE]			= "No error",
	[SPIN_ERR_NOT_SPDK_THREAD]	= "Not an SPDK thread",
	[SPIN_ERR_DEADLOCK]		= "Deadlock detected",
	[SPIN_ERR_WRONG_THREAD]		= "Unlock on wrong SPDK thread",
	[SPIN_ERR_PTHREAD]		= "Error from pthread_spinlock",
	[SPIN_ERR_LOCK_HELD]		= "Destroying a held spinlock",
	[SPIN_ERR_LOCK_COUNT]		= "Lock count is invalid",
	[SPIN_ERR_HOLD_DURING_SWITCH]	= "Lock(s) held while SPDK thread going off CPU",
	[SPIN_ERR_DESTROYED]		= "Lock has been destroyed",
	[SPIN_ERR_NOT_INITIALIZED]	= "Lock has not been initialized",
};

#define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \
					? "Unknown error" : spin_error_strings[err]

static void
__posix_abort(enum spin_error err)
{
	abort();
}

typedef void (*spin_abort)(enum spin_error err);
spin_abort g_spin_abort_fn = __posix_abort;

#define SPIN_ASSERT_IMPL(cond, err, extra_log, ret) \
	do { \
		if (spdk_unlikely(!(cond))) { \
			SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \
				    SPIN_ERROR_STRING(err), #cond); \
			extra_log; \
			g_spin_abort_fn(err); \
			ret; \
		} \
	} while (0)
#define SPIN_ASSERT_LOG_STACKS(cond, err, sspin) \
	SPIN_ASSERT_IMPL(cond, err, sspin_stacks_print(sspin), return)
#define SPIN_ASSERT_RETURN(cond, err, ret)	SPIN_ASSERT_IMPL(cond, err, , return ret)
#define SPIN_ASSERT(cond, err)			SPIN_ASSERT_IMPL(cond, err, ,)

struct io_device {
	void *io_device;
	char name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb create_cb;
	spdk_io_channel_destroy_cb destroy_cb;
	spdk_io_device_unregister_cb unregister_cb;
	struct spdk_thread *unregister_thread;
	uint32_t ctx_size;
	uint32_t for_each_count;
	RB_ENTRY(io_device) node;

	uint32_t refcnt;

	bool pending_unregister;
	bool unregistered;
};

static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices);

static int
io_device_cmp(struct io_device *dev1, struct io_device *dev2)
{
	return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device);
}

RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp);
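
/*
 * Note on the comparator idiom used above and in io_channel_cmp() below:
 * C relational operators evaluate to 0 or 1, so (a < b ? -1 : a > b)
 * yields -1, 0, or 1, which is exactly the three-way result an RB tree
 * comparator must return. A minimal standalone sketch (hypothetical
 * values, not part of this file):
 *
 *	uintptr_t a = 1, b = 2;
 *	int r = (a < b ? -1 : a > b);	// -1 here; 0 if equal, 1 if a > b
 */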
static int
io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2)
{
	return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev);
}

RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp);

struct spdk_msg {
	spdk_msg_fn fn;
	void *arg;

	SLIST_ENTRY(spdk_msg) link;
};

static struct spdk_mempool *g_spdk_msg_mempool = NULL;

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static __thread struct spdk_thread *tls_thread = NULL;

static void
thread_trace(void)
{
	struct spdk_trace_tpoint_opts opts[] = {
		{
			"THREAD_IOCH_GET", TRACE_THREAD_IOCH_GET,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "refcnt", SPDK_TRACE_ARG_TYPE_INT, 4 }}
		},
		{
			"THREAD_IOCH_PUT", TRACE_THREAD_IOCH_PUT,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "refcnt", SPDK_TRACE_ARG_TYPE_INT, 4 }}
		}
	};

	spdk_trace_register_owner_type(OWNER_TYPE_THREAD, 't');
	spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
}
SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD)

/*
 * If this compare function returned zero when two next_run_ticks are equal,
 * the macro RB_INSERT() would return a pointer to the existing element with
 * the same next_run_tick and refuse the insertion.
 *
 * Fortunately, the macro RB_REMOVE() takes a pointer to the element to
 * remove as a parameter, not a key.
 *
 * Hence we let RB_INSERT() insert elements with equal keys on the right
 * side by returning 1 when two next_run_ticks are equal.
 */
static inline int
timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2)
{
	if (poller1->next_run_tick < poller2->next_run_tick) {
		return -1;
	} else {
		return 1;
	}
}

RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare);
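
/*
 * Illustration (hypothetical pollers, not part of this file): because the
 * compare function never returns 0, RB_INSERT() reports success (NULL) for
 * both inserts below; the second poller simply lands to the right of the
 * first even though the keys are equal.
 *
 *	poller_a->next_run_tick = 1000;
 *	poller_b->next_run_tick = 1000;
 *	assert(RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller_a) == NULL);
 *	assert(RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller_b) == NULL);
 */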
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}

static int
_thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	g_ctx_sz = ctx_sz;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz,
						 sizeof(struct spdk_msg),
						 0, /* No cache. We do our own. */
						 SPDK_ENV_NUMA_ID_ANY);

	if (!g_spdk_msg_mempool) {
		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
		return -ENOMEM;
	}

	SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n",
		      msg_mempool_sz);

	return 0;
}

static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
				     poller->name);
		}
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (spdk_interrupt_mode_is_enabled()) {
		thread_interrupt_destroy(thread);
	}

	spdk_ring_free(thread->messages);
	free(thread);
}

int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);

	if (new_thread_fn == NULL) {
		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
	} else {
		g_new_thread_fn = new_thread_fn;
	}

	return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE);
}

int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
			 spdk_thread_op_supported_fn thread_op_supported_fn,
			 size_t ctx_sz, size_t msg_mempool_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);
	assert(g_thread_op_supported_fn == NULL);

	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
		SPDK_ERRLOG("Both must be defined or undefined together.\n");
		return -EINVAL;
	}

	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
	} else {
		g_thread_op_fn = thread_op_fn;
		g_thread_op_supported_fn = thread_op_supported_fn;
	}

	return _thread_lib_init(ctx_sz, msg_mempool_sz);
}
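
/*
 * Usage sketch (hypothetical caller, not part of this file): minimal
 * initialization and teardown of the thread library with a 64-byte
 * per-thread user context and the default message pool size.
 *
 *	if (spdk_thread_lib_init(NULL, 64) != 0) {
 *		return -1;	// message mempool creation failed
 *	}
 *	// ... create and poll threads ...
 *	spdk_thread_lib_fini();
 */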
void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	RB_FOREACH(dev, io_device_tree, &g_io_devices) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	g_new_thread_fn = NULL;
	g_thread_op_fn = NULL;
	g_thread_op_supported_fn = NULL;
	g_ctx_sz = 0;
	if (g_app_thread != NULL) {
		_free_thread(g_app_thread);
		g_app_thread = NULL;
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}
}

struct spdk_thread *
spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread, *null_thread;
	size_t size = SPDK_ALIGN_CEIL(sizeof(*thread) + g_ctx_sz, SPDK_CACHE_LINE_SIZE);
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc = 0, i;

	/* Since this spdk_thread object will be used by another core, ensure that it won't share a
	 * cache line with any other object allocated on this core. */
	rc = posix_memalign((void **)&thread, SPDK_CACHE_LINE_SIZE, size);
	if (rc != 0) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}
	memset(thread, 0, size);

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	RB_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	RB_INIT(&thread->timed_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	/* A monotonically increasing ID, starting at 1, is assigned to each
	 * registered poller. If the ID wraps around, a warning is logged and
	 * poller IDs may be duplicated.
	 */
	thread->next_poller_id = 1;

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_NUMA_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	thread->trace_id = spdk_trace_register_owner(OWNER_TYPE_THREAD, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	if (g_thread_id == 0) {
		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		_free_thread(thread);
		return NULL;
	}
	thread->id = g_thread_id++;
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
		      thread->id, thread->name);

	if (spdk_interrupt_mode_is_enabled()) {
		thread->in_interrupt = true;
		rc = thread_interrupt_create(thread);
		if (rc != 0) {
			_free_thread(thread);
			return NULL;
		}
	}

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
	}

	if (rc != 0) {
		_free_thread(thread);
		return NULL;
	}

	thread->state = SPDK_THREAD_STATE_RUNNING;

	/* If this is the first thread, save it as the app thread. Use an atomic
	 * compare + exchange to guard against crazy users who might try to
	 * call spdk_thread_create() simultaneously on multiple threads.
	 */
	null_thread = NULL;
	__atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false,
				    __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);

	return thread;
}

struct spdk_thread *
spdk_thread_get_app_thread(void)
{
	return g_app_thread;
}

bool
spdk_thread_is_app_thread(struct spdk_thread *thread)
{
	if (thread == NULL) {
		thread = _get_thread();
	}

	return g_app_thread == thread;
}

void
spdk_thread_bind(struct spdk_thread *thread, bool bind)
{
	thread->is_bound = bind;
}

bool
spdk_thread_is_bound(struct spdk_thread *thread)
{
	return thread->is_bound;
}

void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}

static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s timed out, forcefully moving it to the exited state\n",
			    thread->name);
		goto exited;
	}

	if (spdk_ring_count(thread->messages) > 0) {
		SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name);
		return;
	}

	if (thread->for_each_count > 0) {
		SPDK_INFOLOG(thread, "thread %s is still executing %u for_each_channels/threads\n",
			     thread->name, thread->for_each_count);
		return;
	}

	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
		SPDK_INFOLOG(thread,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

	if (thread->pending_unregister_count > 0) {
		SPDK_INFOLOG(thread,
			     "thread %s is still unregistering io_devices\n",
			     thread->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
	if (spdk_unlikely(thread->in_interrupt)) {
		g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
	}
}

static void _thread_exit(void *ctx);

int
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
		SPDK_INFOLOG(thread,
			     "thread %s is already exiting\n",
			     thread->name);
		return 0;
	}

	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
	thread->state = SPDK_THREAD_STATE_EXITING;

	if (spdk_interrupt_mode_is_enabled()) {
		spdk_thread_send_msg(thread, _thread_exit, thread);
	}

	return 0;
}

bool
spdk_thread_is_running(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_RUNNING;
}

bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_EXITED;
}

void
spdk_thread_destroy(struct spdk_thread *thread)
{
	assert(thread != NULL);
	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);

	assert(thread->state == SPDK_THREAD_STATE_EXITED);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	/* To be safe, do not free the app thread until spdk_thread_lib_fini(). */
	if (thread != g_app_thread) {
		_free_thread(thread);
	}
}

void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
	if (g_ctx_sz > 0) {
		return thread->ctx;
	}

	return NULL;
}

struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}

int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;

	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
		assert(false);
		return -ENOTSUP;
	}

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Called from non-SPDK thread\n");
		assert(false);
		return -EINVAL;
	}

	spdk_cpuset_copy(&thread->cpumask, cpumask);

	/* Invoke the framework's reschedule operation. If this function is called multiple times
	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
	 * reschedule operation.
	 */
	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

	return 0;
}

struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}
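
/*
 * Sketch of the ctx round trip (hypothetical struct, not part of this
 * file): the ctx_sz passed to spdk_thread_lib_init() reserves space at
 * the tail of each spdk_thread, and the two functions above convert
 * between a thread and its user context region.
 *
 *	struct my_thread_ctx { uint64_t counter; };	// hypothetical
 *
 *	spdk_thread_lib_init(NULL, sizeof(struct my_thread_ctx));
 *	struct spdk_thread *t = spdk_thread_create("worker", NULL);
 *	struct my_thread_ctx *c = spdk_thread_get_ctx(t);
 *	assert(spdk_thread_get_from_ctx(c) == t);
 */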
static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];
	uint64_t notify = 1;
	int rc;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (spdk_unlikely(thread->in_interrupt) &&
	    spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);

		SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg);

		msg->fn(msg->arg);

		SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}

static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *tmp __attribute__((unused));

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert the poller in the thread's timed_pollers tree with its next
	 * scheduled run time as the key.
	 */
	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp == NULL);

	/* Update the cache only if it is empty or the inserted poller is earlier than it.
	 * RB_MIN() is not necessary here because all pollers that have exactly the same
	 * next_run_tick as an existing poller are inserted on the right side.
	 */
	if (thread->first_timed_poller == NULL ||
	    poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
		thread->first_timed_poller = poller;
	}
}

static inline void
poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
{
	struct spdk_poller *tmp __attribute__((unused));

	tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp != NULL);

	/* This function is not used in any case that is performance critical.
	 * Update the cache simply by RB_MIN() if it needs to be changed.
	 */
	if (thread->first_timed_poller == poller) {
		thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
	}
}

static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store the end time to use as the start time of the next spdk_thread_poll(). */
	thread->tsc_last = end;
}
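
/*
 * The busy/idle accounting above enables a simple utilization metric.
 * A sketch (assumes it runs on the thread being measured, since
 * spdk_thread_get_stats() operates on the current thread):
 *
 *	struct spdk_thread_stats stats;
 *
 *	if (spdk_thread_get_stats(&stats) == 0) {
 *		uint64_t total = stats.busy_tsc + stats.idle_tsc;
 *		double util = total ? (double)stats.busy_tsc / total : 0.0;
 *	}
 */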
static inline int
thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_WAITING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}

static inline int
thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
			    uint64_t now)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
	/* fallthrough */
	case SPDK_POLLER_STATE_WAITING:
		poller_insert_timer(thread, poller, now);
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}
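
/*
 * For reference, a poller function as driven by the two execute paths
 * above reports its work through its return value: positive
 * (SPDK_POLLER_BUSY) when it made progress, 0 (SPDK_POLLER_IDLE)
 * otherwise, and negative on error. A hypothetical example:
 *
 *	static int
 *	my_poller_fn(void *arg)
 *	{
 *		struct my_dev *dev = arg;	// hypothetical user type
 *
 *		return my_dev_process(dev) > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
 *	}
 */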
static inline void
thread_run_pp_handlers(struct spdk_thread *thread)
{
	uint8_t i, count = thread->num_pp_handlers;

	/* Set to the max value to prevent registration of new handlers within the callbacks. */
	thread->num_pp_handlers = SPDK_THREAD_MAX_POST_POLLER_HANDLERS;

	for (i = 0; i < count; i++) {
		thread->pp_handlers[i].fn(thread->pp_handlers[i].fn_arg);
		thread->pp_handlers[i].fn = NULL;
	}

	thread->num_pp_handlers = 0;
}

static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	thread->tsc_last = now;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		poller_rc = thread_execute_poller(thread, poller);
		if (poller_rc > rc) {
			rc = poller_rc;
		}
		if (thread->num_pp_handlers) {
			thread_run_pp_handlers(thread);
		}
	}

	poller = thread->first_timed_poller;
	while (poller != NULL) {
		int timer_rc = 0;

		if (now < poller->next_run_tick) {
			break;
		}

		tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);

		/* Update the cache to the next timed poller in the list
		 * only if the current poller is still the closest; otherwise
		 * do nothing because the cache has already been updated.
		 */
		if (thread->first_timed_poller == poller) {
			thread->first_timed_poller = tmp;
		}

		timer_rc = thread_execute_timed_poller(thread, poller, now);
		if (timer_rc > rc) {
			rc = timer_rc;
		}

		poller = tmp;
	}

	return rc;
}

static void
_thread_remove_pollers(void *ctx)
{
	struct spdk_thread *thread = ctx;
	struct spdk_poller *poller, *tmp;

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
		}
	}

	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			poller_remove_timer(thread, poller);
			free(poller);
		}
	}

	thread->poller_unregistered = false;
}

static void
_thread_exit(void *ctx)
{
	struct spdk_thread *thread = ctx;

	assert(thread->state == SPDK_THREAD_STATE_EXITING);

	thread_exit(thread, spdk_get_ticks());

	if (thread->state != SPDK_THREAD_STATE_EXITED) {
		spdk_thread_send_msg(thread, _thread_exit, thread);
	}
}

int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	struct spdk_thread *orig_thread;
	int rc;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	if (spdk_likely(!thread->in_interrupt)) {
		rc = thread_poll(thread, max_msgs, now);
		if (spdk_unlikely(thread->in_interrupt)) {
			/* The thread transitioned to interrupt mode during the above poll.
			 * Poll it one more time in case a message was received without
			 * notification during the transition.
			 */
			rc = thread_poll(thread, max_msgs, now);
		}

		if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
			thread_exit(thread, now);
		}
	} else {
		/* Non-blocking wait on the thread's fd_group */
		rc = spdk_fd_group_wait(thread->fgrp, 0);
	}

	thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}

uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = thread->first_timed_poller;
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}

int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}

static bool
thread_has_unpaused_pollers(struct spdk_thread *thread)
{
	if (TAILQ_EMPTY(&thread->active_pollers) &&
	    RB_EMPTY(&thread->timed_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
	if (!thread_has_unpaused_pollers(thread) &&
	    TAILQ_EMPTY(&thread->paused_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
	if (spdk_ring_count(thread->messages) ||
	    thread_has_unpaused_pollers(thread) ||
	    thread->critical_msg != NULL) {
		return false;
	}

	return true;
}

uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return the cached value of the current thread count. We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}
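
/*
 * A minimal event loop built on the polling APIs above (hedged sketch;
 * frameworks such as the SPDK event framework normally do this for you).
 * Assumes spdk_thread_exit() is eventually called from a message or
 * poller running on the thread:
 *
 *	while (!spdk_thread_is_exited(thread)) {
 *		spdk_thread_poll(thread, 0, 0);
 *	}
 *	spdk_thread_destroy(thread);
 */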
struct spdk_thread *
spdk_get_thread(void)
{
	return _get_thread();
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

uint64_t
spdk_thread_get_id(const struct spdk_thread *thread)
{
	return thread->id;
}

struct spdk_thread *
spdk_thread_get_by_id(uint64_t id)
{
	struct spdk_thread *thread;

	if (id == 0 || id >= g_thread_id) {
		SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
		return NULL;
	}
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		if (thread->id == id) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);
	return thread;
}

int
spdk_thread_get_stats(struct spdk_thread_stats *stats)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		return -EINVAL;
	}

	if (stats == NULL) {
		return -EINVAL;
	}

	*stats = thread->stats;

	return 0;
}

uint64_t
spdk_thread_get_last_tsc(struct spdk_thread *thread)
{
	if (thread == NULL) {
		thread = _get_thread();
	}

	return thread->tsc_last;
}
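
/*
 * Example use of the message-passing API implemented below (hypothetical
 * callback): the function is queued and later runs on the target
 * thread's context.
 *
 *	static void
 *	say_hello(void *ctx)
 *	{
 *		SPDK_NOTICELOG("hello from %s\n",
 *			       spdk_thread_get_name(spdk_get_thread()));
 *	}
 *
 *	// rc is 0 on success, -ENOMEM or -EIO if the message was not queued
 *	rc = spdk_thread_send_msg(target, say_hello, NULL);
 */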
static inline int
thread_send_msg_notification(const struct spdk_thread *target_thread)
{
	uint64_t notify = 1;
	int rc;

	/* Notification is unnecessary when the interrupt facility is not enabled. */
	if (spdk_likely(!spdk_interrupt_mode_is_enabled())) {
		return 0;
	}

	/* Since each spdk_thread can switch between poll and interrupt mode
	 * dynamically, after sending a thread message it is necessary to check
	 * whether the target thread runs in interrupt mode and, if so, perform
	 * the event notification.
	 */
	if (spdk_unlikely(target_thread->in_interrupt)) {
		rc = write(target_thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
			return -EIO;
		}
	}

	return 0;
}

int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
	spdk_msg_fn expected = NULL;

	if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
					 __ATOMIC_SEQ_CST)) {
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

#ifdef __linux__
static int
interrupt_timerfd_process(void *arg)
{
	struct spdk_poller *poller = arg;
	uint64_t exp;
	int rc;

	/* Drain the timerfd to clear the level of the interval timer. */
	rc = read(poller->intr->efd, &exp, sizeof(exp));
	if (rc < 0) {
		if (errno == EAGAIN) {
			return 0;
		}

		return rc;
	}

	SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg);

	return poller->fn(poller->arg);
}

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	int timerfd;

	SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
	if (timerfd < 0) {
		return -errno;
	}

	poller->intr = spdk_interrupt_register(timerfd, interrupt_timerfd_process, poller, poller->name);
	if (poller->intr == NULL) {
		close(timerfd);
		return -1;
	}

	return 0;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	int timerfd;
	uint64_t now_tick = spdk_get_ticks();
	uint64_t ticks = spdk_get_ticks_hz();
	int ret;
	struct itimerspec new_tv = {};
	struct itimerspec old_tv = {};

	assert(poller->intr != NULL);
	assert(poller->period_ticks != 0);

	timerfd = poller->intr->efd;

	assert(timerfd >= 0);

	SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
		      interrupt_mode ? "interrupt" : "poll");
"interrupt" : "poll"); 1524 1525 if (interrupt_mode) { 1526 /* Set repeated timer expiration */ 1527 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1528 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1529 1530 /* Update next timer expiration */ 1531 if (poller->next_run_tick == 0) { 1532 poller->next_run_tick = now_tick + poller->period_ticks; 1533 } else if (poller->next_run_tick < now_tick) { 1534 poller->next_run_tick = now_tick; 1535 } 1536 1537 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1538 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1539 1540 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1541 if (ret < 0) { 1542 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1543 assert(false); 1544 } 1545 } else { 1546 /* Disarm the timer */ 1547 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1548 if (ret < 0) { 1549 /* timerfd_settime's failure indicates that the timerfd is in error */ 1550 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1551 assert(false); 1552 } 1553 1554 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1555 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1556 */ 1557 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1558 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1559 poller_remove_timer(poller->thread, poller); 1560 poller_insert_timer(poller->thread, poller, now_tick); 1561 } 1562 } 1563 1564 static void 1565 poller_interrupt_fini(struct spdk_poller *poller) 1566 { 1567 int fd; 1568 1569 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1570 assert(poller->intr != NULL); 1571 fd = poller->intr->efd; 1572 spdk_interrupt_unregister(&poller->intr); 1573 close(fd); 1574 } 1575 1576 static int 1577 busy_poller_interrupt_init(struct spdk_poller *poller) 1578 { 1579 int busy_efd; 1580 1581 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1582 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1583 if (busy_efd < 0) { 1584 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1585 return -errno; 1586 } 1587 1588 poller->intr = spdk_interrupt_register(busy_efd, poller->fn, poller->arg, poller->name); 1589 if (poller->intr == NULL) { 1590 close(busy_efd); 1591 return -1; 1592 } 1593 1594 return 0; 1595 } 1596 1597 static void 1598 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1599 { 1600 int busy_efd = poller->intr->efd; 1601 uint64_t notify = 1; 1602 int rc __attribute__((unused)); 1603 1604 assert(busy_efd >= 0); 1605 1606 if (interrupt_mode) { 1607 /* Write without read on eventfd will get it repeatedly triggered. */ 1608 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1609 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1610 } 1611 } else { 1612 /* Read on eventfd will clear its level triggering. 
#else

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

#endif

void
spdk_poller_register_interrupt(struct spdk_poller *poller,
			       spdk_poller_set_interrupt_mode_cb cb_fn,
			       void *cb_arg)
{
	assert(poller != NULL);
	assert(spdk_get_thread() == poller->thread);

	if (!spdk_interrupt_mode_is_enabled()) {
		return;
	}

	/* If this poller already had an interrupt, clean the old one up. */
	if (poller->intr != NULL) {
		poller_interrupt_fini(poller);
	}

	poller->set_intr_cb_fn = cb_fn;
	poller->set_intr_cb_arg = cb_arg;

	/* Set the poller into interrupt mode if the thread is in interrupt. */
	if (poller->thread->in_interrupt && poller->set_intr_cb_fn) {
		poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
	}
}

static uint64_t
convert_us_to_ticks(uint64_t us)
{
	uint64_t quotient, remainder, ticks;

	if (us) {
		quotient = us / SPDK_SEC_TO_USEC;
		remainder = us % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		return 0;
	}
}

static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;
	poller->intr = NULL;
	if (thread->next_poller_id == 0) {
		SPDK_WARNLOG("Poller ID rolled over. Poller IDs may now be duplicated.\n");
		thread->next_poller_id = 1;
	}
	poller->id = thread->next_poller_id++;

	poller->period_ticks = convert_us_to_ticks(period_microseconds);

	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (period_microseconds) {
			rc = period_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			poller->set_intr_cb_fn = period_poller_set_interrupt_mode;
			poller->set_intr_cb_arg = NULL;

		} else {
			/* If the poller doesn't have a period, create an interruptfd that is
			 * automatically always busy when running in interrupt mode.
			 */
			rc = busy_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			poller->set_intr_cb_fn = busy_poller_set_interrupt_mode;
			poller->set_intr_cb_arg = NULL;
		}

		/* Set the poller into interrupt mode if the thread is in interrupt. */
		if (poller->thread->in_interrupt) {
			poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
		}
	}

	thread_insert_poller(thread, poller);

	return poller;
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}

static void
wrong_thread(const char *func, const char *name, struct spdk_thread *thread,
	     struct spdk_thread *curthread)
{
	if (thread == NULL) {
		SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name);
		abort();
	}
	SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be "
		    "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id,
		    thread->name, thread->id);
	assert(false);
}

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		/* Release the interrupt resource for periodic or busy pollers */
		if (poller->intr != NULL) {
			poller_interrupt_fini(poller);
		}

		/* If there is not already a pending poller removal, generate
		 * a message to go process removals. */
		if (!thread->poller_unregistered) {
			thread->poller_unregistered = true;
			spdk_thread_send_msg(thread, _thread_remove_pollers, thread);
		}
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}

void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	/* We just set the state to SPDK_POLLER_STATE_PAUSING and let
	 * spdk_thread_poll() move it. This allows a poller to be paused from
	 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE
	 * iteration, or from within itself without breaking the logic to always
	 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration.
	 */
	switch (poller->state) {
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_PAUSING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
	case SPDK_POLLER_STATE_WAITING:
		poller->state = SPDK_POLLER_STATE_PAUSING;
		break;
	default:
		assert(false);
		break;
	}
}
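
/*
 * Hypothetical illustration: the deferred PAUSING state above is what
 * makes it safe for a poller to pause itself from its own callback.
 *
 *	static int
 *	my_poller_fn(void *arg)
 *	{
 *		struct my_state *s = arg;	// hypothetical user type
 *
 *		if (s->queue_empty) {
 *			spdk_poller_pause(s->poller);	// applied after return
 *			return SPDK_POLLER_IDLE;
 *		}
 *		return SPDK_POLLER_BUSY;
 *	}
 */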
void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	/* If a poller is paused, it has to be removed from the paused pollers
	 * list and put on the active list or timer tree depending on its
	 * period_ticks. If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list or tree.
	 */
	switch (poller->state) {
	case SPDK_POLLER_STATE_PAUSED:
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		thread_insert_poller(thread, poller);
	/* fallthrough */
	case SPDK_POLLER_STATE_PAUSING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	case SPDK_POLLER_STATE_RUNNING:
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}
}

const char *
spdk_poller_get_name(struct spdk_poller *poller)
{
	return poller->name;
}

uint64_t
spdk_poller_get_id(struct spdk_poller *poller)
{
	return poller->id;
}

const char *
spdk_poller_get_state_str(struct spdk_poller *poller)
{
	switch (poller->state) {
	case SPDK_POLLER_STATE_WAITING:
		return "waiting";
	case SPDK_POLLER_STATE_RUNNING:
		return "running";
	case SPDK_POLLER_STATE_UNREGISTERED:
		return "unregistered";
	case SPDK_POLLER_STATE_PAUSING:
		return "pausing";
	case SPDK_POLLER_STATE_PAUSED:
		return "paused";
	default:
		return NULL;
	}
}

uint64_t
spdk_poller_get_period_ticks(struct spdk_poller *poller)
{
	return poller->period_ticks;
}

void
spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats)
{
	stats->run_count = poller->run_count;
	stats->busy_count = poller->busy_count;
}

struct spdk_poller *
spdk_thread_get_first_active_poller(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->active_pollers);
}

struct spdk_poller *
spdk_thread_get_next_active_poller(struct spdk_poller *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

struct spdk_poller *
spdk_thread_get_first_timed_poller(struct spdk_thread *thread)
{
	return RB_MIN(timed_pollers_tree, &thread->timed_pollers);
}

struct spdk_poller *
spdk_thread_get_next_timed_poller(struct spdk_poller *prev)
{
	return RB_NEXT(timed_pollers_tree, &prev->thread->timed_pollers, prev);
}

struct spdk_poller *
spdk_thread_get_first_paused_poller(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->paused_pollers);
}

struct spdk_poller *
spdk_thread_get_next_paused_poller(struct spdk_poller *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

struct spdk_io_channel *
spdk_thread_get_first_io_channel(struct spdk_thread *thread)
{
	return RB_MIN(io_channel_tree, &thread->io_channels);
}

struct spdk_io_channel *
spdk_thread_get_next_io_channel(struct spdk_io_channel *prev)
{
	return RB_NEXT(io_channel_tree, &prev->thread->io_channels, prev);
}

uint16_t
spdk_thread_get_trace_id(struct spdk_thread *thread)
{
	return thread->trace_id;
}

struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};

static void
_back_to_orig_thread(void *ctx)
{
	struct call_thread *ct = ctx;

	assert(ct->orig_thread->for_each_count > 0);
	ct->orig_thread->for_each_count--;

	ct->cpl(ct->ctx);
	free(ctx);
}
pthread_mutex_lock(&g_devlist_mutex); 2062 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 2063 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 2064 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 2065 ct->cur_thread->name); 2066 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 2067 } 2068 pthread_mutex_unlock(&g_devlist_mutex); 2069 2070 if (!ct->cur_thread) { 2071 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 2072 2073 rc = spdk_thread_send_msg(ct->orig_thread, _back_to_orig_thread, ctx); 2074 } else { 2075 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 2076 ct->cur_thread->name); 2077 2078 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 2079 } 2080 assert(rc == 0); 2081 } 2082 2083 void 2084 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 2085 { 2086 struct call_thread *ct; 2087 struct spdk_thread *thread; 2088 int rc __attribute__((unused)); 2089 2090 ct = calloc(1, sizeof(*ct)); 2091 if (!ct) { 2092 SPDK_ERRLOG("Unable to perform thread iteration\n"); 2093 cpl(ctx); 2094 return; 2095 } 2096 2097 ct->fn = fn; 2098 ct->ctx = ctx; 2099 ct->cpl = cpl; 2100 2101 thread = _get_thread(); 2102 if (!thread) { 2103 SPDK_ERRLOG("No thread allocated\n"); 2104 free(ct); 2105 cpl(ctx); 2106 return; 2107 } 2108 ct->orig_thread = thread; 2109 2110 ct->orig_thread->for_each_count++; 2111 2112 pthread_mutex_lock(&g_devlist_mutex); 2113 ct->cur_thread = TAILQ_FIRST(&g_threads); 2114 pthread_mutex_unlock(&g_devlist_mutex); 2115 2116 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 2117 ct->orig_thread->name); 2118 2119 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 2120 assert(rc == 0); 2121 } 2122 2123 static inline void 2124 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 2125 { 2126 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 2127 return; 2128 } 2129 2130 if (poller->set_intr_cb_fn) { 2131 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 2132 } 2133 } 2134 2135 void 2136 spdk_thread_set_interrupt_mode(bool enable_interrupt) 2137 { 2138 struct spdk_thread *thread = _get_thread(); 2139 struct spdk_poller *poller, *tmp; 2140 2141 assert(thread); 2142 assert(spdk_interrupt_mode_is_enabled()); 2143 2144 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 2145 thread->name, enable_interrupt ? "intr" : "poll", 2146 thread->in_interrupt ? 
"intr" : "poll"); 2147 2148 if (thread->in_interrupt == enable_interrupt) { 2149 return; 2150 } 2151 2152 /* Set pollers to expected mode */ 2153 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 2154 poller_set_interrupt_mode(poller, enable_interrupt); 2155 } 2156 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 2157 poller_set_interrupt_mode(poller, enable_interrupt); 2158 } 2159 /* All paused pollers will go to work in interrupt mode */ 2160 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 2161 poller_set_interrupt_mode(poller, enable_interrupt); 2162 } 2163 2164 thread->in_interrupt = enable_interrupt; 2165 return; 2166 } 2167 2168 static struct io_device * 2169 io_device_get(void *io_device) 2170 { 2171 struct io_device find = {}; 2172 2173 find.io_device = io_device; 2174 return RB_FIND(io_device_tree, &g_io_devices, &find); 2175 } 2176 2177 void 2178 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 2179 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 2180 const char *name) 2181 { 2182 struct io_device *dev, *tmp; 2183 struct spdk_thread *thread; 2184 2185 assert(io_device != NULL); 2186 assert(create_cb != NULL); 2187 assert(destroy_cb != NULL); 2188 2189 thread = spdk_get_thread(); 2190 if (!thread) { 2191 SPDK_ERRLOG("called from non-SPDK thread\n"); 2192 assert(false); 2193 return; 2194 } 2195 2196 dev = calloc(1, sizeof(struct io_device)); 2197 if (dev == NULL) { 2198 SPDK_ERRLOG("could not allocate io_device\n"); 2199 return; 2200 } 2201 2202 dev->io_device = io_device; 2203 if (name) { 2204 snprintf(dev->name, sizeof(dev->name), "%s", name); 2205 } else { 2206 snprintf(dev->name, sizeof(dev->name), "%p", dev); 2207 } 2208 dev->create_cb = create_cb; 2209 dev->destroy_cb = destroy_cb; 2210 dev->unregister_cb = NULL; 2211 dev->ctx_size = ctx_size; 2212 dev->for_each_count = 0; 2213 dev->unregistered = false; 2214 dev->refcnt = 0; 2215 2216 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 2217 dev->name, dev->io_device, thread->name); 2218 2219 pthread_mutex_lock(&g_devlist_mutex); 2220 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 2221 if (tmp != NULL) { 2222 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 2223 io_device, tmp->name, dev->name); 2224 free(dev); 2225 } 2226 2227 pthread_mutex_unlock(&g_devlist_mutex); 2228 } 2229 2230 static void 2231 _finish_unregister(void *arg) 2232 { 2233 struct io_device *dev = arg; 2234 struct spdk_thread *thread; 2235 2236 thread = spdk_get_thread(); 2237 assert(thread == dev->unregister_thread); 2238 2239 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2240 dev->name, dev->io_device, thread->name); 2241 2242 assert(thread->pending_unregister_count > 0); 2243 thread->pending_unregister_count--; 2244 2245 dev->unregister_cb(dev->io_device); 2246 free(dev); 2247 } 2248 2249 static void 2250 io_device_free(struct io_device *dev) 2251 { 2252 int rc __attribute__((unused)); 2253 2254 if (dev->unregister_cb == NULL) { 2255 free(dev); 2256 } else { 2257 assert(dev->unregister_thread != NULL); 2258 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2259 dev->name, dev->io_device, dev->unregister_thread->name); 2260 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2261 assert(rc == 0); 2262 } 2263 } 2264 2265 void 2266 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2267 { 2268 
struct io_device *dev; 2269 uint32_t refcnt; 2270 struct spdk_thread *thread; 2271 2272 thread = spdk_get_thread(); 2273 if (!thread) { 2274 SPDK_ERRLOG("called from non-SPDK thread\n"); 2275 assert(false); 2276 return; 2277 } 2278 2279 pthread_mutex_lock(&g_devlist_mutex); 2280 dev = io_device_get(io_device); 2281 if (!dev) { 2282 SPDK_ERRLOG("io_device %p not found\n", io_device); 2283 assert(false); 2284 pthread_mutex_unlock(&g_devlist_mutex); 2285 return; 2286 } 2287 2288 /* The for_each_count check differentiates the user attempting to unregister the 2289 * device a second time, from the internal call to this function that occurs 2290 * after the for_each_count reaches 0. 2291 */ 2292 if (dev->pending_unregister && dev->for_each_count > 0) { 2293 SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device); 2294 assert(false); 2295 pthread_mutex_unlock(&g_devlist_mutex); 2296 return; 2297 } 2298 2299 dev->unregister_cb = unregister_cb; 2300 dev->unregister_thread = thread; 2301 2302 if (dev->for_each_count > 0) { 2303 SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2304 dev->name, io_device, dev->for_each_count); 2305 dev->pending_unregister = true; 2306 pthread_mutex_unlock(&g_devlist_mutex); 2307 return; 2308 } 2309 2310 dev->unregistered = true; 2311 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2312 refcnt = dev->refcnt; 2313 pthread_mutex_unlock(&g_devlist_mutex); 2314 2315 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2316 dev->name, dev->io_device, thread->name); 2317 2318 if (unregister_cb) { 2319 thread->pending_unregister_count++; 2320 } 2321 2322 if (refcnt > 0) { 2323 /* defer deletion */ 2324 return; 2325 } 2326 2327 io_device_free(dev); 2328 } 2329 2330 const char * 2331 spdk_io_device_get_name(struct io_device *dev) 2332 { 2333 return dev->name; 2334 } 2335 2336 static struct spdk_io_channel * 2337 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2338 { 2339 struct spdk_io_channel find = {}; 2340 2341 find.dev = dev; 2342 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2343 } 2344 2345 struct spdk_io_channel * 2346 spdk_get_io_channel(void *io_device) 2347 { 2348 struct spdk_io_channel *ch; 2349 struct spdk_thread *thread; 2350 struct io_device *dev; 2351 int rc; 2352 2353 pthread_mutex_lock(&g_devlist_mutex); 2354 dev = io_device_get(io_device); 2355 if (dev == NULL) { 2356 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2357 pthread_mutex_unlock(&g_devlist_mutex); 2358 return NULL; 2359 } 2360 2361 thread = _get_thread(); 2362 if (!thread) { 2363 SPDK_ERRLOG("No thread allocated\n"); 2364 pthread_mutex_unlock(&g_devlist_mutex); 2365 return NULL; 2366 } 2367 2368 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2369 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2370 pthread_mutex_unlock(&g_devlist_mutex); 2371 return NULL; 2372 } 2373 2374 ch = thread_get_io_channel(thread, dev); 2375 if (ch != NULL) { 2376 ch->ref++; 2377 2378 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2379 ch, dev->name, dev->io_device, thread->name, ch->ref); 2380 2381 /* 2382 * An I/O channel already exists for this device on this 2383 * thread, so return it. 
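		 * Note that each successful spdk_get_io_channel() call must be
		 * balanced by a matching spdk_put_io_channel() on this same
		 * thread.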
2384 */ 2385 pthread_mutex_unlock(&g_devlist_mutex); 2386 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2387 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2388 return ch; 2389 } 2390 2391 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2392 if (ch == NULL) { 2393 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2394 pthread_mutex_unlock(&g_devlist_mutex); 2395 return NULL; 2396 } 2397 2398 ch->dev = dev; 2399 ch->destroy_cb = dev->destroy_cb; 2400 ch->thread = thread; 2401 ch->ref = 1; 2402 ch->destroy_ref = 0; 2403 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2404 2405 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2406 ch, dev->name, dev->io_device, thread->name, ch->ref); 2407 2408 dev->refcnt++; 2409 2410 pthread_mutex_unlock(&g_devlist_mutex); 2411 2412 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2413 if (rc != 0) { 2414 pthread_mutex_lock(&g_devlist_mutex); 2415 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2416 dev->refcnt--; 2417 free(ch); 2418 SPDK_ERRLOG("could not create io_channel for io_device %s (%p): %s (rc=%d)\n", 2419 dev->name, io_device, spdk_strerror(-rc), rc); 2420 pthread_mutex_unlock(&g_devlist_mutex); 2421 return NULL; 2422 } 2423 2424 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2425 return ch; 2426 } 2427 2428 static void 2429 put_io_channel(void *arg) 2430 { 2431 struct spdk_io_channel *ch = arg; 2432 bool do_remove_dev = true; 2433 struct spdk_thread *thread; 2434 2435 thread = spdk_get_thread(); 2436 if (!thread) { 2437 SPDK_ERRLOG("called from non-SPDK thread\n"); 2438 assert(false); 2439 return; 2440 } 2441 2442 SPDK_DEBUGLOG(thread, 2443 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2444 ch, ch->dev->name, ch->dev->io_device, thread->name); 2445 2446 assert(ch->thread == thread); 2447 2448 ch->destroy_ref--; 2449 2450 if (ch->ref > 0 || ch->destroy_ref > 0) { 2451 /* 2452 * Another reference to the associated io_device was requested 2453 * after this message was sent but before it had a chance to 2454 * execute. 2455 */ 2456 return; 2457 } 2458 2459 pthread_mutex_lock(&g_devlist_mutex); 2460 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2461 pthread_mutex_unlock(&g_devlist_mutex); 2462 2463 /* Don't hold the devlist mutex while the destroy_cb is called. 
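	 * The destroy_cb may call back into this library (for example to put
	 * other channels or to unregister the device), and those paths take
	 * g_devlist_mutex themselves; holding it here could deadlock.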
*/ 2464 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2465 2466 pthread_mutex_lock(&g_devlist_mutex); 2467 ch->dev->refcnt--; 2468 2469 if (!ch->dev->unregistered) { 2470 do_remove_dev = false; 2471 } 2472 2473 if (ch->dev->refcnt > 0) { 2474 do_remove_dev = false; 2475 } 2476 2477 pthread_mutex_unlock(&g_devlist_mutex); 2478 2479 if (do_remove_dev) { 2480 io_device_free(ch->dev); 2481 } 2482 free(ch); 2483 } 2484 2485 void 2486 spdk_put_io_channel(struct spdk_io_channel *ch) 2487 { 2488 struct spdk_thread *thread; 2489 int rc __attribute__((unused)); 2490 2491 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2492 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2493 2494 thread = spdk_get_thread(); 2495 if (!thread) { 2496 SPDK_ERRLOG("called from non-SPDK thread\n"); 2497 assert(false); 2498 return; 2499 } 2500 2501 if (ch->thread != thread) { 2502 wrong_thread(__func__, "ch", ch->thread, thread); 2503 return; 2504 } 2505 2506 SPDK_DEBUGLOG(thread, 2507 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2508 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2509 2510 ch->ref--; 2511 2512 if (ch->ref == 0) { 2513 ch->destroy_ref++; 2514 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2515 assert(rc == 0); 2516 } 2517 } 2518 2519 struct spdk_io_channel * 2520 spdk_io_channel_from_ctx(void *ctx) 2521 { 2522 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2523 } 2524 2525 struct spdk_thread * 2526 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2527 { 2528 return ch->thread; 2529 } 2530 2531 void * 2532 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2533 { 2534 return ch->dev->io_device; 2535 } 2536 2537 const char * 2538 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2539 { 2540 return spdk_io_device_get_name(ch->dev); 2541 } 2542 2543 int 2544 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2545 { 2546 return ch->ref; 2547 } 2548 2549 struct spdk_io_channel_iter { 2550 void *io_device; 2551 struct io_device *dev; 2552 spdk_channel_msg fn; 2553 int status; 2554 void *ctx; 2555 struct spdk_io_channel *ch; 2556 2557 struct spdk_thread *cur_thread; 2558 2559 struct spdk_thread *orig_thread; 2560 spdk_channel_for_each_cpl cpl; 2561 }; 2562 2563 void * 2564 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2565 { 2566 return i->io_device; 2567 } 2568 2569 struct spdk_io_channel * 2570 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2571 { 2572 return i->ch; 2573 } 2574 2575 void * 2576 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2577 { 2578 return i->ctx; 2579 } 2580 2581 static void 2582 _call_completion(void *ctx) 2583 { 2584 struct spdk_io_channel_iter *i = ctx; 2585 2586 assert(i->orig_thread->for_each_count > 0); 2587 i->orig_thread->for_each_count--; 2588 2589 if (i->cpl != NULL) { 2590 i->cpl(i, i->status); 2591 } 2592 free(i); 2593 } 2594 2595 static void 2596 _call_channel(void *ctx) 2597 { 2598 struct spdk_io_channel_iter *i = ctx; 2599 struct spdk_io_channel *ch; 2600 2601 /* 2602 * It is possible that the channel was deleted before this 2603 * message had a chance to execute. If so, skip calling 2604 * the fn() on this thread. 
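	 * In that case, pass the iteration along via
	 * spdk_for_each_channel_continue() with status 0, just as a user fn
	 * does once it is finished with its channel.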
2605 */ 2606 pthread_mutex_lock(&g_devlist_mutex); 2607 ch = thread_get_io_channel(i->cur_thread, i->dev); 2608 pthread_mutex_unlock(&g_devlist_mutex); 2609 2610 if (ch) { 2611 i->fn(i); 2612 } else { 2613 spdk_for_each_channel_continue(i, 0); 2614 } 2615 } 2616 2617 void 2618 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2619 spdk_channel_for_each_cpl cpl) 2620 { 2621 struct spdk_thread *thread; 2622 struct spdk_io_channel *ch; 2623 struct spdk_io_channel_iter *i; 2624 int rc __attribute__((unused)); 2625 2626 i = calloc(1, sizeof(*i)); 2627 if (!i) { 2628 SPDK_ERRLOG("Unable to allocate iterator\n"); 2629 assert(false); 2630 return; 2631 } 2632 2633 i->io_device = io_device; 2634 i->fn = fn; 2635 i->ctx = ctx; 2636 i->cpl = cpl; 2637 i->orig_thread = _get_thread(); 2638 2639 i->orig_thread->for_each_count++; 2640 2641 pthread_mutex_lock(&g_devlist_mutex); 2642 i->dev = io_device_get(io_device); 2643 if (i->dev == NULL) { 2644 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2645 assert(false); 2646 i->status = -ENODEV; 2647 goto end; 2648 } 2649 2650 /* Do not allow new for_each operations if we are already waiting to unregister 2651 * the device for other for_each operations to complete. 2652 */ 2653 if (i->dev->pending_unregister) { 2654 SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device); 2655 i->status = -ENODEV; 2656 goto end; 2657 } 2658 2659 TAILQ_FOREACH(thread, &g_threads, tailq) { 2660 ch = thread_get_io_channel(thread, i->dev); 2661 if (ch != NULL) { 2662 ch->dev->for_each_count++; 2663 i->cur_thread = thread; 2664 i->ch = ch; 2665 pthread_mutex_unlock(&g_devlist_mutex); 2666 rc = spdk_thread_send_msg(thread, _call_channel, i); 2667 assert(rc == 0); 2668 return; 2669 } 2670 } 2671 2672 end: 2673 pthread_mutex_unlock(&g_devlist_mutex); 2674 2675 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2676 assert(rc == 0); 2677 } 2678 2679 static void 2680 __pending_unregister(void *arg) 2681 { 2682 struct io_device *dev = arg; 2683 2684 assert(dev->pending_unregister); 2685 assert(dev->for_each_count == 0); 2686 spdk_io_device_unregister(dev->io_device, dev->unregister_cb); 2687 } 2688 2689 void 2690 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2691 { 2692 struct spdk_thread *thread; 2693 struct spdk_io_channel *ch; 2694 struct io_device *dev; 2695 int rc __attribute__((unused)); 2696 2697 assert(i->cur_thread == spdk_get_thread()); 2698 2699 i->status = status; 2700 2701 pthread_mutex_lock(&g_devlist_mutex); 2702 dev = i->dev; 2703 if (status) { 2704 goto end; 2705 } 2706 2707 thread = TAILQ_NEXT(i->cur_thread, tailq); 2708 while (thread) { 2709 ch = thread_get_io_channel(thread, dev); 2710 if (ch != NULL) { 2711 i->cur_thread = thread; 2712 i->ch = ch; 2713 pthread_mutex_unlock(&g_devlist_mutex); 2714 rc = spdk_thread_send_msg(thread, _call_channel, i); 2715 assert(rc == 0); 2716 return; 2717 } 2718 thread = TAILQ_NEXT(thread, tailq); 2719 } 2720 2721 end: 2722 dev->for_each_count--; 2723 i->ch = NULL; 2724 pthread_mutex_unlock(&g_devlist_mutex); 2725 2726 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2727 assert(rc == 0); 2728 2729 pthread_mutex_lock(&g_devlist_mutex); 2730 if (dev->pending_unregister && dev->for_each_count == 0) { 2731 rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev); 2732 assert(rc == 0); 2733 } 2734 pthread_mutex_unlock(&g_devlist_mutex); 2735 } 2736 2737 static void 2738 thread_interrupt_destroy(struct spdk_thread *thread) 
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	if (thread->msg_fd < 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);
	thread->msg_fd = -1;

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}

#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	struct spdk_thread *orig_thread;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;
	uint64_t notify = 1;

	assert(spdk_interrupt_mode_is_enabled());

	orig_thread = spdk_get_thread();
	spdk_set_thread(thread);

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
	if (spdk_unlikely(!thread->in_interrupt)) {
		/* The thread transitioned to poll mode in a msg during the above processing.
		 * Drain msg_fd since thread messages will be polled directly in poll mode.
		 */
		rc = read(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0 && errno != EAGAIN) {
			SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
		}
	}

	spdk_set_thread(orig_thread);
	return rc;
}

static int
thread_interrupt_create(struct spdk_thread *thread)
{
	struct spdk_event_handler_opts opts = {};
	int rc;

	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);

	rc = spdk_fd_group_create(&thread->fgrp);
	if (rc) {
		thread->msg_fd = -1;
		return rc;
	}

	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (thread->msg_fd < 0) {
		rc = -errno;
		spdk_fd_group_destroy(thread->fgrp);
		thread->fgrp = NULL;

		return rc;
	}

	spdk_fd_group_get_default_event_handler_opts(&opts, sizeof(opts));
	opts.fd_type = SPDK_FD_TYPE_EVENTFD;

	return SPDK_FD_GROUP_ADD_EXT(thread->fgrp, thread->msg_fd,
				     thread_interrupt_msg_process, thread, &opts);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
#endif

static int
_interrupt_wrapper(void *ctx)
{
	struct spdk_interrupt *intr = ctx;
	struct spdk_thread *orig_thread, *thread;
	int rc;

	orig_thread = spdk_get_thread();
	thread = intr->thread;

	spdk_set_thread(thread);

	SPDK_DTRACE_PROBE4(interrupt_fd_process, intr->name, intr->efd,
			   intr->fn, intr->arg);

	rc = intr->fn(intr->arg);

	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

	spdk_set_thread(orig_thread);

	return rc;
}

struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
			void *arg, const char *name)
{
	return spdk_interrupt_register_for_events(efd, SPDK_INTERRUPT_EVENT_IN, fn, arg, name);
}

struct spdk_interrupt *
spdk_interrupt_register_for_events(int efd, uint32_t events, spdk_interrupt_fn fn, void *arg,
				   const char *name)
{
	struct spdk_event_handler_opts opts = {};
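	/* Start from the library's default event handler options, then
	 * override only the event mask and the fd type before registering.
	 */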
spdk_fd_group_get_default_event_handler_opts(&opts, sizeof(opts)); 2874 opts.events = events; 2875 opts.fd_type = SPDK_FD_TYPE_DEFAULT; 2876 2877 return spdk_interrupt_register_ext(efd, fn, arg, name, &opts); 2878 } 2879 2880 static struct spdk_interrupt * 2881 alloc_interrupt(int efd, struct spdk_fd_group *fgrp, spdk_interrupt_fn fn, void *arg, 2882 const char *name) 2883 { 2884 struct spdk_thread *thread; 2885 struct spdk_interrupt *intr; 2886 2887 thread = spdk_get_thread(); 2888 if (!thread) { 2889 assert(false); 2890 return NULL; 2891 } 2892 2893 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2894 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2895 return NULL; 2896 } 2897 2898 intr = calloc(1, sizeof(*intr)); 2899 if (intr == NULL) { 2900 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2901 return NULL; 2902 } 2903 2904 if (name) { 2905 snprintf(intr->name, sizeof(intr->name), "%s", name); 2906 } else { 2907 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2908 } 2909 2910 assert(efd < 0 || fgrp == NULL); 2911 intr->efd = efd; 2912 intr->fgrp = fgrp; 2913 intr->thread = thread; 2914 intr->fn = fn; 2915 intr->arg = arg; 2916 2917 return intr; 2918 } 2919 2920 struct spdk_interrupt * 2921 spdk_interrupt_register_ext(int efd, spdk_interrupt_fn fn, void *arg, const char *name, 2922 struct spdk_event_handler_opts *opts) 2923 { 2924 struct spdk_interrupt *intr; 2925 int ret; 2926 2927 intr = alloc_interrupt(efd, NULL, fn, arg, name); 2928 if (intr == NULL) { 2929 return NULL; 2930 } 2931 2932 ret = spdk_fd_group_add_ext(intr->thread->fgrp, efd, 2933 _interrupt_wrapper, intr, intr->name, opts); 2934 if (ret != 0) { 2935 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2936 intr->thread->name, efd, spdk_strerror(-ret)); 2937 free(intr); 2938 return NULL; 2939 } 2940 2941 return intr; 2942 } 2943 2944 static int 2945 interrupt_fd_group_wrapper(void *wrap_ctx, spdk_fd_fn cb_fn, void *cb_ctx) 2946 { 2947 struct spdk_interrupt *intr = wrap_ctx; 2948 struct spdk_thread *orig_thread, *thread; 2949 int rc; 2950 2951 orig_thread = spdk_get_thread(); 2952 thread = intr->thread; 2953 2954 spdk_set_thread(thread); 2955 rc = cb_fn(cb_ctx); 2956 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2957 spdk_set_thread(orig_thread); 2958 2959 return rc; 2960 } 2961 2962 struct spdk_interrupt * 2963 spdk_interrupt_register_fd_group(struct spdk_fd_group *fgrp, const char *name) 2964 { 2965 struct spdk_interrupt *intr; 2966 int rc; 2967 2968 intr = alloc_interrupt(-1, fgrp, NULL, NULL, name); 2969 if (intr == NULL) { 2970 return NULL; 2971 } 2972 2973 rc = spdk_fd_group_set_wrapper(fgrp, interrupt_fd_group_wrapper, intr); 2974 if (rc != 0) { 2975 SPDK_ERRLOG("thread %s: failed to set wrapper for fd_group %d: %s\n", 2976 intr->thread->name, spdk_fd_group_get_fd(fgrp), spdk_strerror(-rc)); 2977 free(intr); 2978 return NULL; 2979 } 2980 2981 rc = spdk_fd_group_nest(intr->thread->fgrp, fgrp); 2982 if (rc != 0) { 2983 SPDK_ERRLOG("thread %s: failed to nest fd_group %d: %s\n", 2984 intr->thread->name, spdk_fd_group_get_fd(fgrp), spdk_strerror(-rc)); 2985 spdk_fd_group_set_wrapper(fgrp, NULL, NULL); 2986 free(intr); 2987 return NULL; 2988 } 2989 2990 return intr; 2991 } 2992 2993 void 2994 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2995 { 2996 struct spdk_thread *thread; 2997 struct spdk_interrupt *intr; 2998 2999 intr = *pintr; 3000 if (intr == NULL) { 3001 return; 3002 } 3003 3004 *pintr = NULL; 3005 3006 thread = spdk_get_thread(); 3007 if 
(!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return;
	}

	if (intr->fgrp != NULL) {
		assert(intr->efd < 0);
		spdk_fd_group_unnest(thread->fgrp, intr->fgrp);
		spdk_fd_group_set_wrapper(intr->fgrp, NULL, NULL);
	} else {
		spdk_fd_group_remove(thread->fgrp, intr->efd);
	}

	free(intr);
}

int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return -EINVAL;
	}

	if (intr->efd < 0) {
		assert(false);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}

struct spdk_fd_group *
spdk_thread_get_interrupt_fd_group(struct spdk_thread *thread)
{
	return thread->fgrp;
}

static bool g_interrupt_mode = false;

int
spdk_interrupt_mode_enable(void)
{
	/* This must be called once, prior to initializing the threading library.
	 * g_spdk_msg_mempool will be valid if the thread library has already been
	 * initialized.
	 */
	if (g_spdk_msg_mempool) {
		SPDK_ERRLOG("Failed because the threading library is already initialized.\n");
		return -1;
	}

#ifdef __linux__
	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
	g_interrupt_mode = true;
	return 0;
#else
	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
	g_interrupt_mode = false;
	return -ENOTSUP;
#endif
}

bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}

#define SSPIN_DEBUG_STACK_FRAMES 16

struct sspin_stack {
	void *addrs[SSPIN_DEBUG_STACK_FRAMES];
	uint32_t depth;
};

struct spdk_spinlock_internal {
	struct sspin_stack init_stack;
	struct sspin_stack lock_stack;
	struct sspin_stack unlock_stack;
};

static void
sspin_init_internal(struct spdk_spinlock *sspin)
{
#ifdef DEBUG
	sspin->internal = calloc(1, sizeof(*sspin->internal));
#endif
}

static void
sspin_fini_internal(struct spdk_spinlock *sspin)
{
#ifdef DEBUG
	free(sspin->internal);
	sspin->internal = NULL;
#endif
}

#if defined(DEBUG) && defined(SPDK_HAVE_EXECINFO_H)
#define SSPIN_GET_STACK(sspin, which) \
	do { \
		if (sspin->internal != NULL) { \
			struct sspin_stack *stack = &sspin->internal->which ## _stack; \
			stack->depth = backtrace(stack->addrs, SPDK_COUNTOF(stack->addrs)); \
		} \
	} while (0)
#else
#define SSPIN_GET_STACK(sspin, which) do { } while (0)
#endif

static void
sspin_stack_print(const char *title, const struct sspin_stack *sspin_stack)
{
#ifdef SPDK_HAVE_EXECINFO_H
	char **stack;
	size_t i;

	stack = backtrace_symbols(sspin_stack->addrs, sspin_stack->depth);
	if (stack == NULL) {
		SPDK_ERRLOG("Out of memory while allocating stack for %s\n", title);
		return;
	}
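	/* Print one line per captured frame; backtrace_symbols() returns the
	 * strings in the same order backtrace() stored them, innermost frame
	 * first.
	 */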
SPDK_ERRLOG(" %s:\n", title); 3150 for (i = 0; i < sspin_stack->depth; i++) { 3151 /* 3152 * This does not print line numbers. In gdb, use something like "list *0x444b6b" or 3153 * "list *sspin_stack->addrs[0]". Or more conveniently, load the spdk gdb macros 3154 * and use use "print *sspin" or "print sspin->internal.lock_stack". See 3155 * gdb_macros.md in the docs directory for details. 3156 */ 3157 SPDK_ERRLOG(" #%" PRIu64 ": %s\n", i, stack[i]); 3158 } 3159 free(stack); 3160 #endif /* SPDK_HAVE_EXECINFO_H */ 3161 } 3162 3163 static void 3164 sspin_stacks_print(const struct spdk_spinlock *sspin) 3165 { 3166 if (sspin->internal == NULL) { 3167 return; 3168 } 3169 SPDK_ERRLOG("spinlock %p\n", sspin); 3170 sspin_stack_print("Lock initialized at", &sspin->internal->init_stack); 3171 sspin_stack_print("Last locked at", &sspin->internal->lock_stack); 3172 sspin_stack_print("Last unlocked at", &sspin->internal->unlock_stack); 3173 } 3174 3175 void 3176 spdk_spin_init(struct spdk_spinlock *sspin) 3177 { 3178 int rc; 3179 3180 memset(sspin, 0, sizeof(*sspin)); 3181 rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE); 3182 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3183 sspin_init_internal(sspin); 3184 SSPIN_GET_STACK(sspin, init); 3185 sspin->initialized = true; 3186 } 3187 3188 void 3189 spdk_spin_destroy(struct spdk_spinlock *sspin) 3190 { 3191 int rc; 3192 3193 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3194 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3195 SPIN_ASSERT_LOG_STACKS(sspin->thread == NULL, SPIN_ERR_LOCK_HELD, sspin); 3196 3197 rc = pthread_spin_destroy(&sspin->spinlock); 3198 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3199 3200 sspin_fini_internal(sspin); 3201 sspin->initialized = false; 3202 sspin->destroyed = true; 3203 } 3204 3205 void 3206 spdk_spin_lock(struct spdk_spinlock *sspin) 3207 { 3208 struct spdk_thread *thread = spdk_get_thread(); 3209 int rc; 3210 3211 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3212 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3213 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3214 SPIN_ASSERT_LOG_STACKS(thread != sspin->thread, SPIN_ERR_DEADLOCK, sspin); 3215 3216 rc = pthread_spin_lock(&sspin->spinlock); 3217 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3218 3219 sspin->thread = thread; 3220 sspin->thread->lock_count++; 3221 3222 SSPIN_GET_STACK(sspin, lock); 3223 } 3224 3225 void 3226 spdk_spin_unlock(struct spdk_spinlock *sspin) 3227 { 3228 struct spdk_thread *thread = spdk_get_thread(); 3229 int rc; 3230 3231 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3232 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3233 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3234 SPIN_ASSERT_LOG_STACKS(thread == sspin->thread, SPIN_ERR_WRONG_THREAD, sspin); 3235 3236 SPIN_ASSERT_LOG_STACKS(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT, sspin); 3237 thread->lock_count--; 3238 sspin->thread = NULL; 3239 3240 SSPIN_GET_STACK(sspin, unlock); 3241 3242 rc = pthread_spin_unlock(&sspin->spinlock); 3243 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3244 } 3245 3246 bool 3247 spdk_spin_held(struct spdk_spinlock *sspin) 3248 { 3249 struct spdk_thread *thread = spdk_get_thread(); 3250 3251 SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false); 3252 3253 return 
void
spdk_thread_register_post_poller_handler(spdk_post_poller_fn fn, void *fn_arg)
{
	struct spdk_thread *thr;

	thr = _get_thread();
	assert(thr);
	if (spdk_unlikely(thr->num_pp_handlers == SPDK_THREAD_MAX_POST_POLLER_HANDLERS)) {
		SPDK_ERRLOG("Too many post-poller handlers registered\n");
		return;
	}

	thr->pp_handlers[thr->num_pp_handlers].fn = fn;
	thr->pp_handlers[thr->num_pp_handlers].fn_arg = fn_arg;
	thr->num_pp_handlers++;
}

SPDK_LOG_REGISTER_COMPONENT(thread)
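/*
 * Registration sketch for a post-poller handler, for illustration only
 * (my_pp_handler and its argument are hypothetical). Handlers run on the
 * registering SPDK thread after poller execution, and at most
 * SPDK_THREAD_MAX_POST_POLLER_HANDLERS may be pending at once:
 *
 *	static void my_pp_handler(void *arg) { ... }
 *	...
 *	spdk_thread_register_post_poller_handler(my_pp_handler, my_arg);
 */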