1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/queue.h" 38 #include "spdk/string.h" 39 #include "spdk/thread.h" 40 #include "spdk/util.h" 41 42 #include "spdk_internal/log.h" 43 #include "spdk_internal/thread.h" 44 45 #ifdef __linux__ 46 #include <sys/prctl.h> 47 #endif 48 49 #ifdef __FreeBSD__ 50 #include <pthread_np.h> 51 #endif 52 53 #define SPDK_MSG_BATCH_SIZE 8 54 55 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 56 57 static spdk_new_thread_fn g_new_thread_fn = NULL; 58 static size_t g_ctx_sz = 0; 59 60 struct io_device { 61 void *io_device; 62 char *name; 63 spdk_io_channel_create_cb create_cb; 64 spdk_io_channel_destroy_cb destroy_cb; 65 spdk_io_device_unregister_cb unregister_cb; 66 struct spdk_thread *unregister_thread; 67 uint32_t ctx_size; 68 uint32_t for_each_count; 69 TAILQ_ENTRY(io_device) tailq; 70 71 uint32_t refcnt; 72 73 bool unregistered; 74 }; 75 76 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 77 78 struct spdk_msg { 79 spdk_msg_fn fn; 80 void *arg; 81 82 SLIST_ENTRY(spdk_msg) link; 83 }; 84 85 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 86 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 87 88 enum spdk_poller_state { 89 /* The poller is registered with a thread but not currently executing its fn. */ 90 SPDK_POLLER_STATE_WAITING, 91 92 /* The poller is currently running its fn. */ 93 SPDK_POLLER_STATE_RUNNING, 94 95 /* The poller was unregistered during the execution of its fn. */ 96 SPDK_POLLER_STATE_UNREGISTERED, 97 }; 98 99 struct spdk_poller { 100 TAILQ_ENTRY(spdk_poller) tailq; 101 102 /* Current state of the poller; should only be accessed from the poller's thread. */ 103 enum spdk_poller_state state; 104 105 uint64_t period_ticks; 106 uint64_t next_run_tick; 107 spdk_poller_fn fn; 108 void *arg; 109 }; 110 111 struct spdk_thread { 112 TAILQ_HEAD(, spdk_io_channel) io_channels; 113 TAILQ_ENTRY(spdk_thread) tailq; 114 char *name; 115 116 uint64_t tsc_last; 117 struct spdk_thread_stats stats; 118 119 /* 120 * Contains pollers actively running on this thread. Pollers 121 * are run round-robin. The thread takes one poller from the head 122 * of the ring, executes it, then puts it back at the tail of 123 * the ring. 124 */ 125 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 126 127 /** 128 * Contains pollers running on this thread with a periodic timer. 129 */ 130 TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; 131 132 struct spdk_ring *messages; 133 134 SLIST_HEAD(, spdk_msg) msg_cache; 135 size_t msg_cache_count; 136 137 /* User context allocated at the end */ 138 uint8_t ctx[0]; 139 }; 140 141 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 142 static uint32_t g_thread_count = 0; 143 144 static __thread struct spdk_thread *tls_thread = NULL; 145 146 static inline struct spdk_thread * 147 _get_thread(void) 148 { 149 return tls_thread; 150 } 151 152 static void 153 _set_thread_name(const char *thread_name) 154 { 155 #if defined(__linux__) 156 prctl(PR_SET_NAME, thread_name, 0, 0, 0); 157 #elif defined(__FreeBSD__) 158 pthread_set_name_np(pthread_self(), thread_name); 159 #else 160 #error missing platform support for thread name 161 #endif 162 } 163 164 int 165 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 166 { 167 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 168 169 assert(g_new_thread_fn == NULL); 170 g_new_thread_fn = new_thread_fn; 171 172 g_ctx_sz = ctx_sz; 173 174 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 175 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 176 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 177 sizeof(struct spdk_msg), 178 0, /* No cache. We do our own. */ 179 SPDK_ENV_SOCKET_ID_ANY); 180 181 if (!g_spdk_msg_mempool) { 182 return -1; 183 } 184 185 return 0; 186 } 187 188 void 189 spdk_thread_lib_fini(void) 190 { 191 struct io_device *dev; 192 193 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 194 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 195 } 196 197 if (g_spdk_msg_mempool) { 198 spdk_mempool_free(g_spdk_msg_mempool); 199 } 200 } 201 202 struct spdk_thread * 203 spdk_thread_create(const char *name) 204 { 205 struct spdk_thread *thread; 206 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 207 int rc, i; 208 209 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 210 if (!thread) { 211 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 212 return NULL; 213 } 214 215 TAILQ_INIT(&thread->io_channels); 216 TAILQ_INIT(&thread->active_pollers); 217 TAILQ_INIT(&thread->timer_pollers); 218 SLIST_INIT(&thread->msg_cache); 219 thread->msg_cache_count = 0; 220 221 thread->tsc_last = spdk_get_ticks(); 222 223 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 224 if (!thread->messages) { 225 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 226 free(thread); 227 return NULL; 228 } 229 230 /* Fill the local message pool cache. */ 231 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 232 if (rc == 0) { 233 /* If we can't populate the cache it's ok. The cache will get filled 234 * up organically as messages are passed to the thread. */ 235 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 236 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 237 thread->msg_cache_count++; 238 } 239 } 240 241 if (name) { 242 _set_thread_name(name); 243 thread->name = strdup(name); 244 } else { 245 thread->name = spdk_sprintf_alloc("%p", thread); 246 } 247 248 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name); 249 250 pthread_mutex_lock(&g_devlist_mutex); 251 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 252 g_thread_count++; 253 pthread_mutex_unlock(&g_devlist_mutex); 254 255 if (g_new_thread_fn) { 256 g_new_thread_fn(thread); 257 } 258 259 return thread; 260 } 261 262 void 263 spdk_set_thread(struct spdk_thread *thread) 264 { 265 tls_thread = thread; 266 } 267 268 void 269 spdk_thread_exit(struct spdk_thread *thread) 270 { 271 struct spdk_io_channel *ch; 272 struct spdk_msg *msg; 273 struct spdk_poller *poller, *ptmp; 274 275 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name); 276 277 if (tls_thread == thread) { 278 tls_thread = NULL; 279 } 280 281 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 282 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 283 thread->name, ch->dev->name); 284 } 285 286 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 287 if (poller->state == SPDK_POLLER_STATE_WAITING) { 288 SPDK_WARNLOG("poller %p still registered at thread exit\n", 289 poller); 290 } 291 292 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 293 free(poller); 294 } 295 296 297 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) { 298 if (poller->state == SPDK_POLLER_STATE_WAITING) { 299 SPDK_WARNLOG("poller %p still registered at thread exit\n", 300 poller); 301 } 302 303 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 304 free(poller); 305 } 306 307 pthread_mutex_lock(&g_devlist_mutex); 308 assert(g_thread_count > 0); 309 g_thread_count--; 310 TAILQ_REMOVE(&g_threads, thread, tailq); 311 pthread_mutex_unlock(&g_devlist_mutex); 312 313 free(thread->name); 314 315 msg = SLIST_FIRST(&thread->msg_cache); 316 while (msg != NULL) { 317 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 318 319 assert(thread->msg_cache_count > 0); 320 thread->msg_cache_count--; 321 spdk_mempool_put(g_spdk_msg_mempool, msg); 322 323 msg = SLIST_FIRST(&thread->msg_cache); 324 } 325 326 assert(thread->msg_cache_count == 0); 327 328 if (thread->messages) { 329 spdk_ring_free(thread->messages); 330 } 331 332 free(thread); 333 } 334 335 void * 336 spdk_thread_get_ctx(struct spdk_thread *thread) 337 { 338 if (g_ctx_sz > 0) { 339 return thread->ctx; 340 } 341 342 return NULL; 343 } 344 345 struct spdk_thread * 346 spdk_thread_get_from_ctx(void *ctx) 347 { 348 if (ctx == NULL) { 349 assert(false); 350 return NULL; 351 } 352 353 assert(g_ctx_sz > 0); 354 355 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 356 } 357 358 static inline uint32_t 359 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 360 { 361 unsigned count, i; 362 void *messages[SPDK_MSG_BATCH_SIZE]; 363 364 #ifdef DEBUG 365 /* 366 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 367 * so we will never actually read uninitialized data from events, but just to be sure 368 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 369 */ 370 memset(messages, 0, sizeof(messages)); 371 #endif 372 373 if (max_msgs > 0) { 374 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 375 } else { 376 max_msgs = SPDK_MSG_BATCH_SIZE; 377 } 378 379 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 380 if (count == 0) { 381 return 0; 382 } 383 384 for (i = 0; i < count; i++) { 385 struct spdk_msg *msg = messages[i]; 386 387 assert(msg != NULL); 388 msg->fn(msg->arg); 389 390 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 391 /* Insert the messages at the head. We want to re-use the hot 392 * ones. */ 393 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 394 thread->msg_cache_count++; 395 } else { 396 spdk_mempool_put(g_spdk_msg_mempool, msg); 397 } 398 } 399 400 return count; 401 } 402 403 static void 404 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 405 { 406 struct spdk_poller *iter; 407 408 poller->next_run_tick = now + poller->period_ticks; 409 410 /* 411 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled 412 * run time. 413 */ 414 TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) { 415 if (iter->next_run_tick <= poller->next_run_tick) { 416 TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq); 417 return; 418 } 419 } 420 421 /* No earlier pollers were found, so this poller must be the new head */ 422 TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq); 423 } 424 425 int 426 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 427 { 428 uint32_t msg_count; 429 struct spdk_thread *orig_thread; 430 struct spdk_poller *poller, *tmp; 431 int rc = 0; 432 433 orig_thread = _get_thread(); 434 tls_thread = thread; 435 436 if (now == 0) { 437 now = spdk_get_ticks(); 438 } 439 440 msg_count = _spdk_msg_queue_run_batch(thread, max_msgs); 441 if (msg_count) { 442 rc = 1; 443 } 444 445 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 446 active_pollers_head, tailq, tmp) { 447 int poller_rc; 448 449 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 450 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 451 free(poller); 452 continue; 453 } 454 455 poller->state = SPDK_POLLER_STATE_RUNNING; 456 poller_rc = poller->fn(poller->arg); 457 458 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 459 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 460 free(poller); 461 continue; 462 } 463 464 poller->state = SPDK_POLLER_STATE_WAITING; 465 466 #ifdef DEBUG 467 if (poller_rc == -1) { 468 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller); 469 } 470 #endif 471 472 if (poller_rc > rc) { 473 rc = poller_rc; 474 } 475 476 } 477 478 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) { 479 int timer_rc = 0; 480 481 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 482 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 483 free(poller); 484 continue; 485 } 486 487 if (now < poller->next_run_tick) { 488 break; 489 } 490 491 poller->state = SPDK_POLLER_STATE_RUNNING; 492 timer_rc = poller->fn(poller->arg); 493 494 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 495 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 496 free(poller); 497 continue; 498 } 499 500 poller->state = SPDK_POLLER_STATE_WAITING; 501 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 502 _spdk_poller_insert_timer(thread, poller, now); 503 504 #ifdef DEBUG 505 if (timer_rc == -1) { 506 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller); 507 } 508 #endif 509 510 if (timer_rc > rc) { 511 rc = timer_rc; 512 513 } 514 } 515 516 if (rc == 0) { 517 /* Poller status idle */ 518 thread->stats.idle_tsc += now - thread->tsc_last; 519 } else if (rc > 0) { 520 /* Poller status busy */ 521 thread->stats.busy_tsc += now - thread->tsc_last; 522 } else { 523 /* Poller status unknown */ 524 thread->stats.unknown_tsc += now - thread->tsc_last; 525 } 526 thread->tsc_last = now; 527 528 tls_thread = orig_thread; 529 530 return rc; 531 } 532 533 uint64_t 534 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 535 { 536 struct spdk_poller *poller; 537 538 poller = TAILQ_FIRST(&thread->timer_pollers); 539 if (poller) { 540 return poller->next_run_tick; 541 } 542 543 return 0; 544 } 545 546 int 547 spdk_thread_has_active_pollers(struct spdk_thread *thread) 548 { 549 return !TAILQ_EMPTY(&thread->active_pollers); 550 } 551 552 bool 553 spdk_thread_has_pollers(struct spdk_thread *thread) 554 { 555 if (TAILQ_EMPTY(&thread->active_pollers) && 556 TAILQ_EMPTY(&thread->timer_pollers)) { 557 return false; 558 } 559 560 return true; 561 } 562 563 bool 564 spdk_thread_is_idle(struct spdk_thread *thread) 565 { 566 if (spdk_ring_count(thread->messages) || 567 spdk_thread_has_pollers(thread)) { 568 return false; 569 } 570 571 return true; 572 } 573 574 uint32_t 575 spdk_thread_get_count(void) 576 { 577 /* 578 * Return cached value of the current thread count. We could acquire the 579 * lock and iterate through the TAILQ of threads to count them, but that 580 * count could still be invalidated after we release the lock. 581 */ 582 return g_thread_count; 583 } 584 585 struct spdk_thread * 586 spdk_get_thread(void) 587 { 588 struct spdk_thread *thread; 589 590 thread = _get_thread(); 591 if (!thread) { 592 SPDK_ERRLOG("No thread allocated\n"); 593 } 594 595 return thread; 596 } 597 598 const char * 599 spdk_thread_get_name(const struct spdk_thread *thread) 600 { 601 return thread->name; 602 } 603 604 int 605 spdk_thread_get_stats(struct spdk_thread_stats *stats) 606 { 607 struct spdk_thread *thread; 608 609 thread = _get_thread(); 610 if (!thread) { 611 SPDK_ERRLOG("No thread allocated\n"); 612 return -EINVAL; 613 } 614 615 if (stats == NULL) { 616 return -EINVAL; 617 } 618 619 *stats = thread->stats; 620 621 return 0; 622 } 623 624 void 625 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 626 { 627 struct spdk_thread *local_thread; 628 struct spdk_msg *msg; 629 int rc; 630 631 if (!thread) { 632 assert(false); 633 return; 634 } 635 636 local_thread = _get_thread(); 637 638 msg = NULL; 639 if (local_thread != NULL) { 640 if (local_thread->msg_cache_count > 0) { 641 msg = SLIST_FIRST(&local_thread->msg_cache); 642 assert(msg != NULL); 643 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 644 local_thread->msg_cache_count--; 645 } 646 } 647 648 if (msg == NULL) { 649 msg = spdk_mempool_get(g_spdk_msg_mempool); 650 if (!msg) { 651 assert(false); 652 return; 653 } 654 } 655 656 msg->fn = fn; 657 msg->arg = ctx; 658 659 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1); 660 if (rc != 1) { 661 assert(false); 662 spdk_mempool_put(g_spdk_msg_mempool, msg); 663 return; 664 } 665 } 666 667 struct spdk_poller * 668 spdk_poller_register(spdk_poller_fn fn, 669 void *arg, 670 uint64_t period_microseconds) 671 { 672 struct spdk_thread *thread; 673 struct spdk_poller *poller; 674 uint64_t quotient, remainder, ticks; 675 676 thread = spdk_get_thread(); 677 if (!thread) { 678 assert(false); 679 return NULL; 680 } 681 682 poller = calloc(1, sizeof(*poller)); 683 if (poller == NULL) { 684 SPDK_ERRLOG("Poller memory allocation failed\n"); 685 return NULL; 686 } 687 688 poller->state = SPDK_POLLER_STATE_WAITING; 689 poller->fn = fn; 690 poller->arg = arg; 691 692 if (period_microseconds) { 693 quotient = period_microseconds / SPDK_SEC_TO_USEC; 694 remainder = period_microseconds % SPDK_SEC_TO_USEC; 695 ticks = spdk_get_ticks_hz(); 696 697 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 698 } else { 699 poller->period_ticks = 0; 700 } 701 702 if (poller->period_ticks) { 703 _spdk_poller_insert_timer(thread, poller, spdk_get_ticks()); 704 } else { 705 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 706 } 707 708 return poller; 709 } 710 711 void 712 spdk_poller_unregister(struct spdk_poller **ppoller) 713 { 714 struct spdk_thread *thread; 715 struct spdk_poller *poller; 716 717 poller = *ppoller; 718 if (poller == NULL) { 719 return; 720 } 721 722 *ppoller = NULL; 723 724 thread = spdk_get_thread(); 725 if (!thread) { 726 assert(false); 727 return; 728 } 729 730 /* Simply set the state to unregistered. The poller will get cleaned up 731 * in a subsequent call to spdk_thread_poll(). 732 */ 733 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 734 } 735 736 struct call_thread { 737 struct spdk_thread *cur_thread; 738 spdk_msg_fn fn; 739 void *ctx; 740 741 struct spdk_thread *orig_thread; 742 spdk_msg_fn cpl; 743 }; 744 745 static void 746 spdk_on_thread(void *ctx) 747 { 748 struct call_thread *ct = ctx; 749 750 ct->fn(ct->ctx); 751 752 pthread_mutex_lock(&g_devlist_mutex); 753 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 754 pthread_mutex_unlock(&g_devlist_mutex); 755 756 if (!ct->cur_thread) { 757 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n"); 758 759 spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 760 free(ctx); 761 } else { 762 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n", 763 ct->cur_thread->name); 764 765 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); 766 } 767 } 768 769 void 770 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 771 { 772 struct call_thread *ct; 773 struct spdk_thread *thread; 774 775 ct = calloc(1, sizeof(*ct)); 776 if (!ct) { 777 SPDK_ERRLOG("Unable to perform thread iteration\n"); 778 cpl(ctx); 779 return; 780 } 781 782 ct->fn = fn; 783 ct->ctx = ctx; 784 ct->cpl = cpl; 785 786 pthread_mutex_lock(&g_devlist_mutex); 787 thread = _get_thread(); 788 if (!thread) { 789 SPDK_ERRLOG("No thread allocated\n"); 790 free(ct); 791 cpl(ctx); 792 return; 793 } 794 ct->orig_thread = thread; 795 ct->cur_thread = TAILQ_FIRST(&g_threads); 796 pthread_mutex_unlock(&g_devlist_mutex); 797 798 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n", 799 ct->orig_thread->name); 800 801 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); 802 } 803 804 void 805 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 806 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 807 const char *name) 808 { 809 struct io_device *dev, *tmp; 810 struct spdk_thread *thread; 811 812 assert(io_device != NULL); 813 assert(create_cb != NULL); 814 assert(destroy_cb != NULL); 815 816 thread = spdk_get_thread(); 817 if (!thread) { 818 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 819 assert(false); 820 return; 821 } 822 823 dev = calloc(1, sizeof(struct io_device)); 824 if (dev == NULL) { 825 SPDK_ERRLOG("could not allocate io_device\n"); 826 return; 827 } 828 829 dev->io_device = io_device; 830 if (name) { 831 dev->name = strdup(name); 832 } else { 833 dev->name = spdk_sprintf_alloc("%p", dev); 834 } 835 dev->create_cb = create_cb; 836 dev->destroy_cb = destroy_cb; 837 dev->unregister_cb = NULL; 838 dev->ctx_size = ctx_size; 839 dev->for_each_count = 0; 840 dev->unregistered = false; 841 dev->refcnt = 0; 842 843 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n", 844 dev->name, dev->io_device, thread->name); 845 846 pthread_mutex_lock(&g_devlist_mutex); 847 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 848 if (tmp->io_device == io_device) { 849 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 850 io_device, tmp->name, dev->name); 851 free(dev->name); 852 free(dev); 853 pthread_mutex_unlock(&g_devlist_mutex); 854 return; 855 } 856 } 857 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 858 pthread_mutex_unlock(&g_devlist_mutex); 859 } 860 861 static void 862 _finish_unregister(void *arg) 863 { 864 struct io_device *dev = arg; 865 866 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n", 867 dev->name, dev->io_device, dev->unregister_thread->name); 868 869 dev->unregister_cb(dev->io_device); 870 free(dev->name); 871 free(dev); 872 } 873 874 static void 875 _spdk_io_device_free(struct io_device *dev) 876 { 877 if (dev->unregister_cb == NULL) { 878 free(dev->name); 879 free(dev); 880 } else { 881 assert(dev->unregister_thread != NULL); 882 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n", 883 dev->name, dev->io_device, dev->unregister_thread->name); 884 spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 885 } 886 } 887 888 void 889 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 890 { 891 struct io_device *dev; 892 uint32_t refcnt; 893 struct spdk_thread *thread; 894 895 thread = spdk_get_thread(); 896 if (!thread) { 897 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 898 assert(false); 899 return; 900 } 901 902 pthread_mutex_lock(&g_devlist_mutex); 903 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 904 if (dev->io_device == io_device) { 905 break; 906 } 907 } 908 909 if (!dev) { 910 SPDK_ERRLOG("io_device %p not found\n", io_device); 911 assert(false); 912 pthread_mutex_unlock(&g_devlist_mutex); 913 return; 914 } 915 916 if (dev->for_each_count > 0) { 917 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 918 dev->name, io_device, dev->for_each_count); 919 pthread_mutex_unlock(&g_devlist_mutex); 920 return; 921 } 922 923 dev->unregister_cb = unregister_cb; 924 dev->unregistered = true; 925 TAILQ_REMOVE(&g_io_devices, dev, tailq); 926 refcnt = dev->refcnt; 927 dev->unregister_thread = thread; 928 pthread_mutex_unlock(&g_devlist_mutex); 929 930 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n", 931 dev->name, dev->io_device, thread->name); 932 933 if (refcnt > 0) { 934 /* defer deletion */ 935 return; 936 } 937 938 _spdk_io_device_free(dev); 939 } 940 941 struct spdk_io_channel * 942 spdk_get_io_channel(void *io_device) 943 { 944 struct spdk_io_channel *ch; 945 struct spdk_thread *thread; 946 struct io_device *dev; 947 int rc; 948 949 pthread_mutex_lock(&g_devlist_mutex); 950 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 951 if (dev->io_device == io_device) { 952 break; 953 } 954 } 955 if (dev == NULL) { 956 SPDK_ERRLOG("could not find io_device %p\n", io_device); 957 pthread_mutex_unlock(&g_devlist_mutex); 958 return NULL; 959 } 960 961 thread = _get_thread(); 962 if (!thread) { 963 SPDK_ERRLOG("No thread allocated\n"); 964 pthread_mutex_unlock(&g_devlist_mutex); 965 return NULL; 966 } 967 968 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 969 if (ch->dev == dev) { 970 ch->ref++; 971 972 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 973 ch, dev->name, dev->io_device, thread->name, ch->ref); 974 975 /* 976 * An I/O channel already exists for this device on this 977 * thread, so return it. 978 */ 979 pthread_mutex_unlock(&g_devlist_mutex); 980 return ch; 981 } 982 } 983 984 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 985 if (ch == NULL) { 986 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 987 pthread_mutex_unlock(&g_devlist_mutex); 988 return NULL; 989 } 990 991 ch->dev = dev; 992 ch->destroy_cb = dev->destroy_cb; 993 ch->thread = thread; 994 ch->ref = 1; 995 ch->destroy_ref = 0; 996 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 997 998 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 999 ch, dev->name, dev->io_device, thread->name, ch->ref); 1000 1001 dev->refcnt++; 1002 1003 pthread_mutex_unlock(&g_devlist_mutex); 1004 1005 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 1006 if (rc != 0) { 1007 pthread_mutex_lock(&g_devlist_mutex); 1008 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1009 dev->refcnt--; 1010 free(ch); 1011 pthread_mutex_unlock(&g_devlist_mutex); 1012 return NULL; 1013 } 1014 1015 return ch; 1016 } 1017 1018 static void 1019 _spdk_put_io_channel(void *arg) 1020 { 1021 struct spdk_io_channel *ch = arg; 1022 bool do_remove_dev = true; 1023 struct spdk_thread *thread; 1024 1025 thread = spdk_get_thread(); 1026 if (!thread) { 1027 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 1028 assert(false); 1029 return; 1030 } 1031 1032 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1033 "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n", 1034 ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name); 1035 1036 assert(ch->thread == thread); 1037 1038 ch->destroy_ref--; 1039 1040 if (ch->ref > 0 || ch->destroy_ref > 0) { 1041 /* 1042 * Another reference to the associated io_device was requested 1043 * after this message was sent but before it had a chance to 1044 * execute. 1045 */ 1046 return; 1047 } 1048 1049 pthread_mutex_lock(&g_devlist_mutex); 1050 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1051 pthread_mutex_unlock(&g_devlist_mutex); 1052 1053 /* Don't hold the devlist mutex while the destroy_cb is called. */ 1054 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 1055 1056 pthread_mutex_lock(&g_devlist_mutex); 1057 ch->dev->refcnt--; 1058 1059 if (!ch->dev->unregistered) { 1060 do_remove_dev = false; 1061 } 1062 1063 if (ch->dev->refcnt > 0) { 1064 do_remove_dev = false; 1065 } 1066 1067 pthread_mutex_unlock(&g_devlist_mutex); 1068 1069 if (do_remove_dev) { 1070 _spdk_io_device_free(ch->dev); 1071 } 1072 free(ch); 1073 } 1074 1075 void 1076 spdk_put_io_channel(struct spdk_io_channel *ch) 1077 { 1078 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1079 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1080 ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref); 1081 1082 ch->ref--; 1083 1084 if (ch->ref == 0) { 1085 ch->destroy_ref++; 1086 spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch); 1087 } 1088 } 1089 1090 struct spdk_io_channel * 1091 spdk_io_channel_from_ctx(void *ctx) 1092 { 1093 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 1094 } 1095 1096 struct spdk_thread * 1097 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 1098 { 1099 return ch->thread; 1100 } 1101 1102 struct spdk_io_channel_iter { 1103 void *io_device; 1104 struct io_device *dev; 1105 spdk_channel_msg fn; 1106 int status; 1107 void *ctx; 1108 struct spdk_io_channel *ch; 1109 1110 struct spdk_thread *cur_thread; 1111 1112 struct spdk_thread *orig_thread; 1113 spdk_channel_for_each_cpl cpl; 1114 }; 1115 1116 void * 1117 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 1118 { 1119 return i->io_device; 1120 } 1121 1122 struct spdk_io_channel * 1123 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 1124 { 1125 return i->ch; 1126 } 1127 1128 void * 1129 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 1130 { 1131 return i->ctx; 1132 } 1133 1134 static void 1135 _call_completion(void *ctx) 1136 { 1137 struct spdk_io_channel_iter *i = ctx; 1138 1139 if (i->cpl != NULL) { 1140 i->cpl(i, i->status); 1141 } 1142 free(i); 1143 } 1144 1145 static void 1146 _call_channel(void *ctx) 1147 { 1148 struct spdk_io_channel_iter *i = ctx; 1149 struct spdk_io_channel *ch; 1150 1151 /* 1152 * It is possible that the channel was deleted before this 1153 * message had a chance to execute. If so, skip calling 1154 * the fn() on this thread. 1155 */ 1156 pthread_mutex_lock(&g_devlist_mutex); 1157 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 1158 if (ch->dev->io_device == i->io_device) { 1159 break; 1160 } 1161 } 1162 pthread_mutex_unlock(&g_devlist_mutex); 1163 1164 if (ch) { 1165 i->fn(i); 1166 } else { 1167 spdk_for_each_channel_continue(i, 0); 1168 } 1169 } 1170 1171 void 1172 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 1173 spdk_channel_for_each_cpl cpl) 1174 { 1175 struct spdk_thread *thread; 1176 struct spdk_io_channel *ch; 1177 struct spdk_io_channel_iter *i; 1178 1179 i = calloc(1, sizeof(*i)); 1180 if (!i) { 1181 SPDK_ERRLOG("Unable to allocate iterator\n"); 1182 return; 1183 } 1184 1185 i->io_device = io_device; 1186 i->fn = fn; 1187 i->ctx = ctx; 1188 i->cpl = cpl; 1189 1190 pthread_mutex_lock(&g_devlist_mutex); 1191 i->orig_thread = _get_thread(); 1192 1193 TAILQ_FOREACH(thread, &g_threads, tailq) { 1194 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1195 if (ch->dev->io_device == io_device) { 1196 ch->dev->for_each_count++; 1197 i->dev = ch->dev; 1198 i->cur_thread = thread; 1199 i->ch = ch; 1200 pthread_mutex_unlock(&g_devlist_mutex); 1201 spdk_thread_send_msg(thread, _call_channel, i); 1202 return; 1203 } 1204 } 1205 } 1206 1207 pthread_mutex_unlock(&g_devlist_mutex); 1208 1209 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1210 } 1211 1212 void 1213 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 1214 { 1215 struct spdk_thread *thread; 1216 struct spdk_io_channel *ch; 1217 1218 assert(i->cur_thread == spdk_get_thread()); 1219 1220 i->status = status; 1221 1222 pthread_mutex_lock(&g_devlist_mutex); 1223 if (status) { 1224 goto end; 1225 } 1226 thread = TAILQ_NEXT(i->cur_thread, tailq); 1227 while (thread) { 1228 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1229 if (ch->dev->io_device == i->io_device) { 1230 i->cur_thread = thread; 1231 i->ch = ch; 1232 pthread_mutex_unlock(&g_devlist_mutex); 1233 spdk_thread_send_msg(thread, _call_channel, i); 1234 return; 1235 } 1236 } 1237 thread = TAILQ_NEXT(thread, tailq); 1238 } 1239 1240 end: 1241 i->dev->for_each_count--; 1242 i->ch = NULL; 1243 pthread_mutex_unlock(&g_devlist_mutex); 1244 1245 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1246 } 1247 1248 1249 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD) 1250