1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/queue.h" 38 #include "spdk/string.h" 39 #include "spdk/thread.h" 40 #include "spdk/util.h" 41 42 #include "spdk_internal/log.h" 43 #include "spdk_internal/thread.h" 44 45 #define SPDK_MSG_BATCH_SIZE 8 46 #define SPDK_MAX_DEVICE_NAME_LEN 256 47 #define SPDK_MAX_THREAD_NAME_LEN 256 48 49 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 50 51 static spdk_new_thread_fn g_new_thread_fn = NULL; 52 static size_t g_ctx_sz = 0; 53 54 struct io_device { 55 void *io_device; 56 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 57 spdk_io_channel_create_cb create_cb; 58 spdk_io_channel_destroy_cb destroy_cb; 59 spdk_io_device_unregister_cb unregister_cb; 60 struct spdk_thread *unregister_thread; 61 uint32_t ctx_size; 62 uint32_t for_each_count; 63 TAILQ_ENTRY(io_device) tailq; 64 65 uint32_t refcnt; 66 67 bool unregistered; 68 }; 69 70 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 71 72 struct spdk_msg { 73 spdk_msg_fn fn; 74 void *arg; 75 76 SLIST_ENTRY(spdk_msg) link; 77 }; 78 79 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 80 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 81 82 enum spdk_poller_state { 83 /* The poller is registered with a thread but not currently executing its fn. */ 84 SPDK_POLLER_STATE_WAITING, 85 86 /* The poller is currently running its fn. */ 87 SPDK_POLLER_STATE_RUNNING, 88 89 /* The poller was unregistered during the execution of its fn. */ 90 SPDK_POLLER_STATE_UNREGISTERED, 91 }; 92 93 struct spdk_poller { 94 TAILQ_ENTRY(spdk_poller) tailq; 95 96 /* Current state of the poller; should only be accessed from the poller's thread. */ 97 enum spdk_poller_state state; 98 99 uint64_t period_ticks; 100 uint64_t next_run_tick; 101 spdk_poller_fn fn; 102 void *arg; 103 }; 104 105 struct spdk_thread { 106 TAILQ_HEAD(, spdk_io_channel) io_channels; 107 TAILQ_ENTRY(spdk_thread) tailq; 108 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 109 110 bool exit; 111 112 struct spdk_cpuset cpumask; 113 114 uint64_t tsc_last; 115 struct spdk_thread_stats stats; 116 117 /* 118 * Contains pollers actively running on this thread. Pollers 119 * are run round-robin. The thread takes one poller from the head 120 * of the ring, executes it, then puts it back at the tail of 121 * the ring. 122 */ 123 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 124 125 /** 126 * Contains pollers running on this thread with a periodic timer. 127 */ 128 TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; 129 130 struct spdk_ring *messages; 131 132 SLIST_HEAD(, spdk_msg) msg_cache; 133 size_t msg_cache_count; 134 135 /* User context allocated at the end */ 136 uint8_t ctx[0]; 137 }; 138 139 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 140 static uint32_t g_thread_count = 0; 141 142 static __thread struct spdk_thread *tls_thread = NULL; 143 144 static inline struct spdk_thread * 145 _get_thread(void) 146 { 147 return tls_thread; 148 } 149 150 int 151 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 152 { 153 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 154 155 assert(g_new_thread_fn == NULL); 156 g_new_thread_fn = new_thread_fn; 157 158 g_ctx_sz = ctx_sz; 159 160 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 161 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 162 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 163 sizeof(struct spdk_msg), 164 0, /* No cache. We do our own. */ 165 SPDK_ENV_SOCKET_ID_ANY); 166 167 if (!g_spdk_msg_mempool) { 168 return -1; 169 } 170 171 return 0; 172 } 173 174 void 175 spdk_thread_lib_fini(void) 176 { 177 struct io_device *dev; 178 179 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 180 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 181 } 182 183 if (g_spdk_msg_mempool) { 184 spdk_mempool_free(g_spdk_msg_mempool); 185 g_spdk_msg_mempool = NULL; 186 } 187 188 g_new_thread_fn = NULL; 189 g_ctx_sz = 0; 190 } 191 192 static void 193 _free_thread(struct spdk_thread *thread) 194 { 195 struct spdk_io_channel *ch; 196 struct spdk_msg *msg; 197 struct spdk_poller *poller, *ptmp; 198 199 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 200 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 201 thread->name, ch->dev->name); 202 } 203 204 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 205 if (poller->state == SPDK_POLLER_STATE_WAITING) { 206 SPDK_WARNLOG("poller %p still registered at thread exit\n", 207 poller); 208 } 209 210 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 211 free(poller); 212 } 213 214 215 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) { 216 if (poller->state == SPDK_POLLER_STATE_WAITING) { 217 SPDK_WARNLOG("poller %p still registered at thread exit\n", 218 poller); 219 } 220 221 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 222 free(poller); 223 } 224 225 pthread_mutex_lock(&g_devlist_mutex); 226 assert(g_thread_count > 0); 227 g_thread_count--; 228 TAILQ_REMOVE(&g_threads, thread, tailq); 229 pthread_mutex_unlock(&g_devlist_mutex); 230 231 msg = SLIST_FIRST(&thread->msg_cache); 232 while (msg != NULL) { 233 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 234 235 assert(thread->msg_cache_count > 0); 236 thread->msg_cache_count--; 237 spdk_mempool_put(g_spdk_msg_mempool, msg); 238 239 msg = SLIST_FIRST(&thread->msg_cache); 240 } 241 242 assert(thread->msg_cache_count == 0); 243 244 spdk_ring_free(thread->messages); 245 free(thread); 246 } 247 248 struct spdk_thread * 249 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 250 { 251 struct spdk_thread *thread; 252 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 253 int rc, i; 254 255 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 256 if (!thread) { 257 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 258 return NULL; 259 } 260 261 if (cpumask) { 262 spdk_cpuset_copy(&thread->cpumask, cpumask); 263 } else { 264 spdk_cpuset_negate(&thread->cpumask); 265 } 266 267 TAILQ_INIT(&thread->io_channels); 268 TAILQ_INIT(&thread->active_pollers); 269 TAILQ_INIT(&thread->timer_pollers); 270 SLIST_INIT(&thread->msg_cache); 271 thread->msg_cache_count = 0; 272 273 thread->tsc_last = spdk_get_ticks(); 274 275 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 276 if (!thread->messages) { 277 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 278 free(thread); 279 return NULL; 280 } 281 282 /* Fill the local message pool cache. */ 283 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 284 if (rc == 0) { 285 /* If we can't populate the cache it's ok. The cache will get filled 286 * up organically as messages are passed to the thread. */ 287 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 288 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 289 thread->msg_cache_count++; 290 } 291 } 292 293 if (name) { 294 snprintf(thread->name, sizeof(thread->name), "%s", name); 295 } else { 296 snprintf(thread->name, sizeof(thread->name), "%p", thread); 297 } 298 299 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name); 300 301 pthread_mutex_lock(&g_devlist_mutex); 302 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 303 g_thread_count++; 304 pthread_mutex_unlock(&g_devlist_mutex); 305 306 if (g_new_thread_fn) { 307 rc = g_new_thread_fn(thread); 308 if (rc != 0) { 309 _free_thread(thread); 310 return NULL; 311 } 312 } 313 314 return thread; 315 } 316 317 void 318 spdk_set_thread(struct spdk_thread *thread) 319 { 320 tls_thread = thread; 321 } 322 323 void 324 spdk_thread_exit(struct spdk_thread *thread) 325 { 326 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name); 327 328 assert(tls_thread == thread); 329 330 thread->exit = true; 331 } 332 333 void 334 spdk_thread_destroy(struct spdk_thread *thread) 335 { 336 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name); 337 338 assert(thread->exit == true); 339 340 if (tls_thread == thread) { 341 tls_thread = NULL; 342 } 343 344 _free_thread(thread); 345 } 346 347 void * 348 spdk_thread_get_ctx(struct spdk_thread *thread) 349 { 350 if (g_ctx_sz > 0) { 351 return thread->ctx; 352 } 353 354 return NULL; 355 } 356 357 struct spdk_cpuset * 358 spdk_thread_get_cpumask(struct spdk_thread *thread) 359 { 360 return &thread->cpumask; 361 } 362 363 struct spdk_thread * 364 spdk_thread_get_from_ctx(void *ctx) 365 { 366 if (ctx == NULL) { 367 assert(false); 368 return NULL; 369 } 370 371 assert(g_ctx_sz > 0); 372 373 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 374 } 375 376 static inline uint32_t 377 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 378 { 379 unsigned count, i; 380 void *messages[SPDK_MSG_BATCH_SIZE]; 381 382 #ifdef DEBUG 383 /* 384 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 385 * so we will never actually read uninitialized data from events, but just to be sure 386 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 387 */ 388 memset(messages, 0, sizeof(messages)); 389 #endif 390 391 if (max_msgs > 0) { 392 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 393 } else { 394 max_msgs = SPDK_MSG_BATCH_SIZE; 395 } 396 397 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 398 if (count == 0) { 399 return 0; 400 } 401 402 for (i = 0; i < count; i++) { 403 struct spdk_msg *msg = messages[i]; 404 405 assert(msg != NULL); 406 msg->fn(msg->arg); 407 408 if (thread->exit) { 409 break; 410 } 411 412 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 413 /* Insert the messages at the head. We want to re-use the hot 414 * ones. */ 415 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 416 thread->msg_cache_count++; 417 } else { 418 spdk_mempool_put(g_spdk_msg_mempool, msg); 419 } 420 } 421 422 return count; 423 } 424 425 static void 426 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 427 { 428 struct spdk_poller *iter; 429 430 poller->next_run_tick = now + poller->period_ticks; 431 432 /* 433 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled 434 * run time. 435 */ 436 TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) { 437 if (iter->next_run_tick <= poller->next_run_tick) { 438 TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq); 439 return; 440 } 441 } 442 443 /* No earlier pollers were found, so this poller must be the new head */ 444 TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq); 445 } 446 447 int 448 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 449 { 450 uint32_t msg_count; 451 struct spdk_thread *orig_thread; 452 struct spdk_poller *poller, *tmp; 453 int rc = 0; 454 455 orig_thread = _get_thread(); 456 tls_thread = thread; 457 458 if (now == 0) { 459 now = spdk_get_ticks(); 460 } 461 462 msg_count = _spdk_msg_queue_run_batch(thread, max_msgs); 463 if (msg_count) { 464 rc = 1; 465 } 466 467 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 468 active_pollers_head, tailq, tmp) { 469 int poller_rc; 470 471 if (thread->exit) { 472 break; 473 } 474 475 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 476 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 477 free(poller); 478 continue; 479 } 480 481 poller->state = SPDK_POLLER_STATE_RUNNING; 482 poller_rc = poller->fn(poller->arg); 483 484 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 485 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 486 free(poller); 487 continue; 488 } 489 490 poller->state = SPDK_POLLER_STATE_WAITING; 491 492 #ifdef DEBUG 493 if (poller_rc == -1) { 494 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller); 495 } 496 #endif 497 498 if (poller_rc > rc) { 499 rc = poller_rc; 500 } 501 502 } 503 504 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) { 505 int timer_rc = 0; 506 507 if (thread->exit) { 508 break; 509 } 510 511 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 512 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 513 free(poller); 514 continue; 515 } 516 517 if (now < poller->next_run_tick) { 518 break; 519 } 520 521 poller->state = SPDK_POLLER_STATE_RUNNING; 522 timer_rc = poller->fn(poller->arg); 523 524 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 525 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 526 free(poller); 527 continue; 528 } 529 530 poller->state = SPDK_POLLER_STATE_WAITING; 531 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 532 _spdk_poller_insert_timer(thread, poller, now); 533 534 #ifdef DEBUG 535 if (timer_rc == -1) { 536 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller); 537 } 538 #endif 539 540 if (timer_rc > rc) { 541 rc = timer_rc; 542 543 } 544 } 545 546 if (rc == 0) { 547 /* Poller status idle */ 548 thread->stats.idle_tsc += now - thread->tsc_last; 549 } else if (rc > 0) { 550 /* Poller status busy */ 551 thread->stats.busy_tsc += now - thread->tsc_last; 552 } 553 thread->tsc_last = now; 554 555 tls_thread = orig_thread; 556 557 return rc; 558 } 559 560 uint64_t 561 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 562 { 563 struct spdk_poller *poller; 564 565 poller = TAILQ_FIRST(&thread->timer_pollers); 566 if (poller) { 567 return poller->next_run_tick; 568 } 569 570 return 0; 571 } 572 573 int 574 spdk_thread_has_active_pollers(struct spdk_thread *thread) 575 { 576 return !TAILQ_EMPTY(&thread->active_pollers); 577 } 578 579 bool 580 spdk_thread_has_pollers(struct spdk_thread *thread) 581 { 582 if (TAILQ_EMPTY(&thread->active_pollers) && 583 TAILQ_EMPTY(&thread->timer_pollers)) { 584 return false; 585 } 586 587 return true; 588 } 589 590 bool 591 spdk_thread_is_idle(struct spdk_thread *thread) 592 { 593 if (spdk_ring_count(thread->messages) || 594 spdk_thread_has_pollers(thread)) { 595 return false; 596 } 597 598 return true; 599 } 600 601 uint32_t 602 spdk_thread_get_count(void) 603 { 604 /* 605 * Return cached value of the current thread count. We could acquire the 606 * lock and iterate through the TAILQ of threads to count them, but that 607 * count could still be invalidated after we release the lock. 608 */ 609 return g_thread_count; 610 } 611 612 struct spdk_thread * 613 spdk_get_thread(void) 614 { 615 struct spdk_thread *thread; 616 617 thread = _get_thread(); 618 if (!thread) { 619 SPDK_ERRLOG("No thread allocated\n"); 620 } 621 622 return thread; 623 } 624 625 const char * 626 spdk_thread_get_name(const struct spdk_thread *thread) 627 { 628 return thread->name; 629 } 630 631 int 632 spdk_thread_get_stats(struct spdk_thread_stats *stats) 633 { 634 struct spdk_thread *thread; 635 636 thread = _get_thread(); 637 if (!thread) { 638 SPDK_ERRLOG("No thread allocated\n"); 639 return -EINVAL; 640 } 641 642 if (stats == NULL) { 643 return -EINVAL; 644 } 645 646 *stats = thread->stats; 647 648 return 0; 649 } 650 651 int 652 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 653 { 654 struct spdk_thread *local_thread; 655 struct spdk_msg *msg; 656 int rc; 657 658 assert(thread != NULL); 659 660 local_thread = _get_thread(); 661 662 msg = NULL; 663 if (local_thread != NULL) { 664 if (local_thread->msg_cache_count > 0) { 665 msg = SLIST_FIRST(&local_thread->msg_cache); 666 assert(msg != NULL); 667 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 668 local_thread->msg_cache_count--; 669 } 670 } 671 672 if (msg == NULL) { 673 msg = spdk_mempool_get(g_spdk_msg_mempool); 674 if (!msg) { 675 SPDK_ERRLOG("msg could not be allocated\n"); 676 return -ENOMEM; 677 } 678 } 679 680 msg->fn = fn; 681 msg->arg = ctx; 682 683 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 684 if (rc != 1) { 685 SPDK_ERRLOG("msg could not be enqueued\n"); 686 spdk_mempool_put(g_spdk_msg_mempool, msg); 687 return -EIO; 688 } 689 690 return 0; 691 } 692 693 struct spdk_poller * 694 spdk_poller_register(spdk_poller_fn fn, 695 void *arg, 696 uint64_t period_microseconds) 697 { 698 struct spdk_thread *thread; 699 struct spdk_poller *poller; 700 uint64_t quotient, remainder, ticks; 701 702 thread = spdk_get_thread(); 703 if (!thread) { 704 assert(false); 705 return NULL; 706 } 707 708 poller = calloc(1, sizeof(*poller)); 709 if (poller == NULL) { 710 SPDK_ERRLOG("Poller memory allocation failed\n"); 711 return NULL; 712 } 713 714 poller->state = SPDK_POLLER_STATE_WAITING; 715 poller->fn = fn; 716 poller->arg = arg; 717 718 if (period_microseconds) { 719 quotient = period_microseconds / SPDK_SEC_TO_USEC; 720 remainder = period_microseconds % SPDK_SEC_TO_USEC; 721 ticks = spdk_get_ticks_hz(); 722 723 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 724 } else { 725 poller->period_ticks = 0; 726 } 727 728 if (poller->period_ticks) { 729 _spdk_poller_insert_timer(thread, poller, spdk_get_ticks()); 730 } else { 731 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 732 } 733 734 return poller; 735 } 736 737 void 738 spdk_poller_unregister(struct spdk_poller **ppoller) 739 { 740 struct spdk_thread *thread; 741 struct spdk_poller *poller; 742 743 poller = *ppoller; 744 if (poller == NULL) { 745 return; 746 } 747 748 *ppoller = NULL; 749 750 thread = spdk_get_thread(); 751 if (!thread) { 752 assert(false); 753 return; 754 } 755 756 /* Simply set the state to unregistered. The poller will get cleaned up 757 * in a subsequent call to spdk_thread_poll(). 758 */ 759 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 760 } 761 762 struct call_thread { 763 struct spdk_thread *cur_thread; 764 spdk_msg_fn fn; 765 void *ctx; 766 767 struct spdk_thread *orig_thread; 768 spdk_msg_fn cpl; 769 }; 770 771 static void 772 spdk_on_thread(void *ctx) 773 { 774 struct call_thread *ct = ctx; 775 776 ct->fn(ct->ctx); 777 778 pthread_mutex_lock(&g_devlist_mutex); 779 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 780 pthread_mutex_unlock(&g_devlist_mutex); 781 782 if (!ct->cur_thread) { 783 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n"); 784 785 spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 786 free(ctx); 787 } else { 788 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n", 789 ct->cur_thread->name); 790 791 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); 792 } 793 } 794 795 void 796 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 797 { 798 struct call_thread *ct; 799 struct spdk_thread *thread; 800 801 ct = calloc(1, sizeof(*ct)); 802 if (!ct) { 803 SPDK_ERRLOG("Unable to perform thread iteration\n"); 804 cpl(ctx); 805 return; 806 } 807 808 ct->fn = fn; 809 ct->ctx = ctx; 810 ct->cpl = cpl; 811 812 thread = _get_thread(); 813 if (!thread) { 814 SPDK_ERRLOG("No thread allocated\n"); 815 free(ct); 816 cpl(ctx); 817 return; 818 } 819 ct->orig_thread = thread; 820 821 pthread_mutex_lock(&g_devlist_mutex); 822 ct->cur_thread = TAILQ_FIRST(&g_threads); 823 pthread_mutex_unlock(&g_devlist_mutex); 824 825 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n", 826 ct->orig_thread->name); 827 828 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); 829 } 830 831 void 832 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 833 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 834 const char *name) 835 { 836 struct io_device *dev, *tmp; 837 struct spdk_thread *thread; 838 839 assert(io_device != NULL); 840 assert(create_cb != NULL); 841 assert(destroy_cb != NULL); 842 843 thread = spdk_get_thread(); 844 if (!thread) { 845 SPDK_ERRLOG("called from non-SPDK thread\n"); 846 assert(false); 847 return; 848 } 849 850 dev = calloc(1, sizeof(struct io_device)); 851 if (dev == NULL) { 852 SPDK_ERRLOG("could not allocate io_device\n"); 853 return; 854 } 855 856 dev->io_device = io_device; 857 if (name) { 858 snprintf(dev->name, sizeof(dev->name), "%s", name); 859 } else { 860 snprintf(dev->name, sizeof(dev->name), "%p", dev); 861 } 862 dev->create_cb = create_cb; 863 dev->destroy_cb = destroy_cb; 864 dev->unregister_cb = NULL; 865 dev->ctx_size = ctx_size; 866 dev->for_each_count = 0; 867 dev->unregistered = false; 868 dev->refcnt = 0; 869 870 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n", 871 dev->name, dev->io_device, thread->name); 872 873 pthread_mutex_lock(&g_devlist_mutex); 874 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 875 if (tmp->io_device == io_device) { 876 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 877 io_device, tmp->name, dev->name); 878 free(dev); 879 pthread_mutex_unlock(&g_devlist_mutex); 880 return; 881 } 882 } 883 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 884 pthread_mutex_unlock(&g_devlist_mutex); 885 } 886 887 static void 888 _finish_unregister(void *arg) 889 { 890 struct io_device *dev = arg; 891 892 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n", 893 dev->name, dev->io_device, dev->unregister_thread->name); 894 895 dev->unregister_cb(dev->io_device); 896 free(dev); 897 } 898 899 static void 900 _spdk_io_device_free(struct io_device *dev) 901 { 902 if (dev->unregister_cb == NULL) { 903 free(dev); 904 } else { 905 assert(dev->unregister_thread != NULL); 906 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n", 907 dev->name, dev->io_device, dev->unregister_thread->name); 908 spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 909 } 910 } 911 912 void 913 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 914 { 915 struct io_device *dev; 916 uint32_t refcnt; 917 struct spdk_thread *thread; 918 919 thread = spdk_get_thread(); 920 if (!thread) { 921 SPDK_ERRLOG("called from non-SPDK thread\n"); 922 assert(false); 923 return; 924 } 925 926 pthread_mutex_lock(&g_devlist_mutex); 927 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 928 if (dev->io_device == io_device) { 929 break; 930 } 931 } 932 933 if (!dev) { 934 SPDK_ERRLOG("io_device %p not found\n", io_device); 935 assert(false); 936 pthread_mutex_unlock(&g_devlist_mutex); 937 return; 938 } 939 940 if (dev->for_each_count > 0) { 941 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 942 dev->name, io_device, dev->for_each_count); 943 pthread_mutex_unlock(&g_devlist_mutex); 944 return; 945 } 946 947 dev->unregister_cb = unregister_cb; 948 dev->unregistered = true; 949 TAILQ_REMOVE(&g_io_devices, dev, tailq); 950 refcnt = dev->refcnt; 951 dev->unregister_thread = thread; 952 pthread_mutex_unlock(&g_devlist_mutex); 953 954 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n", 955 dev->name, dev->io_device, thread->name); 956 957 if (refcnt > 0) { 958 /* defer deletion */ 959 return; 960 } 961 962 _spdk_io_device_free(dev); 963 } 964 965 struct spdk_io_channel * 966 spdk_get_io_channel(void *io_device) 967 { 968 struct spdk_io_channel *ch; 969 struct spdk_thread *thread; 970 struct io_device *dev; 971 int rc; 972 973 pthread_mutex_lock(&g_devlist_mutex); 974 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 975 if (dev->io_device == io_device) { 976 break; 977 } 978 } 979 if (dev == NULL) { 980 SPDK_ERRLOG("could not find io_device %p\n", io_device); 981 pthread_mutex_unlock(&g_devlist_mutex); 982 return NULL; 983 } 984 985 thread = _get_thread(); 986 if (!thread) { 987 SPDK_ERRLOG("No thread allocated\n"); 988 pthread_mutex_unlock(&g_devlist_mutex); 989 return NULL; 990 } 991 992 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 993 if (ch->dev == dev) { 994 ch->ref++; 995 996 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 997 ch, dev->name, dev->io_device, thread->name, ch->ref); 998 999 /* 1000 * An I/O channel already exists for this device on this 1001 * thread, so return it. 1002 */ 1003 pthread_mutex_unlock(&g_devlist_mutex); 1004 return ch; 1005 } 1006 } 1007 1008 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 1009 if (ch == NULL) { 1010 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 1011 pthread_mutex_unlock(&g_devlist_mutex); 1012 return NULL; 1013 } 1014 1015 ch->dev = dev; 1016 ch->destroy_cb = dev->destroy_cb; 1017 ch->thread = thread; 1018 ch->ref = 1; 1019 ch->destroy_ref = 0; 1020 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 1021 1022 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1023 ch, dev->name, dev->io_device, thread->name, ch->ref); 1024 1025 dev->refcnt++; 1026 1027 pthread_mutex_unlock(&g_devlist_mutex); 1028 1029 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 1030 if (rc != 0) { 1031 pthread_mutex_lock(&g_devlist_mutex); 1032 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1033 dev->refcnt--; 1034 free(ch); 1035 pthread_mutex_unlock(&g_devlist_mutex); 1036 return NULL; 1037 } 1038 1039 return ch; 1040 } 1041 1042 static void 1043 _spdk_put_io_channel(void *arg) 1044 { 1045 struct spdk_io_channel *ch = arg; 1046 bool do_remove_dev = true; 1047 struct spdk_thread *thread; 1048 1049 thread = spdk_get_thread(); 1050 if (!thread) { 1051 SPDK_ERRLOG("called from non-SPDK thread\n"); 1052 assert(false); 1053 return; 1054 } 1055 1056 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1057 "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n", 1058 ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name); 1059 1060 assert(ch->thread == thread); 1061 1062 ch->destroy_ref--; 1063 1064 if (ch->ref > 0 || ch->destroy_ref > 0) { 1065 /* 1066 * Another reference to the associated io_device was requested 1067 * after this message was sent but before it had a chance to 1068 * execute. 1069 */ 1070 return; 1071 } 1072 1073 pthread_mutex_lock(&g_devlist_mutex); 1074 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1075 pthread_mutex_unlock(&g_devlist_mutex); 1076 1077 /* Don't hold the devlist mutex while the destroy_cb is called. */ 1078 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 1079 1080 pthread_mutex_lock(&g_devlist_mutex); 1081 ch->dev->refcnt--; 1082 1083 if (!ch->dev->unregistered) { 1084 do_remove_dev = false; 1085 } 1086 1087 if (ch->dev->refcnt > 0) { 1088 do_remove_dev = false; 1089 } 1090 1091 pthread_mutex_unlock(&g_devlist_mutex); 1092 1093 if (do_remove_dev) { 1094 _spdk_io_device_free(ch->dev); 1095 } 1096 free(ch); 1097 } 1098 1099 void 1100 spdk_put_io_channel(struct spdk_io_channel *ch) 1101 { 1102 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1103 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1104 ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref); 1105 1106 ch->ref--; 1107 1108 if (ch->ref == 0) { 1109 ch->destroy_ref++; 1110 spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch); 1111 } 1112 } 1113 1114 struct spdk_io_channel * 1115 spdk_io_channel_from_ctx(void *ctx) 1116 { 1117 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 1118 } 1119 1120 struct spdk_thread * 1121 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 1122 { 1123 return ch->thread; 1124 } 1125 1126 struct spdk_io_channel_iter { 1127 void *io_device; 1128 struct io_device *dev; 1129 spdk_channel_msg fn; 1130 int status; 1131 void *ctx; 1132 struct spdk_io_channel *ch; 1133 1134 struct spdk_thread *cur_thread; 1135 1136 struct spdk_thread *orig_thread; 1137 spdk_channel_for_each_cpl cpl; 1138 }; 1139 1140 void * 1141 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 1142 { 1143 return i->io_device; 1144 } 1145 1146 struct spdk_io_channel * 1147 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 1148 { 1149 return i->ch; 1150 } 1151 1152 void * 1153 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 1154 { 1155 return i->ctx; 1156 } 1157 1158 static void 1159 _call_completion(void *ctx) 1160 { 1161 struct spdk_io_channel_iter *i = ctx; 1162 1163 if (i->cpl != NULL) { 1164 i->cpl(i, i->status); 1165 } 1166 free(i); 1167 } 1168 1169 static void 1170 _call_channel(void *ctx) 1171 { 1172 struct spdk_io_channel_iter *i = ctx; 1173 struct spdk_io_channel *ch; 1174 1175 /* 1176 * It is possible that the channel was deleted before this 1177 * message had a chance to execute. If so, skip calling 1178 * the fn() on this thread. 1179 */ 1180 pthread_mutex_lock(&g_devlist_mutex); 1181 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 1182 if (ch->dev->io_device == i->io_device) { 1183 break; 1184 } 1185 } 1186 pthread_mutex_unlock(&g_devlist_mutex); 1187 1188 if (ch) { 1189 i->fn(i); 1190 } else { 1191 spdk_for_each_channel_continue(i, 0); 1192 } 1193 } 1194 1195 void 1196 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 1197 spdk_channel_for_each_cpl cpl) 1198 { 1199 struct spdk_thread *thread; 1200 struct spdk_io_channel *ch; 1201 struct spdk_io_channel_iter *i; 1202 int rc __attribute__((unused)); 1203 1204 i = calloc(1, sizeof(*i)); 1205 if (!i) { 1206 SPDK_ERRLOG("Unable to allocate iterator\n"); 1207 return; 1208 } 1209 1210 i->io_device = io_device; 1211 i->fn = fn; 1212 i->ctx = ctx; 1213 i->cpl = cpl; 1214 1215 pthread_mutex_lock(&g_devlist_mutex); 1216 i->orig_thread = _get_thread(); 1217 1218 TAILQ_FOREACH(thread, &g_threads, tailq) { 1219 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1220 if (ch->dev->io_device == io_device) { 1221 ch->dev->for_each_count++; 1222 i->dev = ch->dev; 1223 i->cur_thread = thread; 1224 i->ch = ch; 1225 pthread_mutex_unlock(&g_devlist_mutex); 1226 rc = spdk_thread_send_msg(thread, _call_channel, i); 1227 assert(rc == 0); 1228 return; 1229 } 1230 } 1231 } 1232 1233 pthread_mutex_unlock(&g_devlist_mutex); 1234 1235 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1236 assert(rc == 0); 1237 } 1238 1239 void 1240 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 1241 { 1242 struct spdk_thread *thread; 1243 struct spdk_io_channel *ch; 1244 1245 assert(i->cur_thread == spdk_get_thread()); 1246 1247 i->status = status; 1248 1249 pthread_mutex_lock(&g_devlist_mutex); 1250 if (status) { 1251 goto end; 1252 } 1253 thread = TAILQ_NEXT(i->cur_thread, tailq); 1254 while (thread) { 1255 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1256 if (ch->dev->io_device == i->io_device) { 1257 i->cur_thread = thread; 1258 i->ch = ch; 1259 pthread_mutex_unlock(&g_devlist_mutex); 1260 spdk_thread_send_msg(thread, _call_channel, i); 1261 return; 1262 } 1263 } 1264 thread = TAILQ_NEXT(thread, tailq); 1265 } 1266 1267 end: 1268 i->dev->for_each_count--; 1269 i->ch = NULL; 1270 pthread_mutex_unlock(&g_devlist_mutex); 1271 1272 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1273 } 1274 1275 1276 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD) 1277