1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/likely.h" 38 #include "spdk/queue.h" 39 #include "spdk/string.h" 40 #include "spdk/thread.h" 41 #include "spdk/trace.h" 42 #include "spdk/tree.h" 43 #include "spdk/util.h" 44 #include "spdk/fd_group.h" 45 46 #include "spdk/log.h" 47 #include "spdk_internal/thread.h" 48 #include "thread_internal.h" 49 50 #ifdef __linux__ 51 #include <sys/timerfd.h> 52 #include <sys/eventfd.h> 53 #endif 54 55 #define SPDK_MSG_BATCH_SIZE 8 56 #define SPDK_MAX_DEVICE_NAME_LEN 256 57 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 58 #define SPDK_MAX_POLLER_NAME_LEN 256 59 #define SPDK_MAX_THREAD_NAME_LEN 256 60 61 enum spdk_poller_state { 62 /* The poller is registered with a thread but not currently executing its fn. */ 63 SPDK_POLLER_STATE_WAITING, 64 65 /* The poller is currently running its fn. */ 66 SPDK_POLLER_STATE_RUNNING, 67 68 /* The poller was unregistered during the execution of its fn. */ 69 SPDK_POLLER_STATE_UNREGISTERED, 70 71 /* The poller is in the process of being paused. It will be paused 72 * during the next time it's supposed to be executed. 73 */ 74 SPDK_POLLER_STATE_PAUSING, 75 76 /* The poller is registered but currently paused. It's on the 77 * paused_pollers list. 78 */ 79 SPDK_POLLER_STATE_PAUSED, 80 }; 81 82 struct spdk_poller { 83 TAILQ_ENTRY(spdk_poller) tailq; 84 RB_ENTRY(spdk_poller) node; 85 86 /* Current state of the poller; should only be accessed from the poller's thread. */ 87 enum spdk_poller_state state; 88 89 uint64_t period_ticks; 90 uint64_t next_run_tick; 91 uint64_t run_count; 92 uint64_t busy_count; 93 spdk_poller_fn fn; 94 void *arg; 95 struct spdk_thread *thread; 96 int interruptfd; 97 spdk_poller_set_interrupt_mode_cb set_intr_cb_fn; 98 void *set_intr_cb_arg; 99 100 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 101 }; 102 103 enum spdk_thread_state { 104 /* The thread is pocessing poller and message by spdk_thread_poll(). */ 105 SPDK_THREAD_STATE_RUNNING, 106 107 /* The thread is in the process of termination. It reaps unregistering 108 * poller are releasing I/O channel. 109 */ 110 SPDK_THREAD_STATE_EXITING, 111 112 /* The thread is exited. It is ready to call spdk_thread_destroy(). */ 113 SPDK_THREAD_STATE_EXITED, 114 }; 115 116 struct spdk_thread { 117 uint64_t tsc_last; 118 struct spdk_thread_stats stats; 119 /* 120 * Contains pollers actively running on this thread. Pollers 121 * are run round-robin. The thread takes one poller from the head 122 * of the ring, executes it, then puts it back at the tail of 123 * the ring. 124 */ 125 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 126 /** 127 * Contains pollers running on this thread with a periodic timer. 128 */ 129 RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; 130 struct spdk_poller *first_timed_poller; 131 /* 132 * Contains paused pollers. Pollers on this queue are waiting until 133 * they are resumed (in which case they're put onto the active/timer 134 * queues) or unregistered. 135 */ 136 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 137 struct spdk_ring *messages; 138 int msg_fd; 139 SLIST_HEAD(, spdk_msg) msg_cache; 140 size_t msg_cache_count; 141 spdk_msg_fn critical_msg; 142 uint64_t id; 143 enum spdk_thread_state state; 144 int pending_unregister_count; 145 146 TAILQ_HEAD(, spdk_io_channel) io_channels; 147 TAILQ_ENTRY(spdk_thread) tailq; 148 149 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 150 struct spdk_cpuset cpumask; 151 uint64_t exit_timeout_tsc; 152 153 /* Indicates whether this spdk_thread currently runs in interrupt. */ 154 bool in_interrupt; 155 struct spdk_fd_group *fgrp; 156 157 /* User context allocated at the end */ 158 uint8_t ctx[0]; 159 }; 160 161 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 162 163 static spdk_new_thread_fn g_new_thread_fn = NULL; 164 static spdk_thread_op_fn g_thread_op_fn = NULL; 165 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 166 static size_t g_ctx_sz = 0; 167 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 168 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 169 * SPDK application is required. 170 */ 171 static uint64_t g_thread_id = 1; 172 173 struct io_device { 174 void *io_device; 175 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 176 spdk_io_channel_create_cb create_cb; 177 spdk_io_channel_destroy_cb destroy_cb; 178 spdk_io_device_unregister_cb unregister_cb; 179 struct spdk_thread *unregister_thread; 180 uint32_t ctx_size; 181 uint32_t for_each_count; 182 TAILQ_ENTRY(io_device) tailq; 183 184 uint32_t refcnt; 185 186 bool unregistered; 187 }; 188 189 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 190 191 struct spdk_msg { 192 spdk_msg_fn fn; 193 void *arg; 194 195 SLIST_ENTRY(spdk_msg) link; 196 }; 197 198 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 199 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 200 201 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 202 static uint32_t g_thread_count = 0; 203 204 static __thread struct spdk_thread *tls_thread = NULL; 205 206 #define TRACE_GROUP_THREAD 0xa 207 #define TRACE_THREAD_IOCH_GET SPDK_TPOINT_ID(TRACE_GROUP_THREAD, 0x0) 208 #define TRACE_THREAD_IOCH_PUT SPDK_TPOINT_ID(TRACE_GROUP_THREAD, 0x1) 209 210 SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD) 211 { 212 spdk_trace_register_description("THREAD_IOCH_GET", 213 TRACE_THREAD_IOCH_GET, 214 OWNER_NONE, OBJECT_NONE, 0, 215 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 216 spdk_trace_register_description("THREAD_IOCH_PUT", 217 TRACE_THREAD_IOCH_PUT, 218 OWNER_NONE, OBJECT_NONE, 0, 219 SPDK_TRACE_ARG_TYPE_INT, "refcnt"); 220 } 221 222 /* 223 * If this compare function returns zero when two next_run_ticks are equal, 224 * the macro RB_INSERT() returns a pointer to the element with the same 225 * next_run_tick. 226 * 227 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element 228 * to remove as a parameter. 229 * 230 * Hence we allow RB_INSERT() to insert elements with the same keys on the right 231 * side by returning 1 when two next_run_ticks are equal. 232 */ 233 static inline int 234 timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) 235 { 236 if (poller1->next_run_tick < poller2->next_run_tick) { 237 return -1; 238 } else { 239 return 1; 240 } 241 } 242 243 RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); 244 245 static inline struct spdk_thread * 246 _get_thread(void) 247 { 248 return tls_thread; 249 } 250 251 static int 252 _thread_lib_init(size_t ctx_sz) 253 { 254 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 255 256 g_ctx_sz = ctx_sz; 257 258 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 259 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 260 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 261 sizeof(struct spdk_msg), 262 0, /* No cache. We do our own. */ 263 SPDK_ENV_SOCKET_ID_ANY); 264 265 if (!g_spdk_msg_mempool) { 266 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 267 return -1; 268 } 269 270 return 0; 271 } 272 273 int 274 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 275 { 276 assert(g_new_thread_fn == NULL); 277 assert(g_thread_op_fn == NULL); 278 279 if (new_thread_fn == NULL) { 280 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 281 } else { 282 g_new_thread_fn = new_thread_fn; 283 } 284 285 return _thread_lib_init(ctx_sz); 286 } 287 288 int 289 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 290 spdk_thread_op_supported_fn thread_op_supported_fn, 291 size_t ctx_sz) 292 { 293 assert(g_new_thread_fn == NULL); 294 assert(g_thread_op_fn == NULL); 295 assert(g_thread_op_supported_fn == NULL); 296 297 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 298 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 299 return -EINVAL; 300 } 301 302 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 303 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 304 } else { 305 g_thread_op_fn = thread_op_fn; 306 g_thread_op_supported_fn = thread_op_supported_fn; 307 } 308 309 return _thread_lib_init(ctx_sz); 310 } 311 312 void 313 spdk_thread_lib_fini(void) 314 { 315 struct io_device *dev; 316 317 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 318 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 319 } 320 321 if (g_spdk_msg_mempool) { 322 spdk_mempool_free(g_spdk_msg_mempool); 323 g_spdk_msg_mempool = NULL; 324 } 325 326 g_new_thread_fn = NULL; 327 g_thread_op_fn = NULL; 328 g_thread_op_supported_fn = NULL; 329 g_ctx_sz = 0; 330 } 331 332 static void thread_interrupt_destroy(struct spdk_thread *thread); 333 static int thread_interrupt_create(struct spdk_thread *thread); 334 335 static void 336 _free_thread(struct spdk_thread *thread) 337 { 338 struct spdk_io_channel *ch; 339 struct spdk_msg *msg; 340 struct spdk_poller *poller, *ptmp; 341 342 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 343 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 344 thread->name, ch->dev->name); 345 } 346 347 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 348 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 349 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 350 poller->name); 351 } 352 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 353 free(poller); 354 } 355 356 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { 357 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 358 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 359 poller->name); 360 } 361 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 362 free(poller); 363 } 364 365 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 366 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 367 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 368 free(poller); 369 } 370 371 pthread_mutex_lock(&g_devlist_mutex); 372 assert(g_thread_count > 0); 373 g_thread_count--; 374 TAILQ_REMOVE(&g_threads, thread, tailq); 375 pthread_mutex_unlock(&g_devlist_mutex); 376 377 msg = SLIST_FIRST(&thread->msg_cache); 378 while (msg != NULL) { 379 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 380 381 assert(thread->msg_cache_count > 0); 382 thread->msg_cache_count--; 383 spdk_mempool_put(g_spdk_msg_mempool, msg); 384 385 msg = SLIST_FIRST(&thread->msg_cache); 386 } 387 388 assert(thread->msg_cache_count == 0); 389 390 if (spdk_interrupt_mode_is_enabled()) { 391 thread_interrupt_destroy(thread); 392 } 393 394 spdk_ring_free(thread->messages); 395 free(thread); 396 } 397 398 struct spdk_thread * 399 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 400 { 401 struct spdk_thread *thread; 402 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 403 int rc = 0, i; 404 405 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 406 if (!thread) { 407 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 408 return NULL; 409 } 410 411 if (cpumask) { 412 spdk_cpuset_copy(&thread->cpumask, cpumask); 413 } else { 414 spdk_cpuset_negate(&thread->cpumask); 415 } 416 417 TAILQ_INIT(&thread->io_channels); 418 TAILQ_INIT(&thread->active_pollers); 419 RB_INIT(&thread->timed_pollers); 420 TAILQ_INIT(&thread->paused_pollers); 421 SLIST_INIT(&thread->msg_cache); 422 thread->msg_cache_count = 0; 423 424 thread->tsc_last = spdk_get_ticks(); 425 426 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 427 if (!thread->messages) { 428 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 429 free(thread); 430 return NULL; 431 } 432 433 /* Fill the local message pool cache. */ 434 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 435 if (rc == 0) { 436 /* If we can't populate the cache it's ok. The cache will get filled 437 * up organically as messages are passed to the thread. */ 438 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 439 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 440 thread->msg_cache_count++; 441 } 442 } 443 444 if (name) { 445 snprintf(thread->name, sizeof(thread->name), "%s", name); 446 } else { 447 snprintf(thread->name, sizeof(thread->name), "%p", thread); 448 } 449 450 pthread_mutex_lock(&g_devlist_mutex); 451 if (g_thread_id == 0) { 452 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 453 pthread_mutex_unlock(&g_devlist_mutex); 454 _free_thread(thread); 455 return NULL; 456 } 457 thread->id = g_thread_id++; 458 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 459 g_thread_count++; 460 pthread_mutex_unlock(&g_devlist_mutex); 461 462 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 463 thread->id, thread->name); 464 465 if (spdk_interrupt_mode_is_enabled()) { 466 thread->in_interrupt = true; 467 rc = thread_interrupt_create(thread); 468 if (rc != 0) { 469 _free_thread(thread); 470 return NULL; 471 } 472 } 473 474 if (g_new_thread_fn) { 475 rc = g_new_thread_fn(thread); 476 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 477 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 478 } 479 480 if (rc != 0) { 481 _free_thread(thread); 482 return NULL; 483 } 484 485 thread->state = SPDK_THREAD_STATE_RUNNING; 486 487 return thread; 488 } 489 490 void 491 spdk_set_thread(struct spdk_thread *thread) 492 { 493 tls_thread = thread; 494 } 495 496 static void 497 thread_exit(struct spdk_thread *thread, uint64_t now) 498 { 499 struct spdk_poller *poller; 500 struct spdk_io_channel *ch; 501 502 if (now >= thread->exit_timeout_tsc) { 503 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 504 thread->name); 505 goto exited; 506 } 507 508 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 509 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 510 SPDK_INFOLOG(thread, 511 "thread %s still has active poller %s\n", 512 thread->name, poller->name); 513 return; 514 } 515 } 516 517 RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { 518 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 519 SPDK_INFOLOG(thread, 520 "thread %s still has active timed poller %s\n", 521 thread->name, poller->name); 522 return; 523 } 524 } 525 526 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 527 SPDK_INFOLOG(thread, 528 "thread %s still has paused poller %s\n", 529 thread->name, poller->name); 530 return; 531 } 532 533 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 534 SPDK_INFOLOG(thread, 535 "thread %s still has channel for io_device %s\n", 536 thread->name, ch->dev->name); 537 return; 538 } 539 540 if (thread->pending_unregister_count > 0) { 541 SPDK_INFOLOG(thread, 542 "thread %s is still unregistering io_devices\n", 543 thread->name); 544 return; 545 } 546 547 exited: 548 thread->state = SPDK_THREAD_STATE_EXITED; 549 } 550 551 int 552 spdk_thread_exit(struct spdk_thread *thread) 553 { 554 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 555 556 assert(tls_thread == thread); 557 558 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 559 SPDK_INFOLOG(thread, 560 "thread %s is already exiting\n", 561 thread->name); 562 return 0; 563 } 564 565 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 566 SPDK_THREAD_EXIT_TIMEOUT_SEC); 567 thread->state = SPDK_THREAD_STATE_EXITING; 568 return 0; 569 } 570 571 bool 572 spdk_thread_is_exited(struct spdk_thread *thread) 573 { 574 return thread->state == SPDK_THREAD_STATE_EXITED; 575 } 576 577 void 578 spdk_thread_destroy(struct spdk_thread *thread) 579 { 580 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 581 582 assert(thread->state == SPDK_THREAD_STATE_EXITED); 583 584 if (tls_thread == thread) { 585 tls_thread = NULL; 586 } 587 588 _free_thread(thread); 589 } 590 591 void * 592 spdk_thread_get_ctx(struct spdk_thread *thread) 593 { 594 if (g_ctx_sz > 0) { 595 return thread->ctx; 596 } 597 598 return NULL; 599 } 600 601 struct spdk_cpuset * 602 spdk_thread_get_cpumask(struct spdk_thread *thread) 603 { 604 return &thread->cpumask; 605 } 606 607 int 608 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 609 { 610 struct spdk_thread *thread; 611 612 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 613 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 614 assert(false); 615 return -ENOTSUP; 616 } 617 618 thread = spdk_get_thread(); 619 if (!thread) { 620 SPDK_ERRLOG("Called from non-SPDK thread\n"); 621 assert(false); 622 return -EINVAL; 623 } 624 625 spdk_cpuset_copy(&thread->cpumask, cpumask); 626 627 /* Invoke framework's reschedule operation. If this function is called multiple times 628 * in a single spdk_thread_poll() context, the last cpumask will be used in the 629 * reschedule operation. 630 */ 631 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 632 633 return 0; 634 } 635 636 struct spdk_thread * 637 spdk_thread_get_from_ctx(void *ctx) 638 { 639 if (ctx == NULL) { 640 assert(false); 641 return NULL; 642 } 643 644 assert(g_ctx_sz > 0); 645 646 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 647 } 648 649 static inline uint32_t 650 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 651 { 652 unsigned count, i; 653 void *messages[SPDK_MSG_BATCH_SIZE]; 654 uint64_t notify = 1; 655 int rc; 656 657 #ifdef DEBUG 658 /* 659 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 660 * so we will never actually read uninitialized data from events, but just to be sure 661 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 662 */ 663 memset(messages, 0, sizeof(messages)); 664 #endif 665 666 if (max_msgs > 0) { 667 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 668 } else { 669 max_msgs = SPDK_MSG_BATCH_SIZE; 670 } 671 672 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 673 if (spdk_unlikely(thread->in_interrupt) && 674 spdk_ring_count(thread->messages) != 0) { 675 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 676 if (rc < 0) { 677 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 678 } 679 } 680 if (count == 0) { 681 return 0; 682 } 683 684 for (i = 0; i < count; i++) { 685 struct spdk_msg *msg = messages[i]; 686 687 assert(msg != NULL); 688 msg->fn(msg->arg); 689 690 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 691 /* Insert the messages at the head. We want to re-use the hot 692 * ones. */ 693 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 694 thread->msg_cache_count++; 695 } else { 696 spdk_mempool_put(g_spdk_msg_mempool, msg); 697 } 698 } 699 700 return count; 701 } 702 703 static void 704 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 705 { 706 struct spdk_poller *tmp __attribute__((unused)); 707 708 poller->next_run_tick = now + poller->period_ticks; 709 710 /* 711 * Insert poller in the thread's timed_pollers tree by next scheduled run time 712 * as its key. 713 */ 714 tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); 715 assert(tmp == NULL); 716 717 /* Update the cache only if it is empty or the inserted poller is earlier than it. 718 * RB_MIN() is not necessary here because all pollers, which has exactly the same 719 * next_run_tick as the existing poller, are inserted on the right side. 720 */ 721 if (thread->first_timed_poller == NULL || 722 poller->next_run_tick < thread->first_timed_poller->next_run_tick) { 723 thread->first_timed_poller = poller; 724 } 725 } 726 727 #ifdef __linux__ 728 static inline void 729 poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) 730 { 731 struct spdk_poller *tmp __attribute__((unused)); 732 733 tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 734 assert(tmp != NULL); 735 736 /* This function is not used in any case that is performance critical. 737 * Update the cache simply by RB_MIN() if it needs to be changed. 738 */ 739 if (thread->first_timed_poller == poller) { 740 thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); 741 } 742 } 743 #endif 744 745 static void 746 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 747 { 748 if (poller->period_ticks) { 749 poller_insert_timer(thread, poller, spdk_get_ticks()); 750 } else { 751 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 752 } 753 } 754 755 static inline void 756 thread_update_stats(struct spdk_thread *thread, uint64_t end, 757 uint64_t start, int rc) 758 { 759 if (rc == 0) { 760 /* Poller status idle */ 761 thread->stats.idle_tsc += end - start; 762 } else if (rc > 0) { 763 /* Poller status busy */ 764 thread->stats.busy_tsc += end - start; 765 } 766 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 767 thread->tsc_last = end; 768 } 769 770 static inline int 771 thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller) 772 { 773 int rc; 774 775 switch (poller->state) { 776 case SPDK_POLLER_STATE_UNREGISTERED: 777 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 778 free(poller); 779 return 0; 780 case SPDK_POLLER_STATE_PAUSING: 781 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 782 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 783 poller->state = SPDK_POLLER_STATE_PAUSED; 784 return 0; 785 case SPDK_POLLER_STATE_WAITING: 786 break; 787 default: 788 assert(false); 789 break; 790 } 791 792 poller->state = SPDK_POLLER_STATE_RUNNING; 793 rc = poller->fn(poller->arg); 794 795 poller->run_count++; 796 if (rc > 0) { 797 poller->busy_count++; 798 } 799 800 #ifdef DEBUG 801 if (rc == -1) { 802 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 803 } 804 #endif 805 806 switch (poller->state) { 807 case SPDK_POLLER_STATE_UNREGISTERED: 808 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 809 free(poller); 810 break; 811 case SPDK_POLLER_STATE_PAUSING: 812 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 813 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 814 poller->state = SPDK_POLLER_STATE_PAUSED; 815 break; 816 case SPDK_POLLER_STATE_PAUSED: 817 case SPDK_POLLER_STATE_WAITING: 818 break; 819 case SPDK_POLLER_STATE_RUNNING: 820 poller->state = SPDK_POLLER_STATE_WAITING; 821 break; 822 default: 823 assert(false); 824 break; 825 } 826 827 return rc; 828 } 829 830 static inline int 831 thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller, 832 uint64_t now) 833 { 834 int rc; 835 836 switch (poller->state) { 837 case SPDK_POLLER_STATE_UNREGISTERED: 838 free(poller); 839 return 0; 840 case SPDK_POLLER_STATE_PAUSING: 841 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 842 poller->state = SPDK_POLLER_STATE_PAUSED; 843 return 0; 844 case SPDK_POLLER_STATE_WAITING: 845 break; 846 default: 847 assert(false); 848 break; 849 } 850 851 poller->state = SPDK_POLLER_STATE_RUNNING; 852 rc = poller->fn(poller->arg); 853 854 poller->run_count++; 855 if (rc > 0) { 856 poller->busy_count++; 857 } 858 859 #ifdef DEBUG 860 if (rc == -1) { 861 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 862 } 863 #endif 864 865 switch (poller->state) { 866 case SPDK_POLLER_STATE_UNREGISTERED: 867 free(poller); 868 break; 869 case SPDK_POLLER_STATE_PAUSING: 870 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 871 poller->state = SPDK_POLLER_STATE_PAUSED; 872 break; 873 case SPDK_POLLER_STATE_PAUSED: 874 break; 875 case SPDK_POLLER_STATE_RUNNING: 876 poller->state = SPDK_POLLER_STATE_WAITING; 877 /* fallthrough */ 878 case SPDK_POLLER_STATE_WAITING: 879 poller_insert_timer(thread, poller, now); 880 break; 881 default: 882 assert(false); 883 break; 884 } 885 886 return rc; 887 } 888 889 static int 890 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 891 { 892 uint32_t msg_count; 893 struct spdk_poller *poller, *tmp; 894 spdk_msg_fn critical_msg; 895 int rc = 0; 896 897 thread->tsc_last = now; 898 899 critical_msg = thread->critical_msg; 900 if (spdk_unlikely(critical_msg != NULL)) { 901 critical_msg(NULL); 902 thread->critical_msg = NULL; 903 rc = 1; 904 } 905 906 msg_count = msg_queue_run_batch(thread, max_msgs); 907 if (msg_count) { 908 rc = 1; 909 } 910 911 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 912 active_pollers_head, tailq, tmp) { 913 int poller_rc; 914 915 poller_rc = thread_execute_poller(thread, poller); 916 if (poller_rc > rc) { 917 rc = poller_rc; 918 } 919 } 920 921 poller = thread->first_timed_poller; 922 while (poller != NULL) { 923 int timer_rc = 0; 924 925 if (now < poller->next_run_tick) { 926 break; 927 } 928 929 tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); 930 RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); 931 932 /* Update the cache to the next timed poller in the list 933 * only if the current poller is still the closest, otherwise, 934 * do nothing because the cache has been already updated. 935 */ 936 if (thread->first_timed_poller == poller) { 937 thread->first_timed_poller = tmp; 938 } 939 940 timer_rc = thread_execute_timed_poller(thread, poller, now); 941 if (timer_rc > rc) { 942 rc = timer_rc; 943 } 944 945 poller = tmp; 946 } 947 948 return rc; 949 } 950 951 int 952 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 953 { 954 struct spdk_thread *orig_thread; 955 int rc; 956 uint64_t notify = 1; 957 958 orig_thread = _get_thread(); 959 tls_thread = thread; 960 961 if (now == 0) { 962 now = spdk_get_ticks(); 963 } 964 965 if (spdk_likely(!thread->in_interrupt)) { 966 rc = thread_poll(thread, max_msgs, now); 967 if (spdk_unlikely(thread->in_interrupt)) { 968 /* The thread transitioned to interrupt mode during the above poll. 969 * Poll it one more time in case that during the transition time 970 * there is msg received without notification. 971 */ 972 rc = thread_poll(thread, max_msgs, now); 973 } 974 } else { 975 /* Non-block wait on thread's fd_group */ 976 rc = spdk_fd_group_wait(thread->fgrp, 0); 977 if (spdk_unlikely(!thread->in_interrupt)) { 978 /* The thread transitioned to poll mode in a msg during the above processing. 979 * Clear msg_fd since thread messages will be polled directly in poll mode. 980 */ 981 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 982 if (rc < 0 && errno != EAGAIN) { 983 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 984 } 985 } 986 } 987 988 989 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 990 thread_exit(thread, now); 991 } 992 993 thread_update_stats(thread, spdk_get_ticks(), now, rc); 994 995 tls_thread = orig_thread; 996 997 return rc; 998 } 999 1000 uint64_t 1001 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 1002 { 1003 struct spdk_poller *poller; 1004 1005 poller = thread->first_timed_poller; 1006 if (poller) { 1007 return poller->next_run_tick; 1008 } 1009 1010 return 0; 1011 } 1012 1013 int 1014 spdk_thread_has_active_pollers(struct spdk_thread *thread) 1015 { 1016 return !TAILQ_EMPTY(&thread->active_pollers); 1017 } 1018 1019 static bool 1020 thread_has_unpaused_pollers(struct spdk_thread *thread) 1021 { 1022 if (TAILQ_EMPTY(&thread->active_pollers) && 1023 RB_EMPTY(&thread->timed_pollers)) { 1024 return false; 1025 } 1026 1027 return true; 1028 } 1029 1030 bool 1031 spdk_thread_has_pollers(struct spdk_thread *thread) 1032 { 1033 if (!thread_has_unpaused_pollers(thread) && 1034 TAILQ_EMPTY(&thread->paused_pollers)) { 1035 return false; 1036 } 1037 1038 return true; 1039 } 1040 1041 bool 1042 spdk_thread_is_idle(struct spdk_thread *thread) 1043 { 1044 if (spdk_ring_count(thread->messages) || 1045 thread_has_unpaused_pollers(thread) || 1046 thread->critical_msg != NULL) { 1047 return false; 1048 } 1049 1050 return true; 1051 } 1052 1053 uint32_t 1054 spdk_thread_get_count(void) 1055 { 1056 /* 1057 * Return cached value of the current thread count. We could acquire the 1058 * lock and iterate through the TAILQ of threads to count them, but that 1059 * count could still be invalidated after we release the lock. 1060 */ 1061 return g_thread_count; 1062 } 1063 1064 struct spdk_thread * 1065 spdk_get_thread(void) 1066 { 1067 return _get_thread(); 1068 } 1069 1070 const char * 1071 spdk_thread_get_name(const struct spdk_thread *thread) 1072 { 1073 return thread->name; 1074 } 1075 1076 uint64_t 1077 spdk_thread_get_id(const struct spdk_thread *thread) 1078 { 1079 return thread->id; 1080 } 1081 1082 struct spdk_thread * 1083 spdk_thread_get_by_id(uint64_t id) 1084 { 1085 struct spdk_thread *thread; 1086 1087 if (id == 0 || id >= g_thread_id) { 1088 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 1089 return NULL; 1090 } 1091 pthread_mutex_lock(&g_devlist_mutex); 1092 TAILQ_FOREACH(thread, &g_threads, tailq) { 1093 if (thread->id == id) { 1094 break; 1095 } 1096 } 1097 pthread_mutex_unlock(&g_devlist_mutex); 1098 return thread; 1099 } 1100 1101 int 1102 spdk_thread_get_stats(struct spdk_thread_stats *stats) 1103 { 1104 struct spdk_thread *thread; 1105 1106 thread = _get_thread(); 1107 if (!thread) { 1108 SPDK_ERRLOG("No thread allocated\n"); 1109 return -EINVAL; 1110 } 1111 1112 if (stats == NULL) { 1113 return -EINVAL; 1114 } 1115 1116 *stats = thread->stats; 1117 1118 return 0; 1119 } 1120 1121 uint64_t 1122 spdk_thread_get_last_tsc(struct spdk_thread *thread) 1123 { 1124 if (thread == NULL) { 1125 thread = _get_thread(); 1126 } 1127 1128 return thread->tsc_last; 1129 } 1130 1131 static inline int 1132 thread_send_msg_notification(const struct spdk_thread *target_thread) 1133 { 1134 uint64_t notify = 1; 1135 int rc; 1136 1137 /* Not necessary to do notification if interrupt facility is not enabled */ 1138 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 1139 return 0; 1140 } 1141 1142 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 1143 * after sending thread msg, it is necessary to check whether target thread runs in 1144 * interrupt mode and then decide whether do event notification. 1145 */ 1146 if (spdk_unlikely(target_thread->in_interrupt)) { 1147 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 1148 if (rc < 0) { 1149 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 1150 return -EIO; 1151 } 1152 } 1153 1154 return 0; 1155 } 1156 1157 int 1158 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 1159 { 1160 struct spdk_thread *local_thread; 1161 struct spdk_msg *msg; 1162 int rc; 1163 1164 assert(thread != NULL); 1165 1166 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1167 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 1168 return -EIO; 1169 } 1170 1171 local_thread = _get_thread(); 1172 1173 msg = NULL; 1174 if (local_thread != NULL) { 1175 if (local_thread->msg_cache_count > 0) { 1176 msg = SLIST_FIRST(&local_thread->msg_cache); 1177 assert(msg != NULL); 1178 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 1179 local_thread->msg_cache_count--; 1180 } 1181 } 1182 1183 if (msg == NULL) { 1184 msg = spdk_mempool_get(g_spdk_msg_mempool); 1185 if (!msg) { 1186 SPDK_ERRLOG("msg could not be allocated\n"); 1187 return -ENOMEM; 1188 } 1189 } 1190 1191 msg->fn = fn; 1192 msg->arg = ctx; 1193 1194 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 1195 if (rc != 1) { 1196 SPDK_ERRLOG("msg could not be enqueued\n"); 1197 spdk_mempool_put(g_spdk_msg_mempool, msg); 1198 return -EIO; 1199 } 1200 1201 return thread_send_msg_notification(thread); 1202 } 1203 1204 int 1205 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 1206 { 1207 spdk_msg_fn expected = NULL; 1208 1209 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 1210 __ATOMIC_SEQ_CST)) { 1211 return -EIO; 1212 } 1213 1214 return thread_send_msg_notification(thread); 1215 } 1216 1217 #ifdef __linux__ 1218 static int 1219 interrupt_timerfd_process(void *arg) 1220 { 1221 struct spdk_poller *poller = arg; 1222 uint64_t exp; 1223 int rc; 1224 1225 /* clear the level of interval timer */ 1226 rc = read(poller->interruptfd, &exp, sizeof(exp)); 1227 if (rc < 0) { 1228 if (rc == -EAGAIN) { 1229 return 0; 1230 } 1231 1232 return rc; 1233 } 1234 1235 return poller->fn(poller->arg); 1236 } 1237 1238 static int 1239 period_poller_interrupt_init(struct spdk_poller *poller) 1240 { 1241 struct spdk_fd_group *fgrp = poller->thread->fgrp; 1242 int timerfd; 1243 int rc; 1244 1245 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1246 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1247 if (timerfd < 0) { 1248 return -errno; 1249 } 1250 1251 rc = spdk_fd_group_add(fgrp, timerfd, 1252 interrupt_timerfd_process, poller); 1253 if (rc < 0) { 1254 close(timerfd); 1255 return rc; 1256 } 1257 1258 poller->interruptfd = timerfd; 1259 return 0; 1260 } 1261 1262 static void 1263 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1264 { 1265 int timerfd = poller->interruptfd; 1266 uint64_t now_tick = spdk_get_ticks(); 1267 uint64_t ticks = spdk_get_ticks_hz(); 1268 int ret; 1269 struct itimerspec new_tv = {}; 1270 struct itimerspec old_tv = {}; 1271 1272 assert(poller->period_ticks != 0); 1273 assert(timerfd >= 0); 1274 1275 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1276 interrupt_mode ? "interrupt" : "poll"); 1277 1278 if (interrupt_mode) { 1279 /* Set repeated timer expiration */ 1280 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1281 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1282 1283 /* Update next timer expiration */ 1284 if (poller->next_run_tick == 0) { 1285 poller->next_run_tick = now_tick + poller->period_ticks; 1286 } else if (poller->next_run_tick < now_tick) { 1287 poller->next_run_tick = now_tick; 1288 } 1289 1290 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1291 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1292 1293 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1294 if (ret < 0) { 1295 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1296 assert(false); 1297 } 1298 } else { 1299 /* Disarm the timer */ 1300 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1301 if (ret < 0) { 1302 /* timerfd_settime's failure indicates that the timerfd is in error */ 1303 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1304 assert(false); 1305 } 1306 1307 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1308 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1309 */ 1310 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1311 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1312 poller_remove_timer(poller->thread, poller); 1313 poller_insert_timer(poller->thread, poller, now_tick); 1314 } 1315 } 1316 1317 static void 1318 poller_interrupt_fini(struct spdk_poller *poller) 1319 { 1320 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1321 assert(poller->interruptfd >= 0); 1322 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1323 close(poller->interruptfd); 1324 poller->interruptfd = -1; 1325 } 1326 1327 static int 1328 busy_poller_interrupt_init(struct spdk_poller *poller) 1329 { 1330 int busy_efd; 1331 int rc; 1332 1333 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1334 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1335 if (busy_efd < 0) { 1336 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1337 return -errno; 1338 } 1339 1340 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, poller->fn, poller->arg); 1341 if (rc < 0) { 1342 close(busy_efd); 1343 return rc; 1344 } 1345 1346 poller->interruptfd = busy_efd; 1347 return 0; 1348 } 1349 1350 static void 1351 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1352 { 1353 int busy_efd = poller->interruptfd; 1354 uint64_t notify = 1; 1355 int rc __attribute__((unused)); 1356 1357 assert(busy_efd >= 0); 1358 1359 if (interrupt_mode) { 1360 /* Write without read on eventfd will get it repeatedly triggered. */ 1361 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1362 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1363 } 1364 } else { 1365 /* Read on eventfd will clear its level triggering. */ 1366 rc = read(busy_efd, ¬ify, sizeof(notify)); 1367 } 1368 } 1369 1370 #else 1371 1372 static int 1373 period_poller_interrupt_init(struct spdk_poller *poller) 1374 { 1375 return -ENOTSUP; 1376 } 1377 1378 static void 1379 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1380 { 1381 } 1382 1383 static void 1384 poller_interrupt_fini(struct spdk_poller *poller) 1385 { 1386 } 1387 1388 static int 1389 busy_poller_interrupt_init(struct spdk_poller *poller) 1390 { 1391 return -ENOTSUP; 1392 } 1393 1394 static void 1395 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1396 { 1397 } 1398 1399 #endif 1400 1401 void 1402 spdk_poller_register_interrupt(struct spdk_poller *poller, 1403 spdk_poller_set_interrupt_mode_cb cb_fn, 1404 void *cb_arg) 1405 { 1406 assert(poller != NULL); 1407 assert(cb_fn != NULL); 1408 assert(spdk_get_thread() == poller->thread); 1409 1410 if (!spdk_interrupt_mode_is_enabled()) { 1411 return; 1412 } 1413 1414 /* when a poller is created we don't know if the user is ever going to 1415 * enable interrupts on it by calling this function, so the poller 1416 * registration function has to immediately create a interruptfd. 1417 * When this function does get called by user, we have to then destroy 1418 * that interruptfd. 1419 */ 1420 if (poller->set_intr_cb_fn && poller->interruptfd >= 0) { 1421 poller_interrupt_fini(poller); 1422 } 1423 1424 poller->set_intr_cb_fn = cb_fn; 1425 poller->set_intr_cb_arg = cb_arg; 1426 1427 /* Set poller into interrupt mode if thread is in interrupt. */ 1428 if (poller->thread->in_interrupt) { 1429 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1430 } 1431 } 1432 1433 static uint64_t 1434 convert_us_to_ticks(uint64_t us) 1435 { 1436 uint64_t quotient, remainder, ticks; 1437 1438 if (us) { 1439 quotient = us / SPDK_SEC_TO_USEC; 1440 remainder = us % SPDK_SEC_TO_USEC; 1441 ticks = spdk_get_ticks_hz(); 1442 1443 return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1444 } else { 1445 return 0; 1446 } 1447 } 1448 1449 static struct spdk_poller * 1450 poller_register(spdk_poller_fn fn, 1451 void *arg, 1452 uint64_t period_microseconds, 1453 const char *name) 1454 { 1455 struct spdk_thread *thread; 1456 struct spdk_poller *poller; 1457 1458 thread = spdk_get_thread(); 1459 if (!thread) { 1460 assert(false); 1461 return NULL; 1462 } 1463 1464 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1465 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1466 return NULL; 1467 } 1468 1469 poller = calloc(1, sizeof(*poller)); 1470 if (poller == NULL) { 1471 SPDK_ERRLOG("Poller memory allocation failed\n"); 1472 return NULL; 1473 } 1474 1475 if (name) { 1476 snprintf(poller->name, sizeof(poller->name), "%s", name); 1477 } else { 1478 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1479 } 1480 1481 poller->state = SPDK_POLLER_STATE_WAITING; 1482 poller->fn = fn; 1483 poller->arg = arg; 1484 poller->thread = thread; 1485 poller->interruptfd = -1; 1486 1487 poller->period_ticks = convert_us_to_ticks(period_microseconds); 1488 1489 if (spdk_interrupt_mode_is_enabled()) { 1490 int rc; 1491 1492 if (period_microseconds) { 1493 rc = period_poller_interrupt_init(poller); 1494 if (rc < 0) { 1495 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1496 free(poller); 1497 return NULL; 1498 } 1499 1500 spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL); 1501 } else { 1502 /* If the poller doesn't have a period, create interruptfd that's always 1503 * busy automatically when runnning in interrupt mode. 1504 */ 1505 rc = busy_poller_interrupt_init(poller); 1506 if (rc > 0) { 1507 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1508 free(poller); 1509 return NULL; 1510 } 1511 1512 spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL); 1513 } 1514 } 1515 1516 thread_insert_poller(thread, poller); 1517 1518 return poller; 1519 } 1520 1521 struct spdk_poller * 1522 spdk_poller_register(spdk_poller_fn fn, 1523 void *arg, 1524 uint64_t period_microseconds) 1525 { 1526 return poller_register(fn, arg, period_microseconds, NULL); 1527 } 1528 1529 struct spdk_poller * 1530 spdk_poller_register_named(spdk_poller_fn fn, 1531 void *arg, 1532 uint64_t period_microseconds, 1533 const char *name) 1534 { 1535 return poller_register(fn, arg, period_microseconds, name); 1536 } 1537 1538 void 1539 spdk_poller_unregister(struct spdk_poller **ppoller) 1540 { 1541 struct spdk_thread *thread; 1542 struct spdk_poller *poller; 1543 1544 poller = *ppoller; 1545 if (poller == NULL) { 1546 return; 1547 } 1548 1549 *ppoller = NULL; 1550 1551 thread = spdk_get_thread(); 1552 if (!thread) { 1553 assert(false); 1554 return; 1555 } 1556 1557 if (poller->thread != thread) { 1558 SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n"); 1559 assert(false); 1560 return; 1561 } 1562 1563 if (spdk_interrupt_mode_is_enabled() && poller->interruptfd >= 0) { 1564 poller_interrupt_fini(poller); 1565 } 1566 1567 /* If the poller was paused, put it on the active_pollers list so that 1568 * its unregistration can be processed by spdk_thread_poll(). 1569 */ 1570 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1571 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1572 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1573 poller->period_ticks = 0; 1574 } 1575 1576 /* Simply set the state to unregistered. The poller will get cleaned up 1577 * in a subsequent call to spdk_thread_poll(). 1578 */ 1579 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1580 } 1581 1582 void 1583 spdk_poller_pause(struct spdk_poller *poller) 1584 { 1585 struct spdk_thread *thread; 1586 1587 thread = spdk_get_thread(); 1588 if (!thread) { 1589 assert(false); 1590 return; 1591 } 1592 1593 if (poller->thread != thread) { 1594 SPDK_ERRLOG("different from the thread that called spdk_poller_pause()\n"); 1595 assert(false); 1596 return; 1597 } 1598 1599 /* We just set its state to SPDK_POLLER_STATE_PAUSING and let 1600 * spdk_thread_poll() move it. It allows a poller to be paused from 1601 * another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE 1602 * iteration, or from within itself without breaking the logic to always 1603 * remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration. 1604 */ 1605 switch (poller->state) { 1606 case SPDK_POLLER_STATE_PAUSED: 1607 case SPDK_POLLER_STATE_PAUSING: 1608 break; 1609 case SPDK_POLLER_STATE_RUNNING: 1610 case SPDK_POLLER_STATE_WAITING: 1611 poller->state = SPDK_POLLER_STATE_PAUSING; 1612 break; 1613 default: 1614 assert(false); 1615 break; 1616 } 1617 } 1618 1619 void 1620 spdk_poller_resume(struct spdk_poller *poller) 1621 { 1622 struct spdk_thread *thread; 1623 1624 thread = spdk_get_thread(); 1625 if (!thread) { 1626 assert(false); 1627 return; 1628 } 1629 1630 if (poller->thread != thread) { 1631 SPDK_ERRLOG("different from the thread that called spdk_poller_resume()\n"); 1632 assert(false); 1633 return; 1634 } 1635 1636 /* If a poller is paused it has to be removed from the paused pollers 1637 * list and put on the active list or timer tree depending on its 1638 * period_ticks. If a poller is still in the process of being paused, 1639 * we just need to flip its state back to waiting, as it's already on 1640 * the appropriate list or tree. 1641 */ 1642 switch (poller->state) { 1643 case SPDK_POLLER_STATE_PAUSED: 1644 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1645 thread_insert_poller(thread, poller); 1646 /* fallthrough */ 1647 case SPDK_POLLER_STATE_PAUSING: 1648 poller->state = SPDK_POLLER_STATE_WAITING; 1649 break; 1650 case SPDK_POLLER_STATE_RUNNING: 1651 case SPDK_POLLER_STATE_WAITING: 1652 break; 1653 default: 1654 assert(false); 1655 break; 1656 } 1657 } 1658 1659 const char * 1660 spdk_poller_get_name(struct spdk_poller *poller) 1661 { 1662 return poller->name; 1663 } 1664 1665 const char * 1666 spdk_poller_get_state_str(struct spdk_poller *poller) 1667 { 1668 switch (poller->state) { 1669 case SPDK_POLLER_STATE_WAITING: 1670 return "waiting"; 1671 case SPDK_POLLER_STATE_RUNNING: 1672 return "running"; 1673 case SPDK_POLLER_STATE_UNREGISTERED: 1674 return "unregistered"; 1675 case SPDK_POLLER_STATE_PAUSING: 1676 return "pausing"; 1677 case SPDK_POLLER_STATE_PAUSED: 1678 return "paused"; 1679 default: 1680 return NULL; 1681 } 1682 } 1683 1684 uint64_t 1685 spdk_poller_get_period_ticks(struct spdk_poller *poller) 1686 { 1687 return poller->period_ticks; 1688 } 1689 1690 void 1691 spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats) 1692 { 1693 stats->run_count = poller->run_count; 1694 stats->busy_count = poller->busy_count; 1695 } 1696 1697 struct spdk_poller * 1698 spdk_thread_get_first_active_poller(struct spdk_thread *thread) 1699 { 1700 return TAILQ_FIRST(&thread->active_pollers); 1701 } 1702 1703 struct spdk_poller * 1704 spdk_thread_get_next_active_poller(struct spdk_poller *prev) 1705 { 1706 return TAILQ_NEXT(prev, tailq); 1707 } 1708 1709 struct spdk_poller * 1710 spdk_thread_get_first_timed_poller(struct spdk_thread *thread) 1711 { 1712 return RB_MIN(timed_pollers_tree, &thread->timed_pollers); 1713 } 1714 1715 struct spdk_poller * 1716 spdk_thread_get_next_timed_poller(struct spdk_poller *prev) 1717 { 1718 return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); 1719 } 1720 1721 struct spdk_poller * 1722 spdk_thread_get_first_paused_poller(struct spdk_thread *thread) 1723 { 1724 return TAILQ_FIRST(&thread->paused_pollers); 1725 } 1726 1727 struct spdk_poller * 1728 spdk_thread_get_next_paused_poller(struct spdk_poller *prev) 1729 { 1730 return TAILQ_NEXT(prev, tailq); 1731 } 1732 1733 struct spdk_io_channel * 1734 spdk_thread_get_first_io_channel(struct spdk_thread *thread) 1735 { 1736 return TAILQ_FIRST(&thread->io_channels); 1737 } 1738 1739 struct spdk_io_channel * 1740 spdk_thread_get_next_io_channel(struct spdk_io_channel *prev) 1741 { 1742 return TAILQ_NEXT(prev, tailq); 1743 } 1744 1745 struct call_thread { 1746 struct spdk_thread *cur_thread; 1747 spdk_msg_fn fn; 1748 void *ctx; 1749 1750 struct spdk_thread *orig_thread; 1751 spdk_msg_fn cpl; 1752 }; 1753 1754 static void 1755 _on_thread(void *ctx) 1756 { 1757 struct call_thread *ct = ctx; 1758 int rc __attribute__((unused)); 1759 1760 ct->fn(ct->ctx); 1761 1762 pthread_mutex_lock(&g_devlist_mutex); 1763 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1764 pthread_mutex_unlock(&g_devlist_mutex); 1765 1766 if (!ct->cur_thread) { 1767 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1768 1769 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1770 free(ctx); 1771 } else { 1772 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1773 ct->cur_thread->name); 1774 1775 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1776 } 1777 assert(rc == 0); 1778 } 1779 1780 void 1781 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1782 { 1783 struct call_thread *ct; 1784 struct spdk_thread *thread; 1785 int rc __attribute__((unused)); 1786 1787 ct = calloc(1, sizeof(*ct)); 1788 if (!ct) { 1789 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1790 cpl(ctx); 1791 return; 1792 } 1793 1794 ct->fn = fn; 1795 ct->ctx = ctx; 1796 ct->cpl = cpl; 1797 1798 thread = _get_thread(); 1799 if (!thread) { 1800 SPDK_ERRLOG("No thread allocated\n"); 1801 free(ct); 1802 cpl(ctx); 1803 return; 1804 } 1805 ct->orig_thread = thread; 1806 1807 pthread_mutex_lock(&g_devlist_mutex); 1808 ct->cur_thread = TAILQ_FIRST(&g_threads); 1809 pthread_mutex_unlock(&g_devlist_mutex); 1810 1811 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 1812 ct->orig_thread->name); 1813 1814 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 1815 assert(rc == 0); 1816 } 1817 1818 static inline void 1819 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 1820 { 1821 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1822 return; 1823 } 1824 1825 if (!poller->set_intr_cb_fn) { 1826 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 1827 assert(false); 1828 return; 1829 } 1830 1831 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 1832 } 1833 1834 void 1835 spdk_thread_set_interrupt_mode(bool enable_interrupt) 1836 { 1837 struct spdk_thread *thread = _get_thread(); 1838 struct spdk_poller *poller, *tmp; 1839 1840 assert(thread); 1841 assert(spdk_interrupt_mode_is_enabled()); 1842 1843 if (thread->in_interrupt == enable_interrupt) { 1844 return; 1845 } 1846 1847 /* Set pollers to expected mode */ 1848 RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { 1849 poller_set_interrupt_mode(poller, enable_interrupt); 1850 } 1851 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 1852 poller_set_interrupt_mode(poller, enable_interrupt); 1853 } 1854 /* All paused pollers will go to work in interrupt mode */ 1855 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 1856 poller_set_interrupt_mode(poller, enable_interrupt); 1857 } 1858 1859 thread->in_interrupt = enable_interrupt; 1860 return; 1861 } 1862 1863 void 1864 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1865 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1866 const char *name) 1867 { 1868 struct io_device *dev, *tmp; 1869 struct spdk_thread *thread; 1870 1871 assert(io_device != NULL); 1872 assert(create_cb != NULL); 1873 assert(destroy_cb != NULL); 1874 1875 thread = spdk_get_thread(); 1876 if (!thread) { 1877 SPDK_ERRLOG("called from non-SPDK thread\n"); 1878 assert(false); 1879 return; 1880 } 1881 1882 dev = calloc(1, sizeof(struct io_device)); 1883 if (dev == NULL) { 1884 SPDK_ERRLOG("could not allocate io_device\n"); 1885 return; 1886 } 1887 1888 dev->io_device = io_device; 1889 if (name) { 1890 snprintf(dev->name, sizeof(dev->name), "%s", name); 1891 } else { 1892 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1893 } 1894 dev->create_cb = create_cb; 1895 dev->destroy_cb = destroy_cb; 1896 dev->unregister_cb = NULL; 1897 dev->ctx_size = ctx_size; 1898 dev->for_each_count = 0; 1899 dev->unregistered = false; 1900 dev->refcnt = 0; 1901 1902 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 1903 dev->name, dev->io_device, thread->name); 1904 1905 pthread_mutex_lock(&g_devlist_mutex); 1906 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 1907 if (tmp->io_device == io_device) { 1908 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1909 io_device, tmp->name, dev->name); 1910 free(dev); 1911 pthread_mutex_unlock(&g_devlist_mutex); 1912 return; 1913 } 1914 } 1915 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 1916 pthread_mutex_unlock(&g_devlist_mutex); 1917 } 1918 1919 static void 1920 _finish_unregister(void *arg) 1921 { 1922 struct io_device *dev = arg; 1923 struct spdk_thread *thread; 1924 1925 thread = spdk_get_thread(); 1926 assert(thread == dev->unregister_thread); 1927 1928 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 1929 dev->name, dev->io_device, thread->name); 1930 1931 assert(thread->pending_unregister_count > 0); 1932 thread->pending_unregister_count--; 1933 1934 dev->unregister_cb(dev->io_device); 1935 free(dev); 1936 } 1937 1938 static void 1939 io_device_free(struct io_device *dev) 1940 { 1941 int rc __attribute__((unused)); 1942 1943 if (dev->unregister_cb == NULL) { 1944 free(dev); 1945 } else { 1946 assert(dev->unregister_thread != NULL); 1947 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 1948 dev->name, dev->io_device, dev->unregister_thread->name); 1949 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 1950 assert(rc == 0); 1951 } 1952 } 1953 1954 void 1955 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 1956 { 1957 struct io_device *dev; 1958 uint32_t refcnt; 1959 struct spdk_thread *thread; 1960 1961 thread = spdk_get_thread(); 1962 if (!thread) { 1963 SPDK_ERRLOG("called from non-SPDK thread\n"); 1964 assert(false); 1965 return; 1966 } 1967 1968 pthread_mutex_lock(&g_devlist_mutex); 1969 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 1970 if (dev->io_device == io_device) { 1971 break; 1972 } 1973 } 1974 1975 if (!dev) { 1976 SPDK_ERRLOG("io_device %p not found\n", io_device); 1977 assert(false); 1978 pthread_mutex_unlock(&g_devlist_mutex); 1979 return; 1980 } 1981 1982 if (dev->for_each_count > 0) { 1983 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 1984 dev->name, io_device, dev->for_each_count); 1985 pthread_mutex_unlock(&g_devlist_mutex); 1986 return; 1987 } 1988 1989 dev->unregister_cb = unregister_cb; 1990 dev->unregistered = true; 1991 TAILQ_REMOVE(&g_io_devices, dev, tailq); 1992 refcnt = dev->refcnt; 1993 dev->unregister_thread = thread; 1994 pthread_mutex_unlock(&g_devlist_mutex); 1995 1996 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 1997 dev->name, dev->io_device, thread->name); 1998 1999 if (unregister_cb) { 2000 thread->pending_unregister_count++; 2001 } 2002 2003 if (refcnt > 0) { 2004 /* defer deletion */ 2005 return; 2006 } 2007 2008 io_device_free(dev); 2009 } 2010 2011 const char * 2012 spdk_io_device_get_name(struct io_device *dev) 2013 { 2014 return dev->name; 2015 } 2016 2017 struct spdk_io_channel * 2018 spdk_get_io_channel(void *io_device) 2019 { 2020 struct spdk_io_channel *ch; 2021 struct spdk_thread *thread; 2022 struct io_device *dev; 2023 int rc; 2024 2025 pthread_mutex_lock(&g_devlist_mutex); 2026 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 2027 if (dev->io_device == io_device) { 2028 break; 2029 } 2030 } 2031 if (dev == NULL) { 2032 SPDK_ERRLOG("could not find io_device %p\n", io_device); 2033 pthread_mutex_unlock(&g_devlist_mutex); 2034 return NULL; 2035 } 2036 2037 thread = _get_thread(); 2038 if (!thread) { 2039 SPDK_ERRLOG("No thread allocated\n"); 2040 pthread_mutex_unlock(&g_devlist_mutex); 2041 return NULL; 2042 } 2043 2044 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 2045 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 2046 pthread_mutex_unlock(&g_devlist_mutex); 2047 return NULL; 2048 } 2049 2050 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 2051 if (ch->dev == dev) { 2052 ch->ref++; 2053 2054 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2055 ch, dev->name, dev->io_device, thread->name, ch->ref); 2056 2057 /* 2058 * An I/O channel already exists for this device on this 2059 * thread, so return it. 2060 */ 2061 pthread_mutex_unlock(&g_devlist_mutex); 2062 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, 2063 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2064 return ch; 2065 } 2066 } 2067 2068 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 2069 if (ch == NULL) { 2070 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 2071 pthread_mutex_unlock(&g_devlist_mutex); 2072 return NULL; 2073 } 2074 2075 ch->dev = dev; 2076 ch->destroy_cb = dev->destroy_cb; 2077 ch->thread = thread; 2078 ch->ref = 1; 2079 ch->destroy_ref = 0; 2080 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 2081 2082 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2083 ch, dev->name, dev->io_device, thread->name, ch->ref); 2084 2085 dev->refcnt++; 2086 2087 pthread_mutex_unlock(&g_devlist_mutex); 2088 2089 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 2090 if (rc != 0) { 2091 pthread_mutex_lock(&g_devlist_mutex); 2092 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 2093 dev->refcnt--; 2094 free(ch); 2095 pthread_mutex_unlock(&g_devlist_mutex); 2096 return NULL; 2097 } 2098 2099 spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1); 2100 return ch; 2101 } 2102 2103 static void 2104 put_io_channel(void *arg) 2105 { 2106 struct spdk_io_channel *ch = arg; 2107 bool do_remove_dev = true; 2108 struct spdk_thread *thread; 2109 2110 thread = spdk_get_thread(); 2111 if (!thread) { 2112 SPDK_ERRLOG("called from non-SPDK thread\n"); 2113 assert(false); 2114 return; 2115 } 2116 2117 SPDK_DEBUGLOG(thread, 2118 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 2119 ch, ch->dev->name, ch->dev->io_device, thread->name); 2120 2121 assert(ch->thread == thread); 2122 2123 ch->destroy_ref--; 2124 2125 if (ch->ref > 0 || ch->destroy_ref > 0) { 2126 /* 2127 * Another reference to the associated io_device was requested 2128 * after this message was sent but before it had a chance to 2129 * execute. 2130 */ 2131 return; 2132 } 2133 2134 pthread_mutex_lock(&g_devlist_mutex); 2135 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 2136 pthread_mutex_unlock(&g_devlist_mutex); 2137 2138 /* Don't hold the devlist mutex while the destroy_cb is called. */ 2139 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 2140 2141 pthread_mutex_lock(&g_devlist_mutex); 2142 ch->dev->refcnt--; 2143 2144 if (!ch->dev->unregistered) { 2145 do_remove_dev = false; 2146 } 2147 2148 if (ch->dev->refcnt > 0) { 2149 do_remove_dev = false; 2150 } 2151 2152 pthread_mutex_unlock(&g_devlist_mutex); 2153 2154 if (do_remove_dev) { 2155 io_device_free(ch->dev); 2156 } 2157 free(ch); 2158 } 2159 2160 void 2161 spdk_put_io_channel(struct spdk_io_channel *ch) 2162 { 2163 struct spdk_thread *thread; 2164 int rc __attribute__((unused)); 2165 2166 spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0, 2167 (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref); 2168 2169 thread = spdk_get_thread(); 2170 if (!thread) { 2171 SPDK_ERRLOG("called from non-SPDK thread\n"); 2172 assert(false); 2173 return; 2174 } 2175 2176 if (ch->thread != thread) { 2177 SPDK_ERRLOG("different from the thread that called get_io_channel()\n"); 2178 assert(false); 2179 return; 2180 } 2181 2182 SPDK_DEBUGLOG(thread, 2183 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 2184 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 2185 2186 ch->ref--; 2187 2188 if (ch->ref == 0) { 2189 ch->destroy_ref++; 2190 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 2191 assert(rc == 0); 2192 } 2193 } 2194 2195 struct spdk_io_channel * 2196 spdk_io_channel_from_ctx(void *ctx) 2197 { 2198 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 2199 } 2200 2201 struct spdk_thread * 2202 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 2203 { 2204 return ch->thread; 2205 } 2206 2207 void * 2208 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 2209 { 2210 return ch->dev->io_device; 2211 } 2212 2213 const char * 2214 spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch) 2215 { 2216 return spdk_io_device_get_name(ch->dev); 2217 } 2218 2219 int 2220 spdk_io_channel_get_ref_count(struct spdk_io_channel *ch) 2221 { 2222 return ch->ref; 2223 } 2224 2225 struct spdk_io_channel_iter { 2226 void *io_device; 2227 struct io_device *dev; 2228 spdk_channel_msg fn; 2229 int status; 2230 void *ctx; 2231 struct spdk_io_channel *ch; 2232 2233 struct spdk_thread *cur_thread; 2234 2235 struct spdk_thread *orig_thread; 2236 spdk_channel_for_each_cpl cpl; 2237 }; 2238 2239 void * 2240 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 2241 { 2242 return i->io_device; 2243 } 2244 2245 struct spdk_io_channel * 2246 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 2247 { 2248 return i->ch; 2249 } 2250 2251 void * 2252 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 2253 { 2254 return i->ctx; 2255 } 2256 2257 static void 2258 _call_completion(void *ctx) 2259 { 2260 struct spdk_io_channel_iter *i = ctx; 2261 2262 if (i->cpl != NULL) { 2263 i->cpl(i, i->status); 2264 } 2265 free(i); 2266 } 2267 2268 static void 2269 _call_channel(void *ctx) 2270 { 2271 struct spdk_io_channel_iter *i = ctx; 2272 struct spdk_io_channel *ch; 2273 2274 /* 2275 * It is possible that the channel was deleted before this 2276 * message had a chance to execute. If so, skip calling 2277 * the fn() on this thread. 2278 */ 2279 pthread_mutex_lock(&g_devlist_mutex); 2280 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 2281 if (ch->dev->io_device == i->io_device) { 2282 break; 2283 } 2284 } 2285 pthread_mutex_unlock(&g_devlist_mutex); 2286 2287 if (ch) { 2288 i->fn(i); 2289 } else { 2290 spdk_for_each_channel_continue(i, 0); 2291 } 2292 } 2293 2294 void 2295 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 2296 spdk_channel_for_each_cpl cpl) 2297 { 2298 struct spdk_thread *thread; 2299 struct spdk_io_channel *ch; 2300 struct spdk_io_channel_iter *i; 2301 int rc __attribute__((unused)); 2302 2303 i = calloc(1, sizeof(*i)); 2304 if (!i) { 2305 SPDK_ERRLOG("Unable to allocate iterator\n"); 2306 return; 2307 } 2308 2309 i->io_device = io_device; 2310 i->fn = fn; 2311 i->ctx = ctx; 2312 i->cpl = cpl; 2313 2314 pthread_mutex_lock(&g_devlist_mutex); 2315 i->orig_thread = _get_thread(); 2316 2317 TAILQ_FOREACH(thread, &g_threads, tailq) { 2318 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 2319 if (ch->dev->io_device == io_device) { 2320 ch->dev->for_each_count++; 2321 i->dev = ch->dev; 2322 i->cur_thread = thread; 2323 i->ch = ch; 2324 pthread_mutex_unlock(&g_devlist_mutex); 2325 rc = spdk_thread_send_msg(thread, _call_channel, i); 2326 assert(rc == 0); 2327 return; 2328 } 2329 } 2330 } 2331 2332 pthread_mutex_unlock(&g_devlist_mutex); 2333 2334 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2335 assert(rc == 0); 2336 } 2337 2338 void 2339 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2340 { 2341 struct spdk_thread *thread; 2342 struct spdk_io_channel *ch; 2343 int rc __attribute__((unused)); 2344 2345 assert(i->cur_thread == spdk_get_thread()); 2346 2347 i->status = status; 2348 2349 pthread_mutex_lock(&g_devlist_mutex); 2350 if (status) { 2351 goto end; 2352 } 2353 thread = TAILQ_NEXT(i->cur_thread, tailq); 2354 while (thread) { 2355 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 2356 if (ch->dev->io_device == i->io_device) { 2357 i->cur_thread = thread; 2358 i->ch = ch; 2359 pthread_mutex_unlock(&g_devlist_mutex); 2360 rc = spdk_thread_send_msg(thread, _call_channel, i); 2361 assert(rc == 0); 2362 return; 2363 } 2364 } 2365 thread = TAILQ_NEXT(thread, tailq); 2366 } 2367 2368 end: 2369 i->dev->for_each_count--; 2370 i->ch = NULL; 2371 pthread_mutex_unlock(&g_devlist_mutex); 2372 2373 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2374 assert(rc == 0); 2375 } 2376 2377 struct spdk_interrupt { 2378 int efd; 2379 struct spdk_thread *thread; 2380 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 2381 }; 2382 2383 static void 2384 thread_interrupt_destroy(struct spdk_thread *thread) 2385 { 2386 struct spdk_fd_group *fgrp = thread->fgrp; 2387 2388 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2389 2390 if (thread->msg_fd < 0) { 2391 return; 2392 } 2393 2394 spdk_fd_group_remove(fgrp, thread->msg_fd); 2395 close(thread->msg_fd); 2396 thread->msg_fd = -1; 2397 2398 spdk_fd_group_destroy(fgrp); 2399 thread->fgrp = NULL; 2400 } 2401 2402 #ifdef __linux__ 2403 static int 2404 thread_interrupt_msg_process(void *arg) 2405 { 2406 struct spdk_thread *thread = arg; 2407 uint32_t msg_count; 2408 spdk_msg_fn critical_msg; 2409 int rc = 0; 2410 uint64_t notify = 1; 2411 2412 assert(spdk_interrupt_mode_is_enabled()); 2413 2414 /* There may be race between msg_acknowledge and another producer's msg_notify, 2415 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2416 * This can avoid msg notification missing. 2417 */ 2418 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2419 if (rc < 0 && errno != EAGAIN) { 2420 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2421 } 2422 2423 critical_msg = thread->critical_msg; 2424 if (spdk_unlikely(critical_msg != NULL)) { 2425 critical_msg(NULL); 2426 thread->critical_msg = NULL; 2427 rc = 1; 2428 } 2429 2430 msg_count = msg_queue_run_batch(thread, 0); 2431 if (msg_count) { 2432 rc = 1; 2433 } 2434 2435 return rc; 2436 } 2437 2438 static int 2439 thread_interrupt_create(struct spdk_thread *thread) 2440 { 2441 int rc; 2442 2443 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2444 2445 rc = spdk_fd_group_create(&thread->fgrp); 2446 if (rc) { 2447 return rc; 2448 } 2449 2450 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2451 if (thread->msg_fd < 0) { 2452 rc = -errno; 2453 spdk_fd_group_destroy(thread->fgrp); 2454 thread->fgrp = NULL; 2455 2456 return rc; 2457 } 2458 2459 return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread); 2460 } 2461 #else 2462 static int 2463 thread_interrupt_create(struct spdk_thread *thread) 2464 { 2465 return -ENOTSUP; 2466 } 2467 #endif 2468 2469 struct spdk_interrupt * 2470 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2471 void *arg, const char *name) 2472 { 2473 struct spdk_thread *thread; 2474 struct spdk_interrupt *intr; 2475 2476 thread = spdk_get_thread(); 2477 if (!thread) { 2478 assert(false); 2479 return NULL; 2480 } 2481 2482 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2483 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2484 return NULL; 2485 } 2486 2487 if (spdk_fd_group_add(thread->fgrp, efd, fn, arg)) { 2488 return NULL; 2489 } 2490 2491 intr = calloc(1, sizeof(*intr)); 2492 if (intr == NULL) { 2493 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2494 return NULL; 2495 } 2496 2497 if (name) { 2498 snprintf(intr->name, sizeof(intr->name), "%s", name); 2499 } else { 2500 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2501 } 2502 2503 intr->efd = efd; 2504 intr->thread = thread; 2505 2506 return intr; 2507 } 2508 2509 void 2510 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2511 { 2512 struct spdk_thread *thread; 2513 struct spdk_interrupt *intr; 2514 2515 intr = *pintr; 2516 if (intr == NULL) { 2517 return; 2518 } 2519 2520 *pintr = NULL; 2521 2522 thread = spdk_get_thread(); 2523 if (!thread) { 2524 assert(false); 2525 return; 2526 } 2527 2528 if (intr->thread != thread) { 2529 SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n"); 2530 assert(false); 2531 return; 2532 } 2533 2534 spdk_fd_group_remove(thread->fgrp, intr->efd); 2535 free(intr); 2536 } 2537 2538 int 2539 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2540 enum spdk_interrupt_event_types event_types) 2541 { 2542 struct spdk_thread *thread; 2543 2544 thread = spdk_get_thread(); 2545 if (!thread) { 2546 assert(false); 2547 return -EINVAL; 2548 } 2549 2550 if (intr->thread != thread) { 2551 SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n"); 2552 assert(false); 2553 return -EINVAL; 2554 } 2555 2556 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2557 } 2558 2559 int 2560 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2561 { 2562 return spdk_fd_group_get_fd(thread->fgrp); 2563 } 2564 2565 static bool g_interrupt_mode = false; 2566 2567 int 2568 spdk_interrupt_mode_enable(void) 2569 { 2570 /* It must be called once prior to initializing the threading library. 2571 * g_spdk_msg_mempool will be valid if thread library is initialized. 2572 */ 2573 if (g_spdk_msg_mempool) { 2574 SPDK_ERRLOG("Failed due to threading library is already initailzied.\n"); 2575 return -1; 2576 } 2577 2578 #ifdef __linux__ 2579 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2580 g_interrupt_mode = true; 2581 return 0; 2582 #else 2583 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2584 g_interrupt_mode = false; 2585 return -ENOTSUP; 2586 #endif 2587 } 2588 2589 bool 2590 spdk_interrupt_mode_is_enabled(void) 2591 { 2592 return g_interrupt_mode; 2593 } 2594 2595 SPDK_LOG_REGISTER_COMPONENT(thread) 2596