/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2016 Intel Corporation.
 * All rights reserved.
 * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/trace.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"
#include "spdk_internal/usdt.h"
#include "thread_internal.h"

#include "spdk_internal/trace_defs.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
#define SPDK_MAX_POLLER_NAME_LEN	256
#define SPDK_MAX_THREAD_NAME_LEN	256

static struct spdk_thread *g_app_thread;

enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,

	/* The poller is in the process of being paused. It will be paused
	 * during the next time it's supposed to be executed.
	 */
	SPDK_POLLER_STATE_PAUSING,

	/* The poller is registered but currently paused. It's on the
	 * paused_pollers list.
	 */
	SPDK_POLLER_STATE_PAUSED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller) tailq;
	RB_ENTRY(spdk_poller) node;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state state;

	uint64_t period_ticks;
	uint64_t next_run_tick;
	uint64_t run_count;
	uint64_t busy_count;
	uint64_t id;
	spdk_poller_fn fn;
	void *arg;
	struct spdk_thread *thread;
	/* Native interruptfd for period or busy poller */
	int interruptfd;
	spdk_poller_set_interrupt_mode_cb set_intr_cb_fn;
	void *set_intr_cb_arg;

	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_thread_state {
	/* The thread is processing pollers and messages via spdk_thread_poll(). */
	SPDK_THREAD_STATE_RUNNING,

	/* The thread is in the process of termination. It reaps unregistering
	 * pollers and releases I/O channels.
	 */
	SPDK_THREAD_STATE_EXITING,

	/* The thread has exited. It is ready to call spdk_thread_destroy(). */
	SPDK_THREAD_STATE_EXITED,
};

struct spdk_thread {
	uint64_t tsc_last;
	struct spdk_thread_stats stats;
	/*
	 * Contains pollers actively running on this thread. Pollers
	 * are run round-robin. The thread takes one poller from the head
	 * of the ring, executes it, then puts it back at the tail of
	 * the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers;
	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers;
	struct spdk_poller *first_timed_poller;
	/*
	 * Contains paused pollers. Pollers on this queue are waiting until
	 * they are resumed (in which case they're put onto the active/timer
	 * queues) or unregistered.
114 */ 115 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 116 struct spdk_ring *messages; 117 int msg_fd; 118 SLIST_HEAD(, spdk_msg) msg_cache; 119 size_t msg_cache_count; 120 spdk_msg_fn critical_msg; 121 uint64_t id; 122 uint64_t next_poller_id; 123 enum spdk_thread_state state; 124 int pending_unregister_count; 125 126 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 127 TAILQ_ENTRY(spdk_thread) tailq; 128 129 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 130 struct spdk_cpuset cpumask; 131 uint64_t exit_timeout_tsc; 132 133 int32_t lock_count; 134 135 /* Indicates whether this spdk_thread currently runs in interrupt. */ 136 bool in_interrupt; 137 bool poller_unregistered; 138 struct spdk_fd_group *fgrp; 139 140 /* User context allocated at the end */ 141 uint8_t ctx[0]; 142 }; 143 144 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 145 146 static spdk_new_thread_fn g_new_thread_fn = NULL; 147 static spdk_thread_op_fn g_thread_op_fn = NULL; 148 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 149 static size_t g_ctx_sz = 0; 150 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 151 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 152 * SPDK application is required. 153 */ 154 static uint64_t g_thread_id = 1; 155 156 enum spin_error { 157 SPIN_ERR_NONE, 158 /* Trying to use an SPDK lock while not on an SPDK thread */ 159 SPIN_ERR_NOT_SPDK_THREAD, 160 /* Trying to lock a lock already held by this SPDK thread */ 161 SPIN_ERR_DEADLOCK, 162 /* Trying to unlock a lock not held by this SPDK thread */ 163 SPIN_ERR_WRONG_THREAD, 164 /* pthread_spin_*() returned an error */ 165 SPIN_ERR_PTHREAD, 166 /* Trying to destroy a lock that is held */ 167 SPIN_ERR_LOCK_HELD, 168 /* lock_count is invalid */ 169 SPIN_ERR_LOCK_COUNT, 170 /* 171 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to 172 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to 173 * deadlock when another SPDK thread on the same pthread tries to take that lock. 174 */ 175 SPIN_ERR_HOLD_DURING_SWITCH, 176 /* Must be last, not an actual error code */ 177 SPIN_ERR_LAST 178 }; 179 180 static const char *spin_error_strings[] = { 181 [SPIN_ERR_NONE] = "No error", 182 [SPIN_ERR_NOT_SPDK_THREAD] = "Not an SPDK thread", 183 [SPIN_ERR_DEADLOCK] = "Deadlock detected", 184 [SPIN_ERR_WRONG_THREAD] = "Unlock on wrong SPDK thread", 185 [SPIN_ERR_PTHREAD] = "Error from pthread_spinlock", 186 [SPIN_ERR_LOCK_HELD] = "Destroying a held spinlock", 187 [SPIN_ERR_LOCK_COUNT] = "Lock count is invalid", 188 [SPIN_ERR_HOLD_DURING_SWITCH] = "Lock(s) held while SPDK thread going off CPU", 189 }; 190 191 #define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \ 192 ? 
"Unknown error" : spin_error_strings[err] 193 194 static void 195 __posix_abort(enum spin_error err) 196 { 197 abort(); 198 } 199 200 typedef void (*spin_abort)(enum spin_error err); 201 spin_abort g_spin_abort_fn = __posix_abort; 202 203 #define SPIN_ASSERT_IMPL(cond, err, ret) \ 204 do { \ 205 if (spdk_unlikely(!(cond))) { \ 206 SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \ 207 SPIN_ERROR_STRING(err), #cond); \ 208 g_spin_abort_fn(err); \ 209 ret; \ 210 } \ 211 } while (0) 212 #define SPIN_ASSERT_RETURN_VOID(cond, err) SPIN_ASSERT_IMPL(cond, err, return) 213 #define SPIN_ASSERT_RETURN(cond, err, ret) SPIN_ASSERT_IMPL(cond, err, return ret) 214 #define SPIN_ASSERT(cond, err) SPIN_ASSERT_IMPL(cond, err,) 215 216 struct io_device { 217 void *io_device; 218 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 219 spdk_io_channel_create_cb create_cb; 220 spdk_io_channel_destroy_cb destroy_cb; 221 spdk_io_device_unregister_cb unregister_cb; 222 struct spdk_thread *unregister_thread; 223 uint32_t ctx_size; 224 uint32_t for_each_count; 225 RB_ENTRY(io_device) node; 226 227 uint32_t refcnt; 228 229 bool pending_unregister; 230 bool unregistered; 231 }; 232 233 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 234 235 static int 236 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 237 { 238 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 239 } 240 241 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 242 243 static int 244 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 245 { 246 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 247 } 248 249 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 250 251 struct spdk_msg { 252 spdk_msg_fn fn; 253 void *arg; 254 255 SLIST_ENTRY(spdk_msg) link; 256 }; 257 258 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 259 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 260 261 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 262 static uint32_t g_thread_count = 0; 263 264 static __thread struct spdk_thread *tls_thread = NULL; 265 266 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 267 { 268 spdk_trace_register_description("THREAD_IOCH_GET", 269 TRACE_THREAD_IOCH_GET, 270 OWNER_NONE, OBJECT_NONE, 0, 271 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 272 spdk_trace_register_description("THREAD_IOCH_PUT", 273 TRACE_THREAD_IOCH_PUT, 274 OWNER_NONE, OBJECT_NONE, 0, 275 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 276 } 277 278 /* 279 * If this compare function returns zero when two next_run_ticks are equal, 280 * the macro RB_INSERT() returns a pointer to the element with the same 281 * next_run_tick. 282 * 283 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 284 * to remove as a parameter. 285 * 286 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 287 * side by returning 1 when two next_run_ticks are equal. 
288 */ 289 static inline int 290 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 291 { 292 if (poller1->next_run_tick < poller2->next_run_tick) { 293 return -1; 294 } else { 295 return 1; 296 } 297 } 298 299 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 300 301 static inline struct spdk_thread * 302 _get_thread(void) 303 { 304 return tls_thread; 305 } 306 307 static int 308 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 309 { 310 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 311 312 g_ctx_sz = ctx_sz; 313 314 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 315 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 316 sizeof(struct spdk_msg), 317 0, /* No cache. We do our own. */ 318 SPDK_ENV_SOCKET_ID_ANY); 319 320 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 321 msg_mempool_sz); 322 323 if (!g_spdk_msg_mempool) { 324 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 325 return -ENOMEM; 326 } 327 328 return 0; 329 } 330 331 static void thread_interrupt_destroy(struct spdk_thread *thread); 332 static int thread_interrupt_create(struct spdk_thread *thread); 333 334 static void 335 _free_thread(struct spdk_thread *thread) 336 { 337 struct spdk_io_channel *ch; 338 struct spdk_msg *msg; 339 struct spdk_poller *poller, *ptmp; 340 341 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 342 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 343 thread->name, ch->dev->name); 344 } 345 346 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 347 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 348 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 349 poller->name); 350 } 351 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 352 free(poller); 353 } 354 355 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 356 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 357 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 358 poller->name); 359 } 360 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 361 free(poller); 362 } 363 364 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 365 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 366 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 367 free(poller); 368 } 369 370 pthread_mutex_lock(&g_devlist_mutex); 371 assert(g_thread_count > 0); 372 g_thread_count--; 373 TAILQ_REMOVE(&g_threads, thread, tailq); 374 pthread_mutex_unlock(&g_devlist_mutex); 375 376 msg = SLIST_FIRST(&thread->msg_cache); 377 while (msg != NULL) { 378 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 379 380 assert(thread->msg_cache_count > 0); 381 thread->msg_cache_count--; 382 spdk_mempool_put(g_spdk_msg_mempool, msg); 383 384 msg = SLIST_FIRST(&thread->msg_cache); 385 } 386 387 assert(thread->msg_cache_count == 0); 388 389 if (spdk_interrupt_mode_is_enabled()) { 390 thread_interrupt_destroy(thread); 391 } 392 393 spdk_ring_free(thread->messages); 394 free(thread); 395 } 396 397 int 398 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 399 { 400 assert(g_new_thread_fn == NULL); 401 assert(g_thread_op_fn == NULL); 402 403 if (new_thread_fn == NULL) { 404 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 405 } else { 406 g_new_thread_fn = new_thread_fn; 407 } 408 409 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 410 } 411 
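/*
 * Illustrative sketch (not part of this file's implementation): a minimal,
 * framework-less consumer of the lifecycle API above and below might
 * initialize the library, create a thread, drive it with spdk_thread_poll(),
 * and tear it down. Error handling is omitted and example_msg_fn is a
 * hypothetical callback.
 *
 *	static void
 *	example_msg_fn(void *ctx)
 *	{
 *		SPDK_NOTICELOG("running on thread %s\n",
 *			       spdk_thread_get_name(spdk_get_thread()));
 *	}
 *
 *	static void
 *	example_run_one_thread(void)
 *	{
 *		struct spdk_thread *thread;
 *
 *		spdk_thread_lib_init(NULL, 0);
 *		thread = spdk_thread_create("example", NULL);
 *		spdk_set_thread(thread);
 *
 *		spdk_thread_send_msg(thread, example_msg_fn, NULL);
 *		spdk_thread_poll(thread, 0, 0);	// executes example_msg_fn
 *
 *		spdk_thread_exit(thread);
 *		while (!spdk_thread_is_exited(thread)) {
 *			spdk_thread_poll(thread, 0, 0);
 *		}
 *		spdk_thread_destroy(thread);
 *		spdk_thread_lib_fini();
 *	}
 */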
412 int 413 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 414 spdk_thread_op_supported_fn thread_op_supported_fn, 415 size_t ctx_sz, size_t msg_mempool_sz) 416 { 417 assert(g_new_thread_fn == NULL); 418 assert(g_thread_op_fn == NULL); 419 assert(g_thread_op_supported_fn == NULL); 420 421 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 422 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 423 return -EINVAL; 424 } 425 426 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 427 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 428 } else { 429 g_thread_op_fn = thread_op_fn; 430 g_thread_op_supported_fn = thread_op_supported_fn; 431 } 432 433 return _thread_lib_init(ctx_sz, msg_mempool_sz); 434 } 435 436 void 437 spdk_thread_lib_fini(void) 438 { 439 struct io_device *dev; 440 441 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 442 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 443 } 444 445 g_new_thread_fn = NULL; 446 g_thread_op_fn = NULL; 447 g_thread_op_supported_fn = NULL; 448 g_ctx_sz = 0; 449 if (g_app_thread != NULL) { 450 _free_thread(g_app_thread); 451 g_app_thread = NULL; 452 } 453 454 if (g_spdk_msg_mempool) { 455 spdk_mempool_free(g_spdk_msg_mempool); 456 g_spdk_msg_mempool = NULL; 457 } 458 } 459 460 struct spdk_thread * 461 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask) 462 { 463 struct spdk_thread *thread, *null_thread; 464 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 465 int rc = 0, i; 466 467 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 468 if (!thread) { 469 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 470 return NULL; 471 } 472 473 if (cpumask) { 474 spdk_cpuset_copy(&thread->cpumask, cpumask); 475 } else { 476 spdk_cpuset_negate(&thread->cpumask); 477 } 478 479 RB_INIT(&thread->io_channels); 480 TAILQ_INIT(&thread->active_pollers); 481 RB_INIT(&thread->timed_pollers); 482 TAILQ_INIT(&thread->paused_pollers); 483 SLIST_INIT(&thread->msg_cache); 484 thread->msg_cache_count = 0; 485 486 thread->tsc_last = spdk_get_ticks(); 487 488 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 489 * ID exceeds UINT64_MAX a warning message is logged 490 */ 491 thread->next_poller_id = 1; 492 493 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 494 if (!thread->messages) { 495 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 496 free(thread); 497 return NULL; 498 } 499 500 /* Fill the local message pool cache. */ 501 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 502 if (rc == 0) { 503 /* If we can't populate the cache it's ok. The cache will get filled 504 * up organically as messages are passed to the thread. */ 505 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 506 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 507 thread->msg_cache_count++; 508 } 509 } 510 511 if (name) { 512 snprintf(thread->name, sizeof(thread->name), "%s", name); 513 } else { 514 snprintf(thread->name, sizeof(thread->name), "%p", thread); 515 } 516 517 pthread_mutex_lock(&g_devlist_mutex); 518 if (g_thread_id == 0) { 519 SPDK_ERRLOG("Thread ID rolled over. 
Further thread creation is not allowed.\n"); 520 pthread_mutex_unlock(&g_devlist_mutex); 521 _free_thread(thread); 522 return NULL; 523 } 524 thread->id = g_thread_id++; 525 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 526 g_thread_count++; 527 pthread_mutex_unlock(&g_devlist_mutex); 528 529 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 530 thread->id, thread->name); 531 532 if (spdk_interrupt_mode_is_enabled()) { 533 thread->in_interrupt = true; 534 rc = thread_interrupt_create(thread); 535 if (rc != 0) { 536 _free_thread(thread); 537 return NULL; 538 } 539 } 540 541 if (g_new_thread_fn) { 542 rc = g_new_thread_fn(thread); 543 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 544 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 545 } 546 547 if (rc != 0) { 548 _free_thread(thread); 549 return NULL; 550 } 551 552 thread->state = SPDK_THREAD_STATE_RUNNING; 553 554 /* If this is the first thread, save it as the app thread. Use an atomic 555 * compare + exchange to guard against crazy users who might try to 556 * call spdk_thread_create() simultaneously on multiple threads. 557 */ 558 null_thread = NULL; 559 __atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false, 560 __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); 561 562 return thread; 563 } 564 565 struct spdk_thread * 566 spdk_thread_get_app_thread(void) 567 { 568 return g_app_thread; 569 } 570 571 void 572 spdk_set_thread(struct spdk_thread *thread) 573 { 574 tls_thread = thread; 575 } 576 577 static void 578 thread_exit(struct spdk_thread *thread, uint64_t now) 579 { 580 struct spdk_poller *poller; 581 struct spdk_io_channel *ch; 582 583 if (now >= thread->exit_timeout_tsc) { 584 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 585 thread->name); 586 goto exited; 587 } 588 589 if (spdk_ring_count(thread->messages) > 0) { 590 SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name); 591 return; 592 } 593 594 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 595 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 596 SPDK_INFOLOG(thread, 597 "thread %s still has active poller %s\n", 598 thread->name, poller->name); 599 return; 600 } 601 } 602 603 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 604 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 605 SPDK_INFOLOG(thread, 606 "thread %s still has active timed poller %s\n", 607 thread->name, poller->name); 608 return; 609 } 610 } 611 612 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 613 SPDK_INFOLOG(thread, 614 "thread %s still has paused poller %s\n", 615 thread->name, poller->name); 616 return; 617 } 618 619 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 620 SPDK_INFOLOG(thread, 621 "thread %s still has channel for io_device %s\n", 622 thread->name, ch->dev->name); 623 return; 624 } 625 626 if (thread->pending_unregister_count > 0) { 627 SPDK_INFOLOG(thread, 628 "thread %s is still unregistering io_devices\n", 629 thread->name); 630 return; 631 } 632 633 exited: 634 thread->state = SPDK_THREAD_STATE_EXITED; 635 if (spdk_unlikely(thread->in_interrupt)) { 636 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 637 } 638 } 639 640 int 641 spdk_thread_exit(struct spdk_thread *thread) 642 { 643 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 644 645 assert(tls_thread == thread); 646 647 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 648 SPDK_INFOLOG(thread, 649 "thread %s is already exiting\n", 650 thread->name); 651 return 0; 
652 } 653 654 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 655 SPDK_THREAD_EXIT_TIMEOUT_SEC); 656 thread->state = SPDK_THREAD_STATE_EXITING; 657 return 0; 658 } 659 660 bool 661 spdk_thread_is_running(struct spdk_thread *thread) 662 { 663 return thread->state == SPDK_THREAD_STATE_RUNNING; 664 } 665 666 bool 667 spdk_thread_is_exited(struct spdk_thread *thread) 668 { 669 return thread->state == SPDK_THREAD_STATE_EXITED; 670 } 671 672 void 673 spdk_thread_destroy(struct spdk_thread *thread) 674 { 675 assert(thread != NULL); 676 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 677 678 assert(thread->state == SPDK_THREAD_STATE_EXITED); 679 680 if (tls_thread == thread) { 681 tls_thread = NULL; 682 } 683 684 /* To be safe, do not free the app thread until spdk_thread_lib_fini(). */ 685 if (thread != g_app_thread) { 686 _free_thread(thread); 687 } 688 } 689 690 void * 691 spdk_thread_get_ctx(struct spdk_thread *thread) 692 { 693 if (g_ctx_sz > 0) { 694 return thread->ctx; 695 } 696 697 return NULL; 698 } 699 700 struct spdk_cpuset * 701 spdk_thread_get_cpumask(struct spdk_thread *thread) 702 { 703 return &thread->cpumask; 704 } 705 706 int 707 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 708 { 709 struct spdk_thread *thread; 710 711 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 712 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 713 assert(false); 714 return -ENOTSUP; 715 } 716 717 thread = spdk_get_thread(); 718 if (!thread) { 719 SPDK_ERRLOG("Called from non-SPDK thread\n"); 720 assert(false); 721 return -EINVAL; 722 } 723 724 spdk_cpuset_copy(&thread->cpumask, cpumask); 725 726 /* Invoke framework's reschedule operation. If this function is called multiple times 727 * in a single spdk_thread_poll() context, the last cpumask will be used in the 728 * reschedule operation. 729 */ 730 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 731 732 return 0; 733 } 734 735 struct spdk_thread * 736 spdk_thread_get_from_ctx(void *ctx) 737 { 738 if (ctx == NULL) { 739 assert(false); 740 return NULL; 741 } 742 743 assert(g_ctx_sz > 0); 744 745 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 746 } 747 748 static inline uint32_t 749 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 750 { 751 unsigned count, i; 752 void *messages[SPDK_MSG_BATCH_SIZE]; 753 uint64_t notify = 1; 754 int rc; 755 756 #ifdef DEBUG 757 /* 758 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 759 * so we will never actually read uninitialized data from events, but just to be sure 760 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 
 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (spdk_unlikely(thread->in_interrupt) &&
	    spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);

		SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg);

		msg->fn(msg->arg);

		SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}

static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *tmp __attribute__((unused));

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers tree by next scheduled run time
	 * as its key.
	 */
	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp == NULL);

	/* Update the cache only if it is empty or the inserted poller is earlier than it.
	 * RB_MIN() is not necessary here because all pollers, which have exactly the same
	 * next_run_tick as the existing poller, are inserted on the right side.
	 */
	if (thread->first_timed_poller == NULL ||
	    poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
		thread->first_timed_poller = poller;
	}
}

static inline void
poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
{
	struct spdk_poller *tmp __attribute__((unused));

	tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp != NULL);

	/* This function is not used in any case that is performance critical.
	 * Update the cache simply by RB_MIN() if it needs to be changed.
	 */
	if (thread->first_timed_poller == poller) {
		thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
	}
}

static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store end time to use it as start time of the next spdk_thread_poll().
*/ 869 thread->tsc_last = end; 870 } 871 872 static inline int 873 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 874 { 875 int rc; 876 877 switch (poller->state) { 878 case SPDK_POLLER_STATE_UNREGISTERED: 879 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 880 free(poller); 881 return 0; 882 case SPDK_POLLER_STATE_PAUSING: 883 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 884 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 885 poller->state = SPDK_POLLER_STATE_PAUSED; 886 return 0; 887 case SPDK_POLLER_STATE_WAITING: 888 break; 889 default: 890 assert(false); 891 break; 892 } 893 894 poller->state = SPDK_POLLER_STATE_RUNNING; 895 rc = poller->fn(poller->arg); 896 897 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 898 899 poller->run_count++; 900 if (rc > 0) { 901 poller->busy_count++; 902 } 903 904 #ifdef DEBUG 905 if (rc == -1) { 906 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 907 } 908 #endif 909 910 switch (poller->state) { 911 case SPDK_POLLER_STATE_UNREGISTERED: 912 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 913 free(poller); 914 break; 915 case SPDK_POLLER_STATE_PAUSING: 916 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 917 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 918 poller->state = SPDK_POLLER_STATE_PAUSED; 919 break; 920 case SPDK_POLLER_STATE_PAUSED: 921 case SPDK_POLLER_STATE_WAITING: 922 break; 923 case SPDK_POLLER_STATE_RUNNING: 924 poller->state = SPDK_POLLER_STATE_WAITING; 925 break; 926 default: 927 assert(false); 928 break; 929 } 930 931 return rc; 932 } 933 934 static inline int 935 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 936 uint64_t now) 937 { 938 int rc; 939 940 switch (poller->state) { 941 case SPDK_POLLER_STATE_UNREGISTERED: 942 free(poller); 943 return 0; 944 case SPDK_POLLER_STATE_PAUSING: 945 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 946 poller->state = SPDK_POLLER_STATE_PAUSED; 947 return 0; 948 case SPDK_POLLER_STATE_WAITING: 949 break; 950 default: 951 assert(false); 952 break; 953 } 954 955 poller->state = SPDK_POLLER_STATE_RUNNING; 956 rc = poller->fn(poller->arg); 957 958 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 959 960 poller->run_count++; 961 if (rc > 0) { 962 poller->busy_count++; 963 } 964 965 #ifdef DEBUG 966 if (rc == -1) { 967 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 968 } 969 #endif 970 971 switch (poller->state) { 972 case SPDK_POLLER_STATE_UNREGISTERED: 973 free(poller); 974 break; 975 case SPDK_POLLER_STATE_PAUSING: 976 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 977 poller->state = SPDK_POLLER_STATE_PAUSED; 978 break; 979 case SPDK_POLLER_STATE_PAUSED: 980 break; 981 case SPDK_POLLER_STATE_RUNNING: 982 poller->state = SPDK_POLLER_STATE_WAITING; 983 /* fallthrough */ 984 case SPDK_POLLER_STATE_WAITING: 985 poller_insert_timer(thread, poller, now); 986 break; 987 default: 988 assert(false); 989 break; 990 } 991 992 return rc; 993 } 994 995 static int 996 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 997 { 998 uint32_t msg_count; 999 struct spdk_poller *poller, *tmp; 1000 spdk_msg_fn critical_msg; 1001 int rc = 0; 1002 1003 thread->tsc_last = now; 1004 1005 critical_msg = thread->critical_msg; 1006 if (spdk_unlikely(critical_msg != NULL)) { 1007 critical_msg(NULL); 1008 thread->critical_msg = NULL; 1009 rc = 1; 1010 } 1011 1012 msg_count = 
msg_queue_run_batch(thread, max_msgs); 1013 if (msg_count) { 1014 rc = 1; 1015 } 1016 1017 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1018 active_pollers_head, tailq, tmp) { 1019 int poller_rc; 1020 1021 poller_rc = thread_execute_poller(thread, poller); 1022 if (poller_rc > rc) { 1023 rc = poller_rc; 1024 } 1025 } 1026 1027 poller = thread->first_timed_poller; 1028 while (poller != NULL) { 1029 int timer_rc = 0; 1030 1031 if (now < poller->next_run_tick) { 1032 break; 1033 } 1034 1035 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 1036 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 1037 1038 /* Update the cache to the next timed poller in the list 1039 * only if the current poller is still the closest, otherwise, 1040 * do nothing because the cache has been already updated. 1041 */ 1042 if (thread->first_timed_poller == poller) { 1043 thread->first_timed_poller = tmp; 1044 } 1045 1046 timer_rc = thread_execute_timed_poller(thread, poller, now); 1047 if (timer_rc > rc) { 1048 rc = timer_rc; 1049 } 1050 1051 poller = tmp; 1052 } 1053 1054 return rc; 1055 } 1056 1057 int 1058 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1059 { 1060 struct spdk_thread *orig_thread; 1061 int rc; 1062 uint64_t notify = 1; 1063 1064 orig_thread = _get_thread(); 1065 tls_thread = thread; 1066 1067 if (now == 0) { 1068 now = spdk_get_ticks(); 1069 } 1070 1071 if (spdk_likely(!thread->in_interrupt)) { 1072 rc = thread_poll(thread, max_msgs, now); 1073 if (spdk_unlikely(thread->in_interrupt)) { 1074 /* The thread transitioned to interrupt mode during the above poll. 1075 * Poll it one more time in case that during the transition time 1076 * there is msg received without notification. 1077 */ 1078 rc = thread_poll(thread, max_msgs, now); 1079 } 1080 } else { 1081 /* Non-block wait on thread's fd_group */ 1082 rc = spdk_fd_group_wait(thread->fgrp, 0); 1083 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 1084 if (spdk_unlikely(!thread->in_interrupt)) { 1085 /* The thread transitioned to poll mode in a msg during the above processing. 1086 * Clear msg_fd since thread messages will be polled directly in poll mode. 
			 */
			rc = read(thread->msg_fd, &notify, sizeof(notify));
			if (rc < 0 && errno != EAGAIN) {
				SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
			}
		}

		/* Reap unregistered pollers out of poller execution in intr mode */
		if (spdk_unlikely(thread->poller_unregistered)) {
			struct spdk_poller *poller, *tmp;

			TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
						   active_pollers_head, tailq, tmp) {
				if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
					TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
					free(poller);
				}
			}

			RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
				if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
					poller_remove_timer(thread, poller);
					free(poller);
				}
			}

			thread->poller_unregistered = false;
		}
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
		thread_exit(thread, now);
	}

	thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}

uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = thread->first_timed_poller;
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}

int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}

static bool
thread_has_unpaused_pollers(struct spdk_thread *thread)
{
	if (TAILQ_EMPTY(&thread->active_pollers) &&
	    RB_EMPTY(&thread->timed_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
	if (!thread_has_unpaused_pollers(thread) &&
	    TAILQ_EMPTY(&thread->paused_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
	if (spdk_ring_count(thread->messages) ||
	    thread_has_unpaused_pollers(thread) ||
	    thread->critical_msg != NULL) {
		return false;
	}

	return true;
}

uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count. We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
1189 */ 1190 return g_thread_count; 1191 } 1192 1193 struct spdk_thread * 1194 spdk_get_thread(void) 1195 { 1196 return _get_thread(); 1197 } 1198 1199 const char * 1200 spdk_thread_get_name(const struct spdk_thread *thread) 1201 { 1202 return thread->name; 1203 } 1204 1205 uint64_t 1206 spdk_thread_get_id(const struct spdk_thread *thread) 1207 { 1208 return thread->id; 1209 } 1210 1211 struct spdk_thread * 1212 spdk_thread_get_by_id(uint64_t id) 1213 { 1214 struct spdk_thread *thread; 1215 1216 if (id == 0 || id >= g_thread_id) { 1217 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1218 return NULL; 1219 } 1220 pthread_mutex_lock(&g_devlist_mutex); 1221 TAILQ_FOREACH(thread, &g_threads, tailq) { 1222 if (thread->id == id) { 1223 break; 1224 } 1225 } 1226 pthread_mutex_unlock(&g_devlist_mutex); 1227 return thread; 1228 } 1229 1230 int 1231 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1232 { 1233 struct spdk_thread *thread; 1234 1235 thread = _get_thread(); 1236 if (!thread) { 1237 SPDK_ERRLOG("No thread allocated\n"); 1238 return -EINVAL; 1239 } 1240 1241 if (stats == NULL) { 1242 return -EINVAL; 1243 } 1244 1245 *stats = thread->stats; 1246 1247 return 0; 1248 } 1249 1250 uint64_t 1251 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1252 { 1253 if (thread == NULL) { 1254 thread = _get_thread(); 1255 } 1256 1257 return thread->tsc_last; 1258 } 1259 1260 static inline int 1261 thread_send_msg_notification(const struct spdk_thread *target_thread) 1262 { 1263 uint64_t notify = 1; 1264 int rc; 1265 1266 /* Not necessary to do notification if interrupt facility is not enabled */ 1267 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1268 return 0; 1269 } 1270 1271 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1272 * after sending thread msg, it is necessary to check whether target thread runs in 1273 * interrupt mode and then decide whether do event notification. 
 */
	if (spdk_unlikely(target_thread->in_interrupt)) {
		rc = write(target_thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
			return -EIO;
		}
	}

	return 0;
}

int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
	spdk_msg_fn expected = NULL;

	if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
					 __ATOMIC_SEQ_CST)) {
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

#ifdef __linux__
static int
interrupt_timerfd_process(void *arg)
{
	struct spdk_poller *poller = arg;
	uint64_t exp;
	int rc;

	/* clear the level of interval timer */
	rc = read(poller->interruptfd, &exp, sizeof(exp));
	if (rc < 0) {
		if (rc == -EAGAIN) {
			return 0;
		}

		return rc;
	}

	SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg);

	return poller->fn(poller->arg);
}

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	struct spdk_fd_group *fgrp = poller->thread->fgrp;
	int timerfd;
	int rc;

	SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
	if (timerfd < 0) {
		return -errno;
	}

	rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller);
	if (rc < 0) {
		close(timerfd);
		return rc;
	}

	poller->interruptfd = timerfd;
	return 0;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	int timerfd = poller->interruptfd;
	uint64_t now_tick = spdk_get_ticks();
	uint64_t ticks = spdk_get_ticks_hz();
	int ret;
	struct itimerspec new_tv = {};
	struct itimerspec old_tv = {};

	assert(poller->period_ticks != 0);
	assert(timerfd >= 0);

	SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
		      interrupt_mode ? "interrupt" : "poll");

	if (interrupt_mode) {
		/* Set repeated timer expiration */
		new_tv.it_interval.tv_sec = poller->period_ticks / ticks;
		new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks;

		/* Update next timer expiration */
		if (poller->next_run_tick == 0) {
			poller->next_run_tick = now_tick + poller->period_ticks;
		} else if (poller->next_run_tick < now_tick) {
			poller->next_run_tick = now_tick;
		}

		new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks;
		new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks;

		ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno);
			assert(false);
		}
	} else {
		/* Disarm the timer */
		ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv);
		if (ret < 0) {
			/* timerfd_settime's failure indicates that the timerfd is in error */
			SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno);
			assert(false);
		}

		/* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be
		 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC
		 */
		now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \
			   (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC;
		poller_remove_timer(poller->thread, poller);
		poller_insert_timer(poller->thread, poller, now_tick);
	}
}

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
	SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name);
	assert(poller->interruptfd >= 0);
	spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd);
	close(poller->interruptfd);
	poller->interruptfd = -1;
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	int busy_efd;
	int rc;

	SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name);
	busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (busy_efd < 0) {
		SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name);
		return -errno;
	}

	rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd,
			       poller->fn, poller->arg, poller->name);
	if (rc < 0) {
		close(busy_efd);
		return rc;
	}

	poller->interruptfd = busy_efd;
	return 0;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	int busy_efd = poller->interruptfd;
	uint64_t notify = 1;
	int rc __attribute__((unused));

	assert(busy_efd >= 0);

	if (interrupt_mode) {
		/* Write without read on eventfd will get it repeatedly triggered. */
		if (write(busy_efd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name);
		}
	} else {
		/* Read on eventfd will clear its level triggering. */
		rc = read(busy_efd, &notify, sizeof(notify));
	}
}

#else

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

#endif

void
spdk_poller_register_interrupt(struct spdk_poller *poller,
			       spdk_poller_set_interrupt_mode_cb cb_fn,
			       void *cb_arg)
{
	assert(poller != NULL);
	assert(cb_fn != NULL);
	assert(spdk_get_thread() == poller->thread);

	if (!spdk_interrupt_mode_is_enabled()) {
		return;
	}

	/* When a poller is created we don't know if the user is ever going to
	 * enable interrupts on it by calling this function, so the poller
	 * registration function has to immediately create an interruptfd.
	 * When this function does get called by the user, we have to then destroy
	 * that interruptfd.
	 */
	if (poller->set_intr_cb_fn && poller->interruptfd >= 0) {
		poller_interrupt_fini(poller);
	}

	poller->set_intr_cb_fn = cb_fn;
	poller->set_intr_cb_arg = cb_arg;

	/* Set poller into interrupt mode if thread is in interrupt. */
	if (poller->thread->in_interrupt) {
		poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
	}
}

static uint64_t
convert_us_to_ticks(uint64_t us)
{
	uint64_t quotient, remainder, ticks;

	if (us) {
		quotient = us / SPDK_SEC_TO_USEC;
		remainder = us % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		return 0;
	}
}

static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;
	poller->interruptfd = -1;
	if (thread->next_poller_id == 0) {
		SPDK_WARNLOG("Poller ID rolled over.
Poller ID is duplicated.\n"); 1619 thread->next_poller_id = 1; 1620 } 1621 poller->id = thread->next_poller_id++; 1622 1623 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1624 1625 if (spdk_interrupt_mode_is_enabled()) { 1626 int rc; 1627 1628 if (period_microseconds) { 1629 rc = period_poller_interrupt_init(poller); 1630 if (rc < 0) { 1631 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1632 free(poller); 1633 return NULL; 1634 } 1635 1636 spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL); 1637 } else { 1638 /* If the poller doesn't have a period, create interruptfd that's always 1639 * busy automatically when running in interrupt mode. 1640 */ 1641 rc = busy_poller_interrupt_init(poller); 1642 if (rc > 0) { 1643 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1644 free(poller); 1645 return NULL; 1646 } 1647 1648 spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL); 1649 } 1650 } 1651 1652 thread_insert_poller(thread, poller); 1653 1654 return poller; 1655 } 1656 1657 struct spdk_poller * 1658 spdk_poller_register(spdk_poller_fn fn, 1659 void *arg, 1660 uint64_t period_microseconds) 1661 { 1662 return poller_register(fn, arg, period_microseconds, NULL); 1663 } 1664 1665 struct spdk_poller * 1666 spdk_poller_register_named(spdk_poller_fn fn, 1667 void *arg, 1668 uint64_t period_microseconds, 1669 const char *name) 1670 { 1671 return poller_register(fn, arg, period_microseconds, name); 1672 } 1673 1674 static void 1675 wrong_thread(const char *func, const char *name, struct spdk_thread *thread, 1676 struct spdk_thread *curthread) 1677 { 1678 if (thread == NULL) { 1679 SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name); 1680 abort(); 1681 } 1682 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be " 1683 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id, 1684 thread->name, thread->id); 1685 assert(false); 1686 } 1687 1688 void 1689 spdk_poller_unregister(struct spdk_poller **ppoller) 1690 { 1691 struct spdk_thread *thread; 1692 struct spdk_poller *poller; 1693 1694 poller = *ppoller; 1695 if (poller == NULL) { 1696 return; 1697 } 1698 1699 *ppoller = NULL; 1700 1701 thread = spdk_get_thread(); 1702 if (!thread) { 1703 assert(false); 1704 return; 1705 } 1706 1707 if (poller->thread != thread) { 1708 wrong_thread(__func__, poller->name, poller->thread, thread); 1709 return; 1710 } 1711 1712 if (spdk_interrupt_mode_is_enabled()) { 1713 /* Release the interrupt resource for period or busy poller */ 1714 if (poller->interruptfd >= 0) { 1715 poller_interrupt_fini(poller); 1716 } 1717 1718 /* Mark there is poller unregistered. Then unregistered pollers will 1719 * get reaped by spdk_thread_poll also in intr mode. 1720 */ 1721 thread->poller_unregistered = true; 1722 } 1723 1724 /* If the poller was paused, put it on the active_pollers list so that 1725 * its unregistration can be processed by spdk_thread_poll(). 1726 */ 1727 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1728 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1729 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1730 poller->period_ticks = 0; 1731 } 1732 1733 /* Simply set the state to unregistered. The poller will get cleaned up 1734 * in a subsequent call to spdk_thread_poll(). 
1735 */ 1736 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1737 } 1738 1739 void 1740 spdk_poller_pause(struct spdk_poller *poller) 1741 { 1742 struct spdk_thread *thread; 1743 1744 thread = spdk_get_thread(); 1745 if (!thread) { 1746 assert(false); 1747 return; 1748 } 1749 1750 if (poller->thread != thread) { 1751 wrong_thread(__func__, poller->name, poller->thread, thread); 1752 return; 1753 } 1754 1755 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1756 * spdk_thread_poll() move it. It allows a poller to be paused from 1757 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1758 * iteration, or from within itself without breaking the logic to always 1759 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1760 */ 1761 switch (poller->state) { 1762 case SPDK_POLLER_STATE_PAUSED: 1763 case SPDK_POLLER_STATE_PAUSING: 1764 break; 1765 case SPDK_POLLER_STATE_RUNNING: 1766 case SPDK_POLLER_STATE_WAITING: 1767 poller->state = SPDK_POLLER_STATE_PAUSING; 1768 break; 1769 default: 1770 assert(false); 1771 break; 1772 } 1773 } 1774 1775 void 1776 spdk_poller_resume(struct spdk_poller *poller) 1777 { 1778 struct spdk_thread *thread; 1779 1780 thread = spdk_get_thread(); 1781 if (!thread) { 1782 assert(false); 1783 return; 1784 } 1785 1786 if (poller->thread != thread) { 1787 wrong_thread(__func__, poller->name, poller->thread, thread); 1788 return; 1789 } 1790 1791 /* If a poller is paused it has to be removed from the paused pollers 1792 * list and put on the active list or timer tree depending on its 1793 * period_ticks. If a poller is still in the process of being paused, 1794 * we just need to flip its state back to waiting, as it's already on 1795 * the appropriate list or tree. 1796 */ 1797 switch (poller->state) { 1798 case SPDK_POLLER_STATE_PAUSED: 1799 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1800 thread_insert_poller(thread, poller); 1801 /* fallthrough */ 1802 case SPDK_POLLER_STATE_PAUSING: 1803 poller->state = SPDK_POLLER_STATE_WAITING; 1804 break; 1805 case SPDK_POLLER_STATE_RUNNING: 1806 case SPDK_POLLER_STATE_WAITING: 1807 break; 1808 default: 1809 assert(false); 1810 break; 1811 } 1812 } 1813 1814 const char * 1815 spdk_poller_get_name(struct spdk_poller *poller) 1816 { 1817 return poller->name; 1818 } 1819 1820 uint64_t 1821 spdk_poller_get_id(struct spdk_poller *poller) 1822 { 1823 return poller->id; 1824 } 1825 1826 const char * 1827 spdk_poller_get_state_str(struct spdk_poller *poller) 1828 { 1829 switch (poller->state) { 1830 case SPDK_POLLER_STATE_WAITING: 1831 return "waiting"; 1832 case SPDK_POLLER_STATE_RUNNING: 1833 return "running"; 1834 case SPDK_POLLER_STATE_UNREGISTERED: 1835 return "unregistered"; 1836 case SPDK_POLLER_STATE_PAUSING: 1837 return "pausing"; 1838 case SPDK_POLLER_STATE_PAUSED: 1839 return "paused"; 1840 default: 1841 return NULL; 1842 } 1843 } 1844 1845 uint64_t 1846 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1847 { 1848 return poller->period_ticks; 1849 } 1850 1851 void 1852 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1853 { 1854 stats->run_count = poller->run_count; 1855 stats->busy_count = poller->busy_count; 1856 } 1857 1858 struct spdk_poller * 1859 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1860 { 1861 return TAILQ_FIRST(&thread->active_pollers); 1862 } 1863 1864 struct spdk_poller * 1865 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1866 { 1867 return TAILQ_NEXT(prev, tailq); 1868 } 1869 
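/*
 * Illustrative sketch (hypothetical helper, not part of the library): the
 * accessors above and below let a framework walk a thread's pollers, for
 * example to dump per-poller statistics while running on that thread:
 *
 *	static void
 *	example_dump_active_pollers(struct spdk_thread *thread)
 *	{
 *		struct spdk_poller *poller;
 *		struct spdk_poller_stats stats;
 *
 *		for (poller = spdk_thread_get_first_active_poller(thread); poller != NULL;
 *		     poller = spdk_thread_get_next_active_poller(poller)) {
 *			spdk_poller_get_stats(poller, &stats);
 *			SPDK_NOTICELOG("%s: state %s, run_count %" PRIu64 ", busy_count %" PRIu64 "\n",
 *				       spdk_poller_get_name(poller), spdk_poller_get_state_str(poller),
 *				       stats.run_count, stats.busy_count);
 *		}
 *	}
 */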
1870 struct spdk_poller * 1871 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1872 { 1873 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1874 } 1875 1876 struct spdk_poller * 1877 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1878 { 1879 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1880 } 1881 1882 struct spdk_poller * 1883 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1884 { 1885 return TAILQ_FIRST(&thread->paused_pollers); 1886 } 1887 1888 struct spdk_poller * 1889 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1890 { 1891 return TAILQ_NEXT(prev, tailq); 1892 } 1893 1894 struct spdk_io_channel * 1895 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1896 { 1897 return RB_MIN(io_channel_tree, &thread->io_channels); 1898 } 1899 1900 struct spdk_io_channel * 1901 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1902 { 1903 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1904 } 1905 1906 struct call_thread { 1907 struct spdk_thread *cur_thread; 1908 spdk_msg_fn fn; 1909 void *ctx; 1910 1911 struct spdk_thread *orig_thread; 1912 spdk_msg_fn cpl; 1913 }; 1914 1915 static void 1916 _on_thread(void *ctx) 1917 { 1918 struct call_thread *ct = ctx; 1919 int rc __attribute__((unused)); 1920 1921 ct->fn(ct->ctx); 1922 1923 pthread_mutex_lock(&g_devlist_mutex); 1924 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1925 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 1926 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 1927 ct->cur_thread->name); 1928 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1929 } 1930 pthread_mutex_unlock(&g_devlist_mutex); 1931 1932 if (!ct->cur_thread) { 1933 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1934 1935 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1936 free(ctx); 1937 } else { 1938 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1939 ct->cur_thread->name); 1940 1941 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1942 } 1943 assert(rc == 0); 1944 } 1945 1946 void 1947 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1948 { 1949 struct call_thread *ct; 1950 struct spdk_thread *thread; 1951 int rc __attribute__((unused)); 1952 1953 ct = calloc(1, sizeof(*ct)); 1954 if (!ct) { 1955 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1956 cpl(ctx); 1957 return; 1958 } 1959 1960 ct->fn = fn; 1961 ct->ctx = ctx; 1962 ct->cpl = cpl; 1963 1964 thread = _get_thread(); 1965 if (!thread) { 1966 SPDK_ERRLOG("No thread allocated\n"); 1967 free(ct); 1968 cpl(ctx); 1969 return; 1970 } 1971 ct->orig_thread = thread; 1972 1973 pthread_mutex_lock(&g_devlist_mutex); 1974 ct->cur_thread = TAILQ_FIRST(&g_threads); 1975 pthread_mutex_unlock(&g_devlist_mutex); 1976 1977 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 1978 ct->orig_thread->name); 1979 1980 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 1981 assert(rc == 0); 1982 } 1983 1984 static inline void 1985 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 1986 { 1987 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1988 return; 1989 } 1990 1991 if (!poller->set_intr_cb_fn) { 1992 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 1993 assert(false); 1994 return; 1995 } 1996 1997 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 1998 } 1999 2000 void 2001 
spdk_thread_set_interrupt_mode(bool enable_interrupt) 2002 { 2003 struct spdk_thread *thread = _get_thread(); 2004 struct spdk_poller *poller, *tmp; 2005 2006 assert(thread); 2007 assert(spdk_interrupt_mode_is_enabled()); 2008 2009 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 2010 thread->name, enable_interrupt ? "intr" : "poll", 2011 thread->in_interrupt ? "intr" : "poll"); 2012 2013 if (thread->in_interrupt == enable_interrupt) { 2014 return; 2015 } 2016 2017 /* Set pollers to expected mode */ 2018 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 2019 poller_set_interrupt_mode(poller, enable_interrupt); 2020 } 2021 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 2022 poller_set_interrupt_mode(poller, enable_interrupt); 2023 } 2024 /* All paused pollers will go to work in interrupt mode */ 2025 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 2026 poller_set_interrupt_mode(poller, enable_interrupt); 2027 } 2028 2029 thread->in_interrupt = enable_interrupt; 2030 return; 2031 } 2032 2033 static struct io_device * 2034 io_device_get(void *io_device) 2035 { 2036 struct io_device find = {}; 2037 2038 find.io_device = io_device; 2039 return RB_FIND(io_device_tree, &g_io_devices, &find); 2040 } 2041 2042 void 2043 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 2044 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 2045 const char *name) 2046 { 2047 struct io_device *dev, *tmp; 2048 struct spdk_thread *thread; 2049 2050 assert(io_device != NULL); 2051 assert(create_cb != NULL); 2052 assert(destroy_cb != NULL); 2053 2054 thread = spdk_get_thread(); 2055 if (!thread) { 2056 SPDK_ERRLOG("called from non-SPDK thread\n"); 2057 assert(false); 2058 return; 2059 } 2060 2061 dev = calloc(1, sizeof(struct io_device)); 2062 if (dev == NULL) { 2063 SPDK_ERRLOG("could not allocate io_device\n"); 2064 return; 2065 } 2066 2067 dev->io_device = io_device; 2068 if (name) { 2069 snprintf(dev->name, sizeof(dev->name), "%s", name); 2070 } else { 2071 snprintf(dev->name, sizeof(dev->name), "%p", dev); 2072 } 2073 dev->create_cb = create_cb; 2074 dev->destroy_cb = destroy_cb; 2075 dev->unregister_cb = NULL; 2076 dev->ctx_size = ctx_size; 2077 dev->for_each_count = 0; 2078 dev->unregistered = false; 2079 dev->refcnt = 0; 2080 2081 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 2082 dev->name, dev->io_device, thread->name); 2083 2084 pthread_mutex_lock(&g_devlist_mutex); 2085 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 2086 if (tmp != NULL) { 2087 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 2088 io_device, tmp->name, dev->name); 2089 free(dev); 2090 } 2091 2092 pthread_mutex_unlock(&g_devlist_mutex); 2093 } 2094 2095 static void 2096 _finish_unregister(void *arg) 2097 { 2098 struct io_device *dev = arg; 2099 struct spdk_thread *thread; 2100 2101 thread = spdk_get_thread(); 2102 assert(thread == dev->unregister_thread); 2103 2104 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2105 dev->name, dev->io_device, thread->name); 2106 2107 assert(thread->pending_unregister_count > 0); 2108 thread->pending_unregister_count--; 2109 2110 dev->unregister_cb(dev->io_device); 2111 free(dev); 2112 } 2113 2114 static void 2115 io_device_free(struct io_device *dev) 2116 { 2117 int rc __attribute__((unused)); 2118 2119 if (dev->unregister_cb == NULL) { 2120 free(dev); 2121 } else { 2122 assert(dev->unregister_thread != NULL); 

static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	assert(thread == dev->unregister_thread);

	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	assert(thread->pending_unregister_count > 0);
	thread->pending_unregister_count--;

	dev->unregister_cb(dev->io_device);
	free(dev);
}

static void
io_device_free(struct io_device *dev)
{
	int rc __attribute__((unused));

	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
		assert(rc == 0);
	}
}

void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* The for_each_count check differentiates the user attempting to unregister the
	 * device a second time, from the internal call to this function that occurs
	 * after the for_each_count reaches 0.
	 */
	if (dev->pending_unregister && dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregister_thread = thread;

	if (dev->for_each_count > 0) {
		SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			     dev->name, io_device, dev->for_each_count);
		dev->pending_unregister = true;
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregistered = true;
	RB_REMOVE(io_device_tree, &g_io_devices, dev);
	refcnt = dev->refcnt;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (unregister_cb) {
		thread->pending_unregister_count++;
	}

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}

const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}

static struct spdk_io_channel *
thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev)
{
	struct spdk_io_channel find = {};

	find.dev = dev;
	return RB_FIND(io_channel_tree, &thread->io_channels, &find);
}
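
/*
 * Illustrative usage sketch (editorial addition, not part of the upstream sources):
 * unregistration is asynchronous when channels are still held.  The optional
 * unregister_cb runs on the unregistering thread only after every thread has released
 * its channel and any outstanding spdk_for_each_channel() iterations have completed.
 * example_* names and g_example_dev are hypothetical.
 *
 *	static void
 *	example_unregister_done(void *io_device)
 *	{
 *		// Safe to free module-wide state tied to the io_device here.
 *	}
 *
 *	spdk_io_device_unregister(&g_example_dev, example_unregister_done);
 */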

struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch = thread_get_io_channel(thread, dev);
	if (ch != NULL) {
		ch->ref++;

		SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
			      ch, dev->name, dev->io_device, thread->name, ch->ref);

		/*
		 * An I/O channel already exists for this device on this
		 * thread, so return it.
		 */
		pthread_mutex_unlock(&g_devlist_mutex);
		spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0,
				  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
		return ch;
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	RB_INSERT(io_channel_tree, &thread->io_channels, ch);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1);
	return ch;
}
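
/*
 * Illustrative usage sketch (editorial addition, not part of the upstream sources):
 * channels are per-thread and reference counted.  Repeated spdk_get_io_channel() calls
 * on the same thread return the same channel with an incremented refcount, and each
 * call must be balanced by spdk_put_io_channel() on that same thread.
 * g_example_dev and struct example_channel_ctx are hypothetical.
 *
 *	struct spdk_io_channel *ch;
 *	struct example_channel_ctx *ctx;
 *
 *	ch = spdk_get_io_channel(&g_example_dev);
 *	if (ch == NULL) {
 *		return -ENOMEM;
 *	}
 *	ctx = spdk_io_channel_get_ctx(ch);
 *
 *	// ... submit I/O from this thread using ctx ...
 *
 *	spdk_put_io_channel(ch);
 */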

static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0,
			  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		wrong_thread(__func__, "ch", ch->thread, thread);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

void *
spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
{
	return ch->dev->io_device;
}

const char *
spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch)
{
	return spdk_io_device_get_name(ch->dev);
}

int
spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
{
	return ch->ref;
}

struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}
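
/*
 * Layout note (editorial addition): the per-channel context returned by
 * spdk_io_channel_get_ctx() is the ctx_size-byte region allocated immediately after
 * struct spdk_io_channel (see the calloc in spdk_get_io_channel()), so
 * spdk_io_channel_from_ctx() recovers the channel with simple pointer arithmetic:
 *
 *	ctx == (void *)((uint8_t *)ch + sizeof(struct spdk_io_channel))
 *	ch  == spdk_io_channel_from_ctx(ctx)
 */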

static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute.  If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	ch = thread_get_io_channel(i->cur_thread, i->dev);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		assert(false);
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;
	i->orig_thread = _get_thread();

	pthread_mutex_lock(&g_devlist_mutex);
	i->dev = io_device_get(io_device);
	if (i->dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		assert(false);
		i->status = -ENODEV;
		goto end;
	}

	/* Do not allow new for_each operations if we are already waiting for outstanding
	 * for_each operations to complete so that the device can be unregistered.
	 */
	if (i->dev->pending_unregister) {
		SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device);
		i->status = -ENODEV;
		goto end;
	}

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		ch = thread_get_io_channel(thread, i->dev);
		if (ch != NULL) {
			ch->dev->for_each_count++;
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
	}

end:
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

static void
__pending_unregister(void *arg)
{
	struct io_device *dev = arg;

	assert(dev->pending_unregister);
	assert(dev->for_each_count == 0);
	spdk_io_device_unregister(dev->io_device, dev->unregister_cb);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct io_device *dev;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	dev = i->dev;
	if (status) {
		goto end;
	}

	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		ch = thread_get_io_channel(thread, dev);
		if (ch != NULL) {
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);

	pthread_mutex_lock(&g_devlist_mutex);
	if (dev->pending_unregister && dev->for_each_count == 0) {
		rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev);
		assert(rc == 0);
	}
	pthread_mutex_unlock(&g_devlist_mutex);
}
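
/*
 * Illustrative usage sketch (editorial addition, not part of the upstream sources):
 * the per-channel callback passed to spdk_for_each_channel() runs on the thread that
 * owns each channel and must finish by calling spdk_for_each_channel_continue();
 * passing a non-zero status stops the iteration early.  The completion callback then
 * runs on the originating thread with the final status.  The example_* names are
 * hypothetical.
 *
 *	static void
 *	example_on_channel(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *		struct example_channel_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *		int rc;
 *
 *		rc = example_quiesce(ctx);	// hypothetical per-channel work
 *		spdk_for_each_channel_continue(i, rc);
 *	}
 *
 *	static void
 *	example_on_channel_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		void *cb_ctx = spdk_io_channel_iter_get_ctx(i);
 *
 *		// status is 0, or the first non-zero value passed to _continue().
 *		example_finish(cb_ctx, status);	// hypothetical
 *	}
 *
 *	spdk_for_each_channel(&g_example_dev, example_on_channel, cb_ctx,
 *			      example_on_channel_done);
 */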

struct spdk_interrupt {
	int efd;
	struct spdk_thread *thread;
	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	if (thread->msg_fd < 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);
	thread->msg_fd = -1;

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}

#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;
	uint64_t notify = 1;

	assert(spdk_interrupt_mode_is_enabled());

	/* There may be a race between this acknowledgement (the read below) and another
	 * producer's notification, so acknowledge first and check the message queue
	 * afterwards.  This avoids missing a notification.
	 */
	rc = read(thread->msg_fd, &notify, sizeof(notify));
	if (rc < 0 && errno != EAGAIN) {
		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
	}

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	return rc;
}

static int
thread_interrupt_create(struct spdk_thread *thread)
{
	int rc;

	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);

	rc = spdk_fd_group_create(&thread->fgrp);
	if (rc) {
		return rc;
	}

	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (thread->msg_fd < 0) {
		rc = -errno;
		spdk_fd_group_destroy(thread->fgrp);
		thread->fgrp = NULL;

		return rc;
	}

	return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd,
				 thread_interrupt_msg_process, thread);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
#endif

struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
			void *arg, const char *name)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;
	int ret;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	intr = calloc(1, sizeof(*intr));
	if (intr == NULL) {
		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(intr->name, sizeof(intr->name), "%s", name);
	} else {
		snprintf(intr->name, sizeof(intr->name), "%p", fn);
	}

	intr->efd = efd;
	intr->thread = thread;

	ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name);

	if (ret != 0) {
		SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n",
			    thread->name, efd, spdk_strerror(-ret));
		free(intr);
		return NULL;
	}

	return intr;
}
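
/*
 * Illustrative usage sketch (editorial addition, not part of the upstream sources):
 * in interrupt mode a component can attach its own eventfd (or any pollable fd) to the
 * calling SPDK thread's fd group.  The handler has the same int (*)(void *) shape as
 * thread_interrupt_msg_process() above.  struct example_ctx and the example_* names
 * are hypothetical.
 *
 *	static int
 *	example_event_handler(void *arg)
 *	{
 *		struct example_ctx *ctx = arg;
 *		uint64_t cnt;
 *
 *		// Acknowledge the eventfd, then do the deferred work.
 *		if (read(ctx->efd, &cnt, sizeof(cnt)) < 0 && errno != EAGAIN) {
 *			return -errno;
 *		}
 *		return example_process_events(ctx);	// hypothetical
 *	}
 *
 *	ctx->efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 *	ctx->intr = spdk_interrupt_register(ctx->efd, example_event_handler, ctx, "example");
 *
 *	// ... later, on the same SPDK thread ...
 *	spdk_interrupt_unregister(&ctx->intr);
 *	close(ctx->efd);
 */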

void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	*pintr = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}

int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}

static bool g_interrupt_mode = false;

int
spdk_interrupt_mode_enable(void)
{
	/* This must be called once, before the threading library is initialized.
	 * g_spdk_msg_mempool is only valid after the threading library has been
	 * initialized, so use it to detect that case.
	 */
	if (g_spdk_msg_mempool) {
		SPDK_ERRLOG("Failed: the threading library is already initialized.\n");
		return -1;
	}

#ifdef __linux__
	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
	g_interrupt_mode = true;
	return 0;
#else
	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
	g_interrupt_mode = false;
	return -ENOTSUP;
#endif
}

bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}

void
spdk_spin_init(struct spdk_spinlock *sspin)
{
	int rc;

	memset(sspin, 0, sizeof(*sspin));
	rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE);
	SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD);
}

void
spdk_spin_destroy(struct spdk_spinlock *sspin)
{
	int rc;

	SPIN_ASSERT_RETURN_VOID(sspin->thread == NULL, SPIN_ERR_LOCK_HELD);

	rc = pthread_spin_destroy(&sspin->spinlock);
	SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD);
}

void
spdk_spin_lock(struct spdk_spinlock *sspin)
{
	struct spdk_thread *thread = spdk_get_thread();
	int rc;

	SPIN_ASSERT_RETURN_VOID(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD);
	SPIN_ASSERT_RETURN_VOID(thread != sspin->thread, SPIN_ERR_DEADLOCK);

	rc = pthread_spin_lock(&sspin->spinlock);
	SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD);

	sspin->thread = thread;
	sspin->thread->lock_count++;
}

void
spdk_spin_unlock(struct spdk_spinlock *sspin)
{
	struct spdk_thread *thread = spdk_get_thread();
	int rc;

	SPIN_ASSERT_RETURN_VOID(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD);
	SPIN_ASSERT_RETURN_VOID(thread == sspin->thread, SPIN_ERR_WRONG_THREAD);

	SPIN_ASSERT_RETURN_VOID(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT);
	thread->lock_count--;
	sspin->thread = NULL;

	rc = pthread_spin_unlock(&sspin->spinlock);
	SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD);
}

bool
spdk_spin_held(struct spdk_spinlock *sspin)
{
	struct spdk_thread *thread = spdk_get_thread();

	SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false);

	return sspin->thread == thread;
}
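
/*
 * Illustrative usage sketch (editorial addition, not part of the upstream sources):
 * spdk_spinlock usage mirrors pthread spinlocks, with the extra rules enforced above:
 * lock and unlock only on an SPDK thread, never re-lock a lock the calling thread
 * already holds, unlock only on the owning thread, and never destroy a held lock.
 * struct example_shared is hypothetical.
 *
 *	struct example_shared {
 *		struct spdk_spinlock lock;
 *		uint64_t counter;
 *	};
 *
 *	spdk_spin_init(&shared->lock);
 *
 *	spdk_spin_lock(&shared->lock);
 *	shared->counter++;
 *	spdk_spin_unlock(&shared->lock);
 *
 *	assert(!spdk_spin_held(&shared->lock));
 *	spdk_spin_destroy(&shared->lock);
 */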

SPDK_LOG_REGISTER_COMPONENT(thread)