1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/likely.h" 38 #include "spdk/queue.h" 39 #include "spdk/string.h" 40 #include "spdk/thread.h" 41 #include "spdk/trace.h" 42 #include "spdk/util.h" 43 #include "spdk/fd_group.h" 44 45 #include "spdk/log.h" 46 #include "spdk_internal/thread.h" 47 #include "thread_internal.h" 48 49 #include "spdk_internal/trace_defs.h" 50 51 #ifdef __linux__ 52 #include <sys/timerfd.h> 53 #include <sys/eventfd.h> 54 #endif 55 56 #define SPDK_MSG_BATCH_SIZE 8 57 #define SPDK_MAX_DEVICE_NAME_LEN 256 58 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 59 #define SPDK_MAX_POLLER_NAME_LEN 256 60 #define SPDK_MAX_THREAD_NAME_LEN 256 61 62 enum spdk_poller_state { 63 /* The poller is registered with a thread but not currently executing its fn. */ 64 SPDK_POLLER_STATE_WAITING, 65 66 /* The poller is currently running its fn. */ 67 SPDK_POLLER_STATE_RUNNING, 68 69 /* The poller was unregistered during the execution of its fn. */ 70 SPDK_POLLER_STATE_UNREGISTERED, 71 72 /* The poller is in the process of being paused. It will be paused 73 * during the next time it's supposed to be executed. 74 */ 75 SPDK_POLLER_STATE_PAUSING, 76 77 /* The poller is registered but currently paused. It's on the 78 * paused_pollers list. 79 */ 80 SPDK_POLLER_STATE_PAUSED, 81 }; 82 83 struct spdk_poller { 84 TAILQ_ENTRY(spdk_poller) tailq; 85 RB_ENTRY(spdk_poller) node; 86 87 /* Current state of the poller; should only be accessed from the poller's thread. */ 88 enum spdk_poller_state state; 89 90 uint64_t period_ticks; 91 uint64_t next_run_tick; 92 uint64_t run_count; 93 uint64_t busy_count; 94 uint64_t id; 95 spdk_poller_fn fn; 96 void *arg; 97 struct spdk_thread *thread; 98 /* Native interruptfd for period or busy poller */ 99 int interruptfd; 100 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 101 void *set_intr_cb_arg; 102 103 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 104 }; 105 106 enum spdk_thread_state { 107 /* The thread is processing poller and message by spdk_thread_poll(). */ 108 SPDK_THREAD_STATE_RUNNING, 109 110 /* The thread is in the process of termination. It reaps unregistering 111 * poller are releasing I/O channel. 112 */ 113 SPDK_THREAD_STATE_EXITING, 114 115 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 116 SPDK_THREAD_STATE_EXITED, 117 }; 118 119 struct spdk_thread { 120 uint64_t tsc_last; 121 struct spdk_thread_stats stats; 122 /* 123 * Contains pollers actively running on this thread. Pollers 124 * are run round-robin. The thread takes one poller from the head 125 * of the ring, executes it, then puts it back at the tail of 126 * the ring. 127 */ 128 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 129 /** 130 * Contains pollers running on this thread with a periodic timer. 131 */ 132 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 133 struct spdk_poller *first_timed_poller; 134 /* 135 * Contains paused pollers. Pollers on this queue are waiting until 136 * they are resumed (in which case they're put onto the active/timer 137 * queues) or unregistered. 138 */ 139 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 140 struct spdk_ring *messages; 141 int msg_fd; 142 SLIST_HEAD(, spdk_msg) msg_cache; 143 size_t msg_cache_count; 144 spdk_msg_fn critical_msg; 145 uint64_t id; 146 uint64_t next_poller_id; 147 enum spdk_thread_state state; 148 int pending_unregister_count; 149 150 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 151 TAILQ_ENTRY(spdk_thread) tailq; 152 153 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 154 struct spdk_cpuset cpumask; 155 uint64_t exit_timeout_tsc; 156 157 /* Indicates whether this spdk_thread currently runs in interrupt. */ 158 bool in_interrupt; 159 bool poller_unregistered; 160 struct spdk_fd_group *fgrp; 161 162 /* User context allocated at the end */ 163 uint8_t ctx[0]; 164 }; 165 166 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 167 168 static spdk_new_thread_fn g_new_thread_fn = NULL; 169 static spdk_thread_op_fn g_thread_op_fn = NULL; 170 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 171 static size_t g_ctx_sz = 0; 172 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 173 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 174 * SPDK application is required. 175 */ 176 static uint64_t g_thread_id = 1; 177 178 struct io_device { 179 void *io_device; 180 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 181 spdk_io_channel_create_cb create_cb; 182 spdk_io_channel_destroy_cb destroy_cb; 183 spdk_io_device_unregister_cb unregister_cb; 184 struct spdk_thread *unregister_thread; 185 uint32_t ctx_size; 186 uint32_t for_each_count; 187 RB_ENTRY(io_device) node; 188 189 uint32_t refcnt; 190 191 bool unregistered; 192 }; 193 194 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 195 196 static int 197 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 198 { 199 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 200 } 201 202 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 203 204 static int 205 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 206 { 207 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 208 } 209 210 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 211 212 struct spdk_msg { 213 spdk_msg_fn fn; 214 void *arg; 215 216 SLIST_ENTRY(spdk_msg) link; 217 }; 218 219 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 220 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 221 222 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 223 static uint32_t g_thread_count = 0; 224 225 static __thread struct spdk_thread *tls_thread = NULL; 226 227 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 228 { 229 spdk_trace_register_description("THREAD_IOCH_GET", 230 TRACE_THREAD_IOCH_GET, 231 OWNER_NONE, OBJECT_NONE, 0, 232 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 233 spdk_trace_register_description("THREAD_IOCH_PUT", 234 TRACE_THREAD_IOCH_PUT, 235 OWNER_NONE, OBJECT_NONE, 0, 236 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 237 } 238 239 /* 240 * If this compare function returns zero when two next_run_ticks are equal, 241 * the macro RB_INSERT() returns a pointer to the element with the same 242 * next_run_tick. 243 * 244 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 245 * to remove as a parameter. 246 * 247 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 248 * side by returning 1 when two next_run_ticks are equal. 249 */ 250 static inline int 251 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 252 { 253 if (poller1->next_run_tick < poller2->next_run_tick) { 254 return -1; 255 } else { 256 return 1; 257 } 258 } 259 260 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 261 262 static inline struct spdk_thread * 263 _get_thread(void) 264 { 265 return tls_thread; 266 } 267 268 static int 269 _thread_lib_init(size_t ctx_sz) 270 { 271 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 272 273 g_ctx_sz = ctx_sz; 274 275 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 276 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 277 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 278 sizeof(struct spdk_msg), 279 0, /* No cache. We do our own. */ 280 SPDK_ENV_SOCKET_ID_ANY); 281 282 if (!g_spdk_msg_mempool) { 283 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 284 return -1; 285 } 286 287 return 0; 288 } 289 290 int 291 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 292 { 293 assert(g_new_thread_fn == NULL); 294 assert(g_thread_op_fn == NULL); 295 296 if (new_thread_fn == NULL) { 297 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 298 } else { 299 g_new_thread_fn = new_thread_fn; 300 } 301 302 return _thread_lib_init(ctx_sz); 303 } 304 305 int 306 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 307 spdk_thread_op_supported_fn thread_op_supported_fn, 308 size_t ctx_sz) 309 { 310 assert(g_new_thread_fn == NULL); 311 assert(g_thread_op_fn == NULL); 312 assert(g_thread_op_supported_fn == NULL); 313 314 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 315 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 316 return -EINVAL; 317 } 318 319 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 320 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 321 } else { 322 g_thread_op_fn = thread_op_fn; 323 g_thread_op_supported_fn = thread_op_supported_fn; 324 } 325 326 return _thread_lib_init(ctx_sz); 327 } 328 329 void 330 spdk_thread_lib_fini(void) 331 { 332 struct io_device *dev; 333 334 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 335 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 336 } 337 338 if (g_spdk_msg_mempool) { 339 spdk_mempool_free(g_spdk_msg_mempool); 340 g_spdk_msg_mempool = NULL; 341 } 342 343 g_new_thread_fn = NULL; 344 g_thread_op_fn = NULL; 345 g_thread_op_supported_fn = NULL; 346 g_ctx_sz = 0; 347 } 348 349 static void thread_interrupt_destroy(struct spdk_thread *thread); 350 static int thread_interrupt_create(struct spdk_thread *thread); 351 352 static void 353 _free_thread(struct spdk_thread *thread) 354 { 355 struct spdk_io_channel *ch; 356 struct spdk_msg *msg; 357 struct spdk_poller *poller, *ptmp; 358 359 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 360 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 361 thread->name, ch->dev->name); 362 } 363 364 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 365 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 366 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 367 poller->name); 368 } 369 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 370 free(poller); 371 } 372 373 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 374 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 375 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 376 poller->name); 377 } 378 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 379 free(poller); 380 } 381 382 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 383 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 384 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 385 free(poller); 386 } 387 388 pthread_mutex_lock(&g_devlist_mutex); 389 assert(g_thread_count > 0); 390 g_thread_count--; 391 TAILQ_REMOVE(&g_threads, thread, tailq); 392 pthread_mutex_unlock(&g_devlist_mutex); 393 394 msg = SLIST_FIRST(&thread->msg_cache); 395 while (msg != NULL) { 396 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 397 398 assert(thread->msg_cache_count > 0); 399 thread->msg_cache_count--; 400 spdk_mempool_put(g_spdk_msg_mempool, msg); 401 402 msg = SLIST_FIRST(&thread->msg_cache); 403 } 404 405 assert(thread->msg_cache_count == 0); 406 407 if (spdk_interrupt_mode_is_enabled()) { 408 thread_interrupt_destroy(thread); 409 } 410 411 spdk_ring_free(thread->messages); 412 free(thread); 413 } 414 415 struct spdk_thread * 416 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 417 { 418 struct spdk_thread *thread; 419 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 420 int rc = 0, i; 421 422 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 423 if (!thread) { 424 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 425 return NULL; 426 } 427 428 if (cpumask) { 429 spdk_cpuset_copy(&thread->cpumask, cpumask); 430 } else { 431 spdk_cpuset_negate(&thread->cpumask); 432 } 433 434 RB_INIT(&thread->io_channels); 435 TAILQ_INIT(&thread->active_pollers); 436 RB_INIT(&thread->timed_pollers); 437 TAILQ_INIT(&thread->paused_pollers); 438 SLIST_INIT(&thread->msg_cache); 439 thread->msg_cache_count = 0; 440 441 thread->tsc_last = spdk_get_ticks(); 442 443 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 444 * ID exceeds UINT64_MAX a warning message is logged 445 */ 446 thread->next_poller_id = 1; 447 448 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 449 if (!thread->messages) { 450 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 451 free(thread); 452 return NULL; 453 } 454 455 /* Fill the local message pool cache. */ 456 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 457 if (rc == 0) { 458 /* If we can't populate the cache it's ok. The cache will get filled 459 * up organically as messages are passed to the thread. */ 460 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 461 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 462 thread->msg_cache_count++; 463 } 464 } 465 466 if (name) { 467 snprintf(thread->name, sizeof(thread->name), "%s", name); 468 } else { 469 snprintf(thread->name, sizeof(thread->name), "%p", thread); 470 } 471 472 pthread_mutex_lock(&g_devlist_mutex); 473 if (g_thread_id == 0) { 474 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 475 pthread_mutex_unlock(&g_devlist_mutex); 476 _free_thread(thread); 477 return NULL; 478 } 479 thread->id = g_thread_id++; 480 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 481 g_thread_count++; 482 pthread_mutex_unlock(&g_devlist_mutex); 483 484 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 485 thread->id, thread->name); 486 487 if (spdk_interrupt_mode_is_enabled()) { 488 thread->in_interrupt = true; 489 rc = thread_interrupt_create(thread); 490 if (rc != 0) { 491 _free_thread(thread); 492 return NULL; 493 } 494 } 495 496 if (g_new_thread_fn) { 497 rc = g_new_thread_fn(thread); 498 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 499 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 500 } 501 502 if (rc != 0) { 503 _free_thread(thread); 504 return NULL; 505 } 506 507 thread->state = SPDK_THREAD_STATE_RUNNING; 508 509 return thread; 510 } 511 512 void 513 spdk_set_thread(struct spdk_thread *thread) 514 { 515 tls_thread = thread; 516 } 517 518 static void 519 thread_exit(struct spdk_thread *thread, uint64_t now) 520 { 521 struct spdk_poller *poller; 522 struct spdk_io_channel *ch; 523 524 if (now >= thread->exit_timeout_tsc) { 525 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 526 thread->name); 527 goto exited; 528 } 529 530 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 531 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 532 SPDK_INFOLOG(thread, 533 "thread %s still has active poller %s\n", 534 thread->name, poller->name); 535 return; 536 } 537 } 538 539 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 540 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 541 SPDK_INFOLOG(thread, 542 "thread %s still has active timed poller %s\n", 543 thread->name, poller->name); 544 return; 545 } 546 } 547 548 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 549 SPDK_INFOLOG(thread, 550 "thread %s still has paused poller %s\n", 551 thread->name, poller->name); 552 return; 553 } 554 555 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 556 SPDK_INFOLOG(thread, 557 "thread %s still has channel for io_device %s\n", 558 thread->name, ch->dev->name); 559 return; 560 } 561 562 if (thread->pending_unregister_count > 0) { 563 SPDK_INFOLOG(thread, 564 "thread %s is still unregistering io_devices\n", 565 thread->name); 566 return; 567 } 568 569 exited: 570 thread->state = SPDK_THREAD_STATE_EXITED; 571 } 572 573 int 574 spdk_thread_exit(struct spdk_thread *thread) 575 { 576 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 577 578 assert(tls_thread == thread); 579 580 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 581 SPDK_INFOLOG(thread, 582 "thread %s is already exiting\n", 583 thread->name); 584 return 0; 585 } 586 587 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 588 SPDK_THREAD_EXIT_TIMEOUT_SEC); 589 thread->state = SPDK_THREAD_STATE_EXITING; 590 return 0; 591 } 592 593 bool 594 spdk_thread_is_exited(struct spdk_thread *thread) 595 { 596 return thread->state == SPDK_THREAD_STATE_EXITED; 597 } 598 599 void 600 spdk_thread_destroy(struct spdk_thread *thread) 601 { 602 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 603 604 assert(thread->state == SPDK_THREAD_STATE_EXITED); 605 606 if (tls_thread == thread) { 607 tls_thread = NULL; 608 } 609 610 _free_thread(thread); 611 } 612 613 void * 614 spdk_thread_get_ctx(struct spdk_thread *thread) 615 { 616 if (g_ctx_sz > 0) { 617 return thread->ctx; 618 } 619 620 return NULL; 621 } 622 623 struct spdk_cpuset * 624 spdk_thread_get_cpumask(struct spdk_thread *thread) 625 { 626 return &thread->cpumask; 627 } 628 629 int 630 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 631 { 632 struct spdk_thread *thread; 633 634 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 635 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 636 assert(false); 637 return -ENOTSUP; 638 } 639 640 thread = spdk_get_thread(); 641 if (!thread) { 642 SPDK_ERRLOG("Called from non-SPDK thread\n"); 643 assert(false); 644 return -EINVAL; 645 } 646 647 spdk_cpuset_copy(&thread->cpumask, cpumask); 648 649 /* Invoke framework's reschedule operation. If this function is called multiple times 650 * in a single spdk_thread_poll() context, the last cpumask will be used in the 651 * reschedule operation. 652 */ 653 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 654 655 return 0; 656 } 657 658 struct spdk_thread * 659 spdk_thread_get_from_ctx(void *ctx) 660 { 661 if (ctx == NULL) { 662 assert(false); 663 return NULL; 664 } 665 666 assert(g_ctx_sz > 0); 667 668 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 669 } 670 671 static inline uint32_t 672 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 673 { 674 unsigned count, i; 675 void *messages[SPDK_MSG_BATCH_SIZE]; 676 uint64_t notify = 1; 677 int rc; 678 679 #ifdef DEBUG 680 /* 681 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 682 * so we will never actually read uninitialized data from events, but just to be sure 683 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 684 */ 685 memset(messages, 0, sizeof(messages)); 686 #endif 687 688 if (max_msgs > 0) { 689 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 690 } else { 691 max_msgs = SPDK_MSG_BATCH_SIZE; 692 } 693 694 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 695 if (spdk_unlikely(thread->in_interrupt) && 696 spdk_ring_count(thread->messages) != 0) { 697 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 698 if (rc < 0) { 699 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 700 } 701 } 702 if (count == 0) { 703 return 0; 704 } 705 706 for (i = 0; i < count; i++) { 707 struct spdk_msg *msg = messages[i]; 708 709 assert(msg != NULL); 710 msg->fn(msg->arg); 711 712 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 713 /* Insert the messages at the head. We want to re-use the hot 714 * ones. */ 715 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 716 thread->msg_cache_count++; 717 } else { 718 spdk_mempool_put(g_spdk_msg_mempool, msg); 719 } 720 } 721 722 return count; 723 } 724 725 static void 726 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 727 { 728 struct spdk_poller *tmp __attribute__((unused)); 729 730 poller->next_run_tick = now + poller->period_ticks; 731 732 /* 733 * Insert poller in the thread's timed_pollers tree by next scheduled run time 734 * as its key. 735 */ 736 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 737 assert(tmp == NULL); 738 739 /* Update the cache only if it is empty or the inserted poller is earlier than it. 740 * RB_MIN() is not necessary here because all pollers, which has exactly the same 741 * next_run_tick as the existing poller, are inserted on the right side. 742 */ 743 if (thread->first_timed_poller == NULL || 744 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 745 thread->first_timed_poller = poller; 746 } 747 } 748 749 static inline void 750 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 751 { 752 struct spdk_poller *tmp __attribute__((unused)); 753 754 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 755 assert(tmp != NULL); 756 757 /* This function is not used in any case that is performance critical. 758 * Update the cache simply by RB_MIN() if it needs to be changed. 759 */ 760 if (thread->first_timed_poller == poller) { 761 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 762 } 763 } 764 765 static void 766 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 767 { 768 if (poller->period_ticks) { 769 poller_insert_timer(thread, poller, spdk_get_ticks()); 770 } else { 771 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 772 } 773 } 774 775 static inline void 776 thread_update_stats(struct spdk_thread *thread, uint64_t end, 777 uint64_t start, int rc) 778 { 779 if (rc == 0) { 780 /* Poller status idle */ 781 thread->stats.idle_tsc += end - start; 782 } else if (rc > 0) { 783 /* Poller status busy */ 784 thread->stats.busy_tsc += end - start; 785 } 786 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 787 thread->tsc_last = end; 788 } 789 790 static inline int 791 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 792 { 793 int rc; 794 795 switch (poller->state) { 796 case SPDK_POLLER_STATE_UNREGISTERED: 797 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 798 free(poller); 799 return 0; 800 case SPDK_POLLER_STATE_PAUSING: 801 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 802 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 803 poller->state = SPDK_POLLER_STATE_PAUSED; 804 return 0; 805 case SPDK_POLLER_STATE_WAITING: 806 break; 807 default: 808 assert(false); 809 break; 810 } 811 812 poller->state = SPDK_POLLER_STATE_RUNNING; 813 rc = poller->fn(poller->arg); 814 815 poller->run_count++; 816 if (rc > 0) { 817 poller->busy_count++; 818 } 819 820 #ifdef DEBUG 821 if (rc == -1) { 822 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 823 } 824 #endif 825 826 switch (poller->state) { 827 case SPDK_POLLER_STATE_UNREGISTERED: 828 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 829 free(poller); 830 break; 831 case SPDK_POLLER_STATE_PAUSING: 832 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 833 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 834 poller->state = SPDK_POLLER_STATE_PAUSED; 835 break; 836 case SPDK_POLLER_STATE_PAUSED: 837 case SPDK_POLLER_STATE_WAITING: 838 break; 839 case SPDK_POLLER_STATE_RUNNING: 840 poller->state = SPDK_POLLER_STATE_WAITING; 841 break; 842 default: 843 assert(false); 844 break; 845 } 846 847 return rc; 848 } 849 850 static inline int 851 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 852 uint64_t now) 853 { 854 int rc; 855 856 switch (poller->state) { 857 case SPDK_POLLER_STATE_UNREGISTERED: 858 free(poller); 859 return 0; 860 case SPDK_POLLER_STATE_PAUSING: 861 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 862 poller->state = SPDK_POLLER_STATE_PAUSED; 863 return 0; 864 case SPDK_POLLER_STATE_WAITING: 865 break; 866 default: 867 assert(false); 868 break; 869 } 870 871 poller->state = SPDK_POLLER_STATE_RUNNING; 872 rc = poller->fn(poller->arg); 873 874 poller->run_count++; 875 if (rc > 0) { 876 poller->busy_count++; 877 } 878 879 #ifdef DEBUG 880 if (rc == -1) { 881 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 882 } 883 #endif 884 885 switch (poller->state) { 886 case SPDK_POLLER_STATE_UNREGISTERED: 887 free(poller); 888 break; 889 case SPDK_POLLER_STATE_PAUSING: 890 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 891 poller->state = SPDK_POLLER_STATE_PAUSED; 892 break; 893 case SPDK_POLLER_STATE_PAUSED: 894 break; 895 case SPDK_POLLER_STATE_RUNNING: 896 poller->state = SPDK_POLLER_STATE_WAITING; 897 /* fallthrough */ 898 case SPDK_POLLER_STATE_WAITING: 899 poller_insert_timer(thread, poller, now); 900 break; 901 default: 902 assert(false); 903 break; 904 } 905 906 return rc; 907 } 908 909 static int 910 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 911 { 912 uint32_t msg_count; 913 struct spdk_poller *poller, *tmp; 914 spdk_msg_fn critical_msg; 915 int rc = 0; 916 917 thread->tsc_last = now; 918 919 critical_msg = thread->critical_msg; 920 if (spdk_unlikely(critical_msg != NULL)) { 921 critical_msg(NULL); 922 thread->critical_msg = NULL; 923 rc = 1; 924 } 925 926 msg_count = msg_queue_run_batch(thread, max_msgs); 927 if (msg_count) { 928 rc = 1; 929 } 930 931 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 932 active_pollers_head, tailq, tmp) { 933 int poller_rc; 934 935 poller_rc = thread_execute_poller(thread, poller); 936 if (poller_rc > rc) { 937 rc = poller_rc; 938 } 939 } 940 941 poller = thread->first_timed_poller; 942 while (poller != NULL) { 943 int timer_rc = 0; 944 945 if (now < poller->next_run_tick) { 946 break; 947 } 948 949 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 950 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 951 952 /* Update the cache to the next timed poller in the list 953 * only if the current poller is still the closest, otherwise, 954 * do nothing because the cache has been already updated. 955 */ 956 if (thread->first_timed_poller == poller) { 957 thread->first_timed_poller = tmp; 958 } 959 960 timer_rc = thread_execute_timed_poller(thread, poller, now); 961 if (timer_rc > rc) { 962 rc = timer_rc; 963 } 964 965 poller = tmp; 966 } 967 968 return rc; 969 } 970 971 int 972 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 973 { 974 struct spdk_thread *orig_thread; 975 int rc; 976 uint64_t notify = 1; 977 978 orig_thread = _get_thread(); 979 tls_thread = thread; 980 981 if (now == 0) { 982 now = spdk_get_ticks(); 983 } 984 985 if (spdk_likely(!thread->in_interrupt)) { 986 rc = thread_poll(thread, max_msgs, now); 987 if (spdk_unlikely(thread->in_interrupt)) { 988 /* The thread transitioned to interrupt mode during the above poll. 989 * Poll it one more time in case that during the transition time 990 * there is msg received without notification. 991 */ 992 rc = thread_poll(thread, max_msgs, now); 993 } 994 } else { 995 /* Non-block wait on thread's fd_group */ 996 rc = spdk_fd_group_wait(thread->fgrp, 0); 997 if (spdk_unlikely(!thread->in_interrupt)) { 998 /* The thread transitioned to poll mode in a msg during the above processing. 999 * Clear msg_fd since thread messages will be polled directly in poll mode. 1000 */ 1001 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 1002 if (rc < 0 && errno != EAGAIN) { 1003 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 1004 } 1005 } 1006 1007 /* Reap unregistered pollers out of poller execution in intr mode */ 1008 if (spdk_unlikely(thread->poller_unregistered)) { 1009 struct spdk_poller *poller, *tmp; 1010 1011 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1012 active_pollers_head, tailq, tmp) { 1013 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1014 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1015 free(poller); 1016 } 1017 } 1018 1019 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1020 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1021 poller_remove_timer(thread, poller); 1022 free(poller); 1023 } 1024 } 1025 1026 thread->poller_unregistered = false; 1027 } 1028 } 1029 1030 1031 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1032 thread_exit(thread, now); 1033 } 1034 1035 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1036 1037 tls_thread = orig_thread; 1038 1039 return rc; 1040 } 1041 1042 uint64_t 1043 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1044 { 1045 struct spdk_poller *poller; 1046 1047 poller = thread->first_timed_poller; 1048 if (poller) { 1049 return poller->next_run_tick; 1050 } 1051 1052 return 0; 1053 } 1054 1055 int 1056 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1057 { 1058 return !TAILQ_EMPTY(&thread->active_pollers); 1059 } 1060 1061 static bool 1062 thread_has_unpaused_pollers(struct spdk_thread *thread) 1063 { 1064 if (TAILQ_EMPTY(&thread->active_pollers) && 1065 RB_EMPTY(&thread->timed_pollers)) { 1066 return false; 1067 } 1068 1069 return true; 1070 } 1071 1072 bool 1073 spdk_thread_has_pollers(struct spdk_thread *thread) 1074 { 1075 if (!thread_has_unpaused_pollers(thread) && 1076 TAILQ_EMPTY(&thread->paused_pollers)) { 1077 return false; 1078 } 1079 1080 return true; 1081 } 1082 1083 bool 1084 spdk_thread_is_idle(struct spdk_thread *thread) 1085 { 1086 if (spdk_ring_count(thread->messages) || 1087 thread_has_unpaused_pollers(thread) || 1088 thread->critical_msg != NULL) { 1089 return false; 1090 } 1091 1092 return true; 1093 } 1094 1095 uint32_t 1096 spdk_thread_get_count(void) 1097 { 1098 /* 1099 * Return cached value of the current thread count. We could acquire the 1100 * lock and iterate through the TAILQ of threads to count them, but that 1101 * count could still be invalidated after we release the lock. 1102 */ 1103 return g_thread_count; 1104 } 1105 1106 struct spdk_thread * 1107 spdk_get_thread(void) 1108 { 1109 return _get_thread(); 1110 } 1111 1112 const char * 1113 spdk_thread_get_name(const struct spdk_thread *thread) 1114 { 1115 return thread->name; 1116 } 1117 1118 uint64_t 1119 spdk_thread_get_id(const struct spdk_thread *thread) 1120 { 1121 return thread->id; 1122 } 1123 1124 struct spdk_thread * 1125 spdk_thread_get_by_id(uint64_t id) 1126 { 1127 struct spdk_thread *thread; 1128 1129 if (id == 0 || id >= g_thread_id) { 1130 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1131 return NULL; 1132 } 1133 pthread_mutex_lock(&g_devlist_mutex); 1134 TAILQ_FOREACH(thread, &g_threads, tailq) { 1135 if (thread->id == id) { 1136 break; 1137 } 1138 } 1139 pthread_mutex_unlock(&g_devlist_mutex); 1140 return thread; 1141 } 1142 1143 int 1144 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1145 { 1146 struct spdk_thread *thread; 1147 1148 thread = _get_thread(); 1149 if (!thread) { 1150 SPDK_ERRLOG("No thread allocated\n"); 1151 return -EINVAL; 1152 } 1153 1154 if (stats == NULL) { 1155 return -EINVAL; 1156 } 1157 1158 *stats = thread->stats; 1159 1160 return 0; 1161 } 1162 1163 uint64_t 1164 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1165 { 1166 if (thread == NULL) { 1167 thread = _get_thread(); 1168 } 1169 1170 return thread->tsc_last; 1171 } 1172 1173 static inline int 1174 thread_send_msg_notification(const struct spdk_thread *target_thread) 1175 { 1176 uint64_t notify = 1; 1177 int rc; 1178 1179 /* Not necessary to do notification if interrupt facility is not enabled */ 1180 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1181 return 0; 1182 } 1183 1184 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1185 * after sending thread msg, it is necessary to check whether target thread runs in 1186 * interrupt mode and then decide whether do event notification. 1187 */ 1188 if (spdk_unlikely(target_thread->in_interrupt)) { 1189 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1190 if (rc < 0) { 1191 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1192 return -EIO; 1193 } 1194 } 1195 1196 return 0; 1197 } 1198 1199 int 1200 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1201 { 1202 struct spdk_thread *local_thread; 1203 struct spdk_msg *msg; 1204 int rc; 1205 1206 assert(thread != NULL); 1207 1208 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1209 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1210 return -EIO; 1211 } 1212 1213 local_thread = _get_thread(); 1214 1215 msg = NULL; 1216 if (local_thread != NULL) { 1217 if (local_thread->msg_cache_count > 0) { 1218 msg = SLIST_FIRST(&local_thread->msg_cache); 1219 assert(msg != NULL); 1220 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1221 local_thread->msg_cache_count--; 1222 } 1223 } 1224 1225 if (msg == NULL) { 1226 msg = spdk_mempool_get(g_spdk_msg_mempool); 1227 if (!msg) { 1228 SPDK_ERRLOG("msg could not be allocated\n"); 1229 return -ENOMEM; 1230 } 1231 } 1232 1233 msg->fn = fn; 1234 msg->arg = ctx; 1235 1236 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1237 if (rc != 1) { 1238 SPDK_ERRLOG("msg could not be enqueued\n"); 1239 spdk_mempool_put(g_spdk_msg_mempool, msg); 1240 return -EIO; 1241 } 1242 1243 return thread_send_msg_notification(thread); 1244 } 1245 1246 int 1247 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1248 { 1249 spdk_msg_fn expected = NULL; 1250 1251 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1252 __ATOMIC_SEQ_CST)) { 1253 return -EIO; 1254 } 1255 1256 return thread_send_msg_notification(thread); 1257 } 1258 1259 #ifdef __linux__ 1260 static int 1261 interrupt_timerfd_process(void *arg) 1262 { 1263 struct spdk_poller *poller = arg; 1264 uint64_t exp; 1265 int rc; 1266 1267 /* clear the level of interval timer */ 1268 rc = read(poller->interruptfd, &exp, sizeof(exp)); 1269 if (rc < 0) { 1270 if (rc == -EAGAIN) { 1271 return 0; 1272 } 1273 1274 return rc; 1275 } 1276 1277 return poller->fn(poller->arg); 1278 } 1279 1280 static int 1281 period_poller_interrupt_init(struct spdk_poller *poller) 1282 { 1283 struct spdk_fd_group *fgrp = poller->thread->fgrp; 1284 int timerfd; 1285 int rc; 1286 1287 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1288 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1289 if (timerfd < 0) { 1290 return -errno; 1291 } 1292 1293 rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller); 1294 if (rc < 0) { 1295 close(timerfd); 1296 return rc; 1297 } 1298 1299 poller->interruptfd = timerfd; 1300 return 0; 1301 } 1302 1303 static void 1304 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1305 { 1306 int timerfd = poller->interruptfd; 1307 uint64_t now_tick = spdk_get_ticks(); 1308 uint64_t ticks = spdk_get_ticks_hz(); 1309 int ret; 1310 struct itimerspec new_tv = {}; 1311 struct itimerspec old_tv = {}; 1312 1313 assert(poller->period_ticks != 0); 1314 assert(timerfd >= 0); 1315 1316 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1317 interrupt_mode ? "interrupt" : "poll"); 1318 1319 if (interrupt_mode) { 1320 /* Set repeated timer expiration */ 1321 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1322 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1323 1324 /* Update next timer expiration */ 1325 if (poller->next_run_tick == 0) { 1326 poller->next_run_tick = now_tick + poller->period_ticks; 1327 } else if (poller->next_run_tick < now_tick) { 1328 poller->next_run_tick = now_tick; 1329 } 1330 1331 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1332 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1333 1334 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1335 if (ret < 0) { 1336 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1337 assert(false); 1338 } 1339 } else { 1340 /* Disarm the timer */ 1341 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1342 if (ret < 0) { 1343 /* timerfd_settime's failure indicates that the timerfd is in error */ 1344 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1345 assert(false); 1346 } 1347 1348 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1349 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1350 */ 1351 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1352 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1353 poller_remove_timer(poller->thread, poller); 1354 poller_insert_timer(poller->thread, poller, now_tick); 1355 } 1356 } 1357 1358 static void 1359 poller_interrupt_fini(struct spdk_poller *poller) 1360 { 1361 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1362 assert(poller->interruptfd >= 0); 1363 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1364 close(poller->interruptfd); 1365 poller->interruptfd = -1; 1366 } 1367 1368 static int 1369 busy_poller_interrupt_init(struct spdk_poller *poller) 1370 { 1371 int busy_efd; 1372 int rc; 1373 1374 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1375 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1376 if (busy_efd < 0) { 1377 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1378 return -errno; 1379 } 1380 1381 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, 1382 poller->fn, poller->arg, poller->name); 1383 if (rc < 0) { 1384 close(busy_efd); 1385 return rc; 1386 } 1387 1388 poller->interruptfd = busy_efd; 1389 return 0; 1390 } 1391 1392 static void 1393 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1394 { 1395 int busy_efd = poller->interruptfd; 1396 uint64_t notify = 1; 1397 int rc __attribute__((unused)); 1398 1399 assert(busy_efd >= 0); 1400 1401 if (interrupt_mode) { 1402 /* Write without read on eventfd will get it repeatedly triggered. */ 1403 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1404 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1405 } 1406 } else { 1407 /* Read on eventfd will clear its level triggering. */ 1408 rc = read(busy_efd, ¬ify, sizeof(notify)); 1409 } 1410 } 1411 1412 #else 1413 1414 static int 1415 period_poller_interrupt_init(struct spdk_poller *poller) 1416 { 1417 return -ENOTSUP; 1418 } 1419 1420 static void 1421 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1422 { 1423 } 1424 1425 static void 1426 poller_interrupt_fini(struct spdk_poller *poller) 1427 { 1428 } 1429 1430 static int 1431 busy_poller_interrupt_init(struct spdk_poller *poller) 1432 { 1433 return -ENOTSUP; 1434 } 1435 1436 static void 1437 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1438 { 1439 } 1440 1441 #endif 1442 1443 void 1444 spdk_poller_register_interrupt(struct spdk_poller *poller, 1445 spdk_poller_set_interrupt_mode_cb cb_fn, 1446 void *cb_arg) 1447 { 1448 assert(poller != NULL); 1449 assert(cb_fn != NULL); 1450 assert(spdk_get_thread() == poller->thread); 1451 1452 if (!spdk_interrupt_mode_is_enabled()) { 1453 return; 1454 } 1455 1456 /* when a poller is created we don't know if the user is ever going to 1457 * enable interrupts on it by calling this function, so the poller 1458 * registration function has to immediately create a interruptfd. 1459 * When this function does get called by user, we have to then destroy 1460 * that interruptfd. 1461 */ 1462 if (poller->set_intr_cb_fn && poller->interruptfd >= 0) { 1463 poller_interrupt_fini(poller); 1464 } 1465 1466 poller->set_intr_cb_fn = cb_fn; 1467 poller->set_intr_cb_arg = cb_arg; 1468 1469 /* Set poller into interrupt mode if thread is in interrupt. */ 1470 if (poller->thread->in_interrupt) { 1471 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1472 } 1473 } 1474 1475 static uint64_t 1476 convert_us_to_ticks(uint64_t us) 1477 { 1478 uint64_t quotient, remainder, ticks; 1479 1480 if (us) { 1481 quotient = us / SPDK_SEC_TO_USEC; 1482 remainder = us % SPDK_SEC_TO_USEC; 1483 ticks = spdk_get_ticks_hz(); 1484 1485 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1486 } else { 1487 return 0; 1488 } 1489 } 1490 1491 static struct spdk_poller * 1492 poller_register(spdk_poller_fn fn, 1493 void *arg, 1494 uint64_t period_microseconds, 1495 const char *name) 1496 { 1497 struct spdk_thread *thread; 1498 struct spdk_poller *poller; 1499 1500 thread = spdk_get_thread(); 1501 if (!thread) { 1502 assert(false); 1503 return NULL; 1504 } 1505 1506 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1507 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1508 return NULL; 1509 } 1510 1511 poller = calloc(1, sizeof(*poller)); 1512 if (poller == NULL) { 1513 SPDK_ERRLOG("Poller memory allocation failed\n"); 1514 return NULL; 1515 } 1516 1517 if (name) { 1518 snprintf(poller->name, sizeof(poller->name), "%s", name); 1519 } else { 1520 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1521 } 1522 1523 poller->state = SPDK_POLLER_STATE_WAITING; 1524 poller->fn = fn; 1525 poller->arg = arg; 1526 poller->thread = thread; 1527 poller->interruptfd = -1; 1528 if (thread->next_poller_id == 0) { 1529 SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n"); 1530 thread->next_poller_id = 1; 1531 } 1532 poller->id = thread->next_poller_id++; 1533 1534 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1535 1536 if (spdk_interrupt_mode_is_enabled()) { 1537 int rc; 1538 1539 if (period_microseconds) { 1540 rc = period_poller_interrupt_init(poller); 1541 if (rc < 0) { 1542 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1543 free(poller); 1544 return NULL; 1545 } 1546 1547 spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL); 1548 } else { 1549 /* If the poller doesn't have a period, create interruptfd that's always 1550 * busy automatically when running in interrupt mode. 1551 */ 1552 rc = busy_poller_interrupt_init(poller); 1553 if (rc > 0) { 1554 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1555 free(poller); 1556 return NULL; 1557 } 1558 1559 spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL); 1560 } 1561 } 1562 1563 thread_insert_poller(thread, poller); 1564 1565 return poller; 1566 } 1567 1568 struct spdk_poller * 1569 spdk_poller_register(spdk_poller_fn fn, 1570 void *arg, 1571 uint64_t period_microseconds) 1572 { 1573 return poller_register(fn, arg, period_microseconds, NULL); 1574 } 1575 1576 struct spdk_poller * 1577 spdk_poller_register_named(spdk_poller_fn fn, 1578 void *arg, 1579 uint64_t period_microseconds, 1580 const char *name) 1581 { 1582 return poller_register(fn, arg, period_microseconds, name); 1583 } 1584 1585 static void 1586 wrong_thread(const char *func, const char *name, struct spdk_thread *thread, 1587 struct spdk_thread *curthread) 1588 { 1589 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be " 1590 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id, 1591 thread->name, thread->id); 1592 assert(false); 1593 } 1594 1595 void 1596 spdk_poller_unregister(struct spdk_poller **ppoller) 1597 { 1598 struct spdk_thread *thread; 1599 struct spdk_poller *poller; 1600 1601 poller = *ppoller; 1602 if (poller == NULL) { 1603 return; 1604 } 1605 1606 *ppoller = NULL; 1607 1608 thread = spdk_get_thread(); 1609 if (!thread) { 1610 assert(false); 1611 return; 1612 } 1613 1614 if (poller->thread != thread) { 1615 wrong_thread(__func__, poller->name, poller->thread, thread); 1616 return; 1617 } 1618 1619 if (spdk_interrupt_mode_is_enabled()) { 1620 /* Release the interrupt resource for period or busy poller */ 1621 if (poller->interruptfd >= 0) { 1622 poller_interrupt_fini(poller); 1623 } 1624 1625 /* Mark there is poller unregistered. Then unregistered pollers will 1626 * get reaped by spdk_thread_poll also in intr mode. 1627 */ 1628 thread->poller_unregistered = true; 1629 } 1630 1631 /* If the poller was paused, put it on the active_pollers list so that 1632 * its unregistration can be processed by spdk_thread_poll(). 1633 */ 1634 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1635 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1636 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1637 poller->period_ticks = 0; 1638 } 1639 1640 /* Simply set the state to unregistered. The poller will get cleaned up 1641 * in a subsequent call to spdk_thread_poll(). 1642 */ 1643 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1644 } 1645 1646 void 1647 spdk_poller_pause(struct spdk_poller *poller) 1648 { 1649 struct spdk_thread *thread; 1650 1651 thread = spdk_get_thread(); 1652 if (!thread) { 1653 assert(false); 1654 return; 1655 } 1656 1657 if (poller->thread != thread) { 1658 wrong_thread(__func__, poller->name, poller->thread, thread); 1659 return; 1660 } 1661 1662 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1663 * spdk_thread_poll() move it. It allows a poller to be paused from 1664 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1665 * iteration, or from within itself without breaking the logic to always 1666 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1667 */ 1668 switch (poller->state) { 1669 case SPDK_POLLER_STATE_PAUSED: 1670 case SPDK_POLLER_STATE_PAUSING: 1671 break; 1672 case SPDK_POLLER_STATE_RUNNING: 1673 case SPDK_POLLER_STATE_WAITING: 1674 poller->state = SPDK_POLLER_STATE_PAUSING; 1675 break; 1676 default: 1677 assert(false); 1678 break; 1679 } 1680 } 1681 1682 void 1683 spdk_poller_resume(struct spdk_poller *poller) 1684 { 1685 struct spdk_thread *thread; 1686 1687 thread = spdk_get_thread(); 1688 if (!thread) { 1689 assert(false); 1690 return; 1691 } 1692 1693 if (poller->thread != thread) { 1694 wrong_thread(__func__, poller->name, poller->thread, thread); 1695 return; 1696 } 1697 1698 /* If a poller is paused it has to be removed from the paused pollers 1699 * list and put on the active list or timer tree depending on its 1700 * period_ticks. If a poller is still in the process of being paused, 1701 * we just need to flip its state back to waiting, as it's already on 1702 * the appropriate list or tree. 1703 */ 1704 switch (poller->state) { 1705 case SPDK_POLLER_STATE_PAUSED: 1706 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1707 thread_insert_poller(thread, poller); 1708 /* fallthrough */ 1709 case SPDK_POLLER_STATE_PAUSING: 1710 poller->state = SPDK_POLLER_STATE_WAITING; 1711 break; 1712 case SPDK_POLLER_STATE_RUNNING: 1713 case SPDK_POLLER_STATE_WAITING: 1714 break; 1715 default: 1716 assert(false); 1717 break; 1718 } 1719 } 1720 1721 const char * 1722 spdk_poller_get_name(struct spdk_poller *poller) 1723 { 1724 return poller->name; 1725 } 1726 1727 uint64_t 1728 spdk_poller_get_id(struct spdk_poller *poller) 1729 { 1730 return poller->id; 1731 } 1732 1733 const char * 1734 spdk_poller_get_state_str(struct spdk_poller *poller) 1735 { 1736 switch (poller->state) { 1737 case SPDK_POLLER_STATE_WAITING: 1738 return "waiting"; 1739 case SPDK_POLLER_STATE_RUNNING: 1740 return "running"; 1741 case SPDK_POLLER_STATE_UNREGISTERED: 1742 return "unregistered"; 1743 case SPDK_POLLER_STATE_PAUSING: 1744 return "pausing"; 1745 case SPDK_POLLER_STATE_PAUSED: 1746 return "paused"; 1747 default: 1748 return NULL; 1749 } 1750 } 1751 1752 uint64_t 1753 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1754 { 1755 return poller->period_ticks; 1756 } 1757 1758 void 1759 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1760 { 1761 stats->run_count = poller->run_count; 1762 stats->busy_count = poller->busy_count; 1763 } 1764 1765 struct spdk_poller * 1766 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1767 { 1768 return TAILQ_FIRST(&thread->active_pollers); 1769 } 1770 1771 struct spdk_poller * 1772 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1773 { 1774 return TAILQ_NEXT(prev, tailq); 1775 } 1776 1777 struct spdk_poller * 1778 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1779 { 1780 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1781 } 1782 1783 struct spdk_poller * 1784 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1785 { 1786 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1787 } 1788 1789 struct spdk_poller * 1790 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1791 { 1792 return TAILQ_FIRST(&thread->paused_pollers); 1793 } 1794 1795 struct spdk_poller * 1796 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1797 { 1798 return TAILQ_NEXT(prev, tailq); 1799 } 1800 1801 struct spdk_io_channel * 1802 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1803 { 1804 return RB_MIN(io_channel_tree, &thread->io_channels); 1805 } 1806 1807 struct spdk_io_channel * 1808 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1809 { 1810 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1811 } 1812 1813 struct call_thread { 1814 struct spdk_thread *cur_thread; 1815 spdk_msg_fn fn; 1816 void *ctx; 1817 1818 struct spdk_thread *orig_thread; 1819 spdk_msg_fn cpl; 1820 }; 1821 1822 static void 1823 _on_thread(void *ctx) 1824 { 1825 struct call_thread *ct = ctx; 1826 int rc __attribute__((unused)); 1827 1828 ct->fn(ct->ctx); 1829 1830 pthread_mutex_lock(&g_devlist_mutex); 1831 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1832 pthread_mutex_unlock(&g_devlist_mutex); 1833 1834 if (!ct->cur_thread) { 1835 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1836 1837 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1838 free(ctx); 1839 } else { 1840 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1841 ct->cur_thread->name); 1842 1843 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1844 } 1845 assert(rc == 0); 1846 } 1847 1848 void 1849 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1850 { 1851 struct call_thread *ct; 1852 struct spdk_thread *thread; 1853 int rc __attribute__((unused)); 1854 1855 ct = calloc(1, sizeof(*ct)); 1856 if (!ct) { 1857 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1858 cpl(ctx); 1859 return; 1860 } 1861 1862 ct->fn = fn; 1863 ct->ctx = ctx; 1864 ct->cpl = cpl; 1865 1866 thread = _get_thread(); 1867 if (!thread) { 1868 SPDK_ERRLOG("No thread allocated\n"); 1869 free(ct); 1870 cpl(ctx); 1871 return; 1872 } 1873 ct->orig_thread = thread; 1874 1875 pthread_mutex_lock(&g_devlist_mutex); 1876 ct->cur_thread = TAILQ_FIRST(&g_threads); 1877 pthread_mutex_unlock(&g_devlist_mutex); 1878 1879 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 1880 ct->orig_thread->name); 1881 1882 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 1883 assert(rc == 0); 1884 } 1885 1886 static inline void 1887 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 1888 { 1889 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1890 return; 1891 } 1892 1893 if (!poller->set_intr_cb_fn) { 1894 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 1895 assert(false); 1896 return; 1897 } 1898 1899 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 1900 } 1901 1902 void 1903 spdk_thread_set_interrupt_mode(bool enable_interrupt) 1904 { 1905 struct spdk_thread *thread = _get_thread(); 1906 struct spdk_poller *poller, *tmp; 1907 1908 assert(thread); 1909 assert(spdk_interrupt_mode_is_enabled()); 1910 1911 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 1912 thread->name, enable_interrupt ? "intr" : "poll", 1913 thread->in_interrupt ? "intr" : "poll"); 1914 1915 if (thread->in_interrupt == enable_interrupt) { 1916 return; 1917 } 1918 1919 /* Set pollers to expected mode */ 1920 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1921 poller_set_interrupt_mode(poller, enable_interrupt); 1922 } 1923 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 1924 poller_set_interrupt_mode(poller, enable_interrupt); 1925 } 1926 /* All paused pollers will go to work in interrupt mode */ 1927 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 1928 poller_set_interrupt_mode(poller, enable_interrupt); 1929 } 1930 1931 thread->in_interrupt = enable_interrupt; 1932 return; 1933 } 1934 1935 static struct io_device * 1936 io_device_get(void *io_device) 1937 { 1938 struct io_device find = {}; 1939 1940 find.io_device = io_device; 1941 return RB_FIND(io_device_tree, &g_io_devices, &find); 1942 } 1943 1944 void 1945 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1946 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1947 const char *name) 1948 { 1949 struct io_device *dev, *tmp; 1950 struct spdk_thread *thread; 1951 1952 assert(io_device != NULL); 1953 assert(create_cb != NULL); 1954 assert(destroy_cb != NULL); 1955 1956 thread = spdk_get_thread(); 1957 if (!thread) { 1958 SPDK_ERRLOG("called from non-SPDK thread\n"); 1959 assert(false); 1960 return; 1961 } 1962 1963 dev = calloc(1, sizeof(struct io_device)); 1964 if (dev == NULL) { 1965 SPDK_ERRLOG("could not allocate io_device\n"); 1966 return; 1967 } 1968 1969 dev->io_device = io_device; 1970 if (name) { 1971 snprintf(dev->name, sizeof(dev->name), "%s", name); 1972 } else { 1973 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1974 } 1975 dev->create_cb = create_cb; 1976 dev->destroy_cb = destroy_cb; 1977 dev->unregister_cb = NULL; 1978 dev->ctx_size = ctx_size; 1979 dev->for_each_count = 0; 1980 dev->unregistered = false; 1981 dev->refcnt = 0; 1982 1983 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 1984 dev->name, dev->io_device, thread->name); 1985 1986 pthread_mutex_lock(&g_devlist_mutex); 1987 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 1988 if (tmp != NULL) { 1989 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1990 io_device, tmp->name, dev->name); 1991 pthread_mutex_unlock(&g_devlist_mutex); 1992 free(dev); 1993 } 1994 1995 pthread_mutex_unlock(&g_devlist_mutex); 1996 } 1997 1998 static void 1999 _finish_unregister(void *arg) 2000 { 2001 struct io_device *dev = arg; 2002 struct spdk_thread *thread; 2003 2004 thread = spdk_get_thread(); 2005 assert(thread == dev->unregister_thread); 2006 2007 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2008 dev->name, dev->io_device, thread->name); 2009 2010 assert(thread->pending_unregister_count > 0); 2011 thread->pending_unregister_count--; 2012 2013 dev->unregister_cb(dev->io_device); 2014 free(dev); 2015 } 2016 2017 static void 2018 io_device_free(struct io_device *dev) 2019 { 2020 int rc __attribute__((unused)); 2021 2022 if (dev->unregister_cb == NULL) { 2023 free(dev); 2024 } else { 2025 assert(dev->unregister_thread != NULL); 2026 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2027 dev->name, dev->io_device, dev->unregister_thread->name); 2028 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2029 assert(rc == 0); 2030 } 2031 } 2032 2033 void 2034 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2035 { 2036 struct io_device *dev; 2037 uint32_t refcnt; 2038 struct spdk_thread *thread; 2039 2040 thread = spdk_get_thread(); 2041 if (!thread) { 2042 SPDK_ERRLOG("called from non-SPDK thread\n"); 2043 assert(false); 2044 return; 2045 } 2046 2047 pthread_mutex_lock(&g_devlist_mutex); 2048 dev = io_device_get(io_device); 2049 if (!dev) { 2050 SPDK_ERRLOG("io_device %p not found\n", io_device); 2051 assert(false); 2052 pthread_mutex_unlock(&g_devlist_mutex); 2053 return; 2054 } 2055 2056 if (dev->for_each_count > 0) { 2057 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2058 dev->name, io_device, dev->for_each_count); 2059 pthread_mutex_unlock(&g_devlist_mutex); 2060 return; 2061 } 2062 2063 dev->unregister_cb = unregister_cb; 2064 dev->unregistered = true; 2065 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2066 refcnt = dev->refcnt; 2067 dev->unregister_thread = thread; 2068 pthread_mutex_unlock(&g_devlist_mutex); 2069 2070 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2071 dev->name, dev->io_device, thread->name); 2072 2073 if (unregister_cb) { 2074 thread->pending_unregister_count++; 2075 } 2076 2077 if (refcnt > 0) { 2078 /* defer deletion */ 2079 return; 2080 } 2081 2082 io_device_free(dev); 2083 } 2084 2085 const char * 2086 spdk_io_device_get_name(struct io_device *dev) 2087 { 2088 return dev->name; 2089 } 2090 2091 static struct spdk_io_channel * 2092 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2093 { 2094 struct spdk_io_channel find = {}; 2095 2096 find.dev = dev; 2097 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2098 } 2099 2100 struct spdk_io_channel * 2101 spdk_get_io_channel(void *io_device) 2102 { 2103 struct spdk_io_channel *ch; 2104 struct spdk_thread *thread; 2105 struct io_device *dev; 2106 int rc; 2107 2108 pthread_mutex_lock(&g_devlist_mutex); 2109 dev = io_device_get(io_device); 2110 if (dev == NULL) { 2111 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2112 pthread_mutex_unlock(&g_devlist_mutex); 2113 return NULL; 2114 } 2115 2116 thread = _get_thread(); 2117 if (!thread) { 2118 SPDK_ERRLOG("No thread allocated\n"); 2119 pthread_mutex_unlock(&g_devlist_mutex); 2120 return NULL; 2121 } 2122 2123 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2124 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2125 pthread_mutex_unlock(&g_devlist_mutex); 2126 return NULL; 2127 } 2128 2129 ch = thread_get_io_channel(thread, dev); 2130 if (ch != NULL) { 2131 ch->ref++; 2132 2133 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2134 ch, dev->name, dev->io_device, thread->name, ch->ref); 2135 2136 /* 2137 * An I/O channel already exists for this device on this 2138 * thread, so return it. 2139 */ 2140 pthread_mutex_unlock(&g_devlist_mutex); 2141 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2142 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2143 return ch; 2144 } 2145 2146 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2147 if (ch == NULL) { 2148 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2149 pthread_mutex_unlock(&g_devlist_mutex); 2150 return NULL; 2151 } 2152 2153 ch->dev = dev; 2154 ch->destroy_cb = dev->destroy_cb; 2155 ch->thread = thread; 2156 ch->ref = 1; 2157 ch->destroy_ref = 0; 2158 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2159 2160 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2161 ch, dev->name, dev->io_device, thread->name, ch->ref); 2162 2163 dev->refcnt++; 2164 2165 pthread_mutex_unlock(&g_devlist_mutex); 2166 2167 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2168 if (rc != 0) { 2169 pthread_mutex_lock(&g_devlist_mutex); 2170 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2171 dev->refcnt--; 2172 free(ch); 2173 pthread_mutex_unlock(&g_devlist_mutex); 2174 return NULL; 2175 } 2176 2177 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2178 return ch; 2179 } 2180 2181 static void 2182 put_io_channel(void *arg) 2183 { 2184 struct spdk_io_channel *ch = arg; 2185 bool do_remove_dev = true; 2186 struct spdk_thread *thread; 2187 2188 thread = spdk_get_thread(); 2189 if (!thread) { 2190 SPDK_ERRLOG("called from non-SPDK thread\n"); 2191 assert(false); 2192 return; 2193 } 2194 2195 SPDK_DEBUGLOG(thread, 2196 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2197 ch, ch->dev->name, ch->dev->io_device, thread->name); 2198 2199 assert(ch->thread == thread); 2200 2201 ch->destroy_ref--; 2202 2203 if (ch->ref > 0 || ch->destroy_ref > 0) { 2204 /* 2205 * Another reference to the associated io_device was requested 2206 * after this message was sent but before it had a chance to 2207 * execute. 2208 */ 2209 return; 2210 } 2211 2212 pthread_mutex_lock(&g_devlist_mutex); 2213 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2214 pthread_mutex_unlock(&g_devlist_mutex); 2215 2216 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2217 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2218 2219 pthread_mutex_lock(&g_devlist_mutex); 2220 ch->dev->refcnt--; 2221 2222 if (!ch->dev->unregistered) { 2223 do_remove_dev = false; 2224 } 2225 2226 if (ch->dev->refcnt > 0) { 2227 do_remove_dev = false; 2228 } 2229 2230 pthread_mutex_unlock(&g_devlist_mutex); 2231 2232 if (do_remove_dev) { 2233 io_device_free(ch->dev); 2234 } 2235 free(ch); 2236 } 2237 2238 void 2239 spdk_put_io_channel(struct spdk_io_channel *ch) 2240 { 2241 struct spdk_thread *thread; 2242 int rc __attribute__((unused)); 2243 2244 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2245 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2246 2247 thread = spdk_get_thread(); 2248 if (!thread) { 2249 SPDK_ERRLOG("called from non-SPDK thread\n"); 2250 assert(false); 2251 return; 2252 } 2253 2254 if (ch->thread != thread) { 2255 wrong_thread(__func__, "ch", ch->thread, thread); 2256 return; 2257 } 2258 2259 SPDK_DEBUGLOG(thread, 2260 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2261 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2262 2263 ch->ref--; 2264 2265 if (ch->ref == 0) { 2266 ch->destroy_ref++; 2267 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2268 assert(rc == 0); 2269 } 2270 } 2271 2272 struct spdk_io_channel * 2273 spdk_io_channel_from_ctx(void *ctx) 2274 { 2275 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2276 } 2277 2278 struct spdk_thread * 2279 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2280 { 2281 return ch->thread; 2282 } 2283 2284 void * 2285 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2286 { 2287 return ch->dev->io_device; 2288 } 2289 2290 const char * 2291 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2292 { 2293 return spdk_io_device_get_name(ch->dev); 2294 } 2295 2296 int 2297 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2298 { 2299 return ch->ref; 2300 } 2301 2302 struct spdk_io_channel_iter { 2303 void *io_device; 2304 struct io_device *dev; 2305 spdk_channel_msg fn; 2306 int status; 2307 void *ctx; 2308 struct spdk_io_channel *ch; 2309 2310 struct spdk_thread *cur_thread; 2311 2312 struct spdk_thread *orig_thread; 2313 spdk_channel_for_each_cpl cpl; 2314 }; 2315 2316 void * 2317 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2318 { 2319 return i->io_device; 2320 } 2321 2322 struct spdk_io_channel * 2323 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2324 { 2325 return i->ch; 2326 } 2327 2328 void * 2329 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2330 { 2331 return i->ctx; 2332 } 2333 2334 static void 2335 _call_completion(void *ctx) 2336 { 2337 struct spdk_io_channel_iter *i = ctx; 2338 2339 if (i->cpl != NULL) { 2340 i->cpl(i, i->status); 2341 } 2342 free(i); 2343 } 2344 2345 static void 2346 _call_channel(void *ctx) 2347 { 2348 struct spdk_io_channel_iter *i = ctx; 2349 struct spdk_io_channel *ch; 2350 2351 /* 2352 * It is possible that the channel was deleted before this 2353 * message had a chance to execute. If so, skip calling 2354 * the fn() on this thread. 2355 */ 2356 pthread_mutex_lock(&g_devlist_mutex); 2357 ch = thread_get_io_channel(i->cur_thread, i->dev); 2358 pthread_mutex_unlock(&g_devlist_mutex); 2359 2360 if (ch) { 2361 i->fn(i); 2362 } else { 2363 spdk_for_each_channel_continue(i, 0); 2364 } 2365 } 2366 2367 void 2368 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2369 spdk_channel_for_each_cpl cpl) 2370 { 2371 struct spdk_thread *thread; 2372 struct spdk_io_channel *ch; 2373 struct spdk_io_channel_iter *i; 2374 int rc __attribute__((unused)); 2375 2376 i = calloc(1, sizeof(*i)); 2377 if (!i) { 2378 SPDK_ERRLOG("Unable to allocate iterator\n"); 2379 return; 2380 } 2381 2382 i->io_device = io_device; 2383 i->fn = fn; 2384 i->ctx = ctx; 2385 i->cpl = cpl; 2386 i->orig_thread = _get_thread(); 2387 2388 pthread_mutex_lock(&g_devlist_mutex); 2389 i->dev = io_device_get(io_device); 2390 if (i->dev == NULL) { 2391 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2392 assert(false); 2393 goto end; 2394 } 2395 2396 TAILQ_FOREACH(thread, &g_threads, tailq) { 2397 ch = thread_get_io_channel(thread, i->dev); 2398 if (ch != NULL) { 2399 ch->dev->for_each_count++; 2400 i->cur_thread = thread; 2401 i->ch = ch; 2402 pthread_mutex_unlock(&g_devlist_mutex); 2403 rc = spdk_thread_send_msg(thread, _call_channel, i); 2404 assert(rc == 0); 2405 return; 2406 } 2407 } 2408 2409 end: 2410 pthread_mutex_unlock(&g_devlist_mutex); 2411 2412 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2413 assert(rc == 0); 2414 } 2415 2416 void 2417 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2418 { 2419 struct spdk_thread *thread; 2420 struct spdk_io_channel *ch; 2421 int rc __attribute__((unused)); 2422 2423 assert(i->cur_thread == spdk_get_thread()); 2424 2425 i->status = status; 2426 2427 pthread_mutex_lock(&g_devlist_mutex); 2428 if (status) { 2429 goto end; 2430 } 2431 thread = TAILQ_NEXT(i->cur_thread, tailq); 2432 while (thread) { 2433 ch = thread_get_io_channel(thread, i->dev); 2434 if (ch != NULL) { 2435 i->cur_thread = thread; 2436 i->ch = ch; 2437 pthread_mutex_unlock(&g_devlist_mutex); 2438 rc = spdk_thread_send_msg(thread, _call_channel, i); 2439 assert(rc == 0); 2440 return; 2441 } 2442 thread = TAILQ_NEXT(thread, tailq); 2443 } 2444 2445 end: 2446 i->dev->for_each_count--; 2447 i->ch = NULL; 2448 pthread_mutex_unlock(&g_devlist_mutex); 2449 2450 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2451 assert(rc == 0); 2452 } 2453 2454 struct spdk_interrupt { 2455 int efd; 2456 struct spdk_thread *thread; 2457 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 2458 }; 2459 2460 static void 2461 thread_interrupt_destroy(struct spdk_thread *thread) 2462 { 2463 struct spdk_fd_group *fgrp = thread->fgrp; 2464 2465 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2466 2467 if (thread->msg_fd < 0) { 2468 return; 2469 } 2470 2471 spdk_fd_group_remove(fgrp, thread->msg_fd); 2472 close(thread->msg_fd); 2473 thread->msg_fd = -1; 2474 2475 spdk_fd_group_destroy(fgrp); 2476 thread->fgrp = NULL; 2477 } 2478 2479 #ifdef __linux__ 2480 static int 2481 thread_interrupt_msg_process(void *arg) 2482 { 2483 struct spdk_thread *thread = arg; 2484 uint32_t msg_count; 2485 spdk_msg_fn critical_msg; 2486 int rc = 0; 2487 uint64_t notify = 1; 2488 2489 assert(spdk_interrupt_mode_is_enabled()); 2490 2491 /* There may be race between msg_acknowledge and another producer's msg_notify, 2492 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2493 * This can avoid msg notification missing. 2494 */ 2495 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2496 if (rc < 0 && errno != EAGAIN) { 2497 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2498 } 2499 2500 critical_msg = thread->critical_msg; 2501 if (spdk_unlikely(critical_msg != NULL)) { 2502 critical_msg(NULL); 2503 thread->critical_msg = NULL; 2504 rc = 1; 2505 } 2506 2507 msg_count = msg_queue_run_batch(thread, 0); 2508 if (msg_count) { 2509 rc = 1; 2510 } 2511 2512 return rc; 2513 } 2514 2515 static int 2516 thread_interrupt_create(struct spdk_thread *thread) 2517 { 2518 int rc; 2519 2520 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2521 2522 rc = spdk_fd_group_create(&thread->fgrp); 2523 if (rc) { 2524 return rc; 2525 } 2526 2527 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2528 if (thread->msg_fd < 0) { 2529 rc = -errno; 2530 spdk_fd_group_destroy(thread->fgrp); 2531 thread->fgrp = NULL; 2532 2533 return rc; 2534 } 2535 2536 return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd, 2537 thread_interrupt_msg_process, thread); 2538 } 2539 #else 2540 static int 2541 thread_interrupt_create(struct spdk_thread *thread) 2542 { 2543 return -ENOTSUP; 2544 } 2545 #endif 2546 2547 struct spdk_interrupt * 2548 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2549 void *arg, const char *name) 2550 { 2551 struct spdk_thread *thread; 2552 struct spdk_interrupt *intr; 2553 int ret; 2554 2555 thread = spdk_get_thread(); 2556 if (!thread) { 2557 assert(false); 2558 return NULL; 2559 } 2560 2561 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2562 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2563 return NULL; 2564 } 2565 2566 ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name); 2567 2568 if (ret != 0) { 2569 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2570 thread->name, efd, spdk_strerror(-ret)); 2571 return NULL; 2572 } 2573 2574 intr = calloc(1, sizeof(*intr)); 2575 if (intr == NULL) { 2576 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2577 return NULL; 2578 } 2579 2580 if (name) { 2581 snprintf(intr->name, sizeof(intr->name), "%s", name); 2582 } else { 2583 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2584 } 2585 2586 intr->efd = efd; 2587 intr->thread = thread; 2588 2589 return intr; 2590 } 2591 2592 void 2593 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2594 { 2595 struct spdk_thread *thread; 2596 struct spdk_interrupt *intr; 2597 2598 intr = *pintr; 2599 if (intr == NULL) { 2600 return; 2601 } 2602 2603 *pintr = NULL; 2604 2605 thread = spdk_get_thread(); 2606 if (!thread) { 2607 assert(false); 2608 return; 2609 } 2610 2611 if (intr->thread != thread) { 2612 wrong_thread(__func__, intr->name, intr->thread, thread); 2613 return; 2614 } 2615 2616 spdk_fd_group_remove(thread->fgrp, intr->efd); 2617 free(intr); 2618 } 2619 2620 int 2621 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2622 enum spdk_interrupt_event_types event_types) 2623 { 2624 struct spdk_thread *thread; 2625 2626 thread = spdk_get_thread(); 2627 if (!thread) { 2628 assert(false); 2629 return -EINVAL; 2630 } 2631 2632 if (intr->thread != thread) { 2633 wrong_thread(__func__, intr->name, intr->thread, thread); 2634 return -EINVAL; 2635 } 2636 2637 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2638 } 2639 2640 int 2641 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2642 { 2643 return spdk_fd_group_get_fd(thread->fgrp); 2644 } 2645 2646 static bool g_interrupt_mode = false; 2647 2648 int 2649 spdk_interrupt_mode_enable(void) 2650 { 2651 /* It must be called once prior to initializing the threading library. 2652 * g_spdk_msg_mempool will be valid if thread library is initialized. 2653 */ 2654 if (g_spdk_msg_mempool) { 2655 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 2656 return -1; 2657 } 2658 2659 #ifdef __linux__ 2660 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2661 g_interrupt_mode = true; 2662 return 0; 2663 #else 2664 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2665 g_interrupt_mode = false; 2666 return -ENOTSUP; 2667 #endif 2668 } 2669 2670 bool 2671 spdk_interrupt_mode_is_enabled(void) 2672 { 2673 return g_interrupt_mode; 2674 } 2675 2676 SPDK_LOG_REGISTER_COMPONENT(thread) 2677