1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2016 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 #include "spdk/likely.h" 8 9 #include "event_internal.h" 10 11 #include "spdk_internal/event.h" 12 #include "spdk_internal/usdt.h" 13 14 #include "spdk/log.h" 15 #include "spdk/thread.h" 16 #include "spdk/env.h" 17 #include "spdk/util.h" 18 #include "spdk/scheduler.h" 19 #include "spdk/string.h" 20 #include "spdk/fd_group.h" 21 22 #ifdef __linux__ 23 #include <sys/prctl.h> 24 #include <sys/eventfd.h> 25 #endif 26 27 #ifdef __FreeBSD__ 28 #include <pthread_np.h> 29 #endif 30 31 #define SPDK_EVENT_BATCH_SIZE 8 32 33 static struct spdk_reactor *g_reactors; 34 static uint32_t g_reactor_count; 35 static struct spdk_cpuset g_reactor_core_mask; 36 static enum spdk_reactor_state g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED; 37 38 static bool g_framework_context_switch_monitor_enabled = true; 39 40 static struct spdk_mempool *g_spdk_event_mempool = NULL; 41 42 TAILQ_HEAD(, spdk_scheduler) g_scheduler_list 43 = TAILQ_HEAD_INITIALIZER(g_scheduler_list); 44 45 static struct spdk_scheduler *g_scheduler = NULL; 46 static struct spdk_reactor *g_scheduling_reactor; 47 bool g_scheduling_in_progress = false; 48 static uint64_t g_scheduler_period = 0; 49 static uint32_t g_scheduler_core_number; 50 static struct spdk_scheduler_core_info *g_core_infos = NULL; 51 static struct spdk_cpuset g_scheduler_isolated_core_mask; 52 53 TAILQ_HEAD(, spdk_governor) g_governor_list 54 = TAILQ_HEAD_INITIALIZER(g_governor_list); 55 56 static struct spdk_governor *g_governor = NULL; 57 58 static int reactor_interrupt_init(struct spdk_reactor *reactor); 59 static void reactor_interrupt_fini(struct spdk_reactor *reactor); 60 61 static pthread_mutex_t g_stopping_reactors_mtx = PTHREAD_MUTEX_INITIALIZER; 62 static bool g_stopping_reactors = false; 63 64 static struct spdk_scheduler * 65 _scheduler_find(const char *name) 66 { 67 struct spdk_scheduler *tmp; 68 69 TAILQ_FOREACH(tmp, &g_scheduler_list, link) { 70 if (strcmp(name, tmp->name) == 0) { 71 return tmp; 72 } 73 } 74 75 return NULL; 76 } 77 78 int 79 spdk_scheduler_set(const char *name) 80 { 81 struct spdk_scheduler *scheduler; 82 int rc = 0; 83 84 /* NULL scheduler was specifically requested */ 85 if (name == NULL) { 86 if (g_scheduler) { 87 g_scheduler->deinit(); 88 } 89 g_scheduler = NULL; 90 return 0; 91 } 92 93 scheduler = _scheduler_find(name); 94 if (scheduler == NULL) { 95 SPDK_ERRLOG("Requested scheduler is missing\n"); 96 return -EINVAL; 97 } 98 99 if (g_scheduler == scheduler) { 100 return 0; 101 } 102 103 if (g_scheduler) { 104 g_scheduler->deinit(); 105 } 106 107 rc = scheduler->init(); 108 if (rc == 0) { 109 g_scheduler = scheduler; 110 } else { 111 /* Could not switch to the new scheduler, so keep the old 112 * one. We need to check if it wasn't NULL, and ->init() it again. 113 */ 114 if (g_scheduler) { 115 SPDK_ERRLOG("Could not ->init() '%s' scheduler, reverting to '%s'\n", 116 name, g_scheduler->name); 117 g_scheduler->init(); 118 } else { 119 SPDK_ERRLOG("Could not ->init() '%s' scheduler.\n", name); 120 } 121 } 122 123 return rc; 124 } 125 126 struct spdk_scheduler * 127 spdk_scheduler_get(void) 128 { 129 return g_scheduler; 130 } 131 132 uint64_t 133 spdk_scheduler_get_period(void) 134 { 135 /* Convert from ticks to microseconds */ 136 return (g_scheduler_period * SPDK_SEC_TO_USEC / spdk_get_ticks_hz()); 137 } 138 139 void 140 spdk_scheduler_set_period(uint64_t period) 141 { 142 /* Convert microseconds to ticks */ 143 g_scheduler_period = period * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 144 } 145 146 void 147 spdk_scheduler_register(struct spdk_scheduler *scheduler) 148 { 149 if (_scheduler_find(scheduler->name)) { 150 SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name); 151 assert(false); 152 return; 153 } 154 155 TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link); 156 } 157 158 uint32_t 159 spdk_scheduler_get_scheduling_lcore(void) 160 { 161 return g_scheduling_reactor->lcore; 162 } 163 164 bool 165 spdk_scheduler_set_scheduling_lcore(uint32_t core) 166 { 167 struct spdk_reactor *reactor = spdk_reactor_get(core); 168 if (reactor == NULL) { 169 SPDK_ERRLOG("Failed to set scheduling reactor. Reactor(lcore:%d) does not exist", core); 170 return false; 171 } 172 173 g_scheduling_reactor = reactor; 174 return true; 175 } 176 177 bool 178 scheduler_set_isolated_core_mask(struct spdk_cpuset isolated_core_mask) 179 { 180 struct spdk_cpuset tmp_mask; 181 182 spdk_cpuset_copy(&tmp_mask, spdk_app_get_core_mask()); 183 spdk_cpuset_or(&tmp_mask, &isolated_core_mask); 184 if (spdk_cpuset_equal(&tmp_mask, spdk_app_get_core_mask()) == false) { 185 SPDK_ERRLOG("Isolated core mask is not included in app core mask.\n"); 186 return false; 187 } 188 spdk_cpuset_copy(&g_scheduler_isolated_core_mask, &isolated_core_mask); 189 return true; 190 } 191 192 const char * 193 scheduler_get_isolated_core_mask(void) 194 { 195 return spdk_cpuset_fmt(&g_scheduler_isolated_core_mask); 196 } 197 198 static bool 199 scheduler_is_isolated_core(uint32_t core) 200 { 201 return spdk_cpuset_get_cpu(&g_scheduler_isolated_core_mask, core); 202 } 203 204 static void 205 reactor_construct(struct spdk_reactor *reactor, uint32_t lcore) 206 { 207 reactor->lcore = lcore; 208 reactor->flags.is_valid = true; 209 210 TAILQ_INIT(&reactor->threads); 211 reactor->thread_count = 0; 212 spdk_cpuset_zero(&reactor->notify_cpuset); 213 214 reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 215 if (reactor->events == NULL) { 216 SPDK_ERRLOG("Failed to allocate events ring\n"); 217 assert(false); 218 } 219 220 /* Always initialize interrupt facilities for reactor */ 221 if (reactor_interrupt_init(reactor) != 0) { 222 /* Reactor interrupt facilities are necessary if setting app to interrupt mode. */ 223 if (spdk_interrupt_mode_is_enabled()) { 224 SPDK_ERRLOG("Failed to prepare intr facilities\n"); 225 assert(false); 226 } 227 return; 228 } 229 230 /* If application runs with full interrupt ability, 231 * all reactors are going to run in interrupt mode. 232 */ 233 if (spdk_interrupt_mode_is_enabled()) { 234 uint32_t i; 235 236 SPDK_ENV_FOREACH_CORE(i) { 237 spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true); 238 } 239 reactor->in_interrupt = true; 240 } 241 } 242 243 struct spdk_reactor * 244 spdk_reactor_get(uint32_t lcore) 245 { 246 struct spdk_reactor *reactor; 247 248 if (g_reactors == NULL) { 249 SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n"); 250 return NULL; 251 } 252 253 if (lcore >= g_reactor_count) { 254 return NULL; 255 } 256 257 reactor = &g_reactors[lcore]; 258 259 if (reactor->flags.is_valid == false) { 260 return NULL; 261 } 262 263 return reactor; 264 } 265 266 static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op); 267 static bool reactor_thread_op_supported(enum spdk_thread_op op); 268 269 int 270 spdk_reactors_init(size_t msg_mempool_size) 271 { 272 struct spdk_reactor *reactor; 273 int rc; 274 uint32_t i, current_core; 275 char mempool_name[32]; 276 277 snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid()); 278 g_spdk_event_mempool = spdk_mempool_create(mempool_name, 279 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 280 sizeof(struct spdk_event), 281 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 282 SPDK_ENV_SOCKET_ID_ANY); 283 284 if (g_spdk_event_mempool == NULL) { 285 SPDK_ERRLOG("spdk_event_mempool creation failed\n"); 286 return -1; 287 } 288 289 /* struct spdk_reactor must be aligned on 64 byte boundary */ 290 g_reactor_count = spdk_env_get_last_core() + 1; 291 rc = posix_memalign((void **)&g_reactors, 64, 292 g_reactor_count * sizeof(struct spdk_reactor)); 293 if (rc != 0) { 294 SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n", 295 g_reactor_count); 296 spdk_mempool_free(g_spdk_event_mempool); 297 return -1; 298 } 299 300 g_core_infos = calloc(g_reactor_count, sizeof(*g_core_infos)); 301 if (g_core_infos == NULL) { 302 SPDK_ERRLOG("Could not allocate memory for g_core_infos\n"); 303 spdk_mempool_free(g_spdk_event_mempool); 304 free(g_reactors); 305 return -ENOMEM; 306 } 307 308 memset(g_reactors, 0, (g_reactor_count) * sizeof(struct spdk_reactor)); 309 310 rc = spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported, 311 sizeof(struct spdk_lw_thread), msg_mempool_size); 312 if (rc != 0) { 313 SPDK_ERRLOG("Initialize spdk thread lib failed\n"); 314 spdk_mempool_free(g_spdk_event_mempool); 315 free(g_reactors); 316 free(g_core_infos); 317 return rc; 318 } 319 320 SPDK_ENV_FOREACH_CORE(i) { 321 reactor_construct(&g_reactors[i], i); 322 } 323 324 current_core = spdk_env_get_current_core(); 325 reactor = spdk_reactor_get(current_core); 326 assert(reactor != NULL); 327 g_scheduling_reactor = reactor; 328 329 g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED; 330 331 return 0; 332 } 333 334 void 335 spdk_reactors_fini(void) 336 { 337 uint32_t i; 338 struct spdk_reactor *reactor; 339 340 if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) { 341 return; 342 } 343 344 spdk_thread_lib_fini(); 345 346 SPDK_ENV_FOREACH_CORE(i) { 347 reactor = spdk_reactor_get(i); 348 assert(reactor != NULL); 349 assert(reactor->thread_count == 0); 350 if (reactor->events != NULL) { 351 spdk_ring_free(reactor->events); 352 } 353 354 reactor_interrupt_fini(reactor); 355 356 if (g_core_infos != NULL) { 357 free(g_core_infos[i].thread_infos); 358 } 359 } 360 361 spdk_mempool_free(g_spdk_event_mempool); 362 363 free(g_reactors); 364 g_reactors = NULL; 365 free(g_core_infos); 366 g_core_infos = NULL; 367 } 368 369 static void _reactor_set_interrupt_mode(void *arg1, void *arg2); 370 371 static void 372 _reactor_set_notify_cpuset(void *arg1, void *arg2) 373 { 374 struct spdk_reactor *target = arg1; 375 struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core()); 376 377 assert(reactor != NULL); 378 spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt); 379 } 380 381 static void 382 _event_call(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2) 383 { 384 struct spdk_event *ev; 385 386 ev = spdk_event_allocate(lcore, fn, arg1, arg2); 387 assert(ev); 388 spdk_event_call(ev); 389 } 390 391 static void 392 _reactor_set_notify_cpuset_cpl(void *arg1, void *arg2) 393 { 394 struct spdk_reactor *target = arg1; 395 396 if (target->new_in_interrupt == false) { 397 target->set_interrupt_mode_in_progress = false; 398 _event_call(spdk_scheduler_get_scheduling_lcore(), target->set_interrupt_mode_cb_fn, 399 target->set_interrupt_mode_cb_arg, NULL); 400 } else { 401 _event_call(target->lcore, _reactor_set_interrupt_mode, target, NULL); 402 } 403 } 404 405 static void 406 _reactor_set_thread_interrupt_mode(void *ctx) 407 { 408 struct spdk_reactor *reactor = ctx; 409 410 spdk_thread_set_interrupt_mode(reactor->in_interrupt); 411 } 412 413 static void 414 _reactor_set_interrupt_mode(void *arg1, void *arg2) 415 { 416 struct spdk_reactor *target = arg1; 417 struct spdk_thread *thread; 418 struct spdk_fd_group *grp; 419 struct spdk_lw_thread *lw_thread, *tmp; 420 421 assert(target == spdk_reactor_get(spdk_env_get_current_core())); 422 assert(target != NULL); 423 assert(target->in_interrupt != target->new_in_interrupt); 424 SPDK_DEBUGLOG(reactor, "Do reactor set on core %u from %s to state %s\n", 425 target->lcore, target->in_interrupt ? "intr" : "poll", target->new_in_interrupt ? "intr" : "poll"); 426 427 target->in_interrupt = target->new_in_interrupt; 428 429 if (spdk_interrupt_mode_is_enabled()) { 430 /* Align spdk_thread with reactor to interrupt mode or poll mode */ 431 TAILQ_FOREACH_SAFE(lw_thread, &target->threads, link, tmp) { 432 thread = spdk_thread_get_from_ctx(lw_thread); 433 if (target->in_interrupt) { 434 grp = spdk_thread_get_interrupt_fd_group(thread); 435 spdk_fd_group_nest(target->fgrp, grp); 436 } else { 437 grp = spdk_thread_get_interrupt_fd_group(thread); 438 spdk_fd_group_unnest(target->fgrp, grp); 439 } 440 441 spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, target); 442 } 443 } 444 445 if (target->new_in_interrupt == false) { 446 /* Reactor is no longer in interrupt mode. Refresh the tsc_last to accurately 447 * track reactor stats. */ 448 target->tsc_last = spdk_get_ticks(); 449 spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl); 450 } else { 451 uint64_t notify = 1; 452 int rc = 0; 453 454 /* Always trigger spdk_event and resched event in case of race condition */ 455 rc = write(target->events_fd, ¬ify, sizeof(notify)); 456 if (rc < 0) { 457 SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno)); 458 } 459 rc = write(target->resched_fd, ¬ify, sizeof(notify)); 460 if (rc < 0) { 461 SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno)); 462 } 463 464 target->set_interrupt_mode_in_progress = false; 465 _event_call(spdk_scheduler_get_scheduling_lcore(), target->set_interrupt_mode_cb_fn, 466 target->set_interrupt_mode_cb_arg, NULL); 467 } 468 } 469 470 int 471 spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt, 472 spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg) 473 { 474 struct spdk_reactor *target; 475 476 target = spdk_reactor_get(lcore); 477 if (target == NULL) { 478 return -EINVAL; 479 } 480 481 /* Eventfd has to be supported in order to use interrupt functionality. */ 482 if (target->fgrp == NULL) { 483 return -ENOTSUP; 484 } 485 486 if (spdk_env_get_current_core() != g_scheduling_reactor->lcore) { 487 SPDK_ERRLOG("It is only permitted within scheduling reactor.\n"); 488 return -EPERM; 489 } 490 491 if (target->in_interrupt == new_in_interrupt) { 492 cb_fn(cb_arg, NULL); 493 return 0; 494 } 495 496 if (target->set_interrupt_mode_in_progress) { 497 SPDK_NOTICELOG("Reactor(%u) is already in progress to set interrupt mode\n", lcore); 498 return -EBUSY; 499 } 500 target->set_interrupt_mode_in_progress = true; 501 502 target->new_in_interrupt = new_in_interrupt; 503 target->set_interrupt_mode_cb_fn = cb_fn; 504 target->set_interrupt_mode_cb_arg = cb_arg; 505 506 SPDK_DEBUGLOG(reactor, "Starting reactor event from %d to %d\n", 507 spdk_env_get_current_core(), lcore); 508 509 if (new_in_interrupt == false) { 510 /* For potential race cases, when setting the reactor to poll mode, 511 * first change the mode of the reactor and then clear the corresponding 512 * bit of the notify_cpuset of each reactor. 513 */ 514 _event_call(lcore, _reactor_set_interrupt_mode, target, NULL); 515 } else { 516 /* For race cases, when setting the reactor to interrupt mode, first set the 517 * corresponding bit of the notify_cpuset of each reactor and then change the mode. 518 */ 519 spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl); 520 } 521 522 return 0; 523 } 524 525 struct spdk_event * 526 spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2) 527 { 528 struct spdk_event *event = NULL; 529 struct spdk_reactor *reactor = spdk_reactor_get(lcore); 530 531 if (!reactor) { 532 assert(false); 533 return NULL; 534 } 535 536 event = spdk_mempool_get(g_spdk_event_mempool); 537 if (event == NULL) { 538 assert(false); 539 return NULL; 540 } 541 542 event->lcore = lcore; 543 event->fn = fn; 544 event->arg1 = arg1; 545 event->arg2 = arg2; 546 547 return event; 548 } 549 550 void 551 spdk_event_call(struct spdk_event *event) 552 { 553 int rc; 554 struct spdk_reactor *reactor; 555 struct spdk_reactor *local_reactor = NULL; 556 uint32_t current_core = spdk_env_get_current_core(); 557 558 reactor = spdk_reactor_get(event->lcore); 559 560 assert(reactor != NULL); 561 assert(reactor->events != NULL); 562 563 rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL); 564 if (rc != 1) { 565 assert(false); 566 } 567 568 if (current_core != SPDK_ENV_LCORE_ID_ANY) { 569 local_reactor = spdk_reactor_get(current_core); 570 } 571 572 /* If spdk_event_call isn't called on a reactor, always send a notification. 573 * If it is called on a reactor, send a notification if the destination reactor 574 * is indicated in interrupt mode state. 575 */ 576 if (spdk_unlikely(local_reactor == NULL) || 577 spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) { 578 uint64_t notify = 1; 579 580 rc = write(reactor->events_fd, ¬ify, sizeof(notify)); 581 if (rc < 0) { 582 SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno)); 583 } 584 } 585 } 586 587 static inline int 588 event_queue_run_batch(void *arg) 589 { 590 struct spdk_reactor *reactor = arg; 591 size_t count, i; 592 void *events[SPDK_EVENT_BATCH_SIZE]; 593 594 #ifdef DEBUG 595 /* 596 * spdk_ring_dequeue() fills events and returns how many entries it wrote, 597 * so we will never actually read uninitialized data from events, but just to be sure 598 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 599 */ 600 memset(events, 0, sizeof(events)); 601 #endif 602 603 /* Operate event notification if this reactor currently runs in interrupt state */ 604 if (spdk_unlikely(reactor->in_interrupt)) { 605 uint64_t notify = 1; 606 int rc; 607 608 /* There may be race between event_acknowledge and another producer's event_notify, 609 * so event_acknowledge should be applied ahead. And then check for self's event_notify. 610 * This can avoid event notification missing. 611 */ 612 rc = read(reactor->events_fd, ¬ify, sizeof(notify)); 613 if (rc < 0) { 614 SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno)); 615 return -errno; 616 } 617 618 count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE); 619 620 if (spdk_ring_count(reactor->events) != 0) { 621 /* Trigger new notification if there are still events in event-queue waiting for processing. */ 622 rc = write(reactor->events_fd, ¬ify, sizeof(notify)); 623 if (rc < 0) { 624 SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno)); 625 return -errno; 626 } 627 } 628 } else { 629 count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE); 630 } 631 632 if (count == 0) { 633 return 0; 634 } 635 636 for (i = 0; i < count; i++) { 637 struct spdk_event *event = events[i]; 638 639 assert(event != NULL); 640 assert(spdk_get_thread() == NULL); 641 SPDK_DTRACE_PROBE3(event_exec, event->fn, 642 event->arg1, event->arg2); 643 event->fn(event->arg1, event->arg2); 644 } 645 646 spdk_mempool_put_bulk(g_spdk_event_mempool, events, count); 647 648 return (int)count; 649 } 650 651 /* 1s */ 652 #define CONTEXT_SWITCH_MONITOR_PERIOD 1000000 653 654 static int 655 get_rusage(struct spdk_reactor *reactor) 656 { 657 struct rusage rusage; 658 659 if (getrusage(RUSAGE_THREAD, &rusage) != 0) { 660 return -1; 661 } 662 663 if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) { 664 SPDK_INFOLOG(reactor, 665 "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n", 666 reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw, 667 rusage.ru_nivcsw - reactor->rusage.ru_nivcsw); 668 } 669 reactor->rusage = rusage; 670 671 return -1; 672 } 673 674 void 675 spdk_framework_enable_context_switch_monitor(bool enable) 676 { 677 /* This global is being read by multiple threads, so this isn't 678 * strictly thread safe. However, we're toggling between true and 679 * false here, and if a thread sees the value update later than it 680 * should, it's no big deal. */ 681 g_framework_context_switch_monitor_enabled = enable; 682 } 683 684 bool 685 spdk_framework_context_switch_monitor_enabled(void) 686 { 687 return g_framework_context_switch_monitor_enabled; 688 } 689 690 static void 691 _set_thread_name(const char *thread_name) 692 { 693 #if defined(__linux__) 694 prctl(PR_SET_NAME, thread_name, 0, 0, 0); 695 #elif defined(__FreeBSD__) 696 pthread_set_name_np(pthread_self(), thread_name); 697 #else 698 pthread_setname_np(pthread_self(), thread_name); 699 #endif 700 } 701 702 static void 703 _init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread) 704 { 705 struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread); 706 struct spdk_thread_stats prev_total_stats; 707 708 /* Read total_stats before updating it to calculate stats during the last scheduling period. */ 709 prev_total_stats = lw_thread->total_stats; 710 711 spdk_set_thread(thread); 712 spdk_thread_get_stats(&lw_thread->total_stats); 713 spdk_set_thread(NULL); 714 715 lw_thread->current_stats.busy_tsc = lw_thread->total_stats.busy_tsc - prev_total_stats.busy_tsc; 716 lw_thread->current_stats.idle_tsc = lw_thread->total_stats.idle_tsc - prev_total_stats.idle_tsc; 717 } 718 719 static void 720 _threads_reschedule_thread(struct spdk_scheduler_thread_info *thread_info) 721 { 722 struct spdk_lw_thread *lw_thread; 723 struct spdk_thread *thread; 724 725 thread = spdk_thread_get_by_id(thread_info->thread_id); 726 if (thread == NULL) { 727 /* Thread no longer exists. */ 728 return; 729 } 730 lw_thread = spdk_thread_get_ctx(thread); 731 assert(lw_thread != NULL); 732 733 lw_thread->lcore = thread_info->lcore; 734 lw_thread->resched = true; 735 } 736 737 static void 738 _threads_reschedule(struct spdk_scheduler_core_info *cores_info) 739 { 740 struct spdk_scheduler_core_info *core; 741 struct spdk_scheduler_thread_info *thread_info; 742 uint32_t i, j; 743 744 SPDK_ENV_FOREACH_CORE(i) { 745 core = &cores_info[i]; 746 for (j = 0; j < core->threads_count; j++) { 747 thread_info = &core->thread_infos[j]; 748 if (thread_info->lcore != i) { 749 if (core->isolated || cores_info[thread_info->lcore].isolated) { 750 SPDK_ERRLOG("A thread cannot be moved from an isolated core or \ 751 moved to an isolated core. Skip rescheduling thread\n"); 752 continue; 753 } 754 _threads_reschedule_thread(thread_info); 755 } 756 } 757 core->threads_count = 0; 758 free(core->thread_infos); 759 core->thread_infos = NULL; 760 } 761 } 762 763 static void 764 _reactors_scheduler_fini(void) 765 { 766 /* Reschedule based on the balancing output */ 767 _threads_reschedule(g_core_infos); 768 769 g_scheduling_in_progress = false; 770 } 771 772 static void 773 _reactors_scheduler_update_core_mode(void *ctx1, void *ctx2) 774 { 775 struct spdk_reactor *reactor; 776 uint32_t i; 777 int rc = 0; 778 779 for (i = g_scheduler_core_number; i < SPDK_ENV_LCORE_ID_ANY; i = spdk_env_get_next_core(i)) { 780 reactor = spdk_reactor_get(i); 781 assert(reactor != NULL); 782 if (reactor->in_interrupt != g_core_infos[i].interrupt_mode) { 783 /* Switch next found reactor to new state */ 784 rc = spdk_reactor_set_interrupt_mode(i, g_core_infos[i].interrupt_mode, 785 _reactors_scheduler_update_core_mode, NULL); 786 if (rc == 0) { 787 /* Set core to start with after callback completes */ 788 g_scheduler_core_number = spdk_env_get_next_core(i); 789 return; 790 } 791 } 792 } 793 _reactors_scheduler_fini(); 794 } 795 796 static void 797 _reactors_scheduler_cancel(void *arg1, void *arg2) 798 { 799 struct spdk_scheduler_core_info *core; 800 uint32_t i; 801 802 SPDK_ENV_FOREACH_CORE(i) { 803 core = &g_core_infos[i]; 804 core->threads_count = 0; 805 free(core->thread_infos); 806 core->thread_infos = NULL; 807 } 808 809 g_scheduling_in_progress = false; 810 } 811 812 static void 813 _reactors_scheduler_balance(void *arg1, void *arg2) 814 { 815 struct spdk_scheduler *scheduler = spdk_scheduler_get(); 816 817 if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING || scheduler == NULL) { 818 _reactors_scheduler_cancel(NULL, NULL); 819 return; 820 } 821 822 scheduler->balance(g_core_infos, g_reactor_count); 823 824 g_scheduler_core_number = spdk_env_get_first_core(); 825 _reactors_scheduler_update_core_mode(NULL, NULL); 826 } 827 828 /* Phase 1 of thread scheduling is to gather metrics on the existing threads */ 829 static void 830 _reactors_scheduler_gather_metrics(void *arg1, void *arg2) 831 { 832 struct spdk_scheduler_core_info *core_info; 833 struct spdk_lw_thread *lw_thread; 834 struct spdk_thread *thread; 835 struct spdk_reactor *reactor; 836 uint32_t next_core; 837 uint32_t i = 0; 838 839 reactor = spdk_reactor_get(spdk_env_get_current_core()); 840 assert(reactor != NULL); 841 core_info = &g_core_infos[reactor->lcore]; 842 core_info->lcore = reactor->lcore; 843 core_info->current_idle_tsc = reactor->idle_tsc - core_info->total_idle_tsc; 844 core_info->total_idle_tsc = reactor->idle_tsc; 845 core_info->current_busy_tsc = reactor->busy_tsc - core_info->total_busy_tsc; 846 core_info->total_busy_tsc = reactor->busy_tsc; 847 core_info->interrupt_mode = reactor->in_interrupt; 848 core_info->threads_count = 0; 849 core_info->isolated = scheduler_is_isolated_core(reactor->lcore); 850 851 SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore); 852 853 if (reactor->thread_count > 0) { 854 core_info->thread_infos = calloc(reactor->thread_count, sizeof(*core_info->thread_infos)); 855 if (core_info->thread_infos == NULL) { 856 SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore); 857 858 /* Cancel this round of schedule work */ 859 _event_call(spdk_scheduler_get_scheduling_lcore(), _reactors_scheduler_cancel, NULL, NULL); 860 return; 861 } 862 863 TAILQ_FOREACH(lw_thread, &reactor->threads, link) { 864 _init_thread_stats(reactor, lw_thread); 865 866 core_info->thread_infos[i].lcore = lw_thread->lcore; 867 thread = spdk_thread_get_from_ctx(lw_thread); 868 assert(thread != NULL); 869 core_info->thread_infos[i].thread_id = spdk_thread_get_id(thread); 870 core_info->thread_infos[i].total_stats = lw_thread->total_stats; 871 core_info->thread_infos[i].current_stats = lw_thread->current_stats; 872 core_info->threads_count++; 873 assert(core_info->threads_count <= reactor->thread_count); 874 i++; 875 } 876 } 877 878 next_core = spdk_env_get_next_core(reactor->lcore); 879 if (next_core == UINT32_MAX) { 880 next_core = spdk_env_get_first_core(); 881 } 882 883 /* If we've looped back around to the scheduler thread, move to the next phase */ 884 if (next_core == spdk_scheduler_get_scheduling_lcore()) { 885 /* Phase 2 of scheduling is rebalancing - deciding which threads to move where */ 886 _event_call(next_core, _reactors_scheduler_balance, NULL, NULL); 887 return; 888 } 889 890 _event_call(next_core, _reactors_scheduler_gather_metrics, NULL, NULL); 891 } 892 893 static int _reactor_schedule_thread(struct spdk_thread *thread); 894 static uint64_t g_rusage_period; 895 896 static void 897 _reactor_remove_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread) 898 { 899 struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread); 900 struct spdk_fd_group *grp; 901 902 TAILQ_REMOVE(&reactor->threads, lw_thread, link); 903 assert(reactor->thread_count > 0); 904 reactor->thread_count--; 905 906 /* Operate thread intr if running with full interrupt ability */ 907 if (spdk_interrupt_mode_is_enabled()) { 908 if (reactor->in_interrupt) { 909 grp = spdk_thread_get_interrupt_fd_group(thread); 910 spdk_fd_group_unnest(reactor->fgrp, grp); 911 } 912 } 913 } 914 915 static bool 916 reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread) 917 { 918 struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread); 919 920 if (spdk_unlikely(spdk_thread_is_exited(thread) && 921 spdk_thread_is_idle(thread))) { 922 _reactor_remove_lw_thread(reactor, lw_thread); 923 spdk_thread_destroy(thread); 924 return true; 925 } 926 927 if (spdk_unlikely(lw_thread->resched && !spdk_thread_is_bound(thread))) { 928 lw_thread->resched = false; 929 _reactor_remove_lw_thread(reactor, lw_thread); 930 _reactor_schedule_thread(thread); 931 return true; 932 } 933 934 return false; 935 } 936 937 static void 938 reactor_interrupt_run(struct spdk_reactor *reactor) 939 { 940 int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */ 941 942 spdk_fd_group_wait(reactor->fgrp, block_timeout); 943 } 944 945 static void 946 _reactor_run(struct spdk_reactor *reactor) 947 { 948 struct spdk_thread *thread; 949 struct spdk_lw_thread *lw_thread, *tmp; 950 uint64_t now; 951 int rc; 952 953 event_queue_run_batch(reactor); 954 955 /* If no threads are present on the reactor, 956 * tsc_last gets outdated. Update it to track 957 * thread execution time correctly. */ 958 if (spdk_unlikely(TAILQ_EMPTY(&reactor->threads))) { 959 now = spdk_get_ticks(); 960 reactor->idle_tsc += now - reactor->tsc_last; 961 reactor->tsc_last = now; 962 return; 963 } 964 965 TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { 966 thread = spdk_thread_get_from_ctx(lw_thread); 967 rc = spdk_thread_poll(thread, 0, reactor->tsc_last); 968 969 now = spdk_thread_get_last_tsc(thread); 970 if (rc == 0) { 971 reactor->idle_tsc += now - reactor->tsc_last; 972 } else if (rc > 0) { 973 reactor->busy_tsc += now - reactor->tsc_last; 974 } 975 reactor->tsc_last = now; 976 977 reactor_post_process_lw_thread(reactor, lw_thread); 978 } 979 } 980 981 static int 982 reactor_run(void *arg) 983 { 984 struct spdk_reactor *reactor = arg; 985 struct spdk_thread *thread; 986 struct spdk_lw_thread *lw_thread, *tmp; 987 char thread_name[32]; 988 uint64_t last_sched = 0; 989 990 SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore); 991 992 /* Rename the POSIX thread because the reactor is tied to the POSIX 993 * thread in the SPDK event library. 994 */ 995 snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore); 996 _set_thread_name(thread_name); 997 998 reactor->tsc_last = spdk_get_ticks(); 999 1000 while (1) { 1001 /* Execute interrupt process fn if this reactor currently runs in interrupt state */ 1002 if (spdk_unlikely(reactor->in_interrupt)) { 1003 reactor_interrupt_run(reactor); 1004 } else { 1005 _reactor_run(reactor); 1006 } 1007 1008 if (g_framework_context_switch_monitor_enabled) { 1009 if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) { 1010 get_rusage(reactor); 1011 reactor->last_rusage = reactor->tsc_last; 1012 } 1013 } 1014 1015 if (spdk_unlikely(g_scheduler_period > 0 && 1016 (reactor->tsc_last - last_sched) > g_scheduler_period && 1017 reactor == g_scheduling_reactor && 1018 !g_scheduling_in_progress)) { 1019 last_sched = reactor->tsc_last; 1020 g_scheduling_in_progress = true; 1021 _reactors_scheduler_gather_metrics(NULL, NULL); 1022 } 1023 1024 if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) { 1025 break; 1026 } 1027 } 1028 1029 TAILQ_FOREACH(lw_thread, &reactor->threads, link) { 1030 thread = spdk_thread_get_from_ctx(lw_thread); 1031 /* All threads should have already had spdk_thread_exit() called on them, except 1032 * for the app thread. 1033 */ 1034 if (spdk_thread_is_running(thread)) { 1035 if (!spdk_thread_is_app_thread(thread)) { 1036 SPDK_ERRLOG("spdk_thread_exit() was not called on thread '%s'\n", 1037 spdk_thread_get_name(thread)); 1038 SPDK_ERRLOG("This will result in a non-zero exit code in a future release.\n"); 1039 } 1040 spdk_set_thread(thread); 1041 spdk_thread_exit(thread); 1042 } 1043 } 1044 1045 while (!TAILQ_EMPTY(&reactor->threads)) { 1046 TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { 1047 thread = spdk_thread_get_from_ctx(lw_thread); 1048 spdk_set_thread(thread); 1049 if (spdk_thread_is_exited(thread)) { 1050 _reactor_remove_lw_thread(reactor, lw_thread); 1051 spdk_thread_destroy(thread); 1052 } else { 1053 if (spdk_unlikely(reactor->in_interrupt)) { 1054 reactor_interrupt_run(reactor); 1055 } else { 1056 spdk_thread_poll(thread, 0, 0); 1057 } 1058 } 1059 } 1060 } 1061 1062 return 0; 1063 } 1064 1065 int 1066 spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask) 1067 { 1068 int ret; 1069 const struct spdk_cpuset *validmask; 1070 1071 ret = spdk_cpuset_parse(cpumask, mask); 1072 if (ret < 0) { 1073 return ret; 1074 } 1075 1076 validmask = spdk_app_get_core_mask(); 1077 spdk_cpuset_and(cpumask, validmask); 1078 1079 return 0; 1080 } 1081 1082 const struct spdk_cpuset * 1083 spdk_app_get_core_mask(void) 1084 { 1085 return &g_reactor_core_mask; 1086 } 1087 1088 void 1089 spdk_reactors_start(void) 1090 { 1091 struct spdk_reactor *reactor; 1092 uint32_t i, current_core; 1093 int rc; 1094 1095 g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC; 1096 g_reactor_state = SPDK_REACTOR_STATE_RUNNING; 1097 /* Reinitialize to false, in case the app framework is restarting in the same process. */ 1098 g_stopping_reactors = false; 1099 1100 current_core = spdk_env_get_current_core(); 1101 SPDK_ENV_FOREACH_CORE(i) { 1102 if (i != current_core) { 1103 reactor = spdk_reactor_get(i); 1104 if (reactor == NULL) { 1105 continue; 1106 } 1107 1108 rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor); 1109 if (rc < 0) { 1110 SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore); 1111 assert(false); 1112 return; 1113 } 1114 } 1115 spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true); 1116 } 1117 1118 /* Start the main reactor */ 1119 reactor = spdk_reactor_get(current_core); 1120 assert(reactor != NULL); 1121 reactor_run(reactor); 1122 1123 spdk_env_thread_wait_all(); 1124 1125 g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN; 1126 } 1127 1128 static void 1129 _reactors_stop(void *arg1, void *arg2) 1130 { 1131 uint32_t i; 1132 int rc; 1133 struct spdk_reactor *reactor; 1134 struct spdk_reactor *local_reactor; 1135 uint64_t notify = 1; 1136 1137 g_reactor_state = SPDK_REACTOR_STATE_EXITING; 1138 local_reactor = spdk_reactor_get(spdk_env_get_current_core()); 1139 1140 SPDK_ENV_FOREACH_CORE(i) { 1141 /* If spdk_event_call isn't called on a reactor, always send a notification. 1142 * If it is called on a reactor, send a notification if the destination reactor 1143 * is indicated in interrupt mode state. 1144 */ 1145 if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) { 1146 reactor = spdk_reactor_get(i); 1147 assert(reactor != NULL); 1148 rc = write(reactor->events_fd, ¬ify, sizeof(notify)); 1149 if (rc < 0) { 1150 SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno)); 1151 continue; 1152 } 1153 } 1154 } 1155 } 1156 1157 static void 1158 nop(void *arg1, void *arg2) 1159 { 1160 } 1161 1162 void 1163 spdk_reactors_stop(void *arg1) 1164 { 1165 spdk_for_each_reactor(nop, NULL, NULL, _reactors_stop); 1166 } 1167 1168 static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER; 1169 static uint32_t g_next_core = UINT32_MAX; 1170 1171 static void 1172 _schedule_thread(void *arg1, void *arg2) 1173 { 1174 struct spdk_lw_thread *lw_thread = arg1; 1175 struct spdk_thread *thread; 1176 struct spdk_reactor *reactor; 1177 uint32_t current_core; 1178 struct spdk_fd_group *grp; 1179 1180 current_core = spdk_env_get_current_core(); 1181 reactor = spdk_reactor_get(current_core); 1182 assert(reactor != NULL); 1183 1184 /* Update total_stats to reflect state of thread 1185 * at the end of the move. */ 1186 thread = spdk_thread_get_from_ctx(lw_thread); 1187 spdk_set_thread(thread); 1188 spdk_thread_get_stats(&lw_thread->total_stats); 1189 spdk_set_thread(NULL); 1190 1191 lw_thread->lcore = current_core; 1192 1193 TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link); 1194 reactor->thread_count++; 1195 1196 /* Operate thread intr if running with full interrupt ability */ 1197 if (spdk_interrupt_mode_is_enabled()) { 1198 int rc; 1199 1200 if (reactor->in_interrupt) { 1201 grp = spdk_thread_get_interrupt_fd_group(thread); 1202 rc = spdk_fd_group_nest(reactor->fgrp, grp); 1203 if (rc < 0) { 1204 SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc)); 1205 } 1206 } 1207 1208 /* Align spdk_thread with reactor to interrupt mode or poll mode */ 1209 spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, reactor); 1210 } 1211 } 1212 1213 static int 1214 _reactor_schedule_thread(struct spdk_thread *thread) 1215 { 1216 uint32_t core; 1217 struct spdk_lw_thread *lw_thread; 1218 struct spdk_event *evt = NULL; 1219 struct spdk_cpuset *cpumask; 1220 uint32_t i; 1221 struct spdk_reactor *local_reactor = NULL; 1222 uint32_t current_lcore = spdk_env_get_current_core(); 1223 struct spdk_cpuset polling_cpumask; 1224 struct spdk_cpuset valid_cpumask; 1225 1226 cpumask = spdk_thread_get_cpumask(thread); 1227 1228 lw_thread = spdk_thread_get_ctx(thread); 1229 assert(lw_thread != NULL); 1230 core = lw_thread->lcore; 1231 memset(lw_thread, 0, sizeof(*lw_thread)); 1232 1233 if (current_lcore != SPDK_ENV_LCORE_ID_ANY) { 1234 local_reactor = spdk_reactor_get(current_lcore); 1235 assert(local_reactor); 1236 } 1237 1238 /* When interrupt ability of spdk_thread is not enabled and the current 1239 * reactor runs on DPDK thread, skip reactors which are in interrupt mode. 1240 */ 1241 if (!spdk_interrupt_mode_is_enabled() && local_reactor != NULL) { 1242 /* Get the cpumask of all reactors in polling */ 1243 spdk_cpuset_zero(&polling_cpumask); 1244 SPDK_ENV_FOREACH_CORE(i) { 1245 spdk_cpuset_set_cpu(&polling_cpumask, i, true); 1246 } 1247 spdk_cpuset_xor(&polling_cpumask, &local_reactor->notify_cpuset); 1248 1249 if (core == SPDK_ENV_LCORE_ID_ANY) { 1250 /* Get the cpumask of all valid reactors which are suggested and also in polling */ 1251 spdk_cpuset_copy(&valid_cpumask, &polling_cpumask); 1252 spdk_cpuset_and(&valid_cpumask, spdk_thread_get_cpumask(thread)); 1253 1254 /* If there are any valid reactors, spdk_thread should be scheduled 1255 * into one of the valid reactors. 1256 * If there is no valid reactors, spdk_thread should be scheduled 1257 * into one of the polling reactors. 1258 */ 1259 if (spdk_cpuset_count(&valid_cpumask) != 0) { 1260 cpumask = &valid_cpumask; 1261 } else { 1262 cpumask = &polling_cpumask; 1263 } 1264 } else if (!spdk_cpuset_get_cpu(&polling_cpumask, core)) { 1265 /* If specified reactor is not in polling, spdk_thread should be scheduled 1266 * into one of the polling reactors. 1267 */ 1268 core = SPDK_ENV_LCORE_ID_ANY; 1269 cpumask = &polling_cpumask; 1270 } 1271 } 1272 1273 pthread_mutex_lock(&g_scheduler_mtx); 1274 if (core == SPDK_ENV_LCORE_ID_ANY) { 1275 for (i = 0; i < spdk_env_get_core_count(); i++) { 1276 if (g_next_core >= g_reactor_count) { 1277 g_next_core = spdk_env_get_first_core(); 1278 } 1279 core = g_next_core; 1280 g_next_core = spdk_env_get_next_core(g_next_core); 1281 1282 if (spdk_cpuset_get_cpu(cpumask, core)) { 1283 break; 1284 } 1285 } 1286 } 1287 1288 evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL); 1289 1290 pthread_mutex_unlock(&g_scheduler_mtx); 1291 1292 assert(evt != NULL); 1293 if (evt == NULL) { 1294 SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n"); 1295 return -1; 1296 } 1297 1298 lw_thread->tsc_start = spdk_get_ticks(); 1299 1300 spdk_event_call(evt); 1301 1302 return 0; 1303 } 1304 1305 static void 1306 _reactor_request_thread_reschedule(struct spdk_thread *thread) 1307 { 1308 struct spdk_lw_thread *lw_thread; 1309 struct spdk_reactor *reactor; 1310 uint32_t current_core; 1311 1312 assert(thread == spdk_get_thread()); 1313 1314 lw_thread = spdk_thread_get_ctx(thread); 1315 1316 assert(lw_thread != NULL); 1317 lw_thread->resched = true; 1318 lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY; 1319 1320 current_core = spdk_env_get_current_core(); 1321 reactor = spdk_reactor_get(current_core); 1322 assert(reactor != NULL); 1323 1324 /* Send a notification if the destination reactor is indicated in intr mode state */ 1325 if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) { 1326 uint64_t notify = 1; 1327 1328 if (write(reactor->resched_fd, ¬ify, sizeof(notify)) < 0) { 1329 SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno)); 1330 } 1331 } 1332 } 1333 1334 static int 1335 reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op) 1336 { 1337 struct spdk_lw_thread *lw_thread; 1338 1339 switch (op) { 1340 case SPDK_THREAD_OP_NEW: 1341 lw_thread = spdk_thread_get_ctx(thread); 1342 lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY; 1343 return _reactor_schedule_thread(thread); 1344 case SPDK_THREAD_OP_RESCHED: 1345 _reactor_request_thread_reschedule(thread); 1346 return 0; 1347 default: 1348 return -ENOTSUP; 1349 } 1350 } 1351 1352 static bool 1353 reactor_thread_op_supported(enum spdk_thread_op op) 1354 { 1355 switch (op) { 1356 case SPDK_THREAD_OP_NEW: 1357 case SPDK_THREAD_OP_RESCHED: 1358 return true; 1359 default: 1360 return false; 1361 } 1362 } 1363 1364 struct call_reactor { 1365 uint32_t cur_core; 1366 spdk_event_fn fn; 1367 void *arg1; 1368 void *arg2; 1369 1370 uint32_t orig_core; 1371 spdk_event_fn cpl; 1372 }; 1373 1374 static void 1375 on_reactor(void *arg1, void *arg2) 1376 { 1377 struct call_reactor *cr = arg1; 1378 struct spdk_event *evt; 1379 1380 cr->fn(cr->arg1, cr->arg2); 1381 1382 cr->cur_core = spdk_env_get_next_core(cr->cur_core); 1383 1384 if (cr->cur_core >= g_reactor_count) { 1385 SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n"); 1386 1387 evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2); 1388 free(cr); 1389 } else { 1390 SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %d\n", 1391 cr->cur_core); 1392 1393 evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL); 1394 } 1395 assert(evt != NULL); 1396 spdk_event_call(evt); 1397 } 1398 1399 void 1400 spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl) 1401 { 1402 struct call_reactor *cr; 1403 1404 /* When the application framework is shutting down, we will send one 1405 * final for_each_reactor operation with completion callback _reactors_stop, 1406 * to flush any existing for_each_reactor operations to avoid any memory 1407 * leaks. We use a mutex here to protect a boolean flag that will ensure 1408 * we don't start any more operations once we've started shutting down. 1409 */ 1410 pthread_mutex_lock(&g_stopping_reactors_mtx); 1411 if (g_stopping_reactors) { 1412 pthread_mutex_unlock(&g_stopping_reactors_mtx); 1413 return; 1414 } else if (cpl == _reactors_stop) { 1415 g_stopping_reactors = true; 1416 } 1417 pthread_mutex_unlock(&g_stopping_reactors_mtx); 1418 1419 cr = calloc(1, sizeof(*cr)); 1420 if (!cr) { 1421 SPDK_ERRLOG("Unable to perform reactor iteration\n"); 1422 cpl(arg1, arg2); 1423 return; 1424 } 1425 1426 cr->fn = fn; 1427 cr->arg1 = arg1; 1428 cr->arg2 = arg2; 1429 cr->cpl = cpl; 1430 cr->orig_core = spdk_env_get_current_core(); 1431 cr->cur_core = spdk_env_get_first_core(); 1432 1433 SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %d\n", cr->orig_core); 1434 1435 _event_call(cr->cur_core, on_reactor, cr, NULL); 1436 } 1437 1438 #ifdef __linux__ 1439 static int 1440 reactor_schedule_thread_event(void *arg) 1441 { 1442 struct spdk_reactor *reactor = arg; 1443 struct spdk_lw_thread *lw_thread, *tmp; 1444 uint32_t count = 0; 1445 uint64_t notify = 1; 1446 1447 assert(reactor->in_interrupt); 1448 1449 if (read(reactor->resched_fd, ¬ify, sizeof(notify)) < 0) { 1450 SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno)); 1451 return -errno; 1452 } 1453 1454 TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { 1455 count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0; 1456 } 1457 1458 return count; 1459 } 1460 1461 static int 1462 reactor_interrupt_init(struct spdk_reactor *reactor) 1463 { 1464 int rc; 1465 1466 rc = spdk_fd_group_create(&reactor->fgrp); 1467 if (rc != 0) { 1468 return rc; 1469 } 1470 1471 reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1472 if (reactor->resched_fd < 0) { 1473 rc = -EBADF; 1474 goto err; 1475 } 1476 1477 rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event, 1478 reactor); 1479 if (rc) { 1480 close(reactor->resched_fd); 1481 goto err; 1482 } 1483 1484 reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 1485 if (reactor->events_fd < 0) { 1486 spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd); 1487 close(reactor->resched_fd); 1488 1489 rc = -EBADF; 1490 goto err; 1491 } 1492 1493 rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->events_fd, 1494 event_queue_run_batch, reactor); 1495 if (rc) { 1496 spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd); 1497 close(reactor->resched_fd); 1498 close(reactor->events_fd); 1499 goto err; 1500 } 1501 1502 return 0; 1503 1504 err: 1505 spdk_fd_group_destroy(reactor->fgrp); 1506 reactor->fgrp = NULL; 1507 return rc; 1508 } 1509 #else 1510 static int 1511 reactor_interrupt_init(struct spdk_reactor *reactor) 1512 { 1513 return -ENOTSUP; 1514 } 1515 #endif 1516 1517 static void 1518 reactor_interrupt_fini(struct spdk_reactor *reactor) 1519 { 1520 struct spdk_fd_group *fgrp = reactor->fgrp; 1521 1522 if (!fgrp) { 1523 return; 1524 } 1525 1526 spdk_fd_group_remove(fgrp, reactor->events_fd); 1527 spdk_fd_group_remove(fgrp, reactor->resched_fd); 1528 1529 close(reactor->events_fd); 1530 close(reactor->resched_fd); 1531 1532 spdk_fd_group_destroy(fgrp); 1533 reactor->fgrp = NULL; 1534 } 1535 1536 static struct spdk_governor * 1537 _governor_find(const char *name) 1538 { 1539 struct spdk_governor *governor, *tmp; 1540 1541 TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) { 1542 if (strcmp(name, governor->name) == 0) { 1543 return governor; 1544 } 1545 } 1546 1547 return NULL; 1548 } 1549 1550 int 1551 spdk_governor_set(const char *name) 1552 { 1553 struct spdk_governor *governor; 1554 int rc = 0; 1555 1556 /* NULL governor was specifically requested */ 1557 if (name == NULL) { 1558 if (g_governor) { 1559 g_governor->deinit(); 1560 } 1561 g_governor = NULL; 1562 return 0; 1563 } 1564 1565 governor = _governor_find(name); 1566 if (governor == NULL) { 1567 return -EINVAL; 1568 } 1569 1570 if (g_governor == governor) { 1571 return 0; 1572 } 1573 1574 rc = governor->init(); 1575 if (rc == 0) { 1576 if (g_governor) { 1577 g_governor->deinit(); 1578 } 1579 g_governor = governor; 1580 } 1581 1582 return rc; 1583 } 1584 1585 struct spdk_governor * 1586 spdk_governor_get(void) 1587 { 1588 return g_governor; 1589 } 1590 1591 void 1592 spdk_governor_register(struct spdk_governor *governor) 1593 { 1594 if (_governor_find(governor->name)) { 1595 SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name); 1596 assert(false); 1597 return; 1598 } 1599 1600 TAILQ_INSERT_TAIL(&g_governor_list, governor, link); 1601 } 1602 1603 SPDK_LOG_REGISTER_COMPONENT(reactor) 1604