1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2016 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "spdk/env.h" 10 #include "spdk/likely.h" 11 #include "spdk/queue.h" 12 #include "spdk/string.h" 13 #include "spdk/thread.h" 14 #include "spdk/trace.h" 15 #include "spdk/util.h" 16 #include "spdk/fd_group.h" 17 18 #include "spdk/log.h" 19 #include "spdk_internal/thread.h" 20 #include "spdk_internal/usdt.h" 21 #include "thread_internal.h" 22 23 #include "spdk_internal/trace_defs.h" 24 25 #ifdef __linux__ 26 #include <sys/timerfd.h> 27 #include <sys/eventfd.h> 28 #include <execinfo.h> 29 #endif 30 31 #ifdef __FreeBSD__ 32 #include <execinfo.h> 33 #endif 34 35 #define SPDK_MSG_BATCH_SIZE 8 36 #define SPDK_MAX_DEVICE_NAME_LEN 256 37 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 38 #define SPDK_MAX_POLLER_NAME_LEN 256 39 #define SPDK_MAX_THREAD_NAME_LEN 256 40 41 static struct spdk_thread *g_app_thread; 42 43 struct spdk_interrupt { 44 int efd; 45 struct spdk_thread *thread; 46 spdk_interrupt_fn fn; 47 void *arg; 48 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 49 }; 50 51 enum spdk_poller_state { 52 /* The poller is registered with a thread but not currently executing its fn. */ 53 SPDK_POLLER_STATE_WAITING, 54 55 /* The poller is currently running its fn. */ 56 SPDK_POLLER_STATE_RUNNING, 57 58 /* The poller was unregistered during the execution of its fn. */ 59 SPDK_POLLER_STATE_UNREGISTERED, 60 61 /* The poller is in the process of being paused. It will be paused 62 * during the next time it's supposed to be executed. 63 */ 64 SPDK_POLLER_STATE_PAUSING, 65 66 /* The poller is registered but currently paused. It's on the 67 * paused_pollers list. 68 */ 69 SPDK_POLLER_STATE_PAUSED, 70 }; 71 72 struct spdk_poller { 73 TAILQ_ENTRY(spdk_poller) tailq; 74 RB_ENTRY(spdk_poller) node; 75 76 /* Current state of the poller; should only be accessed from the poller's thread. */ 77 enum spdk_poller_state state; 78 79 uint64_t period_ticks; 80 uint64_t next_run_tick; 81 uint64_t run_count; 82 uint64_t busy_count; 83 uint64_t id; 84 spdk_poller_fn fn; 85 void *arg; 86 struct spdk_thread *thread; 87 struct spdk_interrupt *intr; 88 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 89 void *set_intr_cb_arg; 90 91 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 92 }; 93 94 enum spdk_thread_state { 95 /* The thread is processing poller and message by spdk_thread_poll(). */ 96 SPDK_THREAD_STATE_RUNNING, 97 98 /* The thread is in the process of termination. It reaps unregistering 99 * poller are releasing I/O channel. 100 */ 101 SPDK_THREAD_STATE_EXITING, 102 103 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 104 SPDK_THREAD_STATE_EXITED, 105 }; 106 107 struct spdk_thread { 108 uint64_t tsc_last; 109 struct spdk_thread_stats stats; 110 /* 111 * Contains pollers actively running on this thread. Pollers 112 * are run round-robin. The thread takes one poller from the head 113 * of the ring, executes it, then puts it back at the tail of 114 * the ring. 115 */ 116 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 117 /** 118 * Contains pollers running on this thread with a periodic timer. 119 */ 120 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 121 struct spdk_poller *first_timed_poller; 122 /* 123 * Contains paused pollers. Pollers on this queue are waiting until 124 * they are resumed (in which case they're put onto the active/timer 125 * queues) or unregistered. 126 */ 127 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 128 struct spdk_ring *messages; 129 int msg_fd; 130 SLIST_HEAD(, spdk_msg) msg_cache; 131 size_t msg_cache_count; 132 spdk_msg_fn critical_msg; 133 uint64_t id; 134 uint64_t next_poller_id; 135 enum spdk_thread_state state; 136 int pending_unregister_count; 137 138 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 139 TAILQ_ENTRY(spdk_thread) tailq; 140 141 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 142 struct spdk_cpuset cpumask; 143 uint64_t exit_timeout_tsc; 144 145 int32_t lock_count; 146 147 /* spdk_thread is bound to current CPU core. */ 148 bool is_bound; 149 150 /* Indicates whether this spdk_thread currently runs in interrupt. */ 151 bool in_interrupt; 152 bool poller_unregistered; 153 struct spdk_fd_group *fgrp; 154 155 /* User context allocated at the end */ 156 uint8_t ctx[0]; 157 }; 158 159 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 160 161 static spdk_new_thread_fn g_new_thread_fn = NULL; 162 static spdk_thread_op_fn g_thread_op_fn = NULL; 163 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 164 static size_t g_ctx_sz = 0; 165 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 166 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 167 * SPDK application is required. 168 */ 169 static uint64_t g_thread_id = 1; 170 171 enum spin_error { 172 SPIN_ERR_NONE, 173 /* Trying to use an SPDK lock while not on an SPDK thread */ 174 SPIN_ERR_NOT_SPDK_THREAD, 175 /* Trying to lock a lock already held by this SPDK thread */ 176 SPIN_ERR_DEADLOCK, 177 /* Trying to unlock a lock not held by this SPDK thread */ 178 SPIN_ERR_WRONG_THREAD, 179 /* pthread_spin_*() returned an error */ 180 SPIN_ERR_PTHREAD, 181 /* Trying to destroy a lock that is held */ 182 SPIN_ERR_LOCK_HELD, 183 /* lock_count is invalid */ 184 SPIN_ERR_LOCK_COUNT, 185 /* 186 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to 187 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to 188 * deadlock when another SPDK thread on the same pthread tries to take that lock. 189 */ 190 SPIN_ERR_HOLD_DURING_SWITCH, 191 /* Trying to use a lock that was destroyed (but not re-initialized) */ 192 SPIN_ERR_DESTROYED, 193 /* Trying to use a lock that is not initialized */ 194 SPIN_ERR_NOT_INITIALIZED, 195 196 /* Must be last, not an actual error code */ 197 SPIN_ERR_LAST 198 }; 199 200 static const char *spin_error_strings[] = { 201 [SPIN_ERR_NONE] = "No error", 202 [SPIN_ERR_NOT_SPDK_THREAD] = "Not an SPDK thread", 203 [SPIN_ERR_DEADLOCK] = "Deadlock detected", 204 [SPIN_ERR_WRONG_THREAD] = "Unlock on wrong SPDK thread", 205 [SPIN_ERR_PTHREAD] = "Error from pthread_spinlock", 206 [SPIN_ERR_LOCK_HELD] = "Destroying a held spinlock", 207 [SPIN_ERR_LOCK_COUNT] = "Lock count is invalid", 208 [SPIN_ERR_HOLD_DURING_SWITCH] = "Lock(s) held while SPDK thread going off CPU", 209 [SPIN_ERR_DESTROYED] = "Lock has been destroyed", 210 [SPIN_ERR_NOT_INITIALIZED] = "Lock has not been initialized", 211 }; 212 213 #define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \ 214 ? "Unknown error" : spin_error_strings[err] 215 216 static void 217 __posix_abort(enum spin_error err) 218 { 219 abort(); 220 } 221 222 typedef void (*spin_abort)(enum spin_error err); 223 spin_abort g_spin_abort_fn = __posix_abort; 224 225 #define SPIN_ASSERT_IMPL(cond, err, extra_log, ret) \ 226 do { \ 227 if (spdk_unlikely(!(cond))) { \ 228 SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \ 229 SPIN_ERROR_STRING(err), #cond); \ 230 extra_log; \ 231 g_spin_abort_fn(err); \ 232 ret; \ 233 } \ 234 } while (0) 235 #define SPIN_ASSERT_LOG_STACKS(cond, err, lock) \ 236 SPIN_ASSERT_IMPL(cond, err, sspin_stacks_print(sspin), return) 237 #define SPIN_ASSERT_RETURN(cond, err, ret) SPIN_ASSERT_IMPL(cond, err, , return ret) 238 #define SPIN_ASSERT(cond, err) SPIN_ASSERT_IMPL(cond, err, ,) 239 240 struct io_device { 241 void *io_device; 242 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 243 spdk_io_channel_create_cb create_cb; 244 spdk_io_channel_destroy_cb destroy_cb; 245 spdk_io_device_unregister_cb unregister_cb; 246 struct spdk_thread *unregister_thread; 247 uint32_t ctx_size; 248 uint32_t for_each_count; 249 RB_ENTRY(io_device) node; 250 251 uint32_t refcnt; 252 253 bool pending_unregister; 254 bool unregistered; 255 }; 256 257 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 258 259 static int 260 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 261 { 262 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 263 } 264 265 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 266 267 static int 268 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 269 { 270 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 271 } 272 273 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 274 275 struct spdk_msg { 276 spdk_msg_fn fn; 277 void *arg; 278 279 SLIST_ENTRY(spdk_msg) link; 280 }; 281 282 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 283 284 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 285 static uint32_t g_thread_count = 0; 286 287 static __thread struct spdk_thread *tls_thread = NULL; 288 289 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 290 { 291 spdk_trace_register_description("THREAD_IOCH_GET", 292 TRACE_THREAD_IOCH_GET, 293 OWNER_NONE, OBJECT_NONE, 0, 294 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 295 spdk_trace_register_description("THREAD_IOCH_PUT", 296 TRACE_THREAD_IOCH_PUT, 297 OWNER_NONE, OBJECT_NONE, 0, 298 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 299 } 300 301 /* 302 * If this compare function returns zero when two next_run_ticks are equal, 303 * the macro RB_INSERT() returns a pointer to the element with the same 304 * next_run_tick. 305 * 306 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 307 * to remove as a parameter. 308 * 309 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 310 * side by returning 1 when two next_run_ticks are equal. 311 */ 312 static inline int 313 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 314 { 315 if (poller1->next_run_tick < poller2->next_run_tick) { 316 return -1; 317 } else { 318 return 1; 319 } 320 } 321 322 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 323 324 static inline struct spdk_thread * 325 _get_thread(void) 326 { 327 return tls_thread; 328 } 329 330 static int 331 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 332 { 333 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 334 335 g_ctx_sz = ctx_sz; 336 337 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 338 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 339 sizeof(struct spdk_msg), 340 0, /* No cache. We do our own. */ 341 SPDK_ENV_SOCKET_ID_ANY); 342 343 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 344 msg_mempool_sz); 345 346 if (!g_spdk_msg_mempool) { 347 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 348 return -ENOMEM; 349 } 350 351 return 0; 352 } 353 354 static void thread_interrupt_destroy(struct spdk_thread *thread); 355 static int thread_interrupt_create(struct spdk_thread *thread); 356 357 static void 358 _free_thread(struct spdk_thread *thread) 359 { 360 struct spdk_io_channel *ch; 361 struct spdk_msg *msg; 362 struct spdk_poller *poller, *ptmp; 363 364 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 365 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 366 thread->name, ch->dev->name); 367 } 368 369 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 370 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 371 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 372 poller->name); 373 } 374 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 375 free(poller); 376 } 377 378 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 379 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 380 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 381 poller->name); 382 } 383 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 384 free(poller); 385 } 386 387 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 388 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 389 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 390 free(poller); 391 } 392 393 pthread_mutex_lock(&g_devlist_mutex); 394 assert(g_thread_count > 0); 395 g_thread_count--; 396 TAILQ_REMOVE(&g_threads, thread, tailq); 397 pthread_mutex_unlock(&g_devlist_mutex); 398 399 msg = SLIST_FIRST(&thread->msg_cache); 400 while (msg != NULL) { 401 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 402 403 assert(thread->msg_cache_count > 0); 404 thread->msg_cache_count--; 405 spdk_mempool_put(g_spdk_msg_mempool, msg); 406 407 msg = SLIST_FIRST(&thread->msg_cache); 408 } 409 410 assert(thread->msg_cache_count == 0); 411 412 if (spdk_interrupt_mode_is_enabled()) { 413 thread_interrupt_destroy(thread); 414 } 415 416 spdk_ring_free(thread->messages); 417 free(thread); 418 } 419 420 int 421 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 422 { 423 assert(g_new_thread_fn == NULL); 424 assert(g_thread_op_fn == NULL); 425 426 if (new_thread_fn == NULL) { 427 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 428 } else { 429 g_new_thread_fn = new_thread_fn; 430 } 431 432 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 433 } 434 435 int 436 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 437 spdk_thread_op_supported_fn thread_op_supported_fn, 438 size_t ctx_sz, size_t msg_mempool_sz) 439 { 440 assert(g_new_thread_fn == NULL); 441 assert(g_thread_op_fn == NULL); 442 assert(g_thread_op_supported_fn == NULL); 443 444 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 445 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 446 return -EINVAL; 447 } 448 449 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 450 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 451 } else { 452 g_thread_op_fn = thread_op_fn; 453 g_thread_op_supported_fn = thread_op_supported_fn; 454 } 455 456 return _thread_lib_init(ctx_sz, msg_mempool_sz); 457 } 458 459 void 460 spdk_thread_lib_fini(void) 461 { 462 struct io_device *dev; 463 464 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 465 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 466 } 467 468 g_new_thread_fn = NULL; 469 g_thread_op_fn = NULL; 470 g_thread_op_supported_fn = NULL; 471 g_ctx_sz = 0; 472 if (g_app_thread != NULL) { 473 _free_thread(g_app_thread); 474 g_app_thread = NULL; 475 } 476 477 if (g_spdk_msg_mempool) { 478 spdk_mempool_free(g_spdk_msg_mempool); 479 g_spdk_msg_mempool = NULL; 480 } 481 } 482 483 struct spdk_thread * 484 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask) 485 { 486 struct spdk_thread *thread, *null_thread; 487 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 488 int rc = 0, i; 489 490 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 491 if (!thread) { 492 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 493 return NULL; 494 } 495 496 if (cpumask) { 497 spdk_cpuset_copy(&thread->cpumask, cpumask); 498 } else { 499 spdk_cpuset_negate(&thread->cpumask); 500 } 501 502 RB_INIT(&thread->io_channels); 503 TAILQ_INIT(&thread->active_pollers); 504 RB_INIT(&thread->timed_pollers); 505 TAILQ_INIT(&thread->paused_pollers); 506 SLIST_INIT(&thread->msg_cache); 507 thread->msg_cache_count = 0; 508 509 thread->tsc_last = spdk_get_ticks(); 510 511 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 512 * ID exceeds UINT64_MAX a warning message is logged 513 */ 514 thread->next_poller_id = 1; 515 516 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 517 if (!thread->messages) { 518 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 519 free(thread); 520 return NULL; 521 } 522 523 /* Fill the local message pool cache. */ 524 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 525 if (rc == 0) { 526 /* If we can't populate the cache it's ok. The cache will get filled 527 * up organically as messages are passed to the thread. */ 528 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 529 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 530 thread->msg_cache_count++; 531 } 532 } 533 534 if (name) { 535 snprintf(thread->name, sizeof(thread->name), "%s", name); 536 } else { 537 snprintf(thread->name, sizeof(thread->name), "%p", thread); 538 } 539 540 pthread_mutex_lock(&g_devlist_mutex); 541 if (g_thread_id == 0) { 542 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 543 pthread_mutex_unlock(&g_devlist_mutex); 544 _free_thread(thread); 545 return NULL; 546 } 547 thread->id = g_thread_id++; 548 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 549 g_thread_count++; 550 pthread_mutex_unlock(&g_devlist_mutex); 551 552 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 553 thread->id, thread->name); 554 555 if (spdk_interrupt_mode_is_enabled()) { 556 thread->in_interrupt = true; 557 rc = thread_interrupt_create(thread); 558 if (rc != 0) { 559 _free_thread(thread); 560 return NULL; 561 } 562 } 563 564 if (g_new_thread_fn) { 565 rc = g_new_thread_fn(thread); 566 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 567 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 568 } 569 570 if (rc != 0) { 571 _free_thread(thread); 572 return NULL; 573 } 574 575 thread->state = SPDK_THREAD_STATE_RUNNING; 576 577 /* If this is the first thread, save it as the app thread. Use an atomic 578 * compare + exchange to guard against crazy users who might try to 579 * call spdk_thread_create() simultaneously on multiple threads. 580 */ 581 null_thread = NULL; 582 __atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false, 583 __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); 584 585 return thread; 586 } 587 588 struct spdk_thread * 589 spdk_thread_get_app_thread(void) 590 { 591 return g_app_thread; 592 } 593 594 void 595 spdk_thread_bind(struct spdk_thread *thread, bool bind) 596 { 597 thread->is_bound = bind; 598 } 599 600 bool 601 spdk_thread_is_bound(struct spdk_thread *thread) 602 { 603 return thread->is_bound; 604 } 605 606 void 607 spdk_set_thread(struct spdk_thread *thread) 608 { 609 tls_thread = thread; 610 } 611 612 static void 613 thread_exit(struct spdk_thread *thread, uint64_t now) 614 { 615 struct spdk_poller *poller; 616 struct spdk_io_channel *ch; 617 618 if (now >= thread->exit_timeout_tsc) { 619 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 620 thread->name); 621 goto exited; 622 } 623 624 if (spdk_ring_count(thread->messages) > 0) { 625 SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name); 626 return; 627 } 628 629 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 630 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 631 SPDK_INFOLOG(thread, 632 "thread %s still has active poller %s\n", 633 thread->name, poller->name); 634 return; 635 } 636 } 637 638 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 639 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 640 SPDK_INFOLOG(thread, 641 "thread %s still has active timed poller %s\n", 642 thread->name, poller->name); 643 return; 644 } 645 } 646 647 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 648 SPDK_INFOLOG(thread, 649 "thread %s still has paused poller %s\n", 650 thread->name, poller->name); 651 return; 652 } 653 654 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 655 SPDK_INFOLOG(thread, 656 "thread %s still has channel for io_device %s\n", 657 thread->name, ch->dev->name); 658 return; 659 } 660 661 if (thread->pending_unregister_count > 0) { 662 SPDK_INFOLOG(thread, 663 "thread %s is still unregistering io_devices\n", 664 thread->name); 665 return; 666 } 667 668 exited: 669 thread->state = SPDK_THREAD_STATE_EXITED; 670 if (spdk_unlikely(thread->in_interrupt)) { 671 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 672 } 673 } 674 675 static void _thread_exit(void *ctx); 676 677 int 678 spdk_thread_exit(struct spdk_thread *thread) 679 { 680 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 681 682 assert(tls_thread == thread); 683 684 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 685 SPDK_INFOLOG(thread, 686 "thread %s is already exiting\n", 687 thread->name); 688 return 0; 689 } 690 691 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 692 SPDK_THREAD_EXIT_TIMEOUT_SEC); 693 thread->state = SPDK_THREAD_STATE_EXITING; 694 695 if (spdk_interrupt_mode_is_enabled()) { 696 spdk_thread_send_msg(thread, _thread_exit, thread); 697 } 698 699 return 0; 700 } 701 702 bool 703 spdk_thread_is_running(struct spdk_thread *thread) 704 { 705 return thread->state == SPDK_THREAD_STATE_RUNNING; 706 } 707 708 bool 709 spdk_thread_is_exited(struct spdk_thread *thread) 710 { 711 return thread->state == SPDK_THREAD_STATE_EXITED; 712 } 713 714 void 715 spdk_thread_destroy(struct spdk_thread *thread) 716 { 717 assert(thread != NULL); 718 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 719 720 assert(thread->state == SPDK_THREAD_STATE_EXITED); 721 722 if (tls_thread == thread) { 723 tls_thread = NULL; 724 } 725 726 /* To be safe, do not free the app thread until spdk_thread_lib_fini(). */ 727 if (thread != g_app_thread) { 728 _free_thread(thread); 729 } 730 } 731 732 void * 733 spdk_thread_get_ctx(struct spdk_thread *thread) 734 { 735 if (g_ctx_sz > 0) { 736 return thread->ctx; 737 } 738 739 return NULL; 740 } 741 742 struct spdk_cpuset * 743 spdk_thread_get_cpumask(struct spdk_thread *thread) 744 { 745 return &thread->cpumask; 746 } 747 748 int 749 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 750 { 751 struct spdk_thread *thread; 752 753 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 754 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 755 assert(false); 756 return -ENOTSUP; 757 } 758 759 thread = spdk_get_thread(); 760 if (!thread) { 761 SPDK_ERRLOG("Called from non-SPDK thread\n"); 762 assert(false); 763 return -EINVAL; 764 } 765 766 spdk_cpuset_copy(&thread->cpumask, cpumask); 767 768 /* Invoke framework's reschedule operation. If this function is called multiple times 769 * in a single spdk_thread_poll() context, the last cpumask will be used in the 770 * reschedule operation. 771 */ 772 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 773 774 return 0; 775 } 776 777 struct spdk_thread * 778 spdk_thread_get_from_ctx(void *ctx) 779 { 780 if (ctx == NULL) { 781 assert(false); 782 return NULL; 783 } 784 785 assert(g_ctx_sz > 0); 786 787 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 788 } 789 790 static inline uint32_t 791 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 792 { 793 unsigned count, i; 794 void *messages[SPDK_MSG_BATCH_SIZE]; 795 uint64_t notify = 1; 796 int rc; 797 798 #ifdef DEBUG 799 /* 800 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 801 * so we will never actually read uninitialized data from events, but just to be sure 802 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 803 */ 804 memset(messages, 0, sizeof(messages)); 805 #endif 806 807 if (max_msgs > 0) { 808 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 809 } else { 810 max_msgs = SPDK_MSG_BATCH_SIZE; 811 } 812 813 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 814 if (spdk_unlikely(thread->in_interrupt) && 815 spdk_ring_count(thread->messages) != 0) { 816 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 817 if (rc < 0) { 818 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 819 } 820 } 821 if (count == 0) { 822 return 0; 823 } 824 825 for (i = 0; i < count; i++) { 826 struct spdk_msg *msg = messages[i]; 827 828 assert(msg != NULL); 829 830 SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg); 831 832 msg->fn(msg->arg); 833 834 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 835 836 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 837 /* Insert the messages at the head. We want to re-use the hot 838 * ones. */ 839 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 840 thread->msg_cache_count++; 841 } else { 842 spdk_mempool_put(g_spdk_msg_mempool, msg); 843 } 844 } 845 846 return count; 847 } 848 849 static void 850 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 851 { 852 struct spdk_poller *tmp __attribute__((unused)); 853 854 poller->next_run_tick = now + poller->period_ticks; 855 856 /* 857 * Insert poller in the thread's timed_pollers tree by next scheduled run time 858 * as its key. 859 */ 860 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 861 assert(tmp == NULL); 862 863 /* Update the cache only if it is empty or the inserted poller is earlier than it. 864 * RB_MIN() is not necessary here because all pollers, which has exactly the same 865 * next_run_tick as the existing poller, are inserted on the right side. 866 */ 867 if (thread->first_timed_poller == NULL || 868 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 869 thread->first_timed_poller = poller; 870 } 871 } 872 873 static inline void 874 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 875 { 876 struct spdk_poller *tmp __attribute__((unused)); 877 878 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 879 assert(tmp != NULL); 880 881 /* This function is not used in any case that is performance critical. 882 * Update the cache simply by RB_MIN() if it needs to be changed. 883 */ 884 if (thread->first_timed_poller == poller) { 885 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 886 } 887 } 888 889 static void 890 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 891 { 892 if (poller->period_ticks) { 893 poller_insert_timer(thread, poller, spdk_get_ticks()); 894 } else { 895 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 896 } 897 } 898 899 static inline void 900 thread_update_stats(struct spdk_thread *thread, uint64_t end, 901 uint64_t start, int rc) 902 { 903 if (rc == 0) { 904 /* Poller status idle */ 905 thread->stats.idle_tsc += end - start; 906 } else if (rc > 0) { 907 /* Poller status busy */ 908 thread->stats.busy_tsc += end - start; 909 } 910 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 911 thread->tsc_last = end; 912 } 913 914 static inline int 915 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 916 { 917 int rc; 918 919 switch (poller->state) { 920 case SPDK_POLLER_STATE_UNREGISTERED: 921 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 922 free(poller); 923 return 0; 924 case SPDK_POLLER_STATE_PAUSING: 925 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 926 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 927 poller->state = SPDK_POLLER_STATE_PAUSED; 928 return 0; 929 case SPDK_POLLER_STATE_WAITING: 930 break; 931 default: 932 assert(false); 933 break; 934 } 935 936 poller->state = SPDK_POLLER_STATE_RUNNING; 937 rc = poller->fn(poller->arg); 938 939 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 940 941 poller->run_count++; 942 if (rc > 0) { 943 poller->busy_count++; 944 } 945 946 #ifdef DEBUG 947 if (rc == -1) { 948 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 949 } 950 #endif 951 952 switch (poller->state) { 953 case SPDK_POLLER_STATE_UNREGISTERED: 954 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 955 free(poller); 956 break; 957 case SPDK_POLLER_STATE_PAUSING: 958 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 959 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 960 poller->state = SPDK_POLLER_STATE_PAUSED; 961 break; 962 case SPDK_POLLER_STATE_PAUSED: 963 case SPDK_POLLER_STATE_WAITING: 964 break; 965 case SPDK_POLLER_STATE_RUNNING: 966 poller->state = SPDK_POLLER_STATE_WAITING; 967 break; 968 default: 969 assert(false); 970 break; 971 } 972 973 return rc; 974 } 975 976 static inline int 977 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 978 uint64_t now) 979 { 980 int rc; 981 982 switch (poller->state) { 983 case SPDK_POLLER_STATE_UNREGISTERED: 984 free(poller); 985 return 0; 986 case SPDK_POLLER_STATE_PAUSING: 987 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 988 poller->state = SPDK_POLLER_STATE_PAUSED; 989 return 0; 990 case SPDK_POLLER_STATE_WAITING: 991 break; 992 default: 993 assert(false); 994 break; 995 } 996 997 poller->state = SPDK_POLLER_STATE_RUNNING; 998 rc = poller->fn(poller->arg); 999 1000 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 1001 1002 poller->run_count++; 1003 if (rc > 0) { 1004 poller->busy_count++; 1005 } 1006 1007 #ifdef DEBUG 1008 if (rc == -1) { 1009 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 1010 } 1011 #endif 1012 1013 switch (poller->state) { 1014 case SPDK_POLLER_STATE_UNREGISTERED: 1015 free(poller); 1016 break; 1017 case SPDK_POLLER_STATE_PAUSING: 1018 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1019 poller->state = SPDK_POLLER_STATE_PAUSED; 1020 break; 1021 case SPDK_POLLER_STATE_PAUSED: 1022 break; 1023 case SPDK_POLLER_STATE_RUNNING: 1024 poller->state = SPDK_POLLER_STATE_WAITING; 1025 /* fallthrough */ 1026 case SPDK_POLLER_STATE_WAITING: 1027 poller_insert_timer(thread, poller, now); 1028 break; 1029 default: 1030 assert(false); 1031 break; 1032 } 1033 1034 return rc; 1035 } 1036 1037 static int 1038 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1039 { 1040 uint32_t msg_count; 1041 struct spdk_poller *poller, *tmp; 1042 spdk_msg_fn critical_msg; 1043 int rc = 0; 1044 1045 thread->tsc_last = now; 1046 1047 critical_msg = thread->critical_msg; 1048 if (spdk_unlikely(critical_msg != NULL)) { 1049 critical_msg(NULL); 1050 thread->critical_msg = NULL; 1051 rc = 1; 1052 } 1053 1054 msg_count = msg_queue_run_batch(thread, max_msgs); 1055 if (msg_count) { 1056 rc = 1; 1057 } 1058 1059 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1060 active_pollers_head, tailq, tmp) { 1061 int poller_rc; 1062 1063 poller_rc = thread_execute_poller(thread, poller); 1064 if (poller_rc > rc) { 1065 rc = poller_rc; 1066 } 1067 } 1068 1069 poller = thread->first_timed_poller; 1070 while (poller != NULL) { 1071 int timer_rc = 0; 1072 1073 if (now < poller->next_run_tick) { 1074 break; 1075 } 1076 1077 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 1078 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 1079 1080 /* Update the cache to the next timed poller in the list 1081 * only if the current poller is still the closest, otherwise, 1082 * do nothing because the cache has been already updated. 1083 */ 1084 if (thread->first_timed_poller == poller) { 1085 thread->first_timed_poller = tmp; 1086 } 1087 1088 timer_rc = thread_execute_timed_poller(thread, poller, now); 1089 if (timer_rc > rc) { 1090 rc = timer_rc; 1091 } 1092 1093 poller = tmp; 1094 } 1095 1096 return rc; 1097 } 1098 1099 static void 1100 _thread_remove_pollers(void *ctx) 1101 { 1102 struct spdk_thread *thread = ctx; 1103 struct spdk_poller *poller, *tmp; 1104 1105 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1106 active_pollers_head, tailq, tmp) { 1107 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1108 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1109 free(poller); 1110 } 1111 } 1112 1113 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1114 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1115 poller_remove_timer(thread, poller); 1116 free(poller); 1117 } 1118 } 1119 1120 thread->poller_unregistered = false; 1121 } 1122 1123 static void 1124 _thread_exit(void *ctx) 1125 { 1126 struct spdk_thread *thread = ctx; 1127 1128 assert(thread->state == SPDK_THREAD_STATE_EXITING); 1129 1130 thread_exit(thread, spdk_get_ticks()); 1131 } 1132 1133 int 1134 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1135 { 1136 struct spdk_thread *orig_thread; 1137 int rc; 1138 1139 orig_thread = _get_thread(); 1140 tls_thread = thread; 1141 1142 if (now == 0) { 1143 now = spdk_get_ticks(); 1144 } 1145 1146 if (spdk_likely(!thread->in_interrupt)) { 1147 rc = thread_poll(thread, max_msgs, now); 1148 if (spdk_unlikely(thread->in_interrupt)) { 1149 /* The thread transitioned to interrupt mode during the above poll. 1150 * Poll it one more time in case that during the transition time 1151 * there is msg received without notification. 1152 */ 1153 rc = thread_poll(thread, max_msgs, now); 1154 } 1155 1156 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1157 thread_exit(thread, now); 1158 } 1159 } else { 1160 /* Non-block wait on thread's fd_group */ 1161 rc = spdk_fd_group_wait(thread->fgrp, 0); 1162 } 1163 1164 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1165 1166 tls_thread = orig_thread; 1167 1168 return rc; 1169 } 1170 1171 uint64_t 1172 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1173 { 1174 struct spdk_poller *poller; 1175 1176 poller = thread->first_timed_poller; 1177 if (poller) { 1178 return poller->next_run_tick; 1179 } 1180 1181 return 0; 1182 } 1183 1184 int 1185 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1186 { 1187 return !TAILQ_EMPTY(&thread->active_pollers); 1188 } 1189 1190 static bool 1191 thread_has_unpaused_pollers(struct spdk_thread *thread) 1192 { 1193 if (TAILQ_EMPTY(&thread->active_pollers) && 1194 RB_EMPTY(&thread->timed_pollers)) { 1195 return false; 1196 } 1197 1198 return true; 1199 } 1200 1201 bool 1202 spdk_thread_has_pollers(struct spdk_thread *thread) 1203 { 1204 if (!thread_has_unpaused_pollers(thread) && 1205 TAILQ_EMPTY(&thread->paused_pollers)) { 1206 return false; 1207 } 1208 1209 return true; 1210 } 1211 1212 bool 1213 spdk_thread_is_idle(struct spdk_thread *thread) 1214 { 1215 if (spdk_ring_count(thread->messages) || 1216 thread_has_unpaused_pollers(thread) || 1217 thread->critical_msg != NULL) { 1218 return false; 1219 } 1220 1221 return true; 1222 } 1223 1224 uint32_t 1225 spdk_thread_get_count(void) 1226 { 1227 /* 1228 * Return cached value of the current thread count. We could acquire the 1229 * lock and iterate through the TAILQ of threads to count them, but that 1230 * count could still be invalidated after we release the lock. 1231 */ 1232 return g_thread_count; 1233 } 1234 1235 struct spdk_thread * 1236 spdk_get_thread(void) 1237 { 1238 return _get_thread(); 1239 } 1240 1241 const char * 1242 spdk_thread_get_name(const struct spdk_thread *thread) 1243 { 1244 return thread->name; 1245 } 1246 1247 uint64_t 1248 spdk_thread_get_id(const struct spdk_thread *thread) 1249 { 1250 return thread->id; 1251 } 1252 1253 struct spdk_thread * 1254 spdk_thread_get_by_id(uint64_t id) 1255 { 1256 struct spdk_thread *thread; 1257 1258 if (id == 0 || id >= g_thread_id) { 1259 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1260 return NULL; 1261 } 1262 pthread_mutex_lock(&g_devlist_mutex); 1263 TAILQ_FOREACH(thread, &g_threads, tailq) { 1264 if (thread->id == id) { 1265 break; 1266 } 1267 } 1268 pthread_mutex_unlock(&g_devlist_mutex); 1269 return thread; 1270 } 1271 1272 int 1273 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1274 { 1275 struct spdk_thread *thread; 1276 1277 thread = _get_thread(); 1278 if (!thread) { 1279 SPDK_ERRLOG("No thread allocated\n"); 1280 return -EINVAL; 1281 } 1282 1283 if (stats == NULL) { 1284 return -EINVAL; 1285 } 1286 1287 *stats = thread->stats; 1288 1289 return 0; 1290 } 1291 1292 uint64_t 1293 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1294 { 1295 if (thread == NULL) { 1296 thread = _get_thread(); 1297 } 1298 1299 return thread->tsc_last; 1300 } 1301 1302 static inline int 1303 thread_send_msg_notification(const struct spdk_thread *target_thread) 1304 { 1305 uint64_t notify = 1; 1306 int rc; 1307 1308 /* Not necessary to do notification if interrupt facility is not enabled */ 1309 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1310 return 0; 1311 } 1312 1313 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1314 * after sending thread msg, it is necessary to check whether target thread runs in 1315 * interrupt mode and then decide whether do event notification. 1316 */ 1317 if (spdk_unlikely(target_thread->in_interrupt)) { 1318 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1319 if (rc < 0) { 1320 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1321 return -EIO; 1322 } 1323 } 1324 1325 return 0; 1326 } 1327 1328 int 1329 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1330 { 1331 struct spdk_thread *local_thread; 1332 struct spdk_msg *msg; 1333 int rc; 1334 1335 assert(thread != NULL); 1336 1337 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1338 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1339 return -EIO; 1340 } 1341 1342 local_thread = _get_thread(); 1343 1344 msg = NULL; 1345 if (local_thread != NULL) { 1346 if (local_thread->msg_cache_count > 0) { 1347 msg = SLIST_FIRST(&local_thread->msg_cache); 1348 assert(msg != NULL); 1349 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1350 local_thread->msg_cache_count--; 1351 } 1352 } 1353 1354 if (msg == NULL) { 1355 msg = spdk_mempool_get(g_spdk_msg_mempool); 1356 if (!msg) { 1357 SPDK_ERRLOG("msg could not be allocated\n"); 1358 return -ENOMEM; 1359 } 1360 } 1361 1362 msg->fn = fn; 1363 msg->arg = ctx; 1364 1365 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1366 if (rc != 1) { 1367 SPDK_ERRLOG("msg could not be enqueued\n"); 1368 spdk_mempool_put(g_spdk_msg_mempool, msg); 1369 return -EIO; 1370 } 1371 1372 return thread_send_msg_notification(thread); 1373 } 1374 1375 int 1376 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1377 { 1378 spdk_msg_fn expected = NULL; 1379 1380 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1381 __ATOMIC_SEQ_CST)) { 1382 return -EIO; 1383 } 1384 1385 return thread_send_msg_notification(thread); 1386 } 1387 1388 #ifdef __linux__ 1389 static int 1390 interrupt_timerfd_process(void *arg) 1391 { 1392 struct spdk_poller *poller = arg; 1393 uint64_t exp; 1394 int rc; 1395 1396 /* clear the level of interval timer */ 1397 rc = read(poller->intr->efd, &exp, sizeof(exp)); 1398 if (rc < 0) { 1399 if (rc == -EAGAIN) { 1400 return 0; 1401 } 1402 1403 return rc; 1404 } 1405 1406 SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg); 1407 1408 return poller->fn(poller->arg); 1409 } 1410 1411 static int 1412 period_poller_interrupt_init(struct spdk_poller *poller) 1413 { 1414 int timerfd; 1415 1416 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1417 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1418 if (timerfd < 0) { 1419 return -errno; 1420 } 1421 1422 poller->intr = spdk_interrupt_register(timerfd, interrupt_timerfd_process, poller, poller->name); 1423 if (poller->intr == NULL) { 1424 close(timerfd); 1425 return -1; 1426 } 1427 1428 return 0; 1429 } 1430 1431 static void 1432 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1433 { 1434 int timerfd; 1435 uint64_t now_tick = spdk_get_ticks(); 1436 uint64_t ticks = spdk_get_ticks_hz(); 1437 int ret; 1438 struct itimerspec new_tv = {}; 1439 struct itimerspec old_tv = {}; 1440 1441 assert(poller->intr != NULL); 1442 assert(poller->period_ticks != 0); 1443 1444 timerfd = poller->intr->efd; 1445 1446 assert(timerfd >= 0); 1447 1448 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1449 interrupt_mode ? "interrupt" : "poll"); 1450 1451 if (interrupt_mode) { 1452 /* Set repeated timer expiration */ 1453 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1454 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1455 1456 /* Update next timer expiration */ 1457 if (poller->next_run_tick == 0) { 1458 poller->next_run_tick = now_tick + poller->period_ticks; 1459 } else if (poller->next_run_tick < now_tick) { 1460 poller->next_run_tick = now_tick; 1461 } 1462 1463 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1464 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1465 1466 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1467 if (ret < 0) { 1468 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1469 assert(false); 1470 } 1471 } else { 1472 /* Disarm the timer */ 1473 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1474 if (ret < 0) { 1475 /* timerfd_settime's failure indicates that the timerfd is in error */ 1476 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1477 assert(false); 1478 } 1479 1480 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1481 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1482 */ 1483 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1484 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1485 poller_remove_timer(poller->thread, poller); 1486 poller_insert_timer(poller->thread, poller, now_tick); 1487 } 1488 } 1489 1490 static void 1491 poller_interrupt_fini(struct spdk_poller *poller) 1492 { 1493 int fd; 1494 1495 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1496 assert(poller->intr != NULL); 1497 fd = poller->intr->efd; 1498 spdk_interrupt_unregister(&poller->intr); 1499 close(fd); 1500 } 1501 1502 static int 1503 busy_poller_interrupt_init(struct spdk_poller *poller) 1504 { 1505 int busy_efd; 1506 1507 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1508 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1509 if (busy_efd < 0) { 1510 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1511 return -errno; 1512 } 1513 1514 poller->intr = spdk_interrupt_register(busy_efd, poller->fn, poller->arg, poller->name); 1515 if (poller->intr == NULL) { 1516 close(busy_efd); 1517 return -1; 1518 } 1519 1520 return 0; 1521 } 1522 1523 static void 1524 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1525 { 1526 int busy_efd = poller->intr->efd; 1527 uint64_t notify = 1; 1528 int rc __attribute__((unused)); 1529 1530 assert(busy_efd >= 0); 1531 1532 if (interrupt_mode) { 1533 /* Write without read on eventfd will get it repeatedly triggered. */ 1534 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1535 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1536 } 1537 } else { 1538 /* Read on eventfd will clear its level triggering. */ 1539 rc = read(busy_efd, ¬ify, sizeof(notify)); 1540 } 1541 } 1542 1543 #else 1544 1545 static int 1546 period_poller_interrupt_init(struct spdk_poller *poller) 1547 { 1548 return -ENOTSUP; 1549 } 1550 1551 static void 1552 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1553 { 1554 } 1555 1556 static void 1557 poller_interrupt_fini(struct spdk_poller *poller) 1558 { 1559 } 1560 1561 static int 1562 busy_poller_interrupt_init(struct spdk_poller *poller) 1563 { 1564 return -ENOTSUP; 1565 } 1566 1567 static void 1568 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1569 { 1570 } 1571 1572 #endif 1573 1574 void 1575 spdk_poller_register_interrupt(struct spdk_poller *poller, 1576 spdk_poller_set_interrupt_mode_cb cb_fn, 1577 void *cb_arg) 1578 { 1579 assert(poller != NULL); 1580 assert(cb_fn != NULL); 1581 assert(spdk_get_thread() == poller->thread); 1582 1583 if (!spdk_interrupt_mode_is_enabled()) { 1584 return; 1585 } 1586 1587 /* If this poller already had an interrupt, clean the old one up. */ 1588 if (poller->intr != NULL) { 1589 poller_interrupt_fini(poller); 1590 } 1591 1592 poller->set_intr_cb_fn = cb_fn; 1593 poller->set_intr_cb_arg = cb_arg; 1594 1595 /* Set poller into interrupt mode if thread is in interrupt. */ 1596 if (poller->thread->in_interrupt) { 1597 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1598 } 1599 } 1600 1601 static uint64_t 1602 convert_us_to_ticks(uint64_t us) 1603 { 1604 uint64_t quotient, remainder, ticks; 1605 1606 if (us) { 1607 quotient = us / SPDK_SEC_TO_USEC; 1608 remainder = us % SPDK_SEC_TO_USEC; 1609 ticks = spdk_get_ticks_hz(); 1610 1611 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1612 } else { 1613 return 0; 1614 } 1615 } 1616 1617 static struct spdk_poller * 1618 poller_register(spdk_poller_fn fn, 1619 void *arg, 1620 uint64_t period_microseconds, 1621 const char *name) 1622 { 1623 struct spdk_thread *thread; 1624 struct spdk_poller *poller; 1625 1626 thread = spdk_get_thread(); 1627 if (!thread) { 1628 assert(false); 1629 return NULL; 1630 } 1631 1632 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1633 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1634 return NULL; 1635 } 1636 1637 poller = calloc(1, sizeof(*poller)); 1638 if (poller == NULL) { 1639 SPDK_ERRLOG("Poller memory allocation failed\n"); 1640 return NULL; 1641 } 1642 1643 if (name) { 1644 snprintf(poller->name, sizeof(poller->name), "%s", name); 1645 } else { 1646 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1647 } 1648 1649 poller->state = SPDK_POLLER_STATE_WAITING; 1650 poller->fn = fn; 1651 poller->arg = arg; 1652 poller->thread = thread; 1653 poller->intr = NULL; 1654 if (thread->next_poller_id == 0) { 1655 SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n"); 1656 thread->next_poller_id = 1; 1657 } 1658 poller->id = thread->next_poller_id++; 1659 1660 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1661 1662 if (spdk_interrupt_mode_is_enabled()) { 1663 int rc; 1664 1665 if (period_microseconds) { 1666 rc = period_poller_interrupt_init(poller); 1667 if (rc < 0) { 1668 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1669 free(poller); 1670 return NULL; 1671 } 1672 1673 poller->set_intr_cb_fn = period_poller_set_interrupt_mode; 1674 poller->set_intr_cb_arg = NULL; 1675 1676 } else { 1677 /* If the poller doesn't have a period, create interruptfd that's always 1678 * busy automatically when running in interrupt mode. 1679 */ 1680 rc = busy_poller_interrupt_init(poller); 1681 if (rc > 0) { 1682 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1683 free(poller); 1684 return NULL; 1685 } 1686 1687 poller->set_intr_cb_fn = busy_poller_set_interrupt_mode; 1688 poller->set_intr_cb_arg = NULL; 1689 } 1690 1691 /* Set poller into interrupt mode if thread is in interrupt. */ 1692 if (poller->thread->in_interrupt) { 1693 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1694 } 1695 } 1696 1697 thread_insert_poller(thread, poller); 1698 1699 return poller; 1700 } 1701 1702 struct spdk_poller * 1703 spdk_poller_register(spdk_poller_fn fn, 1704 void *arg, 1705 uint64_t period_microseconds) 1706 { 1707 return poller_register(fn, arg, period_microseconds, NULL); 1708 } 1709 1710 struct spdk_poller * 1711 spdk_poller_register_named(spdk_poller_fn fn, 1712 void *arg, 1713 uint64_t period_microseconds, 1714 const char *name) 1715 { 1716 return poller_register(fn, arg, period_microseconds, name); 1717 } 1718 1719 static void 1720 wrong_thread(const char *func, const char *name, struct spdk_thread *thread, 1721 struct spdk_thread *curthread) 1722 { 1723 if (thread == NULL) { 1724 SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name); 1725 abort(); 1726 } 1727 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be " 1728 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id, 1729 thread->name, thread->id); 1730 assert(false); 1731 } 1732 1733 void 1734 spdk_poller_unregister(struct spdk_poller **ppoller) 1735 { 1736 struct spdk_thread *thread; 1737 struct spdk_poller *poller; 1738 1739 poller = *ppoller; 1740 if (poller == NULL) { 1741 return; 1742 } 1743 1744 *ppoller = NULL; 1745 1746 thread = spdk_get_thread(); 1747 if (!thread) { 1748 assert(false); 1749 return; 1750 } 1751 1752 if (poller->thread != thread) { 1753 wrong_thread(__func__, poller->name, poller->thread, thread); 1754 return; 1755 } 1756 1757 if (spdk_interrupt_mode_is_enabled()) { 1758 /* Release the interrupt resource for period or busy poller */ 1759 if (poller->intr != NULL) { 1760 poller_interrupt_fini(poller); 1761 } 1762 1763 /* If there is not already a pending poller removal, generate 1764 * a message to go process removals. */ 1765 if (!thread->poller_unregistered) { 1766 thread->poller_unregistered = true; 1767 spdk_thread_send_msg(thread, _thread_remove_pollers, thread); 1768 } 1769 } 1770 1771 /* If the poller was paused, put it on the active_pollers list so that 1772 * its unregistration can be processed by spdk_thread_poll(). 1773 */ 1774 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1775 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1776 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1777 poller->period_ticks = 0; 1778 } 1779 1780 /* Simply set the state to unregistered. The poller will get cleaned up 1781 * in a subsequent call to spdk_thread_poll(). 1782 */ 1783 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1784 } 1785 1786 void 1787 spdk_poller_pause(struct spdk_poller *poller) 1788 { 1789 struct spdk_thread *thread; 1790 1791 thread = spdk_get_thread(); 1792 if (!thread) { 1793 assert(false); 1794 return; 1795 } 1796 1797 if (poller->thread != thread) { 1798 wrong_thread(__func__, poller->name, poller->thread, thread); 1799 return; 1800 } 1801 1802 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1803 * spdk_thread_poll() move it. It allows a poller to be paused from 1804 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1805 * iteration, or from within itself without breaking the logic to always 1806 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1807 */ 1808 switch (poller->state) { 1809 case SPDK_POLLER_STATE_PAUSED: 1810 case SPDK_POLLER_STATE_PAUSING: 1811 break; 1812 case SPDK_POLLER_STATE_RUNNING: 1813 case SPDK_POLLER_STATE_WAITING: 1814 poller->state = SPDK_POLLER_STATE_PAUSING; 1815 break; 1816 default: 1817 assert(false); 1818 break; 1819 } 1820 } 1821 1822 void 1823 spdk_poller_resume(struct spdk_poller *poller) 1824 { 1825 struct spdk_thread *thread; 1826 1827 thread = spdk_get_thread(); 1828 if (!thread) { 1829 assert(false); 1830 return; 1831 } 1832 1833 if (poller->thread != thread) { 1834 wrong_thread(__func__, poller->name, poller->thread, thread); 1835 return; 1836 } 1837 1838 /* If a poller is paused it has to be removed from the paused pollers 1839 * list and put on the active list or timer tree depending on its 1840 * period_ticks. If a poller is still in the process of being paused, 1841 * we just need to flip its state back to waiting, as it's already on 1842 * the appropriate list or tree. 1843 */ 1844 switch (poller->state) { 1845 case SPDK_POLLER_STATE_PAUSED: 1846 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1847 thread_insert_poller(thread, poller); 1848 /* fallthrough */ 1849 case SPDK_POLLER_STATE_PAUSING: 1850 poller->state = SPDK_POLLER_STATE_WAITING; 1851 break; 1852 case SPDK_POLLER_STATE_RUNNING: 1853 case SPDK_POLLER_STATE_WAITING: 1854 break; 1855 default: 1856 assert(false); 1857 break; 1858 } 1859 } 1860 1861 const char * 1862 spdk_poller_get_name(struct spdk_poller *poller) 1863 { 1864 return poller->name; 1865 } 1866 1867 uint64_t 1868 spdk_poller_get_id(struct spdk_poller *poller) 1869 { 1870 return poller->id; 1871 } 1872 1873 const char * 1874 spdk_poller_get_state_str(struct spdk_poller *poller) 1875 { 1876 switch (poller->state) { 1877 case SPDK_POLLER_STATE_WAITING: 1878 return "waiting"; 1879 case SPDK_POLLER_STATE_RUNNING: 1880 return "running"; 1881 case SPDK_POLLER_STATE_UNREGISTERED: 1882 return "unregistered"; 1883 case SPDK_POLLER_STATE_PAUSING: 1884 return "pausing"; 1885 case SPDK_POLLER_STATE_PAUSED: 1886 return "paused"; 1887 default: 1888 return NULL; 1889 } 1890 } 1891 1892 uint64_t 1893 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1894 { 1895 return poller->period_ticks; 1896 } 1897 1898 void 1899 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1900 { 1901 stats->run_count = poller->run_count; 1902 stats->busy_count = poller->busy_count; 1903 } 1904 1905 struct spdk_poller * 1906 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1907 { 1908 return TAILQ_FIRST(&thread->active_pollers); 1909 } 1910 1911 struct spdk_poller * 1912 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1913 { 1914 return TAILQ_NEXT(prev, tailq); 1915 } 1916 1917 struct spdk_poller * 1918 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1919 { 1920 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1921 } 1922 1923 struct spdk_poller * 1924 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1925 { 1926 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1927 } 1928 1929 struct spdk_poller * 1930 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1931 { 1932 return TAILQ_FIRST(&thread->paused_pollers); 1933 } 1934 1935 struct spdk_poller * 1936 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1937 { 1938 return TAILQ_NEXT(prev, tailq); 1939 } 1940 1941 struct spdk_io_channel * 1942 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1943 { 1944 return RB_MIN(io_channel_tree, &thread->io_channels); 1945 } 1946 1947 struct spdk_io_channel * 1948 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1949 { 1950 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1951 } 1952 1953 struct call_thread { 1954 struct spdk_thread *cur_thread; 1955 spdk_msg_fn fn; 1956 void *ctx; 1957 1958 struct spdk_thread *orig_thread; 1959 spdk_msg_fn cpl; 1960 }; 1961 1962 static void 1963 _on_thread(void *ctx) 1964 { 1965 struct call_thread *ct = ctx; 1966 int rc __attribute__((unused)); 1967 1968 ct->fn(ct->ctx); 1969 1970 pthread_mutex_lock(&g_devlist_mutex); 1971 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1972 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 1973 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 1974 ct->cur_thread->name); 1975 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1976 } 1977 pthread_mutex_unlock(&g_devlist_mutex); 1978 1979 if (!ct->cur_thread) { 1980 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1981 1982 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1983 free(ctx); 1984 } else { 1985 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1986 ct->cur_thread->name); 1987 1988 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1989 } 1990 assert(rc == 0); 1991 } 1992 1993 void 1994 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1995 { 1996 struct call_thread *ct; 1997 struct spdk_thread *thread; 1998 int rc __attribute__((unused)); 1999 2000 ct = calloc(1, sizeof(*ct)); 2001 if (!ct) { 2002 SPDK_ERRLOG("Unable to perform thread iteration\n"); 2003 cpl(ctx); 2004 return; 2005 } 2006 2007 ct->fn = fn; 2008 ct->ctx = ctx; 2009 ct->cpl = cpl; 2010 2011 thread = _get_thread(); 2012 if (!thread) { 2013 SPDK_ERRLOG("No thread allocated\n"); 2014 free(ct); 2015 cpl(ctx); 2016 return; 2017 } 2018 ct->orig_thread = thread; 2019 2020 pthread_mutex_lock(&g_devlist_mutex); 2021 ct->cur_thread = TAILQ_FIRST(&g_threads); 2022 pthread_mutex_unlock(&g_devlist_mutex); 2023 2024 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 2025 ct->orig_thread->name); 2026 2027 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 2028 assert(rc == 0); 2029 } 2030 2031 static inline void 2032 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 2033 { 2034 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 2035 return; 2036 } 2037 2038 if (!poller->set_intr_cb_fn) { 2039 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 2040 assert(false); 2041 return; 2042 } 2043 2044 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 2045 } 2046 2047 void 2048 spdk_thread_set_interrupt_mode(bool enable_interrupt) 2049 { 2050 struct spdk_thread *thread = _get_thread(); 2051 struct spdk_poller *poller, *tmp; 2052 2053 assert(thread); 2054 assert(spdk_interrupt_mode_is_enabled()); 2055 2056 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 2057 thread->name, enable_interrupt ? "intr" : "poll", 2058 thread->in_interrupt ? "intr" : "poll"); 2059 2060 if (thread->in_interrupt == enable_interrupt) { 2061 return; 2062 } 2063 2064 /* Set pollers to expected mode */ 2065 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 2066 poller_set_interrupt_mode(poller, enable_interrupt); 2067 } 2068 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 2069 poller_set_interrupt_mode(poller, enable_interrupt); 2070 } 2071 /* All paused pollers will go to work in interrupt mode */ 2072 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 2073 poller_set_interrupt_mode(poller, enable_interrupt); 2074 } 2075 2076 thread->in_interrupt = enable_interrupt; 2077 return; 2078 } 2079 2080 static struct io_device * 2081 io_device_get(void *io_device) 2082 { 2083 struct io_device find = {}; 2084 2085 find.io_device = io_device; 2086 return RB_FIND(io_device_tree, &g_io_devices, &find); 2087 } 2088 2089 void 2090 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 2091 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 2092 const char *name) 2093 { 2094 struct io_device *dev, *tmp; 2095 struct spdk_thread *thread; 2096 2097 assert(io_device != NULL); 2098 assert(create_cb != NULL); 2099 assert(destroy_cb != NULL); 2100 2101 thread = spdk_get_thread(); 2102 if (!thread) { 2103 SPDK_ERRLOG("called from non-SPDK thread\n"); 2104 assert(false); 2105 return; 2106 } 2107 2108 dev = calloc(1, sizeof(struct io_device)); 2109 if (dev == NULL) { 2110 SPDK_ERRLOG("could not allocate io_device\n"); 2111 return; 2112 } 2113 2114 dev->io_device = io_device; 2115 if (name) { 2116 snprintf(dev->name, sizeof(dev->name), "%s", name); 2117 } else { 2118 snprintf(dev->name, sizeof(dev->name), "%p", dev); 2119 } 2120 dev->create_cb = create_cb; 2121 dev->destroy_cb = destroy_cb; 2122 dev->unregister_cb = NULL; 2123 dev->ctx_size = ctx_size; 2124 dev->for_each_count = 0; 2125 dev->unregistered = false; 2126 dev->refcnt = 0; 2127 2128 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 2129 dev->name, dev->io_device, thread->name); 2130 2131 pthread_mutex_lock(&g_devlist_mutex); 2132 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 2133 if (tmp != NULL) { 2134 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 2135 io_device, tmp->name, dev->name); 2136 free(dev); 2137 } 2138 2139 pthread_mutex_unlock(&g_devlist_mutex); 2140 } 2141 2142 static void 2143 _finish_unregister(void *arg) 2144 { 2145 struct io_device *dev = arg; 2146 struct spdk_thread *thread; 2147 2148 thread = spdk_get_thread(); 2149 assert(thread == dev->unregister_thread); 2150 2151 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2152 dev->name, dev->io_device, thread->name); 2153 2154 assert(thread->pending_unregister_count > 0); 2155 thread->pending_unregister_count--; 2156 2157 dev->unregister_cb(dev->io_device); 2158 free(dev); 2159 } 2160 2161 static void 2162 io_device_free(struct io_device *dev) 2163 { 2164 int rc __attribute__((unused)); 2165 2166 if (dev->unregister_cb == NULL) { 2167 free(dev); 2168 } else { 2169 assert(dev->unregister_thread != NULL); 2170 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2171 dev->name, dev->io_device, dev->unregister_thread->name); 2172 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2173 assert(rc == 0); 2174 } 2175 } 2176 2177 void 2178 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2179 { 2180 struct io_device *dev; 2181 uint32_t refcnt; 2182 struct spdk_thread *thread; 2183 2184 thread = spdk_get_thread(); 2185 if (!thread) { 2186 SPDK_ERRLOG("called from non-SPDK thread\n"); 2187 assert(false); 2188 return; 2189 } 2190 2191 pthread_mutex_lock(&g_devlist_mutex); 2192 dev = io_device_get(io_device); 2193 if (!dev) { 2194 SPDK_ERRLOG("io_device %p not found\n", io_device); 2195 assert(false); 2196 pthread_mutex_unlock(&g_devlist_mutex); 2197 return; 2198 } 2199 2200 /* The for_each_count check differentiates the user attempting to unregister the 2201 * device a second time, from the internal call to this function that occurs 2202 * after the for_each_count reaches 0. 2203 */ 2204 if (dev->pending_unregister && dev->for_each_count > 0) { 2205 SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device); 2206 assert(false); 2207 pthread_mutex_unlock(&g_devlist_mutex); 2208 return; 2209 } 2210 2211 dev->unregister_cb = unregister_cb; 2212 dev->unregister_thread = thread; 2213 2214 if (dev->for_each_count > 0) { 2215 SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2216 dev->name, io_device, dev->for_each_count); 2217 dev->pending_unregister = true; 2218 pthread_mutex_unlock(&g_devlist_mutex); 2219 return; 2220 } 2221 2222 dev->unregistered = true; 2223 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2224 refcnt = dev->refcnt; 2225 pthread_mutex_unlock(&g_devlist_mutex); 2226 2227 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2228 dev->name, dev->io_device, thread->name); 2229 2230 if (unregister_cb) { 2231 thread->pending_unregister_count++; 2232 } 2233 2234 if (refcnt > 0) { 2235 /* defer deletion */ 2236 return; 2237 } 2238 2239 io_device_free(dev); 2240 } 2241 2242 const char * 2243 spdk_io_device_get_name(struct io_device *dev) 2244 { 2245 return dev->name; 2246 } 2247 2248 static struct spdk_io_channel * 2249 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2250 { 2251 struct spdk_io_channel find = {}; 2252 2253 find.dev = dev; 2254 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2255 } 2256 2257 struct spdk_io_channel * 2258 spdk_get_io_channel(void *io_device) 2259 { 2260 struct spdk_io_channel *ch; 2261 struct spdk_thread *thread; 2262 struct io_device *dev; 2263 int rc; 2264 2265 pthread_mutex_lock(&g_devlist_mutex); 2266 dev = io_device_get(io_device); 2267 if (dev == NULL) { 2268 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2269 pthread_mutex_unlock(&g_devlist_mutex); 2270 return NULL; 2271 } 2272 2273 thread = _get_thread(); 2274 if (!thread) { 2275 SPDK_ERRLOG("No thread allocated\n"); 2276 pthread_mutex_unlock(&g_devlist_mutex); 2277 return NULL; 2278 } 2279 2280 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2281 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2282 pthread_mutex_unlock(&g_devlist_mutex); 2283 return NULL; 2284 } 2285 2286 ch = thread_get_io_channel(thread, dev); 2287 if (ch != NULL) { 2288 ch->ref++; 2289 2290 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2291 ch, dev->name, dev->io_device, thread->name, ch->ref); 2292 2293 /* 2294 * An I/O channel already exists for this device on this 2295 * thread, so return it. 2296 */ 2297 pthread_mutex_unlock(&g_devlist_mutex); 2298 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2299 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2300 return ch; 2301 } 2302 2303 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2304 if (ch == NULL) { 2305 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2306 pthread_mutex_unlock(&g_devlist_mutex); 2307 return NULL; 2308 } 2309 2310 ch->dev = dev; 2311 ch->destroy_cb = dev->destroy_cb; 2312 ch->thread = thread; 2313 ch->ref = 1; 2314 ch->destroy_ref = 0; 2315 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2316 2317 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2318 ch, dev->name, dev->io_device, thread->name, ch->ref); 2319 2320 dev->refcnt++; 2321 2322 pthread_mutex_unlock(&g_devlist_mutex); 2323 2324 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2325 if (rc != 0) { 2326 pthread_mutex_lock(&g_devlist_mutex); 2327 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2328 dev->refcnt--; 2329 free(ch); 2330 pthread_mutex_unlock(&g_devlist_mutex); 2331 return NULL; 2332 } 2333 2334 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2335 return ch; 2336 } 2337 2338 static void 2339 put_io_channel(void *arg) 2340 { 2341 struct spdk_io_channel *ch = arg; 2342 bool do_remove_dev = true; 2343 struct spdk_thread *thread; 2344 2345 thread = spdk_get_thread(); 2346 if (!thread) { 2347 SPDK_ERRLOG("called from non-SPDK thread\n"); 2348 assert(false); 2349 return; 2350 } 2351 2352 SPDK_DEBUGLOG(thread, 2353 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2354 ch, ch->dev->name, ch->dev->io_device, thread->name); 2355 2356 assert(ch->thread == thread); 2357 2358 ch->destroy_ref--; 2359 2360 if (ch->ref > 0 || ch->destroy_ref > 0) { 2361 /* 2362 * Another reference to the associated io_device was requested 2363 * after this message was sent but before it had a chance to 2364 * execute. 2365 */ 2366 return; 2367 } 2368 2369 pthread_mutex_lock(&g_devlist_mutex); 2370 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2371 pthread_mutex_unlock(&g_devlist_mutex); 2372 2373 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2374 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2375 2376 pthread_mutex_lock(&g_devlist_mutex); 2377 ch->dev->refcnt--; 2378 2379 if (!ch->dev->unregistered) { 2380 do_remove_dev = false; 2381 } 2382 2383 if (ch->dev->refcnt > 0) { 2384 do_remove_dev = false; 2385 } 2386 2387 pthread_mutex_unlock(&g_devlist_mutex); 2388 2389 if (do_remove_dev) { 2390 io_device_free(ch->dev); 2391 } 2392 free(ch); 2393 } 2394 2395 void 2396 spdk_put_io_channel(struct spdk_io_channel *ch) 2397 { 2398 struct spdk_thread *thread; 2399 int rc __attribute__((unused)); 2400 2401 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2402 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2403 2404 thread = spdk_get_thread(); 2405 if (!thread) { 2406 SPDK_ERRLOG("called from non-SPDK thread\n"); 2407 assert(false); 2408 return; 2409 } 2410 2411 if (ch->thread != thread) { 2412 wrong_thread(__func__, "ch", ch->thread, thread); 2413 return; 2414 } 2415 2416 SPDK_DEBUGLOG(thread, 2417 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2418 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2419 2420 ch->ref--; 2421 2422 if (ch->ref == 0) { 2423 ch->destroy_ref++; 2424 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2425 assert(rc == 0); 2426 } 2427 } 2428 2429 struct spdk_io_channel * 2430 spdk_io_channel_from_ctx(void *ctx) 2431 { 2432 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2433 } 2434 2435 struct spdk_thread * 2436 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2437 { 2438 return ch->thread; 2439 } 2440 2441 void * 2442 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2443 { 2444 return ch->dev->io_device; 2445 } 2446 2447 const char * 2448 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2449 { 2450 return spdk_io_device_get_name(ch->dev); 2451 } 2452 2453 int 2454 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2455 { 2456 return ch->ref; 2457 } 2458 2459 struct spdk_io_channel_iter { 2460 void *io_device; 2461 struct io_device *dev; 2462 spdk_channel_msg fn; 2463 int status; 2464 void *ctx; 2465 struct spdk_io_channel *ch; 2466 2467 struct spdk_thread *cur_thread; 2468 2469 struct spdk_thread *orig_thread; 2470 spdk_channel_for_each_cpl cpl; 2471 }; 2472 2473 void * 2474 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2475 { 2476 return i->io_device; 2477 } 2478 2479 struct spdk_io_channel * 2480 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2481 { 2482 return i->ch; 2483 } 2484 2485 void * 2486 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2487 { 2488 return i->ctx; 2489 } 2490 2491 static void 2492 _call_completion(void *ctx) 2493 { 2494 struct spdk_io_channel_iter *i = ctx; 2495 2496 if (i->cpl != NULL) { 2497 i->cpl(i, i->status); 2498 } 2499 free(i); 2500 } 2501 2502 static void 2503 _call_channel(void *ctx) 2504 { 2505 struct spdk_io_channel_iter *i = ctx; 2506 struct spdk_io_channel *ch; 2507 2508 /* 2509 * It is possible that the channel was deleted before this 2510 * message had a chance to execute. If so, skip calling 2511 * the fn() on this thread. 2512 */ 2513 pthread_mutex_lock(&g_devlist_mutex); 2514 ch = thread_get_io_channel(i->cur_thread, i->dev); 2515 pthread_mutex_unlock(&g_devlist_mutex); 2516 2517 if (ch) { 2518 i->fn(i); 2519 } else { 2520 spdk_for_each_channel_continue(i, 0); 2521 } 2522 } 2523 2524 void 2525 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2526 spdk_channel_for_each_cpl cpl) 2527 { 2528 struct spdk_thread *thread; 2529 struct spdk_io_channel *ch; 2530 struct spdk_io_channel_iter *i; 2531 int rc __attribute__((unused)); 2532 2533 i = calloc(1, sizeof(*i)); 2534 if (!i) { 2535 SPDK_ERRLOG("Unable to allocate iterator\n"); 2536 assert(false); 2537 return; 2538 } 2539 2540 i->io_device = io_device; 2541 i->fn = fn; 2542 i->ctx = ctx; 2543 i->cpl = cpl; 2544 i->orig_thread = _get_thread(); 2545 2546 pthread_mutex_lock(&g_devlist_mutex); 2547 i->dev = io_device_get(io_device); 2548 if (i->dev == NULL) { 2549 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2550 assert(false); 2551 i->status = -ENODEV; 2552 goto end; 2553 } 2554 2555 /* Do not allow new for_each operations if we are already waiting to unregister 2556 * the device for other for_each operations to complete. 2557 */ 2558 if (i->dev->pending_unregister) { 2559 SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device); 2560 i->status = -ENODEV; 2561 goto end; 2562 } 2563 2564 TAILQ_FOREACH(thread, &g_threads, tailq) { 2565 ch = thread_get_io_channel(thread, i->dev); 2566 if (ch != NULL) { 2567 ch->dev->for_each_count++; 2568 i->cur_thread = thread; 2569 i->ch = ch; 2570 pthread_mutex_unlock(&g_devlist_mutex); 2571 rc = spdk_thread_send_msg(thread, _call_channel, i); 2572 assert(rc == 0); 2573 return; 2574 } 2575 } 2576 2577 end: 2578 pthread_mutex_unlock(&g_devlist_mutex); 2579 2580 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2581 assert(rc == 0); 2582 } 2583 2584 static void 2585 __pending_unregister(void *arg) 2586 { 2587 struct io_device *dev = arg; 2588 2589 assert(dev->pending_unregister); 2590 assert(dev->for_each_count == 0); 2591 spdk_io_device_unregister(dev->io_device, dev->unregister_cb); 2592 } 2593 2594 void 2595 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2596 { 2597 struct spdk_thread *thread; 2598 struct spdk_io_channel *ch; 2599 struct io_device *dev; 2600 int rc __attribute__((unused)); 2601 2602 assert(i->cur_thread == spdk_get_thread()); 2603 2604 i->status = status; 2605 2606 pthread_mutex_lock(&g_devlist_mutex); 2607 dev = i->dev; 2608 if (status) { 2609 goto end; 2610 } 2611 2612 thread = TAILQ_NEXT(i->cur_thread, tailq); 2613 while (thread) { 2614 ch = thread_get_io_channel(thread, dev); 2615 if (ch != NULL) { 2616 i->cur_thread = thread; 2617 i->ch = ch; 2618 pthread_mutex_unlock(&g_devlist_mutex); 2619 rc = spdk_thread_send_msg(thread, _call_channel, i); 2620 assert(rc == 0); 2621 return; 2622 } 2623 thread = TAILQ_NEXT(thread, tailq); 2624 } 2625 2626 end: 2627 dev->for_each_count--; 2628 i->ch = NULL; 2629 pthread_mutex_unlock(&g_devlist_mutex); 2630 2631 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2632 assert(rc == 0); 2633 2634 pthread_mutex_lock(&g_devlist_mutex); 2635 if (dev->pending_unregister && dev->for_each_count == 0) { 2636 rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev); 2637 assert(rc == 0); 2638 } 2639 pthread_mutex_unlock(&g_devlist_mutex); 2640 } 2641 2642 static void 2643 thread_interrupt_destroy(struct spdk_thread *thread) 2644 { 2645 struct spdk_fd_group *fgrp = thread->fgrp; 2646 2647 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2648 2649 if (thread->msg_fd < 0) { 2650 return; 2651 } 2652 2653 spdk_fd_group_remove(fgrp, thread->msg_fd); 2654 close(thread->msg_fd); 2655 thread->msg_fd = -1; 2656 2657 spdk_fd_group_destroy(fgrp); 2658 thread->fgrp = NULL; 2659 } 2660 2661 #ifdef __linux__ 2662 static int 2663 thread_interrupt_msg_process(void *arg) 2664 { 2665 struct spdk_thread *thread = arg; 2666 struct spdk_thread *orig_thread; 2667 uint32_t msg_count; 2668 spdk_msg_fn critical_msg; 2669 int rc = 0; 2670 uint64_t notify = 1; 2671 2672 assert(spdk_interrupt_mode_is_enabled()); 2673 2674 orig_thread = spdk_get_thread(); 2675 spdk_set_thread(thread); 2676 2677 /* There may be race between msg_acknowledge and another producer's msg_notify, 2678 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2679 * This can avoid msg notification missing. 2680 */ 2681 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2682 if (rc < 0 && errno != EAGAIN) { 2683 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2684 } 2685 2686 critical_msg = thread->critical_msg; 2687 if (spdk_unlikely(critical_msg != NULL)) { 2688 critical_msg(NULL); 2689 thread->critical_msg = NULL; 2690 rc = 1; 2691 } 2692 2693 msg_count = msg_queue_run_batch(thread, 0); 2694 if (msg_count) { 2695 rc = 1; 2696 } 2697 2698 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2699 if (spdk_unlikely(!thread->in_interrupt)) { 2700 /* The thread transitioned to poll mode in a msg during the above processing. 2701 * Clear msg_fd since thread messages will be polled directly in poll mode. 2702 */ 2703 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2704 if (rc < 0 && errno != EAGAIN) { 2705 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 2706 } 2707 } 2708 2709 spdk_set_thread(orig_thread); 2710 return rc; 2711 } 2712 2713 static int 2714 thread_interrupt_create(struct spdk_thread *thread) 2715 { 2716 int rc; 2717 2718 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2719 2720 rc = spdk_fd_group_create(&thread->fgrp); 2721 if (rc) { 2722 return rc; 2723 } 2724 2725 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2726 if (thread->msg_fd < 0) { 2727 rc = -errno; 2728 spdk_fd_group_destroy(thread->fgrp); 2729 thread->fgrp = NULL; 2730 2731 return rc; 2732 } 2733 2734 return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd, 2735 thread_interrupt_msg_process, thread); 2736 } 2737 #else 2738 static int 2739 thread_interrupt_create(struct spdk_thread *thread) 2740 { 2741 return -ENOTSUP; 2742 } 2743 #endif 2744 2745 static int 2746 _interrupt_wrapper(void *ctx) 2747 { 2748 struct spdk_interrupt *intr = ctx; 2749 struct spdk_thread *orig_thread, *thread; 2750 int rc; 2751 2752 orig_thread = spdk_get_thread(); 2753 thread = intr->thread; 2754 2755 spdk_set_thread(thread); 2756 2757 SPDK_DTRACE_PROBE4(interrupt_fd_process, intr->name, intr->efd, 2758 intr->fn, intr->arg); 2759 2760 rc = intr->fn(intr->arg); 2761 2762 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2763 2764 spdk_set_thread(orig_thread); 2765 2766 return rc; 2767 } 2768 2769 struct spdk_interrupt * 2770 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2771 void *arg, const char *name) 2772 { 2773 struct spdk_thread *thread; 2774 struct spdk_interrupt *intr; 2775 int ret; 2776 2777 thread = spdk_get_thread(); 2778 if (!thread) { 2779 assert(false); 2780 return NULL; 2781 } 2782 2783 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2784 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2785 return NULL; 2786 } 2787 2788 intr = calloc(1, sizeof(*intr)); 2789 if (intr == NULL) { 2790 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2791 return NULL; 2792 } 2793 2794 if (name) { 2795 snprintf(intr->name, sizeof(intr->name), "%s", name); 2796 } else { 2797 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2798 } 2799 2800 intr->efd = efd; 2801 intr->thread = thread; 2802 intr->fn = fn; 2803 intr->arg = arg; 2804 2805 ret = spdk_fd_group_add(thread->fgrp, efd, _interrupt_wrapper, intr, intr->name); 2806 2807 if (ret != 0) { 2808 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2809 thread->name, efd, spdk_strerror(-ret)); 2810 free(intr); 2811 return NULL; 2812 } 2813 2814 return intr; 2815 } 2816 2817 void 2818 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2819 { 2820 struct spdk_thread *thread; 2821 struct spdk_interrupt *intr; 2822 2823 intr = *pintr; 2824 if (intr == NULL) { 2825 return; 2826 } 2827 2828 *pintr = NULL; 2829 2830 thread = spdk_get_thread(); 2831 if (!thread) { 2832 assert(false); 2833 return; 2834 } 2835 2836 if (intr->thread != thread) { 2837 wrong_thread(__func__, intr->name, intr->thread, thread); 2838 return; 2839 } 2840 2841 spdk_fd_group_remove(thread->fgrp, intr->efd); 2842 free(intr); 2843 } 2844 2845 int 2846 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2847 enum spdk_interrupt_event_types event_types) 2848 { 2849 struct spdk_thread *thread; 2850 2851 thread = spdk_get_thread(); 2852 if (!thread) { 2853 assert(false); 2854 return -EINVAL; 2855 } 2856 2857 if (intr->thread != thread) { 2858 wrong_thread(__func__, intr->name, intr->thread, thread); 2859 return -EINVAL; 2860 } 2861 2862 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2863 } 2864 2865 int 2866 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2867 { 2868 return spdk_fd_group_get_fd(thread->fgrp); 2869 } 2870 2871 struct spdk_fd_group * 2872 spdk_thread_get_interrupt_fd_group(struct spdk_thread *thread) 2873 { 2874 return thread->fgrp; 2875 } 2876 2877 static bool g_interrupt_mode = false; 2878 2879 int 2880 spdk_interrupt_mode_enable(void) 2881 { 2882 /* It must be called once prior to initializing the threading library. 2883 * g_spdk_msg_mempool will be valid if thread library is initialized. 2884 */ 2885 if (g_spdk_msg_mempool) { 2886 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 2887 return -1; 2888 } 2889 2890 #ifdef __linux__ 2891 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2892 g_interrupt_mode = true; 2893 return 0; 2894 #else 2895 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2896 g_interrupt_mode = false; 2897 return -ENOTSUP; 2898 #endif 2899 } 2900 2901 bool 2902 spdk_interrupt_mode_is_enabled(void) 2903 { 2904 return g_interrupt_mode; 2905 } 2906 2907 #define SSPIN_DEBUG_STACK_FRAMES 16 2908 2909 struct sspin_stack { 2910 void *addrs[SSPIN_DEBUG_STACK_FRAMES]; 2911 uint32_t depth; 2912 }; 2913 2914 struct spdk_spinlock_internal { 2915 struct sspin_stack init_stack; 2916 struct sspin_stack lock_stack; 2917 struct sspin_stack unlock_stack; 2918 }; 2919 2920 static void 2921 sspin_init_internal(struct spdk_spinlock *sspin) 2922 { 2923 #ifdef DEBUG 2924 sspin->internal = calloc(1, sizeof(*sspin->internal)); 2925 #endif 2926 } 2927 2928 static void 2929 sspin_fini_internal(struct spdk_spinlock *sspin) 2930 { 2931 #ifdef DEBUG 2932 free(sspin->internal); 2933 sspin->internal = NULL; 2934 #endif 2935 } 2936 2937 #ifdef DEBUG 2938 #define SSPIN_GET_STACK(sspin, which) \ 2939 do { \ 2940 if (sspin->internal != NULL) { \ 2941 struct sspin_stack *stack = &sspin->internal->which ## _stack; \ 2942 stack->depth = backtrace(stack->addrs, SPDK_COUNTOF(stack->addrs)); \ 2943 } \ 2944 } while (0) 2945 #else 2946 #define SSPIN_GET_STACK(sspin, which) do { } while (0) 2947 #endif 2948 2949 static void 2950 sspin_stack_print(const char *title, const struct sspin_stack *sspin_stack) 2951 { 2952 char **stack; 2953 size_t i; 2954 2955 stack = backtrace_symbols(sspin_stack->addrs, sspin_stack->depth); 2956 if (stack == NULL) { 2957 SPDK_ERRLOG("Out of memory while allocate stack for %s\n", title); 2958 return; 2959 } 2960 SPDK_ERRLOG(" %s:\n", title); 2961 for (i = 0; i < sspin_stack->depth; i++) { 2962 /* 2963 * This does not print line numbers. In gdb, use something like "list *0x444b6b" or 2964 * "list *sspin_stack->addrs[0]". Or more conveniently, load the spdk gdb macros 2965 * and use use "print *sspin" or "print sspin->internal.lock_stack". See 2966 * gdb_macros.md in the docs directory for details. 2967 */ 2968 SPDK_ERRLOG(" #%" PRIu64 ": %s\n", i, stack[i]); 2969 } 2970 free(stack); 2971 } 2972 2973 static void 2974 sspin_stacks_print(const struct spdk_spinlock *sspin) 2975 { 2976 if (sspin->internal == NULL) { 2977 return; 2978 } 2979 SPDK_ERRLOG("spinlock %p\n", sspin); 2980 sspin_stack_print("Lock initalized at", &sspin->internal->init_stack); 2981 sspin_stack_print("Last locked at", &sspin->internal->lock_stack); 2982 sspin_stack_print("Last unlocked at", &sspin->internal->unlock_stack); 2983 } 2984 2985 void 2986 spdk_spin_init(struct spdk_spinlock *sspin) 2987 { 2988 int rc; 2989 2990 memset(sspin, 0, sizeof(*sspin)); 2991 rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE); 2992 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 2993 sspin_init_internal(sspin); 2994 SSPIN_GET_STACK(sspin, init); 2995 sspin->initialized = true; 2996 } 2997 2998 void 2999 spdk_spin_destroy(struct spdk_spinlock *sspin) 3000 { 3001 int rc; 3002 3003 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3004 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3005 SPIN_ASSERT_LOG_STACKS(sspin->thread == NULL, SPIN_ERR_LOCK_HELD, sspin); 3006 3007 rc = pthread_spin_destroy(&sspin->spinlock); 3008 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3009 3010 sspin_fini_internal(sspin); 3011 sspin->initialized = false; 3012 sspin->destroyed = true; 3013 } 3014 3015 void 3016 spdk_spin_lock(struct spdk_spinlock *sspin) 3017 { 3018 struct spdk_thread *thread = spdk_get_thread(); 3019 int rc; 3020 3021 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3022 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3023 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3024 SPIN_ASSERT_LOG_STACKS(thread != sspin->thread, SPIN_ERR_DEADLOCK, sspin); 3025 3026 rc = pthread_spin_lock(&sspin->spinlock); 3027 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3028 3029 sspin->thread = thread; 3030 sspin->thread->lock_count++; 3031 3032 SSPIN_GET_STACK(sspin, lock); 3033 } 3034 3035 void 3036 spdk_spin_unlock(struct spdk_spinlock *sspin) 3037 { 3038 struct spdk_thread *thread = spdk_get_thread(); 3039 int rc; 3040 3041 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3042 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3043 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3044 SPIN_ASSERT_LOG_STACKS(thread == sspin->thread, SPIN_ERR_WRONG_THREAD, sspin); 3045 3046 SPIN_ASSERT_LOG_STACKS(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT, sspin); 3047 thread->lock_count--; 3048 sspin->thread = NULL; 3049 3050 SSPIN_GET_STACK(sspin, unlock); 3051 3052 rc = pthread_spin_unlock(&sspin->spinlock); 3053 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3054 } 3055 3056 bool 3057 spdk_spin_held(struct spdk_spinlock *sspin) 3058 { 3059 struct spdk_thread *thread = spdk_get_thread(); 3060 3061 SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false); 3062 3063 return sspin->thread == thread; 3064 } 3065 3066 SPDK_LOG_REGISTER_COMPONENT(thread) 3067