/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"

#include "spdk_internal/log.h"

#ifdef __linux__
#include <sys/prctl.h>
#endif

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

#define SPDK_MSG_BATCH_SIZE 8

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

struct io_device {
	void *io_device;
	char *name;
	spdk_io_channel_create_cb create_cb;
	spdk_io_channel_destroy_cb destroy_cb;
	spdk_io_device_unregister_cb unregister_cb;
	struct spdk_thread *unregister_thread;
	uint32_t ctx_size;
	uint32_t for_each_count;
	TAILQ_ENTRY(io_device) tailq;

	uint32_t refcnt;

	bool unregistered;
};

static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);

struct spdk_msg {
	spdk_thread_fn fn;
	void *arg;
};

static struct spdk_mempool *g_spdk_msg_mempool = NULL;
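/*
 * Cross-thread messages are small (fn, arg) pairs allocated from the global
 * g_spdk_msg_mempool and enqueued on the target thread's multi-producer,
 * single-consumer ring.  The owning thread drains them in batches of up to
 * SPDK_MSG_BATCH_SIZE messages from spdk_thread_poll().
 */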
enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller) tailq;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state state;

	uint64_t period_ticks;
	uint64_t next_run_tick;
	spdk_poller_fn fn;
	void *arg;
};

struct spdk_thread {
	pthread_t thread_id;
	spdk_thread_pass_msg msg_fn;
	spdk_start_poller start_poller_fn;
	spdk_stop_poller stop_poller_fn;
	void *thread_ctx;
	TAILQ_HEAD(, spdk_io_channel) io_channels;
	TAILQ_ENTRY(spdk_thread) tailq;
	char *name;

	/*
	 * Contains pollers actively running on this thread.  Pollers
	 * are run round-robin.  The thread takes one poller from the head
	 * of the ring, executes it, then puts it back at the tail of
	 * the ring.
	 */
	TAILQ_HEAD(, spdk_poller) active_pollers;

	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers;

	struct spdk_ring *messages;
};

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static struct spdk_thread *
_get_thread(void)
{
	pthread_t thread_id;
	struct spdk_thread *thread;

	thread_id = pthread_self();

	thread = NULL;
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		if (thread->thread_id == thread_id) {
			return thread;
		}
	}

	return NULL;
}

static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
#error missing platform support for thread name
#endif
}

int
spdk_thread_lib_init(void)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			     sizeof(struct spdk_msg),
			     SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
			     SPDK_ENV_SOCKET_ID_ANY);

	if (!g_spdk_msg_mempool) {
		return -1;
	}

	return 0;
}

void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
	}
}
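/*
 * Register the calling pthread as an SPDK thread.  msg_fn, start_poller_fn and
 * stop_poller_fn let an environment plug in its own message passing and poller
 * scheduling; start_poller_fn and stop_poller_fn must either both be provided
 * or both be NULL, in which case the built-in message ring and poller lists
 * are used.  The thread should later be torn down with spdk_free_thread()
 * from the same pthread.
 */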
struct spdk_thread *
spdk_allocate_thread(spdk_thread_pass_msg msg_fn,
		     spdk_start_poller start_poller_fn,
		     spdk_stop_poller stop_poller_fn,
		     void *thread_ctx, const char *name)
{
	struct spdk_thread *thread;

	pthread_mutex_lock(&g_devlist_mutex);

	thread = _get_thread();
	if (thread) {
		SPDK_ERRLOG("Double allocated SPDK thread\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if ((start_poller_fn != NULL && stop_poller_fn == NULL) ||
	    (start_poller_fn == NULL && stop_poller_fn != NULL)) {
		SPDK_ERRLOG("start_poller_fn and stop_poller_fn must either both be NULL or both be non-NULL\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = calloc(1, sizeof(*thread));
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread->thread_id = pthread_self();
	thread->msg_fn = msg_fn;
	thread->start_poller_fn = start_poller_fn;
	thread->stop_poller_fn = stop_poller_fn;
	thread->thread_ctx = thread_ctx;
	TAILQ_INIT(&thread->io_channels);
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);

	TAILQ_INIT(&thread->active_pollers);
	TAILQ_INIT(&thread->timer_pollers);

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		TAILQ_REMOVE(&g_threads, thread, tailq);
		free(thread);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	g_thread_count++;
	if (name) {
		_set_thread_name(name);
		thread->name = strdup(name);
	} else {
		thread->name = spdk_sprintf_alloc("%p", thread);
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name);

	pthread_mutex_unlock(&g_devlist_mutex);

	return thread;
}

void
spdk_free_thread(void)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	pthread_mutex_lock(&g_devlist_mutex);

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name);

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	free(thread->name);

	if (thread->messages) {
		spdk_ring_free(thread->messages);
	}

	free(thread);

	pthread_mutex_unlock(&g_devlist_mutex);
}

static inline uint32_t
_spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);
	}

	spdk_mempool_put_bulk(g_spdk_msg_mempool, messages, count);

	return count;
}

static void
_spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled
	 * run time.
	 */
	TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
}
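/*
 * Perform one iteration of work on behalf of the given thread: drain up to
 * max_msgs queued messages (SPDK_MSG_BATCH_SIZE when max_msgs is 0), run the
 * poller at the head of active_pollers and rotate it to the tail, then run the
 * first timer poller if its next_run_tick has expired.  Returns a positive
 * value if any work was done and 0 if the thread was idle.
 */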
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs)
{
	uint32_t msg_count;
	struct spdk_poller *poller;
	int rc = 0;

	msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	poller = TAILQ_FIRST(&thread->active_pollers);
	if (poller) {
		int poller_rc;

		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			free(poller);
		} else {
			poller->state = SPDK_POLLER_STATE_WAITING;
			TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		}

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
		}
#endif

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	poller = TAILQ_FIRST(&thread->timer_pollers);
	if (poller) {
		uint64_t now = spdk_get_ticks();

		if (now >= poller->next_run_tick) {
			int timer_rc = 0;

			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_RUNNING;
			timer_rc = poller->fn(poller->arg);
			if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
				free(poller);
			} else {
				poller->state = SPDK_POLLER_STATE_WAITING;
				_spdk_poller_insert_timer(thread, poller, now);
			}

#ifdef DEBUG
			if (timer_rc == -1) {
				SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller);
			}
#endif

			if (timer_rc > rc) {
				rc = timer_rc;
			}
		}
	}

	return rc;
}

uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = TAILQ_FIRST(&thread->timer_pollers);
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}
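/*
 * A minimal polling loop might look like the sketch below; "app_thread" and
 * the idle policy are illustrative only and not part of this file:
 *
 *	for (;;) {
 *		spdk_thread_poll(app_thread, 0);
 *		if (spdk_thread_next_poller_expiration(app_thread) > spdk_get_ticks()) {
 *			// no timer is due yet; the caller may yield or sleep here
 *		}
 *	}
 */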
uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count.  We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}

struct spdk_thread *
spdk_get_thread(void)
{
	struct spdk_thread *thread;

	pthread_mutex_lock(&g_devlist_mutex);

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	return thread;
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

void
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_thread_fn fn, void *ctx)
{
	struct spdk_msg *msg;
	int rc;

	if (!thread) {
		assert(false);
		return;
	}

	if (thread->msg_fn) {
		thread->msg_fn(fn, ctx, thread->thread_ctx);
		return;
	}

	msg = spdk_mempool_get(g_spdk_msg_mempool);
	if (!msg) {
		assert(false);
		return;
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1);
	if (rc != 1) {
		assert(false);
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return;
	}
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;
	uint64_t quotient, remainder, ticks;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (thread->start_poller_fn) {
		return thread->start_poller_fn(thread->thread_ctx, fn, arg, period_microseconds);
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;

	if (period_microseconds) {
		quotient = period_microseconds / SPDK_SEC_TO_USEC;
		remainder = period_microseconds % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		poller->period_ticks = 0;
	}

	if (poller->period_ticks) {
		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}

	return poller;
}
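/*
 * Unregister the poller pointed to by *ppoller and set *ppoller to NULL.  When
 * called from inside the poller's own callback, the poller is only marked
 * UNREGISTERED here and is freed by spdk_thread_poll() once the callback
 * returns; otherwise it is removed from its list and freed immediately.
 */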
void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (thread->stop_poller_fn) {
		thread->stop_poller_fn(poller, thread->thread_ctx);
		return;
	}

	if (poller->state == SPDK_POLLER_STATE_RUNNING) {
		/*
		 * We are being called from the poller_fn, so set the state to unregistered
		 * and let the thread poll loop free the poller.
		 */
		poller->state = SPDK_POLLER_STATE_UNREGISTERED;
	} else {
		/* Poller is not running currently, so just free it. */
		if (poller->period_ticks) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		} else {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		}

		free(poller);
	}
}

struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_thread_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_thread_fn cpl;
};

static void
spdk_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");

		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
	}
}

void
spdk_for_each_thread(spdk_thread_fn fn, void *ctx, spdk_thread_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
}

void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		dev->name = strdup(name);
	} else {
		dev->name = spdk_sprintf_alloc("%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}

static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev->name);
	free(dev);
}

static void
_spdk_io_device_free(struct io_device *dev)
{
	if (dev->unregister_cb == NULL) {
		free(dev->name);
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
	}
}

void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	_spdk_io_device_free(dev);
}
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 * thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
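/*
 * The per-channel context handed to users lives immediately after the
 * spdk_io_channel header, which is why spdk_io_channel_get_ctx() and
 * spdk_io_channel_from_ctx() are simple pointer adjustments.  Releasing a
 * channel is deferred: spdk_put_io_channel() drops the reference and, once it
 * reaches zero, sends _spdk_put_io_channel() to the channel's owning thread,
 * where destroy_cb runs without the device list lock held.  The io_device is
 * freed only after it has been unregistered and its last channel is gone.
 *
 * Typical per-thread usage is a get/put pair (sketch; my_dev and my_ctx are
 * illustrative names, not part of this file):
 *
 *	struct spdk_io_channel *ch = spdk_get_io_channel(&my_dev);
 *	struct my_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *	...
 *	spdk_put_io_channel(ch);
 */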
static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}
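/*
 * spdk_for_each_channel() visits the io_device's channels one thread at a time
 * by sending _call_channel() to each owning thread in turn.  Every fn callback
 * must eventually call spdk_for_each_channel_continue() to advance (or, with a
 * non-zero status, abort) the iteration; cpl is then invoked back on the
 * thread that started the walk.
 */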
static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute.  If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}

SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)