1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (c) Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 8 #include "spdk/env.h" 9 #include "spdk/likely.h" 10 #include "spdk/queue.h" 11 #include "spdk/string.h" 12 #include "spdk/thread.h" 13 #include "spdk/trace.h" 14 #include "spdk/util.h" 15 #include "spdk/fd_group.h" 16 17 #include "spdk/log.h" 18 #include "spdk_internal/thread.h" 19 #include "spdk_internal/usdt.h" 20 #include "thread_internal.h" 21 22 #include "spdk_internal/trace_defs.h" 23 24 #ifdef __linux__ 25 #include <sys/timerfd.h> 26 #include <sys/eventfd.h> 27 #endif 28 29 #define SPDK_MSG_BATCH_SIZE 8 30 #define SPDK_MAX_DEVICE_NAME_LEN 256 31 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 32 #define SPDK_MAX_POLLER_NAME_LEN 256 33 #define SPDK_MAX_THREAD_NAME_LEN 256 34 35 enum spdk_poller_state { 36 /* The poller is registered with a thread but not currently executing its fn. */ 37 SPDK_POLLER_STATE_WAITING, 38 39 /* The poller is currently running its fn. */ 40 SPDK_POLLER_STATE_RUNNING, 41 42 /* The poller was unregistered during the execution of its fn. */ 43 SPDK_POLLER_STATE_UNREGISTERED, 44 45 /* The poller is in the process of being paused. It will be paused 46 * during the next time it's supposed to be executed. 47 */ 48 SPDK_POLLER_STATE_PAUSING, 49 50 /* The poller is registered but currently paused. It's on the 51 * paused_pollers list. 52 */ 53 SPDK_POLLER_STATE_PAUSED, 54 }; 55 56 struct spdk_poller { 57 TAILQ_ENTRY(spdk_poller) tailq; 58 RB_ENTRY(spdk_poller) node; 59 60 /* Current state of the poller; should only be accessed from the poller's thread. */ 61 enum spdk_poller_state state; 62 63 uint64_t period_ticks; 64 uint64_t next_run_tick; 65 uint64_t run_count; 66 uint64_t busy_count; 67 uint64_t id; 68 spdk_poller_fn fn; 69 void *arg; 70 struct spdk_thread *thread; 71 /* Native interruptfd for period or busy poller */ 72 int interruptfd; 73 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 74 void *set_intr_cb_arg; 75 76 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 77 }; 78 79 enum spdk_thread_state { 80 /* The thread is processing poller and message by spdk_thread_poll(). */ 81 SPDK_THREAD_STATE_RUNNING, 82 83 /* The thread is in the process of termination. It reaps unregistering 84 * poller are releasing I/O channel. 85 */ 86 SPDK_THREAD_STATE_EXITING, 87 88 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 89 SPDK_THREAD_STATE_EXITED, 90 }; 91 92 struct spdk_thread { 93 uint64_t tsc_last; 94 struct spdk_thread_stats stats; 95 /* 96 * Contains pollers actively running on this thread. Pollers 97 * are run round-robin. The thread takes one poller from the head 98 * of the ring, executes it, then puts it back at the tail of 99 * the ring. 100 */ 101 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 102 /** 103 * Contains pollers running on this thread with a periodic timer. 104 */ 105 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 106 struct spdk_poller *first_timed_poller; 107 /* 108 * Contains paused pollers. Pollers on this queue are waiting until 109 * they are resumed (in which case they're put onto the active/timer 110 * queues) or unregistered. 111 */ 112 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 113 struct spdk_ring *messages; 114 int msg_fd; 115 SLIST_HEAD(, spdk_msg) msg_cache; 116 size_t msg_cache_count; 117 spdk_msg_fn critical_msg; 118 uint64_t id; 119 uint64_t next_poller_id; 120 enum spdk_thread_state state; 121 int pending_unregister_count; 122 123 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 124 TAILQ_ENTRY(spdk_thread) tailq; 125 126 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 127 struct spdk_cpuset cpumask; 128 uint64_t exit_timeout_tsc; 129 130 /* Indicates whether this spdk_thread currently runs in interrupt. */ 131 bool in_interrupt; 132 bool poller_unregistered; 133 struct spdk_fd_group *fgrp; 134 135 /* User context allocated at the end */ 136 uint8_t ctx[0]; 137 }; 138 139 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 140 141 static spdk_new_thread_fn g_new_thread_fn = NULL; 142 static spdk_thread_op_fn g_thread_op_fn = NULL; 143 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 144 static size_t g_ctx_sz = 0; 145 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 146 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 147 * SPDK application is required. 148 */ 149 static uint64_t g_thread_id = 1; 150 151 struct io_device { 152 void *io_device; 153 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 154 spdk_io_channel_create_cb create_cb; 155 spdk_io_channel_destroy_cb destroy_cb; 156 spdk_io_device_unregister_cb unregister_cb; 157 struct spdk_thread *unregister_thread; 158 uint32_t ctx_size; 159 uint32_t for_each_count; 160 RB_ENTRY(io_device) node; 161 162 uint32_t refcnt; 163 164 bool unregistered; 165 }; 166 167 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 168 169 static int 170 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 171 { 172 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 173 } 174 175 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 176 177 static int 178 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 179 { 180 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 181 } 182 183 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 184 185 struct spdk_msg { 186 spdk_msg_fn fn; 187 void *arg; 188 189 SLIST_ENTRY(spdk_msg) link; 190 }; 191 192 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 193 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 194 195 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 196 static uint32_t g_thread_count = 0; 197 198 static __thread struct spdk_thread *tls_thread = NULL; 199 200 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 201 { 202 spdk_trace_register_description("THREAD_IOCH_GET", 203 TRACE_THREAD_IOCH_GET, 204 OWNER_NONE, OBJECT_NONE, 0, 205 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 206 spdk_trace_register_description("THREAD_IOCH_PUT", 207 TRACE_THREAD_IOCH_PUT, 208 OWNER_NONE, OBJECT_NONE, 0, 209 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 210 } 211 212 /* 213 * If this compare function returns zero when two next_run_ticks are equal, 214 * the macro RB_INSERT() returns a pointer to the element with the same 215 * next_run_tick. 216 * 217 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 218 * to remove as a parameter. 219 * 220 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 221 * side by returning 1 when two next_run_ticks are equal. 222 */ 223 static inline int 224 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 225 { 226 if (poller1->next_run_tick < poller2->next_run_tick) { 227 return -1; 228 } else { 229 return 1; 230 } 231 } 232 233 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 234 235 static inline struct spdk_thread * 236 _get_thread(void) 237 { 238 return tls_thread; 239 } 240 241 static int 242 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 243 { 244 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 245 246 g_ctx_sz = ctx_sz; 247 248 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 249 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 250 sizeof(struct spdk_msg), 251 0, /* No cache. We do our own. */ 252 SPDK_ENV_SOCKET_ID_ANY); 253 254 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 255 msg_mempool_sz); 256 257 if (!g_spdk_msg_mempool) { 258 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 259 return -1; 260 } 261 262 return 0; 263 } 264 265 int 266 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 267 { 268 assert(g_new_thread_fn == NULL); 269 assert(g_thread_op_fn == NULL); 270 271 if (new_thread_fn == NULL) { 272 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 273 } else { 274 g_new_thread_fn = new_thread_fn; 275 } 276 277 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 278 } 279 280 int 281 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 282 spdk_thread_op_supported_fn thread_op_supported_fn, 283 size_t ctx_sz, size_t msg_mempool_sz) 284 { 285 assert(g_new_thread_fn == NULL); 286 assert(g_thread_op_fn == NULL); 287 assert(g_thread_op_supported_fn == NULL); 288 289 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 290 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 291 return -EINVAL; 292 } 293 294 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 295 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 296 } else { 297 g_thread_op_fn = thread_op_fn; 298 g_thread_op_supported_fn = thread_op_supported_fn; 299 } 300 301 return _thread_lib_init(ctx_sz, msg_mempool_sz); 302 } 303 304 void 305 spdk_thread_lib_fini(void) 306 { 307 struct io_device *dev; 308 309 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 310 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 311 } 312 313 if (g_spdk_msg_mempool) { 314 spdk_mempool_free(g_spdk_msg_mempool); 315 g_spdk_msg_mempool = NULL; 316 } 317 318 g_new_thread_fn = NULL; 319 g_thread_op_fn = NULL; 320 g_thread_op_supported_fn = NULL; 321 g_ctx_sz = 0; 322 } 323 324 static void thread_interrupt_destroy(struct spdk_thread *thread); 325 static int thread_interrupt_create(struct spdk_thread *thread); 326 327 static void 328 _free_thread(struct spdk_thread *thread) 329 { 330 struct spdk_io_channel *ch; 331 struct spdk_msg *msg; 332 struct spdk_poller *poller, *ptmp; 333 334 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 335 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 336 thread->name, ch->dev->name); 337 } 338 339 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 340 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 341 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 342 poller->name); 343 } 344 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 345 free(poller); 346 } 347 348 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 349 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 350 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 351 poller->name); 352 } 353 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 354 free(poller); 355 } 356 357 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 358 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 359 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 360 free(poller); 361 } 362 363 pthread_mutex_lock(&g_devlist_mutex); 364 assert(g_thread_count > 0); 365 g_thread_count--; 366 TAILQ_REMOVE(&g_threads, thread, tailq); 367 pthread_mutex_unlock(&g_devlist_mutex); 368 369 msg = SLIST_FIRST(&thread->msg_cache); 370 while (msg != NULL) { 371 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 372 373 assert(thread->msg_cache_count > 0); 374 thread->msg_cache_count--; 375 spdk_mempool_put(g_spdk_msg_mempool, msg); 376 377 msg = SLIST_FIRST(&thread->msg_cache); 378 } 379 380 assert(thread->msg_cache_count == 0); 381 382 if (spdk_interrupt_mode_is_enabled()) { 383 thread_interrupt_destroy(thread); 384 } 385 386 spdk_ring_free(thread->messages); 387 free(thread); 388 } 389 390 struct spdk_thread * 391 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 392 { 393 struct spdk_thread *thread; 394 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 395 int rc = 0, i; 396 397 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 398 if (!thread) { 399 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 400 return NULL; 401 } 402 403 if (cpumask) { 404 spdk_cpuset_copy(&thread->cpumask, cpumask); 405 } else { 406 spdk_cpuset_negate(&thread->cpumask); 407 } 408 409 RB_INIT(&thread->io_channels); 410 TAILQ_INIT(&thread->active_pollers); 411 RB_INIT(&thread->timed_pollers); 412 TAILQ_INIT(&thread->paused_pollers); 413 SLIST_INIT(&thread->msg_cache); 414 thread->msg_cache_count = 0; 415 416 thread->tsc_last = spdk_get_ticks(); 417 418 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 419 * ID exceeds UINT64_MAX a warning message is logged 420 */ 421 thread->next_poller_id = 1; 422 423 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 424 if (!thread->messages) { 425 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 426 free(thread); 427 return NULL; 428 } 429 430 /* Fill the local message pool cache. */ 431 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 432 if (rc == 0) { 433 /* If we can't populate the cache it's ok. The cache will get filled 434 * up organically as messages are passed to the thread. */ 435 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 436 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 437 thread->msg_cache_count++; 438 } 439 } 440 441 if (name) { 442 snprintf(thread->name, sizeof(thread->name), "%s", name); 443 } else { 444 snprintf(thread->name, sizeof(thread->name), "%p", thread); 445 } 446 447 pthread_mutex_lock(&g_devlist_mutex); 448 if (g_thread_id == 0) { 449 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 450 pthread_mutex_unlock(&g_devlist_mutex); 451 _free_thread(thread); 452 return NULL; 453 } 454 thread->id = g_thread_id++; 455 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 456 g_thread_count++; 457 pthread_mutex_unlock(&g_devlist_mutex); 458 459 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 460 thread->id, thread->name); 461 462 if (spdk_interrupt_mode_is_enabled()) { 463 thread->in_interrupt = true; 464 rc = thread_interrupt_create(thread); 465 if (rc != 0) { 466 _free_thread(thread); 467 return NULL; 468 } 469 } 470 471 if (g_new_thread_fn) { 472 rc = g_new_thread_fn(thread); 473 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 474 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 475 } 476 477 if (rc != 0) { 478 _free_thread(thread); 479 return NULL; 480 } 481 482 thread->state = SPDK_THREAD_STATE_RUNNING; 483 484 return thread; 485 } 486 487 void 488 spdk_set_thread(struct spdk_thread *thread) 489 { 490 tls_thread = thread; 491 } 492 493 static void 494 thread_exit(struct spdk_thread *thread, uint64_t now) 495 { 496 struct spdk_poller *poller; 497 struct spdk_io_channel *ch; 498 499 if (now >= thread->exit_timeout_tsc) { 500 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 501 thread->name); 502 goto exited; 503 } 504 505 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 506 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 507 SPDK_INFOLOG(thread, 508 "thread %s still has active poller %s\n", 509 thread->name, poller->name); 510 return; 511 } 512 } 513 514 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 515 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 516 SPDK_INFOLOG(thread, 517 "thread %s still has active timed poller %s\n", 518 thread->name, poller->name); 519 return; 520 } 521 } 522 523 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 524 SPDK_INFOLOG(thread, 525 "thread %s still has paused poller %s\n", 526 thread->name, poller->name); 527 return; 528 } 529 530 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 531 SPDK_INFOLOG(thread, 532 "thread %s still has channel for io_device %s\n", 533 thread->name, ch->dev->name); 534 return; 535 } 536 537 if (thread->pending_unregister_count > 0) { 538 SPDK_INFOLOG(thread, 539 "thread %s is still unregistering io_devices\n", 540 thread->name); 541 return; 542 } 543 544 exited: 545 thread->state = SPDK_THREAD_STATE_EXITED; 546 } 547 548 int 549 spdk_thread_exit(struct spdk_thread *thread) 550 { 551 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 552 553 assert(tls_thread == thread); 554 555 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 556 SPDK_INFOLOG(thread, 557 "thread %s is already exiting\n", 558 thread->name); 559 return 0; 560 } 561 562 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 563 SPDK_THREAD_EXIT_TIMEOUT_SEC); 564 thread->state = SPDK_THREAD_STATE_EXITING; 565 return 0; 566 } 567 568 bool 569 spdk_thread_is_exited(struct spdk_thread *thread) 570 { 571 return thread->state == SPDK_THREAD_STATE_EXITED; 572 } 573 574 void 575 spdk_thread_destroy(struct spdk_thread *thread) 576 { 577 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 578 579 assert(thread->state == SPDK_THREAD_STATE_EXITED); 580 581 if (tls_thread == thread) { 582 tls_thread = NULL; 583 } 584 585 _free_thread(thread); 586 } 587 588 void * 589 spdk_thread_get_ctx(struct spdk_thread *thread) 590 { 591 if (g_ctx_sz > 0) { 592 return thread->ctx; 593 } 594 595 return NULL; 596 } 597 598 struct spdk_cpuset * 599 spdk_thread_get_cpumask(struct spdk_thread *thread) 600 { 601 return &thread->cpumask; 602 } 603 604 int 605 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 606 { 607 struct spdk_thread *thread; 608 609 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 610 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 611 assert(false); 612 return -ENOTSUP; 613 } 614 615 thread = spdk_get_thread(); 616 if (!thread) { 617 SPDK_ERRLOG("Called from non-SPDK thread\n"); 618 assert(false); 619 return -EINVAL; 620 } 621 622 spdk_cpuset_copy(&thread->cpumask, cpumask); 623 624 /* Invoke framework's reschedule operation. If this function is called multiple times 625 * in a single spdk_thread_poll() context, the last cpumask will be used in the 626 * reschedule operation. 627 */ 628 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 629 630 return 0; 631 } 632 633 struct spdk_thread * 634 spdk_thread_get_from_ctx(void *ctx) 635 { 636 if (ctx == NULL) { 637 assert(false); 638 return NULL; 639 } 640 641 assert(g_ctx_sz > 0); 642 643 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 644 } 645 646 static inline uint32_t 647 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 648 { 649 unsigned count, i; 650 void *messages[SPDK_MSG_BATCH_SIZE]; 651 uint64_t notify = 1; 652 int rc; 653 654 #ifdef DEBUG 655 /* 656 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 657 * so we will never actually read uninitialized data from events, but just to be sure 658 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 659 */ 660 memset(messages, 0, sizeof(messages)); 661 #endif 662 663 if (max_msgs > 0) { 664 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 665 } else { 666 max_msgs = SPDK_MSG_BATCH_SIZE; 667 } 668 669 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 670 if (spdk_unlikely(thread->in_interrupt) && 671 spdk_ring_count(thread->messages) != 0) { 672 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 673 if (rc < 0) { 674 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 675 } 676 } 677 if (count == 0) { 678 return 0; 679 } 680 681 for (i = 0; i < count; i++) { 682 struct spdk_msg *msg = messages[i]; 683 684 assert(msg != NULL); 685 686 SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg); 687 688 msg->fn(msg->arg); 689 690 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 691 /* Insert the messages at the head. We want to re-use the hot 692 * ones. */ 693 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 694 thread->msg_cache_count++; 695 } else { 696 spdk_mempool_put(g_spdk_msg_mempool, msg); 697 } 698 } 699 700 return count; 701 } 702 703 static void 704 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 705 { 706 struct spdk_poller *tmp __attribute__((unused)); 707 708 poller->next_run_tick = now + poller->period_ticks; 709 710 /* 711 * Insert poller in the thread's timed_pollers tree by next scheduled run time 712 * as its key. 713 */ 714 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 715 assert(tmp == NULL); 716 717 /* Update the cache only if it is empty or the inserted poller is earlier than it. 718 * RB_MIN() is not necessary here because all pollers, which has exactly the same 719 * next_run_tick as the existing poller, are inserted on the right side. 720 */ 721 if (thread->first_timed_poller == NULL || 722 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 723 thread->first_timed_poller = poller; 724 } 725 } 726 727 static inline void 728 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 729 { 730 struct spdk_poller *tmp __attribute__((unused)); 731 732 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 733 assert(tmp != NULL); 734 735 /* This function is not used in any case that is performance critical. 736 * Update the cache simply by RB_MIN() if it needs to be changed. 737 */ 738 if (thread->first_timed_poller == poller) { 739 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 740 } 741 } 742 743 static void 744 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 745 { 746 if (poller->period_ticks) { 747 poller_insert_timer(thread, poller, spdk_get_ticks()); 748 } else { 749 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 750 } 751 } 752 753 static inline void 754 thread_update_stats(struct spdk_thread *thread, uint64_t end, 755 uint64_t start, int rc) 756 { 757 if (rc == 0) { 758 /* Poller status idle */ 759 thread->stats.idle_tsc += end - start; 760 } else if (rc > 0) { 761 /* Poller status busy */ 762 thread->stats.busy_tsc += end - start; 763 } 764 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 765 thread->tsc_last = end; 766 } 767 768 static inline int 769 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 770 { 771 int rc; 772 773 switch (poller->state) { 774 case SPDK_POLLER_STATE_UNREGISTERED: 775 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 776 free(poller); 777 return 0; 778 case SPDK_POLLER_STATE_PAUSING: 779 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 780 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 781 poller->state = SPDK_POLLER_STATE_PAUSED; 782 return 0; 783 case SPDK_POLLER_STATE_WAITING: 784 break; 785 default: 786 assert(false); 787 break; 788 } 789 790 poller->state = SPDK_POLLER_STATE_RUNNING; 791 rc = poller->fn(poller->arg); 792 793 poller->run_count++; 794 if (rc > 0) { 795 poller->busy_count++; 796 } 797 798 #ifdef DEBUG 799 if (rc == -1) { 800 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 801 } 802 #endif 803 804 switch (poller->state) { 805 case SPDK_POLLER_STATE_UNREGISTERED: 806 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 807 free(poller); 808 break; 809 case SPDK_POLLER_STATE_PAUSING: 810 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 811 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 812 poller->state = SPDK_POLLER_STATE_PAUSED; 813 break; 814 case SPDK_POLLER_STATE_PAUSED: 815 case SPDK_POLLER_STATE_WAITING: 816 break; 817 case SPDK_POLLER_STATE_RUNNING: 818 poller->state = SPDK_POLLER_STATE_WAITING; 819 break; 820 default: 821 assert(false); 822 break; 823 } 824 825 return rc; 826 } 827 828 static inline int 829 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 830 uint64_t now) 831 { 832 int rc; 833 834 switch (poller->state) { 835 case SPDK_POLLER_STATE_UNREGISTERED: 836 free(poller); 837 return 0; 838 case SPDK_POLLER_STATE_PAUSING: 839 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 840 poller->state = SPDK_POLLER_STATE_PAUSED; 841 return 0; 842 case SPDK_POLLER_STATE_WAITING: 843 break; 844 default: 845 assert(false); 846 break; 847 } 848 849 poller->state = SPDK_POLLER_STATE_RUNNING; 850 rc = poller->fn(poller->arg); 851 852 poller->run_count++; 853 if (rc > 0) { 854 poller->busy_count++; 855 } 856 857 #ifdef DEBUG 858 if (rc == -1) { 859 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 860 } 861 #endif 862 863 switch (poller->state) { 864 case SPDK_POLLER_STATE_UNREGISTERED: 865 free(poller); 866 break; 867 case SPDK_POLLER_STATE_PAUSING: 868 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 869 poller->state = SPDK_POLLER_STATE_PAUSED; 870 break; 871 case SPDK_POLLER_STATE_PAUSED: 872 break; 873 case SPDK_POLLER_STATE_RUNNING: 874 poller->state = SPDK_POLLER_STATE_WAITING; 875 /* fallthrough */ 876 case SPDK_POLLER_STATE_WAITING: 877 poller_insert_timer(thread, poller, now); 878 break; 879 default: 880 assert(false); 881 break; 882 } 883 884 return rc; 885 } 886 887 static int 888 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 889 { 890 uint32_t msg_count; 891 struct spdk_poller *poller, *tmp; 892 spdk_msg_fn critical_msg; 893 int rc = 0; 894 895 thread->tsc_last = now; 896 897 critical_msg = thread->critical_msg; 898 if (spdk_unlikely(critical_msg != NULL)) { 899 critical_msg(NULL); 900 thread->critical_msg = NULL; 901 rc = 1; 902 } 903 904 msg_count = msg_queue_run_batch(thread, max_msgs); 905 if (msg_count) { 906 rc = 1; 907 } 908 909 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 910 active_pollers_head, tailq, tmp) { 911 int poller_rc; 912 913 poller_rc = thread_execute_poller(thread, poller); 914 if (poller_rc > rc) { 915 rc = poller_rc; 916 } 917 } 918 919 poller = thread->first_timed_poller; 920 while (poller != NULL) { 921 int timer_rc = 0; 922 923 if (now < poller->next_run_tick) { 924 break; 925 } 926 927 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 928 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 929 930 /* Update the cache to the next timed poller in the list 931 * only if the current poller is still the closest, otherwise, 932 * do nothing because the cache has been already updated. 933 */ 934 if (thread->first_timed_poller == poller) { 935 thread->first_timed_poller = tmp; 936 } 937 938 timer_rc = thread_execute_timed_poller(thread, poller, now); 939 if (timer_rc > rc) { 940 rc = timer_rc; 941 } 942 943 poller = tmp; 944 } 945 946 return rc; 947 } 948 949 int 950 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 951 { 952 struct spdk_thread *orig_thread; 953 int rc; 954 uint64_t notify = 1; 955 956 orig_thread = _get_thread(); 957 tls_thread = thread; 958 959 if (now == 0) { 960 now = spdk_get_ticks(); 961 } 962 963 if (spdk_likely(!thread->in_interrupt)) { 964 rc = thread_poll(thread, max_msgs, now); 965 if (spdk_unlikely(thread->in_interrupt)) { 966 /* The thread transitioned to interrupt mode during the above poll. 967 * Poll it one more time in case that during the transition time 968 * there is msg received without notification. 969 */ 970 rc = thread_poll(thread, max_msgs, now); 971 } 972 } else { 973 /* Non-block wait on thread's fd_group */ 974 rc = spdk_fd_group_wait(thread->fgrp, 0); 975 if (spdk_unlikely(!thread->in_interrupt)) { 976 /* The thread transitioned to poll mode in a msg during the above processing. 977 * Clear msg_fd since thread messages will be polled directly in poll mode. 978 */ 979 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 980 if (rc < 0 && errno != EAGAIN) { 981 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 982 } 983 } 984 985 /* Reap unregistered pollers out of poller execution in intr mode */ 986 if (spdk_unlikely(thread->poller_unregistered)) { 987 struct spdk_poller *poller, *tmp; 988 989 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 990 active_pollers_head, tailq, tmp) { 991 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 992 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 993 free(poller); 994 } 995 } 996 997 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 998 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 999 poller_remove_timer(thread, poller); 1000 free(poller); 1001 } 1002 } 1003 1004 thread->poller_unregistered = false; 1005 } 1006 } 1007 1008 1009 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1010 thread_exit(thread, now); 1011 } 1012 1013 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1014 1015 tls_thread = orig_thread; 1016 1017 return rc; 1018 } 1019 1020 uint64_t 1021 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1022 { 1023 struct spdk_poller *poller; 1024 1025 poller = thread->first_timed_poller; 1026 if (poller) { 1027 return poller->next_run_tick; 1028 } 1029 1030 return 0; 1031 } 1032 1033 int 1034 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1035 { 1036 return !TAILQ_EMPTY(&thread->active_pollers); 1037 } 1038 1039 static bool 1040 thread_has_unpaused_pollers(struct spdk_thread *thread) 1041 { 1042 if (TAILQ_EMPTY(&thread->active_pollers) && 1043 RB_EMPTY(&thread->timed_pollers)) { 1044 return false; 1045 } 1046 1047 return true; 1048 } 1049 1050 bool 1051 spdk_thread_has_pollers(struct spdk_thread *thread) 1052 { 1053 if (!thread_has_unpaused_pollers(thread) && 1054 TAILQ_EMPTY(&thread->paused_pollers)) { 1055 return false; 1056 } 1057 1058 return true; 1059 } 1060 1061 bool 1062 spdk_thread_is_idle(struct spdk_thread *thread) 1063 { 1064 if (spdk_ring_count(thread->messages) || 1065 thread_has_unpaused_pollers(thread) || 1066 thread->critical_msg != NULL) { 1067 return false; 1068 } 1069 1070 return true; 1071 } 1072 1073 uint32_t 1074 spdk_thread_get_count(void) 1075 { 1076 /* 1077 * Return cached value of the current thread count. We could acquire the 1078 * lock and iterate through the TAILQ of threads to count them, but that 1079 * count could still be invalidated after we release the lock. 1080 */ 1081 return g_thread_count; 1082 } 1083 1084 struct spdk_thread * 1085 spdk_get_thread(void) 1086 { 1087 return _get_thread(); 1088 } 1089 1090 const char * 1091 spdk_thread_get_name(const struct spdk_thread *thread) 1092 { 1093 return thread->name; 1094 } 1095 1096 uint64_t 1097 spdk_thread_get_id(const struct spdk_thread *thread) 1098 { 1099 return thread->id; 1100 } 1101 1102 struct spdk_thread * 1103 spdk_thread_get_by_id(uint64_t id) 1104 { 1105 struct spdk_thread *thread; 1106 1107 if (id == 0 || id >= g_thread_id) { 1108 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1109 return NULL; 1110 } 1111 pthread_mutex_lock(&g_devlist_mutex); 1112 TAILQ_FOREACH(thread, &g_threads, tailq) { 1113 if (thread->id == id) { 1114 break; 1115 } 1116 } 1117 pthread_mutex_unlock(&g_devlist_mutex); 1118 return thread; 1119 } 1120 1121 int 1122 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1123 { 1124 struct spdk_thread *thread; 1125 1126 thread = _get_thread(); 1127 if (!thread) { 1128 SPDK_ERRLOG("No thread allocated\n"); 1129 return -EINVAL; 1130 } 1131 1132 if (stats == NULL) { 1133 return -EINVAL; 1134 } 1135 1136 *stats = thread->stats; 1137 1138 return 0; 1139 } 1140 1141 uint64_t 1142 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1143 { 1144 if (thread == NULL) { 1145 thread = _get_thread(); 1146 } 1147 1148 return thread->tsc_last; 1149 } 1150 1151 static inline int 1152 thread_send_msg_notification(const struct spdk_thread *target_thread) 1153 { 1154 uint64_t notify = 1; 1155 int rc; 1156 1157 /* Not necessary to do notification if interrupt facility is not enabled */ 1158 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1159 return 0; 1160 } 1161 1162 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1163 * after sending thread msg, it is necessary to check whether target thread runs in 1164 * interrupt mode and then decide whether do event notification. 1165 */ 1166 if (spdk_unlikely(target_thread->in_interrupt)) { 1167 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1168 if (rc < 0) { 1169 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1170 return -EIO; 1171 } 1172 } 1173 1174 return 0; 1175 } 1176 1177 int 1178 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1179 { 1180 struct spdk_thread *local_thread; 1181 struct spdk_msg *msg; 1182 int rc; 1183 1184 assert(thread != NULL); 1185 1186 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1187 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1188 return -EIO; 1189 } 1190 1191 local_thread = _get_thread(); 1192 1193 msg = NULL; 1194 if (local_thread != NULL) { 1195 if (local_thread->msg_cache_count > 0) { 1196 msg = SLIST_FIRST(&local_thread->msg_cache); 1197 assert(msg != NULL); 1198 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1199 local_thread->msg_cache_count--; 1200 } 1201 } 1202 1203 if (msg == NULL) { 1204 msg = spdk_mempool_get(g_spdk_msg_mempool); 1205 if (!msg) { 1206 SPDK_ERRLOG("msg could not be allocated\n"); 1207 return -ENOMEM; 1208 } 1209 } 1210 1211 msg->fn = fn; 1212 msg->arg = ctx; 1213 1214 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1215 if (rc != 1) { 1216 SPDK_ERRLOG("msg could not be enqueued\n"); 1217 spdk_mempool_put(g_spdk_msg_mempool, msg); 1218 return -EIO; 1219 } 1220 1221 return thread_send_msg_notification(thread); 1222 } 1223 1224 int 1225 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1226 { 1227 spdk_msg_fn expected = NULL; 1228 1229 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1230 __ATOMIC_SEQ_CST)) { 1231 return -EIO; 1232 } 1233 1234 return thread_send_msg_notification(thread); 1235 } 1236 1237 #ifdef __linux__ 1238 static int 1239 interrupt_timerfd_process(void *arg) 1240 { 1241 struct spdk_poller *poller = arg; 1242 uint64_t exp; 1243 int rc; 1244 1245 /* clear the level of interval timer */ 1246 rc = read(poller->interruptfd, &exp, sizeof(exp)); 1247 if (rc < 0) { 1248 if (rc == -EAGAIN) { 1249 return 0; 1250 } 1251 1252 return rc; 1253 } 1254 1255 SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg); 1256 1257 return poller->fn(poller->arg); 1258 } 1259 1260 static int 1261 period_poller_interrupt_init(struct spdk_poller *poller) 1262 { 1263 struct spdk_fd_group *fgrp = poller->thread->fgrp; 1264 int timerfd; 1265 int rc; 1266 1267 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1268 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1269 if (timerfd < 0) { 1270 return -errno; 1271 } 1272 1273 rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller); 1274 if (rc < 0) { 1275 close(timerfd); 1276 return rc; 1277 } 1278 1279 poller->interruptfd = timerfd; 1280 return 0; 1281 } 1282 1283 static void 1284 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1285 { 1286 int timerfd = poller->interruptfd; 1287 uint64_t now_tick = spdk_get_ticks(); 1288 uint64_t ticks = spdk_get_ticks_hz(); 1289 int ret; 1290 struct itimerspec new_tv = {}; 1291 struct itimerspec old_tv = {}; 1292 1293 assert(poller->period_ticks != 0); 1294 assert(timerfd >= 0); 1295 1296 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1297 interrupt_mode ? "interrupt" : "poll"); 1298 1299 if (interrupt_mode) { 1300 /* Set repeated timer expiration */ 1301 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1302 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1303 1304 /* Update next timer expiration */ 1305 if (poller->next_run_tick == 0) { 1306 poller->next_run_tick = now_tick + poller->period_ticks; 1307 } else if (poller->next_run_tick < now_tick) { 1308 poller->next_run_tick = now_tick; 1309 } 1310 1311 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1312 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1313 1314 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1315 if (ret < 0) { 1316 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1317 assert(false); 1318 } 1319 } else { 1320 /* Disarm the timer */ 1321 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1322 if (ret < 0) { 1323 /* timerfd_settime's failure indicates that the timerfd is in error */ 1324 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1325 assert(false); 1326 } 1327 1328 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1329 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1330 */ 1331 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1332 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1333 poller_remove_timer(poller->thread, poller); 1334 poller_insert_timer(poller->thread, poller, now_tick); 1335 } 1336 } 1337 1338 static void 1339 poller_interrupt_fini(struct spdk_poller *poller) 1340 { 1341 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1342 assert(poller->interruptfd >= 0); 1343 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1344 close(poller->interruptfd); 1345 poller->interruptfd = -1; 1346 } 1347 1348 static int 1349 busy_poller_interrupt_init(struct spdk_poller *poller) 1350 { 1351 int busy_efd; 1352 int rc; 1353 1354 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1355 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1356 if (busy_efd < 0) { 1357 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1358 return -errno; 1359 } 1360 1361 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, 1362 poller->fn, poller->arg, poller->name); 1363 if (rc < 0) { 1364 close(busy_efd); 1365 return rc; 1366 } 1367 1368 poller->interruptfd = busy_efd; 1369 return 0; 1370 } 1371 1372 static void 1373 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1374 { 1375 int busy_efd = poller->interruptfd; 1376 uint64_t notify = 1; 1377 int rc __attribute__((unused)); 1378 1379 assert(busy_efd >= 0); 1380 1381 if (interrupt_mode) { 1382 /* Write without read on eventfd will get it repeatedly triggered. */ 1383 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1384 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1385 } 1386 } else { 1387 /* Read on eventfd will clear its level triggering. */ 1388 rc = read(busy_efd, ¬ify, sizeof(notify)); 1389 } 1390 } 1391 1392 #else 1393 1394 static int 1395 period_poller_interrupt_init(struct spdk_poller *poller) 1396 { 1397 return -ENOTSUP; 1398 } 1399 1400 static void 1401 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1402 { 1403 } 1404 1405 static void 1406 poller_interrupt_fini(struct spdk_poller *poller) 1407 { 1408 } 1409 1410 static int 1411 busy_poller_interrupt_init(struct spdk_poller *poller) 1412 { 1413 return -ENOTSUP; 1414 } 1415 1416 static void 1417 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1418 { 1419 } 1420 1421 #endif 1422 1423 void 1424 spdk_poller_register_interrupt(struct spdk_poller *poller, 1425 spdk_poller_set_interrupt_mode_cb cb_fn, 1426 void *cb_arg) 1427 { 1428 assert(poller != NULL); 1429 assert(cb_fn != NULL); 1430 assert(spdk_get_thread() == poller->thread); 1431 1432 if (!spdk_interrupt_mode_is_enabled()) { 1433 return; 1434 } 1435 1436 /* when a poller is created we don't know if the user is ever going to 1437 * enable interrupts on it by calling this function, so the poller 1438 * registration function has to immediately create a interruptfd. 1439 * When this function does get called by user, we have to then destroy 1440 * that interruptfd. 1441 */ 1442 if (poller->set_intr_cb_fn && poller->interruptfd >= 0) { 1443 poller_interrupt_fini(poller); 1444 } 1445 1446 poller->set_intr_cb_fn = cb_fn; 1447 poller->set_intr_cb_arg = cb_arg; 1448 1449 /* Set poller into interrupt mode if thread is in interrupt. */ 1450 if (poller->thread->in_interrupt) { 1451 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1452 } 1453 } 1454 1455 static uint64_t 1456 convert_us_to_ticks(uint64_t us) 1457 { 1458 uint64_t quotient, remainder, ticks; 1459 1460 if (us) { 1461 quotient = us / SPDK_SEC_TO_USEC; 1462 remainder = us % SPDK_SEC_TO_USEC; 1463 ticks = spdk_get_ticks_hz(); 1464 1465 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1466 } else { 1467 return 0; 1468 } 1469 } 1470 1471 static struct spdk_poller * 1472 poller_register(spdk_poller_fn fn, 1473 void *arg, 1474 uint64_t period_microseconds, 1475 const char *name) 1476 { 1477 struct spdk_thread *thread; 1478 struct spdk_poller *poller; 1479 1480 thread = spdk_get_thread(); 1481 if (!thread) { 1482 assert(false); 1483 return NULL; 1484 } 1485 1486 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1487 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1488 return NULL; 1489 } 1490 1491 poller = calloc(1, sizeof(*poller)); 1492 if (poller == NULL) { 1493 SPDK_ERRLOG("Poller memory allocation failed\n"); 1494 return NULL; 1495 } 1496 1497 if (name) { 1498 snprintf(poller->name, sizeof(poller->name), "%s", name); 1499 } else { 1500 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1501 } 1502 1503 poller->state = SPDK_POLLER_STATE_WAITING; 1504 poller->fn = fn; 1505 poller->arg = arg; 1506 poller->thread = thread; 1507 poller->interruptfd = -1; 1508 if (thread->next_poller_id == 0) { 1509 SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n"); 1510 thread->next_poller_id = 1; 1511 } 1512 poller->id = thread->next_poller_id++; 1513 1514 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1515 1516 if (spdk_interrupt_mode_is_enabled()) { 1517 int rc; 1518 1519 if (period_microseconds) { 1520 rc = period_poller_interrupt_init(poller); 1521 if (rc < 0) { 1522 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1523 free(poller); 1524 return NULL; 1525 } 1526 1527 spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL); 1528 } else { 1529 /* If the poller doesn't have a period, create interruptfd that's always 1530 * busy automatically when running in interrupt mode. 1531 */ 1532 rc = busy_poller_interrupt_init(poller); 1533 if (rc > 0) { 1534 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1535 free(poller); 1536 return NULL; 1537 } 1538 1539 spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL); 1540 } 1541 } 1542 1543 thread_insert_poller(thread, poller); 1544 1545 return poller; 1546 } 1547 1548 struct spdk_poller * 1549 spdk_poller_register(spdk_poller_fn fn, 1550 void *arg, 1551 uint64_t period_microseconds) 1552 { 1553 return poller_register(fn, arg, period_microseconds, NULL); 1554 } 1555 1556 struct spdk_poller * 1557 spdk_poller_register_named(spdk_poller_fn fn, 1558 void *arg, 1559 uint64_t period_microseconds, 1560 const char *name) 1561 { 1562 return poller_register(fn, arg, period_microseconds, name); 1563 } 1564 1565 static void 1566 wrong_thread(const char *func, const char *name, struct spdk_thread *thread, 1567 struct spdk_thread *curthread) 1568 { 1569 if (thread == NULL) { 1570 SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name); 1571 abort(); 1572 } 1573 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be " 1574 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id, 1575 thread->name, thread->id); 1576 assert(false); 1577 } 1578 1579 void 1580 spdk_poller_unregister(struct spdk_poller **ppoller) 1581 { 1582 struct spdk_thread *thread; 1583 struct spdk_poller *poller; 1584 1585 poller = *ppoller; 1586 if (poller == NULL) { 1587 return; 1588 } 1589 1590 *ppoller = NULL; 1591 1592 thread = spdk_get_thread(); 1593 if (!thread) { 1594 assert(false); 1595 return; 1596 } 1597 1598 if (poller->thread != thread) { 1599 wrong_thread(__func__, poller->name, poller->thread, thread); 1600 return; 1601 } 1602 1603 if (spdk_interrupt_mode_is_enabled()) { 1604 /* Release the interrupt resource for period or busy poller */ 1605 if (poller->interruptfd >= 0) { 1606 poller_interrupt_fini(poller); 1607 } 1608 1609 /* Mark there is poller unregistered. Then unregistered pollers will 1610 * get reaped by spdk_thread_poll also in intr mode. 1611 */ 1612 thread->poller_unregistered = true; 1613 } 1614 1615 /* If the poller was paused, put it on the active_pollers list so that 1616 * its unregistration can be processed by spdk_thread_poll(). 1617 */ 1618 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1619 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1620 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1621 poller->period_ticks = 0; 1622 } 1623 1624 /* Simply set the state to unregistered. The poller will get cleaned up 1625 * in a subsequent call to spdk_thread_poll(). 1626 */ 1627 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1628 } 1629 1630 void 1631 spdk_poller_pause(struct spdk_poller *poller) 1632 { 1633 struct spdk_thread *thread; 1634 1635 thread = spdk_get_thread(); 1636 if (!thread) { 1637 assert(false); 1638 return; 1639 } 1640 1641 if (poller->thread != thread) { 1642 wrong_thread(__func__, poller->name, poller->thread, thread); 1643 return; 1644 } 1645 1646 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1647 * spdk_thread_poll() move it. It allows a poller to be paused from 1648 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1649 * iteration, or from within itself without breaking the logic to always 1650 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1651 */ 1652 switch (poller->state) { 1653 case SPDK_POLLER_STATE_PAUSED: 1654 case SPDK_POLLER_STATE_PAUSING: 1655 break; 1656 case SPDK_POLLER_STATE_RUNNING: 1657 case SPDK_POLLER_STATE_WAITING: 1658 poller->state = SPDK_POLLER_STATE_PAUSING; 1659 break; 1660 default: 1661 assert(false); 1662 break; 1663 } 1664 } 1665 1666 void 1667 spdk_poller_resume(struct spdk_poller *poller) 1668 { 1669 struct spdk_thread *thread; 1670 1671 thread = spdk_get_thread(); 1672 if (!thread) { 1673 assert(false); 1674 return; 1675 } 1676 1677 if (poller->thread != thread) { 1678 wrong_thread(__func__, poller->name, poller->thread, thread); 1679 return; 1680 } 1681 1682 /* If a poller is paused it has to be removed from the paused pollers 1683 * list and put on the active list or timer tree depending on its 1684 * period_ticks. If a poller is still in the process of being paused, 1685 * we just need to flip its state back to waiting, as it's already on 1686 * the appropriate list or tree. 1687 */ 1688 switch (poller->state) { 1689 case SPDK_POLLER_STATE_PAUSED: 1690 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1691 thread_insert_poller(thread, poller); 1692 /* fallthrough */ 1693 case SPDK_POLLER_STATE_PAUSING: 1694 poller->state = SPDK_POLLER_STATE_WAITING; 1695 break; 1696 case SPDK_POLLER_STATE_RUNNING: 1697 case SPDK_POLLER_STATE_WAITING: 1698 break; 1699 default: 1700 assert(false); 1701 break; 1702 } 1703 } 1704 1705 const char * 1706 spdk_poller_get_name(struct spdk_poller *poller) 1707 { 1708 return poller->name; 1709 } 1710 1711 uint64_t 1712 spdk_poller_get_id(struct spdk_poller *poller) 1713 { 1714 return poller->id; 1715 } 1716 1717 const char * 1718 spdk_poller_get_state_str(struct spdk_poller *poller) 1719 { 1720 switch (poller->state) { 1721 case SPDK_POLLER_STATE_WAITING: 1722 return "waiting"; 1723 case SPDK_POLLER_STATE_RUNNING: 1724 return "running"; 1725 case SPDK_POLLER_STATE_UNREGISTERED: 1726 return "unregistered"; 1727 case SPDK_POLLER_STATE_PAUSING: 1728 return "pausing"; 1729 case SPDK_POLLER_STATE_PAUSED: 1730 return "paused"; 1731 default: 1732 return NULL; 1733 } 1734 } 1735 1736 uint64_t 1737 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1738 { 1739 return poller->period_ticks; 1740 } 1741 1742 void 1743 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1744 { 1745 stats->run_count = poller->run_count; 1746 stats->busy_count = poller->busy_count; 1747 } 1748 1749 struct spdk_poller * 1750 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1751 { 1752 return TAILQ_FIRST(&thread->active_pollers); 1753 } 1754 1755 struct spdk_poller * 1756 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1757 { 1758 return TAILQ_NEXT(prev, tailq); 1759 } 1760 1761 struct spdk_poller * 1762 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1763 { 1764 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1765 } 1766 1767 struct spdk_poller * 1768 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1769 { 1770 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1771 } 1772 1773 struct spdk_poller * 1774 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1775 { 1776 return TAILQ_FIRST(&thread->paused_pollers); 1777 } 1778 1779 struct spdk_poller * 1780 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1781 { 1782 return TAILQ_NEXT(prev, tailq); 1783 } 1784 1785 struct spdk_io_channel * 1786 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1787 { 1788 return RB_MIN(io_channel_tree, &thread->io_channels); 1789 } 1790 1791 struct spdk_io_channel * 1792 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1793 { 1794 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1795 } 1796 1797 struct call_thread { 1798 struct spdk_thread *cur_thread; 1799 spdk_msg_fn fn; 1800 void *ctx; 1801 1802 struct spdk_thread *orig_thread; 1803 spdk_msg_fn cpl; 1804 }; 1805 1806 static void 1807 _on_thread(void *ctx) 1808 { 1809 struct call_thread *ct = ctx; 1810 int rc __attribute__((unused)); 1811 1812 ct->fn(ct->ctx); 1813 1814 pthread_mutex_lock(&g_devlist_mutex); 1815 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1816 pthread_mutex_unlock(&g_devlist_mutex); 1817 1818 if (!ct->cur_thread) { 1819 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1820 1821 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1822 free(ctx); 1823 } else { 1824 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1825 ct->cur_thread->name); 1826 1827 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1828 } 1829 assert(rc == 0); 1830 } 1831 1832 void 1833 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1834 { 1835 struct call_thread *ct; 1836 struct spdk_thread *thread; 1837 int rc __attribute__((unused)); 1838 1839 ct = calloc(1, sizeof(*ct)); 1840 if (!ct) { 1841 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1842 cpl(ctx); 1843 return; 1844 } 1845 1846 ct->fn = fn; 1847 ct->ctx = ctx; 1848 ct->cpl = cpl; 1849 1850 thread = _get_thread(); 1851 if (!thread) { 1852 SPDK_ERRLOG("No thread allocated\n"); 1853 free(ct); 1854 cpl(ctx); 1855 return; 1856 } 1857 ct->orig_thread = thread; 1858 1859 pthread_mutex_lock(&g_devlist_mutex); 1860 ct->cur_thread = TAILQ_FIRST(&g_threads); 1861 pthread_mutex_unlock(&g_devlist_mutex); 1862 1863 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 1864 ct->orig_thread->name); 1865 1866 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 1867 assert(rc == 0); 1868 } 1869 1870 static inline void 1871 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 1872 { 1873 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1874 return; 1875 } 1876 1877 if (!poller->set_intr_cb_fn) { 1878 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 1879 assert(false); 1880 return; 1881 } 1882 1883 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 1884 } 1885 1886 void 1887 spdk_thread_set_interrupt_mode(bool enable_interrupt) 1888 { 1889 struct spdk_thread *thread = _get_thread(); 1890 struct spdk_poller *poller, *tmp; 1891 1892 assert(thread); 1893 assert(spdk_interrupt_mode_is_enabled()); 1894 1895 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 1896 thread->name, enable_interrupt ? "intr" : "poll", 1897 thread->in_interrupt ? "intr" : "poll"); 1898 1899 if (thread->in_interrupt == enable_interrupt) { 1900 return; 1901 } 1902 1903 /* Set pollers to expected mode */ 1904 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1905 poller_set_interrupt_mode(poller, enable_interrupt); 1906 } 1907 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 1908 poller_set_interrupt_mode(poller, enable_interrupt); 1909 } 1910 /* All paused pollers will go to work in interrupt mode */ 1911 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 1912 poller_set_interrupt_mode(poller, enable_interrupt); 1913 } 1914 1915 thread->in_interrupt = enable_interrupt; 1916 return; 1917 } 1918 1919 static struct io_device * 1920 io_device_get(void *io_device) 1921 { 1922 struct io_device find = {}; 1923 1924 find.io_device = io_device; 1925 return RB_FIND(io_device_tree, &g_io_devices, &find); 1926 } 1927 1928 void 1929 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1930 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1931 const char *name) 1932 { 1933 struct io_device *dev, *tmp; 1934 struct spdk_thread *thread; 1935 1936 assert(io_device != NULL); 1937 assert(create_cb != NULL); 1938 assert(destroy_cb != NULL); 1939 1940 thread = spdk_get_thread(); 1941 if (!thread) { 1942 SPDK_ERRLOG("called from non-SPDK thread\n"); 1943 assert(false); 1944 return; 1945 } 1946 1947 dev = calloc(1, sizeof(struct io_device)); 1948 if (dev == NULL) { 1949 SPDK_ERRLOG("could not allocate io_device\n"); 1950 return; 1951 } 1952 1953 dev->io_device = io_device; 1954 if (name) { 1955 snprintf(dev->name, sizeof(dev->name), "%s", name); 1956 } else { 1957 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1958 } 1959 dev->create_cb = create_cb; 1960 dev->destroy_cb = destroy_cb; 1961 dev->unregister_cb = NULL; 1962 dev->ctx_size = ctx_size; 1963 dev->for_each_count = 0; 1964 dev->unregistered = false; 1965 dev->refcnt = 0; 1966 1967 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 1968 dev->name, dev->io_device, thread->name); 1969 1970 pthread_mutex_lock(&g_devlist_mutex); 1971 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 1972 if (tmp != NULL) { 1973 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1974 io_device, tmp->name, dev->name); 1975 free(dev); 1976 } 1977 1978 pthread_mutex_unlock(&g_devlist_mutex); 1979 } 1980 1981 static void 1982 _finish_unregister(void *arg) 1983 { 1984 struct io_device *dev = arg; 1985 struct spdk_thread *thread; 1986 1987 thread = spdk_get_thread(); 1988 assert(thread == dev->unregister_thread); 1989 1990 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 1991 dev->name, dev->io_device, thread->name); 1992 1993 assert(thread->pending_unregister_count > 0); 1994 thread->pending_unregister_count--; 1995 1996 dev->unregister_cb(dev->io_device); 1997 free(dev); 1998 } 1999 2000 static void 2001 io_device_free(struct io_device *dev) 2002 { 2003 int rc __attribute__((unused)); 2004 2005 if (dev->unregister_cb == NULL) { 2006 free(dev); 2007 } else { 2008 assert(dev->unregister_thread != NULL); 2009 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2010 dev->name, dev->io_device, dev->unregister_thread->name); 2011 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2012 assert(rc == 0); 2013 } 2014 } 2015 2016 void 2017 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2018 { 2019 struct io_device *dev; 2020 uint32_t refcnt; 2021 struct spdk_thread *thread; 2022 2023 thread = spdk_get_thread(); 2024 if (!thread) { 2025 SPDK_ERRLOG("called from non-SPDK thread\n"); 2026 assert(false); 2027 return; 2028 } 2029 2030 pthread_mutex_lock(&g_devlist_mutex); 2031 dev = io_device_get(io_device); 2032 if (!dev) { 2033 SPDK_ERRLOG("io_device %p not found\n", io_device); 2034 assert(false); 2035 pthread_mutex_unlock(&g_devlist_mutex); 2036 return; 2037 } 2038 2039 if (dev->for_each_count > 0) { 2040 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2041 dev->name, io_device, dev->for_each_count); 2042 pthread_mutex_unlock(&g_devlist_mutex); 2043 return; 2044 } 2045 2046 dev->unregister_cb = unregister_cb; 2047 dev->unregistered = true; 2048 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2049 refcnt = dev->refcnt; 2050 dev->unregister_thread = thread; 2051 pthread_mutex_unlock(&g_devlist_mutex); 2052 2053 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2054 dev->name, dev->io_device, thread->name); 2055 2056 if (unregister_cb) { 2057 thread->pending_unregister_count++; 2058 } 2059 2060 if (refcnt > 0) { 2061 /* defer deletion */ 2062 return; 2063 } 2064 2065 io_device_free(dev); 2066 } 2067 2068 const char * 2069 spdk_io_device_get_name(struct io_device *dev) 2070 { 2071 return dev->name; 2072 } 2073 2074 static struct spdk_io_channel * 2075 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2076 { 2077 struct spdk_io_channel find = {}; 2078 2079 find.dev = dev; 2080 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2081 } 2082 2083 struct spdk_io_channel * 2084 spdk_get_io_channel(void *io_device) 2085 { 2086 struct spdk_io_channel *ch; 2087 struct spdk_thread *thread; 2088 struct io_device *dev; 2089 int rc; 2090 2091 pthread_mutex_lock(&g_devlist_mutex); 2092 dev = io_device_get(io_device); 2093 if (dev == NULL) { 2094 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2095 pthread_mutex_unlock(&g_devlist_mutex); 2096 return NULL; 2097 } 2098 2099 thread = _get_thread(); 2100 if (!thread) { 2101 SPDK_ERRLOG("No thread allocated\n"); 2102 pthread_mutex_unlock(&g_devlist_mutex); 2103 return NULL; 2104 } 2105 2106 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2107 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2108 pthread_mutex_unlock(&g_devlist_mutex); 2109 return NULL; 2110 } 2111 2112 ch = thread_get_io_channel(thread, dev); 2113 if (ch != NULL) { 2114 ch->ref++; 2115 2116 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2117 ch, dev->name, dev->io_device, thread->name, ch->ref); 2118 2119 /* 2120 * An I/O channel already exists for this device on this 2121 * thread, so return it. 2122 */ 2123 pthread_mutex_unlock(&g_devlist_mutex); 2124 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2125 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2126 return ch; 2127 } 2128 2129 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2130 if (ch == NULL) { 2131 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2132 pthread_mutex_unlock(&g_devlist_mutex); 2133 return NULL; 2134 } 2135 2136 ch->dev = dev; 2137 ch->destroy_cb = dev->destroy_cb; 2138 ch->thread = thread; 2139 ch->ref = 1; 2140 ch->destroy_ref = 0; 2141 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2142 2143 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2144 ch, dev->name, dev->io_device, thread->name, ch->ref); 2145 2146 dev->refcnt++; 2147 2148 pthread_mutex_unlock(&g_devlist_mutex); 2149 2150 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2151 if (rc != 0) { 2152 pthread_mutex_lock(&g_devlist_mutex); 2153 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2154 dev->refcnt--; 2155 free(ch); 2156 pthread_mutex_unlock(&g_devlist_mutex); 2157 return NULL; 2158 } 2159 2160 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2161 return ch; 2162 } 2163 2164 static void 2165 put_io_channel(void *arg) 2166 { 2167 struct spdk_io_channel *ch = arg; 2168 bool do_remove_dev = true; 2169 struct spdk_thread *thread; 2170 2171 thread = spdk_get_thread(); 2172 if (!thread) { 2173 SPDK_ERRLOG("called from non-SPDK thread\n"); 2174 assert(false); 2175 return; 2176 } 2177 2178 SPDK_DEBUGLOG(thread, 2179 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2180 ch, ch->dev->name, ch->dev->io_device, thread->name); 2181 2182 assert(ch->thread == thread); 2183 2184 ch->destroy_ref--; 2185 2186 if (ch->ref > 0 || ch->destroy_ref > 0) { 2187 /* 2188 * Another reference to the associated io_device was requested 2189 * after this message was sent but before it had a chance to 2190 * execute. 2191 */ 2192 return; 2193 } 2194 2195 pthread_mutex_lock(&g_devlist_mutex); 2196 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2197 pthread_mutex_unlock(&g_devlist_mutex); 2198 2199 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2200 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2201 2202 pthread_mutex_lock(&g_devlist_mutex); 2203 ch->dev->refcnt--; 2204 2205 if (!ch->dev->unregistered) { 2206 do_remove_dev = false; 2207 } 2208 2209 if (ch->dev->refcnt > 0) { 2210 do_remove_dev = false; 2211 } 2212 2213 pthread_mutex_unlock(&g_devlist_mutex); 2214 2215 if (do_remove_dev) { 2216 io_device_free(ch->dev); 2217 } 2218 free(ch); 2219 } 2220 2221 void 2222 spdk_put_io_channel(struct spdk_io_channel *ch) 2223 { 2224 struct spdk_thread *thread; 2225 int rc __attribute__((unused)); 2226 2227 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2228 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2229 2230 thread = spdk_get_thread(); 2231 if (!thread) { 2232 SPDK_ERRLOG("called from non-SPDK thread\n"); 2233 assert(false); 2234 return; 2235 } 2236 2237 if (ch->thread != thread) { 2238 wrong_thread(__func__, "ch", ch->thread, thread); 2239 return; 2240 } 2241 2242 SPDK_DEBUGLOG(thread, 2243 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2244 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2245 2246 ch->ref--; 2247 2248 if (ch->ref == 0) { 2249 ch->destroy_ref++; 2250 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2251 assert(rc == 0); 2252 } 2253 } 2254 2255 struct spdk_io_channel * 2256 spdk_io_channel_from_ctx(void *ctx) 2257 { 2258 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2259 } 2260 2261 struct spdk_thread * 2262 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2263 { 2264 return ch->thread; 2265 } 2266 2267 void * 2268 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2269 { 2270 return ch->dev->io_device; 2271 } 2272 2273 const char * 2274 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2275 { 2276 return spdk_io_device_get_name(ch->dev); 2277 } 2278 2279 int 2280 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2281 { 2282 return ch->ref; 2283 } 2284 2285 struct spdk_io_channel_iter { 2286 void *io_device; 2287 struct io_device *dev; 2288 spdk_channel_msg fn; 2289 int status; 2290 void *ctx; 2291 struct spdk_io_channel *ch; 2292 2293 struct spdk_thread *cur_thread; 2294 2295 struct spdk_thread *orig_thread; 2296 spdk_channel_for_each_cpl cpl; 2297 }; 2298 2299 void * 2300 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2301 { 2302 return i->io_device; 2303 } 2304 2305 struct spdk_io_channel * 2306 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2307 { 2308 return i->ch; 2309 } 2310 2311 void * 2312 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2313 { 2314 return i->ctx; 2315 } 2316 2317 static void 2318 _call_completion(void *ctx) 2319 { 2320 struct spdk_io_channel_iter *i = ctx; 2321 2322 if (i->cpl != NULL) { 2323 i->cpl(i, i->status); 2324 } 2325 free(i); 2326 } 2327 2328 static void 2329 _call_channel(void *ctx) 2330 { 2331 struct spdk_io_channel_iter *i = ctx; 2332 struct spdk_io_channel *ch; 2333 2334 /* 2335 * It is possible that the channel was deleted before this 2336 * message had a chance to execute. If so, skip calling 2337 * the fn() on this thread. 2338 */ 2339 pthread_mutex_lock(&g_devlist_mutex); 2340 ch = thread_get_io_channel(i->cur_thread, i->dev); 2341 pthread_mutex_unlock(&g_devlist_mutex); 2342 2343 if (ch) { 2344 i->fn(i); 2345 } else { 2346 spdk_for_each_channel_continue(i, 0); 2347 } 2348 } 2349 2350 void 2351 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2352 spdk_channel_for_each_cpl cpl) 2353 { 2354 struct spdk_thread *thread; 2355 struct spdk_io_channel *ch; 2356 struct spdk_io_channel_iter *i; 2357 int rc __attribute__((unused)); 2358 2359 i = calloc(1, sizeof(*i)); 2360 if (!i) { 2361 SPDK_ERRLOG("Unable to allocate iterator\n"); 2362 return; 2363 } 2364 2365 i->io_device = io_device; 2366 i->fn = fn; 2367 i->ctx = ctx; 2368 i->cpl = cpl; 2369 i->orig_thread = _get_thread(); 2370 2371 pthread_mutex_lock(&g_devlist_mutex); 2372 i->dev = io_device_get(io_device); 2373 if (i->dev == NULL) { 2374 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2375 assert(false); 2376 goto end; 2377 } 2378 2379 TAILQ_FOREACH(thread, &g_threads, tailq) { 2380 ch = thread_get_io_channel(thread, i->dev); 2381 if (ch != NULL) { 2382 ch->dev->for_each_count++; 2383 i->cur_thread = thread; 2384 i->ch = ch; 2385 pthread_mutex_unlock(&g_devlist_mutex); 2386 rc = spdk_thread_send_msg(thread, _call_channel, i); 2387 assert(rc == 0); 2388 return; 2389 } 2390 } 2391 2392 end: 2393 pthread_mutex_unlock(&g_devlist_mutex); 2394 2395 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2396 assert(rc == 0); 2397 } 2398 2399 void 2400 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2401 { 2402 struct spdk_thread *thread; 2403 struct spdk_io_channel *ch; 2404 int rc __attribute__((unused)); 2405 2406 assert(i->cur_thread == spdk_get_thread()); 2407 2408 i->status = status; 2409 2410 pthread_mutex_lock(&g_devlist_mutex); 2411 if (status) { 2412 goto end; 2413 } 2414 thread = TAILQ_NEXT(i->cur_thread, tailq); 2415 while (thread) { 2416 ch = thread_get_io_channel(thread, i->dev); 2417 if (ch != NULL) { 2418 i->cur_thread = thread; 2419 i->ch = ch; 2420 pthread_mutex_unlock(&g_devlist_mutex); 2421 rc = spdk_thread_send_msg(thread, _call_channel, i); 2422 assert(rc == 0); 2423 return; 2424 } 2425 thread = TAILQ_NEXT(thread, tailq); 2426 } 2427 2428 end: 2429 i->dev->for_each_count--; 2430 i->ch = NULL; 2431 pthread_mutex_unlock(&g_devlist_mutex); 2432 2433 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2434 assert(rc == 0); 2435 } 2436 2437 struct spdk_interrupt { 2438 int efd; 2439 struct spdk_thread *thread; 2440 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 2441 }; 2442 2443 static void 2444 thread_interrupt_destroy(struct spdk_thread *thread) 2445 { 2446 struct spdk_fd_group *fgrp = thread->fgrp; 2447 2448 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2449 2450 if (thread->msg_fd < 0) { 2451 return; 2452 } 2453 2454 spdk_fd_group_remove(fgrp, thread->msg_fd); 2455 close(thread->msg_fd); 2456 thread->msg_fd = -1; 2457 2458 spdk_fd_group_destroy(fgrp); 2459 thread->fgrp = NULL; 2460 } 2461 2462 #ifdef __linux__ 2463 static int 2464 thread_interrupt_msg_process(void *arg) 2465 { 2466 struct spdk_thread *thread = arg; 2467 uint32_t msg_count; 2468 spdk_msg_fn critical_msg; 2469 int rc = 0; 2470 uint64_t notify = 1; 2471 2472 assert(spdk_interrupt_mode_is_enabled()); 2473 2474 /* There may be race between msg_acknowledge and another producer's msg_notify, 2475 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2476 * This can avoid msg notification missing. 2477 */ 2478 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2479 if (rc < 0 && errno != EAGAIN) { 2480 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2481 } 2482 2483 critical_msg = thread->critical_msg; 2484 if (spdk_unlikely(critical_msg != NULL)) { 2485 critical_msg(NULL); 2486 thread->critical_msg = NULL; 2487 rc = 1; 2488 } 2489 2490 msg_count = msg_queue_run_batch(thread, 0); 2491 if (msg_count) { 2492 rc = 1; 2493 } 2494 2495 return rc; 2496 } 2497 2498 static int 2499 thread_interrupt_create(struct spdk_thread *thread) 2500 { 2501 int rc; 2502 2503 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2504 2505 rc = spdk_fd_group_create(&thread->fgrp); 2506 if (rc) { 2507 return rc; 2508 } 2509 2510 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2511 if (thread->msg_fd < 0) { 2512 rc = -errno; 2513 spdk_fd_group_destroy(thread->fgrp); 2514 thread->fgrp = NULL; 2515 2516 return rc; 2517 } 2518 2519 return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd, 2520 thread_interrupt_msg_process, thread); 2521 } 2522 #else 2523 static int 2524 thread_interrupt_create(struct spdk_thread *thread) 2525 { 2526 return -ENOTSUP; 2527 } 2528 #endif 2529 2530 struct spdk_interrupt * 2531 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2532 void *arg, const char *name) 2533 { 2534 struct spdk_thread *thread; 2535 struct spdk_interrupt *intr; 2536 int ret; 2537 2538 thread = spdk_get_thread(); 2539 if (!thread) { 2540 assert(false); 2541 return NULL; 2542 } 2543 2544 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2545 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2546 return NULL; 2547 } 2548 2549 ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name); 2550 2551 if (ret != 0) { 2552 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2553 thread->name, efd, spdk_strerror(-ret)); 2554 return NULL; 2555 } 2556 2557 intr = calloc(1, sizeof(*intr)); 2558 if (intr == NULL) { 2559 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2560 return NULL; 2561 } 2562 2563 if (name) { 2564 snprintf(intr->name, sizeof(intr->name), "%s", name); 2565 } else { 2566 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2567 } 2568 2569 intr->efd = efd; 2570 intr->thread = thread; 2571 2572 return intr; 2573 } 2574 2575 void 2576 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2577 { 2578 struct spdk_thread *thread; 2579 struct spdk_interrupt *intr; 2580 2581 intr = *pintr; 2582 if (intr == NULL) { 2583 return; 2584 } 2585 2586 *pintr = NULL; 2587 2588 thread = spdk_get_thread(); 2589 if (!thread) { 2590 assert(false); 2591 return; 2592 } 2593 2594 if (intr->thread != thread) { 2595 wrong_thread(__func__, intr->name, intr->thread, thread); 2596 return; 2597 } 2598 2599 spdk_fd_group_remove(thread->fgrp, intr->efd); 2600 free(intr); 2601 } 2602 2603 int 2604 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2605 enum spdk_interrupt_event_types event_types) 2606 { 2607 struct spdk_thread *thread; 2608 2609 thread = spdk_get_thread(); 2610 if (!thread) { 2611 assert(false); 2612 return -EINVAL; 2613 } 2614 2615 if (intr->thread != thread) { 2616 wrong_thread(__func__, intr->name, intr->thread, thread); 2617 return -EINVAL; 2618 } 2619 2620 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2621 } 2622 2623 int 2624 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2625 { 2626 return spdk_fd_group_get_fd(thread->fgrp); 2627 } 2628 2629 static bool g_interrupt_mode = false; 2630 2631 int 2632 spdk_interrupt_mode_enable(void) 2633 { 2634 /* It must be called once prior to initializing the threading library. 2635 * g_spdk_msg_mempool will be valid if thread library is initialized. 2636 */ 2637 if (g_spdk_msg_mempool) { 2638 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 2639 return -1; 2640 } 2641 2642 #ifdef __linux__ 2643 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2644 g_interrupt_mode = true; 2645 return 0; 2646 #else 2647 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2648 g_interrupt_mode = false; 2649 return -ENOTSUP; 2650 #endif 2651 } 2652 2653 bool 2654 spdk_interrupt_mode_is_enabled(void) 2655 { 2656 return g_interrupt_mode; 2657 } 2658 2659 SPDK_LOG_REGISTER_COMPONENT(thread) 2660