1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/likely.h" 38 #include "spdk/queue.h" 39 #include "spdk/string.h" 40 #include "spdk/thread.h" 41 #include "spdk/util.h" 42 #include "spdk/fd_group.h" 43 44 #include "spdk/log.h" 45 #include "spdk_internal/thread.h" 46 47 #ifdef __linux__ 48 #include <sys/timerfd.h> 49 #include <sys/eventfd.h> 50 #endif 51 52 #define SPDK_MSG_BATCH_SIZE 8 53 #define SPDK_MAX_DEVICE_NAME_LEN 256 54 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 55 56 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 57 58 static spdk_new_thread_fn g_new_thread_fn = NULL; 59 static spdk_thread_op_fn g_thread_op_fn = NULL; 60 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 61 static size_t g_ctx_sz = 0; 62 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 63 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 64 * SPDK application is required. 65 */ 66 static uint64_t g_thread_id = 1; 67 68 struct io_device { 69 void *io_device; 70 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 71 spdk_io_channel_create_cb create_cb; 72 spdk_io_channel_destroy_cb destroy_cb; 73 spdk_io_device_unregister_cb unregister_cb; 74 struct spdk_thread *unregister_thread; 75 uint32_t ctx_size; 76 uint32_t for_each_count; 77 TAILQ_ENTRY(io_device) tailq; 78 79 uint32_t refcnt; 80 81 bool unregistered; 82 }; 83 84 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 85 86 struct spdk_msg { 87 spdk_msg_fn fn; 88 void *arg; 89 90 SLIST_ENTRY(spdk_msg) link; 91 }; 92 93 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 94 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 95 96 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 97 static uint32_t g_thread_count = 0; 98 99 static __thread struct spdk_thread *tls_thread = NULL; 100 101 static inline struct spdk_thread * 102 _get_thread(void) 103 { 104 return tls_thread; 105 } 
106 107 static int 108 _thread_lib_init(size_t ctx_sz) 109 { 110 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 111 112 g_ctx_sz = ctx_sz; 113 114 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 115 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 116 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 117 sizeof(struct spdk_msg), 118 0, /* No cache. We do our own. */ 119 SPDK_ENV_SOCKET_ID_ANY); 120 121 if (!g_spdk_msg_mempool) { 122 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 123 return -1; 124 } 125 126 return 0; 127 } 128 129 int 130 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 131 { 132 assert(g_new_thread_fn == NULL); 133 assert(g_thread_op_fn == NULL); 134 135 if (new_thread_fn == NULL) { 136 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 137 } else { 138 g_new_thread_fn = new_thread_fn; 139 } 140 141 return _thread_lib_init(ctx_sz); 142 } 143 144 int 145 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 146 spdk_thread_op_supported_fn thread_op_supported_fn, 147 size_t ctx_sz) 148 { 149 assert(g_new_thread_fn == NULL); 150 assert(g_thread_op_fn == NULL); 151 assert(g_thread_op_supported_fn == NULL); 152 153 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 154 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 155 return -EINVAL; 156 } 157 158 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 159 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 160 } else { 161 g_thread_op_fn = thread_op_fn; 162 g_thread_op_supported_fn = thread_op_supported_fn; 163 } 164 165 return _thread_lib_init(ctx_sz); 166 } 167 168 void 169 spdk_thread_lib_fini(void) 170 { 171 struct io_device *dev; 172 173 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 174 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 175 } 176 177 if (g_spdk_msg_mempool) { 178 
spdk_mempool_free(g_spdk_msg_mempool); 179 g_spdk_msg_mempool = NULL; 180 } 181 182 g_new_thread_fn = NULL; 183 g_thread_op_fn = NULL; 184 g_thread_op_supported_fn = NULL; 185 g_ctx_sz = 0; 186 } 187 188 static void thread_interrupt_destroy(struct spdk_thread *thread); 189 static int thread_interrupt_create(struct spdk_thread *thread); 190 191 static void 192 _free_thread(struct spdk_thread *thread) 193 { 194 struct spdk_io_channel *ch; 195 struct spdk_msg *msg; 196 struct spdk_poller *poller, *ptmp; 197 198 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 199 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 200 thread->name, ch->dev->name); 201 } 202 203 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 204 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 205 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 206 poller->name); 207 } 208 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 209 free(poller); 210 } 211 212 TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) { 213 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 214 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 215 poller->name); 216 } 217 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 218 free(poller); 219 } 220 221 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 222 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 223 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 224 free(poller); 225 } 226 227 pthread_mutex_lock(&g_devlist_mutex); 228 assert(g_thread_count > 0); 229 g_thread_count--; 230 TAILQ_REMOVE(&g_threads, thread, tailq); 231 pthread_mutex_unlock(&g_devlist_mutex); 232 233 msg = SLIST_FIRST(&thread->msg_cache); 234 while (msg != NULL) { 235 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 236 237 assert(thread->msg_cache_count > 0); 238 thread->msg_cache_count--; 239 spdk_mempool_put(g_spdk_msg_mempool, msg); 240 241 msg = 
SLIST_FIRST(&thread->msg_cache); 242 } 243 244 assert(thread->msg_cache_count == 0); 245 246 if (thread->interrupt_mode) { 247 thread_interrupt_destroy(thread); 248 } 249 250 spdk_ring_free(thread->messages); 251 free(thread); 252 } 253 254 struct spdk_thread * 255 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 256 { 257 struct spdk_thread *thread; 258 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 259 int rc = 0, i; 260 261 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 262 if (!thread) { 263 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 264 return NULL; 265 } 266 267 if (cpumask) { 268 spdk_cpuset_copy(&thread->cpumask, cpumask); 269 } else { 270 spdk_cpuset_negate(&thread->cpumask); 271 } 272 273 TAILQ_INIT(&thread->io_channels); 274 TAILQ_INIT(&thread->active_pollers); 275 TAILQ_INIT(&thread->timed_pollers); 276 TAILQ_INIT(&thread->paused_pollers); 277 SLIST_INIT(&thread->msg_cache); 278 thread->msg_cache_count = 0; 279 280 thread->tsc_last = spdk_get_ticks(); 281 282 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 283 if (!thread->messages) { 284 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 285 free(thread); 286 return NULL; 287 } 288 289 /* Fill the local message pool cache. */ 290 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 291 if (rc == 0) { 292 /* If we can't populate the cache it's ok. The cache will get filled 293 * up organically as messages are passed to the thread. */ 294 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 295 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 296 thread->msg_cache_count++; 297 } 298 } 299 300 if (name) { 301 snprintf(thread->name, sizeof(thread->name), "%s", name); 302 } else { 303 snprintf(thread->name, sizeof(thread->name), "%p", thread); 304 } 305 306 pthread_mutex_lock(&g_devlist_mutex); 307 if (g_thread_id == 0) { 308 SPDK_ERRLOG("Thread ID rolled over. 
Further thread creation is not allowed.\n"); 309 pthread_mutex_unlock(&g_devlist_mutex); 310 _free_thread(thread); 311 return NULL; 312 } 313 thread->id = g_thread_id++; 314 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 315 g_thread_count++; 316 pthread_mutex_unlock(&g_devlist_mutex); 317 318 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 319 thread->id, thread->name); 320 321 if (spdk_interrupt_mode_is_enabled()) { 322 thread->interrupt_mode = true; 323 rc = thread_interrupt_create(thread); 324 if (rc != 0) { 325 _free_thread(thread); 326 return NULL; 327 } 328 } 329 330 if (g_new_thread_fn) { 331 rc = g_new_thread_fn(thread); 332 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 333 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 334 } 335 336 if (rc != 0) { 337 _free_thread(thread); 338 return NULL; 339 } 340 341 thread->state = SPDK_THREAD_STATE_RUNNING; 342 343 return thread; 344 } 345 346 void 347 spdk_set_thread(struct spdk_thread *thread) 348 { 349 tls_thread = thread; 350 } 351 352 static void 353 thread_exit(struct spdk_thread *thread, uint64_t now) 354 { 355 struct spdk_poller *poller; 356 struct spdk_io_channel *ch; 357 358 if (now >= thread->exit_timeout_tsc) { 359 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 360 thread->name); 361 goto exited; 362 } 363 364 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 365 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 366 SPDK_INFOLOG(thread, 367 "thread %s still has active poller %s\n", 368 thread->name, poller->name); 369 return; 370 } 371 } 372 373 TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) { 374 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 375 SPDK_INFOLOG(thread, 376 "thread %s still has active timed poller %s\n", 377 thread->name, poller->name); 378 return; 379 } 380 } 381 382 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 383 SPDK_INFOLOG(thread, 384 "thread %s still 
has paused poller %s\n", 385 thread->name, poller->name); 386 return; 387 } 388 389 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 390 SPDK_INFOLOG(thread, 391 "thread %s still has channel for io_device %s\n", 392 thread->name, ch->dev->name); 393 return; 394 } 395 396 if (thread->pending_unregister_count > 0) { 397 SPDK_INFOLOG(thread, 398 "thread %s is still unregistering io_devices\n", 399 thread->name); 400 return; 401 } 402 403 exited: 404 thread->state = SPDK_THREAD_STATE_EXITED; 405 } 406 407 int 408 spdk_thread_exit(struct spdk_thread *thread) 409 { 410 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 411 412 assert(tls_thread == thread); 413 414 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 415 SPDK_INFOLOG(thread, 416 "thread %s is already exiting\n", 417 thread->name); 418 return 0; 419 } 420 421 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 422 SPDK_THREAD_EXIT_TIMEOUT_SEC); 423 thread->state = SPDK_THREAD_STATE_EXITING; 424 return 0; 425 } 426 427 bool 428 spdk_thread_is_exited(struct spdk_thread *thread) 429 { 430 return thread->state == SPDK_THREAD_STATE_EXITED; 431 } 432 433 void 434 spdk_thread_destroy(struct spdk_thread *thread) 435 { 436 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 437 438 assert(thread->state == SPDK_THREAD_STATE_EXITED); 439 440 if (tls_thread == thread) { 441 tls_thread = NULL; 442 } 443 444 _free_thread(thread); 445 } 446 447 void * 448 spdk_thread_get_ctx(struct spdk_thread *thread) 449 { 450 if (g_ctx_sz > 0) { 451 return thread->ctx; 452 } 453 454 return NULL; 455 } 456 457 struct spdk_cpuset * 458 spdk_thread_get_cpumask(struct spdk_thread *thread) 459 { 460 return &thread->cpumask; 461 } 462 463 int 464 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 465 { 466 struct spdk_thread *thread; 467 468 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 469 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 470 
assert(false); 471 return -ENOTSUP; 472 } 473 474 thread = spdk_get_thread(); 475 if (!thread) { 476 SPDK_ERRLOG("Called from non-SPDK thread\n"); 477 assert(false); 478 return -EINVAL; 479 } 480 481 spdk_cpuset_copy(&thread->cpumask, cpumask); 482 483 /* Invoke framework's reschedule operation. If this function is called multiple times 484 * in a single spdk_thread_poll() context, the last cpumask will be used in the 485 * reschedule operation. 486 */ 487 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 488 489 return 0; 490 } 491 492 struct spdk_thread * 493 spdk_thread_get_from_ctx(void *ctx) 494 { 495 if (ctx == NULL) { 496 assert(false); 497 return NULL; 498 } 499 500 assert(g_ctx_sz > 0); 501 502 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 503 } 504 505 static inline uint32_t 506 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 507 { 508 unsigned count, i; 509 void *messages[SPDK_MSG_BATCH_SIZE]; 510 uint64_t notify = 1; 511 int rc; 512 513 #ifdef DEBUG 514 /* 515 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 516 * so we will never actually read uninitialized data from events, but just to be sure 517 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 518 */ 519 memset(messages, 0, sizeof(messages)); 520 #endif 521 522 if (max_msgs > 0) { 523 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 524 } else { 525 max_msgs = SPDK_MSG_BATCH_SIZE; 526 } 527 if (thread->interrupt_mode) { 528 /* There may be race between msg_acknowledge and another producer's msg_notify, 529 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 530 * This can avoid msg notification missing. 
531 */ 532 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 533 if (rc < 0 && errno != EAGAIN) { 534 SPDK_ERRLOG("failed to acknowledge msg_queue: %s.\n", spdk_strerror(errno)); 535 } 536 } 537 538 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 539 if (thread->interrupt_mode && spdk_ring_count(thread->messages) != 0) { 540 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 541 if (rc < 0) { 542 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 543 } 544 } 545 if (count == 0) { 546 return 0; 547 } 548 549 for (i = 0; i < count; i++) { 550 struct spdk_msg *msg = messages[i]; 551 552 assert(msg != NULL); 553 msg->fn(msg->arg); 554 555 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 556 /* Insert the messages at the head. We want to re-use the hot 557 * ones. */ 558 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 559 thread->msg_cache_count++; 560 } else { 561 spdk_mempool_put(g_spdk_msg_mempool, msg); 562 } 563 } 564 565 return count; 566 } 567 568 static void 569 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 570 { 571 struct spdk_poller *iter; 572 573 poller->next_run_tick = now + poller->period_ticks; 574 575 /* 576 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled 577 * run time. 
578 */ 579 TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) { 580 if (iter->next_run_tick <= poller->next_run_tick) { 581 TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq); 582 return; 583 } 584 } 585 586 /* No earlier pollers were found, so this poller must be the new head */ 587 TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq); 588 } 589 590 static void 591 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 592 { 593 if (poller->period_ticks) { 594 poller_insert_timer(thread, poller, spdk_get_ticks()); 595 } else { 596 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 597 } 598 } 599 600 static inline void 601 thread_update_stats(struct spdk_thread *thread, uint64_t end, 602 uint64_t start, int rc) 603 { 604 if (rc == 0) { 605 /* Poller status idle */ 606 thread->stats.idle_tsc += end - start; 607 } else if (rc > 0) { 608 /* Poller status busy */ 609 thread->stats.busy_tsc += end - start; 610 } 611 /* Store end time to use it as start time of the next spdk_thread_poll(). 
*/ 612 thread->tsc_last = end; 613 } 614 615 static int 616 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 617 { 618 uint32_t msg_count; 619 struct spdk_poller *poller, *tmp; 620 spdk_msg_fn critical_msg; 621 int rc = 0; 622 623 thread->tsc_last = now; 624 625 critical_msg = thread->critical_msg; 626 if (spdk_unlikely(critical_msg != NULL)) { 627 critical_msg(NULL); 628 thread->critical_msg = NULL; 629 rc = 1; 630 } 631 632 msg_count = msg_queue_run_batch(thread, max_msgs); 633 if (msg_count) { 634 rc = 1; 635 } 636 637 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 638 active_pollers_head, tailq, tmp) { 639 int poller_rc; 640 641 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 642 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 643 free(poller); 644 continue; 645 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 646 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 647 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 648 poller->state = SPDK_POLLER_STATE_PAUSED; 649 continue; 650 } 651 652 poller->state = SPDK_POLLER_STATE_RUNNING; 653 poller_rc = poller->fn(poller->arg); 654 655 poller->run_count++; 656 if (poller_rc > 0) { 657 poller->busy_count++; 658 } 659 660 #ifdef DEBUG 661 if (poller_rc == -1) { 662 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 663 } 664 #endif 665 666 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 667 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 668 free(poller); 669 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 670 poller->state = SPDK_POLLER_STATE_WAITING; 671 } 672 673 if (poller_rc > rc) { 674 rc = poller_rc; 675 } 676 } 677 678 TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) { 679 int timer_rc = 0; 680 681 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 682 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 683 free(poller); 684 continue; 685 } else if (poller->state == 
SPDK_POLLER_STATE_PAUSING) { 686 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 687 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 688 poller->state = SPDK_POLLER_STATE_PAUSED; 689 continue; 690 } 691 692 if (now < poller->next_run_tick) { 693 break; 694 } 695 696 poller->state = SPDK_POLLER_STATE_RUNNING; 697 timer_rc = poller->fn(poller->arg); 698 699 poller->run_count++; 700 if (timer_rc > 0) { 701 poller->busy_count++; 702 } 703 704 #ifdef DEBUG 705 if (timer_rc == -1) { 706 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 707 } 708 #endif 709 710 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 711 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 712 free(poller); 713 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 714 poller->state = SPDK_POLLER_STATE_WAITING; 715 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 716 poller_insert_timer(thread, poller, now); 717 } 718 719 if (timer_rc > rc) { 720 rc = timer_rc; 721 } 722 } 723 724 return rc; 725 } 726 727 int 728 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 729 { 730 struct spdk_thread *orig_thread; 731 int rc; 732 733 orig_thread = _get_thread(); 734 tls_thread = thread; 735 736 if (now == 0) { 737 now = spdk_get_ticks(); 738 } 739 740 if (!thread->interrupt_mode) { 741 rc = thread_poll(thread, max_msgs, now); 742 } else { 743 /* Non-block wait on thread's fd_group */ 744 rc = spdk_fd_group_wait(thread->fgrp, 0); 745 } 746 747 748 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 749 thread_exit(thread, now); 750 } 751 752 thread_update_stats(thread, spdk_get_ticks(), now, rc); 753 754 tls_thread = orig_thread; 755 756 return rc; 757 } 758 759 uint64_t 760 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 761 { 762 struct spdk_poller *poller; 763 764 poller = TAILQ_FIRST(&thread->timed_pollers); 765 if (poller) { 766 return poller->next_run_tick; 767 } 768 769 return 0; 770 } 771 772 int 
773 spdk_thread_has_active_pollers(struct spdk_thread *thread) 774 { 775 return !TAILQ_EMPTY(&thread->active_pollers); 776 } 777 778 static bool 779 thread_has_unpaused_pollers(struct spdk_thread *thread) 780 { 781 if (TAILQ_EMPTY(&thread->active_pollers) && 782 TAILQ_EMPTY(&thread->timed_pollers)) { 783 return false; 784 } 785 786 return true; 787 } 788 789 bool 790 spdk_thread_has_pollers(struct spdk_thread *thread) 791 { 792 if (!thread_has_unpaused_pollers(thread) && 793 TAILQ_EMPTY(&thread->paused_pollers)) { 794 return false; 795 } 796 797 return true; 798 } 799 800 bool 801 spdk_thread_is_idle(struct spdk_thread *thread) 802 { 803 if (spdk_ring_count(thread->messages) || 804 thread_has_unpaused_pollers(thread) || 805 thread->critical_msg != NULL) { 806 return false; 807 } 808 809 return true; 810 } 811 812 uint32_t 813 spdk_thread_get_count(void) 814 { 815 /* 816 * Return cached value of the current thread count. We could acquire the 817 * lock and iterate through the TAILQ of threads to count them, but that 818 * count could still be invalidated after we release the lock. 
819 */ 820 return g_thread_count; 821 } 822 823 struct spdk_thread * 824 spdk_get_thread(void) 825 { 826 return _get_thread(); 827 } 828 829 const char * 830 spdk_thread_get_name(const struct spdk_thread *thread) 831 { 832 return thread->name; 833 } 834 835 uint64_t 836 spdk_thread_get_id(const struct spdk_thread *thread) 837 { 838 return thread->id; 839 } 840 841 struct spdk_thread * 842 spdk_thread_get_by_id(uint64_t id) 843 { 844 struct spdk_thread *thread; 845 846 if (id == 0 || id >= g_thread_id) { 847 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 848 return NULL; 849 } 850 pthread_mutex_lock(&g_devlist_mutex); 851 TAILQ_FOREACH(thread, &g_threads, tailq) { 852 if (thread->id == id) { 853 break; 854 } 855 } 856 pthread_mutex_unlock(&g_devlist_mutex); 857 return thread; 858 } 859 860 int 861 spdk_thread_get_stats(struct spdk_thread_stats *stats) 862 { 863 struct spdk_thread *thread; 864 865 thread = _get_thread(); 866 if (!thread) { 867 SPDK_ERRLOG("No thread allocated\n"); 868 return -EINVAL; 869 } 870 871 if (stats == NULL) { 872 return -EINVAL; 873 } 874 875 *stats = thread->stats; 876 877 return 0; 878 } 879 880 uint64_t 881 spdk_thread_get_last_tsc(struct spdk_thread *thread) 882 { 883 if (thread == NULL) { 884 thread = _get_thread(); 885 } 886 887 return thread->tsc_last; 888 } 889 890 int 891 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 892 { 893 struct spdk_thread *local_thread; 894 struct spdk_msg *msg; 895 int rc; 896 897 assert(thread != NULL); 898 899 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 900 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 901 return -EIO; 902 } 903 904 local_thread = _get_thread(); 905 906 msg = NULL; 907 if (local_thread != NULL) { 908 if (local_thread->msg_cache_count > 0) { 909 msg = SLIST_FIRST(&local_thread->msg_cache); 910 assert(msg != NULL); 911 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 912 local_thread->msg_cache_count--; 913 } 
914 } 915 916 if (msg == NULL) { 917 msg = spdk_mempool_get(g_spdk_msg_mempool); 918 if (!msg) { 919 SPDK_ERRLOG("msg could not be allocated\n"); 920 return -ENOMEM; 921 } 922 } 923 924 msg->fn = fn; 925 msg->arg = ctx; 926 927 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 928 if (rc != 1) { 929 SPDK_ERRLOG("msg could not be enqueued\n"); 930 spdk_mempool_put(g_spdk_msg_mempool, msg); 931 return -EIO; 932 } 933 934 if (thread->interrupt_mode) { 935 uint64_t notify = 1; 936 937 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 938 if (rc < 0) { 939 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 940 return -EIO; 941 } 942 } 943 944 return 0; 945 } 946 947 int 948 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 949 { 950 spdk_msg_fn expected = NULL; 951 952 if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 953 __ATOMIC_SEQ_CST)) { 954 if (thread->interrupt_mode) { 955 uint64_t notify = 1; 956 int rc; 957 958 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 959 if (rc < 0) { 960 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 961 return -EIO; 962 } 963 } 964 965 return 0; 966 } 967 968 return -EIO; 969 } 970 971 #ifdef __linux__ 972 static int 973 interrupt_timerfd_prepare(uint64_t period_microseconds) 974 { 975 int timerfd; 976 int ret; 977 struct itimerspec new_tv; 978 uint64_t period_seconds; 979 uint64_t period_nanoseconds; 980 981 if (period_microseconds == 0) { 982 return -EINVAL; 983 } 984 985 period_seconds = period_microseconds / SPDK_SEC_TO_USEC; 986 period_nanoseconds = period_microseconds % SPDK_SEC_TO_USEC * 1000; 987 988 new_tv.it_value.tv_sec = period_seconds; 989 new_tv.it_value.tv_nsec = period_nanoseconds; 990 991 new_tv.it_interval.tv_sec = period_seconds; 992 new_tv.it_interval.tv_nsec = period_nanoseconds; 993 994 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK & TFD_CLOEXEC); 995 if (timerfd < 0) { 996 
return -errno; 997 } 998 999 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1000 if (ret < 0) { 1001 close(timerfd); 1002 return -errno; 1003 } 1004 1005 return timerfd; 1006 } 1007 #else 1008 static int 1009 interrupt_timerfd_prepare(uint64_t period_microseconds) 1010 { 1011 return -ENOTSUP; 1012 } 1013 #endif 1014 1015 static int 1016 interrupt_timerfd_process(void *arg) 1017 { 1018 struct spdk_poller *poller = arg; 1019 uint64_t exp; 1020 int rc; 1021 1022 /* clear the level of interval timer */ 1023 rc = read(poller->timerfd, &exp, sizeof(exp)); 1024 if (rc < 0) { 1025 if (rc == -EAGAIN) { 1026 return 0; 1027 } 1028 1029 return rc; 1030 } 1031 1032 return poller->fn(poller->arg); 1033 } 1034 1035 static int 1036 thread_interrupt_register_timerfd(struct spdk_fd_group *fgrp, 1037 uint64_t period_microseconds, 1038 struct spdk_poller *poller) 1039 { 1040 int timerfd; 1041 int rc; 1042 1043 timerfd = interrupt_timerfd_prepare(period_microseconds); 1044 if (timerfd < 0) { 1045 return timerfd; 1046 } 1047 1048 rc = spdk_fd_group_add(fgrp, timerfd, 1049 interrupt_timerfd_process, poller); 1050 if (rc < 0) { 1051 close(timerfd); 1052 return rc; 1053 } 1054 1055 return timerfd; 1056 } 1057 1058 static struct spdk_poller * 1059 poller_register(spdk_poller_fn fn, 1060 void *arg, 1061 uint64_t period_microseconds, 1062 const char *name) 1063 { 1064 struct spdk_thread *thread; 1065 struct spdk_poller *poller; 1066 uint64_t quotient, remainder, ticks; 1067 1068 thread = spdk_get_thread(); 1069 if (!thread) { 1070 assert(false); 1071 return NULL; 1072 } 1073 1074 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1075 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1076 return NULL; 1077 } 1078 1079 poller = calloc(1, sizeof(*poller)); 1080 if (poller == NULL) { 1081 SPDK_ERRLOG("Poller memory allocation failed\n"); 1082 return NULL; 1083 } 1084 1085 if (name) { 1086 snprintf(poller->name, sizeof(poller->name), "%s", name); 1087 } else { 1088 
snprintf(poller->name, sizeof(poller->name), "%p", fn); 1089 } 1090 1091 poller->state = SPDK_POLLER_STATE_WAITING; 1092 poller->fn = fn; 1093 poller->arg = arg; 1094 poller->thread = thread; 1095 1096 if (period_microseconds) { 1097 quotient = period_microseconds / SPDK_SEC_TO_USEC; 1098 remainder = period_microseconds % SPDK_SEC_TO_USEC; 1099 ticks = spdk_get_ticks_hz(); 1100 1101 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1102 } else { 1103 poller->period_ticks = 0; 1104 } 1105 1106 if (thread->interrupt_mode && period_microseconds != 0) { 1107 int rc; 1108 1109 poller->timerfd = -1; 1110 rc = thread_interrupt_register_timerfd(thread->fgrp, period_microseconds, poller); 1111 if (rc < 0) { 1112 SPDK_ERRLOG("Failed to register timerfd for periodic poller: %s\n", spdk_strerror(-rc)); 1113 free(poller); 1114 return NULL; 1115 } 1116 poller->timerfd = rc; 1117 } 1118 1119 thread_insert_poller(thread, poller); 1120 1121 return poller; 1122 } 1123 1124 struct spdk_poller * 1125 spdk_poller_register(spdk_poller_fn fn, 1126 void *arg, 1127 uint64_t period_microseconds) 1128 { 1129 return poller_register(fn, arg, period_microseconds, NULL); 1130 } 1131 1132 struct spdk_poller * 1133 spdk_poller_register_named(spdk_poller_fn fn, 1134 void *arg, 1135 uint64_t period_microseconds, 1136 const char *name) 1137 { 1138 return poller_register(fn, arg, period_microseconds, name); 1139 } 1140 1141 void 1142 spdk_poller_unregister(struct spdk_poller **ppoller) 1143 { 1144 struct spdk_thread *thread; 1145 struct spdk_poller *poller; 1146 1147 poller = *ppoller; 1148 if (poller == NULL) { 1149 return; 1150 } 1151 1152 *ppoller = NULL; 1153 1154 thread = spdk_get_thread(); 1155 if (!thread) { 1156 assert(false); 1157 return; 1158 } 1159 1160 if (poller->thread != thread) { 1161 SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n"); 1162 assert(false); 1163 return; 1164 } 1165 1166 if (thread->interrupt_mode && 
poller->timerfd >= 0) { 1167 spdk_fd_group_remove(thread->fgrp, poller->timerfd); 1168 close(poller->timerfd); 1169 poller->timerfd = -1; 1170 } 1171 1172 /* If the poller was paused, put it on the active_pollers list so that 1173 * its unregistration can be processed by spdk_thread_poll(). 1174 */ 1175 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1176 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1177 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1178 poller->period_ticks = 0; 1179 } 1180 1181 /* Simply set the state to unregistered. The poller will get cleaned up 1182 * in a subsequent call to spdk_thread_poll(). 1183 */ 1184 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1185 } 1186 1187 void 1188 spdk_poller_pause(struct spdk_poller *poller) 1189 { 1190 struct spdk_thread *thread; 1191 1192 if (poller->state == SPDK_POLLER_STATE_PAUSED || 1193 poller->state == SPDK_POLLER_STATE_PAUSING) { 1194 return; 1195 } 1196 1197 thread = spdk_get_thread(); 1198 if (!thread) { 1199 assert(false); 1200 return; 1201 } 1202 1203 /* If a poller is paused from within itself, we can immediately move it 1204 * on the paused_pollers list. Otherwise we just set its state to 1205 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it. It 1206 * allows a poller to be paused from another one's context without 1207 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration. 
1208 */ 1209 if (poller->state != SPDK_POLLER_STATE_RUNNING) { 1210 poller->state = SPDK_POLLER_STATE_PAUSING; 1211 } else { 1212 if (poller->period_ticks > 0) { 1213 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 1214 } else { 1215 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1216 } 1217 1218 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1219 poller->state = SPDK_POLLER_STATE_PAUSED; 1220 } 1221 } 1222 1223 void 1224 spdk_poller_resume(struct spdk_poller *poller) 1225 { 1226 struct spdk_thread *thread; 1227 1228 if (poller->state != SPDK_POLLER_STATE_PAUSED && 1229 poller->state != SPDK_POLLER_STATE_PAUSING) { 1230 return; 1231 } 1232 1233 thread = spdk_get_thread(); 1234 if (!thread) { 1235 assert(false); 1236 return; 1237 } 1238 1239 /* If a poller is paused it has to be removed from the paused pollers 1240 * list and put on the active / timer list depending on its 1241 * period_ticks. If a poller is still in the process of being paused, 1242 * we just need to flip its state back to waiting, as it's already on 1243 * the appropriate list. 
1244 */ 1245 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1246 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1247 thread_insert_poller(thread, poller); 1248 } 1249 1250 poller->state = SPDK_POLLER_STATE_WAITING; 1251 } 1252 1253 const char * 1254 spdk_poller_state_str(enum spdk_poller_state state) 1255 { 1256 switch (state) { 1257 case SPDK_POLLER_STATE_WAITING: 1258 return "waiting"; 1259 case SPDK_POLLER_STATE_RUNNING: 1260 return "running"; 1261 case SPDK_POLLER_STATE_UNREGISTERED: 1262 return "unregistered"; 1263 case SPDK_POLLER_STATE_PAUSING: 1264 return "pausing"; 1265 case SPDK_POLLER_STATE_PAUSED: 1266 return "paused"; 1267 default: 1268 return NULL; 1269 } 1270 } 1271 1272 struct call_thread { 1273 struct spdk_thread *cur_thread; 1274 spdk_msg_fn fn; 1275 void *ctx; 1276 1277 struct spdk_thread *orig_thread; 1278 spdk_msg_fn cpl; 1279 }; 1280 1281 static void 1282 _on_thread(void *ctx) 1283 { 1284 struct call_thread *ct = ctx; 1285 int rc __attribute__((unused)); 1286 1287 ct->fn(ct->ctx); 1288 1289 pthread_mutex_lock(&g_devlist_mutex); 1290 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1291 pthread_mutex_unlock(&g_devlist_mutex); 1292 1293 if (!ct->cur_thread) { 1294 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1295 1296 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1297 free(ctx); 1298 } else { 1299 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1300 ct->cur_thread->name); 1301 1302 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1303 } 1304 assert(rc == 0); 1305 } 1306 1307 void 1308 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1309 { 1310 struct call_thread *ct; 1311 struct spdk_thread *thread; 1312 int rc __attribute__((unused)); 1313 1314 ct = calloc(1, sizeof(*ct)); 1315 if (!ct) { 1316 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1317 cpl(ctx); 1318 return; 1319 } 1320 1321 ct->fn = fn; 1322 ct->ctx = ctx; 1323 ct->cpl = cpl; 1324 1325 thread = 
	_get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	/* g_devlist_mutex also guards the global g_threads list. */
	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
	assert(rc == 0);
}

/* Register an io_device identified by the opaque pointer io_device.
 * create_cb/destroy_cb create and destroy per-thread I/O channel contexts
 * of ctx_size bytes; name is optional (the device struct pointer is used
 * when NULL). Must be called on an SPDK thread. Fails (with an error log
 * only — the function returns void) on allocation failure or if io_device
 * is already registered.
 */
void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	/* Detect duplicate registration under the lock before publishing. */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}

/* Message handler: runs on the thread that called
 * spdk_io_device_unregister(), fires the user's unregister_cb and frees
 * the device.
 */
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	assert(thread == dev->unregister_thread);

	SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	assert(thread->pending_unregister_count > 0);
	thread->pending_unregister_count--;

	dev->unregister_cb(dev->io_device);
	free(dev);
}

/* Free an unregistered io_device. When an unregister callback was
 * supplied, the free (and the callback) are deferred to the unregistering
 * thread via a message; otherwise the device is freed immediately.
 */
static void
io_device_free(struct io_device *dev)
{
	int rc __attribute__((unused));

	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
		assert(rc == 0);
	}
}

/* Unregister an io_device previously registered with
 * spdk_io_device_register(). unregister_cb (optional) is invoked once all
 * outstanding channel references are released. Must be called on an SPDK
 * thread. Fails (with an error log) while spdk_for_each_channel()
 * iterations on this device are still in flight.
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* Unlink the device and snapshot its channel refcount under the lock;
	 * the snapshot decides whether it can be freed now or must wait for
	 * the last I/O channel to be released.
	 */
	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (unregister_cb) {
		thread->pending_unregister_count++;
	}

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}

/* Return the name of a registered io_device (auto-generated from the
 * struct pointer when none was supplied at registration).
 */
const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}

/* Get (or create) the calling thread's I/O channel for io_device. If a
 * channel already exists on this thread, its reference count is bumped and
 * it is returned; otherwise a new channel (plus ctx_size bytes of
 * per-channel context) is allocated and the device's create_cb is invoked.
 * Returns NULL on error (unknown device, non-SPDK or exited thread,
 * allocation or create_cb failure).
 */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 * thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* Allocate the channel header plus the device's per-channel context. */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	/* Invoke create_cb without holding the devlist mutex; the context
	 * area starts immediately after the channel header.
	 */
	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		/* Roll back the list insertion and the device reference. */
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}

/* Message handler running on the channel's owning thread: performs the
 * deferred destruction of an I/O channel once the last reference has been
 * dropped.
 */
static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	/* Free the device only if it has been unregistered and this was its
	 * last remaining channel reference.
	 */
	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}

/* Drop one reference to an I/O channel. Must be called on the channel's
 * owning thread. When the count reaches zero, destruction is deferred to a
 * message so any in-flight users of the channel finish first.
 */
void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	struct spdk_thread *thread;
	int rc __attribute__((unused));

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	if (ch->thread != thread) {
		SPDK_ERRLOG("different from the thread that called get_io_channel()\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(thread,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		rc = spdk_thread_send_msg(thread, put_io_channel, ch);
		assert(rc == 0);
	}
}

/* Recover the spdk_io_channel header from the per-channel context pointer
 * handed to channel callbacks (the context lives directly after the
 * header).
 */
struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

/* Return the thread that owns this I/O channel. */
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

/* Return the io_device pointer this channel was created for. */
void *
spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
{
	return ch->dev->io_device;
}

/* State carried across threads by spdk_for_each_channel(). */
struct spdk_io_channel_iter {
	void *io_device;		/* user's device key */
	struct io_device *dev;		/* resolved device entry */
	spdk_channel_msg fn;		/* per-channel callback */
	int status;			/* last status from _continue() */
	void *ctx;			/* user context */
	struct spdk_io_channel *ch;	/* channel currently being visited */

	struct spdk_thread *cur_thread;	/* thread currently being visited */

	struct spdk_thread *orig_thread;	/* thread that started the walk */
	spdk_channel_for_each_cpl cpl;	/* completion callback */
};

/* Accessor: the io_device the iteration targets. */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

/* Accessor: the channel currently being visited. */
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

/* Accessor: the user context passed to spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

/* Runs on the originating thread: deliver the final status to the user's
 * completion callback and free the iterator.
 */
static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

/* Runs on each visited thread: invoke the user's fn for that thread's
 * channel, unless the channel disappeared in the meantime.
 */
static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute. If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		/* Channel is gone; advance the iteration instead. */
		spdk_for_each_channel_continue(i, 0);
	}
}

/* Invoke fn once on every thread that currently holds an I/O channel for
 * io_device, hopping from thread to thread by message. Each fn must call
 * spdk_for_each_channel_continue() to advance the walk. Once all channels
 * have been visited (or a non-zero status ended the walk early), cpl is
 * invoked on the calling thread.
 */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread with a channel for this device and start the
	 * walk there; for_each_count blocks device unregistration meanwhile.
	 */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist: complete immediately on the caller's thread. */
	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}

/* Advance an spdk_for_each_channel() walk to the next thread that holds a
 * channel for the device, or finish the walk if status is non-zero or no
 * more channels remain. Must be called on the thread currently visited.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if
(ch->dev->io_device == i->io_device) { 1821 i->cur_thread = thread; 1822 i->ch = ch; 1823 pthread_mutex_unlock(&g_devlist_mutex); 1824 rc = spdk_thread_send_msg(thread, _call_channel, i); 1825 assert(rc == 0); 1826 return; 1827 } 1828 } 1829 thread = TAILQ_NEXT(thread, tailq); 1830 } 1831 1832 end: 1833 i->dev->for_each_count--; 1834 i->ch = NULL; 1835 pthread_mutex_unlock(&g_devlist_mutex); 1836 1837 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1838 assert(rc == 0); 1839 } 1840 1841 struct spdk_interrupt { 1842 int efd; 1843 struct spdk_thread *thread; 1844 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 1845 }; 1846 1847 static void 1848 thread_interrupt_destroy(struct spdk_thread *thread) 1849 { 1850 struct spdk_fd_group *fgrp = thread->fgrp; 1851 1852 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 1853 1854 if (thread->msg_fd < 0) { 1855 return; 1856 } 1857 1858 spdk_fd_group_remove(fgrp, thread->msg_fd); 1859 close(thread->msg_fd); 1860 thread->msg_fd = -1; 1861 1862 spdk_fd_group_destroy(fgrp); 1863 thread->fgrp = NULL; 1864 } 1865 1866 #ifdef __linux__ 1867 static int 1868 thread_interrupt_msg_process(void *arg) 1869 { 1870 struct spdk_thread *thread = arg; 1871 uint32_t msg_count; 1872 spdk_msg_fn critical_msg; 1873 int rc = 0; 1874 1875 critical_msg = thread->critical_msg; 1876 if (spdk_unlikely(critical_msg != NULL)) { 1877 critical_msg(NULL); 1878 thread->critical_msg = NULL; 1879 rc = 1; 1880 } 1881 1882 msg_count = msg_queue_run_batch(thread, 0); 1883 if (msg_count) { 1884 rc = 1; 1885 } 1886 1887 return rc; 1888 } 1889 1890 static int 1891 thread_interrupt_create(struct spdk_thread *thread) 1892 { 1893 int rc; 1894 1895 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 1896 1897 rc = spdk_fd_group_create(&thread->fgrp); 1898 if (rc) { 1899 return rc; 1900 } 1901 1902 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1903 if (thread->msg_fd < 0) { 1904 rc = -errno; 1905 
spdk_fd_group_destroy(thread->fgrp); 1906 thread->fgrp = NULL; 1907 1908 return rc; 1909 } 1910 1911 return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread); 1912 } 1913 #else 1914 static int 1915 thread_interrupt_create(struct spdk_thread *thread) 1916 { 1917 return -ENOTSUP; 1918 } 1919 #endif 1920 1921 struct spdk_interrupt * 1922 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 1923 void *arg, const char *name) 1924 { 1925 struct spdk_thread *thread; 1926 struct spdk_interrupt *intr; 1927 1928 thread = spdk_get_thread(); 1929 if (!thread) { 1930 assert(false); 1931 return NULL; 1932 } 1933 1934 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 1935 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1936 return NULL; 1937 } 1938 1939 if (spdk_fd_group_add(thread->fgrp, efd, fn, arg)) { 1940 return NULL; 1941 } 1942 1943 intr = calloc(1, sizeof(*intr)); 1944 if (intr == NULL) { 1945 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 1946 return NULL; 1947 } 1948 1949 if (name) { 1950 snprintf(intr->name, sizeof(intr->name), "%s", name); 1951 } else { 1952 snprintf(intr->name, sizeof(intr->name), "%p", fn); 1953 } 1954 1955 intr->efd = efd; 1956 intr->thread = thread; 1957 1958 return intr; 1959 } 1960 1961 void 1962 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 1963 { 1964 struct spdk_thread *thread; 1965 struct spdk_interrupt *intr; 1966 1967 intr = *pintr; 1968 if (intr == NULL) { 1969 return; 1970 } 1971 1972 *pintr = NULL; 1973 1974 thread = spdk_get_thread(); 1975 if (!thread) { 1976 assert(false); 1977 return; 1978 } 1979 1980 if (intr->thread != thread) { 1981 SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n"); 1982 assert(false); 1983 return; 1984 } 1985 1986 spdk_fd_group_remove(thread->fgrp, intr->efd); 1987 free(intr); 1988 } 1989 1990 int 1991 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 1992 enum spdk_interrupt_event_types 
event_types)
{
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return -EINVAL;
	}

	if (intr->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n");
		assert(false);
		return -EINVAL;
	}

	/* Delegate to the fd group backing this thread's interrupts. */
	return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}

/* Return the pollable file descriptor of a thread's fd group, so an
 * external event loop can wait for this thread's interrupt sources.
 */
int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
	return spdk_fd_group_get_fd(thread->fgrp);
}

/* Process-wide flag: true once interrupt mode has been enabled. */
static bool g_interrupt_mode = false;

/* Enable interrupt mode process-wide. Only supported on Linux (relies on
 * eventfd/timerfd); returns -ENOTSUP elsewhere.
 */
int
spdk_interrupt_mode_enable(void)
{
#ifdef __linux__
	SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
	g_interrupt_mode = true;
	return 0;
#else
	SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n");
	g_interrupt_mode = false;
	return -ENOTSUP;
#endif
}

/* Query whether interrupt mode has been enabled for this process. */
bool
spdk_interrupt_mode_is_enabled(void)
{
	return g_interrupt_mode;
}

SPDK_LOG_REGISTER_COMPONENT(thread)