1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/likely.h" 38 #include "spdk/queue.h" 39 #include "spdk/string.h" 40 #include "spdk/thread.h" 41 #include "spdk/util.h" 42 43 #include "spdk_internal/log.h" 44 #include "spdk_internal/thread.h" 45 46 #define SPDK_MSG_BATCH_SIZE 8 47 #define SPDK_MAX_DEVICE_NAME_LEN 256 48 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 49 50 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 51 52 static spdk_new_thread_fn g_new_thread_fn = NULL; 53 static spdk_thread_op_fn g_thread_op_fn = NULL; 54 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 55 static size_t g_ctx_sz = 0; 56 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 57 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 58 * SPDK application is required. 59 */ 60 static uint64_t g_thread_id = 1; 61 62 struct io_device { 63 void *io_device; 64 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 65 spdk_io_channel_create_cb create_cb; 66 spdk_io_channel_destroy_cb destroy_cb; 67 spdk_io_device_unregister_cb unregister_cb; 68 struct spdk_thread *unregister_thread; 69 uint32_t ctx_size; 70 uint32_t for_each_count; 71 TAILQ_ENTRY(io_device) tailq; 72 73 uint32_t refcnt; 74 75 bool unregistered; 76 }; 77 78 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 79 80 struct spdk_msg { 81 spdk_msg_fn fn; 82 void *arg; 83 84 SLIST_ENTRY(spdk_msg) link; 85 }; 86 87 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 88 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 89 90 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 91 static uint32_t g_thread_count = 0; 92 93 static __thread struct spdk_thread *tls_thread = NULL; 94 95 static inline struct spdk_thread * 96 _get_thread(void) 97 { 98 return tls_thread; 99 } 100 101 static int 102 _thread_lib_init(size_t ctx_sz) 103 { 104 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 105 106 g_ctx_sz = ctx_sz; 107 108 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 109 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 110 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 111 sizeof(struct spdk_msg), 112 0, /* No cache. We do our own. */ 113 SPDK_ENV_SOCKET_ID_ANY); 114 115 if (!g_spdk_msg_mempool) { 116 return -1; 117 } 118 119 return 0; 120 } 121 122 int 123 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 124 { 125 assert(g_new_thread_fn == NULL); 126 assert(g_thread_op_fn == NULL); 127 128 if (new_thread_fn == NULL) { 129 SPDK_INFOLOG(SPDK_LOG_THREAD, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 130 } else { 131 g_new_thread_fn = new_thread_fn; 132 } 133 134 return _thread_lib_init(ctx_sz); 135 } 136 137 int 138 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 139 spdk_thread_op_supported_fn thread_op_supported_fn, 140 size_t ctx_sz) 141 { 142 assert(g_new_thread_fn == NULL); 143 assert(g_thread_op_fn == NULL); 144 assert(g_thread_op_supported_fn == NULL); 145 146 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 147 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 148 return -EINVAL; 149 } 150 151 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 152 SPDK_INFOLOG(SPDK_LOG_THREAD, "thread_op_fn and thread_op_supported_fn were not specified\n"); 153 } else { 154 g_thread_op_fn = thread_op_fn; 155 g_thread_op_supported_fn = thread_op_supported_fn; 156 } 157 158 return _thread_lib_init(ctx_sz); 159 } 160 161 void 162 spdk_thread_lib_fini(void) 163 { 164 struct io_device *dev; 165 166 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 167 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 168 } 169 170 if (g_spdk_msg_mempool) { 171 spdk_mempool_free(g_spdk_msg_mempool); 172 g_spdk_msg_mempool = NULL; 173 } 174 175 g_new_thread_fn = NULL; 176 g_thread_op_fn = NULL; 177 g_thread_op_supported_fn = NULL; 178 g_ctx_sz = 0; 179 } 180 181 static void 182 _free_thread(struct spdk_thread *thread) 183 { 184 struct spdk_io_channel *ch; 185 struct spdk_msg *msg; 186 struct spdk_poller *poller, *ptmp; 187 188 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 189 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 190 thread->name, ch->dev->name); 191 } 192 193 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 194 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 195 SPDK_WARNLOG("poller %s still registered at thread exit\n", 196 poller->name); 197 } 198 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 199 free(poller); 200 } 201 202 TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) { 203 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 204 SPDK_WARNLOG("poller %s still registered at thread exit\n", 205 poller->name); 206 } 207 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 208 free(poller); 209 } 210 211 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 212 SPDK_WARNLOG("poller %s still registered at thread exit\n", poller->name); 213 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 214 free(poller); 215 } 216 217 pthread_mutex_lock(&g_devlist_mutex); 218 assert(g_thread_count > 0); 219 g_thread_count--; 220 TAILQ_REMOVE(&g_threads, thread, tailq); 221 pthread_mutex_unlock(&g_devlist_mutex); 222 223 msg = SLIST_FIRST(&thread->msg_cache); 224 while (msg != NULL) { 225 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 226 227 assert(thread->msg_cache_count > 0); 228 thread->msg_cache_count--; 229 spdk_mempool_put(g_spdk_msg_mempool, msg); 230 231 msg = SLIST_FIRST(&thread->msg_cache); 232 } 233 234 assert(thread->msg_cache_count == 0); 235 236 spdk_ring_free(thread->messages); 237 free(thread); 238 } 239 240 struct spdk_thread * 241 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 242 { 243 struct spdk_thread *thread; 244 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 245 int rc = 0, i; 246 247 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 248 if (!thread) { 249 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 250 return NULL; 251 } 252 253 if (cpumask) { 254 spdk_cpuset_copy(&thread->cpumask, cpumask); 255 } else { 256 spdk_cpuset_negate(&thread->cpumask); 257 } 258 259 TAILQ_INIT(&thread->io_channels); 260 TAILQ_INIT(&thread->active_pollers); 261 TAILQ_INIT(&thread->timed_pollers); 262 TAILQ_INIT(&thread->paused_pollers); 263 SLIST_INIT(&thread->msg_cache); 264 thread->msg_cache_count = 0; 265 266 thread->tsc_last = spdk_get_ticks(); 267 268 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 269 if (!thread->messages) { 270 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 271 free(thread); 272 return NULL; 273 } 274 275 /* Fill the local message pool cache. */ 276 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 277 if (rc == 0) { 278 /* If we can't populate the cache it's ok. The cache will get filled 279 * up organically as messages are passed to the thread. */ 280 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 281 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 282 thread->msg_cache_count++; 283 } 284 } 285 286 if (name) { 287 snprintf(thread->name, sizeof(thread->name), "%s", name); 288 } else { 289 snprintf(thread->name, sizeof(thread->name), "%p", thread); 290 } 291 292 pthread_mutex_lock(&g_devlist_mutex); 293 if (g_thread_id == 0) { 294 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 295 pthread_mutex_unlock(&g_devlist_mutex); 296 _free_thread(thread); 297 return NULL; 298 } 299 thread->id = g_thread_id++; 300 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 301 g_thread_count++; 302 pthread_mutex_unlock(&g_devlist_mutex); 303 304 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread (%" PRIu64 ", %s)\n", 305 thread->id, thread->name); 306 307 if (g_new_thread_fn) { 308 rc = g_new_thread_fn(thread); 309 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 310 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 311 } 312 313 if (rc != 0) { 314 _free_thread(thread); 315 return NULL; 316 } 317 318 thread->state = SPDK_THREAD_STATE_RUNNING; 319 320 return thread; 321 } 322 323 void 324 spdk_set_thread(struct spdk_thread *thread) 325 { 326 tls_thread = thread; 327 } 328 329 static void 330 thread_exit(struct spdk_thread *thread, uint64_t now) 331 { 332 struct spdk_poller *poller; 333 struct spdk_io_channel *ch; 334 335 if (now >= thread->exit_timeout_tsc) { 336 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 337 thread->name); 338 goto exited; 339 } 340 341 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 342 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 343 SPDK_INFOLOG(SPDK_LOG_THREAD, 344 "thread %s still has active poller %s\n", 345 thread->name, poller->name); 346 return; 347 } 348 } 349 350 TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) { 351 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 352 SPDK_INFOLOG(SPDK_LOG_THREAD, 353 "thread %s still has active timed poller %s\n", 354 thread->name, poller->name); 355 return; 356 } 357 } 358 359 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 360 SPDK_INFOLOG(SPDK_LOG_THREAD, 361 "thread %s still has paused poller %s\n", 362 thread->name, poller->name); 363 return; 364 } 365 366 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 367 SPDK_INFOLOG(SPDK_LOG_THREAD, 368 "thread %s still has channel for io_device %s\n", 369 thread->name, ch->dev->name); 370 return; 371 } 372 373 exited: 374 thread->state = SPDK_THREAD_STATE_EXITED; 375 } 376 377 int 378 spdk_thread_exit(struct spdk_thread *thread) 379 { 380 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name); 381 382 assert(tls_thread == thread); 383 384 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 385 SPDK_INFOLOG(SPDK_LOG_THREAD, 386 "thread %s is already exiting\n", 387 thread->name); 388 return 0; 389 } 390 391 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 392 SPDK_THREAD_EXIT_TIMEOUT_SEC); 393 thread->state = SPDK_THREAD_STATE_EXITING; 394 return 0; 395 } 396 397 bool 398 spdk_thread_is_exited(struct spdk_thread *thread) 399 { 400 return thread->state == SPDK_THREAD_STATE_EXITED; 401 } 402 403 void 404 spdk_thread_destroy(struct spdk_thread *thread) 405 { 406 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name); 407 408 assert(thread->state == SPDK_THREAD_STATE_EXITED); 409 410 if (tls_thread == thread) { 411 tls_thread = NULL; 412 } 413 414 _free_thread(thread); 415 } 416 417 void * 418 spdk_thread_get_ctx(struct spdk_thread *thread) 419 { 420 if (g_ctx_sz > 0) { 421 return thread->ctx; 422 } 423 424 return NULL; 425 } 426 427 struct spdk_cpuset * 428 spdk_thread_get_cpumask(struct spdk_thread *thread) 429 { 430 return &thread->cpumask; 431 } 432 433 int 434 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 435 { 436 struct spdk_thread *thread; 437 438 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 439 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 440 assert(false); 441 return -ENOTSUP; 442 } 443 444 thread = spdk_get_thread(); 445 if (!thread) { 446 SPDK_ERRLOG("Called from non-SPDK thread\n"); 447 assert(false); 448 return -EINVAL; 449 } 450 451 spdk_cpuset_copy(&thread->cpumask, cpumask); 452 453 /* Invoke framework's reschedule operation. If this function is called multiple times 454 * in a single spdk_thread_poll() context, the last cpumask will be used in the 455 * reschedule operation. 456 */ 457 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 458 459 return 0; 460 } 461 462 struct spdk_thread * 463 spdk_thread_get_from_ctx(void *ctx) 464 { 465 if (ctx == NULL) { 466 assert(false); 467 return NULL; 468 } 469 470 assert(g_ctx_sz > 0); 471 472 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 473 } 474 475 static inline uint32_t 476 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 477 { 478 unsigned count, i; 479 void *messages[SPDK_MSG_BATCH_SIZE]; 480 481 #ifdef DEBUG 482 /* 483 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 484 * so we will never actually read uninitialized data from events, but just to be sure 485 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 486 */ 487 memset(messages, 0, sizeof(messages)); 488 #endif 489 490 if (max_msgs > 0) { 491 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 492 } else { 493 max_msgs = SPDK_MSG_BATCH_SIZE; 494 } 495 496 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 497 if (count == 0) { 498 return 0; 499 } 500 501 for (i = 0; i < count; i++) { 502 struct spdk_msg *msg = messages[i]; 503 504 assert(msg != NULL); 505 msg->fn(msg->arg); 506 507 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 508 /* Insert the messages at the head. We want to re-use the hot 509 * ones. */ 510 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 511 thread->msg_cache_count++; 512 } else { 513 spdk_mempool_put(g_spdk_msg_mempool, msg); 514 } 515 } 516 517 return count; 518 } 519 520 static void 521 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 522 { 523 struct spdk_poller *iter; 524 525 poller->next_run_tick = now + poller->period_ticks; 526 527 /* 528 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled 529 * run time. 530 */ 531 TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) { 532 if (iter->next_run_tick <= poller->next_run_tick) { 533 TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq); 534 return; 535 } 536 } 537 538 /* No earlier pollers were found, so this poller must be the new head */ 539 TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq); 540 } 541 542 static void 543 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 544 { 545 if (poller->period_ticks) { 546 poller_insert_timer(thread, poller, spdk_get_ticks()); 547 } else { 548 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 549 } 550 } 551 552 static inline void 553 thread_update_stats(struct spdk_thread *thread, uint64_t end, 554 uint64_t start, int rc) 555 { 556 if (rc == 0) { 557 /* Poller status idle */ 558 thread->stats.idle_tsc += end - start; 559 } else if (rc > 0) { 560 /* Poller status busy */ 561 thread->stats.busy_tsc += end - start; 562 } 563 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 564 thread->tsc_last = end; 565 } 566 567 static int 568 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 569 { 570 uint32_t msg_count; 571 struct spdk_poller *poller, *tmp; 572 spdk_msg_fn critical_msg; 573 int rc = 0; 574 575 critical_msg = thread->critical_msg; 576 if (spdk_unlikely(critical_msg != NULL)) { 577 critical_msg(NULL); 578 thread->critical_msg = NULL; 579 } 580 581 msg_count = msg_queue_run_batch(thread, max_msgs); 582 if (msg_count) { 583 rc = 1; 584 } 585 586 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 587 active_pollers_head, tailq, tmp) { 588 int poller_rc; 589 590 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 591 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 592 free(poller); 593 continue; 594 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 595 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 596 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 597 poller->state = SPDK_POLLER_STATE_PAUSED; 598 continue; 599 } 600 601 poller->state = SPDK_POLLER_STATE_RUNNING; 602 poller_rc = poller->fn(poller->arg); 603 604 poller->run_count++; 605 if (poller_rc > 0) { 606 poller->busy_count++; 607 } 608 609 #ifdef DEBUG 610 if (poller_rc == -1) { 611 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %s returned -1\n", poller->name); 612 } 613 #endif 614 615 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 616 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 617 free(poller); 618 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 619 poller->state = SPDK_POLLER_STATE_WAITING; 620 } 621 622 if (poller_rc > rc) { 623 rc = poller_rc; 624 } 625 } 626 627 TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) { 628 int timer_rc = 0; 629 630 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 631 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 632 free(poller); 633 continue; 634 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 635 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 636 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 637 poller->state = SPDK_POLLER_STATE_PAUSED; 638 continue; 639 } 640 641 if (now < poller->next_run_tick) { 642 break; 643 } 644 645 poller->state = SPDK_POLLER_STATE_RUNNING; 646 timer_rc = poller->fn(poller->arg); 647 648 poller->run_count++; 649 if (timer_rc > 0) { 650 poller->busy_count++; 651 } 652 653 #ifdef DEBUG 654 if (timer_rc == -1) { 655 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %s returned -1\n", poller->name); 656 } 657 #endif 658 659 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 660 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 661 free(poller); 662 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 663 poller->state = SPDK_POLLER_STATE_WAITING; 664 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 665 poller_insert_timer(thread, poller, now); 666 } 667 668 if (timer_rc > rc) { 669 rc = timer_rc; 670 } 671 } 672 673 return rc; 674 } 675 676 int 677 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 678 { 679 struct spdk_thread *orig_thread; 680 int rc; 681 682 orig_thread = _get_thread(); 683 tls_thread = thread; 684 685 if (now == 0) { 686 now = spdk_get_ticks(); 687 } 688 689 rc = thread_poll(thread, max_msgs, now); 690 691 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 692 thread_exit(thread, now); 693 } 694 695 thread_update_stats(thread, spdk_get_ticks(), now, rc); 696 697 tls_thread = orig_thread; 698 699 return rc; 700 } 701 702 uint64_t 703 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 704 { 705 struct spdk_poller *poller; 706 707 poller = TAILQ_FIRST(&thread->timed_pollers); 708 if (poller) { 709 return poller->next_run_tick; 710 } 711 712 return 0; 713 } 714 715 int 716 spdk_thread_has_active_pollers(struct spdk_thread *thread) 717 { 718 return !TAILQ_EMPTY(&thread->active_pollers); 719 } 720 721 static bool 722 thread_has_unpaused_pollers(struct spdk_thread *thread) 723 { 724 if (TAILQ_EMPTY(&thread->active_pollers) && 725 TAILQ_EMPTY(&thread->timed_pollers)) { 726 return false; 727 } 728 729 return true; 730 } 731 732 bool 733 spdk_thread_has_pollers(struct spdk_thread *thread) 734 { 735 if (!thread_has_unpaused_pollers(thread) && 736 TAILQ_EMPTY(&thread->paused_pollers)) { 737 return false; 738 } 739 740 return true; 741 } 742 743 bool 744 spdk_thread_is_idle(struct spdk_thread *thread) 745 { 746 if (spdk_ring_count(thread->messages) || 747 thread_has_unpaused_pollers(thread) || 748 thread->critical_msg != NULL) { 749 return false; 750 } 751 752 return true; 753 } 754 755 uint32_t 756 spdk_thread_get_count(void) 757 { 758 /* 759 * Return cached value of the current thread count. We could acquire the 760 * lock and iterate through the TAILQ of threads to count them, but that 761 * count could still be invalidated after we release the lock. 762 */ 763 return g_thread_count; 764 } 765 766 struct spdk_thread * 767 spdk_get_thread(void) 768 { 769 return _get_thread(); 770 } 771 772 const char * 773 spdk_thread_get_name(const struct spdk_thread *thread) 774 { 775 return thread->name; 776 } 777 778 uint64_t 779 spdk_thread_get_id(const struct spdk_thread *thread) 780 { 781 return thread->id; 782 } 783 784 struct spdk_thread * 785 spdk_thread_get_by_id(uint64_t id) 786 { 787 struct spdk_thread *thread; 788 789 pthread_mutex_lock(&g_devlist_mutex); 790 TAILQ_FOREACH(thread, &g_threads, tailq) { 791 if (thread->id == id) { 792 pthread_mutex_unlock(&g_devlist_mutex); 793 794 return thread; 795 } 796 } 797 pthread_mutex_unlock(&g_devlist_mutex); 798 799 return NULL; 800 } 801 802 int 803 spdk_thread_get_stats(struct spdk_thread_stats *stats) 804 { 805 struct spdk_thread *thread; 806 807 thread = _get_thread(); 808 if (!thread) { 809 SPDK_ERRLOG("No thread allocated\n"); 810 return -EINVAL; 811 } 812 813 if (stats == NULL) { 814 return -EINVAL; 815 } 816 817 *stats = thread->stats; 818 819 return 0; 820 } 821 822 uint64_t 823 spdk_thread_get_last_tsc(struct spdk_thread *thread) 824 { 825 return thread->tsc_last; 826 } 827 828 int 829 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 830 { 831 struct spdk_thread *local_thread; 832 struct spdk_msg *msg; 833 int rc; 834 835 assert(thread != NULL); 836 837 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 838 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 839 return -EIO; 840 } 841 842 local_thread = _get_thread(); 843 844 msg = NULL; 845 if (local_thread != NULL) { 846 if (local_thread->msg_cache_count > 0) { 847 msg = SLIST_FIRST(&local_thread->msg_cache); 848 assert(msg != NULL); 849 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 850 local_thread->msg_cache_count--; 851 } 852 } 853 854 if (msg == NULL) { 855 msg = spdk_mempool_get(g_spdk_msg_mempool); 856 if (!msg) { 857 SPDK_ERRLOG("msg could not be allocated\n"); 858 return -ENOMEM; 859 } 860 } 861 862 msg->fn = fn; 863 msg->arg = ctx; 864 865 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 866 if (rc != 1) { 867 SPDK_ERRLOG("msg could not be enqueued\n"); 868 spdk_mempool_put(g_spdk_msg_mempool, msg); 869 return -EIO; 870 } 871 872 return 0; 873 } 874 875 int 876 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 877 { 878 spdk_msg_fn expected = NULL; 879 880 if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 881 __ATOMIC_SEQ_CST)) { 882 return 0; 883 } 884 885 return -EIO; 886 } 887 888 static struct spdk_poller * 889 poller_register(spdk_poller_fn fn, 890 void *arg, 891 uint64_t period_microseconds, 892 const char *name) 893 { 894 struct spdk_thread *thread; 895 struct spdk_poller *poller; 896 uint64_t quotient, remainder, ticks; 897 898 thread = spdk_get_thread(); 899 if (!thread) { 900 assert(false); 901 return NULL; 902 } 903 904 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 905 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 906 return NULL; 907 } 908 909 poller = calloc(1, sizeof(*poller)); 910 if (poller == NULL) { 911 SPDK_ERRLOG("Poller memory allocation failed\n"); 912 return NULL; 913 } 914 915 if (name) { 916 snprintf(poller->name, sizeof(poller->name), "%s", name); 917 } else { 918 snprintf(poller->name, sizeof(poller->name), "%p", fn); 919 } 920 921 poller->state = SPDK_POLLER_STATE_WAITING; 922 poller->fn = fn; 923 poller->arg = arg; 924 poller->thread = thread; 925 926 if (period_microseconds) { 927 quotient = period_microseconds / SPDK_SEC_TO_USEC; 928 remainder = period_microseconds % SPDK_SEC_TO_USEC; 929 ticks = spdk_get_ticks_hz(); 930 931 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 932 } else { 933 poller->period_ticks = 0; 934 } 935 936 thread_insert_poller(thread, poller); 937 938 return poller; 939 } 940 941 struct spdk_poller * 942 spdk_poller_register(spdk_poller_fn fn, 943 void *arg, 944 uint64_t period_microseconds) 945 { 946 return poller_register(fn, arg, period_microseconds, NULL); 947 } 948 949 struct spdk_poller * 950 spdk_poller_register_named(spdk_poller_fn fn, 951 void *arg, 952 uint64_t period_microseconds, 953 const char *name) 954 { 955 return poller_register(fn, arg, period_microseconds, name); 956 } 957 958 void 959 spdk_poller_unregister(struct spdk_poller **ppoller) 960 { 961 struct spdk_thread *thread; 962 struct spdk_poller *poller; 963 964 poller = *ppoller; 965 if (poller == NULL) { 966 return; 967 } 968 969 *ppoller = NULL; 970 971 thread = spdk_get_thread(); 972 if (!thread) { 973 assert(false); 974 return; 975 } 976 977 if (poller->thread != thread) { 978 SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n"); 979 assert(false); 980 return; 981 } 982 983 /* If the poller was paused, put it on the active_pollers list so that 984 * its unregistration can be processed by spdk_thread_poll(). 985 */ 986 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 987 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 988 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 989 poller->period_ticks = 0; 990 } 991 992 /* Simply set the state to unregistered. The poller will get cleaned up 993 * in a subsequent call to spdk_thread_poll(). 994 */ 995 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 996 } 997 998 void 999 spdk_poller_pause(struct spdk_poller *poller) 1000 { 1001 struct spdk_thread *thread; 1002 1003 if (poller->state == SPDK_POLLER_STATE_PAUSED || 1004 poller->state == SPDK_POLLER_STATE_PAUSING) { 1005 return; 1006 } 1007 1008 thread = spdk_get_thread(); 1009 if (!thread) { 1010 assert(false); 1011 return; 1012 } 1013 1014 /* If a poller is paused from within itself, we can immediately move it 1015 * on the paused_pollers list. Otherwise we just set its state to 1016 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it. It 1017 * allows a poller to be paused from another one's context without 1018 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration. 1019 */ 1020 if (poller->state != SPDK_POLLER_STATE_RUNNING) { 1021 poller->state = SPDK_POLLER_STATE_PAUSING; 1022 } else { 1023 if (poller->period_ticks > 0) { 1024 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 1025 } else { 1026 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1027 } 1028 1029 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1030 poller->state = SPDK_POLLER_STATE_PAUSED; 1031 } 1032 } 1033 1034 void 1035 spdk_poller_resume(struct spdk_poller *poller) 1036 { 1037 struct spdk_thread *thread; 1038 1039 if (poller->state != SPDK_POLLER_STATE_PAUSED && 1040 poller->state != SPDK_POLLER_STATE_PAUSING) { 1041 return; 1042 } 1043 1044 thread = spdk_get_thread(); 1045 if (!thread) { 1046 assert(false); 1047 return; 1048 } 1049 1050 /* If a poller is paused it has to be removed from the paused pollers 1051 * list and put on the active / timer list depending on its 1052 * period_ticks. If a poller is still in the process of being paused, 1053 * we just need to flip its state back to waiting, as it's already on 1054 * the appropriate list. 1055 */ 1056 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1057 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1058 thread_insert_poller(thread, poller); 1059 } 1060 1061 poller->state = SPDK_POLLER_STATE_WAITING; 1062 } 1063 1064 const char * 1065 spdk_poller_state_str(enum spdk_poller_state state) 1066 { 1067 switch (state) { 1068 case SPDK_POLLER_STATE_WAITING: 1069 return "waiting"; 1070 case SPDK_POLLER_STATE_RUNNING: 1071 return "running"; 1072 case SPDK_POLLER_STATE_UNREGISTERED: 1073 return "unregistered"; 1074 case SPDK_POLLER_STATE_PAUSING: 1075 return "pausing"; 1076 case SPDK_POLLER_STATE_PAUSED: 1077 return "paused"; 1078 default: 1079 return NULL; 1080 } 1081 } 1082 1083 struct call_thread { 1084 struct spdk_thread *cur_thread; 1085 spdk_msg_fn fn; 1086 void *ctx; 1087 1088 struct spdk_thread *orig_thread; 1089 spdk_msg_fn cpl; 1090 }; 1091 1092 static void 1093 _on_thread(void *ctx) 1094 { 1095 struct call_thread *ct = ctx; 1096 int rc __attribute__((unused)); 1097 1098 ct->fn(ct->ctx); 1099 1100 pthread_mutex_lock(&g_devlist_mutex); 1101 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1102 pthread_mutex_unlock(&g_devlist_mutex); 1103 1104 if (!ct->cur_thread) { 1105 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n"); 1106 1107 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1108 free(ctx); 1109 } else { 1110 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n", 1111 ct->cur_thread->name); 1112 1113 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1114 } 1115 assert(rc == 0); 1116 } 1117 1118 void 1119 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1120 { 1121 struct call_thread *ct; 1122 struct spdk_thread *thread; 1123 int rc __attribute__((unused)); 1124 1125 ct = calloc(1, sizeof(*ct)); 1126 if (!ct) { 1127 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1128 cpl(ctx); 1129 return; 1130 } 1131 1132 ct->fn = fn; 1133 ct->ctx = ctx; 1134 ct->cpl = cpl; 1135 1136 thread = _get_thread(); 1137 if (!thread) { 1138 SPDK_ERRLOG("No thread allocated\n"); 1139 free(ct); 1140 cpl(ctx); 1141 return; 1142 } 1143 ct->orig_thread = thread; 1144 1145 pthread_mutex_lock(&g_devlist_mutex); 1146 ct->cur_thread = TAILQ_FIRST(&g_threads); 1147 pthread_mutex_unlock(&g_devlist_mutex); 1148 1149 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n", 1150 ct->orig_thread->name); 1151 1152 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 1153 assert(rc == 0); 1154 } 1155 1156 void 1157 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1158 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1159 const char *name) 1160 { 1161 struct io_device *dev, *tmp; 1162 struct spdk_thread *thread; 1163 1164 assert(io_device != NULL); 1165 assert(create_cb != NULL); 1166 assert(destroy_cb != NULL); 1167 1168 thread = spdk_get_thread(); 1169 if (!thread) { 1170 SPDK_ERRLOG("called from non-SPDK thread\n"); 1171 assert(false); 1172 return; 1173 } 1174 1175 dev = calloc(1, sizeof(struct io_device)); 1176 if (dev == NULL) { 1177 SPDK_ERRLOG("could not allocate io_device\n"); 1178 return; 1179 } 1180 1181 dev->io_device = io_device; 1182 if (name) { 1183 snprintf(dev->name, sizeof(dev->name), "%s", name); 1184 } else { 1185 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1186 } 1187 dev->create_cb = create_cb; 1188 dev->destroy_cb = destroy_cb; 1189 dev->unregister_cb = NULL; 1190 dev->ctx_size = ctx_size; 1191 dev->for_each_count = 0; 1192 dev->unregistered = false; 1193 dev->refcnt = 0; 1194 1195 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n", 1196 dev->name, dev->io_device, thread->name); 1197 1198 pthread_mutex_lock(&g_devlist_mutex); 1199 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 1200 if (tmp->io_device == io_device) { 1201 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1202 io_device, tmp->name, dev->name); 1203 free(dev); 1204 pthread_mutex_unlock(&g_devlist_mutex); 1205 return; 1206 } 1207 } 1208 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 1209 pthread_mutex_unlock(&g_devlist_mutex); 1210 } 1211 1212 static void 1213 _finish_unregister(void *arg) 1214 { 1215 struct io_device *dev = arg; 1216 1217 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n", 1218 dev->name, dev->io_device, dev->unregister_thread->name); 1219 1220 dev->unregister_cb(dev->io_device); 1221 free(dev); 1222 } 1223 1224 static void 1225 io_device_free(struct io_device *dev) 1226 { 1227 int rc __attribute__((unused)); 1228 1229 if (dev->unregister_cb == NULL) { 1230 free(dev); 1231 } else { 1232 assert(dev->unregister_thread != NULL); 1233 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n", 1234 dev->name, dev->io_device, dev->unregister_thread->name); 1235 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 1236 assert(rc == 0); 1237 } 1238 } 1239 1240 void 1241 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 1242 { 1243 struct io_device *dev; 1244 uint32_t refcnt; 1245 struct spdk_thread *thread; 1246 1247 thread = spdk_get_thread(); 1248 if (!thread) { 1249 SPDK_ERRLOG("called from non-SPDK thread\n"); 1250 assert(false); 1251 return; 1252 } 1253 1254 pthread_mutex_lock(&g_devlist_mutex); 1255 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 1256 if (dev->io_device == io_device) { 1257 break; 1258 } 1259 } 1260 1261 if (!dev) { 1262 SPDK_ERRLOG("io_device %p not found\n", io_device); 1263 assert(false); 1264 pthread_mutex_unlock(&g_devlist_mutex); 1265 return; 1266 } 1267 1268 if (dev->for_each_count > 0) { 1269 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 1270 dev->name, io_device, dev->for_each_count); 1271 pthread_mutex_unlock(&g_devlist_mutex); 1272 return; 1273 } 1274 1275 dev->unregister_cb = unregister_cb; 1276 dev->unregistered = true; 1277 TAILQ_REMOVE(&g_io_devices, dev, tailq); 1278 refcnt = dev->refcnt; 1279 dev->unregister_thread = thread; 1280 pthread_mutex_unlock(&g_devlist_mutex); 1281 1282 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n", 1283 dev->name, dev->io_device, thread->name); 1284 1285 if (refcnt > 0) { 1286 /* defer deletion */ 1287 return; 1288 } 1289 1290 io_device_free(dev); 1291 } 1292 1293 const char * 1294 spdk_io_device_get_name(struct io_device *dev) 1295 { 1296 return dev->name; 1297 } 1298 1299 struct spdk_io_channel * 1300 spdk_get_io_channel(void *io_device) 1301 { 1302 struct spdk_io_channel *ch; 1303 struct spdk_thread *thread; 1304 struct io_device *dev; 1305 int rc; 1306 1307 pthread_mutex_lock(&g_devlist_mutex); 1308 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 1309 if (dev->io_device == io_device) { 1310 break; 1311 } 1312 } 1313 if (dev == NULL) { 1314 SPDK_ERRLOG("could not find io_device %p\n", io_device); 1315 pthread_mutex_unlock(&g_devlist_mutex); 1316 return NULL; 1317 } 1318 1319 thread = _get_thread(); 1320 if (!thread) { 1321 SPDK_ERRLOG("No thread allocated\n"); 1322 pthread_mutex_unlock(&g_devlist_mutex); 1323 return NULL; 1324 } 1325 1326 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1327 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 1328 pthread_mutex_unlock(&g_devlist_mutex); 1329 return NULL; 1330 } 1331 1332 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1333 if (ch->dev == dev) { 1334 ch->ref++; 1335 1336 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1337 ch, dev->name, dev->io_device, thread->name, ch->ref); 1338 1339 /* 1340 * An I/O channel already exists for this device on this 1341 * thread, so return it. 1342 */ 1343 pthread_mutex_unlock(&g_devlist_mutex); 1344 return ch; 1345 } 1346 } 1347 1348 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 1349 if (ch == NULL) { 1350 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 1351 pthread_mutex_unlock(&g_devlist_mutex); 1352 return NULL; 1353 } 1354 1355 ch->dev = dev; 1356 ch->destroy_cb = dev->destroy_cb; 1357 ch->thread = thread; 1358 ch->ref = 1; 1359 ch->destroy_ref = 0; 1360 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 1361 1362 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1363 ch, dev->name, dev->io_device, thread->name, ch->ref); 1364 1365 dev->refcnt++; 1366 1367 pthread_mutex_unlock(&g_devlist_mutex); 1368 1369 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 1370 if (rc != 0) { 1371 pthread_mutex_lock(&g_devlist_mutex); 1372 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1373 dev->refcnt--; 1374 free(ch); 1375 pthread_mutex_unlock(&g_devlist_mutex); 1376 return NULL; 1377 } 1378 1379 return ch; 1380 } 1381 1382 static void 1383 put_io_channel(void *arg) 1384 { 1385 struct spdk_io_channel *ch = arg; 1386 bool do_remove_dev = true; 1387 struct spdk_thread *thread; 1388 1389 thread = spdk_get_thread(); 1390 if (!thread) { 1391 SPDK_ERRLOG("called from non-SPDK thread\n"); 1392 assert(false); 1393 return; 1394 } 1395 1396 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1397 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 1398 ch, ch->dev->name, ch->dev->io_device, thread->name); 1399 1400 assert(ch->thread == thread); 1401 1402 ch->destroy_ref--; 1403 1404 if (ch->ref > 0 || ch->destroy_ref > 0) { 1405 /* 1406 * Another reference to the associated io_device was requested 1407 * after this message was sent but before it had a chance to 1408 * execute. 1409 */ 1410 return; 1411 } 1412 1413 pthread_mutex_lock(&g_devlist_mutex); 1414 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1415 pthread_mutex_unlock(&g_devlist_mutex); 1416 1417 /* Don't hold the devlist mutex while the destroy_cb is called. */ 1418 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 1419 1420 pthread_mutex_lock(&g_devlist_mutex); 1421 ch->dev->refcnt--; 1422 1423 if (!ch->dev->unregistered) { 1424 do_remove_dev = false; 1425 } 1426 1427 if (ch->dev->refcnt > 0) { 1428 do_remove_dev = false; 1429 } 1430 1431 pthread_mutex_unlock(&g_devlist_mutex); 1432 1433 if (do_remove_dev) { 1434 io_device_free(ch->dev); 1435 } 1436 free(ch); 1437 } 1438 1439 void 1440 spdk_put_io_channel(struct spdk_io_channel *ch) 1441 { 1442 struct spdk_thread *thread; 1443 int rc __attribute__((unused)); 1444 1445 thread = spdk_get_thread(); 1446 if (!thread) { 1447 SPDK_ERRLOG("called from non-SPDK thread\n"); 1448 assert(false); 1449 return; 1450 } 1451 1452 if (ch->thread != thread) { 1453 SPDK_ERRLOG("different from the thread that called get_io_channel()\n"); 1454 assert(false); 1455 return; 1456 } 1457 1458 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1459 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1460 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 1461 1462 ch->ref--; 1463 1464 if (ch->ref == 0) { 1465 ch->destroy_ref++; 1466 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 1467 assert(rc == 0); 1468 } 1469 } 1470 1471 struct spdk_io_channel * 1472 spdk_io_channel_from_ctx(void *ctx) 1473 { 1474 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 1475 } 1476 1477 struct spdk_thread * 1478 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 1479 { 1480 return ch->thread; 1481 } 1482 1483 struct spdk_io_channel_iter { 1484 void *io_device; 1485 struct io_device *dev; 1486 spdk_channel_msg fn; 1487 int status; 1488 void *ctx; 1489 struct spdk_io_channel *ch; 1490 1491 struct spdk_thread *cur_thread; 1492 1493 struct spdk_thread *orig_thread; 1494 spdk_channel_for_each_cpl cpl; 1495 }; 1496 1497 void * 1498 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 1499 { 1500 return i->io_device; 1501 } 1502 1503 struct spdk_io_channel * 1504 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 1505 { 1506 return i->ch; 1507 } 1508 1509 void * 1510 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 1511 { 1512 return i->ctx; 1513 } 1514 1515 static void 1516 _call_completion(void *ctx) 1517 { 1518 struct spdk_io_channel_iter *i = ctx; 1519 1520 if (i->cpl != NULL) { 1521 i->cpl(i, i->status); 1522 } 1523 free(i); 1524 } 1525 1526 static void 1527 _call_channel(void *ctx) 1528 { 1529 struct spdk_io_channel_iter *i = ctx; 1530 struct spdk_io_channel *ch; 1531 1532 /* 1533 * It is possible that the channel was deleted before this 1534 * message had a chance to execute. If so, skip calling 1535 * the fn() on this thread. 1536 */ 1537 pthread_mutex_lock(&g_devlist_mutex); 1538 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 1539 if (ch->dev->io_device == i->io_device) { 1540 break; 1541 } 1542 } 1543 pthread_mutex_unlock(&g_devlist_mutex); 1544 1545 if (ch) { 1546 i->fn(i); 1547 } else { 1548 spdk_for_each_channel_continue(i, 0); 1549 } 1550 } 1551 1552 void 1553 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 1554 spdk_channel_for_each_cpl cpl) 1555 { 1556 struct spdk_thread *thread; 1557 struct spdk_io_channel *ch; 1558 struct spdk_io_channel_iter *i; 1559 int rc __attribute__((unused)); 1560 1561 i = calloc(1, sizeof(*i)); 1562 if (!i) { 1563 SPDK_ERRLOG("Unable to allocate iterator\n"); 1564 return; 1565 } 1566 1567 i->io_device = io_device; 1568 i->fn = fn; 1569 i->ctx = ctx; 1570 i->cpl = cpl; 1571 1572 pthread_mutex_lock(&g_devlist_mutex); 1573 i->orig_thread = _get_thread(); 1574 1575 TAILQ_FOREACH(thread, &g_threads, tailq) { 1576 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1577 if (ch->dev->io_device == io_device) { 1578 ch->dev->for_each_count++; 1579 i->dev = ch->dev; 1580 i->cur_thread = thread; 1581 i->ch = ch; 1582 pthread_mutex_unlock(&g_devlist_mutex); 1583 rc = spdk_thread_send_msg(thread, _call_channel, i); 1584 assert(rc == 0); 1585 return; 1586 } 1587 } 1588 } 1589 1590 pthread_mutex_unlock(&g_devlist_mutex); 1591 1592 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1593 assert(rc == 0); 1594 } 1595 1596 void 1597 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 1598 { 1599 struct spdk_thread *thread; 1600 struct spdk_io_channel *ch; 1601 int rc __attribute__((unused)); 1602 1603 assert(i->cur_thread == spdk_get_thread()); 1604 1605 i->status = status; 1606 1607 pthread_mutex_lock(&g_devlist_mutex); 1608 if (status) { 1609 goto end; 1610 } 1611 thread = TAILQ_NEXT(i->cur_thread, tailq); 1612 while (thread) { 1613 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1614 if (ch->dev->io_device == i->io_device) { 1615 i->cur_thread = thread; 1616 i->ch = ch; 1617 pthread_mutex_unlock(&g_devlist_mutex); 1618 rc = spdk_thread_send_msg(thread, _call_channel, i); 1619 assert(rc == 0); 1620 return; 1621 } 1622 } 1623 thread = TAILQ_NEXT(thread, tailq); 1624 } 1625 1626 end: 1627 i->dev->for_each_count--; 1628 i->ch = NULL; 1629 pthread_mutex_unlock(&g_devlist_mutex); 1630 1631 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1632 assert(rc == 0); 1633 } 1634 1635 1636 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD) 1637