1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2016 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "spdk/env.h" 10 #include "spdk/likely.h" 11 #include "spdk/queue.h" 12 #include "spdk/string.h" 13 #include "spdk/thread.h" 14 #include "spdk/trace.h" 15 #include "spdk/util.h" 16 #include "spdk/fd_group.h" 17 18 #include "spdk/log.h" 19 #include "spdk_internal/thread.h" 20 #include "spdk_internal/usdt.h" 21 #include "thread_internal.h" 22 23 #include "spdk_internal/trace_defs.h" 24 25 #ifdef __linux__ 26 #include <sys/timerfd.h> 27 #include <sys/eventfd.h> 28 #endif 29 30 #ifdef SPDK_HAVE_EXECINFO_H 31 #include <execinfo.h> 32 #endif 33 34 #define SPDK_MSG_BATCH_SIZE 8 35 #define SPDK_MAX_DEVICE_NAME_LEN 256 36 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 37 #define SPDK_MAX_POLLER_NAME_LEN 256 38 #define SPDK_MAX_THREAD_NAME_LEN 256 39 40 static struct spdk_thread *g_app_thread; 41 42 struct spdk_interrupt { 43 int efd; 44 struct spdk_thread *thread; 45 spdk_interrupt_fn fn; 46 void *arg; 47 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 48 }; 49 50 enum spdk_poller_state { 51 /* The poller is registered with a thread but not currently executing its fn. */ 52 SPDK_POLLER_STATE_WAITING, 53 54 /* The poller is currently running its fn. */ 55 SPDK_POLLER_STATE_RUNNING, 56 57 /* The poller was unregistered during the execution of its fn. */ 58 SPDK_POLLER_STATE_UNREGISTERED, 59 60 /* The poller is in the process of being paused. It will be paused 61 * during the next time it's supposed to be executed. 62 */ 63 SPDK_POLLER_STATE_PAUSING, 64 65 /* The poller is registered but currently paused. It's on the 66 * paused_pollers list. 67 */ 68 SPDK_POLLER_STATE_PAUSED, 69 }; 70 71 struct spdk_poller { 72 TAILQ_ENTRY(spdk_poller) tailq; 73 RB_ENTRY(spdk_poller) node; 74 75 /* Current state of the poller; should only be accessed from the poller's thread. */ 76 enum spdk_poller_state state; 77 78 uint64_t period_ticks; 79 uint64_t next_run_tick; 80 uint64_t run_count; 81 uint64_t busy_count; 82 uint64_t id; 83 spdk_poller_fn fn; 84 void *arg; 85 struct spdk_thread *thread; 86 struct spdk_interrupt *intr; 87 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 88 void *set_intr_cb_arg; 89 90 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 91 }; 92 93 enum spdk_thread_state { 94 /* The thread is processing poller and message by spdk_thread_poll(). */ 95 SPDK_THREAD_STATE_RUNNING, 96 97 /* The thread is in the process of termination. It reaps unregistering 98 * poller are releasing I/O channel. 99 */ 100 SPDK_THREAD_STATE_EXITING, 101 102 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 103 SPDK_THREAD_STATE_EXITED, 104 }; 105 106 struct spdk_thread { 107 uint64_t tsc_last; 108 struct spdk_thread_stats stats; 109 /* 110 * Contains pollers actively running on this thread. Pollers 111 * are run round-robin. The thread takes one poller from the head 112 * of the ring, executes it, then puts it back at the tail of 113 * the ring. 114 */ 115 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 116 /** 117 * Contains pollers running on this thread with a periodic timer. 118 */ 119 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 120 struct spdk_poller *first_timed_poller; 121 /* 122 * Contains paused pollers. Pollers on this queue are waiting until 123 * they are resumed (in which case they're put onto the active/timer 124 * queues) or unregistered. 125 */ 126 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 127 struct spdk_ring *messages; 128 int msg_fd; 129 SLIST_HEAD(, spdk_msg) msg_cache; 130 size_t msg_cache_count; 131 spdk_msg_fn critical_msg; 132 uint64_t id; 133 uint64_t next_poller_id; 134 enum spdk_thread_state state; 135 int pending_unregister_count; 136 uint32_t for_each_count; 137 138 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 139 TAILQ_ENTRY(spdk_thread) tailq; 140 141 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 142 struct spdk_cpuset cpumask; 143 uint64_t exit_timeout_tsc; 144 145 int32_t lock_count; 146 147 /* spdk_thread is bound to current CPU core. */ 148 bool is_bound; 149 150 /* Indicates whether this spdk_thread currently runs in interrupt. */ 151 bool in_interrupt; 152 bool poller_unregistered; 153 struct spdk_fd_group *fgrp; 154 155 /* User context allocated at the end */ 156 uint8_t ctx[0]; 157 }; 158 159 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 160 161 static spdk_new_thread_fn g_new_thread_fn = NULL; 162 static spdk_thread_op_fn g_thread_op_fn = NULL; 163 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 164 static size_t g_ctx_sz = 0; 165 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 166 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 167 * SPDK application is required. 168 */ 169 static uint64_t g_thread_id = 1; 170 171 enum spin_error { 172 SPIN_ERR_NONE, 173 /* Trying to use an SPDK lock while not on an SPDK thread */ 174 SPIN_ERR_NOT_SPDK_THREAD, 175 /* Trying to lock a lock already held by this SPDK thread */ 176 SPIN_ERR_DEADLOCK, 177 /* Trying to unlock a lock not held by this SPDK thread */ 178 SPIN_ERR_WRONG_THREAD, 179 /* pthread_spin_*() returned an error */ 180 SPIN_ERR_PTHREAD, 181 /* Trying to destroy a lock that is held */ 182 SPIN_ERR_LOCK_HELD, 183 /* lock_count is invalid */ 184 SPIN_ERR_LOCK_COUNT, 185 /* 186 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to 187 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to 188 * deadlock when another SPDK thread on the same pthread tries to take that lock. 189 */ 190 SPIN_ERR_HOLD_DURING_SWITCH, 191 /* Trying to use a lock that was destroyed (but not re-initialized) */ 192 SPIN_ERR_DESTROYED, 193 /* Trying to use a lock that is not initialized */ 194 SPIN_ERR_NOT_INITIALIZED, 195 196 /* Must be last, not an actual error code */ 197 SPIN_ERR_LAST 198 }; 199 200 static const char *spin_error_strings[] = { 201 [SPIN_ERR_NONE] = "No error", 202 [SPIN_ERR_NOT_SPDK_THREAD] = "Not an SPDK thread", 203 [SPIN_ERR_DEADLOCK] = "Deadlock detected", 204 [SPIN_ERR_WRONG_THREAD] = "Unlock on wrong SPDK thread", 205 [SPIN_ERR_PTHREAD] = "Error from pthread_spinlock", 206 [SPIN_ERR_LOCK_HELD] = "Destroying a held spinlock", 207 [SPIN_ERR_LOCK_COUNT] = "Lock count is invalid", 208 [SPIN_ERR_HOLD_DURING_SWITCH] = "Lock(s) held while SPDK thread going off CPU", 209 [SPIN_ERR_DESTROYED] = "Lock has been destroyed", 210 [SPIN_ERR_NOT_INITIALIZED] = "Lock has not been initialized", 211 }; 212 213 #define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \ 214 ? "Unknown error" : spin_error_strings[err] 215 216 static void 217 __posix_abort(enum spin_error err) 218 { 219 abort(); 220 } 221 222 typedef void (*spin_abort)(enum spin_error err); 223 spin_abort g_spin_abort_fn = __posix_abort; 224 225 #define SPIN_ASSERT_IMPL(cond, err, extra_log, ret) \ 226 do { \ 227 if (spdk_unlikely(!(cond))) { \ 228 SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \ 229 SPIN_ERROR_STRING(err), #cond); \ 230 extra_log; \ 231 g_spin_abort_fn(err); \ 232 ret; \ 233 } \ 234 } while (0) 235 #define SPIN_ASSERT_LOG_STACKS(cond, err, lock) \ 236 SPIN_ASSERT_IMPL(cond, err, sspin_stacks_print(sspin), return) 237 #define SPIN_ASSERT_RETURN(cond, err, ret) SPIN_ASSERT_IMPL(cond, err, , return ret) 238 #define SPIN_ASSERT(cond, err) SPIN_ASSERT_IMPL(cond, err, ,) 239 240 struct io_device { 241 void *io_device; 242 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 243 spdk_io_channel_create_cb create_cb; 244 spdk_io_channel_destroy_cb destroy_cb; 245 spdk_io_device_unregister_cb unregister_cb; 246 struct spdk_thread *unregister_thread; 247 uint32_t ctx_size; 248 uint32_t for_each_count; 249 RB_ENTRY(io_device) node; 250 251 uint32_t refcnt; 252 253 bool pending_unregister; 254 bool unregistered; 255 }; 256 257 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 258 259 static int 260 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 261 { 262 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 263 } 264 265 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 266 267 static int 268 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 269 { 270 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 271 } 272 273 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 274 275 struct spdk_msg { 276 spdk_msg_fn fn; 277 void *arg; 278 279 SLIST_ENTRY(spdk_msg) link; 280 }; 281 282 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 283 284 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 285 static uint32_t g_thread_count = 0; 286 287 static __thread struct spdk_thread *tls_thread = NULL; 288 289 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 290 { 291 spdk_trace_register_description("THREAD_IOCH_GET", 292 TRACE_THREAD_IOCH_GET, 293 OWNER_TYPE_NONE, OBJECT_NONE, 0, 294 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 295 spdk_trace_register_description("THREAD_IOCH_PUT", 296 TRACE_THREAD_IOCH_PUT, 297 OWNER_TYPE_NONE, OBJECT_NONE, 0, 298 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 299 } 300 301 /* 302 * If this compare function returns zero when two next_run_ticks are equal, 303 * the macro RB_INSERT() returns a pointer to the element with the same 304 * next_run_tick. 305 * 306 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 307 * to remove as a parameter. 308 * 309 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 310 * side by returning 1 when two next_run_ticks are equal. 311 */ 312 static inline int 313 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 314 { 315 if (poller1->next_run_tick < poller2->next_run_tick) { 316 return -1; 317 } else { 318 return 1; 319 } 320 } 321 322 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 323 324 static inline struct spdk_thread * 325 _get_thread(void) 326 { 327 return tls_thread; 328 } 329 330 static int 331 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 332 { 333 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 334 335 g_ctx_sz = ctx_sz; 336 337 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 338 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 339 sizeof(struct spdk_msg), 340 0, /* No cache. We do our own. */ 341 SPDK_ENV_SOCKET_ID_ANY); 342 343 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 344 msg_mempool_sz); 345 346 if (!g_spdk_msg_mempool) { 347 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 348 return -ENOMEM; 349 } 350 351 return 0; 352 } 353 354 static void thread_interrupt_destroy(struct spdk_thread *thread); 355 static int thread_interrupt_create(struct spdk_thread *thread); 356 357 static void 358 _free_thread(struct spdk_thread *thread) 359 { 360 struct spdk_io_channel *ch; 361 struct spdk_msg *msg; 362 struct spdk_poller *poller, *ptmp; 363 364 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 365 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 366 thread->name, ch->dev->name); 367 } 368 369 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 370 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 371 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 372 poller->name); 373 } 374 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 375 free(poller); 376 } 377 378 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 379 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 380 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 381 poller->name); 382 } 383 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 384 free(poller); 385 } 386 387 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 388 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 389 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 390 free(poller); 391 } 392 393 pthread_mutex_lock(&g_devlist_mutex); 394 assert(g_thread_count > 0); 395 g_thread_count--; 396 TAILQ_REMOVE(&g_threads, thread, tailq); 397 pthread_mutex_unlock(&g_devlist_mutex); 398 399 msg = SLIST_FIRST(&thread->msg_cache); 400 while (msg != NULL) { 401 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 402 403 assert(thread->msg_cache_count > 0); 404 thread->msg_cache_count--; 405 spdk_mempool_put(g_spdk_msg_mempool, msg); 406 407 msg = SLIST_FIRST(&thread->msg_cache); 408 } 409 410 assert(thread->msg_cache_count == 0); 411 412 if (spdk_interrupt_mode_is_enabled()) { 413 thread_interrupt_destroy(thread); 414 } 415 416 spdk_ring_free(thread->messages); 417 free(thread); 418 } 419 420 int 421 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 422 { 423 assert(g_new_thread_fn == NULL); 424 assert(g_thread_op_fn == NULL); 425 426 if (new_thread_fn == NULL) { 427 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 428 } else { 429 g_new_thread_fn = new_thread_fn; 430 } 431 432 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 433 } 434 435 int 436 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 437 spdk_thread_op_supported_fn thread_op_supported_fn, 438 size_t ctx_sz, size_t msg_mempool_sz) 439 { 440 assert(g_new_thread_fn == NULL); 441 assert(g_thread_op_fn == NULL); 442 assert(g_thread_op_supported_fn == NULL); 443 444 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 445 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 446 return -EINVAL; 447 } 448 449 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 450 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 451 } else { 452 g_thread_op_fn = thread_op_fn; 453 g_thread_op_supported_fn = thread_op_supported_fn; 454 } 455 456 return _thread_lib_init(ctx_sz, msg_mempool_sz); 457 } 458 459 void 460 spdk_thread_lib_fini(void) 461 { 462 struct io_device *dev; 463 464 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 465 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 466 } 467 468 g_new_thread_fn = NULL; 469 g_thread_op_fn = NULL; 470 g_thread_op_supported_fn = NULL; 471 g_ctx_sz = 0; 472 if (g_app_thread != NULL) { 473 _free_thread(g_app_thread); 474 g_app_thread = NULL; 475 } 476 477 if (g_spdk_msg_mempool) { 478 spdk_mempool_free(g_spdk_msg_mempool); 479 g_spdk_msg_mempool = NULL; 480 } 481 } 482 483 struct spdk_thread * 484 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask) 485 { 486 struct spdk_thread *thread, *null_thread; 487 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 488 int rc = 0, i; 489 490 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 491 if (!thread) { 492 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 493 return NULL; 494 } 495 496 if (cpumask) { 497 spdk_cpuset_copy(&thread->cpumask, cpumask); 498 } else { 499 spdk_cpuset_negate(&thread->cpumask); 500 } 501 502 RB_INIT(&thread->io_channels); 503 TAILQ_INIT(&thread->active_pollers); 504 RB_INIT(&thread->timed_pollers); 505 TAILQ_INIT(&thread->paused_pollers); 506 SLIST_INIT(&thread->msg_cache); 507 thread->msg_cache_count = 0; 508 509 thread->tsc_last = spdk_get_ticks(); 510 511 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 512 * ID exceeds UINT64_MAX a warning message is logged 513 */ 514 thread->next_poller_id = 1; 515 516 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 517 if (!thread->messages) { 518 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 519 free(thread); 520 return NULL; 521 } 522 523 /* Fill the local message pool cache. */ 524 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 525 if (rc == 0) { 526 /* If we can't populate the cache it's ok. The cache will get filled 527 * up organically as messages are passed to the thread. */ 528 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 529 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 530 thread->msg_cache_count++; 531 } 532 } 533 534 if (name) { 535 snprintf(thread->name, sizeof(thread->name), "%s", name); 536 } else { 537 snprintf(thread->name, sizeof(thread->name), "%p", thread); 538 } 539 540 pthread_mutex_lock(&g_devlist_mutex); 541 if (g_thread_id == 0) { 542 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 543 pthread_mutex_unlock(&g_devlist_mutex); 544 _free_thread(thread); 545 return NULL; 546 } 547 thread->id = g_thread_id++; 548 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 549 g_thread_count++; 550 pthread_mutex_unlock(&g_devlist_mutex); 551 552 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 553 thread->id, thread->name); 554 555 if (spdk_interrupt_mode_is_enabled()) { 556 thread->in_interrupt = true; 557 rc = thread_interrupt_create(thread); 558 if (rc != 0) { 559 _free_thread(thread); 560 return NULL; 561 } 562 } 563 564 if (g_new_thread_fn) { 565 rc = g_new_thread_fn(thread); 566 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 567 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 568 } 569 570 if (rc != 0) { 571 _free_thread(thread); 572 return NULL; 573 } 574 575 thread->state = SPDK_THREAD_STATE_RUNNING; 576 577 /* If this is the first thread, save it as the app thread. Use an atomic 578 * compare + exchange to guard against crazy users who might try to 579 * call spdk_thread_create() simultaneously on multiple threads. 580 */ 581 null_thread = NULL; 582 __atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false, 583 __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); 584 585 return thread; 586 } 587 588 struct spdk_thread * 589 spdk_thread_get_app_thread(void) 590 { 591 return g_app_thread; 592 } 593 594 bool 595 spdk_thread_is_app_thread(struct spdk_thread *thread) 596 { 597 if (thread == NULL) { 598 thread = _get_thread(); 599 } 600 601 return g_app_thread == thread; 602 } 603 604 void 605 spdk_thread_bind(struct spdk_thread *thread, bool bind) 606 { 607 thread->is_bound = bind; 608 } 609 610 bool 611 spdk_thread_is_bound(struct spdk_thread *thread) 612 { 613 return thread->is_bound; 614 } 615 616 void 617 spdk_set_thread(struct spdk_thread *thread) 618 { 619 tls_thread = thread; 620 } 621 622 static void 623 thread_exit(struct spdk_thread *thread, uint64_t now) 624 { 625 struct spdk_poller *poller; 626 struct spdk_io_channel *ch; 627 628 if (now >= thread->exit_timeout_tsc) { 629 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 630 thread->name); 631 goto exited; 632 } 633 634 if (spdk_ring_count(thread->messages) > 0) { 635 SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name); 636 return; 637 } 638 639 if (thread->for_each_count > 0) { 640 SPDK_INFOLOG(thread, "thread %s is still executing %u for_each_channels/threads\n", 641 thread->name, thread->for_each_count); 642 return; 643 } 644 645 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 646 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 647 SPDK_INFOLOG(thread, 648 "thread %s still has active poller %s\n", 649 thread->name, poller->name); 650 return; 651 } 652 } 653 654 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 655 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 656 SPDK_INFOLOG(thread, 657 "thread %s still has active timed poller %s\n", 658 thread->name, poller->name); 659 return; 660 } 661 } 662 663 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 664 SPDK_INFOLOG(thread, 665 "thread %s still has paused poller %s\n", 666 thread->name, poller->name); 667 return; 668 } 669 670 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 671 SPDK_INFOLOG(thread, 672 "thread %s still has channel for io_device %s\n", 673 thread->name, ch->dev->name); 674 return; 675 } 676 677 if (thread->pending_unregister_count > 0) { 678 SPDK_INFOLOG(thread, 679 "thread %s is still unregistering io_devices\n", 680 thread->name); 681 return; 682 } 683 684 exited: 685 thread->state = SPDK_THREAD_STATE_EXITED; 686 if (spdk_unlikely(thread->in_interrupt)) { 687 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 688 } 689 } 690 691 static void _thread_exit(void *ctx); 692 693 int 694 spdk_thread_exit(struct spdk_thread *thread) 695 { 696 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 697 698 assert(tls_thread == thread); 699 700 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 701 SPDK_INFOLOG(thread, 702 "thread %s is already exiting\n", 703 thread->name); 704 return 0; 705 } 706 707 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 708 SPDK_THREAD_EXIT_TIMEOUT_SEC); 709 thread->state = SPDK_THREAD_STATE_EXITING; 710 711 if (spdk_interrupt_mode_is_enabled()) { 712 spdk_thread_send_msg(thread, _thread_exit, thread); 713 } 714 715 return 0; 716 } 717 718 bool 719 spdk_thread_is_running(struct spdk_thread *thread) 720 { 721 return thread->state == SPDK_THREAD_STATE_RUNNING; 722 } 723 724 bool 725 spdk_thread_is_exited(struct spdk_thread *thread) 726 { 727 return thread->state == SPDK_THREAD_STATE_EXITED; 728 } 729 730 void 731 spdk_thread_destroy(struct spdk_thread *thread) 732 { 733 assert(thread != NULL); 734 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 735 736 assert(thread->state == SPDK_THREAD_STATE_EXITED); 737 738 if (tls_thread == thread) { 739 tls_thread = NULL; 740 } 741 742 /* To be safe, do not free the app thread until spdk_thread_lib_fini(). */ 743 if (thread != g_app_thread) { 744 _free_thread(thread); 745 } 746 } 747 748 void * 749 spdk_thread_get_ctx(struct spdk_thread *thread) 750 { 751 if (g_ctx_sz > 0) { 752 return thread->ctx; 753 } 754 755 return NULL; 756 } 757 758 struct spdk_cpuset * 759 spdk_thread_get_cpumask(struct spdk_thread *thread) 760 { 761 return &thread->cpumask; 762 } 763 764 int 765 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 766 { 767 struct spdk_thread *thread; 768 769 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 770 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 771 assert(false); 772 return -ENOTSUP; 773 } 774 775 thread = spdk_get_thread(); 776 if (!thread) { 777 SPDK_ERRLOG("Called from non-SPDK thread\n"); 778 assert(false); 779 return -EINVAL; 780 } 781 782 spdk_cpuset_copy(&thread->cpumask, cpumask); 783 784 /* Invoke framework's reschedule operation. If this function is called multiple times 785 * in a single spdk_thread_poll() context, the last cpumask will be used in the 786 * reschedule operation. 787 */ 788 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 789 790 return 0; 791 } 792 793 struct spdk_thread * 794 spdk_thread_get_from_ctx(void *ctx) 795 { 796 if (ctx == NULL) { 797 assert(false); 798 return NULL; 799 } 800 801 assert(g_ctx_sz > 0); 802 803 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 804 } 805 806 static inline uint32_t 807 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 808 { 809 unsigned count, i; 810 void *messages[SPDK_MSG_BATCH_SIZE]; 811 uint64_t notify = 1; 812 int rc; 813 814 #ifdef DEBUG 815 /* 816 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 817 * so we will never actually read uninitialized data from events, but just to be sure 818 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 819 */ 820 memset(messages, 0, sizeof(messages)); 821 #endif 822 823 if (max_msgs > 0) { 824 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 825 } else { 826 max_msgs = SPDK_MSG_BATCH_SIZE; 827 } 828 829 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 830 if (spdk_unlikely(thread->in_interrupt) && 831 spdk_ring_count(thread->messages) != 0) { 832 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 833 if (rc < 0) { 834 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 835 } 836 } 837 if (count == 0) { 838 return 0; 839 } 840 841 for (i = 0; i < count; i++) { 842 struct spdk_msg *msg = messages[i]; 843 844 assert(msg != NULL); 845 846 SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg); 847 848 msg->fn(msg->arg); 849 850 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 851 852 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 853 /* Insert the messages at the head. We want to re-use the hot 854 * ones. */ 855 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 856 thread->msg_cache_count++; 857 } else { 858 spdk_mempool_put(g_spdk_msg_mempool, msg); 859 } 860 } 861 862 return count; 863 } 864 865 static void 866 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 867 { 868 struct spdk_poller *tmp __attribute__((unused)); 869 870 poller->next_run_tick = now + poller->period_ticks; 871 872 /* 873 * Insert poller in the thread's timed_pollers tree by next scheduled run time 874 * as its key. 875 */ 876 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 877 assert(tmp == NULL); 878 879 /* Update the cache only if it is empty or the inserted poller is earlier than it. 880 * RB_MIN() is not necessary here because all pollers, which has exactly the same 881 * next_run_tick as the existing poller, are inserted on the right side. 882 */ 883 if (thread->first_timed_poller == NULL || 884 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 885 thread->first_timed_poller = poller; 886 } 887 } 888 889 static inline void 890 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 891 { 892 struct spdk_poller *tmp __attribute__((unused)); 893 894 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 895 assert(tmp != NULL); 896 897 /* This function is not used in any case that is performance critical. 898 * Update the cache simply by RB_MIN() if it needs to be changed. 899 */ 900 if (thread->first_timed_poller == poller) { 901 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 902 } 903 } 904 905 static void 906 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 907 { 908 if (poller->period_ticks) { 909 poller_insert_timer(thread, poller, spdk_get_ticks()); 910 } else { 911 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 912 } 913 } 914 915 static inline void 916 thread_update_stats(struct spdk_thread *thread, uint64_t end, 917 uint64_t start, int rc) 918 { 919 if (rc == 0) { 920 /* Poller status idle */ 921 thread->stats.idle_tsc += end - start; 922 } else if (rc > 0) { 923 /* Poller status busy */ 924 thread->stats.busy_tsc += end - start; 925 } 926 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 927 thread->tsc_last = end; 928 } 929 930 static inline int 931 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 932 { 933 int rc; 934 935 switch (poller->state) { 936 case SPDK_POLLER_STATE_UNREGISTERED: 937 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 938 free(poller); 939 return 0; 940 case SPDK_POLLER_STATE_PAUSING: 941 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 942 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 943 poller->state = SPDK_POLLER_STATE_PAUSED; 944 return 0; 945 case SPDK_POLLER_STATE_WAITING: 946 break; 947 default: 948 assert(false); 949 break; 950 } 951 952 poller->state = SPDK_POLLER_STATE_RUNNING; 953 rc = poller->fn(poller->arg); 954 955 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 956 957 poller->run_count++; 958 if (rc > 0) { 959 poller->busy_count++; 960 } 961 962 #ifdef DEBUG 963 if (rc == -1) { 964 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 965 } 966 #endif 967 968 switch (poller->state) { 969 case SPDK_POLLER_STATE_UNREGISTERED: 970 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 971 free(poller); 972 break; 973 case SPDK_POLLER_STATE_PAUSING: 974 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 975 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 976 poller->state = SPDK_POLLER_STATE_PAUSED; 977 break; 978 case SPDK_POLLER_STATE_PAUSED: 979 case SPDK_POLLER_STATE_WAITING: 980 break; 981 case SPDK_POLLER_STATE_RUNNING: 982 poller->state = SPDK_POLLER_STATE_WAITING; 983 break; 984 default: 985 assert(false); 986 break; 987 } 988 989 return rc; 990 } 991 992 static inline int 993 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 994 uint64_t now) 995 { 996 int rc; 997 998 switch (poller->state) { 999 case SPDK_POLLER_STATE_UNREGISTERED: 1000 free(poller); 1001 return 0; 1002 case SPDK_POLLER_STATE_PAUSING: 1003 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1004 poller->state = SPDK_POLLER_STATE_PAUSED; 1005 return 0; 1006 case SPDK_POLLER_STATE_WAITING: 1007 break; 1008 default: 1009 assert(false); 1010 break; 1011 } 1012 1013 poller->state = SPDK_POLLER_STATE_RUNNING; 1014 rc = poller->fn(poller->arg); 1015 1016 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 1017 1018 poller->run_count++; 1019 if (rc > 0) { 1020 poller->busy_count++; 1021 } 1022 1023 #ifdef DEBUG 1024 if (rc == -1) { 1025 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 1026 } 1027 #endif 1028 1029 switch (poller->state) { 1030 case SPDK_POLLER_STATE_UNREGISTERED: 1031 free(poller); 1032 break; 1033 case SPDK_POLLER_STATE_PAUSING: 1034 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1035 poller->state = SPDK_POLLER_STATE_PAUSED; 1036 break; 1037 case SPDK_POLLER_STATE_PAUSED: 1038 break; 1039 case SPDK_POLLER_STATE_RUNNING: 1040 poller->state = SPDK_POLLER_STATE_WAITING; 1041 /* fallthrough */ 1042 case SPDK_POLLER_STATE_WAITING: 1043 poller_insert_timer(thread, poller, now); 1044 break; 1045 default: 1046 assert(false); 1047 break; 1048 } 1049 1050 return rc; 1051 } 1052 1053 static int 1054 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1055 { 1056 uint32_t msg_count; 1057 struct spdk_poller *poller, *tmp; 1058 spdk_msg_fn critical_msg; 1059 int rc = 0; 1060 1061 thread->tsc_last = now; 1062 1063 critical_msg = thread->critical_msg; 1064 if (spdk_unlikely(critical_msg != NULL)) { 1065 critical_msg(NULL); 1066 thread->critical_msg = NULL; 1067 rc = 1; 1068 } 1069 1070 msg_count = msg_queue_run_batch(thread, max_msgs); 1071 if (msg_count) { 1072 rc = 1; 1073 } 1074 1075 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1076 active_pollers_head, tailq, tmp) { 1077 int poller_rc; 1078 1079 poller_rc = thread_execute_poller(thread, poller); 1080 if (poller_rc > rc) { 1081 rc = poller_rc; 1082 } 1083 } 1084 1085 poller = thread->first_timed_poller; 1086 while (poller != NULL) { 1087 int timer_rc = 0; 1088 1089 if (now < poller->next_run_tick) { 1090 break; 1091 } 1092 1093 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 1094 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 1095 1096 /* Update the cache to the next timed poller in the list 1097 * only if the current poller is still the closest, otherwise, 1098 * do nothing because the cache has been already updated. 1099 */ 1100 if (thread->first_timed_poller == poller) { 1101 thread->first_timed_poller = tmp; 1102 } 1103 1104 timer_rc = thread_execute_timed_poller(thread, poller, now); 1105 if (timer_rc > rc) { 1106 rc = timer_rc; 1107 } 1108 1109 poller = tmp; 1110 } 1111 1112 return rc; 1113 } 1114 1115 static void 1116 _thread_remove_pollers(void *ctx) 1117 { 1118 struct spdk_thread *thread = ctx; 1119 struct spdk_poller *poller, *tmp; 1120 1121 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1122 active_pollers_head, tailq, tmp) { 1123 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1124 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1125 free(poller); 1126 } 1127 } 1128 1129 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1130 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1131 poller_remove_timer(thread, poller); 1132 free(poller); 1133 } 1134 } 1135 1136 thread->poller_unregistered = false; 1137 } 1138 1139 static void 1140 _thread_exit(void *ctx) 1141 { 1142 struct spdk_thread *thread = ctx; 1143 1144 assert(thread->state == SPDK_THREAD_STATE_EXITING); 1145 1146 thread_exit(thread, spdk_get_ticks()); 1147 1148 if (thread->state != SPDK_THREAD_STATE_EXITED) { 1149 spdk_thread_send_msg(thread, _thread_exit, thread); 1150 } 1151 } 1152 1153 int 1154 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1155 { 1156 struct spdk_thread *orig_thread; 1157 int rc; 1158 1159 orig_thread = _get_thread(); 1160 tls_thread = thread; 1161 1162 if (now == 0) { 1163 now = spdk_get_ticks(); 1164 } 1165 1166 if (spdk_likely(!thread->in_interrupt)) { 1167 rc = thread_poll(thread, max_msgs, now); 1168 if (spdk_unlikely(thread->in_interrupt)) { 1169 /* The thread transitioned to interrupt mode during the above poll. 1170 * Poll it one more time in case that during the transition time 1171 * there is msg received without notification. 1172 */ 1173 rc = thread_poll(thread, max_msgs, now); 1174 } 1175 1176 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1177 thread_exit(thread, now); 1178 } 1179 } else { 1180 /* Non-block wait on thread's fd_group */ 1181 rc = spdk_fd_group_wait(thread->fgrp, 0); 1182 } 1183 1184 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1185 1186 tls_thread = orig_thread; 1187 1188 return rc; 1189 } 1190 1191 uint64_t 1192 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1193 { 1194 struct spdk_poller *poller; 1195 1196 poller = thread->first_timed_poller; 1197 if (poller) { 1198 return poller->next_run_tick; 1199 } 1200 1201 return 0; 1202 } 1203 1204 int 1205 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1206 { 1207 return !TAILQ_EMPTY(&thread->active_pollers); 1208 } 1209 1210 static bool 1211 thread_has_unpaused_pollers(struct spdk_thread *thread) 1212 { 1213 if (TAILQ_EMPTY(&thread->active_pollers) && 1214 RB_EMPTY(&thread->timed_pollers)) { 1215 return false; 1216 } 1217 1218 return true; 1219 } 1220 1221 bool 1222 spdk_thread_has_pollers(struct spdk_thread *thread) 1223 { 1224 if (!thread_has_unpaused_pollers(thread) && 1225 TAILQ_EMPTY(&thread->paused_pollers)) { 1226 return false; 1227 } 1228 1229 return true; 1230 } 1231 1232 bool 1233 spdk_thread_is_idle(struct spdk_thread *thread) 1234 { 1235 if (spdk_ring_count(thread->messages) || 1236 thread_has_unpaused_pollers(thread) || 1237 thread->critical_msg != NULL) { 1238 return false; 1239 } 1240 1241 return true; 1242 } 1243 1244 uint32_t 1245 spdk_thread_get_count(void) 1246 { 1247 /* 1248 * Return cached value of the current thread count. We could acquire the 1249 * lock and iterate through the TAILQ of threads to count them, but that 1250 * count could still be invalidated after we release the lock. 1251 */ 1252 return g_thread_count; 1253 } 1254 1255 struct spdk_thread * 1256 spdk_get_thread(void) 1257 { 1258 return _get_thread(); 1259 } 1260 1261 const char * 1262 spdk_thread_get_name(const struct spdk_thread *thread) 1263 { 1264 return thread->name; 1265 } 1266 1267 uint64_t 1268 spdk_thread_get_id(const struct spdk_thread *thread) 1269 { 1270 return thread->id; 1271 } 1272 1273 struct spdk_thread * 1274 spdk_thread_get_by_id(uint64_t id) 1275 { 1276 struct spdk_thread *thread; 1277 1278 if (id == 0 || id >= g_thread_id) { 1279 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1280 return NULL; 1281 } 1282 pthread_mutex_lock(&g_devlist_mutex); 1283 TAILQ_FOREACH(thread, &g_threads, tailq) { 1284 if (thread->id == id) { 1285 break; 1286 } 1287 } 1288 pthread_mutex_unlock(&g_devlist_mutex); 1289 return thread; 1290 } 1291 1292 int 1293 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1294 { 1295 struct spdk_thread *thread; 1296 1297 thread = _get_thread(); 1298 if (!thread) { 1299 SPDK_ERRLOG("No thread allocated\n"); 1300 return -EINVAL; 1301 } 1302 1303 if (stats == NULL) { 1304 return -EINVAL; 1305 } 1306 1307 *stats = thread->stats; 1308 1309 return 0; 1310 } 1311 1312 uint64_t 1313 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1314 { 1315 if (thread == NULL) { 1316 thread = _get_thread(); 1317 } 1318 1319 return thread->tsc_last; 1320 } 1321 1322 static inline int 1323 thread_send_msg_notification(const struct spdk_thread *target_thread) 1324 { 1325 uint64_t notify = 1; 1326 int rc; 1327 1328 /* Not necessary to do notification if interrupt facility is not enabled */ 1329 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1330 return 0; 1331 } 1332 1333 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1334 * after sending thread msg, it is necessary to check whether target thread runs in 1335 * interrupt mode and then decide whether do event notification. 1336 */ 1337 if (spdk_unlikely(target_thread->in_interrupt)) { 1338 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1339 if (rc < 0) { 1340 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1341 return -EIO; 1342 } 1343 } 1344 1345 return 0; 1346 } 1347 1348 int 1349 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1350 { 1351 struct spdk_thread *local_thread; 1352 struct spdk_msg *msg; 1353 int rc; 1354 1355 assert(thread != NULL); 1356 1357 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1358 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1359 return -EIO; 1360 } 1361 1362 local_thread = _get_thread(); 1363 1364 msg = NULL; 1365 if (local_thread != NULL) { 1366 if (local_thread->msg_cache_count > 0) { 1367 msg = SLIST_FIRST(&local_thread->msg_cache); 1368 assert(msg != NULL); 1369 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1370 local_thread->msg_cache_count--; 1371 } 1372 } 1373 1374 if (msg == NULL) { 1375 msg = spdk_mempool_get(g_spdk_msg_mempool); 1376 if (!msg) { 1377 SPDK_ERRLOG("msg could not be allocated\n"); 1378 return -ENOMEM; 1379 } 1380 } 1381 1382 msg->fn = fn; 1383 msg->arg = ctx; 1384 1385 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1386 if (rc != 1) { 1387 SPDK_ERRLOG("msg could not be enqueued\n"); 1388 spdk_mempool_put(g_spdk_msg_mempool, msg); 1389 return -EIO; 1390 } 1391 1392 return thread_send_msg_notification(thread); 1393 } 1394 1395 int 1396 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1397 { 1398 spdk_msg_fn expected = NULL; 1399 1400 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1401 __ATOMIC_SEQ_CST)) { 1402 return -EIO; 1403 } 1404 1405 return thread_send_msg_notification(thread); 1406 } 1407 1408 #ifdef __linux__ 1409 static int 1410 interrupt_timerfd_process(void *arg) 1411 { 1412 struct spdk_poller *poller = arg; 1413 uint64_t exp; 1414 int rc; 1415 1416 /* clear the level of interval timer */ 1417 rc = read(poller->intr->efd, &exp, sizeof(exp)); 1418 if (rc < 0) { 1419 if (rc == -EAGAIN) { 1420 return 0; 1421 } 1422 1423 return rc; 1424 } 1425 1426 SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg); 1427 1428 return poller->fn(poller->arg); 1429 } 1430 1431 static int 1432 period_poller_interrupt_init(struct spdk_poller *poller) 1433 { 1434 int timerfd; 1435 1436 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1437 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1438 if (timerfd < 0) { 1439 return -errno; 1440 } 1441 1442 poller->intr = spdk_interrupt_register(timerfd, interrupt_timerfd_process, poller, poller->name); 1443 if (poller->intr == NULL) { 1444 close(timerfd); 1445 return -1; 1446 } 1447 1448 return 0; 1449 } 1450 1451 static void 1452 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1453 { 1454 int timerfd; 1455 uint64_t now_tick = spdk_get_ticks(); 1456 uint64_t ticks = spdk_get_ticks_hz(); 1457 int ret; 1458 struct itimerspec new_tv = {}; 1459 struct itimerspec old_tv = {}; 1460 1461 assert(poller->intr != NULL); 1462 assert(poller->period_ticks != 0); 1463 1464 timerfd = poller->intr->efd; 1465 1466 assert(timerfd >= 0); 1467 1468 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1469 interrupt_mode ? "interrupt" : "poll"); 1470 1471 if (interrupt_mode) { 1472 /* Set repeated timer expiration */ 1473 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1474 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1475 1476 /* Update next timer expiration */ 1477 if (poller->next_run_tick == 0) { 1478 poller->next_run_tick = now_tick + poller->period_ticks; 1479 } else if (poller->next_run_tick < now_tick) { 1480 poller->next_run_tick = now_tick; 1481 } 1482 1483 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1484 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1485 1486 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1487 if (ret < 0) { 1488 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1489 assert(false); 1490 } 1491 } else { 1492 /* Disarm the timer */ 1493 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1494 if (ret < 0) { 1495 /* timerfd_settime's failure indicates that the timerfd is in error */ 1496 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1497 assert(false); 1498 } 1499 1500 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1501 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1502 */ 1503 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1504 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1505 poller_remove_timer(poller->thread, poller); 1506 poller_insert_timer(poller->thread, poller, now_tick); 1507 } 1508 } 1509 1510 static void 1511 poller_interrupt_fini(struct spdk_poller *poller) 1512 { 1513 int fd; 1514 1515 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1516 assert(poller->intr != NULL); 1517 fd = poller->intr->efd; 1518 spdk_interrupt_unregister(&poller->intr); 1519 close(fd); 1520 } 1521 1522 static int 1523 busy_poller_interrupt_init(struct spdk_poller *poller) 1524 { 1525 int busy_efd; 1526 1527 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1528 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1529 if (busy_efd < 0) { 1530 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1531 return -errno; 1532 } 1533 1534 poller->intr = spdk_interrupt_register(busy_efd, poller->fn, poller->arg, poller->name); 1535 if (poller->intr == NULL) { 1536 close(busy_efd); 1537 return -1; 1538 } 1539 1540 return 0; 1541 } 1542 1543 static void 1544 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1545 { 1546 int busy_efd = poller->intr->efd; 1547 uint64_t notify = 1; 1548 int rc __attribute__((unused)); 1549 1550 assert(busy_efd >= 0); 1551 1552 if (interrupt_mode) { 1553 /* Write without read on eventfd will get it repeatedly triggered. */ 1554 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1555 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1556 } 1557 } else { 1558 /* Read on eventfd will clear its level triggering. */ 1559 rc = read(busy_efd, ¬ify, sizeof(notify)); 1560 } 1561 } 1562 1563 #else 1564 1565 static int 1566 period_poller_interrupt_init(struct spdk_poller *poller) 1567 { 1568 return -ENOTSUP; 1569 } 1570 1571 static void 1572 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1573 { 1574 } 1575 1576 static void 1577 poller_interrupt_fini(struct spdk_poller *poller) 1578 { 1579 } 1580 1581 static int 1582 busy_poller_interrupt_init(struct spdk_poller *poller) 1583 { 1584 return -ENOTSUP; 1585 } 1586 1587 static void 1588 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1589 { 1590 } 1591 1592 #endif 1593 1594 void 1595 spdk_poller_register_interrupt(struct spdk_poller *poller, 1596 spdk_poller_set_interrupt_mode_cb cb_fn, 1597 void *cb_arg) 1598 { 1599 assert(poller != NULL); 1600 assert(cb_fn != NULL); 1601 assert(spdk_get_thread() == poller->thread); 1602 1603 if (!spdk_interrupt_mode_is_enabled()) { 1604 return; 1605 } 1606 1607 /* If this poller already had an interrupt, clean the old one up. */ 1608 if (poller->intr != NULL) { 1609 poller_interrupt_fini(poller); 1610 } 1611 1612 poller->set_intr_cb_fn = cb_fn; 1613 poller->set_intr_cb_arg = cb_arg; 1614 1615 /* Set poller into interrupt mode if thread is in interrupt. */ 1616 if (poller->thread->in_interrupt) { 1617 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1618 } 1619 } 1620 1621 static uint64_t 1622 convert_us_to_ticks(uint64_t us) 1623 { 1624 uint64_t quotient, remainder, ticks; 1625 1626 if (us) { 1627 quotient = us / SPDK_SEC_TO_USEC; 1628 remainder = us % SPDK_SEC_TO_USEC; 1629 ticks = spdk_get_ticks_hz(); 1630 1631 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1632 } else { 1633 return 0; 1634 } 1635 } 1636 1637 static struct spdk_poller * 1638 poller_register(spdk_poller_fn fn, 1639 void *arg, 1640 uint64_t period_microseconds, 1641 const char *name) 1642 { 1643 struct spdk_thread *thread; 1644 struct spdk_poller *poller; 1645 1646 thread = spdk_get_thread(); 1647 if (!thread) { 1648 assert(false); 1649 return NULL; 1650 } 1651 1652 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1653 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1654 return NULL; 1655 } 1656 1657 poller = calloc(1, sizeof(*poller)); 1658 if (poller == NULL) { 1659 SPDK_ERRLOG("Poller memory allocation failed\n"); 1660 return NULL; 1661 } 1662 1663 if (name) { 1664 snprintf(poller->name, sizeof(poller->name), "%s", name); 1665 } else { 1666 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1667 } 1668 1669 poller->state = SPDK_POLLER_STATE_WAITING; 1670 poller->fn = fn; 1671 poller->arg = arg; 1672 poller->thread = thread; 1673 poller->intr = NULL; 1674 if (thread->next_poller_id == 0) { 1675 SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n"); 1676 thread->next_poller_id = 1; 1677 } 1678 poller->id = thread->next_poller_id++; 1679 1680 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1681 1682 if (spdk_interrupt_mode_is_enabled()) { 1683 int rc; 1684 1685 if (period_microseconds) { 1686 rc = period_poller_interrupt_init(poller); 1687 if (rc < 0) { 1688 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1689 free(poller); 1690 return NULL; 1691 } 1692 1693 poller->set_intr_cb_fn = period_poller_set_interrupt_mode; 1694 poller->set_intr_cb_arg = NULL; 1695 1696 } else { 1697 /* If the poller doesn't have a period, create interruptfd that's always 1698 * busy automatically when running in interrupt mode. 1699 */ 1700 rc = busy_poller_interrupt_init(poller); 1701 if (rc > 0) { 1702 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1703 free(poller); 1704 return NULL; 1705 } 1706 1707 poller->set_intr_cb_fn = busy_poller_set_interrupt_mode; 1708 poller->set_intr_cb_arg = NULL; 1709 } 1710 1711 /* Set poller into interrupt mode if thread is in interrupt. */ 1712 if (poller->thread->in_interrupt) { 1713 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1714 } 1715 } 1716 1717 thread_insert_poller(thread, poller); 1718 1719 return poller; 1720 } 1721 1722 struct spdk_poller * 1723 spdk_poller_register(spdk_poller_fn fn, 1724 void *arg, 1725 uint64_t period_microseconds) 1726 { 1727 return poller_register(fn, arg, period_microseconds, NULL); 1728 } 1729 1730 struct spdk_poller * 1731 spdk_poller_register_named(spdk_poller_fn fn, 1732 void *arg, 1733 uint64_t period_microseconds, 1734 const char *name) 1735 { 1736 return poller_register(fn, arg, period_microseconds, name); 1737 } 1738 1739 static void 1740 wrong_thread(const char *func, const char *name, struct spdk_thread *thread, 1741 struct spdk_thread *curthread) 1742 { 1743 if (thread == NULL) { 1744 SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name); 1745 abort(); 1746 } 1747 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be " 1748 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id, 1749 thread->name, thread->id); 1750 assert(false); 1751 } 1752 1753 void 1754 spdk_poller_unregister(struct spdk_poller **ppoller) 1755 { 1756 struct spdk_thread *thread; 1757 struct spdk_poller *poller; 1758 1759 poller = *ppoller; 1760 if (poller == NULL) { 1761 return; 1762 } 1763 1764 *ppoller = NULL; 1765 1766 thread = spdk_get_thread(); 1767 if (!thread) { 1768 assert(false); 1769 return; 1770 } 1771 1772 if (poller->thread != thread) { 1773 wrong_thread(__func__, poller->name, poller->thread, thread); 1774 return; 1775 } 1776 1777 if (spdk_interrupt_mode_is_enabled()) { 1778 /* Release the interrupt resource for period or busy poller */ 1779 if (poller->intr != NULL) { 1780 poller_interrupt_fini(poller); 1781 } 1782 1783 /* If there is not already a pending poller removal, generate 1784 * a message to go process removals. */ 1785 if (!thread->poller_unregistered) { 1786 thread->poller_unregistered = true; 1787 spdk_thread_send_msg(thread, _thread_remove_pollers, thread); 1788 } 1789 } 1790 1791 /* If the poller was paused, put it on the active_pollers list so that 1792 * its unregistration can be processed by spdk_thread_poll(). 1793 */ 1794 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1795 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1796 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1797 poller->period_ticks = 0; 1798 } 1799 1800 /* Simply set the state to unregistered. The poller will get cleaned up 1801 * in a subsequent call to spdk_thread_poll(). 1802 */ 1803 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1804 } 1805 1806 void 1807 spdk_poller_pause(struct spdk_poller *poller) 1808 { 1809 struct spdk_thread *thread; 1810 1811 thread = spdk_get_thread(); 1812 if (!thread) { 1813 assert(false); 1814 return; 1815 } 1816 1817 if (poller->thread != thread) { 1818 wrong_thread(__func__, poller->name, poller->thread, thread); 1819 return; 1820 } 1821 1822 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1823 * spdk_thread_poll() move it. It allows a poller to be paused from 1824 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1825 * iteration, or from within itself without breaking the logic to always 1826 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1827 */ 1828 switch (poller->state) { 1829 case SPDK_POLLER_STATE_PAUSED: 1830 case SPDK_POLLER_STATE_PAUSING: 1831 break; 1832 case SPDK_POLLER_STATE_RUNNING: 1833 case SPDK_POLLER_STATE_WAITING: 1834 poller->state = SPDK_POLLER_STATE_PAUSING; 1835 break; 1836 default: 1837 assert(false); 1838 break; 1839 } 1840 } 1841 1842 void 1843 spdk_poller_resume(struct spdk_poller *poller) 1844 { 1845 struct spdk_thread *thread; 1846 1847 thread = spdk_get_thread(); 1848 if (!thread) { 1849 assert(false); 1850 return; 1851 } 1852 1853 if (poller->thread != thread) { 1854 wrong_thread(__func__, poller->name, poller->thread, thread); 1855 return; 1856 } 1857 1858 /* If a poller is paused it has to be removed from the paused pollers 1859 * list and put on the active list or timer tree depending on its 1860 * period_ticks. If a poller is still in the process of being paused, 1861 * we just need to flip its state back to waiting, as it's already on 1862 * the appropriate list or tree. 1863 */ 1864 switch (poller->state) { 1865 case SPDK_POLLER_STATE_PAUSED: 1866 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1867 thread_insert_poller(thread, poller); 1868 /* fallthrough */ 1869 case SPDK_POLLER_STATE_PAUSING: 1870 poller->state = SPDK_POLLER_STATE_WAITING; 1871 break; 1872 case SPDK_POLLER_STATE_RUNNING: 1873 case SPDK_POLLER_STATE_WAITING: 1874 break; 1875 default: 1876 assert(false); 1877 break; 1878 } 1879 } 1880 1881 const char * 1882 spdk_poller_get_name(struct spdk_poller *poller) 1883 { 1884 return poller->name; 1885 } 1886 1887 uint64_t 1888 spdk_poller_get_id(struct spdk_poller *poller) 1889 { 1890 return poller->id; 1891 } 1892 1893 const char * 1894 spdk_poller_get_state_str(struct spdk_poller *poller) 1895 { 1896 switch (poller->state) { 1897 case SPDK_POLLER_STATE_WAITING: 1898 return "waiting"; 1899 case SPDK_POLLER_STATE_RUNNING: 1900 return "running"; 1901 case SPDK_POLLER_STATE_UNREGISTERED: 1902 return "unregistered"; 1903 case SPDK_POLLER_STATE_PAUSING: 1904 return "pausing"; 1905 case SPDK_POLLER_STATE_PAUSED: 1906 return "paused"; 1907 default: 1908 return NULL; 1909 } 1910 } 1911 1912 uint64_t 1913 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1914 { 1915 return poller->period_ticks; 1916 } 1917 1918 void 1919 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1920 { 1921 stats->run_count = poller->run_count; 1922 stats->busy_count = poller->busy_count; 1923 } 1924 1925 struct spdk_poller * 1926 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1927 { 1928 return TAILQ_FIRST(&thread->active_pollers); 1929 } 1930 1931 struct spdk_poller * 1932 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1933 { 1934 return TAILQ_NEXT(prev, tailq); 1935 } 1936 1937 struct spdk_poller * 1938 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1939 { 1940 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1941 } 1942 1943 struct spdk_poller * 1944 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1945 { 1946 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1947 } 1948 1949 struct spdk_poller * 1950 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1951 { 1952 return TAILQ_FIRST(&thread->paused_pollers); 1953 } 1954 1955 struct spdk_poller * 1956 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1957 { 1958 return TAILQ_NEXT(prev, tailq); 1959 } 1960 1961 struct spdk_io_channel * 1962 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1963 { 1964 return RB_MIN(io_channel_tree, &thread->io_channels); 1965 } 1966 1967 struct spdk_io_channel * 1968 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1969 { 1970 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1971 } 1972 1973 struct call_thread { 1974 struct spdk_thread *cur_thread; 1975 spdk_msg_fn fn; 1976 void *ctx; 1977 1978 struct spdk_thread *orig_thread; 1979 spdk_msg_fn cpl; 1980 }; 1981 1982 static void 1983 _back_to_orig_thread(void *ctx) 1984 { 1985 struct call_thread *ct = ctx; 1986 1987 assert(ct->orig_thread->for_each_count > 0); 1988 ct->orig_thread->for_each_count--; 1989 1990 ct->cpl(ct->ctx); 1991 free(ctx); 1992 } 1993 1994 static void 1995 _on_thread(void *ctx) 1996 { 1997 struct call_thread *ct = ctx; 1998 int rc __attribute__((unused)); 1999 2000 ct->fn(ct->ctx); 2001 2002 pthread_mutex_lock(&g_devlist_mutex); 2003 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 2004 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 2005 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 2006 ct->cur_thread->name); 2007 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 2008 } 2009 pthread_mutex_unlock(&g_devlist_mutex); 2010 2011 if (!ct->cur_thread) { 2012 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 2013 2014 rc = spdk_thread_send_msg(ct->orig_thread, _back_to_orig_thread, ctx); 2015 } else { 2016 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 2017 ct->cur_thread->name); 2018 2019 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 2020 } 2021 assert(rc == 0); 2022 } 2023 2024 void 2025 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 2026 { 2027 struct call_thread *ct; 2028 struct spdk_thread *thread; 2029 int rc __attribute__((unused)); 2030 2031 ct = calloc(1, sizeof(*ct)); 2032 if (!ct) { 2033 SPDK_ERRLOG("Unable to perform thread iteration\n"); 2034 cpl(ctx); 2035 return; 2036 } 2037 2038 ct->fn = fn; 2039 ct->ctx = ctx; 2040 ct->cpl = cpl; 2041 2042 thread = _get_thread(); 2043 if (!thread) { 2044 SPDK_ERRLOG("No thread allocated\n"); 2045 free(ct); 2046 cpl(ctx); 2047 return; 2048 } 2049 ct->orig_thread = thread; 2050 2051 ct->orig_thread->for_each_count++; 2052 2053 pthread_mutex_lock(&g_devlist_mutex); 2054 ct->cur_thread = TAILQ_FIRST(&g_threads); 2055 pthread_mutex_unlock(&g_devlist_mutex); 2056 2057 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 2058 ct->orig_thread->name); 2059 2060 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 2061 assert(rc == 0); 2062 } 2063 2064 static inline void 2065 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 2066 { 2067 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 2068 return; 2069 } 2070 2071 if (!poller->set_intr_cb_fn) { 2072 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 2073 assert(false); 2074 return; 2075 } 2076 2077 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 2078 } 2079 2080 void 2081 spdk_thread_set_interrupt_mode(bool enable_interrupt) 2082 { 2083 struct spdk_thread *thread = _get_thread(); 2084 struct spdk_poller *poller, *tmp; 2085 2086 assert(thread); 2087 assert(spdk_interrupt_mode_is_enabled()); 2088 2089 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 2090 thread->name, enable_interrupt ? "intr" : "poll", 2091 thread->in_interrupt ? "intr" : "poll"); 2092 2093 if (thread->in_interrupt == enable_interrupt) { 2094 return; 2095 } 2096 2097 /* Set pollers to expected mode */ 2098 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 2099 poller_set_interrupt_mode(poller, enable_interrupt); 2100 } 2101 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 2102 poller_set_interrupt_mode(poller, enable_interrupt); 2103 } 2104 /* All paused pollers will go to work in interrupt mode */ 2105 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 2106 poller_set_interrupt_mode(poller, enable_interrupt); 2107 } 2108 2109 thread->in_interrupt = enable_interrupt; 2110 return; 2111 } 2112 2113 static struct io_device * 2114 io_device_get(void *io_device) 2115 { 2116 struct io_device find = {}; 2117 2118 find.io_device = io_device; 2119 return RB_FIND(io_device_tree, &g_io_devices, &find); 2120 } 2121 2122 void 2123 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 2124 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 2125 const char *name) 2126 { 2127 struct io_device *dev, *tmp; 2128 struct spdk_thread *thread; 2129 2130 assert(io_device != NULL); 2131 assert(create_cb != NULL); 2132 assert(destroy_cb != NULL); 2133 2134 thread = spdk_get_thread(); 2135 if (!thread) { 2136 SPDK_ERRLOG("called from non-SPDK thread\n"); 2137 assert(false); 2138 return; 2139 } 2140 2141 dev = calloc(1, sizeof(struct io_device)); 2142 if (dev == NULL) { 2143 SPDK_ERRLOG("could not allocate io_device\n"); 2144 return; 2145 } 2146 2147 dev->io_device = io_device; 2148 if (name) { 2149 snprintf(dev->name, sizeof(dev->name), "%s", name); 2150 } else { 2151 snprintf(dev->name, sizeof(dev->name), "%p", dev); 2152 } 2153 dev->create_cb = create_cb; 2154 dev->destroy_cb = destroy_cb; 2155 dev->unregister_cb = NULL; 2156 dev->ctx_size = ctx_size; 2157 dev->for_each_count = 0; 2158 dev->unregistered = false; 2159 dev->refcnt = 0; 2160 2161 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 2162 dev->name, dev->io_device, thread->name); 2163 2164 pthread_mutex_lock(&g_devlist_mutex); 2165 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 2166 if (tmp != NULL) { 2167 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 2168 io_device, tmp->name, dev->name); 2169 free(dev); 2170 } 2171 2172 pthread_mutex_unlock(&g_devlist_mutex); 2173 } 2174 2175 static void 2176 _finish_unregister(void *arg) 2177 { 2178 struct io_device *dev = arg; 2179 struct spdk_thread *thread; 2180 2181 thread = spdk_get_thread(); 2182 assert(thread == dev->unregister_thread); 2183 2184 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2185 dev->name, dev->io_device, thread->name); 2186 2187 assert(thread->pending_unregister_count > 0); 2188 thread->pending_unregister_count--; 2189 2190 dev->unregister_cb(dev->io_device); 2191 free(dev); 2192 } 2193 2194 static void 2195 io_device_free(struct io_device *dev) 2196 { 2197 int rc __attribute__((unused)); 2198 2199 if (dev->unregister_cb == NULL) { 2200 free(dev); 2201 } else { 2202 assert(dev->unregister_thread != NULL); 2203 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2204 dev->name, dev->io_device, dev->unregister_thread->name); 2205 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2206 assert(rc == 0); 2207 } 2208 } 2209 2210 void 2211 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2212 { 2213 struct io_device *dev; 2214 uint32_t refcnt; 2215 struct spdk_thread *thread; 2216 2217 thread = spdk_get_thread(); 2218 if (!thread) { 2219 SPDK_ERRLOG("called from non-SPDK thread\n"); 2220 assert(false); 2221 return; 2222 } 2223 2224 pthread_mutex_lock(&g_devlist_mutex); 2225 dev = io_device_get(io_device); 2226 if (!dev) { 2227 SPDK_ERRLOG("io_device %p not found\n", io_device); 2228 assert(false); 2229 pthread_mutex_unlock(&g_devlist_mutex); 2230 return; 2231 } 2232 2233 /* The for_each_count check differentiates the user attempting to unregister the 2234 * device a second time, from the internal call to this function that occurs 2235 * after the for_each_count reaches 0. 2236 */ 2237 if (dev->pending_unregister && dev->for_each_count > 0) { 2238 SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device); 2239 assert(false); 2240 pthread_mutex_unlock(&g_devlist_mutex); 2241 return; 2242 } 2243 2244 dev->unregister_cb = unregister_cb; 2245 dev->unregister_thread = thread; 2246 2247 if (dev->for_each_count > 0) { 2248 SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2249 dev->name, io_device, dev->for_each_count); 2250 dev->pending_unregister = true; 2251 pthread_mutex_unlock(&g_devlist_mutex); 2252 return; 2253 } 2254 2255 dev->unregistered = true; 2256 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2257 refcnt = dev->refcnt; 2258 pthread_mutex_unlock(&g_devlist_mutex); 2259 2260 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2261 dev->name, dev->io_device, thread->name); 2262 2263 if (unregister_cb) { 2264 thread->pending_unregister_count++; 2265 } 2266 2267 if (refcnt > 0) { 2268 /* defer deletion */ 2269 return; 2270 } 2271 2272 io_device_free(dev); 2273 } 2274 2275 const char * 2276 spdk_io_device_get_name(struct io_device *dev) 2277 { 2278 return dev->name; 2279 } 2280 2281 static struct spdk_io_channel * 2282 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2283 { 2284 struct spdk_io_channel find = {}; 2285 2286 find.dev = dev; 2287 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2288 } 2289 2290 struct spdk_io_channel * 2291 spdk_get_io_channel(void *io_device) 2292 { 2293 struct spdk_io_channel *ch; 2294 struct spdk_thread *thread; 2295 struct io_device *dev; 2296 int rc; 2297 2298 pthread_mutex_lock(&g_devlist_mutex); 2299 dev = io_device_get(io_device); 2300 if (dev == NULL) { 2301 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2302 pthread_mutex_unlock(&g_devlist_mutex); 2303 return NULL; 2304 } 2305 2306 thread = _get_thread(); 2307 if (!thread) { 2308 SPDK_ERRLOG("No thread allocated\n"); 2309 pthread_mutex_unlock(&g_devlist_mutex); 2310 return NULL; 2311 } 2312 2313 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2314 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2315 pthread_mutex_unlock(&g_devlist_mutex); 2316 return NULL; 2317 } 2318 2319 ch = thread_get_io_channel(thread, dev); 2320 if (ch != NULL) { 2321 ch->ref++; 2322 2323 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2324 ch, dev->name, dev->io_device, thread->name, ch->ref); 2325 2326 /* 2327 * An I/O channel already exists for this device on this 2328 * thread, so return it. 2329 */ 2330 pthread_mutex_unlock(&g_devlist_mutex); 2331 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2332 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2333 return ch; 2334 } 2335 2336 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2337 if (ch == NULL) { 2338 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2339 pthread_mutex_unlock(&g_devlist_mutex); 2340 return NULL; 2341 } 2342 2343 ch->dev = dev; 2344 ch->destroy_cb = dev->destroy_cb; 2345 ch->thread = thread; 2346 ch->ref = 1; 2347 ch->destroy_ref = 0; 2348 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2349 2350 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2351 ch, dev->name, dev->io_device, thread->name, ch->ref); 2352 2353 dev->refcnt++; 2354 2355 pthread_mutex_unlock(&g_devlist_mutex); 2356 2357 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2358 if (rc != 0) { 2359 pthread_mutex_lock(&g_devlist_mutex); 2360 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2361 dev->refcnt--; 2362 free(ch); 2363 SPDK_ERRLOG("could not create io_channel for io_device %s (%p): %s (rc=%d)\n", 2364 dev->name, io_device, spdk_strerror(-rc), rc); 2365 pthread_mutex_unlock(&g_devlist_mutex); 2366 return NULL; 2367 } 2368 2369 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2370 return ch; 2371 } 2372 2373 static void 2374 put_io_channel(void *arg) 2375 { 2376 struct spdk_io_channel *ch = arg; 2377 bool do_remove_dev = true; 2378 struct spdk_thread *thread; 2379 2380 thread = spdk_get_thread(); 2381 if (!thread) { 2382 SPDK_ERRLOG("called from non-SPDK thread\n"); 2383 assert(false); 2384 return; 2385 } 2386 2387 SPDK_DEBUGLOG(thread, 2388 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2389 ch, ch->dev->name, ch->dev->io_device, thread->name); 2390 2391 assert(ch->thread == thread); 2392 2393 ch->destroy_ref--; 2394 2395 if (ch->ref > 0 || ch->destroy_ref > 0) { 2396 /* 2397 * Another reference to the associated io_device was requested 2398 * after this message was sent but before it had a chance to 2399 * execute. 2400 */ 2401 return; 2402 } 2403 2404 pthread_mutex_lock(&g_devlist_mutex); 2405 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2406 pthread_mutex_unlock(&g_devlist_mutex); 2407 2408 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2409 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2410 2411 pthread_mutex_lock(&g_devlist_mutex); 2412 ch->dev->refcnt--; 2413 2414 if (!ch->dev->unregistered) { 2415 do_remove_dev = false; 2416 } 2417 2418 if (ch->dev->refcnt > 0) { 2419 do_remove_dev = false; 2420 } 2421 2422 pthread_mutex_unlock(&g_devlist_mutex); 2423 2424 if (do_remove_dev) { 2425 io_device_free(ch->dev); 2426 } 2427 free(ch); 2428 } 2429 2430 void 2431 spdk_put_io_channel(struct spdk_io_channel *ch) 2432 { 2433 struct spdk_thread *thread; 2434 int rc __attribute__((unused)); 2435 2436 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2437 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2438 2439 thread = spdk_get_thread(); 2440 if (!thread) { 2441 SPDK_ERRLOG("called from non-SPDK thread\n"); 2442 assert(false); 2443 return; 2444 } 2445 2446 if (ch->thread != thread) { 2447 wrong_thread(__func__, "ch", ch->thread, thread); 2448 return; 2449 } 2450 2451 SPDK_DEBUGLOG(thread, 2452 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2453 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2454 2455 ch->ref--; 2456 2457 if (ch->ref == 0) { 2458 ch->destroy_ref++; 2459 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2460 assert(rc == 0); 2461 } 2462 } 2463 2464 struct spdk_io_channel * 2465 spdk_io_channel_from_ctx(void *ctx) 2466 { 2467 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2468 } 2469 2470 struct spdk_thread * 2471 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2472 { 2473 return ch->thread; 2474 } 2475 2476 void * 2477 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2478 { 2479 return ch->dev->io_device; 2480 } 2481 2482 const char * 2483 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2484 { 2485 return spdk_io_device_get_name(ch->dev); 2486 } 2487 2488 int 2489 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2490 { 2491 return ch->ref; 2492 } 2493 2494 struct spdk_io_channel_iter { 2495 void *io_device; 2496 struct io_device *dev; 2497 spdk_channel_msg fn; 2498 int status; 2499 void *ctx; 2500 struct spdk_io_channel *ch; 2501 2502 struct spdk_thread *cur_thread; 2503 2504 struct spdk_thread *orig_thread; 2505 spdk_channel_for_each_cpl cpl; 2506 }; 2507 2508 void * 2509 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2510 { 2511 return i->io_device; 2512 } 2513 2514 struct spdk_io_channel * 2515 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2516 { 2517 return i->ch; 2518 } 2519 2520 void * 2521 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2522 { 2523 return i->ctx; 2524 } 2525 2526 static void 2527 _call_completion(void *ctx) 2528 { 2529 struct spdk_io_channel_iter *i = ctx; 2530 2531 assert(i->orig_thread->for_each_count > 0); 2532 i->orig_thread->for_each_count--; 2533 2534 if (i->cpl != NULL) { 2535 i->cpl(i, i->status); 2536 } 2537 free(i); 2538 } 2539 2540 static void 2541 _call_channel(void *ctx) 2542 { 2543 struct spdk_io_channel_iter *i = ctx; 2544 struct spdk_io_channel *ch; 2545 2546 /* 2547 * It is possible that the channel was deleted before this 2548 * message had a chance to execute. If so, skip calling 2549 * the fn() on this thread. 2550 */ 2551 pthread_mutex_lock(&g_devlist_mutex); 2552 ch = thread_get_io_channel(i->cur_thread, i->dev); 2553 pthread_mutex_unlock(&g_devlist_mutex); 2554 2555 if (ch) { 2556 i->fn(i); 2557 } else { 2558 spdk_for_each_channel_continue(i, 0); 2559 } 2560 } 2561 2562 void 2563 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2564 spdk_channel_for_each_cpl cpl) 2565 { 2566 struct spdk_thread *thread; 2567 struct spdk_io_channel *ch; 2568 struct spdk_io_channel_iter *i; 2569 int rc __attribute__((unused)); 2570 2571 i = calloc(1, sizeof(*i)); 2572 if (!i) { 2573 SPDK_ERRLOG("Unable to allocate iterator\n"); 2574 assert(false); 2575 return; 2576 } 2577 2578 i->io_device = io_device; 2579 i->fn = fn; 2580 i->ctx = ctx; 2581 i->cpl = cpl; 2582 i->orig_thread = _get_thread(); 2583 2584 i->orig_thread->for_each_count++; 2585 2586 pthread_mutex_lock(&g_devlist_mutex); 2587 i->dev = io_device_get(io_device); 2588 if (i->dev == NULL) { 2589 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2590 assert(false); 2591 i->status = -ENODEV; 2592 goto end; 2593 } 2594 2595 /* Do not allow new for_each operations if we are already waiting to unregister 2596 * the device for other for_each operations to complete. 2597 */ 2598 if (i->dev->pending_unregister) { 2599 SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device); 2600 i->status = -ENODEV; 2601 goto end; 2602 } 2603 2604 TAILQ_FOREACH(thread, &g_threads, tailq) { 2605 ch = thread_get_io_channel(thread, i->dev); 2606 if (ch != NULL) { 2607 ch->dev->for_each_count++; 2608 i->cur_thread = thread; 2609 i->ch = ch; 2610 pthread_mutex_unlock(&g_devlist_mutex); 2611 rc = spdk_thread_send_msg(thread, _call_channel, i); 2612 assert(rc == 0); 2613 return; 2614 } 2615 } 2616 2617 end: 2618 pthread_mutex_unlock(&g_devlist_mutex); 2619 2620 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2621 assert(rc == 0); 2622 } 2623 2624 static void 2625 __pending_unregister(void *arg) 2626 { 2627 struct io_device *dev = arg; 2628 2629 assert(dev->pending_unregister); 2630 assert(dev->for_each_count == 0); 2631 spdk_io_device_unregister(dev->io_device, dev->unregister_cb); 2632 } 2633 2634 void 2635 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2636 { 2637 struct spdk_thread *thread; 2638 struct spdk_io_channel *ch; 2639 struct io_device *dev; 2640 int rc __attribute__((unused)); 2641 2642 assert(i->cur_thread == spdk_get_thread()); 2643 2644 i->status = status; 2645 2646 pthread_mutex_lock(&g_devlist_mutex); 2647 dev = i->dev; 2648 if (status) { 2649 goto end; 2650 } 2651 2652 thread = TAILQ_NEXT(i->cur_thread, tailq); 2653 while (thread) { 2654 ch = thread_get_io_channel(thread, dev); 2655 if (ch != NULL) { 2656 i->cur_thread = thread; 2657 i->ch = ch; 2658 pthread_mutex_unlock(&g_devlist_mutex); 2659 rc = spdk_thread_send_msg(thread, _call_channel, i); 2660 assert(rc == 0); 2661 return; 2662 } 2663 thread = TAILQ_NEXT(thread, tailq); 2664 } 2665 2666 end: 2667 dev->for_each_count--; 2668 i->ch = NULL; 2669 pthread_mutex_unlock(&g_devlist_mutex); 2670 2671 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2672 assert(rc == 0); 2673 2674 pthread_mutex_lock(&g_devlist_mutex); 2675 if (dev->pending_unregister && dev->for_each_count == 0) { 2676 rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev); 2677 assert(rc == 0); 2678 } 2679 pthread_mutex_unlock(&g_devlist_mutex); 2680 } 2681 2682 static void 2683 thread_interrupt_destroy(struct spdk_thread *thread) 2684 { 2685 struct spdk_fd_group *fgrp = thread->fgrp; 2686 2687 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2688 2689 if (thread->msg_fd < 0) { 2690 return; 2691 } 2692 2693 spdk_fd_group_remove(fgrp, thread->msg_fd); 2694 close(thread->msg_fd); 2695 thread->msg_fd = -1; 2696 2697 spdk_fd_group_destroy(fgrp); 2698 thread->fgrp = NULL; 2699 } 2700 2701 #ifdef __linux__ 2702 static int 2703 thread_interrupt_msg_process(void *arg) 2704 { 2705 struct spdk_thread *thread = arg; 2706 struct spdk_thread *orig_thread; 2707 uint32_t msg_count; 2708 spdk_msg_fn critical_msg; 2709 int rc = 0; 2710 uint64_t notify = 1; 2711 2712 assert(spdk_interrupt_mode_is_enabled()); 2713 2714 orig_thread = spdk_get_thread(); 2715 spdk_set_thread(thread); 2716 2717 /* There may be race between msg_acknowledge and another producer's msg_notify, 2718 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2719 * This can avoid msg notification missing. 2720 */ 2721 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2722 if (rc < 0 && errno != EAGAIN) { 2723 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2724 } 2725 2726 critical_msg = thread->critical_msg; 2727 if (spdk_unlikely(critical_msg != NULL)) { 2728 critical_msg(NULL); 2729 thread->critical_msg = NULL; 2730 rc = 1; 2731 } 2732 2733 msg_count = msg_queue_run_batch(thread, 0); 2734 if (msg_count) { 2735 rc = 1; 2736 } 2737 2738 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2739 if (spdk_unlikely(!thread->in_interrupt)) { 2740 /* The thread transitioned to poll mode in a msg during the above processing. 2741 * Clear msg_fd since thread messages will be polled directly in poll mode. 2742 */ 2743 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2744 if (rc < 0 && errno != EAGAIN) { 2745 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 2746 } 2747 } 2748 2749 spdk_set_thread(orig_thread); 2750 return rc; 2751 } 2752 2753 static int 2754 thread_interrupt_create(struct spdk_thread *thread) 2755 { 2756 int rc; 2757 2758 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2759 2760 rc = spdk_fd_group_create(&thread->fgrp); 2761 if (rc) { 2762 return rc; 2763 } 2764 2765 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2766 if (thread->msg_fd < 0) { 2767 rc = -errno; 2768 spdk_fd_group_destroy(thread->fgrp); 2769 thread->fgrp = NULL; 2770 2771 return rc; 2772 } 2773 2774 return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd, 2775 thread_interrupt_msg_process, thread); 2776 } 2777 #else 2778 static int 2779 thread_interrupt_create(struct spdk_thread *thread) 2780 { 2781 return -ENOTSUP; 2782 } 2783 #endif 2784 2785 static int 2786 _interrupt_wrapper(void *ctx) 2787 { 2788 struct spdk_interrupt *intr = ctx; 2789 struct spdk_thread *orig_thread, *thread; 2790 int rc; 2791 2792 orig_thread = spdk_get_thread(); 2793 thread = intr->thread; 2794 2795 spdk_set_thread(thread); 2796 2797 SPDK_DTRACE_PROBE4(interrupt_fd_process, intr->name, intr->efd, 2798 intr->fn, intr->arg); 2799 2800 rc = intr->fn(intr->arg); 2801 2802 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2803 2804 spdk_set_thread(orig_thread); 2805 2806 return rc; 2807 } 2808 2809 struct spdk_interrupt * 2810 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2811 void *arg, const char *name) 2812 { 2813 struct spdk_thread *thread; 2814 struct spdk_interrupt *intr; 2815 int ret; 2816 2817 thread = spdk_get_thread(); 2818 if (!thread) { 2819 assert(false); 2820 return NULL; 2821 } 2822 2823 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2824 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2825 return NULL; 2826 } 2827 2828 intr = calloc(1, sizeof(*intr)); 2829 if (intr == NULL) { 2830 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2831 return NULL; 2832 } 2833 2834 if (name) { 2835 snprintf(intr->name, sizeof(intr->name), "%s", name); 2836 } else { 2837 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2838 } 2839 2840 intr->efd = efd; 2841 intr->thread = thread; 2842 intr->fn = fn; 2843 intr->arg = arg; 2844 2845 ret = spdk_fd_group_add(thread->fgrp, efd, _interrupt_wrapper, intr, intr->name); 2846 2847 if (ret != 0) { 2848 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2849 thread->name, efd, spdk_strerror(-ret)); 2850 free(intr); 2851 return NULL; 2852 } 2853 2854 return intr; 2855 } 2856 2857 void 2858 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2859 { 2860 struct spdk_thread *thread; 2861 struct spdk_interrupt *intr; 2862 2863 intr = *pintr; 2864 if (intr == NULL) { 2865 return; 2866 } 2867 2868 *pintr = NULL; 2869 2870 thread = spdk_get_thread(); 2871 if (!thread) { 2872 assert(false); 2873 return; 2874 } 2875 2876 if (intr->thread != thread) { 2877 wrong_thread(__func__, intr->name, intr->thread, thread); 2878 return; 2879 } 2880 2881 spdk_fd_group_remove(thread->fgrp, intr->efd); 2882 free(intr); 2883 } 2884 2885 int 2886 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2887 enum spdk_interrupt_event_types event_types) 2888 { 2889 struct spdk_thread *thread; 2890 2891 thread = spdk_get_thread(); 2892 if (!thread) { 2893 assert(false); 2894 return -EINVAL; 2895 } 2896 2897 if (intr->thread != thread) { 2898 wrong_thread(__func__, intr->name, intr->thread, thread); 2899 return -EINVAL; 2900 } 2901 2902 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2903 } 2904 2905 int 2906 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2907 { 2908 return spdk_fd_group_get_fd(thread->fgrp); 2909 } 2910 2911 struct spdk_fd_group * 2912 spdk_thread_get_interrupt_fd_group(struct spdk_thread *thread) 2913 { 2914 return thread->fgrp; 2915 } 2916 2917 static bool g_interrupt_mode = false; 2918 2919 int 2920 spdk_interrupt_mode_enable(void) 2921 { 2922 /* It must be called once prior to initializing the threading library. 2923 * g_spdk_msg_mempool will be valid if thread library is initialized. 2924 */ 2925 if (g_spdk_msg_mempool) { 2926 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 2927 return -1; 2928 } 2929 2930 #ifdef __linux__ 2931 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2932 g_interrupt_mode = true; 2933 return 0; 2934 #else 2935 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2936 g_interrupt_mode = false; 2937 return -ENOTSUP; 2938 #endif 2939 } 2940 2941 bool 2942 spdk_interrupt_mode_is_enabled(void) 2943 { 2944 return g_interrupt_mode; 2945 } 2946 2947 #define SSPIN_DEBUG_STACK_FRAMES 16 2948 2949 struct sspin_stack { 2950 void *addrs[SSPIN_DEBUG_STACK_FRAMES]; 2951 uint32_t depth; 2952 }; 2953 2954 struct spdk_spinlock_internal { 2955 struct sspin_stack init_stack; 2956 struct sspin_stack lock_stack; 2957 struct sspin_stack unlock_stack; 2958 }; 2959 2960 static void 2961 sspin_init_internal(struct spdk_spinlock *sspin) 2962 { 2963 #ifdef DEBUG 2964 sspin->internal = calloc(1, sizeof(*sspin->internal)); 2965 #endif 2966 } 2967 2968 static void 2969 sspin_fini_internal(struct spdk_spinlock *sspin) 2970 { 2971 #ifdef DEBUG 2972 free(sspin->internal); 2973 sspin->internal = NULL; 2974 #endif 2975 } 2976 2977 #if defined(DEBUG) && defined(SPDK_HAVE_EXECINFO_H) 2978 #define SSPIN_GET_STACK(sspin, which) \ 2979 do { \ 2980 if (sspin->internal != NULL) { \ 2981 struct sspin_stack *stack = &sspin->internal->which ## _stack; \ 2982 stack->depth = backtrace(stack->addrs, SPDK_COUNTOF(stack->addrs)); \ 2983 } \ 2984 } while (0) 2985 #else 2986 #define SSPIN_GET_STACK(sspin, which) do { } while (0) 2987 #endif 2988 2989 static void 2990 sspin_stack_print(const char *title, const struct sspin_stack *sspin_stack) 2991 { 2992 #ifdef SPDK_HAVE_EXECINFO_H 2993 char **stack; 2994 size_t i; 2995 2996 stack = backtrace_symbols(sspin_stack->addrs, sspin_stack->depth); 2997 if (stack == NULL) { 2998 SPDK_ERRLOG("Out of memory while allocate stack for %s\n", title); 2999 return; 3000 } 3001 SPDK_ERRLOG(" %s:\n", title); 3002 for (i = 0; i < sspin_stack->depth; i++) { 3003 /* 3004 * This does not print line numbers. In gdb, use something like "list *0x444b6b" or 3005 * "list *sspin_stack->addrs[0]". Or more conveniently, load the spdk gdb macros 3006 * and use use "print *sspin" or "print sspin->internal.lock_stack". See 3007 * gdb_macros.md in the docs directory for details. 3008 */ 3009 SPDK_ERRLOG(" #%" PRIu64 ": %s\n", i, stack[i]); 3010 } 3011 free(stack); 3012 #endif /* SPDK_HAVE_EXECINFO_H */ 3013 } 3014 3015 static void 3016 sspin_stacks_print(const struct spdk_spinlock *sspin) 3017 { 3018 if (sspin->internal == NULL) { 3019 return; 3020 } 3021 SPDK_ERRLOG("spinlock %p\n", sspin); 3022 sspin_stack_print("Lock initalized at", &sspin->internal->init_stack); 3023 sspin_stack_print("Last locked at", &sspin->internal->lock_stack); 3024 sspin_stack_print("Last unlocked at", &sspin->internal->unlock_stack); 3025 } 3026 3027 void 3028 spdk_spin_init(struct spdk_spinlock *sspin) 3029 { 3030 int rc; 3031 3032 memset(sspin, 0, sizeof(*sspin)); 3033 rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE); 3034 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3035 sspin_init_internal(sspin); 3036 SSPIN_GET_STACK(sspin, init); 3037 sspin->initialized = true; 3038 } 3039 3040 void 3041 spdk_spin_destroy(struct spdk_spinlock *sspin) 3042 { 3043 int rc; 3044 3045 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3046 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3047 SPIN_ASSERT_LOG_STACKS(sspin->thread == NULL, SPIN_ERR_LOCK_HELD, sspin); 3048 3049 rc = pthread_spin_destroy(&sspin->spinlock); 3050 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3051 3052 sspin_fini_internal(sspin); 3053 sspin->initialized = false; 3054 sspin->destroyed = true; 3055 } 3056 3057 void 3058 spdk_spin_lock(struct spdk_spinlock *sspin) 3059 { 3060 struct spdk_thread *thread = spdk_get_thread(); 3061 int rc; 3062 3063 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3064 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3065 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3066 SPIN_ASSERT_LOG_STACKS(thread != sspin->thread, SPIN_ERR_DEADLOCK, sspin); 3067 3068 rc = pthread_spin_lock(&sspin->spinlock); 3069 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3070 3071 sspin->thread = thread; 3072 sspin->thread->lock_count++; 3073 3074 SSPIN_GET_STACK(sspin, lock); 3075 } 3076 3077 void 3078 spdk_spin_unlock(struct spdk_spinlock *sspin) 3079 { 3080 struct spdk_thread *thread = spdk_get_thread(); 3081 int rc; 3082 3083 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3084 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3085 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3086 SPIN_ASSERT_LOG_STACKS(thread == sspin->thread, SPIN_ERR_WRONG_THREAD, sspin); 3087 3088 SPIN_ASSERT_LOG_STACKS(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT, sspin); 3089 thread->lock_count--; 3090 sspin->thread = NULL; 3091 3092 SSPIN_GET_STACK(sspin, unlock); 3093 3094 rc = pthread_spin_unlock(&sspin->spinlock); 3095 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3096 } 3097 3098 bool 3099 spdk_spin_held(struct spdk_spinlock *sspin) 3100 { 3101 struct spdk_thread *thread = spdk_get_thread(); 3102 3103 SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false); 3104 3105 return sspin->thread == thread; 3106 } 3107 3108 SPDK_LOG_REGISTER_COMPONENT(thread) 3109