1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/env.h" 37 #include "spdk/likely.h" 38 #include "spdk/queue.h" 39 #include "spdk/string.h" 40 #include "spdk/thread.h" 41 #include "spdk/util.h" 42 #include "spdk/fd_group.h" 43 44 #include "spdk/log.h" 45 #include "spdk_internal/thread.h" 46 47 #ifdef __linux__ 48 #include <sys/timerfd.h> 49 #include <sys/eventfd.h> 50 #endif 51 52 #define SPDK_MSG_BATCH_SIZE 8 53 #define SPDK_MAX_DEVICE_NAME_LEN 256 54 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 55 56 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 57 58 static spdk_new_thread_fn g_new_thread_fn = NULL; 59 static spdk_thread_op_fn g_thread_op_fn = NULL; 60 static spdk_thread_op_supported_fn g_thread_op_supported_fn; 61 static size_t g_ctx_sz = 0; 62 /* Monotonic increasing ID is set to each created thread beginning at 1. Once the 63 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting 64 * SPDK application is required. 65 */ 66 static uint64_t g_thread_id = 1; 67 68 struct io_device { 69 void *io_device; 70 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; 71 spdk_io_channel_create_cb create_cb; 72 spdk_io_channel_destroy_cb destroy_cb; 73 spdk_io_device_unregister_cb unregister_cb; 74 struct spdk_thread *unregister_thread; 75 uint32_t ctx_size; 76 uint32_t for_each_count; 77 TAILQ_ENTRY(io_device) tailq; 78 79 uint32_t refcnt; 80 81 bool unregistered; 82 }; 83 84 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 85 86 struct spdk_msg { 87 spdk_msg_fn fn; 88 void *arg; 89 90 SLIST_ENTRY(spdk_msg) link; 91 }; 92 93 #define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 94 static struct spdk_mempool *g_spdk_msg_mempool = NULL; 95 96 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 97 static uint32_t g_thread_count = 0; 98 99 static __thread struct spdk_thread *tls_thread = NULL; 100 101 static inline struct spdk_thread * 102 _get_thread(void) 103 { 104 return tls_thread; 105 } 106 107 static int 108 _thread_lib_init(size_t ctx_sz) 109 { 110 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; 111 112 g_ctx_sz = ctx_sz; 113 114 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); 115 g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 116 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 117 sizeof(struct spdk_msg), 118 0, /* No cache. We do our own. */ 119 SPDK_ENV_SOCKET_ID_ANY); 120 121 if (!g_spdk_msg_mempool) { 122 SPDK_ERRLOG("spdk_msg_mempool creation failed\n"); 123 return -1; 124 } 125 126 return 0; 127 } 128 129 int 130 spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) 131 { 132 assert(g_new_thread_fn == NULL); 133 assert(g_thread_op_fn == NULL); 134 135 if (new_thread_fn == NULL) { 136 SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n"); 137 } else { 138 g_new_thread_fn = new_thread_fn; 139 } 140 141 return _thread_lib_init(ctx_sz); 142 } 143 144 int 145 spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, 146 spdk_thread_op_supported_fn thread_op_supported_fn, 147 size_t ctx_sz) 148 { 149 assert(g_new_thread_fn == NULL); 150 assert(g_thread_op_fn == NULL); 151 assert(g_thread_op_supported_fn == NULL); 152 153 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { 154 SPDK_ERRLOG("Both must be defined or undefined together.\n"); 155 return -EINVAL; 156 } 157 158 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { 159 SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n"); 160 } else { 161 g_thread_op_fn = thread_op_fn; 162 g_thread_op_supported_fn = thread_op_supported_fn; 163 } 164 165 return _thread_lib_init(ctx_sz); 166 } 167 168 void 169 spdk_thread_lib_fini(void) 170 { 171 struct io_device *dev; 172 173 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 174 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); 175 } 176 177 if (g_spdk_msg_mempool) { 178 spdk_mempool_free(g_spdk_msg_mempool); 179 g_spdk_msg_mempool = NULL; 180 } 181 182 g_new_thread_fn = NULL; 183 g_thread_op_fn = NULL; 184 g_thread_op_supported_fn = NULL; 185 g_ctx_sz = 0; 186 } 187 188 static void thread_interrupt_destroy(struct spdk_thread *thread); 189 static int thread_interrupt_create(struct spdk_thread *thread); 190 191 static void 192 _free_thread(struct spdk_thread *thread) 193 { 194 struct spdk_io_channel *ch; 195 struct spdk_msg *msg; 196 struct spdk_poller *poller, *ptmp; 197 198 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 199 SPDK_ERRLOG("thread %s still has channel for io_device %s\n", 200 thread->name, ch->dev->name); 201 } 202 203 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { 204 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 205 SPDK_WARNLOG("active_poller %s still registered at thread exit\n", 206 poller->name); 207 } 208 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 209 free(poller); 210 } 211 212 TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) { 213 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 214 SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", 215 poller->name); 216 } 217 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 218 free(poller); 219 } 220 221 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { 222 SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name); 223 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 224 free(poller); 225 } 226 227 pthread_mutex_lock(&g_devlist_mutex); 228 assert(g_thread_count > 0); 229 g_thread_count--; 230 TAILQ_REMOVE(&g_threads, thread, tailq); 231 pthread_mutex_unlock(&g_devlist_mutex); 232 233 msg = SLIST_FIRST(&thread->msg_cache); 234 while (msg != NULL) { 235 SLIST_REMOVE_HEAD(&thread->msg_cache, link); 236 237 assert(thread->msg_cache_count > 0); 238 thread->msg_cache_count--; 239 spdk_mempool_put(g_spdk_msg_mempool, msg); 240 241 msg = SLIST_FIRST(&thread->msg_cache); 242 } 243 244 assert(thread->msg_cache_count == 0); 245 246 if (spdk_interrupt_mode_is_enabled()) { 247 thread_interrupt_destroy(thread); 248 } 249 250 spdk_ring_free(thread->messages); 251 free(thread); 252 } 253 254 struct spdk_thread * 255 spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) 256 { 257 struct spdk_thread *thread; 258 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; 259 int rc = 0, i; 260 261 thread = calloc(1, sizeof(*thread) + g_ctx_sz); 262 if (!thread) { 263 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 264 return NULL; 265 } 266 267 if (cpumask) { 268 spdk_cpuset_copy(&thread->cpumask, cpumask); 269 } else { 270 spdk_cpuset_negate(&thread->cpumask); 271 } 272 273 TAILQ_INIT(&thread->io_channels); 274 TAILQ_INIT(&thread->active_pollers); 275 TAILQ_INIT(&thread->timed_pollers); 276 TAILQ_INIT(&thread->paused_pollers); 277 SLIST_INIT(&thread->msg_cache); 278 thread->msg_cache_count = 0; 279 280 thread->tsc_last = spdk_get_ticks(); 281 282 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 283 if (!thread->messages) { 284 SPDK_ERRLOG("Unable to allocate memory for message ring\n"); 285 free(thread); 286 return NULL; 287 } 288 289 /* Fill the local message pool cache. */ 290 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); 291 if (rc == 0) { 292 /* If we can't populate the cache it's ok. The cache will get filled 293 * up organically as messages are passed to the thread. */ 294 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { 295 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); 296 thread->msg_cache_count++; 297 } 298 } 299 300 if (name) { 301 snprintf(thread->name, sizeof(thread->name), "%s", name); 302 } else { 303 snprintf(thread->name, sizeof(thread->name), "%p", thread); 304 } 305 306 pthread_mutex_lock(&g_devlist_mutex); 307 if (g_thread_id == 0) { 308 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); 309 pthread_mutex_unlock(&g_devlist_mutex); 310 _free_thread(thread); 311 return NULL; 312 } 313 thread->id = g_thread_id++; 314 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 315 g_thread_count++; 316 pthread_mutex_unlock(&g_devlist_mutex); 317 318 SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", 319 thread->id, thread->name); 320 321 if (spdk_interrupt_mode_is_enabled()) { 322 thread->in_interrupt = true; 323 rc = thread_interrupt_create(thread); 324 if (rc != 0) { 325 _free_thread(thread); 326 return NULL; 327 } 328 } 329 330 if (g_new_thread_fn) { 331 rc = g_new_thread_fn(thread); 332 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { 333 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); 334 } 335 336 if (rc != 0) { 337 _free_thread(thread); 338 return NULL; 339 } 340 341 thread->state = SPDK_THREAD_STATE_RUNNING; 342 343 return thread; 344 } 345 346 void 347 spdk_set_thread(struct spdk_thread *thread) 348 { 349 tls_thread = thread; 350 } 351 352 static void 353 thread_exit(struct spdk_thread *thread, uint64_t now) 354 { 355 struct spdk_poller *poller; 356 struct spdk_io_channel *ch; 357 358 if (now >= thread->exit_timeout_tsc) { 359 SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", 360 thread->name); 361 goto exited; 362 } 363 364 TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { 365 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 366 SPDK_INFOLOG(thread, 367 "thread %s still has active poller %s\n", 368 thread->name, poller->name); 369 return; 370 } 371 } 372 373 TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) { 374 if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { 375 SPDK_INFOLOG(thread, 376 "thread %s still has active timed poller %s\n", 377 thread->name, poller->name); 378 return; 379 } 380 } 381 382 TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { 383 SPDK_INFOLOG(thread, 384 "thread %s still has paused poller %s\n", 385 thread->name, poller->name); 386 return; 387 } 388 389 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 390 SPDK_INFOLOG(thread, 391 "thread %s still has channel for io_device %s\n", 392 thread->name, ch->dev->name); 393 return; 394 } 395 396 if (thread->pending_unregister_count > 0) { 397 SPDK_INFOLOG(thread, 398 "thread %s is still unregistering io_devices\n", 399 thread->name); 400 return; 401 } 402 403 exited: 404 thread->state = SPDK_THREAD_STATE_EXITED; 405 } 406 407 int 408 spdk_thread_exit(struct spdk_thread *thread) 409 { 410 SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name); 411 412 assert(tls_thread == thread); 413 414 if (thread->state >= SPDK_THREAD_STATE_EXITING) { 415 SPDK_INFOLOG(thread, 416 "thread %s is already exiting\n", 417 thread->name); 418 return 0; 419 } 420 421 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * 422 SPDK_THREAD_EXIT_TIMEOUT_SEC); 423 thread->state = SPDK_THREAD_STATE_EXITING; 424 return 0; 425 } 426 427 bool 428 spdk_thread_is_exited(struct spdk_thread *thread) 429 { 430 return thread->state == SPDK_THREAD_STATE_EXITED; 431 } 432 433 void 434 spdk_thread_destroy(struct spdk_thread *thread) 435 { 436 SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name); 437 438 assert(thread->state == SPDK_THREAD_STATE_EXITED); 439 440 if (tls_thread == thread) { 441 tls_thread = NULL; 442 } 443 444 _free_thread(thread); 445 } 446 447 void * 448 spdk_thread_get_ctx(struct spdk_thread *thread) 449 { 450 if (g_ctx_sz > 0) { 451 return thread->ctx; 452 } 453 454 return NULL; 455 } 456 457 struct spdk_cpuset * 458 spdk_thread_get_cpumask(struct spdk_thread *thread) 459 { 460 return &thread->cpumask; 461 } 462 463 int 464 spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) 465 { 466 struct spdk_thread *thread; 467 468 if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { 469 SPDK_ERRLOG("Framework does not support reschedule operation.\n"); 470 assert(false); 471 return -ENOTSUP; 472 } 473 474 thread = spdk_get_thread(); 475 if (!thread) { 476 SPDK_ERRLOG("Called from non-SPDK thread\n"); 477 assert(false); 478 return -EINVAL; 479 } 480 481 spdk_cpuset_copy(&thread->cpumask, cpumask); 482 483 /* Invoke framework's reschedule operation. If this function is called multiple times 484 * in a single spdk_thread_poll() context, the last cpumask will be used in the 485 * reschedule operation. 486 */ 487 g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); 488 489 return 0; 490 } 491 492 struct spdk_thread * 493 spdk_thread_get_from_ctx(void *ctx) 494 { 495 if (ctx == NULL) { 496 assert(false); 497 return NULL; 498 } 499 500 assert(g_ctx_sz > 0); 501 502 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); 503 } 504 505 static inline uint32_t 506 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) 507 { 508 unsigned count, i; 509 void *messages[SPDK_MSG_BATCH_SIZE]; 510 uint64_t notify = 1; 511 int rc; 512 513 #ifdef DEBUG 514 /* 515 * spdk_ring_dequeue() fills messages and returns how many entries it wrote, 516 * so we will never actually read uninitialized data from events, but just to be sure 517 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 518 */ 519 memset(messages, 0, sizeof(messages)); 520 #endif 521 522 if (max_msgs > 0) { 523 max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); 524 } else { 525 max_msgs = SPDK_MSG_BATCH_SIZE; 526 } 527 528 count = spdk_ring_dequeue(thread->messages, messages, max_msgs); 529 if (spdk_unlikely(thread->in_interrupt) && 530 spdk_ring_count(thread->messages) != 0) { 531 rc = write(thread->msg_fd, ¬ify, sizeof(notify)); 532 if (rc < 0) { 533 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 534 } 535 } 536 if (count == 0) { 537 return 0; 538 } 539 540 for (i = 0; i < count; i++) { 541 struct spdk_msg *msg = messages[i]; 542 543 assert(msg != NULL); 544 msg->fn(msg->arg); 545 546 if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { 547 /* Insert the messages at the head. We want to re-use the hot 548 * ones. */ 549 SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); 550 thread->msg_cache_count++; 551 } else { 552 spdk_mempool_put(g_spdk_msg_mempool, msg); 553 } 554 } 555 556 return count; 557 } 558 559 static void 560 poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) 561 { 562 struct spdk_poller *iter; 563 564 poller->next_run_tick = now + poller->period_ticks; 565 566 /* 567 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled 568 * run time. 569 */ 570 TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) { 571 if (iter->next_run_tick <= poller->next_run_tick) { 572 TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq); 573 return; 574 } 575 } 576 577 /* No earlier pollers were found, so this poller must be the new head */ 578 TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq); 579 } 580 581 static void 582 thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) 583 { 584 if (poller->period_ticks) { 585 poller_insert_timer(thread, poller, spdk_get_ticks()); 586 } else { 587 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 588 } 589 } 590 591 static inline void 592 thread_update_stats(struct spdk_thread *thread, uint64_t end, 593 uint64_t start, int rc) 594 { 595 if (rc == 0) { 596 /* Poller status idle */ 597 thread->stats.idle_tsc += end - start; 598 } else if (rc > 0) { 599 /* Poller status busy */ 600 thread->stats.busy_tsc += end - start; 601 } 602 /* Store end time to use it as start time of the next spdk_thread_poll(). */ 603 thread->tsc_last = end; 604 } 605 606 static int 607 thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 608 { 609 uint32_t msg_count; 610 struct spdk_poller *poller, *tmp; 611 spdk_msg_fn critical_msg; 612 int rc = 0; 613 614 thread->tsc_last = now; 615 616 critical_msg = thread->critical_msg; 617 if (spdk_unlikely(critical_msg != NULL)) { 618 critical_msg(NULL); 619 thread->critical_msg = NULL; 620 rc = 1; 621 } 622 623 msg_count = msg_queue_run_batch(thread, max_msgs); 624 if (msg_count) { 625 rc = 1; 626 } 627 628 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 629 active_pollers_head, tailq, tmp) { 630 int poller_rc; 631 632 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 633 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 634 free(poller); 635 continue; 636 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 637 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 638 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 639 poller->state = SPDK_POLLER_STATE_PAUSED; 640 continue; 641 } 642 643 poller->state = SPDK_POLLER_STATE_RUNNING; 644 poller_rc = poller->fn(poller->arg); 645 646 poller->run_count++; 647 if (poller_rc > 0) { 648 poller->busy_count++; 649 } 650 651 #ifdef DEBUG 652 if (poller_rc == -1) { 653 SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name); 654 } 655 #endif 656 657 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 658 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 659 free(poller); 660 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 661 poller->state = SPDK_POLLER_STATE_WAITING; 662 } 663 664 if (poller_rc > rc) { 665 rc = poller_rc; 666 } 667 } 668 669 TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) { 670 int timer_rc = 0; 671 672 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 673 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 674 free(poller); 675 continue; 676 } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { 677 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 678 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 679 poller->state = SPDK_POLLER_STATE_PAUSED; 680 continue; 681 } 682 683 if (now < poller->next_run_tick) { 684 break; 685 } 686 687 poller->state = SPDK_POLLER_STATE_RUNNING; 688 timer_rc = poller->fn(poller->arg); 689 690 poller->run_count++; 691 if (timer_rc > 0) { 692 poller->busy_count++; 693 } 694 695 #ifdef DEBUG 696 if (timer_rc == -1) { 697 SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name); 698 } 699 #endif 700 701 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 702 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 703 free(poller); 704 } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { 705 poller->state = SPDK_POLLER_STATE_WAITING; 706 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 707 poller_insert_timer(thread, poller, now); 708 } 709 710 if (timer_rc > rc) { 711 rc = timer_rc; 712 } 713 } 714 715 return rc; 716 } 717 718 int 719 spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) 720 { 721 struct spdk_thread *orig_thread; 722 int rc; 723 uint64_t notify = 1; 724 725 orig_thread = _get_thread(); 726 tls_thread = thread; 727 728 if (now == 0) { 729 now = spdk_get_ticks(); 730 } 731 732 if (spdk_likely(!thread->in_interrupt)) { 733 rc = thread_poll(thread, max_msgs, now); 734 if (spdk_unlikely(thread->in_interrupt)) { 735 /* The thread transitioned to interrupt mode during the above poll. 736 * Poll it one more time in case that during the transition time 737 * there is msg received without notification. 738 */ 739 rc = thread_poll(thread, max_msgs, now); 740 } 741 } else { 742 /* Non-block wait on thread's fd_group */ 743 rc = spdk_fd_group_wait(thread->fgrp, 0); 744 if (spdk_unlikely(!thread->in_interrupt)) { 745 /* The thread transitioned to poll mode in a msg during the above processing. 746 * Clear msg_fd since thread messages will be polled directly in poll mode. 747 */ 748 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 749 if (rc < 0 && errno != EAGAIN) { 750 SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno)); 751 } 752 } 753 } 754 755 756 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { 757 thread_exit(thread, now); 758 } 759 760 thread_update_stats(thread, spdk_get_ticks(), now, rc); 761 762 tls_thread = orig_thread; 763 764 return rc; 765 } 766 767 uint64_t 768 spdk_thread_next_poller_expiration(struct spdk_thread *thread) 769 { 770 struct spdk_poller *poller; 771 772 poller = TAILQ_FIRST(&thread->timed_pollers); 773 if (poller) { 774 return poller->next_run_tick; 775 } 776 777 return 0; 778 } 779 780 int 781 spdk_thread_has_active_pollers(struct spdk_thread *thread) 782 { 783 return !TAILQ_EMPTY(&thread->active_pollers); 784 } 785 786 static bool 787 thread_has_unpaused_pollers(struct spdk_thread *thread) 788 { 789 if (TAILQ_EMPTY(&thread->active_pollers) && 790 TAILQ_EMPTY(&thread->timed_pollers)) { 791 return false; 792 } 793 794 return true; 795 } 796 797 bool 798 spdk_thread_has_pollers(struct spdk_thread *thread) 799 { 800 if (!thread_has_unpaused_pollers(thread) && 801 TAILQ_EMPTY(&thread->paused_pollers)) { 802 return false; 803 } 804 805 return true; 806 } 807 808 bool 809 spdk_thread_is_idle(struct spdk_thread *thread) 810 { 811 if (spdk_ring_count(thread->messages) || 812 thread_has_unpaused_pollers(thread) || 813 thread->critical_msg != NULL) { 814 return false; 815 } 816 817 return true; 818 } 819 820 uint32_t 821 spdk_thread_get_count(void) 822 { 823 /* 824 * Return cached value of the current thread count. We could acquire the 825 * lock and iterate through the TAILQ of threads to count them, but that 826 * count could still be invalidated after we release the lock. 827 */ 828 return g_thread_count; 829 } 830 831 struct spdk_thread * 832 spdk_get_thread(void) 833 { 834 return _get_thread(); 835 } 836 837 const char * 838 spdk_thread_get_name(const struct spdk_thread *thread) 839 { 840 return thread->name; 841 } 842 843 uint64_t 844 spdk_thread_get_id(const struct spdk_thread *thread) 845 { 846 return thread->id; 847 } 848 849 struct spdk_thread * 850 spdk_thread_get_by_id(uint64_t id) 851 { 852 struct spdk_thread *thread; 853 854 if (id == 0 || id >= g_thread_id) { 855 SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id); 856 return NULL; 857 } 858 pthread_mutex_lock(&g_devlist_mutex); 859 TAILQ_FOREACH(thread, &g_threads, tailq) { 860 if (thread->id == id) { 861 break; 862 } 863 } 864 pthread_mutex_unlock(&g_devlist_mutex); 865 return thread; 866 } 867 868 int 869 spdk_thread_get_stats(struct spdk_thread_stats *stats) 870 { 871 struct spdk_thread *thread; 872 873 thread = _get_thread(); 874 if (!thread) { 875 SPDK_ERRLOG("No thread allocated\n"); 876 return -EINVAL; 877 } 878 879 if (stats == NULL) { 880 return -EINVAL; 881 } 882 883 *stats = thread->stats; 884 885 return 0; 886 } 887 888 uint64_t 889 spdk_thread_get_last_tsc(struct spdk_thread *thread) 890 { 891 if (thread == NULL) { 892 thread = _get_thread(); 893 } 894 895 return thread->tsc_last; 896 } 897 898 static inline int 899 thread_send_msg_notification(const struct spdk_thread *target_thread) 900 { 901 uint64_t notify = 1; 902 int rc; 903 904 /* Not necessary to do notification if interrupt facility is not enabled */ 905 if (spdk_likely(!spdk_interrupt_mode_is_enabled())) { 906 return 0; 907 } 908 909 /* When each spdk_thread can switch between poll and interrupt mode dynamically, 910 * after sending thread msg, it is necessary to check whether target thread runs in 911 * interrupt mode and then decide whether do event notification. 912 */ 913 if (spdk_unlikely(target_thread->in_interrupt)) { 914 rc = write(target_thread->msg_fd, ¬ify, sizeof(notify)); 915 if (rc < 0) { 916 SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno)); 917 return -EIO; 918 } 919 } 920 921 return 0; 922 } 923 924 int 925 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) 926 { 927 struct spdk_thread *local_thread; 928 struct spdk_msg *msg; 929 int rc; 930 931 assert(thread != NULL); 932 933 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 934 SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); 935 return -EIO; 936 } 937 938 local_thread = _get_thread(); 939 940 msg = NULL; 941 if (local_thread != NULL) { 942 if (local_thread->msg_cache_count > 0) { 943 msg = SLIST_FIRST(&local_thread->msg_cache); 944 assert(msg != NULL); 945 SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); 946 local_thread->msg_cache_count--; 947 } 948 } 949 950 if (msg == NULL) { 951 msg = spdk_mempool_get(g_spdk_msg_mempool); 952 if (!msg) { 953 SPDK_ERRLOG("msg could not be allocated\n"); 954 return -ENOMEM; 955 } 956 } 957 958 msg->fn = fn; 959 msg->arg = ctx; 960 961 rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); 962 if (rc != 1) { 963 SPDK_ERRLOG("msg could not be enqueued\n"); 964 spdk_mempool_put(g_spdk_msg_mempool, msg); 965 return -EIO; 966 } 967 968 return thread_send_msg_notification(thread); 969 } 970 971 int 972 spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) 973 { 974 spdk_msg_fn expected = NULL; 975 976 if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, 977 __ATOMIC_SEQ_CST)) { 978 return -EIO; 979 } 980 981 return thread_send_msg_notification(thread); 982 } 983 984 #ifdef __linux__ 985 static int 986 interrupt_timerfd_process(void *arg) 987 { 988 struct spdk_poller *poller = arg; 989 uint64_t exp; 990 int rc; 991 992 /* clear the level of interval timer */ 993 rc = read(poller->interruptfd, &exp, sizeof(exp)); 994 if (rc < 0) { 995 if (rc == -EAGAIN) { 996 return 0; 997 } 998 999 return rc; 1000 } 1001 1002 return poller->fn(poller->arg); 1003 } 1004 1005 static int 1006 period_poller_interrupt_init(struct spdk_poller *poller) 1007 { 1008 struct spdk_fd_group *fgrp = poller->thread->fgrp; 1009 int timerfd; 1010 int rc; 1011 1012 SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name); 1013 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 1014 if (timerfd < 0) { 1015 return -errno; 1016 } 1017 1018 rc = spdk_fd_group_add(fgrp, timerfd, 1019 interrupt_timerfd_process, poller); 1020 if (rc < 0) { 1021 close(timerfd); 1022 return rc; 1023 } 1024 1025 poller->interruptfd = timerfd; 1026 return 0; 1027 } 1028 1029 static void 1030 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1031 { 1032 int timerfd = poller->interruptfd; 1033 uint64_t now_tick = spdk_get_ticks(); 1034 uint64_t ticks = spdk_get_ticks_hz(); 1035 int ret; 1036 struct itimerspec new_tv = {}; 1037 struct itimerspec old_tv = {}; 1038 1039 assert(poller->period_ticks != 0); 1040 assert(timerfd >= 0); 1041 1042 SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name, 1043 interrupt_mode ? "interrupt" : "poll"); 1044 1045 if (interrupt_mode) { 1046 /* Set repeated timer expiration */ 1047 new_tv.it_interval.tv_sec = poller->period_ticks / ticks; 1048 new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks; 1049 1050 /* Update next timer expiration */ 1051 if (poller->next_run_tick == 0) { 1052 poller->next_run_tick = now_tick + poller->period_ticks; 1053 } else if (poller->next_run_tick < now_tick) { 1054 poller->next_run_tick = now_tick; 1055 } 1056 1057 new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks; 1058 new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks; 1059 1060 ret = timerfd_settime(timerfd, 0, &new_tv, NULL); 1061 if (ret < 0) { 1062 SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno); 1063 assert(false); 1064 } 1065 } else { 1066 /* Disarm the timer */ 1067 ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv); 1068 if (ret < 0) { 1069 /* timerfd_settime's failure indicates that the timerfd is in error */ 1070 SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno); 1071 assert(false); 1072 } 1073 1074 /* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be 1075 * now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC 1076 */ 1077 now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \ 1078 (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC; 1079 TAILQ_REMOVE(&poller->thread->timed_pollers, poller, tailq); 1080 poller_insert_timer(poller->thread, poller, now_tick); 1081 } 1082 } 1083 1084 static void 1085 poller_interrupt_fini(struct spdk_poller *poller) 1086 { 1087 SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name); 1088 assert(poller->interruptfd >= 0); 1089 spdk_fd_group_remove(poller->thread->fgrp, poller->interruptfd); 1090 close(poller->interruptfd); 1091 poller->interruptfd = -1; 1092 } 1093 1094 static int 1095 busy_poller_interrupt_init(struct spdk_poller *poller) 1096 { 1097 int busy_efd; 1098 int rc; 1099 1100 SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name); 1101 busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1102 if (busy_efd < 0) { 1103 SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name); 1104 return -errno; 1105 } 1106 1107 rc = spdk_fd_group_add(poller->thread->fgrp, busy_efd, poller->fn, poller->arg); 1108 if (rc < 0) { 1109 close(busy_efd); 1110 return rc; 1111 } 1112 1113 poller->interruptfd = busy_efd; 1114 return 0; 1115 } 1116 1117 static void 1118 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1119 { 1120 int busy_efd = poller->interruptfd; 1121 uint64_t notify = 1; 1122 int rc __attribute__((unused)); 1123 1124 assert(busy_efd >= 0); 1125 1126 if (interrupt_mode) { 1127 /* Write without read on eventfd will get it repeatedly triggered. */ 1128 if (write(busy_efd, ¬ify, sizeof(notify)) < 0) { 1129 SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name); 1130 } 1131 } else { 1132 /* Read on eventfd will clear its level triggering. */ 1133 rc = read(busy_efd, ¬ify, sizeof(notify)); 1134 } 1135 } 1136 1137 #else 1138 1139 static int 1140 period_poller_interrupt_init(struct spdk_poller *poller) 1141 { 1142 return -ENOTSUP; 1143 } 1144 1145 static void 1146 period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1147 { 1148 } 1149 1150 static void 1151 poller_interrupt_fini(struct spdk_poller *poller) 1152 { 1153 } 1154 1155 static int 1156 busy_poller_interrupt_init(struct spdk_poller *poller) 1157 { 1158 return -ENOTSUP; 1159 } 1160 1161 static void 1162 busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode) 1163 { 1164 } 1165 1166 #endif 1167 1168 void 1169 spdk_poller_register_interrupt(struct spdk_poller *poller, 1170 spdk_poller_set_interrupt_mode_cb cb_fn, 1171 void *cb_arg) 1172 { 1173 assert(poller != NULL); 1174 assert(cb_fn != NULL); 1175 assert(spdk_get_thread() == poller->thread); 1176 1177 if (!spdk_interrupt_mode_is_enabled()) { 1178 return; 1179 } 1180 1181 /* when a poller is created we don't know if the user is ever going to 1182 * enable interrupts on it by calling this function, so the poller 1183 * registration function has to immediately create a interruptfd. 1184 * When this function does get called by user, we have to then destroy 1185 * that interruptfd. 1186 */ 1187 if (poller->set_intr_cb_fn && poller->interruptfd >= 0) { 1188 poller_interrupt_fini(poller); 1189 } 1190 1191 poller->set_intr_cb_fn = cb_fn; 1192 poller->set_intr_cb_arg = cb_arg; 1193 1194 /* Set poller into interrupt mode if thread is in interrupt. */ 1195 if (poller->thread->in_interrupt) { 1196 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true); 1197 } 1198 } 1199 1200 static struct spdk_poller * 1201 poller_register(spdk_poller_fn fn, 1202 void *arg, 1203 uint64_t period_microseconds, 1204 const char *name) 1205 { 1206 struct spdk_thread *thread; 1207 struct spdk_poller *poller; 1208 uint64_t quotient, remainder, ticks; 1209 1210 thread = spdk_get_thread(); 1211 if (!thread) { 1212 assert(false); 1213 return NULL; 1214 } 1215 1216 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1217 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 1218 return NULL; 1219 } 1220 1221 poller = calloc(1, sizeof(*poller)); 1222 if (poller == NULL) { 1223 SPDK_ERRLOG("Poller memory allocation failed\n"); 1224 return NULL; 1225 } 1226 1227 if (name) { 1228 snprintf(poller->name, sizeof(poller->name), "%s", name); 1229 } else { 1230 snprintf(poller->name, sizeof(poller->name), "%p", fn); 1231 } 1232 1233 poller->state = SPDK_POLLER_STATE_WAITING; 1234 poller->fn = fn; 1235 poller->arg = arg; 1236 poller->thread = thread; 1237 poller->interruptfd = -1; 1238 1239 if (period_microseconds) { 1240 quotient = period_microseconds / SPDK_SEC_TO_USEC; 1241 remainder = period_microseconds % SPDK_SEC_TO_USEC; 1242 ticks = spdk_get_ticks_hz(); 1243 1244 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; 1245 } else { 1246 poller->period_ticks = 0; 1247 } 1248 1249 if (spdk_interrupt_mode_is_enabled()) { 1250 int rc; 1251 1252 if (period_microseconds) { 1253 rc = period_poller_interrupt_init(poller); 1254 if (rc < 0) { 1255 SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc)); 1256 free(poller); 1257 return NULL; 1258 } 1259 1260 spdk_poller_register_interrupt(poller, period_poller_set_interrupt_mode, NULL); 1261 } else { 1262 /* If the poller doesn't have a period, create interruptfd that's always 1263 * busy automatically when runnning in interrupt mode. 1264 */ 1265 rc = busy_poller_interrupt_init(poller); 1266 if (rc > 0) { 1267 SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc)); 1268 free(poller); 1269 return NULL; 1270 } 1271 1272 spdk_poller_register_interrupt(poller, busy_poller_set_interrupt_mode, NULL); 1273 } 1274 } 1275 1276 thread_insert_poller(thread, poller); 1277 1278 return poller; 1279 } 1280 1281 struct spdk_poller * 1282 spdk_poller_register(spdk_poller_fn fn, 1283 void *arg, 1284 uint64_t period_microseconds) 1285 { 1286 return poller_register(fn, arg, period_microseconds, NULL); 1287 } 1288 1289 struct spdk_poller * 1290 spdk_poller_register_named(spdk_poller_fn fn, 1291 void *arg, 1292 uint64_t period_microseconds, 1293 const char *name) 1294 { 1295 return poller_register(fn, arg, period_microseconds, name); 1296 } 1297 1298 void 1299 spdk_poller_unregister(struct spdk_poller **ppoller) 1300 { 1301 struct spdk_thread *thread; 1302 struct spdk_poller *poller; 1303 1304 poller = *ppoller; 1305 if (poller == NULL) { 1306 return; 1307 } 1308 1309 *ppoller = NULL; 1310 1311 thread = spdk_get_thread(); 1312 if (!thread) { 1313 assert(false); 1314 return; 1315 } 1316 1317 if (poller->thread != thread) { 1318 SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n"); 1319 assert(false); 1320 return; 1321 } 1322 1323 if (spdk_interrupt_mode_is_enabled() && poller->interruptfd >= 0) { 1324 poller_interrupt_fini(poller); 1325 } 1326 1327 /* If the poller was paused, put it on the active_pollers list so that 1328 * its unregistration can be processed by spdk_thread_poll(). 1329 */ 1330 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1331 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1332 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); 1333 poller->period_ticks = 0; 1334 } 1335 1336 /* Simply set the state to unregistered. The poller will get cleaned up 1337 * in a subsequent call to spdk_thread_poll(). 1338 */ 1339 poller->state = SPDK_POLLER_STATE_UNREGISTERED; 1340 } 1341 1342 void 1343 spdk_poller_pause(struct spdk_poller *poller) 1344 { 1345 struct spdk_thread *thread; 1346 1347 if (poller->state == SPDK_POLLER_STATE_PAUSED || 1348 poller->state == SPDK_POLLER_STATE_PAUSING) { 1349 return; 1350 } 1351 1352 thread = spdk_get_thread(); 1353 if (!thread) { 1354 assert(false); 1355 return; 1356 } 1357 1358 /* If a poller is paused from within itself, we can immediately move it 1359 * on the paused_pollers list. Otherwise we just set its state to 1360 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it. It 1361 * allows a poller to be paused from another one's context without 1362 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration. 1363 */ 1364 if (poller->state != SPDK_POLLER_STATE_RUNNING) { 1365 poller->state = SPDK_POLLER_STATE_PAUSING; 1366 } else { 1367 if (poller->period_ticks > 0) { 1368 TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); 1369 } else { 1370 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 1371 } 1372 1373 TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); 1374 poller->state = SPDK_POLLER_STATE_PAUSED; 1375 } 1376 } 1377 1378 void 1379 spdk_poller_resume(struct spdk_poller *poller) 1380 { 1381 struct spdk_thread *thread; 1382 1383 if (poller->state != SPDK_POLLER_STATE_PAUSED && 1384 poller->state != SPDK_POLLER_STATE_PAUSING) { 1385 return; 1386 } 1387 1388 thread = spdk_get_thread(); 1389 if (!thread) { 1390 assert(false); 1391 return; 1392 } 1393 1394 /* If a poller is paused it has to be removed from the paused pollers 1395 * list and put on the active / timer list depending on its 1396 * period_ticks. If a poller is still in the process of being paused, 1397 * we just need to flip its state back to waiting, as it's already on 1398 * the appropriate list. 1399 */ 1400 if (poller->state == SPDK_POLLER_STATE_PAUSED) { 1401 TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); 1402 thread_insert_poller(thread, poller); 1403 } 1404 1405 poller->state = SPDK_POLLER_STATE_WAITING; 1406 } 1407 1408 const char * 1409 spdk_poller_state_str(enum spdk_poller_state state) 1410 { 1411 switch (state) { 1412 case SPDK_POLLER_STATE_WAITING: 1413 return "waiting"; 1414 case SPDK_POLLER_STATE_RUNNING: 1415 return "running"; 1416 case SPDK_POLLER_STATE_UNREGISTERED: 1417 return "unregistered"; 1418 case SPDK_POLLER_STATE_PAUSING: 1419 return "pausing"; 1420 case SPDK_POLLER_STATE_PAUSED: 1421 return "paused"; 1422 default: 1423 return NULL; 1424 } 1425 } 1426 1427 struct call_thread { 1428 struct spdk_thread *cur_thread; 1429 spdk_msg_fn fn; 1430 void *ctx; 1431 1432 struct spdk_thread *orig_thread; 1433 spdk_msg_fn cpl; 1434 }; 1435 1436 static void 1437 _on_thread(void *ctx) 1438 { 1439 struct call_thread *ct = ctx; 1440 int rc __attribute__((unused)); 1441 1442 ct->fn(ct->ctx); 1443 1444 pthread_mutex_lock(&g_devlist_mutex); 1445 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 1446 pthread_mutex_unlock(&g_devlist_mutex); 1447 1448 if (!ct->cur_thread) { 1449 SPDK_DEBUGLOG(thread, "Completed thread iteration\n"); 1450 1451 rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 1452 free(ctx); 1453 } else { 1454 SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n", 1455 ct->cur_thread->name); 1456 1457 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); 1458 } 1459 assert(rc == 0); 1460 } 1461 1462 void 1463 spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) 1464 { 1465 struct call_thread *ct; 1466 struct spdk_thread *thread; 1467 int rc __attribute__((unused)); 1468 1469 ct = calloc(1, sizeof(*ct)); 1470 if (!ct) { 1471 SPDK_ERRLOG("Unable to perform thread iteration\n"); 1472 cpl(ctx); 1473 return; 1474 } 1475 1476 ct->fn = fn; 1477 ct->ctx = ctx; 1478 ct->cpl = cpl; 1479 1480 thread = _get_thread(); 1481 if (!thread) { 1482 SPDK_ERRLOG("No thread allocated\n"); 1483 free(ct); 1484 cpl(ctx); 1485 return; 1486 } 1487 ct->orig_thread = thread; 1488 1489 pthread_mutex_lock(&g_devlist_mutex); 1490 ct->cur_thread = TAILQ_FIRST(&g_threads); 1491 pthread_mutex_unlock(&g_devlist_mutex); 1492 1493 SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n", 1494 ct->orig_thread->name); 1495 1496 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); 1497 assert(rc == 0); 1498 } 1499 1500 static inline void 1501 poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode) 1502 { 1503 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 1504 return; 1505 } 1506 1507 if (!poller->set_intr_cb_fn) { 1508 SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name); 1509 assert(false); 1510 return; 1511 } 1512 1513 poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode); 1514 } 1515 1516 void 1517 spdk_thread_set_interrupt_mode(bool enable_interrupt) 1518 { 1519 struct spdk_thread *thread = _get_thread(); 1520 struct spdk_poller *poller, *tmp; 1521 1522 assert(thread); 1523 assert(spdk_interrupt_mode_is_enabled()); 1524 1525 if (thread->in_interrupt == enable_interrupt) { 1526 return; 1527 } 1528 1529 /* Set pollers to expected mode */ 1530 TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) { 1531 poller_set_interrupt_mode(poller, enable_interrupt); 1532 } 1533 TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { 1534 poller_set_interrupt_mode(poller, enable_interrupt); 1535 } 1536 /* All paused pollers will go to work in interrupt mode */ 1537 TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) { 1538 poller_set_interrupt_mode(poller, enable_interrupt); 1539 } 1540 1541 thread->in_interrupt = enable_interrupt; 1542 return; 1543 } 1544 1545 void 1546 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 1547 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, 1548 const char *name) 1549 { 1550 struct io_device *dev, *tmp; 1551 struct spdk_thread *thread; 1552 1553 assert(io_device != NULL); 1554 assert(create_cb != NULL); 1555 assert(destroy_cb != NULL); 1556 1557 thread = spdk_get_thread(); 1558 if (!thread) { 1559 SPDK_ERRLOG("called from non-SPDK thread\n"); 1560 assert(false); 1561 return; 1562 } 1563 1564 dev = calloc(1, sizeof(struct io_device)); 1565 if (dev == NULL) { 1566 SPDK_ERRLOG("could not allocate io_device\n"); 1567 return; 1568 } 1569 1570 dev->io_device = io_device; 1571 if (name) { 1572 snprintf(dev->name, sizeof(dev->name), "%s", name); 1573 } else { 1574 snprintf(dev->name, sizeof(dev->name), "%p", dev); 1575 } 1576 dev->create_cb = create_cb; 1577 dev->destroy_cb = destroy_cb; 1578 dev->unregister_cb = NULL; 1579 dev->ctx_size = ctx_size; 1580 dev->for_each_count = 0; 1581 dev->unregistered = false; 1582 dev->refcnt = 0; 1583 1584 SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n", 1585 dev->name, dev->io_device, thread->name); 1586 1587 pthread_mutex_lock(&g_devlist_mutex); 1588 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 1589 if (tmp->io_device == io_device) { 1590 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", 1591 io_device, tmp->name, dev->name); 1592 free(dev); 1593 pthread_mutex_unlock(&g_devlist_mutex); 1594 return; 1595 } 1596 } 1597 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 1598 pthread_mutex_unlock(&g_devlist_mutex); 1599 } 1600 1601 static void 1602 _finish_unregister(void *arg) 1603 { 1604 struct io_device *dev = arg; 1605 struct spdk_thread *thread; 1606 1607 thread = spdk_get_thread(); 1608 assert(thread == dev->unregister_thread); 1609 1610 SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n", 1611 dev->name, dev->io_device, thread->name); 1612 1613 assert(thread->pending_unregister_count > 0); 1614 thread->pending_unregister_count--; 1615 1616 dev->unregister_cb(dev->io_device); 1617 free(dev); 1618 } 1619 1620 static void 1621 io_device_free(struct io_device *dev) 1622 { 1623 int rc __attribute__((unused)); 1624 1625 if (dev->unregister_cb == NULL) { 1626 free(dev); 1627 } else { 1628 assert(dev->unregister_thread != NULL); 1629 SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n", 1630 dev->name, dev->io_device, dev->unregister_thread->name); 1631 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 1632 assert(rc == 0); 1633 } 1634 } 1635 1636 void 1637 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 1638 { 1639 struct io_device *dev; 1640 uint32_t refcnt; 1641 struct spdk_thread *thread; 1642 1643 thread = spdk_get_thread(); 1644 if (!thread) { 1645 SPDK_ERRLOG("called from non-SPDK thread\n"); 1646 assert(false); 1647 return; 1648 } 1649 1650 pthread_mutex_lock(&g_devlist_mutex); 1651 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 1652 if (dev->io_device == io_device) { 1653 break; 1654 } 1655 } 1656 1657 if (!dev) { 1658 SPDK_ERRLOG("io_device %p not found\n", io_device); 1659 assert(false); 1660 pthread_mutex_unlock(&g_devlist_mutex); 1661 return; 1662 } 1663 1664 if (dev->for_each_count > 0) { 1665 SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", 1666 dev->name, io_device, dev->for_each_count); 1667 pthread_mutex_unlock(&g_devlist_mutex); 1668 return; 1669 } 1670 1671 dev->unregister_cb = unregister_cb; 1672 dev->unregistered = true; 1673 TAILQ_REMOVE(&g_io_devices, dev, tailq); 1674 refcnt = dev->refcnt; 1675 dev->unregister_thread = thread; 1676 pthread_mutex_unlock(&g_devlist_mutex); 1677 1678 SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n", 1679 dev->name, dev->io_device, thread->name); 1680 1681 if (unregister_cb) { 1682 thread->pending_unregister_count++; 1683 } 1684 1685 if (refcnt > 0) { 1686 /* defer deletion */ 1687 return; 1688 } 1689 1690 io_device_free(dev); 1691 } 1692 1693 const char * 1694 spdk_io_device_get_name(struct io_device *dev) 1695 { 1696 return dev->name; 1697 } 1698 1699 struct spdk_io_channel * 1700 spdk_get_io_channel(void *io_device) 1701 { 1702 struct spdk_io_channel *ch; 1703 struct spdk_thread *thread; 1704 struct io_device *dev; 1705 int rc; 1706 1707 pthread_mutex_lock(&g_devlist_mutex); 1708 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 1709 if (dev->io_device == io_device) { 1710 break; 1711 } 1712 } 1713 if (dev == NULL) { 1714 SPDK_ERRLOG("could not find io_device %p\n", io_device); 1715 pthread_mutex_unlock(&g_devlist_mutex); 1716 return NULL; 1717 } 1718 1719 thread = _get_thread(); 1720 if (!thread) { 1721 SPDK_ERRLOG("No thread allocated\n"); 1722 pthread_mutex_unlock(&g_devlist_mutex); 1723 return NULL; 1724 } 1725 1726 if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { 1727 SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); 1728 pthread_mutex_unlock(&g_devlist_mutex); 1729 return NULL; 1730 } 1731 1732 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1733 if (ch->dev == dev) { 1734 ch->ref++; 1735 1736 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1737 ch, dev->name, dev->io_device, thread->name, ch->ref); 1738 1739 /* 1740 * An I/O channel already exists for this device on this 1741 * thread, so return it. 1742 */ 1743 pthread_mutex_unlock(&g_devlist_mutex); 1744 return ch; 1745 } 1746 } 1747 1748 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 1749 if (ch == NULL) { 1750 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 1751 pthread_mutex_unlock(&g_devlist_mutex); 1752 return NULL; 1753 } 1754 1755 ch->dev = dev; 1756 ch->destroy_cb = dev->destroy_cb; 1757 ch->thread = thread; 1758 ch->ref = 1; 1759 ch->destroy_ref = 0; 1760 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 1761 1762 SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1763 ch, dev->name, dev->io_device, thread->name, ch->ref); 1764 1765 dev->refcnt++; 1766 1767 pthread_mutex_unlock(&g_devlist_mutex); 1768 1769 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 1770 if (rc != 0) { 1771 pthread_mutex_lock(&g_devlist_mutex); 1772 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1773 dev->refcnt--; 1774 free(ch); 1775 pthread_mutex_unlock(&g_devlist_mutex); 1776 return NULL; 1777 } 1778 1779 return ch; 1780 } 1781 1782 static void 1783 put_io_channel(void *arg) 1784 { 1785 struct spdk_io_channel *ch = arg; 1786 bool do_remove_dev = true; 1787 struct spdk_thread *thread; 1788 1789 thread = spdk_get_thread(); 1790 if (!thread) { 1791 SPDK_ERRLOG("called from non-SPDK thread\n"); 1792 assert(false); 1793 return; 1794 } 1795 1796 SPDK_DEBUGLOG(thread, 1797 "Releasing io_channel %p for io_device %s (%p) on thread %s\n", 1798 ch, ch->dev->name, ch->dev->io_device, thread->name); 1799 1800 assert(ch->thread == thread); 1801 1802 ch->destroy_ref--; 1803 1804 if (ch->ref > 0 || ch->destroy_ref > 0) { 1805 /* 1806 * Another reference to the associated io_device was requested 1807 * after this message was sent but before it had a chance to 1808 * execute. 1809 */ 1810 return; 1811 } 1812 1813 pthread_mutex_lock(&g_devlist_mutex); 1814 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 1815 pthread_mutex_unlock(&g_devlist_mutex); 1816 1817 /* Don't hold the devlist mutex while the destroy_cb is called. */ 1818 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 1819 1820 pthread_mutex_lock(&g_devlist_mutex); 1821 ch->dev->refcnt--; 1822 1823 if (!ch->dev->unregistered) { 1824 do_remove_dev = false; 1825 } 1826 1827 if (ch->dev->refcnt > 0) { 1828 do_remove_dev = false; 1829 } 1830 1831 pthread_mutex_unlock(&g_devlist_mutex); 1832 1833 if (do_remove_dev) { 1834 io_device_free(ch->dev); 1835 } 1836 free(ch); 1837 } 1838 1839 void 1840 spdk_put_io_channel(struct spdk_io_channel *ch) 1841 { 1842 struct spdk_thread *thread; 1843 int rc __attribute__((unused)); 1844 1845 thread = spdk_get_thread(); 1846 if (!thread) { 1847 SPDK_ERRLOG("called from non-SPDK thread\n"); 1848 assert(false); 1849 return; 1850 } 1851 1852 if (ch->thread != thread) { 1853 SPDK_ERRLOG("different from the thread that called get_io_channel()\n"); 1854 assert(false); 1855 return; 1856 } 1857 1858 SPDK_DEBUGLOG(thread, 1859 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", 1860 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); 1861 1862 ch->ref--; 1863 1864 if (ch->ref == 0) { 1865 ch->destroy_ref++; 1866 rc = spdk_thread_send_msg(thread, put_io_channel, ch); 1867 assert(rc == 0); 1868 } 1869 } 1870 1871 struct spdk_io_channel * 1872 spdk_io_channel_from_ctx(void *ctx) 1873 { 1874 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 1875 } 1876 1877 struct spdk_thread * 1878 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 1879 { 1880 return ch->thread; 1881 } 1882 1883 void * 1884 spdk_io_channel_get_io_device(struct spdk_io_channel *ch) 1885 { 1886 return ch->dev->io_device; 1887 } 1888 1889 struct spdk_io_channel_iter { 1890 void *io_device; 1891 struct io_device *dev; 1892 spdk_channel_msg fn; 1893 int status; 1894 void *ctx; 1895 struct spdk_io_channel *ch; 1896 1897 struct spdk_thread *cur_thread; 1898 1899 struct spdk_thread *orig_thread; 1900 spdk_channel_for_each_cpl cpl; 1901 }; 1902 1903 void * 1904 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 1905 { 1906 return i->io_device; 1907 } 1908 1909 struct spdk_io_channel * 1910 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 1911 { 1912 return i->ch; 1913 } 1914 1915 void * 1916 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 1917 { 1918 return i->ctx; 1919 } 1920 1921 static void 1922 _call_completion(void *ctx) 1923 { 1924 struct spdk_io_channel_iter *i = ctx; 1925 1926 if (i->cpl != NULL) { 1927 i->cpl(i, i->status); 1928 } 1929 free(i); 1930 } 1931 1932 static void 1933 _call_channel(void *ctx) 1934 { 1935 struct spdk_io_channel_iter *i = ctx; 1936 struct spdk_io_channel *ch; 1937 1938 /* 1939 * It is possible that the channel was deleted before this 1940 * message had a chance to execute. If so, skip calling 1941 * the fn() on this thread. 1942 */ 1943 pthread_mutex_lock(&g_devlist_mutex); 1944 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 1945 if (ch->dev->io_device == i->io_device) { 1946 break; 1947 } 1948 } 1949 pthread_mutex_unlock(&g_devlist_mutex); 1950 1951 if (ch) { 1952 i->fn(i); 1953 } else { 1954 spdk_for_each_channel_continue(i, 0); 1955 } 1956 } 1957 1958 void 1959 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 1960 spdk_channel_for_each_cpl cpl) 1961 { 1962 struct spdk_thread *thread; 1963 struct spdk_io_channel *ch; 1964 struct spdk_io_channel_iter *i; 1965 int rc __attribute__((unused)); 1966 1967 i = calloc(1, sizeof(*i)); 1968 if (!i) { 1969 SPDK_ERRLOG("Unable to allocate iterator\n"); 1970 return; 1971 } 1972 1973 i->io_device = io_device; 1974 i->fn = fn; 1975 i->ctx = ctx; 1976 i->cpl = cpl; 1977 1978 pthread_mutex_lock(&g_devlist_mutex); 1979 i->orig_thread = _get_thread(); 1980 1981 TAILQ_FOREACH(thread, &g_threads, tailq) { 1982 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 1983 if (ch->dev->io_device == io_device) { 1984 ch->dev->for_each_count++; 1985 i->dev = ch->dev; 1986 i->cur_thread = thread; 1987 i->ch = ch; 1988 pthread_mutex_unlock(&g_devlist_mutex); 1989 rc = spdk_thread_send_msg(thread, _call_channel, i); 1990 assert(rc == 0); 1991 return; 1992 } 1993 } 1994 } 1995 1996 pthread_mutex_unlock(&g_devlist_mutex); 1997 1998 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 1999 assert(rc == 0); 2000 } 2001 2002 void 2003 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 2004 { 2005 struct spdk_thread *thread; 2006 struct spdk_io_channel *ch; 2007 int rc __attribute__((unused)); 2008 2009 assert(i->cur_thread == spdk_get_thread()); 2010 2011 i->status = status; 2012 2013 pthread_mutex_lock(&g_devlist_mutex); 2014 if (status) { 2015 goto end; 2016 } 2017 thread = TAILQ_NEXT(i->cur_thread, tailq); 2018 while (thread) { 2019 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 2020 if (ch->dev->io_device == i->io_device) { 2021 i->cur_thread = thread; 2022 i->ch = ch; 2023 pthread_mutex_unlock(&g_devlist_mutex); 2024 rc = spdk_thread_send_msg(thread, _call_channel, i); 2025 assert(rc == 0); 2026 return; 2027 } 2028 } 2029 thread = TAILQ_NEXT(thread, tailq); 2030 } 2031 2032 end: 2033 i->dev->for_each_count--; 2034 i->ch = NULL; 2035 pthread_mutex_unlock(&g_devlist_mutex); 2036 2037 rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); 2038 assert(rc == 0); 2039 } 2040 2041 struct spdk_interrupt { 2042 int efd; 2043 struct spdk_thread *thread; 2044 char name[SPDK_MAX_POLLER_NAME_LEN + 1]; 2045 }; 2046 2047 static void 2048 thread_interrupt_destroy(struct spdk_thread *thread) 2049 { 2050 struct spdk_fd_group *fgrp = thread->fgrp; 2051 2052 SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); 2053 2054 if (thread->msg_fd < 0) { 2055 return; 2056 } 2057 2058 spdk_fd_group_remove(fgrp, thread->msg_fd); 2059 close(thread->msg_fd); 2060 thread->msg_fd = -1; 2061 2062 spdk_fd_group_destroy(fgrp); 2063 thread->fgrp = NULL; 2064 } 2065 2066 #ifdef __linux__ 2067 static int 2068 thread_interrupt_msg_process(void *arg) 2069 { 2070 struct spdk_thread *thread = arg; 2071 uint32_t msg_count; 2072 spdk_msg_fn critical_msg; 2073 int rc = 0; 2074 uint64_t notify = 1; 2075 2076 assert(spdk_interrupt_mode_is_enabled()); 2077 2078 /* There may be race between msg_acknowledge and another producer's msg_notify, 2079 * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. 2080 * This can avoid msg notification missing. 2081 */ 2082 rc = read(thread->msg_fd, ¬ify, sizeof(notify)); 2083 if (rc < 0 && errno != EAGAIN) { 2084 SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno)); 2085 } 2086 2087 critical_msg = thread->critical_msg; 2088 if (spdk_unlikely(critical_msg != NULL)) { 2089 critical_msg(NULL); 2090 thread->critical_msg = NULL; 2091 rc = 1; 2092 } 2093 2094 msg_count = msg_queue_run_batch(thread, 0); 2095 if (msg_count) { 2096 rc = 1; 2097 } 2098 2099 return rc; 2100 } 2101 2102 static int 2103 thread_interrupt_create(struct spdk_thread *thread) 2104 { 2105 int rc; 2106 2107 SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); 2108 2109 rc = spdk_fd_group_create(&thread->fgrp); 2110 if (rc) { 2111 return rc; 2112 } 2113 2114 thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2115 if (thread->msg_fd < 0) { 2116 rc = -errno; 2117 spdk_fd_group_destroy(thread->fgrp); 2118 thread->fgrp = NULL; 2119 2120 return rc; 2121 } 2122 2123 return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread); 2124 } 2125 #else 2126 static int 2127 thread_interrupt_create(struct spdk_thread *thread) 2128 { 2129 return -ENOTSUP; 2130 } 2131 #endif 2132 2133 struct spdk_interrupt * 2134 spdk_interrupt_register(int efd, spdk_interrupt_fn fn, 2135 void *arg, const char *name) 2136 { 2137 struct spdk_thread *thread; 2138 struct spdk_interrupt *intr; 2139 2140 thread = spdk_get_thread(); 2141 if (!thread) { 2142 assert(false); 2143 return NULL; 2144 } 2145 2146 if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { 2147 SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); 2148 return NULL; 2149 } 2150 2151 if (spdk_fd_group_add(thread->fgrp, efd, fn, arg)) { 2152 return NULL; 2153 } 2154 2155 intr = calloc(1, sizeof(*intr)); 2156 if (intr == NULL) { 2157 SPDK_ERRLOG("Interrupt handler allocation failed\n"); 2158 return NULL; 2159 } 2160 2161 if (name) { 2162 snprintf(intr->name, sizeof(intr->name), "%s", name); 2163 } else { 2164 snprintf(intr->name, sizeof(intr->name), "%p", fn); 2165 } 2166 2167 intr->efd = efd; 2168 intr->thread = thread; 2169 2170 return intr; 2171 } 2172 2173 void 2174 spdk_interrupt_unregister(struct spdk_interrupt **pintr) 2175 { 2176 struct spdk_thread *thread; 2177 struct spdk_interrupt *intr; 2178 2179 intr = *pintr; 2180 if (intr == NULL) { 2181 return; 2182 } 2183 2184 *pintr = NULL; 2185 2186 thread = spdk_get_thread(); 2187 if (!thread) { 2188 assert(false); 2189 return; 2190 } 2191 2192 if (intr->thread != thread) { 2193 SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n"); 2194 assert(false); 2195 return; 2196 } 2197 2198 spdk_fd_group_remove(thread->fgrp, intr->efd); 2199 free(intr); 2200 } 2201 2202 int 2203 spdk_interrupt_set_event_types(struct spdk_interrupt *intr, 2204 enum spdk_interrupt_event_types event_types) 2205 { 2206 struct spdk_thread *thread; 2207 2208 thread = spdk_get_thread(); 2209 if (!thread) { 2210 assert(false); 2211 return -EINVAL; 2212 } 2213 2214 if (intr->thread != thread) { 2215 SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n"); 2216 assert(false); 2217 return -EINVAL; 2218 } 2219 2220 return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); 2221 } 2222 2223 int 2224 spdk_thread_get_interrupt_fd(struct spdk_thread *thread) 2225 { 2226 return spdk_fd_group_get_fd(thread->fgrp); 2227 } 2228 2229 static bool g_interrupt_mode = false; 2230 2231 int 2232 spdk_interrupt_mode_enable(void) 2233 { 2234 /* It must be called once prior to initializing the threading library. 2235 * g_spdk_msg_mempool will be valid if thread library is initialized. 2236 */ 2237 if (g_spdk_msg_mempool) { 2238 SPDK_ERRLOG("Failed due to threading library is already initailzied.\n"); 2239 return -1; 2240 } 2241 2242 #ifdef __linux__ 2243 SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); 2244 g_interrupt_mode = true; 2245 return 0; 2246 #else 2247 SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); 2248 g_interrupt_mode = false; 2249 return -ENOTSUP; 2250 #endif 2251 } 2252 2253 bool 2254 spdk_interrupt_mode_is_enabled(void) 2255 { 2256 return g_interrupt_mode; 2257 } 2258 2259 SPDK_LOG_REGISTER_COMPONENT(thread) 2260