1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/queue.h" 38 #include "spdk/string.h" 39 #include "spdk/thread.h" 40 #include "spdk/util.h" 41 42 #include "spdk_internal/log.h" 43 #include "spdk_internal/thread.h" 44 45 #define SPDK_MSG_BATCH_SIZE 8 46 #define SPDK_MAX_DEVICE_NAME_LEN 256 47 #define SPDK_MAX_THREAD_NAME_LEN 256 48 49 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 50 51 static spdk_new_thread_fn g_new_thread_fn = NULL; 52 static size_t g_ctx_sz = 0; 53 54 struct io_device { 55 void *io_device; 56 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 57 spdk_io_channel_create_cb create_cb; 58 spdk_io_channel_destroy_cb destroy_cb; 59 spdk_io_device_unregister_cb unregister_cb; 60 struct spdk_thread *unregister_thread; 61 uint32_t ctx_size; 62 uint32_t for_each_count; 63 TAILQ_ENTRY(io_device) tailq; 64 65 uint32_t refcnt; 66 67 bool unregistered; 68 }; 69 70 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 71 72 struct spdk_msg { 73 spdk_msg_fn fn; 74 void *arg; 75 76 SLIST_ENTRY(spdk_msg) link; 77 }; 78 79 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 80 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 81 82 enum spdk_poller_state { 83 /* The poller is registered with a thread but not currently executing its fn. */ 84 SPDK_POLLER_STATE_WAITING, 85 86 /* The poller is currently running its fn. */ 87 SPDK_POLLER_STATE_RUNNING, 88 89 /* The poller was unregistered during the execution of its fn. */ 90 SPDK_POLLER_STATE_UNREGISTERED, 91 }; 92 93 struct spdk_poller { 94 TAILQ_ENTRY(spdk_poller) tailq; 95 96 /* Current state of the poller; should only be accessed from the poller's thread. */ 97 enum spdk_poller_state state; 98 99 uint64_t period_ticks; 100 uint64_t next_run_tick; 101 spdk_poller_fn fn; 102 void *arg; 103 }; 104 105 struct spdk_thread { 106 TAILQ_HEAD(, spdk_io_channel) io_channels; 107 TAILQ_ENTRY(spdk_thread) tailq; 108 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 109 110 bool exit; 111 112 struct spdk_cpuset cpumask; 113 114 uint64_t tsc_last; 115 struct spdk_thread_stats stats; 116 117 /* 118 * Contains pollers actively running on this thread. Pollers 119 * are run round-robin. The thread takes one poller from the head 120 * of the ring, executes it, then puts it back at the tail of 121 * the ring. 122 */ 123 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 124 125 /** 126 * Contains pollers running on this thread with a periodic timer. 127 */ 128 TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; 129 130 struct spdk_ring *messages; 131 132 SLIST_HEAD(, spdk_msg) msg_cache; 133 size_t msg_cache_count; 134 135 /* User context allocated at the end */ 136 uint8_t ctx[0]; 137 }; 138 139 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 140 static uint32_t g_thread_count = 0; 141 142 static __thread struct spdk_thread *tls_thread = NULL; 143 144 static inline struct spdk_thread * 145 _get_thread(void) 146 { 147 return tls_thread; 148 } 149 150 int 151 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 152 { 153 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 154 155 assert(g_new_thread_fn == NULL); 156 g_new_thread_fn = new_thread_fn; 157 158 g_ctx_sz = ctx_sz; 159 160 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 161 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 162 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 163 sizeof(struct spdk_msg), 164 0, /* No cache. We do our own. */ 165 SPDK_ENV_SOCKET_ID_ANY); 166 167 if (!g_spdk_msg_mempool) { 168 return -1; 169 } 170 171 return 0; 172 } 173 174 void 175 spdk_thread_lib_fini(void) 176 { 177 struct io_device *dev; 178 179 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 180 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 181 } 182 183 if (g_spdk_msg_mempool) { 184 spdk_mempool_free(g_spdk_msg_mempool); 185 g_spdk_msg_mempool = NULL; 186 } 187 188 g_new_thread_fn = NULL; 189 g_ctx_sz = 0; 190 } 191 192 static void 193 _free_thread(struct spdk_thread *thread) 194 { 195 struct spdk_io_channel *ch; 196 struct spdk_msg *msg; 197 struct spdk_poller *poller, *ptmp; 198 199 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 200 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 201 thread->name, ch->dev->name); 202 } 203 204 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 205 if (poller->state == SPDK_POLLER_STATE_WAITING) { 206 SPDK_WARNLOG("poller %p still registered at thread exit\n", 207 poller); 208 } 209 210 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 211 free(poller); 212 } 213 214 215 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) { 216 if (poller->state == SPDK_POLLER_STATE_WAITING) { 217 SPDK_WARNLOG("poller %p still registered at thread exit\n", 218 poller); 219 } 220 221 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 222 free(poller); 223 } 224 225 pthread_mutex_lock(&g_devlist_mutex); 226 assert(g_thread_count > 0); 227 g_thread_count--; 228 TAILQ_REMOVE(&g_threads, thread, tailq); 229 pthread_mutex_unlock(&g_devlist_mutex); 230 231 msg = SLIST_FIRST(&thread->msg_cache); 232 while (msg != NULL) { 233 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 234 235 assert(thread->msg_cache_count > 0); 236 thread->msg_cache_count--; 237 spdk_mempool_put(g_spdk_msg_mempool, msg); 238 239 msg = SLIST_FIRST(&thread->msg_cache); 240 } 241 242 assert(thread->msg_cache_count == 0); 243 244 spdk_ring_free(thread->messages); 245 free(thread); 246 } 247 248 struct spdk_thread * 249 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 250 { 251 struct spdk_thread *thread; 252 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 253 int rc, i; 254 255 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 256 if (!thread) { 257 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 258 return NULL; 259 } 260 261 if (cpumask) { 262 spdk_cpuset_copy(&thread->cpumask, cpumask); 263 } else { 264 spdk_cpuset_negate(&thread->cpumask); 265 } 266 267 TAILQ_INIT(&thread->io_channels); 268 TAILQ_INIT(&thread->active_pollers); 269 TAILQ_INIT(&thread->timer_pollers); 270 SLIST_INIT(&thread->msg_cache); 271 thread->msg_cache_count = 0; 272 273 thread->tsc_last = spdk_get_ticks(); 274 275 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 276 if (!thread->messages) { 277 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 278 free(thread); 279 return NULL; 280 } 281 282 /* Fill the local message pool cache. */ 283 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 284 if (rc == 0) { 285 /* If we can't populate the cache it's ok. The cache will get filled 286 * up organically as messages are passed to the thread. */ 287 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 288 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 289 thread->msg_cache_count++; 290 } 291 } 292 293 if (name) { 294 snprintf(thread->name, sizeof(thread->name), "%s", name); 295 } else { 296 snprintf(thread->name, sizeof(thread->name), "%p", thread); 297 } 298 299 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name); 300 301 pthread_mutex_lock(&g_devlist_mutex); 302 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 303 g_thread_count++; 304 pthread_mutex_unlock(&g_devlist_mutex); 305 306 if (g_new_thread_fn) { 307 rc = g_new_thread_fn(thread); 308 if (rc != 0) { 309 _free_thread(thread); 310 return NULL; 311 } 312 } 313 314 return thread; 315 } 316 317 void 318 spdk_set_thread(struct spdk_thread *thread) 319 { 320 tls_thread = thread; 321 } 322 323 void 324 spdk_thread_exit(struct spdk_thread *thread) 325 { 326 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name); 327 328 assert(tls_thread == thread); 329 330 thread->exit = true; 331 } 332 333 void 334 spdk_thread_destroy(struct spdk_thread *thread) 335 { 336 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name); 337 338 assert(thread->exit == true); 339 340 if (tls_thread == thread) { 341 tls_thread = NULL; 342 } 343 344 _free_thread(thread); 345 } 346 347 void * 348 spdk_thread_get_ctx(struct spdk_thread *thread) 349 { 350 if (g_ctx_sz > 0) { 351 return thread->ctx; 352 } 353 354 return NULL; 355 } 356 357 struct spdk_cpuset * 358 spdk_thread_get_cpumask(struct spdk_thread *thread) 359 { 360 return &thread->cpumask; 361 } 362 363 struct spdk_thread * 364 spdk_thread_get_from_ctx(void *ctx) 365 { 366 if (ctx == NULL) { 367 assert(false); 368 return NULL; 369 } 370 371 assert(g_ctx_sz > 0); 372 373 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 374 } 375 376 static inline uint32_t 377 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 378 { 379 unsigned count, i; 380 void *messages[SPDK_MSG_BATCH_SIZE]; 381 382 #ifdef DEBUG 383 /* 384 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 385 * so we will never actually read uninitialized data from events, but just to be sure 386 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 387 */ 388 memset(messages, 0, sizeof(messages)); 389 #endif 390 391 if (max_msgs > 0) { 392 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 393 } else { 394 max_msgs = SPDK_MSG_BATCH_SIZE; 395 } 396 397 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 398 if (count == 0) { 399 return 0; 400 } 401 402 for (i = 0; i < count; i++) { 403 struct spdk_msg *msg = messages[i]; 404 405 assert(msg != NULL); 406 msg->fn(msg->arg); 407 408 if (thread->exit) { 409 break; 410 } 411 412 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 413 /* Insert the messages at the head. We want to re-use the hot 414 * ones. */ 415 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 416 thread->msg_cache_count++; 417 } else { 418 spdk_mempool_put(g_spdk_msg_mempool, msg); 419 } 420 } 421 422 return count; 423 } 424 425 static void 426 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 427 { 428 struct spdk_poller *iter; 429 430 poller->next_run_tick = now + poller->period_ticks; 431 432 /* 433 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled 434 * run time. 435 */ 436 TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) { 437 if (iter->next_run_tick <= poller->next_run_tick) { 438 TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq); 439 return; 440 } 441 } 442 443 /* No earlier pollers were found, so this poller must be the new head */ 444 TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq); 445 } 446 447 int 448 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 449 { 450 uint32_t msg_count; 451 struct spdk_thread *orig_thread; 452 struct spdk_poller *poller, *tmp; 453 int rc = 0; 454 455 orig_thread = _get_thread(); 456 tls_thread = thread; 457 458 if (now == 0) { 459 now = spdk_get_ticks(); 460 } 461 462 msg_count = _spdk_msg_queue_run_batch(thread, max_msgs); 463 if (msg_count) { 464 rc = 1; 465 } 466 467 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 468 active_pollers_head, tailq, tmp) { 469 int poller_rc; 470 471 if (thread->exit) { 472 break; 473 } 474 475 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 476 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 477 free(poller); 478 continue; 479 } 480 481 poller->state = SPDK_POLLER_STATE_RUNNING; 482 poller_rc = poller->fn(poller->arg); 483 484 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 485 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 486 free(poller); 487 continue; 488 } 489 490 poller->state = SPDK_POLLER_STATE_WAITING; 491 492 #ifdef DEBUG 493 if (poller_rc == -1) { 494 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller); 495 } 496 #endif 497 498 if (poller_rc > rc) { 499 rc = poller_rc; 500 } 501 502 } 503 504 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) { 505 int timer_rc = 0; 506 507 if (thread->exit) { 508 break; 509 } 510 511 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 512 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 513 free(poller); 514 continue; 515 } 516 517 if (now < poller->next_run_tick) { 518 break; 519 } 520 521 poller->state = SPDK_POLLER_STATE_RUNNING; 522 timer_rc = poller->fn(poller->arg); 523 524 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 525 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 526 free(poller); 527 continue; 528 } 529 530 poller->state = SPDK_POLLER_STATE_WAITING; 531 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 532 _spdk_poller_insert_timer(thread, poller, now); 533 534 #ifdef DEBUG 535 if (timer_rc == -1) { 536 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller); 537 } 538 #endif 539 540 if (timer_rc > rc) { 541 rc = timer_rc; 542 543 } 544 } 545 546 if (rc == 0) { 547 /* Poller status idle */ 548 thread->stats.idle_tsc += now - thread->tsc_last; 549 } else if (rc > 0) { 550 /* Poller status busy */ 551 thread->stats.busy_tsc += now - thread->tsc_last; 552 } 553 thread->tsc_last = now; 554 555 tls_thread = orig_thread; 556 557 return rc; 558 } 559 560 uint64_t 561 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 562 { 563 struct spdk_poller *poller; 564 565 poller = TAILQ_FIRST(&thread->timer_pollers); 566 if (poller) { 567 return poller->next_run_tick; 568 } 569 570 return 0; 571 } 572 573 int 574 spdk_thread_has_active_pollers(struct spdk_thread *thread) 575 { 576 return !TAILQ_EMPTY(&thread->active_pollers); 577 } 578 579 bool 580 spdk_thread_has_pollers(struct spdk_thread *thread) 581 { 582 if (TAILQ_EMPTY(&thread->active_pollers) && 583 TAILQ_EMPTY(&thread->timer_pollers)) { 584 return false; 585 } 586 587 return true; 588 } 589 590 bool 591 spdk_thread_is_idle(struct spdk_thread *thread) 592 { 593 if (spdk_ring_count(thread->messages) || 594 spdk_thread_has_pollers(thread)) { 595 return false; 596 } 597 598 return true; 599 } 600 601 uint32_t 602 spdk_thread_get_count(void) 603 { 604 /* 605 * Return cached value of the current thread count. We could acquire the 606 * lock and iterate through the TAILQ of threads to count them, but that 607 * count could still be invalidated after we release the lock. 608 */ 609 return g_thread_count; 610 } 611 612 struct spdk_thread * 613 spdk_get_thread(void) 614 { 615 struct spdk_thread *thread; 616 617 thread = _get_thread(); 618 if (!thread) { 619 SPDK_ERRLOG("No thread allocated\n"); 620 } 621 622 return thread; 623 } 624 625 const char * 626 spdk_thread_get_name(const struct spdk_thread *thread) 627 { 628 return thread->name; 629 } 630 631 int 632 spdk_thread_get_stats(struct spdk_thread_stats *stats) 633 { 634 struct spdk_thread *thread; 635 636 thread = _get_thread(); 637 if (!thread) { 638 SPDK_ERRLOG("No thread allocated\n"); 639 return -EINVAL; 640 } 641 642 if (stats == NULL) { 643 return -EINVAL; 644 } 645 646 *stats = thread->stats; 647 648 return 0; 649 } 650 651 void 652 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 653 { 654 struct spdk_thread *local_thread; 655 struct spdk_msg *msg; 656 int rc; 657 658 if (!thread) { 659 assert(false); 660 return; 661 } 662 663 local_thread = _get_thread(); 664 665 msg = NULL; 666 if (local_thread != NULL) { 667 if (local_thread->msg_cache_count > 0) { 668 msg = SLIST_FIRST(&local_thread->msg_cache); 669 assert(msg != NULL); 670 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 671 local_thread->msg_cache_count--; 672 } 673 } 674 675 if (msg == NULL) { 676 msg = spdk_mempool_get(g_spdk_msg_mempool); 677 if (!msg) { 678 assert(false); 679 return; 680 } 681 } 682 683 msg->fn = fn; 684 msg->arg = ctx; 685 686 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 687 if (rc != 1) { 688 assert(false); 689 spdk_mempool_put(g_spdk_msg_mempool, msg); 690 return; 691 } 692 } 693 694 struct spdk_poller * 695 spdk_poller_register(spdk_poller_fn fn, 696 void *arg, 697 uint64_t period_microseconds) 698 { 699 struct spdk_thread *thread; 700 struct spdk_poller *poller; 701 uint64_t quotient, remainder, ticks; 702 703 thread = spdk_get_thread(); 704 if (!thread) { 705 assert(false); 706 return NULL; 707 } 708 709 poller = calloc(1, sizeof(*poller)); 710 if (poller == NULL) { 711 SPDK_ERRLOG("Poller memory allocation failed\n"); 712 return NULL; 713 } 714 715 poller->state = SPDK_POLLER_STATE_WAITING; 716 poller->fn = fn; 717 poller->arg = arg; 718 719 if (period_microseconds) { 720 quotient = period_microseconds / SPDK_SEC_TO_USEC; 721 remainder = period_microseconds % SPDK_SEC_TO_USEC; 722 ticks = spdk_get_ticks_hz(); 723 724 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 725 } else { 726 poller->period_ticks = 0; 727 } 728 729 if (poller->period_ticks) { 730 _spdk_poller_insert_timer(thread, poller, spdk_get_ticks()); 731 } else { 732 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 733 } 734 735 return poller; 736 } 737 738 void 739 spdk_poller_unregister(struct spdk_poller **ppoller) 740 { 741 struct spdk_thread *thread; 742 struct spdk_poller *poller; 743 744 poller = *ppoller; 745 if (poller == NULL) { 746 return; 747 } 748 749 *ppoller = NULL; 750 751 thread = spdk_get_thread(); 752 if (!thread) { 753 assert(false); 754 return; 755 } 756 757 /* Simply set the state to unregistered. The poller will get cleaned up 758 * in a subsequent call to spdk_thread_poll(). 759 */ 760 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 761 } 762 763 struct call_thread { 764 struct spdk_thread *cur_thread; 765 spdk_msg_fn fn; 766 void *ctx; 767 768 struct spdk_thread *orig_thread; 769 spdk_msg_fn cpl; 770 }; 771 772 static void 773 spdk_on_thread(void *ctx) 774 { 775 struct call_thread *ct = ctx; 776 777 ct->fn(ct->ctx); 778 779 pthread_mutex_lock(&g_devlist_mutex); 780 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 781 pthread_mutex_unlock(&g_devlist_mutex); 782 783 if (!ct->cur_thread) { 784 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n"); 785 786 spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 787 free(ctx); 788 } else { 789 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n", 790 ct->cur_thread->name); 791 792 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); 793 } 794 } 795 796 void 797 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 798 { 799 struct call_thread *ct; 800 struct spdk_thread *thread; 801 802 ct = calloc(1, sizeof(*ct)); 803 if (!ct) { 804 SPDK_ERRLOG("Unable to perform thread iteration\n"); 805 cpl(ctx); 806 return; 807 } 808 809 ct->fn = fn; 810 ct->ctx = ctx; 811 ct->cpl = cpl; 812 813 thread = _get_thread(); 814 if (!thread) { 815 SPDK_ERRLOG("No thread allocated\n"); 816 free(ct); 817 cpl(ctx); 818 return; 819 } 820 ct->orig_thread = thread; 821 822 pthread_mutex_lock(&g_devlist_mutex); 823 ct->cur_thread = TAILQ_FIRST(&g_threads); 824 pthread_mutex_unlock(&g_devlist_mutex); 825 826 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n", 827 ct->orig_thread->name); 828 829 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); 830 } 831 832 void 833 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 834 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 835 const char *name) 836 { 837 struct io_device *dev, *tmp; 838 struct spdk_thread *thread; 839 840 assert(io_device != NULL); 841 assert(create_cb != NULL); 842 assert(destroy_cb != NULL); 843 844 thread = spdk_get_thread(); 845 if (!thread) { 846 SPDK_ERRLOG("called from non-SPDK thread\n"); 847 assert(false); 848 return; 849 } 850 851 dev = calloc(1, sizeof(struct io_device)); 852 if (dev == NULL) { 853 SPDK_ERRLOG("could not allocate io_device\n"); 854 return; 855 } 856 857 dev->io_device = io_device; 858 if (name) { 859 snprintf(dev->name, sizeof(dev->name), "%s", name); 860 } else { 861 snprintf(dev->name, sizeof(dev->name), "%p", dev); 862 } 863 dev->create_cb = create_cb; 864 dev->destroy_cb = destroy_cb; 865 dev->unregister_cb = NULL; 866 dev->ctx_size = ctx_size; 867 dev->for_each_count = 0; 868 dev->unregistered = false; 869 dev->refcnt = 0; 870 871 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n", 872 dev->name, dev->io_device, thread->name); 873 874 pthread_mutex_lock(&g_devlist_mutex); 875 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 876 if (tmp->io_device == io_device) { 877 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 878 io_device, tmp->name, dev->name); 879 free(dev); 880 pthread_mutex_unlock(&g_devlist_mutex); 881 return; 882 } 883 } 884 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 885 pthread_mutex_unlock(&g_devlist_mutex); 886 } 887 888 static void 889 _finish_unregister(void *arg) 890 { 891 struct io_device *dev = arg; 892 893 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n", 894 dev->name, dev->io_device, dev->unregister_thread->name); 895 896 dev->unregister_cb(dev->io_device); 897 free(dev); 898 } 899 900 static void 901 _spdk_io_device_free(struct io_device *dev) 902 { 903 if (dev->unregister_cb == NULL) { 904 free(dev); 905 } else { 906 assert(dev->unregister_thread != NULL); 907 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n", 908 dev->name, dev->io_device, dev->unregister_thread->name); 909 spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 910 } 911 } 912 913 void 914 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 915 { 916 struct io_device *dev; 917 uint32_t refcnt; 918 struct spdk_thread *thread; 919 920 thread = spdk_get_thread(); 921 if (!thread) { 922 SPDK_ERRLOG("called from non-SPDK thread\n"); 923 assert(false); 924 return; 925 } 926 927 pthread_mutex_lock(&g_devlist_mutex); 928 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 929 if (dev->io_device == io_device) { 930 break; 931 } 932 } 933 934 if (!dev) { 935 SPDK_ERRLOG("io_device %p not found\n", io_device); 936 assert(false); 937 pthread_mutex_unlock(&g_devlist_mutex); 938 return; 939 } 940 941 if (dev->for_each_count > 0) { 942 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 943 dev->name, io_device, dev->for_each_count); 944 pthread_mutex_unlock(&g_devlist_mutex); 945 return; 946 } 947 948 dev->unregister_cb = unregister_cb; 949 dev->unregistered = true; 950 TAILQ_REMOVE(&g_io_devices, dev, tailq); 951 refcnt = dev->refcnt; 952 dev->unregister_thread = thread; 953 pthread_mutex_unlock(&g_devlist_mutex); 954 955 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n", 956 dev->name, dev->io_device, thread->name); 957 958 if (refcnt > 0) { 959 /* defer deletion */ 960 return; 961 } 962 963 _spdk_io_device_free(dev); 964 } 965 966 struct spdk_io_channel * 967 spdk_get_io_channel(void *io_device) 968 { 969 struct spdk_io_channel *ch; 970 struct spdk_thread *thread; 971 struct io_device *dev; 972 int rc; 973 974 pthread_mutex_lock(&g_devlist_mutex); 975 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 976 if (dev->io_device == io_device) { 977 break; 978 } 979 } 980 if (dev == NULL) { 981 SPDK_ERRLOG("could not find io_device %p\n", io_device); 982 pthread_mutex_unlock(&g_devlist_mutex); 983 return NULL; 984 } 985 986 thread = _get_thread(); 987 if (!thread) { 988 SPDK_ERRLOG("No thread allocated\n"); 989 pthread_mutex_unlock(&g_devlist_mutex); 990 return NULL; 991 } 992 993 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 994 if (ch->dev == dev) { 995 ch->ref++; 996 997 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 998 ch, dev->name, dev->io_device, thread->name, ch->ref); 999 1000 /* 1001 * An I/O channel already exists for this device on this 1002 * thread, so return it. 1003 */ 1004 pthread_mutex_unlock(&g_devlist_mutex); 1005 return ch; 1006 } 1007 } 1008 1009 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 1010 if (ch == NULL) { 1011 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 1012 pthread_mutex_unlock(&g_devlist_mutex); 1013 return NULL; 1014 } 1015 1016 ch->dev = dev; 1017 ch->destroy_cb = dev->destroy_cb; 1018 ch->thread = thread; 1019 ch->ref = 1; 1020 ch->destroy_ref = 0; 1021 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 1022 1023 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1024 ch, dev->name, dev->io_device, thread->name, ch->ref); 1025 1026 dev->refcnt++; 1027 1028 pthread_mutex_unlock(&g_devlist_mutex); 1029 1030 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 1031 if (rc != 0) { 1032 pthread_mutex_lock(&g_devlist_mutex); 1033 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1034 dev->refcnt--; 1035 free(ch); 1036 pthread_mutex_unlock(&g_devlist_mutex); 1037 return NULL; 1038 } 1039 1040 return ch; 1041 } 1042 1043 static void 1044 _spdk_put_io_channel(void *arg) 1045 { 1046 struct spdk_io_channel *ch = arg; 1047 bool do_remove_dev = true; 1048 struct spdk_thread *thread; 1049 1050 thread = spdk_get_thread(); 1051 if (!thread) { 1052 SPDK_ERRLOG("called from non-SPDK thread\n"); 1053 assert(false); 1054 return; 1055 } 1056 1057 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1058 "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n", 1059 ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name); 1060 1061 assert(ch->thread == thread); 1062 1063 ch->destroy_ref--; 1064 1065 if (ch->ref > 0 || ch->destroy_ref > 0) { 1066 /* 1067 * Another reference to the associated io_device was requested 1068 * after this message was sent but before it had a chance to 1069 * execute. 1070 */ 1071 return; 1072 } 1073 1074 pthread_mutex_lock(&g_devlist_mutex); 1075 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1076 pthread_mutex_unlock(&g_devlist_mutex); 1077 1078 /* Don't hold the devlist mutex while the destroy_cb is called. */ 1079 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 1080 1081 pthread_mutex_lock(&g_devlist_mutex); 1082 ch->dev->refcnt--; 1083 1084 if (!ch->dev->unregistered) { 1085 do_remove_dev = false; 1086 } 1087 1088 if (ch->dev->refcnt > 0) { 1089 do_remove_dev = false; 1090 } 1091 1092 pthread_mutex_unlock(&g_devlist_mutex); 1093 1094 if (do_remove_dev) { 1095 _spdk_io_device_free(ch->dev); 1096 } 1097 free(ch); 1098 } 1099 1100 void 1101 spdk_put_io_channel(struct spdk_io_channel *ch) 1102 { 1103 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1104 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1105 ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref); 1106 1107 ch->ref--; 1108 1109 if (ch->ref == 0) { 1110 ch->destroy_ref++; 1111 spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch); 1112 } 1113 } 1114 1115 struct spdk_io_channel * 1116 spdk_io_channel_from_ctx(void *ctx) 1117 { 1118 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 1119 } 1120 1121 struct spdk_thread * 1122 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 1123 { 1124 return ch->thread; 1125 } 1126 1127 struct spdk_io_channel_iter { 1128 void *io_device; 1129 struct io_device *dev; 1130 spdk_channel_msg fn; 1131 int status; 1132 void *ctx; 1133 struct spdk_io_channel *ch; 1134 1135 struct spdk_thread *cur_thread; 1136 1137 struct spdk_thread *orig_thread; 1138 spdk_channel_for_each_cpl cpl; 1139 }; 1140 1141 void * 1142 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 1143 { 1144 return i->io_device; 1145 } 1146 1147 struct spdk_io_channel * 1148 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 1149 { 1150 return i->ch; 1151 } 1152 1153 void * 1154 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 1155 { 1156 return i->ctx; 1157 } 1158 1159 static void 1160 _call_completion(void *ctx) 1161 { 1162 struct spdk_io_channel_iter *i = ctx; 1163 1164 if (i->cpl != NULL) { 1165 i->cpl(i, i->status); 1166 } 1167 free(i); 1168 } 1169 1170 static void 1171 _call_channel(void *ctx) 1172 { 1173 struct spdk_io_channel_iter *i = ctx; 1174 struct spdk_io_channel *ch; 1175 1176 /* 1177 * It is possible that the channel was deleted before this 1178 * message had a chance to execute. If so, skip calling 1179 * the fn() on this thread. 1180 */ 1181 pthread_mutex_lock(&g_devlist_mutex); 1182 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 1183 if (ch->dev->io_device == i->io_device) { 1184 break; 1185 } 1186 } 1187 pthread_mutex_unlock(&g_devlist_mutex); 1188 1189 if (ch) { 1190 i->fn(i); 1191 } else { 1192 spdk_for_each_channel_continue(i, 0); 1193 } 1194 } 1195 1196 void 1197 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 1198 spdk_channel_for_each_cpl cpl) 1199 { 1200 struct spdk_thread *thread; 1201 struct spdk_io_channel *ch; 1202 struct spdk_io_channel_iter *i; 1203 1204 i = calloc(1, sizeof(*i)); 1205 if (!i) { 1206 SPDK_ERRLOG("Unable to allocate iterator\n"); 1207 return; 1208 } 1209 1210 i->io_device = io_device; 1211 i->fn = fn; 1212 i->ctx = ctx; 1213 i->cpl = cpl; 1214 1215 pthread_mutex_lock(&g_devlist_mutex); 1216 i->orig_thread = _get_thread(); 1217 1218 TAILQ_FOREACH(thread, &g_threads, tailq) { 1219 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1220 if (ch->dev->io_device == io_device) { 1221 ch->dev->for_each_count++; 1222 i->dev = ch->dev; 1223 i->cur_thread = thread; 1224 i->ch = ch; 1225 pthread_mutex_unlock(&g_devlist_mutex); 1226 spdk_thread_send_msg(thread, _call_channel, i); 1227 return; 1228 } 1229 } 1230 } 1231 1232 pthread_mutex_unlock(&g_devlist_mutex); 1233 1234 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1235 } 1236 1237 void 1238 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 1239 { 1240 struct spdk_thread *thread; 1241 struct spdk_io_channel *ch; 1242 1243 assert(i->cur_thread == spdk_get_thread()); 1244 1245 i->status = status; 1246 1247 pthread_mutex_lock(&g_devlist_mutex); 1248 if (status) { 1249 goto end; 1250 } 1251 thread = TAILQ_NEXT(i->cur_thread, tailq); 1252 while (thread) { 1253 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1254 if (ch->dev->io_device == i->io_device) { 1255 i->cur_thread = thread; 1256 i->ch = ch; 1257 pthread_mutex_unlock(&g_devlist_mutex); 1258 spdk_thread_send_msg(thread, _call_channel, i); 1259 return; 1260 } 1261 } 1262 thread = TAILQ_NEXT(thread, tailq); 1263 } 1264 1265 end: 1266 i->dev->for_each_count--; 1267 i->ch = NULL; 1268 pthread_mutex_unlock(&g_devlist_mutex); 1269 1270 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1271 } 1272 1273 1274 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD) 1275