1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/queue.h" 38 #include "spdk/string.h" 39 #include "spdk/thread.h" 40 #include "spdk/util.h" 41 42 #include "spdk_internal/log.h" 43 #include "spdk_internal/thread.h" 44 45 #define SPDK_MSG_BATCH_SIZE 8 46 47 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 48 49 static spdk_new_thread_fn g_new_thread_fn = NULL; 50 static size_t g_ctx_sz = 0; 51 52 struct io_device { 53 void *io_device; 54 char *name; 55 spdk_io_channel_create_cb create_cb; 56 spdk_io_channel_destroy_cb destroy_cb; 57 spdk_io_device_unregister_cb unregister_cb; 58 struct spdk_thread *unregister_thread; 59 uint32_t ctx_size; 60 uint32_t for_each_count; 61 TAILQ_ENTRY(io_device) tailq; 62 63 uint32_t refcnt; 64 65 bool unregistered; 66 }; 67 68 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 69 70 struct spdk_msg { 71 spdk_msg_fn fn; 72 void *arg; 73 74 SLIST_ENTRY(spdk_msg) link; 75 }; 76 77 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 78 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 79 80 enum spdk_poller_state { 81 /* The poller is registered with a thread but not currently executing its fn. */ 82 SPDK_POLLER_STATE_WAITING, 83 84 /* The poller is currently running its fn. */ 85 SPDK_POLLER_STATE_RUNNING, 86 87 /* The poller was unregistered during the execution of its fn. */ 88 SPDK_POLLER_STATE_UNREGISTERED, 89 }; 90 91 struct spdk_poller { 92 TAILQ_ENTRY(spdk_poller) tailq; 93 94 /* Current state of the poller; should only be accessed from the poller's thread. */ 95 enum spdk_poller_state state; 96 97 uint64_t period_ticks; 98 uint64_t next_run_tick; 99 spdk_poller_fn fn; 100 void *arg; 101 }; 102 103 struct spdk_thread { 104 TAILQ_HEAD(, spdk_io_channel) io_channels; 105 TAILQ_ENTRY(spdk_thread) tailq; 106 char *name; 107 108 bool exit; 109 110 struct spdk_cpuset *cpumask; 111 112 uint64_t tsc_last; 113 struct spdk_thread_stats stats; 114 115 /* 116 * Contains pollers actively running on this thread. Pollers 117 * are run round-robin. The thread takes one poller from the head 118 * of the ring, executes it, then puts it back at the tail of 119 * the ring. 120 */ 121 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 122 123 /** 124 * Contains pollers running on this thread with a periodic timer. 125 */ 126 TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; 127 128 struct spdk_ring *messages; 129 130 SLIST_HEAD(, spdk_msg) msg_cache; 131 size_t msg_cache_count; 132 133 /* User context allocated at the end */ 134 uint8_t ctx[0]; 135 }; 136 137 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 138 static uint32_t g_thread_count = 0; 139 140 static __thread struct spdk_thread *tls_thread = NULL; 141 142 static inline struct spdk_thread * 143 _get_thread(void) 144 { 145 return tls_thread; 146 } 147 148 int 149 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 150 { 151 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 152 153 assert(g_new_thread_fn == NULL); 154 g_new_thread_fn = new_thread_fn; 155 156 g_ctx_sz = ctx_sz; 157 158 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 159 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 160 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 161 sizeof(struct spdk_msg), 162 0, /* No cache. We do our own. */ 163 SPDK_ENV_SOCKET_ID_ANY); 164 165 if (!g_spdk_msg_mempool) { 166 return -1; 167 } 168 169 return 0; 170 } 171 172 void 173 spdk_thread_lib_fini(void) 174 { 175 struct io_device *dev; 176 177 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 178 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 179 } 180 181 if (g_spdk_msg_mempool) { 182 spdk_mempool_free(g_spdk_msg_mempool); 183 g_spdk_msg_mempool = NULL; 184 } 185 186 g_new_thread_fn = NULL; 187 g_ctx_sz = 0; 188 } 189 190 static void 191 _free_thread(struct spdk_thread *thread) 192 { 193 struct spdk_io_channel *ch; 194 struct spdk_msg *msg; 195 struct spdk_poller *poller, *ptmp; 196 197 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 198 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 199 thread->name, ch->dev->name); 200 } 201 202 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 203 if (poller->state == SPDK_POLLER_STATE_WAITING) { 204 SPDK_WARNLOG("poller %p still registered at thread exit\n", 205 poller); 206 } 207 208 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 209 free(poller); 210 } 211 212 213 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) { 214 if (poller->state == SPDK_POLLER_STATE_WAITING) { 215 SPDK_WARNLOG("poller %p still registered at thread exit\n", 216 poller); 217 } 218 219 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 220 free(poller); 221 } 222 223 pthread_mutex_lock(&g_devlist_mutex); 224 assert(g_thread_count > 0); 225 g_thread_count--; 226 TAILQ_REMOVE(&g_threads, thread, tailq); 227 pthread_mutex_unlock(&g_devlist_mutex); 228 229 spdk_cpuset_free(thread->cpumask); 230 free(thread->name); 231 232 msg = SLIST_FIRST(&thread->msg_cache); 233 while (msg != NULL) { 234 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 235 236 assert(thread->msg_cache_count > 0); 237 thread->msg_cache_count--; 238 spdk_mempool_put(g_spdk_msg_mempool, msg); 239 240 msg = SLIST_FIRST(&thread->msg_cache); 241 } 242 243 assert(thread->msg_cache_count == 0); 244 245 spdk_ring_free(thread->messages); 246 free(thread); 247 } 248 249 struct spdk_thread * 250 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 251 { 252 struct spdk_thread *thread; 253 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 254 int rc, i; 255 256 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 257 if (!thread) { 258 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 259 return NULL; 260 } 261 262 thread->cpumask = spdk_cpuset_alloc(); 263 if (!thread->cpumask) { 264 free(thread); 265 SPDK_ERRLOG("Unable to allocate memory for CPU mask\n"); 266 return NULL; 267 } 268 269 if (cpumask) { 270 spdk_cpuset_copy(thread->cpumask, cpumask); 271 } else { 272 spdk_cpuset_negate(thread->cpumask); 273 } 274 275 TAILQ_INIT(&thread->io_channels); 276 TAILQ_INIT(&thread->active_pollers); 277 TAILQ_INIT(&thread->timer_pollers); 278 SLIST_INIT(&thread->msg_cache); 279 thread->msg_cache_count = 0; 280 281 thread->tsc_last = spdk_get_ticks(); 282 283 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 284 if (!thread->messages) { 285 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 286 spdk_cpuset_free(thread->cpumask); 287 free(thread); 288 return NULL; 289 } 290 291 /* Fill the local message pool cache. */ 292 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 293 if (rc == 0) { 294 /* If we can't populate the cache it's ok. The cache will get filled 295 * up organically as messages are passed to the thread. */ 296 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 297 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 298 thread->msg_cache_count++; 299 } 300 } 301 302 if (name) { 303 thread->name = strdup(name); 304 } else { 305 thread->name = spdk_sprintf_alloc("%p", thread); 306 } 307 308 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name); 309 310 pthread_mutex_lock(&g_devlist_mutex); 311 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 312 g_thread_count++; 313 pthread_mutex_unlock(&g_devlist_mutex); 314 315 if (g_new_thread_fn) { 316 rc = g_new_thread_fn(thread); 317 if (rc != 0) { 318 _free_thread(thread); 319 return NULL; 320 } 321 } 322 323 return thread; 324 } 325 326 void 327 spdk_set_thread(struct spdk_thread *thread) 328 { 329 tls_thread = thread; 330 } 331 332 void 333 spdk_thread_exit(struct spdk_thread *thread) 334 { 335 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name); 336 337 assert(tls_thread == thread); 338 339 thread->exit = true; 340 } 341 342 void 343 spdk_thread_destroy(struct spdk_thread *thread) 344 { 345 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name); 346 347 assert(thread->exit == true); 348 349 if (tls_thread == thread) { 350 tls_thread = NULL; 351 } 352 353 _free_thread(thread); 354 } 355 356 void * 357 spdk_thread_get_ctx(struct spdk_thread *thread) 358 { 359 if (g_ctx_sz > 0) { 360 return thread->ctx; 361 } 362 363 return NULL; 364 } 365 366 struct spdk_cpuset * 367 spdk_thread_get_cpumask(struct spdk_thread *thread) 368 { 369 return thread->cpumask; 370 } 371 372 struct spdk_thread * 373 spdk_thread_get_from_ctx(void *ctx) 374 { 375 if (ctx == NULL) { 376 assert(false); 377 return NULL; 378 } 379 380 assert(g_ctx_sz > 0); 381 382 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 383 } 384 385 static inline uint32_t 386 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 387 { 388 unsigned count, i; 389 void *messages[SPDK_MSG_BATCH_SIZE]; 390 391 #ifdef DEBUG 392 /* 393 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 394 * so we will never actually read uninitialized data from events, but just to be sure 395 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 396 */ 397 memset(messages, 0, sizeof(messages)); 398 #endif 399 400 if (max_msgs > 0) { 401 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 402 } else { 403 max_msgs = SPDK_MSG_BATCH_SIZE; 404 } 405 406 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 407 if (count == 0) { 408 return 0; 409 } 410 411 for (i = 0; i < count; i++) { 412 struct spdk_msg *msg = messages[i]; 413 414 assert(msg != NULL); 415 msg->fn(msg->arg); 416 417 if (thread->exit) { 418 break; 419 } 420 421 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 422 /* Insert the messages at the head. We want to re-use the hot 423 * ones. */ 424 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 425 thread->msg_cache_count++; 426 } else { 427 spdk_mempool_put(g_spdk_msg_mempool, msg); 428 } 429 } 430 431 return count; 432 } 433 434 static void 435 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 436 { 437 struct spdk_poller *iter; 438 439 poller->next_run_tick = now + poller->period_ticks; 440 441 /* 442 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled 443 * run time. 444 */ 445 TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) { 446 if (iter->next_run_tick <= poller->next_run_tick) { 447 TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq); 448 return; 449 } 450 } 451 452 /* No earlier pollers were found, so this poller must be the new head */ 453 TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq); 454 } 455 456 int 457 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 458 { 459 uint32_t msg_count; 460 struct spdk_thread *orig_thread; 461 struct spdk_poller *poller, *tmp; 462 int rc = 0; 463 464 orig_thread = _get_thread(); 465 tls_thread = thread; 466 467 if (now == 0) { 468 now = spdk_get_ticks(); 469 } 470 471 msg_count = _spdk_msg_queue_run_batch(thread, max_msgs); 472 if (msg_count) { 473 rc = 1; 474 } 475 476 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 477 active_pollers_head, tailq, tmp) { 478 int poller_rc; 479 480 if (thread->exit) { 481 break; 482 } 483 484 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 485 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 486 free(poller); 487 continue; 488 } 489 490 poller->state = SPDK_POLLER_STATE_RUNNING; 491 poller_rc = poller->fn(poller->arg); 492 493 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 494 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 495 free(poller); 496 continue; 497 } 498 499 poller->state = SPDK_POLLER_STATE_WAITING; 500 501 #ifdef DEBUG 502 if (poller_rc == -1) { 503 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller); 504 } 505 #endif 506 507 if (poller_rc > rc) { 508 rc = poller_rc; 509 } 510 511 } 512 513 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) { 514 int timer_rc = 0; 515 516 if (thread->exit) { 517 break; 518 } 519 520 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 521 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 522 free(poller); 523 continue; 524 } 525 526 if (now < poller->next_run_tick) { 527 break; 528 } 529 530 poller->state = SPDK_POLLER_STATE_RUNNING; 531 timer_rc = poller->fn(poller->arg); 532 533 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 534 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 535 free(poller); 536 continue; 537 } 538 539 poller->state = SPDK_POLLER_STATE_WAITING; 540 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 541 _spdk_poller_insert_timer(thread, poller, now); 542 543 #ifdef DEBUG 544 if (timer_rc == -1) { 545 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller); 546 } 547 #endif 548 549 if (timer_rc > rc) { 550 rc = timer_rc; 551 552 } 553 } 554 555 if (rc == 0) { 556 /* Poller status idle */ 557 thread->stats.idle_tsc += now - thread->tsc_last; 558 } else if (rc > 0) { 559 /* Poller status busy */ 560 thread->stats.busy_tsc += now - thread->tsc_last; 561 } 562 thread->tsc_last = now; 563 564 tls_thread = orig_thread; 565 566 return rc; 567 } 568 569 uint64_t 570 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 571 { 572 struct spdk_poller *poller; 573 574 poller = TAILQ_FIRST(&thread->timer_pollers); 575 if (poller) { 576 return poller->next_run_tick; 577 } 578 579 return 0; 580 } 581 582 int 583 spdk_thread_has_active_pollers(struct spdk_thread *thread) 584 { 585 return !TAILQ_EMPTY(&thread->active_pollers); 586 } 587 588 bool 589 spdk_thread_has_pollers(struct spdk_thread *thread) 590 { 591 if (TAILQ_EMPTY(&thread->active_pollers) && 592 TAILQ_EMPTY(&thread->timer_pollers)) { 593 return false; 594 } 595 596 return true; 597 } 598 599 bool 600 spdk_thread_is_idle(struct spdk_thread *thread) 601 { 602 if (spdk_ring_count(thread->messages) || 603 spdk_thread_has_pollers(thread)) { 604 return false; 605 } 606 607 return true; 608 } 609 610 uint32_t 611 spdk_thread_get_count(void) 612 { 613 /* 614 * Return cached value of the current thread count. We could acquire the 615 * lock and iterate through the TAILQ of threads to count them, but that 616 * count could still be invalidated after we release the lock. 617 */ 618 return g_thread_count; 619 } 620 621 struct spdk_thread * 622 spdk_get_thread(void) 623 { 624 struct spdk_thread *thread; 625 626 thread = _get_thread(); 627 if (!thread) { 628 SPDK_ERRLOG("No thread allocated\n"); 629 } 630 631 return thread; 632 } 633 634 const char * 635 spdk_thread_get_name(const struct spdk_thread *thread) 636 { 637 return thread->name; 638 } 639 640 int 641 spdk_thread_get_stats(struct spdk_thread_stats *stats) 642 { 643 struct spdk_thread *thread; 644 645 thread = _get_thread(); 646 if (!thread) { 647 SPDK_ERRLOG("No thread allocated\n"); 648 return -EINVAL; 649 } 650 651 if (stats == NULL) { 652 return -EINVAL; 653 } 654 655 *stats = thread->stats; 656 657 return 0; 658 } 659 660 void 661 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 662 { 663 struct spdk_thread *local_thread; 664 struct spdk_msg *msg; 665 int rc; 666 667 if (!thread) { 668 assert(false); 669 return; 670 } 671 672 local_thread = _get_thread(); 673 674 msg = NULL; 675 if (local_thread != NULL) { 676 if (local_thread->msg_cache_count > 0) { 677 msg = SLIST_FIRST(&local_thread->msg_cache); 678 assert(msg != NULL); 679 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 680 local_thread->msg_cache_count--; 681 } 682 } 683 684 if (msg == NULL) { 685 msg = spdk_mempool_get(g_spdk_msg_mempool); 686 if (!msg) { 687 assert(false); 688 return; 689 } 690 } 691 692 msg->fn = fn; 693 msg->arg = ctx; 694 695 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 696 if (rc != 1) { 697 assert(false); 698 spdk_mempool_put(g_spdk_msg_mempool, msg); 699 return; 700 } 701 } 702 703 struct spdk_poller * 704 spdk_poller_register(spdk_poller_fn fn, 705 void *arg, 706 uint64_t period_microseconds) 707 { 708 struct spdk_thread *thread; 709 struct spdk_poller *poller; 710 uint64_t quotient, remainder, ticks; 711 712 thread = spdk_get_thread(); 713 if (!thread) { 714 assert(false); 715 return NULL; 716 } 717 718 poller = calloc(1, sizeof(*poller)); 719 if (poller == NULL) { 720 SPDK_ERRLOG("Poller memory allocation failed\n"); 721 return NULL; 722 } 723 724 poller->state = SPDK_POLLER_STATE_WAITING; 725 poller->fn = fn; 726 poller->arg = arg; 727 728 if (period_microseconds) { 729 quotient = period_microseconds / SPDK_SEC_TO_USEC; 730 remainder = period_microseconds % SPDK_SEC_TO_USEC; 731 ticks = spdk_get_ticks_hz(); 732 733 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 734 } else { 735 poller->period_ticks = 0; 736 } 737 738 if (poller->period_ticks) { 739 _spdk_poller_insert_timer(thread, poller, spdk_get_ticks()); 740 } else { 741 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 742 } 743 744 return poller; 745 } 746 747 void 748 spdk_poller_unregister(struct spdk_poller **ppoller) 749 { 750 struct spdk_thread *thread; 751 struct spdk_poller *poller; 752 753 poller = *ppoller; 754 if (poller == NULL) { 755 return; 756 } 757 758 *ppoller = NULL; 759 760 thread = spdk_get_thread(); 761 if (!thread) { 762 assert(false); 763 return; 764 } 765 766 /* Simply set the state to unregistered. The poller will get cleaned up 767 * in a subsequent call to spdk_thread_poll(). 768 */ 769 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 770 } 771 772 struct call_thread { 773 struct spdk_thread *cur_thread; 774 spdk_msg_fn fn; 775 void *ctx; 776 777 struct spdk_thread *orig_thread; 778 spdk_msg_fn cpl; 779 }; 780 781 static void 782 spdk_on_thread(void *ctx) 783 { 784 struct call_thread *ct = ctx; 785 786 ct->fn(ct->ctx); 787 788 pthread_mutex_lock(&g_devlist_mutex); 789 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 790 pthread_mutex_unlock(&g_devlist_mutex); 791 792 if (!ct->cur_thread) { 793 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n"); 794 795 spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 796 free(ctx); 797 } else { 798 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n", 799 ct->cur_thread->name); 800 801 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); 802 } 803 } 804 805 void 806 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 807 { 808 struct call_thread *ct; 809 struct spdk_thread *thread; 810 811 ct = calloc(1, sizeof(*ct)); 812 if (!ct) { 813 SPDK_ERRLOG("Unable to perform thread iteration\n"); 814 cpl(ctx); 815 return; 816 } 817 818 ct->fn = fn; 819 ct->ctx = ctx; 820 ct->cpl = cpl; 821 822 thread = _get_thread(); 823 if (!thread) { 824 SPDK_ERRLOG("No thread allocated\n"); 825 free(ct); 826 cpl(ctx); 827 return; 828 } 829 ct->orig_thread = thread; 830 831 pthread_mutex_lock(&g_devlist_mutex); 832 ct->cur_thread = TAILQ_FIRST(&g_threads); 833 pthread_mutex_unlock(&g_devlist_mutex); 834 835 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n", 836 ct->orig_thread->name); 837 838 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); 839 } 840 841 void 842 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 843 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 844 const char *name) 845 { 846 struct io_device *dev, *tmp; 847 struct spdk_thread *thread; 848 849 assert(io_device != NULL); 850 assert(create_cb != NULL); 851 assert(destroy_cb != NULL); 852 853 thread = spdk_get_thread(); 854 if (!thread) { 855 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 856 assert(false); 857 return; 858 } 859 860 dev = calloc(1, sizeof(struct io_device)); 861 if (dev == NULL) { 862 SPDK_ERRLOG("could not allocate io_device\n"); 863 return; 864 } 865 866 dev->io_device = io_device; 867 if (name) { 868 dev->name = strdup(name); 869 } else { 870 dev->name = spdk_sprintf_alloc("%p", dev); 871 } 872 dev->create_cb = create_cb; 873 dev->destroy_cb = destroy_cb; 874 dev->unregister_cb = NULL; 875 dev->ctx_size = ctx_size; 876 dev->for_each_count = 0; 877 dev->unregistered = false; 878 dev->refcnt = 0; 879 880 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n", 881 dev->name, dev->io_device, thread->name); 882 883 pthread_mutex_lock(&g_devlist_mutex); 884 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 885 if (tmp->io_device == io_device) { 886 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 887 io_device, tmp->name, dev->name); 888 free(dev->name); 889 free(dev); 890 pthread_mutex_unlock(&g_devlist_mutex); 891 return; 892 } 893 } 894 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 895 pthread_mutex_unlock(&g_devlist_mutex); 896 } 897 898 static void 899 _finish_unregister(void *arg) 900 { 901 struct io_device *dev = arg; 902 903 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n", 904 dev->name, dev->io_device, dev->unregister_thread->name); 905 906 dev->unregister_cb(dev->io_device); 907 free(dev->name); 908 free(dev); 909 } 910 911 static void 912 _spdk_io_device_free(struct io_device *dev) 913 { 914 if (dev->unregister_cb == NULL) { 915 free(dev->name); 916 free(dev); 917 } else { 918 assert(dev->unregister_thread != NULL); 919 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n", 920 dev->name, dev->io_device, dev->unregister_thread->name); 921 spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 922 } 923 } 924 925 void 926 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 927 { 928 struct io_device *dev; 929 uint32_t refcnt; 930 struct spdk_thread *thread; 931 932 thread = spdk_get_thread(); 933 if (!thread) { 934 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 935 assert(false); 936 return; 937 } 938 939 pthread_mutex_lock(&g_devlist_mutex); 940 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 941 if (dev->io_device == io_device) { 942 break; 943 } 944 } 945 946 if (!dev) { 947 SPDK_ERRLOG("io_device %p not found\n", io_device); 948 assert(false); 949 pthread_mutex_unlock(&g_devlist_mutex); 950 return; 951 } 952 953 if (dev->for_each_count > 0) { 954 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 955 dev->name, io_device, dev->for_each_count); 956 pthread_mutex_unlock(&g_devlist_mutex); 957 return; 958 } 959 960 dev->unregister_cb = unregister_cb; 961 dev->unregistered = true; 962 TAILQ_REMOVE(&g_io_devices, dev, tailq); 963 refcnt = dev->refcnt; 964 dev->unregister_thread = thread; 965 pthread_mutex_unlock(&g_devlist_mutex); 966 967 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n", 968 dev->name, dev->io_device, thread->name); 969 970 if (refcnt > 0) { 971 /* defer deletion */ 972 return; 973 } 974 975 _spdk_io_device_free(dev); 976 } 977 978 struct spdk_io_channel * 979 spdk_get_io_channel(void *io_device) 980 { 981 struct spdk_io_channel *ch; 982 struct spdk_thread *thread; 983 struct io_device *dev; 984 int rc; 985 986 pthread_mutex_lock(&g_devlist_mutex); 987 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 988 if (dev->io_device == io_device) { 989 break; 990 } 991 } 992 if (dev == NULL) { 993 SPDK_ERRLOG("could not find io_device %p\n", io_device); 994 pthread_mutex_unlock(&g_devlist_mutex); 995 return NULL; 996 } 997 998 thread = _get_thread(); 999 if (!thread) { 1000 SPDK_ERRLOG("No thread allocated\n"); 1001 pthread_mutex_unlock(&g_devlist_mutex); 1002 return NULL; 1003 } 1004 1005 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1006 if (ch->dev == dev) { 1007 ch->ref++; 1008 1009 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1010 ch, dev->name, dev->io_device, thread->name, ch->ref); 1011 1012 /* 1013 * An I/O channel already exists for this device on this 1014 * thread, so return it. 1015 */ 1016 pthread_mutex_unlock(&g_devlist_mutex); 1017 return ch; 1018 } 1019 } 1020 1021 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 1022 if (ch == NULL) { 1023 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 1024 pthread_mutex_unlock(&g_devlist_mutex); 1025 return NULL; 1026 } 1027 1028 ch->dev = dev; 1029 ch->destroy_cb = dev->destroy_cb; 1030 ch->thread = thread; 1031 ch->ref = 1; 1032 ch->destroy_ref = 0; 1033 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 1034 1035 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1036 ch, dev->name, dev->io_device, thread->name, ch->ref); 1037 1038 dev->refcnt++; 1039 1040 pthread_mutex_unlock(&g_devlist_mutex); 1041 1042 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 1043 if (rc != 0) { 1044 pthread_mutex_lock(&g_devlist_mutex); 1045 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1046 dev->refcnt--; 1047 free(ch); 1048 pthread_mutex_unlock(&g_devlist_mutex); 1049 return NULL; 1050 } 1051 1052 return ch; 1053 } 1054 1055 static void 1056 _spdk_put_io_channel(void *arg) 1057 { 1058 struct spdk_io_channel *ch = arg; 1059 bool do_remove_dev = true; 1060 struct spdk_thread *thread; 1061 1062 thread = spdk_get_thread(); 1063 if (!thread) { 1064 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 1065 assert(false); 1066 return; 1067 } 1068 1069 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1070 "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n", 1071 ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name); 1072 1073 assert(ch->thread == thread); 1074 1075 ch->destroy_ref--; 1076 1077 if (ch->ref > 0 || ch->destroy_ref > 0) { 1078 /* 1079 * Another reference to the associated io_device was requested 1080 * after this message was sent but before it had a chance to 1081 * execute. 1082 */ 1083 return; 1084 } 1085 1086 pthread_mutex_lock(&g_devlist_mutex); 1087 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1088 pthread_mutex_unlock(&g_devlist_mutex); 1089 1090 /* Don't hold the devlist mutex while the destroy_cb is called. */ 1091 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 1092 1093 pthread_mutex_lock(&g_devlist_mutex); 1094 ch->dev->refcnt--; 1095 1096 if (!ch->dev->unregistered) { 1097 do_remove_dev = false; 1098 } 1099 1100 if (ch->dev->refcnt > 0) { 1101 do_remove_dev = false; 1102 } 1103 1104 pthread_mutex_unlock(&g_devlist_mutex); 1105 1106 if (do_remove_dev) { 1107 _spdk_io_device_free(ch->dev); 1108 } 1109 free(ch); 1110 } 1111 1112 void 1113 spdk_put_io_channel(struct spdk_io_channel *ch) 1114 { 1115 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1116 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1117 ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref); 1118 1119 ch->ref--; 1120 1121 if (ch->ref == 0) { 1122 ch->destroy_ref++; 1123 spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch); 1124 } 1125 } 1126 1127 struct spdk_io_channel * 1128 spdk_io_channel_from_ctx(void *ctx) 1129 { 1130 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 1131 } 1132 1133 struct spdk_thread * 1134 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 1135 { 1136 return ch->thread; 1137 } 1138 1139 struct spdk_io_channel_iter { 1140 void *io_device; 1141 struct io_device *dev; 1142 spdk_channel_msg fn; 1143 int status; 1144 void *ctx; 1145 struct spdk_io_channel *ch; 1146 1147 struct spdk_thread *cur_thread; 1148 1149 struct spdk_thread *orig_thread; 1150 spdk_channel_for_each_cpl cpl; 1151 }; 1152 1153 void * 1154 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 1155 { 1156 return i->io_device; 1157 } 1158 1159 struct spdk_io_channel * 1160 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 1161 { 1162 return i->ch; 1163 } 1164 1165 void * 1166 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 1167 { 1168 return i->ctx; 1169 } 1170 1171 static void 1172 _call_completion(void *ctx) 1173 { 1174 struct spdk_io_channel_iter *i = ctx; 1175 1176 if (i->cpl != NULL) { 1177 i->cpl(i, i->status); 1178 } 1179 free(i); 1180 } 1181 1182 static void 1183 _call_channel(void *ctx) 1184 { 1185 struct spdk_io_channel_iter *i = ctx; 1186 struct spdk_io_channel *ch; 1187 1188 /* 1189 * It is possible that the channel was deleted before this 1190 * message had a chance to execute. If so, skip calling 1191 * the fn() on this thread. 1192 */ 1193 pthread_mutex_lock(&g_devlist_mutex); 1194 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 1195 if (ch->dev->io_device == i->io_device) { 1196 break; 1197 } 1198 } 1199 pthread_mutex_unlock(&g_devlist_mutex); 1200 1201 if (ch) { 1202 i->fn(i); 1203 } else { 1204 spdk_for_each_channel_continue(i, 0); 1205 } 1206 } 1207 1208 void 1209 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 1210 spdk_channel_for_each_cpl cpl) 1211 { 1212 struct spdk_thread *thread; 1213 struct spdk_io_channel *ch; 1214 struct spdk_io_channel_iter *i; 1215 1216 i = calloc(1, sizeof(*i)); 1217 if (!i) { 1218 SPDK_ERRLOG("Unable to allocate iterator\n"); 1219 return; 1220 } 1221 1222 i->io_device = io_device; 1223 i->fn = fn; 1224 i->ctx = ctx; 1225 i->cpl = cpl; 1226 1227 pthread_mutex_lock(&g_devlist_mutex); 1228 i->orig_thread = _get_thread(); 1229 1230 TAILQ_FOREACH(thread, &g_threads, tailq) { 1231 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1232 if (ch->dev->io_device == io_device) { 1233 ch->dev->for_each_count++; 1234 i->dev = ch->dev; 1235 i->cur_thread = thread; 1236 i->ch = ch; 1237 pthread_mutex_unlock(&g_devlist_mutex); 1238 spdk_thread_send_msg(thread, _call_channel, i); 1239 return; 1240 } 1241 } 1242 } 1243 1244 pthread_mutex_unlock(&g_devlist_mutex); 1245 1246 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1247 } 1248 1249 void 1250 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 1251 { 1252 struct spdk_thread *thread; 1253 struct spdk_io_channel *ch; 1254 1255 assert(i->cur_thread == spdk_get_thread()); 1256 1257 i->status = status; 1258 1259 pthread_mutex_lock(&g_devlist_mutex); 1260 if (status) { 1261 goto end; 1262 } 1263 thread = TAILQ_NEXT(i->cur_thread, tailq); 1264 while (thread) { 1265 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1266 if (ch->dev->io_device == i->io_device) { 1267 i->cur_thread = thread; 1268 i->ch = ch; 1269 pthread_mutex_unlock(&g_devlist_mutex); 1270 spdk_thread_send_msg(thread, _call_channel, i); 1271 return; 1272 } 1273 } 1274 thread = TAILQ_NEXT(thread, tailq); 1275 } 1276 1277 end: 1278 i->dev->for_each_count--; 1279 i->ch = NULL; 1280 pthread_mutex_unlock(&g_devlist_mutex); 1281 1282 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1283 } 1284 1285 1286 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD) 1287