1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2016 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 #include "spdk/likely.h" 8 9 #include "event_internal.h" 10 11 #include "spdk_internal/event.h" 12 #include "spdk_internal/usdt.h" 13 14 #include "spdk/log.h" 15 #include "spdk/thread.h" 16 #include "spdk/env.h" 17 #include "spdk/util.h" 18 #include "spdk/scheduler.h" 19 #include "spdk/string.h" 20 #include "spdk/fd_group.h" 21 #include "spdk/trace.h" 22 #include "spdk_internal/trace_defs.h" 23 24 #ifdef __linux__ 25 #include <sys/prctl.h> 26 #include <sys/eventfd.h> 27 #endif 28 29 #ifdef __FreeBSD__ 30 #include <pthread_np.h> 31 #endif 32 33 #define SPDK_EVENT_BATCH_SIZE 8 34 35 static struct spdk_reactor *g_reactors; 36 static uint32_t g_reactor_count; 37 static struct spdk_cpuset g_reactor_core_mask; 38 static enum spdk_reactor_state g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED; 39 40 static bool g_framework_context_switch_monitor_enabled = true; 41 42 static struct spdk_mempool *g_spdk_event_mempool = NULL; 43 44 TAILQ_HEAD(, spdk_scheduler) g_scheduler_list 45 = TAILQ_HEAD_INITIALIZER(g_scheduler_list); 46 47 static struct spdk_scheduler *g_scheduler = NULL; 48 static struct spdk_reactor *g_scheduling_reactor; 49 bool g_scheduling_in_progress = false; 50 static uint64_t g_scheduler_period_in_tsc = 0; 51 static uint64_t g_scheduler_period_in_us; 52 static uint32_t g_scheduler_core_number; 53 static struct spdk_scheduler_core_info *g_core_infos = NULL; 54 static struct spdk_cpuset g_scheduler_isolated_core_mask; 55 56 TAILQ_HEAD(, spdk_governor) g_governor_list 57 = TAILQ_HEAD_INITIALIZER(g_governor_list); 58 59 static struct spdk_governor *g_governor = NULL; 60 61 static int reactor_interrupt_init(struct spdk_reactor *reactor); 62 static void reactor_interrupt_fini(struct spdk_reactor *reactor); 63 64 static pthread_mutex_t g_stopping_reactors_mtx = PTHREAD_MUTEX_INITIALIZER; 65 static bool g_stopping_reactors = false; 66 67 static struct spdk_scheduler * 68 _scheduler_find(const char *name) 69 { 70 struct spdk_scheduler *tmp; 71 72 TAILQ_FOREACH(tmp, &g_scheduler_list, link) { 73 if (strcmp(name, tmp->name) == 0) { 74 return tmp; 75 } 76 } 77 78 return NULL; 79 } 80 81 int 82 spdk_scheduler_set(const char *name) 83 { 84 struct spdk_scheduler *scheduler; 85 int rc = 0; 86 87 /* NULL scheduler was specifically requested */ 88 if (name == NULL) { 89 if (g_scheduler) { 90 g_scheduler->deinit(); 91 } 92 g_scheduler = NULL; 93 return 0; 94 } 95 96 scheduler = _scheduler_find(name); 97 if (scheduler == NULL) { 98 SPDK_ERRLOG("Requested scheduler is missing\n"); 99 return -EINVAL; 100 } 101 102 if (g_scheduler == scheduler) { 103 return 0; 104 } 105 106 if (g_scheduler) { 107 g_scheduler->deinit(); 108 } 109 110 rc = scheduler->init(); 111 if (rc == 0) { 112 g_scheduler = scheduler; 113 } else { 114 /* Could not switch to the new scheduler, so keep the old 115 * one. We need to check if it wasn't NULL, and ->init() it again. 116 */ 117 if (g_scheduler) { 118 SPDK_ERRLOG("Could not ->init() '%s' scheduler, reverting to '%s'\n", 119 name, g_scheduler->name); 120 g_scheduler->init(); 121 } else { 122 SPDK_ERRLOG("Could not ->init() '%s' scheduler.\n", name); 123 } 124 } 125 126 return rc; 127 } 128 129 struct spdk_scheduler * 130 spdk_scheduler_get(void) 131 { 132 return g_scheduler; 133 } 134 135 uint64_t 136 spdk_scheduler_get_period(void) 137 { 138 return g_scheduler_period_in_us; 139 } 140 141 void 142 spdk_scheduler_set_period(uint64_t period) 143 { 144 g_scheduler_period_in_us = period; 145 g_scheduler_period_in_tsc = period * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 146 } 147 148 void 149 spdk_scheduler_register(struct spdk_scheduler *scheduler) 150 { 151 if (_scheduler_find(scheduler->name)) { 152 SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name); 153 assert(false); 154 return; 155 } 156 157 TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link); 158 } 159 160 uint32_t 161 spdk_scheduler_get_scheduling_lcore(void) 162 { 163 return g_scheduling_reactor->lcore; 164 } 165 166 bool 167 spdk_scheduler_set_scheduling_lcore(uint32_t core) 168 { 169 struct spdk_reactor *reactor = spdk_reactor_get(core); 170 if (reactor == NULL) { 171 SPDK_ERRLOG("Failed to set scheduling reactor. Reactor(lcore:%d) does not exist", core); 172 return false; 173 } 174 175 g_scheduling_reactor = reactor; 176 return true; 177 } 178 179 bool 180 scheduler_set_isolated_core_mask(struct spdk_cpuset isolated_core_mask) 181 { 182 struct spdk_cpuset tmp_mask; 183 184 spdk_cpuset_copy(&tmp_mask, spdk_app_get_core_mask()); 185 spdk_cpuset_or(&tmp_mask, &isolated_core_mask); 186 if (spdk_cpuset_equal(&tmp_mask, spdk_app_get_core_mask()) == false) { 187 SPDK_ERRLOG("Isolated core mask is not included in app core mask.\n"); 188 return false; 189 } 190 spdk_cpuset_copy(&g_scheduler_isolated_core_mask, &isolated_core_mask); 191 return true; 192 } 193 194 const char * 195 scheduler_get_isolated_core_mask(void) 196 { 197 return spdk_cpuset_fmt(&g_scheduler_isolated_core_mask); 198 } 199 200 static bool 201 scheduler_is_isolated_core(uint32_t core) 202 { 203 return spdk_cpuset_get_cpu(&g_scheduler_isolated_core_mask, core); 204 } 205 206 static void 207 reactor_construct(struct spdk_reactor *reactor, uint32_t lcore) 208 { 209 reactor->lcore = lcore; 210 reactor->flags.is_valid = true; 211 212 TAILQ_INIT(&reactor->threads); 213 reactor->thread_count = 0; 214 spdk_cpuset_zero(&reactor->notify_cpuset); 215 216 reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_NUMA_ID_ANY); 217 if (reactor->events == NULL) { 218 SPDK_ERRLOG("Failed to allocate events ring\n"); 219 assert(false); 220 } 221 222 /* Always initialize interrupt facilities for reactor */ 223 if (reactor_interrupt_init(reactor) != 0) { 224 /* Reactor interrupt facilities are necessary if setting app to interrupt mode. */ 225 if (spdk_interrupt_mode_is_enabled()) { 226 SPDK_ERRLOG("Failed to prepare intr facilities\n"); 227 assert(false); 228 } 229 return; 230 } 231 232 /* If application runs with full interrupt ability, 233 * all reactors are going to run in interrupt mode. 234 */ 235 if (spdk_interrupt_mode_is_enabled()) { 236 uint32_t i; 237 238 SPDK_ENV_FOREACH_CORE(i) { 239 spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true); 240 } 241 reactor->in_interrupt = true; 242 } 243 } 244 245 struct spdk_reactor * 246 spdk_reactor_get(uint32_t lcore) 247 { 248 struct spdk_reactor *reactor; 249 250 if (g_reactors == NULL) { 251 SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n"); 252 return NULL; 253 } 254 255 if (lcore >= g_reactor_count) { 256 return NULL; 257 } 258 259 reactor = &g_reactors[lcore]; 260 261 if (reactor->flags.is_valid == false) { 262 return NULL; 263 } 264 265 return reactor; 266 } 267 268 static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op); 269 static bool reactor_thread_op_supported(enum spdk_thread_op op); 270 271 /* Power of 2 minus 1 is optimal for memory consumption */ 272 #define EVENT_MSG_MEMPOOL_SHIFT 14 /* 2^14 = 16384 */ 273 #define EVENT_MSG_MEMPOOL_SIZE ((1 << EVENT_MSG_MEMPOOL_SHIFT) - 1) 274 275 int 276 spdk_reactors_init(size_t msg_mempool_size) 277 { 278 struct spdk_reactor *reactor; 279 int rc; 280 uint32_t i, current_core; 281 char mempool_name[32]; 282 283 snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid()); 284 g_spdk_event_mempool = spdk_mempool_create(mempool_name, 285 EVENT_MSG_MEMPOOL_SIZE, 286 sizeof(struct spdk_event), 287 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 288 SPDK_ENV_NUMA_ID_ANY); 289 290 if (g_spdk_event_mempool == NULL) { 291 SPDK_ERRLOG("spdk_event_mempool creation failed\n"); 292 return -1; 293 } 294 295 /* struct spdk_reactor must be aligned on 64 byte boundary */ 296 g_reactor_count = spdk_env_get_last_core() + 1; 297 rc = posix_memalign((void **)&g_reactors, 64, 298 g_reactor_count * sizeof(struct spdk_reactor)); 299 if (rc != 0) { 300 SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n", 301 g_reactor_count); 302 spdk_mempool_free(g_spdk_event_mempool); 303 return -1; 304 } 305 306 g_core_infos = calloc(g_reactor_count, sizeof(*g_core_infos)); 307 if (g_core_infos == NULL) { 308 SPDK_ERRLOG("Could not allocate memory for g_core_infos\n"); 309 spdk_mempool_free(g_spdk_event_mempool); 310 free(g_reactors); 311 return -ENOMEM; 312 } 313 314 memset(g_reactors, 0, (g_reactor_count) * sizeof(struct spdk_reactor)); 315 316 rc = spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported, 317 sizeof(struct spdk_lw_thread), msg_mempool_size); 318 if (rc != 0) { 319 SPDK_ERRLOG("Initialize spdk thread lib failed\n"); 320 spdk_mempool_free(g_spdk_event_mempool); 321 free(g_reactors); 322 free(g_core_infos); 323 return rc; 324 } 325 326 SPDK_ENV_FOREACH_CORE(i) { 327 reactor_construct(&g_reactors[i], i); 328 } 329 330 current_core = spdk_env_get_current_core(); 331 reactor = spdk_reactor_get(current_core); 332 assert(reactor != NULL); 333 g_scheduling_reactor = reactor; 334 335 g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED; 336 337 return 0; 338 } 339 340 void 341 spdk_reactors_fini(void) 342 { 343 uint32_t i; 344 struct spdk_reactor *reactor; 345 346 if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) { 347 return; 348 } 349 350 spdk_thread_lib_fini(); 351 352 SPDK_ENV_FOREACH_CORE(i) { 353 reactor = spdk_reactor_get(i); 354 assert(reactor != NULL); 355 assert(reactor->thread_count == 0); 356 if (reactor->events != NULL) { 357 spdk_ring_free(reactor->events); 358 } 359 360 reactor_interrupt_fini(reactor); 361 362 if (g_core_infos != NULL) { 363 free(g_core_infos[i].thread_infos); 364 } 365 } 366 367 spdk_mempool_free(g_spdk_event_mempool); 368 369 free(g_reactors); 370 g_reactors = NULL; 371 free(g_core_infos); 372 g_core_infos = NULL; 373 } 374 375 static void _reactor_set_interrupt_mode(void *arg1, void *arg2); 376 377 static void 378 _reactor_set_notify_cpuset(void *arg1, void *arg2) 379 { 380 struct spdk_reactor *target = arg1; 381 struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core()); 382 383 assert(reactor != NULL); 384 spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt); 385 } 386 387 static void 388 _event_call(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2) 389 { 390 struct spdk_event *ev; 391 392 ev = spdk_event_allocate(lcore, fn, arg1, arg2); 393 assert(ev); 394 spdk_event_call(ev); 395 } 396 397 static void 398 _reactor_set_notify_cpuset_cpl(void *arg1, void *arg2) 399 { 400 struct spdk_reactor *target = arg1; 401 402 if (target->new_in_interrupt == false) { 403 target->set_interrupt_mode_in_progress = false; 404 _event_call(spdk_scheduler_get_scheduling_lcore(), target->set_interrupt_mode_cb_fn, 405 target->set_interrupt_mode_cb_arg, NULL); 406 } else { 407 _event_call(target->lcore, _reactor_set_interrupt_mode, target, NULL); 408 } 409 } 410 411 static void 412 _reactor_set_thread_interrupt_mode(void *ctx) 413 { 414 struct spdk_reactor *reactor = ctx; 415 416 spdk_thread_set_interrupt_mode(reactor->in_interrupt); 417 } 418 419 static void 420 _reactor_set_interrupt_mode(void *arg1, void *arg2) 421 { 422 struct spdk_reactor *target = arg1; 423 struct spdk_thread *thread; 424 struct spdk_fd_group *grp; 425 struct spdk_lw_thread *lw_thread, *tmp; 426 427 assert(target == spdk_reactor_get(spdk_env_get_current_core())); 428 assert(target != NULL); 429 assert(target->in_interrupt != target->new_in_interrupt); 430 SPDK_DEBUGLOG(reactor, "Do reactor set on core %u from %s to state %s\n", 431 target->lcore, target->in_interrupt ? "intr" : "poll", target->new_in_interrupt ? "intr" : "poll"); 432 433 target->in_interrupt = target->new_in_interrupt; 434 435 if (spdk_interrupt_mode_is_enabled()) { 436 /* Align spdk_thread with reactor to interrupt mode or poll mode */ 437 TAILQ_FOREACH_SAFE(lw_thread, &target->threads, link, tmp) { 438 thread = spdk_thread_get_from_ctx(lw_thread); 439 if (target->in_interrupt) { 440 grp = spdk_thread_get_interrupt_fd_group(thread); 441 spdk_fd_group_nest(target->fgrp, grp); 442 } else { 443 grp = spdk_thread_get_interrupt_fd_group(thread); 444 spdk_fd_group_unnest(target->fgrp, grp); 445 } 446 447 spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, target); 448 } 449 } 450 451 if (target->new_in_interrupt == false) { 452 /* Reactor is no longer in interrupt mode. Refresh the tsc_last to accurately 453 * track reactor stats. */ 454 target->tsc_last = spdk_get_ticks(); 455 spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl); 456 } else { 457 uint64_t notify = 1; 458 int rc = 0; 459 460 /* Always trigger spdk_event and resched event in case of race condition */ 461 rc = write(target->events_fd, ¬ify, sizeof(notify)); 462 if (rc < 0) { 463 SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno)); 464 } 465 rc = write(target->resched_fd, ¬ify, sizeof(notify)); 466 if (rc < 0) { 467 SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno)); 468 } 469 470 target->set_interrupt_mode_in_progress = false; 471 _event_call(spdk_scheduler_get_scheduling_lcore(), target->set_interrupt_mode_cb_fn, 472 target->set_interrupt_mode_cb_arg, NULL); 473 } 474 } 475 476 int 477 spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt, 478 spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg) 479 { 480 struct spdk_reactor *target; 481 482 target = spdk_reactor_get(lcore); 483 if (target == NULL) { 484 return -EINVAL; 485 } 486 487 /* Eventfd has to be supported in order to use interrupt functionality. */ 488 if (target->fgrp == NULL) { 489 return -ENOTSUP; 490 } 491 492 if (spdk_env_get_current_core() != g_scheduling_reactor->lcore) { 493 SPDK_ERRLOG("It is only permitted within scheduling reactor.\n"); 494 return -EPERM; 495 } 496 497 if (target->in_interrupt == new_in_interrupt) { 498 cb_fn(cb_arg, NULL); 499 return 0; 500 } 501 502 if (target->set_interrupt_mode_in_progress) { 503 SPDK_NOTICELOG("Reactor(%u) is already in progress to set interrupt mode\n", lcore); 504 return -EBUSY; 505 } 506 target->set_interrupt_mode_in_progress = true; 507 508 target->new_in_interrupt = new_in_interrupt; 509 target->set_interrupt_mode_cb_fn = cb_fn; 510 target->set_interrupt_mode_cb_arg = cb_arg; 511 512 SPDK_DEBUGLOG(reactor, "Starting reactor event from %d to %d\n", 513 spdk_env_get_current_core(), lcore); 514 515 if (new_in_interrupt == false) { 516 /* For potential race cases, when setting the reactor to poll mode, 517 * first change the mode of the reactor and then clear the corresponding 518 * bit of the notify_cpuset of each reactor. 519 */ 520 _event_call(lcore, _reactor_set_interrupt_mode, target, NULL); 521 } else { 522 /* For race cases, when setting the reactor to interrupt mode, first set the 523 * corresponding bit of the notify_cpuset of each reactor and then change the mode. 524 */ 525 spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl); 526 } 527 528 return 0; 529 } 530 531 struct spdk_event * 532 spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2) 533 { 534 struct spdk_event *event = NULL; 535 struct spdk_reactor *reactor = spdk_reactor_get(lcore); 536 537 if (!reactor) { 538 assert(false); 539 return NULL; 540 } 541 542 event = spdk_mempool_get(g_spdk_event_mempool); 543 if (event == NULL) { 544 assert(false); 545 return NULL; 546 } 547 548 event->lcore = lcore; 549 event->fn = fn; 550 event->arg1 = arg1; 551 event->arg2 = arg2; 552 553 return event; 554 } 555 556 void 557 spdk_event_call(struct spdk_event *event) 558 { 559 int rc; 560 struct spdk_reactor *reactor; 561 struct spdk_reactor *local_reactor = NULL; 562 uint32_t current_core = spdk_env_get_current_core(); 563 564 reactor = spdk_reactor_get(event->lcore); 565 566 assert(reactor != NULL); 567 assert(reactor->events != NULL); 568 569 rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL); 570 if (rc != 1) { 571 assert(false); 572 } 573 574 if (current_core != SPDK_ENV_LCORE_ID_ANY) { 575 local_reactor = spdk_reactor_get(current_core); 576 } 577 578 /* If spdk_event_call isn't called on a reactor, always send a notification. 579 * If it is called on a reactor, send a notification if the destination reactor 580 * is indicated in interrupt mode state. 581 */ 582 if (spdk_unlikely(local_reactor == NULL) || 583 spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) { 584 uint64_t notify = 1; 585 586 rc = write(reactor->events_fd, ¬ify, sizeof(notify)); 587 if (rc < 0) { 588 SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno)); 589 } 590 } 591 } 592 593 static inline int 594 event_queue_run_batch(void *arg) 595 { 596 struct spdk_reactor *reactor = arg; 597 size_t count, i; 598 void *events[SPDK_EVENT_BATCH_SIZE]; 599 600 #ifdef DEBUG 601 /* 602 * spdk_ring_dequeue() fills events and returns how many entries it wrote, 603 * so we will never actually read uninitialized data from events, but just to be sure 604 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 605 */ 606 memset(events, 0, sizeof(events)); 607 #endif 608 609 /* Operate event notification if this reactor currently runs in interrupt state */ 610 if (spdk_unlikely(reactor->in_interrupt)) { 611 uint64_t notify = 1; 612 int rc; 613 614 count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE); 615 616 if (spdk_ring_count(reactor->events) != 0) { 617 /* Trigger new notification if there are still events in event-queue waiting for processing. */ 618 rc = write(reactor->events_fd, ¬ify, sizeof(notify)); 619 if (rc < 0) { 620 SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno)); 621 return -errno; 622 } 623 } 624 } else { 625 count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE); 626 } 627 628 if (count == 0) { 629 return 0; 630 } 631 632 for (i = 0; i < count; i++) { 633 struct spdk_event *event = events[i]; 634 635 assert(event != NULL); 636 assert(spdk_get_thread() == NULL); 637 SPDK_DTRACE_PROBE3(event_exec, event->fn, 638 event->arg1, event->arg2); 639 event->fn(event->arg1, event->arg2); 640 } 641 642 spdk_mempool_put_bulk(g_spdk_event_mempool, events, count); 643 644 return (int)count; 645 } 646 647 /* 1s */ 648 #define CONTEXT_SWITCH_MONITOR_PERIOD 1000000 649 650 static int 651 get_rusage(struct spdk_reactor *reactor) 652 { 653 struct rusage rusage; 654 655 if (getrusage(RUSAGE_THREAD, &rusage) != 0) { 656 return -1; 657 } 658 659 if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) { 660 SPDK_INFOLOG(reactor, 661 "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n", 662 reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw, 663 rusage.ru_nivcsw - reactor->rusage.ru_nivcsw); 664 } 665 reactor->rusage = rusage; 666 667 return -1; 668 } 669 670 void 671 spdk_framework_enable_context_switch_monitor(bool enable) 672 { 673 /* This global is being read by multiple threads, so this isn't 674 * strictly thread safe. However, we're toggling between true and 675 * false here, and if a thread sees the value update later than it 676 * should, it's no big deal. */ 677 g_framework_context_switch_monitor_enabled = enable; 678 } 679 680 bool 681 spdk_framework_context_switch_monitor_enabled(void) 682 { 683 return g_framework_context_switch_monitor_enabled; 684 } 685 686 static void 687 _set_thread_name(const char *thread_name) 688 { 689 #if defined(__linux__) 690 prctl(PR_SET_NAME, thread_name, 0, 0, 0); 691 #elif defined(__FreeBSD__) 692 pthread_set_name_np(pthread_self(), thread_name); 693 #else 694 pthread_setname_np(pthread_self(), thread_name); 695 #endif 696 } 697 698 static void 699 _init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread) 700 { 701 struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread); 702 struct spdk_thread_stats prev_total_stats; 703 704 /* Read total_stats before updating it to calculate stats during the last scheduling period. */ 705 prev_total_stats = lw_thread->total_stats; 706 707 spdk_set_thread(thread); 708 spdk_thread_get_stats(&lw_thread->total_stats); 709 spdk_set_thread(NULL); 710 711 lw_thread->current_stats.busy_tsc = lw_thread->total_stats.busy_tsc - prev_total_stats.busy_tsc; 712 lw_thread->current_stats.idle_tsc = lw_thread->total_stats.idle_tsc - prev_total_stats.idle_tsc; 713 } 714 715 static void 716 _threads_reschedule_thread(struct spdk_scheduler_thread_info *thread_info) 717 { 718 struct spdk_lw_thread *lw_thread; 719 struct spdk_thread *thread; 720 721 thread = spdk_thread_get_by_id(thread_info->thread_id); 722 if (thread == NULL) { 723 /* Thread no longer exists. */ 724 return; 725 } 726 lw_thread = spdk_thread_get_ctx(thread); 727 assert(lw_thread != NULL); 728 729 lw_thread->lcore = thread_info->lcore; 730 lw_thread->resched = true; 731 } 732 733 static void 734 _threads_reschedule(struct spdk_scheduler_core_info *cores_info) 735 { 736 struct spdk_scheduler_core_info *core; 737 struct spdk_scheduler_thread_info *thread_info; 738 uint32_t i, j; 739 740 SPDK_ENV_FOREACH_CORE(i) { 741 core = &cores_info[i]; 742 for (j = 0; j < core->threads_count; j++) { 743 thread_info = &core->thread_infos[j]; 744 if (thread_info->lcore != i) { 745 if (core->isolated || cores_info[thread_info->lcore].isolated) { 746 SPDK_ERRLOG("A thread cannot be moved from an isolated core or \ 747 moved to an isolated core. Skip rescheduling thread\n"); 748 continue; 749 } 750 _threads_reschedule_thread(thread_info); 751 } 752 } 753 core->threads_count = 0; 754 free(core->thread_infos); 755 core->thread_infos = NULL; 756 } 757 } 758 759 static void 760 _reactors_scheduler_fini(void) 761 { 762 /* Reschedule based on the balancing output */ 763 _threads_reschedule(g_core_infos); 764 765 g_scheduling_in_progress = false; 766 } 767 768 static void 769 _reactors_scheduler_update_core_mode(void *ctx1, void *ctx2) 770 { 771 struct spdk_reactor *reactor; 772 uint32_t i; 773 int rc = 0; 774 775 for (i = g_scheduler_core_number; i < SPDK_ENV_LCORE_ID_ANY; i = spdk_env_get_next_core(i)) { 776 reactor = spdk_reactor_get(i); 777 assert(reactor != NULL); 778 if (reactor->in_interrupt != g_core_infos[i].interrupt_mode) { 779 /* Switch next found reactor to new state */ 780 rc = spdk_reactor_set_interrupt_mode(i, g_core_infos[i].interrupt_mode, 781 _reactors_scheduler_update_core_mode, NULL); 782 if (rc == 0) { 783 /* Set core to start with after callback completes */ 784 g_scheduler_core_number = spdk_env_get_next_core(i); 785 return; 786 } 787 } 788 } 789 _reactors_scheduler_fini(); 790 } 791 792 static void 793 _reactors_scheduler_cancel(void *arg1, void *arg2) 794 { 795 struct spdk_scheduler_core_info *core; 796 uint32_t i; 797 798 SPDK_ENV_FOREACH_CORE(i) { 799 core = &g_core_infos[i]; 800 core->threads_count = 0; 801 free(core->thread_infos); 802 core->thread_infos = NULL; 803 } 804 805 g_scheduling_in_progress = false; 806 } 807 808 static void 809 _reactors_scheduler_balance(void *arg1, void *arg2) 810 { 811 struct spdk_scheduler *scheduler = spdk_scheduler_get(); 812 813 if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING || scheduler == NULL) { 814 _reactors_scheduler_cancel(NULL, NULL); 815 return; 816 } 817 818 scheduler->balance(g_core_infos, g_reactor_count); 819 820 g_scheduler_core_number = spdk_env_get_first_core(); 821 _reactors_scheduler_update_core_mode(NULL, NULL); 822 } 823 824 /* Phase 1 of thread scheduling is to gather metrics on the existing threads */ 825 static void 826 _reactors_scheduler_gather_metrics(void *arg1, void *arg2) 827 { 828 struct spdk_scheduler_core_info *core_info; 829 struct spdk_lw_thread *lw_thread; 830 struct spdk_thread *thread; 831 struct spdk_reactor *reactor; 832 uint32_t next_core; 833 uint32_t i = 0; 834 835 reactor = spdk_reactor_get(spdk_env_get_current_core()); 836 assert(reactor != NULL); 837 core_info = &g_core_infos[reactor->lcore]; 838 core_info->lcore = reactor->lcore; 839 core_info->current_idle_tsc = reactor->idle_tsc - core_info->total_idle_tsc; 840 core_info->total_idle_tsc = reactor->idle_tsc; 841 core_info->current_busy_tsc = reactor->busy_tsc - core_info->total_busy_tsc; 842 core_info->total_busy_tsc = reactor->busy_tsc; 843 core_info->interrupt_mode = reactor->in_interrupt; 844 core_info->threads_count = 0; 845 core_info->isolated = scheduler_is_isolated_core(reactor->lcore); 846 847 SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore); 848 849 spdk_trace_record(TRACE_SCHEDULER_CORE_STATS, reactor->trace_id, 0, 0, 850 core_info->current_busy_tsc, 851 core_info->current_idle_tsc); 852 853 if (reactor->thread_count > 0) { 854 core_info->thread_infos = calloc(reactor->thread_count, sizeof(*core_info->thread_infos)); 855 if (core_info->thread_infos == NULL) { 856 SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore); 857 858 /* Cancel this round of schedule work */ 859 _event_call(spdk_scheduler_get_scheduling_lcore(), _reactors_scheduler_cancel, NULL, NULL); 860 return; 861 } 862 863 TAILQ_FOREACH(lw_thread, &reactor->threads, link) { 864 _init_thread_stats(reactor, lw_thread); 865 866 core_info->thread_infos[i].lcore = lw_thread->lcore; 867 thread = spdk_thread_get_from_ctx(lw_thread); 868 assert(thread != NULL); 869 core_info->thread_infos[i].thread_id = spdk_thread_get_id(thread); 870 core_info->thread_infos[i].total_stats = lw_thread->total_stats; 871 core_info->thread_infos[i].current_stats = lw_thread->current_stats; 872 core_info->threads_count++; 873 assert(core_info->threads_count <= reactor->thread_count); 874 875 spdk_trace_record(TRACE_SCHEDULER_THREAD_STATS, spdk_thread_get_trace_id(thread), 0, 0, 876 lw_thread->current_stats.busy_tsc, 877 lw_thread->current_stats.idle_tsc); 878 879 i++; 880 } 881 } 882 883 next_core = spdk_env_get_next_core(reactor->lcore); 884 if (next_core == UINT32_MAX) { 885 next_core = spdk_env_get_first_core(); 886 } 887 888 /* If we've looped back around to the scheduler thread, move to the next phase */ 889 if (next_core == spdk_scheduler_get_scheduling_lcore()) { 890 /* Phase 2 of scheduling is rebalancing - deciding which threads to move where */ 891 _event_call(next_core, _reactors_scheduler_balance, NULL, NULL); 892 return; 893 } 894 895 _event_call(next_core, _reactors_scheduler_gather_metrics, NULL, NULL); 896 } 897 898 static int _reactor_schedule_thread(struct spdk_thread *thread); 899 static uint64_t g_rusage_period; 900 901 static void 902 _reactor_remove_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread) 903 { 904 struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread); 905 struct spdk_fd_group *grp; 906 907 TAILQ_REMOVE(&reactor->threads, lw_thread, link); 908 assert(reactor->thread_count > 0); 909 reactor->thread_count--; 910 911 /* Operate thread intr if running with full interrupt ability */ 912 if (spdk_interrupt_mode_is_enabled()) { 913 if (reactor->in_interrupt) { 914 grp = spdk_thread_get_interrupt_fd_group(thread); 915 spdk_fd_group_unnest(reactor->fgrp, grp); 916 } 917 } 918 } 919 920 static bool 921 reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread) 922 { 923 struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread); 924 925 if (spdk_unlikely(spdk_thread_is_exited(thread) && 926 spdk_thread_is_idle(thread))) { 927 _reactor_remove_lw_thread(reactor, lw_thread); 928 spdk_thread_destroy(thread); 929 return true; 930 } 931 932 if (spdk_unlikely(lw_thread->resched && !spdk_thread_is_bound(thread))) { 933 lw_thread->resched = false; 934 _reactor_remove_lw_thread(reactor, lw_thread); 935 _reactor_schedule_thread(thread); 936 return true; 937 } 938 939 return false; 940 } 941 942 static void 943 reactor_interrupt_run(struct spdk_reactor *reactor) 944 { 945 int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */ 946 947 spdk_fd_group_wait(reactor->fgrp, block_timeout); 948 } 949 950 static void 951 _reactor_run(struct spdk_reactor *reactor) 952 { 953 struct spdk_thread *thread; 954 struct spdk_lw_thread *lw_thread, *tmp; 955 uint64_t now; 956 int rc; 957 958 event_queue_run_batch(reactor); 959 960 /* If no threads are present on the reactor, 961 * tsc_last gets outdated. Update it to track 962 * thread execution time correctly. */ 963 if (spdk_unlikely(TAILQ_EMPTY(&reactor->threads))) { 964 now = spdk_get_ticks(); 965 reactor->idle_tsc += now - reactor->tsc_last; 966 reactor->tsc_last = now; 967 return; 968 } 969 970 TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { 971 thread = spdk_thread_get_from_ctx(lw_thread); 972 rc = spdk_thread_poll(thread, 0, reactor->tsc_last); 973 974 now = spdk_thread_get_last_tsc(thread); 975 if (rc == 0) { 976 reactor->idle_tsc += now - reactor->tsc_last; 977 } else if (rc > 0) { 978 reactor->busy_tsc += now - reactor->tsc_last; 979 } 980 reactor->tsc_last = now; 981 982 reactor_post_process_lw_thread(reactor, lw_thread); 983 } 984 } 985 986 static int 987 reactor_run(void *arg) 988 { 989 struct spdk_reactor *reactor = arg; 990 struct spdk_thread *thread; 991 struct spdk_lw_thread *lw_thread, *tmp; 992 char thread_name[32]; 993 uint64_t last_sched = 0; 994 995 SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore); 996 997 /* Rename the POSIX thread because the reactor is tied to the POSIX 998 * thread in the SPDK event library. 999 */ 1000 snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore); 1001 _set_thread_name(thread_name); 1002 1003 reactor->trace_id = spdk_trace_register_owner(OWNER_TYPE_REACTOR, thread_name); 1004 1005 reactor->tsc_last = spdk_get_ticks(); 1006 1007 while (1) { 1008 /* Execute interrupt process fn if this reactor currently runs in interrupt state */ 1009 if (spdk_unlikely(reactor->in_interrupt)) { 1010 reactor_interrupt_run(reactor); 1011 } else { 1012 _reactor_run(reactor); 1013 } 1014 1015 if (g_framework_context_switch_monitor_enabled) { 1016 if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) { 1017 get_rusage(reactor); 1018 reactor->last_rusage = reactor->tsc_last; 1019 } 1020 } 1021 1022 if (spdk_unlikely(g_scheduler_period_in_tsc > 0 && 1023 (reactor->tsc_last - last_sched) > g_scheduler_period_in_tsc && 1024 reactor == g_scheduling_reactor && 1025 !g_scheduling_in_progress)) { 1026 last_sched = reactor->tsc_last; 1027 g_scheduling_in_progress = true; 1028 spdk_trace_record(TRACE_SCHEDULER_PERIOD_START, 0, 0, 0); 1029 _reactors_scheduler_gather_metrics(NULL, NULL); 1030 } 1031 1032 if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) { 1033 break; 1034 } 1035 } 1036 1037 TAILQ_FOREACH(lw_thread, &reactor->threads, link) { 1038 thread = spdk_thread_get_from_ctx(lw_thread); 1039 /* All threads should have already had spdk_thread_exit() called on them, except 1040 * for the app thread. 1041 */ 1042 if (spdk_thread_is_running(thread)) { 1043 if (!spdk_thread_is_app_thread(thread)) { 1044 SPDK_ERRLOG("spdk_thread_exit() was not called on thread '%s'\n", 1045 spdk_thread_get_name(thread)); 1046 SPDK_ERRLOG("This will result in a non-zero exit code in a future release.\n"); 1047 } 1048 spdk_set_thread(thread); 1049 spdk_thread_exit(thread); 1050 } 1051 } 1052 1053 while (!TAILQ_EMPTY(&reactor->threads)) { 1054 TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { 1055 thread = spdk_thread_get_from_ctx(lw_thread); 1056 spdk_set_thread(thread); 1057 if (spdk_thread_is_exited(thread)) { 1058 _reactor_remove_lw_thread(reactor, lw_thread); 1059 spdk_thread_destroy(thread); 1060 } else { 1061 if (spdk_unlikely(reactor->in_interrupt)) { 1062 reactor_interrupt_run(reactor); 1063 } else { 1064 spdk_thread_poll(thread, 0, 0); 1065 } 1066 } 1067 } 1068 } 1069 1070 return 0; 1071 } 1072 1073 int 1074 spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask) 1075 { 1076 int ret; 1077 const struct spdk_cpuset *validmask; 1078 1079 ret = spdk_cpuset_parse(cpumask, mask); 1080 if (ret < 0) { 1081 return ret; 1082 } 1083 1084 validmask = spdk_app_get_core_mask(); 1085 spdk_cpuset_and(cpumask, validmask); 1086 1087 return 0; 1088 } 1089 1090 const struct spdk_cpuset * 1091 spdk_app_get_core_mask(void) 1092 { 1093 return &g_reactor_core_mask; 1094 } 1095 1096 void 1097 spdk_reactors_start(void) 1098 { 1099 struct spdk_reactor *reactor; 1100 uint32_t i, current_core; 1101 int rc; 1102 1103 g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC; 1104 g_reactor_state = SPDK_REACTOR_STATE_RUNNING; 1105 /* Reinitialize to false, in case the app framework is restarting in the same process. */ 1106 g_stopping_reactors = false; 1107 1108 current_core = spdk_env_get_current_core(); 1109 SPDK_ENV_FOREACH_CORE(i) { 1110 if (i != current_core) { 1111 reactor = spdk_reactor_get(i); 1112 if (reactor == NULL) { 1113 continue; 1114 } 1115 1116 rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor); 1117 if (rc < 0) { 1118 SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore); 1119 assert(false); 1120 return; 1121 } 1122 } 1123 spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true); 1124 } 1125 1126 /* Start the main reactor */ 1127 reactor = spdk_reactor_get(current_core); 1128 assert(reactor != NULL); 1129 reactor_run(reactor); 1130 1131 spdk_env_thread_wait_all(); 1132 1133 g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN; 1134 } 1135 1136 static void 1137 _reactors_stop(void *arg1, void *arg2) 1138 { 1139 uint32_t i; 1140 int rc; 1141 struct spdk_reactor *reactor; 1142 struct spdk_reactor *local_reactor; 1143 uint64_t notify = 1; 1144 1145 g_reactor_state = SPDK_REACTOR_STATE_EXITING; 1146 local_reactor = spdk_reactor_get(spdk_env_get_current_core()); 1147 1148 SPDK_ENV_FOREACH_CORE(i) { 1149 /* If spdk_event_call isn't called on a reactor, always send a notification. 1150 * If it is called on a reactor, send a notification if the destination reactor 1151 * is indicated in interrupt mode state. 1152 */ 1153 if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) { 1154 reactor = spdk_reactor_get(i); 1155 assert(reactor != NULL); 1156 rc = write(reactor->events_fd, ¬ify, sizeof(notify)); 1157 if (rc < 0) { 1158 SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno)); 1159 continue; 1160 } 1161 } 1162 } 1163 } 1164 1165 static void 1166 nop(void *arg1, void *arg2) 1167 { 1168 } 1169 1170 void 1171 spdk_reactors_stop(void *arg1) 1172 { 1173 spdk_for_each_reactor(nop, NULL, NULL, _reactors_stop); 1174 } 1175 1176 static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER; 1177 static uint32_t g_next_core = UINT32_MAX; 1178 1179 static void 1180 _schedule_thread(void *arg1, void *arg2) 1181 { 1182 struct spdk_lw_thread *lw_thread = arg1; 1183 struct spdk_thread *thread; 1184 struct spdk_reactor *reactor; 1185 uint32_t current_core; 1186 struct spdk_fd_group *grp; 1187 1188 current_core = spdk_env_get_current_core(); 1189 reactor = spdk_reactor_get(current_core); 1190 assert(reactor != NULL); 1191 1192 /* Update total_stats to reflect state of thread 1193 * at the end of the move. */ 1194 thread = spdk_thread_get_from_ctx(lw_thread); 1195 spdk_set_thread(thread); 1196 spdk_thread_get_stats(&lw_thread->total_stats); 1197 spdk_set_thread(NULL); 1198 1199 if (lw_thread->initial_lcore == SPDK_ENV_LCORE_ID_ANY) { 1200 lw_thread->initial_lcore = current_core; 1201 } 1202 lw_thread->lcore = current_core; 1203 1204 TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link); 1205 reactor->thread_count++; 1206 1207 /* Operate thread intr if running with full interrupt ability */ 1208 if (spdk_interrupt_mode_is_enabled()) { 1209 int rc; 1210 1211 if (reactor->in_interrupt) { 1212 grp = spdk_thread_get_interrupt_fd_group(thread); 1213 rc = spdk_fd_group_nest(reactor->fgrp, grp); 1214 if (rc < 0) { 1215 SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc)); 1216 } 1217 } 1218 1219 /* Align spdk_thread with reactor to interrupt mode or poll mode */ 1220 spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, reactor); 1221 } 1222 } 1223 1224 static int 1225 _reactor_schedule_thread(struct spdk_thread *thread) 1226 { 1227 uint32_t core, initial_core; 1228 struct spdk_lw_thread *lw_thread; 1229 struct spdk_event *evt = NULL; 1230 struct spdk_cpuset *cpumask; 1231 uint32_t i; 1232 struct spdk_reactor *local_reactor = NULL; 1233 uint32_t current_lcore = spdk_env_get_current_core(); 1234 struct spdk_cpuset polling_cpumask; 1235 struct spdk_cpuset valid_cpumask; 1236 1237 cpumask = spdk_thread_get_cpumask(thread); 1238 1239 lw_thread = spdk_thread_get_ctx(thread); 1240 assert(lw_thread != NULL); 1241 core = lw_thread->lcore; 1242 initial_core = lw_thread->initial_lcore; 1243 memset(lw_thread, 0, sizeof(*lw_thread)); 1244 lw_thread->initial_lcore = initial_core; 1245 1246 if (current_lcore != SPDK_ENV_LCORE_ID_ANY) { 1247 local_reactor = spdk_reactor_get(current_lcore); 1248 assert(local_reactor); 1249 } 1250 1251 /* When interrupt ability of spdk_thread is not enabled and the current 1252 * reactor runs on DPDK thread, skip reactors which are in interrupt mode. 1253 */ 1254 if (!spdk_interrupt_mode_is_enabled() && local_reactor != NULL) { 1255 /* Get the cpumask of all reactors in polling */ 1256 spdk_cpuset_zero(&polling_cpumask); 1257 SPDK_ENV_FOREACH_CORE(i) { 1258 spdk_cpuset_set_cpu(&polling_cpumask, i, true); 1259 } 1260 spdk_cpuset_xor(&polling_cpumask, &local_reactor->notify_cpuset); 1261 1262 if (core == SPDK_ENV_LCORE_ID_ANY) { 1263 /* Get the cpumask of all valid reactors which are suggested and also in polling */ 1264 spdk_cpuset_copy(&valid_cpumask, &polling_cpumask); 1265 spdk_cpuset_and(&valid_cpumask, spdk_thread_get_cpumask(thread)); 1266 1267 /* If there are any valid reactors, spdk_thread should be scheduled 1268 * into one of the valid reactors. 1269 * If there is no valid reactors, spdk_thread should be scheduled 1270 * into one of the polling reactors. 1271 */ 1272 if (spdk_cpuset_count(&valid_cpumask) != 0) { 1273 cpumask = &valid_cpumask; 1274 } else { 1275 cpumask = &polling_cpumask; 1276 } 1277 } else if (!spdk_cpuset_get_cpu(&polling_cpumask, core)) { 1278 /* If specified reactor is not in polling, spdk_thread should be scheduled 1279 * into one of the polling reactors. 1280 */ 1281 core = SPDK_ENV_LCORE_ID_ANY; 1282 cpumask = &polling_cpumask; 1283 } 1284 } 1285 1286 pthread_mutex_lock(&g_scheduler_mtx); 1287 if (core == SPDK_ENV_LCORE_ID_ANY) { 1288 for (i = 0; i < spdk_env_get_core_count(); i++) { 1289 if (g_next_core >= g_reactor_count) { 1290 g_next_core = spdk_env_get_first_core(); 1291 } 1292 core = g_next_core; 1293 g_next_core = spdk_env_get_next_core(g_next_core); 1294 1295 if (spdk_cpuset_get_cpu(cpumask, core)) { 1296 break; 1297 } 1298 } 1299 } 1300 1301 evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL); 1302 1303 if (current_lcore != core) { 1304 spdk_trace_record(TRACE_SCHEDULER_MOVE_THREAD, spdk_thread_get_trace_id(thread), 0, 0, 1305 current_lcore, core); 1306 } 1307 1308 pthread_mutex_unlock(&g_scheduler_mtx); 1309 1310 assert(evt != NULL); 1311 if (evt == NULL) { 1312 SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n"); 1313 return -1; 1314 } 1315 1316 lw_thread->tsc_start = spdk_get_ticks(); 1317 1318 spdk_event_call(evt); 1319 1320 return 0; 1321 } 1322 1323 static void 1324 _reactor_request_thread_reschedule(struct spdk_thread *thread) 1325 { 1326 struct spdk_lw_thread *lw_thread; 1327 struct spdk_reactor *reactor; 1328 uint32_t current_core; 1329 1330 assert(thread == spdk_get_thread()); 1331 1332 lw_thread = spdk_thread_get_ctx(thread); 1333 1334 assert(lw_thread != NULL); 1335 lw_thread->resched = true; 1336 lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY; 1337 1338 current_core = spdk_env_get_current_core(); 1339 reactor = spdk_reactor_get(current_core); 1340 assert(reactor != NULL); 1341 1342 /* Send a notification if the destination reactor is indicated in intr mode state */ 1343 if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) { 1344 uint64_t notify = 1; 1345 1346 if (write(reactor->resched_fd, ¬ify, sizeof(notify)) < 0) { 1347 SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno)); 1348 } 1349 } 1350 } 1351 1352 static int 1353 reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op) 1354 { 1355 struct spdk_lw_thread *lw_thread; 1356 1357 switch (op) { 1358 case SPDK_THREAD_OP_NEW: 1359 lw_thread = spdk_thread_get_ctx(thread); 1360 lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY; 1361 lw_thread->initial_lcore = SPDK_ENV_LCORE_ID_ANY; 1362 return _reactor_schedule_thread(thread); 1363 case SPDK_THREAD_OP_RESCHED: 1364 _reactor_request_thread_reschedule(thread); 1365 return 0; 1366 default: 1367 return -ENOTSUP; 1368 } 1369 } 1370 1371 static bool 1372 reactor_thread_op_supported(enum spdk_thread_op op) 1373 { 1374 switch (op) { 1375 case SPDK_THREAD_OP_NEW: 1376 case SPDK_THREAD_OP_RESCHED: 1377 return true; 1378 default: 1379 return false; 1380 } 1381 } 1382 1383 struct call_reactor { 1384 uint32_t cur_core; 1385 spdk_event_fn fn; 1386 void *arg1; 1387 void *arg2; 1388 1389 uint32_t orig_core; 1390 spdk_event_fn cpl; 1391 }; 1392 1393 static void 1394 on_reactor(void *arg1, void *arg2) 1395 { 1396 struct call_reactor *cr = arg1; 1397 struct spdk_event *evt; 1398 1399 cr->fn(cr->arg1, cr->arg2); 1400 1401 cr->cur_core = spdk_env_get_next_core(cr->cur_core); 1402 1403 if (cr->cur_core >= g_reactor_count) { 1404 SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n"); 1405 1406 evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2); 1407 free(cr); 1408 } else { 1409 SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %d\n", 1410 cr->cur_core); 1411 1412 evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL); 1413 } 1414 assert(evt != NULL); 1415 spdk_event_call(evt); 1416 } 1417 1418 void 1419 spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl) 1420 { 1421 struct call_reactor *cr; 1422 1423 /* When the application framework is shutting down, we will send one 1424 * final for_each_reactor operation with completion callback _reactors_stop, 1425 * to flush any existing for_each_reactor operations to avoid any memory 1426 * leaks. We use a mutex here to protect a boolean flag that will ensure 1427 * we don't start any more operations once we've started shutting down. 1428 */ 1429 pthread_mutex_lock(&g_stopping_reactors_mtx); 1430 if (g_stopping_reactors) { 1431 pthread_mutex_unlock(&g_stopping_reactors_mtx); 1432 return; 1433 } else if (cpl == _reactors_stop) { 1434 g_stopping_reactors = true; 1435 } 1436 pthread_mutex_unlock(&g_stopping_reactors_mtx); 1437 1438 cr = calloc(1, sizeof(*cr)); 1439 if (!cr) { 1440 SPDK_ERRLOG("Unable to perform reactor iteration\n"); 1441 cpl(arg1, arg2); 1442 return; 1443 } 1444 1445 cr->fn = fn; 1446 cr->arg1 = arg1; 1447 cr->arg2 = arg2; 1448 cr->cpl = cpl; 1449 cr->orig_core = spdk_env_get_current_core(); 1450 cr->cur_core = spdk_env_get_first_core(); 1451 1452 SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %d\n", cr->orig_core); 1453 1454 _event_call(cr->cur_core, on_reactor, cr, NULL); 1455 } 1456 1457 #ifdef __linux__ 1458 static int 1459 reactor_schedule_thread_event(void *arg) 1460 { 1461 struct spdk_reactor *reactor = arg; 1462 struct spdk_lw_thread *lw_thread, *tmp; 1463 uint32_t count = 0; 1464 1465 assert(reactor->in_interrupt); 1466 1467 TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { 1468 count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0; 1469 } 1470 1471 return count; 1472 } 1473 1474 static int 1475 reactor_interrupt_init(struct spdk_reactor *reactor) 1476 { 1477 struct spdk_event_handler_opts opts = {}; 1478 int rc; 1479 1480 rc = spdk_fd_group_create(&reactor->fgrp); 1481 if (rc != 0) { 1482 return rc; 1483 } 1484 1485 reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1486 if (reactor->resched_fd < 0) { 1487 rc = -EBADF; 1488 goto err; 1489 } 1490 1491 spdk_fd_group_get_default_event_handler_opts(&opts, sizeof(opts)); 1492 opts.fd_type = SPDK_FD_TYPE_EVENTFD; 1493 1494 rc = SPDK_FD_GROUP_ADD_EXT(reactor->fgrp, reactor->resched_fd, 1495 reactor_schedule_thread_event, reactor, &opts); 1496 if (rc) { 1497 close(reactor->resched_fd); 1498 goto err; 1499 } 1500 1501 reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1502 if (reactor->events_fd < 0) { 1503 spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd); 1504 close(reactor->resched_fd); 1505 1506 rc = -EBADF; 1507 goto err; 1508 } 1509 1510 rc = SPDK_FD_GROUP_ADD_EXT(reactor->fgrp, reactor->events_fd, 1511 event_queue_run_batch, reactor, &opts); 1512 if (rc) { 1513 spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd); 1514 close(reactor->resched_fd); 1515 close(reactor->events_fd); 1516 goto err; 1517 } 1518 1519 return 0; 1520 1521 err: 1522 spdk_fd_group_destroy(reactor->fgrp); 1523 reactor->fgrp = NULL; 1524 return rc; 1525 } 1526 #else 1527 static int 1528 reactor_interrupt_init(struct spdk_reactor *reactor) 1529 { 1530 return -ENOTSUP; 1531 } 1532 #endif 1533 1534 static void 1535 reactor_interrupt_fini(struct spdk_reactor *reactor) 1536 { 1537 struct spdk_fd_group *fgrp = reactor->fgrp; 1538 1539 if (!fgrp) { 1540 return; 1541 } 1542 1543 spdk_fd_group_remove(fgrp, reactor->events_fd); 1544 spdk_fd_group_remove(fgrp, reactor->resched_fd); 1545 1546 close(reactor->events_fd); 1547 close(reactor->resched_fd); 1548 1549 spdk_fd_group_destroy(fgrp); 1550 reactor->fgrp = NULL; 1551 } 1552 1553 static struct spdk_governor * 1554 _governor_find(const char *name) 1555 { 1556 struct spdk_governor *governor, *tmp; 1557 1558 TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) { 1559 if (strcmp(name, governor->name) == 0) { 1560 return governor; 1561 } 1562 } 1563 1564 return NULL; 1565 } 1566 1567 int 1568 spdk_governor_set(const char *name) 1569 { 1570 struct spdk_governor *governor; 1571 int rc = 0; 1572 1573 /* NULL governor was specifically requested */ 1574 if (name == NULL) { 1575 if (g_governor) { 1576 g_governor->deinit(); 1577 } 1578 g_governor = NULL; 1579 return 0; 1580 } 1581 1582 governor = _governor_find(name); 1583 if (governor == NULL) { 1584 return -EINVAL; 1585 } 1586 1587 if (g_governor == governor) { 1588 return 0; 1589 } 1590 1591 rc = governor->init(); 1592 if (rc == 0) { 1593 if (g_governor) { 1594 g_governor->deinit(); 1595 } 1596 g_governor = governor; 1597 } 1598 1599 return rc; 1600 } 1601 1602 struct spdk_governor * 1603 spdk_governor_get(void) 1604 { 1605 return g_governor; 1606 } 1607 1608 void 1609 spdk_governor_register(struct spdk_governor *governor) 1610 { 1611 if (_governor_find(governor->name)) { 1612 SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name); 1613 assert(false); 1614 return; 1615 } 1616 1617 TAILQ_INSERT_TAIL(&g_governor_list, governor, link); 1618 } 1619 1620 SPDK_LOG_REGISTER_COMPONENT(reactor) 1621 1622 static void 1623 scheduler_trace(void) 1624 { 1625 struct spdk_trace_tpoint_opts opts[] = { 1626 { 1627 "SCHEDULER_PERIOD_START", TRACE_SCHEDULER_PERIOD_START, 1628 OWNER_TYPE_NONE, OBJECT_NONE, 0, 1629 { 1630 1631 } 1632 }, 1633 { 1634 "SCHEDULER_CORE_STATS", TRACE_SCHEDULER_CORE_STATS, 1635 OWNER_TYPE_REACTOR, OBJECT_NONE, 0, 1636 { 1637 { "busy", SPDK_TRACE_ARG_TYPE_INT, 8}, 1638 { "idle", SPDK_TRACE_ARG_TYPE_INT, 8} 1639 } 1640 }, 1641 { 1642 "SCHEDULER_THREAD_STATS", TRACE_SCHEDULER_THREAD_STATS, 1643 OWNER_TYPE_THREAD, OBJECT_NONE, 0, 1644 { 1645 { "busy", SPDK_TRACE_ARG_TYPE_INT, 8}, 1646 { "idle", SPDK_TRACE_ARG_TYPE_INT, 8} 1647 } 1648 }, 1649 { 1650 "SCHEDULER_MOVE_THREAD", TRACE_SCHEDULER_MOVE_THREAD, 1651 OWNER_TYPE_THREAD, OBJECT_NONE, 0, 1652 { 1653 { "src", SPDK_TRACE_ARG_TYPE_INT, 8 }, 1654 { "dst", SPDK_TRACE_ARG_TYPE_INT, 8 } 1655 } 1656 } 1657 }; 1658 1659 spdk_trace_register_owner_type(OWNER_TYPE_REACTOR, 'r'); 1660 spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts)); 1661 1662 } 1663 1664 SPDK_TRACE_REGISTER_FN(scheduler_trace, "scheduler", TRACE_GROUP_SCHEDULER) 1665