/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2016 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/likely.h"

#include "spdk_internal/event.h"
#include "spdk_internal/usdt.h"

#include "spdk/log.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/scheduler.h"
#include "spdk/string.h"
#include "spdk/fd_group.h"

#ifdef __linux__
#include <sys/prctl.h>
#include <sys/eventfd.h>
#endif

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

#define SPDK_EVENT_BATCH_SIZE 8

static struct spdk_reactor *g_reactors;
static uint32_t g_reactor_count;
static struct spdk_cpuset g_reactor_core_mask;
static enum spdk_reactor_state g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;

static bool g_framework_context_switch_monitor_enabled = true;

static struct spdk_mempool *g_spdk_event_mempool = NULL;

TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);

static struct spdk_scheduler *g_scheduler = NULL;
static struct spdk_reactor *g_scheduling_reactor;
bool g_scheduling_in_progress = false;
static uint64_t g_scheduler_period = 0;
static uint32_t g_scheduler_core_number;
static struct spdk_scheduler_core_info *g_core_infos = NULL;

TAILQ_HEAD(, spdk_governor) g_governor_list
	= TAILQ_HEAD_INITIALIZER(g_governor_list);

static struct spdk_governor *g_governor = NULL;

static int reactor_interrupt_init(struct spdk_reactor *reactor);
static void reactor_interrupt_fini(struct spdk_reactor *reactor);

static pthread_mutex_t g_stopping_reactors_mtx = PTHREAD_MUTEX_INITIALIZER;
static bool g_stopping_reactors = false;

static struct spdk_scheduler *
_scheduler_find(const char *name)
{
	struct spdk_scheduler *tmp;

	TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
		if (strcmp(name, tmp->name) == 0) {
			return tmp;
		}
	}

	return NULL;
}

int
spdk_scheduler_set(const char *name)
{
	struct spdk_scheduler *scheduler;
	int rc = 0;

	/* NULL scheduler was specifically requested */
	if (name == NULL) {
		if (g_scheduler) {
			g_scheduler->deinit();
		}
		g_scheduler = NULL;
		return 0;
	}

	scheduler = _scheduler_find(name);
	if (scheduler == NULL) {
		SPDK_ERRLOG("Requested scheduler is missing\n");
		return -EINVAL;
	}

	if (g_scheduler == scheduler) {
		return 0;
	}

	rc = scheduler->init();
	if (rc == 0) {
		if (g_scheduler) {
			g_scheduler->deinit();
		}
		g_scheduler = scheduler;
	}

	return rc;
}

struct spdk_scheduler *
spdk_scheduler_get(void)
{
	return g_scheduler;
}

uint64_t
spdk_scheduler_get_period(void)
{
	/* Convert from ticks to microseconds */
	return (g_scheduler_period * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
}

void
spdk_scheduler_set_period(uint64_t period)
{
	/* Convert from microseconds to ticks */
	g_scheduler_period = period * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
}

void
spdk_scheduler_register(struct spdk_scheduler *scheduler)
{
	if (_scheduler_find(scheduler->name)) {
		SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
}
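/*
 * Example (illustrative only, not part of this file): a minimal custom
 * scheduler plugged into the registry above. The names are hypothetical;
 * only the name/init/deinit/balance members that this file actually invokes
 * are shown, and registration is assumed to happen from a constructor so it
 * runs before spdk_scheduler_set() is called.
 *
 *	static int example_sched_init(void) { return 0; }
 *	static void example_sched_deinit(void) { }
 *
 *	static void
 *	example_sched_balance(struct spdk_scheduler_core_info *cores, uint32_t count)
 *	{
 *		// Leave every thread where it is; a real scheduler would edit
 *		// cores[i].thread_infos[j].lcore to move threads between cores.
 *	}
 *
 *	static struct spdk_scheduler example_sched = {
 *		.name = "example",
 *		.init = example_sched_init,
 *		.deinit = example_sched_deinit,
 *		.balance = example_sched_balance,
 *	};
 *
 *	static void __attribute__((constructor)) register_example_sched(void)
 *	{
 *		spdk_scheduler_register(&example_sched);
 *	}
 *
 * A caller would then activate it with spdk_scheduler_set("example") and,
 * e.g., spdk_scheduler_set_period(1000) for a 1000 us scheduling period.
 * At a 2.4 GHz tick rate that stores 1000 * 2400000000 / 1000000 =
 * 2400000 ticks in g_scheduler_period.
 */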
static void
reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
{
	reactor->lcore = lcore;
	reactor->flags.is_valid = true;

	TAILQ_INIT(&reactor->threads);
	reactor->thread_count = 0;
	spdk_cpuset_zero(&reactor->notify_cpuset);

	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (reactor->events == NULL) {
		SPDK_ERRLOG("Failed to allocate events ring\n");
		assert(false);
	}

	/* Always initialize interrupt facilities for the reactor */
	if (reactor_interrupt_init(reactor) != 0) {
		/* Reactor interrupt facilities are necessary when setting the app to interrupt mode. */
		if (spdk_interrupt_mode_is_enabled()) {
			SPDK_ERRLOG("Failed to prepare intr facilities\n");
			assert(false);
		}
		return;
	}

	/* If the application runs with full interrupt ability,
	 * all reactors are going to run in interrupt mode.
	 */
	if (spdk_interrupt_mode_is_enabled()) {
		uint32_t i;

		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true);
		}
		reactor->in_interrupt = true;
	}
}

struct spdk_reactor *
spdk_reactor_get(uint32_t lcore)
{
	struct spdk_reactor *reactor;

	if (g_reactors == NULL) {
		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
		return NULL;
	}

	if (lcore >= g_reactor_count) {
		return NULL;
	}

	reactor = &g_reactors[lcore];

	if (reactor->flags.is_valid == false) {
		return NULL;
	}

	return reactor;
}

static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
static bool reactor_thread_op_supported(enum spdk_thread_op op);
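/*
 * Example (illustrative only): looking up the reactor for the calling core.
 * spdk_reactor_get() returns NULL if the reactors array has not been
 * allocated yet, if the core is out of range, or if the slot was never
 * constructed, so callers must handle NULL.
 *
 *	struct spdk_reactor *r = spdk_reactor_get(spdk_env_get_current_core());
 *
 *	if (r == NULL) {
 *		// Not on a constructed reactor core (or init has not run yet).
 *		return;
 *	}
 */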
int
spdk_reactors_init(size_t msg_mempool_size)
{
	struct spdk_reactor *reactor;
	int rc;
	uint32_t i, current_core;
	char mempool_name[32];

	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
			       262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			       sizeof(struct spdk_event),
			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
			       SPDK_ENV_SOCKET_ID_ANY);

	if (g_spdk_event_mempool == NULL) {
		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
		return -1;
	}

	/* struct spdk_reactor must be aligned on a 64-byte boundary */
	g_reactor_count = spdk_env_get_last_core() + 1;
	rc = posix_memalign((void **)&g_reactors, 64,
			    g_reactor_count * sizeof(struct spdk_reactor));
	if (rc != 0) {
		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
			    g_reactor_count);
		spdk_mempool_free(g_spdk_event_mempool);
		return -1;
	}

	g_core_infos = calloc(g_reactor_count, sizeof(*g_core_infos));
	if (g_core_infos == NULL) {
		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		return -ENOMEM;
	}

	memset(g_reactors, 0, g_reactor_count * sizeof(struct spdk_reactor));

	rc = spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
				      sizeof(struct spdk_lw_thread), msg_mempool_size);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to initialize the SPDK thread library\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		free(g_core_infos);
		return rc;
	}

	SPDK_ENV_FOREACH_CORE(i) {
		reactor_construct(&g_reactors[i], i);
	}

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	g_scheduling_reactor = reactor;

	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;

	return 0;
}

void
spdk_reactors_fini(void)
{
	uint32_t i;
	struct spdk_reactor *reactor;

	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
		return;
	}

	spdk_thread_lib_fini();

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		assert(reactor->thread_count == 0);
		if (reactor->events != NULL) {
			spdk_ring_free(reactor->events);
		}

		reactor_interrupt_fini(reactor);

		if (g_core_infos != NULL) {
			free(g_core_infos[i].thread_infos);
		}
	}

	spdk_mempool_free(g_spdk_event_mempool);

	free(g_reactors);
	g_reactors = NULL;
	free(g_core_infos);
	g_core_infos = NULL;
}
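/*
 * Typical lifecycle of the functions above, as driven by the app framework
 * (a minimal sketch; spdk_reactors_start() and spdk_reactors_stop() are
 * defined later in this file):
 *
 *	spdk_reactors_init(msg_mempool_size);	// allocate reactors, mempool
 *	...					// create threads, subsystems
 *	spdk_reactors_start();			// blocks until reactors exit
 *	// ... spdk_reactors_stop(NULL) is invoked from an event/thread ...
 *	spdk_reactors_fini();			// free rings, fds, mempool
 */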
static void _reactor_set_interrupt_mode(void *arg1, void *arg2);

static void
_reactor_set_notify_cpuset(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;
	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());

	assert(reactor != NULL);
	spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt);
}

static void
_event_call(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
{
	struct spdk_event *ev;

	ev = spdk_event_allocate(lcore, fn, arg1, arg2);
	assert(ev);
	spdk_event_call(ev);
}

static void
_reactor_set_notify_cpuset_cpl(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;

	if (target->new_in_interrupt == false) {
		target->set_interrupt_mode_in_progress = false;
		spdk_thread_send_msg(spdk_thread_get_app_thread(), target->set_interrupt_mode_cb_fn,
				     target->set_interrupt_mode_cb_arg);
	} else {
		_event_call(target->lcore, _reactor_set_interrupt_mode, target, NULL);
	}
}

static void
_reactor_set_thread_interrupt_mode(void *ctx)
{
	struct spdk_reactor *reactor = ctx;

	spdk_thread_set_interrupt_mode(reactor->in_interrupt);
}

static void
_reactor_set_interrupt_mode(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread, *tmp;

	assert(target == spdk_reactor_get(spdk_env_get_current_core()));
	assert(target != NULL);
	assert(target->in_interrupt != target->new_in_interrupt);
	SPDK_DEBUGLOG(reactor, "Do reactor set on core %u from %s to state %s\n",
		      target->lcore, target->in_interrupt ? "intr" : "poll",
		      target->new_in_interrupt ? "intr" : "poll");

	target->in_interrupt = target->new_in_interrupt;

	if (spdk_interrupt_mode_is_enabled()) {
		/* Align each spdk_thread with its reactor: interrupt mode or poll mode */
		TAILQ_FOREACH_SAFE(lw_thread, &target->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, target);
		}
	}

	if (target->new_in_interrupt == false) {
		/* The reactor is no longer in interrupt mode. Refresh tsc_last to accurately
		 * track reactor stats. */
		target->tsc_last = spdk_get_ticks();
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	} else {
		uint64_t notify = 1;
		int rc = 0;

		/* Always trigger the spdk_event and resched event in case of a race condition */
		rc = write(target->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
		rc = write(target->resched_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}

		target->set_interrupt_mode_in_progress = false;
		spdk_thread_send_msg(spdk_thread_get_app_thread(), target->set_interrupt_mode_cb_fn,
				     target->set_interrupt_mode_cb_arg);
	}
}
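/*
 * The notifications above ride on eventfd counters created in
 * reactor_interrupt_init() (near the end of this file). A minimal sketch of
 * the producer/consumer protocol, assuming efd was created with
 * eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC):
 *
 *	uint64_t notify = 1;
 *
 *	// Producer: add 1 to the counter; wakes up any epoll waiter.
 *	write(efd, &notify, sizeof(notify));
 *
 *	// Consumer: read returns and resets the counter; with EFD_NONBLOCK
 *	// it fails with EAGAIN when no notification is pending.
 *	read(efd, &notify, sizeof(notify));
 */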
int
spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt,
				spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg)
{
	struct spdk_reactor *target;

	target = spdk_reactor_get(lcore);
	if (target == NULL) {
		return -EINVAL;
	}

	/* Eventfd has to be supported in order to use interrupt functionality. */
	if (target->fgrp == NULL) {
		return -ENOTSUP;
	}

	if (spdk_get_thread() != spdk_thread_get_app_thread()) {
		SPDK_ERRLOG("Setting the interrupt mode is only permitted from the SPDK application thread.\n");
		return -EPERM;
	}

	if (target->in_interrupt == new_in_interrupt) {
		cb_fn(cb_arg);
		return 0;
	}

	if (target->set_interrupt_mode_in_progress) {
		SPDK_NOTICELOG("Reactor(%u) already has an interrupt mode change in progress\n", lcore);
		return -EBUSY;
	}
	target->set_interrupt_mode_in_progress = true;

	target->new_in_interrupt = new_in_interrupt;
	target->set_interrupt_mode_cb_fn = cb_fn;
	target->set_interrupt_mode_cb_arg = cb_arg;

	SPDK_DEBUGLOG(reactor, "Starting reactor event from %d to %d\n",
		      spdk_env_get_current_core(), lcore);

	if (new_in_interrupt == false) {
		/* For potential race cases, when setting the reactor to poll mode,
		 * first change the mode of the reactor and then clear the corresponding
		 * bit of the notify_cpuset of each reactor.
		 */
		_event_call(lcore, _reactor_set_interrupt_mode, target, NULL);
	} else {
		/* For race cases, when setting the reactor to interrupt mode, first set the
		 * corresponding bit of the notify_cpuset of each reactor and then change the mode.
		 */
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	}

	return 0;
}

struct spdk_event *
spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
{
	struct spdk_event *event = NULL;
	struct spdk_reactor *reactor = spdk_reactor_get(lcore);

	if (!reactor) {
		assert(false);
		return NULL;
	}

	event = spdk_mempool_get(g_spdk_event_mempool);
	if (event == NULL) {
		assert(false);
		return NULL;
	}

	event->lcore = lcore;
	event->fn = fn;
	event->arg1 = arg1;
	event->arg2 = arg2;

	return event;
}

void
spdk_event_call(struct spdk_event *event)
{
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_core = spdk_env_get_current_core();

	reactor = spdk_reactor_get(event->lcore);

	assert(reactor != NULL);
	assert(reactor->events != NULL);

	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
	if (rc != 1) {
		assert(false);
	}

	if (current_core != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_core);
	}

	/* If spdk_event_call() isn't called on a reactor, always send a notification.
	 * If it is called on a reactor, send a notification if the destination reactor
	 * is indicated to be in interrupt mode.
	 */
	if (spdk_unlikely(local_reactor == NULL) ||
	    spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) {
		uint64_t notify = 1;

		rc = write(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
	}
}
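/*
 * Example (illustrative only): sending a one-shot function call to core 2.
 * The event is executed by event_queue_run_batch() on the target reactor;
 * the callback name is hypothetical.
 *
 *	static void
 *	say_hello(void *arg1, void *arg2)
 *	{
 *		SPDK_NOTICELOG("hello from core %u\n", spdk_env_get_current_core());
 *	}
 *
 *	struct spdk_event *ev = spdk_event_allocate(2, say_hello, NULL, NULL);
 *
 *	if (ev != NULL) {
 *		spdk_event_call(ev);
 *	}
 */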
static inline int
event_queue_run_batch(void *arg)
{
	struct spdk_reactor *reactor = arg;
	size_t count, i;
	void *events[SPDK_EVENT_BATCH_SIZE];
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(events, 0, sizeof(events));
#endif

	/* Handle the event notification if this reactor currently runs in interrupt state */
	if (spdk_unlikely(reactor->in_interrupt)) {
		uint64_t notify = 1;
		int rc;

		/* There may be a race between this reactor's event_acknowledge (read) and
		 * another producer's event_notify (write), so acknowledge first and only then
		 * check for our own event_notify. This ordering ensures no event notification
		 * is lost.
		 */
		rc = read(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno));
			return -errno;
		}

		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);

		if (spdk_ring_count(reactor->events) != 0) {
			/* Trigger a new notification if there are still events in the event
			 * queue waiting to be processed. */
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
				return -errno;
			}
		}
	} else {
		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
	}

	if (count == 0) {
		return 0;
	}

	/* Execute the events. There are still some remaining events
	 * that must occur on an SPDK thread. To accommodate those, try to
	 * run them on the first thread in the list, if it exists. */
	lw_thread = TAILQ_FIRST(&reactor->threads);
	if (lw_thread) {
		thread = spdk_thread_get_from_ctx(lw_thread);
	} else {
		thread = NULL;
	}

	for (i = 0; i < count; i++) {
		struct spdk_event *event = events[i];

		assert(event != NULL);
		spdk_set_thread(thread);

		SPDK_DTRACE_PROBE3(event_exec, event->fn,
				   event->arg1, event->arg2);
		event->fn(event->arg1, event->arg2);
		spdk_set_thread(NULL);
	}

	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);

	return (int)count;
}
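/*
 * Why event_queue_run_batch() acknowledges before draining: with the
 * opposite order, a wakeup could be lost. A sketch of the interleaving
 * that the chosen order makes safe:
 *
 *	producer (spdk_event_call)	consumer (event_queue_run_batch)
 *	--------------------------	--------------------------------
 *					read(events_fd)		// ack first
 *	spdk_ring_enqueue(ev)
 *	write(events_fd)		// new wakeup now pending
 *					spdk_ring_dequeue(...)	// may miss ev
 *
 * Even if the dequeue races past ev, the producer's write() happened after
 * the read(), so the eventfd stays signaled and the fd group will invoke
 * event_queue_run_batch() again.
 */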
/* 1s */
#define CONTEXT_SWITCH_MONITOR_PERIOD 1000000

static int
get_rusage(struct spdk_reactor *reactor)
{
	struct rusage rusage;

	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
		return -1;
	}

	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
		SPDK_INFOLOG(reactor,
			     "Reactor %u: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
	}
	reactor->rusage = rusage;

	return 0;
}

void
spdk_framework_enable_context_switch_monitor(bool enable)
{
	/* This global is being read by multiple threads, so this isn't
	 * strictly thread safe. However, we're toggling between true and
	 * false here, and if a thread sees the value update later than it
	 * should, it's no big deal. */
	g_framework_context_switch_monitor_enabled = enable;
}

bool
spdk_framework_context_switch_monitor_enabled(void)
{
	return g_framework_context_switch_monitor_enabled;
}

static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
	pthread_setname_np(pthread_self(), thread_name);
#endif
}

static void
_init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
	struct spdk_thread_stats prev_total_stats;

	/* Read total_stats before updating it to calculate stats during the last scheduling period. */
	prev_total_stats = lw_thread->total_stats;

	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->total_stats);
	spdk_set_thread(NULL);

	lw_thread->current_stats.busy_tsc = lw_thread->total_stats.busy_tsc - prev_total_stats.busy_tsc;
	lw_thread->current_stats.idle_tsc = lw_thread->total_stats.idle_tsc - prev_total_stats.idle_tsc;
}

static void
_threads_reschedule_thread(struct spdk_scheduler_thread_info *thread_info)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread *thread;

	thread = spdk_thread_get_by_id(thread_info->thread_id);
	if (thread == NULL) {
		/* Thread no longer exists. */
		return;
	}
	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);

	lw_thread->lcore = thread_info->lcore;
	lw_thread->resched = true;
}

static void
_threads_reschedule(struct spdk_scheduler_core_info *cores_info)
{
	struct spdk_scheduler_core_info *core;
	struct spdk_scheduler_thread_info *thread_info;
	uint32_t i, j;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &cores_info[i];
		for (j = 0; j < core->threads_count; j++) {
			thread_info = &core->thread_infos[j];
			if (thread_info->lcore != i) {
				_threads_reschedule_thread(thread_info);
			}
		}
		core->threads_count = 0;
		free(core->thread_infos);
		core->thread_infos = NULL;
	}
}

static void
_reactors_scheduler_fini(void)
{
	/* Reschedule based on the balancing output */
	_threads_reschedule(g_core_infos);

	g_scheduling_in_progress = false;
}

static void
_reactors_scheduler_update_core_mode(void *ctx)
{
	struct spdk_reactor *reactor;
	uint32_t i;
	int rc = 0;

	for (i = g_scheduler_core_number; i < SPDK_ENV_LCORE_ID_ANY; i = spdk_env_get_next_core(i)) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		if (reactor->in_interrupt != g_core_infos[i].interrupt_mode) {
			/* Switch the next found reactor to the new state */
			rc = spdk_reactor_set_interrupt_mode(i, g_core_infos[i].interrupt_mode,
							     _reactors_scheduler_update_core_mode, NULL);
			if (rc == 0) {
				/* Set the core to start with after the callback completes */
				g_scheduler_core_number = spdk_env_get_next_core(i);
				return;
			}
		}
	}
	_reactors_scheduler_fini();
}

static void
_reactors_scheduler_cancel(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core;
	uint32_t i;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &g_core_infos[i];
		core->threads_count = 0;
		free(core->thread_infos);
		core->thread_infos = NULL;
	}

	g_scheduling_in_progress = false;
}

static void
_reactors_scheduler_balance(void *arg1, void *arg2)
{
	struct spdk_scheduler *scheduler = spdk_scheduler_get();

	if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING || scheduler == NULL) {
		_reactors_scheduler_cancel(NULL, NULL);
		return;
	}

	scheduler->balance(g_core_infos, g_reactor_count);

	g_scheduler_core_number = spdk_env_get_first_core();
	_reactors_scheduler_update_core_mode(NULL);
}
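/*
 * Overview of one scheduling round implemented by the functions above and
 * below. The round is driven entirely by events hopping between reactors:
 *
 *	_reactors_scheduler_gather_metrics()	// phase 1: hops core to core
 *	  -> _reactors_scheduler_balance()	// phase 2: on the scheduling
 *						// reactor
 *	  -> _reactors_scheduler_update_core_mode()	// phase 3: flip
 *							// intr/poll modes
 *	  -> _reactors_scheduler_fini() -> _threads_reschedule()
 *						// phase 4: move threads
 *
 * On allocation failure or framework shutdown, a phase bails out through
 * _reactors_scheduler_cancel(), which frees the per-core thread_infos and
 * clears g_scheduling_in_progress.
 */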
/* Phase 1 of thread scheduling is to gather metrics on the existing threads. */
static void
_reactors_scheduler_gather_metrics(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core_info;
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread *thread;
	struct spdk_reactor *reactor;
	uint32_t next_core;
	uint32_t i = 0;

	reactor = spdk_reactor_get(spdk_env_get_current_core());
	assert(reactor != NULL);
	core_info = &g_core_infos[reactor->lcore];
	core_info->lcore = reactor->lcore;
	core_info->current_idle_tsc = reactor->idle_tsc - core_info->total_idle_tsc;
	core_info->total_idle_tsc = reactor->idle_tsc;
	core_info->current_busy_tsc = reactor->busy_tsc - core_info->total_busy_tsc;
	core_info->total_busy_tsc = reactor->busy_tsc;
	core_info->interrupt_mode = reactor->in_interrupt;
	core_info->threads_count = 0;

	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);

	if (reactor->thread_count > 0) {
		core_info->thread_infos = calloc(reactor->thread_count, sizeof(*core_info->thread_infos));
		if (core_info->thread_infos == NULL) {
			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);

			/* Cancel this round of scheduling work */
			_event_call(g_scheduling_reactor->lcore, _reactors_scheduler_cancel, NULL, NULL);
			return;
		}

		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
			_init_thread_stats(reactor, lw_thread);

			core_info->thread_infos[i].lcore = lw_thread->lcore;
			thread = spdk_thread_get_from_ctx(lw_thread);
			assert(thread != NULL);
			core_info->thread_infos[i].thread_id = spdk_thread_get_id(thread);
			core_info->thread_infos[i].total_stats = lw_thread->total_stats;
			core_info->thread_infos[i].current_stats = lw_thread->current_stats;
			core_info->threads_count++;
			assert(core_info->threads_count <= reactor->thread_count);
			i++;
		}
	}

	next_core = spdk_env_get_next_core(reactor->lcore);
	if (next_core == UINT32_MAX) {
		next_core = spdk_env_get_first_core();
	}

	/* If we've looped back around to the scheduler thread, move to the next phase */
	if (next_core == g_scheduling_reactor->lcore) {
		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
		_event_call(next_core, _reactors_scheduler_balance, NULL, NULL);
		return;
	}

	_event_call(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
}
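/*
 * The current_*_tsc fields above are per-period deltas of monotonically
 * growing counters. A worked example with made-up numbers: if a core's
 * total_busy_tsc was 1,000,000 after the previous round and reactor->busy_tsc
 * reads 1,500,000 now, then current_busy_tsc = 500,000 ticks of busy time in
 * the last scheduling period, and total_busy_tsc is updated to 1,500,000 for
 * the next round. _init_thread_stats() applies the same scheme to each
 * thread's busy_tsc/idle_tsc.
 */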
static int _reactor_schedule_thread(struct spdk_thread *thread);
static uint64_t g_rusage_period;

static void
_reactor_remove_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
	int efd;

	TAILQ_REMOVE(&reactor->threads, lw_thread, link);
	assert(reactor->thread_count > 0);
	reactor->thread_count--;

	/* Remove the thread's interrupt fd from the fd group if running with full interrupt ability */
	if (spdk_interrupt_mode_is_enabled()) {
		efd = spdk_thread_get_interrupt_fd(thread);
		spdk_fd_group_remove(reactor->fgrp, efd);
	}
}

static bool
reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);

	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
			  spdk_thread_is_idle(thread))) {
		_reactor_remove_lw_thread(reactor, lw_thread);
		spdk_thread_destroy(thread);
		return true;
	}

	if (spdk_unlikely(lw_thread->resched)) {
		lw_thread->resched = false;
		_reactor_remove_lw_thread(reactor, lw_thread);
		_reactor_schedule_thread(thread);
		return true;
	}

	return false;
}

static void
reactor_interrupt_run(struct spdk_reactor *reactor)
{
	int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */

	spdk_fd_group_wait(reactor->fgrp, block_timeout);
}

static void
_reactor_run(struct spdk_reactor *reactor)
{
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread, *tmp;
	uint64_t now;
	int rc;

	event_queue_run_batch(reactor);

	/* If no threads are present on the reactor,
	 * tsc_last gets outdated. Update it to track
	 * thread execution time correctly. */
	if (spdk_unlikely(TAILQ_EMPTY(&reactor->threads))) {
		now = spdk_get_ticks();
		reactor->idle_tsc += now - reactor->tsc_last;
		reactor->tsc_last = now;
		return;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);

		now = spdk_thread_get_last_tsc(thread);
		if (rc == 0) {
			reactor->idle_tsc += now - reactor->tsc_last;
		} else if (rc > 0) {
			reactor->busy_tsc += now - reactor->tsc_last;
		}
		reactor->tsc_last = now;

		reactor_post_process_lw_thread(reactor, lw_thread);
	}
}
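/*
 * A short trace of the busy/idle accounting in _reactor_run(), with made-up
 * tick values: suppose tsc_last = 100. spdk_thread_poll() on the first
 * thread does work (rc > 0) and spdk_thread_get_last_tsc() returns 160, so
 * busy_tsc += 60 and tsc_last = 160. The next thread is idle (rc == 0) and
 * finishes at 175, so idle_tsc += 15 and tsc_last = 175. Each tick is thus
 * attributed to exactly one bucket of exactly one thread's run.
 */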
static int
reactor_run(void *arg)
{
	struct spdk_reactor *reactor = arg;
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread, *tmp;
	char thread_name[32];
	uint64_t last_sched = 0;

	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);

	/* Rename the POSIX thread because the reactor is tied to the POSIX
	 * thread in the SPDK event library.
	 */
	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
	_set_thread_name(thread_name);

	reactor->tsc_last = spdk_get_ticks();

	while (1) {
		/* Execute the interrupt process fn if this reactor currently runs in interrupt state */
		if (spdk_unlikely(reactor->in_interrupt)) {
			reactor_interrupt_run(reactor);
		} else {
			_reactor_run(reactor);
		}

		if (g_framework_context_switch_monitor_enabled) {
			if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
				get_rusage(reactor);
				reactor->last_rusage = reactor->tsc_last;
			}
		}

		if (spdk_unlikely(g_scheduler_period > 0 &&
				  (reactor->tsc_last - last_sched) > g_scheduler_period &&
				  reactor == g_scheduling_reactor &&
				  !g_scheduling_in_progress)) {
			last_sched = reactor->tsc_last;
			g_scheduling_in_progress = true;
			_reactors_scheduler_gather_metrics(NULL, NULL);
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}
	}

	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		/* All threads should have already had spdk_thread_exit() called on them, except
		 * for the app thread.
		 */
		if (spdk_thread_is_running(thread)) {
			if (thread != spdk_thread_get_app_thread()) {
				SPDK_ERRLOG("spdk_thread_exit() was not called on thread '%s'\n",
					    spdk_thread_get_name(thread));
				SPDK_ERRLOG("This will result in a non-zero exit code in a future release.\n");
			}
			spdk_set_thread(thread);
			spdk_thread_exit(thread);
		}
	}

	while (!TAILQ_EMPTY(&reactor->threads)) {
		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			spdk_set_thread(thread);
			if (spdk_thread_is_exited(thread)) {
				_reactor_remove_lw_thread(reactor, lw_thread);
				spdk_thread_destroy(thread);
			} else {
				spdk_thread_poll(thread, 0, 0);
			}
		}
	}

	return 0;
}
int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int ret;
	const struct spdk_cpuset *validmask;

	ret = spdk_cpuset_parse(cpumask, mask);
	if (ret < 0) {
		return ret;
	}

	validmask = spdk_app_get_core_mask();
	spdk_cpuset_and(cpumask, validmask);

	return 0;
}

const struct spdk_cpuset *
spdk_app_get_core_mask(void)
{
	return &g_reactor_core_mask;
}
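/*
 * Example (illustrative only): parsing a user-supplied mask. With reactors
 * running on cores 0-3, parsing "0x6" yields cores 1 and 2; the
 * spdk_cpuset_and() above then drops any core outside the app's mask.
 *
 *	struct spdk_cpuset cpuset = {};
 *
 *	if (spdk_app_parse_core_mask("0x6", &cpuset) == 0) {
 *		assert(spdk_cpuset_count(&cpuset) <= 2);
 *	}
 */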
void
spdk_reactors_start(void)
{
	struct spdk_reactor *reactor;
	uint32_t i, current_core;
	int rc;

	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
	/* Reinitialize to false, in case the app framework is restarting in the same process. */
	g_stopping_reactors = false;

	current_core = spdk_env_get_current_core();
	SPDK_ENV_FOREACH_CORE(i) {
		if (i != current_core) {
			reactor = spdk_reactor_get(i);
			if (reactor == NULL) {
				continue;
			}

			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
			if (rc < 0) {
				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
				assert(false);
				return;
			}
		}
		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
	}

	/* Start the main reactor */
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	reactor_run(reactor);

	spdk_env_thread_wait_all();

	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
}

static void
_reactors_stop(void *arg1, void *arg2)
{
	uint32_t i;
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor;
	uint64_t notify = 1;

	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
	local_reactor = spdk_reactor_get(spdk_env_get_current_core());

	SPDK_ENV_FOREACH_CORE(i) {
		/* If spdk_event_call() isn't called on a reactor, always send a notification.
		 * If it is called on a reactor, send a notification if the destination reactor
		 * is indicated to be in interrupt mode.
		 */
		if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) {
			reactor = spdk_reactor_get(i);
			assert(reactor != NULL);
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i,
					    spdk_strerror(errno));
				continue;
			}
		}
	}
}

static void
nop(void *arg1, void *arg2)
{
}

void
spdk_reactors_stop(void *arg1)
{
	spdk_for_each_reactor(nop, NULL, NULL, _reactors_stop);
}

static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
static uint32_t g_next_core = UINT32_MAX;

static int
thread_process_interrupts(void *arg)
{
	struct spdk_thread *thread = arg;
	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());
	uint64_t now;
	int rc;

	assert(reactor != NULL);

	/* Update idle_tsc between the end of the last intr_fn and the start of this intr_fn. */
	now = spdk_get_ticks();
	reactor->idle_tsc += now - reactor->tsc_last;
	reactor->tsc_last = now;

	rc = spdk_thread_poll(thread, 0, now);

	/* Update the tsc between the start and the end of this intr_fn. */
	now = spdk_thread_get_last_tsc(thread);
	if (rc == 0) {
		reactor->idle_tsc += now - reactor->tsc_last;
	} else if (rc > 0) {
		reactor->busy_tsc += now - reactor->tsc_last;
	}
	reactor->tsc_last = now;

	return rc;
}

static void
_schedule_thread(void *arg1, void *arg2)
{
	struct spdk_lw_thread *lw_thread = arg1;
	struct spdk_thread *thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;
	int efd;

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	/* Update total_stats to reflect the state of the thread
	 * at the end of the move. */
	thread = spdk_thread_get_from_ctx(lw_thread);
	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->total_stats);
	spdk_set_thread(NULL);

	lw_thread->lcore = current_core;

	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
	reactor->thread_count++;

	/* Register the thread's interrupt fd if running with full interrupt ability */
	if (spdk_interrupt_mode_is_enabled()) {
		int rc;

		efd = spdk_thread_get_interrupt_fd(thread);
		rc = SPDK_FD_GROUP_ADD(reactor->fgrp, efd,
				       thread_process_interrupts, thread);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
		}

		/* Align the spdk_thread with the reactor: interrupt mode or poll mode */
		spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, reactor);
	}
}
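/*
 * In full interrupt mode, thread_process_interrupts() above is the fd_group
 * callback for each thread's interrupt fd, and it splits wall time into the
 * two reactor buckets: the gap since the previous callback ended counts as
 * idle_tsc, while the spdk_thread_poll() run itself lands in busy_tsc or
 * idle_tsc depending on its return code. For example, with tsc_last = 200,
 * a callback entered at 260 that polls until 290 doing real work adds
 * 60 ticks of idle and 30 ticks of busy time.
 */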
static int
_reactor_schedule_thread(struct spdk_thread *thread)
{
	uint32_t core;
	struct spdk_lw_thread *lw_thread;
	struct spdk_event *evt = NULL;
	struct spdk_cpuset *cpumask;
	uint32_t i;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_lcore = spdk_env_get_current_core();
	struct spdk_cpuset polling_cpumask;
	struct spdk_cpuset valid_cpumask;

	cpumask = spdk_thread_get_cpumask(thread);

	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);
	core = lw_thread->lcore;
	memset(lw_thread, 0, sizeof(*lw_thread));

	if (current_lcore != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_lcore);
		assert(local_reactor);
	}

	/* When the interrupt ability of the spdk_thread is not enabled and the current
	 * reactor runs on a DPDK thread, skip reactors which are in interrupt mode.
	 */
	if (!spdk_interrupt_mode_is_enabled() && local_reactor != NULL) {
		/* Get the cpumask of all reactors in polling mode */
		spdk_cpuset_zero(&polling_cpumask);
		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&polling_cpumask, i, true);
		}
		spdk_cpuset_xor(&polling_cpumask, &local_reactor->notify_cpuset);

		if (core == SPDK_ENV_LCORE_ID_ANY) {
			/* Get the cpumask of all valid reactors which are suggested and also in polling mode */
			spdk_cpuset_copy(&valid_cpumask, &polling_cpumask);
			spdk_cpuset_and(&valid_cpumask, spdk_thread_get_cpumask(thread));

			/* If there are any valid reactors, the spdk_thread should be scheduled
			 * onto one of the valid reactors.
			 * If there are no valid reactors, the spdk_thread should be scheduled
			 * onto one of the polling reactors.
			 */
			if (spdk_cpuset_count(&valid_cpumask) != 0) {
				cpumask = &valid_cpumask;
			} else {
				cpumask = &polling_cpumask;
			}
		} else if (!spdk_cpuset_get_cpu(&polling_cpumask, core)) {
			/* If the specified reactor is not in polling mode, the spdk_thread
			 * should be scheduled onto one of the polling reactors.
			 */
			core = SPDK_ENV_LCORE_ID_ANY;
			cpumask = &polling_cpumask;
		}
	}

	pthread_mutex_lock(&g_scheduler_mtx);
	if (core == SPDK_ENV_LCORE_ID_ANY) {
		for (i = 0; i < spdk_env_get_core_count(); i++) {
			if (g_next_core >= g_reactor_count) {
				g_next_core = spdk_env_get_first_core();
			}
			core = g_next_core;
			g_next_core = spdk_env_get_next_core(g_next_core);

			if (spdk_cpuset_get_cpu(cpumask, core)) {
				break;
			}
		}
	}

	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);

	pthread_mutex_unlock(&g_scheduler_mtx);

	assert(evt != NULL);
	if (evt == NULL) {
		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
		return -1;
	}

	lw_thread->tsc_start = spdk_get_ticks();

	spdk_event_call(evt);

	return 0;
}
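/*
 * A worked example of the round-robin selection above: cores {0,1,2,3},
 * g_next_core = 1, and a thread whose cpumask allows only core 2. The loop
 * tries core 1 (not in the cpumask), then core 2 (match) and breaks, leaving
 * g_next_core = 3 for the next placement. If no core in the mask is found
 * after spdk_env_get_core_count() probes, the last probed core is used.
 */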
static void
_reactor_request_thread_reschedule(struct spdk_thread *thread)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;

	assert(thread == spdk_get_thread());

	lw_thread = spdk_thread_get_ctx(thread);

	assert(lw_thread != NULL);
	lw_thread->resched = true;
	lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	/* Send a notification if the destination reactor is indicated to be in intr mode */
	if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) {
		uint64_t notify = 1;

		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}
	}
}

static int
reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
{
	struct spdk_lw_thread *lw_thread;

	switch (op) {
	case SPDK_THREAD_OP_NEW:
		lw_thread = spdk_thread_get_ctx(thread);
		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
		return _reactor_schedule_thread(thread);
	case SPDK_THREAD_OP_RESCHED:
		_reactor_request_thread_reschedule(thread);
		return 0;
	default:
		return -ENOTSUP;
	}
}

static bool
reactor_thread_op_supported(enum spdk_thread_op op)
{
	switch (op) {
	case SPDK_THREAD_OP_NEW:
	case SPDK_THREAD_OP_RESCHED:
		return true;
	default:
		return false;
	}
}

struct call_reactor {
	uint32_t cur_core;
	spdk_event_fn fn;
	void *arg1;
	void *arg2;

	uint32_t orig_core;
	spdk_event_fn cpl;
};

static void
on_reactor(void *arg1, void *arg2)
{
	struct call_reactor *cr = arg1;
	struct spdk_event *evt;

	cr->fn(cr->arg1, cr->arg2);

	cr->cur_core = spdk_env_get_next_core(cr->cur_core);

	if (cr->cur_core >= g_reactor_count) {
		SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");

		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
		free(cr);
	} else {
		SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %d\n",
			      cr->cur_core);

		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
	}
	assert(evt != NULL);
	spdk_event_call(evt);
}

void
spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
{
	struct call_reactor *cr;

	/* When the application framework is shutting down, we will send one
	 * final for_each_reactor operation with completion callback _reactors_stop,
	 * to flush any existing for_each_reactor operations to avoid any memory
	 * leaks. We use a mutex here to protect a boolean flag that will ensure
	 * we don't start any more operations once we've started shutting down.
	 */
	pthread_mutex_lock(&g_stopping_reactors_mtx);
	if (g_stopping_reactors) {
		pthread_mutex_unlock(&g_stopping_reactors_mtx);
		return;
	} else if (cpl == _reactors_stop) {
		g_stopping_reactors = true;
	}
	pthread_mutex_unlock(&g_stopping_reactors_mtx);

	cr = calloc(1, sizeof(*cr));
	if (!cr) {
		SPDK_ERRLOG("Unable to perform reactor iteration\n");
		cpl(arg1, arg2);
		return;
	}

	cr->fn = fn;
	cr->arg1 = arg1;
	cr->arg2 = arg2;
	cr->cpl = cpl;
	cr->orig_core = spdk_env_get_current_core();
	cr->cur_core = spdk_env_get_first_core();

	SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %d\n", cr->orig_core);

	_event_call(cr->cur_core, on_reactor, cr, NULL);
}
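/*
 * Example (illustrative only, hypothetical callback names): run a function
 * on every reactor in turn, then get a completion callback on the calling
 * core once the iteration has wrapped around.
 *
 *	static void
 *	bump_counter(void *arg1, void *arg2)
 *	{
 *		// Runs once per reactor, on that reactor.
 *	}
 *
 *	static void
 *	bump_done(void *arg1, void *arg2)
 *	{
 *		// Runs on the originating core after every reactor ran bump_counter.
 *	}
 *
 *	spdk_for_each_reactor(bump_counter, NULL, NULL, bump_done);
 */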
#ifdef __linux__
static int
reactor_schedule_thread_event(void *arg)
{
	struct spdk_reactor *reactor = arg;
	struct spdk_lw_thread *lw_thread, *tmp;
	uint32_t count = 0;
	uint64_t notify = 1;

	assert(reactor->in_interrupt);

	if (read(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
		SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno));
		return -errno;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
	}

	return count;
}

static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	int rc;

	rc = spdk_fd_group_create(&reactor->fgrp);
	if (rc != 0) {
		return rc;
	}

	reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->resched_fd < 0) {
		rc = -EBADF;
		goto err;
	}

	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event,
			       reactor);
	if (rc) {
		close(reactor->resched_fd);
		goto err;
	}

	reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->events_fd < 0) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);

		rc = -EBADF;
		goto err;
	}

	rc = SPDK_FD_GROUP_ADD(reactor->fgrp, reactor->events_fd,
			       event_queue_run_batch, reactor);
	if (rc) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);
		close(reactor->events_fd);
		goto err;
	}

	return 0;

err:
	spdk_fd_group_destroy(reactor->fgrp);
	reactor->fgrp = NULL;
	return rc;
}
#else
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	return -ENOTSUP;
}
#endif

static void
reactor_interrupt_fini(struct spdk_reactor *reactor)
{
	struct spdk_fd_group *fgrp = reactor->fgrp;

	if (!fgrp) {
		return;
	}

	spdk_fd_group_remove(fgrp, reactor->events_fd);
	spdk_fd_group_remove(fgrp, reactor->resched_fd);

	close(reactor->events_fd);
	close(reactor->resched_fd);

	spdk_fd_group_destroy(fgrp);
	reactor->fgrp = NULL;
}

static struct spdk_governor *
_governor_find(const char *name)
{
	struct spdk_governor *governor, *tmp;

	TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
		if (strcmp(name, governor->name) == 0) {
			return governor;
		}
	}

	return NULL;
}

int
spdk_governor_set(const char *name)
{
	struct spdk_governor *governor;
	int rc = 0;

	/* NULL governor was specifically requested */
	if (name == NULL) {
		if (g_governor) {
			g_governor->deinit();
		}
		g_governor = NULL;
		return 0;
	}

	governor = _governor_find(name);
	if (governor == NULL) {
		return -EINVAL;
	}

	if (g_governor == governor) {
		return 0;
	}

	rc = governor->init();
	if (rc == 0) {
		if (g_governor) {
			g_governor->deinit();
		}
		g_governor = governor;
	}

	return rc;
}

struct spdk_governor *
spdk_governor_get(void)
{
	return g_governor;
}

void
spdk_governor_register(struct spdk_governor *governor)
{
	if (_governor_find(governor->name)) {
		SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_governor_list, governor, link);
}

SPDK_LOG_REGISTER_COMPONENT(reactor)