1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2016 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "spdk/env.h" 10 #include "spdk/likely.h" 11 #include "spdk/queue.h" 12 #include "spdk/string.h" 13 #include "spdk/thread.h" 14 #include "spdk/trace.h" 15 #include "spdk/util.h" 16 #include "spdk/fd_group.h" 17 18 #include "spdk/log.h" 19 #include "spdk_internal/thread.h" 20 #include "spdk_internal/usdt.h" 21 #include "thread_internal.h" 22 23 #include "spdk_internal/trace_defs.h" 24 25 #ifdef __linux__ 26 #include <sys/timerfd.h> 27 #include <sys/eventfd.h> 28 #endif 29 30 #ifdef SPDK_HAVE_EXECINFO_H 31 #include <execinfo.h> 32 #endif 33 34 #define SPDK_MSG_BATCH_SIZE 8 35 #define SPDK_MAX_DEVICE_NAME_LEN 256 36 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 37 #define SPDK_MAX_POLLER_NAME_LEN 256 38 #define SPDK_MAX_THREAD_NAME_LEN 256 39 40 static struct spdk_thread *g_app_thread; 41 42 struct spdk_interrupt { 43 int efd; 44 struct spdk_thread *thread; 45 spdk_interrupt_fn fn; 46 void *arg; 47 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 48 }; 49 50 enum spdk_poller_state { 51 /* The poller is registered with a thread but not currently executing its fn. */ 52 SPDK_POLLER_STATE_WAITING, 53 54 /* The poller is currently running its fn. */ 55 SPDK_POLLER_STATE_RUNNING, 56 57 /* The poller was unregistered during the execution of its fn. */ 58 SPDK_POLLER_STATE_UNREGISTERED, 59 60 /* The poller is in the process of being paused. It will be paused 61 * during the next time it's supposed to be executed. 62 */ 63 SPDK_POLLER_STATE_PAUSING, 64 65 /* The poller is registered but currently paused. It's on the 66 * paused_pollers list. 67 */ 68 SPDK_POLLER_STATE_PAUSED, 69 }; 70 71 struct spdk_poller { 72 TAILQ_ENTRY(spdk_poller) tailq; 73 RB_ENTRY(spdk_poller) node; 74 75 /* Current state of the poller; should only be accessed from the poller's thread. */ 76 enum spdk_poller_state state; 77 78 uint64_t period_ticks; 79 uint64_t next_run_tick; 80 uint64_t run_count; 81 uint64_t busy_count; 82 uint64_t id; 83 spdk_poller_fn fn; 84 void *arg; 85 struct spdk_thread *thread; 86 struct spdk_interrupt *intr; 87 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 88 void *set_intr_cb_arg; 89 90 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 91 }; 92 93 enum spdk_thread_state { 94 /* The thread is processing poller and message by spdk_thread_poll(). */ 95 SPDK_THREAD_STATE_RUNNING, 96 97 /* The thread is in the process of termination. It reaps unregistering 98 * poller are releasing I/O channel. 99 */ 100 SPDK_THREAD_STATE_EXITING, 101 102 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 103 SPDK_THREAD_STATE_EXITED, 104 }; 105 106 struct spdk_thread { 107 uint64_t tsc_last; 108 struct spdk_thread_stats stats; 109 /* 110 * Contains pollers actively running on this thread. Pollers 111 * are run round-robin. The thread takes one poller from the head 112 * of the ring, executes it, then puts it back at the tail of 113 * the ring. 114 */ 115 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 116 /** 117 * Contains pollers running on this thread with a periodic timer. 118 */ 119 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 120 struct spdk_poller *first_timed_poller; 121 /* 122 * Contains paused pollers. Pollers on this queue are waiting until 123 * they are resumed (in which case they're put onto the active/timer 124 * queues) or unregistered. 125 */ 126 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 127 struct spdk_ring *messages; 128 int msg_fd; 129 SLIST_HEAD(, spdk_msg) msg_cache; 130 size_t msg_cache_count; 131 spdk_msg_fn critical_msg; 132 uint64_t id; 133 uint64_t next_poller_id; 134 enum spdk_thread_state state; 135 int pending_unregister_count; 136 uint32_t for_each_count; 137 138 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 139 TAILQ_ENTRY(spdk_thread) tailq; 140 141 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 142 struct spdk_cpuset cpumask; 143 uint64_t exit_timeout_tsc; 144 145 int32_t lock_count; 146 147 /* spdk_thread is bound to current CPU core. */ 148 bool is_bound; 149 150 /* Indicates whether this spdk_thread currently runs in interrupt. */ 151 bool in_interrupt; 152 bool poller_unregistered; 153 struct spdk_fd_group *fgrp; 154 155 /* User context allocated at the end */ 156 uint8_t ctx[0]; 157 }; 158 159 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 160 161 static spdk_new_thread_fn g_new_thread_fn = NULL; 162 static spdk_thread_op_fn g_thread_op_fn = NULL; 163 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 164 static size_t g_ctx_sz = 0; 165 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 166 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 167 * SPDK application is required. 168 */ 169 static uint64_t g_thread_id = 1; 170 171 enum spin_error { 172 SPIN_ERR_NONE, 173 /* Trying to use an SPDK lock while not on an SPDK thread */ 174 SPIN_ERR_NOT_SPDK_THREAD, 175 /* Trying to lock a lock already held by this SPDK thread */ 176 SPIN_ERR_DEADLOCK, 177 /* Trying to unlock a lock not held by this SPDK thread */ 178 SPIN_ERR_WRONG_THREAD, 179 /* pthread_spin_*() returned an error */ 180 SPIN_ERR_PTHREAD, 181 /* Trying to destroy a lock that is held */ 182 SPIN_ERR_LOCK_HELD, 183 /* lock_count is invalid */ 184 SPIN_ERR_LOCK_COUNT, 185 /* 186 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to 187 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to 188 * deadlock when another SPDK thread on the same pthread tries to take that lock. 189 */ 190 SPIN_ERR_HOLD_DURING_SWITCH, 191 /* Trying to use a lock that was destroyed (but not re-initialized) */ 192 SPIN_ERR_DESTROYED, 193 /* Trying to use a lock that is not initialized */ 194 SPIN_ERR_NOT_INITIALIZED, 195 196 /* Must be last, not an actual error code */ 197 SPIN_ERR_LAST 198 }; 199 200 static const char *spin_error_strings[] = { 201 [SPIN_ERR_NONE] = "No error", 202 [SPIN_ERR_NOT_SPDK_THREAD] = "Not an SPDK thread", 203 [SPIN_ERR_DEADLOCK] = "Deadlock detected", 204 [SPIN_ERR_WRONG_THREAD] = "Unlock on wrong SPDK thread", 205 [SPIN_ERR_PTHREAD] = "Error from pthread_spinlock", 206 [SPIN_ERR_LOCK_HELD] = "Destroying a held spinlock", 207 [SPIN_ERR_LOCK_COUNT] = "Lock count is invalid", 208 [SPIN_ERR_HOLD_DURING_SWITCH] = "Lock(s) held while SPDK thread going off CPU", 209 [SPIN_ERR_DESTROYED] = "Lock has been destroyed", 210 [SPIN_ERR_NOT_INITIALIZED] = "Lock has not been initialized", 211 }; 212 213 #define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \ 214 ? "Unknown error" : spin_error_strings[err] 215 216 static void 217 __posix_abort(enum spin_error err) 218 { 219 abort(); 220 } 221 222 typedef void (*spin_abort)(enum spin_error err); 223 spin_abort g_spin_abort_fn = __posix_abort; 224 225 #define SPIN_ASSERT_IMPL(cond, err, extra_log, ret) \ 226 do { \ 227 if (spdk_unlikely(!(cond))) { \ 228 SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \ 229 SPIN_ERROR_STRING(err), #cond); \ 230 extra_log; \ 231 g_spin_abort_fn(err); \ 232 ret; \ 233 } \ 234 } while (0) 235 #define SPIN_ASSERT_LOG_STACKS(cond, err, lock) \ 236 SPIN_ASSERT_IMPL(cond, err, sspin_stacks_print(sspin), return) 237 #define SPIN_ASSERT_RETURN(cond, err, ret) SPIN_ASSERT_IMPL(cond, err, , return ret) 238 #define SPIN_ASSERT(cond, err) SPIN_ASSERT_IMPL(cond, err, ,) 239 240 struct io_device { 241 void *io_device; 242 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 243 spdk_io_channel_create_cb create_cb; 244 spdk_io_channel_destroy_cb destroy_cb; 245 spdk_io_device_unregister_cb unregister_cb; 246 struct spdk_thread *unregister_thread; 247 uint32_t ctx_size; 248 uint32_t for_each_count; 249 RB_ENTRY(io_device) node; 250 251 uint32_t refcnt; 252 253 bool pending_unregister; 254 bool unregistered; 255 }; 256 257 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 258 259 static int 260 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 261 { 262 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 263 } 264 265 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 266 267 static int 268 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 269 { 270 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 271 } 272 273 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 274 275 struct spdk_msg { 276 spdk_msg_fn fn; 277 void *arg; 278 279 SLIST_ENTRY(spdk_msg) link; 280 }; 281 282 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 283 284 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 285 static uint32_t g_thread_count = 0; 286 287 static __thread struct spdk_thread *tls_thread = NULL; 288 289 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 290 { 291 struct spdk_trace_tpoint_opts opts[] = { 292 { 293 "THREAD_IOCH_GET", TRACE_THREAD_IOCH_GET, 294 OWNER_TYPE_NONE, OBJECT_NONE, 0, 295 {{ "refcnt", SPDK_TRACE_ARG_TYPE_INT, 4 }} 296 }, 297 { 298 "THREAD_IOCH_PUT", TRACE_THREAD_IOCH_PUT, 299 OWNER_TYPE_NONE, OBJECT_NONE, 0, 300 {{ "refcnt", SPDK_TRACE_ARG_TYPE_INT, 4 }} 301 } 302 }; 303 304 spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts)); 305 } 306 307 /* 308 * If this compare function returns zero when two next_run_ticks are equal, 309 * the macro RB_INSERT() returns a pointer to the element with the same 310 * next_run_tick. 311 * 312 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 313 * to remove as a parameter. 314 * 315 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 316 * side by returning 1 when two next_run_ticks are equal. 317 */ 318 static inline int 319 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 320 { 321 if (poller1->next_run_tick < poller2->next_run_tick) { 322 return -1; 323 } else { 324 return 1; 325 } 326 } 327 328 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 329 330 static inline struct spdk_thread * 331 _get_thread(void) 332 { 333 return tls_thread; 334 } 335 336 static int 337 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 338 { 339 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 340 341 g_ctx_sz = ctx_sz; 342 343 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 344 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 345 sizeof(struct spdk_msg), 346 0, /* No cache. We do our own. */ 347 SPDK_ENV_SOCKET_ID_ANY); 348 349 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 350 msg_mempool_sz); 351 352 if (!g_spdk_msg_mempool) { 353 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 354 return -ENOMEM; 355 } 356 357 return 0; 358 } 359 360 static void thread_interrupt_destroy(struct spdk_thread *thread); 361 static int thread_interrupt_create(struct spdk_thread *thread); 362 363 static void 364 _free_thread(struct spdk_thread *thread) 365 { 366 struct spdk_io_channel *ch; 367 struct spdk_msg *msg; 368 struct spdk_poller *poller, *ptmp; 369 370 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 371 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 372 thread->name, ch->dev->name); 373 } 374 375 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 376 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 377 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 378 poller->name); 379 } 380 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 381 free(poller); 382 } 383 384 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 385 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 386 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 387 poller->name); 388 } 389 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 390 free(poller); 391 } 392 393 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 394 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 395 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 396 free(poller); 397 } 398 399 pthread_mutex_lock(&g_devlist_mutex); 400 assert(g_thread_count > 0); 401 g_thread_count--; 402 TAILQ_REMOVE(&g_threads, thread, tailq); 403 pthread_mutex_unlock(&g_devlist_mutex); 404 405 msg = SLIST_FIRST(&thread->msg_cache); 406 while (msg != NULL) { 407 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 408 409 assert(thread->msg_cache_count > 0); 410 thread->msg_cache_count--; 411 spdk_mempool_put(g_spdk_msg_mempool, msg); 412 413 msg = SLIST_FIRST(&thread->msg_cache); 414 } 415 416 assert(thread->msg_cache_count == 0); 417 418 if (spdk_interrupt_mode_is_enabled()) { 419 thread_interrupt_destroy(thread); 420 } 421 422 spdk_ring_free(thread->messages); 423 free(thread); 424 } 425 426 int 427 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 428 { 429 assert(g_new_thread_fn == NULL); 430 assert(g_thread_op_fn == NULL); 431 432 if (new_thread_fn == NULL) { 433 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 434 } else { 435 g_new_thread_fn = new_thread_fn; 436 } 437 438 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 439 } 440 441 int 442 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 443 spdk_thread_op_supported_fn thread_op_supported_fn, 444 size_t ctx_sz, size_t msg_mempool_sz) 445 { 446 assert(g_new_thread_fn == NULL); 447 assert(g_thread_op_fn == NULL); 448 assert(g_thread_op_supported_fn == NULL); 449 450 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 451 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 452 return -EINVAL; 453 } 454 455 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 456 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 457 } else { 458 g_thread_op_fn = thread_op_fn; 459 g_thread_op_supported_fn = thread_op_supported_fn; 460 } 461 462 return _thread_lib_init(ctx_sz, msg_mempool_sz); 463 } 464 465 void 466 spdk_thread_lib_fini(void) 467 { 468 struct io_device *dev; 469 470 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 471 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 472 } 473 474 g_new_thread_fn = NULL; 475 g_thread_op_fn = NULL; 476 g_thread_op_supported_fn = NULL; 477 g_ctx_sz = 0; 478 if (g_app_thread != NULL) { 479 _free_thread(g_app_thread); 480 g_app_thread = NULL; 481 } 482 483 if (g_spdk_msg_mempool) { 484 spdk_mempool_free(g_spdk_msg_mempool); 485 g_spdk_msg_mempool = NULL; 486 } 487 } 488 489 struct spdk_thread * 490 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask) 491 { 492 struct spdk_thread *thread, *null_thread; 493 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 494 int rc = 0, i; 495 496 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 497 if (!thread) { 498 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 499 return NULL; 500 } 501 502 if (cpumask) { 503 spdk_cpuset_copy(&thread->cpumask, cpumask); 504 } else { 505 spdk_cpuset_negate(&thread->cpumask); 506 } 507 508 RB_INIT(&thread->io_channels); 509 TAILQ_INIT(&thread->active_pollers); 510 RB_INIT(&thread->timed_pollers); 511 TAILQ_INIT(&thread->paused_pollers); 512 SLIST_INIT(&thread->msg_cache); 513 thread->msg_cache_count = 0; 514 515 thread->tsc_last = spdk_get_ticks(); 516 517 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 518 * ID exceeds UINT64_MAX a warning message is logged 519 */ 520 thread->next_poller_id = 1; 521 522 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 523 if (!thread->messages) { 524 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 525 free(thread); 526 return NULL; 527 } 528 529 /* Fill the local message pool cache. */ 530 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 531 if (rc == 0) { 532 /* If we can't populate the cache it's ok. The cache will get filled 533 * up organically as messages are passed to the thread. */ 534 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 535 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 536 thread->msg_cache_count++; 537 } 538 } 539 540 if (name) { 541 snprintf(thread->name, sizeof(thread->name), "%s", name); 542 } else { 543 snprintf(thread->name, sizeof(thread->name), "%p", thread); 544 } 545 546 pthread_mutex_lock(&g_devlist_mutex); 547 if (g_thread_id == 0) { 548 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 549 pthread_mutex_unlock(&g_devlist_mutex); 550 _free_thread(thread); 551 return NULL; 552 } 553 thread->id = g_thread_id++; 554 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 555 g_thread_count++; 556 pthread_mutex_unlock(&g_devlist_mutex); 557 558 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 559 thread->id, thread->name); 560 561 if (spdk_interrupt_mode_is_enabled()) { 562 thread->in_interrupt = true; 563 rc = thread_interrupt_create(thread); 564 if (rc != 0) { 565 _free_thread(thread); 566 return NULL; 567 } 568 } 569 570 if (g_new_thread_fn) { 571 rc = g_new_thread_fn(thread); 572 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 573 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 574 } 575 576 if (rc != 0) { 577 _free_thread(thread); 578 return NULL; 579 } 580 581 thread->state = SPDK_THREAD_STATE_RUNNING; 582 583 /* If this is the first thread, save it as the app thread. Use an atomic 584 * compare + exchange to guard against crazy users who might try to 585 * call spdk_thread_create() simultaneously on multiple threads. 586 */ 587 null_thread = NULL; 588 __atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false, 589 __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); 590 591 return thread; 592 } 593 594 struct spdk_thread * 595 spdk_thread_get_app_thread(void) 596 { 597 return g_app_thread; 598 } 599 600 bool 601 spdk_thread_is_app_thread(struct spdk_thread *thread) 602 { 603 if (thread == NULL) { 604 thread = _get_thread(); 605 } 606 607 return g_app_thread == thread; 608 } 609 610 void 611 spdk_thread_bind(struct spdk_thread *thread, bool bind) 612 { 613 thread->is_bound = bind; 614 } 615 616 bool 617 spdk_thread_is_bound(struct spdk_thread *thread) 618 { 619 return thread->is_bound; 620 } 621 622 void 623 spdk_set_thread(struct spdk_thread *thread) 624 { 625 tls_thread = thread; 626 } 627 628 static void 629 thread_exit(struct spdk_thread *thread, uint64_t now) 630 { 631 struct spdk_poller *poller; 632 struct spdk_io_channel *ch; 633 634 if (now >= thread->exit_timeout_tsc) { 635 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 636 thread->name); 637 goto exited; 638 } 639 640 if (spdk_ring_count(thread->messages) > 0) { 641 SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name); 642 return; 643 } 644 645 if (thread->for_each_count > 0) { 646 SPDK_INFOLOG(thread, "thread %s is still executing %u for_each_channels/threads\n", 647 thread->name, thread->for_each_count); 648 return; 649 } 650 651 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 652 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 653 SPDK_INFOLOG(thread, 654 "thread %s still has active poller %s\n", 655 thread->name, poller->name); 656 return; 657 } 658 } 659 660 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 661 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 662 SPDK_INFOLOG(thread, 663 "thread %s still has active timed poller %s\n", 664 thread->name, poller->name); 665 return; 666 } 667 } 668 669 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 670 SPDK_INFOLOG(thread, 671 "thread %s still has paused poller %s\n", 672 thread->name, poller->name); 673 return; 674 } 675 676 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 677 SPDK_INFOLOG(thread, 678 "thread %s still has channel for io_device %s\n", 679 thread->name, ch->dev->name); 680 return; 681 } 682 683 if (thread->pending_unregister_count > 0) { 684 SPDK_INFOLOG(thread, 685 "thread %s is still unregistering io_devices\n", 686 thread->name); 687 return; 688 } 689 690 exited: 691 thread->state = SPDK_THREAD_STATE_EXITED; 692 if (spdk_unlikely(thread->in_interrupt)) { 693 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 694 } 695 } 696 697 static void _thread_exit(void *ctx); 698 699 int 700 spdk_thread_exit(struct spdk_thread *thread) 701 { 702 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 703 704 assert(tls_thread == thread); 705 706 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 707 SPDK_INFOLOG(thread, 708 "thread %s is already exiting\n", 709 thread->name); 710 return 0; 711 } 712 713 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 714 SPDK_THREAD_EXIT_TIMEOUT_SEC); 715 thread->state = SPDK_THREAD_STATE_EXITING; 716 717 if (spdk_interrupt_mode_is_enabled()) { 718 spdk_thread_send_msg(thread, _thread_exit, thread); 719 } 720 721 return 0; 722 } 723 724 bool 725 spdk_thread_is_running(struct spdk_thread *thread) 726 { 727 return thread->state == SPDK_THREAD_STATE_RUNNING; 728 } 729 730 bool 731 spdk_thread_is_exited(struct spdk_thread *thread) 732 { 733 return thread->state == SPDK_THREAD_STATE_EXITED; 734 } 735 736 void 737 spdk_thread_destroy(struct spdk_thread *thread) 738 { 739 assert(thread != NULL); 740 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 741 742 assert(thread->state == SPDK_THREAD_STATE_EXITED); 743 744 if (tls_thread == thread) { 745 tls_thread = NULL; 746 } 747 748 /* To be safe, do not free the app thread until spdk_thread_lib_fini(). */ 749 if (thread != g_app_thread) { 750 _free_thread(thread); 751 } 752 } 753 754 void * 755 spdk_thread_get_ctx(struct spdk_thread *thread) 756 { 757 if (g_ctx_sz > 0) { 758 return thread->ctx; 759 } 760 761 return NULL; 762 } 763 764 struct spdk_cpuset * 765 spdk_thread_get_cpumask(struct spdk_thread *thread) 766 { 767 return &thread->cpumask; 768 } 769 770 int 771 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 772 { 773 struct spdk_thread *thread; 774 775 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 776 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 777 assert(false); 778 return -ENOTSUP; 779 } 780 781 thread = spdk_get_thread(); 782 if (!thread) { 783 SPDK_ERRLOG("Called from non-SPDK thread\n"); 784 assert(false); 785 return -EINVAL; 786 } 787 788 spdk_cpuset_copy(&thread->cpumask, cpumask); 789 790 /* Invoke framework's reschedule operation. If this function is called multiple times 791 * in a single spdk_thread_poll() context, the last cpumask will be used in the 792 * reschedule operation. 793 */ 794 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 795 796 return 0; 797 } 798 799 struct spdk_thread * 800 spdk_thread_get_from_ctx(void *ctx) 801 { 802 if (ctx == NULL) { 803 assert(false); 804 return NULL; 805 } 806 807 assert(g_ctx_sz > 0); 808 809 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 810 } 811 812 static inline uint32_t 813 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 814 { 815 unsigned count, i; 816 void *messages[SPDK_MSG_BATCH_SIZE]; 817 uint64_t notify = 1; 818 int rc; 819 820 #ifdef DEBUG 821 /* 822 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 823 * so we will never actually read uninitialized data from events, but just to be sure 824 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 825 */ 826 memset(messages, 0, sizeof(messages)); 827 #endif 828 829 if (max_msgs > 0) { 830 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 831 } else { 832 max_msgs = SPDK_MSG_BATCH_SIZE; 833 } 834 835 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 836 if (spdk_unlikely(thread->in_interrupt) && 837 spdk_ring_count(thread->messages) != 0) { 838 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 839 if (rc < 0) { 840 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 841 } 842 } 843 if (count == 0) { 844 return 0; 845 } 846 847 for (i = 0; i < count; i++) { 848 struct spdk_msg *msg = messages[i]; 849 850 assert(msg != NULL); 851 852 SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg); 853 854 msg->fn(msg->arg); 855 856 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 857 858 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 859 /* Insert the messages at the head. We want to re-use the hot 860 * ones. */ 861 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 862 thread->msg_cache_count++; 863 } else { 864 spdk_mempool_put(g_spdk_msg_mempool, msg); 865 } 866 } 867 868 return count; 869 } 870 871 static void 872 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 873 { 874 struct spdk_poller *tmp __attribute__((unused)); 875 876 poller->next_run_tick = now + poller->period_ticks; 877 878 /* 879 * Insert poller in the thread's timed_pollers tree by next scheduled run time 880 * as its key. 881 */ 882 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 883 assert(tmp == NULL); 884 885 /* Update the cache only if it is empty or the inserted poller is earlier than it. 886 * RB_MIN() is not necessary here because all pollers, which has exactly the same 887 * next_run_tick as the existing poller, are inserted on the right side. 888 */ 889 if (thread->first_timed_poller == NULL || 890 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 891 thread->first_timed_poller = poller; 892 } 893 } 894 895 static inline void 896 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 897 { 898 struct spdk_poller *tmp __attribute__((unused)); 899 900 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 901 assert(tmp != NULL); 902 903 /* This function is not used in any case that is performance critical. 904 * Update the cache simply by RB_MIN() if it needs to be changed. 905 */ 906 if (thread->first_timed_poller == poller) { 907 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 908 } 909 } 910 911 static void 912 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 913 { 914 if (poller->period_ticks) { 915 poller_insert_timer(thread, poller, spdk_get_ticks()); 916 } else { 917 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 918 } 919 } 920 921 static inline void 922 thread_update_stats(struct spdk_thread *thread, uint64_t end, 923 uint64_t start, int rc) 924 { 925 if (rc == 0) { 926 /* Poller status idle */ 927 thread->stats.idle_tsc += end - start; 928 } else if (rc > 0) { 929 /* Poller status busy */ 930 thread->stats.busy_tsc += end - start; 931 } 932 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 933 thread->tsc_last = end; 934 } 935 936 static inline int 937 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 938 { 939 int rc; 940 941 switch (poller->state) { 942 case SPDK_POLLER_STATE_UNREGISTERED: 943 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 944 free(poller); 945 return 0; 946 case SPDK_POLLER_STATE_PAUSING: 947 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 948 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 949 poller->state = SPDK_POLLER_STATE_PAUSED; 950 return 0; 951 case SPDK_POLLER_STATE_WAITING: 952 break; 953 default: 954 assert(false); 955 break; 956 } 957 958 poller->state = SPDK_POLLER_STATE_RUNNING; 959 rc = poller->fn(poller->arg); 960 961 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 962 963 poller->run_count++; 964 if (rc > 0) { 965 poller->busy_count++; 966 } 967 968 #ifdef DEBUG 969 if (rc == -1) { 970 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 971 } 972 #endif 973 974 switch (poller->state) { 975 case SPDK_POLLER_STATE_UNREGISTERED: 976 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 977 free(poller); 978 break; 979 case SPDK_POLLER_STATE_PAUSING: 980 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 981 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 982 poller->state = SPDK_POLLER_STATE_PAUSED; 983 break; 984 case SPDK_POLLER_STATE_PAUSED: 985 case SPDK_POLLER_STATE_WAITING: 986 break; 987 case SPDK_POLLER_STATE_RUNNING: 988 poller->state = SPDK_POLLER_STATE_WAITING; 989 break; 990 default: 991 assert(false); 992 break; 993 } 994 995 return rc; 996 } 997 998 static inline int 999 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 1000 uint64_t now) 1001 { 1002 int rc; 1003 1004 switch (poller->state) { 1005 case SPDK_POLLER_STATE_UNREGISTERED: 1006 free(poller); 1007 return 0; 1008 case SPDK_POLLER_STATE_PAUSING: 1009 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1010 poller->state = SPDK_POLLER_STATE_PAUSED; 1011 return 0; 1012 case SPDK_POLLER_STATE_WAITING: 1013 break; 1014 default: 1015 assert(false); 1016 break; 1017 } 1018 1019 poller->state = SPDK_POLLER_STATE_RUNNING; 1020 rc = poller->fn(poller->arg); 1021 1022 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 1023 1024 poller->run_count++; 1025 if (rc > 0) { 1026 poller->busy_count++; 1027 } 1028 1029 #ifdef DEBUG 1030 if (rc == -1) { 1031 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 1032 } 1033 #endif 1034 1035 switch (poller->state) { 1036 case SPDK_POLLER_STATE_UNREGISTERED: 1037 free(poller); 1038 break; 1039 case SPDK_POLLER_STATE_PAUSING: 1040 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1041 poller->state = SPDK_POLLER_STATE_PAUSED; 1042 break; 1043 case SPDK_POLLER_STATE_PAUSED: 1044 break; 1045 case SPDK_POLLER_STATE_RUNNING: 1046 poller->state = SPDK_POLLER_STATE_WAITING; 1047 /* fallthrough */ 1048 case SPDK_POLLER_STATE_WAITING: 1049 poller_insert_timer(thread, poller, now); 1050 break; 1051 default: 1052 assert(false); 1053 break; 1054 } 1055 1056 return rc; 1057 } 1058 1059 static int 1060 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1061 { 1062 uint32_t msg_count; 1063 struct spdk_poller *poller, *tmp; 1064 spdk_msg_fn critical_msg; 1065 int rc = 0; 1066 1067 thread->tsc_last = now; 1068 1069 critical_msg = thread->critical_msg; 1070 if (spdk_unlikely(critical_msg != NULL)) { 1071 critical_msg(NULL); 1072 thread->critical_msg = NULL; 1073 rc = 1; 1074 } 1075 1076 msg_count = msg_queue_run_batch(thread, max_msgs); 1077 if (msg_count) { 1078 rc = 1; 1079 } 1080 1081 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1082 active_pollers_head, tailq, tmp) { 1083 int poller_rc; 1084 1085 poller_rc = thread_execute_poller(thread, poller); 1086 if (poller_rc > rc) { 1087 rc = poller_rc; 1088 } 1089 } 1090 1091 poller = thread->first_timed_poller; 1092 while (poller != NULL) { 1093 int timer_rc = 0; 1094 1095 if (now < poller->next_run_tick) { 1096 break; 1097 } 1098 1099 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 1100 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 1101 1102 /* Update the cache to the next timed poller in the list 1103 * only if the current poller is still the closest, otherwise, 1104 * do nothing because the cache has been already updated. 1105 */ 1106 if (thread->first_timed_poller == poller) { 1107 thread->first_timed_poller = tmp; 1108 } 1109 1110 timer_rc = thread_execute_timed_poller(thread, poller, now); 1111 if (timer_rc > rc) { 1112 rc = timer_rc; 1113 } 1114 1115 poller = tmp; 1116 } 1117 1118 return rc; 1119 } 1120 1121 static void 1122 _thread_remove_pollers(void *ctx) 1123 { 1124 struct spdk_thread *thread = ctx; 1125 struct spdk_poller *poller, *tmp; 1126 1127 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1128 active_pollers_head, tailq, tmp) { 1129 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1130 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1131 free(poller); 1132 } 1133 } 1134 1135 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1136 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1137 poller_remove_timer(thread, poller); 1138 free(poller); 1139 } 1140 } 1141 1142 thread->poller_unregistered = false; 1143 } 1144 1145 static void 1146 _thread_exit(void *ctx) 1147 { 1148 struct spdk_thread *thread = ctx; 1149 1150 assert(thread->state == SPDK_THREAD_STATE_EXITING); 1151 1152 thread_exit(thread, spdk_get_ticks()); 1153 1154 if (thread->state != SPDK_THREAD_STATE_EXITED) { 1155 spdk_thread_send_msg(thread, _thread_exit, thread); 1156 } 1157 } 1158 1159 int 1160 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1161 { 1162 struct spdk_thread *orig_thread; 1163 int rc; 1164 1165 orig_thread = _get_thread(); 1166 tls_thread = thread; 1167 1168 if (now == 0) { 1169 now = spdk_get_ticks(); 1170 } 1171 1172 if (spdk_likely(!thread->in_interrupt)) { 1173 rc = thread_poll(thread, max_msgs, now); 1174 if (spdk_unlikely(thread->in_interrupt)) { 1175 /* The thread transitioned to interrupt mode during the above poll. 1176 * Poll it one more time in case that during the transition time 1177 * there is msg received without notification. 1178 */ 1179 rc = thread_poll(thread, max_msgs, now); 1180 } 1181 1182 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1183 thread_exit(thread, now); 1184 } 1185 } else { 1186 /* Non-block wait on thread's fd_group */ 1187 rc = spdk_fd_group_wait(thread->fgrp, 0); 1188 } 1189 1190 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1191 1192 tls_thread = orig_thread; 1193 1194 return rc; 1195 } 1196 1197 uint64_t 1198 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1199 { 1200 struct spdk_poller *poller; 1201 1202 poller = thread->first_timed_poller; 1203 if (poller) { 1204 return poller->next_run_tick; 1205 } 1206 1207 return 0; 1208 } 1209 1210 int 1211 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1212 { 1213 return !TAILQ_EMPTY(&thread->active_pollers); 1214 } 1215 1216 static bool 1217 thread_has_unpaused_pollers(struct spdk_thread *thread) 1218 { 1219 if (TAILQ_EMPTY(&thread->active_pollers) && 1220 RB_EMPTY(&thread->timed_pollers)) { 1221 return false; 1222 } 1223 1224 return true; 1225 } 1226 1227 bool 1228 spdk_thread_has_pollers(struct spdk_thread *thread) 1229 { 1230 if (!thread_has_unpaused_pollers(thread) && 1231 TAILQ_EMPTY(&thread->paused_pollers)) { 1232 return false; 1233 } 1234 1235 return true; 1236 } 1237 1238 bool 1239 spdk_thread_is_idle(struct spdk_thread *thread) 1240 { 1241 if (spdk_ring_count(thread->messages) || 1242 thread_has_unpaused_pollers(thread) || 1243 thread->critical_msg != NULL) { 1244 return false; 1245 } 1246 1247 return true; 1248 } 1249 1250 uint32_t 1251 spdk_thread_get_count(void) 1252 { 1253 /* 1254 * Return cached value of the current thread count. We could acquire the 1255 * lock and iterate through the TAILQ of threads to count them, but that 1256 * count could still be invalidated after we release the lock. 1257 */ 1258 return g_thread_count; 1259 } 1260 1261 struct spdk_thread * 1262 spdk_get_thread(void) 1263 { 1264 return _get_thread(); 1265 } 1266 1267 const char * 1268 spdk_thread_get_name(const struct spdk_thread *thread) 1269 { 1270 return thread->name; 1271 } 1272 1273 uint64_t 1274 spdk_thread_get_id(const struct spdk_thread *thread) 1275 { 1276 return thread->id; 1277 } 1278 1279 struct spdk_thread * 1280 spdk_thread_get_by_id(uint64_t id) 1281 { 1282 struct spdk_thread *thread; 1283 1284 if (id == 0 || id >= g_thread_id) { 1285 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1286 return NULL; 1287 } 1288 pthread_mutex_lock(&g_devlist_mutex); 1289 TAILQ_FOREACH(thread, &g_threads, tailq) { 1290 if (thread->id == id) { 1291 break; 1292 } 1293 } 1294 pthread_mutex_unlock(&g_devlist_mutex); 1295 return thread; 1296 } 1297 1298 int 1299 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1300 { 1301 struct spdk_thread *thread; 1302 1303 thread = _get_thread(); 1304 if (!thread) { 1305 SPDK_ERRLOG("No thread allocated\n"); 1306 return -EINVAL; 1307 } 1308 1309 if (stats == NULL) { 1310 return -EINVAL; 1311 } 1312 1313 *stats = thread->stats; 1314 1315 return 0; 1316 } 1317 1318 uint64_t 1319 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1320 { 1321 if (thread == NULL) { 1322 thread = _get_thread(); 1323 } 1324 1325 return thread->tsc_last; 1326 } 1327 1328 static inline int 1329 thread_send_msg_notification(const struct spdk_thread *target_thread) 1330 { 1331 uint64_t notify = 1; 1332 int rc; 1333 1334 /* Not necessary to do notification if interrupt facility is not enabled */ 1335 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1336 return 0; 1337 } 1338 1339 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1340 * after sending thread msg, it is necessary to check whether target thread runs in 1341 * interrupt mode and then decide whether do event notification. 1342 */ 1343 if (spdk_unlikely(target_thread->in_interrupt)) { 1344 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1345 if (rc < 0) { 1346 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1347 return -EIO; 1348 } 1349 } 1350 1351 return 0; 1352 } 1353 1354 int 1355 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1356 { 1357 struct spdk_thread *local_thread; 1358 struct spdk_msg *msg; 1359 int rc; 1360 1361 assert(thread != NULL); 1362 1363 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1364 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1365 return -EIO; 1366 } 1367 1368 local_thread = _get_thread(); 1369 1370 msg = NULL; 1371 if (local_thread != NULL) { 1372 if (local_thread->msg_cache_count > 0) { 1373 msg = SLIST_FIRST(&local_thread->msg_cache); 1374 assert(msg != NULL); 1375 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1376 local_thread->msg_cache_count--; 1377 } 1378 } 1379 1380 if (msg == NULL) { 1381 msg = spdk_mempool_get(g_spdk_msg_mempool); 1382 if (!msg) { 1383 SPDK_ERRLOG("msg could not be allocated\n"); 1384 return -ENOMEM; 1385 } 1386 } 1387 1388 msg->fn = fn; 1389 msg->arg = ctx; 1390 1391 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1392 if (rc != 1) { 1393 SPDK_ERRLOG("msg could not be enqueued\n"); 1394 spdk_mempool_put(g_spdk_msg_mempool, msg); 1395 return -EIO; 1396 } 1397 1398 return thread_send_msg_notification(thread); 1399 } 1400 1401 int 1402 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1403 { 1404 spdk_msg_fn expected = NULL; 1405 1406 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1407 __ATOMIC_SEQ_CST)) { 1408 return -EIO; 1409 } 1410 1411 return thread_send_msg_notification(thread); 1412 } 1413 1414 #ifdef __linux__ 1415 static int 1416 interrupt_timerfd_process(void *arg) 1417 { 1418 struct spdk_poller *poller = arg; 1419 uint64_t exp; 1420 int rc; 1421 1422 /* clear the level of interval timer */ 1423 rc = read(poller->intr->efd, &exp, sizeof(exp)); 1424 if (rc < 0) { 1425 if (rc == -EAGAIN) { 1426 return 0; 1427 } 1428 1429 return rc; 1430 } 1431 1432 SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg); 1433 1434 return poller->fn(poller->arg); 1435 } 1436 1437 static int 1438 period_poller_interrupt_init(struct spdk_poller *poller) 1439 { 1440 int timerfd; 1441 1442 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1443 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1444 if (timerfd < 0) { 1445 return -errno; 1446 } 1447 1448 poller->intr = spdk_interrupt_register(timerfd, interrupt_timerfd_process, poller, poller->name); 1449 if (poller->intr == NULL) { 1450 close(timerfd); 1451 return -1; 1452 } 1453 1454 return 0; 1455 } 1456 1457 static void 1458 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1459 { 1460 int timerfd; 1461 uint64_t now_tick = spdk_get_ticks(); 1462 uint64_t ticks = spdk_get_ticks_hz(); 1463 int ret; 1464 struct itimerspec new_tv = {}; 1465 struct itimerspec old_tv = {}; 1466 1467 assert(poller->intr != NULL); 1468 assert(poller->period_ticks != 0); 1469 1470 timerfd = poller->intr->efd; 1471 1472 assert(timerfd >= 0); 1473 1474 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1475 interrupt_mode ? "interrupt" : "poll"); 1476 1477 if (interrupt_mode) { 1478 /* Set repeated timer expiration */ 1479 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1480 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1481 1482 /* Update next timer expiration */ 1483 if (poller->next_run_tick == 0) { 1484 poller->next_run_tick = now_tick + poller->period_ticks; 1485 } else if (poller->next_run_tick < now_tick) { 1486 poller->next_run_tick = now_tick; 1487 } 1488 1489 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1490 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1491 1492 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1493 if (ret < 0) { 1494 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1495 assert(false); 1496 } 1497 } else { 1498 /* Disarm the timer */ 1499 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1500 if (ret < 0) { 1501 /* timerfd_settime's failure indicates that the timerfd is in error */ 1502 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1503 assert(false); 1504 } 1505 1506 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1507 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1508 */ 1509 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1510 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1511 poller_remove_timer(poller->thread, poller); 1512 poller_insert_timer(poller->thread, poller, now_tick); 1513 } 1514 } 1515 1516 static void 1517 poller_interrupt_fini(struct spdk_poller *poller) 1518 { 1519 int fd; 1520 1521 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1522 assert(poller->intr != NULL); 1523 fd = poller->intr->efd; 1524 spdk_interrupt_unregister(&poller->intr); 1525 close(fd); 1526 } 1527 1528 static int 1529 busy_poller_interrupt_init(struct spdk_poller *poller) 1530 { 1531 int busy_efd; 1532 1533 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1534 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1535 if (busy_efd < 0) { 1536 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1537 return -errno; 1538 } 1539 1540 poller->intr = spdk_interrupt_register(busy_efd, poller->fn, poller->arg, poller->name); 1541 if (poller->intr == NULL) { 1542 close(busy_efd); 1543 return -1; 1544 } 1545 1546 return 0; 1547 } 1548 1549 static void 1550 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1551 { 1552 int busy_efd = poller->intr->efd; 1553 uint64_t notify = 1; 1554 int rc __attribute__((unused)); 1555 1556 assert(busy_efd >= 0); 1557 1558 if (interrupt_mode) { 1559 /* Write without read on eventfd will get it repeatedly triggered. */ 1560 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1561 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1562 } 1563 } else { 1564 /* Read on eventfd will clear its level triggering. */ 1565 rc = read(busy_efd, ¬ify, sizeof(notify)); 1566 } 1567 } 1568 1569 #else 1570 1571 static int 1572 period_poller_interrupt_init(struct spdk_poller *poller) 1573 { 1574 return -ENOTSUP; 1575 } 1576 1577 static void 1578 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1579 { 1580 } 1581 1582 static void 1583 poller_interrupt_fini(struct spdk_poller *poller) 1584 { 1585 } 1586 1587 static int 1588 busy_poller_interrupt_init(struct spdk_poller *poller) 1589 { 1590 return -ENOTSUP; 1591 } 1592 1593 static void 1594 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1595 { 1596 } 1597 1598 #endif 1599 1600 void 1601 spdk_poller_register_interrupt(struct spdk_poller *poller, 1602 spdk_poller_set_interrupt_mode_cb cb_fn, 1603 void *cb_arg) 1604 { 1605 assert(poller != NULL); 1606 assert(cb_fn != NULL); 1607 assert(spdk_get_thread() == poller->thread); 1608 1609 if (!spdk_interrupt_mode_is_enabled()) { 1610 return; 1611 } 1612 1613 /* If this poller already had an interrupt, clean the old one up. */ 1614 if (poller->intr != NULL) { 1615 poller_interrupt_fini(poller); 1616 } 1617 1618 poller->set_intr_cb_fn = cb_fn; 1619 poller->set_intr_cb_arg = cb_arg; 1620 1621 /* Set poller into interrupt mode if thread is in interrupt. */ 1622 if (poller->thread->in_interrupt) { 1623 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1624 } 1625 } 1626 1627 static uint64_t 1628 convert_us_to_ticks(uint64_t us) 1629 { 1630 uint64_t quotient, remainder, ticks; 1631 1632 if (us) { 1633 quotient = us / SPDK_SEC_TO_USEC; 1634 remainder = us % SPDK_SEC_TO_USEC; 1635 ticks = spdk_get_ticks_hz(); 1636 1637 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1638 } else { 1639 return 0; 1640 } 1641 } 1642 1643 static struct spdk_poller * 1644 poller_register(spdk_poller_fn fn, 1645 void *arg, 1646 uint64_t period_microseconds, 1647 const char *name) 1648 { 1649 struct spdk_thread *thread; 1650 struct spdk_poller *poller; 1651 1652 thread = spdk_get_thread(); 1653 if (!thread) { 1654 assert(false); 1655 return NULL; 1656 } 1657 1658 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1659 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1660 return NULL; 1661 } 1662 1663 poller = calloc(1, sizeof(*poller)); 1664 if (poller == NULL) { 1665 SPDK_ERRLOG("Poller memory allocation failed\n"); 1666 return NULL; 1667 } 1668 1669 if (name) { 1670 snprintf(poller->name, sizeof(poller->name), "%s", name); 1671 } else { 1672 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1673 } 1674 1675 poller->state = SPDK_POLLER_STATE_WAITING; 1676 poller->fn = fn; 1677 poller->arg = arg; 1678 poller->thread = thread; 1679 poller->intr = NULL; 1680 if (thread->next_poller_id == 0) { 1681 SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n"); 1682 thread->next_poller_id = 1; 1683 } 1684 poller->id = thread->next_poller_id++; 1685 1686 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1687 1688 if (spdk_interrupt_mode_is_enabled()) { 1689 int rc; 1690 1691 if (period_microseconds) { 1692 rc = period_poller_interrupt_init(poller); 1693 if (rc < 0) { 1694 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1695 free(poller); 1696 return NULL; 1697 } 1698 1699 poller->set_intr_cb_fn = period_poller_set_interrupt_mode; 1700 poller->set_intr_cb_arg = NULL; 1701 1702 } else { 1703 /* If the poller doesn't have a period, create interruptfd that's always 1704 * busy automatically when running in interrupt mode. 1705 */ 1706 rc = busy_poller_interrupt_init(poller); 1707 if (rc > 0) { 1708 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1709 free(poller); 1710 return NULL; 1711 } 1712 1713 poller->set_intr_cb_fn = busy_poller_set_interrupt_mode; 1714 poller->set_intr_cb_arg = NULL; 1715 } 1716 1717 /* Set poller into interrupt mode if thread is in interrupt. */ 1718 if (poller->thread->in_interrupt) { 1719 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1720 } 1721 } 1722 1723 thread_insert_poller(thread, poller); 1724 1725 return poller; 1726 } 1727 1728 struct spdk_poller * 1729 spdk_poller_register(spdk_poller_fn fn, 1730 void *arg, 1731 uint64_t period_microseconds) 1732 { 1733 return poller_register(fn, arg, period_microseconds, NULL); 1734 } 1735 1736 struct spdk_poller * 1737 spdk_poller_register_named(spdk_poller_fn fn, 1738 void *arg, 1739 uint64_t period_microseconds, 1740 const char *name) 1741 { 1742 return poller_register(fn, arg, period_microseconds, name); 1743 } 1744 1745 static void 1746 wrong_thread(const char *func, const char *name, struct spdk_thread *thread, 1747 struct spdk_thread *curthread) 1748 { 1749 if (thread == NULL) { 1750 SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name); 1751 abort(); 1752 } 1753 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be " 1754 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id, 1755 thread->name, thread->id); 1756 assert(false); 1757 } 1758 1759 void 1760 spdk_poller_unregister(struct spdk_poller **ppoller) 1761 { 1762 struct spdk_thread *thread; 1763 struct spdk_poller *poller; 1764 1765 poller = *ppoller; 1766 if (poller == NULL) { 1767 return; 1768 } 1769 1770 *ppoller = NULL; 1771 1772 thread = spdk_get_thread(); 1773 if (!thread) { 1774 assert(false); 1775 return; 1776 } 1777 1778 if (poller->thread != thread) { 1779 wrong_thread(__func__, poller->name, poller->thread, thread); 1780 return; 1781 } 1782 1783 if (spdk_interrupt_mode_is_enabled()) { 1784 /* Release the interrupt resource for period or busy poller */ 1785 if (poller->intr != NULL) { 1786 poller_interrupt_fini(poller); 1787 } 1788 1789 /* If there is not already a pending poller removal, generate 1790 * a message to go process removals. */ 1791 if (!thread->poller_unregistered) { 1792 thread->poller_unregistered = true; 1793 spdk_thread_send_msg(thread, _thread_remove_pollers, thread); 1794 } 1795 } 1796 1797 /* If the poller was paused, put it on the active_pollers list so that 1798 * its unregistration can be processed by spdk_thread_poll(). 1799 */ 1800 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1801 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1802 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1803 poller->period_ticks = 0; 1804 } 1805 1806 /* Simply set the state to unregistered. The poller will get cleaned up 1807 * in a subsequent call to spdk_thread_poll(). 1808 */ 1809 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1810 } 1811 1812 void 1813 spdk_poller_pause(struct spdk_poller *poller) 1814 { 1815 struct spdk_thread *thread; 1816 1817 thread = spdk_get_thread(); 1818 if (!thread) { 1819 assert(false); 1820 return; 1821 } 1822 1823 if (poller->thread != thread) { 1824 wrong_thread(__func__, poller->name, poller->thread, thread); 1825 return; 1826 } 1827 1828 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1829 * spdk_thread_poll() move it. It allows a poller to be paused from 1830 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1831 * iteration, or from within itself without breaking the logic to always 1832 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1833 */ 1834 switch (poller->state) { 1835 case SPDK_POLLER_STATE_PAUSED: 1836 case SPDK_POLLER_STATE_PAUSING: 1837 break; 1838 case SPDK_POLLER_STATE_RUNNING: 1839 case SPDK_POLLER_STATE_WAITING: 1840 poller->state = SPDK_POLLER_STATE_PAUSING; 1841 break; 1842 default: 1843 assert(false); 1844 break; 1845 } 1846 } 1847 1848 void 1849 spdk_poller_resume(struct spdk_poller *poller) 1850 { 1851 struct spdk_thread *thread; 1852 1853 thread = spdk_get_thread(); 1854 if (!thread) { 1855 assert(false); 1856 return; 1857 } 1858 1859 if (poller->thread != thread) { 1860 wrong_thread(__func__, poller->name, poller->thread, thread); 1861 return; 1862 } 1863 1864 /* If a poller is paused it has to be removed from the paused pollers 1865 * list and put on the active list or timer tree depending on its 1866 * period_ticks. If a poller is still in the process of being paused, 1867 * we just need to flip its state back to waiting, as it's already on 1868 * the appropriate list or tree. 1869 */ 1870 switch (poller->state) { 1871 case SPDK_POLLER_STATE_PAUSED: 1872 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1873 thread_insert_poller(thread, poller); 1874 /* fallthrough */ 1875 case SPDK_POLLER_STATE_PAUSING: 1876 poller->state = SPDK_POLLER_STATE_WAITING; 1877 break; 1878 case SPDK_POLLER_STATE_RUNNING: 1879 case SPDK_POLLER_STATE_WAITING: 1880 break; 1881 default: 1882 assert(false); 1883 break; 1884 } 1885 } 1886 1887 const char * 1888 spdk_poller_get_name(struct spdk_poller *poller) 1889 { 1890 return poller->name; 1891 } 1892 1893 uint64_t 1894 spdk_poller_get_id(struct spdk_poller *poller) 1895 { 1896 return poller->id; 1897 } 1898 1899 const char * 1900 spdk_poller_get_state_str(struct spdk_poller *poller) 1901 { 1902 switch (poller->state) { 1903 case SPDK_POLLER_STATE_WAITING: 1904 return "waiting"; 1905 case SPDK_POLLER_STATE_RUNNING: 1906 return "running"; 1907 case SPDK_POLLER_STATE_UNREGISTERED: 1908 return "unregistered"; 1909 case SPDK_POLLER_STATE_PAUSING: 1910 return "pausing"; 1911 case SPDK_POLLER_STATE_PAUSED: 1912 return "paused"; 1913 default: 1914 return NULL; 1915 } 1916 } 1917 1918 uint64_t 1919 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1920 { 1921 return poller->period_ticks; 1922 } 1923 1924 void 1925 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1926 { 1927 stats->run_count = poller->run_count; 1928 stats->busy_count = poller->busy_count; 1929 } 1930 1931 struct spdk_poller * 1932 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1933 { 1934 return TAILQ_FIRST(&thread->active_pollers); 1935 } 1936 1937 struct spdk_poller * 1938 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1939 { 1940 return TAILQ_NEXT(prev, tailq); 1941 } 1942 1943 struct spdk_poller * 1944 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1945 { 1946 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1947 } 1948 1949 struct spdk_poller * 1950 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1951 { 1952 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1953 } 1954 1955 struct spdk_poller * 1956 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1957 { 1958 return TAILQ_FIRST(&thread->paused_pollers); 1959 } 1960 1961 struct spdk_poller * 1962 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1963 { 1964 return TAILQ_NEXT(prev, tailq); 1965 } 1966 1967 struct spdk_io_channel * 1968 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1969 { 1970 return RB_MIN(io_channel_tree, &thread->io_channels); 1971 } 1972 1973 struct spdk_io_channel * 1974 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1975 { 1976 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1977 } 1978 1979 struct call_thread { 1980 struct spdk_thread *cur_thread; 1981 spdk_msg_fn fn; 1982 void *ctx; 1983 1984 struct spdk_thread *orig_thread; 1985 spdk_msg_fn cpl; 1986 }; 1987 1988 static void 1989 _back_to_orig_thread(void *ctx) 1990 { 1991 struct call_thread *ct = ctx; 1992 1993 assert(ct->orig_thread->for_each_count > 0); 1994 ct->orig_thread->for_each_count--; 1995 1996 ct->cpl(ct->ctx); 1997 free(ctx); 1998 } 1999 2000 static void 2001 _on_thread(void *ctx) 2002 { 2003 struct call_thread *ct = ctx; 2004 int rc __attribute__((unused)); 2005 2006 ct->fn(ct->ctx); 2007 2008 pthread_mutex_lock(&g_devlist_mutex); 2009 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 2010 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 2011 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 2012 ct->cur_thread->name); 2013 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 2014 } 2015 pthread_mutex_unlock(&g_devlist_mutex); 2016 2017 if (!ct->cur_thread) { 2018 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 2019 2020 rc = spdk_thread_send_msg(ct->orig_thread, _back_to_orig_thread, ctx); 2021 } else { 2022 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 2023 ct->cur_thread->name); 2024 2025 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 2026 } 2027 assert(rc == 0); 2028 } 2029 2030 void 2031 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 2032 { 2033 struct call_thread *ct; 2034 struct spdk_thread *thread; 2035 int rc __attribute__((unused)); 2036 2037 ct = calloc(1, sizeof(*ct)); 2038 if (!ct) { 2039 SPDK_ERRLOG("Unable to perform thread iteration\n"); 2040 cpl(ctx); 2041 return; 2042 } 2043 2044 ct->fn = fn; 2045 ct->ctx = ctx; 2046 ct->cpl = cpl; 2047 2048 thread = _get_thread(); 2049 if (!thread) { 2050 SPDK_ERRLOG("No thread allocated\n"); 2051 free(ct); 2052 cpl(ctx); 2053 return; 2054 } 2055 ct->orig_thread = thread; 2056 2057 ct->orig_thread->for_each_count++; 2058 2059 pthread_mutex_lock(&g_devlist_mutex); 2060 ct->cur_thread = TAILQ_FIRST(&g_threads); 2061 pthread_mutex_unlock(&g_devlist_mutex); 2062 2063 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 2064 ct->orig_thread->name); 2065 2066 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 2067 assert(rc == 0); 2068 } 2069 2070 static inline void 2071 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 2072 { 2073 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 2074 return; 2075 } 2076 2077 if (!poller->set_intr_cb_fn) { 2078 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 2079 assert(false); 2080 return; 2081 } 2082 2083 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 2084 } 2085 2086 void 2087 spdk_thread_set_interrupt_mode(bool enable_interrupt) 2088 { 2089 struct spdk_thread *thread = _get_thread(); 2090 struct spdk_poller *poller, *tmp; 2091 2092 assert(thread); 2093 assert(spdk_interrupt_mode_is_enabled()); 2094 2095 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 2096 thread->name, enable_interrupt ? "intr" : "poll", 2097 thread->in_interrupt ? "intr" : "poll"); 2098 2099 if (thread->in_interrupt == enable_interrupt) { 2100 return; 2101 } 2102 2103 /* Set pollers to expected mode */ 2104 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 2105 poller_set_interrupt_mode(poller, enable_interrupt); 2106 } 2107 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 2108 poller_set_interrupt_mode(poller, enable_interrupt); 2109 } 2110 /* All paused pollers will go to work in interrupt mode */ 2111 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 2112 poller_set_interrupt_mode(poller, enable_interrupt); 2113 } 2114 2115 thread->in_interrupt = enable_interrupt; 2116 return; 2117 } 2118 2119 static struct io_device * 2120 io_device_get(void *io_device) 2121 { 2122 struct io_device find = {}; 2123 2124 find.io_device = io_device; 2125 return RB_FIND(io_device_tree, &g_io_devices, &find); 2126 } 2127 2128 void 2129 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 2130 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 2131 const char *name) 2132 { 2133 struct io_device *dev, *tmp; 2134 struct spdk_thread *thread; 2135 2136 assert(io_device != NULL); 2137 assert(create_cb != NULL); 2138 assert(destroy_cb != NULL); 2139 2140 thread = spdk_get_thread(); 2141 if (!thread) { 2142 SPDK_ERRLOG("called from non-SPDK thread\n"); 2143 assert(false); 2144 return; 2145 } 2146 2147 dev = calloc(1, sizeof(struct io_device)); 2148 if (dev == NULL) { 2149 SPDK_ERRLOG("could not allocate io_device\n"); 2150 return; 2151 } 2152 2153 dev->io_device = io_device; 2154 if (name) { 2155 snprintf(dev->name, sizeof(dev->name), "%s", name); 2156 } else { 2157 snprintf(dev->name, sizeof(dev->name), "%p", dev); 2158 } 2159 dev->create_cb = create_cb; 2160 dev->destroy_cb = destroy_cb; 2161 dev->unregister_cb = NULL; 2162 dev->ctx_size = ctx_size; 2163 dev->for_each_count = 0; 2164 dev->unregistered = false; 2165 dev->refcnt = 0; 2166 2167 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 2168 dev->name, dev->io_device, thread->name); 2169 2170 pthread_mutex_lock(&g_devlist_mutex); 2171 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 2172 if (tmp != NULL) { 2173 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 2174 io_device, tmp->name, dev->name); 2175 free(dev); 2176 } 2177 2178 pthread_mutex_unlock(&g_devlist_mutex); 2179 } 2180 2181 static void 2182 _finish_unregister(void *arg) 2183 { 2184 struct io_device *dev = arg; 2185 struct spdk_thread *thread; 2186 2187 thread = spdk_get_thread(); 2188 assert(thread == dev->unregister_thread); 2189 2190 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2191 dev->name, dev->io_device, thread->name); 2192 2193 assert(thread->pending_unregister_count > 0); 2194 thread->pending_unregister_count--; 2195 2196 dev->unregister_cb(dev->io_device); 2197 free(dev); 2198 } 2199 2200 static void 2201 io_device_free(struct io_device *dev) 2202 { 2203 int rc __attribute__((unused)); 2204 2205 if (dev->unregister_cb == NULL) { 2206 free(dev); 2207 } else { 2208 assert(dev->unregister_thread != NULL); 2209 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2210 dev->name, dev->io_device, dev->unregister_thread->name); 2211 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2212 assert(rc == 0); 2213 } 2214 } 2215 2216 void 2217 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2218 { 2219 struct io_device *dev; 2220 uint32_t refcnt; 2221 struct spdk_thread *thread; 2222 2223 thread = spdk_get_thread(); 2224 if (!thread) { 2225 SPDK_ERRLOG("called from non-SPDK thread\n"); 2226 assert(false); 2227 return; 2228 } 2229 2230 pthread_mutex_lock(&g_devlist_mutex); 2231 dev = io_device_get(io_device); 2232 if (!dev) { 2233 SPDK_ERRLOG("io_device %p not found\n", io_device); 2234 assert(false); 2235 pthread_mutex_unlock(&g_devlist_mutex); 2236 return; 2237 } 2238 2239 /* The for_each_count check differentiates the user attempting to unregister the 2240 * device a second time, from the internal call to this function that occurs 2241 * after the for_each_count reaches 0. 2242 */ 2243 if (dev->pending_unregister && dev->for_each_count > 0) { 2244 SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device); 2245 assert(false); 2246 pthread_mutex_unlock(&g_devlist_mutex); 2247 return; 2248 } 2249 2250 dev->unregister_cb = unregister_cb; 2251 dev->unregister_thread = thread; 2252 2253 if (dev->for_each_count > 0) { 2254 SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2255 dev->name, io_device, dev->for_each_count); 2256 dev->pending_unregister = true; 2257 pthread_mutex_unlock(&g_devlist_mutex); 2258 return; 2259 } 2260 2261 dev->unregistered = true; 2262 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2263 refcnt = dev->refcnt; 2264 pthread_mutex_unlock(&g_devlist_mutex); 2265 2266 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2267 dev->name, dev->io_device, thread->name); 2268 2269 if (unregister_cb) { 2270 thread->pending_unregister_count++; 2271 } 2272 2273 if (refcnt > 0) { 2274 /* defer deletion */ 2275 return; 2276 } 2277 2278 io_device_free(dev); 2279 } 2280 2281 const char * 2282 spdk_io_device_get_name(struct io_device *dev) 2283 { 2284 return dev->name; 2285 } 2286 2287 static struct spdk_io_channel * 2288 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2289 { 2290 struct spdk_io_channel find = {}; 2291 2292 find.dev = dev; 2293 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2294 } 2295 2296 struct spdk_io_channel * 2297 spdk_get_io_channel(void *io_device) 2298 { 2299 struct spdk_io_channel *ch; 2300 struct spdk_thread *thread; 2301 struct io_device *dev; 2302 int rc; 2303 2304 pthread_mutex_lock(&g_devlist_mutex); 2305 dev = io_device_get(io_device); 2306 if (dev == NULL) { 2307 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2308 pthread_mutex_unlock(&g_devlist_mutex); 2309 return NULL; 2310 } 2311 2312 thread = _get_thread(); 2313 if (!thread) { 2314 SPDK_ERRLOG("No thread allocated\n"); 2315 pthread_mutex_unlock(&g_devlist_mutex); 2316 return NULL; 2317 } 2318 2319 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2320 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2321 pthread_mutex_unlock(&g_devlist_mutex); 2322 return NULL; 2323 } 2324 2325 ch = thread_get_io_channel(thread, dev); 2326 if (ch != NULL) { 2327 ch->ref++; 2328 2329 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2330 ch, dev->name, dev->io_device, thread->name, ch->ref); 2331 2332 /* 2333 * An I/O channel already exists for this device on this 2334 * thread, so return it. 2335 */ 2336 pthread_mutex_unlock(&g_devlist_mutex); 2337 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2338 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2339 return ch; 2340 } 2341 2342 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2343 if (ch == NULL) { 2344 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2345 pthread_mutex_unlock(&g_devlist_mutex); 2346 return NULL; 2347 } 2348 2349 ch->dev = dev; 2350 ch->destroy_cb = dev->destroy_cb; 2351 ch->thread = thread; 2352 ch->ref = 1; 2353 ch->destroy_ref = 0; 2354 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2355 2356 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2357 ch, dev->name, dev->io_device, thread->name, ch->ref); 2358 2359 dev->refcnt++; 2360 2361 pthread_mutex_unlock(&g_devlist_mutex); 2362 2363 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2364 if (rc != 0) { 2365 pthread_mutex_lock(&g_devlist_mutex); 2366 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2367 dev->refcnt--; 2368 free(ch); 2369 SPDK_ERRLOG("could not create io_channel for io_device %s (%p): %s (rc=%d)\n", 2370 dev->name, io_device, spdk_strerror(-rc), rc); 2371 pthread_mutex_unlock(&g_devlist_mutex); 2372 return NULL; 2373 } 2374 2375 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2376 return ch; 2377 } 2378 2379 static void 2380 put_io_channel(void *arg) 2381 { 2382 struct spdk_io_channel *ch = arg; 2383 bool do_remove_dev = true; 2384 struct spdk_thread *thread; 2385 2386 thread = spdk_get_thread(); 2387 if (!thread) { 2388 SPDK_ERRLOG("called from non-SPDK thread\n"); 2389 assert(false); 2390 return; 2391 } 2392 2393 SPDK_DEBUGLOG(thread, 2394 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2395 ch, ch->dev->name, ch->dev->io_device, thread->name); 2396 2397 assert(ch->thread == thread); 2398 2399 ch->destroy_ref--; 2400 2401 if (ch->ref > 0 || ch->destroy_ref > 0) { 2402 /* 2403 * Another reference to the associated io_device was requested 2404 * after this message was sent but before it had a chance to 2405 * execute. 2406 */ 2407 return; 2408 } 2409 2410 pthread_mutex_lock(&g_devlist_mutex); 2411 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2412 pthread_mutex_unlock(&g_devlist_mutex); 2413 2414 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2415 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2416 2417 pthread_mutex_lock(&g_devlist_mutex); 2418 ch->dev->refcnt--; 2419 2420 if (!ch->dev->unregistered) { 2421 do_remove_dev = false; 2422 } 2423 2424 if (ch->dev->refcnt > 0) { 2425 do_remove_dev = false; 2426 } 2427 2428 pthread_mutex_unlock(&g_devlist_mutex); 2429 2430 if (do_remove_dev) { 2431 io_device_free(ch->dev); 2432 } 2433 free(ch); 2434 } 2435 2436 void 2437 spdk_put_io_channel(struct spdk_io_channel *ch) 2438 { 2439 struct spdk_thread *thread; 2440 int rc __attribute__((unused)); 2441 2442 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2443 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2444 2445 thread = spdk_get_thread(); 2446 if (!thread) { 2447 SPDK_ERRLOG("called from non-SPDK thread\n"); 2448 assert(false); 2449 return; 2450 } 2451 2452 if (ch->thread != thread) { 2453 wrong_thread(__func__, "ch", ch->thread, thread); 2454 return; 2455 } 2456 2457 SPDK_DEBUGLOG(thread, 2458 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2459 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2460 2461 ch->ref--; 2462 2463 if (ch->ref == 0) { 2464 ch->destroy_ref++; 2465 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2466 assert(rc == 0); 2467 } 2468 } 2469 2470 struct spdk_io_channel * 2471 spdk_io_channel_from_ctx(void *ctx) 2472 { 2473 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2474 } 2475 2476 struct spdk_thread * 2477 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2478 { 2479 return ch->thread; 2480 } 2481 2482 void * 2483 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2484 { 2485 return ch->dev->io_device; 2486 } 2487 2488 const char * 2489 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2490 { 2491 return spdk_io_device_get_name(ch->dev); 2492 } 2493 2494 int 2495 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2496 { 2497 return ch->ref; 2498 } 2499 2500 struct spdk_io_channel_iter { 2501 void *io_device; 2502 struct io_device *dev; 2503 spdk_channel_msg fn; 2504 int status; 2505 void *ctx; 2506 struct spdk_io_channel *ch; 2507 2508 struct spdk_thread *cur_thread; 2509 2510 struct spdk_thread *orig_thread; 2511 spdk_channel_for_each_cpl cpl; 2512 }; 2513 2514 void * 2515 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2516 { 2517 return i->io_device; 2518 } 2519 2520 struct spdk_io_channel * 2521 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2522 { 2523 return i->ch; 2524 } 2525 2526 void * 2527 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2528 { 2529 return i->ctx; 2530 } 2531 2532 static void 2533 _call_completion(void *ctx) 2534 { 2535 struct spdk_io_channel_iter *i = ctx; 2536 2537 assert(i->orig_thread->for_each_count > 0); 2538 i->orig_thread->for_each_count--; 2539 2540 if (i->cpl != NULL) { 2541 i->cpl(i, i->status); 2542 } 2543 free(i); 2544 } 2545 2546 static void 2547 _call_channel(void *ctx) 2548 { 2549 struct spdk_io_channel_iter *i = ctx; 2550 struct spdk_io_channel *ch; 2551 2552 /* 2553 * It is possible that the channel was deleted before this 2554 * message had a chance to execute. If so, skip calling 2555 * the fn() on this thread. 2556 */ 2557 pthread_mutex_lock(&g_devlist_mutex); 2558 ch = thread_get_io_channel(i->cur_thread, i->dev); 2559 pthread_mutex_unlock(&g_devlist_mutex); 2560 2561 if (ch) { 2562 i->fn(i); 2563 } else { 2564 spdk_for_each_channel_continue(i, 0); 2565 } 2566 } 2567 2568 void 2569 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2570 spdk_channel_for_each_cpl cpl) 2571 { 2572 struct spdk_thread *thread; 2573 struct spdk_io_channel *ch; 2574 struct spdk_io_channel_iter *i; 2575 int rc __attribute__((unused)); 2576 2577 i = calloc(1, sizeof(*i)); 2578 if (!i) { 2579 SPDK_ERRLOG("Unable to allocate iterator\n"); 2580 assert(false); 2581 return; 2582 } 2583 2584 i->io_device = io_device; 2585 i->fn = fn; 2586 i->ctx = ctx; 2587 i->cpl = cpl; 2588 i->orig_thread = _get_thread(); 2589 2590 i->orig_thread->for_each_count++; 2591 2592 pthread_mutex_lock(&g_devlist_mutex); 2593 i->dev = io_device_get(io_device); 2594 if (i->dev == NULL) { 2595 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2596 assert(false); 2597 i->status = -ENODEV; 2598 goto end; 2599 } 2600 2601 /* Do not allow new for_each operations if we are already waiting to unregister 2602 * the device for other for_each operations to complete. 2603 */ 2604 if (i->dev->pending_unregister) { 2605 SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device); 2606 i->status = -ENODEV; 2607 goto end; 2608 } 2609 2610 TAILQ_FOREACH(thread, &g_threads, tailq) { 2611 ch = thread_get_io_channel(thread, i->dev); 2612 if (ch != NULL) { 2613 ch->dev->for_each_count++; 2614 i->cur_thread = thread; 2615 i->ch = ch; 2616 pthread_mutex_unlock(&g_devlist_mutex); 2617 rc = spdk_thread_send_msg(thread, _call_channel, i); 2618 assert(rc == 0); 2619 return; 2620 } 2621 } 2622 2623 end: 2624 pthread_mutex_unlock(&g_devlist_mutex); 2625 2626 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2627 assert(rc == 0); 2628 } 2629 2630 static void 2631 __pending_unregister(void *arg) 2632 { 2633 struct io_device *dev = arg; 2634 2635 assert(dev->pending_unregister); 2636 assert(dev->for_each_count == 0); 2637 spdk_io_device_unregister(dev->io_device, dev->unregister_cb); 2638 } 2639 2640 void 2641 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2642 { 2643 struct spdk_thread *thread; 2644 struct spdk_io_channel *ch; 2645 struct io_device *dev; 2646 int rc __attribute__((unused)); 2647 2648 assert(i->cur_thread == spdk_get_thread()); 2649 2650 i->status = status; 2651 2652 pthread_mutex_lock(&g_devlist_mutex); 2653 dev = i->dev; 2654 if (status) { 2655 goto end; 2656 } 2657 2658 thread = TAILQ_NEXT(i->cur_thread, tailq); 2659 while (thread) { 2660 ch = thread_get_io_channel(thread, dev); 2661 if (ch != NULL) { 2662 i->cur_thread = thread; 2663 i->ch = ch; 2664 pthread_mutex_unlock(&g_devlist_mutex); 2665 rc = spdk_thread_send_msg(thread, _call_channel, i); 2666 assert(rc == 0); 2667 return; 2668 } 2669 thread = TAILQ_NEXT(thread, tailq); 2670 } 2671 2672 end: 2673 dev->for_each_count--; 2674 i->ch = NULL; 2675 pthread_mutex_unlock(&g_devlist_mutex); 2676 2677 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2678 assert(rc == 0); 2679 2680 pthread_mutex_lock(&g_devlist_mutex); 2681 if (dev->pending_unregister && dev->for_each_count == 0) { 2682 rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev); 2683 assert(rc == 0); 2684 } 2685 pthread_mutex_unlock(&g_devlist_mutex); 2686 } 2687 2688 static void 2689 thread_interrupt_destroy(struct spdk_thread *thread) 2690 { 2691 struct spdk_fd_group *fgrp = thread->fgrp; 2692 2693 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2694 2695 if (thread->msg_fd < 0) { 2696 return; 2697 } 2698 2699 spdk_fd_group_remove(fgrp, thread->msg_fd); 2700 close(thread->msg_fd); 2701 thread->msg_fd = -1; 2702 2703 spdk_fd_group_destroy(fgrp); 2704 thread->fgrp = NULL; 2705 } 2706 2707 #ifdef __linux__ 2708 static int 2709 thread_interrupt_msg_process(void *arg) 2710 { 2711 struct spdk_thread *thread = arg; 2712 struct spdk_thread *orig_thread; 2713 uint32_t msg_count; 2714 spdk_msg_fn critical_msg; 2715 int rc = 0; 2716 uint64_t notify = 1; 2717 2718 assert(spdk_interrupt_mode_is_enabled()); 2719 2720 orig_thread = spdk_get_thread(); 2721 spdk_set_thread(thread); 2722 2723 /* There may be race between msg_acknowledge and another producer's msg_notify, 2724 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2725 * This can avoid msg notification missing. 2726 */ 2727 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2728 if (rc < 0 && errno != EAGAIN) { 2729 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2730 } 2731 2732 critical_msg = thread->critical_msg; 2733 if (spdk_unlikely(critical_msg != NULL)) { 2734 critical_msg(NULL); 2735 thread->critical_msg = NULL; 2736 rc = 1; 2737 } 2738 2739 msg_count = msg_queue_run_batch(thread, 0); 2740 if (msg_count) { 2741 rc = 1; 2742 } 2743 2744 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2745 if (spdk_unlikely(!thread->in_interrupt)) { 2746 /* The thread transitioned to poll mode in a msg during the above processing. 2747 * Clear msg_fd since thread messages will be polled directly in poll mode. 2748 */ 2749 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2750 if (rc < 0 && errno != EAGAIN) { 2751 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 2752 } 2753 } 2754 2755 spdk_set_thread(orig_thread); 2756 return rc; 2757 } 2758 2759 static int 2760 thread_interrupt_create(struct spdk_thread *thread) 2761 { 2762 int rc; 2763 2764 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2765 2766 rc = spdk_fd_group_create(&thread->fgrp); 2767 if (rc) { 2768 return rc; 2769 } 2770 2771 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2772 if (thread->msg_fd < 0) { 2773 rc = -errno; 2774 spdk_fd_group_destroy(thread->fgrp); 2775 thread->fgrp = NULL; 2776 2777 return rc; 2778 } 2779 2780 return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd, 2781 thread_interrupt_msg_process, thread); 2782 } 2783 #else 2784 static int 2785 thread_interrupt_create(struct spdk_thread *thread) 2786 { 2787 return -ENOTSUP; 2788 } 2789 #endif 2790 2791 static int 2792 _interrupt_wrapper(void *ctx) 2793 { 2794 struct spdk_interrupt *intr = ctx; 2795 struct spdk_thread *orig_thread, *thread; 2796 int rc; 2797 2798 orig_thread = spdk_get_thread(); 2799 thread = intr->thread; 2800 2801 spdk_set_thread(thread); 2802 2803 SPDK_DTRACE_PROBE4(interrupt_fd_process, intr->name, intr->efd, 2804 intr->fn, intr->arg); 2805 2806 rc = intr->fn(intr->arg); 2807 2808 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2809 2810 spdk_set_thread(orig_thread); 2811 2812 return rc; 2813 } 2814 2815 struct spdk_interrupt * 2816 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2817 void *arg, const char *name) 2818 { 2819 return spdk_interrupt_register_for_events(efd, SPDK_INTERRUPT_EVENT_IN, fn, arg, name); 2820 } 2821 2822 struct spdk_interrupt * 2823 spdk_interrupt_register_for_events(int efd, uint32_t events, spdk_interrupt_fn fn, void *arg, 2824 const char *name) 2825 { 2826 struct spdk_thread *thread; 2827 struct spdk_interrupt *intr; 2828 int ret; 2829 2830 thread = spdk_get_thread(); 2831 if (!thread) { 2832 assert(false); 2833 return NULL; 2834 } 2835 2836 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2837 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2838 return NULL; 2839 } 2840 2841 intr = calloc(1, sizeof(*intr)); 2842 if (intr == NULL) { 2843 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2844 return NULL; 2845 } 2846 2847 if (name) { 2848 snprintf(intr->name, sizeof(intr->name), "%s", name); 2849 } else { 2850 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2851 } 2852 2853 intr->efd = efd; 2854 intr->thread = thread; 2855 intr->fn = fn; 2856 intr->arg = arg; 2857 2858 ret = spdk_fd_group_add_for_events(thread->fgrp, efd, events, _interrupt_wrapper, intr, intr->name); 2859 2860 if (ret != 0) { 2861 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2862 thread->name, efd, spdk_strerror(-ret)); 2863 free(intr); 2864 return NULL; 2865 } 2866 2867 return intr; 2868 } 2869 2870 void 2871 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2872 { 2873 struct spdk_thread *thread; 2874 struct spdk_interrupt *intr; 2875 2876 intr = *pintr; 2877 if (intr == NULL) { 2878 return; 2879 } 2880 2881 *pintr = NULL; 2882 2883 thread = spdk_get_thread(); 2884 if (!thread) { 2885 assert(false); 2886 return; 2887 } 2888 2889 if (intr->thread != thread) { 2890 wrong_thread(__func__, intr->name, intr->thread, thread); 2891 return; 2892 } 2893 2894 spdk_fd_group_remove(thread->fgrp, intr->efd); 2895 free(intr); 2896 } 2897 2898 int 2899 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2900 enum spdk_interrupt_event_types event_types) 2901 { 2902 struct spdk_thread *thread; 2903 2904 thread = spdk_get_thread(); 2905 if (!thread) { 2906 assert(false); 2907 return -EINVAL; 2908 } 2909 2910 if (intr->thread != thread) { 2911 wrong_thread(__func__, intr->name, intr->thread, thread); 2912 return -EINVAL; 2913 } 2914 2915 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2916 } 2917 2918 int 2919 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2920 { 2921 return spdk_fd_group_get_fd(thread->fgrp); 2922 } 2923 2924 struct spdk_fd_group * 2925 spdk_thread_get_interrupt_fd_group(struct spdk_thread *thread) 2926 { 2927 return thread->fgrp; 2928 } 2929 2930 static bool g_interrupt_mode = false; 2931 2932 int 2933 spdk_interrupt_mode_enable(void) 2934 { 2935 /* It must be called once prior to initializing the threading library. 2936 * g_spdk_msg_mempool will be valid if thread library is initialized. 2937 */ 2938 if (g_spdk_msg_mempool) { 2939 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 2940 return -1; 2941 } 2942 2943 #ifdef __linux__ 2944 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2945 g_interrupt_mode = true; 2946 return 0; 2947 #else 2948 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2949 g_interrupt_mode = false; 2950 return -ENOTSUP; 2951 #endif 2952 } 2953 2954 bool 2955 spdk_interrupt_mode_is_enabled(void) 2956 { 2957 return g_interrupt_mode; 2958 } 2959 2960 #define SSPIN_DEBUG_STACK_FRAMES 16 2961 2962 struct sspin_stack { 2963 void *addrs[SSPIN_DEBUG_STACK_FRAMES]; 2964 uint32_t depth; 2965 }; 2966 2967 struct spdk_spinlock_internal { 2968 struct sspin_stack init_stack; 2969 struct sspin_stack lock_stack; 2970 struct sspin_stack unlock_stack; 2971 }; 2972 2973 static void 2974 sspin_init_internal(struct spdk_spinlock *sspin) 2975 { 2976 #ifdef DEBUG 2977 sspin->internal = calloc(1, sizeof(*sspin->internal)); 2978 #endif 2979 } 2980 2981 static void 2982 sspin_fini_internal(struct spdk_spinlock *sspin) 2983 { 2984 #ifdef DEBUG 2985 free(sspin->internal); 2986 sspin->internal = NULL; 2987 #endif 2988 } 2989 2990 #if defined(DEBUG) && defined(SPDK_HAVE_EXECINFO_H) 2991 #define SSPIN_GET_STACK(sspin, which) \ 2992 do { \ 2993 if (sspin->internal != NULL) { \ 2994 struct sspin_stack *stack = &sspin->internal->which ## _stack; \ 2995 stack->depth = backtrace(stack->addrs, SPDK_COUNTOF(stack->addrs)); \ 2996 } \ 2997 } while (0) 2998 #else 2999 #define SSPIN_GET_STACK(sspin, which) do { } while (0) 3000 #endif 3001 3002 static void 3003 sspin_stack_print(const char *title, const struct sspin_stack *sspin_stack) 3004 { 3005 #ifdef SPDK_HAVE_EXECINFO_H 3006 char **stack; 3007 size_t i; 3008 3009 stack = backtrace_symbols(sspin_stack->addrs, sspin_stack->depth); 3010 if (stack == NULL) { 3011 SPDK_ERRLOG("Out of memory while allocate stack for %s\n", title); 3012 return; 3013 } 3014 SPDK_ERRLOG(" %s:\n", title); 3015 for (i = 0; i < sspin_stack->depth; i++) { 3016 /* 3017 * This does not print line numbers. In gdb, use something like "list *0x444b6b" or 3018 * "list *sspin_stack->addrs[0]". Or more conveniently, load the spdk gdb macros 3019 * and use use "print *sspin" or "print sspin->internal.lock_stack". See 3020 * gdb_macros.md in the docs directory for details. 3021 */ 3022 SPDK_ERRLOG(" #%" PRIu64 ": %s\n", i, stack[i]); 3023 } 3024 free(stack); 3025 #endif /* SPDK_HAVE_EXECINFO_H */ 3026 } 3027 3028 static void 3029 sspin_stacks_print(const struct spdk_spinlock *sspin) 3030 { 3031 if (sspin->internal == NULL) { 3032 return; 3033 } 3034 SPDK_ERRLOG("spinlock %p\n", sspin); 3035 sspin_stack_print("Lock initalized at", &sspin->internal->init_stack); 3036 sspin_stack_print("Last locked at", &sspin->internal->lock_stack); 3037 sspin_stack_print("Last unlocked at", &sspin->internal->unlock_stack); 3038 } 3039 3040 void 3041 spdk_spin_init(struct spdk_spinlock *sspin) 3042 { 3043 int rc; 3044 3045 memset(sspin, 0, sizeof(*sspin)); 3046 rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE); 3047 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3048 sspin_init_internal(sspin); 3049 SSPIN_GET_STACK(sspin, init); 3050 sspin->initialized = true; 3051 } 3052 3053 void 3054 spdk_spin_destroy(struct spdk_spinlock *sspin) 3055 { 3056 int rc; 3057 3058 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3059 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3060 SPIN_ASSERT_LOG_STACKS(sspin->thread == NULL, SPIN_ERR_LOCK_HELD, sspin); 3061 3062 rc = pthread_spin_destroy(&sspin->spinlock); 3063 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3064 3065 sspin_fini_internal(sspin); 3066 sspin->initialized = false; 3067 sspin->destroyed = true; 3068 } 3069 3070 void 3071 spdk_spin_lock(struct spdk_spinlock *sspin) 3072 { 3073 struct spdk_thread *thread = spdk_get_thread(); 3074 int rc; 3075 3076 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3077 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3078 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3079 SPIN_ASSERT_LOG_STACKS(thread != sspin->thread, SPIN_ERR_DEADLOCK, sspin); 3080 3081 rc = pthread_spin_lock(&sspin->spinlock); 3082 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3083 3084 sspin->thread = thread; 3085 sspin->thread->lock_count++; 3086 3087 SSPIN_GET_STACK(sspin, lock); 3088 } 3089 3090 void 3091 spdk_spin_unlock(struct spdk_spinlock *sspin) 3092 { 3093 struct spdk_thread *thread = spdk_get_thread(); 3094 int rc; 3095 3096 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3097 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3098 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3099 SPIN_ASSERT_LOG_STACKS(thread == sspin->thread, SPIN_ERR_WRONG_THREAD, sspin); 3100 3101 SPIN_ASSERT_LOG_STACKS(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT, sspin); 3102 thread->lock_count--; 3103 sspin->thread = NULL; 3104 3105 SSPIN_GET_STACK(sspin, unlock); 3106 3107 rc = pthread_spin_unlock(&sspin->spinlock); 3108 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3109 } 3110 3111 bool 3112 spdk_spin_held(struct spdk_spinlock *sspin) 3113 { 3114 struct spdk_thread *thread = spdk_get_thread(); 3115 3116 SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false); 3117 3118 return sspin->thread == thread; 3119 } 3120 3121 SPDK_LOG_REGISTER_COMPONENT(thread) 3122