1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/likely.h" 38 #include "spdk/queue.h" 39 #include "spdk/string.h" 40 #include "spdk/thread.h" 41 #include "spdk/trace.h" 42 #include "spdk/util.h" 43 #include "spdk/fd_group.h" 44 45 #include "spdk/log.h" 46 #include "spdk_internal/thread.h" 47 #include "thread_internal.h" 48 49 #include "spdk_internal/trace_defs.h" 50 51 #ifdef __linux__ 52 #include <sys/timerfd.h> 53 #include <sys/eventfd.h> 54 #endif 55 56 #define SPDK_MSG_BATCH_SIZE 8 57 #define SPDK_MAX_DEVICE_NAME_LEN 256 58 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 59 #define SPDK_MAX_POLLER_NAME_LEN 256 60 #define SPDK_MAX_THREAD_NAME_LEN 256 61 62 enum spdk_poller_state { 63 /* The poller is registered with a thread but not currently executing its fn. */ 64 SPDK_POLLER_STATE_WAITING, 65 66 /* The poller is currently running its fn. */ 67 SPDK_POLLER_STATE_RUNNING, 68 69 /* The poller was unregistered during the execution of its fn. */ 70 SPDK_POLLER_STATE_UNREGISTERED, 71 72 /* The poller is in the process of being paused. It will be paused 73 * during the next time it's supposed to be executed. 74 */ 75 SPDK_POLLER_STATE_PAUSING, 76 77 /* The poller is registered but currently paused. It's on the 78 * paused_pollers list. 79 */ 80 SPDK_POLLER_STATE_PAUSED, 81 }; 82 83 struct spdk_poller { 84 TAILQ_ENTRY(spdk_poller) tailq; 85 RB_ENTRY(spdk_poller) node; 86 87 /* Current state of the poller; should only be accessed from the poller's thread. */ 88 enum spdk_poller_state state; 89 90 uint64_t period_ticks; 91 uint64_t next_run_tick; 92 uint64_t run_count; 93 uint64_t busy_count; 94 spdk_poller_fn fn; 95 void *arg; 96 struct spdk_thread *thread; 97 int interruptfd; 98 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 99 void *set_intr_cb_arg; 100 101 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 102 }; 103 104 enum spdk_thread_state { 105 /* The thread is pocessing poller and message by spdk_thread_poll(). */ 106 SPDK_THREAD_STATE_RUNNING, 107 108 /* The thread is in the process of termination. It reaps unregistering 109 * poller are releasing I/O channel. 110 */ 111 SPDK_THREAD_STATE_EXITING, 112 113 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 114 SPDK_THREAD_STATE_EXITED, 115 }; 116 117 struct spdk_thread { 118 uint64_t tsc_last; 119 struct spdk_thread_stats stats; 120 /* 121 * Contains pollers actively running on this thread. Pollers 122 * are run round-robin. The thread takes one poller from the head 123 * of the ring, executes it, then puts it back at the tail of 124 * the ring. 125 */ 126 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 127 /** 128 * Contains pollers running on this thread with a periodic timer. 129 */ 130 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 131 struct spdk_poller *first_timed_poller; 132 /* 133 * Contains paused pollers. Pollers on this queue are waiting until 134 * they are resumed (in which case they're put onto the active/timer 135 * queues) or unregistered. 136 */ 137 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 138 struct spdk_ring *messages; 139 int msg_fd; 140 SLIST_HEAD(, spdk_msg) msg_cache; 141 size_t msg_cache_count; 142 spdk_msg_fn critical_msg; 143 uint64_t id; 144 enum spdk_thread_state state; 145 int pending_unregister_count; 146 147 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 148 TAILQ_ENTRY(spdk_thread) tailq; 149 150 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 151 struct spdk_cpuset cpumask; 152 uint64_t exit_timeout_tsc; 153 154 /* Indicates whether this spdk_thread currently runs in interrupt. */ 155 bool in_interrupt; 156 struct spdk_fd_group *fgrp; 157 158 /* User context allocated at the end */ 159 uint8_t ctx[0]; 160 }; 161 162 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 163 164 static spdk_new_thread_fn g_new_thread_fn = NULL; 165 static spdk_thread_op_fn g_thread_op_fn = NULL; 166 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 167 static size_t g_ctx_sz = 0; 168 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 169 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 170 * SPDK application is required. 171 */ 172 static uint64_t g_thread_id = 1; 173 174 struct io_device { 175 void *io_device; 176 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 177 spdk_io_channel_create_cb create_cb; 178 spdk_io_channel_destroy_cb destroy_cb; 179 spdk_io_device_unregister_cb unregister_cb; 180 struct spdk_thread *unregister_thread; 181 uint32_t ctx_size; 182 uint32_t for_each_count; 183 RB_ENTRY(io_device) node; 184 185 uint32_t refcnt; 186 187 bool unregistered; 188 }; 189 190 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 191 192 static int 193 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 194 { 195 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 196 } 197 198 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 199 200 static int 201 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 202 { 203 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 204 } 205 206 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 207 208 struct spdk_msg { 209 spdk_msg_fn fn; 210 void *arg; 211 212 SLIST_ENTRY(spdk_msg) link; 213 }; 214 215 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 216 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 217 218 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 219 static uint32_t g_thread_count = 0; 220 221 static __thread struct spdk_thread *tls_thread = NULL; 222 223 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 224 { 225 spdk_trace_register_description("THREAD_IOCH_GET", 226 TRACE_THREAD_IOCH_GET, 227 OWNER_NONE, OBJECT_NONE, 0, 228 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 229 spdk_trace_register_description("THREAD_IOCH_PUT", 230 TRACE_THREAD_IOCH_PUT, 231 OWNER_NONE, OBJECT_NONE, 0, 232 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 233 } 234 235 /* 236 * If this compare function returns zero when two next_run_ticks are equal, 237 * the macro RB_INSERT() returns a pointer to the element with the same 238 * next_run_tick. 239 * 240 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 241 * to remove as a parameter. 242 * 243 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 244 * side by returning 1 when two next_run_ticks are equal. 245 */ 246 static inline int 247 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 248 { 249 if (poller1->next_run_tick < poller2->next_run_tick) { 250 return -1; 251 } else { 252 return 1; 253 } 254 } 255 256 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 257 258 static inline struct spdk_thread * 259 _get_thread(void) 260 { 261 return tls_thread; 262 } 263 264 static int 265 _thread_lib_init(size_t ctx_sz) 266 { 267 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 268 269 g_ctx_sz = ctx_sz; 270 271 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 272 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 273 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 274 sizeof(struct spdk_msg), 275 0, /* No cache. We do our own. */ 276 SPDK_ENV_SOCKET_ID_ANY); 277 278 if (!g_spdk_msg_mempool) { 279 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 280 return -1; 281 } 282 283 return 0; 284 } 285 286 int 287 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 288 { 289 assert(g_new_thread_fn == NULL); 290 assert(g_thread_op_fn == NULL); 291 292 if (new_thread_fn == NULL) { 293 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 294 } else { 295 g_new_thread_fn = new_thread_fn; 296 } 297 298 return _thread_lib_init(ctx_sz); 299 } 300 301 int 302 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 303 spdk_thread_op_supported_fn thread_op_supported_fn, 304 size_t ctx_sz) 305 { 306 assert(g_new_thread_fn == NULL); 307 assert(g_thread_op_fn == NULL); 308 assert(g_thread_op_supported_fn == NULL); 309 310 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 311 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 312 return -EINVAL; 313 } 314 315 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 316 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 317 } else { 318 g_thread_op_fn = thread_op_fn; 319 g_thread_op_supported_fn = thread_op_supported_fn; 320 } 321 322 return _thread_lib_init(ctx_sz); 323 } 324 325 void 326 spdk_thread_lib_fini(void) 327 { 328 struct io_device *dev; 329 330 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 331 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 332 } 333 334 if (g_spdk_msg_mempool) { 335 spdk_mempool_free(g_spdk_msg_mempool); 336 g_spdk_msg_mempool = NULL; 337 } 338 339 g_new_thread_fn = NULL; 340 g_thread_op_fn = NULL; 341 g_thread_op_supported_fn = NULL; 342 g_ctx_sz = 0; 343 } 344 345 static void thread_interrupt_destroy(struct spdk_thread *thread); 346 static int thread_interrupt_create(struct spdk_thread *thread); 347 348 static void 349 _free_thread(struct spdk_thread *thread) 350 { 351 struct spdk_io_channel *ch; 352 struct spdk_msg *msg; 353 struct spdk_poller *poller, *ptmp; 354 355 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 356 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 357 thread->name, ch->dev->name); 358 } 359 360 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 361 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 362 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 363 poller->name); 364 } 365 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 366 free(poller); 367 } 368 369 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 370 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 371 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 372 poller->name); 373 } 374 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 375 free(poller); 376 } 377 378 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 379 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 380 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 381 free(poller); 382 } 383 384 pthread_mutex_lock(&g_devlist_mutex); 385 assert(g_thread_count > 0); 386 g_thread_count--; 387 TAILQ_REMOVE(&g_threads, thread, tailq); 388 pthread_mutex_unlock(&g_devlist_mutex); 389 390 msg = SLIST_FIRST(&thread->msg_cache); 391 while (msg != NULL) { 392 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 393 394 assert(thread->msg_cache_count > 0); 395 thread->msg_cache_count--; 396 spdk_mempool_put(g_spdk_msg_mempool, msg); 397 398 msg = SLIST_FIRST(&thread->msg_cache); 399 } 400 401 assert(thread->msg_cache_count == 0); 402 403 if (spdk_interrupt_mode_is_enabled()) { 404 thread_interrupt_destroy(thread); 405 } 406 407 spdk_ring_free(thread->messages); 408 free(thread); 409 } 410 411 struct spdk_thread * 412 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 413 { 414 struct spdk_thread *thread; 415 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 416 int rc = 0, i; 417 418 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 419 if (!thread) { 420 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 421 return NULL; 422 } 423 424 if (cpumask) { 425 spdk_cpuset_copy(&thread->cpumask, cpumask); 426 } else { 427 spdk_cpuset_negate(&thread->cpumask); 428 } 429 430 RB_INIT(&thread->io_channels); 431 TAILQ_INIT(&thread->active_pollers); 432 RB_INIT(&thread->timed_pollers); 433 TAILQ_INIT(&thread->paused_pollers); 434 SLIST_INIT(&thread->msg_cache); 435 thread->msg_cache_count = 0; 436 437 thread->tsc_last = spdk_get_ticks(); 438 439 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 440 if (!thread->messages) { 441 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 442 free(thread); 443 return NULL; 444 } 445 446 /* Fill the local message pool cache. */ 447 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 448 if (rc == 0) { 449 /* If we can't populate the cache it's ok. The cache will get filled 450 * up organically as messages are passed to the thread. */ 451 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 452 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 453 thread->msg_cache_count++; 454 } 455 } 456 457 if (name) { 458 snprintf(thread->name, sizeof(thread->name), "%s", name); 459 } else { 460 snprintf(thread->name, sizeof(thread->name), "%p", thread); 461 } 462 463 pthread_mutex_lock(&g_devlist_mutex); 464 if (g_thread_id == 0) { 465 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 466 pthread_mutex_unlock(&g_devlist_mutex); 467 _free_thread(thread); 468 return NULL; 469 } 470 thread->id = g_thread_id++; 471 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 472 g_thread_count++; 473 pthread_mutex_unlock(&g_devlist_mutex); 474 475 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 476 thread->id, thread->name); 477 478 if (spdk_interrupt_mode_is_enabled()) { 479 thread->in_interrupt = true; 480 rc = thread_interrupt_create(thread); 481 if (rc != 0) { 482 _free_thread(thread); 483 return NULL; 484 } 485 } 486 487 if (g_new_thread_fn) { 488 rc = g_new_thread_fn(thread); 489 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 490 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 491 } 492 493 if (rc != 0) { 494 _free_thread(thread); 495 return NULL; 496 } 497 498 thread->state = SPDK_THREAD_STATE_RUNNING; 499 500 return thread; 501 } 502 503 void 504 spdk_set_thread(struct spdk_thread *thread) 505 { 506 tls_thread = thread; 507 } 508 509 static void 510 thread_exit(struct spdk_thread *thread, uint64_t now) 511 { 512 struct spdk_poller *poller; 513 struct spdk_io_channel *ch; 514 515 if (now >= thread->exit_timeout_tsc) { 516 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 517 thread->name); 518 goto exited; 519 } 520 521 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 522 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 523 SPDK_INFOLOG(thread, 524 "thread %s still has active poller %s\n", 525 thread->name, poller->name); 526 return; 527 } 528 } 529 530 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 531 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 532 SPDK_INFOLOG(thread, 533 "thread %s still has active timed poller %s\n", 534 thread->name, poller->name); 535 return; 536 } 537 } 538 539 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 540 SPDK_INFOLOG(thread, 541 "thread %s still has paused poller %s\n", 542 thread->name, poller->name); 543 return; 544 } 545 546 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 547 SPDK_INFOLOG(thread, 548 "thread %s still has channel for io_device %s\n", 549 thread->name, ch->dev->name); 550 return; 551 } 552 553 if (thread->pending_unregister_count > 0) { 554 SPDK_INFOLOG(thread, 555 "thread %s is still unregistering io_devices\n", 556 thread->name); 557 return; 558 } 559 560 exited: 561 thread->state = SPDK_THREAD_STATE_EXITED; 562 } 563 564 int 565 spdk_thread_exit(struct spdk_thread *thread) 566 { 567 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 568 569 assert(tls_thread == thread); 570 571 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 572 SPDK_INFOLOG(thread, 573 "thread %s is already exiting\n", 574 thread->name); 575 return 0; 576 } 577 578 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 579 SPDK_THREAD_EXIT_TIMEOUT_SEC); 580 thread->state = SPDK_THREAD_STATE_EXITING; 581 return 0; 582 } 583 584 bool 585 spdk_thread_is_exited(struct spdk_thread *thread) 586 { 587 return thread->state == SPDK_THREAD_STATE_EXITED; 588 } 589 590 void 591 spdk_thread_destroy(struct spdk_thread *thread) 592 { 593 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 594 595 assert(thread->state == SPDK_THREAD_STATE_EXITED); 596 597 if (tls_thread == thread) { 598 tls_thread = NULL; 599 } 600 601 _free_thread(thread); 602 } 603 604 void * 605 spdk_thread_get_ctx(struct spdk_thread *thread) 606 { 607 if (g_ctx_sz > 0) { 608 return thread->ctx; 609 } 610 611 return NULL; 612 } 613 614 struct spdk_cpuset * 615 spdk_thread_get_cpumask(struct spdk_thread *thread) 616 { 617 return &thread->cpumask; 618 } 619 620 int 621 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 622 { 623 struct spdk_thread *thread; 624 625 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 626 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 627 assert(false); 628 return -ENOTSUP; 629 } 630 631 thread = spdk_get_thread(); 632 if (!thread) { 633 SPDK_ERRLOG("Called from non-SPDK thread\n"); 634 assert(false); 635 return -EINVAL; 636 } 637 638 spdk_cpuset_copy(&thread->cpumask, cpumask); 639 640 /* Invoke framework's reschedule operation. If this function is called multiple times 641 * in a single spdk_thread_poll() context, the last cpumask will be used in the 642 * reschedule operation. 643 */ 644 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 645 646 return 0; 647 } 648 649 struct spdk_thread * 650 spdk_thread_get_from_ctx(void *ctx) 651 { 652 if (ctx == NULL) { 653 assert(false); 654 return NULL; 655 } 656 657 assert(g_ctx_sz > 0); 658 659 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 660 } 661 662 static inline uint32_t 663 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 664 { 665 unsigned count, i; 666 void *messages[SPDK_MSG_BATCH_SIZE]; 667 uint64_t notify = 1; 668 int rc; 669 670 #ifdef DEBUG 671 /* 672 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 673 * so we will never actually read uninitialized data from events, but just to be sure 674 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 675 */ 676 memset(messages, 0, sizeof(messages)); 677 #endif 678 679 if (max_msgs > 0) { 680 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 681 } else { 682 max_msgs = SPDK_MSG_BATCH_SIZE; 683 } 684 685 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 686 if (spdk_unlikely(thread->in_interrupt) && 687 spdk_ring_count(thread->messages) != 0) { 688 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 689 if (rc < 0) { 690 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 691 } 692 } 693 if (count == 0) { 694 return 0; 695 } 696 697 for (i = 0; i < count; i++) { 698 struct spdk_msg *msg = messages[i]; 699 700 assert(msg != NULL); 701 msg->fn(msg->arg); 702 703 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 704 /* Insert the messages at the head. We want to re-use the hot 705 * ones. */ 706 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 707 thread->msg_cache_count++; 708 } else { 709 spdk_mempool_put(g_spdk_msg_mempool, msg); 710 } 711 } 712 713 return count; 714 } 715 716 static void 717 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 718 { 719 struct spdk_poller *tmp __attribute__((unused)); 720 721 poller->next_run_tick = now + poller->period_ticks; 722 723 /* 724 * Insert poller in the thread's timed_pollers tree by next scheduled run time 725 * as its key. 726 */ 727 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 728 assert(tmp == NULL); 729 730 /* Update the cache only if it is empty or the inserted poller is earlier than it. 731 * RB_MIN() is not necessary here because all pollers, which has exactly the same 732 * next_run_tick as the existing poller, are inserted on the right side. 733 */ 734 if (thread->first_timed_poller == NULL || 735 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 736 thread->first_timed_poller = poller; 737 } 738 } 739 740 #ifdef __linux__ 741 static inline void 742 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 743 { 744 struct spdk_poller *tmp __attribute__((unused)); 745 746 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 747 assert(tmp != NULL); 748 749 /* This function is not used in any case that is performance critical. 750 * Update the cache simply by RB_MIN() if it needs to be changed. 751 */ 752 if (thread->first_timed_poller == poller) { 753 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 754 } 755 } 756 #endif 757 758 static void 759 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 760 { 761 if (poller->period_ticks) { 762 poller_insert_timer(thread, poller, spdk_get_ticks()); 763 } else { 764 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 765 } 766 } 767 768 static inline void 769 thread_update_stats(struct spdk_thread *thread, uint64_t end, 770 uint64_t start, int rc) 771 { 772 if (rc == 0) { 773 /* Poller status idle */ 774 thread->stats.idle_tsc += end - start; 775 } else if (rc > 0) { 776 /* Poller status busy */ 777 thread->stats.busy_tsc += end - start; 778 } 779 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 780 thread->tsc_last = end; 781 } 782 783 static inline int 784 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 785 { 786 int rc; 787 788 switch (poller->state) { 789 case SPDK_POLLER_STATE_UNREGISTERED: 790 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 791 free(poller); 792 return 0; 793 case SPDK_POLLER_STATE_PAUSING: 794 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 795 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 796 poller->state = SPDK_POLLER_STATE_PAUSED; 797 return 0; 798 case SPDK_POLLER_STATE_WAITING: 799 break; 800 default: 801 assert(false); 802 break; 803 } 804 805 poller->state = SPDK_POLLER_STATE_RUNNING; 806 rc = poller->fn(poller->arg); 807 808 poller->run_count++; 809 if (rc > 0) { 810 poller->busy_count++; 811 } 812 813 #ifdef DEBUG 814 if (rc == -1) { 815 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 816 } 817 #endif 818 819 switch (poller->state) { 820 case SPDK_POLLER_STATE_UNREGISTERED: 821 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 822 free(poller); 823 break; 824 case SPDK_POLLER_STATE_PAUSING: 825 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 826 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 827 poller->state = SPDK_POLLER_STATE_PAUSED; 828 break; 829 case SPDK_POLLER_STATE_PAUSED: 830 case SPDK_POLLER_STATE_WAITING: 831 break; 832 case SPDK_POLLER_STATE_RUNNING: 833 poller->state = SPDK_POLLER_STATE_WAITING; 834 break; 835 default: 836 assert(false); 837 break; 838 } 839 840 return rc; 841 } 842 843 static inline int 844 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 845 uint64_t now) 846 { 847 int rc; 848 849 switch (poller->state) { 850 case SPDK_POLLER_STATE_UNREGISTERED: 851 free(poller); 852 return 0; 853 case SPDK_POLLER_STATE_PAUSING: 854 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 855 poller->state = SPDK_POLLER_STATE_PAUSED; 856 return 0; 857 case SPDK_POLLER_STATE_WAITING: 858 break; 859 default: 860 assert(false); 861 break; 862 } 863 864 poller->state = SPDK_POLLER_STATE_RUNNING; 865 rc = poller->fn(poller->arg); 866 867 poller->run_count++; 868 if (rc > 0) { 869 poller->busy_count++; 870 } 871 872 #ifdef DEBUG 873 if (rc == -1) { 874 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 875 } 876 #endif 877 878 switch (poller->state) { 879 case SPDK_POLLER_STATE_UNREGISTERED: 880 free(poller); 881 break; 882 case SPDK_POLLER_STATE_PAUSING: 883 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 884 poller->state = SPDK_POLLER_STATE_PAUSED; 885 break; 886 case SPDK_POLLER_STATE_PAUSED: 887 break; 888 case SPDK_POLLER_STATE_RUNNING: 889 poller->state = SPDK_POLLER_STATE_WAITING; 890 /* fallthrough */ 891 case SPDK_POLLER_STATE_WAITING: 892 poller_insert_timer(thread, poller, now); 893 break; 894 default: 895 assert(false); 896 break; 897 } 898 899 return rc; 900 } 901 902 static int 903 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 904 { 905 uint32_t msg_count; 906 struct spdk_poller *poller, *tmp; 907 spdk_msg_fn critical_msg; 908 int rc = 0; 909 910 thread->tsc_last = now; 911 912 critical_msg = thread->critical_msg; 913 if (spdk_unlikely(critical_msg != NULL)) { 914 critical_msg(NULL); 915 thread->critical_msg = NULL; 916 rc = 1; 917 } 918 919 msg_count = msg_queue_run_batch(thread, max_msgs); 920 if (msg_count) { 921 rc = 1; 922 } 923 924 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 925 active_pollers_head, tailq, tmp) { 926 int poller_rc; 927 928 poller_rc = thread_execute_poller(thread, poller); 929 if (poller_rc > rc) { 930 rc = poller_rc; 931 } 932 } 933 934 poller = thread->first_timed_poller; 935 while (poller != NULL) { 936 int timer_rc = 0; 937 938 if (now < poller->next_run_tick) { 939 break; 940 } 941 942 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 943 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 944 945 /* Update the cache to the next timed poller in the list 946 * only if the current poller is still the closest, otherwise, 947 * do nothing because the cache has been already updated. 948 */ 949 if (thread->first_timed_poller == poller) { 950 thread->first_timed_poller = tmp; 951 } 952 953 timer_rc = thread_execute_timed_poller(thread, poller, now); 954 if (timer_rc > rc) { 955 rc = timer_rc; 956 } 957 958 poller = tmp; 959 } 960 961 return rc; 962 } 963 964 int 965 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 966 { 967 struct spdk_thread *orig_thread; 968 int rc; 969 uint64_t notify = 1; 970 971 orig_thread = _get_thread(); 972 tls_thread = thread; 973 974 if (now == 0) { 975 now = spdk_get_ticks(); 976 } 977 978 if (spdk_likely(!thread->in_interrupt)) { 979 rc = thread_poll(thread, max_msgs, now); 980 if (spdk_unlikely(thread->in_interrupt)) { 981 /* The thread transitioned to interrupt mode during the above poll. 982 * Poll it one more time in case that during the transition time 983 * there is msg received without notification. 984 */ 985 rc = thread_poll(thread, max_msgs, now); 986 } 987 } else { 988 /* Non-block wait on thread's fd_group */ 989 rc = spdk_fd_group_wait(thread->fgrp, 0); 990 if (spdk_unlikely(!thread->in_interrupt)) { 991 /* The thread transitioned to poll mode in a msg during the above processing. 992 * Clear msg_fd since thread messages will be polled directly in poll mode. 993 */ 994 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 995 if (rc < 0 && errno != EAGAIN) { 996 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 997 } 998 } 999 } 1000 1001 1002 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1003 thread_exit(thread, now); 1004 } 1005 1006 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1007 1008 tls_thread = orig_thread; 1009 1010 return rc; 1011 } 1012 1013 uint64_t 1014 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1015 { 1016 struct spdk_poller *poller; 1017 1018 poller = thread->first_timed_poller; 1019 if (poller) { 1020 return poller->next_run_tick; 1021 } 1022 1023 return 0; 1024 } 1025 1026 int 1027 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1028 { 1029 return !TAILQ_EMPTY(&thread->active_pollers); 1030 } 1031 1032 static bool 1033 thread_has_unpaused_pollers(struct spdk_thread *thread) 1034 { 1035 if (TAILQ_EMPTY(&thread->active_pollers) && 1036 RB_EMPTY(&thread->timed_pollers)) { 1037 return false; 1038 } 1039 1040 return true; 1041 } 1042 1043 bool 1044 spdk_thread_has_pollers(struct spdk_thread *thread) 1045 { 1046 if (!thread_has_unpaused_pollers(thread) && 1047 TAILQ_EMPTY(&thread->paused_pollers)) { 1048 return false; 1049 } 1050 1051 return true; 1052 } 1053 1054 bool 1055 spdk_thread_is_idle(struct spdk_thread *thread) 1056 { 1057 if (spdk_ring_count(thread->messages) || 1058 thread_has_unpaused_pollers(thread) || 1059 thread->critical_msg != NULL) { 1060 return false; 1061 } 1062 1063 return true; 1064 } 1065 1066 uint32_t 1067 spdk_thread_get_count(void) 1068 { 1069 /* 1070 * Return cached value of the current thread count. We could acquire the 1071 * lock and iterate through the TAILQ of threads to count them, but that 1072 * count could still be invalidated after we release the lock. 1073 */ 1074 return g_thread_count; 1075 } 1076 1077 struct spdk_thread * 1078 spdk_get_thread(void) 1079 { 1080 return _get_thread(); 1081 } 1082 1083 const char * 1084 spdk_thread_get_name(const struct spdk_thread *thread) 1085 { 1086 return thread->name; 1087 } 1088 1089 uint64_t 1090 spdk_thread_get_id(const struct spdk_thread *thread) 1091 { 1092 return thread->id; 1093 } 1094 1095 struct spdk_thread * 1096 spdk_thread_get_by_id(uint64_t id) 1097 { 1098 struct spdk_thread *thread; 1099 1100 if (id == 0 || id >= g_thread_id) { 1101 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1102 return NULL; 1103 } 1104 pthread_mutex_lock(&g_devlist_mutex); 1105 TAILQ_FOREACH(thread, &g_threads, tailq) { 1106 if (thread->id == id) { 1107 break; 1108 } 1109 } 1110 pthread_mutex_unlock(&g_devlist_mutex); 1111 return thread; 1112 } 1113 1114 int 1115 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1116 { 1117 struct spdk_thread *thread; 1118 1119 thread = _get_thread(); 1120 if (!thread) { 1121 SPDK_ERRLOG("No thread allocated\n"); 1122 return -EINVAL; 1123 } 1124 1125 if (stats == NULL) { 1126 return -EINVAL; 1127 } 1128 1129 *stats = thread->stats; 1130 1131 return 0; 1132 } 1133 1134 uint64_t 1135 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1136 { 1137 if (thread == NULL) { 1138 thread = _get_thread(); 1139 } 1140 1141 return thread->tsc_last; 1142 } 1143 1144 static inline int 1145 thread_send_msg_notification(const struct spdk_thread *target_thread) 1146 { 1147 uint64_t notify = 1; 1148 int rc; 1149 1150 /* Not necessary to do notification if interrupt facility is not enabled */ 1151 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1152 return 0; 1153 } 1154 1155 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1156 * after sending thread msg, it is necessary to check whether target thread runs in 1157 * interrupt mode and then decide whether do event notification. 1158 */ 1159 if (spdk_unlikely(target_thread->in_interrupt)) { 1160 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1161 if (rc < 0) { 1162 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1163 return -EIO; 1164 } 1165 } 1166 1167 return 0; 1168 } 1169 1170 int 1171 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1172 { 1173 struct spdk_thread *local_thread; 1174 struct spdk_msg *msg; 1175 int rc; 1176 1177 assert(thread != NULL); 1178 1179 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1180 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1181 return -EIO; 1182 } 1183 1184 local_thread = _get_thread(); 1185 1186 msg = NULL; 1187 if (local_thread != NULL) { 1188 if (local_thread->msg_cache_count > 0) { 1189 msg = SLIST_FIRST(&local_thread->msg_cache); 1190 assert(msg != NULL); 1191 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1192 local_thread->msg_cache_count--; 1193 } 1194 } 1195 1196 if (msg == NULL) { 1197 msg = spdk_mempool_get(g_spdk_msg_mempool); 1198 if (!msg) { 1199 SPDK_ERRLOG("msg could not be allocated\n"); 1200 return -ENOMEM; 1201 } 1202 } 1203 1204 msg->fn = fn; 1205 msg->arg = ctx; 1206 1207 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1208 if (rc != 1) { 1209 SPDK_ERRLOG("msg could not be enqueued\n"); 1210 spdk_mempool_put(g_spdk_msg_mempool, msg); 1211 return -EIO; 1212 } 1213 1214 return thread_send_msg_notification(thread); 1215 } 1216 1217 int 1218 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1219 { 1220 spdk_msg_fn expected = NULL; 1221 1222 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1223 __ATOMIC_SEQ_CST)) { 1224 return -EIO; 1225 } 1226 1227 return thread_send_msg_notification(thread); 1228 } 1229 1230 #ifdef __linux__ 1231 static int 1232 interrupt_timerfd_process(void *arg) 1233 { 1234 struct spdk_poller *poller = arg; 1235 uint64_t exp; 1236 int rc; 1237 1238 /* clear the level of interval timer */ 1239 rc = read(poller->interruptfd, &exp, sizeof(exp)); 1240 if (rc < 0) { 1241 if (rc == -EAGAIN) { 1242 return 0; 1243 } 1244 1245 return rc; 1246 } 1247 1248 return poller->fn(poller->arg); 1249 } 1250 1251 static int 1252 period_poller_interrupt_init(struct spdk_poller *poller) 1253 { 1254 struct spdk_fd_group *fgrp = poller->thread->fgrp; 1255 int timerfd; 1256 int rc; 1257 1258 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1259 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1260 if (timerfd < 0) { 1261 return -errno; 1262 } 1263 1264 rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller); 1265 if (rc < 0) { 1266 close(timerfd); 1267 return rc; 1268 } 1269 1270 poller->interruptfd = timerfd; 1271 return 0; 1272 } 1273 1274 static void 1275 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1276 { 1277 int timerfd = poller->interruptfd; 1278 uint64_t now_tick = spdk_get_ticks(); 1279 uint64_t ticks = spdk_get_ticks_hz(); 1280 int ret; 1281 struct itimerspec new_tv = {}; 1282 struct itimerspec old_tv = {}; 1283 1284 assert(poller->period_ticks != 0); 1285 assert(timerfd >= 0); 1286 1287 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1288 interrupt_mode ? "interrupt" : "poll"); 1289 1290 if (interrupt_mode) { 1291 /* Set repeated timer expiration */ 1292 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1293 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1294 1295 /* Update next timer expiration */ 1296 if (poller->next_run_tick == 0) { 1297 poller->next_run_tick = now_tick + poller->period_ticks; 1298 } else if (poller->next_run_tick < now_tick) { 1299 poller->next_run_tick = now_tick; 1300 } 1301 1302 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1303 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1304 1305 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1306 if (ret < 0) { 1307 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1308 assert(false); 1309 } 1310 } else { 1311 /* Disarm the timer */ 1312 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1313 if (ret < 0) { 1314 /* timerfd_settime's failure indicates that the timerfd is in error */ 1315 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1316 assert(false); 1317 } 1318 1319 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1320 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1321 */ 1322 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1323 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1324 poller_remove_timer(poller->thread, poller); 1325 poller_insert_timer(poller->thread, poller, now_tick); 1326 } 1327 } 1328 1329 static void 1330 poller_interrupt_fini(struct spdk_poller *poller) 1331 { 1332 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1333 assert(poller->interruptfd >= 0); 1334 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1335 close(poller->interruptfd); 1336 poller->interruptfd = -1; 1337 } 1338 1339 static int 1340 busy_poller_interrupt_init(struct spdk_poller *poller) 1341 { 1342 int busy_efd; 1343 int rc; 1344 1345 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1346 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1347 if (busy_efd < 0) { 1348 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1349 return -errno; 1350 } 1351 1352 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, 1353 poller->fn, poller->arg, poller->name); 1354 if (rc < 0) { 1355 close(busy_efd); 1356 return rc; 1357 } 1358 1359 poller->interruptfd = busy_efd; 1360 return 0; 1361 } 1362 1363 static void 1364 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1365 { 1366 int busy_efd = poller->interruptfd; 1367 uint64_t notify = 1; 1368 int rc __attribute__((unused)); 1369 1370 assert(busy_efd >= 0); 1371 1372 if (interrupt_mode) { 1373 /* Write without read on eventfd will get it repeatedly triggered. */ 1374 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1375 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1376 } 1377 } else { 1378 /* Read on eventfd will clear its level triggering. */ 1379 rc = read(busy_efd, ¬ify, sizeof(notify)); 1380 } 1381 } 1382 1383 #else 1384 1385 static int 1386 period_poller_interrupt_init(struct spdk_poller *poller) 1387 { 1388 return -ENOTSUP; 1389 } 1390 1391 static void 1392 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1393 { 1394 } 1395 1396 static void 1397 poller_interrupt_fini(struct spdk_poller *poller) 1398 { 1399 } 1400 1401 static int 1402 busy_poller_interrupt_init(struct spdk_poller *poller) 1403 { 1404 return -ENOTSUP; 1405 } 1406 1407 static void 1408 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1409 { 1410 } 1411 1412 #endif 1413 1414 void 1415 spdk_poller_register_interrupt(struct spdk_poller *poller, 1416 spdk_poller_set_interrupt_mode_cb cb_fn, 1417 void *cb_arg) 1418 { 1419 assert(poller != NULL); 1420 assert(cb_fn != NULL); 1421 assert(spdk_get_thread() == poller->thread); 1422 1423 if (!spdk_interrupt_mode_is_enabled()) { 1424 return; 1425 } 1426 1427 /* when a poller is created we don't know if the user is ever going to 1428 * enable interrupts on it by calling this function, so the poller 1429 * registration function has to immediately create a interruptfd. 1430 * When this function does get called by user, we have to then destroy 1431 * that interruptfd. 1432 */ 1433 if (poller->set_intr_cb_fn && poller->interruptfd >= 0) { 1434 poller_interrupt_fini(poller); 1435 } 1436 1437 poller->set_intr_cb_fn = cb_fn; 1438 poller->set_intr_cb_arg = cb_arg; 1439 1440 /* Set poller into interrupt mode if thread is in interrupt. */ 1441 if (poller->thread->in_interrupt) { 1442 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1443 } 1444 } 1445 1446 static uint64_t 1447 convert_us_to_ticks(uint64_t us) 1448 { 1449 uint64_t quotient, remainder, ticks; 1450 1451 if (us) { 1452 quotient = us / SPDK_SEC_TO_USEC; 1453 remainder = us % SPDK_SEC_TO_USEC; 1454 ticks = spdk_get_ticks_hz(); 1455 1456 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1457 } else { 1458 return 0; 1459 } 1460 } 1461 1462 static struct spdk_poller * 1463 poller_register(spdk_poller_fn fn, 1464 void *arg, 1465 uint64_t period_microseconds, 1466 const char *name) 1467 { 1468 struct spdk_thread *thread; 1469 struct spdk_poller *poller; 1470 1471 thread = spdk_get_thread(); 1472 if (!thread) { 1473 assert(false); 1474 return NULL; 1475 } 1476 1477 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1478 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1479 return NULL; 1480 } 1481 1482 poller = calloc(1, sizeof(*poller)); 1483 if (poller == NULL) { 1484 SPDK_ERRLOG("Poller memory allocation failed\n"); 1485 return NULL; 1486 } 1487 1488 if (name) { 1489 snprintf(poller->name, sizeof(poller->name), "%s", name); 1490 } else { 1491 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1492 } 1493 1494 poller->state = SPDK_POLLER_STATE_WAITING; 1495 poller->fn = fn; 1496 poller->arg = arg; 1497 poller->thread = thread; 1498 poller->interruptfd = -1; 1499 1500 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1501 1502 if (spdk_interrupt_mode_is_enabled()) { 1503 int rc; 1504 1505 if (period_microseconds) { 1506 rc = period_poller_interrupt_init(poller); 1507 if (rc < 0) { 1508 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1509 free(poller); 1510 return NULL; 1511 } 1512 1513 spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL); 1514 } else { 1515 /* If the poller doesn't have a period, create interruptfd that's always 1516 * busy automatically when runnning in interrupt mode. 1517 */ 1518 rc = busy_poller_interrupt_init(poller); 1519 if (rc > 0) { 1520 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1521 free(poller); 1522 return NULL; 1523 } 1524 1525 spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL); 1526 } 1527 } 1528 1529 thread_insert_poller(thread, poller); 1530 1531 return poller; 1532 } 1533 1534 struct spdk_poller * 1535 spdk_poller_register(spdk_poller_fn fn, 1536 void *arg, 1537 uint64_t period_microseconds) 1538 { 1539 return poller_register(fn, arg, period_microseconds, NULL); 1540 } 1541 1542 struct spdk_poller * 1543 spdk_poller_register_named(spdk_poller_fn fn, 1544 void *arg, 1545 uint64_t period_microseconds, 1546 const char *name) 1547 { 1548 return poller_register(fn, arg, period_microseconds, name); 1549 } 1550 1551 static void 1552 wrong_thread(const char *func, const char *name, struct spdk_thread *thread, 1553 struct spdk_thread *curthread) 1554 { 1555 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be " 1556 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id, 1557 thread->name, thread->id); 1558 assert(false); 1559 } 1560 1561 void 1562 spdk_poller_unregister(struct spdk_poller **ppoller) 1563 { 1564 struct spdk_thread *thread; 1565 struct spdk_poller *poller; 1566 1567 poller = *ppoller; 1568 if (poller == NULL) { 1569 return; 1570 } 1571 1572 *ppoller = NULL; 1573 1574 thread = spdk_get_thread(); 1575 if (!thread) { 1576 assert(false); 1577 return; 1578 } 1579 1580 if (poller->thread != thread) { 1581 wrong_thread(__func__, poller->name, poller->thread, thread); 1582 return; 1583 } 1584 1585 if (spdk_interrupt_mode_is_enabled() && poller->interruptfd >= 0) { 1586 poller_interrupt_fini(poller); 1587 } 1588 1589 /* If the poller was paused, put it on the active_pollers list so that 1590 * its unregistration can be processed by spdk_thread_poll(). 1591 */ 1592 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1593 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1594 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1595 poller->period_ticks = 0; 1596 } 1597 1598 /* Simply set the state to unregistered. The poller will get cleaned up 1599 * in a subsequent call to spdk_thread_poll(). 1600 */ 1601 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1602 } 1603 1604 void 1605 spdk_poller_pause(struct spdk_poller *poller) 1606 { 1607 struct spdk_thread *thread; 1608 1609 thread = spdk_get_thread(); 1610 if (!thread) { 1611 assert(false); 1612 return; 1613 } 1614 1615 if (poller->thread != thread) { 1616 wrong_thread(__func__, poller->name, poller->thread, thread); 1617 return; 1618 } 1619 1620 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1621 * spdk_thread_poll() move it. It allows a poller to be paused from 1622 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1623 * iteration, or from within itself without breaking the logic to always 1624 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1625 */ 1626 switch (poller->state) { 1627 case SPDK_POLLER_STATE_PAUSED: 1628 case SPDK_POLLER_STATE_PAUSING: 1629 break; 1630 case SPDK_POLLER_STATE_RUNNING: 1631 case SPDK_POLLER_STATE_WAITING: 1632 poller->state = SPDK_POLLER_STATE_PAUSING; 1633 break; 1634 default: 1635 assert(false); 1636 break; 1637 } 1638 } 1639 1640 void 1641 spdk_poller_resume(struct spdk_poller *poller) 1642 { 1643 struct spdk_thread *thread; 1644 1645 thread = spdk_get_thread(); 1646 if (!thread) { 1647 assert(false); 1648 return; 1649 } 1650 1651 if (poller->thread != thread) { 1652 wrong_thread(__func__, poller->name, poller->thread, thread); 1653 return; 1654 } 1655 1656 /* If a poller is paused it has to be removed from the paused pollers 1657 * list and put on the active list or timer tree depending on its 1658 * period_ticks. If a poller is still in the process of being paused, 1659 * we just need to flip its state back to waiting, as it's already on 1660 * the appropriate list or tree. 1661 */ 1662 switch (poller->state) { 1663 case SPDK_POLLER_STATE_PAUSED: 1664 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1665 thread_insert_poller(thread, poller); 1666 /* fallthrough */ 1667 case SPDK_POLLER_STATE_PAUSING: 1668 poller->state = SPDK_POLLER_STATE_WAITING; 1669 break; 1670 case SPDK_POLLER_STATE_RUNNING: 1671 case SPDK_POLLER_STATE_WAITING: 1672 break; 1673 default: 1674 assert(false); 1675 break; 1676 } 1677 } 1678 1679 const char * 1680 spdk_poller_get_name(struct spdk_poller *poller) 1681 { 1682 return poller->name; 1683 } 1684 1685 const char * 1686 spdk_poller_get_state_str(struct spdk_poller *poller) 1687 { 1688 switch (poller->state) { 1689 case SPDK_POLLER_STATE_WAITING: 1690 return "waiting"; 1691 case SPDK_POLLER_STATE_RUNNING: 1692 return "running"; 1693 case SPDK_POLLER_STATE_UNREGISTERED: 1694 return "unregistered"; 1695 case SPDK_POLLER_STATE_PAUSING: 1696 return "pausing"; 1697 case SPDK_POLLER_STATE_PAUSED: 1698 return "paused"; 1699 default: 1700 return NULL; 1701 } 1702 } 1703 1704 uint64_t 1705 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1706 { 1707 return poller->period_ticks; 1708 } 1709 1710 void 1711 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1712 { 1713 stats->run_count = poller->run_count; 1714 stats->busy_count = poller->busy_count; 1715 } 1716 1717 struct spdk_poller * 1718 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1719 { 1720 return TAILQ_FIRST(&thread->active_pollers); 1721 } 1722 1723 struct spdk_poller * 1724 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1725 { 1726 return TAILQ_NEXT(prev, tailq); 1727 } 1728 1729 struct spdk_poller * 1730 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1731 { 1732 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1733 } 1734 1735 struct spdk_poller * 1736 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1737 { 1738 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1739 } 1740 1741 struct spdk_poller * 1742 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1743 { 1744 return TAILQ_FIRST(&thread->paused_pollers); 1745 } 1746 1747 struct spdk_poller * 1748 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1749 { 1750 return TAILQ_NEXT(prev, tailq); 1751 } 1752 1753 struct spdk_io_channel * 1754 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1755 { 1756 return RB_MIN(io_channel_tree, &thread->io_channels); 1757 } 1758 1759 struct spdk_io_channel * 1760 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1761 { 1762 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1763 } 1764 1765 struct call_thread { 1766 struct spdk_thread *cur_thread; 1767 spdk_msg_fn fn; 1768 void *ctx; 1769 1770 struct spdk_thread *orig_thread; 1771 spdk_msg_fn cpl; 1772 }; 1773 1774 static void 1775 _on_thread(void *ctx) 1776 { 1777 struct call_thread *ct = ctx; 1778 int rc __attribute__((unused)); 1779 1780 ct->fn(ct->ctx); 1781 1782 pthread_mutex_lock(&g_devlist_mutex); 1783 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1784 pthread_mutex_unlock(&g_devlist_mutex); 1785 1786 if (!ct->cur_thread) { 1787 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1788 1789 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1790 free(ctx); 1791 } else { 1792 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1793 ct->cur_thread->name); 1794 1795 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1796 } 1797 assert(rc == 0); 1798 } 1799 1800 void 1801 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1802 { 1803 struct call_thread *ct; 1804 struct spdk_thread *thread; 1805 int rc __attribute__((unused)); 1806 1807 ct = calloc(1, sizeof(*ct)); 1808 if (!ct) { 1809 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1810 cpl(ctx); 1811 return; 1812 } 1813 1814 ct->fn = fn; 1815 ct->ctx = ctx; 1816 ct->cpl = cpl; 1817 1818 thread = _get_thread(); 1819 if (!thread) { 1820 SPDK_ERRLOG("No thread allocated\n"); 1821 free(ct); 1822 cpl(ctx); 1823 return; 1824 } 1825 ct->orig_thread = thread; 1826 1827 pthread_mutex_lock(&g_devlist_mutex); 1828 ct->cur_thread = TAILQ_FIRST(&g_threads); 1829 pthread_mutex_unlock(&g_devlist_mutex); 1830 1831 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 1832 ct->orig_thread->name); 1833 1834 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 1835 assert(rc == 0); 1836 } 1837 1838 static inline void 1839 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 1840 { 1841 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1842 return; 1843 } 1844 1845 if (!poller->set_intr_cb_fn) { 1846 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 1847 assert(false); 1848 return; 1849 } 1850 1851 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 1852 } 1853 1854 void 1855 spdk_thread_set_interrupt_mode(bool enable_interrupt) 1856 { 1857 struct spdk_thread *thread = _get_thread(); 1858 struct spdk_poller *poller, *tmp; 1859 1860 assert(thread); 1861 assert(spdk_interrupt_mode_is_enabled()); 1862 1863 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 1864 thread->name, enable_interrupt ? "intr" : "poll", 1865 thread->in_interrupt ? "intr" : "poll"); 1866 1867 if (thread->in_interrupt == enable_interrupt) { 1868 return; 1869 } 1870 1871 /* Set pollers to expected mode */ 1872 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1873 poller_set_interrupt_mode(poller, enable_interrupt); 1874 } 1875 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 1876 poller_set_interrupt_mode(poller, enable_interrupt); 1877 } 1878 /* All paused pollers will go to work in interrupt mode */ 1879 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 1880 poller_set_interrupt_mode(poller, enable_interrupt); 1881 } 1882 1883 thread->in_interrupt = enable_interrupt; 1884 return; 1885 } 1886 1887 static struct io_device * 1888 io_device_get(void *io_device) 1889 { 1890 struct io_device find = {}; 1891 1892 find.io_device = io_device; 1893 return RB_FIND(io_device_tree, &g_io_devices, &find); 1894 } 1895 1896 void 1897 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1898 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1899 const char *name) 1900 { 1901 struct io_device *dev, *tmp; 1902 struct spdk_thread *thread; 1903 1904 assert(io_device != NULL); 1905 assert(create_cb != NULL); 1906 assert(destroy_cb != NULL); 1907 1908 thread = spdk_get_thread(); 1909 if (!thread) { 1910 SPDK_ERRLOG("called from non-SPDK thread\n"); 1911 assert(false); 1912 return; 1913 } 1914 1915 dev = calloc(1, sizeof(struct io_device)); 1916 if (dev == NULL) { 1917 SPDK_ERRLOG("could not allocate io_device\n"); 1918 return; 1919 } 1920 1921 dev->io_device = io_device; 1922 if (name) { 1923 snprintf(dev->name, sizeof(dev->name), "%s", name); 1924 } else { 1925 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1926 } 1927 dev->create_cb = create_cb; 1928 dev->destroy_cb = destroy_cb; 1929 dev->unregister_cb = NULL; 1930 dev->ctx_size = ctx_size; 1931 dev->for_each_count = 0; 1932 dev->unregistered = false; 1933 dev->refcnt = 0; 1934 1935 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 1936 dev->name, dev->io_device, thread->name); 1937 1938 pthread_mutex_lock(&g_devlist_mutex); 1939 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 1940 if (tmp != NULL) { 1941 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1942 io_device, tmp->name, dev->name); 1943 pthread_mutex_unlock(&g_devlist_mutex); 1944 free(dev); 1945 } 1946 1947 pthread_mutex_unlock(&g_devlist_mutex); 1948 } 1949 1950 static void 1951 _finish_unregister(void *arg) 1952 { 1953 struct io_device *dev = arg; 1954 struct spdk_thread *thread; 1955 1956 thread = spdk_get_thread(); 1957 assert(thread == dev->unregister_thread); 1958 1959 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 1960 dev->name, dev->io_device, thread->name); 1961 1962 assert(thread->pending_unregister_count > 0); 1963 thread->pending_unregister_count--; 1964 1965 dev->unregister_cb(dev->io_device); 1966 free(dev); 1967 } 1968 1969 static void 1970 io_device_free(struct io_device *dev) 1971 { 1972 int rc __attribute__((unused)); 1973 1974 if (dev->unregister_cb == NULL) { 1975 free(dev); 1976 } else { 1977 assert(dev->unregister_thread != NULL); 1978 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 1979 dev->name, dev->io_device, dev->unregister_thread->name); 1980 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 1981 assert(rc == 0); 1982 } 1983 } 1984 1985 void 1986 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 1987 { 1988 struct io_device *dev; 1989 uint32_t refcnt; 1990 struct spdk_thread *thread; 1991 1992 thread = spdk_get_thread(); 1993 if (!thread) { 1994 SPDK_ERRLOG("called from non-SPDK thread\n"); 1995 assert(false); 1996 return; 1997 } 1998 1999 pthread_mutex_lock(&g_devlist_mutex); 2000 dev = io_device_get(io_device); 2001 if (!dev) { 2002 SPDK_ERRLOG("io_device %p not found\n", io_device); 2003 assert(false); 2004 pthread_mutex_unlock(&g_devlist_mutex); 2005 return; 2006 } 2007 2008 if (dev->for_each_count > 0) { 2009 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2010 dev->name, io_device, dev->for_each_count); 2011 pthread_mutex_unlock(&g_devlist_mutex); 2012 return; 2013 } 2014 2015 dev->unregister_cb = unregister_cb; 2016 dev->unregistered = true; 2017 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2018 refcnt = dev->refcnt; 2019 dev->unregister_thread = thread; 2020 pthread_mutex_unlock(&g_devlist_mutex); 2021 2022 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2023 dev->name, dev->io_device, thread->name); 2024 2025 if (unregister_cb) { 2026 thread->pending_unregister_count++; 2027 } 2028 2029 if (refcnt > 0) { 2030 /* defer deletion */ 2031 return; 2032 } 2033 2034 io_device_free(dev); 2035 } 2036 2037 const char * 2038 spdk_io_device_get_name(struct io_device *dev) 2039 { 2040 return dev->name; 2041 } 2042 2043 static struct spdk_io_channel * 2044 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2045 { 2046 struct spdk_io_channel find = {}; 2047 2048 find.dev = dev; 2049 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2050 } 2051 2052 struct spdk_io_channel * 2053 spdk_get_io_channel(void *io_device) 2054 { 2055 struct spdk_io_channel *ch; 2056 struct spdk_thread *thread; 2057 struct io_device *dev; 2058 int rc; 2059 2060 pthread_mutex_lock(&g_devlist_mutex); 2061 dev = io_device_get(io_device); 2062 if (dev == NULL) { 2063 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2064 pthread_mutex_unlock(&g_devlist_mutex); 2065 return NULL; 2066 } 2067 2068 thread = _get_thread(); 2069 if (!thread) { 2070 SPDK_ERRLOG("No thread allocated\n"); 2071 pthread_mutex_unlock(&g_devlist_mutex); 2072 return NULL; 2073 } 2074 2075 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2076 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2077 pthread_mutex_unlock(&g_devlist_mutex); 2078 return NULL; 2079 } 2080 2081 ch = thread_get_io_channel(thread, dev); 2082 if (ch != NULL) { 2083 ch->ref++; 2084 2085 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2086 ch, dev->name, dev->io_device, thread->name, ch->ref); 2087 2088 /* 2089 * An I/O channel already exists for this device on this 2090 * thread, so return it. 2091 */ 2092 pthread_mutex_unlock(&g_devlist_mutex); 2093 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2094 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2095 return ch; 2096 } 2097 2098 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2099 if (ch == NULL) { 2100 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2101 pthread_mutex_unlock(&g_devlist_mutex); 2102 return NULL; 2103 } 2104 2105 ch->dev = dev; 2106 ch->destroy_cb = dev->destroy_cb; 2107 ch->thread = thread; 2108 ch->ref = 1; 2109 ch->destroy_ref = 0; 2110 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2111 2112 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2113 ch, dev->name, dev->io_device, thread->name, ch->ref); 2114 2115 dev->refcnt++; 2116 2117 pthread_mutex_unlock(&g_devlist_mutex); 2118 2119 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2120 if (rc != 0) { 2121 pthread_mutex_lock(&g_devlist_mutex); 2122 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2123 dev->refcnt--; 2124 free(ch); 2125 pthread_mutex_unlock(&g_devlist_mutex); 2126 return NULL; 2127 } 2128 2129 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2130 return ch; 2131 } 2132 2133 static void 2134 put_io_channel(void *arg) 2135 { 2136 struct spdk_io_channel *ch = arg; 2137 bool do_remove_dev = true; 2138 struct spdk_thread *thread; 2139 2140 thread = spdk_get_thread(); 2141 if (!thread) { 2142 SPDK_ERRLOG("called from non-SPDK thread\n"); 2143 assert(false); 2144 return; 2145 } 2146 2147 SPDK_DEBUGLOG(thread, 2148 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2149 ch, ch->dev->name, ch->dev->io_device, thread->name); 2150 2151 assert(ch->thread == thread); 2152 2153 ch->destroy_ref--; 2154 2155 if (ch->ref > 0 || ch->destroy_ref > 0) { 2156 /* 2157 * Another reference to the associated io_device was requested 2158 * after this message was sent but before it had a chance to 2159 * execute. 2160 */ 2161 return; 2162 } 2163 2164 pthread_mutex_lock(&g_devlist_mutex); 2165 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2166 pthread_mutex_unlock(&g_devlist_mutex); 2167 2168 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2169 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2170 2171 pthread_mutex_lock(&g_devlist_mutex); 2172 ch->dev->refcnt--; 2173 2174 if (!ch->dev->unregistered) { 2175 do_remove_dev = false; 2176 } 2177 2178 if (ch->dev->refcnt > 0) { 2179 do_remove_dev = false; 2180 } 2181 2182 pthread_mutex_unlock(&g_devlist_mutex); 2183 2184 if (do_remove_dev) { 2185 io_device_free(ch->dev); 2186 } 2187 free(ch); 2188 } 2189 2190 void 2191 spdk_put_io_channel(struct spdk_io_channel *ch) 2192 { 2193 struct spdk_thread *thread; 2194 int rc __attribute__((unused)); 2195 2196 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2197 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2198 2199 thread = spdk_get_thread(); 2200 if (!thread) { 2201 SPDK_ERRLOG("called from non-SPDK thread\n"); 2202 assert(false); 2203 return; 2204 } 2205 2206 if (ch->thread != thread) { 2207 wrong_thread(__func__, "ch", ch->thread, thread); 2208 return; 2209 } 2210 2211 SPDK_DEBUGLOG(thread, 2212 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2213 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2214 2215 ch->ref--; 2216 2217 if (ch->ref == 0) { 2218 ch->destroy_ref++; 2219 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2220 assert(rc == 0); 2221 } 2222 } 2223 2224 struct spdk_io_channel * 2225 spdk_io_channel_from_ctx(void *ctx) 2226 { 2227 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2228 } 2229 2230 struct spdk_thread * 2231 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2232 { 2233 return ch->thread; 2234 } 2235 2236 void * 2237 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2238 { 2239 return ch->dev->io_device; 2240 } 2241 2242 const char * 2243 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2244 { 2245 return spdk_io_device_get_name(ch->dev); 2246 } 2247 2248 int 2249 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2250 { 2251 return ch->ref; 2252 } 2253 2254 struct spdk_io_channel_iter { 2255 void *io_device; 2256 struct io_device *dev; 2257 spdk_channel_msg fn; 2258 int status; 2259 void *ctx; 2260 struct spdk_io_channel *ch; 2261 2262 struct spdk_thread *cur_thread; 2263 2264 struct spdk_thread *orig_thread; 2265 spdk_channel_for_each_cpl cpl; 2266 }; 2267 2268 void * 2269 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2270 { 2271 return i->io_device; 2272 } 2273 2274 struct spdk_io_channel * 2275 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2276 { 2277 return i->ch; 2278 } 2279 2280 void * 2281 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2282 { 2283 return i->ctx; 2284 } 2285 2286 static void 2287 _call_completion(void *ctx) 2288 { 2289 struct spdk_io_channel_iter *i = ctx; 2290 2291 if (i->cpl != NULL) { 2292 i->cpl(i, i->status); 2293 } 2294 free(i); 2295 } 2296 2297 static void 2298 _call_channel(void *ctx) 2299 { 2300 struct spdk_io_channel_iter *i = ctx; 2301 struct spdk_io_channel *ch; 2302 2303 /* 2304 * It is possible that the channel was deleted before this 2305 * message had a chance to execute. If so, skip calling 2306 * the fn() on this thread. 2307 */ 2308 pthread_mutex_lock(&g_devlist_mutex); 2309 ch = thread_get_io_channel(i->cur_thread, i->dev); 2310 pthread_mutex_unlock(&g_devlist_mutex); 2311 2312 if (ch) { 2313 i->fn(i); 2314 } else { 2315 spdk_for_each_channel_continue(i, 0); 2316 } 2317 } 2318 2319 void 2320 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2321 spdk_channel_for_each_cpl cpl) 2322 { 2323 struct spdk_thread *thread; 2324 struct spdk_io_channel *ch; 2325 struct spdk_io_channel_iter *i; 2326 int rc __attribute__((unused)); 2327 2328 i = calloc(1, sizeof(*i)); 2329 if (!i) { 2330 SPDK_ERRLOG("Unable to allocate iterator\n"); 2331 return; 2332 } 2333 2334 i->io_device = io_device; 2335 i->fn = fn; 2336 i->ctx = ctx; 2337 i->cpl = cpl; 2338 i->orig_thread = _get_thread(); 2339 2340 pthread_mutex_lock(&g_devlist_mutex); 2341 i->dev = io_device_get(io_device); 2342 if (i->dev == NULL) { 2343 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2344 assert(false); 2345 goto end; 2346 } 2347 2348 TAILQ_FOREACH(thread, &g_threads, tailq) { 2349 ch = thread_get_io_channel(thread, i->dev); 2350 if (ch != NULL) { 2351 ch->dev->for_each_count++; 2352 i->cur_thread = thread; 2353 i->ch = ch; 2354 pthread_mutex_unlock(&g_devlist_mutex); 2355 rc = spdk_thread_send_msg(thread, _call_channel, i); 2356 assert(rc == 0); 2357 return; 2358 } 2359 } 2360 2361 end: 2362 pthread_mutex_unlock(&g_devlist_mutex); 2363 2364 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2365 assert(rc == 0); 2366 } 2367 2368 void 2369 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2370 { 2371 struct spdk_thread *thread; 2372 struct spdk_io_channel *ch; 2373 int rc __attribute__((unused)); 2374 2375 assert(i->cur_thread == spdk_get_thread()); 2376 2377 i->status = status; 2378 2379 pthread_mutex_lock(&g_devlist_mutex); 2380 if (status) { 2381 goto end; 2382 } 2383 thread = TAILQ_NEXT(i->cur_thread, tailq); 2384 while (thread) { 2385 ch = thread_get_io_channel(thread, i->dev); 2386 if (ch != NULL) { 2387 i->cur_thread = thread; 2388 i->ch = ch; 2389 pthread_mutex_unlock(&g_devlist_mutex); 2390 rc = spdk_thread_send_msg(thread, _call_channel, i); 2391 assert(rc == 0); 2392 return; 2393 } 2394 thread = TAILQ_NEXT(thread, tailq); 2395 } 2396 2397 end: 2398 i->dev->for_each_count--; 2399 i->ch = NULL; 2400 pthread_mutex_unlock(&g_devlist_mutex); 2401 2402 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2403 assert(rc == 0); 2404 } 2405 2406 struct spdk_interrupt { 2407 int efd; 2408 struct spdk_thread *thread; 2409 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 2410 }; 2411 2412 static void 2413 thread_interrupt_destroy(struct spdk_thread *thread) 2414 { 2415 struct spdk_fd_group *fgrp = thread->fgrp; 2416 2417 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2418 2419 if (thread->msg_fd < 0) { 2420 return; 2421 } 2422 2423 spdk_fd_group_remove(fgrp, thread->msg_fd); 2424 close(thread->msg_fd); 2425 thread->msg_fd = -1; 2426 2427 spdk_fd_group_destroy(fgrp); 2428 thread->fgrp = NULL; 2429 } 2430 2431 #ifdef __linux__ 2432 static int 2433 thread_interrupt_msg_process(void *arg) 2434 { 2435 struct spdk_thread *thread = arg; 2436 uint32_t msg_count; 2437 spdk_msg_fn critical_msg; 2438 int rc = 0; 2439 uint64_t notify = 1; 2440 2441 assert(spdk_interrupt_mode_is_enabled()); 2442 2443 /* There may be race between msg_acknowledge and another producer's msg_notify, 2444 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2445 * This can avoid msg notification missing. 2446 */ 2447 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2448 if (rc < 0 && errno != EAGAIN) { 2449 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2450 } 2451 2452 critical_msg = thread->critical_msg; 2453 if (spdk_unlikely(critical_msg != NULL)) { 2454 critical_msg(NULL); 2455 thread->critical_msg = NULL; 2456 rc = 1; 2457 } 2458 2459 msg_count = msg_queue_run_batch(thread, 0); 2460 if (msg_count) { 2461 rc = 1; 2462 } 2463 2464 return rc; 2465 } 2466 2467 static int 2468 thread_interrupt_create(struct spdk_thread *thread) 2469 { 2470 int rc; 2471 2472 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2473 2474 rc = spdk_fd_group_create(&thread->fgrp); 2475 if (rc) { 2476 return rc; 2477 } 2478 2479 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2480 if (thread->msg_fd < 0) { 2481 rc = -errno; 2482 spdk_fd_group_destroy(thread->fgrp); 2483 thread->fgrp = NULL; 2484 2485 return rc; 2486 } 2487 2488 return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd, 2489 thread_interrupt_msg_process, thread); 2490 } 2491 #else 2492 static int 2493 thread_interrupt_create(struct spdk_thread *thread) 2494 { 2495 return -ENOTSUP; 2496 } 2497 #endif 2498 2499 struct spdk_interrupt * 2500 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2501 void *arg, const char *name) 2502 { 2503 struct spdk_thread *thread; 2504 struct spdk_interrupt *intr; 2505 int ret; 2506 2507 thread = spdk_get_thread(); 2508 if (!thread) { 2509 assert(false); 2510 return NULL; 2511 } 2512 2513 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2514 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2515 return NULL; 2516 } 2517 2518 ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name); 2519 2520 if (ret != 0) { 2521 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2522 thread->name, efd, spdk_strerror(-ret)); 2523 return NULL; 2524 } 2525 2526 intr = calloc(1, sizeof(*intr)); 2527 if (intr == NULL) { 2528 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2529 return NULL; 2530 } 2531 2532 if (name) { 2533 snprintf(intr->name, sizeof(intr->name), "%s", name); 2534 } else { 2535 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2536 } 2537 2538 intr->efd = efd; 2539 intr->thread = thread; 2540 2541 return intr; 2542 } 2543 2544 void 2545 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2546 { 2547 struct spdk_thread *thread; 2548 struct spdk_interrupt *intr; 2549 2550 intr = *pintr; 2551 if (intr == NULL) { 2552 return; 2553 } 2554 2555 *pintr = NULL; 2556 2557 thread = spdk_get_thread(); 2558 if (!thread) { 2559 assert(false); 2560 return; 2561 } 2562 2563 if (intr->thread != thread) { 2564 wrong_thread(__func__, intr->name, intr->thread, thread); 2565 return; 2566 } 2567 2568 spdk_fd_group_remove(thread->fgrp, intr->efd); 2569 free(intr); 2570 } 2571 2572 int 2573 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2574 enum spdk_interrupt_event_types event_types) 2575 { 2576 struct spdk_thread *thread; 2577 2578 thread = spdk_get_thread(); 2579 if (!thread) { 2580 assert(false); 2581 return -EINVAL; 2582 } 2583 2584 if (intr->thread != thread) { 2585 wrong_thread(__func__, intr->name, intr->thread, thread); 2586 return -EINVAL; 2587 } 2588 2589 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2590 } 2591 2592 int 2593 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2594 { 2595 return spdk_fd_group_get_fd(thread->fgrp); 2596 } 2597 2598 static bool g_interrupt_mode = false; 2599 2600 int 2601 spdk_interrupt_mode_enable(void) 2602 { 2603 /* It must be called once prior to initializing the threading library. 2604 * g_spdk_msg_mempool will be valid if thread library is initialized. 2605 */ 2606 if (g_spdk_msg_mempool) { 2607 SPDK_ERRLOG("Failed due to threading library is already initailzied.\n"); 2608 return -1; 2609 } 2610 2611 #ifdef __linux__ 2612 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2613 g_interrupt_mode = true; 2614 return 0; 2615 #else 2616 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2617 g_interrupt_mode = false; 2618 return -ENOTSUP; 2619 #endif 2620 } 2621 2622 bool 2623 spdk_interrupt_mode_is_enabled(void) 2624 { 2625 return g_interrupt_mode; 2626 } 2627 2628 SPDK_LOG_REGISTER_COMPONENT(thread) 2629