1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/likely.h" 38 #include "spdk/queue.h" 39 #include "spdk/string.h" 40 #include "spdk/thread.h" 41 #include "spdk/trace.h" 42 #include "spdk/tree.h" 43 #include "spdk/util.h" 44 #include "spdk/fd_group.h" 45 46 #include "spdk/log.h" 47 #include "spdk_internal/thread.h" 48 #include "thread_internal.h" 49 50 #ifdef __linux__ 51 #include <sys/timerfd.h> 52 #include <sys/eventfd.h> 53 #endif 54 55 #define SPDK_MSG_BATCH_SIZE 8 56 #define SPDK_MAX_DEVICE_NAME_LEN 256 57 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 58 #define SPDK_MAX_POLLER_NAME_LEN 256 59 #define SPDK_MAX_THREAD_NAME_LEN 256 60 61 enum spdk_poller_state { 62 /* The poller is registered with a thread but not currently executing its fn. */ 63 SPDK_POLLER_STATE_WAITING, 64 65 /* The poller is currently running its fn. */ 66 SPDK_POLLER_STATE_RUNNING, 67 68 /* The poller was unregistered during the execution of its fn. */ 69 SPDK_POLLER_STATE_UNREGISTERED, 70 71 /* The poller is in the process of being paused. It will be paused 72 * during the next time it's supposed to be executed. 73 */ 74 SPDK_POLLER_STATE_PAUSING, 75 76 /* The poller is registered but currently paused. It's on the 77 * paused_pollers list. 78 */ 79 SPDK_POLLER_STATE_PAUSED, 80 }; 81 82 struct spdk_poller { 83 TAILQ_ENTRY(spdk_poller) tailq; 84 RB_ENTRY(spdk_poller) node; 85 86 /* Current state of the poller; should only be accessed from the poller's thread. */ 87 enum spdk_poller_state state; 88 89 uint64_t period_ticks; 90 uint64_t next_run_tick; 91 uint64_t run_count; 92 uint64_t busy_count; 93 spdk_poller_fn fn; 94 void *arg; 95 struct spdk_thread *thread; 96 int interruptfd; 97 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 98 void *set_intr_cb_arg; 99 100 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 101 }; 102 103 enum spdk_thread_state { 104 /* The thread is pocessing poller and message by spdk_thread_poll(). */ 105 SPDK_THREAD_STATE_RUNNING, 106 107 /* The thread is in the process of termination. It reaps unregistering 108 * poller are releasing I/O channel. 109 */ 110 SPDK_THREAD_STATE_EXITING, 111 112 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 113 SPDK_THREAD_STATE_EXITED, 114 }; 115 116 struct spdk_thread { 117 uint64_t tsc_last; 118 struct spdk_thread_stats stats; 119 /* 120 * Contains pollers actively running on this thread. Pollers 121 * are run round-robin. The thread takes one poller from the head 122 * of the ring, executes it, then puts it back at the tail of 123 * the ring. 124 */ 125 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 126 /** 127 * Contains pollers running on this thread with a periodic timer. 128 */ 129 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 130 struct spdk_poller *first_timed_poller; 131 /* 132 * Contains paused pollers. Pollers on this queue are waiting until 133 * they are resumed (in which case they're put onto the active/timer 134 * queues) or unregistered. 135 */ 136 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 137 struct spdk_ring *messages; 138 int msg_fd; 139 SLIST_HEAD(, spdk_msg) msg_cache; 140 size_t msg_cache_count; 141 spdk_msg_fn critical_msg; 142 uint64_t id; 143 enum spdk_thread_state state; 144 int pending_unregister_count; 145 146 TAILQ_HEAD(, spdk_io_channel) io_channels; 147 TAILQ_ENTRY(spdk_thread) tailq; 148 149 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 150 struct spdk_cpuset cpumask; 151 uint64_t exit_timeout_tsc; 152 153 /* Indicates whether this spdk_thread currently runs in interrupt. */ 154 bool in_interrupt; 155 struct spdk_fd_group *fgrp; 156 157 /* User context allocated at the end */ 158 uint8_t ctx[0]; 159 }; 160 161 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 162 163 static spdk_new_thread_fn g_new_thread_fn = NULL; 164 static spdk_thread_op_fn g_thread_op_fn = NULL; 165 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 166 static size_t g_ctx_sz = 0; 167 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 168 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 169 * SPDK application is required. 170 */ 171 static uint64_t g_thread_id = 1; 172 173 struct io_device { 174 void *io_device; 175 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 176 spdk_io_channel_create_cb create_cb; 177 spdk_io_channel_destroy_cb destroy_cb; 178 spdk_io_device_unregister_cb unregister_cb; 179 struct spdk_thread *unregister_thread; 180 uint32_t ctx_size; 181 uint32_t for_each_count; 182 RB_ENTRY(io_device) node; 183 184 uint32_t refcnt; 185 186 bool unregistered; 187 }; 188 189 static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); 190 191 static int 192 io_device_cmp(struct io_device *dev1, struct io_device *dev2) 193 { 194 return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device); 195 } 196 197 RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp); 198 199 struct spdk_msg { 200 spdk_msg_fn fn; 201 void *arg; 202 203 SLIST_ENTRY(spdk_msg) link; 204 }; 205 206 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 207 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 208 209 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 210 static uint32_t g_thread_count = 0; 211 212 static __thread struct spdk_thread *tls_thread = NULL; 213 214 #define TRACE_GROUP_THREAD 0xa 215 #define TRACE_THREAD_IOCH_GET SPDK_TPOINT_ID(TRACE_GROUP_THREAD, 0x0) 216 #define TRACE_THREAD_IOCH_PUT SPDK_TPOINT_ID(TRACE_GROUP_THREAD, 0x1) 217 218 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 219 { 220 spdk_trace_register_description("THREAD_IOCH_GET", 221 TRACE_THREAD_IOCH_GET, 222 OWNER_NONE, OBJECT_NONE, 0, 223 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 224 spdk_trace_register_description("THREAD_IOCH_PUT", 225 TRACE_THREAD_IOCH_PUT, 226 OWNER_NONE, OBJECT_NONE, 0, 227 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 228 } 229 230 /* 231 * If this compare function returns zero when two next_run_ticks are equal, 232 * the macro RB_INSERT() returns a pointer to the element with the same 233 * next_run_tick. 234 * 235 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 236 * to remove as a parameter. 237 * 238 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 239 * side by returning 1 when two next_run_ticks are equal. 240 */ 241 static inline int 242 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 243 { 244 if (poller1->next_run_tick < poller2->next_run_tick) { 245 return -1; 246 } else { 247 return 1; 248 } 249 } 250 251 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 252 253 static inline struct spdk_thread * 254 _get_thread(void) 255 { 256 return tls_thread; 257 } 258 259 static int 260 _thread_lib_init(size_t ctx_sz) 261 { 262 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 263 264 g_ctx_sz = ctx_sz; 265 266 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 267 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 268 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 269 sizeof(struct spdk_msg), 270 0, /* No cache. We do our own. */ 271 SPDK_ENV_SOCKET_ID_ANY); 272 273 if (!g_spdk_msg_mempool) { 274 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 275 return -1; 276 } 277 278 return 0; 279 } 280 281 int 282 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 283 { 284 assert(g_new_thread_fn == NULL); 285 assert(g_thread_op_fn == NULL); 286 287 if (new_thread_fn == NULL) { 288 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 289 } else { 290 g_new_thread_fn = new_thread_fn; 291 } 292 293 return _thread_lib_init(ctx_sz); 294 } 295 296 int 297 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 298 spdk_thread_op_supported_fn thread_op_supported_fn, 299 size_t ctx_sz) 300 { 301 assert(g_new_thread_fn == NULL); 302 assert(g_thread_op_fn == NULL); 303 assert(g_thread_op_supported_fn == NULL); 304 305 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 306 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 307 return -EINVAL; 308 } 309 310 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 311 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 312 } else { 313 g_thread_op_fn = thread_op_fn; 314 g_thread_op_supported_fn = thread_op_supported_fn; 315 } 316 317 return _thread_lib_init(ctx_sz); 318 } 319 320 void 321 spdk_thread_lib_fini(void) 322 { 323 struct io_device *dev; 324 325 RB_FOREACH(dev, io_device_tree, &g_io_devices) { 326 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 327 } 328 329 if (g_spdk_msg_mempool) { 330 spdk_mempool_free(g_spdk_msg_mempool); 331 g_spdk_msg_mempool = NULL; 332 } 333 334 g_new_thread_fn = NULL; 335 g_thread_op_fn = NULL; 336 g_thread_op_supported_fn = NULL; 337 g_ctx_sz = 0; 338 } 339 340 static void thread_interrupt_destroy(struct spdk_thread *thread); 341 static int thread_interrupt_create(struct spdk_thread *thread); 342 343 static void 344 _free_thread(struct spdk_thread *thread) 345 { 346 struct spdk_io_channel *ch; 347 struct spdk_msg *msg; 348 struct spdk_poller *poller, *ptmp; 349 350 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 351 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 352 thread->name, ch->dev->name); 353 } 354 355 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 356 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 357 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 358 poller->name); 359 } 360 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 361 free(poller); 362 } 363 364 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 365 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 366 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 367 poller->name); 368 } 369 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 370 free(poller); 371 } 372 373 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 374 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 375 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 376 free(poller); 377 } 378 379 pthread_mutex_lock(&g_devlist_mutex); 380 assert(g_thread_count > 0); 381 g_thread_count--; 382 TAILQ_REMOVE(&g_threads, thread, tailq); 383 pthread_mutex_unlock(&g_devlist_mutex); 384 385 msg = SLIST_FIRST(&thread->msg_cache); 386 while (msg != NULL) { 387 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 388 389 assert(thread->msg_cache_count > 0); 390 thread->msg_cache_count--; 391 spdk_mempool_put(g_spdk_msg_mempool, msg); 392 393 msg = SLIST_FIRST(&thread->msg_cache); 394 } 395 396 assert(thread->msg_cache_count == 0); 397 398 if (spdk_interrupt_mode_is_enabled()) { 399 thread_interrupt_destroy(thread); 400 } 401 402 spdk_ring_free(thread->messages); 403 free(thread); 404 } 405 406 struct spdk_thread * 407 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 408 { 409 struct spdk_thread *thread; 410 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 411 int rc = 0, i; 412 413 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 414 if (!thread) { 415 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 416 return NULL; 417 } 418 419 if (cpumask) { 420 spdk_cpuset_copy(&thread->cpumask, cpumask); 421 } else { 422 spdk_cpuset_negate(&thread->cpumask); 423 } 424 425 TAILQ_INIT(&thread->io_channels); 426 TAILQ_INIT(&thread->active_pollers); 427 RB_INIT(&thread->timed_pollers); 428 TAILQ_INIT(&thread->paused_pollers); 429 SLIST_INIT(&thread->msg_cache); 430 thread->msg_cache_count = 0; 431 432 thread->tsc_last = spdk_get_ticks(); 433 434 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 435 if (!thread->messages) { 436 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 437 free(thread); 438 return NULL; 439 } 440 441 /* Fill the local message pool cache. */ 442 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 443 if (rc == 0) { 444 /* If we can't populate the cache it's ok. The cache will get filled 445 * up organically as messages are passed to the thread. */ 446 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 447 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 448 thread->msg_cache_count++; 449 } 450 } 451 452 if (name) { 453 snprintf(thread->name, sizeof(thread->name), "%s", name); 454 } else { 455 snprintf(thread->name, sizeof(thread->name), "%p", thread); 456 } 457 458 pthread_mutex_lock(&g_devlist_mutex); 459 if (g_thread_id == 0) { 460 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 461 pthread_mutex_unlock(&g_devlist_mutex); 462 _free_thread(thread); 463 return NULL; 464 } 465 thread->id = g_thread_id++; 466 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 467 g_thread_count++; 468 pthread_mutex_unlock(&g_devlist_mutex); 469 470 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 471 thread->id, thread->name); 472 473 if (spdk_interrupt_mode_is_enabled()) { 474 thread->in_interrupt = true; 475 rc = thread_interrupt_create(thread); 476 if (rc != 0) { 477 _free_thread(thread); 478 return NULL; 479 } 480 } 481 482 if (g_new_thread_fn) { 483 rc = g_new_thread_fn(thread); 484 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 485 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 486 } 487 488 if (rc != 0) { 489 _free_thread(thread); 490 return NULL; 491 } 492 493 thread->state = SPDK_THREAD_STATE_RUNNING; 494 495 return thread; 496 } 497 498 void 499 spdk_set_thread(struct spdk_thread *thread) 500 { 501 tls_thread = thread; 502 } 503 504 static void 505 thread_exit(struct spdk_thread *thread, uint64_t now) 506 { 507 struct spdk_poller *poller; 508 struct spdk_io_channel *ch; 509 510 if (now >= thread->exit_timeout_tsc) { 511 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 512 thread->name); 513 goto exited; 514 } 515 516 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 517 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 518 SPDK_INFOLOG(thread, 519 "thread %s still has active poller %s\n", 520 thread->name, poller->name); 521 return; 522 } 523 } 524 525 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 526 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 527 SPDK_INFOLOG(thread, 528 "thread %s still has active timed poller %s\n", 529 thread->name, poller->name); 530 return; 531 } 532 } 533 534 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 535 SPDK_INFOLOG(thread, 536 "thread %s still has paused poller %s\n", 537 thread->name, poller->name); 538 return; 539 } 540 541 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 542 SPDK_INFOLOG(thread, 543 "thread %s still has channel for io_device %s\n", 544 thread->name, ch->dev->name); 545 return; 546 } 547 548 if (thread->pending_unregister_count > 0) { 549 SPDK_INFOLOG(thread, 550 "thread %s is still unregistering io_devices\n", 551 thread->name); 552 return; 553 } 554 555 exited: 556 thread->state = SPDK_THREAD_STATE_EXITED; 557 } 558 559 int 560 spdk_thread_exit(struct spdk_thread *thread) 561 { 562 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 563 564 assert(tls_thread == thread); 565 566 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 567 SPDK_INFOLOG(thread, 568 "thread %s is already exiting\n", 569 thread->name); 570 return 0; 571 } 572 573 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 574 SPDK_THREAD_EXIT_TIMEOUT_SEC); 575 thread->state = SPDK_THREAD_STATE_EXITING; 576 return 0; 577 } 578 579 bool 580 spdk_thread_is_exited(struct spdk_thread *thread) 581 { 582 return thread->state == SPDK_THREAD_STATE_EXITED; 583 } 584 585 void 586 spdk_thread_destroy(struct spdk_thread *thread) 587 { 588 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 589 590 assert(thread->state == SPDK_THREAD_STATE_EXITED); 591 592 if (tls_thread == thread) { 593 tls_thread = NULL; 594 } 595 596 _free_thread(thread); 597 } 598 599 void * 600 spdk_thread_get_ctx(struct spdk_thread *thread) 601 { 602 if (g_ctx_sz > 0) { 603 return thread->ctx; 604 } 605 606 return NULL; 607 } 608 609 struct spdk_cpuset * 610 spdk_thread_get_cpumask(struct spdk_thread *thread) 611 { 612 return &thread->cpumask; 613 } 614 615 int 616 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 617 { 618 struct spdk_thread *thread; 619 620 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 621 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 622 assert(false); 623 return -ENOTSUP; 624 } 625 626 thread = spdk_get_thread(); 627 if (!thread) { 628 SPDK_ERRLOG("Called from non-SPDK thread\n"); 629 assert(false); 630 return -EINVAL; 631 } 632 633 spdk_cpuset_copy(&thread->cpumask, cpumask); 634 635 /* Invoke framework's reschedule operation. If this function is called multiple times 636 * in a single spdk_thread_poll() context, the last cpumask will be used in the 637 * reschedule operation. 638 */ 639 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 640 641 return 0; 642 } 643 644 struct spdk_thread * 645 spdk_thread_get_from_ctx(void *ctx) 646 { 647 if (ctx == NULL) { 648 assert(false); 649 return NULL; 650 } 651 652 assert(g_ctx_sz > 0); 653 654 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 655 } 656 657 static inline uint32_t 658 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 659 { 660 unsigned count, i; 661 void *messages[SPDK_MSG_BATCH_SIZE]; 662 uint64_t notify = 1; 663 int rc; 664 665 #ifdef DEBUG 666 /* 667 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 668 * so we will never actually read uninitialized data from events, but just to be sure 669 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 670 */ 671 memset(messages, 0, sizeof(messages)); 672 #endif 673 674 if (max_msgs > 0) { 675 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 676 } else { 677 max_msgs = SPDK_MSG_BATCH_SIZE; 678 } 679 680 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 681 if (spdk_unlikely(thread->in_interrupt) && 682 spdk_ring_count(thread->messages) != 0) { 683 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 684 if (rc < 0) { 685 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 686 } 687 } 688 if (count == 0) { 689 return 0; 690 } 691 692 for (i = 0; i < count; i++) { 693 struct spdk_msg *msg = messages[i]; 694 695 assert(msg != NULL); 696 msg->fn(msg->arg); 697 698 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 699 /* Insert the messages at the head. We want to re-use the hot 700 * ones. */ 701 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 702 thread->msg_cache_count++; 703 } else { 704 spdk_mempool_put(g_spdk_msg_mempool, msg); 705 } 706 } 707 708 return count; 709 } 710 711 static void 712 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 713 { 714 struct spdk_poller *tmp __attribute__((unused)); 715 716 poller->next_run_tick = now + poller->period_ticks; 717 718 /* 719 * Insert poller in the thread's timed_pollers tree by next scheduled run time 720 * as its key. 721 */ 722 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 723 assert(tmp == NULL); 724 725 /* Update the cache only if it is empty or the inserted poller is earlier than it. 726 * RB_MIN() is not necessary here because all pollers, which has exactly the same 727 * next_run_tick as the existing poller, are inserted on the right side. 728 */ 729 if (thread->first_timed_poller == NULL || 730 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 731 thread->first_timed_poller = poller; 732 } 733 } 734 735 #ifdef __linux__ 736 static inline void 737 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 738 { 739 struct spdk_poller *tmp __attribute__((unused)); 740 741 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 742 assert(tmp != NULL); 743 744 /* This function is not used in any case that is performance critical. 745 * Update the cache simply by RB_MIN() if it needs to be changed. 746 */ 747 if (thread->first_timed_poller == poller) { 748 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 749 } 750 } 751 #endif 752 753 static void 754 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 755 { 756 if (poller->period_ticks) { 757 poller_insert_timer(thread, poller, spdk_get_ticks()); 758 } else { 759 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 760 } 761 } 762 763 static inline void 764 thread_update_stats(struct spdk_thread *thread, uint64_t end, 765 uint64_t start, int rc) 766 { 767 if (rc == 0) { 768 /* Poller status idle */ 769 thread->stats.idle_tsc += end - start; 770 } else if (rc > 0) { 771 /* Poller status busy */ 772 thread->stats.busy_tsc += end - start; 773 } 774 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 775 thread->tsc_last = end; 776 } 777 778 static inline int 779 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 780 { 781 int rc; 782 783 switch (poller->state) { 784 case SPDK_POLLER_STATE_UNREGISTERED: 785 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 786 free(poller); 787 return 0; 788 case SPDK_POLLER_STATE_PAUSING: 789 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 790 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 791 poller->state = SPDK_POLLER_STATE_PAUSED; 792 return 0; 793 case SPDK_POLLER_STATE_WAITING: 794 break; 795 default: 796 assert(false); 797 break; 798 } 799 800 poller->state = SPDK_POLLER_STATE_RUNNING; 801 rc = poller->fn(poller->arg); 802 803 poller->run_count++; 804 if (rc > 0) { 805 poller->busy_count++; 806 } 807 808 #ifdef DEBUG 809 if (rc == -1) { 810 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 811 } 812 #endif 813 814 switch (poller->state) { 815 case SPDK_POLLER_STATE_UNREGISTERED: 816 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 817 free(poller); 818 break; 819 case SPDK_POLLER_STATE_PAUSING: 820 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 821 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 822 poller->state = SPDK_POLLER_STATE_PAUSED; 823 break; 824 case SPDK_POLLER_STATE_PAUSED: 825 case SPDK_POLLER_STATE_WAITING: 826 break; 827 case SPDK_POLLER_STATE_RUNNING: 828 poller->state = SPDK_POLLER_STATE_WAITING; 829 break; 830 default: 831 assert(false); 832 break; 833 } 834 835 return rc; 836 } 837 838 static inline int 839 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 840 uint64_t now) 841 { 842 int rc; 843 844 switch (poller->state) { 845 case SPDK_POLLER_STATE_UNREGISTERED: 846 free(poller); 847 return 0; 848 case SPDK_POLLER_STATE_PAUSING: 849 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 850 poller->state = SPDK_POLLER_STATE_PAUSED; 851 return 0; 852 case SPDK_POLLER_STATE_WAITING: 853 break; 854 default: 855 assert(false); 856 break; 857 } 858 859 poller->state = SPDK_POLLER_STATE_RUNNING; 860 rc = poller->fn(poller->arg); 861 862 poller->run_count++; 863 if (rc > 0) { 864 poller->busy_count++; 865 } 866 867 #ifdef DEBUG 868 if (rc == -1) { 869 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 870 } 871 #endif 872 873 switch (poller->state) { 874 case SPDK_POLLER_STATE_UNREGISTERED: 875 free(poller); 876 break; 877 case SPDK_POLLER_STATE_PAUSING: 878 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 879 poller->state = SPDK_POLLER_STATE_PAUSED; 880 break; 881 case SPDK_POLLER_STATE_PAUSED: 882 break; 883 case SPDK_POLLER_STATE_RUNNING: 884 poller->state = SPDK_POLLER_STATE_WAITING; 885 /* fallthrough */ 886 case SPDK_POLLER_STATE_WAITING: 887 poller_insert_timer(thread, poller, now); 888 break; 889 default: 890 assert(false); 891 break; 892 } 893 894 return rc; 895 } 896 897 static int 898 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 899 { 900 uint32_t msg_count; 901 struct spdk_poller *poller, *tmp; 902 spdk_msg_fn critical_msg; 903 int rc = 0; 904 905 thread->tsc_last = now; 906 907 critical_msg = thread->critical_msg; 908 if (spdk_unlikely(critical_msg != NULL)) { 909 critical_msg(NULL); 910 thread->critical_msg = NULL; 911 rc = 1; 912 } 913 914 msg_count = msg_queue_run_batch(thread, max_msgs); 915 if (msg_count) { 916 rc = 1; 917 } 918 919 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 920 active_pollers_head, tailq, tmp) { 921 int poller_rc; 922 923 poller_rc = thread_execute_poller(thread, poller); 924 if (poller_rc > rc) { 925 rc = poller_rc; 926 } 927 } 928 929 poller = thread->first_timed_poller; 930 while (poller != NULL) { 931 int timer_rc = 0; 932 933 if (now < poller->next_run_tick) { 934 break; 935 } 936 937 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 938 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 939 940 /* Update the cache to the next timed poller in the list 941 * only if the current poller is still the closest, otherwise, 942 * do nothing because the cache has been already updated. 943 */ 944 if (thread->first_timed_poller == poller) { 945 thread->first_timed_poller = tmp; 946 } 947 948 timer_rc = thread_execute_timed_poller(thread, poller, now); 949 if (timer_rc > rc) { 950 rc = timer_rc; 951 } 952 953 poller = tmp; 954 } 955 956 return rc; 957 } 958 959 int 960 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 961 { 962 struct spdk_thread *orig_thread; 963 int rc; 964 uint64_t notify = 1; 965 966 orig_thread = _get_thread(); 967 tls_thread = thread; 968 969 if (now == 0) { 970 now = spdk_get_ticks(); 971 } 972 973 if (spdk_likely(!thread->in_interrupt)) { 974 rc = thread_poll(thread, max_msgs, now); 975 if (spdk_unlikely(thread->in_interrupt)) { 976 /* The thread transitioned to interrupt mode during the above poll. 977 * Poll it one more time in case that during the transition time 978 * there is msg received without notification. 979 */ 980 rc = thread_poll(thread, max_msgs, now); 981 } 982 } else { 983 /* Non-block wait on thread's fd_group */ 984 rc = spdk_fd_group_wait(thread->fgrp, 0); 985 if (spdk_unlikely(!thread->in_interrupt)) { 986 /* The thread transitioned to poll mode in a msg during the above processing. 987 * Clear msg_fd since thread messages will be polled directly in poll mode. 988 */ 989 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 990 if (rc < 0 && errno != EAGAIN) { 991 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 992 } 993 } 994 } 995 996 997 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 998 thread_exit(thread, now); 999 } 1000 1001 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1002 1003 tls_thread = orig_thread; 1004 1005 return rc; 1006 } 1007 1008 uint64_t 1009 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1010 { 1011 struct spdk_poller *poller; 1012 1013 poller = thread->first_timed_poller; 1014 if (poller) { 1015 return poller->next_run_tick; 1016 } 1017 1018 return 0; 1019 } 1020 1021 int 1022 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1023 { 1024 return !TAILQ_EMPTY(&thread->active_pollers); 1025 } 1026 1027 static bool 1028 thread_has_unpaused_pollers(struct spdk_thread *thread) 1029 { 1030 if (TAILQ_EMPTY(&thread->active_pollers) && 1031 RB_EMPTY(&thread->timed_pollers)) { 1032 return false; 1033 } 1034 1035 return true; 1036 } 1037 1038 bool 1039 spdk_thread_has_pollers(struct spdk_thread *thread) 1040 { 1041 if (!thread_has_unpaused_pollers(thread) && 1042 TAILQ_EMPTY(&thread->paused_pollers)) { 1043 return false; 1044 } 1045 1046 return true; 1047 } 1048 1049 bool 1050 spdk_thread_is_idle(struct spdk_thread *thread) 1051 { 1052 if (spdk_ring_count(thread->messages) || 1053 thread_has_unpaused_pollers(thread) || 1054 thread->critical_msg != NULL) { 1055 return false; 1056 } 1057 1058 return true; 1059 } 1060 1061 uint32_t 1062 spdk_thread_get_count(void) 1063 { 1064 /* 1065 * Return cached value of the current thread count. We could acquire the 1066 * lock and iterate through the TAILQ of threads to count them, but that 1067 * count could still be invalidated after we release the lock. 1068 */ 1069 return g_thread_count; 1070 } 1071 1072 struct spdk_thread * 1073 spdk_get_thread(void) 1074 { 1075 return _get_thread(); 1076 } 1077 1078 const char * 1079 spdk_thread_get_name(const struct spdk_thread *thread) 1080 { 1081 return thread->name; 1082 } 1083 1084 uint64_t 1085 spdk_thread_get_id(const struct spdk_thread *thread) 1086 { 1087 return thread->id; 1088 } 1089 1090 struct spdk_thread * 1091 spdk_thread_get_by_id(uint64_t id) 1092 { 1093 struct spdk_thread *thread; 1094 1095 if (id == 0 || id >= g_thread_id) { 1096 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1097 return NULL; 1098 } 1099 pthread_mutex_lock(&g_devlist_mutex); 1100 TAILQ_FOREACH(thread, &g_threads, tailq) { 1101 if (thread->id == id) { 1102 break; 1103 } 1104 } 1105 pthread_mutex_unlock(&g_devlist_mutex); 1106 return thread; 1107 } 1108 1109 int 1110 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1111 { 1112 struct spdk_thread *thread; 1113 1114 thread = _get_thread(); 1115 if (!thread) { 1116 SPDK_ERRLOG("No thread allocated\n"); 1117 return -EINVAL; 1118 } 1119 1120 if (stats == NULL) { 1121 return -EINVAL; 1122 } 1123 1124 *stats = thread->stats; 1125 1126 return 0; 1127 } 1128 1129 uint64_t 1130 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1131 { 1132 if (thread == NULL) { 1133 thread = _get_thread(); 1134 } 1135 1136 return thread->tsc_last; 1137 } 1138 1139 static inline int 1140 thread_send_msg_notification(const struct spdk_thread *target_thread) 1141 { 1142 uint64_t notify = 1; 1143 int rc; 1144 1145 /* Not necessary to do notification if interrupt facility is not enabled */ 1146 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1147 return 0; 1148 } 1149 1150 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1151 * after sending thread msg, it is necessary to check whether target thread runs in 1152 * interrupt mode and then decide whether do event notification. 1153 */ 1154 if (spdk_unlikely(target_thread->in_interrupt)) { 1155 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1156 if (rc < 0) { 1157 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1158 return -EIO; 1159 } 1160 } 1161 1162 return 0; 1163 } 1164 1165 int 1166 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1167 { 1168 struct spdk_thread *local_thread; 1169 struct spdk_msg *msg; 1170 int rc; 1171 1172 assert(thread != NULL); 1173 1174 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1175 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1176 return -EIO; 1177 } 1178 1179 local_thread = _get_thread(); 1180 1181 msg = NULL; 1182 if (local_thread != NULL) { 1183 if (local_thread->msg_cache_count > 0) { 1184 msg = SLIST_FIRST(&local_thread->msg_cache); 1185 assert(msg != NULL); 1186 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1187 local_thread->msg_cache_count--; 1188 } 1189 } 1190 1191 if (msg == NULL) { 1192 msg = spdk_mempool_get(g_spdk_msg_mempool); 1193 if (!msg) { 1194 SPDK_ERRLOG("msg could not be allocated\n"); 1195 return -ENOMEM; 1196 } 1197 } 1198 1199 msg->fn = fn; 1200 msg->arg = ctx; 1201 1202 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1203 if (rc != 1) { 1204 SPDK_ERRLOG("msg could not be enqueued\n"); 1205 spdk_mempool_put(g_spdk_msg_mempool, msg); 1206 return -EIO; 1207 } 1208 1209 return thread_send_msg_notification(thread); 1210 } 1211 1212 int 1213 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1214 { 1215 spdk_msg_fn expected = NULL; 1216 1217 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1218 __ATOMIC_SEQ_CST)) { 1219 return -EIO; 1220 } 1221 1222 return thread_send_msg_notification(thread); 1223 } 1224 1225 #ifdef __linux__ 1226 static int 1227 interrupt_timerfd_process(void *arg) 1228 { 1229 struct spdk_poller *poller = arg; 1230 uint64_t exp; 1231 int rc; 1232 1233 /* clear the level of interval timer */ 1234 rc = read(poller->interruptfd, &exp, sizeof(exp)); 1235 if (rc < 0) { 1236 if (rc == -EAGAIN) { 1237 return 0; 1238 } 1239 1240 return rc; 1241 } 1242 1243 return poller->fn(poller->arg); 1244 } 1245 1246 static int 1247 period_poller_interrupt_init(struct spdk_poller *poller) 1248 { 1249 struct spdk_fd_group *fgrp = poller->thread->fgrp; 1250 int timerfd; 1251 int rc; 1252 1253 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1254 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1255 if (timerfd < 0) { 1256 return -errno; 1257 } 1258 1259 rc = spdk_fd_group_add(fgrp, timerfd, 1260 interrupt_timerfd_process, poller); 1261 if (rc < 0) { 1262 close(timerfd); 1263 return rc; 1264 } 1265 1266 poller->interruptfd = timerfd; 1267 return 0; 1268 } 1269 1270 static void 1271 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1272 { 1273 int timerfd = poller->interruptfd; 1274 uint64_t now_tick = spdk_get_ticks(); 1275 uint64_t ticks = spdk_get_ticks_hz(); 1276 int ret; 1277 struct itimerspec new_tv = {}; 1278 struct itimerspec old_tv = {}; 1279 1280 assert(poller->period_ticks != 0); 1281 assert(timerfd >= 0); 1282 1283 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1284 interrupt_mode ? "interrupt" : "poll"); 1285 1286 if (interrupt_mode) { 1287 /* Set repeated timer expiration */ 1288 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1289 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1290 1291 /* Update next timer expiration */ 1292 if (poller->next_run_tick == 0) { 1293 poller->next_run_tick = now_tick + poller->period_ticks; 1294 } else if (poller->next_run_tick < now_tick) { 1295 poller->next_run_tick = now_tick; 1296 } 1297 1298 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1299 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1300 1301 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1302 if (ret < 0) { 1303 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1304 assert(false); 1305 } 1306 } else { 1307 /* Disarm the timer */ 1308 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1309 if (ret < 0) { 1310 /* timerfd_settime's failure indicates that the timerfd is in error */ 1311 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1312 assert(false); 1313 } 1314 1315 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1316 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1317 */ 1318 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1319 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1320 poller_remove_timer(poller->thread, poller); 1321 poller_insert_timer(poller->thread, poller, now_tick); 1322 } 1323 } 1324 1325 static void 1326 poller_interrupt_fini(struct spdk_poller *poller) 1327 { 1328 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1329 assert(poller->interruptfd >= 0); 1330 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1331 close(poller->interruptfd); 1332 poller->interruptfd = -1; 1333 } 1334 1335 static int 1336 busy_poller_interrupt_init(struct spdk_poller *poller) 1337 { 1338 int busy_efd; 1339 int rc; 1340 1341 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1342 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1343 if (busy_efd < 0) { 1344 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1345 return -errno; 1346 } 1347 1348 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, poller->fn, poller->arg); 1349 if (rc < 0) { 1350 close(busy_efd); 1351 return rc; 1352 } 1353 1354 poller->interruptfd = busy_efd; 1355 return 0; 1356 } 1357 1358 static void 1359 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1360 { 1361 int busy_efd = poller->interruptfd; 1362 uint64_t notify = 1; 1363 int rc __attribute__((unused)); 1364 1365 assert(busy_efd >= 0); 1366 1367 if (interrupt_mode) { 1368 /* Write without read on eventfd will get it repeatedly triggered. */ 1369 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1370 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1371 } 1372 } else { 1373 /* Read on eventfd will clear its level triggering. */ 1374 rc = read(busy_efd, ¬ify, sizeof(notify)); 1375 } 1376 } 1377 1378 #else 1379 1380 static int 1381 period_poller_interrupt_init(struct spdk_poller *poller) 1382 { 1383 return -ENOTSUP; 1384 } 1385 1386 static void 1387 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1388 { 1389 } 1390 1391 static void 1392 poller_interrupt_fini(struct spdk_poller *poller) 1393 { 1394 } 1395 1396 static int 1397 busy_poller_interrupt_init(struct spdk_poller *poller) 1398 { 1399 return -ENOTSUP; 1400 } 1401 1402 static void 1403 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1404 { 1405 } 1406 1407 #endif 1408 1409 void 1410 spdk_poller_register_interrupt(struct spdk_poller *poller, 1411 spdk_poller_set_interrupt_mode_cb cb_fn, 1412 void *cb_arg) 1413 { 1414 assert(poller != NULL); 1415 assert(cb_fn != NULL); 1416 assert(spdk_get_thread() == poller->thread); 1417 1418 if (!spdk_interrupt_mode_is_enabled()) { 1419 return; 1420 } 1421 1422 /* when a poller is created we don't know if the user is ever going to 1423 * enable interrupts on it by calling this function, so the poller 1424 * registration function has to immediately create a interruptfd. 1425 * When this function does get called by user, we have to then destroy 1426 * that interruptfd. 1427 */ 1428 if (poller->set_intr_cb_fn && poller->interruptfd >= 0) { 1429 poller_interrupt_fini(poller); 1430 } 1431 1432 poller->set_intr_cb_fn = cb_fn; 1433 poller->set_intr_cb_arg = cb_arg; 1434 1435 /* Set poller into interrupt mode if thread is in interrupt. */ 1436 if (poller->thread->in_interrupt) { 1437 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1438 } 1439 } 1440 1441 static uint64_t 1442 convert_us_to_ticks(uint64_t us) 1443 { 1444 uint64_t quotient, remainder, ticks; 1445 1446 if (us) { 1447 quotient = us / SPDK_SEC_TO_USEC; 1448 remainder = us % SPDK_SEC_TO_USEC; 1449 ticks = spdk_get_ticks_hz(); 1450 1451 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1452 } else { 1453 return 0; 1454 } 1455 } 1456 1457 static struct spdk_poller * 1458 poller_register(spdk_poller_fn fn, 1459 void *arg, 1460 uint64_t period_microseconds, 1461 const char *name) 1462 { 1463 struct spdk_thread *thread; 1464 struct spdk_poller *poller; 1465 1466 thread = spdk_get_thread(); 1467 if (!thread) { 1468 assert(false); 1469 return NULL; 1470 } 1471 1472 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1473 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1474 return NULL; 1475 } 1476 1477 poller = calloc(1, sizeof(*poller)); 1478 if (poller == NULL) { 1479 SPDK_ERRLOG("Poller memory allocation failed\n"); 1480 return NULL; 1481 } 1482 1483 if (name) { 1484 snprintf(poller->name, sizeof(poller->name), "%s", name); 1485 } else { 1486 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1487 } 1488 1489 poller->state = SPDK_POLLER_STATE_WAITING; 1490 poller->fn = fn; 1491 poller->arg = arg; 1492 poller->thread = thread; 1493 poller->interruptfd = -1; 1494 1495 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1496 1497 if (spdk_interrupt_mode_is_enabled()) { 1498 int rc; 1499 1500 if (period_microseconds) { 1501 rc = period_poller_interrupt_init(poller); 1502 if (rc < 0) { 1503 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1504 free(poller); 1505 return NULL; 1506 } 1507 1508 spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL); 1509 } else { 1510 /* If the poller doesn't have a period, create interruptfd that's always 1511 * busy automatically when runnning in interrupt mode. 1512 */ 1513 rc = busy_poller_interrupt_init(poller); 1514 if (rc > 0) { 1515 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1516 free(poller); 1517 return NULL; 1518 } 1519 1520 spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL); 1521 } 1522 } 1523 1524 thread_insert_poller(thread, poller); 1525 1526 return poller; 1527 } 1528 1529 struct spdk_poller * 1530 spdk_poller_register(spdk_poller_fn fn, 1531 void *arg, 1532 uint64_t period_microseconds) 1533 { 1534 return poller_register(fn, arg, period_microseconds, NULL); 1535 } 1536 1537 struct spdk_poller * 1538 spdk_poller_register_named(spdk_poller_fn fn, 1539 void *arg, 1540 uint64_t period_microseconds, 1541 const char *name) 1542 { 1543 return poller_register(fn, arg, period_microseconds, name); 1544 } 1545 1546 void 1547 spdk_poller_unregister(struct spdk_poller **ppoller) 1548 { 1549 struct spdk_thread *thread; 1550 struct spdk_poller *poller; 1551 1552 poller = *ppoller; 1553 if (poller == NULL) { 1554 return; 1555 } 1556 1557 *ppoller = NULL; 1558 1559 thread = spdk_get_thread(); 1560 if (!thread) { 1561 assert(false); 1562 return; 1563 } 1564 1565 if (poller->thread != thread) { 1566 SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n"); 1567 assert(false); 1568 return; 1569 } 1570 1571 if (spdk_interrupt_mode_is_enabled() && poller->interruptfd >= 0) { 1572 poller_interrupt_fini(poller); 1573 } 1574 1575 /* If the poller was paused, put it on the active_pollers list so that 1576 * its unregistration can be processed by spdk_thread_poll(). 1577 */ 1578 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1579 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1580 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1581 poller->period_ticks = 0; 1582 } 1583 1584 /* Simply set the state to unregistered. The poller will get cleaned up 1585 * in a subsequent call to spdk_thread_poll(). 1586 */ 1587 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1588 } 1589 1590 void 1591 spdk_poller_pause(struct spdk_poller *poller) 1592 { 1593 struct spdk_thread *thread; 1594 1595 thread = spdk_get_thread(); 1596 if (!thread) { 1597 assert(false); 1598 return; 1599 } 1600 1601 if (poller->thread != thread) { 1602 SPDK_ERRLOG("different from the thread that called spdk_poller_pause()\n"); 1603 assert(false); 1604 return; 1605 } 1606 1607 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1608 * spdk_thread_poll() move it. It allows a poller to be paused from 1609 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1610 * iteration, or from within itself without breaking the logic to always 1611 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1612 */ 1613 switch (poller->state) { 1614 case SPDK_POLLER_STATE_PAUSED: 1615 case SPDK_POLLER_STATE_PAUSING: 1616 break; 1617 case SPDK_POLLER_STATE_RUNNING: 1618 case SPDK_POLLER_STATE_WAITING: 1619 poller->state = SPDK_POLLER_STATE_PAUSING; 1620 break; 1621 default: 1622 assert(false); 1623 break; 1624 } 1625 } 1626 1627 void 1628 spdk_poller_resume(struct spdk_poller *poller) 1629 { 1630 struct spdk_thread *thread; 1631 1632 thread = spdk_get_thread(); 1633 if (!thread) { 1634 assert(false); 1635 return; 1636 } 1637 1638 if (poller->thread != thread) { 1639 SPDK_ERRLOG("different from the thread that called spdk_poller_resume()\n"); 1640 assert(false); 1641 return; 1642 } 1643 1644 /* If a poller is paused it has to be removed from the paused pollers 1645 * list and put on the active list or timer tree depending on its 1646 * period_ticks. If a poller is still in the process of being paused, 1647 * we just need to flip its state back to waiting, as it's already on 1648 * the appropriate list or tree. 1649 */ 1650 switch (poller->state) { 1651 case SPDK_POLLER_STATE_PAUSED: 1652 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1653 thread_insert_poller(thread, poller); 1654 /* fallthrough */ 1655 case SPDK_POLLER_STATE_PAUSING: 1656 poller->state = SPDK_POLLER_STATE_WAITING; 1657 break; 1658 case SPDK_POLLER_STATE_RUNNING: 1659 case SPDK_POLLER_STATE_WAITING: 1660 break; 1661 default: 1662 assert(false); 1663 break; 1664 } 1665 } 1666 1667 const char * 1668 spdk_poller_get_name(struct spdk_poller *poller) 1669 { 1670 return poller->name; 1671 } 1672 1673 const char * 1674 spdk_poller_get_state_str(struct spdk_poller *poller) 1675 { 1676 switch (poller->state) { 1677 case SPDK_POLLER_STATE_WAITING: 1678 return "waiting"; 1679 case SPDK_POLLER_STATE_RUNNING: 1680 return "running"; 1681 case SPDK_POLLER_STATE_UNREGISTERED: 1682 return "unregistered"; 1683 case SPDK_POLLER_STATE_PAUSING: 1684 return "pausing"; 1685 case SPDK_POLLER_STATE_PAUSED: 1686 return "paused"; 1687 default: 1688 return NULL; 1689 } 1690 } 1691 1692 uint64_t 1693 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1694 { 1695 return poller->period_ticks; 1696 } 1697 1698 void 1699 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1700 { 1701 stats->run_count = poller->run_count; 1702 stats->busy_count = poller->busy_count; 1703 } 1704 1705 struct spdk_poller * 1706 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1707 { 1708 return TAILQ_FIRST(&thread->active_pollers); 1709 } 1710 1711 struct spdk_poller * 1712 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1713 { 1714 return TAILQ_NEXT(prev, tailq); 1715 } 1716 1717 struct spdk_poller * 1718 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1719 { 1720 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1721 } 1722 1723 struct spdk_poller * 1724 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1725 { 1726 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1727 } 1728 1729 struct spdk_poller * 1730 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1731 { 1732 return TAILQ_FIRST(&thread->paused_pollers); 1733 } 1734 1735 struct spdk_poller * 1736 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1737 { 1738 return TAILQ_NEXT(prev, tailq); 1739 } 1740 1741 struct spdk_io_channel * 1742 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1743 { 1744 return TAILQ_FIRST(&thread->io_channels); 1745 } 1746 1747 struct spdk_io_channel * 1748 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1749 { 1750 return TAILQ_NEXT(prev, tailq); 1751 } 1752 1753 struct call_thread { 1754 struct spdk_thread *cur_thread; 1755 spdk_msg_fn fn; 1756 void *ctx; 1757 1758 struct spdk_thread *orig_thread; 1759 spdk_msg_fn cpl; 1760 }; 1761 1762 static void 1763 _on_thread(void *ctx) 1764 { 1765 struct call_thread *ct = ctx; 1766 int rc __attribute__((unused)); 1767 1768 ct->fn(ct->ctx); 1769 1770 pthread_mutex_lock(&g_devlist_mutex); 1771 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1772 pthread_mutex_unlock(&g_devlist_mutex); 1773 1774 if (!ct->cur_thread) { 1775 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1776 1777 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1778 free(ctx); 1779 } else { 1780 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1781 ct->cur_thread->name); 1782 1783 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1784 } 1785 assert(rc == 0); 1786 } 1787 1788 void 1789 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1790 { 1791 struct call_thread *ct; 1792 struct spdk_thread *thread; 1793 int rc __attribute__((unused)); 1794 1795 ct = calloc(1, sizeof(*ct)); 1796 if (!ct) { 1797 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1798 cpl(ctx); 1799 return; 1800 } 1801 1802 ct->fn = fn; 1803 ct->ctx = ctx; 1804 ct->cpl = cpl; 1805 1806 thread = _get_thread(); 1807 if (!thread) { 1808 SPDK_ERRLOG("No thread allocated\n"); 1809 free(ct); 1810 cpl(ctx); 1811 return; 1812 } 1813 ct->orig_thread = thread; 1814 1815 pthread_mutex_lock(&g_devlist_mutex); 1816 ct->cur_thread = TAILQ_FIRST(&g_threads); 1817 pthread_mutex_unlock(&g_devlist_mutex); 1818 1819 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 1820 ct->orig_thread->name); 1821 1822 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 1823 assert(rc == 0); 1824 } 1825 1826 static inline void 1827 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 1828 { 1829 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1830 return; 1831 } 1832 1833 if (!poller->set_intr_cb_fn) { 1834 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 1835 assert(false); 1836 return; 1837 } 1838 1839 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 1840 } 1841 1842 void 1843 spdk_thread_set_interrupt_mode(bool enable_interrupt) 1844 { 1845 struct spdk_thread *thread = _get_thread(); 1846 struct spdk_poller *poller, *tmp; 1847 1848 assert(thread); 1849 assert(spdk_interrupt_mode_is_enabled()); 1850 1851 if (thread->in_interrupt == enable_interrupt) { 1852 return; 1853 } 1854 1855 /* Set pollers to expected mode */ 1856 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1857 poller_set_interrupt_mode(poller, enable_interrupt); 1858 } 1859 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 1860 poller_set_interrupt_mode(poller, enable_interrupt); 1861 } 1862 /* All paused pollers will go to work in interrupt mode */ 1863 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 1864 poller_set_interrupt_mode(poller, enable_interrupt); 1865 } 1866 1867 thread->in_interrupt = enable_interrupt; 1868 return; 1869 } 1870 1871 static struct io_device * 1872 io_device_get(void *io_device) 1873 { 1874 struct io_device find = {}; 1875 1876 find.io_device = io_device; 1877 return RB_FIND(io_device_tree, &g_io_devices, &find); 1878 } 1879 1880 void 1881 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1882 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1883 const char *name) 1884 { 1885 struct io_device *dev, *tmp; 1886 struct spdk_thread *thread; 1887 1888 assert(io_device != NULL); 1889 assert(create_cb != NULL); 1890 assert(destroy_cb != NULL); 1891 1892 thread = spdk_get_thread(); 1893 if (!thread) { 1894 SPDK_ERRLOG("called from non-SPDK thread\n"); 1895 assert(false); 1896 return; 1897 } 1898 1899 dev = calloc(1, sizeof(struct io_device)); 1900 if (dev == NULL) { 1901 SPDK_ERRLOG("could not allocate io_device\n"); 1902 return; 1903 } 1904 1905 dev->io_device = io_device; 1906 if (name) { 1907 snprintf(dev->name, sizeof(dev->name), "%s", name); 1908 } else { 1909 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1910 } 1911 dev->create_cb = create_cb; 1912 dev->destroy_cb = destroy_cb; 1913 dev->unregister_cb = NULL; 1914 dev->ctx_size = ctx_size; 1915 dev->for_each_count = 0; 1916 dev->unregistered = false; 1917 dev->refcnt = 0; 1918 1919 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 1920 dev->name, dev->io_device, thread->name); 1921 1922 pthread_mutex_lock(&g_devlist_mutex); 1923 tmp = RB_INSERT(io_device_tree, &g_io_devices, dev); 1924 if (tmp != NULL) { 1925 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1926 io_device, tmp->name, dev->name); 1927 pthread_mutex_unlock(&g_devlist_mutex); 1928 free(dev); 1929 } 1930 1931 pthread_mutex_unlock(&g_devlist_mutex); 1932 } 1933 1934 static void 1935 _finish_unregister(void *arg) 1936 { 1937 struct io_device *dev = arg; 1938 struct spdk_thread *thread; 1939 1940 thread = spdk_get_thread(); 1941 assert(thread == dev->unregister_thread); 1942 1943 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 1944 dev->name, dev->io_device, thread->name); 1945 1946 assert(thread->pending_unregister_count > 0); 1947 thread->pending_unregister_count--; 1948 1949 dev->unregister_cb(dev->io_device); 1950 free(dev); 1951 } 1952 1953 static void 1954 io_device_free(struct io_device *dev) 1955 { 1956 int rc __attribute__((unused)); 1957 1958 if (dev->unregister_cb == NULL) { 1959 free(dev); 1960 } else { 1961 assert(dev->unregister_thread != NULL); 1962 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 1963 dev->name, dev->io_device, dev->unregister_thread->name); 1964 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 1965 assert(rc == 0); 1966 } 1967 } 1968 1969 void 1970 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 1971 { 1972 struct io_device *dev; 1973 uint32_t refcnt; 1974 struct spdk_thread *thread; 1975 1976 thread = spdk_get_thread(); 1977 if (!thread) { 1978 SPDK_ERRLOG("called from non-SPDK thread\n"); 1979 assert(false); 1980 return; 1981 } 1982 1983 pthread_mutex_lock(&g_devlist_mutex); 1984 dev = io_device_get(io_device); 1985 if (!dev) { 1986 SPDK_ERRLOG("io_device %p not found\n", io_device); 1987 assert(false); 1988 pthread_mutex_unlock(&g_devlist_mutex); 1989 return; 1990 } 1991 1992 if (dev->for_each_count > 0) { 1993 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 1994 dev->name, io_device, dev->for_each_count); 1995 pthread_mutex_unlock(&g_devlist_mutex); 1996 return; 1997 } 1998 1999 dev->unregister_cb = unregister_cb; 2000 dev->unregistered = true; 2001 RB_REMOVE(io_device_tree, &g_io_devices, dev); 2002 refcnt = dev->refcnt; 2003 dev->unregister_thread = thread; 2004 pthread_mutex_unlock(&g_devlist_mutex); 2005 2006 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 2007 dev->name, dev->io_device, thread->name); 2008 2009 if (unregister_cb) { 2010 thread->pending_unregister_count++; 2011 } 2012 2013 if (refcnt > 0) { 2014 /* defer deletion */ 2015 return; 2016 } 2017 2018 io_device_free(dev); 2019 } 2020 2021 const char * 2022 spdk_io_device_get_name(struct io_device *dev) 2023 { 2024 return dev->name; 2025 } 2026 2027 struct spdk_io_channel * 2028 spdk_get_io_channel(void *io_device) 2029 { 2030 struct spdk_io_channel *ch; 2031 struct spdk_thread *thread; 2032 struct io_device *dev; 2033 int rc; 2034 2035 pthread_mutex_lock(&g_devlist_mutex); 2036 dev = io_device_get(io_device); 2037 if (dev == NULL) { 2038 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2039 pthread_mutex_unlock(&g_devlist_mutex); 2040 return NULL; 2041 } 2042 2043 thread = _get_thread(); 2044 if (!thread) { 2045 SPDK_ERRLOG("No thread allocated\n"); 2046 pthread_mutex_unlock(&g_devlist_mutex); 2047 return NULL; 2048 } 2049 2050 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2051 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2052 pthread_mutex_unlock(&g_devlist_mutex); 2053 return NULL; 2054 } 2055 2056 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 2057 if (ch->dev == dev) { 2058 ch->ref++; 2059 2060 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2061 ch, dev->name, dev->io_device, thread->name, ch->ref); 2062 2063 /* 2064 * An I/O channel already exists for this device on this 2065 * thread, so return it. 2066 */ 2067 pthread_mutex_unlock(&g_devlist_mutex); 2068 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2069 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2070 return ch; 2071 } 2072 } 2073 2074 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2075 if (ch == NULL) { 2076 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2077 pthread_mutex_unlock(&g_devlist_mutex); 2078 return NULL; 2079 } 2080 2081 ch->dev = dev; 2082 ch->destroy_cb = dev->destroy_cb; 2083 ch->thread = thread; 2084 ch->ref = 1; 2085 ch->destroy_ref = 0; 2086 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 2087 2088 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2089 ch, dev->name, dev->io_device, thread->name, ch->ref); 2090 2091 dev->refcnt++; 2092 2093 pthread_mutex_unlock(&g_devlist_mutex); 2094 2095 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2096 if (rc != 0) { 2097 pthread_mutex_lock(&g_devlist_mutex); 2098 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 2099 dev->refcnt--; 2100 free(ch); 2101 pthread_mutex_unlock(&g_devlist_mutex); 2102 return NULL; 2103 } 2104 2105 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2106 return ch; 2107 } 2108 2109 static void 2110 put_io_channel(void *arg) 2111 { 2112 struct spdk_io_channel *ch = arg; 2113 bool do_remove_dev = true; 2114 struct spdk_thread *thread; 2115 2116 thread = spdk_get_thread(); 2117 if (!thread) { 2118 SPDK_ERRLOG("called from non-SPDK thread\n"); 2119 assert(false); 2120 return; 2121 } 2122 2123 SPDK_DEBUGLOG(thread, 2124 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2125 ch, ch->dev->name, ch->dev->io_device, thread->name); 2126 2127 assert(ch->thread == thread); 2128 2129 ch->destroy_ref--; 2130 2131 if (ch->ref > 0 || ch->destroy_ref > 0) { 2132 /* 2133 * Another reference to the associated io_device was requested 2134 * after this message was sent but before it had a chance to 2135 * execute. 2136 */ 2137 return; 2138 } 2139 2140 pthread_mutex_lock(&g_devlist_mutex); 2141 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 2142 pthread_mutex_unlock(&g_devlist_mutex); 2143 2144 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2145 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2146 2147 pthread_mutex_lock(&g_devlist_mutex); 2148 ch->dev->refcnt--; 2149 2150 if (!ch->dev->unregistered) { 2151 do_remove_dev = false; 2152 } 2153 2154 if (ch->dev->refcnt > 0) { 2155 do_remove_dev = false; 2156 } 2157 2158 pthread_mutex_unlock(&g_devlist_mutex); 2159 2160 if (do_remove_dev) { 2161 io_device_free(ch->dev); 2162 } 2163 free(ch); 2164 } 2165 2166 void 2167 spdk_put_io_channel(struct spdk_io_channel *ch) 2168 { 2169 struct spdk_thread *thread; 2170 int rc __attribute__((unused)); 2171 2172 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2173 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2174 2175 thread = spdk_get_thread(); 2176 if (!thread) { 2177 SPDK_ERRLOG("called from non-SPDK thread\n"); 2178 assert(false); 2179 return; 2180 } 2181 2182 if (ch->thread != thread) { 2183 SPDK_ERRLOG("different from the thread that called get_io_channel()\n"); 2184 assert(false); 2185 return; 2186 } 2187 2188 SPDK_DEBUGLOG(thread, 2189 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2190 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2191 2192 ch->ref--; 2193 2194 if (ch->ref == 0) { 2195 ch->destroy_ref++; 2196 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2197 assert(rc == 0); 2198 } 2199 } 2200 2201 struct spdk_io_channel * 2202 spdk_io_channel_from_ctx(void *ctx) 2203 { 2204 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2205 } 2206 2207 struct spdk_thread * 2208 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2209 { 2210 return ch->thread; 2211 } 2212 2213 void * 2214 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2215 { 2216 return ch->dev->io_device; 2217 } 2218 2219 const char * 2220 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2221 { 2222 return spdk_io_device_get_name(ch->dev); 2223 } 2224 2225 int 2226 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2227 { 2228 return ch->ref; 2229 } 2230 2231 struct spdk_io_channel_iter { 2232 void *io_device; 2233 struct io_device *dev; 2234 spdk_channel_msg fn; 2235 int status; 2236 void *ctx; 2237 struct spdk_io_channel *ch; 2238 2239 struct spdk_thread *cur_thread; 2240 2241 struct spdk_thread *orig_thread; 2242 spdk_channel_for_each_cpl cpl; 2243 }; 2244 2245 void * 2246 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2247 { 2248 return i->io_device; 2249 } 2250 2251 struct spdk_io_channel * 2252 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2253 { 2254 return i->ch; 2255 } 2256 2257 void * 2258 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2259 { 2260 return i->ctx; 2261 } 2262 2263 static void 2264 _call_completion(void *ctx) 2265 { 2266 struct spdk_io_channel_iter *i = ctx; 2267 2268 if (i->cpl != NULL) { 2269 i->cpl(i, i->status); 2270 } 2271 free(i); 2272 } 2273 2274 static void 2275 _call_channel(void *ctx) 2276 { 2277 struct spdk_io_channel_iter *i = ctx; 2278 struct spdk_io_channel *ch; 2279 2280 /* 2281 * It is possible that the channel was deleted before this 2282 * message had a chance to execute. If so, skip calling 2283 * the fn() on this thread. 2284 */ 2285 pthread_mutex_lock(&g_devlist_mutex); 2286 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 2287 if (ch->dev->io_device == i->io_device) { 2288 break; 2289 } 2290 } 2291 pthread_mutex_unlock(&g_devlist_mutex); 2292 2293 if (ch) { 2294 i->fn(i); 2295 } else { 2296 spdk_for_each_channel_continue(i, 0); 2297 } 2298 } 2299 2300 void 2301 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2302 spdk_channel_for_each_cpl cpl) 2303 { 2304 struct spdk_thread *thread; 2305 struct spdk_io_channel *ch; 2306 struct spdk_io_channel_iter *i; 2307 int rc __attribute__((unused)); 2308 2309 i = calloc(1, sizeof(*i)); 2310 if (!i) { 2311 SPDK_ERRLOG("Unable to allocate iterator\n"); 2312 return; 2313 } 2314 2315 i->io_device = io_device; 2316 i->fn = fn; 2317 i->ctx = ctx; 2318 i->cpl = cpl; 2319 2320 pthread_mutex_lock(&g_devlist_mutex); 2321 i->orig_thread = _get_thread(); 2322 2323 TAILQ_FOREACH(thread, &g_threads, tailq) { 2324 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 2325 if (ch->dev->io_device == io_device) { 2326 ch->dev->for_each_count++; 2327 i->dev = ch->dev; 2328 i->cur_thread = thread; 2329 i->ch = ch; 2330 pthread_mutex_unlock(&g_devlist_mutex); 2331 rc = spdk_thread_send_msg(thread, _call_channel, i); 2332 assert(rc == 0); 2333 return; 2334 } 2335 } 2336 } 2337 2338 pthread_mutex_unlock(&g_devlist_mutex); 2339 2340 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2341 assert(rc == 0); 2342 } 2343 2344 void 2345 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2346 { 2347 struct spdk_thread *thread; 2348 struct spdk_io_channel *ch; 2349 int rc __attribute__((unused)); 2350 2351 assert(i->cur_thread == spdk_get_thread()); 2352 2353 i->status = status; 2354 2355 pthread_mutex_lock(&g_devlist_mutex); 2356 if (status) { 2357 goto end; 2358 } 2359 thread = TAILQ_NEXT(i->cur_thread, tailq); 2360 while (thread) { 2361 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 2362 if (ch->dev->io_device == i->io_device) { 2363 i->cur_thread = thread; 2364 i->ch = ch; 2365 pthread_mutex_unlock(&g_devlist_mutex); 2366 rc = spdk_thread_send_msg(thread, _call_channel, i); 2367 assert(rc == 0); 2368 return; 2369 } 2370 } 2371 thread = TAILQ_NEXT(thread, tailq); 2372 } 2373 2374 end: 2375 i->dev->for_each_count--; 2376 i->ch = NULL; 2377 pthread_mutex_unlock(&g_devlist_mutex); 2378 2379 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2380 assert(rc == 0); 2381 } 2382 2383 struct spdk_interrupt { 2384 int efd; 2385 struct spdk_thread *thread; 2386 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 2387 }; 2388 2389 static void 2390 thread_interrupt_destroy(struct spdk_thread *thread) 2391 { 2392 struct spdk_fd_group *fgrp = thread->fgrp; 2393 2394 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2395 2396 if (thread->msg_fd < 0) { 2397 return; 2398 } 2399 2400 spdk_fd_group_remove(fgrp, thread->msg_fd); 2401 close(thread->msg_fd); 2402 thread->msg_fd = -1; 2403 2404 spdk_fd_group_destroy(fgrp); 2405 thread->fgrp = NULL; 2406 } 2407 2408 #ifdef __linux__ 2409 static int 2410 thread_interrupt_msg_process(void *arg) 2411 { 2412 struct spdk_thread *thread = arg; 2413 uint32_t msg_count; 2414 spdk_msg_fn critical_msg; 2415 int rc = 0; 2416 uint64_t notify = 1; 2417 2418 assert(spdk_interrupt_mode_is_enabled()); 2419 2420 /* There may be race between msg_acknowledge and another producer's msg_notify, 2421 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2422 * This can avoid msg notification missing. 2423 */ 2424 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2425 if (rc < 0 && errno != EAGAIN) { 2426 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2427 } 2428 2429 critical_msg = thread->critical_msg; 2430 if (spdk_unlikely(critical_msg != NULL)) { 2431 critical_msg(NULL); 2432 thread->critical_msg = NULL; 2433 rc = 1; 2434 } 2435 2436 msg_count = msg_queue_run_batch(thread, 0); 2437 if (msg_count) { 2438 rc = 1; 2439 } 2440 2441 return rc; 2442 } 2443 2444 static int 2445 thread_interrupt_create(struct spdk_thread *thread) 2446 { 2447 int rc; 2448 2449 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2450 2451 rc = spdk_fd_group_create(&thread->fgrp); 2452 if (rc) { 2453 return rc; 2454 } 2455 2456 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2457 if (thread->msg_fd < 0) { 2458 rc = -errno; 2459 spdk_fd_group_destroy(thread->fgrp); 2460 thread->fgrp = NULL; 2461 2462 return rc; 2463 } 2464 2465 return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread); 2466 } 2467 #else 2468 static int 2469 thread_interrupt_create(struct spdk_thread *thread) 2470 { 2471 return -ENOTSUP; 2472 } 2473 #endif 2474 2475 struct spdk_interrupt * 2476 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2477 void *arg, const char *name) 2478 { 2479 struct spdk_thread *thread; 2480 struct spdk_interrupt *intr; 2481 2482 thread = spdk_get_thread(); 2483 if (!thread) { 2484 assert(false); 2485 return NULL; 2486 } 2487 2488 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2489 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2490 return NULL; 2491 } 2492 2493 if (spdk_fd_group_add(thread->fgrp, efd, fn, arg)) { 2494 return NULL; 2495 } 2496 2497 intr = calloc(1, sizeof(*intr)); 2498 if (intr == NULL) { 2499 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2500 return NULL; 2501 } 2502 2503 if (name) { 2504 snprintf(intr->name, sizeof(intr->name), "%s", name); 2505 } else { 2506 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2507 } 2508 2509 intr->efd = efd; 2510 intr->thread = thread; 2511 2512 return intr; 2513 } 2514 2515 void 2516 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2517 { 2518 struct spdk_thread *thread; 2519 struct spdk_interrupt *intr; 2520 2521 intr = *pintr; 2522 if (intr == NULL) { 2523 return; 2524 } 2525 2526 *pintr = NULL; 2527 2528 thread = spdk_get_thread(); 2529 if (!thread) { 2530 assert(false); 2531 return; 2532 } 2533 2534 if (intr->thread != thread) { 2535 SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n"); 2536 assert(false); 2537 return; 2538 } 2539 2540 spdk_fd_group_remove(thread->fgrp, intr->efd); 2541 free(intr); 2542 } 2543 2544 int 2545 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2546 enum spdk_interrupt_event_types event_types) 2547 { 2548 struct spdk_thread *thread; 2549 2550 thread = spdk_get_thread(); 2551 if (!thread) { 2552 assert(false); 2553 return -EINVAL; 2554 } 2555 2556 if (intr->thread != thread) { 2557 SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n"); 2558 assert(false); 2559 return -EINVAL; 2560 } 2561 2562 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2563 } 2564 2565 int 2566 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2567 { 2568 return spdk_fd_group_get_fd(thread->fgrp); 2569 } 2570 2571 static bool g_interrupt_mode = false; 2572 2573 int 2574 spdk_interrupt_mode_enable(void) 2575 { 2576 /* It must be called once prior to initializing the threading library. 2577 * g_spdk_msg_mempool will be valid if thread library is initialized. 2578 */ 2579 if (g_spdk_msg_mempool) { 2580 SPDK_ERRLOG("Failed due to threading library is already initailzied.\n"); 2581 return -1; 2582 } 2583 2584 #ifdef __linux__ 2585 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2586 g_interrupt_mode = true; 2587 return 0; 2588 #else 2589 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2590 g_interrupt_mode = false; 2591 return -ENOTSUP; 2592 #endif 2593 } 2594 2595 bool 2596 spdk_interrupt_mode_is_enabled(void) 2597 { 2598 return g_interrupt_mode; 2599 } 2600 2601 SPDK_LOG_REGISTER_COMPONENT(thread) 2602