/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2016 Intel Corporation.
 * All rights reserved.
 * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/trace.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"
#include "spdk_internal/usdt.h"
#include "thread_internal.h"

#include "spdk_internal/trace_defs.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#include <execinfo.h>
#endif

#ifdef __FreeBSD__
#include <execinfo.h>
#endif

#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
#define SPDK_MAX_POLLER_NAME_LEN	256
#define SPDK_MAX_THREAD_NAME_LEN	256

static struct spdk_thread *g_app_thread;

struct spdk_interrupt {
	int			efd;
	struct spdk_thread	*thread;
	spdk_interrupt_fn	fn;
	void			*arg;
	char			name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,

	/* The poller is in the process of being paused. It will be paused
	 * the next time it's supposed to be executed.
	 */
	SPDK_POLLER_STATE_PAUSING,

	/* The poller is registered but currently paused. It's on the
	 * paused_pollers list.
	 */
	SPDK_POLLER_STATE_PAUSED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;
	RB_ENTRY(spdk_poller)		node;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	uint64_t			period_ticks;
	uint64_t			next_run_tick;
	uint64_t			run_count;
	uint64_t			busy_count;
	uint64_t			id;
	spdk_poller_fn			fn;
	void				*arg;
	struct spdk_thread		*thread;
	struct spdk_interrupt		*intr;
	spdk_poller_set_interrupt_mode_cb	set_intr_cb_fn;
	void				*set_intr_cb_arg;

	char				name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_thread_state {
	/* The thread is processing pollers and messages via spdk_thread_poll(). */
	SPDK_THREAD_STATE_RUNNING,

	/* The thread is in the process of termination. It reaps unregistering
	 * pollers and releases I/O channels.
	 */
	SPDK_THREAD_STATE_EXITING,

	/* The thread has exited. It is ready for spdk_thread_destroy(). */
	SPDK_THREAD_STATE_EXITED,
};

struct spdk_thread {
	uint64_t			tsc_last;
	struct spdk_thread_stats	stats;
	/*
	 * Contains pollers actively running on this thread. Pollers
	 * are run round-robin. The thread takes one poller from the head
	 * of the ring, executes it, then puts it back at the tail of
	 * the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;
	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	RB_HEAD(timed_pollers_tree, spdk_poller)	timed_pollers;
	struct spdk_poller				*first_timed_poller;
	/*
	 * Contains paused pollers.
	 * Pollers on this queue are waiting until they are resumed (in which
	 * case they're put onto the active/timer queues) or unregistered.
	 */
	TAILQ_HEAD(paused_pollers_head, spdk_poller)	paused_pollers;
	struct spdk_ring		*messages;
	int				msg_fd;
	SLIST_HEAD(, spdk_msg)		msg_cache;
	size_t				msg_cache_count;
	spdk_msg_fn			critical_msg;
	uint64_t			id;
	uint64_t			next_poller_id;
	enum spdk_thread_state		state;
	int				pending_unregister_count;

	RB_HEAD(io_channel_tree, spdk_io_channel)	io_channels;
	TAILQ_ENTRY(spdk_thread)			tailq;

	char				name[SPDK_MAX_THREAD_NAME_LEN + 1];
	struct spdk_cpuset		cpumask;
	uint64_t			exit_timeout_tsc;

	int32_t				lock_count;

	/* spdk_thread is bound to the current CPU core. */
	bool				is_bound;

	/* Indicates whether this spdk_thread currently runs in interrupt mode. */
	bool				in_interrupt;
	bool				poller_unregistered;
	struct spdk_fd_group		*fgrp;

	/* User context allocated at the end */
	uint8_t				ctx[0];
};

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
static size_t g_ctx_sz = 0;
/* A monotonically increasing ID is assigned to each created thread, beginning at 1.
 * Once the ID wraps around to 0 (i.e. UINT64_MAX IDs have been consumed), further
 * thread creation is not allowed and the SPDK application must be restarted.
 */
static uint64_t g_thread_id = 1;

enum spin_error {
	SPIN_ERR_NONE,
	/* Trying to use an SPDK lock while not on an SPDK thread */
	SPIN_ERR_NOT_SPDK_THREAD,
	/* Trying to lock a lock already held by this SPDK thread */
	SPIN_ERR_DEADLOCK,
	/* Trying to unlock a lock not held by this SPDK thread */
	SPIN_ERR_WRONG_THREAD,
	/* pthread_spin_*() returned an error */
	SPIN_ERR_PTHREAD,
	/* Trying to destroy a lock that is held */
	SPIN_ERR_LOCK_HELD,
	/* lock_count is invalid */
	SPIN_ERR_LOCK_COUNT,
	/*
	 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to
	 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to
	 * deadlock when another SPDK thread on the same pthread tries to take that lock.
	 */
	SPIN_ERR_HOLD_DURING_SWITCH,
	/* Trying to use a lock that was destroyed (but not re-initialized) */
	SPIN_ERR_DESTROYED,
	/* Trying to use a lock that is not initialized */
	SPIN_ERR_NOT_INITIALIZED,

	/* Must be last, not an actual error code */
	SPIN_ERR_LAST
};

static const char *spin_error_strings[] = {
	[SPIN_ERR_NONE]			= "No error",
	[SPIN_ERR_NOT_SPDK_THREAD]	= "Not an SPDK thread",
	[SPIN_ERR_DEADLOCK]		= "Deadlock detected",
	[SPIN_ERR_WRONG_THREAD]		= "Unlock on wrong SPDK thread",
	[SPIN_ERR_PTHREAD]		= "Error from pthread_spinlock",
	[SPIN_ERR_LOCK_HELD]		= "Destroying a held spinlock",
	[SPIN_ERR_LOCK_COUNT]		= "Lock count is invalid",
	[SPIN_ERR_HOLD_DURING_SWITCH]	= "Lock(s) held while SPDK thread going off CPU",
	[SPIN_ERR_DESTROYED]		= "Lock has been destroyed",
	[SPIN_ERR_NOT_INITIALIZED]	= "Lock has not been initialized",
};

#define SPIN_ERROR_STRING(err) (((err) < 0 || (err) >= SPDK_COUNTOF(spin_error_strings)) \
				? "Unknown error" : spin_error_strings[err])
"Unknown error" : spin_error_strings[err] 215 216 static void 217 __posix_abort(enum spin_error err) 218 { 219 abort(); 220 } 221 222 typedef void (*spin_abort)(enum spin_error err); 223 spin_abort g_spin_abort_fn = __posix_abort; 224 225 #define SPIN_ASSERT_IMPL(cond, err, extra_log, ret) \ 226 do { \ 227 if (spdk_unlikely(!(cond))) { \ 228 SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \ 229 SPIN_ERROR_STRING(err), #cond); \ 230 extra_log; \ 231 g_spin_abort_fn(err); \ 232 ret; \ 233 } \ 234 } while (0) 235 #define SPIN_ASSERT_LOG_STACKS(cond, err, lock) \ 236 SPIN_ASSERT_IMPL(cond, err, sspin_stacks_print(sspin), return) 237 #define SPIN_ASSERT_RETURN(cond, err, ret) SPIN_ASSERT_IMPL(cond, err, , return ret) 238 #define SPIN_ASSERT(cond, err) SPIN_ASSERT_IMPL(cond, err, ,) 239 240 struct io_device { 241 void *io_device; 242 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 243 spdk_io_channel_create_cb create_cb; 244 spdk_io_channel_destroy_cb destroy_cb; 245 spdk_io_device_unregister_cb unregister_cb; 246 struct spdk_thread *unregister_thread; 247 uint32_t ctx_size; 248 uint32_t for_each_count; 249 RB_ENTRY(io_device) node; 250 251 uint32_t refcnt; 252 253 bool pending_unregister; 254 bool unregistered; 255 }; 256 257 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 258 259 static int 260 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 261 { 262 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 263 } 264 265 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 266 267 static int 268 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 269 { 270 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 271 } 272 273 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 274 275 struct spdk_msg { 276 spdk_msg_fn fn; 277 void *arg; 278 279 SLIST_ENTRY(spdk_msg) link; 280 }; 281 282 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 283 284 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 285 static uint32_t g_thread_count = 0; 286 287 static __thread struct spdk_thread *tls_thread = NULL; 288 289 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 290 { 291 spdk_trace_register_description("THREAD_IOCH_GET", 292 TRACE_THREAD_IOCH_GET, 293 OWNER_NONE, OBJECT_NONE, 0, 294 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 295 spdk_trace_register_description("THREAD_IOCH_PUT", 296 TRACE_THREAD_IOCH_PUT, 297 OWNER_NONE, OBJECT_NONE, 0, 298 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 299 } 300 301 /* 302 * If this compare function returns zero when two next_run_ticks are equal, 303 * the macro RB_INSERT() returns a pointer to the element with the same 304 * next_run_tick. 305 * 306 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 307 * to remove as a parameter. 308 * 309 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 310 * side by returning 1 when two next_run_ticks are equal. 
 */
static inline int
timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2)
{
	if (poller1->next_run_tick < poller2->next_run_tick) {
		return -1;
	} else {
		return 1;
	}
}

RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare);

static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}

static int
_thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	g_ctx_sz = ctx_sz;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz,
						 sizeof(struct spdk_msg),
						 0, /* No cache. We do our own. */
						 SPDK_ENV_SOCKET_ID_ANY);

	if (!g_spdk_msg_mempool) {
		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
		return -ENOMEM;
	}

	SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n",
		      msg_mempool_sz);

	return 0;
}

static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
				     poller->name);
		}
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (spdk_interrupt_mode_is_enabled()) {
		thread_interrupt_destroy(thread);
	}

	spdk_ring_free(thread->messages);
	free(thread);
}

int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);

	if (new_thread_fn == NULL) {
		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
	} else {
		g_new_thread_fn = new_thread_fn;
	}

	return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE);
}
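/*
 * Illustrative usage sketch (not part of this library): an application that
 * drives an SPDK thread itself, without the event framework, might do roughly
 * the following. "keep_running" is a hypothetical application flag.
 *
 *	spdk_thread_lib_init(NULL, 0);
 *	struct spdk_thread *t = spdk_thread_create("worker", NULL);
 *	spdk_set_thread(t);
 *	while (keep_running) {
 *		spdk_thread_poll(t, 0, 0);
 *	}
 *	spdk_thread_exit(t);
 *	while (!spdk_thread_is_exited(t)) {
 *		spdk_thread_poll(t, 0, 0);
 *	}
 *	spdk_thread_destroy(t);
 *	spdk_thread_lib_fini();
 */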
int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
			 spdk_thread_op_supported_fn thread_op_supported_fn,
			 size_t ctx_sz, size_t msg_mempool_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);
	assert(g_thread_op_supported_fn == NULL);

	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
		SPDK_ERRLOG("Both must be defined or undefined together.\n");
		return -EINVAL;
	}

	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
	} else {
		g_thread_op_fn = thread_op_fn;
		g_thread_op_supported_fn = thread_op_supported_fn;
	}

	return _thread_lib_init(ctx_sz, msg_mempool_sz);
}

void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	RB_FOREACH(dev, io_device_tree, &g_io_devices) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	g_new_thread_fn = NULL;
	g_thread_op_fn = NULL;
	g_thread_op_supported_fn = NULL;
	g_ctx_sz = 0;
	if (g_app_thread != NULL) {
		_free_thread(g_app_thread);
		g_app_thread = NULL;
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}
}

struct spdk_thread *
spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread, *null_thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc = 0, i;

	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	RB_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	RB_INIT(&thread->timed_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	/* A monotonically increasing ID is assigned to each registered poller,
	 * beginning at 1. If the ID ever wraps around to 0, a warning message is
	 * logged and the sequence restarts at 1, so poller IDs may be duplicated.
	 */
	thread->next_poller_id = 1;

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	if (g_thread_id == 0) {
		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		_free_thread(thread);
		return NULL;
	}
	thread->id = g_thread_id++;
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
		      thread->id, thread->name);

	if (spdk_interrupt_mode_is_enabled()) {
		thread->in_interrupt = true;
		rc = thread_interrupt_create(thread);
		if (rc != 0) {
			_free_thread(thread);
			return NULL;
		}
	}

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
	}

	if (rc != 0) {
		_free_thread(thread);
		return NULL;
	}

	thread->state = SPDK_THREAD_STATE_RUNNING;

	/* If this is the first thread, save it as the app thread. Use an atomic
	 * compare + exchange to guard against crazy users who might try to
	 * call spdk_thread_create() simultaneously on multiple threads.
	 */
	null_thread = NULL;
	__atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false,
				    __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);

	return thread;
}

struct spdk_thread *
spdk_thread_get_app_thread(void)
{
	return g_app_thread;
}

bool
spdk_thread_is_app_thread(struct spdk_thread *thread)
{
	if (thread == NULL) {
		thread = _get_thread();
	}

	return g_app_thread == thread;
}

void
spdk_thread_bind(struct spdk_thread *thread, bool bind)
{
	thread->is_bound = bind;
}

bool
spdk_thread_is_bound(struct spdk_thread *thread)
{
	return thread->is_bound;
}

void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}

static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s timed out waiting to exit; moving it to the exited state forcefully\n",
			    thread->name);
		goto exited;
	}

	if (spdk_ring_count(thread->messages) > 0) {
		SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name);
		return;
	}

	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
		SPDK_INFOLOG(thread,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

	if (thread->pending_unregister_count > 0) {
		SPDK_INFOLOG(thread,
			     "thread %s is still unregistering io_devices\n",
			     thread->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
	if (spdk_unlikely(thread->in_interrupt)) {
		g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
	}
}

static void _thread_exit(void *ctx);

int
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
		SPDK_INFOLOG(thread,
			     "thread %s is already exiting\n",
			     thread->name);
		return 0;
	}

	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
	thread->state = SPDK_THREAD_STATE_EXITING;

	if (spdk_interrupt_mode_is_enabled()) {
		spdk_thread_send_msg(thread, _thread_exit, thread);
	}

	return 0;
}

bool
spdk_thread_is_running(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_RUNNING;
}

bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_EXITED;
}

void
spdk_thread_destroy(struct spdk_thread *thread)
{
	assert(thread != NULL);
	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);

	assert(thread->state == SPDK_THREAD_STATE_EXITED);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	/* To be safe, do not free the app thread until spdk_thread_lib_fini(). */
	if (thread != g_app_thread) {
		_free_thread(thread);
	}
}

void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
	if (g_ctx_sz > 0) {
		return thread->ctx;
	}

	return NULL;
}

struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}

int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;

	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
		assert(false);
		return -ENOTSUP;
	}

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Called from non-SPDK thread\n");
		assert(false);
		return -EINVAL;
	}

	spdk_cpuset_copy(&thread->cpumask, cpumask);

	/* Invoke framework's reschedule operation. If this function is called multiple times
	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
	 * reschedule operation.
	 */
	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

	return 0;
}

struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}

static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];
	uint64_t notify = 1;
	int rc;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (spdk_unlikely(thread->in_interrupt) &&
	    spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);

		SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg);

		msg->fn(msg->arg);

		SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}

static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *tmp __attribute__((unused));

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers tree by next scheduled run time
	 * as its key.
	 */
	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp == NULL);

	/* Update the cache only if it is empty or the inserted poller is earlier than it.
	 * RB_MIN() is not necessary here because all pollers which have exactly the same
	 * next_run_tick as the existing poller are inserted on the right side.
	 */
	if (thread->first_timed_poller == NULL ||
	    poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
		thread->first_timed_poller = poller;
	}
}

static inline void
poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
{
	struct spdk_poller *tmp __attribute__((unused));

	tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp != NULL);

	/* This function is not used in any case that is performance critical.
	 * Update the cache simply by RB_MIN() if it needs to be changed.
	 */
	if (thread->first_timed_poller == poller) {
		thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
	}
}

static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store end time to use it as start time of the next spdk_thread_poll().
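	 * The stored value is exposed to callers via spdk_thread_get_last_tsc()
	 * below.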
	 */
	thread->tsc_last = end;
}

static inline int
thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_WAITING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}

static inline int
thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
			    uint64_t now)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
	/* fallthrough */
	case SPDK_POLLER_STATE_WAITING:
		poller_insert_timer(thread, poller, now);
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}

static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	thread->tsc_last = now;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
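		/* Note: critical messages travel through a single atomic slot
		 * (thread->critical_msg) rather than the message ring, so they can
		 * be posted from restricted contexts such as signal handlers; see
		 * spdk_thread_send_critical_msg() below.
		 */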
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		poller_rc = thread_execute_poller(thread, poller);
		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	poller = thread->first_timed_poller;
	while (poller != NULL) {
		int timer_rc = 0;

		if (now < poller->next_run_tick) {
			break;
		}

		tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);

		/* Update the cache to the next timed poller in the list
		 * only if the current poller is still the closest; otherwise,
		 * do nothing because the cache has already been updated.
		 */
		if (thread->first_timed_poller == poller) {
			thread->first_timed_poller = tmp;
		}

		timer_rc = thread_execute_timed_poller(thread, poller, now);
		if (timer_rc > rc) {
			rc = timer_rc;
		}

		poller = tmp;
	}

	return rc;
}

static void
_thread_remove_pollers(void *ctx)
{
	struct spdk_thread *thread = ctx;
	struct spdk_poller *poller, *tmp;

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
		}
	}

	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			poller_remove_timer(thread, poller);
			free(poller);
		}
	}

	thread->poller_unregistered = false;
}

static void
_thread_exit(void *ctx)
{
	struct spdk_thread *thread = ctx;

	assert(thread->state == SPDK_THREAD_STATE_EXITING);

	thread_exit(thread, spdk_get_ticks());
}

int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	struct spdk_thread *orig_thread;
	int rc;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	if (spdk_likely(!thread->in_interrupt)) {
		rc = thread_poll(thread, max_msgs, now);
		if (spdk_unlikely(thread->in_interrupt)) {
			/* The thread transitioned to interrupt mode during the above poll.
			 * Poll it one more time in case a message was received without
			 * notification during the transition.
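			 * (A sender that sampled in_interrupt as false skips the eventfd
			 * write, so such a message would otherwise sit in the ring until
			 * the next wakeup.)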
1162 */ 1163 rc = thread_poll(thread, max_msgs, now); 1164 } 1165 1166 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1167 thread_exit(thread, now); 1168 } 1169 } else { 1170 /* Non-block wait on thread's fd_group */ 1171 rc = spdk_fd_group_wait(thread->fgrp, 0); 1172 } 1173 1174 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1175 1176 tls_thread = orig_thread; 1177 1178 return rc; 1179 } 1180 1181 uint64_t 1182 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1183 { 1184 struct spdk_poller *poller; 1185 1186 poller = thread->first_timed_poller; 1187 if (poller) { 1188 return poller->next_run_tick; 1189 } 1190 1191 return 0; 1192 } 1193 1194 int 1195 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1196 { 1197 return !TAILQ_EMPTY(&thread->active_pollers); 1198 } 1199 1200 static bool 1201 thread_has_unpaused_pollers(struct spdk_thread *thread) 1202 { 1203 if (TAILQ_EMPTY(&thread->active_pollers) && 1204 RB_EMPTY(&thread->timed_pollers)) { 1205 return false; 1206 } 1207 1208 return true; 1209 } 1210 1211 bool 1212 spdk_thread_has_pollers(struct spdk_thread *thread) 1213 { 1214 if (!thread_has_unpaused_pollers(thread) && 1215 TAILQ_EMPTY(&thread->paused_pollers)) { 1216 return false; 1217 } 1218 1219 return true; 1220 } 1221 1222 bool 1223 spdk_thread_is_idle(struct spdk_thread *thread) 1224 { 1225 if (spdk_ring_count(thread->messages) || 1226 thread_has_unpaused_pollers(thread) || 1227 thread->critical_msg != NULL) { 1228 return false; 1229 } 1230 1231 return true; 1232 } 1233 1234 uint32_t 1235 spdk_thread_get_count(void) 1236 { 1237 /* 1238 * Return cached value of the current thread count. We could acquire the 1239 * lock and iterate through the TAILQ of threads to count them, but that 1240 * count could still be invalidated after we release the lock. 
1241 */ 1242 return g_thread_count; 1243 } 1244 1245 struct spdk_thread * 1246 spdk_get_thread(void) 1247 { 1248 return _get_thread(); 1249 } 1250 1251 const char * 1252 spdk_thread_get_name(const struct spdk_thread *thread) 1253 { 1254 return thread->name; 1255 } 1256 1257 uint64_t 1258 spdk_thread_get_id(const struct spdk_thread *thread) 1259 { 1260 return thread->id; 1261 } 1262 1263 struct spdk_thread * 1264 spdk_thread_get_by_id(uint64_t id) 1265 { 1266 struct spdk_thread *thread; 1267 1268 if (id == 0 || id >= g_thread_id) { 1269 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1270 return NULL; 1271 } 1272 pthread_mutex_lock(&g_devlist_mutex); 1273 TAILQ_FOREACH(thread, &g_threads, tailq) { 1274 if (thread->id == id) { 1275 break; 1276 } 1277 } 1278 pthread_mutex_unlock(&g_devlist_mutex); 1279 return thread; 1280 } 1281 1282 int 1283 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1284 { 1285 struct spdk_thread *thread; 1286 1287 thread = _get_thread(); 1288 if (!thread) { 1289 SPDK_ERRLOG("No thread allocated\n"); 1290 return -EINVAL; 1291 } 1292 1293 if (stats == NULL) { 1294 return -EINVAL; 1295 } 1296 1297 *stats = thread->stats; 1298 1299 return 0; 1300 } 1301 1302 uint64_t 1303 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1304 { 1305 if (thread == NULL) { 1306 thread = _get_thread(); 1307 } 1308 1309 return thread->tsc_last; 1310 } 1311 1312 static inline int 1313 thread_send_msg_notification(const struct spdk_thread *target_thread) 1314 { 1315 uint64_t notify = 1; 1316 int rc; 1317 1318 /* Not necessary to do notification if interrupt facility is not enabled */ 1319 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1320 return 0; 1321 } 1322 1323 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1324 * after sending thread msg, it is necessary to check whether target thread runs in 1325 * interrupt mode and then decide whether do event notification. 
1326 */ 1327 if (spdk_unlikely(target_thread->in_interrupt)) { 1328 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1329 if (rc < 0) { 1330 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1331 return -EIO; 1332 } 1333 } 1334 1335 return 0; 1336 } 1337 1338 int 1339 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1340 { 1341 struct spdk_thread *local_thread; 1342 struct spdk_msg *msg; 1343 int rc; 1344 1345 assert(thread != NULL); 1346 1347 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1348 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1349 return -EIO; 1350 } 1351 1352 local_thread = _get_thread(); 1353 1354 msg = NULL; 1355 if (local_thread != NULL) { 1356 if (local_thread->msg_cache_count > 0) { 1357 msg = SLIST_FIRST(&local_thread->msg_cache); 1358 assert(msg != NULL); 1359 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1360 local_thread->msg_cache_count--; 1361 } 1362 } 1363 1364 if (msg == NULL) { 1365 msg = spdk_mempool_get(g_spdk_msg_mempool); 1366 if (!msg) { 1367 SPDK_ERRLOG("msg could not be allocated\n"); 1368 return -ENOMEM; 1369 } 1370 } 1371 1372 msg->fn = fn; 1373 msg->arg = ctx; 1374 1375 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1376 if (rc != 1) { 1377 SPDK_ERRLOG("msg could not be enqueued\n"); 1378 spdk_mempool_put(g_spdk_msg_mempool, msg); 1379 return -EIO; 1380 } 1381 1382 return thread_send_msg_notification(thread); 1383 } 1384 1385 int 1386 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1387 { 1388 spdk_msg_fn expected = NULL; 1389 1390 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1391 __ATOMIC_SEQ_CST)) { 1392 return -EIO; 1393 } 1394 1395 return thread_send_msg_notification(thread); 1396 } 1397 1398 #ifdef __linux__ 1399 static int 1400 interrupt_timerfd_process(void *arg) 1401 { 1402 struct spdk_poller *poller = arg; 1403 uint64_t exp; 1404 int rc; 1405 1406 /* clear the level of interval timer */ 1407 rc = read(poller->intr->efd, &exp, sizeof(exp)); 1408 if (rc < 0) { 1409 if (rc == -EAGAIN) { 1410 return 0; 1411 } 1412 1413 return rc; 1414 } 1415 1416 SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg); 1417 1418 return poller->fn(poller->arg); 1419 } 1420 1421 static int 1422 period_poller_interrupt_init(struct spdk_poller *poller) 1423 { 1424 int timerfd; 1425 1426 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1427 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1428 if (timerfd < 0) { 1429 return -errno; 1430 } 1431 1432 poller->intr = spdk_interrupt_register(timerfd, interrupt_timerfd_process, poller, poller->name); 1433 if (poller->intr == NULL) { 1434 close(timerfd); 1435 return -1; 1436 } 1437 1438 return 0; 1439 } 1440 1441 static void 1442 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1443 { 1444 int timerfd; 1445 uint64_t now_tick = spdk_get_ticks(); 1446 uint64_t ticks = spdk_get_ticks_hz(); 1447 int ret; 1448 struct itimerspec new_tv = {}; 1449 struct itimerspec old_tv = {}; 1450 1451 assert(poller->intr != NULL); 1452 assert(poller->period_ticks != 0); 1453 1454 timerfd = poller->intr->efd; 1455 1456 assert(timerfd >= 0); 1457 1458 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1459 interrupt_mode ? 
"interrupt" : "poll"); 1460 1461 if (interrupt_mode) { 1462 /* Set repeated timer expiration */ 1463 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1464 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1465 1466 /* Update next timer expiration */ 1467 if (poller->next_run_tick == 0) { 1468 poller->next_run_tick = now_tick + poller->period_ticks; 1469 } else if (poller->next_run_tick < now_tick) { 1470 poller->next_run_tick = now_tick; 1471 } 1472 1473 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1474 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1475 1476 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1477 if (ret < 0) { 1478 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1479 assert(false); 1480 } 1481 } else { 1482 /* Disarm the timer */ 1483 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1484 if (ret < 0) { 1485 /* timerfd_settime's failure indicates that the timerfd is in error */ 1486 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1487 assert(false); 1488 } 1489 1490 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1491 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1492 */ 1493 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1494 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1495 poller_remove_timer(poller->thread, poller); 1496 poller_insert_timer(poller->thread, poller, now_tick); 1497 } 1498 } 1499 1500 static void 1501 poller_interrupt_fini(struct spdk_poller *poller) 1502 { 1503 int fd; 1504 1505 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1506 assert(poller->intr != NULL); 1507 fd = poller->intr->efd; 1508 spdk_interrupt_unregister(&poller->intr); 1509 close(fd); 1510 } 1511 1512 static int 1513 busy_poller_interrupt_init(struct spdk_poller *poller) 1514 { 1515 int busy_efd; 1516 1517 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1518 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1519 if (busy_efd < 0) { 1520 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1521 return -errno; 1522 } 1523 1524 poller->intr = spdk_interrupt_register(busy_efd, poller->fn, poller->arg, poller->name); 1525 if (poller->intr == NULL) { 1526 close(busy_efd); 1527 return -1; 1528 } 1529 1530 return 0; 1531 } 1532 1533 static void 1534 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1535 { 1536 int busy_efd = poller->intr->efd; 1537 uint64_t notify = 1; 1538 int rc __attribute__((unused)); 1539 1540 assert(busy_efd >= 0); 1541 1542 if (interrupt_mode) { 1543 /* Write without read on eventfd will get it repeatedly triggered. */ 1544 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1545 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1546 } 1547 } else { 1548 /* Read on eventfd will clear its level triggering. 
		 */
		rc = read(busy_efd, &notify, sizeof(notify));
	}
}

#else

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

#endif

void
spdk_poller_register_interrupt(struct spdk_poller *poller,
			       spdk_poller_set_interrupt_mode_cb cb_fn,
			       void *cb_arg)
{
	assert(poller != NULL);
	assert(cb_fn != NULL);
	assert(spdk_get_thread() == poller->thread);

	if (!spdk_interrupt_mode_is_enabled()) {
		return;
	}

	/* If this poller already had an interrupt, clean the old one up. */
	if (poller->intr != NULL) {
		poller_interrupt_fini(poller);
	}

	poller->set_intr_cb_fn = cb_fn;
	poller->set_intr_cb_arg = cb_arg;

	/* Set the poller into interrupt mode if the thread is in interrupt. */
	if (poller->thread->in_interrupt) {
		poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
	}
}

static uint64_t
convert_us_to_ticks(uint64_t us)
{
	uint64_t quotient, remainder, ticks;

	if (us) {
		quotient = us / SPDK_SEC_TO_USEC;
		remainder = us % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		return 0;
	}
}

static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;
	poller->intr = NULL;
	if (thread->next_poller_id == 0) {
		SPDK_WARNLOG("Poller ID rolled over. Poller IDs may be duplicated.\n");
		thread->next_poller_id = 1;
	}
	poller->id = thread->next_poller_id++;

	poller->period_ticks = convert_us_to_ticks(period_microseconds);

	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (period_microseconds) {
			rc = period_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			poller->set_intr_cb_fn = period_poller_set_interrupt_mode;
			poller->set_intr_cb_arg = NULL;

		} else {
			/* If the poller doesn't have a period, create an interruptfd that
			 * is automatically always busy when running in interrupt mode.
			 */
			rc = busy_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			poller->set_intr_cb_fn = busy_poller_set_interrupt_mode;
			poller->set_intr_cb_arg = NULL;
		}

		/* Set the poller into interrupt mode if the thread is in interrupt. */
		if (poller->thread->in_interrupt) {
			poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
		}
	}

	thread_insert_poller(thread, poller);

	return poller;
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}

static void
wrong_thread(const char *func, const char *name, struct spdk_thread *thread,
	     struct spdk_thread *curthread)
{
	if (thread == NULL) {
		SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name);
		abort();
	}
	SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be "
		    "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id,
		    thread->name, thread->id);
	assert(false);
}

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		/* Release the interrupt resource for period or busy poller */
		if (poller->intr != NULL) {
			poller_interrupt_fini(poller);
		}

		/* If there is not already a pending poller removal, generate
		 * a message to go process removals. */
		if (!thread->poller_unregistered) {
			thread->poller_unregistered = true;
			spdk_thread_send_msg(thread, _thread_remove_pollers, thread);
		}
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
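	 * (Its period_ticks is cleared below so that, from here on, it is
	 * treated as an active poller rather than a timed one.)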
1783 */ 1784 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1785 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1786 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1787 poller->period_ticks = 0; 1788 } 1789 1790 /* Simply set the state to unregistered. The poller will get cleaned up 1791 * in a subsequent call to spdk_thread_poll(). 1792 */ 1793 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1794 } 1795 1796 void 1797 spdk_poller_pause(struct spdk_poller *poller) 1798 { 1799 struct spdk_thread *thread; 1800 1801 thread = spdk_get_thread(); 1802 if (!thread) { 1803 assert(false); 1804 return; 1805 } 1806 1807 if (poller->thread != thread) { 1808 wrong_thread(__func__, poller->name, poller->thread, thread); 1809 return; 1810 } 1811 1812 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1813 * spdk_thread_poll() move it. It allows a poller to be paused from 1814 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1815 * iteration, or from within itself without breaking the logic to always 1816 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1817 */ 1818 switch (poller->state) { 1819 case SPDK_POLLER_STATE_PAUSED: 1820 case SPDK_POLLER_STATE_PAUSING: 1821 break; 1822 case SPDK_POLLER_STATE_RUNNING: 1823 case SPDK_POLLER_STATE_WAITING: 1824 poller->state = SPDK_POLLER_STATE_PAUSING; 1825 break; 1826 default: 1827 assert(false); 1828 break; 1829 } 1830 } 1831 1832 void 1833 spdk_poller_resume(struct spdk_poller *poller) 1834 { 1835 struct spdk_thread *thread; 1836 1837 thread = spdk_get_thread(); 1838 if (!thread) { 1839 assert(false); 1840 return; 1841 } 1842 1843 if (poller->thread != thread) { 1844 wrong_thread(__func__, poller->name, poller->thread, thread); 1845 return; 1846 } 1847 1848 /* If a poller is paused it has to be removed from the paused pollers 1849 * list and put on the active list or timer tree depending on its 1850 * period_ticks. If a poller is still in the process of being paused, 1851 * we just need to flip its state back to waiting, as it's already on 1852 * the appropriate list or tree. 
1853 */ 1854 switch (poller->state) { 1855 case SPDK_POLLER_STATE_PAUSED: 1856 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1857 thread_insert_poller(thread, poller); 1858 /* fallthrough */ 1859 case SPDK_POLLER_STATE_PAUSING: 1860 poller->state = SPDK_POLLER_STATE_WAITING; 1861 break; 1862 case SPDK_POLLER_STATE_RUNNING: 1863 case SPDK_POLLER_STATE_WAITING: 1864 break; 1865 default: 1866 assert(false); 1867 break; 1868 } 1869 } 1870 1871 const char * 1872 spdk_poller_get_name(struct spdk_poller *poller) 1873 { 1874 return poller->name; 1875 } 1876 1877 uint64_t 1878 spdk_poller_get_id(struct spdk_poller *poller) 1879 { 1880 return poller->id; 1881 } 1882 1883 const char * 1884 spdk_poller_get_state_str(struct spdk_poller *poller) 1885 { 1886 switch (poller->state) { 1887 case SPDK_POLLER_STATE_WAITING: 1888 return "waiting"; 1889 case SPDK_POLLER_STATE_RUNNING: 1890 return "running"; 1891 case SPDK_POLLER_STATE_UNREGISTERED: 1892 return "unregistered"; 1893 case SPDK_POLLER_STATE_PAUSING: 1894 return "pausing"; 1895 case SPDK_POLLER_STATE_PAUSED: 1896 return "paused"; 1897 default: 1898 return NULL; 1899 } 1900 } 1901 1902 uint64_t 1903 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1904 { 1905 return poller->period_ticks; 1906 } 1907 1908 void 1909 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1910 { 1911 stats->run_count = poller->run_count; 1912 stats->busy_count = poller->busy_count; 1913 } 1914 1915 struct spdk_poller * 1916 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1917 { 1918 return TAILQ_FIRST(&thread->active_pollers); 1919 } 1920 1921 struct spdk_poller * 1922 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1923 { 1924 return TAILQ_NEXT(prev, tailq); 1925 } 1926 1927 struct spdk_poller * 1928 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1929 { 1930 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1931 } 1932 1933 struct spdk_poller * 1934 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1935 { 1936 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1937 } 1938 1939 struct spdk_poller * 1940 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1941 { 1942 return TAILQ_FIRST(&thread->paused_pollers); 1943 } 1944 1945 struct spdk_poller * 1946 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1947 { 1948 return TAILQ_NEXT(prev, tailq); 1949 } 1950 1951 struct spdk_io_channel * 1952 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1953 { 1954 return RB_MIN(io_channel_tree, &thread->io_channels); 1955 } 1956 1957 struct spdk_io_channel * 1958 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1959 { 1960 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1961 } 1962 1963 struct call_thread { 1964 struct spdk_thread *cur_thread; 1965 spdk_msg_fn fn; 1966 void *ctx; 1967 1968 struct spdk_thread *orig_thread; 1969 spdk_msg_fn cpl; 1970 }; 1971 1972 static void 1973 _on_thread(void *ctx) 1974 { 1975 struct call_thread *ct = ctx; 1976 int rc __attribute__((unused)); 1977 1978 ct->fn(ct->ctx); 1979 1980 pthread_mutex_lock(&g_devlist_mutex); 1981 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1982 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 1983 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 1984 ct->cur_thread->name); 1985 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1986 } 1987 pthread_mutex_unlock(&g_devlist_mutex); 

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	assert(rc == 0);
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}

static inline void
poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode)
{
	if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
		return;
	}

	if (!poller->set_intr_cb_fn) {
		SPDK_ERRLOG("Poller(%s) doesn't support setting interrupt mode.\n", poller->name);
		assert(false);
		return;
	}

	poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode);
}

void
spdk_thread_set_interrupt_mode(bool enable_interrupt)
{
	struct spdk_thread *thread = _get_thread();
	struct spdk_poller *poller, *tmp;

	assert(thread);
	assert(spdk_interrupt_mode_is_enabled());

	SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n",
		       thread->name, enable_interrupt ? "intr" : "poll",
		       thread->in_interrupt ? "intr" : "poll");
"intr" : "poll"); 2069 2070 if (thread->in_interrupt == enable_interrupt) { 2071 return; 2072 } 2073 2074 /* Set pollers to expected mode */ 2075 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 2076 poller_set_interrupt_mode(poller, enable_interrupt); 2077 } 2078 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 2079 poller_set_interrupt_mode(poller, enable_interrupt); 2080 } 2081 /* All paused pollers will go to work in interrupt mode */ 2082 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 2083 poller_set_interrupt_mode(poller, enable_interrupt); 2084 } 2085 2086 thread->in_interrupt = enable_interrupt; 2087 return; 2088 } 2089 2090 static struct io_device * 2091 io_device_get(void *io_device) 2092 { 2093 struct io_device find = {}; 2094 2095 find.io_device = io_device; 2096 return RB_FIND(io_device_tree, &g_io_devices, &find); 2097 } 2098 2099 void 2100 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 2101 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 2102 const char *name) 2103 { 2104 struct io_device *dev, *tmp; 2105 struct spdk_thread *thread; 2106 2107 assert(io_device != NULL); 2108 assert(create_cb != NULL); 2109 assert(destroy_cb != NULL); 2110 2111 thread = spdk_get_thread(); 2112 if (!thread) { 2113 SPDK_ERRLOG("called from non-SPDK thread\n"); 2114 assert(false); 2115 return; 2116 } 2117 2118 dev = calloc(1, sizeof(struct io_device)); 2119 if (dev == NULL) { 2120 SPDK_ERRLOG("could not allocate io_device\n"); 2121 return; 2122 } 2123 2124 dev->io_device = io_device; 2125 if (name) { 2126 snprintf(dev->name, sizeof(dev->name), "%s", name); 2127 } else { 2128 snprintf(dev->name, sizeof(dev->name), "%p", dev); 2129 } 2130 dev->create_cb = create_cb; 2131 dev->destroy_cb = destroy_cb; 2132 dev->unregister_cb = NULL; 2133 dev->ctx_size = ctx_size; 2134 dev->for_each_count = 0; 2135 dev->unregistered = false; 2136 dev->refcnt = 0; 2137 2138 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 2139 dev->name, dev->io_device, thread->name); 2140 2141 pthread_mutex_lock(&g_devlist_mutex); 2142 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 2143 if (tmp != NULL) { 2144 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 2145 io_device, tmp->name, dev->name); 2146 free(dev); 2147 } 2148 2149 pthread_mutex_unlock(&g_devlist_mutex); 2150 } 2151 2152 static void 2153 _finish_unregister(void *arg) 2154 { 2155 struct io_device *dev = arg; 2156 struct spdk_thread *thread; 2157 2158 thread = spdk_get_thread(); 2159 assert(thread == dev->unregister_thread); 2160 2161 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2162 dev->name, dev->io_device, thread->name); 2163 2164 assert(thread->pending_unregister_count > 0); 2165 thread->pending_unregister_count--; 2166 2167 dev->unregister_cb(dev->io_device); 2168 free(dev); 2169 } 2170 2171 static void 2172 io_device_free(struct io_device *dev) 2173 { 2174 int rc __attribute__((unused)); 2175 2176 if (dev->unregister_cb == NULL) { 2177 free(dev); 2178 } else { 2179 assert(dev->unregister_thread != NULL); 2180 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2181 dev->name, dev->io_device, dev->unregister_thread->name); 2182 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2183 assert(rc == 0); 2184 } 2185 } 2186 2187 void 2188 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2189 { 2190 

const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}

static struct spdk_io_channel *
thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev)
{
	struct spdk_io_channel find = {};

	find.dev = dev;
	return RB_FIND(io_channel_tree, &thread->io_channels, &find);
}

struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch = thread_get_io_channel(thread, dev);
	if (ch != NULL) {
		ch->ref++;

		SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
			      ch, dev->name, dev->io_device, thread->name, ch->ref);

		/*
		 * An I/O channel already exists for this device on this
		 * thread, so return it.
		 */
		pthread_mutex_unlock(&g_devlist_mutex);
		spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0,
				  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
		return ch;
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	RB_INSERT(io_channel_tree, &thread->io_channels, ch);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1);
	return ch;
}

static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0,
			  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		wrong_thread(__func__, "ch", ch->thread, thread);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}
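
/*
 * Illustrative usage sketch: each thread takes its own channel with
 * spdk_get_io_channel() and drops it with spdk_put_io_channel(); a repeated
 * get on the same thread returns the same channel with its reference count
 * bumped. Continuing the hypothetical "foo" device above:
 *
 *	struct spdk_io_channel *ch = spdk_get_io_channel(&g_foo);
 *
 *	if (ch != NULL) {
 *		struct foo_channel_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *
 *		// submit I/O using the per-thread ctx ...
 *		spdk_put_io_channel(ch);
 *	}
 */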

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

void *
spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
{
	return ch->dev->io_device;
}

const char *
spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch)
{
	return spdk_io_device_get_name(ch->dev);
}

int
spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
{
	return ch->ref;
}

struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute. If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	ch = thread_get_io_channel(i->cur_thread, i->dev);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		assert(false);
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;
	i->orig_thread = _get_thread();

	pthread_mutex_lock(&g_devlist_mutex);
	i->dev = io_device_get(io_device);
	if (i->dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		assert(false);
		i->status = -ENODEV;
		goto end;
	}

	/* Do not allow new for_each operations if we are already waiting for
	 * outstanding for_each operations to complete so that the device can
	 * be unregistered.
	 */
	if (i->dev->pending_unregister) {
		SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device);
		i->status = -ENODEV;
		goto end;
	}

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		ch = thread_get_io_channel(thread, i->dev);
		if (ch != NULL) {
			ch->dev->for_each_count++;
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
	}

end:
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

static void
__pending_unregister(void *arg)
{
	struct io_device *dev = arg;

	assert(dev->pending_unregister);
	assert(dev->for_each_count == 0);
	spdk_io_device_unregister(dev->io_device, dev->unregister_cb);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct io_device *dev;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	dev = i->dev;
	if (status) {
		goto end;
	}

	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		ch = thread_get_io_channel(thread, dev);
		if (ch != NULL) {
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);

	pthread_mutex_lock(&g_devlist_mutex);
	if (dev->pending_unregister && dev->for_each_count == 0) {
		rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev);
		assert(rc == 0);
	}
	pthread_mutex_unlock(&g_devlist_mutex);
}
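
/*
 * Illustrative usage sketch: spdk_for_each_channel() visits the io_device's
 * channel on every thread that holds one; each fn must eventually call
 * spdk_for_each_channel_continue() to advance (or abort) the iteration.
 * Names are hypothetical.
 *
 *	static void
 *	quiesce_channel(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *
 *		// quiesce per-thread state behind ch ...
 *		spdk_for_each_channel_continue(i, 0);	// non-zero status aborts
 *	}
 *
 *	static void
 *	quiesce_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		// runs on the thread that started the iteration
 *	}
 *
 *	spdk_for_each_channel(&g_foo, quiesce_channel, NULL, quiesce_done);
 */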

static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	if (thread->msg_fd < 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);
	thread->msg_fd = -1;

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}

#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	struct spdk_thread *orig_thread;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;
	uint64_t notify = 1;

	assert(spdk_interrupt_mode_is_enabled());

	orig_thread = spdk_get_thread();
	spdk_set_thread(thread);

	/* There may be a race between this acknowledgement (the read below) and
	 * another producer's notification (its write to the eventfd), so the
	 * acknowledgement must come first and the message queue must be checked
	 * afterwards. This ordering avoids missing a message notification.
	 */
	rc = read(thread->msg_fd, &notify, sizeof(notify));
	if (rc < 0 && errno != EAGAIN) {
		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
	}

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
	if (spdk_unlikely(!thread->in_interrupt)) {
		/* The thread transitioned to poll mode in a msg during the above processing.
		 * Clear msg_fd since thread messages will be polled directly in poll mode.
		 */
		rc = read(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0 && errno != EAGAIN) {
			SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
		}
	}

	spdk_set_thread(orig_thread);
	return rc;
}

static int
thread_interrupt_create(struct spdk_thread *thread)
{
	int rc;

	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);

	rc = spdk_fd_group_create(&thread->fgrp);
	if (rc) {
		return rc;
	}

	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (thread->msg_fd < 0) {
		rc = -errno;
		spdk_fd_group_destroy(thread->fgrp);
		thread->fgrp = NULL;

		return rc;
	}

	return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd,
				 thread_interrupt_msg_process, thread);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
#endif

static int
_interrupt_wrapper(void *ctx)
{
	struct spdk_interrupt *intr = ctx;
	struct spdk_thread *orig_thread, *thread;
	int rc;

	orig_thread = spdk_get_thread();
	thread = intr->thread;

	spdk_set_thread(thread);

	SPDK_DTRACE_PROBE4(interrupt_fd_process, intr->name, intr->efd,
			   intr->fn, intr->arg);

	rc = intr->fn(intr->arg);

	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

	spdk_set_thread(orig_thread);

	return rc;
}

struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
			void *arg, const char *name)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;
	int ret;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	intr = calloc(1, sizeof(*intr));
	if (intr == NULL) {
		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(intr->name, sizeof(intr->name), "%s", name);
	} else {
		snprintf(intr->name, sizeof(intr->name), "%p", fn);
	}

	intr->efd = efd;
	intr->thread = thread;
	intr->fn = fn;
	intr->arg = arg;

	ret = spdk_fd_group_add(thread->fgrp, efd, _interrupt_wrapper, intr, intr->name);
	if (ret != 0) {
		SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n",
			    thread->name, efd, spdk_strerror(-ret));
		free(intr);
		return NULL;
	}

	return intr;
}
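
/*
 * Illustrative usage sketch: registering a timerfd with the current thread's
 * fd group; the callback fires when the thread's fd group is serviced in
 * interrupt mode. "my_timer_cb" and "my_arg" are hypothetical.
 *
 *	int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
 *	struct spdk_interrupt *intr;
 *
 *	// arm tfd with timerfd_settime() first, then:
 *	intr = spdk_interrupt_register(tfd, my_timer_cb, my_arg, "my_timer");
 *	...
 *	spdk_interrupt_unregister(&intr);
 *	close(tfd);
 */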

void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	*pintr = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}

int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}

struct spdk_fd_group *
spdk_thread_get_interrupt_fd_group(struct spdk_thread *thread)
{
	return thread->fgrp;
}

static bool g_interrupt_mode = false;

int
spdk_interrupt_mode_enable(void)
{
	/* This must be called once, before the threading library is initialized.
	 * g_spdk_msg_mempool is only valid once the thread library has been
	 * initialized, so use it to detect that case.
	 */
	if (g_spdk_msg_mempool) {
		SPDK_ERRLOG("Failed: the threading library is already initialized.\n");
		return -1;
	}

#ifdef __linux__
	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
	g_interrupt_mode = true;
	return 0;
#else
	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
	g_interrupt_mode = false;
	return -ENOTSUP;
#endif
}

bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}
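
/*
 * Illustrative integration sketch: an application event loop can watch a
 * thread's fd group fd from its own epoll set and drain the thread's events
 * with spdk_fd_group_wait() when it becomes readable. The epoll plumbing is
 * hypothetical host code, not SPDK API.
 *
 *	int epfd = epoll_create1(EPOLL_CLOEXEC);
 *	struct epoll_event ev = { .events = EPOLLIN, .data.ptr = thread };
 *
 *	epoll_ctl(epfd, EPOLL_CTL_ADD, spdk_thread_get_interrupt_fd(thread), &ev);
 *	...
 *	// when epfd reports the fd ready:
 *	spdk_fd_group_wait(spdk_thread_get_interrupt_fd_group(thread), 0);
 */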
2894 */ 2895 if (g_spdk_msg_mempool) { 2896 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 2897 return -1; 2898 } 2899 2900 #ifdef __linux__ 2901 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2902 g_interrupt_mode = true; 2903 return 0; 2904 #else 2905 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2906 g_interrupt_mode = false; 2907 return -ENOTSUP; 2908 #endif 2909 } 2910 2911 bool 2912 spdk_interrupt_mode_is_enabled(void) 2913 { 2914 return g_interrupt_mode; 2915 } 2916 2917 #define SSPIN_DEBUG_STACK_FRAMES 16 2918 2919 struct sspin_stack { 2920 void *addrs[SSPIN_DEBUG_STACK_FRAMES]; 2921 uint32_t depth; 2922 }; 2923 2924 struct spdk_spinlock_internal { 2925 struct sspin_stack init_stack; 2926 struct sspin_stack lock_stack; 2927 struct sspin_stack unlock_stack; 2928 }; 2929 2930 static void 2931 sspin_init_internal(struct spdk_spinlock *sspin) 2932 { 2933 #ifdef DEBUG 2934 sspin->internal = calloc(1, sizeof(*sspin->internal)); 2935 #endif 2936 } 2937 2938 static void 2939 sspin_fini_internal(struct spdk_spinlock *sspin) 2940 { 2941 #ifdef DEBUG 2942 free(sspin->internal); 2943 sspin->internal = NULL; 2944 #endif 2945 } 2946 2947 #ifdef DEBUG 2948 #define SSPIN_GET_STACK(sspin, which) \ 2949 do { \ 2950 if (sspin->internal != NULL) { \ 2951 struct sspin_stack *stack = &sspin->internal->which ## _stack; \ 2952 stack->depth = backtrace(stack->addrs, SPDK_COUNTOF(stack->addrs)); \ 2953 } \ 2954 } while (0) 2955 #else 2956 #define SSPIN_GET_STACK(sspin, which) do { } while (0) 2957 #endif 2958 2959 static void 2960 sspin_stack_print(const char *title, const struct sspin_stack *sspin_stack) 2961 { 2962 char **stack; 2963 size_t i; 2964 2965 stack = backtrace_symbols(sspin_stack->addrs, sspin_stack->depth); 2966 if (stack == NULL) { 2967 SPDK_ERRLOG("Out of memory while allocate stack for %s\n", title); 2968 return; 2969 } 2970 SPDK_ERRLOG(" %s:\n", title); 2971 for (i = 0; i < sspin_stack->depth; i++) { 2972 /* 2973 * This does not print line numbers. In gdb, use something like "list *0x444b6b" or 2974 * "list *sspin_stack->addrs[0]". Or more conveniently, load the spdk gdb macros 2975 * and use use "print *sspin" or "print sspin->internal.lock_stack". See 2976 * gdb_macros.md in the docs directory for details. 
2977 */ 2978 SPDK_ERRLOG(" #%" PRIu64 ": %s\n", i, stack[i]); 2979 } 2980 free(stack); 2981 } 2982 2983 static void 2984 sspin_stacks_print(const struct spdk_spinlock *sspin) 2985 { 2986 if (sspin->internal == NULL) { 2987 return; 2988 } 2989 SPDK_ERRLOG("spinlock %p\n", sspin); 2990 sspin_stack_print("Lock initalized at", &sspin->internal->init_stack); 2991 sspin_stack_print("Last locked at", &sspin->internal->lock_stack); 2992 sspin_stack_print("Last unlocked at", &sspin->internal->unlock_stack); 2993 } 2994 2995 void 2996 spdk_spin_init(struct spdk_spinlock *sspin) 2997 { 2998 int rc; 2999 3000 memset(sspin, 0, sizeof(*sspin)); 3001 rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE); 3002 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3003 sspin_init_internal(sspin); 3004 SSPIN_GET_STACK(sspin, init); 3005 sspin->initialized = true; 3006 } 3007 3008 void 3009 spdk_spin_destroy(struct spdk_spinlock *sspin) 3010 { 3011 int rc; 3012 3013 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3014 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3015 SPIN_ASSERT_LOG_STACKS(sspin->thread == NULL, SPIN_ERR_LOCK_HELD, sspin); 3016 3017 rc = pthread_spin_destroy(&sspin->spinlock); 3018 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3019 3020 sspin_fini_internal(sspin); 3021 sspin->initialized = false; 3022 sspin->destroyed = true; 3023 } 3024 3025 void 3026 spdk_spin_lock(struct spdk_spinlock *sspin) 3027 { 3028 struct spdk_thread *thread = spdk_get_thread(); 3029 int rc; 3030 3031 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3032 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3033 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3034 SPIN_ASSERT_LOG_STACKS(thread != sspin->thread, SPIN_ERR_DEADLOCK, sspin); 3035 3036 rc = pthread_spin_lock(&sspin->spinlock); 3037 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3038 3039 sspin->thread = thread; 3040 sspin->thread->lock_count++; 3041 3042 SSPIN_GET_STACK(sspin, lock); 3043 } 3044 3045 void 3046 spdk_spin_unlock(struct spdk_spinlock *sspin) 3047 { 3048 struct spdk_thread *thread = spdk_get_thread(); 3049 int rc; 3050 3051 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3052 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3053 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3054 SPIN_ASSERT_LOG_STACKS(thread == sspin->thread, SPIN_ERR_WRONG_THREAD, sspin); 3055 3056 SPIN_ASSERT_LOG_STACKS(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT, sspin); 3057 thread->lock_count--; 3058 sspin->thread = NULL; 3059 3060 SSPIN_GET_STACK(sspin, unlock); 3061 3062 rc = pthread_spin_unlock(&sspin->spinlock); 3063 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3064 } 3065 3066 bool 3067 spdk_spin_held(struct spdk_spinlock *sspin) 3068 { 3069 struct spdk_thread *thread = spdk_get_thread(); 3070 3071 SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false); 3072 3073 return sspin->thread == thread; 3074 } 3075 3076 SPDK_LOG_REGISTER_COMPONENT(thread) 3077