/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2016 Intel Corporation.
 * All rights reserved.
 * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/trace.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"
#include "spdk_internal/usdt.h"
#include "thread_internal.h"

#include "spdk_internal/trace_defs.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

#ifdef SPDK_HAVE_EXECINFO_H
#include <execinfo.h>
#endif

#define SPDK_MSG_BATCH_SIZE 8
#define SPDK_MAX_DEVICE_NAME_LEN 256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC 5
#define SPDK_MAX_POLLER_NAME_LEN 256
#define SPDK_MAX_THREAD_NAME_LEN 256

static struct spdk_thread *g_app_thread;

struct spdk_interrupt {
	int efd;
	struct spdk_thread *thread;
	spdk_interrupt_fn fn;
	void *arg;
	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,

	/* The poller is in the process of being paused. It will be paused
	 * the next time it would otherwise be executed.
	 */
	SPDK_POLLER_STATE_PAUSING,

	/* The poller is registered but currently paused. It's on the
	 * paused_pollers list.
	 */
	SPDK_POLLER_STATE_PAUSED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller) tailq;
	RB_ENTRY(spdk_poller) node;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state state;

	uint64_t period_ticks;
	uint64_t next_run_tick;
	uint64_t run_count;
	uint64_t busy_count;
	uint64_t id;
	spdk_poller_fn fn;
	void *arg;
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;
	spdk_poller_set_interrupt_mode_cb set_intr_cb_fn;
	void *set_intr_cb_arg;

	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_thread_state {
	/* The thread is processing pollers and messages via spdk_thread_poll(). */
	SPDK_THREAD_STATE_RUNNING,

	/* The thread is in the process of termination. It reaps unregistered
	 * pollers and releases I/O channels.
	 */
	SPDK_THREAD_STATE_EXITING,

	/* The thread has exited. It is ready for spdk_thread_destroy() to be called. */
	SPDK_THREAD_STATE_EXITED,
};

struct spdk_thread {
	uint64_t tsc_last;
	struct spdk_thread_stats stats;
	/*
	 * Contains pollers actively running on this thread. Pollers
	 * are run round-robin. The thread takes one poller from the head
	 * of the ring, executes it, then puts it back at the tail of
	 * the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers;
	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers;
	struct spdk_poller *first_timed_poller;
	/*
	 * Contains paused pollers. Pollers on this queue are waiting until
	 * they are resumed (in which case they're put onto the active/timer
	 * queues) or unregistered.
	 */
	TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers;
	struct spdk_ring *messages;
	int msg_fd;
	SLIST_HEAD(, spdk_msg) msg_cache;
	size_t msg_cache_count;
	spdk_msg_fn critical_msg;
	uint64_t id;
	uint64_t next_poller_id;
	enum spdk_thread_state state;
	int pending_unregister_count;
	uint32_t for_each_count;

	RB_HEAD(io_channel_tree, spdk_io_channel) io_channels;
	TAILQ_ENTRY(spdk_thread) tailq;

	char name[SPDK_MAX_THREAD_NAME_LEN + 1];
	struct spdk_cpuset cpumask;
	uint64_t exit_timeout_tsc;

	int32_t lock_count;

	/* Whether this spdk_thread is bound to its current CPU core. */
	bool is_bound;

	/* Indicates whether this spdk_thread currently runs in interrupt mode. */
	bool in_interrupt;
	bool poller_unregistered;
	struct spdk_fd_group *fgrp;

	/* User context allocated at the end */
	uint8_t ctx[0];
};

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
static size_t g_ctx_sz = 0;
/* A monotonically increasing ID is assigned to each created thread, beginning at 1.
 * Once the counter wraps around to 0, further thread creation is not allowed and
 * the SPDK application must be restarted.
 */
static uint64_t g_thread_id = 1;

enum spin_error {
	SPIN_ERR_NONE,
	/* Trying to use an SPDK lock while not on an SPDK thread */
	SPIN_ERR_NOT_SPDK_THREAD,
	/* Trying to lock a lock already held by this SPDK thread */
	SPIN_ERR_DEADLOCK,
	/* Trying to unlock a lock not held by this SPDK thread */
	SPIN_ERR_WRONG_THREAD,
	/* pthread_spin_*() returned an error */
	SPIN_ERR_PTHREAD,
	/* Trying to destroy a lock that is held */
	SPIN_ERR_LOCK_HELD,
	/* lock_count is invalid */
	SPIN_ERR_LOCK_COUNT,
	/*
	 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to
	 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to
	 * deadlock when another SPDK thread on the same pthread tries to take that lock.
	 */
	SPIN_ERR_HOLD_DURING_SWITCH,
	/* Trying to use a lock that was destroyed (but not re-initialized) */
	SPIN_ERR_DESTROYED,
	/* Trying to use a lock that is not initialized */
	SPIN_ERR_NOT_INITIALIZED,

	/* Must be last, not an actual error code */
	SPIN_ERR_LAST
};

static const char *spin_error_strings[] = {
	[SPIN_ERR_NONE] = "No error",
	[SPIN_ERR_NOT_SPDK_THREAD] = "Not an SPDK thread",
	[SPIN_ERR_DEADLOCK] = "Deadlock detected",
	[SPIN_ERR_WRONG_THREAD] = "Unlock on wrong SPDK thread",
	[SPIN_ERR_PTHREAD] = "Error from pthread_spinlock",
	[SPIN_ERR_LOCK_HELD] = "Destroying a held spinlock",
	[SPIN_ERR_LOCK_COUNT] = "Lock count is invalid",
	[SPIN_ERR_HOLD_DURING_SWITCH] = "Lock(s) held while SPDK thread going off CPU",
	[SPIN_ERR_DESTROYED] = "Lock has been destroyed",
	[SPIN_ERR_NOT_INITIALIZED] = "Lock has not been initialized",
};

#define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \
				? "Unknown error" : spin_error_strings[err]

static void
__posix_abort(enum spin_error err)
{
	abort();
}

typedef void (*spin_abort)(enum spin_error err);
spin_abort g_spin_abort_fn = __posix_abort;

#define SPIN_ASSERT_IMPL(cond, err, extra_log, ret) \
	do { \
		if (spdk_unlikely(!(cond))) { \
			SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \
				    SPIN_ERROR_STRING(err), #cond); \
			extra_log; \
			g_spin_abort_fn(err); \
			ret; \
		} \
	} while (0)
#define SPIN_ASSERT_LOG_STACKS(cond, err, sspin) \
	SPIN_ASSERT_IMPL(cond, err, sspin_stacks_print(sspin), return)
#define SPIN_ASSERT_RETURN(cond, err, ret) SPIN_ASSERT_IMPL(cond, err, , return ret)
#define SPIN_ASSERT(cond, err) SPIN_ASSERT_IMPL(cond, err, ,)
struct io_device {
	void *io_device;
	char name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb create_cb;
	spdk_io_channel_destroy_cb destroy_cb;
	spdk_io_device_unregister_cb unregister_cb;
	struct spdk_thread *unregister_thread;
	uint32_t ctx_size;
	uint32_t for_each_count;
	RB_ENTRY(io_device) node;

	uint32_t refcnt;

	bool pending_unregister;
	bool unregistered;
};

static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices);

static int
io_device_cmp(struct io_device *dev1, struct io_device *dev2)
{
	return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device);
}

RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp);

static int
io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2)
{
	return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev);
}

RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp);

struct spdk_msg {
	spdk_msg_fn fn;
	void *arg;

	SLIST_ENTRY(spdk_msg) link;
};

static struct spdk_mempool *g_spdk_msg_mempool = NULL;

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static __thread struct spdk_thread *tls_thread = NULL;

SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD)
{
	struct spdk_trace_tpoint_opts opts[] = {
		{
			"THREAD_IOCH_GET", TRACE_THREAD_IOCH_GET,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "refcnt", SPDK_TRACE_ARG_TYPE_INT, 4 }}
		},
		{
			"THREAD_IOCH_PUT", TRACE_THREAD_IOCH_PUT,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "refcnt", SPDK_TRACE_ARG_TYPE_INT, 4 }}
		}
	};

	spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
}

/*
 * If this compare function returned zero when two next_run_ticks were equal,
 * the macro RB_INSERT() would return a pointer to the existing element with the
 * same next_run_tick and would not insert the new one.
 *
 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element
 * to remove as a parameter.
 *
 * Hence we allow RB_INSERT() to insert elements with the same key on the right
 * side by returning 1 when two next_run_ticks are equal.
 */
static inline int
timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2)
{
	if (poller1->next_run_tick < poller2->next_run_tick) {
		return -1;
	} else {
		return 1;
	}
}

RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare);

static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}

static int
_thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	g_ctx_sz = ctx_sz;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz,
			     sizeof(struct spdk_msg),
			     0, /* No cache. We do our own. */
			     SPDK_ENV_NUMA_ID_ANY);

	SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n",
		      msg_mempool_sz);

	if (!g_spdk_msg_mempool) {
		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
		return -ENOMEM;
	}

	return 0;
}

static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
				     poller->name);
		}
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (spdk_interrupt_mode_is_enabled()) {
		thread_interrupt_destroy(thread);
	}

	spdk_ring_free(thread->messages);
	free(thread);
}

int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);

	if (new_thread_fn == NULL) {
		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
	} else {
		g_new_thread_fn = new_thread_fn;
	}

	return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE);
}
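/*
 * Illustrative sketch (not part of this file): a minimal consumer initializes
 * the library once, optionally reserving per-thread context space that is
 * later retrieved with spdk_thread_get_ctx(). The context struct below is
 * hypothetical.
 *
 *	struct my_thread_ctx { uint32_t tick_count; };
 *
 *	spdk_thread_lib_init(NULL, sizeof(struct my_thread_ctx));
 *	struct spdk_thread *t = spdk_thread_create("worker", NULL);
 *	struct my_thread_ctx *my_ctx = spdk_thread_get_ctx(t);
 */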
int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
			 spdk_thread_op_supported_fn thread_op_supported_fn,
			 size_t ctx_sz, size_t msg_mempool_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);
	assert(g_thread_op_supported_fn == NULL);

	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
		SPDK_ERRLOG("thread_op_fn and thread_op_supported_fn must be defined or undefined together\n");
		return -EINVAL;
	}

	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
	} else {
		g_thread_op_fn = thread_op_fn;
		g_thread_op_supported_fn = thread_op_supported_fn;
	}

	return _thread_lib_init(ctx_sz, msg_mempool_sz);
}

void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	RB_FOREACH(dev, io_device_tree, &g_io_devices) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	g_new_thread_fn = NULL;
	g_thread_op_fn = NULL;
	g_thread_op_supported_fn = NULL;
	g_ctx_sz = 0;
	if (g_app_thread != NULL) {
		_free_thread(g_app_thread);
		g_app_thread = NULL;
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}
}

struct spdk_thread *
spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread, *null_thread;
	size_t size = SPDK_ALIGN_CEIL(sizeof(*thread) + g_ctx_sz, SPDK_CACHE_LINE_SIZE);
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc = 0, i;

	/* Since this spdk_thread object will be used by another core, ensure that it won't share a
	 * cache line with any other object allocated on this core. */
	rc = posix_memalign((void **)&thread, SPDK_CACHE_LINE_SIZE, size);
	if (rc != 0) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}
	memset(thread, 0, size);

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	RB_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	RB_INIT(&thread->timed_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	/* A monotonically increasing ID is assigned to each registered poller,
	 * beginning at 1. A warning message is logged if the counter wraps around.
	 */
	thread->next_poller_id = 1;

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_NUMA_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache, that's OK. The cache will get filled
		 * up organically as messages are passed to the thread.
		 */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	if (g_thread_id == 0) {
		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		_free_thread(thread);
		return NULL;
	}
	thread->id = g_thread_id++;
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
		      thread->id, thread->name);

	if (spdk_interrupt_mode_is_enabled()) {
		thread->in_interrupt = true;
		rc = thread_interrupt_create(thread);
		if (rc != 0) {
			_free_thread(thread);
			return NULL;
		}
	}

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
	}

	if (rc != 0) {
		_free_thread(thread);
		return NULL;
	}

	thread->state = SPDK_THREAD_STATE_RUNNING;

	/* If this is the first thread, save it as the app thread. Use an atomic
	 * compare + exchange to guard against crazy users who might try to
	 * call spdk_thread_create() simultaneously on multiple threads.
	 */
	null_thread = NULL;
	__atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false,
				    __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);

	return thread;
}

struct spdk_thread *
spdk_thread_get_app_thread(void)
{
	return g_app_thread;
}

bool
spdk_thread_is_app_thread(struct spdk_thread *thread)
{
	if (thread == NULL) {
		thread = _get_thread();
	}

	return g_app_thread == thread;
}

void
spdk_thread_bind(struct spdk_thread *thread, bool bind)
{
	thread->is_bound = bind;
}

bool
spdk_thread_is_bound(struct spdk_thread *thread)
{
	return thread->is_bound;
}

void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}
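/*
 * Illustrative sketch (not part of this file): the expected single-threaded
 * lifecycle, assuming spdk_thread_lib_init() has already been called. The
 * framework (or a unit test) drives the thread by calling spdk_thread_poll()
 * repeatedly until it has exited.
 *
 *	struct spdk_thread *t = spdk_thread_create("worker", NULL);
 *
 *	spdk_set_thread(t);
 *	... register pollers, send messages, poll ...
 *	spdk_thread_exit(t);
 *	while (!spdk_thread_is_exited(t)) {
 *		spdk_thread_poll(t, 0, 0);
 *	}
 *	spdk_thread_destroy(t);
 */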
static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s timed out, moving it to the exited state forcefully\n",
			    thread->name);
		goto exited;
	}

	if (spdk_ring_count(thread->messages) > 0) {
		SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name);
		return;
	}

	if (thread->for_each_count > 0) {
		SPDK_INFOLOG(thread, "thread %s is still executing %u for_each_channels/threads\n",
			     thread->name, thread->for_each_count);
		return;
	}

	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
		SPDK_INFOLOG(thread,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

	if (thread->pending_unregister_count > 0) {
		SPDK_INFOLOG(thread,
			     "thread %s is still unregistering io_devices\n",
			     thread->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
	if (spdk_unlikely(thread->in_interrupt)) {
		g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
	}
}

static void _thread_exit(void *ctx);

int
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
		SPDK_INFOLOG(thread,
			     "thread %s is already exiting\n",
			     thread->name);
		return 0;
	}

	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
	thread->state = SPDK_THREAD_STATE_EXITING;

	if (spdk_interrupt_mode_is_enabled()) {
		spdk_thread_send_msg(thread, _thread_exit, thread);
	}

	return 0;
}

bool
spdk_thread_is_running(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_RUNNING;
}

bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_EXITED;
}

void
spdk_thread_destroy(struct spdk_thread *thread)
{
	assert(thread != NULL);
	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);

	assert(thread->state == SPDK_THREAD_STATE_EXITED);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	/* To be safe, do not free the app thread until spdk_thread_lib_fini(). */
	if (thread != g_app_thread) {
		_free_thread(thread);
	}
}

void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
	if (g_ctx_sz > 0) {
		return thread->ctx;
	}

	return NULL;
}

struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}

int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;

	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
		assert(false);
		return -ENOTSUP;
	}

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Called from non-SPDK thread\n");
		assert(false);
		return -EINVAL;
	}

	spdk_cpuset_copy(&thread->cpumask, cpumask);

	/* Invoke framework's reschedule operation. If this function is called multiple times
	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
	 * reschedule operation.
	 */
	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

	return 0;
}
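/*
 * Illustrative sketch (not part of this file): spdk_thread_set_cpumask() only
 * works when the framework implements SPDK_THREAD_OP_RESCHED, and it must be
 * called from the thread being modified, e.g. from one of its own messages or
 * pollers.
 *
 *	struct spdk_cpuset mask;
 *
 *	spdk_cpuset_zero(&mask);
 *	spdk_cpuset_set_cpu(&mask, 2, true);
 *	spdk_thread_set_cpumask(&mask);	// reschedule onto core 2
 */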
struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}

static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];
	uint64_t notify = 1;
	int rc;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (spdk_unlikely(thread->in_interrupt) &&
	    spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);

		SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg);

		msg->fn(msg->arg);

		SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}

static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *tmp __attribute__((unused));

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers tree by next scheduled run time
	 * as its key.
	 */
	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp == NULL);

	/* Update the cache only if it is empty or the inserted poller is earlier than it.
	 * RB_MIN() is not necessary here because all pollers that have exactly the same
	 * next_run_tick as an existing poller are inserted on its right side.
	 */
	if (thread->first_timed_poller == NULL ||
	    poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
		thread->first_timed_poller = poller;
	}
}

static inline void
poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
{
	struct spdk_poller *tmp __attribute__((unused));

	tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp != NULL);

	/* This function is not used in any case that is performance critical.
	 * Update the cache simply by RB_MIN() if it needs to be changed.
	 */
	if (thread->first_timed_poller == poller) {
		thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
	}
}
static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store end time to use it as start time of the next spdk_thread_poll(). */
	thread->tsc_last = end;
}

static inline int
thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_WAITING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}
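/*
 * Illustrative sketch (not part of this file): a poller fn reports whether it
 * did work via its return value; positive means busy, zero means idle, and
 * negative values count as idle and are only logged in debug builds. The
 * SPDK_POLLER_BUSY and SPDK_POLLER_IDLE macros from spdk/thread.h encode this
 * convention. The queue type and helpers below are hypothetical.
 *
 *	static int
 *	my_poller(void *arg)
 *	{
 *		struct my_queue *q = arg;
 *
 *		if (my_queue_empty(q)) {
 *			return SPDK_POLLER_IDLE;
 *		}
 *		my_queue_drain(q);
 *		return SPDK_POLLER_BUSY;
 *	}
 */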
static inline int
thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
			    uint64_t now)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
	/* fallthrough */
	case SPDK_POLLER_STATE_WAITING:
		poller_insert_timer(thread, poller, now);
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}

static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	thread->tsc_last = now;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		poller_rc = thread_execute_poller(thread, poller);
		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	poller = thread->first_timed_poller;
	while (poller != NULL) {
		int timer_rc = 0;

		if (now < poller->next_run_tick) {
			break;
		}

		tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);

		/* Update the cache to the next timed poller in the list
		 * only if the current poller is still the closest, otherwise,
		 * do nothing because the cache has been already updated.
		 */
		if (thread->first_timed_poller == poller) {
			thread->first_timed_poller = tmp;
		}

		timer_rc = thread_execute_timed_poller(thread, poller, now);
		if (timer_rc > rc) {
			rc = timer_rc;
		}

		poller = tmp;
	}

	return rc;
}

static void
_thread_remove_pollers(void *ctx)
{
	struct spdk_thread *thread = ctx;
	struct spdk_poller *poller, *tmp;

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
		}
	}

	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			poller_remove_timer(thread, poller);
			free(poller);
		}
	}

	thread->poller_unregistered = false;
}

static void
_thread_exit(void *ctx)
{
	struct spdk_thread *thread = ctx;

	assert(thread->state == SPDK_THREAD_STATE_EXITING);

	thread_exit(thread, spdk_get_ticks());

	if (thread->state != SPDK_THREAD_STATE_EXITED) {
		spdk_thread_send_msg(thread, _thread_exit, thread);
	}
}
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	struct spdk_thread *orig_thread;
	int rc;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	if (spdk_likely(!thread->in_interrupt)) {
		rc = thread_poll(thread, max_msgs, now);
		if (spdk_unlikely(thread->in_interrupt)) {
			/* The thread transitioned to interrupt mode during the above poll.
			 * Poll it one more time in case a message was received without
			 * notification during the transition.
			 */
			rc = thread_poll(thread, max_msgs, now);
		}

		if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
			thread_exit(thread, now);
		}
	} else {
		/* Non-blocking wait on the thread's fd_group */
		rc = spdk_fd_group_wait(thread->fgrp, 0);
	}

	thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}

uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = thread->first_timed_poller;
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}

int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}

static bool
thread_has_unpaused_pollers(struct spdk_thread *thread)
{
	if (TAILQ_EMPTY(&thread->active_pollers) &&
	    RB_EMPTY(&thread->timed_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
	if (!thread_has_unpaused_pollers(thread) &&
	    TAILQ_EMPTY(&thread->paused_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
	if (spdk_ring_count(thread->messages) ||
	    thread_has_unpaused_pollers(thread) ||
	    thread->critical_msg != NULL) {
		return false;
	}

	return true;
}

uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count. We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}
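/*
 * Illustrative sketch (not part of this file): a framework can combine
 * spdk_thread_is_idle() with spdk_thread_next_poller_expiration() to sleep
 * between events instead of busy-polling. The sleep helper below is
 * hypothetical.
 *
 *	while (spdk_thread_is_running(t)) {
 *		spdk_thread_poll(t, 0, 0);
 *		if (spdk_thread_is_idle(t)) {
 *			uint64_t next_tick = spdk_thread_next_poller_expiration(t);
 *
 *			sleep_until_tick_or_event(next_tick);	// hypothetical
 *		}
 *	}
 */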
struct spdk_thread *
spdk_get_thread(void)
{
	return _get_thread();
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

uint64_t
spdk_thread_get_id(const struct spdk_thread *thread)
{
	return thread->id;
}

struct spdk_thread *
spdk_thread_get_by_id(uint64_t id)
{
	struct spdk_thread *thread;

	if (id == 0 || id >= g_thread_id) {
		SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
		return NULL;
	}
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		if (thread->id == id) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);
	return thread;
}

int
spdk_thread_get_stats(struct spdk_thread_stats *stats)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		return -EINVAL;
	}

	if (stats == NULL) {
		return -EINVAL;
	}

	*stats = thread->stats;

	return 0;
}

uint64_t
spdk_thread_get_last_tsc(struct spdk_thread *thread)
{
	if (thread == NULL) {
		thread = _get_thread();
	}

	return thread->tsc_last;
}
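/*
 * Illustrative sketch (not part of this file): spdk_thread_send_msg(), defined
 * below, is the standard way to run a function on another thread; the callback
 * executes the next time the target thread is polled. The callback below is
 * hypothetical.
 *
 *	static void
 *	say_hello(void *ctx)
 *	{
 *		printf("hello from %s\n", spdk_thread_get_name(spdk_get_thread()));
 *	}
 *
 *	// on any thread:
 *	int rc = spdk_thread_send_msg(target, say_hello, NULL);
 *	assert(rc == 0);
 */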
static inline int
thread_send_msg_notification(const struct spdk_thread *target_thread)
{
	uint64_t notify = 1;
	int rc;

	/* Not necessary to do notification if interrupt facility is not enabled */
	if (spdk_likely(!spdk_interrupt_mode_is_enabled())) {
		return 0;
	}

	/* Since each spdk_thread can switch between poll and interrupt mode dynamically,
	 * it is necessary after queueing a thread msg to check whether the target thread
	 * runs in interrupt mode and, if so, do the event notification.
	 */
	if (spdk_unlikely(target_thread->in_interrupt)) {
		rc = write(target_thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
			return -EIO;
		}
	}

	return 0;
}

int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
	spdk_msg_fn expected = NULL;

	if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
					 __ATOMIC_SEQ_CST)) {
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

#ifdef __linux__
static int
interrupt_timerfd_process(void *arg)
{
	struct spdk_poller *poller = arg;
	uint64_t exp;
	int rc;

	/* clear the level of interval timer */
	rc = read(poller->intr->efd, &exp, sizeof(exp));
	if (rc < 0) {
		if (errno == EAGAIN) {
			return 0;
		}

		return -errno;
	}

	SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg);

	return poller->fn(poller->arg);
}

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	int timerfd;

	SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
	if (timerfd < 0) {
		return -errno;
	}

	poller->intr = spdk_interrupt_register(timerfd, interrupt_timerfd_process, poller, poller->name);
	if (poller->intr == NULL) {
		close(timerfd);
		return -1;
	}

	return 0;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	int timerfd;
	uint64_t now_tick = spdk_get_ticks();
	uint64_t ticks = spdk_get_ticks_hz();
	int ret;
	struct itimerspec new_tv = {};
	struct itimerspec old_tv = {};

	assert(poller->intr != NULL);
	assert(poller->period_ticks != 0);

	timerfd = poller->intr->efd;

	assert(timerfd >= 0);

	SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
		      interrupt_mode ? "interrupt" : "poll");

	if (interrupt_mode) {
		/* Set repeated timer expiration */
		new_tv.it_interval.tv_sec = poller->period_ticks / ticks;
		new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks;

		/* Update next timer expiration */
		if (poller->next_run_tick == 0) {
			poller->next_run_tick = now_tick + poller->period_ticks;
		} else if (poller->next_run_tick < now_tick) {
			poller->next_run_tick = now_tick;
		}

		new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks;
		new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks;

		ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno);
			assert(false);
		}
	} else {
		/* Disarm the timer */
		ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv);
		if (ret < 0) {
			/* timerfd_settime's failure indicates that the timerfd is in error */
			SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno);
			assert(false);
		}

		/* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be
		 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC
		 */
		now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \
			   (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC;
		poller_remove_timer(poller->thread, poller);
		poller_insert_timer(poller->thread, poller, now_tick);
	}
}

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
	int fd;

	SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name);
	assert(poller->intr != NULL);
	fd = poller->intr->efd;
	spdk_interrupt_unregister(&poller->intr);
	close(fd);
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	int busy_efd;

	SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name);
	busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (busy_efd < 0) {
		SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name);
		return -errno;
	}

	poller->intr = spdk_interrupt_register(busy_efd, poller->fn, poller->arg, poller->name);
	if (poller->intr == NULL) {
		close(busy_efd);
		return -1;
	}

	return 0;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	int busy_efd = poller->intr->efd;
	uint64_t notify = 1;
	int rc __attribute__((unused));

	assert(busy_efd >= 0);

	if (interrupt_mode) {
		/* Write without read on eventfd will get it repeatedly triggered. */
		if (write(busy_efd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name);
		}
	} else {
		/* Read on eventfd will clear its level triggering. */
		rc = read(busy_efd, &notify, sizeof(notify));
	}
}

#else

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

#endif

void
spdk_poller_register_interrupt(struct spdk_poller *poller,
			       spdk_poller_set_interrupt_mode_cb cb_fn,
			       void *cb_arg)
{
	assert(poller != NULL);
	assert(cb_fn != NULL);
	assert(spdk_get_thread() == poller->thread);

	if (!spdk_interrupt_mode_is_enabled()) {
		return;
	}

	/* If this poller already had an interrupt, clean the old one up. */
	if (poller->intr != NULL) {
		poller_interrupt_fini(poller);
	}

	poller->set_intr_cb_fn = cb_fn;
	poller->set_intr_cb_arg = cb_arg;

	/* Set poller into interrupt mode if thread is in interrupt. */
	if (poller->thread->in_interrupt) {
		poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
	}
}

static uint64_t
convert_us_to_ticks(uint64_t us)
{
	uint64_t quotient, remainder, ticks;

	if (us) {
		quotient = us / SPDK_SEC_TO_USEC;
		remainder = us % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		return 0;
	}
}
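/*
 * Worked example (illustrative): with a 2.4 GHz TSC, spdk_get_ticks_hz()
 * returns 2400000000, so convert_us_to_ticks(100) computes
 *
 *	quotient  = 100 / 1000000 = 0
 *	remainder = 100 % 1000000 = 100
 *	ticks     = 2400000000 * 0 + (2400000000 * 100) / 1000000 = 240000
 *
 * Splitting into quotient and remainder keeps the intermediate multiplication
 * from overflowing 64 bits for large microsecond values.
 */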
static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;
	poller->intr = NULL;
	if (thread->next_poller_id == 0) {
		SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n");
		thread->next_poller_id = 1;
	}
	poller->id = thread->next_poller_id++;

	poller->period_ticks = convert_us_to_ticks(period_microseconds);

	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (period_microseconds) {
			rc = period_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			poller->set_intr_cb_fn = period_poller_set_interrupt_mode;
			poller->set_intr_cb_arg = NULL;

		} else {
			/* If the poller doesn't have a period, create an interruptfd that's
			 * always busy automatically when running in interrupt mode.
			 */
			rc = busy_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			poller->set_intr_cb_fn = busy_poller_set_interrupt_mode;
			poller->set_intr_cb_arg = NULL;
		}

		/* Set poller into interrupt mode if thread is in interrupt. */
		if (poller->thread->in_interrupt) {
			poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
		}
	}

	thread_insert_poller(thread, poller);

	return poller;
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}

static void
wrong_thread(const char *func, const char *name, struct spdk_thread *thread,
	     struct spdk_thread *curthread)
{
	if (thread == NULL) {
		SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name);
		abort();
	}
	SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be "
		    "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id,
		    thread->name, thread->id);
	assert(false);
}

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		/* Release the interrupt resource for period or busy poller */
		if (poller->intr != NULL) {
			poller_interrupt_fini(poller);
		}

		/* If there is not already a pending poller removal, generate
		 * a message to go process removals. */
		if (!thread->poller_unregistered) {
			thread->poller_unregistered = true;
			spdk_thread_send_msg(thread, _thread_remove_pollers, thread);
		}
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}
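/*
 * Illustrative sketch (not part of this file): registering, pausing, and
 * unregistering a poller from its owning thread. SPDK_POLLER_REGISTER() from
 * spdk/thread.h wraps spdk_poller_register_named() using the fn name; the
 * poller fn and its argument are hypothetical.
 *
 *	struct spdk_poller *p;
 *
 *	p = SPDK_POLLER_REGISTER(my_poller, q, 100);	// run every 100 us
 *	...
 *	spdk_poller_pause(p);
 *	spdk_poller_resume(p);
 *	...
 *	spdk_poller_unregister(&p);	// also sets p to NULL
 */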
void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	/* We just set its state to SPDK_POLLER_STATE_PAUSING and let
	 * spdk_thread_poll() move it. It allows a poller to be paused from
	 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE
	 * iteration, or from within itself without breaking the logic to always
	 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration.
	 */
	switch (poller->state) {
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_PAUSING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
	case SPDK_POLLER_STATE_WAITING:
		poller->state = SPDK_POLLER_STATE_PAUSING;
		break;
	default:
		assert(false);
		break;
	}
}

void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	/* If a poller is paused it has to be removed from the paused pollers
	 * list and put on the active list or timer tree depending on its
	 * period_ticks. If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list or tree.
	 */
	switch (poller->state) {
	case SPDK_POLLER_STATE_PAUSED:
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		thread_insert_poller(thread, poller);
	/* fallthrough */
	case SPDK_POLLER_STATE_PAUSING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	case SPDK_POLLER_STATE_RUNNING:
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}
}
const char *
spdk_poller_get_name(struct spdk_poller *poller)
{
	return poller->name;
}

uint64_t
spdk_poller_get_id(struct spdk_poller *poller)
{
	return poller->id;
}

const char *
spdk_poller_get_state_str(struct spdk_poller *poller)
{
	switch (poller->state) {
	case SPDK_POLLER_STATE_WAITING:
		return "waiting";
	case SPDK_POLLER_STATE_RUNNING:
		return "running";
	case SPDK_POLLER_STATE_UNREGISTERED:
		return "unregistered";
	case SPDK_POLLER_STATE_PAUSING:
		return "pausing";
	case SPDK_POLLER_STATE_PAUSED:
		return "paused";
	default:
		return NULL;
	}
}

uint64_t
spdk_poller_get_period_ticks(struct spdk_poller *poller)
{
	return poller->period_ticks;
}

void
spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats)
{
	stats->run_count = poller->run_count;
	stats->busy_count = poller->busy_count;
}

struct spdk_poller *
spdk_thread_get_first_active_poller(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->active_pollers);
}

struct spdk_poller *
spdk_thread_get_next_active_poller(struct spdk_poller *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

struct spdk_poller *
spdk_thread_get_first_timed_poller(struct spdk_thread *thread)
{
	return RB_MIN(timed_pollers_tree, &thread->timed_pollers);
}

struct spdk_poller *
spdk_thread_get_next_timed_poller(struct spdk_poller *prev)
{
	return RB_NEXT(timed_pollers_tree, &prev->thread->timed_pollers, prev);
}

struct spdk_poller *
spdk_thread_get_first_paused_poller(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->paused_pollers);
}

struct spdk_poller *
spdk_thread_get_next_paused_poller(struct spdk_poller *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

struct spdk_io_channel *
spdk_thread_get_first_io_channel(struct spdk_thread *thread)
{
	return RB_MIN(io_channel_tree, &thread->io_channels);
}

struct spdk_io_channel *
spdk_thread_get_next_io_channel(struct spdk_io_channel *prev)
{
	return RB_NEXT(io_channel_tree, &prev->thread->io_channels, prev);
}
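/*
 * Illustrative sketch (not part of this file): spdk_for_each_thread(),
 * implemented below, runs fn once on every running thread in turn and then
 * cpl on the originating thread. Both callbacks and the counter are
 * hypothetical; ctx must stay valid until cpl runs.
 *
 *	static void
 *	bump_counter(void *ctx)
 *	{
 *		(*(uint64_t *)ctx)++;	// runs serially, once per thread
 *	}
 *
 *	static void
 *	done(void *ctx)
 *	{
 *		printf("visited %" PRIu64 " threads\n", *(uint64_t *)ctx);
 *	}
 *
 *	spdk_for_each_thread(bump_counter, &counter, done);
 */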
struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};

static void
_back_to_orig_thread(void *ctx)
{
	struct call_thread *ct = ctx;

	assert(ct->orig_thread->for_each_count > 0);
	ct->orig_thread->for_each_count--;

	ct->cpl(ct->ctx);
	free(ctx);
}

static void
_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) {
		SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n",
			      ct->cur_thread->name);
		ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, _back_to_orig_thread, ctx);
	} else {
		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	assert(rc == 0);
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	ct->orig_thread->for_each_count++;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}

static inline void
poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode)
{
	if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
		return;
	}

	if (!poller->set_intr_cb_fn) {
		SPDK_ERRLOG("Poller(%s) doesn't support setting interrupt mode.\n", poller->name);
		assert(false);
		return;
	}

	poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode);
}

void
spdk_thread_set_interrupt_mode(bool enable_interrupt)
{
	struct spdk_thread *thread = _get_thread();
	struct spdk_poller *poller, *tmp;

	assert(thread);
	assert(spdk_interrupt_mode_is_enabled());

	SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n",
		       thread->name, enable_interrupt ? "intr" : "poll",
		       thread->in_interrupt ? "intr" : "poll");

	if (thread->in_interrupt == enable_interrupt) {
		return;
	}

	/* Set pollers to expected mode */
	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
		poller_set_interrupt_mode(poller, enable_interrupt);
	}
	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) {
		poller_set_interrupt_mode(poller, enable_interrupt);
	}
	/* All paused pollers will go to work in interrupt mode */
	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) {
		poller_set_interrupt_mode(poller, enable_interrupt);
	}

	thread->in_interrupt = enable_interrupt;
	return;
}

static struct io_device *
io_device_get(void *io_device)
{
	struct io_device find = {};

	find.io_device = io_device;
	return RB_FIND(io_device_tree, &g_io_devices, &find);
}

void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	tmp = RB_INSERT(io_device_tree, &g_io_devices, dev);
	if (tmp != NULL) {
		SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
			    io_device, tmp->name, dev->name);
		free(dev);
	}

	pthread_mutex_unlock(&g_devlist_mutex);
}

static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	assert(thread == dev->unregister_thread);

	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	assert(thread->pending_unregister_count > 0);
	thread->pending_unregister_count--;

	dev->unregister_cb(dev->io_device);
	free(dev);
}

static void
io_device_free(struct io_device *dev)
{
	int rc __attribute__((unused));

	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
		assert(rc == 0);
	}
}

void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* The for_each_count check differentiates the user attempting to unregister the
	 * device a second time, from the internal call to this function that occurs
	 * after the for_each_count reaches 0.
	 */
	if (dev->pending_unregister && dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregister_thread = thread;

	if (dev->for_each_count > 0) {
		SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			     dev->name, io_device, dev->for_each_count);
		dev->pending_unregister = true;
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregistered = true;
	RB_REMOVE(io_device_tree, &g_io_devices, dev);
	refcnt = dev->refcnt;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (unregister_cb) {
		thread->pending_unregister_count++;
	}

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}

const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}

static struct spdk_io_channel *
thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev)
{
	struct spdk_io_channel find = {};

	find.dev = dev;
	return RB_FIND(io_channel_tree, &thread->io_channels, &find);
}
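
/*
 * Example (illustrative sketch): unregistration is asynchronous whenever
 * channels are still held. The optional callback fires on the same thread
 * that called spdk_io_device_unregister(), but only after the last channel
 * for the device has been released and destroyed. g_my_dev and my_dev_freed
 * are hypothetical names continuing the registration sketch above.
 *
 *	static void
 *	my_dev_freed(void *io_device)
 *	{
 *		SPDK_NOTICELOG("all channels for %p are gone\n", io_device);
 *	}
 *
 *	spdk_io_device_unregister(&g_my_dev, my_dev_freed);
 */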
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch = thread_get_io_channel(thread, dev);
	if (ch != NULL) {
		ch->ref++;

		SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
			      ch, dev->name, dev->io_device, thread->name, ch->ref);

		/*
		 * An I/O channel already exists for this device on this
		 * thread, so return it.
		 */
		pthread_mutex_unlock(&g_devlist_mutex);
		spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0,
				  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
		return ch;
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	RB_INSERT(io_channel_tree, &thread->io_channels, ch);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
		dev->refcnt--;
		free(ch);
		SPDK_ERRLOG("could not create io_channel for io_device %s (%p): %s (rc=%d)\n",
			    dev->name, io_device, spdk_strerror(-rc), rc);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1);
	return ch;
}
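
/*
 * Example (illustrative sketch): each thread obtains its own channel for a
 * device; a second get on the same thread returns the same channel with its
 * refcount bumped, so gets and puts must balance. g_my_dev and
 * struct my_channel_ctx continue the hypothetical registration sketch above.
 *
 *	struct spdk_io_channel *ch;
 *	struct my_channel_ctx *ctx;
 *
 *	ch = spdk_get_io_channel(&g_my_dev);
 *	if (ch == NULL) {
 *		return -ENOMEM;
 *	}
 *	ctx = spdk_io_channel_get_ctx(ch);
 *	ctx->io_count++;
 *	spdk_put_io_channel(ch);
 *
 * Both calls must happen on the owning thread; the final put destroys the
 * channel via a deferred message rather than immediately.
 */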
static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0,
			  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		wrong_thread(__func__, "ch", ch->thread, thread);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

void *
spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
{
	return ch->dev->io_device;
}

const char *
spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch)
{
	return spdk_io_device_get_name(ch->dev);
}

int
spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
{
	return ch->ref;
}
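
/*
 * Example (illustrative sketch): because the per-thread context is laid out
 * directly after the spdk_io_channel header, the two pointers convert in both
 * directions. For any channel ch the following invariant holds, which is what
 * lets a callback that only receives a ctx_buf recover the owning channel and
 * thread:
 *
 *	assert(spdk_io_channel_from_ctx(spdk_io_channel_get_ctx(ch)) == ch);
 *
 *	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(ctx_buf);
 *	struct spdk_thread *owner = spdk_io_channel_get_thread(ch);
 */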
struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	assert(i->orig_thread->for_each_count > 0);
	i->orig_thread->for_each_count--;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute. If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	ch = thread_get_io_channel(i->cur_thread, i->dev);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		assert(false);
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;
	i->orig_thread = _get_thread();

	i->orig_thread->for_each_count++;

	pthread_mutex_lock(&g_devlist_mutex);
	i->dev = io_device_get(io_device);
	if (i->dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		assert(false);
		i->status = -ENODEV;
		goto end;
	}

	/* Do not allow new for_each operations when we are already waiting for
	 * outstanding for_each operations to complete so that the device can be
	 * unregistered.
	 */
	if (i->dev->pending_unregister) {
		SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device);
		i->status = -ENODEV;
		goto end;
	}

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		ch = thread_get_io_channel(thread, i->dev);
		if (ch != NULL) {
			ch->dev->for_each_count++;
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
	}

end:
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

static void
__pending_unregister(void *arg)
{
	struct io_device *dev = arg;

	assert(dev->pending_unregister);
	assert(dev->for_each_count == 0);
	spdk_io_device_unregister(dev->io_device, dev->unregister_cb);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct io_device *dev;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	dev = i->dev;
	if (status) {
		goto end;
	}

	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		ch = thread_get_io_channel(thread, dev);
		if (ch != NULL) {
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);

	pthread_mutex_lock(&g_devlist_mutex);
	if (dev->pending_unregister && dev->for_each_count == 0) {
		rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev);
		assert(rc == 0);
	}
	pthread_mutex_unlock(&g_devlist_mutex);
}
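
/*
 * Example (illustrative sketch): visit the device's channel on every thread
 * that holds one. Each per-channel callback must eventually call
 * spdk_for_each_channel_continue(), possibly after asynchronous work; a
 * non-zero status stops the iteration and is passed to the completion, which
 * runs on the thread that started the iteration. reset_one and reset_done are
 * hypothetical names reusing the channel context sketched above.
 *
 *	static void
 *	reset_one(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *		struct my_channel_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *
 *		ctx->io_count = 0;
 *		spdk_for_each_channel_continue(i, 0);
 *	}
 *
 *	static void
 *	reset_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		SPDK_NOTICELOG("reset complete: %d\n", status);
 *	}
 *
 *	spdk_for_each_channel(&g_my_dev, reset_one, NULL, reset_done);
 */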
static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	if (thread->msg_fd < 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);
	thread->msg_fd = -1;

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}

#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	struct spdk_thread *orig_thread;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;
	uint64_t notify = 1;

	assert(spdk_interrupt_mode_is_enabled());

	orig_thread = spdk_get_thread();
	spdk_set_thread(thread);

	/* There may be a race between this consumer's msg_acknowledge and another
	 * producer's msg_notify, so acknowledge the event first and check for
	 * messages afterwards. This ordering avoids missing a msg notification.
	 */
	rc = read(thread->msg_fd, &notify, sizeof(notify));
	if (rc < 0 && errno != EAGAIN) {
		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
	}

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
	if (spdk_unlikely(!thread->in_interrupt)) {
		/* A message processed above transitioned the thread to poll mode.
		 * Drain msg_fd, since thread messages will be polled directly in
		 * poll mode.
		 */
		rc = read(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0 && errno != EAGAIN) {
			SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
		}
	}

	spdk_set_thread(orig_thread);
	return rc;
}

static int
thread_interrupt_create(struct spdk_thread *thread)
{
	int rc;

	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);

	rc = spdk_fd_group_create(&thread->fgrp);
	if (rc) {
		return rc;
	}

	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (thread->msg_fd < 0) {
		rc = -errno;
		spdk_fd_group_destroy(thread->fgrp);
		thread->fgrp = NULL;

		return rc;
	}

	return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd,
				 thread_interrupt_msg_process, thread);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
#endif

static int
_interrupt_wrapper(void *ctx)
{
	struct spdk_interrupt *intr = ctx;
	struct spdk_thread *orig_thread, *thread;
	int rc;

	orig_thread = spdk_get_thread();
	thread = intr->thread;

	spdk_set_thread(thread);

	SPDK_DTRACE_PROBE4(interrupt_fd_process, intr->name, intr->efd,
			   intr->fn, intr->arg);

	rc = intr->fn(intr->arg);

	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

	spdk_set_thread(orig_thread);

	return rc;
}

struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
			void *arg, const char *name)
{
	return spdk_interrupt_register_for_events(efd, SPDK_INTERRUPT_EVENT_IN, fn, arg, name);
}
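
/*
 * Example (illustrative sketch): register an eventfd with the current
 * thread's fd group so its callback runs when the fd becomes readable in
 * interrupt mode. The names my_event_cb, g_my_efd and g_my_intr are
 * hypothetical.
 *
 *	static int
 *	my_event_cb(void *arg)
 *	{
 *		uint64_t val;
 *
 *		read(g_my_efd, &val, sizeof(val));
 *		return 0;
 *	}
 *
 *	g_my_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 *	g_my_intr = spdk_interrupt_register(g_my_efd, my_event_cb, NULL, "my_intr");
 *	...
 *	spdk_interrupt_unregister(&g_my_intr);
 *	close(g_my_efd);
 *
 * The callback drains the eventfd so the fd does not remain readable; its
 * return value is propagated through _interrupt_wrapper().
 */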
struct spdk_interrupt *
spdk_interrupt_register_for_events(int efd, uint32_t events, spdk_interrupt_fn fn, void *arg,
				   const char *name)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;
	int ret;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	intr = calloc(1, sizeof(*intr));
	if (intr == NULL) {
		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(intr->name, sizeof(intr->name), "%s", name);
	} else {
		snprintf(intr->name, sizeof(intr->name), "%p", fn);
	}

	intr->efd = efd;
	intr->thread = thread;
	intr->fn = fn;
	intr->arg = arg;

	ret = spdk_fd_group_add_for_events(thread->fgrp, efd, events, _interrupt_wrapper,
					   intr, intr->name);
	if (ret != 0) {
		SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n",
			    thread->name, efd, spdk_strerror(-ret));
		free(intr);
		return NULL;
	}

	return intr;
}

void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	*pintr = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}

int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}

struct spdk_fd_group *
spdk_thread_get_interrupt_fd_group(struct spdk_thread *thread)
{
	return thread->fgrp;
}
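
/*
 * Example (illustrative sketch): an application framework that owns the event
 * loop can block on a thread's fd group instead of busy polling. This assumes
 * spdk_fd_group_wait() from spdk/fd_group.h, which dispatches the handlers of
 * any ready file descriptors; a timeout of -1 blocks indefinitely. g_shutdown
 * is a hypothetical application flag.
 *
 *	struct spdk_fd_group *fgrp = spdk_thread_get_interrupt_fd_group(thread);
 *
 *	while (!g_shutdown) {
 *		spdk_fd_group_wait(fgrp, -1);
 *	}
 *
 * Alternatively, spdk_thread_get_interrupt_fd() exposes a single fd that can
 * be added to an existing epoll set.
 */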
static bool g_interrupt_mode = false;

int
spdk_interrupt_mode_enable(void)
{
	/* This must be called once, prior to initializing the threading library.
	 * g_spdk_msg_mempool will be valid if the thread library is initialized.
	 */
	if (g_spdk_msg_mempool) {
		SPDK_ERRLOG("Cannot enable interrupt mode: the threading library is already initialized.\n");
		return -1;
	}

#ifdef __linux__
	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
	g_interrupt_mode = true;
	return 0;
#else
	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
	g_interrupt_mode = false;
	return -ENOTSUP;
#endif
}

bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}
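
/*
 * Example (illustrative sketch): interrupt mode must be selected before the
 * threading library is brought up, typically during application option
 * handling. g_opts_interrupt_mode is a hypothetical flag.
 *
 *	if (g_opts_interrupt_mode && spdk_interrupt_mode_enable() != 0) {
 *		return -1;
 *	}
 *
 *	rc = spdk_thread_lib_init(NULL, 0);
 */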
3025 */ 3026 SPDK_ERRLOG(" #%" PRIu64 ": %s\n", i, stack[i]); 3027 } 3028 free(stack); 3029 #endif /* SPDK_HAVE_EXECINFO_H */ 3030 } 3031 3032 static void 3033 sspin_stacks_print(const struct spdk_spinlock *sspin) 3034 { 3035 if (sspin->internal == NULL) { 3036 return; 3037 } 3038 SPDK_ERRLOG("spinlock %p\n", sspin); 3039 sspin_stack_print("Lock initialized at", &sspin->internal->init_stack); 3040 sspin_stack_print("Last locked at", &sspin->internal->lock_stack); 3041 sspin_stack_print("Last unlocked at", &sspin->internal->unlock_stack); 3042 } 3043 3044 void 3045 spdk_spin_init(struct spdk_spinlock *sspin) 3046 { 3047 int rc; 3048 3049 memset(sspin, 0, sizeof(*sspin)); 3050 rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE); 3051 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3052 sspin_init_internal(sspin); 3053 SSPIN_GET_STACK(sspin, init); 3054 sspin->initialized = true; 3055 } 3056 3057 void 3058 spdk_spin_destroy(struct spdk_spinlock *sspin) 3059 { 3060 int rc; 3061 3062 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3063 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3064 SPIN_ASSERT_LOG_STACKS(sspin->thread == NULL, SPIN_ERR_LOCK_HELD, sspin); 3065 3066 rc = pthread_spin_destroy(&sspin->spinlock); 3067 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3068 3069 sspin_fini_internal(sspin); 3070 sspin->initialized = false; 3071 sspin->destroyed = true; 3072 } 3073 3074 void 3075 spdk_spin_lock(struct spdk_spinlock *sspin) 3076 { 3077 struct spdk_thread *thread = spdk_get_thread(); 3078 int rc; 3079 3080 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3081 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3082 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3083 SPIN_ASSERT_LOG_STACKS(thread != sspin->thread, SPIN_ERR_DEADLOCK, sspin); 3084 3085 rc = pthread_spin_lock(&sspin->spinlock); 3086 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3087 3088 sspin->thread = thread; 3089 sspin->thread->lock_count++; 3090 3091 SSPIN_GET_STACK(sspin, lock); 3092 } 3093 3094 void 3095 spdk_spin_unlock(struct spdk_spinlock *sspin) 3096 { 3097 struct spdk_thread *thread = spdk_get_thread(); 3098 int rc; 3099 3100 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3101 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3102 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3103 SPIN_ASSERT_LOG_STACKS(thread == sspin->thread, SPIN_ERR_WRONG_THREAD, sspin); 3104 3105 SPIN_ASSERT_LOG_STACKS(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT, sspin); 3106 thread->lock_count--; 3107 sspin->thread = NULL; 3108 3109 SSPIN_GET_STACK(sspin, unlock); 3110 3111 rc = pthread_spin_unlock(&sspin->spinlock); 3112 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3113 } 3114 3115 bool 3116 spdk_spin_held(struct spdk_spinlock *sspin) 3117 { 3118 struct spdk_thread *thread = spdk_get_thread(); 3119 3120 SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false); 3121 3122 return sspin->thread == thread; 3123 } 3124 3125 SPDK_LOG_REGISTER_COMPONENT(thread) 3126