/*-
 * BSD LICENSE
 *
 * Copyright (c) Intel Corporation.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *   * Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in
 *     the documentation and/or other materials provided with the
 *     distribution.
 *   * Neither the name of Intel Corporation nor the names of its
 *     contributors may be used to endorse or promote products derived
 *     from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/trace.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"
#include "thread_internal.h"

#include "spdk_internal/trace_defs.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
#define SPDK_MAX_POLLER_NAME_LEN	256
#define SPDK_MAX_THREAD_NAME_LEN	256

enum spdk_poller_state {
    /* The poller is registered with a thread but not currently executing its fn. */
    SPDK_POLLER_STATE_WAITING,

    /* The poller is currently running its fn. */
    SPDK_POLLER_STATE_RUNNING,

    /* The poller was unregistered during the execution of its fn. */
    SPDK_POLLER_STATE_UNREGISTERED,

    /* The poller is in the process of being paused. It will be paused
     * the next time it is due to be executed.
     */
    SPDK_POLLER_STATE_PAUSING,

    /* The poller is registered but currently paused. It's on the
     * paused_pollers list.
     */
    SPDK_POLLER_STATE_PAUSED,
};

struct spdk_poller {
    TAILQ_ENTRY(spdk_poller)	tailq;
    RB_ENTRY(spdk_poller)	node;

    /* Current state of the poller; should only be accessed from the poller's thread. */
    enum spdk_poller_state	state;

    uint64_t			period_ticks;
    uint64_t			next_run_tick;
    uint64_t			run_count;
    uint64_t			busy_count;
    uint64_t			id;
    spdk_poller_fn		fn;
    void			*arg;
    struct spdk_thread		*thread;
    /* Native interruptfd for a periodic or busy poller */
    int				interruptfd;
    spdk_poller_set_interrupt_mode_cb	set_intr_cb_fn;
    void			*set_intr_cb_arg;

    char			name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_thread_state {
    /* The thread is processing pollers and messages via spdk_thread_poll(). */
    SPDK_THREAD_STATE_RUNNING,

    /* The thread is in the process of termination. It reaps unregistering
     * pollers and releases I/O channels.
     */
    SPDK_THREAD_STATE_EXITING,

    /* The thread has exited. It is ready for spdk_thread_destroy(). */
    SPDK_THREAD_STATE_EXITED,
};

struct spdk_thread {
    uint64_t			tsc_last;
    struct spdk_thread_stats	stats;
    /*
     * Contains pollers actively running on this thread. Pollers
     * are run round-robin. The thread takes one poller from the head
     * of the ring, executes it, then puts it back at the tail of
     * the ring.
     */
    TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;
    /**
     * Contains pollers running on this thread with a periodic timer.
     */
    RB_HEAD(timed_pollers_tree, spdk_poller)		timed_pollers;
    struct spdk_poller					*first_timed_poller;
    /*
     * Contains paused pollers. Pollers on this queue are waiting until
     * they are resumed (in which case they're put onto the active/timer
     * queues) or unregistered.
     */
    TAILQ_HEAD(paused_pollers_head, spdk_poller)	paused_pollers;
    struct spdk_ring		*messages;
    int				msg_fd;
    SLIST_HEAD(, spdk_msg)	msg_cache;
    size_t			msg_cache_count;
    spdk_msg_fn			critical_msg;
    uint64_t			id;
    uint64_t			next_poller_id;
    enum spdk_thread_state	state;
    int				pending_unregister_count;

    RB_HEAD(io_channel_tree, spdk_io_channel)	io_channels;
    TAILQ_ENTRY(spdk_thread)			tailq;

    char			name[SPDK_MAX_THREAD_NAME_LEN + 1];
    struct spdk_cpuset		cpumask;
    uint64_t			exit_timeout_tsc;

    /* Indicates whether this spdk_thread currently runs in interrupt mode. */
    bool			in_interrupt;
    bool			poller_unregistered;
    struct spdk_fd_group	*fgrp;

    /* User context allocated at the end */
    uint8_t			ctx[0];
};

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
static size_t g_ctx_sz = 0;
/* A monotonically increasing ID is assigned to each created thread, beginning
 * at 1. Once the ID wraps around to 0 (i.e. UINT64_MAX threads have been
 * created), further thread creation is not allowed and the SPDK application
 * must be restarted.
 */
static uint64_t g_thread_id = 1;

struct io_device {
    void			*io_device;
    char			name[SPDK_MAX_DEVICE_NAME_LEN + 1];
    spdk_io_channel_create_cb	create_cb;
    spdk_io_channel_destroy_cb	destroy_cb;
    spdk_io_device_unregister_cb unregister_cb;
    struct spdk_thread		*unregister_thread;
    uint32_t			ctx_size;
    uint32_t			for_each_count;
    RB_ENTRY(io_device)		node;

    uint32_t			refcnt;

    bool			unregistered;
};

static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices);

static int
io_device_cmp(struct io_device *dev1, struct io_device *dev2)
{
    return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device);
}
RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp);

static int
io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2)
{
    return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev);
}

RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp);

struct spdk_msg {
    spdk_msg_fn		fn;
    void		*arg;

    SLIST_ENTRY(spdk_msg)	link;
};

#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static __thread struct spdk_thread *tls_thread = NULL;

SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD)
{
    spdk_trace_register_description("THREAD_IOCH_GET",
				    TRACE_THREAD_IOCH_GET,
				    OWNER_NONE, OBJECT_NONE, 0,
				    SPDK_TRACE_ARG_TYPE_INT, "refcnt");
    spdk_trace_register_description("THREAD_IOCH_PUT",
				    TRACE_THREAD_IOCH_PUT,
				    OWNER_NONE, OBJECT_NONE, 0,
				    SPDK_TRACE_ARG_TYPE_INT, "refcnt");
}

/*
 * If this compare function returned zero when two next_run_ticks are equal,
 * the macro RB_INSERT() would return a pointer to the element with the same
 * next_run_tick.
 *
 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element
 * to remove as a parameter.
 *
 * Hence we allow RB_INSERT() to insert elements with the same keys on the right
 * side by returning 1 when two next_run_ticks are equal.
 */
static inline int
timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2)
{
    if (poller1->next_run_tick < poller2->next_run_tick) {
	return -1;
    } else {
	return 1;
    }
}

RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare);

static inline struct spdk_thread *
_get_thread(void)
{
    return tls_thread;
}

static int
_thread_lib_init(size_t ctx_sz)
{
    char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

    g_ctx_sz = ctx_sz;

    snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
    g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
			 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			 sizeof(struct spdk_msg),
			 0, /* No cache. We do our own. */
			 SPDK_ENV_SOCKET_ID_ANY);

    if (!g_spdk_msg_mempool) {
	SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
	return -1;
    }

    return 0;
}

int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
    assert(g_new_thread_fn == NULL);
    assert(g_thread_op_fn == NULL);

    if (new_thread_fn == NULL) {
	SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
    } else {
	g_new_thread_fn = new_thread_fn;
    }

    return _thread_lib_init(ctx_sz);
}

int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
			 spdk_thread_op_supported_fn thread_op_supported_fn,
			 size_t ctx_sz)
{
    assert(g_new_thread_fn == NULL);
    assert(g_thread_op_fn == NULL);
    assert(g_thread_op_supported_fn == NULL);

    if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
	SPDK_ERRLOG("Both must be defined or undefined together.\n");
	return -EINVAL;
    }

    if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
	SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
    } else {
	g_thread_op_fn = thread_op_fn;
	g_thread_op_supported_fn = thread_op_supported_fn;
    }

    return _thread_lib_init(ctx_sz);
}

void
spdk_thread_lib_fini(void)
{
    struct io_device *dev;

    RB_FOREACH(dev, io_device_tree, &g_io_devices) {
	SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
    }

    if (g_spdk_msg_mempool) {
	spdk_mempool_free(g_spdk_msg_mempool);
	g_spdk_msg_mempool = NULL;
    }

    g_new_thread_fn = NULL;
    g_thread_op_fn = NULL;
    g_thread_op_supported_fn = NULL;
    g_ctx_sz = 0;
}

static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

static void
_free_thread(struct spdk_thread *thread)
{
    struct spdk_io_channel *ch;
    struct spdk_msg *msg;
    struct spdk_poller *poller, *ptmp;

    RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
	SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
		    thread->name, ch->dev->name);
    }

    TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
	if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
	    SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
			 poller->name);
	}
	TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
	free(poller);
    }

    RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
	if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
	    SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
			 poller->name);
	}
	RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
	free(poller);
    }

    TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
	SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
	TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
	free(poller);
    }

    pthread_mutex_lock(&g_devlist_mutex);
    assert(g_thread_count > 0);
    g_thread_count--;
    TAILQ_REMOVE(&g_threads, thread, tailq);
    pthread_mutex_unlock(&g_devlist_mutex);

    msg = SLIST_FIRST(&thread->msg_cache);
    while (msg != NULL) {
	SLIST_REMOVE_HEAD(&thread->msg_cache, link);

	assert(thread->msg_cache_count > 0);
	thread->msg_cache_count--;
	spdk_mempool_put(g_spdk_msg_mempool, msg);

	msg = SLIST_FIRST(&thread->msg_cache);
    }

    assert(thread->msg_cache_count == 0);

    if (spdk_interrupt_mode_is_enabled()) {
	thread_interrupt_destroy(thread);
    }

    spdk_ring_free(thread->messages);
    free(thread);
}

struct spdk_thread *
spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
{
    struct spdk_thread *thread;
    struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
    int rc = 0, i;

    thread = calloc(1, sizeof(*thread) + g_ctx_sz);
    if (!thread) {
	SPDK_ERRLOG("Unable to allocate memory for thread\n");
	return NULL;
    }

    if (cpumask) {
	spdk_cpuset_copy(&thread->cpumask, cpumask);
    } else {
	spdk_cpuset_negate(&thread->cpumask);
    }

    RB_INIT(&thread->io_channels);
    TAILQ_INIT(&thread->active_pollers);
    RB_INIT(&thread->timed_pollers);
    TAILQ_INIT(&thread->paused_pollers);
    SLIST_INIT(&thread->msg_cache);
    thread->msg_cache_count = 0;

    thread->tsc_last = spdk_get_ticks();

    /* A monotonically increasing ID is assigned to each created poller,
     * beginning at 1. Once the ID wraps around to 0, a warning message
     * is logged.
     */
    thread->next_poller_id = 1;

    thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
    if (!thread->messages) {
	SPDK_ERRLOG("Unable to allocate memory for message ring\n");
	free(thread);
	return NULL;
    }

    /* Fill the local message pool cache. */
    rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
    if (rc == 0) {
	/* If we can't populate the cache it's ok. The cache will get filled
	 * up organically as messages are passed to the thread. */
	for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
	    SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
	    thread->msg_cache_count++;
	}
    }

    if (name) {
	snprintf(thread->name, sizeof(thread->name), "%s", name);
    } else {
	snprintf(thread->name, sizeof(thread->name), "%p", thread);
    }

    pthread_mutex_lock(&g_devlist_mutex);
    if (g_thread_id == 0) {
	SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
	pthread_mutex_unlock(&g_devlist_mutex);
	_free_thread(thread);
	return NULL;
    }
    thread->id = g_thread_id++;
    TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
    g_thread_count++;
    pthread_mutex_unlock(&g_devlist_mutex);

    SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
		  thread->id, thread->name);

    if (spdk_interrupt_mode_is_enabled()) {
	thread->in_interrupt = true;
	rc = thread_interrupt_create(thread);
	if (rc != 0) {
	    _free_thread(thread);
	    return NULL;
	}
    }

    if (g_new_thread_fn) {
	rc = g_new_thread_fn(thread);
    } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
	rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
    }

    if (rc != 0) {
	_free_thread(thread);
	return NULL;
    }

    thread->state = SPDK_THREAD_STATE_RUNNING;

    return thread;
}

void
spdk_set_thread(struct spdk_thread *thread)
{
    tls_thread = thread;
}

static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
    struct spdk_poller *poller;
    struct spdk_io_channel *ch;

    if (now >= thread->exit_timeout_tsc) {
	SPDK_ERRLOG("thread %s timed out; moving it to the exited state forcefully\n",
		    thread->name);
	goto exited;
    }

    TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
	if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
	    SPDK_INFOLOG(thread,
			 "thread %s still has active poller %s\n",
			 thread->name, poller->name);
	    return;
	}
    }

    RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
	if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
	    SPDK_INFOLOG(thread,
			 "thread %s still has active timed poller %s\n",
			 thread->name, poller->name);
	    return;
	}
    }

    TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
	SPDK_INFOLOG(thread,
		     "thread %s still has paused poller %s\n",
		     thread->name, poller->name);
	return;
    }

    RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
	SPDK_INFOLOG(thread,
		     "thread %s still has channel for io_device %s\n",
		     thread->name, ch->dev->name);
	return;
    }

    if (thread->pending_unregister_count > 0) {
	SPDK_INFOLOG(thread,
		     "thread %s is still unregistering io_devices\n",
		     thread->name);
	return;
    }

exited:
    thread->state = SPDK_THREAD_STATE_EXITED;
}

int
spdk_thread_exit(struct spdk_thread *thread)
{
    SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);

    assert(tls_thread == thread);

    if (thread->state >= SPDK_THREAD_STATE_EXITING) {
	SPDK_INFOLOG(thread,
		     "thread %s is already exiting\n",
		     thread->name);
	return 0;
    }

    thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
			       SPDK_THREAD_EXIT_TIMEOUT_SEC);
    thread->state = SPDK_THREAD_STATE_EXITING;
    return 0;
}

bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
    return thread->state == SPDK_THREAD_STATE_EXITED;
}

void
spdk_thread_destroy(struct spdk_thread *thread)
{
    SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);

    assert(thread->state == SPDK_THREAD_STATE_EXITED);

    if (tls_thread == thread) {
	tls_thread = NULL;
    }

    _free_thread(thread);
}
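/*
 * Example: the minimal lifecycle of an spdk_thread as driven by a framework.
 * This is an illustrative sketch only (error handling omitted); the function
 * name example_thread_lifecycle is hypothetical, but every API used below is
 * defined in this file.
 */
#if 0
static void
example_thread_lifecycle(void)
{
    struct spdk_thread *thread;

    /* Initialize the library once per process; 0 means no per-thread ctx. */
    spdk_thread_lib_init(NULL, 0);

    /* A NULL cpumask selects all cores (the cpuset is negated from empty). */
    thread = spdk_thread_create("example", NULL);

    /* Request exit; spdk_thread_exit() asserts the thread is current. */
    spdk_set_thread(thread);
    spdk_thread_exit(thread);

    /* Keep polling until pollers/channels drain and the state is EXITED. */
    while (!spdk_thread_is_exited(thread)) {
	spdk_thread_poll(thread, 0, 0);
    }

    spdk_thread_destroy(thread);
    spdk_thread_lib_fini();
}
#endif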
void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
    if (g_ctx_sz > 0) {
	return thread->ctx;
    }

    return NULL;
}

struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
    return &thread->cpumask;
}

int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
    struct spdk_thread *thread;

    if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
	SPDK_ERRLOG("Framework does not support reschedule operation.\n");
	assert(false);
	return -ENOTSUP;
    }

    thread = spdk_get_thread();
    if (!thread) {
	SPDK_ERRLOG("Called from non-SPDK thread\n");
	assert(false);
	return -EINVAL;
    }

    spdk_cpuset_copy(&thread->cpumask, cpumask);

    /* Invoke the framework's reschedule operation. If this function is called
     * multiple times in a single spdk_thread_poll() context, the last cpumask
     * will be used in the reschedule operation.
     */
    g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

    return 0;
}

struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
    if (ctx == NULL) {
	assert(false);
	return NULL;
    }

    assert(g_ctx_sz > 0);

    return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}
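/*
 * Example: round trip between an spdk_thread and the user context allocated
 * at its tail (the ctx[0] flexible member). Illustrative sketch only; it
 * assumes the library was initialized with ctx_sz == sizeof(struct
 * example_ctx), a hypothetical type.
 */
#if 0
struct example_ctx {
    int value;
};

static void
example_ctx_round_trip(struct spdk_thread *thread)
{
    struct example_ctx *ctx = spdk_thread_get_ctx(thread);

    ctx->value = 42;

    /* SPDK_CONTAINEROF recovers the owning thread from the ctx pointer. */
    assert(spdk_thread_get_from_ctx(ctx) == thread);
}
#endif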
static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
    unsigned count, i;
    void *messages[SPDK_MSG_BATCH_SIZE];
    uint64_t notify = 1;
    int rc;

#ifdef DEBUG
    /*
     * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
     * so we will never actually read uninitialized data from events, but just to be sure
     * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
     */
    memset(messages, 0, sizeof(messages));
#endif

    if (max_msgs > 0) {
	max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
    } else {
	max_msgs = SPDK_MSG_BATCH_SIZE;
    }

    count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
    if (spdk_unlikely(thread->in_interrupt) &&
	spdk_ring_count(thread->messages) != 0) {
	rc = write(thread->msg_fd, &notify, sizeof(notify));
	if (rc < 0) {
	    SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
	}
    }
    if (count == 0) {
	return 0;
    }

    for (i = 0; i < count; i++) {
	struct spdk_msg *msg = messages[i];

	assert(msg != NULL);
	msg->fn(msg->arg);

	if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
	    /* Insert the message at the head. We want to re-use the hot
	     * ones. */
	    SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
	    thread->msg_cache_count++;
	} else {
	    spdk_mempool_put(g_spdk_msg_mempool, msg);
	}
    }

    return count;
}

static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
    struct spdk_poller *tmp __attribute__((unused));

    poller->next_run_tick = now + poller->period_ticks;

    /*
     * Insert poller in the thread's timed_pollers tree by next scheduled run time
     * as its key.
     */
    tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
    assert(tmp == NULL);

    /* Update the cache only if it is empty or the inserted poller is earlier than it.
     * RB_MIN() is not necessary here because all pollers which have exactly the same
     * next_run_tick as the existing poller are inserted on the right side.
     */
    if (thread->first_timed_poller == NULL ||
	poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
	thread->first_timed_poller = poller;
    }
}

static inline void
poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
{
    struct spdk_poller *tmp __attribute__((unused));

    tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
    assert(tmp != NULL);

    /* This function is not used in any case that is performance critical.
     * Update the cache simply by RB_MIN() if it needs to be changed.
     */
    if (thread->first_timed_poller == poller) {
	thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
    }
}

static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
    if (poller->period_ticks) {
	poller_insert_timer(thread, poller, spdk_get_ticks());
    } else {
	TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
    }
}

static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
    if (rc == 0) {
	/* Poller status idle */
	thread->stats.idle_tsc += end - start;
    } else if (rc > 0) {
	/* Poller status busy */
	thread->stats.busy_tsc += end - start;
    }
    /* Store end time to use it as start time of the next spdk_thread_poll(). */
    thread->tsc_last = end;
}

static inline int
thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
    int rc;

    switch (poller->state) {
    case SPDK_POLLER_STATE_UNREGISTERED:
	TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
	free(poller);
	return 0;
    case SPDK_POLLER_STATE_PAUSING:
	TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
	TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
	poller->state = SPDK_POLLER_STATE_PAUSED;
	return 0;
    case SPDK_POLLER_STATE_WAITING:
	break;
    default:
	assert(false);
	break;
    }

    poller->state = SPDK_POLLER_STATE_RUNNING;
    rc = poller->fn(poller->arg);

    poller->run_count++;
    if (rc > 0) {
	poller->busy_count++;
    }

#ifdef DEBUG
    if (rc == -1) {
	SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
    }
#endif

    switch (poller->state) {
    case SPDK_POLLER_STATE_UNREGISTERED:
	TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
	free(poller);
	break;
    case SPDK_POLLER_STATE_PAUSING:
	TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
	TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
	poller->state = SPDK_POLLER_STATE_PAUSED;
	break;
    case SPDK_POLLER_STATE_PAUSED:
    case SPDK_POLLER_STATE_WAITING:
	break;
    case SPDK_POLLER_STATE_RUNNING:
	poller->state = SPDK_POLLER_STATE_WAITING;
	break;
    default:
	assert(false);
	break;
    }

    return rc;
}
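/*
 * Example: a poller callback following the return convention consumed by
 * thread_update_stats() above: a positive return counts as busy time, 0 as
 * idle time, and a negative value is an error (logged in debug builds).
 * Illustrative sketch; example_queue/example_queue_process are hypothetical,
 * and SPDK_POLLER_BUSY/SPDK_POLLER_IDLE are the usual aliases from
 * spdk/thread.h.
 */
#if 0
static int
example_poller_fn(void *arg)
{
    struct example_queue *q = arg;	/* hypothetical queue type */

    if (example_queue_process(q) > 0) {
	return SPDK_POLLER_BUSY;	/* did work this iteration */
    }

    return SPDK_POLLER_IDLE;		/* nothing to do */
}
#endif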
static inline int
thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
			    uint64_t now)
{
    int rc;

    switch (poller->state) {
    case SPDK_POLLER_STATE_UNREGISTERED:
	free(poller);
	return 0;
    case SPDK_POLLER_STATE_PAUSING:
	TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
	poller->state = SPDK_POLLER_STATE_PAUSED;
	return 0;
    case SPDK_POLLER_STATE_WAITING:
	break;
    default:
	assert(false);
	break;
    }

    poller->state = SPDK_POLLER_STATE_RUNNING;
    rc = poller->fn(poller->arg);

    poller->run_count++;
    if (rc > 0) {
	poller->busy_count++;
    }

#ifdef DEBUG
    if (rc == -1) {
	SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
    }
#endif

    switch (poller->state) {
    case SPDK_POLLER_STATE_UNREGISTERED:
	free(poller);
	break;
    case SPDK_POLLER_STATE_PAUSING:
	TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
	poller->state = SPDK_POLLER_STATE_PAUSED;
	break;
    case SPDK_POLLER_STATE_PAUSED:
	break;
    case SPDK_POLLER_STATE_RUNNING:
	poller->state = SPDK_POLLER_STATE_WAITING;
    /* fallthrough */
    case SPDK_POLLER_STATE_WAITING:
	poller_insert_timer(thread, poller, now);
	break;
    default:
	assert(false);
	break;
    }

    return rc;
}

static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
    uint32_t msg_count;
    struct spdk_poller *poller, *tmp;
    spdk_msg_fn critical_msg;
    int rc = 0;

    thread->tsc_last = now;

    critical_msg = thread->critical_msg;
    if (spdk_unlikely(critical_msg != NULL)) {
	critical_msg(NULL);
	thread->critical_msg = NULL;
	rc = 1;
    }

    msg_count = msg_queue_run_batch(thread, max_msgs);
    if (msg_count) {
	rc = 1;
    }

    TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
			       active_pollers_head, tailq, tmp) {
	int poller_rc;

	poller_rc = thread_execute_poller(thread, poller);
	if (poller_rc > rc) {
	    rc = poller_rc;
	}
    }

    poller = thread->first_timed_poller;
    while (poller != NULL) {
	int timer_rc = 0;

	if (now < poller->next_run_tick) {
	    break;
	}

	tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
	RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);

	/* Update the cache to the next timed poller in the list
	 * only if the current poller is still the closest, otherwise,
	 * do nothing because the cache has been already updated.
	 */
	if (thread->first_timed_poller == poller) {
	    thread->first_timed_poller = tmp;
	}

	timer_rc = thread_execute_timed_poller(thread, poller, now);
	if (timer_rc > rc) {
	    rc = timer_rc;
	}

	poller = tmp;
    }

    return rc;
}

int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
    struct spdk_thread *orig_thread;
    int rc;
    uint64_t notify = 1;

    orig_thread = _get_thread();
    tls_thread = thread;

    if (now == 0) {
	now = spdk_get_ticks();
    }

    if (spdk_likely(!thread->in_interrupt)) {
	rc = thread_poll(thread, max_msgs, now);
	if (spdk_unlikely(thread->in_interrupt)) {
	    /* The thread transitioned to interrupt mode during the above poll.
	     * Poll it one more time in case a msg arrived during the transition
	     * without notification.
	     */
	    rc = thread_poll(thread, max_msgs, now);
	}
    } else {
	/* Non-blocking wait on the thread's fd_group */
	rc = spdk_fd_group_wait(thread->fgrp, 0);
	if (spdk_unlikely(!thread->in_interrupt)) {
	    /* The thread transitioned to poll mode in a msg during the above processing.
	     * Clear msg_fd since thread messages will be polled directly in poll mode.
	     */
	    rc = read(thread->msg_fd, &notify, sizeof(notify));
	    if (rc < 0 && errno != EAGAIN) {
		SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
	    }
	}

	/* Reap unregistered pollers out of poller execution in intr mode */
	if (spdk_unlikely(thread->poller_unregistered)) {
	    struct spdk_poller *poller, *tmp;

	    TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				       active_pollers_head, tailq, tmp) {
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
		    TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		    free(poller);
		}
	    }

	    RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
		    poller_remove_timer(thread, poller);
		    free(poller);
		}
	    }

	    thread->poller_unregistered = false;
	}
    }

    if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
	thread_exit(thread, now);
    }

    thread_update_stats(thread, spdk_get_ticks(), now, rc);

    tls_thread = orig_thread;

    return rc;
}

uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
    struct spdk_poller *poller;

    poller = thread->first_timed_poller;
    if (poller) {
	return poller->next_run_tick;
    }

    return 0;
}

int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
    return !TAILQ_EMPTY(&thread->active_pollers);
}

static bool
thread_has_unpaused_pollers(struct spdk_thread *thread)
{
    if (TAILQ_EMPTY(&thread->active_pollers) &&
	RB_EMPTY(&thread->timed_pollers)) {
	return false;
    }

    return true;
}

bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
    if (!thread_has_unpaused_pollers(thread) &&
	TAILQ_EMPTY(&thread->paused_pollers)) {
	return false;
    }

    return true;
}

bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
    if (spdk_ring_count(thread->messages) ||
	thread_has_unpaused_pollers(thread) ||
	thread->critical_msg != NULL) {
	return false;
    }

    return true;
}
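/*
 * Example: a toy reactor loop built on the helpers above. Illustrative
 * sketch only; a real framework (e.g. SPDK's event framework) additionally
 * handles scheduling, interrupt mode, and sleeping until the next timed
 * poller is due.
 */
#if 0
static void
example_reactor(struct spdk_thread *thread)
{
    uint64_t now, next;

    while (!spdk_thread_is_exited(thread)) {
	now = spdk_get_ticks();
	spdk_thread_poll(thread, 0, now);

	if (spdk_thread_is_idle(thread)) {
	    /* Nothing queued; a framework could sleep until the next
	     * timed poller expiration (0 means no timed pollers).
	     */
	    next = spdk_thread_next_poller_expiration(thread);
	    (void)next;
	}
    }
}
#endif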
uint32_t
spdk_thread_get_count(void)
{
    /*
     * Return the cached value of the current thread count. We could acquire
     * the lock and iterate through the TAILQ of threads to count them, but
     * that count could still be invalidated after we release the lock.
     */
    return g_thread_count;
}

struct spdk_thread *
spdk_get_thread(void)
{
    return _get_thread();
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
    return thread->name;
}

uint64_t
spdk_thread_get_id(const struct spdk_thread *thread)
{
    return thread->id;
}

struct spdk_thread *
spdk_thread_get_by_id(uint64_t id)
{
    struct spdk_thread *thread;

    if (id == 0 || id >= g_thread_id) {
	SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
	return NULL;
    }
    pthread_mutex_lock(&g_devlist_mutex);
    TAILQ_FOREACH(thread, &g_threads, tailq) {
	if (thread->id == id) {
	    break;
	}
    }
    pthread_mutex_unlock(&g_devlist_mutex);
    return thread;
}

int
spdk_thread_get_stats(struct spdk_thread_stats *stats)
{
    struct spdk_thread *thread;

    thread = _get_thread();
    if (!thread) {
	SPDK_ERRLOG("No thread allocated\n");
	return -EINVAL;
    }

    if (stats == NULL) {
	return -EINVAL;
    }

    *stats = thread->stats;

    return 0;
}

uint64_t
spdk_thread_get_last_tsc(struct spdk_thread *thread)
{
    if (thread == NULL) {
	thread = _get_thread();
    }

    return thread->tsc_last;
}
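/*
 * Example: handing work to another thread with spdk_thread_send_msg()
 * (defined below). Illustrative sketch; example_fn and example_send are
 * hypothetical names.
 */
#if 0
static void
example_fn(void *ctx)
{
    /* Runs later, on the target thread, inside spdk_thread_poll(). */
}

static void
example_send(struct spdk_thread *target)
{
    int rc;

    rc = spdk_thread_send_msg(target, example_fn, NULL);
    if (rc != 0) {
	/* -ENOMEM: no msg objects available; -EIO: target exited or
	 * the message could not be enqueued.
	 */
    }
}
#endif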
static inline int
thread_send_msg_notification(const struct spdk_thread *target_thread)
{
    uint64_t notify = 1;
    int rc;

    /* Not necessary to do notification if interrupt facility is not enabled */
    if (spdk_likely(!spdk_interrupt_mode_is_enabled())) {
	return 0;
    }

    /* When each spdk_thread can switch between poll and interrupt mode
     * dynamically, after sending a thread msg it is necessary to check whether
     * the target thread runs in interrupt mode and then decide whether to do
     * event notification.
     */
    if (spdk_unlikely(target_thread->in_interrupt)) {
	rc = write(target_thread->msg_fd, &notify, sizeof(notify));
	if (rc < 0) {
	    SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
	    return -EIO;
	}
    }

    return 0;
}

int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
    struct spdk_thread *local_thread;
    struct spdk_msg *msg;
    int rc;

    assert(thread != NULL);

    if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
	SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
	return -EIO;
    }

    local_thread = _get_thread();

    msg = NULL;
    if (local_thread != NULL) {
	if (local_thread->msg_cache_count > 0) {
	    msg = SLIST_FIRST(&local_thread->msg_cache);
	    assert(msg != NULL);
	    SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
	    local_thread->msg_cache_count--;
	}
    }

    if (msg == NULL) {
	msg = spdk_mempool_get(g_spdk_msg_mempool);
	if (!msg) {
	    SPDK_ERRLOG("msg could not be allocated\n");
	    return -ENOMEM;
	}
    }

    msg->fn = fn;
    msg->arg = ctx;

    rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
    if (rc != 1) {
	SPDK_ERRLOG("msg could not be enqueued\n");
	spdk_mempool_put(g_spdk_msg_mempool, msg);
	return -EIO;
    }

    return thread_send_msg_notification(thread);
}

int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
    spdk_msg_fn expected = NULL;

    if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
				     __ATOMIC_SEQ_CST)) {
	return -EIO;
    }

    return thread_send_msg_notification(thread);
}

#ifdef __linux__
static int
interrupt_timerfd_process(void *arg)
{
    struct spdk_poller *poller = arg;
    uint64_t exp;
    int rc;

    /* clear the level of interval timer */
    rc = read(poller->interruptfd, &exp, sizeof(exp));
    if (rc < 0) {
	/* read() returns -1 on error and sets errno. */
	if (errno == EAGAIN) {
	    return 0;
	}

	return -errno;
    }

    return poller->fn(poller->arg);
}

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
    struct spdk_fd_group *fgrp = poller->thread->fgrp;
    int timerfd;
    int rc;

    SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
    timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timerfd < 0) {
	return -errno;
    }

    rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller);
    if (rc < 0) {
	close(timerfd);
	return rc;
    }

    poller->interruptfd = timerfd;
    return 0;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
    int timerfd = poller->interruptfd;
    uint64_t now_tick = spdk_get_ticks();
    uint64_t ticks = spdk_get_ticks_hz();
    int ret;
    struct itimerspec new_tv = {};
    struct itimerspec old_tv = {};

    assert(poller->period_ticks != 0);
    assert(timerfd >= 0);

    SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
"interrupt" : "poll"); 1318 1319 if (interrupt_mode) { 1320 /* Set repeated timer expiration */ 1321 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1322 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1323 1324 /* Update next timer expiration */ 1325 if (poller->next_run_tick == 0) { 1326 poller->next_run_tick = now_tick + poller->period_ticks; 1327 } else if (poller->next_run_tick < now_tick) { 1328 poller->next_run_tick = now_tick; 1329 } 1330 1331 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1332 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1333 1334 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1335 if (ret < 0) { 1336 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1337 assert(false); 1338 } 1339 } else { 1340 /* Disarm the timer */ 1341 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1342 if (ret < 0) { 1343 /* timerfd_settime's failure indicates that the timerfd is in error */ 1344 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1345 assert(false); 1346 } 1347 1348 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1349 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1350 */ 1351 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1352 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1353 poller_remove_timer(poller->thread, poller); 1354 poller_insert_timer(poller->thread, poller, now_tick); 1355 } 1356 } 1357 1358 static void 1359 poller_interrupt_fini(struct spdk_poller *poller) 1360 { 1361 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1362 assert(poller->interruptfd >= 0); 1363 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1364 close(poller->interruptfd); 1365 poller->interruptfd = -1; 1366 } 1367 1368 static int 1369 busy_poller_interrupt_init(struct spdk_poller *poller) 1370 { 1371 int busy_efd; 1372 int rc; 1373 1374 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1375 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1376 if (busy_efd < 0) { 1377 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1378 return -errno; 1379 } 1380 1381 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, 1382 poller->fn, poller->arg, poller->name); 1383 if (rc < 0) { 1384 close(busy_efd); 1385 return rc; 1386 } 1387 1388 poller->interruptfd = busy_efd; 1389 return 0; 1390 } 1391 1392 static void 1393 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1394 { 1395 int busy_efd = poller->interruptfd; 1396 uint64_t notify = 1; 1397 int rc __attribute__((unused)); 1398 1399 assert(busy_efd >= 0); 1400 1401 if (interrupt_mode) { 1402 /* Write without read on eventfd will get it repeatedly triggered. */ 1403 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1404 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1405 } 1406 } else { 1407 /* Read on eventfd will clear its level triggering. 
static void
poller_interrupt_fini(struct spdk_poller *poller)
{
    SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name);
    assert(poller->interruptfd >= 0);
    spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd);
    close(poller->interruptfd);
    poller->interruptfd = -1;
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
    int busy_efd;
    int rc;

    SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name);
    busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (busy_efd < 0) {
	SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name);
	return -errno;
    }

    rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd,
			   poller->fn, poller->arg, poller->name);
    if (rc < 0) {
	close(busy_efd);
	return rc;
    }

    poller->interruptfd = busy_efd;
    return 0;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
    int busy_efd = poller->interruptfd;
    uint64_t notify = 1;
    int rc __attribute__((unused));

    assert(busy_efd >= 0);

    if (interrupt_mode) {
	/* Writing without reading on an eventfd keeps it repeatedly triggered. */
	if (write(busy_efd, &notify, sizeof(notify)) < 0) {
	    SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name);
	}
    } else {
	/* Reading the eventfd clears its level triggering. */
	rc = read(busy_efd, &notify, sizeof(notify));
    }
}

#else

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
    return -ENOTSUP;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
    return -ENOTSUP;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

#endif

void
spdk_poller_register_interrupt(struct spdk_poller *poller,
			       spdk_poller_set_interrupt_mode_cb cb_fn,
			       void *cb_arg)
{
    assert(poller != NULL);
    assert(cb_fn != NULL);
    assert(spdk_get_thread() == poller->thread);

    if (!spdk_interrupt_mode_is_enabled()) {
	return;
    }

    /* When a poller is created we don't know if the user is ever going to
     * enable interrupts on it by calling this function, so the poller
     * registration function has to immediately create an interruptfd.
     * When this function does get called by the user, we have to then
     * destroy that interruptfd.
     */
    if (poller->set_intr_cb_fn && poller->interruptfd >= 0) {
	poller_interrupt_fini(poller);
    }

    poller->set_intr_cb_fn = cb_fn;
    poller->set_intr_cb_arg = cb_arg;

    /* Set the poller into interrupt mode if the thread is in interrupt mode. */
    if (poller->thread->in_interrupt) {
	poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
    }
}

static uint64_t
convert_us_to_ticks(uint64_t us)
{
    uint64_t quotient, remainder, ticks;

    if (us) {
	quotient = us / SPDK_SEC_TO_USEC;
	remainder = us % SPDK_SEC_TO_USEC;
	ticks = spdk_get_ticks_hz();

	return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
    } else {
	return 0;
    }
}
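/*
 * Worked example for convert_us_to_ticks() with illustrative numbers: at
 * spdk_get_ticks_hz() == 2,400,000,000 (2.4 GHz), 1500 us converts as
 *   quotient = 1500 / 1000000 = 0, remainder = 1500
 *   ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC
 *     = 0 + (2400000000 * 1500) / 1000000 = 3,600,000 ticks.
 * Splitting into quotient and remainder keeps the intermediate product
 * ticks * remainder well under 64-bit overflow even for large periods.
 */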
static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
    struct spdk_thread *thread;
    struct spdk_poller *poller;

    thread = spdk_get_thread();
    if (!thread) {
	assert(false);
	return NULL;
    }

    if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
	SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
	return NULL;
    }

    poller = calloc(1, sizeof(*poller));
    if (poller == NULL) {
	SPDK_ERRLOG("Poller memory allocation failed\n");
	return NULL;
    }

    if (name) {
	snprintf(poller->name, sizeof(poller->name), "%s", name);
    } else {
	snprintf(poller->name, sizeof(poller->name), "%p", fn);
    }

    poller->state = SPDK_POLLER_STATE_WAITING;
    poller->fn = fn;
    poller->arg = arg;
    poller->thread = thread;
    poller->interruptfd = -1;
    if (thread->next_poller_id == 0) {
	SPDK_WARNLOG("Poller ID rolled over. Poller IDs will be duplicated.\n");
	thread->next_poller_id = 1;
    }
    poller->id = thread->next_poller_id++;

    poller->period_ticks = convert_us_to_ticks(period_microseconds);

    if (spdk_interrupt_mode_is_enabled()) {
	int rc;

	if (period_microseconds) {
	    rc = period_poller_interrupt_init(poller);
	    if (rc < 0) {
		SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
		free(poller);
		return NULL;
	    }

	    spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL);
	} else {
	    /* If the poller doesn't have a period, create an interruptfd that's always
	     * busy automatically when running in interrupt mode.
	     */
	    rc = busy_poller_interrupt_init(poller);
	    if (rc < 0) {
		SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
		free(poller);
		return NULL;
	    }

	    spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL);
	}
    }

    thread_insert_poller(thread, poller);

    return poller;
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
    return poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
    return poller_register(fn, arg, period_microseconds, name);
}
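/*
 * Example: registering and unregistering a poller from its owning thread.
 * Illustrative sketch; example_poller_fn and the example_* names are
 * hypothetical. SPDK_POLLER_REGISTER from spdk/thread.h is the usual
 * convenience wrapper that passes the function name as the poller name.
 */
#if 0
static struct spdk_poller *g_example_poller;

static void
example_start(void *arg)
{
    /* Run every 1000 microseconds; a period of 0 means "run on every poll". */
    g_example_poller = spdk_poller_register_named(example_poller_fn, arg,
						  1000, "example_poller");
}

static void
example_stop(void *arg)
{
    /* Sets the pointer to NULL; actual cleanup happens inside a later
     * spdk_thread_poll() on the owning thread.
     */
    spdk_poller_unregister(&g_example_poller);
}
#endif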
static void
wrong_thread(const char *func, const char *name, struct spdk_thread *thread,
	     struct spdk_thread *curthread)
{
    if (thread == NULL) {
	SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name);
	abort();
    }
    SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be "
		"%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id,
		thread->name, thread->id);
    assert(false);
}

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
    struct spdk_thread *thread;
    struct spdk_poller *poller;

    poller = *ppoller;
    if (poller == NULL) {
	return;
    }

    *ppoller = NULL;

    thread = spdk_get_thread();
    if (!thread) {
	assert(false);
	return;
    }

    if (poller->thread != thread) {
	wrong_thread(__func__, poller->name, poller->thread, thread);
	return;
    }

    if (spdk_interrupt_mode_is_enabled()) {
	/* Release the interrupt resource for a periodic or busy poller */
	if (poller->interruptfd >= 0) {
	    poller_interrupt_fini(poller);
	}

	/* Mark that a poller was unregistered, so that unregistered pollers
	 * also get reaped by spdk_thread_poll() in interrupt mode.
	 */
	thread->poller_unregistered = true;
    }

    /* If the poller was paused, put it on the active_pollers list so that
     * its unregistration can be processed by spdk_thread_poll().
     */
    if (poller->state == SPDK_POLLER_STATE_PAUSED) {
	TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
	TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	poller->period_ticks = 0;
    }

    /* Simply set the state to unregistered. The poller will get cleaned up
     * in a subsequent call to spdk_thread_poll().
     */
    poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}

void
spdk_poller_pause(struct spdk_poller *poller)
{
    struct spdk_thread *thread;

    thread = spdk_get_thread();
    if (!thread) {
	assert(false);
	return;
    }

    if (poller->thread != thread) {
	wrong_thread(__func__, poller->name, poller->thread, thread);
	return;
    }

    /* We just set its state to SPDK_POLLER_STATE_PAUSING and let
     * spdk_thread_poll() move it. It allows a poller to be paused from
     * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE
     * iteration, or from within itself without breaking the logic to always
     * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration.
     */
    switch (poller->state) {
    case SPDK_POLLER_STATE_PAUSED:
    case SPDK_POLLER_STATE_PAUSING:
	break;
    case SPDK_POLLER_STATE_RUNNING:
    case SPDK_POLLER_STATE_WAITING:
	poller->state = SPDK_POLLER_STATE_PAUSING;
	break;
    default:
	assert(false);
	break;
    }
}

void
spdk_poller_resume(struct spdk_poller *poller)
{
    struct spdk_thread *thread;

    thread = spdk_get_thread();
    if (!thread) {
	assert(false);
	return;
    }

    if (poller->thread != thread) {
	wrong_thread(__func__, poller->name, poller->thread, thread);
	return;
    }

    /* If a poller is paused, it has to be removed from the paused pollers
     * list and put on the active list or timer tree depending on its
     * period_ticks. If a poller is still in the process of being paused,
     * we just need to flip its state back to waiting, as it's already on
     * the appropriate list or tree.
     */
    switch (poller->state) {
    case SPDK_POLLER_STATE_PAUSED:
	TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
	thread_insert_poller(thread, poller);
    /* fallthrough */
    case SPDK_POLLER_STATE_PAUSING:
	poller->state = SPDK_POLLER_STATE_WAITING;
	break;
    case SPDK_POLLER_STATE_RUNNING:
    case SPDK_POLLER_STATE_WAITING:
	break;
    default:
	assert(false);
	break;
    }
}

const char *
spdk_poller_get_name(struct spdk_poller *poller)
{
    return poller->name;
}

uint64_t
spdk_poller_get_id(struct spdk_poller *poller)
{
    return poller->id;
}

const char *
spdk_poller_get_state_str(struct spdk_poller *poller)
{
    switch (poller->state) {
    case SPDK_POLLER_STATE_WAITING:
	return "waiting";
    case SPDK_POLLER_STATE_RUNNING:
	return "running";
    case SPDK_POLLER_STATE_UNREGISTERED:
	return "unregistered";
    case SPDK_POLLER_STATE_PAUSING:
	return "pausing";
    case SPDK_POLLER_STATE_PAUSED:
	return "paused";
    default:
	return NULL;
    }
}

uint64_t
spdk_poller_get_period_ticks(struct spdk_poller *poller)
{
    return poller->period_ticks;
}

void
spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats)
{
    stats->run_count = poller->run_count;
    stats->busy_count = poller->busy_count;
}

struct spdk_poller *
spdk_thread_get_first_active_poller(struct spdk_thread *thread)
{
    return TAILQ_FIRST(&thread->active_pollers);
}

struct spdk_poller *
spdk_thread_get_next_active_poller(struct spdk_poller *prev)
{
    return TAILQ_NEXT(prev, tailq);
}
struct spdk_poller *
spdk_thread_get_first_timed_poller(struct spdk_thread *thread)
{
    return RB_MIN(timed_pollers_tree, &thread->timed_pollers);
}

struct spdk_poller *
spdk_thread_get_next_timed_poller(struct spdk_poller *prev)
{
    return RB_NEXT(timed_pollers_tree, &prev->thread->timed_pollers, prev);
}

struct spdk_poller *
spdk_thread_get_first_paused_poller(struct spdk_thread *thread)
{
    return TAILQ_FIRST(&thread->paused_pollers);
}

struct spdk_poller *
spdk_thread_get_next_paused_poller(struct spdk_poller *prev)
{
    return TAILQ_NEXT(prev, tailq);
}

struct spdk_io_channel *
spdk_thread_get_first_io_channel(struct spdk_thread *thread)
{
    return RB_MIN(io_channel_tree, &thread->io_channels);
}

struct spdk_io_channel *
spdk_thread_get_next_io_channel(struct spdk_io_channel *prev)
{
    return RB_NEXT(io_channel_tree, &prev->thread->io_channels, prev);
}

struct call_thread {
    struct spdk_thread *cur_thread;
    spdk_msg_fn fn;
    void *ctx;

    struct spdk_thread *orig_thread;
    spdk_msg_fn cpl;
};

static void
_on_thread(void *ctx)
{
    struct call_thread *ct = ctx;
    int rc __attribute__((unused));

    ct->fn(ct->ctx);

    pthread_mutex_lock(&g_devlist_mutex);
    ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
    pthread_mutex_unlock(&g_devlist_mutex);

    if (!ct->cur_thread) {
	SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

	rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
	free(ctx);
    } else {
	SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
		      ct->cur_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
    }
    assert(rc == 0);
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
    struct call_thread *ct;
    struct spdk_thread *thread;
    int rc __attribute__((unused));

    ct = calloc(1, sizeof(*ct));
    if (!ct) {
	SPDK_ERRLOG("Unable to perform thread iteration\n");
	cpl(ctx);
	return;
    }

    ct->fn = fn;
    ct->ctx = ctx;
    ct->cpl = cpl;

    thread = _get_thread();
    if (!thread) {
	SPDK_ERRLOG("No thread allocated\n");
	free(ct);
	cpl(ctx);
	return;
    }
    ct->orig_thread = thread;

    pthread_mutex_lock(&g_devlist_mutex);
    ct->cur_thread = TAILQ_FIRST(&g_threads);
    pthread_mutex_unlock(&g_devlist_mutex);

    SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		  ct->orig_thread->name);

    rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
    assert(rc == 0);
}
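/*
 * Example: broadcasting a function to every registered thread. Illustrative
 * sketch; the example_* callbacks are hypothetical. fn runs once on each
 * thread in turn, then cpl runs back on the thread that started the
 * iteration (which must be an SPDK thread).
 */
#if 0
static void
example_on_each_thread(void *ctx)
{
    SPDK_NOTICELOG("running on %s\n", spdk_thread_get_name(spdk_get_thread()));
}

static void
example_done(void *ctx)
{
    SPDK_NOTICELOG("visited all threads\n");
}

static void
example_broadcast(void)
{
    spdk_for_each_thread(example_on_each_thread, NULL, example_done);
}
#endif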
static inline void
poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode)
{
    if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
	return;
    }

    if (!poller->set_intr_cb_fn) {
	SPDK_ERRLOG("Poller(%s) doesn't support setting interrupt mode.\n", poller->name);
	assert(false);
	return;
    }

    poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode);
}

void
spdk_thread_set_interrupt_mode(bool enable_interrupt)
{
    struct spdk_thread *thread = _get_thread();
    struct spdk_poller *poller, *tmp;

    assert(thread);
    assert(spdk_interrupt_mode_is_enabled());

    SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n",
		   thread->name, enable_interrupt ? "intr" : "poll",
		   thread->in_interrupt ? "intr" : "poll");

    if (thread->in_interrupt == enable_interrupt) {
	return;
    }

    /* Set pollers to the expected mode */
    RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
	poller_set_interrupt_mode(poller, enable_interrupt);
    }
    TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) {
	poller_set_interrupt_mode(poller, enable_interrupt);
    }
    /* All paused pollers will go to work in interrupt mode */
    TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) {
	poller_set_interrupt_mode(poller, enable_interrupt);
    }

    thread->in_interrupt = enable_interrupt;
    return;
}

static struct io_device *
io_device_get(void *io_device)
{
    struct io_device find = {};

    find.io_device = io_device;
    return RB_FIND(io_device_tree, &g_io_devices, &find);
}

void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
    struct io_device *dev, *tmp;
    struct spdk_thread *thread;

    assert(io_device != NULL);
    assert(create_cb != NULL);
    assert(destroy_cb != NULL);

    thread = spdk_get_thread();
    if (!thread) {
	SPDK_ERRLOG("called from non-SPDK thread\n");
	assert(false);
	return;
    }

    dev = calloc(1, sizeof(struct io_device));
    if (dev == NULL) {
	SPDK_ERRLOG("could not allocate io_device\n");
	return;
    }

    dev->io_device = io_device;
    if (name) {
	snprintf(dev->name, sizeof(dev->name), "%s", name);
    } else {
	snprintf(dev->name, sizeof(dev->name), "%p", dev);
    }
    dev->create_cb = create_cb;
    dev->destroy_cb = destroy_cb;
    dev->unregister_cb = NULL;
    dev->ctx_size = ctx_size;
    dev->for_each_count = 0;
    dev->unregistered = false;
    dev->refcnt = 0;

    SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
		  dev->name, dev->io_device, thread->name);

    pthread_mutex_lock(&g_devlist_mutex);
    tmp = RB_INSERT(io_device_tree, &g_io_devices, dev);
    if (tmp != NULL) {
	SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
		    io_device, tmp->name, dev->name);
	free(dev);
    }

    pthread_mutex_unlock(&g_devlist_mutex);
}

static void
_finish_unregister(void *arg)
{
    struct io_device *dev = arg;
    struct spdk_thread *thread;

    thread = spdk_get_thread();
    assert(thread == dev->unregister_thread);

    SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		  dev->name, dev->io_device, thread->name);

    assert(thread->pending_unregister_count > 0);
    thread->pending_unregister_count--;

    dev->unregister_cb(dev->io_device);
    free(dev);
}

static void
io_device_free(struct io_device *dev)
{
    int rc __attribute__((unused));

    if (dev->unregister_cb == NULL) {
	free(dev);
    } else {
	assert(dev->unregister_thread != NULL);
	SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);
	rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
	assert(rc == 0);
    }
}
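/*
 * Example: an io_device with a per-thread channel context. Illustrative
 * sketch only (error handling omitted); the example_* names are
 * hypothetical. The io_device pointer is just a unique tag, so the address
 * of a static variable works. spdk_io_channel_get_ctx() and
 * spdk_put_io_channel() are declared in spdk/thread.h.
 */
#if 0
struct example_ch_ctx {
    int stat;
};

static int g_example_dev;	/* &g_example_dev serves as the io_device tag */

static int
example_ch_create(void *io_device, void *ctx_buf)
{
    struct example_ch_ctx *ctx = ctx_buf;

    ctx->stat = 0;
    return 0;
}

static void
example_ch_destroy(void *io_device, void *ctx_buf)
{
}

static void
example_use(void)
{
    struct spdk_io_channel *ch;
    struct example_ch_ctx *ctx;

    spdk_io_device_register(&g_example_dev, example_ch_create,
			    example_ch_destroy, sizeof(struct example_ch_ctx),
			    "example_dev");

    /* Must run on an SPDK thread; reuses the thread's existing channel if any. */
    ch = spdk_get_io_channel(&g_example_dev);
    ctx = spdk_io_channel_get_ctx(ch);
    ctx->stat++;

    spdk_put_io_channel(ch);
    spdk_io_device_unregister(&g_example_dev, NULL);
}
#endif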
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	assert(thread == dev->unregister_thread);

	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	assert(thread->pending_unregister_count > 0);
	thread->pending_unregister_count--;

	dev->unregister_cb(dev->io_device);
	free(dev);
}

static void
io_device_free(struct io_device *dev)
{
	int rc __attribute__((unused));

	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
		assert(rc == 0);
	}
}

void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	RB_REMOVE(io_device_tree, &g_io_devices, dev);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (unregister_cb) {
		thread->pending_unregister_count++;
	}

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}

const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}
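/*
 * Usage sketch (editor's illustration): since channels may still hold
 * references when spdk_io_device_unregister() is called, final teardown
 * belongs in the unregister callback, which runs only after the device's
 * refcount drops to zero. my_dev_unregistered() is hypothetical.
 *
 *	static void
 *	my_dev_unregistered(void *io_device)
 *	{
 *		// all channels are gone; device state may be freed here
 *	}
 *
 *	spdk_io_device_unregister(&g_my_dev, my_dev_unregistered);
 */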
static struct spdk_io_channel *
thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev)
{
	struct spdk_io_channel find = {};

	find.dev = dev;
	return RB_FIND(io_channel_tree, &thread->io_channels, &find);
}

struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch = thread_get_io_channel(thread, dev);
	if (ch != NULL) {
		ch->ref++;

		SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
			      ch, dev->name, dev->io_device, thread->name, ch->ref);

		/*
		 * An I/O channel already exists for this device on this
		 * thread, so return it.
		 */
		pthread_mutex_unlock(&g_devlist_mutex);
		spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0,
				  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
		return ch;
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	RB_INSERT(io_channel_tree, &thread->io_channels, ch);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1);
	return ch;
}
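/*
 * Usage sketch (editor's illustration): channels are per-thread and
 * reference counted, so each spdk_get_io_channel() must be balanced by a
 * spdk_put_io_channel() on the same thread. struct my_channel_ctx and
 * g_my_dev are hypothetical.
 *
 *	struct spdk_io_channel *ch;
 *	struct my_channel_ctx *ctx;
 *
 *	ch = spdk_get_io_channel(&g_my_dev);	// creates or references this
 *						// thread's channel
 *	if (ch != NULL) {
 *		ctx = spdk_io_channel_get_ctx(ch);
 *		// ... submit I/O through ctx ...
 *		spdk_put_io_channel(ch);	// actual destruction is deferred
 *						// to a message on this thread
 *	}
 */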
static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0,
			  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		wrong_thread(__func__, "ch", ch->thread, thread);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

void *
spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
{
	return ch->dev->io_device;
}

const char *
spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch)
{
	return spdk_io_device_get_name(ch->dev);
}

int
spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
{
	return ch->ref;
}
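/*
 * Layout note (editor's illustration): the per-thread context buffer is
 * allocated immediately after struct spdk_io_channel, which is why
 * spdk_io_channel_from_ctx() can recover the channel by subtracting
 * sizeof(struct spdk_io_channel) from the context pointer:
 *
 *	void *ctx = spdk_io_channel_get_ctx(ch);
 *
 *	assert(spdk_io_channel_from_ctx(ctx) == ch);
 */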
struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute. If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	ch = thread_get_io_channel(i->cur_thread, i->dev);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;
	i->orig_thread = _get_thread();

	pthread_mutex_lock(&g_devlist_mutex);
	i->dev = io_device_get(io_device);
	if (i->dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		assert(false);
		goto end;
	}

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		ch = thread_get_io_channel(thread, i->dev);
		if (ch != NULL) {
			ch->dev->for_each_count++;
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
	}

end:
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		ch = thread_get_io_channel(thread, i->dev);
		if (ch != NULL) {
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
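/*
 * Usage sketch (editor's illustration): each visited channel must call
 * spdk_for_each_channel_continue() exactly once, possibly asynchronously;
 * the first non-zero status stops the iteration and is reported to the
 * completion, which always runs on the originating thread.
 * flush_one_channel() and flush_done() are hypothetical.
 *
 *	static void
 *	flush_one_channel(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *		struct my_channel_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *
 *		// ... flush state in ctx ..., then advance the iteration
 *		spdk_for_each_channel_continue(i, 0);
 *	}
 *
 *	static void
 *	flush_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		// status is the first non-zero status passed to continue, or 0
 *	}
 *
 *	spdk_for_each_channel(&g_my_dev, flush_one_channel, NULL, flush_done);
 */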
struct spdk_interrupt {
	int efd;
	struct spdk_thread *thread;
	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	if (thread->msg_fd < 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);
	thread->msg_fd = -1;

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}

#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;
	uint64_t notify = 1;

	assert(spdk_interrupt_mode_is_enabled());

	/* There may be a race between this consumer's msg_acknowledge and
	 * another producer's msg_notify, so acknowledge first and only then
	 * check for this thread's own msg_notify. This avoids missing a msg
	 * notification.
	 */
	rc = read(thread->msg_fd, &notify, sizeof(notify));
	if (rc < 0 && errno != EAGAIN) {
		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
	}

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	return rc;
}

static int
thread_interrupt_create(struct spdk_thread *thread)
{
	int rc;

	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);

	rc = spdk_fd_group_create(&thread->fgrp);
	if (rc) {
		return rc;
	}

	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (thread->msg_fd < 0) {
		rc = -errno;
		spdk_fd_group_destroy(thread->fgrp);
		thread->fgrp = NULL;

		return rc;
	}

	return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd,
				 thread_interrupt_msg_process, thread);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
#endif

struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
			void *arg, const char *name)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;
	int ret;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name);
	if (ret != 0) {
		SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n",
			    thread->name, efd, spdk_strerror(-ret));
		return NULL;
	}

	intr = calloc(1, sizeof(*intr));
	if (intr == NULL) {
		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(intr->name, sizeof(intr->name), "%s", name);
	} else {
		snprintf(intr->name, sizeof(intr->name), "%p", fn);
	}

	intr->efd = efd;
	intr->thread = thread;

	return intr;
}

void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	*pintr = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}

int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}
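/*
 * Usage sketch (editor's illustration): an interrupt source is any pollable
 * fd serviced by a callback on the registering thread; the callback returns
 * non-zero when it did work, mirroring thread_interrupt_msg_process() above.
 * my_event_handler() and g_my_efd are hypothetical.
 *
 *	static int g_my_efd;
 *
 *	static int
 *	my_event_handler(void *arg)
 *	{
 *		uint64_t val;
 *
 *		read(g_my_efd, &val, sizeof(val));	// acknowledge the event
 *		// ... process the event ...
 *		return 1;
 *	}
 *
 *	struct spdk_interrupt *intr;
 *
 *	g_my_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 *	intr = spdk_interrupt_register(g_my_efd, my_event_handler, NULL, "my_intr");
 *	// ... run in interrupt mode ...
 *	spdk_interrupt_unregister(&intr);
 *	close(g_my_efd);
 */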
static bool g_interrupt_mode = false;

int
spdk_interrupt_mode_enable(void)
{
	/* This must be called once, prior to initializing the threading library.
	 * g_spdk_msg_mempool will be valid if the thread library has been initialized.
	 */
	if (g_spdk_msg_mempool) {
		SPDK_ERRLOG("Failed: the threading library is already initialized.\n");
		return -1;
	}

#ifdef __linux__
	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
	g_interrupt_mode = true;
	return 0;
#else
	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
	g_interrupt_mode = false;
	return -ENOTSUP;
#endif
}

bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}

SPDK_LOG_REGISTER_COMPONENT(thread)
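/*
 * Ordering sketch (editor's illustration): interrupt mode must be selected
 * before the thread library is initialized, e.g. early in application
 * startup:
 *
 *	if (spdk_interrupt_mode_enable() != 0) {
 *		// fall back to polled mode, or abort, per application policy
 *	}
 *	spdk_thread_lib_init(NULL, 0);	// only after the mode is chosen
 */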