1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright (c) Intel Corporation.
3 * All rights reserved.
4 */
5
6 #include "spdk/stdinc.h"
7
8 #include "spdk/env.h"
9 #include "spdk/likely.h"
10 #include "spdk/queue.h"
11 #include "spdk/string.h"
12 #include "spdk/thread.h"
13 #include "spdk/trace.h"
14 #include "spdk/util.h"
15 #include "spdk/fd_group.h"
16
17 #include "spdk/log.h"
18 #include "spdk_internal/thread.h"
19 #include "spdk_internal/usdt.h"
20 #include "thread_internal.h"
21
22 #include "spdk_internal/trace_defs.h"
23
24 #ifdef __linux__
25 #include <sys/timerfd.h>
26 #include <sys/eventfd.h>
27 #endif
28
29 #define SPDK_MSG_BATCH_SIZE 8
30 #define SPDK_MAX_DEVICE_NAME_LEN 256
31 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5
32 #define SPDK_MAX_POLLER_NAME_LEN 256
33 #define SPDK_MAX_THREAD_NAME_LEN 256
34
35 enum spdk_poller_state {
36 /* The poller is registered with a thread but not currently executing its fn. */
37 SPDK_POLLER_STATE_WAITING,
38
39 /* The poller is currently running its fn. */
40 SPDK_POLLER_STATE_RUNNING,
41
42 /* The poller was unregistered during the execution of its fn. */
43 SPDK_POLLER_STATE_UNREGISTERED,
44
45 /* The poller is in the process of being paused. It will be paused
46 * the next time it is due to be executed.
47 */
48 SPDK_POLLER_STATE_PAUSING,
49
50 /* The poller is registered but currently paused. It's on the
51 * paused_pollers list.
52 */
53 SPDK_POLLER_STATE_PAUSED,
54 };
55
56 struct spdk_poller {
57 TAILQ_ENTRY(spdk_poller) tailq;
58 RB_ENTRY(spdk_poller) node;
59
60 /* Current state of the poller; should only be accessed from the poller's thread. */
61 enum spdk_poller_state state;
62
63 uint64_t period_ticks;
64 uint64_t next_run_tick;
65 uint64_t run_count;
66 uint64_t busy_count;
67 uint64_t id;
68 spdk_poller_fn fn;
69 void *arg;
70 struct spdk_thread *thread;
71 /* Native interruptfd for a periodic or busy poller */
72 int interruptfd;
73 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn;
74 void *set_intr_cb_arg;
75
76 char name[SPDK_MAX_POLLER_NAME_LEN + 1];
77 };
78
79 enum spdk_thread_state {
80 /* The thread is processing pollers and messages via spdk_thread_poll(). */
81 SPDK_THREAD_STATE_RUNNING,
82
83 /* The thread is in the process of termination. It reaps unregistering
84 * pollers and is releasing its I/O channels.
85 */
86 SPDK_THREAD_STATE_EXITING,
87
88 /* The thread has exited. It is ready for spdk_thread_destroy() to be called. */
89 SPDK_THREAD_STATE_EXITED,
90 };
91
92 struct spdk_thread {
93 uint64_t tsc_last;
94 struct spdk_thread_stats stats;
95 /*
96 * Contains pollers actively running on this thread. Pollers
97 * are run round-robin. The thread takes one poller from the head
98 * of the ring, executes it, then puts it back at the tail of
99 * the ring.
100 */
101 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers;
102 /**
103 * Contains pollers running on this thread with a periodic timer.
104 */
105 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers;
106 struct spdk_poller *first_timed_poller;
107 /*
108 * Contains paused pollers. Pollers on this queue are waiting until
109 * they are resumed (in which case they're put onto the active/timer
110 * queues) or unregistered.
111 */ 112 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 113 struct spdk_ring *messages; 114 int msg_fd; 115 SLIST_HEAD(, spdk_msg) msg_cache; 116 size_t msg_cache_count; 117 spdk_msg_fn critical_msg; 118 uint64_t id; 119 uint64_t next_poller_id; 120 enum spdk_thread_state state; 121 int pending_unregister_count; 122 123 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 124 TAILQ_ENTRY(spdk_thread) tailq; 125 126 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 127 struct spdk_cpuset cpumask; 128 uint64_t exit_timeout_tsc; 129 130 /* Indicates whether this spdk_thread currently runs in interrupt. */ 131 bool in_interrupt; 132 bool poller_unregistered; 133 struct spdk_fd_group *fgrp; 134 135 /* User context allocated at the end */ 136 uint8_t ctx[0]; 137 }; 138 139 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 140 141 static spdk_new_thread_fn g_new_thread_fn = NULL; 142 static spdk_thread_op_fn g_thread_op_fn = NULL; 143 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 144 static size_t g_ctx_sz = 0; 145 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 146 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 147 * SPDK application is required. 148 */ 149 static uint64_t g_thread_id = 1; 150 151 struct io_device { 152 void *io_device; 153 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 154 spdk_io_channel_create_cb create_cb; 155 spdk_io_channel_destroy_cb destroy_cb; 156 spdk_io_device_unregister_cb unregister_cb; 157 struct spdk_thread *unregister_thread; 158 uint32_t ctx_size; 159 uint32_t for_each_count; 160 RB_ENTRY(io_device) node; 161 162 uint32_t refcnt; 163 164 bool unregistered; 165 }; 166 167 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 168 169 static int 170 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 171 { 172 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 173 } 174 175 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 176 177 static int 178 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 179 { 180 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 181 } 182 183 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 184 185 struct spdk_msg { 186 spdk_msg_fn fn; 187 void *arg; 188 189 SLIST_ENTRY(spdk_msg) link; 190 }; 191 192 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 193 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 194 195 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 196 static uint32_t g_thread_count = 0; 197 198 static __thread struct spdk_thread *tls_thread = NULL; 199 200 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 201 { 202 spdk_trace_register_description("THREAD_IOCH_GET", 203 TRACE_THREAD_IOCH_GET, 204 OWNER_NONE, OBJECT_NONE, 0, 205 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 206 spdk_trace_register_description("THREAD_IOCH_PUT", 207 TRACE_THREAD_IOCH_PUT, 208 OWNER_NONE, OBJECT_NONE, 0, 209 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 210 } 211 212 /* 213 * If this compare function returns zero when two next_run_ticks are equal, 214 * the macro RB_INSERT() returns a pointer to the element with the same 215 * next_run_tick. 216 * 217 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 218 * to remove as a parameter. 
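 * For example, if pollers A and B are both scheduled for tick 100, B is
 * simply inserted to A's right; RB_MIN() still returns A first, and each
 * poller is later removed by its own pointer, so duplicate keys are safe.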
219 * 220 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 221 * side by returning 1 when two next_run_ticks are equal. 222 */ 223 static inline int 224 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 225 { 226 if (poller1->next_run_tick < poller2->next_run_tick) { 227 return -1; 228 } else { 229 return 1; 230 } 231 } 232 233 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 234 235 static inline struct spdk_thread * 236 _get_thread(void) 237 { 238 return tls_thread; 239 } 240 241 static int 242 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 243 { 244 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 245 246 g_ctx_sz = ctx_sz; 247 248 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 249 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 250 sizeof(struct spdk_msg), 251 0, /* No cache. We do our own. */ 252 SPDK_ENV_SOCKET_ID_ANY); 253 254 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 255 msg_mempool_sz); 256 257 if (!g_spdk_msg_mempool) { 258 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 259 return -1; 260 } 261 262 return 0; 263 } 264 265 int 266 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 267 { 268 assert(g_new_thread_fn == NULL); 269 assert(g_thread_op_fn == NULL); 270 271 if (new_thread_fn == NULL) { 272 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 273 } else { 274 g_new_thread_fn = new_thread_fn; 275 } 276 277 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 278 } 279 280 int 281 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 282 spdk_thread_op_supported_fn thread_op_supported_fn, 283 size_t ctx_sz, size_t msg_mempool_sz) 284 { 285 assert(g_new_thread_fn == NULL); 286 assert(g_thread_op_fn == NULL); 287 assert(g_thread_op_supported_fn == NULL); 288 289 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 290 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 291 return -EINVAL; 292 } 293 294 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 295 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 296 } else { 297 g_thread_op_fn = thread_op_fn; 298 g_thread_op_supported_fn = thread_op_supported_fn; 299 } 300 301 return _thread_lib_init(ctx_sz, msg_mempool_sz); 302 } 303 304 void 305 spdk_thread_lib_fini(void) 306 { 307 struct io_device *dev; 308 309 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 310 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 311 } 312 313 if (g_spdk_msg_mempool) { 314 spdk_mempool_free(g_spdk_msg_mempool); 315 g_spdk_msg_mempool = NULL; 316 } 317 318 g_new_thread_fn = NULL; 319 g_thread_op_fn = NULL; 320 g_thread_op_supported_fn = NULL; 321 g_ctx_sz = 0; 322 } 323 324 static void thread_interrupt_destroy(struct spdk_thread *thread); 325 static int thread_interrupt_create(struct spdk_thread *thread); 326 327 static void 328 _free_thread(struct spdk_thread *thread) 329 { 330 struct spdk_io_channel *ch; 331 struct spdk_msg *msg; 332 struct spdk_poller *poller, *ptmp; 333 334 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 335 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 336 thread->name, ch->dev->name); 337 } 338 339 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 340 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 341 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 
342 poller->name); 343 } 344 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 345 free(poller); 346 } 347 348 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 349 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 350 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 351 poller->name); 352 } 353 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 354 free(poller); 355 } 356 357 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 358 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 359 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 360 free(poller); 361 } 362 363 pthread_mutex_lock(&g_devlist_mutex); 364 assert(g_thread_count > 0); 365 g_thread_count--; 366 TAILQ_REMOVE(&g_threads, thread, tailq); 367 pthread_mutex_unlock(&g_devlist_mutex); 368 369 msg = SLIST_FIRST(&thread->msg_cache); 370 while (msg != NULL) { 371 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 372 373 assert(thread->msg_cache_count > 0); 374 thread->msg_cache_count--; 375 spdk_mempool_put(g_spdk_msg_mempool, msg); 376 377 msg = SLIST_FIRST(&thread->msg_cache); 378 } 379 380 assert(thread->msg_cache_count == 0); 381 382 if (spdk_interrupt_mode_is_enabled()) { 383 thread_interrupt_destroy(thread); 384 } 385 386 spdk_ring_free(thread->messages); 387 free(thread); 388 } 389 390 struct spdk_thread * 391 spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask) 392 { 393 struct spdk_thread *thread; 394 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 395 int rc = 0, i; 396 397 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 398 if (!thread) { 399 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 400 return NULL; 401 } 402 403 if (cpumask) { 404 spdk_cpuset_copy(&thread->cpumask, cpumask); 405 } else { 406 spdk_cpuset_negate(&thread->cpumask); 407 } 408 409 RB_INIT(&thread->io_channels); 410 TAILQ_INIT(&thread->active_pollers); 411 RB_INIT(&thread->timed_pollers); 412 TAILQ_INIT(&thread->paused_pollers); 413 SLIST_INIT(&thread->msg_cache); 414 thread->msg_cache_count = 0; 415 416 thread->tsc_last = spdk_get_ticks(); 417 418 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 419 * ID exceeds UINT64_MAX a warning message is logged 420 */ 421 thread->next_poller_id = 1; 422 423 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 424 if (!thread->messages) { 425 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 426 free(thread); 427 return NULL; 428 } 429 430 /* Fill the local message pool cache. */ 431 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 432 if (rc == 0) { 433 /* If we can't populate the cache it's ok. The cache will get filled 434 * up organically as messages are passed to the thread. */ 435 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 436 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 437 thread->msg_cache_count++; 438 } 439 } 440 441 if (name) { 442 snprintf(thread->name, sizeof(thread->name), "%s", name); 443 } else { 444 snprintf(thread->name, sizeof(thread->name), "%p", thread); 445 } 446 447 pthread_mutex_lock(&g_devlist_mutex); 448 if (g_thread_id == 0) { 449 SPDK_ERRLOG("Thread ID rolled over. 
Further thread creation is not allowed.\n"); 450 pthread_mutex_unlock(&g_devlist_mutex); 451 _free_thread(thread); 452 return NULL; 453 } 454 thread->id = g_thread_id++; 455 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 456 g_thread_count++; 457 pthread_mutex_unlock(&g_devlist_mutex); 458 459 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 460 thread->id, thread->name); 461 462 if (spdk_interrupt_mode_is_enabled()) { 463 thread->in_interrupt = true; 464 rc = thread_interrupt_create(thread); 465 if (rc != 0) { 466 _free_thread(thread); 467 return NULL; 468 } 469 } 470 471 if (g_new_thread_fn) { 472 rc = g_new_thread_fn(thread); 473 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 474 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 475 } 476 477 if (rc != 0) { 478 _free_thread(thread); 479 return NULL; 480 } 481 482 thread->state = SPDK_THREAD_STATE_RUNNING; 483 484 return thread; 485 } 486 487 void 488 spdk_set_thread(struct spdk_thread *thread) 489 { 490 tls_thread = thread; 491 } 492 493 static void 494 thread_exit(struct spdk_thread *thread, uint64_t now) 495 { 496 struct spdk_poller *poller; 497 struct spdk_io_channel *ch; 498 499 if (now >= thread->exit_timeout_tsc) { 500 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 501 thread->name); 502 goto exited; 503 } 504 505 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 506 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 507 SPDK_INFOLOG(thread, 508 "thread %s still has active poller %s\n", 509 thread->name, poller->name); 510 return; 511 } 512 } 513 514 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 515 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 516 SPDK_INFOLOG(thread, 517 "thread %s still has active timed poller %s\n", 518 thread->name, poller->name); 519 return; 520 } 521 } 522 523 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 524 SPDK_INFOLOG(thread, 525 "thread %s still has paused poller %s\n", 526 thread->name, poller->name); 527 return; 528 } 529 530 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 531 SPDK_INFOLOG(thread, 532 "thread %s still has channel for io_device %s\n", 533 thread->name, ch->dev->name); 534 return; 535 } 536 537 if (thread->pending_unregister_count > 0) { 538 SPDK_INFOLOG(thread, 539 "thread %s is still unregistering io_devices\n", 540 thread->name); 541 return; 542 } 543 544 exited: 545 thread->state = SPDK_THREAD_STATE_EXITED; 546 if (spdk_unlikely(thread->in_interrupt)) { 547 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 548 } 549 } 550 551 int 552 spdk_thread_exit(struct spdk_thread *thread) 553 { 554 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 555 556 assert(tls_thread == thread); 557 558 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 559 SPDK_INFOLOG(thread, 560 "thread %s is already exiting\n", 561 thread->name); 562 return 0; 563 } 564 565 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 566 SPDK_THREAD_EXIT_TIMEOUT_SEC); 567 thread->state = SPDK_THREAD_STATE_EXITING; 568 return 0; 569 } 570 571 bool 572 spdk_thread_is_exited(struct spdk_thread *thread) 573 { 574 return thread->state == SPDK_THREAD_STATE_EXITED; 575 } 576 577 void 578 spdk_thread_destroy(struct spdk_thread *thread) 579 { 580 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 581 582 assert(thread->state == SPDK_THREAD_STATE_EXITED); 583 584 if (tls_thread == thread) { 585 tls_thread = NULL; 586 } 587 588 _free_thread(thread); 589 } 590 
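/*
 * Illustrative sketch (not part of this file): a minimal lifecycle for an
 * spdk_thread driven by a caller-owned loop, using only APIs defined here.
 * Assumes an initialized env and thread library; app_fn and the done flag
 * are hypothetical.
 *
 *	struct spdk_thread *t = spdk_thread_create("worker", NULL);
 *
 *	spdk_set_thread(t);
 *	spdk_thread_send_msg(t, app_fn, NULL);	// queue work for the thread
 *	while (!spdk_thread_is_exited(t)) {
 *		spdk_thread_poll(t, 0, 0);	// run messages and pollers
 *		if (done) {
 *			spdk_thread_exit(t);	// begin graceful teardown
 *		}
 *	}
 *	spdk_thread_destroy(t);
 */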
591 void *
592 spdk_thread_get_ctx(struct spdk_thread *thread)
593 {
594 if (g_ctx_sz > 0) {
595 return thread->ctx;
596 }
597
598 return NULL;
599 }
600
601 struct spdk_cpuset *
602 spdk_thread_get_cpumask(struct spdk_thread *thread)
603 {
604 return &thread->cpumask;
605 }
606
607 int
608 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
609 {
610 struct spdk_thread *thread;
611
612 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
613 SPDK_ERRLOG("Framework does not support reschedule operation.\n");
614 assert(false);
615 return -ENOTSUP;
616 }
617
618 thread = spdk_get_thread();
619 if (!thread) {
620 SPDK_ERRLOG("Called from non-SPDK thread\n");
621 assert(false);
622 return -EINVAL;
623 }
624
625 spdk_cpuset_copy(&thread->cpumask, cpumask);
626
627 /* Invoke the framework's reschedule operation. If this function is called multiple times
628 * in a single spdk_thread_poll() context, the last cpumask will be used in the
629 * reschedule operation.
630 */
631 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
632
633 return 0;
634 }
635
636 struct spdk_thread *
637 spdk_thread_get_from_ctx(void *ctx)
638 {
639 if (ctx == NULL) {
640 assert(false);
641 return NULL;
642 }
643
644 assert(g_ctx_sz > 0);
645
646 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
647 }
648
649 static inline uint32_t
650 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
651 {
652 unsigned count, i;
653 void *messages[SPDK_MSG_BATCH_SIZE];
654 uint64_t notify = 1;
655 int rc;
656
657 #ifdef DEBUG
658 /*
659 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
660 * so we will never actually read uninitialized data from messages, but just to be sure
661 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
662 */
663 memset(messages, 0, sizeof(messages));
664 #endif
665
666 if (max_msgs > 0) {
667 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
668 } else {
669 max_msgs = SPDK_MSG_BATCH_SIZE;
670 }
671
672 count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
673 if (spdk_unlikely(thread->in_interrupt) &&
674 spdk_ring_count(thread->messages) != 0) {
675 rc = write(thread->msg_fd, &notify, sizeof(notify));
676 if (rc < 0) {
677 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
678 }
679 }
680 if (count == 0) {
681 return 0;
682 }
683
684 for (i = 0; i < count; i++) {
685 struct spdk_msg *msg = messages[i];
686
687 assert(msg != NULL);
688
689 SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg);
690
691 msg->fn(msg->arg);
692
693 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
694 /* Insert the messages at the head. We want to re-use the hot
695 * ones. */
696 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
697 thread->msg_cache_count++;
698 } else {
699 spdk_mempool_put(g_spdk_msg_mempool, msg);
700 }
701 }
702
703 return count;
704 }
705
706 static void
707 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
708 {
709 struct spdk_poller *tmp __attribute__((unused));
710
711 poller->next_run_tick = now + poller->period_ticks;
712
713 /*
714 * Insert the poller in the thread's timed_pollers tree, keyed by its next
715 * scheduled run time.
716 */
717 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
718 assert(tmp == NULL);
719
720 /* Update the cache only if it is empty or the inserted poller is earlier than it.
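 * For example, if the cached first_timed_poller is due at tick 100 and this
 * poller is due at tick 90, the cache must move to this poller.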
721 * RB_MIN() is not necessary here because all pollers which have exactly the same
722 * next_run_tick as the existing poller are inserted on the right side.
723 */
724 if (thread->first_timed_poller == NULL ||
725 poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
726 thread->first_timed_poller = poller;
727 }
728 }
729
730 static inline void
731 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
732 {
733 struct spdk_poller *tmp __attribute__((unused));
734
735 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
736 assert(tmp != NULL);
737
738 /* This function is not used in any performance-critical path.
739 * Update the cache simply by RB_MIN() if it needs to be changed.
740 */
741 if (thread->first_timed_poller == poller) {
742 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
743 }
744 }
745
746 static void
747 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
748 {
749 if (poller->period_ticks) {
750 poller_insert_timer(thread, poller, spdk_get_ticks());
751 } else {
752 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
753 }
754 }
755
756 static inline void
757 thread_update_stats(struct spdk_thread *thread, uint64_t end,
758 uint64_t start, int rc)
759 {
760 if (rc == 0) {
761 /* Poller status idle */
762 thread->stats.idle_tsc += end - start;
763 } else if (rc > 0) {
764 /* Poller status busy */
765 thread->stats.busy_tsc += end - start;
766 }
767 /* Store end time to use it as start time of the next spdk_thread_poll(). */
768 thread->tsc_last = end;
769 }
770
771 static inline int
772 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
773 {
774 int rc;
775
776 switch (poller->state) {
777 case SPDK_POLLER_STATE_UNREGISTERED:
778 TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
779 free(poller);
780 return 0;
781 case SPDK_POLLER_STATE_PAUSING:
782 TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
783 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
784 poller->state = SPDK_POLLER_STATE_PAUSED;
785 return 0;
786 case SPDK_POLLER_STATE_WAITING:
787 break;
788 default:
789 assert(false);
790 break;
791 }
792
793 poller->state = SPDK_POLLER_STATE_RUNNING;
794 rc = poller->fn(poller->arg);
795
796 poller->run_count++;
797 if (rc > 0) {
798 poller->busy_count++;
799 }
800
801 #ifdef DEBUG
802 if (rc == -1) {
803 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
804 }
805 #endif
806
807 switch (poller->state) {
808 case SPDK_POLLER_STATE_UNREGISTERED:
809 TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
810 free(poller);
811 break;
812 case SPDK_POLLER_STATE_PAUSING:
813 TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
814 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
815 poller->state = SPDK_POLLER_STATE_PAUSED;
816 break;
817 case SPDK_POLLER_STATE_PAUSED:
818 case SPDK_POLLER_STATE_WAITING:
819 break;
820 case SPDK_POLLER_STATE_RUNNING:
821 poller->state = SPDK_POLLER_STATE_WAITING;
822 break;
823 default:
824 assert(false);
825 break;
826 }
827
828 return rc;
829 }
830
831 static inline int
832 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
833 uint64_t now)
834 {
835 int rc;
836
837 switch (poller->state) {
838 case SPDK_POLLER_STATE_UNREGISTERED:
839 free(poller);
840 return 0;
841 case SPDK_POLLER_STATE_PAUSING:
842 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
843 poller->state = SPDK_POLLER_STATE_PAUSED;
844 return 0;
845 case SPDK_POLLER_STATE_WAITING:
846 break;
847 default:
848 assert(false);
849 break;
850 }
851
852 poller->state = SPDK_POLLER_STATE_RUNNING;
853 rc = poller->fn(poller->arg);
854
855 poller->run_count++;
856 if (rc > 0) {
857 poller->busy_count++;
858 }
859
860 #ifdef DEBUG
861 if (rc == -1) {
862 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
863 }
864 #endif
865
866 switch (poller->state) {
867 case SPDK_POLLER_STATE_UNREGISTERED:
868 free(poller);
869 break;
870 case SPDK_POLLER_STATE_PAUSING:
871 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
872 poller->state = SPDK_POLLER_STATE_PAUSED;
873 break;
874 case SPDK_POLLER_STATE_PAUSED:
875 break;
876 case SPDK_POLLER_STATE_RUNNING:
877 poller->state = SPDK_POLLER_STATE_WAITING;
878 /* fallthrough */
879 case SPDK_POLLER_STATE_WAITING:
880 poller_insert_timer(thread, poller, now);
881 break;
882 default:
883 assert(false);
884 break;
885 }
886
887 return rc;
888 }
889
890 static int
891 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
892 {
893 uint32_t msg_count;
894 struct spdk_poller *poller, *tmp;
895 spdk_msg_fn critical_msg;
896 int rc = 0;
897
898 thread->tsc_last = now;
899
900 critical_msg = thread->critical_msg;
901 if (spdk_unlikely(critical_msg != NULL)) {
902 critical_msg(NULL);
903 thread->critical_msg = NULL;
904 rc = 1;
905 }
906
907 msg_count = msg_queue_run_batch(thread, max_msgs);
908 if (msg_count) {
909 rc = 1;
910 }
911
912 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
913 active_pollers_head, tailq, tmp) {
914 int poller_rc;
915
916 poller_rc = thread_execute_poller(thread, poller);
917 if (poller_rc > rc) {
918 rc = poller_rc;
919 }
920 }
921
922 poller = thread->first_timed_poller;
923 while (poller != NULL) {
924 int timer_rc = 0;
925
926 if (now < poller->next_run_tick) {
927 break;
928 }
929
930 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
931 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
932
933 /* Update the cache to the next timed poller in the list
934 * only if the current poller is still the closest; otherwise
935 * do nothing, because the cache has already been updated.
936 */
937 if (thread->first_timed_poller == poller) {
938 thread->first_timed_poller = tmp;
939 }
940
941 timer_rc = thread_execute_timed_poller(thread, poller, now);
942 if (timer_rc > rc) {
943 rc = timer_rc;
944 }
945
946 poller = tmp;
947 }
948
949 return rc;
950 }
951
952 int
953 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
954 {
955 struct spdk_thread *orig_thread;
956 int rc;
957 uint64_t notify = 1;
958
959 orig_thread = _get_thread();
960 tls_thread = thread;
961
962 if (now == 0) {
963 now = spdk_get_ticks();
964 }
965
966 if (spdk_likely(!thread->in_interrupt)) {
967 rc = thread_poll(thread, max_msgs, now);
968 if (spdk_unlikely(thread->in_interrupt)) {
969 /* The thread transitioned to interrupt mode during the above poll.
970 * Poll it one more time in case a message was received without
971 * notification during the transition.
972 */
973 rc = thread_poll(thread, max_msgs, now);
974 }
975 } else {
976 /* Non-blocking wait on the thread's fd_group */
977 rc = spdk_fd_group_wait(thread->fgrp, 0);
978 if (spdk_unlikely(!thread->in_interrupt)) {
979 /* The thread transitioned to poll mode in a message handler during the above
980 * processing. Drain msg_fd, since thread messages will be polled directly in poll mode.
981 */
982 rc = read(thread->msg_fd, &notify, sizeof(notify));
983 if (rc < 0 && errno != EAGAIN) {
984 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
985 }
986 }
987
988 /* Reap unregistered pollers outside of poller execution in interrupt mode */
989 if (spdk_unlikely(thread->poller_unregistered)) {
990 struct spdk_poller *poller, *tmp;
991
992 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
993 active_pollers_head, tailq, tmp) {
994 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
995 TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
996 free(poller);
997 }
998 }
999
1000 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
1001 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
1002 poller_remove_timer(thread, poller);
1003 free(poller);
1004 }
1005 }
1006
1007 thread->poller_unregistered = false;
1008 }
1009 }
1010
1011
1012 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
1013 thread_exit(thread, now);
1014 }
1015
1016 thread_update_stats(thread, spdk_get_ticks(), now, rc);
1017
1018 tls_thread = orig_thread;
1019
1020 return rc;
1021 }
1022
1023 uint64_t
1024 spdk_thread_next_poller_expiration(struct spdk_thread *thread)
1025 {
1026 struct spdk_poller *poller;
1027
1028 poller = thread->first_timed_poller;
1029 if (poller) {
1030 return poller->next_run_tick;
1031 }
1032
1033 return 0;
1034 }
1035
1036 int
1037 spdk_thread_has_active_pollers(struct spdk_thread *thread)
1038 {
1039 return !TAILQ_EMPTY(&thread->active_pollers);
1040 }
1041
1042 static bool
1043 thread_has_unpaused_pollers(struct spdk_thread *thread)
1044 {
1045 if (TAILQ_EMPTY(&thread->active_pollers) &&
1046 RB_EMPTY(&thread->timed_pollers)) {
1047 return false;
1048 }
1049
1050 return true;
1051 }
1052
1053 bool
1054 spdk_thread_has_pollers(struct spdk_thread *thread)
1055 {
1056 if (!thread_has_unpaused_pollers(thread) &&
1057 TAILQ_EMPTY(&thread->paused_pollers)) {
1058 return false;
1059 }
1060
1061 return true;
1062 }
1063
1064 bool
1065 spdk_thread_is_idle(struct spdk_thread *thread)
1066 {
1067 if (spdk_ring_count(thread->messages) ||
1068 thread_has_unpaused_pollers(thread) ||
1069 thread->critical_msg != NULL) {
1070 return false;
1071 }
1072
1073 return true;
1074 }
1075
1076 uint32_t
1077 spdk_thread_get_count(void)
1078 {
1079 /*
1080 * Return the cached value of the current thread count. We could acquire the
1081 * lock and iterate through the TAILQ of threads to count them, but that
1082 * count could still be invalidated after we release the lock.
1083 */
1084 return g_thread_count;
1085 }
1086
1087 struct spdk_thread *
1088 spdk_get_thread(void)
1089 {
1090 return _get_thread();
1091 }
1092
1093 const char *
1094 spdk_thread_get_name(const struct spdk_thread *thread)
1095 {
1096 return thread->name;
1097 }
1098
1099 uint64_t
1100 spdk_thread_get_id(const struct spdk_thread *thread)
1101 {
1102 return thread->id;
1103 }
1104
1105 struct spdk_thread *
1106 spdk_thread_get_by_id(uint64_t id)
1107 {
1108 struct spdk_thread *thread;
1109
1110 if (id == 0 || id >= g_thread_id) {
1111 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
1112 return NULL;
1113 }
1114 pthread_mutex_lock(&g_devlist_mutex);
1115 TAILQ_FOREACH(thread, &g_threads, tailq) {
1116 if (thread->id == id) {
1117 break;
1118 }
1119 }
1120 pthread_mutex_unlock(&g_devlist_mutex);
1121 return thread;
1122 }
1123
1124 int
1125 spdk_thread_get_stats(struct spdk_thread_stats *stats)
1126 {
1127 struct spdk_thread *thread;
1128
1129 thread = _get_thread();
1130 if (!thread) {
1131 SPDK_ERRLOG("No thread allocated\n");
1132 return -EINVAL;
1133 }
1134
1135 if (stats == NULL) {
1136 return -EINVAL;
1137 }
1138
1139 *stats = thread->stats;
1140
1141 return 0;
1142 }
1143
1144 uint64_t
1145 spdk_thread_get_last_tsc(struct spdk_thread *thread)
1146 {
1147 if (thread == NULL) {
1148 thread = _get_thread();
1149 }
1150
1151 return thread->tsc_last;
1152 }
1153
1154 static inline int
1155 thread_send_msg_notification(const struct spdk_thread *target_thread)
1156 {
1157 uint64_t notify = 1;
1158 int rc;
1159
1160 /* No notification is necessary if the interrupt facility is not enabled. */
1161 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) {
1162 return 0;
1163 }
1164
1165 /* Since each spdk_thread can switch between poll and interrupt mode dynamically,
1166 * after sending a thread message it is necessary to check whether the target thread
1167 * runs in interrupt mode and then decide whether to do the event notification.
1168 */
1169 if (spdk_unlikely(target_thread->in_interrupt)) {
1170 rc = write(target_thread->msg_fd, &notify, sizeof(notify));
1171 if (rc < 0) {
1172 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
1173 return -EIO;
1174 }
1175 }
1176
1177 return 0;
1178 }
1179
1180 int
1181 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
1182 {
1183 struct spdk_thread *local_thread;
1184 struct spdk_msg *msg;
1185 int rc;
1186
1187 assert(thread != NULL);
1188
1189 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
1190 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
1191 return -EIO;
1192 }
1193
1194 local_thread = _get_thread();
1195
1196 msg = NULL;
1197 if (local_thread != NULL) {
1198 if (local_thread->msg_cache_count > 0) {
1199 msg = SLIST_FIRST(&local_thread->msg_cache);
1200 assert(msg != NULL);
1201 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
1202 local_thread->msg_cache_count--;
1203 }
1204 }
1205
1206 if (msg == NULL) {
1207 msg = spdk_mempool_get(g_spdk_msg_mempool);
1208 if (!msg) {
1209 SPDK_ERRLOG("msg could not be allocated\n");
1210 return -ENOMEM;
1211 }
1212 }
1213
1214 msg->fn = fn;
1215 msg->arg = ctx;
1216
1217 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
1218 if (rc != 1) {
1219 SPDK_ERRLOG("msg could not be enqueued\n");
1220 spdk_mempool_put(g_spdk_msg_mempool, msg);
1221 return -EIO;
1222 }
1223
1224 return thread_send_msg_notification(thread);
1225 }
1226
1227 int
1228 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
1229 {
1230 spdk_msg_fn expected = NULL;
1231
1232 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
1233 __ATOMIC_SEQ_CST)) {
1234 return -EIO;
1235 }
1236
1237 return thread_send_msg_notification(thread);
1238 }
1239
1240 #ifdef __linux__
1241 static int
1242 interrupt_timerfd_process(void *arg)
1243 {
1244 struct spdk_poller *poller = arg;
1245 uint64_t exp;
1246 int rc;
1247
1248 /* clear the level of the interval timer */
1249 rc = read(poller->interruptfd, &exp, sizeof(exp));
1250 if (rc < 0) {
1251 if (errno == EAGAIN) { /* read() returns -1 on failure; the error is in errno, not rc */
1252 return 0;
1253 }
1254
1255 return rc;
1256 }
1257
1258 SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg);
1259
1260 return poller->fn(poller->arg);
1261 }
1262
1263 static int
1264 period_poller_interrupt_init(struct spdk_poller *poller)
1265 {
1266 struct spdk_fd_group *fgrp = poller->thread->fgrp;
1267 int timerfd;
1268 int rc;
1269
1270 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
1271 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
1272 if (timerfd < 0) {
1273 return -errno;
1274 }
1275
1276 rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller);
1277 if (rc < 0) {
1278 close(timerfd);
1279 return rc;
1280 }
1281
1282 poller->interruptfd = timerfd;
1283 return 0;
1284 }
1285
1286 static void
1287 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1288 {
1289 int timerfd = poller->interruptfd;
1290 uint64_t now_tick = spdk_get_ticks();
1291 uint64_t ticks = spdk_get_ticks_hz();
1292 int ret;
1293 struct itimerspec new_tv = {};
1294 struct itimerspec old_tv = {};
1295
1296 assert(poller->period_ticks != 0);
1297 assert(timerfd >= 0);
1298
1299 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
1300 interrupt_mode ? "interrupt" : "poll");
"interrupt" : "poll"); 1301 1302 if (interrupt_mode) { 1303 /* Set repeated timer expiration */ 1304 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1305 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1306 1307 /* Update next timer expiration */ 1308 if (poller->next_run_tick == 0) { 1309 poller->next_run_tick = now_tick + poller->period_ticks; 1310 } else if (poller->next_run_tick < now_tick) { 1311 poller->next_run_tick = now_tick; 1312 } 1313 1314 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1315 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1316 1317 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1318 if (ret < 0) { 1319 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1320 assert(false); 1321 } 1322 } else { 1323 /* Disarm the timer */ 1324 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1325 if (ret < 0) { 1326 /* timerfd_settime's failure indicates that the timerfd is in error */ 1327 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1328 assert(false); 1329 } 1330 1331 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1332 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1333 */ 1334 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1335 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1336 poller_remove_timer(poller->thread, poller); 1337 poller_insert_timer(poller->thread, poller, now_tick); 1338 } 1339 } 1340 1341 static void 1342 poller_interrupt_fini(struct spdk_poller *poller) 1343 { 1344 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1345 assert(poller->interruptfd >= 0); 1346 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1347 close(poller->interruptfd); 1348 poller->interruptfd = -1; 1349 } 1350 1351 static int 1352 busy_poller_interrupt_init(struct spdk_poller *poller) 1353 { 1354 int busy_efd; 1355 int rc; 1356 1357 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1358 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1359 if (busy_efd < 0) { 1360 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1361 return -errno; 1362 } 1363 1364 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, 1365 poller->fn, poller->arg, poller->name); 1366 if (rc < 0) { 1367 close(busy_efd); 1368 return rc; 1369 } 1370 1371 poller->interruptfd = busy_efd; 1372 return 0; 1373 } 1374 1375 static void 1376 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1377 { 1378 int busy_efd = poller->interruptfd; 1379 uint64_t notify = 1; 1380 int rc __attribute__((unused)); 1381 1382 assert(busy_efd >= 0); 1383 1384 if (interrupt_mode) { 1385 /* Write without read on eventfd will get it repeatedly triggered. */ 1386 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1387 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1388 } 1389 } else { 1390 /* Read on eventfd will clear its level triggering. 
1391 rc = read(busy_efd, &notify, sizeof(notify));
1392 }
1393 }
1394
1395 #else
1396
1397 static int
1398 period_poller_interrupt_init(struct spdk_poller *poller)
1399 {
1400 return -ENOTSUP;
1401 }
1402
1403 static void
1404 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1405 {
1406 }
1407
1408 static void
1409 poller_interrupt_fini(struct spdk_poller *poller)
1410 {
1411 }
1412
1413 static int
1414 busy_poller_interrupt_init(struct spdk_poller *poller)
1415 {
1416 return -ENOTSUP;
1417 }
1418
1419 static void
1420 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1421 {
1422 }
1423
1424 #endif
1425
1426 void
1427 spdk_poller_register_interrupt(struct spdk_poller *poller,
1428 spdk_poller_set_interrupt_mode_cb cb_fn,
1429 void *cb_arg)
1430 {
1431 assert(poller != NULL);
1432 assert(cb_fn != NULL);
1433 assert(spdk_get_thread() == poller->thread);
1434
1435 if (!spdk_interrupt_mode_is_enabled()) {
1436 return;
1437 }
1438
1439 /* When a poller is created, we don't know if the user is ever going to
1440 * enable interrupts on it by calling this function, so the poller
1441 * registration function has to immediately create an interruptfd.
1442 * When this function does get called by the user, we have to then destroy
1443 * that interruptfd.
1444 */
1445 if (poller->set_intr_cb_fn && poller->interruptfd >= 0) {
1446 poller_interrupt_fini(poller);
1447 }
1448
1449 poller->set_intr_cb_fn = cb_fn;
1450 poller->set_intr_cb_arg = cb_arg;
1451
1452 /* Set the poller into interrupt mode if the thread is already in interrupt mode. */
1453 if (poller->thread->in_interrupt) {
1454 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
1455 }
1456 }
1457
1458 static uint64_t
1459 convert_us_to_ticks(uint64_t us)
1460 {
1461 uint64_t quotient, remainder, ticks;
1462
1463 if (us) {
1464 quotient = us / SPDK_SEC_TO_USEC;
1465 remainder = us % SPDK_SEC_TO_USEC;
1466 ticks = spdk_get_ticks_hz();
1467
1468 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
1469 } else {
1470 return 0;
1471 }
1472 }
1473
1474 static struct spdk_poller *
1475 poller_register(spdk_poller_fn fn,
1476 void *arg,
1477 uint64_t period_microseconds,
1478 const char *name)
1479 {
1480 struct spdk_thread *thread;
1481 struct spdk_poller *poller;
1482
1483 thread = spdk_get_thread();
1484 if (!thread) {
1485 assert(false);
1486 return NULL;
1487 }
1488
1489 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
1490 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
1491 return NULL;
1492 }
1493
1494 poller = calloc(1, sizeof(*poller));
1495 if (poller == NULL) {
1496 SPDK_ERRLOG("Poller memory allocation failed\n");
1497 return NULL;
1498 }
1499
1500 if (name) {
1501 snprintf(poller->name, sizeof(poller->name), "%s", name);
1502 } else {
1503 snprintf(poller->name, sizeof(poller->name), "%p", fn);
1504 }
1505
1506 poller->state = SPDK_POLLER_STATE_WAITING;
1507 poller->fn = fn;
1508 poller->arg = arg;
1509 poller->thread = thread;
1510 poller->interruptfd = -1;
1511 if (thread->next_poller_id == 0) {
1512 SPDK_WARNLOG("Poller ID rolled over. Poller IDs may be duplicated.\n");
1513 thread->next_poller_id = 1;
1514 }
1515 poller->id = thread->next_poller_id++;
1516
1517 poller->period_ticks = convert_us_to_ticks(period_microseconds);
1518
1519 if (spdk_interrupt_mode_is_enabled()) {
1520 int rc;
1521
1522 if (period_microseconds) {
1523 rc = period_poller_interrupt_init(poller);
1524 if (rc < 0) {
1525 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
1526 free(poller);
1527 return NULL;
1528 }
1529
1530 spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL);
1531 } else {
1532 /* If the poller doesn't have a period, create an interruptfd that's always
1533 * busy automatically when running in interrupt mode.
1534 */
1535 rc = busy_poller_interrupt_init(poller);
1536 if (rc < 0) {
1537 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
1538 free(poller);
1539 return NULL;
1540 }
1541
1542 spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL);
1543 }
1544 }
1545
1546 thread_insert_poller(thread, poller);
1547
1548 return poller;
1549 }
1550
1551 struct spdk_poller *
1552 spdk_poller_register(spdk_poller_fn fn,
1553 void *arg,
1554 uint64_t period_microseconds)
1555 {
1556 return poller_register(fn, arg, period_microseconds, NULL);
1557 }
1558
1559 struct spdk_poller *
1560 spdk_poller_register_named(spdk_poller_fn fn,
1561 void *arg,
1562 uint64_t period_microseconds,
1563 const char *name)
1564 {
1565 return poller_register(fn, arg, period_microseconds, name);
1566 }
1567
1568 static void
1569 wrong_thread(const char *func, const char *name, struct spdk_thread *thread,
1570 struct spdk_thread *curthread)
1571 {
1572 if (thread == NULL) {
1573 SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name);
1574 abort();
1575 }
1576 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be "
1577 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id,
1578 thread->name, thread->id);
1579 assert(false);
1580 }
1581
1582 void
1583 spdk_poller_unregister(struct spdk_poller **ppoller)
1584 {
1585 struct spdk_thread *thread;
1586 struct spdk_poller *poller;
1587
1588 poller = *ppoller;
1589 if (poller == NULL) {
1590 return;
1591 }
1592
1593 *ppoller = NULL;
1594
1595 thread = spdk_get_thread();
1596 if (!thread) {
1597 assert(false);
1598 return;
1599 }
1600
1601 if (poller->thread != thread) {
1602 wrong_thread(__func__, poller->name, poller->thread, thread);
1603 return;
1604 }
1605
1606 if (spdk_interrupt_mode_is_enabled()) {
1607 /* Release the interrupt resource for a periodic or busy poller */
1608 if (poller->interruptfd >= 0) {
1609 poller_interrupt_fini(poller);
1610 }
1611
1612 /* Mark that a poller was unregistered. Unregistered pollers will then
1613 * get reaped by spdk_thread_poll() even in interrupt mode.
1614 */
1615 thread->poller_unregistered = true;
1616 }
1617
1618 /* If the poller was paused, put it on the active_pollers list so that
1619 * its unregistration can be processed by spdk_thread_poll().
1620 */
1621 if (poller->state == SPDK_POLLER_STATE_PAUSED) {
1622 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
1623 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
1624 poller->period_ticks = 0;
1625 }
1626
1627 /* Simply set the state to unregistered. The poller will get cleaned up
1628 * in a subsequent call to spdk_thread_poll().
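 * This deferred cleanup also makes it safe to call spdk_poller_unregister()
 * from within the poller's own callback: thread_execute_poller() frees the
 * poller only after its fn has returned.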
1629 */ 1630 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1631 } 1632 1633 void 1634 spdk_poller_pause(struct spdk_poller *poller) 1635 { 1636 struct spdk_thread *thread; 1637 1638 thread = spdk_get_thread(); 1639 if (!thread) { 1640 assert(false); 1641 return; 1642 } 1643 1644 if (poller->thread != thread) { 1645 wrong_thread(__func__, poller->name, poller->thread, thread); 1646 return; 1647 } 1648 1649 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1650 * spdk_thread_poll() move it. It allows a poller to be paused from 1651 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1652 * iteration, or from within itself without breaking the logic to always 1653 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1654 */ 1655 switch (poller->state) { 1656 case SPDK_POLLER_STATE_PAUSED: 1657 case SPDK_POLLER_STATE_PAUSING: 1658 break; 1659 case SPDK_POLLER_STATE_RUNNING: 1660 case SPDK_POLLER_STATE_WAITING: 1661 poller->state = SPDK_POLLER_STATE_PAUSING; 1662 break; 1663 default: 1664 assert(false); 1665 break; 1666 } 1667 } 1668 1669 void 1670 spdk_poller_resume(struct spdk_poller *poller) 1671 { 1672 struct spdk_thread *thread; 1673 1674 thread = spdk_get_thread(); 1675 if (!thread) { 1676 assert(false); 1677 return; 1678 } 1679 1680 if (poller->thread != thread) { 1681 wrong_thread(__func__, poller->name, poller->thread, thread); 1682 return; 1683 } 1684 1685 /* If a poller is paused it has to be removed from the paused pollers 1686 * list and put on the active list or timer tree depending on its 1687 * period_ticks. If a poller is still in the process of being paused, 1688 * we just need to flip its state back to waiting, as it's already on 1689 * the appropriate list or tree. 1690 */ 1691 switch (poller->state) { 1692 case SPDK_POLLER_STATE_PAUSED: 1693 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1694 thread_insert_poller(thread, poller); 1695 /* fallthrough */ 1696 case SPDK_POLLER_STATE_PAUSING: 1697 poller->state = SPDK_POLLER_STATE_WAITING; 1698 break; 1699 case SPDK_POLLER_STATE_RUNNING: 1700 case SPDK_POLLER_STATE_WAITING: 1701 break; 1702 default: 1703 assert(false); 1704 break; 1705 } 1706 } 1707 1708 const char * 1709 spdk_poller_get_name(struct spdk_poller *poller) 1710 { 1711 return poller->name; 1712 } 1713 1714 uint64_t 1715 spdk_poller_get_id(struct spdk_poller *poller) 1716 { 1717 return poller->id; 1718 } 1719 1720 const char * 1721 spdk_poller_get_state_str(struct spdk_poller *poller) 1722 { 1723 switch (poller->state) { 1724 case SPDK_POLLER_STATE_WAITING: 1725 return "waiting"; 1726 case SPDK_POLLER_STATE_RUNNING: 1727 return "running"; 1728 case SPDK_POLLER_STATE_UNREGISTERED: 1729 return "unregistered"; 1730 case SPDK_POLLER_STATE_PAUSING: 1731 return "pausing"; 1732 case SPDK_POLLER_STATE_PAUSED: 1733 return "paused"; 1734 default: 1735 return NULL; 1736 } 1737 } 1738 1739 uint64_t 1740 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1741 { 1742 return poller->period_ticks; 1743 } 1744 1745 void 1746 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1747 { 1748 stats->run_count = poller->run_count; 1749 stats->busy_count = poller->busy_count; 1750 } 1751 1752 struct spdk_poller * 1753 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1754 { 1755 return TAILQ_FIRST(&thread->active_pollers); 1756 } 1757 1758 struct spdk_poller * 1759 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1760 { 1761 return TAILQ_NEXT(prev, tailq); 1762 } 1763 
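/*
 * Illustrative sketch (hypothetical caller, not part of this file): dumping
 * per-poller stats with the getters above. The poller lists may only be
 * touched from the owning thread, so this is assumed to run on it.
 *
 *	struct spdk_poller *p = spdk_thread_get_first_active_poller(thread);
 *
 *	while (p != NULL) {
 *		struct spdk_poller_stats st;
 *
 *		spdk_poller_get_stats(p, &st);
 *		printf("%s: run=%" PRIu64 " busy=%" PRIu64 "\n",
 *		       spdk_poller_get_name(p), st.run_count, st.busy_count);
 *		p = spdk_thread_get_next_active_poller(p);
 *	}
 */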
1764 struct spdk_poller * 1765 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1766 { 1767 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1768 } 1769 1770 struct spdk_poller * 1771 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1772 { 1773 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1774 } 1775 1776 struct spdk_poller * 1777 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1778 { 1779 return TAILQ_FIRST(&thread->paused_pollers); 1780 } 1781 1782 struct spdk_poller * 1783 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1784 { 1785 return TAILQ_NEXT(prev, tailq); 1786 } 1787 1788 struct spdk_io_channel * 1789 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1790 { 1791 return RB_MIN(io_channel_tree, &thread->io_channels); 1792 } 1793 1794 struct spdk_io_channel * 1795 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1796 { 1797 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1798 } 1799 1800 struct call_thread { 1801 struct spdk_thread *cur_thread; 1802 spdk_msg_fn fn; 1803 void *ctx; 1804 1805 struct spdk_thread *orig_thread; 1806 spdk_msg_fn cpl; 1807 }; 1808 1809 static void 1810 _on_thread(void *ctx) 1811 { 1812 struct call_thread *ct = ctx; 1813 int rc __attribute__((unused)); 1814 1815 ct->fn(ct->ctx); 1816 1817 pthread_mutex_lock(&g_devlist_mutex); 1818 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1819 while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) { 1820 SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n", 1821 ct->cur_thread->name); 1822 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1823 } 1824 pthread_mutex_unlock(&g_devlist_mutex); 1825 1826 if (!ct->cur_thread) { 1827 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1828 1829 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1830 free(ctx); 1831 } else { 1832 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1833 ct->cur_thread->name); 1834 1835 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1836 } 1837 assert(rc == 0); 1838 } 1839 1840 void 1841 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1842 { 1843 struct call_thread *ct; 1844 struct spdk_thread *thread; 1845 int rc __attribute__((unused)); 1846 1847 ct = calloc(1, sizeof(*ct)); 1848 if (!ct) { 1849 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1850 cpl(ctx); 1851 return; 1852 } 1853 1854 ct->fn = fn; 1855 ct->ctx = ctx; 1856 ct->cpl = cpl; 1857 1858 thread = _get_thread(); 1859 if (!thread) { 1860 SPDK_ERRLOG("No thread allocated\n"); 1861 free(ct); 1862 cpl(ctx); 1863 return; 1864 } 1865 ct->orig_thread = thread; 1866 1867 pthread_mutex_lock(&g_devlist_mutex); 1868 ct->cur_thread = TAILQ_FIRST(&g_threads); 1869 pthread_mutex_unlock(&g_devlist_mutex); 1870 1871 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 1872 ct->orig_thread->name); 1873 1874 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 1875 assert(rc == 0); 1876 } 1877 1878 static inline void 1879 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 1880 { 1881 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1882 return; 1883 } 1884 1885 if (!poller->set_intr_cb_fn) { 1886 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 1887 assert(false); 1888 return; 1889 } 1890 1891 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 1892 } 1893 1894 void 1895 
spdk_thread_set_interrupt_mode(bool enable_interrupt) 1896 { 1897 struct spdk_thread *thread = _get_thread(); 1898 struct spdk_poller *poller, *tmp; 1899 1900 assert(thread); 1901 assert(spdk_interrupt_mode_is_enabled()); 1902 1903 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 1904 thread->name, enable_interrupt ? "intr" : "poll", 1905 thread->in_interrupt ? "intr" : "poll"); 1906 1907 if (thread->in_interrupt == enable_interrupt) { 1908 return; 1909 } 1910 1911 /* Set pollers to expected mode */ 1912 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1913 poller_set_interrupt_mode(poller, enable_interrupt); 1914 } 1915 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 1916 poller_set_interrupt_mode(poller, enable_interrupt); 1917 } 1918 /* All paused pollers will go to work in interrupt mode */ 1919 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 1920 poller_set_interrupt_mode(poller, enable_interrupt); 1921 } 1922 1923 thread->in_interrupt = enable_interrupt; 1924 return; 1925 } 1926 1927 static struct io_device * 1928 io_device_get(void *io_device) 1929 { 1930 struct io_device find = {}; 1931 1932 find.io_device = io_device; 1933 return RB_FIND(io_device_tree, &g_io_devices, &find); 1934 } 1935 1936 void 1937 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1938 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1939 const char *name) 1940 { 1941 struct io_device *dev, *tmp; 1942 struct spdk_thread *thread; 1943 1944 assert(io_device != NULL); 1945 assert(create_cb != NULL); 1946 assert(destroy_cb != NULL); 1947 1948 thread = spdk_get_thread(); 1949 if (!thread) { 1950 SPDK_ERRLOG("called from non-SPDK thread\n"); 1951 assert(false); 1952 return; 1953 } 1954 1955 dev = calloc(1, sizeof(struct io_device)); 1956 if (dev == NULL) { 1957 SPDK_ERRLOG("could not allocate io_device\n"); 1958 return; 1959 } 1960 1961 dev->io_device = io_device; 1962 if (name) { 1963 snprintf(dev->name, sizeof(dev->name), "%s", name); 1964 } else { 1965 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1966 } 1967 dev->create_cb = create_cb; 1968 dev->destroy_cb = destroy_cb; 1969 dev->unregister_cb = NULL; 1970 dev->ctx_size = ctx_size; 1971 dev->for_each_count = 0; 1972 dev->unregistered = false; 1973 dev->refcnt = 0; 1974 1975 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 1976 dev->name, dev->io_device, thread->name); 1977 1978 pthread_mutex_lock(&g_devlist_mutex); 1979 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 1980 if (tmp != NULL) { 1981 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1982 io_device, tmp->name, dev->name); 1983 free(dev); 1984 } 1985 1986 pthread_mutex_unlock(&g_devlist_mutex); 1987 } 1988 1989 static void 1990 _finish_unregister(void *arg) 1991 { 1992 struct io_device *dev = arg; 1993 struct spdk_thread *thread; 1994 1995 thread = spdk_get_thread(); 1996 assert(thread == dev->unregister_thread); 1997 1998 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 1999 dev->name, dev->io_device, thread->name); 2000 2001 assert(thread->pending_unregister_count > 0); 2002 thread->pending_unregister_count--; 2003 2004 dev->unregister_cb(dev->io_device); 2005 free(dev); 2006 } 2007 2008 static void 2009 io_device_free(struct io_device *dev) 2010 { 2011 int rc __attribute__((unused)); 2012 2013 if (dev->unregister_cb == NULL) { 2014 free(dev); 2015 } else { 2016 assert(dev->unregister_thread != NULL); 
2017 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2018 dev->name, dev->io_device, dev->unregister_thread->name); 2019 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2020 assert(rc == 0); 2021 } 2022 } 2023 2024 void 2025 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2026 { 2027 struct io_device *dev; 2028 uint32_t refcnt; 2029 struct spdk_thread *thread; 2030 2031 thread = spdk_get_thread(); 2032 if (!thread) { 2033 SPDK_ERRLOG("called from non-SPDK thread\n"); 2034 assert(false); 2035 return; 2036 } 2037 2038 pthread_mutex_lock(&g_devlist_mutex); 2039 dev = io_device_get(io_device); 2040 if (!dev) { 2041 SPDK_ERRLOG("io_device %p not found\n", io_device); 2042 assert(false); 2043 pthread_mutex_unlock(&g_devlist_mutex); 2044 return; 2045 } 2046 2047 if (dev->for_each_count > 0) { 2048 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2049 dev->name, io_device, dev->for_each_count); 2050 pthread_mutex_unlock(&g_devlist_mutex); 2051 return; 2052 } 2053 2054 dev->unregister_cb = unregister_cb; 2055 dev->unregistered = true; 2056 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2057 refcnt = dev->refcnt; 2058 dev->unregister_thread = thread; 2059 pthread_mutex_unlock(&g_devlist_mutex); 2060 2061 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2062 dev->name, dev->io_device, thread->name); 2063 2064 if (unregister_cb) { 2065 thread->pending_unregister_count++; 2066 } 2067 2068 if (refcnt > 0) { 2069 /* defer deletion */ 2070 return; 2071 } 2072 2073 io_device_free(dev); 2074 } 2075 2076 const char * 2077 spdk_io_device_get_name(struct io_device *dev) 2078 { 2079 return dev->name; 2080 } 2081 2082 static struct spdk_io_channel * 2083 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2084 { 2085 struct spdk_io_channel find = {}; 2086 2087 find.dev = dev; 2088 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2089 } 2090 2091 struct spdk_io_channel * 2092 spdk_get_io_channel(void *io_device) 2093 { 2094 struct spdk_io_channel *ch; 2095 struct spdk_thread *thread; 2096 struct io_device *dev; 2097 int rc; 2098 2099 pthread_mutex_lock(&g_devlist_mutex); 2100 dev = io_device_get(io_device); 2101 if (dev == NULL) { 2102 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2103 pthread_mutex_unlock(&g_devlist_mutex); 2104 return NULL; 2105 } 2106 2107 thread = _get_thread(); 2108 if (!thread) { 2109 SPDK_ERRLOG("No thread allocated\n"); 2110 pthread_mutex_unlock(&g_devlist_mutex); 2111 return NULL; 2112 } 2113 2114 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2115 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2116 pthread_mutex_unlock(&g_devlist_mutex); 2117 return NULL; 2118 } 2119 2120 ch = thread_get_io_channel(thread, dev); 2121 if (ch != NULL) { 2122 ch->ref++; 2123 2124 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2125 ch, dev->name, dev->io_device, thread->name, ch->ref); 2126 2127 /* 2128 * An I/O channel already exists for this device on this 2129 * thread, so return it. 
		 */
		pthread_mutex_unlock(&g_devlist_mutex);
		spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0,
				  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
		return ch;
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	RB_INSERT(io_channel_tree, &thread->io_channels, ch);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1);
	return ch;
}

static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to this channel was taken (or another
		 * deferred destroy was queued) after this message was sent
		 * but before it had a chance to execute.  Keep the channel.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called.
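	 * The callback may block or re-enter this library (e.g. take or put
	 * other channels), and g_devlist_mutex is not recursive.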
	 */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0,
			  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		wrong_thread(__func__, "ch", ch->thread, thread);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

void *
spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
{
	return ch->dev->io_device;
}

const char *
spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch)
{
	return spdk_io_device_get_name(ch->dev);
}

int
spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
{
	return ch->ref;
}

struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute.  If so, skip calling
	 * the fn() on this thread.
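	 * The lookup below is performed under g_devlist_mutex so that it is
	 * consistent with concurrent channel creation and removal.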
2346 */ 2347 pthread_mutex_lock(&g_devlist_mutex); 2348 ch = thread_get_io_channel(i->cur_thread, i->dev); 2349 pthread_mutex_unlock(&g_devlist_mutex); 2350 2351 if (ch) { 2352 i->fn(i); 2353 } else { 2354 spdk_for_each_channel_continue(i, 0); 2355 } 2356 } 2357 2358 void 2359 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2360 spdk_channel_for_each_cpl cpl) 2361 { 2362 struct spdk_thread *thread; 2363 struct spdk_io_channel *ch; 2364 struct spdk_io_channel_iter *i; 2365 int rc __attribute__((unused)); 2366 2367 i = calloc(1, sizeof(*i)); 2368 if (!i) { 2369 SPDK_ERRLOG("Unable to allocate iterator\n"); 2370 return; 2371 } 2372 2373 i->io_device = io_device; 2374 i->fn = fn; 2375 i->ctx = ctx; 2376 i->cpl = cpl; 2377 i->orig_thread = _get_thread(); 2378 2379 pthread_mutex_lock(&g_devlist_mutex); 2380 i->dev = io_device_get(io_device); 2381 if (i->dev == NULL) { 2382 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2383 assert(false); 2384 goto end; 2385 } 2386 2387 TAILQ_FOREACH(thread, &g_threads, tailq) { 2388 ch = thread_get_io_channel(thread, i->dev); 2389 if (ch != NULL) { 2390 ch->dev->for_each_count++; 2391 i->cur_thread = thread; 2392 i->ch = ch; 2393 pthread_mutex_unlock(&g_devlist_mutex); 2394 rc = spdk_thread_send_msg(thread, _call_channel, i); 2395 assert(rc == 0); 2396 return; 2397 } 2398 } 2399 2400 end: 2401 pthread_mutex_unlock(&g_devlist_mutex); 2402 2403 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2404 assert(rc == 0); 2405 } 2406 2407 void 2408 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2409 { 2410 struct spdk_thread *thread; 2411 struct spdk_io_channel *ch; 2412 int rc __attribute__((unused)); 2413 2414 assert(i->cur_thread == spdk_get_thread()); 2415 2416 i->status = status; 2417 2418 pthread_mutex_lock(&g_devlist_mutex); 2419 if (status) { 2420 goto end; 2421 } 2422 thread = TAILQ_NEXT(i->cur_thread, tailq); 2423 while (thread) { 2424 ch = thread_get_io_channel(thread, i->dev); 2425 if (ch != NULL) { 2426 i->cur_thread = thread; 2427 i->ch = ch; 2428 pthread_mutex_unlock(&g_devlist_mutex); 2429 rc = spdk_thread_send_msg(thread, _call_channel, i); 2430 assert(rc == 0); 2431 return; 2432 } 2433 thread = TAILQ_NEXT(thread, tailq); 2434 } 2435 2436 end: 2437 i->dev->for_each_count--; 2438 i->ch = NULL; 2439 pthread_mutex_unlock(&g_devlist_mutex); 2440 2441 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2442 assert(rc == 0); 2443 } 2444 2445 struct spdk_interrupt { 2446 int efd; 2447 struct spdk_thread *thread; 2448 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 2449 }; 2450 2451 static void 2452 thread_interrupt_destroy(struct spdk_thread *thread) 2453 { 2454 struct spdk_fd_group *fgrp = thread->fgrp; 2455 2456 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2457 2458 if (thread->msg_fd < 0) { 2459 return; 2460 } 2461 2462 spdk_fd_group_remove(fgrp, thread->msg_fd); 2463 close(thread->msg_fd); 2464 thread->msg_fd = -1; 2465 2466 spdk_fd_group_destroy(fgrp); 2467 thread->fgrp = NULL; 2468 } 2469 2470 #ifdef __linux__ 2471 static int 2472 thread_interrupt_msg_process(void *arg) 2473 { 2474 struct spdk_thread *thread = arg; 2475 uint32_t msg_count; 2476 spdk_msg_fn critical_msg; 2477 int rc = 0; 2478 uint64_t notify = 1; 2479 2480 assert(spdk_interrupt_mode_is_enabled()); 2481 2482 /* There may be race between msg_acknowledge and another producer's msg_notify, 2483 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 
	 * Acknowledging first keeps the eventfd readable for any notification
	 * that arrives after the drain, so no message notification is missed.
	 */
	rc = read(thread->msg_fd, &notify, sizeof(notify));
	if (rc < 0 && errno != EAGAIN) {
		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
	}

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	return rc;
}

static int
thread_interrupt_create(struct spdk_thread *thread)
{
	int rc;

	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);

	rc = spdk_fd_group_create(&thread->fgrp);
	if (rc) {
		return rc;
	}

	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (thread->msg_fd < 0) {
		rc = -errno;
		spdk_fd_group_destroy(thread->fgrp);
		thread->fgrp = NULL;

		return rc;
	}

	return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd,
				 thread_interrupt_msg_process, thread);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
#endif

struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
			void *arg, const char *name)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;
	int ret;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name);
	if (ret != 0) {
		SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n",
			    thread->name, efd, spdk_strerror(-ret));
		return NULL;
	}

	intr = calloc(1, sizeof(*intr));
	if (intr == NULL) {
		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		/* Undo the fd_group registration so the fd is not leaked. */
		spdk_fd_group_remove(thread->fgrp, efd);
		return NULL;
	}

	if (name) {
		snprintf(intr->name, sizeof(intr->name), "%s", name);
	} else {
		snprintf(intr->name, sizeof(intr->name), "%p", fn);
	}

	intr->efd = efd;
	intr->thread = thread;

	return intr;
}

void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	*pintr = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}

int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}
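
/*
 * Usage sketch (illustrative only, not part of this file): a caller on the
 * current SPDK thread typically wires an eventfd into the thread's fd group
 * via spdk_interrupt_register() and tears it down with
 * spdk_interrupt_unregister().  The fd, callback, context, and name below
 * are caller-defined placeholders.
 *
 *	int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 *	struct spdk_interrupt *intr;
 *
 *	intr = spdk_interrupt_register(efd, my_fd_fn, my_ctx, "my_intr");
 *	...
 *	spdk_interrupt_unregister(&intr);
 *	close(efd);
 */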
static bool g_interrupt_mode = false;

int
spdk_interrupt_mode_enable(void)
{
	/* Interrupt mode must be selected before the threading library is
	 * initialized.  g_spdk_msg_mempool is non-NULL once the library has
	 * been initialized, so use it to detect that case.
	 */
	if (g_spdk_msg_mempool) {
		SPDK_ERRLOG("Failed: the threading library is already initialized.\n");
		return -1;
	}

#ifdef __linux__
	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
	g_interrupt_mode = true;
	return 0;
#else
	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
	g_interrupt_mode = false;
	return -ENOTSUP;
#endif
}

bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}

SPDK_LOG_REGISTER_COMPONENT(thread)