1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/likely.h" 38 #include "spdk/queue.h" 39 #include "spdk/string.h" 40 #include "spdk/thread.h" 41 #include "spdk/util.h" 42 43 #include "spdk_internal/log.h" 44 #include "spdk_internal/thread.h" 45 46 #define SPDK_MSG_BATCH_SIZE 8 47 #define SPDK_MAX_DEVICE_NAME_LEN 256 48 #define SPDK_MAX_THREAD_NAME_LEN 256 49 50 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 51 52 static spdk_new_thread_fn g_new_thread_fn = NULL; 53 static size_t g_ctx_sz = 0; 54 55 struct io_device { 56 void *io_device; 57 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 58 spdk_io_channel_create_cb create_cb; 59 spdk_io_channel_destroy_cb destroy_cb; 60 spdk_io_device_unregister_cb unregister_cb; 61 struct spdk_thread *unregister_thread; 62 uint32_t ctx_size; 63 uint32_t for_each_count; 64 TAILQ_ENTRY(io_device) tailq; 65 66 uint32_t refcnt; 67 68 bool unregistered; 69 }; 70 71 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 72 73 struct spdk_msg { 74 spdk_msg_fn fn; 75 void *arg; 76 77 SLIST_ENTRY(spdk_msg) link; 78 }; 79 80 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 81 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 82 83 enum spdk_poller_state { 84 /* The poller is registered with a thread but not currently executing its fn. */ 85 SPDK_POLLER_STATE_WAITING, 86 87 /* The poller is currently running its fn. */ 88 SPDK_POLLER_STATE_RUNNING, 89 90 /* The poller was unregistered during the execution of its fn. */ 91 SPDK_POLLER_STATE_UNREGISTERED, 92 93 /* The poller is in the process of being paused. It will be paused 94 * during the next time it's supposed to be executed. 95 */ 96 SPDK_POLLER_STATE_PAUSING, 97 98 /* The poller is registered but currently paused. It's on the 99 * paused_pollers list. 100 */ 101 SPDK_POLLER_STATE_PAUSED, 102 }; 103 104 struct spdk_poller { 105 TAILQ_ENTRY(spdk_poller) tailq; 106 107 /* Current state of the poller; should only be accessed from the poller's thread. */ 108 enum spdk_poller_state state; 109 110 uint64_t period_ticks; 111 uint64_t next_run_tick; 112 spdk_poller_fn fn; 113 void *arg; 114 }; 115 116 struct spdk_thread { 117 TAILQ_HEAD(, spdk_io_channel) io_channels; 118 TAILQ_ENTRY(spdk_thread) tailq; 119 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 120 121 bool exit; 122 123 struct spdk_cpuset cpumask; 124 125 uint64_t tsc_last; 126 struct spdk_thread_stats stats; 127 128 /* 129 * Contains pollers actively running on this thread. Pollers 130 * are run round-robin. The thread takes one poller from the head 131 * of the ring, executes it, then puts it back at the tail of 132 * the ring. 133 */ 134 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 135 136 /** 137 * Contains pollers running on this thread with a periodic timer. 138 */ 139 TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; 140 141 /* 142 * Contains paused pollers. Pollers on this queue are waiting until 143 * they are resumed (in which case they're put onto the active/timer 144 * queues) or unregistered. 145 */ 146 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 147 148 struct spdk_ring *messages; 149 150 SLIST_HEAD(, spdk_msg) msg_cache; 151 size_t msg_cache_count; 152 153 spdk_msg_fn critical_msg; 154 155 /* User context allocated at the end */ 156 uint8_t ctx[0]; 157 }; 158 159 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 160 static uint32_t g_thread_count = 0; 161 162 static __thread struct spdk_thread *tls_thread = NULL; 163 164 static inline struct spdk_thread * 165 _get_thread(void) 166 { 167 return tls_thread; 168 } 169 170 int 171 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 172 { 173 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 174 175 assert(g_new_thread_fn == NULL); 176 g_new_thread_fn = new_thread_fn; 177 178 g_ctx_sz = ctx_sz; 179 180 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 181 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 182 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 183 sizeof(struct spdk_msg), 184 0, /* No cache. We do our own. */ 185 SPDK_ENV_SOCKET_ID_ANY); 186 187 if (!g_spdk_msg_mempool) { 188 return -1; 189 } 190 191 return 0; 192 } 193 194 void 195 spdk_thread_lib_fini(void) 196 { 197 struct io_device *dev; 198 199 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 200 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 201 } 202 203 if (g_spdk_msg_mempool) { 204 spdk_mempool_free(g_spdk_msg_mempool); 205 g_spdk_msg_mempool = NULL; 206 } 207 208 g_new_thread_fn = NULL; 209 g_ctx_sz = 0; 210 } 211 212 static void 213 _free_thread(struct spdk_thread *thread) 214 { 215 struct spdk_io_channel *ch; 216 struct spdk_msg *msg; 217 struct spdk_poller *poller, *ptmp; 218 219 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 220 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 221 thread->name, ch->dev->name); 222 } 223 224 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 225 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 226 SPDK_WARNLOG("poller %p still registered at thread exit\n", 227 poller); 228 } 229 230 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 231 free(poller); 232 } 233 234 235 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) { 236 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 237 SPDK_WARNLOG("poller %p still registered at thread exit\n", 238 poller); 239 } 240 241 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 242 free(poller); 243 } 244 245 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 246 SPDK_WARNLOG("poller %p still registered at thread exit\n", poller); 247 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 248 free(poller); 249 } 250 251 pthread_mutex_lock(&g_devlist_mutex); 252 assert(g_thread_count > 0); 253 g_thread_count--; 254 TAILQ_REMOVE(&g_threads, thread, tailq); 255 pthread_mutex_unlock(&g_devlist_mutex); 256 257 msg = SLIST_FIRST(&thread->msg_cache); 258 while (msg != NULL) { 259 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 260 261 assert(thread->msg_cache_count > 0); 262 thread->msg_cache_count--; 263 spdk_mempool_put(g_spdk_msg_mempool, msg); 264 265 msg = SLIST_FIRST(&thread->msg_cache); 266 } 267 268 assert(thread->msg_cache_count == 0); 269 270 spdk_ring_free(thread->messages); 271 free(thread); 272 } 273 274 struct spdk_thread * 275 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 276 { 277 struct spdk_thread *thread; 278 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 279 int rc, i; 280 281 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 282 if (!thread) { 283 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 284 return NULL; 285 } 286 287 if (cpumask) { 288 spdk_cpuset_copy(&thread->cpumask, cpumask); 289 } else { 290 spdk_cpuset_negate(&thread->cpumask); 291 } 292 293 TAILQ_INIT(&thread->io_channels); 294 TAILQ_INIT(&thread->active_pollers); 295 TAILQ_INIT(&thread->timer_pollers); 296 TAILQ_INIT(&thread->paused_pollers); 297 SLIST_INIT(&thread->msg_cache); 298 thread->msg_cache_count = 0; 299 300 thread->tsc_last = spdk_get_ticks(); 301 302 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 303 if (!thread->messages) { 304 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 305 free(thread); 306 return NULL; 307 } 308 309 /* Fill the local message pool cache. */ 310 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 311 if (rc == 0) { 312 /* If we can't populate the cache it's ok. The cache will get filled 313 * up organically as messages are passed to the thread. */ 314 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 315 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 316 thread->msg_cache_count++; 317 } 318 } 319 320 if (name) { 321 snprintf(thread->name, sizeof(thread->name), "%s", name); 322 } else { 323 snprintf(thread->name, sizeof(thread->name), "%p", thread); 324 } 325 326 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name); 327 328 pthread_mutex_lock(&g_devlist_mutex); 329 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 330 g_thread_count++; 331 pthread_mutex_unlock(&g_devlist_mutex); 332 333 if (g_new_thread_fn) { 334 rc = g_new_thread_fn(thread); 335 if (rc != 0) { 336 _free_thread(thread); 337 return NULL; 338 } 339 } 340 341 return thread; 342 } 343 344 void 345 spdk_set_thread(struct spdk_thread *thread) 346 { 347 tls_thread = thread; 348 } 349 350 void 351 spdk_thread_exit(struct spdk_thread *thread) 352 { 353 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name); 354 355 assert(tls_thread == thread); 356 357 thread->exit = true; 358 } 359 360 void 361 spdk_thread_destroy(struct spdk_thread *thread) 362 { 363 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name); 364 365 assert(thread->exit == true); 366 367 if (tls_thread == thread) { 368 tls_thread = NULL; 369 } 370 371 _free_thread(thread); 372 } 373 374 void * 375 spdk_thread_get_ctx(struct spdk_thread *thread) 376 { 377 if (g_ctx_sz > 0) { 378 return thread->ctx; 379 } 380 381 return NULL; 382 } 383 384 struct spdk_cpuset * 385 spdk_thread_get_cpumask(struct spdk_thread *thread) 386 { 387 return &thread->cpumask; 388 } 389 390 struct spdk_thread * 391 spdk_thread_get_from_ctx(void *ctx) 392 { 393 if (ctx == NULL) { 394 assert(false); 395 return NULL; 396 } 397 398 assert(g_ctx_sz > 0); 399 400 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 401 } 402 403 static inline uint32_t 404 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 405 { 406 unsigned count, i; 407 void *messages[SPDK_MSG_BATCH_SIZE]; 408 409 #ifdef DEBUG 410 /* 411 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 412 * so we will never actually read uninitialized data from events, but just to be sure 413 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 414 */ 415 memset(messages, 0, sizeof(messages)); 416 #endif 417 418 if (max_msgs > 0) { 419 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 420 } else { 421 max_msgs = SPDK_MSG_BATCH_SIZE; 422 } 423 424 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 425 if (count == 0) { 426 return 0; 427 } 428 429 for (i = 0; i < count; i++) { 430 struct spdk_msg *msg = messages[i]; 431 432 assert(msg != NULL); 433 msg->fn(msg->arg); 434 435 if (thread->exit) { 436 break; 437 } 438 439 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 440 /* Insert the messages at the head. We want to re-use the hot 441 * ones. */ 442 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 443 thread->msg_cache_count++; 444 } else { 445 spdk_mempool_put(g_spdk_msg_mempool, msg); 446 } 447 } 448 449 return count; 450 } 451 452 static void 453 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 454 { 455 struct spdk_poller *iter; 456 457 poller->next_run_tick = now + poller->period_ticks; 458 459 /* 460 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled 461 * run time. 462 */ 463 TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) { 464 if (iter->next_run_tick <= poller->next_run_tick) { 465 TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq); 466 return; 467 } 468 } 469 470 /* No earlier pollers were found, so this poller must be the new head */ 471 TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq); 472 } 473 474 static void 475 _spdk_thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 476 { 477 if (poller->period_ticks) { 478 _spdk_poller_insert_timer(thread, poller, spdk_get_ticks()); 479 } else { 480 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 481 } 482 } 483 484 int 485 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 486 { 487 uint32_t msg_count; 488 struct spdk_thread *orig_thread; 489 struct spdk_poller *poller, *tmp; 490 spdk_msg_fn critical_msg; 491 int rc = 0; 492 493 orig_thread = _get_thread(); 494 tls_thread = thread; 495 496 if (now == 0) { 497 now = spdk_get_ticks(); 498 } 499 500 critical_msg = thread->critical_msg; 501 if (spdk_unlikely(critical_msg != NULL)) { 502 critical_msg(NULL); 503 thread->critical_msg = NULL; 504 } 505 506 msg_count = _spdk_msg_queue_run_batch(thread, max_msgs); 507 if (msg_count) { 508 rc = 1; 509 } 510 511 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 512 active_pollers_head, tailq, tmp) { 513 int poller_rc; 514 515 if (thread->exit) { 516 break; 517 } 518 519 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 520 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 521 free(poller); 522 continue; 523 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 524 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 525 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 526 poller->state = SPDK_POLLER_STATE_PAUSED; 527 continue; 528 } 529 530 poller->state = SPDK_POLLER_STATE_RUNNING; 531 poller_rc = poller->fn(poller->arg); 532 533 #ifdef DEBUG 534 if (poller_rc == -1) { 535 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller); 536 } 537 #endif 538 539 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 540 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 541 free(poller); 542 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 543 poller->state = SPDK_POLLER_STATE_WAITING; 544 } 545 546 if (poller_rc > rc) { 547 rc = poller_rc; 548 } 549 } 550 551 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) { 552 int timer_rc = 0; 553 554 if (thread->exit) { 555 break; 556 } 557 558 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 559 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 560 free(poller); 561 continue; 562 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 563 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 564 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 565 poller->state = SPDK_POLLER_STATE_PAUSED; 566 continue; 567 } 568 569 if (now < poller->next_run_tick) { 570 break; 571 } 572 573 poller->state = SPDK_POLLER_STATE_RUNNING; 574 timer_rc = poller->fn(poller->arg); 575 576 #ifdef DEBUG 577 if (timer_rc == -1) { 578 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller); 579 } 580 #endif 581 582 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 583 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 584 free(poller); 585 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 586 poller->state = SPDK_POLLER_STATE_WAITING; 587 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 588 _spdk_poller_insert_timer(thread, poller, now); 589 } 590 591 if (timer_rc > rc) { 592 rc = timer_rc; 593 } 594 } 595 596 if (rc == 0) { 597 /* Poller status idle */ 598 thread->stats.idle_tsc += now - thread->tsc_last; 599 } else if (rc > 0) { 600 /* Poller status busy */ 601 thread->stats.busy_tsc += now - thread->tsc_last; 602 } 603 thread->tsc_last = now; 604 605 tls_thread = orig_thread; 606 607 return rc; 608 } 609 610 uint64_t 611 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 612 { 613 struct spdk_poller *poller; 614 615 poller = TAILQ_FIRST(&thread->timer_pollers); 616 if (poller) { 617 return poller->next_run_tick; 618 } 619 620 return 0; 621 } 622 623 int 624 spdk_thread_has_active_pollers(struct spdk_thread *thread) 625 { 626 return !TAILQ_EMPTY(&thread->active_pollers); 627 } 628 629 static bool 630 _spdk_thread_has_unpaused_pollers(struct spdk_thread *thread) 631 { 632 if (TAILQ_EMPTY(&thread->active_pollers) && 633 TAILQ_EMPTY(&thread->timer_pollers)) { 634 return false; 635 } 636 637 return true; 638 } 639 640 bool 641 spdk_thread_has_pollers(struct spdk_thread *thread) 642 { 643 if (!_spdk_thread_has_unpaused_pollers(thread) && 644 TAILQ_EMPTY(&thread->paused_pollers)) { 645 return false; 646 } 647 648 return true; 649 } 650 651 bool 652 spdk_thread_is_idle(struct spdk_thread *thread) 653 { 654 if (spdk_ring_count(thread->messages) || 655 _spdk_thread_has_unpaused_pollers(thread) || 656 thread->critical_msg != NULL) { 657 return false; 658 } 659 660 return true; 661 } 662 663 uint32_t 664 spdk_thread_get_count(void) 665 { 666 /* 667 * Return cached value of the current thread count. We could acquire the 668 * lock and iterate through the TAILQ of threads to count them, but that 669 * count could still be invalidated after we release the lock. 670 */ 671 return g_thread_count; 672 } 673 674 struct spdk_thread * 675 spdk_get_thread(void) 676 { 677 struct spdk_thread *thread; 678 679 thread = _get_thread(); 680 if (!thread) { 681 SPDK_ERRLOG("No thread allocated\n"); 682 } 683 684 return thread; 685 } 686 687 const char * 688 spdk_thread_get_name(const struct spdk_thread *thread) 689 { 690 return thread->name; 691 } 692 693 int 694 spdk_thread_get_stats(struct spdk_thread_stats *stats) 695 { 696 struct spdk_thread *thread; 697 698 thread = _get_thread(); 699 if (!thread) { 700 SPDK_ERRLOG("No thread allocated\n"); 701 return -EINVAL; 702 } 703 704 if (stats == NULL) { 705 return -EINVAL; 706 } 707 708 *stats = thread->stats; 709 710 return 0; 711 } 712 713 int 714 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 715 { 716 struct spdk_thread *local_thread; 717 struct spdk_msg *msg; 718 int rc; 719 720 assert(thread != NULL); 721 722 local_thread = _get_thread(); 723 724 msg = NULL; 725 if (local_thread != NULL) { 726 if (local_thread->msg_cache_count > 0) { 727 msg = SLIST_FIRST(&local_thread->msg_cache); 728 assert(msg != NULL); 729 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 730 local_thread->msg_cache_count--; 731 } 732 } 733 734 if (msg == NULL) { 735 msg = spdk_mempool_get(g_spdk_msg_mempool); 736 if (!msg) { 737 SPDK_ERRLOG("msg could not be allocated\n"); 738 return -ENOMEM; 739 } 740 } 741 742 msg->fn = fn; 743 msg->arg = ctx; 744 745 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 746 if (rc != 1) { 747 SPDK_ERRLOG("msg could not be enqueued\n"); 748 spdk_mempool_put(g_spdk_msg_mempool, msg); 749 return -EIO; 750 } 751 752 return 0; 753 } 754 755 int 756 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 757 { 758 spdk_msg_fn expected = NULL; 759 760 if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 761 __ATOMIC_SEQ_CST)) { 762 return 0; 763 } 764 765 return -EIO; 766 } 767 768 struct spdk_poller * 769 spdk_poller_register(spdk_poller_fn fn, 770 void *arg, 771 uint64_t period_microseconds) 772 { 773 struct spdk_thread *thread; 774 struct spdk_poller *poller; 775 uint64_t quotient, remainder, ticks; 776 777 thread = spdk_get_thread(); 778 if (!thread) { 779 assert(false); 780 return NULL; 781 } 782 783 poller = calloc(1, sizeof(*poller)); 784 if (poller == NULL) { 785 SPDK_ERRLOG("Poller memory allocation failed\n"); 786 return NULL; 787 } 788 789 poller->state = SPDK_POLLER_STATE_WAITING; 790 poller->fn = fn; 791 poller->arg = arg; 792 793 if (period_microseconds) { 794 quotient = period_microseconds / SPDK_SEC_TO_USEC; 795 remainder = period_microseconds % SPDK_SEC_TO_USEC; 796 ticks = spdk_get_ticks_hz(); 797 798 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 799 } else { 800 poller->period_ticks = 0; 801 } 802 803 _spdk_thread_insert_poller(thread, poller); 804 805 return poller; 806 } 807 808 void 809 spdk_poller_unregister(struct spdk_poller **ppoller) 810 { 811 struct spdk_thread *thread; 812 struct spdk_poller *poller; 813 814 poller = *ppoller; 815 if (poller == NULL) { 816 return; 817 } 818 819 *ppoller = NULL; 820 821 thread = spdk_get_thread(); 822 if (!thread) { 823 assert(false); 824 return; 825 } 826 827 /* If the poller was paused, put it on the active_pollers list so that 828 * its unregistration can be processed by spdk_thread_poll(). 829 */ 830 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 831 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 832 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 833 poller->period_ticks = 0; 834 } 835 836 /* Simply set the state to unregistered. The poller will get cleaned up 837 * in a subsequent call to spdk_thread_poll(). 838 */ 839 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 840 } 841 842 void 843 spdk_poller_pause(struct spdk_poller *poller) 844 { 845 struct spdk_thread *thread; 846 847 if (poller->state == SPDK_POLLER_STATE_PAUSED || 848 poller->state == SPDK_POLLER_STATE_PAUSING) { 849 return; 850 } 851 852 thread = spdk_get_thread(); 853 if (!thread) { 854 assert(false); 855 return; 856 } 857 858 /* If a poller is paused from within itself, we can immediately move it 859 * on the paused_pollers list. Otherwise we just set its state to 860 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it. It 861 * allows a poller to be paused from another one's context without 862 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration. 863 */ 864 if (poller->state != SPDK_POLLER_STATE_RUNNING) { 865 poller->state = SPDK_POLLER_STATE_PAUSING; 866 } else { 867 if (poller->period_ticks > 0) { 868 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 869 } else { 870 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 871 } 872 873 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 874 poller->state = SPDK_POLLER_STATE_PAUSED; 875 } 876 } 877 878 void 879 spdk_poller_resume(struct spdk_poller *poller) 880 { 881 struct spdk_thread *thread; 882 883 if (poller->state != SPDK_POLLER_STATE_PAUSED && 884 poller->state != SPDK_POLLER_STATE_PAUSING) { 885 return; 886 } 887 888 thread = spdk_get_thread(); 889 if (!thread) { 890 assert(false); 891 return; 892 } 893 894 /* If a poller is paused it has to be removed from the paused pollers 895 * list and put on the active / timer list depending on its 896 * period_ticks. If a poller is still in the process of being paused, 897 * we just need to flip its state back to waiting, as it's already on 898 * the appropriate list. 899 */ 900 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 901 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 902 _spdk_thread_insert_poller(thread, poller); 903 } 904 905 poller->state = SPDK_POLLER_STATE_WAITING; 906 } 907 908 struct call_thread { 909 struct spdk_thread *cur_thread; 910 spdk_msg_fn fn; 911 void *ctx; 912 913 struct spdk_thread *orig_thread; 914 spdk_msg_fn cpl; 915 }; 916 917 static void 918 spdk_on_thread(void *ctx) 919 { 920 struct call_thread *ct = ctx; 921 922 ct->fn(ct->ctx); 923 924 pthread_mutex_lock(&g_devlist_mutex); 925 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 926 pthread_mutex_unlock(&g_devlist_mutex); 927 928 if (!ct->cur_thread) { 929 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n"); 930 931 spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 932 free(ctx); 933 } else { 934 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n", 935 ct->cur_thread->name); 936 937 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); 938 } 939 } 940 941 void 942 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 943 { 944 struct call_thread *ct; 945 struct spdk_thread *thread; 946 947 ct = calloc(1, sizeof(*ct)); 948 if (!ct) { 949 SPDK_ERRLOG("Unable to perform thread iteration\n"); 950 cpl(ctx); 951 return; 952 } 953 954 ct->fn = fn; 955 ct->ctx = ctx; 956 ct->cpl = cpl; 957 958 thread = _get_thread(); 959 if (!thread) { 960 SPDK_ERRLOG("No thread allocated\n"); 961 free(ct); 962 cpl(ctx); 963 return; 964 } 965 ct->orig_thread = thread; 966 967 pthread_mutex_lock(&g_devlist_mutex); 968 ct->cur_thread = TAILQ_FIRST(&g_threads); 969 pthread_mutex_unlock(&g_devlist_mutex); 970 971 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n", 972 ct->orig_thread->name); 973 974 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); 975 } 976 977 void 978 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 979 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 980 const char *name) 981 { 982 struct io_device *dev, *tmp; 983 struct spdk_thread *thread; 984 985 assert(io_device != NULL); 986 assert(create_cb != NULL); 987 assert(destroy_cb != NULL); 988 989 thread = spdk_get_thread(); 990 if (!thread) { 991 SPDK_ERRLOG("called from non-SPDK thread\n"); 992 assert(false); 993 return; 994 } 995 996 dev = calloc(1, sizeof(struct io_device)); 997 if (dev == NULL) { 998 SPDK_ERRLOG("could not allocate io_device\n"); 999 return; 1000 } 1001 1002 dev->io_device = io_device; 1003 if (name) { 1004 snprintf(dev->name, sizeof(dev->name), "%s", name); 1005 } else { 1006 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1007 } 1008 dev->create_cb = create_cb; 1009 dev->destroy_cb = destroy_cb; 1010 dev->unregister_cb = NULL; 1011 dev->ctx_size = ctx_size; 1012 dev->for_each_count = 0; 1013 dev->unregistered = false; 1014 dev->refcnt = 0; 1015 1016 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n", 1017 dev->name, dev->io_device, thread->name); 1018 1019 pthread_mutex_lock(&g_devlist_mutex); 1020 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 1021 if (tmp->io_device == io_device) { 1022 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1023 io_device, tmp->name, dev->name); 1024 free(dev); 1025 pthread_mutex_unlock(&g_devlist_mutex); 1026 return; 1027 } 1028 } 1029 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 1030 pthread_mutex_unlock(&g_devlist_mutex); 1031 } 1032 1033 static void 1034 _finish_unregister(void *arg) 1035 { 1036 struct io_device *dev = arg; 1037 1038 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n", 1039 dev->name, dev->io_device, dev->unregister_thread->name); 1040 1041 dev->unregister_cb(dev->io_device); 1042 free(dev); 1043 } 1044 1045 static void 1046 _spdk_io_device_free(struct io_device *dev) 1047 { 1048 if (dev->unregister_cb == NULL) { 1049 free(dev); 1050 } else { 1051 assert(dev->unregister_thread != NULL); 1052 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n", 1053 dev->name, dev->io_device, dev->unregister_thread->name); 1054 spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 1055 } 1056 } 1057 1058 void 1059 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 1060 { 1061 struct io_device *dev; 1062 uint32_t refcnt; 1063 struct spdk_thread *thread; 1064 1065 thread = spdk_get_thread(); 1066 if (!thread) { 1067 SPDK_ERRLOG("called from non-SPDK thread\n"); 1068 assert(false); 1069 return; 1070 } 1071 1072 pthread_mutex_lock(&g_devlist_mutex); 1073 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 1074 if (dev->io_device == io_device) { 1075 break; 1076 } 1077 } 1078 1079 if (!dev) { 1080 SPDK_ERRLOG("io_device %p not found\n", io_device); 1081 assert(false); 1082 pthread_mutex_unlock(&g_devlist_mutex); 1083 return; 1084 } 1085 1086 if (dev->for_each_count > 0) { 1087 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 1088 dev->name, io_device, dev->for_each_count); 1089 pthread_mutex_unlock(&g_devlist_mutex); 1090 return; 1091 } 1092 1093 dev->unregister_cb = unregister_cb; 1094 dev->unregistered = true; 1095 TAILQ_REMOVE(&g_io_devices, dev, tailq); 1096 refcnt = dev->refcnt; 1097 dev->unregister_thread = thread; 1098 pthread_mutex_unlock(&g_devlist_mutex); 1099 1100 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n", 1101 dev->name, dev->io_device, thread->name); 1102 1103 if (refcnt > 0) { 1104 /* defer deletion */ 1105 return; 1106 } 1107 1108 _spdk_io_device_free(dev); 1109 } 1110 1111 struct spdk_io_channel * 1112 spdk_get_io_channel(void *io_device) 1113 { 1114 struct spdk_io_channel *ch; 1115 struct spdk_thread *thread; 1116 struct io_device *dev; 1117 int rc; 1118 1119 pthread_mutex_lock(&g_devlist_mutex); 1120 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 1121 if (dev->io_device == io_device) { 1122 break; 1123 } 1124 } 1125 if (dev == NULL) { 1126 SPDK_ERRLOG("could not find io_device %p\n", io_device); 1127 pthread_mutex_unlock(&g_devlist_mutex); 1128 return NULL; 1129 } 1130 1131 thread = _get_thread(); 1132 if (!thread) { 1133 SPDK_ERRLOG("No thread allocated\n"); 1134 pthread_mutex_unlock(&g_devlist_mutex); 1135 return NULL; 1136 } 1137 1138 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1139 if (ch->dev == dev) { 1140 ch->ref++; 1141 1142 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1143 ch, dev->name, dev->io_device, thread->name, ch->ref); 1144 1145 /* 1146 * An I/O channel already exists for this device on this 1147 * thread, so return it. 1148 */ 1149 pthread_mutex_unlock(&g_devlist_mutex); 1150 return ch; 1151 } 1152 } 1153 1154 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 1155 if (ch == NULL) { 1156 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 1157 pthread_mutex_unlock(&g_devlist_mutex); 1158 return NULL; 1159 } 1160 1161 ch->dev = dev; 1162 ch->destroy_cb = dev->destroy_cb; 1163 ch->thread = thread; 1164 ch->ref = 1; 1165 ch->destroy_ref = 0; 1166 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 1167 1168 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1169 ch, dev->name, dev->io_device, thread->name, ch->ref); 1170 1171 dev->refcnt++; 1172 1173 pthread_mutex_unlock(&g_devlist_mutex); 1174 1175 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 1176 if (rc != 0) { 1177 pthread_mutex_lock(&g_devlist_mutex); 1178 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1179 dev->refcnt--; 1180 free(ch); 1181 pthread_mutex_unlock(&g_devlist_mutex); 1182 return NULL; 1183 } 1184 1185 return ch; 1186 } 1187 1188 static void 1189 _spdk_put_io_channel(void *arg) 1190 { 1191 struct spdk_io_channel *ch = arg; 1192 bool do_remove_dev = true; 1193 struct spdk_thread *thread; 1194 1195 thread = spdk_get_thread(); 1196 if (!thread) { 1197 SPDK_ERRLOG("called from non-SPDK thread\n"); 1198 assert(false); 1199 return; 1200 } 1201 1202 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1203 "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n", 1204 ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name); 1205 1206 assert(ch->thread == thread); 1207 1208 ch->destroy_ref--; 1209 1210 if (ch->ref > 0 || ch->destroy_ref > 0) { 1211 /* 1212 * Another reference to the associated io_device was requested 1213 * after this message was sent but before it had a chance to 1214 * execute. 1215 */ 1216 return; 1217 } 1218 1219 pthread_mutex_lock(&g_devlist_mutex); 1220 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1221 pthread_mutex_unlock(&g_devlist_mutex); 1222 1223 /* Don't hold the devlist mutex while the destroy_cb is called. */ 1224 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 1225 1226 pthread_mutex_lock(&g_devlist_mutex); 1227 ch->dev->refcnt--; 1228 1229 if (!ch->dev->unregistered) { 1230 do_remove_dev = false; 1231 } 1232 1233 if (ch->dev->refcnt > 0) { 1234 do_remove_dev = false; 1235 } 1236 1237 pthread_mutex_unlock(&g_devlist_mutex); 1238 1239 if (do_remove_dev) { 1240 _spdk_io_device_free(ch->dev); 1241 } 1242 free(ch); 1243 } 1244 1245 void 1246 spdk_put_io_channel(struct spdk_io_channel *ch) 1247 { 1248 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1249 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1250 ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref); 1251 1252 ch->ref--; 1253 1254 if (ch->ref == 0) { 1255 ch->destroy_ref++; 1256 spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch); 1257 } 1258 } 1259 1260 struct spdk_io_channel * 1261 spdk_io_channel_from_ctx(void *ctx) 1262 { 1263 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 1264 } 1265 1266 struct spdk_thread * 1267 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 1268 { 1269 return ch->thread; 1270 } 1271 1272 struct spdk_io_channel_iter { 1273 void *io_device; 1274 struct io_device *dev; 1275 spdk_channel_msg fn; 1276 int status; 1277 void *ctx; 1278 struct spdk_io_channel *ch; 1279 1280 struct spdk_thread *cur_thread; 1281 1282 struct spdk_thread *orig_thread; 1283 spdk_channel_for_each_cpl cpl; 1284 }; 1285 1286 void * 1287 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 1288 { 1289 return i->io_device; 1290 } 1291 1292 struct spdk_io_channel * 1293 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 1294 { 1295 return i->ch; 1296 } 1297 1298 void * 1299 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 1300 { 1301 return i->ctx; 1302 } 1303 1304 static void 1305 _call_completion(void *ctx) 1306 { 1307 struct spdk_io_channel_iter *i = ctx; 1308 1309 if (i->cpl != NULL) { 1310 i->cpl(i, i->status); 1311 } 1312 free(i); 1313 } 1314 1315 static void 1316 _call_channel(void *ctx) 1317 { 1318 struct spdk_io_channel_iter *i = ctx; 1319 struct spdk_io_channel *ch; 1320 1321 /* 1322 * It is possible that the channel was deleted before this 1323 * message had a chance to execute. If so, skip calling 1324 * the fn() on this thread. 1325 */ 1326 pthread_mutex_lock(&g_devlist_mutex); 1327 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 1328 if (ch->dev->io_device == i->io_device) { 1329 break; 1330 } 1331 } 1332 pthread_mutex_unlock(&g_devlist_mutex); 1333 1334 if (ch) { 1335 i->fn(i); 1336 } else { 1337 spdk_for_each_channel_continue(i, 0); 1338 } 1339 } 1340 1341 void 1342 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 1343 spdk_channel_for_each_cpl cpl) 1344 { 1345 struct spdk_thread *thread; 1346 struct spdk_io_channel *ch; 1347 struct spdk_io_channel_iter *i; 1348 int rc __attribute__((unused)); 1349 1350 i = calloc(1, sizeof(*i)); 1351 if (!i) { 1352 SPDK_ERRLOG("Unable to allocate iterator\n"); 1353 return; 1354 } 1355 1356 i->io_device = io_device; 1357 i->fn = fn; 1358 i->ctx = ctx; 1359 i->cpl = cpl; 1360 1361 pthread_mutex_lock(&g_devlist_mutex); 1362 i->orig_thread = _get_thread(); 1363 1364 TAILQ_FOREACH(thread, &g_threads, tailq) { 1365 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1366 if (ch->dev->io_device == io_device) { 1367 ch->dev->for_each_count++; 1368 i->dev = ch->dev; 1369 i->cur_thread = thread; 1370 i->ch = ch; 1371 pthread_mutex_unlock(&g_devlist_mutex); 1372 rc = spdk_thread_send_msg(thread, _call_channel, i); 1373 assert(rc == 0); 1374 return; 1375 } 1376 } 1377 } 1378 1379 pthread_mutex_unlock(&g_devlist_mutex); 1380 1381 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1382 assert(rc == 0); 1383 } 1384 1385 void 1386 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 1387 { 1388 struct spdk_thread *thread; 1389 struct spdk_io_channel *ch; 1390 1391 assert(i->cur_thread == spdk_get_thread()); 1392 1393 i->status = status; 1394 1395 pthread_mutex_lock(&g_devlist_mutex); 1396 if (status) { 1397 goto end; 1398 } 1399 thread = TAILQ_NEXT(i->cur_thread, tailq); 1400 while (thread) { 1401 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1402 if (ch->dev->io_device == i->io_device) { 1403 i->cur_thread = thread; 1404 i->ch = ch; 1405 pthread_mutex_unlock(&g_devlist_mutex); 1406 spdk_thread_send_msg(thread, _call_channel, i); 1407 return; 1408 } 1409 } 1410 thread = TAILQ_NEXT(thread, tailq); 1411 } 1412 1413 end: 1414 i->dev->for_each_count--; 1415 i->ch = NULL; 1416 pthread_mutex_unlock(&g_devlist_mutex); 1417 1418 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1419 } 1420 1421 1422 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD) 1423