1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/queue.h" 38 #include "spdk/string.h" 39 #include "spdk/thread.h" 40 #include "spdk/util.h" 41 42 #include "spdk_internal/log.h" 43 #include "spdk_internal/thread.h" 44 45 #ifdef __linux__ 46 #include <sys/prctl.h> 47 #endif 48 49 #ifdef __FreeBSD__ 50 #include <pthread_np.h> 51 #endif 52 53 #define SPDK_MSG_BATCH_SIZE 8 54 55 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 56 57 static spdk_new_thread_fn g_new_thread_fn = NULL; 58 static size_t g_ctx_sz = 0; 59 60 struct io_device { 61 void *io_device; 62 char *name; 63 spdk_io_channel_create_cb create_cb; 64 spdk_io_channel_destroy_cb destroy_cb; 65 spdk_io_device_unregister_cb unregister_cb; 66 struct spdk_thread *unregister_thread; 67 uint32_t ctx_size; 68 uint32_t for_each_count; 69 TAILQ_ENTRY(io_device) tailq; 70 71 uint32_t refcnt; 72 73 bool unregistered; 74 }; 75 76 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 77 78 struct spdk_msg { 79 spdk_msg_fn fn; 80 void *arg; 81 82 SLIST_ENTRY(spdk_msg) link; 83 }; 84 85 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 86 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 87 88 enum spdk_poller_state { 89 /* The poller is registered with a thread but not currently executing its fn. */ 90 SPDK_POLLER_STATE_WAITING, 91 92 /* The poller is currently running its fn. */ 93 SPDK_POLLER_STATE_RUNNING, 94 95 /* The poller was unregistered during the execution of its fn. */ 96 SPDK_POLLER_STATE_UNREGISTERED, 97 }; 98 99 struct spdk_poller { 100 TAILQ_ENTRY(spdk_poller) tailq; 101 102 /* Current state of the poller; should only be accessed from the poller's thread. */ 103 enum spdk_poller_state state; 104 105 uint64_t period_ticks; 106 uint64_t next_run_tick; 107 spdk_poller_fn fn; 108 void *arg; 109 }; 110 111 struct spdk_thread { 112 TAILQ_HEAD(, spdk_io_channel) io_channels; 113 TAILQ_ENTRY(spdk_thread) tailq; 114 char *name; 115 116 uint64_t tsc_last; 117 struct spdk_thread_stats stats; 118 119 /* 120 * Contains pollers actively running on this thread. Pollers 121 * are run round-robin. The thread takes one poller from the head 122 * of the ring, executes it, then puts it back at the tail of 123 * the ring. 124 */ 125 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 126 127 /** 128 * Contains pollers running on this thread with a periodic timer. 129 */ 130 TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; 131 132 struct spdk_ring *messages; 133 134 SLIST_HEAD(, spdk_msg) msg_cache; 135 size_t msg_cache_count; 136 137 /* User context allocated at the end */ 138 uint8_t ctx[0]; 139 }; 140 141 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 142 static uint32_t g_thread_count = 0; 143 144 static __thread struct spdk_thread *tls_thread = NULL; 145 146 static inline struct spdk_thread * 147 _get_thread(void) 148 { 149 return tls_thread; 150 } 151 152 static void 153 _set_thread_name(const char *thread_name) 154 { 155 #if defined(__linux__) 156 prctl(PR_SET_NAME, thread_name, 0, 0, 0); 157 #elif defined(__FreeBSD__) 158 pthread_set_name_np(pthread_self(), thread_name); 159 #else 160 #error missing platform support for thread name 161 #endif 162 } 163 164 int 165 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 166 { 167 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 168 169 assert(g_new_thread_fn == NULL); 170 g_new_thread_fn = new_thread_fn; 171 172 g_ctx_sz = ctx_sz; 173 174 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 175 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 176 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 177 sizeof(struct spdk_msg), 178 0, /* No cache. We do our own. */ 179 SPDK_ENV_SOCKET_ID_ANY); 180 181 if (!g_spdk_msg_mempool) { 182 return -1; 183 } 184 185 return 0; 186 } 187 188 void 189 spdk_thread_lib_fini(void) 190 { 191 struct io_device *dev; 192 193 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 194 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 195 } 196 197 if (g_spdk_msg_mempool) { 198 spdk_mempool_free(g_spdk_msg_mempool); 199 } 200 } 201 202 struct spdk_thread * 203 spdk_thread_create(const char *name) 204 { 205 struct spdk_thread *thread; 206 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 207 int rc, i; 208 209 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 210 if (!thread) { 211 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 212 return NULL; 213 } 214 215 TAILQ_INIT(&thread->io_channels); 216 TAILQ_INIT(&thread->active_pollers); 217 TAILQ_INIT(&thread->timer_pollers); 218 SLIST_INIT(&thread->msg_cache); 219 thread->msg_cache_count = 0; 220 221 thread->tsc_last = spdk_get_ticks(); 222 223 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 224 if (!thread->messages) { 225 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 226 free(thread); 227 return NULL; 228 } 229 230 /* Fill the local message pool cache. */ 231 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 232 if (rc == 0) { 233 /* If we can't populate the cache it's ok. The cache will get filled 234 * up organically as messages are passed to the thread. */ 235 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 236 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 237 thread->msg_cache_count++; 238 } 239 } 240 241 if (name) { 242 _set_thread_name(name); 243 thread->name = strdup(name); 244 } else { 245 thread->name = spdk_sprintf_alloc("%p", thread); 246 } 247 248 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name); 249 250 pthread_mutex_lock(&g_devlist_mutex); 251 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 252 g_thread_count++; 253 pthread_mutex_unlock(&g_devlist_mutex); 254 255 if (g_new_thread_fn) { 256 g_new_thread_fn(thread); 257 } 258 259 return thread; 260 } 261 262 void 263 spdk_set_thread(struct spdk_thread *thread) 264 { 265 tls_thread = thread; 266 } 267 268 void 269 spdk_thread_exit(struct spdk_thread *thread) 270 { 271 struct spdk_io_channel *ch; 272 struct spdk_msg *msg; 273 struct spdk_poller *poller, *ptmp; 274 275 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name); 276 277 if (tls_thread == thread) { 278 tls_thread = NULL; 279 } 280 281 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 282 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 283 thread->name, ch->dev->name); 284 } 285 286 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 287 if (poller->state == SPDK_POLLER_STATE_WAITING) { 288 SPDK_WARNLOG("poller %p still registered at thread exit\n", 289 poller); 290 } 291 292 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 293 free(poller); 294 } 295 296 297 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) { 298 if (poller->state == SPDK_POLLER_STATE_WAITING) { 299 SPDK_WARNLOG("poller %p still registered at thread exit\n", 300 poller); 301 } 302 303 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 304 free(poller); 305 } 306 307 pthread_mutex_lock(&g_devlist_mutex); 308 assert(g_thread_count > 0); 309 g_thread_count--; 310 TAILQ_REMOVE(&g_threads, thread, tailq); 311 pthread_mutex_unlock(&g_devlist_mutex); 312 313 free(thread->name); 314 315 msg = SLIST_FIRST(&thread->msg_cache); 316 while (msg != NULL) { 317 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 318 319 assert(thread->msg_cache_count > 0); 320 thread->msg_cache_count--; 321 spdk_mempool_put(g_spdk_msg_mempool, msg); 322 323 msg = SLIST_FIRST(&thread->msg_cache); 324 } 325 326 assert(thread->msg_cache_count == 0); 327 328 if (thread->messages) { 329 spdk_ring_free(thread->messages); 330 } 331 332 free(thread); 333 } 334 335 void * 336 spdk_thread_get_ctx(struct spdk_thread *thread) 337 { 338 if (g_ctx_sz > 0) { 339 return thread->ctx; 340 } 341 342 return NULL; 343 } 344 345 struct spdk_thread * 346 spdk_thread_get_from_ctx(void *ctx) 347 { 348 if (ctx == NULL) { 349 assert(false); 350 return NULL; 351 } 352 353 assert(g_ctx_sz > 0); 354 355 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 356 } 357 358 static inline uint32_t 359 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 360 { 361 unsigned count, i; 362 void *messages[SPDK_MSG_BATCH_SIZE]; 363 364 #ifdef DEBUG 365 /* 366 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 367 * so we will never actually read uninitialized data from events, but just to be sure 368 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 369 */ 370 memset(messages, 0, sizeof(messages)); 371 #endif 372 373 if (max_msgs > 0) { 374 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 375 } else { 376 max_msgs = SPDK_MSG_BATCH_SIZE; 377 } 378 379 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 380 if (count == 0) { 381 return 0; 382 } 383 384 for (i = 0; i < count; i++) { 385 struct spdk_msg *msg = messages[i]; 386 387 assert(msg != NULL); 388 msg->fn(msg->arg); 389 390 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 391 /* Insert the messages at the head. We want to re-use the hot 392 * ones. */ 393 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 394 thread->msg_cache_count++; 395 } else { 396 spdk_mempool_put(g_spdk_msg_mempool, msg); 397 } 398 } 399 400 return count; 401 } 402 403 static void 404 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 405 { 406 struct spdk_poller *iter; 407 408 poller->next_run_tick = now + poller->period_ticks; 409 410 /* 411 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled 412 * run time. 413 */ 414 TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) { 415 if (iter->next_run_tick <= poller->next_run_tick) { 416 TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq); 417 return; 418 } 419 } 420 421 /* No earlier pollers were found, so this poller must be the new head */ 422 TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq); 423 } 424 425 int 426 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 427 { 428 uint32_t msg_count; 429 struct spdk_thread *orig_thread; 430 struct spdk_poller *poller, *tmp; 431 int rc = 0; 432 433 orig_thread = _get_thread(); 434 tls_thread = thread; 435 436 if (now == 0) { 437 now = spdk_get_ticks(); 438 } 439 440 msg_count = _spdk_msg_queue_run_batch(thread, max_msgs); 441 if (msg_count) { 442 rc = 1; 443 } 444 445 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 446 active_pollers_head, tailq, tmp) { 447 int poller_rc; 448 449 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 450 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 451 free(poller); 452 continue; 453 } 454 455 poller->state = SPDK_POLLER_STATE_RUNNING; 456 poller_rc = poller->fn(poller->arg); 457 458 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 459 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 460 free(poller); 461 continue; 462 } 463 464 poller->state = SPDK_POLLER_STATE_WAITING; 465 466 #ifdef DEBUG 467 if (poller_rc == -1) { 468 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller); 469 } 470 #endif 471 472 if (poller_rc > rc) { 473 rc = poller_rc; 474 } 475 476 } 477 478 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) { 479 int timer_rc = 0; 480 481 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 482 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 483 free(poller); 484 continue; 485 } 486 487 if (now < poller->next_run_tick) { 488 break; 489 } 490 491 poller->state = SPDK_POLLER_STATE_RUNNING; 492 timer_rc = poller->fn(poller->arg); 493 494 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 495 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 496 free(poller); 497 continue; 498 } 499 500 poller->state = SPDK_POLLER_STATE_WAITING; 501 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 502 _spdk_poller_insert_timer(thread, poller, now); 503 504 #ifdef DEBUG 505 if (timer_rc == -1) { 506 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller); 507 } 508 #endif 509 510 if (timer_rc > rc) { 511 rc = timer_rc; 512 513 } 514 } 515 516 if (rc == 0) { 517 /* Poller status idle */ 518 thread->stats.idle_tsc += now - thread->tsc_last; 519 } else if (rc > 0) { 520 /* Poller status busy */ 521 thread->stats.busy_tsc += now - thread->tsc_last; 522 } else { 523 /* Poller status unknown */ 524 thread->stats.unknown_tsc += now - thread->tsc_last; 525 } 526 thread->tsc_last = now; 527 528 tls_thread = orig_thread; 529 530 return rc; 531 } 532 533 uint64_t 534 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 535 { 536 struct spdk_poller *poller; 537 538 poller = TAILQ_FIRST(&thread->timer_pollers); 539 if (poller) { 540 return poller->next_run_tick; 541 } 542 543 return 0; 544 } 545 546 int 547 spdk_thread_has_active_pollers(struct spdk_thread *thread) 548 { 549 return !TAILQ_EMPTY(&thread->active_pollers); 550 } 551 552 uint32_t 553 spdk_thread_get_count(void) 554 { 555 /* 556 * Return cached value of the current thread count. We could acquire the 557 * lock and iterate through the TAILQ of threads to count them, but that 558 * count could still be invalidated after we release the lock. 559 */ 560 return g_thread_count; 561 } 562 563 struct spdk_thread * 564 spdk_get_thread(void) 565 { 566 struct spdk_thread *thread; 567 568 thread = _get_thread(); 569 if (!thread) { 570 SPDK_ERRLOG("No thread allocated\n"); 571 } 572 573 return thread; 574 } 575 576 const char * 577 spdk_thread_get_name(const struct spdk_thread *thread) 578 { 579 return thread->name; 580 } 581 582 int 583 spdk_thread_get_stats(struct spdk_thread_stats *stats) 584 { 585 struct spdk_thread *thread; 586 587 thread = _get_thread(); 588 if (!thread) { 589 SPDK_ERRLOG("No thread allocated\n"); 590 return -EINVAL; 591 } 592 593 if (stats == NULL) { 594 return -EINVAL; 595 } 596 597 *stats = thread->stats; 598 599 return 0; 600 } 601 602 void 603 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 604 { 605 struct spdk_thread *local_thread; 606 struct spdk_msg *msg; 607 int rc; 608 609 if (!thread) { 610 assert(false); 611 return; 612 } 613 614 local_thread = _get_thread(); 615 616 msg = NULL; 617 if (local_thread != NULL) { 618 if (local_thread->msg_cache_count > 0) { 619 msg = SLIST_FIRST(&local_thread->msg_cache); 620 assert(msg != NULL); 621 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 622 local_thread->msg_cache_count--; 623 } 624 } 625 626 if (msg == NULL) { 627 msg = spdk_mempool_get(g_spdk_msg_mempool); 628 if (!msg) { 629 assert(false); 630 return; 631 } 632 } 633 634 msg->fn = fn; 635 msg->arg = ctx; 636 637 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1); 638 if (rc != 1) { 639 assert(false); 640 spdk_mempool_put(g_spdk_msg_mempool, msg); 641 return; 642 } 643 } 644 645 struct spdk_poller * 646 spdk_poller_register(spdk_poller_fn fn, 647 void *arg, 648 uint64_t period_microseconds) 649 { 650 struct spdk_thread *thread; 651 struct spdk_poller *poller; 652 uint64_t quotient, remainder, ticks; 653 654 thread = spdk_get_thread(); 655 if (!thread) { 656 assert(false); 657 return NULL; 658 } 659 660 poller = calloc(1, sizeof(*poller)); 661 if (poller == NULL) { 662 SPDK_ERRLOG("Poller memory allocation failed\n"); 663 return NULL; 664 } 665 666 poller->state = SPDK_POLLER_STATE_WAITING; 667 poller->fn = fn; 668 poller->arg = arg; 669 670 if (period_microseconds) { 671 quotient = period_microseconds / SPDK_SEC_TO_USEC; 672 remainder = period_microseconds % SPDK_SEC_TO_USEC; 673 ticks = spdk_get_ticks_hz(); 674 675 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 676 } else { 677 poller->period_ticks = 0; 678 } 679 680 if (poller->period_ticks) { 681 _spdk_poller_insert_timer(thread, poller, spdk_get_ticks()); 682 } else { 683 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 684 } 685 686 return poller; 687 } 688 689 void 690 spdk_poller_unregister(struct spdk_poller **ppoller) 691 { 692 struct spdk_thread *thread; 693 struct spdk_poller *poller; 694 695 poller = *ppoller; 696 if (poller == NULL) { 697 return; 698 } 699 700 *ppoller = NULL; 701 702 thread = spdk_get_thread(); 703 if (!thread) { 704 assert(false); 705 return; 706 } 707 708 /* Simply set the state to unregistered. The poller will get cleaned up 709 * in a subsequent call to spdk_thread_poll(). 710 */ 711 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 712 } 713 714 struct call_thread { 715 struct spdk_thread *cur_thread; 716 spdk_msg_fn fn; 717 void *ctx; 718 719 struct spdk_thread *orig_thread; 720 spdk_msg_fn cpl; 721 }; 722 723 static void 724 spdk_on_thread(void *ctx) 725 { 726 struct call_thread *ct = ctx; 727 728 ct->fn(ct->ctx); 729 730 pthread_mutex_lock(&g_devlist_mutex); 731 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 732 pthread_mutex_unlock(&g_devlist_mutex); 733 734 if (!ct->cur_thread) { 735 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n"); 736 737 spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 738 free(ctx); 739 } else { 740 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n", 741 ct->cur_thread->name); 742 743 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); 744 } 745 } 746 747 void 748 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 749 { 750 struct call_thread *ct; 751 struct spdk_thread *thread; 752 753 ct = calloc(1, sizeof(*ct)); 754 if (!ct) { 755 SPDK_ERRLOG("Unable to perform thread iteration\n"); 756 cpl(ctx); 757 return; 758 } 759 760 ct->fn = fn; 761 ct->ctx = ctx; 762 ct->cpl = cpl; 763 764 pthread_mutex_lock(&g_devlist_mutex); 765 thread = _get_thread(); 766 if (!thread) { 767 SPDK_ERRLOG("No thread allocated\n"); 768 free(ct); 769 cpl(ctx); 770 return; 771 } 772 ct->orig_thread = thread; 773 ct->cur_thread = TAILQ_FIRST(&g_threads); 774 pthread_mutex_unlock(&g_devlist_mutex); 775 776 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n", 777 ct->orig_thread->name); 778 779 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); 780 } 781 782 void 783 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 784 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 785 const char *name) 786 { 787 struct io_device *dev, *tmp; 788 struct spdk_thread *thread; 789 790 assert(io_device != NULL); 791 assert(create_cb != NULL); 792 assert(destroy_cb != NULL); 793 794 thread = spdk_get_thread(); 795 if (!thread) { 796 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 797 assert(false); 798 return; 799 } 800 801 dev = calloc(1, sizeof(struct io_device)); 802 if (dev == NULL) { 803 SPDK_ERRLOG("could not allocate io_device\n"); 804 return; 805 } 806 807 dev->io_device = io_device; 808 if (name) { 809 dev->name = strdup(name); 810 } else { 811 dev->name = spdk_sprintf_alloc("%p", dev); 812 } 813 dev->create_cb = create_cb; 814 dev->destroy_cb = destroy_cb; 815 dev->unregister_cb = NULL; 816 dev->ctx_size = ctx_size; 817 dev->for_each_count = 0; 818 dev->unregistered = false; 819 dev->refcnt = 0; 820 821 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n", 822 dev->name, dev->io_device, thread->name); 823 824 pthread_mutex_lock(&g_devlist_mutex); 825 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 826 if (tmp->io_device == io_device) { 827 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 828 io_device, tmp->name, dev->name); 829 free(dev->name); 830 free(dev); 831 pthread_mutex_unlock(&g_devlist_mutex); 832 return; 833 } 834 } 835 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 836 pthread_mutex_unlock(&g_devlist_mutex); 837 } 838 839 static void 840 _finish_unregister(void *arg) 841 { 842 struct io_device *dev = arg; 843 844 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n", 845 dev->name, dev->io_device, dev->unregister_thread->name); 846 847 dev->unregister_cb(dev->io_device); 848 free(dev->name); 849 free(dev); 850 } 851 852 static void 853 _spdk_io_device_free(struct io_device *dev) 854 { 855 if (dev->unregister_cb == NULL) { 856 free(dev->name); 857 free(dev); 858 } else { 859 assert(dev->unregister_thread != NULL); 860 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n", 861 dev->name, dev->io_device, dev->unregister_thread->name); 862 spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 863 } 864 } 865 866 void 867 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 868 { 869 struct io_device *dev; 870 uint32_t refcnt; 871 struct spdk_thread *thread; 872 873 thread = spdk_get_thread(); 874 if (!thread) { 875 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 876 assert(false); 877 return; 878 } 879 880 pthread_mutex_lock(&g_devlist_mutex); 881 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 882 if (dev->io_device == io_device) { 883 break; 884 } 885 } 886 887 if (!dev) { 888 SPDK_ERRLOG("io_device %p not found\n", io_device); 889 assert(false); 890 pthread_mutex_unlock(&g_devlist_mutex); 891 return; 892 } 893 894 if (dev->for_each_count > 0) { 895 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 896 dev->name, io_device, dev->for_each_count); 897 pthread_mutex_unlock(&g_devlist_mutex); 898 return; 899 } 900 901 dev->unregister_cb = unregister_cb; 902 dev->unregistered = true; 903 TAILQ_REMOVE(&g_io_devices, dev, tailq); 904 refcnt = dev->refcnt; 905 dev->unregister_thread = thread; 906 pthread_mutex_unlock(&g_devlist_mutex); 907 908 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n", 909 dev->name, dev->io_device, thread->name); 910 911 if (refcnt > 0) { 912 /* defer deletion */ 913 return; 914 } 915 916 _spdk_io_device_free(dev); 917 } 918 919 struct spdk_io_channel * 920 spdk_get_io_channel(void *io_device) 921 { 922 struct spdk_io_channel *ch; 923 struct spdk_thread *thread; 924 struct io_device *dev; 925 int rc; 926 927 pthread_mutex_lock(&g_devlist_mutex); 928 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 929 if (dev->io_device == io_device) { 930 break; 931 } 932 } 933 if (dev == NULL) { 934 SPDK_ERRLOG("could not find io_device %p\n", io_device); 935 pthread_mutex_unlock(&g_devlist_mutex); 936 return NULL; 937 } 938 939 thread = _get_thread(); 940 if (!thread) { 941 SPDK_ERRLOG("No thread allocated\n"); 942 pthread_mutex_unlock(&g_devlist_mutex); 943 return NULL; 944 } 945 946 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 947 if (ch->dev == dev) { 948 ch->ref++; 949 950 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 951 ch, dev->name, dev->io_device, thread->name, ch->ref); 952 953 /* 954 * An I/O channel already exists for this device on this 955 * thread, so return it. 956 */ 957 pthread_mutex_unlock(&g_devlist_mutex); 958 return ch; 959 } 960 } 961 962 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 963 if (ch == NULL) { 964 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 965 pthread_mutex_unlock(&g_devlist_mutex); 966 return NULL; 967 } 968 969 ch->dev = dev; 970 ch->destroy_cb = dev->destroy_cb; 971 ch->thread = thread; 972 ch->ref = 1; 973 ch->destroy_ref = 0; 974 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 975 976 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 977 ch, dev->name, dev->io_device, thread->name, ch->ref); 978 979 dev->refcnt++; 980 981 pthread_mutex_unlock(&g_devlist_mutex); 982 983 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 984 if (rc != 0) { 985 pthread_mutex_lock(&g_devlist_mutex); 986 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 987 dev->refcnt--; 988 free(ch); 989 pthread_mutex_unlock(&g_devlist_mutex); 990 return NULL; 991 } 992 993 return ch; 994 } 995 996 static void 997 _spdk_put_io_channel(void *arg) 998 { 999 struct spdk_io_channel *ch = arg; 1000 bool do_remove_dev = true; 1001 struct spdk_thread *thread; 1002 1003 thread = spdk_get_thread(); 1004 if (!thread) { 1005 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 1006 assert(false); 1007 return; 1008 } 1009 1010 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1011 "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n", 1012 ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name); 1013 1014 assert(ch->thread == thread); 1015 1016 ch->destroy_ref--; 1017 1018 if (ch->ref > 0 || ch->destroy_ref > 0) { 1019 /* 1020 * Another reference to the associated io_device was requested 1021 * after this message was sent but before it had a chance to 1022 * execute. 1023 */ 1024 return; 1025 } 1026 1027 pthread_mutex_lock(&g_devlist_mutex); 1028 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1029 pthread_mutex_unlock(&g_devlist_mutex); 1030 1031 /* Don't hold the devlist mutex while the destroy_cb is called. */ 1032 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 1033 1034 pthread_mutex_lock(&g_devlist_mutex); 1035 ch->dev->refcnt--; 1036 1037 if (!ch->dev->unregistered) { 1038 do_remove_dev = false; 1039 } 1040 1041 if (ch->dev->refcnt > 0) { 1042 do_remove_dev = false; 1043 } 1044 1045 pthread_mutex_unlock(&g_devlist_mutex); 1046 1047 if (do_remove_dev) { 1048 _spdk_io_device_free(ch->dev); 1049 } 1050 free(ch); 1051 } 1052 1053 void 1054 spdk_put_io_channel(struct spdk_io_channel *ch) 1055 { 1056 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1057 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1058 ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref); 1059 1060 ch->ref--; 1061 1062 if (ch->ref == 0) { 1063 ch->destroy_ref++; 1064 spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch); 1065 } 1066 } 1067 1068 struct spdk_io_channel * 1069 spdk_io_channel_from_ctx(void *ctx) 1070 { 1071 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 1072 } 1073 1074 struct spdk_thread * 1075 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 1076 { 1077 return ch->thread; 1078 } 1079 1080 struct spdk_io_channel_iter { 1081 void *io_device; 1082 struct io_device *dev; 1083 spdk_channel_msg fn; 1084 int status; 1085 void *ctx; 1086 struct spdk_io_channel *ch; 1087 1088 struct spdk_thread *cur_thread; 1089 1090 struct spdk_thread *orig_thread; 1091 spdk_channel_for_each_cpl cpl; 1092 }; 1093 1094 void * 1095 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 1096 { 1097 return i->io_device; 1098 } 1099 1100 struct spdk_io_channel * 1101 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 1102 { 1103 return i->ch; 1104 } 1105 1106 void * 1107 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 1108 { 1109 return i->ctx; 1110 } 1111 1112 static void 1113 _call_completion(void *ctx) 1114 { 1115 struct spdk_io_channel_iter *i = ctx; 1116 1117 if (i->cpl != NULL) { 1118 i->cpl(i, i->status); 1119 } 1120 free(i); 1121 } 1122 1123 static void 1124 _call_channel(void *ctx) 1125 { 1126 struct spdk_io_channel_iter *i = ctx; 1127 struct spdk_io_channel *ch; 1128 1129 /* 1130 * It is possible that the channel was deleted before this 1131 * message had a chance to execute. If so, skip calling 1132 * the fn() on this thread. 1133 */ 1134 pthread_mutex_lock(&g_devlist_mutex); 1135 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 1136 if (ch->dev->io_device == i->io_device) { 1137 break; 1138 } 1139 } 1140 pthread_mutex_unlock(&g_devlist_mutex); 1141 1142 if (ch) { 1143 i->fn(i); 1144 } else { 1145 spdk_for_each_channel_continue(i, 0); 1146 } 1147 } 1148 1149 void 1150 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 1151 spdk_channel_for_each_cpl cpl) 1152 { 1153 struct spdk_thread *thread; 1154 struct spdk_io_channel *ch; 1155 struct spdk_io_channel_iter *i; 1156 1157 i = calloc(1, sizeof(*i)); 1158 if (!i) { 1159 SPDK_ERRLOG("Unable to allocate iterator\n"); 1160 return; 1161 } 1162 1163 i->io_device = io_device; 1164 i->fn = fn; 1165 i->ctx = ctx; 1166 i->cpl = cpl; 1167 1168 pthread_mutex_lock(&g_devlist_mutex); 1169 i->orig_thread = _get_thread(); 1170 1171 TAILQ_FOREACH(thread, &g_threads, tailq) { 1172 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1173 if (ch->dev->io_device == io_device) { 1174 ch->dev->for_each_count++; 1175 i->dev = ch->dev; 1176 i->cur_thread = thread; 1177 i->ch = ch; 1178 pthread_mutex_unlock(&g_devlist_mutex); 1179 spdk_thread_send_msg(thread, _call_channel, i); 1180 return; 1181 } 1182 } 1183 } 1184 1185 pthread_mutex_unlock(&g_devlist_mutex); 1186 1187 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1188 } 1189 1190 void 1191 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 1192 { 1193 struct spdk_thread *thread; 1194 struct spdk_io_channel *ch; 1195 1196 assert(i->cur_thread == spdk_get_thread()); 1197 1198 i->status = status; 1199 1200 pthread_mutex_lock(&g_devlist_mutex); 1201 if (status) { 1202 goto end; 1203 } 1204 thread = TAILQ_NEXT(i->cur_thread, tailq); 1205 while (thread) { 1206 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1207 if (ch->dev->io_device == i->io_device) { 1208 i->cur_thread = thread; 1209 i->ch = ch; 1210 pthread_mutex_unlock(&g_devlist_mutex); 1211 spdk_thread_send_msg(thread, _call_channel, i); 1212 return; 1213 } 1214 } 1215 thread = TAILQ_NEXT(thread, tailq); 1216 } 1217 1218 end: 1219 i->dev->for_each_count--; 1220 i->ch = NULL; 1221 pthread_mutex_unlock(&g_devlist_mutex); 1222 1223 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1224 } 1225 1226 1227 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD) 1228