/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2016 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/likely.h"

#include "event_internal.h"

#include "spdk_internal/event.h"
#include "spdk_internal/usdt.h"

#include "spdk/log.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/scheduler.h"
#include "spdk/string.h"
#include "spdk/fd_group.h"
#include "spdk/trace.h"
#include "spdk_internal/trace_defs.h"

#ifdef __linux__
#include <sys/prctl.h>
#include <sys/eventfd.h>
#endif

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

#define SPDK_EVENT_BATCH_SIZE 8

static struct spdk_reactor *g_reactors;
static uint32_t g_reactor_count;
static struct spdk_cpuset g_reactor_core_mask;
static enum spdk_reactor_state g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;

static bool g_framework_context_switch_monitor_enabled = true;

static struct spdk_mempool *g_spdk_event_mempool = NULL;

TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);

static struct spdk_scheduler *g_scheduler = NULL;
static struct spdk_reactor *g_scheduling_reactor;
bool g_scheduling_in_progress = false;
static uint64_t g_scheduler_period = 0;
static uint32_t g_scheduler_core_number;
static struct spdk_scheduler_core_info *g_core_infos = NULL;
static struct spdk_cpuset g_scheduler_isolated_core_mask;

TAILQ_HEAD(, spdk_governor) g_governor_list
	= TAILQ_HEAD_INITIALIZER(g_governor_list);

static struct spdk_governor *g_governor = NULL;

static int reactor_interrupt_init(struct spdk_reactor *reactor);
static void reactor_interrupt_fini(struct spdk_reactor *reactor);

static pthread_mutex_t g_stopping_reactors_mtx = PTHREAD_MUTEX_INITIALIZER;
static bool g_stopping_reactors = false;

static struct spdk_scheduler *
_scheduler_find(const char *name)
{
	struct spdk_scheduler *tmp;

	TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
		if (strcmp(name, tmp->name) == 0) {
			return tmp;
		}
	}

	return NULL;
}

int
spdk_scheduler_set(const char *name)
{
	struct spdk_scheduler *scheduler;
	int rc = 0;

	/* NULL scheduler was specifically requested */
	if (name == NULL) {
		if (g_scheduler) {
			g_scheduler->deinit();
		}
		g_scheduler = NULL;
		return 0;
	}

	scheduler = _scheduler_find(name);
	if (scheduler == NULL) {
		SPDK_ERRLOG("Requested scheduler is missing\n");
		return -EINVAL;
	}

	if (g_scheduler == scheduler) {
		return 0;
	}

	if (g_scheduler) {
		g_scheduler->deinit();
	}

	rc = scheduler->init();
	if (rc == 0) {
		g_scheduler = scheduler;
	} else {
		/* Could not switch to the new scheduler, so keep the old
		 * one. We need to check if it wasn't NULL, and ->init() it again.
		 */
		if (g_scheduler) {
			SPDK_ERRLOG("Could not ->init() '%s' scheduler, reverting to '%s'\n",
				    name, g_scheduler->name);
			g_scheduler->init();
		} else {
			SPDK_ERRLOG("Could not ->init() '%s' scheduler.\n", name);
		}
	}

	return rc;
}

struct spdk_scheduler *
spdk_scheduler_get(void)
{
	return g_scheduler;
}

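/* The scheduling period is stored internally in TSC ticks but exposed in
 * microseconds. A period of 0 disables periodic scheduling altogether:
 * reactor_run() only starts a scheduling round when g_scheduler_period > 0.
 */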
uint64_t
spdk_scheduler_get_period(void)
{
	/* Convert from ticks to microseconds */
	return (g_scheduler_period * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
}

void
spdk_scheduler_set_period(uint64_t period)
{
	/* Convert microseconds to ticks */
	g_scheduler_period = period * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
}

void
spdk_scheduler_register(struct spdk_scheduler *scheduler)
{
	if (_scheduler_find(scheduler->name)) {
		SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
}

uint32_t
spdk_scheduler_get_scheduling_lcore(void)
{
	return g_scheduling_reactor->lcore;
}

bool
spdk_scheduler_set_scheduling_lcore(uint32_t core)
{
	struct spdk_reactor *reactor = spdk_reactor_get(core);

	if (reactor == NULL) {
		SPDK_ERRLOG("Failed to set scheduling reactor. Reactor(lcore:%u) does not exist\n", core);
		return false;
	}

	g_scheduling_reactor = reactor;
	return true;
}

bool
scheduler_set_isolated_core_mask(struct spdk_cpuset isolated_core_mask)
{
	struct spdk_cpuset tmp_mask;

	spdk_cpuset_copy(&tmp_mask, spdk_app_get_core_mask());
	spdk_cpuset_or(&tmp_mask, &isolated_core_mask);
	if (spdk_cpuset_equal(&tmp_mask, spdk_app_get_core_mask()) == false) {
		SPDK_ERRLOG("Isolated core mask is not included in app core mask.\n");
		return false;
	}
	spdk_cpuset_copy(&g_scheduler_isolated_core_mask, &isolated_core_mask);
	return true;
}

const char *
scheduler_get_isolated_core_mask(void)
{
	return spdk_cpuset_fmt(&g_scheduler_isolated_core_mask);
}

static bool
scheduler_is_isolated_core(uint32_t core)
{
	return spdk_cpuset_get_cpu(&g_scheduler_isolated_core_mask, core);
}

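/* Set up a reactor's per-core state: its thread list, its event ring and,
 * where supported, its interrupt facilities. The ring is multi-producer
 * (any core may target this reactor via spdk_event_call()) and
 * single-consumer (only the owning reactor dequeues from it).
 */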
static void
reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
{
	reactor->lcore = lcore;
	reactor->flags.is_valid = true;

	TAILQ_INIT(&reactor->threads);
	reactor->thread_count = 0;
	spdk_cpuset_zero(&reactor->notify_cpuset);

	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_NUMA_ID_ANY);
	if (reactor->events == NULL) {
		SPDK_ERRLOG("Failed to allocate events ring\n");
		assert(false);
	}

	/* Always initialize interrupt facilities for reactor */
	if (reactor_interrupt_init(reactor) != 0) {
		/* Reactor interrupt facilities are necessary if setting app to interrupt mode. */
		if (spdk_interrupt_mode_is_enabled()) {
			SPDK_ERRLOG("Failed to prepare intr facilities\n");
			assert(false);
		}
		return;
	}

	/* If application runs with full interrupt ability,
	 * all reactors are going to run in interrupt mode.
	 */
	if (spdk_interrupt_mode_is_enabled()) {
		uint32_t i;

		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true);
		}
		reactor->in_interrupt = true;
	}
}

struct spdk_reactor *
spdk_reactor_get(uint32_t lcore)
{
	struct spdk_reactor *reactor;

	if (g_reactors == NULL) {
		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
		return NULL;
	}

	if (lcore >= g_reactor_count) {
		return NULL;
	}

	reactor = &g_reactors[lcore];

	if (reactor->flags.is_valid == false) {
		return NULL;
	}

	return reactor;
}

static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
static bool reactor_thread_op_supported(enum spdk_thread_op op);

/* Power of 2 minus 1 is optimal for memory consumption */
#define EVENT_MSG_MEMPOOL_SHIFT 14 /* 2^14 = 16384 */
#define EVENT_MSG_MEMPOOL_SIZE ((1 << EVENT_MSG_MEMPOOL_SHIFT) - 1)

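/* A size of 2^n - 1 is deliberate: with the default DPDK-backed env, a
 * mempool's backing ring is rounded up to a power-of-two entry count, so
 * requesting one less than a power of two avoids allocating a ring twice
 * as large as necessary.
 */
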
int
spdk_reactors_init(size_t msg_mempool_size)
{
	struct spdk_reactor *reactor;
	int rc;
	uint32_t i, current_core;
	char mempool_name[32];

	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
			       EVENT_MSG_MEMPOOL_SIZE,
			       sizeof(struct spdk_event),
			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
			       SPDK_ENV_NUMA_ID_ANY);

	if (g_spdk_event_mempool == NULL) {
		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
		return -1;
	}

	/* struct spdk_reactor must be aligned on 64 byte boundary */
	g_reactor_count = spdk_env_get_last_core() + 1;
	rc = posix_memalign((void **)&g_reactors, 64,
			    g_reactor_count * sizeof(struct spdk_reactor));
	if (rc != 0) {
		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
			    g_reactor_count);
		spdk_mempool_free(g_spdk_event_mempool);
		return -1;
	}

	g_core_infos = calloc(g_reactor_count, sizeof(*g_core_infos));
	if (g_core_infos == NULL) {
		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		return -ENOMEM;
	}

	memset(g_reactors, 0, (g_reactor_count) * sizeof(struct spdk_reactor));

	rc = spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
				      sizeof(struct spdk_lw_thread), msg_mempool_size);
	if (rc != 0) {
		SPDK_ERRLOG("Initialize spdk thread lib failed\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		free(g_core_infos);
		return rc;
	}

	SPDK_ENV_FOREACH_CORE(i) {
		reactor_construct(&g_reactors[i], i);
	}

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	g_scheduling_reactor = reactor;

	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;

	return 0;
}

void
spdk_reactors_fini(void)
{
	uint32_t i;
	struct spdk_reactor *reactor;

	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
		return;
	}

	spdk_thread_lib_fini();

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		assert(reactor->thread_count == 0);
		if (reactor->events != NULL) {
			spdk_ring_free(reactor->events);
		}

		reactor_interrupt_fini(reactor);

		if (g_core_infos != NULL) {
			free(g_core_infos[i].thread_infos);
		}
	}

	spdk_mempool_free(g_spdk_event_mempool);

	free(g_reactors);
	g_reactors = NULL;
	free(g_core_infos);
	g_core_infos = NULL;
}

static void _reactor_set_interrupt_mode(void *arg1, void *arg2);

static void
_reactor_set_notify_cpuset(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;
	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());

	assert(reactor != NULL);
	spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt);
}

static void
_event_call(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
{
	struct spdk_event *ev;

	ev = spdk_event_allocate(lcore, fn, arg1, arg2);
	assert(ev);
	spdk_event_call(ev);
}

static void
_reactor_set_notify_cpuset_cpl(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;

	if (target->new_in_interrupt == false) {
		target->set_interrupt_mode_in_progress = false;
		_event_call(spdk_scheduler_get_scheduling_lcore(), target->set_interrupt_mode_cb_fn,
			    target->set_interrupt_mode_cb_arg, NULL);
	} else {
		_event_call(target->lcore, _reactor_set_interrupt_mode, target, NULL);
	}
}

static void
_reactor_set_thread_interrupt_mode(void *ctx)
{
	struct spdk_reactor *reactor = ctx;

	spdk_thread_set_interrupt_mode(reactor->in_interrupt);
}

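/* Runs on the target reactor's own core. The switch protocol is driven by
 * spdk_reactor_set_interrupt_mode(): when entering interrupt mode, the
 * notify_cpuset bit for this core is set on every reactor first and the mode
 * is flipped second; when leaving interrupt mode, the order is reversed.
 * Either way there is no window in which a needed notification is skipped.
 */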
static void
_reactor_set_interrupt_mode(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;
	struct spdk_thread *thread;
	struct spdk_fd_group *grp;
	struct spdk_lw_thread *lw_thread, *tmp;

	assert(target == spdk_reactor_get(spdk_env_get_current_core()));
	assert(target != NULL);
	assert(target->in_interrupt != target->new_in_interrupt);
	SPDK_DEBUGLOG(reactor, "Switching reactor on core %u from %s to %s\n",
		      target->lcore, target->in_interrupt ? "intr" : "poll",
		      target->new_in_interrupt ? "intr" : "poll");

	target->in_interrupt = target->new_in_interrupt;

	if (spdk_interrupt_mode_is_enabled()) {
		/* Align spdk_thread with reactor to interrupt mode or poll mode */
		TAILQ_FOREACH_SAFE(lw_thread, &target->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			grp = spdk_thread_get_interrupt_fd_group(thread);
			if (target->in_interrupt) {
				spdk_fd_group_nest(target->fgrp, grp);
			} else {
				spdk_fd_group_unnest(target->fgrp, grp);
			}

			spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, target);
		}
	}

	if (target->new_in_interrupt == false) {
		/* Reactor is no longer in interrupt mode. Refresh the tsc_last to accurately
		 * track reactor stats.
		 */
		target->tsc_last = spdk_get_ticks();
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	} else {
		uint64_t notify = 1;
		int rc = 0;

		/* Always trigger spdk_event and resched event in case of race condition */
		rc = write(target->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
		rc = write(target->resched_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}

		target->set_interrupt_mode_in_progress = false;
		_event_call(spdk_scheduler_get_scheduling_lcore(), target->set_interrupt_mode_cb_fn,
			    target->set_interrupt_mode_cb_arg, NULL);
	}
}

int
spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt,
				spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg)
{
	struct spdk_reactor *target;

	target = spdk_reactor_get(lcore);
	if (target == NULL) {
		return -EINVAL;
	}

	/* Eventfd has to be supported in order to use interrupt functionality. */
	if (target->fgrp == NULL) {
		return -ENOTSUP;
	}

	if (spdk_env_get_current_core() != g_scheduling_reactor->lcore) {
		SPDK_ERRLOG("Interrupt mode can only be set from the scheduling reactor.\n");
		return -EPERM;
	}

	if (target->in_interrupt == new_in_interrupt) {
		cb_fn(cb_arg, NULL);
		return 0;
	}

	if (target->set_interrupt_mode_in_progress) {
		SPDK_NOTICELOG("Reactor(%u) already has an interrupt mode change in progress\n", lcore);
		return -EBUSY;
	}
	target->set_interrupt_mode_in_progress = true;

	target->new_in_interrupt = new_in_interrupt;
	target->set_interrupt_mode_cb_fn = cb_fn;
	target->set_interrupt_mode_cb_arg = cb_arg;

	SPDK_DEBUGLOG(reactor, "Starting reactor event from %d to %d\n",
		      spdk_env_get_current_core(), lcore);

	if (new_in_interrupt == false) {
		/* For potential race cases, when setting the reactor to poll mode,
		 * first change the mode of the reactor and then clear the corresponding
		 * bit of the notify_cpuset of each reactor.
		 */
		_event_call(lcore, _reactor_set_interrupt_mode, target, NULL);
	} else {
		/* For race cases, when setting the reactor to interrupt mode, first set the
		 * corresponding bit of the notify_cpuset of each reactor and then change the mode.
		 */
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	}

	return 0;
}

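/* Events are the cross-core messaging primitive of the event framework:
 * allocated from the global mempool, enqueued on the target reactor's ring,
 * and followed by an events_fd write whenever the target may be sleeping in
 * interrupt mode (or the caller is not itself a reactor).
 */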
struct spdk_event *
spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
{
	struct spdk_event *event = NULL;
	struct spdk_reactor *reactor = spdk_reactor_get(lcore);

	if (!reactor) {
		assert(false);
		return NULL;
	}

	event = spdk_mempool_get(g_spdk_event_mempool);
	if (event == NULL) {
		assert(false);
		return NULL;
	}

	event->lcore = lcore;
	event->fn = fn;
	event->arg1 = arg1;
	event->arg2 = arg2;

	return event;
}

void
spdk_event_call(struct spdk_event *event)
{
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_core = spdk_env_get_current_core();

	reactor = spdk_reactor_get(event->lcore);

	assert(reactor != NULL);
	assert(reactor->events != NULL);

	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
	if (rc != 1) {
		assert(false);
	}

	if (current_core != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_core);
	}

	/* If spdk_event_call() isn't being called on a reactor, always send a
	 * notification. If it is called on a reactor, send a notification only
	 * if the destination reactor is marked as being in interrupt mode in
	 * the local reactor's notify_cpuset.
	 */
	if (spdk_unlikely(local_reactor == NULL) ||
	    spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) {
		uint64_t notify = 1;

		rc = write(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
	}
}

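/* Drain up to SPDK_EVENT_BATCH_SIZE events from this reactor's ring. In
 * interrupt mode the eventfd is acknowledged before the ring is drained;
 * combined with producers notifying after enqueueing, this ordering means a
 * notification may be duplicated but never lost.
 */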
static inline int
event_queue_run_batch(void *arg)
{
	struct spdk_reactor *reactor = arg;
	size_t count, i;
	void *events[SPDK_EVENT_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(events, 0, sizeof(events));
#endif

	/* Service the event notification fd if this reactor currently runs in interrupt mode */
	if (spdk_unlikely(reactor->in_interrupt)) {
		uint64_t notify = 1;
		int rc;

		/* There may be a race between this event_acknowledge and another
		 * producer's event_notify, so acknowledge first and check for a
		 * self-notification afterwards. This avoids missing an event
		 * notification.
		 */
		rc = read(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno));
			return -errno;
		}

		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);

		if (spdk_ring_count(reactor->events) != 0) {
			/* Trigger a new notification if there are still events in the
			 * queue waiting for processing.
			 */
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
				return -errno;
			}
		}
	} else {
		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
	}

	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_event *event = events[i];

		assert(event != NULL);
		assert(spdk_get_thread() == NULL);
		SPDK_DTRACE_PROBE3(event_exec, event->fn,
				   event->arg1, event->arg2);
		event->fn(event->arg1, event->arg2);
	}

	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);

	return (int)count;
}

/* 1s */
#define CONTEXT_SWITCH_MONITOR_PERIOD 1000000

static int
get_rusage(struct spdk_reactor *reactor)
{
	struct rusage rusage;

	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
		return -1;
	}

	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
		SPDK_INFOLOG(reactor,
			     "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
	}
	reactor->rusage = rusage;

	return -1;
}

void
spdk_framework_enable_context_switch_monitor(bool enable)
{
	/* This global is being read by multiple threads, so this isn't
	 * strictly thread safe. However, we're toggling between true and
	 * false here, and if a thread sees the value update later than it
	 * should, it's no big deal. */
	g_framework_context_switch_monitor_enabled = enable;
}

bool
spdk_framework_context_switch_monitor_enabled(void)
{
	return g_framework_context_switch_monitor_enabled;
}

static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
	pthread_setname_np(pthread_self(), thread_name);
#endif
}

static void
_init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
	struct spdk_thread_stats prev_total_stats;

	/* Read total_stats before updating it to calculate stats during the last scheduling period. */
	prev_total_stats = lw_thread->total_stats;

	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->total_stats);
	spdk_set_thread(NULL);

	lw_thread->current_stats.busy_tsc = lw_thread->total_stats.busy_tsc - prev_total_stats.busy_tsc;
	lw_thread->current_stats.idle_tsc = lw_thread->total_stats.idle_tsc - prev_total_stats.idle_tsc;
}

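/* Rescheduling is cooperative: the scheduler only records the destination
 * lcore and sets lw_thread->resched here. The owning reactor notices the
 * flag in reactor_post_process_lw_thread() and performs the actual move.
 */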
static void
_threads_reschedule_thread(struct spdk_scheduler_thread_info *thread_info)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread *thread;

	thread = spdk_thread_get_by_id(thread_info->thread_id);
	if (thread == NULL) {
		/* Thread no longer exists. */
		return;
	}
	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);

	lw_thread->lcore = thread_info->lcore;
	lw_thread->resched = true;
}

static void
_threads_reschedule(struct spdk_scheduler_core_info *cores_info)
{
	struct spdk_scheduler_core_info *core;
	struct spdk_scheduler_thread_info *thread_info;
	uint32_t i, j;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &cores_info[i];
		for (j = 0; j < core->threads_count; j++) {
			thread_info = &core->thread_infos[j];
			if (thread_info->lcore != i) {
				if (core->isolated || cores_info[thread_info->lcore].isolated) {
					SPDK_ERRLOG("A thread cannot be moved from or to an isolated "
						    "core. Skip rescheduling thread\n");
					continue;
				}
				_threads_reschedule_thread(thread_info);
			}
		}
		core->threads_count = 0;
		free(core->thread_infos);
		core->thread_infos = NULL;
	}
}

static void
_reactors_scheduler_fini(void)
{
	/* Reschedule based on the balancing output */
	_threads_reschedule(g_core_infos);

	g_scheduling_in_progress = false;
}

static void
_reactors_scheduler_update_core_mode(void *ctx1, void *ctx2)
{
	struct spdk_reactor *reactor;
	uint32_t i;
	int rc = 0;

	for (i = g_scheduler_core_number; i < SPDK_ENV_LCORE_ID_ANY; i = spdk_env_get_next_core(i)) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		if (reactor->in_interrupt != g_core_infos[i].interrupt_mode) {
			/* Switch next found reactor to new state */
			rc = spdk_reactor_set_interrupt_mode(i, g_core_infos[i].interrupt_mode,
							     _reactors_scheduler_update_core_mode, NULL);
			if (rc == 0) {
				/* Set core to start with after callback completes */
				g_scheduler_core_number = spdk_env_get_next_core(i);
				return;
			}
		}
	}
	_reactors_scheduler_fini();
}

static void
_reactors_scheduler_cancel(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core;
	uint32_t i;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &g_core_infos[i];
		core->threads_count = 0;
		free(core->thread_infos);
		core->thread_infos = NULL;
	}

	g_scheduling_in_progress = false;
}

static void
_reactors_scheduler_balance(void *arg1, void *arg2)
{
	struct spdk_scheduler *scheduler = spdk_scheduler_get();

	if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING || scheduler == NULL) {
		_reactors_scheduler_cancel(NULL, NULL);
		return;
	}

	scheduler->balance(g_core_infos, g_reactor_count);

	g_scheduler_core_number = spdk_env_get_first_core();
	_reactors_scheduler_update_core_mode(NULL, NULL);
}

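/* A scheduling round runs in phases: gather per-core and per-thread busy/idle
 * metrics (below), hand them to the active scheduler's balance() callback,
 * apply any interrupt/poll mode changes core by core in
 * _reactors_scheduler_update_core_mode(), and finally move threads to their
 * newly assigned lcores in _threads_reschedule().
 */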
/* Phase 1 of thread scheduling is to gather metrics on the existing threads */
static void
_reactors_scheduler_gather_metrics(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core_info;
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread *thread;
	struct spdk_reactor *reactor;
	uint32_t next_core;
	uint32_t i = 0;

	reactor = spdk_reactor_get(spdk_env_get_current_core());
	assert(reactor != NULL);
	core_info = &g_core_infos[reactor->lcore];
	core_info->lcore = reactor->lcore;
	core_info->current_idle_tsc = reactor->idle_tsc - core_info->total_idle_tsc;
	core_info->total_idle_tsc = reactor->idle_tsc;
	core_info->current_busy_tsc = reactor->busy_tsc - core_info->total_busy_tsc;
	core_info->total_busy_tsc = reactor->busy_tsc;
	core_info->interrupt_mode = reactor->in_interrupt;
	core_info->threads_count = 0;
	core_info->isolated = scheduler_is_isolated_core(reactor->lcore);

	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);

	spdk_trace_record(TRACE_SCHEDULER_CORE_STATS, reactor->trace_id, 0, 0,
			  core_info->current_busy_tsc,
			  core_info->current_idle_tsc);

	if (reactor->thread_count > 0) {
		core_info->thread_infos = calloc(reactor->thread_count, sizeof(*core_info->thread_infos));
		if (core_info->thread_infos == NULL) {
			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);

			/* Cancel this round of schedule work */
			_event_call(spdk_scheduler_get_scheduling_lcore(), _reactors_scheduler_cancel, NULL, NULL);
			return;
		}

		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
			_init_thread_stats(reactor, lw_thread);

			core_info->thread_infos[i].lcore = lw_thread->lcore;
			thread = spdk_thread_get_from_ctx(lw_thread);
			assert(thread != NULL);
			core_info->thread_infos[i].thread_id = spdk_thread_get_id(thread);
			core_info->thread_infos[i].total_stats = lw_thread->total_stats;
			core_info->thread_infos[i].current_stats = lw_thread->current_stats;
			core_info->threads_count++;
			assert(core_info->threads_count <= reactor->thread_count);

			spdk_trace_record(TRACE_SCHEDULER_THREAD_STATS, spdk_thread_get_trace_id(thread), 0, 0,
					  lw_thread->current_stats.busy_tsc,
					  lw_thread->current_stats.idle_tsc);

			i++;
		}
	}

	next_core = spdk_env_get_next_core(reactor->lcore);
	if (next_core == UINT32_MAX) {
		next_core = spdk_env_get_first_core();
	}

	/* If we've looped back around to the scheduler thread, move to the next phase */
	if (next_core == spdk_scheduler_get_scheduling_lcore()) {
		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
		_event_call(next_core, _reactors_scheduler_balance, NULL, NULL);
		return;
	}

	_event_call(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
}

static int _reactor_schedule_thread(struct spdk_thread *thread);
static uint64_t g_rusage_period;

static void
_reactor_remove_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
	struct spdk_fd_group *grp;

	TAILQ_REMOVE(&reactor->threads, lw_thread, link);
	assert(reactor->thread_count > 0);
	reactor->thread_count--;

	/* Operate thread intr if running with full interrupt ability */
	if (spdk_interrupt_mode_is_enabled()) {
		if (reactor->in_interrupt) {
			grp = spdk_thread_get_interrupt_fd_group(thread);
			spdk_fd_group_unnest(reactor->fgrp, grp);
		}
	}
}

static bool
reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);

	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
			  spdk_thread_is_idle(thread))) {
		_reactor_remove_lw_thread(reactor, lw_thread);
		spdk_thread_destroy(thread);
		return true;
	}

	if (spdk_unlikely(lw_thread->resched && !spdk_thread_is_bound(thread))) {
		lw_thread->resched = false;
		_reactor_remove_lw_thread(reactor, lw_thread);
		_reactor_schedule_thread(thread);
		return true;
	}

	return false;
}

static void
reactor_interrupt_run(struct spdk_reactor *reactor)
{
	int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */

	spdk_fd_group_wait(reactor->fgrp, block_timeout);
}

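/* One pass of the poll-mode loop: drain a batch of events, then poll every
 * thread on this reactor, charging the TSC elapsed in each poll to busy_tsc
 * or idle_tsc depending on whether the thread found work.
 */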
static void
_reactor_run(struct spdk_reactor *reactor)
{
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread, *tmp;
	uint64_t now;
	int rc;

	event_queue_run_batch(reactor);

	/* If no threads are present on the reactor,
	 * tsc_last gets outdated. Update it to track
	 * thread execution time correctly. */
	if (spdk_unlikely(TAILQ_EMPTY(&reactor->threads))) {
		now = spdk_get_ticks();
		reactor->idle_tsc += now - reactor->tsc_last;
		reactor->tsc_last = now;
		return;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);

		now = spdk_thread_get_last_tsc(thread);
		if (rc == 0) {
			reactor->idle_tsc += now - reactor->tsc_last;
		} else if (rc > 0) {
			reactor->busy_tsc += now - reactor->tsc_last;
		}
		reactor->tsc_last = now;

		reactor_post_process_lw_thread(reactor, lw_thread);
	}
}

static int
reactor_run(void *arg)
{
	struct spdk_reactor *reactor = arg;
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread, *tmp;
	char thread_name[32];
	uint64_t last_sched = 0;

	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);

	/* Rename the POSIX thread because the reactor is tied to the POSIX
	 * thread in the SPDK event library.
	 */
	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
	_set_thread_name(thread_name);

	reactor->trace_id = spdk_trace_register_owner(OWNER_TYPE_REACTOR, thread_name);

	reactor->tsc_last = spdk_get_ticks();

	while (1) {
		/* Execute interrupt process fn if this reactor currently runs in interrupt state */
		if (spdk_unlikely(reactor->in_interrupt)) {
			reactor_interrupt_run(reactor);
		} else {
			_reactor_run(reactor);
		}

		if (g_framework_context_switch_monitor_enabled) {
			if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
				get_rusage(reactor);
				reactor->last_rusage = reactor->tsc_last;
			}
		}

		if (spdk_unlikely(g_scheduler_period > 0 &&
				  (reactor->tsc_last - last_sched) > g_scheduler_period &&
				  reactor == g_scheduling_reactor &&
				  !g_scheduling_in_progress)) {
			last_sched = reactor->tsc_last;
			g_scheduling_in_progress = true;
			spdk_trace_record(TRACE_SCHEDULER_PERIOD_START, 0, 0, 0);
			_reactors_scheduler_gather_metrics(NULL, NULL);
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}
	}

	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		/* All threads should have already had spdk_thread_exit() called on them, except
		 * for the app thread.
		 */
		if (spdk_thread_is_running(thread)) {
			if (!spdk_thread_is_app_thread(thread)) {
				SPDK_ERRLOG("spdk_thread_exit() was not called on thread '%s'\n",
					    spdk_thread_get_name(thread));
				SPDK_ERRLOG("This will result in a non-zero exit code in a future release.\n");
			}
			spdk_set_thread(thread);
			spdk_thread_exit(thread);
		}
	}

	while (!TAILQ_EMPTY(&reactor->threads)) {
		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			spdk_set_thread(thread);
			if (spdk_thread_is_exited(thread)) {
				_reactor_remove_lw_thread(reactor, lw_thread);
				spdk_thread_destroy(thread);
			} else {
				if (spdk_unlikely(reactor->in_interrupt)) {
					reactor_interrupt_run(reactor);
				} else {
					spdk_thread_poll(thread, 0, 0);
				}
			}
		}
	}

	return 0;
}

int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int ret;
	const struct spdk_cpuset *validmask;

	ret = spdk_cpuset_parse(cpumask, mask);
	if (ret < 0) {
		return ret;
	}

	validmask = spdk_app_get_core_mask();
	spdk_cpuset_and(cpumask, validmask);

	return 0;
}

const struct spdk_cpuset *
spdk_app_get_core_mask(void)
{
	return &g_reactor_core_mask;
}

void
spdk_reactors_start(void)
{
	struct spdk_reactor *reactor;
	uint32_t i, current_core;
	int rc;

	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
	/* Reinitialize to false, in case the app framework is restarting in the same process. */
	g_stopping_reactors = false;

	current_core = spdk_env_get_current_core();
	SPDK_ENV_FOREACH_CORE(i) {
		if (i != current_core) {
			reactor = spdk_reactor_get(i);
			if (reactor == NULL) {
				continue;
			}

			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
			if (rc < 0) {
				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
				assert(false);
				return;
			}
		}
		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
	}

	/* Start the main reactor */
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	reactor_run(reactor);

	spdk_env_thread_wait_all();

	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
}

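/* Completion callback of the final spdk_for_each_reactor() pass issued by
 * spdk_reactors_stop(): flip the state to EXITING and wake any reactor that
 * may be sleeping in interrupt mode so its main loop can observe the change.
 */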
static void
_reactors_stop(void *arg1, void *arg2)
{
	uint32_t i;
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor;
	uint64_t notify = 1;

	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
	local_reactor = spdk_reactor_get(spdk_env_get_current_core());

	SPDK_ENV_FOREACH_CORE(i) {
		/* If this isn't running on a reactor, always send a notification.
		 * If it is running on a reactor, send a notification only if the
		 * destination reactor is marked as being in interrupt mode.
		 */
		if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) {
			reactor = spdk_reactor_get(i);
			assert(reactor != NULL);
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno));
				continue;
			}
		}
	}
}

static void
nop(void *arg1, void *arg2)
{
}

void
spdk_reactors_stop(void *arg1)
{
	spdk_for_each_reactor(nop, NULL, NULL, _reactors_stop);
}

static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
static uint32_t g_next_core = UINT32_MAX;

static void
_schedule_thread(void *arg1, void *arg2)
{
	struct spdk_lw_thread *lw_thread = arg1;
	struct spdk_thread *thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;
	struct spdk_fd_group *grp;

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	/* Update total_stats to reflect state of thread
	 * at the end of the move. */
	thread = spdk_thread_get_from_ctx(lw_thread);
	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->total_stats);
	spdk_set_thread(NULL);

	lw_thread->lcore = current_core;

	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
	reactor->thread_count++;

	/* Operate thread intr if running with full interrupt ability */
	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		if (reactor->in_interrupt) {
			grp = spdk_thread_get_interrupt_fd_group(thread);
			rc = spdk_fd_group_nest(reactor->fgrp, grp);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
			}
		}

		/* Align spdk_thread with reactor to interrupt mode or poll mode */
		spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, reactor);
	}
}

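/* Pick a destination core for a thread. Selection is round-robin across all
 * cores (g_next_core, under g_scheduler_mtx), constrained to the thread's
 * cpumask; when the app itself is not in interrupt mode, reactors currently
 * sleeping in interrupt mode are skipped in favor of polling ones.
 */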
static int
_reactor_schedule_thread(struct spdk_thread *thread)
{
	uint32_t core;
	struct spdk_lw_thread *lw_thread;
	struct spdk_event *evt = NULL;
	struct spdk_cpuset *cpumask;
	uint32_t i;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_lcore = spdk_env_get_current_core();
	struct spdk_cpuset polling_cpumask;
	struct spdk_cpuset valid_cpumask;

	cpumask = spdk_thread_get_cpumask(thread);

	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);
	core = lw_thread->lcore;
	memset(lw_thread, 0, sizeof(*lw_thread));

	if (current_lcore != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_lcore);
		assert(local_reactor);
	}

	/* When interrupt ability of spdk_thread is not enabled and the current
	 * reactor runs on a DPDK thread, skip reactors which are in interrupt mode.
	 */
	if (!spdk_interrupt_mode_is_enabled() && local_reactor != NULL) {
		/* Get the cpumask of all reactors in polling */
		spdk_cpuset_zero(&polling_cpumask);
		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&polling_cpumask, i, true);
		}
		spdk_cpuset_xor(&polling_cpumask, &local_reactor->notify_cpuset);

		if (core == SPDK_ENV_LCORE_ID_ANY) {
			/* Get the cpumask of all valid reactors which are suggested and also in polling */
			spdk_cpuset_copy(&valid_cpumask, &polling_cpumask);
			spdk_cpuset_and(&valid_cpumask, spdk_thread_get_cpumask(thread));

			/* If there are any valid reactors, spdk_thread should be scheduled
			 * onto one of them. If there are no valid reactors, spdk_thread
			 * should be scheduled onto one of the polling reactors.
			 */
			if (spdk_cpuset_count(&valid_cpumask) != 0) {
				cpumask = &valid_cpumask;
			} else {
				cpumask = &polling_cpumask;
			}
		} else if (!spdk_cpuset_get_cpu(&polling_cpumask, core)) {
			/* If the specified reactor is not polling, spdk_thread should be
			 * scheduled onto one of the polling reactors.
			 */
			core = SPDK_ENV_LCORE_ID_ANY;
			cpumask = &polling_cpumask;
		}
	}

	pthread_mutex_lock(&g_scheduler_mtx);
	if (core == SPDK_ENV_LCORE_ID_ANY) {
		for (i = 0; i < spdk_env_get_core_count(); i++) {
			if (g_next_core >= g_reactor_count) {
				g_next_core = spdk_env_get_first_core();
			}
			core = g_next_core;
			g_next_core = spdk_env_get_next_core(g_next_core);

			if (spdk_cpuset_get_cpu(cpumask, core)) {
				break;
			}
		}
	}

	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);

	if (current_lcore != core) {
		spdk_trace_record(TRACE_SCHEDULER_MOVE_THREAD, spdk_thread_get_trace_id(thread), 0, 0,
				  current_lcore, core);
	}

	pthread_mutex_unlock(&g_scheduler_mtx);

	assert(evt != NULL);
	if (evt == NULL) {
		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
		return -1;
	}

	lw_thread->tsc_start = spdk_get_ticks();

	spdk_event_call(evt);

	return 0;
}

static void
_reactor_request_thread_reschedule(struct spdk_thread *thread)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;

	assert(thread == spdk_get_thread());

	lw_thread = spdk_thread_get_ctx(thread);

	assert(lw_thread != NULL);
	lw_thread->resched = true;
	lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	/* Send a notification if the destination reactor is indicated in intr mode state */
	if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) {
		uint64_t notify = 1;

		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}
	}
}

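/* Hooks registered with spdk_thread_lib_init_ext(): the thread library calls
 * reactor_thread_op() when a new spdk_thread is created (to place it on a
 * reactor) or when an existing thread asks to be rescheduled.
 */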
static int
reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
{
	struct spdk_lw_thread *lw_thread;

	switch (op) {
	case SPDK_THREAD_OP_NEW:
		lw_thread = spdk_thread_get_ctx(thread);
		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
		return _reactor_schedule_thread(thread);
	case SPDK_THREAD_OP_RESCHED:
		_reactor_request_thread_reschedule(thread);
		return 0;
	default:
		return -ENOTSUP;
	}
}

static bool
reactor_thread_op_supported(enum spdk_thread_op op)
{
	switch (op) {
	case SPDK_THREAD_OP_NEW:
	case SPDK_THREAD_OP_RESCHED:
		return true;
	default:
		return false;
	}
}

struct call_reactor {
	uint32_t cur_core;
	spdk_event_fn fn;
	void *arg1;
	void *arg2;

	uint32_t orig_core;
	spdk_event_fn cpl;
};

static void
on_reactor(void *arg1, void *arg2)
{
	struct call_reactor *cr = arg1;
	struct spdk_event *evt;

	cr->fn(cr->arg1, cr->arg2);

	cr->cur_core = spdk_env_get_next_core(cr->cur_core);

	if (cr->cur_core >= g_reactor_count) {
		SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");

		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
		free(cr);
	} else {
		SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %d\n",
			      cr->cur_core);

		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
	}
	assert(evt != NULL);
	spdk_event_call(evt);
}

void
spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
{
	struct call_reactor *cr;

	/* When the application framework is shutting down, we will send one
	 * final for_each_reactor operation with completion callback _reactors_stop,
	 * to flush any existing for_each_reactor operations to avoid any memory
	 * leaks. We use a mutex here to protect a boolean flag that will ensure
	 * we don't start any more operations once we've started shutting down.
	 */
	pthread_mutex_lock(&g_stopping_reactors_mtx);
	if (g_stopping_reactors) {
		pthread_mutex_unlock(&g_stopping_reactors_mtx);
		return;
	} else if (cpl == _reactors_stop) {
		g_stopping_reactors = true;
	}
	pthread_mutex_unlock(&g_stopping_reactors_mtx);

	cr = calloc(1, sizeof(*cr));
	if (!cr) {
		SPDK_ERRLOG("Unable to perform reactor iteration\n");
		cpl(arg1, arg2);
		return;
	}

	cr->fn = fn;
	cr->arg1 = arg1;
	cr->arg2 = arg2;
	cr->cpl = cpl;
	cr->orig_core = spdk_env_get_current_core();
	cr->cur_core = spdk_env_get_first_core();

	SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %d\n", cr->orig_core);

	_event_call(cr->cur_core, on_reactor, cr, NULL);
}

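/* Reactor interrupt support is Linux-only since it relies on eventfd(2).
 * Each reactor registers two eventfds with its fd_group: events_fd wakes the
 * reactor when events are enqueued, resched_fd when a local thread requests
 * rescheduling.
 */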
#ifdef __linux__
static int
reactor_schedule_thread_event(void *arg)
{
	struct spdk_reactor *reactor = arg;
	struct spdk_lw_thread *lw_thread, *tmp;
	uint32_t count = 0;
	uint64_t notify = 1;

	assert(reactor->in_interrupt);

	if (read(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
		SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno));
		return -errno;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
	}

	return count;
}

static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	int rc;

	rc = spdk_fd_group_create(&reactor->fgrp);
	if (rc != 0) {
		return rc;
	}

	reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->resched_fd < 0) {
		rc = -EBADF;
		goto err;
	}

	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event,
			       reactor);
	if (rc) {
		close(reactor->resched_fd);
		goto err;
	}

	reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->events_fd < 0) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);

		rc = -EBADF;
		goto err;
	}

	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->events_fd,
			       event_queue_run_batch, reactor);
	if (rc) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);
		close(reactor->events_fd);
		goto err;
	}

	return 0;

err:
	spdk_fd_group_destroy(reactor->fgrp);
	reactor->fgrp = NULL;
	return rc;
}
#else
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	return -ENOTSUP;
}
#endif

static void
reactor_interrupt_fini(struct spdk_reactor *reactor)
{
	struct spdk_fd_group *fgrp = reactor->fgrp;

	if (!fgrp) {
		return;
	}

	spdk_fd_group_remove(fgrp, reactor->events_fd);
	spdk_fd_group_remove(fgrp, reactor->resched_fd);

	close(reactor->events_fd);
	close(reactor->resched_fd);

	spdk_fd_group_destroy(fgrp);
	reactor->fgrp = NULL;
}

static struct spdk_governor *
_governor_find(const char *name)
{
	struct spdk_governor *governor, *tmp;

	TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
		if (strcmp(name, governor->name) == 0) {
			return governor;
		}
	}

	return NULL;
}

int
spdk_governor_set(const char *name)
{
	struct spdk_governor *governor;
	int rc = 0;

	/* NULL governor was specifically requested */
	if (name == NULL) {
		if (g_governor) {
			g_governor->deinit();
		}
		g_governor = NULL;
		return 0;
	}

	governor = _governor_find(name);
	if (governor == NULL) {
		return -EINVAL;
	}

	if (g_governor == governor) {
		return 0;
	}

	rc = governor->init();
	if (rc == 0) {
		if (g_governor) {
			g_governor->deinit();
		}
		g_governor = governor;
	}

	return rc;
}

struct spdk_governor *
spdk_governor_get(void)
{
	return g_governor;
}

void
spdk_governor_register(struct spdk_governor *governor)
{
	if (_governor_find(governor->name)) {
		SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_governor_list, governor, link);
}

SPDK_LOG_REGISTER_COMPONENT(reactor)

static void
scheduler_trace(void)
{
	struct spdk_trace_tpoint_opts opts[] = {
		{
			"SCHEDULER_PERIOD_START", TRACE_SCHEDULER_PERIOD_START,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{}
		},
		{
			"SCHEDULER_CORE_STATS", TRACE_SCHEDULER_CORE_STATS,
			OWNER_TYPE_REACTOR, OBJECT_NONE, 0,
			{
				{ "busy", SPDK_TRACE_ARG_TYPE_INT, 8 },
				{ "idle", SPDK_TRACE_ARG_TYPE_INT, 8 }
			}
		},
		{
			"SCHEDULER_THREAD_STATS", TRACE_SCHEDULER_THREAD_STATS,
			OWNER_TYPE_THREAD, OBJECT_NONE, 0,
			{
				{ "busy", SPDK_TRACE_ARG_TYPE_INT, 8 },
				{ "idle", SPDK_TRACE_ARG_TYPE_INT, 8 }
			}
		},
		{
			"SCHEDULER_MOVE_THREAD", TRACE_SCHEDULER_MOVE_THREAD,
			OWNER_TYPE_THREAD, OBJECT_NONE, 0,
			{
				{ "src", SPDK_TRACE_ARG_TYPE_INT, 8 },
				{ "dst", SPDK_TRACE_ARG_TYPE_INT, 8 }
			}
		}
	};

	spdk_trace_register_owner_type(OWNER_TYPE_REACTOR, 'r');
	spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
}

SPDK_TRACE_REGISTER_FN(scheduler_trace, "scheduler", TRACE_GROUP_SCHEDULER)