/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"

#include "spdk_internal/log.h"
#include "spdk_internal/thread.h"

#ifdef __linux__
#include <sys/prctl.h>
#endif

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

#define SPDK_MSG_BATCH_SIZE		8

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

struct io_device {
	void				*io_device;
	char				*name;
	spdk_io_channel_create_cb	create_cb;
	spdk_io_channel_destroy_cb	destroy_cb;
	spdk_io_device_unregister_cb	unregister_cb;
	struct spdk_thread		*unregister_thread;
	uint32_t			ctx_size;
	uint32_t			for_each_count;
	TAILQ_ENTRY(io_device)		tailq;

	uint32_t			refcnt;

	bool				unregistered;
};

static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);

struct spdk_msg {
	spdk_msg_fn	fn;
	void		*arg;
};

static struct spdk_mempool *g_spdk_msg_mempool = NULL;

enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	uint64_t			period_ticks;
	uint64_t			next_run_tick;
	spdk_poller_fn			fn;
	void				*arg;
};

struct spdk_thread {
	pthread_t			thread_id;
	spdk_thread_pass_msg		msg_fn;
	spdk_start_poller		start_poller_fn;
	spdk_stop_poller		stop_poller_fn;
	void				*thread_ctx;
	TAILQ_HEAD(, spdk_io_channel)	io_channels;
	TAILQ_ENTRY(spdk_thread)	tailq;
	char				*name;

	/*
	 * Contains pollers actively running on this thread. Pollers
	 * are run round-robin. The thread takes one poller from the head
	 * of the ring, executes it, then puts it back at the tail of
	 * the ring.
	 */
	TAILQ_HEAD(, spdk_poller)	active_pollers;

	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller)	timer_pollers;

	struct spdk_ring		*messages;
};

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static __thread struct spdk_thread *tls_thread = NULL;

static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}

static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
#error missing platform support for thread name
#endif
}

int
spdk_thread_lib_init(void)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			     sizeof(struct spdk_msg),
			     SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
			     SPDK_ENV_SOCKET_ID_ANY);

	if (!g_spdk_msg_mempool) {
		return -1;
	}

	return 0;
}

void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
	}
}

struct spdk_thread *
spdk_allocate_thread(spdk_thread_pass_msg msg_fn,
		     spdk_start_poller start_poller_fn,
		     spdk_stop_poller stop_poller_fn,
		     void *thread_ctx, const char *name)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (thread) {
		SPDK_ERRLOG("Double allocated SPDK thread\n");
		return NULL;
	}

	if ((start_poller_fn != NULL && stop_poller_fn == NULL) ||
	    (start_poller_fn == NULL && stop_poller_fn != NULL)) {
		SPDK_ERRLOG("start_poller_fn and stop_poller_fn must either both be NULL or both be non-NULL\n");
		return NULL;
	}

	thread = calloc(1, sizeof(*thread));
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	thread->thread_id = pthread_self();
	thread->msg_fn = msg_fn;
	thread->start_poller_fn = start_poller_fn;
	thread->stop_poller_fn = stop_poller_fn;
	thread->thread_ctx = thread_ctx;
	TAILQ_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	TAILQ_INIT(&thread->timer_pollers);

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	if (name) {
		_set_thread_name(name);
		thread->name = strdup(name);
	} else {
		thread->name = spdk_sprintf_alloc("%p", thread);
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	return thread;
}

void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}

void
spdk_free_thread(void)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name);

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	free(thread->name);

	if (thread->messages) {
		spdk_ring_free(thread->messages);
	}

	free(thread);

	tls_thread = NULL;
}

static inline uint32_t
_spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);
	}

	spdk_mempool_put_bulk(g_spdk_msg_mempool, messages, count);

	return count;
}

static void
_spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled
	 * run time.
	 */
	TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
}

int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs)
{
	uint32_t msg_count;
	struct spdk_poller *poller;
	int rc = 0;

	assert(_get_thread() == NULL);

	tls_thread = thread;

	msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	poller = TAILQ_FIRST(&thread->active_pollers);
	if (poller) {
		int poller_rc;

		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			free(poller);
		} else {
			poller->state = SPDK_POLLER_STATE_WAITING;
			TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		}

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
		}
#endif

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	poller = TAILQ_FIRST(&thread->timer_pollers);
	if (poller) {
		uint64_t now = spdk_get_ticks();

		if (now >= poller->next_run_tick) {
			int timer_rc = 0;

			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_RUNNING;
			timer_rc = poller->fn(poller->arg);
			if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
				free(poller);
			} else {
				poller->state = SPDK_POLLER_STATE_WAITING;
				_spdk_poller_insert_timer(thread, poller, now);
			}

#ifdef DEBUG
			if (timer_rc == -1) {
				SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller);
			}
#endif

			if (timer_rc > rc) {
				rc = timer_rc;
			}
		}
	}

	tls_thread = NULL;

	return rc;
}
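
/*
 * Illustrative sketch of how a framework's event loop might drive
 * spdk_thread_poll() above.  The reactor_run() wrapper and g_running flag are
 * hypothetical, not part of this library:
 *
 *	static volatile bool g_running = true;
 *
 *	static void
 *	reactor_run(void)
 *	{
 *		struct spdk_thread *thread;
 *
 *		// Register the calling pthread with no custom msg/poller hooks,
 *		// so the message ring and poller lists in this file are used.
 *		thread = spdk_allocate_thread(NULL, NULL, NULL, NULL, "reactor_0");
 *
 *		while (g_running) {
 *			// Each pass drains up to SPDK_MSG_BATCH_SIZE messages and
 *			// runs one active poller plus one expired timer poller.
 *			spdk_thread_poll(thread, 0);
 *		}
 *
 *		// spdk_free_thread() frees the thread bound to the thread-local
 *		// pointer, so bind it first.
 *		spdk_set_thread(thread);
 *		spdk_free_thread();
 *	}
 */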

uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = TAILQ_FIRST(&thread->timer_pollers);
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}

int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}

uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count. We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}

struct spdk_thread *
spdk_get_thread(void)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
	}

	return thread;
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

void
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_msg *msg;
	int rc;

	if (!thread) {
		assert(false);
		return;
	}

	if (thread->msg_fn) {
		thread->msg_fn(fn, ctx, thread->thread_ctx);
		return;
	}

	msg = spdk_mempool_get(g_spdk_msg_mempool);
	if (!msg) {
		assert(false);
		return;
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1);
	if (rc != 1) {
		assert(false);
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return;
	}
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;
	uint64_t quotient, remainder, ticks;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (thread->start_poller_fn) {
		return thread->start_poller_fn(thread->thread_ctx, fn, arg, period_microseconds);
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;

	if (period_microseconds) {
		quotient = period_microseconds / SPDK_SEC_TO_USEC;
		remainder = period_microseconds % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		poller->period_ticks = 0;
	}

	if (poller->period_ticks) {
		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}

	return poller;
}
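
/*
 * Illustrative sketch of the poller lifecycle from a consumer's point of view;
 * the print_stats()/g_stats_poller names are hypothetical, not part of this
 * library.  A zero period puts the poller on active_pollers, where it runs on
 * every spdk_thread_poll() pass; a non-zero period (in microseconds) puts it
 * on the timer_pollers list instead:
 *
 *	static struct spdk_poller *g_stats_poller;
 *
 *	static int
 *	print_stats(void *arg)
 *	{
 *		// A positive return value reports that work was done; 0 means
 *		// idle, and -1 is logged by spdk_thread_poll() in debug builds.
 *		return 0;
 *	}
 *
 *	static void
 *	start_stats(void)
 *	{
 *		// Run print_stats() roughly once per second on this thread.
 *		g_stats_poller = spdk_poller_register(print_stats, NULL, 1000 * 1000);
 *	}
 *
 *	static void
 *	stop_stats(void)
 *	{
 *		// Safe to call even from within print_stats() itself: the poller
 *		// is then marked UNREGISTERED and freed by the polling loop.
 *		spdk_poller_unregister(&g_stats_poller);
 *	}
 */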

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (thread->stop_poller_fn) {
		thread->stop_poller_fn(poller, thread->thread_ctx);
		return;
	}

	if (poller->state == SPDK_POLLER_STATE_RUNNING) {
		/*
		 * We are being called from the poller_fn, so set the state to unregistered
		 * and let the thread poll loop free the poller.
		 */
		poller->state = SPDK_POLLER_STATE_UNREGISTERED;
	} else {
		/* Poller is not running currently, so just free it. */
		if (poller->period_ticks) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		} else {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		}

		free(poller);
	}
}

struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};

static void
spdk_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");

		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
	}
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
}

void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		dev->name = strdup(name);
	} else {
		dev->name = spdk_sprintf_alloc("%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}
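
/*
 * Illustrative sketch of the io_device/io_channel pattern built on the
 * function above.  The nvme_* names and the ctrlr pointer are hypothetical,
 * not part of this library; each thread that needs per-thread state for a
 * registered device obtains its own ctx_size-byte context through
 * spdk_get_io_channel():
 *
 *	struct nvme_channel {
 *		int	per_thread_state;
 *	};
 *
 *	static int
 *	nvme_channel_create(void *io_device, void *ctx_buf)
 *	{
 *		struct nvme_channel *nch = ctx_buf;
 *
 *		nch->per_thread_state = 0;
 *		return 0;	// non-zero makes spdk_get_io_channel() fail
 *	}
 *
 *	static void
 *	nvme_channel_destroy(void *io_device, void *ctx_buf)
 *	{
 *		// Undo whatever nvme_channel_create() set up.
 *	}
 *
 *	static void
 *	attach_ctrlr(void *ctrlr)
 *	{
 *		// Called once, from any SPDK thread.
 *		spdk_io_device_register(ctrlr, nvme_channel_create, nvme_channel_destroy,
 *					sizeof(struct nvme_channel), "nvme_ctrlr");
 *	}
 *
 *	static void
 *	io_on_this_thread(void *ctrlr)
 *	{
 *		// Called on each thread that performs I/O.
 *		struct spdk_io_channel *ch = spdk_get_io_channel(ctrlr);
 *		struct nvme_channel *nch = spdk_io_channel_get_ctx(ch);
 *
 *		// ... use nch, then drop this thread's reference ...
 *		spdk_put_io_channel(ch);
 *	}
 *
 * spdk_io_device_unregister(ctrlr, NULL) (or with a callback) then removes the
 * device; the final free is deferred until every channel has been released.
 */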

static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev->name);
	free(dev);
}

static void
_spdk_io_device_free(struct io_device *dev)
{
	if (dev->unregister_cb == NULL) {
		free(dev->name);
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
	}
}

void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	_spdk_io_device_free(dev);
}

struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 * thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
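
/*
 * Note on the allocation above: the ctx_size bytes of per-thread context live
 * in the same allocation as the channel, immediately after the struct
 * spdk_io_channel itself.  That is why create_cb is invoked with
 * (uint8_t *)ch + sizeof(*ch), and why spdk_io_channel_from_ctx() below only
 * needs to subtract sizeof(struct spdk_io_channel) to recover the channel
 * from a context pointer.
 */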

static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute.  If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}

SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)
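
/*
 * Illustrative sketch of driving the channel iterator above; the
 * reset_channel()/reset_done() callbacks and the bdev pointer are
 * hypothetical, not part of this library.  fn runs once on every thread that
 * currently holds a channel for the io_device, and cpl runs on the
 * originating thread when the walk completes or a callback reports an error:
 *
 *	static void
 *	reset_channel(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *
 *		// ... per-channel work using ch ...
 *
 *		// Hand control back; a non-zero status stops the iteration.
 *		spdk_for_each_channel_continue(i, 0);
 *	}
 *
 *	static void
 *	reset_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		// Runs on the thread that called spdk_for_each_channel().
 *	}
 *
 *	// Started from any registered SPDK thread:
 *	spdk_for_each_channel(bdev, reset_channel, NULL, reset_done);
 */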