/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
static size_t g_ctx_sz = 0;
/* A monotonically increasing ID, starting at 1, is assigned to each created
 * thread. Once the ID wraps around after reaching UINT64_MAX (i.e.,
 * g_thread_id becomes 0), further thread creation is not allowed and the
 * SPDK application must be restarted.
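 * A wrap would require creating 2^64 - 1 threads over the lifetime of the
 * process, so this is a defensive check rather than an expected condition.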
 */
static uint64_t g_thread_id = 1;

struct io_device {
	void				*io_device;
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb	create_cb;
	spdk_io_channel_destroy_cb	destroy_cb;
	spdk_io_device_unregister_cb	unregister_cb;
	struct spdk_thread		*unregister_thread;
	uint32_t			ctx_size;
	uint32_t			for_each_count;
	TAILQ_ENTRY(io_device)		tailq;

	uint32_t			refcnt;

	bool				unregistered;
};

static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);

struct spdk_msg {
	spdk_msg_fn	fn;
	void		*arg;

	SLIST_ENTRY(spdk_msg)	link;
};

#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

static __thread struct spdk_thread *tls_thread = NULL;

static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}

static int
_thread_lib_init(size_t ctx_sz)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	g_ctx_sz = ctx_sz;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			     sizeof(struct spdk_msg),
			     0, /* No cache. We do our own. */
			     SPDK_ENV_SOCKET_ID_ANY);

	if (!g_spdk_msg_mempool) {
		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
		return -1;
	}

	return 0;
}

int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);

	if (new_thread_fn == NULL) {
		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
	} else {
		g_new_thread_fn = new_thread_fn;
	}

	return _thread_lib_init(ctx_sz);
}

int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
			 spdk_thread_op_supported_fn thread_op_supported_fn,
			 size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);
	assert(g_thread_op_supported_fn == NULL);

	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
		SPDK_ERRLOG("thread_op_fn and thread_op_supported_fn must be specified together\n");
		return -EINVAL;
	}

	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
	} else {
		g_thread_op_fn = thread_op_fn;
		g_thread_op_supported_fn = thread_op_supported_fn;
	}

	return _thread_lib_init(ctx_sz);
}

void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}

	g_new_thread_fn = NULL;
	g_thread_op_fn = NULL;
	g_thread_op_supported_fn = NULL;
	g_ctx_sz = 0;
}

static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
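		/* Any channel still present here was leaked: every
		 * spdk_get_io_channel() call must be balanced by a
		 * spdk_put_io_channel() before the thread exits.
		 */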
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (thread->interrupt_mode) {
		thread_interrupt_destroy(thread);
	}

	spdk_ring_free(thread->messages);
	free(thread);
}

struct spdk_thread *
spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc = 0, i;

	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	TAILQ_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	TAILQ_INIT(&thread->timed_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	if (g_thread_id == 0) {
		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		_free_thread(thread);
		return NULL;
	}
	thread->id = g_thread_id++;
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
		      thread->id, thread->name);

	if (spdk_interrupt_mode_is_enabled()) {
		thread->interrupt_mode = true;
		rc = thread_interrupt_create(thread);
		if (rc != 0) {
			_free_thread(thread);
			return NULL;
		}
	}

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
	}

	if (rc != 0) {
		_free_thread(thread);
		return NULL;
	}

	thread->state = SPDK_THREAD_STATE_RUNNING;

	return thread;
}

void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}

static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s timed out while exiting; moving it to the exited state forcefully\n",
			    thread->name);
		goto exited;
	}

	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
}

int
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
		SPDK_INFOLOG(thread,
			     "thread %s is already exiting\n",
			     thread->name);
		return 0;
	}

	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
	thread->state = SPDK_THREAD_STATE_EXITING;
	return 0;
}

bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_EXITED;
}

void
spdk_thread_destroy(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);

	assert(thread->state == SPDK_THREAD_STATE_EXITED);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}

void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
	if (g_ctx_sz > 0) {
		return thread->ctx;
	}

	return NULL;
}

struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}

int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;

	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
		assert(false);
		return -ENOTSUP;
	}

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Called from non-SPDK thread\n");
		assert(false);
		return -EINVAL;
	}

	spdk_cpuset_copy(&thread->cpumask, cpumask);

	/* Invoke framework's reschedule operation. If this function is called multiple times
	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
	 * reschedule operation.
	 */
	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

	return 0;
}

struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}

static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];
	uint64_t notify = 1;
	int rc;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from messages, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	if (thread->interrupt_mode) {
		/* There may be a race between this consumer's acknowledgement and another
		 * producer's notification, so acknowledge (read) the eventfd first and only
		 * then re-check whether a notification is still required. This avoids
		 * losing a message notification.
		 */
		rc = read(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to acknowledge msg_queue: %s.\n", spdk_strerror(errno));
		}
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (thread->interrupt_mode && spdk_ring_count(thread->messages) != 0) {
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}

static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled
	 * run time.
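	 * The list is walked in reverse because a newly armed poller typically
	 * expires after most existing ones, so the insertion point is usually
	 * found near the tail.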
	 */
	TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq);
}

static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store end time to use it as start time of the next spdk_thread_poll(). */
	thread->tsc_last = end;
}

static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	thread->tsc_last = now;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

		poller->run_count++;
		if (poller_rc > 0) {
			poller->busy_count++;
		}

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
		}
#endif

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
		}

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

		poller->run_count++;
		if (timer_rc > 0) {
			poller->busy_count++;
		}

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
		}
#endif

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			poller_insert_timer(thread, poller, now);
		}

		if (timer_rc > rc) {
			rc = timer_rc;
		}
	}

	return rc;
}

int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	struct spdk_thread *orig_thread;
	int rc;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	if (!thread->interrupt_mode) {
		rc = thread_poll(thread, max_msgs, now);
	} else {
		/* Non-blocking wait on the thread's fd_group */
		rc = spdk_fd_group_wait(thread->fgrp, 0);
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
		thread_exit(thread, now);
	}

	thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}

uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = TAILQ_FIRST(&thread->timed_pollers);
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}

int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}

static bool
thread_has_unpaused_pollers(struct spdk_thread *thread)
{
	if (TAILQ_EMPTY(&thread->active_pollers) &&
	    TAILQ_EMPTY(&thread->timed_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
	if (!thread_has_unpaused_pollers(thread) &&
	    TAILQ_EMPTY(&thread->paused_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
	if (spdk_ring_count(thread->messages) ||
	    thread_has_unpaused_pollers(thread) ||
	    thread->critical_msg != NULL) {
		return false;
	}

	return true;
}

uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count. We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
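	 * Callers should therefore treat the value as an instantaneous snapshot,
	 * suitable for statistics but not for synchronization decisions.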
	 */
	return g_thread_count;
}

struct spdk_thread *
spdk_get_thread(void)
{
	return _get_thread();
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

uint64_t
spdk_thread_get_id(const struct spdk_thread *thread)
{
	return thread->id;
}

struct spdk_thread *
spdk_thread_get_by_id(uint64_t id)
{
	struct spdk_thread *thread;

	if (id == 0 || id >= g_thread_id) {
		SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
		return NULL;
	}
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		if (thread->id == id) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);
	return thread;
}

int
spdk_thread_get_stats(struct spdk_thread_stats *stats)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		return -EINVAL;
	}

	if (stats == NULL) {
		return -EINVAL;
	}

	*stats = thread->stats;

	return 0;
}

uint64_t
spdk_thread_get_last_tsc(struct spdk_thread *thread)
{
	if (thread == NULL) {
		thread = _get_thread();
	}

	return thread->tsc_last;
}

int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	if (thread->interrupt_mode) {
		uint64_t notify = 1;

		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
			return -EIO;
		}
	}

	return 0;
}

int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
	spdk_msg_fn expected = NULL;

	if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
					__ATOMIC_SEQ_CST)) {
		if (thread->interrupt_mode) {
			uint64_t notify = 1;
			int rc;

			rc = write(thread->msg_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
				return -EIO;
			}
		}

		return 0;
	}

	return -EIO;
}

#ifdef __linux__
static int
interrupt_timerfd_prepare(uint64_t period_microseconds)
{
	int timerfd;
	int ret;
	struct itimerspec new_tv;
	uint64_t period_seconds;
	uint64_t period_nanoseconds;

	if (period_microseconds == 0) {
		return -EINVAL;
	}

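	/* Split the period into whole seconds plus a nanosecond remainder for
	 * struct itimerspec; e.g. 1500000 us becomes 1 s + 500000000 ns.
	 */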
	period_seconds = period_microseconds / SPDK_SEC_TO_USEC;
	period_nanoseconds = period_microseconds % SPDK_SEC_TO_USEC * 1000;

	new_tv.it_value.tv_sec = period_seconds;
	new_tv.it_value.tv_nsec = period_nanoseconds;

	new_tv.it_interval.tv_sec = period_seconds;
	new_tv.it_interval.tv_nsec = period_nanoseconds;

	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
	if (timerfd < 0) {
		return -errno;
	}

	ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
	if (ret < 0) {
		close(timerfd);
		return -errno;
	}

	return timerfd;
}
#else
static int
interrupt_timerfd_prepare(uint64_t period_microseconds)
{
	return -ENOTSUP;
}
#endif

static int
interrupt_timerfd_process(void *arg)
{
	struct spdk_poller *poller = arg;
	uint64_t exp;
	int rc;

	/* clear the level of interval timer */
	rc = read(poller->timerfd, &exp, sizeof(exp));
	if (rc < 0) {
		if (errno == EAGAIN) {
			return 0;
		}

		return -errno;
	}

	return poller->fn(poller->arg);
}

static int
thread_interrupt_register_timerfd(struct spdk_fd_group *fgrp,
				  uint64_t period_microseconds,
				  struct spdk_poller *poller)
{
	int timerfd;
	int rc;

	timerfd = interrupt_timerfd_prepare(period_microseconds);
	if (timerfd < 0) {
		return timerfd;
	}

	rc = spdk_fd_group_add(fgrp, timerfd,
			       interrupt_timerfd_process, poller);
	if (rc < 0) {
		close(timerfd);
		return rc;
	}

	return timerfd;
}

static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;
	uint64_t quotient, remainder, ticks;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;

	if (period_microseconds) {
		quotient = period_microseconds / SPDK_SEC_TO_USEC;
		remainder = period_microseconds % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		poller->period_ticks = 0;
	}

	if (thread->interrupt_mode && period_microseconds != 0) {
		int rc;

		rc = thread_interrupt_register_timerfd(thread->fgrp, period_microseconds, poller);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to register timerfd for periodic poller: %s\n", spdk_strerror(-rc));
			free(poller);
			return NULL;
		}
		poller->timerfd = rc;
	}

	thread_insert_poller(thread, poller);

	return poller;
}

struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}

struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		SPDK_ERRLOG("unregistering from a different thread than the one that called spdk_poller_register()\n");
		assert(false);
		return;
	}

	if (thread->interrupt_mode && poller->timerfd) {
		spdk_fd_group_remove(thread->fgrp, poller->timerfd);
		close(poller->timerfd);
		poller->timerfd = 0;
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}

void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state == SPDK_POLLER_STATE_PAUSED ||
	    poller->state == SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused from within itself, we can immediately move it
	 * onto the paused_pollers list. Otherwise we just set its state to
	 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it. This
	 * allows a poller to be paused from another poller's context without
	 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration.
	 */
	if (poller->state != SPDK_POLLER_STATE_RUNNING) {
		poller->state = SPDK_POLLER_STATE_PAUSING;
	} else {
		if (poller->period_ticks > 0) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
		} else {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		}

		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
	}
}

void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state != SPDK_POLLER_STATE_PAUSED &&
	    poller->state != SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused it has to be removed from the paused pollers
	 * list and put on the active / timer list depending on its
	 * period_ticks. If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list.
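	 * For example, a poller that was paused from another poller's context
	 * (and is therefore still in SPDK_POLLER_STATE_PAUSING) has not yet
	 * been moved, so only the state flip below is required.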
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		thread_insert_poller(thread, poller);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
}

const char *
spdk_poller_state_str(enum spdk_poller_state state)
{
	switch (state) {
	case SPDK_POLLER_STATE_WAITING:
		return "waiting";
	case SPDK_POLLER_STATE_RUNNING:
		return "running";
	case SPDK_POLLER_STATE_UNREGISTERED:
		return "unregistered";
	case SPDK_POLLER_STATE_PAUSING:
		return "pausing";
	case SPDK_POLLER_STATE_PAUSED:
		return "paused";
	default:
		return NULL;
	}
}

struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};

static void
_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	assert(rc == 0);
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}

void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;
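	/* The device only becomes visible to spdk_get_io_channel() once it is
	 * linked into g_io_devices below, under g_devlist_mutex.
	 */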

	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}

static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev);
}

static void
io_device_free(struct io_device *dev)
{
	int rc __attribute__((unused));

	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
		assert(rc == 0);
	}
}

void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}

const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}

struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

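	/* Refuse to create or look up a channel on a thread that has already
	 * been marked as exited.
	 */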
	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 * thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}

static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called.
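	 * The destroy_cb may itself release channels or unregister devices,
	 * operations that take g_devlist_mutex and would otherwise deadlock.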
	 */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_get_io_channel()\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute. If so, skip calling
	 * the fn() on this thread.
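	 * The lookup below, done under g_devlist_mutex, re-checks that a channel
	 * for this io_device still exists on the current thread before fn() runs.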
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

struct spdk_interrupt {
	int efd;
	struct spdk_thread *thread;
	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	if (thread->msg_fd <= 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}

#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	struct spdk_thread *orig_thread;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;
	uint64_t now = spdk_get_ticks();

	orig_thread = _get_thread();
	tls_thread = thread;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
	}
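
	/* Passing max_msgs == 0 uses the default batch size (SPDK_MSG_BATCH_SIZE).
	 * If messages remain after the batch, msg_queue_run_batch() rewrites the
	 * eventfd so the fd_group will fire again.
	 */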
	msg_count = msg_queue_run_batch(thread, 0);
	if (msg_count) {
		rc = 1;
	}

	thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}

static int
thread_interrupt_create(struct spdk_thread *thread)
{
	int rc;

	SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);

	rc = spdk_fd_group_create(&thread->fgrp);
	if (rc) {
		return rc;
	}

	thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (thread->msg_fd < 0) {
		rc = -errno;
		spdk_fd_group_destroy(thread->fgrp);
		thread->fgrp = NULL;

		return rc;
	}

	return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
	return -ENOTSUP;
}
#endif

struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
			void *arg, const char *name)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
		SPDK_ERRLOG("thread %s is not running\n", thread->name);
		return NULL;
	}

	if (spdk_fd_group_add(thread->fgrp, efd, fn, arg)) {
		return NULL;
	}

	intr = calloc(1, sizeof(*intr));
	if (intr == NULL) {
		SPDK_ERRLOG("Interrupt handler allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(intr->name, sizeof(intr->name), "%s", name);
	} else {
		snprintf(intr->name, sizeof(intr->name), "%p", fn);
	}

	intr->efd = efd;
	intr->thread = thread;

	return intr;
}

void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	*pintr = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (intr->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n");
		assert(false);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}

int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n");
		assert(false);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}

static bool g_interrupt_mode = false;

int
spdk_interrupt_mode_enable(void)
{
#ifdef __linux__
	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
	g_interrupt_mode = true;
	return 0;
#else
	SPDK_ERRLOG("SPDK interrupt mode is currently supported only on Linux.\n");
	g_interrupt_mode = false;
	return -ENOTSUP;
#endif
}

bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}

SPDK_LOG_REGISTER_COMPONENT(thread)
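
/*
 * Minimal usage sketch of this library (illustrative only; my_poll_fn,
 * my_ctx, and the running flag are hypothetical application code):
 *
 *	spdk_thread_lib_init(NULL, 0);
 *	struct spdk_thread *thread = spdk_thread_create("worker", NULL);
 *	spdk_set_thread(thread);
 *	struct spdk_poller *poller = spdk_poller_register(my_poll_fn, my_ctx, 1000);
 *
 *	while (running) {
 *		spdk_thread_poll(thread, 0, 0);
 *	}
 *
 *	spdk_poller_unregister(&poller);
 *	spdk_thread_exit(thread);
 *	while (!spdk_thread_is_exited(thread)) {
 *		spdk_thread_poll(thread, 0, 0);
 *	}
 *	spdk_thread_destroy(thread);
 *	spdk_thread_lib_fini();
 */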