/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/trace.h"
#include "spdk/tree.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
#define SPDK_MAX_POLLER_NAME_LEN	256
#define SPDK_MAX_THREAD_NAME_LEN	256

enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,

	/* The poller is in the process of being paused. It will be paused
	 * the next time it is due to be executed.
	 */
	SPDK_POLLER_STATE_PAUSING,

	/* The poller is registered but currently paused. It's on the
	 * paused_pollers list.
	 */
	SPDK_POLLER_STATE_PAUSED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;
	RB_ENTRY(spdk_poller)		node;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	uint64_t			period_ticks;
	uint64_t			next_run_tick;
	uint64_t			run_count;
	uint64_t			busy_count;
	spdk_poller_fn			fn;
	void				*arg;
	struct spdk_thread		*thread;
	int				interruptfd;
	spdk_poller_set_interrupt_mode_cb	set_intr_cb_fn;
	void				*set_intr_cb_arg;

	char				name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

enum spdk_thread_state {
	/* The thread is processing pollers and messages via spdk_thread_poll(). */
	SPDK_THREAD_STATE_RUNNING,

	/* The thread is in the process of termination. It reaps unregistering
	 * pollers and releases I/O channels.
	 */
	SPDK_THREAD_STATE_EXITING,

	/* The thread is exited. It is ready to call spdk_thread_destroy(). */
	SPDK_THREAD_STATE_EXITED,
};

struct spdk_thread {
	uint64_t			tsc_last;
	struct spdk_thread_stats	stats;
	/*
	 * Contains pollers actively running on this thread. Pollers
	 * are run round-robin. The thread takes one poller from the head
	 * of the ring, executes it, then puts it back at the tail of
	 * the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;
	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	RB_HEAD(timed_pollers_tree, spdk_poller)	timed_pollers;
	struct spdk_poller				*first_timed_poller;
	/*
	 * Contains paused pollers. Pollers on this queue are waiting until
	 * they are resumed (in which case they're put onto the active/timer
	 * queues) or unregistered.
	 */
	TAILQ_HEAD(paused_pollers_head, spdk_poller)	paused_pollers;
	struct spdk_ring		*messages;
	int				msg_fd;
	SLIST_HEAD(, spdk_msg)		msg_cache;
	size_t				msg_cache_count;
	spdk_msg_fn			critical_msg;
	uint64_t			id;
	enum spdk_thread_state		state;
	int				pending_unregister_count;

	TAILQ_HEAD(, spdk_io_channel)	io_channels;
	TAILQ_ENTRY(spdk_thread)	tailq;

	char				name[SPDK_MAX_THREAD_NAME_LEN + 1];
	struct spdk_cpuset		cpumask;
	uint64_t			exit_timeout_tsc;

	/* Indicates whether this spdk_thread currently runs in interrupt mode. */
	bool				in_interrupt;
	struct spdk_fd_group		*fgrp;

	/* User context allocated at the end */
	uint8_t				ctx[0];
};

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
static size_t g_ctx_sz = 0;
/* A monotonically increasing ID is assigned to each created thread, beginning at 1.
 * Once the ID wraps back around to 0 (i.e. it would have to exceed UINT64_MAX),
 * further thread creation is not allowed and the SPDK application must be restarted.
 */
static uint64_t g_thread_id = 1;

struct io_device {
	void				*io_device;
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb	create_cb;
	spdk_io_channel_destroy_cb	destroy_cb;
	spdk_io_device_unregister_cb	unregister_cb;
	struct spdk_thread		*unregister_thread;
	uint32_t			ctx_size;
	uint32_t			for_each_count;
	TAILQ_ENTRY(io_device)		tailq;

	uint32_t			refcnt;

	bool				unregistered;
};

static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);

struct spdk_msg {
	spdk_msg_fn		fn;
	void			*arg;

	SLIST_ENTRY(spdk_msg)	link;
};

#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static __thread struct spdk_thread *tls_thread = NULL;

#define TRACE_GROUP_THREAD	0xa
#define TRACE_THREAD_IOCH_GET	SPDK_TPOINT_ID(TRACE_GROUP_THREAD, 0x0)
#define TRACE_THREAD_IOCH_PUT	SPDK_TPOINT_ID(TRACE_GROUP_THREAD, 0x1)

SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD)
{
	spdk_trace_register_description("THREAD_IOCH_GET",
					TRACE_THREAD_IOCH_GET,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
	spdk_trace_register_description("THREAD_IOCH_PUT",
					TRACE_THREAD_IOCH_PUT,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_INT, "refcnt");
}

/*
 * If this compare function returned zero when two next_run_ticks are equal,
 * the macro RB_INSERT() would return a pointer to the element with the same
 * next_run_tick.
 *
 * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element
 * to remove as a parameter.
 *
 * Hence we allow RB_INSERT() to insert elements with the same keys on the right
 * side by returning 1 when two next_run_ticks are equal.
 */
static inline int
timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2)
{
	if (poller1->next_run_tick < poller2->next_run_tick) {
		return -1;
	} else {
		return 1;
	}
}

RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare);
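/*
 * Illustrative sketch (not part of the original source): because
 * timed_poller_compare() never returns 0, two pollers with equal
 * next_run_tick values can coexist in the tree. The later insert lands on
 * the right side, and RB_INSERT() reports success (NULL) for both:
 *
 *	poller_a.next_run_tick = 100;
 *	poller_b.next_run_tick = 100;
 *	RB_INSERT(timed_pollers_tree, &tree, &poller_a);  // NULL: inserted
 *	RB_INSERT(timed_pollers_tree, &tree, &poller_b);  // NULL: inserted right of poller_a
 */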
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}

static int
_thread_lib_init(size_t ctx_sz)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	g_ctx_sz = ctx_sz;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			     sizeof(struct spdk_msg),
			     0, /* No cache. We do our own. */
			     SPDK_ENV_SOCKET_ID_ANY);

	if (!g_spdk_msg_mempool) {
		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
		return -1;
	}

	return 0;
}

int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);

	if (new_thread_fn == NULL) {
		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
	} else {
		g_new_thread_fn = new_thread_fn;
	}

	return _thread_lib_init(ctx_sz);
}

int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
			 spdk_thread_op_supported_fn thread_op_supported_fn,
			 size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);
	assert(g_thread_op_supported_fn == NULL);

	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
		SPDK_ERRLOG("Both must be defined or undefined together.\n");
		return -EINVAL;
	}

	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
	} else {
		g_thread_op_fn = thread_op_fn;
		g_thread_op_supported_fn = thread_op_supported_fn;
	}

	return _thread_lib_init(ctx_sz);
}

void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}

	g_new_thread_fn = NULL;
	g_thread_op_fn = NULL;
	g_thread_op_supported_fn = NULL;
	g_ctx_sz = 0;
}

static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
				     poller->name);
		}
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (spdk_interrupt_mode_is_enabled()) {
		thread_interrupt_destroy(thread);
	}

	spdk_ring_free(thread->messages);
	free(thread);
}

struct spdk_thread *
spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc = 0, i;

	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	TAILQ_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	RB_INIT(&thread->timed_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	if (g_thread_id == 0) {
		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		_free_thread(thread);
		return NULL;
	}
	thread->id = g_thread_id++;
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
		      thread->id, thread->name);

	if (spdk_interrupt_mode_is_enabled()) {
		thread->in_interrupt = true;
		rc = thread_interrupt_create(thread);
		if (rc != 0) {
			_free_thread(thread);
			return NULL;
		}
	}

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
	}

	if (rc != 0) {
		_free_thread(thread);
		return NULL;
	}

	thread->state = SPDK_THREAD_STATE_RUNNING;

	return thread;
}

void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}
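/*
 * Minimal usage sketch (illustrative only, not part of the original file):
 * a consumer of this library initializes it once, creates a thread, and
 * binds it to the calling pthread before driving it with spdk_thread_poll().
 *
 *	spdk_thread_lib_init(NULL, 0);
 *	struct spdk_thread *thread = spdk_thread_create("app_thread", NULL);
 *	spdk_set_thread(thread);
 */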
static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s timed out, moving it to the exited state forcefully\n",
			    thread->name);
		goto exited;
	}

	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

	if (thread->pending_unregister_count > 0) {
		SPDK_INFOLOG(thread,
			     "thread %s is still unregistering io_devices\n",
			     thread->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
}

int
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
		SPDK_INFOLOG(thread,
			     "thread %s is already exiting\n",
			     thread->name);
		return 0;
	}

	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
	thread->state = SPDK_THREAD_STATE_EXITING;
	return 0;
}

bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_EXITED;
}

void
spdk_thread_destroy(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);

	assert(thread->state == SPDK_THREAD_STATE_EXITED);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}

void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
	if (g_ctx_sz > 0) {
		return thread->ctx;
	}

	return NULL;
}

struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}

int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;

	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
		assert(false);
		return -ENOTSUP;
	}

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Called from non-SPDK thread\n");
		assert(false);
		return -EINVAL;
	}

	spdk_cpuset_copy(&thread->cpumask, cpumask);

	/* Invoke framework's reschedule operation. If this function is called multiple times
	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
	 * reschedule operation.
	 */
	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

	return 0;
}

struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}
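/*
 * Illustrative sketch (not part of the original file): when the library was
 * initialized with a nonzero ctx_sz, the per-thread context and the thread
 * round-trip through the two helpers above. struct my_ctx is hypothetical.
 *
 *	spdk_thread_lib_init(NULL, sizeof(struct my_ctx));
 *	struct spdk_thread *thread = spdk_thread_create("t0", NULL);
 *	struct my_ctx *ctx = spdk_thread_get_ctx(thread);
 *	assert(spdk_thread_get_from_ctx(ctx) == thread);
 */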
static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];
	uint64_t notify = 1;
	int rc;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (spdk_unlikely(thread->in_interrupt) &&
	    spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}

static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *tmp __attribute__((unused));

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers tree by next scheduled run time
	 * as its key.
	 */
	tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp == NULL);

	/* Update the cache only if it is empty or the inserted poller is earlier than
	 * the cached one. RB_MIN() is not necessary here because any poller that has
	 * exactly the same next_run_tick as an existing poller is inserted on its
	 * right side.
	 */
	if (thread->first_timed_poller == NULL ||
	    poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
		thread->first_timed_poller = poller;
	}
}

#ifdef __linux__
static inline void
poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
{
	struct spdk_poller *tmp __attribute__((unused));

	tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
	assert(tmp != NULL);

	/* This function is not used in any performance-critical path.
	 * Update the cache simply by RB_MIN() if it needs to be changed.
	 */
	if (thread->first_timed_poller == poller) {
		thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
	}
}
#endif

static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store end time to use it as start time of the next spdk_thread_poll(). */
	thread->tsc_last = end;
}

static inline int
thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_WAITING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}

static inline int
thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
			    uint64_t now)
{
	int rc;

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		return 0;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		return 0;
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}

	poller->state = SPDK_POLLER_STATE_RUNNING;
	rc = poller->fn(poller->arg);

	poller->run_count++;
	if (rc > 0) {
		poller->busy_count++;
	}

#ifdef DEBUG
	if (rc == -1) {
		SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
	}
#endif

	switch (poller->state) {
	case SPDK_POLLER_STATE_UNREGISTERED:
		free(poller);
		break;
	case SPDK_POLLER_STATE_PAUSING:
		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
		break;
	case SPDK_POLLER_STATE_PAUSED:
		break;
	case SPDK_POLLER_STATE_RUNNING:
		poller->state = SPDK_POLLER_STATE_WAITING;
	/* fallthrough */
	case SPDK_POLLER_STATE_WAITING:
		poller_insert_timer(thread, poller, now);
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}
static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	thread->tsc_last = now;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		poller_rc = thread_execute_poller(thread, poller);
		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	poller = thread->first_timed_poller;
	while (poller != NULL) {
		int timer_rc = 0;

		if (now < poller->next_run_tick) {
			break;
		}

		tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
		RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);

		/* Update the cache to the next timed poller in the tree
		 * only if the current poller is still the closest; otherwise,
		 * do nothing because the cache has already been updated.
		 */
		if (thread->first_timed_poller == poller) {
			thread->first_timed_poller = tmp;
		}

		timer_rc = thread_execute_timed_poller(thread, poller, now);
		if (timer_rc > rc) {
			rc = timer_rc;
		}

		poller = tmp;
	}

	return rc;
}

int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	struct spdk_thread *orig_thread;
	int rc;
	uint64_t notify = 1;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	if (spdk_likely(!thread->in_interrupt)) {
		rc = thread_poll(thread, max_msgs, now);
		if (spdk_unlikely(thread->in_interrupt)) {
			/* The thread transitioned to interrupt mode during the above poll.
			 * Poll it one more time in case a msg arrived without notification
			 * during the transition.
			 */
			rc = thread_poll(thread, max_msgs, now);
		}
	} else {
		/* Non-blocking wait on the thread's fd_group */
		rc = spdk_fd_group_wait(thread->fgrp, 0);
		if (spdk_unlikely(!thread->in_interrupt)) {
			/* The thread transitioned to poll mode via a msg during the above
			 * processing. Clear msg_fd since thread messages will be polled
			 * directly in poll mode.
			 */
			rc = read(thread->msg_fd, &notify, sizeof(notify));
			if (rc < 0 && errno != EAGAIN) {
				SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
			}
		}
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
		thread_exit(thread, now);
	}

	thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}
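/*
 * Minimal event-loop sketch (illustrative only): a framework drives an
 * spdk_thread by calling spdk_thread_poll() repeatedly (0 for max_msgs and
 * now picks the default batch size and the current tick count), then
 * destroys the thread once it has finished exiting.
 *
 *	while (!spdk_thread_is_exited(thread)) {
 *		spdk_thread_poll(thread, 0, 0);
 *	}
 *	spdk_thread_destroy(thread);
 */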
uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = thread->first_timed_poller;
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}

int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}

static bool
thread_has_unpaused_pollers(struct spdk_thread *thread)
{
	if (TAILQ_EMPTY(&thread->active_pollers) &&
	    RB_EMPTY(&thread->timed_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
	if (!thread_has_unpaused_pollers(thread) &&
	    TAILQ_EMPTY(&thread->paused_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
	if (spdk_ring_count(thread->messages) ||
	    thread_has_unpaused_pollers(thread) ||
	    thread->critical_msg != NULL) {
		return false;
	}

	return true;
}

uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count. We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}

struct spdk_thread *
spdk_get_thread(void)
{
	return _get_thread();
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

uint64_t
spdk_thread_get_id(const struct spdk_thread *thread)
{
	return thread->id;
}

struct spdk_thread *
spdk_thread_get_by_id(uint64_t id)
{
	struct spdk_thread *thread;

	if (id == 0 || id >= g_thread_id) {
		SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
		return NULL;
	}
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		if (thread->id == id) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);
	return thread;
}

int
spdk_thread_get_stats(struct spdk_thread_stats *stats)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		return -EINVAL;
	}

	if (stats == NULL) {
		return -EINVAL;
	}

	*stats = thread->stats;

	return 0;
}

uint64_t
spdk_thread_get_last_tsc(struct spdk_thread *thread)
{
	if (thread == NULL) {
		thread = _get_thread();
	}

	return thread->tsc_last;
}

static inline int
thread_send_msg_notification(const struct spdk_thread *target_thread)
{
	uint64_t notify = 1;
	int rc;

	/* Not necessary to do notification if interrupt facility is not enabled */
	if (spdk_likely(!spdk_interrupt_mode_is_enabled())) {
		return 0;
	}

	/* Since each spdk_thread can switch between poll and interrupt mode
	 * dynamically, it is necessary, after enqueuing a thread msg, to check
	 * whether the target thread runs in interrupt mode and then decide
	 * whether to do event notification.
	 */
	if (spdk_unlikely(target_thread->in_interrupt)) {
		rc = write(target_thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
			return -EIO;
		}
	}

	return 0;
}

int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}

int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
	spdk_msg_fn expected = NULL;

	if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
					 __ATOMIC_SEQ_CST)) {
		return -EIO;
	}

	return thread_send_msg_notification(thread);
}
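/*
 * Usage sketch (illustrative only): handing work to another thread. The
 * callback runs later on the target thread's context inside
 * spdk_thread_poll(); hello() here is hypothetical.
 *
 *	static void
 *	hello(void *ctx)
 *	{
 *		printf("running on %s\n", spdk_thread_get_name(spdk_get_thread()));
 *	}
 *
 *	spdk_thread_send_msg(target, hello, NULL);
 */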
#ifdef __linux__
static int
interrupt_timerfd_process(void *arg)
{
	struct spdk_poller *poller = arg;
	uint64_t exp;
	int rc;

	/* clear the level of interval timer */
	rc = read(poller->interruptfd, &exp, sizeof(exp));
	if (rc < 0) {
		/* read() reports failures via errno, not via its return value */
		if (errno == EAGAIN) {
			return 0;
		}

		return rc;
	}

	return poller->fn(poller->arg);
}

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	struct spdk_fd_group *fgrp = poller->thread->fgrp;
	int timerfd;
	int rc;

	SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
	if (timerfd < 0) {
		return -errno;
	}

	rc = spdk_fd_group_add(fgrp, timerfd,
			       interrupt_timerfd_process, poller);
	if (rc < 0) {
		close(timerfd);
		return rc;
	}

	poller->interruptfd = timerfd;
	return 0;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	int timerfd = poller->interruptfd;
	uint64_t now_tick = spdk_get_ticks();
	uint64_t ticks = spdk_get_ticks_hz();
	int ret;
	struct itimerspec new_tv = {};
	struct itimerspec old_tv = {};

	assert(poller->period_ticks != 0);
	assert(timerfd >= 0);

	SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
		      interrupt_mode ? "interrupt" : "poll");

	if (interrupt_mode) {
		/* Set repeated timer expiration */
		new_tv.it_interval.tv_sec = poller->period_ticks / ticks;
		new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks;

		/* Update next timer expiration */
		if (poller->next_run_tick == 0) {
			poller->next_run_tick = now_tick + poller->period_ticks;
		} else if (poller->next_run_tick < now_tick) {
			poller->next_run_tick = now_tick;
		}

		new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks;
		new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks;

		ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno);
			assert(false);
		}
	} else {
		/* Disarm the timer */
		ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv);
		if (ret < 0) {
			/* timerfd_settime's failure indicates that the timerfd is in error */
			SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno);
			assert(false);
		}

		/* In order to reuse poller_insert_timer, fix now_tick so that next_run_tick
		 * becomes now_tick + ticks * old_tv.it_value.tv_sec +
		 * (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC
		 */
		now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \
			   (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC;
		poller_remove_timer(poller->thread, poller);
		poller_insert_timer(poller->thread, poller, now_tick);
	}
}

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
	SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name);
	assert(poller->interruptfd >= 0);
	spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd);
	close(poller->interruptfd);
	poller->interruptfd = -1;
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	int busy_efd;
	int rc;

	SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name);
	busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (busy_efd < 0) {
		SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name);
		return -errno;
	}

	rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, poller->fn, poller->arg);
	if (rc < 0) {
		close(busy_efd);
		return rc;
	}

	poller->interruptfd = busy_efd;
	return 0;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	int busy_efd = poller->interruptfd;
	uint64_t notify = 1;
	int rc __attribute__((unused));

	assert(busy_efd >= 0);

	if (interrupt_mode) {
		/* Writing the eventfd without reading it keeps it level-triggered,
		 * so the poller fires repeatedly. */
		if (write(busy_efd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name);
		}
	} else {
		/* Reading the eventfd clears its level-triggered state. */
		rc = read(busy_efd, &notify, sizeof(notify));
	}
}
#else

static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

static void
poller_interrupt_fini(struct spdk_poller *poller)
{
}

static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
	return -ENOTSUP;
}

static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}

#endif

void
spdk_poller_register_interrupt(struct spdk_poller *poller,
			       spdk_poller_set_interrupt_mode_cb cb_fn,
			       void *cb_arg)
{
	assert(poller != NULL);
	assert(cb_fn != NULL);
	assert(spdk_get_thread() == poller->thread);

	if (!spdk_interrupt_mode_is_enabled()) {
		return;
	}

	/* When a poller is created we don't know if the user is ever going to
	 * enable interrupts on it by calling this function, so the poller
	 * registration function has to immediately create an interruptfd.
	 * When this function does get called by the user, we have to then destroy
	 * that interruptfd.
	 */
	if (poller->set_intr_cb_fn && poller->interruptfd >= 0) {
		poller_interrupt_fini(poller);
	}

	poller->set_intr_cb_fn = cb_fn;
	poller->set_intr_cb_arg = cb_arg;

	/* Set poller into interrupt mode if thread is in interrupt. */
	if (poller->thread->in_interrupt) {
		poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
	}
}

static uint64_t
convert_us_to_ticks(uint64_t us)
{
	uint64_t quotient, remainder, ticks;

	if (us) {
		quotient = us / SPDK_SEC_TO_USEC;
		remainder = us % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		return 0;
	}
}
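/*
 * Worked example (illustrative, not from the original source): with a
 * 3 GHz TSC (spdk_get_ticks_hz() == 3000000000) and us == 2500000 (2.5s):
 *
 *	quotient  = 2500000 / SPDK_SEC_TO_USEC = 2
 *	remainder = 2500000 % SPDK_SEC_TO_USEC = 500000
 *	result    = 3000000000 * 2 + (3000000000 * 500000) / 1000000
 *	          = 7500000000 ticks
 *
 * Splitting into quotient and remainder bounds the intermediate product,
 * avoiding 64-bit overflow for very large us values while keeping
 * sub-second precision.
 */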
static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;
	poller->interruptfd = -1;

	poller->period_ticks = convert_us_to_ticks(period_microseconds);

	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (period_microseconds) {
			rc = period_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL);
		} else {
			/* If the poller doesn't have a period, create an interruptfd that's
			 * always busy automatically when running in interrupt mode.
			 */
			rc = busy_poller_interrupt_init(poller);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
				free(poller);
				return NULL;
			}

			spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL);
		}
	}

	thread_insert_poller(thread, poller);

	return poller;
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}
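/*
 * Usage sketch (illustrative only): registering a 100us periodic poller
 * from an SPDK thread. The callback's return value feeds the busy/idle
 * statistics: a positive value counts as busy, 0 as idle.
 * check_completions() is hypothetical.
 *
 *	static int
 *	poll_fn(void *arg)
 *	{
 *		int nr_done = check_completions(arg);
 *
 *		return nr_done > 0 ? 1 : 0;	// >0 busy, 0 idle
 *	}
 *
 *	struct spdk_poller *p = spdk_poller_register_named(poll_fn, ctx, 100, "my_poller");
 *	...
 *	spdk_poller_unregister(&p);
 */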
void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n");
		assert(false);
		return;
	}

	if (spdk_interrupt_mode_is_enabled() && poller->interruptfd >= 0) {
		poller_interrupt_fini(poller);
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}

void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_poller_pause()\n");
		assert(false);
		return;
	}

	/* We just set its state to SPDK_POLLER_STATE_PAUSING and let
	 * spdk_thread_poll() move it. This allows a poller to be paused from
	 * another poller's context without breaking the TAILQ_FOREACH_REVERSE_SAFE
	 * iteration, or from within itself without breaking the logic that always
	 * removes the closest timed poller in the timed-poller iteration.
	 */
	switch (poller->state) {
	case SPDK_POLLER_STATE_PAUSED:
	case SPDK_POLLER_STATE_PAUSING:
		break;
	case SPDK_POLLER_STATE_RUNNING:
	case SPDK_POLLER_STATE_WAITING:
		poller->state = SPDK_POLLER_STATE_PAUSING;
		break;
	default:
		assert(false);
		break;
	}
}

void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_poller_resume()\n");
		assert(false);
		return;
	}

	/* If a poller is paused it has to be removed from the paused pollers
	 * list and put on the active list or timer tree depending on its
	 * period_ticks. If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list or tree.
	 */
	switch (poller->state) {
	case SPDK_POLLER_STATE_PAUSED:
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		thread_insert_poller(thread, poller);
	/* fallthrough */
	case SPDK_POLLER_STATE_PAUSING:
		poller->state = SPDK_POLLER_STATE_WAITING;
		break;
	case SPDK_POLLER_STATE_RUNNING:
	case SPDK_POLLER_STATE_WAITING:
		break;
	default:
		assert(false);
		break;
	}
}
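/*
 * Usage sketch (illustrative only): pausing is deferred, so it is safe to
 * call from any poller callback on the same thread, including the poller's
 * own callback.
 *
 *	spdk_poller_pause(p);	// takes effect the next time p would run
 *	...
 *	spdk_poller_resume(p);	// back onto the active list or timer tree
 */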
const char *
spdk_poller_get_name(struct spdk_poller *poller)
{
	return poller->name;
}

const char *
spdk_poller_get_state_str(struct spdk_poller *poller)
{
	switch (poller->state) {
	case SPDK_POLLER_STATE_WAITING:
		return "waiting";
	case SPDK_POLLER_STATE_RUNNING:
		return "running";
	case SPDK_POLLER_STATE_UNREGISTERED:
		return "unregistered";
	case SPDK_POLLER_STATE_PAUSING:
		return "pausing";
	case SPDK_POLLER_STATE_PAUSED:
		return "paused";
	default:
		return NULL;
	}
}

uint64_t
spdk_poller_get_period_ticks(struct spdk_poller *poller)
{
	return poller->period_ticks;
}

void
spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats)
{
	stats->run_count = poller->run_count;
	stats->busy_count = poller->busy_count;
}

struct spdk_poller *
spdk_thread_get_first_active_poller(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->active_pollers);
}

struct spdk_poller *
spdk_thread_get_next_active_poller(struct spdk_poller *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

struct spdk_poller *
spdk_thread_get_first_timed_poller(struct spdk_thread *thread)
{
	return RB_MIN(timed_pollers_tree, &thread->timed_pollers);
}

struct spdk_poller *
spdk_thread_get_next_timed_poller(struct spdk_poller *prev)
{
	return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev);
}

struct spdk_poller *
spdk_thread_get_first_paused_poller(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->paused_pollers);
}

struct spdk_poller *
spdk_thread_get_next_paused_poller(struct spdk_poller *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

struct spdk_io_channel *
spdk_thread_get_first_io_channel(struct spdk_thread *thread)
{
	return TAILQ_FIRST(&thread->io_channels);
}

struct spdk_io_channel *
spdk_thread_get_next_io_channel(struct spdk_io_channel *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};

static void
_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	assert(rc == 0);
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}
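/*
 * Usage sketch (illustrative only): fn runs once on every SPDK thread in
 * turn (message by message), then cpl runs back on the originating thread.
 * Both callbacks here are hypothetical.
 *
 *	spdk_for_each_thread(update_config_on_thread, cfg, update_config_done);
 */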
static inline void
poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode)
{
	if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
		return;
	}

	if (!poller->set_intr_cb_fn) {
		SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name);
		assert(false);
		return;
	}

	poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode);
}

void
spdk_thread_set_interrupt_mode(bool enable_interrupt)
{
	struct spdk_thread *thread = _get_thread();
	struct spdk_poller *poller, *tmp;

	assert(thread);
	assert(spdk_interrupt_mode_is_enabled());

	if (thread->in_interrupt == enable_interrupt) {
		return;
	}

	/* Set pollers to expected mode */
	RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
		poller_set_interrupt_mode(poller, enable_interrupt);
	}
	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) {
		poller_set_interrupt_mode(poller, enable_interrupt);
	}
	/* All paused pollers will go to work in interrupt mode */
	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) {
		poller_set_interrupt_mode(poller, enable_interrupt);
	}

	thread->in_interrupt = enable_interrupt;
	return;
}

void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}
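/*
 * Usage sketch (illustrative only): an io_device is any unique pointer plus
 * per-thread channel callbacks. my_dev, my_create_cb, my_destroy_cb and
 * struct my_channel_ctx are hypothetical.
 *
 *	spdk_io_device_register(&my_dev, my_create_cb, my_destroy_cb,
 *				sizeof(struct my_channel_ctx), "my_dev");
 */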
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	assert(thread == dev->unregister_thread);

	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	assert(thread->pending_unregister_count > 0);
	thread->pending_unregister_count--;

	dev->unregister_cb(dev->io_device);
	free(dev);
}

static void
io_device_free(struct io_device *dev)
{
	int rc __attribute__((unused));

	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
		assert(rc == 0);
	}
}

void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (unregister_cb) {
		thread->pending_unregister_count++;
	}

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}

const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}

struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 * thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0,
					  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
			return ch;
		}
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1);
	return ch;
}
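/*
 * Usage sketch (illustrative only): channels are per-thread and
 * reference-counted; a second get on the same thread returns the same
 * channel with its refcount bumped. struct my_channel_ctx is hypothetical.
 *
 *	struct spdk_io_channel *ch = spdk_get_io_channel(&my_dev);
 *	struct my_channel_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *	...
 *	spdk_put_io_channel(ch);	// must be called on the same thread
 */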
/* Message handler that performs the deferred channel teardown on the
 * channel's owning thread once all references have been dropped.
 */
static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0,
			  (uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		SPDK_ERRLOG("different from the thread that called get_io_channel()\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

void *
spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
{
	return ch->dev->io_device;
}

int
spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
{
	return ch->ref;
}
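/*
 * Illustrative note (not part of the original source): a channel and its
 * per-device context are one allocation, so the conversions above are plain
 * pointer arithmetic and round-trip exactly; g_my_dev is hypothetical.
 *
 *	struct spdk_io_channel *ch = spdk_get_io_channel(&g_my_dev);
 *	void *ctx = spdk_io_channel_get_ctx(ch);
 *
 *	assert(spdk_io_channel_from_ctx(ctx) == ch);
 *	assert(spdk_io_channel_get_io_device(ch) == &g_my_dev);
 */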
struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

/* Runs on the original thread to deliver the final completion callback. */
static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

/* Runs on each target thread to invoke the per-channel callback. */
static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute. If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
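/*
 * Illustrative usage sketch (not part of the original source): each channel
 * callback must eventually call spdk_for_each_channel_continue() exactly once,
 * and the completion callback runs back on the calling thread. Names reuse the
 * hypothetical my_ch_ctx/g_my_dev sketch above.
 *
 *	static void
 *	reset_channel(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *		struct my_ch_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *
 *		ctx->fd = -1;
 *		spdk_for_each_channel_continue(i, 0);	// 0: keep iterating
 *	}
 *
 *	static void
 *	reset_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		// runs on the thread that called spdk_for_each_channel()
 *	}
 *
 *	spdk_for_each_channel(&g_my_dev, reset_channel, NULL, reset_done);
 */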
struct spdk_interrupt {
	int efd;
	struct spdk_thread *thread;
	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	if (thread->msg_fd < 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);
	thread->msg_fd = -1;

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}

#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;
	uint64_t notify = 1;

	assert(spdk_interrupt_mode_is_enabled());

	/* There may be a race between this consumer's acknowledgement (the read
	 * of msg_fd) and another producer's notification (a write to msg_fd).
	 * Acknowledge first and then check for pending messages, so that a
	 * notification is never missed.
	 */
	rc = read(thread->msg_fd, &notify, sizeof(notify));
	if (rc < 0 && errno != EAGAIN) {
		SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
	}

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	return rc;
}

static int
thread_interrupt_create(struct spdk_thread *thread)
{
	int rc;

	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);

	rc = spdk_fd_group_create(&thread->fgrp);
	if (rc) {
		return rc;
	}

	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (thread->msg_fd < 0) {
		rc = -errno;
		spdk_fd_group_destroy(thread->fgrp);
		thread->fgrp = NULL;

		return rc;
	}

	return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
#endif

struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
			void *arg, const char *name)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	if (spdk_fd_group_add(thread->fgrp, efd, fn, arg)) {
		return NULL;
	}

	intr = calloc(1, sizeof(*intr));
	if (intr == NULL) {
		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(intr->name, sizeof(intr->name), "%s", name);
	} else {
		snprintf(intr->name, sizeof(intr->name), "%p", fn);
	}

	intr->efd = efd;
	intr->thread = thread;

	return intr;
}

void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	*pintr = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n");
		assert(false);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}
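/*
 * Illustrative usage sketch (not part of the original source): registering an
 * eventfd-backed handler on the current SPDK thread in interrupt mode
 * (Linux-only); my_fd_fn/my_arg are hypothetical.
 *
 *	int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 *	struct spdk_interrupt *intr;
 *
 *	intr = spdk_interrupt_register(efd, my_fd_fn, my_arg, "my_intr");
 *	if (intr == NULL) {
 *		// handle registration failure
 *	}
 *	...
 *	spdk_interrupt_unregister(&intr);	// removes efd from the fd group
 *	close(efd);	// the caller still owns the fd
 */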
int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n");
		assert(false);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}

static bool g_interrupt_mode = false;

int
spdk_interrupt_mode_enable(void)
{
	/* This must be called at most once, prior to initializing the threading
	 * library. g_spdk_msg_mempool is only valid once the thread library has
	 * been initialized, so it doubles as a late-call check.
	 */
	if (g_spdk_msg_mempool) {
		SPDK_ERRLOG("Failed: the threading library is already initialized.\n");
		return -1;
	}

#ifdef __linux__
	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
	g_interrupt_mode = true;
	return 0;
#else
	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
	g_interrupt_mode = false;
	return -ENOTSUP;
#endif
}

bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}

SPDK_LOG_REGISTER_COMPONENT(thread)
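/*
 * Illustrative usage sketch (not part of the original source): interrupt mode
 * must be selected before the threading library is initialized.
 *
 *	if (spdk_interrupt_mode_enable() != 0) {
 *		// unsupported platform or called too late; stay in polled mode
 *	}
 *	spdk_thread_lib_init(NULL, 0);
 */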