1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/queue.h" 38 #include "spdk/string.h" 39 #include "spdk/thread.h" 40 #include "spdk/util.h" 41 42 #include "spdk_internal/log.h" 43 #include "spdk_internal/thread.h" 44 45 #define SPDK_MSG_BATCH_SIZE 8 46 47 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 48 49 static spdk_new_thread_fn g_new_thread_fn = NULL; 50 static size_t g_ctx_sz = 0; 51 52 struct io_device { 53 void *io_device; 54 char *name; 55 spdk_io_channel_create_cb create_cb; 56 spdk_io_channel_destroy_cb destroy_cb; 57 spdk_io_device_unregister_cb unregister_cb; 58 struct spdk_thread *unregister_thread; 59 uint32_t ctx_size; 60 uint32_t for_each_count; 61 TAILQ_ENTRY(io_device) tailq; 62 63 uint32_t refcnt; 64 65 bool unregistered; 66 }; 67 68 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 69 70 struct spdk_msg { 71 spdk_msg_fn fn; 72 void *arg; 73 74 SLIST_ENTRY(spdk_msg) link; 75 }; 76 77 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 78 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 79 80 enum spdk_poller_state { 81 /* The poller is registered with a thread but not currently executing its fn. */ 82 SPDK_POLLER_STATE_WAITING, 83 84 /* The poller is currently running its fn. */ 85 SPDK_POLLER_STATE_RUNNING, 86 87 /* The poller was unregistered during the execution of its fn. */ 88 SPDK_POLLER_STATE_UNREGISTERED, 89 }; 90 91 struct spdk_poller { 92 TAILQ_ENTRY(spdk_poller) tailq; 93 94 /* Current state of the poller; should only be accessed from the poller's thread. */ 95 enum spdk_poller_state state; 96 97 uint64_t period_ticks; 98 uint64_t next_run_tick; 99 spdk_poller_fn fn; 100 void *arg; 101 }; 102 103 struct spdk_thread { 104 TAILQ_HEAD(, spdk_io_channel) io_channels; 105 TAILQ_ENTRY(spdk_thread) tailq; 106 char *name; 107 108 struct spdk_cpuset *cpumask; 109 110 uint64_t tsc_last; 111 struct spdk_thread_stats stats; 112 113 /* 114 * Contains pollers actively running on this thread. Pollers 115 * are run round-robin. The thread takes one poller from the head 116 * of the ring, executes it, then puts it back at the tail of 117 * the ring. 118 */ 119 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 120 121 /** 122 * Contains pollers running on this thread with a periodic timer. 123 */ 124 TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; 125 126 struct spdk_ring *messages; 127 128 SLIST_HEAD(, spdk_msg) msg_cache; 129 size_t msg_cache_count; 130 131 /* User context allocated at the end */ 132 uint8_t ctx[0]; 133 }; 134 135 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 136 static uint32_t g_thread_count = 0; 137 138 static __thread struct spdk_thread *tls_thread = NULL; 139 140 static inline struct spdk_thread * 141 _get_thread(void) 142 { 143 return tls_thread; 144 } 145 146 int 147 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 148 { 149 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 150 151 assert(g_new_thread_fn == NULL); 152 g_new_thread_fn = new_thread_fn; 153 154 g_ctx_sz = ctx_sz; 155 156 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 157 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 158 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 159 sizeof(struct spdk_msg), 160 0, /* No cache. We do our own. */ 161 SPDK_ENV_SOCKET_ID_ANY); 162 163 if (!g_spdk_msg_mempool) { 164 return -1; 165 } 166 167 return 0; 168 } 169 170 void 171 spdk_thread_lib_fini(void) 172 { 173 struct io_device *dev; 174 175 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 176 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 177 } 178 179 if (g_spdk_msg_mempool) { 180 spdk_mempool_free(g_spdk_msg_mempool); 181 } 182 } 183 184 struct spdk_thread * 185 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 186 { 187 struct spdk_thread *thread; 188 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 189 int rc, i; 190 191 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 192 if (!thread) { 193 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 194 return NULL; 195 } 196 197 thread->cpumask = spdk_cpuset_alloc(); 198 if (!thread->cpumask) { 199 free(thread); 200 SPDK_ERRLOG("Unable to allocate memory for CPU mask\n"); 201 return NULL; 202 } 203 204 if (cpumask) { 205 spdk_cpuset_copy(thread->cpumask, cpumask); 206 } else { 207 spdk_cpuset_negate(thread->cpumask); 208 } 209 210 TAILQ_INIT(&thread->io_channels); 211 TAILQ_INIT(&thread->active_pollers); 212 TAILQ_INIT(&thread->timer_pollers); 213 SLIST_INIT(&thread->msg_cache); 214 thread->msg_cache_count = 0; 215 216 thread->tsc_last = spdk_get_ticks(); 217 218 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 219 if (!thread->messages) { 220 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 221 spdk_cpuset_free(thread->cpumask); 222 free(thread); 223 return NULL; 224 } 225 226 /* Fill the local message pool cache. */ 227 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 228 if (rc == 0) { 229 /* If we can't populate the cache it's ok. The cache will get filled 230 * up organically as messages are passed to the thread. */ 231 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 232 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 233 thread->msg_cache_count++; 234 } 235 } 236 237 if (name) { 238 thread->name = strdup(name); 239 } else { 240 thread->name = spdk_sprintf_alloc("%p", thread); 241 } 242 243 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name); 244 245 pthread_mutex_lock(&g_devlist_mutex); 246 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 247 g_thread_count++; 248 pthread_mutex_unlock(&g_devlist_mutex); 249 250 if (g_new_thread_fn) { 251 g_new_thread_fn(thread); 252 } 253 254 return thread; 255 } 256 257 void 258 spdk_set_thread(struct spdk_thread *thread) 259 { 260 tls_thread = thread; 261 } 262 263 void 264 spdk_thread_exit(struct spdk_thread *thread) 265 { 266 struct spdk_io_channel *ch; 267 struct spdk_msg *msg; 268 struct spdk_poller *poller, *ptmp; 269 270 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name); 271 272 if (tls_thread == thread) { 273 tls_thread = NULL; 274 } 275 276 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 277 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 278 thread->name, ch->dev->name); 279 } 280 281 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 282 if (poller->state == SPDK_POLLER_STATE_WAITING) { 283 SPDK_WARNLOG("poller %p still registered at thread exit\n", 284 poller); 285 } 286 287 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 288 free(poller); 289 } 290 291 292 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) { 293 if (poller->state == SPDK_POLLER_STATE_WAITING) { 294 SPDK_WARNLOG("poller %p still registered at thread exit\n", 295 poller); 296 } 297 298 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 299 free(poller); 300 } 301 302 spdk_cpuset_free(thread->cpumask); 303 304 pthread_mutex_lock(&g_devlist_mutex); 305 assert(g_thread_count > 0); 306 g_thread_count--; 307 TAILQ_REMOVE(&g_threads, thread, tailq); 308 pthread_mutex_unlock(&g_devlist_mutex); 309 310 free(thread->name); 311 312 msg = SLIST_FIRST(&thread->msg_cache); 313 while (msg != NULL) { 314 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 315 316 assert(thread->msg_cache_count > 0); 317 thread->msg_cache_count--; 318 spdk_mempool_put(g_spdk_msg_mempool, msg); 319 320 msg = SLIST_FIRST(&thread->msg_cache); 321 } 322 323 assert(thread->msg_cache_count == 0); 324 325 if (thread->messages) { 326 spdk_ring_free(thread->messages); 327 } 328 329 free(thread); 330 } 331 332 void * 333 spdk_thread_get_ctx(struct spdk_thread *thread) 334 { 335 if (g_ctx_sz > 0) { 336 return thread->ctx; 337 } 338 339 return NULL; 340 } 341 342 struct spdk_thread * 343 spdk_thread_get_from_ctx(void *ctx) 344 { 345 if (ctx == NULL) { 346 assert(false); 347 return NULL; 348 } 349 350 assert(g_ctx_sz > 0); 351 352 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 353 } 354 355 static inline uint32_t 356 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 357 { 358 unsigned count, i; 359 void *messages[SPDK_MSG_BATCH_SIZE]; 360 361 #ifdef DEBUG 362 /* 363 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 364 * so we will never actually read uninitialized data from events, but just to be sure 365 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 366 */ 367 memset(messages, 0, sizeof(messages)); 368 #endif 369 370 if (max_msgs > 0) { 371 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 372 } else { 373 max_msgs = SPDK_MSG_BATCH_SIZE; 374 } 375 376 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 377 if (count == 0) { 378 return 0; 379 } 380 381 for (i = 0; i < count; i++) { 382 struct spdk_msg *msg = messages[i]; 383 384 assert(msg != NULL); 385 msg->fn(msg->arg); 386 387 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 388 /* Insert the messages at the head. We want to re-use the hot 389 * ones. */ 390 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 391 thread->msg_cache_count++; 392 } else { 393 spdk_mempool_put(g_spdk_msg_mempool, msg); 394 } 395 } 396 397 return count; 398 } 399 400 static void 401 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 402 { 403 struct spdk_poller *iter; 404 405 poller->next_run_tick = now + poller->period_ticks; 406 407 /* 408 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled 409 * run time. 410 */ 411 TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) { 412 if (iter->next_run_tick <= poller->next_run_tick) { 413 TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq); 414 return; 415 } 416 } 417 418 /* No earlier pollers were found, so this poller must be the new head */ 419 TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq); 420 } 421 422 int 423 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 424 { 425 uint32_t msg_count; 426 struct spdk_thread *orig_thread; 427 struct spdk_poller *poller, *tmp; 428 int rc = 0; 429 430 orig_thread = _get_thread(); 431 tls_thread = thread; 432 433 if (now == 0) { 434 now = spdk_get_ticks(); 435 } 436 437 msg_count = _spdk_msg_queue_run_batch(thread, max_msgs); 438 if (msg_count) { 439 rc = 1; 440 } 441 442 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 443 active_pollers_head, tailq, tmp) { 444 int poller_rc; 445 446 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 447 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 448 free(poller); 449 continue; 450 } 451 452 poller->state = SPDK_POLLER_STATE_RUNNING; 453 poller_rc = poller->fn(poller->arg); 454 455 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 456 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 457 free(poller); 458 continue; 459 } 460 461 poller->state = SPDK_POLLER_STATE_WAITING; 462 463 #ifdef DEBUG 464 if (poller_rc == -1) { 465 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller); 466 } 467 #endif 468 469 if (poller_rc > rc) { 470 rc = poller_rc; 471 } 472 473 } 474 475 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) { 476 int timer_rc = 0; 477 478 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 479 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 480 free(poller); 481 continue; 482 } 483 484 if (now < poller->next_run_tick) { 485 break; 486 } 487 488 poller->state = SPDK_POLLER_STATE_RUNNING; 489 timer_rc = poller->fn(poller->arg); 490 491 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 492 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 493 free(poller); 494 continue; 495 } 496 497 poller->state = SPDK_POLLER_STATE_WAITING; 498 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 499 _spdk_poller_insert_timer(thread, poller, now); 500 501 #ifdef DEBUG 502 if (timer_rc == -1) { 503 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller); 504 } 505 #endif 506 507 if (timer_rc > rc) { 508 rc = timer_rc; 509 510 } 511 } 512 513 if (rc == 0) { 514 /* Poller status idle */ 515 thread->stats.idle_tsc += now - thread->tsc_last; 516 } else if (rc > 0) { 517 /* Poller status busy */ 518 thread->stats.busy_tsc += now - thread->tsc_last; 519 } else { 520 /* Poller status unknown */ 521 thread->stats.unknown_tsc += now - thread->tsc_last; 522 } 523 thread->tsc_last = now; 524 525 tls_thread = orig_thread; 526 527 return rc; 528 } 529 530 uint64_t 531 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 532 { 533 struct spdk_poller *poller; 534 535 poller = TAILQ_FIRST(&thread->timer_pollers); 536 if (poller) { 537 return poller->next_run_tick; 538 } 539 540 return 0; 541 } 542 543 int 544 spdk_thread_has_active_pollers(struct spdk_thread *thread) 545 { 546 return !TAILQ_EMPTY(&thread->active_pollers); 547 } 548 549 bool 550 spdk_thread_has_pollers(struct spdk_thread *thread) 551 { 552 if (TAILQ_EMPTY(&thread->active_pollers) && 553 TAILQ_EMPTY(&thread->timer_pollers)) { 554 return false; 555 } 556 557 return true; 558 } 559 560 bool 561 spdk_thread_is_idle(struct spdk_thread *thread) 562 { 563 if (spdk_ring_count(thread->messages) || 564 spdk_thread_has_pollers(thread)) { 565 return false; 566 } 567 568 return true; 569 } 570 571 uint32_t 572 spdk_thread_get_count(void) 573 { 574 /* 575 * Return cached value of the current thread count. We could acquire the 576 * lock and iterate through the TAILQ of threads to count them, but that 577 * count could still be invalidated after we release the lock. 578 */ 579 return g_thread_count; 580 } 581 582 struct spdk_thread * 583 spdk_get_thread(void) 584 { 585 struct spdk_thread *thread; 586 587 thread = _get_thread(); 588 if (!thread) { 589 SPDK_ERRLOG("No thread allocated\n"); 590 } 591 592 return thread; 593 } 594 595 const char * 596 spdk_thread_get_name(const struct spdk_thread *thread) 597 { 598 return thread->name; 599 } 600 601 int 602 spdk_thread_get_stats(struct spdk_thread_stats *stats) 603 { 604 struct spdk_thread *thread; 605 606 thread = _get_thread(); 607 if (!thread) { 608 SPDK_ERRLOG("No thread allocated\n"); 609 return -EINVAL; 610 } 611 612 if (stats == NULL) { 613 return -EINVAL; 614 } 615 616 *stats = thread->stats; 617 618 return 0; 619 } 620 621 void 622 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 623 { 624 struct spdk_thread *local_thread; 625 struct spdk_msg *msg; 626 int rc; 627 628 if (!thread) { 629 assert(false); 630 return; 631 } 632 633 local_thread = _get_thread(); 634 635 msg = NULL; 636 if (local_thread != NULL) { 637 if (local_thread->msg_cache_count > 0) { 638 msg = SLIST_FIRST(&local_thread->msg_cache); 639 assert(msg != NULL); 640 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 641 local_thread->msg_cache_count--; 642 } 643 } 644 645 if (msg == NULL) { 646 msg = spdk_mempool_get(g_spdk_msg_mempool); 647 if (!msg) { 648 assert(false); 649 return; 650 } 651 } 652 653 msg->fn = fn; 654 msg->arg = ctx; 655 656 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1); 657 if (rc != 1) { 658 assert(false); 659 spdk_mempool_put(g_spdk_msg_mempool, msg); 660 return; 661 } 662 } 663 664 struct spdk_poller * 665 spdk_poller_register(spdk_poller_fn fn, 666 void *arg, 667 uint64_t period_microseconds) 668 { 669 struct spdk_thread *thread; 670 struct spdk_poller *poller; 671 uint64_t quotient, remainder, ticks; 672 673 thread = spdk_get_thread(); 674 if (!thread) { 675 assert(false); 676 return NULL; 677 } 678 679 poller = calloc(1, sizeof(*poller)); 680 if (poller == NULL) { 681 SPDK_ERRLOG("Poller memory allocation failed\n"); 682 return NULL; 683 } 684 685 poller->state = SPDK_POLLER_STATE_WAITING; 686 poller->fn = fn; 687 poller->arg = arg; 688 689 if (period_microseconds) { 690 quotient = period_microseconds / SPDK_SEC_TO_USEC; 691 remainder = period_microseconds % SPDK_SEC_TO_USEC; 692 ticks = spdk_get_ticks_hz(); 693 694 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 695 } else { 696 poller->period_ticks = 0; 697 } 698 699 if (poller->period_ticks) { 700 _spdk_poller_insert_timer(thread, poller, spdk_get_ticks()); 701 } else { 702 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 703 } 704 705 return poller; 706 } 707 708 void 709 spdk_poller_unregister(struct spdk_poller **ppoller) 710 { 711 struct spdk_thread *thread; 712 struct spdk_poller *poller; 713 714 poller = *ppoller; 715 if (poller == NULL) { 716 return; 717 } 718 719 *ppoller = NULL; 720 721 thread = spdk_get_thread(); 722 if (!thread) { 723 assert(false); 724 return; 725 } 726 727 /* Simply set the state to unregistered. The poller will get cleaned up 728 * in a subsequent call to spdk_thread_poll(). 729 */ 730 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 731 } 732 733 struct call_thread { 734 struct spdk_thread *cur_thread; 735 spdk_msg_fn fn; 736 void *ctx; 737 738 struct spdk_thread *orig_thread; 739 spdk_msg_fn cpl; 740 }; 741 742 static void 743 spdk_on_thread(void *ctx) 744 { 745 struct call_thread *ct = ctx; 746 747 ct->fn(ct->ctx); 748 749 pthread_mutex_lock(&g_devlist_mutex); 750 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 751 pthread_mutex_unlock(&g_devlist_mutex); 752 753 if (!ct->cur_thread) { 754 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n"); 755 756 spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 757 free(ctx); 758 } else { 759 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n", 760 ct->cur_thread->name); 761 762 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); 763 } 764 } 765 766 void 767 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 768 { 769 struct call_thread *ct; 770 struct spdk_thread *thread; 771 772 ct = calloc(1, sizeof(*ct)); 773 if (!ct) { 774 SPDK_ERRLOG("Unable to perform thread iteration\n"); 775 cpl(ctx); 776 return; 777 } 778 779 ct->fn = fn; 780 ct->ctx = ctx; 781 ct->cpl = cpl; 782 783 pthread_mutex_lock(&g_devlist_mutex); 784 thread = _get_thread(); 785 if (!thread) { 786 SPDK_ERRLOG("No thread allocated\n"); 787 free(ct); 788 cpl(ctx); 789 return; 790 } 791 ct->orig_thread = thread; 792 ct->cur_thread = TAILQ_FIRST(&g_threads); 793 pthread_mutex_unlock(&g_devlist_mutex); 794 795 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n", 796 ct->orig_thread->name); 797 798 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); 799 } 800 801 void 802 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 803 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 804 const char *name) 805 { 806 struct io_device *dev, *tmp; 807 struct spdk_thread *thread; 808 809 assert(io_device != NULL); 810 assert(create_cb != NULL); 811 assert(destroy_cb != NULL); 812 813 thread = spdk_get_thread(); 814 if (!thread) { 815 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 816 assert(false); 817 return; 818 } 819 820 dev = calloc(1, sizeof(struct io_device)); 821 if (dev == NULL) { 822 SPDK_ERRLOG("could not allocate io_device\n"); 823 return; 824 } 825 826 dev->io_device = io_device; 827 if (name) { 828 dev->name = strdup(name); 829 } else { 830 dev->name = spdk_sprintf_alloc("%p", dev); 831 } 832 dev->create_cb = create_cb; 833 dev->destroy_cb = destroy_cb; 834 dev->unregister_cb = NULL; 835 dev->ctx_size = ctx_size; 836 dev->for_each_count = 0; 837 dev->unregistered = false; 838 dev->refcnt = 0; 839 840 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n", 841 dev->name, dev->io_device, thread->name); 842 843 pthread_mutex_lock(&g_devlist_mutex); 844 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 845 if (tmp->io_device == io_device) { 846 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 847 io_device, tmp->name, dev->name); 848 free(dev->name); 849 free(dev); 850 pthread_mutex_unlock(&g_devlist_mutex); 851 return; 852 } 853 } 854 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 855 pthread_mutex_unlock(&g_devlist_mutex); 856 } 857 858 static void 859 _finish_unregister(void *arg) 860 { 861 struct io_device *dev = arg; 862 863 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n", 864 dev->name, dev->io_device, dev->unregister_thread->name); 865 866 dev->unregister_cb(dev->io_device); 867 free(dev->name); 868 free(dev); 869 } 870 871 static void 872 _spdk_io_device_free(struct io_device *dev) 873 { 874 if (dev->unregister_cb == NULL) { 875 free(dev->name); 876 free(dev); 877 } else { 878 assert(dev->unregister_thread != NULL); 879 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n", 880 dev->name, dev->io_device, dev->unregister_thread->name); 881 spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 882 } 883 } 884 885 void 886 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 887 { 888 struct io_device *dev; 889 uint32_t refcnt; 890 struct spdk_thread *thread; 891 892 thread = spdk_get_thread(); 893 if (!thread) { 894 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 895 assert(false); 896 return; 897 } 898 899 pthread_mutex_lock(&g_devlist_mutex); 900 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 901 if (dev->io_device == io_device) { 902 break; 903 } 904 } 905 906 if (!dev) { 907 SPDK_ERRLOG("io_device %p not found\n", io_device); 908 assert(false); 909 pthread_mutex_unlock(&g_devlist_mutex); 910 return; 911 } 912 913 if (dev->for_each_count > 0) { 914 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 915 dev->name, io_device, dev->for_each_count); 916 pthread_mutex_unlock(&g_devlist_mutex); 917 return; 918 } 919 920 dev->unregister_cb = unregister_cb; 921 dev->unregistered = true; 922 TAILQ_REMOVE(&g_io_devices, dev, tailq); 923 refcnt = dev->refcnt; 924 dev->unregister_thread = thread; 925 pthread_mutex_unlock(&g_devlist_mutex); 926 927 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n", 928 dev->name, dev->io_device, thread->name); 929 930 if (refcnt > 0) { 931 /* defer deletion */ 932 return; 933 } 934 935 _spdk_io_device_free(dev); 936 } 937 938 struct spdk_io_channel * 939 spdk_get_io_channel(void *io_device) 940 { 941 struct spdk_io_channel *ch; 942 struct spdk_thread *thread; 943 struct io_device *dev; 944 int rc; 945 946 pthread_mutex_lock(&g_devlist_mutex); 947 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 948 if (dev->io_device == io_device) { 949 break; 950 } 951 } 952 if (dev == NULL) { 953 SPDK_ERRLOG("could not find io_device %p\n", io_device); 954 pthread_mutex_unlock(&g_devlist_mutex); 955 return NULL; 956 } 957 958 thread = _get_thread(); 959 if (!thread) { 960 SPDK_ERRLOG("No thread allocated\n"); 961 pthread_mutex_unlock(&g_devlist_mutex); 962 return NULL; 963 } 964 965 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 966 if (ch->dev == dev) { 967 ch->ref++; 968 969 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 970 ch, dev->name, dev->io_device, thread->name, ch->ref); 971 972 /* 973 * An I/O channel already exists for this device on this 974 * thread, so return it. 975 */ 976 pthread_mutex_unlock(&g_devlist_mutex); 977 return ch; 978 } 979 } 980 981 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 982 if (ch == NULL) { 983 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 984 pthread_mutex_unlock(&g_devlist_mutex); 985 return NULL; 986 } 987 988 ch->dev = dev; 989 ch->destroy_cb = dev->destroy_cb; 990 ch->thread = thread; 991 ch->ref = 1; 992 ch->destroy_ref = 0; 993 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 994 995 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 996 ch, dev->name, dev->io_device, thread->name, ch->ref); 997 998 dev->refcnt++; 999 1000 pthread_mutex_unlock(&g_devlist_mutex); 1001 1002 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 1003 if (rc != 0) { 1004 pthread_mutex_lock(&g_devlist_mutex); 1005 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1006 dev->refcnt--; 1007 free(ch); 1008 pthread_mutex_unlock(&g_devlist_mutex); 1009 return NULL; 1010 } 1011 1012 return ch; 1013 } 1014 1015 static void 1016 _spdk_put_io_channel(void *arg) 1017 { 1018 struct spdk_io_channel *ch = arg; 1019 bool do_remove_dev = true; 1020 struct spdk_thread *thread; 1021 1022 thread = spdk_get_thread(); 1023 if (!thread) { 1024 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 1025 assert(false); 1026 return; 1027 } 1028 1029 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1030 "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n", 1031 ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name); 1032 1033 assert(ch->thread == thread); 1034 1035 ch->destroy_ref--; 1036 1037 if (ch->ref > 0 || ch->destroy_ref > 0) { 1038 /* 1039 * Another reference to the associated io_device was requested 1040 * after this message was sent but before it had a chance to 1041 * execute. 1042 */ 1043 return; 1044 } 1045 1046 pthread_mutex_lock(&g_devlist_mutex); 1047 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1048 pthread_mutex_unlock(&g_devlist_mutex); 1049 1050 /* Don't hold the devlist mutex while the destroy_cb is called. */ 1051 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 1052 1053 pthread_mutex_lock(&g_devlist_mutex); 1054 ch->dev->refcnt--; 1055 1056 if (!ch->dev->unregistered) { 1057 do_remove_dev = false; 1058 } 1059 1060 if (ch->dev->refcnt > 0) { 1061 do_remove_dev = false; 1062 } 1063 1064 pthread_mutex_unlock(&g_devlist_mutex); 1065 1066 if (do_remove_dev) { 1067 _spdk_io_device_free(ch->dev); 1068 } 1069 free(ch); 1070 } 1071 1072 void 1073 spdk_put_io_channel(struct spdk_io_channel *ch) 1074 { 1075 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1076 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1077 ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref); 1078 1079 ch->ref--; 1080 1081 if (ch->ref == 0) { 1082 ch->destroy_ref++; 1083 spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch); 1084 } 1085 } 1086 1087 struct spdk_io_channel * 1088 spdk_io_channel_from_ctx(void *ctx) 1089 { 1090 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 1091 } 1092 1093 struct spdk_thread * 1094 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 1095 { 1096 return ch->thread; 1097 } 1098 1099 struct spdk_io_channel_iter { 1100 void *io_device; 1101 struct io_device *dev; 1102 spdk_channel_msg fn; 1103 int status; 1104 void *ctx; 1105 struct spdk_io_channel *ch; 1106 1107 struct spdk_thread *cur_thread; 1108 1109 struct spdk_thread *orig_thread; 1110 spdk_channel_for_each_cpl cpl; 1111 }; 1112 1113 void * 1114 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 1115 { 1116 return i->io_device; 1117 } 1118 1119 struct spdk_io_channel * 1120 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 1121 { 1122 return i->ch; 1123 } 1124 1125 void * 1126 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 1127 { 1128 return i->ctx; 1129 } 1130 1131 static void 1132 _call_completion(void *ctx) 1133 { 1134 struct spdk_io_channel_iter *i = ctx; 1135 1136 if (i->cpl != NULL) { 1137 i->cpl(i, i->status); 1138 } 1139 free(i); 1140 } 1141 1142 static void 1143 _call_channel(void *ctx) 1144 { 1145 struct spdk_io_channel_iter *i = ctx; 1146 struct spdk_io_channel *ch; 1147 1148 /* 1149 * It is possible that the channel was deleted before this 1150 * message had a chance to execute. If so, skip calling 1151 * the fn() on this thread. 1152 */ 1153 pthread_mutex_lock(&g_devlist_mutex); 1154 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 1155 if (ch->dev->io_device == i->io_device) { 1156 break; 1157 } 1158 } 1159 pthread_mutex_unlock(&g_devlist_mutex); 1160 1161 if (ch) { 1162 i->fn(i); 1163 } else { 1164 spdk_for_each_channel_continue(i, 0); 1165 } 1166 } 1167 1168 void 1169 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 1170 spdk_channel_for_each_cpl cpl) 1171 { 1172 struct spdk_thread *thread; 1173 struct spdk_io_channel *ch; 1174 struct spdk_io_channel_iter *i; 1175 1176 i = calloc(1, sizeof(*i)); 1177 if (!i) { 1178 SPDK_ERRLOG("Unable to allocate iterator\n"); 1179 return; 1180 } 1181 1182 i->io_device = io_device; 1183 i->fn = fn; 1184 i->ctx = ctx; 1185 i->cpl = cpl; 1186 1187 pthread_mutex_lock(&g_devlist_mutex); 1188 i->orig_thread = _get_thread(); 1189 1190 TAILQ_FOREACH(thread, &g_threads, tailq) { 1191 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1192 if (ch->dev->io_device == io_device) { 1193 ch->dev->for_each_count++; 1194 i->dev = ch->dev; 1195 i->cur_thread = thread; 1196 i->ch = ch; 1197 pthread_mutex_unlock(&g_devlist_mutex); 1198 spdk_thread_send_msg(thread, _call_channel, i); 1199 return; 1200 } 1201 } 1202 } 1203 1204 pthread_mutex_unlock(&g_devlist_mutex); 1205 1206 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1207 } 1208 1209 void 1210 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 1211 { 1212 struct spdk_thread *thread; 1213 struct spdk_io_channel *ch; 1214 1215 assert(i->cur_thread == spdk_get_thread()); 1216 1217 i->status = status; 1218 1219 pthread_mutex_lock(&g_devlist_mutex); 1220 if (status) { 1221 goto end; 1222 } 1223 thread = TAILQ_NEXT(i->cur_thread, tailq); 1224 while (thread) { 1225 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1226 if (ch->dev->io_device == i->io_device) { 1227 i->cur_thread = thread; 1228 i->ch = ch; 1229 pthread_mutex_unlock(&g_devlist_mutex); 1230 spdk_thread_send_msg(thread, _call_channel, i); 1231 return; 1232 } 1233 } 1234 thread = TAILQ_NEXT(thread, tailq); 1235 } 1236 1237 end: 1238 i->dev->for_each_count--; 1239 i->ch = NULL; 1240 pthread_mutex_unlock(&g_devlist_mutex); 1241 1242 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1243 } 1244 1245 1246 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD) 1247