1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2016 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "spdk/env.h" 10 #include "spdk/likely.h" 11 #include "spdk/queue.h" 12 #include "spdk/string.h" 13 #include "spdk/thread.h" 14 #include "spdk/trace.h" 15 #include "spdk/util.h" 16 #include "spdk/fd_group.h" 17 18 #include "spdk/log.h" 19 #include "spdk_internal/thread.h" 20 #include "spdk_internal/usdt.h" 21 #include "thread_internal.h" 22 23 #include "spdk_internal/trace_defs.h" 24 25 #ifdef __linux__ 26 #include <sys/timerfd.h> 27 #include <sys/eventfd.h> 28 #endif 29 30 #ifdef SPDK_HAVE_EXECINFO_H 31 #include <execinfo.h> 32 #endif 33 34 #define SPDK_MSG_BATCH_SIZE 8 35 #define SPDK_MAX_DEVICE_NAME_LEN 256 36 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 37 #define SPDK_MAX_POLLER_NAME_LEN 256 38 #define SPDK_MAX_THREAD_NAME_LEN 256 39 40 static struct spdk_thread *g_app_thread; 41 42 struct spdk_interrupt { 43 int efd; 44 struct spdk_thread *thread; 45 spdk_interrupt_fn fn; 46 void *arg; 47 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 48 }; 49 50 enum spdk_poller_state { 51 /* The poller is registered with a thread but not currently executing its fn. */ 52 SPDK_POLLER_STATE_WAITING, 53 54 /* The poller is currently running its fn. */ 55 SPDK_POLLER_STATE_RUNNING, 56 57 /* The poller was unregistered during the execution of its fn. */ 58 SPDK_POLLER_STATE_UNREGISTERED, 59 60 /* The poller is in the process of being paused. It will be paused 61 * during the next time it's supposed to be executed. 62 */ 63 SPDK_POLLER_STATE_PAUSING, 64 65 /* The poller is registered but currently paused. It's on the 66 * paused_pollers list. 67 */ 68 SPDK_POLLER_STATE_PAUSED, 69 }; 70 71 struct spdk_poller { 72 TAILQ_ENTRY(spdk_poller) tailq; 73 RB_ENTRY(spdk_poller) node; 74 75 /* Current state of the poller; should only be accessed from the poller's thread. */ 76 enum spdk_poller_state state; 77 78 uint64_t period_ticks; 79 uint64_t next_run_tick; 80 uint64_t run_count; 81 uint64_t busy_count; 82 uint64_t id; 83 spdk_poller_fn fn; 84 void *arg; 85 struct spdk_thread *thread; 86 struct spdk_interrupt *intr; 87 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 88 void *set_intr_cb_arg; 89 90 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 91 }; 92 93 enum spdk_thread_state { 94 /* The thread is processing poller and message by spdk_thread_poll(). */ 95 SPDK_THREAD_STATE_RUNNING, 96 97 /* The thread is in the process of termination. It reaps unregistering 98 * poller are releasing I/O channel. 99 */ 100 SPDK_THREAD_STATE_EXITING, 101 102 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 103 SPDK_THREAD_STATE_EXITED, 104 }; 105 106 struct spdk_thread { 107 uint64_t tsc_last; 108 struct spdk_thread_stats stats; 109 /* 110 * Contains pollers actively running on this thread. Pollers 111 * are run round-robin. The thread takes one poller from the head 112 * of the ring, executes it, then puts it back at the tail of 113 * the ring. 114 */ 115 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 116 /** 117 * Contains pollers running on this thread with a periodic timer. 118 */ 119 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 120 struct spdk_poller *first_timed_poller; 121 /* 122 * Contains paused pollers. Pollers on this queue are waiting until 123 * they are resumed (in which case they're put onto the active/timer 124 * queues) or unregistered. 125 */ 126 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 127 struct spdk_ring *messages; 128 int msg_fd; 129 SLIST_HEAD(, spdk_msg) msg_cache; 130 size_t msg_cache_count; 131 spdk_msg_fn critical_msg; 132 uint64_t id; 133 uint64_t next_poller_id; 134 enum spdk_thread_state state; 135 int pending_unregister_count; 136 uint32_t for_each_count; 137 138 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 139 TAILQ_ENTRY(spdk_thread) tailq; 140 141 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 142 struct spdk_cpuset cpumask; 143 uint64_t exit_timeout_tsc; 144 145 int32_t lock_count; 146 147 /* spdk_thread is bound to current CPU core. */ 148 bool is_bound; 149 150 /* Indicates whether this spdk_thread currently runs in interrupt. */ 151 bool in_interrupt; 152 bool poller_unregistered; 153 struct spdk_fd_group *fgrp; 154 155 uint16_t trace_id; 156 157 uint8_t reserved[6]; 158 159 /* User context allocated at the end */ 160 uint8_t ctx[0]; 161 }; 162 163 /* 164 * Assert that spdk_thread struct is 8 byte aligned to ensure 165 * the user ctx is also 8-byte aligned. 166 */ 167 SPDK_STATIC_ASSERT((sizeof(struct spdk_thread)) % 8 == 0, "Incorrect size"); 168 169 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 170 171 static spdk_new_thread_fn g_new_thread_fn = NULL; 172 static spdk_thread_op_fn g_thread_op_fn = NULL; 173 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 174 static size_t g_ctx_sz = 0; 175 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 176 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 177 * SPDK application is required. 178 */ 179 static uint64_t g_thread_id = 1; 180 181 enum spin_error { 182 SPIN_ERR_NONE, 183 /* Trying to use an SPDK lock while not on an SPDK thread */ 184 SPIN_ERR_NOT_SPDK_THREAD, 185 /* Trying to lock a lock already held by this SPDK thread */ 186 SPIN_ERR_DEADLOCK, 187 /* Trying to unlock a lock not held by this SPDK thread */ 188 SPIN_ERR_WRONG_THREAD, 189 /* pthread_spin_*() returned an error */ 190 SPIN_ERR_PTHREAD, 191 /* Trying to destroy a lock that is held */ 192 SPIN_ERR_LOCK_HELD, 193 /* lock_count is invalid */ 194 SPIN_ERR_LOCK_COUNT, 195 /* 196 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to 197 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to 198 * deadlock when another SPDK thread on the same pthread tries to take that lock. 199 */ 200 SPIN_ERR_HOLD_DURING_SWITCH, 201 /* Trying to use a lock that was destroyed (but not re-initialized) */ 202 SPIN_ERR_DESTROYED, 203 /* Trying to use a lock that is not initialized */ 204 SPIN_ERR_NOT_INITIALIZED, 205 206 /* Must be last, not an actual error code */ 207 SPIN_ERR_LAST 208 }; 209 210 static const char *spin_error_strings[] = { 211 [SPIN_ERR_NONE] = "No error", 212 [SPIN_ERR_NOT_SPDK_THREAD] = "Not an SPDK thread", 213 [SPIN_ERR_DEADLOCK] = "Deadlock detected", 214 [SPIN_ERR_WRONG_THREAD] = "Unlock on wrong SPDK thread", 215 [SPIN_ERR_PTHREAD] = "Error from pthread_spinlock", 216 [SPIN_ERR_LOCK_HELD] = "Destroying a held spinlock", 217 [SPIN_ERR_LOCK_COUNT] = "Lock count is invalid", 218 [SPIN_ERR_HOLD_DURING_SWITCH] = "Lock(s) held while SPDK thread going off CPU", 219 [SPIN_ERR_DESTROYED] = "Lock has been destroyed", 220 [SPIN_ERR_NOT_INITIALIZED] = "Lock has not been initialized", 221 }; 222 223 #define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \ 224 ? "Unknown error" : spin_error_strings[err] 225 226 static void 227 __posix_abort(enum spin_error err) 228 { 229 abort(); 230 } 231 232 typedef void (*spin_abort)(enum spin_error err); 233 spin_abort g_spin_abort_fn = __posix_abort; 234 235 #define SPIN_ASSERT_IMPL(cond, err, extra_log, ret) \ 236 do { \ 237 if (spdk_unlikely(!(cond))) { \ 238 SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \ 239 SPIN_ERROR_STRING(err), #cond); \ 240 extra_log; \ 241 g_spin_abort_fn(err); \ 242 ret; \ 243 } \ 244 } while (0) 245 #define SPIN_ASSERT_LOG_STACKS(cond, err, lock) \ 246 SPIN_ASSERT_IMPL(cond, err, sspin_stacks_print(sspin), return) 247 #define SPIN_ASSERT_RETURN(cond, err, ret) SPIN_ASSERT_IMPL(cond, err, , return ret) 248 #define SPIN_ASSERT(cond, err) SPIN_ASSERT_IMPL(cond, err, ,) 249 250 struct io_device { 251 void *io_device; 252 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 253 spdk_io_channel_create_cb create_cb; 254 spdk_io_channel_destroy_cb destroy_cb; 255 spdk_io_device_unregister_cb unregister_cb; 256 struct spdk_thread *unregister_thread; 257 uint32_t ctx_size; 258 uint32_t for_each_count; 259 RB_ENTRY(io_device) node; 260 261 uint32_t refcnt; 262 263 bool pending_unregister; 264 bool unregistered; 265 }; 266 267 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 268 269 static int 270 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 271 { 272 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 273 } 274 275 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 276 277 static int 278 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 279 { 280 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 281 } 282 283 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 284 285 struct spdk_msg { 286 spdk_msg_fn fn; 287 void *arg; 288 289 SLIST_ENTRY(spdk_msg) link; 290 }; 291 292 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 293 294 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 295 static uint32_t g_thread_count = 0; 296 297 static __thread struct spdk_thread *tls_thread = NULL; 298 299 static void 300 thread_trace(void) 301 { 302 struct spdk_trace_tpoint_opts opts[] = { 303 { 304 "THREAD_IOCH_GET", TRACE_THREAD_IOCH_GET, 305 OWNER_TYPE_NONE, OBJECT_NONE, 0, 306 {{ "refcnt", SPDK_TRACE_ARG_TYPE_INT, 4 }} 307 }, 308 { 309 "THREAD_IOCH_PUT", TRACE_THREAD_IOCH_PUT, 310 OWNER_TYPE_NONE, OBJECT_NONE, 0, 311 {{ "refcnt", SPDK_TRACE_ARG_TYPE_INT, 4 }} 312 } 313 }; 314 315 spdk_trace_register_owner_type(OWNER_TYPE_THREAD, 't'); 316 spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts)); 317 } 318 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 319 320 /* 321 * If this compare function returns zero when two next_run_ticks are equal, 322 * the macro RB_INSERT() returns a pointer to the element with the same 323 * next_run_tick. 324 * 325 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 326 * to remove as a parameter. 327 * 328 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 329 * side by returning 1 when two next_run_ticks are equal. 330 */ 331 static inline int 332 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 333 { 334 if (poller1->next_run_tick < poller2->next_run_tick) { 335 return -1; 336 } else { 337 return 1; 338 } 339 } 340 341 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 342 343 static inline struct spdk_thread * 344 _get_thread(void) 345 { 346 return tls_thread; 347 } 348 349 static int 350 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 351 { 352 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 353 354 g_ctx_sz = ctx_sz; 355 356 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 357 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 358 sizeof(struct spdk_msg), 359 0, /* No cache. We do our own. */ 360 SPDK_ENV_NUMA_ID_ANY); 361 362 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 363 msg_mempool_sz); 364 365 if (!g_spdk_msg_mempool) { 366 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 367 return -ENOMEM; 368 } 369 370 return 0; 371 } 372 373 static void thread_interrupt_destroy(struct spdk_thread *thread); 374 static int thread_interrupt_create(struct spdk_thread *thread); 375 376 static void 377 _free_thread(struct spdk_thread *thread) 378 { 379 struct spdk_io_channel *ch; 380 struct spdk_msg *msg; 381 struct spdk_poller *poller, *ptmp; 382 383 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 384 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 385 thread->name, ch->dev->name); 386 } 387 388 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 389 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 390 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 391 poller->name); 392 } 393 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 394 free(poller); 395 } 396 397 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 398 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 399 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 400 poller->name); 401 } 402 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 403 free(poller); 404 } 405 406 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 407 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 408 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 409 free(poller); 410 } 411 412 pthread_mutex_lock(&g_devlist_mutex); 413 assert(g_thread_count > 0); 414 g_thread_count--; 415 TAILQ_REMOVE(&g_threads, thread, tailq); 416 pthread_mutex_unlock(&g_devlist_mutex); 417 418 msg = SLIST_FIRST(&thread->msg_cache); 419 while (msg != NULL) { 420 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 421 422 assert(thread->msg_cache_count > 0); 423 thread->msg_cache_count--; 424 spdk_mempool_put(g_spdk_msg_mempool, msg); 425 426 msg = SLIST_FIRST(&thread->msg_cache); 427 } 428 429 assert(thread->msg_cache_count == 0); 430 431 if (spdk_interrupt_mode_is_enabled()) { 432 thread_interrupt_destroy(thread); 433 } 434 435 spdk_ring_free(thread->messages); 436 free(thread); 437 } 438 439 int 440 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 441 { 442 assert(g_new_thread_fn == NULL); 443 assert(g_thread_op_fn == NULL); 444 445 if (new_thread_fn == NULL) { 446 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 447 } else { 448 g_new_thread_fn = new_thread_fn; 449 } 450 451 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 452 } 453 454 int 455 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 456 spdk_thread_op_supported_fn thread_op_supported_fn, 457 size_t ctx_sz, size_t msg_mempool_sz) 458 { 459 assert(g_new_thread_fn == NULL); 460 assert(g_thread_op_fn == NULL); 461 assert(g_thread_op_supported_fn == NULL); 462 463 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 464 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 465 return -EINVAL; 466 } 467 468 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 469 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 470 } else { 471 g_thread_op_fn = thread_op_fn; 472 g_thread_op_supported_fn = thread_op_supported_fn; 473 } 474 475 return _thread_lib_init(ctx_sz, msg_mempool_sz); 476 } 477 478 void 479 spdk_thread_lib_fini(void) 480 { 481 struct io_device *dev; 482 483 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 484 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 485 } 486 487 g_new_thread_fn = NULL; 488 g_thread_op_fn = NULL; 489 g_thread_op_supported_fn = NULL; 490 g_ctx_sz = 0; 491 if (g_app_thread != NULL) { 492 _free_thread(g_app_thread); 493 g_app_thread = NULL; 494 } 495 496 if (g_spdk_msg_mempool) { 497 spdk_mempool_free(g_spdk_msg_mempool); 498 g_spdk_msg_mempool = NULL; 499 } 500 } 501 502 struct spdk_thread * 503 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask) 504 { 505 struct spdk_thread *thread, *null_thread; 506 size_t size = SPDK_ALIGN_CEIL(sizeof(*thread) + g_ctx_sz, SPDK_CACHE_LINE_SIZE); 507 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 508 int rc = 0, i; 509 510 /* Since this spdk_thread object will be used by another core, ensure that it won't share a 511 * cache line with any other object allocated on this core */ 512 rc = posix_memalign((void **)&thread, SPDK_CACHE_LINE_SIZE, size); 513 if (rc != 0) { 514 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 515 return NULL; 516 } 517 memset(thread, 0, size); 518 519 if (cpumask) { 520 spdk_cpuset_copy(&thread->cpumask, cpumask); 521 } else { 522 spdk_cpuset_negate(&thread->cpumask); 523 } 524 525 RB_INIT(&thread->io_channels); 526 TAILQ_INIT(&thread->active_pollers); 527 RB_INIT(&thread->timed_pollers); 528 TAILQ_INIT(&thread->paused_pollers); 529 SLIST_INIT(&thread->msg_cache); 530 thread->msg_cache_count = 0; 531 532 thread->tsc_last = spdk_get_ticks(); 533 534 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 535 * ID exceeds UINT64_MAX a warning message is logged 536 */ 537 thread->next_poller_id = 1; 538 539 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_NUMA_ID_ANY); 540 if (!thread->messages) { 541 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 542 free(thread); 543 return NULL; 544 } 545 546 /* Fill the local message pool cache. */ 547 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 548 if (rc == 0) { 549 /* If we can't populate the cache it's ok. The cache will get filled 550 * up organically as messages are passed to the thread. */ 551 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 552 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 553 thread->msg_cache_count++; 554 } 555 } 556 557 if (name) { 558 snprintf(thread->name, sizeof(thread->name), "%s", name); 559 } else { 560 snprintf(thread->name, sizeof(thread->name), "%p", thread); 561 } 562 563 thread->trace_id = spdk_trace_register_owner(OWNER_TYPE_THREAD, thread->name); 564 565 pthread_mutex_lock(&g_devlist_mutex); 566 if (g_thread_id == 0) { 567 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 568 pthread_mutex_unlock(&g_devlist_mutex); 569 _free_thread(thread); 570 return NULL; 571 } 572 thread->id = g_thread_id++; 573 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 574 g_thread_count++; 575 pthread_mutex_unlock(&g_devlist_mutex); 576 577 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 578 thread->id, thread->name); 579 580 if (spdk_interrupt_mode_is_enabled()) { 581 thread->in_interrupt = true; 582 rc = thread_interrupt_create(thread); 583 if (rc != 0) { 584 _free_thread(thread); 585 return NULL; 586 } 587 } 588 589 if (g_new_thread_fn) { 590 rc = g_new_thread_fn(thread); 591 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 592 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 593 } 594 595 if (rc != 0) { 596 _free_thread(thread); 597 return NULL; 598 } 599 600 thread->state = SPDK_THREAD_STATE_RUNNING; 601 602 /* If this is the first thread, save it as the app thread. Use an atomic 603 * compare + exchange to guard against crazy users who might try to 604 * call spdk_thread_create() simultaneously on multiple threads. 605 */ 606 null_thread = NULL; 607 __atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false, 608 __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); 609 610 return thread; 611 } 612 613 struct spdk_thread * 614 spdk_thread_get_app_thread(void) 615 { 616 return g_app_thread; 617 } 618 619 bool 620 spdk_thread_is_app_thread(struct spdk_thread *thread) 621 { 622 if (thread == NULL) { 623 thread = _get_thread(); 624 } 625 626 return g_app_thread == thread; 627 } 628 629 void 630 spdk_thread_bind(struct spdk_thread *thread, bool bind) 631 { 632 thread->is_bound = bind; 633 } 634 635 bool 636 spdk_thread_is_bound(struct spdk_thread *thread) 637 { 638 return thread->is_bound; 639 } 640 641 void 642 spdk_set_thread(struct spdk_thread *thread) 643 { 644 tls_thread = thread; 645 } 646 647 static void 648 thread_exit(struct spdk_thread *thread, uint64_t now) 649 { 650 struct spdk_poller *poller; 651 struct spdk_io_channel *ch; 652 653 if (now >= thread->exit_timeout_tsc) { 654 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 655 thread->name); 656 goto exited; 657 } 658 659 if (spdk_ring_count(thread->messages) > 0) { 660 SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name); 661 return; 662 } 663 664 if (thread->for_each_count > 0) { 665 SPDK_INFOLOG(thread, "thread %s is still executing %u for_each_channels/threads\n", 666 thread->name, thread->for_each_count); 667 return; 668 } 669 670 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 671 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 672 SPDK_INFOLOG(thread, 673 "thread %s still has active poller %s\n", 674 thread->name, poller->name); 675 return; 676 } 677 } 678 679 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 680 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 681 SPDK_INFOLOG(thread, 682 "thread %s still has active timed poller %s\n", 683 thread->name, poller->name); 684 return; 685 } 686 } 687 688 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 689 SPDK_INFOLOG(thread, 690 "thread %s still has paused poller %s\n", 691 thread->name, poller->name); 692 return; 693 } 694 695 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 696 SPDK_INFOLOG(thread, 697 "thread %s still has channel for io_device %s\n", 698 thread->name, ch->dev->name); 699 return; 700 } 701 702 if (thread->pending_unregister_count > 0) { 703 SPDK_INFOLOG(thread, 704 "thread %s is still unregistering io_devices\n", 705 thread->name); 706 return; 707 } 708 709 exited: 710 thread->state = SPDK_THREAD_STATE_EXITED; 711 if (spdk_unlikely(thread->in_interrupt)) { 712 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 713 } 714 } 715 716 static void _thread_exit(void *ctx); 717 718 int 719 spdk_thread_exit(struct spdk_thread *thread) 720 { 721 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 722 723 assert(tls_thread == thread); 724 725 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 726 SPDK_INFOLOG(thread, 727 "thread %s is already exiting\n", 728 thread->name); 729 return 0; 730 } 731 732 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 733 SPDK_THREAD_EXIT_TIMEOUT_SEC); 734 thread->state = SPDK_THREAD_STATE_EXITING; 735 736 if (spdk_interrupt_mode_is_enabled()) { 737 spdk_thread_send_msg(thread, _thread_exit, thread); 738 } 739 740 return 0; 741 } 742 743 bool 744 spdk_thread_is_running(struct spdk_thread *thread) 745 { 746 return thread->state == SPDK_THREAD_STATE_RUNNING; 747 } 748 749 bool 750 spdk_thread_is_exited(struct spdk_thread *thread) 751 { 752 return thread->state == SPDK_THREAD_STATE_EXITED; 753 } 754 755 void 756 spdk_thread_destroy(struct spdk_thread *thread) 757 { 758 assert(thread != NULL); 759 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 760 761 assert(thread->state == SPDK_THREAD_STATE_EXITED); 762 763 if (tls_thread == thread) { 764 tls_thread = NULL; 765 } 766 767 /* To be safe, do not free the app thread until spdk_thread_lib_fini(). */ 768 if (thread != g_app_thread) { 769 _free_thread(thread); 770 } 771 } 772 773 void * 774 spdk_thread_get_ctx(struct spdk_thread *thread) 775 { 776 if (g_ctx_sz > 0) { 777 return thread->ctx; 778 } 779 780 return NULL; 781 } 782 783 struct spdk_cpuset * 784 spdk_thread_get_cpumask(struct spdk_thread *thread) 785 { 786 return &thread->cpumask; 787 } 788 789 int 790 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 791 { 792 struct spdk_thread *thread; 793 794 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 795 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 796 assert(false); 797 return -ENOTSUP; 798 } 799 800 thread = spdk_get_thread(); 801 if (!thread) { 802 SPDK_ERRLOG("Called from non-SPDK thread\n"); 803 assert(false); 804 return -EINVAL; 805 } 806 807 spdk_cpuset_copy(&thread->cpumask, cpumask); 808 809 /* Invoke framework's reschedule operation. If this function is called multiple times 810 * in a single spdk_thread_poll() context, the last cpumask will be used in the 811 * reschedule operation. 812 */ 813 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 814 815 return 0; 816 } 817 818 struct spdk_thread * 819 spdk_thread_get_from_ctx(void *ctx) 820 { 821 if (ctx == NULL) { 822 assert(false); 823 return NULL; 824 } 825 826 assert(g_ctx_sz > 0); 827 828 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 829 } 830 831 static inline uint32_t 832 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 833 { 834 unsigned count, i; 835 void *messages[SPDK_MSG_BATCH_SIZE]; 836 uint64_t notify = 1; 837 int rc; 838 839 #ifdef DEBUG 840 /* 841 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 842 * so we will never actually read uninitialized data from events, but just to be sure 843 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 844 */ 845 memset(messages, 0, sizeof(messages)); 846 #endif 847 848 if (max_msgs > 0) { 849 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 850 } else { 851 max_msgs = SPDK_MSG_BATCH_SIZE; 852 } 853 854 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 855 if (spdk_unlikely(thread->in_interrupt) && 856 spdk_ring_count(thread->messages) != 0) { 857 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 858 if (rc < 0) { 859 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 860 } 861 } 862 if (count == 0) { 863 return 0; 864 } 865 866 for (i = 0; i < count; i++) { 867 struct spdk_msg *msg = messages[i]; 868 869 assert(msg != NULL); 870 871 SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg); 872 873 msg->fn(msg->arg); 874 875 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 876 877 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 878 /* Insert the messages at the head. We want to re-use the hot 879 * ones. */ 880 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 881 thread->msg_cache_count++; 882 } else { 883 spdk_mempool_put(g_spdk_msg_mempool, msg); 884 } 885 } 886 887 return count; 888 } 889 890 static void 891 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 892 { 893 struct spdk_poller *tmp __attribute__((unused)); 894 895 poller->next_run_tick = now + poller->period_ticks; 896 897 /* 898 * Insert poller in the thread's timed_pollers tree by next scheduled run time 899 * as its key. 900 */ 901 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 902 assert(tmp == NULL); 903 904 /* Update the cache only if it is empty or the inserted poller is earlier than it. 905 * RB_MIN() is not necessary here because all pollers, which has exactly the same 906 * next_run_tick as the existing poller, are inserted on the right side. 907 */ 908 if (thread->first_timed_poller == NULL || 909 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 910 thread->first_timed_poller = poller; 911 } 912 } 913 914 static inline void 915 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 916 { 917 struct spdk_poller *tmp __attribute__((unused)); 918 919 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 920 assert(tmp != NULL); 921 922 /* This function is not used in any case that is performance critical. 923 * Update the cache simply by RB_MIN() if it needs to be changed. 924 */ 925 if (thread->first_timed_poller == poller) { 926 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 927 } 928 } 929 930 static void 931 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 932 { 933 if (poller->period_ticks) { 934 poller_insert_timer(thread, poller, spdk_get_ticks()); 935 } else { 936 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 937 } 938 } 939 940 static inline void 941 thread_update_stats(struct spdk_thread *thread, uint64_t end, 942 uint64_t start, int rc) 943 { 944 if (rc == 0) { 945 /* Poller status idle */ 946 thread->stats.idle_tsc += end - start; 947 } else if (rc > 0) { 948 /* Poller status busy */ 949 thread->stats.busy_tsc += end - start; 950 } 951 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 952 thread->tsc_last = end; 953 } 954 955 static inline int 956 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 957 { 958 int rc; 959 960 switch (poller->state) { 961 case SPDK_POLLER_STATE_UNREGISTERED: 962 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 963 free(poller); 964 return 0; 965 case SPDK_POLLER_STATE_PAUSING: 966 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 967 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 968 poller->state = SPDK_POLLER_STATE_PAUSED; 969 return 0; 970 case SPDK_POLLER_STATE_WAITING: 971 break; 972 default: 973 assert(false); 974 break; 975 } 976 977 poller->state = SPDK_POLLER_STATE_RUNNING; 978 rc = poller->fn(poller->arg); 979 980 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 981 982 poller->run_count++; 983 if (rc > 0) { 984 poller->busy_count++; 985 } 986 987 #ifdef DEBUG 988 if (rc == -1) { 989 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 990 } 991 #endif 992 993 switch (poller->state) { 994 case SPDK_POLLER_STATE_UNREGISTERED: 995 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 996 free(poller); 997 break; 998 case SPDK_POLLER_STATE_PAUSING: 999 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1000 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1001 poller->state = SPDK_POLLER_STATE_PAUSED; 1002 break; 1003 case SPDK_POLLER_STATE_PAUSED: 1004 case SPDK_POLLER_STATE_WAITING: 1005 break; 1006 case SPDK_POLLER_STATE_RUNNING: 1007 poller->state = SPDK_POLLER_STATE_WAITING; 1008 break; 1009 default: 1010 assert(false); 1011 break; 1012 } 1013 1014 return rc; 1015 } 1016 1017 static inline int 1018 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 1019 uint64_t now) 1020 { 1021 int rc; 1022 1023 switch (poller->state) { 1024 case SPDK_POLLER_STATE_UNREGISTERED: 1025 free(poller); 1026 return 0; 1027 case SPDK_POLLER_STATE_PAUSING: 1028 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1029 poller->state = SPDK_POLLER_STATE_PAUSED; 1030 return 0; 1031 case SPDK_POLLER_STATE_WAITING: 1032 break; 1033 default: 1034 assert(false); 1035 break; 1036 } 1037 1038 poller->state = SPDK_POLLER_STATE_RUNNING; 1039 rc = poller->fn(poller->arg); 1040 1041 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 1042 1043 poller->run_count++; 1044 if (rc > 0) { 1045 poller->busy_count++; 1046 } 1047 1048 #ifdef DEBUG 1049 if (rc == -1) { 1050 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 1051 } 1052 #endif 1053 1054 switch (poller->state) { 1055 case SPDK_POLLER_STATE_UNREGISTERED: 1056 free(poller); 1057 break; 1058 case SPDK_POLLER_STATE_PAUSING: 1059 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1060 poller->state = SPDK_POLLER_STATE_PAUSED; 1061 break; 1062 case SPDK_POLLER_STATE_PAUSED: 1063 break; 1064 case SPDK_POLLER_STATE_RUNNING: 1065 poller->state = SPDK_POLLER_STATE_WAITING; 1066 /* fallthrough */ 1067 case SPDK_POLLER_STATE_WAITING: 1068 poller_insert_timer(thread, poller, now); 1069 break; 1070 default: 1071 assert(false); 1072 break; 1073 } 1074 1075 return rc; 1076 } 1077 1078 static int 1079 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1080 { 1081 uint32_t msg_count; 1082 struct spdk_poller *poller, *tmp; 1083 spdk_msg_fn critical_msg; 1084 int rc = 0; 1085 1086 thread->tsc_last = now; 1087 1088 critical_msg = thread->critical_msg; 1089 if (spdk_unlikely(critical_msg != NULL)) { 1090 critical_msg(NULL); 1091 thread->critical_msg = NULL; 1092 rc = 1; 1093 } 1094 1095 msg_count = msg_queue_run_batch(thread, max_msgs); 1096 if (msg_count) { 1097 rc = 1; 1098 } 1099 1100 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1101 active_pollers_head, tailq, tmp) { 1102 int poller_rc; 1103 1104 poller_rc = thread_execute_poller(thread, poller); 1105 if (poller_rc > rc) { 1106 rc = poller_rc; 1107 } 1108 } 1109 1110 poller = thread->first_timed_poller; 1111 while (poller != NULL) { 1112 int timer_rc = 0; 1113 1114 if (now < poller->next_run_tick) { 1115 break; 1116 } 1117 1118 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 1119 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 1120 1121 /* Update the cache to the next timed poller in the list 1122 * only if the current poller is still the closest, otherwise, 1123 * do nothing because the cache has been already updated. 1124 */ 1125 if (thread->first_timed_poller == poller) { 1126 thread->first_timed_poller = tmp; 1127 } 1128 1129 timer_rc = thread_execute_timed_poller(thread, poller, now); 1130 if (timer_rc > rc) { 1131 rc = timer_rc; 1132 } 1133 1134 poller = tmp; 1135 } 1136 1137 return rc; 1138 } 1139 1140 static void 1141 _thread_remove_pollers(void *ctx) 1142 { 1143 struct spdk_thread *thread = ctx; 1144 struct spdk_poller *poller, *tmp; 1145 1146 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1147 active_pollers_head, tailq, tmp) { 1148 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1149 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1150 free(poller); 1151 } 1152 } 1153 1154 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1155 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1156 poller_remove_timer(thread, poller); 1157 free(poller); 1158 } 1159 } 1160 1161 thread->poller_unregistered = false; 1162 } 1163 1164 static void 1165 _thread_exit(void *ctx) 1166 { 1167 struct spdk_thread *thread = ctx; 1168 1169 assert(thread->state == SPDK_THREAD_STATE_EXITING); 1170 1171 thread_exit(thread, spdk_get_ticks()); 1172 1173 if (thread->state != SPDK_THREAD_STATE_EXITED) { 1174 spdk_thread_send_msg(thread, _thread_exit, thread); 1175 } 1176 } 1177 1178 int 1179 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1180 { 1181 struct spdk_thread *orig_thread; 1182 int rc; 1183 1184 orig_thread = _get_thread(); 1185 tls_thread = thread; 1186 1187 if (now == 0) { 1188 now = spdk_get_ticks(); 1189 } 1190 1191 if (spdk_likely(!thread->in_interrupt)) { 1192 rc = thread_poll(thread, max_msgs, now); 1193 if (spdk_unlikely(thread->in_interrupt)) { 1194 /* The thread transitioned to interrupt mode during the above poll. 1195 * Poll it one more time in case that during the transition time 1196 * there is msg received without notification. 1197 */ 1198 rc = thread_poll(thread, max_msgs, now); 1199 } 1200 1201 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1202 thread_exit(thread, now); 1203 } 1204 } else { 1205 /* Non-block wait on thread's fd_group */ 1206 rc = spdk_fd_group_wait(thread->fgrp, 0); 1207 } 1208 1209 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1210 1211 tls_thread = orig_thread; 1212 1213 return rc; 1214 } 1215 1216 uint64_t 1217 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1218 { 1219 struct spdk_poller *poller; 1220 1221 poller = thread->first_timed_poller; 1222 if (poller) { 1223 return poller->next_run_tick; 1224 } 1225 1226 return 0; 1227 } 1228 1229 int 1230 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1231 { 1232 return !TAILQ_EMPTY(&thread->active_pollers); 1233 } 1234 1235 static bool 1236 thread_has_unpaused_pollers(struct spdk_thread *thread) 1237 { 1238 if (TAILQ_EMPTY(&thread->active_pollers) && 1239 RB_EMPTY(&thread->timed_pollers)) { 1240 return false; 1241 } 1242 1243 return true; 1244 } 1245 1246 bool 1247 spdk_thread_has_pollers(struct spdk_thread *thread) 1248 { 1249 if (!thread_has_unpaused_pollers(thread) && 1250 TAILQ_EMPTY(&thread->paused_pollers)) { 1251 return false; 1252 } 1253 1254 return true; 1255 } 1256 1257 bool 1258 spdk_thread_is_idle(struct spdk_thread *thread) 1259 { 1260 if (spdk_ring_count(thread->messages) || 1261 thread_has_unpaused_pollers(thread) || 1262 thread->critical_msg != NULL) { 1263 return false; 1264 } 1265 1266 return true; 1267 } 1268 1269 uint32_t 1270 spdk_thread_get_count(void) 1271 { 1272 /* 1273 * Return cached value of the current thread count. We could acquire the 1274 * lock and iterate through the TAILQ of threads to count them, but that 1275 * count could still be invalidated after we release the lock. 1276 */ 1277 return g_thread_count; 1278 } 1279 1280 struct spdk_thread * 1281 spdk_get_thread(void) 1282 { 1283 return _get_thread(); 1284 } 1285 1286 const char * 1287 spdk_thread_get_name(const struct spdk_thread *thread) 1288 { 1289 return thread->name; 1290 } 1291 1292 uint64_t 1293 spdk_thread_get_id(const struct spdk_thread *thread) 1294 { 1295 return thread->id; 1296 } 1297 1298 struct spdk_thread * 1299 spdk_thread_get_by_id(uint64_t id) 1300 { 1301 struct spdk_thread *thread; 1302 1303 if (id == 0 || id >= g_thread_id) { 1304 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1305 return NULL; 1306 } 1307 pthread_mutex_lock(&g_devlist_mutex); 1308 TAILQ_FOREACH(thread, &g_threads, tailq) { 1309 if (thread->id == id) { 1310 break; 1311 } 1312 } 1313 pthread_mutex_unlock(&g_devlist_mutex); 1314 return thread; 1315 } 1316 1317 int 1318 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1319 { 1320 struct spdk_thread *thread; 1321 1322 thread = _get_thread(); 1323 if (!thread) { 1324 SPDK_ERRLOG("No thread allocated\n"); 1325 return -EINVAL; 1326 } 1327 1328 if (stats == NULL) { 1329 return -EINVAL; 1330 } 1331 1332 *stats = thread->stats; 1333 1334 return 0; 1335 } 1336 1337 uint64_t 1338 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1339 { 1340 if (thread == NULL) { 1341 thread = _get_thread(); 1342 } 1343 1344 return thread->tsc_last; 1345 } 1346 1347 static inline int 1348 thread_send_msg_notification(const struct spdk_thread *target_thread) 1349 { 1350 uint64_t notify = 1; 1351 int rc; 1352 1353 /* Not necessary to do notification if interrupt facility is not enabled */ 1354 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1355 return 0; 1356 } 1357 1358 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1359 * after sending thread msg, it is necessary to check whether target thread runs in 1360 * interrupt mode and then decide whether do event notification. 1361 */ 1362 if (spdk_unlikely(target_thread->in_interrupt)) { 1363 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1364 if (rc < 0) { 1365 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1366 return -EIO; 1367 } 1368 } 1369 1370 return 0; 1371 } 1372 1373 int 1374 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1375 { 1376 struct spdk_thread *local_thread; 1377 struct spdk_msg *msg; 1378 int rc; 1379 1380 assert(thread != NULL); 1381 1382 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1383 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1384 return -EIO; 1385 } 1386 1387 local_thread = _get_thread(); 1388 1389 msg = NULL; 1390 if (local_thread != NULL) { 1391 if (local_thread->msg_cache_count > 0) { 1392 msg = SLIST_FIRST(&local_thread->msg_cache); 1393 assert(msg != NULL); 1394 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1395 local_thread->msg_cache_count--; 1396 } 1397 } 1398 1399 if (msg == NULL) { 1400 msg = spdk_mempool_get(g_spdk_msg_mempool); 1401 if (!msg) { 1402 SPDK_ERRLOG("msg could not be allocated\n"); 1403 return -ENOMEM; 1404 } 1405 } 1406 1407 msg->fn = fn; 1408 msg->arg = ctx; 1409 1410 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1411 if (rc != 1) { 1412 SPDK_ERRLOG("msg could not be enqueued\n"); 1413 spdk_mempool_put(g_spdk_msg_mempool, msg); 1414 return -EIO; 1415 } 1416 1417 return thread_send_msg_notification(thread); 1418 } 1419 1420 int 1421 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1422 { 1423 spdk_msg_fn expected = NULL; 1424 1425 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1426 __ATOMIC_SEQ_CST)) { 1427 return -EIO; 1428 } 1429 1430 return thread_send_msg_notification(thread); 1431 } 1432 1433 #ifdef __linux__ 1434 static int 1435 interrupt_timerfd_process(void *arg) 1436 { 1437 struct spdk_poller *poller = arg; 1438 uint64_t exp; 1439 int rc; 1440 1441 /* clear the level of interval timer */ 1442 rc = read(poller->intr->efd, &exp, sizeof(exp)); 1443 if (rc < 0) { 1444 if (rc == -EAGAIN) { 1445 return 0; 1446 } 1447 1448 return rc; 1449 } 1450 1451 SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg); 1452 1453 return poller->fn(poller->arg); 1454 } 1455 1456 static int 1457 period_poller_interrupt_init(struct spdk_poller *poller) 1458 { 1459 int timerfd; 1460 1461 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1462 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1463 if (timerfd < 0) { 1464 return -errno; 1465 } 1466 1467 poller->intr = spdk_interrupt_register(timerfd, interrupt_timerfd_process, poller, poller->name); 1468 if (poller->intr == NULL) { 1469 close(timerfd); 1470 return -1; 1471 } 1472 1473 return 0; 1474 } 1475 1476 static void 1477 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1478 { 1479 int timerfd; 1480 uint64_t now_tick = spdk_get_ticks(); 1481 uint64_t ticks = spdk_get_ticks_hz(); 1482 int ret; 1483 struct itimerspec new_tv = {}; 1484 struct itimerspec old_tv = {}; 1485 1486 assert(poller->intr != NULL); 1487 assert(poller->period_ticks != 0); 1488 1489 timerfd = poller->intr->efd; 1490 1491 assert(timerfd >= 0); 1492 1493 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1494 interrupt_mode ? "interrupt" : "poll"); 1495 1496 if (interrupt_mode) { 1497 /* Set repeated timer expiration */ 1498 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1499 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1500 1501 /* Update next timer expiration */ 1502 if (poller->next_run_tick == 0) { 1503 poller->next_run_tick = now_tick + poller->period_ticks; 1504 } else if (poller->next_run_tick < now_tick) { 1505 poller->next_run_tick = now_tick; 1506 } 1507 1508 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1509 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1510 1511 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1512 if (ret < 0) { 1513 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1514 assert(false); 1515 } 1516 } else { 1517 /* Disarm the timer */ 1518 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1519 if (ret < 0) { 1520 /* timerfd_settime's failure indicates that the timerfd is in error */ 1521 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1522 assert(false); 1523 } 1524 1525 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1526 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1527 */ 1528 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1529 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1530 poller_remove_timer(poller->thread, poller); 1531 poller_insert_timer(poller->thread, poller, now_tick); 1532 } 1533 } 1534 1535 static void 1536 poller_interrupt_fini(struct spdk_poller *poller) 1537 { 1538 int fd; 1539 1540 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1541 assert(poller->intr != NULL); 1542 fd = poller->intr->efd; 1543 spdk_interrupt_unregister(&poller->intr); 1544 close(fd); 1545 } 1546 1547 static int 1548 busy_poller_interrupt_init(struct spdk_poller *poller) 1549 { 1550 int busy_efd; 1551 1552 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1553 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1554 if (busy_efd < 0) { 1555 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1556 return -errno; 1557 } 1558 1559 poller->intr = spdk_interrupt_register(busy_efd, poller->fn, poller->arg, poller->name); 1560 if (poller->intr == NULL) { 1561 close(busy_efd); 1562 return -1; 1563 } 1564 1565 return 0; 1566 } 1567 1568 static void 1569 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1570 { 1571 int busy_efd = poller->intr->efd; 1572 uint64_t notify = 1; 1573 int rc __attribute__((unused)); 1574 1575 assert(busy_efd >= 0); 1576 1577 if (interrupt_mode) { 1578 /* Write without read on eventfd will get it repeatedly triggered. */ 1579 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1580 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1581 } 1582 } else { 1583 /* Read on eventfd will clear its level triggering. */ 1584 rc = read(busy_efd, ¬ify, sizeof(notify)); 1585 } 1586 } 1587 1588 #else 1589 1590 static int 1591 period_poller_interrupt_init(struct spdk_poller *poller) 1592 { 1593 return -ENOTSUP; 1594 } 1595 1596 static void 1597 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1598 { 1599 } 1600 1601 static void 1602 poller_interrupt_fini(struct spdk_poller *poller) 1603 { 1604 } 1605 1606 static int 1607 busy_poller_interrupt_init(struct spdk_poller *poller) 1608 { 1609 return -ENOTSUP; 1610 } 1611 1612 static void 1613 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1614 { 1615 } 1616 1617 #endif 1618 1619 void 1620 spdk_poller_register_interrupt(struct spdk_poller *poller, 1621 spdk_poller_set_interrupt_mode_cb cb_fn, 1622 void *cb_arg) 1623 { 1624 assert(poller != NULL); 1625 assert(spdk_get_thread() == poller->thread); 1626 1627 if (!spdk_interrupt_mode_is_enabled()) { 1628 return; 1629 } 1630 1631 /* If this poller already had an interrupt, clean the old one up. */ 1632 if (poller->intr != NULL) { 1633 poller_interrupt_fini(poller); 1634 } 1635 1636 poller->set_intr_cb_fn = cb_fn; 1637 poller->set_intr_cb_arg = cb_arg; 1638 1639 /* Set poller into interrupt mode if thread is in interrupt. */ 1640 if (poller->thread->in_interrupt && poller->set_intr_cb_fn) { 1641 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1642 } 1643 } 1644 1645 static uint64_t 1646 convert_us_to_ticks(uint64_t us) 1647 { 1648 uint64_t quotient, remainder, ticks; 1649 1650 if (us) { 1651 quotient = us / SPDK_SEC_TO_USEC; 1652 remainder = us % SPDK_SEC_TO_USEC; 1653 ticks = spdk_get_ticks_hz(); 1654 1655 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1656 } else { 1657 return 0; 1658 } 1659 } 1660 1661 static struct spdk_poller * 1662 poller_register(spdk_poller_fn fn, 1663 void *arg, 1664 uint64_t period_microseconds, 1665 const char *name) 1666 { 1667 struct spdk_thread *thread; 1668 struct spdk_poller *poller; 1669 1670 thread = spdk_get_thread(); 1671 if (!thread) { 1672 assert(false); 1673 return NULL; 1674 } 1675 1676 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1677 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1678 return NULL; 1679 } 1680 1681 poller = calloc(1, sizeof(*poller)); 1682 if (poller == NULL) { 1683 SPDK_ERRLOG("Poller memory allocation failed\n"); 1684 return NULL; 1685 } 1686 1687 if (name) { 1688 snprintf(poller->name, sizeof(poller->name), "%s", name); 1689 } else { 1690 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1691 } 1692 1693 poller->state = SPDK_POLLER_STATE_WAITING; 1694 poller->fn = fn; 1695 poller->arg = arg; 1696 poller->thread = thread; 1697 poller->intr = NULL; 1698 if (thread->next_poller_id == 0) { 1699 SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n"); 1700 thread->next_poller_id = 1; 1701 } 1702 poller->id = thread->next_poller_id++; 1703 1704 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1705 1706 if (spdk_interrupt_mode_is_enabled()) { 1707 int rc; 1708 1709 if (period_microseconds) { 1710 rc = period_poller_interrupt_init(poller); 1711 if (rc < 0) { 1712 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1713 free(poller); 1714 return NULL; 1715 } 1716 1717 poller->set_intr_cb_fn = period_poller_set_interrupt_mode; 1718 poller->set_intr_cb_arg = NULL; 1719 1720 } else { 1721 /* If the poller doesn't have a period, create interruptfd that's always 1722 * busy automatically when running in interrupt mode. 1723 */ 1724 rc = busy_poller_interrupt_init(poller); 1725 if (rc > 0) { 1726 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1727 free(poller); 1728 return NULL; 1729 } 1730 1731 poller->set_intr_cb_fn = busy_poller_set_interrupt_mode; 1732 poller->set_intr_cb_arg = NULL; 1733 } 1734 1735 /* Set poller into interrupt mode if thread is in interrupt. */ 1736 if (poller->thread->in_interrupt) { 1737 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1738 } 1739 } 1740 1741 thread_insert_poller(thread, poller); 1742 1743 return poller; 1744 } 1745 1746 struct spdk_poller * 1747 spdk_poller_register(spdk_poller_fn fn, 1748 void *arg, 1749 uint64_t period_microseconds) 1750 { 1751 return poller_register(fn, arg, period_microseconds, NULL); 1752 } 1753 1754 struct spdk_poller * 1755 spdk_poller_register_named(spdk_poller_fn fn, 1756 void *arg, 1757 uint64_t period_microseconds, 1758 const char *name) 1759 { 1760 return poller_register(fn, arg, period_microseconds, name); 1761 } 1762 1763 static void 1764 wrong_thread(const char *func, const char *name, struct spdk_thread *thread, 1765 struct spdk_thread *curthread) 1766 { 1767 if (thread == NULL) { 1768 SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name); 1769 abort(); 1770 } 1771 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be " 1772 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id, 1773 thread->name, thread->id); 1774 assert(false); 1775 } 1776 1777 void 1778 spdk_poller_unregister(struct spdk_poller **ppoller) 1779 { 1780 struct spdk_thread *thread; 1781 struct spdk_poller *poller; 1782 1783 poller = *ppoller; 1784 if (poller == NULL) { 1785 return; 1786 } 1787 1788 *ppoller = NULL; 1789 1790 thread = spdk_get_thread(); 1791 if (!thread) { 1792 assert(false); 1793 return; 1794 } 1795 1796 if (poller->thread != thread) { 1797 wrong_thread(__func__, poller->name, poller->thread, thread); 1798 return; 1799 } 1800 1801 if (spdk_interrupt_mode_is_enabled()) { 1802 /* Release the interrupt resource for period or busy poller */ 1803 if (poller->intr != NULL) { 1804 poller_interrupt_fini(poller); 1805 } 1806 1807 /* If there is not already a pending poller removal, generate 1808 * a message to go process removals. */ 1809 if (!thread->poller_unregistered) { 1810 thread->poller_unregistered = true; 1811 spdk_thread_send_msg(thread, _thread_remove_pollers, thread); 1812 } 1813 } 1814 1815 /* If the poller was paused, put it on the active_pollers list so that 1816 * its unregistration can be processed by spdk_thread_poll(). 1817 */ 1818 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1819 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1820 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1821 poller->period_ticks = 0; 1822 } 1823 1824 /* Simply set the state to unregistered. The poller will get cleaned up 1825 * in a subsequent call to spdk_thread_poll(). 1826 */ 1827 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1828 } 1829 1830 void 1831 spdk_poller_pause(struct spdk_poller *poller) 1832 { 1833 struct spdk_thread *thread; 1834 1835 thread = spdk_get_thread(); 1836 if (!thread) { 1837 assert(false); 1838 return; 1839 } 1840 1841 if (poller->thread != thread) { 1842 wrong_thread(__func__, poller->name, poller->thread, thread); 1843 return; 1844 } 1845 1846 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1847 * spdk_thread_poll() move it. It allows a poller to be paused from 1848 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1849 * iteration, or from within itself without breaking the logic to always 1850 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1851 */ 1852 switch (poller->state) { 1853 case SPDK_POLLER_STATE_PAUSED: 1854 case SPDK_POLLER_STATE_PAUSING: 1855 break; 1856 case SPDK_POLLER_STATE_RUNNING: 1857 case SPDK_POLLER_STATE_WAITING: 1858 poller->state = SPDK_POLLER_STATE_PAUSING; 1859 break; 1860 default: 1861 assert(false); 1862 break; 1863 } 1864 } 1865 1866 void 1867 spdk_poller_resume(struct spdk_poller *poller) 1868 { 1869 struct spdk_thread *thread; 1870 1871 thread = spdk_get_thread(); 1872 if (!thread) { 1873 assert(false); 1874 return; 1875 } 1876 1877 if (poller->thread != thread) { 1878 wrong_thread(__func__, poller->name, poller->thread, thread); 1879 return; 1880 } 1881 1882 /* If a poller is paused it has to be removed from the paused pollers 1883 * list and put on the active list or timer tree depending on its 1884 * period_ticks. If a poller is still in the process of being paused, 1885 * we just need to flip its state back to waiting, as it's already on 1886 * the appropriate list or tree. 1887 */ 1888 switch (poller->state) { 1889 case SPDK_POLLER_STATE_PAUSED: 1890 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1891 thread_insert_poller(thread, poller); 1892 /* fallthrough */ 1893 case SPDK_POLLER_STATE_PAUSING: 1894 poller->state = SPDK_POLLER_STATE_WAITING; 1895 break; 1896 case SPDK_POLLER_STATE_RUNNING: 1897 case SPDK_POLLER_STATE_WAITING: 1898 break; 1899 default: 1900 assert(false); 1901 break; 1902 } 1903 } 1904 1905 const char * 1906 spdk_poller_get_name(struct spdk_poller *poller) 1907 { 1908 return poller->name; 1909 } 1910 1911 uint64_t 1912 spdk_poller_get_id(struct spdk_poller *poller) 1913 { 1914 return poller->id; 1915 } 1916 1917 const char * 1918 spdk_poller_get_state_str(struct spdk_poller *poller) 1919 { 1920 switch (poller->state) { 1921 case SPDK_POLLER_STATE_WAITING: 1922 return "waiting"; 1923 case SPDK_POLLER_STATE_RUNNING: 1924 return "running"; 1925 case SPDK_POLLER_STATE_UNREGISTERED: 1926 return "unregistered"; 1927 case SPDK_POLLER_STATE_PAUSING: 1928 return "pausing"; 1929 case SPDK_POLLER_STATE_PAUSED: 1930 return "paused"; 1931 default: 1932 return NULL; 1933 } 1934 } 1935 1936 uint64_t 1937 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1938 { 1939 return poller->period_ticks; 1940 } 1941 1942 void 1943 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1944 { 1945 stats->run_count = poller->run_count; 1946 stats->busy_count = poller->busy_count; 1947 } 1948 1949 struct spdk_poller * 1950 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1951 { 1952 return TAILQ_FIRST(&thread->active_pollers); 1953 } 1954 1955 struct spdk_poller * 1956 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1957 { 1958 return TAILQ_NEXT(prev, tailq); 1959 } 1960 1961 struct spdk_poller * 1962 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1963 { 1964 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1965 } 1966 1967 struct spdk_poller * 1968 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1969 { 1970 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1971 } 1972 1973 struct spdk_poller * 1974 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1975 { 1976 return TAILQ_FIRST(&thread->paused_pollers); 1977 } 1978 1979 struct spdk_poller * 1980 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1981 { 1982 return TAILQ_NEXT(prev, tailq); 1983 } 1984 1985 struct spdk_io_channel * 1986 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1987 { 1988 return RB_MIN(io_channel_tree, &thread->io_channels); 1989 } 1990 1991 struct spdk_io_channel * 1992 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1993 { 1994 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1995 } 1996 1997 uint16_t 1998 spdk_thread_get_trace_id(struct spdk_thread *thread) 1999 { 2000 return thread->trace_id; 2001 } 2002 2003 struct call_thread { 2004 struct spdk_thread *cur_thread; 2005 spdk_msg_fn fn; 2006 void *ctx; 2007 2008 struct spdk_thread *orig_thread; 2009 spdk_msg_fn cpl; 2010 }; 2011 2012 static void 2013 _back_to_orig_thread(void *ctx) 2014 { 2015 struct call_thread *ct = ctx; 2016 2017 assert(ct->orig_thread->for_each_count > 0); 2018 ct->orig_thread->for_each_count--; 2019 2020 ct->cpl(ct->ctx); 2021 free(ctx); 2022 } 2023 2024 static void 2025 _on_thread(void *ctx) 2026 { 2027 struct call_thread *ct = ctx; 2028 int rc __attribute__((unused)); 2029 2030 ct->fn(ct->ctx); 2031 2032 pthread_mutex_lock(&g_devlist_mutex); 2033 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 2034 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 2035 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 2036 ct->cur_thread->name); 2037 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 2038 } 2039 pthread_mutex_unlock(&g_devlist_mutex); 2040 2041 if (!ct->cur_thread) { 2042 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 2043 2044 rc = spdk_thread_send_msg(ct->orig_thread, _back_to_orig_thread, ctx); 2045 } else { 2046 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 2047 ct->cur_thread->name); 2048 2049 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 2050 } 2051 assert(rc == 0); 2052 } 2053 2054 void 2055 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 2056 { 2057 struct call_thread *ct; 2058 struct spdk_thread *thread; 2059 int rc __attribute__((unused)); 2060 2061 ct = calloc(1, sizeof(*ct)); 2062 if (!ct) { 2063 SPDK_ERRLOG("Unable to perform thread iteration\n"); 2064 cpl(ctx); 2065 return; 2066 } 2067 2068 ct->fn = fn; 2069 ct->ctx = ctx; 2070 ct->cpl = cpl; 2071 2072 thread = _get_thread(); 2073 if (!thread) { 2074 SPDK_ERRLOG("No thread allocated\n"); 2075 free(ct); 2076 cpl(ctx); 2077 return; 2078 } 2079 ct->orig_thread = thread; 2080 2081 ct->orig_thread->for_each_count++; 2082 2083 pthread_mutex_lock(&g_devlist_mutex); 2084 ct->cur_thread = TAILQ_FIRST(&g_threads); 2085 pthread_mutex_unlock(&g_devlist_mutex); 2086 2087 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 2088 ct->orig_thread->name); 2089 2090 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 2091 assert(rc == 0); 2092 } 2093 2094 static inline void 2095 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 2096 { 2097 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 2098 return; 2099 } 2100 2101 if (poller->set_intr_cb_fn) { 2102 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 2103 } 2104 } 2105 2106 void 2107 spdk_thread_set_interrupt_mode(bool enable_interrupt) 2108 { 2109 struct spdk_thread *thread = _get_thread(); 2110 struct spdk_poller *poller, *tmp; 2111 2112 assert(thread); 2113 assert(spdk_interrupt_mode_is_enabled()); 2114 2115 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 2116 thread->name, enable_interrupt ? "intr" : "poll", 2117 thread->in_interrupt ? "intr" : "poll"); 2118 2119 if (thread->in_interrupt == enable_interrupt) { 2120 return; 2121 } 2122 2123 /* Set pollers to expected mode */ 2124 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 2125 poller_set_interrupt_mode(poller, enable_interrupt); 2126 } 2127 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 2128 poller_set_interrupt_mode(poller, enable_interrupt); 2129 } 2130 /* All paused pollers will go to work in interrupt mode */ 2131 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 2132 poller_set_interrupt_mode(poller, enable_interrupt); 2133 } 2134 2135 thread->in_interrupt = enable_interrupt; 2136 return; 2137 } 2138 2139 static struct io_device * 2140 io_device_get(void *io_device) 2141 { 2142 struct io_device find = {}; 2143 2144 find.io_device = io_device; 2145 return RB_FIND(io_device_tree, &g_io_devices, &find); 2146 } 2147 2148 void 2149 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 2150 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 2151 const char *name) 2152 { 2153 struct io_device *dev, *tmp; 2154 struct spdk_thread *thread; 2155 2156 assert(io_device != NULL); 2157 assert(create_cb != NULL); 2158 assert(destroy_cb != NULL); 2159 2160 thread = spdk_get_thread(); 2161 if (!thread) { 2162 SPDK_ERRLOG("called from non-SPDK thread\n"); 2163 assert(false); 2164 return; 2165 } 2166 2167 dev = calloc(1, sizeof(struct io_device)); 2168 if (dev == NULL) { 2169 SPDK_ERRLOG("could not allocate io_device\n"); 2170 return; 2171 } 2172 2173 dev->io_device = io_device; 2174 if (name) { 2175 snprintf(dev->name, sizeof(dev->name), "%s", name); 2176 } else { 2177 snprintf(dev->name, sizeof(dev->name), "%p", dev); 2178 } 2179 dev->create_cb = create_cb; 2180 dev->destroy_cb = destroy_cb; 2181 dev->unregister_cb = NULL; 2182 dev->ctx_size = ctx_size; 2183 dev->for_each_count = 0; 2184 dev->unregistered = false; 2185 dev->refcnt = 0; 2186 2187 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 2188 dev->name, dev->io_device, thread->name); 2189 2190 pthread_mutex_lock(&g_devlist_mutex); 2191 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 2192 if (tmp != NULL) { 2193 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 2194 io_device, tmp->name, dev->name); 2195 free(dev); 2196 } 2197 2198 pthread_mutex_unlock(&g_devlist_mutex); 2199 } 2200 2201 static void 2202 _finish_unregister(void *arg) 2203 { 2204 struct io_device *dev = arg; 2205 struct spdk_thread *thread; 2206 2207 thread = spdk_get_thread(); 2208 assert(thread == dev->unregister_thread); 2209 2210 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2211 dev->name, dev->io_device, thread->name); 2212 2213 assert(thread->pending_unregister_count > 0); 2214 thread->pending_unregister_count--; 2215 2216 dev->unregister_cb(dev->io_device); 2217 free(dev); 2218 } 2219 2220 static void 2221 io_device_free(struct io_device *dev) 2222 { 2223 int rc __attribute__((unused)); 2224 2225 if (dev->unregister_cb == NULL) { 2226 free(dev); 2227 } else { 2228 assert(dev->unregister_thread != NULL); 2229 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2230 dev->name, dev->io_device, dev->unregister_thread->name); 2231 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2232 assert(rc == 0); 2233 } 2234 } 2235 2236 void 2237 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2238 { 2239 struct io_device *dev; 2240 uint32_t refcnt; 2241 struct spdk_thread *thread; 2242 2243 thread = spdk_get_thread(); 2244 if (!thread) { 2245 SPDK_ERRLOG("called from non-SPDK thread\n"); 2246 assert(false); 2247 return; 2248 } 2249 2250 pthread_mutex_lock(&g_devlist_mutex); 2251 dev = io_device_get(io_device); 2252 if (!dev) { 2253 SPDK_ERRLOG("io_device %p not found\n", io_device); 2254 assert(false); 2255 pthread_mutex_unlock(&g_devlist_mutex); 2256 return; 2257 } 2258 2259 /* The for_each_count check differentiates the user attempting to unregister the 2260 * device a second time, from the internal call to this function that occurs 2261 * after the for_each_count reaches 0. 2262 */ 2263 if (dev->pending_unregister && dev->for_each_count > 0) { 2264 SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device); 2265 assert(false); 2266 pthread_mutex_unlock(&g_devlist_mutex); 2267 return; 2268 } 2269 2270 dev->unregister_cb = unregister_cb; 2271 dev->unregister_thread = thread; 2272 2273 if (dev->for_each_count > 0) { 2274 SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2275 dev->name, io_device, dev->for_each_count); 2276 dev->pending_unregister = true; 2277 pthread_mutex_unlock(&g_devlist_mutex); 2278 return; 2279 } 2280 2281 dev->unregistered = true; 2282 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2283 refcnt = dev->refcnt; 2284 pthread_mutex_unlock(&g_devlist_mutex); 2285 2286 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2287 dev->name, dev->io_device, thread->name); 2288 2289 if (unregister_cb) { 2290 thread->pending_unregister_count++; 2291 } 2292 2293 if (refcnt > 0) { 2294 /* defer deletion */ 2295 return; 2296 } 2297 2298 io_device_free(dev); 2299 } 2300 2301 const char * 2302 spdk_io_device_get_name(struct io_device *dev) 2303 { 2304 return dev->name; 2305 } 2306 2307 static struct spdk_io_channel * 2308 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2309 { 2310 struct spdk_io_channel find = {}; 2311 2312 find.dev = dev; 2313 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2314 } 2315 2316 struct spdk_io_channel * 2317 spdk_get_io_channel(void *io_device) 2318 { 2319 struct spdk_io_channel *ch; 2320 struct spdk_thread *thread; 2321 struct io_device *dev; 2322 int rc; 2323 2324 pthread_mutex_lock(&g_devlist_mutex); 2325 dev = io_device_get(io_device); 2326 if (dev == NULL) { 2327 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2328 pthread_mutex_unlock(&g_devlist_mutex); 2329 return NULL; 2330 } 2331 2332 thread = _get_thread(); 2333 if (!thread) { 2334 SPDK_ERRLOG("No thread allocated\n"); 2335 pthread_mutex_unlock(&g_devlist_mutex); 2336 return NULL; 2337 } 2338 2339 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2340 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2341 pthread_mutex_unlock(&g_devlist_mutex); 2342 return NULL; 2343 } 2344 2345 ch = thread_get_io_channel(thread, dev); 2346 if (ch != NULL) { 2347 ch->ref++; 2348 2349 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2350 ch, dev->name, dev->io_device, thread->name, ch->ref); 2351 2352 /* 2353 * An I/O channel already exists for this device on this 2354 * thread, so return it. 2355 */ 2356 pthread_mutex_unlock(&g_devlist_mutex); 2357 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2358 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2359 return ch; 2360 } 2361 2362 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2363 if (ch == NULL) { 2364 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2365 pthread_mutex_unlock(&g_devlist_mutex); 2366 return NULL; 2367 } 2368 2369 ch->dev = dev; 2370 ch->destroy_cb = dev->destroy_cb; 2371 ch->thread = thread; 2372 ch->ref = 1; 2373 ch->destroy_ref = 0; 2374 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2375 2376 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2377 ch, dev->name, dev->io_device, thread->name, ch->ref); 2378 2379 dev->refcnt++; 2380 2381 pthread_mutex_unlock(&g_devlist_mutex); 2382 2383 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2384 if (rc != 0) { 2385 pthread_mutex_lock(&g_devlist_mutex); 2386 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2387 dev->refcnt--; 2388 free(ch); 2389 SPDK_ERRLOG("could not create io_channel for io_device %s (%p): %s (rc=%d)\n", 2390 dev->name, io_device, spdk_strerror(-rc), rc); 2391 pthread_mutex_unlock(&g_devlist_mutex); 2392 return NULL; 2393 } 2394 2395 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2396 return ch; 2397 } 2398 2399 static void 2400 put_io_channel(void *arg) 2401 { 2402 struct spdk_io_channel *ch = arg; 2403 bool do_remove_dev = true; 2404 struct spdk_thread *thread; 2405 2406 thread = spdk_get_thread(); 2407 if (!thread) { 2408 SPDK_ERRLOG("called from non-SPDK thread\n"); 2409 assert(false); 2410 return; 2411 } 2412 2413 SPDK_DEBUGLOG(thread, 2414 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2415 ch, ch->dev->name, ch->dev->io_device, thread->name); 2416 2417 assert(ch->thread == thread); 2418 2419 ch->destroy_ref--; 2420 2421 if (ch->ref > 0 || ch->destroy_ref > 0) { 2422 /* 2423 * Another reference to the associated io_device was requested 2424 * after this message was sent but before it had a chance to 2425 * execute. 2426 */ 2427 return; 2428 } 2429 2430 pthread_mutex_lock(&g_devlist_mutex); 2431 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2432 pthread_mutex_unlock(&g_devlist_mutex); 2433 2434 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2435 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2436 2437 pthread_mutex_lock(&g_devlist_mutex); 2438 ch->dev->refcnt--; 2439 2440 if (!ch->dev->unregistered) { 2441 do_remove_dev = false; 2442 } 2443 2444 if (ch->dev->refcnt > 0) { 2445 do_remove_dev = false; 2446 } 2447 2448 pthread_mutex_unlock(&g_devlist_mutex); 2449 2450 if (do_remove_dev) { 2451 io_device_free(ch->dev); 2452 } 2453 free(ch); 2454 } 2455 2456 void 2457 spdk_put_io_channel(struct spdk_io_channel *ch) 2458 { 2459 struct spdk_thread *thread; 2460 int rc __attribute__((unused)); 2461 2462 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2463 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2464 2465 thread = spdk_get_thread(); 2466 if (!thread) { 2467 SPDK_ERRLOG("called from non-SPDK thread\n"); 2468 assert(false); 2469 return; 2470 } 2471 2472 if (ch->thread != thread) { 2473 wrong_thread(__func__, "ch", ch->thread, thread); 2474 return; 2475 } 2476 2477 SPDK_DEBUGLOG(thread, 2478 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2479 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2480 2481 ch->ref--; 2482 2483 if (ch->ref == 0) { 2484 ch->destroy_ref++; 2485 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2486 assert(rc == 0); 2487 } 2488 } 2489 2490 struct spdk_io_channel * 2491 spdk_io_channel_from_ctx(void *ctx) 2492 { 2493 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2494 } 2495 2496 struct spdk_thread * 2497 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2498 { 2499 return ch->thread; 2500 } 2501 2502 void * 2503 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2504 { 2505 return ch->dev->io_device; 2506 } 2507 2508 const char * 2509 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2510 { 2511 return spdk_io_device_get_name(ch->dev); 2512 } 2513 2514 int 2515 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2516 { 2517 return ch->ref; 2518 } 2519 2520 struct spdk_io_channel_iter { 2521 void *io_device; 2522 struct io_device *dev; 2523 spdk_channel_msg fn; 2524 int status; 2525 void *ctx; 2526 struct spdk_io_channel *ch; 2527 2528 struct spdk_thread *cur_thread; 2529 2530 struct spdk_thread *orig_thread; 2531 spdk_channel_for_each_cpl cpl; 2532 }; 2533 2534 void * 2535 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2536 { 2537 return i->io_device; 2538 } 2539 2540 struct spdk_io_channel * 2541 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2542 { 2543 return i->ch; 2544 } 2545 2546 void * 2547 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2548 { 2549 return i->ctx; 2550 } 2551 2552 static void 2553 _call_completion(void *ctx) 2554 { 2555 struct spdk_io_channel_iter *i = ctx; 2556 2557 assert(i->orig_thread->for_each_count > 0); 2558 i->orig_thread->for_each_count--; 2559 2560 if (i->cpl != NULL) { 2561 i->cpl(i, i->status); 2562 } 2563 free(i); 2564 } 2565 2566 static void 2567 _call_channel(void *ctx) 2568 { 2569 struct spdk_io_channel_iter *i = ctx; 2570 struct spdk_io_channel *ch; 2571 2572 /* 2573 * It is possible that the channel was deleted before this 2574 * message had a chance to execute. If so, skip calling 2575 * the fn() on this thread. 2576 */ 2577 pthread_mutex_lock(&g_devlist_mutex); 2578 ch = thread_get_io_channel(i->cur_thread, i->dev); 2579 pthread_mutex_unlock(&g_devlist_mutex); 2580 2581 if (ch) { 2582 i->fn(i); 2583 } else { 2584 spdk_for_each_channel_continue(i, 0); 2585 } 2586 } 2587 2588 void 2589 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2590 spdk_channel_for_each_cpl cpl) 2591 { 2592 struct spdk_thread *thread; 2593 struct spdk_io_channel *ch; 2594 struct spdk_io_channel_iter *i; 2595 int rc __attribute__((unused)); 2596 2597 i = calloc(1, sizeof(*i)); 2598 if (!i) { 2599 SPDK_ERRLOG("Unable to allocate iterator\n"); 2600 assert(false); 2601 return; 2602 } 2603 2604 i->io_device = io_device; 2605 i->fn = fn; 2606 i->ctx = ctx; 2607 i->cpl = cpl; 2608 i->orig_thread = _get_thread(); 2609 2610 i->orig_thread->for_each_count++; 2611 2612 pthread_mutex_lock(&g_devlist_mutex); 2613 i->dev = io_device_get(io_device); 2614 if (i->dev == NULL) { 2615 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2616 assert(false); 2617 i->status = -ENODEV; 2618 goto end; 2619 } 2620 2621 /* Do not allow new for_each operations if we are already waiting to unregister 2622 * the device for other for_each operations to complete. 2623 */ 2624 if (i->dev->pending_unregister) { 2625 SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device); 2626 i->status = -ENODEV; 2627 goto end; 2628 } 2629 2630 TAILQ_FOREACH(thread, &g_threads, tailq) { 2631 ch = thread_get_io_channel(thread, i->dev); 2632 if (ch != NULL) { 2633 ch->dev->for_each_count++; 2634 i->cur_thread = thread; 2635 i->ch = ch; 2636 pthread_mutex_unlock(&g_devlist_mutex); 2637 rc = spdk_thread_send_msg(thread, _call_channel, i); 2638 assert(rc == 0); 2639 return; 2640 } 2641 } 2642 2643 end: 2644 pthread_mutex_unlock(&g_devlist_mutex); 2645 2646 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2647 assert(rc == 0); 2648 } 2649 2650 static void 2651 __pending_unregister(void *arg) 2652 { 2653 struct io_device *dev = arg; 2654 2655 assert(dev->pending_unregister); 2656 assert(dev->for_each_count == 0); 2657 spdk_io_device_unregister(dev->io_device, dev->unregister_cb); 2658 } 2659 2660 void 2661 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2662 { 2663 struct spdk_thread *thread; 2664 struct spdk_io_channel *ch; 2665 struct io_device *dev; 2666 int rc __attribute__((unused)); 2667 2668 assert(i->cur_thread == spdk_get_thread()); 2669 2670 i->status = status; 2671 2672 pthread_mutex_lock(&g_devlist_mutex); 2673 dev = i->dev; 2674 if (status) { 2675 goto end; 2676 } 2677 2678 thread = TAILQ_NEXT(i->cur_thread, tailq); 2679 while (thread) { 2680 ch = thread_get_io_channel(thread, dev); 2681 if (ch != NULL) { 2682 i->cur_thread = thread; 2683 i->ch = ch; 2684 pthread_mutex_unlock(&g_devlist_mutex); 2685 rc = spdk_thread_send_msg(thread, _call_channel, i); 2686 assert(rc == 0); 2687 return; 2688 } 2689 thread = TAILQ_NEXT(thread, tailq); 2690 } 2691 2692 end: 2693 dev->for_each_count--; 2694 i->ch = NULL; 2695 pthread_mutex_unlock(&g_devlist_mutex); 2696 2697 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2698 assert(rc == 0); 2699 2700 pthread_mutex_lock(&g_devlist_mutex); 2701 if (dev->pending_unregister && dev->for_each_count == 0) { 2702 rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev); 2703 assert(rc == 0); 2704 } 2705 pthread_mutex_unlock(&g_devlist_mutex); 2706 } 2707 2708 static void 2709 thread_interrupt_destroy(struct spdk_thread *thread) 2710 { 2711 struct spdk_fd_group *fgrp = thread->fgrp; 2712 2713 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2714 2715 if (thread->msg_fd < 0) { 2716 return; 2717 } 2718 2719 spdk_fd_group_remove(fgrp, thread->msg_fd); 2720 close(thread->msg_fd); 2721 thread->msg_fd = -1; 2722 2723 spdk_fd_group_destroy(fgrp); 2724 thread->fgrp = NULL; 2725 } 2726 2727 #ifdef __linux__ 2728 static int 2729 thread_interrupt_msg_process(void *arg) 2730 { 2731 struct spdk_thread *thread = arg; 2732 struct spdk_thread *orig_thread; 2733 uint32_t msg_count; 2734 spdk_msg_fn critical_msg; 2735 int rc = 0; 2736 uint64_t notify = 1; 2737 2738 assert(spdk_interrupt_mode_is_enabled()); 2739 2740 orig_thread = spdk_get_thread(); 2741 spdk_set_thread(thread); 2742 2743 /* There may be race between msg_acknowledge and another producer's msg_notify, 2744 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2745 * This can avoid msg notification missing. 2746 */ 2747 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2748 if (rc < 0 && errno != EAGAIN) { 2749 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2750 } 2751 2752 critical_msg = thread->critical_msg; 2753 if (spdk_unlikely(critical_msg != NULL)) { 2754 critical_msg(NULL); 2755 thread->critical_msg = NULL; 2756 rc = 1; 2757 } 2758 2759 msg_count = msg_queue_run_batch(thread, 0); 2760 if (msg_count) { 2761 rc = 1; 2762 } 2763 2764 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2765 if (spdk_unlikely(!thread->in_interrupt)) { 2766 /* The thread transitioned to poll mode in a msg during the above processing. 2767 * Clear msg_fd since thread messages will be polled directly in poll mode. 2768 */ 2769 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2770 if (rc < 0 && errno != EAGAIN) { 2771 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 2772 } 2773 } 2774 2775 spdk_set_thread(orig_thread); 2776 return rc; 2777 } 2778 2779 static int 2780 thread_interrupt_create(struct spdk_thread *thread) 2781 { 2782 int rc; 2783 2784 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2785 2786 rc = spdk_fd_group_create(&thread->fgrp); 2787 if (rc) { 2788 return rc; 2789 } 2790 2791 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2792 if (thread->msg_fd < 0) { 2793 rc = -errno; 2794 spdk_fd_group_destroy(thread->fgrp); 2795 thread->fgrp = NULL; 2796 2797 return rc; 2798 } 2799 2800 return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd, 2801 thread_interrupt_msg_process, thread); 2802 } 2803 #else 2804 static int 2805 thread_interrupt_create(struct spdk_thread *thread) 2806 { 2807 return -ENOTSUP; 2808 } 2809 #endif 2810 2811 static int 2812 _interrupt_wrapper(void *ctx) 2813 { 2814 struct spdk_interrupt *intr = ctx; 2815 struct spdk_thread *orig_thread, *thread; 2816 int rc; 2817 2818 orig_thread = spdk_get_thread(); 2819 thread = intr->thread; 2820 2821 spdk_set_thread(thread); 2822 2823 SPDK_DTRACE_PROBE4(interrupt_fd_process, intr->name, intr->efd, 2824 intr->fn, intr->arg); 2825 2826 rc = intr->fn(intr->arg); 2827 2828 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2829 2830 spdk_set_thread(orig_thread); 2831 2832 return rc; 2833 } 2834 2835 struct spdk_interrupt * 2836 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2837 void *arg, const char *name) 2838 { 2839 return spdk_interrupt_register_for_events(efd, SPDK_INTERRUPT_EVENT_IN, fn, arg, name); 2840 } 2841 2842 struct spdk_interrupt * 2843 spdk_interrupt_register_for_events(int efd, uint32_t events, spdk_interrupt_fn fn, void *arg, 2844 const char *name) 2845 { 2846 struct spdk_thread *thread; 2847 struct spdk_interrupt *intr; 2848 int ret; 2849 2850 thread = spdk_get_thread(); 2851 if (!thread) { 2852 assert(false); 2853 return NULL; 2854 } 2855 2856 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2857 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2858 return NULL; 2859 } 2860 2861 intr = calloc(1, sizeof(*intr)); 2862 if (intr == NULL) { 2863 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2864 return NULL; 2865 } 2866 2867 if (name) { 2868 snprintf(intr->name, sizeof(intr->name), "%s", name); 2869 } else { 2870 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2871 } 2872 2873 intr->efd = efd; 2874 intr->thread = thread; 2875 intr->fn = fn; 2876 intr->arg = arg; 2877 2878 ret = spdk_fd_group_add_for_events(thread->fgrp, efd, events, _interrupt_wrapper, intr, intr->name); 2879 2880 if (ret != 0) { 2881 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2882 thread->name, efd, spdk_strerror(-ret)); 2883 free(intr); 2884 return NULL; 2885 } 2886 2887 return intr; 2888 } 2889 2890 void 2891 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2892 { 2893 struct spdk_thread *thread; 2894 struct spdk_interrupt *intr; 2895 2896 intr = *pintr; 2897 if (intr == NULL) { 2898 return; 2899 } 2900 2901 *pintr = NULL; 2902 2903 thread = spdk_get_thread(); 2904 if (!thread) { 2905 assert(false); 2906 return; 2907 } 2908 2909 if (intr->thread != thread) { 2910 wrong_thread(__func__, intr->name, intr->thread, thread); 2911 return; 2912 } 2913 2914 spdk_fd_group_remove(thread->fgrp, intr->efd); 2915 free(intr); 2916 } 2917 2918 int 2919 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2920 enum spdk_interrupt_event_types event_types) 2921 { 2922 struct spdk_thread *thread; 2923 2924 thread = spdk_get_thread(); 2925 if (!thread) { 2926 assert(false); 2927 return -EINVAL; 2928 } 2929 2930 if (intr->thread != thread) { 2931 wrong_thread(__func__, intr->name, intr->thread, thread); 2932 return -EINVAL; 2933 } 2934 2935 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2936 } 2937 2938 int 2939 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2940 { 2941 return spdk_fd_group_get_fd(thread->fgrp); 2942 } 2943 2944 struct spdk_fd_group * 2945 spdk_thread_get_interrupt_fd_group(struct spdk_thread *thread) 2946 { 2947 return thread->fgrp; 2948 } 2949 2950 static bool g_interrupt_mode = false; 2951 2952 int 2953 spdk_interrupt_mode_enable(void) 2954 { 2955 /* It must be called once prior to initializing the threading library. 2956 * g_spdk_msg_mempool will be valid if thread library is initialized. 2957 */ 2958 if (g_spdk_msg_mempool) { 2959 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 2960 return -1; 2961 } 2962 2963 #ifdef __linux__ 2964 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2965 g_interrupt_mode = true; 2966 return 0; 2967 #else 2968 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2969 g_interrupt_mode = false; 2970 return -ENOTSUP; 2971 #endif 2972 } 2973 2974 bool 2975 spdk_interrupt_mode_is_enabled(void) 2976 { 2977 return g_interrupt_mode; 2978 } 2979 2980 #define SSPIN_DEBUG_STACK_FRAMES 16 2981 2982 struct sspin_stack { 2983 void *addrs[SSPIN_DEBUG_STACK_FRAMES]; 2984 uint32_t depth; 2985 }; 2986 2987 struct spdk_spinlock_internal { 2988 struct sspin_stack init_stack; 2989 struct sspin_stack lock_stack; 2990 struct sspin_stack unlock_stack; 2991 }; 2992 2993 static void 2994 sspin_init_internal(struct spdk_spinlock *sspin) 2995 { 2996 #ifdef DEBUG 2997 sspin->internal = calloc(1, sizeof(*sspin->internal)); 2998 #endif 2999 } 3000 3001 static void 3002 sspin_fini_internal(struct spdk_spinlock *sspin) 3003 { 3004 #ifdef DEBUG 3005 free(sspin->internal); 3006 sspin->internal = NULL; 3007 #endif 3008 } 3009 3010 #if defined(DEBUG) && defined(SPDK_HAVE_EXECINFO_H) 3011 #define SSPIN_GET_STACK(sspin, which) \ 3012 do { \ 3013 if (sspin->internal != NULL) { \ 3014 struct sspin_stack *stack = &sspin->internal->which ## _stack; \ 3015 stack->depth = backtrace(stack->addrs, SPDK_COUNTOF(stack->addrs)); \ 3016 } \ 3017 } while (0) 3018 #else 3019 #define SSPIN_GET_STACK(sspin, which) do { } while (0) 3020 #endif 3021 3022 static void 3023 sspin_stack_print(const char *title, const struct sspin_stack *sspin_stack) 3024 { 3025 #ifdef SPDK_HAVE_EXECINFO_H 3026 char **stack; 3027 size_t i; 3028 3029 stack = backtrace_symbols(sspin_stack->addrs, sspin_stack->depth); 3030 if (stack == NULL) { 3031 SPDK_ERRLOG("Out of memory while allocate stack for %s\n", title); 3032 return; 3033 } 3034 SPDK_ERRLOG(" %s:\n", title); 3035 for (i = 0; i < sspin_stack->depth; i++) { 3036 /* 3037 * This does not print line numbers. In gdb, use something like "list *0x444b6b" or 3038 * "list *sspin_stack->addrs[0]". Or more conveniently, load the spdk gdb macros 3039 * and use use "print *sspin" or "print sspin->internal.lock_stack". See 3040 * gdb_macros.md in the docs directory for details. 3041 */ 3042 SPDK_ERRLOG(" #%" PRIu64 ": %s\n", i, stack[i]); 3043 } 3044 free(stack); 3045 #endif /* SPDK_HAVE_EXECINFO_H */ 3046 } 3047 3048 static void 3049 sspin_stacks_print(const struct spdk_spinlock *sspin) 3050 { 3051 if (sspin->internal == NULL) { 3052 return; 3053 } 3054 SPDK_ERRLOG("spinlock %p\n", sspin); 3055 sspin_stack_print("Lock initialized at", &sspin->internal->init_stack); 3056 sspin_stack_print("Last locked at", &sspin->internal->lock_stack); 3057 sspin_stack_print("Last unlocked at", &sspin->internal->unlock_stack); 3058 } 3059 3060 void 3061 spdk_spin_init(struct spdk_spinlock *sspin) 3062 { 3063 int rc; 3064 3065 memset(sspin, 0, sizeof(*sspin)); 3066 rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE); 3067 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3068 sspin_init_internal(sspin); 3069 SSPIN_GET_STACK(sspin, init); 3070 sspin->initialized = true; 3071 } 3072 3073 void 3074 spdk_spin_destroy(struct spdk_spinlock *sspin) 3075 { 3076 int rc; 3077 3078 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3079 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3080 SPIN_ASSERT_LOG_STACKS(sspin->thread == NULL, SPIN_ERR_LOCK_HELD, sspin); 3081 3082 rc = pthread_spin_destroy(&sspin->spinlock); 3083 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3084 3085 sspin_fini_internal(sspin); 3086 sspin->initialized = false; 3087 sspin->destroyed = true; 3088 } 3089 3090 void 3091 spdk_spin_lock(struct spdk_spinlock *sspin) 3092 { 3093 struct spdk_thread *thread = spdk_get_thread(); 3094 int rc; 3095 3096 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3097 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3098 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3099 SPIN_ASSERT_LOG_STACKS(thread != sspin->thread, SPIN_ERR_DEADLOCK, sspin); 3100 3101 rc = pthread_spin_lock(&sspin->spinlock); 3102 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3103 3104 sspin->thread = thread; 3105 sspin->thread->lock_count++; 3106 3107 SSPIN_GET_STACK(sspin, lock); 3108 } 3109 3110 void 3111 spdk_spin_unlock(struct spdk_spinlock *sspin) 3112 { 3113 struct spdk_thread *thread = spdk_get_thread(); 3114 int rc; 3115 3116 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3117 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3118 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3119 SPIN_ASSERT_LOG_STACKS(thread == sspin->thread, SPIN_ERR_WRONG_THREAD, sspin); 3120 3121 SPIN_ASSERT_LOG_STACKS(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT, sspin); 3122 thread->lock_count--; 3123 sspin->thread = NULL; 3124 3125 SSPIN_GET_STACK(sspin, unlock); 3126 3127 rc = pthread_spin_unlock(&sspin->spinlock); 3128 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3129 } 3130 3131 bool 3132 spdk_spin_held(struct spdk_spinlock *sspin) 3133 { 3134 struct spdk_thread *thread = spdk_get_thread(); 3135 3136 SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false); 3137 3138 return sspin->thread == thread; 3139 } 3140 3141 SPDK_LOG_REGISTER_COMPONENT(thread) 3142