1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/queue.h" 38 #include "spdk/string.h" 39 #include "spdk/thread.h" 40 #include "spdk/util.h" 41 42 #include "spdk_internal/log.h" 43 #include "spdk_internal/thread.h" 44 45 #define SPDK_MSG_BATCH_SIZE 8 46 47 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 48 49 static spdk_new_thread_fn g_new_thread_fn = NULL; 50 static size_t g_ctx_sz = 0; 51 52 struct io_device { 53 void *io_device; 54 char *name; 55 spdk_io_channel_create_cb create_cb; 56 spdk_io_channel_destroy_cb destroy_cb; 57 spdk_io_device_unregister_cb unregister_cb; 58 struct spdk_thread *unregister_thread; 59 uint32_t ctx_size; 60 uint32_t for_each_count; 61 TAILQ_ENTRY(io_device) tailq; 62 63 uint32_t refcnt; 64 65 bool unregistered; 66 }; 67 68 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 69 70 struct spdk_msg { 71 spdk_msg_fn fn; 72 void *arg; 73 74 SLIST_ENTRY(spdk_msg) link; 75 }; 76 77 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 78 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 79 80 enum spdk_poller_state { 81 /* The poller is registered with a thread but not currently executing its fn. */ 82 SPDK_POLLER_STATE_WAITING, 83 84 /* The poller is currently running its fn. */ 85 SPDK_POLLER_STATE_RUNNING, 86 87 /* The poller was unregistered during the execution of its fn. */ 88 SPDK_POLLER_STATE_UNREGISTERED, 89 }; 90 91 struct spdk_poller { 92 TAILQ_ENTRY(spdk_poller) tailq; 93 94 /* Current state of the poller; should only be accessed from the poller's thread. */ 95 enum spdk_poller_state state; 96 97 uint64_t period_ticks; 98 uint64_t next_run_tick; 99 spdk_poller_fn fn; 100 void *arg; 101 }; 102 103 struct spdk_thread { 104 TAILQ_HEAD(, spdk_io_channel) io_channels; 105 TAILQ_ENTRY(spdk_thread) tailq; 106 char *name; 107 108 struct spdk_cpuset *cpumask; 109 110 uint64_t tsc_last; 111 struct spdk_thread_stats stats; 112 113 /* 114 * Contains pollers actively running on this thread. Pollers 115 * are run round-robin. The thread takes one poller from the head 116 * of the ring, executes it, then puts it back at the tail of 117 * the ring. 118 */ 119 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 120 121 /** 122 * Contains pollers running on this thread with a periodic timer. 123 */ 124 TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; 125 126 struct spdk_ring *messages; 127 128 SLIST_HEAD(, spdk_msg) msg_cache; 129 size_t msg_cache_count; 130 131 /* User context allocated at the end */ 132 uint8_t ctx[0]; 133 }; 134 135 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 136 static uint32_t g_thread_count = 0; 137 138 static __thread struct spdk_thread *tls_thread = NULL; 139 140 static inline struct spdk_thread * 141 _get_thread(void) 142 { 143 return tls_thread; 144 } 145 146 int 147 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 148 { 149 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 150 151 assert(g_new_thread_fn == NULL); 152 g_new_thread_fn = new_thread_fn; 153 154 g_ctx_sz = ctx_sz; 155 156 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 157 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 158 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 159 sizeof(struct spdk_msg), 160 0, /* No cache. We do our own. */ 161 SPDK_ENV_SOCKET_ID_ANY); 162 163 if (!g_spdk_msg_mempool) { 164 return -1; 165 } 166 167 return 0; 168 } 169 170 void 171 spdk_thread_lib_fini(void) 172 { 173 struct io_device *dev; 174 175 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 176 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 177 } 178 179 if (g_spdk_msg_mempool) { 180 spdk_mempool_free(g_spdk_msg_mempool); 181 g_spdk_msg_mempool = NULL; 182 } 183 184 g_new_thread_fn = NULL; 185 g_ctx_sz = 0; 186 } 187 188 static void 189 _free_thread(struct spdk_thread *thread) 190 { 191 struct spdk_io_channel *ch; 192 struct spdk_msg *msg; 193 struct spdk_poller *poller, *ptmp; 194 195 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 196 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 197 thread->name, ch->dev->name); 198 } 199 200 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 201 if (poller->state == SPDK_POLLER_STATE_WAITING) { 202 SPDK_WARNLOG("poller %p still registered at thread exit\n", 203 poller); 204 } 205 206 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 207 free(poller); 208 } 209 210 211 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) { 212 if (poller->state == SPDK_POLLER_STATE_WAITING) { 213 SPDK_WARNLOG("poller %p still registered at thread exit\n", 214 poller); 215 } 216 217 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 218 free(poller); 219 } 220 221 pthread_mutex_lock(&g_devlist_mutex); 222 assert(g_thread_count > 0); 223 g_thread_count--; 224 TAILQ_REMOVE(&g_threads, thread, tailq); 225 pthread_mutex_unlock(&g_devlist_mutex); 226 227 spdk_cpuset_free(thread->cpumask); 228 free(thread->name); 229 230 msg = SLIST_FIRST(&thread->msg_cache); 231 while (msg != NULL) { 232 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 233 234 assert(thread->msg_cache_count > 0); 235 thread->msg_cache_count--; 236 spdk_mempool_put(g_spdk_msg_mempool, msg); 237 238 msg = SLIST_FIRST(&thread->msg_cache); 239 } 240 241 assert(thread->msg_cache_count == 0); 242 243 spdk_ring_free(thread->messages); 244 free(thread); 245 } 246 247 struct spdk_thread * 248 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 249 { 250 struct spdk_thread *thread; 251 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 252 int rc, i; 253 254 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 255 if (!thread) { 256 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 257 return NULL; 258 } 259 260 thread->cpumask = spdk_cpuset_alloc(); 261 if (!thread->cpumask) { 262 free(thread); 263 SPDK_ERRLOG("Unable to allocate memory for CPU mask\n"); 264 return NULL; 265 } 266 267 if (cpumask) { 268 spdk_cpuset_copy(thread->cpumask, cpumask); 269 } else { 270 spdk_cpuset_negate(thread->cpumask); 271 } 272 273 TAILQ_INIT(&thread->io_channels); 274 TAILQ_INIT(&thread->active_pollers); 275 TAILQ_INIT(&thread->timer_pollers); 276 SLIST_INIT(&thread->msg_cache); 277 thread->msg_cache_count = 0; 278 279 thread->tsc_last = spdk_get_ticks(); 280 281 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 282 if (!thread->messages) { 283 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 284 spdk_cpuset_free(thread->cpumask); 285 free(thread); 286 return NULL; 287 } 288 289 /* Fill the local message pool cache. */ 290 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 291 if (rc == 0) { 292 /* If we can't populate the cache it's ok. The cache will get filled 293 * up organically as messages are passed to the thread. */ 294 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 295 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 296 thread->msg_cache_count++; 297 } 298 } 299 300 if (name) { 301 thread->name = strdup(name); 302 } else { 303 thread->name = spdk_sprintf_alloc("%p", thread); 304 } 305 306 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name); 307 308 pthread_mutex_lock(&g_devlist_mutex); 309 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 310 g_thread_count++; 311 pthread_mutex_unlock(&g_devlist_mutex); 312 313 if (g_new_thread_fn) { 314 rc = g_new_thread_fn(thread); 315 if (rc != 0) { 316 _free_thread(thread); 317 return NULL; 318 } 319 } 320 321 return thread; 322 } 323 324 void 325 spdk_set_thread(struct spdk_thread *thread) 326 { 327 tls_thread = thread; 328 } 329 330 void 331 spdk_thread_exit(struct spdk_thread *thread) 332 { 333 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name); 334 335 if (tls_thread == thread) { 336 tls_thread = NULL; 337 } 338 339 _free_thread(thread); 340 } 341 342 void * 343 spdk_thread_get_ctx(struct spdk_thread *thread) 344 { 345 if (g_ctx_sz > 0) { 346 return thread->ctx; 347 } 348 349 return NULL; 350 } 351 352 struct spdk_cpuset * 353 spdk_thread_get_cpumask(struct spdk_thread *thread) 354 { 355 return thread->cpumask; 356 } 357 358 struct spdk_thread * 359 spdk_thread_get_from_ctx(void *ctx) 360 { 361 if (ctx == NULL) { 362 assert(false); 363 return NULL; 364 } 365 366 assert(g_ctx_sz > 0); 367 368 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 369 } 370 371 static inline uint32_t 372 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 373 { 374 unsigned count, i; 375 void *messages[SPDK_MSG_BATCH_SIZE]; 376 377 #ifdef DEBUG 378 /* 379 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 380 * so we will never actually read uninitialized data from events, but just to be sure 381 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 382 */ 383 memset(messages, 0, sizeof(messages)); 384 #endif 385 386 if (max_msgs > 0) { 387 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 388 } else { 389 max_msgs = SPDK_MSG_BATCH_SIZE; 390 } 391 392 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 393 if (count == 0) { 394 return 0; 395 } 396 397 for (i = 0; i < count; i++) { 398 struct spdk_msg *msg = messages[i]; 399 400 assert(msg != NULL); 401 msg->fn(msg->arg); 402 403 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 404 /* Insert the messages at the head. We want to re-use the hot 405 * ones. */ 406 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 407 thread->msg_cache_count++; 408 } else { 409 spdk_mempool_put(g_spdk_msg_mempool, msg); 410 } 411 } 412 413 return count; 414 } 415 416 static void 417 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 418 { 419 struct spdk_poller *iter; 420 421 poller->next_run_tick = now + poller->period_ticks; 422 423 /* 424 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled 425 * run time. 426 */ 427 TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) { 428 if (iter->next_run_tick <= poller->next_run_tick) { 429 TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq); 430 return; 431 } 432 } 433 434 /* No earlier pollers were found, so this poller must be the new head */ 435 TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq); 436 } 437 438 int 439 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 440 { 441 uint32_t msg_count; 442 struct spdk_thread *orig_thread; 443 struct spdk_poller *poller, *tmp; 444 int rc = 0; 445 446 orig_thread = _get_thread(); 447 tls_thread = thread; 448 449 if (now == 0) { 450 now = spdk_get_ticks(); 451 } 452 453 msg_count = _spdk_msg_queue_run_batch(thread, max_msgs); 454 if (msg_count) { 455 rc = 1; 456 } 457 458 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 459 active_pollers_head, tailq, tmp) { 460 int poller_rc; 461 462 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 463 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 464 free(poller); 465 continue; 466 } 467 468 poller->state = SPDK_POLLER_STATE_RUNNING; 469 poller_rc = poller->fn(poller->arg); 470 471 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 472 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 473 free(poller); 474 continue; 475 } 476 477 poller->state = SPDK_POLLER_STATE_WAITING; 478 479 #ifdef DEBUG 480 if (poller_rc == -1) { 481 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller); 482 } 483 #endif 484 485 if (poller_rc > rc) { 486 rc = poller_rc; 487 } 488 489 } 490 491 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) { 492 int timer_rc = 0; 493 494 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 495 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 496 free(poller); 497 continue; 498 } 499 500 if (now < poller->next_run_tick) { 501 break; 502 } 503 504 poller->state = SPDK_POLLER_STATE_RUNNING; 505 timer_rc = poller->fn(poller->arg); 506 507 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 508 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 509 free(poller); 510 continue; 511 } 512 513 poller->state = SPDK_POLLER_STATE_WAITING; 514 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 515 _spdk_poller_insert_timer(thread, poller, now); 516 517 #ifdef DEBUG 518 if (timer_rc == -1) { 519 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller); 520 } 521 #endif 522 523 if (timer_rc > rc) { 524 rc = timer_rc; 525 526 } 527 } 528 529 if (rc == 0) { 530 /* Poller status idle */ 531 thread->stats.idle_tsc += now - thread->tsc_last; 532 } else if (rc > 0) { 533 /* Poller status busy */ 534 thread->stats.busy_tsc += now - thread->tsc_last; 535 } else { 536 /* Poller status unknown */ 537 thread->stats.unknown_tsc += now - thread->tsc_last; 538 } 539 thread->tsc_last = now; 540 541 tls_thread = orig_thread; 542 543 return rc; 544 } 545 546 uint64_t 547 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 548 { 549 struct spdk_poller *poller; 550 551 poller = TAILQ_FIRST(&thread->timer_pollers); 552 if (poller) { 553 return poller->next_run_tick; 554 } 555 556 return 0; 557 } 558 559 int 560 spdk_thread_has_active_pollers(struct spdk_thread *thread) 561 { 562 return !TAILQ_EMPTY(&thread->active_pollers); 563 } 564 565 bool 566 spdk_thread_has_pollers(struct spdk_thread *thread) 567 { 568 if (TAILQ_EMPTY(&thread->active_pollers) && 569 TAILQ_EMPTY(&thread->timer_pollers)) { 570 return false; 571 } 572 573 return true; 574 } 575 576 bool 577 spdk_thread_is_idle(struct spdk_thread *thread) 578 { 579 if (spdk_ring_count(thread->messages) || 580 spdk_thread_has_pollers(thread)) { 581 return false; 582 } 583 584 return true; 585 } 586 587 uint32_t 588 spdk_thread_get_count(void) 589 { 590 /* 591 * Return cached value of the current thread count. We could acquire the 592 * lock and iterate through the TAILQ of threads to count them, but that 593 * count could still be invalidated after we release the lock. 594 */ 595 return g_thread_count; 596 } 597 598 struct spdk_thread * 599 spdk_get_thread(void) 600 { 601 struct spdk_thread *thread; 602 603 thread = _get_thread(); 604 if (!thread) { 605 SPDK_ERRLOG("No thread allocated\n"); 606 } 607 608 return thread; 609 } 610 611 const char * 612 spdk_thread_get_name(const struct spdk_thread *thread) 613 { 614 return thread->name; 615 } 616 617 int 618 spdk_thread_get_stats(struct spdk_thread_stats *stats) 619 { 620 struct spdk_thread *thread; 621 622 thread = _get_thread(); 623 if (!thread) { 624 SPDK_ERRLOG("No thread allocated\n"); 625 return -EINVAL; 626 } 627 628 if (stats == NULL) { 629 return -EINVAL; 630 } 631 632 *stats = thread->stats; 633 634 return 0; 635 } 636 637 void 638 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 639 { 640 struct spdk_thread *local_thread; 641 struct spdk_msg *msg; 642 int rc; 643 644 if (!thread) { 645 assert(false); 646 return; 647 } 648 649 local_thread = _get_thread(); 650 651 msg = NULL; 652 if (local_thread != NULL) { 653 if (local_thread->msg_cache_count > 0) { 654 msg = SLIST_FIRST(&local_thread->msg_cache); 655 assert(msg != NULL); 656 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 657 local_thread->msg_cache_count--; 658 } 659 } 660 661 if (msg == NULL) { 662 msg = spdk_mempool_get(g_spdk_msg_mempool); 663 if (!msg) { 664 assert(false); 665 return; 666 } 667 } 668 669 msg->fn = fn; 670 msg->arg = ctx; 671 672 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1); 673 if (rc != 1) { 674 assert(false); 675 spdk_mempool_put(g_spdk_msg_mempool, msg); 676 return; 677 } 678 } 679 680 struct spdk_poller * 681 spdk_poller_register(spdk_poller_fn fn, 682 void *arg, 683 uint64_t period_microseconds) 684 { 685 struct spdk_thread *thread; 686 struct spdk_poller *poller; 687 uint64_t quotient, remainder, ticks; 688 689 thread = spdk_get_thread(); 690 if (!thread) { 691 assert(false); 692 return NULL; 693 } 694 695 poller = calloc(1, sizeof(*poller)); 696 if (poller == NULL) { 697 SPDK_ERRLOG("Poller memory allocation failed\n"); 698 return NULL; 699 } 700 701 poller->state = SPDK_POLLER_STATE_WAITING; 702 poller->fn = fn; 703 poller->arg = arg; 704 705 if (period_microseconds) { 706 quotient = period_microseconds / SPDK_SEC_TO_USEC; 707 remainder = period_microseconds % SPDK_SEC_TO_USEC; 708 ticks = spdk_get_ticks_hz(); 709 710 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 711 } else { 712 poller->period_ticks = 0; 713 } 714 715 if (poller->period_ticks) { 716 _spdk_poller_insert_timer(thread, poller, spdk_get_ticks()); 717 } else { 718 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 719 } 720 721 return poller; 722 } 723 724 void 725 spdk_poller_unregister(struct spdk_poller **ppoller) 726 { 727 struct spdk_thread *thread; 728 struct spdk_poller *poller; 729 730 poller = *ppoller; 731 if (poller == NULL) { 732 return; 733 } 734 735 *ppoller = NULL; 736 737 thread = spdk_get_thread(); 738 if (!thread) { 739 assert(false); 740 return; 741 } 742 743 /* Simply set the state to unregistered. The poller will get cleaned up 744 * in a subsequent call to spdk_thread_poll(). 745 */ 746 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 747 } 748 749 struct call_thread { 750 struct spdk_thread *cur_thread; 751 spdk_msg_fn fn; 752 void *ctx; 753 754 struct spdk_thread *orig_thread; 755 spdk_msg_fn cpl; 756 }; 757 758 static void 759 spdk_on_thread(void *ctx) 760 { 761 struct call_thread *ct = ctx; 762 763 ct->fn(ct->ctx); 764 765 pthread_mutex_lock(&g_devlist_mutex); 766 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 767 pthread_mutex_unlock(&g_devlist_mutex); 768 769 if (!ct->cur_thread) { 770 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n"); 771 772 spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 773 free(ctx); 774 } else { 775 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n", 776 ct->cur_thread->name); 777 778 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); 779 } 780 } 781 782 void 783 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 784 { 785 struct call_thread *ct; 786 struct spdk_thread *thread; 787 788 ct = calloc(1, sizeof(*ct)); 789 if (!ct) { 790 SPDK_ERRLOG("Unable to perform thread iteration\n"); 791 cpl(ctx); 792 return; 793 } 794 795 ct->fn = fn; 796 ct->ctx = ctx; 797 ct->cpl = cpl; 798 799 thread = _get_thread(); 800 if (!thread) { 801 SPDK_ERRLOG("No thread allocated\n"); 802 free(ct); 803 cpl(ctx); 804 return; 805 } 806 ct->orig_thread = thread; 807 808 pthread_mutex_lock(&g_devlist_mutex); 809 ct->cur_thread = TAILQ_FIRST(&g_threads); 810 pthread_mutex_unlock(&g_devlist_mutex); 811 812 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n", 813 ct->orig_thread->name); 814 815 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); 816 } 817 818 void 819 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 820 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 821 const char *name) 822 { 823 struct io_device *dev, *tmp; 824 struct spdk_thread *thread; 825 826 assert(io_device != NULL); 827 assert(create_cb != NULL); 828 assert(destroy_cb != NULL); 829 830 thread = spdk_get_thread(); 831 if (!thread) { 832 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 833 assert(false); 834 return; 835 } 836 837 dev = calloc(1, sizeof(struct io_device)); 838 if (dev == NULL) { 839 SPDK_ERRLOG("could not allocate io_device\n"); 840 return; 841 } 842 843 dev->io_device = io_device; 844 if (name) { 845 dev->name = strdup(name); 846 } else { 847 dev->name = spdk_sprintf_alloc("%p", dev); 848 } 849 dev->create_cb = create_cb; 850 dev->destroy_cb = destroy_cb; 851 dev->unregister_cb = NULL; 852 dev->ctx_size = ctx_size; 853 dev->for_each_count = 0; 854 dev->unregistered = false; 855 dev->refcnt = 0; 856 857 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n", 858 dev->name, dev->io_device, thread->name); 859 860 pthread_mutex_lock(&g_devlist_mutex); 861 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 862 if (tmp->io_device == io_device) { 863 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 864 io_device, tmp->name, dev->name); 865 free(dev->name); 866 free(dev); 867 pthread_mutex_unlock(&g_devlist_mutex); 868 return; 869 } 870 } 871 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 872 pthread_mutex_unlock(&g_devlist_mutex); 873 } 874 875 static void 876 _finish_unregister(void *arg) 877 { 878 struct io_device *dev = arg; 879 880 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n", 881 dev->name, dev->io_device, dev->unregister_thread->name); 882 883 dev->unregister_cb(dev->io_device); 884 free(dev->name); 885 free(dev); 886 } 887 888 static void 889 _spdk_io_device_free(struct io_device *dev) 890 { 891 if (dev->unregister_cb == NULL) { 892 free(dev->name); 893 free(dev); 894 } else { 895 assert(dev->unregister_thread != NULL); 896 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n", 897 dev->name, dev->io_device, dev->unregister_thread->name); 898 spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 899 } 900 } 901 902 void 903 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 904 { 905 struct io_device *dev; 906 uint32_t refcnt; 907 struct spdk_thread *thread; 908 909 thread = spdk_get_thread(); 910 if (!thread) { 911 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 912 assert(false); 913 return; 914 } 915 916 pthread_mutex_lock(&g_devlist_mutex); 917 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 918 if (dev->io_device == io_device) { 919 break; 920 } 921 } 922 923 if (!dev) { 924 SPDK_ERRLOG("io_device %p not found\n", io_device); 925 assert(false); 926 pthread_mutex_unlock(&g_devlist_mutex); 927 return; 928 } 929 930 if (dev->for_each_count > 0) { 931 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 932 dev->name, io_device, dev->for_each_count); 933 pthread_mutex_unlock(&g_devlist_mutex); 934 return; 935 } 936 937 dev->unregister_cb = unregister_cb; 938 dev->unregistered = true; 939 TAILQ_REMOVE(&g_io_devices, dev, tailq); 940 refcnt = dev->refcnt; 941 dev->unregister_thread = thread; 942 pthread_mutex_unlock(&g_devlist_mutex); 943 944 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n", 945 dev->name, dev->io_device, thread->name); 946 947 if (refcnt > 0) { 948 /* defer deletion */ 949 return; 950 } 951 952 _spdk_io_device_free(dev); 953 } 954 955 struct spdk_io_channel * 956 spdk_get_io_channel(void *io_device) 957 { 958 struct spdk_io_channel *ch; 959 struct spdk_thread *thread; 960 struct io_device *dev; 961 int rc; 962 963 pthread_mutex_lock(&g_devlist_mutex); 964 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 965 if (dev->io_device == io_device) { 966 break; 967 } 968 } 969 if (dev == NULL) { 970 SPDK_ERRLOG("could not find io_device %p\n", io_device); 971 pthread_mutex_unlock(&g_devlist_mutex); 972 return NULL; 973 } 974 975 thread = _get_thread(); 976 if (!thread) { 977 SPDK_ERRLOG("No thread allocated\n"); 978 pthread_mutex_unlock(&g_devlist_mutex); 979 return NULL; 980 } 981 982 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 983 if (ch->dev == dev) { 984 ch->ref++; 985 986 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 987 ch, dev->name, dev->io_device, thread->name, ch->ref); 988 989 /* 990 * An I/O channel already exists for this device on this 991 * thread, so return it. 992 */ 993 pthread_mutex_unlock(&g_devlist_mutex); 994 return ch; 995 } 996 } 997 998 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 999 if (ch == NULL) { 1000 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 1001 pthread_mutex_unlock(&g_devlist_mutex); 1002 return NULL; 1003 } 1004 1005 ch->dev = dev; 1006 ch->destroy_cb = dev->destroy_cb; 1007 ch->thread = thread; 1008 ch->ref = 1; 1009 ch->destroy_ref = 0; 1010 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 1011 1012 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1013 ch, dev->name, dev->io_device, thread->name, ch->ref); 1014 1015 dev->refcnt++; 1016 1017 pthread_mutex_unlock(&g_devlist_mutex); 1018 1019 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 1020 if (rc != 0) { 1021 pthread_mutex_lock(&g_devlist_mutex); 1022 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1023 dev->refcnt--; 1024 free(ch); 1025 pthread_mutex_unlock(&g_devlist_mutex); 1026 return NULL; 1027 } 1028 1029 return ch; 1030 } 1031 1032 static void 1033 _spdk_put_io_channel(void *arg) 1034 { 1035 struct spdk_io_channel *ch = arg; 1036 bool do_remove_dev = true; 1037 struct spdk_thread *thread; 1038 1039 thread = spdk_get_thread(); 1040 if (!thread) { 1041 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 1042 assert(false); 1043 return; 1044 } 1045 1046 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1047 "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n", 1048 ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name); 1049 1050 assert(ch->thread == thread); 1051 1052 ch->destroy_ref--; 1053 1054 if (ch->ref > 0 || ch->destroy_ref > 0) { 1055 /* 1056 * Another reference to the associated io_device was requested 1057 * after this message was sent but before it had a chance to 1058 * execute. 1059 */ 1060 return; 1061 } 1062 1063 pthread_mutex_lock(&g_devlist_mutex); 1064 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1065 pthread_mutex_unlock(&g_devlist_mutex); 1066 1067 /* Don't hold the devlist mutex while the destroy_cb is called. */ 1068 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 1069 1070 pthread_mutex_lock(&g_devlist_mutex); 1071 ch->dev->refcnt--; 1072 1073 if (!ch->dev->unregistered) { 1074 do_remove_dev = false; 1075 } 1076 1077 if (ch->dev->refcnt > 0) { 1078 do_remove_dev = false; 1079 } 1080 1081 pthread_mutex_unlock(&g_devlist_mutex); 1082 1083 if (do_remove_dev) { 1084 _spdk_io_device_free(ch->dev); 1085 } 1086 free(ch); 1087 } 1088 1089 void 1090 spdk_put_io_channel(struct spdk_io_channel *ch) 1091 { 1092 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1093 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1094 ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref); 1095 1096 ch->ref--; 1097 1098 if (ch->ref == 0) { 1099 ch->destroy_ref++; 1100 spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch); 1101 } 1102 } 1103 1104 struct spdk_io_channel * 1105 spdk_io_channel_from_ctx(void *ctx) 1106 { 1107 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 1108 } 1109 1110 struct spdk_thread * 1111 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 1112 { 1113 return ch->thread; 1114 } 1115 1116 struct spdk_io_channel_iter { 1117 void *io_device; 1118 struct io_device *dev; 1119 spdk_channel_msg fn; 1120 int status; 1121 void *ctx; 1122 struct spdk_io_channel *ch; 1123 1124 struct spdk_thread *cur_thread; 1125 1126 struct spdk_thread *orig_thread; 1127 spdk_channel_for_each_cpl cpl; 1128 }; 1129 1130 void * 1131 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 1132 { 1133 return i->io_device; 1134 } 1135 1136 struct spdk_io_channel * 1137 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 1138 { 1139 return i->ch; 1140 } 1141 1142 void * 1143 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 1144 { 1145 return i->ctx; 1146 } 1147 1148 static void 1149 _call_completion(void *ctx) 1150 { 1151 struct spdk_io_channel_iter *i = ctx; 1152 1153 if (i->cpl != NULL) { 1154 i->cpl(i, i->status); 1155 } 1156 free(i); 1157 } 1158 1159 static void 1160 _call_channel(void *ctx) 1161 { 1162 struct spdk_io_channel_iter *i = ctx; 1163 struct spdk_io_channel *ch; 1164 1165 /* 1166 * It is possible that the channel was deleted before this 1167 * message had a chance to execute. If so, skip calling 1168 * the fn() on this thread. 1169 */ 1170 pthread_mutex_lock(&g_devlist_mutex); 1171 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 1172 if (ch->dev->io_device == i->io_device) { 1173 break; 1174 } 1175 } 1176 pthread_mutex_unlock(&g_devlist_mutex); 1177 1178 if (ch) { 1179 i->fn(i); 1180 } else { 1181 spdk_for_each_channel_continue(i, 0); 1182 } 1183 } 1184 1185 void 1186 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 1187 spdk_channel_for_each_cpl cpl) 1188 { 1189 struct spdk_thread *thread; 1190 struct spdk_io_channel *ch; 1191 struct spdk_io_channel_iter *i; 1192 1193 i = calloc(1, sizeof(*i)); 1194 if (!i) { 1195 SPDK_ERRLOG("Unable to allocate iterator\n"); 1196 return; 1197 } 1198 1199 i->io_device = io_device; 1200 i->fn = fn; 1201 i->ctx = ctx; 1202 i->cpl = cpl; 1203 1204 pthread_mutex_lock(&g_devlist_mutex); 1205 i->orig_thread = _get_thread(); 1206 1207 TAILQ_FOREACH(thread, &g_threads, tailq) { 1208 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1209 if (ch->dev->io_device == io_device) { 1210 ch->dev->for_each_count++; 1211 i->dev = ch->dev; 1212 i->cur_thread = thread; 1213 i->ch = ch; 1214 pthread_mutex_unlock(&g_devlist_mutex); 1215 spdk_thread_send_msg(thread, _call_channel, i); 1216 return; 1217 } 1218 } 1219 } 1220 1221 pthread_mutex_unlock(&g_devlist_mutex); 1222 1223 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1224 } 1225 1226 void 1227 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 1228 { 1229 struct spdk_thread *thread; 1230 struct spdk_io_channel *ch; 1231 1232 assert(i->cur_thread == spdk_get_thread()); 1233 1234 i->status = status; 1235 1236 pthread_mutex_lock(&g_devlist_mutex); 1237 if (status) { 1238 goto end; 1239 } 1240 thread = TAILQ_NEXT(i->cur_thread, tailq); 1241 while (thread) { 1242 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1243 if (ch->dev->io_device == i->io_device) { 1244 i->cur_thread = thread; 1245 i->ch = ch; 1246 pthread_mutex_unlock(&g_devlist_mutex); 1247 spdk_thread_send_msg(thread, _call_channel, i); 1248 return; 1249 } 1250 } 1251 thread = TAILQ_NEXT(thread, tailq); 1252 } 1253 1254 end: 1255 i->dev->for_each_count--; 1256 i->ch = NULL; 1257 pthread_mutex_unlock(&g_devlist_mutex); 1258 1259 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1260 } 1261 1262 1263 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD) 1264