/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2016 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/trace.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"
#include "spdk_internal/usdt.h"
#include "thread_internal.h"

#include "spdk_internal/trace_defs.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
#define SPDK_MAX_POLLER_NAME_LEN	256
#define SPDK_MAX_THREAD_NAME_LEN	256

static struct spdk_thread *g_app_thread;

enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,

	/* The poller is in the process of being paused. It will be paused
	 * during the next time it's supposed to be executed.
	 */
	SPDK_POLLER_STATE_PAUSING,

	/* The poller is registered but currently paused. It's on the
	 * paused_pollers list.
	 */
	SPDK_POLLER_STATE_PAUSED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;
	RB_ENTRY(spdk_poller)		node;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	uint64_t			period_ticks;
	uint64_t			next_run_tick;
	uint64_t			run_count;
	uint64_t			busy_count;
	uint64_t			id;
	spdk_poller_fn			fn;
	void				*arg;
	struct spdk_thread		*thread;
	/* Native interruptfd for period or busy poller */
	int				interruptfd;
	spdk_poller_set_interrupt_mode_cb	set_intr_cb_fn;
	void				*set_intr_cb_arg;

	char				name[SPDK_MAX_POLLER_NAME_LEN + 1];
};
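/*
 * Illustrative sketch (not part of this file): the shape of a poller
 * callback stored in spdk_poller->fn. A poller returns a positive value
 * when it did work (busy) and 0 when it found nothing to do (idle);
 * thread_execute_poller() below uses this to update busy_count. The
 * names example_ring, example_ring_empty and example_ring_process are
 * hypothetical.
 *
 *	static int
 *	example_drain(void *arg)
 *	{
 *		struct example_ring *ring = arg;
 *
 *		if (example_ring_empty(ring)) {
 *			return SPDK_POLLER_IDLE;
 *		}
 *		example_ring_process(ring);
 *		return SPDK_POLLER_BUSY;
 *	}
 */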
enum spdk_thread_state {
	/* The thread is processing pollers and messages by spdk_thread_poll(). */
	SPDK_THREAD_STATE_RUNNING,

	/* The thread is in the process of termination. It reaps unregistering
	 * pollers and is releasing I/O channels.
	 */
	SPDK_THREAD_STATE_EXITING,

	/* The thread has exited. It is now safe to call spdk_thread_destroy(). */
	SPDK_THREAD_STATE_EXITED,
};

struct spdk_thread {
	uint64_t			tsc_last;
	struct spdk_thread_stats	stats;
	/*
	 * Contains pollers actively running on this thread. Pollers
	 * are run round-robin. The thread takes one poller from the head
	 * of the ring, executes it, then puts it back at the tail of
	 * the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;
	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	RB_HEAD(timed_pollers_tree, spdk_poller)	timed_pollers;
	struct spdk_poller				*first_timed_poller;
	/*
	 * Contains paused pollers. Pollers on this queue are waiting until
	 * they are resumed (in which case they're put onto the active/timer
	 * queues) or unregistered.
	 */
	TAILQ_HEAD(paused_pollers_head, spdk_poller)	paused_pollers;
	struct spdk_ring		*messages;
	int				msg_fd;
	SLIST_HEAD(, spdk_msg)		msg_cache;
	size_t				msg_cache_count;
	spdk_msg_fn			critical_msg;
	uint64_t			id;
	uint64_t			next_poller_id;
	enum spdk_thread_state		state;
	int				pending_unregister_count;

	RB_HEAD(io_channel_tree, spdk_io_channel)	io_channels;
	TAILQ_ENTRY(spdk_thread)			tailq;

	char				name[SPDK_MAX_THREAD_NAME_LEN + 1];
	struct spdk_cpuset		cpumask;
	uint64_t			exit_timeout_tsc;

	/* Indicates whether this spdk_thread currently runs in interrupt. */
	bool				in_interrupt;
	bool				poller_unregistered;
	struct spdk_fd_group		*fgrp;

	/* User context allocated at the end */
	uint8_t				ctx[0];
};

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
static size_t g_ctx_sz = 0;
/* A monotonically increasing ID is assigned to each created thread, beginning
 * at 1. Once the ID rolls over (back to 0), further thread creation is not
 * allowed and restarting the SPDK application is required.
 */
static uint64_t g_thread_id = 1;

struct io_device {
	void				*io_device;
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb	create_cb;
	spdk_io_channel_destroy_cb	destroy_cb;
	spdk_io_device_unregister_cb	unregister_cb;
	struct spdk_thread		*unregister_thread;
	uint32_t			ctx_size;
	uint32_t			for_each_count;
	RB_ENTRY(io_device)		node;

	uint32_t			refcnt;

	bool				pending_unregister;
	bool				unregistered;
};

static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices);

static int
io_device_cmp(struct io_device *dev1, struct io_device *dev2)
{
	return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device);
}

RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp);

static int
io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2)
{
	return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev);
}

RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp);

struct spdk_msg {
	spdk_msg_fn		fn;
	void			*arg;

	SLIST_ENTRY(spdk_msg)	link;
};

#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static __thread struct spdk_thread *tls_thread = NULL;

SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD)
{
	spdk_trace_register_description("THREAD_IOCH_GET",
					TRACE_THREAD_IOCH_GET,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
	spdk_trace_register_description("THREAD_IOCH_PUT",
					TRACE_THREAD_IOCH_PUT,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
}

/*
 * If this compare function returned zero when two next_run_ticks are equal,
 * the macro RB_INSERT() would return a pointer to the element with the same
 * next_run_tick.
 *
 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element
 * to remove as a parameter.
 *
 * Hence we allow RB_INSERT() to insert elements with the same keys on the right
 * side by returning 1 when two next_run_ticks are equal.
 */
static inline int
timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2)
{
	if (poller1->next_run_tick < poller2->next_run_tick) {
		return -1;
	} else {
		return 1;
	}
}

RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare);
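/*
 * Example of the convention above (hypothetical values): two pollers whose
 * next_run_tick both compute to 1000 are still distinct tree nodes, because
 * timed_poller_compare() never returns 0.
 *
 *	poller_insert_timer(thread, p1, now);	// p1->next_run_tick == 1000
 *	poller_insert_timer(thread, p2, now);	// p2->next_run_tick == 1000,
 *						// inserted to the right of p1
 *	...
 *	RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, p2);
 *						// removes exactly p2, not p1,
 *						// since removal is by pointer
 */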
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}

static int
_thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	g_ctx_sz = ctx_sz;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz,
			     sizeof(struct spdk_msg),
			     0, /* No cache. We do our own. */
			     SPDK_ENV_SOCKET_ID_ANY);

	SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n",
		      msg_mempool_sz);

	if (!g_spdk_msg_mempool) {
		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
		return -ENOMEM;
	}

	return 0;
}

static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
				     poller->name);
		}
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (spdk_interrupt_mode_is_enabled()) {
		thread_interrupt_destroy(thread);
	}

	spdk_ring_free(thread->messages);
	free(thread);
}

int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);

	if (new_thread_fn == NULL) {
		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
	} else {
		g_new_thread_fn = new_thread_fn;
	}

	return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE);
}
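/*
 * Usage sketch (the struct my_thread_ctx name is hypothetical). Passing
 * ctx_sz here makes spdk_thread_create() allocate that many extra bytes at
 * the end of each spdk_thread, which spdk_thread_get_ctx() later hands back:
 *
 *	struct my_thread_ctx { uint64_t counter; };
 *
 *	spdk_thread_lib_init(NULL, sizeof(struct my_thread_ctx));
 *	struct spdk_thread *t = spdk_thread_create("worker", NULL);
 *	struct my_thread_ctx *ctx = spdk_thread_get_ctx(t);
 *	ctx->counter = 0;
 */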
int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
			 spdk_thread_op_supported_fn thread_op_supported_fn,
			 size_t ctx_sz, size_t msg_mempool_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);
	assert(g_thread_op_supported_fn == NULL);

	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
		SPDK_ERRLOG("Both must be defined or undefined together.\n");
		return -EINVAL;
	}

	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
	} else {
		g_thread_op_fn = thread_op_fn;
		g_thread_op_supported_fn = thread_op_supported_fn;
	}

	return _thread_lib_init(ctx_sz, msg_mempool_sz);
}

void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	RB_FOREACH(dev, io_device_tree, &g_io_devices) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	g_new_thread_fn = NULL;
	g_thread_op_fn = NULL;
	g_thread_op_supported_fn = NULL;
	g_ctx_sz = 0;
	if (g_app_thread != NULL) {
		_free_thread(g_app_thread);
		g_app_thread = NULL;
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}
}

struct spdk_thread *
spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread, *null_thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc = 0, i;

	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	RB_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	RB_INIT(&thread->timed_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	/* A monotonically increasing ID is assigned to each registered poller,
	 * beginning at 1. When the ID rolls over, a warning message is logged.
	 */
	thread->next_poller_id = 1;

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	if (g_thread_id == 0) {
		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		_free_thread(thread);
		return NULL;
	}
	thread->id = g_thread_id++;
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
		      thread->id, thread->name);

	if (spdk_interrupt_mode_is_enabled()) {
		thread->in_interrupt = true;
		rc = thread_interrupt_create(thread);
		if (rc != 0) {
			_free_thread(thread);
			return NULL;
		}
	}

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
	}

	if (rc != 0) {
		_free_thread(thread);
		return NULL;
	}

	thread->state = SPDK_THREAD_STATE_RUNNING;

	/* If this is the first thread, save it as the app thread. Use an atomic
	 * compare + exchange to guard against crazy users who might try to
	 * call spdk_thread_create() simultaneously on multiple threads.
	 */
	null_thread = NULL;
	__atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false,
				    __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);

	return thread;
}
struct spdk_thread *
spdk_thread_get_app_thread(void)
{
	return g_app_thread;
}

void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}

static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s timed out, moving it to the exited state forcefully\n",
			    thread->name);
		goto exited;
	}

	if (spdk_ring_count(thread->messages) > 0) {
		SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name);
		return;
	}

	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
		SPDK_INFOLOG(thread,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

	if (thread->pending_unregister_count > 0) {
		SPDK_INFOLOG(thread,
			     "thread %s is still unregistering io_devices\n",
			     thread->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
	if (spdk_unlikely(thread->in_interrupt)) {
		g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
	}
}

int
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
		SPDK_INFOLOG(thread,
			     "thread %s is already exiting\n",
			     thread->name);
		return 0;
	}

	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
	thread->state = SPDK_THREAD_STATE_EXITING;
	return 0;
}
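/*
 * Typical lifecycle as a sketch (the polling cadence is up to the hosting
 * framework; this minimal loop is illustrative only):
 *
 *	struct spdk_thread *t = spdk_thread_create("worker", NULL);
 *
 *	spdk_set_thread(t);
 *	spdk_thread_exit(t);			// request termination
 *	while (!spdk_thread_is_exited(t)) {
 *		spdk_thread_poll(t, 0, 0);	// drives thread_exit() above
 *	}
 *	spdk_thread_destroy(t);
 */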
bool
spdk_thread_is_running(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_RUNNING;
}

bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_EXITED;
}

void
spdk_thread_destroy(struct spdk_thread *thread)
{
	assert(thread != NULL);
	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);

	assert(thread->state == SPDK_THREAD_STATE_EXITED);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	/* To be safe, do not free the app thread until spdk_thread_lib_fini(). */
	if (thread != g_app_thread) {
		_free_thread(thread);
	}
}

void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
	if (g_ctx_sz > 0) {
		return thread->ctx;
	}

	return NULL;
}

struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}

int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;

	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
		assert(false);
		return -ENOTSUP;
	}

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Called from non-SPDK thread\n");
		assert(false);
		return -EINVAL;
	}

	spdk_cpuset_copy(&thread->cpumask, cpumask);

	/* Invoke framework's reschedule operation. If this function is called multiple times
	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
	 * reschedule operation.
	 */
	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

	return 0;
}

struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}

static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];
	uint64_t notify = 1;
	int rc;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from messages, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (spdk_unlikely(thread->in_interrupt) &&
	    spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);

		SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg);

		msg->fn(msg->arg);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the message at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}
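/*
 * The messages drained above originate from spdk_thread_send_msg(). A
 * minimal producer/consumer sketch (example_fn and the target variable are
 * hypothetical):
 *
 *	static void
 *	example_fn(void *ctx)
 *	{
 *		SPDK_NOTICELOG("ran on %s\n",
 *			       spdk_thread_get_name(spdk_get_thread()));
 *	}
 *
 *	// From any SPDK thread:
 *	spdk_thread_send_msg(target, example_fn, NULL);
 *	// On the target thread, the next spdk_thread_poll() dequeues and runs it.
 */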
static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *tmp __attribute__((unused));

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers tree by next scheduled run time
	 * as its key.
	 */
	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp == NULL);

	/* Update the cache only if it is empty or the inserted poller is earlier than it.
	 * RB_MIN() is not necessary here because all pollers that have exactly the same
	 * next_run_tick as the existing poller are inserted on the right side.
	 */
	if (thread->first_timed_poller == NULL ||
	    poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
		thread->first_timed_poller = poller;
	}
}

static inline void
poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
{
	struct spdk_poller *tmp __attribute__((unused));

	tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp != NULL);

	/* This function is not used in any case that is performance critical.
	 * Update the cache simply by RB_MIN() if it needs to be changed.
	 */
	if (thread->first_timed_poller == poller) {
		thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
	}
}

static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store end time to use it as start time of the next spdk_thread_poll(). */
	thread->tsc_last = end;
}
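/*
 * The busy/idle split maintained above can be turned into a load estimate.
 * Sketch (assumes the stats are snapshotted twice on the thread itself):
 *
 *	struct spdk_thread_stats s0, s1;
 *
 *	spdk_thread_get_stats(&s0);
 *	// ... let the thread run for a while ...
 *	spdk_thread_get_stats(&s1);
 *
 *	uint64_t busy = s1.busy_tsc - s0.busy_tsc;
 *	uint64_t idle = s1.idle_tsc - s0.idle_tsc;
 *	double load = (double)busy / (busy + idle);	// fraction of TSC spent busy
 */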
static inline int
thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_WAITING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}

static inline int
thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
			    uint64_t now)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
	/* fallthrough */
	case SPDK_POLLER_STATE_WAITING:
		poller_insert_timer(thread, poller, now);
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}
static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	thread->tsc_last = now;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		poller_rc = thread_execute_poller(thread, poller);
		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	poller = thread->first_timed_poller;
	while (poller != NULL) {
		int timer_rc = 0;

		if (now < poller->next_run_tick) {
			break;
		}

		tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);

		/* Update the cache to the next timed poller in the tree
		 * only if the current poller is still the closest, otherwise,
		 * do nothing because the cache has been already updated.
		 */
		if (thread->first_timed_poller == poller) {
			thread->first_timed_poller = tmp;
		}

		timer_rc = thread_execute_timed_poller(thread, poller, now);
		if (timer_rc > rc) {
			rc = timer_rc;
		}

		poller = tmp;
	}

	return rc;
}

int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	struct spdk_thread *orig_thread;
	int rc;
	uint64_t notify = 1;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	if (spdk_likely(!thread->in_interrupt)) {
		rc = thread_poll(thread, max_msgs, now);
		if (spdk_unlikely(thread->in_interrupt)) {
			/* The thread transitioned to interrupt mode during the above poll.
			 * Poll it one more time in case a msg arrived during the
			 * transition without notification.
			 */
			rc = thread_poll(thread, max_msgs, now);
		}
	} else {
		/* Non-block wait on thread's fd_group */
		rc = spdk_fd_group_wait(thread->fgrp, 0);
		if (spdk_unlikely(!thread->in_interrupt)) {
			/* The thread transitioned to poll mode in a msg during the above
			 * processing. Drain msg_fd, since thread messages will be polled
			 * directly in poll mode.
			 */
			rc = read(thread->msg_fd, &notify, sizeof(notify));
			if (rc < 0 && errno != EAGAIN) {
				SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
			}
		}

		/* Reap unregistered pollers out of poller execution in intr mode */
		if (spdk_unlikely(thread->poller_unregistered)) {
			struct spdk_poller *poller, *tmp;

			TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
						   active_pollers_head, tailq, tmp) {
				if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
					TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
					free(poller);
				}
			}

			RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
				if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
					poller_remove_timer(thread, poller);
					free(poller);
				}
			}

			thread->poller_unregistered = false;
		}
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
		thread_exit(thread, now);
	}

	thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}
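/*
 * A framework driving this function can sleep between polls when the thread
 * is idle. Sketch (clock handling simplified; not the actual SPDK app
 * framework):
 *
 *	while (spdk_thread_is_running(t)) {
 *		spdk_thread_poll(t, 0, 0);
 *		if (spdk_thread_is_idle(t)) {
 *			uint64_t next = spdk_thread_next_poller_expiration(t);
 *			// ... sleep until 'next' ticks, or until a message
 *			// notification arrives ...
 *		}
 *	}
 */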
uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = thread->first_timed_poller;
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}

int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}

static bool
thread_has_unpaused_pollers(struct spdk_thread *thread)
{
	if (TAILQ_EMPTY(&thread->active_pollers) &&
	    RB_EMPTY(&thread->timed_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
	if (!thread_has_unpaused_pollers(thread) &&
	    TAILQ_EMPTY(&thread->paused_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
	if (spdk_ring_count(thread->messages) ||
	    thread_has_unpaused_pollers(thread) ||
	    thread->critical_msg != NULL) {
		return false;
	}

	return true;
}

uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count. We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}

struct spdk_thread *
spdk_get_thread(void)
{
	return _get_thread();
}
const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

uint64_t
spdk_thread_get_id(const struct spdk_thread *thread)
{
	return thread->id;
}

struct spdk_thread *
spdk_thread_get_by_id(uint64_t id)
{
	struct spdk_thread *thread;

	if (id == 0 || id >= g_thread_id) {
		SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
		return NULL;
	}
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		if (thread->id == id) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);
	return thread;
}

int
spdk_thread_get_stats(struct spdk_thread_stats *stats)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		return -EINVAL;
	}

	if (stats == NULL) {
		return -EINVAL;
	}

	*stats = thread->stats;

	return 0;
}

uint64_t
spdk_thread_get_last_tsc(struct spdk_thread *thread)
{
	if (thread == NULL) {
		thread = _get_thread();
	}

	return thread->tsc_last;
}

static inline int
thread_send_msg_notification(const struct spdk_thread *target_thread)
{
	uint64_t notify = 1;
	int rc;

	/* Not necessary to do notification if interrupt facility is not enabled */
	if (spdk_likely(!spdk_interrupt_mode_is_enabled())) {
		return 0;
	}

	/* Since each spdk_thread can switch between poll and interrupt mode
	 * dynamically, after sending a thread msg it is necessary to check whether
	 * the target thread runs in interrupt mode and, if so, do the event
	 * notification.
	 */
	if (spdk_unlikely(target_thread->in_interrupt)) {
		rc = write(target_thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
			return -EIO;
		}
	}

	return 0;
}
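/*
 * A common pattern built on spdk_thread_send_msg() below is a round trip:
 * do work on a target thread, then message back to the caller. Sketch
 * (struct example_ctx and its fields are hypothetical):
 *
 *	struct example_ctx { struct spdk_thread *orig; };
 *
 *	static void
 *	example_done(void *arg)
 *	{
 *		free(arg);			// back on the original thread
 *	}
 *
 *	static void
 *	example_work(void *arg)
 *	{
 *		struct example_ctx *ctx = arg;
 *		// ... do the work on the target thread ...
 *		spdk_thread_send_msg(ctx->orig, example_done, ctx);
 *	}
 */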
int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
	spdk_msg_fn expected = NULL;

	if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
					 __ATOMIC_SEQ_CST)) {
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

#ifdef __linux__
static int
interrupt_timerfd_process(void *arg)
{
	struct spdk_poller *poller = arg;
	uint64_t exp;
	int rc;

	/* clear the level of interval timer */
	rc = read(poller->interruptfd, &exp, sizeof(exp));
	if (rc < 0) {
		if (errno == EAGAIN) {
			return 0;
		}

		return -errno;
	}

	SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg);

	return poller->fn(poller->arg);
}

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	struct spdk_fd_group *fgrp = poller->thread->fgrp;
	int timerfd;
	int rc;

	SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
	if (timerfd < 0) {
		return -errno;
	}

	rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller);
	if (rc < 0) {
		close(timerfd);
		return rc;
	}

	poller->interruptfd = timerfd;
	return 0;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	int timerfd = poller->interruptfd;
	uint64_t now_tick = spdk_get_ticks();
	uint64_t ticks = spdk_get_ticks_hz();
	int ret;
	struct itimerspec new_tv = {};
	struct itimerspec old_tv = {};

	assert(poller->period_ticks != 0);
	assert(timerfd >= 0);

	SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
		      interrupt_mode ? "interrupt" : "poll");

	if (interrupt_mode) {
		/* Set repeated timer expiration */
		new_tv.it_interval.tv_sec = poller->period_ticks / ticks;
		new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks;

		/* Update next timer expiration */
		if (poller->next_run_tick == 0) {
			poller->next_run_tick = now_tick + poller->period_ticks;
		} else if (poller->next_run_tick < now_tick) {
			poller->next_run_tick = now_tick;
		}

		new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks;
		new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks;

		ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno);
			assert(false);
		}
	} else {
		/* Disarm the timer */
		ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv);
		if (ret < 0) {
			/* timerfd_settime's failure indicates that the timerfd is in error */
			SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno);
			assert(false);
		}

		/* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be
		 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC
		 */
		now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \
			   (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC;
		poller_remove_timer(poller->thread, poller);
		poller_insert_timer(poller->thread, poller, now_tick);
	}
}
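/*
 * Worked example of the arming conversion above (illustrative numbers):
 * with spdk_get_ticks_hz() == 2,000,000,000 (a 2 GHz TSC) and a poller
 * period of 150 us, period_ticks == 300,000. Then:
 *
 *	it_interval.tv_sec  = 300000 / 2000000000 = 0
 *	it_interval.tv_nsec = 300000 % 2000000000 * SPDK_SEC_TO_NSEC / 2000000000
 *	                    = 300000 * 1000000000 / 2000000000 = 150000 ns
 *
 * i.e. the timerfd fires every 150 us, matching the poller period.
 */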
"interrupt" : "poll"); 1337 1338 if (interrupt_mode) { 1339 /* Set repeated timer expiration */ 1340 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1341 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1342 1343 /* Update next timer expiration */ 1344 if (poller->next_run_tick == 0) { 1345 poller->next_run_tick = now_tick + poller->period_ticks; 1346 } else if (poller->next_run_tick < now_tick) { 1347 poller->next_run_tick = now_tick; 1348 } 1349 1350 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1351 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1352 1353 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1354 if (ret < 0) { 1355 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1356 assert(false); 1357 } 1358 } else { 1359 /* Disarm the timer */ 1360 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1361 if (ret < 0) { 1362 /* timerfd_settime's failure indicates that the timerfd is in error */ 1363 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1364 assert(false); 1365 } 1366 1367 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1368 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1369 */ 1370 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1371 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1372 poller_remove_timer(poller->thread, poller); 1373 poller_insert_timer(poller->thread, poller, now_tick); 1374 } 1375 } 1376 1377 static void 1378 poller_interrupt_fini(struct spdk_poller *poller) 1379 { 1380 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1381 assert(poller->interruptfd >= 0); 1382 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1383 close(poller->interruptfd); 1384 poller->interruptfd = -1; 1385 } 1386 1387 static int 1388 busy_poller_interrupt_init(struct spdk_poller *poller) 1389 { 1390 int busy_efd; 1391 int rc; 1392 1393 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1394 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1395 if (busy_efd < 0) { 1396 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1397 return -errno; 1398 } 1399 1400 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, 1401 poller->fn, poller->arg, poller->name); 1402 if (rc < 0) { 1403 close(busy_efd); 1404 return rc; 1405 } 1406 1407 poller->interruptfd = busy_efd; 1408 return 0; 1409 } 1410 1411 static void 1412 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1413 { 1414 int busy_efd = poller->interruptfd; 1415 uint64_t notify = 1; 1416 int rc __attribute__((unused)); 1417 1418 assert(busy_efd >= 0); 1419 1420 if (interrupt_mode) { 1421 /* Write without read on eventfd will get it repeatedly triggered. */ 1422 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1423 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1424 } 1425 } else { 1426 /* Read on eventfd will clear its level triggering. 
void
spdk_poller_register_interrupt(struct spdk_poller *poller,
			       spdk_poller_set_interrupt_mode_cb cb_fn,
			       void *cb_arg)
{
	assert(poller != NULL);
	assert(cb_fn != NULL);
	assert(spdk_get_thread() == poller->thread);

	if (!spdk_interrupt_mode_is_enabled()) {
		return;
	}

	/* When a poller is created we don't know if the user is ever going to
	 * enable interrupts on it by calling this function, so the poller
	 * registration function has to immediately create an interruptfd.
	 * When this function does get called by the user, we have to then destroy
	 * that interruptfd.
	 */
	if (poller->set_intr_cb_fn && poller->interruptfd >= 0) {
		poller_interrupt_fini(poller);
	}

	poller->set_intr_cb_fn = cb_fn;
	poller->set_intr_cb_arg = cb_arg;

	/* Set poller into interrupt mode if thread is in interrupt. */
	if (poller->thread->in_interrupt) {
		poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
	}
}

static uint64_t
convert_us_to_ticks(uint64_t us)
{
	uint64_t quotient, remainder, ticks;

	if (us) {
		quotient = us / SPDK_SEC_TO_USEC;
		remainder = us % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		return 0;
	}
}
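/*
 * Worked example (illustrative numbers): with spdk_get_ticks_hz() ==
 * 3,000,000,000 and us == 1500, quotient == 0 and remainder == 1500, so the
 * result is 3000000000 * 0 + (3000000000 * 1500) / 1000000 == 4,500,000
 * ticks. Splitting into quotient and remainder keeps the intermediate
 * multiplication from overflowing 64 bits for large 'us' values.
 */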
static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;
	poller->interruptfd = -1;
	if (thread->next_poller_id == 0) {
		SPDK_WARNLOG("Poller ID rolled over. Poller IDs may be duplicated.\n");
		thread->next_poller_id = 1;
	}
	poller->id = thread->next_poller_id++;

	poller->period_ticks = convert_us_to_ticks(period_microseconds);

	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (period_microseconds) {
			rc = period_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL);
		} else {
			/* If the poller doesn't have a period, create an interruptfd that's always
			 * busy automatically when running in interrupt mode.
			 */
			rc = busy_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL);
		}
	}

	thread_insert_poller(thread, poller);

	return poller;
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}

static void
wrong_thread(const char *func, const char *name, struct spdk_thread *thread,
	     struct spdk_thread *curthread)
{
	if (thread == NULL) {
		SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name);
		abort();
	}
	SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be "
		    "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id,
		    thread->name, thread->id);
	assert(false);
}

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		/* Release the interrupt resource for period or busy poller */
		if (poller->interruptfd >= 0) {
			poller_interrupt_fini(poller);
		}

		/* Mark that a poller was unregistered. Unregistered pollers will
		 * then get reaped by spdk_thread_poll() also in intr mode.
		 */
		thread->poller_unregistered = true;
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}
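/*
 * Registration/unregistration sketch (example_poll and ctx are
 * hypothetical; the SPDK_POLLER_REGISTER() convenience macro from
 * spdk/thread.h registers under the callback's function name):
 *
 *	struct spdk_poller *p;
 *
 *	p = spdk_poller_register_named(example_poll, ctx, 100, "example");
 *	// ... poller runs every 100 us ...
 *	spdk_poller_unregister(&p);		// also sets p to NULL
 */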
1665 */ 1666 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1667 } 1668 1669 void 1670 spdk_poller_pause(struct spdk_poller *poller) 1671 { 1672 struct spdk_thread *thread; 1673 1674 thread = spdk_get_thread(); 1675 if (!thread) { 1676 assert(false); 1677 return; 1678 } 1679 1680 if (poller->thread != thread) { 1681 wrong_thread(__func__, poller->name, poller->thread, thread); 1682 return; 1683 } 1684 1685 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1686 * spdk_thread_poll() move it. It allows a poller to be paused from 1687 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1688 * iteration, or from within itself without breaking the logic to always 1689 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1690 */ 1691 switch (poller->state) { 1692 case SPDK_POLLER_STATE_PAUSED: 1693 case SPDK_POLLER_STATE_PAUSING: 1694 break; 1695 case SPDK_POLLER_STATE_RUNNING: 1696 case SPDK_POLLER_STATE_WAITING: 1697 poller->state = SPDK_POLLER_STATE_PAUSING; 1698 break; 1699 default: 1700 assert(false); 1701 break; 1702 } 1703 } 1704 1705 void 1706 spdk_poller_resume(struct spdk_poller *poller) 1707 { 1708 struct spdk_thread *thread; 1709 1710 thread = spdk_get_thread(); 1711 if (!thread) { 1712 assert(false); 1713 return; 1714 } 1715 1716 if (poller->thread != thread) { 1717 wrong_thread(__func__, poller->name, poller->thread, thread); 1718 return; 1719 } 1720 1721 /* If a poller is paused it has to be removed from the paused pollers 1722 * list and put on the active list or timer tree depending on its 1723 * period_ticks. If a poller is still in the process of being paused, 1724 * we just need to flip its state back to waiting, as it's already on 1725 * the appropriate list or tree. 1726 */ 1727 switch (poller->state) { 1728 case SPDK_POLLER_STATE_PAUSED: 1729 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1730 thread_insert_poller(thread, poller); 1731 /* fallthrough */ 1732 case SPDK_POLLER_STATE_PAUSING: 1733 poller->state = SPDK_POLLER_STATE_WAITING; 1734 break; 1735 case SPDK_POLLER_STATE_RUNNING: 1736 case SPDK_POLLER_STATE_WAITING: 1737 break; 1738 default: 1739 assert(false); 1740 break; 1741 } 1742 } 1743 1744 const char * 1745 spdk_poller_get_name(struct spdk_poller *poller) 1746 { 1747 return poller->name; 1748 } 1749 1750 uint64_t 1751 spdk_poller_get_id(struct spdk_poller *poller) 1752 { 1753 return poller->id; 1754 } 1755 1756 const char * 1757 spdk_poller_get_state_str(struct spdk_poller *poller) 1758 { 1759 switch (poller->state) { 1760 case SPDK_POLLER_STATE_WAITING: 1761 return "waiting"; 1762 case SPDK_POLLER_STATE_RUNNING: 1763 return "running"; 1764 case SPDK_POLLER_STATE_UNREGISTERED: 1765 return "unregistered"; 1766 case SPDK_POLLER_STATE_PAUSING: 1767 return "pausing"; 1768 case SPDK_POLLER_STATE_PAUSED: 1769 return "paused"; 1770 default: 1771 return NULL; 1772 } 1773 } 1774 1775 uint64_t 1776 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1777 { 1778 return poller->period_ticks; 1779 } 1780 1781 void 1782 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1783 { 1784 stats->run_count = poller->run_count; 1785 stats->busy_count = poller->busy_count; 1786 } 1787 1788 struct spdk_poller * 1789 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1790 { 1791 return TAILQ_FIRST(&thread->active_pollers); 1792 } 1793 1794 struct spdk_poller * 1795 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1796 { 1797 return TAILQ_NEXT(prev, tailq); 1798 } 1799 
struct spdk_poller *
spdk_thread_get_first_timed_poller(struct spdk_thread *thread)
{
	return RB_MIN(timed_pollers_tree, &thread->timed_pollers);
}

struct spdk_poller *
spdk_thread_get_next_timed_poller(struct spdk_poller *prev)
{
	return RB_NEXT(timed_pollers_tree, &prev->thread->timed_pollers, prev);
}

struct spdk_poller *
spdk_thread_get_first_paused_poller(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->paused_pollers);
}

struct spdk_poller *
spdk_thread_get_next_paused_poller(struct spdk_poller *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

struct spdk_io_channel *
spdk_thread_get_first_io_channel(struct spdk_thread *thread)
{
	return RB_MIN(io_channel_tree, &thread->io_channels);
}

struct spdk_io_channel *
spdk_thread_get_next_io_channel(struct spdk_io_channel *prev)
{
	return RB_NEXT(io_channel_tree, &prev->thread->io_channels, prev);
}

struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};

static void
_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) {
		SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n",
			      ct->cur_thread->name);
		ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	assert(rc == 0);
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}
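/*
 * Usage sketch for the iteration above (example_per_thread and example_cpl
 * are hypothetical): 'fn' runs once on every running thread in turn, then
 * 'cpl' runs back on the calling thread.
 *
 *	static void
 *	example_per_thread(void *ctx)
 *	{
 *		// ... runs on each thread, one after another ...
 *	}
 *
 *	static void
 *	example_cpl(void *ctx)
 *	{
 *		// ... all threads visited; runs on the original thread ...
 *	}
 *
 *	spdk_for_each_thread(example_per_thread, NULL, example_cpl);
 */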
static inline void
poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode)
{
	if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
		return;
	}

	if (!poller->set_intr_cb_fn) {
		SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name);
		assert(false);
		return;
	}

	poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode);
}

void
spdk_thread_set_interrupt_mode(bool enable_interrupt)
{
	struct spdk_thread *thread = _get_thread();
	struct spdk_poller *poller, *tmp;

	assert(thread);
	assert(spdk_interrupt_mode_is_enabled());

	SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n",
		       thread->name, enable_interrupt ? "intr" : "poll",
		       thread->in_interrupt ? "intr" : "poll");

	if (thread->in_interrupt == enable_interrupt) {
		return;
	}

	/* Set pollers to expected mode */
	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
		poller_set_interrupt_mode(poller, enable_interrupt);
	}
	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) {
		poller_set_interrupt_mode(poller, enable_interrupt);
	}
	/* All paused pollers will go to work in interrupt mode */
	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) {
		poller_set_interrupt_mode(poller, enable_interrupt);
	}

	thread->in_interrupt = enable_interrupt;
	return;
}

static struct io_device *
io_device_get(void *io_device)
{
	struct io_device find = {};

	find.io_device = io_device;
	return RB_FIND(io_device_tree, &g_io_devices, &find);
}

void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	tmp = RB_INSERT(io_device_tree, &g_io_devices, dev);
	if (tmp != NULL) {
		SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
			    io_device, tmp->name, dev->name);
		free(dev);
	}

	pthread_mutex_unlock(&g_devlist_mutex);
}
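/*
 * Registration sketch (all example_* names are hypothetical). The io_device
 * pointer is just a unique key; ctx_size bytes are allocated per I/O channel
 * and passed to the callbacks as ctx_buf:
 *
 *	static int
 *	example_create_cb(void *io_device, void *ctx_buf)
 *	{
 *		struct example_channel *ch = ctx_buf;
 *		// ... per-thread channel setup ...
 *		return 0;
 *	}
 *
 *	static void
 *	example_destroy_cb(void *io_device, void *ctx_buf)
 *	{
 *		// ... per-thread channel teardown ...
 *	}
 *
 *	spdk_io_device_register(&g_example_dev, example_create_cb,
 *				example_destroy_cb,
 *				sizeof(struct example_channel), "example");
 */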
static void
io_device_free(struct io_device *dev)
{
	int rc __attribute__((unused));

	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
		assert(rc == 0);
	}
}

void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* The for_each_count check differentiates the user attempting to unregister
	 * the device a second time from the internal call to this function that
	 * occurs after the for_each_count reaches 0.
	 */
	if (dev->pending_unregister && dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregister_thread = thread;

	if (dev->for_each_count > 0) {
		SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			     dev->name, io_device, dev->for_each_count);
		dev->pending_unregister = true;
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregistered = true;
	RB_REMOVE(io_device_tree, &g_io_devices, dev);
	refcnt = dev->refcnt;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (unregister_cb) {
		thread->pending_unregister_count++;
	}

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}

const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}

static struct spdk_io_channel *
thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev)
{
	struct spdk_io_channel find = {};

	find.dev = dev;
	return RB_FIND(io_channel_tree, &thread->io_channels, &find);
}
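/*
 * Note that spdk_get_io_channel() below is idempotent per (thread, io_device)
 * pair: a repeated call on the same thread returns the existing channel with
 * its reference count bumped rather than creating a new one. Illustrative
 * only (g_dev is a hypothetical registered device):
 *
 *	struct spdk_io_channel *a = spdk_get_io_channel(&g_dev);
 *	struct spdk_io_channel *b = spdk_get_io_channel(&g_dev);
 *
 *	assert(a == b);		// same channel, refcnt is now 2
 *	spdk_put_io_channel(b);	// drops the refcnt back to 1
 */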
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch = thread_get_io_channel(thread, dev);
	if (ch != NULL) {
		ch->ref++;

		SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
			      ch, dev->name, dev->io_device, thread->name, ch->ref);

		/*
		 * An I/O channel already exists for this device on this
		 * thread, so return it.
		 */
		pthread_mutex_unlock(&g_devlist_mutex);
		spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0,
				  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
		return ch;
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	RB_INSERT(io_channel_tree, &thread->io_channels, ch);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1);
	return ch;
}
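/*
 * The caller's per-channel context is allocated contiguously after
 * struct spdk_io_channel (see the calloc above), so the ctx <-> channel
 * conversions are pure pointer arithmetic. Illustrative round trip:
 *
 *	void *ctx = spdk_io_channel_get_ctx(ch);
 *
 *	assert(spdk_io_channel_from_ctx(ctx) == ch);
 */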
static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0,
			  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		wrong_thread(__func__, "ch", ch->thread, thread);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

void *
spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
{
	return ch->dev->io_device;
}

const char *
spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch)
{
	return spdk_io_device_get_name(ch->dev);
}

int
spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
{
	return ch->ref;
}

struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}
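/*
 * Minimal spdk_for_each_channel() usage sketch; reset_ch, reset_done and
 * struct dev_ch_ctx are hypothetical caller-side names:
 *
 *	static void
 *	reset_ch(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *		struct dev_ch_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *
 *		ctx->stats = 0;
 *		// Each callback must hand the iterator back exactly once.
 *		spdk_for_each_channel_continue(i, 0);
 *	}
 *
 *	static void
 *	reset_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		// Runs on the thread that called spdk_for_each_channel().
 *	}
 *
 *	spdk_for_each_channel(&g_dev, reset_ch, NULL, reset_done);
 */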
static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute. If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	ch = thread_get_io_channel(i->cur_thread, i->dev);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		assert(false);
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;
	i->orig_thread = _get_thread();

	pthread_mutex_lock(&g_devlist_mutex);
	i->dev = io_device_get(io_device);
	if (i->dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		assert(false);
		i->status = -ENODEV;
		goto end;
	}

	/* Do not allow new for_each operations while the device already has a
	 * pending unregister that is waiting for outstanding for_each operations
	 * to complete.
	 */
	if (i->dev->pending_unregister) {
		SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device);
		i->status = -ENODEV;
		goto end;
	}

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		ch = thread_get_io_channel(thread, i->dev);
		if (ch != NULL) {
			ch->dev->for_each_count++;
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
	}

end:
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

static void
__pending_unregister(void *arg)
{
	struct io_device *dev = arg;

	assert(dev->pending_unregister);
	assert(dev->for_each_count == 0);
	spdk_io_device_unregister(dev->io_device, dev->unregister_cb);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct io_device *dev;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	dev = i->dev;
	if (status) {
		goto end;
	}

	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		ch = thread_get_io_channel(thread, dev);
		if (ch != NULL) {
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);

	pthread_mutex_lock(&g_devlist_mutex);
	if (dev->pending_unregister && dev->for_each_count == 0) {
		rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev);
		assert(rc == 0);
	}
	pthread_mutex_unlock(&g_devlist_mutex);
}
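/*
 * Passing a nonzero status to spdk_for_each_channel_continue() above aborts
 * the iteration: the remaining threads are skipped and the completion callback
 * receives that status. Illustrative only (ctx->failed is hypothetical):
 *
 *	if (ctx->failed) {
 *		spdk_for_each_channel_continue(i, -EIO);	// cpl sees -EIO
 *		return;
 *	}
 */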
struct spdk_interrupt {
	int efd;
	struct spdk_thread *thread;
	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	if (thread->msg_fd < 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);
	thread->msg_fd = -1;

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}

#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;
	uint64_t notify = 1;

	assert(spdk_interrupt_mode_is_enabled());

	/* There may be a race between this consumer's acknowledgement (the read
	 * below) and another producer's notification, so acknowledge first and
	 * only then drain our own messages. This avoids missing a notification.
	 */
	rc = read(thread->msg_fd, &notify, sizeof(notify));
	if (rc < 0 && errno != EAGAIN) {
		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
	}

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	return rc;
}

static int
thread_interrupt_create(struct spdk_thread *thread)
{
	int rc;

	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);

	rc = spdk_fd_group_create(&thread->fgrp);
	if (rc) {
		return rc;
	}

	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (thread->msg_fd < 0) {
		rc = -errno;
		spdk_fd_group_destroy(thread->fgrp);
		thread->fgrp = NULL;

		return rc;
	}

	return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd,
				 thread_interrupt_msg_process, thread);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
#endif

struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
			void *arg, const char *name)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;
	int ret;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	intr = calloc(1, sizeof(*intr));
	if (intr == NULL) {
		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(intr->name, sizeof(intr->name), "%s", name);
	} else {
		snprintf(intr->name, sizeof(intr->name), "%p", fn);
	}

	intr->efd = efd;
	intr->thread = thread;

	ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name);
	if (ret != 0) {
		SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n",
			    thread->name, efd, spdk_strerror(-ret));
		free(intr);
		return NULL;
	}

	return intr;
}
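/*
 * Illustrative pairing of spdk_interrupt_register() above with
 * spdk_interrupt_unregister() below; my_event_fn (which must match
 * spdk_interrupt_fn) and my_arg are hypothetical:
 *
 *	int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 *	struct spdk_interrupt *intr;
 *
 *	intr = spdk_interrupt_register(efd, my_event_fn, my_arg, "my_intr");
 *	...
 *	spdk_interrupt_unregister(&intr);	// removes efd from the fd group
 *	close(efd);				// the fd itself stays caller-owned
 */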
void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	*pintr = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}

int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}

static bool g_interrupt_mode = false;

int
spdk_interrupt_mode_enable(void)
{
	/* This must be called once, before the threading library is initialized.
	 * g_spdk_msg_mempool is non-NULL once the thread library has been initialized.
	 */
	if (g_spdk_msg_mempool) {
		SPDK_ERRLOG("Failed: the threading library is already initialized.\n");
		return -1;
	}

#ifdef __linux__
	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
	g_interrupt_mode = true;
	return 0;
#else
	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
	g_interrupt_mode = false;
	return -ENOTSUP;
#endif
}

bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}

SPDK_LOG_REGISTER_COMPONENT(thread)
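/*
 * Illustrative startup ordering (spdk_thread_lib_init() is declared in
 * spdk/thread.h): interrupt mode must be opted into before the threading
 * library is initialized, as enforced by the g_spdk_msg_mempool check in
 * spdk_interrupt_mode_enable() above.
 *
 *	spdk_interrupt_mode_enable();		// must precede spdk_thread_lib_init()
 *	spdk_thread_lib_init(NULL, 0);
 *	...
 *	spdk_thread_set_interrupt_mode(true);	// per-thread, called from that thread
 */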