1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"

#include "spdk_internal/log.h"
#include "spdk_internal/thread.h"

#ifdef __linux__
#include <sys/prctl.h>
#endif

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

/* Maximum number of messages drained from a thread's ring per poll iteration. */
#define SPDK_MSG_BATCH_SIZE 8

/* Protects the global g_io_devices and g_threads lists below. */
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Bookkeeping for one device registered via spdk_io_device_register(). */
struct io_device {
	void *io_device;			/* Opaque key supplied by the user at registration. */
	char *name;				/* Heap-allocated human-readable name. */
	spdk_io_channel_create_cb create_cb;	/* Initializes a new per-thread channel context. */
	spdk_io_channel_destroy_cb destroy_cb;	/* Tears down a channel context when its last ref drops. */
	spdk_io_device_unregister_cb unregister_cb;	/* Optional completion for unregistration. */
	struct spdk_thread *unregister_thread;	/* Thread that called spdk_io_device_unregister(). */
	uint32_t ctx_size;			/* Bytes of per-channel context allocated past spdk_io_channel. */
	uint32_t for_each_count;		/* Outstanding spdk_for_each_channel() iterations. */
	TAILQ_ENTRY(io_device) tailq;		/* Link in g_io_devices. */

	uint32_t refcnt;			/* Number of live channels referencing this device. */

	bool unregistered;			/* True once unregistration has started. */
};

static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);

/* One cross-thread message queued on a thread's message ring. */
struct spdk_msg {
	spdk_msg_fn fn;		/* Function to invoke on the target thread. */
	void *arg;		/* Argument passed to fn. */
};

/* Pool backing all spdk_msg allocations; created in spdk_thread_lib_init(). */
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller) tailq;

	/* Current state of the poller; should only be accessed from the poller's thread.
*/ 97 enum spdk_poller_state state; 98 99 uint64_t period_ticks; 100 uint64_t next_run_tick; 101 spdk_poller_fn fn; 102 void *arg; 103 }; 104 105 struct spdk_thread { 106 spdk_thread_pass_msg msg_fn; 107 spdk_start_poller start_poller_fn; 108 spdk_stop_poller stop_poller_fn; 109 void *thread_ctx; 110 TAILQ_HEAD(, spdk_io_channel) io_channels; 111 TAILQ_ENTRY(spdk_thread) tailq; 112 char *name; 113 114 /* 115 * Contains pollers actively running on this thread. Pollers 116 * are run round-robin. The thread takes one poller from the head 117 * of the ring, executes it, then puts it back at the tail of 118 * the ring. 119 */ 120 TAILQ_HEAD(, spdk_poller) active_pollers; 121 122 /** 123 * Contains pollers running on this thread with a periodic timer. 124 */ 125 TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; 126 127 struct spdk_ring *messages; 128 }; 129 130 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 131 static uint32_t g_thread_count = 0; 132 133 static __thread struct spdk_thread *tls_thread = NULL; 134 135 static inline struct spdk_thread * 136 _get_thread(void) 137 { 138 return tls_thread; 139 } 140 141 static void 142 _set_thread_name(const char *thread_name) 143 { 144 #if defined(__linux__) 145 prctl(PR_SET_NAME, thread_name, 0, 0, 0); 146 #elif defined(__FreeBSD__) 147 pthread_set_name_np(pthread_self(), thread_name); 148 #else 149 #error missing platform support for thread name 150 #endif 151 } 152 153 int 154 spdk_thread_lib_init(void) 155 { 156 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 157 158 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 159 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 160 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 161 sizeof(struct spdk_msg), 162 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 163 SPDK_ENV_SOCKET_ID_ANY); 164 165 if (!g_spdk_msg_mempool) { 166 return -1; 167 } 168 169 return 0; 170 } 171 172 void 173 spdk_thread_lib_fini(void) 
174 { 175 struct io_device *dev; 176 177 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 178 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 179 } 180 181 if (g_spdk_msg_mempool) { 182 spdk_mempool_free(g_spdk_msg_mempool); 183 } 184 } 185 186 struct spdk_thread * 187 spdk_allocate_thread(spdk_thread_pass_msg msg_fn, 188 spdk_start_poller start_poller_fn, 189 spdk_stop_poller stop_poller_fn, 190 void *thread_ctx, const char *name) 191 { 192 struct spdk_thread *thread; 193 194 thread = _get_thread(); 195 if (thread) { 196 SPDK_ERRLOG("Double allocated SPDK thread\n"); 197 return NULL; 198 } 199 200 if ((start_poller_fn != NULL && stop_poller_fn == NULL) || 201 (start_poller_fn == NULL && stop_poller_fn != NULL)) { 202 SPDK_ERRLOG("start_poller_fn and stop_poller_fn must either both be NULL or both be non-NULL\n"); 203 return NULL; 204 } 205 206 thread = calloc(1, sizeof(*thread)); 207 if (!thread) { 208 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 209 return NULL; 210 } 211 212 thread->msg_fn = msg_fn; 213 thread->start_poller_fn = start_poller_fn; 214 thread->stop_poller_fn = stop_poller_fn; 215 thread->thread_ctx = thread_ctx; 216 TAILQ_INIT(&thread->io_channels); 217 TAILQ_INIT(&thread->active_pollers); 218 TAILQ_INIT(&thread->timer_pollers); 219 220 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 221 if (!thread->messages) { 222 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 223 free(thread); 224 return NULL; 225 } 226 227 if (name) { 228 _set_thread_name(name); 229 thread->name = strdup(name); 230 } else { 231 thread->name = spdk_sprintf_alloc("%p", thread); 232 } 233 234 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name); 235 236 pthread_mutex_lock(&g_devlist_mutex); 237 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 238 g_thread_count++; 239 pthread_mutex_unlock(&g_devlist_mutex); 240 241 return thread; 242 } 243 244 void 245 spdk_set_thread(struct spdk_thread 
*thread)
{
	tls_thread = thread;
}

/*
 * Unbind and destroy the spdk_thread associated with the calling OS thread.
 * Warns about any io_channels still open on the thread before freeing it.
 */
void
spdk_free_thread(void)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name);

	/* Channels left open here indicate a caller bug; report but proceed. */
	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	free(thread->name);

	if (thread->messages) {
		spdk_ring_free(thread->messages);
	}

	free(thread);

	tls_thread = NULL;
}

/*
 * Dequeue up to max_msgs messages (capped at SPDK_MSG_BATCH_SIZE; 0 means
 * "use the full batch size") from the thread's ring, run each one, and
 * return the messages to the global pool.
 *
 * Returns the number of messages executed.
 */
static inline uint32_t
_spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);
	}

	/* Return the whole batch to the pool in one call. */
	spdk_mempool_put_bulk(g_spdk_msg_mempool, messages, count);

	return count;
}

/*
 * Schedule a timed poller's next run at now + period and insert it into the
 * thread's timer list, which is kept sorted by next_run_tick (earliest first).
 */
static void
_spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled
	 * run time.
	 */
	TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
}

/*
 * Run one iteration of the given thread's event loop: drain up to max_msgs
 * messages, run one active poller (round-robin), and run the most-overdue
 * timed poller if it has expired. The calling OS thread must not already be
 * bound to an spdk_thread; the binding is installed for the duration of the
 * call and cleared before returning.
 *
 * Returns the most positive of: 1 if any messages ran, the active poller's
 * return code, and the timed poller's return code (0 if nothing ran).
 */
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs)
{
	uint32_t msg_count;
	struct spdk_poller *poller;
	int rc = 0;

	assert(_get_thread() == NULL);

	tls_thread = thread;

	msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	poller = TAILQ_FIRST(&thread->active_pollers);
	if (poller) {
		int poller_rc;

		/* Remove before running so the poller can unregister itself
		 * from within its own fn (state becomes UNREGISTERED). */
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			free(poller);
		} else {
			poller->state = SPDK_POLLER_STATE_WAITING;
			/* Re-queue at the tail for round-robin fairness. */
			TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		}

#ifdef DEBUG
		/* NOTE(review): if the poller unregistered itself above, 'poller'
		 * was freed; only its pointer value is printed here — confirm
		 * this is intentional. */
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
		}
#endif

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	/* Timed pollers: the list head is always the earliest-scheduled one. */
	poller = TAILQ_FIRST(&thread->timer_pollers);
	if (poller) {
		uint64_t now = spdk_get_ticks();

		if (now >= poller->next_run_tick) {
			int timer_rc = 0;

			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_RUNNING;
			timer_rc = poller->fn(poller->arg);
			if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
				free(poller);
			} else {
				poller->state = SPDK_POLLER_STATE_WAITING;
				/* Re-sort into the timer list for its next expiration. */
				_spdk_poller_insert_timer(thread, poller, now);
			}

#ifdef DEBUG
			if (timer_rc == -1) {
				SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller);
			}
#endif

			if (timer_rc > rc) {
				rc = timer_rc;

			}
		}
	}

	tls_thread = NULL;

	return rc;
}

/*
 * Return the tick at which the thread's earliest timed poller will next run,
 * or 0 if the thread has no timed pollers.
 */
uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = TAILQ_FIRST(&thread->timer_pollers);
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}

/* Return nonzero if the thread has at least one non-timed (active) poller. */
int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}

uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count. We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}

/* Return the spdk_thread bound to the calling OS thread; logs if none. */
struct spdk_thread *
spdk_get_thread(void)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
	}

	return thread;
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

/*
 * Queue fn(ctx) for execution on the given thread. Uses the thread's msg_fn
 * override when set; otherwise allocates an spdk_msg from the global pool and
 * enqueues it on the thread's ring. Failures (pool exhausted, ring full) are
 * asserted in debug builds and otherwise dropped.
 */
void
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_msg *msg;
	int rc;

	if (!thread) {
		assert(false);
		return;
	}

	if (thread->msg_fn) {
		thread->msg_fn(fn, ctx, thread->thread_ctx);
		return;
	}

	msg = spdk_mempool_get(g_spdk_msg_mempool);
	if (!msg) {
		assert(false);
		return;
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1);
	if (rc != 1) {
		assert(false);
		/* Enqueue failed; return the message to the pool. */
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return;
	}
}

/*
 * Register a poller on the calling thread. period_microseconds == 0 makes an
 * active (run-every-iteration) poller; nonzero makes a timed poller. Returns
 * NULL on allocation failure or if called from a non-SPDK thread.
 */
struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;
	uint64_t quotient, remainder, ticks;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	/* Delegate entirely when the thread has a registration override. */
	if (thread->start_poller_fn) {
		return thread->start_poller_fn(thread->thread_ctx, fn, arg, period_microseconds);
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;

	if (period_microseconds) {
		/* Split the conversion to avoid overflowing ticks * microseconds
		 * for large periods. */
		quotient = period_microseconds / SPDK_SEC_TO_USEC;
		remainder = period_microseconds % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
poller->period_ticks = 0; 541 } 542 543 if (poller->period_ticks) { 544 _spdk_poller_insert_timer(thread, poller, spdk_get_ticks()); 545 } else { 546 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 547 } 548 549 return poller; 550 } 551 552 void 553 spdk_poller_unregister(struct spdk_poller **ppoller) 554 { 555 struct spdk_thread *thread; 556 struct spdk_poller *poller; 557 558 poller = *ppoller; 559 if (poller == NULL) { 560 return; 561 } 562 563 *ppoller = NULL; 564 565 thread = spdk_get_thread(); 566 if (!thread) { 567 assert(false); 568 return; 569 } 570 571 if (thread->stop_poller_fn) { 572 thread->stop_poller_fn(poller, thread->thread_ctx); 573 return; 574 } 575 576 if (poller->state == SPDK_POLLER_STATE_RUNNING) { 577 /* 578 * We are being called from the poller_fn, so set the state to unregistered 579 * and let the thread poll loop free the poller. 580 */ 581 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 582 } else { 583 /* Poller is not running currently, so just free it. 
*/ 584 if (poller->period_ticks) { 585 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 586 } else { 587 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 588 } 589 590 free(poller); 591 } 592 } 593 594 struct call_thread { 595 struct spdk_thread *cur_thread; 596 spdk_msg_fn fn; 597 void *ctx; 598 599 struct spdk_thread *orig_thread; 600 spdk_msg_fn cpl; 601 }; 602 603 static void 604 spdk_on_thread(void *ctx) 605 { 606 struct call_thread *ct = ctx; 607 608 ct->fn(ct->ctx); 609 610 pthread_mutex_lock(&g_devlist_mutex); 611 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 612 pthread_mutex_unlock(&g_devlist_mutex); 613 614 if (!ct->cur_thread) { 615 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n"); 616 617 spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 618 free(ctx); 619 } else { 620 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n", 621 ct->cur_thread->name); 622 623 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); 624 } 625 } 626 627 void 628 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 629 { 630 struct call_thread *ct; 631 struct spdk_thread *thread; 632 633 ct = calloc(1, sizeof(*ct)); 634 if (!ct) { 635 SPDK_ERRLOG("Unable to perform thread iteration\n"); 636 cpl(ctx); 637 return; 638 } 639 640 ct->fn = fn; 641 ct->ctx = ctx; 642 ct->cpl = cpl; 643 644 pthread_mutex_lock(&g_devlist_mutex); 645 thread = _get_thread(); 646 if (!thread) { 647 SPDK_ERRLOG("No thread allocated\n"); 648 free(ct); 649 cpl(ctx); 650 return; 651 } 652 ct->orig_thread = thread; 653 ct->cur_thread = TAILQ_FIRST(&g_threads); 654 pthread_mutex_unlock(&g_devlist_mutex); 655 656 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n", 657 ct->orig_thread->name); 658 659 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); 660 } 661 662 void 663 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 664 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 665 const 
char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		dev->name = strdup(name);
	} else {
		dev->name = spdk_sprintf_alloc("%p", dev);
	}
	/* NOTE(review): dev->name may be NULL if the allocation above failed;
	 * the logging below assumes it is non-NULL — confirm. */
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	/* Reject duplicate registrations of the same io_device key. */
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}

/*
 * Final step of deferred unregistration: invoke the user's unregister_cb on
 * the thread that originally called spdk_io_device_unregister(), then free
 * the device.
 */
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev->name);
	free(dev);
}

/*
 * Free an unregistered io_device. If an unregister_cb was supplied, the
 * callback (and the free) is forwarded to the unregistering thread instead
 * of running here.
 */
static void
_spdk_io_device_free(struct io_device *dev)
{
	if (dev->unregister_cb == NULL) {
		free(dev->name);
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
	}
}

/*
 * Begin unregistration of an io_device. The device is removed from the
 * global list immediately; the actual free (and unregister_cb) is deferred
 * until the last channel reference is released. Refuses to unregister while
 * spdk_for_each_channel() iterations are outstanding.
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	/* Snapshot refcnt under the lock; channels may still be draining. */
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	_spdk_io_device_free(dev);
}

/*
 * Get (or create) the calling thread's channel for the given io_device.
 * An existing channel has its refcount bumped; otherwise a new channel plus
 * ctx_size bytes of per-channel context is allocated and initialized via the
 * device's create_cb. Returns NULL on any failure.
 */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 * thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* The per-channel context lives immediately after the channel struct. */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	/* Run the user callback without holding the devlist mutex. */
	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		/* Roll back the insertion and the device reference. */
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}

/*
 * Deferred release of an io_channel, executed on the channel's owning
 * thread. Runs destroy_cb once the last reference and the last pending
 * destroy message are gone, and frees the io_device if it was unregistered
 * and this was its last channel.
 */
static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}

/*
 * Drop one reference to a channel. When the count reaches zero, the actual
 * teardown is deferred to the channel's owning thread via a message.
 */
void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		/* destroy_ref tracks in-flight _spdk_put_io_channel messages. */
		ch->destroy_ref++;
		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
	}
}

/* Recover the channel pointer from the per-channel context that follows it. */
struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

/* Iterator state for spdk_for_each_channel(). */
struct spdk_io_channel_iter {
	void *io_device;		/* Key of the device being iterated. */
	struct io_device *dev;		/* Resolved device; for_each_count holder. */
	spdk_channel_msg fn;		/* Per-channel function. */
	int status;			/* First nonzero status aborts the walk. */
	void *ctx;			/* User context. */
	struct spdk_io_channel *ch;	/* Channel for the current step. */

	struct spdk_thread *cur_thread;	/* Thread owning the current channel. */

	struct spdk_thread *orig_thread;	/* Thread that started the iteration. */
	spdk_channel_for_each_cpl cpl;	/* Completion, run on orig_thread. */
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

/* Run the user's completion on the originating thread and free the iterator. */
static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

/*
 * Execute one iteration step on the current thread. Re-checks that the
 * channel still exists (it may have been destroyed while the message was in
 * flight) before invoking fn; otherwise skips ahead.
 */
static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute.  If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

/*
 * Call fn once for each thread's channel of the given io_device, hopping
 * from thread to thread via messages, then call cpl on the originating
 * thread. Each step must call spdk_for_each_channel_continue() to advance.
 */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;

	i = calloc(1, sizeof(*i));
	if (!i) {
		/* NOTE(review): on allocation failure cpl is never invoked, so
		 * the caller gets no completion — confirm callers tolerate this. */
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread that has a channel for this device. */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				/* Pins the device against unregistration for the
				 * duration of the walk. */
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist; go straight to the completion. */
	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}

/*
 * Advance a spdk_for_each_channel() iteration. A nonzero status aborts the
 * walk; otherwise the next thread owning a channel for the device receives
 * the next step. When the walk ends, the completion is sent to the
 * originating thread.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	/* Release the pin taken in spdk_for_each_channel(). */
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}


SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)