1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/likely.h" 38 #include "spdk/queue.h" 39 #include "spdk/string.h" 40 #include "spdk/thread.h" 41 #include "spdk/util.h" 42 43 #include "spdk_internal/log.h" 44 #include "spdk_internal/thread.h" 45 46 #define SPDK_MSG_BATCH_SIZE 8 47 #define SPDK_MAX_DEVICE_NAME_LEN 256 48 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 49 50 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 51 52 static spdk_new_thread_fn g_new_thread_fn = NULL; 53 static spdk_thread_op_fn g_thread_op_fn = NULL; 54 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 55 static size_t g_ctx_sz = 0; 56 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 57 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 58 * SPDK application is required. 59 */ 60 static uint64_t g_thread_id = 1; 61 62 struct io_device { 63 void *io_device; 64 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 65 spdk_io_channel_create_cb create_cb; 66 spdk_io_channel_destroy_cb destroy_cb; 67 spdk_io_device_unregister_cb unregister_cb; 68 struct spdk_thread *unregister_thread; 69 uint32_t ctx_size; 70 uint32_t for_each_count; 71 TAILQ_ENTRY(io_device) tailq; 72 73 uint32_t refcnt; 74 75 bool unregistered; 76 }; 77 78 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 79 80 struct spdk_msg { 81 spdk_msg_fn fn; 82 void *arg; 83 84 SLIST_ENTRY(spdk_msg) link; 85 }; 86 87 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 88 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 89 90 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 91 static uint32_t g_thread_count = 0; 92 93 static __thread struct spdk_thread *tls_thread = NULL; 94 95 static inline struct spdk_thread * 96 _get_thread(void) 97 { 98 return tls_thread; 99 } 100 101 static int 102 _thread_lib_init(size_t ctx_sz) 103 { 104 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 105 
106 g_ctx_sz = ctx_sz; 107 108 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 109 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 110 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 111 sizeof(struct spdk_msg), 112 0, /* No cache. We do our own. */ 113 SPDK_ENV_SOCKET_ID_ANY); 114 115 if (!g_spdk_msg_mempool) { 116 return -1; 117 } 118 119 return 0; 120 } 121 122 int 123 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 124 { 125 assert(g_new_thread_fn == NULL); 126 assert(g_thread_op_fn == NULL); 127 g_new_thread_fn = new_thread_fn; 128 129 return _thread_lib_init(ctx_sz); 130 } 131 132 int 133 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 134 spdk_thread_op_supported_fn thread_op_supported_fn, 135 size_t ctx_sz) 136 { 137 assert(g_new_thread_fn == NULL); 138 assert(g_thread_op_fn == NULL); 139 assert(g_thread_op_supported_fn == NULL); 140 141 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 142 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 143 return -EINVAL; 144 } 145 146 g_thread_op_fn = thread_op_fn; 147 g_thread_op_supported_fn = thread_op_supported_fn; 148 149 return _thread_lib_init(ctx_sz); 150 } 151 152 void 153 spdk_thread_lib_fini(void) 154 { 155 struct io_device *dev; 156 157 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 158 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 159 } 160 161 if (g_spdk_msg_mempool) { 162 spdk_mempool_free(g_spdk_msg_mempool); 163 g_spdk_msg_mempool = NULL; 164 } 165 166 g_new_thread_fn = NULL; 167 g_thread_op_fn = NULL; 168 g_thread_op_supported_fn = NULL; 169 g_ctx_sz = 0; 170 } 171 172 static void 173 _free_thread(struct spdk_thread *thread) 174 { 175 struct spdk_io_channel *ch; 176 struct spdk_msg *msg; 177 struct spdk_poller *poller, *ptmp; 178 179 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 180 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 181 thread->name, ch->dev->name); 182 
} 183 184 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 185 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 186 SPDK_WARNLOG("poller %s still registered at thread exit\n", 187 poller->name); 188 } 189 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 190 free(poller); 191 } 192 193 TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) { 194 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 195 SPDK_WARNLOG("poller %s still registered at thread exit\n", 196 poller->name); 197 } 198 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 199 free(poller); 200 } 201 202 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 203 SPDK_WARNLOG("poller %s still registered at thread exit\n", poller->name); 204 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 205 free(poller); 206 } 207 208 pthread_mutex_lock(&g_devlist_mutex); 209 assert(g_thread_count > 0); 210 g_thread_count--; 211 TAILQ_REMOVE(&g_threads, thread, tailq); 212 pthread_mutex_unlock(&g_devlist_mutex); 213 214 msg = SLIST_FIRST(&thread->msg_cache); 215 while (msg != NULL) { 216 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 217 218 assert(thread->msg_cache_count > 0); 219 thread->msg_cache_count--; 220 spdk_mempool_put(g_spdk_msg_mempool, msg); 221 222 msg = SLIST_FIRST(&thread->msg_cache); 223 } 224 225 assert(thread->msg_cache_count == 0); 226 227 spdk_ring_free(thread->messages); 228 free(thread); 229 } 230 231 struct spdk_thread * 232 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 233 { 234 struct spdk_thread *thread; 235 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 236 int rc = 0, i; 237 238 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 239 if (!thread) { 240 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 241 return NULL; 242 } 243 244 if (cpumask) { 245 spdk_cpuset_copy(&thread->cpumask, cpumask); 246 } else { 247 spdk_cpuset_negate(&thread->cpumask); 248 } 249 250 TAILQ_INIT(&thread->io_channels); 251 
TAILQ_INIT(&thread->active_pollers); 252 TAILQ_INIT(&thread->timed_pollers); 253 TAILQ_INIT(&thread->paused_pollers); 254 SLIST_INIT(&thread->msg_cache); 255 thread->msg_cache_count = 0; 256 257 thread->tsc_last = spdk_get_ticks(); 258 259 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 260 if (!thread->messages) { 261 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 262 free(thread); 263 return NULL; 264 } 265 266 /* Fill the local message pool cache. */ 267 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 268 if (rc == 0) { 269 /* If we can't populate the cache it's ok. The cache will get filled 270 * up organically as messages are passed to the thread. */ 271 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 272 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 273 thread->msg_cache_count++; 274 } 275 } 276 277 if (name) { 278 snprintf(thread->name, sizeof(thread->name), "%s", name); 279 } else { 280 snprintf(thread->name, sizeof(thread->name), "%p", thread); 281 } 282 283 pthread_mutex_lock(&g_devlist_mutex); 284 if (g_thread_id == 0) { 285 SPDK_ERRLOG("Thread ID rolled over. 
Further thread creation is not allowed.\n"); 286 pthread_mutex_unlock(&g_devlist_mutex); 287 _free_thread(thread); 288 return NULL; 289 } 290 thread->id = g_thread_id++; 291 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 292 g_thread_count++; 293 pthread_mutex_unlock(&g_devlist_mutex); 294 295 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread (%" PRIu64 ", %s)\n", 296 thread->id, thread->name); 297 298 if (g_new_thread_fn) { 299 rc = g_new_thread_fn(thread); 300 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 301 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 302 } 303 304 if (rc != 0) { 305 _free_thread(thread); 306 return NULL; 307 } 308 309 thread->state = SPDK_THREAD_STATE_RUNNING; 310 311 return thread; 312 } 313 314 void 315 spdk_set_thread(struct spdk_thread *thread) 316 { 317 tls_thread = thread; 318 } 319 320 static void 321 thread_exit(struct spdk_thread *thread, uint64_t now) 322 { 323 struct spdk_poller *poller; 324 struct spdk_io_channel *ch; 325 326 if (now >= thread->exit_timeout_tsc) { 327 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 328 thread->name); 329 goto exited; 330 } 331 332 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 333 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 334 SPDK_INFOLOG(SPDK_LOG_THREAD, 335 "thread %s still has active poller %s\n", 336 thread->name, poller->name); 337 return; 338 } 339 } 340 341 TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) { 342 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 343 SPDK_INFOLOG(SPDK_LOG_THREAD, 344 "thread %s still has active timed poller %s\n", 345 thread->name, poller->name); 346 return; 347 } 348 } 349 350 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 351 SPDK_INFOLOG(SPDK_LOG_THREAD, 352 "thread %s still has paused poller %s\n", 353 thread->name, poller->name); 354 return; 355 } 356 357 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 358 
SPDK_INFOLOG(SPDK_LOG_THREAD, 359 "thread %s still has channel for io_device %s\n", 360 thread->name, ch->dev->name); 361 return; 362 } 363 364 exited: 365 thread->state = SPDK_THREAD_STATE_EXITED; 366 } 367 368 int 369 spdk_thread_exit(struct spdk_thread *thread) 370 { 371 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name); 372 373 assert(tls_thread == thread); 374 375 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 376 SPDK_INFOLOG(SPDK_LOG_THREAD, 377 "thread %s is already exiting\n", 378 thread->name); 379 return 0; 380 } 381 382 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 383 SPDK_THREAD_EXIT_TIMEOUT_SEC); 384 thread->state = SPDK_THREAD_STATE_EXITING; 385 return 0; 386 } 387 388 bool 389 spdk_thread_is_exited(struct spdk_thread *thread) 390 { 391 return thread->state == SPDK_THREAD_STATE_EXITED; 392 } 393 394 void 395 spdk_thread_destroy(struct spdk_thread *thread) 396 { 397 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name); 398 399 assert(thread->state == SPDK_THREAD_STATE_EXITED); 400 401 if (tls_thread == thread) { 402 tls_thread = NULL; 403 } 404 405 _free_thread(thread); 406 } 407 408 void * 409 spdk_thread_get_ctx(struct spdk_thread *thread) 410 { 411 if (g_ctx_sz > 0) { 412 return thread->ctx; 413 } 414 415 return NULL; 416 } 417 418 struct spdk_cpuset * 419 spdk_thread_get_cpumask(struct spdk_thread *thread) 420 { 421 return &thread->cpumask; 422 } 423 424 int 425 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 426 { 427 struct spdk_thread *thread; 428 429 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 430 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 431 assert(false); 432 return -ENOTSUP; 433 } 434 435 thread = spdk_get_thread(); 436 if (!thread) { 437 SPDK_ERRLOG("Called from non-SPDK thread\n"); 438 assert(false); 439 return -EINVAL; 440 } 441 442 spdk_cpuset_copy(&thread->cpumask, cpumask); 443 444 /* Invoke framework's 
reschedule operation. If this function is called multiple times 445 * in a single spdk_thread_poll() context, the last cpumask will be used in the 446 * reschedule operation. 447 */ 448 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 449 450 return 0; 451 } 452 453 struct spdk_thread * 454 spdk_thread_get_from_ctx(void *ctx) 455 { 456 if (ctx == NULL) { 457 assert(false); 458 return NULL; 459 } 460 461 assert(g_ctx_sz > 0); 462 463 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 464 } 465 466 static inline uint32_t 467 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 468 { 469 unsigned count, i; 470 void *messages[SPDK_MSG_BATCH_SIZE]; 471 472 #ifdef DEBUG 473 /* 474 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 475 * so we will never actually read uninitialized data from events, but just to be sure 476 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 477 */ 478 memset(messages, 0, sizeof(messages)); 479 #endif 480 481 if (max_msgs > 0) { 482 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 483 } else { 484 max_msgs = SPDK_MSG_BATCH_SIZE; 485 } 486 487 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 488 if (count == 0) { 489 return 0; 490 } 491 492 for (i = 0; i < count; i++) { 493 struct spdk_msg *msg = messages[i]; 494 495 assert(msg != NULL); 496 msg->fn(msg->arg); 497 498 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 499 /* Insert the messages at the head. We want to re-use the hot 500 * ones. 
*/ 501 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 502 thread->msg_cache_count++; 503 } else { 504 spdk_mempool_put(g_spdk_msg_mempool, msg); 505 } 506 } 507 508 return count; 509 } 510 511 static void 512 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 513 { 514 struct spdk_poller *iter; 515 516 poller->next_run_tick = now + poller->period_ticks; 517 518 /* 519 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled 520 * run time. 521 */ 522 TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) { 523 if (iter->next_run_tick <= poller->next_run_tick) { 524 TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq); 525 return; 526 } 527 } 528 529 /* No earlier pollers were found, so this poller must be the new head */ 530 TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq); 531 } 532 533 static void 534 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 535 { 536 if (poller->period_ticks) { 537 poller_insert_timer(thread, poller, spdk_get_ticks()); 538 } else { 539 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 540 } 541 } 542 543 static inline void 544 thread_update_stats(struct spdk_thread *thread, uint64_t end, 545 uint64_t start, int rc) 546 { 547 if (rc == 0) { 548 /* Poller status idle */ 549 thread->stats.idle_tsc += end - start; 550 } else if (rc > 0) { 551 /* Poller status busy */ 552 thread->stats.busy_tsc += end - start; 553 } 554 /* Store end time to use it as start time of the next spdk_thread_poll(). 
*/ 555 thread->tsc_last = end; 556 } 557 558 static int 559 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 560 { 561 uint32_t msg_count; 562 struct spdk_poller *poller, *tmp; 563 spdk_msg_fn critical_msg; 564 int rc = 0; 565 566 critical_msg = thread->critical_msg; 567 if (spdk_unlikely(critical_msg != NULL)) { 568 critical_msg(NULL); 569 thread->critical_msg = NULL; 570 } 571 572 msg_count = msg_queue_run_batch(thread, max_msgs); 573 if (msg_count) { 574 rc = 1; 575 } 576 577 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 578 active_pollers_head, tailq, tmp) { 579 int poller_rc; 580 581 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 582 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 583 free(poller); 584 continue; 585 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 586 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 587 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 588 poller->state = SPDK_POLLER_STATE_PAUSED; 589 continue; 590 } 591 592 poller->state = SPDK_POLLER_STATE_RUNNING; 593 poller_rc = poller->fn(poller->arg); 594 595 poller->run_count++; 596 if (poller_rc > 0) { 597 poller->busy_count++; 598 } 599 600 #ifdef DEBUG 601 if (poller_rc == -1) { 602 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %s returned -1\n", poller->name); 603 } 604 #endif 605 606 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 607 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 608 free(poller); 609 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 610 poller->state = SPDK_POLLER_STATE_WAITING; 611 } 612 613 if (poller_rc > rc) { 614 rc = poller_rc; 615 } 616 } 617 618 TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) { 619 int timer_rc = 0; 620 621 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 622 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 623 free(poller); 624 continue; 625 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 626 
TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 627 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 628 poller->state = SPDK_POLLER_STATE_PAUSED; 629 continue; 630 } 631 632 if (now < poller->next_run_tick) { 633 break; 634 } 635 636 poller->state = SPDK_POLLER_STATE_RUNNING; 637 timer_rc = poller->fn(poller->arg); 638 639 poller->run_count++; 640 if (timer_rc > 0) { 641 poller->busy_count++; 642 } 643 644 #ifdef DEBUG 645 if (timer_rc == -1) { 646 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %s returned -1\n", poller->name); 647 } 648 #endif 649 650 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 651 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 652 free(poller); 653 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 654 poller->state = SPDK_POLLER_STATE_WAITING; 655 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 656 poller_insert_timer(thread, poller, now); 657 } 658 659 if (timer_rc > rc) { 660 rc = timer_rc; 661 } 662 } 663 664 return rc; 665 } 666 667 int 668 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 669 { 670 struct spdk_thread *orig_thread; 671 int rc; 672 673 orig_thread = _get_thread(); 674 tls_thread = thread; 675 676 if (now == 0) { 677 now = spdk_get_ticks(); 678 } 679 680 rc = thread_poll(thread, max_msgs, now); 681 682 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 683 thread_exit(thread, now); 684 } 685 686 thread_update_stats(thread, spdk_get_ticks(), now, rc); 687 688 tls_thread = orig_thread; 689 690 return rc; 691 } 692 693 uint64_t 694 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 695 { 696 struct spdk_poller *poller; 697 698 poller = TAILQ_FIRST(&thread->timed_pollers); 699 if (poller) { 700 return poller->next_run_tick; 701 } 702 703 return 0; 704 } 705 706 int 707 spdk_thread_has_active_pollers(struct spdk_thread *thread) 708 { 709 return !TAILQ_EMPTY(&thread->active_pollers); 710 } 711 712 static bool 713 
thread_has_unpaused_pollers(struct spdk_thread *thread) 714 { 715 if (TAILQ_EMPTY(&thread->active_pollers) && 716 TAILQ_EMPTY(&thread->timed_pollers)) { 717 return false; 718 } 719 720 return true; 721 } 722 723 bool 724 spdk_thread_has_pollers(struct spdk_thread *thread) 725 { 726 if (!thread_has_unpaused_pollers(thread) && 727 TAILQ_EMPTY(&thread->paused_pollers)) { 728 return false; 729 } 730 731 return true; 732 } 733 734 bool 735 spdk_thread_is_idle(struct spdk_thread *thread) 736 { 737 if (spdk_ring_count(thread->messages) || 738 thread_has_unpaused_pollers(thread) || 739 thread->critical_msg != NULL) { 740 return false; 741 } 742 743 return true; 744 } 745 746 uint32_t 747 spdk_thread_get_count(void) 748 { 749 /* 750 * Return cached value of the current thread count. We could acquire the 751 * lock and iterate through the TAILQ of threads to count them, but that 752 * count could still be invalidated after we release the lock. 753 */ 754 return g_thread_count; 755 } 756 757 struct spdk_thread * 758 spdk_get_thread(void) 759 { 760 return _get_thread(); 761 } 762 763 const char * 764 spdk_thread_get_name(const struct spdk_thread *thread) 765 { 766 return thread->name; 767 } 768 769 uint64_t 770 spdk_thread_get_id(const struct spdk_thread *thread) 771 { 772 return thread->id; 773 } 774 775 struct spdk_thread * 776 spdk_thread_get_by_id(uint64_t id) 777 { 778 struct spdk_thread *thread; 779 780 pthread_mutex_lock(&g_devlist_mutex); 781 TAILQ_FOREACH(thread, &g_threads, tailq) { 782 if (thread->id == id) { 783 pthread_mutex_unlock(&g_devlist_mutex); 784 785 return thread; 786 } 787 } 788 pthread_mutex_unlock(&g_devlist_mutex); 789 790 return NULL; 791 } 792 793 int 794 spdk_thread_get_stats(struct spdk_thread_stats *stats) 795 { 796 struct spdk_thread *thread; 797 798 thread = _get_thread(); 799 if (!thread) { 800 SPDK_ERRLOG("No thread allocated\n"); 801 return -EINVAL; 802 } 803 804 if (stats == NULL) { 805 return -EINVAL; 806 } 807 808 *stats = thread->stats; 
809 810 return 0; 811 } 812 813 uint64_t 814 spdk_thread_get_last_tsc(struct spdk_thread *thread) 815 { 816 return thread->tsc_last; 817 } 818 819 int 820 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 821 { 822 struct spdk_thread *local_thread; 823 struct spdk_msg *msg; 824 int rc; 825 826 assert(thread != NULL); 827 828 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 829 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 830 return -EIO; 831 } 832 833 local_thread = _get_thread(); 834 835 msg = NULL; 836 if (local_thread != NULL) { 837 if (local_thread->msg_cache_count > 0) { 838 msg = SLIST_FIRST(&local_thread->msg_cache); 839 assert(msg != NULL); 840 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 841 local_thread->msg_cache_count--; 842 } 843 } 844 845 if (msg == NULL) { 846 msg = spdk_mempool_get(g_spdk_msg_mempool); 847 if (!msg) { 848 SPDK_ERRLOG("msg could not be allocated\n"); 849 return -ENOMEM; 850 } 851 } 852 853 msg->fn = fn; 854 msg->arg = ctx; 855 856 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 857 if (rc != 1) { 858 SPDK_ERRLOG("msg could not be enqueued\n"); 859 spdk_mempool_put(g_spdk_msg_mempool, msg); 860 return -EIO; 861 } 862 863 return 0; 864 } 865 866 int 867 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 868 { 869 spdk_msg_fn expected = NULL; 870 871 if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 872 __ATOMIC_SEQ_CST)) { 873 return 0; 874 } 875 876 return -EIO; 877 } 878 879 static struct spdk_poller * 880 poller_register(spdk_poller_fn fn, 881 void *arg, 882 uint64_t period_microseconds, 883 const char *name) 884 { 885 struct spdk_thread *thread; 886 struct spdk_poller *poller; 887 uint64_t quotient, remainder, ticks; 888 889 thread = spdk_get_thread(); 890 if (!thread) { 891 assert(false); 892 return NULL; 893 } 894 895 if (spdk_unlikely(thread->state == 
SPDK_THREAD_STATE_EXITED)) { 896 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 897 return NULL; 898 } 899 900 poller = calloc(1, sizeof(*poller)); 901 if (poller == NULL) { 902 SPDK_ERRLOG("Poller memory allocation failed\n"); 903 return NULL; 904 } 905 906 if (name) { 907 snprintf(poller->name, sizeof(poller->name), "%s", name); 908 } else { 909 snprintf(poller->name, sizeof(poller->name), "%p", fn); 910 } 911 912 poller->state = SPDK_POLLER_STATE_WAITING; 913 poller->fn = fn; 914 poller->arg = arg; 915 poller->thread = thread; 916 917 if (period_microseconds) { 918 quotient = period_microseconds / SPDK_SEC_TO_USEC; 919 remainder = period_microseconds % SPDK_SEC_TO_USEC; 920 ticks = spdk_get_ticks_hz(); 921 922 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 923 } else { 924 poller->period_ticks = 0; 925 } 926 927 thread_insert_poller(thread, poller); 928 929 return poller; 930 } 931 932 struct spdk_poller * 933 spdk_poller_register(spdk_poller_fn fn, 934 void *arg, 935 uint64_t period_microseconds) 936 { 937 return poller_register(fn, arg, period_microseconds, NULL); 938 } 939 940 struct spdk_poller * 941 spdk_poller_register_named(spdk_poller_fn fn, 942 void *arg, 943 uint64_t period_microseconds, 944 const char *name) 945 { 946 return poller_register(fn, arg, period_microseconds, name); 947 } 948 949 void 950 spdk_poller_unregister(struct spdk_poller **ppoller) 951 { 952 struct spdk_thread *thread; 953 struct spdk_poller *poller; 954 955 poller = *ppoller; 956 if (poller == NULL) { 957 return; 958 } 959 960 *ppoller = NULL; 961 962 thread = spdk_get_thread(); 963 if (!thread) { 964 assert(false); 965 return; 966 } 967 968 if (poller->thread != thread) { 969 SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n"); 970 assert(false); 971 return; 972 } 973 974 /* If the poller was paused, put it on the active_pollers list so that 975 * its unregistration can be processed by spdk_thread_poll(). 
976 */ 977 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 978 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 979 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 980 poller->period_ticks = 0; 981 } 982 983 /* Simply set the state to unregistered. The poller will get cleaned up 984 * in a subsequent call to spdk_thread_poll(). 985 */ 986 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 987 } 988 989 void 990 spdk_poller_pause(struct spdk_poller *poller) 991 { 992 struct spdk_thread *thread; 993 994 if (poller->state == SPDK_POLLER_STATE_PAUSED || 995 poller->state == SPDK_POLLER_STATE_PAUSING) { 996 return; 997 } 998 999 thread = spdk_get_thread(); 1000 if (!thread) { 1001 assert(false); 1002 return; 1003 } 1004 1005 /* If a poller is paused from within itself, we can immediately move it 1006 * on the paused_pollers list. Otherwise we just set its state to 1007 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it. It 1008 * allows a poller to be paused from another one's context without 1009 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration. 
1010 */ 1011 if (poller->state != SPDK_POLLER_STATE_RUNNING) { 1012 poller->state = SPDK_POLLER_STATE_PAUSING; 1013 } else { 1014 if (poller->period_ticks > 0) { 1015 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 1016 } else { 1017 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1018 } 1019 1020 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1021 poller->state = SPDK_POLLER_STATE_PAUSED; 1022 } 1023 } 1024 1025 void 1026 spdk_poller_resume(struct spdk_poller *poller) 1027 { 1028 struct spdk_thread *thread; 1029 1030 if (poller->state != SPDK_POLLER_STATE_PAUSED && 1031 poller->state != SPDK_POLLER_STATE_PAUSING) { 1032 return; 1033 } 1034 1035 thread = spdk_get_thread(); 1036 if (!thread) { 1037 assert(false); 1038 return; 1039 } 1040 1041 /* If a poller is paused it has to be removed from the paused pollers 1042 * list and put on the active / timer list depending on its 1043 * period_ticks. If a poller is still in the process of being paused, 1044 * we just need to flip its state back to waiting, as it's already on 1045 * the appropriate list. 
 */
	/* NOTE(review): this is the tail of a function whose header lies above
	 * this chunk.  A paused poller is re-inserted into the active poller
	 * list before being marked WAITING — presumably the resume path;
	 * confirm against the function signature above.
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		thread_insert_poller(thread, poller);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
}

/* Map a poller state to a human-readable string, or NULL for unknown values. */
const char *
spdk_poller_state_str(enum spdk_poller_state state)
{
	switch (state) {
	case SPDK_POLLER_STATE_WAITING:
		return "waiting";
	case SPDK_POLLER_STATE_RUNNING:
		return "running";
	case SPDK_POLLER_STATE_UNREGISTERED:
		return "unregistered";
	case SPDK_POLLER_STATE_PAUSING:
		return "pausing";
	case SPDK_POLLER_STATE_PAUSED:
		return "paused";
	default:
		return NULL;
	}
}

/* Context handed from thread to thread by _on_thread() while iterating
 * g_threads for spdk_for_each_thread().
 */
struct call_thread {
	struct spdk_thread *cur_thread;	/* thread currently executing fn */
	spdk_msg_fn fn;			/* per-thread callback */
	void *ctx;			/* user context passed to fn and cpl */

	struct spdk_thread *orig_thread;	/* thread that started the iteration */
	spdk_msg_fn cpl;			/* completion, run on orig_thread */
};

/* Message handler that runs ct->fn on the current thread, then either hops
 * to the next thread in g_threads or, when the list is exhausted, sends the
 * completion back to the originating thread and frees the context.
 */
static void
_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	/* Advance under g_devlist_mutex, which also guards g_threads.
	 * NOTE(review): cur_thread's tailq link is read here but the thread
	 * was located during an earlier hop — verify threads cannot leave
	 * g_threads while an iteration is in flight.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	assert(rc == 0);
}

/* Run fn(ctx) once on every SPDK thread, then cpl(ctx) on the calling
 * thread.  On allocation failure or when called from a non-SPDK thread,
 * cpl(ctx) is invoked immediately and no iteration takes place.
 */
void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	/* The calling thread is itself on g_threads, so cur_thread is non-NULL
	 * here and the iteration always includes at least one hop.
	 */
	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}

/* Register io_device in the global g_io_devices list so threads can create
 * I/O channels for it via spdk_get_io_channel().  ctx_size is the per-channel
 * context allocated alongside each spdk_io_channel.  If name is NULL, the
 * device is named after its descriptor's address.  Must be called from an
 * SPDK thread; duplicate registrations are rejected with an error log.
 */
void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		/* Truncated to SPDK_MAX_DEVICE_NAME_LEN; snprintf guarantees
		 * NUL termination.
		 */
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	/* Duplicate check and insertion are done atomically under the
	 * devlist mutex.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}

/* Message handler: invoke the user's unregister callback on the thread that
 * called spdk_io_device_unregister(), then release the descriptor.
 */
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev);
}

/* Final teardown of an io_device descriptor.  Without an unregister callback
 * the descriptor is freed in place; otherwise completion is bounced to the
 * thread that requested the unregister so the callback runs there.
 */
static void
io_device_free(struct io_device *dev)
{
	int rc __attribute__((unused));

	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
		assert(rc == 0);
	}
}

/* Unregister io_device.  Fails (with error log) if spdk_for_each_channel()
 * iterations are still outstanding.  The device is removed from g_io_devices
 * immediately; if channels still hold references (refcnt > 0), actual freeing
 * is deferred until the last channel is released in put_io_channel().
 * unregister_cb, if non-NULL, eventually runs on the calling thread.
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	/* Snapshot refcnt under the lock; new references are impossible now
	 * that the device is off g_io_devices.
	 */
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}

/* Accessor: name assigned at registration (or "%p" of the descriptor). */
const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}

/* Get (or lazily create) the calling thread's I/O channel for io_device.
 * If the thread already holds a channel for the device, its refcount is
 * bumped and it is returned; otherwise a new channel plus ctx_size bytes of
 * per-channel context is allocated and dev->create_cb is invoked on the
 * context (outside the devlist mutex).  Returns NULL if the device is not
 * registered, the caller is not an SPDK thread, the thread has exited,
 * allocation fails, or create_cb returns non-zero.
 */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 * thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* Channel header and the device's per-channel context are one
	 * allocation; the context starts right after the header (see
	 * spdk_io_channel_from_ctx()).
	 */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	/* create_cb is intentionally called without the devlist mutex held. */
	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		/* NOTE(review): unlike put_io_channel(), this failure path
		 * never calls io_device_free() even if the device was
		 * unregistered while create_cb ran and refcnt just hit 0 —
		 * verify that window cannot leak the descriptor.
		 */
		return NULL;
	}

	return ch;
}

/* Message handler scheduled by spdk_put_io_channel() when a channel's last
 * reference is dropped.  Re-checks the refcounts (a new reference may have
 * been taken before this message ran), then removes the channel, invokes
 * destroy_cb outside the devlist mutex, and frees the channel.  If this was
 * the last channel of an already-unregistered device, the deferred device
 * teardown from spdk_io_device_unregister() happens here too.
 */
static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

/* Drop one reference on ch.  Must be called on the same thread that obtained
 * the channel via spdk_get_io_channel().  Actual destruction is deferred to a
 * message (put_io_channel) on this thread so destroy_cb never runs inside the
 * caller's stack frame.
 */
void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		SPDK_ERRLOG("different from the thread that called get_io_channel()\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		/* destroy_ref keeps put_io_channel() honest if the channel is
		 * re-acquired before the message executes.
		 */
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}

/* Inverse of spdk_io_channel_get_ctx(): the per-channel context sits directly
 * after the spdk_io_channel header in the same allocation.
 */
struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

/* Accessor: thread that owns (created) this channel. */
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

/* Iterator state for spdk_for_each_channel(); one heap allocation that hops
 * from thread to thread via _call_channel and is freed in _call_completion.
 */
struct spdk_io_channel_iter {
	void *io_device;		/* key being iterated */
	struct io_device *dev;		/* resolved descriptor (for_each_count holder) */
	spdk_channel_msg fn;		/* per-channel callback */
	int status;			/* first non-zero status stops the walk */
	void *ctx;			/* user context */
	struct spdk_io_channel *ch;	/* channel for the current hop */

	struct spdk_thread *cur_thread;	/* thread for the current hop */

	struct spdk_thread *orig_thread;	/* thread that started the iteration */
	spdk_channel_for_each_cpl cpl;		/* completion, run on orig_thread */
};

/* Accessor: io_device key the iteration was started with. */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

/* Accessor: channel being visited on the current hop. */
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

/* Accessor: user context supplied to spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

/* Message handler run on the originating thread: deliver the final status to
 * the completion callback (if any) and release the iterator.
 */
static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

/* Message handler run on each visited thread.  Re-looks-up the channel in
 * case it was destroyed in the window between scheduling and execution.
 */
static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute.  If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

/* Invoke fn(iter) on every thread that holds a channel for io_device; each
 * fn must call spdk_for_each_channel_continue() to advance.  cpl runs on the
 * calling thread when all channels were visited (or a hop reported non-zero
 * status).  Bumps dev->for_each_count so the device cannot be unregistered
 * mid-iteration.
 * NOTE(review): on iterator allocation failure cpl is never invoked — the
 * caller only learns of it via the error log, unlike spdk_for_each_thread()
 * which calls cpl(ctx) on failure.  Confirm callers tolerate this.
 */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread owning a channel for this device and start
	 * the hop chain there; the mutex is dropped before sending.
	 */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist for this device: complete immediately with the
	 * zero-initialized status.
	 */
	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

/* Advance an iteration started by spdk_for_each_channel().  Must be called
 * from the thread currently being visited.  A non-zero status aborts the walk
 * and is reported to cpl; otherwise the next thread holding a channel for the
 * device is visited, and when none remain the completion is sent to the
 * originating thread and the device's for_each_count is released.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		/* Non-zero status short-circuits straight to completion. */
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}


SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)