1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/queue.h" 38 #include "spdk/string.h" 39 #include "spdk/thread.h" 40 #include "spdk/util.h" 41 42 #include "spdk_internal/log.h" 43 #include "spdk_internal/thread.h" 44 45 #define SPDK_MSG_BATCH_SIZE 8 46 #define SPDK_MAX_DEVICE_NAME_LEN 256 47 #define SPDK_MAX_THREAD_NAME_LEN 256 48 49 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 50 51 static spdk_new_thread_fn g_new_thread_fn = NULL; 52 static size_t g_ctx_sz = 0; 53 54 struct io_device { 55 void *io_device; 56 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 57 spdk_io_channel_create_cb create_cb; 58 spdk_io_channel_destroy_cb destroy_cb; 59 spdk_io_device_unregister_cb unregister_cb; 60 struct spdk_thread *unregister_thread; 61 uint32_t ctx_size; 62 uint32_t for_each_count; 63 TAILQ_ENTRY(io_device) tailq; 64 65 uint32_t refcnt; 66 67 bool unregistered; 68 }; 69 70 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 71 72 struct spdk_msg { 73 spdk_msg_fn fn; 74 void *arg; 75 76 SLIST_ENTRY(spdk_msg) link; 77 }; 78 79 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 80 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 81 82 enum spdk_poller_state { 83 /* The poller is registered with a thread but not currently executing its fn. */ 84 SPDK_POLLER_STATE_WAITING, 85 86 /* The poller is currently running its fn. */ 87 SPDK_POLLER_STATE_RUNNING, 88 89 /* The poller was unregistered during the execution of its fn. */ 90 SPDK_POLLER_STATE_UNREGISTERED, 91 92 /* The poller is in the process of being paused. It will be paused 93 * during the next time it's supposed to be executed. 94 */ 95 SPDK_POLLER_STATE_PAUSING, 96 97 /* The poller is registered but currently paused. It's on the 98 * paused_pollers list. 99 */ 100 SPDK_POLLER_STATE_PAUSED, 101 }; 102 103 struct spdk_poller { 104 TAILQ_ENTRY(spdk_poller) tailq; 105 106 /* Current state of the poller; should only be accessed from the poller's thread. */ 107 enum spdk_poller_state state; 108 109 uint64_t period_ticks; 110 uint64_t next_run_tick; 111 spdk_poller_fn fn; 112 void *arg; 113 }; 114 115 struct spdk_thread { 116 TAILQ_HEAD(, spdk_io_channel) io_channels; 117 TAILQ_ENTRY(spdk_thread) tailq; 118 char name[SPDK_MAX_THREAD_NAME_LEN + 1]; 119 120 bool exit; 121 122 struct spdk_cpuset cpumask; 123 124 uint64_t tsc_last; 125 struct spdk_thread_stats stats; 126 127 /* 128 * Contains pollers actively running on this thread. Pollers 129 * are run round-robin. The thread takes one poller from the head 130 * of the ring, executes it, then puts it back at the tail of 131 * the ring. 132 */ 133 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers; 134 135 /** 136 * Contains pollers running on this thread with a periodic timer. 137 */ 138 TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; 139 140 /* 141 * Contains paused pollers. Pollers on this queue are waiting until 142 * they are resumed (in which case they're put onto the active/timer 143 * queues) or unregistered. 144 */ 145 TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; 146 147 struct spdk_ring *messages; 148 149 SLIST_HEAD(, spdk_msg) msg_cache; 150 size_t msg_cache_count; 151 152 /* User context allocated at the end */ 153 uint8_t ctx[0]; 154 }; 155 156 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 157 static uint32_t g_thread_count = 0; 158 159 static __thread struct spdk_thread *tls_thread = NULL; 160 161 static inline struct spdk_thread * 162 _get_thread(void) 163 { 164 return tls_thread; 165 } 166 167 int 168 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 169 { 170 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 171 172 assert(g_new_thread_fn == NULL); 173 g_new_thread_fn = new_thread_fn; 174 175 g_ctx_sz = ctx_sz; 176 177 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 178 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 179 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 180 sizeof(struct spdk_msg), 181 0, /* No cache. We do our own. */ 182 SPDK_ENV_SOCKET_ID_ANY); 183 184 if (!g_spdk_msg_mempool) { 185 return -1; 186 } 187 188 return 0; 189 } 190 191 void 192 spdk_thread_lib_fini(void) 193 { 194 struct io_device *dev; 195 196 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 197 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 198 } 199 200 if (g_spdk_msg_mempool) { 201 spdk_mempool_free(g_spdk_msg_mempool); 202 g_spdk_msg_mempool = NULL; 203 } 204 205 g_new_thread_fn = NULL; 206 g_ctx_sz = 0; 207 } 208 209 static void 210 _free_thread(struct spdk_thread *thread) 211 { 212 struct spdk_io_channel *ch; 213 struct spdk_msg *msg; 214 struct spdk_poller *poller, *ptmp; 215 216 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 217 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 218 thread->name, ch->dev->name); 219 } 220 221 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 222 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 223 SPDK_WARNLOG("poller %p still registered at thread exit\n", 224 poller); 225 } 226 227 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 228 free(poller); 229 } 230 231 232 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) { 233 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 234 SPDK_WARNLOG("poller %p still registered at thread exit\n", 235 poller); 236 } 237 238 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 239 free(poller); 240 } 241 242 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 243 SPDK_WARNLOG("poller %p still registered at thread exit\n", poller); 244 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 245 free(poller); 246 } 247 248 pthread_mutex_lock(&g_devlist_mutex); 249 assert(g_thread_count > 0); 250 g_thread_count--; 251 TAILQ_REMOVE(&g_threads, thread, tailq); 252 pthread_mutex_unlock(&g_devlist_mutex); 253 254 msg = SLIST_FIRST(&thread->msg_cache); 255 while (msg != NULL) { 256 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 257 258 assert(thread->msg_cache_count > 0); 259 thread->msg_cache_count--; 260 spdk_mempool_put(g_spdk_msg_mempool, msg); 261 262 msg = SLIST_FIRST(&thread->msg_cache); 263 } 264 265 assert(thread->msg_cache_count == 0); 266 267 spdk_ring_free(thread->messages); 268 free(thread); 269 } 270 271 struct spdk_thread * 272 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 273 { 274 struct spdk_thread *thread; 275 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 276 int rc, i; 277 278 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 279 if (!thread) { 280 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 281 return NULL; 282 } 283 284 if (cpumask) { 285 spdk_cpuset_copy(&thread->cpumask, cpumask); 286 } else { 287 spdk_cpuset_negate(&thread->cpumask); 288 } 289 290 TAILQ_INIT(&thread->io_channels); 291 TAILQ_INIT(&thread->active_pollers); 292 TAILQ_INIT(&thread->timer_pollers); 293 TAILQ_INIT(&thread->paused_pollers); 294 SLIST_INIT(&thread->msg_cache); 295 thread->msg_cache_count = 0; 296 297 thread->tsc_last = spdk_get_ticks(); 298 299 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 300 if (!thread->messages) { 301 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 302 free(thread); 303 return NULL; 304 } 305 306 /* Fill the local message pool cache. */ 307 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 308 if (rc == 0) { 309 /* If we can't populate the cache it's ok. The cache will get filled 310 * up organically as messages are passed to the thread. */ 311 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 312 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 313 thread->msg_cache_count++; 314 } 315 } 316 317 if (name) { 318 snprintf(thread->name, sizeof(thread->name), "%s", name); 319 } else { 320 snprintf(thread->name, sizeof(thread->name), "%p", thread); 321 } 322 323 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name); 324 325 pthread_mutex_lock(&g_devlist_mutex); 326 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 327 g_thread_count++; 328 pthread_mutex_unlock(&g_devlist_mutex); 329 330 if (g_new_thread_fn) { 331 rc = g_new_thread_fn(thread); 332 if (rc != 0) { 333 _free_thread(thread); 334 return NULL; 335 } 336 } 337 338 return thread; 339 } 340 341 void 342 spdk_set_thread(struct spdk_thread *thread) 343 { 344 tls_thread = thread; 345 } 346 347 void 348 spdk_thread_exit(struct spdk_thread *thread) 349 { 350 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name); 351 352 assert(tls_thread == thread); 353 354 thread->exit = true; 355 } 356 357 void 358 spdk_thread_destroy(struct spdk_thread *thread) 359 { 360 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name); 361 362 assert(thread->exit == true); 363 364 if (tls_thread == thread) { 365 tls_thread = NULL; 366 } 367 368 _free_thread(thread); 369 } 370 371 void * 372 spdk_thread_get_ctx(struct spdk_thread *thread) 373 { 374 if (g_ctx_sz > 0) { 375 return thread->ctx; 376 } 377 378 return NULL; 379 } 380 381 struct spdk_cpuset * 382 spdk_thread_get_cpumask(struct spdk_thread *thread) 383 { 384 return &thread->cpumask; 385 } 386 387 struct spdk_thread * 388 spdk_thread_get_from_ctx(void *ctx) 389 { 390 if (ctx == NULL) { 391 assert(false); 392 return NULL; 393 } 394 395 assert(g_ctx_sz > 0); 396 397 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 398 } 399 400 static inline uint32_t 401 _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 402 { 403 unsigned count, i; 404 void *messages[SPDK_MSG_BATCH_SIZE]; 405 406 #ifdef DEBUG 407 /* 408 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 409 * so we will never actually read uninitialized data from events, but just to be sure 410 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 411 */ 412 memset(messages, 0, sizeof(messages)); 413 #endif 414 415 if (max_msgs > 0) { 416 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 417 } else { 418 max_msgs = SPDK_MSG_BATCH_SIZE; 419 } 420 421 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 422 if (count == 0) { 423 return 0; 424 } 425 426 for (i = 0; i < count; i++) { 427 struct spdk_msg *msg = messages[i]; 428 429 assert(msg != NULL); 430 msg->fn(msg->arg); 431 432 if (thread->exit) { 433 break; 434 } 435 436 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 437 /* Insert the messages at the head. We want to re-use the hot 438 * ones. */ 439 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 440 thread->msg_cache_count++; 441 } else { 442 spdk_mempool_put(g_spdk_msg_mempool, msg); 443 } 444 } 445 446 return count; 447 } 448 449 static void 450 _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 451 { 452 struct spdk_poller *iter; 453 454 poller->next_run_tick = now + poller->period_ticks; 455 456 /* 457 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled 458 * run time. 459 */ 460 TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) { 461 if (iter->next_run_tick <= poller->next_run_tick) { 462 TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq); 463 return; 464 } 465 } 466 467 /* No earlier pollers were found, so this poller must be the new head */ 468 TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq); 469 } 470 471 static void 472 _spdk_thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 473 { 474 if (poller->period_ticks) { 475 _spdk_poller_insert_timer(thread, poller, spdk_get_ticks()); 476 } else { 477 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 478 } 479 } 480 481 int 482 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 483 { 484 uint32_t msg_count; 485 struct spdk_thread *orig_thread; 486 struct spdk_poller *poller, *tmp; 487 int rc = 0; 488 489 orig_thread = _get_thread(); 490 tls_thread = thread; 491 492 if (now == 0) { 493 now = spdk_get_ticks(); 494 } 495 496 msg_count = _spdk_msg_queue_run_batch(thread, max_msgs); 497 if (msg_count) { 498 rc = 1; 499 } 500 501 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 502 active_pollers_head, tailq, tmp) { 503 int poller_rc; 504 505 if (thread->exit) { 506 break; 507 } 508 509 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 510 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 511 free(poller); 512 continue; 513 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 514 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 515 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 516 poller->state = SPDK_POLLER_STATE_PAUSED; 517 continue; 518 } 519 520 poller->state = SPDK_POLLER_STATE_RUNNING; 521 poller_rc = poller->fn(poller->arg); 522 523 #ifdef DEBUG 524 if (poller_rc == -1) { 525 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller); 526 } 527 #endif 528 529 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 530 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 531 free(poller); 532 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 533 poller->state = SPDK_POLLER_STATE_WAITING; 534 } 535 536 if (poller_rc > rc) { 537 rc = poller_rc; 538 } 539 } 540 541 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) { 542 int timer_rc = 0; 543 544 if (thread->exit) { 545 break; 546 } 547 548 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 549 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 550 free(poller); 551 continue; 552 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 553 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 554 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 555 poller->state = SPDK_POLLER_STATE_PAUSED; 556 continue; 557 } 558 559 if (now < poller->next_run_tick) { 560 break; 561 } 562 563 poller->state = SPDK_POLLER_STATE_RUNNING; 564 timer_rc = poller->fn(poller->arg); 565 566 #ifdef DEBUG 567 if (timer_rc == -1) { 568 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller); 569 } 570 #endif 571 572 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 573 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 574 free(poller); 575 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 576 poller->state = SPDK_POLLER_STATE_WAITING; 577 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 578 _spdk_poller_insert_timer(thread, poller, now); 579 } 580 581 if (timer_rc > rc) { 582 rc = timer_rc; 583 } 584 } 585 586 if (rc == 0) { 587 /* Poller status idle */ 588 thread->stats.idle_tsc += now - thread->tsc_last; 589 } else if (rc > 0) { 590 /* Poller status busy */ 591 thread->stats.busy_tsc += now - thread->tsc_last; 592 } 593 thread->tsc_last = now; 594 595 tls_thread = orig_thread; 596 597 return rc; 598 } 599 600 uint64_t 601 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 602 { 603 struct spdk_poller *poller; 604 605 poller = TAILQ_FIRST(&thread->timer_pollers); 606 if (poller) { 607 return poller->next_run_tick; 608 } 609 610 return 0; 611 } 612 613 int 614 spdk_thread_has_active_pollers(struct spdk_thread *thread) 615 { 616 return !TAILQ_EMPTY(&thread->active_pollers); 617 } 618 619 static bool 620 _spdk_thread_has_unpaused_pollers(struct spdk_thread *thread) 621 { 622 if (TAILQ_EMPTY(&thread->active_pollers) && 623 TAILQ_EMPTY(&thread->timer_pollers)) { 624 return false; 625 } 626 627 return true; 628 } 629 630 bool 631 spdk_thread_has_pollers(struct spdk_thread *thread) 632 { 633 if (!_spdk_thread_has_unpaused_pollers(thread) && 634 TAILQ_EMPTY(&thread->paused_pollers)) { 635 return false; 636 } 637 638 return true; 639 } 640 641 bool 642 spdk_thread_is_idle(struct spdk_thread *thread) 643 { 644 if (spdk_ring_count(thread->messages) || 645 _spdk_thread_has_unpaused_pollers(thread)) { 646 return false; 647 } 648 649 return true; 650 } 651 652 uint32_t 653 spdk_thread_get_count(void) 654 { 655 /* 656 * Return cached value of the current thread count. We could acquire the 657 * lock and iterate through the TAILQ of threads to count them, but that 658 * count could still be invalidated after we release the lock. 659 */ 660 return g_thread_count; 661 } 662 663 struct spdk_thread * 664 spdk_get_thread(void) 665 { 666 struct spdk_thread *thread; 667 668 thread = _get_thread(); 669 if (!thread) { 670 SPDK_ERRLOG("No thread allocated\n"); 671 } 672 673 return thread; 674 } 675 676 const char * 677 spdk_thread_get_name(const struct spdk_thread *thread) 678 { 679 return thread->name; 680 } 681 682 int 683 spdk_thread_get_stats(struct spdk_thread_stats *stats) 684 { 685 struct spdk_thread *thread; 686 687 thread = _get_thread(); 688 if (!thread) { 689 SPDK_ERRLOG("No thread allocated\n"); 690 return -EINVAL; 691 } 692 693 if (stats == NULL) { 694 return -EINVAL; 695 } 696 697 *stats = thread->stats; 698 699 return 0; 700 } 701 702 int 703 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 704 { 705 struct spdk_thread *local_thread; 706 struct spdk_msg *msg; 707 int rc; 708 709 assert(thread != NULL); 710 711 local_thread = _get_thread(); 712 713 msg = NULL; 714 if (local_thread != NULL) { 715 if (local_thread->msg_cache_count > 0) { 716 msg = SLIST_FIRST(&local_thread->msg_cache); 717 assert(msg != NULL); 718 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 719 local_thread->msg_cache_count--; 720 } 721 } 722 723 if (msg == NULL) { 724 msg = spdk_mempool_get(g_spdk_msg_mempool); 725 if (!msg) { 726 SPDK_ERRLOG("msg could not be allocated\n"); 727 return -ENOMEM; 728 } 729 } 730 731 msg->fn = fn; 732 msg->arg = ctx; 733 734 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 735 if (rc != 1) { 736 SPDK_ERRLOG("msg could not be enqueued\n"); 737 spdk_mempool_put(g_spdk_msg_mempool, msg); 738 return -EIO; 739 } 740 741 return 0; 742 } 743 744 struct spdk_poller * 745 spdk_poller_register(spdk_poller_fn fn, 746 void *arg, 747 uint64_t period_microseconds) 748 { 749 struct spdk_thread *thread; 750 struct spdk_poller *poller; 751 uint64_t quotient, remainder, ticks; 752 753 thread = spdk_get_thread(); 754 if (!thread) { 755 assert(false); 756 return NULL; 757 } 758 759 poller = calloc(1, sizeof(*poller)); 760 if (poller == NULL) { 761 SPDK_ERRLOG("Poller memory allocation failed\n"); 762 return NULL; 763 } 764 765 poller->state = SPDK_POLLER_STATE_WAITING; 766 poller->fn = fn; 767 poller->arg = arg; 768 769 if (period_microseconds) { 770 quotient = period_microseconds / SPDK_SEC_TO_USEC; 771 remainder = period_microseconds % SPDK_SEC_TO_USEC; 772 ticks = spdk_get_ticks_hz(); 773 774 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 775 } else { 776 poller->period_ticks = 0; 777 } 778 779 _spdk_thread_insert_poller(thread, poller); 780 781 return poller; 782 } 783 784 void 785 spdk_poller_unregister(struct spdk_poller **ppoller) 786 { 787 struct spdk_thread *thread; 788 struct spdk_poller *poller; 789 790 poller = *ppoller; 791 if (poller == NULL) { 792 return; 793 } 794 795 *ppoller = NULL; 796 797 thread = spdk_get_thread(); 798 if (!thread) { 799 assert(false); 800 return; 801 } 802 803 /* If the poller was paused, put it on the active_pollers list so that 804 * its unregistration can be processed by spdk_thread_poll(). 805 */ 806 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 807 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 808 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 809 poller->period_ticks = 0; 810 } 811 812 /* Simply set the state to unregistered. The poller will get cleaned up 813 * in a subsequent call to spdk_thread_poll(). 814 */ 815 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 816 } 817 818 void 819 spdk_poller_pause(struct spdk_poller *poller) 820 { 821 struct spdk_thread *thread; 822 823 if (poller->state == SPDK_POLLER_STATE_PAUSED || 824 poller->state == SPDK_POLLER_STATE_PAUSING) { 825 return; 826 } 827 828 thread = spdk_get_thread(); 829 if (!thread) { 830 assert(false); 831 return; 832 } 833 834 /* If a poller is paused from within itself, we can immediately move it 835 * on the paused_pollers list. Otherwise we just set its state to 836 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it. It 837 * allows a poller to be paused from another one's context without 838 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration. 839 */ 840 if (poller->state != SPDK_POLLER_STATE_RUNNING) { 841 poller->state = SPDK_POLLER_STATE_PAUSING; 842 } else { 843 if (poller->period_ticks > 0) { 844 TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); 845 } else { 846 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 847 } 848 849 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 850 poller->state = SPDK_POLLER_STATE_PAUSED; 851 } 852 } 853 854 void 855 spdk_poller_resume(struct spdk_poller *poller) 856 { 857 struct spdk_thread *thread; 858 859 if (poller->state != SPDK_POLLER_STATE_PAUSED && 860 poller->state != SPDK_POLLER_STATE_PAUSING) { 861 return; 862 } 863 864 thread = spdk_get_thread(); 865 if (!thread) { 866 assert(false); 867 return; 868 } 869 870 /* If a poller is paused it has to be removed from the paused pollers 871 * list and put on the active / timer list depending on its 872 * period_ticks. If a poller is still in the process of being paused, 873 * we just need to flip its state back to waiting, as it's already on 874 * the appropriate list. 875 */ 876 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 877 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 878 _spdk_thread_insert_poller(thread, poller); 879 } 880 881 poller->state = SPDK_POLLER_STATE_WAITING; 882 } 883 884 struct call_thread { 885 struct spdk_thread *cur_thread; 886 spdk_msg_fn fn; 887 void *ctx; 888 889 struct spdk_thread *orig_thread; 890 spdk_msg_fn cpl; 891 }; 892 893 static void 894 spdk_on_thread(void *ctx) 895 { 896 struct call_thread *ct = ctx; 897 898 ct->fn(ct->ctx); 899 900 pthread_mutex_lock(&g_devlist_mutex); 901 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 902 pthread_mutex_unlock(&g_devlist_mutex); 903 904 if (!ct->cur_thread) { 905 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n"); 906 907 spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 908 free(ctx); 909 } else { 910 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n", 911 ct->cur_thread->name); 912 913 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); 914 } 915 } 916 917 void 918 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 919 { 920 struct call_thread *ct; 921 struct spdk_thread *thread; 922 923 ct = calloc(1, sizeof(*ct)); 924 if (!ct) { 925 SPDK_ERRLOG("Unable to perform thread iteration\n"); 926 cpl(ctx); 927 return; 928 } 929 930 ct->fn = fn; 931 ct->ctx = ctx; 932 ct->cpl = cpl; 933 934 thread = _get_thread(); 935 if (!thread) { 936 SPDK_ERRLOG("No thread allocated\n"); 937 free(ct); 938 cpl(ctx); 939 return; 940 } 941 ct->orig_thread = thread; 942 943 pthread_mutex_lock(&g_devlist_mutex); 944 ct->cur_thread = TAILQ_FIRST(&g_threads); 945 pthread_mutex_unlock(&g_devlist_mutex); 946 947 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n", 948 ct->orig_thread->name); 949 950 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); 951 } 952 953 void 954 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 955 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 956 const char *name) 957 { 958 struct io_device *dev, *tmp; 959 struct spdk_thread *thread; 960 961 assert(io_device != NULL); 962 assert(create_cb != NULL); 963 assert(destroy_cb != NULL); 964 965 thread = spdk_get_thread(); 966 if (!thread) { 967 SPDK_ERRLOG("called from non-SPDK thread\n"); 968 assert(false); 969 return; 970 } 971 972 dev = calloc(1, sizeof(struct io_device)); 973 if (dev == NULL) { 974 SPDK_ERRLOG("could not allocate io_device\n"); 975 return; 976 } 977 978 dev->io_device = io_device; 979 if (name) { 980 snprintf(dev->name, sizeof(dev->name), "%s", name); 981 } else { 982 snprintf(dev->name, sizeof(dev->name), "%p", dev); 983 } 984 dev->create_cb = create_cb; 985 dev->destroy_cb = destroy_cb; 986 dev->unregister_cb = NULL; 987 dev->ctx_size = ctx_size; 988 dev->for_each_count = 0; 989 dev->unregistered = false; 990 dev->refcnt = 0; 991 992 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n", 993 dev->name, dev->io_device, thread->name); 994 995 pthread_mutex_lock(&g_devlist_mutex); 996 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 997 if (tmp->io_device == io_device) { 998 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 999 io_device, tmp->name, dev->name); 1000 free(dev); 1001 pthread_mutex_unlock(&g_devlist_mutex); 1002 return; 1003 } 1004 } 1005 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 1006 pthread_mutex_unlock(&g_devlist_mutex); 1007 } 1008 1009 static void 1010 _finish_unregister(void *arg) 1011 { 1012 struct io_device *dev = arg; 1013 1014 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n", 1015 dev->name, dev->io_device, dev->unregister_thread->name); 1016 1017 dev->unregister_cb(dev->io_device); 1018 free(dev); 1019 } 1020 1021 static void 1022 _spdk_io_device_free(struct io_device *dev) 1023 { 1024 if (dev->unregister_cb == NULL) { 1025 free(dev); 1026 } else { 1027 assert(dev->unregister_thread != NULL); 1028 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n", 1029 dev->name, dev->io_device, dev->unregister_thread->name); 1030 spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 1031 } 1032 } 1033 1034 void 1035 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 1036 { 1037 struct io_device *dev; 1038 uint32_t refcnt; 1039 struct spdk_thread *thread; 1040 1041 thread = spdk_get_thread(); 1042 if (!thread) { 1043 SPDK_ERRLOG("called from non-SPDK thread\n"); 1044 assert(false); 1045 return; 1046 } 1047 1048 pthread_mutex_lock(&g_devlist_mutex); 1049 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 1050 if (dev->io_device == io_device) { 1051 break; 1052 } 1053 } 1054 1055 if (!dev) { 1056 SPDK_ERRLOG("io_device %p not found\n", io_device); 1057 assert(false); 1058 pthread_mutex_unlock(&g_devlist_mutex); 1059 return; 1060 } 1061 1062 if (dev->for_each_count > 0) { 1063 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 1064 dev->name, io_device, dev->for_each_count); 1065 pthread_mutex_unlock(&g_devlist_mutex); 1066 return; 1067 } 1068 1069 dev->unregister_cb = unregister_cb; 1070 dev->unregistered = true; 1071 TAILQ_REMOVE(&g_io_devices, dev, tailq); 1072 refcnt = dev->refcnt; 1073 dev->unregister_thread = thread; 1074 pthread_mutex_unlock(&g_devlist_mutex); 1075 1076 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n", 1077 dev->name, dev->io_device, thread->name); 1078 1079 if (refcnt > 0) { 1080 /* defer deletion */ 1081 return; 1082 } 1083 1084 _spdk_io_device_free(dev); 1085 } 1086 1087 struct spdk_io_channel * 1088 spdk_get_io_channel(void *io_device) 1089 { 1090 struct spdk_io_channel *ch; 1091 struct spdk_thread *thread; 1092 struct io_device *dev; 1093 int rc; 1094 1095 pthread_mutex_lock(&g_devlist_mutex); 1096 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 1097 if (dev->io_device == io_device) { 1098 break; 1099 } 1100 } 1101 if (dev == NULL) { 1102 SPDK_ERRLOG("could not find io_device %p\n", io_device); 1103 pthread_mutex_unlock(&g_devlist_mutex); 1104 return NULL; 1105 } 1106 1107 thread = _get_thread(); 1108 if (!thread) { 1109 SPDK_ERRLOG("No thread allocated\n"); 1110 pthread_mutex_unlock(&g_devlist_mutex); 1111 return NULL; 1112 } 1113 1114 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1115 if (ch->dev == dev) { 1116 ch->ref++; 1117 1118 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1119 ch, dev->name, dev->io_device, thread->name, ch->ref); 1120 1121 /* 1122 * An I/O channel already exists for this device on this 1123 * thread, so return it. 1124 */ 1125 pthread_mutex_unlock(&g_devlist_mutex); 1126 return ch; 1127 } 1128 } 1129 1130 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 1131 if (ch == NULL) { 1132 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 1133 pthread_mutex_unlock(&g_devlist_mutex); 1134 return NULL; 1135 } 1136 1137 ch->dev = dev; 1138 ch->destroy_cb = dev->destroy_cb; 1139 ch->thread = thread; 1140 ch->ref = 1; 1141 ch->destroy_ref = 0; 1142 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 1143 1144 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1145 ch, dev->name, dev->io_device, thread->name, ch->ref); 1146 1147 dev->refcnt++; 1148 1149 pthread_mutex_unlock(&g_devlist_mutex); 1150 1151 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 1152 if (rc != 0) { 1153 pthread_mutex_lock(&g_devlist_mutex); 1154 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1155 dev->refcnt--; 1156 free(ch); 1157 pthread_mutex_unlock(&g_devlist_mutex); 1158 return NULL; 1159 } 1160 1161 return ch; 1162 } 1163 1164 static void 1165 _spdk_put_io_channel(void *arg) 1166 { 1167 struct spdk_io_channel *ch = arg; 1168 bool do_remove_dev = true; 1169 struct spdk_thread *thread; 1170 1171 thread = spdk_get_thread(); 1172 if (!thread) { 1173 SPDK_ERRLOG("called from non-SPDK thread\n"); 1174 assert(false); 1175 return; 1176 } 1177 1178 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1179 "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n", 1180 ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name); 1181 1182 assert(ch->thread == thread); 1183 1184 ch->destroy_ref--; 1185 1186 if (ch->ref > 0 || ch->destroy_ref > 0) { 1187 /* 1188 * Another reference to the associated io_device was requested 1189 * after this message was sent but before it had a chance to 1190 * execute. 1191 */ 1192 return; 1193 } 1194 1195 pthread_mutex_lock(&g_devlist_mutex); 1196 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1197 pthread_mutex_unlock(&g_devlist_mutex); 1198 1199 /* Don't hold the devlist mutex while the destroy_cb is called. */ 1200 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 1201 1202 pthread_mutex_lock(&g_devlist_mutex); 1203 ch->dev->refcnt--; 1204 1205 if (!ch->dev->unregistered) { 1206 do_remove_dev = false; 1207 } 1208 1209 if (ch->dev->refcnt > 0) { 1210 do_remove_dev = false; 1211 } 1212 1213 pthread_mutex_unlock(&g_devlist_mutex); 1214 1215 if (do_remove_dev) { 1216 _spdk_io_device_free(ch->dev); 1217 } 1218 free(ch); 1219 } 1220 1221 void 1222 spdk_put_io_channel(struct spdk_io_channel *ch) 1223 { 1224 SPDK_DEBUGLOG(SPDK_LOG_THREAD, 1225 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1226 ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref); 1227 1228 ch->ref--; 1229 1230 if (ch->ref == 0) { 1231 ch->destroy_ref++; 1232 spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch); 1233 } 1234 } 1235 1236 struct spdk_io_channel * 1237 spdk_io_channel_from_ctx(void *ctx) 1238 { 1239 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 1240 } 1241 1242 struct spdk_thread * 1243 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 1244 { 1245 return ch->thread; 1246 } 1247 1248 struct spdk_io_channel_iter { 1249 void *io_device; 1250 struct io_device *dev; 1251 spdk_channel_msg fn; 1252 int status; 1253 void *ctx; 1254 struct spdk_io_channel *ch; 1255 1256 struct spdk_thread *cur_thread; 1257 1258 struct spdk_thread *orig_thread; 1259 spdk_channel_for_each_cpl cpl; 1260 }; 1261 1262 void * 1263 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 1264 { 1265 return i->io_device; 1266 } 1267 1268 struct spdk_io_channel * 1269 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 1270 { 1271 return i->ch; 1272 } 1273 1274 void * 1275 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 1276 { 1277 return i->ctx; 1278 } 1279 1280 static void 1281 _call_completion(void *ctx) 1282 { 1283 struct spdk_io_channel_iter *i = ctx; 1284 1285 if (i->cpl != NULL) { 1286 i->cpl(i, i->status); 1287 } 1288 free(i); 1289 } 1290 1291 static void 1292 _call_channel(void *ctx) 1293 { 1294 struct spdk_io_channel_iter *i = ctx; 1295 struct spdk_io_channel *ch; 1296 1297 /* 1298 * It is possible that the channel was deleted before this 1299 * message had a chance to execute. If so, skip calling 1300 * the fn() on this thread. 1301 */ 1302 pthread_mutex_lock(&g_devlist_mutex); 1303 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 1304 if (ch->dev->io_device == i->io_device) { 1305 break; 1306 } 1307 } 1308 pthread_mutex_unlock(&g_devlist_mutex); 1309 1310 if (ch) { 1311 i->fn(i); 1312 } else { 1313 spdk_for_each_channel_continue(i, 0); 1314 } 1315 } 1316 1317 void 1318 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 1319 spdk_channel_for_each_cpl cpl) 1320 { 1321 struct spdk_thread *thread; 1322 struct spdk_io_channel *ch; 1323 struct spdk_io_channel_iter *i; 1324 int rc __attribute__((unused)); 1325 1326 i = calloc(1, sizeof(*i)); 1327 if (!i) { 1328 SPDK_ERRLOG("Unable to allocate iterator\n"); 1329 return; 1330 } 1331 1332 i->io_device = io_device; 1333 i->fn = fn; 1334 i->ctx = ctx; 1335 i->cpl = cpl; 1336 1337 pthread_mutex_lock(&g_devlist_mutex); 1338 i->orig_thread = _get_thread(); 1339 1340 TAILQ_FOREACH(thread, &g_threads, tailq) { 1341 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1342 if (ch->dev->io_device == io_device) { 1343 ch->dev->for_each_count++; 1344 i->dev = ch->dev; 1345 i->cur_thread = thread; 1346 i->ch = ch; 1347 pthread_mutex_unlock(&g_devlist_mutex); 1348 rc = spdk_thread_send_msg(thread, _call_channel, i); 1349 assert(rc == 0); 1350 return; 1351 } 1352 } 1353 } 1354 1355 pthread_mutex_unlock(&g_devlist_mutex); 1356 1357 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1358 assert(rc == 0); 1359 } 1360 1361 void 1362 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 1363 { 1364 struct spdk_thread *thread; 1365 struct spdk_io_channel *ch; 1366 1367 assert(i->cur_thread == spdk_get_thread()); 1368 1369 i->status = status; 1370 1371 pthread_mutex_lock(&g_devlist_mutex); 1372 if (status) { 1373 goto end; 1374 } 1375 thread = TAILQ_NEXT(i->cur_thread, tailq); 1376 while (thread) { 1377 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1378 if (ch->dev->io_device == i->io_device) { 1379 i->cur_thread = thread; 1380 i->ch = ch; 1381 pthread_mutex_unlock(&g_devlist_mutex); 1382 spdk_thread_send_msg(thread, _call_channel, i); 1383 return; 1384 } 1385 } 1386 thread = TAILQ_NEXT(thread, tailq); 1387 } 1388 1389 end: 1390 i->dev->for_each_count--; 1391 i->ch = NULL; 1392 pthread_mutex_unlock(&g_devlist_mutex); 1393 1394 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1395 } 1396 1397 1398 SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD) 1399