1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2016 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 8 #include "spdk/env.h" 9 #include "spdk/likely.h" 10 #include "spdk/queue.h" 11 #include "spdk/string.h" 12 #include "spdk/thread.h" 13 #include "spdk/trace.h" 14 #include "spdk/util.h" 15 #include "spdk/fd_group.h" 16 17 #include "spdk/log.h" 18 #include "spdk_internal/thread.h" 19 #include "spdk_internal/usdt.h" 20 #include "thread_internal.h" 21 22 #include "spdk_internal/trace_defs.h" 23 24 #ifdef __linux__ 25 #include <sys/timerfd.h> 26 #include <sys/eventfd.h> 27 #endif 28 29 #define SPDK_MSG_BATCH_SIZE 8 30 #define SPDK_MAX_DEVICE_NAME_LEN 256 31 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 32 #define SPDK_MAX_POLLER_NAME_LEN 256 33 #define SPDK_MAX_THREAD_NAME_LEN 256 34 35 static struct spdk_thread *g_app_thread; 36 37 enum spdk_poller_state { 38 /* The poller is registered with a thread but not currently executing its fn. */ 39 SPDK_POLLER_STATE_WAITING, 40 41 /* The poller is currently running its fn. */ 42 SPDK_POLLER_STATE_RUNNING, 43 44 /* The poller was unregistered during the execution of its fn. */ 45 SPDK_POLLER_STATE_UNREGISTERED, 46 47 /* The poller is in the process of being paused. It will be paused 48 * during the next time it's supposed to be executed. 49 */ 50 SPDK_POLLER_STATE_PAUSING, 51 52 /* The poller is registered but currently paused. It's on the 53 * paused_pollers list. 54 */ 55 SPDK_POLLER_STATE_PAUSED, 56 }; 57 58 struct spdk_poller { 59 TAILQ_ENTRY(spdk_poller) tailq; 60 RB_ENTRY(spdk_poller) node; 61 62 /* Current state of the poller; should only be accessed from the poller's thread. */ 63 enum spdk_poller_state state; 64 65 uint64_t period_ticks; 66 uint64_t next_run_tick; 67 uint64_t run_count; 68 uint64_t busy_count; 69 uint64_t id; 70 spdk_poller_fn fn; 71 void *arg; 72 struct spdk_thread *thread; 73 /* Native interruptfd for period or busy poller */ 74 int interruptfd; 75 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 76 void *set_intr_cb_arg; 77 78 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 79 }; 80 81 enum spdk_thread_state { 82 /* The thread is processing poller and message by spdk_thread_poll(). */ 83 SPDK_THREAD_STATE_RUNNING, 84 85 /* The thread is in the process of termination. It reaps unregistering 86 * poller are releasing I/O channel. 87 */ 88 SPDK_THREAD_STATE_EXITING, 89 90 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 91 SPDK_THREAD_STATE_EXITED, 92 }; 93 94 struct spdk_thread { 95 uint64_t tsc_last; 96 struct spdk_thread_stats stats; 97 /* 98 * Contains pollers actively running on this thread. Pollers 99 * are run round-robin. The thread takes one poller from the head 100 * of the ring, executes it, then puts it back at the tail of 101 * the ring. 102 */ 103 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 104 /** 105 * Contains pollers running on this thread with a periodic timer. 106 */ 107 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 108 struct spdk_poller *first_timed_poller; 109 /* 110 * Contains paused pollers. Pollers on this queue are waiting until 111 * they are resumed (in which case they're put onto the active/timer 112 * queues) or unregistered. 113 */ 114 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 115 struct spdk_ring *messages; 116 int msg_fd; 117 SLIST_HEAD(, spdk_msg) msg_cache; 118 size_t msg_cache_count; 119 spdk_msg_fn critical_msg; 120 uint64_t id; 121 uint64_t next_poller_id; 122 enum spdk_thread_state state; 123 int pending_unregister_count; 124 125 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 126 TAILQ_ENTRY(spdk_thread) tailq; 127 128 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 129 struct spdk_cpuset cpumask; 130 uint64_t exit_timeout_tsc; 131 132 /* Indicates whether this spdk_thread currently runs in interrupt. */ 133 bool in_interrupt; 134 bool poller_unregistered; 135 struct spdk_fd_group *fgrp; 136 137 /* User context allocated at the end */ 138 uint8_t ctx[0]; 139 }; 140 141 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 142 143 static spdk_new_thread_fn g_new_thread_fn = NULL; 144 static spdk_thread_op_fn g_thread_op_fn = NULL; 145 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 146 static size_t g_ctx_sz = 0; 147 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 148 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 149 * SPDK application is required. 150 */ 151 static uint64_t g_thread_id = 1; 152 153 struct io_device { 154 void *io_device; 155 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 156 spdk_io_channel_create_cb create_cb; 157 spdk_io_channel_destroy_cb destroy_cb; 158 spdk_io_device_unregister_cb unregister_cb; 159 struct spdk_thread *unregister_thread; 160 uint32_t ctx_size; 161 uint32_t for_each_count; 162 RB_ENTRY(io_device) node; 163 164 uint32_t refcnt; 165 166 bool pending_unregister; 167 bool unregistered; 168 }; 169 170 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 171 172 static int 173 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 174 { 175 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 176 } 177 178 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 179 180 static int 181 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 182 { 183 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 184 } 185 186 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 187 188 struct spdk_msg { 189 spdk_msg_fn fn; 190 void *arg; 191 192 SLIST_ENTRY(spdk_msg) link; 193 }; 194 195 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 196 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 197 198 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 199 static uint32_t g_thread_count = 0; 200 201 static __thread struct spdk_thread *tls_thread = NULL; 202 203 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 204 { 205 spdk_trace_register_description("THREAD_IOCH_GET", 206 TRACE_THREAD_IOCH_GET, 207 OWNER_NONE, OBJECT_NONE, 0, 208 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 209 spdk_trace_register_description("THREAD_IOCH_PUT", 210 TRACE_THREAD_IOCH_PUT, 211 OWNER_NONE, OBJECT_NONE, 0, 212 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 213 } 214 215 /* 216 * If this compare function returns zero when two next_run_ticks are equal, 217 * the macro RB_INSERT() returns a pointer to the element with the same 218 * next_run_tick. 219 * 220 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 221 * to remove as a parameter. 222 * 223 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 224 * side by returning 1 when two next_run_ticks are equal. 225 */ 226 static inline int 227 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 228 { 229 if (poller1->next_run_tick < poller2->next_run_tick) { 230 return -1; 231 } else { 232 return 1; 233 } 234 } 235 236 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 237 238 static inline struct spdk_thread * 239 _get_thread(void) 240 { 241 return tls_thread; 242 } 243 244 static int 245 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 246 { 247 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 248 249 g_ctx_sz = ctx_sz; 250 251 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 252 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 253 sizeof(struct spdk_msg), 254 0, /* No cache. We do our own. */ 255 SPDK_ENV_SOCKET_ID_ANY); 256 257 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 258 msg_mempool_sz); 259 260 if (!g_spdk_msg_mempool) { 261 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 262 return -ENOMEM; 263 } 264 265 return 0; 266 } 267 268 static void thread_interrupt_destroy(struct spdk_thread *thread); 269 static int thread_interrupt_create(struct spdk_thread *thread); 270 271 static void 272 _free_thread(struct spdk_thread *thread) 273 { 274 struct spdk_io_channel *ch; 275 struct spdk_msg *msg; 276 struct spdk_poller *poller, *ptmp; 277 278 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 279 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 280 thread->name, ch->dev->name); 281 } 282 283 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 284 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 285 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 286 poller->name); 287 } 288 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 289 free(poller); 290 } 291 292 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 293 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 294 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 295 poller->name); 296 } 297 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 298 free(poller); 299 } 300 301 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 302 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 303 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 304 free(poller); 305 } 306 307 pthread_mutex_lock(&g_devlist_mutex); 308 assert(g_thread_count > 0); 309 g_thread_count--; 310 TAILQ_REMOVE(&g_threads, thread, tailq); 311 pthread_mutex_unlock(&g_devlist_mutex); 312 313 msg = SLIST_FIRST(&thread->msg_cache); 314 while (msg != NULL) { 315 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 316 317 assert(thread->msg_cache_count > 0); 318 thread->msg_cache_count--; 319 spdk_mempool_put(g_spdk_msg_mempool, msg); 320 321 msg = SLIST_FIRST(&thread->msg_cache); 322 } 323 324 assert(thread->msg_cache_count == 0); 325 326 if (spdk_interrupt_mode_is_enabled()) { 327 thread_interrupt_destroy(thread); 328 } 329 330 spdk_ring_free(thread->messages); 331 free(thread); 332 } 333 334 int 335 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 336 { 337 assert(g_new_thread_fn == NULL); 338 assert(g_thread_op_fn == NULL); 339 340 if (new_thread_fn == NULL) { 341 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 342 } else { 343 g_new_thread_fn = new_thread_fn; 344 } 345 346 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 347 } 348 349 int 350 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 351 spdk_thread_op_supported_fn thread_op_supported_fn, 352 size_t ctx_sz, size_t msg_mempool_sz) 353 { 354 assert(g_new_thread_fn == NULL); 355 assert(g_thread_op_fn == NULL); 356 assert(g_thread_op_supported_fn == NULL); 357 358 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 359 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 360 return -EINVAL; 361 } 362 363 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 364 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 365 } else { 366 g_thread_op_fn = thread_op_fn; 367 g_thread_op_supported_fn = thread_op_supported_fn; 368 } 369 370 return _thread_lib_init(ctx_sz, msg_mempool_sz); 371 } 372 373 void 374 spdk_thread_lib_fini(void) 375 { 376 struct io_device *dev; 377 378 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 379 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 380 } 381 382 g_new_thread_fn = NULL; 383 g_thread_op_fn = NULL; 384 g_thread_op_supported_fn = NULL; 385 g_ctx_sz = 0; 386 if (g_app_thread != NULL) { 387 _free_thread(g_app_thread); 388 g_app_thread = NULL; 389 } 390 391 if (g_spdk_msg_mempool) { 392 spdk_mempool_free(g_spdk_msg_mempool); 393 g_spdk_msg_mempool = NULL; 394 } 395 } 396 397 struct spdk_thread * 398 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask) 399 { 400 struct spdk_thread *thread, *null_thread; 401 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 402 int rc = 0, i; 403 404 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 405 if (!thread) { 406 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 407 return NULL; 408 } 409 410 if (cpumask) { 411 spdk_cpuset_copy(&thread->cpumask, cpumask); 412 } else { 413 spdk_cpuset_negate(&thread->cpumask); 414 } 415 416 RB_INIT(&thread->io_channels); 417 TAILQ_INIT(&thread->active_pollers); 418 RB_INIT(&thread->timed_pollers); 419 TAILQ_INIT(&thread->paused_pollers); 420 SLIST_INIT(&thread->msg_cache); 421 thread->msg_cache_count = 0; 422 423 thread->tsc_last = spdk_get_ticks(); 424 425 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 426 * ID exceeds UINT64_MAX a warning message is logged 427 */ 428 thread->next_poller_id = 1; 429 430 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 431 if (!thread->messages) { 432 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 433 free(thread); 434 return NULL; 435 } 436 437 /* Fill the local message pool cache. */ 438 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 439 if (rc == 0) { 440 /* If we can't populate the cache it's ok. The cache will get filled 441 * up organically as messages are passed to the thread. */ 442 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 443 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 444 thread->msg_cache_count++; 445 } 446 } 447 448 if (name) { 449 snprintf(thread->name, sizeof(thread->name), "%s", name); 450 } else { 451 snprintf(thread->name, sizeof(thread->name), "%p", thread); 452 } 453 454 pthread_mutex_lock(&g_devlist_mutex); 455 if (g_thread_id == 0) { 456 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 457 pthread_mutex_unlock(&g_devlist_mutex); 458 _free_thread(thread); 459 return NULL; 460 } 461 thread->id = g_thread_id++; 462 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 463 g_thread_count++; 464 pthread_mutex_unlock(&g_devlist_mutex); 465 466 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 467 thread->id, thread->name); 468 469 if (spdk_interrupt_mode_is_enabled()) { 470 thread->in_interrupt = true; 471 rc = thread_interrupt_create(thread); 472 if (rc != 0) { 473 _free_thread(thread); 474 return NULL; 475 } 476 } 477 478 if (g_new_thread_fn) { 479 rc = g_new_thread_fn(thread); 480 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 481 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 482 } 483 484 if (rc != 0) { 485 _free_thread(thread); 486 return NULL; 487 } 488 489 thread->state = SPDK_THREAD_STATE_RUNNING; 490 491 /* If this is the first thread, save it as the app thread. Use an atomic 492 * compare + exchange to guard against crazy users who might try to 493 * call spdk_thread_create() simultaneously on multiple threads. 494 */ 495 null_thread = NULL; 496 __atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false, 497 __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); 498 499 return thread; 500 } 501 502 struct spdk_thread * 503 spdk_thread_get_app_thread(void) 504 { 505 return g_app_thread; 506 } 507 508 void 509 spdk_set_thread(struct spdk_thread *thread) 510 { 511 tls_thread = thread; 512 } 513 514 static void 515 thread_exit(struct spdk_thread *thread, uint64_t now) 516 { 517 struct spdk_poller *poller; 518 struct spdk_io_channel *ch; 519 520 if (now >= thread->exit_timeout_tsc) { 521 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 522 thread->name); 523 goto exited; 524 } 525 526 if (spdk_ring_count(thread->messages) > 0) { 527 SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name); 528 return; 529 } 530 531 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 532 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 533 SPDK_INFOLOG(thread, 534 "thread %s still has active poller %s\n", 535 thread->name, poller->name); 536 return; 537 } 538 } 539 540 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 541 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 542 SPDK_INFOLOG(thread, 543 "thread %s still has active timed poller %s\n", 544 thread->name, poller->name); 545 return; 546 } 547 } 548 549 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 550 SPDK_INFOLOG(thread, 551 "thread %s still has paused poller %s\n", 552 thread->name, poller->name); 553 return; 554 } 555 556 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 557 SPDK_INFOLOG(thread, 558 "thread %s still has channel for io_device %s\n", 559 thread->name, ch->dev->name); 560 return; 561 } 562 563 if (thread->pending_unregister_count > 0) { 564 SPDK_INFOLOG(thread, 565 "thread %s is still unregistering io_devices\n", 566 thread->name); 567 return; 568 } 569 570 exited: 571 thread->state = SPDK_THREAD_STATE_EXITED; 572 if (spdk_unlikely(thread->in_interrupt)) { 573 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 574 } 575 } 576 577 int 578 spdk_thread_exit(struct spdk_thread *thread) 579 { 580 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 581 582 assert(tls_thread == thread); 583 584 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 585 SPDK_INFOLOG(thread, 586 "thread %s is already exiting\n", 587 thread->name); 588 return 0; 589 } 590 591 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 592 SPDK_THREAD_EXIT_TIMEOUT_SEC); 593 thread->state = SPDK_THREAD_STATE_EXITING; 594 return 0; 595 } 596 597 bool 598 spdk_thread_is_exited(struct spdk_thread *thread) 599 { 600 return thread->state == SPDK_THREAD_STATE_EXITED; 601 } 602 603 void 604 spdk_thread_destroy(struct spdk_thread *thread) 605 { 606 assert(thread != NULL); 607 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 608 609 assert(thread->state == SPDK_THREAD_STATE_EXITED); 610 611 if (tls_thread == thread) { 612 tls_thread = NULL; 613 } 614 615 /* To be safe, do not free the app thread until spdk_thread_lib_fini(). */ 616 if (thread != g_app_thread) { 617 _free_thread(thread); 618 } 619 } 620 621 void * 622 spdk_thread_get_ctx(struct spdk_thread *thread) 623 { 624 if (g_ctx_sz > 0) { 625 return thread->ctx; 626 } 627 628 return NULL; 629 } 630 631 struct spdk_cpuset * 632 spdk_thread_get_cpumask(struct spdk_thread *thread) 633 { 634 return &thread->cpumask; 635 } 636 637 int 638 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 639 { 640 struct spdk_thread *thread; 641 642 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 643 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 644 assert(false); 645 return -ENOTSUP; 646 } 647 648 thread = spdk_get_thread(); 649 if (!thread) { 650 SPDK_ERRLOG("Called from non-SPDK thread\n"); 651 assert(false); 652 return -EINVAL; 653 } 654 655 spdk_cpuset_copy(&thread->cpumask, cpumask); 656 657 /* Invoke framework's reschedule operation. If this function is called multiple times 658 * in a single spdk_thread_poll() context, the last cpumask will be used in the 659 * reschedule operation. 660 */ 661 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 662 663 return 0; 664 } 665 666 struct spdk_thread * 667 spdk_thread_get_from_ctx(void *ctx) 668 { 669 if (ctx == NULL) { 670 assert(false); 671 return NULL; 672 } 673 674 assert(g_ctx_sz > 0); 675 676 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 677 } 678 679 static inline uint32_t 680 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 681 { 682 unsigned count, i; 683 void *messages[SPDK_MSG_BATCH_SIZE]; 684 uint64_t notify = 1; 685 int rc; 686 687 #ifdef DEBUG 688 /* 689 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 690 * so we will never actually read uninitialized data from events, but just to be sure 691 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 692 */ 693 memset(messages, 0, sizeof(messages)); 694 #endif 695 696 if (max_msgs > 0) { 697 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 698 } else { 699 max_msgs = SPDK_MSG_BATCH_SIZE; 700 } 701 702 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 703 if (spdk_unlikely(thread->in_interrupt) && 704 spdk_ring_count(thread->messages) != 0) { 705 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 706 if (rc < 0) { 707 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 708 } 709 } 710 if (count == 0) { 711 return 0; 712 } 713 714 for (i = 0; i < count; i++) { 715 struct spdk_msg *msg = messages[i]; 716 717 assert(msg != NULL); 718 719 SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg); 720 721 msg->fn(msg->arg); 722 723 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 724 /* Insert the messages at the head. We want to re-use the hot 725 * ones. */ 726 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 727 thread->msg_cache_count++; 728 } else { 729 spdk_mempool_put(g_spdk_msg_mempool, msg); 730 } 731 } 732 733 return count; 734 } 735 736 static void 737 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 738 { 739 struct spdk_poller *tmp __attribute__((unused)); 740 741 poller->next_run_tick = now + poller->period_ticks; 742 743 /* 744 * Insert poller in the thread's timed_pollers tree by next scheduled run time 745 * as its key. 746 */ 747 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 748 assert(tmp == NULL); 749 750 /* Update the cache only if it is empty or the inserted poller is earlier than it. 751 * RB_MIN() is not necessary here because all pollers, which has exactly the same 752 * next_run_tick as the existing poller, are inserted on the right side. 753 */ 754 if (thread->first_timed_poller == NULL || 755 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 756 thread->first_timed_poller = poller; 757 } 758 } 759 760 static inline void 761 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 762 { 763 struct spdk_poller *tmp __attribute__((unused)); 764 765 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 766 assert(tmp != NULL); 767 768 /* This function is not used in any case that is performance critical. 769 * Update the cache simply by RB_MIN() if it needs to be changed. 770 */ 771 if (thread->first_timed_poller == poller) { 772 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 773 } 774 } 775 776 static void 777 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 778 { 779 if (poller->period_ticks) { 780 poller_insert_timer(thread, poller, spdk_get_ticks()); 781 } else { 782 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 783 } 784 } 785 786 static inline void 787 thread_update_stats(struct spdk_thread *thread, uint64_t end, 788 uint64_t start, int rc) 789 { 790 if (rc == 0) { 791 /* Poller status idle */ 792 thread->stats.idle_tsc += end - start; 793 } else if (rc > 0) { 794 /* Poller status busy */ 795 thread->stats.busy_tsc += end - start; 796 } 797 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 798 thread->tsc_last = end; 799 } 800 801 static inline int 802 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 803 { 804 int rc; 805 806 switch (poller->state) { 807 case SPDK_POLLER_STATE_UNREGISTERED: 808 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 809 free(poller); 810 return 0; 811 case SPDK_POLLER_STATE_PAUSING: 812 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 813 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 814 poller->state = SPDK_POLLER_STATE_PAUSED; 815 return 0; 816 case SPDK_POLLER_STATE_WAITING: 817 break; 818 default: 819 assert(false); 820 break; 821 } 822 823 poller->state = SPDK_POLLER_STATE_RUNNING; 824 rc = poller->fn(poller->arg); 825 826 poller->run_count++; 827 if (rc > 0) { 828 poller->busy_count++; 829 } 830 831 #ifdef DEBUG 832 if (rc == -1) { 833 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 834 } 835 #endif 836 837 switch (poller->state) { 838 case SPDK_POLLER_STATE_UNREGISTERED: 839 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 840 free(poller); 841 break; 842 case SPDK_POLLER_STATE_PAUSING: 843 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 844 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 845 poller->state = SPDK_POLLER_STATE_PAUSED; 846 break; 847 case SPDK_POLLER_STATE_PAUSED: 848 case SPDK_POLLER_STATE_WAITING: 849 break; 850 case SPDK_POLLER_STATE_RUNNING: 851 poller->state = SPDK_POLLER_STATE_WAITING; 852 break; 853 default: 854 assert(false); 855 break; 856 } 857 858 return rc; 859 } 860 861 static inline int 862 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 863 uint64_t now) 864 { 865 int rc; 866 867 switch (poller->state) { 868 case SPDK_POLLER_STATE_UNREGISTERED: 869 free(poller); 870 return 0; 871 case SPDK_POLLER_STATE_PAUSING: 872 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 873 poller->state = SPDK_POLLER_STATE_PAUSED; 874 return 0; 875 case SPDK_POLLER_STATE_WAITING: 876 break; 877 default: 878 assert(false); 879 break; 880 } 881 882 poller->state = SPDK_POLLER_STATE_RUNNING; 883 rc = poller->fn(poller->arg); 884 885 poller->run_count++; 886 if (rc > 0) { 887 poller->busy_count++; 888 } 889 890 #ifdef DEBUG 891 if (rc == -1) { 892 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 893 } 894 #endif 895 896 switch (poller->state) { 897 case SPDK_POLLER_STATE_UNREGISTERED: 898 free(poller); 899 break; 900 case SPDK_POLLER_STATE_PAUSING: 901 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 902 poller->state = SPDK_POLLER_STATE_PAUSED; 903 break; 904 case SPDK_POLLER_STATE_PAUSED: 905 break; 906 case SPDK_POLLER_STATE_RUNNING: 907 poller->state = SPDK_POLLER_STATE_WAITING; 908 /* fallthrough */ 909 case SPDK_POLLER_STATE_WAITING: 910 poller_insert_timer(thread, poller, now); 911 break; 912 default: 913 assert(false); 914 break; 915 } 916 917 return rc; 918 } 919 920 static int 921 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 922 { 923 uint32_t msg_count; 924 struct spdk_poller *poller, *tmp; 925 spdk_msg_fn critical_msg; 926 int rc = 0; 927 928 thread->tsc_last = now; 929 930 critical_msg = thread->critical_msg; 931 if (spdk_unlikely(critical_msg != NULL)) { 932 critical_msg(NULL); 933 thread->critical_msg = NULL; 934 rc = 1; 935 } 936 937 msg_count = msg_queue_run_batch(thread, max_msgs); 938 if (msg_count) { 939 rc = 1; 940 } 941 942 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 943 active_pollers_head, tailq, tmp) { 944 int poller_rc; 945 946 poller_rc = thread_execute_poller(thread, poller); 947 if (poller_rc > rc) { 948 rc = poller_rc; 949 } 950 } 951 952 poller = thread->first_timed_poller; 953 while (poller != NULL) { 954 int timer_rc = 0; 955 956 if (now < poller->next_run_tick) { 957 break; 958 } 959 960 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 961 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 962 963 /* Update the cache to the next timed poller in the list 964 * only if the current poller is still the closest, otherwise, 965 * do nothing because the cache has been already updated. 966 */ 967 if (thread->first_timed_poller == poller) { 968 thread->first_timed_poller = tmp; 969 } 970 971 timer_rc = thread_execute_timed_poller(thread, poller, now); 972 if (timer_rc > rc) { 973 rc = timer_rc; 974 } 975 976 poller = tmp; 977 } 978 979 return rc; 980 } 981 982 int 983 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 984 { 985 struct spdk_thread *orig_thread; 986 int rc; 987 uint64_t notify = 1; 988 989 orig_thread = _get_thread(); 990 tls_thread = thread; 991 992 if (now == 0) { 993 now = spdk_get_ticks(); 994 } 995 996 if (spdk_likely(!thread->in_interrupt)) { 997 rc = thread_poll(thread, max_msgs, now); 998 if (spdk_unlikely(thread->in_interrupt)) { 999 /* The thread transitioned to interrupt mode during the above poll. 1000 * Poll it one more time in case that during the transition time 1001 * there is msg received without notification. 1002 */ 1003 rc = thread_poll(thread, max_msgs, now); 1004 } 1005 } else { 1006 /* Non-block wait on thread's fd_group */ 1007 rc = spdk_fd_group_wait(thread->fgrp, 0); 1008 if (spdk_unlikely(!thread->in_interrupt)) { 1009 /* The thread transitioned to poll mode in a msg during the above processing. 1010 * Clear msg_fd since thread messages will be polled directly in poll mode. 1011 */ 1012 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 1013 if (rc < 0 && errno != EAGAIN) { 1014 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 1015 } 1016 } 1017 1018 /* Reap unregistered pollers out of poller execution in intr mode */ 1019 if (spdk_unlikely(thread->poller_unregistered)) { 1020 struct spdk_poller *poller, *tmp; 1021 1022 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1023 active_pollers_head, tailq, tmp) { 1024 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1025 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1026 free(poller); 1027 } 1028 } 1029 1030 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1031 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1032 poller_remove_timer(thread, poller); 1033 free(poller); 1034 } 1035 } 1036 1037 thread->poller_unregistered = false; 1038 } 1039 } 1040 1041 1042 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1043 thread_exit(thread, now); 1044 } 1045 1046 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1047 1048 tls_thread = orig_thread; 1049 1050 return rc; 1051 } 1052 1053 uint64_t 1054 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1055 { 1056 struct spdk_poller *poller; 1057 1058 poller = thread->first_timed_poller; 1059 if (poller) { 1060 return poller->next_run_tick; 1061 } 1062 1063 return 0; 1064 } 1065 1066 int 1067 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1068 { 1069 return !TAILQ_EMPTY(&thread->active_pollers); 1070 } 1071 1072 static bool 1073 thread_has_unpaused_pollers(struct spdk_thread *thread) 1074 { 1075 if (TAILQ_EMPTY(&thread->active_pollers) && 1076 RB_EMPTY(&thread->timed_pollers)) { 1077 return false; 1078 } 1079 1080 return true; 1081 } 1082 1083 bool 1084 spdk_thread_has_pollers(struct spdk_thread *thread) 1085 { 1086 if (!thread_has_unpaused_pollers(thread) && 1087 TAILQ_EMPTY(&thread->paused_pollers)) { 1088 return false; 1089 } 1090 1091 return true; 1092 } 1093 1094 bool 1095 spdk_thread_is_idle(struct spdk_thread *thread) 1096 { 1097 if (spdk_ring_count(thread->messages) || 1098 thread_has_unpaused_pollers(thread) || 1099 thread->critical_msg != NULL) { 1100 return false; 1101 } 1102 1103 return true; 1104 } 1105 1106 uint32_t 1107 spdk_thread_get_count(void) 1108 { 1109 /* 1110 * Return cached value of the current thread count. We could acquire the 1111 * lock and iterate through the TAILQ of threads to count them, but that 1112 * count could still be invalidated after we release the lock. 1113 */ 1114 return g_thread_count; 1115 } 1116 1117 struct spdk_thread * 1118 spdk_get_thread(void) 1119 { 1120 return _get_thread(); 1121 } 1122 1123 const char * 1124 spdk_thread_get_name(const struct spdk_thread *thread) 1125 { 1126 return thread->name; 1127 } 1128 1129 uint64_t 1130 spdk_thread_get_id(const struct spdk_thread *thread) 1131 { 1132 return thread->id; 1133 } 1134 1135 struct spdk_thread * 1136 spdk_thread_get_by_id(uint64_t id) 1137 { 1138 struct spdk_thread *thread; 1139 1140 if (id == 0 || id >= g_thread_id) { 1141 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1142 return NULL; 1143 } 1144 pthread_mutex_lock(&g_devlist_mutex); 1145 TAILQ_FOREACH(thread, &g_threads, tailq) { 1146 if (thread->id == id) { 1147 break; 1148 } 1149 } 1150 pthread_mutex_unlock(&g_devlist_mutex); 1151 return thread; 1152 } 1153 1154 int 1155 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1156 { 1157 struct spdk_thread *thread; 1158 1159 thread = _get_thread(); 1160 if (!thread) { 1161 SPDK_ERRLOG("No thread allocated\n"); 1162 return -EINVAL; 1163 } 1164 1165 if (stats == NULL) { 1166 return -EINVAL; 1167 } 1168 1169 *stats = thread->stats; 1170 1171 return 0; 1172 } 1173 1174 uint64_t 1175 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1176 { 1177 if (thread == NULL) { 1178 thread = _get_thread(); 1179 } 1180 1181 return thread->tsc_last; 1182 } 1183 1184 static inline int 1185 thread_send_msg_notification(const struct spdk_thread *target_thread) 1186 { 1187 uint64_t notify = 1; 1188 int rc; 1189 1190 /* Not necessary to do notification if interrupt facility is not enabled */ 1191 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1192 return 0; 1193 } 1194 1195 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1196 * after sending thread msg, it is necessary to check whether target thread runs in 1197 * interrupt mode and then decide whether do event notification. 1198 */ 1199 if (spdk_unlikely(target_thread->in_interrupt)) { 1200 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1201 if (rc < 0) { 1202 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1203 return -EIO; 1204 } 1205 } 1206 1207 return 0; 1208 } 1209 1210 int 1211 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1212 { 1213 struct spdk_thread *local_thread; 1214 struct spdk_msg *msg; 1215 int rc; 1216 1217 assert(thread != NULL); 1218 1219 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1220 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1221 return -EIO; 1222 } 1223 1224 local_thread = _get_thread(); 1225 1226 msg = NULL; 1227 if (local_thread != NULL) { 1228 if (local_thread->msg_cache_count > 0) { 1229 msg = SLIST_FIRST(&local_thread->msg_cache); 1230 assert(msg != NULL); 1231 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1232 local_thread->msg_cache_count--; 1233 } 1234 } 1235 1236 if (msg == NULL) { 1237 msg = spdk_mempool_get(g_spdk_msg_mempool); 1238 if (!msg) { 1239 SPDK_ERRLOG("msg could not be allocated\n"); 1240 return -ENOMEM; 1241 } 1242 } 1243 1244 msg->fn = fn; 1245 msg->arg = ctx; 1246 1247 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1248 if (rc != 1) { 1249 SPDK_ERRLOG("msg could not be enqueued\n"); 1250 spdk_mempool_put(g_spdk_msg_mempool, msg); 1251 return -EIO; 1252 } 1253 1254 return thread_send_msg_notification(thread); 1255 } 1256 1257 int 1258 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1259 { 1260 spdk_msg_fn expected = NULL; 1261 1262 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1263 __ATOMIC_SEQ_CST)) { 1264 return -EIO; 1265 } 1266 1267 return thread_send_msg_notification(thread); 1268 } 1269 1270 #ifdef __linux__ 1271 static int 1272 interrupt_timerfd_process(void *arg) 1273 { 1274 struct spdk_poller *poller = arg; 1275 uint64_t exp; 1276 int rc; 1277 1278 /* clear the level of interval timer */ 1279 rc = read(poller->interruptfd, &exp, sizeof(exp)); 1280 if (rc < 0) { 1281 if (rc == -EAGAIN) { 1282 return 0; 1283 } 1284 1285 return rc; 1286 } 1287 1288 SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg); 1289 1290 return poller->fn(poller->arg); 1291 } 1292 1293 static int 1294 period_poller_interrupt_init(struct spdk_poller *poller) 1295 { 1296 struct spdk_fd_group *fgrp = poller->thread->fgrp; 1297 int timerfd; 1298 int rc; 1299 1300 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1301 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1302 if (timerfd < 0) { 1303 return -errno; 1304 } 1305 1306 rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller); 1307 if (rc < 0) { 1308 close(timerfd); 1309 return rc; 1310 } 1311 1312 poller->interruptfd = timerfd; 1313 return 0; 1314 } 1315 1316 static void 1317 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1318 { 1319 int timerfd = poller->interruptfd; 1320 uint64_t now_tick = spdk_get_ticks(); 1321 uint64_t ticks = spdk_get_ticks_hz(); 1322 int ret; 1323 struct itimerspec new_tv = {}; 1324 struct itimerspec old_tv = {}; 1325 1326 assert(poller->period_ticks != 0); 1327 assert(timerfd >= 0); 1328 1329 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1330 interrupt_mode ? "interrupt" : "poll"); 1331 1332 if (interrupt_mode) { 1333 /* Set repeated timer expiration */ 1334 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1335 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1336 1337 /* Update next timer expiration */ 1338 if (poller->next_run_tick == 0) { 1339 poller->next_run_tick = now_tick + poller->period_ticks; 1340 } else if (poller->next_run_tick < now_tick) { 1341 poller->next_run_tick = now_tick; 1342 } 1343 1344 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1345 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1346 1347 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1348 if (ret < 0) { 1349 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1350 assert(false); 1351 } 1352 } else { 1353 /* Disarm the timer */ 1354 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1355 if (ret < 0) { 1356 /* timerfd_settime's failure indicates that the timerfd is in error */ 1357 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1358 assert(false); 1359 } 1360 1361 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1362 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1363 */ 1364 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1365 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1366 poller_remove_timer(poller->thread, poller); 1367 poller_insert_timer(poller->thread, poller, now_tick); 1368 } 1369 } 1370 1371 static void 1372 poller_interrupt_fini(struct spdk_poller *poller) 1373 { 1374 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1375 assert(poller->interruptfd >= 0); 1376 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1377 close(poller->interruptfd); 1378 poller->interruptfd = -1; 1379 } 1380 1381 static int 1382 busy_poller_interrupt_init(struct spdk_poller *poller) 1383 { 1384 int busy_efd; 1385 int rc; 1386 1387 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1388 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1389 if (busy_efd < 0) { 1390 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1391 return -errno; 1392 } 1393 1394 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, 1395 poller->fn, poller->arg, poller->name); 1396 if (rc < 0) { 1397 close(busy_efd); 1398 return rc; 1399 } 1400 1401 poller->interruptfd = busy_efd; 1402 return 0; 1403 } 1404 1405 static void 1406 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1407 { 1408 int busy_efd = poller->interruptfd; 1409 uint64_t notify = 1; 1410 int rc __attribute__((unused)); 1411 1412 assert(busy_efd >= 0); 1413 1414 if (interrupt_mode) { 1415 /* Write without read on eventfd will get it repeatedly triggered. */ 1416 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1417 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1418 } 1419 } else { 1420 /* Read on eventfd will clear its level triggering. */ 1421 rc = read(busy_efd, ¬ify, sizeof(notify)); 1422 } 1423 } 1424 1425 #else 1426 1427 static int 1428 period_poller_interrupt_init(struct spdk_poller *poller) 1429 { 1430 return -ENOTSUP; 1431 } 1432 1433 static void 1434 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1435 { 1436 } 1437 1438 static void 1439 poller_interrupt_fini(struct spdk_poller *poller) 1440 { 1441 } 1442 1443 static int 1444 busy_poller_interrupt_init(struct spdk_poller *poller) 1445 { 1446 return -ENOTSUP; 1447 } 1448 1449 static void 1450 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1451 { 1452 } 1453 1454 #endif 1455 1456 void 1457 spdk_poller_register_interrupt(struct spdk_poller *poller, 1458 spdk_poller_set_interrupt_mode_cb cb_fn, 1459 void *cb_arg) 1460 { 1461 assert(poller != NULL); 1462 assert(cb_fn != NULL); 1463 assert(spdk_get_thread() == poller->thread); 1464 1465 if (!spdk_interrupt_mode_is_enabled()) { 1466 return; 1467 } 1468 1469 /* when a poller is created we don't know if the user is ever going to 1470 * enable interrupts on it by calling this function, so the poller 1471 * registration function has to immediately create a interruptfd. 1472 * When this function does get called by user, we have to then destroy 1473 * that interruptfd. 1474 */ 1475 if (poller->set_intr_cb_fn && poller->interruptfd >= 0) { 1476 poller_interrupt_fini(poller); 1477 } 1478 1479 poller->set_intr_cb_fn = cb_fn; 1480 poller->set_intr_cb_arg = cb_arg; 1481 1482 /* Set poller into interrupt mode if thread is in interrupt. */ 1483 if (poller->thread->in_interrupt) { 1484 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1485 } 1486 } 1487 1488 static uint64_t 1489 convert_us_to_ticks(uint64_t us) 1490 { 1491 uint64_t quotient, remainder, ticks; 1492 1493 if (us) { 1494 quotient = us / SPDK_SEC_TO_USEC; 1495 remainder = us % SPDK_SEC_TO_USEC; 1496 ticks = spdk_get_ticks_hz(); 1497 1498 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1499 } else { 1500 return 0; 1501 } 1502 } 1503 1504 static struct spdk_poller * 1505 poller_register(spdk_poller_fn fn, 1506 void *arg, 1507 uint64_t period_microseconds, 1508 const char *name) 1509 { 1510 struct spdk_thread *thread; 1511 struct spdk_poller *poller; 1512 1513 thread = spdk_get_thread(); 1514 if (!thread) { 1515 assert(false); 1516 return NULL; 1517 } 1518 1519 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1520 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1521 return NULL; 1522 } 1523 1524 poller = calloc(1, sizeof(*poller)); 1525 if (poller == NULL) { 1526 SPDK_ERRLOG("Poller memory allocation failed\n"); 1527 return NULL; 1528 } 1529 1530 if (name) { 1531 snprintf(poller->name, sizeof(poller->name), "%s", name); 1532 } else { 1533 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1534 } 1535 1536 poller->state = SPDK_POLLER_STATE_WAITING; 1537 poller->fn = fn; 1538 poller->arg = arg; 1539 poller->thread = thread; 1540 poller->interruptfd = -1; 1541 if (thread->next_poller_id == 0) { 1542 SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n"); 1543 thread->next_poller_id = 1; 1544 } 1545 poller->id = thread->next_poller_id++; 1546 1547 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1548 1549 if (spdk_interrupt_mode_is_enabled()) { 1550 int rc; 1551 1552 if (period_microseconds) { 1553 rc = period_poller_interrupt_init(poller); 1554 if (rc < 0) { 1555 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1556 free(poller); 1557 return NULL; 1558 } 1559 1560 spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL); 1561 } else { 1562 /* If the poller doesn't have a period, create interruptfd that's always 1563 * busy automatically when running in interrupt mode. 1564 */ 1565 rc = busy_poller_interrupt_init(poller); 1566 if (rc > 0) { 1567 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1568 free(poller); 1569 return NULL; 1570 } 1571 1572 spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL); 1573 } 1574 } 1575 1576 thread_insert_poller(thread, poller); 1577 1578 return poller; 1579 } 1580 1581 struct spdk_poller * 1582 spdk_poller_register(spdk_poller_fn fn, 1583 void *arg, 1584 uint64_t period_microseconds) 1585 { 1586 return poller_register(fn, arg, period_microseconds, NULL); 1587 } 1588 1589 struct spdk_poller * 1590 spdk_poller_register_named(spdk_poller_fn fn, 1591 void *arg, 1592 uint64_t period_microseconds, 1593 const char *name) 1594 { 1595 return poller_register(fn, arg, period_microseconds, name); 1596 } 1597 1598 static void 1599 wrong_thread(const char *func, const char *name, struct spdk_thread *thread, 1600 struct spdk_thread *curthread) 1601 { 1602 if (thread == NULL) { 1603 SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name); 1604 abort(); 1605 } 1606 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be " 1607 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id, 1608 thread->name, thread->id); 1609 assert(false); 1610 } 1611 1612 void 1613 spdk_poller_unregister(struct spdk_poller **ppoller) 1614 { 1615 struct spdk_thread *thread; 1616 struct spdk_poller *poller; 1617 1618 poller = *ppoller; 1619 if (poller == NULL) { 1620 return; 1621 } 1622 1623 *ppoller = NULL; 1624 1625 thread = spdk_get_thread(); 1626 if (!thread) { 1627 assert(false); 1628 return; 1629 } 1630 1631 if (poller->thread != thread) { 1632 wrong_thread(__func__, poller->name, poller->thread, thread); 1633 return; 1634 } 1635 1636 if (spdk_interrupt_mode_is_enabled()) { 1637 /* Release the interrupt resource for period or busy poller */ 1638 if (poller->interruptfd >= 0) { 1639 poller_interrupt_fini(poller); 1640 } 1641 1642 /* Mark there is poller unregistered. Then unregistered pollers will 1643 * get reaped by spdk_thread_poll also in intr mode. 1644 */ 1645 thread->poller_unregistered = true; 1646 } 1647 1648 /* If the poller was paused, put it on the active_pollers list so that 1649 * its unregistration can be processed by spdk_thread_poll(). 1650 */ 1651 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1652 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1653 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1654 poller->period_ticks = 0; 1655 } 1656 1657 /* Simply set the state to unregistered. The poller will get cleaned up 1658 * in a subsequent call to spdk_thread_poll(). 1659 */ 1660 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1661 } 1662 1663 void 1664 spdk_poller_pause(struct spdk_poller *poller) 1665 { 1666 struct spdk_thread *thread; 1667 1668 thread = spdk_get_thread(); 1669 if (!thread) { 1670 assert(false); 1671 return; 1672 } 1673 1674 if (poller->thread != thread) { 1675 wrong_thread(__func__, poller->name, poller->thread, thread); 1676 return; 1677 } 1678 1679 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1680 * spdk_thread_poll() move it. It allows a poller to be paused from 1681 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1682 * iteration, or from within itself without breaking the logic to always 1683 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1684 */ 1685 switch (poller->state) { 1686 case SPDK_POLLER_STATE_PAUSED: 1687 case SPDK_POLLER_STATE_PAUSING: 1688 break; 1689 case SPDK_POLLER_STATE_RUNNING: 1690 case SPDK_POLLER_STATE_WAITING: 1691 poller->state = SPDK_POLLER_STATE_PAUSING; 1692 break; 1693 default: 1694 assert(false); 1695 break; 1696 } 1697 } 1698 1699 void 1700 spdk_poller_resume(struct spdk_poller *poller) 1701 { 1702 struct spdk_thread *thread; 1703 1704 thread = spdk_get_thread(); 1705 if (!thread) { 1706 assert(false); 1707 return; 1708 } 1709 1710 if (poller->thread != thread) { 1711 wrong_thread(__func__, poller->name, poller->thread, thread); 1712 return; 1713 } 1714 1715 /* If a poller is paused it has to be removed from the paused pollers 1716 * list and put on the active list or timer tree depending on its 1717 * period_ticks. If a poller is still in the process of being paused, 1718 * we just need to flip its state back to waiting, as it's already on 1719 * the appropriate list or tree. 1720 */ 1721 switch (poller->state) { 1722 case SPDK_POLLER_STATE_PAUSED: 1723 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1724 thread_insert_poller(thread, poller); 1725 /* fallthrough */ 1726 case SPDK_POLLER_STATE_PAUSING: 1727 poller->state = SPDK_POLLER_STATE_WAITING; 1728 break; 1729 case SPDK_POLLER_STATE_RUNNING: 1730 case SPDK_POLLER_STATE_WAITING: 1731 break; 1732 default: 1733 assert(false); 1734 break; 1735 } 1736 } 1737 1738 const char * 1739 spdk_poller_get_name(struct spdk_poller *poller) 1740 { 1741 return poller->name; 1742 } 1743 1744 uint64_t 1745 spdk_poller_get_id(struct spdk_poller *poller) 1746 { 1747 return poller->id; 1748 } 1749 1750 const char * 1751 spdk_poller_get_state_str(struct spdk_poller *poller) 1752 { 1753 switch (poller->state) { 1754 case SPDK_POLLER_STATE_WAITING: 1755 return "waiting"; 1756 case SPDK_POLLER_STATE_RUNNING: 1757 return "running"; 1758 case SPDK_POLLER_STATE_UNREGISTERED: 1759 return "unregistered"; 1760 case SPDK_POLLER_STATE_PAUSING: 1761 return "pausing"; 1762 case SPDK_POLLER_STATE_PAUSED: 1763 return "paused"; 1764 default: 1765 return NULL; 1766 } 1767 } 1768 1769 uint64_t 1770 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1771 { 1772 return poller->period_ticks; 1773 } 1774 1775 void 1776 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1777 { 1778 stats->run_count = poller->run_count; 1779 stats->busy_count = poller->busy_count; 1780 } 1781 1782 struct spdk_poller * 1783 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1784 { 1785 return TAILQ_FIRST(&thread->active_pollers); 1786 } 1787 1788 struct spdk_poller * 1789 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1790 { 1791 return TAILQ_NEXT(prev, tailq); 1792 } 1793 1794 struct spdk_poller * 1795 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1796 { 1797 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1798 } 1799 1800 struct spdk_poller * 1801 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1802 { 1803 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1804 } 1805 1806 struct spdk_poller * 1807 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1808 { 1809 return TAILQ_FIRST(&thread->paused_pollers); 1810 } 1811 1812 struct spdk_poller * 1813 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1814 { 1815 return TAILQ_NEXT(prev, tailq); 1816 } 1817 1818 struct spdk_io_channel * 1819 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1820 { 1821 return RB_MIN(io_channel_tree, &thread->io_channels); 1822 } 1823 1824 struct spdk_io_channel * 1825 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1826 { 1827 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1828 } 1829 1830 struct call_thread { 1831 struct spdk_thread *cur_thread; 1832 spdk_msg_fn fn; 1833 void *ctx; 1834 1835 struct spdk_thread *orig_thread; 1836 spdk_msg_fn cpl; 1837 }; 1838 1839 static void 1840 _on_thread(void *ctx) 1841 { 1842 struct call_thread *ct = ctx; 1843 int rc __attribute__((unused)); 1844 1845 ct->fn(ct->ctx); 1846 1847 pthread_mutex_lock(&g_devlist_mutex); 1848 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1849 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 1850 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 1851 ct->cur_thread->name); 1852 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1853 } 1854 pthread_mutex_unlock(&g_devlist_mutex); 1855 1856 if (!ct->cur_thread) { 1857 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1858 1859 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1860 free(ctx); 1861 } else { 1862 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1863 ct->cur_thread->name); 1864 1865 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1866 } 1867 assert(rc == 0); 1868 } 1869 1870 void 1871 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1872 { 1873 struct call_thread *ct; 1874 struct spdk_thread *thread; 1875 int rc __attribute__((unused)); 1876 1877 ct = calloc(1, sizeof(*ct)); 1878 if (!ct) { 1879 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1880 cpl(ctx); 1881 return; 1882 } 1883 1884 ct->fn = fn; 1885 ct->ctx = ctx; 1886 ct->cpl = cpl; 1887 1888 thread = _get_thread(); 1889 if (!thread) { 1890 SPDK_ERRLOG("No thread allocated\n"); 1891 free(ct); 1892 cpl(ctx); 1893 return; 1894 } 1895 ct->orig_thread = thread; 1896 1897 pthread_mutex_lock(&g_devlist_mutex); 1898 ct->cur_thread = TAILQ_FIRST(&g_threads); 1899 pthread_mutex_unlock(&g_devlist_mutex); 1900 1901 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 1902 ct->orig_thread->name); 1903 1904 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 1905 assert(rc == 0); 1906 } 1907 1908 static inline void 1909 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 1910 { 1911 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1912 return; 1913 } 1914 1915 if (!poller->set_intr_cb_fn) { 1916 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 1917 assert(false); 1918 return; 1919 } 1920 1921 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 1922 } 1923 1924 void 1925 spdk_thread_set_interrupt_mode(bool enable_interrupt) 1926 { 1927 struct spdk_thread *thread = _get_thread(); 1928 struct spdk_poller *poller, *tmp; 1929 1930 assert(thread); 1931 assert(spdk_interrupt_mode_is_enabled()); 1932 1933 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 1934 thread->name, enable_interrupt ? "intr" : "poll", 1935 thread->in_interrupt ? "intr" : "poll"); 1936 1937 if (thread->in_interrupt == enable_interrupt) { 1938 return; 1939 } 1940 1941 /* Set pollers to expected mode */ 1942 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1943 poller_set_interrupt_mode(poller, enable_interrupt); 1944 } 1945 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 1946 poller_set_interrupt_mode(poller, enable_interrupt); 1947 } 1948 /* All paused pollers will go to work in interrupt mode */ 1949 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 1950 poller_set_interrupt_mode(poller, enable_interrupt); 1951 } 1952 1953 thread->in_interrupt = enable_interrupt; 1954 return; 1955 } 1956 1957 static struct io_device * 1958 io_device_get(void *io_device) 1959 { 1960 struct io_device find = {}; 1961 1962 find.io_device = io_device; 1963 return RB_FIND(io_device_tree, &g_io_devices, &find); 1964 } 1965 1966 void 1967 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1968 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1969 const char *name) 1970 { 1971 struct io_device *dev, *tmp; 1972 struct spdk_thread *thread; 1973 1974 assert(io_device != NULL); 1975 assert(create_cb != NULL); 1976 assert(destroy_cb != NULL); 1977 1978 thread = spdk_get_thread(); 1979 if (!thread) { 1980 SPDK_ERRLOG("called from non-SPDK thread\n"); 1981 assert(false); 1982 return; 1983 } 1984 1985 dev = calloc(1, sizeof(struct io_device)); 1986 if (dev == NULL) { 1987 SPDK_ERRLOG("could not allocate io_device\n"); 1988 return; 1989 } 1990 1991 dev->io_device = io_device; 1992 if (name) { 1993 snprintf(dev->name, sizeof(dev->name), "%s", name); 1994 } else { 1995 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1996 } 1997 dev->create_cb = create_cb; 1998 dev->destroy_cb = destroy_cb; 1999 dev->unregister_cb = NULL; 2000 dev->ctx_size = ctx_size; 2001 dev->for_each_count = 0; 2002 dev->unregistered = false; 2003 dev->refcnt = 0; 2004 2005 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 2006 dev->name, dev->io_device, thread->name); 2007 2008 pthread_mutex_lock(&g_devlist_mutex); 2009 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 2010 if (tmp != NULL) { 2011 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 2012 io_device, tmp->name, dev->name); 2013 free(dev); 2014 } 2015 2016 pthread_mutex_unlock(&g_devlist_mutex); 2017 } 2018 2019 static void 2020 _finish_unregister(void *arg) 2021 { 2022 struct io_device *dev = arg; 2023 struct spdk_thread *thread; 2024 2025 thread = spdk_get_thread(); 2026 assert(thread == dev->unregister_thread); 2027 2028 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2029 dev->name, dev->io_device, thread->name); 2030 2031 assert(thread->pending_unregister_count > 0); 2032 thread->pending_unregister_count--; 2033 2034 dev->unregister_cb(dev->io_device); 2035 free(dev); 2036 } 2037 2038 static void 2039 io_device_free(struct io_device *dev) 2040 { 2041 int rc __attribute__((unused)); 2042 2043 if (dev->unregister_cb == NULL) { 2044 free(dev); 2045 } else { 2046 assert(dev->unregister_thread != NULL); 2047 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2048 dev->name, dev->io_device, dev->unregister_thread->name); 2049 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2050 assert(rc == 0); 2051 } 2052 } 2053 2054 void 2055 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2056 { 2057 struct io_device *dev; 2058 uint32_t refcnt; 2059 struct spdk_thread *thread; 2060 2061 thread = spdk_get_thread(); 2062 if (!thread) { 2063 SPDK_ERRLOG("called from non-SPDK thread\n"); 2064 assert(false); 2065 return; 2066 } 2067 2068 pthread_mutex_lock(&g_devlist_mutex); 2069 dev = io_device_get(io_device); 2070 if (!dev) { 2071 SPDK_ERRLOG("io_device %p not found\n", io_device); 2072 assert(false); 2073 pthread_mutex_unlock(&g_devlist_mutex); 2074 return; 2075 } 2076 2077 /* The for_each_count check differentiates the user attempting to unregister the 2078 * device a second time, from the internal call to this function that occurs 2079 * after the for_each_count reaches 0. 2080 */ 2081 if (dev->pending_unregister && dev->for_each_count > 0) { 2082 SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device); 2083 assert(false); 2084 pthread_mutex_unlock(&g_devlist_mutex); 2085 return; 2086 } 2087 2088 dev->unregister_cb = unregister_cb; 2089 dev->unregister_thread = thread; 2090 2091 if (dev->for_each_count > 0) { 2092 SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2093 dev->name, io_device, dev->for_each_count); 2094 dev->pending_unregister = true; 2095 pthread_mutex_unlock(&g_devlist_mutex); 2096 return; 2097 } 2098 2099 dev->unregistered = true; 2100 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2101 refcnt = dev->refcnt; 2102 pthread_mutex_unlock(&g_devlist_mutex); 2103 2104 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2105 dev->name, dev->io_device, thread->name); 2106 2107 if (unregister_cb) { 2108 thread->pending_unregister_count++; 2109 } 2110 2111 if (refcnt > 0) { 2112 /* defer deletion */ 2113 return; 2114 } 2115 2116 io_device_free(dev); 2117 } 2118 2119 const char * 2120 spdk_io_device_get_name(struct io_device *dev) 2121 { 2122 return dev->name; 2123 } 2124 2125 static struct spdk_io_channel * 2126 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2127 { 2128 struct spdk_io_channel find = {}; 2129 2130 find.dev = dev; 2131 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2132 } 2133 2134 struct spdk_io_channel * 2135 spdk_get_io_channel(void *io_device) 2136 { 2137 struct spdk_io_channel *ch; 2138 struct spdk_thread *thread; 2139 struct io_device *dev; 2140 int rc; 2141 2142 pthread_mutex_lock(&g_devlist_mutex); 2143 dev = io_device_get(io_device); 2144 if (dev == NULL) { 2145 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2146 pthread_mutex_unlock(&g_devlist_mutex); 2147 return NULL; 2148 } 2149 2150 thread = _get_thread(); 2151 if (!thread) { 2152 SPDK_ERRLOG("No thread allocated\n"); 2153 pthread_mutex_unlock(&g_devlist_mutex); 2154 return NULL; 2155 } 2156 2157 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2158 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2159 pthread_mutex_unlock(&g_devlist_mutex); 2160 return NULL; 2161 } 2162 2163 ch = thread_get_io_channel(thread, dev); 2164 if (ch != NULL) { 2165 ch->ref++; 2166 2167 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2168 ch, dev->name, dev->io_device, thread->name, ch->ref); 2169 2170 /* 2171 * An I/O channel already exists for this device on this 2172 * thread, so return it. 2173 */ 2174 pthread_mutex_unlock(&g_devlist_mutex); 2175 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2176 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2177 return ch; 2178 } 2179 2180 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2181 if (ch == NULL) { 2182 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2183 pthread_mutex_unlock(&g_devlist_mutex); 2184 return NULL; 2185 } 2186 2187 ch->dev = dev; 2188 ch->destroy_cb = dev->destroy_cb; 2189 ch->thread = thread; 2190 ch->ref = 1; 2191 ch->destroy_ref = 0; 2192 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2193 2194 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2195 ch, dev->name, dev->io_device, thread->name, ch->ref); 2196 2197 dev->refcnt++; 2198 2199 pthread_mutex_unlock(&g_devlist_mutex); 2200 2201 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2202 if (rc != 0) { 2203 pthread_mutex_lock(&g_devlist_mutex); 2204 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2205 dev->refcnt--; 2206 free(ch); 2207 pthread_mutex_unlock(&g_devlist_mutex); 2208 return NULL; 2209 } 2210 2211 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2212 return ch; 2213 } 2214 2215 static void 2216 put_io_channel(void *arg) 2217 { 2218 struct spdk_io_channel *ch = arg; 2219 bool do_remove_dev = true; 2220 struct spdk_thread *thread; 2221 2222 thread = spdk_get_thread(); 2223 if (!thread) { 2224 SPDK_ERRLOG("called from non-SPDK thread\n"); 2225 assert(false); 2226 return; 2227 } 2228 2229 SPDK_DEBUGLOG(thread, 2230 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2231 ch, ch->dev->name, ch->dev->io_device, thread->name); 2232 2233 assert(ch->thread == thread); 2234 2235 ch->destroy_ref--; 2236 2237 if (ch->ref > 0 || ch->destroy_ref > 0) { 2238 /* 2239 * Another reference to the associated io_device was requested 2240 * after this message was sent but before it had a chance to 2241 * execute. 2242 */ 2243 return; 2244 } 2245 2246 pthread_mutex_lock(&g_devlist_mutex); 2247 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2248 pthread_mutex_unlock(&g_devlist_mutex); 2249 2250 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2251 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2252 2253 pthread_mutex_lock(&g_devlist_mutex); 2254 ch->dev->refcnt--; 2255 2256 if (!ch->dev->unregistered) { 2257 do_remove_dev = false; 2258 } 2259 2260 if (ch->dev->refcnt > 0) { 2261 do_remove_dev = false; 2262 } 2263 2264 pthread_mutex_unlock(&g_devlist_mutex); 2265 2266 if (do_remove_dev) { 2267 io_device_free(ch->dev); 2268 } 2269 free(ch); 2270 } 2271 2272 void 2273 spdk_put_io_channel(struct spdk_io_channel *ch) 2274 { 2275 struct spdk_thread *thread; 2276 int rc __attribute__((unused)); 2277 2278 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2279 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2280 2281 thread = spdk_get_thread(); 2282 if (!thread) { 2283 SPDK_ERRLOG("called from non-SPDK thread\n"); 2284 assert(false); 2285 return; 2286 } 2287 2288 if (ch->thread != thread) { 2289 wrong_thread(__func__, "ch", ch->thread, thread); 2290 return; 2291 } 2292 2293 SPDK_DEBUGLOG(thread, 2294 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2295 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2296 2297 ch->ref--; 2298 2299 if (ch->ref == 0) { 2300 ch->destroy_ref++; 2301 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2302 assert(rc == 0); 2303 } 2304 } 2305 2306 struct spdk_io_channel * 2307 spdk_io_channel_from_ctx(void *ctx) 2308 { 2309 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2310 } 2311 2312 struct spdk_thread * 2313 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2314 { 2315 return ch->thread; 2316 } 2317 2318 void * 2319 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2320 { 2321 return ch->dev->io_device; 2322 } 2323 2324 const char * 2325 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2326 { 2327 return spdk_io_device_get_name(ch->dev); 2328 } 2329 2330 int 2331 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2332 { 2333 return ch->ref; 2334 } 2335 2336 struct spdk_io_channel_iter { 2337 void *io_device; 2338 struct io_device *dev; 2339 spdk_channel_msg fn; 2340 int status; 2341 void *ctx; 2342 struct spdk_io_channel *ch; 2343 2344 struct spdk_thread *cur_thread; 2345 2346 struct spdk_thread *orig_thread; 2347 spdk_channel_for_each_cpl cpl; 2348 }; 2349 2350 void * 2351 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2352 { 2353 return i->io_device; 2354 } 2355 2356 struct spdk_io_channel * 2357 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2358 { 2359 return i->ch; 2360 } 2361 2362 void * 2363 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2364 { 2365 return i->ctx; 2366 } 2367 2368 static void 2369 _call_completion(void *ctx) 2370 { 2371 struct spdk_io_channel_iter *i = ctx; 2372 2373 if (i->cpl != NULL) { 2374 i->cpl(i, i->status); 2375 } 2376 free(i); 2377 } 2378 2379 static void 2380 _call_channel(void *ctx) 2381 { 2382 struct spdk_io_channel_iter *i = ctx; 2383 struct spdk_io_channel *ch; 2384 2385 /* 2386 * It is possible that the channel was deleted before this 2387 * message had a chance to execute. If so, skip calling 2388 * the fn() on this thread. 2389 */ 2390 pthread_mutex_lock(&g_devlist_mutex); 2391 ch = thread_get_io_channel(i->cur_thread, i->dev); 2392 pthread_mutex_unlock(&g_devlist_mutex); 2393 2394 if (ch) { 2395 i->fn(i); 2396 } else { 2397 spdk_for_each_channel_continue(i, 0); 2398 } 2399 } 2400 2401 void 2402 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2403 spdk_channel_for_each_cpl cpl) 2404 { 2405 struct spdk_thread *thread; 2406 struct spdk_io_channel *ch; 2407 struct spdk_io_channel_iter *i; 2408 int rc __attribute__((unused)); 2409 2410 i = calloc(1, sizeof(*i)); 2411 if (!i) { 2412 SPDK_ERRLOG("Unable to allocate iterator\n"); 2413 assert(false); 2414 return; 2415 } 2416 2417 i->io_device = io_device; 2418 i->fn = fn; 2419 i->ctx = ctx; 2420 i->cpl = cpl; 2421 i->orig_thread = _get_thread(); 2422 2423 pthread_mutex_lock(&g_devlist_mutex); 2424 i->dev = io_device_get(io_device); 2425 if (i->dev == NULL) { 2426 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2427 assert(false); 2428 i->status = -ENODEV; 2429 goto end; 2430 } 2431 2432 /* Do not allow new for_each operations if we are already waiting to unregister 2433 * the device for other for_each operations to complete. 2434 */ 2435 if (i->dev->pending_unregister) { 2436 SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device); 2437 i->status = -ENODEV; 2438 goto end; 2439 } 2440 2441 TAILQ_FOREACH(thread, &g_threads, tailq) { 2442 ch = thread_get_io_channel(thread, i->dev); 2443 if (ch != NULL) { 2444 ch->dev->for_each_count++; 2445 i->cur_thread = thread; 2446 i->ch = ch; 2447 pthread_mutex_unlock(&g_devlist_mutex); 2448 rc = spdk_thread_send_msg(thread, _call_channel, i); 2449 assert(rc == 0); 2450 return; 2451 } 2452 } 2453 2454 end: 2455 pthread_mutex_unlock(&g_devlist_mutex); 2456 2457 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2458 assert(rc == 0); 2459 } 2460 2461 static void 2462 __pending_unregister(void *arg) 2463 { 2464 struct io_device *dev = arg; 2465 2466 assert(dev->pending_unregister); 2467 assert(dev->for_each_count == 0); 2468 spdk_io_device_unregister(dev->io_device, dev->unregister_cb); 2469 } 2470 2471 void 2472 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2473 { 2474 struct spdk_thread *thread; 2475 struct spdk_io_channel *ch; 2476 struct io_device *dev; 2477 int rc __attribute__((unused)); 2478 2479 assert(i->cur_thread == spdk_get_thread()); 2480 2481 i->status = status; 2482 2483 pthread_mutex_lock(&g_devlist_mutex); 2484 dev = i->dev; 2485 if (status) { 2486 goto end; 2487 } 2488 2489 thread = TAILQ_NEXT(i->cur_thread, tailq); 2490 while (thread) { 2491 ch = thread_get_io_channel(thread, dev); 2492 if (ch != NULL) { 2493 i->cur_thread = thread; 2494 i->ch = ch; 2495 pthread_mutex_unlock(&g_devlist_mutex); 2496 rc = spdk_thread_send_msg(thread, _call_channel, i); 2497 assert(rc == 0); 2498 return; 2499 } 2500 thread = TAILQ_NEXT(thread, tailq); 2501 } 2502 2503 end: 2504 dev->for_each_count--; 2505 i->ch = NULL; 2506 pthread_mutex_unlock(&g_devlist_mutex); 2507 2508 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2509 assert(rc == 0); 2510 2511 pthread_mutex_lock(&g_devlist_mutex); 2512 if (dev->pending_unregister && dev->for_each_count == 0) { 2513 rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev); 2514 assert(rc == 0); 2515 } 2516 pthread_mutex_unlock(&g_devlist_mutex); 2517 } 2518 2519 struct spdk_interrupt { 2520 int efd; 2521 struct spdk_thread *thread; 2522 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 2523 }; 2524 2525 static void 2526 thread_interrupt_destroy(struct spdk_thread *thread) 2527 { 2528 struct spdk_fd_group *fgrp = thread->fgrp; 2529 2530 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2531 2532 if (thread->msg_fd < 0) { 2533 return; 2534 } 2535 2536 spdk_fd_group_remove(fgrp, thread->msg_fd); 2537 close(thread->msg_fd); 2538 thread->msg_fd = -1; 2539 2540 spdk_fd_group_destroy(fgrp); 2541 thread->fgrp = NULL; 2542 } 2543 2544 #ifdef __linux__ 2545 static int 2546 thread_interrupt_msg_process(void *arg) 2547 { 2548 struct spdk_thread *thread = arg; 2549 uint32_t msg_count; 2550 spdk_msg_fn critical_msg; 2551 int rc = 0; 2552 uint64_t notify = 1; 2553 2554 assert(spdk_interrupt_mode_is_enabled()); 2555 2556 /* There may be race between msg_acknowledge and another producer's msg_notify, 2557 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2558 * This can avoid msg notification missing. 2559 */ 2560 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2561 if (rc < 0 && errno != EAGAIN) { 2562 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2563 } 2564 2565 critical_msg = thread->critical_msg; 2566 if (spdk_unlikely(critical_msg != NULL)) { 2567 critical_msg(NULL); 2568 thread->critical_msg = NULL; 2569 rc = 1; 2570 } 2571 2572 msg_count = msg_queue_run_batch(thread, 0); 2573 if (msg_count) { 2574 rc = 1; 2575 } 2576 2577 return rc; 2578 } 2579 2580 static int 2581 thread_interrupt_create(struct spdk_thread *thread) 2582 { 2583 int rc; 2584 2585 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2586 2587 rc = spdk_fd_group_create(&thread->fgrp); 2588 if (rc) { 2589 return rc; 2590 } 2591 2592 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2593 if (thread->msg_fd < 0) { 2594 rc = -errno; 2595 spdk_fd_group_destroy(thread->fgrp); 2596 thread->fgrp = NULL; 2597 2598 return rc; 2599 } 2600 2601 return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd, 2602 thread_interrupt_msg_process, thread); 2603 } 2604 #else 2605 static int 2606 thread_interrupt_create(struct spdk_thread *thread) 2607 { 2608 return -ENOTSUP; 2609 } 2610 #endif 2611 2612 struct spdk_interrupt * 2613 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2614 void *arg, const char *name) 2615 { 2616 struct spdk_thread *thread; 2617 struct spdk_interrupt *intr; 2618 int ret; 2619 2620 thread = spdk_get_thread(); 2621 if (!thread) { 2622 assert(false); 2623 return NULL; 2624 } 2625 2626 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2627 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2628 return NULL; 2629 } 2630 2631 ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name); 2632 2633 if (ret != 0) { 2634 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2635 thread->name, efd, spdk_strerror(-ret)); 2636 return NULL; 2637 } 2638 2639 intr = calloc(1, sizeof(*intr)); 2640 if (intr == NULL) { 2641 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2642 return NULL; 2643 } 2644 2645 if (name) { 2646 snprintf(intr->name, sizeof(intr->name), "%s", name); 2647 } else { 2648 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2649 } 2650 2651 intr->efd = efd; 2652 intr->thread = thread; 2653 2654 return intr; 2655 } 2656 2657 void 2658 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2659 { 2660 struct spdk_thread *thread; 2661 struct spdk_interrupt *intr; 2662 2663 intr = *pintr; 2664 if (intr == NULL) { 2665 return; 2666 } 2667 2668 *pintr = NULL; 2669 2670 thread = spdk_get_thread(); 2671 if (!thread) { 2672 assert(false); 2673 return; 2674 } 2675 2676 if (intr->thread != thread) { 2677 wrong_thread(__func__, intr->name, intr->thread, thread); 2678 return; 2679 } 2680 2681 spdk_fd_group_remove(thread->fgrp, intr->efd); 2682 free(intr); 2683 } 2684 2685 int 2686 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2687 enum spdk_interrupt_event_types event_types) 2688 { 2689 struct spdk_thread *thread; 2690 2691 thread = spdk_get_thread(); 2692 if (!thread) { 2693 assert(false); 2694 return -EINVAL; 2695 } 2696 2697 if (intr->thread != thread) { 2698 wrong_thread(__func__, intr->name, intr->thread, thread); 2699 return -EINVAL; 2700 } 2701 2702 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2703 } 2704 2705 int 2706 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2707 { 2708 return spdk_fd_group_get_fd(thread->fgrp); 2709 } 2710 2711 static bool g_interrupt_mode = false; 2712 2713 int 2714 spdk_interrupt_mode_enable(void) 2715 { 2716 /* It must be called once prior to initializing the threading library. 2717 * g_spdk_msg_mempool will be valid if thread library is initialized. 2718 */ 2719 if (g_spdk_msg_mempool) { 2720 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 2721 return -1; 2722 } 2723 2724 #ifdef __linux__ 2725 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2726 g_interrupt_mode = true; 2727 return 0; 2728 #else 2729 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2730 g_interrupt_mode = false; 2731 return -ENOTSUP; 2732 #endif 2733 } 2734 2735 bool 2736 spdk_interrupt_mode_is_enabled(void) 2737 { 2738 return g_interrupt_mode; 2739 } 2740 2741 SPDK_LOG_REGISTER_COMPONENT(thread) 2742