/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/likely.h"

#include "spdk_internal/event.h"

#include "spdk/log.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/scheduler.h"
#include "spdk/string.h"
#include "spdk/fd_group.h"

#ifdef __linux__
#include <sys/prctl.h>
#include <sys/eventfd.h>
#endif

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

#define SPDK_EVENT_BATCH_SIZE	8

static struct spdk_reactor *g_reactors;
static uint32_t g_reactor_count;
static struct spdk_cpuset g_reactor_core_mask;
static enum spdk_reactor_state g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;

static bool g_framework_context_switch_monitor_enabled = true;

static struct spdk_mempool *g_spdk_event_mempool = NULL;

TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);

static struct spdk_scheduler *g_scheduler = NULL;
static struct spdk_reactor *g_scheduling_reactor;
bool g_scheduling_in_progress = false;
static uint64_t g_scheduler_period = 0;
static uint32_t g_scheduler_core_number;
static struct spdk_scheduler_core_info *g_core_infos = NULL;

TAILQ_HEAD(, spdk_governor) g_governor_list
	= TAILQ_HEAD_INITIALIZER(g_governor_list);

static struct spdk_governor *g_governor = NULL;

static int reactor_interrupt_init(struct spdk_reactor *reactor);
static void reactor_interrupt_fini(struct spdk_reactor *reactor);

static pthread_mutex_t g_stopping_reactors_mtx = PTHREAD_MUTEX_INITIALIZER;
static bool g_stopping_reactors = false;

static struct spdk_scheduler *
_scheduler_find(const char *name)
{
	struct spdk_scheduler *tmp;

	TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
		if (strcmp(name, tmp->name) == 0) {
			return tmp;
		}
	}

	return NULL;
}

int
spdk_scheduler_set(const char *name)
{
	struct spdk_scheduler *scheduler;
	int rc = 0;

	/* NULL scheduler was specifically requested */
	if (name == NULL) {
		if (g_scheduler) {
			g_scheduler->deinit();
		}
		g_scheduler = NULL;
		return 0;
	}

	scheduler = _scheduler_find(name);
	if (scheduler == NULL) {
		SPDK_ERRLOG("Requested scheduler is missing\n");
		return -EINVAL;
	}

	if (g_scheduler == scheduler) {
		return 0;
	}

	rc = scheduler->init();
	if (rc == 0) {
		if (g_scheduler) {
			g_scheduler->deinit();
		}
		g_scheduler = scheduler;
	}

	return rc;
}

struct spdk_scheduler *
spdk_scheduler_get(void)
{
	return g_scheduler;
}

uint64_t
spdk_scheduler_get_period(void)
{
	/* Convert from ticks to microseconds */
	return (g_scheduler_period * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
}

void
spdk_scheduler_set_period(uint64_t period)
{
	/* Convert microseconds to ticks */
	g_scheduler_period = period * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
}
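/*
 * Usage sketch (illustrative only, not part of this file): selecting a
 * scheduler and its period from application code. "dynamic" is assumed to
 * be a registered scheduler name, as in SPDK's dynamic scheduler plugin.
 *
 *	spdk_scheduler_set_period(1000);	// microseconds, stored internally as ticks
 *	if (spdk_scheduler_set("dynamic") != 0) {
 *		SPDK_ERRLOG("Failed to set scheduler\n");
 *	}
 *
 * Note the round trip through ticks: with a 2 GHz TSC,
 * spdk_scheduler_set_period(1000) stores 2,000,000 ticks and
 * spdk_scheduler_get_period() returns 1000 again.
 */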
void
spdk_scheduler_register(struct spdk_scheduler *scheduler)
{
	if (_scheduler_find(scheduler->name)) {
		SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
}
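/*
 * Minimal registration sketch, modeled on how scheduler plugins register
 * themselves. Only the callbacks this file invokes (init/deinit/balance)
 * are shown; a real struct spdk_scheduler may carry additional callbacks.
 * All names here are illustrative.
 *
 *	static int
 *	example_sched_init(void)
 *	{
 *		return 0;
 *	}
 *
 *	static void
 *	example_sched_deinit(void)
 *	{
 *	}
 *
 *	static void
 *	example_sched_balance(struct spdk_scheduler_core_info *cores, uint32_t core_count)
 *	{
 *		// No-op policy; see the balance() sketch near _reactors_scheduler_balance.
 *	}
 *
 *	static struct spdk_scheduler example_sched = {
 *		.name = "example",
 *		.init = example_sched_init,
 *		.deinit = example_sched_deinit,
 *		.balance = example_sched_balance,
 *	};
 *
 *	// Typically invoked from a constructor, e.g. via SPDK_SCHEDULER_REGISTER().
 *	spdk_scheduler_register(&example_sched);
 */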
static void
reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
{
	reactor->lcore = lcore;
	reactor->flags.is_valid = true;

	TAILQ_INIT(&reactor->threads);
	reactor->thread_count = 0;
	spdk_cpuset_zero(&reactor->notify_cpuset);

	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (reactor->events == NULL) {
		SPDK_ERRLOG("Failed to allocate events ring\n");
		assert(false);
	}

	/* Always initialize interrupt facilities for the reactor */
	if (reactor_interrupt_init(reactor) != 0) {
		/* Reactor interrupt facilities are necessary if the app is set to interrupt mode. */
		if (spdk_interrupt_mode_is_enabled()) {
			SPDK_ERRLOG("Failed to prepare intr facilities\n");
			assert(false);
		}
		return;
	}

	/* If the application runs with full interrupt ability,
	 * all reactors are going to run in interrupt mode.
	 */
	if (spdk_interrupt_mode_is_enabled()) {
		uint32_t i;

		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true);
		}
		reactor->in_interrupt = true;
	}
}

struct spdk_reactor *
spdk_reactor_get(uint32_t lcore)
{
	struct spdk_reactor *reactor;

	if (g_reactors == NULL) {
		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
		return NULL;
	}

	if (lcore >= g_reactor_count) {
		return NULL;
	}

	reactor = &g_reactors[lcore];

	if (reactor->flags.is_valid == false) {
		return NULL;
	}

	return reactor;
}

static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
static bool reactor_thread_op_supported(enum spdk_thread_op op);

int
spdk_reactors_init(void)
{
	struct spdk_reactor *reactor;
	int rc;
	uint32_t i, current_core;
	char mempool_name[32];

	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
			       262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			       sizeof(struct spdk_event),
			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
			       SPDK_ENV_SOCKET_ID_ANY);

	if (g_spdk_event_mempool == NULL) {
		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
		return -1;
	}

	/* struct spdk_reactor must be aligned on a 64 byte boundary */
	g_reactor_count = spdk_env_get_last_core() + 1;
	rc = posix_memalign((void **)&g_reactors, 64,
			    g_reactor_count * sizeof(struct spdk_reactor));
	if (rc != 0) {
		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
			    g_reactor_count);
		spdk_mempool_free(g_spdk_event_mempool);
		return -1;
	}

	g_core_infos = calloc(g_reactor_count, sizeof(*g_core_infos));
	if (g_core_infos == NULL) {
		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		return -ENOMEM;
	}

	memset(g_reactors, 0, (g_reactor_count) * sizeof(struct spdk_reactor));

	spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
				 sizeof(struct spdk_lw_thread));

	SPDK_ENV_FOREACH_CORE(i) {
		reactor_construct(&g_reactors[i], i);
	}

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	g_scheduling_reactor = reactor;

	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;

	return 0;
}

void
spdk_reactors_fini(void)
{
	uint32_t i;
	struct spdk_reactor *reactor;

	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
		return;
	}

	spdk_thread_lib_fini();

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		assert(reactor->thread_count == 0);
		if (reactor->events != NULL) {
			spdk_ring_free(reactor->events);
		}

		reactor_interrupt_fini(reactor);

		if (g_core_infos != NULL) {
			free(g_core_infos[i].thread_infos);
		}
	}

	spdk_mempool_free(g_spdk_event_mempool);

	free(g_reactors);
	g_reactors = NULL;
	free(g_core_infos);
	g_core_infos = NULL;
}
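/*
 * Lifecycle sketch for orientation: the event framework (spdk_app_start()
 * and friends) drives these entry points roughly as follows. Illustrative
 * only; real applications normally go through the app framework.
 *
 *	if (spdk_reactors_init() != 0) {
 *		return -1;
 *	}
 *	spdk_reactors_start();	// blocks on the current core until stopped
 *	// ... spdk_reactors_stop() is invoked from an event to break the loops ...
 *	spdk_reactors_fini();
 */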
static void _reactor_set_interrupt_mode(void *arg1, void *arg2);

static void
_reactor_set_notify_cpuset(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;
	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());

	assert(reactor != NULL);
	spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt);
}

static void
_reactor_set_notify_cpuset_cpl(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;

	if (target->new_in_interrupt == false) {
		target->set_interrupt_mode_in_progress = false;
		spdk_thread_send_msg(_spdk_get_app_thread(), target->set_interrupt_mode_cb_fn,
				     target->set_interrupt_mode_cb_arg);
	} else {
		struct spdk_event *ev;

		ev = spdk_event_allocate(target->lcore, _reactor_set_interrupt_mode, target, NULL);
		assert(ev);
		spdk_event_call(ev);
	}
}

static void
_reactor_set_thread_interrupt_mode(void *ctx)
{
	struct spdk_reactor *reactor = ctx;

	spdk_thread_set_interrupt_mode(reactor->in_interrupt);
}

static void
_reactor_set_interrupt_mode(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread, *tmp;

	assert(target == spdk_reactor_get(spdk_env_get_current_core()));
	assert(target != NULL);
	assert(target->in_interrupt != target->new_in_interrupt);
	SPDK_DEBUGLOG(reactor, "Do reactor set on core %u from %s to state %s\n",
		      target->lcore, target->in_interrupt ? "intr" : "poll", target->new_in_interrupt ? "intr" : "poll");

	target->in_interrupt = target->new_in_interrupt;

	/* Align each spdk_thread with the reactor's interrupt mode or poll mode */
	TAILQ_FOREACH_SAFE(lw_thread, &target->threads, link, tmp) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, target);
	}

	if (target->new_in_interrupt == false) {
		/* Reactor is no longer in interrupt mode. Refresh the tsc_last to accurately
		 * track reactor stats. */
		target->tsc_last = spdk_get_ticks();
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	} else {
		uint64_t notify = 1;
		int rc = 0;

		/* Always trigger the spdk_event and resched event in case of a race condition */
		rc = write(target->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
		rc = write(target->resched_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}

		target->set_interrupt_mode_in_progress = false;
		spdk_thread_send_msg(_spdk_get_app_thread(), target->set_interrupt_mode_cb_fn,
				     target->set_interrupt_mode_cb_arg);
	}
}

int
spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt,
				spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg)
{
	struct spdk_reactor *target;

	target = spdk_reactor_get(lcore);
	if (target == NULL) {
		return -EINVAL;
	}

	/* Eventfd has to be supported in order to use interrupt functionality. */
	if (target->fgrp == NULL) {
		return -ENOTSUP;
	}

	if (spdk_get_thread() != _spdk_get_app_thread()) {
		SPDK_ERRLOG("It is only permitted from within the SPDK application thread.\n");
		return -EPERM;
	}

	if (target->in_interrupt == new_in_interrupt) {
		cb_fn(cb_arg);
		return 0;
	}

	if (target->set_interrupt_mode_in_progress) {
		SPDK_NOTICELOG("Reactor(%u) is already in the process of setting interrupt mode\n", lcore);
		return -EBUSY;
	}
	target->set_interrupt_mode_in_progress = true;

	target->new_in_interrupt = new_in_interrupt;
	target->set_interrupt_mode_cb_fn = cb_fn;
	target->set_interrupt_mode_cb_arg = cb_arg;

	SPDK_DEBUGLOG(reactor, "Starting reactor event from %d to %d\n",
		      spdk_env_get_current_core(), lcore);

	if (new_in_interrupt == false) {
		/* For potential race cases, when setting the reactor to poll mode,
		 * first change the mode of the reactor and then clear the corresponding
		 * bit of the notify_cpuset of each reactor.
		 */
		struct spdk_event *ev;

		ev = spdk_event_allocate(lcore, _reactor_set_interrupt_mode, target, NULL);
		assert(ev);
		spdk_event_call(ev);
	} else {
		/* For race cases, when setting the reactor to interrupt mode, first set the
		 * corresponding bit of the notify_cpuset of each reactor and then change the mode.
		 */
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	}

	return 0;
}
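/*
 * Usage sketch: toggling one reactor into interrupt mode from the app
 * thread. The callback name is illustrative; the error values match the
 * checks above.
 *
 *	static void
 *	intr_mode_done(void *cb_arg)
 *	{
 *		SPDK_NOTICELOG("Reactor mode switch completed\n");
 *	}
 *
 *	rc = spdk_reactor_set_interrupt_mode(lcore, true, intr_mode_done, NULL);
 *	if (rc != 0) {
 *		// -EINVAL, -ENOTSUP, -EPERM or -EBUSY
 *	}
 */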
struct spdk_event *
spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
{
	struct spdk_event *event = NULL;
	struct spdk_reactor *reactor = spdk_reactor_get(lcore);

	if (!reactor) {
		assert(false);
		return NULL;
	}

	event = spdk_mempool_get(g_spdk_event_mempool);
	if (event == NULL) {
		assert(false);
		return NULL;
	}

	event->lcore = lcore;
	event->fn = fn;
	event->arg1 = arg1;
	event->arg2 = arg2;

	return event;
}

void
spdk_event_call(struct spdk_event *event)
{
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_core = spdk_env_get_current_core();

	reactor = spdk_reactor_get(event->lcore);

	assert(reactor != NULL);
	assert(reactor->events != NULL);

	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
	if (rc != 1) {
		assert(false);
	}

	if (current_core != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_core);
	}

	/* If spdk_event_call isn't called on a reactor, always send a notification.
	 * If it is called on a reactor, send a notification if the destination reactor
	 * is indicated as being in interrupt mode.
	 */
	if (spdk_unlikely(local_reactor == NULL) ||
	    spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) {
		uint64_t notify = 1;

		rc = write(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
	}
}
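/*
 * Usage sketch: running a function on a specific core via the event API.
 * The function name is illustrative.
 *
 *	static void
 *	hello_fn(void *arg1, void *arg2)
 *	{
 *		SPDK_NOTICELOG("Running on core %u\n", spdk_env_get_current_core());
 *	}
 *
 *	struct spdk_event *ev = spdk_event_allocate(lcore, hello_fn, NULL, NULL);
 *	assert(ev != NULL);
 *	spdk_event_call(ev);	// the event returns to the pool after execution
 */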
static inline int
event_queue_run_batch(void *arg)
{
	struct spdk_reactor *reactor = arg;
	size_t count, i;
	void *events[SPDK_EVENT_BATCH_SIZE];
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(events, 0, sizeof(events));
#endif

	/* Handle event notification if this reactor currently runs in interrupt mode */
	if (spdk_unlikely(reactor->in_interrupt)) {
		uint64_t notify = 1;
		int rc;

		/* There may be a race between this reactor's event_acknowledge (the read)
		 * and another producer's event_notify (a write), so acknowledge first and
		 * then re-check our own event queue. This avoids missing an event notification.
		 */
		rc = read(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno));
			return -errno;
		}

		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);

		if (spdk_ring_count(reactor->events) != 0) {
			/* Trigger a new notification if there are still events in the event queue
			 * waiting for processing. */
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
				return -errno;
			}
		}
	} else {
		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
	}

	if (count == 0) {
		return 0;
	}

	/* Execute the events. There are still some remaining events
	 * that must occur on an SPDK thread. To accommodate those, try to
	 * run them on the first thread in the list, if it exists.
	 */
	lw_thread = TAILQ_FIRST(&reactor->threads);
	if (lw_thread) {
		thread = spdk_thread_get_from_ctx(lw_thread);
	} else {
		thread = NULL;
	}

	for (i = 0; i < count; i++) {
		struct spdk_event *event = events[i];

		assert(event != NULL);
		spdk_set_thread(thread);
		event->fn(event->arg1, event->arg2);
		spdk_set_thread(NULL);
	}

	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);

	return (int)count;
}
/* 1s */
#define CONTEXT_SWITCH_MONITOR_PERIOD 1000000

static int
get_rusage(struct spdk_reactor *reactor)
{
	struct rusage rusage;

	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
		return -1;
	}

	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
		SPDK_INFOLOG(reactor,
			     "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
	}
	reactor->rusage = rusage;

	return -1;
}

void
spdk_framework_enable_context_switch_monitor(bool enable)
{
	/* This global is being read by multiple threads, so this isn't
	 * strictly thread safe. However, we're toggling between true and
	 * false here, and if a thread sees the value update later than it
	 * should, it's no big deal. */
	g_framework_context_switch_monitor_enabled = enable;
}

bool
spdk_framework_context_switch_monitor_enabled(void)
{
	return g_framework_context_switch_monitor_enabled;
}
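/*
 * Usage sketch: the monitor is on by default; an application that wants
 * quieter logs can disable it at any time:
 *
 *	spdk_framework_enable_context_switch_monitor(false);
 *	assert(!spdk_framework_context_switch_monitor_enabled());
 */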
static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
	pthread_setname_np(pthread_self(), thread_name);
#endif
}

static void
_init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
	struct spdk_thread_stats prev_total_stats;

	/* Read total_stats before updating it to calculate stats during the last scheduling period. */
	prev_total_stats = lw_thread->total_stats;

	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->total_stats);
	spdk_set_thread(NULL);

	lw_thread->current_stats.busy_tsc = lw_thread->total_stats.busy_tsc - prev_total_stats.busy_tsc;
	lw_thread->current_stats.idle_tsc = lw_thread->total_stats.idle_tsc - prev_total_stats.idle_tsc;
}

static void
_threads_reschedule_thread(struct spdk_scheduler_thread_info *thread_info)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread *thread;

	thread = spdk_thread_get_by_id(thread_info->thread_id);
	if (thread == NULL) {
		/* Thread no longer exists. */
		return;
	}
	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);

	lw_thread->lcore = thread_info->lcore;
	lw_thread->resched = true;
}

static void
_threads_reschedule(struct spdk_scheduler_core_info *cores_info)
{
	struct spdk_scheduler_core_info *core;
	struct spdk_scheduler_thread_info *thread_info;
	uint32_t i, j;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &cores_info[i];
		for (j = 0; j < core->threads_count; j++) {
			thread_info = &core->thread_infos[j];
			if (thread_info->lcore != i) {
				_threads_reschedule_thread(thread_info);
			}
		}
		core->threads_count = 0;
		free(core->thread_infos);
		core->thread_infos = NULL;
	}
}

static void
_reactors_scheduler_fini(void)
{
	/* Reschedule based on the balancing output */
	_threads_reschedule(g_core_infos);

	g_scheduling_in_progress = false;
}

static void
_reactors_scheduler_update_core_mode(void *ctx)
{
	struct spdk_reactor *reactor;
	uint32_t i;
	int rc = 0;

	for (i = g_scheduler_core_number; i < SPDK_ENV_LCORE_ID_ANY; i = spdk_env_get_next_core(i)) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		if (reactor->in_interrupt != g_core_infos[i].interrupt_mode) {
			/* Switch the next found reactor to the new state */
			rc = spdk_reactor_set_interrupt_mode(i, g_core_infos[i].interrupt_mode,
							     _reactors_scheduler_update_core_mode, NULL);
			if (rc == 0) {
				/* Set the core to start with after the callback completes */
				g_scheduler_core_number = spdk_env_get_next_core(i);
				return;
			}
		}
	}
	_reactors_scheduler_fini();
}

static void
_reactors_scheduler_cancel(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core;
	uint32_t i;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &g_core_infos[i];
		core->threads_count = 0;
		free(core->thread_infos);
		core->thread_infos = NULL;
	}

	g_scheduling_in_progress = false;
}

static void
_reactors_scheduler_balance(void *arg1, void *arg2)
{
	struct spdk_scheduler *scheduler = spdk_scheduler_get();

	if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING || scheduler == NULL) {
		_reactors_scheduler_cancel(NULL, NULL);
		return;
	}

	scheduler->balance(g_core_infos, g_reactor_count);

	g_scheduler_core_number = spdk_env_get_first_core();
	_reactors_scheduler_update_core_mode(NULL);
}

/* Phase 1 of thread scheduling is to gather metrics on the existing threads */
static void
_reactors_scheduler_gather_metrics(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core_info;
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread *thread;
	struct spdk_reactor *reactor;
	struct spdk_event *evt;
	uint32_t next_core;
	uint32_t i = 0;

	reactor = spdk_reactor_get(spdk_env_get_current_core());
	assert(reactor != NULL);
	core_info = &g_core_infos[reactor->lcore];
	core_info->lcore = reactor->lcore;
	core_info->current_idle_tsc = reactor->idle_tsc - core_info->total_idle_tsc;
	core_info->total_idle_tsc = reactor->idle_tsc;
	core_info->current_busy_tsc = reactor->busy_tsc - core_info->total_busy_tsc;
	core_info->total_busy_tsc = reactor->busy_tsc;
	core_info->interrupt_mode = reactor->in_interrupt;
	core_info->threads_count = 0;

	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);

	if (reactor->thread_count > 0) {
		core_info->thread_infos = calloc(reactor->thread_count, sizeof(*core_info->thread_infos));
		if (core_info->thread_infos == NULL) {
			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);

			/* Cancel this round of schedule work */
			evt = spdk_event_allocate(g_scheduling_reactor->lcore, _reactors_scheduler_cancel, NULL, NULL);
			spdk_event_call(evt);
			return;
		}

		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
			_init_thread_stats(reactor, lw_thread);

			core_info->thread_infos[i].lcore = lw_thread->lcore;
			thread = spdk_thread_get_from_ctx(lw_thread);
			assert(thread != NULL);
			core_info->thread_infos[i].thread_id = spdk_thread_get_id(thread);
			core_info->thread_infos[i].total_stats = lw_thread->total_stats;
			core_info->thread_infos[i].current_stats = lw_thread->current_stats;
			core_info->threads_count++;
			assert(core_info->threads_count <= reactor->thread_count);
			i++;
		}
	}

	next_core = spdk_env_get_next_core(reactor->lcore);
	if (next_core == UINT32_MAX) {
		next_core = spdk_env_get_first_core();
	}

	/* If we've looped back around to the scheduler thread, move to the next phase */
	if (next_core == g_scheduling_reactor->lcore) {
		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
		evt = spdk_event_allocate(next_core, _reactors_scheduler_balance, NULL, NULL);
		spdk_event_call(evt);
		return;
	}

	evt = spdk_event_allocate(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
	spdk_event_call(evt);
}
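/*
 * Sketch of a trivial balance() callback to illustrate the contract used
 * above: the scheduler inspects each core's thread_infos and records a
 * target lcore per thread; _threads_reschedule() then moves any thread
 * whose recorded lcore differs from its current core, and
 * _reactors_scheduler_update_core_mode() applies interrupt_mode. The
 * function name is illustrative.
 *
 *	static void
 *	example_balance(struct spdk_scheduler_core_info *cores, uint32_t core_count)
 *	{
 *		uint32_t i, j;
 *
 *		SPDK_ENV_FOREACH_CORE(i) {
 *			cores[i].interrupt_mode = false;
 *			for (j = 0; j < cores[i].threads_count; j++) {
 *				// Keep every thread on its current core (no-op policy).
 *				cores[i].thread_infos[j].lcore = i;
 *			}
 *		}
 *	}
 */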
static int _reactor_schedule_thread(struct spdk_thread *thread);
static uint64_t g_rusage_period;

static void
_reactor_remove_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
	int efd;

	TAILQ_REMOVE(&reactor->threads, lw_thread, link);
	assert(reactor->thread_count > 0);
	reactor->thread_count--;

	/* Remove the thread's interrupt fd if running with full interrupt capability */
	if (spdk_interrupt_mode_is_enabled()) {
		efd = spdk_thread_get_interrupt_fd(thread);
		spdk_fd_group_remove(reactor->fgrp, efd);
	}
}

static bool
reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);

	if (spdk_unlikely(lw_thread->resched)) {
		lw_thread->resched = false;
		_reactor_remove_lw_thread(reactor, lw_thread);
		_reactor_schedule_thread(thread);
		return true;
	}

	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
			  spdk_thread_is_idle(thread))) {
		_reactor_remove_lw_thread(reactor, lw_thread);
		spdk_thread_destroy(thread);
		return true;
	}

	return false;
}

static void
reactor_interrupt_run(struct spdk_reactor *reactor)
{
	int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */

	spdk_fd_group_wait(reactor->fgrp, block_timeout);
}

static void
_reactor_run(struct spdk_reactor *reactor)
{
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread, *tmp;
	uint64_t now;
	int rc;

	event_queue_run_batch(reactor);

	/* If no threads are present on the reactor,
	 * tsc_last gets outdated. Update it to track
	 * thread execution time correctly.
	 */
	if (spdk_unlikely(TAILQ_EMPTY(&reactor->threads))) {
		now = spdk_get_ticks();
		reactor->idle_tsc += now - reactor->tsc_last;
		reactor->tsc_last = now;
		return;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);

		now = spdk_thread_get_last_tsc(thread);
		if (rc == 0) {
			reactor->idle_tsc += now - reactor->tsc_last;
		} else if (rc > 0) {
			reactor->busy_tsc += now - reactor->tsc_last;
		}
		reactor->tsc_last = now;

		reactor_post_process_lw_thread(reactor, lw_thread);
	}
}
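/*
 * Worked example of the accounting above: with a 2 GHz TSC, if
 * spdk_thread_poll() returns 0 (no work done) and 1 ms elapsed since
 * tsc_last, then 2,000,000 ticks are added to idle_tsc; had it returned
 * greater than 0, the same delta would be added to busy_tsc instead.
 */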
static int
reactor_run(void *arg)
{
	struct spdk_reactor *reactor = arg;
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread, *tmp;
	char thread_name[32];
	uint64_t last_sched = 0;

	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);

	/* Rename the POSIX thread because the reactor is tied to the POSIX
	 * thread in the SPDK event library.
	 */
	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
	_set_thread_name(thread_name);

	reactor->tsc_last = spdk_get_ticks();

	while (1) {
		/* Execute the interrupt process fn if this reactor currently runs in interrupt mode */
		if (spdk_unlikely(reactor->in_interrupt)) {
			reactor_interrupt_run(reactor);
		} else {
			_reactor_run(reactor);
		}

		if (g_framework_context_switch_monitor_enabled) {
			if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
				get_rusage(reactor);
				reactor->last_rusage = reactor->tsc_last;
			}
		}

		if (spdk_unlikely(g_scheduler_period > 0 &&
				  (reactor->tsc_last - last_sched) > g_scheduler_period &&
				  reactor == g_scheduling_reactor &&
				  !g_scheduling_in_progress)) {
			last_sched = reactor->tsc_last;
			g_scheduling_in_progress = true;
			_reactors_scheduler_gather_metrics(NULL, NULL);
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}
	}

	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		spdk_set_thread(thread);
		spdk_thread_exit(thread);
	}

	while (!TAILQ_EMPTY(&reactor->threads)) {
		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			spdk_set_thread(thread);
			if (spdk_thread_is_exited(thread)) {
				_reactor_remove_lw_thread(reactor, lw_thread);
				spdk_thread_destroy(thread);
			} else {
				spdk_thread_poll(thread, 0, 0);
			}
		}
	}

	return 0;
}

int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int ret;
	const struct spdk_cpuset *validmask;

	ret = spdk_cpuset_parse(cpumask, mask);
	if (ret < 0) {
		return ret;
	}

	validmask = spdk_app_get_core_mask();
	spdk_cpuset_and(cpumask, validmask);

	return 0;
}

const struct spdk_cpuset *
spdk_app_get_core_mask(void)
{
	return &g_reactor_core_mask;
}
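/*
 * Usage sketch: narrowing a user-supplied mask to cores the app actually
 * runs on. With the app started on cores 0-3, parsing "0xFF" yields a
 * cpumask of cores 0-3 after the AND with spdk_app_get_core_mask().
 *
 *	struct spdk_cpuset cpumask;
 *
 *	if (spdk_app_parse_core_mask("0xFF", &cpumask) != 0) {
 *		SPDK_ERRLOG("Invalid core mask\n");
 *	}
 */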
void
spdk_reactors_start(void)
{
	struct spdk_reactor *reactor;
	uint32_t i, current_core;
	int rc;

	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
	/* Reinitialize to false, in case the app framework is restarting in the same process. */
	g_stopping_reactors = false;

	current_core = spdk_env_get_current_core();
	SPDK_ENV_FOREACH_CORE(i) {
		if (i != current_core) {
			reactor = spdk_reactor_get(i);
			if (reactor == NULL) {
				continue;
			}

			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
			if (rc < 0) {
				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
				assert(false);
				return;
			}
		}
		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
	}

	/* Start the main reactor */
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	reactor_run(reactor);

	spdk_env_thread_wait_all();

	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
}

static void
_reactors_stop(void *arg1, void *arg2)
{
	uint32_t i;
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor;
	uint64_t notify = 1;

	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
	local_reactor = spdk_reactor_get(spdk_env_get_current_core());

	SPDK_ENV_FOREACH_CORE(i) {
		/* If spdk_event_call isn't called on a reactor, always send a notification.
		 * If it is called on a reactor, send a notification if the destination reactor
		 * is indicated as being in interrupt mode.
		 */
		if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) {
			reactor = spdk_reactor_get(i);
			assert(reactor != NULL);
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno));
				continue;
			}
		}
	}
}

static void
nop(void *arg1, void *arg2)
{
}

void
spdk_reactors_stop(void *arg1)
{
	spdk_for_each_reactor(nop, NULL, NULL, _reactors_stop);
}

static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
static uint32_t g_next_core = UINT32_MAX;

static int
thread_process_interrupts(void *arg)
{
	struct spdk_thread *thread = arg;
	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());
	uint64_t now;
	int rc;

	assert(reactor != NULL);

	/* Update idle_tsc between the end of the last intr_fn and the start of this intr_fn. */
	now = spdk_get_ticks();
	reactor->idle_tsc += now - reactor->tsc_last;
	reactor->tsc_last = now;

	rc = spdk_thread_poll(thread, 0, now);

	/* Update tsc between the start and the end of this intr_fn. */
	now = spdk_thread_get_last_tsc(thread);
	if (rc == 0) {
		reactor->idle_tsc += now - reactor->tsc_last;
	} else if (rc > 0) {
		reactor->busy_tsc += now - reactor->tsc_last;
	}
	reactor->tsc_last = now;

	return rc;
}

static void
_schedule_thread(void *arg1, void *arg2)
{
	struct spdk_lw_thread *lw_thread = arg1;
	struct spdk_thread *thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;
	int efd;

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	/* Update total_stats to reflect the state of the thread
	 * at the end of the move.
	 */
	thread = spdk_thread_get_from_ctx(lw_thread);
	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->total_stats);
	spdk_set_thread(NULL);

	lw_thread->lcore = current_core;

	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
	reactor->thread_count++;

	/* Register the thread's interrupt fd if running with full interrupt capability */
	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		efd = spdk_thread_get_interrupt_fd(thread);
		rc = SPDK_FD_GROUP_ADD(reactor->fgrp, efd,
				       thread_process_interrupts, thread);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
		}

		/* Align the spdk_thread with the reactor's interrupt mode or poll mode */
		spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, reactor);
	}
}
static int
_reactor_schedule_thread(struct spdk_thread *thread)
{
	uint32_t core;
	struct spdk_lw_thread *lw_thread;
	struct spdk_event *evt = NULL;
	struct spdk_cpuset *cpumask;
	uint32_t i;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_lcore = spdk_env_get_current_core();
	struct spdk_cpuset polling_cpumask;
	struct spdk_cpuset valid_cpumask;

	cpumask = spdk_thread_get_cpumask(thread);

	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);
	core = lw_thread->lcore;
	memset(lw_thread, 0, sizeof(*lw_thread));

	if (current_lcore != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_lcore);
		assert(local_reactor);
	}

	/* When the interrupt ability of the spdk_thread is not enabled and the current
	 * reactor runs on a DPDK thread, skip reactors which are in interrupt mode.
	 */
	if (!spdk_interrupt_mode_is_enabled() && local_reactor != NULL) {
		/* Get the cpumask of all reactors in polling */
		spdk_cpuset_zero(&polling_cpumask);
		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&polling_cpumask, i, true);
		}
		spdk_cpuset_xor(&polling_cpumask, &local_reactor->notify_cpuset);

		if (core == SPDK_ENV_LCORE_ID_ANY) {
			/* Get the cpumask of all valid reactors which are suggested and also in polling */
			spdk_cpuset_copy(&valid_cpumask, &polling_cpumask);
			spdk_cpuset_and(&valid_cpumask, spdk_thread_get_cpumask(thread));

			/* If there are any valid reactors, the spdk_thread should be scheduled
			 * onto one of the valid reactors.
			 * If there are no valid reactors, the spdk_thread should be scheduled
			 * onto one of the polling reactors.
			 */
			if (spdk_cpuset_count(&valid_cpumask) != 0) {
				cpumask = &valid_cpumask;
			} else {
				cpumask = &polling_cpumask;
			}
		} else if (!spdk_cpuset_get_cpu(&polling_cpumask, core)) {
			/* If the specified reactor is not in polling, the spdk_thread should be
			 * scheduled onto one of the polling reactors.
			 */
			core = SPDK_ENV_LCORE_ID_ANY;
			cpumask = &polling_cpumask;
		}
	}

	pthread_mutex_lock(&g_scheduler_mtx);
	if (core == SPDK_ENV_LCORE_ID_ANY) {
		for (i = 0; i < spdk_env_get_core_count(); i++) {
			if (g_next_core >= g_reactor_count) {
				g_next_core = spdk_env_get_first_core();
			}
			core = g_next_core;
			g_next_core = spdk_env_get_next_core(g_next_core);

			if (spdk_cpuset_get_cpu(cpumask, core)) {
				break;
			}
		}
	}

	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);

	pthread_mutex_unlock(&g_scheduler_mtx);

	assert(evt != NULL);
	if (evt == NULL) {
		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
		return -1;
	}

	lw_thread->tsc_start = spdk_get_ticks();

	spdk_event_call(evt);

	return 0;
}
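/*
 * Usage sketch: this scheduling path is normally reached through
 * spdk_thread_create(), which triggers reactor_thread_op() below with
 * SPDK_THREAD_OP_NEW. The thread name and mask are illustrative.
 *
 *	struct spdk_cpuset cpumask;
 *
 *	spdk_cpuset_zero(&cpumask);
 *	spdk_cpuset_set_cpu(&cpumask, 2, true);	// prefer core 2
 *	struct spdk_thread *thread = spdk_thread_create("worker", &cpumask);
 */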
static void
_reactor_request_thread_reschedule(struct spdk_thread *thread)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;

	assert(thread == spdk_get_thread());

	lw_thread = spdk_thread_get_ctx(thread);

	assert(lw_thread != NULL);
	lw_thread->resched = true;
	lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	/* Send a notification if the local reactor is marked as being in interrupt mode */
	if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) {
		uint64_t notify = 1;

		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}
	}
}

static int
reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
{
	struct spdk_lw_thread *lw_thread;

	switch (op) {
	case SPDK_THREAD_OP_NEW:
		lw_thread = spdk_thread_get_ctx(thread);
		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
		return _reactor_schedule_thread(thread);
	case SPDK_THREAD_OP_RESCHED:
		_reactor_request_thread_reschedule(thread);
		return 0;
	default:
		return -ENOTSUP;
	}
}

static bool
reactor_thread_op_supported(enum spdk_thread_op op)
{
	switch (op) {
	case SPDK_THREAD_OP_NEW:
	case SPDK_THREAD_OP_RESCHED:
		return true;
	default:
		return false;
	}
}

struct call_reactor {
	uint32_t cur_core;
	spdk_event_fn fn;
	void *arg1;
	void *arg2;

	uint32_t orig_core;
	spdk_event_fn cpl;
};

static void
on_reactor(void *arg1, void *arg2)
{
	struct call_reactor *cr = arg1;
	struct spdk_event *evt;

	cr->fn(cr->arg1, cr->arg2);

	cr->cur_core = spdk_env_get_next_core(cr->cur_core);

	if (cr->cur_core >= g_reactor_count) {
		SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");

		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
		free(cr);
	} else {
		SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %d\n",
			      cr->cur_core);

		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
	}
	assert(evt != NULL);
	spdk_event_call(evt);
}

void
spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
{
	struct call_reactor *cr;
	struct spdk_event *evt;

	/* When the application framework is shutting down, we will send one
	 * final for_each_reactor operation with completion callback _reactors_stop,
	 * to flush any existing for_each_reactor operations to avoid any memory
	 * leaks. We use a mutex here to protect a boolean flag that will ensure
	 * we don't start any more operations once we've started shutting down.
	 */
	pthread_mutex_lock(&g_stopping_reactors_mtx);
	if (g_stopping_reactors) {
		pthread_mutex_unlock(&g_stopping_reactors_mtx);
		return;
	} else if (cpl == _reactors_stop) {
		g_stopping_reactors = true;
	}
	pthread_mutex_unlock(&g_stopping_reactors_mtx);

	cr = calloc(1, sizeof(*cr));
	if (!cr) {
		SPDK_ERRLOG("Unable to perform reactor iteration\n");
		cpl(arg1, arg2);
		return;
	}

	cr->fn = fn;
	cr->arg1 = arg1;
	cr->arg2 = arg2;
	cr->cpl = cpl;
	cr->orig_core = spdk_env_get_current_core();
	cr->cur_core = spdk_env_get_first_core();

	SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %d\n", cr->orig_core);

	evt = spdk_event_allocate(cr->cur_core, on_reactor, cr, NULL);
	assert(evt != NULL);

	spdk_event_call(evt);
}
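/*
 * Usage sketch: broadcasting a function to every reactor, with a
 * completion issued back on the calling core. Function names are
 * illustrative.
 *
 *	static void
 *	per_reactor_fn(void *arg1, void *arg2)
 *	{
 *		SPDK_NOTICELOG("On core %u\n", spdk_env_get_current_core());
 *	}
 *
 *	static void
 *	done_fn(void *arg1, void *arg2)
 *	{
 *		SPDK_NOTICELOG("Visited all reactors\n");
 *	}
 *
 *	spdk_for_each_reactor(per_reactor_fn, NULL, NULL, done_fn);
 */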
#ifdef __linux__
static int
reactor_schedule_thread_event(void *arg)
{
	struct spdk_reactor *reactor = arg;
	struct spdk_lw_thread *lw_thread, *tmp;
	uint32_t count = 0;
	uint64_t notify = 1;

	assert(reactor->in_interrupt);

	if (read(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
		SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno));
		return -errno;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
	}

	return count;
}

static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	int rc;

	rc = spdk_fd_group_create(&reactor->fgrp);
	if (rc != 0) {
		return rc;
	}

	reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->resched_fd < 0) {
		rc = -EBADF;
		goto err;
	}

	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event,
			       reactor);
	if (rc) {
		close(reactor->resched_fd);
		goto err;
	}

	reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->events_fd < 0) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);

		rc = -EBADF;
		goto err;
	}

	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->events_fd,
			       event_queue_run_batch, reactor);
	if (rc) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);
		close(reactor->events_fd);
		goto err;
	}

	return 0;

err:
	spdk_fd_group_destroy(reactor->fgrp);
	reactor->fgrp = NULL;
	return rc;
}
#else
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	return -ENOTSUP;
}
#endif

static void
reactor_interrupt_fini(struct spdk_reactor *reactor)
{
	struct spdk_fd_group *fgrp = reactor->fgrp;

	if (!fgrp) {
		return;
	}

	spdk_fd_group_remove(fgrp, reactor->events_fd);
	spdk_fd_group_remove(fgrp, reactor->resched_fd);

	close(reactor->events_fd);
	close(reactor->resched_fd);

	spdk_fd_group_destroy(fgrp);
	reactor->fgrp = NULL;
}
static struct spdk_governor *
_governor_find(const char *name)
{
	struct spdk_governor *governor, *tmp;

	TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
		if (strcmp(name, governor->name) == 0) {
			return governor;
		}
	}

	return NULL;
}

int
spdk_governor_set(const char *name)
{
	struct spdk_governor *governor;
	int rc = 0;

	/* NULL governor was specifically requested */
	if (name == NULL) {
		if (g_governor) {
			g_governor->deinit();
		}
		g_governor = NULL;
		return 0;
	}

	governor = _governor_find(name);
	if (governor == NULL) {
		return -EINVAL;
	}

	if (g_governor == governor) {
		return 0;
	}

	rc = governor->init();
	if (rc == 0) {
		if (g_governor) {
			g_governor->deinit();
		}
		g_governor = governor;
	}

	return rc;
}

struct spdk_governor *
spdk_governor_get(void)
{
	return g_governor;
}

void
spdk_governor_register(struct spdk_governor *governor)
{
	if (_governor_find(governor->name)) {
		SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_governor_list, governor, link);
}

SPDK_LOG_REGISTER_COMPONENT(reactor)
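/*
 * Registration sketch for a governor, mirroring the scheduler API above.
 * Only the name and the callbacks this file invokes (init/deinit) are
 * shown; a real struct spdk_governor also carries frequency-control
 * callbacks. All names here are illustrative.
 *
 *	static int example_gov_init(void) { return 0; }
 *	static void example_gov_deinit(void) { }
 *
 *	static struct spdk_governor example_gov = {
 *		.name = "example_gov",
 *		.init = example_gov_init,
 *		.deinit = example_gov_deinit,
 *	};
 *
 *	spdk_governor_register(&example_gov);
 */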