1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/likely.h" 38 #include "spdk/queue.h" 39 #include "spdk/string.h" 40 #include "spdk/thread.h" 41 #include "spdk/tree.h" 42 #include "spdk/util.h" 43 #include "spdk/fd_group.h" 44 45 #include "spdk/log.h" 46 #include "spdk_internal/thread.h" 47 48 #ifdef __linux__ 49 #include <sys/timerfd.h> 50 #include <sys/eventfd.h> 51 #endif 52 53 #define SPDK_MSG_BATCH_SIZE 8 54 #define SPDK_MAX_DEVICE_NAME_LEN 256 55 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 56 #define SPDK_MAX_POLLER_NAME_LEN 256 57 #define SPDK_MAX_THREAD_NAME_LEN 256 58 59 enum spdk_poller_state { 60 /* The poller is registered with a thread but not currently executing its fn. */ 61 SPDK_POLLER_STATE_WAITING, 62 63 /* The poller is currently running its fn. */ 64 SPDK_POLLER_STATE_RUNNING, 65 66 /* The poller was unregistered during the execution of its fn. */ 67 SPDK_POLLER_STATE_UNREGISTERED, 68 69 /* The poller is in the process of being paused. It will be paused 70 * during the next time it's supposed to be executed. 71 */ 72 SPDK_POLLER_STATE_PAUSING, 73 74 /* The poller is registered but currently paused. It's on the 75 * paused_pollers list. 76 */ 77 SPDK_POLLER_STATE_PAUSED, 78 }; 79 80 struct spdk_poller { 81 TAILQ_ENTRY(spdk_poller) tailq; 82 RB_ENTRY(spdk_poller) node; 83 84 /* Current state of the poller; should only be accessed from the poller's thread. */ 85 enum spdk_poller_state state; 86 87 uint64_t period_ticks; 88 uint64_t next_run_tick; 89 uint64_t run_count; 90 uint64_t busy_count; 91 spdk_poller_fn fn; 92 void *arg; 93 struct spdk_thread *thread; 94 int interruptfd; 95 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 96 void *set_intr_cb_arg; 97 98 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 99 }; 100 101 enum spdk_thread_state { 102 /* The thread is pocessing poller and message by spdk_thread_poll(). */ 103 SPDK_THREAD_STATE_RUNNING, 104 105 /* The thread is in the process of termination. It reaps unregistering 106 * poller are releasing I/O channel. 107 */ 108 SPDK_THREAD_STATE_EXITING, 109 110 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 111 SPDK_THREAD_STATE_EXITED, 112 }; 113 114 struct spdk_thread { 115 uint64_t tsc_last; 116 struct spdk_thread_stats stats; 117 /* 118 * Contains pollers actively running on this thread. Pollers 119 * are run round-robin. The thread takes one poller from the head 120 * of the ring, executes it, then puts it back at the tail of 121 * the ring. 122 */ 123 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 124 /** 125 * Contains pollers running on this thread with a periodic timer. 126 */ 127 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 128 struct spdk_poller *first_timed_poller; 129 /* 130 * Contains paused pollers. Pollers on this queue are waiting until 131 * they are resumed (in which case they're put onto the active/timer 132 * queues) or unregistered. 133 */ 134 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 135 struct spdk_ring *messages; 136 int msg_fd; 137 SLIST_HEAD(, spdk_msg) msg_cache; 138 size_t msg_cache_count; 139 spdk_msg_fn critical_msg; 140 uint64_t id; 141 enum spdk_thread_state state; 142 int pending_unregister_count; 143 144 TAILQ_HEAD(, spdk_io_channel) io_channels; 145 TAILQ_ENTRY(spdk_thread) tailq; 146 147 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 148 struct spdk_cpuset cpumask; 149 uint64_t exit_timeout_tsc; 150 151 /* Indicates whether this spdk_thread currently runs in interrupt. */ 152 bool in_interrupt; 153 struct spdk_fd_group *fgrp; 154 155 /* User context allocated at the end */ 156 uint8_t ctx[0]; 157 }; 158 159 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 160 161 static spdk_new_thread_fn g_new_thread_fn = NULL; 162 static spdk_thread_op_fn g_thread_op_fn = NULL; 163 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 164 static size_t g_ctx_sz = 0; 165 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 166 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 167 * SPDK application is required. 168 */ 169 static uint64_t g_thread_id = 1; 170 171 struct io_device { 172 void *io_device; 173 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 174 spdk_io_channel_create_cb create_cb; 175 spdk_io_channel_destroy_cb destroy_cb; 176 spdk_io_device_unregister_cb unregister_cb; 177 struct spdk_thread *unregister_thread; 178 uint32_t ctx_size; 179 uint32_t for_each_count; 180 TAILQ_ENTRY(io_device) tailq; 181 182 uint32_t refcnt; 183 184 bool unregistered; 185 }; 186 187 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 188 189 struct spdk_msg { 190 spdk_msg_fn fn; 191 void *arg; 192 193 SLIST_ENTRY(spdk_msg) link; 194 }; 195 196 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 197 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 198 199 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 200 static uint32_t g_thread_count = 0; 201 202 static __thread struct spdk_thread *tls_thread = NULL; 203 204 /* 205 * If this compare function returns zero when two next_run_ticks are equal, 206 * the macro RB_INSERT() returns a pointer to the element with the same 207 * next_run_tick. 208 * 209 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 210 * to remove as a parameter. 211 * 212 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 213 * side by returning 1 when two next_run_ticks are equal. 214 */ 215 static inline int 216 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 217 { 218 if (poller1->next_run_tick < poller2->next_run_tick) { 219 return -1; 220 } else { 221 return 1; 222 } 223 } 224 225 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 226 227 static inline struct spdk_thread * 228 _get_thread(void) 229 { 230 return tls_thread; 231 } 232 233 static int 234 _thread_lib_init(size_t ctx_sz) 235 { 236 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 237 238 g_ctx_sz = ctx_sz; 239 240 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 241 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 242 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 243 sizeof(struct spdk_msg), 244 0, /* No cache. We do our own. */ 245 SPDK_ENV_SOCKET_ID_ANY); 246 247 if (!g_spdk_msg_mempool) { 248 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 249 return -1; 250 } 251 252 return 0; 253 } 254 255 int 256 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 257 { 258 assert(g_new_thread_fn == NULL); 259 assert(g_thread_op_fn == NULL); 260 261 if (new_thread_fn == NULL) { 262 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 263 } else { 264 g_new_thread_fn = new_thread_fn; 265 } 266 267 return _thread_lib_init(ctx_sz); 268 } 269 270 int 271 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 272 spdk_thread_op_supported_fn thread_op_supported_fn, 273 size_t ctx_sz) 274 { 275 assert(g_new_thread_fn == NULL); 276 assert(g_thread_op_fn == NULL); 277 assert(g_thread_op_supported_fn == NULL); 278 279 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 280 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 281 return -EINVAL; 282 } 283 284 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 285 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 286 } else { 287 g_thread_op_fn = thread_op_fn; 288 g_thread_op_supported_fn = thread_op_supported_fn; 289 } 290 291 return _thread_lib_init(ctx_sz); 292 } 293 294 void 295 spdk_thread_lib_fini(void) 296 { 297 struct io_device *dev; 298 299 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 300 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 301 } 302 303 if (g_spdk_msg_mempool) { 304 spdk_mempool_free(g_spdk_msg_mempool); 305 g_spdk_msg_mempool = NULL; 306 } 307 308 g_new_thread_fn = NULL; 309 g_thread_op_fn = NULL; 310 g_thread_op_supported_fn = NULL; 311 g_ctx_sz = 0; 312 } 313 314 static void thread_interrupt_destroy(struct spdk_thread *thread); 315 static int thread_interrupt_create(struct spdk_thread *thread); 316 317 static void 318 _free_thread(struct spdk_thread *thread) 319 { 320 struct spdk_io_channel *ch; 321 struct spdk_msg *msg; 322 struct spdk_poller *poller, *ptmp; 323 324 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 325 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 326 thread->name, ch->dev->name); 327 } 328 329 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 330 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 331 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 332 poller->name); 333 } 334 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 335 free(poller); 336 } 337 338 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 339 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 340 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 341 poller->name); 342 } 343 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 344 free(poller); 345 } 346 347 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 348 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 349 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 350 free(poller); 351 } 352 353 pthread_mutex_lock(&g_devlist_mutex); 354 assert(g_thread_count > 0); 355 g_thread_count--; 356 TAILQ_REMOVE(&g_threads, thread, tailq); 357 pthread_mutex_unlock(&g_devlist_mutex); 358 359 msg = SLIST_FIRST(&thread->msg_cache); 360 while (msg != NULL) { 361 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 362 363 assert(thread->msg_cache_count > 0); 364 thread->msg_cache_count--; 365 spdk_mempool_put(g_spdk_msg_mempool, msg); 366 367 msg = SLIST_FIRST(&thread->msg_cache); 368 } 369 370 assert(thread->msg_cache_count == 0); 371 372 if (spdk_interrupt_mode_is_enabled()) { 373 thread_interrupt_destroy(thread); 374 } 375 376 spdk_ring_free(thread->messages); 377 free(thread); 378 } 379 380 struct spdk_thread * 381 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 382 { 383 struct spdk_thread *thread; 384 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 385 int rc = 0, i; 386 387 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 388 if (!thread) { 389 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 390 return NULL; 391 } 392 393 if (cpumask) { 394 spdk_cpuset_copy(&thread->cpumask, cpumask); 395 } else { 396 spdk_cpuset_negate(&thread->cpumask); 397 } 398 399 TAILQ_INIT(&thread->io_channels); 400 TAILQ_INIT(&thread->active_pollers); 401 RB_INIT(&thread->timed_pollers); 402 TAILQ_INIT(&thread->paused_pollers); 403 SLIST_INIT(&thread->msg_cache); 404 thread->msg_cache_count = 0; 405 406 thread->tsc_last = spdk_get_ticks(); 407 408 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 409 if (!thread->messages) { 410 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 411 free(thread); 412 return NULL; 413 } 414 415 /* Fill the local message pool cache. */ 416 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 417 if (rc == 0) { 418 /* If we can't populate the cache it's ok. The cache will get filled 419 * up organically as messages are passed to the thread. */ 420 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 421 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 422 thread->msg_cache_count++; 423 } 424 } 425 426 if (name) { 427 snprintf(thread->name, sizeof(thread->name), "%s", name); 428 } else { 429 snprintf(thread->name, sizeof(thread->name), "%p", thread); 430 } 431 432 pthread_mutex_lock(&g_devlist_mutex); 433 if (g_thread_id == 0) { 434 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 435 pthread_mutex_unlock(&g_devlist_mutex); 436 _free_thread(thread); 437 return NULL; 438 } 439 thread->id = g_thread_id++; 440 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 441 g_thread_count++; 442 pthread_mutex_unlock(&g_devlist_mutex); 443 444 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 445 thread->id, thread->name); 446 447 if (spdk_interrupt_mode_is_enabled()) { 448 thread->in_interrupt = true; 449 rc = thread_interrupt_create(thread); 450 if (rc != 0) { 451 _free_thread(thread); 452 return NULL; 453 } 454 } 455 456 if (g_new_thread_fn) { 457 rc = g_new_thread_fn(thread); 458 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 459 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 460 } 461 462 if (rc != 0) { 463 _free_thread(thread); 464 return NULL; 465 } 466 467 thread->state = SPDK_THREAD_STATE_RUNNING; 468 469 return thread; 470 } 471 472 void 473 spdk_set_thread(struct spdk_thread *thread) 474 { 475 tls_thread = thread; 476 } 477 478 static void 479 thread_exit(struct spdk_thread *thread, uint64_t now) 480 { 481 struct spdk_poller *poller; 482 struct spdk_io_channel *ch; 483 484 if (now >= thread->exit_timeout_tsc) { 485 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 486 thread->name); 487 goto exited; 488 } 489 490 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 491 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 492 SPDK_INFOLOG(thread, 493 "thread %s still has active poller %s\n", 494 thread->name, poller->name); 495 return; 496 } 497 } 498 499 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 500 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 501 SPDK_INFOLOG(thread, 502 "thread %s still has active timed poller %s\n", 503 thread->name, poller->name); 504 return; 505 } 506 } 507 508 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 509 SPDK_INFOLOG(thread, 510 "thread %s still has paused poller %s\n", 511 thread->name, poller->name); 512 return; 513 } 514 515 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 516 SPDK_INFOLOG(thread, 517 "thread %s still has channel for io_device %s\n", 518 thread->name, ch->dev->name); 519 return; 520 } 521 522 if (thread->pending_unregister_count > 0) { 523 SPDK_INFOLOG(thread, 524 "thread %s is still unregistering io_devices\n", 525 thread->name); 526 return; 527 } 528 529 exited: 530 thread->state = SPDK_THREAD_STATE_EXITED; 531 } 532 533 int 534 spdk_thread_exit(struct spdk_thread *thread) 535 { 536 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 537 538 assert(tls_thread == thread); 539 540 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 541 SPDK_INFOLOG(thread, 542 "thread %s is already exiting\n", 543 thread->name); 544 return 0; 545 } 546 547 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 548 SPDK_THREAD_EXIT_TIMEOUT_SEC); 549 thread->state = SPDK_THREAD_STATE_EXITING; 550 return 0; 551 } 552 553 bool 554 spdk_thread_is_exited(struct spdk_thread *thread) 555 { 556 return thread->state == SPDK_THREAD_STATE_EXITED; 557 } 558 559 void 560 spdk_thread_destroy(struct spdk_thread *thread) 561 { 562 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 563 564 assert(thread->state == SPDK_THREAD_STATE_EXITED); 565 566 if (tls_thread == thread) { 567 tls_thread = NULL; 568 } 569 570 _free_thread(thread); 571 } 572 573 void * 574 spdk_thread_get_ctx(struct spdk_thread *thread) 575 { 576 if (g_ctx_sz > 0) { 577 return thread->ctx; 578 } 579 580 return NULL; 581 } 582 583 struct spdk_cpuset * 584 spdk_thread_get_cpumask(struct spdk_thread *thread) 585 { 586 return &thread->cpumask; 587 } 588 589 int 590 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 591 { 592 struct spdk_thread *thread; 593 594 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 595 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 596 assert(false); 597 return -ENOTSUP; 598 } 599 600 thread = spdk_get_thread(); 601 if (!thread) { 602 SPDK_ERRLOG("Called from non-SPDK thread\n"); 603 assert(false); 604 return -EINVAL; 605 } 606 607 spdk_cpuset_copy(&thread->cpumask, cpumask); 608 609 /* Invoke framework's reschedule operation. If this function is called multiple times 610 * in a single spdk_thread_poll() context, the last cpumask will be used in the 611 * reschedule operation. 612 */ 613 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 614 615 return 0; 616 } 617 618 struct spdk_thread * 619 spdk_thread_get_from_ctx(void *ctx) 620 { 621 if (ctx == NULL) { 622 assert(false); 623 return NULL; 624 } 625 626 assert(g_ctx_sz > 0); 627 628 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 629 } 630 631 static inline uint32_t 632 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 633 { 634 unsigned count, i; 635 void *messages[SPDK_MSG_BATCH_SIZE]; 636 uint64_t notify = 1; 637 int rc; 638 639 #ifdef DEBUG 640 /* 641 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 642 * so we will never actually read uninitialized data from events, but just to be sure 643 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 644 */ 645 memset(messages, 0, sizeof(messages)); 646 #endif 647 648 if (max_msgs > 0) { 649 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 650 } else { 651 max_msgs = SPDK_MSG_BATCH_SIZE; 652 } 653 654 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 655 if (spdk_unlikely(thread->in_interrupt) && 656 spdk_ring_count(thread->messages) != 0) { 657 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 658 if (rc < 0) { 659 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 660 } 661 } 662 if (count == 0) { 663 return 0; 664 } 665 666 for (i = 0; i < count; i++) { 667 struct spdk_msg *msg = messages[i]; 668 669 assert(msg != NULL); 670 msg->fn(msg->arg); 671 672 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 673 /* Insert the messages at the head. We want to re-use the hot 674 * ones. */ 675 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 676 thread->msg_cache_count++; 677 } else { 678 spdk_mempool_put(g_spdk_msg_mempool, msg); 679 } 680 } 681 682 return count; 683 } 684 685 static void 686 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 687 { 688 struct spdk_poller *tmp __attribute__((unused)); 689 690 poller->next_run_tick = now + poller->period_ticks; 691 692 /* 693 * Insert poller in the thread's timed_pollers tree by next scheduled run time 694 * as its key. 695 */ 696 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 697 assert(tmp == NULL); 698 699 /* Update the cache only if it is empty or the inserted poller is earlier than it. 700 * RB_MIN() is not necessary here because all pollers, which has exactly the same 701 * next_run_tick as the existing poller, are inserted on the right side. 702 */ 703 if (thread->first_timed_poller == NULL || 704 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 705 thread->first_timed_poller = poller; 706 } 707 } 708 709 #ifdef __linux__ 710 static inline void 711 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 712 { 713 struct spdk_poller *tmp __attribute__((unused)); 714 715 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 716 assert(tmp != NULL); 717 718 /* This function is not used in any case that is performance critical. 719 * Update the cache simply by RB_MIN() if it needs to be changed. 720 */ 721 if (thread->first_timed_poller == poller) { 722 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 723 } 724 } 725 #endif 726 727 static void 728 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 729 { 730 if (poller->period_ticks) { 731 poller_insert_timer(thread, poller, spdk_get_ticks()); 732 } else { 733 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 734 } 735 } 736 737 static inline void 738 thread_update_stats(struct spdk_thread *thread, uint64_t end, 739 uint64_t start, int rc) 740 { 741 if (rc == 0) { 742 /* Poller status idle */ 743 thread->stats.idle_tsc += end - start; 744 } else if (rc > 0) { 745 /* Poller status busy */ 746 thread->stats.busy_tsc += end - start; 747 } 748 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 749 thread->tsc_last = end; 750 } 751 752 static inline int 753 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 754 { 755 int rc; 756 757 switch (poller->state) { 758 case SPDK_POLLER_STATE_UNREGISTERED: 759 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 760 free(poller); 761 return 0; 762 case SPDK_POLLER_STATE_PAUSING: 763 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 764 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 765 poller->state = SPDK_POLLER_STATE_PAUSED; 766 return 0; 767 case SPDK_POLLER_STATE_WAITING: 768 break; 769 default: 770 assert(false); 771 break; 772 } 773 774 poller->state = SPDK_POLLER_STATE_RUNNING; 775 rc = poller->fn(poller->arg); 776 777 poller->run_count++; 778 if (rc > 0) { 779 poller->busy_count++; 780 } 781 782 #ifdef DEBUG 783 if (rc == -1) { 784 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 785 } 786 #endif 787 788 switch (poller->state) { 789 case SPDK_POLLER_STATE_UNREGISTERED: 790 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 791 free(poller); 792 break; 793 case SPDK_POLLER_STATE_PAUSING: 794 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 795 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 796 poller->state = SPDK_POLLER_STATE_PAUSED; 797 break; 798 case SPDK_POLLER_STATE_PAUSED: 799 case SPDK_POLLER_STATE_WAITING: 800 break; 801 case SPDK_POLLER_STATE_RUNNING: 802 poller->state = SPDK_POLLER_STATE_WAITING; 803 break; 804 default: 805 assert(false); 806 break; 807 } 808 809 return rc; 810 } 811 812 static inline int 813 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 814 uint64_t now) 815 { 816 int rc; 817 818 switch (poller->state) { 819 case SPDK_POLLER_STATE_UNREGISTERED: 820 free(poller); 821 return 0; 822 case SPDK_POLLER_STATE_PAUSING: 823 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 824 poller->state = SPDK_POLLER_STATE_PAUSED; 825 return 0; 826 case SPDK_POLLER_STATE_WAITING: 827 break; 828 default: 829 assert(false); 830 break; 831 } 832 833 poller->state = SPDK_POLLER_STATE_RUNNING; 834 rc = poller->fn(poller->arg); 835 836 poller->run_count++; 837 if (rc > 0) { 838 poller->busy_count++; 839 } 840 841 #ifdef DEBUG 842 if (rc == -1) { 843 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 844 } 845 #endif 846 847 switch (poller->state) { 848 case SPDK_POLLER_STATE_UNREGISTERED: 849 free(poller); 850 break; 851 case SPDK_POLLER_STATE_PAUSING: 852 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 853 poller->state = SPDK_POLLER_STATE_PAUSED; 854 break; 855 case SPDK_POLLER_STATE_PAUSED: 856 break; 857 case SPDK_POLLER_STATE_RUNNING: 858 poller->state = SPDK_POLLER_STATE_WAITING; 859 /* fallthrough */ 860 case SPDK_POLLER_STATE_WAITING: 861 poller_insert_timer(thread, poller, now); 862 break; 863 default: 864 assert(false); 865 break; 866 } 867 868 return rc; 869 } 870 871 static int 872 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 873 { 874 uint32_t msg_count; 875 struct spdk_poller *poller, *tmp; 876 spdk_msg_fn critical_msg; 877 int rc = 0; 878 879 thread->tsc_last = now; 880 881 critical_msg = thread->critical_msg; 882 if (spdk_unlikely(critical_msg != NULL)) { 883 critical_msg(NULL); 884 thread->critical_msg = NULL; 885 rc = 1; 886 } 887 888 msg_count = msg_queue_run_batch(thread, max_msgs); 889 if (msg_count) { 890 rc = 1; 891 } 892 893 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 894 active_pollers_head, tailq, tmp) { 895 int poller_rc; 896 897 poller_rc = thread_execute_poller(thread, poller); 898 if (poller_rc > rc) { 899 rc = poller_rc; 900 } 901 } 902 903 poller = thread->first_timed_poller; 904 while (poller != NULL) { 905 int timer_rc = 0; 906 907 if (now < poller->next_run_tick) { 908 break; 909 } 910 911 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 912 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 913 914 /* Update the cache to the next timed poller in the list 915 * only if the current poller is still the closest, otherwise, 916 * do nothing because the cache has been already updated. 917 */ 918 if (thread->first_timed_poller == poller) { 919 thread->first_timed_poller = tmp; 920 } 921 922 timer_rc = thread_execute_timed_poller(thread, poller, now); 923 if (timer_rc > rc) { 924 rc = timer_rc; 925 } 926 927 poller = tmp; 928 } 929 930 return rc; 931 } 932 933 int 934 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 935 { 936 struct spdk_thread *orig_thread; 937 int rc; 938 uint64_t notify = 1; 939 940 orig_thread = _get_thread(); 941 tls_thread = thread; 942 943 if (now == 0) { 944 now = spdk_get_ticks(); 945 } 946 947 if (spdk_likely(!thread->in_interrupt)) { 948 rc = thread_poll(thread, max_msgs, now); 949 if (spdk_unlikely(thread->in_interrupt)) { 950 /* The thread transitioned to interrupt mode during the above poll. 951 * Poll it one more time in case that during the transition time 952 * there is msg received without notification. 953 */ 954 rc = thread_poll(thread, max_msgs, now); 955 } 956 } else { 957 /* Non-block wait on thread's fd_group */ 958 rc = spdk_fd_group_wait(thread->fgrp, 0); 959 if (spdk_unlikely(!thread->in_interrupt)) { 960 /* The thread transitioned to poll mode in a msg during the above processing. 961 * Clear msg_fd since thread messages will be polled directly in poll mode. 962 */ 963 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 964 if (rc < 0 && errno != EAGAIN) { 965 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 966 } 967 } 968 } 969 970 971 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 972 thread_exit(thread, now); 973 } 974 975 thread_update_stats(thread, spdk_get_ticks(), now, rc); 976 977 tls_thread = orig_thread; 978 979 return rc; 980 } 981 982 uint64_t 983 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 984 { 985 struct spdk_poller *poller; 986 987 poller = thread->first_timed_poller; 988 if (poller) { 989 return poller->next_run_tick; 990 } 991 992 return 0; 993 } 994 995 int 996 spdk_thread_has_active_pollers(struct spdk_thread *thread) 997 { 998 return !TAILQ_EMPTY(&thread->active_pollers); 999 } 1000 1001 static bool 1002 thread_has_unpaused_pollers(struct spdk_thread *thread) 1003 { 1004 if (TAILQ_EMPTY(&thread->active_pollers) && 1005 RB_EMPTY(&thread->timed_pollers)) { 1006 return false; 1007 } 1008 1009 return true; 1010 } 1011 1012 bool 1013 spdk_thread_has_pollers(struct spdk_thread *thread) 1014 { 1015 if (!thread_has_unpaused_pollers(thread) && 1016 TAILQ_EMPTY(&thread->paused_pollers)) { 1017 return false; 1018 } 1019 1020 return true; 1021 } 1022 1023 bool 1024 spdk_thread_is_idle(struct spdk_thread *thread) 1025 { 1026 if (spdk_ring_count(thread->messages) || 1027 thread_has_unpaused_pollers(thread) || 1028 thread->critical_msg != NULL) { 1029 return false; 1030 } 1031 1032 return true; 1033 } 1034 1035 uint32_t 1036 spdk_thread_get_count(void) 1037 { 1038 /* 1039 * Return cached value of the current thread count. We could acquire the 1040 * lock and iterate through the TAILQ of threads to count them, but that 1041 * count could still be invalidated after we release the lock. 1042 */ 1043 return g_thread_count; 1044 } 1045 1046 struct spdk_thread * 1047 spdk_get_thread(void) 1048 { 1049 return _get_thread(); 1050 } 1051 1052 const char * 1053 spdk_thread_get_name(const struct spdk_thread *thread) 1054 { 1055 return thread->name; 1056 } 1057 1058 uint64_t 1059 spdk_thread_get_id(const struct spdk_thread *thread) 1060 { 1061 return thread->id; 1062 } 1063 1064 struct spdk_thread * 1065 spdk_thread_get_by_id(uint64_t id) 1066 { 1067 struct spdk_thread *thread; 1068 1069 if (id == 0 || id >= g_thread_id) { 1070 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1071 return NULL; 1072 } 1073 pthread_mutex_lock(&g_devlist_mutex); 1074 TAILQ_FOREACH(thread, &g_threads, tailq) { 1075 if (thread->id == id) { 1076 break; 1077 } 1078 } 1079 pthread_mutex_unlock(&g_devlist_mutex); 1080 return thread; 1081 } 1082 1083 int 1084 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1085 { 1086 struct spdk_thread *thread; 1087 1088 thread = _get_thread(); 1089 if (!thread) { 1090 SPDK_ERRLOG("No thread allocated\n"); 1091 return -EINVAL; 1092 } 1093 1094 if (stats == NULL) { 1095 return -EINVAL; 1096 } 1097 1098 *stats = thread->stats; 1099 1100 return 0; 1101 } 1102 1103 uint64_t 1104 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1105 { 1106 if (thread == NULL) { 1107 thread = _get_thread(); 1108 } 1109 1110 return thread->tsc_last; 1111 } 1112 1113 static inline int 1114 thread_send_msg_notification(const struct spdk_thread *target_thread) 1115 { 1116 uint64_t notify = 1; 1117 int rc; 1118 1119 /* Not necessary to do notification if interrupt facility is not enabled */ 1120 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1121 return 0; 1122 } 1123 1124 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1125 * after sending thread msg, it is necessary to check whether target thread runs in 1126 * interrupt mode and then decide whether do event notification. 1127 */ 1128 if (spdk_unlikely(target_thread->in_interrupt)) { 1129 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1130 if (rc < 0) { 1131 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1132 return -EIO; 1133 } 1134 } 1135 1136 return 0; 1137 } 1138 1139 int 1140 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1141 { 1142 struct spdk_thread *local_thread; 1143 struct spdk_msg *msg; 1144 int rc; 1145 1146 assert(thread != NULL); 1147 1148 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1149 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1150 return -EIO; 1151 } 1152 1153 local_thread = _get_thread(); 1154 1155 msg = NULL; 1156 if (local_thread != NULL) { 1157 if (local_thread->msg_cache_count > 0) { 1158 msg = SLIST_FIRST(&local_thread->msg_cache); 1159 assert(msg != NULL); 1160 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1161 local_thread->msg_cache_count--; 1162 } 1163 } 1164 1165 if (msg == NULL) { 1166 msg = spdk_mempool_get(g_spdk_msg_mempool); 1167 if (!msg) { 1168 SPDK_ERRLOG("msg could not be allocated\n"); 1169 return -ENOMEM; 1170 } 1171 } 1172 1173 msg->fn = fn; 1174 msg->arg = ctx; 1175 1176 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1177 if (rc != 1) { 1178 SPDK_ERRLOG("msg could not be enqueued\n"); 1179 spdk_mempool_put(g_spdk_msg_mempool, msg); 1180 return -EIO; 1181 } 1182 1183 return thread_send_msg_notification(thread); 1184 } 1185 1186 int 1187 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1188 { 1189 spdk_msg_fn expected = NULL; 1190 1191 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1192 __ATOMIC_SEQ_CST)) { 1193 return -EIO; 1194 } 1195 1196 return thread_send_msg_notification(thread); 1197 } 1198 1199 #ifdef __linux__ 1200 static int 1201 interrupt_timerfd_process(void *arg) 1202 { 1203 struct spdk_poller *poller = arg; 1204 uint64_t exp; 1205 int rc; 1206 1207 /* clear the level of interval timer */ 1208 rc = read(poller->interruptfd, &exp, sizeof(exp)); 1209 if (rc < 0) { 1210 if (rc == -EAGAIN) { 1211 return 0; 1212 } 1213 1214 return rc; 1215 } 1216 1217 return poller->fn(poller->arg); 1218 } 1219 1220 static int 1221 period_poller_interrupt_init(struct spdk_poller *poller) 1222 { 1223 struct spdk_fd_group *fgrp = poller->thread->fgrp; 1224 int timerfd; 1225 int rc; 1226 1227 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1228 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1229 if (timerfd < 0) { 1230 return -errno; 1231 } 1232 1233 rc = spdk_fd_group_add(fgrp, timerfd, 1234 interrupt_timerfd_process, poller); 1235 if (rc < 0) { 1236 close(timerfd); 1237 return rc; 1238 } 1239 1240 poller->interruptfd = timerfd; 1241 return 0; 1242 } 1243 1244 static void 1245 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1246 { 1247 int timerfd = poller->interruptfd; 1248 uint64_t now_tick = spdk_get_ticks(); 1249 uint64_t ticks = spdk_get_ticks_hz(); 1250 int ret; 1251 struct itimerspec new_tv = {}; 1252 struct itimerspec old_tv = {}; 1253 1254 assert(poller->period_ticks != 0); 1255 assert(timerfd >= 0); 1256 1257 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1258 interrupt_mode ? "interrupt" : "poll"); 1259 1260 if (interrupt_mode) { 1261 /* Set repeated timer expiration */ 1262 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1263 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1264 1265 /* Update next timer expiration */ 1266 if (poller->next_run_tick == 0) { 1267 poller->next_run_tick = now_tick + poller->period_ticks; 1268 } else if (poller->next_run_tick < now_tick) { 1269 poller->next_run_tick = now_tick; 1270 } 1271 1272 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1273 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1274 1275 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1276 if (ret < 0) { 1277 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1278 assert(false); 1279 } 1280 } else { 1281 /* Disarm the timer */ 1282 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1283 if (ret < 0) { 1284 /* timerfd_settime's failure indicates that the timerfd is in error */ 1285 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1286 assert(false); 1287 } 1288 1289 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1290 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1291 */ 1292 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1293 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1294 poller_remove_timer(poller->thread, poller); 1295 poller_insert_timer(poller->thread, poller, now_tick); 1296 } 1297 } 1298 1299 static void 1300 poller_interrupt_fini(struct spdk_poller *poller) 1301 { 1302 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1303 assert(poller->interruptfd >= 0); 1304 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1305 close(poller->interruptfd); 1306 poller->interruptfd = -1; 1307 } 1308 1309 static int 1310 busy_poller_interrupt_init(struct spdk_poller *poller) 1311 { 1312 int busy_efd; 1313 int rc; 1314 1315 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1316 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1317 if (busy_efd < 0) { 1318 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1319 return -errno; 1320 } 1321 1322 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, poller->fn, poller->arg); 1323 if (rc < 0) { 1324 close(busy_efd); 1325 return rc; 1326 } 1327 1328 poller->interruptfd = busy_efd; 1329 return 0; 1330 } 1331 1332 static void 1333 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1334 { 1335 int busy_efd = poller->interruptfd; 1336 uint64_t notify = 1; 1337 int rc __attribute__((unused)); 1338 1339 assert(busy_efd >= 0); 1340 1341 if (interrupt_mode) { 1342 /* Write without read on eventfd will get it repeatedly triggered. */ 1343 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1344 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1345 } 1346 } else { 1347 /* Read on eventfd will clear its level triggering. */ 1348 rc = read(busy_efd, ¬ify, sizeof(notify)); 1349 } 1350 } 1351 1352 #else 1353 1354 static int 1355 period_poller_interrupt_init(struct spdk_poller *poller) 1356 { 1357 return -ENOTSUP; 1358 } 1359 1360 static void 1361 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1362 { 1363 } 1364 1365 static void 1366 poller_interrupt_fini(struct spdk_poller *poller) 1367 { 1368 } 1369 1370 static int 1371 busy_poller_interrupt_init(struct spdk_poller *poller) 1372 { 1373 return -ENOTSUP; 1374 } 1375 1376 static void 1377 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1378 { 1379 } 1380 1381 #endif 1382 1383 void 1384 spdk_poller_register_interrupt(struct spdk_poller *poller, 1385 spdk_poller_set_interrupt_mode_cb cb_fn, 1386 void *cb_arg) 1387 { 1388 assert(poller != NULL); 1389 assert(cb_fn != NULL); 1390 assert(spdk_get_thread() == poller->thread); 1391 1392 if (!spdk_interrupt_mode_is_enabled()) { 1393 return; 1394 } 1395 1396 /* when a poller is created we don't know if the user is ever going to 1397 * enable interrupts on it by calling this function, so the poller 1398 * registration function has to immediately create a interruptfd. 1399 * When this function does get called by user, we have to then destroy 1400 * that interruptfd. 1401 */ 1402 if (poller->set_intr_cb_fn && poller->interruptfd >= 0) { 1403 poller_interrupt_fini(poller); 1404 } 1405 1406 poller->set_intr_cb_fn = cb_fn; 1407 poller->set_intr_cb_arg = cb_arg; 1408 1409 /* Set poller into interrupt mode if thread is in interrupt. */ 1410 if (poller->thread->in_interrupt) { 1411 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1412 } 1413 } 1414 1415 static uint64_t 1416 convert_us_to_ticks(uint64_t us) 1417 { 1418 uint64_t quotient, remainder, ticks; 1419 1420 if (us) { 1421 quotient = us / SPDK_SEC_TO_USEC; 1422 remainder = us % SPDK_SEC_TO_USEC; 1423 ticks = spdk_get_ticks_hz(); 1424 1425 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1426 } else { 1427 return 0; 1428 } 1429 } 1430 1431 static struct spdk_poller * 1432 poller_register(spdk_poller_fn fn, 1433 void *arg, 1434 uint64_t period_microseconds, 1435 const char *name) 1436 { 1437 struct spdk_thread *thread; 1438 struct spdk_poller *poller; 1439 1440 thread = spdk_get_thread(); 1441 if (!thread) { 1442 assert(false); 1443 return NULL; 1444 } 1445 1446 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1447 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1448 return NULL; 1449 } 1450 1451 poller = calloc(1, sizeof(*poller)); 1452 if (poller == NULL) { 1453 SPDK_ERRLOG("Poller memory allocation failed\n"); 1454 return NULL; 1455 } 1456 1457 if (name) { 1458 snprintf(poller->name, sizeof(poller->name), "%s", name); 1459 } else { 1460 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1461 } 1462 1463 poller->state = SPDK_POLLER_STATE_WAITING; 1464 poller->fn = fn; 1465 poller->arg = arg; 1466 poller->thread = thread; 1467 poller->interruptfd = -1; 1468 1469 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1470 1471 if (spdk_interrupt_mode_is_enabled()) { 1472 int rc; 1473 1474 if (period_microseconds) { 1475 rc = period_poller_interrupt_init(poller); 1476 if (rc < 0) { 1477 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1478 free(poller); 1479 return NULL; 1480 } 1481 1482 spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL); 1483 } else { 1484 /* If the poller doesn't have a period, create interruptfd that's always 1485 * busy automatically when runnning in interrupt mode. 1486 */ 1487 rc = busy_poller_interrupt_init(poller); 1488 if (rc > 0) { 1489 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1490 free(poller); 1491 return NULL; 1492 } 1493 1494 spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL); 1495 } 1496 } 1497 1498 thread_insert_poller(thread, poller); 1499 1500 return poller; 1501 } 1502 1503 struct spdk_poller * 1504 spdk_poller_register(spdk_poller_fn fn, 1505 void *arg, 1506 uint64_t period_microseconds) 1507 { 1508 return poller_register(fn, arg, period_microseconds, NULL); 1509 } 1510 1511 struct spdk_poller * 1512 spdk_poller_register_named(spdk_poller_fn fn, 1513 void *arg, 1514 uint64_t period_microseconds, 1515 const char *name) 1516 { 1517 return poller_register(fn, arg, period_microseconds, name); 1518 } 1519 1520 void 1521 spdk_poller_unregister(struct spdk_poller **ppoller) 1522 { 1523 struct spdk_thread *thread; 1524 struct spdk_poller *poller; 1525 1526 poller = *ppoller; 1527 if (poller == NULL) { 1528 return; 1529 } 1530 1531 *ppoller = NULL; 1532 1533 thread = spdk_get_thread(); 1534 if (!thread) { 1535 assert(false); 1536 return; 1537 } 1538 1539 if (poller->thread != thread) { 1540 SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n"); 1541 assert(false); 1542 return; 1543 } 1544 1545 if (spdk_interrupt_mode_is_enabled() && poller->interruptfd >= 0) { 1546 poller_interrupt_fini(poller); 1547 } 1548 1549 /* If the poller was paused, put it on the active_pollers list so that 1550 * its unregistration can be processed by spdk_thread_poll(). 1551 */ 1552 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1553 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1554 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1555 poller->period_ticks = 0; 1556 } 1557 1558 /* Simply set the state to unregistered. The poller will get cleaned up 1559 * in a subsequent call to spdk_thread_poll(). 1560 */ 1561 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1562 } 1563 1564 void 1565 spdk_poller_pause(struct spdk_poller *poller) 1566 { 1567 struct spdk_thread *thread; 1568 1569 thread = spdk_get_thread(); 1570 if (!thread) { 1571 assert(false); 1572 return; 1573 } 1574 1575 if (poller->thread != thread) { 1576 SPDK_ERRLOG("different from the thread that called spdk_poller_pause()\n"); 1577 assert(false); 1578 return; 1579 } 1580 1581 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1582 * spdk_thread_poll() move it. It allows a poller to be paused from 1583 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1584 * iteration, or from within itself without breaking the logic to always 1585 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1586 */ 1587 switch (poller->state) { 1588 case SPDK_POLLER_STATE_PAUSED: 1589 case SPDK_POLLER_STATE_PAUSING: 1590 break; 1591 case SPDK_POLLER_STATE_RUNNING: 1592 case SPDK_POLLER_STATE_WAITING: 1593 poller->state = SPDK_POLLER_STATE_PAUSING; 1594 break; 1595 default: 1596 assert(false); 1597 break; 1598 } 1599 } 1600 1601 void 1602 spdk_poller_resume(struct spdk_poller *poller) 1603 { 1604 struct spdk_thread *thread; 1605 1606 thread = spdk_get_thread(); 1607 if (!thread) { 1608 assert(false); 1609 return; 1610 } 1611 1612 if (poller->thread != thread) { 1613 SPDK_ERRLOG("different from the thread that called spdk_poller_resume()\n"); 1614 assert(false); 1615 return; 1616 } 1617 1618 /* If a poller is paused it has to be removed from the paused pollers 1619 * list and put on the active list or timer tree depending on its 1620 * period_ticks. If a poller is still in the process of being paused, 1621 * we just need to flip its state back to waiting, as it's already on 1622 * the appropriate list or tree. 1623 */ 1624 switch (poller->state) { 1625 case SPDK_POLLER_STATE_PAUSED: 1626 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1627 thread_insert_poller(thread, poller); 1628 /* fallthrough */ 1629 case SPDK_POLLER_STATE_PAUSING: 1630 poller->state = SPDK_POLLER_STATE_WAITING; 1631 break; 1632 case SPDK_POLLER_STATE_RUNNING: 1633 case SPDK_POLLER_STATE_WAITING: 1634 break; 1635 default: 1636 assert(false); 1637 break; 1638 } 1639 } 1640 1641 const char * 1642 spdk_poller_get_name(struct spdk_poller *poller) 1643 { 1644 return poller->name; 1645 } 1646 1647 const char * 1648 spdk_poller_get_state_str(struct spdk_poller *poller) 1649 { 1650 switch (poller->state) { 1651 case SPDK_POLLER_STATE_WAITING: 1652 return "waiting"; 1653 case SPDK_POLLER_STATE_RUNNING: 1654 return "running"; 1655 case SPDK_POLLER_STATE_UNREGISTERED: 1656 return "unregistered"; 1657 case SPDK_POLLER_STATE_PAUSING: 1658 return "pausing"; 1659 case SPDK_POLLER_STATE_PAUSED: 1660 return "paused"; 1661 default: 1662 return NULL; 1663 } 1664 } 1665 1666 uint64_t 1667 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1668 { 1669 return poller->period_ticks; 1670 } 1671 1672 void 1673 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1674 { 1675 stats->run_count = poller->run_count; 1676 stats->busy_count = poller->busy_count; 1677 } 1678 1679 struct spdk_poller * 1680 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1681 { 1682 return TAILQ_FIRST(&thread->active_pollers); 1683 } 1684 1685 struct spdk_poller * 1686 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1687 { 1688 return TAILQ_NEXT(prev, tailq); 1689 } 1690 1691 struct spdk_poller * 1692 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1693 { 1694 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1695 } 1696 1697 struct spdk_poller * 1698 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1699 { 1700 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1701 } 1702 1703 struct spdk_poller * 1704 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1705 { 1706 return TAILQ_FIRST(&thread->paused_pollers); 1707 } 1708 1709 struct spdk_poller * 1710 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1711 { 1712 return TAILQ_NEXT(prev, tailq); 1713 } 1714 1715 struct spdk_io_channel * 1716 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1717 { 1718 return TAILQ_FIRST(&thread->io_channels); 1719 } 1720 1721 struct spdk_io_channel * 1722 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1723 { 1724 return TAILQ_NEXT(prev, tailq); 1725 } 1726 1727 struct call_thread { 1728 struct spdk_thread *cur_thread; 1729 spdk_msg_fn fn; 1730 void *ctx; 1731 1732 struct spdk_thread *orig_thread; 1733 spdk_msg_fn cpl; 1734 }; 1735 1736 static void 1737 _on_thread(void *ctx) 1738 { 1739 struct call_thread *ct = ctx; 1740 int rc __attribute__((unused)); 1741 1742 ct->fn(ct->ctx); 1743 1744 pthread_mutex_lock(&g_devlist_mutex); 1745 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1746 pthread_mutex_unlock(&g_devlist_mutex); 1747 1748 if (!ct->cur_thread) { 1749 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1750 1751 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1752 free(ctx); 1753 } else { 1754 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1755 ct->cur_thread->name); 1756 1757 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1758 } 1759 assert(rc == 0); 1760 } 1761 1762 void 1763 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1764 { 1765 struct call_thread *ct; 1766 struct spdk_thread *thread; 1767 int rc __attribute__((unused)); 1768 1769 ct = calloc(1, sizeof(*ct)); 1770 if (!ct) { 1771 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1772 cpl(ctx); 1773 return; 1774 } 1775 1776 ct->fn = fn; 1777 ct->ctx = ctx; 1778 ct->cpl = cpl; 1779 1780 thread = _get_thread(); 1781 if (!thread) { 1782 SPDK_ERRLOG("No thread allocated\n"); 1783 free(ct); 1784 cpl(ctx); 1785 return; 1786 } 1787 ct->orig_thread = thread; 1788 1789 pthread_mutex_lock(&g_devlist_mutex); 1790 ct->cur_thread = TAILQ_FIRST(&g_threads); 1791 pthread_mutex_unlock(&g_devlist_mutex); 1792 1793 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 1794 ct->orig_thread->name); 1795 1796 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 1797 assert(rc == 0); 1798 } 1799 1800 static inline void 1801 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 1802 { 1803 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1804 return; 1805 } 1806 1807 if (!poller->set_intr_cb_fn) { 1808 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 1809 assert(false); 1810 return; 1811 } 1812 1813 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 1814 } 1815 1816 void 1817 spdk_thread_set_interrupt_mode(bool enable_interrupt) 1818 { 1819 struct spdk_thread *thread = _get_thread(); 1820 struct spdk_poller *poller, *tmp; 1821 1822 assert(thread); 1823 assert(spdk_interrupt_mode_is_enabled()); 1824 1825 if (thread->in_interrupt == enable_interrupt) { 1826 return; 1827 } 1828 1829 /* Set pollers to expected mode */ 1830 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1831 poller_set_interrupt_mode(poller, enable_interrupt); 1832 } 1833 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 1834 poller_set_interrupt_mode(poller, enable_interrupt); 1835 } 1836 /* All paused pollers will go to work in interrupt mode */ 1837 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 1838 poller_set_interrupt_mode(poller, enable_interrupt); 1839 } 1840 1841 thread->in_interrupt = enable_interrupt; 1842 return; 1843 } 1844 1845 void 1846 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1847 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1848 const char *name) 1849 { 1850 struct io_device *dev, *tmp; 1851 struct spdk_thread *thread; 1852 1853 assert(io_device != NULL); 1854 assert(create_cb != NULL); 1855 assert(destroy_cb != NULL); 1856 1857 thread = spdk_get_thread(); 1858 if (!thread) { 1859 SPDK_ERRLOG("called from non-SPDK thread\n"); 1860 assert(false); 1861 return; 1862 } 1863 1864 dev = calloc(1, sizeof(struct io_device)); 1865 if (dev == NULL) { 1866 SPDK_ERRLOG("could not allocate io_device\n"); 1867 return; 1868 } 1869 1870 dev->io_device = io_device; 1871 if (name) { 1872 snprintf(dev->name, sizeof(dev->name), "%s", name); 1873 } else { 1874 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1875 } 1876 dev->create_cb = create_cb; 1877 dev->destroy_cb = destroy_cb; 1878 dev->unregister_cb = NULL; 1879 dev->ctx_size = ctx_size; 1880 dev->for_each_count = 0; 1881 dev->unregistered = false; 1882 dev->refcnt = 0; 1883 1884 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 1885 dev->name, dev->io_device, thread->name); 1886 1887 pthread_mutex_lock(&g_devlist_mutex); 1888 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 1889 if (tmp->io_device == io_device) { 1890 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1891 io_device, tmp->name, dev->name); 1892 free(dev); 1893 pthread_mutex_unlock(&g_devlist_mutex); 1894 return; 1895 } 1896 } 1897 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 1898 pthread_mutex_unlock(&g_devlist_mutex); 1899 } 1900 1901 static void 1902 _finish_unregister(void *arg) 1903 { 1904 struct io_device *dev = arg; 1905 struct spdk_thread *thread; 1906 1907 thread = spdk_get_thread(); 1908 assert(thread == dev->unregister_thread); 1909 1910 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 1911 dev->name, dev->io_device, thread->name); 1912 1913 assert(thread->pending_unregister_count > 0); 1914 thread->pending_unregister_count--; 1915 1916 dev->unregister_cb(dev->io_device); 1917 free(dev); 1918 } 1919 1920 static void 1921 io_device_free(struct io_device *dev) 1922 { 1923 int rc __attribute__((unused)); 1924 1925 if (dev->unregister_cb == NULL) { 1926 free(dev); 1927 } else { 1928 assert(dev->unregister_thread != NULL); 1929 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 1930 dev->name, dev->io_device, dev->unregister_thread->name); 1931 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 1932 assert(rc == 0); 1933 } 1934 } 1935 1936 void 1937 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 1938 { 1939 struct io_device *dev; 1940 uint32_t refcnt; 1941 struct spdk_thread *thread; 1942 1943 thread = spdk_get_thread(); 1944 if (!thread) { 1945 SPDK_ERRLOG("called from non-SPDK thread\n"); 1946 assert(false); 1947 return; 1948 } 1949 1950 pthread_mutex_lock(&g_devlist_mutex); 1951 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 1952 if (dev->io_device == io_device) { 1953 break; 1954 } 1955 } 1956 1957 if (!dev) { 1958 SPDK_ERRLOG("io_device %p not found\n", io_device); 1959 assert(false); 1960 pthread_mutex_unlock(&g_devlist_mutex); 1961 return; 1962 } 1963 1964 if (dev->for_each_count > 0) { 1965 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 1966 dev->name, io_device, dev->for_each_count); 1967 pthread_mutex_unlock(&g_devlist_mutex); 1968 return; 1969 } 1970 1971 dev->unregister_cb = unregister_cb; 1972 dev->unregistered = true; 1973 TAILQ_REMOVE(&g_io_devices, dev, tailq); 1974 refcnt = dev->refcnt; 1975 dev->unregister_thread = thread; 1976 pthread_mutex_unlock(&g_devlist_mutex); 1977 1978 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 1979 dev->name, dev->io_device, thread->name); 1980 1981 if (unregister_cb) { 1982 thread->pending_unregister_count++; 1983 } 1984 1985 if (refcnt > 0) { 1986 /* defer deletion */ 1987 return; 1988 } 1989 1990 io_device_free(dev); 1991 } 1992 1993 const char * 1994 spdk_io_device_get_name(struct io_device *dev) 1995 { 1996 return dev->name; 1997 } 1998 1999 struct spdk_io_channel * 2000 spdk_get_io_channel(void *io_device) 2001 { 2002 struct spdk_io_channel *ch; 2003 struct spdk_thread *thread; 2004 struct io_device *dev; 2005 int rc; 2006 2007 pthread_mutex_lock(&g_devlist_mutex); 2008 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 2009 if (dev->io_device == io_device) { 2010 break; 2011 } 2012 } 2013 if (dev == NULL) { 2014 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2015 pthread_mutex_unlock(&g_devlist_mutex); 2016 return NULL; 2017 } 2018 2019 thread = _get_thread(); 2020 if (!thread) { 2021 SPDK_ERRLOG("No thread allocated\n"); 2022 pthread_mutex_unlock(&g_devlist_mutex); 2023 return NULL; 2024 } 2025 2026 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2027 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2028 pthread_mutex_unlock(&g_devlist_mutex); 2029 return NULL; 2030 } 2031 2032 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 2033 if (ch->dev == dev) { 2034 ch->ref++; 2035 2036 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2037 ch, dev->name, dev->io_device, thread->name, ch->ref); 2038 2039 /* 2040 * An I/O channel already exists for this device on this 2041 * thread, so return it. 2042 */ 2043 pthread_mutex_unlock(&g_devlist_mutex); 2044 return ch; 2045 } 2046 } 2047 2048 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2049 if (ch == NULL) { 2050 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2051 pthread_mutex_unlock(&g_devlist_mutex); 2052 return NULL; 2053 } 2054 2055 ch->dev = dev; 2056 ch->destroy_cb = dev->destroy_cb; 2057 ch->thread = thread; 2058 ch->ref = 1; 2059 ch->destroy_ref = 0; 2060 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 2061 2062 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2063 ch, dev->name, dev->io_device, thread->name, ch->ref); 2064 2065 dev->refcnt++; 2066 2067 pthread_mutex_unlock(&g_devlist_mutex); 2068 2069 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2070 if (rc != 0) { 2071 pthread_mutex_lock(&g_devlist_mutex); 2072 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 2073 dev->refcnt--; 2074 free(ch); 2075 pthread_mutex_unlock(&g_devlist_mutex); 2076 return NULL; 2077 } 2078 2079 return ch; 2080 } 2081 2082 static void 2083 put_io_channel(void *arg) 2084 { 2085 struct spdk_io_channel *ch = arg; 2086 bool do_remove_dev = true; 2087 struct spdk_thread *thread; 2088 2089 thread = spdk_get_thread(); 2090 if (!thread) { 2091 SPDK_ERRLOG("called from non-SPDK thread\n"); 2092 assert(false); 2093 return; 2094 } 2095 2096 SPDK_DEBUGLOG(thread, 2097 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2098 ch, ch->dev->name, ch->dev->io_device, thread->name); 2099 2100 assert(ch->thread == thread); 2101 2102 ch->destroy_ref--; 2103 2104 if (ch->ref > 0 || ch->destroy_ref > 0) { 2105 /* 2106 * Another reference to the associated io_device was requested 2107 * after this message was sent but before it had a chance to 2108 * execute. 2109 */ 2110 return; 2111 } 2112 2113 pthread_mutex_lock(&g_devlist_mutex); 2114 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 2115 pthread_mutex_unlock(&g_devlist_mutex); 2116 2117 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2118 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2119 2120 pthread_mutex_lock(&g_devlist_mutex); 2121 ch->dev->refcnt--; 2122 2123 if (!ch->dev->unregistered) { 2124 do_remove_dev = false; 2125 } 2126 2127 if (ch->dev->refcnt > 0) { 2128 do_remove_dev = false; 2129 } 2130 2131 pthread_mutex_unlock(&g_devlist_mutex); 2132 2133 if (do_remove_dev) { 2134 io_device_free(ch->dev); 2135 } 2136 free(ch); 2137 } 2138 2139 void 2140 spdk_put_io_channel(struct spdk_io_channel *ch) 2141 { 2142 struct spdk_thread *thread; 2143 int rc __attribute__((unused)); 2144 2145 thread = spdk_get_thread(); 2146 if (!thread) { 2147 SPDK_ERRLOG("called from non-SPDK thread\n"); 2148 assert(false); 2149 return; 2150 } 2151 2152 if (ch->thread != thread) { 2153 SPDK_ERRLOG("different from the thread that called get_io_channel()\n"); 2154 assert(false); 2155 return; 2156 } 2157 2158 SPDK_DEBUGLOG(thread, 2159 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2160 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2161 2162 ch->ref--; 2163 2164 if (ch->ref == 0) { 2165 ch->destroy_ref++; 2166 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2167 assert(rc == 0); 2168 } 2169 } 2170 2171 struct spdk_io_channel * 2172 spdk_io_channel_from_ctx(void *ctx) 2173 { 2174 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2175 } 2176 2177 struct spdk_thread * 2178 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2179 { 2180 return ch->thread; 2181 } 2182 2183 void * 2184 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2185 { 2186 return ch->dev->io_device; 2187 } 2188 2189 int 2190 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2191 { 2192 return ch->ref; 2193 } 2194 2195 struct spdk_io_channel_iter { 2196 void *io_device; 2197 struct io_device *dev; 2198 spdk_channel_msg fn; 2199 int status; 2200 void *ctx; 2201 struct spdk_io_channel *ch; 2202 2203 struct spdk_thread *cur_thread; 2204 2205 struct spdk_thread *orig_thread; 2206 spdk_channel_for_each_cpl cpl; 2207 }; 2208 2209 void * 2210 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2211 { 2212 return i->io_device; 2213 } 2214 2215 struct spdk_io_channel * 2216 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2217 { 2218 return i->ch; 2219 } 2220 2221 void * 2222 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2223 { 2224 return i->ctx; 2225 } 2226 2227 static void 2228 _call_completion(void *ctx) 2229 { 2230 struct spdk_io_channel_iter *i = ctx; 2231 2232 if (i->cpl != NULL) { 2233 i->cpl(i, i->status); 2234 } 2235 free(i); 2236 } 2237 2238 static void 2239 _call_channel(void *ctx) 2240 { 2241 struct spdk_io_channel_iter *i = ctx; 2242 struct spdk_io_channel *ch; 2243 2244 /* 2245 * It is possible that the channel was deleted before this 2246 * message had a chance to execute. If so, skip calling 2247 * the fn() on this thread. 2248 */ 2249 pthread_mutex_lock(&g_devlist_mutex); 2250 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 2251 if (ch->dev->io_device == i->io_device) { 2252 break; 2253 } 2254 } 2255 pthread_mutex_unlock(&g_devlist_mutex); 2256 2257 if (ch) { 2258 i->fn(i); 2259 } else { 2260 spdk_for_each_channel_continue(i, 0); 2261 } 2262 } 2263 2264 void 2265 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2266 spdk_channel_for_each_cpl cpl) 2267 { 2268 struct spdk_thread *thread; 2269 struct spdk_io_channel *ch; 2270 struct spdk_io_channel_iter *i; 2271 int rc __attribute__((unused)); 2272 2273 i = calloc(1, sizeof(*i)); 2274 if (!i) { 2275 SPDK_ERRLOG("Unable to allocate iterator\n"); 2276 return; 2277 } 2278 2279 i->io_device = io_device; 2280 i->fn = fn; 2281 i->ctx = ctx; 2282 i->cpl = cpl; 2283 2284 pthread_mutex_lock(&g_devlist_mutex); 2285 i->orig_thread = _get_thread(); 2286 2287 TAILQ_FOREACH(thread, &g_threads, tailq) { 2288 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 2289 if (ch->dev->io_device == io_device) { 2290 ch->dev->for_each_count++; 2291 i->dev = ch->dev; 2292 i->cur_thread = thread; 2293 i->ch = ch; 2294 pthread_mutex_unlock(&g_devlist_mutex); 2295 rc = spdk_thread_send_msg(thread, _call_channel, i); 2296 assert(rc == 0); 2297 return; 2298 } 2299 } 2300 } 2301 2302 pthread_mutex_unlock(&g_devlist_mutex); 2303 2304 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2305 assert(rc == 0); 2306 } 2307 2308 void 2309 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2310 { 2311 struct spdk_thread *thread; 2312 struct spdk_io_channel *ch; 2313 int rc __attribute__((unused)); 2314 2315 assert(i->cur_thread == spdk_get_thread()); 2316 2317 i->status = status; 2318 2319 pthread_mutex_lock(&g_devlist_mutex); 2320 if (status) { 2321 goto end; 2322 } 2323 thread = TAILQ_NEXT(i->cur_thread, tailq); 2324 while (thread) { 2325 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 2326 if (ch->dev->io_device == i->io_device) { 2327 i->cur_thread = thread; 2328 i->ch = ch; 2329 pthread_mutex_unlock(&g_devlist_mutex); 2330 rc = spdk_thread_send_msg(thread, _call_channel, i); 2331 assert(rc == 0); 2332 return; 2333 } 2334 } 2335 thread = TAILQ_NEXT(thread, tailq); 2336 } 2337 2338 end: 2339 i->dev->for_each_count--; 2340 i->ch = NULL; 2341 pthread_mutex_unlock(&g_devlist_mutex); 2342 2343 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2344 assert(rc == 0); 2345 } 2346 2347 struct spdk_interrupt { 2348 int efd; 2349 struct spdk_thread *thread; 2350 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 2351 }; 2352 2353 static void 2354 thread_interrupt_destroy(struct spdk_thread *thread) 2355 { 2356 struct spdk_fd_group *fgrp = thread->fgrp; 2357 2358 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2359 2360 if (thread->msg_fd < 0) { 2361 return; 2362 } 2363 2364 spdk_fd_group_remove(fgrp, thread->msg_fd); 2365 close(thread->msg_fd); 2366 thread->msg_fd = -1; 2367 2368 spdk_fd_group_destroy(fgrp); 2369 thread->fgrp = NULL; 2370 } 2371 2372 #ifdef __linux__ 2373 static int 2374 thread_interrupt_msg_process(void *arg) 2375 { 2376 struct spdk_thread *thread = arg; 2377 uint32_t msg_count; 2378 spdk_msg_fn critical_msg; 2379 int rc = 0; 2380 uint64_t notify = 1; 2381 2382 assert(spdk_interrupt_mode_is_enabled()); 2383 2384 /* There may be race between msg_acknowledge and another producer's msg_notify, 2385 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2386 * This can avoid msg notification missing. 2387 */ 2388 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2389 if (rc < 0 && errno != EAGAIN) { 2390 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2391 } 2392 2393 critical_msg = thread->critical_msg; 2394 if (spdk_unlikely(critical_msg != NULL)) { 2395 critical_msg(NULL); 2396 thread->critical_msg = NULL; 2397 rc = 1; 2398 } 2399 2400 msg_count = msg_queue_run_batch(thread, 0); 2401 if (msg_count) { 2402 rc = 1; 2403 } 2404 2405 return rc; 2406 } 2407 2408 static int 2409 thread_interrupt_create(struct spdk_thread *thread) 2410 { 2411 int rc; 2412 2413 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2414 2415 rc = spdk_fd_group_create(&thread->fgrp); 2416 if (rc) { 2417 return rc; 2418 } 2419 2420 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2421 if (thread->msg_fd < 0) { 2422 rc = -errno; 2423 spdk_fd_group_destroy(thread->fgrp); 2424 thread->fgrp = NULL; 2425 2426 return rc; 2427 } 2428 2429 return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread); 2430 } 2431 #else 2432 static int 2433 thread_interrupt_create(struct spdk_thread *thread) 2434 { 2435 return -ENOTSUP; 2436 } 2437 #endif 2438 2439 struct spdk_interrupt * 2440 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2441 void *arg, const char *name) 2442 { 2443 struct spdk_thread *thread; 2444 struct spdk_interrupt *intr; 2445 2446 thread = spdk_get_thread(); 2447 if (!thread) { 2448 assert(false); 2449 return NULL; 2450 } 2451 2452 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2453 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2454 return NULL; 2455 } 2456 2457 if (spdk_fd_group_add(thread->fgrp, efd, fn, arg)) { 2458 return NULL; 2459 } 2460 2461 intr = calloc(1, sizeof(*intr)); 2462 if (intr == NULL) { 2463 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2464 return NULL; 2465 } 2466 2467 if (name) { 2468 snprintf(intr->name, sizeof(intr->name), "%s", name); 2469 } else { 2470 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2471 } 2472 2473 intr->efd = efd; 2474 intr->thread = thread; 2475 2476 return intr; 2477 } 2478 2479 void 2480 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2481 { 2482 struct spdk_thread *thread; 2483 struct spdk_interrupt *intr; 2484 2485 intr = *pintr; 2486 if (intr == NULL) { 2487 return; 2488 } 2489 2490 *pintr = NULL; 2491 2492 thread = spdk_get_thread(); 2493 if (!thread) { 2494 assert(false); 2495 return; 2496 } 2497 2498 if (intr->thread != thread) { 2499 SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n"); 2500 assert(false); 2501 return; 2502 } 2503 2504 spdk_fd_group_remove(thread->fgrp, intr->efd); 2505 free(intr); 2506 } 2507 2508 int 2509 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2510 enum spdk_interrupt_event_types event_types) 2511 { 2512 struct spdk_thread *thread; 2513 2514 thread = spdk_get_thread(); 2515 if (!thread) { 2516 assert(false); 2517 return -EINVAL; 2518 } 2519 2520 if (intr->thread != thread) { 2521 SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n"); 2522 assert(false); 2523 return -EINVAL; 2524 } 2525 2526 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2527 } 2528 2529 int 2530 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2531 { 2532 return spdk_fd_group_get_fd(thread->fgrp); 2533 } 2534 2535 static bool g_interrupt_mode = false; 2536 2537 int 2538 spdk_interrupt_mode_enable(void) 2539 { 2540 /* It must be called once prior to initializing the threading library. 2541 * g_spdk_msg_mempool will be valid if thread library is initialized. 2542 */ 2543 if (g_spdk_msg_mempool) { 2544 SPDK_ERRLOG("Failed due to threading library is already initailzied.\n"); 2545 return -1; 2546 } 2547 2548 #ifdef __linux__ 2549 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2550 g_interrupt_mode = true; 2551 return 0; 2552 #else 2553 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2554 g_interrupt_mode = false; 2555 return -ENOTSUP; 2556 #endif 2557 } 2558 2559 bool 2560 spdk_interrupt_mode_is_enabled(void) 2561 { 2562 return g_interrupt_mode; 2563 } 2564 2565 SPDK_LOG_REGISTER_COMPONENT(thread) 2566