1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2016 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 8 #include "spdk/env.h" 9 #include "spdk/likely.h" 10 #include "spdk/queue.h" 11 #include "spdk/string.h" 12 #include "spdk/thread.h" 13 #include "spdk/trace.h" 14 #include "spdk/util.h" 15 #include "spdk/fd_group.h" 16 17 #include "spdk/log.h" 18 #include "spdk_internal/thread.h" 19 #include "spdk_internal/usdt.h" 20 #include "thread_internal.h" 21 22 #include "spdk_internal/trace_defs.h" 23 24 #ifdef __linux__ 25 #include <sys/timerfd.h> 26 #include <sys/eventfd.h> 27 #endif 28 29 #define SPDK_MSG_BATCH_SIZE 8 30 #define SPDK_MAX_DEVICE_NAME_LEN 256 31 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 32 #define SPDK_MAX_POLLER_NAME_LEN 256 33 #define SPDK_MAX_THREAD_NAME_LEN 256 34 35 static struct spdk_thread *g_app_thread; 36 37 enum spdk_poller_state { 38 /* The poller is registered with a thread but not currently executing its fn. */ 39 SPDK_POLLER_STATE_WAITING, 40 41 /* The poller is currently running its fn. */ 42 SPDK_POLLER_STATE_RUNNING, 43 44 /* The poller was unregistered during the execution of its fn. */ 45 SPDK_POLLER_STATE_UNREGISTERED, 46 47 /* The poller is in the process of being paused. It will be paused 48 * during the next time it's supposed to be executed. 49 */ 50 SPDK_POLLER_STATE_PAUSING, 51 52 /* The poller is registered but currently paused. It's on the 53 * paused_pollers list. 54 */ 55 SPDK_POLLER_STATE_PAUSED, 56 }; 57 58 struct spdk_poller { 59 TAILQ_ENTRY(spdk_poller) tailq; 60 RB_ENTRY(spdk_poller) node; 61 62 /* Current state of the poller; should only be accessed from the poller's thread. */ 63 enum spdk_poller_state state; 64 65 uint64_t period_ticks; 66 uint64_t next_run_tick; 67 uint64_t run_count; 68 uint64_t busy_count; 69 uint64_t id; 70 spdk_poller_fn fn; 71 void *arg; 72 struct spdk_thread *thread; 73 /* Native interruptfd for period or busy poller */ 74 int interruptfd; 75 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 76 void *set_intr_cb_arg; 77 78 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 79 }; 80 81 enum spdk_thread_state { 82 /* The thread is processing poller and message by spdk_thread_poll(). */ 83 SPDK_THREAD_STATE_RUNNING, 84 85 /* The thread is in the process of termination. It reaps unregistering 86 * poller are releasing I/O channel. 87 */ 88 SPDK_THREAD_STATE_EXITING, 89 90 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 91 SPDK_THREAD_STATE_EXITED, 92 }; 93 94 struct spdk_thread { 95 uint64_t tsc_last; 96 struct spdk_thread_stats stats; 97 /* 98 * Contains pollers actively running on this thread. Pollers 99 * are run round-robin. The thread takes one poller from the head 100 * of the ring, executes it, then puts it back at the tail of 101 * the ring. 102 */ 103 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 104 /** 105 * Contains pollers running on this thread with a periodic timer. 106 */ 107 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 108 struct spdk_poller *first_timed_poller; 109 /* 110 * Contains paused pollers. Pollers on this queue are waiting until 111 * they are resumed (in which case they're put onto the active/timer 112 * queues) or unregistered. 113 */ 114 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 115 struct spdk_ring *messages; 116 int msg_fd; 117 SLIST_HEAD(, spdk_msg) msg_cache; 118 size_t msg_cache_count; 119 spdk_msg_fn critical_msg; 120 uint64_t id; 121 uint64_t next_poller_id; 122 enum spdk_thread_state state; 123 int pending_unregister_count; 124 125 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 126 TAILQ_ENTRY(spdk_thread) tailq; 127 128 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 129 struct spdk_cpuset cpumask; 130 uint64_t exit_timeout_tsc; 131 132 /* Indicates whether this spdk_thread currently runs in interrupt. */ 133 bool in_interrupt; 134 bool poller_unregistered; 135 struct spdk_fd_group *fgrp; 136 137 /* User context allocated at the end */ 138 uint8_t ctx[0]; 139 }; 140 141 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 142 143 static spdk_new_thread_fn g_new_thread_fn = NULL; 144 static spdk_thread_op_fn g_thread_op_fn = NULL; 145 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 146 static size_t g_ctx_sz = 0; 147 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 148 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 149 * SPDK application is required. 150 */ 151 static uint64_t g_thread_id = 1; 152 153 struct io_device { 154 void *io_device; 155 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 156 spdk_io_channel_create_cb create_cb; 157 spdk_io_channel_destroy_cb destroy_cb; 158 spdk_io_device_unregister_cb unregister_cb; 159 struct spdk_thread *unregister_thread; 160 uint32_t ctx_size; 161 uint32_t for_each_count; 162 RB_ENTRY(io_device) node; 163 164 uint32_t refcnt; 165 166 bool pending_unregister; 167 bool unregistered; 168 }; 169 170 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 171 172 static int 173 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 174 { 175 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 176 } 177 178 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 179 180 static int 181 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 182 { 183 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 184 } 185 186 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 187 188 struct spdk_msg { 189 spdk_msg_fn fn; 190 void *arg; 191 192 SLIST_ENTRY(spdk_msg) link; 193 }; 194 195 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 196 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 197 198 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 199 static uint32_t g_thread_count = 0; 200 201 static __thread struct spdk_thread *tls_thread = NULL; 202 203 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 204 { 205 spdk_trace_register_description("THREAD_IOCH_GET", 206 TRACE_THREAD_IOCH_GET, 207 OWNER_NONE, OBJECT_NONE, 0, 208 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 209 spdk_trace_register_description("THREAD_IOCH_PUT", 210 TRACE_THREAD_IOCH_PUT, 211 OWNER_NONE, OBJECT_NONE, 0, 212 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 213 } 214 215 /* 216 * If this compare function returns zero when two next_run_ticks are equal, 217 * the macro RB_INSERT() returns a pointer to the element with the same 218 * next_run_tick. 219 * 220 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 221 * to remove as a parameter. 222 * 223 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 224 * side by returning 1 when two next_run_ticks are equal. 225 */ 226 static inline int 227 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 228 { 229 if (poller1->next_run_tick < poller2->next_run_tick) { 230 return -1; 231 } else { 232 return 1; 233 } 234 } 235 236 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 237 238 static inline struct spdk_thread * 239 _get_thread(void) 240 { 241 return tls_thread; 242 } 243 244 static int 245 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 246 { 247 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 248 249 g_ctx_sz = ctx_sz; 250 251 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 252 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 253 sizeof(struct spdk_msg), 254 0, /* No cache. We do our own. */ 255 SPDK_ENV_SOCKET_ID_ANY); 256 257 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 258 msg_mempool_sz); 259 260 if (!g_spdk_msg_mempool) { 261 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 262 return -ENOMEM; 263 } 264 265 return 0; 266 } 267 268 static void thread_interrupt_destroy(struct spdk_thread *thread); 269 static int thread_interrupt_create(struct spdk_thread *thread); 270 271 static void 272 _free_thread(struct spdk_thread *thread) 273 { 274 struct spdk_io_channel *ch; 275 struct spdk_msg *msg; 276 struct spdk_poller *poller, *ptmp; 277 278 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 279 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 280 thread->name, ch->dev->name); 281 } 282 283 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 284 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 285 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 286 poller->name); 287 } 288 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 289 free(poller); 290 } 291 292 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 293 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 294 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 295 poller->name); 296 } 297 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 298 free(poller); 299 } 300 301 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 302 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 303 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 304 free(poller); 305 } 306 307 pthread_mutex_lock(&g_devlist_mutex); 308 assert(g_thread_count > 0); 309 g_thread_count--; 310 TAILQ_REMOVE(&g_threads, thread, tailq); 311 pthread_mutex_unlock(&g_devlist_mutex); 312 313 msg = SLIST_FIRST(&thread->msg_cache); 314 while (msg != NULL) { 315 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 316 317 assert(thread->msg_cache_count > 0); 318 thread->msg_cache_count--; 319 spdk_mempool_put(g_spdk_msg_mempool, msg); 320 321 msg = SLIST_FIRST(&thread->msg_cache); 322 } 323 324 assert(thread->msg_cache_count == 0); 325 326 if (spdk_interrupt_mode_is_enabled()) { 327 thread_interrupt_destroy(thread); 328 } 329 330 spdk_ring_free(thread->messages); 331 free(thread); 332 } 333 334 int 335 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 336 { 337 assert(g_new_thread_fn == NULL); 338 assert(g_thread_op_fn == NULL); 339 340 if (new_thread_fn == NULL) { 341 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 342 } else { 343 g_new_thread_fn = new_thread_fn; 344 } 345 346 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 347 } 348 349 int 350 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 351 spdk_thread_op_supported_fn thread_op_supported_fn, 352 size_t ctx_sz, size_t msg_mempool_sz) 353 { 354 assert(g_new_thread_fn == NULL); 355 assert(g_thread_op_fn == NULL); 356 assert(g_thread_op_supported_fn == NULL); 357 358 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 359 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 360 return -EINVAL; 361 } 362 363 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 364 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 365 } else { 366 g_thread_op_fn = thread_op_fn; 367 g_thread_op_supported_fn = thread_op_supported_fn; 368 } 369 370 return _thread_lib_init(ctx_sz, msg_mempool_sz); 371 } 372 373 void 374 spdk_thread_lib_fini(void) 375 { 376 struct io_device *dev; 377 378 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 379 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 380 } 381 382 g_new_thread_fn = NULL; 383 g_thread_op_fn = NULL; 384 g_thread_op_supported_fn = NULL; 385 g_ctx_sz = 0; 386 if (g_app_thread != NULL) { 387 _free_thread(g_app_thread); 388 g_app_thread = NULL; 389 } 390 391 if (g_spdk_msg_mempool) { 392 spdk_mempool_free(g_spdk_msg_mempool); 393 g_spdk_msg_mempool = NULL; 394 } 395 } 396 397 struct spdk_thread * 398 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask) 399 { 400 struct spdk_thread *thread, *null_thread; 401 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 402 int rc = 0, i; 403 404 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 405 if (!thread) { 406 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 407 return NULL; 408 } 409 410 if (cpumask) { 411 spdk_cpuset_copy(&thread->cpumask, cpumask); 412 } else { 413 spdk_cpuset_negate(&thread->cpumask); 414 } 415 416 RB_INIT(&thread->io_channels); 417 TAILQ_INIT(&thread->active_pollers); 418 RB_INIT(&thread->timed_pollers); 419 TAILQ_INIT(&thread->paused_pollers); 420 SLIST_INIT(&thread->msg_cache); 421 thread->msg_cache_count = 0; 422 423 thread->tsc_last = spdk_get_ticks(); 424 425 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 426 * ID exceeds UINT64_MAX a warning message is logged 427 */ 428 thread->next_poller_id = 1; 429 430 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 431 if (!thread->messages) { 432 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 433 free(thread); 434 return NULL; 435 } 436 437 /* Fill the local message pool cache. */ 438 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 439 if (rc == 0) { 440 /* If we can't populate the cache it's ok. The cache will get filled 441 * up organically as messages are passed to the thread. */ 442 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 443 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 444 thread->msg_cache_count++; 445 } 446 } 447 448 if (name) { 449 snprintf(thread->name, sizeof(thread->name), "%s", name); 450 } else { 451 snprintf(thread->name, sizeof(thread->name), "%p", thread); 452 } 453 454 pthread_mutex_lock(&g_devlist_mutex); 455 if (g_thread_id == 0) { 456 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 457 pthread_mutex_unlock(&g_devlist_mutex); 458 _free_thread(thread); 459 return NULL; 460 } 461 thread->id = g_thread_id++; 462 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 463 g_thread_count++; 464 pthread_mutex_unlock(&g_devlist_mutex); 465 466 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 467 thread->id, thread->name); 468 469 if (spdk_interrupt_mode_is_enabled()) { 470 thread->in_interrupt = true; 471 rc = thread_interrupt_create(thread); 472 if (rc != 0) { 473 _free_thread(thread); 474 return NULL; 475 } 476 } 477 478 if (g_new_thread_fn) { 479 rc = g_new_thread_fn(thread); 480 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 481 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 482 } 483 484 if (rc != 0) { 485 _free_thread(thread); 486 return NULL; 487 } 488 489 thread->state = SPDK_THREAD_STATE_RUNNING; 490 491 /* If this is the first thread, save it as the app thread. Use an atomic 492 * compare + exchange to guard against crazy users who might try to 493 * call spdk_thread_create() simultaneously on multiple threads. 494 */ 495 null_thread = NULL; 496 __atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false, 497 __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); 498 499 return thread; 500 } 501 502 struct spdk_thread * 503 spdk_thread_get_app_thread(void) 504 { 505 return g_app_thread; 506 } 507 508 void 509 spdk_set_thread(struct spdk_thread *thread) 510 { 511 tls_thread = thread; 512 } 513 514 static void 515 thread_exit(struct spdk_thread *thread, uint64_t now) 516 { 517 struct spdk_poller *poller; 518 struct spdk_io_channel *ch; 519 520 if (now >= thread->exit_timeout_tsc) { 521 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 522 thread->name); 523 goto exited; 524 } 525 526 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 527 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 528 SPDK_INFOLOG(thread, 529 "thread %s still has active poller %s\n", 530 thread->name, poller->name); 531 return; 532 } 533 } 534 535 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 536 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 537 SPDK_INFOLOG(thread, 538 "thread %s still has active timed poller %s\n", 539 thread->name, poller->name); 540 return; 541 } 542 } 543 544 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 545 SPDK_INFOLOG(thread, 546 "thread %s still has paused poller %s\n", 547 thread->name, poller->name); 548 return; 549 } 550 551 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 552 SPDK_INFOLOG(thread, 553 "thread %s still has channel for io_device %s\n", 554 thread->name, ch->dev->name); 555 return; 556 } 557 558 if (thread->pending_unregister_count > 0) { 559 SPDK_INFOLOG(thread, 560 "thread %s is still unregistering io_devices\n", 561 thread->name); 562 return; 563 } 564 565 exited: 566 thread->state = SPDK_THREAD_STATE_EXITED; 567 if (spdk_unlikely(thread->in_interrupt)) { 568 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 569 } 570 } 571 572 int 573 spdk_thread_exit(struct spdk_thread *thread) 574 { 575 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 576 577 assert(tls_thread == thread); 578 579 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 580 SPDK_INFOLOG(thread, 581 "thread %s is already exiting\n", 582 thread->name); 583 return 0; 584 } 585 586 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 587 SPDK_THREAD_EXIT_TIMEOUT_SEC); 588 thread->state = SPDK_THREAD_STATE_EXITING; 589 return 0; 590 } 591 592 bool 593 spdk_thread_is_exited(struct spdk_thread *thread) 594 { 595 return thread->state == SPDK_THREAD_STATE_EXITED; 596 } 597 598 void 599 spdk_thread_destroy(struct spdk_thread *thread) 600 { 601 assert(thread != NULL); 602 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 603 604 assert(thread->state == SPDK_THREAD_STATE_EXITED); 605 606 if (tls_thread == thread) { 607 tls_thread = NULL; 608 } 609 610 /* To be safe, do not free the app thread until spdk_thread_lib_fini(). */ 611 if (thread != g_app_thread) { 612 _free_thread(thread); 613 } 614 } 615 616 void * 617 spdk_thread_get_ctx(struct spdk_thread *thread) 618 { 619 if (g_ctx_sz > 0) { 620 return thread->ctx; 621 } 622 623 return NULL; 624 } 625 626 struct spdk_cpuset * 627 spdk_thread_get_cpumask(struct spdk_thread *thread) 628 { 629 return &thread->cpumask; 630 } 631 632 int 633 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 634 { 635 struct spdk_thread *thread; 636 637 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 638 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 639 assert(false); 640 return -ENOTSUP; 641 } 642 643 thread = spdk_get_thread(); 644 if (!thread) { 645 SPDK_ERRLOG("Called from non-SPDK thread\n"); 646 assert(false); 647 return -EINVAL; 648 } 649 650 spdk_cpuset_copy(&thread->cpumask, cpumask); 651 652 /* Invoke framework's reschedule operation. If this function is called multiple times 653 * in a single spdk_thread_poll() context, the last cpumask will be used in the 654 * reschedule operation. 655 */ 656 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 657 658 return 0; 659 } 660 661 struct spdk_thread * 662 spdk_thread_get_from_ctx(void *ctx) 663 { 664 if (ctx == NULL) { 665 assert(false); 666 return NULL; 667 } 668 669 assert(g_ctx_sz > 0); 670 671 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 672 } 673 674 static inline uint32_t 675 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 676 { 677 unsigned count, i; 678 void *messages[SPDK_MSG_BATCH_SIZE]; 679 uint64_t notify = 1; 680 int rc; 681 682 #ifdef DEBUG 683 /* 684 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 685 * so we will never actually read uninitialized data from events, but just to be sure 686 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 687 */ 688 memset(messages, 0, sizeof(messages)); 689 #endif 690 691 if (max_msgs > 0) { 692 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 693 } else { 694 max_msgs = SPDK_MSG_BATCH_SIZE; 695 } 696 697 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 698 if (spdk_unlikely(thread->in_interrupt) && 699 spdk_ring_count(thread->messages) != 0) { 700 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 701 if (rc < 0) { 702 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 703 } 704 } 705 if (count == 0) { 706 return 0; 707 } 708 709 for (i = 0; i < count; i++) { 710 struct spdk_msg *msg = messages[i]; 711 712 assert(msg != NULL); 713 714 SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg); 715 716 msg->fn(msg->arg); 717 718 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 719 /* Insert the messages at the head. We want to re-use the hot 720 * ones. */ 721 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 722 thread->msg_cache_count++; 723 } else { 724 spdk_mempool_put(g_spdk_msg_mempool, msg); 725 } 726 } 727 728 return count; 729 } 730 731 static void 732 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 733 { 734 struct spdk_poller *tmp __attribute__((unused)); 735 736 poller->next_run_tick = now + poller->period_ticks; 737 738 /* 739 * Insert poller in the thread's timed_pollers tree by next scheduled run time 740 * as its key. 741 */ 742 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 743 assert(tmp == NULL); 744 745 /* Update the cache only if it is empty or the inserted poller is earlier than it. 746 * RB_MIN() is not necessary here because all pollers, which has exactly the same 747 * next_run_tick as the existing poller, are inserted on the right side. 748 */ 749 if (thread->first_timed_poller == NULL || 750 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 751 thread->first_timed_poller = poller; 752 } 753 } 754 755 static inline void 756 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 757 { 758 struct spdk_poller *tmp __attribute__((unused)); 759 760 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 761 assert(tmp != NULL); 762 763 /* This function is not used in any case that is performance critical. 764 * Update the cache simply by RB_MIN() if it needs to be changed. 765 */ 766 if (thread->first_timed_poller == poller) { 767 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 768 } 769 } 770 771 static void 772 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 773 { 774 if (poller->period_ticks) { 775 poller_insert_timer(thread, poller, spdk_get_ticks()); 776 } else { 777 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 778 } 779 } 780 781 static inline void 782 thread_update_stats(struct spdk_thread *thread, uint64_t end, 783 uint64_t start, int rc) 784 { 785 if (rc == 0) { 786 /* Poller status idle */ 787 thread->stats.idle_tsc += end - start; 788 } else if (rc > 0) { 789 /* Poller status busy */ 790 thread->stats.busy_tsc += end - start; 791 } 792 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 793 thread->tsc_last = end; 794 } 795 796 static inline int 797 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 798 { 799 int rc; 800 801 switch (poller->state) { 802 case SPDK_POLLER_STATE_UNREGISTERED: 803 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 804 free(poller); 805 return 0; 806 case SPDK_POLLER_STATE_PAUSING: 807 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 808 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 809 poller->state = SPDK_POLLER_STATE_PAUSED; 810 return 0; 811 case SPDK_POLLER_STATE_WAITING: 812 break; 813 default: 814 assert(false); 815 break; 816 } 817 818 poller->state = SPDK_POLLER_STATE_RUNNING; 819 rc = poller->fn(poller->arg); 820 821 poller->run_count++; 822 if (rc > 0) { 823 poller->busy_count++; 824 } 825 826 #ifdef DEBUG 827 if (rc == -1) { 828 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 829 } 830 #endif 831 832 switch (poller->state) { 833 case SPDK_POLLER_STATE_UNREGISTERED: 834 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 835 free(poller); 836 break; 837 case SPDK_POLLER_STATE_PAUSING: 838 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 839 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 840 poller->state = SPDK_POLLER_STATE_PAUSED; 841 break; 842 case SPDK_POLLER_STATE_PAUSED: 843 case SPDK_POLLER_STATE_WAITING: 844 break; 845 case SPDK_POLLER_STATE_RUNNING: 846 poller->state = SPDK_POLLER_STATE_WAITING; 847 break; 848 default: 849 assert(false); 850 break; 851 } 852 853 return rc; 854 } 855 856 static inline int 857 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 858 uint64_t now) 859 { 860 int rc; 861 862 switch (poller->state) { 863 case SPDK_POLLER_STATE_UNREGISTERED: 864 free(poller); 865 return 0; 866 case SPDK_POLLER_STATE_PAUSING: 867 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 868 poller->state = SPDK_POLLER_STATE_PAUSED; 869 return 0; 870 case SPDK_POLLER_STATE_WAITING: 871 break; 872 default: 873 assert(false); 874 break; 875 } 876 877 poller->state = SPDK_POLLER_STATE_RUNNING; 878 rc = poller->fn(poller->arg); 879 880 poller->run_count++; 881 if (rc > 0) { 882 poller->busy_count++; 883 } 884 885 #ifdef DEBUG 886 if (rc == -1) { 887 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 888 } 889 #endif 890 891 switch (poller->state) { 892 case SPDK_POLLER_STATE_UNREGISTERED: 893 free(poller); 894 break; 895 case SPDK_POLLER_STATE_PAUSING: 896 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 897 poller->state = SPDK_POLLER_STATE_PAUSED; 898 break; 899 case SPDK_POLLER_STATE_PAUSED: 900 break; 901 case SPDK_POLLER_STATE_RUNNING: 902 poller->state = SPDK_POLLER_STATE_WAITING; 903 /* fallthrough */ 904 case SPDK_POLLER_STATE_WAITING: 905 poller_insert_timer(thread, poller, now); 906 break; 907 default: 908 assert(false); 909 break; 910 } 911 912 return rc; 913 } 914 915 static int 916 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 917 { 918 uint32_t msg_count; 919 struct spdk_poller *poller, *tmp; 920 spdk_msg_fn critical_msg; 921 int rc = 0; 922 923 thread->tsc_last = now; 924 925 critical_msg = thread->critical_msg; 926 if (spdk_unlikely(critical_msg != NULL)) { 927 critical_msg(NULL); 928 thread->critical_msg = NULL; 929 rc = 1; 930 } 931 932 msg_count = msg_queue_run_batch(thread, max_msgs); 933 if (msg_count) { 934 rc = 1; 935 } 936 937 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 938 active_pollers_head, tailq, tmp) { 939 int poller_rc; 940 941 poller_rc = thread_execute_poller(thread, poller); 942 if (poller_rc > rc) { 943 rc = poller_rc; 944 } 945 } 946 947 poller = thread->first_timed_poller; 948 while (poller != NULL) { 949 int timer_rc = 0; 950 951 if (now < poller->next_run_tick) { 952 break; 953 } 954 955 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 956 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 957 958 /* Update the cache to the next timed poller in the list 959 * only if the current poller is still the closest, otherwise, 960 * do nothing because the cache has been already updated. 961 */ 962 if (thread->first_timed_poller == poller) { 963 thread->first_timed_poller = tmp; 964 } 965 966 timer_rc = thread_execute_timed_poller(thread, poller, now); 967 if (timer_rc > rc) { 968 rc = timer_rc; 969 } 970 971 poller = tmp; 972 } 973 974 return rc; 975 } 976 977 int 978 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 979 { 980 struct spdk_thread *orig_thread; 981 int rc; 982 uint64_t notify = 1; 983 984 orig_thread = _get_thread(); 985 tls_thread = thread; 986 987 if (now == 0) { 988 now = spdk_get_ticks(); 989 } 990 991 if (spdk_likely(!thread->in_interrupt)) { 992 rc = thread_poll(thread, max_msgs, now); 993 if (spdk_unlikely(thread->in_interrupt)) { 994 /* The thread transitioned to interrupt mode during the above poll. 995 * Poll it one more time in case that during the transition time 996 * there is msg received without notification. 997 */ 998 rc = thread_poll(thread, max_msgs, now); 999 } 1000 } else { 1001 /* Non-block wait on thread's fd_group */ 1002 rc = spdk_fd_group_wait(thread->fgrp, 0); 1003 if (spdk_unlikely(!thread->in_interrupt)) { 1004 /* The thread transitioned to poll mode in a msg during the above processing. 1005 * Clear msg_fd since thread messages will be polled directly in poll mode. 1006 */ 1007 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 1008 if (rc < 0 && errno != EAGAIN) { 1009 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 1010 } 1011 } 1012 1013 /* Reap unregistered pollers out of poller execution in intr mode */ 1014 if (spdk_unlikely(thread->poller_unregistered)) { 1015 struct spdk_poller *poller, *tmp; 1016 1017 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1018 active_pollers_head, tailq, tmp) { 1019 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1020 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1021 free(poller); 1022 } 1023 } 1024 1025 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1026 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1027 poller_remove_timer(thread, poller); 1028 free(poller); 1029 } 1030 } 1031 1032 thread->poller_unregistered = false; 1033 } 1034 } 1035 1036 1037 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1038 thread_exit(thread, now); 1039 } 1040 1041 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1042 1043 tls_thread = orig_thread; 1044 1045 return rc; 1046 } 1047 1048 uint64_t 1049 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1050 { 1051 struct spdk_poller *poller; 1052 1053 poller = thread->first_timed_poller; 1054 if (poller) { 1055 return poller->next_run_tick; 1056 } 1057 1058 return 0; 1059 } 1060 1061 int 1062 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1063 { 1064 return !TAILQ_EMPTY(&thread->active_pollers); 1065 } 1066 1067 static bool 1068 thread_has_unpaused_pollers(struct spdk_thread *thread) 1069 { 1070 if (TAILQ_EMPTY(&thread->active_pollers) && 1071 RB_EMPTY(&thread->timed_pollers)) { 1072 return false; 1073 } 1074 1075 return true; 1076 } 1077 1078 bool 1079 spdk_thread_has_pollers(struct spdk_thread *thread) 1080 { 1081 if (!thread_has_unpaused_pollers(thread) && 1082 TAILQ_EMPTY(&thread->paused_pollers)) { 1083 return false; 1084 } 1085 1086 return true; 1087 } 1088 1089 bool 1090 spdk_thread_is_idle(struct spdk_thread *thread) 1091 { 1092 if (spdk_ring_count(thread->messages) || 1093 thread_has_unpaused_pollers(thread) || 1094 thread->critical_msg != NULL) { 1095 return false; 1096 } 1097 1098 return true; 1099 } 1100 1101 uint32_t 1102 spdk_thread_get_count(void) 1103 { 1104 /* 1105 * Return cached value of the current thread count. We could acquire the 1106 * lock and iterate through the TAILQ of threads to count them, but that 1107 * count could still be invalidated after we release the lock. 1108 */ 1109 return g_thread_count; 1110 } 1111 1112 struct spdk_thread * 1113 spdk_get_thread(void) 1114 { 1115 return _get_thread(); 1116 } 1117 1118 const char * 1119 spdk_thread_get_name(const struct spdk_thread *thread) 1120 { 1121 return thread->name; 1122 } 1123 1124 uint64_t 1125 spdk_thread_get_id(const struct spdk_thread *thread) 1126 { 1127 return thread->id; 1128 } 1129 1130 struct spdk_thread * 1131 spdk_thread_get_by_id(uint64_t id) 1132 { 1133 struct spdk_thread *thread; 1134 1135 if (id == 0 || id >= g_thread_id) { 1136 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1137 return NULL; 1138 } 1139 pthread_mutex_lock(&g_devlist_mutex); 1140 TAILQ_FOREACH(thread, &g_threads, tailq) { 1141 if (thread->id == id) { 1142 break; 1143 } 1144 } 1145 pthread_mutex_unlock(&g_devlist_mutex); 1146 return thread; 1147 } 1148 1149 int 1150 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1151 { 1152 struct spdk_thread *thread; 1153 1154 thread = _get_thread(); 1155 if (!thread) { 1156 SPDK_ERRLOG("No thread allocated\n"); 1157 return -EINVAL; 1158 } 1159 1160 if (stats == NULL) { 1161 return -EINVAL; 1162 } 1163 1164 *stats = thread->stats; 1165 1166 return 0; 1167 } 1168 1169 uint64_t 1170 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1171 { 1172 if (thread == NULL) { 1173 thread = _get_thread(); 1174 } 1175 1176 return thread->tsc_last; 1177 } 1178 1179 static inline int 1180 thread_send_msg_notification(const struct spdk_thread *target_thread) 1181 { 1182 uint64_t notify = 1; 1183 int rc; 1184 1185 /* Not necessary to do notification if interrupt facility is not enabled */ 1186 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1187 return 0; 1188 } 1189 1190 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1191 * after sending thread msg, it is necessary to check whether target thread runs in 1192 * interrupt mode and then decide whether do event notification. 1193 */ 1194 if (spdk_unlikely(target_thread->in_interrupt)) { 1195 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1196 if (rc < 0) { 1197 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1198 return -EIO; 1199 } 1200 } 1201 1202 return 0; 1203 } 1204 1205 int 1206 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1207 { 1208 struct spdk_thread *local_thread; 1209 struct spdk_msg *msg; 1210 int rc; 1211 1212 assert(thread != NULL); 1213 1214 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1215 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1216 return -EIO; 1217 } 1218 1219 local_thread = _get_thread(); 1220 1221 msg = NULL; 1222 if (local_thread != NULL) { 1223 if (local_thread->msg_cache_count > 0) { 1224 msg = SLIST_FIRST(&local_thread->msg_cache); 1225 assert(msg != NULL); 1226 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1227 local_thread->msg_cache_count--; 1228 } 1229 } 1230 1231 if (msg == NULL) { 1232 msg = spdk_mempool_get(g_spdk_msg_mempool); 1233 if (!msg) { 1234 SPDK_ERRLOG("msg could not be allocated\n"); 1235 return -ENOMEM; 1236 } 1237 } 1238 1239 msg->fn = fn; 1240 msg->arg = ctx; 1241 1242 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1243 if (rc != 1) { 1244 SPDK_ERRLOG("msg could not be enqueued\n"); 1245 spdk_mempool_put(g_spdk_msg_mempool, msg); 1246 return -EIO; 1247 } 1248 1249 return thread_send_msg_notification(thread); 1250 } 1251 1252 int 1253 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1254 { 1255 spdk_msg_fn expected = NULL; 1256 1257 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1258 __ATOMIC_SEQ_CST)) { 1259 return -EIO; 1260 } 1261 1262 return thread_send_msg_notification(thread); 1263 } 1264 1265 #ifdef __linux__ 1266 static int 1267 interrupt_timerfd_process(void *arg) 1268 { 1269 struct spdk_poller *poller = arg; 1270 uint64_t exp; 1271 int rc; 1272 1273 /* clear the level of interval timer */ 1274 rc = read(poller->interruptfd, &exp, sizeof(exp)); 1275 if (rc < 0) { 1276 if (rc == -EAGAIN) { 1277 return 0; 1278 } 1279 1280 return rc; 1281 } 1282 1283 SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg); 1284 1285 return poller->fn(poller->arg); 1286 } 1287 1288 static int 1289 period_poller_interrupt_init(struct spdk_poller *poller) 1290 { 1291 struct spdk_fd_group *fgrp = poller->thread->fgrp; 1292 int timerfd; 1293 int rc; 1294 1295 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1296 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1297 if (timerfd < 0) { 1298 return -errno; 1299 } 1300 1301 rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller); 1302 if (rc < 0) { 1303 close(timerfd); 1304 return rc; 1305 } 1306 1307 poller->interruptfd = timerfd; 1308 return 0; 1309 } 1310 1311 static void 1312 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1313 { 1314 int timerfd = poller->interruptfd; 1315 uint64_t now_tick = spdk_get_ticks(); 1316 uint64_t ticks = spdk_get_ticks_hz(); 1317 int ret; 1318 struct itimerspec new_tv = {}; 1319 struct itimerspec old_tv = {}; 1320 1321 assert(poller->period_ticks != 0); 1322 assert(timerfd >= 0); 1323 1324 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1325 interrupt_mode ? "interrupt" : "poll"); 1326 1327 if (interrupt_mode) { 1328 /* Set repeated timer expiration */ 1329 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1330 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1331 1332 /* Update next timer expiration */ 1333 if (poller->next_run_tick == 0) { 1334 poller->next_run_tick = now_tick + poller->period_ticks; 1335 } else if (poller->next_run_tick < now_tick) { 1336 poller->next_run_tick = now_tick; 1337 } 1338 1339 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1340 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1341 1342 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1343 if (ret < 0) { 1344 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1345 assert(false); 1346 } 1347 } else { 1348 /* Disarm the timer */ 1349 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1350 if (ret < 0) { 1351 /* timerfd_settime's failure indicates that the timerfd is in error */ 1352 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1353 assert(false); 1354 } 1355 1356 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1357 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1358 */ 1359 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1360 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1361 poller_remove_timer(poller->thread, poller); 1362 poller_insert_timer(poller->thread, poller, now_tick); 1363 } 1364 } 1365 1366 static void 1367 poller_interrupt_fini(struct spdk_poller *poller) 1368 { 1369 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1370 assert(poller->interruptfd >= 0); 1371 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1372 close(poller->interruptfd); 1373 poller->interruptfd = -1; 1374 } 1375 1376 static int 1377 busy_poller_interrupt_init(struct spdk_poller *poller) 1378 { 1379 int busy_efd; 1380 int rc; 1381 1382 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1383 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1384 if (busy_efd < 0) { 1385 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1386 return -errno; 1387 } 1388 1389 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, 1390 poller->fn, poller->arg, poller->name); 1391 if (rc < 0) { 1392 close(busy_efd); 1393 return rc; 1394 } 1395 1396 poller->interruptfd = busy_efd; 1397 return 0; 1398 } 1399 1400 static void 1401 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1402 { 1403 int busy_efd = poller->interruptfd; 1404 uint64_t notify = 1; 1405 int rc __attribute__((unused)); 1406 1407 assert(busy_efd >= 0); 1408 1409 if (interrupt_mode) { 1410 /* Write without read on eventfd will get it repeatedly triggered. */ 1411 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1412 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1413 } 1414 } else { 1415 /* Read on eventfd will clear its level triggering. */ 1416 rc = read(busy_efd, ¬ify, sizeof(notify)); 1417 } 1418 } 1419 1420 #else 1421 1422 static int 1423 period_poller_interrupt_init(struct spdk_poller *poller) 1424 { 1425 return -ENOTSUP; 1426 } 1427 1428 static void 1429 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1430 { 1431 } 1432 1433 static void 1434 poller_interrupt_fini(struct spdk_poller *poller) 1435 { 1436 } 1437 1438 static int 1439 busy_poller_interrupt_init(struct spdk_poller *poller) 1440 { 1441 return -ENOTSUP; 1442 } 1443 1444 static void 1445 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1446 { 1447 } 1448 1449 #endif 1450 1451 void 1452 spdk_poller_register_interrupt(struct spdk_poller *poller, 1453 spdk_poller_set_interrupt_mode_cb cb_fn, 1454 void *cb_arg) 1455 { 1456 assert(poller != NULL); 1457 assert(cb_fn != NULL); 1458 assert(spdk_get_thread() == poller->thread); 1459 1460 if (!spdk_interrupt_mode_is_enabled()) { 1461 return; 1462 } 1463 1464 /* when a poller is created we don't know if the user is ever going to 1465 * enable interrupts on it by calling this function, so the poller 1466 * registration function has to immediately create a interruptfd. 1467 * When this function does get called by user, we have to then destroy 1468 * that interruptfd. 1469 */ 1470 if (poller->set_intr_cb_fn && poller->interruptfd >= 0) { 1471 poller_interrupt_fini(poller); 1472 } 1473 1474 poller->set_intr_cb_fn = cb_fn; 1475 poller->set_intr_cb_arg = cb_arg; 1476 1477 /* Set poller into interrupt mode if thread is in interrupt. */ 1478 if (poller->thread->in_interrupt) { 1479 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1480 } 1481 } 1482 1483 static uint64_t 1484 convert_us_to_ticks(uint64_t us) 1485 { 1486 uint64_t quotient, remainder, ticks; 1487 1488 if (us) { 1489 quotient = us / SPDK_SEC_TO_USEC; 1490 remainder = us % SPDK_SEC_TO_USEC; 1491 ticks = spdk_get_ticks_hz(); 1492 1493 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1494 } else { 1495 return 0; 1496 } 1497 } 1498 1499 static struct spdk_poller * 1500 poller_register(spdk_poller_fn fn, 1501 void *arg, 1502 uint64_t period_microseconds, 1503 const char *name) 1504 { 1505 struct spdk_thread *thread; 1506 struct spdk_poller *poller; 1507 1508 thread = spdk_get_thread(); 1509 if (!thread) { 1510 assert(false); 1511 return NULL; 1512 } 1513 1514 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1515 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1516 return NULL; 1517 } 1518 1519 poller = calloc(1, sizeof(*poller)); 1520 if (poller == NULL) { 1521 SPDK_ERRLOG("Poller memory allocation failed\n"); 1522 return NULL; 1523 } 1524 1525 if (name) { 1526 snprintf(poller->name, sizeof(poller->name), "%s", name); 1527 } else { 1528 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1529 } 1530 1531 poller->state = SPDK_POLLER_STATE_WAITING; 1532 poller->fn = fn; 1533 poller->arg = arg; 1534 poller->thread = thread; 1535 poller->interruptfd = -1; 1536 if (thread->next_poller_id == 0) { 1537 SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n"); 1538 thread->next_poller_id = 1; 1539 } 1540 poller->id = thread->next_poller_id++; 1541 1542 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1543 1544 if (spdk_interrupt_mode_is_enabled()) { 1545 int rc; 1546 1547 if (period_microseconds) { 1548 rc = period_poller_interrupt_init(poller); 1549 if (rc < 0) { 1550 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1551 free(poller); 1552 return NULL; 1553 } 1554 1555 spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL); 1556 } else { 1557 /* If the poller doesn't have a period, create interruptfd that's always 1558 * busy automatically when running in interrupt mode. 1559 */ 1560 rc = busy_poller_interrupt_init(poller); 1561 if (rc > 0) { 1562 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1563 free(poller); 1564 return NULL; 1565 } 1566 1567 spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL); 1568 } 1569 } 1570 1571 thread_insert_poller(thread, poller); 1572 1573 return poller; 1574 } 1575 1576 struct spdk_poller * 1577 spdk_poller_register(spdk_poller_fn fn, 1578 void *arg, 1579 uint64_t period_microseconds) 1580 { 1581 return poller_register(fn, arg, period_microseconds, NULL); 1582 } 1583 1584 struct spdk_poller * 1585 spdk_poller_register_named(spdk_poller_fn fn, 1586 void *arg, 1587 uint64_t period_microseconds, 1588 const char *name) 1589 { 1590 return poller_register(fn, arg, period_microseconds, name); 1591 } 1592 1593 static void 1594 wrong_thread(const char *func, const char *name, struct spdk_thread *thread, 1595 struct spdk_thread *curthread) 1596 { 1597 if (thread == NULL) { 1598 SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name); 1599 abort(); 1600 } 1601 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be " 1602 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id, 1603 thread->name, thread->id); 1604 assert(false); 1605 } 1606 1607 void 1608 spdk_poller_unregister(struct spdk_poller **ppoller) 1609 { 1610 struct spdk_thread *thread; 1611 struct spdk_poller *poller; 1612 1613 poller = *ppoller; 1614 if (poller == NULL) { 1615 return; 1616 } 1617 1618 *ppoller = NULL; 1619 1620 thread = spdk_get_thread(); 1621 if (!thread) { 1622 assert(false); 1623 return; 1624 } 1625 1626 if (poller->thread != thread) { 1627 wrong_thread(__func__, poller->name, poller->thread, thread); 1628 return; 1629 } 1630 1631 if (spdk_interrupt_mode_is_enabled()) { 1632 /* Release the interrupt resource for period or busy poller */ 1633 if (poller->interruptfd >= 0) { 1634 poller_interrupt_fini(poller); 1635 } 1636 1637 /* Mark there is poller unregistered. Then unregistered pollers will 1638 * get reaped by spdk_thread_poll also in intr mode. 1639 */ 1640 thread->poller_unregistered = true; 1641 } 1642 1643 /* If the poller was paused, put it on the active_pollers list so that 1644 * its unregistration can be processed by spdk_thread_poll(). 1645 */ 1646 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1647 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1648 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1649 poller->period_ticks = 0; 1650 } 1651 1652 /* Simply set the state to unregistered. The poller will get cleaned up 1653 * in a subsequent call to spdk_thread_poll(). 1654 */ 1655 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1656 } 1657 1658 void 1659 spdk_poller_pause(struct spdk_poller *poller) 1660 { 1661 struct spdk_thread *thread; 1662 1663 thread = spdk_get_thread(); 1664 if (!thread) { 1665 assert(false); 1666 return; 1667 } 1668 1669 if (poller->thread != thread) { 1670 wrong_thread(__func__, poller->name, poller->thread, thread); 1671 return; 1672 } 1673 1674 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1675 * spdk_thread_poll() move it. It allows a poller to be paused from 1676 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1677 * iteration, or from within itself without breaking the logic to always 1678 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1679 */ 1680 switch (poller->state) { 1681 case SPDK_POLLER_STATE_PAUSED: 1682 case SPDK_POLLER_STATE_PAUSING: 1683 break; 1684 case SPDK_POLLER_STATE_RUNNING: 1685 case SPDK_POLLER_STATE_WAITING: 1686 poller->state = SPDK_POLLER_STATE_PAUSING; 1687 break; 1688 default: 1689 assert(false); 1690 break; 1691 } 1692 } 1693 1694 void 1695 spdk_poller_resume(struct spdk_poller *poller) 1696 { 1697 struct spdk_thread *thread; 1698 1699 thread = spdk_get_thread(); 1700 if (!thread) { 1701 assert(false); 1702 return; 1703 } 1704 1705 if (poller->thread != thread) { 1706 wrong_thread(__func__, poller->name, poller->thread, thread); 1707 return; 1708 } 1709 1710 /* If a poller is paused it has to be removed from the paused pollers 1711 * list and put on the active list or timer tree depending on its 1712 * period_ticks. If a poller is still in the process of being paused, 1713 * we just need to flip its state back to waiting, as it's already on 1714 * the appropriate list or tree. 1715 */ 1716 switch (poller->state) { 1717 case SPDK_POLLER_STATE_PAUSED: 1718 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1719 thread_insert_poller(thread, poller); 1720 /* fallthrough */ 1721 case SPDK_POLLER_STATE_PAUSING: 1722 poller->state = SPDK_POLLER_STATE_WAITING; 1723 break; 1724 case SPDK_POLLER_STATE_RUNNING: 1725 case SPDK_POLLER_STATE_WAITING: 1726 break; 1727 default: 1728 assert(false); 1729 break; 1730 } 1731 } 1732 1733 const char * 1734 spdk_poller_get_name(struct spdk_poller *poller) 1735 { 1736 return poller->name; 1737 } 1738 1739 uint64_t 1740 spdk_poller_get_id(struct spdk_poller *poller) 1741 { 1742 return poller->id; 1743 } 1744 1745 const char * 1746 spdk_poller_get_state_str(struct spdk_poller *poller) 1747 { 1748 switch (poller->state) { 1749 case SPDK_POLLER_STATE_WAITING: 1750 return "waiting"; 1751 case SPDK_POLLER_STATE_RUNNING: 1752 return "running"; 1753 case SPDK_POLLER_STATE_UNREGISTERED: 1754 return "unregistered"; 1755 case SPDK_POLLER_STATE_PAUSING: 1756 return "pausing"; 1757 case SPDK_POLLER_STATE_PAUSED: 1758 return "paused"; 1759 default: 1760 return NULL; 1761 } 1762 } 1763 1764 uint64_t 1765 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1766 { 1767 return poller->period_ticks; 1768 } 1769 1770 void 1771 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1772 { 1773 stats->run_count = poller->run_count; 1774 stats->busy_count = poller->busy_count; 1775 } 1776 1777 struct spdk_poller * 1778 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1779 { 1780 return TAILQ_FIRST(&thread->active_pollers); 1781 } 1782 1783 struct spdk_poller * 1784 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1785 { 1786 return TAILQ_NEXT(prev, tailq); 1787 } 1788 1789 struct spdk_poller * 1790 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1791 { 1792 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1793 } 1794 1795 struct spdk_poller * 1796 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1797 { 1798 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1799 } 1800 1801 struct spdk_poller * 1802 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1803 { 1804 return TAILQ_FIRST(&thread->paused_pollers); 1805 } 1806 1807 struct spdk_poller * 1808 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1809 { 1810 return TAILQ_NEXT(prev, tailq); 1811 } 1812 1813 struct spdk_io_channel * 1814 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1815 { 1816 return RB_MIN(io_channel_tree, &thread->io_channels); 1817 } 1818 1819 struct spdk_io_channel * 1820 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1821 { 1822 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1823 } 1824 1825 struct call_thread { 1826 struct spdk_thread *cur_thread; 1827 spdk_msg_fn fn; 1828 void *ctx; 1829 1830 struct spdk_thread *orig_thread; 1831 spdk_msg_fn cpl; 1832 }; 1833 1834 static void 1835 _on_thread(void *ctx) 1836 { 1837 struct call_thread *ct = ctx; 1838 int rc __attribute__((unused)); 1839 1840 ct->fn(ct->ctx); 1841 1842 pthread_mutex_lock(&g_devlist_mutex); 1843 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1844 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 1845 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 1846 ct->cur_thread->name); 1847 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1848 } 1849 pthread_mutex_unlock(&g_devlist_mutex); 1850 1851 if (!ct->cur_thread) { 1852 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1853 1854 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1855 free(ctx); 1856 } else { 1857 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1858 ct->cur_thread->name); 1859 1860 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1861 } 1862 assert(rc == 0); 1863 } 1864 1865 void 1866 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1867 { 1868 struct call_thread *ct; 1869 struct spdk_thread *thread; 1870 int rc __attribute__((unused)); 1871 1872 ct = calloc(1, sizeof(*ct)); 1873 if (!ct) { 1874 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1875 cpl(ctx); 1876 return; 1877 } 1878 1879 ct->fn = fn; 1880 ct->ctx = ctx; 1881 ct->cpl = cpl; 1882 1883 thread = _get_thread(); 1884 if (!thread) { 1885 SPDK_ERRLOG("No thread allocated\n"); 1886 free(ct); 1887 cpl(ctx); 1888 return; 1889 } 1890 ct->orig_thread = thread; 1891 1892 pthread_mutex_lock(&g_devlist_mutex); 1893 ct->cur_thread = TAILQ_FIRST(&g_threads); 1894 pthread_mutex_unlock(&g_devlist_mutex); 1895 1896 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 1897 ct->orig_thread->name); 1898 1899 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 1900 assert(rc == 0); 1901 } 1902 1903 static inline void 1904 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 1905 { 1906 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1907 return; 1908 } 1909 1910 if (!poller->set_intr_cb_fn) { 1911 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 1912 assert(false); 1913 return; 1914 } 1915 1916 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 1917 } 1918 1919 void 1920 spdk_thread_set_interrupt_mode(bool enable_interrupt) 1921 { 1922 struct spdk_thread *thread = _get_thread(); 1923 struct spdk_poller *poller, *tmp; 1924 1925 assert(thread); 1926 assert(spdk_interrupt_mode_is_enabled()); 1927 1928 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 1929 thread->name, enable_interrupt ? "intr" : "poll", 1930 thread->in_interrupt ? "intr" : "poll"); 1931 1932 if (thread->in_interrupt == enable_interrupt) { 1933 return; 1934 } 1935 1936 /* Set pollers to expected mode */ 1937 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1938 poller_set_interrupt_mode(poller, enable_interrupt); 1939 } 1940 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 1941 poller_set_interrupt_mode(poller, enable_interrupt); 1942 } 1943 /* All paused pollers will go to work in interrupt mode */ 1944 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 1945 poller_set_interrupt_mode(poller, enable_interrupt); 1946 } 1947 1948 thread->in_interrupt = enable_interrupt; 1949 return; 1950 } 1951 1952 static struct io_device * 1953 io_device_get(void *io_device) 1954 { 1955 struct io_device find = {}; 1956 1957 find.io_device = io_device; 1958 return RB_FIND(io_device_tree, &g_io_devices, &find); 1959 } 1960 1961 void 1962 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1963 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1964 const char *name) 1965 { 1966 struct io_device *dev, *tmp; 1967 struct spdk_thread *thread; 1968 1969 assert(io_device != NULL); 1970 assert(create_cb != NULL); 1971 assert(destroy_cb != NULL); 1972 1973 thread = spdk_get_thread(); 1974 if (!thread) { 1975 SPDK_ERRLOG("called from non-SPDK thread\n"); 1976 assert(false); 1977 return; 1978 } 1979 1980 dev = calloc(1, sizeof(struct io_device)); 1981 if (dev == NULL) { 1982 SPDK_ERRLOG("could not allocate io_device\n"); 1983 return; 1984 } 1985 1986 dev->io_device = io_device; 1987 if (name) { 1988 snprintf(dev->name, sizeof(dev->name), "%s", name); 1989 } else { 1990 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1991 } 1992 dev->create_cb = create_cb; 1993 dev->destroy_cb = destroy_cb; 1994 dev->unregister_cb = NULL; 1995 dev->ctx_size = ctx_size; 1996 dev->for_each_count = 0; 1997 dev->unregistered = false; 1998 dev->refcnt = 0; 1999 2000 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 2001 dev->name, dev->io_device, thread->name); 2002 2003 pthread_mutex_lock(&g_devlist_mutex); 2004 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 2005 if (tmp != NULL) { 2006 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 2007 io_device, tmp->name, dev->name); 2008 free(dev); 2009 } 2010 2011 pthread_mutex_unlock(&g_devlist_mutex); 2012 } 2013 2014 static void 2015 _finish_unregister(void *arg) 2016 { 2017 struct io_device *dev = arg; 2018 struct spdk_thread *thread; 2019 2020 thread = spdk_get_thread(); 2021 assert(thread == dev->unregister_thread); 2022 2023 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2024 dev->name, dev->io_device, thread->name); 2025 2026 assert(thread->pending_unregister_count > 0); 2027 thread->pending_unregister_count--; 2028 2029 dev->unregister_cb(dev->io_device); 2030 free(dev); 2031 } 2032 2033 static void 2034 io_device_free(struct io_device *dev) 2035 { 2036 int rc __attribute__((unused)); 2037 2038 if (dev->unregister_cb == NULL) { 2039 free(dev); 2040 } else { 2041 assert(dev->unregister_thread != NULL); 2042 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2043 dev->name, dev->io_device, dev->unregister_thread->name); 2044 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2045 assert(rc == 0); 2046 } 2047 } 2048 2049 void 2050 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2051 { 2052 struct io_device *dev; 2053 uint32_t refcnt; 2054 struct spdk_thread *thread; 2055 2056 thread = spdk_get_thread(); 2057 if (!thread) { 2058 SPDK_ERRLOG("called from non-SPDK thread\n"); 2059 assert(false); 2060 return; 2061 } 2062 2063 pthread_mutex_lock(&g_devlist_mutex); 2064 dev = io_device_get(io_device); 2065 if (!dev) { 2066 SPDK_ERRLOG("io_device %p not found\n", io_device); 2067 assert(false); 2068 pthread_mutex_unlock(&g_devlist_mutex); 2069 return; 2070 } 2071 2072 /* The for_each_count check differentiates the user attempting to unregister the 2073 * device a second time, from the internal call to this function that occurs 2074 * after the for_each_count reaches 0. 2075 */ 2076 if (dev->pending_unregister && dev->for_each_count > 0) { 2077 SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device); 2078 assert(false); 2079 pthread_mutex_unlock(&g_devlist_mutex); 2080 return; 2081 } 2082 2083 dev->unregister_cb = unregister_cb; 2084 dev->unregister_thread = thread; 2085 2086 if (dev->for_each_count > 0) { 2087 SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2088 dev->name, io_device, dev->for_each_count); 2089 dev->pending_unregister = true; 2090 pthread_mutex_unlock(&g_devlist_mutex); 2091 return; 2092 } 2093 2094 dev->unregistered = true; 2095 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2096 refcnt = dev->refcnt; 2097 pthread_mutex_unlock(&g_devlist_mutex); 2098 2099 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2100 dev->name, dev->io_device, thread->name); 2101 2102 if (unregister_cb) { 2103 thread->pending_unregister_count++; 2104 } 2105 2106 if (refcnt > 0) { 2107 /* defer deletion */ 2108 return; 2109 } 2110 2111 io_device_free(dev); 2112 } 2113 2114 const char * 2115 spdk_io_device_get_name(struct io_device *dev) 2116 { 2117 return dev->name; 2118 } 2119 2120 static struct spdk_io_channel * 2121 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2122 { 2123 struct spdk_io_channel find = {}; 2124 2125 find.dev = dev; 2126 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2127 } 2128 2129 struct spdk_io_channel * 2130 spdk_get_io_channel(void *io_device) 2131 { 2132 struct spdk_io_channel *ch; 2133 struct spdk_thread *thread; 2134 struct io_device *dev; 2135 int rc; 2136 2137 pthread_mutex_lock(&g_devlist_mutex); 2138 dev = io_device_get(io_device); 2139 if (dev == NULL) { 2140 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2141 pthread_mutex_unlock(&g_devlist_mutex); 2142 return NULL; 2143 } 2144 2145 thread = _get_thread(); 2146 if (!thread) { 2147 SPDK_ERRLOG("No thread allocated\n"); 2148 pthread_mutex_unlock(&g_devlist_mutex); 2149 return NULL; 2150 } 2151 2152 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2153 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2154 pthread_mutex_unlock(&g_devlist_mutex); 2155 return NULL; 2156 } 2157 2158 ch = thread_get_io_channel(thread, dev); 2159 if (ch != NULL) { 2160 ch->ref++; 2161 2162 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2163 ch, dev->name, dev->io_device, thread->name, ch->ref); 2164 2165 /* 2166 * An I/O channel already exists for this device on this 2167 * thread, so return it. 2168 */ 2169 pthread_mutex_unlock(&g_devlist_mutex); 2170 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2171 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2172 return ch; 2173 } 2174 2175 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2176 if (ch == NULL) { 2177 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2178 pthread_mutex_unlock(&g_devlist_mutex); 2179 return NULL; 2180 } 2181 2182 ch->dev = dev; 2183 ch->destroy_cb = dev->destroy_cb; 2184 ch->thread = thread; 2185 ch->ref = 1; 2186 ch->destroy_ref = 0; 2187 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2188 2189 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2190 ch, dev->name, dev->io_device, thread->name, ch->ref); 2191 2192 dev->refcnt++; 2193 2194 pthread_mutex_unlock(&g_devlist_mutex); 2195 2196 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2197 if (rc != 0) { 2198 pthread_mutex_lock(&g_devlist_mutex); 2199 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2200 dev->refcnt--; 2201 free(ch); 2202 pthread_mutex_unlock(&g_devlist_mutex); 2203 return NULL; 2204 } 2205 2206 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2207 return ch; 2208 } 2209 2210 static void 2211 put_io_channel(void *arg) 2212 { 2213 struct spdk_io_channel *ch = arg; 2214 bool do_remove_dev = true; 2215 struct spdk_thread *thread; 2216 2217 thread = spdk_get_thread(); 2218 if (!thread) { 2219 SPDK_ERRLOG("called from non-SPDK thread\n"); 2220 assert(false); 2221 return; 2222 } 2223 2224 SPDK_DEBUGLOG(thread, 2225 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2226 ch, ch->dev->name, ch->dev->io_device, thread->name); 2227 2228 assert(ch->thread == thread); 2229 2230 ch->destroy_ref--; 2231 2232 if (ch->ref > 0 || ch->destroy_ref > 0) { 2233 /* 2234 * Another reference to the associated io_device was requested 2235 * after this message was sent but before it had a chance to 2236 * execute. 2237 */ 2238 return; 2239 } 2240 2241 pthread_mutex_lock(&g_devlist_mutex); 2242 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2243 pthread_mutex_unlock(&g_devlist_mutex); 2244 2245 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2246 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2247 2248 pthread_mutex_lock(&g_devlist_mutex); 2249 ch->dev->refcnt--; 2250 2251 if (!ch->dev->unregistered) { 2252 do_remove_dev = false; 2253 } 2254 2255 if (ch->dev->refcnt > 0) { 2256 do_remove_dev = false; 2257 } 2258 2259 pthread_mutex_unlock(&g_devlist_mutex); 2260 2261 if (do_remove_dev) { 2262 io_device_free(ch->dev); 2263 } 2264 free(ch); 2265 } 2266 2267 void 2268 spdk_put_io_channel(struct spdk_io_channel *ch) 2269 { 2270 struct spdk_thread *thread; 2271 int rc __attribute__((unused)); 2272 2273 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2274 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2275 2276 thread = spdk_get_thread(); 2277 if (!thread) { 2278 SPDK_ERRLOG("called from non-SPDK thread\n"); 2279 assert(false); 2280 return; 2281 } 2282 2283 if (ch->thread != thread) { 2284 wrong_thread(__func__, "ch", ch->thread, thread); 2285 return; 2286 } 2287 2288 SPDK_DEBUGLOG(thread, 2289 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2290 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2291 2292 ch->ref--; 2293 2294 if (ch->ref == 0) { 2295 ch->destroy_ref++; 2296 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2297 assert(rc == 0); 2298 } 2299 } 2300 2301 struct spdk_io_channel * 2302 spdk_io_channel_from_ctx(void *ctx) 2303 { 2304 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2305 } 2306 2307 struct spdk_thread * 2308 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2309 { 2310 return ch->thread; 2311 } 2312 2313 void * 2314 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2315 { 2316 return ch->dev->io_device; 2317 } 2318 2319 const char * 2320 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2321 { 2322 return spdk_io_device_get_name(ch->dev); 2323 } 2324 2325 int 2326 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2327 { 2328 return ch->ref; 2329 } 2330 2331 struct spdk_io_channel_iter { 2332 void *io_device; 2333 struct io_device *dev; 2334 spdk_channel_msg fn; 2335 int status; 2336 void *ctx; 2337 struct spdk_io_channel *ch; 2338 2339 struct spdk_thread *cur_thread; 2340 2341 struct spdk_thread *orig_thread; 2342 spdk_channel_for_each_cpl cpl; 2343 }; 2344 2345 void * 2346 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2347 { 2348 return i->io_device; 2349 } 2350 2351 struct spdk_io_channel * 2352 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2353 { 2354 return i->ch; 2355 } 2356 2357 void * 2358 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2359 { 2360 return i->ctx; 2361 } 2362 2363 static void 2364 _call_completion(void *ctx) 2365 { 2366 struct spdk_io_channel_iter *i = ctx; 2367 2368 if (i->cpl != NULL) { 2369 i->cpl(i, i->status); 2370 } 2371 free(i); 2372 } 2373 2374 static void 2375 _call_channel(void *ctx) 2376 { 2377 struct spdk_io_channel_iter *i = ctx; 2378 struct spdk_io_channel *ch; 2379 2380 /* 2381 * It is possible that the channel was deleted before this 2382 * message had a chance to execute. If so, skip calling 2383 * the fn() on this thread. 2384 */ 2385 pthread_mutex_lock(&g_devlist_mutex); 2386 ch = thread_get_io_channel(i->cur_thread, i->dev); 2387 pthread_mutex_unlock(&g_devlist_mutex); 2388 2389 if (ch) { 2390 i->fn(i); 2391 } else { 2392 spdk_for_each_channel_continue(i, 0); 2393 } 2394 } 2395 2396 void 2397 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2398 spdk_channel_for_each_cpl cpl) 2399 { 2400 struct spdk_thread *thread; 2401 struct spdk_io_channel *ch; 2402 struct spdk_io_channel_iter *i; 2403 int rc __attribute__((unused)); 2404 2405 i = calloc(1, sizeof(*i)); 2406 if (!i) { 2407 SPDK_ERRLOG("Unable to allocate iterator\n"); 2408 assert(false); 2409 return; 2410 } 2411 2412 i->io_device = io_device; 2413 i->fn = fn; 2414 i->ctx = ctx; 2415 i->cpl = cpl; 2416 i->orig_thread = _get_thread(); 2417 2418 pthread_mutex_lock(&g_devlist_mutex); 2419 i->dev = io_device_get(io_device); 2420 if (i->dev == NULL) { 2421 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2422 assert(false); 2423 i->status = -ENODEV; 2424 goto end; 2425 } 2426 2427 /* Do not allow new for_each operations if we are already waiting to unregister 2428 * the device for other for_each operations to complete. 2429 */ 2430 if (i->dev->pending_unregister) { 2431 SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device); 2432 i->status = -ENODEV; 2433 goto end; 2434 } 2435 2436 TAILQ_FOREACH(thread, &g_threads, tailq) { 2437 ch = thread_get_io_channel(thread, i->dev); 2438 if (ch != NULL) { 2439 ch->dev->for_each_count++; 2440 i->cur_thread = thread; 2441 i->ch = ch; 2442 pthread_mutex_unlock(&g_devlist_mutex); 2443 rc = spdk_thread_send_msg(thread, _call_channel, i); 2444 assert(rc == 0); 2445 return; 2446 } 2447 } 2448 2449 end: 2450 pthread_mutex_unlock(&g_devlist_mutex); 2451 2452 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2453 assert(rc == 0); 2454 } 2455 2456 static void 2457 __pending_unregister(void *arg) 2458 { 2459 struct io_device *dev = arg; 2460 2461 assert(dev->pending_unregister); 2462 assert(dev->for_each_count == 0); 2463 spdk_io_device_unregister(dev->io_device, dev->unregister_cb); 2464 } 2465 2466 void 2467 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2468 { 2469 struct spdk_thread *thread; 2470 struct spdk_io_channel *ch; 2471 struct io_device *dev; 2472 int rc __attribute__((unused)); 2473 2474 assert(i->cur_thread == spdk_get_thread()); 2475 2476 i->status = status; 2477 2478 pthread_mutex_lock(&g_devlist_mutex); 2479 dev = i->dev; 2480 if (status) { 2481 goto end; 2482 } 2483 2484 thread = TAILQ_NEXT(i->cur_thread, tailq); 2485 while (thread) { 2486 ch = thread_get_io_channel(thread, dev); 2487 if (ch != NULL) { 2488 i->cur_thread = thread; 2489 i->ch = ch; 2490 pthread_mutex_unlock(&g_devlist_mutex); 2491 rc = spdk_thread_send_msg(thread, _call_channel, i); 2492 assert(rc == 0); 2493 return; 2494 } 2495 thread = TAILQ_NEXT(thread, tailq); 2496 } 2497 2498 end: 2499 dev->for_each_count--; 2500 i->ch = NULL; 2501 pthread_mutex_unlock(&g_devlist_mutex); 2502 2503 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2504 assert(rc == 0); 2505 2506 pthread_mutex_lock(&g_devlist_mutex); 2507 if (dev->pending_unregister && dev->for_each_count == 0) { 2508 rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev); 2509 assert(rc == 0); 2510 } 2511 pthread_mutex_unlock(&g_devlist_mutex); 2512 } 2513 2514 struct spdk_interrupt { 2515 int efd; 2516 struct spdk_thread *thread; 2517 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 2518 }; 2519 2520 static void 2521 thread_interrupt_destroy(struct spdk_thread *thread) 2522 { 2523 struct spdk_fd_group *fgrp = thread->fgrp; 2524 2525 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2526 2527 if (thread->msg_fd < 0) { 2528 return; 2529 } 2530 2531 spdk_fd_group_remove(fgrp, thread->msg_fd); 2532 close(thread->msg_fd); 2533 thread->msg_fd = -1; 2534 2535 spdk_fd_group_destroy(fgrp); 2536 thread->fgrp = NULL; 2537 } 2538 2539 #ifdef __linux__ 2540 static int 2541 thread_interrupt_msg_process(void *arg) 2542 { 2543 struct spdk_thread *thread = arg; 2544 uint32_t msg_count; 2545 spdk_msg_fn critical_msg; 2546 int rc = 0; 2547 uint64_t notify = 1; 2548 2549 assert(spdk_interrupt_mode_is_enabled()); 2550 2551 /* There may be race between msg_acknowledge and another producer's msg_notify, 2552 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2553 * This can avoid msg notification missing. 2554 */ 2555 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2556 if (rc < 0 && errno != EAGAIN) { 2557 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2558 } 2559 2560 critical_msg = thread->critical_msg; 2561 if (spdk_unlikely(critical_msg != NULL)) { 2562 critical_msg(NULL); 2563 thread->critical_msg = NULL; 2564 rc = 1; 2565 } 2566 2567 msg_count = msg_queue_run_batch(thread, 0); 2568 if (msg_count) { 2569 rc = 1; 2570 } 2571 2572 return rc; 2573 } 2574 2575 static int 2576 thread_interrupt_create(struct spdk_thread *thread) 2577 { 2578 int rc; 2579 2580 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2581 2582 rc = spdk_fd_group_create(&thread->fgrp); 2583 if (rc) { 2584 return rc; 2585 } 2586 2587 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2588 if (thread->msg_fd < 0) { 2589 rc = -errno; 2590 spdk_fd_group_destroy(thread->fgrp); 2591 thread->fgrp = NULL; 2592 2593 return rc; 2594 } 2595 2596 return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd, 2597 thread_interrupt_msg_process, thread); 2598 } 2599 #else 2600 static int 2601 thread_interrupt_create(struct spdk_thread *thread) 2602 { 2603 return -ENOTSUP; 2604 } 2605 #endif 2606 2607 struct spdk_interrupt * 2608 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2609 void *arg, const char *name) 2610 { 2611 struct spdk_thread *thread; 2612 struct spdk_interrupt *intr; 2613 int ret; 2614 2615 thread = spdk_get_thread(); 2616 if (!thread) { 2617 assert(false); 2618 return NULL; 2619 } 2620 2621 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2622 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2623 return NULL; 2624 } 2625 2626 ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name); 2627 2628 if (ret != 0) { 2629 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2630 thread->name, efd, spdk_strerror(-ret)); 2631 return NULL; 2632 } 2633 2634 intr = calloc(1, sizeof(*intr)); 2635 if (intr == NULL) { 2636 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2637 return NULL; 2638 } 2639 2640 if (name) { 2641 snprintf(intr->name, sizeof(intr->name), "%s", name); 2642 } else { 2643 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2644 } 2645 2646 intr->efd = efd; 2647 intr->thread = thread; 2648 2649 return intr; 2650 } 2651 2652 void 2653 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2654 { 2655 struct spdk_thread *thread; 2656 struct spdk_interrupt *intr; 2657 2658 intr = *pintr; 2659 if (intr == NULL) { 2660 return; 2661 } 2662 2663 *pintr = NULL; 2664 2665 thread = spdk_get_thread(); 2666 if (!thread) { 2667 assert(false); 2668 return; 2669 } 2670 2671 if (intr->thread != thread) { 2672 wrong_thread(__func__, intr->name, intr->thread, thread); 2673 return; 2674 } 2675 2676 spdk_fd_group_remove(thread->fgrp, intr->efd); 2677 free(intr); 2678 } 2679 2680 int 2681 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2682 enum spdk_interrupt_event_types event_types) 2683 { 2684 struct spdk_thread *thread; 2685 2686 thread = spdk_get_thread(); 2687 if (!thread) { 2688 assert(false); 2689 return -EINVAL; 2690 } 2691 2692 if (intr->thread != thread) { 2693 wrong_thread(__func__, intr->name, intr->thread, thread); 2694 return -EINVAL; 2695 } 2696 2697 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2698 } 2699 2700 int 2701 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2702 { 2703 return spdk_fd_group_get_fd(thread->fgrp); 2704 } 2705 2706 static bool g_interrupt_mode = false; 2707 2708 int 2709 spdk_interrupt_mode_enable(void) 2710 { 2711 /* It must be called once prior to initializing the threading library. 2712 * g_spdk_msg_mempool will be valid if thread library is initialized. 2713 */ 2714 if (g_spdk_msg_mempool) { 2715 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 2716 return -1; 2717 } 2718 2719 #ifdef __linux__ 2720 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2721 g_interrupt_mode = true; 2722 return 0; 2723 #else 2724 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2725 g_interrupt_mode = false; 2726 return -ENOTSUP; 2727 #endif 2728 } 2729 2730 bool 2731 spdk_interrupt_mode_is_enabled(void) 2732 { 2733 return g_interrupt_mode; 2734 } 2735 2736 SPDK_LOG_REGISTER_COMPONENT(thread) 2737