1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
*/

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"

#include "spdk/log.h"
#include "spdk_internal/thread.h"

#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#endif

/* Maximum number of messages dequeued and executed per msg_queue_run_batch() pass. */
#define SPDK_MSG_BATCH_SIZE		8
#define SPDK_MAX_DEVICE_NAME_LEN	256
/* Grace period before a thread stuck in EXITING is forced to EXITED (see thread_exit()). */
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5

/* Protects g_io_devices, g_threads, g_thread_count and g_thread_id. */
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Framework hooks installed by spdk_thread_lib_init()/spdk_thread_lib_init_ext(). */
static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
/* Size of the user context area appended to each struct spdk_thread allocation. */
static size_t g_ctx_sz = 0;
/* Monotonic increasing ID is set to each created thread beginning at 1. Once the
 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting
 * SPDK application is required.
 */
static uint64_t g_thread_id = 1;

/* Registry entry for one io_device; one per spdk_io_device_register() call. */
struct io_device {
	void *io_device;			/* caller-supplied unique key */
	char name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb create_cb;	/* per-thread channel ctor */
	spdk_io_channel_destroy_cb destroy_cb;	/* per-thread channel dtor */
	spdk_io_device_unregister_cb unregister_cb;
	struct spdk_thread *unregister_thread;	/* thread that called unregister */
	uint32_t ctx_size;			/* per-channel context bytes */
	uint32_t for_each_count;		/* outstanding for_each_channel iterations */
	TAILQ_ENTRY(io_device) tailq;

	uint32_t refcnt;			/* number of channels still open */

	bool unregistered;			/* unregister requested, awaiting refcnt==0 */
};

static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);

/* A single cross-thread message: fn(arg) executed on the target thread. */
struct spdk_msg {
	spdk_msg_fn fn;
	void *arg;

	SLIST_ENTRY(spdk_msg) link;
};

/* Per-thread cache size for spdk_msg objects, refilled from g_spdk_msg_mempool. */
#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

/* The SPDK thread currently running on this OS thread (set by spdk_set_thread()
 * and temporarily by spdk_thread_poll()). */
static __thread struct spdk_thread *tls_thread = NULL;

/* Return the SPDK thread bound to the calling OS thread, or NULL. */
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}
/* Shared tail of the two lib_init variants: record the user context size and
 * create the global spdk_msg mempool. Returns 0 on success, -1 on failure. */
static int
_thread_lib_init(size_t ctx_sz)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	g_ctx_sz = ctx_sz;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			     sizeof(struct spdk_msg),
			     0, /* No cache. We do our own. */
			     SPDK_ENV_SOCKET_ID_ANY);

	if (!g_spdk_msg_mempool) {
		SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
		return -1;
	}

	return 0;
}

/* Initialize the thread library with the legacy single-hook interface.
 * new_thread_fn (may be NULL) is invoked for every spdk_thread_create(). */
int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);

	if (new_thread_fn == NULL) {
		SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
	} else {
		g_new_thread_fn = new_thread_fn;
	}

	return _thread_lib_init(ctx_sz);
}

/* Initialize the thread library with the extended op-based interface.
 * thread_op_fn and thread_op_supported_fn must be both set or both NULL. */
int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
			 spdk_thread_op_supported_fn thread_op_supported_fn,
			 size_t ctx_sz)
{
	assert(g_new_thread_fn == NULL);
	assert(g_thread_op_fn == NULL);
	assert(g_thread_op_supported_fn == NULL);

	if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
		SPDK_ERRLOG("Both must be defined or undefined together.\n");
		return -EINVAL;
	}

	if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
		SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
	} else {
		g_thread_op_fn = thread_op_fn;
		g_thread_op_supported_fn = thread_op_supported_fn;
	}

	return _thread_lib_init(ctx_sz);
}

/* Tear down library-global state. Logs (but does not free) any io_devices
 * the application failed to unregister. */
void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}

	g_new_thread_fn = NULL;
	g_thread_op_fn = NULL;
	g_thread_op_supported_fn = NULL;
	g_ctx_sz = 0;
}

static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

/* Release all resources owned by a thread: warn about leaked channels and
 * pollers, unlink from the global list, drain the msg cache back to the
 * mempool, tear down interrupt state, and free the thread itself. */
static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Return every cached message to the global mempool. */
	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (spdk_interrupt_mode_is_enabled()) {
		thread_interrupt_destroy(thread);
	}

	spdk_ring_free(thread->messages);
	free(thread);
}

/* Create and register a new SPDK thread. cpumask NULL means "any core"
 * (negated empty set). Returns NULL on allocation failure, ID rollover, or
 * if the framework's new-thread hook rejects the thread. */
struct spdk_thread *
spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc = 0, i;

	/* One allocation covers the thread plus the g_ctx_sz user context tail. */
	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(&thread->cpumask, cpumask);
	} else {
		spdk_cpuset_negate(&thread->cpumask);
	}

	TAILQ_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	TAILQ_INIT(&thread->timed_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	if (g_thread_id == 0) {
		SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		/* NOTE(review): _free_thread() removes the thread from g_threads,
		 * but on this path it was never inserted — looks unsafe, though the
		 * path is unreachable in practice (64-bit counter). TODO confirm. */
		_free_thread(thread);
		return NULL;
	}
	thread->id = g_thread_id++;
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
		      thread->id, thread->name);

	if (spdk_interrupt_mode_is_enabled()) {
		thread->in_interrupt = true;
		rc = thread_interrupt_create(thread);
		if (rc != 0) {
			_free_thread(thread);
			return NULL;
		}
	}

	/* Give the framework a chance to reject/augment the new thread. */
	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
	} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
		rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
	}

	if (rc != 0) {
		_free_thread(thread);
		return NULL;
	}

	thread->state = SPDK_THREAD_STATE_RUNNING;

	return thread;
}

/* Bind an SPDK thread to the calling OS thread (thread-local). */
void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}

/* Called from spdk_thread_poll() while the thread is EXITING: move it to
 * EXITED once it has no live pollers, channels, or pending unregisters —
 * or forcefully once exit_timeout_tsc has passed. */
static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n",
			    thread->name);
		goto exited;
	}

	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_INFOLOG(thread,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

	if (thread->pending_unregister_count > 0) {
		SPDK_INFOLOG(thread,
			     "thread %s is still unregistering io_devices\n",
			     thread->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
}

/* Begin a graceful exit of the current thread; the actual transition to
 * EXITED happens later in spdk_thread_poll() via thread_exit(). */
int
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	if (thread->state >= SPDK_THREAD_STATE_EXITING) {
		SPDK_INFOLOG(thread,
			     "thread %s is already exiting\n",
			     thread->name);
		return 0;
	}

	thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
				   SPDK_THREAD_EXIT_TIMEOUT_SEC);
	thread->state = SPDK_THREAD_STATE_EXITING;
	return 0;
}

/* True once the thread has fully transitioned to EXITED. */
bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
	return thread->state == SPDK_THREAD_STATE_EXITED;
}

/* Free an EXITED thread. Must not be called while the thread is running. */
void
spdk_thread_destroy(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);

	assert(thread->state == SPDK_THREAD_STATE_EXITED);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}

/* Return the user context area appended to the thread allocation, or NULL
 * if the library was initialized with ctx_sz == 0. */
void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
	if (g_ctx_sz > 0) {
		return thread->ctx;
	}

	return NULL;
}

struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}

/* Update the current thread's cpumask and ask the framework to reschedule it.
 * Requires the extended (op-based) init and SPDK_THREAD_OP_RESCHED support. */
int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;

	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
		assert(false);
		return -ENOTSUP;
	}

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Called from non-SPDK thread\n");
		assert(false);
		return -EINVAL;
	}

	spdk_cpuset_copy(&thread->cpumask, cpumask);

	/* Invoke framework's reschedule operation. If this function is called multiple times
	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
	 * reschedule operation.
	 */
	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

	return 0;
}

/* Inverse of spdk_thread_get_ctx(): recover the thread from its ctx pointer. */
struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}

/* Dequeue and execute up to max_msgs (0 means the full batch size) messages
 * from the thread's ring. Returns the number of messages processed.
 * In interrupt mode, re-arms msg_fd if messages remain after the batch. */
static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];
	uint64_t notify = 1;
	int rc;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (spdk_unlikely(thread->in_interrupt) &&
	    spdk_ring_count(thread->messages) != 0) {
		/* More messages remain: keep the eventfd signaled so the next
		 * fd_group wait wakes up immediately. */
		rc = write(thread->msg_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
		}
	}
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}

/* Schedule a timed poller's next run and insert it into the sorted
 * timed_pollers list (ascending next_run_tick). */
static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled
	 * run time.
	 */
	TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq);
}

/* Place a poller on the timed list (period != 0) or the active list. */
static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

/* Account the just-finished poll interval as idle (rc == 0) or busy (rc > 0)
 * and remember `end` as the start of the next interval. */
static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
		    uint64_t start, int rc)
{
	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += end - start;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += end - start;
	}
	/* Store end time to use it as start time of the next spdk_thread_poll(). */
	thread->tsc_last = end;
}

/* One polled-mode iteration: run a pending critical message, a batch of
 * normal messages, all active pollers, and all due timed pollers.
 * Returns > 0 if any work was done (busy), 0 if idle. */
static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	thread->tsc_last = now;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
		rc = 1;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	/* Reverse iteration so pollers registered by a running poller are not
	 * executed until the next spdk_thread_poll() call. */
	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

		poller->run_count++;
		if (poller_rc > 0) {
			poller->busy_count++;
		}

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
		}
#endif

		/* The poller may have unregistered or paused itself while running. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
		}

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		/* List is sorted by next_run_tick, so stop at the first poller
		 * that is not yet due. */
		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

		poller->run_count++;
		if (timer_rc > 0) {
			poller->busy_count++;
		}

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
		}
#endif

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			poller_insert_timer(thread, poller, now);
		}

		if (timer_rc > rc) {
			rc = timer_rc;
		}
	}

	return rc;
}

/* Public entry point: run one iteration of `thread` on the calling OS thread.
 * In interrupt mode this is a non-blocking fd_group wait instead of a poll
 * loop. `now` of 0 means "sample the TSC here". Restores the caller's
 * previous tls_thread binding before returning. */
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	struct spdk_thread *orig_thread;
	int rc;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	if (spdk_likely(!thread->in_interrupt)) {
		rc = thread_poll(thread, max_msgs, now);
	} else {
		/* Non-block wait on thread's fd_group */
		rc = spdk_fd_group_wait(thread->fgrp, 0);
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
		thread_exit(thread, now);
	}

	thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}

/* Next scheduled tick of the earliest timed poller, or 0 if none exist. */
uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *poller;

	poller = TAILQ_FIRST(&thread->timed_pollers);
	if (poller) {
		return poller->next_run_tick;
	}

	return 0;
}
762 763 int 764 spdk_thread_has_active_pollers(struct spdk_thread *thread) 765 { 766 return !TAILQ_EMPTY(&thread->active_pollers); 767 } 768 769 static bool 770 thread_has_unpaused_pollers(struct spdk_thread *thread) 771 { 772 if (TAILQ_EMPTY(&thread->active_pollers) && 773 TAILQ_EMPTY(&thread->timed_pollers)) { 774 return false; 775 } 776 777 return true; 778 } 779 780 bool 781 spdk_thread_has_pollers(struct spdk_thread *thread) 782 { 783 if (!thread_has_unpaused_pollers(thread) && 784 TAILQ_EMPTY(&thread->paused_pollers)) { 785 return false; 786 } 787 788 return true; 789 } 790 791 bool 792 spdk_thread_is_idle(struct spdk_thread *thread) 793 { 794 if (spdk_ring_count(thread->messages) || 795 thread_has_unpaused_pollers(thread) || 796 thread->critical_msg != NULL) { 797 return false; 798 } 799 800 return true; 801 } 802 803 uint32_t 804 spdk_thread_get_count(void) 805 { 806 /* 807 * Return cached value of the current thread count. We could acquire the 808 * lock and iterate through the TAILQ of threads to count them, but that 809 * count could still be invalidated after we release the lock. 
810 */ 811 return g_thread_count; 812 } 813 814 struct spdk_thread * 815 spdk_get_thread(void) 816 { 817 return _get_thread(); 818 } 819 820 const char * 821 spdk_thread_get_name(const struct spdk_thread *thread) 822 { 823 return thread->name; 824 } 825 826 uint64_t 827 spdk_thread_get_id(const struct spdk_thread *thread) 828 { 829 return thread->id; 830 } 831 832 struct spdk_thread * 833 spdk_thread_get_by_id(uint64_t id) 834 { 835 struct spdk_thread *thread; 836 837 if (id == 0 || id >= g_thread_id) { 838 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 839 return NULL; 840 } 841 pthread_mutex_lock(&g_devlist_mutex); 842 TAILQ_FOREACH(thread, &g_threads, tailq) { 843 if (thread->id == id) { 844 break; 845 } 846 } 847 pthread_mutex_unlock(&g_devlist_mutex); 848 return thread; 849 } 850 851 int 852 spdk_thread_get_stats(struct spdk_thread_stats *stats) 853 { 854 struct spdk_thread *thread; 855 856 thread = _get_thread(); 857 if (!thread) { 858 SPDK_ERRLOG("No thread allocated\n"); 859 return -EINVAL; 860 } 861 862 if (stats == NULL) { 863 return -EINVAL; 864 } 865 866 *stats = thread->stats; 867 868 return 0; 869 } 870 871 uint64_t 872 spdk_thread_get_last_tsc(struct spdk_thread *thread) 873 { 874 if (thread == NULL) { 875 thread = _get_thread(); 876 } 877 878 return thread->tsc_last; 879 } 880 881 static inline int 882 thread_send_msg_notification(const struct spdk_thread *target_thread) 883 { 884 uint64_t notify = 1; 885 int rc; 886 887 /* Not necessary to do notification if interrupt facility is not enabled */ 888 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 889 return 0; 890 } 891 892 if (spdk_unlikely(target_thread->in_interrupt)) { 893 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 894 if (rc < 0) { 895 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 896 return -EIO; 897 } 898 } 899 900 return 0; 901 } 902 903 int 904 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 905 { 906 
struct spdk_thread *local_thread; 907 struct spdk_msg *msg; 908 int rc; 909 910 assert(thread != NULL); 911 912 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 913 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 914 return -EIO; 915 } 916 917 local_thread = _get_thread(); 918 919 msg = NULL; 920 if (local_thread != NULL) { 921 if (local_thread->msg_cache_count > 0) { 922 msg = SLIST_FIRST(&local_thread->msg_cache); 923 assert(msg != NULL); 924 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 925 local_thread->msg_cache_count--; 926 } 927 } 928 929 if (msg == NULL) { 930 msg = spdk_mempool_get(g_spdk_msg_mempool); 931 if (!msg) { 932 SPDK_ERRLOG("msg could not be allocated\n"); 933 return -ENOMEM; 934 } 935 } 936 937 msg->fn = fn; 938 msg->arg = ctx; 939 940 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 941 if (rc != 1) { 942 SPDK_ERRLOG("msg could not be enqueued\n"); 943 spdk_mempool_put(g_spdk_msg_mempool, msg); 944 return -EIO; 945 } 946 947 return thread_send_msg_notification(thread); 948 } 949 950 int 951 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 952 { 953 spdk_msg_fn expected = NULL; 954 955 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 956 __ATOMIC_SEQ_CST)) { 957 return -EIO; 958 } 959 960 return thread_send_msg_notification(thread); 961 } 962 963 #ifdef __linux__ 964 static int 965 interrupt_timerfd_prepare(uint64_t period_microseconds) 966 { 967 int timerfd; 968 int ret; 969 struct itimerspec new_tv; 970 uint64_t period_seconds; 971 uint64_t period_nanoseconds; 972 973 if (period_microseconds == 0) { 974 return -EINVAL; 975 } 976 977 period_seconds = period_microseconds / SPDK_SEC_TO_USEC; 978 period_nanoseconds = period_microseconds % SPDK_SEC_TO_USEC * 1000; 979 980 new_tv.it_value.tv_sec = period_seconds; 981 new_tv.it_value.tv_nsec = period_nanoseconds; 982 983 new_tv.it_interval.tv_sec = period_seconds; 984 
new_tv.it_interval.tv_nsec = period_nanoseconds; 985 986 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK & TFD_CLOEXEC); 987 if (timerfd < 0) { 988 return -errno; 989 } 990 991 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 992 if (ret < 0) { 993 close(timerfd); 994 return -errno; 995 } 996 997 return timerfd; 998 } 999 #else 1000 static int 1001 interrupt_timerfd_prepare(uint64_t period_microseconds) 1002 { 1003 return -ENOTSUP; 1004 } 1005 #endif 1006 1007 static int 1008 interrupt_timerfd_process(void *arg) 1009 { 1010 struct spdk_poller *poller = arg; 1011 uint64_t exp; 1012 int rc; 1013 1014 /* clear the level of interval timer */ 1015 rc = read(poller->timerfd, &exp, sizeof(exp)); 1016 if (rc < 0) { 1017 if (rc == -EAGAIN) { 1018 return 0; 1019 } 1020 1021 return rc; 1022 } 1023 1024 return poller->fn(poller->arg); 1025 } 1026 1027 static int 1028 thread_interrupt_register_timerfd(struct spdk_fd_group *fgrp, 1029 uint64_t period_microseconds, 1030 struct spdk_poller *poller) 1031 { 1032 int timerfd; 1033 int rc; 1034 1035 timerfd = interrupt_timerfd_prepare(period_microseconds); 1036 if (timerfd < 0) { 1037 return timerfd; 1038 } 1039 1040 rc = spdk_fd_group_add(fgrp, timerfd, 1041 interrupt_timerfd_process, poller); 1042 if (rc < 0) { 1043 close(timerfd); 1044 return rc; 1045 } 1046 1047 return timerfd; 1048 } 1049 1050 static struct spdk_poller * 1051 poller_register(spdk_poller_fn fn, 1052 void *arg, 1053 uint64_t period_microseconds, 1054 const char *name) 1055 { 1056 struct spdk_thread *thread; 1057 struct spdk_poller *poller; 1058 uint64_t quotient, remainder, ticks; 1059 1060 thread = spdk_get_thread(); 1061 if (!thread) { 1062 assert(false); 1063 return NULL; 1064 } 1065 1066 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1067 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1068 return NULL; 1069 } 1070 1071 poller = calloc(1, sizeof(*poller)); 1072 if (poller == NULL) { 1073 SPDK_ERRLOG("Poller memory 
allocation failed\n"); 1074 return NULL; 1075 } 1076 1077 if (name) { 1078 snprintf(poller->name, sizeof(poller->name), "%s", name); 1079 } else { 1080 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1081 } 1082 1083 poller->state = SPDK_POLLER_STATE_WAITING; 1084 poller->fn = fn; 1085 poller->arg = arg; 1086 poller->thread = thread; 1087 1088 if (period_microseconds) { 1089 quotient = period_microseconds / SPDK_SEC_TO_USEC; 1090 remainder = period_microseconds % SPDK_SEC_TO_USEC; 1091 ticks = spdk_get_ticks_hz(); 1092 1093 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1094 } else { 1095 poller->period_ticks = 0; 1096 } 1097 1098 if (spdk_interrupt_mode_is_enabled() && period_microseconds != 0) { 1099 int rc; 1100 1101 poller->timerfd = -1; 1102 rc = thread_interrupt_register_timerfd(thread->fgrp, period_microseconds, poller); 1103 if (rc < 0) { 1104 SPDK_ERRLOG("Failed to register timerfd for periodic poller: %s\n", spdk_strerror(-rc)); 1105 free(poller); 1106 return NULL; 1107 } 1108 poller->timerfd = rc; 1109 } 1110 1111 thread_insert_poller(thread, poller); 1112 1113 return poller; 1114 } 1115 1116 struct spdk_poller * 1117 spdk_poller_register(spdk_poller_fn fn, 1118 void *arg, 1119 uint64_t period_microseconds) 1120 { 1121 return poller_register(fn, arg, period_microseconds, NULL); 1122 } 1123 1124 struct spdk_poller * 1125 spdk_poller_register_named(spdk_poller_fn fn, 1126 void *arg, 1127 uint64_t period_microseconds, 1128 const char *name) 1129 { 1130 return poller_register(fn, arg, period_microseconds, name); 1131 } 1132 1133 void 1134 spdk_poller_unregister(struct spdk_poller **ppoller) 1135 { 1136 struct spdk_thread *thread; 1137 struct spdk_poller *poller; 1138 1139 poller = *ppoller; 1140 if (poller == NULL) { 1141 return; 1142 } 1143 1144 *ppoller = NULL; 1145 1146 thread = spdk_get_thread(); 1147 if (!thread) { 1148 assert(false); 1149 return; 1150 } 1151 1152 if (poller->thread != thread) { 1153 
SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n"); 1154 assert(false); 1155 return; 1156 } 1157 1158 if (spdk_interrupt_mode_is_enabled() && poller->timerfd >= 0) { 1159 spdk_fd_group_remove(thread->fgrp, poller->timerfd); 1160 close(poller->timerfd); 1161 poller->timerfd = -1; 1162 } 1163 1164 /* If the poller was paused, put it on the active_pollers list so that 1165 * its unregistration can be processed by spdk_thread_poll(). 1166 */ 1167 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1168 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1169 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1170 poller->period_ticks = 0; 1171 } 1172 1173 /* Simply set the state to unregistered. The poller will get cleaned up 1174 * in a subsequent call to spdk_thread_poll(). 1175 */ 1176 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1177 } 1178 1179 void 1180 spdk_poller_pause(struct spdk_poller *poller) 1181 { 1182 struct spdk_thread *thread; 1183 1184 if (poller->state == SPDK_POLLER_STATE_PAUSED || 1185 poller->state == SPDK_POLLER_STATE_PAUSING) { 1186 return; 1187 } 1188 1189 thread = spdk_get_thread(); 1190 if (!thread) { 1191 assert(false); 1192 return; 1193 } 1194 1195 /* If a poller is paused from within itself, we can immediately move it 1196 * on the paused_pollers list. Otherwise we just set its state to 1197 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it. It 1198 * allows a poller to be paused from another one's context without 1199 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration. 
1200 */ 1201 if (poller->state != SPDK_POLLER_STATE_RUNNING) { 1202 poller->state = SPDK_POLLER_STATE_PAUSING; 1203 } else { 1204 if (poller->period_ticks > 0) { 1205 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 1206 } else { 1207 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1208 } 1209 1210 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1211 poller->state = SPDK_POLLER_STATE_PAUSED; 1212 } 1213 } 1214 1215 void 1216 spdk_poller_resume(struct spdk_poller *poller) 1217 { 1218 struct spdk_thread *thread; 1219 1220 if (poller->state != SPDK_POLLER_STATE_PAUSED && 1221 poller->state != SPDK_POLLER_STATE_PAUSING) { 1222 return; 1223 } 1224 1225 thread = spdk_get_thread(); 1226 if (!thread) { 1227 assert(false); 1228 return; 1229 } 1230 1231 /* If a poller is paused it has to be removed from the paused pollers 1232 * list and put on the active / timer list depending on its 1233 * period_ticks. If a poller is still in the process of being paused, 1234 * we just need to flip its state back to waiting, as it's already on 1235 * the appropriate list. 
 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		thread_insert_poller(thread, poller);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
}

/* Return a human-readable name for a poller state, or NULL if unknown. */
const char *
spdk_poller_state_str(enum spdk_poller_state state)
{
	switch (state) {
	case SPDK_POLLER_STATE_WAITING:
		return "waiting";
	case SPDK_POLLER_STATE_RUNNING:
		return "running";
	case SPDK_POLLER_STATE_UNREGISTERED:
		return "unregistered";
	case SPDK_POLLER_STATE_PAUSING:
		return "pausing";
	case SPDK_POLLER_STATE_PAUSED:
		return "paused";
	default:
		return NULL;
	}
}

/* Context for spdk_for_each_thread(): tracks the thread currently being
 * visited plus the originating thread and the final completion callback.
 */
struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};

/* Message handler: run fn on the current thread, then forward the iteration
 * to the next thread on g_threads, or deliver the completion back to the
 * originating thread when the list is exhausted.
 */
static void
_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	/* g_threads is protected by g_devlist_mutex. */
	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	assert(rc == 0);
}

void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		/* Allocation failed: report completion immediately. */
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread =
_get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	/* Kick off the iteration; _on_thread() forwards it thread to thread. */
	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}

void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		/* No name given: fall back to the wrapper's address. */
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		/* Duplicate registration of the same io_device pointer is an error. */
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}

/* Final step of io_device unregistration. Runs as a message on the thread
 * that called spdk_io_device_unregister() so the user callback fires there.
 */
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	assert(thread == dev->unregister_thread);

	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	assert(thread->pending_unregister_count > 0);
	thread->pending_unregister_count--;

	dev->unregister_cb(dev->io_device);
	free(dev);
}

/* Release an unregistered io_device. If a user unregister callback exists,
 * bounce to the unregistering thread first so the callback runs there.
 */
static void
io_device_free(struct io_device *dev)
{
	int rc __attribute__((unused));

	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
		assert(rc == 0);
	}
}

void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* Unregistration is refused while spdk_for_each_channel() iterations
	 * are still in flight for this device.
	 */
	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (unregister_cb) {
		thread->pending_unregister_count++;
	}

	if (refcnt > 0) {
		/* defer deletion until the last channel releases its reference */
		return;
	}

	io_device_free(dev);
}

const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}

struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	/* An exited thread must not acquire new channels. */
	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 * thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* The per-channel context buffer is allocated directly after the
	 * spdk_io_channel header (see spdk_io_channel_get_ctx/from_ctx).
	 */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	/* Call the user's create callback outside the devlist mutex. */
	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		/* Creation failed: undo the list insertion and refcount. */
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}

/* Deferred release of an io_channel; runs as a message on the channel's
 * owning thread after its reference count dropped to zero.
 */
static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* This was the last channel of an already-unregistered device:
	 * finish the deferred device teardown now.
	 */
	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	/* Channels must be released on the thread that acquired them. */
	if (ch->thread != thread) {
		SPDK_ERRLOG("different from the thread that called get_io_channel()\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		/* Defer the actual destruction to a message so the caller can
		 * keep using the channel until the event loop runs.
		 */
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	/* The context buffer directly follows the spdk_io_channel header. */
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

void *
spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
{
	return ch->dev->io_device;
}

/* Iterator state for spdk_for_each_channel() across all threads' channels. */
struct spdk_io_channel_iter {
	void *io_device;
	struct io_device
*dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

/* Runs on the originating thread: invoke the user's completion callback (if
 * any) with the final status, then release the iterator.
 */
static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

/* Runs on each visited thread: re-look-up the thread's channel for the target
 * io_device and invoke fn on it, or skip ahead if the channel is gone.
 */
static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute. If so, skip calling
	 * the fn() on this thread.
 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		/* Channel disappeared; advance the iteration unchanged. */
		spdk_for_each_channel_continue(i, 0);
	}
}

void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread owning a channel for this io_device and start
	 * the iteration there.
	 */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				/* for_each_count blocks unregistration of the
				 * io_device while the iteration is in flight.
				 */
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist for this io_device: complete right away. */
	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		/* A non-zero status aborts the iteration early. */
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if
(ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Iteration finished (or aborted): notify the originating thread. */
	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

/* Handle for one interrupt file descriptor registered with a thread. */
struct spdk_interrupt {
	int efd;
	struct spdk_thread *thread;
	char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};

/* Tear down a thread's interrupt fd group together with its msg eventfd. */
static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
	struct spdk_fd_group *fgrp = thread->fgrp;

	SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);

	/* Nothing to do if the eventfd was never created (or already closed). */
	if (thread->msg_fd < 0) {
		return;
	}

	spdk_fd_group_remove(fgrp, thread->msg_fd);
	close(thread->msg_fd);
	thread->msg_fd = -1;

	spdk_fd_group_destroy(fgrp);
	thread->fgrp = NULL;
}

#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
	struct spdk_thread *thread = arg;
	uint32_t msg_count;
	spdk_msg_fn critical_msg;
	int rc = 0;
	uint64_t notify = 1;

	assert(spdk_interrupt_mode_is_enabled());

	/* There may be race between msg_acknowledge and another producer's msg_notify,
	 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify.
	 * This can avoid msg notification missing.
1873 */ 1874 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 1875 if (rc < 0 && errno != EAGAIN) { 1876 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 1877 } 1878 1879 critical_msg = thread->critical_msg; 1880 if (spdk_unlikely(critical_msg != NULL)) { 1881 critical_msg(NULL); 1882 thread->critical_msg = NULL; 1883 rc = 1; 1884 } 1885 1886 msg_count = msg_queue_run_batch(thread, 0); 1887 if (msg_count) { 1888 rc = 1; 1889 } 1890 1891 return rc; 1892 } 1893 1894 static int 1895 thread_interrupt_create(struct spdk_thread *thread) 1896 { 1897 int rc; 1898 1899 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 1900 1901 rc = spdk_fd_group_create(&thread->fgrp); 1902 if (rc) { 1903 return rc; 1904 } 1905 1906 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1907 if (thread->msg_fd < 0) { 1908 rc = -errno; 1909 spdk_fd_group_destroy(thread->fgrp); 1910 thread->fgrp = NULL; 1911 1912 return rc; 1913 } 1914 1915 return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread); 1916 } 1917 #else 1918 static int 1919 thread_interrupt_create(struct spdk_thread *thread) 1920 { 1921 return -ENOTSUP; 1922 } 1923 #endif 1924 1925 struct spdk_interrupt * 1926 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 1927 void *arg, const char *name) 1928 { 1929 struct spdk_thread *thread; 1930 struct spdk_interrupt *intr; 1931 1932 thread = spdk_get_thread(); 1933 if (!thread) { 1934 assert(false); 1935 return NULL; 1936 } 1937 1938 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 1939 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1940 return NULL; 1941 } 1942 1943 if (spdk_fd_group_add(thread->fgrp, efd, fn, arg)) { 1944 return NULL; 1945 } 1946 1947 intr = calloc(1, sizeof(*intr)); 1948 if (intr == NULL) { 1949 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 1950 return NULL; 1951 } 1952 1953 if (name) { 1954 snprintf(intr->name, sizeof(intr->name), "%s", 
name);
	} else {
		/* No name given: fall back to the handler's address. */
		snprintf(intr->name, sizeof(intr->name), "%p", fn);
	}

	intr->efd = efd;
	intr->thread = thread;

	return intr;
}

void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
	struct spdk_thread *thread;
	struct spdk_interrupt *intr;

	intr = *pintr;
	if (intr == NULL) {
		return;
	}

	/* Clear the caller's handle before any early return below. */
	*pintr = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* Interrupts must be unregistered on the thread that registered them. */
	if (intr->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n");
		assert(false);
		return;
	}

	spdk_fd_group_remove(thread->fgrp, intr->efd);
	free(intr);
}

int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
			       enum spdk_interrupt_event_types event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	/* Must be called on the thread that registered the interrupt. */
	if (intr->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n");
		assert(false);
		return -EINVAL;
	}

	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}

static bool g_interrupt_mode = false;

int
spdk_interrupt_mode_enable(void)
{
	/* It must be called once prior to initializing the threading library.
	 * g_spdk_msg_mempool will be valid if thread library is initialized.
2028 */ 2029 if (g_spdk_msg_mempool) { 2030 SPDK_ERRLOG("Failed due to threading library is already initailzied.\n"); 2031 return -1; 2032 } 2033 2034 #ifdef __linux__ 2035 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2036 g_interrupt_mode = true; 2037 return 0; 2038 #else 2039 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2040 g_interrupt_mode = false; 2041 return -ENOTSUP; 2042 #endif 2043 } 2044 2045 bool 2046 spdk_interrupt_mode_is_enabled(void) 2047 { 2048 return g_interrupt_mode; 2049 } 2050 2051 SPDK_LOG_REGISTER_COMPONENT(thread) 2052