/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2016 Intel Corporation.
 * All rights reserved.
 * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/trace.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"
#include "spdk_internal/usdt.h"
#include "thread_internal.h"

#include "spdk_internal/trace_defs.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

#define SPDK_MSG_BATCH_SIZE 8
#define SPDK_MAX_DEVICE_NAME_LEN 256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC 5
#define SPDK_MAX_POLLER_NAME_LEN 256
#define SPDK_MAX_THREAD_NAME_LEN 256
#define IOBUF_MIN_SMALL_POOL_SIZE 8191
#define IOBUF_MIN_LARGE_POOL_SIZE 1023
#define IOBUF_ALIGNMENT 512
#define IOBUF_MIN_SMALL_BUFSIZE (SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + \
				 IOBUF_ALIGNMENT)
#define IOBUF_MIN_LARGE_BUFSIZE (SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + \
				 IOBUF_ALIGNMENT)

static struct spdk_thread *g_app_thread;

struct spdk_interrupt {
	int efd;
	struct spdk_thread *thread;
	spdk_interrupt_fn fn;
	void *arg;
	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,

	/* The poller is in the process of being paused. It will be paused
	 * during the next time it's supposed to be executed.
	 */
	SPDK_POLLER_STATE_PAUSING,

	/* The poller is registered but currently paused. It's on the
	 * paused_pollers list.
	 */
	SPDK_POLLER_STATE_PAUSED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller) tailq;
	RB_ENTRY(spdk_poller) node;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state state;

	uint64_t period_ticks;
	uint64_t next_run_tick;
	uint64_t run_count;
	uint64_t busy_count;
	uint64_t id;
	spdk_poller_fn fn;
	void *arg;
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;
	spdk_poller_set_interrupt_mode_cb set_intr_cb_fn;
	void *set_intr_cb_arg;

	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_thread_state {
	/* The thread is processing pollers and messages via spdk_thread_poll(). */
	SPDK_THREAD_STATE_RUNNING,

	/* The thread is in the process of termination. It reaps unregistering
	 * pollers and releases I/O channels.
	 */
	SPDK_THREAD_STATE_EXITING,

	/* The thread has exited. It is ready to be passed to spdk_thread_destroy(). */
	SPDK_THREAD_STATE_EXITED,
};

struct spdk_thread {
	uint64_t tsc_last;
	struct spdk_thread_stats stats;
	/*
	 * Contains pollers actively running on this thread. Pollers
	 * are run round-robin. The thread takes one poller from the head
	 * of the ring, executes it, then puts it back at the tail of
	 * the ring.
118 */ 119 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 120 /** 121 * Contains pollers running on this thread with a periodic timer. 122 */ 123 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 124 struct spdk_poller *first_timed_poller; 125 /* 126 * Contains paused pollers. Pollers on this queue are waiting until 127 * they are resumed (in which case they're put onto the active/timer 128 * queues) or unregistered. 129 */ 130 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 131 struct spdk_ring *messages; 132 int msg_fd; 133 SLIST_HEAD(, spdk_msg) msg_cache; 134 size_t msg_cache_count; 135 spdk_msg_fn critical_msg; 136 uint64_t id; 137 uint64_t next_poller_id; 138 enum spdk_thread_state state; 139 int pending_unregister_count; 140 141 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 142 TAILQ_ENTRY(spdk_thread) tailq; 143 144 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 145 struct spdk_cpuset cpumask; 146 uint64_t exit_timeout_tsc; 147 148 int32_t lock_count; 149 150 /* Indicates whether this spdk_thread currently runs in interrupt. */ 151 bool in_interrupt; 152 bool poller_unregistered; 153 struct spdk_fd_group *fgrp; 154 155 /* User context allocated at the end */ 156 uint8_t ctx[0]; 157 }; 158 159 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 160 161 static spdk_new_thread_fn g_new_thread_fn = NULL; 162 static spdk_thread_op_fn g_thread_op_fn = NULL; 163 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 164 static size_t g_ctx_sz = 0; 165 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 166 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 167 * SPDK application is required. 168 */ 169 static uint64_t g_thread_id = 1; 170 171 enum spin_error { 172 SPIN_ERR_NONE, 173 /* Trying to use an SPDK lock while not on an SPDK thread */ 174 SPIN_ERR_NOT_SPDK_THREAD, 175 /* Trying to lock a lock already held by this SPDK thread */ 176 SPIN_ERR_DEADLOCK, 177 /* Trying to unlock a lock not held by this SPDK thread */ 178 SPIN_ERR_WRONG_THREAD, 179 /* pthread_spin_*() returned an error */ 180 SPIN_ERR_PTHREAD, 181 /* Trying to destroy a lock that is held */ 182 SPIN_ERR_LOCK_HELD, 183 /* lock_count is invalid */ 184 SPIN_ERR_LOCK_COUNT, 185 /* 186 * An spdk_thread may migrate to another pthread. A spinlock held across migration leads to 187 * undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to 188 * deadlock when another SPDK thread on the same pthread tries to take that lock. 189 */ 190 SPIN_ERR_HOLD_DURING_SWITCH, 191 /* Must be last, not an actual error code */ 192 SPIN_ERR_LAST 193 }; 194 195 static const char *spin_error_strings[] = { 196 [SPIN_ERR_NONE] = "No error", 197 [SPIN_ERR_NOT_SPDK_THREAD] = "Not an SPDK thread", 198 [SPIN_ERR_DEADLOCK] = "Deadlock detected", 199 [SPIN_ERR_WRONG_THREAD] = "Unlock on wrong SPDK thread", 200 [SPIN_ERR_PTHREAD] = "Error from pthread_spinlock", 201 [SPIN_ERR_LOCK_HELD] = "Destroying a held spinlock", 202 [SPIN_ERR_LOCK_COUNT] = "Lock count is invalid", 203 [SPIN_ERR_HOLD_DURING_SWITCH] = "Lock(s) held while SPDK thread going off CPU", 204 }; 205 206 #define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \ 207 ? 
"Unknown error" : spin_error_strings[err] 208 209 static void 210 __posix_abort(enum spin_error err) 211 { 212 abort(); 213 } 214 215 typedef void (*spin_abort)(enum spin_error err); 216 spin_abort g_spin_abort_fn = __posix_abort; 217 218 #define SPIN_ASSERT_IMPL(cond, err, ret) \ 219 do { \ 220 if (spdk_unlikely(!(cond))) { \ 221 SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \ 222 SPIN_ERROR_STRING(err), #cond); \ 223 g_spin_abort_fn(err); \ 224 ret; \ 225 } \ 226 } while (0) 227 #define SPIN_ASSERT_RETURN_VOID(cond, err) SPIN_ASSERT_IMPL(cond, err, return) 228 #define SPIN_ASSERT_RETURN(cond, err, ret) SPIN_ASSERT_IMPL(cond, err, return ret) 229 #define SPIN_ASSERT(cond, err) SPIN_ASSERT_IMPL(cond, err,) 230 231 struct io_device { 232 void *io_device; 233 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 234 spdk_io_channel_create_cb create_cb; 235 spdk_io_channel_destroy_cb destroy_cb; 236 spdk_io_device_unregister_cb unregister_cb; 237 struct spdk_thread *unregister_thread; 238 uint32_t ctx_size; 239 uint32_t for_each_count; 240 RB_ENTRY(io_device) node; 241 242 uint32_t refcnt; 243 244 bool pending_unregister; 245 bool unregistered; 246 }; 247 248 struct iobuf_channel { 249 spdk_iobuf_entry_stailq_t small_queue; 250 spdk_iobuf_entry_stailq_t large_queue; 251 }; 252 253 struct iobuf_module { 254 char *name; 255 TAILQ_ENTRY(iobuf_module) tailq; 256 }; 257 258 struct iobuf { 259 struct spdk_mempool *small_pool; 260 struct spdk_mempool *large_pool; 261 struct spdk_iobuf_opts opts; 262 TAILQ_HEAD(, iobuf_module) modules; 263 spdk_iobuf_finish_cb finish_cb; 264 void *finish_arg; 265 }; 266 267 static struct iobuf g_iobuf = { 268 .modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules), 269 .opts = { 270 .small_pool_count = IOBUF_MIN_SMALL_POOL_SIZE, 271 .large_pool_count = IOBUF_MIN_LARGE_POOL_SIZE, 272 .small_bufsize = IOBUF_MIN_SMALL_BUFSIZE, 273 .large_bufsize = IOBUF_MIN_LARGE_BUFSIZE, 274 }, 275 }; 276 277 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 278 279 static int 280 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 281 { 282 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 283 } 284 285 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 286 287 static int 288 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 289 { 290 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 291 } 292 293 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 294 295 struct spdk_msg { 296 spdk_msg_fn fn; 297 void *arg; 298 299 SLIST_ENTRY(spdk_msg) link; 300 }; 301 302 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 303 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 304 305 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 306 static uint32_t g_thread_count = 0; 307 308 static __thread struct spdk_thread *tls_thread = NULL; 309 310 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 311 { 312 spdk_trace_register_description("THREAD_IOCH_GET", 313 TRACE_THREAD_IOCH_GET, 314 OWNER_NONE, OBJECT_NONE, 0, 315 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 316 spdk_trace_register_description("THREAD_IOCH_PUT", 317 TRACE_THREAD_IOCH_PUT, 318 OWNER_NONE, OBJECT_NONE, 0, 319 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 320 } 321 322 /* 323 * If this compare function returns zero when two next_run_ticks are equal, 324 * the macro RB_INSERT() returns a pointer to the element with the same 325 * next_run_tick. 
326 * 327 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 328 * to remove as a parameter. 329 * 330 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 331 * side by returning 1 when two next_run_ticks are equal. 332 */ 333 static inline int 334 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 335 { 336 if (poller1->next_run_tick < poller2->next_run_tick) { 337 return -1; 338 } else { 339 return 1; 340 } 341 } 342 343 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 344 345 static inline struct spdk_thread * 346 _get_thread(void) 347 { 348 return tls_thread; 349 } 350 351 static int 352 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 353 { 354 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 355 356 g_ctx_sz = ctx_sz; 357 358 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 359 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 360 sizeof(struct spdk_msg), 361 0, /* No cache. We do our own. */ 362 SPDK_ENV_SOCKET_ID_ANY); 363 364 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 365 msg_mempool_sz); 366 367 if (!g_spdk_msg_mempool) { 368 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 369 return -ENOMEM; 370 } 371 372 return 0; 373 } 374 375 static void thread_interrupt_destroy(struct spdk_thread *thread); 376 static int thread_interrupt_create(struct spdk_thread *thread); 377 378 static void 379 _free_thread(struct spdk_thread *thread) 380 { 381 struct spdk_io_channel *ch; 382 struct spdk_msg *msg; 383 struct spdk_poller *poller, *ptmp; 384 385 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 386 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 387 thread->name, ch->dev->name); 388 } 389 390 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 391 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 392 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 393 poller->name); 394 } 395 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 396 free(poller); 397 } 398 399 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 400 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 401 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 402 poller->name); 403 } 404 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 405 free(poller); 406 } 407 408 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 409 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 410 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 411 free(poller); 412 } 413 414 pthread_mutex_lock(&g_devlist_mutex); 415 assert(g_thread_count > 0); 416 g_thread_count--; 417 TAILQ_REMOVE(&g_threads, thread, tailq); 418 pthread_mutex_unlock(&g_devlist_mutex); 419 420 msg = SLIST_FIRST(&thread->msg_cache); 421 while (msg != NULL) { 422 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 423 424 assert(thread->msg_cache_count > 0); 425 thread->msg_cache_count--; 426 spdk_mempool_put(g_spdk_msg_mempool, msg); 427 428 msg = SLIST_FIRST(&thread->msg_cache); 429 } 430 431 assert(thread->msg_cache_count == 0); 432 433 if (spdk_interrupt_mode_is_enabled()) { 434 thread_interrupt_destroy(thread); 435 } 436 437 spdk_ring_free(thread->messages); 438 free(thread); 439 } 440 441 int 442 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 443 { 444 assert(g_new_thread_fn == NULL); 445 assert(g_thread_op_fn 
== NULL); 446 447 if (new_thread_fn == NULL) { 448 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 449 } else { 450 g_new_thread_fn = new_thread_fn; 451 } 452 453 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 454 } 455 456 int 457 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 458 spdk_thread_op_supported_fn thread_op_supported_fn, 459 size_t ctx_sz, size_t msg_mempool_sz) 460 { 461 assert(g_new_thread_fn == NULL); 462 assert(g_thread_op_fn == NULL); 463 assert(g_thread_op_supported_fn == NULL); 464 465 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 466 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 467 return -EINVAL; 468 } 469 470 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 471 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 472 } else { 473 g_thread_op_fn = thread_op_fn; 474 g_thread_op_supported_fn = thread_op_supported_fn; 475 } 476 477 return _thread_lib_init(ctx_sz, msg_mempool_sz); 478 } 479 480 void 481 spdk_thread_lib_fini(void) 482 { 483 struct io_device *dev; 484 485 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 486 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 487 } 488 489 g_new_thread_fn = NULL; 490 g_thread_op_fn = NULL; 491 g_thread_op_supported_fn = NULL; 492 g_ctx_sz = 0; 493 if (g_app_thread != NULL) { 494 _free_thread(g_app_thread); 495 g_app_thread = NULL; 496 } 497 498 if (g_spdk_msg_mempool) { 499 spdk_mempool_free(g_spdk_msg_mempool); 500 g_spdk_msg_mempool = NULL; 501 } 502 } 503 504 struct spdk_thread * 505 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask) 506 { 507 struct spdk_thread *thread, *null_thread; 508 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 509 int rc = 0, i; 510 511 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 512 if (!thread) { 513 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 514 return NULL; 515 } 516 517 if (cpumask) { 518 spdk_cpuset_copy(&thread->cpumask, cpumask); 519 } else { 520 spdk_cpuset_negate(&thread->cpumask); 521 } 522 523 RB_INIT(&thread->io_channels); 524 TAILQ_INIT(&thread->active_pollers); 525 RB_INIT(&thread->timed_pollers); 526 TAILQ_INIT(&thread->paused_pollers); 527 SLIST_INIT(&thread->msg_cache); 528 thread->msg_cache_count = 0; 529 530 thread->tsc_last = spdk_get_ticks(); 531 532 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 533 * ID exceeds UINT64_MAX a warning message is logged 534 */ 535 thread->next_poller_id = 1; 536 537 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 538 if (!thread->messages) { 539 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 540 free(thread); 541 return NULL; 542 } 543 544 /* Fill the local message pool cache. */ 545 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 546 if (rc == 0) { 547 /* If we can't populate the cache it's ok. The cache will get filled 548 * up organically as messages are passed to the thread. */ 549 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 550 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 551 thread->msg_cache_count++; 552 } 553 } 554 555 if (name) { 556 snprintf(thread->name, sizeof(thread->name), "%s", name); 557 } else { 558 snprintf(thread->name, sizeof(thread->name), "%p", thread); 559 } 560 561 pthread_mutex_lock(&g_devlist_mutex); 562 if (g_thread_id == 0) { 563 SPDK_ERRLOG("Thread ID rolled over. 
Further thread creation is not allowed.\n"); 564 pthread_mutex_unlock(&g_devlist_mutex); 565 _free_thread(thread); 566 return NULL; 567 } 568 thread->id = g_thread_id++; 569 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 570 g_thread_count++; 571 pthread_mutex_unlock(&g_devlist_mutex); 572 573 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 574 thread->id, thread->name); 575 576 if (spdk_interrupt_mode_is_enabled()) { 577 thread->in_interrupt = true; 578 rc = thread_interrupt_create(thread); 579 if (rc != 0) { 580 _free_thread(thread); 581 return NULL; 582 } 583 } 584 585 if (g_new_thread_fn) { 586 rc = g_new_thread_fn(thread); 587 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 588 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 589 } 590 591 if (rc != 0) { 592 _free_thread(thread); 593 return NULL; 594 } 595 596 thread->state = SPDK_THREAD_STATE_RUNNING; 597 598 /* If this is the first thread, save it as the app thread. Use an atomic 599 * compare + exchange to guard against crazy users who might try to 600 * call spdk_thread_create() simultaneously on multiple threads. 601 */ 602 null_thread = NULL; 603 __atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false, 604 __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); 605 606 return thread; 607 } 608 609 struct spdk_thread * 610 spdk_thread_get_app_thread(void) 611 { 612 return g_app_thread; 613 } 614 615 void 616 spdk_set_thread(struct spdk_thread *thread) 617 { 618 tls_thread = thread; 619 } 620 621 static void 622 thread_exit(struct spdk_thread *thread, uint64_t now) 623 { 624 struct spdk_poller *poller; 625 struct spdk_io_channel *ch; 626 627 if (now >= thread->exit_timeout_tsc) { 628 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 629 thread->name); 630 goto exited; 631 } 632 633 if (spdk_ring_count(thread->messages) > 0) { 634 SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name); 635 return; 636 } 637 638 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 639 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 640 SPDK_INFOLOG(thread, 641 "thread %s still has active poller %s\n", 642 thread->name, poller->name); 643 return; 644 } 645 } 646 647 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 648 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 649 SPDK_INFOLOG(thread, 650 "thread %s still has active timed poller %s\n", 651 thread->name, poller->name); 652 return; 653 } 654 } 655 656 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 657 SPDK_INFOLOG(thread, 658 "thread %s still has paused poller %s\n", 659 thread->name, poller->name); 660 return; 661 } 662 663 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 664 SPDK_INFOLOG(thread, 665 "thread %s still has channel for io_device %s\n", 666 thread->name, ch->dev->name); 667 return; 668 } 669 670 if (thread->pending_unregister_count > 0) { 671 SPDK_INFOLOG(thread, 672 "thread %s is still unregistering io_devices\n", 673 thread->name); 674 return; 675 } 676 677 exited: 678 thread->state = SPDK_THREAD_STATE_EXITED; 679 if (spdk_unlikely(thread->in_interrupt)) { 680 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 681 } 682 } 683 684 static void _thread_exit(void *ctx); 685 686 int 687 spdk_thread_exit(struct spdk_thread *thread) 688 { 689 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 690 691 assert(tls_thread == thread); 692 693 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 694 SPDK_INFOLOG(thread, 695 "thread %s is already 
exiting\n", 696 thread->name); 697 return 0; 698 } 699 700 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 701 SPDK_THREAD_EXIT_TIMEOUT_SEC); 702 thread->state = SPDK_THREAD_STATE_EXITING; 703 704 if (spdk_interrupt_mode_is_enabled()) { 705 spdk_thread_send_msg(thread, _thread_exit, thread); 706 } 707 708 return 0; 709 } 710 711 bool 712 spdk_thread_is_running(struct spdk_thread *thread) 713 { 714 return thread->state == SPDK_THREAD_STATE_RUNNING; 715 } 716 717 bool 718 spdk_thread_is_exited(struct spdk_thread *thread) 719 { 720 return thread->state == SPDK_THREAD_STATE_EXITED; 721 } 722 723 void 724 spdk_thread_destroy(struct spdk_thread *thread) 725 { 726 assert(thread != NULL); 727 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 728 729 assert(thread->state == SPDK_THREAD_STATE_EXITED); 730 731 if (tls_thread == thread) { 732 tls_thread = NULL; 733 } 734 735 /* To be safe, do not free the app thread until spdk_thread_lib_fini(). */ 736 if (thread != g_app_thread) { 737 _free_thread(thread); 738 } 739 } 740 741 void * 742 spdk_thread_get_ctx(struct spdk_thread *thread) 743 { 744 if (g_ctx_sz > 0) { 745 return thread->ctx; 746 } 747 748 return NULL; 749 } 750 751 struct spdk_cpuset * 752 spdk_thread_get_cpumask(struct spdk_thread *thread) 753 { 754 return &thread->cpumask; 755 } 756 757 int 758 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 759 { 760 struct spdk_thread *thread; 761 762 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 763 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 764 assert(false); 765 return -ENOTSUP; 766 } 767 768 thread = spdk_get_thread(); 769 if (!thread) { 770 SPDK_ERRLOG("Called from non-SPDK thread\n"); 771 assert(false); 772 return -EINVAL; 773 } 774 775 spdk_cpuset_copy(&thread->cpumask, cpumask); 776 777 /* Invoke framework's reschedule operation. If this function is called multiple times 778 * in a single spdk_thread_poll() context, the last cpumask will be used in the 779 * reschedule operation. 780 */ 781 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 782 783 return 0; 784 } 785 786 struct spdk_thread * 787 spdk_thread_get_from_ctx(void *ctx) 788 { 789 if (ctx == NULL) { 790 assert(false); 791 return NULL; 792 } 793 794 assert(g_ctx_sz > 0); 795 796 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 797 } 798 799 static inline uint32_t 800 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 801 { 802 unsigned count, i; 803 void *messages[SPDK_MSG_BATCH_SIZE]; 804 uint64_t notify = 1; 805 int rc; 806 807 #ifdef DEBUG 808 /* 809 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 810 * so we will never actually read uninitialized data from events, but just to be sure 811 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 
 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (spdk_unlikely(thread->in_interrupt) &&
	    spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);

		SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg);

		msg->fn(msg->arg);

		SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}

static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *tmp __attribute__((unused));

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers tree by next scheduled run time
	 * as its key.
	 */
	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp == NULL);

	/* Update the cache only if it is empty or the inserted poller is earlier than it.
	 * RB_MIN() is not necessary here because all pollers that have exactly the same
	 * next_run_tick as the existing poller are inserted on the right side.
	 */
	if (thread->first_timed_poller == NULL ||
	    poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
		thread->first_timed_poller = poller;
	}
}

static inline void
poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
{
	struct spdk_poller *tmp __attribute__((unused));

	tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp != NULL);

	/* This function is not used in any case that is performance critical.
	 * Update the cache simply by RB_MIN() if it needs to be changed.
	 */
	if (thread->first_timed_poller == poller) {
		thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
	}
}

static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store end time to use it as start time of the next spdk_thread_poll().
*/ 920 thread->tsc_last = end; 921 } 922 923 static inline int 924 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 925 { 926 int rc; 927 928 switch (poller->state) { 929 case SPDK_POLLER_STATE_UNREGISTERED: 930 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 931 free(poller); 932 return 0; 933 case SPDK_POLLER_STATE_PAUSING: 934 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 935 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 936 poller->state = SPDK_POLLER_STATE_PAUSED; 937 return 0; 938 case SPDK_POLLER_STATE_WAITING: 939 break; 940 default: 941 assert(false); 942 break; 943 } 944 945 poller->state = SPDK_POLLER_STATE_RUNNING; 946 rc = poller->fn(poller->arg); 947 948 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 949 950 poller->run_count++; 951 if (rc > 0) { 952 poller->busy_count++; 953 } 954 955 #ifdef DEBUG 956 if (rc == -1) { 957 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 958 } 959 #endif 960 961 switch (poller->state) { 962 case SPDK_POLLER_STATE_UNREGISTERED: 963 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 964 free(poller); 965 break; 966 case SPDK_POLLER_STATE_PAUSING: 967 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 968 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 969 poller->state = SPDK_POLLER_STATE_PAUSED; 970 break; 971 case SPDK_POLLER_STATE_PAUSED: 972 case SPDK_POLLER_STATE_WAITING: 973 break; 974 case SPDK_POLLER_STATE_RUNNING: 975 poller->state = SPDK_POLLER_STATE_WAITING; 976 break; 977 default: 978 assert(false); 979 break; 980 } 981 982 return rc; 983 } 984 985 static inline int 986 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 987 uint64_t now) 988 { 989 int rc; 990 991 switch (poller->state) { 992 case SPDK_POLLER_STATE_UNREGISTERED: 993 free(poller); 994 return 0; 995 case SPDK_POLLER_STATE_PAUSING: 996 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 997 poller->state = SPDK_POLLER_STATE_PAUSED; 998 return 0; 999 case SPDK_POLLER_STATE_WAITING: 1000 break; 1001 default: 1002 assert(false); 1003 break; 1004 } 1005 1006 poller->state = SPDK_POLLER_STATE_RUNNING; 1007 rc = poller->fn(poller->arg); 1008 1009 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 1010 1011 poller->run_count++; 1012 if (rc > 0) { 1013 poller->busy_count++; 1014 } 1015 1016 #ifdef DEBUG 1017 if (rc == -1) { 1018 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 1019 } 1020 #endif 1021 1022 switch (poller->state) { 1023 case SPDK_POLLER_STATE_UNREGISTERED: 1024 free(poller); 1025 break; 1026 case SPDK_POLLER_STATE_PAUSING: 1027 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1028 poller->state = SPDK_POLLER_STATE_PAUSED; 1029 break; 1030 case SPDK_POLLER_STATE_PAUSED: 1031 break; 1032 case SPDK_POLLER_STATE_RUNNING: 1033 poller->state = SPDK_POLLER_STATE_WAITING; 1034 /* fallthrough */ 1035 case SPDK_POLLER_STATE_WAITING: 1036 poller_insert_timer(thread, poller, now); 1037 break; 1038 default: 1039 assert(false); 1040 break; 1041 } 1042 1043 return rc; 1044 } 1045 1046 static int 1047 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1048 { 1049 uint32_t msg_count; 1050 struct spdk_poller *poller, *tmp; 1051 spdk_msg_fn critical_msg; 1052 int rc = 0; 1053 1054 thread->tsc_last = now; 1055 1056 critical_msg = thread->critical_msg; 1057 if (spdk_unlikely(critical_msg != NULL)) { 1058 critical_msg(NULL); 1059 thread->critical_msg = NULL; 1060 rc = 1; 
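		/* Note: the critical message is a single function pointer stored atomically on the
		 * thread (see spdk_thread_send_critical_msg()), so delivering it requires no
		 * allocation and it can be set from restricted contexts such as signal handlers.
		 * It runs at most once per poll and is cleared immediately after executing.
		 */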
1061 } 1062 1063 msg_count = msg_queue_run_batch(thread, max_msgs); 1064 if (msg_count) { 1065 rc = 1; 1066 } 1067 1068 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1069 active_pollers_head, tailq, tmp) { 1070 int poller_rc; 1071 1072 poller_rc = thread_execute_poller(thread, poller); 1073 if (poller_rc > rc) { 1074 rc = poller_rc; 1075 } 1076 } 1077 1078 poller = thread->first_timed_poller; 1079 while (poller != NULL) { 1080 int timer_rc = 0; 1081 1082 if (now < poller->next_run_tick) { 1083 break; 1084 } 1085 1086 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 1087 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 1088 1089 /* Update the cache to the next timed poller in the list 1090 * only if the current poller is still the closest, otherwise, 1091 * do nothing because the cache has been already updated. 1092 */ 1093 if (thread->first_timed_poller == poller) { 1094 thread->first_timed_poller = tmp; 1095 } 1096 1097 timer_rc = thread_execute_timed_poller(thread, poller, now); 1098 if (timer_rc > rc) { 1099 rc = timer_rc; 1100 } 1101 1102 poller = tmp; 1103 } 1104 1105 return rc; 1106 } 1107 1108 static void 1109 _thread_remove_pollers(void *ctx) 1110 { 1111 struct spdk_thread *thread = ctx; 1112 struct spdk_poller *poller, *tmp; 1113 1114 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1115 active_pollers_head, tailq, tmp) { 1116 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1117 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1118 free(poller); 1119 } 1120 } 1121 1122 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1123 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1124 poller_remove_timer(thread, poller); 1125 free(poller); 1126 } 1127 } 1128 1129 thread->poller_unregistered = false; 1130 } 1131 1132 static void 1133 _thread_exit(void *ctx) 1134 { 1135 struct spdk_thread *thread = ctx; 1136 1137 assert(thread->state == SPDK_THREAD_STATE_EXITING); 1138 1139 thread_exit(thread, spdk_get_ticks()); 1140 } 1141 1142 int 1143 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 1144 { 1145 struct spdk_thread *orig_thread; 1146 int rc; 1147 1148 orig_thread = _get_thread(); 1149 tls_thread = thread; 1150 1151 if (now == 0) { 1152 now = spdk_get_ticks(); 1153 } 1154 1155 if (spdk_likely(!thread->in_interrupt)) { 1156 rc = thread_poll(thread, max_msgs, now); 1157 if (spdk_unlikely(thread->in_interrupt)) { 1158 /* The thread transitioned to interrupt mode during the above poll. 1159 * Poll it one more time in case that during the transition time 1160 * there is msg received without notification. 
1161 */ 1162 rc = thread_poll(thread, max_msgs, now); 1163 } 1164 1165 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1166 thread_exit(thread, now); 1167 } 1168 } else { 1169 /* Non-block wait on thread's fd_group */ 1170 rc = spdk_fd_group_wait(thread->fgrp, 0); 1171 } 1172 1173 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1174 1175 tls_thread = orig_thread; 1176 1177 return rc; 1178 } 1179 1180 uint64_t 1181 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1182 { 1183 struct spdk_poller *poller; 1184 1185 poller = thread->first_timed_poller; 1186 if (poller) { 1187 return poller->next_run_tick; 1188 } 1189 1190 return 0; 1191 } 1192 1193 int 1194 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1195 { 1196 return !TAILQ_EMPTY(&thread->active_pollers); 1197 } 1198 1199 static bool 1200 thread_has_unpaused_pollers(struct spdk_thread *thread) 1201 { 1202 if (TAILQ_EMPTY(&thread->active_pollers) && 1203 RB_EMPTY(&thread->timed_pollers)) { 1204 return false; 1205 } 1206 1207 return true; 1208 } 1209 1210 bool 1211 spdk_thread_has_pollers(struct spdk_thread *thread) 1212 { 1213 if (!thread_has_unpaused_pollers(thread) && 1214 TAILQ_EMPTY(&thread->paused_pollers)) { 1215 return false; 1216 } 1217 1218 return true; 1219 } 1220 1221 bool 1222 spdk_thread_is_idle(struct spdk_thread *thread) 1223 { 1224 if (spdk_ring_count(thread->messages) || 1225 thread_has_unpaused_pollers(thread) || 1226 thread->critical_msg != NULL) { 1227 return false; 1228 } 1229 1230 return true; 1231 } 1232 1233 uint32_t 1234 spdk_thread_get_count(void) 1235 { 1236 /* 1237 * Return cached value of the current thread count. We could acquire the 1238 * lock and iterate through the TAILQ of threads to count them, but that 1239 * count could still be invalidated after we release the lock. 
1240 */ 1241 return g_thread_count; 1242 } 1243 1244 struct spdk_thread * 1245 spdk_get_thread(void) 1246 { 1247 return _get_thread(); 1248 } 1249 1250 const char * 1251 spdk_thread_get_name(const struct spdk_thread *thread) 1252 { 1253 return thread->name; 1254 } 1255 1256 uint64_t 1257 spdk_thread_get_id(const struct spdk_thread *thread) 1258 { 1259 return thread->id; 1260 } 1261 1262 struct spdk_thread * 1263 spdk_thread_get_by_id(uint64_t id) 1264 { 1265 struct spdk_thread *thread; 1266 1267 if (id == 0 || id >= g_thread_id) { 1268 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1269 return NULL; 1270 } 1271 pthread_mutex_lock(&g_devlist_mutex); 1272 TAILQ_FOREACH(thread, &g_threads, tailq) { 1273 if (thread->id == id) { 1274 break; 1275 } 1276 } 1277 pthread_mutex_unlock(&g_devlist_mutex); 1278 return thread; 1279 } 1280 1281 int 1282 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1283 { 1284 struct spdk_thread *thread; 1285 1286 thread = _get_thread(); 1287 if (!thread) { 1288 SPDK_ERRLOG("No thread allocated\n"); 1289 return -EINVAL; 1290 } 1291 1292 if (stats == NULL) { 1293 return -EINVAL; 1294 } 1295 1296 *stats = thread->stats; 1297 1298 return 0; 1299 } 1300 1301 uint64_t 1302 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1303 { 1304 if (thread == NULL) { 1305 thread = _get_thread(); 1306 } 1307 1308 return thread->tsc_last; 1309 } 1310 1311 static inline int 1312 thread_send_msg_notification(const struct spdk_thread *target_thread) 1313 { 1314 uint64_t notify = 1; 1315 int rc; 1316 1317 /* Not necessary to do notification if interrupt facility is not enabled */ 1318 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1319 return 0; 1320 } 1321 1322 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1323 * after sending thread msg, it is necessary to check whether target thread runs in 1324 * interrupt mode and then decide whether do event notification. 
	 */
	if (spdk_unlikely(target_thread->in_interrupt)) {
		rc = write(target_thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
			return -EIO;
		}
	}

	return 0;
}

int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
	spdk_msg_fn expected = NULL;

	if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
					 __ATOMIC_SEQ_CST)) {
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

#ifdef __linux__
static int
interrupt_timerfd_process(void *arg)
{
	struct spdk_poller *poller = arg;
	uint64_t exp;
	int rc;

	/* clear the level of interval timer */
	rc = read(poller->intr->efd, &exp, sizeof(exp));
	if (rc < 0) {
		if (errno == EAGAIN) {
			return 0;
		}

		return rc;
	}

	SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg);

	return poller->fn(poller->arg);
}

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	int timerfd;

	SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
	if (timerfd < 0) {
		return -errno;
	}

	poller->intr = spdk_interrupt_register(timerfd, interrupt_timerfd_process, poller, poller->name);
	if (poller->intr == NULL) {
		close(timerfd);
		return -1;
	}

	return 0;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	int timerfd;
	uint64_t now_tick = spdk_get_ticks();
	uint64_t ticks = spdk_get_ticks_hz();
	int ret;
	struct itimerspec new_tv = {};
	struct itimerspec old_tv = {};

	assert(poller->intr != NULL);
	assert(poller->period_ticks != 0);

	timerfd = poller->intr->efd;

	assert(timerfd >= 0);

	SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
		      interrupt_mode ?
"interrupt" : "poll"); 1459 1460 if (interrupt_mode) { 1461 /* Set repeated timer expiration */ 1462 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1463 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1464 1465 /* Update next timer expiration */ 1466 if (poller->next_run_tick == 0) { 1467 poller->next_run_tick = now_tick + poller->period_ticks; 1468 } else if (poller->next_run_tick < now_tick) { 1469 poller->next_run_tick = now_tick; 1470 } 1471 1472 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1473 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1474 1475 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1476 if (ret < 0) { 1477 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1478 assert(false); 1479 } 1480 } else { 1481 /* Disarm the timer */ 1482 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1483 if (ret < 0) { 1484 /* timerfd_settime's failure indicates that the timerfd is in error */ 1485 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1486 assert(false); 1487 } 1488 1489 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1490 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1491 */ 1492 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1493 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1494 poller_remove_timer(poller->thread, poller); 1495 poller_insert_timer(poller->thread, poller, now_tick); 1496 } 1497 } 1498 1499 static void 1500 poller_interrupt_fini(struct spdk_poller *poller) 1501 { 1502 int fd; 1503 1504 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1505 assert(poller->intr != NULL); 1506 fd = poller->intr->efd; 1507 spdk_interrupt_unregister(&poller->intr); 1508 close(fd); 1509 } 1510 1511 static int 1512 busy_poller_interrupt_init(struct spdk_poller *poller) 1513 { 1514 int busy_efd; 1515 1516 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1517 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1518 if (busy_efd < 0) { 1519 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1520 return -errno; 1521 } 1522 1523 poller->intr = spdk_interrupt_register(busy_efd, poller->fn, poller->arg, poller->name); 1524 if (poller->intr == NULL) { 1525 close(busy_efd); 1526 return -1; 1527 } 1528 1529 return 0; 1530 } 1531 1532 static void 1533 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1534 { 1535 int busy_efd = poller->intr->efd; 1536 uint64_t notify = 1; 1537 int rc __attribute__((unused)); 1538 1539 assert(busy_efd >= 0); 1540 1541 if (interrupt_mode) { 1542 /* Write without read on eventfd will get it repeatedly triggered. */ 1543 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1544 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1545 } 1546 } else { 1547 /* Read on eventfd will clear its level triggering. 
		 */
		rc = read(busy_efd, &notify, sizeof(notify));
	}
}

#else

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

#endif

void
spdk_poller_register_interrupt(struct spdk_poller *poller,
			       spdk_poller_set_interrupt_mode_cb cb_fn,
			       void *cb_arg)
{
	assert(poller != NULL);
	assert(cb_fn != NULL);
	assert(spdk_get_thread() == poller->thread);

	if (!spdk_interrupt_mode_is_enabled()) {
		return;
	}

	/* If this poller already had an interrupt, clean the old one up. */
	if (poller->intr != NULL) {
		poller_interrupt_fini(poller);
	}

	poller->set_intr_cb_fn = cb_fn;
	poller->set_intr_cb_arg = cb_arg;

	/* Set poller into interrupt mode if thread is in interrupt. */
	if (poller->thread->in_interrupt) {
		poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
	}
}

static uint64_t
convert_us_to_ticks(uint64_t us)
{
	uint64_t quotient, remainder, ticks;

	if (us) {
		quotient = us / SPDK_SEC_TO_USEC;
		remainder = us % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		return 0;
	}
}

static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;
	poller->intr = NULL;
	if (thread->next_poller_id == 0) {
		SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n");
		thread->next_poller_id = 1;
	}
	poller->id = thread->next_poller_id++;

	poller->period_ticks = convert_us_to_ticks(period_microseconds);

	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (period_microseconds) {
			rc = period_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			poller->set_intr_cb_fn = period_poller_set_interrupt_mode;
			poller->set_intr_cb_arg = NULL;

		} else {
			/* If the poller doesn't have a period, create interruptfd that's always
			 * busy automatically when running in interrupt mode.
			 */
			rc = busy_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			poller->set_intr_cb_fn = busy_poller_set_interrupt_mode;
			poller->set_intr_cb_arg = NULL;
		}

		/* Set poller into interrupt mode if thread is in interrupt. */
		if (poller->thread->in_interrupt) {
			poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
		}
	}

	thread_insert_poller(thread, poller);

	return poller;
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}

static void
wrong_thread(const char *func, const char *name, struct spdk_thread *thread,
	     struct spdk_thread *curthread)
{
	if (thread == NULL) {
		SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name);
		abort();
	}
	SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be "
		    "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id,
		    thread->name, thread->id);
	assert(false);
}

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		/* Release the interrupt resource for period or busy poller */
		if (poller->intr != NULL) {
			poller_interrupt_fini(poller);
		}

		/* If there is not already a pending poller removal, generate
		 * a message to go process removals. */
		if (!thread->poller_unregistered) {
			thread->poller_unregistered = true;
			spdk_thread_send_msg(thread, _thread_remove_pollers, thread);
		}
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
1782 */ 1783 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1784 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1785 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1786 poller->period_ticks = 0; 1787 } 1788 1789 /* Simply set the state to unregistered. The poller will get cleaned up 1790 * in a subsequent call to spdk_thread_poll(). 1791 */ 1792 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1793 } 1794 1795 void 1796 spdk_poller_pause(struct spdk_poller *poller) 1797 { 1798 struct spdk_thread *thread; 1799 1800 thread = spdk_get_thread(); 1801 if (!thread) { 1802 assert(false); 1803 return; 1804 } 1805 1806 if (poller->thread != thread) { 1807 wrong_thread(__func__, poller->name, poller->thread, thread); 1808 return; 1809 } 1810 1811 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1812 * spdk_thread_poll() move it. It allows a poller to be paused from 1813 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1814 * iteration, or from within itself without breaking the logic to always 1815 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1816 */ 1817 switch (poller->state) { 1818 case SPDK_POLLER_STATE_PAUSED: 1819 case SPDK_POLLER_STATE_PAUSING: 1820 break; 1821 case SPDK_POLLER_STATE_RUNNING: 1822 case SPDK_POLLER_STATE_WAITING: 1823 poller->state = SPDK_POLLER_STATE_PAUSING; 1824 break; 1825 default: 1826 assert(false); 1827 break; 1828 } 1829 } 1830 1831 void 1832 spdk_poller_resume(struct spdk_poller *poller) 1833 { 1834 struct spdk_thread *thread; 1835 1836 thread = spdk_get_thread(); 1837 if (!thread) { 1838 assert(false); 1839 return; 1840 } 1841 1842 if (poller->thread != thread) { 1843 wrong_thread(__func__, poller->name, poller->thread, thread); 1844 return; 1845 } 1846 1847 /* If a poller is paused it has to be removed from the paused pollers 1848 * list and put on the active list or timer tree depending on its 1849 * period_ticks. If a poller is still in the process of being paused, 1850 * we just need to flip its state back to waiting, as it's already on 1851 * the appropriate list or tree. 
1852 */ 1853 switch (poller->state) { 1854 case SPDK_POLLER_STATE_PAUSED: 1855 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1856 thread_insert_poller(thread, poller); 1857 /* fallthrough */ 1858 case SPDK_POLLER_STATE_PAUSING: 1859 poller->state = SPDK_POLLER_STATE_WAITING; 1860 break; 1861 case SPDK_POLLER_STATE_RUNNING: 1862 case SPDK_POLLER_STATE_WAITING: 1863 break; 1864 default: 1865 assert(false); 1866 break; 1867 } 1868 } 1869 1870 const char * 1871 spdk_poller_get_name(struct spdk_poller *poller) 1872 { 1873 return poller->name; 1874 } 1875 1876 uint64_t 1877 spdk_poller_get_id(struct spdk_poller *poller) 1878 { 1879 return poller->id; 1880 } 1881 1882 const char * 1883 spdk_poller_get_state_str(struct spdk_poller *poller) 1884 { 1885 switch (poller->state) { 1886 case SPDK_POLLER_STATE_WAITING: 1887 return "waiting"; 1888 case SPDK_POLLER_STATE_RUNNING: 1889 return "running"; 1890 case SPDK_POLLER_STATE_UNREGISTERED: 1891 return "unregistered"; 1892 case SPDK_POLLER_STATE_PAUSING: 1893 return "pausing"; 1894 case SPDK_POLLER_STATE_PAUSED: 1895 return "paused"; 1896 default: 1897 return NULL; 1898 } 1899 } 1900 1901 uint64_t 1902 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1903 { 1904 return poller->period_ticks; 1905 } 1906 1907 void 1908 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1909 { 1910 stats->run_count = poller->run_count; 1911 stats->busy_count = poller->busy_count; 1912 } 1913 1914 struct spdk_poller * 1915 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1916 { 1917 return TAILQ_FIRST(&thread->active_pollers); 1918 } 1919 1920 struct spdk_poller * 1921 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1922 { 1923 return TAILQ_NEXT(prev, tailq); 1924 } 1925 1926 struct spdk_poller * 1927 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1928 { 1929 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1930 } 1931 1932 struct spdk_poller * 1933 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1934 { 1935 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1936 } 1937 1938 struct spdk_poller * 1939 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1940 { 1941 return TAILQ_FIRST(&thread->paused_pollers); 1942 } 1943 1944 struct spdk_poller * 1945 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1946 { 1947 return TAILQ_NEXT(prev, tailq); 1948 } 1949 1950 struct spdk_io_channel * 1951 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1952 { 1953 return RB_MIN(io_channel_tree, &thread->io_channels); 1954 } 1955 1956 struct spdk_io_channel * 1957 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1958 { 1959 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1960 } 1961 1962 struct call_thread { 1963 struct spdk_thread *cur_thread; 1964 spdk_msg_fn fn; 1965 void *ctx; 1966 1967 struct spdk_thread *orig_thread; 1968 spdk_msg_fn cpl; 1969 }; 1970 1971 static void 1972 _on_thread(void *ctx) 1973 { 1974 struct call_thread *ct = ctx; 1975 int rc __attribute__((unused)); 1976 1977 ct->fn(ct->ctx); 1978 1979 pthread_mutex_lock(&g_devlist_mutex); 1980 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1981 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 1982 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 1983 ct->cur_thread->name); 1984 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1985 } 1986 pthread_mutex_unlock(&g_devlist_mutex); 
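	/* Hop to the next running thread to continue the iteration, or, if every running
	 * thread has been visited, send the completion callback back to the thread that
	 * started spdk_for_each_thread().
	 */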
1987 1988 if (!ct->cur_thread) { 1989 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1990 1991 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1992 free(ctx); 1993 } else { 1994 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1995 ct->cur_thread->name); 1996 1997 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1998 } 1999 assert(rc == 0); 2000 } 2001 2002 void 2003 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 2004 { 2005 struct call_thread *ct; 2006 struct spdk_thread *thread; 2007 int rc __attribute__((unused)); 2008 2009 ct = calloc(1, sizeof(*ct)); 2010 if (!ct) { 2011 SPDK_ERRLOG("Unable to perform thread iteration\n"); 2012 cpl(ctx); 2013 return; 2014 } 2015 2016 ct->fn = fn; 2017 ct->ctx = ctx; 2018 ct->cpl = cpl; 2019 2020 thread = _get_thread(); 2021 if (!thread) { 2022 SPDK_ERRLOG("No thread allocated\n"); 2023 free(ct); 2024 cpl(ctx); 2025 return; 2026 } 2027 ct->orig_thread = thread; 2028 2029 pthread_mutex_lock(&g_devlist_mutex); 2030 ct->cur_thread = TAILQ_FIRST(&g_threads); 2031 pthread_mutex_unlock(&g_devlist_mutex); 2032 2033 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 2034 ct->orig_thread->name); 2035 2036 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 2037 assert(rc == 0); 2038 } 2039 2040 static inline void 2041 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 2042 { 2043 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 2044 return; 2045 } 2046 2047 if (!poller->set_intr_cb_fn) { 2048 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 2049 assert(false); 2050 return; 2051 } 2052 2053 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 2054 } 2055 2056 void 2057 spdk_thread_set_interrupt_mode(bool enable_interrupt) 2058 { 2059 struct spdk_thread *thread = _get_thread(); 2060 struct spdk_poller *poller, *tmp; 2061 2062 assert(thread); 2063 assert(spdk_interrupt_mode_is_enabled()); 2064 2065 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 2066 thread->name, enable_interrupt ? "intr" : "poll", 2067 thread->in_interrupt ? 
"intr" : "poll"); 2068 2069 if (thread->in_interrupt == enable_interrupt) { 2070 return; 2071 } 2072 2073 /* Set pollers to expected mode */ 2074 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 2075 poller_set_interrupt_mode(poller, enable_interrupt); 2076 } 2077 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 2078 poller_set_interrupt_mode(poller, enable_interrupt); 2079 } 2080 /* All paused pollers will go to work in interrupt mode */ 2081 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 2082 poller_set_interrupt_mode(poller, enable_interrupt); 2083 } 2084 2085 thread->in_interrupt = enable_interrupt; 2086 return; 2087 } 2088 2089 static struct io_device * 2090 io_device_get(void *io_device) 2091 { 2092 struct io_device find = {}; 2093 2094 find.io_device = io_device; 2095 return RB_FIND(io_device_tree, &g_io_devices, &find); 2096 } 2097 2098 void 2099 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 2100 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 2101 const char *name) 2102 { 2103 struct io_device *dev, *tmp; 2104 struct spdk_thread *thread; 2105 2106 assert(io_device != NULL); 2107 assert(create_cb != NULL); 2108 assert(destroy_cb != NULL); 2109 2110 thread = spdk_get_thread(); 2111 if (!thread) { 2112 SPDK_ERRLOG("called from non-SPDK thread\n"); 2113 assert(false); 2114 return; 2115 } 2116 2117 dev = calloc(1, sizeof(struct io_device)); 2118 if (dev == NULL) { 2119 SPDK_ERRLOG("could not allocate io_device\n"); 2120 return; 2121 } 2122 2123 dev->io_device = io_device; 2124 if (name) { 2125 snprintf(dev->name, sizeof(dev->name), "%s", name); 2126 } else { 2127 snprintf(dev->name, sizeof(dev->name), "%p", dev); 2128 } 2129 dev->create_cb = create_cb; 2130 dev->destroy_cb = destroy_cb; 2131 dev->unregister_cb = NULL; 2132 dev->ctx_size = ctx_size; 2133 dev->for_each_count = 0; 2134 dev->unregistered = false; 2135 dev->refcnt = 0; 2136 2137 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 2138 dev->name, dev->io_device, thread->name); 2139 2140 pthread_mutex_lock(&g_devlist_mutex); 2141 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 2142 if (tmp != NULL) { 2143 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 2144 io_device, tmp->name, dev->name); 2145 free(dev); 2146 } 2147 2148 pthread_mutex_unlock(&g_devlist_mutex); 2149 } 2150 2151 static void 2152 _finish_unregister(void *arg) 2153 { 2154 struct io_device *dev = arg; 2155 struct spdk_thread *thread; 2156 2157 thread = spdk_get_thread(); 2158 assert(thread == dev->unregister_thread); 2159 2160 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2161 dev->name, dev->io_device, thread->name); 2162 2163 assert(thread->pending_unregister_count > 0); 2164 thread->pending_unregister_count--; 2165 2166 dev->unregister_cb(dev->io_device); 2167 free(dev); 2168 } 2169 2170 static void 2171 io_device_free(struct io_device *dev) 2172 { 2173 int rc __attribute__((unused)); 2174 2175 if (dev->unregister_cb == NULL) { 2176 free(dev); 2177 } else { 2178 assert(dev->unregister_thread != NULL); 2179 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2180 dev->name, dev->io_device, dev->unregister_thread->name); 2181 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2182 assert(rc == 0); 2183 } 2184 } 2185 2186 void 2187 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2188 { 2189 
struct io_device *dev; 2190 uint32_t refcnt; 2191 struct spdk_thread *thread; 2192 2193 thread = spdk_get_thread(); 2194 if (!thread) { 2195 SPDK_ERRLOG("called from non-SPDK thread\n"); 2196 assert(false); 2197 return; 2198 } 2199 2200 pthread_mutex_lock(&g_devlist_mutex); 2201 dev = io_device_get(io_device); 2202 if (!dev) { 2203 SPDK_ERRLOG("io_device %p not found\n", io_device); 2204 assert(false); 2205 pthread_mutex_unlock(&g_devlist_mutex); 2206 return; 2207 } 2208 2209 /* The for_each_count check differentiates the user attempting to unregister the 2210 * device a second time, from the internal call to this function that occurs 2211 * after the for_each_count reaches 0. 2212 */ 2213 if (dev->pending_unregister && dev->for_each_count > 0) { 2214 SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device); 2215 assert(false); 2216 pthread_mutex_unlock(&g_devlist_mutex); 2217 return; 2218 } 2219 2220 dev->unregister_cb = unregister_cb; 2221 dev->unregister_thread = thread; 2222 2223 if (dev->for_each_count > 0) { 2224 SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2225 dev->name, io_device, dev->for_each_count); 2226 dev->pending_unregister = true; 2227 pthread_mutex_unlock(&g_devlist_mutex); 2228 return; 2229 } 2230 2231 dev->unregistered = true; 2232 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2233 refcnt = dev->refcnt; 2234 pthread_mutex_unlock(&g_devlist_mutex); 2235 2236 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2237 dev->name, dev->io_device, thread->name); 2238 2239 if (unregister_cb) { 2240 thread->pending_unregister_count++; 2241 } 2242 2243 if (refcnt > 0) { 2244 /* defer deletion */ 2245 return; 2246 } 2247 2248 io_device_free(dev); 2249 } 2250 2251 const char * 2252 spdk_io_device_get_name(struct io_device *dev) 2253 { 2254 return dev->name; 2255 } 2256 2257 static struct spdk_io_channel * 2258 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2259 { 2260 struct spdk_io_channel find = {}; 2261 2262 find.dev = dev; 2263 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2264 } 2265 2266 struct spdk_io_channel * 2267 spdk_get_io_channel(void *io_device) 2268 { 2269 struct spdk_io_channel *ch; 2270 struct spdk_thread *thread; 2271 struct io_device *dev; 2272 int rc; 2273 2274 pthread_mutex_lock(&g_devlist_mutex); 2275 dev = io_device_get(io_device); 2276 if (dev == NULL) { 2277 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2278 pthread_mutex_unlock(&g_devlist_mutex); 2279 return NULL; 2280 } 2281 2282 thread = _get_thread(); 2283 if (!thread) { 2284 SPDK_ERRLOG("No thread allocated\n"); 2285 pthread_mutex_unlock(&g_devlist_mutex); 2286 return NULL; 2287 } 2288 2289 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2290 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2291 pthread_mutex_unlock(&g_devlist_mutex); 2292 return NULL; 2293 } 2294 2295 ch = thread_get_io_channel(thread, dev); 2296 if (ch != NULL) { 2297 ch->ref++; 2298 2299 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2300 ch, dev->name, dev->io_device, thread->name, ch->ref); 2301 2302 /* 2303 * An I/O channel already exists for this device on this 2304 * thread, so return it. 
2305 */ 2306 pthread_mutex_unlock(&g_devlist_mutex); 2307 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2308 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2309 return ch; 2310 } 2311 2312 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2313 if (ch == NULL) { 2314 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2315 pthread_mutex_unlock(&g_devlist_mutex); 2316 return NULL; 2317 } 2318 2319 ch->dev = dev; 2320 ch->destroy_cb = dev->destroy_cb; 2321 ch->thread = thread; 2322 ch->ref = 1; 2323 ch->destroy_ref = 0; 2324 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2325 2326 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2327 ch, dev->name, dev->io_device, thread->name, ch->ref); 2328 2329 dev->refcnt++; 2330 2331 pthread_mutex_unlock(&g_devlist_mutex); 2332 2333 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2334 if (rc != 0) { 2335 pthread_mutex_lock(&g_devlist_mutex); 2336 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2337 dev->refcnt--; 2338 free(ch); 2339 pthread_mutex_unlock(&g_devlist_mutex); 2340 return NULL; 2341 } 2342 2343 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2344 return ch; 2345 } 2346 2347 static void 2348 put_io_channel(void *arg) 2349 { 2350 struct spdk_io_channel *ch = arg; 2351 bool do_remove_dev = true; 2352 struct spdk_thread *thread; 2353 2354 thread = spdk_get_thread(); 2355 if (!thread) { 2356 SPDK_ERRLOG("called from non-SPDK thread\n"); 2357 assert(false); 2358 return; 2359 } 2360 2361 SPDK_DEBUGLOG(thread, 2362 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2363 ch, ch->dev->name, ch->dev->io_device, thread->name); 2364 2365 assert(ch->thread == thread); 2366 2367 ch->destroy_ref--; 2368 2369 if (ch->ref > 0 || ch->destroy_ref > 0) { 2370 /* 2371 * Another reference to the associated io_device was requested 2372 * after this message was sent but before it had a chance to 2373 * execute. 2374 */ 2375 return; 2376 } 2377 2378 pthread_mutex_lock(&g_devlist_mutex); 2379 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2380 pthread_mutex_unlock(&g_devlist_mutex); 2381 2382 /* Don't hold the devlist mutex while the destroy_cb is called. 
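	 * The callback is module code and may itself re-enter the io_channel /
	 * io_device API, which takes g_devlist_mutex.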
*/ 2383 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2384 2385 pthread_mutex_lock(&g_devlist_mutex); 2386 ch->dev->refcnt--; 2387 2388 if (!ch->dev->unregistered) { 2389 do_remove_dev = false; 2390 } 2391 2392 if (ch->dev->refcnt > 0) { 2393 do_remove_dev = false; 2394 } 2395 2396 pthread_mutex_unlock(&g_devlist_mutex); 2397 2398 if (do_remove_dev) { 2399 io_device_free(ch->dev); 2400 } 2401 free(ch); 2402 } 2403 2404 void 2405 spdk_put_io_channel(struct spdk_io_channel *ch) 2406 { 2407 struct spdk_thread *thread; 2408 int rc __attribute__((unused)); 2409 2410 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2411 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2412 2413 thread = spdk_get_thread(); 2414 if (!thread) { 2415 SPDK_ERRLOG("called from non-SPDK thread\n"); 2416 assert(false); 2417 return; 2418 } 2419 2420 if (ch->thread != thread) { 2421 wrong_thread(__func__, "ch", ch->thread, thread); 2422 return; 2423 } 2424 2425 SPDK_DEBUGLOG(thread, 2426 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2427 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2428 2429 ch->ref--; 2430 2431 if (ch->ref == 0) { 2432 ch->destroy_ref++; 2433 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2434 assert(rc == 0); 2435 } 2436 } 2437 2438 struct spdk_io_channel * 2439 spdk_io_channel_from_ctx(void *ctx) 2440 { 2441 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2442 } 2443 2444 struct spdk_thread * 2445 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2446 { 2447 return ch->thread; 2448 } 2449 2450 void * 2451 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2452 { 2453 return ch->dev->io_device; 2454 } 2455 2456 const char * 2457 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2458 { 2459 return spdk_io_device_get_name(ch->dev); 2460 } 2461 2462 int 2463 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2464 { 2465 return ch->ref; 2466 } 2467 2468 struct spdk_io_channel_iter { 2469 void *io_device; 2470 struct io_device *dev; 2471 spdk_channel_msg fn; 2472 int status; 2473 void *ctx; 2474 struct spdk_io_channel *ch; 2475 2476 struct spdk_thread *cur_thread; 2477 2478 struct spdk_thread *orig_thread; 2479 spdk_channel_for_each_cpl cpl; 2480 }; 2481 2482 void * 2483 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2484 { 2485 return i->io_device; 2486 } 2487 2488 struct spdk_io_channel * 2489 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2490 { 2491 return i->ch; 2492 } 2493 2494 void * 2495 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2496 { 2497 return i->ctx; 2498 } 2499 2500 static void 2501 _call_completion(void *ctx) 2502 { 2503 struct spdk_io_channel_iter *i = ctx; 2504 2505 if (i->cpl != NULL) { 2506 i->cpl(i, i->status); 2507 } 2508 free(i); 2509 } 2510 2511 static void 2512 _call_channel(void *ctx) 2513 { 2514 struct spdk_io_channel_iter *i = ctx; 2515 struct spdk_io_channel *ch; 2516 2517 /* 2518 * It is possible that the channel was deleted before this 2519 * message had a chance to execute. If so, skip calling 2520 * the fn() on this thread. 
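	 * spdk_for_each_channel_continue() is still called in that case so the
	 * iteration advances to the next thread.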
2521 */ 2522 pthread_mutex_lock(&g_devlist_mutex); 2523 ch = thread_get_io_channel(i->cur_thread, i->dev); 2524 pthread_mutex_unlock(&g_devlist_mutex); 2525 2526 if (ch) { 2527 i->fn(i); 2528 } else { 2529 spdk_for_each_channel_continue(i, 0); 2530 } 2531 } 2532 2533 void 2534 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2535 spdk_channel_for_each_cpl cpl) 2536 { 2537 struct spdk_thread *thread; 2538 struct spdk_io_channel *ch; 2539 struct spdk_io_channel_iter *i; 2540 int rc __attribute__((unused)); 2541 2542 i = calloc(1, sizeof(*i)); 2543 if (!i) { 2544 SPDK_ERRLOG("Unable to allocate iterator\n"); 2545 assert(false); 2546 return; 2547 } 2548 2549 i->io_device = io_device; 2550 i->fn = fn; 2551 i->ctx = ctx; 2552 i->cpl = cpl; 2553 i->orig_thread = _get_thread(); 2554 2555 pthread_mutex_lock(&g_devlist_mutex); 2556 i->dev = io_device_get(io_device); 2557 if (i->dev == NULL) { 2558 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2559 assert(false); 2560 i->status = -ENODEV; 2561 goto end; 2562 } 2563 2564 /* Do not allow new for_each operations if we are already waiting to unregister 2565 * the device for other for_each operations to complete. 2566 */ 2567 if (i->dev->pending_unregister) { 2568 SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device); 2569 i->status = -ENODEV; 2570 goto end; 2571 } 2572 2573 TAILQ_FOREACH(thread, &g_threads, tailq) { 2574 ch = thread_get_io_channel(thread, i->dev); 2575 if (ch != NULL) { 2576 ch->dev->for_each_count++; 2577 i->cur_thread = thread; 2578 i->ch = ch; 2579 pthread_mutex_unlock(&g_devlist_mutex); 2580 rc = spdk_thread_send_msg(thread, _call_channel, i); 2581 assert(rc == 0); 2582 return; 2583 } 2584 } 2585 2586 end: 2587 pthread_mutex_unlock(&g_devlist_mutex); 2588 2589 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2590 assert(rc == 0); 2591 } 2592 2593 static void 2594 __pending_unregister(void *arg) 2595 { 2596 struct io_device *dev = arg; 2597 2598 assert(dev->pending_unregister); 2599 assert(dev->for_each_count == 0); 2600 spdk_io_device_unregister(dev->io_device, dev->unregister_cb); 2601 } 2602 2603 void 2604 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2605 { 2606 struct spdk_thread *thread; 2607 struct spdk_io_channel *ch; 2608 struct io_device *dev; 2609 int rc __attribute__((unused)); 2610 2611 assert(i->cur_thread == spdk_get_thread()); 2612 2613 i->status = status; 2614 2615 pthread_mutex_lock(&g_devlist_mutex); 2616 dev = i->dev; 2617 if (status) { 2618 goto end; 2619 } 2620 2621 thread = TAILQ_NEXT(i->cur_thread, tailq); 2622 while (thread) { 2623 ch = thread_get_io_channel(thread, dev); 2624 if (ch != NULL) { 2625 i->cur_thread = thread; 2626 i->ch = ch; 2627 pthread_mutex_unlock(&g_devlist_mutex); 2628 rc = spdk_thread_send_msg(thread, _call_channel, i); 2629 assert(rc == 0); 2630 return; 2631 } 2632 thread = TAILQ_NEXT(thread, tailq); 2633 } 2634 2635 end: 2636 dev->for_each_count--; 2637 i->ch = NULL; 2638 pthread_mutex_unlock(&g_devlist_mutex); 2639 2640 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2641 assert(rc == 0); 2642 2643 pthread_mutex_lock(&g_devlist_mutex); 2644 if (dev->pending_unregister && dev->for_each_count == 0) { 2645 rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev); 2646 assert(rc == 0); 2647 } 2648 pthread_mutex_unlock(&g_devlist_mutex); 2649 } 2650 2651 static void 2652 thread_interrupt_destroy(struct spdk_thread *thread) 2653 { 2654 struct spdk_fd_group *fgrp = 
thread->fgrp; 2655 2656 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2657 2658 if (thread->msg_fd < 0) { 2659 return; 2660 } 2661 2662 spdk_fd_group_remove(fgrp, thread->msg_fd); 2663 close(thread->msg_fd); 2664 thread->msg_fd = -1; 2665 2666 spdk_fd_group_destroy(fgrp); 2667 thread->fgrp = NULL; 2668 } 2669 2670 #ifdef __linux__ 2671 static int 2672 thread_interrupt_msg_process(void *arg) 2673 { 2674 struct spdk_thread *thread = arg; 2675 struct spdk_thread *orig_thread; 2676 uint32_t msg_count; 2677 spdk_msg_fn critical_msg; 2678 int rc = 0; 2679 uint64_t notify = 1; 2680 2681 assert(spdk_interrupt_mode_is_enabled()); 2682 2683 orig_thread = spdk_get_thread(); 2684 spdk_set_thread(thread); 2685 2686 /* There may be a race between this acknowledgement and another producer's msg_notify, 2687 * so drain the eventfd first and only then process the messages. A notify that arrives 2688 * after this read stays pending and wakes us up again, so no msg notification is missed. 2689 */ 2690 rc = read(thread->msg_fd, &notify, sizeof(notify)); 2691 if (rc < 0 && errno != EAGAIN) { 2692 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2693 } 2694 2695 critical_msg = thread->critical_msg; 2696 if (spdk_unlikely(critical_msg != NULL)) { 2697 critical_msg(NULL); 2698 thread->critical_msg = NULL; 2699 rc = 1; 2700 } 2701 2702 msg_count = msg_queue_run_batch(thread, 0); 2703 if (msg_count) { 2704 rc = 1; 2705 } 2706 2707 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2708 if (spdk_unlikely(!thread->in_interrupt)) { 2709 /* The thread transitioned to poll mode in a msg during the above processing. 2710 * Clear msg_fd since thread messages will be polled directly in poll mode. 2711 */ 2712 rc = read(thread->msg_fd, &notify, sizeof(notify)); 2713 if (rc < 0 && errno != EAGAIN) { 2714 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 2715 } 2716 } 2717 2718 spdk_set_thread(orig_thread); 2719 return rc; 2720 } 2721 2722 static int 2723 thread_interrupt_create(struct spdk_thread *thread) 2724 { 2725 int rc; 2726 2727 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2728 2729 rc = spdk_fd_group_create(&thread->fgrp); 2730 if (rc) { 2731 return rc; 2732 } 2733 2734 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2735 if (thread->msg_fd < 0) { 2736 rc = -errno; 2737 spdk_fd_group_destroy(thread->fgrp); 2738 thread->fgrp = NULL; 2739 2740 return rc; 2741 } 2742 2743 return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd, 2744 thread_interrupt_msg_process, thread); 2745 } 2746 #else 2747 static int 2748 thread_interrupt_create(struct spdk_thread *thread) 2749 { 2750 return -ENOTSUP; 2751 } 2752 #endif 2753 2754 static int 2755 _interrupt_wrapper(void *ctx) 2756 { 2757 struct spdk_interrupt *intr = ctx; 2758 struct spdk_thread *orig_thread, *thread; 2759 int rc; 2760 2761 orig_thread = spdk_get_thread(); 2762 thread = intr->thread; 2763 2764 spdk_set_thread(thread); 2765 2766 SPDK_DTRACE_PROBE4(interrupt_fd_process, intr->name, intr->efd, 2767 intr->fn, intr->arg); 2768 2769 rc = intr->fn(intr->arg); 2770 2771 SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH); 2772 2773 spdk_set_thread(orig_thread); 2774 2775 return rc; 2776 } 2777 2778 struct spdk_interrupt * 2779 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2780 void *arg, const char *name) 2781 { 2782 struct spdk_thread *thread; 2783 struct spdk_interrupt *intr; 2784 int ret; 2785 2786 thread = spdk_get_thread(); 2787 if (!thread) { 2788 assert(false); 2789 return NULL; 2790 
} 2791 2792 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2793 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2794 return NULL; 2795 } 2796 2797 intr = calloc(1, sizeof(*intr)); 2798 if (intr == NULL) { 2799 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2800 return NULL; 2801 } 2802 2803 if (name) { 2804 snprintf(intr->name, sizeof(intr->name), "%s", name); 2805 } else { 2806 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2807 } 2808 2809 intr->efd = efd; 2810 intr->thread = thread; 2811 intr->fn = fn; 2812 intr->arg = arg; 2813 2814 ret = spdk_fd_group_add(thread->fgrp, efd, _interrupt_wrapper, intr, name); 2815 2816 if (ret != 0) { 2817 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2818 thread->name, efd, spdk_strerror(-ret)); 2819 free(intr); 2820 return NULL; 2821 } 2822 2823 return intr; 2824 } 2825 2826 void 2827 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2828 { 2829 struct spdk_thread *thread; 2830 struct spdk_interrupt *intr; 2831 2832 intr = *pintr; 2833 if (intr == NULL) { 2834 return; 2835 } 2836 2837 *pintr = NULL; 2838 2839 thread = spdk_get_thread(); 2840 if (!thread) { 2841 assert(false); 2842 return; 2843 } 2844 2845 if (intr->thread != thread) { 2846 wrong_thread(__func__, intr->name, intr->thread, thread); 2847 return; 2848 } 2849 2850 spdk_fd_group_remove(thread->fgrp, intr->efd); 2851 free(intr); 2852 } 2853 2854 int 2855 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2856 enum spdk_interrupt_event_types event_types) 2857 { 2858 struct spdk_thread *thread; 2859 2860 thread = spdk_get_thread(); 2861 if (!thread) { 2862 assert(false); 2863 return -EINVAL; 2864 } 2865 2866 if (intr->thread != thread) { 2867 wrong_thread(__func__, intr->name, intr->thread, thread); 2868 return -EINVAL; 2869 } 2870 2871 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2872 } 2873 2874 int 2875 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2876 { 2877 return spdk_fd_group_get_fd(thread->fgrp); 2878 } 2879 2880 struct spdk_fd_group * 2881 spdk_thread_get_interrupt_fd_group(struct spdk_thread *thread) 2882 { 2883 return thread->fgrp; 2884 } 2885 2886 static bool g_interrupt_mode = false; 2887 2888 int 2889 spdk_interrupt_mode_enable(void) 2890 { 2891 /* It must be called once prior to initializing the threading library. 2892 * g_spdk_msg_mempool will be valid if thread library is initialized. 
2893 */ 2894 if (g_spdk_msg_mempool) { 2895 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 2896 return -1; 2897 } 2898 2899 #ifdef __linux__ 2900 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2901 g_interrupt_mode = true; 2902 return 0; 2903 #else 2904 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2905 g_interrupt_mode = false; 2906 return -ENOTSUP; 2907 #endif 2908 } 2909 2910 bool 2911 spdk_interrupt_mode_is_enabled(void) 2912 { 2913 return g_interrupt_mode; 2914 } 2915 2916 void 2917 spdk_spin_init(struct spdk_spinlock *sspin) 2918 { 2919 int rc; 2920 2921 memset(sspin, 0, sizeof(*sspin)); 2922 rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE); 2923 SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD); 2924 } 2925 2926 void 2927 spdk_spin_destroy(struct spdk_spinlock *sspin) 2928 { 2929 int rc; 2930 2931 SPIN_ASSERT_RETURN_VOID(sspin->thread == NULL, SPIN_ERR_LOCK_HELD); 2932 2933 rc = pthread_spin_destroy(&sspin->spinlock); 2934 SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD); 2935 } 2936 2937 void 2938 spdk_spin_lock(struct spdk_spinlock *sspin) 2939 { 2940 struct spdk_thread *thread = spdk_get_thread(); 2941 int rc; 2942 2943 SPIN_ASSERT_RETURN_VOID(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD); 2944 SPIN_ASSERT_RETURN_VOID(thread != sspin->thread, SPIN_ERR_DEADLOCK); 2945 2946 rc = pthread_spin_lock(&sspin->spinlock); 2947 SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD); 2948 2949 sspin->thread = thread; 2950 sspin->thread->lock_count++; 2951 } 2952 2953 void 2954 spdk_spin_unlock(struct spdk_spinlock *sspin) 2955 { 2956 struct spdk_thread *thread = spdk_get_thread(); 2957 int rc; 2958 2959 SPIN_ASSERT_RETURN_VOID(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD); 2960 SPIN_ASSERT_RETURN_VOID(thread == sspin->thread, SPIN_ERR_WRONG_THREAD); 2961 2962 SPIN_ASSERT_RETURN_VOID(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT); 2963 thread->lock_count--; 2964 sspin->thread = NULL; 2965 2966 rc = pthread_spin_unlock(&sspin->spinlock); 2967 SPIN_ASSERT_RETURN_VOID(rc == 0, SPIN_ERR_PTHREAD); 2968 } 2969 2970 bool 2971 spdk_spin_held(struct spdk_spinlock *sspin) 2972 { 2973 struct spdk_thread *thread = spdk_get_thread(); 2974 2975 SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false); 2976 2977 return sspin->thread == thread; 2978 } 2979 2980 static int 2981 iobuf_channel_create_cb(void *io_device, void *ctx) 2982 { 2983 struct iobuf_channel *ch = ctx; 2984 2985 STAILQ_INIT(&ch->small_queue); 2986 STAILQ_INIT(&ch->large_queue); 2987 2988 return 0; 2989 } 2990 2991 static void 2992 iobuf_channel_destroy_cb(void *io_device, void *ctx) 2993 { 2994 struct iobuf_channel *ch __attribute__((unused)) = ctx; 2995 2996 assert(STAILQ_EMPTY(&ch->small_queue)); 2997 assert(STAILQ_EMPTY(&ch->large_queue)); 2998 } 2999 3000 int 3001 spdk_iobuf_initialize(void) 3002 { 3003 struct spdk_iobuf_opts *opts = &g_iobuf.opts; 3004 int rc = 0; 3005 3006 g_iobuf.small_pool = spdk_mempool_create("iobuf_small_pool", opts->small_pool_count, 3007 opts->small_bufsize, 0, SPDK_ENV_SOCKET_ID_ANY); 3008 if (!g_iobuf.small_pool) { 3009 SPDK_ERRLOG("Failed to create small iobuf pool\n"); 3010 rc = -ENOMEM; 3011 goto error; 3012 } 3013 3014 g_iobuf.large_pool = spdk_mempool_create("iobuf_large_pool", opts->large_pool_count, 3015 opts->large_bufsize, 0, SPDK_ENV_SOCKET_ID_ANY); 3016 if (!g_iobuf.large_pool) { 3017 SPDK_ERRLOG("Failed to create large iobuf pool\n"); 3018 rc = -ENOMEM; 3019 goto error; 3020 } 3021 3022 
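	/*
	 * Register the buffer pools as an io_device so that each thread can
	 * create its own iobuf channel (a per-thread buffer cache) through
	 * spdk_get_io_channel() in spdk_iobuf_channel_init().
	 */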
spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb, 3023 sizeof(struct iobuf_channel), "iobuf"); 3024 3025 return 0; 3026 error: 3027 spdk_mempool_free(g_iobuf.small_pool); 3028 return rc; 3029 } 3030 3031 static void 3032 iobuf_unregister_cb(void *io_device) 3033 { 3034 struct iobuf_module *module; 3035 3036 while (!TAILQ_EMPTY(&g_iobuf.modules)) { 3037 module = TAILQ_FIRST(&g_iobuf.modules); 3038 TAILQ_REMOVE(&g_iobuf.modules, module, tailq); 3039 free(module->name); 3040 free(module); 3041 } 3042 3043 if (spdk_mempool_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) { 3044 SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n", 3045 spdk_mempool_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count); 3046 } 3047 3048 if (spdk_mempool_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) { 3049 SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n", 3050 spdk_mempool_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count); 3051 } 3052 3053 spdk_mempool_free(g_iobuf.small_pool); 3054 spdk_mempool_free(g_iobuf.large_pool); 3055 3056 if (g_iobuf.finish_cb != NULL) { 3057 g_iobuf.finish_cb(g_iobuf.finish_arg); 3058 } 3059 } 3060 3061 void 3062 spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg) 3063 { 3064 g_iobuf.finish_cb = cb_fn; 3065 g_iobuf.finish_arg = cb_arg; 3066 3067 spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb); 3068 } 3069 3070 int 3071 spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts) 3072 { 3073 if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) { 3074 SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n", 3075 IOBUF_MIN_SMALL_POOL_SIZE); 3076 return -EINVAL; 3077 } 3078 if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) { 3079 SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n", 3080 IOBUF_MIN_LARGE_POOL_SIZE); 3081 return -EINVAL; 3082 } 3083 if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) { 3084 SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 "\n", 3085 IOBUF_MIN_SMALL_BUFSIZE); 3086 return -EINVAL; 3087 } 3088 if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) { 3089 SPDK_ERRLOG("large_bufsize must be at least %" PRIu32 "\n", 3090 IOBUF_MIN_LARGE_BUFSIZE); 3091 return -EINVAL; 3092 } 3093 3094 g_iobuf.opts = *opts; 3095 3096 return 0; 3097 } 3098 3099 void 3100 spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts) 3101 { 3102 *opts = g_iobuf.opts; 3103 } 3104 3105 int 3106 spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name, 3107 uint32_t small_cache_size, uint32_t large_cache_size) 3108 { 3109 struct spdk_io_channel *ioch; 3110 struct iobuf_channel *iobuf_ch; 3111 struct iobuf_module *module; 3112 struct spdk_iobuf_buffer *buf; 3113 uint32_t i; 3114 3115 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 3116 if (strcmp(name, module->name) == 0) { 3117 break; 3118 } 3119 } 3120 3121 if (module == NULL) { 3122 SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name); 3123 return -ENODEV; 3124 } 3125 3126 ioch = spdk_get_io_channel(&g_iobuf); 3127 if (ioch == NULL) { 3128 SPDK_ERRLOG("Couldn't get iobuf IO channel\n"); 3129 return -ENOMEM; 3130 } 3131 3132 iobuf_ch = spdk_io_channel_get_ctx(ioch); 3133 3134 ch->small.queue = &iobuf_ch->small_queue; 3135 ch->large.queue = &iobuf_ch->large_queue; 3136 ch->small.pool = g_iobuf.small_pool; 3137 ch->large.pool = g_iobuf.large_pool; 3138 ch->small.bufsize = g_iobuf.opts.small_bufsize; 3139 ch->large.bufsize = g_iobuf.opts.large_bufsize; 3140 ch->parent = ioch; 3141 ch->module = 
module; 3142 ch->small.cache_size = small_cache_size; 3143 ch->large.cache_size = large_cache_size; 3144 ch->small.cache_count = 0; 3145 ch->large.cache_count = 0; 3146 3147 STAILQ_INIT(&ch->small.cache); 3148 STAILQ_INIT(&ch->large.cache); 3149 3150 for (i = 0; i < small_cache_size; ++i) { 3151 buf = spdk_mempool_get(g_iobuf.small_pool); 3152 if (buf == NULL) { 3153 SPDK_ERRLOG("Failed to populate iobuf small buffer cache. " 3154 "You may need to increase spdk_iobuf_opts.small_pool_count\n"); 3155 goto error; 3156 } 3157 STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq); 3158 ch->small.cache_count++; 3159 } 3160 for (i = 0; i < large_cache_size; ++i) { 3161 buf = spdk_mempool_get(g_iobuf.large_pool); 3162 if (buf == NULL) { 3163 SPDK_ERRLOG("Failed to populate iobuf large buffer cache. " 3164 "You may need to increase spdk_iobuf_opts.large_pool_count\n"); 3165 goto error; 3166 } 3167 STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq); 3168 ch->large.cache_count++; 3169 } 3170 3171 return 0; 3172 error: 3173 spdk_iobuf_channel_fini(ch); 3174 3175 return -ENOMEM; 3176 } 3177 3178 void 3179 spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch) 3180 { 3181 struct spdk_iobuf_entry *entry __attribute__((unused)); 3182 struct spdk_iobuf_buffer *buf; 3183 3184 /* Make sure none of the wait queue entries are coming from this module */ 3185 STAILQ_FOREACH(entry, ch->small.queue, stailq) { 3186 assert(entry->module != ch->module); 3187 } 3188 STAILQ_FOREACH(entry, ch->large.queue, stailq) { 3189 assert(entry->module != ch->module); 3190 } 3191 3192 /* Release cached buffers back to the pool */ 3193 while (!STAILQ_EMPTY(&ch->small.cache)) { 3194 buf = STAILQ_FIRST(&ch->small.cache); 3195 STAILQ_REMOVE_HEAD(&ch->small.cache, stailq); 3196 spdk_mempool_put(ch->small.pool, buf); 3197 ch->small.cache_count--; 3198 } 3199 while (!STAILQ_EMPTY(&ch->large.cache)) { 3200 buf = STAILQ_FIRST(&ch->large.cache); 3201 STAILQ_REMOVE_HEAD(&ch->large.cache, stailq); 3202 spdk_mempool_put(ch->large.pool, buf); 3203 ch->large.cache_count--; 3204 } 3205 3206 assert(ch->small.cache_count == 0); 3207 assert(ch->large.cache_count == 0); 3208 3209 spdk_put_io_channel(ch->parent); 3210 ch->parent = NULL; 3211 } 3212 3213 int 3214 spdk_iobuf_register_module(const char *name) 3215 { 3216 struct iobuf_module *module; 3217 3218 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 3219 if (strcmp(name, module->name) == 0) { 3220 return -EEXIST; 3221 } 3222 } 3223 3224 module = calloc(1, sizeof(*module)); 3225 if (module == NULL) { 3226 return -ENOMEM; 3227 } 3228 3229 module->name = strdup(name); 3230 if (module->name == NULL) { 3231 free(module); 3232 return -ENOMEM; 3233 } 3234 3235 TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq); 3236 3237 return 0; 3238 } 3239 3240 int 3241 spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool, 3242 spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx) 3243 { 3244 struct spdk_iobuf_entry *entry, *tmp; 3245 int rc; 3246 3247 STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) { 3248 /* We only want to iterate over the entries requested by the module which owns ch */ 3249 if (entry->module != ch->module) { 3250 continue; 3251 } 3252 3253 rc = cb_fn(ch, entry, cb_ctx); 3254 if (rc != 0) { 3255 return rc; 3256 } 3257 } 3258 3259 return 0; 3260 } 3261 3262 void 3263 spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, 3264 uint64_t len) 3265 { 3266 struct spdk_iobuf_pool *pool; 3267 3268 if (len <= ch->small.bufsize) { 3269 pool = 
&ch->small; 3270 } else { 3271 assert(len <= ch->large.bufsize); 3272 pool = &ch->large; 3273 } 3274 3275 STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq); 3276 } 3277 3278 SPDK_LOG_REGISTER_COMPONENT(thread) 3279
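/*
 * Illustrative sketch (not part of the library): the typical io_device /
 * io_channel call pattern implemented above.  Names prefixed with
 * "example_" are hypothetical and only show the expected flow; the spdk_*
 * calls are the APIs defined in this file and in spdk/thread.h.
 *
 *	struct example_channel_ctx {
 *		uint64_t io_count;
 *	};
 *
 *	static int
 *	example_create_cb(void *io_device, void *ctx_buf)
 *	{
 *		struct example_channel_ctx *ctx = ctx_buf;
 *
 *		ctx->io_count = 0;
 *		return 0;
 *	}
 *
 *	static void
 *	example_destroy_cb(void *io_device, void *ctx_buf)
 *	{
 *	}
 *
 *	static int example_device;
 *
 *	static void
 *	example_use(void)
 *	{
 *		struct spdk_io_channel *ch;
 *		struct example_channel_ctx *ctx;
 *
 *		// Register once, from an SPDK thread.
 *		spdk_io_device_register(&example_device, example_create_cb,
 *					example_destroy_cb,
 *					sizeof(struct example_channel_ctx), "example");
 *
 *		// Each thread that needs a channel calls spdk_get_io_channel();
 *		// repeated calls on the same thread return the same channel
 *		// with its reference count incremented.
 *		ch = spdk_get_io_channel(&example_device);
 *		if (ch != NULL) {
 *			ctx = spdk_io_channel_get_ctx(ch);
 *			ctx->io_count++;
 *			spdk_put_io_channel(ch);
 *		}
 *
 *		// Unregister when done; the actual free is deferred until
 *		// every channel has been released.
 *		spdk_io_device_unregister(&example_device, NULL);
 *	}
 */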