/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/trace.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"
#include "thread_internal.h"

#include "spdk_internal/trace_defs.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

#define SPDK_MSG_BATCH_SIZE 8
#define SPDK_MAX_DEVICE_NAME_LEN 256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC 5
#define SPDK_MAX_POLLER_NAME_LEN 256
#define SPDK_MAX_THREAD_NAME_LEN 256

enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,

	/* The poller is in the process of being paused. It will be paused
	 * the next time it is due to be executed.
	 */
	SPDK_POLLER_STATE_PAUSING,

	/* The poller is registered but currently paused. It's on the
	 * paused_pollers list.
	 */
	SPDK_POLLER_STATE_PAUSED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller) tailq;
	RB_ENTRY(spdk_poller) node;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state state;

	uint64_t period_ticks;
	uint64_t next_run_tick;
	uint64_t run_count;
	uint64_t busy_count;
	spdk_poller_fn fn;
	void *arg;
	struct spdk_thread *thread;
	/* Native interruptfd for periodic or busy poller */
	int interruptfd;
	spdk_poller_set_interrupt_mode_cb set_intr_cb_fn;
	void *set_intr_cb_arg;

	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_thread_state {
	/* The thread is processing pollers and messages via spdk_thread_poll(). */
	SPDK_THREAD_STATE_RUNNING,

	/* The thread is in the process of termination. It is reaping unregistered
	 * pollers and releasing I/O channels.
	 */
	SPDK_THREAD_STATE_EXITING,

	/* The thread has exited. It is ready for spdk_thread_destroy(). */
	SPDK_THREAD_STATE_EXITED,
};

struct spdk_thread {
	uint64_t tsc_last;
	struct spdk_thread_stats stats;
	/*
	 * Contains pollers actively running on this thread. Pollers
	 * are run round-robin. The thread takes one poller from the head
	 * of the ring, executes it, then puts it back at the tail of
	 * the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers;
	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers;
	struct spdk_poller *first_timed_poller;
	/*
	 * Contains paused pollers. Pollers on this queue are waiting until
	 * they are resumed (in which case they're put onto the active/timer
	 * queues) or unregistered.
	 */
	TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers;
	struct spdk_ring *messages;
	int msg_fd;
	SLIST_HEAD(, spdk_msg) msg_cache;
	size_t msg_cache_count;
	spdk_msg_fn critical_msg;
	uint64_t id;
	enum spdk_thread_state state;
	int pending_unregister_count;

	RB_HEAD(io_channel_tree, spdk_io_channel) io_channels;
	TAILQ_ENTRY(spdk_thread) tailq;

	char name[SPDK_MAX_THREAD_NAME_LEN + 1];
	struct spdk_cpuset cpumask;
	uint64_t exit_timeout_tsc;

	/* Indicates whether this spdk_thread currently runs in interrupt mode. */
	bool in_interrupt;
	bool poller_unregistered;
	struct spdk_fd_group *fgrp;

	/* User context allocated at the end */
	uint8_t ctx[0];
};

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
static size_t g_ctx_sz = 0;
/* A monotonically increasing ID is assigned to each created thread, beginning at 1.
 * Once the ID overflows UINT64_MAX, further thread creation is not allowed and
 * the SPDK application must be restarted.
 */
static uint64_t g_thread_id = 1;

struct io_device {
	void *io_device;
	char name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb create_cb;
	spdk_io_channel_destroy_cb destroy_cb;
	spdk_io_device_unregister_cb unregister_cb;
	struct spdk_thread *unregister_thread;
	uint32_t ctx_size;
	uint32_t for_each_count;
	RB_ENTRY(io_device) node;

	uint32_t refcnt;

	bool unregistered;
};

static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices);

static int
io_device_cmp(struct io_device *dev1, struct io_device *dev2)
{
	return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device);
}

RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp);

static int
io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2)
{
	return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev);
}

RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp);

struct spdk_msg {
	spdk_msg_fn fn;
	void *arg;

	SLIST_ENTRY(spdk_msg) link;
};

#define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static __thread struct spdk_thread *tls_thread = NULL;

SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD)
{
	spdk_trace_register_description("THREAD_IOCH_GET",
					TRACE_THREAD_IOCH_GET,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
	spdk_trace_register_description("THREAD_IOCH_PUT",
					TRACE_THREAD_IOCH_PUT,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
}

/*
 * If this compare function returned zero when two next_run_ticks are equal,
 * the macro RB_INSERT() would return a pointer to the existing element with
 * the same next_run_tick and the new element would not be inserted.
 *
 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the
 * element to remove as a parameter.
 *
 * Hence we allow RB_INSERT() to insert elements with equal keys on the right
 * side by returning 1 when two next_run_ticks are equal.
 */
static inline int
timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2)
{
	if (poller1->next_run_tick < poller2->next_run_tick) {
		return -1;
	} else {
		return 1;
	}
}

RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare);
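
/*
 * Illustrative sketch (not part of the library): with the comparator above,
 * two pollers with equal next_run_tick values can coexist in the tree.
 *
 *	struct spdk_poller a = { .next_run_tick = 100 };
 *	struct spdk_poller b = { .next_run_tick = 100 };
 *
 *	RB_INSERT(timed_pollers_tree, &tree, &a);	// returns NULL, a inserted
 *	RB_INSERT(timed_pollers_tree, &tree, &b);	// also returns NULL, b lands
 *							// to the right of a
 *
 * Had the comparator returned 0 for equal keys, the second RB_INSERT() would
 * return &a and b would never enter the tree.
 */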

static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}

static int
_thread_lib_init(size_t ctx_sz)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	g_ctx_sz = ctx_sz;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			     sizeof(struct spdk_msg),
			     0, /* No cache. We do our own. */
			     SPDK_ENV_SOCKET_ID_ANY);

	if (!g_spdk_msg_mempool) {
		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
		return -1;
	}

	return 0;
}

int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);

	if (new_thread_fn == NULL) {
		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
	} else {
		g_new_thread_fn = new_thread_fn;
	}

	return _thread_lib_init(ctx_sz);
}

int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
			 spdk_thread_op_supported_fn thread_op_supported_fn,
			 size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);
	assert(g_thread_op_supported_fn == NULL);

	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
		SPDK_ERRLOG("thread_op_fn and thread_op_supported_fn must be defined or undefined together\n");
		return -EINVAL;
	}

	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
	} else {
		g_thread_op_fn = thread_op_fn;
		g_thread_op_supported_fn = thread_op_supported_fn;
	}

	return _thread_lib_init(ctx_sz);
}

void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	RB_FOREACH(dev, io_device_tree, &g_io_devices) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}

	g_new_thread_fn = NULL;
	g_thread_op_fn = NULL;
	g_thread_op_supported_fn = NULL;
	g_ctx_sz = 0;
}
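
/*
 * Illustrative usage sketch (not part of the library): bootstrapping the
 * library and creating a thread, assuming no framework-specific hooks are
 * needed.
 *
 *	spdk_thread_lib_init(NULL, 0);
 *	struct spdk_thread *thread = spdk_thread_create("app_thread", NULL);
 *	spdk_set_thread(thread);
 *	...
 *	spdk_thread_lib_fini();
 */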

static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
				     poller->name);
		}
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (spdk_interrupt_mode_is_enabled()) {
		thread_interrupt_destroy(thread);
	}

	spdk_ring_free(thread->messages);
	free(thread);
}

struct spdk_thread *
spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc = 0, i;

	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	RB_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	RB_INIT(&thread->timed_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	if (g_thread_id == 0) {
		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		_free_thread(thread);
		return NULL;
	}
	thread->id = g_thread_id++;
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
		      thread->id, thread->name);

	if (spdk_interrupt_mode_is_enabled()) {
		thread->in_interrupt = true;
		rc = thread_interrupt_create(thread);
		if (rc != 0) {
			_free_thread(thread);
			return NULL;
		}
	}

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
	}

	if (rc != 0) {
		_free_thread(thread);
		return NULL;
	}

	thread->state = SPDK_THREAD_STATE_RUNNING;

	return thread;
}

void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}

static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s timed out waiting to exit; moving it to the exited state forcefully\n",
			    thread->name);
		goto exited;
	}

	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
		SPDK_INFOLOG(thread,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

	if (thread->pending_unregister_count > 0) {
		SPDK_INFOLOG(thread,
			     "thread %s is still unregistering io_devices\n",
			     thread->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
}

int
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
		SPDK_INFOLOG(thread,
			     "thread %s is already exiting\n",
			     thread->name);
		return 0;
	}

	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
	thread->state = SPDK_THREAD_STATE_EXITING;
	return 0;
}

bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_EXITED;
}

void
spdk_thread_destroy(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);

	assert(thread->state == SPDK_THREAD_STATE_EXITED);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}
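
/*
 * Illustrative teardown sketch (not part of the library): spdk_thread_exit()
 * only marks the thread EXITING; the owner must keep polling until the thread
 * has drained its pollers and channels (or the exit timeout fires) before it
 * may be destroyed.
 *
 *	spdk_thread_exit(thread);
 *	while (!spdk_thread_is_exited(thread)) {
 *		spdk_thread_poll(thread, 0, 0);
 *	}
 *	spdk_thread_destroy(thread);
 */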

void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
	if (g_ctx_sz > 0) {
		return thread->ctx;
	}

	return NULL;
}

struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}

int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;

	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
		assert(false);
		return -ENOTSUP;
	}

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Called from non-SPDK thread\n");
		assert(false);
		return -EINVAL;
	}

	spdk_cpuset_copy(&thread->cpumask, cpumask);

	/* Invoke the framework's reschedule operation. If this function is called
	 * multiple times in a single spdk_thread_poll() context, the last cpumask
	 * will be used in the reschedule operation.
	 */
	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

	return 0;
}

struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}

static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];
	uint64_t notify = 1;
	int rc;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (spdk_unlikely(thread->in_interrupt) &&
	    spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}
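
/*
 * Illustrative sketch (not part of the library) of a message round trip:
 * spdk_thread_send_msg() enqueues an spdk_msg onto thread->messages, and
 * msg_queue_run_batch() above dequeues and runs it on the target thread.
 *
 *	static void
 *	say_hello(void *ctx)
 *	{
 *		printf("hello from %s\n", spdk_thread_get_name(spdk_get_thread()));
 *	}
 *
 *	spdk_thread_send_msg(target, say_hello, NULL);
 *	spdk_thread_poll(target, 0, 0);	// on the target thread; runs say_hello()
 */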

static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *tmp __attribute__((unused));

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers tree by next scheduled run time
	 * as its key.
	 */
	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp == NULL);

	/* Update the cache only if it is empty or the inserted poller is earlier than it.
	 * RB_MIN() is not necessary here because all pollers that have exactly the same
	 * next_run_tick as the existing poller are inserted on the right side.
	 */
	if (thread->first_timed_poller == NULL ||
	    poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
		thread->first_timed_poller = poller;
	}
}

static inline void
poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
{
	struct spdk_poller *tmp __attribute__((unused));

	tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp != NULL);

	/* This function is not used in any case that is performance critical.
	 * Update the cache simply by RB_MIN() if it needs to be changed.
	 */
	if (thread->first_timed_poller == poller) {
		thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
	}
}

static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store end time to use it as start time of the next spdk_thread_poll(). */
	thread->tsc_last = end;
}

static inline int
thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_WAITING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}
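
/*
 * Illustrative sketch (not part of the library): a poller's return value
 * feeds the busy/idle accounting above. By convention a poller returns a
 * positive value when it did work and zero when it found nothing to do
 * (the SPDK_POLLER_BUSY/SPDK_POLLER_IDLE helpers in spdk/thread.h), and a
 * negative value on error. "my_queue" below is a hypothetical user type.
 *
 *	static int
 *	drain_queue(void *arg)
 *	{
 *		struct my_queue *q = arg;
 *
 *		return my_queue_process(q) > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
 *	}
 */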

static inline int
thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
			    uint64_t now)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
	/* fallthrough */
	case SPDK_POLLER_STATE_WAITING:
		poller_insert_timer(thread, poller, now);
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}

static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	thread->tsc_last = now;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		poller_rc = thread_execute_poller(thread, poller);
		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	poller = thread->first_timed_poller;
	while (poller != NULL) {
		int timer_rc = 0;

		if (now < poller->next_run_tick) {
			break;
		}

		tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);

		/* Update the cache to the next timed poller in the list
		 * only if the current poller is still the closest, otherwise,
		 * do nothing because the cache has been already updated.
		 */
		if (thread->first_timed_poller == poller) {
			thread->first_timed_poller = tmp;
		}

		timer_rc = thread_execute_timed_poller(thread, poller, now);
		if (timer_rc > rc) {
			rc = timer_rc;
		}

		poller = tmp;
	}

	return rc;
}

int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	struct spdk_thread *orig_thread;
	int rc;
	uint64_t notify = 1;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	if (spdk_likely(!thread->in_interrupt)) {
		rc = thread_poll(thread, max_msgs, now);
		if (spdk_unlikely(thread->in_interrupt)) {
			/* The thread transitioned to interrupt mode during the above poll.
			 * Poll it one more time in case a msg arrived during the transition
			 * without notification.
			 */
			rc = thread_poll(thread, max_msgs, now);
		}
	} else {
		/* Non-blocking wait on the thread's fd_group */
		rc = spdk_fd_group_wait(thread->fgrp, 0);
		if (spdk_unlikely(!thread->in_interrupt)) {
			/* The thread transitioned to poll mode in a msg during the above processing.
			 * Clear msg_fd since thread messages will be polled directly in poll mode.
			 */
			rc = read(thread->msg_fd, &notify, sizeof(notify));
			if (rc < 0 && errno != EAGAIN) {
				SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
			}
		}

		/* Reap unregistered pollers out of poller execution in intr mode */
		if (spdk_unlikely(thread->poller_unregistered)) {
			struct spdk_poller *poller, *tmp;

			TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
						   active_pollers_head, tailq, tmp) {
				if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
					TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
					free(poller);
				}
			}

			RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
				if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
					poller_remove_timer(thread, poller);
					free(poller);
				}
			}

			thread->poller_unregistered = false;
		}
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
		thread_exit(thread, now);
	}

	thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}
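
/*
 * Illustrative reactor sketch (not part of the library): a framework drives
 * each spdk_thread by calling spdk_thread_poll() in a loop, passing 0 for
 * max_msgs (use the default batch size) and 0 for now (sample the TSC
 * internally).
 *
 *	while (running) {
 *		spdk_thread_poll(thread, 0, 0);
 *	}
 */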

uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = thread->first_timed_poller;
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}

int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}

static bool
thread_has_unpaused_pollers(struct spdk_thread *thread)
{
	if (TAILQ_EMPTY(&thread->active_pollers) &&
	    RB_EMPTY(&thread->timed_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
	if (!thread_has_unpaused_pollers(thread) &&
	    TAILQ_EMPTY(&thread->paused_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
	if (spdk_ring_count(thread->messages) ||
	    thread_has_unpaused_pollers(thread) ||
	    thread->critical_msg != NULL) {
		return false;
	}

	return true;
}

uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return the cached value of the current thread count. We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}

struct spdk_thread *
spdk_get_thread(void)
{
	return _get_thread();
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

uint64_t
spdk_thread_get_id(const struct spdk_thread *thread)
{
	return thread->id;
}

struct spdk_thread *
spdk_thread_get_by_id(uint64_t id)
{
	struct spdk_thread *thread;

	if (id == 0 || id >= g_thread_id) {
		SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
		return NULL;
	}
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		if (thread->id == id) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);
	return thread;
}

int
spdk_thread_get_stats(struct spdk_thread_stats *stats)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		return -EINVAL;
	}

	if (stats == NULL) {
		return -EINVAL;
	}

	*stats = thread->stats;

	return 0;
}

uint64_t
spdk_thread_get_last_tsc(struct spdk_thread *thread)
{
	if (thread == NULL) {
		thread = _get_thread();
	}

	return thread->tsc_last;
}

static inline int
thread_send_msg_notification(const struct spdk_thread *target_thread)
{
	uint64_t notify = 1;
	int rc;

	/* Not necessary to do notification if the interrupt facility is not enabled */
	if (spdk_likely(!spdk_interrupt_mode_is_enabled())) {
		return 0;
	}

	/* Since each spdk_thread can switch between poll and interrupt mode
	 * dynamically, it is necessary after sending a thread msg to check whether
	 * the target thread runs in interrupt mode, and then decide whether to do
	 * the event notification.
	 */
	if (spdk_unlikely(target_thread->in_interrupt)) {
		rc = write(target_thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
			return -EIO;
		}
	}

	return 0;
}

int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
	spdk_msg_fn expected = NULL;

	if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
					 __ATOMIC_SEQ_CST)) {
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

#ifdef __linux__
static int
interrupt_timerfd_process(void *arg)
{
	struct spdk_poller *poller = arg;
	uint64_t exp;
	int rc;

	/* clear the level of the interval timer */
	rc = read(poller->interruptfd, &exp, sizeof(exp));
	if (rc < 0) {
		/* read() reports failure via errno, not its return value */
		if (errno == EAGAIN) {
			return 0;
		}

		return -errno;
	}

	return poller->fn(poller->arg);
}

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	struct spdk_fd_group *fgrp = poller->thread->fgrp;
	int timerfd;
	int rc;

	SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
	if (timerfd < 0) {
		return -errno;
	}

	rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller);
	if (rc < 0) {
		close(timerfd);
		return rc;
	}

	poller->interruptfd = timerfd;
	return 0;
}
"interrupt" : "poll"); 1311 1312 if (interrupt_mode) { 1313 /* Set repeated timer expiration */ 1314 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1315 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1316 1317 /* Update next timer expiration */ 1318 if (poller->next_run_tick == 0) { 1319 poller->next_run_tick = now_tick + poller->period_ticks; 1320 } else if (poller->next_run_tick < now_tick) { 1321 poller->next_run_tick = now_tick; 1322 } 1323 1324 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1325 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1326 1327 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1328 if (ret < 0) { 1329 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1330 assert(false); 1331 } 1332 } else { 1333 /* Disarm the timer */ 1334 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1335 if (ret < 0) { 1336 /* timerfd_settime's failure indicates that the timerfd is in error */ 1337 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1338 assert(false); 1339 } 1340 1341 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1342 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1343 */ 1344 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1345 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1346 poller_remove_timer(poller->thread, poller); 1347 poller_insert_timer(poller->thread, poller, now_tick); 1348 } 1349 } 1350 1351 static void 1352 poller_interrupt_fini(struct spdk_poller *poller) 1353 { 1354 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1355 assert(poller->interruptfd >= 0); 1356 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1357 close(poller->interruptfd); 1358 poller->interruptfd = -1; 1359 } 1360 1361 static int 1362 busy_poller_interrupt_init(struct spdk_poller *poller) 1363 { 1364 int busy_efd; 1365 int rc; 1366 1367 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1368 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1369 if (busy_efd < 0) { 1370 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1371 return -errno; 1372 } 1373 1374 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, 1375 poller->fn, poller->arg, poller->name); 1376 if (rc < 0) { 1377 close(busy_efd); 1378 return rc; 1379 } 1380 1381 poller->interruptfd = busy_efd; 1382 return 0; 1383 } 1384 1385 static void 1386 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1387 { 1388 int busy_efd = poller->interruptfd; 1389 uint64_t notify = 1; 1390 int rc __attribute__((unused)); 1391 1392 assert(busy_efd >= 0); 1393 1394 if (interrupt_mode) { 1395 /* Write without read on eventfd will get it repeatedly triggered. */ 1396 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1397 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1398 } 1399 } else { 1400 /* Read on eventfd will clear its level triggering. 

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
	SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name);
	assert(poller->interruptfd >= 0);
	spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd);
	close(poller->interruptfd);
	poller->interruptfd = -1;
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	int busy_efd;
	int rc;

	SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name);
	busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (busy_efd < 0) {
		SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name);
		return -errno;
	}

	rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd,
			       poller->fn, poller->arg, poller->name);
	if (rc < 0) {
		close(busy_efd);
		return rc;
	}

	poller->interruptfd = busy_efd;
	return 0;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	int busy_efd = poller->interruptfd;
	uint64_t notify = 1;
	int rc __attribute__((unused));

	assert(busy_efd >= 0);

	if (interrupt_mode) {
		/* Writing to the eventfd without reading it keeps it repeatedly triggered. */
		if (write(busy_efd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name);
		}
	} else {
		/* Reading the eventfd clears its level triggering. */
		rc = read(busy_efd, &notify, sizeof(notify));
	}
}

#else

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

#endif

void
spdk_poller_register_interrupt(struct spdk_poller *poller,
			       spdk_poller_set_interrupt_mode_cb cb_fn,
			       void *cb_arg)
{
	assert(poller != NULL);
	assert(cb_fn != NULL);
	assert(spdk_get_thread() == poller->thread);

	if (!spdk_interrupt_mode_is_enabled()) {
		return;
	}

	/* When a poller is created we don't know if the user is ever going to
	 * enable interrupts on it by calling this function, so the poller
	 * registration function has to immediately create an interruptfd.
	 * When this function does get called by the user, we have to then destroy
	 * that interruptfd.
	 */
	if (poller->set_intr_cb_fn && poller->interruptfd >= 0) {
		poller_interrupt_fini(poller);
	}

	poller->set_intr_cb_fn = cb_fn;
	poller->set_intr_cb_arg = cb_arg;

	/* Set the poller into interrupt mode if the thread is in interrupt. */
	if (poller->thread->in_interrupt) {
		poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
	}
}

static uint64_t
convert_us_to_ticks(uint64_t us)
{
	uint64_t quotient, remainder, ticks;

	if (us) {
		quotient = us / SPDK_SEC_TO_USEC;
		remainder = us % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		return 0;
	}
}
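
/*
 * Worked example (illustrative only): with spdk_get_ticks_hz() == 2400000000
 * (a 2.4 GHz TSC) and period_microseconds == 1500,
 *
 *	quotient  = 1500 / SPDK_SEC_TO_USEC = 0
 *	remainder = 1500
 *	result    = 2400000000 * 0 + (2400000000 * 1500) / 1000000 = 3600000
 *
 * The quotient/remainder split keeps ticks * us from overflowing for large
 * periods while preserving full precision for the sub-second part.
 */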

static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;
	poller->interruptfd = -1;

	poller->period_ticks = convert_us_to_ticks(period_microseconds);

	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (period_microseconds) {
			rc = period_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL);
		} else {
			/* If the poller doesn't have a period, create an interruptfd that's
			 * always busy automatically when running in interrupt mode.
			 */
			rc = busy_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL);
		}
	}

	thread_insert_poller(thread, poller);

	return poller;
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}

static void
wrong_thread(const char *func, const char *name, struct spdk_thread *thread,
	     struct spdk_thread *curthread)
{
	SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be "
		    "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id,
		    thread->name, thread->id);
	assert(false);
}

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		/* Release the interrupt resource for periodic or busy pollers */
		if (poller->interruptfd >= 0) {
			poller_interrupt_fini(poller);
		}

		/* Mark that a poller was unregistered. Unregistered pollers will
		 * then get reaped by spdk_thread_poll() also in intr mode.
		 */
		thread->poller_unregistered = true;
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}

void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	/* We just set its state to SPDK_POLLER_STATE_PAUSING and let
	 * spdk_thread_poll() move it. This allows a poller to be paused from
	 * another poller's context without breaking the TAILQ_FOREACH_REVERSE_SAFE
	 * iteration, or from within itself without breaking the logic to always
	 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration.
	 */
	switch (poller->state) {
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_PAUSING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
	case SPDK_POLLER_STATE_WAITING:
		poller->state = SPDK_POLLER_STATE_PAUSING;
		break;
	default:
		assert(false);
		break;
	}
}

void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		wrong_thread(__func__, poller->name, poller->thread, thread);
		return;
	}

	/* If a poller is paused it has to be removed from the paused pollers
	 * list and put on the active list or timer tree depending on its
	 * period_ticks. If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list or tree.
	 */
	switch (poller->state) {
	case SPDK_POLLER_STATE_PAUSED:
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		thread_insert_poller(thread, poller);
	/* fallthrough */
	case SPDK_POLLER_STATE_PAUSING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	case SPDK_POLLER_STATE_RUNNING:
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}
}

const char *
spdk_poller_get_name(struct spdk_poller *poller)
{
	return poller->name;
}

const char *
spdk_poller_get_state_str(struct spdk_poller *poller)
{
	switch (poller->state) {
	case SPDK_POLLER_STATE_WAITING:
		return "waiting";
	case SPDK_POLLER_STATE_RUNNING:
		return "running";
	case SPDK_POLLER_STATE_UNREGISTERED:
		return "unregistered";
	case SPDK_POLLER_STATE_PAUSING:
		return "pausing";
	case SPDK_POLLER_STATE_PAUSED:
		return "paused";
	default:
		return NULL;
	}
}

uint64_t
spdk_poller_get_period_ticks(struct spdk_poller *poller)
{
	return poller->period_ticks;
}

void
spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats)
{
	stats->run_count = poller->run_count;
	stats->busy_count = poller->busy_count;
}

struct spdk_poller *
spdk_thread_get_first_active_poller(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->active_pollers);
}

struct spdk_poller *
spdk_thread_get_next_active_poller(struct spdk_poller *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

struct spdk_poller *
spdk_thread_get_first_timed_poller(struct spdk_thread *thread)
{
	return RB_MIN(timed_pollers_tree, &thread->timed_pollers);
}

struct spdk_poller *
spdk_thread_get_next_timed_poller(struct spdk_poller *prev)
{
	return RB_NEXT(timed_pollers_tree, &prev->thread->timed_pollers, prev);
}

struct spdk_poller *
spdk_thread_get_first_paused_poller(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->paused_pollers);
}

struct spdk_poller *
spdk_thread_get_next_paused_poller(struct spdk_poller *prev)
{
	return TAILQ_NEXT(prev, tailq);
}
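
/*
 * Illustrative sketch (not part of the library): the paired get_first/get_next
 * accessors above let monitoring code walk a thread's pollers without touching
 * the underlying containers.
 *
 *	struct spdk_poller *p;
 *
 *	for (p = spdk_thread_get_first_timed_poller(thread); p != NULL;
 *	     p = spdk_thread_get_next_timed_poller(p)) {
 *		printf("%s\n", spdk_poller_get_name(p));
 *	}
 */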

struct spdk_io_channel *
spdk_thread_get_first_io_channel(struct spdk_thread *thread)
{
	return RB_MIN(io_channel_tree, &thread->io_channels);
}

struct spdk_io_channel *
spdk_thread_get_next_io_channel(struct spdk_io_channel *prev)
{
	return RB_NEXT(io_channel_tree, &prev->thread->io_channels, prev);
}

struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};

static void
_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	assert(rc == 0);
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}

static inline void
poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode)
{
	if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
		return;
	}

	if (!poller->set_intr_cb_fn) {
		SPDK_ERRLOG("Poller(%s) doesn't support setting interrupt mode.\n", poller->name);
		assert(false);
		return;
	}

	poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode);
}
"intr" : "poll"); 1896 1897 if (thread->in_interrupt == enable_interrupt) { 1898 return; 1899 } 1900 1901 /* Set pollers to expected mode */ 1902 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1903 poller_set_interrupt_mode(poller, enable_interrupt); 1904 } 1905 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 1906 poller_set_interrupt_mode(poller, enable_interrupt); 1907 } 1908 /* All paused pollers will go to work in interrupt mode */ 1909 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 1910 poller_set_interrupt_mode(poller, enable_interrupt); 1911 } 1912 1913 thread->in_interrupt = enable_interrupt; 1914 return; 1915 } 1916 1917 static struct io_device * 1918 io_device_get(void *io_device) 1919 { 1920 struct io_device find = {}; 1921 1922 find.io_device = io_device; 1923 return RB_FIND(io_device_tree, &g_io_devices, &find); 1924 } 1925 1926 void 1927 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1928 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1929 const char *name) 1930 { 1931 struct io_device *dev, *tmp; 1932 struct spdk_thread *thread; 1933 1934 assert(io_device != NULL); 1935 assert(create_cb != NULL); 1936 assert(destroy_cb != NULL); 1937 1938 thread = spdk_get_thread(); 1939 if (!thread) { 1940 SPDK_ERRLOG("called from non-SPDK thread\n"); 1941 assert(false); 1942 return; 1943 } 1944 1945 dev = calloc(1, sizeof(struct io_device)); 1946 if (dev == NULL) { 1947 SPDK_ERRLOG("could not allocate io_device\n"); 1948 return; 1949 } 1950 1951 dev->io_device = io_device; 1952 if (name) { 1953 snprintf(dev->name, sizeof(dev->name), "%s", name); 1954 } else { 1955 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1956 } 1957 dev->create_cb = create_cb; 1958 dev->destroy_cb = destroy_cb; 1959 dev->unregister_cb = NULL; 1960 dev->ctx_size = ctx_size; 1961 dev->for_each_count = 0; 1962 dev->unregistered = false; 1963 dev->refcnt = 0; 1964 1965 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 1966 dev->name, dev->io_device, thread->name); 1967 1968 pthread_mutex_lock(&g_devlist_mutex); 1969 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 1970 if (tmp != NULL) { 1971 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1972 io_device, tmp->name, dev->name); 1973 pthread_mutex_unlock(&g_devlist_mutex); 1974 free(dev); 1975 } 1976 1977 pthread_mutex_unlock(&g_devlist_mutex); 1978 } 1979 1980 static void 1981 _finish_unregister(void *arg) 1982 { 1983 struct io_device *dev = arg; 1984 struct spdk_thread *thread; 1985 1986 thread = spdk_get_thread(); 1987 assert(thread == dev->unregister_thread); 1988 1989 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 1990 dev->name, dev->io_device, thread->name); 1991 1992 assert(thread->pending_unregister_count > 0); 1993 thread->pending_unregister_count--; 1994 1995 dev->unregister_cb(dev->io_device); 1996 free(dev); 1997 } 1998 1999 static void 2000 io_device_free(struct io_device *dev) 2001 { 2002 int rc __attribute__((unused)); 2003 2004 if (dev->unregister_cb == NULL) { 2005 free(dev); 2006 } else { 2007 assert(dev->unregister_thread != NULL); 2008 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2009 dev->name, dev->io_device, dev->unregister_thread->name); 2010 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2011 assert(rc == 0); 2012 } 2013 } 2014 2015 void 2016 spdk_io_device_unregister(void *io_device, 

void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	RB_REMOVE(io_device_tree, &g_io_devices, dev);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (unregister_cb) {
		thread->pending_unregister_count++;
	}

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}

const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}

static struct spdk_io_channel *
thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev)
{
	struct spdk_io_channel find = {};

	find.dev = dev;
	return RB_FIND(io_channel_tree, &thread->io_channels, &find);
}
const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}

static struct spdk_io_channel *
thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev)
{
	struct spdk_io_channel find = {};

	find.dev = dev;
	return RB_FIND(io_channel_tree, &thread->io_channels, &find);
}

struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch = thread_get_io_channel(thread, dev);
	if (ch != NULL) {
		ch->ref++;

		SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
			      ch, dev->name, dev->io_device, thread->name, ch->ref);

		/*
		 * An I/O channel already exists for this device on this
		 * thread, so return it.
		 */
		pthread_mutex_unlock(&g_devlist_mutex);
		spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0,
				  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
		return ch;
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	RB_INSERT(io_channel_tree, &thread->io_channels, ch);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1);
	return ch;
}
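/*
 * Illustrative sketch (not part of this library): per-thread channel usage.
 * Each SPDK thread that submits I/O gets its own channel, and the pointer
 * returned by spdk_io_channel_get_ctx() is the ctx_size-byte area that
 * create_cb initialized. The foo_* names are hypothetical.
 *
 *	struct spdk_io_channel *ch = spdk_get_io_channel(&g_foo);
 *	struct foo_ch_ctx *ctx;
 *
 *	if (ch == NULL) {
 *		return -ENOMEM;
 *	}
 *	ctx = spdk_io_channel_get_ctx(ch);
 *	ctx->io_count++;
 *	...
 *	spdk_put_io_channel(ch);	// on the same thread that got it
 */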
static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0,
			  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		wrong_thread(__func__, "ch", ch->thread, thread);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

void *
spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
{
	return ch->dev->io_device;
}

const char *
spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch)
{
	return spdk_io_device_get_name(ch->dev);
}

int
spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
{
	return ch->ref;
}
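/*
 * Illustrative note (not part of this library): the channel context is
 * allocated immediately after the struct spdk_io_channel header, which is
 * why spdk_io_channel_from_ctx() can recover the channel by subtracting
 * sizeof(struct spdk_io_channel). A hypothetical round trip:
 *
 *	struct foo_ch_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *	struct spdk_io_channel *same_ch = spdk_io_channel_from_ctx(ctx);
 *
 *	assert(same_ch == ch);
 */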
struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute. If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	ch = thread_get_io_channel(i->cur_thread, i->dev);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;
	i->orig_thread = _get_thread();

	pthread_mutex_lock(&g_devlist_mutex);
	i->dev = io_device_get(io_device);
	if (i->dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		assert(false);
		goto end;
	}

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		ch = thread_get_io_channel(thread, i->dev);
		if (ch != NULL) {
			ch->dev->for_each_count++;
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
	}

end:
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		ch = thread_get_io_channel(thread, i->dev);
		if (ch != NULL) {
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
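/*
 * Illustrative sketch (not part of this library): spdk_for_each_channel()
 * visits one channel at a time, hopping threads via message passing. Each
 * fn must call spdk_for_each_channel_continue() exactly once, and the cpl
 * callback runs back on the originating thread. The foo_* names are
 * hypothetical.
 *
 *	static void
 *	foo_reset_one(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *		struct foo_ch_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *
 *		ctx->io_count = 0;
 *		spdk_for_each_channel_continue(i, 0);	// non-zero aborts the walk
 *	}
 *
 *	static void
 *	foo_reset_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		// Runs on the thread that called spdk_for_each_channel().
 *	}
 *
 *	spdk_for_each_channel(&g_foo, foo_reset_one, NULL, foo_reset_done);
 */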
struct spdk_interrupt {
	int efd;
	struct spdk_thread *thread;
	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	if (thread->msg_fd < 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);
	thread->msg_fd = -1;

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}

#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;
	uint64_t notify = 1;

	assert(spdk_interrupt_mode_is_enabled());

	/* There may be a race between this msg_acknowledge (the read below) and
	 * another producer's msg_notify, so the acknowledge must happen before
	 * the message queue is drained. This ordering avoids missing a msg
	 * notification.
	 */
	rc = read(thread->msg_fd, &notify, sizeof(notify));
	if (rc < 0 && errno != EAGAIN) {
		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
	}

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	return rc;
}

static int
thread_interrupt_create(struct spdk_thread *thread)
{
	int rc;

	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);

	rc = spdk_fd_group_create(&thread->fgrp);
	if (rc) {
		return rc;
	}

	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (thread->msg_fd < 0) {
		rc = -errno;
		spdk_fd_group_destroy(thread->fgrp);
		thread->fgrp = NULL;

		return rc;
	}

	return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd,
				 thread_interrupt_msg_process, thread);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
#endif

struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
			void *arg, const char *name)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;
	int ret;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name);
	if (ret != 0) {
		SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n",
			    thread->name, efd, spdk_strerror(-ret));
		return NULL;
	}

	intr = calloc(1, sizeof(*intr));
	if (intr == NULL) {
		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		/* Undo the fd_group_add above so the fd is not leaked. */
		spdk_fd_group_remove(thread->fgrp, efd);
		return NULL;
	}

	if (name) {
		snprintf(intr->name, sizeof(intr->name), "%s", name);
	} else {
		snprintf(intr->name, sizeof(intr->name), "%p", fn);
	}

	intr->efd = efd;
	intr->thread = thread;

	return intr;
}

void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	*pintr = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}

int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}
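/*
 * Illustrative sketch (not part of this library): wiring an eventfd into a
 * thread's fd_group so a handler runs when the fd becomes readable in
 * interrupt mode. The foo_* names are hypothetical; eventfd() is Linux-only.
 *
 *	static int
 *	foo_event_handler(void *arg)
 *	{
 *		uint64_t val;
 *
 *		read(foo_efd, &val, sizeof(val));	// acknowledge the event
 *		return 0;
 *	}
 *
 *	foo_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 *	foo_intr = spdk_interrupt_register(foo_efd, foo_event_handler,
 *					   NULL, "foo_event");
 *	...
 *	spdk_interrupt_unregister(&foo_intr);
 *	close(foo_efd);
 */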
static bool g_interrupt_mode = false;

int
spdk_interrupt_mode_enable(void)
{
	/* This must be called once, prior to initializing the threading library.
	 * g_spdk_msg_mempool will be valid if the thread library has already
	 * been initialized.
	 */
	if (g_spdk_msg_mempool) {
		SPDK_ERRLOG("Failed because the threading library is already initialized.\n");
		return -1;
	}

#ifdef __linux__
	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
	g_interrupt_mode = true;
	return 0;
#else
	SPDK_ERRLOG("SPDK interrupt mode is currently only supported on Linux.\n");
	g_interrupt_mode = false;
	return -ENOTSUP;
#endif
}

bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}

SPDK_LOG_REGISTER_COMPONENT(thread)
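/*
 * Illustrative sketch (not part of this library): the required call order
 * for interrupt mode, enabling it before the thread library is initialized.
 *
 *	if (spdk_interrupt_mode_enable() != 0) {
 *		return -1;	// e.g. non-Linux platform
 *	}
 *	spdk_thread_lib_init(NULL, 0);
 */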