1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/likely.h" 38 #include "spdk/queue.h" 39 #include "spdk/string.h" 40 #include "spdk/thread.h" 41 #include "spdk/trace.h" 42 #include "spdk/util.h" 43 #include "spdk/fd_group.h" 44 45 #include "spdk/log.h" 46 #include "spdk_internal/thread.h" 47 #include "thread_internal.h" 48 49 #include "spdk_internal/trace_defs.h" 50 51 #ifdef __linux__ 52 #include <sys/timerfd.h> 53 #include <sys/eventfd.h> 54 #endif 55 56 #define SPDK_MSG_BATCH_SIZE 8 57 #define SPDK_MAX_DEVICE_NAME_LEN 256 58 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 59 #define SPDK_MAX_POLLER_NAME_LEN 256 60 #define SPDK_MAX_THREAD_NAME_LEN 256 61 62 enum spdk_poller_state { 63 /* The poller is registered with a thread but not currently executing its fn. */ 64 SPDK_POLLER_STATE_WAITING, 65 66 /* The poller is currently running its fn. */ 67 SPDK_POLLER_STATE_RUNNING, 68 69 /* The poller was unregistered during the execution of its fn. */ 70 SPDK_POLLER_STATE_UNREGISTERED, 71 72 /* The poller is in the process of being paused. It will be paused 73 * during the next time it's supposed to be executed. 74 */ 75 SPDK_POLLER_STATE_PAUSING, 76 77 /* The poller is registered but currently paused. It's on the 78 * paused_pollers list. 79 */ 80 SPDK_POLLER_STATE_PAUSED, 81 }; 82 83 struct spdk_poller { 84 TAILQ_ENTRY(spdk_poller) tailq; 85 RB_ENTRY(spdk_poller) node; 86 87 /* Current state of the poller; should only be accessed from the poller's thread. */ 88 enum spdk_poller_state state; 89 90 uint64_t period_ticks; 91 uint64_t next_run_tick; 92 uint64_t run_count; 93 uint64_t busy_count; 94 uint64_t id; 95 spdk_poller_fn fn; 96 void *arg; 97 struct spdk_thread *thread; 98 /* Native interruptfd for period or busy poller */ 99 int interruptfd; 100 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 101 void *set_intr_cb_arg; 102 103 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 104 }; 105 106 enum spdk_thread_state { 107 /* The thread is processing poller and message by spdk_thread_poll(). */ 108 SPDK_THREAD_STATE_RUNNING, 109 110 /* The thread is in the process of termination. It reaps unregistering 111 * poller are releasing I/O channel. 112 */ 113 SPDK_THREAD_STATE_EXITING, 114 115 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 116 SPDK_THREAD_STATE_EXITED, 117 }; 118 119 struct spdk_thread { 120 uint64_t tsc_last; 121 struct spdk_thread_stats stats; 122 /* 123 * Contains pollers actively running on this thread. Pollers 124 * are run round-robin. The thread takes one poller from the head 125 * of the ring, executes it, then puts it back at the tail of 126 * the ring. 127 */ 128 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 129 /** 130 * Contains pollers running on this thread with a periodic timer. 131 */ 132 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 133 struct spdk_poller *first_timed_poller; 134 /* 135 * Contains paused pollers. Pollers on this queue are waiting until 136 * they are resumed (in which case they're put onto the active/timer 137 * queues) or unregistered. 138 */ 139 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 140 struct spdk_ring *messages; 141 int msg_fd; 142 SLIST_HEAD(, spdk_msg) msg_cache; 143 size_t msg_cache_count; 144 spdk_msg_fn critical_msg; 145 uint64_t id; 146 uint64_t next_poller_id; 147 enum spdk_thread_state state; 148 int pending_unregister_count; 149 150 RB_HEAD(io_channel_tree, spdk_io_channel) io_channels; 151 TAILQ_ENTRY(spdk_thread) tailq; 152 153 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 154 struct spdk_cpuset cpumask; 155 uint64_t exit_timeout_tsc; 156 157 /* Indicates whether this spdk_thread currently runs in interrupt. */ 158 bool in_interrupt; 159 bool poller_unregistered; 160 struct spdk_fd_group *fgrp; 161 162 /* User context allocated at the end */ 163 uint8_t ctx[0]; 164 }; 165 166 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 167 168 static spdk_new_thread_fn g_new_thread_fn = NULL; 169 static spdk_thread_op_fn g_thread_op_fn = NULL; 170 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 171 static size_t g_ctx_sz = 0; 172 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 173 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 174 * SPDK application is required. 175 */ 176 static uint64_t g_thread_id = 1; 177 178 struct io_device { 179 void *io_device; 180 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 181 spdk_io_channel_create_cb create_cb; 182 spdk_io_channel_destroy_cb destroy_cb; 183 spdk_io_device_unregister_cb unregister_cb; 184 struct spdk_thread *unregister_thread; 185 uint32_t ctx_size; 186 uint32_t for_each_count; 187 RB_ENTRY(io_device) node; 188 189 uint32_t refcnt; 190 191 bool unregistered; 192 }; 193 194 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 195 196 static int 197 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 198 { 199 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 200 } 201 202 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 203 204 static int 205 io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2) 206 { 207 return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev); 208 } 209 210 RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp); 211 212 struct spdk_msg { 213 spdk_msg_fn fn; 214 void *arg; 215 216 SLIST_ENTRY(spdk_msg) link; 217 }; 218 219 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 220 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 221 222 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 223 static uint32_t g_thread_count = 0; 224 225 static __thread struct spdk_thread *tls_thread = NULL; 226 227 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 228 { 229 spdk_trace_register_description("THREAD_IOCH_GET", 230 TRACE_THREAD_IOCH_GET, 231 OWNER_NONE, OBJECT_NONE, 0, 232 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 233 spdk_trace_register_description("THREAD_IOCH_PUT", 234 TRACE_THREAD_IOCH_PUT, 235 OWNER_NONE, OBJECT_NONE, 0, 236 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 237 } 238 239 /* 240 * If this compare function returns zero when two next_run_ticks are equal, 241 * the macro RB_INSERT() returns a pointer to the element with the same 242 * next_run_tick. 243 * 244 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 245 * to remove as a parameter. 246 * 247 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 248 * side by returning 1 when two next_run_ticks are equal. 249 */ 250 static inline int 251 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 252 { 253 if (poller1->next_run_tick < poller2->next_run_tick) { 254 return -1; 255 } else { 256 return 1; 257 } 258 } 259 260 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 261 262 static inline struct spdk_thread * 263 _get_thread(void) 264 { 265 return tls_thread; 266 } 267 268 static int 269 _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz) 270 { 271 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 272 273 g_ctx_sz = ctx_sz; 274 275 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 276 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz, 277 sizeof(struct spdk_msg), 278 0, /* No cache. We do our own. */ 279 SPDK_ENV_SOCKET_ID_ANY); 280 281 SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n", 282 msg_mempool_sz); 283 284 if (!g_spdk_msg_mempool) { 285 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 286 return -1; 287 } 288 289 return 0; 290 } 291 292 int 293 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 294 { 295 assert(g_new_thread_fn == NULL); 296 assert(g_thread_op_fn == NULL); 297 298 if (new_thread_fn == NULL) { 299 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 300 } else { 301 g_new_thread_fn = new_thread_fn; 302 } 303 304 return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE); 305 } 306 307 int 308 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 309 spdk_thread_op_supported_fn thread_op_supported_fn, 310 size_t ctx_sz, size_t msg_mempool_sz) 311 { 312 assert(g_new_thread_fn == NULL); 313 assert(g_thread_op_fn == NULL); 314 assert(g_thread_op_supported_fn == NULL); 315 316 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 317 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 318 return -EINVAL; 319 } 320 321 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 322 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 323 } else { 324 g_thread_op_fn = thread_op_fn; 325 g_thread_op_supported_fn = thread_op_supported_fn; 326 } 327 328 return _thread_lib_init(ctx_sz, msg_mempool_sz); 329 } 330 331 void 332 spdk_thread_lib_fini(void) 333 { 334 struct io_device *dev; 335 336 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 337 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 338 } 339 340 if (g_spdk_msg_mempool) { 341 spdk_mempool_free(g_spdk_msg_mempool); 342 g_spdk_msg_mempool = NULL; 343 } 344 345 g_new_thread_fn = NULL; 346 g_thread_op_fn = NULL; 347 g_thread_op_supported_fn = NULL; 348 g_ctx_sz = 0; 349 } 350 351 static void thread_interrupt_destroy(struct spdk_thread *thread); 352 static int thread_interrupt_create(struct spdk_thread *thread); 353 354 static void 355 _free_thread(struct spdk_thread *thread) 356 { 357 struct spdk_io_channel *ch; 358 struct spdk_msg *msg; 359 struct spdk_poller *poller, *ptmp; 360 361 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 362 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 363 thread->name, ch->dev->name); 364 } 365 366 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 367 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 368 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 369 poller->name); 370 } 371 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 372 free(poller); 373 } 374 375 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 376 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 377 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 378 poller->name); 379 } 380 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 381 free(poller); 382 } 383 384 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 385 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 386 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 387 free(poller); 388 } 389 390 pthread_mutex_lock(&g_devlist_mutex); 391 assert(g_thread_count > 0); 392 g_thread_count--; 393 TAILQ_REMOVE(&g_threads, thread, tailq); 394 pthread_mutex_unlock(&g_devlist_mutex); 395 396 msg = SLIST_FIRST(&thread->msg_cache); 397 while (msg != NULL) { 398 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 399 400 assert(thread->msg_cache_count > 0); 401 thread->msg_cache_count--; 402 spdk_mempool_put(g_spdk_msg_mempool, msg); 403 404 msg = SLIST_FIRST(&thread->msg_cache); 405 } 406 407 assert(thread->msg_cache_count == 0); 408 409 if (spdk_interrupt_mode_is_enabled()) { 410 thread_interrupt_destroy(thread); 411 } 412 413 spdk_ring_free(thread->messages); 414 free(thread); 415 } 416 417 struct spdk_thread * 418 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 419 { 420 struct spdk_thread *thread; 421 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 422 int rc = 0, i; 423 424 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 425 if (!thread) { 426 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 427 return NULL; 428 } 429 430 if (cpumask) { 431 spdk_cpuset_copy(&thread->cpumask, cpumask); 432 } else { 433 spdk_cpuset_negate(&thread->cpumask); 434 } 435 436 RB_INIT(&thread->io_channels); 437 TAILQ_INIT(&thread->active_pollers); 438 RB_INIT(&thread->timed_pollers); 439 TAILQ_INIT(&thread->paused_pollers); 440 SLIST_INIT(&thread->msg_cache); 441 thread->msg_cache_count = 0; 442 443 thread->tsc_last = spdk_get_ticks(); 444 445 /* Monotonic increasing ID is set to each created poller beginning at 1. Once the 446 * ID exceeds UINT64_MAX a warning message is logged 447 */ 448 thread->next_poller_id = 1; 449 450 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 451 if (!thread->messages) { 452 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 453 free(thread); 454 return NULL; 455 } 456 457 /* Fill the local message pool cache. */ 458 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 459 if (rc == 0) { 460 /* If we can't populate the cache it's ok. The cache will get filled 461 * up organically as messages are passed to the thread. */ 462 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 463 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 464 thread->msg_cache_count++; 465 } 466 } 467 468 if (name) { 469 snprintf(thread->name, sizeof(thread->name), "%s", name); 470 } else { 471 snprintf(thread->name, sizeof(thread->name), "%p", thread); 472 } 473 474 pthread_mutex_lock(&g_devlist_mutex); 475 if (g_thread_id == 0) { 476 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 477 pthread_mutex_unlock(&g_devlist_mutex); 478 _free_thread(thread); 479 return NULL; 480 } 481 thread->id = g_thread_id++; 482 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 483 g_thread_count++; 484 pthread_mutex_unlock(&g_devlist_mutex); 485 486 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 487 thread->id, thread->name); 488 489 if (spdk_interrupt_mode_is_enabled()) { 490 thread->in_interrupt = true; 491 rc = thread_interrupt_create(thread); 492 if (rc != 0) { 493 _free_thread(thread); 494 return NULL; 495 } 496 } 497 498 if (g_new_thread_fn) { 499 rc = g_new_thread_fn(thread); 500 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 501 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 502 } 503 504 if (rc != 0) { 505 _free_thread(thread); 506 return NULL; 507 } 508 509 thread->state = SPDK_THREAD_STATE_RUNNING; 510 511 return thread; 512 } 513 514 void 515 spdk_set_thread(struct spdk_thread *thread) 516 { 517 tls_thread = thread; 518 } 519 520 static void 521 thread_exit(struct spdk_thread *thread, uint64_t now) 522 { 523 struct spdk_poller *poller; 524 struct spdk_io_channel *ch; 525 526 if (now >= thread->exit_timeout_tsc) { 527 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 528 thread->name); 529 goto exited; 530 } 531 532 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 533 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 534 SPDK_INFOLOG(thread, 535 "thread %s still has active poller %s\n", 536 thread->name, poller->name); 537 return; 538 } 539 } 540 541 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 542 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 543 SPDK_INFOLOG(thread, 544 "thread %s still has active timed poller %s\n", 545 thread->name, poller->name); 546 return; 547 } 548 } 549 550 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 551 SPDK_INFOLOG(thread, 552 "thread %s still has paused poller %s\n", 553 thread->name, poller->name); 554 return; 555 } 556 557 RB_FOREACH(ch, io_channel_tree, &thread->io_channels) { 558 SPDK_INFOLOG(thread, 559 "thread %s still has channel for io_device %s\n", 560 thread->name, ch->dev->name); 561 return; 562 } 563 564 if (thread->pending_unregister_count > 0) { 565 SPDK_INFOLOG(thread, 566 "thread %s is still unregistering io_devices\n", 567 thread->name); 568 return; 569 } 570 571 exited: 572 thread->state = SPDK_THREAD_STATE_EXITED; 573 } 574 575 int 576 spdk_thread_exit(struct spdk_thread *thread) 577 { 578 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 579 580 assert(tls_thread == thread); 581 582 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 583 SPDK_INFOLOG(thread, 584 "thread %s is already exiting\n", 585 thread->name); 586 return 0; 587 } 588 589 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 590 SPDK_THREAD_EXIT_TIMEOUT_SEC); 591 thread->state = SPDK_THREAD_STATE_EXITING; 592 return 0; 593 } 594 595 bool 596 spdk_thread_is_exited(struct spdk_thread *thread) 597 { 598 return thread->state == SPDK_THREAD_STATE_EXITED; 599 } 600 601 void 602 spdk_thread_destroy(struct spdk_thread *thread) 603 { 604 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 605 606 assert(thread->state == SPDK_THREAD_STATE_EXITED); 607 608 if (tls_thread == thread) { 609 tls_thread = NULL; 610 } 611 612 _free_thread(thread); 613 } 614 615 void * 616 spdk_thread_get_ctx(struct spdk_thread *thread) 617 { 618 if (g_ctx_sz > 0) { 619 return thread->ctx; 620 } 621 622 return NULL; 623 } 624 625 struct spdk_cpuset * 626 spdk_thread_get_cpumask(struct spdk_thread *thread) 627 { 628 return &thread->cpumask; 629 } 630 631 int 632 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 633 { 634 struct spdk_thread *thread; 635 636 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 637 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 638 assert(false); 639 return -ENOTSUP; 640 } 641 642 thread = spdk_get_thread(); 643 if (!thread) { 644 SPDK_ERRLOG("Called from non-SPDK thread\n"); 645 assert(false); 646 return -EINVAL; 647 } 648 649 spdk_cpuset_copy(&thread->cpumask, cpumask); 650 651 /* Invoke framework's reschedule operation. If this function is called multiple times 652 * in a single spdk_thread_poll() context, the last cpumask will be used in the 653 * reschedule operation. 654 */ 655 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 656 657 return 0; 658 } 659 660 struct spdk_thread * 661 spdk_thread_get_from_ctx(void *ctx) 662 { 663 if (ctx == NULL) { 664 assert(false); 665 return NULL; 666 } 667 668 assert(g_ctx_sz > 0); 669 670 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 671 } 672 673 static inline uint32_t 674 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 675 { 676 unsigned count, i; 677 void *messages[SPDK_MSG_BATCH_SIZE]; 678 uint64_t notify = 1; 679 int rc; 680 681 #ifdef DEBUG 682 /* 683 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 684 * so we will never actually read uninitialized data from events, but just to be sure 685 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 686 */ 687 memset(messages, 0, sizeof(messages)); 688 #endif 689 690 if (max_msgs > 0) { 691 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 692 } else { 693 max_msgs = SPDK_MSG_BATCH_SIZE; 694 } 695 696 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 697 if (spdk_unlikely(thread->in_interrupt) && 698 spdk_ring_count(thread->messages) != 0) { 699 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 700 if (rc < 0) { 701 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 702 } 703 } 704 if (count == 0) { 705 return 0; 706 } 707 708 for (i = 0; i < count; i++) { 709 struct spdk_msg *msg = messages[i]; 710 711 assert(msg != NULL); 712 msg->fn(msg->arg); 713 714 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 715 /* Insert the messages at the head. We want to re-use the hot 716 * ones. */ 717 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 718 thread->msg_cache_count++; 719 } else { 720 spdk_mempool_put(g_spdk_msg_mempool, msg); 721 } 722 } 723 724 return count; 725 } 726 727 static void 728 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 729 { 730 struct spdk_poller *tmp __attribute__((unused)); 731 732 poller->next_run_tick = now + poller->period_ticks; 733 734 /* 735 * Insert poller in the thread's timed_pollers tree by next scheduled run time 736 * as its key. 737 */ 738 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 739 assert(tmp == NULL); 740 741 /* Update the cache only if it is empty or the inserted poller is earlier than it. 742 * RB_MIN() is not necessary here because all pollers, which has exactly the same 743 * next_run_tick as the existing poller, are inserted on the right side. 744 */ 745 if (thread->first_timed_poller == NULL || 746 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 747 thread->first_timed_poller = poller; 748 } 749 } 750 751 static inline void 752 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 753 { 754 struct spdk_poller *tmp __attribute__((unused)); 755 756 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 757 assert(tmp != NULL); 758 759 /* This function is not used in any case that is performance critical. 760 * Update the cache simply by RB_MIN() if it needs to be changed. 761 */ 762 if (thread->first_timed_poller == poller) { 763 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 764 } 765 } 766 767 static void 768 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 769 { 770 if (poller->period_ticks) { 771 poller_insert_timer(thread, poller, spdk_get_ticks()); 772 } else { 773 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 774 } 775 } 776 777 static inline void 778 thread_update_stats(struct spdk_thread *thread, uint64_t end, 779 uint64_t start, int rc) 780 { 781 if (rc == 0) { 782 /* Poller status idle */ 783 thread->stats.idle_tsc += end - start; 784 } else if (rc > 0) { 785 /* Poller status busy */ 786 thread->stats.busy_tsc += end - start; 787 } 788 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 789 thread->tsc_last = end; 790 } 791 792 static inline int 793 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 794 { 795 int rc; 796 797 switch (poller->state) { 798 case SPDK_POLLER_STATE_UNREGISTERED: 799 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 800 free(poller); 801 return 0; 802 case SPDK_POLLER_STATE_PAUSING: 803 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 804 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 805 poller->state = SPDK_POLLER_STATE_PAUSED; 806 return 0; 807 case SPDK_POLLER_STATE_WAITING: 808 break; 809 default: 810 assert(false); 811 break; 812 } 813 814 poller->state = SPDK_POLLER_STATE_RUNNING; 815 rc = poller->fn(poller->arg); 816 817 poller->run_count++; 818 if (rc > 0) { 819 poller->busy_count++; 820 } 821 822 #ifdef DEBUG 823 if (rc == -1) { 824 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 825 } 826 #endif 827 828 switch (poller->state) { 829 case SPDK_POLLER_STATE_UNREGISTERED: 830 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 831 free(poller); 832 break; 833 case SPDK_POLLER_STATE_PAUSING: 834 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 835 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 836 poller->state = SPDK_POLLER_STATE_PAUSED; 837 break; 838 case SPDK_POLLER_STATE_PAUSED: 839 case SPDK_POLLER_STATE_WAITING: 840 break; 841 case SPDK_POLLER_STATE_RUNNING: 842 poller->state = SPDK_POLLER_STATE_WAITING; 843 break; 844 default: 845 assert(false); 846 break; 847 } 848 849 return rc; 850 } 851 852 static inline int 853 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 854 uint64_t now) 855 { 856 int rc; 857 858 switch (poller->state) { 859 case SPDK_POLLER_STATE_UNREGISTERED: 860 free(poller); 861 return 0; 862 case SPDK_POLLER_STATE_PAUSING: 863 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 864 poller->state = SPDK_POLLER_STATE_PAUSED; 865 return 0; 866 case SPDK_POLLER_STATE_WAITING: 867 break; 868 default: 869 assert(false); 870 break; 871 } 872 873 poller->state = SPDK_POLLER_STATE_RUNNING; 874 rc = poller->fn(poller->arg); 875 876 poller->run_count++; 877 if (rc > 0) { 878 poller->busy_count++; 879 } 880 881 #ifdef DEBUG 882 if (rc == -1) { 883 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 884 } 885 #endif 886 887 switch (poller->state) { 888 case SPDK_POLLER_STATE_UNREGISTERED: 889 free(poller); 890 break; 891 case SPDK_POLLER_STATE_PAUSING: 892 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 893 poller->state = SPDK_POLLER_STATE_PAUSED; 894 break; 895 case SPDK_POLLER_STATE_PAUSED: 896 break; 897 case SPDK_POLLER_STATE_RUNNING: 898 poller->state = SPDK_POLLER_STATE_WAITING; 899 /* fallthrough */ 900 case SPDK_POLLER_STATE_WAITING: 901 poller_insert_timer(thread, poller, now); 902 break; 903 default: 904 assert(false); 905 break; 906 } 907 908 return rc; 909 } 910 911 static int 912 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 913 { 914 uint32_t msg_count; 915 struct spdk_poller *poller, *tmp; 916 spdk_msg_fn critical_msg; 917 int rc = 0; 918 919 thread->tsc_last = now; 920 921 critical_msg = thread->critical_msg; 922 if (spdk_unlikely(critical_msg != NULL)) { 923 critical_msg(NULL); 924 thread->critical_msg = NULL; 925 rc = 1; 926 } 927 928 msg_count = msg_queue_run_batch(thread, max_msgs); 929 if (msg_count) { 930 rc = 1; 931 } 932 933 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 934 active_pollers_head, tailq, tmp) { 935 int poller_rc; 936 937 poller_rc = thread_execute_poller(thread, poller); 938 if (poller_rc > rc) { 939 rc = poller_rc; 940 } 941 } 942 943 poller = thread->first_timed_poller; 944 while (poller != NULL) { 945 int timer_rc = 0; 946 947 if (now < poller->next_run_tick) { 948 break; 949 } 950 951 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 952 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 953 954 /* Update the cache to the next timed poller in the list 955 * only if the current poller is still the closest, otherwise, 956 * do nothing because the cache has been already updated. 957 */ 958 if (thread->first_timed_poller == poller) { 959 thread->first_timed_poller = tmp; 960 } 961 962 timer_rc = thread_execute_timed_poller(thread, poller, now); 963 if (timer_rc > rc) { 964 rc = timer_rc; 965 } 966 967 poller = tmp; 968 } 969 970 return rc; 971 } 972 973 int 974 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 975 { 976 struct spdk_thread *orig_thread; 977 int rc; 978 uint64_t notify = 1; 979 980 orig_thread = _get_thread(); 981 tls_thread = thread; 982 983 if (now == 0) { 984 now = spdk_get_ticks(); 985 } 986 987 if (spdk_likely(!thread->in_interrupt)) { 988 rc = thread_poll(thread, max_msgs, now); 989 if (spdk_unlikely(thread->in_interrupt)) { 990 /* The thread transitioned to interrupt mode during the above poll. 991 * Poll it one more time in case that during the transition time 992 * there is msg received without notification. 993 */ 994 rc = thread_poll(thread, max_msgs, now); 995 } 996 } else { 997 /* Non-block wait on thread's fd_group */ 998 rc = spdk_fd_group_wait(thread->fgrp, 0); 999 if (spdk_unlikely(!thread->in_interrupt)) { 1000 /* The thread transitioned to poll mode in a msg during the above processing. 1001 * Clear msg_fd since thread messages will be polled directly in poll mode. 1002 */ 1003 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 1004 if (rc < 0 && errno != EAGAIN) { 1005 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 1006 } 1007 } 1008 1009 /* Reap unregistered pollers out of poller execution in intr mode */ 1010 if (spdk_unlikely(thread->poller_unregistered)) { 1011 struct spdk_poller *poller, *tmp; 1012 1013 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 1014 active_pollers_head, tailq, tmp) { 1015 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1016 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1017 free(poller); 1018 } 1019 } 1020 1021 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1022 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1023 poller_remove_timer(thread, poller); 1024 free(poller); 1025 } 1026 } 1027 1028 thread->poller_unregistered = false; 1029 } 1030 } 1031 1032 1033 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 1034 thread_exit(thread, now); 1035 } 1036 1037 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1038 1039 tls_thread = orig_thread; 1040 1041 return rc; 1042 } 1043 1044 uint64_t 1045 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1046 { 1047 struct spdk_poller *poller; 1048 1049 poller = thread->first_timed_poller; 1050 if (poller) { 1051 return poller->next_run_tick; 1052 } 1053 1054 return 0; 1055 } 1056 1057 int 1058 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1059 { 1060 return !TAILQ_EMPTY(&thread->active_pollers); 1061 } 1062 1063 static bool 1064 thread_has_unpaused_pollers(struct spdk_thread *thread) 1065 { 1066 if (TAILQ_EMPTY(&thread->active_pollers) && 1067 RB_EMPTY(&thread->timed_pollers)) { 1068 return false; 1069 } 1070 1071 return true; 1072 } 1073 1074 bool 1075 spdk_thread_has_pollers(struct spdk_thread *thread) 1076 { 1077 if (!thread_has_unpaused_pollers(thread) && 1078 TAILQ_EMPTY(&thread->paused_pollers)) { 1079 return false; 1080 } 1081 1082 return true; 1083 } 1084 1085 bool 1086 spdk_thread_is_idle(struct spdk_thread *thread) 1087 { 1088 if (spdk_ring_count(thread->messages) || 1089 thread_has_unpaused_pollers(thread) || 1090 thread->critical_msg != NULL) { 1091 return false; 1092 } 1093 1094 return true; 1095 } 1096 1097 uint32_t 1098 spdk_thread_get_count(void) 1099 { 1100 /* 1101 * Return cached value of the current thread count. We could acquire the 1102 * lock and iterate through the TAILQ of threads to count them, but that 1103 * count could still be invalidated after we release the lock. 1104 */ 1105 return g_thread_count; 1106 } 1107 1108 struct spdk_thread * 1109 spdk_get_thread(void) 1110 { 1111 return _get_thread(); 1112 } 1113 1114 const char * 1115 spdk_thread_get_name(const struct spdk_thread *thread) 1116 { 1117 return thread->name; 1118 } 1119 1120 uint64_t 1121 spdk_thread_get_id(const struct spdk_thread *thread) 1122 { 1123 return thread->id; 1124 } 1125 1126 struct spdk_thread * 1127 spdk_thread_get_by_id(uint64_t id) 1128 { 1129 struct spdk_thread *thread; 1130 1131 if (id == 0 || id >= g_thread_id) { 1132 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1133 return NULL; 1134 } 1135 pthread_mutex_lock(&g_devlist_mutex); 1136 TAILQ_FOREACH(thread, &g_threads, tailq) { 1137 if (thread->id == id) { 1138 break; 1139 } 1140 } 1141 pthread_mutex_unlock(&g_devlist_mutex); 1142 return thread; 1143 } 1144 1145 int 1146 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1147 { 1148 struct spdk_thread *thread; 1149 1150 thread = _get_thread(); 1151 if (!thread) { 1152 SPDK_ERRLOG("No thread allocated\n"); 1153 return -EINVAL; 1154 } 1155 1156 if (stats == NULL) { 1157 return -EINVAL; 1158 } 1159 1160 *stats = thread->stats; 1161 1162 return 0; 1163 } 1164 1165 uint64_t 1166 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1167 { 1168 if (thread == NULL) { 1169 thread = _get_thread(); 1170 } 1171 1172 return thread->tsc_last; 1173 } 1174 1175 static inline int 1176 thread_send_msg_notification(const struct spdk_thread *target_thread) 1177 { 1178 uint64_t notify = 1; 1179 int rc; 1180 1181 /* Not necessary to do notification if interrupt facility is not enabled */ 1182 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1183 return 0; 1184 } 1185 1186 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1187 * after sending thread msg, it is necessary to check whether target thread runs in 1188 * interrupt mode and then decide whether do event notification. 1189 */ 1190 if (spdk_unlikely(target_thread->in_interrupt)) { 1191 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1192 if (rc < 0) { 1193 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1194 return -EIO; 1195 } 1196 } 1197 1198 return 0; 1199 } 1200 1201 int 1202 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1203 { 1204 struct spdk_thread *local_thread; 1205 struct spdk_msg *msg; 1206 int rc; 1207 1208 assert(thread != NULL); 1209 1210 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1211 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1212 return -EIO; 1213 } 1214 1215 local_thread = _get_thread(); 1216 1217 msg = NULL; 1218 if (local_thread != NULL) { 1219 if (local_thread->msg_cache_count > 0) { 1220 msg = SLIST_FIRST(&local_thread->msg_cache); 1221 assert(msg != NULL); 1222 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1223 local_thread->msg_cache_count--; 1224 } 1225 } 1226 1227 if (msg == NULL) { 1228 msg = spdk_mempool_get(g_spdk_msg_mempool); 1229 if (!msg) { 1230 SPDK_ERRLOG("msg could not be allocated\n"); 1231 return -ENOMEM; 1232 } 1233 } 1234 1235 msg->fn = fn; 1236 msg->arg = ctx; 1237 1238 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1239 if (rc != 1) { 1240 SPDK_ERRLOG("msg could not be enqueued\n"); 1241 spdk_mempool_put(g_spdk_msg_mempool, msg); 1242 return -EIO; 1243 } 1244 1245 return thread_send_msg_notification(thread); 1246 } 1247 1248 int 1249 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1250 { 1251 spdk_msg_fn expected = NULL; 1252 1253 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1254 __ATOMIC_SEQ_CST)) { 1255 return -EIO; 1256 } 1257 1258 return thread_send_msg_notification(thread); 1259 } 1260 1261 #ifdef __linux__ 1262 static int 1263 interrupt_timerfd_process(void *arg) 1264 { 1265 struct spdk_poller *poller = arg; 1266 uint64_t exp; 1267 int rc; 1268 1269 /* clear the level of interval timer */ 1270 rc = read(poller->interruptfd, &exp, sizeof(exp)); 1271 if (rc < 0) { 1272 if (rc == -EAGAIN) { 1273 return 0; 1274 } 1275 1276 return rc; 1277 } 1278 1279 return poller->fn(poller->arg); 1280 } 1281 1282 static int 1283 period_poller_interrupt_init(struct spdk_poller *poller) 1284 { 1285 struct spdk_fd_group *fgrp = poller->thread->fgrp; 1286 int timerfd; 1287 int rc; 1288 1289 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1290 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1291 if (timerfd < 0) { 1292 return -errno; 1293 } 1294 1295 rc = SPDK_FD_GROUP_ADD(fgrp, timerfd, interrupt_timerfd_process, poller); 1296 if (rc < 0) { 1297 close(timerfd); 1298 return rc; 1299 } 1300 1301 poller->interruptfd = timerfd; 1302 return 0; 1303 } 1304 1305 static void 1306 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1307 { 1308 int timerfd = poller->interruptfd; 1309 uint64_t now_tick = spdk_get_ticks(); 1310 uint64_t ticks = spdk_get_ticks_hz(); 1311 int ret; 1312 struct itimerspec new_tv = {}; 1313 struct itimerspec old_tv = {}; 1314 1315 assert(poller->period_ticks != 0); 1316 assert(timerfd >= 0); 1317 1318 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1319 interrupt_mode ? "interrupt" : "poll"); 1320 1321 if (interrupt_mode) { 1322 /* Set repeated timer expiration */ 1323 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1324 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1325 1326 /* Update next timer expiration */ 1327 if (poller->next_run_tick == 0) { 1328 poller->next_run_tick = now_tick + poller->period_ticks; 1329 } else if (poller->next_run_tick < now_tick) { 1330 poller->next_run_tick = now_tick; 1331 } 1332 1333 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1334 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1335 1336 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1337 if (ret < 0) { 1338 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1339 assert(false); 1340 } 1341 } else { 1342 /* Disarm the timer */ 1343 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1344 if (ret < 0) { 1345 /* timerfd_settime's failure indicates that the timerfd is in error */ 1346 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1347 assert(false); 1348 } 1349 1350 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1351 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1352 */ 1353 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1354 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1355 poller_remove_timer(poller->thread, poller); 1356 poller_insert_timer(poller->thread, poller, now_tick); 1357 } 1358 } 1359 1360 static void 1361 poller_interrupt_fini(struct spdk_poller *poller) 1362 { 1363 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1364 assert(poller->interruptfd >= 0); 1365 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1366 close(poller->interruptfd); 1367 poller->interruptfd = -1; 1368 } 1369 1370 static int 1371 busy_poller_interrupt_init(struct spdk_poller *poller) 1372 { 1373 int busy_efd; 1374 int rc; 1375 1376 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1377 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1378 if (busy_efd < 0) { 1379 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1380 return -errno; 1381 } 1382 1383 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, 1384 poller->fn, poller->arg, poller->name); 1385 if (rc < 0) { 1386 close(busy_efd); 1387 return rc; 1388 } 1389 1390 poller->interruptfd = busy_efd; 1391 return 0; 1392 } 1393 1394 static void 1395 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1396 { 1397 int busy_efd = poller->interruptfd; 1398 uint64_t notify = 1; 1399 int rc __attribute__((unused)); 1400 1401 assert(busy_efd >= 0); 1402 1403 if (interrupt_mode) { 1404 /* Write without read on eventfd will get it repeatedly triggered. */ 1405 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1406 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1407 } 1408 } else { 1409 /* Read on eventfd will clear its level triggering. */ 1410 rc = read(busy_efd, ¬ify, sizeof(notify)); 1411 } 1412 } 1413 1414 #else 1415 1416 static int 1417 period_poller_interrupt_init(struct spdk_poller *poller) 1418 { 1419 return -ENOTSUP; 1420 } 1421 1422 static void 1423 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1424 { 1425 } 1426 1427 static void 1428 poller_interrupt_fini(struct spdk_poller *poller) 1429 { 1430 } 1431 1432 static int 1433 busy_poller_interrupt_init(struct spdk_poller *poller) 1434 { 1435 return -ENOTSUP; 1436 } 1437 1438 static void 1439 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1440 { 1441 } 1442 1443 #endif 1444 1445 void 1446 spdk_poller_register_interrupt(struct spdk_poller *poller, 1447 spdk_poller_set_interrupt_mode_cb cb_fn, 1448 void *cb_arg) 1449 { 1450 assert(poller != NULL); 1451 assert(cb_fn != NULL); 1452 assert(spdk_get_thread() == poller->thread); 1453 1454 if (!spdk_interrupt_mode_is_enabled()) { 1455 return; 1456 } 1457 1458 /* when a poller is created we don't know if the user is ever going to 1459 * enable interrupts on it by calling this function, so the poller 1460 * registration function has to immediately create a interruptfd. 1461 * When this function does get called by user, we have to then destroy 1462 * that interruptfd. 1463 */ 1464 if (poller->set_intr_cb_fn && poller->interruptfd >= 0) { 1465 poller_interrupt_fini(poller); 1466 } 1467 1468 poller->set_intr_cb_fn = cb_fn; 1469 poller->set_intr_cb_arg = cb_arg; 1470 1471 /* Set poller into interrupt mode if thread is in interrupt. */ 1472 if (poller->thread->in_interrupt) { 1473 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1474 } 1475 } 1476 1477 static uint64_t 1478 convert_us_to_ticks(uint64_t us) 1479 { 1480 uint64_t quotient, remainder, ticks; 1481 1482 if (us) { 1483 quotient = us / SPDK_SEC_TO_USEC; 1484 remainder = us % SPDK_SEC_TO_USEC; 1485 ticks = spdk_get_ticks_hz(); 1486 1487 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1488 } else { 1489 return 0; 1490 } 1491 } 1492 1493 static struct spdk_poller * 1494 poller_register(spdk_poller_fn fn, 1495 void *arg, 1496 uint64_t period_microseconds, 1497 const char *name) 1498 { 1499 struct spdk_thread *thread; 1500 struct spdk_poller *poller; 1501 1502 thread = spdk_get_thread(); 1503 if (!thread) { 1504 assert(false); 1505 return NULL; 1506 } 1507 1508 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1509 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1510 return NULL; 1511 } 1512 1513 poller = calloc(1, sizeof(*poller)); 1514 if (poller == NULL) { 1515 SPDK_ERRLOG("Poller memory allocation failed\n"); 1516 return NULL; 1517 } 1518 1519 if (name) { 1520 snprintf(poller->name, sizeof(poller->name), "%s", name); 1521 } else { 1522 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1523 } 1524 1525 poller->state = SPDK_POLLER_STATE_WAITING; 1526 poller->fn = fn; 1527 poller->arg = arg; 1528 poller->thread = thread; 1529 poller->interruptfd = -1; 1530 if (thread->next_poller_id == 0) { 1531 SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n"); 1532 thread->next_poller_id = 1; 1533 } 1534 poller->id = thread->next_poller_id++; 1535 1536 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1537 1538 if (spdk_interrupt_mode_is_enabled()) { 1539 int rc; 1540 1541 if (period_microseconds) { 1542 rc = period_poller_interrupt_init(poller); 1543 if (rc < 0) { 1544 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1545 free(poller); 1546 return NULL; 1547 } 1548 1549 spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL); 1550 } else { 1551 /* If the poller doesn't have a period, create interruptfd that's always 1552 * busy automatically when running in interrupt mode. 1553 */ 1554 rc = busy_poller_interrupt_init(poller); 1555 if (rc > 0) { 1556 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1557 free(poller); 1558 return NULL; 1559 } 1560 1561 spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL); 1562 } 1563 } 1564 1565 thread_insert_poller(thread, poller); 1566 1567 return poller; 1568 } 1569 1570 struct spdk_poller * 1571 spdk_poller_register(spdk_poller_fn fn, 1572 void *arg, 1573 uint64_t period_microseconds) 1574 { 1575 return poller_register(fn, arg, period_microseconds, NULL); 1576 } 1577 1578 struct spdk_poller * 1579 spdk_poller_register_named(spdk_poller_fn fn, 1580 void *arg, 1581 uint64_t period_microseconds, 1582 const char *name) 1583 { 1584 return poller_register(fn, arg, period_microseconds, name); 1585 } 1586 1587 static void 1588 wrong_thread(const char *func, const char *name, struct spdk_thread *thread, 1589 struct spdk_thread *curthread) 1590 { 1591 if (thread == NULL) { 1592 SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name); 1593 abort(); 1594 } 1595 SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be " 1596 "%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id, 1597 thread->name, thread->id); 1598 assert(false); 1599 } 1600 1601 void 1602 spdk_poller_unregister(struct spdk_poller **ppoller) 1603 { 1604 struct spdk_thread *thread; 1605 struct spdk_poller *poller; 1606 1607 poller = *ppoller; 1608 if (poller == NULL) { 1609 return; 1610 } 1611 1612 *ppoller = NULL; 1613 1614 thread = spdk_get_thread(); 1615 if (!thread) { 1616 assert(false); 1617 return; 1618 } 1619 1620 if (poller->thread != thread) { 1621 wrong_thread(__func__, poller->name, poller->thread, thread); 1622 return; 1623 } 1624 1625 if (spdk_interrupt_mode_is_enabled()) { 1626 /* Release the interrupt resource for period or busy poller */ 1627 if (poller->interruptfd >= 0) { 1628 poller_interrupt_fini(poller); 1629 } 1630 1631 /* Mark there is poller unregistered. Then unregistered pollers will 1632 * get reaped by spdk_thread_poll also in intr mode. 1633 */ 1634 thread->poller_unregistered = true; 1635 } 1636 1637 /* If the poller was paused, put it on the active_pollers list so that 1638 * its unregistration can be processed by spdk_thread_poll(). 1639 */ 1640 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1641 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1642 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1643 poller->period_ticks = 0; 1644 } 1645 1646 /* Simply set the state to unregistered. The poller will get cleaned up 1647 * in a subsequent call to spdk_thread_poll(). 1648 */ 1649 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1650 } 1651 1652 void 1653 spdk_poller_pause(struct spdk_poller *poller) 1654 { 1655 struct spdk_thread *thread; 1656 1657 thread = spdk_get_thread(); 1658 if (!thread) { 1659 assert(false); 1660 return; 1661 } 1662 1663 if (poller->thread != thread) { 1664 wrong_thread(__func__, poller->name, poller->thread, thread); 1665 return; 1666 } 1667 1668 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1669 * spdk_thread_poll() move it. It allows a poller to be paused from 1670 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1671 * iteration, or from within itself without breaking the logic to always 1672 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1673 */ 1674 switch (poller->state) { 1675 case SPDK_POLLER_STATE_PAUSED: 1676 case SPDK_POLLER_STATE_PAUSING: 1677 break; 1678 case SPDK_POLLER_STATE_RUNNING: 1679 case SPDK_POLLER_STATE_WAITING: 1680 poller->state = SPDK_POLLER_STATE_PAUSING; 1681 break; 1682 default: 1683 assert(false); 1684 break; 1685 } 1686 } 1687 1688 void 1689 spdk_poller_resume(struct spdk_poller *poller) 1690 { 1691 struct spdk_thread *thread; 1692 1693 thread = spdk_get_thread(); 1694 if (!thread) { 1695 assert(false); 1696 return; 1697 } 1698 1699 if (poller->thread != thread) { 1700 wrong_thread(__func__, poller->name, poller->thread, thread); 1701 return; 1702 } 1703 1704 /* If a poller is paused it has to be removed from the paused pollers 1705 * list and put on the active list or timer tree depending on its 1706 * period_ticks. If a poller is still in the process of being paused, 1707 * we just need to flip its state back to waiting, as it's already on 1708 * the appropriate list or tree. 1709 */ 1710 switch (poller->state) { 1711 case SPDK_POLLER_STATE_PAUSED: 1712 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1713 thread_insert_poller(thread, poller); 1714 /* fallthrough */ 1715 case SPDK_POLLER_STATE_PAUSING: 1716 poller->state = SPDK_POLLER_STATE_WAITING; 1717 break; 1718 case SPDK_POLLER_STATE_RUNNING: 1719 case SPDK_POLLER_STATE_WAITING: 1720 break; 1721 default: 1722 assert(false); 1723 break; 1724 } 1725 } 1726 1727 const char * 1728 spdk_poller_get_name(struct spdk_poller *poller) 1729 { 1730 return poller->name; 1731 } 1732 1733 uint64_t 1734 spdk_poller_get_id(struct spdk_poller *poller) 1735 { 1736 return poller->id; 1737 } 1738 1739 const char * 1740 spdk_poller_get_state_str(struct spdk_poller *poller) 1741 { 1742 switch (poller->state) { 1743 case SPDK_POLLER_STATE_WAITING: 1744 return "waiting"; 1745 case SPDK_POLLER_STATE_RUNNING: 1746 return "running"; 1747 case SPDK_POLLER_STATE_UNREGISTERED: 1748 return "unregistered"; 1749 case SPDK_POLLER_STATE_PAUSING: 1750 return "pausing"; 1751 case SPDK_POLLER_STATE_PAUSED: 1752 return "paused"; 1753 default: 1754 return NULL; 1755 } 1756 } 1757 1758 uint64_t 1759 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1760 { 1761 return poller->period_ticks; 1762 } 1763 1764 void 1765 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1766 { 1767 stats->run_count = poller->run_count; 1768 stats->busy_count = poller->busy_count; 1769 } 1770 1771 struct spdk_poller * 1772 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1773 { 1774 return TAILQ_FIRST(&thread->active_pollers); 1775 } 1776 1777 struct spdk_poller * 1778 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1779 { 1780 return TAILQ_NEXT(prev, tailq); 1781 } 1782 1783 struct spdk_poller * 1784 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1785 { 1786 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1787 } 1788 1789 struct spdk_poller * 1790 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1791 { 1792 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1793 } 1794 1795 struct spdk_poller * 1796 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1797 { 1798 return TAILQ_FIRST(&thread->paused_pollers); 1799 } 1800 1801 struct spdk_poller * 1802 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1803 { 1804 return TAILQ_NEXT(prev, tailq); 1805 } 1806 1807 struct spdk_io_channel * 1808 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1809 { 1810 return RB_MIN(io_channel_tree, &thread->io_channels); 1811 } 1812 1813 struct spdk_io_channel * 1814 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1815 { 1816 return RB_NEXT(io_channel_tree, &thread->io_channels, prev); 1817 } 1818 1819 struct call_thread { 1820 struct spdk_thread *cur_thread; 1821 spdk_msg_fn fn; 1822 void *ctx; 1823 1824 struct spdk_thread *orig_thread; 1825 spdk_msg_fn cpl; 1826 }; 1827 1828 static void 1829 _on_thread(void *ctx) 1830 { 1831 struct call_thread *ct = ctx; 1832 int rc __attribute__((unused)); 1833 1834 ct->fn(ct->ctx); 1835 1836 pthread_mutex_lock(&g_devlist_mutex); 1837 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1838 pthread_mutex_unlock(&g_devlist_mutex); 1839 1840 if (!ct->cur_thread) { 1841 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1842 1843 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1844 free(ctx); 1845 } else { 1846 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1847 ct->cur_thread->name); 1848 1849 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1850 } 1851 assert(rc == 0); 1852 } 1853 1854 void 1855 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1856 { 1857 struct call_thread *ct; 1858 struct spdk_thread *thread; 1859 int rc __attribute__((unused)); 1860 1861 ct = calloc(1, sizeof(*ct)); 1862 if (!ct) { 1863 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1864 cpl(ctx); 1865 return; 1866 } 1867 1868 ct->fn = fn; 1869 ct->ctx = ctx; 1870 ct->cpl = cpl; 1871 1872 thread = _get_thread(); 1873 if (!thread) { 1874 SPDK_ERRLOG("No thread allocated\n"); 1875 free(ct); 1876 cpl(ctx); 1877 return; 1878 } 1879 ct->orig_thread = thread; 1880 1881 pthread_mutex_lock(&g_devlist_mutex); 1882 ct->cur_thread = TAILQ_FIRST(&g_threads); 1883 pthread_mutex_unlock(&g_devlist_mutex); 1884 1885 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 1886 ct->orig_thread->name); 1887 1888 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 1889 assert(rc == 0); 1890 } 1891 1892 static inline void 1893 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 1894 { 1895 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1896 return; 1897 } 1898 1899 if (!poller->set_intr_cb_fn) { 1900 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 1901 assert(false); 1902 return; 1903 } 1904 1905 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 1906 } 1907 1908 void 1909 spdk_thread_set_interrupt_mode(bool enable_interrupt) 1910 { 1911 struct spdk_thread *thread = _get_thread(); 1912 struct spdk_poller *poller, *tmp; 1913 1914 assert(thread); 1915 assert(spdk_interrupt_mode_is_enabled()); 1916 1917 SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n", 1918 thread->name, enable_interrupt ? "intr" : "poll", 1919 thread->in_interrupt ? "intr" : "poll"); 1920 1921 if (thread->in_interrupt == enable_interrupt) { 1922 return; 1923 } 1924 1925 /* Set pollers to expected mode */ 1926 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1927 poller_set_interrupt_mode(poller, enable_interrupt); 1928 } 1929 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 1930 poller_set_interrupt_mode(poller, enable_interrupt); 1931 } 1932 /* All paused pollers will go to work in interrupt mode */ 1933 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 1934 poller_set_interrupt_mode(poller, enable_interrupt); 1935 } 1936 1937 thread->in_interrupt = enable_interrupt; 1938 return; 1939 } 1940 1941 static struct io_device * 1942 io_device_get(void *io_device) 1943 { 1944 struct io_device find = {}; 1945 1946 find.io_device = io_device; 1947 return RB_FIND(io_device_tree, &g_io_devices, &find); 1948 } 1949 1950 void 1951 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1952 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1953 const char *name) 1954 { 1955 struct io_device *dev, *tmp; 1956 struct spdk_thread *thread; 1957 1958 assert(io_device != NULL); 1959 assert(create_cb != NULL); 1960 assert(destroy_cb != NULL); 1961 1962 thread = spdk_get_thread(); 1963 if (!thread) { 1964 SPDK_ERRLOG("called from non-SPDK thread\n"); 1965 assert(false); 1966 return; 1967 } 1968 1969 dev = calloc(1, sizeof(struct io_device)); 1970 if (dev == NULL) { 1971 SPDK_ERRLOG("could not allocate io_device\n"); 1972 return; 1973 } 1974 1975 dev->io_device = io_device; 1976 if (name) { 1977 snprintf(dev->name, sizeof(dev->name), "%s", name); 1978 } else { 1979 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1980 } 1981 dev->create_cb = create_cb; 1982 dev->destroy_cb = destroy_cb; 1983 dev->unregister_cb = NULL; 1984 dev->ctx_size = ctx_size; 1985 dev->for_each_count = 0; 1986 dev->unregistered = false; 1987 dev->refcnt = 0; 1988 1989 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 1990 dev->name, dev->io_device, thread->name); 1991 1992 pthread_mutex_lock(&g_devlist_mutex); 1993 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 1994 if (tmp != NULL) { 1995 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1996 io_device, tmp->name, dev->name); 1997 free(dev); 1998 } 1999 2000 pthread_mutex_unlock(&g_devlist_mutex); 2001 } 2002 2003 static void 2004 _finish_unregister(void *arg) 2005 { 2006 struct io_device *dev = arg; 2007 struct spdk_thread *thread; 2008 2009 thread = spdk_get_thread(); 2010 assert(thread == dev->unregister_thread); 2011 2012 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 2013 dev->name, dev->io_device, thread->name); 2014 2015 assert(thread->pending_unregister_count > 0); 2016 thread->pending_unregister_count--; 2017 2018 dev->unregister_cb(dev->io_device); 2019 free(dev); 2020 } 2021 2022 static void 2023 io_device_free(struct io_device *dev) 2024 { 2025 int rc __attribute__((unused)); 2026 2027 if (dev->unregister_cb == NULL) { 2028 free(dev); 2029 } else { 2030 assert(dev->unregister_thread != NULL); 2031 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 2032 dev->name, dev->io_device, dev->unregister_thread->name); 2033 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 2034 assert(rc == 0); 2035 } 2036 } 2037 2038 void 2039 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 2040 { 2041 struct io_device *dev; 2042 uint32_t refcnt; 2043 struct spdk_thread *thread; 2044 2045 thread = spdk_get_thread(); 2046 if (!thread) { 2047 SPDK_ERRLOG("called from non-SPDK thread\n"); 2048 assert(false); 2049 return; 2050 } 2051 2052 pthread_mutex_lock(&g_devlist_mutex); 2053 dev = io_device_get(io_device); 2054 if (!dev) { 2055 SPDK_ERRLOG("io_device %p not found\n", io_device); 2056 assert(false); 2057 pthread_mutex_unlock(&g_devlist_mutex); 2058 return; 2059 } 2060 2061 if (dev->for_each_count > 0) { 2062 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 2063 dev->name, io_device, dev->for_each_count); 2064 pthread_mutex_unlock(&g_devlist_mutex); 2065 return; 2066 } 2067 2068 dev->unregister_cb = unregister_cb; 2069 dev->unregistered = true; 2070 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2071 refcnt = dev->refcnt; 2072 dev->unregister_thread = thread; 2073 pthread_mutex_unlock(&g_devlist_mutex); 2074 2075 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2076 dev->name, dev->io_device, thread->name); 2077 2078 if (unregister_cb) { 2079 thread->pending_unregister_count++; 2080 } 2081 2082 if (refcnt > 0) { 2083 /* defer deletion */ 2084 return; 2085 } 2086 2087 io_device_free(dev); 2088 } 2089 2090 const char * 2091 spdk_io_device_get_name(struct io_device *dev) 2092 { 2093 return dev->name; 2094 } 2095 2096 static struct spdk_io_channel * 2097 thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev) 2098 { 2099 struct spdk_io_channel find = {}; 2100 2101 find.dev = dev; 2102 return RB_FIND(io_channel_tree, &thread->io_channels, &find); 2103 } 2104 2105 struct spdk_io_channel * 2106 spdk_get_io_channel(void *io_device) 2107 { 2108 struct spdk_io_channel *ch; 2109 struct spdk_thread *thread; 2110 struct io_device *dev; 2111 int rc; 2112 2113 pthread_mutex_lock(&g_devlist_mutex); 2114 dev = io_device_get(io_device); 2115 if (dev == NULL) { 2116 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2117 pthread_mutex_unlock(&g_devlist_mutex); 2118 return NULL; 2119 } 2120 2121 thread = _get_thread(); 2122 if (!thread) { 2123 SPDK_ERRLOG("No thread allocated\n"); 2124 pthread_mutex_unlock(&g_devlist_mutex); 2125 return NULL; 2126 } 2127 2128 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2129 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2130 pthread_mutex_unlock(&g_devlist_mutex); 2131 return NULL; 2132 } 2133 2134 ch = thread_get_io_channel(thread, dev); 2135 if (ch != NULL) { 2136 ch->ref++; 2137 2138 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2139 ch, dev->name, dev->io_device, thread->name, ch->ref); 2140 2141 /* 2142 * An I/O channel already exists for this device on this 2143 * thread, so return it. 2144 */ 2145 pthread_mutex_unlock(&g_devlist_mutex); 2146 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2147 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2148 return ch; 2149 } 2150 2151 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2152 if (ch == NULL) { 2153 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2154 pthread_mutex_unlock(&g_devlist_mutex); 2155 return NULL; 2156 } 2157 2158 ch->dev = dev; 2159 ch->destroy_cb = dev->destroy_cb; 2160 ch->thread = thread; 2161 ch->ref = 1; 2162 ch->destroy_ref = 0; 2163 RB_INSERT(io_channel_tree, &thread->io_channels, ch); 2164 2165 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2166 ch, dev->name, dev->io_device, thread->name, ch->ref); 2167 2168 dev->refcnt++; 2169 2170 pthread_mutex_unlock(&g_devlist_mutex); 2171 2172 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2173 if (rc != 0) { 2174 pthread_mutex_lock(&g_devlist_mutex); 2175 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2176 dev->refcnt--; 2177 free(ch); 2178 pthread_mutex_unlock(&g_devlist_mutex); 2179 return NULL; 2180 } 2181 2182 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2183 return ch; 2184 } 2185 2186 static void 2187 put_io_channel(void *arg) 2188 { 2189 struct spdk_io_channel *ch = arg; 2190 bool do_remove_dev = true; 2191 struct spdk_thread *thread; 2192 2193 thread = spdk_get_thread(); 2194 if (!thread) { 2195 SPDK_ERRLOG("called from non-SPDK thread\n"); 2196 assert(false); 2197 return; 2198 } 2199 2200 SPDK_DEBUGLOG(thread, 2201 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2202 ch, ch->dev->name, ch->dev->io_device, thread->name); 2203 2204 assert(ch->thread == thread); 2205 2206 ch->destroy_ref--; 2207 2208 if (ch->ref > 0 || ch->destroy_ref > 0) { 2209 /* 2210 * Another reference to the associated io_device was requested 2211 * after this message was sent but before it had a chance to 2212 * execute. 2213 */ 2214 return; 2215 } 2216 2217 pthread_mutex_lock(&g_devlist_mutex); 2218 RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch); 2219 pthread_mutex_unlock(&g_devlist_mutex); 2220 2221 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2222 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2223 2224 pthread_mutex_lock(&g_devlist_mutex); 2225 ch->dev->refcnt--; 2226 2227 if (!ch->dev->unregistered) { 2228 do_remove_dev = false; 2229 } 2230 2231 if (ch->dev->refcnt > 0) { 2232 do_remove_dev = false; 2233 } 2234 2235 pthread_mutex_unlock(&g_devlist_mutex); 2236 2237 if (do_remove_dev) { 2238 io_device_free(ch->dev); 2239 } 2240 free(ch); 2241 } 2242 2243 void 2244 spdk_put_io_channel(struct spdk_io_channel *ch) 2245 { 2246 struct spdk_thread *thread; 2247 int rc __attribute__((unused)); 2248 2249 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2250 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2251 2252 thread = spdk_get_thread(); 2253 if (!thread) { 2254 SPDK_ERRLOG("called from non-SPDK thread\n"); 2255 assert(false); 2256 return; 2257 } 2258 2259 if (ch->thread != thread) { 2260 wrong_thread(__func__, "ch", ch->thread, thread); 2261 return; 2262 } 2263 2264 SPDK_DEBUGLOG(thread, 2265 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2266 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2267 2268 ch->ref--; 2269 2270 if (ch->ref == 0) { 2271 ch->destroy_ref++; 2272 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2273 assert(rc == 0); 2274 } 2275 } 2276 2277 struct spdk_io_channel * 2278 spdk_io_channel_from_ctx(void *ctx) 2279 { 2280 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2281 } 2282 2283 struct spdk_thread * 2284 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2285 { 2286 return ch->thread; 2287 } 2288 2289 void * 2290 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2291 { 2292 return ch->dev->io_device; 2293 } 2294 2295 const char * 2296 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2297 { 2298 return spdk_io_device_get_name(ch->dev); 2299 } 2300 2301 int 2302 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2303 { 2304 return ch->ref; 2305 } 2306 2307 struct spdk_io_channel_iter { 2308 void *io_device; 2309 struct io_device *dev; 2310 spdk_channel_msg fn; 2311 int status; 2312 void *ctx; 2313 struct spdk_io_channel *ch; 2314 2315 struct spdk_thread *cur_thread; 2316 2317 struct spdk_thread *orig_thread; 2318 spdk_channel_for_each_cpl cpl; 2319 }; 2320 2321 void * 2322 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2323 { 2324 return i->io_device; 2325 } 2326 2327 struct spdk_io_channel * 2328 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2329 { 2330 return i->ch; 2331 } 2332 2333 void * 2334 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2335 { 2336 return i->ctx; 2337 } 2338 2339 static void 2340 _call_completion(void *ctx) 2341 { 2342 struct spdk_io_channel_iter *i = ctx; 2343 2344 if (i->cpl != NULL) { 2345 i->cpl(i, i->status); 2346 } 2347 free(i); 2348 } 2349 2350 static void 2351 _call_channel(void *ctx) 2352 { 2353 struct spdk_io_channel_iter *i = ctx; 2354 struct spdk_io_channel *ch; 2355 2356 /* 2357 * It is possible that the channel was deleted before this 2358 * message had a chance to execute. If so, skip calling 2359 * the fn() on this thread. 2360 */ 2361 pthread_mutex_lock(&g_devlist_mutex); 2362 ch = thread_get_io_channel(i->cur_thread, i->dev); 2363 pthread_mutex_unlock(&g_devlist_mutex); 2364 2365 if (ch) { 2366 i->fn(i); 2367 } else { 2368 spdk_for_each_channel_continue(i, 0); 2369 } 2370 } 2371 2372 void 2373 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2374 spdk_channel_for_each_cpl cpl) 2375 { 2376 struct spdk_thread *thread; 2377 struct spdk_io_channel *ch; 2378 struct spdk_io_channel_iter *i; 2379 int rc __attribute__((unused)); 2380 2381 i = calloc(1, sizeof(*i)); 2382 if (!i) { 2383 SPDK_ERRLOG("Unable to allocate iterator\n"); 2384 return; 2385 } 2386 2387 i->io_device = io_device; 2388 i->fn = fn; 2389 i->ctx = ctx; 2390 i->cpl = cpl; 2391 i->orig_thread = _get_thread(); 2392 2393 pthread_mutex_lock(&g_devlist_mutex); 2394 i->dev = io_device_get(io_device); 2395 if (i->dev == NULL) { 2396 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2397 assert(false); 2398 goto end; 2399 } 2400 2401 TAILQ_FOREACH(thread, &g_threads, tailq) { 2402 ch = thread_get_io_channel(thread, i->dev); 2403 if (ch != NULL) { 2404 ch->dev->for_each_count++; 2405 i->cur_thread = thread; 2406 i->ch = ch; 2407 pthread_mutex_unlock(&g_devlist_mutex); 2408 rc = spdk_thread_send_msg(thread, _call_channel, i); 2409 assert(rc == 0); 2410 return; 2411 } 2412 } 2413 2414 end: 2415 pthread_mutex_unlock(&g_devlist_mutex); 2416 2417 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2418 assert(rc == 0); 2419 } 2420 2421 void 2422 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2423 { 2424 struct spdk_thread *thread; 2425 struct spdk_io_channel *ch; 2426 int rc __attribute__((unused)); 2427 2428 assert(i->cur_thread == spdk_get_thread()); 2429 2430 i->status = status; 2431 2432 pthread_mutex_lock(&g_devlist_mutex); 2433 if (status) { 2434 goto end; 2435 } 2436 thread = TAILQ_NEXT(i->cur_thread, tailq); 2437 while (thread) { 2438 ch = thread_get_io_channel(thread, i->dev); 2439 if (ch != NULL) { 2440 i->cur_thread = thread; 2441 i->ch = ch; 2442 pthread_mutex_unlock(&g_devlist_mutex); 2443 rc = spdk_thread_send_msg(thread, _call_channel, i); 2444 assert(rc == 0); 2445 return; 2446 } 2447 thread = TAILQ_NEXT(thread, tailq); 2448 } 2449 2450 end: 2451 i->dev->for_each_count--; 2452 i->ch = NULL; 2453 pthread_mutex_unlock(&g_devlist_mutex); 2454 2455 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2456 assert(rc == 0); 2457 } 2458 2459 struct spdk_interrupt { 2460 int efd; 2461 struct spdk_thread *thread; 2462 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 2463 }; 2464 2465 static void 2466 thread_interrupt_destroy(struct spdk_thread *thread) 2467 { 2468 struct spdk_fd_group *fgrp = thread->fgrp; 2469 2470 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2471 2472 if (thread->msg_fd < 0) { 2473 return; 2474 } 2475 2476 spdk_fd_group_remove(fgrp, thread->msg_fd); 2477 close(thread->msg_fd); 2478 thread->msg_fd = -1; 2479 2480 spdk_fd_group_destroy(fgrp); 2481 thread->fgrp = NULL; 2482 } 2483 2484 #ifdef __linux__ 2485 static int 2486 thread_interrupt_msg_process(void *arg) 2487 { 2488 struct spdk_thread *thread = arg; 2489 uint32_t msg_count; 2490 spdk_msg_fn critical_msg; 2491 int rc = 0; 2492 uint64_t notify = 1; 2493 2494 assert(spdk_interrupt_mode_is_enabled()); 2495 2496 /* There may be race between msg_acknowledge and another producer's msg_notify, 2497 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2498 * This can avoid msg notification missing. 2499 */ 2500 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2501 if (rc < 0 && errno != EAGAIN) { 2502 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2503 } 2504 2505 critical_msg = thread->critical_msg; 2506 if (spdk_unlikely(critical_msg != NULL)) { 2507 critical_msg(NULL); 2508 thread->critical_msg = NULL; 2509 rc = 1; 2510 } 2511 2512 msg_count = msg_queue_run_batch(thread, 0); 2513 if (msg_count) { 2514 rc = 1; 2515 } 2516 2517 return rc; 2518 } 2519 2520 static int 2521 thread_interrupt_create(struct spdk_thread *thread) 2522 { 2523 int rc; 2524 2525 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2526 2527 rc = spdk_fd_group_create(&thread->fgrp); 2528 if (rc) { 2529 return rc; 2530 } 2531 2532 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2533 if (thread->msg_fd < 0) { 2534 rc = -errno; 2535 spdk_fd_group_destroy(thread->fgrp); 2536 thread->fgrp = NULL; 2537 2538 return rc; 2539 } 2540 2541 return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd, 2542 thread_interrupt_msg_process, thread); 2543 } 2544 #else 2545 static int 2546 thread_interrupt_create(struct spdk_thread *thread) 2547 { 2548 return -ENOTSUP; 2549 } 2550 #endif 2551 2552 struct spdk_interrupt * 2553 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2554 void *arg, const char *name) 2555 { 2556 struct spdk_thread *thread; 2557 struct spdk_interrupt *intr; 2558 int ret; 2559 2560 thread = spdk_get_thread(); 2561 if (!thread) { 2562 assert(false); 2563 return NULL; 2564 } 2565 2566 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2567 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2568 return NULL; 2569 } 2570 2571 ret = spdk_fd_group_add(thread->fgrp, efd, fn, arg, name); 2572 2573 if (ret != 0) { 2574 SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n", 2575 thread->name, efd, spdk_strerror(-ret)); 2576 return NULL; 2577 } 2578 2579 intr = calloc(1, sizeof(*intr)); 2580 if (intr == NULL) { 2581 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2582 return NULL; 2583 } 2584 2585 if (name) { 2586 snprintf(intr->name, sizeof(intr->name), "%s", name); 2587 } else { 2588 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2589 } 2590 2591 intr->efd = efd; 2592 intr->thread = thread; 2593 2594 return intr; 2595 } 2596 2597 void 2598 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2599 { 2600 struct spdk_thread *thread; 2601 struct spdk_interrupt *intr; 2602 2603 intr = *pintr; 2604 if (intr == NULL) { 2605 return; 2606 } 2607 2608 *pintr = NULL; 2609 2610 thread = spdk_get_thread(); 2611 if (!thread) { 2612 assert(false); 2613 return; 2614 } 2615 2616 if (intr->thread != thread) { 2617 wrong_thread(__func__, intr->name, intr->thread, thread); 2618 return; 2619 } 2620 2621 spdk_fd_group_remove(thread->fgrp, intr->efd); 2622 free(intr); 2623 } 2624 2625 int 2626 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2627 enum spdk_interrupt_event_types event_types) 2628 { 2629 struct spdk_thread *thread; 2630 2631 thread = spdk_get_thread(); 2632 if (!thread) { 2633 assert(false); 2634 return -EINVAL; 2635 } 2636 2637 if (intr->thread != thread) { 2638 wrong_thread(__func__, intr->name, intr->thread, thread); 2639 return -EINVAL; 2640 } 2641 2642 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2643 } 2644 2645 int 2646 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2647 { 2648 return spdk_fd_group_get_fd(thread->fgrp); 2649 } 2650 2651 static bool g_interrupt_mode = false; 2652 2653 int 2654 spdk_interrupt_mode_enable(void) 2655 { 2656 /* It must be called once prior to initializing the threading library. 2657 * g_spdk_msg_mempool will be valid if thread library is initialized. 2658 */ 2659 if (g_spdk_msg_mempool) { 2660 SPDK_ERRLOG("Failed due to threading library is already initialized.\n"); 2661 return -1; 2662 } 2663 2664 #ifdef __linux__ 2665 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2666 g_interrupt_mode = true; 2667 return 0; 2668 #else 2669 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2670 g_interrupt_mode = false; 2671 return -ENOTSUP; 2672 #endif 2673 } 2674 2675 bool 2676 spdk_interrupt_mode_is_enabled(void) 2677 { 2678 return g_interrupt_mode; 2679 } 2680 2681 SPDK_LOG_REGISTER_COMPONENT(thread) 2682