/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"

#include "spdk_internal/log.h"
#include "spdk_internal/thread.h"

#ifdef __linux__
#include <sys/prctl.h>
#endif

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

#define SPDK_MSG_BATCH_SIZE 8

/* Protects g_io_devices, g_threads, and each thread's io_channels list. */
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

static spdk_new_thread_fn g_new_thread_fn = NULL;

struct io_device {
	void *io_device;
	char *name;
	spdk_io_channel_create_cb create_cb;
	spdk_io_channel_destroy_cb destroy_cb;
	spdk_io_device_unregister_cb unregister_cb;
	struct spdk_thread *unregister_thread;
	uint32_t ctx_size;
	uint32_t for_each_count;
	TAILQ_ENTRY(io_device) tailq;

	/* Number of io_channels that currently reference this io_device. */
	uint32_t refcnt;

	/* Set once spdk_io_device_unregister() has been called; the device is
	 * freed when refcnt reaches zero. */
	bool unregistered;
};

static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);

struct spdk_msg {
	spdk_msg_fn fn;
	void *arg;
};

static struct spdk_mempool *g_spdk_msg_mempool = NULL;

enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller) tailq;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state state;

	uint64_t period_ticks;
	uint64_t next_run_tick;
	spdk_poller_fn fn;
	void *arg;
};

struct spdk_thread {
	TAILQ_HEAD(, spdk_io_channel) io_channels;
	TAILQ_ENTRY(spdk_thread) tailq;
	char *name;

	/*
	 * Contains pollers actively running on this thread.  Pollers
	 *  are run round-robin. The thread takes one poller from the head
	 *  of the ring, executes it, then puts it back at the tail of
	 *  the ring.
	 */
	TAILQ_HEAD(, spdk_poller) active_pollers;

	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers;

	struct spdk_ring *messages;
};

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static __thread struct spdk_thread *tls_thread = NULL;

static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}

static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
#error missing platform support for thread name
#endif
}

int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	assert(g_new_thread_fn == NULL);
	g_new_thread_fn = new_thread_fn;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			     sizeof(struct spdk_msg),
			     SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
			     SPDK_ENV_SOCKET_ID_ANY);

	if (!g_spdk_msg_mempool) {
		return -1;
	}

	return 0;
}

void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
	}
}

struct spdk_thread *
spdk_thread_create(const char *name)
{
	struct spdk_thread *thread;

	thread = calloc(1, sizeof(*thread));
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	TAILQ_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	TAILQ_INIT(&thread->timer_pollers);

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	if (name) {
		_set_thread_name(name);
		thread->name = strdup(name);
	} else {
		thread->name = spdk_sprintf_alloc("%p", thread);
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	if (g_new_thread_fn) {
		g_new_thread_fn(thread);
	}

	return thread;
}

void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}

void
spdk_thread_exit(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;

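	/*
	 * Note: this only tears down the thread object itself.  Channels still
	 * open on this thread are reported below but not released, and any
	 * pollers still registered on the thread are not freed here.
	 */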
	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	free(thread->name);

	if (thread->messages) {
		spdk_ring_free(thread->messages);
	}

	free(thread);
}

static inline uint32_t
_spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);
	}

	spdk_mempool_put_bulk(g_spdk_msg_mempool, messages, count);

	return count;
}

static void
_spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled
	 * run time.
	 */
	TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
}

int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs)
{
	uint32_t msg_count;
	struct spdk_thread *orig_thread;
	struct spdk_poller *poller;
	int rc = 0;

	orig_thread = _get_thread();
	tls_thread = thread;

	msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	poller = TAILQ_FIRST(&thread->active_pollers);
	if (poller) {
		int poller_rc;

		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			free(poller);
		} else {
			poller->state = SPDK_POLLER_STATE_WAITING;
			TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		}

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
		}
#endif

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	poller = TAILQ_FIRST(&thread->timer_pollers);
	if (poller) {
		uint64_t now = spdk_get_ticks();

		if (now >= poller->next_run_tick) {
			int timer_rc = 0;

			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_RUNNING;
			timer_rc = poller->fn(poller->arg);
			if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
				free(poller);
			} else {
				poller->state = SPDK_POLLER_STATE_WAITING;
				_spdk_poller_insert_timer(thread, poller, now);
			}

#ifdef DEBUG
			if (timer_rc == -1) {
				SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller);
			}
#endif

			if (timer_rc > rc) {
				rc = timer_rc;
			}
		}
	}

	tls_thread = orig_thread;

	return rc;
}

uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = TAILQ_FIRST(&thread->timer_pollers);
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}

int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}

uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count.  We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}

struct spdk_thread *
spdk_get_thread(void)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
	}

	return thread;
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

void
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_msg *msg;
	int rc;

	if (!thread) {
		assert(false);
		return;
	}

	msg = spdk_mempool_get(g_spdk_msg_mempool);
	if (!msg) {
		assert(false);
		return;
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1);
	if (rc != 1) {
		assert(false);
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return;
	}
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;
	uint64_t quotient, remainder, ticks;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;

	if (period_microseconds) {
		quotient = period_microseconds / SPDK_SEC_TO_USEC;
		remainder = period_microseconds % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		poller->period_ticks = 0;
	}

	if (poller->period_ticks) {
		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}

	return poller;
}

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->state == SPDK_POLLER_STATE_RUNNING) {
		/*
		 * We are being called from the poller_fn, so set the state to unregistered
		 * and let the thread poll loop free the poller.
		 */
		poller->state = SPDK_POLLER_STATE_UNREGISTERED;
	} else {
		/* Poller is not running currently, so just free it. */
		if (poller->period_ticks) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		} else {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		}

		free(poller);
	}
}

struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};

static void
spdk_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");

		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
	}
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	thread = _get_thread();
	if (!thread) {
		/* Don't leave the devlist mutex held on the error path. */
		pthread_mutex_unlock(&g_devlist_mutex);
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
}

void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		dev->name = strdup(name);
	} else {
		dev->name = spdk_sprintf_alloc("%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}

static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev->name);
	free(dev);
}

static void
_spdk_io_device_free(struct io_device *dev)
{
	if (dev->unregister_cb == NULL) {
		free(dev->name);
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
	}
}

void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	_spdk_io_device_free(dev);
}

struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 *  thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}

static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 *  after this message was sent but before it had a chance to
		 *  execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 *  message had a chance to execute.  If so, skip calling
	 *  the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}

SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)