1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/queue.h" 38 #include "spdk/string.h" 39 #include "spdk/thread.h" 40 #include "spdk/util.h" 41 42 #include "spdk_internal/log.h" 43 #include "spdk_internal/thread.h" 44 45 #ifdef __linux__ 46 #include <sys/prctl.h> 47 #endif 48 49 #ifdef __FreeBSD__ 50 #include <pthread_np.h> 51 #endif 52 53 #define SPDK_MSG_BATCH_SIZE 8 54 55 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 56 57 static spdk_new_thread_fn g_new_thread_fn = NULL; 58 59 struct io_device { 60 void *io_device; 61 char *name; 62 spdk_io_channel_create_cb create_cb; 63 spdk_io_channel_destroy_cb destroy_cb; 64 spdk_io_device_unregister_cb unregister_cb; 65 struct spdk_thread *unregister_thread; 66 uint32_t ctx_size; 67 uint32_t for_each_count; 68 TAILQ_ENTRY(io_device) tailq; 69 70 uint32_t refcnt; 71 72 bool unregistered; 73 }; 74 75 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 76 77 struct spdk_msg { 78 spdk_msg_fn fn; 79 void *arg; 80 81 SLIST_ENTRY(spdk_msg) link; 82 }; 83 84 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 85 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 86 87 enum spdk_poller_state { 88 /* The poller is registered with a thread but not currently executing its fn. */ 89 SPDK_POLLER_STATE_WAITING, 90 91 /* The poller is currently running its fn. */ 92 SPDK_POLLER_STATE_RUNNING, 93 94 /* The poller was unregistered during the execution of its fn. */ 95 SPDK_POLLER_STATE_UNREGISTERED, 96 }; 97 98 struct spdk_poller { 99 TAILQ_ENTRY(spdk_poller) tailq; 100 101 /* Current state of the poller; should only be accessed from the poller's thread. */ 102 enum spdk_poller_state state; 103 104 uint64_t period_ticks; 105 uint64_t next_run_tick; 106 spdk_poller_fn fn; 107 void *arg; 108 }; 109 110 struct spdk_thread { 111 TAILQ_HEAD(, spdk_io_channel) io_channels; 112 TAILQ_ENTRY(spdk_thread) tailq; 113 char *name; 114 115 uint64_t tsc_last; 116 struct spdk_thread_stats stats; 117 118 /* 119 * Contains pollers actively running on this thread. Pollers 120 * are run round-robin. The thread takes one poller from the head 121 * of the ring, executes it, then puts it back at the tail of 122 * the ring. 123 */ 124 TAILQ_HEAD(, spdk_poller) active_pollers; 125 126 /** 127 * Contains pollers running on this thread with a periodic timer. 128 */ 129 TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; 130 131 struct spdk_ring *messages; 132 133 SLIST_HEAD(, spdk_msg) msg_cache; 134 size_t msg_cache_count; 135 }; 136 137 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 138 static uint32_t g_thread_count = 0; 139 140 static __thread struct spdk_thread *tls_thread = NULL; 141 142 static inline struct spdk_thread * 143 _get_thread(void) 144 { 145 return tls_thread; 146 } 147 148 static void 149 _set_thread_name(const char *thread_name) 150 { 151 #if defined(__linux__) 152 prctl(PR_SET_NAME, thread_name, 0, 0, 0); 153 #elif defined(__FreeBSD__) 154 pthread_set_name_np(pthread_self(), thread_name); 155 #else 156 #error missing platform support for thread name 157 #endif 158 } 159 160 int 161 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn) 162 { 163 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 164 165 assert(g_new_thread_fn == NULL); 166 g_new_thread_fn = new_thread_fn; 167 168 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 169 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 170 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 171 sizeof(struct spdk_msg), 172 0, /* No cache. We do our own. */ 173 SPDK_ENV_SOCKET_ID_ANY); 174 175 if (!g_spdk_msg_mempool) { 176 return -1; 177 } 178 179 return 0; 180 } 181 182 void 183 spdk_thread_lib_fini(void) 184 { 185 struct io_device *dev; 186 187 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 188 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 189 } 190 191 if (g_spdk_msg_mempool) { 192 spdk_mempool_free(g_spdk_msg_mempool); 193 } 194 } 195 196 struct spdk_thread * 197 spdk_thread_create(const char *name) 198 { 199 struct spdk_thread *thread; 200 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 201 int rc, i; 202 203 thread = calloc(1, sizeof(*thread)); 204 if (!thread) { 205 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 206 return NULL; 207 } 208 209 TAILQ_INIT(&thread->io_channels); 210 TAILQ_INIT(&thread->active_pollers); 211 TAILQ_INIT(&thread->timer_pollers); 212 SLIST_INIT(&thread->msg_cache); 213 thread->msg_cache_count = 0; 214 215 thread->tsc_last = spdk_get_ticks(); 216 217 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 218 if (!thread->messages) { 219 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 220 free(thread); 221 return NULL; 222 } 223 224 /* Fill the local message pool cache. */ 225 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 226 if (rc == 0) { 227 /* If we can't populate the cache it's ok. The cache will get filled 228 * up organically as messages are passed to the thread. */ 229 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 230 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 231 thread->msg_cache_count++; 232 } 233 } 234 235 if (name) { 236 _set_thread_name(name); 237 thread->name = strdup(name); 238 } else { 239 thread->name = spdk_sprintf_alloc("%p", thread); 240 } 241 242 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name); 243 244 pthread_mutex_lock(&g_devlist_mutex); 245 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 246 g_thread_count++; 247 pthread_mutex_unlock(&g_devlist_mutex); 248 249 if (g_new_thread_fn) { 250 g_new_thread_fn(thread); 251 } 252 253 return thread; 254 } 255 256 void 257 spdk_set_thread(struct spdk_thread *thread) 258 { 259 tls_thread = thread; 260 } 261 262 void 263 spdk_thread_exit(struct spdk_thread *thread) 264 { 265 struct spdk_io_channel *ch; 266 struct spdk_msg *msg; 267 268 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name); 269 270 if (tls_thread == thread) { 271 tls_thread = NULL; 272 } 273 274 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 275 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 276 thread->name, ch->dev->name); 277 } 278 279 pthread_mutex_lock(&g_devlist_mutex); 280 assert(g_thread_count > 0); 281 g_thread_count--; 282 TAILQ_REMOVE(&g_threads, thread, tailq); 283 pthread_mutex_unlock(&g_devlist_mutex); 284 285 free(thread->name); 286 287 msg = SLIST_FIRST(&thread->msg_cache); 288 while (msg != NULL) { 289 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 290 291 assert(thread->msg_cache_count > 0); 292 thread->msg_cache_count--; 293 spdk_mempool_put(g_spdk_msg_mempool, msg); 294 295 msg = SLIST_FIRST(&thread->msg_cache); 296 } 297 298 assert(thread->msg_cache_count == 0); 299 300 if (thread->messages) { 301 spdk_ring_free(thread->messages); 302 } 303 304 free(thread); 305 } 306 307 static inline uint32_t 308 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 309 { 310 unsigned count, i; 311 void *messages[SPDK_MSG_BATCH_SIZE]; 312 313 #ifdef DEBUG 314 /* 315 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 316 * so we will never actually read uninitialized data from events, but just to be sure 317 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 318 */ 319 memset(messages, 0, sizeof(messages)); 320 #endif 321 322 if (max_msgs > 0) { 323 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 324 } else { 325 max_msgs = SPDK_MSG_BATCH_SIZE; 326 } 327 328 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 329 if (count == 0) { 330 return 0; 331 } 332 333 for (i = 0; i < count; i++) { 334 struct spdk_msg *msg = messages[i]; 335 336 assert(msg != NULL); 337 msg->fn(msg->arg); 338 339 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 340 /* Insert the messages at the head. We want to re-use the hot 341 * ones. */ 342 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 343 thread->msg_cache_count++; 344 } else { 345 spdk_mempool_put(g_spdk_msg_mempool, msg); 346 } 347 } 348 349 return count; 350 } 351 352 static void 353 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 354 { 355 struct spdk_poller *iter; 356 357 poller->next_run_tick = now + poller->period_ticks; 358 359 /* 360 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled 361 * run time. 362 */ 363 TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) { 364 if (iter->next_run_tick <= poller->next_run_tick) { 365 TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq); 366 return; 367 } 368 } 369 370 /* No earlier pollers were found, so this poller must be the new head */ 371 TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq); 372 } 373 374 int 375 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 376 { 377 uint32_t msg_count; 378 struct spdk_thread *orig_thread; 379 struct spdk_poller *poller; 380 int rc = 0; 381 382 orig_thread = _get_thread(); 383 tls_thread = thread; 384 385 if (now == 0) { 386 now = spdk_get_ticks(); 387 } 388 389 msg_count = _spdk_msg_queue_run_batch(thread, max_msgs); 390 if (msg_count) { 391 rc = 1; 392 } 393 394 poller = TAILQ_FIRST(&thread->active_pollers); 395 if (poller) { 396 int poller_rc; 397 398 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 399 poller->state = SPDK_POLLER_STATE_RUNNING; 400 poller_rc = poller->fn(poller->arg); 401 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 402 free(poller); 403 } else { 404 poller->state = SPDK_POLLER_STATE_WAITING; 405 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 406 } 407 408 #ifdef DEBUG 409 if (poller_rc == -1) { 410 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller); 411 } 412 #endif 413 414 if (poller_rc > rc) { 415 rc = poller_rc; 416 } 417 } 418 419 poller = TAILQ_FIRST(&thread->timer_pollers); 420 if (poller) { 421 if (now >= poller->next_run_tick) { 422 int timer_rc = 0; 423 424 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 425 poller->state = SPDK_POLLER_STATE_RUNNING; 426 timer_rc = poller->fn(poller->arg); 427 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 428 free(poller); 429 } else { 430 poller->state = SPDK_POLLER_STATE_WAITING; 431 _spdk_poller_insert_timer(thread, poller, now); 432 } 433 434 #ifdef DEBUG 435 if (timer_rc == -1) { 436 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller); 437 } 438 #endif 439 440 if (timer_rc > rc) { 441 rc = timer_rc; 442 443 } 444 } 445 } 446 447 if (rc == 0) { 448 /* Poller status idle */ 449 thread->stats.idle_tsc += now - thread->tsc_last; 450 } else if (rc > 0) { 451 /* Poller status busy */ 452 thread->stats.busy_tsc += now - thread->tsc_last; 453 } else { 454 /* Poller status unknown */ 455 thread->stats.unknown_tsc += now - thread->tsc_last; 456 } 457 thread->tsc_last = now; 458 459 tls_thread = orig_thread; 460 461 return rc; 462 } 463 464 uint64_t 465 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 466 { 467 struct spdk_poller *poller; 468 469 poller = TAILQ_FIRST(&thread->timer_pollers); 470 if (poller) { 471 return poller->next_run_tick; 472 } 473 474 return 0; 475 } 476 477 int 478 spdk_thread_has_active_pollers(struct spdk_thread *thread) 479 { 480 return !TAILQ_EMPTY(&thread->active_pollers); 481 } 482 483 uint32_t 484 spdk_thread_get_count(void) 485 { 486 /* 487 * Return cached value of the current thread count. We could acquire the 488 * lock and iterate through the TAILQ of threads to count them, but that 489 * count could still be invalidated after we release the lock. 490 */ 491 return g_thread_count; 492 } 493 494 struct spdk_thread * 495 spdk_get_thread(void) 496 { 497 struct spdk_thread *thread; 498 499 thread = _get_thread(); 500 if (!thread) { 501 SPDK_ERRLOG("No thread allocated\n"); 502 } 503 504 return thread; 505 } 506 507 const char * 508 spdk_thread_get_name(const struct spdk_thread *thread) 509 { 510 return thread->name; 511 } 512 513 int 514 spdk_thread_get_stats(struct spdk_thread_stats *stats) 515 { 516 struct spdk_thread *thread; 517 518 thread = _get_thread(); 519 if (!thread) { 520 SPDK_ERRLOG("No thread allocated\n"); 521 return -EINVAL; 522 } 523 524 if (stats == NULL) { 525 return -EINVAL; 526 } 527 528 *stats = thread->stats; 529 530 return 0; 531 } 532 533 void 534 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 535 { 536 struct spdk_thread *local_thread; 537 struct spdk_msg *msg; 538 int rc; 539 540 if (!thread) { 541 assert(false); 542 return; 543 } 544 545 local_thread = _get_thread(); 546 547 msg = NULL; 548 if (local_thread != NULL) { 549 if (local_thread->msg_cache_count > 0) { 550 msg = SLIST_FIRST(&local_thread->msg_cache); 551 assert(msg != NULL); 552 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 553 local_thread->msg_cache_count--; 554 } 555 } 556 557 if (msg == NULL) { 558 msg = spdk_mempool_get(g_spdk_msg_mempool); 559 if (!msg) { 560 assert(false); 561 return; 562 } 563 } 564 565 msg->fn = fn; 566 msg->arg = ctx; 567 568 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1); 569 if (rc != 1) { 570 assert(false); 571 spdk_mempool_put(g_spdk_msg_mempool, msg); 572 return; 573 } 574 } 575 576 struct spdk_poller * 577 spdk_poller_register(spdk_poller_fn fn, 578 void *arg, 579 uint64_t period_microseconds) 580 { 581 struct spdk_thread *thread; 582 struct spdk_poller *poller; 583 uint64_t quotient, remainder, ticks; 584 585 thread = spdk_get_thread(); 586 if (!thread) { 587 assert(false); 588 return NULL; 589 } 590 591 poller = calloc(1, sizeof(*poller)); 592 if (poller == NULL) { 593 SPDK_ERRLOG("Poller memory allocation failed\n"); 594 return NULL; 595 } 596 597 poller->state = SPDK_POLLER_STATE_WAITING; 598 poller->fn = fn; 599 poller->arg = arg; 600 601 if (period_microseconds) { 602 quotient = period_microseconds / SPDK_SEC_TO_USEC; 603 remainder = period_microseconds % SPDK_SEC_TO_USEC; 604 ticks = spdk_get_ticks_hz(); 605 606 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 607 } else { 608 poller->period_ticks = 0; 609 } 610 611 if (poller->period_ticks) { 612 _spdk_poller_insert_timer(thread, poller, spdk_get_ticks()); 613 } else { 614 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 615 } 616 617 return poller; 618 } 619 620 void 621 spdk_poller_unregister(struct spdk_poller **ppoller) 622 { 623 struct spdk_thread *thread; 624 struct spdk_poller *poller; 625 626 poller = *ppoller; 627 if (poller == NULL) { 628 return; 629 } 630 631 *ppoller = NULL; 632 633 thread = spdk_get_thread(); 634 if (!thread) { 635 assert(false); 636 return; 637 } 638 639 if (poller->state == SPDK_POLLER_STATE_RUNNING) { 640 /* 641 * We are being called from the poller_fn, so set the state to unregistered 642 * and let the thread poll loop free the poller. 643 */ 644 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 645 } else { 646 /* Poller is not running currently, so just free it. */ 647 if (poller->period_ticks) { 648 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 649 } else { 650 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 651 } 652 653 free(poller); 654 } 655 } 656 657 struct call_thread { 658 struct spdk_thread *cur_thread; 659 spdk_msg_fn fn; 660 void *ctx; 661 662 struct spdk_thread *orig_thread; 663 spdk_msg_fn cpl; 664 }; 665 666 static void 667 spdk_on_thread(void *ctx) 668 { 669 struct call_thread *ct = ctx; 670 671 ct->fn(ct->ctx); 672 673 pthread_mutex_lock(&g_devlist_mutex); 674 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 675 pthread_mutex_unlock(&g_devlist_mutex); 676 677 if (!ct->cur_thread) { 678 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n"); 679 680 spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 681 free(ctx); 682 } else { 683 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n", 684 ct->cur_thread->name); 685 686 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); 687 } 688 } 689 690 void 691 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 692 { 693 struct call_thread *ct; 694 struct spdk_thread *thread; 695 696 ct = calloc(1, sizeof(*ct)); 697 if (!ct) { 698 SPDK_ERRLOG("Unable to perform thread iteration\n"); 699 cpl(ctx); 700 return; 701 } 702 703 ct->fn = fn; 704 ct->ctx = ctx; 705 ct->cpl = cpl; 706 707 pthread_mutex_lock(&g_devlist_mutex); 708 thread = _get_thread(); 709 if (!thread) { 710 SPDK_ERRLOG("No thread allocated\n"); 711 free(ct); 712 cpl(ctx); 713 return; 714 } 715 ct->orig_thread = thread; 716 ct->cur_thread = TAILQ_FIRST(&g_threads); 717 pthread_mutex_unlock(&g_devlist_mutex); 718 719 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n", 720 ct->orig_thread->name); 721 722 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); 723 } 724 725 void 726 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 727 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 728 const char *name) 729 { 730 struct io_device *dev, *tmp; 731 struct spdk_thread *thread; 732 733 assert(io_device != NULL); 734 assert(create_cb != NULL); 735 assert(destroy_cb != NULL); 736 737 thread = spdk_get_thread(); 738 if (!thread) { 739 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 740 assert(false); 741 return; 742 } 743 744 dev = calloc(1, sizeof(struct io_device)); 745 if (dev == NULL) { 746 SPDK_ERRLOG("could not allocate io_device\n"); 747 return; 748 } 749 750 dev->io_device = io_device; 751 if (name) { 752 dev->name = strdup(name); 753 } else { 754 dev->name = spdk_sprintf_alloc("%p", dev); 755 } 756 dev->create_cb = create_cb; 757 dev->destroy_cb = destroy_cb; 758 dev->unregister_cb = NULL; 759 dev->ctx_size = ctx_size; 760 dev->for_each_count = 0; 761 dev->unregistered = false; 762 dev->refcnt = 0; 763 764 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n", 765 dev->name, dev->io_device, thread->name); 766 767 pthread_mutex_lock(&g_devlist_mutex); 768 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 769 if (tmp->io_device == io_device) { 770 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 771 io_device, tmp->name, dev->name); 772 free(dev->name); 773 free(dev); 774 pthread_mutex_unlock(&g_devlist_mutex); 775 return; 776 } 777 } 778 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 779 pthread_mutex_unlock(&g_devlist_mutex); 780 } 781 782 static void 783 _finish_unregister(void *arg) 784 { 785 struct io_device *dev = arg; 786 787 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n", 788 dev->name, dev->io_device, dev->unregister_thread->name); 789 790 dev->unregister_cb(dev->io_device); 791 free(dev->name); 792 free(dev); 793 } 794 795 static void 796 _spdk_io_device_free(struct io_device *dev) 797 { 798 if (dev->unregister_cb == NULL) { 799 free(dev->name); 800 free(dev); 801 } else { 802 assert(dev->unregister_thread != NULL); 803 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n", 804 dev->name, dev->io_device, dev->unregister_thread->name); 805 spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 806 } 807 } 808 809 void 810 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 811 { 812 struct io_device *dev; 813 uint32_t refcnt; 814 struct spdk_thread *thread; 815 816 thread = spdk_get_thread(); 817 if (!thread) { 818 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 819 assert(false); 820 return; 821 } 822 823 pthread_mutex_lock(&g_devlist_mutex); 824 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 825 if (dev->io_device == io_device) { 826 break; 827 } 828 } 829 830 if (!dev) { 831 SPDK_ERRLOG("io_device %p not found\n", io_device); 832 assert(false); 833 pthread_mutex_unlock(&g_devlist_mutex); 834 return; 835 } 836 837 if (dev->for_each_count > 0) { 838 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 839 dev->name, io_device, dev->for_each_count); 840 pthread_mutex_unlock(&g_devlist_mutex); 841 return; 842 } 843 844 dev->unregister_cb = unregister_cb; 845 dev->unregistered = true; 846 TAILQ_REMOVE(&g_io_devices, dev, tailq); 847 refcnt = dev->refcnt; 848 dev->unregister_thread = thread; 849 pthread_mutex_unlock(&g_devlist_mutex); 850 851 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n", 852 dev->name, dev->io_device, thread->name); 853 854 if (refcnt > 0) { 855 /* defer deletion */ 856 return; 857 } 858 859 _spdk_io_device_free(dev); 860 } 861 862 struct spdk_io_channel * 863 spdk_get_io_channel(void *io_device) 864 { 865 struct spdk_io_channel *ch; 866 struct spdk_thread *thread; 867 struct io_device *dev; 868 int rc; 869 870 pthread_mutex_lock(&g_devlist_mutex); 871 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 872 if (dev->io_device == io_device) { 873 break; 874 } 875 } 876 if (dev == NULL) { 877 SPDK_ERRLOG("could not find io_device %p\n", io_device); 878 pthread_mutex_unlock(&g_devlist_mutex); 879 return NULL; 880 } 881 882 thread = _get_thread(); 883 if (!thread) { 884 SPDK_ERRLOG("No thread allocated\n"); 885 pthread_mutex_unlock(&g_devlist_mutex); 886 return NULL; 887 } 888 889 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 890 if (ch->dev == dev) { 891 ch->ref++; 892 893 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 894 ch, dev->name, dev->io_device, thread->name, ch->ref); 895 896 /* 897 * An I/O channel already exists for this device on this 898 * thread, so return it. 899 */ 900 pthread_mutex_unlock(&g_devlist_mutex); 901 return ch; 902 } 903 } 904 905 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 906 if (ch == NULL) { 907 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 908 pthread_mutex_unlock(&g_devlist_mutex); 909 return NULL; 910 } 911 912 ch->dev = dev; 913 ch->destroy_cb = dev->destroy_cb; 914 ch->thread = thread; 915 ch->ref = 1; 916 ch->destroy_ref = 0; 917 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 918 919 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 920 ch, dev->name, dev->io_device, thread->name, ch->ref); 921 922 dev->refcnt++; 923 924 pthread_mutex_unlock(&g_devlist_mutex); 925 926 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 927 if (rc != 0) { 928 pthread_mutex_lock(&g_devlist_mutex); 929 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 930 dev->refcnt--; 931 free(ch); 932 pthread_mutex_unlock(&g_devlist_mutex); 933 return NULL; 934 } 935 936 return ch; 937 } 938 939 static void 940 _spdk_put_io_channel(void *arg) 941 { 942 struct spdk_io_channel *ch = arg; 943 bool do_remove_dev = true; 944 struct spdk_thread *thread; 945 946 thread = spdk_get_thread(); 947 if (!thread) { 948 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 949 assert(false); 950 return; 951 } 952 953 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 954 "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n", 955 ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name); 956 957 assert(ch->thread == thread); 958 959 ch->destroy_ref--; 960 961 if (ch->ref > 0 || ch->destroy_ref > 0) { 962 /* 963 * Another reference to the associated io_device was requested 964 * after this message was sent but before it had a chance to 965 * execute. 966 */ 967 return; 968 } 969 970 pthread_mutex_lock(&g_devlist_mutex); 971 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 972 pthread_mutex_unlock(&g_devlist_mutex); 973 974 /* Don't hold the devlist mutex while the destroy_cb is called. */ 975 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 976 977 pthread_mutex_lock(&g_devlist_mutex); 978 ch->dev->refcnt--; 979 980 if (!ch->dev->unregistered) { 981 do_remove_dev = false; 982 } 983 984 if (ch->dev->refcnt > 0) { 985 do_remove_dev = false; 986 } 987 988 pthread_mutex_unlock(&g_devlist_mutex); 989 990 if (do_remove_dev) { 991 _spdk_io_device_free(ch->dev); 992 } 993 free(ch); 994 } 995 996 void 997 spdk_put_io_channel(struct spdk_io_channel *ch) 998 { 999 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1000 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1001 ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref); 1002 1003 ch->ref--; 1004 1005 if (ch->ref == 0) { 1006 ch->destroy_ref++; 1007 spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch); 1008 } 1009 } 1010 1011 struct spdk_io_channel * 1012 spdk_io_channel_from_ctx(void *ctx) 1013 { 1014 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 1015 } 1016 1017 struct spdk_thread * 1018 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 1019 { 1020 return ch->thread; 1021 } 1022 1023 struct spdk_io_channel_iter { 1024 void *io_device; 1025 struct io_device *dev; 1026 spdk_channel_msg fn; 1027 int status; 1028 void *ctx; 1029 struct spdk_io_channel *ch; 1030 1031 struct spdk_thread *cur_thread; 1032 1033 struct spdk_thread *orig_thread; 1034 spdk_channel_for_each_cpl cpl; 1035 }; 1036 1037 void * 1038 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 1039 { 1040 return i->io_device; 1041 } 1042 1043 struct spdk_io_channel * 1044 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 1045 { 1046 return i->ch; 1047 } 1048 1049 void * 1050 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 1051 { 1052 return i->ctx; 1053 } 1054 1055 static void 1056 _call_completion(void *ctx) 1057 { 1058 struct spdk_io_channel_iter *i = ctx; 1059 1060 if (i->cpl != NULL) { 1061 i->cpl(i, i->status); 1062 } 1063 free(i); 1064 } 1065 1066 static void 1067 _call_channel(void *ctx) 1068 { 1069 struct spdk_io_channel_iter *i = ctx; 1070 struct spdk_io_channel *ch; 1071 1072 /* 1073 * It is possible that the channel was deleted before this 1074 * message had a chance to execute. If so, skip calling 1075 * the fn() on this thread. 1076 */ 1077 pthread_mutex_lock(&g_devlist_mutex); 1078 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 1079 if (ch->dev->io_device == i->io_device) { 1080 break; 1081 } 1082 } 1083 pthread_mutex_unlock(&g_devlist_mutex); 1084 1085 if (ch) { 1086 i->fn(i); 1087 } else { 1088 spdk_for_each_channel_continue(i, 0); 1089 } 1090 } 1091 1092 void 1093 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 1094 spdk_channel_for_each_cpl cpl) 1095 { 1096 struct spdk_thread *thread; 1097 struct spdk_io_channel *ch; 1098 struct spdk_io_channel_iter *i; 1099 1100 i = calloc(1, sizeof(*i)); 1101 if (!i) { 1102 SPDK_ERRLOG("Unable to allocate iterator\n"); 1103 return; 1104 } 1105 1106 i->io_device = io_device; 1107 i->fn = fn; 1108 i->ctx = ctx; 1109 i->cpl = cpl; 1110 1111 pthread_mutex_lock(&g_devlist_mutex); 1112 i->orig_thread = _get_thread(); 1113 1114 TAILQ_FOREACH(thread, &g_threads, tailq) { 1115 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1116 if (ch->dev->io_device == io_device) { 1117 ch->dev->for_each_count++; 1118 i->dev = ch->dev; 1119 i->cur_thread = thread; 1120 i->ch = ch; 1121 pthread_mutex_unlock(&g_devlist_mutex); 1122 spdk_thread_send_msg(thread, _call_channel, i); 1123 return; 1124 } 1125 } 1126 } 1127 1128 pthread_mutex_unlock(&g_devlist_mutex); 1129 1130 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1131 } 1132 1133 void 1134 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 1135 { 1136 struct spdk_thread *thread; 1137 struct spdk_io_channel *ch; 1138 1139 assert(i->cur_thread == spdk_get_thread()); 1140 1141 i->status = status; 1142 1143 pthread_mutex_lock(&g_devlist_mutex); 1144 if (status) { 1145 goto end; 1146 } 1147 thread = TAILQ_NEXT(i->cur_thread, tailq); 1148 while (thread) { 1149 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1150 if (ch->dev->io_device == i->io_device) { 1151 i->cur_thread = thread; 1152 i->ch = ch; 1153 pthread_mutex_unlock(&g_devlist_mutex); 1154 spdk_thread_send_msg(thread, _call_channel, i); 1155 return; 1156 } 1157 } 1158 thread = TAILQ_NEXT(thread, tailq); 1159 } 1160 1161 end: 1162 i->dev->for_each_count--; 1163 i->ch = NULL; 1164 pthread_mutex_unlock(&g_devlist_mutex); 1165 1166 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1167 } 1168 1169 1170 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD) 1171