1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2016 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "spdk/env.h" 10 #include "spdk/likely.h" 11 #include "spdk/queue.h" 12 #include "spdk/string.h" 13 #include "spdk/thread.h" 14 #include "spdk/trace.h" 15 #include "spdk/util.h" 16 #include "spdk/fd_group.h" 17 18 #include "spdk/log.h" 19 #include "spdk_internal/thread.h" 20 #include "spdk_internal/usdt.h" 21 #include "thread_internal.h" 22 23 #include "spdk_internal/trace_defs.h" 24 25 #ifdef __linux__ 26 #include <sys/timerfd.h> 27 #include <sys/eventfd.h> 28 #endif 29 30 #define SPDK_MSG_BATCH_SIZE 8 31 #define SPDK_MAX_DEVICE_NAME_LEN 256 32 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 33 #define SPDK_MAX_POLLER_NAME_LEN 256 34 #define SPDK_MAX_THREAD_NAME_LEN 256 35 36 static struct spdk_thread *g_app_thread; 37 38 struct spdk_interrupt { 39 int efd; 40 struct spdk_thread *thread; 41 spdk_interrupt_fn fn; 42 void *arg; 43 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 44 }; 45 46 enum spdk_poller_state { 47 /* The poller is registered with a thread but not currently executing its fn. */ 48 SPDK_POLLER_STATE_WAITING, 49 50 /* The poller is currently running its fn. */ 51 SPDK_POLLER_STATE_RUNNING, 52 53 /* The poller was unregistered during the execution of its fn. */ 54 SPDK_POLLER_STATE_UNREGISTERED, 55 56 /* The poller is in the process of being paused. It will be paused 57 * during the next time it's supposed to be executed. 58 */ 59 SPDK_POLLER_STATE_PAUSING, 60 61 /* The poller is registered but currently paused. It's on the 62 * paused_pollers list. 63 */ 64 SPDK_POLLER_STATE_PAUSED, 65 }; 66 67 struct spdk_poller { 68 TAILQ_ENTRY(spdk_poller) tailq; 69 RB_ENTRY(spdk_poller) node; 70 71 /* Current state of the poller; should only be accessed from the poller's thread. */ 72 enum spdk_poller_state state; 73 74 uint64_t period_ticks; 75 uint64_t next_run_tick; 76 uint64_t run_count; 77 uint64_t busy_count; 78 uint64_t id; 79 spdk_poller_fn fn; 80 void *arg; 81 struct spdk_thread *thread; 82 struct spdk_interrupt *intr; 83 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 84 void *set_intr_cb_arg; 85 86 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 87 }; 88 89 enum spdk_thread_state { 90 /* The thread is processing poller and message by spdk_thread_poll(). */ 91 SPDK_THREAD_STATE_RUNNING, 92 93 /* The thread is in the process of termination. It reaps unregistering 94 * poller are releasing I/O channel. 95 */ 96 SPDK_THREAD_STATE_EXITING, 97 98 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 99 SPDK_THREAD_STATE_EXITED, 100 }; 101 102 struct spdk_thread { 103 uint64_t tsc_last; 104 struct spdk_thread_stats stats; 105 /* 106 * Contains pollers actively running on this thread. Pollers 107 * are run round-robin. The thread takes one poller from the head 108 * of the ring, executes it, then puts it back at the tail of 109 * the ring. 110 */ 111 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 112 /** 113 * Contains pollers running on this thread with a periodic timer. 114 */ 115 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 116 struct spdk_poller *first_timed_poller; 117 /* 118 * Contains paused pollers. Pollers on this queue are waiting until 119 * they are resumed (in which case they're put onto the active/timer 120 * queues) or unregistered. 
121 */ 122 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 123 struct spdk_ring *messages; 124 int msg_fd; 125 SLIST_HEAD(, spdk_msg) msg_cache; 126 size_t msg_cache_count; 127 spdk_msg_fn critical_msg; 128 uint64_t id; 129 uint64_t next_poller_id; 130 enum spdk_thread_state state; 131 int pending_unregister_count; 132 133 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 134 TAILQ_ENTRY(spdk_thread) tailq; 135 136 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 137 struct spdk_cpuset cpumask; 138 uint64_t exit_timeout_tsc; 139 140 int32_t lock_count; 141 142 /* Indicates whether this spdk_thread currently runs in interrupt. */ 143 bool in_interrupt; 144 bool poller_unregistered; 145 struct spdk_fd_group *fgrp; 146 147 /* User context allocated at the end */ 148 uint8_t ctx[0]; 149 }; 150 151 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 152 153 static spdk_new_thread_fn g_new_thread_fn = NULL; 154 static spdk_thread_op_fn g_thread_op_fn = NULL; 155 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 156 static size_t g_ctx_sz = 0; 157 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 158 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 159 * SPDK application is required. 160 */ 161 static uint64_t g_thread_id = 1; 162 163 enum spin_error { 164 SPIN_ERR_NONE, 165 /* Trying to use an SPDK lock while not on an SPDK thread */ 166 SPIN_ERR_NOT_SPDK_THREAD, 167 /* Trying to lock a lock already held by this SPDK thread */ 168 SPIN_ERR_DEADLOCK, 169 /* Trying to unlock a lock not held by this SPDK thread */ 170 SPIN_ERR_WRONG_THREAD, 171 /* pthread_spin_*() returned an error */ 172 SPIN_ERR_PTHREAD, 173 /* Trying to destroy a lock that is held */ 174 SPIN_ERR_LOCK_HELD, 175 /* lock_count is invalid */ 176 SPIN_ERR_LOCK_COUNT, 177 /* 178 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to 179 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to 180 * deadlock when another SPDK thread on the same pthread tries to take that lock. 181 */ 182 SPIN_ERR_HOLD_DURING_SWITCH, 183 /* Must be last, not an actual error code */ 184 SPIN_ERR_LAST 185 }; 186 187 static const char *spin_error_strings[] = { 188 [SPIN_ERR_NONE] = "No error", 189 [SPIN_ERR_NOT_SPDK_THREAD] = "Not an SPDK thread", 190 [SPIN_ERR_DEADLOCK] = "Deadlock detected", 191 [SPIN_ERR_WRONG_THREAD] = "Unlock on wrong SPDK thread", 192 [SPIN_ERR_PTHREAD] = "Error from pthread_spinlock", 193 [SPIN_ERR_LOCK_HELD] = "Destroying a held spinlock", 194 [SPIN_ERR_LOCK_COUNT] = "Lock count is invalid", 195 [SPIN_ERR_HOLD_DURING_SWITCH] = "Lock(s) held while SPDK thread going off CPU", 196 }; 197 198 #define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \ 199 ? 
"Unknown error" : spin_error_strings[err] 200 201 static void 202 __posix_abort(enum spin_error err) 203 { 204 abort(); 205 } 206 207 typedef void (*spin_abort)(enum spin_error err); 208 spin_abort g_spin_abort_fn = __posix_abort; 209 210 #define SPIN_ASSERT_IMPL(cond, err, ret) \ 211 do { \ 212 if (spdk_unlikely(!(cond))) { \ 213 SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \ 214 SPIN_ERROR_STRING(err), #cond); \ 215 g_spin_abort_fn(err); \ 216 ret; \ 217 } \ 218 } while (0) 219 #define SPIN_ASSERT_RETURN_VOID(cond, err) SPIN_ASSERT_IMPL(cond, err, return) 220 #define SPIN_ASSERT_RETURN(cond, err, ret) SPIN_ASSERT_IMPL(cond, err, return ret) 221 #define SPIN_ASSERT(cond, err) SPIN_ASSERT_IMPL(cond, err,) 222 223 struct io_device { 224 void *io_device; 225 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 226 spdk_io_channel_create_cb create_cb; 227 spdk_io_channel_destroy_cb destroy_cb; 228 spdk_io_device_unregister_cb unregister_cb; 229 struct spdk_thread *unregister_thread; 230 uint32_t ctx_size; 231 uint32_t for_each_count; 232 RB_ENTRY(io_device) node; 233 234 uint32_t refcnt; 235 236 bool pending_unregister; 237 bool unregistered; 238 }; 239 240 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 241 242 static int 243 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 244 { 245 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 246 } 247 248 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 249 250 static int 251 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 252 { 253 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 254 } 255 256 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 257 258 struct spdk_msg { 259 spdk_msg_fn fn; 260 void *arg; 261 262 SLIST_ENTRY(spdk_msg) link; 263 }; 264 265 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 266 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 267 268 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 269 static uint32_t g_thread_count = 0; 270 271 static __thread struct spdk_thread *tls_thread = NULL; 272 273 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 274 { 275 spdk_trace_register_description("THREAD_IOCH_GET", 276 TRACE_THREAD_IOCH_GET, 277 OWNER_NONE, OBJECT_NONE, 0, 278 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 279 spdk_trace_register_description("THREAD_IOCH_PUT", 280 TRACE_THREAD_IOCH_PUT, 281 OWNER_NONE, OBJECT_NONE, 0, 282 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 283 } 284 285 /* 286 * If this compare function returns zero when two next_run_ticks are equal, 287 * the macro RB_INSERT() returns a pointer to the element with the same 288 * next_run_tick. 289 * 290 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 291 * to remove as a parameter. 292 * 293 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 294 * side by returning 1 when two next_run_ticks are equal. 
295 */ 296 static inline int 297 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 298 { 299 if (poller1->next_run_tick < poller2->next_run_tick) { 300 return -1; 301 } else { 302 return 1; 303 } 304 } 305 306 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 307 308 static inline struct spdk_thread * 309 _get_thread(void) 310 { 311 return tls_thread; 312 } 313 314 static int 315 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 316 { 317 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 318 319 g_ctx_sz = ctx_sz; 320 321 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 322 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 323 sizeof(struct spdk_msg), 324 0, /* No cache. We do our own. */ 325 SPDK_ENV_SOCKET_ID_ANY); 326 327 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 328 msg_mempool_sz); 329 330 if (!g_spdk_msg_mempool) { 331 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 332 return -ENOMEM; 333 } 334 335 return 0; 336 } 337 338 static void thread_interrupt_destroy(struct spdk_thread *thread); 339 static int thread_interrupt_create(struct spdk_thread *thread); 340 341 static void 342 _free_thread(struct spdk_thread *thread) 343 { 344 struct spdk_io_channel *ch; 345 struct spdk_msg *msg; 346 struct spdk_poller *poller, *ptmp; 347 348 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 349 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 350 thread->name, ch->dev->name); 351 } 352 353 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 354 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 355 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 356 poller->name); 357 } 358 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 359 free(poller); 360 } 361 362 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 363 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 364 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 365 poller->name); 366 } 367 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 368 free(poller); 369 } 370 371 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 372 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 373 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 374 free(poller); 375 } 376 377 pthread_mutex_lock(&g_devlist_mutex); 378 assert(g_thread_count > 0); 379 g_thread_count--; 380 TAILQ_REMOVE(&g_threads, thread, tailq); 381 pthread_mutex_unlock(&g_devlist_mutex); 382 383 msg = SLIST_FIRST(&thread->msg_cache); 384 while (msg != NULL) { 385 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 386 387 assert(thread->msg_cache_count > 0); 388 thread->msg_cache_count--; 389 spdk_mempool_put(g_spdk_msg_mempool, msg); 390 391 msg = SLIST_FIRST(&thread->msg_cache); 392 } 393 394 assert(thread->msg_cache_count == 0); 395 396 if (spdk_interrupt_mode_is_enabled()) { 397 thread_interrupt_destroy(thread); 398 } 399 400 spdk_ring_free(thread->messages); 401 free(thread); 402 } 403 404 int 405 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 406 { 407 assert(g_new_thread_fn == NULL); 408 assert(g_thread_op_fn == NULL); 409 410 if (new_thread_fn == NULL) { 411 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 412 } else { 413 g_new_thread_fn = new_thread_fn; 414 } 415 416 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 417 } 418 
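/*
 * Illustrative usage sketch (not part of the original file): a minimal application-managed
 * event loop built on the APIs in this file might look roughly like the following, where
 * keep_running stands in for the application's own termination condition and all error
 * handling is omitted.
 *
 *	spdk_thread_lib_init(NULL, 0);
 *	struct spdk_thread *thread = spdk_thread_create("app_thread", NULL);
 *	spdk_set_thread(thread);
 *	while (keep_running) {
 *		spdk_thread_poll(thread, 0, 0);
 *	}
 *	spdk_thread_exit(thread);
 *	while (!spdk_thread_is_exited(thread)) {
 *		spdk_thread_poll(thread, 0, 0);
 *	}
 *	spdk_thread_destroy(thread);
 *	spdk_thread_lib_fini();
 */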
419 int 420 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 421 spdk_thread_op_supported_fn thread_op_supported_fn, 422 size_t ctx_sz, size_t msg_mempool_sz) 423 { 424 assert(g_new_thread_fn == NULL); 425 assert(g_thread_op_fn == NULL); 426 assert(g_thread_op_supported_fn == NULL); 427 428 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 429 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 430 return -EINVAL; 431 } 432 433 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 434 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 435 } else { 436 g_thread_op_fn = thread_op_fn; 437 g_thread_op_supported_fn = thread_op_supported_fn; 438 } 439 440 return _thread_lib_init(ctx_sz, msg_mempool_sz); 441 } 442 443 void 444 spdk_thread_lib_fini(void) 445 { 446 struct io_device *dev; 447 448 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 449 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 450 } 451 452 g_new_thread_fn = NULL; 453 g_thread_op_fn = NULL; 454 g_thread_op_supported_fn = NULL; 455 g_ctx_sz = 0; 456 if (g_app_thread != NULL) { 457 _free_thread(g_app_thread); 458 g_app_thread = NULL; 459 } 460 461 if (g_spdk_msg_mempool) { 462 spdk_mempool_free(g_spdk_msg_mempool); 463 g_spdk_msg_mempool = NULL; 464 } 465 } 466 467 struct spdk_thread * 468 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask) 469 { 470 struct spdk_thread *thread, *null_thread; 471 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 472 int rc = 0, i; 473 474 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 475 if (!thread) { 476 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 477 return NULL; 478 } 479 480 if (cpumask) { 481 spdk_cpuset_copy(&thread->cpumask, cpumask); 482 } else { 483 spdk_cpuset_negate(&thread->cpumask); 484 } 485 486 RB_INIT(&thread->io_channels); 487 TAILQ_INIT(&thread->active_pollers); 488 RB_INIT(&thread->timed_pollers); 489 TAILQ_INIT(&thread->paused_pollers); 490 SLIST_INIT(&thread->msg_cache); 491 thread->msg_cache_count = 0; 492 493 thread->tsc_last = spdk_get_ticks(); 494 495 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 496 * ID exceeds UINT64_MAX a warning message is logged 497 */ 498 thread->next_poller_id = 1; 499 500 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 501 if (!thread->messages) { 502 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 503 free(thread); 504 return NULL; 505 } 506 507 /* Fill the local message pool cache. */ 508 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 509 if (rc == 0) { 510 /* If we can't populate the cache it's ok. The cache will get filled 511 * up organically as messages are passed to the thread. */ 512 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 513 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 514 thread->msg_cache_count++; 515 } 516 } 517 518 if (name) { 519 snprintf(thread->name, sizeof(thread->name), "%s", name); 520 } else { 521 snprintf(thread->name, sizeof(thread->name), "%p", thread); 522 } 523 524 pthread_mutex_lock(&g_devlist_mutex); 525 if (g_thread_id == 0) { 526 SPDK_ERRLOG("Thread ID rolled over. 
Further thread creation is not allowed.\n"); 527 pthread_mutex_unlock(&g_devlist_mutex); 528 _free_thread(thread); 529 return NULL; 530 } 531 thread->id = g_thread_id++; 532 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 533 g_thread_count++; 534 pthread_mutex_unlock(&g_devlist_mutex); 535 536 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 537 thread->id, thread->name); 538 539 if (spdk_interrupt_mode_is_enabled()) { 540 thread->in_interrupt = true; 541 rc = thread_interrupt_create(thread); 542 if (rc != 0) { 543 _free_thread(thread); 544 return NULL; 545 } 546 } 547 548 if (g_new_thread_fn) { 549 rc = g_new_thread_fn(thread); 550 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 551 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 552 } 553 554 if (rc != 0) { 555 _free_thread(thread); 556 return NULL; 557 } 558 559 thread->state = SPDK_THREAD_STATE_RUNNING; 560 561 /* If this is the first thread, save it as the app thread. Use an atomic 562 * compare + exchange to guard against crazy users who might try to 563 * call spdk_thread_create() simultaneously on multiple threads. 564 */ 565 null_thread = NULL; 566 __atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false, 567 __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); 568 569 return thread; 570 } 571 572 struct spdk_thread * 573 spdk_thread_get_app_thread(void) 574 { 575 return g_app_thread; 576 } 577 578 void 579 spdk_set_thread(struct spdk_thread *thread) 580 { 581 tls_thread = thread; 582 } 583 584 static void 585 thread_exit(struct spdk_thread *thread, uint64_t now) 586 { 587 struct spdk_poller *poller; 588 struct spdk_io_channel *ch; 589 590 if (now >= thread->exit_timeout_tsc) { 591 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 592 thread->name); 593 goto exited; 594 } 595 596 if (spdk_ring_count(thread->messages) > 0) { 597 SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name); 598 return; 599 } 600 601 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 602 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 603 SPDK_INFOLOG(thread, 604 "thread %s still has active poller %s\n", 605 thread->name, poller->name); 606 return; 607 } 608 } 609 610 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 611 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 612 SPDK_INFOLOG(thread, 613 "thread %s still has active timed poller %s\n", 614 thread->name, poller->name); 615 return; 616 } 617 } 618 619 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 620 SPDK_INFOLOG(thread, 621 "thread %s still has paused poller %s\n", 622 thread->name, poller->name); 623 return; 624 } 625 626 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 627 SPDK_INFOLOG(thread, 628 "thread %s still has channel for io_device %s\n", 629 thread->name, ch->dev->name); 630 return; 631 } 632 633 if (thread->pending_unregister_count > 0) { 634 SPDK_INFOLOG(thread, 635 "thread %s is still unregistering io_devices\n", 636 thread->name); 637 return; 638 } 639 640 exited: 641 thread->state = SPDK_THREAD_STATE_EXITED; 642 if (spdk_unlikely(thread->in_interrupt)) { 643 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 644 } 645 } 646 647 static void _thread_exit(void *ctx); 648 649 int 650 spdk_thread_exit(struct spdk_thread *thread) 651 { 652 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 653 654 assert(tls_thread == thread); 655 656 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 657 SPDK_INFOLOG(thread, 658 "thread %s is already 
exiting\n", 659 thread->name); 660 return 0; 661 } 662 663 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 664 SPDK_THREAD_EXIT_TIMEOUT_SEC); 665 thread->state = SPDK_THREAD_STATE_EXITING; 666 667 if (spdk_interrupt_mode_is_enabled()) { 668 spdk_thread_send_msg(thread, _thread_exit, thread); 669 } 670 671 return 0; 672 } 673 674 bool 675 spdk_thread_is_running(struct spdk_thread *thread) 676 { 677 return thread->state == SPDK_THREAD_STATE_RUNNING; 678 } 679 680 bool 681 spdk_thread_is_exited(struct spdk_thread *thread) 682 { 683 return thread->state == SPDK_THREAD_STATE_EXITED; 684 } 685 686 void 687 spdk_thread_destroy(struct spdk_thread *thread) 688 { 689 assert(thread != NULL); 690 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 691 692 assert(thread->state == SPDK_THREAD_STATE_EXITED); 693 694 if (tls_thread == thread) { 695 tls_thread = NULL; 696 } 697 698 /* To be safe, do not free the app thread until spdk_thread_lib_fini(). */ 699 if (thread != g_app_thread) { 700 _free_thread(thread); 701 } 702 } 703 704 void * 705 spdk_thread_get_ctx(struct spdk_thread *thread) 706 { 707 if (g_ctx_sz > 0) { 708 return thread->ctx; 709 } 710 711 return NULL; 712 } 713 714 struct spdk_cpuset * 715 spdk_thread_get_cpumask(struct spdk_thread *thread) 716 { 717 return &thread->cpumask; 718 } 719 720 int 721 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 722 { 723 struct spdk_thread *thread; 724 725 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 726 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 727 assert(false); 728 return -ENOTSUP; 729 } 730 731 thread = spdk_get_thread(); 732 if (!thread) { 733 SPDK_ERRLOG("Called from non-SPDK thread\n"); 734 assert(false); 735 return -EINVAL; 736 } 737 738 spdk_cpuset_copy(&thread->cpumask, cpumask); 739 740 /* Invoke framework's reschedule operation. If this function is called multiple times 741 * in a single spdk_thread_poll() context, the last cpumask will be used in the 742 * reschedule operation. 743 */ 744 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 745 746 return 0; 747 } 748 749 struct spdk_thread * 750 spdk_thread_get_from_ctx(void *ctx) 751 { 752 if (ctx == NULL) { 753 assert(false); 754 return NULL; 755 } 756 757 assert(g_ctx_sz > 0); 758 759 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 760 } 761 762 static inline uint32_t 763 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 764 { 765 unsigned count, i; 766 void *messages[SPDK_MSG_BATCH_SIZE]; 767 uint64_t notify = 1; 768 int rc; 769 770 #ifdef DEBUG 771 /* 772 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 773 * so we will never actually read uninitialized data from events, but just to be sure 774 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (spdk_unlikely(thread->in_interrupt) &&
	    spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);

		SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg);

		msg->fn(msg->arg);

		SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}

static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *tmp __attribute__((unused));

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers tree by next scheduled run time
	 * as its key.
	 */
	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp == NULL);

	/* Update the cache only if it is empty or the inserted poller is earlier than it.
	 * RB_MIN() is not necessary here because all pollers, which has exactly the same
	 * next_run_tick as the existing poller, are inserted on the right side.
	 */
	if (thread->first_timed_poller == NULL ||
	    poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
		thread->first_timed_poller = poller;
	}
}

static inline void
poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
{
	struct spdk_poller *tmp __attribute__((unused));

	tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp != NULL);

	/* This function is not used in any case that is performance critical.
	 * Update the cache simply by RB_MIN() if it needs to be changed.
	 */
	if (thread->first_timed_poller == poller) {
		thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
	}
}

static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store end time to use it as start time of the next spdk_thread_poll().
*/ 883 thread->tsc_last = end; 884 } 885 886 static inline int 887 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 888 { 889 int rc; 890 891 switch (poller->state) { 892 case SPDK_POLLER_STATE_UNREGISTERED: 893 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 894 free(poller); 895 return 0; 896 case SPDK_POLLER_STATE_PAUSING: 897 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 898 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 899 poller->state = SPDK_POLLER_STATE_PAUSED; 900 return 0; 901 case SPDK_POLLER_STATE_WAITING: 902 break; 903 default: 904 assert(false); 905 break; 906 } 907 908 poller->state = SPDK_POLLER_STATE_RUNNING; 909 rc = poller->fn(poller->arg); 910 911 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 912 913 poller->run_count++; 914 if (rc > 0) { 915 poller->busy_count++; 916 } 917 918 #ifdef DEBUG 919 if (rc == -1) { 920 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 921 } 922 #endif 923 924 switch (poller->state) { 925 case SPDK_POLLER_STATE_UNREGISTERED: 926 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 927 free(poller); 928 break; 929 case SPDK_POLLER_STATE_PAUSING: 930 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 931 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 932 poller->state = SPDK_POLLER_STATE_PAUSED; 933 break; 934 case SPDK_POLLER_STATE_PAUSED: 935 case SPDK_POLLER_STATE_WAITING: 936 break; 937 case SPDK_POLLER_STATE_RUNNING: 938 poller->state = SPDK_POLLER_STATE_WAITING; 939 break; 940 default: 941 assert(false); 942 break; 943 } 944 945 return rc; 946 } 947 948 static inline int 949 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 950 uint64_t now) 951 { 952 int rc; 953 954 switch (poller->state) { 955 case SPDK_POLLER_STATE_UNREGISTERED: 956 free(poller); 957 return 0; 958 case SPDK_POLLER_STATE_PAUSING: 959 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 960 poller->state = SPDK_POLLER_STATE_PAUSED; 961 return 0; 962 case SPDK_POLLER_STATE_WAITING: 963 break; 964 default: 965 assert(false); 966 break; 967 } 968 969 poller->state = SPDK_POLLER_STATE_RUNNING; 970 rc = poller->fn(poller->arg); 971 972 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 973 974 poller->run_count++; 975 if (rc > 0) { 976 poller->busy_count++; 977 } 978 979 #ifdef DEBUG 980 if (rc == -1) { 981 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 982 } 983 #endif 984 985 switch (poller->state) { 986 case SPDK_POLLER_STATE_UNREGISTERED: 987 free(poller); 988 break; 989 case SPDK_POLLER_STATE_PAUSING: 990 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 991 poller->state = SPDK_POLLER_STATE_PAUSED; 992 break; 993 case SPDK_POLLER_STATE_PAUSED: 994 break; 995 case SPDK_POLLER_STATE_RUNNING: 996 poller->state = SPDK_POLLER_STATE_WAITING; 997 /* fallthrough */ 998 case SPDK_POLLER_STATE_WAITING: 999 poller_insert_timer(thread, poller, now); 1000 break; 1001 default: 1002 assert(false); 1003 break; 1004 } 1005 1006 return rc; 1007 } 1008 1009 static int 1010 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1011 { 1012 uint32_t msg_count; 1013 struct spdk_poller *poller, *tmp; 1014 spdk_msg_fn critical_msg; 1015 int rc = 0; 1016 1017 thread->tsc_last = now; 1018 1019 critical_msg = thread->critical_msg; 1020 if (spdk_unlikely(critical_msg != NULL)) { 1021 critical_msg(NULL); 1022 thread->critical_msg = NULL; 1023 rc = 1; 1024 } 1025 1026 msg_count = 
msg_queue_run_batch(thread, max_msgs); 1027 if (msg_count) { 1028 rc = 1; 1029 } 1030 1031 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1032 active_pollers_head, tailq, tmp) { 1033 int poller_rc; 1034 1035 poller_rc = thread_execute_poller(thread, poller); 1036 if (poller_rc > rc) { 1037 rc = poller_rc; 1038 } 1039 } 1040 1041 poller = thread->first_timed_poller; 1042 while (poller != NULL) { 1043 int timer_rc = 0; 1044 1045 if (now < poller->next_run_tick) { 1046 break; 1047 } 1048 1049 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 1050 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 1051 1052 /* Update the cache to the next timed poller in the list 1053 * only if the current poller is still the closest, otherwise, 1054 * do nothing because the cache has been already updated. 1055 */ 1056 if (thread->first_timed_poller == poller) { 1057 thread->first_timed_poller = tmp; 1058 } 1059 1060 timer_rc = thread_execute_timed_poller(thread, poller, now); 1061 if (timer_rc > rc) { 1062 rc = timer_rc; 1063 } 1064 1065 poller = tmp; 1066 } 1067 1068 return rc; 1069 } 1070 1071 static void 1072 _thread_remove_pollers(void *ctx) 1073 { 1074 struct spdk_thread *thread = ctx; 1075 struct spdk_poller *poller, *tmp; 1076 1077 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1078 active_pollers_head, tailq, tmp) { 1079 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1080 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1081 free(poller); 1082 } 1083 } 1084 1085 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1086 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1087 poller_remove_timer(thread, poller); 1088 free(poller); 1089 } 1090 } 1091 1092 thread->poller_unregistered = false; 1093 } 1094 1095 static void 1096 _thread_exit(void *ctx) 1097 { 1098 struct spdk_thread *thread = ctx; 1099 1100 assert(thread->state == SPDK_THREAD_STATE_EXITING); 1101 1102 thread_exit(thread, spdk_get_ticks()); 1103 } 1104 1105 int 1106 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1107 { 1108 struct spdk_thread *orig_thread; 1109 int rc; 1110 1111 orig_thread = _get_thread(); 1112 tls_thread = thread; 1113 1114 if (now == 0) { 1115 now = spdk_get_ticks(); 1116 } 1117 1118 if (spdk_likely(!thread->in_interrupt)) { 1119 rc = thread_poll(thread, max_msgs, now); 1120 if (spdk_unlikely(thread->in_interrupt)) { 1121 /* The thread transitioned to interrupt mode during the above poll. 1122 * Poll it one more time in case that during the transition time 1123 * there is msg received without notification. 
1124 */ 1125 rc = thread_poll(thread, max_msgs, now); 1126 } 1127 1128 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1129 thread_exit(thread, now); 1130 } 1131 } else { 1132 /* Non-block wait on thread's fd_group */ 1133 rc = spdk_fd_group_wait(thread->fgrp, 0); 1134 } 1135 1136 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1137 1138 tls_thread = orig_thread; 1139 1140 return rc; 1141 } 1142 1143 uint64_t 1144 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1145 { 1146 struct spdk_poller *poller; 1147 1148 poller = thread->first_timed_poller; 1149 if (poller) { 1150 return poller->next_run_tick; 1151 } 1152 1153 return 0; 1154 } 1155 1156 int 1157 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1158 { 1159 return !TAILQ_EMPTY(&thread->active_pollers); 1160 } 1161 1162 static bool 1163 thread_has_unpaused_pollers(struct spdk_thread *thread) 1164 { 1165 if (TAILQ_EMPTY(&thread->active_pollers) && 1166 RB_EMPTY(&thread->timed_pollers)) { 1167 return false; 1168 } 1169 1170 return true; 1171 } 1172 1173 bool 1174 spdk_thread_has_pollers(struct spdk_thread *thread) 1175 { 1176 if (!thread_has_unpaused_pollers(thread) && 1177 TAILQ_EMPTY(&thread->paused_pollers)) { 1178 return false; 1179 } 1180 1181 return true; 1182 } 1183 1184 bool 1185 spdk_thread_is_idle(struct spdk_thread *thread) 1186 { 1187 if (spdk_ring_count(thread->messages) || 1188 thread_has_unpaused_pollers(thread) || 1189 thread->critical_msg != NULL) { 1190 return false; 1191 } 1192 1193 return true; 1194 } 1195 1196 uint32_t 1197 spdk_thread_get_count(void) 1198 { 1199 /* 1200 * Return cached value of the current thread count. We could acquire the 1201 * lock and iterate through the TAILQ of threads to count them, but that 1202 * count could still be invalidated after we release the lock. 
1203 */ 1204 return g_thread_count; 1205 } 1206 1207 struct spdk_thread * 1208 spdk_get_thread(void) 1209 { 1210 return _get_thread(); 1211 } 1212 1213 const char * 1214 spdk_thread_get_name(const struct spdk_thread *thread) 1215 { 1216 return thread->name; 1217 } 1218 1219 uint64_t 1220 spdk_thread_get_id(const struct spdk_thread *thread) 1221 { 1222 return thread->id; 1223 } 1224 1225 struct spdk_thread * 1226 spdk_thread_get_by_id(uint64_t id) 1227 { 1228 struct spdk_thread *thread; 1229 1230 if (id == 0 || id >= g_thread_id) { 1231 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1232 return NULL; 1233 } 1234 pthread_mutex_lock(&g_devlist_mutex); 1235 TAILQ_FOREACH(thread, &g_threads, tailq) { 1236 if (thread->id == id) { 1237 break; 1238 } 1239 } 1240 pthread_mutex_unlock(&g_devlist_mutex); 1241 return thread; 1242 } 1243 1244 int 1245 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1246 { 1247 struct spdk_thread *thread; 1248 1249 thread = _get_thread(); 1250 if (!thread) { 1251 SPDK_ERRLOG("No thread allocated\n"); 1252 return -EINVAL; 1253 } 1254 1255 if (stats == NULL) { 1256 return -EINVAL; 1257 } 1258 1259 *stats = thread->stats; 1260 1261 return 0; 1262 } 1263 1264 uint64_t 1265 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1266 { 1267 if (thread == NULL) { 1268 thread = _get_thread(); 1269 } 1270 1271 return thread->tsc_last; 1272 } 1273 1274 static inline int 1275 thread_send_msg_notification(const struct spdk_thread *target_thread) 1276 { 1277 uint64_t notify = 1; 1278 int rc; 1279 1280 /* Not necessary to do notification if interrupt facility is not enabled */ 1281 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1282 return 0; 1283 } 1284 1285 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1286 * after sending thread msg, it is necessary to check whether target thread runs in 1287 * interrupt mode and then decide whether do event notification. 
	 */
	if (spdk_unlikely(target_thread->in_interrupt)) {
		rc = write(target_thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
			return -EIO;
		}
	}

	return 0;
}

int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
	spdk_msg_fn expected = NULL;

	if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
					 __ATOMIC_SEQ_CST)) {
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

#ifdef __linux__
static int
interrupt_timerfd_process(void *arg)
{
	struct spdk_poller *poller = arg;
	uint64_t exp;
	int rc;

	/* clear the level of interval timer */
	rc = read(poller->intr->efd, &exp, sizeof(exp));
	if (rc < 0) {
		/* read() reports an empty non-blocking timerfd through errno, not its return value. */
		if (errno == EAGAIN) {
			return 0;
		}

		return rc;
	}

	SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg);

	return poller->fn(poller->arg);
}

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	int timerfd;

	SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
	if (timerfd < 0) {
		return -errno;
	}

	poller->intr = spdk_interrupt_register(timerfd, interrupt_timerfd_process, poller, poller->name);
	if (poller->intr == NULL) {
		close(timerfd);
		return -1;
	}

	return 0;
}
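/*
 * Worked example (illustrative, not from the original source): with
 * spdk_get_ticks_hz() == 1000000000 (a 1 GHz tick source), a poller registered with a
 * 10ms period has period_ticks == 10000000, so period_poller_set_interrupt_mode() below
 * programs it_interval.tv_sec = 10000000 / 1000000000 = 0 and
 * it_interval.tv_nsec = (10000000 % 1000000000) * SPDK_SEC_TO_NSEC / 1000000000 = 10000000,
 * i.e. the timerfd fires every 10 milliseconds while the thread stays in interrupt mode.
 */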
"interrupt" : "poll"); 1422 1423 if (interrupt_mode) { 1424 /* Set repeated timer expiration */ 1425 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1426 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1427 1428 /* Update next timer expiration */ 1429 if (poller->next_run_tick == 0) { 1430 poller->next_run_tick = now_tick + poller->period_ticks; 1431 } else if (poller->next_run_tick < now_tick) { 1432 poller->next_run_tick = now_tick; 1433 } 1434 1435 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1436 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1437 1438 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1439 if (ret < 0) { 1440 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1441 assert(false); 1442 } 1443 } else { 1444 /* Disarm the timer */ 1445 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1446 if (ret < 0) { 1447 /* timerfd_settime's failure indicates that the timerfd is in error */ 1448 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1449 assert(false); 1450 } 1451 1452 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1453 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1454 */ 1455 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1456 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1457 poller_remove_timer(poller->thread, poller); 1458 poller_insert_timer(poller->thread, poller, now_tick); 1459 } 1460 } 1461 1462 static void 1463 poller_interrupt_fini(struct spdk_poller *poller) 1464 { 1465 int fd; 1466 1467 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1468 assert(poller->intr != NULL); 1469 fd = poller->intr->efd; 1470 spdk_interrupt_unregister(&poller->intr); 1471 close(fd); 1472 } 1473 1474 static int 1475 busy_poller_interrupt_init(struct spdk_poller *poller) 1476 { 1477 int busy_efd; 1478 1479 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1480 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1481 if (busy_efd < 0) { 1482 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1483 return -errno; 1484 } 1485 1486 poller->intr = spdk_interrupt_register(busy_efd, poller->fn, poller->arg, poller->name); 1487 if (poller->intr == NULL) { 1488 close(busy_efd); 1489 return -1; 1490 } 1491 1492 return 0; 1493 } 1494 1495 static void 1496 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1497 { 1498 int busy_efd = poller->intr->efd; 1499 uint64_t notify = 1; 1500 int rc __attribute__((unused)); 1501 1502 assert(busy_efd >= 0); 1503 1504 if (interrupt_mode) { 1505 /* Write without read on eventfd will get it repeatedly triggered. */ 1506 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1507 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1508 } 1509 } else { 1510 /* Read on eventfd will clear its level triggering. 
*/ 1511 rc = read(busy_efd, ¬ify, sizeof(notify)); 1512 } 1513 } 1514 1515 #else 1516 1517 static int 1518 period_poller_interrupt_init(struct spdk_poller *poller) 1519 { 1520 return -ENOTSUP; 1521 } 1522 1523 static void 1524 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1525 { 1526 } 1527 1528 static void 1529 poller_interrupt_fini(struct spdk_poller *poller) 1530 { 1531 } 1532 1533 static int 1534 busy_poller_interrupt_init(struct spdk_poller *poller) 1535 { 1536 return -ENOTSUP; 1537 } 1538 1539 static void 1540 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1541 { 1542 } 1543 1544 #endif 1545 1546 void 1547 spdk_poller_register_interrupt(struct spdk_poller *poller, 1548 spdk_poller_set_interrupt_mode_cb cb_fn, 1549 void *cb_arg) 1550 { 1551 assert(poller != NULL); 1552 assert(cb_fn != NULL); 1553 assert(spdk_get_thread() == poller->thread); 1554 1555 if (!spdk_interrupt_mode_is_enabled()) { 1556 return; 1557 } 1558 1559 /* If this poller already had an interrupt, clean the old one up. */ 1560 if (poller->intr != NULL) { 1561 poller_interrupt_fini(poller); 1562 } 1563 1564 poller->set_intr_cb_fn = cb_fn; 1565 poller->set_intr_cb_arg = cb_arg; 1566 1567 /* Set poller into interrupt mode if thread is in interrupt. */ 1568 if (poller->thread->in_interrupt) { 1569 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1570 } 1571 } 1572 1573 static uint64_t 1574 convert_us_to_ticks(uint64_t us) 1575 { 1576 uint64_t quotient, remainder, ticks; 1577 1578 if (us) { 1579 quotient = us / SPDK_SEC_TO_USEC; 1580 remainder = us % SPDK_SEC_TO_USEC; 1581 ticks = spdk_get_ticks_hz(); 1582 1583 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1584 } else { 1585 return 0; 1586 } 1587 } 1588 1589 static struct spdk_poller * 1590 poller_register(spdk_poller_fn fn, 1591 void *arg, 1592 uint64_t period_microseconds, 1593 const char *name) 1594 { 1595 struct spdk_thread *thread; 1596 struct spdk_poller *poller; 1597 1598 thread = spdk_get_thread(); 1599 if (!thread) { 1600 assert(false); 1601 return NULL; 1602 } 1603 1604 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1605 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1606 return NULL; 1607 } 1608 1609 poller = calloc(1, sizeof(*poller)); 1610 if (poller == NULL) { 1611 SPDK_ERRLOG("Poller memory allocation failed\n"); 1612 return NULL; 1613 } 1614 1615 if (name) { 1616 snprintf(poller->name, sizeof(poller->name), "%s", name); 1617 } else { 1618 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1619 } 1620 1621 poller->state = SPDK_POLLER_STATE_WAITING; 1622 poller->fn = fn; 1623 poller->arg = arg; 1624 poller->thread = thread; 1625 poller->intr = NULL; 1626 if (thread->next_poller_id == 0) { 1627 SPDK_WARNLOG("Poller ID rolled over. 
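/*
 * Worked example (illustrative, not from the original source): with
 * spdk_get_ticks_hz() == 3000000000 (a 3 GHz tick source),
 * convert_us_to_ticks(1500) == 3000000000 * (1500 / SPDK_SEC_TO_USEC) +
 * (3000000000 * (1500 % SPDK_SEC_TO_USEC)) / SPDK_SEC_TO_USEC = 0 + 4500000000000 / 1000000
 * = 4500000 ticks, so poller_register() below stores a 4.5 million tick period for a
 * poller registered with period_microseconds == 1500.
 */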
static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;
	poller->intr = NULL;
	if (thread->next_poller_id == 0) {
		SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n");
		thread->next_poller_id = 1;
	}
	poller->id = thread->next_poller_id++;

	poller->period_ticks = convert_us_to_ticks(period_microseconds);

	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (period_microseconds) {
			rc = period_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			poller->set_intr_cb_fn = period_poller_set_interrupt_mode;
			poller->set_intr_cb_arg = NULL;

		} else {
			/* If the poller doesn't have a period, create interruptfd that's always
			 * busy automatically when running in interrupt mode.
			 */
			rc = busy_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			poller->set_intr_cb_fn = busy_poller_set_interrupt_mode;
			poller->set_intr_cb_arg = NULL;
		}

		/* Set poller into interrupt mode if thread is in interrupt. */
		if (poller->thread->in_interrupt) {
			poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
		}
	}

	thread_insert_poller(thread, poller);

	return poller;
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}

static void
wrong_thread(const char *func, const char *name, struct spdk_thread *thread,
	     struct spdk_thread *curthread)
{
	if (thread == NULL) {
		SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name);
		abort();
	}
	SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be "
		    "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id,
		    thread->name, thread->id);
	assert(false);
}

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		/* Release the interrupt resource for period or busy poller */
		if (poller->intr != NULL) {
			poller_interrupt_fini(poller);
		}

		/* If there is not already a pending poller removal, generate
		 * a message to go process removals. */
		if (!thread->poller_unregistered) {
			thread->poller_unregistered = true;
			spdk_thread_send_msg(thread, _thread_remove_pollers, thread);
		}
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
1745 */ 1746 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1747 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1748 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1749 poller->period_ticks = 0; 1750 } 1751 1752 /* Simply set the state to unregistered. The poller will get cleaned up 1753 * in a subsequent call to spdk_thread_poll(). 1754 */ 1755 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1756 } 1757 1758 void 1759 spdk_poller_pause(struct spdk_poller *poller) 1760 { 1761 struct spdk_thread *thread; 1762 1763 thread = spdk_get_thread(); 1764 if (!thread) { 1765 assert(false); 1766 return; 1767 } 1768 1769 if (poller->thread != thread) { 1770 wrong_thread(__func__, poller->name, poller->thread, thread); 1771 return; 1772 } 1773 1774 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1775 * spdk_thread_poll() move it. It allows a poller to be paused from 1776 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1777 * iteration, or from within itself without breaking the logic to always 1778 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1779 */ 1780 switch (poller->state) { 1781 case SPDK_POLLER_STATE_PAUSED: 1782 case SPDK_POLLER_STATE_PAUSING: 1783 break; 1784 case SPDK_POLLER_STATE_RUNNING: 1785 case SPDK_POLLER_STATE_WAITING: 1786 poller->state = SPDK_POLLER_STATE_PAUSING; 1787 break; 1788 default: 1789 assert(false); 1790 break; 1791 } 1792 } 1793 1794 void 1795 spdk_poller_resume(struct spdk_poller *poller) 1796 { 1797 struct spdk_thread *thread; 1798 1799 thread = spdk_get_thread(); 1800 if (!thread) { 1801 assert(false); 1802 return; 1803 } 1804 1805 if (poller->thread != thread) { 1806 wrong_thread(__func__, poller->name, poller->thread, thread); 1807 return; 1808 } 1809 1810 /* If a poller is paused it has to be removed from the paused pollers 1811 * list and put on the active list or timer tree depending on its 1812 * period_ticks. If a poller is still in the process of being paused, 1813 * we just need to flip its state back to waiting, as it's already on 1814 * the appropriate list or tree. 
1815 */ 1816 switch (poller->state) { 1817 case SPDK_POLLER_STATE_PAUSED: 1818 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1819 thread_insert_poller(thread, poller); 1820 /* fallthrough */ 1821 case SPDK_POLLER_STATE_PAUSING: 1822 poller->state = SPDK_POLLER_STATE_WAITING; 1823 break; 1824 case SPDK_POLLER_STATE_RUNNING: 1825 case SPDK_POLLER_STATE_WAITING: 1826 break; 1827 default: 1828 assert(false); 1829 break; 1830 } 1831 } 1832 1833 const char * 1834 spdk_poller_get_name(struct spdk_poller *poller) 1835 { 1836 return poller->name; 1837 } 1838 1839 uint64_t 1840 spdk_poller_get_id(struct spdk_poller *poller) 1841 { 1842 return poller->id; 1843 } 1844 1845 const char * 1846 spdk_poller_get_state_str(struct spdk_poller *poller) 1847 { 1848 switch (poller->state) { 1849 case SPDK_POLLER_STATE_WAITING: 1850 return "waiting"; 1851 case SPDK_POLLER_STATE_RUNNING: 1852 return "running"; 1853 case SPDK_POLLER_STATE_UNREGISTERED: 1854 return "unregistered"; 1855 case SPDK_POLLER_STATE_PAUSING: 1856 return "pausing"; 1857 case SPDK_POLLER_STATE_PAUSED: 1858 return "paused"; 1859 default: 1860 return NULL; 1861 } 1862 } 1863 1864 uint64_t 1865 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1866 { 1867 return poller->period_ticks; 1868 } 1869 1870 void 1871 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1872 { 1873 stats->run_count = poller->run_count; 1874 stats->busy_count = poller->busy_count; 1875 } 1876 1877 struct spdk_poller * 1878 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1879 { 1880 return TAILQ_FIRST(&thread->active_pollers); 1881 } 1882 1883 struct spdk_poller * 1884 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1885 { 1886 return TAILQ_NEXT(prev, tailq); 1887 } 1888 1889 struct spdk_poller * 1890 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1891 { 1892 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1893 } 1894 1895 struct spdk_poller * 1896 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1897 { 1898 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1899 } 1900 1901 struct spdk_poller * 1902 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1903 { 1904 return TAILQ_FIRST(&thread->paused_pollers); 1905 } 1906 1907 struct spdk_poller * 1908 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1909 { 1910 return TAILQ_NEXT(prev, tailq); 1911 } 1912 1913 struct spdk_io_channel * 1914 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1915 { 1916 return RB_MIN(io_channel_tree, &thread->io_channels); 1917 } 1918 1919 struct spdk_io_channel * 1920 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1921 { 1922 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1923 } 1924 1925 struct call_thread { 1926 struct spdk_thread *cur_thread; 1927 spdk_msg_fn fn; 1928 void *ctx; 1929 1930 struct spdk_thread *orig_thread; 1931 spdk_msg_fn cpl; 1932 }; 1933 1934 static void 1935 _on_thread(void *ctx) 1936 { 1937 struct call_thread *ct = ctx; 1938 int rc __attribute__((unused)); 1939 1940 ct->fn(ct->ctx); 1941 1942 pthread_mutex_lock(&g_devlist_mutex); 1943 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1944 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 1945 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 1946 ct->cur_thread->name); 1947 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1948 } 1949 pthread_mutex_unlock(&g_devlist_mutex); 
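	/* ct->cur_thread now points at the next thread that is still RUNNING, or is NULL once
	 * every thread has been visited; the branch below either forwards this context to that
	 * thread or sends the completion callback back to the thread that started the iteration.
	 */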
1950 1951 if (!ct->cur_thread) { 1952 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1953 1954 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1955 free(ctx); 1956 } else { 1957 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1958 ct->cur_thread->name); 1959 1960 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1961 } 1962 assert(rc == 0); 1963 } 1964 1965 void 1966 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1967 { 1968 struct call_thread *ct; 1969 struct spdk_thread *thread; 1970 int rc __attribute__((unused)); 1971 1972 ct = calloc(1, sizeof(*ct)); 1973 if (!ct) { 1974 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1975 cpl(ctx); 1976 return; 1977 } 1978 1979 ct->fn = fn; 1980 ct->ctx = ctx; 1981 ct->cpl = cpl; 1982 1983 thread = _get_thread(); 1984 if (!thread) { 1985 SPDK_ERRLOG("No thread allocated\n"); 1986 free(ct); 1987 cpl(ctx); 1988 return; 1989 } 1990 ct->orig_thread = thread; 1991 1992 pthread_mutex_lock(&g_devlist_mutex); 1993 ct->cur_thread = TAILQ_FIRST(&g_threads); 1994 pthread_mutex_unlock(&g_devlist_mutex); 1995 1996 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 1997 ct->orig_thread->name); 1998 1999 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 2000 assert(rc == 0); 2001 } 2002 2003 static inline void 2004 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 2005 { 2006 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 2007 return; 2008 } 2009 2010 if (!poller->set_intr_cb_fn) { 2011 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 2012 assert(false); 2013 return; 2014 } 2015 2016 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 2017 } 2018 2019 void 2020 spdk_thread_set_interrupt_mode(bool enable_interrupt) 2021 { 2022 struct spdk_thread *thread = _get_thread(); 2023 struct spdk_poller *poller, *tmp; 2024 2025 assert(thread); 2026 assert(spdk_interrupt_mode_is_enabled()); 2027 2028 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 2029 thread->name, enable_interrupt ? "intr" : "poll", 2030 thread->in_interrupt ? 
"intr" : "poll"); 2031 2032 if (thread->in_interrupt == enable_interrupt) { 2033 return; 2034 } 2035 2036 /* Set pollers to expected mode */ 2037 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 2038 poller_set_interrupt_mode(poller, enable_interrupt); 2039 } 2040 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 2041 poller_set_interrupt_mode(poller, enable_interrupt); 2042 } 2043 /* All paused pollers will go to work in interrupt mode */ 2044 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 2045 poller_set_interrupt_mode(poller, enable_interrupt); 2046 } 2047 2048 thread->in_interrupt = enable_interrupt; 2049 return; 2050 } 2051 2052 static struct io_device * 2053 io_device_get(void *io_device) 2054 { 2055 struct io_device find = {}; 2056 2057 find.io_device = io_device; 2058 return RB_FIND(io_device_tree, &g_io_devices, &find); 2059 } 2060 2061 void 2062 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 2063 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 2064 const char *name) 2065 { 2066 struct io_device *dev, *tmp; 2067 struct spdk_thread *thread; 2068 2069 assert(io_device != NULL); 2070 assert(create_cb != NULL); 2071 assert(destroy_cb != NULL); 2072 2073 thread = spdk_get_thread(); 2074 if (!thread) { 2075 SPDK_ERRLOG("called from non-SPDK thread\n"); 2076 assert(false); 2077 return; 2078 } 2079 2080 dev = calloc(1, sizeof(struct io_device)); 2081 if (dev == NULL) { 2082 SPDK_ERRLOG("could not allocate io_device\n"); 2083 return; 2084 } 2085 2086 dev->io_device = io_device; 2087 if (name) { 2088 snprintf(dev->name, sizeof(dev->name), "%s", name); 2089 } else { 2090 snprintf(dev->name, sizeof(dev->name), "%p", dev); 2091 } 2092 dev->create_cb = create_cb; 2093 dev->destroy_cb = destroy_cb; 2094 dev->unregister_cb = NULL; 2095 dev->ctx_size = ctx_size; 2096 dev->for_each_count = 0; 2097 dev->unregistered = false; 2098 dev->refcnt = 0; 2099 2100 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 2101 dev->name, dev->io_device, thread->name); 2102 2103 pthread_mutex_lock(&g_devlist_mutex); 2104 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 2105 if (tmp != NULL) { 2106 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 2107 io_device, tmp->name, dev->name); 2108 free(dev); 2109 } 2110 2111 pthread_mutex_unlock(&g_devlist_mutex); 2112 } 2113 2114 static void 2115 _finish_unregister(void *arg) 2116 { 2117 struct io_device *dev = arg; 2118 struct spdk_thread *thread; 2119 2120 thread = spdk_get_thread(); 2121 assert(thread == dev->unregister_thread); 2122 2123 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2124 dev->name, dev->io_device, thread->name); 2125 2126 assert(thread->pending_unregister_count > 0); 2127 thread->pending_unregister_count--; 2128 2129 dev->unregister_cb(dev->io_device); 2130 free(dev); 2131 } 2132 2133 static void 2134 io_device_free(struct io_device *dev) 2135 { 2136 int rc __attribute__((unused)); 2137 2138 if (dev->unregister_cb == NULL) { 2139 free(dev); 2140 } else { 2141 assert(dev->unregister_thread != NULL); 2142 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2143 dev->name, dev->io_device, dev->unregister_thread->name); 2144 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2145 assert(rc == 0); 2146 } 2147 } 2148 2149 void 2150 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2151 { 2152 
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* The for_each_count check differentiates the user attempting to unregister the
	 * device a second time, from the internal call to this function that occurs
	 * after the for_each_count reaches 0.
	 */
	if (dev->pending_unregister && dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregister_thread = thread;

	if (dev->for_each_count > 0) {
		SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			     dev->name, io_device, dev->for_each_count);
		dev->pending_unregister = true;
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregistered = true;
	RB_REMOVE(io_device_tree, &g_io_devices, dev);
	refcnt = dev->refcnt;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (unregister_cb) {
		thread->pending_unregister_count++;
	}

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}

const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}

static struct spdk_io_channel *
thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev)
{
	struct spdk_io_channel find = {};

	find.dev = dev;
	return RB_FIND(io_channel_tree, &thread->io_channels, &find);
}

struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch = thread_get_io_channel(thread, dev);
	if (ch != NULL) {
		ch->ref++;

		SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
			      ch, dev->name, dev->io_device, thread->name, ch->ref);

		/*
		 * An I/O channel already exists for this device on this
		 * thread, so return it.
		 */
		pthread_mutex_unlock(&g_devlist_mutex);
		spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0,
				  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
		return ch;
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	RB_INSERT(io_channel_tree, &thread->io_channels, ch);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1);
	return ch;
}
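/*
 * Editorial sketch (not part of the upstream implementation): typical pairing of
 * spdk_get_io_channel() and spdk_put_io_channel() on an SPDK thread.  The first
 * call on a thread invokes the device's create_cb; later calls only bump the
 * reference count, and the channel is destroyed asynchronously once the last
 * reference is put.  struct my_dev_channel and g_my_dev are the hypothetical
 * names used in the earlier sketch.
 *
 *	static void
 *	my_dev_do_io(void)
 *	{
 *		struct spdk_io_channel *ch;
 *		struct my_dev_channel *dev_ch;
 *
 *		ch = spdk_get_io_channel(&g_my_dev);
 *		if (ch == NULL) {
 *			return;
 *		}
 *
 *		dev_ch = spdk_io_channel_get_ctx(ch);
 *		// ... submit I/O using this thread's per-channel context ...
 *
 *		spdk_put_io_channel(ch);
 *	}
 */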
static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0,
			  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		wrong_thread(__func__, "ch", ch->thread, thread);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

void *
spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
{
	return ch->dev->io_device;
}

const char *
spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch)
{
	return spdk_io_device_get_name(ch->dev);
}

int
spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
{
	return ch->ref;
}

struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute. If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	ch = thread_get_io_channel(i->cur_thread, i->dev);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}
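/*
 * Editorial sketch (not part of the upstream implementation): how the
 * spdk_for_each_channel() machinery below is typically driven.  The per-channel
 * callback runs once on each thread that holds a channel for the io_device and
 * must call spdk_for_each_channel_continue(); the completion callback then runs
 * on the thread that started the iteration.  Names prefixed with my_ are
 * hypothetical; g_my_dev is the io_device key from the earlier sketches.
 *
 *	static void
 *	my_dev_flush_channel(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *		struct my_dev_channel *dev_ch = spdk_io_channel_get_ctx(ch);
 *
 *		// ... operate on dev_ch, this thread's channel context ...
 *		spdk_for_each_channel_continue(i, 0);
 *	}
 *
 *	static void
 *	my_dev_flush_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		// runs on the thread that called spdk_for_each_channel()
 *	}
 *
 *	static void
 *	my_dev_flush_all(void)
 *	{
 *		spdk_for_each_channel(&g_my_dev, my_dev_flush_channel, NULL,
 *				      my_dev_flush_done);
 *	}
 */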
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		assert(false);
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;
	i->orig_thread = _get_thread();

	pthread_mutex_lock(&g_devlist_mutex);
	i->dev = io_device_get(io_device);
	if (i->dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		assert(false);
		i->status = -ENODEV;
		goto end;
	}

	/* Do not allow new for_each operations if we are already waiting for outstanding
	 * for_each operations to complete so that the device can be unregistered.
	 */
	if (i->dev->pending_unregister) {
		SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device);
		i->status = -ENODEV;
		goto end;
	}

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		ch = thread_get_io_channel(thread, i->dev);
		if (ch != NULL) {
			ch->dev->for_each_count++;
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
	}

end:
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

static void
__pending_unregister(void *arg)
{
	struct io_device *dev = arg;

	assert(dev->pending_unregister);
	assert(dev->for_each_count == 0);
	spdk_io_device_unregister(dev->io_device, dev->unregister_cb);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct io_device *dev;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	dev = i->dev;
	if (status) {
		goto end;
	}

	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		ch = thread_get_io_channel(thread, dev);
		if (ch != NULL) {
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);

	pthread_mutex_lock(&g_devlist_mutex);
	if (dev->pending_unregister && dev->for_each_count == 0) {
		rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev);
		assert(rc == 0);
	}
	pthread_mutex_unlock(&g_devlist_mutex);
}
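/*
 * The helpers below attach each thread's message eventfd to an spdk_fd_group so
 * that, in interrupt mode, whatever framework drives the thread can sleep until a
 * message (or any registered interrupt source) becomes ready instead of
 * busy-polling.  Editorial sketch (not part of the upstream implementation) of one
 * way a driver loop might wait on that group; it assumes spdk_fd_group_wait() from
 * spdk/fd_group.h and a hypothetical should_run flag.
 *
 *	static void
 *	my_interrupt_loop(struct spdk_thread *thread, volatile bool *should_run)
 *	{
 *		struct spdk_fd_group *fgrp = spdk_thread_get_interrupt_fd_group(thread);
 *
 *		while (*should_run) {
 *			// blocks until an fd in the group is ready, then runs its callbacks
 *			spdk_fd_group_wait(fgrp, -1);
 *		}
 *	}
 */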
static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	if (thread->msg_fd < 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);
	thread->msg_fd = -1;

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}

#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	struct spdk_thread *orig_thread;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;
	uint64_t notify = 1;

	assert(spdk_interrupt_mode_is_enabled());

	orig_thread = spdk_get_thread();
	spdk_set_thread(thread);

	/* There may be a race between this acknowledgment read and another producer's
	 * notify write, so acknowledge first and only then process the message queue.
	 * This ordering avoids missing a notification.
	 */
	rc = read(thread->msg_fd, &notify, sizeof(notify));
	if (rc < 0 && errno != EAGAIN) {
		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
	}

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
	if (spdk_unlikely(!thread->in_interrupt)) {
		/* The thread transitioned to poll mode in a msg during the above processing.
		 * Clear msg_fd since thread messages will be polled directly in poll mode.
		 */
		rc = read(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0 && errno != EAGAIN) {
			SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
		}
	}

	spdk_set_thread(orig_thread);
	return rc;
}

static int
thread_interrupt_create(struct spdk_thread *thread)
{
	int rc;

	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);

	rc = spdk_fd_group_create(&thread->fgrp);
	if (rc) {
		return rc;
	}

	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (thread->msg_fd < 0) {
		rc = -errno;
		spdk_fd_group_destroy(thread->fgrp);
		thread->fgrp = NULL;

		return rc;
	}

	return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd,
				 thread_interrupt_msg_process, thread);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
#endif

static int
_interrupt_wrapper(void *ctx)
{
	struct spdk_interrupt *intr = ctx;
	struct spdk_thread *orig_thread, *thread;
	int rc;

	orig_thread = spdk_get_thread();
	thread = intr->thread;

	spdk_set_thread(thread);

	SPDK_DTRACE_PROBE4(interrupt_fd_process, intr->name, intr->efd,
			   intr->fn, intr->arg);

	rc = intr->fn(intr->arg);

	SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

	spdk_set_thread(orig_thread);

	return rc;
}
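/*
 * Editorial sketch (not part of the upstream implementation): registering an
 * additional interrupt source on the current SPDK thread.  The fd is typically an
 * eventfd or timerfd owned by the caller; the handler runs with the thread context
 * set, via _interrupt_wrapper() above.  my_event_handler, struct my_ctx, my_efd
 * and g_my_intr are hypothetical.
 *
 *	static struct spdk_interrupt *g_my_intr;
 *
 *	static int
 *	my_event_handler(void *arg)
 *	{
 *		struct my_ctx *ctx = arg;
 *
 *		// ... consume the event associated with my_efd using ctx ...
 *		return 0;
 *	}
 *
 *	static void
 *	my_register(int my_efd, struct my_ctx *ctx)
 *	{
 *		g_my_intr = spdk_interrupt_register(my_efd, my_event_handler, ctx, "my_event");
 *	}
 *
 *	static void
 *	my_unregister(void)
 *	{
 *		spdk_interrupt_unregister(&g_my_intr);
 *	}
 */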
struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
			void *arg, const char *name)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;
	int ret;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	intr = calloc(1, sizeof(*intr));
	if (intr == NULL) {
		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(intr->name, sizeof(intr->name), "%s", name);
	} else {
		snprintf(intr->name, sizeof(intr->name), "%p", fn);
	}

	intr->efd = efd;
	intr->thread = thread;
	intr->fn = fn;
	intr->arg = arg;

	ret = spdk_fd_group_add(thread->fgrp, efd, _interrupt_wrapper, intr, intr->name);

	if (ret != 0) {
		SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n",
			    thread->name, efd, spdk_strerror(-ret));
		free(intr);
		return NULL;
	}

	return intr;
}

void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	*pintr = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}

int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}

struct spdk_fd_group *
spdk_thread_get_interrupt_fd_group(struct spdk_thread *thread)
{
	return thread->fgrp;
}

static bool g_interrupt_mode = false;
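/*
 * Editorial sketch (not part of the upstream implementation): interrupt mode has
 * to be requested before the threading library is initialized, e.g. early in
 * application startup.  spdk_thread_lib_init() is the initialization entry point
 * declared in spdk/thread.h; want_interrupt_mode is a hypothetical flag and error
 * handling is abbreviated.
 *
 *	if (want_interrupt_mode) {
 *		if (spdk_interrupt_mode_enable() != 0) {
 *			// not supported on this platform, or called too late
 *		}
 *	}
 *	spdk_thread_lib_init(NULL, 0);
 */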
2856 */ 2857 if (g_spdk_msg_mempool) { 2858 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 2859 return -1; 2860 } 2861 2862 #ifdef __linux__ 2863 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2864 g_interrupt_mode = true; 2865 return 0; 2866 #else 2867 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2868 g_interrupt_mode = false; 2869 return -ENOTSUP; 2870 #endif 2871 } 2872 2873 bool 2874 spdk_interrupt_mode_is_enabled(void) 2875 { 2876 return g_interrupt_mode; 2877 } 2878 2879 void 2880 spdk_spin_init(struct spdk_spinlock *sspin) 2881 { 2882 int rc; 2883 2884 memset(sspin, 0, sizeof(*sspin)); 2885 rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE); 2886 SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD); 2887 } 2888 2889 void 2890 spdk_spin_destroy(struct spdk_spinlock *sspin) 2891 { 2892 int rc; 2893 2894 SPIN_ASSERT_RETURN_VOID(sspin->thread == NULL, SPIN_ERR_LOCK_HELD); 2895 2896 rc = pthread_spin_destroy(&sspin->spinlock); 2897 SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD); 2898 } 2899 2900 void 2901 spdk_spin_lock(struct spdk_spinlock *sspin) 2902 { 2903 struct spdk_thread *thread = spdk_get_thread(); 2904 int rc; 2905 2906 SPIN_ASSERT_RETURN_VOID(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD); 2907 SPIN_ASSERT_RETURN_VOID(thread != sspin->thread, SPIN_ERR_DEADLOCK); 2908 2909 rc = pthread_spin_lock(&sspin->spinlock); 2910 SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD); 2911 2912 sspin->thread = thread; 2913 sspin->thread->lock_count++; 2914 } 2915 2916 void 2917 spdk_spin_unlock(struct spdk_spinlock *sspin) 2918 { 2919 struct spdk_thread *thread = spdk_get_thread(); 2920 int rc; 2921 2922 SPIN_ASSERT_RETURN_VOID(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD); 2923 SPIN_ASSERT_RETURN_VOID(thread == sspin->thread, SPIN_ERR_WRONG_THREAD); 2924 2925 SPIN_ASSERT_RETURN_VOID(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT); 2926 thread->lock_count--; 2927 sspin->thread = NULL; 2928 2929 rc = pthread_spin_unlock(&sspin->spinlock); 2930 SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD); 2931 } 2932 2933 bool 2934 spdk_spin_held(struct spdk_spinlock *sspin) 2935 { 2936 struct spdk_thread *thread = spdk_get_thread(); 2937 2938 SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false); 2939 2940 return sspin->thread == thread; 2941 } 2942 2943 SPDK_LOG_REGISTER_COMPONENT(thread) 2944