1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2016 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "spdk/env.h" 10 #include "spdk/likely.h" 11 #include "spdk/queue.h" 12 #include "spdk/string.h" 13 #include "spdk/thread.h" 14 #include "spdk/trace.h" 15 #include "spdk/util.h" 16 #include "spdk/fd_group.h" 17 18 #include "spdk/log.h" 19 #include "spdk_internal/thread.h" 20 #include "spdk_internal/usdt.h" 21 #include "thread_internal.h" 22 23 #include "spdk_internal/trace_defs.h" 24 25 #ifdef __linux__ 26 #include <sys/timerfd.h> 27 #include <sys/eventfd.h> 28 #include <execinfo.h> 29 #endif 30 31 #ifdef __FreeBSD__ 32 #include <execinfo.h> 33 #endif 34 35 #define SPDK_MSG_BATCH_SIZE 8 36 #define SPDK_MAX_DEVICE_NAME_LEN 256 37 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 38 #define SPDK_MAX_POLLER_NAME_LEN 256 39 #define SPDK_MAX_THREAD_NAME_LEN 256 40 41 static struct spdk_thread *g_app_thread; 42 43 struct spdk_interrupt { 44 int efd; 45 struct spdk_thread *thread; 46 spdk_interrupt_fn fn; 47 void *arg; 48 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 49 }; 50 51 enum spdk_poller_state { 52 /* The poller is registered with a thread but not currently executing its fn. */ 53 SPDK_POLLER_STATE_WAITING, 54 55 /* The poller is currently running its fn. */ 56 SPDK_POLLER_STATE_RUNNING, 57 58 /* The poller was unregistered during the execution of its fn. */ 59 SPDK_POLLER_STATE_UNREGISTERED, 60 61 /* The poller is in the process of being paused. It will be paused 62 * during the next time it's supposed to be executed. 63 */ 64 SPDK_POLLER_STATE_PAUSING, 65 66 /* The poller is registered but currently paused. It's on the 67 * paused_pollers list. 68 */ 69 SPDK_POLLER_STATE_PAUSED, 70 }; 71 72 struct spdk_poller { 73 TAILQ_ENTRY(spdk_poller) tailq; 74 RB_ENTRY(spdk_poller) node; 75 76 /* Current state of the poller; should only be accessed from the poller's thread. */ 77 enum spdk_poller_state state; 78 79 uint64_t period_ticks; 80 uint64_t next_run_tick; 81 uint64_t run_count; 82 uint64_t busy_count; 83 uint64_t id; 84 spdk_poller_fn fn; 85 void *arg; 86 struct spdk_thread *thread; 87 struct spdk_interrupt *intr; 88 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 89 void *set_intr_cb_arg; 90 91 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 92 }; 93 94 enum spdk_thread_state { 95 /* The thread is processing poller and message by spdk_thread_poll(). */ 96 SPDK_THREAD_STATE_RUNNING, 97 98 /* The thread is in the process of termination. It reaps unregistering 99 * poller are releasing I/O channel. 100 */ 101 SPDK_THREAD_STATE_EXITING, 102 103 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 104 SPDK_THREAD_STATE_EXITED, 105 }; 106 107 struct spdk_thread { 108 uint64_t tsc_last; 109 struct spdk_thread_stats stats; 110 /* 111 * Contains pollers actively running on this thread. Pollers 112 * are run round-robin. The thread takes one poller from the head 113 * of the ring, executes it, then puts it back at the tail of 114 * the ring. 115 */ 116 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 117 /** 118 * Contains pollers running on this thread with a periodic timer. 119 */ 120 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 121 struct spdk_poller *first_timed_poller; 122 /* 123 * Contains paused pollers. Pollers on this queue are waiting until 124 * they are resumed (in which case they're put onto the active/timer 125 * queues) or unregistered. 126 */ 127 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 128 struct spdk_ring *messages; 129 int msg_fd; 130 SLIST_HEAD(, spdk_msg) msg_cache; 131 size_t msg_cache_count; 132 spdk_msg_fn critical_msg; 133 uint64_t id; 134 uint64_t next_poller_id; 135 enum spdk_thread_state state; 136 int pending_unregister_count; 137 138 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 139 TAILQ_ENTRY(spdk_thread) tailq; 140 141 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 142 struct spdk_cpuset cpumask; 143 uint64_t exit_timeout_tsc; 144 145 int32_t lock_count; 146 147 /* Indicates whether this spdk_thread currently runs in interrupt. */ 148 bool in_interrupt; 149 bool poller_unregistered; 150 struct spdk_fd_group *fgrp; 151 152 /* User context allocated at the end */ 153 uint8_t ctx[0]; 154 }; 155 156 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 157 158 static spdk_new_thread_fn g_new_thread_fn = NULL; 159 static spdk_thread_op_fn g_thread_op_fn = NULL; 160 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 161 static size_t g_ctx_sz = 0; 162 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 163 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 164 * SPDK application is required. 165 */ 166 static uint64_t g_thread_id = 1; 167 168 enum spin_error { 169 SPIN_ERR_NONE, 170 /* Trying to use an SPDK lock while not on an SPDK thread */ 171 SPIN_ERR_NOT_SPDK_THREAD, 172 /* Trying to lock a lock already held by this SPDK thread */ 173 SPIN_ERR_DEADLOCK, 174 /* Trying to unlock a lock not held by this SPDK thread */ 175 SPIN_ERR_WRONG_THREAD, 176 /* pthread_spin_*() returned an error */ 177 SPIN_ERR_PTHREAD, 178 /* Trying to destroy a lock that is held */ 179 SPIN_ERR_LOCK_HELD, 180 /* lock_count is invalid */ 181 SPIN_ERR_LOCK_COUNT, 182 /* 183 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to 184 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to 185 * deadlock when another SPDK thread on the same pthread tries to take that lock. 186 */ 187 SPIN_ERR_HOLD_DURING_SWITCH, 188 /* Trying to use a lock that was destroyed (but not re-initialized) */ 189 SPIN_ERR_DESTROYED, 190 /* Trying to use a lock that is not initialized */ 191 SPIN_ERR_NOT_INITIALIZED, 192 193 /* Must be last, not an actual error code */ 194 SPIN_ERR_LAST 195 }; 196 197 static const char *spin_error_strings[] = { 198 [SPIN_ERR_NONE] = "No error", 199 [SPIN_ERR_NOT_SPDK_THREAD] = "Not an SPDK thread", 200 [SPIN_ERR_DEADLOCK] = "Deadlock detected", 201 [SPIN_ERR_WRONG_THREAD] = "Unlock on wrong SPDK thread", 202 [SPIN_ERR_PTHREAD] = "Error from pthread_spinlock", 203 [SPIN_ERR_LOCK_HELD] = "Destroying a held spinlock", 204 [SPIN_ERR_LOCK_COUNT] = "Lock count is invalid", 205 [SPIN_ERR_HOLD_DURING_SWITCH] = "Lock(s) held while SPDK thread going off CPU", 206 [SPIN_ERR_DESTROYED] = "Lock has been destroyed", 207 [SPIN_ERR_NOT_INITIALIZED] = "Lock has not been initialized", 208 }; 209 210 #define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \ 211 ? "Unknown error" : spin_error_strings[err] 212 213 static void 214 __posix_abort(enum spin_error err) 215 { 216 abort(); 217 } 218 219 typedef void (*spin_abort)(enum spin_error err); 220 spin_abort g_spin_abort_fn = __posix_abort; 221 222 #define SPIN_ASSERT_IMPL(cond, err, extra_log, ret) \ 223 do { \ 224 if (spdk_unlikely(!(cond))) { \ 225 SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \ 226 SPIN_ERROR_STRING(err), #cond); \ 227 extra_log; \ 228 g_spin_abort_fn(err); \ 229 ret; \ 230 } \ 231 } while (0) 232 #define SPIN_ASSERT_LOG_STACKS(cond, err, lock) \ 233 SPIN_ASSERT_IMPL(cond, err, sspin_stacks_print(sspin), return) 234 #define SPIN_ASSERT_RETURN(cond, err, ret) SPIN_ASSERT_IMPL(cond, err, , return ret) 235 #define SPIN_ASSERT(cond, err) SPIN_ASSERT_IMPL(cond, err, ,) 236 237 struct io_device { 238 void *io_device; 239 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 240 spdk_io_channel_create_cb create_cb; 241 spdk_io_channel_destroy_cb destroy_cb; 242 spdk_io_device_unregister_cb unregister_cb; 243 struct spdk_thread *unregister_thread; 244 uint32_t ctx_size; 245 uint32_t for_each_count; 246 RB_ENTRY(io_device) node; 247 248 uint32_t refcnt; 249 250 bool pending_unregister; 251 bool unregistered; 252 }; 253 254 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 255 256 static int 257 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 258 { 259 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 260 } 261 262 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 263 264 static int 265 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 266 { 267 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 268 } 269 270 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 271 272 struct spdk_msg { 273 spdk_msg_fn fn; 274 void *arg; 275 276 SLIST_ENTRY(spdk_msg) link; 277 }; 278 279 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 280 281 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 282 static uint32_t g_thread_count = 0; 283 284 static __thread struct spdk_thread *tls_thread = NULL; 285 286 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 287 { 288 spdk_trace_register_description("THREAD_IOCH_GET", 289 TRACE_THREAD_IOCH_GET, 290 OWNER_NONE, OBJECT_NONE, 0, 291 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 292 spdk_trace_register_description("THREAD_IOCH_PUT", 293 TRACE_THREAD_IOCH_PUT, 294 OWNER_NONE, OBJECT_NONE, 0, 295 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 296 } 297 298 /* 299 * If this compare function returns zero when two next_run_ticks are equal, 300 * the macro RB_INSERT() returns a pointer to the element with the same 301 * next_run_tick. 302 * 303 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 304 * to remove as a parameter. 305 * 306 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 307 * side by returning 1 when two next_run_ticks are equal. 308 */ 309 static inline int 310 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 311 { 312 if (poller1->next_run_tick < poller2->next_run_tick) { 313 return -1; 314 } else { 315 return 1; 316 } 317 } 318 319 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 320 321 static inline struct spdk_thread * 322 _get_thread(void) 323 { 324 return tls_thread; 325 } 326 327 static int 328 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 329 { 330 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 331 332 g_ctx_sz = ctx_sz; 333 334 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 335 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 336 sizeof(struct spdk_msg), 337 0, /* No cache. We do our own. */ 338 SPDK_ENV_SOCKET_ID_ANY); 339 340 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 341 msg_mempool_sz); 342 343 if (!g_spdk_msg_mempool) { 344 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 345 return -ENOMEM; 346 } 347 348 return 0; 349 } 350 351 static void thread_interrupt_destroy(struct spdk_thread *thread); 352 static int thread_interrupt_create(struct spdk_thread *thread); 353 354 static void 355 _free_thread(struct spdk_thread *thread) 356 { 357 struct spdk_io_channel *ch; 358 struct spdk_msg *msg; 359 struct spdk_poller *poller, *ptmp; 360 361 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 362 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 363 thread->name, ch->dev->name); 364 } 365 366 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 367 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 368 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 369 poller->name); 370 } 371 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 372 free(poller); 373 } 374 375 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 376 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 377 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 378 poller->name); 379 } 380 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 381 free(poller); 382 } 383 384 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 385 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 386 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 387 free(poller); 388 } 389 390 pthread_mutex_lock(&g_devlist_mutex); 391 assert(g_thread_count > 0); 392 g_thread_count--; 393 TAILQ_REMOVE(&g_threads, thread, tailq); 394 pthread_mutex_unlock(&g_devlist_mutex); 395 396 msg = SLIST_FIRST(&thread->msg_cache); 397 while (msg != NULL) { 398 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 399 400 assert(thread->msg_cache_count > 0); 401 thread->msg_cache_count--; 402 spdk_mempool_put(g_spdk_msg_mempool, msg); 403 404 msg = SLIST_FIRST(&thread->msg_cache); 405 } 406 407 assert(thread->msg_cache_count == 0); 408 409 if (spdk_interrupt_mode_is_enabled()) { 410 thread_interrupt_destroy(thread); 411 } 412 413 spdk_ring_free(thread->messages); 414 free(thread); 415 } 416 417 int 418 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 419 { 420 assert(g_new_thread_fn == NULL); 421 assert(g_thread_op_fn == NULL); 422 423 if (new_thread_fn == NULL) { 424 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 425 } else { 426 g_new_thread_fn = new_thread_fn; 427 } 428 429 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 430 } 431 432 int 433 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 434 spdk_thread_op_supported_fn thread_op_supported_fn, 435 size_t ctx_sz, size_t msg_mempool_sz) 436 { 437 assert(g_new_thread_fn == NULL); 438 assert(g_thread_op_fn == NULL); 439 assert(g_thread_op_supported_fn == NULL); 440 441 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 442 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 443 return -EINVAL; 444 } 445 446 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 447 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 448 } else { 449 g_thread_op_fn = thread_op_fn; 450 g_thread_op_supported_fn = thread_op_supported_fn; 451 } 452 453 return _thread_lib_init(ctx_sz, msg_mempool_sz); 454 } 455 456 void 457 spdk_thread_lib_fini(void) 458 { 459 struct io_device *dev; 460 461 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 462 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 463 } 464 465 g_new_thread_fn = NULL; 466 g_thread_op_fn = NULL; 467 g_thread_op_supported_fn = NULL; 468 g_ctx_sz = 0; 469 if (g_app_thread != NULL) { 470 _free_thread(g_app_thread); 471 g_app_thread = NULL; 472 } 473 474 if (g_spdk_msg_mempool) { 475 spdk_mempool_free(g_spdk_msg_mempool); 476 g_spdk_msg_mempool = NULL; 477 } 478 } 479 480 struct spdk_thread * 481 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask) 482 { 483 struct spdk_thread *thread, *null_thread; 484 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 485 int rc = 0, i; 486 487 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 488 if (!thread) { 489 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 490 return NULL; 491 } 492 493 if (cpumask) { 494 spdk_cpuset_copy(&thread->cpumask, cpumask); 495 } else { 496 spdk_cpuset_negate(&thread->cpumask); 497 } 498 499 RB_INIT(&thread->io_channels); 500 TAILQ_INIT(&thread->active_pollers); 501 RB_INIT(&thread->timed_pollers); 502 TAILQ_INIT(&thread->paused_pollers); 503 SLIST_INIT(&thread->msg_cache); 504 thread->msg_cache_count = 0; 505 506 thread->tsc_last = spdk_get_ticks(); 507 508 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 509 * ID exceeds UINT64_MAX a warning message is logged 510 */ 511 thread->next_poller_id = 1; 512 513 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 514 if (!thread->messages) { 515 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 516 free(thread); 517 return NULL; 518 } 519 520 /* Fill the local message pool cache. */ 521 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 522 if (rc == 0) { 523 /* If we can't populate the cache it's ok. The cache will get filled 524 * up organically as messages are passed to the thread. */ 525 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 526 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 527 thread->msg_cache_count++; 528 } 529 } 530 531 if (name) { 532 snprintf(thread->name, sizeof(thread->name), "%s", name); 533 } else { 534 snprintf(thread->name, sizeof(thread->name), "%p", thread); 535 } 536 537 pthread_mutex_lock(&g_devlist_mutex); 538 if (g_thread_id == 0) { 539 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 540 pthread_mutex_unlock(&g_devlist_mutex); 541 _free_thread(thread); 542 return NULL; 543 } 544 thread->id = g_thread_id++; 545 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 546 g_thread_count++; 547 pthread_mutex_unlock(&g_devlist_mutex); 548 549 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 550 thread->id, thread->name); 551 552 if (spdk_interrupt_mode_is_enabled()) { 553 thread->in_interrupt = true; 554 rc = thread_interrupt_create(thread); 555 if (rc != 0) { 556 _free_thread(thread); 557 return NULL; 558 } 559 } 560 561 if (g_new_thread_fn) { 562 rc = g_new_thread_fn(thread); 563 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 564 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 565 } 566 567 if (rc != 0) { 568 _free_thread(thread); 569 return NULL; 570 } 571 572 thread->state = SPDK_THREAD_STATE_RUNNING; 573 574 /* If this is the first thread, save it as the app thread. Use an atomic 575 * compare + exchange to guard against crazy users who might try to 576 * call spdk_thread_create() simultaneously on multiple threads. 577 */ 578 null_thread = NULL; 579 __atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false, 580 __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); 581 582 return thread; 583 } 584 585 struct spdk_thread * 586 spdk_thread_get_app_thread(void) 587 { 588 return g_app_thread; 589 } 590 591 void 592 spdk_set_thread(struct spdk_thread *thread) 593 { 594 tls_thread = thread; 595 } 596 597 static void 598 thread_exit(struct spdk_thread *thread, uint64_t now) 599 { 600 struct spdk_poller *poller; 601 struct spdk_io_channel *ch; 602 603 if (now >= thread->exit_timeout_tsc) { 604 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 605 thread->name); 606 goto exited; 607 } 608 609 if (spdk_ring_count(thread->messages) > 0) { 610 SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name); 611 return; 612 } 613 614 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 615 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 616 SPDK_INFOLOG(thread, 617 "thread %s still has active poller %s\n", 618 thread->name, poller->name); 619 return; 620 } 621 } 622 623 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 624 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 625 SPDK_INFOLOG(thread, 626 "thread %s still has active timed poller %s\n", 627 thread->name, poller->name); 628 return; 629 } 630 } 631 632 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 633 SPDK_INFOLOG(thread, 634 "thread %s still has paused poller %s\n", 635 thread->name, poller->name); 636 return; 637 } 638 639 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 640 SPDK_INFOLOG(thread, 641 "thread %s still has channel for io_device %s\n", 642 thread->name, ch->dev->name); 643 return; 644 } 645 646 if (thread->pending_unregister_count > 0) { 647 SPDK_INFOLOG(thread, 648 "thread %s is still unregistering io_devices\n", 649 thread->name); 650 return; 651 } 652 653 exited: 654 thread->state = SPDK_THREAD_STATE_EXITED; 655 if (spdk_unlikely(thread->in_interrupt)) { 656 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 657 } 658 } 659 660 static void _thread_exit(void *ctx); 661 662 int 663 spdk_thread_exit(struct spdk_thread *thread) 664 { 665 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 666 667 assert(tls_thread == thread); 668 669 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 670 SPDK_INFOLOG(thread, 671 "thread %s is already exiting\n", 672 thread->name); 673 return 0; 674 } 675 676 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 677 SPDK_THREAD_EXIT_TIMEOUT_SEC); 678 thread->state = SPDK_THREAD_STATE_EXITING; 679 680 if (spdk_interrupt_mode_is_enabled()) { 681 spdk_thread_send_msg(thread, _thread_exit, thread); 682 } 683 684 return 0; 685 } 686 687 bool 688 spdk_thread_is_running(struct spdk_thread *thread) 689 { 690 return thread->state == SPDK_THREAD_STATE_RUNNING; 691 } 692 693 bool 694 spdk_thread_is_exited(struct spdk_thread *thread) 695 { 696 return thread->state == SPDK_THREAD_STATE_EXITED; 697 } 698 699 void 700 spdk_thread_destroy(struct spdk_thread *thread) 701 { 702 assert(thread != NULL); 703 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 704 705 assert(thread->state == SPDK_THREAD_STATE_EXITED); 706 707 if (tls_thread == thread) { 708 tls_thread = NULL; 709 } 710 711 /* To be safe, do not free the app thread until spdk_thread_lib_fini(). */ 712 if (thread != g_app_thread) { 713 _free_thread(thread); 714 } 715 } 716 717 void * 718 spdk_thread_get_ctx(struct spdk_thread *thread) 719 { 720 if (g_ctx_sz > 0) { 721 return thread->ctx; 722 } 723 724 return NULL; 725 } 726 727 struct spdk_cpuset * 728 spdk_thread_get_cpumask(struct spdk_thread *thread) 729 { 730 return &thread->cpumask; 731 } 732 733 int 734 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 735 { 736 struct spdk_thread *thread; 737 738 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 739 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 740 assert(false); 741 return -ENOTSUP; 742 } 743 744 thread = spdk_get_thread(); 745 if (!thread) { 746 SPDK_ERRLOG("Called from non-SPDK thread\n"); 747 assert(false); 748 return -EINVAL; 749 } 750 751 spdk_cpuset_copy(&thread->cpumask, cpumask); 752 753 /* Invoke framework's reschedule operation. If this function is called multiple times 754 * in a single spdk_thread_poll() context, the last cpumask will be used in the 755 * reschedule operation. 756 */ 757 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 758 759 return 0; 760 } 761 762 struct spdk_thread * 763 spdk_thread_get_from_ctx(void *ctx) 764 { 765 if (ctx == NULL) { 766 assert(false); 767 return NULL; 768 } 769 770 assert(g_ctx_sz > 0); 771 772 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 773 } 774 775 static inline uint32_t 776 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 777 { 778 unsigned count, i; 779 void *messages[SPDK_MSG_BATCH_SIZE]; 780 uint64_t notify = 1; 781 int rc; 782 783 #ifdef DEBUG 784 /* 785 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 786 * so we will never actually read uninitialized data from events, but just to be sure 787 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 788 */ 789 memset(messages, 0, sizeof(messages)); 790 #endif 791 792 if (max_msgs > 0) { 793 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 794 } else { 795 max_msgs = SPDK_MSG_BATCH_SIZE; 796 } 797 798 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 799 if (spdk_unlikely(thread->in_interrupt) && 800 spdk_ring_count(thread->messages) != 0) { 801 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 802 if (rc < 0) { 803 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 804 } 805 } 806 if (count == 0) { 807 return 0; 808 } 809 810 for (i = 0; i < count; i++) { 811 struct spdk_msg *msg = messages[i]; 812 813 assert(msg != NULL); 814 815 SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg); 816 817 msg->fn(msg->arg); 818 819 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 820 821 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 822 /* Insert the messages at the head. We want to re-use the hot 823 * ones. */ 824 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 825 thread->msg_cache_count++; 826 } else { 827 spdk_mempool_put(g_spdk_msg_mempool, msg); 828 } 829 } 830 831 return count; 832 } 833 834 static void 835 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 836 { 837 struct spdk_poller *tmp __attribute__((unused)); 838 839 poller->next_run_tick = now + poller->period_ticks; 840 841 /* 842 * Insert poller in the thread's timed_pollers tree by next scheduled run time 843 * as its key. 844 */ 845 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 846 assert(tmp == NULL); 847 848 /* Update the cache only if it is empty or the inserted poller is earlier than it. 849 * RB_MIN() is not necessary here because all pollers, which has exactly the same 850 * next_run_tick as the existing poller, are inserted on the right side. 851 */ 852 if (thread->first_timed_poller == NULL || 853 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 854 thread->first_timed_poller = poller; 855 } 856 } 857 858 static inline void 859 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 860 { 861 struct spdk_poller *tmp __attribute__((unused)); 862 863 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 864 assert(tmp != NULL); 865 866 /* This function is not used in any case that is performance critical. 867 * Update the cache simply by RB_MIN() if it needs to be changed. 868 */ 869 if (thread->first_timed_poller == poller) { 870 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 871 } 872 } 873 874 static void 875 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 876 { 877 if (poller->period_ticks) { 878 poller_insert_timer(thread, poller, spdk_get_ticks()); 879 } else { 880 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 881 } 882 } 883 884 static inline void 885 thread_update_stats(struct spdk_thread *thread, uint64_t end, 886 uint64_t start, int rc) 887 { 888 if (rc == 0) { 889 /* Poller status idle */ 890 thread->stats.idle_tsc += end - start; 891 } else if (rc > 0) { 892 /* Poller status busy */ 893 thread->stats.busy_tsc += end - start; 894 } 895 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 896 thread->tsc_last = end; 897 } 898 899 static inline int 900 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 901 { 902 int rc; 903 904 switch (poller->state) { 905 case SPDK_POLLER_STATE_UNREGISTERED: 906 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 907 free(poller); 908 return 0; 909 case SPDK_POLLER_STATE_PAUSING: 910 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 911 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 912 poller->state = SPDK_POLLER_STATE_PAUSED; 913 return 0; 914 case SPDK_POLLER_STATE_WAITING: 915 break; 916 default: 917 assert(false); 918 break; 919 } 920 921 poller->state = SPDK_POLLER_STATE_RUNNING; 922 rc = poller->fn(poller->arg); 923 924 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 925 926 poller->run_count++; 927 if (rc > 0) { 928 poller->busy_count++; 929 } 930 931 #ifdef DEBUG 932 if (rc == -1) { 933 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 934 } 935 #endif 936 937 switch (poller->state) { 938 case SPDK_POLLER_STATE_UNREGISTERED: 939 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 940 free(poller); 941 break; 942 case SPDK_POLLER_STATE_PAUSING: 943 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 944 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 945 poller->state = SPDK_POLLER_STATE_PAUSED; 946 break; 947 case SPDK_POLLER_STATE_PAUSED: 948 case SPDK_POLLER_STATE_WAITING: 949 break; 950 case SPDK_POLLER_STATE_RUNNING: 951 poller->state = SPDK_POLLER_STATE_WAITING; 952 break; 953 default: 954 assert(false); 955 break; 956 } 957 958 return rc; 959 } 960 961 static inline int 962 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 963 uint64_t now) 964 { 965 int rc; 966 967 switch (poller->state) { 968 case SPDK_POLLER_STATE_UNREGISTERED: 969 free(poller); 970 return 0; 971 case SPDK_POLLER_STATE_PAUSING: 972 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 973 poller->state = SPDK_POLLER_STATE_PAUSED; 974 return 0; 975 case SPDK_POLLER_STATE_WAITING: 976 break; 977 default: 978 assert(false); 979 break; 980 } 981 982 poller->state = SPDK_POLLER_STATE_RUNNING; 983 rc = poller->fn(poller->arg); 984 985 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 986 987 poller->run_count++; 988 if (rc > 0) { 989 poller->busy_count++; 990 } 991 992 #ifdef DEBUG 993 if (rc == -1) { 994 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 995 } 996 #endif 997 998 switch (poller->state) { 999 case SPDK_POLLER_STATE_UNREGISTERED: 1000 free(poller); 1001 break; 1002 case SPDK_POLLER_STATE_PAUSING: 1003 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1004 poller->state = SPDK_POLLER_STATE_PAUSED; 1005 break; 1006 case SPDK_POLLER_STATE_PAUSED: 1007 break; 1008 case SPDK_POLLER_STATE_RUNNING: 1009 poller->state = SPDK_POLLER_STATE_WAITING; 1010 /* fallthrough */ 1011 case SPDK_POLLER_STATE_WAITING: 1012 poller_insert_timer(thread, poller, now); 1013 break; 1014 default: 1015 assert(false); 1016 break; 1017 } 1018 1019 return rc; 1020 } 1021 1022 static int 1023 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1024 { 1025 uint32_t msg_count; 1026 struct spdk_poller *poller, *tmp; 1027 spdk_msg_fn critical_msg; 1028 int rc = 0; 1029 1030 thread->tsc_last = now; 1031 1032 critical_msg = thread->critical_msg; 1033 if (spdk_unlikely(critical_msg != NULL)) { 1034 critical_msg(NULL); 1035 thread->critical_msg = NULL; 1036 rc = 1; 1037 } 1038 1039 msg_count = msg_queue_run_batch(thread, max_msgs); 1040 if (msg_count) { 1041 rc = 1; 1042 } 1043 1044 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1045 active_pollers_head, tailq, tmp) { 1046 int poller_rc; 1047 1048 poller_rc = thread_execute_poller(thread, poller); 1049 if (poller_rc > rc) { 1050 rc = poller_rc; 1051 } 1052 } 1053 1054 poller = thread->first_timed_poller; 1055 while (poller != NULL) { 1056 int timer_rc = 0; 1057 1058 if (now < poller->next_run_tick) { 1059 break; 1060 } 1061 1062 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 1063 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 1064 1065 /* Update the cache to the next timed poller in the list 1066 * only if the current poller is still the closest, otherwise, 1067 * do nothing because the cache has been already updated. 1068 */ 1069 if (thread->first_timed_poller == poller) { 1070 thread->first_timed_poller = tmp; 1071 } 1072 1073 timer_rc = thread_execute_timed_poller(thread, poller, now); 1074 if (timer_rc > rc) { 1075 rc = timer_rc; 1076 } 1077 1078 poller = tmp; 1079 } 1080 1081 return rc; 1082 } 1083 1084 static void 1085 _thread_remove_pollers(void *ctx) 1086 { 1087 struct spdk_thread *thread = ctx; 1088 struct spdk_poller *poller, *tmp; 1089 1090 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1091 active_pollers_head, tailq, tmp) { 1092 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1093 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1094 free(poller); 1095 } 1096 } 1097 1098 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1099 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1100 poller_remove_timer(thread, poller); 1101 free(poller); 1102 } 1103 } 1104 1105 thread->poller_unregistered = false; 1106 } 1107 1108 static void 1109 _thread_exit(void *ctx) 1110 { 1111 struct spdk_thread *thread = ctx; 1112 1113 assert(thread->state == SPDK_THREAD_STATE_EXITING); 1114 1115 thread_exit(thread, spdk_get_ticks()); 1116 } 1117 1118 int 1119 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1120 { 1121 struct spdk_thread *orig_thread; 1122 int rc; 1123 1124 orig_thread = _get_thread(); 1125 tls_thread = thread; 1126 1127 if (now == 0) { 1128 now = spdk_get_ticks(); 1129 } 1130 1131 if (spdk_likely(!thread->in_interrupt)) { 1132 rc = thread_poll(thread, max_msgs, now); 1133 if (spdk_unlikely(thread->in_interrupt)) { 1134 /* The thread transitioned to interrupt mode during the above poll. 1135 * Poll it one more time in case that during the transition time 1136 * there is msg received without notification. 1137 */ 1138 rc = thread_poll(thread, max_msgs, now); 1139 } 1140 1141 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1142 thread_exit(thread, now); 1143 } 1144 } else { 1145 /* Non-block wait on thread's fd_group */ 1146 rc = spdk_fd_group_wait(thread->fgrp, 0); 1147 } 1148 1149 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1150 1151 tls_thread = orig_thread; 1152 1153 return rc; 1154 } 1155 1156 uint64_t 1157 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1158 { 1159 struct spdk_poller *poller; 1160 1161 poller = thread->first_timed_poller; 1162 if (poller) { 1163 return poller->next_run_tick; 1164 } 1165 1166 return 0; 1167 } 1168 1169 int 1170 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1171 { 1172 return !TAILQ_EMPTY(&thread->active_pollers); 1173 } 1174 1175 static bool 1176 thread_has_unpaused_pollers(struct spdk_thread *thread) 1177 { 1178 if (TAILQ_EMPTY(&thread->active_pollers) && 1179 RB_EMPTY(&thread->timed_pollers)) { 1180 return false; 1181 } 1182 1183 return true; 1184 } 1185 1186 bool 1187 spdk_thread_has_pollers(struct spdk_thread *thread) 1188 { 1189 if (!thread_has_unpaused_pollers(thread) && 1190 TAILQ_EMPTY(&thread->paused_pollers)) { 1191 return false; 1192 } 1193 1194 return true; 1195 } 1196 1197 bool 1198 spdk_thread_is_idle(struct spdk_thread *thread) 1199 { 1200 if (spdk_ring_count(thread->messages) || 1201 thread_has_unpaused_pollers(thread) || 1202 thread->critical_msg != NULL) { 1203 return false; 1204 } 1205 1206 return true; 1207 } 1208 1209 uint32_t 1210 spdk_thread_get_count(void) 1211 { 1212 /* 1213 * Return cached value of the current thread count. We could acquire the 1214 * lock and iterate through the TAILQ of threads to count them, but that 1215 * count could still be invalidated after we release the lock. 1216 */ 1217 return g_thread_count; 1218 } 1219 1220 struct spdk_thread * 1221 spdk_get_thread(void) 1222 { 1223 return _get_thread(); 1224 } 1225 1226 const char * 1227 spdk_thread_get_name(const struct spdk_thread *thread) 1228 { 1229 return thread->name; 1230 } 1231 1232 uint64_t 1233 spdk_thread_get_id(const struct spdk_thread *thread) 1234 { 1235 return thread->id; 1236 } 1237 1238 struct spdk_thread * 1239 spdk_thread_get_by_id(uint64_t id) 1240 { 1241 struct spdk_thread *thread; 1242 1243 if (id == 0 || id >= g_thread_id) { 1244 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1245 return NULL; 1246 } 1247 pthread_mutex_lock(&g_devlist_mutex); 1248 TAILQ_FOREACH(thread, &g_threads, tailq) { 1249 if (thread->id == id) { 1250 break; 1251 } 1252 } 1253 pthread_mutex_unlock(&g_devlist_mutex); 1254 return thread; 1255 } 1256 1257 int 1258 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1259 { 1260 struct spdk_thread *thread; 1261 1262 thread = _get_thread(); 1263 if (!thread) { 1264 SPDK_ERRLOG("No thread allocated\n"); 1265 return -EINVAL; 1266 } 1267 1268 if (stats == NULL) { 1269 return -EINVAL; 1270 } 1271 1272 *stats = thread->stats; 1273 1274 return 0; 1275 } 1276 1277 uint64_t 1278 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1279 { 1280 if (thread == NULL) { 1281 thread = _get_thread(); 1282 } 1283 1284 return thread->tsc_last; 1285 } 1286 1287 static inline int 1288 thread_send_msg_notification(const struct spdk_thread *target_thread) 1289 { 1290 uint64_t notify = 1; 1291 int rc; 1292 1293 /* Not necessary to do notification if interrupt facility is not enabled */ 1294 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1295 return 0; 1296 } 1297 1298 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1299 * after sending thread msg, it is necessary to check whether target thread runs in 1300 * interrupt mode and then decide whether do event notification. 1301 */ 1302 if (spdk_unlikely(target_thread->in_interrupt)) { 1303 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1304 if (rc < 0) { 1305 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1306 return -EIO; 1307 } 1308 } 1309 1310 return 0; 1311 } 1312 1313 int 1314 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1315 { 1316 struct spdk_thread *local_thread; 1317 struct spdk_msg *msg; 1318 int rc; 1319 1320 assert(thread != NULL); 1321 1322 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1323 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1324 return -EIO; 1325 } 1326 1327 local_thread = _get_thread(); 1328 1329 msg = NULL; 1330 if (local_thread != NULL) { 1331 if (local_thread->msg_cache_count > 0) { 1332 msg = SLIST_FIRST(&local_thread->msg_cache); 1333 assert(msg != NULL); 1334 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1335 local_thread->msg_cache_count--; 1336 } 1337 } 1338 1339 if (msg == NULL) { 1340 msg = spdk_mempool_get(g_spdk_msg_mempool); 1341 if (!msg) { 1342 SPDK_ERRLOG("msg could not be allocated\n"); 1343 return -ENOMEM; 1344 } 1345 } 1346 1347 msg->fn = fn; 1348 msg->arg = ctx; 1349 1350 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1351 if (rc != 1) { 1352 SPDK_ERRLOG("msg could not be enqueued\n"); 1353 spdk_mempool_put(g_spdk_msg_mempool, msg); 1354 return -EIO; 1355 } 1356 1357 return thread_send_msg_notification(thread); 1358 } 1359 1360 int 1361 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1362 { 1363 spdk_msg_fn expected = NULL; 1364 1365 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1366 __ATOMIC_SEQ_CST)) { 1367 return -EIO; 1368 } 1369 1370 return thread_send_msg_notification(thread); 1371 } 1372 1373 #ifdef __linux__ 1374 static int 1375 interrupt_timerfd_process(void *arg) 1376 { 1377 struct spdk_poller *poller = arg; 1378 uint64_t exp; 1379 int rc; 1380 1381 /* clear the level of interval timer */ 1382 rc = read(poller->intr->efd, &exp, sizeof(exp)); 1383 if (rc < 0) { 1384 if (rc == -EAGAIN) { 1385 return 0; 1386 } 1387 1388 return rc; 1389 } 1390 1391 SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg); 1392 1393 return poller->fn(poller->arg); 1394 } 1395 1396 static int 1397 period_poller_interrupt_init(struct spdk_poller *poller) 1398 { 1399 int timerfd; 1400 1401 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1402 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1403 if (timerfd < 0) { 1404 return -errno; 1405 } 1406 1407 poller->intr = spdk_interrupt_register(timerfd, interrupt_timerfd_process, poller, poller->name); 1408 if (poller->intr == NULL) { 1409 close(timerfd); 1410 return -1; 1411 } 1412 1413 return 0; 1414 } 1415 1416 static void 1417 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1418 { 1419 int timerfd; 1420 uint64_t now_tick = spdk_get_ticks(); 1421 uint64_t ticks = spdk_get_ticks_hz(); 1422 int ret; 1423 struct itimerspec new_tv = {}; 1424 struct itimerspec old_tv = {}; 1425 1426 assert(poller->intr != NULL); 1427 assert(poller->period_ticks != 0); 1428 1429 timerfd = poller->intr->efd; 1430 1431 assert(timerfd >= 0); 1432 1433 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1434 interrupt_mode ? "interrupt" : "poll"); 1435 1436 if (interrupt_mode) { 1437 /* Set repeated timer expiration */ 1438 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1439 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1440 1441 /* Update next timer expiration */ 1442 if (poller->next_run_tick == 0) { 1443 poller->next_run_tick = now_tick + poller->period_ticks; 1444 } else if (poller->next_run_tick < now_tick) { 1445 poller->next_run_tick = now_tick; 1446 } 1447 1448 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1449 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1450 1451 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1452 if (ret < 0) { 1453 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1454 assert(false); 1455 } 1456 } else { 1457 /* Disarm the timer */ 1458 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1459 if (ret < 0) { 1460 /* timerfd_settime's failure indicates that the timerfd is in error */ 1461 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1462 assert(false); 1463 } 1464 1465 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1466 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1467 */ 1468 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1469 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1470 poller_remove_timer(poller->thread, poller); 1471 poller_insert_timer(poller->thread, poller, now_tick); 1472 } 1473 } 1474 1475 static void 1476 poller_interrupt_fini(struct spdk_poller *poller) 1477 { 1478 int fd; 1479 1480 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1481 assert(poller->intr != NULL); 1482 fd = poller->intr->efd; 1483 spdk_interrupt_unregister(&poller->intr); 1484 close(fd); 1485 } 1486 1487 static int 1488 busy_poller_interrupt_init(struct spdk_poller *poller) 1489 { 1490 int busy_efd; 1491 1492 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1493 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1494 if (busy_efd < 0) { 1495 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1496 return -errno; 1497 } 1498 1499 poller->intr = spdk_interrupt_register(busy_efd, poller->fn, poller->arg, poller->name); 1500 if (poller->intr == NULL) { 1501 close(busy_efd); 1502 return -1; 1503 } 1504 1505 return 0; 1506 } 1507 1508 static void 1509 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1510 { 1511 int busy_efd = poller->intr->efd; 1512 uint64_t notify = 1; 1513 int rc __attribute__((unused)); 1514 1515 assert(busy_efd >= 0); 1516 1517 if (interrupt_mode) { 1518 /* Write without read on eventfd will get it repeatedly triggered. */ 1519 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1520 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1521 } 1522 } else { 1523 /* Read on eventfd will clear its level triggering. */ 1524 rc = read(busy_efd, ¬ify, sizeof(notify)); 1525 } 1526 } 1527 1528 #else 1529 1530 static int 1531 period_poller_interrupt_init(struct spdk_poller *poller) 1532 { 1533 return -ENOTSUP; 1534 } 1535 1536 static void 1537 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1538 { 1539 } 1540 1541 static void 1542 poller_interrupt_fini(struct spdk_poller *poller) 1543 { 1544 } 1545 1546 static int 1547 busy_poller_interrupt_init(struct spdk_poller *poller) 1548 { 1549 return -ENOTSUP; 1550 } 1551 1552 static void 1553 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1554 { 1555 } 1556 1557 #endif 1558 1559 void 1560 spdk_poller_register_interrupt(struct spdk_poller *poller, 1561 spdk_poller_set_interrupt_mode_cb cb_fn, 1562 void *cb_arg) 1563 { 1564 assert(poller != NULL); 1565 assert(cb_fn != NULL); 1566 assert(spdk_get_thread() == poller->thread); 1567 1568 if (!spdk_interrupt_mode_is_enabled()) { 1569 return; 1570 } 1571 1572 /* If this poller already had an interrupt, clean the old one up. */ 1573 if (poller->intr != NULL) { 1574 poller_interrupt_fini(poller); 1575 } 1576 1577 poller->set_intr_cb_fn = cb_fn; 1578 poller->set_intr_cb_arg = cb_arg; 1579 1580 /* Set poller into interrupt mode if thread is in interrupt. */ 1581 if (poller->thread->in_interrupt) { 1582 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1583 } 1584 } 1585 1586 static uint64_t 1587 convert_us_to_ticks(uint64_t us) 1588 { 1589 uint64_t quotient, remainder, ticks; 1590 1591 if (us) { 1592 quotient = us / SPDK_SEC_TO_USEC; 1593 remainder = us % SPDK_SEC_TO_USEC; 1594 ticks = spdk_get_ticks_hz(); 1595 1596 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1597 } else { 1598 return 0; 1599 } 1600 } 1601 1602 static struct spdk_poller * 1603 poller_register(spdk_poller_fn fn, 1604 void *arg, 1605 uint64_t period_microseconds, 1606 const char *name) 1607 { 1608 struct spdk_thread *thread; 1609 struct spdk_poller *poller; 1610 1611 thread = spdk_get_thread(); 1612 if (!thread) { 1613 assert(false); 1614 return NULL; 1615 } 1616 1617 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1618 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1619 return NULL; 1620 } 1621 1622 poller = calloc(1, sizeof(*poller)); 1623 if (poller == NULL) { 1624 SPDK_ERRLOG("Poller memory allocation failed\n"); 1625 return NULL; 1626 } 1627 1628 if (name) { 1629 snprintf(poller->name, sizeof(poller->name), "%s", name); 1630 } else { 1631 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1632 } 1633 1634 poller->state = SPDK_POLLER_STATE_WAITING; 1635 poller->fn = fn; 1636 poller->arg = arg; 1637 poller->thread = thread; 1638 poller->intr = NULL; 1639 if (thread->next_poller_id == 0) { 1640 SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n"); 1641 thread->next_poller_id = 1; 1642 } 1643 poller->id = thread->next_poller_id++; 1644 1645 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1646 1647 if (spdk_interrupt_mode_is_enabled()) { 1648 int rc; 1649 1650 if (period_microseconds) { 1651 rc = period_poller_interrupt_init(poller); 1652 if (rc < 0) { 1653 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1654 free(poller); 1655 return NULL; 1656 } 1657 1658 poller->set_intr_cb_fn = period_poller_set_interrupt_mode; 1659 poller->set_intr_cb_arg = NULL; 1660 1661 } else { 1662 /* If the poller doesn't have a period, create interruptfd that's always 1663 * busy automatically when running in interrupt mode. 1664 */ 1665 rc = busy_poller_interrupt_init(poller); 1666 if (rc > 0) { 1667 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1668 free(poller); 1669 return NULL; 1670 } 1671 1672 poller->set_intr_cb_fn = busy_poller_set_interrupt_mode; 1673 poller->set_intr_cb_arg = NULL; 1674 } 1675 1676 /* Set poller into interrupt mode if thread is in interrupt. */ 1677 if (poller->thread->in_interrupt) { 1678 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1679 } 1680 } 1681 1682 thread_insert_poller(thread, poller); 1683 1684 return poller; 1685 } 1686 1687 struct spdk_poller * 1688 spdk_poller_register(spdk_poller_fn fn, 1689 void *arg, 1690 uint64_t period_microseconds) 1691 { 1692 return poller_register(fn, arg, period_microseconds, NULL); 1693 } 1694 1695 struct spdk_poller * 1696 spdk_poller_register_named(spdk_poller_fn fn, 1697 void *arg, 1698 uint64_t period_microseconds, 1699 const char *name) 1700 { 1701 return poller_register(fn, arg, period_microseconds, name); 1702 } 1703 1704 static void 1705 wrong_thread(const char *func, const char *name, struct spdk_thread *thread, 1706 struct spdk_thread *curthread) 1707 { 1708 if (thread == NULL) { 1709 SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name); 1710 abort(); 1711 } 1712 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be " 1713 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id, 1714 thread->name, thread->id); 1715 assert(false); 1716 } 1717 1718 void 1719 spdk_poller_unregister(struct spdk_poller **ppoller) 1720 { 1721 struct spdk_thread *thread; 1722 struct spdk_poller *poller; 1723 1724 poller = *ppoller; 1725 if (poller == NULL) { 1726 return; 1727 } 1728 1729 *ppoller = NULL; 1730 1731 thread = spdk_get_thread(); 1732 if (!thread) { 1733 assert(false); 1734 return; 1735 } 1736 1737 if (poller->thread != thread) { 1738 wrong_thread(__func__, poller->name, poller->thread, thread); 1739 return; 1740 } 1741 1742 if (spdk_interrupt_mode_is_enabled()) { 1743 /* Release the interrupt resource for period or busy poller */ 1744 if (poller->intr != NULL) { 1745 poller_interrupt_fini(poller); 1746 } 1747 1748 /* If there is not already a pending poller removal, generate 1749 * a message to go process removals. */ 1750 if (!thread->poller_unregistered) { 1751 thread->poller_unregistered = true; 1752 spdk_thread_send_msg(thread, _thread_remove_pollers, thread); 1753 } 1754 } 1755 1756 /* If the poller was paused, put it on the active_pollers list so that 1757 * its unregistration can be processed by spdk_thread_poll(). 1758 */ 1759 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1760 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1761 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1762 poller->period_ticks = 0; 1763 } 1764 1765 /* Simply set the state to unregistered. The poller will get cleaned up 1766 * in a subsequent call to spdk_thread_poll(). 1767 */ 1768 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1769 } 1770 1771 void 1772 spdk_poller_pause(struct spdk_poller *poller) 1773 { 1774 struct spdk_thread *thread; 1775 1776 thread = spdk_get_thread(); 1777 if (!thread) { 1778 assert(false); 1779 return; 1780 } 1781 1782 if (poller->thread != thread) { 1783 wrong_thread(__func__, poller->name, poller->thread, thread); 1784 return; 1785 } 1786 1787 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1788 * spdk_thread_poll() move it. It allows a poller to be paused from 1789 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1790 * iteration, or from within itself without breaking the logic to always 1791 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1792 */ 1793 switch (poller->state) { 1794 case SPDK_POLLER_STATE_PAUSED: 1795 case SPDK_POLLER_STATE_PAUSING: 1796 break; 1797 case SPDK_POLLER_STATE_RUNNING: 1798 case SPDK_POLLER_STATE_WAITING: 1799 poller->state = SPDK_POLLER_STATE_PAUSING; 1800 break; 1801 default: 1802 assert(false); 1803 break; 1804 } 1805 } 1806 1807 void 1808 spdk_poller_resume(struct spdk_poller *poller) 1809 { 1810 struct spdk_thread *thread; 1811 1812 thread = spdk_get_thread(); 1813 if (!thread) { 1814 assert(false); 1815 return; 1816 } 1817 1818 if (poller->thread != thread) { 1819 wrong_thread(__func__, poller->name, poller->thread, thread); 1820 return; 1821 } 1822 1823 /* If a poller is paused it has to be removed from the paused pollers 1824 * list and put on the active list or timer tree depending on its 1825 * period_ticks. If a poller is still in the process of being paused, 1826 * we just need to flip its state back to waiting, as it's already on 1827 * the appropriate list or tree. 1828 */ 1829 switch (poller->state) { 1830 case SPDK_POLLER_STATE_PAUSED: 1831 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1832 thread_insert_poller(thread, poller); 1833 /* fallthrough */ 1834 case SPDK_POLLER_STATE_PAUSING: 1835 poller->state = SPDK_POLLER_STATE_WAITING; 1836 break; 1837 case SPDK_POLLER_STATE_RUNNING: 1838 case SPDK_POLLER_STATE_WAITING: 1839 break; 1840 default: 1841 assert(false); 1842 break; 1843 } 1844 } 1845 1846 const char * 1847 spdk_poller_get_name(struct spdk_poller *poller) 1848 { 1849 return poller->name; 1850 } 1851 1852 uint64_t 1853 spdk_poller_get_id(struct spdk_poller *poller) 1854 { 1855 return poller->id; 1856 } 1857 1858 const char * 1859 spdk_poller_get_state_str(struct spdk_poller *poller) 1860 { 1861 switch (poller->state) { 1862 case SPDK_POLLER_STATE_WAITING: 1863 return "waiting"; 1864 case SPDK_POLLER_STATE_RUNNING: 1865 return "running"; 1866 case SPDK_POLLER_STATE_UNREGISTERED: 1867 return "unregistered"; 1868 case SPDK_POLLER_STATE_PAUSING: 1869 return "pausing"; 1870 case SPDK_POLLER_STATE_PAUSED: 1871 return "paused"; 1872 default: 1873 return NULL; 1874 } 1875 } 1876 1877 uint64_t 1878 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1879 { 1880 return poller->period_ticks; 1881 } 1882 1883 void 1884 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1885 { 1886 stats->run_count = poller->run_count; 1887 stats->busy_count = poller->busy_count; 1888 } 1889 1890 struct spdk_poller * 1891 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1892 { 1893 return TAILQ_FIRST(&thread->active_pollers); 1894 } 1895 1896 struct spdk_poller * 1897 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1898 { 1899 return TAILQ_NEXT(prev, tailq); 1900 } 1901 1902 struct spdk_poller * 1903 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1904 { 1905 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1906 } 1907 1908 struct spdk_poller * 1909 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1910 { 1911 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1912 } 1913 1914 struct spdk_poller * 1915 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1916 { 1917 return TAILQ_FIRST(&thread->paused_pollers); 1918 } 1919 1920 struct spdk_poller * 1921 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1922 { 1923 return TAILQ_NEXT(prev, tailq); 1924 } 1925 1926 struct spdk_io_channel * 1927 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1928 { 1929 return RB_MIN(io_channel_tree, &thread->io_channels); 1930 } 1931 1932 struct spdk_io_channel * 1933 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1934 { 1935 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1936 } 1937 1938 struct call_thread { 1939 struct spdk_thread *cur_thread; 1940 spdk_msg_fn fn; 1941 void *ctx; 1942 1943 struct spdk_thread *orig_thread; 1944 spdk_msg_fn cpl; 1945 }; 1946 1947 static void 1948 _on_thread(void *ctx) 1949 { 1950 struct call_thread *ct = ctx; 1951 int rc __attribute__((unused)); 1952 1953 ct->fn(ct->ctx); 1954 1955 pthread_mutex_lock(&g_devlist_mutex); 1956 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1957 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 1958 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 1959 ct->cur_thread->name); 1960 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1961 } 1962 pthread_mutex_unlock(&g_devlist_mutex); 1963 1964 if (!ct->cur_thread) { 1965 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1966 1967 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1968 free(ctx); 1969 } else { 1970 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1971 ct->cur_thread->name); 1972 1973 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1974 } 1975 assert(rc == 0); 1976 } 1977 1978 void 1979 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1980 { 1981 struct call_thread *ct; 1982 struct spdk_thread *thread; 1983 int rc __attribute__((unused)); 1984 1985 ct = calloc(1, sizeof(*ct)); 1986 if (!ct) { 1987 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1988 cpl(ctx); 1989 return; 1990 } 1991 1992 ct->fn = fn; 1993 ct->ctx = ctx; 1994 ct->cpl = cpl; 1995 1996 thread = _get_thread(); 1997 if (!thread) { 1998 SPDK_ERRLOG("No thread allocated\n"); 1999 free(ct); 2000 cpl(ctx); 2001 return; 2002 } 2003 ct->orig_thread = thread; 2004 2005 pthread_mutex_lock(&g_devlist_mutex); 2006 ct->cur_thread = TAILQ_FIRST(&g_threads); 2007 pthread_mutex_unlock(&g_devlist_mutex); 2008 2009 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 2010 ct->orig_thread->name); 2011 2012 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 2013 assert(rc == 0); 2014 } 2015 2016 static inline void 2017 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 2018 { 2019 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 2020 return; 2021 } 2022 2023 if (!poller->set_intr_cb_fn) { 2024 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 2025 assert(false); 2026 return; 2027 } 2028 2029 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 2030 } 2031 2032 void 2033 spdk_thread_set_interrupt_mode(bool enable_interrupt) 2034 { 2035 struct spdk_thread *thread = _get_thread(); 2036 struct spdk_poller *poller, *tmp; 2037 2038 assert(thread); 2039 assert(spdk_interrupt_mode_is_enabled()); 2040 2041 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 2042 thread->name, enable_interrupt ? "intr" : "poll", 2043 thread->in_interrupt ? "intr" : "poll"); 2044 2045 if (thread->in_interrupt == enable_interrupt) { 2046 return; 2047 } 2048 2049 /* Set pollers to expected mode */ 2050 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 2051 poller_set_interrupt_mode(poller, enable_interrupt); 2052 } 2053 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 2054 poller_set_interrupt_mode(poller, enable_interrupt); 2055 } 2056 /* All paused pollers will go to work in interrupt mode */ 2057 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 2058 poller_set_interrupt_mode(poller, enable_interrupt); 2059 } 2060 2061 thread->in_interrupt = enable_interrupt; 2062 return; 2063 } 2064 2065 static struct io_device * 2066 io_device_get(void *io_device) 2067 { 2068 struct io_device find = {}; 2069 2070 find.io_device = io_device; 2071 return RB_FIND(io_device_tree, &g_io_devices, &find); 2072 } 2073 2074 void 2075 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 2076 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 2077 const char *name) 2078 { 2079 struct io_device *dev, *tmp; 2080 struct spdk_thread *thread; 2081 2082 assert(io_device != NULL); 2083 assert(create_cb != NULL); 2084 assert(destroy_cb != NULL); 2085 2086 thread = spdk_get_thread(); 2087 if (!thread) { 2088 SPDK_ERRLOG("called from non-SPDK thread\n"); 2089 assert(false); 2090 return; 2091 } 2092 2093 dev = calloc(1, sizeof(struct io_device)); 2094 if (dev == NULL) { 2095 SPDK_ERRLOG("could not allocate io_device\n"); 2096 return; 2097 } 2098 2099 dev->io_device = io_device; 2100 if (name) { 2101 snprintf(dev->name, sizeof(dev->name), "%s", name); 2102 } else { 2103 snprintf(dev->name, sizeof(dev->name), "%p", dev); 2104 } 2105 dev->create_cb = create_cb; 2106 dev->destroy_cb = destroy_cb; 2107 dev->unregister_cb = NULL; 2108 dev->ctx_size = ctx_size; 2109 dev->for_each_count = 0; 2110 dev->unregistered = false; 2111 dev->refcnt = 0; 2112 2113 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 2114 dev->name, dev->io_device, thread->name); 2115 2116 pthread_mutex_lock(&g_devlist_mutex); 2117 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 2118 if (tmp != NULL) { 2119 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 2120 io_device, tmp->name, dev->name); 2121 free(dev); 2122 } 2123 2124 pthread_mutex_unlock(&g_devlist_mutex); 2125 } 2126 2127 static void 2128 _finish_unregister(void *arg) 2129 { 2130 struct io_device *dev = arg; 2131 struct spdk_thread *thread; 2132 2133 thread = spdk_get_thread(); 2134 assert(thread == dev->unregister_thread); 2135 2136 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2137 dev->name, dev->io_device, thread->name); 2138 2139 assert(thread->pending_unregister_count > 0); 2140 thread->pending_unregister_count--; 2141 2142 dev->unregister_cb(dev->io_device); 2143 free(dev); 2144 } 2145 2146 static void 2147 io_device_free(struct io_device *dev) 2148 { 2149 int rc __attribute__((unused)); 2150 2151 if (dev->unregister_cb == NULL) { 2152 free(dev); 2153 } else { 2154 assert(dev->unregister_thread != NULL); 2155 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2156 dev->name, dev->io_device, dev->unregister_thread->name); 2157 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2158 assert(rc == 0); 2159 } 2160 } 2161 2162 void 2163 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2164 { 2165 struct io_device *dev; 2166 uint32_t refcnt; 2167 struct spdk_thread *thread; 2168 2169 thread = spdk_get_thread(); 2170 if (!thread) { 2171 SPDK_ERRLOG("called from non-SPDK thread\n"); 2172 assert(false); 2173 return; 2174 } 2175 2176 pthread_mutex_lock(&g_devlist_mutex); 2177 dev = io_device_get(io_device); 2178 if (!dev) { 2179 SPDK_ERRLOG("io_device %p not found\n", io_device); 2180 assert(false); 2181 pthread_mutex_unlock(&g_devlist_mutex); 2182 return; 2183 } 2184 2185 /* The for_each_count check differentiates the user attempting to unregister the 2186 * device a second time, from the internal call to this function that occurs 2187 * after the for_each_count reaches 0. 2188 */ 2189 if (dev->pending_unregister && dev->for_each_count > 0) { 2190 SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device); 2191 assert(false); 2192 pthread_mutex_unlock(&g_devlist_mutex); 2193 return; 2194 } 2195 2196 dev->unregister_cb = unregister_cb; 2197 dev->unregister_thread = thread; 2198 2199 if (dev->for_each_count > 0) { 2200 SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2201 dev->name, io_device, dev->for_each_count); 2202 dev->pending_unregister = true; 2203 pthread_mutex_unlock(&g_devlist_mutex); 2204 return; 2205 } 2206 2207 dev->unregistered = true; 2208 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2209 refcnt = dev->refcnt; 2210 pthread_mutex_unlock(&g_devlist_mutex); 2211 2212 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2213 dev->name, dev->io_device, thread->name); 2214 2215 if (unregister_cb) { 2216 thread->pending_unregister_count++; 2217 } 2218 2219 if (refcnt > 0) { 2220 /* defer deletion */ 2221 return; 2222 } 2223 2224 io_device_free(dev); 2225 } 2226 2227 const char * 2228 spdk_io_device_get_name(struct io_device *dev) 2229 { 2230 return dev->name; 2231 } 2232 2233 static struct spdk_io_channel * 2234 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2235 { 2236 struct spdk_io_channel find = {}; 2237 2238 find.dev = dev; 2239 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2240 } 2241 2242 struct spdk_io_channel * 2243 spdk_get_io_channel(void *io_device) 2244 { 2245 struct spdk_io_channel *ch; 2246 struct spdk_thread *thread; 2247 struct io_device *dev; 2248 int rc; 2249 2250 pthread_mutex_lock(&g_devlist_mutex); 2251 dev = io_device_get(io_device); 2252 if (dev == NULL) { 2253 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2254 pthread_mutex_unlock(&g_devlist_mutex); 2255 return NULL; 2256 } 2257 2258 thread = _get_thread(); 2259 if (!thread) { 2260 SPDK_ERRLOG("No thread allocated\n"); 2261 pthread_mutex_unlock(&g_devlist_mutex); 2262 return NULL; 2263 } 2264 2265 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2266 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2267 pthread_mutex_unlock(&g_devlist_mutex); 2268 return NULL; 2269 } 2270 2271 ch = thread_get_io_channel(thread, dev); 2272 if (ch != NULL) { 2273 ch->ref++; 2274 2275 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2276 ch, dev->name, dev->io_device, thread->name, ch->ref); 2277 2278 /* 2279 * An I/O channel already exists for this device on this 2280 * thread, so return it. 2281 */ 2282 pthread_mutex_unlock(&g_devlist_mutex); 2283 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2284 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2285 return ch; 2286 } 2287 2288 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2289 if (ch == NULL) { 2290 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2291 pthread_mutex_unlock(&g_devlist_mutex); 2292 return NULL; 2293 } 2294 2295 ch->dev = dev; 2296 ch->destroy_cb = dev->destroy_cb; 2297 ch->thread = thread; 2298 ch->ref = 1; 2299 ch->destroy_ref = 0; 2300 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2301 2302 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2303 ch, dev->name, dev->io_device, thread->name, ch->ref); 2304 2305 dev->refcnt++; 2306 2307 pthread_mutex_unlock(&g_devlist_mutex); 2308 2309 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2310 if (rc != 0) { 2311 pthread_mutex_lock(&g_devlist_mutex); 2312 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2313 dev->refcnt--; 2314 free(ch); 2315 pthread_mutex_unlock(&g_devlist_mutex); 2316 return NULL; 2317 } 2318 2319 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2320 return ch; 2321 } 2322 2323 static void 2324 put_io_channel(void *arg) 2325 { 2326 struct spdk_io_channel *ch = arg; 2327 bool do_remove_dev = true; 2328 struct spdk_thread *thread; 2329 2330 thread = spdk_get_thread(); 2331 if (!thread) { 2332 SPDK_ERRLOG("called from non-SPDK thread\n"); 2333 assert(false); 2334 return; 2335 } 2336 2337 SPDK_DEBUGLOG(thread, 2338 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2339 ch, ch->dev->name, ch->dev->io_device, thread->name); 2340 2341 assert(ch->thread == thread); 2342 2343 ch->destroy_ref--; 2344 2345 if (ch->ref > 0 || ch->destroy_ref > 0) { 2346 /* 2347 * Another reference to the associated io_device was requested 2348 * after this message was sent but before it had a chance to 2349 * execute. 2350 */ 2351 return; 2352 } 2353 2354 pthread_mutex_lock(&g_devlist_mutex); 2355 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2356 pthread_mutex_unlock(&g_devlist_mutex); 2357 2358 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2359 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2360 2361 pthread_mutex_lock(&g_devlist_mutex); 2362 ch->dev->refcnt--; 2363 2364 if (!ch->dev->unregistered) { 2365 do_remove_dev = false; 2366 } 2367 2368 if (ch->dev->refcnt > 0) { 2369 do_remove_dev = false; 2370 } 2371 2372 pthread_mutex_unlock(&g_devlist_mutex); 2373 2374 if (do_remove_dev) { 2375 io_device_free(ch->dev); 2376 } 2377 free(ch); 2378 } 2379 2380 void 2381 spdk_put_io_channel(struct spdk_io_channel *ch) 2382 { 2383 struct spdk_thread *thread; 2384 int rc __attribute__((unused)); 2385 2386 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2387 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2388 2389 thread = spdk_get_thread(); 2390 if (!thread) { 2391 SPDK_ERRLOG("called from non-SPDK thread\n"); 2392 assert(false); 2393 return; 2394 } 2395 2396 if (ch->thread != thread) { 2397 wrong_thread(__func__, "ch", ch->thread, thread); 2398 return; 2399 } 2400 2401 SPDK_DEBUGLOG(thread, 2402 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2403 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2404 2405 ch->ref--; 2406 2407 if (ch->ref == 0) { 2408 ch->destroy_ref++; 2409 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2410 assert(rc == 0); 2411 } 2412 } 2413 2414 struct spdk_io_channel * 2415 spdk_io_channel_from_ctx(void *ctx) 2416 { 2417 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2418 } 2419 2420 struct spdk_thread * 2421 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2422 { 2423 return ch->thread; 2424 } 2425 2426 void * 2427 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2428 { 2429 return ch->dev->io_device; 2430 } 2431 2432 const char * 2433 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2434 { 2435 return spdk_io_device_get_name(ch->dev); 2436 } 2437 2438 int 2439 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2440 { 2441 return ch->ref; 2442 } 2443 2444 struct spdk_io_channel_iter { 2445 void *io_device; 2446 struct io_device *dev; 2447 spdk_channel_msg fn; 2448 int status; 2449 void *ctx; 2450 struct spdk_io_channel *ch; 2451 2452 struct spdk_thread *cur_thread; 2453 2454 struct spdk_thread *orig_thread; 2455 spdk_channel_for_each_cpl cpl; 2456 }; 2457 2458 void * 2459 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2460 { 2461 return i->io_device; 2462 } 2463 2464 struct spdk_io_channel * 2465 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2466 { 2467 return i->ch; 2468 } 2469 2470 void * 2471 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2472 { 2473 return i->ctx; 2474 } 2475 2476 static void 2477 _call_completion(void *ctx) 2478 { 2479 struct spdk_io_channel_iter *i = ctx; 2480 2481 if (i->cpl != NULL) { 2482 i->cpl(i, i->status); 2483 } 2484 free(i); 2485 } 2486 2487 static void 2488 _call_channel(void *ctx) 2489 { 2490 struct spdk_io_channel_iter *i = ctx; 2491 struct spdk_io_channel *ch; 2492 2493 /* 2494 * It is possible that the channel was deleted before this 2495 * message had a chance to execute. If so, skip calling 2496 * the fn() on this thread. 2497 */ 2498 pthread_mutex_lock(&g_devlist_mutex); 2499 ch = thread_get_io_channel(i->cur_thread, i->dev); 2500 pthread_mutex_unlock(&g_devlist_mutex); 2501 2502 if (ch) { 2503 i->fn(i); 2504 } else { 2505 spdk_for_each_channel_continue(i, 0); 2506 } 2507 } 2508 2509 void 2510 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2511 spdk_channel_for_each_cpl cpl) 2512 { 2513 struct spdk_thread *thread; 2514 struct spdk_io_channel *ch; 2515 struct spdk_io_channel_iter *i; 2516 int rc __attribute__((unused)); 2517 2518 i = calloc(1, sizeof(*i)); 2519 if (!i) { 2520 SPDK_ERRLOG("Unable to allocate iterator\n"); 2521 assert(false); 2522 return; 2523 } 2524 2525 i->io_device = io_device; 2526 i->fn = fn; 2527 i->ctx = ctx; 2528 i->cpl = cpl; 2529 i->orig_thread = _get_thread(); 2530 2531 pthread_mutex_lock(&g_devlist_mutex); 2532 i->dev = io_device_get(io_device); 2533 if (i->dev == NULL) { 2534 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2535 assert(false); 2536 i->status = -ENODEV; 2537 goto end; 2538 } 2539 2540 /* Do not allow new for_each operations if we are already waiting to unregister 2541 * the device for other for_each operations to complete. 2542 */ 2543 if (i->dev->pending_unregister) { 2544 SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device); 2545 i->status = -ENODEV; 2546 goto end; 2547 } 2548 2549 TAILQ_FOREACH(thread, &g_threads, tailq) { 2550 ch = thread_get_io_channel(thread, i->dev); 2551 if (ch != NULL) { 2552 ch->dev->for_each_count++; 2553 i->cur_thread = thread; 2554 i->ch = ch; 2555 pthread_mutex_unlock(&g_devlist_mutex); 2556 rc = spdk_thread_send_msg(thread, _call_channel, i); 2557 assert(rc == 0); 2558 return; 2559 } 2560 } 2561 2562 end: 2563 pthread_mutex_unlock(&g_devlist_mutex); 2564 2565 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2566 assert(rc == 0); 2567 } 2568 2569 static void 2570 __pending_unregister(void *arg) 2571 { 2572 struct io_device *dev = arg; 2573 2574 assert(dev->pending_unregister); 2575 assert(dev->for_each_count == 0); 2576 spdk_io_device_unregister(dev->io_device, dev->unregister_cb); 2577 } 2578 2579 void 2580 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2581 { 2582 struct spdk_thread *thread; 2583 struct spdk_io_channel *ch; 2584 struct io_device *dev; 2585 int rc __attribute__((unused)); 2586 2587 assert(i->cur_thread == spdk_get_thread()); 2588 2589 i->status = status; 2590 2591 pthread_mutex_lock(&g_devlist_mutex); 2592 dev = i->dev; 2593 if (status) { 2594 goto end; 2595 } 2596 2597 thread = TAILQ_NEXT(i->cur_thread, tailq); 2598 while (thread) { 2599 ch = thread_get_io_channel(thread, dev); 2600 if (ch != NULL) { 2601 i->cur_thread = thread; 2602 i->ch = ch; 2603 pthread_mutex_unlock(&g_devlist_mutex); 2604 rc = spdk_thread_send_msg(thread, _call_channel, i); 2605 assert(rc == 0); 2606 return; 2607 } 2608 thread = TAILQ_NEXT(thread, tailq); 2609 } 2610 2611 end: 2612 dev->for_each_count--; 2613 i->ch = NULL; 2614 pthread_mutex_unlock(&g_devlist_mutex); 2615 2616 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2617 assert(rc == 0); 2618 2619 pthread_mutex_lock(&g_devlist_mutex); 2620 if (dev->pending_unregister && dev->for_each_count == 0) { 2621 rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev); 2622 assert(rc == 0); 2623 } 2624 pthread_mutex_unlock(&g_devlist_mutex); 2625 } 2626 2627 static void 2628 thread_interrupt_destroy(struct spdk_thread *thread) 2629 { 2630 struct spdk_fd_group *fgrp = thread->fgrp; 2631 2632 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2633 2634 if (thread->msg_fd < 0) { 2635 return; 2636 } 2637 2638 spdk_fd_group_remove(fgrp, thread->msg_fd); 2639 close(thread->msg_fd); 2640 thread->msg_fd = -1; 2641 2642 spdk_fd_group_destroy(fgrp); 2643 thread->fgrp = NULL; 2644 } 2645 2646 #ifdef __linux__ 2647 static int 2648 thread_interrupt_msg_process(void *arg) 2649 { 2650 struct spdk_thread *thread = arg; 2651 struct spdk_thread *orig_thread; 2652 uint32_t msg_count; 2653 spdk_msg_fn critical_msg; 2654 int rc = 0; 2655 uint64_t notify = 1; 2656 2657 assert(spdk_interrupt_mode_is_enabled()); 2658 2659 orig_thread = spdk_get_thread(); 2660 spdk_set_thread(thread); 2661 2662 /* There may be race between msg_acknowledge and another producer's msg_notify, 2663 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2664 * This can avoid msg notification missing. 2665 */ 2666 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2667 if (rc < 0 && errno != EAGAIN) { 2668 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2669 } 2670 2671 critical_msg = thread->critical_msg; 2672 if (spdk_unlikely(critical_msg != NULL)) { 2673 critical_msg(NULL); 2674 thread->critical_msg = NULL; 2675 rc = 1; 2676 } 2677 2678 msg_count = msg_queue_run_batch(thread, 0); 2679 if (msg_count) { 2680 rc = 1; 2681 } 2682 2683 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2684 if (spdk_unlikely(!thread->in_interrupt)) { 2685 /* The thread transitioned to poll mode in a msg during the above processing. 2686 * Clear msg_fd since thread messages will be polled directly in poll mode. 2687 */ 2688 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2689 if (rc < 0 && errno != EAGAIN) { 2690 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 2691 } 2692 } 2693 2694 spdk_set_thread(orig_thread); 2695 return rc; 2696 } 2697 2698 static int 2699 thread_interrupt_create(struct spdk_thread *thread) 2700 { 2701 int rc; 2702 2703 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2704 2705 rc = spdk_fd_group_create(&thread->fgrp); 2706 if (rc) { 2707 return rc; 2708 } 2709 2710 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2711 if (thread->msg_fd < 0) { 2712 rc = -errno; 2713 spdk_fd_group_destroy(thread->fgrp); 2714 thread->fgrp = NULL; 2715 2716 return rc; 2717 } 2718 2719 return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd, 2720 thread_interrupt_msg_process, thread); 2721 } 2722 #else 2723 static int 2724 thread_interrupt_create(struct spdk_thread *thread) 2725 { 2726 return -ENOTSUP; 2727 } 2728 #endif 2729 2730 static int 2731 _interrupt_wrapper(void *ctx) 2732 { 2733 struct spdk_interrupt *intr = ctx; 2734 struct spdk_thread *orig_thread, *thread; 2735 int rc; 2736 2737 orig_thread = spdk_get_thread(); 2738 thread = intr->thread; 2739 2740 spdk_set_thread(thread); 2741 2742 SPDK_DTRACE_PROBE4(interrupt_fd_process, intr->name, intr->efd, 2743 intr->fn, intr->arg); 2744 2745 rc = intr->fn(intr->arg); 2746 2747 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2748 2749 spdk_set_thread(orig_thread); 2750 2751 return rc; 2752 } 2753 2754 struct spdk_interrupt * 2755 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2756 void *arg, const char *name) 2757 { 2758 struct spdk_thread *thread; 2759 struct spdk_interrupt *intr; 2760 int ret; 2761 2762 thread = spdk_get_thread(); 2763 if (!thread) { 2764 assert(false); 2765 return NULL; 2766 } 2767 2768 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2769 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2770 return NULL; 2771 } 2772 2773 intr = calloc(1, sizeof(*intr)); 2774 if (intr == NULL) { 2775 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2776 return NULL; 2777 } 2778 2779 if (name) { 2780 snprintf(intr->name, sizeof(intr->name), "%s", name); 2781 } else { 2782 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2783 } 2784 2785 intr->efd = efd; 2786 intr->thread = thread; 2787 intr->fn = fn; 2788 intr->arg = arg; 2789 2790 ret = spdk_fd_group_add(thread->fgrp, efd, _interrupt_wrapper, intr, intr->name); 2791 2792 if (ret != 0) { 2793 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2794 thread->name, efd, spdk_strerror(-ret)); 2795 free(intr); 2796 return NULL; 2797 } 2798 2799 return intr; 2800 } 2801 2802 void 2803 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2804 { 2805 struct spdk_thread *thread; 2806 struct spdk_interrupt *intr; 2807 2808 intr = *pintr; 2809 if (intr == NULL) { 2810 return; 2811 } 2812 2813 *pintr = NULL; 2814 2815 thread = spdk_get_thread(); 2816 if (!thread) { 2817 assert(false); 2818 return; 2819 } 2820 2821 if (intr->thread != thread) { 2822 wrong_thread(__func__, intr->name, intr->thread, thread); 2823 return; 2824 } 2825 2826 spdk_fd_group_remove(thread->fgrp, intr->efd); 2827 free(intr); 2828 } 2829 2830 int 2831 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2832 enum spdk_interrupt_event_types event_types) 2833 { 2834 struct spdk_thread *thread; 2835 2836 thread = spdk_get_thread(); 2837 if (!thread) { 2838 assert(false); 2839 return -EINVAL; 2840 } 2841 2842 if (intr->thread != thread) { 2843 wrong_thread(__func__, intr->name, intr->thread, thread); 2844 return -EINVAL; 2845 } 2846 2847 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2848 } 2849 2850 int 2851 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2852 { 2853 return spdk_fd_group_get_fd(thread->fgrp); 2854 } 2855 2856 struct spdk_fd_group * 2857 spdk_thread_get_interrupt_fd_group(struct spdk_thread *thread) 2858 { 2859 return thread->fgrp; 2860 } 2861 2862 static bool g_interrupt_mode = false; 2863 2864 int 2865 spdk_interrupt_mode_enable(void) 2866 { 2867 /* It must be called once prior to initializing the threading library. 2868 * g_spdk_msg_mempool will be valid if thread library is initialized. 2869 */ 2870 if (g_spdk_msg_mempool) { 2871 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 2872 return -1; 2873 } 2874 2875 #ifdef __linux__ 2876 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2877 g_interrupt_mode = true; 2878 return 0; 2879 #else 2880 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2881 g_interrupt_mode = false; 2882 return -ENOTSUP; 2883 #endif 2884 } 2885 2886 bool 2887 spdk_interrupt_mode_is_enabled(void) 2888 { 2889 return g_interrupt_mode; 2890 } 2891 2892 #define SSPIN_DEBUG_STACK_FRAMES 16 2893 2894 struct sspin_stack { 2895 void *addrs[SSPIN_DEBUG_STACK_FRAMES]; 2896 uint32_t depth; 2897 }; 2898 2899 struct spdk_spinlock_internal { 2900 struct sspin_stack init_stack; 2901 struct sspin_stack lock_stack; 2902 struct sspin_stack unlock_stack; 2903 }; 2904 2905 static void 2906 sspin_init_internal(struct spdk_spinlock *sspin) 2907 { 2908 #ifdef DEBUG 2909 sspin->internal = calloc(1, sizeof(*sspin->internal)); 2910 #endif 2911 } 2912 2913 static void 2914 sspin_fini_internal(struct spdk_spinlock *sspin) 2915 { 2916 #ifdef DEBUG 2917 free(sspin->internal); 2918 sspin->internal = NULL; 2919 #endif 2920 } 2921 2922 #ifdef DEBUG 2923 #define SSPIN_GET_STACK(sspin, which) \ 2924 do { \ 2925 if (sspin->internal != NULL) { \ 2926 struct sspin_stack *stack = &sspin->internal->which ## _stack; \ 2927 stack->depth = backtrace(stack->addrs, SPDK_COUNTOF(stack->addrs)); \ 2928 } \ 2929 } while (0) 2930 #else 2931 #define SSPIN_GET_STACK(sspin, which) do { } while (0) 2932 #endif 2933 2934 static void 2935 sspin_stack_print(const char *title, const struct sspin_stack *sspin_stack) 2936 { 2937 char **stack; 2938 size_t i; 2939 2940 stack = backtrace_symbols(sspin_stack->addrs, sspin_stack->depth); 2941 if (stack == NULL) { 2942 SPDK_ERRLOG("Out of memory while allocate stack for %s\n", title); 2943 return; 2944 } 2945 SPDK_ERRLOG(" %s:\n", title); 2946 for (i = 0; i < sspin_stack->depth; i++) { 2947 /* 2948 * This does not print line numbers. In gdb, use something like "list *0x444b6b" or 2949 * "list *sspin_stack->addrs[0]". Or more conveniently, load the spdk gdb macros 2950 * and use use "print *sspin" or "print sspin->internal.lock_stack". See 2951 * gdb_macros.md in the docs directory for details. 2952 */ 2953 SPDK_ERRLOG(" #%" PRIu64 ": %s\n", i, stack[i]); 2954 } 2955 free(stack); 2956 } 2957 2958 static void 2959 sspin_stacks_print(const struct spdk_spinlock *sspin) 2960 { 2961 if (sspin->internal == NULL) { 2962 return; 2963 } 2964 SPDK_ERRLOG("spinlock %p\n", sspin); 2965 sspin_stack_print("Lock initalized at", &sspin->internal->init_stack); 2966 sspin_stack_print("Last locked at", &sspin->internal->lock_stack); 2967 sspin_stack_print("Last unlocked at", &sspin->internal->unlock_stack); 2968 } 2969 2970 void 2971 spdk_spin_init(struct spdk_spinlock *sspin) 2972 { 2973 int rc; 2974 2975 memset(sspin, 0, sizeof(*sspin)); 2976 rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE); 2977 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 2978 sspin_init_internal(sspin); 2979 SSPIN_GET_STACK(sspin, init); 2980 sspin->initialized = true; 2981 } 2982 2983 void 2984 spdk_spin_destroy(struct spdk_spinlock *sspin) 2985 { 2986 int rc; 2987 2988 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 2989 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 2990 SPIN_ASSERT_LOG_STACKS(sspin->thread == NULL, SPIN_ERR_LOCK_HELD, sspin); 2991 2992 rc = pthread_spin_destroy(&sspin->spinlock); 2993 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 2994 2995 sspin_fini_internal(sspin); 2996 sspin->initialized = false; 2997 sspin->destroyed = true; 2998 } 2999 3000 void 3001 spdk_spin_lock(struct spdk_spinlock *sspin) 3002 { 3003 struct spdk_thread *thread = spdk_get_thread(); 3004 int rc; 3005 3006 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3007 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3008 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3009 SPIN_ASSERT_LOG_STACKS(thread != sspin->thread, SPIN_ERR_DEADLOCK, sspin); 3010 3011 rc = pthread_spin_lock(&sspin->spinlock); 3012 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3013 3014 sspin->thread = thread; 3015 sspin->thread->lock_count++; 3016 3017 SSPIN_GET_STACK(sspin, lock); 3018 } 3019 3020 void 3021 spdk_spin_unlock(struct spdk_spinlock *sspin) 3022 { 3023 struct spdk_thread *thread = spdk_get_thread(); 3024 int rc; 3025 3026 SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin); 3027 SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin); 3028 SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin); 3029 SPIN_ASSERT_LOG_STACKS(thread == sspin->thread, SPIN_ERR_WRONG_THREAD, sspin); 3030 3031 SPIN_ASSERT_LOG_STACKS(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT, sspin); 3032 thread->lock_count--; 3033 sspin->thread = NULL; 3034 3035 SSPIN_GET_STACK(sspin, unlock); 3036 3037 rc = pthread_spin_unlock(&sspin->spinlock); 3038 SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin); 3039 } 3040 3041 bool 3042 spdk_spin_held(struct spdk_spinlock *sspin) 3043 { 3044 struct spdk_thread *thread = spdk_get_thread(); 3045 3046 SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false); 3047 3048 return sspin->thread == thread; 3049 } 3050 3051 SPDK_LOG_REGISTER_COMPONENT(thread) 3052