1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (c) Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 8 #include "spdk/env.h" 9 #include "spdk/likely.h" 10 #include "spdk/queue.h" 11 #include "spdk/string.h" 12 #include "spdk/thread.h" 13 #include "spdk/trace.h" 14 #include "spdk/util.h" 15 #include "spdk/fd_group.h" 16 17 #include "spdk/log.h" 18 #include "spdk_internal/thread.h" 19 #include "spdk_internal/usdt.h" 20 #include "thread_internal.h" 21 22 #include "spdk_internal/trace_defs.h" 23 24 #ifdef __linux__ 25 #include <sys/timerfd.h> 26 #include <sys/eventfd.h> 27 #endif 28 29 #define SPDK_MSG_BATCH_SIZE 8 30 #define SPDK_MAX_DEVICE_NAME_LEN 256 31 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 32 #define SPDK_MAX_POLLER_NAME_LEN 256 33 #define SPDK_MAX_THREAD_NAME_LEN 256 34 35 enum spdk_poller_state { 36 /* The poller is registered with a thread but not currently executing its fn. */ 37 SPDK_POLLER_STATE_WAITING, 38 39 /* The poller is currently running its fn. */ 40 SPDK_POLLER_STATE_RUNNING, 41 42 /* The poller was unregistered during the execution of its fn. */ 43 SPDK_POLLER_STATE_UNREGISTERED, 44 45 /* The poller is in the process of being paused. It will be paused 46 * during the next time it's supposed to be executed. 47 */ 48 SPDK_POLLER_STATE_PAUSING, 49 50 /* The poller is registered but currently paused. It's on the 51 * paused_pollers list. 52 */ 53 SPDK_POLLER_STATE_PAUSED, 54 }; 55 56 struct spdk_poller { 57 TAILQ_ENTRY(spdk_poller) tailq; 58 RB_ENTRY(spdk_poller) node; 59 60 /* Current state of the poller; should only be accessed from the poller's thread. */ 61 enum spdk_poller_state state; 62 63 uint64_t period_ticks; 64 uint64_t next_run_tick; 65 uint64_t run_count; 66 uint64_t busy_count; 67 uint64_t id; 68 spdk_poller_fn fn; 69 void *arg; 70 struct spdk_thread *thread; 71 /* Native interruptfd for period or busy poller */ 72 int interruptfd; 73 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 74 void *set_intr_cb_arg; 75 76 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 77 }; 78 79 enum spdk_thread_state { 80 /* The thread is processing poller and message by spdk_thread_poll(). */ 81 SPDK_THREAD_STATE_RUNNING, 82 83 /* The thread is in the process of termination. It reaps unregistering 84 * poller are releasing I/O channel. 85 */ 86 SPDK_THREAD_STATE_EXITING, 87 88 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 89 SPDK_THREAD_STATE_EXITED, 90 }; 91 92 struct spdk_thread { 93 uint64_t tsc_last; 94 struct spdk_thread_stats stats; 95 /* 96 * Contains pollers actively running on this thread. Pollers 97 * are run round-robin. The thread takes one poller from the head 98 * of the ring, executes it, then puts it back at the tail of 99 * the ring. 100 */ 101 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 102 /** 103 * Contains pollers running on this thread with a periodic timer. 104 */ 105 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 106 struct spdk_poller *first_timed_poller; 107 /* 108 * Contains paused pollers. Pollers on this queue are waiting until 109 * they are resumed (in which case they're put onto the active/timer 110 * queues) or unregistered. 111 */ 112 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 113 struct spdk_ring *messages; 114 int msg_fd; 115 SLIST_HEAD(, spdk_msg) msg_cache; 116 size_t msg_cache_count; 117 spdk_msg_fn critical_msg; 118 uint64_t id; 119 uint64_t next_poller_id; 120 enum spdk_thread_state state; 121 int pending_unregister_count; 122 123 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 124 TAILQ_ENTRY(spdk_thread) tailq; 125 126 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 127 struct spdk_cpuset cpumask; 128 uint64_t exit_timeout_tsc; 129 130 /* Indicates whether this spdk_thread currently runs in interrupt. */ 131 bool in_interrupt; 132 bool poller_unregistered; 133 struct spdk_fd_group *fgrp; 134 135 /* User context allocated at the end */ 136 uint8_t ctx[0]; 137 }; 138 139 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 140 141 static spdk_new_thread_fn g_new_thread_fn = NULL; 142 static spdk_thread_op_fn g_thread_op_fn = NULL; 143 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 144 static size_t g_ctx_sz = 0; 145 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 146 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 147 * SPDK application is required. 148 */ 149 static uint64_t g_thread_id = 1; 150 151 struct io_device { 152 void *io_device; 153 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 154 spdk_io_channel_create_cb create_cb; 155 spdk_io_channel_destroy_cb destroy_cb; 156 spdk_io_device_unregister_cb unregister_cb; 157 struct spdk_thread *unregister_thread; 158 uint32_t ctx_size; 159 uint32_t for_each_count; 160 RB_ENTRY(io_device) node; 161 162 uint32_t refcnt; 163 164 bool pending_unregister; 165 bool unregistered; 166 }; 167 168 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 169 170 static int 171 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 172 { 173 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 174 } 175 176 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 177 178 static int 179 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 180 { 181 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 182 } 183 184 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 185 186 struct spdk_msg { 187 spdk_msg_fn fn; 188 void *arg; 189 190 SLIST_ENTRY(spdk_msg) link; 191 }; 192 193 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 194 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 195 196 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 197 static uint32_t g_thread_count = 0; 198 199 static __thread struct spdk_thread *tls_thread = NULL; 200 201 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 202 { 203 spdk_trace_register_description("THREAD_IOCH_GET", 204 TRACE_THREAD_IOCH_GET, 205 OWNER_NONE, OBJECT_NONE, 0, 206 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 207 spdk_trace_register_description("THREAD_IOCH_PUT", 208 TRACE_THREAD_IOCH_PUT, 209 OWNER_NONE, OBJECT_NONE, 0, 210 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 211 } 212 213 /* 214 * If this compare function returns zero when two next_run_ticks are equal, 215 * the macro RB_INSERT() returns a pointer to the element with the same 216 * next_run_tick. 217 * 218 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 219 * to remove as a parameter. 220 * 221 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 222 * side by returning 1 when two next_run_ticks are equal. 223 */ 224 static inline int 225 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 226 { 227 if (poller1->next_run_tick < poller2->next_run_tick) { 228 return -1; 229 } else { 230 return 1; 231 } 232 } 233 234 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 235 236 static inline struct spdk_thread * 237 _get_thread(void) 238 { 239 return tls_thread; 240 } 241 242 static int 243 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 244 { 245 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 246 247 g_ctx_sz = ctx_sz; 248 249 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 250 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 251 sizeof(struct spdk_msg), 252 0, /* No cache. We do our own. */ 253 SPDK_ENV_SOCKET_ID_ANY); 254 255 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 256 msg_mempool_sz); 257 258 if (!g_spdk_msg_mempool) { 259 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 260 return -ENOMEM; 261 } 262 263 return 0; 264 } 265 266 int 267 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 268 { 269 assert(g_new_thread_fn == NULL); 270 assert(g_thread_op_fn == NULL); 271 272 if (new_thread_fn == NULL) { 273 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 274 } else { 275 g_new_thread_fn = new_thread_fn; 276 } 277 278 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 279 } 280 281 int 282 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 283 spdk_thread_op_supported_fn thread_op_supported_fn, 284 size_t ctx_sz, size_t msg_mempool_sz) 285 { 286 assert(g_new_thread_fn == NULL); 287 assert(g_thread_op_fn == NULL); 288 assert(g_thread_op_supported_fn == NULL); 289 290 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 291 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 292 return -EINVAL; 293 } 294 295 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 296 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 297 } else { 298 g_thread_op_fn = thread_op_fn; 299 g_thread_op_supported_fn = thread_op_supported_fn; 300 } 301 302 return _thread_lib_init(ctx_sz, msg_mempool_sz); 303 } 304 305 void 306 spdk_thread_lib_fini(void) 307 { 308 struct io_device *dev; 309 310 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 311 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 312 } 313 314 if (g_spdk_msg_mempool) { 315 spdk_mempool_free(g_spdk_msg_mempool); 316 g_spdk_msg_mempool = NULL; 317 } 318 319 g_new_thread_fn = NULL; 320 g_thread_op_fn = NULL; 321 g_thread_op_supported_fn = NULL; 322 g_ctx_sz = 0; 323 } 324 325 static void thread_interrupt_destroy(struct spdk_thread *thread); 326 static int thread_interrupt_create(struct spdk_thread *thread); 327 328 static void 329 _free_thread(struct spdk_thread *thread) 330 { 331 struct spdk_io_channel *ch; 332 struct spdk_msg *msg; 333 struct spdk_poller *poller, *ptmp; 334 335 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 336 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 337 thread->name, ch->dev->name); 338 } 339 340 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 341 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 342 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 343 poller->name); 344 } 345 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 346 free(poller); 347 } 348 349 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 350 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 351 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 352 poller->name); 353 } 354 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 355 free(poller); 356 } 357 358 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 359 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 360 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 361 free(poller); 362 } 363 364 pthread_mutex_lock(&g_devlist_mutex); 365 assert(g_thread_count > 0); 366 g_thread_count--; 367 TAILQ_REMOVE(&g_threads, thread, tailq); 368 pthread_mutex_unlock(&g_devlist_mutex); 369 370 msg = SLIST_FIRST(&thread->msg_cache); 371 while (msg != NULL) { 372 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 373 374 assert(thread->msg_cache_count > 0); 375 thread->msg_cache_count--; 376 spdk_mempool_put(g_spdk_msg_mempool, msg); 377 378 msg = SLIST_FIRST(&thread->msg_cache); 379 } 380 381 assert(thread->msg_cache_count == 0); 382 383 if (spdk_interrupt_mode_is_enabled()) { 384 thread_interrupt_destroy(thread); 385 } 386 387 spdk_ring_free(thread->messages); 388 free(thread); 389 } 390 391 struct spdk_thread * 392 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask) 393 { 394 struct spdk_thread *thread; 395 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 396 int rc = 0, i; 397 398 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 399 if (!thread) { 400 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 401 return NULL; 402 } 403 404 if (cpumask) { 405 spdk_cpuset_copy(&thread->cpumask, cpumask); 406 } else { 407 spdk_cpuset_negate(&thread->cpumask); 408 } 409 410 RB_INIT(&thread->io_channels); 411 TAILQ_INIT(&thread->active_pollers); 412 RB_INIT(&thread->timed_pollers); 413 TAILQ_INIT(&thread->paused_pollers); 414 SLIST_INIT(&thread->msg_cache); 415 thread->msg_cache_count = 0; 416 417 thread->tsc_last = spdk_get_ticks(); 418 419 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 420 * ID exceeds UINT64_MAX a warning message is logged 421 */ 422 thread->next_poller_id = 1; 423 424 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 425 if (!thread->messages) { 426 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 427 free(thread); 428 return NULL; 429 } 430 431 /* Fill the local message pool cache. */ 432 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 433 if (rc == 0) { 434 /* If we can't populate the cache it's ok. The cache will get filled 435 * up organically as messages are passed to the thread. */ 436 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 437 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 438 thread->msg_cache_count++; 439 } 440 } 441 442 if (name) { 443 snprintf(thread->name, sizeof(thread->name), "%s", name); 444 } else { 445 snprintf(thread->name, sizeof(thread->name), "%p", thread); 446 } 447 448 pthread_mutex_lock(&g_devlist_mutex); 449 if (g_thread_id == 0) { 450 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 451 pthread_mutex_unlock(&g_devlist_mutex); 452 _free_thread(thread); 453 return NULL; 454 } 455 thread->id = g_thread_id++; 456 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 457 g_thread_count++; 458 pthread_mutex_unlock(&g_devlist_mutex); 459 460 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 461 thread->id, thread->name); 462 463 if (spdk_interrupt_mode_is_enabled()) { 464 thread->in_interrupt = true; 465 rc = thread_interrupt_create(thread); 466 if (rc != 0) { 467 _free_thread(thread); 468 return NULL; 469 } 470 } 471 472 if (g_new_thread_fn) { 473 rc = g_new_thread_fn(thread); 474 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 475 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 476 } 477 478 if (rc != 0) { 479 _free_thread(thread); 480 return NULL; 481 } 482 483 thread->state = SPDK_THREAD_STATE_RUNNING; 484 485 return thread; 486 } 487 488 void 489 spdk_set_thread(struct spdk_thread *thread) 490 { 491 tls_thread = thread; 492 } 493 494 static void 495 thread_exit(struct spdk_thread *thread, uint64_t now) 496 { 497 struct spdk_poller *poller; 498 struct spdk_io_channel *ch; 499 500 if (now >= thread->exit_timeout_tsc) { 501 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 502 thread->name); 503 goto exited; 504 } 505 506 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 507 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 508 SPDK_INFOLOG(thread, 509 "thread %s still has active poller %s\n", 510 thread->name, poller->name); 511 return; 512 } 513 } 514 515 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 516 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 517 SPDK_INFOLOG(thread, 518 "thread %s still has active timed poller %s\n", 519 thread->name, poller->name); 520 return; 521 } 522 } 523 524 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 525 SPDK_INFOLOG(thread, 526 "thread %s still has paused poller %s\n", 527 thread->name, poller->name); 528 return; 529 } 530 531 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 532 SPDK_INFOLOG(thread, 533 "thread %s still has channel for io_device %s\n", 534 thread->name, ch->dev->name); 535 return; 536 } 537 538 if (thread->pending_unregister_count > 0) { 539 SPDK_INFOLOG(thread, 540 "thread %s is still unregistering io_devices\n", 541 thread->name); 542 return; 543 } 544 545 exited: 546 thread->state = SPDK_THREAD_STATE_EXITED; 547 if (spdk_unlikely(thread->in_interrupt)) { 548 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 549 } 550 } 551 552 int 553 spdk_thread_exit(struct spdk_thread *thread) 554 { 555 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 556 557 assert(tls_thread == thread); 558 559 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 560 SPDK_INFOLOG(thread, 561 "thread %s is already exiting\n", 562 thread->name); 563 return 0; 564 } 565 566 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 567 SPDK_THREAD_EXIT_TIMEOUT_SEC); 568 thread->state = SPDK_THREAD_STATE_EXITING; 569 return 0; 570 } 571 572 bool 573 spdk_thread_is_exited(struct spdk_thread *thread) 574 { 575 return thread->state == SPDK_THREAD_STATE_EXITED; 576 } 577 578 void 579 spdk_thread_destroy(struct spdk_thread *thread) 580 { 581 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 582 583 assert(thread->state == SPDK_THREAD_STATE_EXITED); 584 585 if (tls_thread == thread) { 586 tls_thread = NULL; 587 } 588 589 _free_thread(thread); 590 } 591 592 void * 593 spdk_thread_get_ctx(struct spdk_thread *thread) 594 { 595 if (g_ctx_sz > 0) { 596 return thread->ctx; 597 } 598 599 return NULL; 600 } 601 602 struct spdk_cpuset * 603 spdk_thread_get_cpumask(struct spdk_thread *thread) 604 { 605 return &thread->cpumask; 606 } 607 608 int 609 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 610 { 611 struct spdk_thread *thread; 612 613 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 614 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 615 assert(false); 616 return -ENOTSUP; 617 } 618 619 thread = spdk_get_thread(); 620 if (!thread) { 621 SPDK_ERRLOG("Called from non-SPDK thread\n"); 622 assert(false); 623 return -EINVAL; 624 } 625 626 spdk_cpuset_copy(&thread->cpumask, cpumask); 627 628 /* Invoke framework's reschedule operation. If this function is called multiple times 629 * in a single spdk_thread_poll() context, the last cpumask will be used in the 630 * reschedule operation. 631 */ 632 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 633 634 return 0; 635 } 636 637 struct spdk_thread * 638 spdk_thread_get_from_ctx(void *ctx) 639 { 640 if (ctx == NULL) { 641 assert(false); 642 return NULL; 643 } 644 645 assert(g_ctx_sz > 0); 646 647 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 648 } 649 650 static inline uint32_t 651 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 652 { 653 unsigned count, i; 654 void *messages[SPDK_MSG_BATCH_SIZE]; 655 uint64_t notify = 1; 656 int rc; 657 658 #ifdef DEBUG 659 /* 660 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 661 * so we will never actually read uninitialized data from events, but just to be sure 662 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 663 */ 664 memset(messages, 0, sizeof(messages)); 665 #endif 666 667 if (max_msgs > 0) { 668 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 669 } else { 670 max_msgs = SPDK_MSG_BATCH_SIZE; 671 } 672 673 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 674 if (spdk_unlikely(thread->in_interrupt) && 675 spdk_ring_count(thread->messages) != 0) { 676 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 677 if (rc < 0) { 678 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 679 } 680 } 681 if (count == 0) { 682 return 0; 683 } 684 685 for (i = 0; i < count; i++) { 686 struct spdk_msg *msg = messages[i]; 687 688 assert(msg != NULL); 689 690 SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg); 691 692 msg->fn(msg->arg); 693 694 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 695 /* Insert the messages at the head. We want to re-use the hot 696 * ones. */ 697 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 698 thread->msg_cache_count++; 699 } else { 700 spdk_mempool_put(g_spdk_msg_mempool, msg); 701 } 702 } 703 704 return count; 705 } 706 707 static void 708 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 709 { 710 struct spdk_poller *tmp __attribute__((unused)); 711 712 poller->next_run_tick = now + poller->period_ticks; 713 714 /* 715 * Insert poller in the thread's timed_pollers tree by next scheduled run time 716 * as its key. 717 */ 718 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 719 assert(tmp == NULL); 720 721 /* Update the cache only if it is empty or the inserted poller is earlier than it. 722 * RB_MIN() is not necessary here because all pollers, which has exactly the same 723 * next_run_tick as the existing poller, are inserted on the right side. 724 */ 725 if (thread->first_timed_poller == NULL || 726 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 727 thread->first_timed_poller = poller; 728 } 729 } 730 731 static inline void 732 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 733 { 734 struct spdk_poller *tmp __attribute__((unused)); 735 736 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 737 assert(tmp != NULL); 738 739 /* This function is not used in any case that is performance critical. 740 * Update the cache simply by RB_MIN() if it needs to be changed. 741 */ 742 if (thread->first_timed_poller == poller) { 743 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 744 } 745 } 746 747 static void 748 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 749 { 750 if (poller->period_ticks) { 751 poller_insert_timer(thread, poller, spdk_get_ticks()); 752 } else { 753 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 754 } 755 } 756 757 static inline void 758 thread_update_stats(struct spdk_thread *thread, uint64_t end, 759 uint64_t start, int rc) 760 { 761 if (rc == 0) { 762 /* Poller status idle */ 763 thread->stats.idle_tsc += end - start; 764 } else if (rc > 0) { 765 /* Poller status busy */ 766 thread->stats.busy_tsc += end - start; 767 } 768 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 769 thread->tsc_last = end; 770 } 771 772 static inline int 773 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 774 { 775 int rc; 776 777 switch (poller->state) { 778 case SPDK_POLLER_STATE_UNREGISTERED: 779 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 780 free(poller); 781 return 0; 782 case SPDK_POLLER_STATE_PAUSING: 783 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 784 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 785 poller->state = SPDK_POLLER_STATE_PAUSED; 786 return 0; 787 case SPDK_POLLER_STATE_WAITING: 788 break; 789 default: 790 assert(false); 791 break; 792 } 793 794 poller->state = SPDK_POLLER_STATE_RUNNING; 795 rc = poller->fn(poller->arg); 796 797 poller->run_count++; 798 if (rc > 0) { 799 poller->busy_count++; 800 } 801 802 #ifdef DEBUG 803 if (rc == -1) { 804 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 805 } 806 #endif 807 808 switch (poller->state) { 809 case SPDK_POLLER_STATE_UNREGISTERED: 810 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 811 free(poller); 812 break; 813 case SPDK_POLLER_STATE_PAUSING: 814 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 815 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 816 poller->state = SPDK_POLLER_STATE_PAUSED; 817 break; 818 case SPDK_POLLER_STATE_PAUSED: 819 case SPDK_POLLER_STATE_WAITING: 820 break; 821 case SPDK_POLLER_STATE_RUNNING: 822 poller->state = SPDK_POLLER_STATE_WAITING; 823 break; 824 default: 825 assert(false); 826 break; 827 } 828 829 return rc; 830 } 831 832 static inline int 833 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 834 uint64_t now) 835 { 836 int rc; 837 838 switch (poller->state) { 839 case SPDK_POLLER_STATE_UNREGISTERED: 840 free(poller); 841 return 0; 842 case SPDK_POLLER_STATE_PAUSING: 843 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 844 poller->state = SPDK_POLLER_STATE_PAUSED; 845 return 0; 846 case SPDK_POLLER_STATE_WAITING: 847 break; 848 default: 849 assert(false); 850 break; 851 } 852 853 poller->state = SPDK_POLLER_STATE_RUNNING; 854 rc = poller->fn(poller->arg); 855 856 poller->run_count++; 857 if (rc > 0) { 858 poller->busy_count++; 859 } 860 861 #ifdef DEBUG 862 if (rc == -1) { 863 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 864 } 865 #endif 866 867 switch (poller->state) { 868 case SPDK_POLLER_STATE_UNREGISTERED: 869 free(poller); 870 break; 871 case SPDK_POLLER_STATE_PAUSING: 872 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 873 poller->state = SPDK_POLLER_STATE_PAUSED; 874 break; 875 case SPDK_POLLER_STATE_PAUSED: 876 break; 877 case SPDK_POLLER_STATE_RUNNING: 878 poller->state = SPDK_POLLER_STATE_WAITING; 879 /* fallthrough */ 880 case SPDK_POLLER_STATE_WAITING: 881 poller_insert_timer(thread, poller, now); 882 break; 883 default: 884 assert(false); 885 break; 886 } 887 888 return rc; 889 } 890 891 static int 892 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 893 { 894 uint32_t msg_count; 895 struct spdk_poller *poller, *tmp; 896 spdk_msg_fn critical_msg; 897 int rc = 0; 898 899 thread->tsc_last = now; 900 901 critical_msg = thread->critical_msg; 902 if (spdk_unlikely(critical_msg != NULL)) { 903 critical_msg(NULL); 904 thread->critical_msg = NULL; 905 rc = 1; 906 } 907 908 msg_count = msg_queue_run_batch(thread, max_msgs); 909 if (msg_count) { 910 rc = 1; 911 } 912 913 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 914 active_pollers_head, tailq, tmp) { 915 int poller_rc; 916 917 poller_rc = thread_execute_poller(thread, poller); 918 if (poller_rc > rc) { 919 rc = poller_rc; 920 } 921 } 922 923 poller = thread->first_timed_poller; 924 while (poller != NULL) { 925 int timer_rc = 0; 926 927 if (now < poller->next_run_tick) { 928 break; 929 } 930 931 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 932 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 933 934 /* Update the cache to the next timed poller in the list 935 * only if the current poller is still the closest, otherwise, 936 * do nothing because the cache has been already updated. 937 */ 938 if (thread->first_timed_poller == poller) { 939 thread->first_timed_poller = tmp; 940 } 941 942 timer_rc = thread_execute_timed_poller(thread, poller, now); 943 if (timer_rc > rc) { 944 rc = timer_rc; 945 } 946 947 poller = tmp; 948 } 949 950 return rc; 951 } 952 953 int 954 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 955 { 956 struct spdk_thread *orig_thread; 957 int rc; 958 uint64_t notify = 1; 959 960 orig_thread = _get_thread(); 961 tls_thread = thread; 962 963 if (now == 0) { 964 now = spdk_get_ticks(); 965 } 966 967 if (spdk_likely(!thread->in_interrupt)) { 968 rc = thread_poll(thread, max_msgs, now); 969 if (spdk_unlikely(thread->in_interrupt)) { 970 /* The thread transitioned to interrupt mode during the above poll. 971 * Poll it one more time in case that during the transition time 972 * there is msg received without notification. 973 */ 974 rc = thread_poll(thread, max_msgs, now); 975 } 976 } else { 977 /* Non-block wait on thread's fd_group */ 978 rc = spdk_fd_group_wait(thread->fgrp, 0); 979 if (spdk_unlikely(!thread->in_interrupt)) { 980 /* The thread transitioned to poll mode in a msg during the above processing. 981 * Clear msg_fd since thread messages will be polled directly in poll mode. 982 */ 983 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 984 if (rc < 0 && errno != EAGAIN) { 985 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 986 } 987 } 988 989 /* Reap unregistered pollers out of poller execution in intr mode */ 990 if (spdk_unlikely(thread->poller_unregistered)) { 991 struct spdk_poller *poller, *tmp; 992 993 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 994 active_pollers_head, tailq, tmp) { 995 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 996 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 997 free(poller); 998 } 999 } 1000 1001 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1002 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1003 poller_remove_timer(thread, poller); 1004 free(poller); 1005 } 1006 } 1007 1008 thread->poller_unregistered = false; 1009 } 1010 } 1011 1012 1013 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1014 thread_exit(thread, now); 1015 } 1016 1017 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1018 1019 tls_thread = orig_thread; 1020 1021 return rc; 1022 } 1023 1024 uint64_t 1025 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1026 { 1027 struct spdk_poller *poller; 1028 1029 poller = thread->first_timed_poller; 1030 if (poller) { 1031 return poller->next_run_tick; 1032 } 1033 1034 return 0; 1035 } 1036 1037 int 1038 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1039 { 1040 return !TAILQ_EMPTY(&thread->active_pollers); 1041 } 1042 1043 static bool 1044 thread_has_unpaused_pollers(struct spdk_thread *thread) 1045 { 1046 if (TAILQ_EMPTY(&thread->active_pollers) && 1047 RB_EMPTY(&thread->timed_pollers)) { 1048 return false; 1049 } 1050 1051 return true; 1052 } 1053 1054 bool 1055 spdk_thread_has_pollers(struct spdk_thread *thread) 1056 { 1057 if (!thread_has_unpaused_pollers(thread) && 1058 TAILQ_EMPTY(&thread->paused_pollers)) { 1059 return false; 1060 } 1061 1062 return true; 1063 } 1064 1065 bool 1066 spdk_thread_is_idle(struct spdk_thread *thread) 1067 { 1068 if (spdk_ring_count(thread->messages) || 1069 thread_has_unpaused_pollers(thread) || 1070 thread->critical_msg != NULL) { 1071 return false; 1072 } 1073 1074 return true; 1075 } 1076 1077 uint32_t 1078 spdk_thread_get_count(void) 1079 { 1080 /* 1081 * Return cached value of the current thread count. We could acquire the 1082 * lock and iterate through the TAILQ of threads to count them, but that 1083 * count could still be invalidated after we release the lock. 1084 */ 1085 return g_thread_count; 1086 } 1087 1088 struct spdk_thread * 1089 spdk_get_thread(void) 1090 { 1091 return _get_thread(); 1092 } 1093 1094 const char * 1095 spdk_thread_get_name(const struct spdk_thread *thread) 1096 { 1097 return thread->name; 1098 } 1099 1100 uint64_t 1101 spdk_thread_get_id(const struct spdk_thread *thread) 1102 { 1103 return thread->id; 1104 } 1105 1106 struct spdk_thread * 1107 spdk_thread_get_by_id(uint64_t id) 1108 { 1109 struct spdk_thread *thread; 1110 1111 if (id == 0 || id >= g_thread_id) { 1112 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1113 return NULL; 1114 } 1115 pthread_mutex_lock(&g_devlist_mutex); 1116 TAILQ_FOREACH(thread, &g_threads, tailq) { 1117 if (thread->id == id) { 1118 break; 1119 } 1120 } 1121 pthread_mutex_unlock(&g_devlist_mutex); 1122 return thread; 1123 } 1124 1125 int 1126 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1127 { 1128 struct spdk_thread *thread; 1129 1130 thread = _get_thread(); 1131 if (!thread) { 1132 SPDK_ERRLOG("No thread allocated\n"); 1133 return -EINVAL; 1134 } 1135 1136 if (stats == NULL) { 1137 return -EINVAL; 1138 } 1139 1140 *stats = thread->stats; 1141 1142 return 0; 1143 } 1144 1145 uint64_t 1146 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1147 { 1148 if (thread == NULL) { 1149 thread = _get_thread(); 1150 } 1151 1152 return thread->tsc_last; 1153 } 1154 1155 static inline int 1156 thread_send_msg_notification(const struct spdk_thread *target_thread) 1157 { 1158 uint64_t notify = 1; 1159 int rc; 1160 1161 /* Not necessary to do notification if interrupt facility is not enabled */ 1162 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1163 return 0; 1164 } 1165 1166 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1167 * after sending thread msg, it is necessary to check whether target thread runs in 1168 * interrupt mode and then decide whether do event notification. 1169 */ 1170 if (spdk_unlikely(target_thread->in_interrupt)) { 1171 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1172 if (rc < 0) { 1173 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1174 return -EIO; 1175 } 1176 } 1177 1178 return 0; 1179 } 1180 1181 int 1182 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1183 { 1184 struct spdk_thread *local_thread; 1185 struct spdk_msg *msg; 1186 int rc; 1187 1188 assert(thread != NULL); 1189 1190 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1191 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1192 return -EIO; 1193 } 1194 1195 local_thread = _get_thread(); 1196 1197 msg = NULL; 1198 if (local_thread != NULL) { 1199 if (local_thread->msg_cache_count > 0) { 1200 msg = SLIST_FIRST(&local_thread->msg_cache); 1201 assert(msg != NULL); 1202 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1203 local_thread->msg_cache_count--; 1204 } 1205 } 1206 1207 if (msg == NULL) { 1208 msg = spdk_mempool_get(g_spdk_msg_mempool); 1209 if (!msg) { 1210 SPDK_ERRLOG("msg could not be allocated\n"); 1211 return -ENOMEM; 1212 } 1213 } 1214 1215 msg->fn = fn; 1216 msg->arg = ctx; 1217 1218 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1219 if (rc != 1) { 1220 SPDK_ERRLOG("msg could not be enqueued\n"); 1221 spdk_mempool_put(g_spdk_msg_mempool, msg); 1222 return -EIO; 1223 } 1224 1225 return thread_send_msg_notification(thread); 1226 } 1227 1228 int 1229 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1230 { 1231 spdk_msg_fn expected = NULL; 1232 1233 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1234 __ATOMIC_SEQ_CST)) { 1235 return -EIO; 1236 } 1237 1238 return thread_send_msg_notification(thread); 1239 } 1240 1241 #ifdef __linux__ 1242 static int 1243 interrupt_timerfd_process(void *arg) 1244 { 1245 struct spdk_poller *poller = arg; 1246 uint64_t exp; 1247 int rc; 1248 1249 /* clear the level of interval timer */ 1250 rc = read(poller->interruptfd, &exp, sizeof(exp)); 1251 if (rc < 0) { 1252 if (rc == -EAGAIN) { 1253 return 0; 1254 } 1255 1256 return rc; 1257 } 1258 1259 SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg); 1260 1261 return poller->fn(poller->arg); 1262 } 1263 1264 static int 1265 period_poller_interrupt_init(struct spdk_poller *poller) 1266 { 1267 struct spdk_fd_group *fgrp = poller->thread->fgrp; 1268 int timerfd; 1269 int rc; 1270 1271 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1272 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1273 if (timerfd < 0) { 1274 return -errno; 1275 } 1276 1277 rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller); 1278 if (rc < 0) { 1279 close(timerfd); 1280 return rc; 1281 } 1282 1283 poller->interruptfd = timerfd; 1284 return 0; 1285 } 1286 1287 static void 1288 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1289 { 1290 int timerfd = poller->interruptfd; 1291 uint64_t now_tick = spdk_get_ticks(); 1292 uint64_t ticks = spdk_get_ticks_hz(); 1293 int ret; 1294 struct itimerspec new_tv = {}; 1295 struct itimerspec old_tv = {}; 1296 1297 assert(poller->period_ticks != 0); 1298 assert(timerfd >= 0); 1299 1300 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1301 interrupt_mode ? "interrupt" : "poll"); 1302 1303 if (interrupt_mode) { 1304 /* Set repeated timer expiration */ 1305 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1306 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1307 1308 /* Update next timer expiration */ 1309 if (poller->next_run_tick == 0) { 1310 poller->next_run_tick = now_tick + poller->period_ticks; 1311 } else if (poller->next_run_tick < now_tick) { 1312 poller->next_run_tick = now_tick; 1313 } 1314 1315 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1316 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1317 1318 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1319 if (ret < 0) { 1320 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1321 assert(false); 1322 } 1323 } else { 1324 /* Disarm the timer */ 1325 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1326 if (ret < 0) { 1327 /* timerfd_settime's failure indicates that the timerfd is in error */ 1328 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1329 assert(false); 1330 } 1331 1332 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1333 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1334 */ 1335 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1336 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1337 poller_remove_timer(poller->thread, poller); 1338 poller_insert_timer(poller->thread, poller, now_tick); 1339 } 1340 } 1341 1342 static void 1343 poller_interrupt_fini(struct spdk_poller *poller) 1344 { 1345 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1346 assert(poller->interruptfd >= 0); 1347 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1348 close(poller->interruptfd); 1349 poller->interruptfd = -1; 1350 } 1351 1352 static int 1353 busy_poller_interrupt_init(struct spdk_poller *poller) 1354 { 1355 int busy_efd; 1356 int rc; 1357 1358 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1359 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1360 if (busy_efd < 0) { 1361 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1362 return -errno; 1363 } 1364 1365 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, 1366 poller->fn, poller->arg, poller->name); 1367 if (rc < 0) { 1368 close(busy_efd); 1369 return rc; 1370 } 1371 1372 poller->interruptfd = busy_efd; 1373 return 0; 1374 } 1375 1376 static void 1377 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1378 { 1379 int busy_efd = poller->interruptfd; 1380 uint64_t notify = 1; 1381 int rc __attribute__((unused)); 1382 1383 assert(busy_efd >= 0); 1384 1385 if (interrupt_mode) { 1386 /* Write without read on eventfd will get it repeatedly triggered. */ 1387 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1388 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1389 } 1390 } else { 1391 /* Read on eventfd will clear its level triggering. */ 1392 rc = read(busy_efd, ¬ify, sizeof(notify)); 1393 } 1394 } 1395 1396 #else 1397 1398 static int 1399 period_poller_interrupt_init(struct spdk_poller *poller) 1400 { 1401 return -ENOTSUP; 1402 } 1403 1404 static void 1405 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1406 { 1407 } 1408 1409 static void 1410 poller_interrupt_fini(struct spdk_poller *poller) 1411 { 1412 } 1413 1414 static int 1415 busy_poller_interrupt_init(struct spdk_poller *poller) 1416 { 1417 return -ENOTSUP; 1418 } 1419 1420 static void 1421 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1422 { 1423 } 1424 1425 #endif 1426 1427 void 1428 spdk_poller_register_interrupt(struct spdk_poller *poller, 1429 spdk_poller_set_interrupt_mode_cb cb_fn, 1430 void *cb_arg) 1431 { 1432 assert(poller != NULL); 1433 assert(cb_fn != NULL); 1434 assert(spdk_get_thread() == poller->thread); 1435 1436 if (!spdk_interrupt_mode_is_enabled()) { 1437 return; 1438 } 1439 1440 /* when a poller is created we don't know if the user is ever going to 1441 * enable interrupts on it by calling this function, so the poller 1442 * registration function has to immediately create a interruptfd. 1443 * When this function does get called by user, we have to then destroy 1444 * that interruptfd. 1445 */ 1446 if (poller->set_intr_cb_fn && poller->interruptfd >= 0) { 1447 poller_interrupt_fini(poller); 1448 } 1449 1450 poller->set_intr_cb_fn = cb_fn; 1451 poller->set_intr_cb_arg = cb_arg; 1452 1453 /* Set poller into interrupt mode if thread is in interrupt. */ 1454 if (poller->thread->in_interrupt) { 1455 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1456 } 1457 } 1458 1459 static uint64_t 1460 convert_us_to_ticks(uint64_t us) 1461 { 1462 uint64_t quotient, remainder, ticks; 1463 1464 if (us) { 1465 quotient = us / SPDK_SEC_TO_USEC; 1466 remainder = us % SPDK_SEC_TO_USEC; 1467 ticks = spdk_get_ticks_hz(); 1468 1469 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1470 } else { 1471 return 0; 1472 } 1473 } 1474 1475 static struct spdk_poller * 1476 poller_register(spdk_poller_fn fn, 1477 void *arg, 1478 uint64_t period_microseconds, 1479 const char *name) 1480 { 1481 struct spdk_thread *thread; 1482 struct spdk_poller *poller; 1483 1484 thread = spdk_get_thread(); 1485 if (!thread) { 1486 assert(false); 1487 return NULL; 1488 } 1489 1490 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1491 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1492 return NULL; 1493 } 1494 1495 poller = calloc(1, sizeof(*poller)); 1496 if (poller == NULL) { 1497 SPDK_ERRLOG("Poller memory allocation failed\n"); 1498 return NULL; 1499 } 1500 1501 if (name) { 1502 snprintf(poller->name, sizeof(poller->name), "%s", name); 1503 } else { 1504 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1505 } 1506 1507 poller->state = SPDK_POLLER_STATE_WAITING; 1508 poller->fn = fn; 1509 poller->arg = arg; 1510 poller->thread = thread; 1511 poller->interruptfd = -1; 1512 if (thread->next_poller_id == 0) { 1513 SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n"); 1514 thread->next_poller_id = 1; 1515 } 1516 poller->id = thread->next_poller_id++; 1517 1518 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1519 1520 if (spdk_interrupt_mode_is_enabled()) { 1521 int rc; 1522 1523 if (period_microseconds) { 1524 rc = period_poller_interrupt_init(poller); 1525 if (rc < 0) { 1526 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1527 free(poller); 1528 return NULL; 1529 } 1530 1531 spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL); 1532 } else { 1533 /* If the poller doesn't have a period, create interruptfd that's always 1534 * busy automatically when running in interrupt mode. 1535 */ 1536 rc = busy_poller_interrupt_init(poller); 1537 if (rc > 0) { 1538 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1539 free(poller); 1540 return NULL; 1541 } 1542 1543 spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL); 1544 } 1545 } 1546 1547 thread_insert_poller(thread, poller); 1548 1549 return poller; 1550 } 1551 1552 struct spdk_poller * 1553 spdk_poller_register(spdk_poller_fn fn, 1554 void *arg, 1555 uint64_t period_microseconds) 1556 { 1557 return poller_register(fn, arg, period_microseconds, NULL); 1558 } 1559 1560 struct spdk_poller * 1561 spdk_poller_register_named(spdk_poller_fn fn, 1562 void *arg, 1563 uint64_t period_microseconds, 1564 const char *name) 1565 { 1566 return poller_register(fn, arg, period_microseconds, name); 1567 } 1568 1569 static void 1570 wrong_thread(const char *func, const char *name, struct spdk_thread *thread, 1571 struct spdk_thread *curthread) 1572 { 1573 if (thread == NULL) { 1574 SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name); 1575 abort(); 1576 } 1577 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be " 1578 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id, 1579 thread->name, thread->id); 1580 assert(false); 1581 } 1582 1583 void 1584 spdk_poller_unregister(struct spdk_poller **ppoller) 1585 { 1586 struct spdk_thread *thread; 1587 struct spdk_poller *poller; 1588 1589 poller = *ppoller; 1590 if (poller == NULL) { 1591 return; 1592 } 1593 1594 *ppoller = NULL; 1595 1596 thread = spdk_get_thread(); 1597 if (!thread) { 1598 assert(false); 1599 return; 1600 } 1601 1602 if (poller->thread != thread) { 1603 wrong_thread(__func__, poller->name, poller->thread, thread); 1604 return; 1605 } 1606 1607 if (spdk_interrupt_mode_is_enabled()) { 1608 /* Release the interrupt resource for period or busy poller */ 1609 if (poller->interruptfd >= 0) { 1610 poller_interrupt_fini(poller); 1611 } 1612 1613 /* Mark there is poller unregistered. Then unregistered pollers will 1614 * get reaped by spdk_thread_poll also in intr mode. 1615 */ 1616 thread->poller_unregistered = true; 1617 } 1618 1619 /* If the poller was paused, put it on the active_pollers list so that 1620 * its unregistration can be processed by spdk_thread_poll(). 1621 */ 1622 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1623 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1624 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1625 poller->period_ticks = 0; 1626 } 1627 1628 /* Simply set the state to unregistered. The poller will get cleaned up 1629 * in a subsequent call to spdk_thread_poll(). 1630 */ 1631 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1632 } 1633 1634 void 1635 spdk_poller_pause(struct spdk_poller *poller) 1636 { 1637 struct spdk_thread *thread; 1638 1639 thread = spdk_get_thread(); 1640 if (!thread) { 1641 assert(false); 1642 return; 1643 } 1644 1645 if (poller->thread != thread) { 1646 wrong_thread(__func__, poller->name, poller->thread, thread); 1647 return; 1648 } 1649 1650 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1651 * spdk_thread_poll() move it. It allows a poller to be paused from 1652 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1653 * iteration, or from within itself without breaking the logic to always 1654 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1655 */ 1656 switch (poller->state) { 1657 case SPDK_POLLER_STATE_PAUSED: 1658 case SPDK_POLLER_STATE_PAUSING: 1659 break; 1660 case SPDK_POLLER_STATE_RUNNING: 1661 case SPDK_POLLER_STATE_WAITING: 1662 poller->state = SPDK_POLLER_STATE_PAUSING; 1663 break; 1664 default: 1665 assert(false); 1666 break; 1667 } 1668 } 1669 1670 void 1671 spdk_poller_resume(struct spdk_poller *poller) 1672 { 1673 struct spdk_thread *thread; 1674 1675 thread = spdk_get_thread(); 1676 if (!thread) { 1677 assert(false); 1678 return; 1679 } 1680 1681 if (poller->thread != thread) { 1682 wrong_thread(__func__, poller->name, poller->thread, thread); 1683 return; 1684 } 1685 1686 /* If a poller is paused it has to be removed from the paused pollers 1687 * list and put on the active list or timer tree depending on its 1688 * period_ticks. If a poller is still in the process of being paused, 1689 * we just need to flip its state back to waiting, as it's already on 1690 * the appropriate list or tree. 1691 */ 1692 switch (poller->state) { 1693 case SPDK_POLLER_STATE_PAUSED: 1694 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1695 thread_insert_poller(thread, poller); 1696 /* fallthrough */ 1697 case SPDK_POLLER_STATE_PAUSING: 1698 poller->state = SPDK_POLLER_STATE_WAITING; 1699 break; 1700 case SPDK_POLLER_STATE_RUNNING: 1701 case SPDK_POLLER_STATE_WAITING: 1702 break; 1703 default: 1704 assert(false); 1705 break; 1706 } 1707 } 1708 1709 const char * 1710 spdk_poller_get_name(struct spdk_poller *poller) 1711 { 1712 return poller->name; 1713 } 1714 1715 uint64_t 1716 spdk_poller_get_id(struct spdk_poller *poller) 1717 { 1718 return poller->id; 1719 } 1720 1721 const char * 1722 spdk_poller_get_state_str(struct spdk_poller *poller) 1723 { 1724 switch (poller->state) { 1725 case SPDK_POLLER_STATE_WAITING: 1726 return "waiting"; 1727 case SPDK_POLLER_STATE_RUNNING: 1728 return "running"; 1729 case SPDK_POLLER_STATE_UNREGISTERED: 1730 return "unregistered"; 1731 case SPDK_POLLER_STATE_PAUSING: 1732 return "pausing"; 1733 case SPDK_POLLER_STATE_PAUSED: 1734 return "paused"; 1735 default: 1736 return NULL; 1737 } 1738 } 1739 1740 uint64_t 1741 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1742 { 1743 return poller->period_ticks; 1744 } 1745 1746 void 1747 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1748 { 1749 stats->run_count = poller->run_count; 1750 stats->busy_count = poller->busy_count; 1751 } 1752 1753 struct spdk_poller * 1754 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1755 { 1756 return TAILQ_FIRST(&thread->active_pollers); 1757 } 1758 1759 struct spdk_poller * 1760 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1761 { 1762 return TAILQ_NEXT(prev, tailq); 1763 } 1764 1765 struct spdk_poller * 1766 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1767 { 1768 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1769 } 1770 1771 struct spdk_poller * 1772 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1773 { 1774 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1775 } 1776 1777 struct spdk_poller * 1778 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1779 { 1780 return TAILQ_FIRST(&thread->paused_pollers); 1781 } 1782 1783 struct spdk_poller * 1784 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1785 { 1786 return TAILQ_NEXT(prev, tailq); 1787 } 1788 1789 struct spdk_io_channel * 1790 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1791 { 1792 return RB_MIN(io_channel_tree, &thread->io_channels); 1793 } 1794 1795 struct spdk_io_channel * 1796 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1797 { 1798 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1799 } 1800 1801 struct call_thread { 1802 struct spdk_thread *cur_thread; 1803 spdk_msg_fn fn; 1804 void *ctx; 1805 1806 struct spdk_thread *orig_thread; 1807 spdk_msg_fn cpl; 1808 }; 1809 1810 static void 1811 _on_thread(void *ctx) 1812 { 1813 struct call_thread *ct = ctx; 1814 int rc __attribute__((unused)); 1815 1816 ct->fn(ct->ctx); 1817 1818 pthread_mutex_lock(&g_devlist_mutex); 1819 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1820 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 1821 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 1822 ct->cur_thread->name); 1823 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1824 } 1825 pthread_mutex_unlock(&g_devlist_mutex); 1826 1827 if (!ct->cur_thread) { 1828 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1829 1830 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1831 free(ctx); 1832 } else { 1833 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1834 ct->cur_thread->name); 1835 1836 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1837 } 1838 assert(rc == 0); 1839 } 1840 1841 void 1842 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1843 { 1844 struct call_thread *ct; 1845 struct spdk_thread *thread; 1846 int rc __attribute__((unused)); 1847 1848 ct = calloc(1, sizeof(*ct)); 1849 if (!ct) { 1850 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1851 cpl(ctx); 1852 return; 1853 } 1854 1855 ct->fn = fn; 1856 ct->ctx = ctx; 1857 ct->cpl = cpl; 1858 1859 thread = _get_thread(); 1860 if (!thread) { 1861 SPDK_ERRLOG("No thread allocated\n"); 1862 free(ct); 1863 cpl(ctx); 1864 return; 1865 } 1866 ct->orig_thread = thread; 1867 1868 pthread_mutex_lock(&g_devlist_mutex); 1869 ct->cur_thread = TAILQ_FIRST(&g_threads); 1870 pthread_mutex_unlock(&g_devlist_mutex); 1871 1872 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 1873 ct->orig_thread->name); 1874 1875 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 1876 assert(rc == 0); 1877 } 1878 1879 static inline void 1880 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 1881 { 1882 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1883 return; 1884 } 1885 1886 if (!poller->set_intr_cb_fn) { 1887 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 1888 assert(false); 1889 return; 1890 } 1891 1892 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 1893 } 1894 1895 void 1896 spdk_thread_set_interrupt_mode(bool enable_interrupt) 1897 { 1898 struct spdk_thread *thread = _get_thread(); 1899 struct spdk_poller *poller, *tmp; 1900 1901 assert(thread); 1902 assert(spdk_interrupt_mode_is_enabled()); 1903 1904 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 1905 thread->name, enable_interrupt ? "intr" : "poll", 1906 thread->in_interrupt ? "intr" : "poll"); 1907 1908 if (thread->in_interrupt == enable_interrupt) { 1909 return; 1910 } 1911 1912 /* Set pollers to expected mode */ 1913 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1914 poller_set_interrupt_mode(poller, enable_interrupt); 1915 } 1916 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 1917 poller_set_interrupt_mode(poller, enable_interrupt); 1918 } 1919 /* All paused pollers will go to work in interrupt mode */ 1920 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 1921 poller_set_interrupt_mode(poller, enable_interrupt); 1922 } 1923 1924 thread->in_interrupt = enable_interrupt; 1925 return; 1926 } 1927 1928 static struct io_device * 1929 io_device_get(void *io_device) 1930 { 1931 struct io_device find = {}; 1932 1933 find.io_device = io_device; 1934 return RB_FIND(io_device_tree, &g_io_devices, &find); 1935 } 1936 1937 void 1938 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1939 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1940 const char *name) 1941 { 1942 struct io_device *dev, *tmp; 1943 struct spdk_thread *thread; 1944 1945 assert(io_device != NULL); 1946 assert(create_cb != NULL); 1947 assert(destroy_cb != NULL); 1948 1949 thread = spdk_get_thread(); 1950 if (!thread) { 1951 SPDK_ERRLOG("called from non-SPDK thread\n"); 1952 assert(false); 1953 return; 1954 } 1955 1956 dev = calloc(1, sizeof(struct io_device)); 1957 if (dev == NULL) { 1958 SPDK_ERRLOG("could not allocate io_device\n"); 1959 return; 1960 } 1961 1962 dev->io_device = io_device; 1963 if (name) { 1964 snprintf(dev->name, sizeof(dev->name), "%s", name); 1965 } else { 1966 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1967 } 1968 dev->create_cb = create_cb; 1969 dev->destroy_cb = destroy_cb; 1970 dev->unregister_cb = NULL; 1971 dev->ctx_size = ctx_size; 1972 dev->for_each_count = 0; 1973 dev->unregistered = false; 1974 dev->refcnt = 0; 1975 1976 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 1977 dev->name, dev->io_device, thread->name); 1978 1979 pthread_mutex_lock(&g_devlist_mutex); 1980 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 1981 if (tmp != NULL) { 1982 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1983 io_device, tmp->name, dev->name); 1984 free(dev); 1985 } 1986 1987 pthread_mutex_unlock(&g_devlist_mutex); 1988 } 1989 1990 static void 1991 _finish_unregister(void *arg) 1992 { 1993 struct io_device *dev = arg; 1994 struct spdk_thread *thread; 1995 1996 thread = spdk_get_thread(); 1997 assert(thread == dev->unregister_thread); 1998 1999 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2000 dev->name, dev->io_device, thread->name); 2001 2002 assert(thread->pending_unregister_count > 0); 2003 thread->pending_unregister_count--; 2004 2005 dev->unregister_cb(dev->io_device); 2006 free(dev); 2007 } 2008 2009 static void 2010 io_device_free(struct io_device *dev) 2011 { 2012 int rc __attribute__((unused)); 2013 2014 if (dev->unregister_cb == NULL) { 2015 free(dev); 2016 } else { 2017 assert(dev->unregister_thread != NULL); 2018 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2019 dev->name, dev->io_device, dev->unregister_thread->name); 2020 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2021 assert(rc == 0); 2022 } 2023 } 2024 2025 void 2026 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2027 { 2028 struct io_device *dev; 2029 uint32_t refcnt; 2030 struct spdk_thread *thread; 2031 2032 thread = spdk_get_thread(); 2033 if (!thread) { 2034 SPDK_ERRLOG("called from non-SPDK thread\n"); 2035 assert(false); 2036 return; 2037 } 2038 2039 pthread_mutex_lock(&g_devlist_mutex); 2040 dev = io_device_get(io_device); 2041 if (!dev) { 2042 SPDK_ERRLOG("io_device %p not found\n", io_device); 2043 assert(false); 2044 pthread_mutex_unlock(&g_devlist_mutex); 2045 return; 2046 } 2047 2048 /* The for_each_count check differentiates the user attempting to unregister the 2049 * device a second time, from the internal call to this function that occurs 2050 * after the for_each_count reaches 0. 2051 */ 2052 if (dev->pending_unregister && dev->for_each_count > 0) { 2053 SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device); 2054 assert(false); 2055 pthread_mutex_unlock(&g_devlist_mutex); 2056 return; 2057 } 2058 2059 dev->unregister_cb = unregister_cb; 2060 dev->unregister_thread = thread; 2061 2062 if (dev->for_each_count > 0) { 2063 SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2064 dev->name, io_device, dev->for_each_count); 2065 dev->pending_unregister = true; 2066 pthread_mutex_unlock(&g_devlist_mutex); 2067 return; 2068 } 2069 2070 dev->unregistered = true; 2071 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2072 refcnt = dev->refcnt; 2073 pthread_mutex_unlock(&g_devlist_mutex); 2074 2075 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2076 dev->name, dev->io_device, thread->name); 2077 2078 if (unregister_cb) { 2079 thread->pending_unregister_count++; 2080 } 2081 2082 if (refcnt > 0) { 2083 /* defer deletion */ 2084 return; 2085 } 2086 2087 io_device_free(dev); 2088 } 2089 2090 const char * 2091 spdk_io_device_get_name(struct io_device *dev) 2092 { 2093 return dev->name; 2094 } 2095 2096 static struct spdk_io_channel * 2097 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2098 { 2099 struct spdk_io_channel find = {}; 2100 2101 find.dev = dev; 2102 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2103 } 2104 2105 struct spdk_io_channel * 2106 spdk_get_io_channel(void *io_device) 2107 { 2108 struct spdk_io_channel *ch; 2109 struct spdk_thread *thread; 2110 struct io_device *dev; 2111 int rc; 2112 2113 pthread_mutex_lock(&g_devlist_mutex); 2114 dev = io_device_get(io_device); 2115 if (dev == NULL) { 2116 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2117 pthread_mutex_unlock(&g_devlist_mutex); 2118 return NULL; 2119 } 2120 2121 thread = _get_thread(); 2122 if (!thread) { 2123 SPDK_ERRLOG("No thread allocated\n"); 2124 pthread_mutex_unlock(&g_devlist_mutex); 2125 return NULL; 2126 } 2127 2128 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2129 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2130 pthread_mutex_unlock(&g_devlist_mutex); 2131 return NULL; 2132 } 2133 2134 ch = thread_get_io_channel(thread, dev); 2135 if (ch != NULL) { 2136 ch->ref++; 2137 2138 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2139 ch, dev->name, dev->io_device, thread->name, ch->ref); 2140 2141 /* 2142 * An I/O channel already exists for this device on this 2143 * thread, so return it. 2144 */ 2145 pthread_mutex_unlock(&g_devlist_mutex); 2146 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2147 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2148 return ch; 2149 } 2150 2151 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2152 if (ch == NULL) { 2153 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2154 pthread_mutex_unlock(&g_devlist_mutex); 2155 return NULL; 2156 } 2157 2158 ch->dev = dev; 2159 ch->destroy_cb = dev->destroy_cb; 2160 ch->thread = thread; 2161 ch->ref = 1; 2162 ch->destroy_ref = 0; 2163 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2164 2165 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2166 ch, dev->name, dev->io_device, thread->name, ch->ref); 2167 2168 dev->refcnt++; 2169 2170 pthread_mutex_unlock(&g_devlist_mutex); 2171 2172 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2173 if (rc != 0) { 2174 pthread_mutex_lock(&g_devlist_mutex); 2175 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2176 dev->refcnt--; 2177 free(ch); 2178 pthread_mutex_unlock(&g_devlist_mutex); 2179 return NULL; 2180 } 2181 2182 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2183 return ch; 2184 } 2185 2186 static void 2187 put_io_channel(void *arg) 2188 { 2189 struct spdk_io_channel *ch = arg; 2190 bool do_remove_dev = true; 2191 struct spdk_thread *thread; 2192 2193 thread = spdk_get_thread(); 2194 if (!thread) { 2195 SPDK_ERRLOG("called from non-SPDK thread\n"); 2196 assert(false); 2197 return; 2198 } 2199 2200 SPDK_DEBUGLOG(thread, 2201 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2202 ch, ch->dev->name, ch->dev->io_device, thread->name); 2203 2204 assert(ch->thread == thread); 2205 2206 ch->destroy_ref--; 2207 2208 if (ch->ref > 0 || ch->destroy_ref > 0) { 2209 /* 2210 * Another reference to the associated io_device was requested 2211 * after this message was sent but before it had a chance to 2212 * execute. 2213 */ 2214 return; 2215 } 2216 2217 pthread_mutex_lock(&g_devlist_mutex); 2218 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2219 pthread_mutex_unlock(&g_devlist_mutex); 2220 2221 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2222 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2223 2224 pthread_mutex_lock(&g_devlist_mutex); 2225 ch->dev->refcnt--; 2226 2227 if (!ch->dev->unregistered) { 2228 do_remove_dev = false; 2229 } 2230 2231 if (ch->dev->refcnt > 0) { 2232 do_remove_dev = false; 2233 } 2234 2235 pthread_mutex_unlock(&g_devlist_mutex); 2236 2237 if (do_remove_dev) { 2238 io_device_free(ch->dev); 2239 } 2240 free(ch); 2241 } 2242 2243 void 2244 spdk_put_io_channel(struct spdk_io_channel *ch) 2245 { 2246 struct spdk_thread *thread; 2247 int rc __attribute__((unused)); 2248 2249 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2250 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2251 2252 thread = spdk_get_thread(); 2253 if (!thread) { 2254 SPDK_ERRLOG("called from non-SPDK thread\n"); 2255 assert(false); 2256 return; 2257 } 2258 2259 if (ch->thread != thread) { 2260 wrong_thread(__func__, "ch", ch->thread, thread); 2261 return; 2262 } 2263 2264 SPDK_DEBUGLOG(thread, 2265 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2266 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2267 2268 ch->ref--; 2269 2270 if (ch->ref == 0) { 2271 ch->destroy_ref++; 2272 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2273 assert(rc == 0); 2274 } 2275 } 2276 2277 struct spdk_io_channel * 2278 spdk_io_channel_from_ctx(void *ctx) 2279 { 2280 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2281 } 2282 2283 struct spdk_thread * 2284 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2285 { 2286 return ch->thread; 2287 } 2288 2289 void * 2290 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2291 { 2292 return ch->dev->io_device; 2293 } 2294 2295 const char * 2296 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2297 { 2298 return spdk_io_device_get_name(ch->dev); 2299 } 2300 2301 int 2302 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2303 { 2304 return ch->ref; 2305 } 2306 2307 struct spdk_io_channel_iter { 2308 void *io_device; 2309 struct io_device *dev; 2310 spdk_channel_msg fn; 2311 int status; 2312 void *ctx; 2313 struct spdk_io_channel *ch; 2314 2315 struct spdk_thread *cur_thread; 2316 2317 struct spdk_thread *orig_thread; 2318 spdk_channel_for_each_cpl cpl; 2319 }; 2320 2321 void * 2322 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2323 { 2324 return i->io_device; 2325 } 2326 2327 struct spdk_io_channel * 2328 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2329 { 2330 return i->ch; 2331 } 2332 2333 void * 2334 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2335 { 2336 return i->ctx; 2337 } 2338 2339 static void 2340 _call_completion(void *ctx) 2341 { 2342 struct spdk_io_channel_iter *i = ctx; 2343 2344 if (i->cpl != NULL) { 2345 i->cpl(i, i->status); 2346 } 2347 free(i); 2348 } 2349 2350 static void 2351 _call_channel(void *ctx) 2352 { 2353 struct spdk_io_channel_iter *i = ctx; 2354 struct spdk_io_channel *ch; 2355 2356 /* 2357 * It is possible that the channel was deleted before this 2358 * message had a chance to execute. If so, skip calling 2359 * the fn() on this thread. 2360 */ 2361 pthread_mutex_lock(&g_devlist_mutex); 2362 ch = thread_get_io_channel(i->cur_thread, i->dev); 2363 pthread_mutex_unlock(&g_devlist_mutex); 2364 2365 if (ch) { 2366 i->fn(i); 2367 } else { 2368 spdk_for_each_channel_continue(i, 0); 2369 } 2370 } 2371 2372 void 2373 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2374 spdk_channel_for_each_cpl cpl) 2375 { 2376 struct spdk_thread *thread; 2377 struct spdk_io_channel *ch; 2378 struct spdk_io_channel_iter *i; 2379 int rc __attribute__((unused)); 2380 2381 i = calloc(1, sizeof(*i)); 2382 if (!i) { 2383 SPDK_ERRLOG("Unable to allocate iterator\n"); 2384 assert(false); 2385 return; 2386 } 2387 2388 i->io_device = io_device; 2389 i->fn = fn; 2390 i->ctx = ctx; 2391 i->cpl = cpl; 2392 i->orig_thread = _get_thread(); 2393 2394 pthread_mutex_lock(&g_devlist_mutex); 2395 i->dev = io_device_get(io_device); 2396 if (i->dev == NULL) { 2397 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2398 assert(false); 2399 i->status = -ENODEV; 2400 goto end; 2401 } 2402 2403 /* Do not allow new for_each operations if we are already waiting to unregister 2404 * the device for other for_each operations to complete. 2405 */ 2406 if (i->dev->pending_unregister) { 2407 SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device); 2408 i->status = -ENODEV; 2409 goto end; 2410 } 2411 2412 TAILQ_FOREACH(thread, &g_threads, tailq) { 2413 ch = thread_get_io_channel(thread, i->dev); 2414 if (ch != NULL) { 2415 ch->dev->for_each_count++; 2416 i->cur_thread = thread; 2417 i->ch = ch; 2418 pthread_mutex_unlock(&g_devlist_mutex); 2419 rc = spdk_thread_send_msg(thread, _call_channel, i); 2420 assert(rc == 0); 2421 return; 2422 } 2423 } 2424 2425 end: 2426 pthread_mutex_unlock(&g_devlist_mutex); 2427 2428 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2429 assert(rc == 0); 2430 } 2431 2432 static void 2433 __pending_unregister(void *arg) 2434 { 2435 struct io_device *dev = arg; 2436 2437 assert(dev->pending_unregister); 2438 assert(dev->for_each_count == 0); 2439 spdk_io_device_unregister(dev->io_device, dev->unregister_cb); 2440 } 2441 2442 void 2443 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2444 { 2445 struct spdk_thread *thread; 2446 struct spdk_io_channel *ch; 2447 struct io_device *dev; 2448 int rc __attribute__((unused)); 2449 2450 assert(i->cur_thread == spdk_get_thread()); 2451 2452 i->status = status; 2453 2454 pthread_mutex_lock(&g_devlist_mutex); 2455 dev = i->dev; 2456 if (status) { 2457 goto end; 2458 } 2459 2460 thread = TAILQ_NEXT(i->cur_thread, tailq); 2461 while (thread) { 2462 ch = thread_get_io_channel(thread, dev); 2463 if (ch != NULL) { 2464 i->cur_thread = thread; 2465 i->ch = ch; 2466 pthread_mutex_unlock(&g_devlist_mutex); 2467 rc = spdk_thread_send_msg(thread, _call_channel, i); 2468 assert(rc == 0); 2469 return; 2470 } 2471 thread = TAILQ_NEXT(thread, tailq); 2472 } 2473 2474 end: 2475 dev->for_each_count--; 2476 i->ch = NULL; 2477 pthread_mutex_unlock(&g_devlist_mutex); 2478 2479 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2480 assert(rc == 0); 2481 2482 pthread_mutex_lock(&g_devlist_mutex); 2483 if (dev->pending_unregister && dev->for_each_count == 0) { 2484 rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev); 2485 assert(rc == 0); 2486 } 2487 pthread_mutex_unlock(&g_devlist_mutex); 2488 } 2489 2490 struct spdk_interrupt { 2491 int efd; 2492 struct spdk_thread *thread; 2493 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 2494 }; 2495 2496 static void 2497 thread_interrupt_destroy(struct spdk_thread *thread) 2498 { 2499 struct spdk_fd_group *fgrp = thread->fgrp; 2500 2501 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2502 2503 if (thread->msg_fd < 0) { 2504 return; 2505 } 2506 2507 spdk_fd_group_remove(fgrp, thread->msg_fd); 2508 close(thread->msg_fd); 2509 thread->msg_fd = -1; 2510 2511 spdk_fd_group_destroy(fgrp); 2512 thread->fgrp = NULL; 2513 } 2514 2515 #ifdef __linux__ 2516 static int 2517 thread_interrupt_msg_process(void *arg) 2518 { 2519 struct spdk_thread *thread = arg; 2520 uint32_t msg_count; 2521 spdk_msg_fn critical_msg; 2522 int rc = 0; 2523 uint64_t notify = 1; 2524 2525 assert(spdk_interrupt_mode_is_enabled()); 2526 2527 /* There may be race between msg_acknowledge and another producer's msg_notify, 2528 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2529 * This can avoid msg notification missing. 2530 */ 2531 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2532 if (rc < 0 && errno != EAGAIN) { 2533 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2534 } 2535 2536 critical_msg = thread->critical_msg; 2537 if (spdk_unlikely(critical_msg != NULL)) { 2538 critical_msg(NULL); 2539 thread->critical_msg = NULL; 2540 rc = 1; 2541 } 2542 2543 msg_count = msg_queue_run_batch(thread, 0); 2544 if (msg_count) { 2545 rc = 1; 2546 } 2547 2548 return rc; 2549 } 2550 2551 static int 2552 thread_interrupt_create(struct spdk_thread *thread) 2553 { 2554 int rc; 2555 2556 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2557 2558 rc = spdk_fd_group_create(&thread->fgrp); 2559 if (rc) { 2560 return rc; 2561 } 2562 2563 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2564 if (thread->msg_fd < 0) { 2565 rc = -errno; 2566 spdk_fd_group_destroy(thread->fgrp); 2567 thread->fgrp = NULL; 2568 2569 return rc; 2570 } 2571 2572 return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd, 2573 thread_interrupt_msg_process, thread); 2574 } 2575 #else 2576 static int 2577 thread_interrupt_create(struct spdk_thread *thread) 2578 { 2579 return -ENOTSUP; 2580 } 2581 #endif 2582 2583 struct spdk_interrupt * 2584 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2585 void *arg, const char *name) 2586 { 2587 struct spdk_thread *thread; 2588 struct spdk_interrupt *intr; 2589 int ret; 2590 2591 thread = spdk_get_thread(); 2592 if (!thread) { 2593 assert(false); 2594 return NULL; 2595 } 2596 2597 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2598 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2599 return NULL; 2600 } 2601 2602 ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name); 2603 2604 if (ret != 0) { 2605 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2606 thread->name, efd, spdk_strerror(-ret)); 2607 return NULL; 2608 } 2609 2610 intr = calloc(1, sizeof(*intr)); 2611 if (intr == NULL) { 2612 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2613 return NULL; 2614 } 2615 2616 if (name) { 2617 snprintf(intr->name, sizeof(intr->name), "%s", name); 2618 } else { 2619 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2620 } 2621 2622 intr->efd = efd; 2623 intr->thread = thread; 2624 2625 return intr; 2626 } 2627 2628 void 2629 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2630 { 2631 struct spdk_thread *thread; 2632 struct spdk_interrupt *intr; 2633 2634 intr = *pintr; 2635 if (intr == NULL) { 2636 return; 2637 } 2638 2639 *pintr = NULL; 2640 2641 thread = spdk_get_thread(); 2642 if (!thread) { 2643 assert(false); 2644 return; 2645 } 2646 2647 if (intr->thread != thread) { 2648 wrong_thread(__func__, intr->name, intr->thread, thread); 2649 return; 2650 } 2651 2652 spdk_fd_group_remove(thread->fgrp, intr->efd); 2653 free(intr); 2654 } 2655 2656 int 2657 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2658 enum spdk_interrupt_event_types event_types) 2659 { 2660 struct spdk_thread *thread; 2661 2662 thread = spdk_get_thread(); 2663 if (!thread) { 2664 assert(false); 2665 return -EINVAL; 2666 } 2667 2668 if (intr->thread != thread) { 2669 wrong_thread(__func__, intr->name, intr->thread, thread); 2670 return -EINVAL; 2671 } 2672 2673 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2674 } 2675 2676 int 2677 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2678 { 2679 return spdk_fd_group_get_fd(thread->fgrp); 2680 } 2681 2682 static bool g_interrupt_mode = false; 2683 2684 int 2685 spdk_interrupt_mode_enable(void) 2686 { 2687 /* It must be called once prior to initializing the threading library. 2688 * g_spdk_msg_mempool will be valid if thread library is initialized. 2689 */ 2690 if (g_spdk_msg_mempool) { 2691 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 2692 return -1; 2693 } 2694 2695 #ifdef __linux__ 2696 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2697 g_interrupt_mode = true; 2698 return 0; 2699 #else 2700 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2701 g_interrupt_mode = false; 2702 return -ENOTSUP; 2703 #endif 2704 } 2705 2706 bool 2707 spdk_interrupt_mode_is_enabled(void) 2708 { 2709 return g_interrupt_mode; 2710 } 2711 2712 SPDK_LOG_REGISTER_COMPONENT(thread) 2713