1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/likely.h" 38 #include "spdk/queue.h" 39 #include "spdk/string.h" 40 #include "spdk/thread.h" 41 #include "spdk/util.h" 42 43 #include "spdk/log.h" 44 #include "spdk_internal/thread.h" 45 46 #define SPDK_MSG_BATCH_SIZE 8 47 #define SPDK_MAX_DEVICE_NAME_LEN 256 48 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 49 50 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 51 52 static spdk_new_thread_fn g_new_thread_fn = NULL; 53 static spdk_thread_op_fn g_thread_op_fn = NULL; 54 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 55 static size_t g_ctx_sz = 0; 56 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 57 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 58 * SPDK application is required. 59 */ 60 static uint64_t g_thread_id = 1; 61 62 struct io_device { 63 void *io_device; 64 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 65 spdk_io_channel_create_cb create_cb; 66 spdk_io_channel_destroy_cb destroy_cb; 67 spdk_io_device_unregister_cb unregister_cb; 68 struct spdk_thread *unregister_thread; 69 uint32_t ctx_size; 70 uint32_t for_each_count; 71 TAILQ_ENTRY(io_device) tailq; 72 73 uint32_t refcnt; 74 75 bool unregistered; 76 }; 77 78 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 79 80 struct spdk_msg { 81 spdk_msg_fn fn; 82 void *arg; 83 84 SLIST_ENTRY(spdk_msg) link; 85 }; 86 87 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 88 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 89 90 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 91 static uint32_t g_thread_count = 0; 92 93 static __thread struct spdk_thread *tls_thread = NULL; 94 95 static inline struct spdk_thread * 96 _get_thread(void) 97 { 98 return tls_thread; 99 } 100 101 static int 102 _thread_lib_init(size_t ctx_sz) 103 { 104 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 105 106 g_ctx_sz = ctx_sz; 107 108 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 109 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 110 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 111 sizeof(struct spdk_msg), 112 0, /* No cache. We do our own. */ 113 SPDK_ENV_SOCKET_ID_ANY); 114 115 if (!g_spdk_msg_mempool) { 116 return -1; 117 } 118 119 return 0; 120 } 121 122 int 123 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 124 { 125 assert(g_new_thread_fn == NULL); 126 assert(g_thread_op_fn == NULL); 127 128 if (new_thread_fn == NULL) { 129 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 130 } else { 131 g_new_thread_fn = new_thread_fn; 132 } 133 134 return _thread_lib_init(ctx_sz); 135 } 136 137 int 138 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 139 spdk_thread_op_supported_fn thread_op_supported_fn, 140 size_t ctx_sz) 141 { 142 assert(g_new_thread_fn == NULL); 143 assert(g_thread_op_fn == NULL); 144 assert(g_thread_op_supported_fn == NULL); 145 146 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 147 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 148 return -EINVAL; 149 } 150 151 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 152 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 153 } else { 154 g_thread_op_fn = thread_op_fn; 155 g_thread_op_supported_fn = thread_op_supported_fn; 156 } 157 158 return _thread_lib_init(ctx_sz); 159 } 160 161 void 162 spdk_thread_lib_fini(void) 163 { 164 struct io_device *dev; 165 166 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 167 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 168 } 169 170 if (g_spdk_msg_mempool) { 171 spdk_mempool_free(g_spdk_msg_mempool); 172 g_spdk_msg_mempool = NULL; 173 } 174 175 g_new_thread_fn = NULL; 176 g_thread_op_fn = NULL; 177 g_thread_op_supported_fn = NULL; 178 g_ctx_sz = 0; 179 } 180 181 static void 182 _free_thread(struct spdk_thread *thread) 183 { 184 struct spdk_io_channel *ch; 185 struct spdk_msg *msg; 186 struct spdk_poller *poller, *ptmp; 187 188 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 189 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 190 thread->name, ch->dev->name); 191 } 192 193 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 194 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 195 SPDK_WARNLOG("poller %s still registered at thread exit\n", 196 poller->name); 197 } 198 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 199 free(poller); 200 } 201 202 TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) { 203 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 204 SPDK_WARNLOG("poller %s still registered at thread exit\n", 205 poller->name); 206 } 207 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 208 free(poller); 209 } 210 211 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 212 SPDK_WARNLOG("poller %s still registered at thread exit\n", poller->name); 213 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 214 free(poller); 215 } 216 217 pthread_mutex_lock(&g_devlist_mutex); 218 assert(g_thread_count > 0); 219 g_thread_count--; 220 TAILQ_REMOVE(&g_threads, thread, tailq); 221 pthread_mutex_unlock(&g_devlist_mutex); 222 223 msg = SLIST_FIRST(&thread->msg_cache); 224 while (msg != NULL) { 225 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 226 227 assert(thread->msg_cache_count > 0); 228 thread->msg_cache_count--; 229 spdk_mempool_put(g_spdk_msg_mempool, msg); 230 231 msg = SLIST_FIRST(&thread->msg_cache); 232 } 233 234 assert(thread->msg_cache_count == 0); 235 236 spdk_ring_free(thread->messages); 237 free(thread); 238 } 239 240 struct spdk_thread * 241 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 242 { 243 struct spdk_thread *thread; 244 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 245 int rc = 0, i; 246 247 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 248 if (!thread) { 249 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 250 return NULL; 251 } 252 253 if (cpumask) { 254 spdk_cpuset_copy(&thread->cpumask, cpumask); 255 } else { 256 spdk_cpuset_negate(&thread->cpumask); 257 } 258 259 TAILQ_INIT(&thread->io_channels); 260 TAILQ_INIT(&thread->active_pollers); 261 TAILQ_INIT(&thread->timed_pollers); 262 TAILQ_INIT(&thread->paused_pollers); 263 SLIST_INIT(&thread->msg_cache); 264 thread->msg_cache_count = 0; 265 266 thread->tsc_last = spdk_get_ticks(); 267 268 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 269 if (!thread->messages) { 270 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 271 free(thread); 272 return NULL; 273 } 274 275 /* Fill the local message pool cache. */ 276 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 277 if (rc == 0) { 278 /* If we can't populate the cache it's ok. The cache will get filled 279 * up organically as messages are passed to the thread. */ 280 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 281 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 282 thread->msg_cache_count++; 283 } 284 } 285 286 if (name) { 287 snprintf(thread->name, sizeof(thread->name), "%s", name); 288 } else { 289 snprintf(thread->name, sizeof(thread->name), "%p", thread); 290 } 291 292 pthread_mutex_lock(&g_devlist_mutex); 293 if (g_thread_id == 0) { 294 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 295 pthread_mutex_unlock(&g_devlist_mutex); 296 _free_thread(thread); 297 return NULL; 298 } 299 thread->id = g_thread_id++; 300 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 301 g_thread_count++; 302 pthread_mutex_unlock(&g_devlist_mutex); 303 304 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 305 thread->id, thread->name); 306 307 if (g_new_thread_fn) { 308 rc = g_new_thread_fn(thread); 309 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 310 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 311 } 312 313 if (rc != 0) { 314 _free_thread(thread); 315 return NULL; 316 } 317 318 thread->state = SPDK_THREAD_STATE_RUNNING; 319 320 return thread; 321 } 322 323 void 324 spdk_set_thread(struct spdk_thread *thread) 325 { 326 tls_thread = thread; 327 } 328 329 static void 330 thread_exit(struct spdk_thread *thread, uint64_t now) 331 { 332 struct spdk_poller *poller; 333 struct spdk_io_channel *ch; 334 335 if (now >= thread->exit_timeout_tsc) { 336 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 337 thread->name); 338 goto exited; 339 } 340 341 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 342 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 343 SPDK_INFOLOG(thread, 344 "thread %s still has active poller %s\n", 345 thread->name, poller->name); 346 return; 347 } 348 } 349 350 TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) { 351 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 352 SPDK_INFOLOG(thread, 353 "thread %s still has active timed poller %s\n", 354 thread->name, poller->name); 355 return; 356 } 357 } 358 359 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 360 SPDK_INFOLOG(thread, 361 "thread %s still has paused poller %s\n", 362 thread->name, poller->name); 363 return; 364 } 365 366 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 367 SPDK_INFOLOG(thread, 368 "thread %s still has channel for io_device %s\n", 369 thread->name, ch->dev->name); 370 return; 371 } 372 373 exited: 374 thread->state = SPDK_THREAD_STATE_EXITED; 375 } 376 377 int 378 spdk_thread_exit(struct spdk_thread *thread) 379 { 380 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 381 382 assert(tls_thread == thread); 383 384 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 385 SPDK_INFOLOG(thread, 386 "thread %s is already exiting\n", 387 thread->name); 388 return 0; 389 } 390 391 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 392 SPDK_THREAD_EXIT_TIMEOUT_SEC); 393 thread->state = SPDK_THREAD_STATE_EXITING; 394 return 0; 395 } 396 397 bool 398 spdk_thread_is_exited(struct spdk_thread *thread) 399 { 400 return thread->state == SPDK_THREAD_STATE_EXITED; 401 } 402 403 void 404 spdk_thread_destroy(struct spdk_thread *thread) 405 { 406 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 407 408 assert(thread->state == SPDK_THREAD_STATE_EXITED); 409 410 if (tls_thread == thread) { 411 tls_thread = NULL; 412 } 413 414 _free_thread(thread); 415 } 416 417 void * 418 spdk_thread_get_ctx(struct spdk_thread *thread) 419 { 420 if (g_ctx_sz > 0) { 421 return thread->ctx; 422 } 423 424 return NULL; 425 } 426 427 struct spdk_cpuset * 428 spdk_thread_get_cpumask(struct spdk_thread *thread) 429 { 430 return &thread->cpumask; 431 } 432 433 int 434 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 435 { 436 struct spdk_thread *thread; 437 438 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 439 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 440 assert(false); 441 return -ENOTSUP; 442 } 443 444 thread = spdk_get_thread(); 445 if (!thread) { 446 SPDK_ERRLOG("Called from non-SPDK thread\n"); 447 assert(false); 448 return -EINVAL; 449 } 450 451 spdk_cpuset_copy(&thread->cpumask, cpumask); 452 453 /* Invoke framework's reschedule operation. If this function is called multiple times 454 * in a single spdk_thread_poll() context, the last cpumask will be used in the 455 * reschedule operation. 456 */ 457 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 458 459 return 0; 460 } 461 462 struct spdk_thread * 463 spdk_thread_get_from_ctx(void *ctx) 464 { 465 if (ctx == NULL) { 466 assert(false); 467 return NULL; 468 } 469 470 assert(g_ctx_sz > 0); 471 472 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 473 } 474 475 static inline uint32_t 476 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 477 { 478 unsigned count, i; 479 void *messages[SPDK_MSG_BATCH_SIZE]; 480 481 #ifdef DEBUG 482 /* 483 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 484 * so we will never actually read uninitialized data from events, but just to be sure 485 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 486 */ 487 memset(messages, 0, sizeof(messages)); 488 #endif 489 490 if (max_msgs > 0) { 491 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 492 } else { 493 max_msgs = SPDK_MSG_BATCH_SIZE; 494 } 495 496 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 497 if (count == 0) { 498 return 0; 499 } 500 501 for (i = 0; i < count; i++) { 502 struct spdk_msg *msg = messages[i]; 503 504 assert(msg != NULL); 505 msg->fn(msg->arg); 506 507 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 508 /* Insert the messages at the head. We want to re-use the hot 509 * ones. */ 510 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 511 thread->msg_cache_count++; 512 } else { 513 spdk_mempool_put(g_spdk_msg_mempool, msg); 514 } 515 } 516 517 return count; 518 } 519 520 static void 521 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 522 { 523 struct spdk_poller *iter; 524 525 poller->next_run_tick = now + poller->period_ticks; 526 527 /* 528 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled 529 * run time. 530 */ 531 TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) { 532 if (iter->next_run_tick <= poller->next_run_tick) { 533 TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq); 534 return; 535 } 536 } 537 538 /* No earlier pollers were found, so this poller must be the new head */ 539 TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq); 540 } 541 542 static void 543 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 544 { 545 if (poller->period_ticks) { 546 poller_insert_timer(thread, poller, spdk_get_ticks()); 547 } else { 548 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 549 } 550 } 551 552 static inline void 553 thread_update_stats(struct spdk_thread *thread, uint64_t end, 554 uint64_t start, int rc) 555 { 556 if (rc == 0) { 557 /* Poller status idle */ 558 thread->stats.idle_tsc += end - start; 559 } else if (rc > 0) { 560 /* Poller status busy */ 561 thread->stats.busy_tsc += end - start; 562 } 563 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 564 thread->tsc_last = end; 565 } 566 567 static int 568 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 569 { 570 uint32_t msg_count; 571 struct spdk_poller *poller, *tmp; 572 spdk_msg_fn critical_msg; 573 int rc = 0; 574 575 thread->tsc_last = now; 576 577 critical_msg = thread->critical_msg; 578 if (spdk_unlikely(critical_msg != NULL)) { 579 critical_msg(NULL); 580 thread->critical_msg = NULL; 581 } 582 583 msg_count = msg_queue_run_batch(thread, max_msgs); 584 if (msg_count) { 585 rc = 1; 586 } 587 588 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 589 active_pollers_head, tailq, tmp) { 590 int poller_rc; 591 592 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 593 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 594 free(poller); 595 continue; 596 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 597 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 598 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 599 poller->state = SPDK_POLLER_STATE_PAUSED; 600 continue; 601 } 602 603 poller->state = SPDK_POLLER_STATE_RUNNING; 604 poller_rc = poller->fn(poller->arg); 605 606 poller->run_count++; 607 if (poller_rc > 0) { 608 poller->busy_count++; 609 } 610 611 #ifdef DEBUG 612 if (poller_rc == -1) { 613 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 614 } 615 #endif 616 617 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 618 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 619 free(poller); 620 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 621 poller->state = SPDK_POLLER_STATE_WAITING; 622 } 623 624 if (poller_rc > rc) { 625 rc = poller_rc; 626 } 627 } 628 629 TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) { 630 int timer_rc = 0; 631 632 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 633 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 634 free(poller); 635 continue; 636 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 637 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 638 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 639 poller->state = SPDK_POLLER_STATE_PAUSED; 640 continue; 641 } 642 643 if (now < poller->next_run_tick) { 644 break; 645 } 646 647 poller->state = SPDK_POLLER_STATE_RUNNING; 648 timer_rc = poller->fn(poller->arg); 649 650 poller->run_count++; 651 if (timer_rc > 0) { 652 poller->busy_count++; 653 } 654 655 #ifdef DEBUG 656 if (timer_rc == -1) { 657 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 658 } 659 #endif 660 661 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 662 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 663 free(poller); 664 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 665 poller->state = SPDK_POLLER_STATE_WAITING; 666 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 667 poller_insert_timer(thread, poller, now); 668 } 669 670 if (timer_rc > rc) { 671 rc = timer_rc; 672 } 673 } 674 675 return rc; 676 } 677 678 int 679 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 680 { 681 struct spdk_thread *orig_thread; 682 int rc; 683 684 orig_thread = _get_thread(); 685 tls_thread = thread; 686 687 if (now == 0) { 688 now = spdk_get_ticks(); 689 } 690 691 rc = thread_poll(thread, max_msgs, now); 692 693 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 694 thread_exit(thread, now); 695 } 696 697 thread_update_stats(thread, spdk_get_ticks(), now, rc); 698 699 tls_thread = orig_thread; 700 701 return rc; 702 } 703 704 uint64_t 705 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 706 { 707 struct spdk_poller *poller; 708 709 poller = TAILQ_FIRST(&thread->timed_pollers); 710 if (poller) { 711 return poller->next_run_tick; 712 } 713 714 return 0; 715 } 716 717 int 718 spdk_thread_has_active_pollers(struct spdk_thread *thread) 719 { 720 return !TAILQ_EMPTY(&thread->active_pollers); 721 } 722 723 static bool 724 thread_has_unpaused_pollers(struct spdk_thread *thread) 725 { 726 if (TAILQ_EMPTY(&thread->active_pollers) && 727 TAILQ_EMPTY(&thread->timed_pollers)) { 728 return false; 729 } 730 731 return true; 732 } 733 734 bool 735 spdk_thread_has_pollers(struct spdk_thread *thread) 736 { 737 if (!thread_has_unpaused_pollers(thread) && 738 TAILQ_EMPTY(&thread->paused_pollers)) { 739 return false; 740 } 741 742 return true; 743 } 744 745 bool 746 spdk_thread_is_idle(struct spdk_thread *thread) 747 { 748 if (spdk_ring_count(thread->messages) || 749 thread_has_unpaused_pollers(thread) || 750 thread->critical_msg != NULL) { 751 return false; 752 } 753 754 return true; 755 } 756 757 uint32_t 758 spdk_thread_get_count(void) 759 { 760 /* 761 * Return cached value of the current thread count. We could acquire the 762 * lock and iterate through the TAILQ of threads to count them, but that 763 * count could still be invalidated after we release the lock. 764 */ 765 return g_thread_count; 766 } 767 768 struct spdk_thread * 769 spdk_get_thread(void) 770 { 771 return _get_thread(); 772 } 773 774 const char * 775 spdk_thread_get_name(const struct spdk_thread *thread) 776 { 777 return thread->name; 778 } 779 780 uint64_t 781 spdk_thread_get_id(const struct spdk_thread *thread) 782 { 783 return thread->id; 784 } 785 786 struct spdk_thread * 787 spdk_thread_get_by_id(uint64_t id) 788 { 789 struct spdk_thread *thread; 790 791 pthread_mutex_lock(&g_devlist_mutex); 792 TAILQ_FOREACH(thread, &g_threads, tailq) { 793 if (thread->id == id) { 794 pthread_mutex_unlock(&g_devlist_mutex); 795 796 return thread; 797 } 798 } 799 pthread_mutex_unlock(&g_devlist_mutex); 800 801 return NULL; 802 } 803 804 int 805 spdk_thread_get_stats(struct spdk_thread_stats *stats) 806 { 807 struct spdk_thread *thread; 808 809 thread = _get_thread(); 810 if (!thread) { 811 SPDK_ERRLOG("No thread allocated\n"); 812 return -EINVAL; 813 } 814 815 if (stats == NULL) { 816 return -EINVAL; 817 } 818 819 *stats = thread->stats; 820 821 return 0; 822 } 823 824 uint64_t 825 spdk_thread_get_last_tsc(struct spdk_thread *thread) 826 { 827 if (thread == NULL) { 828 thread = _get_thread(); 829 } 830 831 return thread->tsc_last; 832 } 833 834 int 835 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 836 { 837 struct spdk_thread *local_thread; 838 struct spdk_msg *msg; 839 int rc; 840 841 assert(thread != NULL); 842 843 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 844 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 845 return -EIO; 846 } 847 848 local_thread = _get_thread(); 849 850 msg = NULL; 851 if (local_thread != NULL) { 852 if (local_thread->msg_cache_count > 0) { 853 msg = SLIST_FIRST(&local_thread->msg_cache); 854 assert(msg != NULL); 855 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 856 local_thread->msg_cache_count--; 857 } 858 } 859 860 if (msg == NULL) { 861 msg = spdk_mempool_get(g_spdk_msg_mempool); 862 if (!msg) { 863 SPDK_ERRLOG("msg could not be allocated\n"); 864 return -ENOMEM; 865 } 866 } 867 868 msg->fn = fn; 869 msg->arg = ctx; 870 871 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 872 if (rc != 1) { 873 SPDK_ERRLOG("msg could not be enqueued\n"); 874 spdk_mempool_put(g_spdk_msg_mempool, msg); 875 return -EIO; 876 } 877 878 return 0; 879 } 880 881 int 882 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 883 { 884 spdk_msg_fn expected = NULL; 885 886 if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 887 __ATOMIC_SEQ_CST)) { 888 return 0; 889 } 890 891 return -EIO; 892 } 893 894 static struct spdk_poller * 895 poller_register(spdk_poller_fn fn, 896 void *arg, 897 uint64_t period_microseconds, 898 const char *name) 899 { 900 struct spdk_thread *thread; 901 struct spdk_poller *poller; 902 uint64_t quotient, remainder, ticks; 903 904 thread = spdk_get_thread(); 905 if (!thread) { 906 assert(false); 907 return NULL; 908 } 909 910 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 911 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 912 return NULL; 913 } 914 915 poller = calloc(1, sizeof(*poller)); 916 if (poller == NULL) { 917 SPDK_ERRLOG("Poller memory allocation failed\n"); 918 return NULL; 919 } 920 921 if (name) { 922 snprintf(poller->name, sizeof(poller->name), "%s", name); 923 } else { 924 snprintf(poller->name, sizeof(poller->name), "%p", fn); 925 } 926 927 poller->state = SPDK_POLLER_STATE_WAITING; 928 poller->fn = fn; 929 poller->arg = arg; 930 poller->thread = thread; 931 932 if (period_microseconds) { 933 quotient = period_microseconds / SPDK_SEC_TO_USEC; 934 remainder = period_microseconds % SPDK_SEC_TO_USEC; 935 ticks = spdk_get_ticks_hz(); 936 937 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 938 } else { 939 poller->period_ticks = 0; 940 } 941 942 thread_insert_poller(thread, poller); 943 944 return poller; 945 } 946 947 struct spdk_poller * 948 spdk_poller_register(spdk_poller_fn fn, 949 void *arg, 950 uint64_t period_microseconds) 951 { 952 return poller_register(fn, arg, period_microseconds, NULL); 953 } 954 955 struct spdk_poller * 956 spdk_poller_register_named(spdk_poller_fn fn, 957 void *arg, 958 uint64_t period_microseconds, 959 const char *name) 960 { 961 return poller_register(fn, arg, period_microseconds, name); 962 } 963 964 void 965 spdk_poller_unregister(struct spdk_poller **ppoller) 966 { 967 struct spdk_thread *thread; 968 struct spdk_poller *poller; 969 970 poller = *ppoller; 971 if (poller == NULL) { 972 return; 973 } 974 975 *ppoller = NULL; 976 977 thread = spdk_get_thread(); 978 if (!thread) { 979 assert(false); 980 return; 981 } 982 983 if (poller->thread != thread) { 984 SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n"); 985 assert(false); 986 return; 987 } 988 989 /* If the poller was paused, put it on the active_pollers list so that 990 * its unregistration can be processed by spdk_thread_poll(). 991 */ 992 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 993 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 994 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 995 poller->period_ticks = 0; 996 } 997 998 /* Simply set the state to unregistered. The poller will get cleaned up 999 * in a subsequent call to spdk_thread_poll(). 1000 */ 1001 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1002 } 1003 1004 void 1005 spdk_poller_pause(struct spdk_poller *poller) 1006 { 1007 struct spdk_thread *thread; 1008 1009 if (poller->state == SPDK_POLLER_STATE_PAUSED || 1010 poller->state == SPDK_POLLER_STATE_PAUSING) { 1011 return; 1012 } 1013 1014 thread = spdk_get_thread(); 1015 if (!thread) { 1016 assert(false); 1017 return; 1018 } 1019 1020 /* If a poller is paused from within itself, we can immediately move it 1021 * on the paused_pollers list. Otherwise we just set its state to 1022 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it. It 1023 * allows a poller to be paused from another one's context without 1024 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration. 1025 */ 1026 if (poller->state != SPDK_POLLER_STATE_RUNNING) { 1027 poller->state = SPDK_POLLER_STATE_PAUSING; 1028 } else { 1029 if (poller->period_ticks > 0) { 1030 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 1031 } else { 1032 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1033 } 1034 1035 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1036 poller->state = SPDK_POLLER_STATE_PAUSED; 1037 } 1038 } 1039 1040 void 1041 spdk_poller_resume(struct spdk_poller *poller) 1042 { 1043 struct spdk_thread *thread; 1044 1045 if (poller->state != SPDK_POLLER_STATE_PAUSED && 1046 poller->state != SPDK_POLLER_STATE_PAUSING) { 1047 return; 1048 } 1049 1050 thread = spdk_get_thread(); 1051 if (!thread) { 1052 assert(false); 1053 return; 1054 } 1055 1056 /* If a poller is paused it has to be removed from the paused pollers 1057 * list and put on the active / timer list depending on its 1058 * period_ticks. If a poller is still in the process of being paused, 1059 * we just need to flip its state back to waiting, as it's already on 1060 * the appropriate list. 1061 */ 1062 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1063 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1064 thread_insert_poller(thread, poller); 1065 } 1066 1067 poller->state = SPDK_POLLER_STATE_WAITING; 1068 } 1069 1070 const char * 1071 spdk_poller_state_str(enum spdk_poller_state state) 1072 { 1073 switch (state) { 1074 case SPDK_POLLER_STATE_WAITING: 1075 return "waiting"; 1076 case SPDK_POLLER_STATE_RUNNING: 1077 return "running"; 1078 case SPDK_POLLER_STATE_UNREGISTERED: 1079 return "unregistered"; 1080 case SPDK_POLLER_STATE_PAUSING: 1081 return "pausing"; 1082 case SPDK_POLLER_STATE_PAUSED: 1083 return "paused"; 1084 default: 1085 return NULL; 1086 } 1087 } 1088 1089 struct call_thread { 1090 struct spdk_thread *cur_thread; 1091 spdk_msg_fn fn; 1092 void *ctx; 1093 1094 struct spdk_thread *orig_thread; 1095 spdk_msg_fn cpl; 1096 }; 1097 1098 static void 1099 _on_thread(void *ctx) 1100 { 1101 struct call_thread *ct = ctx; 1102 int rc __attribute__((unused)); 1103 1104 ct->fn(ct->ctx); 1105 1106 pthread_mutex_lock(&g_devlist_mutex); 1107 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1108 pthread_mutex_unlock(&g_devlist_mutex); 1109 1110 if (!ct->cur_thread) { 1111 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1112 1113 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1114 free(ctx); 1115 } else { 1116 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1117 ct->cur_thread->name); 1118 1119 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1120 } 1121 assert(rc == 0); 1122 } 1123 1124 void 1125 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1126 { 1127 struct call_thread *ct; 1128 struct spdk_thread *thread; 1129 int rc __attribute__((unused)); 1130 1131 ct = calloc(1, sizeof(*ct)); 1132 if (!ct) { 1133 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1134 cpl(ctx); 1135 return; 1136 } 1137 1138 ct->fn = fn; 1139 ct->ctx = ctx; 1140 ct->cpl = cpl; 1141 1142 thread = _get_thread(); 1143 if (!thread) { 1144 SPDK_ERRLOG("No thread allocated\n"); 1145 free(ct); 1146 cpl(ctx); 1147 return; 1148 } 1149 ct->orig_thread = thread; 1150 1151 pthread_mutex_lock(&g_devlist_mutex); 1152 ct->cur_thread = TAILQ_FIRST(&g_threads); 1153 pthread_mutex_unlock(&g_devlist_mutex); 1154 1155 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 1156 ct->orig_thread->name); 1157 1158 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 1159 assert(rc == 0); 1160 } 1161 1162 void 1163 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1164 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1165 const char *name) 1166 { 1167 struct io_device *dev, *tmp; 1168 struct spdk_thread *thread; 1169 1170 assert(io_device != NULL); 1171 assert(create_cb != NULL); 1172 assert(destroy_cb != NULL); 1173 1174 thread = spdk_get_thread(); 1175 if (!thread) { 1176 SPDK_ERRLOG("called from non-SPDK thread\n"); 1177 assert(false); 1178 return; 1179 } 1180 1181 dev = calloc(1, sizeof(struct io_device)); 1182 if (dev == NULL) { 1183 SPDK_ERRLOG("could not allocate io_device\n"); 1184 return; 1185 } 1186 1187 dev->io_device = io_device; 1188 if (name) { 1189 snprintf(dev->name, sizeof(dev->name), "%s", name); 1190 } else { 1191 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1192 } 1193 dev->create_cb = create_cb; 1194 dev->destroy_cb = destroy_cb; 1195 dev->unregister_cb = NULL; 1196 dev->ctx_size = ctx_size; 1197 dev->for_each_count = 0; 1198 dev->unregistered = false; 1199 dev->refcnt = 0; 1200 1201 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 1202 dev->name, dev->io_device, thread->name); 1203 1204 pthread_mutex_lock(&g_devlist_mutex); 1205 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 1206 if (tmp->io_device == io_device) { 1207 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1208 io_device, tmp->name, dev->name); 1209 free(dev); 1210 pthread_mutex_unlock(&g_devlist_mutex); 1211 return; 1212 } 1213 } 1214 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 1215 pthread_mutex_unlock(&g_devlist_mutex); 1216 } 1217 1218 static void 1219 _finish_unregister(void *arg) 1220 { 1221 struct io_device *dev = arg; 1222 1223 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 1224 dev->name, dev->io_device, dev->unregister_thread->name); 1225 1226 dev->unregister_cb(dev->io_device); 1227 free(dev); 1228 } 1229 1230 static void 1231 io_device_free(struct io_device *dev) 1232 { 1233 int rc __attribute__((unused)); 1234 1235 if (dev->unregister_cb == NULL) { 1236 free(dev); 1237 } else { 1238 assert(dev->unregister_thread != NULL); 1239 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 1240 dev->name, dev->io_device, dev->unregister_thread->name); 1241 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 1242 assert(rc == 0); 1243 } 1244 } 1245 1246 void 1247 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 1248 { 1249 struct io_device *dev; 1250 uint32_t refcnt; 1251 struct spdk_thread *thread; 1252 1253 thread = spdk_get_thread(); 1254 if (!thread) { 1255 SPDK_ERRLOG("called from non-SPDK thread\n"); 1256 assert(false); 1257 return; 1258 } 1259 1260 pthread_mutex_lock(&g_devlist_mutex); 1261 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 1262 if (dev->io_device == io_device) { 1263 break; 1264 } 1265 } 1266 1267 if (!dev) { 1268 SPDK_ERRLOG("io_device %p not found\n", io_device); 1269 assert(false); 1270 pthread_mutex_unlock(&g_devlist_mutex); 1271 return; 1272 } 1273 1274 if (dev->for_each_count > 0) { 1275 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 1276 dev->name, io_device, dev->for_each_count); 1277 pthread_mutex_unlock(&g_devlist_mutex); 1278 return; 1279 } 1280 1281 dev->unregister_cb = unregister_cb; 1282 dev->unregistered = true; 1283 TAILQ_REMOVE(&g_io_devices, dev, tailq); 1284 refcnt = dev->refcnt; 1285 dev->unregister_thread = thread; 1286 pthread_mutex_unlock(&g_devlist_mutex); 1287 1288 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 1289 dev->name, dev->io_device, thread->name); 1290 1291 if (refcnt > 0) { 1292 /* defer deletion */ 1293 return; 1294 } 1295 1296 io_device_free(dev); 1297 } 1298 1299 const char * 1300 spdk_io_device_get_name(struct io_device *dev) 1301 { 1302 return dev->name; 1303 } 1304 1305 struct spdk_io_channel * 1306 spdk_get_io_channel(void *io_device) 1307 { 1308 struct spdk_io_channel *ch; 1309 struct spdk_thread *thread; 1310 struct io_device *dev; 1311 int rc; 1312 1313 pthread_mutex_lock(&g_devlist_mutex); 1314 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 1315 if (dev->io_device == io_device) { 1316 break; 1317 } 1318 } 1319 if (dev == NULL) { 1320 SPDK_ERRLOG("could not find io_device %p\n", io_device); 1321 pthread_mutex_unlock(&g_devlist_mutex); 1322 return NULL; 1323 } 1324 1325 thread = _get_thread(); 1326 if (!thread) { 1327 SPDK_ERRLOG("No thread allocated\n"); 1328 pthread_mutex_unlock(&g_devlist_mutex); 1329 return NULL; 1330 } 1331 1332 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1333 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 1334 pthread_mutex_unlock(&g_devlist_mutex); 1335 return NULL; 1336 } 1337 1338 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1339 if (ch->dev == dev) { 1340 ch->ref++; 1341 1342 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1343 ch, dev->name, dev->io_device, thread->name, ch->ref); 1344 1345 /* 1346 * An I/O channel already exists for this device on this 1347 * thread, so return it. 1348 */ 1349 pthread_mutex_unlock(&g_devlist_mutex); 1350 return ch; 1351 } 1352 } 1353 1354 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 1355 if (ch == NULL) { 1356 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 1357 pthread_mutex_unlock(&g_devlist_mutex); 1358 return NULL; 1359 } 1360 1361 ch->dev = dev; 1362 ch->destroy_cb = dev->destroy_cb; 1363 ch->thread = thread; 1364 ch->ref = 1; 1365 ch->destroy_ref = 0; 1366 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 1367 1368 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1369 ch, dev->name, dev->io_device, thread->name, ch->ref); 1370 1371 dev->refcnt++; 1372 1373 pthread_mutex_unlock(&g_devlist_mutex); 1374 1375 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 1376 if (rc != 0) { 1377 pthread_mutex_lock(&g_devlist_mutex); 1378 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1379 dev->refcnt--; 1380 free(ch); 1381 pthread_mutex_unlock(&g_devlist_mutex); 1382 return NULL; 1383 } 1384 1385 return ch; 1386 } 1387 1388 static void 1389 put_io_channel(void *arg) 1390 { 1391 struct spdk_io_channel *ch = arg; 1392 bool do_remove_dev = true; 1393 struct spdk_thread *thread; 1394 1395 thread = spdk_get_thread(); 1396 if (!thread) { 1397 SPDK_ERRLOG("called from non-SPDK thread\n"); 1398 assert(false); 1399 return; 1400 } 1401 1402 SPDK_DEBUGLOG(thread, 1403 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 1404 ch, ch->dev->name, ch->dev->io_device, thread->name); 1405 1406 assert(ch->thread == thread); 1407 1408 ch->destroy_ref--; 1409 1410 if (ch->ref > 0 || ch->destroy_ref > 0) { 1411 /* 1412 * Another reference to the associated io_device was requested 1413 * after this message was sent but before it had a chance to 1414 * execute. 1415 */ 1416 return; 1417 } 1418 1419 pthread_mutex_lock(&g_devlist_mutex); 1420 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1421 pthread_mutex_unlock(&g_devlist_mutex); 1422 1423 /* Don't hold the devlist mutex while the destroy_cb is called. */ 1424 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 1425 1426 pthread_mutex_lock(&g_devlist_mutex); 1427 ch->dev->refcnt--; 1428 1429 if (!ch->dev->unregistered) { 1430 do_remove_dev = false; 1431 } 1432 1433 if (ch->dev->refcnt > 0) { 1434 do_remove_dev = false; 1435 } 1436 1437 pthread_mutex_unlock(&g_devlist_mutex); 1438 1439 if (do_remove_dev) { 1440 io_device_free(ch->dev); 1441 } 1442 free(ch); 1443 } 1444 1445 void 1446 spdk_put_io_channel(struct spdk_io_channel *ch) 1447 { 1448 struct spdk_thread *thread; 1449 int rc __attribute__((unused)); 1450 1451 thread = spdk_get_thread(); 1452 if (!thread) { 1453 SPDK_ERRLOG("called from non-SPDK thread\n"); 1454 assert(false); 1455 return; 1456 } 1457 1458 if (ch->thread != thread) { 1459 SPDK_ERRLOG("different from the thread that called get_io_channel()\n"); 1460 assert(false); 1461 return; 1462 } 1463 1464 SPDK_DEBUGLOG(thread, 1465 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1466 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 1467 1468 ch->ref--; 1469 1470 if (ch->ref == 0) { 1471 ch->destroy_ref++; 1472 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 1473 assert(rc == 0); 1474 } 1475 } 1476 1477 struct spdk_io_channel * 1478 spdk_io_channel_from_ctx(void *ctx) 1479 { 1480 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 1481 } 1482 1483 struct spdk_thread * 1484 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 1485 { 1486 return ch->thread; 1487 } 1488 1489 struct spdk_io_channel_iter { 1490 void *io_device; 1491 struct io_device *dev; 1492 spdk_channel_msg fn; 1493 int status; 1494 void *ctx; 1495 struct spdk_io_channel *ch; 1496 1497 struct spdk_thread *cur_thread; 1498 1499 struct spdk_thread *orig_thread; 1500 spdk_channel_for_each_cpl cpl; 1501 }; 1502 1503 void * 1504 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 1505 { 1506 return i->io_device; 1507 } 1508 1509 struct spdk_io_channel * 1510 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 1511 { 1512 return i->ch; 1513 } 1514 1515 void * 1516 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 1517 { 1518 return i->ctx; 1519 } 1520 1521 static void 1522 _call_completion(void *ctx) 1523 { 1524 struct spdk_io_channel_iter *i = ctx; 1525 1526 if (i->cpl != NULL) { 1527 i->cpl(i, i->status); 1528 } 1529 free(i); 1530 } 1531 1532 static void 1533 _call_channel(void *ctx) 1534 { 1535 struct spdk_io_channel_iter *i = ctx; 1536 struct spdk_io_channel *ch; 1537 1538 /* 1539 * It is possible that the channel was deleted before this 1540 * message had a chance to execute. If so, skip calling 1541 * the fn() on this thread. 1542 */ 1543 pthread_mutex_lock(&g_devlist_mutex); 1544 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 1545 if (ch->dev->io_device == i->io_device) { 1546 break; 1547 } 1548 } 1549 pthread_mutex_unlock(&g_devlist_mutex); 1550 1551 if (ch) { 1552 i->fn(i); 1553 } else { 1554 spdk_for_each_channel_continue(i, 0); 1555 } 1556 } 1557 1558 void 1559 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 1560 spdk_channel_for_each_cpl cpl) 1561 { 1562 struct spdk_thread *thread; 1563 struct spdk_io_channel *ch; 1564 struct spdk_io_channel_iter *i; 1565 int rc __attribute__((unused)); 1566 1567 i = calloc(1, sizeof(*i)); 1568 if (!i) { 1569 SPDK_ERRLOG("Unable to allocate iterator\n"); 1570 return; 1571 } 1572 1573 i->io_device = io_device; 1574 i->fn = fn; 1575 i->ctx = ctx; 1576 i->cpl = cpl; 1577 1578 pthread_mutex_lock(&g_devlist_mutex); 1579 i->orig_thread = _get_thread(); 1580 1581 TAILQ_FOREACH(thread, &g_threads, tailq) { 1582 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1583 if (ch->dev->io_device == io_device) { 1584 ch->dev->for_each_count++; 1585 i->dev = ch->dev; 1586 i->cur_thread = thread; 1587 i->ch = ch; 1588 pthread_mutex_unlock(&g_devlist_mutex); 1589 rc = spdk_thread_send_msg(thread, _call_channel, i); 1590 assert(rc == 0); 1591 return; 1592 } 1593 } 1594 } 1595 1596 pthread_mutex_unlock(&g_devlist_mutex); 1597 1598 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1599 assert(rc == 0); 1600 } 1601 1602 void 1603 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 1604 { 1605 struct spdk_thread *thread; 1606 struct spdk_io_channel *ch; 1607 int rc __attribute__((unused)); 1608 1609 assert(i->cur_thread == spdk_get_thread()); 1610 1611 i->status = status; 1612 1613 pthread_mutex_lock(&g_devlist_mutex); 1614 if (status) { 1615 goto end; 1616 } 1617 thread = TAILQ_NEXT(i->cur_thread, tailq); 1618 while (thread) { 1619 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1620 if (ch->dev->io_device == i->io_device) { 1621 i->cur_thread = thread; 1622 i->ch = ch; 1623 pthread_mutex_unlock(&g_devlist_mutex); 1624 rc = spdk_thread_send_msg(thread, _call_channel, i); 1625 assert(rc == 0); 1626 return; 1627 } 1628 } 1629 thread = TAILQ_NEXT(thread, tailq); 1630 } 1631 1632 end: 1633 i->dev->for_each_count--; 1634 i->ch = NULL; 1635 pthread_mutex_unlock(&g_devlist_mutex); 1636 1637 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1638 assert(rc == 0); 1639 } 1640 1641 1642 SPDK_LOG_REGISTER_COMPONENT(thread) 1643