1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/likely.h" 38 #include "spdk/queue.h" 39 #include "spdk/string.h" 40 #include "spdk/thread.h" 41 #include "spdk/util.h" 42 43 #include "spdk_internal/log.h" 44 #include "spdk_internal/thread.h" 45 46 #define SPDK_MSG_BATCH_SIZE 8 47 #define SPDK_MAX_DEVICE_NAME_LEN 256 48 #define SPDK_MAX_THREAD_NAME_LEN 256 49 50 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 51 52 static spdk_new_thread_fn g_new_thread_fn = NULL; 53 static size_t g_ctx_sz = 0; 54 55 struct io_device { 56 void *io_device; 57 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 58 spdk_io_channel_create_cb create_cb; 59 spdk_io_channel_destroy_cb destroy_cb; 60 spdk_io_device_unregister_cb unregister_cb; 61 struct spdk_thread *unregister_thread; 62 uint32_t ctx_size; 63 uint32_t for_each_count; 64 TAILQ_ENTRY(io_device) tailq; 65 66 uint32_t refcnt; 67 68 bool unregistered; 69 }; 70 71 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 72 73 struct spdk_msg { 74 spdk_msg_fn fn; 75 void *arg; 76 77 SLIST_ENTRY(spdk_msg) link; 78 }; 79 80 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 81 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 82 83 enum spdk_poller_state { 84 /* The poller is registered with a thread but not currently executing its fn. */ 85 SPDK_POLLER_STATE_WAITING, 86 87 /* The poller is currently running its fn. */ 88 SPDK_POLLER_STATE_RUNNING, 89 90 /* The poller was unregistered during the execution of its fn. */ 91 SPDK_POLLER_STATE_UNREGISTERED, 92 93 /* The poller is in the process of being paused. It will be paused 94 * during the next time it's supposed to be executed. 95 */ 96 SPDK_POLLER_STATE_PAUSING, 97 98 /* The poller is registered but currently paused. It's on the 99 * paused_pollers list. 100 */ 101 SPDK_POLLER_STATE_PAUSED, 102 }; 103 104 struct spdk_poller { 105 TAILQ_ENTRY(spdk_poller) tailq; 106 107 /* Current state of the poller; should only be accessed from the poller's thread. */ 108 enum spdk_poller_state state; 109 110 uint64_t period_ticks; 111 uint64_t next_run_tick; 112 spdk_poller_fn fn; 113 void *arg; 114 struct spdk_thread *thread; 115 }; 116 117 struct spdk_thread { 118 TAILQ_HEAD(, spdk_io_channel) io_channels; 119 TAILQ_ENTRY(spdk_thread) tailq; 120 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 121 122 bool exit; 123 124 struct spdk_cpuset cpumask; 125 126 uint64_t tsc_last; 127 struct spdk_thread_stats stats; 128 129 /* 130 * Contains pollers actively running on this thread. Pollers 131 * are run round-robin. The thread takes one poller from the head 132 * of the ring, executes it, then puts it back at the tail of 133 * the ring. 134 */ 135 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 136 137 /** 138 * Contains pollers running on this thread with a periodic timer. 139 */ 140 TAILQ_HEAD(timed_pollers_head, spdk_poller) timed_pollers; 141 142 /* 143 * Contains paused pollers. Pollers on this queue are waiting until 144 * they are resumed (in which case they're put onto the active/timer 145 * queues) or unregistered. 146 */ 147 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 148 149 struct spdk_ring *messages; 150 151 SLIST_HEAD(, spdk_msg) msg_cache; 152 size_t msg_cache_count; 153 154 spdk_msg_fn critical_msg; 155 156 /* User context allocated at the end */ 157 uint8_t ctx[0]; 158 }; 159 160 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 161 static uint32_t g_thread_count = 0; 162 163 static __thread struct spdk_thread *tls_thread = NULL; 164 165 static inline struct spdk_thread * 166 _get_thread(void) 167 { 168 return tls_thread; 169 } 170 171 int 172 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 173 { 174 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 175 176 assert(g_new_thread_fn == NULL); 177 g_new_thread_fn = new_thread_fn; 178 179 g_ctx_sz = ctx_sz; 180 181 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 182 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 183 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 184 sizeof(struct spdk_msg), 185 0, /* No cache. We do our own. */ 186 SPDK_ENV_SOCKET_ID_ANY); 187 188 if (!g_spdk_msg_mempool) { 189 return -1; 190 } 191 192 return 0; 193 } 194 195 void 196 spdk_thread_lib_fini(void) 197 { 198 struct io_device *dev; 199 200 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 201 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 202 } 203 204 if (g_spdk_msg_mempool) { 205 spdk_mempool_free(g_spdk_msg_mempool); 206 g_spdk_msg_mempool = NULL; 207 } 208 209 g_new_thread_fn = NULL; 210 g_ctx_sz = 0; 211 } 212 213 static void 214 _free_thread(struct spdk_thread *thread) 215 { 216 struct spdk_io_channel *ch; 217 struct spdk_msg *msg; 218 struct spdk_poller *poller, *ptmp; 219 220 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 221 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 222 thread->name, ch->dev->name); 223 } 224 225 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 226 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 227 SPDK_WARNLOG("poller %p still registered at thread exit\n", 228 poller); 229 } 230 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 231 free(poller); 232 } 233 234 TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) { 235 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 236 SPDK_WARNLOG("poller %p still registered at thread exit\n", 237 poller); 238 } 239 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 240 free(poller); 241 } 242 243 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 244 SPDK_WARNLOG("poller %p still registered at thread exit\n", poller); 245 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 246 free(poller); 247 } 248 249 pthread_mutex_lock(&g_devlist_mutex); 250 assert(g_thread_count > 0); 251 g_thread_count--; 252 TAILQ_REMOVE(&g_threads, thread, tailq); 253 pthread_mutex_unlock(&g_devlist_mutex); 254 255 msg = SLIST_FIRST(&thread->msg_cache); 256 while (msg != NULL) { 257 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 258 259 assert(thread->msg_cache_count > 0); 260 thread->msg_cache_count--; 261 spdk_mempool_put(g_spdk_msg_mempool, msg); 262 263 msg = SLIST_FIRST(&thread->msg_cache); 264 } 265 266 assert(thread->msg_cache_count == 0); 267 268 spdk_ring_free(thread->messages); 269 free(thread); 270 } 271 272 struct spdk_thread * 273 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 274 { 275 struct spdk_thread *thread; 276 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 277 int rc, i; 278 279 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 280 if (!thread) { 281 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 282 return NULL; 283 } 284 285 if (cpumask) { 286 spdk_cpuset_copy(&thread->cpumask, cpumask); 287 } else { 288 spdk_cpuset_negate(&thread->cpumask); 289 } 290 291 TAILQ_INIT(&thread->io_channels); 292 TAILQ_INIT(&thread->active_pollers); 293 TAILQ_INIT(&thread->timed_pollers); 294 TAILQ_INIT(&thread->paused_pollers); 295 SLIST_INIT(&thread->msg_cache); 296 thread->msg_cache_count = 0; 297 298 thread->tsc_last = spdk_get_ticks(); 299 300 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 301 if (!thread->messages) { 302 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 303 free(thread); 304 return NULL; 305 } 306 307 /* Fill the local message pool cache. */ 308 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 309 if (rc == 0) { 310 /* If we can't populate the cache it's ok. The cache will get filled 311 * up organically as messages are passed to the thread. */ 312 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 313 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 314 thread->msg_cache_count++; 315 } 316 } 317 318 if (name) { 319 snprintf(thread->name, sizeof(thread->name), "%s", name); 320 } else { 321 snprintf(thread->name, sizeof(thread->name), "%p", thread); 322 } 323 324 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name); 325 326 pthread_mutex_lock(&g_devlist_mutex); 327 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 328 g_thread_count++; 329 pthread_mutex_unlock(&g_devlist_mutex); 330 331 if (g_new_thread_fn) { 332 rc = g_new_thread_fn(thread); 333 if (rc != 0) { 334 _free_thread(thread); 335 return NULL; 336 } 337 } 338 339 return thread; 340 } 341 342 void 343 spdk_set_thread(struct spdk_thread *thread) 344 { 345 tls_thread = thread; 346 } 347 348 int 349 spdk_thread_exit(struct spdk_thread *thread) 350 { 351 struct spdk_poller *poller; 352 struct spdk_io_channel *ch; 353 354 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name); 355 356 assert(tls_thread == thread); 357 358 if (thread->exit) { 359 SPDK_ERRLOG("thread %s is already marked as exited\n", 360 thread->name); 361 return -EINVAL; 362 } 363 364 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 365 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 366 SPDK_ERRLOG("thread %s still has active poller %p\n", 367 thread->name, poller); 368 return -EBUSY; 369 } 370 } 371 372 TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) { 373 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 374 SPDK_ERRLOG("thread %s still has active timed poller %p\n", 375 thread->name, poller); 376 return -EBUSY; 377 } 378 } 379 380 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 381 SPDK_ERRLOG("thread %s still has paused poller %p\n", 382 thread->name, poller); 383 return -EBUSY; 384 } 385 386 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 387 if (ch->ref != 0) { 388 SPDK_ERRLOG("thread %s still has active channel for io_device %s\n", 389 thread->name, ch->dev->name); 390 return -EBUSY; 391 } 392 } 393 394 thread->exit = true; 395 return 0; 396 } 397 398 bool 399 spdk_thread_is_exited(struct spdk_thread *thread) 400 { 401 return thread->exit; 402 } 403 404 void 405 spdk_thread_destroy(struct spdk_thread *thread) 406 { 407 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name); 408 409 assert(thread->exit == true); 410 411 if (tls_thread == thread) { 412 tls_thread = NULL; 413 } 414 415 _free_thread(thread); 416 } 417 418 void * 419 spdk_thread_get_ctx(struct spdk_thread *thread) 420 { 421 if (g_ctx_sz > 0) { 422 return thread->ctx; 423 } 424 425 return NULL; 426 } 427 428 struct spdk_cpuset * 429 spdk_thread_get_cpumask(struct spdk_thread *thread) 430 { 431 return &thread->cpumask; 432 } 433 434 struct spdk_thread * 435 spdk_thread_get_from_ctx(void *ctx) 436 { 437 if (ctx == NULL) { 438 assert(false); 439 return NULL; 440 } 441 442 assert(g_ctx_sz > 0); 443 444 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 445 } 446 447 static inline uint32_t 448 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 449 { 450 unsigned count, i; 451 void *messages[SPDK_MSG_BATCH_SIZE]; 452 453 #ifdef DEBUG 454 /* 455 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 456 * so we will never actually read uninitialized data from events, but just to be sure 457 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 458 */ 459 memset(messages, 0, sizeof(messages)); 460 #endif 461 462 if (max_msgs > 0) { 463 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 464 } else { 465 max_msgs = SPDK_MSG_BATCH_SIZE; 466 } 467 468 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 469 if (count == 0) { 470 return 0; 471 } 472 473 for (i = 0; i < count; i++) { 474 struct spdk_msg *msg = messages[i]; 475 476 assert(msg != NULL); 477 msg->fn(msg->arg); 478 479 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 480 /* Insert the messages at the head. We want to re-use the hot 481 * ones. */ 482 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 483 thread->msg_cache_count++; 484 } else { 485 spdk_mempool_put(g_spdk_msg_mempool, msg); 486 } 487 } 488 489 return count; 490 } 491 492 static void 493 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 494 { 495 struct spdk_poller *iter; 496 497 poller->next_run_tick = now + poller->period_ticks; 498 499 /* 500 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled 501 * run time. 502 */ 503 TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) { 504 if (iter->next_run_tick <= poller->next_run_tick) { 505 TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq); 506 return; 507 } 508 } 509 510 /* No earlier pollers were found, so this poller must be the new head */ 511 TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq); 512 } 513 514 static void 515 _spdk_thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 516 { 517 if (poller->period_ticks) { 518 _spdk_poller_insert_timer(thread, poller, spdk_get_ticks()); 519 } else { 520 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 521 } 522 } 523 524 int 525 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 526 { 527 uint32_t msg_count; 528 struct spdk_thread *orig_thread; 529 struct spdk_poller *poller, *tmp; 530 spdk_msg_fn critical_msg; 531 int rc = 0; 532 533 orig_thread = _get_thread(); 534 tls_thread = thread; 535 536 if (now == 0) { 537 now = spdk_get_ticks(); 538 } 539 540 critical_msg = thread->critical_msg; 541 if (spdk_unlikely(critical_msg != NULL)) { 542 critical_msg(NULL); 543 thread->critical_msg = NULL; 544 } 545 546 msg_count = _spdk_msg_queue_run_batch(thread, max_msgs); 547 if (msg_count) { 548 rc = 1; 549 } 550 551 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 552 active_pollers_head, tailq, tmp) { 553 int poller_rc; 554 555 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 556 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 557 free(poller); 558 continue; 559 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 560 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 561 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 562 poller->state = SPDK_POLLER_STATE_PAUSED; 563 continue; 564 } 565 566 poller->state = SPDK_POLLER_STATE_RUNNING; 567 poller_rc = poller->fn(poller->arg); 568 569 #ifdef DEBUG 570 if (poller_rc == -1) { 571 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller); 572 } 573 #endif 574 575 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 576 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 577 free(poller); 578 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 579 poller->state = SPDK_POLLER_STATE_WAITING; 580 } 581 582 if (poller_rc > rc) { 583 rc = poller_rc; 584 } 585 } 586 587 TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) { 588 int timer_rc = 0; 589 590 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 591 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 592 free(poller); 593 continue; 594 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 595 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 596 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 597 poller->state = SPDK_POLLER_STATE_PAUSED; 598 continue; 599 } 600 601 if (now < poller->next_run_tick) { 602 break; 603 } 604 605 poller->state = SPDK_POLLER_STATE_RUNNING; 606 timer_rc = poller->fn(poller->arg); 607 608 #ifdef DEBUG 609 if (timer_rc == -1) { 610 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller); 611 } 612 #endif 613 614 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 615 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 616 free(poller); 617 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 618 poller->state = SPDK_POLLER_STATE_WAITING; 619 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 620 _spdk_poller_insert_timer(thread, poller, now); 621 } 622 623 if (timer_rc > rc) { 624 rc = timer_rc; 625 } 626 } 627 628 if (rc == 0) { 629 /* Poller status idle */ 630 thread->stats.idle_tsc += now - thread->tsc_last; 631 } else if (rc > 0) { 632 /* Poller status busy */ 633 thread->stats.busy_tsc += now - thread->tsc_last; 634 } 635 thread->tsc_last = now; 636 637 tls_thread = orig_thread; 638 639 return rc; 640 } 641 642 uint64_t 643 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 644 { 645 struct spdk_poller *poller; 646 647 poller = TAILQ_FIRST(&thread->timed_pollers); 648 if (poller) { 649 return poller->next_run_tick; 650 } 651 652 return 0; 653 } 654 655 int 656 spdk_thread_has_active_pollers(struct spdk_thread *thread) 657 { 658 return !TAILQ_EMPTY(&thread->active_pollers); 659 } 660 661 static bool 662 _spdk_thread_has_unpaused_pollers(struct spdk_thread *thread) 663 { 664 if (TAILQ_EMPTY(&thread->active_pollers) && 665 TAILQ_EMPTY(&thread->timed_pollers)) { 666 return false; 667 } 668 669 return true; 670 } 671 672 bool 673 spdk_thread_has_pollers(struct spdk_thread *thread) 674 { 675 if (!_spdk_thread_has_unpaused_pollers(thread) && 676 TAILQ_EMPTY(&thread->paused_pollers)) { 677 return false; 678 } 679 680 return true; 681 } 682 683 bool 684 spdk_thread_is_idle(struct spdk_thread *thread) 685 { 686 if (spdk_ring_count(thread->messages) || 687 _spdk_thread_has_unpaused_pollers(thread) || 688 thread->critical_msg != NULL) { 689 return false; 690 } 691 692 return true; 693 } 694 695 uint32_t 696 spdk_thread_get_count(void) 697 { 698 /* 699 * Return cached value of the current thread count. We could acquire the 700 * lock and iterate through the TAILQ of threads to count them, but that 701 * count could still be invalidated after we release the lock. 702 */ 703 return g_thread_count; 704 } 705 706 struct spdk_thread * 707 spdk_get_thread(void) 708 { 709 return _get_thread(); 710 } 711 712 const char * 713 spdk_thread_get_name(const struct spdk_thread *thread) 714 { 715 return thread->name; 716 } 717 718 int 719 spdk_thread_get_stats(struct spdk_thread_stats *stats) 720 { 721 struct spdk_thread *thread; 722 723 thread = _get_thread(); 724 if (!thread) { 725 SPDK_ERRLOG("No thread allocated\n"); 726 return -EINVAL; 727 } 728 729 if (stats == NULL) { 730 return -EINVAL; 731 } 732 733 *stats = thread->stats; 734 735 return 0; 736 } 737 738 int 739 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 740 { 741 struct spdk_thread *local_thread; 742 struct spdk_msg *msg; 743 int rc; 744 745 assert(thread != NULL); 746 747 if (spdk_unlikely(thread->exit)) { 748 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 749 return -EIO; 750 } 751 752 local_thread = _get_thread(); 753 754 msg = NULL; 755 if (local_thread != NULL) { 756 if (local_thread->msg_cache_count > 0) { 757 msg = SLIST_FIRST(&local_thread->msg_cache); 758 assert(msg != NULL); 759 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 760 local_thread->msg_cache_count--; 761 } 762 } 763 764 if (msg == NULL) { 765 msg = spdk_mempool_get(g_spdk_msg_mempool); 766 if (!msg) { 767 SPDK_ERRLOG("msg could not be allocated\n"); 768 return -ENOMEM; 769 } 770 } 771 772 msg->fn = fn; 773 msg->arg = ctx; 774 775 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 776 if (rc != 1) { 777 SPDK_ERRLOG("msg could not be enqueued\n"); 778 spdk_mempool_put(g_spdk_msg_mempool, msg); 779 return -EIO; 780 } 781 782 return 0; 783 } 784 785 int 786 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 787 { 788 spdk_msg_fn expected = NULL; 789 790 if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 791 __ATOMIC_SEQ_CST)) { 792 return 0; 793 } 794 795 return -EIO; 796 } 797 798 struct spdk_poller * 799 spdk_poller_register(spdk_poller_fn fn, 800 void *arg, 801 uint64_t period_microseconds) 802 { 803 struct spdk_thread *thread; 804 struct spdk_poller *poller; 805 uint64_t quotient, remainder, ticks; 806 807 thread = spdk_get_thread(); 808 if (!thread) { 809 assert(false); 810 return NULL; 811 } 812 813 if (spdk_unlikely(thread->exit)) { 814 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 815 return NULL; 816 } 817 818 poller = calloc(1, sizeof(*poller)); 819 if (poller == NULL) { 820 SPDK_ERRLOG("Poller memory allocation failed\n"); 821 return NULL; 822 } 823 824 poller->state = SPDK_POLLER_STATE_WAITING; 825 poller->fn = fn; 826 poller->arg = arg; 827 poller->thread = thread; 828 829 if (period_microseconds) { 830 quotient = period_microseconds / SPDK_SEC_TO_USEC; 831 remainder = period_microseconds % SPDK_SEC_TO_USEC; 832 ticks = spdk_get_ticks_hz(); 833 834 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 835 } else { 836 poller->period_ticks = 0; 837 } 838 839 _spdk_thread_insert_poller(thread, poller); 840 841 return poller; 842 } 843 844 void 845 spdk_poller_unregister(struct spdk_poller **ppoller) 846 { 847 struct spdk_thread *thread; 848 struct spdk_poller *poller; 849 850 poller = *ppoller; 851 if (poller == NULL) { 852 return; 853 } 854 855 *ppoller = NULL; 856 857 thread = spdk_get_thread(); 858 if (!thread) { 859 assert(false); 860 return; 861 } 862 863 if (poller->thread != thread) { 864 SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n"); 865 assert(false); 866 return; 867 } 868 869 /* If the poller was paused, put it on the active_pollers list so that 870 * its unregistration can be processed by spdk_thread_poll(). 871 */ 872 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 873 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 874 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 875 poller->period_ticks = 0; 876 } 877 878 /* Simply set the state to unregistered. The poller will get cleaned up 879 * in a subsequent call to spdk_thread_poll(). 880 */ 881 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 882 } 883 884 void 885 spdk_poller_pause(struct spdk_poller *poller) 886 { 887 struct spdk_thread *thread; 888 889 if (poller->state == SPDK_POLLER_STATE_PAUSED || 890 poller->state == SPDK_POLLER_STATE_PAUSING) { 891 return; 892 } 893 894 thread = spdk_get_thread(); 895 if (!thread) { 896 assert(false); 897 return; 898 } 899 900 /* If a poller is paused from within itself, we can immediately move it 901 * on the paused_pollers list. Otherwise we just set its state to 902 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it. It 903 * allows a poller to be paused from another one's context without 904 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration. 905 */ 906 if (poller->state != SPDK_POLLER_STATE_RUNNING) { 907 poller->state = SPDK_POLLER_STATE_PAUSING; 908 } else { 909 if (poller->period_ticks > 0) { 910 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 911 } else { 912 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 913 } 914 915 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 916 poller->state = SPDK_POLLER_STATE_PAUSED; 917 } 918 } 919 920 void 921 spdk_poller_resume(struct spdk_poller *poller) 922 { 923 struct spdk_thread *thread; 924 925 if (poller->state != SPDK_POLLER_STATE_PAUSED && 926 poller->state != SPDK_POLLER_STATE_PAUSING) { 927 return; 928 } 929 930 thread = spdk_get_thread(); 931 if (!thread) { 932 assert(false); 933 return; 934 } 935 936 /* If a poller is paused it has to be removed from the paused pollers 937 * list and put on the active / timer list depending on its 938 * period_ticks. If a poller is still in the process of being paused, 939 * we just need to flip its state back to waiting, as it's already on 940 * the appropriate list. 941 */ 942 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 943 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 944 _spdk_thread_insert_poller(thread, poller); 945 } 946 947 poller->state = SPDK_POLLER_STATE_WAITING; 948 } 949 950 struct call_thread { 951 struct spdk_thread *cur_thread; 952 spdk_msg_fn fn; 953 void *ctx; 954 955 struct spdk_thread *orig_thread; 956 spdk_msg_fn cpl; 957 }; 958 959 static void 960 spdk_on_thread(void *ctx) 961 { 962 struct call_thread *ct = ctx; 963 964 ct->fn(ct->ctx); 965 966 pthread_mutex_lock(&g_devlist_mutex); 967 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 968 pthread_mutex_unlock(&g_devlist_mutex); 969 970 if (!ct->cur_thread) { 971 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n"); 972 973 spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 974 free(ctx); 975 } else { 976 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n", 977 ct->cur_thread->name); 978 979 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); 980 } 981 } 982 983 void 984 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 985 { 986 struct call_thread *ct; 987 struct spdk_thread *thread; 988 989 ct = calloc(1, sizeof(*ct)); 990 if (!ct) { 991 SPDK_ERRLOG("Unable to perform thread iteration\n"); 992 cpl(ctx); 993 return; 994 } 995 996 ct->fn = fn; 997 ct->ctx = ctx; 998 ct->cpl = cpl; 999 1000 thread = _get_thread(); 1001 if (!thread) { 1002 SPDK_ERRLOG("No thread allocated\n"); 1003 free(ct); 1004 cpl(ctx); 1005 return; 1006 } 1007 ct->orig_thread = thread; 1008 1009 pthread_mutex_lock(&g_devlist_mutex); 1010 ct->cur_thread = TAILQ_FIRST(&g_threads); 1011 pthread_mutex_unlock(&g_devlist_mutex); 1012 1013 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n", 1014 ct->orig_thread->name); 1015 1016 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); 1017 } 1018 1019 void 1020 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1021 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1022 const char *name) 1023 { 1024 struct io_device *dev, *tmp; 1025 struct spdk_thread *thread; 1026 1027 assert(io_device != NULL); 1028 assert(create_cb != NULL); 1029 assert(destroy_cb != NULL); 1030 1031 thread = spdk_get_thread(); 1032 if (!thread) { 1033 SPDK_ERRLOG("called from non-SPDK thread\n"); 1034 assert(false); 1035 return; 1036 } 1037 1038 dev = calloc(1, sizeof(struct io_device)); 1039 if (dev == NULL) { 1040 SPDK_ERRLOG("could not allocate io_device\n"); 1041 return; 1042 } 1043 1044 dev->io_device = io_device; 1045 if (name) { 1046 snprintf(dev->name, sizeof(dev->name), "%s", name); 1047 } else { 1048 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1049 } 1050 dev->create_cb = create_cb; 1051 dev->destroy_cb = destroy_cb; 1052 dev->unregister_cb = NULL; 1053 dev->ctx_size = ctx_size; 1054 dev->for_each_count = 0; 1055 dev->unregistered = false; 1056 dev->refcnt = 0; 1057 1058 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n", 1059 dev->name, dev->io_device, thread->name); 1060 1061 pthread_mutex_lock(&g_devlist_mutex); 1062 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 1063 if (tmp->io_device == io_device) { 1064 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1065 io_device, tmp->name, dev->name); 1066 free(dev); 1067 pthread_mutex_unlock(&g_devlist_mutex); 1068 return; 1069 } 1070 } 1071 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 1072 pthread_mutex_unlock(&g_devlist_mutex); 1073 } 1074 1075 static void 1076 _finish_unregister(void *arg) 1077 { 1078 struct io_device *dev = arg; 1079 1080 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n", 1081 dev->name, dev->io_device, dev->unregister_thread->name); 1082 1083 dev->unregister_cb(dev->io_device); 1084 free(dev); 1085 } 1086 1087 static void 1088 _spdk_io_device_free(struct io_device *dev) 1089 { 1090 if (dev->unregister_cb == NULL) { 1091 free(dev); 1092 } else { 1093 assert(dev->unregister_thread != NULL); 1094 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n", 1095 dev->name, dev->io_device, dev->unregister_thread->name); 1096 spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 1097 } 1098 } 1099 1100 void 1101 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 1102 { 1103 struct io_device *dev; 1104 uint32_t refcnt; 1105 struct spdk_thread *thread; 1106 1107 thread = spdk_get_thread(); 1108 if (!thread) { 1109 SPDK_ERRLOG("called from non-SPDK thread\n"); 1110 assert(false); 1111 return; 1112 } 1113 1114 pthread_mutex_lock(&g_devlist_mutex); 1115 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 1116 if (dev->io_device == io_device) { 1117 break; 1118 } 1119 } 1120 1121 if (!dev) { 1122 SPDK_ERRLOG("io_device %p not found\n", io_device); 1123 assert(false); 1124 pthread_mutex_unlock(&g_devlist_mutex); 1125 return; 1126 } 1127 1128 if (dev->for_each_count > 0) { 1129 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 1130 dev->name, io_device, dev->for_each_count); 1131 pthread_mutex_unlock(&g_devlist_mutex); 1132 return; 1133 } 1134 1135 dev->unregister_cb = unregister_cb; 1136 dev->unregistered = true; 1137 TAILQ_REMOVE(&g_io_devices, dev, tailq); 1138 refcnt = dev->refcnt; 1139 dev->unregister_thread = thread; 1140 pthread_mutex_unlock(&g_devlist_mutex); 1141 1142 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n", 1143 dev->name, dev->io_device, thread->name); 1144 1145 if (refcnt > 0) { 1146 /* defer deletion */ 1147 return; 1148 } 1149 1150 _spdk_io_device_free(dev); 1151 } 1152 1153 struct spdk_io_channel * 1154 spdk_get_io_channel(void *io_device) 1155 { 1156 struct spdk_io_channel *ch; 1157 struct spdk_thread *thread; 1158 struct io_device *dev; 1159 int rc; 1160 1161 pthread_mutex_lock(&g_devlist_mutex); 1162 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 1163 if (dev->io_device == io_device) { 1164 break; 1165 } 1166 } 1167 if (dev == NULL) { 1168 SPDK_ERRLOG("could not find io_device %p\n", io_device); 1169 pthread_mutex_unlock(&g_devlist_mutex); 1170 return NULL; 1171 } 1172 1173 thread = _get_thread(); 1174 if (!thread) { 1175 SPDK_ERRLOG("No thread allocated\n"); 1176 pthread_mutex_unlock(&g_devlist_mutex); 1177 return NULL; 1178 } 1179 1180 if (spdk_unlikely(thread->exit)) { 1181 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 1182 pthread_mutex_unlock(&g_devlist_mutex); 1183 return NULL; 1184 } 1185 1186 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1187 if (ch->dev == dev) { 1188 ch->ref++; 1189 1190 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1191 ch, dev->name, dev->io_device, thread->name, ch->ref); 1192 1193 /* 1194 * An I/O channel already exists for this device on this 1195 * thread, so return it. 1196 */ 1197 pthread_mutex_unlock(&g_devlist_mutex); 1198 return ch; 1199 } 1200 } 1201 1202 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 1203 if (ch == NULL) { 1204 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 1205 pthread_mutex_unlock(&g_devlist_mutex); 1206 return NULL; 1207 } 1208 1209 ch->dev = dev; 1210 ch->destroy_cb = dev->destroy_cb; 1211 ch->thread = thread; 1212 ch->ref = 1; 1213 ch->destroy_ref = 0; 1214 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 1215 1216 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1217 ch, dev->name, dev->io_device, thread->name, ch->ref); 1218 1219 dev->refcnt++; 1220 1221 pthread_mutex_unlock(&g_devlist_mutex); 1222 1223 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 1224 if (rc != 0) { 1225 pthread_mutex_lock(&g_devlist_mutex); 1226 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1227 dev->refcnt--; 1228 free(ch); 1229 pthread_mutex_unlock(&g_devlist_mutex); 1230 return NULL; 1231 } 1232 1233 return ch; 1234 } 1235 1236 static void 1237 _spdk_put_io_channel(void *arg) 1238 { 1239 struct spdk_io_channel *ch = arg; 1240 bool do_remove_dev = true; 1241 struct spdk_thread *thread; 1242 1243 thread = spdk_get_thread(); 1244 if (!thread) { 1245 SPDK_ERRLOG("called from non-SPDK thread\n"); 1246 assert(false); 1247 return; 1248 } 1249 1250 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1251 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 1252 ch, ch->dev->name, ch->dev->io_device, thread->name); 1253 1254 assert(ch->thread == thread); 1255 1256 ch->destroy_ref--; 1257 1258 if (ch->ref > 0 || ch->destroy_ref > 0) { 1259 /* 1260 * Another reference to the associated io_device was requested 1261 * after this message was sent but before it had a chance to 1262 * execute. 1263 */ 1264 return; 1265 } 1266 1267 pthread_mutex_lock(&g_devlist_mutex); 1268 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1269 pthread_mutex_unlock(&g_devlist_mutex); 1270 1271 /* Don't hold the devlist mutex while the destroy_cb is called. */ 1272 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 1273 1274 pthread_mutex_lock(&g_devlist_mutex); 1275 ch->dev->refcnt--; 1276 1277 if (!ch->dev->unregistered) { 1278 do_remove_dev = false; 1279 } 1280 1281 if (ch->dev->refcnt > 0) { 1282 do_remove_dev = false; 1283 } 1284 1285 pthread_mutex_unlock(&g_devlist_mutex); 1286 1287 if (do_remove_dev) { 1288 _spdk_io_device_free(ch->dev); 1289 } 1290 free(ch); 1291 } 1292 1293 void 1294 spdk_put_io_channel(struct spdk_io_channel *ch) 1295 { 1296 struct spdk_thread *thread; 1297 1298 thread = spdk_get_thread(); 1299 if (!thread) { 1300 SPDK_ERRLOG("called from non-SPDK thread\n"); 1301 assert(false); 1302 return; 1303 } 1304 1305 if (ch->thread != thread) { 1306 SPDK_ERRLOG("different from the thread that called get_io_channel()\n"); 1307 assert(false); 1308 return; 1309 } 1310 1311 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1312 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1313 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 1314 1315 ch->ref--; 1316 1317 if (ch->ref == 0) { 1318 ch->destroy_ref++; 1319 spdk_thread_send_msg(thread, _spdk_put_io_channel, ch); 1320 } 1321 } 1322 1323 struct spdk_io_channel * 1324 spdk_io_channel_from_ctx(void *ctx) 1325 { 1326 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 1327 } 1328 1329 struct spdk_thread * 1330 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 1331 { 1332 return ch->thread; 1333 } 1334 1335 struct spdk_io_channel_iter { 1336 void *io_device; 1337 struct io_device *dev; 1338 spdk_channel_msg fn; 1339 int status; 1340 void *ctx; 1341 struct spdk_io_channel *ch; 1342 1343 struct spdk_thread *cur_thread; 1344 1345 struct spdk_thread *orig_thread; 1346 spdk_channel_for_each_cpl cpl; 1347 }; 1348 1349 void * 1350 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 1351 { 1352 return i->io_device; 1353 } 1354 1355 struct spdk_io_channel * 1356 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 1357 { 1358 return i->ch; 1359 } 1360 1361 void * 1362 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 1363 { 1364 return i->ctx; 1365 } 1366 1367 static void 1368 _call_completion(void *ctx) 1369 { 1370 struct spdk_io_channel_iter *i = ctx; 1371 1372 if (i->cpl != NULL) { 1373 i->cpl(i, i->status); 1374 } 1375 free(i); 1376 } 1377 1378 static void 1379 _call_channel(void *ctx) 1380 { 1381 struct spdk_io_channel_iter *i = ctx; 1382 struct spdk_io_channel *ch; 1383 1384 /* 1385 * It is possible that the channel was deleted before this 1386 * message had a chance to execute. If so, skip calling 1387 * the fn() on this thread. 1388 */ 1389 pthread_mutex_lock(&g_devlist_mutex); 1390 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 1391 if (ch->dev->io_device == i->io_device) { 1392 break; 1393 } 1394 } 1395 pthread_mutex_unlock(&g_devlist_mutex); 1396 1397 if (ch) { 1398 i->fn(i); 1399 } else { 1400 spdk_for_each_channel_continue(i, 0); 1401 } 1402 } 1403 1404 void 1405 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 1406 spdk_channel_for_each_cpl cpl) 1407 { 1408 struct spdk_thread *thread; 1409 struct spdk_io_channel *ch; 1410 struct spdk_io_channel_iter *i; 1411 int rc __attribute__((unused)); 1412 1413 i = calloc(1, sizeof(*i)); 1414 if (!i) { 1415 SPDK_ERRLOG("Unable to allocate iterator\n"); 1416 return; 1417 } 1418 1419 i->io_device = io_device; 1420 i->fn = fn; 1421 i->ctx = ctx; 1422 i->cpl = cpl; 1423 1424 pthread_mutex_lock(&g_devlist_mutex); 1425 i->orig_thread = _get_thread(); 1426 1427 TAILQ_FOREACH(thread, &g_threads, tailq) { 1428 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1429 if (ch->dev->io_device == io_device) { 1430 ch->dev->for_each_count++; 1431 i->dev = ch->dev; 1432 i->cur_thread = thread; 1433 i->ch = ch; 1434 pthread_mutex_unlock(&g_devlist_mutex); 1435 rc = spdk_thread_send_msg(thread, _call_channel, i); 1436 assert(rc == 0); 1437 return; 1438 } 1439 } 1440 } 1441 1442 pthread_mutex_unlock(&g_devlist_mutex); 1443 1444 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1445 assert(rc == 0); 1446 } 1447 1448 void 1449 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 1450 { 1451 struct spdk_thread *thread; 1452 struct spdk_io_channel *ch; 1453 1454 assert(i->cur_thread == spdk_get_thread()); 1455 1456 i->status = status; 1457 1458 pthread_mutex_lock(&g_devlist_mutex); 1459 if (status) { 1460 goto end; 1461 } 1462 thread = TAILQ_NEXT(i->cur_thread, tailq); 1463 while (thread) { 1464 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1465 if (ch->dev->io_device == i->io_device) { 1466 i->cur_thread = thread; 1467 i->ch = ch; 1468 pthread_mutex_unlock(&g_devlist_mutex); 1469 spdk_thread_send_msg(thread, _call_channel, i); 1470 return; 1471 } 1472 } 1473 thread = TAILQ_NEXT(thread, tailq); 1474 } 1475 1476 end: 1477 i->dev->for_each_count--; 1478 i->ch = NULL; 1479 pthread_mutex_unlock(&g_devlist_mutex); 1480 1481 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1482 } 1483 1484 1485 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD) 1486