1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2016 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "spdk/env.h" 10 #include "spdk/bdev.h" 11 #include "spdk/likely.h" 12 #include "spdk/queue.h" 13 #include "spdk/string.h" 14 #include "spdk/thread.h" 15 #include "spdk/trace.h" 16 #include "spdk/util.h" 17 #include "spdk/fd_group.h" 18 19 #include "spdk/log.h" 20 #include "spdk_internal/thread.h" 21 #include "spdk_internal/usdt.h" 22 #include "thread_internal.h" 23 24 #include "spdk_internal/trace_defs.h" 25 26 #ifdef __linux__ 27 #include <sys/timerfd.h> 28 #include <sys/eventfd.h> 29 #endif 30 31 #define SPDK_MSG_BATCH_SIZE 8 32 #define SPDK_MAX_DEVICE_NAME_LEN 256 33 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 34 #define SPDK_MAX_POLLER_NAME_LEN 256 35 #define SPDK_MAX_THREAD_NAME_LEN 256 36 #define IOBUF_MIN_SMALL_POOL_SIZE 8191 37 #define IOBUF_MIN_LARGE_POOL_SIZE 1023 38 #define IOBUF_ALIGNMENT 512 39 #define IOBUF_MIN_SMALL_BUFSIZE (SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + \ 40 IOBUF_ALIGNMENT) 41 #define IOBUF_MIN_LARGE_BUFSIZE (SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + \ 42 IOBUF_ALIGNMENT) 43 44 static struct spdk_thread *g_app_thread; 45 46 enum spdk_poller_state { 47 /* The poller is registered with a thread but not currently executing its fn. */ 48 SPDK_POLLER_STATE_WAITING, 49 50 /* The poller is currently running its fn. */ 51 SPDK_POLLER_STATE_RUNNING, 52 53 /* The poller was unregistered during the execution of its fn. */ 54 SPDK_POLLER_STATE_UNREGISTERED, 55 56 /* The poller is in the process of being paused. It will be paused 57 * during the next time it's supposed to be executed. 58 */ 59 SPDK_POLLER_STATE_PAUSING, 60 61 /* The poller is registered but currently paused. It's on the 62 * paused_pollers list. 63 */ 64 SPDK_POLLER_STATE_PAUSED, 65 }; 66 67 struct spdk_poller { 68 TAILQ_ENTRY(spdk_poller) tailq; 69 RB_ENTRY(spdk_poller) node; 70 71 /* Current state of the poller; should only be accessed from the poller's thread. */ 72 enum spdk_poller_state state; 73 74 uint64_t period_ticks; 75 uint64_t next_run_tick; 76 uint64_t run_count; 77 uint64_t busy_count; 78 uint64_t id; 79 spdk_poller_fn fn; 80 void *arg; 81 struct spdk_thread *thread; 82 /* Native interruptfd for period or busy poller */ 83 int interruptfd; 84 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 85 void *set_intr_cb_arg; 86 87 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 88 }; 89 90 enum spdk_thread_state { 91 /* The thread is processing pollers and messages via spdk_thread_poll(). */ 92 SPDK_THREAD_STATE_RUNNING, 93 94 /* The thread is in the process of termination. It reaps unregistering 95 * pollers and releases I/O channels. 96 */ 97 SPDK_THREAD_STATE_EXITING, 98 99 /* The thread has exited. It is ready to call spdk_thread_destroy(). */ 100 SPDK_THREAD_STATE_EXITED, 101 }; 102 103 struct spdk_thread { 104 uint64_t tsc_last; 105 struct spdk_thread_stats stats; 106 /* 107 * Contains pollers actively running on this thread. Pollers 108 * are run round-robin. The thread takes one poller from the head 109 * of the ring, executes it, then puts it back at the tail of 110 * the ring. 111 */ 112 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 113 /** 114 * Contains pollers running on this thread with a periodic timer.
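 * The tree is keyed by next_run_tick, and first_timed_poller below caches
 * the earliest entry so the hot poll path can check for expired timers
 * without walking the tree with RB_MIN().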
115 */ 116 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 117 struct spdk_poller *first_timed_poller; 118 /* 119 * Contains paused pollers. Pollers on this queue are waiting until 120 * they are resumed (in which case they're put onto the active/timer 121 * queues) or unregistered. 122 */ 123 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 124 struct spdk_ring *messages; 125 int msg_fd; 126 SLIST_HEAD(, spdk_msg) msg_cache; 127 size_t msg_cache_count; 128 spdk_msg_fn critical_msg; 129 uint64_t id; 130 uint64_t next_poller_id; 131 enum spdk_thread_state state; 132 int pending_unregister_count; 133 134 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 135 TAILQ_ENTRY(spdk_thread) tailq; 136 137 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 138 struct spdk_cpuset cpumask; 139 uint64_t exit_timeout_tsc; 140 141 int32_t lock_count; 142 143 /* Indicates whether this spdk_thread currently runs in interrupt. */ 144 bool in_interrupt; 145 bool poller_unregistered; 146 struct spdk_fd_group *fgrp; 147 148 /* User context allocated at the end */ 149 uint8_t ctx[0]; 150 }; 151 152 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 153 154 static spdk_new_thread_fn g_new_thread_fn = NULL; 155 static spdk_thread_op_fn g_thread_op_fn = NULL; 156 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 157 static size_t g_ctx_sz = 0; 158 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 159 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 160 * SPDK application is required. 161 */ 162 static uint64_t g_thread_id = 1; 163 164 enum spin_error { 165 SPIN_ERR_NONE, 166 /* Trying to use an SPDK lock while not on an SPDK thread */ 167 SPIN_ERR_NOT_SPDK_THREAD, 168 /* Trying to lock a lock already held by this SPDK thread */ 169 SPIN_ERR_DEADLOCK, 170 /* Trying to unlock a lock not held by this SPDK thread */ 171 SPIN_ERR_WRONG_THREAD, 172 /* pthread_spin_*() returned an error */ 173 SPIN_ERR_PTHREAD, 174 /* Trying to destroy a lock that is held */ 175 SPIN_ERR_LOCK_HELD, 176 /* lock_count is invalid */ 177 SPIN_ERR_LOCK_COUNT, 178 /* 179 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to 180 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to 181 * deadlock when another SPDK thread on the same pthread tries to take that lock. 182 */ 183 SPIN_ERR_HOLD_DURING_SWITCH, 184 /* Must be last, not an actual error code */ 185 SPIN_ERR_LAST 186 }; 187 188 static const char *spin_error_strings[] = { 189 [SPIN_ERR_NONE] = "No error", 190 [SPIN_ERR_NOT_SPDK_THREAD] = "Not an SPDK thread", 191 [SPIN_ERR_DEADLOCK] = "Deadlock detected", 192 [SPIN_ERR_WRONG_THREAD] = "Unlock on wrong SPDK thread", 193 [SPIN_ERR_PTHREAD] = "Error from pthread_spinlock", 194 [SPIN_ERR_LOCK_HELD] = "Destroying a held spinlock", 195 [SPIN_ERR_LOCK_COUNT] = "Lock count is invalid", 196 [SPIN_ERR_HOLD_DURING_SWITCH] = "Lock(s) held while SPDK thread going off CPU", 197 }; 198 199 #define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \ 200 ? 
"Unknown error" : spin_error_strings[err] 201 202 static void 203 __posix_abort(enum spin_error err) 204 { 205 abort(); 206 } 207 208 typedef void (*spin_abort)(enum spin_error err); 209 spin_abort g_spin_abort_fn = __posix_abort; 210 211 #define SPIN_ASSERT_IMPL(cond, err, ret) \ 212 do { \ 213 if (spdk_unlikely(!(cond))) { \ 214 SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \ 215 SPIN_ERROR_STRING(err), #cond); \ 216 g_spin_abort_fn(err); \ 217 ret; \ 218 } \ 219 } while (0) 220 #define SPIN_ASSERT_RETURN_VOID(cond, err) SPIN_ASSERT_IMPL(cond, err, return) 221 #define SPIN_ASSERT_RETURN(cond, err, ret) SPIN_ASSERT_IMPL(cond, err, return ret) 222 #define SPIN_ASSERT(cond, err) SPIN_ASSERT_IMPL(cond, err,) 223 224 struct io_device { 225 void *io_device; 226 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 227 spdk_io_channel_create_cb create_cb; 228 spdk_io_channel_destroy_cb destroy_cb; 229 spdk_io_device_unregister_cb unregister_cb; 230 struct spdk_thread *unregister_thread; 231 uint32_t ctx_size; 232 uint32_t for_each_count; 233 RB_ENTRY(io_device) node; 234 235 uint32_t refcnt; 236 237 bool pending_unregister; 238 bool unregistered; 239 }; 240 241 struct iobuf_channel { 242 spdk_iobuf_entry_stailq_t small_queue; 243 spdk_iobuf_entry_stailq_t large_queue; 244 }; 245 246 struct iobuf_module { 247 char *name; 248 TAILQ_ENTRY(iobuf_module) tailq; 249 }; 250 251 struct iobuf { 252 struct spdk_mempool *small_pool; 253 struct spdk_mempool *large_pool; 254 struct spdk_iobuf_opts opts; 255 TAILQ_HEAD(, iobuf_module) modules; 256 spdk_iobuf_finish_cb finish_cb; 257 void *finish_arg; 258 }; 259 260 static struct iobuf g_iobuf = { 261 .modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules), 262 .opts = { 263 .small_pool_count = IOBUF_MIN_SMALL_POOL_SIZE, 264 .large_pool_count = IOBUF_MIN_LARGE_POOL_SIZE, 265 .small_bufsize = IOBUF_MIN_SMALL_BUFSIZE, 266 .large_bufsize = IOBUF_MIN_LARGE_BUFSIZE, 267 }, 268 }; 269 270 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 271 272 static int 273 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 274 { 275 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 276 } 277 278 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 279 280 static int 281 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 282 { 283 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 284 } 285 286 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 287 288 struct spdk_msg { 289 spdk_msg_fn fn; 290 void *arg; 291 292 SLIST_ENTRY(spdk_msg) link; 293 }; 294 295 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 296 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 297 298 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 299 static uint32_t g_thread_count = 0; 300 301 static __thread struct spdk_thread *tls_thread = NULL; 302 303 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 304 { 305 spdk_trace_register_description("THREAD_IOCH_GET", 306 TRACE_THREAD_IOCH_GET, 307 OWNER_NONE, OBJECT_NONE, 0, 308 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 309 spdk_trace_register_description("THREAD_IOCH_PUT", 310 TRACE_THREAD_IOCH_PUT, 311 OWNER_NONE, OBJECT_NONE, 0, 312 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 313 } 314 315 /* 316 * If this compare function returns zero when two next_run_ticks are equal, 317 * the macro RB_INSERT() returns a pointer to the element with the same 318 * next_run_tick. 
319 * 320 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 321 * to remove as a parameter. 322 * 323 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 324 * side by returning 1 when two next_run_ticks are equal. 325 */ 326 static inline int 327 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 328 { 329 if (poller1->next_run_tick < poller2->next_run_tick) { 330 return -1; 331 } else { 332 return 1; 333 } 334 } 335 336 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 337 338 static inline struct spdk_thread * 339 _get_thread(void) 340 { 341 return tls_thread; 342 } 343 344 static int 345 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 346 { 347 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 348 349 g_ctx_sz = ctx_sz; 350 351 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 352 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 353 sizeof(struct spdk_msg), 354 0, /* No cache. We do our own. */ 355 SPDK_ENV_SOCKET_ID_ANY); 356 357 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 358 msg_mempool_sz); 359 360 if (!g_spdk_msg_mempool) { 361 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 362 return -ENOMEM; 363 } 364 365 return 0; 366 } 367 368 static void thread_interrupt_destroy(struct spdk_thread *thread); 369 static int thread_interrupt_create(struct spdk_thread *thread); 370 371 static void 372 _free_thread(struct spdk_thread *thread) 373 { 374 struct spdk_io_channel *ch; 375 struct spdk_msg *msg; 376 struct spdk_poller *poller, *ptmp; 377 378 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 379 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 380 thread->name, ch->dev->name); 381 } 382 383 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 384 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 385 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 386 poller->name); 387 } 388 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 389 free(poller); 390 } 391 392 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 393 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 394 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 395 poller->name); 396 } 397 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 398 free(poller); 399 } 400 401 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 402 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 403 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 404 free(poller); 405 } 406 407 pthread_mutex_lock(&g_devlist_mutex); 408 assert(g_thread_count > 0); 409 g_thread_count--; 410 TAILQ_REMOVE(&g_threads, thread, tailq); 411 pthread_mutex_unlock(&g_devlist_mutex); 412 413 msg = SLIST_FIRST(&thread->msg_cache); 414 while (msg != NULL) { 415 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 416 417 assert(thread->msg_cache_count > 0); 418 thread->msg_cache_count--; 419 spdk_mempool_put(g_spdk_msg_mempool, msg); 420 421 msg = SLIST_FIRST(&thread->msg_cache); 422 } 423 424 assert(thread->msg_cache_count == 0); 425 426 if (spdk_interrupt_mode_is_enabled()) { 427 thread_interrupt_destroy(thread); 428 } 429 430 spdk_ring_free(thread->messages); 431 free(thread); 432 } 433 434 int 435 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 436 { 437 assert(g_new_thread_fn == NULL); 438 assert(g_thread_op_fn 
== NULL); 439 440 if (new_thread_fn == NULL) { 441 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 442 } else { 443 g_new_thread_fn = new_thread_fn; 444 } 445 446 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 447 } 448 449 int 450 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 451 spdk_thread_op_supported_fn thread_op_supported_fn, 452 size_t ctx_sz, size_t msg_mempool_sz) 453 { 454 assert(g_new_thread_fn == NULL); 455 assert(g_thread_op_fn == NULL); 456 assert(g_thread_op_supported_fn == NULL); 457 458 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 459 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 460 return -EINVAL; 461 } 462 463 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 464 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 465 } else { 466 g_thread_op_fn = thread_op_fn; 467 g_thread_op_supported_fn = thread_op_supported_fn; 468 } 469 470 return _thread_lib_init(ctx_sz, msg_mempool_sz); 471 } 472 473 void 474 spdk_thread_lib_fini(void) 475 { 476 struct io_device *dev; 477 478 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 479 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 480 } 481 482 g_new_thread_fn = NULL; 483 g_thread_op_fn = NULL; 484 g_thread_op_supported_fn = NULL; 485 g_ctx_sz = 0; 486 if (g_app_thread != NULL) { 487 _free_thread(g_app_thread); 488 g_app_thread = NULL; 489 } 490 491 if (g_spdk_msg_mempool) { 492 spdk_mempool_free(g_spdk_msg_mempool); 493 g_spdk_msg_mempool = NULL; 494 } 495 } 496 497 struct spdk_thread * 498 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask) 499 { 500 struct spdk_thread *thread, *null_thread; 501 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 502 int rc = 0, i; 503 504 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 505 if (!thread) { 506 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 507 return NULL; 508 } 509 510 if (cpumask) { 511 spdk_cpuset_copy(&thread->cpumask, cpumask); 512 } else { 513 spdk_cpuset_negate(&thread->cpumask); 514 } 515 516 RB_INIT(&thread->io_channels); 517 TAILQ_INIT(&thread->active_pollers); 518 RB_INIT(&thread->timed_pollers); 519 TAILQ_INIT(&thread->paused_pollers); 520 SLIST_INIT(&thread->msg_cache); 521 thread->msg_cache_count = 0; 522 523 thread->tsc_last = spdk_get_ticks(); 524 525 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 526 * ID exceeds UINT64_MAX a warning message is logged 527 */ 528 thread->next_poller_id = 1; 529 530 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 531 if (!thread->messages) { 532 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 533 free(thread); 534 return NULL; 535 } 536 537 /* Fill the local message pool cache. */ 538 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 539 if (rc == 0) { 540 /* If we can't populate the cache it's ok. The cache will get filled 541 * up organically as messages are passed to the thread. */ 542 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 543 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 544 thread->msg_cache_count++; 545 } 546 } 547 548 if (name) { 549 snprintf(thread->name, sizeof(thread->name), "%s", name); 550 } else { 551 snprintf(thread->name, sizeof(thread->name), "%p", thread); 552 } 553 554 pthread_mutex_lock(&g_devlist_mutex); 555 if (g_thread_id == 0) { 556 SPDK_ERRLOG("Thread ID rolled over. 
Further thread creation is not allowed.\n"); 557 pthread_mutex_unlock(&g_devlist_mutex); 558 _free_thread(thread); 559 return NULL; 560 } 561 thread->id = g_thread_id++; 562 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 563 g_thread_count++; 564 pthread_mutex_unlock(&g_devlist_mutex); 565 566 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 567 thread->id, thread->name); 568 569 if (spdk_interrupt_mode_is_enabled()) { 570 thread->in_interrupt = true; 571 rc = thread_interrupt_create(thread); 572 if (rc != 0) { 573 _free_thread(thread); 574 return NULL; 575 } 576 } 577 578 if (g_new_thread_fn) { 579 rc = g_new_thread_fn(thread); 580 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 581 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 582 } 583 584 if (rc != 0) { 585 _free_thread(thread); 586 return NULL; 587 } 588 589 thread->state = SPDK_THREAD_STATE_RUNNING; 590 591 /* If this is the first thread, save it as the app thread. Use an atomic 592 * compare + exchange to guard against crazy users who might try to 593 * call spdk_thread_create() simultaneously on multiple threads. 594 */ 595 null_thread = NULL; 596 __atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false, 597 __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); 598 599 return thread; 600 } 601 602 struct spdk_thread * 603 spdk_thread_get_app_thread(void) 604 { 605 return g_app_thread; 606 } 607 608 void 609 spdk_set_thread(struct spdk_thread *thread) 610 { 611 tls_thread = thread; 612 } 613 614 static void 615 thread_exit(struct spdk_thread *thread, uint64_t now) 616 { 617 struct spdk_poller *poller; 618 struct spdk_io_channel *ch; 619 620 if (now >= thread->exit_timeout_tsc) { 621 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 622 thread->name); 623 goto exited; 624 } 625 626 if (spdk_ring_count(thread->messages) > 0) { 627 SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name); 628 return; 629 } 630 631 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 632 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 633 SPDK_INFOLOG(thread, 634 "thread %s still has active poller %s\n", 635 thread->name, poller->name); 636 return; 637 } 638 } 639 640 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 641 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 642 SPDK_INFOLOG(thread, 643 "thread %s still has active timed poller %s\n", 644 thread->name, poller->name); 645 return; 646 } 647 } 648 649 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 650 SPDK_INFOLOG(thread, 651 "thread %s still has paused poller %s\n", 652 thread->name, poller->name); 653 return; 654 } 655 656 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 657 SPDK_INFOLOG(thread, 658 "thread %s still has channel for io_device %s\n", 659 thread->name, ch->dev->name); 660 return; 661 } 662 663 if (thread->pending_unregister_count > 0) { 664 SPDK_INFOLOG(thread, 665 "thread %s is still unregistering io_devices\n", 666 thread->name); 667 return; 668 } 669 670 exited: 671 thread->state = SPDK_THREAD_STATE_EXITED; 672 if (spdk_unlikely(thread->in_interrupt)) { 673 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 674 } 675 } 676 677 int 678 spdk_thread_exit(struct spdk_thread *thread) 679 { 680 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 681 682 assert(tls_thread == thread); 683 684 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 685 SPDK_INFOLOG(thread, 686 "thread %s is already exiting\n", 687 thread->name); 688 return 0; 
689 } 690 691 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 692 SPDK_THREAD_EXIT_TIMEOUT_SEC); 693 thread->state = SPDK_THREAD_STATE_EXITING; 694 return 0; 695 } 696 697 bool 698 spdk_thread_is_running(struct spdk_thread *thread) 699 { 700 return thread->state == SPDK_THREAD_STATE_RUNNING; 701 } 702 703 bool 704 spdk_thread_is_exited(struct spdk_thread *thread) 705 { 706 return thread->state == SPDK_THREAD_STATE_EXITED; 707 } 708 709 void 710 spdk_thread_destroy(struct spdk_thread *thread) 711 { 712 assert(thread != NULL); 713 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 714 715 assert(thread->state == SPDK_THREAD_STATE_EXITED); 716 717 if (tls_thread == thread) { 718 tls_thread = NULL; 719 } 720 721 /* To be safe, do not free the app thread until spdk_thread_lib_fini(). */ 722 if (thread != g_app_thread) { 723 _free_thread(thread); 724 } 725 } 726 727 void * 728 spdk_thread_get_ctx(struct spdk_thread *thread) 729 { 730 if (g_ctx_sz > 0) { 731 return thread->ctx; 732 } 733 734 return NULL; 735 } 736 737 struct spdk_cpuset * 738 spdk_thread_get_cpumask(struct spdk_thread *thread) 739 { 740 return &thread->cpumask; 741 } 742 743 int 744 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 745 { 746 struct spdk_thread *thread; 747 748 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 749 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 750 assert(false); 751 return -ENOTSUP; 752 } 753 754 thread = spdk_get_thread(); 755 if (!thread) { 756 SPDK_ERRLOG("Called from non-SPDK thread\n"); 757 assert(false); 758 return -EINVAL; 759 } 760 761 spdk_cpuset_copy(&thread->cpumask, cpumask); 762 763 /* Invoke framework's reschedule operation. If this function is called multiple times 764 * in a single spdk_thread_poll() context, the last cpumask will be used in the 765 * reschedule operation. 766 */ 767 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 768 769 return 0; 770 } 771 772 struct spdk_thread * 773 spdk_thread_get_from_ctx(void *ctx) 774 { 775 if (ctx == NULL) { 776 assert(false); 777 return NULL; 778 } 779 780 assert(g_ctx_sz > 0); 781 782 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 783 } 784 785 static inline uint32_t 786 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 787 { 788 unsigned count, i; 789 void *messages[SPDK_MSG_BATCH_SIZE]; 790 uint64_t notify = 1; 791 int rc; 792 793 #ifdef DEBUG 794 /* 795 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 796 * so we will never actually read uninitialized data from events, but just to be sure 797 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 
798 */ 799 memset(messages, 0, sizeof(messages)); 800 #endif 801 802 if (max_msgs > 0) { 803 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 804 } else { 805 max_msgs = SPDK_MSG_BATCH_SIZE; 806 } 807 808 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 809 if (spdk_unlikely(thread->in_interrupt) && 810 spdk_ring_count(thread->messages) != 0) { 811 rc = write(thread->msg_fd, &notify, sizeof(notify)); 812 if (rc < 0) { 813 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 814 } 815 } 816 if (count == 0) { 817 return 0; 818 } 819 820 for (i = 0; i < count; i++) { 821 struct spdk_msg *msg = messages[i]; 822 823 assert(msg != NULL); 824 825 SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg); 826 827 msg->fn(msg->arg); 828 829 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 830 831 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 832 /* Insert the messages at the head. We want to re-use the hot 833 * ones. */ 834 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 835 thread->msg_cache_count++; 836 } else { 837 spdk_mempool_put(g_spdk_msg_mempool, msg); 838 } 839 } 840 841 return count; 842 } 843 844 static void 845 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 846 { 847 struct spdk_poller *tmp __attribute__((unused)); 848 849 poller->next_run_tick = now + poller->period_ticks; 850 851 /* 852 * Insert poller in the thread's timed_pollers tree by next scheduled run time 853 * as its key. 854 */ 855 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 856 assert(tmp == NULL); 857 858 /* Update the cache only if it is empty or the inserted poller is earlier than it. 859 * RB_MIN() is not necessary here because all pollers which have exactly the same 860 * next_run_tick as the existing poller are inserted on the right side. 861 */ 862 if (thread->first_timed_poller == NULL || 863 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 864 thread->first_timed_poller = poller; 865 } 866 } 867 868 static inline void 869 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 870 { 871 struct spdk_poller *tmp __attribute__((unused)); 872 873 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 874 assert(tmp != NULL); 875 876 /* This function is not used in any case that is performance critical. 877 * Update the cache simply by RB_MIN() if it needs to be changed. 878 */ 879 if (thread->first_timed_poller == poller) { 880 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 881 } 882 } 883 884 static void 885 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 886 { 887 if (poller->period_ticks) { 888 poller_insert_timer(thread, poller, spdk_get_ticks()); 889 } else { 890 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 891 } 892 } 893 894 static inline void 895 thread_update_stats(struct spdk_thread *thread, uint64_t end, 896 uint64_t start, int rc) 897 { 898 if (rc == 0) { 899 /* Poller status idle */ 900 thread->stats.idle_tsc += end - start; 901 } else if (rc > 0) { 902 /* Poller status busy */ 903 thread->stats.busy_tsc += end - start; 904 } 905 /* Store end time to use it as start time of the next spdk_thread_poll().
*/ 906 thread->tsc_last = end; 907 } 908 909 static inline int 910 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 911 { 912 int rc; 913 914 switch (poller->state) { 915 case SPDK_POLLER_STATE_UNREGISTERED: 916 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 917 free(poller); 918 return 0; 919 case SPDK_POLLER_STATE_PAUSING: 920 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 921 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 922 poller->state = SPDK_POLLER_STATE_PAUSED; 923 return 0; 924 case SPDK_POLLER_STATE_WAITING: 925 break; 926 default: 927 assert(false); 928 break; 929 } 930 931 poller->state = SPDK_POLLER_STATE_RUNNING; 932 rc = poller->fn(poller->arg); 933 934 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 935 936 poller->run_count++; 937 if (rc > 0) { 938 poller->busy_count++; 939 } 940 941 #ifdef DEBUG 942 if (rc == -1) { 943 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 944 } 945 #endif 946 947 switch (poller->state) { 948 case SPDK_POLLER_STATE_UNREGISTERED: 949 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 950 free(poller); 951 break; 952 case SPDK_POLLER_STATE_PAUSING: 953 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 954 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 955 poller->state = SPDK_POLLER_STATE_PAUSED; 956 break; 957 case SPDK_POLLER_STATE_PAUSED: 958 case SPDK_POLLER_STATE_WAITING: 959 break; 960 case SPDK_POLLER_STATE_RUNNING: 961 poller->state = SPDK_POLLER_STATE_WAITING; 962 break; 963 default: 964 assert(false); 965 break; 966 } 967 968 return rc; 969 } 970 971 static inline int 972 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 973 uint64_t now) 974 { 975 int rc; 976 977 switch (poller->state) { 978 case SPDK_POLLER_STATE_UNREGISTERED: 979 free(poller); 980 return 0; 981 case SPDK_POLLER_STATE_PAUSING: 982 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 983 poller->state = SPDK_POLLER_STATE_PAUSED; 984 return 0; 985 case SPDK_POLLER_STATE_WAITING: 986 break; 987 default: 988 assert(false); 989 break; 990 } 991 992 poller->state = SPDK_POLLER_STATE_RUNNING; 993 rc = poller->fn(poller->arg); 994 995 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 996 997 poller->run_count++; 998 if (rc > 0) { 999 poller->busy_count++; 1000 } 1001 1002 #ifdef DEBUG 1003 if (rc == -1) { 1004 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 1005 } 1006 #endif 1007 1008 switch (poller->state) { 1009 case SPDK_POLLER_STATE_UNREGISTERED: 1010 free(poller); 1011 break; 1012 case SPDK_POLLER_STATE_PAUSING: 1013 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1014 poller->state = SPDK_POLLER_STATE_PAUSED; 1015 break; 1016 case SPDK_POLLER_STATE_PAUSED: 1017 break; 1018 case SPDK_POLLER_STATE_RUNNING: 1019 poller->state = SPDK_POLLER_STATE_WAITING; 1020 /* fallthrough */ 1021 case SPDK_POLLER_STATE_WAITING: 1022 poller_insert_timer(thread, poller, now); 1023 break; 1024 default: 1025 assert(false); 1026 break; 1027 } 1028 1029 return rc; 1030 } 1031 1032 static int 1033 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1034 { 1035 uint32_t msg_count; 1036 struct spdk_poller *poller, *tmp; 1037 spdk_msg_fn critical_msg; 1038 int rc = 0; 1039 1040 thread->tsc_last = now; 1041 1042 critical_msg = thread->critical_msg; 1043 if (spdk_unlikely(critical_msg != NULL)) { 1044 critical_msg(NULL); 1045 thread->critical_msg = NULL; 1046 rc = 1; 1047 } 1048 
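	/* After any critical message, drain a batch of queued messages, run each
	 * active (non-timed) poller once, and finally run the timed pollers whose
	 * next_run_tick has expired. rc stays positive if any of these stages did
	 * work, which feeds the busy/idle accounting in thread_update_stats().
	 */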
1049 msg_count = msg_queue_run_batch(thread, max_msgs); 1050 if (msg_count) { 1051 rc = 1; 1052 } 1053 1054 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1055 active_pollers_head, tailq, tmp) { 1056 int poller_rc; 1057 1058 poller_rc = thread_execute_poller(thread, poller); 1059 if (poller_rc > rc) { 1060 rc = poller_rc; 1061 } 1062 } 1063 1064 poller = thread->first_timed_poller; 1065 while (poller != NULL) { 1066 int timer_rc = 0; 1067 1068 if (now < poller->next_run_tick) { 1069 break; 1070 } 1071 1072 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 1073 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 1074 1075 /* Update the cache to the next timed poller in the list 1076 * only if the current poller is still the closest, otherwise, 1077 * do nothing because the cache has been already updated. 1078 */ 1079 if (thread->first_timed_poller == poller) { 1080 thread->first_timed_poller = tmp; 1081 } 1082 1083 timer_rc = thread_execute_timed_poller(thread, poller, now); 1084 if (timer_rc > rc) { 1085 rc = timer_rc; 1086 } 1087 1088 poller = tmp; 1089 } 1090 1091 return rc; 1092 } 1093 1094 int 1095 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1096 { 1097 struct spdk_thread *orig_thread; 1098 int rc; 1099 uint64_t notify = 1; 1100 1101 orig_thread = _get_thread(); 1102 tls_thread = thread; 1103 1104 if (now == 0) { 1105 now = spdk_get_ticks(); 1106 } 1107 1108 if (spdk_likely(!thread->in_interrupt)) { 1109 rc = thread_poll(thread, max_msgs, now); 1110 if (spdk_unlikely(thread->in_interrupt)) { 1111 /* The thread transitioned to interrupt mode during the above poll. 1112 * Poll it one more time in case that during the transition time 1113 * there is msg received without notification. 1114 */ 1115 rc = thread_poll(thread, max_msgs, now); 1116 } 1117 } else { 1118 /* Non-block wait on thread's fd_group */ 1119 rc = spdk_fd_group_wait(thread->fgrp, 0); 1120 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 1121 if (spdk_unlikely(!thread->in_interrupt)) { 1122 /* The thread transitioned to poll mode in a msg during the above processing. 1123 * Clear msg_fd since thread messages will be polled directly in poll mode. 
1124 */ 1125 rc = read(thread->msg_fd, &notify, sizeof(notify)); 1126 if (rc < 0 && errno != EAGAIN) { 1127 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 1128 } 1129 } 1130 1131 /* Reap unregistered pollers out of poller execution in intr mode */ 1132 if (spdk_unlikely(thread->poller_unregistered)) { 1133 struct spdk_poller *poller, *tmp; 1134 1135 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1136 active_pollers_head, tailq, tmp) { 1137 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1138 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1139 free(poller); 1140 } 1141 } 1142 1143 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1144 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1145 poller_remove_timer(thread, poller); 1146 free(poller); 1147 } 1148 } 1149 1150 thread->poller_unregistered = false; 1151 } 1152 } 1153 1154 1155 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1156 thread_exit(thread, now); 1157 } 1158 1159 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1160 1161 tls_thread = orig_thread; 1162 1163 return rc; 1164 } 1165 1166 uint64_t 1167 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1168 { 1169 struct spdk_poller *poller; 1170 1171 poller = thread->first_timed_poller; 1172 if (poller) { 1173 return poller->next_run_tick; 1174 } 1175 1176 return 0; 1177 } 1178 1179 int 1180 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1181 { 1182 return !TAILQ_EMPTY(&thread->active_pollers); 1183 } 1184 1185 static bool 1186 thread_has_unpaused_pollers(struct spdk_thread *thread) 1187 { 1188 if (TAILQ_EMPTY(&thread->active_pollers) && 1189 RB_EMPTY(&thread->timed_pollers)) { 1190 return false; 1191 } 1192 1193 return true; 1194 } 1195 1196 bool 1197 spdk_thread_has_pollers(struct spdk_thread *thread) 1198 { 1199 if (!thread_has_unpaused_pollers(thread) && 1200 TAILQ_EMPTY(&thread->paused_pollers)) { 1201 return false; 1202 } 1203 1204 return true; 1205 } 1206 1207 bool 1208 spdk_thread_is_idle(struct spdk_thread *thread) 1209 { 1210 if (spdk_ring_count(thread->messages) || 1211 thread_has_unpaused_pollers(thread) || 1212 thread->critical_msg != NULL) { 1213 return false; 1214 } 1215 1216 return true; 1217 } 1218 1219 uint32_t 1220 spdk_thread_get_count(void) 1221 { 1222 /* 1223 * Return cached value of the current thread count. We could acquire the 1224 * lock and iterate through the TAILQ of threads to count them, but that 1225 * count could still be invalidated after we release the lock.
1226 */ 1227 return g_thread_count; 1228 } 1229 1230 struct spdk_thread * 1231 spdk_get_thread(void) 1232 { 1233 return _get_thread(); 1234 } 1235 1236 const char * 1237 spdk_thread_get_name(const struct spdk_thread *thread) 1238 { 1239 return thread->name; 1240 } 1241 1242 uint64_t 1243 spdk_thread_get_id(const struct spdk_thread *thread) 1244 { 1245 return thread->id; 1246 } 1247 1248 struct spdk_thread * 1249 spdk_thread_get_by_id(uint64_t id) 1250 { 1251 struct spdk_thread *thread; 1252 1253 if (id == 0 || id >= g_thread_id) { 1254 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1255 return NULL; 1256 } 1257 pthread_mutex_lock(&g_devlist_mutex); 1258 TAILQ_FOREACH(thread, &g_threads, tailq) { 1259 if (thread->id == id) { 1260 break; 1261 } 1262 } 1263 pthread_mutex_unlock(&g_devlist_mutex); 1264 return thread; 1265 } 1266 1267 int 1268 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1269 { 1270 struct spdk_thread *thread; 1271 1272 thread = _get_thread(); 1273 if (!thread) { 1274 SPDK_ERRLOG("No thread allocated\n"); 1275 return -EINVAL; 1276 } 1277 1278 if (stats == NULL) { 1279 return -EINVAL; 1280 } 1281 1282 *stats = thread->stats; 1283 1284 return 0; 1285 } 1286 1287 uint64_t 1288 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1289 { 1290 if (thread == NULL) { 1291 thread = _get_thread(); 1292 } 1293 1294 return thread->tsc_last; 1295 } 1296 1297 static inline int 1298 thread_send_msg_notification(const struct spdk_thread *target_thread) 1299 { 1300 uint64_t notify = 1; 1301 int rc; 1302 1303 /* Not necessary to do notification if interrupt facility is not enabled */ 1304 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1305 return 0; 1306 } 1307 1308 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1309 * after sending thread msg, it is necessary to check whether target thread runs in 1310 * interrupt mode and then decide whether do event notification. 
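 * msg_fd is the eventfd set up by thread_interrupt_create() and registered
 * with the thread's fd_group, so the write below wakes up the
 * spdk_fd_group_wait() call in spdk_thread_poll().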
1311 */ 1312 if (spdk_unlikely(target_thread->in_interrupt)) { 1313 rc = write(target_thread->msg_fd, &notify, sizeof(notify)); 1314 if (rc < 0) { 1315 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1316 return -EIO; 1317 } 1318 } 1319 1320 return 0; 1321 } 1322 1323 int 1324 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1325 { 1326 struct spdk_thread *local_thread; 1327 struct spdk_msg *msg; 1328 int rc; 1329 1330 assert(thread != NULL); 1331 1332 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1333 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1334 return -EIO; 1335 } 1336 1337 local_thread = _get_thread(); 1338 1339 msg = NULL; 1340 if (local_thread != NULL) { 1341 if (local_thread->msg_cache_count > 0) { 1342 msg = SLIST_FIRST(&local_thread->msg_cache); 1343 assert(msg != NULL); 1344 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1345 local_thread->msg_cache_count--; 1346 } 1347 } 1348 1349 if (msg == NULL) { 1350 msg = spdk_mempool_get(g_spdk_msg_mempool); 1351 if (!msg) { 1352 SPDK_ERRLOG("msg could not be allocated\n"); 1353 return -ENOMEM; 1354 } 1355 } 1356 1357 msg->fn = fn; 1358 msg->arg = ctx; 1359 1360 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1361 if (rc != 1) { 1362 SPDK_ERRLOG("msg could not be enqueued\n"); 1363 spdk_mempool_put(g_spdk_msg_mempool, msg); 1364 return -EIO; 1365 } 1366 1367 return thread_send_msg_notification(thread); 1368 } 1369 1370 int 1371 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1372 { 1373 spdk_msg_fn expected = NULL; 1374 1375 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1376 __ATOMIC_SEQ_CST)) { 1377 return -EIO; 1378 } 1379 1380 return thread_send_msg_notification(thread); 1381 } 1382 1383 #ifdef __linux__ 1384 static int 1385 interrupt_timerfd_process(void *arg) 1386 { 1387 struct spdk_poller *poller = arg; 1388 uint64_t exp; 1389 int rc; 1390 1391 /* clear the level of interval timer */ 1392 rc = read(poller->interruptfd, &exp, sizeof(exp)); 1393 if (rc < 0) { 1394 if (errno == EAGAIN) { 1395 return 0; 1396 } 1397 1398 return rc; 1399 } 1400 1401 SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg); 1402 1403 return poller->fn(poller->arg); 1404 } 1405 1406 static int 1407 period_poller_interrupt_init(struct spdk_poller *poller) 1408 { 1409 struct spdk_fd_group *fgrp = poller->thread->fgrp; 1410 int timerfd; 1411 int rc; 1412 1413 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1414 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1415 if (timerfd < 0) { 1416 return -errno; 1417 } 1418 1419 rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller); 1420 if (rc < 0) { 1421 close(timerfd); 1422 return rc; 1423 } 1424 1425 poller->interruptfd = timerfd; 1426 return 0; 1427 } 1428 1429 static void 1430 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1431 { 1432 int timerfd = poller->interruptfd; 1433 uint64_t now_tick = spdk_get_ticks(); 1434 uint64_t ticks = spdk_get_ticks_hz(); 1435 int ret; 1436 struct itimerspec new_tv = {}; 1437 struct itimerspec old_tv = {}; 1438 1439 assert(poller->period_ticks != 0); 1440 assert(timerfd >= 0); 1441 1442 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1443 interrupt_mode ?
"interrupt" : "poll"); 1444 1445 if (interrupt_mode) { 1446 /* Set repeated timer expiration */ 1447 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1448 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1449 1450 /* Update next timer expiration */ 1451 if (poller->next_run_tick == 0) { 1452 poller->next_run_tick = now_tick + poller->period_ticks; 1453 } else if (poller->next_run_tick < now_tick) { 1454 poller->next_run_tick = now_tick; 1455 } 1456 1457 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1458 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1459 1460 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1461 if (ret < 0) { 1462 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1463 assert(false); 1464 } 1465 } else { 1466 /* Disarm the timer */ 1467 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1468 if (ret < 0) { 1469 /* timerfd_settime's failure indicates that the timerfd is in error */ 1470 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1471 assert(false); 1472 } 1473 1474 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1475 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1476 */ 1477 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1478 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1479 poller_remove_timer(poller->thread, poller); 1480 poller_insert_timer(poller->thread, poller, now_tick); 1481 } 1482 } 1483 1484 static void 1485 poller_interrupt_fini(struct spdk_poller *poller) 1486 { 1487 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1488 assert(poller->interruptfd >= 0); 1489 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1490 close(poller->interruptfd); 1491 poller->interruptfd = -1; 1492 } 1493 1494 static int 1495 busy_poller_interrupt_init(struct spdk_poller *poller) 1496 { 1497 int busy_efd; 1498 int rc; 1499 1500 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1501 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1502 if (busy_efd < 0) { 1503 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1504 return -errno; 1505 } 1506 1507 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, 1508 poller->fn, poller->arg, poller->name); 1509 if (rc < 0) { 1510 close(busy_efd); 1511 return rc; 1512 } 1513 1514 poller->interruptfd = busy_efd; 1515 return 0; 1516 } 1517 1518 static void 1519 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1520 { 1521 int busy_efd = poller->interruptfd; 1522 uint64_t notify = 1; 1523 int rc __attribute__((unused)); 1524 1525 assert(busy_efd >= 0); 1526 1527 if (interrupt_mode) { 1528 /* Write without read on eventfd will get it repeatedly triggered. */ 1529 if (write(busy_efd, &notify, sizeof(notify)) < 0) { 1530 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1531 } 1532 } else { 1533 /* Read on eventfd will clear its level triggering.
*/ 1534 rc = read(busy_efd, &notify, sizeof(notify)); 1535 } 1536 } 1537 1538 #else 1539 1540 static int 1541 period_poller_interrupt_init(struct spdk_poller *poller) 1542 { 1543 return -ENOTSUP; 1544 } 1545 1546 static void 1547 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1548 { 1549 } 1550 1551 static void 1552 poller_interrupt_fini(struct spdk_poller *poller) 1553 { 1554 } 1555 1556 static int 1557 busy_poller_interrupt_init(struct spdk_poller *poller) 1558 { 1559 return -ENOTSUP; 1560 } 1561 1562 static void 1563 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1564 { 1565 } 1566 1567 #endif 1568 1569 void 1570 spdk_poller_register_interrupt(struct spdk_poller *poller, 1571 spdk_poller_set_interrupt_mode_cb cb_fn, 1572 void *cb_arg) 1573 { 1574 assert(poller != NULL); 1575 assert(cb_fn != NULL); 1576 assert(spdk_get_thread() == poller->thread); 1577 1578 if (!spdk_interrupt_mode_is_enabled()) { 1579 return; 1580 } 1581 1582 /* when a poller is created we don't know if the user is ever going to 1583 * enable interrupts on it by calling this function, so the poller 1584 * registration function has to immediately create an interruptfd. 1585 * When this function does get called by the user, we have to then destroy 1586 * that interruptfd. 1587 */ 1588 if (poller->set_intr_cb_fn && poller->interruptfd >= 0) { 1589 poller_interrupt_fini(poller); 1590 } 1591 1592 poller->set_intr_cb_fn = cb_fn; 1593 poller->set_intr_cb_arg = cb_arg; 1594 1595 /* Set poller into interrupt mode if thread is in interrupt. */ 1596 if (poller->thread->in_interrupt) { 1597 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1598 } 1599 } 1600 1601 static uint64_t 1602 convert_us_to_ticks(uint64_t us) 1603 { 1604 uint64_t quotient, remainder, ticks; 1605 1606 if (us) { 1607 quotient = us / SPDK_SEC_TO_USEC; 1608 remainder = us % SPDK_SEC_TO_USEC; 1609 ticks = spdk_get_ticks_hz(); 1610 1611 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1612 } else { 1613 return 0; 1614 } 1615 } 1616 1617 static struct spdk_poller * 1618 poller_register(spdk_poller_fn fn, 1619 void *arg, 1620 uint64_t period_microseconds, 1621 const char *name) 1622 { 1623 struct spdk_thread *thread; 1624 struct spdk_poller *poller; 1625 1626 thread = spdk_get_thread(); 1627 if (!thread) { 1628 assert(false); 1629 return NULL; 1630 } 1631 1632 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1633 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1634 return NULL; 1635 } 1636 1637 poller = calloc(1, sizeof(*poller)); 1638 if (poller == NULL) { 1639 SPDK_ERRLOG("Poller memory allocation failed\n"); 1640 return NULL; 1641 } 1642 1643 if (name) { 1644 snprintf(poller->name, sizeof(poller->name), "%s", name); 1645 } else { 1646 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1647 } 1648 1649 poller->state = SPDK_POLLER_STATE_WAITING; 1650 poller->fn = fn; 1651 poller->arg = arg; 1652 poller->thread = thread; 1653 poller->interruptfd = -1; 1654 if (thread->next_poller_id == 0) { 1655 SPDK_WARNLOG("Poller ID rolled over.
Poller ID is duplicated.\n"); 1656 thread->next_poller_id = 1; 1657 } 1658 poller->id = thread->next_poller_id++; 1659 1660 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1661 1662 if (spdk_interrupt_mode_is_enabled()) { 1663 int rc; 1664 1665 if (period_microseconds) { 1666 rc = period_poller_interrupt_init(poller); 1667 if (rc < 0) { 1668 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1669 free(poller); 1670 return NULL; 1671 } 1672 1673 spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL); 1674 } else { 1675 /* If the poller doesn't have a period, create interruptfd that's always 1676 * busy automatically when running in interrupt mode. 1677 */ 1678 rc = busy_poller_interrupt_init(poller); 1679 if (rc < 0) { 1680 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1681 free(poller); 1682 return NULL; 1683 } 1684 1685 spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL); 1686 } 1687 } 1688 1689 thread_insert_poller(thread, poller); 1690 1691 return poller; 1692 } 1693 1694 struct spdk_poller * 1695 spdk_poller_register(spdk_poller_fn fn, 1696 void *arg, 1697 uint64_t period_microseconds) 1698 { 1699 return poller_register(fn, arg, period_microseconds, NULL); 1700 } 1701 1702 struct spdk_poller * 1703 spdk_poller_register_named(spdk_poller_fn fn, 1704 void *arg, 1705 uint64_t period_microseconds, 1706 const char *name) 1707 { 1708 return poller_register(fn, arg, period_microseconds, name); 1709 } 1710 1711 static void 1712 wrong_thread(const char *func, const char *name, struct spdk_thread *thread, 1713 struct spdk_thread *curthread) 1714 { 1715 if (thread == NULL) { 1716 SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name); 1717 abort(); 1718 } 1719 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be " 1720 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id, 1721 thread->name, thread->id); 1722 assert(false); 1723 } 1724 1725 void 1726 spdk_poller_unregister(struct spdk_poller **ppoller) 1727 { 1728 struct spdk_thread *thread; 1729 struct spdk_poller *poller; 1730 1731 poller = *ppoller; 1732 if (poller == NULL) { 1733 return; 1734 } 1735 1736 *ppoller = NULL; 1737 1738 thread = spdk_get_thread(); 1739 if (!thread) { 1740 assert(false); 1741 return; 1742 } 1743 1744 if (poller->thread != thread) { 1745 wrong_thread(__func__, poller->name, poller->thread, thread); 1746 return; 1747 } 1748 1749 if (spdk_interrupt_mode_is_enabled()) { 1750 /* Release the interrupt resource for period or busy poller */ 1751 if (poller->interruptfd >= 0) { 1752 poller_interrupt_fini(poller); 1753 } 1754 1755 /* Mark that a poller was unregistered. Then unregistered pollers will 1756 * get reaped by spdk_thread_poll() also in intr mode. 1757 */ 1758 thread->poller_unregistered = true; 1759 } 1760 1761 /* If the poller was paused, put it on the active_pollers list so that 1762 * its unregistration can be processed by spdk_thread_poll(). 1763 */ 1764 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1765 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1766 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1767 poller->period_ticks = 0; 1768 } 1769 1770 /* Simply set the state to unregistered. The poller will get cleaned up 1771 * in a subsequent call to spdk_thread_poll().
1772 */ 1773 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1774 } 1775 1776 void 1777 spdk_poller_pause(struct spdk_poller *poller) 1778 { 1779 struct spdk_thread *thread; 1780 1781 thread = spdk_get_thread(); 1782 if (!thread) { 1783 assert(false); 1784 return; 1785 } 1786 1787 if (poller->thread != thread) { 1788 wrong_thread(__func__, poller->name, poller->thread, thread); 1789 return; 1790 } 1791 1792 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1793 * spdk_thread_poll() move it. It allows a poller to be paused from 1794 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1795 * iteration, or from within itself without breaking the logic to always 1796 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1797 */ 1798 switch (poller->state) { 1799 case SPDK_POLLER_STATE_PAUSED: 1800 case SPDK_POLLER_STATE_PAUSING: 1801 break; 1802 case SPDK_POLLER_STATE_RUNNING: 1803 case SPDK_POLLER_STATE_WAITING: 1804 poller->state = SPDK_POLLER_STATE_PAUSING; 1805 break; 1806 default: 1807 assert(false); 1808 break; 1809 } 1810 } 1811 1812 void 1813 spdk_poller_resume(struct spdk_poller *poller) 1814 { 1815 struct spdk_thread *thread; 1816 1817 thread = spdk_get_thread(); 1818 if (!thread) { 1819 assert(false); 1820 return; 1821 } 1822 1823 if (poller->thread != thread) { 1824 wrong_thread(__func__, poller->name, poller->thread, thread); 1825 return; 1826 } 1827 1828 /* If a poller is paused it has to be removed from the paused pollers 1829 * list and put on the active list or timer tree depending on its 1830 * period_ticks. If a poller is still in the process of being paused, 1831 * we just need to flip its state back to waiting, as it's already on 1832 * the appropriate list or tree. 1833 */ 1834 switch (poller->state) { 1835 case SPDK_POLLER_STATE_PAUSED: 1836 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1837 thread_insert_poller(thread, poller); 1838 /* fallthrough */ 1839 case SPDK_POLLER_STATE_PAUSING: 1840 poller->state = SPDK_POLLER_STATE_WAITING; 1841 break; 1842 case SPDK_POLLER_STATE_RUNNING: 1843 case SPDK_POLLER_STATE_WAITING: 1844 break; 1845 default: 1846 assert(false); 1847 break; 1848 } 1849 } 1850 1851 const char * 1852 spdk_poller_get_name(struct spdk_poller *poller) 1853 { 1854 return poller->name; 1855 } 1856 1857 uint64_t 1858 spdk_poller_get_id(struct spdk_poller *poller) 1859 { 1860 return poller->id; 1861 } 1862 1863 const char * 1864 spdk_poller_get_state_str(struct spdk_poller *poller) 1865 { 1866 switch (poller->state) { 1867 case SPDK_POLLER_STATE_WAITING: 1868 return "waiting"; 1869 case SPDK_POLLER_STATE_RUNNING: 1870 return "running"; 1871 case SPDK_POLLER_STATE_UNREGISTERED: 1872 return "unregistered"; 1873 case SPDK_POLLER_STATE_PAUSING: 1874 return "pausing"; 1875 case SPDK_POLLER_STATE_PAUSED: 1876 return "paused"; 1877 default: 1878 return NULL; 1879 } 1880 } 1881 1882 uint64_t 1883 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1884 { 1885 return poller->period_ticks; 1886 } 1887 1888 void 1889 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1890 { 1891 stats->run_count = poller->run_count; 1892 stats->busy_count = poller->busy_count; 1893 } 1894 1895 struct spdk_poller * 1896 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1897 { 1898 return TAILQ_FIRST(&thread->active_pollers); 1899 } 1900 1901 struct spdk_poller * 1902 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1903 { 1904 return TAILQ_NEXT(prev, tailq); 1905 } 1906 
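/*
 * Illustrative usage (a sketch, not upstream code): the first/next accessors
 * above and below are meant for introspection, typically from the thread that
 * owns the pollers. Here "thread" is assumed to be the spdk_thread being
 * inspected:
 *
 *	struct spdk_poller *p;
 *	struct spdk_poller_stats stats;
 *
 *	for (p = spdk_thread_get_first_active_poller(thread); p != NULL;
 *	     p = spdk_thread_get_next_active_poller(p)) {
 *		spdk_poller_get_stats(p, &stats);
 *		printf("%s: run_count=%" PRIu64 " busy_count=%" PRIu64 "\n",
 *		       spdk_poller_get_name(p), stats.run_count, stats.busy_count);
 *	}
 */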
1907 struct spdk_poller * 1908 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1909 { 1910 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1911 } 1912 1913 struct spdk_poller * 1914 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1915 { 1916 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1917 } 1918 1919 struct spdk_poller * 1920 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1921 { 1922 return TAILQ_FIRST(&thread->paused_pollers); 1923 } 1924 1925 struct spdk_poller * 1926 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1927 { 1928 return TAILQ_NEXT(prev, tailq); 1929 } 1930 1931 struct spdk_io_channel * 1932 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1933 { 1934 return RB_MIN(io_channel_tree, &thread->io_channels); 1935 } 1936 1937 struct spdk_io_channel * 1938 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1939 { 1940 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1941 } 1942 1943 struct call_thread { 1944 struct spdk_thread *cur_thread; 1945 spdk_msg_fn fn; 1946 void *ctx; 1947 1948 struct spdk_thread *orig_thread; 1949 spdk_msg_fn cpl; 1950 }; 1951 1952 static void 1953 _on_thread(void *ctx) 1954 { 1955 struct call_thread *ct = ctx; 1956 int rc __attribute__((unused)); 1957 1958 ct->fn(ct->ctx); 1959 1960 pthread_mutex_lock(&g_devlist_mutex); 1961 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1962 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 1963 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 1964 ct->cur_thread->name); 1965 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1966 } 1967 pthread_mutex_unlock(&g_devlist_mutex); 1968 1969 if (!ct->cur_thread) { 1970 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1971 1972 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1973 free(ctx); 1974 } else { 1975 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1976 ct->cur_thread->name); 1977 1978 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1979 } 1980 assert(rc == 0); 1981 } 1982 1983 void 1984 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1985 { 1986 struct call_thread *ct; 1987 struct spdk_thread *thread; 1988 int rc __attribute__((unused)); 1989 1990 ct = calloc(1, sizeof(*ct)); 1991 if (!ct) { 1992 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1993 cpl(ctx); 1994 return; 1995 } 1996 1997 ct->fn = fn; 1998 ct->ctx = ctx; 1999 ct->cpl = cpl; 2000 2001 thread = _get_thread(); 2002 if (!thread) { 2003 SPDK_ERRLOG("No thread allocated\n"); 2004 free(ct); 2005 cpl(ctx); 2006 return; 2007 } 2008 ct->orig_thread = thread; 2009 2010 pthread_mutex_lock(&g_devlist_mutex); 2011 ct->cur_thread = TAILQ_FIRST(&g_threads); 2012 pthread_mutex_unlock(&g_devlist_mutex); 2013 2014 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 2015 ct->orig_thread->name); 2016 2017 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 2018 assert(rc == 0); 2019 } 2020 2021 static inline void 2022 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 2023 { 2024 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 2025 return; 2026 } 2027 2028 if (!poller->set_intr_cb_fn) { 2029 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 2030 assert(false); 2031 return; 2032 } 2033 2034 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 2035 } 2036 2037 void 2038 
spdk_thread_set_interrupt_mode(bool enable_interrupt) 2039 { 2040 struct spdk_thread *thread = _get_thread(); 2041 struct spdk_poller *poller, *tmp; 2042 2043 assert(thread); 2044 assert(spdk_interrupt_mode_is_enabled()); 2045 2046 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 2047 thread->name, enable_interrupt ? "intr" : "poll", 2048 thread->in_interrupt ? "intr" : "poll"); 2049 2050 if (thread->in_interrupt == enable_interrupt) { 2051 return; 2052 } 2053 2054 /* Set pollers to expected mode */ 2055 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 2056 poller_set_interrupt_mode(poller, enable_interrupt); 2057 } 2058 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 2059 poller_set_interrupt_mode(poller, enable_interrupt); 2060 } 2061 /* All paused pollers will go to work in interrupt mode */ 2062 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 2063 poller_set_interrupt_mode(poller, enable_interrupt); 2064 } 2065 2066 thread->in_interrupt = enable_interrupt; 2067 return; 2068 } 2069 2070 static struct io_device * 2071 io_device_get(void *io_device) 2072 { 2073 struct io_device find = {}; 2074 2075 find.io_device = io_device; 2076 return RB_FIND(io_device_tree, &g_io_devices, &find); 2077 } 2078 2079 void 2080 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 2081 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 2082 const char *name) 2083 { 2084 struct io_device *dev, *tmp; 2085 struct spdk_thread *thread; 2086 2087 assert(io_device != NULL); 2088 assert(create_cb != NULL); 2089 assert(destroy_cb != NULL); 2090 2091 thread = spdk_get_thread(); 2092 if (!thread) { 2093 SPDK_ERRLOG("called from non-SPDK thread\n"); 2094 assert(false); 2095 return; 2096 } 2097 2098 dev = calloc(1, sizeof(struct io_device)); 2099 if (dev == NULL) { 2100 SPDK_ERRLOG("could not allocate io_device\n"); 2101 return; 2102 } 2103 2104 dev->io_device = io_device; 2105 if (name) { 2106 snprintf(dev->name, sizeof(dev->name), "%s", name); 2107 } else { 2108 snprintf(dev->name, sizeof(dev->name), "%p", dev); 2109 } 2110 dev->create_cb = create_cb; 2111 dev->destroy_cb = destroy_cb; 2112 dev->unregister_cb = NULL; 2113 dev->ctx_size = ctx_size; 2114 dev->for_each_count = 0; 2115 dev->unregistered = false; 2116 dev->refcnt = 0; 2117 2118 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 2119 dev->name, dev->io_device, thread->name); 2120 2121 pthread_mutex_lock(&g_devlist_mutex); 2122 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 2123 if (tmp != NULL) { 2124 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 2125 io_device, tmp->name, dev->name); 2126 free(dev); 2127 } 2128 2129 pthread_mutex_unlock(&g_devlist_mutex); 2130 } 2131 2132 static void 2133 _finish_unregister(void *arg) 2134 { 2135 struct io_device *dev = arg; 2136 struct spdk_thread *thread; 2137 2138 thread = spdk_get_thread(); 2139 assert(thread == dev->unregister_thread); 2140 2141 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2142 dev->name, dev->io_device, thread->name); 2143 2144 assert(thread->pending_unregister_count > 0); 2145 thread->pending_unregister_count--; 2146 2147 dev->unregister_cb(dev->io_device); 2148 free(dev); 2149 } 2150 2151 static void 2152 io_device_free(struct io_device *dev) 2153 { 2154 int rc __attribute__((unused)); 2155 2156 if (dev->unregister_cb == NULL) { 2157 free(dev); 2158 } else { 2159 assert(dev->unregister_thread != NULL); 
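		/* The unregister callback must run on the thread that called
		 * spdk_io_device_unregister(), so defer it via a message to that
		 * thread instead of invoking it here.
		 */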
2160 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2161 dev->name, dev->io_device, dev->unregister_thread->name); 2162 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2163 assert(rc == 0); 2164 } 2165 } 2166 2167 void 2168 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2169 { 2170 struct io_device *dev; 2171 uint32_t refcnt; 2172 struct spdk_thread *thread; 2173 2174 thread = spdk_get_thread(); 2175 if (!thread) { 2176 SPDK_ERRLOG("called from non-SPDK thread\n"); 2177 assert(false); 2178 return; 2179 } 2180 2181 pthread_mutex_lock(&g_devlist_mutex); 2182 dev = io_device_get(io_device); 2183 if (!dev) { 2184 SPDK_ERRLOG("io_device %p not found\n", io_device); 2185 assert(false); 2186 pthread_mutex_unlock(&g_devlist_mutex); 2187 return; 2188 } 2189 2190 /* The for_each_count check differentiates the user attempting to unregister the 2191 * device a second time, from the internal call to this function that occurs 2192 * after the for_each_count reaches 0. 2193 */ 2194 if (dev->pending_unregister && dev->for_each_count > 0) { 2195 SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device); 2196 assert(false); 2197 pthread_mutex_unlock(&g_devlist_mutex); 2198 return; 2199 } 2200 2201 dev->unregister_cb = unregister_cb; 2202 dev->unregister_thread = thread; 2203 2204 if (dev->for_each_count > 0) { 2205 SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2206 dev->name, io_device, dev->for_each_count); 2207 dev->pending_unregister = true; 2208 pthread_mutex_unlock(&g_devlist_mutex); 2209 return; 2210 } 2211 2212 dev->unregistered = true; 2213 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2214 refcnt = dev->refcnt; 2215 pthread_mutex_unlock(&g_devlist_mutex); 2216 2217 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2218 dev->name, dev->io_device, thread->name); 2219 2220 if (unregister_cb) { 2221 thread->pending_unregister_count++; 2222 } 2223 2224 if (refcnt > 0) { 2225 /* defer deletion */ 2226 return; 2227 } 2228 2229 io_device_free(dev); 2230 } 2231 2232 const char * 2233 spdk_io_device_get_name(struct io_device *dev) 2234 { 2235 return dev->name; 2236 } 2237 2238 static struct spdk_io_channel * 2239 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2240 { 2241 struct spdk_io_channel find = {}; 2242 2243 find.dev = dev; 2244 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2245 } 2246 2247 struct spdk_io_channel * 2248 spdk_get_io_channel(void *io_device) 2249 { 2250 struct spdk_io_channel *ch; 2251 struct spdk_thread *thread; 2252 struct io_device *dev; 2253 int rc; 2254 2255 pthread_mutex_lock(&g_devlist_mutex); 2256 dev = io_device_get(io_device); 2257 if (dev == NULL) { 2258 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2259 pthread_mutex_unlock(&g_devlist_mutex); 2260 return NULL; 2261 } 2262 2263 thread = _get_thread(); 2264 if (!thread) { 2265 SPDK_ERRLOG("No thread allocated\n"); 2266 pthread_mutex_unlock(&g_devlist_mutex); 2267 return NULL; 2268 } 2269 2270 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2271 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2272 pthread_mutex_unlock(&g_devlist_mutex); 2273 return NULL; 2274 } 2275 2276 ch = thread_get_io_channel(thread, dev); 2277 if (ch != NULL) { 2278 ch->ref++; 2279 2280 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2281 ch, dev->name, dev->io_device, 
thread->name, ch->ref); 2282 2283 /* 2284 * An I/O channel already exists for this device on this 2285 * thread, so return it. 2286 */ 2287 pthread_mutex_unlock(&g_devlist_mutex); 2288 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2289 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2290 return ch; 2291 } 2292 2293 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2294 if (ch == NULL) { 2295 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2296 pthread_mutex_unlock(&g_devlist_mutex); 2297 return NULL; 2298 } 2299 2300 ch->dev = dev; 2301 ch->destroy_cb = dev->destroy_cb; 2302 ch->thread = thread; 2303 ch->ref = 1; 2304 ch->destroy_ref = 0; 2305 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2306 2307 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2308 ch, dev->name, dev->io_device, thread->name, ch->ref); 2309 2310 dev->refcnt++; 2311 2312 pthread_mutex_unlock(&g_devlist_mutex); 2313 2314 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2315 if (rc != 0) { 2316 pthread_mutex_lock(&g_devlist_mutex); 2317 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2318 dev->refcnt--; 2319 free(ch); 2320 pthread_mutex_unlock(&g_devlist_mutex); 2321 return NULL; 2322 } 2323 2324 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2325 return ch; 2326 } 2327 2328 static void 2329 put_io_channel(void *arg) 2330 { 2331 struct spdk_io_channel *ch = arg; 2332 bool do_remove_dev = true; 2333 struct spdk_thread *thread; 2334 2335 thread = spdk_get_thread(); 2336 if (!thread) { 2337 SPDK_ERRLOG("called from non-SPDK thread\n"); 2338 assert(false); 2339 return; 2340 } 2341 2342 SPDK_DEBUGLOG(thread, 2343 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2344 ch, ch->dev->name, ch->dev->io_device, thread->name); 2345 2346 assert(ch->thread == thread); 2347 2348 ch->destroy_ref--; 2349 2350 if (ch->ref > 0 || ch->destroy_ref > 0) { 2351 /* 2352 * Another reference to the associated io_device was requested 2353 * after this message was sent but before it had a chance to 2354 * execute. 2355 */ 2356 return; 2357 } 2358 2359 pthread_mutex_lock(&g_devlist_mutex); 2360 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2361 pthread_mutex_unlock(&g_devlist_mutex); 2362 2363 /* Don't hold the devlist mutex while the destroy_cb is called. 
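	 * The destroy callback may call back into this library (for example to
	 * release other channels or unregister an io_device), which would try to
	 * take g_devlist_mutex again.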
*/ 2364 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2365 2366 pthread_mutex_lock(&g_devlist_mutex); 2367 ch->dev->refcnt--; 2368 2369 if (!ch->dev->unregistered) { 2370 do_remove_dev = false; 2371 } 2372 2373 if (ch->dev->refcnt > 0) { 2374 do_remove_dev = false; 2375 } 2376 2377 pthread_mutex_unlock(&g_devlist_mutex); 2378 2379 if (do_remove_dev) { 2380 io_device_free(ch->dev); 2381 } 2382 free(ch); 2383 } 2384 2385 void 2386 spdk_put_io_channel(struct spdk_io_channel *ch) 2387 { 2388 struct spdk_thread *thread; 2389 int rc __attribute__((unused)); 2390 2391 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2392 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2393 2394 thread = spdk_get_thread(); 2395 if (!thread) { 2396 SPDK_ERRLOG("called from non-SPDK thread\n"); 2397 assert(false); 2398 return; 2399 } 2400 2401 if (ch->thread != thread) { 2402 wrong_thread(__func__, "ch", ch->thread, thread); 2403 return; 2404 } 2405 2406 SPDK_DEBUGLOG(thread, 2407 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2408 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2409 2410 ch->ref--; 2411 2412 if (ch->ref == 0) { 2413 ch->destroy_ref++; 2414 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2415 assert(rc == 0); 2416 } 2417 } 2418 2419 struct spdk_io_channel * 2420 spdk_io_channel_from_ctx(void *ctx) 2421 { 2422 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2423 } 2424 2425 struct spdk_thread * 2426 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2427 { 2428 return ch->thread; 2429 } 2430 2431 void * 2432 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2433 { 2434 return ch->dev->io_device; 2435 } 2436 2437 const char * 2438 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2439 { 2440 return spdk_io_device_get_name(ch->dev); 2441 } 2442 2443 int 2444 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2445 { 2446 return ch->ref; 2447 } 2448 2449 struct spdk_io_channel_iter { 2450 void *io_device; 2451 struct io_device *dev; 2452 spdk_channel_msg fn; 2453 int status; 2454 void *ctx; 2455 struct spdk_io_channel *ch; 2456 2457 struct spdk_thread *cur_thread; 2458 2459 struct spdk_thread *orig_thread; 2460 spdk_channel_for_each_cpl cpl; 2461 }; 2462 2463 void * 2464 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2465 { 2466 return i->io_device; 2467 } 2468 2469 struct spdk_io_channel * 2470 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2471 { 2472 return i->ch; 2473 } 2474 2475 void * 2476 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2477 { 2478 return i->ctx; 2479 } 2480 2481 static void 2482 _call_completion(void *ctx) 2483 { 2484 struct spdk_io_channel_iter *i = ctx; 2485 2486 if (i->cpl != NULL) { 2487 i->cpl(i, i->status); 2488 } 2489 free(i); 2490 } 2491 2492 static void 2493 _call_channel(void *ctx) 2494 { 2495 struct spdk_io_channel_iter *i = ctx; 2496 struct spdk_io_channel *ch; 2497 2498 /* 2499 * It is possible that the channel was deleted before this 2500 * message had a chance to execute. If so, skip calling 2501 * the fn() on this thread. 
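	 * Instead, spdk_for_each_channel_continue() is called directly so that the
	 * iteration still advances to the remaining threads and the completion
	 * callback eventually runs.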
2502 */ 2503 pthread_mutex_lock(&g_devlist_mutex); 2504 ch = thread_get_io_channel(i->cur_thread, i->dev); 2505 pthread_mutex_unlock(&g_devlist_mutex); 2506 2507 if (ch) { 2508 i->fn(i); 2509 } else { 2510 spdk_for_each_channel_continue(i, 0); 2511 } 2512 } 2513 2514 void 2515 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2516 spdk_channel_for_each_cpl cpl) 2517 { 2518 struct spdk_thread *thread; 2519 struct spdk_io_channel *ch; 2520 struct spdk_io_channel_iter *i; 2521 int rc __attribute__((unused)); 2522 2523 i = calloc(1, sizeof(*i)); 2524 if (!i) { 2525 SPDK_ERRLOG("Unable to allocate iterator\n"); 2526 assert(false); 2527 return; 2528 } 2529 2530 i->io_device = io_device; 2531 i->fn = fn; 2532 i->ctx = ctx; 2533 i->cpl = cpl; 2534 i->orig_thread = _get_thread(); 2535 2536 pthread_mutex_lock(&g_devlist_mutex); 2537 i->dev = io_device_get(io_device); 2538 if (i->dev == NULL) { 2539 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2540 assert(false); 2541 i->status = -ENODEV; 2542 goto end; 2543 } 2544 2545 /* Do not allow new for_each operations if we are already waiting to unregister 2546 * the device for other for_each operations to complete. 2547 */ 2548 if (i->dev->pending_unregister) { 2549 SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device); 2550 i->status = -ENODEV; 2551 goto end; 2552 } 2553 2554 TAILQ_FOREACH(thread, &g_threads, tailq) { 2555 ch = thread_get_io_channel(thread, i->dev); 2556 if (ch != NULL) { 2557 ch->dev->for_each_count++; 2558 i->cur_thread = thread; 2559 i->ch = ch; 2560 pthread_mutex_unlock(&g_devlist_mutex); 2561 rc = spdk_thread_send_msg(thread, _call_channel, i); 2562 assert(rc == 0); 2563 return; 2564 } 2565 } 2566 2567 end: 2568 pthread_mutex_unlock(&g_devlist_mutex); 2569 2570 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2571 assert(rc == 0); 2572 } 2573 2574 static void 2575 __pending_unregister(void *arg) 2576 { 2577 struct io_device *dev = arg; 2578 2579 assert(dev->pending_unregister); 2580 assert(dev->for_each_count == 0); 2581 spdk_io_device_unregister(dev->io_device, dev->unregister_cb); 2582 } 2583 2584 void 2585 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2586 { 2587 struct spdk_thread *thread; 2588 struct spdk_io_channel *ch; 2589 struct io_device *dev; 2590 int rc __attribute__((unused)); 2591 2592 assert(i->cur_thread == spdk_get_thread()); 2593 2594 i->status = status; 2595 2596 pthread_mutex_lock(&g_devlist_mutex); 2597 dev = i->dev; 2598 if (status) { 2599 goto end; 2600 } 2601 2602 thread = TAILQ_NEXT(i->cur_thread, tailq); 2603 while (thread) { 2604 ch = thread_get_io_channel(thread, dev); 2605 if (ch != NULL) { 2606 i->cur_thread = thread; 2607 i->ch = ch; 2608 pthread_mutex_unlock(&g_devlist_mutex); 2609 rc = spdk_thread_send_msg(thread, _call_channel, i); 2610 assert(rc == 0); 2611 return; 2612 } 2613 thread = TAILQ_NEXT(thread, tailq); 2614 } 2615 2616 end: 2617 dev->for_each_count--; 2618 i->ch = NULL; 2619 pthread_mutex_unlock(&g_devlist_mutex); 2620 2621 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2622 assert(rc == 0); 2623 2624 pthread_mutex_lock(&g_devlist_mutex); 2625 if (dev->pending_unregister && dev->for_each_count == 0) { 2626 rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev); 2627 assert(rc == 0); 2628 } 2629 pthread_mutex_unlock(&g_devlist_mutex); 2630 } 2631 2632 struct spdk_interrupt { 2633 int efd; 2634 struct spdk_thread *thread; 2635 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 
};

static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	if (thread->msg_fd < 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);
	thread->msg_fd = -1;

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}

#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;
	uint64_t notify = 1;

	assert(spdk_interrupt_mode_is_enabled());

	/* There may be a race between this acknowledgement and another producer's
	 * notification, so acknowledge the eventfd first and only then drain our
	 * own message queue. This ordering avoids missing a notification.
	 */
	rc = read(thread->msg_fd, &notify, sizeof(notify));
	if (rc < 0 && errno != EAGAIN) {
		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
	}

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	return rc;
}

static int
thread_interrupt_create(struct spdk_thread *thread)
{
	int rc;

	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);

	rc = spdk_fd_group_create(&thread->fgrp);
	if (rc) {
		return rc;
	}

	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (thread->msg_fd < 0) {
		rc = -errno;
		spdk_fd_group_destroy(thread->fgrp);
		thread->fgrp = NULL;

		return rc;
	}

	return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd,
				 thread_interrupt_msg_process, thread);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
#endif

struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
			void *arg, const char *name)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;
	int ret;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	intr = calloc(1, sizeof(*intr));
	if (intr == NULL) {
		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(intr->name, sizeof(intr->name), "%s", name);
	} else {
		snprintf(intr->name, sizeof(intr->name), "%p", fn);
	}

	intr->efd = efd;
	intr->thread = thread;

	ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name);

	if (ret != 0) {
		SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n",
			    thread->name, efd, spdk_strerror(-ret));
		free(intr);
		return NULL;
	}

	return intr;
}

void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	*pintr = NULL;
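
	/* The efd was added to the registering thread's fd group, so it must be
	 * removed on that same thread; verify ownership before touching fgrp.
	 */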
	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}

int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}

static bool g_interrupt_mode = false;

int
spdk_interrupt_mode_enable(void)
{
	/* This must be called once, before the threading library is initialized.
	 * g_spdk_msg_mempool is non-NULL once the library has been initialized.
	 */
	if (g_spdk_msg_mempool) {
		SPDK_ERRLOG("Cannot enable interrupt mode: the threading library is already initialized.\n");
		return -1;
	}

#ifdef __linux__
	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
	g_interrupt_mode = true;
	return 0;
#else
	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
	g_interrupt_mode = false;
	return -ENOTSUP;
#endif
}

bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}

void
spdk_spin_init(struct spdk_spinlock *sspin)
{
	int rc;

	memset(sspin, 0, sizeof(*sspin));
	rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE);
	SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD);
}

void
spdk_spin_destroy(struct spdk_spinlock *sspin)
{
	int rc;

	SPIN_ASSERT_RETURN_VOID(sspin->thread == NULL, SPIN_ERR_LOCK_HELD);

	rc = pthread_spin_destroy(&sspin->spinlock);
	SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD);
}

void
spdk_spin_lock(struct spdk_spinlock *sspin)
{
	struct spdk_thread *thread = spdk_get_thread();
	int rc;

	SPIN_ASSERT_RETURN_VOID(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD);
	SPIN_ASSERT_RETURN_VOID(thread != sspin->thread, SPIN_ERR_DEADLOCK);

	rc = pthread_spin_lock(&sspin->spinlock);
	SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD);

	sspin->thread = thread;
	sspin->thread->lock_count++;
}

void
spdk_spin_unlock(struct spdk_spinlock *sspin)
{
	struct spdk_thread *thread = spdk_get_thread();
	int rc;

	SPIN_ASSERT_RETURN_VOID(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD);
	SPIN_ASSERT_RETURN_VOID(thread == sspin->thread, SPIN_ERR_WRONG_THREAD);

	SPIN_ASSERT_RETURN_VOID(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT);
	thread->lock_count--;
	sspin->thread = NULL;

	rc = pthread_spin_unlock(&sspin->spinlock);
	SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD);
}

bool
spdk_spin_held(struct spdk_spinlock *sspin)
{
	struct spdk_thread *thread = spdk_get_thread();

	SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false);

	return sspin->thread == thread;
}
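/*
 * Illustrative sketch (the shared structure is hypothetical): SPDK spinlocks
 * may only be taken on an SPDK thread, must not be acquired recursively, and
 * must be released by the owning thread before it goes off CPU. For example:
 *
 *	struct shared {
 *		struct spdk_spinlock lock;	// initialized once with spdk_spin_init()
 *		uint64_t counter;
 *	};
 *
 *	static void
 *	bump(struct shared *s)
 *	{
 *		spdk_spin_lock(&s->lock);
 *		s->counter++;
 *		spdk_spin_unlock(&s->lock);
 *	}
 */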
2919 static int 2920 iobuf_channel_create_cb(void *io_device, void *ctx) 2921 { 2922 struct iobuf_channel *ch = ctx; 2923 2924 STAILQ_INIT(&ch->small_queue); 2925 STAILQ_INIT(&ch->large_queue); 2926 2927 return 0; 2928 } 2929 2930 static void 2931 iobuf_channel_destroy_cb(void *io_device, void *ctx) 2932 { 2933 struct iobuf_channel *ch __attribute__((unused)) = ctx; 2934 2935 assert(STAILQ_EMPTY(&ch->small_queue)); 2936 assert(STAILQ_EMPTY(&ch->large_queue)); 2937 } 2938 2939 int 2940 spdk_iobuf_initialize(void) 2941 { 2942 struct spdk_iobuf_opts *opts = &g_iobuf.opts; 2943 int rc = 0; 2944 2945 g_iobuf.small_pool = spdk_mempool_create("iobuf_small_pool", opts->small_pool_count, 2946 opts->small_bufsize + SPDK_IOBUF_DATA_OFFSET, 0, 2947 SPDK_ENV_SOCKET_ID_ANY); 2948 if (!g_iobuf.small_pool) { 2949 SPDK_ERRLOG("Failed to create small iobuf pool\n"); 2950 rc = -ENOMEM; 2951 goto error; 2952 } 2953 2954 g_iobuf.large_pool = spdk_mempool_create("iobuf_large_pool", opts->large_pool_count, 2955 opts->large_bufsize + SPDK_IOBUF_DATA_OFFSET, 0, 2956 SPDK_ENV_SOCKET_ID_ANY); 2957 if (!g_iobuf.large_pool) { 2958 SPDK_ERRLOG("Failed to create large iobuf pool\n"); 2959 rc = -ENOMEM; 2960 goto error; 2961 } 2962 2963 spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb, 2964 sizeof(struct iobuf_channel), "iobuf"); 2965 2966 return 0; 2967 error: 2968 spdk_mempool_free(g_iobuf.small_pool); 2969 return rc; 2970 } 2971 2972 static void 2973 iobuf_unregister_cb(void *io_device) 2974 { 2975 struct iobuf_module *module; 2976 2977 while (!TAILQ_EMPTY(&g_iobuf.modules)) { 2978 module = TAILQ_FIRST(&g_iobuf.modules); 2979 TAILQ_REMOVE(&g_iobuf.modules, module, tailq); 2980 free(module->name); 2981 free(module); 2982 } 2983 2984 if (spdk_mempool_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) { 2985 SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n", 2986 spdk_mempool_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count); 2987 } 2988 2989 if (spdk_mempool_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) { 2990 SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n", 2991 spdk_mempool_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count); 2992 } 2993 2994 spdk_mempool_free(g_iobuf.small_pool); 2995 spdk_mempool_free(g_iobuf.large_pool); 2996 2997 if (g_iobuf.finish_cb != NULL) { 2998 g_iobuf.finish_cb(g_iobuf.finish_arg); 2999 } 3000 } 3001 3002 void 3003 spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg) 3004 { 3005 g_iobuf.finish_cb = cb_fn; 3006 g_iobuf.finish_arg = cb_arg; 3007 3008 spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb); 3009 } 3010 3011 int 3012 spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts) 3013 { 3014 if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) { 3015 SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n", 3016 IOBUF_MIN_SMALL_POOL_SIZE); 3017 return -EINVAL; 3018 } 3019 if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) { 3020 SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n", 3021 IOBUF_MIN_LARGE_POOL_SIZE); 3022 return -EINVAL; 3023 } 3024 if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) { 3025 SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 "\n", 3026 IOBUF_MIN_SMALL_BUFSIZE); 3027 return -EINVAL; 3028 } 3029 if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) { 3030 SPDK_ERRLOG("large_bufsize must be at least %" PRIu32 "\n", 3031 IOBUF_MIN_LARGE_BUFSIZE); 3032 return -EINVAL; 3033 } 3034 3035 g_iobuf.opts = *opts; 3036 3037 return 0; 
3038 } 3039 3040 void 3041 spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts) 3042 { 3043 *opts = g_iobuf.opts; 3044 } 3045 3046 int 3047 spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name, 3048 uint32_t small_cache_size, uint32_t large_cache_size) 3049 { 3050 struct spdk_io_channel *ioch; 3051 struct iobuf_channel *iobuf_ch; 3052 struct iobuf_module *module; 3053 struct spdk_iobuf_buffer *buf; 3054 uint32_t i; 3055 3056 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 3057 if (strcmp(name, module->name) == 0) { 3058 break; 3059 } 3060 } 3061 3062 if (module == NULL) { 3063 SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name); 3064 return -ENODEV; 3065 } 3066 3067 ioch = spdk_get_io_channel(&g_iobuf); 3068 if (ioch == NULL) { 3069 SPDK_ERRLOG("Couldn't get iobuf IO channel\n"); 3070 return -ENOMEM; 3071 } 3072 3073 iobuf_ch = spdk_io_channel_get_ctx(ioch); 3074 3075 ch->small.queue = &iobuf_ch->small_queue; 3076 ch->large.queue = &iobuf_ch->large_queue; 3077 ch->small.pool = g_iobuf.small_pool; 3078 ch->large.pool = g_iobuf.large_pool; 3079 ch->small.bufsize = g_iobuf.opts.small_bufsize; 3080 ch->large.bufsize = g_iobuf.opts.large_bufsize; 3081 ch->parent = ioch; 3082 ch->module = module; 3083 ch->small.cache_size = small_cache_size; 3084 ch->large.cache_size = large_cache_size; 3085 ch->small.cache_count = 0; 3086 ch->large.cache_count = 0; 3087 3088 STAILQ_INIT(&ch->small.cache); 3089 STAILQ_INIT(&ch->large.cache); 3090 3091 for (i = 0; i < small_cache_size; ++i) { 3092 buf = spdk_mempool_get(g_iobuf.small_pool); 3093 if (buf == NULL) { 3094 SPDK_ERRLOG("Failed to populate iobuf small buffer cache. " 3095 "You may need to increase spdk_iobuf_opts.small_pool_count\n"); 3096 goto error; 3097 } 3098 STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq); 3099 ch->small.cache_count++; 3100 } 3101 for (i = 0; i < large_cache_size; ++i) { 3102 buf = spdk_mempool_get(g_iobuf.large_pool); 3103 if (buf == NULL) { 3104 SPDK_ERRLOG("Failed to populate iobuf large buffer cache. 
" 3105 "You may need to increase spdk_iobuf_opts.large_pool_count\n"); 3106 goto error; 3107 } 3108 STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq); 3109 ch->large.cache_count++; 3110 } 3111 3112 return 0; 3113 error: 3114 spdk_iobuf_channel_fini(ch); 3115 3116 return -ENOMEM; 3117 } 3118 3119 void 3120 spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch) 3121 { 3122 struct spdk_iobuf_entry *entry __attribute__((unused)); 3123 struct spdk_iobuf_buffer *buf; 3124 3125 /* Make sure none of the wait queue entries are coming from this module */ 3126 STAILQ_FOREACH(entry, ch->small.queue, stailq) { 3127 assert(entry->module != ch->module); 3128 } 3129 STAILQ_FOREACH(entry, ch->large.queue, stailq) { 3130 assert(entry->module != ch->module); 3131 } 3132 3133 /* Release cached buffers back to the pool */ 3134 while (!STAILQ_EMPTY(&ch->small.cache)) { 3135 buf = STAILQ_FIRST(&ch->small.cache); 3136 STAILQ_REMOVE_HEAD(&ch->small.cache, stailq); 3137 spdk_mempool_put(ch->small.pool, buf); 3138 ch->small.cache_count--; 3139 } 3140 while (!STAILQ_EMPTY(&ch->large.cache)) { 3141 buf = STAILQ_FIRST(&ch->large.cache); 3142 STAILQ_REMOVE_HEAD(&ch->large.cache, stailq); 3143 spdk_mempool_put(ch->large.pool, buf); 3144 ch->large.cache_count--; 3145 } 3146 3147 assert(ch->small.cache_count == 0); 3148 assert(ch->large.cache_count == 0); 3149 3150 spdk_put_io_channel(ch->parent); 3151 ch->parent = NULL; 3152 } 3153 3154 int 3155 spdk_iobuf_register_module(const char *name) 3156 { 3157 struct iobuf_module *module; 3158 3159 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 3160 if (strcmp(name, module->name) == 0) { 3161 return -EEXIST; 3162 } 3163 } 3164 3165 module = calloc(1, sizeof(*module)); 3166 if (module == NULL) { 3167 return -ENOMEM; 3168 } 3169 3170 module->name = strdup(name); 3171 if (module->name == NULL) { 3172 free(module); 3173 return -ENOMEM; 3174 } 3175 3176 TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq); 3177 3178 return 0; 3179 } 3180 3181 int 3182 spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool, 3183 spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx) 3184 { 3185 struct spdk_iobuf_entry *entry, *tmp; 3186 int rc; 3187 3188 STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) { 3189 /* We only want to iterate over the entries requested by the module which owns ch */ 3190 if (entry->module != ch->module) { 3191 continue; 3192 } 3193 3194 rc = cb_fn(ch, entry, cb_ctx); 3195 if (rc != 0) { 3196 return rc; 3197 } 3198 } 3199 3200 return 0; 3201 } 3202 3203 void 3204 spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, 3205 uint64_t len) 3206 { 3207 struct spdk_iobuf_pool *pool; 3208 3209 if (len <= ch->small.bufsize) { 3210 pool = &ch->small; 3211 } else { 3212 assert(len <= ch->large.bufsize); 3213 pool = &ch->large; 3214 } 3215 3216 STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq); 3217 } 3218 3219 SPDK_LOG_REGISTER_COMPONENT(thread) 3220