1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2016 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "spdk/env.h" 10 #include "spdk/likely.h" 11 #include "spdk/queue.h" 12 #include "spdk/string.h" 13 #include "spdk/thread.h" 14 #include "spdk/trace.h" 15 #include "spdk/util.h" 16 #include "spdk/fd_group.h" 17 18 #include "spdk/log.h" 19 #include "spdk_internal/thread.h" 20 #include "spdk_internal/usdt.h" 21 #include "thread_internal.h" 22 23 #include "spdk_internal/trace_defs.h" 24 25 #ifdef __linux__ 26 #include <sys/timerfd.h> 27 #include <sys/eventfd.h> 28 #include <execinfo.h> 29 #endif 30 31 #ifdef __FreeBSD__ 32 #include <execinfo.h> 33 #endif 34 35 #define SPDK_MSG_BATCH_SIZE 8 36 #define SPDK_MAX_DEVICE_NAME_LEN 256 37 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 38 #define SPDK_MAX_POLLER_NAME_LEN 256 39 #define SPDK_MAX_THREAD_NAME_LEN 256 40 41 static struct spdk_thread *g_app_thread; 42 43 struct spdk_interrupt { 44 int efd; 45 struct spdk_thread *thread; 46 spdk_interrupt_fn fn; 47 void *arg; 48 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 49 }; 50 51 enum spdk_poller_state { 52 /* The poller is registered with a thread but not currently executing its fn. */ 53 SPDK_POLLER_STATE_WAITING, 54 55 /* The poller is currently running its fn. */ 56 SPDK_POLLER_STATE_RUNNING, 57 58 /* The poller was unregistered during the execution of its fn. */ 59 SPDK_POLLER_STATE_UNREGISTERED, 60 61 /* The poller is in the process of being paused. It will be paused 62 * during the next time it's supposed to be executed. 63 */ 64 SPDK_POLLER_STATE_PAUSING, 65 66 /* The poller is registered but currently paused. It's on the 67 * paused_pollers list. 68 */ 69 SPDK_POLLER_STATE_PAUSED, 70 }; 71 72 struct spdk_poller { 73 TAILQ_ENTRY(spdk_poller) tailq; 74 RB_ENTRY(spdk_poller) node; 75 76 /* Current state of the poller; should only be accessed from the poller's thread. */ 77 enum spdk_poller_state state; 78 79 uint64_t period_ticks; 80 uint64_t next_run_tick; 81 uint64_t run_count; 82 uint64_t busy_count; 83 uint64_t id; 84 spdk_poller_fn fn; 85 void *arg; 86 struct spdk_thread *thread; 87 struct spdk_interrupt *intr; 88 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 89 void *set_intr_cb_arg; 90 91 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 92 }; 93 94 enum spdk_thread_state { 95 /* The thread is processing poller and message by spdk_thread_poll(). */ 96 SPDK_THREAD_STATE_RUNNING, 97 98 /* The thread is in the process of termination. It reaps unregistering 99 * poller are releasing I/O channel. 100 */ 101 SPDK_THREAD_STATE_EXITING, 102 103 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 104 SPDK_THREAD_STATE_EXITED, 105 }; 106 107 struct spdk_thread { 108 uint64_t tsc_last; 109 struct spdk_thread_stats stats; 110 /* 111 * Contains pollers actively running on this thread. Pollers 112 * are run round-robin. The thread takes one poller from the head 113 * of the ring, executes it, then puts it back at the tail of 114 * the ring. 115 */ 116 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 117 /** 118 * Contains pollers running on this thread with a periodic timer. 119 */ 120 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 121 struct spdk_poller *first_timed_poller; 122 /* 123 * Contains paused pollers. Pollers on this queue are waiting until 124 * they are resumed (in which case they're put onto the active/timer 125 * queues) or unregistered. 126 */ 127 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 128 struct spdk_ring *messages; 129 int msg_fd; 130 SLIST_HEAD(, spdk_msg) msg_cache; 131 size_t msg_cache_count; 132 spdk_msg_fn critical_msg; 133 uint64_t id; 134 uint64_t next_poller_id; 135 enum spdk_thread_state state; 136 int pending_unregister_count; 137 138 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 139 TAILQ_ENTRY(spdk_thread) tailq; 140 141 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 142 struct spdk_cpuset cpumask; 143 uint64_t exit_timeout_tsc; 144 145 int32_t lock_count; 146 147 /* Indicates whether this spdk_thread currently runs in interrupt. */ 148 bool in_interrupt; 149 bool poller_unregistered; 150 struct spdk_fd_group *fgrp; 151 152 /* User context allocated at the end */ 153 uint8_t ctx[0]; 154 }; 155 156 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 157 158 static spdk_new_thread_fn g_new_thread_fn = NULL; 159 static spdk_thread_op_fn g_thread_op_fn = NULL; 160 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 161 static size_t g_ctx_sz = 0; 162 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 163 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 164 * SPDK application is required. 165 */ 166 static uint64_t g_thread_id = 1; 167 168 enum spin_error { 169 SPIN_ERR_NONE, 170 /* Trying to use an SPDK lock while not on an SPDK thread */ 171 SPIN_ERR_NOT_SPDK_THREAD, 172 /* Trying to lock a lock already held by this SPDK thread */ 173 SPIN_ERR_DEADLOCK, 174 /* Trying to unlock a lock not held by this SPDK thread */ 175 SPIN_ERR_WRONG_THREAD, 176 /* pthread_spin_*() returned an error */ 177 SPIN_ERR_PTHREAD, 178 /* Trying to destroy a lock that is held */ 179 SPIN_ERR_LOCK_HELD, 180 /* lock_count is invalid */ 181 SPIN_ERR_LOCK_COUNT, 182 /* 183 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to 184 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to 185 * deadlock when another SPDK thread on the same pthread tries to take that lock. 186 */ 187 SPIN_ERR_HOLD_DURING_SWITCH, 188 /* Trying to use a lock that was destroyed (but not re-initialized) */ 189 SPIN_ERR_DESTROYED, 190 /* Trying to use a lock that is not initialized */ 191 SPIN_ERR_NOT_INITIALIZED, 192 193 /* Must be last, not an actual error code */ 194 SPIN_ERR_LAST 195 }; 196 197 static const char *spin_error_strings[] = { 198 [SPIN_ERR_NONE] = "No error", 199 [SPIN_ERR_NOT_SPDK_THREAD] = "Not an SPDK thread", 200 [SPIN_ERR_DEADLOCK] = "Deadlock detected", 201 [SPIN_ERR_WRONG_THREAD] = "Unlock on wrong SPDK thread", 202 [SPIN_ERR_PTHREAD] = "Error from pthread_spinlock", 203 [SPIN_ERR_LOCK_HELD] = "Destroying a held spinlock", 204 [SPIN_ERR_LOCK_COUNT] = "Lock count is invalid", 205 [SPIN_ERR_HOLD_DURING_SWITCH] = "Lock(s) held while SPDK thread going off CPU", 206 [SPIN_ERR_DESTROYED] = "Lock has been destroyed", 207 [SPIN_ERR_NOT_INITIALIZED] = "Lock has not been initialized", 208 }; 209 210 #define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \ 211 ? "Unknown error" : spin_error_strings[err] 212 213 static void 214 __posix_abort(enum spin_error err) 215 { 216 abort(); 217 } 218 219 typedef void (*spin_abort)(enum spin_error err); 220 spin_abort g_spin_abort_fn = __posix_abort; 221 222 #define SPIN_ASSERT_IMPL(cond, err, extra_log, ret) \ 223 do { \ 224 if (spdk_unlikely(!(cond))) { \ 225 SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \ 226 SPIN_ERROR_STRING(err), #cond); \ 227 extra_log; \ 228 g_spin_abort_fn(err); \ 229 ret; \ 230 } \ 231 } while (0) 232 #define SPIN_ASSERT_LOG_STACKS(cond, err, lock) \ 233 SPIN_ASSERT_IMPL(cond, err, sspin_stacks_print(sspin), return) 234 #define SPIN_ASSERT_RETURN(cond, err, ret) SPIN_ASSERT_IMPL(cond, err, , return ret) 235 #define SPIN_ASSERT(cond, err) SPIN_ASSERT_IMPL(cond, err, ,) 236 237 struct io_device { 238 void *io_device; 239 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 240 spdk_io_channel_create_cb create_cb; 241 spdk_io_channel_destroy_cb destroy_cb; 242 spdk_io_device_unregister_cb unregister_cb; 243 struct spdk_thread *unregister_thread; 244 uint32_t ctx_size; 245 uint32_t for_each_count; 246 RB_ENTRY(io_device) node; 247 248 uint32_t refcnt; 249 250 bool pending_unregister; 251 bool unregistered; 252 }; 253 254 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 255 256 static int 257 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 258 { 259 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 260 } 261 262 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 263 264 static int 265 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 266 { 267 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 268 } 269 270 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 271 272 struct spdk_msg { 273 spdk_msg_fn fn; 274 void *arg; 275 276 SLIST_ENTRY(spdk_msg) link; 277 }; 278 279 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 280 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 281 282 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 283 static uint32_t g_thread_count = 0; 284 285 static __thread struct spdk_thread *tls_thread = NULL; 286 287 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 288 { 289 spdk_trace_register_description("THREAD_IOCH_GET", 290 TRACE_THREAD_IOCH_GET, 291 OWNER_NONE, OBJECT_NONE, 0, 292 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 293 spdk_trace_register_description("THREAD_IOCH_PUT", 294 TRACE_THREAD_IOCH_PUT, 295 OWNER_NONE, OBJECT_NONE, 0, 296 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 297 } 298 299 /* 300 * If this compare function returns zero when two next_run_ticks are equal, 301 * the macro RB_INSERT() returns a pointer to the element with the same 302 * next_run_tick. 303 * 304 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 305 * to remove as a parameter. 306 * 307 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 308 * side by returning 1 when two next_run_ticks are equal. 309 */ 310 static inline int 311 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 312 { 313 if (poller1->next_run_tick < poller2->next_run_tick) { 314 return -1; 315 } else { 316 return 1; 317 } 318 } 319 320 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 321 322 static inline struct spdk_thread * 323 _get_thread(void) 324 { 325 return tls_thread; 326 } 327 328 static int 329 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 330 { 331 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 332 333 g_ctx_sz = ctx_sz; 334 335 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 336 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 337 sizeof(struct spdk_msg), 338 0, /* No cache. We do our own. */ 339 SPDK_ENV_SOCKET_ID_ANY); 340 341 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 342 msg_mempool_sz); 343 344 if (!g_spdk_msg_mempool) { 345 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 346 return -ENOMEM; 347 } 348 349 return 0; 350 } 351 352 static void thread_interrupt_destroy(struct spdk_thread *thread); 353 static int thread_interrupt_create(struct spdk_thread *thread); 354 355 static void 356 _free_thread(struct spdk_thread *thread) 357 { 358 struct spdk_io_channel *ch; 359 struct spdk_msg *msg; 360 struct spdk_poller *poller, *ptmp; 361 362 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 363 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 364 thread->name, ch->dev->name); 365 } 366 367 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 368 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 369 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 370 poller->name); 371 } 372 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 373 free(poller); 374 } 375 376 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 377 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 378 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 379 poller->name); 380 } 381 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 382 free(poller); 383 } 384 385 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 386 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 387 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 388 free(poller); 389 } 390 391 pthread_mutex_lock(&g_devlist_mutex); 392 assert(g_thread_count > 0); 393 g_thread_count--; 394 TAILQ_REMOVE(&g_threads, thread, tailq); 395 pthread_mutex_unlock(&g_devlist_mutex); 396 397 msg = SLIST_FIRST(&thread->msg_cache); 398 while (msg != NULL) { 399 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 400 401 assert(thread->msg_cache_count > 0); 402 thread->msg_cache_count--; 403 spdk_mempool_put(g_spdk_msg_mempool, msg); 404 405 msg = SLIST_FIRST(&thread->msg_cache); 406 } 407 408 assert(thread->msg_cache_count == 0); 409 410 if (spdk_interrupt_mode_is_enabled()) { 411 thread_interrupt_destroy(thread); 412 } 413 414 spdk_ring_free(thread->messages); 415 free(thread); 416 } 417 418 int 419 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 420 { 421 assert(g_new_thread_fn == NULL); 422 assert(g_thread_op_fn == NULL); 423 424 if (new_thread_fn == NULL) { 425 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 426 } else { 427 g_new_thread_fn = new_thread_fn; 428 } 429 430 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 431 } 432 433 int 434 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 435 spdk_thread_op_supported_fn thread_op_supported_fn, 436 size_t ctx_sz, size_t msg_mempool_sz) 437 { 438 assert(g_new_thread_fn == NULL); 439 assert(g_thread_op_fn == NULL); 440 assert(g_thread_op_supported_fn == NULL); 441 442 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 443 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 444 return -EINVAL; 445 } 446 447 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 448 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 449 } else { 450 g_thread_op_fn = thread_op_fn; 451 g_thread_op_supported_fn = thread_op_supported_fn; 452 } 453 454 return _thread_lib_init(ctx_sz, msg_mempool_sz); 455 } 456 457 void 458 spdk_thread_lib_fini(void) 459 { 460 struct io_device *dev; 461 462 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 463 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 464 } 465 466 g_new_thread_fn = NULL; 467 g_thread_op_fn = NULL; 468 g_thread_op_supported_fn = NULL; 469 g_ctx_sz = 0; 470 if (g_app_thread != NULL) { 471 _free_thread(g_app_thread); 472 g_app_thread = NULL; 473 } 474 475 if (g_spdk_msg_mempool) { 476 spdk_mempool_free(g_spdk_msg_mempool); 477 g_spdk_msg_mempool = NULL; 478 } 479 } 480 481 struct spdk_thread * 482 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask) 483 { 484 struct spdk_thread *thread, *null_thread; 485 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 486 int rc = 0, i; 487 488 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 489 if (!thread) { 490 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 491 return NULL; 492 } 493 494 if (cpumask) { 495 spdk_cpuset_copy(&thread->cpumask, cpumask); 496 } else { 497 spdk_cpuset_negate(&thread->cpumask); 498 } 499 500 RB_INIT(&thread->io_channels); 501 TAILQ_INIT(&thread->active_pollers); 502 RB_INIT(&thread->timed_pollers); 503 TAILQ_INIT(&thread->paused_pollers); 504 SLIST_INIT(&thread->msg_cache); 505 thread->msg_cache_count = 0; 506 507 thread->tsc_last = spdk_get_ticks(); 508 509 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 510 * ID exceeds UINT64_MAX a warning message is logged 511 */ 512 thread->next_poller_id = 1; 513 514 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 515 if (!thread->messages) { 516 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 517 free(thread); 518 return NULL; 519 } 520 521 /* Fill the local message pool cache. */ 522 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 523 if (rc == 0) { 524 /* If we can't populate the cache it's ok. The cache will get filled 525 * up organically as messages are passed to the thread. */ 526 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 527 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 528 thread->msg_cache_count++; 529 } 530 } 531 532 if (name) { 533 snprintf(thread->name, sizeof(thread->name), "%s", name); 534 } else { 535 snprintf(thread->name, sizeof(thread->name), "%p", thread); 536 } 537 538 pthread_mutex_lock(&g_devlist_mutex); 539 if (g_thread_id == 0) { 540 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 541 pthread_mutex_unlock(&g_devlist_mutex); 542 _free_thread(thread); 543 return NULL; 544 } 545 thread->id = g_thread_id++; 546 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 547 g_thread_count++; 548 pthread_mutex_unlock(&g_devlist_mutex); 549 550 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 551 thread->id, thread->name); 552 553 if (spdk_interrupt_mode_is_enabled()) { 554 thread->in_interrupt = true; 555 rc = thread_interrupt_create(thread); 556 if (rc != 0) { 557 _free_thread(thread); 558 return NULL; 559 } 560 } 561 562 if (g_new_thread_fn) { 563 rc = g_new_thread_fn(thread); 564 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 565 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 566 } 567 568 if (rc != 0) { 569 _free_thread(thread); 570 return NULL; 571 } 572 573 thread->state = SPDK_THREAD_STATE_RUNNING; 574 575 /* If this is the first thread, save it as the app thread. Use an atomic 576 * compare + exchange to guard against crazy users who might try to 577 * call spdk_thread_create() simultaneously on multiple threads. 578 */ 579 null_thread = NULL; 580 __atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false, 581 __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); 582 583 return thread; 584 } 585 586 struct spdk_thread * 587 spdk_thread_get_app_thread(void) 588 { 589 return g_app_thread; 590 } 591 592 void 593 spdk_set_thread(struct spdk_thread *thread) 594 { 595 tls_thread = thread; 596 } 597 598 static void 599 thread_exit(struct spdk_thread *thread, uint64_t now) 600 { 601 struct spdk_poller *poller; 602 struct spdk_io_channel *ch; 603 604 if (now >= thread->exit_timeout_tsc) { 605 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 606 thread->name); 607 goto exited; 608 } 609 610 if (spdk_ring_count(thread->messages) > 0) { 611 SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name); 612 return; 613 } 614 615 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 616 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 617 SPDK_INFOLOG(thread, 618 "thread %s still has active poller %s\n", 619 thread->name, poller->name); 620 return; 621 } 622 } 623 624 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 625 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 626 SPDK_INFOLOG(thread, 627 "thread %s still has active timed poller %s\n", 628 thread->name, poller->name); 629 return; 630 } 631 } 632 633 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 634 SPDK_INFOLOG(thread, 635 "thread %s still has paused poller %s\n", 636 thread->name, poller->name); 637 return; 638 } 639 640 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 641 SPDK_INFOLOG(thread, 642 "thread %s still has channel for io_device %s\n", 643 thread->name, ch->dev->name); 644 return; 645 } 646 647 if (thread->pending_unregister_count > 0) { 648 SPDK_INFOLOG(thread, 649 "thread %s is still unregistering io_devices\n", 650 thread->name); 651 return; 652 } 653 654 exited: 655 thread->state = SPDK_THREAD_STATE_EXITED; 656 if (spdk_unlikely(thread->in_interrupt)) { 657 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 658 } 659 } 660 661 static void _thread_exit(void *ctx); 662 663 int 664 spdk_thread_exit(struct spdk_thread *thread) 665 { 666 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 667 668 assert(tls_thread == thread); 669 670 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 671 SPDK_INFOLOG(thread, 672 "thread %s is already exiting\n", 673 thread->name); 674 return 0; 675 } 676 677 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 678 SPDK_THREAD_EXIT_TIMEOUT_SEC); 679 thread->state = SPDK_THREAD_STATE_EXITING; 680 681 if (spdk_interrupt_mode_is_enabled()) { 682 spdk_thread_send_msg(thread, _thread_exit, thread); 683 } 684 685 return 0; 686 } 687 688 bool 689 spdk_thread_is_running(struct spdk_thread *thread) 690 { 691 return thread->state == SPDK_THREAD_STATE_RUNNING; 692 } 693 694 bool 695 spdk_thread_is_exited(struct spdk_thread *thread) 696 { 697 return thread->state == SPDK_THREAD_STATE_EXITED; 698 } 699 700 void 701 spdk_thread_destroy(struct spdk_thread *thread) 702 { 703 assert(thread != NULL); 704 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 705 706 assert(thread->state == SPDK_THREAD_STATE_EXITED); 707 708 if (tls_thread == thread) { 709 tls_thread = NULL; 710 } 711 712 /* To be safe, do not free the app thread until spdk_thread_lib_fini(). */ 713 if (thread != g_app_thread) { 714 _free_thread(thread); 715 } 716 } 717 718 void * 719 spdk_thread_get_ctx(struct spdk_thread *thread) 720 { 721 if (g_ctx_sz > 0) { 722 return thread->ctx; 723 } 724 725 return NULL; 726 } 727 728 struct spdk_cpuset * 729 spdk_thread_get_cpumask(struct spdk_thread *thread) 730 { 731 return &thread->cpumask; 732 } 733 734 int 735 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 736 { 737 struct spdk_thread *thread; 738 739 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 740 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 741 assert(false); 742 return -ENOTSUP; 743 } 744 745 thread = spdk_get_thread(); 746 if (!thread) { 747 SPDK_ERRLOG("Called from non-SPDK thread\n"); 748 assert(false); 749 return -EINVAL; 750 } 751 752 spdk_cpuset_copy(&thread->cpumask, cpumask); 753 754 /* Invoke framework's reschedule operation. If this function is called multiple times 755 * in a single spdk_thread_poll() context, the last cpumask will be used in the 756 * reschedule operation. 757 */ 758 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 759 760 return 0; 761 } 762 763 struct spdk_thread * 764 spdk_thread_get_from_ctx(void *ctx) 765 { 766 if (ctx == NULL) { 767 assert(false); 768 return NULL; 769 } 770 771 assert(g_ctx_sz > 0); 772 773 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 774 } 775 776 static inline uint32_t 777 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 778 { 779 unsigned count, i; 780 void *messages[SPDK_MSG_BATCH_SIZE]; 781 uint64_t notify = 1; 782 int rc; 783 784 #ifdef DEBUG 785 /* 786 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 787 * so we will never actually read uninitialized data from events, but just to be sure 788 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 789 */ 790 memset(messages, 0, sizeof(messages)); 791 #endif 792 793 if (max_msgs > 0) { 794 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 795 } else { 796 max_msgs = SPDK_MSG_BATCH_SIZE; 797 } 798 799 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 800 if (spdk_unlikely(thread->in_interrupt) && 801 spdk_ring_count(thread->messages) != 0) { 802 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 803 if (rc < 0) { 804 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 805 } 806 } 807 if (count == 0) { 808 return 0; 809 } 810 811 for (i = 0; i < count; i++) { 812 struct spdk_msg *msg = messages[i]; 813 814 assert(msg != NULL); 815 816 SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg); 817 818 msg->fn(msg->arg); 819 820 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 821 822 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 823 /* Insert the messages at the head. We want to re-use the hot 824 * ones. */ 825 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 826 thread->msg_cache_count++; 827 } else { 828 spdk_mempool_put(g_spdk_msg_mempool, msg); 829 } 830 } 831 832 return count; 833 } 834 835 static void 836 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 837 { 838 struct spdk_poller *tmp __attribute__((unused)); 839 840 poller->next_run_tick = now + poller->period_ticks; 841 842 /* 843 * Insert poller in the thread's timed_pollers tree by next scheduled run time 844 * as its key. 845 */ 846 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 847 assert(tmp == NULL); 848 849 /* Update the cache only if it is empty or the inserted poller is earlier than it. 850 * RB_MIN() is not necessary here because all pollers, which has exactly the same 851 * next_run_tick as the existing poller, are inserted on the right side. 852 */ 853 if (thread->first_timed_poller == NULL || 854 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 855 thread->first_timed_poller = poller; 856 } 857 } 858 859 static inline void 860 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 861 { 862 struct spdk_poller *tmp __attribute__((unused)); 863 864 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 865 assert(tmp != NULL); 866 867 /* This function is not used in any case that is performance critical. 868 * Update the cache simply by RB_MIN() if it needs to be changed. 869 */ 870 if (thread->first_timed_poller == poller) { 871 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 872 } 873 } 874 875 static void 876 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 877 { 878 if (poller->period_ticks) { 879 poller_insert_timer(thread, poller, spdk_get_ticks()); 880 } else { 881 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 882 } 883 } 884 885 static inline void 886 thread_update_stats(struct spdk_thread *thread, uint64_t end, 887 uint64_t start, int rc) 888 { 889 if (rc == 0) { 890 /* Poller status idle */ 891 thread->stats.idle_tsc += end - start; 892 } else if (rc > 0) { 893 /* Poller status busy */ 894 thread->stats.busy_tsc += end - start; 895 } 896 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 897 thread->tsc_last = end; 898 } 899 900 static inline int 901 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 902 { 903 int rc; 904 905 switch (poller->state) { 906 case SPDK_POLLER_STATE_UNREGISTERED: 907 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 908 free(poller); 909 return 0; 910 case SPDK_POLLER_STATE_PAUSING: 911 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 912 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 913 poller->state = SPDK_POLLER_STATE_PAUSED; 914 return 0; 915 case SPDK_POLLER_STATE_WAITING: 916 break; 917 default: 918 assert(false); 919 break; 920 } 921 922 poller->state = SPDK_POLLER_STATE_RUNNING; 923 rc = poller->fn(poller->arg); 924 925 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 926 927 poller->run_count++; 928 if (rc > 0) { 929 poller->busy_count++; 930 } 931 932 #ifdef DEBUG 933 if (rc == -1) { 934 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 935 } 936 #endif 937 938 switch (poller->state) { 939 case SPDK_POLLER_STATE_UNREGISTERED: 940 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 941 free(poller); 942 break; 943 case SPDK_POLLER_STATE_PAUSING: 944 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 945 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 946 poller->state = SPDK_POLLER_STATE_PAUSED; 947 break; 948 case SPDK_POLLER_STATE_PAUSED: 949 case SPDK_POLLER_STATE_WAITING: 950 break; 951 case SPDK_POLLER_STATE_RUNNING: 952 poller->state = SPDK_POLLER_STATE_WAITING; 953 break; 954 default: 955 assert(false); 956 break; 957 } 958 959 return rc; 960 } 961 962 static inline int 963 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 964 uint64_t now) 965 { 966 int rc; 967 968 switch (poller->state) { 969 case SPDK_POLLER_STATE_UNREGISTERED: 970 free(poller); 971 return 0; 972 case SPDK_POLLER_STATE_PAUSING: 973 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 974 poller->state = SPDK_POLLER_STATE_PAUSED; 975 return 0; 976 case SPDK_POLLER_STATE_WAITING: 977 break; 978 default: 979 assert(false); 980 break; 981 } 982 983 poller->state = SPDK_POLLER_STATE_RUNNING; 984 rc = poller->fn(poller->arg); 985 986 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 987 988 poller->run_count++; 989 if (rc > 0) { 990 poller->busy_count++; 991 } 992 993 #ifdef DEBUG 994 if (rc == -1) { 995 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 996 } 997 #endif 998 999 switch (poller->state) { 1000 case SPDK_POLLER_STATE_UNREGISTERED: 1001 free(poller); 1002 break; 1003 case SPDK_POLLER_STATE_PAUSING: 1004 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1005 poller->state = SPDK_POLLER_STATE_PAUSED; 1006 break; 1007 case SPDK_POLLER_STATE_PAUSED: 1008 break; 1009 case SPDK_POLLER_STATE_RUNNING: 1010 poller->state = SPDK_POLLER_STATE_WAITING; 1011 /* fallthrough */ 1012 case SPDK_POLLER_STATE_WAITING: 1013 poller_insert_timer(thread, poller, now); 1014 break; 1015 default: 1016 assert(false); 1017 break; 1018 } 1019 1020 return rc; 1021 } 1022 1023 static int 1024 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1025 { 1026 uint32_t msg_count; 1027 struct spdk_poller *poller, *tmp; 1028 spdk_msg_fn critical_msg; 1029 int rc = 0; 1030 1031 thread->tsc_last = now; 1032 1033 critical_msg = thread->critical_msg; 1034 if (spdk_unlikely(critical_msg != NULL)) { 1035 critical_msg(NULL); 1036 thread->critical_msg = NULL; 1037 rc = 1; 1038 } 1039 1040 msg_count = msg_queue_run_batch(thread, max_msgs); 1041 if (msg_count) { 1042 rc = 1; 1043 } 1044 1045 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1046 active_pollers_head, tailq, tmp) { 1047 int poller_rc; 1048 1049 poller_rc = thread_execute_poller(thread, poller); 1050 if (poller_rc > rc) { 1051 rc = poller_rc; 1052 } 1053 } 1054 1055 poller = thread->first_timed_poller; 1056 while (poller != NULL) { 1057 int timer_rc = 0; 1058 1059 if (now < poller->next_run_tick) { 1060 break; 1061 } 1062 1063 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 1064 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 1065 1066 /* Update the cache to the next timed poller in the list 1067 * only if the current poller is still the closest, otherwise, 1068 * do nothing because the cache has been already updated. 1069 */ 1070 if (thread->first_timed_poller == poller) { 1071 thread->first_timed_poller = tmp; 1072 } 1073 1074 timer_rc = thread_execute_timed_poller(thread, poller, now); 1075 if (timer_rc > rc) { 1076 rc = timer_rc; 1077 } 1078 1079 poller = tmp; 1080 } 1081 1082 return rc; 1083 } 1084 1085 static void 1086 _thread_remove_pollers(void *ctx) 1087 { 1088 struct spdk_thread *thread = ctx; 1089 struct spdk_poller *poller, *tmp; 1090 1091 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1092 active_pollers_head, tailq, tmp) { 1093 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1094 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1095 free(poller); 1096 } 1097 } 1098 1099 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1100 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1101 poller_remove_timer(thread, poller); 1102 free(poller); 1103 } 1104 } 1105 1106 thread->poller_unregistered = false; 1107 } 1108 1109 static void 1110 _thread_exit(void *ctx) 1111 { 1112 struct spdk_thread *thread = ctx; 1113 1114 assert(thread->state == SPDK_THREAD_STATE_EXITING); 1115 1116 thread_exit(thread, spdk_get_ticks()); 1117 } 1118 1119 int 1120 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1121 { 1122 struct spdk_thread *orig_thread; 1123 int rc; 1124 1125 orig_thread = _get_thread(); 1126 tls_thread = thread; 1127 1128 if (now == 0) { 1129 now = spdk_get_ticks(); 1130 } 1131 1132 if (spdk_likely(!thread->in_interrupt)) { 1133 rc = thread_poll(thread, max_msgs, now); 1134 if (spdk_unlikely(thread->in_interrupt)) { 1135 /* The thread transitioned to interrupt mode during the above poll. 1136 * Poll it one more time in case that during the transition time 1137 * there is msg received without notification. 1138 */ 1139 rc = thread_poll(thread, max_msgs, now); 1140 } 1141 1142 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1143 thread_exit(thread, now); 1144 } 1145 } else { 1146 /* Non-block wait on thread's fd_group */ 1147 rc = spdk_fd_group_wait(thread->fgrp, 0); 1148 } 1149 1150 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1151 1152 tls_thread = orig_thread; 1153 1154 return rc; 1155 } 1156 1157 uint64_t 1158 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1159 { 1160 struct spdk_poller *poller; 1161 1162 poller = thread->first_timed_poller; 1163 if (poller) { 1164 return poller->next_run_tick; 1165 } 1166 1167 return 0; 1168 } 1169 1170 int 1171 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1172 { 1173 return !TAILQ_EMPTY(&thread->active_pollers); 1174 } 1175 1176 static bool 1177 thread_has_unpaused_pollers(struct spdk_thread *thread) 1178 { 1179 if (TAILQ_EMPTY(&thread->active_pollers) && 1180 RB_EMPTY(&thread->timed_pollers)) { 1181 return false; 1182 } 1183 1184 return true; 1185 } 1186 1187 bool 1188 spdk_thread_has_pollers(struct spdk_thread *thread) 1189 { 1190 if (!thread_has_unpaused_pollers(thread) && 1191 TAILQ_EMPTY(&thread->paused_pollers)) { 1192 return false; 1193 } 1194 1195 return true; 1196 } 1197 1198 bool 1199 spdk_thread_is_idle(struct spdk_thread *thread) 1200 { 1201 if (spdk_ring_count(thread->messages) || 1202 thread_has_unpaused_pollers(thread) || 1203 thread->critical_msg != NULL) { 1204 return false; 1205 } 1206 1207 return true; 1208 } 1209 1210 uint32_t 1211 spdk_thread_get_count(void) 1212 { 1213 /* 1214 * Return cached value of the current thread count. We could acquire the 1215 * lock and iterate through the TAILQ of threads to count them, but that 1216 * count could still be invalidated after we release the lock. 1217 */ 1218 return g_thread_count; 1219 } 1220 1221 struct spdk_thread * 1222 spdk_get_thread(void) 1223 { 1224 return _get_thread(); 1225 } 1226 1227 const char * 1228 spdk_thread_get_name(const struct spdk_thread *thread) 1229 { 1230 return thread->name; 1231 } 1232 1233 uint64_t 1234 spdk_thread_get_id(const struct spdk_thread *thread) 1235 { 1236 return thread->id; 1237 } 1238 1239 struct spdk_thread * 1240 spdk_thread_get_by_id(uint64_t id) 1241 { 1242 struct spdk_thread *thread; 1243 1244 if (id == 0 || id >= g_thread_id) { 1245 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1246 return NULL; 1247 } 1248 pthread_mutex_lock(&g_devlist_mutex); 1249 TAILQ_FOREACH(thread, &g_threads, tailq) { 1250 if (thread->id == id) { 1251 break; 1252 } 1253 } 1254 pthread_mutex_unlock(&g_devlist_mutex); 1255 return thread; 1256 } 1257 1258 int 1259 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1260 { 1261 struct spdk_thread *thread; 1262 1263 thread = _get_thread(); 1264 if (!thread) { 1265 SPDK_ERRLOG("No thread allocated\n"); 1266 return -EINVAL; 1267 } 1268 1269 if (stats == NULL) { 1270 return -EINVAL; 1271 } 1272 1273 *stats = thread->stats; 1274 1275 return 0; 1276 } 1277 1278 uint64_t 1279 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1280 { 1281 if (thread == NULL) { 1282 thread = _get_thread(); 1283 } 1284 1285 return thread->tsc_last; 1286 } 1287 1288 static inline int 1289 thread_send_msg_notification(const struct spdk_thread *target_thread) 1290 { 1291 uint64_t notify = 1; 1292 int rc; 1293 1294 /* Not necessary to do notification if interrupt facility is not enabled */ 1295 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1296 return 0; 1297 } 1298 1299 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1300 * after sending thread msg, it is necessary to check whether target thread runs in 1301 * interrupt mode and then decide whether do event notification. 1302 */ 1303 if (spdk_unlikely(target_thread->in_interrupt)) { 1304 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1305 if (rc < 0) { 1306 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1307 return -EIO; 1308 } 1309 } 1310 1311 return 0; 1312 } 1313 1314 int 1315 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1316 { 1317 struct spdk_thread *local_thread; 1318 struct spdk_msg *msg; 1319 int rc; 1320 1321 assert(thread != NULL); 1322 1323 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1324 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1325 return -EIO; 1326 } 1327 1328 local_thread = _get_thread(); 1329 1330 msg = NULL; 1331 if (local_thread != NULL) { 1332 if (local_thread->msg_cache_count > 0) { 1333 msg = SLIST_FIRST(&local_thread->msg_cache); 1334 assert(msg != NULL); 1335 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1336 local_thread->msg_cache_count--; 1337 } 1338 } 1339 1340 if (msg == NULL) { 1341 msg = spdk_mempool_get(g_spdk_msg_mempool); 1342 if (!msg) { 1343 SPDK_ERRLOG("msg could not be allocated\n"); 1344 return -ENOMEM; 1345 } 1346 } 1347 1348 msg->fn = fn; 1349 msg->arg = ctx; 1350 1351 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1352 if (rc != 1) { 1353 SPDK_ERRLOG("msg could not be enqueued\n"); 1354 spdk_mempool_put(g_spdk_msg_mempool, msg); 1355 return -EIO; 1356 } 1357 1358 return thread_send_msg_notification(thread); 1359 } 1360 1361 int 1362 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1363 { 1364 spdk_msg_fn expected = NULL; 1365 1366 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1367 __ATOMIC_SEQ_CST)) { 1368 return -EIO; 1369 } 1370 1371 return thread_send_msg_notification(thread); 1372 } 1373 1374 #ifdef __linux__ 1375 static int 1376 interrupt_timerfd_process(void *arg) 1377 { 1378 struct spdk_poller *poller = arg; 1379 uint64_t exp; 1380 int rc; 1381 1382 /* clear the level of interval timer */ 1383 rc = read(poller->intr->efd, &exp, sizeof(exp)); 1384 if (rc < 0) { 1385 if (rc == -EAGAIN) { 1386 return 0; 1387 } 1388 1389 return rc; 1390 } 1391 1392 SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg); 1393 1394 return poller->fn(poller->arg); 1395 } 1396 1397 static int 1398 period_poller_interrupt_init(struct spdk_poller *poller) 1399 { 1400 int timerfd; 1401 1402 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1403 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1404 if (timerfd < 0) { 1405 return -errno; 1406 } 1407 1408 poller->intr = spdk_interrupt_register(timerfd, interrupt_timerfd_process, poller, poller->name); 1409 if (poller->intr == NULL) { 1410 close(timerfd); 1411 return -1; 1412 } 1413 1414 return 0; 1415 } 1416 1417 static void 1418 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1419 { 1420 int timerfd; 1421 uint64_t now_tick = spdk_get_ticks(); 1422 uint64_t ticks = spdk_get_ticks_hz(); 1423 int ret; 1424 struct itimerspec new_tv = {}; 1425 struct itimerspec old_tv = {}; 1426 1427 assert(poller->intr != NULL); 1428 assert(poller->period_ticks != 0); 1429 1430 timerfd = poller->intr->efd; 1431 1432 assert(timerfd >= 0); 1433 1434 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1435 interrupt_mode ? "interrupt" : "poll"); 1436 1437 if (interrupt_mode) { 1438 /* Set repeated timer expiration */ 1439 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1440 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1441 1442 /* Update next timer expiration */ 1443 if (poller->next_run_tick == 0) { 1444 poller->next_run_tick = now_tick + poller->period_ticks; 1445 } else if (poller->next_run_tick < now_tick) { 1446 poller->next_run_tick = now_tick; 1447 } 1448 1449 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1450 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1451 1452 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1453 if (ret < 0) { 1454 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1455 assert(false); 1456 } 1457 } else { 1458 /* Disarm the timer */ 1459 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1460 if (ret < 0) { 1461 /* timerfd_settime's failure indicates that the timerfd is in error */ 1462 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1463 assert(false); 1464 } 1465 1466 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1467 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1468 */ 1469 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1470 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1471 poller_remove_timer(poller->thread, poller); 1472 poller_insert_timer(poller->thread, poller, now_tick); 1473 } 1474 } 1475 1476 static void 1477 poller_interrupt_fini(struct spdk_poller *poller) 1478 { 1479 int fd; 1480 1481 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1482 assert(poller->intr != NULL); 1483 fd = poller->intr->efd; 1484 spdk_interrupt_unregister(&poller->intr); 1485 close(fd); 1486 } 1487 1488 static int 1489 busy_poller_interrupt_init(struct spdk_poller *poller) 1490 { 1491 int busy_efd; 1492 1493 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1494 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1495 if (busy_efd < 0) { 1496 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1497 return -errno; 1498 } 1499 1500 poller->intr = spdk_interrupt_register(busy_efd, poller->fn, poller->arg, poller->name); 1501 if (poller->intr == NULL) { 1502 close(busy_efd); 1503 return -1; 1504 } 1505 1506 return 0; 1507 } 1508 1509 static void 1510 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1511 { 1512 int busy_efd = poller->intr->efd; 1513 uint64_t notify = 1; 1514 int rc __attribute__((unused)); 1515 1516 assert(busy_efd >= 0); 1517 1518 if (interrupt_mode) { 1519 /* Write without read on eventfd will get it repeatedly triggered. */ 1520 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1521 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1522 } 1523 } else { 1524 /* Read on eventfd will clear its level triggering. */ 1525 rc = read(busy_efd, ¬ify, sizeof(notify)); 1526 } 1527 } 1528 1529 #else 1530 1531 static int 1532 period_poller_interrupt_init(struct spdk_poller *poller) 1533 { 1534 return -ENOTSUP; 1535 } 1536 1537 static void 1538 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1539 { 1540 } 1541 1542 static void 1543 poller_interrupt_fini(struct spdk_poller *poller) 1544 { 1545 } 1546 1547 static int 1548 busy_poller_interrupt_init(struct spdk_poller *poller) 1549 { 1550 return -ENOTSUP; 1551 } 1552 1553 static void 1554 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1555 { 1556 } 1557 1558 #endif 1559 1560 void 1561 spdk_poller_register_interrupt(struct spdk_poller *poller, 1562 spdk_poller_set_interrupt_mode_cb cb_fn, 1563 void *cb_arg) 1564 { 1565 assert(poller != NULL); 1566 assert(cb_fn != NULL); 1567 assert(spdk_get_thread() == poller->thread); 1568 1569 if (!spdk_interrupt_mode_is_enabled()) { 1570 return; 1571 } 1572 1573 /* If this poller already had an interrupt, clean the old one up. */ 1574 if (poller->intr != NULL) { 1575 poller_interrupt_fini(poller); 1576 } 1577 1578 poller->set_intr_cb_fn = cb_fn; 1579 poller->set_intr_cb_arg = cb_arg; 1580 1581 /* Set poller into interrupt mode if thread is in interrupt. */ 1582 if (poller->thread->in_interrupt) { 1583 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1584 } 1585 } 1586 1587 static uint64_t 1588 convert_us_to_ticks(uint64_t us) 1589 { 1590 uint64_t quotient, remainder, ticks; 1591 1592 if (us) { 1593 quotient = us / SPDK_SEC_TO_USEC; 1594 remainder = us % SPDK_SEC_TO_USEC; 1595 ticks = spdk_get_ticks_hz(); 1596 1597 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1598 } else { 1599 return 0; 1600 } 1601 } 1602 1603 static struct spdk_poller * 1604 poller_register(spdk_poller_fn fn, 1605 void *arg, 1606 uint64_t period_microseconds, 1607 const char *name) 1608 { 1609 struct spdk_thread *thread; 1610 struct spdk_poller *poller; 1611 1612 thread = spdk_get_thread(); 1613 if (!thread) { 1614 assert(false); 1615 return NULL; 1616 } 1617 1618 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1619 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1620 return NULL; 1621 } 1622 1623 poller = calloc(1, sizeof(*poller)); 1624 if (poller == NULL) { 1625 SPDK_ERRLOG("Poller memory allocation failed\n"); 1626 return NULL; 1627 } 1628 1629 if (name) { 1630 snprintf(poller->name, sizeof(poller->name), "%s", name); 1631 } else { 1632 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1633 } 1634 1635 poller->state = SPDK_POLLER_STATE_WAITING; 1636 poller->fn = fn; 1637 poller->arg = arg; 1638 poller->thread = thread; 1639 poller->intr = NULL; 1640 if (thread->next_poller_id == 0) { 1641 SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n"); 1642 thread->next_poller_id = 1; 1643 } 1644 poller->id = thread->next_poller_id++; 1645 1646 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1647 1648 if (spdk_interrupt_mode_is_enabled()) { 1649 int rc; 1650 1651 if (period_microseconds) { 1652 rc = period_poller_interrupt_init(poller); 1653 if (rc < 0) { 1654 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1655 free(poller); 1656 return NULL; 1657 } 1658 1659 poller->set_intr_cb_fn = period_poller_set_interrupt_mode; 1660 poller->set_intr_cb_arg = NULL; 1661 1662 } else { 1663 /* If the poller doesn't have a period, create interruptfd that's always 1664 * busy automatically when running in interrupt mode. 1665 */ 1666 rc = busy_poller_interrupt_init(poller); 1667 if (rc > 0) { 1668 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1669 free(poller); 1670 return NULL; 1671 } 1672 1673 poller->set_intr_cb_fn = busy_poller_set_interrupt_mode; 1674 poller->set_intr_cb_arg = NULL; 1675 } 1676 1677 /* Set poller into interrupt mode if thread is in interrupt. */ 1678 if (poller->thread->in_interrupt) { 1679 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1680 } 1681 } 1682 1683 thread_insert_poller(thread, poller); 1684 1685 return poller; 1686 } 1687 1688 struct spdk_poller * 1689 spdk_poller_register(spdk_poller_fn fn, 1690 void *arg, 1691 uint64_t period_microseconds) 1692 { 1693 return poller_register(fn, arg, period_microseconds, NULL); 1694 } 1695 1696 struct spdk_poller * 1697 spdk_poller_register_named(spdk_poller_fn fn, 1698 void *arg, 1699 uint64_t period_microseconds, 1700 const char *name) 1701 { 1702 return poller_register(fn, arg, period_microseconds, name); 1703 } 1704 1705 static void 1706 wrong_thread(const char *func, const char *name, struct spdk_thread *thread, 1707 struct spdk_thread *curthread) 1708 { 1709 if (thread == NULL) { 1710 SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name); 1711 abort(); 1712 } 1713 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be " 1714 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id, 1715 thread->name, thread->id); 1716 assert(false); 1717 } 1718 1719 void 1720 spdk_poller_unregister(struct spdk_poller **ppoller) 1721 { 1722 struct spdk_thread *thread; 1723 struct spdk_poller *poller; 1724 1725 poller = *ppoller; 1726 if (poller == NULL) { 1727 return; 1728 } 1729 1730 *ppoller = NULL; 1731 1732 thread = spdk_get_thread(); 1733 if (!thread) { 1734 assert(false); 1735 return; 1736 } 1737 1738 if (poller->thread != thread) { 1739 wrong_thread(__func__, poller->name, poller->thread, thread); 1740 return; 1741 } 1742 1743 if (spdk_interrupt_mode_is_enabled()) { 1744 /* Release the interrupt resource for period or busy poller */ 1745 if (poller->intr != NULL) { 1746 poller_interrupt_fini(poller); 1747 } 1748 1749 /* If there is not already a pending poller removal, generate 1750 * a message to go process removals. */ 1751 if (!thread->poller_unregistered) { 1752 thread->poller_unregistered = true; 1753 spdk_thread_send_msg(thread, _thread_remove_pollers, thread); 1754 } 1755 } 1756 1757 /* If the poller was paused, put it on the active_pollers list so that 1758 * its unregistration can be processed by spdk_thread_poll(). 1759 */ 1760 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1761 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1762 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1763 poller->period_ticks = 0; 1764 } 1765 1766 /* Simply set the state to unregistered. The poller will get cleaned up 1767 * in a subsequent call to spdk_thread_poll(). 1768 */ 1769 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1770 } 1771 1772 void 1773 spdk_poller_pause(struct spdk_poller *poller) 1774 { 1775 struct spdk_thread *thread; 1776 1777 thread = spdk_get_thread(); 1778 if (!thread) { 1779 assert(false); 1780 return; 1781 } 1782 1783 if (poller->thread != thread) { 1784 wrong_thread(__func__, poller->name, poller->thread, thread); 1785 return; 1786 } 1787 1788 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1789 * spdk_thread_poll() move it. It allows a poller to be paused from 1790 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1791 * iteration, or from within itself without breaking the logic to always 1792 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1793 */ 1794 switch (poller->state) { 1795 case SPDK_POLLER_STATE_PAUSED: 1796 case SPDK_POLLER_STATE_PAUSING: 1797 break; 1798 case SPDK_POLLER_STATE_RUNNING: 1799 case SPDK_POLLER_STATE_WAITING: 1800 poller->state = SPDK_POLLER_STATE_PAUSING; 1801 break; 1802 default: 1803 assert(false); 1804 break; 1805 } 1806 } 1807 1808 void 1809 spdk_poller_resume(struct spdk_poller *poller) 1810 { 1811 struct spdk_thread *thread; 1812 1813 thread = spdk_get_thread(); 1814 if (!thread) { 1815 assert(false); 1816 return; 1817 } 1818 1819 if (poller->thread != thread) { 1820 wrong_thread(__func__, poller->name, poller->thread, thread); 1821 return; 1822 } 1823 1824 /* If a poller is paused it has to be removed from the paused pollers 1825 * list and put on the active list or timer tree depending on its 1826 * period_ticks. If a poller is still in the process of being paused, 1827 * we just need to flip its state back to waiting, as it's already on 1828 * the appropriate list or tree. 1829 */ 1830 switch (poller->state) { 1831 case SPDK_POLLER_STATE_PAUSED: 1832 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1833 thread_insert_poller(thread, poller); 1834 /* fallthrough */ 1835 case SPDK_POLLER_STATE_PAUSING: 1836 poller->state = SPDK_POLLER_STATE_WAITING; 1837 break; 1838 case SPDK_POLLER_STATE_RUNNING: 1839 case SPDK_POLLER_STATE_WAITING: 1840 break; 1841 default: 1842 assert(false); 1843 break; 1844 } 1845 } 1846 1847 const char * 1848 spdk_poller_get_name(struct spdk_poller *poller) 1849 { 1850 return poller->name; 1851 } 1852 1853 uint64_t 1854 spdk_poller_get_id(struct spdk_poller *poller) 1855 { 1856 return poller->id; 1857 } 1858 1859 const char * 1860 spdk_poller_get_state_str(struct spdk_poller *poller) 1861 { 1862 switch (poller->state) { 1863 case SPDK_POLLER_STATE_WAITING: 1864 return "waiting"; 1865 case SPDK_POLLER_STATE_RUNNING: 1866 return "running"; 1867 case SPDK_POLLER_STATE_UNREGISTERED: 1868 return "unregistered"; 1869 case SPDK_POLLER_STATE_PAUSING: 1870 return "pausing"; 1871 case SPDK_POLLER_STATE_PAUSED: 1872 return "paused"; 1873 default: 1874 return NULL; 1875 } 1876 } 1877 1878 uint64_t 1879 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1880 { 1881 return poller->period_ticks; 1882 } 1883 1884 void 1885 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1886 { 1887 stats->run_count = poller->run_count; 1888 stats->busy_count = poller->busy_count; 1889 } 1890 1891 struct spdk_poller * 1892 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1893 { 1894 return TAILQ_FIRST(&thread->active_pollers); 1895 } 1896 1897 struct spdk_poller * 1898 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1899 { 1900 return TAILQ_NEXT(prev, tailq); 1901 } 1902 1903 struct spdk_poller * 1904 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1905 { 1906 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1907 } 1908 1909 struct spdk_poller * 1910 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1911 { 1912 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1913 } 1914 1915 struct spdk_poller * 1916 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1917 { 1918 return TAILQ_FIRST(&thread->paused_pollers); 1919 } 1920 1921 struct spdk_poller * 1922 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1923 { 1924 return TAILQ_NEXT(prev, tailq); 1925 } 1926 1927 struct spdk_io_channel * 1928 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1929 { 1930 return RB_MIN(io_channel_tree, &thread->io_channels); 1931 } 1932 1933 struct spdk_io_channel * 1934 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1935 { 1936 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1937 } 1938 1939 struct call_thread { 1940 struct spdk_thread *cur_thread; 1941 spdk_msg_fn fn; 1942 void *ctx; 1943 1944 struct spdk_thread *orig_thread; 1945 spdk_msg_fn cpl; 1946 }; 1947 1948 static void 1949 _on_thread(void *ctx) 1950 { 1951 struct call_thread *ct = ctx; 1952 int rc __attribute__((unused)); 1953 1954 ct->fn(ct->ctx); 1955 1956 pthread_mutex_lock(&g_devlist_mutex); 1957 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1958 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 1959 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 1960 ct->cur_thread->name); 1961 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1962 } 1963 pthread_mutex_unlock(&g_devlist_mutex); 1964 1965 if (!ct->cur_thread) { 1966 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1967 1968 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1969 free(ctx); 1970 } else { 1971 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1972 ct->cur_thread->name); 1973 1974 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1975 } 1976 assert(rc == 0); 1977 } 1978 1979 void 1980 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1981 { 1982 struct call_thread *ct; 1983 struct spdk_thread *thread; 1984 int rc __attribute__((unused)); 1985 1986 ct = calloc(1, sizeof(*ct)); 1987 if (!ct) { 1988 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1989 cpl(ctx); 1990 return; 1991 } 1992 1993 ct->fn = fn; 1994 ct->ctx = ctx; 1995 ct->cpl = cpl; 1996 1997 thread = _get_thread(); 1998 if (!thread) { 1999 SPDK_ERRLOG("No thread allocated\n"); 2000 free(ct); 2001 cpl(ctx); 2002 return; 2003 } 2004 ct->orig_thread = thread; 2005 2006 pthread_mutex_lock(&g_devlist_mutex); 2007 ct->cur_thread = TAILQ_FIRST(&g_threads); 2008 pthread_mutex_unlock(&g_devlist_mutex); 2009 2010 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 2011 ct->orig_thread->name); 2012 2013 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 2014 assert(rc == 0); 2015 } 2016 2017 static inline void 2018 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 2019 { 2020 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 2021 return; 2022 } 2023 2024 if (!poller->set_intr_cb_fn) { 2025 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 2026 assert(false); 2027 return; 2028 } 2029 2030 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 2031 } 2032 2033 void 2034 spdk_thread_set_interrupt_mode(bool enable_interrupt) 2035 { 2036 struct spdk_thread *thread = _get_thread(); 2037 struct spdk_poller *poller, *tmp; 2038 2039 assert(thread); 2040 assert(spdk_interrupt_mode_is_enabled()); 2041 2042 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 2043 thread->name, enable_interrupt ? "intr" : "poll", 2044 thread->in_interrupt ? "intr" : "poll"); 2045 2046 if (thread->in_interrupt == enable_interrupt) { 2047 return; 2048 } 2049 2050 /* Set pollers to expected mode */ 2051 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 2052 poller_set_interrupt_mode(poller, enable_interrupt); 2053 } 2054 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 2055 poller_set_interrupt_mode(poller, enable_interrupt); 2056 } 2057 /* All paused pollers will go to work in interrupt mode */ 2058 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 2059 poller_set_interrupt_mode(poller, enable_interrupt); 2060 } 2061 2062 thread->in_interrupt = enable_interrupt; 2063 return; 2064 } 2065 2066 static struct io_device * 2067 io_device_get(void *io_device) 2068 { 2069 struct io_device find = {}; 2070 2071 find.io_device = io_device; 2072 return RB_FIND(io_device_tree, &g_io_devices, &find); 2073 } 2074 2075 void 2076 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 2077 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 2078 const char *name) 2079 { 2080 struct io_device *dev, *tmp; 2081 struct spdk_thread *thread; 2082 2083 assert(io_device != NULL); 2084 assert(create_cb != NULL); 2085 assert(destroy_cb != NULL); 2086 2087 thread = spdk_get_thread(); 2088 if (!thread) { 2089 SPDK_ERRLOG("called from non-SPDK thread\n"); 2090 assert(false); 2091 return; 2092 } 2093 2094 dev = calloc(1, sizeof(struct io_device)); 2095 if (dev == NULL) { 2096 SPDK_ERRLOG("could not allocate io_device\n"); 2097 return; 2098 } 2099 2100 dev->io_device = io_device; 2101 if (name) { 2102 snprintf(dev->name, sizeof(dev->name), "%s", name); 2103 } else { 2104 snprintf(dev->name, sizeof(dev->name), "%p", dev); 2105 } 2106 dev->create_cb = create_cb; 2107 dev->destroy_cb = destroy_cb; 2108 dev->unregister_cb = NULL; 2109 dev->ctx_size = ctx_size; 2110 dev->for_each_count = 0; 2111 dev->unregistered = false; 2112 dev->refcnt = 0; 2113 2114 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 2115 dev->name, dev->io_device, thread->name); 2116 2117 pthread_mutex_lock(&g_devlist_mutex); 2118 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 2119 if (tmp != NULL) { 2120 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 2121 io_device, tmp->name, dev->name); 2122 free(dev); 2123 } 2124 2125 pthread_mutex_unlock(&g_devlist_mutex); 2126 } 2127 2128 static void 2129 _finish_unregister(void *arg) 2130 { 2131 struct io_device *dev = arg; 2132 struct spdk_thread *thread; 2133 2134 thread = spdk_get_thread(); 2135 assert(thread == dev->unregister_thread); 2136 2137 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2138 dev->name, dev->io_device, thread->name); 2139 2140 assert(thread->pending_unregister_count > 0); 2141 thread->pending_unregister_count--; 2142 2143 dev->unregister_cb(dev->io_device); 2144 free(dev); 2145 } 2146 2147 static void 2148 io_device_free(struct io_device *dev) 2149 { 2150 int rc __attribute__((unused)); 2151 2152 if (dev->unregister_cb == NULL) { 2153 free(dev); 2154 } else { 2155 assert(dev->unregister_thread != NULL); 2156 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2157 dev->name, dev->io_device, dev->unregister_thread->name); 2158 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2159 assert(rc == 0); 2160 } 2161 } 2162 2163 void 2164 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2165 { 2166 struct io_device *dev; 2167 uint32_t refcnt; 2168 struct spdk_thread *thread; 2169 2170 thread = spdk_get_thread(); 2171 if (!thread) { 2172 SPDK_ERRLOG("called from non-SPDK thread\n"); 2173 assert(false); 2174 return; 2175 } 2176 2177 pthread_mutex_lock(&g_devlist_mutex); 2178 dev = io_device_get(io_device); 2179 if (!dev) { 2180 SPDK_ERRLOG("io_device %p not found\n", io_device); 2181 assert(false); 2182 pthread_mutex_unlock(&g_devlist_mutex); 2183 return; 2184 } 2185 2186 /* The for_each_count check differentiates the user attempting to unregister the 2187 * device a second time, from the internal call to this function that occurs 2188 * after the for_each_count reaches 0. 2189 */ 2190 if (dev->pending_unregister && dev->for_each_count > 0) { 2191 SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device); 2192 assert(false); 2193 pthread_mutex_unlock(&g_devlist_mutex); 2194 return; 2195 } 2196 2197 dev->unregister_cb = unregister_cb; 2198 dev->unregister_thread = thread; 2199 2200 if (dev->for_each_count > 0) { 2201 SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2202 dev->name, io_device, dev->for_each_count); 2203 dev->pending_unregister = true; 2204 pthread_mutex_unlock(&g_devlist_mutex); 2205 return; 2206 } 2207 2208 dev->unregistered = true; 2209 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2210 refcnt = dev->refcnt; 2211 pthread_mutex_unlock(&g_devlist_mutex); 2212 2213 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2214 dev->name, dev->io_device, thread->name); 2215 2216 if (unregister_cb) { 2217 thread->pending_unregister_count++; 2218 } 2219 2220 if (refcnt > 0) { 2221 /* defer deletion */ 2222 return; 2223 } 2224 2225 io_device_free(dev); 2226 } 2227 2228 const char * 2229 spdk_io_device_get_name(struct io_device *dev) 2230 { 2231 return dev->name; 2232 } 2233 2234 static struct spdk_io_channel * 2235 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2236 { 2237 struct spdk_io_channel find = {}; 2238 2239 find.dev = dev; 2240 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2241 } 2242 2243 struct spdk_io_channel * 2244 spdk_get_io_channel(void *io_device) 2245 { 2246 struct spdk_io_channel *ch; 2247 struct spdk_thread *thread; 2248 struct io_device *dev; 2249 int rc; 2250 2251 pthread_mutex_lock(&g_devlist_mutex); 2252 dev = io_device_get(io_device); 2253 if (dev == NULL) { 2254 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2255 pthread_mutex_unlock(&g_devlist_mutex); 2256 return NULL; 2257 } 2258 2259 thread = _get_thread(); 2260 if (!thread) { 2261 SPDK_ERRLOG("No thread allocated\n"); 2262 pthread_mutex_unlock(&g_devlist_mutex); 2263 return NULL; 2264 } 2265 2266 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2267 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2268 pthread_mutex_unlock(&g_devlist_mutex); 2269 return NULL; 2270 } 2271 2272 ch = thread_get_io_channel(thread, dev); 2273 if (ch != NULL) { 2274 ch->ref++; 2275 2276 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2277 ch, dev->name, dev->io_device, thread->name, ch->ref); 2278 2279 /* 2280 * An I/O channel already exists for this device on this 2281 * thread, so return it. 2282 */ 2283 pthread_mutex_unlock(&g_devlist_mutex); 2284 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2285 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2286 return ch; 2287 } 2288 2289 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2290 if (ch == NULL) { 2291 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2292 pthread_mutex_unlock(&g_devlist_mutex); 2293 return NULL; 2294 } 2295 2296 ch->dev = dev; 2297 ch->destroy_cb = dev->destroy_cb; 2298 ch->thread = thread; 2299 ch->ref = 1; 2300 ch->destroy_ref = 0; 2301 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2302 2303 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2304 ch, dev->name, dev->io_device, thread->name, ch->ref); 2305 2306 dev->refcnt++; 2307 2308 pthread_mutex_unlock(&g_devlist_mutex); 2309 2310 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2311 if (rc != 0) { 2312 pthread_mutex_lock(&g_devlist_mutex); 2313 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2314 dev->refcnt--; 2315 free(ch); 2316 pthread_mutex_unlock(&g_devlist_mutex); 2317 return NULL; 2318 } 2319 2320 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2321 return ch; 2322 } 2323 2324 static void 2325 put_io_channel(void *arg) 2326 { 2327 struct spdk_io_channel *ch = arg; 2328 bool do_remove_dev = true; 2329 struct spdk_thread *thread; 2330 2331 thread = spdk_get_thread(); 2332 if (!thread) { 2333 SPDK_ERRLOG("called from non-SPDK thread\n"); 2334 assert(false); 2335 return; 2336 } 2337 2338 SPDK_DEBUGLOG(thread, 2339 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2340 ch, ch->dev->name, ch->dev->io_device, thread->name); 2341 2342 assert(ch->thread == thread); 2343 2344 ch->destroy_ref--; 2345 2346 if (ch->ref > 0 || ch->destroy_ref > 0) { 2347 /* 2348 * Another reference to the associated io_device was requested 2349 * after this message was sent but before it had a chance to 2350 * execute. 2351 */ 2352 return; 2353 } 2354 2355 pthread_mutex_lock(&g_devlist_mutex); 2356 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2357 pthread_mutex_unlock(&g_devlist_mutex); 2358 2359 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2360 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2361 2362 pthread_mutex_lock(&g_devlist_mutex); 2363 ch->dev->refcnt--; 2364 2365 if (!ch->dev->unregistered) { 2366 do_remove_dev = false; 2367 } 2368 2369 if (ch->dev->refcnt > 0) { 2370 do_remove_dev = false; 2371 } 2372 2373 pthread_mutex_unlock(&g_devlist_mutex); 2374 2375 if (do_remove_dev) { 2376 io_device_free(ch->dev); 2377 } 2378 free(ch); 2379 } 2380 2381 void 2382 spdk_put_io_channel(struct spdk_io_channel *ch) 2383 { 2384 struct spdk_thread *thread; 2385 int rc __attribute__((unused)); 2386 2387 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2388 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2389 2390 thread = spdk_get_thread(); 2391 if (!thread) { 2392 SPDK_ERRLOG("called from non-SPDK thread\n"); 2393 assert(false); 2394 return; 2395 } 2396 2397 if (ch->thread != thread) { 2398 wrong_thread(__func__, "ch", ch->thread, thread); 2399 return; 2400 } 2401 2402 SPDK_DEBUGLOG(thread, 2403 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2404 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2405 2406 ch->ref--; 2407 2408 if (ch->ref == 0) { 2409 ch->destroy_ref++; 2410 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2411 assert(rc == 0); 2412 } 2413 } 2414 2415 struct spdk_io_channel * 2416 spdk_io_channel_from_ctx(void *ctx) 2417 { 2418 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2419 } 2420 2421 struct spdk_thread * 2422 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2423 { 2424 return ch->thread; 2425 } 2426 2427 void * 2428 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2429 { 2430 return ch->dev->io_device; 2431 } 2432 2433 const char * 2434 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2435 { 2436 return spdk_io_device_get_name(ch->dev); 2437 } 2438 2439 int 2440 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2441 { 2442 return ch->ref; 2443 } 2444 2445 struct spdk_io_channel_iter { 2446 void *io_device; 2447 struct io_device *dev; 2448 spdk_channel_msg fn; 2449 int status; 2450 void *ctx; 2451 struct spdk_io_channel *ch; 2452 2453 struct spdk_thread *cur_thread; 2454 2455 struct spdk_thread *orig_thread; 2456 spdk_channel_for_each_cpl cpl; 2457 }; 2458 2459 void * 2460 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2461 { 2462 return i->io_device; 2463 } 2464 2465 struct spdk_io_channel * 2466 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2467 { 2468 return i->ch; 2469 } 2470 2471 void * 2472 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2473 { 2474 return i->ctx; 2475 } 2476 2477 static void 2478 _call_completion(void *ctx) 2479 { 2480 struct spdk_io_channel_iter *i = ctx; 2481 2482 if (i->cpl != NULL) { 2483 i->cpl(i, i->status); 2484 } 2485 free(i); 2486 } 2487 2488 static void 2489 _call_channel(void *ctx) 2490 { 2491 struct spdk_io_channel_iter *i = ctx; 2492 struct spdk_io_channel *ch; 2493 2494 /* 2495 * It is possible that the channel was deleted before this 2496 * message had a chance to execute. If so, skip calling 2497 * the fn() on this thread. 2498 */ 2499 pthread_mutex_lock(&g_devlist_mutex); 2500 ch = thread_get_io_channel(i->cur_thread, i->dev); 2501 pthread_mutex_unlock(&g_devlist_mutex); 2502 2503 if (ch) { 2504 i->fn(i); 2505 } else { 2506 spdk_for_each_channel_continue(i, 0); 2507 } 2508 } 2509 2510 void 2511 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2512 spdk_channel_for_each_cpl cpl) 2513 { 2514 struct spdk_thread *thread; 2515 struct spdk_io_channel *ch; 2516 struct spdk_io_channel_iter *i; 2517 int rc __attribute__((unused)); 2518 2519 i = calloc(1, sizeof(*i)); 2520 if (!i) { 2521 SPDK_ERRLOG("Unable to allocate iterator\n"); 2522 assert(false); 2523 return; 2524 } 2525 2526 i->io_device = io_device; 2527 i->fn = fn; 2528 i->ctx = ctx; 2529 i->cpl = cpl; 2530 i->orig_thread = _get_thread(); 2531 2532 pthread_mutex_lock(&g_devlist_mutex); 2533 i->dev = io_device_get(io_device); 2534 if (i->dev == NULL) { 2535 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2536 assert(false); 2537 i->status = -ENODEV; 2538 goto end; 2539 } 2540 2541 /* Do not allow new for_each operations if we are already waiting to unregister 2542 * the device for other for_each operations to complete. 2543 */ 2544 if (i->dev->pending_unregister) { 2545 SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device); 2546 i->status = -ENODEV; 2547 goto end; 2548 } 2549 2550 TAILQ_FOREACH(thread, &g_threads, tailq) { 2551 ch = thread_get_io_channel(thread, i->dev); 2552 if (ch != NULL) { 2553 ch->dev->for_each_count++; 2554 i->cur_thread = thread; 2555 i->ch = ch; 2556 pthread_mutex_unlock(&g_devlist_mutex); 2557 rc = spdk_thread_send_msg(thread, _call_channel, i); 2558 assert(rc == 0); 2559 return; 2560 } 2561 } 2562 2563 end: 2564 pthread_mutex_unlock(&g_devlist_mutex); 2565 2566 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2567 assert(rc == 0); 2568 } 2569 2570 static void 2571 __pending_unregister(void *arg) 2572 { 2573 struct io_device *dev = arg; 2574 2575 assert(dev->pending_unregister); 2576 assert(dev->for_each_count == 0); 2577 spdk_io_device_unregister(dev->io_device, dev->unregister_cb); 2578 } 2579 2580 void 2581 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2582 { 2583 struct spdk_thread *thread; 2584 struct spdk_io_channel *ch; 2585 struct io_device *dev; 2586 int rc __attribute__((unused)); 2587 2588 assert(i->cur_thread == spdk_get_thread()); 2589 2590 i->status = status; 2591 2592 pthread_mutex_lock(&g_devlist_mutex); 2593 dev = i->dev; 2594 if (status) { 2595 goto end; 2596 } 2597 2598 thread = TAILQ_NEXT(i->cur_thread, tailq); 2599 while (thread) { 2600 ch = thread_get_io_channel(thread, dev); 2601 if (ch != NULL) { 2602 i->cur_thread = thread; 2603 i->ch = ch; 2604 pthread_mutex_unlock(&g_devlist_mutex); 2605 rc = spdk_thread_send_msg(thread, _call_channel, i); 2606 assert(rc == 0); 2607 return; 2608 } 2609 thread = TAILQ_NEXT(thread, tailq); 2610 } 2611 2612 end: 2613 dev->for_each_count--; 2614 i->ch = NULL; 2615 pthread_mutex_unlock(&g_devlist_mutex); 2616 2617 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2618 assert(rc == 0); 2619 2620 pthread_mutex_lock(&g_devlist_mutex); 2621 if (dev->pending_unregister && dev->for_each_count == 0) { 2622 rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev); 2623 assert(rc == 0); 2624 } 2625 pthread_mutex_unlock(&g_devlist_mutex); 2626 } 2627 2628 static void 2629 thread_interrupt_destroy(struct spdk_thread *thread) 2630 { 2631 struct spdk_fd_group *fgrp = thread->fgrp; 2632 2633 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2634 2635 if (thread->msg_fd < 0) { 2636 return; 2637 } 2638 2639 spdk_fd_group_remove(fgrp, thread->msg_fd); 2640 close(thread->msg_fd); 2641 thread->msg_fd = -1; 2642 2643 spdk_fd_group_destroy(fgrp); 2644 thread->fgrp = NULL; 2645 } 2646 2647 #ifdef __linux__ 2648 static int 2649 thread_interrupt_msg_process(void *arg) 2650 { 2651 struct spdk_thread *thread = arg; 2652 struct spdk_thread *orig_thread; 2653 uint32_t msg_count; 2654 spdk_msg_fn critical_msg; 2655 int rc = 0; 2656 uint64_t notify = 1; 2657 2658 assert(spdk_interrupt_mode_is_enabled()); 2659 2660 orig_thread = spdk_get_thread(); 2661 spdk_set_thread(thread); 2662 2663 /* There may be race between msg_acknowledge and another producer's msg_notify, 2664 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2665 * This can avoid msg notification missing. 2666 */ 2667 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2668 if (rc < 0 && errno != EAGAIN) { 2669 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2670 } 2671 2672 critical_msg = thread->critical_msg; 2673 if (spdk_unlikely(critical_msg != NULL)) { 2674 critical_msg(NULL); 2675 thread->critical_msg = NULL; 2676 rc = 1; 2677 } 2678 2679 msg_count = msg_queue_run_batch(thread, 0); 2680 if (msg_count) { 2681 rc = 1; 2682 } 2683 2684 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2685 if (spdk_unlikely(!thread->in_interrupt)) { 2686 /* The thread transitioned to poll mode in a msg during the above processing. 2687 * Clear msg_fd since thread messages will be polled directly in poll mode. 2688 */ 2689 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2690 if (rc < 0 && errno != EAGAIN) { 2691 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 2692 } 2693 } 2694 2695 spdk_set_thread(orig_thread); 2696 return rc; 2697 } 2698 2699 static int 2700 thread_interrupt_create(struct spdk_thread *thread) 2701 { 2702 int rc; 2703 2704 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2705 2706 rc = spdk_fd_group_create(&thread->fgrp); 2707 if (rc) { 2708 return rc; 2709 } 2710 2711 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2712 if (thread->msg_fd < 0) { 2713 rc = -errno; 2714 spdk_fd_group_destroy(thread->fgrp); 2715 thread->fgrp = NULL; 2716 2717 return rc; 2718 } 2719 2720 return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd, 2721 thread_interrupt_msg_process, thread); 2722 } 2723 #else 2724 static int 2725 thread_interrupt_create(struct spdk_thread *thread) 2726 { 2727 return -ENOTSUP; 2728 } 2729 #endif 2730 2731 static int 2732 _interrupt_wrapper(void *ctx) 2733 { 2734 struct spdk_interrupt *intr = ctx; 2735 struct spdk_thread *orig_thread, *thread; 2736 int rc; 2737 2738 orig_thread = spdk_get_thread(); 2739 thread = intr->thread; 2740 2741 spdk_set_thread(thread); 2742 2743 SPDK_DTRACE_PROBE4(interrupt_fd_process, intr->name, intr->efd, 2744 intr->fn, intr->arg); 2745 2746 rc = intr->fn(intr->arg); 2747 2748 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2749 2750 spdk_set_thread(orig_thread); 2751 2752 return rc; 2753 } 2754 2755 struct spdk_interrupt * 2756 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2757 void *arg, const char *name) 2758 { 2759 struct spdk_thread *thread; 2760 struct spdk_interrupt *intr; 2761 int ret; 2762 2763 thread = spdk_get_thread(); 2764 if (!thread) { 2765 assert(false); 2766 return NULL; 2767 } 2768 2769 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2770 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2771 return NULL; 2772 } 2773 2774 intr = calloc(1, sizeof(*intr)); 2775 if (intr == NULL) { 2776 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2777 return NULL; 2778 } 2779 2780 if (name) { 2781 snprintf(intr->name, sizeof(intr->name), "%s", name); 2782 } else { 2783 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2784 } 2785 2786 intr->efd = efd; 2787 intr->thread = thread; 2788 intr->fn = fn; 2789 intr->arg = arg; 2790 2791 ret = spdk_fd_group_add(thread->fgrp, efd, _interrupt_wrapper, intr, intr->name); 2792 2793 if (ret != 0) { 2794 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2795 thread->name, efd, spdk_strerror(-ret)); 2796 free(intr); 2797 return NULL; 2798 } 2799 2800 return intr; 2801 } 2802 2803 void 2804 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2805 { 2806 struct spdk_thread *thread; 2807 struct spdk_interrupt *intr; 2808 2809 intr = *pintr; 2810 if (intr == NULL) { 2811 return; 2812 } 2813 2814 *pintr = NULL; 2815 2816 thread = spdk_get_thread(); 2817 if (!thread) { 2818 assert(false); 2819 return; 2820 } 2821 2822 if (intr->thread != thread) { 2823 wrong_thread(__func__, intr->name, intr->thread, thread); 2824 return; 2825 } 2826 2827 spdk_fd_group_remove(thread->fgrp, intr->efd); 2828 free(intr); 2829 } 2830 2831 int 2832 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2833 enum spdk_interrupt_event_types event_types) 2834 { 2835 struct spdk_thread *thread; 2836 2837 thread = spdk_get_thread(); 2838 if (!thread) { 2839 assert(false); 2840 return -EINVAL; 2841 } 2842 2843 if (intr->thread != thread) { 2844 wrong_thread(__func__, intr->name, intr->thread, thread); 2845 return -EINVAL; 2846 } 2847 2848 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2849 } 2850 2851 int 2852 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2853 { 2854 return spdk_fd_group_get_fd(thread->fgrp); 2855 } 2856 2857 struct spdk_fd_group * 2858 spdk_thread_get_interrupt_fd_group(struct spdk_thread *thread) 2859 { 2860 return thread->fgrp; 2861 } 2862 2863 static bool g_interrupt_mode = false; 2864 2865 int 2866 spdk_interrupt_mode_enable(void) 2867 { 2868 /* It must be called once prior to initializing the threading library. 2869 * g_spdk_msg_mempool will be valid if thread library is initialized. 2870 */ 2871 if (g_spdk_msg_mempool) { 2872 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 2873 return -1; 2874 } 2875 2876 #ifdef __linux__ 2877 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2878 g_interrupt_mode = true; 2879 return 0; 2880 #else 2881 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2882 g_interrupt_mode = false; 2883 return -ENOTSUP; 2884 #endif 2885 } 2886 2887 bool 2888 spdk_interrupt_mode_is_enabled(void) 2889 { 2890 return g_interrupt_mode; 2891 } 2892 2893 #define SSPIN_DEBUG_STACK_FRAMES 16 2894 2895 struct sspin_stack { 2896 void *addrs[SSPIN_DEBUG_STACK_FRAMES]; 2897 uint32_t depth; 2898 }; 2899 2900 struct spdk_spinlock_internal { 2901 struct sspin_stack init_stack; 2902 struct sspin_stack lock_stack; 2903 struct sspin_stack unlock_stack; 2904 }; 2905 2906 static void 2907 sspin_init_internal(struct spdk_spinlock *sspin) 2908 { 2909 #ifdef DEBUG 2910 sspin->internal = calloc(1, sizeof(*sspin->internal)); 2911 #endif 2912 } 2913 2914 static void 2915 sspin_fini_internal(struct spdk_spinlock *sspin) 2916 { 2917 #ifdef DEBUG 2918 free(sspin->internal); 2919 sspin->internal = NULL; 2920 #endif 2921 } 2922 2923 #ifdef DEBUG 2924 #define SSPIN_GET_STACK(sspin, which) \ 2925 do { \ 2926 if (sspin->internal != NULL) { \ 2927 struct sspin_stack *stack = &sspin->internal->which ## _stack; \ 2928 stack->depth = backtrace(stack->addrs, SPDK_COUNTOF(stack->addrs)); \ 2929 } \ 2930 } while (0) 2931 #else 2932 #define SSPIN_GET_STACK(sspin, which) do { } while (0) 2933 #endif 2934 2935 static void 2936 sspin_stack_print(const char *title, const struct sspin_stack *sspin_stack) 2937 { 2938 char **stack; 2939 size_t i; 2940 2941 stack = backtrace_symbols(sspin_stack->addrs, sspin_stack->depth); 2942 if (stack == NULL) { 2943 SPDK_ERRLOG("Out of memory while allocate stack for %s\n", title); 2944 return; 2945 } 2946 SPDK_ERRLOG(" %s:\n", title); 2947 for (i = 0; i < sspin_stack->depth; i++) { 2948 /* 2949 * This does not print line numbers. In gdb, use something like "list *0x444b6b" or 2950 * "list *sspin_stack->addrs[0]". Or more conveniently, load the spdk gdb macros 2951 * and use use "print *sspin" or "print sspin->internal.lock_stack". See 2952 * gdb_macros.md in the docs directory for details. 2953 */ 2954 SPDK_ERRLOG(" #%" PRIu64 ": %s\n", i, stack[i]); 2955 } 2956 free(stack); 2957 } 2958 2959 static void 2960 sspin_stacks_print(const struct spdk_spinlock *sspin) 2961 { 2962 if (sspin->internal == NULL) { 2963 return; 2964 } 2965 SPDK_ERRLOG("spinlock %p\n", sspin); 2966 sspin_stack_print("Lock initalized at", &sspin->internal->init_stack); 2967 sspin_stack_print("Last locked at", &sspin->internal->lock_stack); 2968 sspin_stack_print("Last unlocked at", &sspin->internal->unlock_stack); 2969 } 2970 2971 void 2972 spdk_spin_init(struct spdk_spinlock *sspin) 2973 { 2974 int rc; 2975 2976 memset(sspin, 0, sizeof(*sspin)); 2977 rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE); 2978 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 2979 sspin_init_internal(sspin); 2980 SSPIN_GET_STACK(sspin, init); 2981 sspin->initialized = true; 2982 } 2983 2984 void 2985 spdk_spin_destroy(struct spdk_spinlock *sspin) 2986 { 2987 int rc; 2988 2989 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 2990 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 2991 SPIN_ASSERT_LOG_STACKS(sspin->thread == NULL, SPIN_ERR_LOCK_HELD, sspin); 2992 2993 rc = pthread_spin_destroy(&sspin->spinlock); 2994 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 2995 2996 sspin_fini_internal(sspin); 2997 sspin->initialized = false; 2998 sspin->destroyed = true; 2999 } 3000 3001 void 3002 spdk_spin_lock(struct spdk_spinlock *sspin) 3003 { 3004 struct spdk_thread *thread = spdk_get_thread(); 3005 int rc; 3006 3007 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3008 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3009 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3010 SPIN_ASSERT_LOG_STACKS(thread != sspin->thread, SPIN_ERR_DEADLOCK, sspin); 3011 3012 rc = pthread_spin_lock(&sspin->spinlock); 3013 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3014 3015 sspin->thread = thread; 3016 sspin->thread->lock_count++; 3017 3018 SSPIN_GET_STACK(sspin, lock); 3019 } 3020 3021 void 3022 spdk_spin_unlock(struct spdk_spinlock *sspin) 3023 { 3024 struct spdk_thread *thread = spdk_get_thread(); 3025 int rc; 3026 3027 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3028 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3029 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3030 SPIN_ASSERT_LOG_STACKS(thread == sspin->thread, SPIN_ERR_WRONG_THREAD, sspin); 3031 3032 SPIN_ASSERT_LOG_STACKS(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT, sspin); 3033 thread->lock_count--; 3034 sspin->thread = NULL; 3035 3036 SSPIN_GET_STACK(sspin, unlock); 3037 3038 rc = pthread_spin_unlock(&sspin->spinlock); 3039 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3040 } 3041 3042 bool 3043 spdk_spin_held(struct spdk_spinlock *sspin) 3044 { 3045 struct spdk_thread *thread = spdk_get_thread(); 3046 3047 SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false); 3048 3049 return sspin->thread == thread; 3050 } 3051 3052 SPDK_LOG_REGISTER_COMPONENT(thread) 3053