1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2016 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "spdk/env.h" 10 #include "spdk/likely.h" 11 #include "spdk/queue.h" 12 #include "spdk/string.h" 13 #include "spdk/thread.h" 14 #include "spdk/trace.h" 15 #include "spdk/util.h" 16 #include "spdk/fd_group.h" 17 18 #include "spdk/log.h" 19 #include "spdk_internal/thread.h" 20 #include "spdk_internal/usdt.h" 21 #include "thread_internal.h" 22 23 #include "spdk_internal/trace_defs.h" 24 25 #ifdef __linux__ 26 #include <sys/timerfd.h> 27 #include <sys/eventfd.h> 28 #endif 29 30 #ifdef SPDK_HAVE_EXECINFO_H 31 #include <execinfo.h> 32 #endif 33 34 #define SPDK_MSG_BATCH_SIZE 8 35 #define SPDK_MAX_DEVICE_NAME_LEN 256 36 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 37 #define SPDK_MAX_POLLER_NAME_LEN 256 38 #define SPDK_MAX_THREAD_NAME_LEN 256 39 40 static struct spdk_thread *g_app_thread; 41 42 struct spdk_interrupt { 43 int efd; 44 struct spdk_thread *thread; 45 spdk_interrupt_fn fn; 46 void *arg; 47 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 48 }; 49 50 enum spdk_poller_state { 51 /* The poller is registered with a thread but not currently executing its fn. */ 52 SPDK_POLLER_STATE_WAITING, 53 54 /* The poller is currently running its fn. */ 55 SPDK_POLLER_STATE_RUNNING, 56 57 /* The poller was unregistered during the execution of its fn. */ 58 SPDK_POLLER_STATE_UNREGISTERED, 59 60 /* The poller is in the process of being paused. It will be paused 61 * during the next time it's supposed to be executed. 62 */ 63 SPDK_POLLER_STATE_PAUSING, 64 65 /* The poller is registered but currently paused. It's on the 66 * paused_pollers list. 67 */ 68 SPDK_POLLER_STATE_PAUSED, 69 }; 70 71 struct spdk_poller { 72 TAILQ_ENTRY(spdk_poller) tailq; 73 RB_ENTRY(spdk_poller) node; 74 75 /* Current state of the poller; should only be accessed from the poller's thread. */ 76 enum spdk_poller_state state; 77 78 uint64_t period_ticks; 79 uint64_t next_run_tick; 80 uint64_t run_count; 81 uint64_t busy_count; 82 uint64_t id; 83 spdk_poller_fn fn; 84 void *arg; 85 struct spdk_thread *thread; 86 struct spdk_interrupt *intr; 87 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 88 void *set_intr_cb_arg; 89 90 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 91 }; 92 93 enum spdk_thread_state { 94 /* The thread is processing poller and message by spdk_thread_poll(). */ 95 SPDK_THREAD_STATE_RUNNING, 96 97 /* The thread is in the process of termination. It reaps unregistering 98 * poller are releasing I/O channel. 99 */ 100 SPDK_THREAD_STATE_EXITING, 101 102 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 103 SPDK_THREAD_STATE_EXITED, 104 }; 105 106 struct spdk_thread { 107 uint64_t tsc_last; 108 struct spdk_thread_stats stats; 109 /* 110 * Contains pollers actively running on this thread. Pollers 111 * are run round-robin. The thread takes one poller from the head 112 * of the ring, executes it, then puts it back at the tail of 113 * the ring. 114 */ 115 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 116 /** 117 * Contains pollers running on this thread with a periodic timer. 118 */ 119 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 120 struct spdk_poller *first_timed_poller; 121 /* 122 * Contains paused pollers. Pollers on this queue are waiting until 123 * they are resumed (in which case they're put onto the active/timer 124 * queues) or unregistered. 125 */ 126 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 127 struct spdk_ring *messages; 128 int msg_fd; 129 SLIST_HEAD(, spdk_msg) msg_cache; 130 size_t msg_cache_count; 131 spdk_msg_fn critical_msg; 132 uint64_t id; 133 uint64_t next_poller_id; 134 enum spdk_thread_state state; 135 int pending_unregister_count; 136 uint32_t for_each_count; 137 138 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 139 TAILQ_ENTRY(spdk_thread) tailq; 140 141 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 142 struct spdk_cpuset cpumask; 143 uint64_t exit_timeout_tsc; 144 145 int32_t lock_count; 146 147 /* spdk_thread is bound to current CPU core. */ 148 bool is_bound; 149 150 /* Indicates whether this spdk_thread currently runs in interrupt. */ 151 bool in_interrupt; 152 bool poller_unregistered; 153 struct spdk_fd_group *fgrp; 154 155 /* User context allocated at the end */ 156 uint8_t ctx[0]; 157 }; 158 159 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 160 161 static spdk_new_thread_fn g_new_thread_fn = NULL; 162 static spdk_thread_op_fn g_thread_op_fn = NULL; 163 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 164 static size_t g_ctx_sz = 0; 165 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 166 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 167 * SPDK application is required. 168 */ 169 static uint64_t g_thread_id = 1; 170 171 enum spin_error { 172 SPIN_ERR_NONE, 173 /* Trying to use an SPDK lock while not on an SPDK thread */ 174 SPIN_ERR_NOT_SPDK_THREAD, 175 /* Trying to lock a lock already held by this SPDK thread */ 176 SPIN_ERR_DEADLOCK, 177 /* Trying to unlock a lock not held by this SPDK thread */ 178 SPIN_ERR_WRONG_THREAD, 179 /* pthread_spin_*() returned an error */ 180 SPIN_ERR_PTHREAD, 181 /* Trying to destroy a lock that is held */ 182 SPIN_ERR_LOCK_HELD, 183 /* lock_count is invalid */ 184 SPIN_ERR_LOCK_COUNT, 185 /* 186 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to 187 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to 188 * deadlock when another SPDK thread on the same pthread tries to take that lock. 189 */ 190 SPIN_ERR_HOLD_DURING_SWITCH, 191 /* Trying to use a lock that was destroyed (but not re-initialized) */ 192 SPIN_ERR_DESTROYED, 193 /* Trying to use a lock that is not initialized */ 194 SPIN_ERR_NOT_INITIALIZED, 195 196 /* Must be last, not an actual error code */ 197 SPIN_ERR_LAST 198 }; 199 200 static const char *spin_error_strings[] = { 201 [SPIN_ERR_NONE] = "No error", 202 [SPIN_ERR_NOT_SPDK_THREAD] = "Not an SPDK thread", 203 [SPIN_ERR_DEADLOCK] = "Deadlock detected", 204 [SPIN_ERR_WRONG_THREAD] = "Unlock on wrong SPDK thread", 205 [SPIN_ERR_PTHREAD] = "Error from pthread_spinlock", 206 [SPIN_ERR_LOCK_HELD] = "Destroying a held spinlock", 207 [SPIN_ERR_LOCK_COUNT] = "Lock count is invalid", 208 [SPIN_ERR_HOLD_DURING_SWITCH] = "Lock(s) held while SPDK thread going off CPU", 209 [SPIN_ERR_DESTROYED] = "Lock has been destroyed", 210 [SPIN_ERR_NOT_INITIALIZED] = "Lock has not been initialized", 211 }; 212 213 #define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \ 214 ? "Unknown error" : spin_error_strings[err] 215 216 static void 217 __posix_abort(enum spin_error err) 218 { 219 abort(); 220 } 221 222 typedef void (*spin_abort)(enum spin_error err); 223 spin_abort g_spin_abort_fn = __posix_abort; 224 225 #define SPIN_ASSERT_IMPL(cond, err, extra_log, ret) \ 226 do { \ 227 if (spdk_unlikely(!(cond))) { \ 228 SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \ 229 SPIN_ERROR_STRING(err), #cond); \ 230 extra_log; \ 231 g_spin_abort_fn(err); \ 232 ret; \ 233 } \ 234 } while (0) 235 #define SPIN_ASSERT_LOG_STACKS(cond, err, lock) \ 236 SPIN_ASSERT_IMPL(cond, err, sspin_stacks_print(sspin), return) 237 #define SPIN_ASSERT_RETURN(cond, err, ret) SPIN_ASSERT_IMPL(cond, err, , return ret) 238 #define SPIN_ASSERT(cond, err) SPIN_ASSERT_IMPL(cond, err, ,) 239 240 struct io_device { 241 void *io_device; 242 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 243 spdk_io_channel_create_cb create_cb; 244 spdk_io_channel_destroy_cb destroy_cb; 245 spdk_io_device_unregister_cb unregister_cb; 246 struct spdk_thread *unregister_thread; 247 uint32_t ctx_size; 248 uint32_t for_each_count; 249 RB_ENTRY(io_device) node; 250 251 uint32_t refcnt; 252 253 bool pending_unregister; 254 bool unregistered; 255 }; 256 257 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 258 259 static int 260 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 261 { 262 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 263 } 264 265 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 266 267 static int 268 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 269 { 270 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 271 } 272 273 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 274 275 struct spdk_msg { 276 spdk_msg_fn fn; 277 void *arg; 278 279 SLIST_ENTRY(spdk_msg) link; 280 }; 281 282 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 283 284 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 285 static uint32_t g_thread_count = 0; 286 287 static __thread struct spdk_thread *tls_thread = NULL; 288 289 static void 290 thread_trace(void) 291 { 292 struct spdk_trace_tpoint_opts opts[] = { 293 { 294 "THREAD_IOCH_GET", TRACE_THREAD_IOCH_GET, 295 OWNER_TYPE_NONE, OBJECT_NONE, 0, 296 {{ "refcnt", SPDK_TRACE_ARG_TYPE_INT, 4 }} 297 }, 298 { 299 "THREAD_IOCH_PUT", TRACE_THREAD_IOCH_PUT, 300 OWNER_TYPE_NONE, OBJECT_NONE, 0, 301 {{ "refcnt", SPDK_TRACE_ARG_TYPE_INT, 4 }} 302 } 303 }; 304 305 spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts)); 306 } 307 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 308 309 /* 310 * If this compare function returns zero when two next_run_ticks are equal, 311 * the macro RB_INSERT() returns a pointer to the element with the same 312 * next_run_tick. 313 * 314 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 315 * to remove as a parameter. 316 * 317 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 318 * side by returning 1 when two next_run_ticks are equal. 319 */ 320 static inline int 321 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 322 { 323 if (poller1->next_run_tick < poller2->next_run_tick) { 324 return -1; 325 } else { 326 return 1; 327 } 328 } 329 330 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 331 332 static inline struct spdk_thread * 333 _get_thread(void) 334 { 335 return tls_thread; 336 } 337 338 static int 339 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 340 { 341 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 342 343 g_ctx_sz = ctx_sz; 344 345 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 346 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 347 sizeof(struct spdk_msg), 348 0, /* No cache. We do our own. */ 349 SPDK_ENV_NUMA_ID_ANY); 350 351 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 352 msg_mempool_sz); 353 354 if (!g_spdk_msg_mempool) { 355 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 356 return -ENOMEM; 357 } 358 359 return 0; 360 } 361 362 static void thread_interrupt_destroy(struct spdk_thread *thread); 363 static int thread_interrupt_create(struct spdk_thread *thread); 364 365 static void 366 _free_thread(struct spdk_thread *thread) 367 { 368 struct spdk_io_channel *ch; 369 struct spdk_msg *msg; 370 struct spdk_poller *poller, *ptmp; 371 372 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 373 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 374 thread->name, ch->dev->name); 375 } 376 377 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 378 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 379 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 380 poller->name); 381 } 382 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 383 free(poller); 384 } 385 386 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 387 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 388 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 389 poller->name); 390 } 391 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 392 free(poller); 393 } 394 395 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 396 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 397 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 398 free(poller); 399 } 400 401 pthread_mutex_lock(&g_devlist_mutex); 402 assert(g_thread_count > 0); 403 g_thread_count--; 404 TAILQ_REMOVE(&g_threads, thread, tailq); 405 pthread_mutex_unlock(&g_devlist_mutex); 406 407 msg = SLIST_FIRST(&thread->msg_cache); 408 while (msg != NULL) { 409 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 410 411 assert(thread->msg_cache_count > 0); 412 thread->msg_cache_count--; 413 spdk_mempool_put(g_spdk_msg_mempool, msg); 414 415 msg = SLIST_FIRST(&thread->msg_cache); 416 } 417 418 assert(thread->msg_cache_count == 0); 419 420 if (spdk_interrupt_mode_is_enabled()) { 421 thread_interrupt_destroy(thread); 422 } 423 424 spdk_ring_free(thread->messages); 425 free(thread); 426 } 427 428 int 429 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 430 { 431 assert(g_new_thread_fn == NULL); 432 assert(g_thread_op_fn == NULL); 433 434 if (new_thread_fn == NULL) { 435 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 436 } else { 437 g_new_thread_fn = new_thread_fn; 438 } 439 440 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 441 } 442 443 int 444 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 445 spdk_thread_op_supported_fn thread_op_supported_fn, 446 size_t ctx_sz, size_t msg_mempool_sz) 447 { 448 assert(g_new_thread_fn == NULL); 449 assert(g_thread_op_fn == NULL); 450 assert(g_thread_op_supported_fn == NULL); 451 452 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 453 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 454 return -EINVAL; 455 } 456 457 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 458 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 459 } else { 460 g_thread_op_fn = thread_op_fn; 461 g_thread_op_supported_fn = thread_op_supported_fn; 462 } 463 464 return _thread_lib_init(ctx_sz, msg_mempool_sz); 465 } 466 467 void 468 spdk_thread_lib_fini(void) 469 { 470 struct io_device *dev; 471 472 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 473 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 474 } 475 476 g_new_thread_fn = NULL; 477 g_thread_op_fn = NULL; 478 g_thread_op_supported_fn = NULL; 479 g_ctx_sz = 0; 480 if (g_app_thread != NULL) { 481 _free_thread(g_app_thread); 482 g_app_thread = NULL; 483 } 484 485 if (g_spdk_msg_mempool) { 486 spdk_mempool_free(g_spdk_msg_mempool); 487 g_spdk_msg_mempool = NULL; 488 } 489 } 490 491 struct spdk_thread * 492 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask) 493 { 494 struct spdk_thread *thread, *null_thread; 495 size_t size = SPDK_ALIGN_CEIL(sizeof(*thread) + g_ctx_sz, SPDK_CACHE_LINE_SIZE); 496 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 497 int rc = 0, i; 498 499 /* Since this spdk_thread object will be used by another core, ensure that it won't share a 500 * cache line with any other object allocated on this core */ 501 rc = posix_memalign((void **)&thread, SPDK_CACHE_LINE_SIZE, size); 502 if (rc != 0) { 503 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 504 return NULL; 505 } 506 memset(thread, 0, size); 507 508 if (cpumask) { 509 spdk_cpuset_copy(&thread->cpumask, cpumask); 510 } else { 511 spdk_cpuset_negate(&thread->cpumask); 512 } 513 514 RB_INIT(&thread->io_channels); 515 TAILQ_INIT(&thread->active_pollers); 516 RB_INIT(&thread->timed_pollers); 517 TAILQ_INIT(&thread->paused_pollers); 518 SLIST_INIT(&thread->msg_cache); 519 thread->msg_cache_count = 0; 520 521 thread->tsc_last = spdk_get_ticks(); 522 523 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 524 * ID exceeds UINT64_MAX a warning message is logged 525 */ 526 thread->next_poller_id = 1; 527 528 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_NUMA_ID_ANY); 529 if (!thread->messages) { 530 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 531 free(thread); 532 return NULL; 533 } 534 535 /* Fill the local message pool cache. */ 536 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 537 if (rc == 0) { 538 /* If we can't populate the cache it's ok. The cache will get filled 539 * up organically as messages are passed to the thread. */ 540 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 541 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 542 thread->msg_cache_count++; 543 } 544 } 545 546 if (name) { 547 snprintf(thread->name, sizeof(thread->name), "%s", name); 548 } else { 549 snprintf(thread->name, sizeof(thread->name), "%p", thread); 550 } 551 552 pthread_mutex_lock(&g_devlist_mutex); 553 if (g_thread_id == 0) { 554 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 555 pthread_mutex_unlock(&g_devlist_mutex); 556 _free_thread(thread); 557 return NULL; 558 } 559 thread->id = g_thread_id++; 560 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 561 g_thread_count++; 562 pthread_mutex_unlock(&g_devlist_mutex); 563 564 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 565 thread->id, thread->name); 566 567 if (spdk_interrupt_mode_is_enabled()) { 568 thread->in_interrupt = true; 569 rc = thread_interrupt_create(thread); 570 if (rc != 0) { 571 _free_thread(thread); 572 return NULL; 573 } 574 } 575 576 if (g_new_thread_fn) { 577 rc = g_new_thread_fn(thread); 578 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 579 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 580 } 581 582 if (rc != 0) { 583 _free_thread(thread); 584 return NULL; 585 } 586 587 thread->state = SPDK_THREAD_STATE_RUNNING; 588 589 /* If this is the first thread, save it as the app thread. Use an atomic 590 * compare + exchange to guard against crazy users who might try to 591 * call spdk_thread_create() simultaneously on multiple threads. 592 */ 593 null_thread = NULL; 594 __atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false, 595 __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); 596 597 return thread; 598 } 599 600 struct spdk_thread * 601 spdk_thread_get_app_thread(void) 602 { 603 return g_app_thread; 604 } 605 606 bool 607 spdk_thread_is_app_thread(struct spdk_thread *thread) 608 { 609 if (thread == NULL) { 610 thread = _get_thread(); 611 } 612 613 return g_app_thread == thread; 614 } 615 616 void 617 spdk_thread_bind(struct spdk_thread *thread, bool bind) 618 { 619 thread->is_bound = bind; 620 } 621 622 bool 623 spdk_thread_is_bound(struct spdk_thread *thread) 624 { 625 return thread->is_bound; 626 } 627 628 void 629 spdk_set_thread(struct spdk_thread *thread) 630 { 631 tls_thread = thread; 632 } 633 634 static void 635 thread_exit(struct spdk_thread *thread, uint64_t now) 636 { 637 struct spdk_poller *poller; 638 struct spdk_io_channel *ch; 639 640 if (now >= thread->exit_timeout_tsc) { 641 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 642 thread->name); 643 goto exited; 644 } 645 646 if (spdk_ring_count(thread->messages) > 0) { 647 SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name); 648 return; 649 } 650 651 if (thread->for_each_count > 0) { 652 SPDK_INFOLOG(thread, "thread %s is still executing %u for_each_channels/threads\n", 653 thread->name, thread->for_each_count); 654 return; 655 } 656 657 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 658 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 659 SPDK_INFOLOG(thread, 660 "thread %s still has active poller %s\n", 661 thread->name, poller->name); 662 return; 663 } 664 } 665 666 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 667 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 668 SPDK_INFOLOG(thread, 669 "thread %s still has active timed poller %s\n", 670 thread->name, poller->name); 671 return; 672 } 673 } 674 675 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 676 SPDK_INFOLOG(thread, 677 "thread %s still has paused poller %s\n", 678 thread->name, poller->name); 679 return; 680 } 681 682 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 683 SPDK_INFOLOG(thread, 684 "thread %s still has channel for io_device %s\n", 685 thread->name, ch->dev->name); 686 return; 687 } 688 689 if (thread->pending_unregister_count > 0) { 690 SPDK_INFOLOG(thread, 691 "thread %s is still unregistering io_devices\n", 692 thread->name); 693 return; 694 } 695 696 exited: 697 thread->state = SPDK_THREAD_STATE_EXITED; 698 if (spdk_unlikely(thread->in_interrupt)) { 699 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 700 } 701 } 702 703 static void _thread_exit(void *ctx); 704 705 int 706 spdk_thread_exit(struct spdk_thread *thread) 707 { 708 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 709 710 assert(tls_thread == thread); 711 712 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 713 SPDK_INFOLOG(thread, 714 "thread %s is already exiting\n", 715 thread->name); 716 return 0; 717 } 718 719 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 720 SPDK_THREAD_EXIT_TIMEOUT_SEC); 721 thread->state = SPDK_THREAD_STATE_EXITING; 722 723 if (spdk_interrupt_mode_is_enabled()) { 724 spdk_thread_send_msg(thread, _thread_exit, thread); 725 } 726 727 return 0; 728 } 729 730 bool 731 spdk_thread_is_running(struct spdk_thread *thread) 732 { 733 return thread->state == SPDK_THREAD_STATE_RUNNING; 734 } 735 736 bool 737 spdk_thread_is_exited(struct spdk_thread *thread) 738 { 739 return thread->state == SPDK_THREAD_STATE_EXITED; 740 } 741 742 void 743 spdk_thread_destroy(struct spdk_thread *thread) 744 { 745 assert(thread != NULL); 746 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 747 748 assert(thread->state == SPDK_THREAD_STATE_EXITED); 749 750 if (tls_thread == thread) { 751 tls_thread = NULL; 752 } 753 754 /* To be safe, do not free the app thread until spdk_thread_lib_fini(). */ 755 if (thread != g_app_thread) { 756 _free_thread(thread); 757 } 758 } 759 760 void * 761 spdk_thread_get_ctx(struct spdk_thread *thread) 762 { 763 if (g_ctx_sz > 0) { 764 return thread->ctx; 765 } 766 767 return NULL; 768 } 769 770 struct spdk_cpuset * 771 spdk_thread_get_cpumask(struct spdk_thread *thread) 772 { 773 return &thread->cpumask; 774 } 775 776 int 777 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 778 { 779 struct spdk_thread *thread; 780 781 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 782 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 783 assert(false); 784 return -ENOTSUP; 785 } 786 787 thread = spdk_get_thread(); 788 if (!thread) { 789 SPDK_ERRLOG("Called from non-SPDK thread\n"); 790 assert(false); 791 return -EINVAL; 792 } 793 794 spdk_cpuset_copy(&thread->cpumask, cpumask); 795 796 /* Invoke framework's reschedule operation. If this function is called multiple times 797 * in a single spdk_thread_poll() context, the last cpumask will be used in the 798 * reschedule operation. 799 */ 800 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 801 802 return 0; 803 } 804 805 struct spdk_thread * 806 spdk_thread_get_from_ctx(void *ctx) 807 { 808 if (ctx == NULL) { 809 assert(false); 810 return NULL; 811 } 812 813 assert(g_ctx_sz > 0); 814 815 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 816 } 817 818 static inline uint32_t 819 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 820 { 821 unsigned count, i; 822 void *messages[SPDK_MSG_BATCH_SIZE]; 823 uint64_t notify = 1; 824 int rc; 825 826 #ifdef DEBUG 827 /* 828 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 829 * so we will never actually read uninitialized data from events, but just to be sure 830 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 831 */ 832 memset(messages, 0, sizeof(messages)); 833 #endif 834 835 if (max_msgs > 0) { 836 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 837 } else { 838 max_msgs = SPDK_MSG_BATCH_SIZE; 839 } 840 841 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 842 if (spdk_unlikely(thread->in_interrupt) && 843 spdk_ring_count(thread->messages) != 0) { 844 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 845 if (rc < 0) { 846 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 847 } 848 } 849 if (count == 0) { 850 return 0; 851 } 852 853 for (i = 0; i < count; i++) { 854 struct spdk_msg *msg = messages[i]; 855 856 assert(msg != NULL); 857 858 SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg); 859 860 msg->fn(msg->arg); 861 862 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 863 864 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 865 /* Insert the messages at the head. We want to re-use the hot 866 * ones. */ 867 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 868 thread->msg_cache_count++; 869 } else { 870 spdk_mempool_put(g_spdk_msg_mempool, msg); 871 } 872 } 873 874 return count; 875 } 876 877 static void 878 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 879 { 880 struct spdk_poller *tmp __attribute__((unused)); 881 882 poller->next_run_tick = now + poller->period_ticks; 883 884 /* 885 * Insert poller in the thread's timed_pollers tree by next scheduled run time 886 * as its key. 887 */ 888 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 889 assert(tmp == NULL); 890 891 /* Update the cache only if it is empty or the inserted poller is earlier than it. 892 * RB_MIN() is not necessary here because all pollers, which has exactly the same 893 * next_run_tick as the existing poller, are inserted on the right side. 894 */ 895 if (thread->first_timed_poller == NULL || 896 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 897 thread->first_timed_poller = poller; 898 } 899 } 900 901 static inline void 902 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 903 { 904 struct spdk_poller *tmp __attribute__((unused)); 905 906 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 907 assert(tmp != NULL); 908 909 /* This function is not used in any case that is performance critical. 910 * Update the cache simply by RB_MIN() if it needs to be changed. 911 */ 912 if (thread->first_timed_poller == poller) { 913 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 914 } 915 } 916 917 static void 918 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 919 { 920 if (poller->period_ticks) { 921 poller_insert_timer(thread, poller, spdk_get_ticks()); 922 } else { 923 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 924 } 925 } 926 927 static inline void 928 thread_update_stats(struct spdk_thread *thread, uint64_t end, 929 uint64_t start, int rc) 930 { 931 if (rc == 0) { 932 /* Poller status idle */ 933 thread->stats.idle_tsc += end - start; 934 } else if (rc > 0) { 935 /* Poller status busy */ 936 thread->stats.busy_tsc += end - start; 937 } 938 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 939 thread->tsc_last = end; 940 } 941 942 static inline int 943 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 944 { 945 int rc; 946 947 switch (poller->state) { 948 case SPDK_POLLER_STATE_UNREGISTERED: 949 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 950 free(poller); 951 return 0; 952 case SPDK_POLLER_STATE_PAUSING: 953 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 954 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 955 poller->state = SPDK_POLLER_STATE_PAUSED; 956 return 0; 957 case SPDK_POLLER_STATE_WAITING: 958 break; 959 default: 960 assert(false); 961 break; 962 } 963 964 poller->state = SPDK_POLLER_STATE_RUNNING; 965 rc = poller->fn(poller->arg); 966 967 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 968 969 poller->run_count++; 970 if (rc > 0) { 971 poller->busy_count++; 972 } 973 974 #ifdef DEBUG 975 if (rc == -1) { 976 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 977 } 978 #endif 979 980 switch (poller->state) { 981 case SPDK_POLLER_STATE_UNREGISTERED: 982 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 983 free(poller); 984 break; 985 case SPDK_POLLER_STATE_PAUSING: 986 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 987 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 988 poller->state = SPDK_POLLER_STATE_PAUSED; 989 break; 990 case SPDK_POLLER_STATE_PAUSED: 991 case SPDK_POLLER_STATE_WAITING: 992 break; 993 case SPDK_POLLER_STATE_RUNNING: 994 poller->state = SPDK_POLLER_STATE_WAITING; 995 break; 996 default: 997 assert(false); 998 break; 999 } 1000 1001 return rc; 1002 } 1003 1004 static inline int 1005 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 1006 uint64_t now) 1007 { 1008 int rc; 1009 1010 switch (poller->state) { 1011 case SPDK_POLLER_STATE_UNREGISTERED: 1012 free(poller); 1013 return 0; 1014 case SPDK_POLLER_STATE_PAUSING: 1015 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1016 poller->state = SPDK_POLLER_STATE_PAUSED; 1017 return 0; 1018 case SPDK_POLLER_STATE_WAITING: 1019 break; 1020 default: 1021 assert(false); 1022 break; 1023 } 1024 1025 poller->state = SPDK_POLLER_STATE_RUNNING; 1026 rc = poller->fn(poller->arg); 1027 1028 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 1029 1030 poller->run_count++; 1031 if (rc > 0) { 1032 poller->busy_count++; 1033 } 1034 1035 #ifdef DEBUG 1036 if (rc == -1) { 1037 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 1038 } 1039 #endif 1040 1041 switch (poller->state) { 1042 case SPDK_POLLER_STATE_UNREGISTERED: 1043 free(poller); 1044 break; 1045 case SPDK_POLLER_STATE_PAUSING: 1046 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1047 poller->state = SPDK_POLLER_STATE_PAUSED; 1048 break; 1049 case SPDK_POLLER_STATE_PAUSED: 1050 break; 1051 case SPDK_POLLER_STATE_RUNNING: 1052 poller->state = SPDK_POLLER_STATE_WAITING; 1053 /* fallthrough */ 1054 case SPDK_POLLER_STATE_WAITING: 1055 poller_insert_timer(thread, poller, now); 1056 break; 1057 default: 1058 assert(false); 1059 break; 1060 } 1061 1062 return rc; 1063 } 1064 1065 static int 1066 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1067 { 1068 uint32_t msg_count; 1069 struct spdk_poller *poller, *tmp; 1070 spdk_msg_fn critical_msg; 1071 int rc = 0; 1072 1073 thread->tsc_last = now; 1074 1075 critical_msg = thread->critical_msg; 1076 if (spdk_unlikely(critical_msg != NULL)) { 1077 critical_msg(NULL); 1078 thread->critical_msg = NULL; 1079 rc = 1; 1080 } 1081 1082 msg_count = msg_queue_run_batch(thread, max_msgs); 1083 if (msg_count) { 1084 rc = 1; 1085 } 1086 1087 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1088 active_pollers_head, tailq, tmp) { 1089 int poller_rc; 1090 1091 poller_rc = thread_execute_poller(thread, poller); 1092 if (poller_rc > rc) { 1093 rc = poller_rc; 1094 } 1095 } 1096 1097 poller = thread->first_timed_poller; 1098 while (poller != NULL) { 1099 int timer_rc = 0; 1100 1101 if (now < poller->next_run_tick) { 1102 break; 1103 } 1104 1105 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 1106 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 1107 1108 /* Update the cache to the next timed poller in the list 1109 * only if the current poller is still the closest, otherwise, 1110 * do nothing because the cache has been already updated. 1111 */ 1112 if (thread->first_timed_poller == poller) { 1113 thread->first_timed_poller = tmp; 1114 } 1115 1116 timer_rc = thread_execute_timed_poller(thread, poller, now); 1117 if (timer_rc > rc) { 1118 rc = timer_rc; 1119 } 1120 1121 poller = tmp; 1122 } 1123 1124 return rc; 1125 } 1126 1127 static void 1128 _thread_remove_pollers(void *ctx) 1129 { 1130 struct spdk_thread *thread = ctx; 1131 struct spdk_poller *poller, *tmp; 1132 1133 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1134 active_pollers_head, tailq, tmp) { 1135 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1136 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1137 free(poller); 1138 } 1139 } 1140 1141 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1142 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1143 poller_remove_timer(thread, poller); 1144 free(poller); 1145 } 1146 } 1147 1148 thread->poller_unregistered = false; 1149 } 1150 1151 static void 1152 _thread_exit(void *ctx) 1153 { 1154 struct spdk_thread *thread = ctx; 1155 1156 assert(thread->state == SPDK_THREAD_STATE_EXITING); 1157 1158 thread_exit(thread, spdk_get_ticks()); 1159 1160 if (thread->state != SPDK_THREAD_STATE_EXITED) { 1161 spdk_thread_send_msg(thread, _thread_exit, thread); 1162 } 1163 } 1164 1165 int 1166 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1167 { 1168 struct spdk_thread *orig_thread; 1169 int rc; 1170 1171 orig_thread = _get_thread(); 1172 tls_thread = thread; 1173 1174 if (now == 0) { 1175 now = spdk_get_ticks(); 1176 } 1177 1178 if (spdk_likely(!thread->in_interrupt)) { 1179 rc = thread_poll(thread, max_msgs, now); 1180 if (spdk_unlikely(thread->in_interrupt)) { 1181 /* The thread transitioned to interrupt mode during the above poll. 1182 * Poll it one more time in case that during the transition time 1183 * there is msg received without notification. 1184 */ 1185 rc = thread_poll(thread, max_msgs, now); 1186 } 1187 1188 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1189 thread_exit(thread, now); 1190 } 1191 } else { 1192 /* Non-block wait on thread's fd_group */ 1193 rc = spdk_fd_group_wait(thread->fgrp, 0); 1194 } 1195 1196 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1197 1198 tls_thread = orig_thread; 1199 1200 return rc; 1201 } 1202 1203 uint64_t 1204 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1205 { 1206 struct spdk_poller *poller; 1207 1208 poller = thread->first_timed_poller; 1209 if (poller) { 1210 return poller->next_run_tick; 1211 } 1212 1213 return 0; 1214 } 1215 1216 int 1217 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1218 { 1219 return !TAILQ_EMPTY(&thread->active_pollers); 1220 } 1221 1222 static bool 1223 thread_has_unpaused_pollers(struct spdk_thread *thread) 1224 { 1225 if (TAILQ_EMPTY(&thread->active_pollers) && 1226 RB_EMPTY(&thread->timed_pollers)) { 1227 return false; 1228 } 1229 1230 return true; 1231 } 1232 1233 bool 1234 spdk_thread_has_pollers(struct spdk_thread *thread) 1235 { 1236 if (!thread_has_unpaused_pollers(thread) && 1237 TAILQ_EMPTY(&thread->paused_pollers)) { 1238 return false; 1239 } 1240 1241 return true; 1242 } 1243 1244 bool 1245 spdk_thread_is_idle(struct spdk_thread *thread) 1246 { 1247 if (spdk_ring_count(thread->messages) || 1248 thread_has_unpaused_pollers(thread) || 1249 thread->critical_msg != NULL) { 1250 return false; 1251 } 1252 1253 return true; 1254 } 1255 1256 uint32_t 1257 spdk_thread_get_count(void) 1258 { 1259 /* 1260 * Return cached value of the current thread count. We could acquire the 1261 * lock and iterate through the TAILQ of threads to count them, but that 1262 * count could still be invalidated after we release the lock. 1263 */ 1264 return g_thread_count; 1265 } 1266 1267 struct spdk_thread * 1268 spdk_get_thread(void) 1269 { 1270 return _get_thread(); 1271 } 1272 1273 const char * 1274 spdk_thread_get_name(const struct spdk_thread *thread) 1275 { 1276 return thread->name; 1277 } 1278 1279 uint64_t 1280 spdk_thread_get_id(const struct spdk_thread *thread) 1281 { 1282 return thread->id; 1283 } 1284 1285 struct spdk_thread * 1286 spdk_thread_get_by_id(uint64_t id) 1287 { 1288 struct spdk_thread *thread; 1289 1290 if (id == 0 || id >= g_thread_id) { 1291 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1292 return NULL; 1293 } 1294 pthread_mutex_lock(&g_devlist_mutex); 1295 TAILQ_FOREACH(thread, &g_threads, tailq) { 1296 if (thread->id == id) { 1297 break; 1298 } 1299 } 1300 pthread_mutex_unlock(&g_devlist_mutex); 1301 return thread; 1302 } 1303 1304 int 1305 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1306 { 1307 struct spdk_thread *thread; 1308 1309 thread = _get_thread(); 1310 if (!thread) { 1311 SPDK_ERRLOG("No thread allocated\n"); 1312 return -EINVAL; 1313 } 1314 1315 if (stats == NULL) { 1316 return -EINVAL; 1317 } 1318 1319 *stats = thread->stats; 1320 1321 return 0; 1322 } 1323 1324 uint64_t 1325 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1326 { 1327 if (thread == NULL) { 1328 thread = _get_thread(); 1329 } 1330 1331 return thread->tsc_last; 1332 } 1333 1334 static inline int 1335 thread_send_msg_notification(const struct spdk_thread *target_thread) 1336 { 1337 uint64_t notify = 1; 1338 int rc; 1339 1340 /* Not necessary to do notification if interrupt facility is not enabled */ 1341 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1342 return 0; 1343 } 1344 1345 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1346 * after sending thread msg, it is necessary to check whether target thread runs in 1347 * interrupt mode and then decide whether do event notification. 1348 */ 1349 if (spdk_unlikely(target_thread->in_interrupt)) { 1350 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1351 if (rc < 0) { 1352 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1353 return -EIO; 1354 } 1355 } 1356 1357 return 0; 1358 } 1359 1360 int 1361 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1362 { 1363 struct spdk_thread *local_thread; 1364 struct spdk_msg *msg; 1365 int rc; 1366 1367 assert(thread != NULL); 1368 1369 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1370 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1371 return -EIO; 1372 } 1373 1374 local_thread = _get_thread(); 1375 1376 msg = NULL; 1377 if (local_thread != NULL) { 1378 if (local_thread->msg_cache_count > 0) { 1379 msg = SLIST_FIRST(&local_thread->msg_cache); 1380 assert(msg != NULL); 1381 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1382 local_thread->msg_cache_count--; 1383 } 1384 } 1385 1386 if (msg == NULL) { 1387 msg = spdk_mempool_get(g_spdk_msg_mempool); 1388 if (!msg) { 1389 SPDK_ERRLOG("msg could not be allocated\n"); 1390 return -ENOMEM; 1391 } 1392 } 1393 1394 msg->fn = fn; 1395 msg->arg = ctx; 1396 1397 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1398 if (rc != 1) { 1399 SPDK_ERRLOG("msg could not be enqueued\n"); 1400 spdk_mempool_put(g_spdk_msg_mempool, msg); 1401 return -EIO; 1402 } 1403 1404 return thread_send_msg_notification(thread); 1405 } 1406 1407 int 1408 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1409 { 1410 spdk_msg_fn expected = NULL; 1411 1412 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1413 __ATOMIC_SEQ_CST)) { 1414 return -EIO; 1415 } 1416 1417 return thread_send_msg_notification(thread); 1418 } 1419 1420 #ifdef __linux__ 1421 static int 1422 interrupt_timerfd_process(void *arg) 1423 { 1424 struct spdk_poller *poller = arg; 1425 uint64_t exp; 1426 int rc; 1427 1428 /* clear the level of interval timer */ 1429 rc = read(poller->intr->efd, &exp, sizeof(exp)); 1430 if (rc < 0) { 1431 if (rc == -EAGAIN) { 1432 return 0; 1433 } 1434 1435 return rc; 1436 } 1437 1438 SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg); 1439 1440 return poller->fn(poller->arg); 1441 } 1442 1443 static int 1444 period_poller_interrupt_init(struct spdk_poller *poller) 1445 { 1446 int timerfd; 1447 1448 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1449 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1450 if (timerfd < 0) { 1451 return -errno; 1452 } 1453 1454 poller->intr = spdk_interrupt_register(timerfd, interrupt_timerfd_process, poller, poller->name); 1455 if (poller->intr == NULL) { 1456 close(timerfd); 1457 return -1; 1458 } 1459 1460 return 0; 1461 } 1462 1463 static void 1464 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1465 { 1466 int timerfd; 1467 uint64_t now_tick = spdk_get_ticks(); 1468 uint64_t ticks = spdk_get_ticks_hz(); 1469 int ret; 1470 struct itimerspec new_tv = {}; 1471 struct itimerspec old_tv = {}; 1472 1473 assert(poller->intr != NULL); 1474 assert(poller->period_ticks != 0); 1475 1476 timerfd = poller->intr->efd; 1477 1478 assert(timerfd >= 0); 1479 1480 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1481 interrupt_mode ? "interrupt" : "poll"); 1482 1483 if (interrupt_mode) { 1484 /* Set repeated timer expiration */ 1485 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1486 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1487 1488 /* Update next timer expiration */ 1489 if (poller->next_run_tick == 0) { 1490 poller->next_run_tick = now_tick + poller->period_ticks; 1491 } else if (poller->next_run_tick < now_tick) { 1492 poller->next_run_tick = now_tick; 1493 } 1494 1495 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1496 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1497 1498 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1499 if (ret < 0) { 1500 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1501 assert(false); 1502 } 1503 } else { 1504 /* Disarm the timer */ 1505 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1506 if (ret < 0) { 1507 /* timerfd_settime's failure indicates that the timerfd is in error */ 1508 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1509 assert(false); 1510 } 1511 1512 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1513 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1514 */ 1515 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1516 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1517 poller_remove_timer(poller->thread, poller); 1518 poller_insert_timer(poller->thread, poller, now_tick); 1519 } 1520 } 1521 1522 static void 1523 poller_interrupt_fini(struct spdk_poller *poller) 1524 { 1525 int fd; 1526 1527 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1528 assert(poller->intr != NULL); 1529 fd = poller->intr->efd; 1530 spdk_interrupt_unregister(&poller->intr); 1531 close(fd); 1532 } 1533 1534 static int 1535 busy_poller_interrupt_init(struct spdk_poller *poller) 1536 { 1537 int busy_efd; 1538 1539 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1540 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1541 if (busy_efd < 0) { 1542 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1543 return -errno; 1544 } 1545 1546 poller->intr = spdk_interrupt_register(busy_efd, poller->fn, poller->arg, poller->name); 1547 if (poller->intr == NULL) { 1548 close(busy_efd); 1549 return -1; 1550 } 1551 1552 return 0; 1553 } 1554 1555 static void 1556 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1557 { 1558 int busy_efd = poller->intr->efd; 1559 uint64_t notify = 1; 1560 int rc __attribute__((unused)); 1561 1562 assert(busy_efd >= 0); 1563 1564 if (interrupt_mode) { 1565 /* Write without read on eventfd will get it repeatedly triggered. */ 1566 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1567 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1568 } 1569 } else { 1570 /* Read on eventfd will clear its level triggering. */ 1571 rc = read(busy_efd, ¬ify, sizeof(notify)); 1572 } 1573 } 1574 1575 #else 1576 1577 static int 1578 period_poller_interrupt_init(struct spdk_poller *poller) 1579 { 1580 return -ENOTSUP; 1581 } 1582 1583 static void 1584 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1585 { 1586 } 1587 1588 static void 1589 poller_interrupt_fini(struct spdk_poller *poller) 1590 { 1591 } 1592 1593 static int 1594 busy_poller_interrupt_init(struct spdk_poller *poller) 1595 { 1596 return -ENOTSUP; 1597 } 1598 1599 static void 1600 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1601 { 1602 } 1603 1604 #endif 1605 1606 void 1607 spdk_poller_register_interrupt(struct spdk_poller *poller, 1608 spdk_poller_set_interrupt_mode_cb cb_fn, 1609 void *cb_arg) 1610 { 1611 assert(poller != NULL); 1612 assert(spdk_get_thread() == poller->thread); 1613 1614 if (!spdk_interrupt_mode_is_enabled()) { 1615 return; 1616 } 1617 1618 /* If this poller already had an interrupt, clean the old one up. */ 1619 if (poller->intr != NULL) { 1620 poller_interrupt_fini(poller); 1621 } 1622 1623 poller->set_intr_cb_fn = cb_fn; 1624 poller->set_intr_cb_arg = cb_arg; 1625 1626 /* Set poller into interrupt mode if thread is in interrupt. */ 1627 if (poller->thread->in_interrupt && poller->set_intr_cb_fn) { 1628 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1629 } 1630 } 1631 1632 static uint64_t 1633 convert_us_to_ticks(uint64_t us) 1634 { 1635 uint64_t quotient, remainder, ticks; 1636 1637 if (us) { 1638 quotient = us / SPDK_SEC_TO_USEC; 1639 remainder = us % SPDK_SEC_TO_USEC; 1640 ticks = spdk_get_ticks_hz(); 1641 1642 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1643 } else { 1644 return 0; 1645 } 1646 } 1647 1648 static struct spdk_poller * 1649 poller_register(spdk_poller_fn fn, 1650 void *arg, 1651 uint64_t period_microseconds, 1652 const char *name) 1653 { 1654 struct spdk_thread *thread; 1655 struct spdk_poller *poller; 1656 1657 thread = spdk_get_thread(); 1658 if (!thread) { 1659 assert(false); 1660 return NULL; 1661 } 1662 1663 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1664 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1665 return NULL; 1666 } 1667 1668 poller = calloc(1, sizeof(*poller)); 1669 if (poller == NULL) { 1670 SPDK_ERRLOG("Poller memory allocation failed\n"); 1671 return NULL; 1672 } 1673 1674 if (name) { 1675 snprintf(poller->name, sizeof(poller->name), "%s", name); 1676 } else { 1677 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1678 } 1679 1680 poller->state = SPDK_POLLER_STATE_WAITING; 1681 poller->fn = fn; 1682 poller->arg = arg; 1683 poller->thread = thread; 1684 poller->intr = NULL; 1685 if (thread->next_poller_id == 0) { 1686 SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n"); 1687 thread->next_poller_id = 1; 1688 } 1689 poller->id = thread->next_poller_id++; 1690 1691 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1692 1693 if (spdk_interrupt_mode_is_enabled()) { 1694 int rc; 1695 1696 if (period_microseconds) { 1697 rc = period_poller_interrupt_init(poller); 1698 if (rc < 0) { 1699 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1700 free(poller); 1701 return NULL; 1702 } 1703 1704 poller->set_intr_cb_fn = period_poller_set_interrupt_mode; 1705 poller->set_intr_cb_arg = NULL; 1706 1707 } else { 1708 /* If the poller doesn't have a period, create interruptfd that's always 1709 * busy automatically when running in interrupt mode. 1710 */ 1711 rc = busy_poller_interrupt_init(poller); 1712 if (rc > 0) { 1713 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1714 free(poller); 1715 return NULL; 1716 } 1717 1718 poller->set_intr_cb_fn = busy_poller_set_interrupt_mode; 1719 poller->set_intr_cb_arg = NULL; 1720 } 1721 1722 /* Set poller into interrupt mode if thread is in interrupt. */ 1723 if (poller->thread->in_interrupt) { 1724 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1725 } 1726 } 1727 1728 thread_insert_poller(thread, poller); 1729 1730 return poller; 1731 } 1732 1733 struct spdk_poller * 1734 spdk_poller_register(spdk_poller_fn fn, 1735 void *arg, 1736 uint64_t period_microseconds) 1737 { 1738 return poller_register(fn, arg, period_microseconds, NULL); 1739 } 1740 1741 struct spdk_poller * 1742 spdk_poller_register_named(spdk_poller_fn fn, 1743 void *arg, 1744 uint64_t period_microseconds, 1745 const char *name) 1746 { 1747 return poller_register(fn, arg, period_microseconds, name); 1748 } 1749 1750 static void 1751 wrong_thread(const char *func, const char *name, struct spdk_thread *thread, 1752 struct spdk_thread *curthread) 1753 { 1754 if (thread == NULL) { 1755 SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name); 1756 abort(); 1757 } 1758 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be " 1759 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id, 1760 thread->name, thread->id); 1761 assert(false); 1762 } 1763 1764 void 1765 spdk_poller_unregister(struct spdk_poller **ppoller) 1766 { 1767 struct spdk_thread *thread; 1768 struct spdk_poller *poller; 1769 1770 poller = *ppoller; 1771 if (poller == NULL) { 1772 return; 1773 } 1774 1775 *ppoller = NULL; 1776 1777 thread = spdk_get_thread(); 1778 if (!thread) { 1779 assert(false); 1780 return; 1781 } 1782 1783 if (poller->thread != thread) { 1784 wrong_thread(__func__, poller->name, poller->thread, thread); 1785 return; 1786 } 1787 1788 if (spdk_interrupt_mode_is_enabled()) { 1789 /* Release the interrupt resource for period or busy poller */ 1790 if (poller->intr != NULL) { 1791 poller_interrupt_fini(poller); 1792 } 1793 1794 /* If there is not already a pending poller removal, generate 1795 * a message to go process removals. */ 1796 if (!thread->poller_unregistered) { 1797 thread->poller_unregistered = true; 1798 spdk_thread_send_msg(thread, _thread_remove_pollers, thread); 1799 } 1800 } 1801 1802 /* If the poller was paused, put it on the active_pollers list so that 1803 * its unregistration can be processed by spdk_thread_poll(). 1804 */ 1805 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1806 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1807 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1808 poller->period_ticks = 0; 1809 } 1810 1811 /* Simply set the state to unregistered. The poller will get cleaned up 1812 * in a subsequent call to spdk_thread_poll(). 1813 */ 1814 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1815 } 1816 1817 void 1818 spdk_poller_pause(struct spdk_poller *poller) 1819 { 1820 struct spdk_thread *thread; 1821 1822 thread = spdk_get_thread(); 1823 if (!thread) { 1824 assert(false); 1825 return; 1826 } 1827 1828 if (poller->thread != thread) { 1829 wrong_thread(__func__, poller->name, poller->thread, thread); 1830 return; 1831 } 1832 1833 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1834 * spdk_thread_poll() move it. It allows a poller to be paused from 1835 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1836 * iteration, or from within itself without breaking the logic to always 1837 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1838 */ 1839 switch (poller->state) { 1840 case SPDK_POLLER_STATE_PAUSED: 1841 case SPDK_POLLER_STATE_PAUSING: 1842 break; 1843 case SPDK_POLLER_STATE_RUNNING: 1844 case SPDK_POLLER_STATE_WAITING: 1845 poller->state = SPDK_POLLER_STATE_PAUSING; 1846 break; 1847 default: 1848 assert(false); 1849 break; 1850 } 1851 } 1852 1853 void 1854 spdk_poller_resume(struct spdk_poller *poller) 1855 { 1856 struct spdk_thread *thread; 1857 1858 thread = spdk_get_thread(); 1859 if (!thread) { 1860 assert(false); 1861 return; 1862 } 1863 1864 if (poller->thread != thread) { 1865 wrong_thread(__func__, poller->name, poller->thread, thread); 1866 return; 1867 } 1868 1869 /* If a poller is paused it has to be removed from the paused pollers 1870 * list and put on the active list or timer tree depending on its 1871 * period_ticks. If a poller is still in the process of being paused, 1872 * we just need to flip its state back to waiting, as it's already on 1873 * the appropriate list or tree. 1874 */ 1875 switch (poller->state) { 1876 case SPDK_POLLER_STATE_PAUSED: 1877 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1878 thread_insert_poller(thread, poller); 1879 /* fallthrough */ 1880 case SPDK_POLLER_STATE_PAUSING: 1881 poller->state = SPDK_POLLER_STATE_WAITING; 1882 break; 1883 case SPDK_POLLER_STATE_RUNNING: 1884 case SPDK_POLLER_STATE_WAITING: 1885 break; 1886 default: 1887 assert(false); 1888 break; 1889 } 1890 } 1891 1892 const char * 1893 spdk_poller_get_name(struct spdk_poller *poller) 1894 { 1895 return poller->name; 1896 } 1897 1898 uint64_t 1899 spdk_poller_get_id(struct spdk_poller *poller) 1900 { 1901 return poller->id; 1902 } 1903 1904 const char * 1905 spdk_poller_get_state_str(struct spdk_poller *poller) 1906 { 1907 switch (poller->state) { 1908 case SPDK_POLLER_STATE_WAITING: 1909 return "waiting"; 1910 case SPDK_POLLER_STATE_RUNNING: 1911 return "running"; 1912 case SPDK_POLLER_STATE_UNREGISTERED: 1913 return "unregistered"; 1914 case SPDK_POLLER_STATE_PAUSING: 1915 return "pausing"; 1916 case SPDK_POLLER_STATE_PAUSED: 1917 return "paused"; 1918 default: 1919 return NULL; 1920 } 1921 } 1922 1923 uint64_t 1924 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1925 { 1926 return poller->period_ticks; 1927 } 1928 1929 void 1930 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1931 { 1932 stats->run_count = poller->run_count; 1933 stats->busy_count = poller->busy_count; 1934 } 1935 1936 struct spdk_poller * 1937 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1938 { 1939 return TAILQ_FIRST(&thread->active_pollers); 1940 } 1941 1942 struct spdk_poller * 1943 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1944 { 1945 return TAILQ_NEXT(prev, tailq); 1946 } 1947 1948 struct spdk_poller * 1949 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1950 { 1951 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1952 } 1953 1954 struct spdk_poller * 1955 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1956 { 1957 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1958 } 1959 1960 struct spdk_poller * 1961 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1962 { 1963 return TAILQ_FIRST(&thread->paused_pollers); 1964 } 1965 1966 struct spdk_poller * 1967 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1968 { 1969 return TAILQ_NEXT(prev, tailq); 1970 } 1971 1972 struct spdk_io_channel * 1973 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1974 { 1975 return RB_MIN(io_channel_tree, &thread->io_channels); 1976 } 1977 1978 struct spdk_io_channel * 1979 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1980 { 1981 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1982 } 1983 1984 struct call_thread { 1985 struct spdk_thread *cur_thread; 1986 spdk_msg_fn fn; 1987 void *ctx; 1988 1989 struct spdk_thread *orig_thread; 1990 spdk_msg_fn cpl; 1991 }; 1992 1993 static void 1994 _back_to_orig_thread(void *ctx) 1995 { 1996 struct call_thread *ct = ctx; 1997 1998 assert(ct->orig_thread->for_each_count > 0); 1999 ct->orig_thread->for_each_count--; 2000 2001 ct->cpl(ct->ctx); 2002 free(ctx); 2003 } 2004 2005 static void 2006 _on_thread(void *ctx) 2007 { 2008 struct call_thread *ct = ctx; 2009 int rc __attribute__((unused)); 2010 2011 ct->fn(ct->ctx); 2012 2013 pthread_mutex_lock(&g_devlist_mutex); 2014 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 2015 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 2016 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 2017 ct->cur_thread->name); 2018 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 2019 } 2020 pthread_mutex_unlock(&g_devlist_mutex); 2021 2022 if (!ct->cur_thread) { 2023 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 2024 2025 rc = spdk_thread_send_msg(ct->orig_thread, _back_to_orig_thread, ctx); 2026 } else { 2027 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 2028 ct->cur_thread->name); 2029 2030 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 2031 } 2032 assert(rc == 0); 2033 } 2034 2035 void 2036 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 2037 { 2038 struct call_thread *ct; 2039 struct spdk_thread *thread; 2040 int rc __attribute__((unused)); 2041 2042 ct = calloc(1, sizeof(*ct)); 2043 if (!ct) { 2044 SPDK_ERRLOG("Unable to perform thread iteration\n"); 2045 cpl(ctx); 2046 return; 2047 } 2048 2049 ct->fn = fn; 2050 ct->ctx = ctx; 2051 ct->cpl = cpl; 2052 2053 thread = _get_thread(); 2054 if (!thread) { 2055 SPDK_ERRLOG("No thread allocated\n"); 2056 free(ct); 2057 cpl(ctx); 2058 return; 2059 } 2060 ct->orig_thread = thread; 2061 2062 ct->orig_thread->for_each_count++; 2063 2064 pthread_mutex_lock(&g_devlist_mutex); 2065 ct->cur_thread = TAILQ_FIRST(&g_threads); 2066 pthread_mutex_unlock(&g_devlist_mutex); 2067 2068 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 2069 ct->orig_thread->name); 2070 2071 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 2072 assert(rc == 0); 2073 } 2074 2075 static inline void 2076 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 2077 { 2078 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 2079 return; 2080 } 2081 2082 if (poller->set_intr_cb_fn) { 2083 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 2084 } 2085 } 2086 2087 void 2088 spdk_thread_set_interrupt_mode(bool enable_interrupt) 2089 { 2090 struct spdk_thread *thread = _get_thread(); 2091 struct spdk_poller *poller, *tmp; 2092 2093 assert(thread); 2094 assert(spdk_interrupt_mode_is_enabled()); 2095 2096 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 2097 thread->name, enable_interrupt ? "intr" : "poll", 2098 thread->in_interrupt ? "intr" : "poll"); 2099 2100 if (thread->in_interrupt == enable_interrupt) { 2101 return; 2102 } 2103 2104 /* Set pollers to expected mode */ 2105 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 2106 poller_set_interrupt_mode(poller, enable_interrupt); 2107 } 2108 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 2109 poller_set_interrupt_mode(poller, enable_interrupt); 2110 } 2111 /* All paused pollers will go to work in interrupt mode */ 2112 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 2113 poller_set_interrupt_mode(poller, enable_interrupt); 2114 } 2115 2116 thread->in_interrupt = enable_interrupt; 2117 return; 2118 } 2119 2120 static struct io_device * 2121 io_device_get(void *io_device) 2122 { 2123 struct io_device find = {}; 2124 2125 find.io_device = io_device; 2126 return RB_FIND(io_device_tree, &g_io_devices, &find); 2127 } 2128 2129 void 2130 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 2131 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 2132 const char *name) 2133 { 2134 struct io_device *dev, *tmp; 2135 struct spdk_thread *thread; 2136 2137 assert(io_device != NULL); 2138 assert(create_cb != NULL); 2139 assert(destroy_cb != NULL); 2140 2141 thread = spdk_get_thread(); 2142 if (!thread) { 2143 SPDK_ERRLOG("called from non-SPDK thread\n"); 2144 assert(false); 2145 return; 2146 } 2147 2148 dev = calloc(1, sizeof(struct io_device)); 2149 if (dev == NULL) { 2150 SPDK_ERRLOG("could not allocate io_device\n"); 2151 return; 2152 } 2153 2154 dev->io_device = io_device; 2155 if (name) { 2156 snprintf(dev->name, sizeof(dev->name), "%s", name); 2157 } else { 2158 snprintf(dev->name, sizeof(dev->name), "%p", dev); 2159 } 2160 dev->create_cb = create_cb; 2161 dev->destroy_cb = destroy_cb; 2162 dev->unregister_cb = NULL; 2163 dev->ctx_size = ctx_size; 2164 dev->for_each_count = 0; 2165 dev->unregistered = false; 2166 dev->refcnt = 0; 2167 2168 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 2169 dev->name, dev->io_device, thread->name); 2170 2171 pthread_mutex_lock(&g_devlist_mutex); 2172 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 2173 if (tmp != NULL) { 2174 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 2175 io_device, tmp->name, dev->name); 2176 free(dev); 2177 } 2178 2179 pthread_mutex_unlock(&g_devlist_mutex); 2180 } 2181 2182 static void 2183 _finish_unregister(void *arg) 2184 { 2185 struct io_device *dev = arg; 2186 struct spdk_thread *thread; 2187 2188 thread = spdk_get_thread(); 2189 assert(thread == dev->unregister_thread); 2190 2191 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2192 dev->name, dev->io_device, thread->name); 2193 2194 assert(thread->pending_unregister_count > 0); 2195 thread->pending_unregister_count--; 2196 2197 dev->unregister_cb(dev->io_device); 2198 free(dev); 2199 } 2200 2201 static void 2202 io_device_free(struct io_device *dev) 2203 { 2204 int rc __attribute__((unused)); 2205 2206 if (dev->unregister_cb == NULL) { 2207 free(dev); 2208 } else { 2209 assert(dev->unregister_thread != NULL); 2210 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2211 dev->name, dev->io_device, dev->unregister_thread->name); 2212 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2213 assert(rc == 0); 2214 } 2215 } 2216 2217 void 2218 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2219 { 2220 struct io_device *dev; 2221 uint32_t refcnt; 2222 struct spdk_thread *thread; 2223 2224 thread = spdk_get_thread(); 2225 if (!thread) { 2226 SPDK_ERRLOG("called from non-SPDK thread\n"); 2227 assert(false); 2228 return; 2229 } 2230 2231 pthread_mutex_lock(&g_devlist_mutex); 2232 dev = io_device_get(io_device); 2233 if (!dev) { 2234 SPDK_ERRLOG("io_device %p not found\n", io_device); 2235 assert(false); 2236 pthread_mutex_unlock(&g_devlist_mutex); 2237 return; 2238 } 2239 2240 /* The for_each_count check differentiates the user attempting to unregister the 2241 * device a second time, from the internal call to this function that occurs 2242 * after the for_each_count reaches 0. 2243 */ 2244 if (dev->pending_unregister && dev->for_each_count > 0) { 2245 SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device); 2246 assert(false); 2247 pthread_mutex_unlock(&g_devlist_mutex); 2248 return; 2249 } 2250 2251 dev->unregister_cb = unregister_cb; 2252 dev->unregister_thread = thread; 2253 2254 if (dev->for_each_count > 0) { 2255 SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2256 dev->name, io_device, dev->for_each_count); 2257 dev->pending_unregister = true; 2258 pthread_mutex_unlock(&g_devlist_mutex); 2259 return; 2260 } 2261 2262 dev->unregistered = true; 2263 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2264 refcnt = dev->refcnt; 2265 pthread_mutex_unlock(&g_devlist_mutex); 2266 2267 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2268 dev->name, dev->io_device, thread->name); 2269 2270 if (unregister_cb) { 2271 thread->pending_unregister_count++; 2272 } 2273 2274 if (refcnt > 0) { 2275 /* defer deletion */ 2276 return; 2277 } 2278 2279 io_device_free(dev); 2280 } 2281 2282 const char * 2283 spdk_io_device_get_name(struct io_device *dev) 2284 { 2285 return dev->name; 2286 } 2287 2288 static struct spdk_io_channel * 2289 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2290 { 2291 struct spdk_io_channel find = {}; 2292 2293 find.dev = dev; 2294 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2295 } 2296 2297 struct spdk_io_channel * 2298 spdk_get_io_channel(void *io_device) 2299 { 2300 struct spdk_io_channel *ch; 2301 struct spdk_thread *thread; 2302 struct io_device *dev; 2303 int rc; 2304 2305 pthread_mutex_lock(&g_devlist_mutex); 2306 dev = io_device_get(io_device); 2307 if (dev == NULL) { 2308 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2309 pthread_mutex_unlock(&g_devlist_mutex); 2310 return NULL; 2311 } 2312 2313 thread = _get_thread(); 2314 if (!thread) { 2315 SPDK_ERRLOG("No thread allocated\n"); 2316 pthread_mutex_unlock(&g_devlist_mutex); 2317 return NULL; 2318 } 2319 2320 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2321 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2322 pthread_mutex_unlock(&g_devlist_mutex); 2323 return NULL; 2324 } 2325 2326 ch = thread_get_io_channel(thread, dev); 2327 if (ch != NULL) { 2328 ch->ref++; 2329 2330 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2331 ch, dev->name, dev->io_device, thread->name, ch->ref); 2332 2333 /* 2334 * An I/O channel already exists for this device on this 2335 * thread, so return it. 2336 */ 2337 pthread_mutex_unlock(&g_devlist_mutex); 2338 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2339 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2340 return ch; 2341 } 2342 2343 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2344 if (ch == NULL) { 2345 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2346 pthread_mutex_unlock(&g_devlist_mutex); 2347 return NULL; 2348 } 2349 2350 ch->dev = dev; 2351 ch->destroy_cb = dev->destroy_cb; 2352 ch->thread = thread; 2353 ch->ref = 1; 2354 ch->destroy_ref = 0; 2355 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2356 2357 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2358 ch, dev->name, dev->io_device, thread->name, ch->ref); 2359 2360 dev->refcnt++; 2361 2362 pthread_mutex_unlock(&g_devlist_mutex); 2363 2364 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2365 if (rc != 0) { 2366 pthread_mutex_lock(&g_devlist_mutex); 2367 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2368 dev->refcnt--; 2369 free(ch); 2370 SPDK_ERRLOG("could not create io_channel for io_device %s (%p): %s (rc=%d)\n", 2371 dev->name, io_device, spdk_strerror(-rc), rc); 2372 pthread_mutex_unlock(&g_devlist_mutex); 2373 return NULL; 2374 } 2375 2376 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2377 return ch; 2378 } 2379 2380 static void 2381 put_io_channel(void *arg) 2382 { 2383 struct spdk_io_channel *ch = arg; 2384 bool do_remove_dev = true; 2385 struct spdk_thread *thread; 2386 2387 thread = spdk_get_thread(); 2388 if (!thread) { 2389 SPDK_ERRLOG("called from non-SPDK thread\n"); 2390 assert(false); 2391 return; 2392 } 2393 2394 SPDK_DEBUGLOG(thread, 2395 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2396 ch, ch->dev->name, ch->dev->io_device, thread->name); 2397 2398 assert(ch->thread == thread); 2399 2400 ch->destroy_ref--; 2401 2402 if (ch->ref > 0 || ch->destroy_ref > 0) { 2403 /* 2404 * Another reference to the associated io_device was requested 2405 * after this message was sent but before it had a chance to 2406 * execute. 2407 */ 2408 return; 2409 } 2410 2411 pthread_mutex_lock(&g_devlist_mutex); 2412 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2413 pthread_mutex_unlock(&g_devlist_mutex); 2414 2415 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2416 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2417 2418 pthread_mutex_lock(&g_devlist_mutex); 2419 ch->dev->refcnt--; 2420 2421 if (!ch->dev->unregistered) { 2422 do_remove_dev = false; 2423 } 2424 2425 if (ch->dev->refcnt > 0) { 2426 do_remove_dev = false; 2427 } 2428 2429 pthread_mutex_unlock(&g_devlist_mutex); 2430 2431 if (do_remove_dev) { 2432 io_device_free(ch->dev); 2433 } 2434 free(ch); 2435 } 2436 2437 void 2438 spdk_put_io_channel(struct spdk_io_channel *ch) 2439 { 2440 struct spdk_thread *thread; 2441 int rc __attribute__((unused)); 2442 2443 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2444 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2445 2446 thread = spdk_get_thread(); 2447 if (!thread) { 2448 SPDK_ERRLOG("called from non-SPDK thread\n"); 2449 assert(false); 2450 return; 2451 } 2452 2453 if (ch->thread != thread) { 2454 wrong_thread(__func__, "ch", ch->thread, thread); 2455 return; 2456 } 2457 2458 SPDK_DEBUGLOG(thread, 2459 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2460 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2461 2462 ch->ref--; 2463 2464 if (ch->ref == 0) { 2465 ch->destroy_ref++; 2466 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2467 assert(rc == 0); 2468 } 2469 } 2470 2471 struct spdk_io_channel * 2472 spdk_io_channel_from_ctx(void *ctx) 2473 { 2474 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2475 } 2476 2477 struct spdk_thread * 2478 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2479 { 2480 return ch->thread; 2481 } 2482 2483 void * 2484 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2485 { 2486 return ch->dev->io_device; 2487 } 2488 2489 const char * 2490 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2491 { 2492 return spdk_io_device_get_name(ch->dev); 2493 } 2494 2495 int 2496 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2497 { 2498 return ch->ref; 2499 } 2500 2501 struct spdk_io_channel_iter { 2502 void *io_device; 2503 struct io_device *dev; 2504 spdk_channel_msg fn; 2505 int status; 2506 void *ctx; 2507 struct spdk_io_channel *ch; 2508 2509 struct spdk_thread *cur_thread; 2510 2511 struct spdk_thread *orig_thread; 2512 spdk_channel_for_each_cpl cpl; 2513 }; 2514 2515 void * 2516 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2517 { 2518 return i->io_device; 2519 } 2520 2521 struct spdk_io_channel * 2522 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2523 { 2524 return i->ch; 2525 } 2526 2527 void * 2528 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2529 { 2530 return i->ctx; 2531 } 2532 2533 static void 2534 _call_completion(void *ctx) 2535 { 2536 struct spdk_io_channel_iter *i = ctx; 2537 2538 assert(i->orig_thread->for_each_count > 0); 2539 i->orig_thread->for_each_count--; 2540 2541 if (i->cpl != NULL) { 2542 i->cpl(i, i->status); 2543 } 2544 free(i); 2545 } 2546 2547 static void 2548 _call_channel(void *ctx) 2549 { 2550 struct spdk_io_channel_iter *i = ctx; 2551 struct spdk_io_channel *ch; 2552 2553 /* 2554 * It is possible that the channel was deleted before this 2555 * message had a chance to execute. If so, skip calling 2556 * the fn() on this thread. 2557 */ 2558 pthread_mutex_lock(&g_devlist_mutex); 2559 ch = thread_get_io_channel(i->cur_thread, i->dev); 2560 pthread_mutex_unlock(&g_devlist_mutex); 2561 2562 if (ch) { 2563 i->fn(i); 2564 } else { 2565 spdk_for_each_channel_continue(i, 0); 2566 } 2567 } 2568 2569 void 2570 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2571 spdk_channel_for_each_cpl cpl) 2572 { 2573 struct spdk_thread *thread; 2574 struct spdk_io_channel *ch; 2575 struct spdk_io_channel_iter *i; 2576 int rc __attribute__((unused)); 2577 2578 i = calloc(1, sizeof(*i)); 2579 if (!i) { 2580 SPDK_ERRLOG("Unable to allocate iterator\n"); 2581 assert(false); 2582 return; 2583 } 2584 2585 i->io_device = io_device; 2586 i->fn = fn; 2587 i->ctx = ctx; 2588 i->cpl = cpl; 2589 i->orig_thread = _get_thread(); 2590 2591 i->orig_thread->for_each_count++; 2592 2593 pthread_mutex_lock(&g_devlist_mutex); 2594 i->dev = io_device_get(io_device); 2595 if (i->dev == NULL) { 2596 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2597 assert(false); 2598 i->status = -ENODEV; 2599 goto end; 2600 } 2601 2602 /* Do not allow new for_each operations if we are already waiting to unregister 2603 * the device for other for_each operations to complete. 2604 */ 2605 if (i->dev->pending_unregister) { 2606 SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device); 2607 i->status = -ENODEV; 2608 goto end; 2609 } 2610 2611 TAILQ_FOREACH(thread, &g_threads, tailq) { 2612 ch = thread_get_io_channel(thread, i->dev); 2613 if (ch != NULL) { 2614 ch->dev->for_each_count++; 2615 i->cur_thread = thread; 2616 i->ch = ch; 2617 pthread_mutex_unlock(&g_devlist_mutex); 2618 rc = spdk_thread_send_msg(thread, _call_channel, i); 2619 assert(rc == 0); 2620 return; 2621 } 2622 } 2623 2624 end: 2625 pthread_mutex_unlock(&g_devlist_mutex); 2626 2627 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2628 assert(rc == 0); 2629 } 2630 2631 static void 2632 __pending_unregister(void *arg) 2633 { 2634 struct io_device *dev = arg; 2635 2636 assert(dev->pending_unregister); 2637 assert(dev->for_each_count == 0); 2638 spdk_io_device_unregister(dev->io_device, dev->unregister_cb); 2639 } 2640 2641 void 2642 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2643 { 2644 struct spdk_thread *thread; 2645 struct spdk_io_channel *ch; 2646 struct io_device *dev; 2647 int rc __attribute__((unused)); 2648 2649 assert(i->cur_thread == spdk_get_thread()); 2650 2651 i->status = status; 2652 2653 pthread_mutex_lock(&g_devlist_mutex); 2654 dev = i->dev; 2655 if (status) { 2656 goto end; 2657 } 2658 2659 thread = TAILQ_NEXT(i->cur_thread, tailq); 2660 while (thread) { 2661 ch = thread_get_io_channel(thread, dev); 2662 if (ch != NULL) { 2663 i->cur_thread = thread; 2664 i->ch = ch; 2665 pthread_mutex_unlock(&g_devlist_mutex); 2666 rc = spdk_thread_send_msg(thread, _call_channel, i); 2667 assert(rc == 0); 2668 return; 2669 } 2670 thread = TAILQ_NEXT(thread, tailq); 2671 } 2672 2673 end: 2674 dev->for_each_count--; 2675 i->ch = NULL; 2676 pthread_mutex_unlock(&g_devlist_mutex); 2677 2678 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2679 assert(rc == 0); 2680 2681 pthread_mutex_lock(&g_devlist_mutex); 2682 if (dev->pending_unregister && dev->for_each_count == 0) { 2683 rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev); 2684 assert(rc == 0); 2685 } 2686 pthread_mutex_unlock(&g_devlist_mutex); 2687 } 2688 2689 static void 2690 thread_interrupt_destroy(struct spdk_thread *thread) 2691 { 2692 struct spdk_fd_group *fgrp = thread->fgrp; 2693 2694 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2695 2696 if (thread->msg_fd < 0) { 2697 return; 2698 } 2699 2700 spdk_fd_group_remove(fgrp, thread->msg_fd); 2701 close(thread->msg_fd); 2702 thread->msg_fd = -1; 2703 2704 spdk_fd_group_destroy(fgrp); 2705 thread->fgrp = NULL; 2706 } 2707 2708 #ifdef __linux__ 2709 static int 2710 thread_interrupt_msg_process(void *arg) 2711 { 2712 struct spdk_thread *thread = arg; 2713 struct spdk_thread *orig_thread; 2714 uint32_t msg_count; 2715 spdk_msg_fn critical_msg; 2716 int rc = 0; 2717 uint64_t notify = 1; 2718 2719 assert(spdk_interrupt_mode_is_enabled()); 2720 2721 orig_thread = spdk_get_thread(); 2722 spdk_set_thread(thread); 2723 2724 /* There may be race between msg_acknowledge and another producer's msg_notify, 2725 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2726 * This can avoid msg notification missing. 2727 */ 2728 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2729 if (rc < 0 && errno != EAGAIN) { 2730 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2731 } 2732 2733 critical_msg = thread->critical_msg; 2734 if (spdk_unlikely(critical_msg != NULL)) { 2735 critical_msg(NULL); 2736 thread->critical_msg = NULL; 2737 rc = 1; 2738 } 2739 2740 msg_count = msg_queue_run_batch(thread, 0); 2741 if (msg_count) { 2742 rc = 1; 2743 } 2744 2745 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2746 if (spdk_unlikely(!thread->in_interrupt)) { 2747 /* The thread transitioned to poll mode in a msg during the above processing. 2748 * Clear msg_fd since thread messages will be polled directly in poll mode. 2749 */ 2750 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2751 if (rc < 0 && errno != EAGAIN) { 2752 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 2753 } 2754 } 2755 2756 spdk_set_thread(orig_thread); 2757 return rc; 2758 } 2759 2760 static int 2761 thread_interrupt_create(struct spdk_thread *thread) 2762 { 2763 int rc; 2764 2765 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2766 2767 rc = spdk_fd_group_create(&thread->fgrp); 2768 if (rc) { 2769 return rc; 2770 } 2771 2772 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2773 if (thread->msg_fd < 0) { 2774 rc = -errno; 2775 spdk_fd_group_destroy(thread->fgrp); 2776 thread->fgrp = NULL; 2777 2778 return rc; 2779 } 2780 2781 return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd, 2782 thread_interrupt_msg_process, thread); 2783 } 2784 #else 2785 static int 2786 thread_interrupt_create(struct spdk_thread *thread) 2787 { 2788 return -ENOTSUP; 2789 } 2790 #endif 2791 2792 static int 2793 _interrupt_wrapper(void *ctx) 2794 { 2795 struct spdk_interrupt *intr = ctx; 2796 struct spdk_thread *orig_thread, *thread; 2797 int rc; 2798 2799 orig_thread = spdk_get_thread(); 2800 thread = intr->thread; 2801 2802 spdk_set_thread(thread); 2803 2804 SPDK_DTRACE_PROBE4(interrupt_fd_process, intr->name, intr->efd, 2805 intr->fn, intr->arg); 2806 2807 rc = intr->fn(intr->arg); 2808 2809 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2810 2811 spdk_set_thread(orig_thread); 2812 2813 return rc; 2814 } 2815 2816 struct spdk_interrupt * 2817 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2818 void *arg, const char *name) 2819 { 2820 return spdk_interrupt_register_for_events(efd, SPDK_INTERRUPT_EVENT_IN, fn, arg, name); 2821 } 2822 2823 struct spdk_interrupt * 2824 spdk_interrupt_register_for_events(int efd, uint32_t events, spdk_interrupt_fn fn, void *arg, 2825 const char *name) 2826 { 2827 struct spdk_thread *thread; 2828 struct spdk_interrupt *intr; 2829 int ret; 2830 2831 thread = spdk_get_thread(); 2832 if (!thread) { 2833 assert(false); 2834 return NULL; 2835 } 2836 2837 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2838 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2839 return NULL; 2840 } 2841 2842 intr = calloc(1, sizeof(*intr)); 2843 if (intr == NULL) { 2844 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2845 return NULL; 2846 } 2847 2848 if (name) { 2849 snprintf(intr->name, sizeof(intr->name), "%s", name); 2850 } else { 2851 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2852 } 2853 2854 intr->efd = efd; 2855 intr->thread = thread; 2856 intr->fn = fn; 2857 intr->arg = arg; 2858 2859 ret = spdk_fd_group_add_for_events(thread->fgrp, efd, events, _interrupt_wrapper, intr, intr->name); 2860 2861 if (ret != 0) { 2862 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2863 thread->name, efd, spdk_strerror(-ret)); 2864 free(intr); 2865 return NULL; 2866 } 2867 2868 return intr; 2869 } 2870 2871 void 2872 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2873 { 2874 struct spdk_thread *thread; 2875 struct spdk_interrupt *intr; 2876 2877 intr = *pintr; 2878 if (intr == NULL) { 2879 return; 2880 } 2881 2882 *pintr = NULL; 2883 2884 thread = spdk_get_thread(); 2885 if (!thread) { 2886 assert(false); 2887 return; 2888 } 2889 2890 if (intr->thread != thread) { 2891 wrong_thread(__func__, intr->name, intr->thread, thread); 2892 return; 2893 } 2894 2895 spdk_fd_group_remove(thread->fgrp, intr->efd); 2896 free(intr); 2897 } 2898 2899 int 2900 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2901 enum spdk_interrupt_event_types event_types) 2902 { 2903 struct spdk_thread *thread; 2904 2905 thread = spdk_get_thread(); 2906 if (!thread) { 2907 assert(false); 2908 return -EINVAL; 2909 } 2910 2911 if (intr->thread != thread) { 2912 wrong_thread(__func__, intr->name, intr->thread, thread); 2913 return -EINVAL; 2914 } 2915 2916 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2917 } 2918 2919 int 2920 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2921 { 2922 return spdk_fd_group_get_fd(thread->fgrp); 2923 } 2924 2925 struct spdk_fd_group * 2926 spdk_thread_get_interrupt_fd_group(struct spdk_thread *thread) 2927 { 2928 return thread->fgrp; 2929 } 2930 2931 static bool g_interrupt_mode = false; 2932 2933 int 2934 spdk_interrupt_mode_enable(void) 2935 { 2936 /* It must be called once prior to initializing the threading library. 2937 * g_spdk_msg_mempool will be valid if thread library is initialized. 2938 */ 2939 if (g_spdk_msg_mempool) { 2940 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 2941 return -1; 2942 } 2943 2944 #ifdef __linux__ 2945 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2946 g_interrupt_mode = true; 2947 return 0; 2948 #else 2949 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2950 g_interrupt_mode = false; 2951 return -ENOTSUP; 2952 #endif 2953 } 2954 2955 bool 2956 spdk_interrupt_mode_is_enabled(void) 2957 { 2958 return g_interrupt_mode; 2959 } 2960 2961 #define SSPIN_DEBUG_STACK_FRAMES 16 2962 2963 struct sspin_stack { 2964 void *addrs[SSPIN_DEBUG_STACK_FRAMES]; 2965 uint32_t depth; 2966 }; 2967 2968 struct spdk_spinlock_internal { 2969 struct sspin_stack init_stack; 2970 struct sspin_stack lock_stack; 2971 struct sspin_stack unlock_stack; 2972 }; 2973 2974 static void 2975 sspin_init_internal(struct spdk_spinlock *sspin) 2976 { 2977 #ifdef DEBUG 2978 sspin->internal = calloc(1, sizeof(*sspin->internal)); 2979 #endif 2980 } 2981 2982 static void 2983 sspin_fini_internal(struct spdk_spinlock *sspin) 2984 { 2985 #ifdef DEBUG 2986 free(sspin->internal); 2987 sspin->internal = NULL; 2988 #endif 2989 } 2990 2991 #if defined(DEBUG) && defined(SPDK_HAVE_EXECINFO_H) 2992 #define SSPIN_GET_STACK(sspin, which) \ 2993 do { \ 2994 if (sspin->internal != NULL) { \ 2995 struct sspin_stack *stack = &sspin->internal->which ## _stack; \ 2996 stack->depth = backtrace(stack->addrs, SPDK_COUNTOF(stack->addrs)); \ 2997 } \ 2998 } while (0) 2999 #else 3000 #define SSPIN_GET_STACK(sspin, which) do { } while (0) 3001 #endif 3002 3003 static void 3004 sspin_stack_print(const char *title, const struct sspin_stack *sspin_stack) 3005 { 3006 #ifdef SPDK_HAVE_EXECINFO_H 3007 char **stack; 3008 size_t i; 3009 3010 stack = backtrace_symbols(sspin_stack->addrs, sspin_stack->depth); 3011 if (stack == NULL) { 3012 SPDK_ERRLOG("Out of memory while allocate stack for %s\n", title); 3013 return; 3014 } 3015 SPDK_ERRLOG(" %s:\n", title); 3016 for (i = 0; i < sspin_stack->depth; i++) { 3017 /* 3018 * This does not print line numbers. In gdb, use something like "list *0x444b6b" or 3019 * "list *sspin_stack->addrs[0]". Or more conveniently, load the spdk gdb macros 3020 * and use use "print *sspin" or "print sspin->internal.lock_stack". See 3021 * gdb_macros.md in the docs directory for details. 3022 */ 3023 SPDK_ERRLOG(" #%" PRIu64 ": %s\n", i, stack[i]); 3024 } 3025 free(stack); 3026 #endif /* SPDK_HAVE_EXECINFO_H */ 3027 } 3028 3029 static void 3030 sspin_stacks_print(const struct spdk_spinlock *sspin) 3031 { 3032 if (sspin->internal == NULL) { 3033 return; 3034 } 3035 SPDK_ERRLOG("spinlock %p\n", sspin); 3036 sspin_stack_print("Lock initialized at", &sspin->internal->init_stack); 3037 sspin_stack_print("Last locked at", &sspin->internal->lock_stack); 3038 sspin_stack_print("Last unlocked at", &sspin->internal->unlock_stack); 3039 } 3040 3041 void 3042 spdk_spin_init(struct spdk_spinlock *sspin) 3043 { 3044 int rc; 3045 3046 memset(sspin, 0, sizeof(*sspin)); 3047 rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE); 3048 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3049 sspin_init_internal(sspin); 3050 SSPIN_GET_STACK(sspin, init); 3051 sspin->initialized = true; 3052 } 3053 3054 void 3055 spdk_spin_destroy(struct spdk_spinlock *sspin) 3056 { 3057 int rc; 3058 3059 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3060 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3061 SPIN_ASSERT_LOG_STACKS(sspin->thread == NULL, SPIN_ERR_LOCK_HELD, sspin); 3062 3063 rc = pthread_spin_destroy(&sspin->spinlock); 3064 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3065 3066 sspin_fini_internal(sspin); 3067 sspin->initialized = false; 3068 sspin->destroyed = true; 3069 } 3070 3071 void 3072 spdk_spin_lock(struct spdk_spinlock *sspin) 3073 { 3074 struct spdk_thread *thread = spdk_get_thread(); 3075 int rc; 3076 3077 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3078 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3079 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3080 SPIN_ASSERT_LOG_STACKS(thread != sspin->thread, SPIN_ERR_DEADLOCK, sspin); 3081 3082 rc = pthread_spin_lock(&sspin->spinlock); 3083 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3084 3085 sspin->thread = thread; 3086 sspin->thread->lock_count++; 3087 3088 SSPIN_GET_STACK(sspin, lock); 3089 } 3090 3091 void 3092 spdk_spin_unlock(struct spdk_spinlock *sspin) 3093 { 3094 struct spdk_thread *thread = spdk_get_thread(); 3095 int rc; 3096 3097 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3098 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3099 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3100 SPIN_ASSERT_LOG_STACKS(thread == sspin->thread, SPIN_ERR_WRONG_THREAD, sspin); 3101 3102 SPIN_ASSERT_LOG_STACKS(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT, sspin); 3103 thread->lock_count--; 3104 sspin->thread = NULL; 3105 3106 SSPIN_GET_STACK(sspin, unlock); 3107 3108 rc = pthread_spin_unlock(&sspin->spinlock); 3109 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3110 } 3111 3112 bool 3113 spdk_spin_held(struct spdk_spinlock *sspin) 3114 { 3115 struct spdk_thread *thread = spdk_get_thread(); 3116 3117 SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false); 3118 3119 return sspin->thread == thread; 3120 } 3121 3122 SPDK_LOG_REGISTER_COMPONENT(thread) 3123