1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/queue.h" 38 #include "spdk/string.h" 39 #include "spdk/thread.h" 40 #include "spdk/util.h" 41 42 #include "spdk_internal/log.h" 43 #include "spdk_internal/thread.h" 44 45 #ifdef __linux__ 46 #include <sys/prctl.h> 47 #endif 48 49 #ifdef __FreeBSD__ 50 #include <pthread_np.h> 51 #endif 52 53 #define SPDK_MSG_BATCH_SIZE 8 54 55 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 56 57 static spdk_new_thread_fn g_new_thread_fn = NULL; 58 59 struct io_device { 60 void *io_device; 61 char *name; 62 spdk_io_channel_create_cb create_cb; 63 spdk_io_channel_destroy_cb destroy_cb; 64 spdk_io_device_unregister_cb unregister_cb; 65 struct spdk_thread *unregister_thread; 66 uint32_t ctx_size; 67 uint32_t for_each_count; 68 TAILQ_ENTRY(io_device) tailq; 69 70 uint32_t refcnt; 71 72 bool unregistered; 73 }; 74 75 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 76 77 struct spdk_msg { 78 spdk_msg_fn fn; 79 void *arg; 80 81 SLIST_ENTRY(spdk_msg) link; 82 }; 83 84 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 85 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 86 87 enum spdk_poller_state { 88 /* The poller is registered with a thread but not currently executing its fn. */ 89 SPDK_POLLER_STATE_WAITING, 90 91 /* The poller is currently running its fn. */ 92 SPDK_POLLER_STATE_RUNNING, 93 94 /* The poller was unregistered during the execution of its fn. */ 95 SPDK_POLLER_STATE_UNREGISTERED, 96 }; 97 98 struct spdk_poller { 99 TAILQ_ENTRY(spdk_poller) tailq; 100 101 /* Current state of the poller; should only be accessed from the poller's thread. */ 102 enum spdk_poller_state state; 103 104 uint64_t period_ticks; 105 uint64_t next_run_tick; 106 spdk_poller_fn fn; 107 void *arg; 108 }; 109 110 struct spdk_thread { 111 TAILQ_HEAD(, spdk_io_channel) io_channels; 112 TAILQ_ENTRY(spdk_thread) tailq; 113 char *name; 114 115 /* 116 * Contains pollers actively running on this thread. Pollers 117 * are run round-robin. The thread takes one poller from the head 118 * of the ring, executes it, then puts it back at the tail of 119 * the ring. 120 */ 121 TAILQ_HEAD(, spdk_poller) active_pollers; 122 123 /** 124 * Contains pollers running on this thread with a periodic timer. 125 */ 126 TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; 127 128 struct spdk_ring *messages; 129 130 SLIST_HEAD(, spdk_msg) msg_cache; 131 size_t msg_cache_count; 132 }; 133 134 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 135 static uint32_t g_thread_count = 0; 136 137 static __thread struct spdk_thread *tls_thread = NULL; 138 139 static inline struct spdk_thread * 140 _get_thread(void) 141 { 142 return tls_thread; 143 } 144 145 static void 146 _set_thread_name(const char *thread_name) 147 { 148 #if defined(__linux__) 149 prctl(PR_SET_NAME, thread_name, 0, 0, 0); 150 #elif defined(__FreeBSD__) 151 pthread_set_name_np(pthread_self(), thread_name); 152 #else 153 #error missing platform support for thread name 154 #endif 155 } 156 157 int 158 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn) 159 { 160 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 161 162 assert(g_new_thread_fn == NULL); 163 g_new_thread_fn = new_thread_fn; 164 165 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 166 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 167 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 168 sizeof(struct spdk_msg), 169 0, /* No cache. We do our own. */ 170 SPDK_ENV_SOCKET_ID_ANY); 171 172 if (!g_spdk_msg_mempool) { 173 return -1; 174 } 175 176 return 0; 177 } 178 179 void 180 spdk_thread_lib_fini(void) 181 { 182 struct io_device *dev; 183 184 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 185 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 186 } 187 188 if (g_spdk_msg_mempool) { 189 spdk_mempool_free(g_spdk_msg_mempool); 190 } 191 } 192 193 struct spdk_thread * 194 spdk_thread_create(const char *name) 195 { 196 struct spdk_thread *thread; 197 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 198 int rc, i; 199 200 thread = calloc(1, sizeof(*thread)); 201 if (!thread) { 202 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 203 return NULL; 204 } 205 206 TAILQ_INIT(&thread->io_channels); 207 TAILQ_INIT(&thread->active_pollers); 208 TAILQ_INIT(&thread->timer_pollers); 209 SLIST_INIT(&thread->msg_cache); 210 thread->msg_cache_count = 0; 211 212 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 213 if (!thread->messages) { 214 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 215 free(thread); 216 return NULL; 217 } 218 219 /* Fill the local message pool cache. */ 220 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 221 if (rc == 0) { 222 /* If we can't populate the cache it's ok. The cache will get filled 223 * up organically as messages are passed to the thread. */ 224 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 225 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 226 thread->msg_cache_count++; 227 } 228 } 229 230 if (name) { 231 _set_thread_name(name); 232 thread->name = strdup(name); 233 } else { 234 thread->name = spdk_sprintf_alloc("%p", thread); 235 } 236 237 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name); 238 239 pthread_mutex_lock(&g_devlist_mutex); 240 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 241 g_thread_count++; 242 pthread_mutex_unlock(&g_devlist_mutex); 243 244 if (g_new_thread_fn) { 245 g_new_thread_fn(thread); 246 } 247 248 return thread; 249 } 250 251 void 252 spdk_set_thread(struct spdk_thread *thread) 253 { 254 tls_thread = thread; 255 } 256 257 void 258 spdk_thread_exit(struct spdk_thread *thread) 259 { 260 struct spdk_io_channel *ch; 261 struct spdk_msg *msg; 262 263 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name); 264 265 if (tls_thread == thread) { 266 tls_thread = NULL; 267 } 268 269 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 270 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 271 thread->name, ch->dev->name); 272 } 273 274 pthread_mutex_lock(&g_devlist_mutex); 275 assert(g_thread_count > 0); 276 g_thread_count--; 277 TAILQ_REMOVE(&g_threads, thread, tailq); 278 pthread_mutex_unlock(&g_devlist_mutex); 279 280 free(thread->name); 281 282 msg = SLIST_FIRST(&thread->msg_cache); 283 while (msg != NULL) { 284 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 285 286 assert(thread->msg_cache_count > 0); 287 thread->msg_cache_count--; 288 spdk_mempool_put(g_spdk_msg_mempool, msg); 289 290 msg = SLIST_FIRST(&thread->msg_cache); 291 } 292 293 assert(thread->msg_cache_count == 0); 294 295 if (thread->messages) { 296 spdk_ring_free(thread->messages); 297 } 298 299 free(thread); 300 } 301 302 static inline uint32_t 303 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 304 { 305 unsigned count, i; 306 void *messages[SPDK_MSG_BATCH_SIZE]; 307 308 #ifdef DEBUG 309 /* 310 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 311 * so we will never actually read uninitialized data from events, but just to be sure 312 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 313 */ 314 memset(messages, 0, sizeof(messages)); 315 #endif 316 317 if (max_msgs > 0) { 318 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 319 } else { 320 max_msgs = SPDK_MSG_BATCH_SIZE; 321 } 322 323 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 324 if (count == 0) { 325 return 0; 326 } 327 328 for (i = 0; i < count; i++) { 329 struct spdk_msg *msg = messages[i]; 330 331 assert(msg != NULL); 332 msg->fn(msg->arg); 333 334 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 335 /* Insert the messages at the head. We want to re-use the hot 336 * ones. */ 337 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 338 thread->msg_cache_count++; 339 } else { 340 spdk_mempool_put(g_spdk_msg_mempool, msg); 341 } 342 } 343 344 return count; 345 } 346 347 static void 348 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 349 { 350 struct spdk_poller *iter; 351 352 poller->next_run_tick = now + poller->period_ticks; 353 354 /* 355 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled 356 * run time. 357 */ 358 TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) { 359 if (iter->next_run_tick <= poller->next_run_tick) { 360 TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq); 361 return; 362 } 363 } 364 365 /* No earlier pollers were found, so this poller must be the new head */ 366 TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq); 367 } 368 369 int 370 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs) 371 { 372 uint32_t msg_count; 373 struct spdk_thread *orig_thread; 374 struct spdk_poller *poller; 375 int rc = 0; 376 377 orig_thread = _get_thread(); 378 tls_thread = thread; 379 380 msg_count = _spdk_msg_queue_run_batch(thread, max_msgs); 381 if (msg_count) { 382 rc = 1; 383 } 384 385 poller = TAILQ_FIRST(&thread->active_pollers); 386 if (poller) { 387 int poller_rc; 388 389 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 390 poller->state = SPDK_POLLER_STATE_RUNNING; 391 poller_rc = poller->fn(poller->arg); 392 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 393 free(poller); 394 } else { 395 poller->state = SPDK_POLLER_STATE_WAITING; 396 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 397 } 398 399 #ifdef DEBUG 400 if (poller_rc == -1) { 401 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller); 402 } 403 #endif 404 405 if (poller_rc > rc) { 406 rc = poller_rc; 407 } 408 } 409 410 poller = TAILQ_FIRST(&thread->timer_pollers); 411 if (poller) { 412 uint64_t now = spdk_get_ticks(); 413 414 if (now >= poller->next_run_tick) { 415 int timer_rc = 0; 416 417 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 418 poller->state = SPDK_POLLER_STATE_RUNNING; 419 timer_rc = poller->fn(poller->arg); 420 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 421 free(poller); 422 } else { 423 poller->state = SPDK_POLLER_STATE_WAITING; 424 _spdk_poller_insert_timer(thread, poller, now); 425 } 426 427 #ifdef DEBUG 428 if (timer_rc == -1) { 429 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller); 430 } 431 #endif 432 433 if (timer_rc > rc) { 434 rc = timer_rc; 435 436 } 437 } 438 } 439 440 tls_thread = orig_thread; 441 442 return rc; 443 } 444 445 uint64_t 446 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 447 { 448 struct spdk_poller *poller; 449 450 poller = TAILQ_FIRST(&thread->timer_pollers); 451 if (poller) { 452 return poller->next_run_tick; 453 } 454 455 return 0; 456 } 457 458 int 459 spdk_thread_has_active_pollers(struct spdk_thread *thread) 460 { 461 return !TAILQ_EMPTY(&thread->active_pollers); 462 } 463 464 uint32_t 465 spdk_thread_get_count(void) 466 { 467 /* 468 * Return cached value of the current thread count. We could acquire the 469 * lock and iterate through the TAILQ of threads to count them, but that 470 * count could still be invalidated after we release the lock. 471 */ 472 return g_thread_count; 473 } 474 475 struct spdk_thread * 476 spdk_get_thread(void) 477 { 478 struct spdk_thread *thread; 479 480 thread = _get_thread(); 481 if (!thread) { 482 SPDK_ERRLOG("No thread allocated\n"); 483 } 484 485 return thread; 486 } 487 488 const char * 489 spdk_thread_get_name(const struct spdk_thread *thread) 490 { 491 return thread->name; 492 } 493 494 void 495 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 496 { 497 struct spdk_thread *local_thread; 498 struct spdk_msg *msg; 499 int rc; 500 501 if (!thread) { 502 assert(false); 503 return; 504 } 505 506 local_thread = _get_thread(); 507 508 msg = NULL; 509 if (local_thread != NULL) { 510 if (local_thread->msg_cache_count > 0) { 511 msg = SLIST_FIRST(&local_thread->msg_cache); 512 assert(msg != NULL); 513 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 514 local_thread->msg_cache_count--; 515 } 516 } 517 518 if (msg == NULL) { 519 msg = spdk_mempool_get(g_spdk_msg_mempool); 520 if (!msg) { 521 assert(false); 522 return; 523 } 524 } 525 526 msg->fn = fn; 527 msg->arg = ctx; 528 529 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1); 530 if (rc != 1) { 531 assert(false); 532 spdk_mempool_put(g_spdk_msg_mempool, msg); 533 return; 534 } 535 } 536 537 struct spdk_poller * 538 spdk_poller_register(spdk_poller_fn fn, 539 void *arg, 540 uint64_t period_microseconds) 541 { 542 struct spdk_thread *thread; 543 struct spdk_poller *poller; 544 uint64_t quotient, remainder, ticks; 545 546 thread = spdk_get_thread(); 547 if (!thread) { 548 assert(false); 549 return NULL; 550 } 551 552 poller = calloc(1, sizeof(*poller)); 553 if (poller == NULL) { 554 SPDK_ERRLOG("Poller memory allocation failed\n"); 555 return NULL; 556 } 557 558 poller->state = SPDK_POLLER_STATE_WAITING; 559 poller->fn = fn; 560 poller->arg = arg; 561 562 if (period_microseconds) { 563 quotient = period_microseconds / SPDK_SEC_TO_USEC; 564 remainder = period_microseconds % SPDK_SEC_TO_USEC; 565 ticks = spdk_get_ticks_hz(); 566 567 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 568 } else { 569 poller->period_ticks = 0; 570 } 571 572 if (poller->period_ticks) { 573 _spdk_poller_insert_timer(thread, poller, spdk_get_ticks()); 574 } else { 575 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 576 } 577 578 return poller; 579 } 580 581 void 582 spdk_poller_unregister(struct spdk_poller **ppoller) 583 { 584 struct spdk_thread *thread; 585 struct spdk_poller *poller; 586 587 poller = *ppoller; 588 if (poller == NULL) { 589 return; 590 } 591 592 *ppoller = NULL; 593 594 thread = spdk_get_thread(); 595 if (!thread) { 596 assert(false); 597 return; 598 } 599 600 if (poller->state == SPDK_POLLER_STATE_RUNNING) { 601 /* 602 * We are being called from the poller_fn, so set the state to unregistered 603 * and let the thread poll loop free the poller. 604 */ 605 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 606 } else { 607 /* Poller is not running currently, so just free it. */ 608 if (poller->period_ticks) { 609 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 610 } else { 611 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 612 } 613 614 free(poller); 615 } 616 } 617 618 struct call_thread { 619 struct spdk_thread *cur_thread; 620 spdk_msg_fn fn; 621 void *ctx; 622 623 struct spdk_thread *orig_thread; 624 spdk_msg_fn cpl; 625 }; 626 627 static void 628 spdk_on_thread(void *ctx) 629 { 630 struct call_thread *ct = ctx; 631 632 ct->fn(ct->ctx); 633 634 pthread_mutex_lock(&g_devlist_mutex); 635 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 636 pthread_mutex_unlock(&g_devlist_mutex); 637 638 if (!ct->cur_thread) { 639 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n"); 640 641 spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 642 free(ctx); 643 } else { 644 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n", 645 ct->cur_thread->name); 646 647 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); 648 } 649 } 650 651 void 652 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 653 { 654 struct call_thread *ct; 655 struct spdk_thread *thread; 656 657 ct = calloc(1, sizeof(*ct)); 658 if (!ct) { 659 SPDK_ERRLOG("Unable to perform thread iteration\n"); 660 cpl(ctx); 661 return; 662 } 663 664 ct->fn = fn; 665 ct->ctx = ctx; 666 ct->cpl = cpl; 667 668 pthread_mutex_lock(&g_devlist_mutex); 669 thread = _get_thread(); 670 if (!thread) { 671 SPDK_ERRLOG("No thread allocated\n"); 672 free(ct); 673 cpl(ctx); 674 return; 675 } 676 ct->orig_thread = thread; 677 ct->cur_thread = TAILQ_FIRST(&g_threads); 678 pthread_mutex_unlock(&g_devlist_mutex); 679 680 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n", 681 ct->orig_thread->name); 682 683 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); 684 } 685 686 void 687 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 688 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 689 const char *name) 690 { 691 struct io_device *dev, *tmp; 692 struct spdk_thread *thread; 693 694 assert(io_device != NULL); 695 assert(create_cb != NULL); 696 assert(destroy_cb != NULL); 697 698 thread = spdk_get_thread(); 699 if (!thread) { 700 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 701 assert(false); 702 return; 703 } 704 705 dev = calloc(1, sizeof(struct io_device)); 706 if (dev == NULL) { 707 SPDK_ERRLOG("could not allocate io_device\n"); 708 return; 709 } 710 711 dev->io_device = io_device; 712 if (name) { 713 dev->name = strdup(name); 714 } else { 715 dev->name = spdk_sprintf_alloc("%p", dev); 716 } 717 dev->create_cb = create_cb; 718 dev->destroy_cb = destroy_cb; 719 dev->unregister_cb = NULL; 720 dev->ctx_size = ctx_size; 721 dev->for_each_count = 0; 722 dev->unregistered = false; 723 dev->refcnt = 0; 724 725 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n", 726 dev->name, dev->io_device, thread->name); 727 728 pthread_mutex_lock(&g_devlist_mutex); 729 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 730 if (tmp->io_device == io_device) { 731 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 732 io_device, tmp->name, dev->name); 733 free(dev->name); 734 free(dev); 735 pthread_mutex_unlock(&g_devlist_mutex); 736 return; 737 } 738 } 739 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 740 pthread_mutex_unlock(&g_devlist_mutex); 741 } 742 743 static void 744 _finish_unregister(void *arg) 745 { 746 struct io_device *dev = arg; 747 748 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n", 749 dev->name, dev->io_device, dev->unregister_thread->name); 750 751 dev->unregister_cb(dev->io_device); 752 free(dev->name); 753 free(dev); 754 } 755 756 static void 757 _spdk_io_device_free(struct io_device *dev) 758 { 759 if (dev->unregister_cb == NULL) { 760 free(dev->name); 761 free(dev); 762 } else { 763 assert(dev->unregister_thread != NULL); 764 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n", 765 dev->name, dev->io_device, dev->unregister_thread->name); 766 spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 767 } 768 } 769 770 void 771 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 772 { 773 struct io_device *dev; 774 uint32_t refcnt; 775 struct spdk_thread *thread; 776 777 thread = spdk_get_thread(); 778 if (!thread) { 779 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 780 assert(false); 781 return; 782 } 783 784 pthread_mutex_lock(&g_devlist_mutex); 785 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 786 if (dev->io_device == io_device) { 787 break; 788 } 789 } 790 791 if (!dev) { 792 SPDK_ERRLOG("io_device %p not found\n", io_device); 793 assert(false); 794 pthread_mutex_unlock(&g_devlist_mutex); 795 return; 796 } 797 798 if (dev->for_each_count > 0) { 799 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 800 dev->name, io_device, dev->for_each_count); 801 pthread_mutex_unlock(&g_devlist_mutex); 802 return; 803 } 804 805 dev->unregister_cb = unregister_cb; 806 dev->unregistered = true; 807 TAILQ_REMOVE(&g_io_devices, dev, tailq); 808 refcnt = dev->refcnt; 809 dev->unregister_thread = thread; 810 pthread_mutex_unlock(&g_devlist_mutex); 811 812 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n", 813 dev->name, dev->io_device, thread->name); 814 815 if (refcnt > 0) { 816 /* defer deletion */ 817 return; 818 } 819 820 _spdk_io_device_free(dev); 821 } 822 823 struct spdk_io_channel * 824 spdk_get_io_channel(void *io_device) 825 { 826 struct spdk_io_channel *ch; 827 struct spdk_thread *thread; 828 struct io_device *dev; 829 int rc; 830 831 pthread_mutex_lock(&g_devlist_mutex); 832 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 833 if (dev->io_device == io_device) { 834 break; 835 } 836 } 837 if (dev == NULL) { 838 SPDK_ERRLOG("could not find io_device %p\n", io_device); 839 pthread_mutex_unlock(&g_devlist_mutex); 840 return NULL; 841 } 842 843 thread = _get_thread(); 844 if (!thread) { 845 SPDK_ERRLOG("No thread allocated\n"); 846 pthread_mutex_unlock(&g_devlist_mutex); 847 return NULL; 848 } 849 850 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 851 if (ch->dev == dev) { 852 ch->ref++; 853 854 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 855 ch, dev->name, dev->io_device, thread->name, ch->ref); 856 857 /* 858 * An I/O channel already exists for this device on this 859 * thread, so return it. 860 */ 861 pthread_mutex_unlock(&g_devlist_mutex); 862 return ch; 863 } 864 } 865 866 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 867 if (ch == NULL) { 868 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 869 pthread_mutex_unlock(&g_devlist_mutex); 870 return NULL; 871 } 872 873 ch->dev = dev; 874 ch->destroy_cb = dev->destroy_cb; 875 ch->thread = thread; 876 ch->ref = 1; 877 ch->destroy_ref = 0; 878 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 879 880 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 881 ch, dev->name, dev->io_device, thread->name, ch->ref); 882 883 dev->refcnt++; 884 885 pthread_mutex_unlock(&g_devlist_mutex); 886 887 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 888 if (rc != 0) { 889 pthread_mutex_lock(&g_devlist_mutex); 890 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 891 dev->refcnt--; 892 free(ch); 893 pthread_mutex_unlock(&g_devlist_mutex); 894 return NULL; 895 } 896 897 return ch; 898 } 899 900 static void 901 _spdk_put_io_channel(void *arg) 902 { 903 struct spdk_io_channel *ch = arg; 904 bool do_remove_dev = true; 905 struct spdk_thread *thread; 906 907 thread = spdk_get_thread(); 908 if (!thread) { 909 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__); 910 assert(false); 911 return; 912 } 913 914 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 915 "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n", 916 ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name); 917 918 assert(ch->thread == thread); 919 920 ch->destroy_ref--; 921 922 if (ch->ref > 0 || ch->destroy_ref > 0) { 923 /* 924 * Another reference to the associated io_device was requested 925 * after this message was sent but before it had a chance to 926 * execute. 927 */ 928 return; 929 } 930 931 pthread_mutex_lock(&g_devlist_mutex); 932 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 933 pthread_mutex_unlock(&g_devlist_mutex); 934 935 /* Don't hold the devlist mutex while the destroy_cb is called. */ 936 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 937 938 pthread_mutex_lock(&g_devlist_mutex); 939 ch->dev->refcnt--; 940 941 if (!ch->dev->unregistered) { 942 do_remove_dev = false; 943 } 944 945 if (ch->dev->refcnt > 0) { 946 do_remove_dev = false; 947 } 948 949 pthread_mutex_unlock(&g_devlist_mutex); 950 951 if (do_remove_dev) { 952 _spdk_io_device_free(ch->dev); 953 } 954 free(ch); 955 } 956 957 void 958 spdk_put_io_channel(struct spdk_io_channel *ch) 959 { 960 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 961 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 962 ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref); 963 964 ch->ref--; 965 966 if (ch->ref == 0) { 967 ch->destroy_ref++; 968 spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch); 969 } 970 } 971 972 struct spdk_io_channel * 973 spdk_io_channel_from_ctx(void *ctx) 974 { 975 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 976 } 977 978 struct spdk_thread * 979 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 980 { 981 return ch->thread; 982 } 983 984 struct spdk_io_channel_iter { 985 void *io_device; 986 struct io_device *dev; 987 spdk_channel_msg fn; 988 int status; 989 void *ctx; 990 struct spdk_io_channel *ch; 991 992 struct spdk_thread *cur_thread; 993 994 struct spdk_thread *orig_thread; 995 spdk_channel_for_each_cpl cpl; 996 }; 997 998 void * 999 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 1000 { 1001 return i->io_device; 1002 } 1003 1004 struct spdk_io_channel * 1005 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 1006 { 1007 return i->ch; 1008 } 1009 1010 void * 1011 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 1012 { 1013 return i->ctx; 1014 } 1015 1016 static void 1017 _call_completion(void *ctx) 1018 { 1019 struct spdk_io_channel_iter *i = ctx; 1020 1021 if (i->cpl != NULL) { 1022 i->cpl(i, i->status); 1023 } 1024 free(i); 1025 } 1026 1027 static void 1028 _call_channel(void *ctx) 1029 { 1030 struct spdk_io_channel_iter *i = ctx; 1031 struct spdk_io_channel *ch; 1032 1033 /* 1034 * It is possible that the channel was deleted before this 1035 * message had a chance to execute. If so, skip calling 1036 * the fn() on this thread. 1037 */ 1038 pthread_mutex_lock(&g_devlist_mutex); 1039 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 1040 if (ch->dev->io_device == i->io_device) { 1041 break; 1042 } 1043 } 1044 pthread_mutex_unlock(&g_devlist_mutex); 1045 1046 if (ch) { 1047 i->fn(i); 1048 } else { 1049 spdk_for_each_channel_continue(i, 0); 1050 } 1051 } 1052 1053 void 1054 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 1055 spdk_channel_for_each_cpl cpl) 1056 { 1057 struct spdk_thread *thread; 1058 struct spdk_io_channel *ch; 1059 struct spdk_io_channel_iter *i; 1060 1061 i = calloc(1, sizeof(*i)); 1062 if (!i) { 1063 SPDK_ERRLOG("Unable to allocate iterator\n"); 1064 return; 1065 } 1066 1067 i->io_device = io_device; 1068 i->fn = fn; 1069 i->ctx = ctx; 1070 i->cpl = cpl; 1071 1072 pthread_mutex_lock(&g_devlist_mutex); 1073 i->orig_thread = _get_thread(); 1074 1075 TAILQ_FOREACH(thread, &g_threads, tailq) { 1076 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1077 if (ch->dev->io_device == io_device) { 1078 ch->dev->for_each_count++; 1079 i->dev = ch->dev; 1080 i->cur_thread = thread; 1081 i->ch = ch; 1082 pthread_mutex_unlock(&g_devlist_mutex); 1083 spdk_thread_send_msg(thread, _call_channel, i); 1084 return; 1085 } 1086 } 1087 } 1088 1089 pthread_mutex_unlock(&g_devlist_mutex); 1090 1091 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1092 } 1093 1094 void 1095 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 1096 { 1097 struct spdk_thread *thread; 1098 struct spdk_io_channel *ch; 1099 1100 assert(i->cur_thread == spdk_get_thread()); 1101 1102 i->status = status; 1103 1104 pthread_mutex_lock(&g_devlist_mutex); 1105 if (status) { 1106 goto end; 1107 } 1108 thread = TAILQ_NEXT(i->cur_thread, tailq); 1109 while (thread) { 1110 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1111 if (ch->dev->io_device == i->io_device) { 1112 i->cur_thread = thread; 1113 i->ch = ch; 1114 pthread_mutex_unlock(&g_devlist_mutex); 1115 spdk_thread_send_msg(thread, _call_channel, i); 1116 return; 1117 } 1118 } 1119 thread = TAILQ_NEXT(thread, tailq); 1120 } 1121 1122 end: 1123 i->dev->for_each_count--; 1124 i->ch = NULL; 1125 pthread_mutex_unlock(&g_devlist_mutex); 1126 1127 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1128 } 1129 1130 1131 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD) 1132