1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

/* Upper bound on the number of messages processed per msg_queue_run_batch() call. */
#define SPDK_MSG_BATCH_SIZE		8
/* Maximum io_device name length; the name buffer reserves one extra byte for the NUL. */
#define SPDK_MAX_DEVICE_NAME_LEN	256
/* Seconds an EXITING thread may wait for its pollers/channels before being forced to EXITED. */
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5

/* Lock used in this file to guard the thread list (g_threads), the thread
 * ID/count counters and the io_device list (g_io_devices). */
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Framework hooks installed by spdk_thread_lib_init()/spdk_thread_lib_init_ext()
 * and cleared by spdk_thread_lib_fini(). */
static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
/* Size of the caller-defined context appended to every struct spdk_thread allocation. */
static size_t g_ctx_sz = 0;
/* Monotonically increasing ID assigned to each created thread, starting at 1.
 * Once every value up to UINT64_MAX has been handed out the counter wraps to 0;
 * from then on further thread creation is refused and the SPDK application
 * must be restarted.
 */
static uint64_t g_thread_id = 1;

/* Bookkeeping for one registered I/O device. The io_device pointer is the
 * lookup key; duplicate registration of the same pointer is rejected. */
struct io_device {
	void				*io_device;
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	/* Callbacks invoked when per-thread channels are created/destroyed. */
	spdk_io_channel_create_cb	create_cb;
	spdk_io_channel_destroy_cb	destroy_cb;
	/* Completion callback for unregistration and the thread it was requested on. */
	spdk_io_device_unregister_cb	unregister_cb;
	struct spdk_thread		*unregister_thread;
	/* Size of the per-channel context. */
	uint32_t			ctx_size;
	/* NOTE(review): presumably counts in-flight for-each-channel iterations — confirm. */
	uint32_t			for_each_count;
	TAILQ_ENTRY(io_device)		tailq;

	/* NOTE(review): presumably the number of channels referencing this device — confirm. */
	uint32_t			refcnt;

	bool				unregistered;
};

/* All registered I/O devices. */
static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);

/* A message delivered to a thread: the target thread calls fn(arg). */
struct spdk_msg {
	spdk_msg_fn		fn;
	void			*arg;

	/* Linkage in a thread's local msg_cache free list. */
	SLIST_ENTRY(spdk_msg)	link;
};

/* Maximum number of spare spdk_msg objects each thread keeps in its local cache. */
#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
/* Global pool from which all spdk_msg objects are allocated. */
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

/* All live spdk_threads, and their count. */
static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

/* The spdk_thread currently bound to the calling OS thread (NULL if none). */
static __thread struct spdk_thread *tls_thread = NULL;

/* Return the spdk_thread bound to the calling OS thread, or NULL. */
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}
106 107 static int 108 _thread_lib_init(size_t ctx_sz) 109 { 110 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 111 112 g_ctx_sz = ctx_sz; 113 114 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 115 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 116 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 117 sizeof(struct spdk_msg), 118 0, /* No cache. We do our own. */ 119 SPDK_ENV_SOCKET_ID_ANY); 120 121 if (!g_spdk_msg_mempool) { 122 return -1; 123 } 124 125 return 0; 126 } 127 128 int 129 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 130 { 131 assert(g_new_thread_fn == NULL); 132 assert(g_thread_op_fn == NULL); 133 134 if (new_thread_fn == NULL) { 135 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 136 } else { 137 g_new_thread_fn = new_thread_fn; 138 } 139 140 return _thread_lib_init(ctx_sz); 141 } 142 143 int 144 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 145 spdk_thread_op_supported_fn thread_op_supported_fn, 146 size_t ctx_sz) 147 { 148 assert(g_new_thread_fn == NULL); 149 assert(g_thread_op_fn == NULL); 150 assert(g_thread_op_supported_fn == NULL); 151 152 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 153 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 154 return -EINVAL; 155 } 156 157 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 158 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 159 } else { 160 g_thread_op_fn = thread_op_fn; 161 g_thread_op_supported_fn = thread_op_supported_fn; 162 } 163 164 return _thread_lib_init(ctx_sz); 165 } 166 167 void 168 spdk_thread_lib_fini(void) 169 { 170 struct io_device *dev; 171 172 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 173 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 174 } 175 176 if (g_spdk_msg_mempool) { 177 spdk_mempool_free(g_spdk_msg_mempool); 178 g_spdk_msg_mempool = NULL; 179 } 180 181 
g_new_thread_fn = NULL; 182 g_thread_op_fn = NULL; 183 g_thread_op_supported_fn = NULL; 184 g_ctx_sz = 0; 185 } 186 187 static void thread_interrupt_destroy(struct spdk_thread *thread); 188 static int thread_interrupt_create(struct spdk_thread *thread); 189 190 static void 191 _free_thread(struct spdk_thread *thread) 192 { 193 struct spdk_io_channel *ch; 194 struct spdk_msg *msg; 195 struct spdk_poller *poller, *ptmp; 196 197 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 198 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 199 thread->name, ch->dev->name); 200 } 201 202 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 203 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 204 SPDK_WARNLOG("poller %s still registered at thread exit\n", 205 poller->name); 206 } 207 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 208 free(poller); 209 } 210 211 TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) { 212 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 213 SPDK_WARNLOG("poller %s still registered at thread exit\n", 214 poller->name); 215 } 216 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 217 free(poller); 218 } 219 220 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 221 SPDK_WARNLOG("poller %s still registered at thread exit\n", poller->name); 222 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 223 free(poller); 224 } 225 226 pthread_mutex_lock(&g_devlist_mutex); 227 assert(g_thread_count > 0); 228 g_thread_count--; 229 TAILQ_REMOVE(&g_threads, thread, tailq); 230 pthread_mutex_unlock(&g_devlist_mutex); 231 232 msg = SLIST_FIRST(&thread->msg_cache); 233 while (msg != NULL) { 234 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 235 236 assert(thread->msg_cache_count > 0); 237 thread->msg_cache_count--; 238 spdk_mempool_put(g_spdk_msg_mempool, msg); 239 240 msg = SLIST_FIRST(&thread->msg_cache); 241 } 242 243 assert(thread->msg_cache_count == 0); 244 245 if (thread->interrupt_mode) { 
246 thread_interrupt_destroy(thread); 247 } 248 249 spdk_ring_free(thread->messages); 250 free(thread); 251 } 252 253 struct spdk_thread * 254 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 255 { 256 struct spdk_thread *thread; 257 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 258 int rc = 0, i; 259 260 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 261 if (!thread) { 262 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 263 return NULL; 264 } 265 266 if (cpumask) { 267 spdk_cpuset_copy(&thread->cpumask, cpumask); 268 } else { 269 spdk_cpuset_negate(&thread->cpumask); 270 } 271 272 TAILQ_INIT(&thread->io_channels); 273 TAILQ_INIT(&thread->active_pollers); 274 TAILQ_INIT(&thread->timed_pollers); 275 TAILQ_INIT(&thread->paused_pollers); 276 SLIST_INIT(&thread->msg_cache); 277 thread->msg_cache_count = 0; 278 279 thread->tsc_last = spdk_get_ticks(); 280 281 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 282 if (!thread->messages) { 283 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 284 free(thread); 285 return NULL; 286 } 287 288 /* Fill the local message pool cache. */ 289 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 290 if (rc == 0) { 291 /* If we can't populate the cache it's ok. The cache will get filled 292 * up organically as messages are passed to the thread. */ 293 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 294 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 295 thread->msg_cache_count++; 296 } 297 } 298 299 if (name) { 300 snprintf(thread->name, sizeof(thread->name), "%s", name); 301 } else { 302 snprintf(thread->name, sizeof(thread->name), "%p", thread); 303 } 304 305 pthread_mutex_lock(&g_devlist_mutex); 306 if (g_thread_id == 0) { 307 SPDK_ERRLOG("Thread ID rolled over. 
Further thread creation is not allowed.\n"); 308 pthread_mutex_unlock(&g_devlist_mutex); 309 _free_thread(thread); 310 return NULL; 311 } 312 thread->id = g_thread_id++; 313 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 314 g_thread_count++; 315 pthread_mutex_unlock(&g_devlist_mutex); 316 317 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 318 thread->id, thread->name); 319 320 if (spdk_interrupt_mode_is_enabled()) { 321 thread->interrupt_mode = true; 322 rc = thread_interrupt_create(thread); 323 if (rc != 0) { 324 _free_thread(thread); 325 return NULL; 326 } 327 } 328 329 if (g_new_thread_fn) { 330 rc = g_new_thread_fn(thread); 331 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 332 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 333 } 334 335 if (rc != 0) { 336 _free_thread(thread); 337 return NULL; 338 } 339 340 thread->state = SPDK_THREAD_STATE_RUNNING; 341 342 return thread; 343 } 344 345 void 346 spdk_set_thread(struct spdk_thread *thread) 347 { 348 tls_thread = thread; 349 } 350 351 static void 352 thread_exit(struct spdk_thread *thread, uint64_t now) 353 { 354 struct spdk_poller *poller; 355 struct spdk_io_channel *ch; 356 357 if (now >= thread->exit_timeout_tsc) { 358 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 359 thread->name); 360 goto exited; 361 } 362 363 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 364 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 365 SPDK_INFOLOG(thread, 366 "thread %s still has active poller %s\n", 367 thread->name, poller->name); 368 return; 369 } 370 } 371 372 TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) { 373 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 374 SPDK_INFOLOG(thread, 375 "thread %s still has active timed poller %s\n", 376 thread->name, poller->name); 377 return; 378 } 379 } 380 381 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 382 SPDK_INFOLOG(thread, 383 "thread %s still 
has paused poller %s\n", 384 thread->name, poller->name); 385 return; 386 } 387 388 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 389 SPDK_INFOLOG(thread, 390 "thread %s still has channel for io_device %s\n", 391 thread->name, ch->dev->name); 392 return; 393 } 394 395 exited: 396 thread->state = SPDK_THREAD_STATE_EXITED; 397 } 398 399 int 400 spdk_thread_exit(struct spdk_thread *thread) 401 { 402 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 403 404 assert(tls_thread == thread); 405 406 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 407 SPDK_INFOLOG(thread, 408 "thread %s is already exiting\n", 409 thread->name); 410 return 0; 411 } 412 413 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 414 SPDK_THREAD_EXIT_TIMEOUT_SEC); 415 thread->state = SPDK_THREAD_STATE_EXITING; 416 return 0; 417 } 418 419 bool 420 spdk_thread_is_exited(struct spdk_thread *thread) 421 { 422 return thread->state == SPDK_THREAD_STATE_EXITED; 423 } 424 425 void 426 spdk_thread_destroy(struct spdk_thread *thread) 427 { 428 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 429 430 assert(thread->state == SPDK_THREAD_STATE_EXITED); 431 432 if (tls_thread == thread) { 433 tls_thread = NULL; 434 } 435 436 _free_thread(thread); 437 } 438 439 void * 440 spdk_thread_get_ctx(struct spdk_thread *thread) 441 { 442 if (g_ctx_sz > 0) { 443 return thread->ctx; 444 } 445 446 return NULL; 447 } 448 449 struct spdk_cpuset * 450 spdk_thread_get_cpumask(struct spdk_thread *thread) 451 { 452 return &thread->cpumask; 453 } 454 455 int 456 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 457 { 458 struct spdk_thread *thread; 459 460 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 461 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 462 assert(false); 463 return -ENOTSUP; 464 } 465 466 thread = spdk_get_thread(); 467 if (!thread) { 468 SPDK_ERRLOG("Called from non-SPDK thread\n"); 469 assert(false); 470 return 
-EINVAL; 471 } 472 473 spdk_cpuset_copy(&thread->cpumask, cpumask); 474 475 /* Invoke framework's reschedule operation. If this function is called multiple times 476 * in a single spdk_thread_poll() context, the last cpumask will be used in the 477 * reschedule operation. 478 */ 479 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 480 481 return 0; 482 } 483 484 struct spdk_thread * 485 spdk_thread_get_from_ctx(void *ctx) 486 { 487 if (ctx == NULL) { 488 assert(false); 489 return NULL; 490 } 491 492 assert(g_ctx_sz > 0); 493 494 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 495 } 496 497 static inline uint32_t 498 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 499 { 500 unsigned count, i; 501 void *messages[SPDK_MSG_BATCH_SIZE]; 502 uint64_t notify = 1; 503 int rc; 504 505 #ifdef DEBUG 506 /* 507 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 508 * so we will never actually read uninitialized data from events, but just to be sure 509 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 510 */ 511 memset(messages, 0, sizeof(messages)); 512 #endif 513 514 if (max_msgs > 0) { 515 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 516 } else { 517 max_msgs = SPDK_MSG_BATCH_SIZE; 518 } 519 if (thread->interrupt_mode) { 520 /* There may be race between msg_acknowledge and another producer's msg_notify, 521 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 522 * This can avoid msg notification missing. 
523 */ 524 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 525 if (rc < 0) { 526 SPDK_ERRLOG("failed to acknowledge msg_queue: %s.\n", spdk_strerror(errno)); 527 } 528 } 529 530 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 531 if (thread->interrupt_mode && spdk_ring_count(thread->messages) != 0) { 532 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 533 if (rc < 0) { 534 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 535 } 536 } 537 if (count == 0) { 538 return 0; 539 } 540 541 for (i = 0; i < count; i++) { 542 struct spdk_msg *msg = messages[i]; 543 544 assert(msg != NULL); 545 msg->fn(msg->arg); 546 547 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 548 /* Insert the messages at the head. We want to re-use the hot 549 * ones. */ 550 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 551 thread->msg_cache_count++; 552 } else { 553 spdk_mempool_put(g_spdk_msg_mempool, msg); 554 } 555 } 556 557 return count; 558 } 559 560 static void 561 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 562 { 563 struct spdk_poller *iter; 564 565 poller->next_run_tick = now + poller->period_ticks; 566 567 /* 568 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled 569 * run time. 
570 */ 571 TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) { 572 if (iter->next_run_tick <= poller->next_run_tick) { 573 TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq); 574 return; 575 } 576 } 577 578 /* No earlier pollers were found, so this poller must be the new head */ 579 TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq); 580 } 581 582 static void 583 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 584 { 585 if (poller->period_ticks) { 586 poller_insert_timer(thread, poller, spdk_get_ticks()); 587 } else { 588 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 589 } 590 } 591 592 static inline void 593 thread_update_stats(struct spdk_thread *thread, uint64_t end, 594 uint64_t start, int rc) 595 { 596 if (rc == 0) { 597 /* Poller status idle */ 598 thread->stats.idle_tsc += end - start; 599 } else if (rc > 0) { 600 /* Poller status busy */ 601 thread->stats.busy_tsc += end - start; 602 } 603 /* Store end time to use it as start time of the next spdk_thread_poll(). 
*/ 604 thread->tsc_last = end; 605 } 606 607 static int 608 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 609 { 610 uint32_t msg_count; 611 struct spdk_poller *poller, *tmp; 612 spdk_msg_fn critical_msg; 613 int rc = 0; 614 615 thread->tsc_last = now; 616 617 critical_msg = thread->critical_msg; 618 if (spdk_unlikely(critical_msg != NULL)) { 619 critical_msg(NULL); 620 thread->critical_msg = NULL; 621 } 622 623 msg_count = msg_queue_run_batch(thread, max_msgs); 624 if (msg_count) { 625 rc = 1; 626 } 627 628 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 629 active_pollers_head, tailq, tmp) { 630 int poller_rc; 631 632 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 633 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 634 free(poller); 635 continue; 636 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 637 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 638 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 639 poller->state = SPDK_POLLER_STATE_PAUSED; 640 continue; 641 } 642 643 poller->state = SPDK_POLLER_STATE_RUNNING; 644 poller_rc = poller->fn(poller->arg); 645 646 poller->run_count++; 647 if (poller_rc > 0) { 648 poller->busy_count++; 649 } 650 651 #ifdef DEBUG 652 if (poller_rc == -1) { 653 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 654 } 655 #endif 656 657 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 658 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 659 free(poller); 660 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 661 poller->state = SPDK_POLLER_STATE_WAITING; 662 } 663 664 if (poller_rc > rc) { 665 rc = poller_rc; 666 } 667 } 668 669 TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) { 670 int timer_rc = 0; 671 672 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 673 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 674 free(poller); 675 continue; 676 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 677 
TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 678 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 679 poller->state = SPDK_POLLER_STATE_PAUSED; 680 continue; 681 } 682 683 if (now < poller->next_run_tick) { 684 break; 685 } 686 687 poller->state = SPDK_POLLER_STATE_RUNNING; 688 timer_rc = poller->fn(poller->arg); 689 690 poller->run_count++; 691 if (timer_rc > 0) { 692 poller->busy_count++; 693 } 694 695 #ifdef DEBUG 696 if (timer_rc == -1) { 697 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 698 } 699 #endif 700 701 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 702 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 703 free(poller); 704 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 705 poller->state = SPDK_POLLER_STATE_WAITING; 706 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 707 poller_insert_timer(thread, poller, now); 708 } 709 710 if (timer_rc > rc) { 711 rc = timer_rc; 712 } 713 } 714 715 return rc; 716 } 717 718 int 719 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 720 { 721 struct spdk_thread *orig_thread; 722 int rc; 723 724 orig_thread = _get_thread(); 725 tls_thread = thread; 726 727 if (now == 0) { 728 now = spdk_get_ticks(); 729 } 730 731 if (!thread->interrupt_mode) { 732 rc = thread_poll(thread, max_msgs, now); 733 } else { 734 /* Non-block wait on thread's fd_group */ 735 rc = spdk_fd_group_wait(thread->fgrp, 0); 736 } 737 738 739 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 740 thread_exit(thread, now); 741 } 742 743 thread_update_stats(thread, spdk_get_ticks(), now, rc); 744 745 tls_thread = orig_thread; 746 747 return rc; 748 } 749 750 uint64_t 751 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 752 { 753 struct spdk_poller *poller; 754 755 poller = TAILQ_FIRST(&thread->timed_pollers); 756 if (poller) { 757 return poller->next_run_tick; 758 } 759 760 return 0; 761 } 762 763 int 764 
spdk_thread_has_active_pollers(struct spdk_thread *thread) 765 { 766 return !TAILQ_EMPTY(&thread->active_pollers); 767 } 768 769 static bool 770 thread_has_unpaused_pollers(struct spdk_thread *thread) 771 { 772 if (TAILQ_EMPTY(&thread->active_pollers) && 773 TAILQ_EMPTY(&thread->timed_pollers)) { 774 return false; 775 } 776 777 return true; 778 } 779 780 bool 781 spdk_thread_has_pollers(struct spdk_thread *thread) 782 { 783 if (!thread_has_unpaused_pollers(thread) && 784 TAILQ_EMPTY(&thread->paused_pollers)) { 785 return false; 786 } 787 788 return true; 789 } 790 791 bool 792 spdk_thread_is_idle(struct spdk_thread *thread) 793 { 794 if (spdk_ring_count(thread->messages) || 795 thread_has_unpaused_pollers(thread) || 796 thread->critical_msg != NULL) { 797 return false; 798 } 799 800 return true; 801 } 802 803 uint32_t 804 spdk_thread_get_count(void) 805 { 806 /* 807 * Return cached value of the current thread count. We could acquire the 808 * lock and iterate through the TAILQ of threads to count them, but that 809 * count could still be invalidated after we release the lock. 
810 */ 811 return g_thread_count; 812 } 813 814 struct spdk_thread * 815 spdk_get_thread(void) 816 { 817 return _get_thread(); 818 } 819 820 const char * 821 spdk_thread_get_name(const struct spdk_thread *thread) 822 { 823 return thread->name; 824 } 825 826 uint64_t 827 spdk_thread_get_id(const struct spdk_thread *thread) 828 { 829 return thread->id; 830 } 831 832 struct spdk_thread * 833 spdk_thread_get_by_id(uint64_t id) 834 { 835 struct spdk_thread *thread; 836 837 pthread_mutex_lock(&g_devlist_mutex); 838 TAILQ_FOREACH(thread, &g_threads, tailq) { 839 if (thread->id == id) { 840 pthread_mutex_unlock(&g_devlist_mutex); 841 842 return thread; 843 } 844 } 845 pthread_mutex_unlock(&g_devlist_mutex); 846 847 return NULL; 848 } 849 850 int 851 spdk_thread_get_stats(struct spdk_thread_stats *stats) 852 { 853 struct spdk_thread *thread; 854 855 thread = _get_thread(); 856 if (!thread) { 857 SPDK_ERRLOG("No thread allocated\n"); 858 return -EINVAL; 859 } 860 861 if (stats == NULL) { 862 return -EINVAL; 863 } 864 865 *stats = thread->stats; 866 867 return 0; 868 } 869 870 uint64_t 871 spdk_thread_get_last_tsc(struct spdk_thread *thread) 872 { 873 if (thread == NULL) { 874 thread = _get_thread(); 875 } 876 877 return thread->tsc_last; 878 } 879 880 int 881 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 882 { 883 struct spdk_thread *local_thread; 884 struct spdk_msg *msg; 885 int rc; 886 887 assert(thread != NULL); 888 889 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 890 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 891 return -EIO; 892 } 893 894 local_thread = _get_thread(); 895 896 msg = NULL; 897 if (local_thread != NULL) { 898 if (local_thread->msg_cache_count > 0) { 899 msg = SLIST_FIRST(&local_thread->msg_cache); 900 assert(msg != NULL); 901 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 902 local_thread->msg_cache_count--; 903 } 904 } 905 906 if (msg == NULL) { 907 msg = 
spdk_mempool_get(g_spdk_msg_mempool); 908 if (!msg) { 909 SPDK_ERRLOG("msg could not be allocated\n"); 910 return -ENOMEM; 911 } 912 } 913 914 msg->fn = fn; 915 msg->arg = ctx; 916 917 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 918 if (rc != 1) { 919 SPDK_ERRLOG("msg could not be enqueued\n"); 920 spdk_mempool_put(g_spdk_msg_mempool, msg); 921 return -EIO; 922 } 923 924 if (thread->interrupt_mode) { 925 uint64_t notify = 1; 926 927 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 928 if (rc < 0) { 929 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 930 return -EIO; 931 } 932 } 933 934 return 0; 935 } 936 937 int 938 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 939 { 940 spdk_msg_fn expected = NULL; 941 942 if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 943 __ATOMIC_SEQ_CST)) { 944 if (thread->interrupt_mode) { 945 uint64_t notify = 1; 946 int rc; 947 948 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 949 if (rc < 0) { 950 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 951 return -EIO; 952 } 953 } 954 955 return 0; 956 } 957 958 return -EIO; 959 } 960 961 #ifdef __linux__ 962 static int 963 interrupt_timerfd_prepare(uint64_t period_microseconds) 964 { 965 int timerfd; 966 int ret; 967 struct itimerspec new_tv; 968 uint64_t period_seconds; 969 uint64_t period_nanoseconds; 970 971 if (period_microseconds == 0) { 972 return -EINVAL; 973 } 974 975 period_seconds = period_microseconds / SPDK_SEC_TO_USEC; 976 period_nanoseconds = period_microseconds % SPDK_SEC_TO_USEC * 1000; 977 978 new_tv.it_value.tv_sec = period_seconds; 979 new_tv.it_value.tv_nsec = period_nanoseconds; 980 981 new_tv.it_interval.tv_sec = period_seconds; 982 new_tv.it_interval.tv_nsec = period_nanoseconds; 983 984 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK & TFD_CLOEXEC); 985 if (timerfd < 0) { 986 return -errno; 987 } 988 989 ret = 
timerfd_settime(timerfd, 0, &new_tv, NULL); 990 if (ret < 0) { 991 close(timerfd); 992 return -errno; 993 } 994 995 return timerfd; 996 } 997 #else 998 static int 999 interrupt_timerfd_prepare(uint64_t period_microseconds) 1000 { 1001 return -ENOTSUP; 1002 } 1003 #endif 1004 1005 static int 1006 interrupt_timerfd_process(void *arg) 1007 { 1008 struct spdk_poller *poller = arg; 1009 uint64_t exp; 1010 int rc; 1011 1012 /* clear the level of interval timer */ 1013 rc = read(poller->timerfd, &exp, sizeof(exp)); 1014 if (rc < 0) { 1015 if (rc == -EAGAIN) { 1016 return 0; 1017 } 1018 1019 return rc; 1020 } 1021 1022 return poller->fn(poller->arg); 1023 } 1024 1025 static int 1026 thread_interrupt_register_timerfd(struct spdk_fd_group *fgrp, 1027 uint64_t period_microseconds, 1028 struct spdk_poller *poller) 1029 { 1030 int timerfd; 1031 int rc; 1032 1033 timerfd = interrupt_timerfd_prepare(period_microseconds); 1034 if (timerfd < 0) { 1035 return timerfd; 1036 } 1037 1038 rc = spdk_fd_group_add(fgrp, timerfd, 1039 interrupt_timerfd_process, poller); 1040 if (rc < 0) { 1041 close(timerfd); 1042 return rc; 1043 } 1044 1045 return timerfd; 1046 } 1047 1048 static struct spdk_poller * 1049 poller_register(spdk_poller_fn fn, 1050 void *arg, 1051 uint64_t period_microseconds, 1052 const char *name) 1053 { 1054 struct spdk_thread *thread; 1055 struct spdk_poller *poller; 1056 uint64_t quotient, remainder, ticks; 1057 1058 thread = spdk_get_thread(); 1059 if (!thread) { 1060 assert(false); 1061 return NULL; 1062 } 1063 1064 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1065 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1066 return NULL; 1067 } 1068 1069 poller = calloc(1, sizeof(*poller)); 1070 if (poller == NULL) { 1071 SPDK_ERRLOG("Poller memory allocation failed\n"); 1072 return NULL; 1073 } 1074 1075 if (name) { 1076 snprintf(poller->name, sizeof(poller->name), "%s", name); 1077 } else { 1078 snprintf(poller->name, sizeof(poller->name), 
"%p", fn); 1079 } 1080 1081 poller->state = SPDK_POLLER_STATE_WAITING; 1082 poller->fn = fn; 1083 poller->arg = arg; 1084 poller->thread = thread; 1085 1086 if (period_microseconds) { 1087 quotient = period_microseconds / SPDK_SEC_TO_USEC; 1088 remainder = period_microseconds % SPDK_SEC_TO_USEC; 1089 ticks = spdk_get_ticks_hz(); 1090 1091 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1092 } else { 1093 poller->period_ticks = 0; 1094 } 1095 1096 if (thread->interrupt_mode && period_microseconds != 0) { 1097 int rc; 1098 1099 rc = thread_interrupt_register_timerfd(thread->fgrp, period_microseconds, poller); 1100 if (rc < 0) { 1101 SPDK_ERRLOG("Failed to register timerfd for periodic poller: %s\n", spdk_strerror(-rc)); 1102 free(poller); 1103 return NULL; 1104 } 1105 poller->timerfd = rc; 1106 } 1107 1108 thread_insert_poller(thread, poller); 1109 1110 return poller; 1111 } 1112 1113 struct spdk_poller * 1114 spdk_poller_register(spdk_poller_fn fn, 1115 void *arg, 1116 uint64_t period_microseconds) 1117 { 1118 return poller_register(fn, arg, period_microseconds, NULL); 1119 } 1120 1121 struct spdk_poller * 1122 spdk_poller_register_named(spdk_poller_fn fn, 1123 void *arg, 1124 uint64_t period_microseconds, 1125 const char *name) 1126 { 1127 return poller_register(fn, arg, period_microseconds, name); 1128 } 1129 1130 void 1131 spdk_poller_unregister(struct spdk_poller **ppoller) 1132 { 1133 struct spdk_thread *thread; 1134 struct spdk_poller *poller; 1135 1136 poller = *ppoller; 1137 if (poller == NULL) { 1138 return; 1139 } 1140 1141 *ppoller = NULL; 1142 1143 thread = spdk_get_thread(); 1144 if (!thread) { 1145 assert(false); 1146 return; 1147 } 1148 1149 if (poller->thread != thread) { 1150 SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n"); 1151 assert(false); 1152 return; 1153 } 1154 1155 if (thread->interrupt_mode && poller->timerfd) { 1156 spdk_fd_group_remove(thread->fgrp, poller->timerfd); 1157 
close(poller->timerfd); 1158 poller->timerfd = 0; 1159 } 1160 1161 /* If the poller was paused, put it on the active_pollers list so that 1162 * its unregistration can be processed by spdk_thread_poll(). 1163 */ 1164 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1165 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1166 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1167 poller->period_ticks = 0; 1168 } 1169 1170 /* Simply set the state to unregistered. The poller will get cleaned up 1171 * in a subsequent call to spdk_thread_poll(). 1172 */ 1173 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1174 } 1175 1176 void 1177 spdk_poller_pause(struct spdk_poller *poller) 1178 { 1179 struct spdk_thread *thread; 1180 1181 if (poller->state == SPDK_POLLER_STATE_PAUSED || 1182 poller->state == SPDK_POLLER_STATE_PAUSING) { 1183 return; 1184 } 1185 1186 thread = spdk_get_thread(); 1187 if (!thread) { 1188 assert(false); 1189 return; 1190 } 1191 1192 /* If a poller is paused from within itself, we can immediately move it 1193 * on the paused_pollers list. Otherwise we just set its state to 1194 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it. It 1195 * allows a poller to be paused from another one's context without 1196 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration. 
1197 */ 1198 if (poller->state != SPDK_POLLER_STATE_RUNNING) { 1199 poller->state = SPDK_POLLER_STATE_PAUSING; 1200 } else { 1201 if (poller->period_ticks > 0) { 1202 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 1203 } else { 1204 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1205 } 1206 1207 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1208 poller->state = SPDK_POLLER_STATE_PAUSED; 1209 } 1210 } 1211 1212 void 1213 spdk_poller_resume(struct spdk_poller *poller) 1214 { 1215 struct spdk_thread *thread; 1216 1217 if (poller->state != SPDK_POLLER_STATE_PAUSED && 1218 poller->state != SPDK_POLLER_STATE_PAUSING) { 1219 return; 1220 } 1221 1222 thread = spdk_get_thread(); 1223 if (!thread) { 1224 assert(false); 1225 return; 1226 } 1227 1228 /* If a poller is paused it has to be removed from the paused pollers 1229 * list and put on the active / timer list depending on its 1230 * period_ticks. If a poller is still in the process of being paused, 1231 * we just need to flip its state back to waiting, as it's already on 1232 * the appropriate list. 
1233 */ 1234 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1235 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1236 thread_insert_poller(thread, poller); 1237 } 1238 1239 poller->state = SPDK_POLLER_STATE_WAITING; 1240 } 1241 1242 const char * 1243 spdk_poller_state_str(enum spdk_poller_state state) 1244 { 1245 switch (state) { 1246 case SPDK_POLLER_STATE_WAITING: 1247 return "waiting"; 1248 case SPDK_POLLER_STATE_RUNNING: 1249 return "running"; 1250 case SPDK_POLLER_STATE_UNREGISTERED: 1251 return "unregistered"; 1252 case SPDK_POLLER_STATE_PAUSING: 1253 return "pausing"; 1254 case SPDK_POLLER_STATE_PAUSED: 1255 return "paused"; 1256 default: 1257 return NULL; 1258 } 1259 } 1260 1261 struct call_thread { 1262 struct spdk_thread *cur_thread; 1263 spdk_msg_fn fn; 1264 void *ctx; 1265 1266 struct spdk_thread *orig_thread; 1267 spdk_msg_fn cpl; 1268 }; 1269 1270 static void 1271 _on_thread(void *ctx) 1272 { 1273 struct call_thread *ct = ctx; 1274 int rc __attribute__((unused)); 1275 1276 ct->fn(ct->ctx); 1277 1278 pthread_mutex_lock(&g_devlist_mutex); 1279 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1280 pthread_mutex_unlock(&g_devlist_mutex); 1281 1282 if (!ct->cur_thread) { 1283 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1284 1285 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1286 free(ctx); 1287 } else { 1288 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1289 ct->cur_thread->name); 1290 1291 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1292 } 1293 assert(rc == 0); 1294 } 1295 1296 void 1297 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1298 { 1299 struct call_thread *ct; 1300 struct spdk_thread *thread; 1301 int rc __attribute__((unused)); 1302 1303 ct = calloc(1, sizeof(*ct)); 1304 if (!ct) { 1305 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1306 cpl(ctx); 1307 return; 1308 } 1309 1310 ct->fn = fn; 1311 ct->ctx = ctx; 1312 ct->cpl = cpl; 1313 1314 thread = 
_get_thread(); 1315 if (!thread) { 1316 SPDK_ERRLOG("No thread allocated\n"); 1317 free(ct); 1318 cpl(ctx); 1319 return; 1320 } 1321 ct->orig_thread = thread; 1322 1323 pthread_mutex_lock(&g_devlist_mutex); 1324 ct->cur_thread = TAILQ_FIRST(&g_threads); 1325 pthread_mutex_unlock(&g_devlist_mutex); 1326 1327 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 1328 ct->orig_thread->name); 1329 1330 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 1331 assert(rc == 0); 1332 } 1333 1334 void 1335 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1336 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1337 const char *name) 1338 { 1339 struct io_device *dev, *tmp; 1340 struct spdk_thread *thread; 1341 1342 assert(io_device != NULL); 1343 assert(create_cb != NULL); 1344 assert(destroy_cb != NULL); 1345 1346 thread = spdk_get_thread(); 1347 if (!thread) { 1348 SPDK_ERRLOG("called from non-SPDK thread\n"); 1349 assert(false); 1350 return; 1351 } 1352 1353 dev = calloc(1, sizeof(struct io_device)); 1354 if (dev == NULL) { 1355 SPDK_ERRLOG("could not allocate io_device\n"); 1356 return; 1357 } 1358 1359 dev->io_device = io_device; 1360 if (name) { 1361 snprintf(dev->name, sizeof(dev->name), "%s", name); 1362 } else { 1363 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1364 } 1365 dev->create_cb = create_cb; 1366 dev->destroy_cb = destroy_cb; 1367 dev->unregister_cb = NULL; 1368 dev->ctx_size = ctx_size; 1369 dev->for_each_count = 0; 1370 dev->unregistered = false; 1371 dev->refcnt = 0; 1372 1373 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 1374 dev->name, dev->io_device, thread->name); 1375 1376 pthread_mutex_lock(&g_devlist_mutex); 1377 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 1378 if (tmp->io_device == io_device) { 1379 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1380 io_device, tmp->name, dev->name); 1381 free(dev); 1382 pthread_mutex_unlock(&g_devlist_mutex); 1383 
return; 1384 } 1385 } 1386 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 1387 pthread_mutex_unlock(&g_devlist_mutex); 1388 } 1389 1390 static void 1391 _finish_unregister(void *arg) 1392 { 1393 struct io_device *dev = arg; 1394 1395 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 1396 dev->name, dev->io_device, dev->unregister_thread->name); 1397 1398 dev->unregister_cb(dev->io_device); 1399 free(dev); 1400 } 1401 1402 static void 1403 io_device_free(struct io_device *dev) 1404 { 1405 int rc __attribute__((unused)); 1406 1407 if (dev->unregister_cb == NULL) { 1408 free(dev); 1409 } else { 1410 assert(dev->unregister_thread != NULL); 1411 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 1412 dev->name, dev->io_device, dev->unregister_thread->name); 1413 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 1414 assert(rc == 0); 1415 } 1416 } 1417 1418 void 1419 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 1420 { 1421 struct io_device *dev; 1422 uint32_t refcnt; 1423 struct spdk_thread *thread; 1424 1425 thread = spdk_get_thread(); 1426 if (!thread) { 1427 SPDK_ERRLOG("called from non-SPDK thread\n"); 1428 assert(false); 1429 return; 1430 } 1431 1432 pthread_mutex_lock(&g_devlist_mutex); 1433 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 1434 if (dev->io_device == io_device) { 1435 break; 1436 } 1437 } 1438 1439 if (!dev) { 1440 SPDK_ERRLOG("io_device %p not found\n", io_device); 1441 assert(false); 1442 pthread_mutex_unlock(&g_devlist_mutex); 1443 return; 1444 } 1445 1446 if (dev->for_each_count > 0) { 1447 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 1448 dev->name, io_device, dev->for_each_count); 1449 pthread_mutex_unlock(&g_devlist_mutex); 1450 return; 1451 } 1452 1453 dev->unregister_cb = unregister_cb; 1454 dev->unregistered = true; 1455 TAILQ_REMOVE(&g_io_devices, dev, tailq); 1456 refcnt = dev->refcnt; 1457 
dev->unregister_thread = thread; 1458 pthread_mutex_unlock(&g_devlist_mutex); 1459 1460 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 1461 dev->name, dev->io_device, thread->name); 1462 1463 if (refcnt > 0) { 1464 /* defer deletion */ 1465 return; 1466 } 1467 1468 io_device_free(dev); 1469 } 1470 1471 const char * 1472 spdk_io_device_get_name(struct io_device *dev) 1473 { 1474 return dev->name; 1475 } 1476 1477 struct spdk_io_channel * 1478 spdk_get_io_channel(void *io_device) 1479 { 1480 struct spdk_io_channel *ch; 1481 struct spdk_thread *thread; 1482 struct io_device *dev; 1483 int rc; 1484 1485 pthread_mutex_lock(&g_devlist_mutex); 1486 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 1487 if (dev->io_device == io_device) { 1488 break; 1489 } 1490 } 1491 if (dev == NULL) { 1492 SPDK_ERRLOG("could not find io_device %p\n", io_device); 1493 pthread_mutex_unlock(&g_devlist_mutex); 1494 return NULL; 1495 } 1496 1497 thread = _get_thread(); 1498 if (!thread) { 1499 SPDK_ERRLOG("No thread allocated\n"); 1500 pthread_mutex_unlock(&g_devlist_mutex); 1501 return NULL; 1502 } 1503 1504 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1505 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 1506 pthread_mutex_unlock(&g_devlist_mutex); 1507 return NULL; 1508 } 1509 1510 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1511 if (ch->dev == dev) { 1512 ch->ref++; 1513 1514 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1515 ch, dev->name, dev->io_device, thread->name, ch->ref); 1516 1517 /* 1518 * An I/O channel already exists for this device on this 1519 * thread, so return it. 
1520 */ 1521 pthread_mutex_unlock(&g_devlist_mutex); 1522 return ch; 1523 } 1524 } 1525 1526 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 1527 if (ch == NULL) { 1528 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 1529 pthread_mutex_unlock(&g_devlist_mutex); 1530 return NULL; 1531 } 1532 1533 ch->dev = dev; 1534 ch->destroy_cb = dev->destroy_cb; 1535 ch->thread = thread; 1536 ch->ref = 1; 1537 ch->destroy_ref = 0; 1538 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 1539 1540 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1541 ch, dev->name, dev->io_device, thread->name, ch->ref); 1542 1543 dev->refcnt++; 1544 1545 pthread_mutex_unlock(&g_devlist_mutex); 1546 1547 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 1548 if (rc != 0) { 1549 pthread_mutex_lock(&g_devlist_mutex); 1550 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1551 dev->refcnt--; 1552 free(ch); 1553 pthread_mutex_unlock(&g_devlist_mutex); 1554 return NULL; 1555 } 1556 1557 return ch; 1558 } 1559 1560 static void 1561 put_io_channel(void *arg) 1562 { 1563 struct spdk_io_channel *ch = arg; 1564 bool do_remove_dev = true; 1565 struct spdk_thread *thread; 1566 1567 thread = spdk_get_thread(); 1568 if (!thread) { 1569 SPDK_ERRLOG("called from non-SPDK thread\n"); 1570 assert(false); 1571 return; 1572 } 1573 1574 SPDK_DEBUGLOG(thread, 1575 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 1576 ch, ch->dev->name, ch->dev->io_device, thread->name); 1577 1578 assert(ch->thread == thread); 1579 1580 ch->destroy_ref--; 1581 1582 if (ch->ref > 0 || ch->destroy_ref > 0) { 1583 /* 1584 * Another reference to the associated io_device was requested 1585 * after this message was sent but before it had a chance to 1586 * execute. 
1587 */ 1588 return; 1589 } 1590 1591 pthread_mutex_lock(&g_devlist_mutex); 1592 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1593 pthread_mutex_unlock(&g_devlist_mutex); 1594 1595 /* Don't hold the devlist mutex while the destroy_cb is called. */ 1596 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 1597 1598 pthread_mutex_lock(&g_devlist_mutex); 1599 ch->dev->refcnt--; 1600 1601 if (!ch->dev->unregistered) { 1602 do_remove_dev = false; 1603 } 1604 1605 if (ch->dev->refcnt > 0) { 1606 do_remove_dev = false; 1607 } 1608 1609 pthread_mutex_unlock(&g_devlist_mutex); 1610 1611 if (do_remove_dev) { 1612 io_device_free(ch->dev); 1613 } 1614 free(ch); 1615 } 1616 1617 void 1618 spdk_put_io_channel(struct spdk_io_channel *ch) 1619 { 1620 struct spdk_thread *thread; 1621 int rc __attribute__((unused)); 1622 1623 thread = spdk_get_thread(); 1624 if (!thread) { 1625 SPDK_ERRLOG("called from non-SPDK thread\n"); 1626 assert(false); 1627 return; 1628 } 1629 1630 if (ch->thread != thread) { 1631 SPDK_ERRLOG("different from the thread that called get_io_channel()\n"); 1632 assert(false); 1633 return; 1634 } 1635 1636 SPDK_DEBUGLOG(thread, 1637 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1638 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 1639 1640 ch->ref--; 1641 1642 if (ch->ref == 0) { 1643 ch->destroy_ref++; 1644 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 1645 assert(rc == 0); 1646 } 1647 } 1648 1649 struct spdk_io_channel * 1650 spdk_io_channel_from_ctx(void *ctx) 1651 { 1652 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 1653 } 1654 1655 struct spdk_thread * 1656 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 1657 { 1658 return ch->thread; 1659 } 1660 1661 struct spdk_io_channel_iter { 1662 void *io_device; 1663 struct io_device *dev; 1664 spdk_channel_msg fn; 1665 int status; 1666 void *ctx; 1667 struct spdk_io_channel *ch; 1668 1669 struct spdk_thread 
*cur_thread; 1670 1671 struct spdk_thread *orig_thread; 1672 spdk_channel_for_each_cpl cpl; 1673 }; 1674 1675 void * 1676 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 1677 { 1678 return i->io_device; 1679 } 1680 1681 struct spdk_io_channel * 1682 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 1683 { 1684 return i->ch; 1685 } 1686 1687 void * 1688 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 1689 { 1690 return i->ctx; 1691 } 1692 1693 static void 1694 _call_completion(void *ctx) 1695 { 1696 struct spdk_io_channel_iter *i = ctx; 1697 1698 if (i->cpl != NULL) { 1699 i->cpl(i, i->status); 1700 } 1701 free(i); 1702 } 1703 1704 static void 1705 _call_channel(void *ctx) 1706 { 1707 struct spdk_io_channel_iter *i = ctx; 1708 struct spdk_io_channel *ch; 1709 1710 /* 1711 * It is possible that the channel was deleted before this 1712 * message had a chance to execute. If so, skip calling 1713 * the fn() on this thread. 1714 */ 1715 pthread_mutex_lock(&g_devlist_mutex); 1716 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 1717 if (ch->dev->io_device == i->io_device) { 1718 break; 1719 } 1720 } 1721 pthread_mutex_unlock(&g_devlist_mutex); 1722 1723 if (ch) { 1724 i->fn(i); 1725 } else { 1726 spdk_for_each_channel_continue(i, 0); 1727 } 1728 } 1729 1730 void 1731 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 1732 spdk_channel_for_each_cpl cpl) 1733 { 1734 struct spdk_thread *thread; 1735 struct spdk_io_channel *ch; 1736 struct spdk_io_channel_iter *i; 1737 int rc __attribute__((unused)); 1738 1739 i = calloc(1, sizeof(*i)); 1740 if (!i) { 1741 SPDK_ERRLOG("Unable to allocate iterator\n"); 1742 return; 1743 } 1744 1745 i->io_device = io_device; 1746 i->fn = fn; 1747 i->ctx = ctx; 1748 i->cpl = cpl; 1749 1750 pthread_mutex_lock(&g_devlist_mutex); 1751 i->orig_thread = _get_thread(); 1752 1753 TAILQ_FOREACH(thread, &g_threads, tailq) { 1754 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1755 
if (ch->dev->io_device == io_device) { 1756 ch->dev->for_each_count++; 1757 i->dev = ch->dev; 1758 i->cur_thread = thread; 1759 i->ch = ch; 1760 pthread_mutex_unlock(&g_devlist_mutex); 1761 rc = spdk_thread_send_msg(thread, _call_channel, i); 1762 assert(rc == 0); 1763 return; 1764 } 1765 } 1766 } 1767 1768 pthread_mutex_unlock(&g_devlist_mutex); 1769 1770 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1771 assert(rc == 0); 1772 } 1773 1774 void 1775 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 1776 { 1777 struct spdk_thread *thread; 1778 struct spdk_io_channel *ch; 1779 int rc __attribute__((unused)); 1780 1781 assert(i->cur_thread == spdk_get_thread()); 1782 1783 i->status = status; 1784 1785 pthread_mutex_lock(&g_devlist_mutex); 1786 if (status) { 1787 goto end; 1788 } 1789 thread = TAILQ_NEXT(i->cur_thread, tailq); 1790 while (thread) { 1791 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1792 if (ch->dev->io_device == i->io_device) { 1793 i->cur_thread = thread; 1794 i->ch = ch; 1795 pthread_mutex_unlock(&g_devlist_mutex); 1796 rc = spdk_thread_send_msg(thread, _call_channel, i); 1797 assert(rc == 0); 1798 return; 1799 } 1800 } 1801 thread = TAILQ_NEXT(thread, tailq); 1802 } 1803 1804 end: 1805 i->dev->for_each_count--; 1806 i->ch = NULL; 1807 pthread_mutex_unlock(&g_devlist_mutex); 1808 1809 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1810 assert(rc == 0); 1811 } 1812 1813 struct spdk_interrupt { 1814 int efd; 1815 struct spdk_thread *thread; 1816 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 1817 }; 1818 1819 static void 1820 thread_interrupt_destroy(struct spdk_thread *thread) 1821 { 1822 struct spdk_fd_group *fgrp = thread->fgrp; 1823 1824 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 1825 1826 if (thread->msg_fd <= 0) { 1827 return; 1828 } 1829 1830 spdk_fd_group_remove(fgrp, thread->msg_fd); 1831 close(thread->msg_fd); 1832 1833 spdk_fd_group_destroy(fgrp); 1834 
thread->fgrp = NULL; 1835 } 1836 1837 #ifdef __linux__ 1838 static int 1839 thread_interrupt_msg_process(void *arg) 1840 { 1841 struct spdk_thread *thread = arg; 1842 struct spdk_thread *orig_thread; 1843 uint32_t msg_count; 1844 spdk_msg_fn critical_msg; 1845 int rc = 0; 1846 uint64_t now = spdk_get_ticks(); 1847 1848 orig_thread = _get_thread(); 1849 tls_thread = thread; 1850 1851 critical_msg = thread->critical_msg; 1852 if (spdk_unlikely(critical_msg != NULL)) { 1853 critical_msg(NULL); 1854 thread->critical_msg = NULL; 1855 } 1856 1857 msg_count = msg_queue_run_batch(thread, 0); 1858 if (msg_count) { 1859 rc = 1; 1860 } 1861 1862 thread_update_stats(thread, spdk_get_ticks(), now, rc); 1863 1864 tls_thread = orig_thread; 1865 1866 return rc; 1867 } 1868 1869 static int 1870 thread_interrupt_create(struct spdk_thread *thread) 1871 { 1872 int rc; 1873 1874 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 1875 1876 rc = spdk_fd_group_create(&thread->fgrp); 1877 if (rc) { 1878 return rc; 1879 } 1880 1881 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1882 if (thread->msg_fd < 0) { 1883 rc = -errno; 1884 spdk_fd_group_destroy(thread->fgrp); 1885 thread->fgrp = NULL; 1886 1887 return rc; 1888 } 1889 1890 return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread); 1891 } 1892 #else 1893 static int 1894 thread_interrupt_create(struct spdk_thread *thread) 1895 { 1896 return -ENOTSUP; 1897 } 1898 #endif 1899 1900 struct spdk_interrupt * 1901 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 1902 void *arg, const char *name) 1903 { 1904 struct spdk_thread *thread; 1905 struct spdk_interrupt *intr; 1906 1907 thread = spdk_get_thread(); 1908 if (!thread) { 1909 assert(false); 1910 return NULL; 1911 } 1912 1913 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 1914 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1915 return NULL; 1916 } 1917 1918 if 
(spdk_fd_group_add(thread->fgrp, efd, fn, arg)) { 1919 return NULL; 1920 } 1921 1922 intr = calloc(1, sizeof(*intr)); 1923 if (intr == NULL) { 1924 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 1925 return NULL; 1926 } 1927 1928 if (name) { 1929 snprintf(intr->name, sizeof(intr->name), "%s", name); 1930 } else { 1931 snprintf(intr->name, sizeof(intr->name), "%p", fn); 1932 } 1933 1934 intr->efd = efd; 1935 intr->thread = thread; 1936 1937 return intr; 1938 } 1939 1940 void 1941 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 1942 { 1943 struct spdk_thread *thread; 1944 struct spdk_interrupt *intr; 1945 1946 intr = *pintr; 1947 if (intr == NULL) { 1948 return; 1949 } 1950 1951 *pintr = NULL; 1952 1953 thread = spdk_get_thread(); 1954 if (!thread) { 1955 assert(false); 1956 return; 1957 } 1958 1959 if (intr->thread != thread) { 1960 SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n"); 1961 assert(false); 1962 return; 1963 } 1964 1965 spdk_fd_group_remove(thread->fgrp, intr->efd); 1966 free(intr); 1967 } 1968 1969 int 1970 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 1971 enum spdk_interrupt_event_types event_types) 1972 { 1973 struct spdk_thread *thread; 1974 1975 thread = spdk_get_thread(); 1976 if (!thread) { 1977 assert(false); 1978 return -EINVAL; 1979 } 1980 1981 if (intr->thread != thread) { 1982 SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n"); 1983 assert(false); 1984 return -EINVAL; 1985 } 1986 1987 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 1988 } 1989 1990 int 1991 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 1992 { 1993 return spdk_fd_group_get_fd(thread->fgrp); 1994 } 1995 1996 static bool g_interrupt_mode = false; 1997 1998 int 1999 spdk_interrupt_mode_enable(void) 2000 { 2001 #ifdef __linux__ 2002 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2003 g_interrupt_mode = true; 2004 return 0; 2005 #else 2006 
SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2007 g_interrupt_mode = false; 2008 return -ENOTSUP; 2009 #endif 2010 } 2011 2012 bool 2013 spdk_interrupt_mode_is_enabled(void) 2014 { 2015 return g_interrupt_mode; 2016 } 2017 2018 SPDK_LOG_REGISTER_COMPONENT(thread) 2019