/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/trace.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"
#include "thread_internal.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
#define SPDK_MAX_POLLER_NAME_LEN	256
#define SPDK_MAX_THREAD_NAME_LEN	256

enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,

	/* The poller is in the process of being paused. It will be paused
	 * the next time it is supposed to be executed.
	 */
	SPDK_POLLER_STATE_PAUSING,

	/* The poller is registered but currently paused. It's on the
	 * paused_pollers list.
	 */
	SPDK_POLLER_STATE_PAUSED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller) tailq;
	RB_ENTRY(spdk_poller) node;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state state;

	uint64_t period_ticks;
	uint64_t next_run_tick;
	uint64_t run_count;
	uint64_t busy_count;
	spdk_poller_fn fn;
	void *arg;
	struct spdk_thread *thread;
	int interruptfd;
	spdk_poller_set_interrupt_mode_cb set_intr_cb_fn;
	void *set_intr_cb_arg;

	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_thread_state {
	/* The thread is processing pollers and messages in spdk_thread_poll(). */
	SPDK_THREAD_STATE_RUNNING,

	/* The thread is in the process of termination. It reaps unregistering
	 * pollers and releases I/O channels.
	 */
	SPDK_THREAD_STATE_EXITING,

	/* The thread has exited. It is ready for spdk_thread_destroy() to be called. */
	SPDK_THREAD_STATE_EXITED,
};

struct spdk_thread {
	uint64_t tsc_last;
	struct spdk_thread_stats stats;
	/*
	 * Contains pollers actively running on this thread. Pollers
	 * are run round-robin. The thread takes one poller from the head
	 * of the ring, executes it, then puts it back at the tail of
	 * the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers;
	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers;
	struct spdk_poller *first_timed_poller;
	/*
	 * Contains paused pollers. Pollers on this queue are waiting until
	 * they are resumed (in which case they're put onto the active/timer
	 * queues) or unregistered.
	 */
	TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers;
	struct spdk_ring *messages;
	int msg_fd;
	SLIST_HEAD(, spdk_msg) msg_cache;
	size_t msg_cache_count;
	spdk_msg_fn critical_msg;
	uint64_t id;
	enum spdk_thread_state state;
	int pending_unregister_count;

	RB_HEAD(io_channel_tree, spdk_io_channel) io_channels;
	TAILQ_ENTRY(spdk_thread) tailq;

	char name[SPDK_MAX_THREAD_NAME_LEN + 1];
	struct spdk_cpuset cpumask;
	uint64_t exit_timeout_tsc;

	/* Indicates whether this spdk_thread currently runs in interrupt mode. */
	bool in_interrupt;
	struct spdk_fd_group *fgrp;

	/* User context allocated at the end */
	uint8_t ctx[0];
};

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
static size_t g_ctx_sz = 0;
/* A monotonically increasing ID is assigned to each created thread, beginning at 1.
 * Once the ID wraps around UINT64_MAX, further thread creation is not allowed and
 * restarting the SPDK application is required.
 */
static uint64_t g_thread_id = 1;

struct io_device {
	void *io_device;
	char name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb create_cb;
	spdk_io_channel_destroy_cb destroy_cb;
	spdk_io_device_unregister_cb unregister_cb;
	struct spdk_thread *unregister_thread;
	uint32_t ctx_size;
	uint32_t for_each_count;
	RB_ENTRY(io_device) node;

	uint32_t refcnt;

	bool unregistered;
};

static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices);

static int
io_device_cmp(struct io_device *dev1, struct io_device *dev2)
{
	return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device);
}

RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp);

static int
io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2)
{
	return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev);
}

RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp);

struct spdk_msg {
	spdk_msg_fn fn;
	void *arg;

	SLIST_ENTRY(spdk_msg) link;
};

#define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static __thread struct spdk_thread *tls_thread = NULL;

#define TRACE_GROUP_THREAD 0xa
#define TRACE_THREAD_IOCH_GET SPDK_TPOINT_ID(TRACE_GROUP_THREAD, 0x0)
#define TRACE_THREAD_IOCH_PUT SPDK_TPOINT_ID(TRACE_GROUP_THREAD, 0x1)

SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD)
{
	spdk_trace_register_description("THREAD_IOCH_GET",
					TRACE_THREAD_IOCH_GET,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
	spdk_trace_register_description("THREAD_IOCH_PUT",
					TRACE_THREAD_IOCH_PUT,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
}

/*
 * If this compare function returned zero when two next_run_ticks are equal,
 * the macro RB_INSERT() would return a pointer to the element with the same
 * next_run_tick.
 *
 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element
 * to remove as a parameter.
 *
 * Hence we allow RB_INSERT() to insert elements with the same keys on the right
 * side by returning 1 when two next_run_ticks are equal.
 */
static inline int
timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2)
{
	if (poller1->next_run_tick < poller2->next_run_tick) {
		return -1;
	} else {
		return 1;
	}
}

RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare);
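
/*
 * Illustrative sketch (not part of the original source): because the comparator
 * above never returns 0, two pollers that share a next_run_tick both stay in the
 * tree, with the later insertion placed to the right of the earlier one, so
 * pollers with equal deadlines run in insertion (FIFO) order. Assuming a tree
 * head named "tree", the behavior is:
 *
 *	struct spdk_poller a = { .next_run_tick = 1000 };
 *	struct spdk_poller b = { .next_run_tick = 1000 };
 *
 *	RB_INSERT(timed_pollers_tree, &tree, &a);	returns NULL (inserted)
 *	RB_INSERT(timed_pollers_tree, &tree, &b);	also NULL; b lands right of a
 *	assert(RB_MIN(timed_pollers_tree, &tree) == &a);
 */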

static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}

static int
_thread_lib_init(size_t ctx_sz)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	g_ctx_sz = ctx_sz;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			     sizeof(struct spdk_msg),
			     0, /* No cache. We do our own. */
			     SPDK_ENV_SOCKET_ID_ANY);

	if (!g_spdk_msg_mempool) {
		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
		return -1;
	}

	return 0;
}

int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);

	if (new_thread_fn == NULL) {
		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
	} else {
		g_new_thread_fn = new_thread_fn;
	}

	return _thread_lib_init(ctx_sz);
}

int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
			 spdk_thread_op_supported_fn thread_op_supported_fn,
			 size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);
	assert(g_thread_op_supported_fn == NULL);

	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
		SPDK_ERRLOG("Both must be defined or undefined together.\n");
		return -EINVAL;
	}

	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
	} else {
		g_thread_op_fn = thread_op_fn;
		g_thread_op_supported_fn = thread_op_supported_fn;
	}

	return _thread_lib_init(ctx_sz);
}

void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	RB_FOREACH(dev, io_device_tree, &g_io_devices) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}

	g_new_thread_fn = NULL;
	g_thread_op_fn = NULL;
	g_thread_op_supported_fn = NULL;
	g_ctx_sz = 0;
}
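
/*
 * Usage sketch (illustrative only, assuming a single caller thread and
 * omitting error handling): the expected library lifecycle, built only from
 * functions defined in this file, is lib_init -> thread_create -> poll loop
 * -> exit -> destroy -> lib_fini.
 *
 *	spdk_thread_lib_init(NULL, 0);
 *	struct spdk_thread *thread = spdk_thread_create("worker", NULL);
 *
 *	spdk_set_thread(thread);
 *	... register pollers, send messages, do work ...
 *
 *	spdk_thread_exit(thread);
 *	while (!spdk_thread_is_exited(thread)) {
 *		spdk_thread_poll(thread, 0, 0);
 *	}
 *	spdk_thread_destroy(thread);
 *	spdk_thread_lib_fini();
 */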

static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
				     poller->name);
		}
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (spdk_interrupt_mode_is_enabled()) {
		thread_interrupt_destroy(thread);
	}

	spdk_ring_free(thread->messages);
	free(thread);
}

struct spdk_thread *
spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc = 0, i;

	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	RB_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	RB_INIT(&thread->timed_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	if (g_thread_id == 0) {
		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		_free_thread(thread);
		return NULL;
	}
	thread->id = g_thread_id++;
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
		      thread->id, thread->name);

	if (spdk_interrupt_mode_is_enabled()) {
		thread->in_interrupt = true;
		rc = thread_interrupt_create(thread);
		if (rc != 0) {
			_free_thread(thread);
			return NULL;
		}
	}

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
	}

	if (rc != 0) {
		_free_thread(thread);
		return NULL;
	}

	thread->state = SPDK_THREAD_STATE_RUNNING;

	return thread;
}

void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}

static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s timed out while exiting; moving it to the exited state forcefully\n",
			    thread->name);
		goto exited;
	}

	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
		SPDK_INFOLOG(thread,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

	if (thread->pending_unregister_count > 0) {
		SPDK_INFOLOG(thread,
			     "thread %s is still unregistering io_devices\n",
			     thread->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
}

int
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
		SPDK_INFOLOG(thread,
			     "thread %s is already exiting\n",
			     thread->name);
		return 0;
	}

	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
	thread->state = SPDK_THREAD_STATE_EXITING;
	return 0;
}

bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_EXITED;
}

void
spdk_thread_destroy(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);

	assert(thread->state == SPDK_THREAD_STATE_EXITED);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}

void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
	if (g_ctx_sz > 0) {
		return thread->ctx;
	}

	return NULL;
}

struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}

int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;

	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
		assert(false);
		return -ENOTSUP;
	}

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Called from non-SPDK thread\n");
		assert(false);
		return -EINVAL;
	}

	spdk_cpuset_copy(&thread->cpumask, cpumask);

	/* Invoke framework's reschedule operation. If this function is called multiple times
	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
	 * reschedule operation.
	 */
	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

	return 0;
}

struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}

static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];
	uint64_t notify = 1;
	int rc;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (spdk_unlikely(thread->in_interrupt) &&
	    spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}

static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *tmp __attribute__((unused));

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers tree by next scheduled run time
	 * as its key.
	 */
	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp == NULL);

	/* Update the cache only if it is empty or the inserted poller is earlier than it.
	 * RB_MIN() is not necessary here because all pollers that have exactly the same
	 * next_run_tick as an existing poller are inserted on the right side.
	 */
735 */ 736 if (thread->first_timed_poller == NULL || 737 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 738 thread->first_timed_poller = poller; 739 } 740 } 741 742 #ifdef __linux__ 743 static inline void 744 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 745 { 746 struct spdk_poller *tmp __attribute__((unused)); 747 748 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 749 assert(tmp != NULL); 750 751 /* This function is not used in any case that is performance critical. 752 * Update the cache simply by RB_MIN() if it needs to be changed. 753 */ 754 if (thread->first_timed_poller == poller) { 755 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 756 } 757 } 758 #endif 759 760 static void 761 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 762 { 763 if (poller->period_ticks) { 764 poller_insert_timer(thread, poller, spdk_get_ticks()); 765 } else { 766 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 767 } 768 } 769 770 static inline void 771 thread_update_stats(struct spdk_thread *thread, uint64_t end, 772 uint64_t start, int rc) 773 { 774 if (rc == 0) { 775 /* Poller status idle */ 776 thread->stats.idle_tsc += end - start; 777 } else if (rc > 0) { 778 /* Poller status busy */ 779 thread->stats.busy_tsc += end - start; 780 } 781 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 782 thread->tsc_last = end; 783 } 784 785 static inline int 786 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 787 { 788 int rc; 789 790 switch (poller->state) { 791 case SPDK_POLLER_STATE_UNREGISTERED: 792 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 793 free(poller); 794 return 0; 795 case SPDK_POLLER_STATE_PAUSING: 796 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 797 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 798 poller->state = SPDK_POLLER_STATE_PAUSED; 799 return 0; 800 case SPDK_POLLER_STATE_WAITING: 801 break; 802 default: 803 assert(false); 804 break; 805 } 806 807 poller->state = SPDK_POLLER_STATE_RUNNING; 808 rc = poller->fn(poller->arg); 809 810 poller->run_count++; 811 if (rc > 0) { 812 poller->busy_count++; 813 } 814 815 #ifdef DEBUG 816 if (rc == -1) { 817 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 818 } 819 #endif 820 821 switch (poller->state) { 822 case SPDK_POLLER_STATE_UNREGISTERED: 823 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 824 free(poller); 825 break; 826 case SPDK_POLLER_STATE_PAUSING: 827 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 828 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 829 poller->state = SPDK_POLLER_STATE_PAUSED; 830 break; 831 case SPDK_POLLER_STATE_PAUSED: 832 case SPDK_POLLER_STATE_WAITING: 833 break; 834 case SPDK_POLLER_STATE_RUNNING: 835 poller->state = SPDK_POLLER_STATE_WAITING; 836 break; 837 default: 838 assert(false); 839 break; 840 } 841 842 return rc; 843 } 844 845 static inline int 846 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 847 uint64_t now) 848 { 849 int rc; 850 851 switch (poller->state) { 852 case SPDK_POLLER_STATE_UNREGISTERED: 853 free(poller); 854 return 0; 855 case SPDK_POLLER_STATE_PAUSING: 856 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 857 poller->state = SPDK_POLLER_STATE_PAUSED; 858 return 0; 859 case SPDK_POLLER_STATE_WAITING: 860 break; 861 default: 862 assert(false); 863 break; 864 } 865 866 poller->state = 

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_WAITING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}

static inline int
thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
			    uint64_t now)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
	/* fallthrough */
	case SPDK_POLLER_STATE_WAITING:
		poller_insert_timer(thread, poller, now);
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}

static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	thread->tsc_last = now;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		poller_rc = thread_execute_poller(thread, poller);
		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	poller = thread->first_timed_poller;
	while (poller != NULL) {
		int timer_rc = 0;

		if (now < poller->next_run_tick) {
			break;
		}

		tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);

		/* Update the cache to the next timed poller in the list
		 * only if the current poller is still the closest, otherwise,
		 * do nothing because the cache has been already updated.
		 */
		if (thread->first_timed_poller == poller) {
			thread->first_timed_poller = tmp;
		}

		timer_rc = thread_execute_timed_poller(thread, poller, now);
		if (timer_rc > rc) {
			rc = timer_rc;
		}

		poller = tmp;
	}

	return rc;
}

int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	struct spdk_thread *orig_thread;
	int rc;
	uint64_t notify = 1;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	if (spdk_likely(!thread->in_interrupt)) {
		rc = thread_poll(thread, max_msgs, now);
		if (spdk_unlikely(thread->in_interrupt)) {
			/* The thread transitioned to interrupt mode during the above poll.
			 * Poll it one more time in case a msg arrived without notification
			 * during the transition.
			 */
			rc = thread_poll(thread, max_msgs, now);
		}
	} else {
		/* Non-block wait on thread's fd_group */
		rc = spdk_fd_group_wait(thread->fgrp, 0);
		if (spdk_unlikely(!thread->in_interrupt)) {
			/* The thread transitioned to poll mode in a msg during the above processing.
			 * Clear msg_fd since thread messages will be polled directly in poll mode.
			 */
995 */ 996 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 997 if (rc < 0 && errno != EAGAIN) { 998 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 999 } 1000 } 1001 } 1002 1003 1004 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1005 thread_exit(thread, now); 1006 } 1007 1008 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1009 1010 tls_thread = orig_thread; 1011 1012 return rc; 1013 } 1014 1015 uint64_t 1016 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1017 { 1018 struct spdk_poller *poller; 1019 1020 poller = thread->first_timed_poller; 1021 if (poller) { 1022 return poller->next_run_tick; 1023 } 1024 1025 return 0; 1026 } 1027 1028 int 1029 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1030 { 1031 return !TAILQ_EMPTY(&thread->active_pollers); 1032 } 1033 1034 static bool 1035 thread_has_unpaused_pollers(struct spdk_thread *thread) 1036 { 1037 if (TAILQ_EMPTY(&thread->active_pollers) && 1038 RB_EMPTY(&thread->timed_pollers)) { 1039 return false; 1040 } 1041 1042 return true; 1043 } 1044 1045 bool 1046 spdk_thread_has_pollers(struct spdk_thread *thread) 1047 { 1048 if (!thread_has_unpaused_pollers(thread) && 1049 TAILQ_EMPTY(&thread->paused_pollers)) { 1050 return false; 1051 } 1052 1053 return true; 1054 } 1055 1056 bool 1057 spdk_thread_is_idle(struct spdk_thread *thread) 1058 { 1059 if (spdk_ring_count(thread->messages) || 1060 thread_has_unpaused_pollers(thread) || 1061 thread->critical_msg != NULL) { 1062 return false; 1063 } 1064 1065 return true; 1066 } 1067 1068 uint32_t 1069 spdk_thread_get_count(void) 1070 { 1071 /* 1072 * Return cached value of the current thread count. We could acquire the 1073 * lock and iterate through the TAILQ of threads to count them, but that 1074 * count could still be invalidated after we release the lock. 
1075 */ 1076 return g_thread_count; 1077 } 1078 1079 struct spdk_thread * 1080 spdk_get_thread(void) 1081 { 1082 return _get_thread(); 1083 } 1084 1085 const char * 1086 spdk_thread_get_name(const struct spdk_thread *thread) 1087 { 1088 return thread->name; 1089 } 1090 1091 uint64_t 1092 spdk_thread_get_id(const struct spdk_thread *thread) 1093 { 1094 return thread->id; 1095 } 1096 1097 struct spdk_thread * 1098 spdk_thread_get_by_id(uint64_t id) 1099 { 1100 struct spdk_thread *thread; 1101 1102 if (id == 0 || id >= g_thread_id) { 1103 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1104 return NULL; 1105 } 1106 pthread_mutex_lock(&g_devlist_mutex); 1107 TAILQ_FOREACH(thread, &g_threads, tailq) { 1108 if (thread->id == id) { 1109 break; 1110 } 1111 } 1112 pthread_mutex_unlock(&g_devlist_mutex); 1113 return thread; 1114 } 1115 1116 int 1117 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1118 { 1119 struct spdk_thread *thread; 1120 1121 thread = _get_thread(); 1122 if (!thread) { 1123 SPDK_ERRLOG("No thread allocated\n"); 1124 return -EINVAL; 1125 } 1126 1127 if (stats == NULL) { 1128 return -EINVAL; 1129 } 1130 1131 *stats = thread->stats; 1132 1133 return 0; 1134 } 1135 1136 uint64_t 1137 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1138 { 1139 if (thread == NULL) { 1140 thread = _get_thread(); 1141 } 1142 1143 return thread->tsc_last; 1144 } 1145 1146 static inline int 1147 thread_send_msg_notification(const struct spdk_thread *target_thread) 1148 { 1149 uint64_t notify = 1; 1150 int rc; 1151 1152 /* Not necessary to do notification if interrupt facility is not enabled */ 1153 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1154 return 0; 1155 } 1156 1157 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1158 * after sending thread msg, it is necessary to check whether target thread runs in 1159 * interrupt mode and then decide whether do event notification. 
1160 */ 1161 if (spdk_unlikely(target_thread->in_interrupt)) { 1162 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1163 if (rc < 0) { 1164 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1165 return -EIO; 1166 } 1167 } 1168 1169 return 0; 1170 } 1171 1172 int 1173 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1174 { 1175 struct spdk_thread *local_thread; 1176 struct spdk_msg *msg; 1177 int rc; 1178 1179 assert(thread != NULL); 1180 1181 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1182 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1183 return -EIO; 1184 } 1185 1186 local_thread = _get_thread(); 1187 1188 msg = NULL; 1189 if (local_thread != NULL) { 1190 if (local_thread->msg_cache_count > 0) { 1191 msg = SLIST_FIRST(&local_thread->msg_cache); 1192 assert(msg != NULL); 1193 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1194 local_thread->msg_cache_count--; 1195 } 1196 } 1197 1198 if (msg == NULL) { 1199 msg = spdk_mempool_get(g_spdk_msg_mempool); 1200 if (!msg) { 1201 SPDK_ERRLOG("msg could not be allocated\n"); 1202 return -ENOMEM; 1203 } 1204 } 1205 1206 msg->fn = fn; 1207 msg->arg = ctx; 1208 1209 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1210 if (rc != 1) { 1211 SPDK_ERRLOG("msg could not be enqueued\n"); 1212 spdk_mempool_put(g_spdk_msg_mempool, msg); 1213 return -EIO; 1214 } 1215 1216 return thread_send_msg_notification(thread); 1217 } 1218 1219 int 1220 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1221 { 1222 spdk_msg_fn expected = NULL; 1223 1224 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1225 __ATOMIC_SEQ_CST)) { 1226 return -EIO; 1227 } 1228 1229 return thread_send_msg_notification(thread); 1230 } 1231 1232 #ifdef __linux__ 1233 static int 1234 interrupt_timerfd_process(void *arg) 1235 { 1236 struct spdk_poller *poller = arg; 1237 uint64_t exp; 1238 int rc; 1239 1240 /* clear the level of interval timer */ 1241 rc = read(poller->interruptfd, &exp, sizeof(exp)); 1242 if (rc < 0) { 1243 if (rc == -EAGAIN) { 1244 return 0; 1245 } 1246 1247 return rc; 1248 } 1249 1250 return poller->fn(poller->arg); 1251 } 1252 1253 static int 1254 period_poller_interrupt_init(struct spdk_poller *poller) 1255 { 1256 struct spdk_fd_group *fgrp = poller->thread->fgrp; 1257 int timerfd; 1258 int rc; 1259 1260 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1261 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1262 if (timerfd < 0) { 1263 return -errno; 1264 } 1265 1266 rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller); 1267 if (rc < 0) { 1268 close(timerfd); 1269 return rc; 1270 } 1271 1272 poller->interruptfd = timerfd; 1273 return 0; 1274 } 1275 1276 static void 1277 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1278 { 1279 int timerfd = poller->interruptfd; 1280 uint64_t now_tick = spdk_get_ticks(); 1281 uint64_t ticks = spdk_get_ticks_hz(); 1282 int ret; 1283 struct itimerspec new_tv = {}; 1284 struct itimerspec old_tv = {}; 1285 1286 assert(poller->period_ticks != 0); 1287 assert(timerfd >= 0); 1288 1289 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1290 interrupt_mode ? 
"interrupt" : "poll"); 1291 1292 if (interrupt_mode) { 1293 /* Set repeated timer expiration */ 1294 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1295 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1296 1297 /* Update next timer expiration */ 1298 if (poller->next_run_tick == 0) { 1299 poller->next_run_tick = now_tick + poller->period_ticks; 1300 } else if (poller->next_run_tick < now_tick) { 1301 poller->next_run_tick = now_tick; 1302 } 1303 1304 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1305 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1306 1307 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1308 if (ret < 0) { 1309 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1310 assert(false); 1311 } 1312 } else { 1313 /* Disarm the timer */ 1314 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1315 if (ret < 0) { 1316 /* timerfd_settime's failure indicates that the timerfd is in error */ 1317 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1318 assert(false); 1319 } 1320 1321 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1322 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1323 */ 1324 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1325 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1326 poller_remove_timer(poller->thread, poller); 1327 poller_insert_timer(poller->thread, poller, now_tick); 1328 } 1329 } 1330 1331 static void 1332 poller_interrupt_fini(struct spdk_poller *poller) 1333 { 1334 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1335 assert(poller->interruptfd >= 0); 1336 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1337 close(poller->interruptfd); 1338 poller->interruptfd = -1; 1339 } 1340 1341 static int 1342 busy_poller_interrupt_init(struct spdk_poller *poller) 1343 { 1344 int busy_efd; 1345 int rc; 1346 1347 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1348 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1349 if (busy_efd < 0) { 1350 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1351 return -errno; 1352 } 1353 1354 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, 1355 poller->fn, poller->arg, poller->name); 1356 if (rc < 0) { 1357 close(busy_efd); 1358 return rc; 1359 } 1360 1361 poller->interruptfd = busy_efd; 1362 return 0; 1363 } 1364 1365 static void 1366 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1367 { 1368 int busy_efd = poller->interruptfd; 1369 uint64_t notify = 1; 1370 int rc __attribute__((unused)); 1371 1372 assert(busy_efd >= 0); 1373 1374 if (interrupt_mode) { 1375 /* Write without read on eventfd will get it repeatedly triggered. */ 1376 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1377 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1378 } 1379 } else { 1380 /* Read on eventfd will clear its level triggering. 
		rc = read(busy_efd, &notify, sizeof(notify));
	}
}

#else

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

#endif

void
spdk_poller_register_interrupt(struct spdk_poller *poller,
			       spdk_poller_set_interrupt_mode_cb cb_fn,
			       void *cb_arg)
{
	assert(poller != NULL);
	assert(cb_fn != NULL);
	assert(spdk_get_thread() == poller->thread);

	if (!spdk_interrupt_mode_is_enabled()) {
		return;
	}

	/* When a poller is created we don't know if the user is ever going to
	 * enable interrupts on it by calling this function, so the poller
	 * registration function has to immediately create an interruptfd.
	 * When this function does get called by the user, we have to then destroy
	 * that interruptfd.
	 */
	if (poller->set_intr_cb_fn && poller->interruptfd >= 0) {
		poller_interrupt_fini(poller);
	}

	poller->set_intr_cb_fn = cb_fn;
	poller->set_intr_cb_arg = cb_arg;

	/* Set poller into interrupt mode if thread is in interrupt. */
	if (poller->thread->in_interrupt) {
		poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
	}
}

static uint64_t
convert_us_to_ticks(uint64_t us)
{
	uint64_t quotient, remainder, ticks;

	if (us) {
		quotient = us / SPDK_SEC_TO_USEC;
		remainder = us % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		return 0;
	}
}
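
/*
 * Worked example (illustrative, assuming a hypothetical 3 GHz TSC where
 * spdk_get_ticks_hz() == 3000000000): a 1500 us period converts as
 *
 *	quotient  = 1500 / 1000000 = 0
 *	remainder = 1500 % 1000000 = 1500
 *	result    = 3000000000 * 0 + (3000000000 * 1500) / 1000000
 *	          = 4500000 ticks
 *
 * Splitting us into quotient and remainder keeps the intermediate product
 * ticks * remainder below ticks * SPDK_SEC_TO_USEC, avoiding uint64_t
 * overflow for large microsecond values.
 */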

static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;
	poller->interruptfd = -1;

	poller->period_ticks = convert_us_to_ticks(period_microseconds);

	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (period_microseconds) {
			rc = period_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL);
		} else {
			/* If the poller doesn't have a period, create an interruptfd that's always
			 * busy automatically when running in interrupt mode.
			 */
			rc = busy_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL);
		}
	}

	thread_insert_poller(thread, poller);

	return poller;
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}
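
/*
 * Usage sketch (illustrative only, hypothetical callback and device): pollers
 * must be registered from the thread that will run them. A callback returns
 * a value > 0 to mark the iteration busy, 0 for idle, and negative for error;
 * SPDK_POLLER_BUSY/SPDK_POLLER_IDLE and the SPDK_POLLER_REGISTER convenience
 * macro are declared in spdk/thread.h.
 *
 *	static int
 *	check_completions(void *arg)
 *	{
 *		struct my_dev *dev = arg;	hypothetical device context
 *
 *		return process(dev) ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
 *	}
 *
 *	poller = SPDK_POLLER_REGISTER(check_completions, dev, 0);
 *	...
 *	spdk_poller_unregister(&poller);
 */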
1627 */ 1628 switch (poller->state) { 1629 case SPDK_POLLER_STATE_PAUSED: 1630 case SPDK_POLLER_STATE_PAUSING: 1631 break; 1632 case SPDK_POLLER_STATE_RUNNING: 1633 case SPDK_POLLER_STATE_WAITING: 1634 poller->state = SPDK_POLLER_STATE_PAUSING; 1635 break; 1636 default: 1637 assert(false); 1638 break; 1639 } 1640 } 1641 1642 void 1643 spdk_poller_resume(struct spdk_poller *poller) 1644 { 1645 struct spdk_thread *thread; 1646 1647 thread = spdk_get_thread(); 1648 if (!thread) { 1649 assert(false); 1650 return; 1651 } 1652 1653 if (poller->thread != thread) { 1654 wrong_thread(__func__, poller->name, poller->thread, thread); 1655 return; 1656 } 1657 1658 /* If a poller is paused it has to be removed from the paused pollers 1659 * list and put on the active list or timer tree depending on its 1660 * period_ticks. If a poller is still in the process of being paused, 1661 * we just need to flip its state back to waiting, as it's already on 1662 * the appropriate list or tree. 1663 */ 1664 switch (poller->state) { 1665 case SPDK_POLLER_STATE_PAUSED: 1666 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1667 thread_insert_poller(thread, poller); 1668 /* fallthrough */ 1669 case SPDK_POLLER_STATE_PAUSING: 1670 poller->state = SPDK_POLLER_STATE_WAITING; 1671 break; 1672 case SPDK_POLLER_STATE_RUNNING: 1673 case SPDK_POLLER_STATE_WAITING: 1674 break; 1675 default: 1676 assert(false); 1677 break; 1678 } 1679 } 1680 1681 const char * 1682 spdk_poller_get_name(struct spdk_poller *poller) 1683 { 1684 return poller->name; 1685 } 1686 1687 const char * 1688 spdk_poller_get_state_str(struct spdk_poller *poller) 1689 { 1690 switch (poller->state) { 1691 case SPDK_POLLER_STATE_WAITING: 1692 return "waiting"; 1693 case SPDK_POLLER_STATE_RUNNING: 1694 return "running"; 1695 case SPDK_POLLER_STATE_UNREGISTERED: 1696 return "unregistered"; 1697 case SPDK_POLLER_STATE_PAUSING: 1698 return "pausing"; 1699 case SPDK_POLLER_STATE_PAUSED: 1700 return "paused"; 1701 default: 1702 return NULL; 1703 } 1704 } 1705 1706 uint64_t 1707 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1708 { 1709 return poller->period_ticks; 1710 } 1711 1712 void 1713 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1714 { 1715 stats->run_count = poller->run_count; 1716 stats->busy_count = poller->busy_count; 1717 } 1718 1719 struct spdk_poller * 1720 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1721 { 1722 return TAILQ_FIRST(&thread->active_pollers); 1723 } 1724 1725 struct spdk_poller * 1726 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1727 { 1728 return TAILQ_NEXT(prev, tailq); 1729 } 1730 1731 struct spdk_poller * 1732 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1733 { 1734 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1735 } 1736 1737 struct spdk_poller * 1738 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1739 { 1740 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1741 } 1742 1743 struct spdk_poller * 1744 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1745 { 1746 return TAILQ_FIRST(&thread->paused_pollers); 1747 } 1748 1749 struct spdk_poller * 1750 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1751 { 1752 return TAILQ_NEXT(prev, tailq); 1753 } 1754 1755 struct spdk_io_channel * 1756 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1757 { 1758 return RB_MIN(io_channel_tree, &thread->io_channels); 1759 } 1760 1761 struct spdk_io_channel * 1762 
spdk_thread_get_next_io_channel(struct spdk_io_channel *prev)
{
	return RB_NEXT(io_channel_tree, &prev->thread->io_channels, prev);
}

struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};

static void
_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	assert(rc == 0);
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}
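
/*
 * Usage sketch (illustrative only, hypothetical callbacks):
 * spdk_for_each_thread() visits every registered thread in turn, running fn
 * on each thread's own context, and then calls the completion back on the
 * originating thread.
 *
 *	static void
 *	flush_on_thread(void *ctx)
 *	{
 *		runs once on each spdk_thread
 *	}
 *
 *	static void
 *	flush_done(void *ctx)
 *	{
 *		runs on the thread that called spdk_for_each_thread()
 *	}
 *
 *	spdk_for_each_thread(flush_on_thread, NULL, flush_done);
 */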
"intr" : "poll"); 1868 1869 if (thread->in_interrupt == enable_interrupt) { 1870 return; 1871 } 1872 1873 /* Set pollers to expected mode */ 1874 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1875 poller_set_interrupt_mode(poller, enable_interrupt); 1876 } 1877 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 1878 poller_set_interrupt_mode(poller, enable_interrupt); 1879 } 1880 /* All paused pollers will go to work in interrupt mode */ 1881 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 1882 poller_set_interrupt_mode(poller, enable_interrupt); 1883 } 1884 1885 thread->in_interrupt = enable_interrupt; 1886 return; 1887 } 1888 1889 static struct io_device * 1890 io_device_get(void *io_device) 1891 { 1892 struct io_device find = {}; 1893 1894 find.io_device = io_device; 1895 return RB_FIND(io_device_tree, &g_io_devices, &find); 1896 } 1897 1898 void 1899 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1900 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1901 const char *name) 1902 { 1903 struct io_device *dev, *tmp; 1904 struct spdk_thread *thread; 1905 1906 assert(io_device != NULL); 1907 assert(create_cb != NULL); 1908 assert(destroy_cb != NULL); 1909 1910 thread = spdk_get_thread(); 1911 if (!thread) { 1912 SPDK_ERRLOG("called from non-SPDK thread\n"); 1913 assert(false); 1914 return; 1915 } 1916 1917 dev = calloc(1, sizeof(struct io_device)); 1918 if (dev == NULL) { 1919 SPDK_ERRLOG("could not allocate io_device\n"); 1920 return; 1921 } 1922 1923 dev->io_device = io_device; 1924 if (name) { 1925 snprintf(dev->name, sizeof(dev->name), "%s", name); 1926 } else { 1927 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1928 } 1929 dev->create_cb = create_cb; 1930 dev->destroy_cb = destroy_cb; 1931 dev->unregister_cb = NULL; 1932 dev->ctx_size = ctx_size; 1933 dev->for_each_count = 0; 1934 dev->unregistered = false; 1935 dev->refcnt = 0; 1936 1937 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 1938 dev->name, dev->io_device, thread->name); 1939 1940 pthread_mutex_lock(&g_devlist_mutex); 1941 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 1942 if (tmp != NULL) { 1943 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1944 io_device, tmp->name, dev->name); 1945 pthread_mutex_unlock(&g_devlist_mutex); 1946 free(dev); 1947 } 1948 1949 pthread_mutex_unlock(&g_devlist_mutex); 1950 } 1951 1952 static void 1953 _finish_unregister(void *arg) 1954 { 1955 struct io_device *dev = arg; 1956 struct spdk_thread *thread; 1957 1958 thread = spdk_get_thread(); 1959 assert(thread == dev->unregister_thread); 1960 1961 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 1962 dev->name, dev->io_device, thread->name); 1963 1964 assert(thread->pending_unregister_count > 0); 1965 thread->pending_unregister_count--; 1966 1967 dev->unregister_cb(dev->io_device); 1968 free(dev); 1969 } 1970 1971 static void 1972 io_device_free(struct io_device *dev) 1973 { 1974 int rc __attribute__((unused)); 1975 1976 if (dev->unregister_cb == NULL) { 1977 free(dev); 1978 } else { 1979 assert(dev->unregister_thread != NULL); 1980 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 1981 dev->name, dev->io_device, dev->unregister_thread->name); 1982 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 1983 assert(rc == 0); 1984 } 1985 } 1986 1987 void 1988 spdk_io_device_unregister(void *io_device, 
			  spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	RB_REMOVE(io_device_tree, &g_io_devices, dev);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (unregister_cb) {
		thread->pending_unregister_count++;
	}

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}

const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}

static struct spdk_io_channel *
thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev)
{
	struct spdk_io_channel find = {};

	find.dev = dev;
	return RB_FIND(io_channel_tree, &thread->io_channels, &find);
}

struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	dev = io_device_get(io_device);
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch = thread_get_io_channel(thread, dev);
	if (ch != NULL) {
		ch->ref++;

		SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
			      ch, dev->name, dev->io_device, thread->name, ch->ref);

		/*
		 * An I/O channel already exists for this device on this
		 * thread, so return it.
		 */
		pthread_mutex_unlock(&g_devlist_mutex);
		spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0,
				  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
		return ch;
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	RB_INSERT(io_channel_tree, &thread->io_channels, ch);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1);
	return ch;
}
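
/*
 * Usage sketch (illustrative only, hypothetical callbacks and context type):
 * an io_device is registered once, each thread gets/puts its own per-thread
 * channel, and the device is unregistered after the last channel is released.
 *
 *	spdk_io_device_register(&g_dev, create_cb, destroy_cb,
 *				sizeof(struct my_channel_ctx), "my_dev");
 *
 *	on each thread that does I/O:
 *	struct spdk_io_channel *ch = spdk_get_io_channel(&g_dev);
 *	struct my_channel_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *	...
 *	spdk_put_io_channel(ch);
 *
 *	after all channels have been put:
 *	spdk_io_device_unregister(&g_dev, unregister_done_cb);
 */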
static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0,
			  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		wrong_thread(__func__, "ch", ch->thread, thread);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}
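/*
 * Example (illustrative sketch, again using the hypothetical g_foo device):
 * channels are reference counted per thread, so every spdk_get_io_channel()
 * must be balanced by a spdk_put_io_channel() on the same thread; the final
 * put defers the destroy callback through a thread message rather than
 * running it inline:
 *
 *	struct spdk_io_channel *ch = spdk_get_io_channel(&g_foo);
 *
 *	if (ch != NULL) {
 *		struct foo_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *
 *		ctx->io_count++;
 *		spdk_put_io_channel(ch);
 *	}
 */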
struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

void *
spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
{
	return ch->dev->io_device;
}

const char *
spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch)
{
	return spdk_io_device_get_name(ch->dev);
}

int
spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
{
	return ch->ref;
}

struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute. If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	ch = thread_get_io_channel(i->cur_thread, i->dev);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;
	i->orig_thread = _get_thread();

	pthread_mutex_lock(&g_devlist_mutex);
	i->dev = io_device_get(io_device);
	if (i->dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		assert(false);
		goto end;
	}

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		ch = thread_get_io_channel(thread, i->dev);
		if (ch != NULL) {
			ch->dev->for_each_count++;
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
	}

end:
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}

	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		ch = thread_get_io_channel(thread, i->dev);
		if (ch != NULL) {
			i->cur_thread = thread;
			i->ch = ch;
			pthread_mutex_unlock(&g_devlist_mutex);
			rc = spdk_thread_send_msg(thread, _call_channel, i);
			assert(rc == 0);
			return;
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
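/*
 * Example usage (illustrative; the foo_* names are hypothetical): run a
 * message on every thread that currently holds a channel for g_foo, then
 * complete back on the calling thread:
 *
 *	static void
 *	foo_msg(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *		struct foo_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *
 *		ctx->io_count = 0;
 *		spdk_for_each_channel_continue(i, 0);
 *	}
 *
 *	static void
 *	foo_msg_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		assert(status == 0);
 *	}
 *
 *	spdk_for_each_channel(&g_foo, foo_msg, NULL, foo_msg_done);
 */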
struct spdk_interrupt {
	int efd;
	struct spdk_thread *thread;
	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	if (thread->msg_fd < 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);
	thread->msg_fd = -1;

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}

#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;
	uint64_t notify = 1;

	assert(spdk_interrupt_mode_is_enabled());

	/* There may be a race between this consumer's acknowledgement (the read
	 * below) and another producer's notification (its write to msg_fd).
	 * Acknowledge first and drain the message queue afterwards, so that a
	 * notification can never be missed.
	 */
	rc = read(thread->msg_fd, &notify, sizeof(notify));
	if (rc < 0 && errno != EAGAIN) {
		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
	}

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	return rc;
}

static int
thread_interrupt_create(struct spdk_thread *thread)
{
	int rc;

	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);

	rc = spdk_fd_group_create(&thread->fgrp);
	if (rc) {
		return rc;
	}

	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (thread->msg_fd < 0) {
		rc = -errno;
		spdk_fd_group_destroy(thread->fgrp);
		thread->fgrp = NULL;

		return rc;
	}

	return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd,
				 thread_interrupt_msg_process, thread);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
#endif

struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
			void *arg, const char *name)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;
	int ret;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	/* Allocate the handle before adding efd to the fd group, so that a
	 * failed allocation cannot leave efd registered with no handle to
	 * unregister it by.
	 */
	intr = calloc(1, sizeof(*intr));
	if (intr == NULL) {
		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		return NULL;
	}

	ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name);
	if (ret != 0) {
		SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n",
			    thread->name, efd, spdk_strerror(-ret));
		free(intr);
		return NULL;
	}

	if (name) {
		snprintf(intr->name, sizeof(intr->name), "%s", name);
	} else {
		snprintf(intr->name, sizeof(intr->name), "%p", fn);
	}

	intr->efd = efd;
	intr->thread = thread;

	return intr;
}

void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	*pintr = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}

int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		wrong_thread(__func__, intr->name, intr->thread, thread);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}
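/*
 * Example usage (illustrative sketch; foo_event and the eventfd below are
 * hypothetical): register an fd-backed event source on the current SPDK
 * thread while it runs in interrupt mode, and unregister it when done:
 *
 *	static int
 *	foo_event(void *arg)
 *	{
 *		return 1;
 *	}
 *
 *	int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 *	struct spdk_interrupt *intr;
 *
 *	intr = spdk_interrupt_register(efd, foo_event, NULL, "foo_intr");
 *	...
 *	spdk_interrupt_unregister(&intr);
 *	close(efd);
 */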
static bool g_interrupt_mode = false;

int
spdk_interrupt_mode_enable(void)
{
	/* This must be called once, prior to initializing the threading library.
	 * g_spdk_msg_mempool will be valid if the thread library has already
	 * been initialized.
	 */
	if (g_spdk_msg_mempool) {
		SPDK_ERRLOG("Failed: the threading library is already initialized.\n");
		return -1;
	}

#ifdef __linux__
	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
	g_interrupt_mode = true;
	return 0;
#else
	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
	g_interrupt_mode = false;
	return -ENOTSUP;
#endif
}

bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}

SPDK_LOG_REGISTER_COMPONENT(thread)
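/*
 * Example call order (illustrative sketch): interrupt mode must be selected
 * before the threading library is initialized, e.g. during application setup:
 *
 *	spdk_interrupt_mode_enable();
 *	spdk_thread_lib_init(NULL, 0);
 */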