/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
static size_t g_ctx_sz = 0;
/* A monotonically increasing ID is assigned to each created thread, starting at 1.
 * Once the ID wraps around to 0 (i.e. UINT64_MAX threads have been created), further
 * thread creation is not allowed and the SPDK application must be restarted.
 */
static uint64_t g_thread_id = 1;

struct io_device {
	void				*io_device;
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb	create_cb;
	spdk_io_channel_destroy_cb	destroy_cb;
	spdk_io_device_unregister_cb	unregister_cb;
	struct spdk_thread		*unregister_thread;
	uint32_t			ctx_size;
	uint32_t			for_each_count;
	TAILQ_ENTRY(io_device)		tailq;

	uint32_t			refcnt;

	bool				unregistered;
};

static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);

struct spdk_msg {
	spdk_msg_fn		fn;
	void			*arg;

	SLIST_ENTRY(spdk_msg)	link;
};

#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static __thread struct spdk_thread *tls_thread = NULL;

static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}

static int
_thread_lib_init(size_t ctx_sz)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	g_ctx_sz = ctx_sz;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			     sizeof(struct spdk_msg),
			     0, /* No cache. We do our own. */
			     SPDK_ENV_SOCKET_ID_ANY);

	if (!g_spdk_msg_mempool) {
		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
		return -1;
	}

	return 0;
}

int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);

	if (new_thread_fn == NULL) {
		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
	} else {
		g_new_thread_fn = new_thread_fn;
	}

	return _thread_lib_init(ctx_sz);
}

int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
			 spdk_thread_op_supported_fn thread_op_supported_fn,
			 size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);
	assert(g_thread_op_supported_fn == NULL);

	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
		SPDK_ERRLOG("thread_op_fn and thread_op_supported_fn must be defined or undefined together\n");
		return -EINVAL;
	}

	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
	} else {
		g_thread_op_fn = thread_op_fn;
		g_thread_op_supported_fn = thread_op_supported_fn;
	}

	return _thread_lib_init(ctx_sz);
}

void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}

	g_new_thread_fn = NULL;
	g_thread_op_fn = NULL;
	g_thread_op_supported_fn = NULL;
	g_ctx_sz = 0;
}
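
/*
 * Usage sketch (illustrative, not part of the library): a framework that embeds
 * this module typically calls spdk_thread_lib_init() once before creating any
 * threads and spdk_thread_lib_fini() after the last thread is destroyed. The
 * context type and callback below are hypothetical.
 *
 *	struct my_thread_ctx { uint32_t lcore; };	// hypothetical per-thread context
 *
 *	static int
 *	my_new_thread_cb(struct spdk_thread *thread)
 *	{
 *		// schedule the new thread onto a framework-managed core
 *		return 0;
 *	}
 *
 *	spdk_thread_lib_init(my_new_thread_cb, sizeof(struct my_thread_ctx));
 *	...
 *	spdk_thread_lib_fini();
 */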

static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (thread->interrupt_mode) {
		thread_interrupt_destroy(thread);
	}

	spdk_ring_free(thread->messages);
	free(thread);
}

struct spdk_thread *
spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc = 0, i;

	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	TAILQ_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	TAILQ_INIT(&thread->timed_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	if (g_thread_id == 0) {
		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		/* This thread was never added to g_threads, so _free_thread() cannot
		 * be used here; release its resources directly instead. */
		while (!SLIST_EMPTY(&thread->msg_cache)) {
			struct spdk_msg *msg = SLIST_FIRST(&thread->msg_cache);

			SLIST_REMOVE_HEAD(&thread->msg_cache, link);
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
		spdk_ring_free(thread->messages);
		free(thread);
		return NULL;
	}
	thread->id = g_thread_id++;
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
		      thread->id, thread->name);

	if (spdk_interrupt_mode_is_enabled()) {
		thread->interrupt_mode = true;
		rc = thread_interrupt_create(thread);
		if (rc != 0) {
			_free_thread(thread);
			return NULL;
		}
	}

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
	}

	if (rc != 0) {
		_free_thread(thread);
		return NULL;
	}

	thread->state = SPDK_THREAD_STATE_RUNNING;

	return thread;
}

void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}

static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s exit timed out, moving it to the exited state forcefully\n",
			    thread->name);
		goto exited;
	}

	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

	if (thread->pending_unregister_count > 0) {
		SPDK_INFOLOG(thread,
			     "thread %s is still unregistering io_devices\n",
			     thread->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
}

int
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
		SPDK_INFOLOG(thread,
			     "thread %s is already exiting\n",
			     thread->name);
		return 0;
	}

	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
	thread->state = SPDK_THREAD_STATE_EXITING;
	return 0;
}

bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_EXITED;
}

void
spdk_thread_destroy(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);

	assert(thread->state == SPDK_THREAD_STATE_EXITED);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}
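
/*
 * Lifecycle sketch (illustrative): a typical owner creates a thread, drives it
 * with spdk_thread_poll() from its own scheduling loop and, once the thread has
 * been asked to exit (spdk_thread_exit() must be called on the thread itself),
 * keeps polling until it reaches the exited state before destroying it.
 *
 *	struct spdk_thread *thread = spdk_thread_create("worker", NULL);
 *
 *	while (running) {
 *		spdk_thread_poll(thread, 0, 0);
 *	}
 *
 *	// on the thread: spdk_thread_exit(spdk_get_thread());
 *	while (!spdk_thread_is_exited(thread)) {
 *		spdk_thread_poll(thread, 0, 0);
 *	}
 *	spdk_thread_destroy(thread);
 */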

void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
	if (g_ctx_sz > 0) {
		return thread->ctx;
	}

	return NULL;
}

struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}

int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;

	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
		assert(false);
		return -ENOTSUP;
	}

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Called from non-SPDK thread\n");
		assert(false);
		return -EINVAL;
	}

	spdk_cpuset_copy(&thread->cpumask, cpumask);

	/* Invoke framework's reschedule operation. If this function is called multiple times
	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
	 * reschedule operation.
	 */
	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

	return 0;
}

struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}
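
/*
 * The ctx area is allocated immediately after struct spdk_thread, so the two
 * conversions above are symmetric. Illustrative sketch; the context type is
 * hypothetical and its layout is whatever the framework passed as ctx_sz at
 * library init time.
 *
 *	struct my_thread_ctx *ctx = spdk_thread_get_ctx(thread);
 *
 *	assert(spdk_thread_get_from_ctx(ctx) == thread);
 */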

static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];
	uint64_t notify = 1;
	int rc;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from messages, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}
	if (thread->interrupt_mode) {
		/* There may be a race between this consumer's acknowledgement (read)
		 * and another producer's notification (write), so acknowledge first
		 * and re-notify below if messages remain in the ring. This avoids
		 * missing a message notification.
		 */
		rc = read(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0 && errno != EAGAIN) {
			SPDK_ERRLOG("failed to acknowledge msg_queue: %s.\n", spdk_strerror(errno));
		}
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (thread->interrupt_mode && spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}

static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled
	 * run time.
	 */
	TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq);
}

static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store end time to use it as start time of the next spdk_thread_poll(). */
	thread->tsc_last = end;
}

static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	thread->tsc_last = now;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

		poller->run_count++;
		if (poller_rc > 0) {
			poller->busy_count++;
		}

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
		}
#endif

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
		}

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

		poller->run_count++;
		if (timer_rc > 0) {
			poller->busy_count++;
		}

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
		}
#endif

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			poller_insert_timer(thread, poller, now);
		}

		if (timer_rc > rc) {
			rc = timer_rc;
		}
	}

	return rc;
}

int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	struct spdk_thread *orig_thread;
	int rc;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	if (!thread->interrupt_mode) {
		rc = thread_poll(thread, max_msgs, now);
	} else {
		/* Non-blocking wait on the thread's fd_group */
		rc = spdk_fd_group_wait(thread->fgrp, 0);
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
		thread_exit(thread, now);
	}

	thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}

uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = TAILQ_FIRST(&thread->timed_pollers);
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}

int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}

static bool
thread_has_unpaused_pollers(struct spdk_thread *thread)
{
	if (TAILQ_EMPTY(&thread->active_pollers) &&
	    TAILQ_EMPTY(&thread->timed_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
	if (!thread_has_unpaused_pollers(thread) &&
	    TAILQ_EMPTY(&thread->paused_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
	if (spdk_ring_count(thread->messages) ||
	    thread_has_unpaused_pollers(thread) ||
	    thread->critical_msg != NULL) {
		return false;
	}

	return true;
}
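
/*
 * Sketch of how a framework might combine these queries (illustrative): when a
 * thread is idle, the caller can sleep until the next timed poller is due
 * instead of busy-polling. The tick-to-microsecond conversion assumes
 * spdk_get_ticks_hz() is constant.
 *
 *	uint64_t next = spdk_thread_next_poller_expiration(thread);
 *	uint64_t now = spdk_get_ticks();
 *
 *	if (spdk_thread_is_idle(thread) && next > now) {
 *		uint64_t usec = (next - now) * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
 *		// sleep for usec, then call spdk_thread_poll() again
 *	}
 */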

uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count. We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}

struct spdk_thread *
spdk_get_thread(void)
{
	return _get_thread();
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

uint64_t
spdk_thread_get_id(const struct spdk_thread *thread)
{
	return thread->id;
}

struct spdk_thread *
spdk_thread_get_by_id(uint64_t id)
{
	struct spdk_thread *thread;

	if (id == 0 || id >= g_thread_id) {
		SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
		return NULL;
	}
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		if (thread->id == id) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);
	return thread;
}

int
spdk_thread_get_stats(struct spdk_thread_stats *stats)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		return -EINVAL;
	}

	if (stats == NULL) {
		return -EINVAL;
	}

	*stats = thread->stats;

	return 0;
}

uint64_t
spdk_thread_get_last_tsc(struct spdk_thread *thread)
{
	if (thread == NULL) {
		thread = _get_thread();
	}

	return thread->tsc_last;
}

int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	if (thread->interrupt_mode) {
		uint64_t notify = 1;

		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
			return -EIO;
		}
	}

	return 0;
}

int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
	spdk_msg_fn expected = NULL;

	if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
					__ATOMIC_SEQ_CST)) {
		if (thread->interrupt_mode) {
			uint64_t notify = 1;
			int rc;

			rc = write(thread->msg_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
				return -EIO;
			}
		}

		return 0;
	}

	return -EIO;
}
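
/*
 * Message-passing sketch (illustrative): fn runs later, on the target thread,
 * with ctx passed through unchanged. The callback below is hypothetical.
 *
 *	static void
 *	say_hello(void *ctx)
 *	{
 *		SPDK_NOTICELOG("hello from %s\n",
 *			       spdk_thread_get_name(spdk_get_thread()));
 *	}
 *
 *	if (spdk_thread_send_msg(target, say_hello, NULL) != 0) {
 *		// target already exited, or no spdk_msg could be allocated
 *	}
 */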

#ifdef __linux__
static int
interrupt_timerfd_prepare(uint64_t period_microseconds)
{
	int timerfd;
	int ret;
	struct itimerspec new_tv;
	uint64_t period_seconds;
	uint64_t period_nanoseconds;

	if (period_microseconds == 0) {
		return -EINVAL;
	}

	period_seconds = period_microseconds / SPDK_SEC_TO_USEC;
	period_nanoseconds = period_microseconds % SPDK_SEC_TO_USEC * 1000;

	new_tv.it_value.tv_sec = period_seconds;
	new_tv.it_value.tv_nsec = period_nanoseconds;

	new_tv.it_interval.tv_sec = period_seconds;
	new_tv.it_interval.tv_nsec = period_nanoseconds;

	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
	if (timerfd < 0) {
		return -errno;
	}

	ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
	if (ret < 0) {
		close(timerfd);
		return -errno;
	}

	return timerfd;
}
#else
static int
interrupt_timerfd_prepare(uint64_t period_microseconds)
{
	return -ENOTSUP;
}
#endif

static int
interrupt_timerfd_process(void *arg)
{
	struct spdk_poller *poller = arg;
	uint64_t exp;
	int rc;

	/* clear the level of interval timer */
	rc = read(poller->timerfd, &exp, sizeof(exp));
	if (rc < 0) {
		/* read() reports failure via errno, not its return value;
		 * EAGAIN just means the timer has not fired yet. */
		if (errno == EAGAIN) {
			return 0;
		}

		return -errno;
	}

	return poller->fn(poller->arg);
}

static int
thread_interrupt_register_timerfd(struct spdk_fd_group *fgrp,
				  uint64_t period_microseconds,
				  struct spdk_poller *poller)
{
	int timerfd;
	int rc;

	timerfd = interrupt_timerfd_prepare(period_microseconds);
	if (timerfd < 0) {
		return timerfd;
	}

	rc = spdk_fd_group_add(fgrp, timerfd,
			       interrupt_timerfd_process, poller);
	if (rc < 0) {
		close(timerfd);
		return rc;
	}

	return timerfd;
}

static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;
	uint64_t quotient, remainder, ticks;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;

	if (period_microseconds) {
		quotient = period_microseconds / SPDK_SEC_TO_USEC;
		remainder = period_microseconds % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		poller->period_ticks = 0;
	}

	if (thread->interrupt_mode && period_microseconds != 0) {
		int rc;

		poller->timerfd = -1;
		rc = thread_interrupt_register_timerfd(thread->fgrp, period_microseconds, poller);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to register timerfd for periodic poller: %s\n", spdk_strerror(-rc));
			free(poller);
			return NULL;
		}
		poller->timerfd = rc;
	}

	thread_insert_poller(thread, poller);

	return poller;
}
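
/*
 * Worked example of the period conversion above (illustrative numbers): with
 * spdk_get_ticks_hz() == 2,000,000,000 (a 2 GHz TSC) and a period of
 * 1,500,000 us, quotient == 1 and remainder == 500,000, so
 * period_ticks == 2e9 * 1 + (2e9 * 500,000) / 1,000,000 == 3,000,000,000.
 * Splitting the division this way keeps the intermediate product bounded by
 * ticks * SPDK_SEC_TO_USEC, avoiding uint64_t overflow for large periods.
 */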

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		SPDK_ERRLOG("unregistering poller from a different thread than the one that called spdk_poller_register()\n");
		assert(false);
		return;
	}

	if (thread->interrupt_mode && poller->timerfd >= 0) {
		spdk_fd_group_remove(thread->fgrp, poller->timerfd);
		close(poller->timerfd);
		poller->timerfd = -1;
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}

void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state == SPDK_POLLER_STATE_PAUSED ||
	    poller->state == SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused from within itself, we can immediately move it
	 * to the paused_pollers list. Otherwise we just set its state to
	 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it. This
	 * allows a poller to be paused from another poller's context without
	 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration.
	 */
	if (poller->state != SPDK_POLLER_STATE_RUNNING) {
		poller->state = SPDK_POLLER_STATE_PAUSING;
	} else {
		if (poller->period_ticks > 0) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
		} else {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		}

		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
	}
}

void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state != SPDK_POLLER_STATE_PAUSED &&
	    poller->state != SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused it has to be removed from the paused pollers
	 * list and put on the active / timer list depending on its
	 * period_ticks. If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list.
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		thread_insert_poller(thread, poller);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
}
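
/*
 * Poller usage sketch (illustrative; the callback and helper are hypothetical,
 * and the SPDK_POLLER_BUSY/SPDK_POLLER_IDLE return values are assumed from
 * spdk/thread.h). A poller runs on the thread that registered it until it is
 * unregistered.
 *
 *	static int
 *	poll_completions(void *arg)
 *	{
 *		int nr = process_completions(arg);	// hypothetical helper
 *
 *		return nr > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
 *	}
 *
 *	struct spdk_poller *p;
 *
 *	p = spdk_poller_register(poll_completions, dev, 0);	// run every iteration
 *	...
 *	spdk_poller_pause(p);
 *	spdk_poller_resume(p);
 *	spdk_poller_unregister(&p);	// also sets p to NULL
 */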

const char *
spdk_poller_state_str(enum spdk_poller_state state)
{
	switch (state) {
	case SPDK_POLLER_STATE_WAITING:
		return "waiting";
	case SPDK_POLLER_STATE_RUNNING:
		return "running";
	case SPDK_POLLER_STATE_UNREGISTERED:
		return "unregistered";
	case SPDK_POLLER_STATE_PAUSING:
		return "pausing";
	case SPDK_POLLER_STATE_PAUSED:
		return "paused";
	default:
		return NULL;
	}
}

struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};

static void
_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	assert(rc == 0);
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}
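
/*
 * Iteration sketch (illustrative): fn runs once on every thread in turn, then
 * cpl runs on the thread that started the iteration. Both callbacks below are
 * hypothetical.
 *
 *	static void
 *	flush_cache(void *ctx)
 *	{
 *		// runs on each spdk_thread in turn
 *	}
 *
 *	static void
 *	flush_done(void *ctx)
 *	{
 *		// runs on the original thread after all threads were visited
 *	}
 *
 *	spdk_for_each_thread(flush_cache, NULL, flush_done);
 */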

void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}

static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	assert(thread == dev->unregister_thread);

	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	assert(thread->pending_unregister_count > 0);
	thread->pending_unregister_count--;

	dev->unregister_cb(dev->io_device);
	free(dev);
}

static void
io_device_free(struct io_device *dev)
{
	int rc __attribute__((unused));

	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
		assert(rc == 0);
	}
}

void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (unregister_cb) {
		thread->pending_unregister_count++;
	}

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}

const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}
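
/*
 * io_device sketch (illustrative): any unique pointer may serve as the
 * io_device handle; create_cb/destroy_cb construct and tear down the
 * per-thread channel context. All names below are hypothetical.
 *
 *	static int
 *	disk_channel_create(void *io_device, void *ctx_buf)
 *	{
 *		// initialize the per-thread struct disk_channel in ctx_buf
 *		return 0;
 *	}
 *
 *	static void
 *	disk_channel_destroy(void *io_device, void *ctx_buf)
 *	{
 *		// release everything disk_channel_create() set up
 *	}
 *
 *	spdk_io_device_register(&g_disk, disk_channel_create, disk_channel_destroy,
 *				sizeof(struct disk_channel), "disk");
 *	...
 *	spdk_io_device_unregister(&g_disk, NULL);
 */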

struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 * thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
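
/*
 * Channel usage sketch (illustrative): a channel is obtained and released on
 * the same thread, and the ctx area is the ctx_buf that create_cb initialized.
 * g_disk and struct disk_channel are hypothetical; the channel is released via
 * spdk_put_io_channel(), defined below.
 *
 *	struct spdk_io_channel *ch = spdk_get_io_channel(&g_disk);
 *	struct disk_channel *dch = spdk_io_channel_get_ctx(ch);
 *	...
 *	spdk_put_io_channel(ch);
 */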

static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		SPDK_ERRLOG("io_channel released on a different thread than the one that called spdk_get_io_channel()\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute. If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
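
/*
 * for_each_channel sketch (illustrative): fn runs once on each thread that
 * currently holds a channel for io_device and must finish by calling
 * spdk_for_each_channel_continue(); cpl then runs on the initiating thread.
 * The callbacks and g_disk below are hypothetical.
 *
 *	static void
 *	reset_channel(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *
 *		// operate on ch from its owning thread, then continue
 *		spdk_for_each_channel_continue(i, 0);
 *	}
 *
 *	static void
 *	reset_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		// runs on the thread that called spdk_for_each_channel()
 *	}
 *
 *	spdk_for_each_channel(&g_disk, reset_channel, NULL, reset_done);
 */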

struct spdk_interrupt {
	int			efd;
	struct spdk_thread	*thread;
	char			name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	if (thread->msg_fd < 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);
	thread->msg_fd = -1;

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}

#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	struct spdk_thread *orig_thread;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;
	uint64_t now = spdk_get_ticks();

	orig_thread = _get_thread();
	tls_thread = thread;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
	}

	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}

static int
thread_interrupt_create(struct spdk_thread *thread)
{
	int rc;

	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);

	rc = spdk_fd_group_create(&thread->fgrp);
	if (rc) {
		return rc;
	}

	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (thread->msg_fd < 0) {
		rc = -errno;
		spdk_fd_group_destroy(thread->fgrp);
		thread->fgrp = NULL;

		return rc;
	}

	return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
#endif

struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
			void *arg, const char *name)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	if (spdk_fd_group_add(thread->fgrp, efd, fn, arg)) {
		return NULL;
	}

	intr = calloc(1, sizeof(*intr));
	if (intr == NULL) {
		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		/* Don't leave efd registered in the fd_group on failure. */
		spdk_fd_group_remove(thread->fgrp, efd);
		return NULL;
	}

	if (name) {
		snprintf(intr->name, sizeof(intr->name), "%s", name);
	} else {
		snprintf(intr->name, sizeof(intr->name), "%p", fn);
	}

	intr->efd = efd;
	intr->thread = thread;

	return intr;
}

void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	*pintr = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		SPDK_ERRLOG("unregistering interrupt from a different thread than the one that called spdk_interrupt_register()\n");
		assert(false);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}
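
/*
 * Interrupt-mode sketch (illustrative, Linux only; the callback and arg are
 * hypothetical): an fd-based event source is registered on the current thread
 * and its callback fires from spdk_thread_poll() when the fd becomes readable.
 *
 *	int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 *	struct spdk_interrupt *intr;
 *
 *	intr = spdk_interrupt_register(efd, my_event_fn, my_arg, "my_intr");
 *	...
 *	spdk_interrupt_unregister(&intr);
 *	close(efd);
 */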

int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		SPDK_ERRLOG("called from a different thread than the one that called spdk_interrupt_register()\n");
		assert(false);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}

static bool g_interrupt_mode = false;

int
spdk_interrupt_mode_enable(void)
{
#ifdef __linux__
	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
	g_interrupt_mode = true;
	return 0;
#else
	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
	g_interrupt_mode = false;
	return -ENOTSUP;
#endif
}

bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}

SPDK_LOG_REGISTER_COMPONENT(thread)