/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"

#include "spdk_internal/log.h"
#include "spdk_internal/thread.h"

#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
static size_t g_ctx_sz = 0;
/* A monotonically increasing thread ID is assigned to each created thread,
 * starting at 1. Once the counter wraps around to 0 (i.e. UINT64_MAX threads
 * have been created), further thread creation is not allowed and the SPDK
 * application must be restarted.
 */
static uint64_t g_thread_id = 1;

struct io_device {
	void				*io_device;
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb	create_cb;
	spdk_io_channel_destroy_cb	destroy_cb;
	spdk_io_device_unregister_cb	unregister_cb;
	struct spdk_thread		*unregister_thread;
	uint32_t			ctx_size;
	uint32_t			for_each_count;
	TAILQ_ENTRY(io_device)		tailq;

	uint32_t			refcnt;

	bool				unregistered;
};

static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);

struct spdk_msg {
	spdk_msg_fn		fn;
	void			*arg;

	SLIST_ENTRY(spdk_msg)	link;
};

#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static __thread struct spdk_thread *tls_thread = NULL;

static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}

static int
_thread_lib_init(size_t ctx_sz)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	g_ctx_sz = ctx_sz;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			     sizeof(struct spdk_msg),
			     0, /* No cache. We do our own. */
			     SPDK_ENV_SOCKET_ID_ANY);

	if (!g_spdk_msg_mempool) {
		return -1;
	}

	return 0;
}

int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);
	g_new_thread_fn = new_thread_fn;

	return _thread_lib_init(ctx_sz);
}

int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
			 spdk_thread_op_supported_fn thread_op_supported_fn,
			 size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);
	assert(g_thread_op_supported_fn == NULL);

	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
		SPDK_ERRLOG("Both must be defined or undefined together.\n");
		return -EINVAL;
	}

	g_thread_op_fn = thread_op_fn;
	g_thread_op_supported_fn = thread_op_supported_fn;

	return _thread_lib_init(ctx_sz);
}

void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}

	g_new_thread_fn = NULL;
	g_thread_op_fn = NULL;
	g_thread_op_supported_fn = NULL;
	g_ctx_sz = 0;
}

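/*
 * Illustrative lifecycle sketch (hypothetical application code, not part of
 * this file): the library is initialized once before any threads are created
 * and torn down only after every thread has been destroyed.
 *
 *	spdk_thread_lib_init(NULL, 0);
 *
 *	struct spdk_thread *t = spdk_thread_create("app_thread", NULL);
 *	...create pollers, poll, eventually exit and destroy t...
 *
 *	spdk_thread_lib_fini();
 */
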
static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	spdk_ring_free(thread->messages);
	free(thread);
}

struct spdk_thread *
spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc = 0, i;

	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	TAILQ_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	TAILQ_INIT(&thread->timed_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	if (g_thread_id == 0) {
		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		_free_thread(thread);
		return NULL;
	}
	thread->id = g_thread_id++;
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread (%" PRIu64 ", %s)\n",
		      thread->id, thread->name);

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
	}

	if (rc != 0) {
		_free_thread(thread);
		return NULL;
	}

	return thread;
}

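/*
 * Illustrative sketch (hypothetical application code): the owner of an
 * spdk_thread typically drives it from its own pthread or reactor loop,
 * then marks it as the current thread before exiting and destroying it.
 *
 *	struct spdk_thread *t = spdk_thread_create("worker", NULL);
 *
 *	while (!done) {
 *		spdk_thread_poll(t, 0, 0);
 *	}
 *
 *	spdk_set_thread(t);
 *	spdk_thread_exit(t);
 *	spdk_thread_destroy(t);
 */
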
void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}

int
spdk_thread_exit(struct spdk_thread *thread)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	if (thread->exit) {
		SPDK_ERRLOG("thread %s is already marked as exited\n",
			    thread->name);
		return -EINVAL;
	}

	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_ERRLOG("thread %s still has active poller %s\n",
				    thread->name, poller->name);
			return -EBUSY;
		}
	}

	TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_ERRLOG("thread %s still has active timed poller %s\n",
				    thread->name, poller->name);
			return -EBUSY;
		}
	}

	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_ERRLOG("thread %s still has paused poller %s\n",
			    thread->name, poller->name);
		return -EBUSY;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->ref != 0) {
			SPDK_ERRLOG("thread %s still has active channel for io_device %s\n",
				    thread->name, ch->dev->name);
			return -EBUSY;
		}
	}

	thread->exit = true;
	return 0;
}

bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
	return thread->exit;
}

void
spdk_thread_destroy(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name);

	assert(thread->exit == true);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}

void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
	if (g_ctx_sz > 0) {
		return thread->ctx;
	}

	return NULL;
}

struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}

int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;

	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
		assert(false);
		return -ENOTSUP;
	}

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Called from non-SPDK thread\n");
		assert(false);
		return -EINVAL;
	}

	spdk_cpuset_copy(&thread->cpumask, cpumask);

	/* Invoke framework's reschedule operation. If this function is called multiple times
	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
	 * reschedule operation.
	 */
	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

	return 0;
}

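/*
 * Illustrative sketch (hypothetical application code): a thread reschedules
 * itself onto core 2 using the spdk_cpuset helpers. This is only valid when
 * the hosting framework implements SPDK_THREAD_OP_RESCHED.
 *
 *	struct spdk_cpuset mask;
 *
 *	spdk_cpuset_zero(&mask);
 *	spdk_cpuset_set_cpu(&mask, 2, true);
 *	spdk_thread_set_cpumask(&mask);
 */
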
struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}

static inline uint32_t
_spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from messages, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}

static void
_spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled
	 * run time.
	 */
	TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq);
}

static void
_spdk_thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

static inline void
_spdk_thread_update_stats(struct spdk_thread *thread, uint64_t end,
			  uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store the end time to use it as the start time of the next spdk_thread_poll(). */
	thread->tsc_last = end;
}

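/*
 * Illustrative sketch (hypothetical application code): a poller callback
 * reports whether it did work, which is what feeds the busy_tsc/idle_tsc
 * accounting above. Returning > 0 counts the iteration as busy, 0 as idle,
 * and -1 only flags an error in debug logging.
 *
 *	static int
 *	my_queue_poll(void *arg)
 *	{
 *		struct my_queue *q = arg;
 *
 *		return my_queue_drain(q) > 0 ? 1 : 0;
 *	}
 */
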
static int
_spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
	}

	msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

		poller->run_count++;

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %s returned -1\n", poller->name);
		}
#endif

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
		}

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

		poller->run_count++;

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %s returned -1\n", poller->name);
		}
#endif

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			_spdk_poller_insert_timer(thread, poller, now);
		}

		if (timer_rc > rc) {
			rc = timer_rc;
		}
	}

	return rc;
}

int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	struct spdk_thread *orig_thread;
	int rc;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	rc = _spdk_thread_poll(thread, max_msgs, now);

	_spdk_thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}

uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = TAILQ_FIRST(&thread->timed_pollers);
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}

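/*
 * Illustrative sketch (hypothetical host-loop code): after a poll iteration,
 * a framework may decide to sleep until the next timed poller is due rather
 * than spin, provided no non-timed pollers exist on the thread.
 *
 *	spdk_thread_poll(thread, 0, 0);
 *
 *	if (!spdk_thread_has_active_pollers(thread)) {
 *		uint64_t next = spdk_thread_next_poller_expiration(thread);
 *
 *		if (next != 0) {
 *			...the loop may sleep until `next` ticks...
 *		}
 *	}
 */
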
int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}

static bool
_spdk_thread_has_unpaused_pollers(struct spdk_thread *thread)
{
	if (TAILQ_EMPTY(&thread->active_pollers) &&
	    TAILQ_EMPTY(&thread->timed_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
	if (!_spdk_thread_has_unpaused_pollers(thread) &&
	    TAILQ_EMPTY(&thread->paused_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
	if (spdk_ring_count(thread->messages) ||
	    _spdk_thread_has_unpaused_pollers(thread) ||
	    thread->critical_msg != NULL) {
		return false;
	}

	return true;
}

uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count. We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}

struct spdk_thread *
spdk_get_thread(void)
{
	return _get_thread();
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

uint64_t
spdk_thread_get_id(const struct spdk_thread *thread)
{
	return thread->id;
}

struct spdk_thread *
spdk_thread_get_by_id(uint64_t id)
{
	struct spdk_thread *thread;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		if (thread->id == id) {
			pthread_mutex_unlock(&g_devlist_mutex);

			return thread;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	return NULL;
}

int
spdk_thread_get_stats(struct spdk_thread_stats *stats)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		return -EINVAL;
	}

	if (stats == NULL) {
		return -EINVAL;
	}

	*stats = thread->stats;

	return 0;
}

uint64_t
spdk_thread_get_last_tsc(struct spdk_thread *thread)
{
	return thread->tsc_last;
}

int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->exit)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return 0;
}

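/*
 * Illustrative sketch (hypothetical application code): cross-thread work is
 * expressed as a function plus a context pointer. The message runs on the
 * target thread during its next spdk_thread_poll() iteration.
 *
 *	static void
 *	say_hello(void *ctx)
 *	{
 *		printf("hello from %s\n", spdk_thread_get_name(spdk_get_thread()));
 *	}
 *
 *	...
 *	spdk_thread_send_msg(target, say_hello, NULL);
 */
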
int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
	spdk_msg_fn expected = NULL;

	if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
					__ATOMIC_SEQ_CST)) {
		return 0;
	}

	return -EIO;
}

static struct spdk_poller *
_spdk_poller_register(spdk_poller_fn fn,
		      void *arg,
		      uint64_t period_microseconds,
		      const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;
	uint64_t quotient, remainder, ticks;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->exit)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;

	if (period_microseconds) {
		quotient = period_microseconds / SPDK_SEC_TO_USEC;
		remainder = period_microseconds % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		poller->period_ticks = 0;
	}

	_spdk_thread_insert_poller(thread, poller);

	return poller;
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return _spdk_poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return _spdk_poller_register(fn, arg, period_microseconds, name);
}

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		SPDK_ERRLOG("unregistering from a different thread than the one that called spdk_poller_register()\n");
		assert(false);
		return;
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}

void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state == SPDK_POLLER_STATE_PAUSED ||
	    poller->state == SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused from within itself, we can immediately move it
	 * to the paused_pollers list. Otherwise we just set its state to
	 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it. This
	 * allows a poller to be paused from another poller's context without
	 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration.
	 */
	if (poller->state != SPDK_POLLER_STATE_RUNNING) {
		poller->state = SPDK_POLLER_STATE_PAUSING;
	} else {
		if (poller->period_ticks > 0) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
		} else {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		}

		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
	}
}

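/*
 * Illustrative sketch (hypothetical application code): registering a timed
 * poller that fires roughly every millisecond on the current SPDK thread,
 * and unregistering it later from that same thread.
 *
 *	static struct spdk_poller *g_tick_poller;
 *
 *	static int
 *	tick(void *arg)
 *	{
 *		return 1;	// did work this iteration
 *	}
 *
 *	g_tick_poller = spdk_poller_register_named(tick, NULL, 1000, "tick");
 *	...
 *	spdk_poller_unregister(&g_tick_poller);
 */
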
void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state != SPDK_POLLER_STATE_PAUSED &&
	    poller->state != SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused it has to be removed from the paused pollers
	 * list and put on the active / timer list depending on its
	 * period_ticks. If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list.
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		_spdk_thread_insert_poller(thread, poller);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
}

const char *
spdk_poller_state_str(enum spdk_poller_state state)
{
	switch (state) {
	case SPDK_POLLER_STATE_WAITING:
		return "waiting";
	case SPDK_POLLER_STATE_RUNNING:
		return "running";
	case SPDK_POLLER_STATE_UNREGISTERED:
		return "unregistered";
	case SPDK_POLLER_STATE_PAUSING:
		return "pausing";
	case SPDK_POLLER_STATE_PAUSED:
		return "paused";
	default:
		return NULL;
	}
}

struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};

static void
spdk_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
	}
	assert(rc == 0);
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
	assert(rc == 0);
}

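/*
 * Illustrative sketch (hypothetical application code): broadcasting work to
 * every SPDK thread, one at a time, with a single completion delivered back
 * on the originating thread.
 *
 *	static void
 *	flush_local_cache(void *ctx)
 *	{
 *		...runs once on each thread, in turn...
 *	}
 *
 *	static void
 *	flush_done(void *ctx)
 *	{
 *		...runs on the original thread after all threads have run...
 *	}
 *
 *	spdk_for_each_thread(flush_local_cache, NULL, flush_done);
 */
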
void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}

static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev);
}

static void
_spdk_io_device_free(struct io_device *dev)
{
	int rc __attribute__((unused));

	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
		assert(rc == 0);
	}
}

void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	_spdk_io_device_free(dev);
}

const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}

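/*
 * Illustrative sketch (hypothetical application code): an io_device is any
 * unique pointer plus callbacks that build and tear down per-thread channel
 * context. `struct my_dev_channel` and `g_my_dev` are assumptions, not part
 * of this file.
 *
 *	struct my_dev_channel {
 *		uint64_t request_count;
 *	};
 *
 *	static int
 *	my_dev_create_cb(void *io_device, void *ctx_buf)
 *	{
 *		struct my_dev_channel *ch = ctx_buf;
 *
 *		ch->request_count = 0;
 *		return 0;
 *	}
 *
 *	static void
 *	my_dev_destroy_cb(void *io_device, void *ctx_buf)
 *	{
 *	}
 *
 *	spdk_io_device_register(&g_my_dev, my_dev_create_cb, my_dev_destroy_cb,
 *				sizeof(struct my_dev_channel), "my_dev");
 */
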
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if (spdk_unlikely(thread->exit)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 * thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}

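/*
 * Illustrative sketch (hypothetical application code): each thread obtains
 * its own channel for a registered io_device and accesses the per-thread
 * context that was sized via ctx_size at registration time.
 *
 *	struct spdk_io_channel *ch = spdk_get_io_channel(&g_my_dev);
 *	struct my_dev_channel *my_ch = spdk_io_channel_get_ctx(ch);
 *
 *	my_ch->request_count++;
 *	...
 *	spdk_put_io_channel(ch);
 */
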
static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		SPDK_ERRLOG("releasing from a different thread than the one that called spdk_get_io_channel()\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, _spdk_put_io_channel, ch);
		assert(rc == 0);
	}
}

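/*
 * Layout note (derived from the code above and below): a channel's per-thread
 * context is allocated immediately after the spdk_io_channel header in a
 * single calloc, so channel and context pointers convert with constant
 * pointer arithmetic:
 *
 *	ch  -> ctx:	(uint8_t *)ch + sizeof(struct spdk_io_channel)
 *	ctx -> ch:	spdk_io_channel_from_ctx(ctx)
 */
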
struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute. If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

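/*
 * Illustrative sketch (hypothetical application code) of the iteration
 * implemented below: visit every channel of an io_device, one thread at a
 * time, then run a completion on the originating thread. Each visited
 * callback must eventually call spdk_for_each_channel_continue().
 *
 *	static void
 *	reset_counter(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *		struct my_dev_channel *my_ch = spdk_io_channel_get_ctx(ch);
 *
 *		my_ch->request_count = 0;
 *		spdk_for_each_channel_continue(i, 0);
 *	}
 *
 *	static void
 *	reset_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *	}
 *
 *	spdk_for_each_channel(&g_my_dev, reset_counter, NULL, reset_done);
 */
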
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)