/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/likely.h"

#include "spdk_internal/event.h"

#include "spdk/log.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/string.h"
#include "spdk/fd_group.h"

#ifdef __linux__
#include <sys/prctl.h>
#include <sys/eventfd.h>
#endif

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

#define SPDK_EVENT_BATCH_SIZE 8

static struct spdk_reactor *g_reactors;
static uint32_t g_reactor_count;
static struct spdk_cpuset g_reactor_core_mask;
static enum spdk_reactor_state g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;

static bool g_framework_context_switch_monitor_enabled = true;

static struct spdk_mempool *g_spdk_event_mempool = NULL;

TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);

static struct spdk_scheduler *g_scheduler;
static struct spdk_scheduler *g_new_scheduler;
static struct spdk_reactor *g_scheduling_reactor;
static uint64_t g_scheduler_period;
static uint32_t g_scheduler_core_number;
static struct spdk_scheduler_core_info *g_core_infos = NULL;

TAILQ_HEAD(, spdk_governor) g_governor_list
	= TAILQ_HEAD_INITIALIZER(g_governor_list);

static int _governor_get_capabilities(uint32_t lcore_id,
				      struct spdk_governor_capabilities *capabilities);

static struct spdk_governor g_governor = {
	.name = "default",
	.get_core_capabilities = _governor_get_capabilities,
};

static int reactor_interrupt_init(struct spdk_reactor *reactor);
static void reactor_interrupt_fini(struct spdk_reactor *reactor);

static struct spdk_scheduler *
_scheduler_find(char *name)
{
	struct spdk_scheduler *tmp;

	TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
		if (strcmp(name, tmp->name) == 0) {
			return tmp;
		}
	}

	return NULL;
}
int
_spdk_scheduler_set(char *name)
{
	struct spdk_scheduler *scheduler;

	scheduler = _scheduler_find(name);
	if (scheduler == NULL) {
		SPDK_ERRLOG("Requested scheduler is missing\n");
		return -ENOENT;
	}

	if (g_scheduling_reactor->flags.is_scheduling) {
		if (g_scheduler != g_new_scheduler) {
			/* Scheduler already changed, cannot defer multiple deinits */
			return -EBUSY;
		}
	} else {
		if (g_scheduler != NULL && g_scheduler->deinit != NULL) {
			g_scheduler->deinit(&g_governor);
		}
		g_scheduler = scheduler;
	}

	g_new_scheduler = scheduler;

	if (scheduler->init != NULL) {
		scheduler->init(&g_governor);
	}

	return 0;
}

struct spdk_scheduler *
_spdk_scheduler_get(void)
{
	return g_scheduler;
}

uint64_t
_spdk_scheduler_period_get(void)
{
	/* Convert from ticks to microseconds */
	return (g_scheduler_period * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
}

void
_spdk_scheduler_period_set(uint64_t period)
{
	/* Convert microseconds to ticks */
	g_scheduler_period = period * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
}

void
_spdk_scheduler_disable(void)
{
	g_scheduler_period = 0;
}

void
_spdk_scheduler_list_add(struct spdk_scheduler *scheduler)
{
	if (_scheduler_find(scheduler->name)) {
		SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
}
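/*
 * Illustrative sketch (editorial, not part of the original file): a custom
 * scheduler can be registered with _spdk_scheduler_list_add() from a
 * constructor and made active later via _spdk_scheduler_set(). The names
 * "my_scheduler" and "my_balance" are hypothetical, and the balance callback
 * signature is inferred from the call site in _reactors_scheduler_balance().
 *
 *	static void
 *	my_balance(struct spdk_scheduler_core_info *cores, uint32_t core_count,
 *		   struct spdk_governor *governor)
 *	{
 *		// Inspect cores[i].core_busy_tsc / core_idle_tsc and set
 *		// cores[i].threads[j]->new_lcore to move threads.
 *	}
 *
 *	static struct spdk_scheduler my_scheduler = {
 *		.name = "my_scheduler",
 *		.balance = my_balance,
 *	};
 *
 *	static void __attribute__((constructor))
 *	register_my_scheduler(void)
 *	{
 *		_spdk_scheduler_list_add(&my_scheduler);
 *	}
 */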
static void
reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
{
	reactor->lcore = lcore;
	reactor->flags.is_valid = true;

	TAILQ_INIT(&reactor->threads);
	reactor->thread_count = 0;
	spdk_cpuset_zero(&reactor->notify_cpuset);

	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (reactor->events == NULL) {
		SPDK_ERRLOG("Failed to allocate events ring\n");
		assert(false);
	}

	/* Always initialize interrupt facilities for the reactor */
	if (reactor_interrupt_init(reactor) != 0) {
		/* Reactor interrupt facilities are necessary if the app is set to interrupt mode. */
		if (spdk_interrupt_mode_is_enabled()) {
			SPDK_ERRLOG("Failed to prepare intr facilities\n");
			assert(false);
		}
		return;
	}

	/* If the application runs with full interrupt ability,
	 * all reactors are going to run in interrupt mode.
	 */
	if (spdk_interrupt_mode_is_enabled()) {
		uint32_t i;

		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true);
		}
		reactor->in_interrupt = true;
	}
}

struct spdk_reactor *
spdk_reactor_get(uint32_t lcore)
{
	struct spdk_reactor *reactor;

	if (g_reactors == NULL) {
		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
		return NULL;
	}

	if (lcore >= g_reactor_count) {
		return NULL;
	}

	reactor = &g_reactors[lcore];

	if (reactor->flags.is_valid == false) {
		return NULL;
	}

	return reactor;
}

struct spdk_reactor *
_spdk_get_scheduling_reactor(void)
{
	return g_scheduling_reactor;
}

static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
static bool reactor_thread_op_supported(enum spdk_thread_op op);

int
spdk_reactors_init(void)
{
	struct spdk_reactor *reactor;
	int rc;
	uint32_t i, current_core;
	char mempool_name[32];

	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
			       262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			       sizeof(struct spdk_event),
			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
			       SPDK_ENV_SOCKET_ID_ANY);

	if (g_spdk_event_mempool == NULL) {
		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
		return -1;
	}

	/* struct spdk_reactor must be aligned on 64 byte boundary */
	g_reactor_count = spdk_env_get_last_core() + 1;
	rc = posix_memalign((void **)&g_reactors, 64,
			    g_reactor_count * sizeof(struct spdk_reactor));
	if (rc != 0) {
		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
			    g_reactor_count);
		spdk_mempool_free(g_spdk_event_mempool);
		return -1;
	}

	g_core_infos = calloc(g_reactor_count, sizeof(*g_core_infos));
	if (g_core_infos == NULL) {
		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		return -ENOMEM;
	}

	memset(g_reactors, 0, (g_reactor_count) * sizeof(struct spdk_reactor));

	spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
				 sizeof(struct spdk_lw_thread));

	SPDK_ENV_FOREACH_CORE(i) {
		reactor_construct(&g_reactors[i], i);
	}

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	g_scheduling_reactor = reactor;

	/* set default scheduling period to one second */
	g_scheduler_period = spdk_get_ticks_hz();

	rc = _spdk_scheduler_set("static");
	assert(rc == 0);

	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;

	return 0;
}
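/*
 * Illustrative lifecycle sketch (editorial, not part of the original file):
 * the event framework is normally driven by spdk_app_start(), but at this
 * layer the reactor calls pair up as below. spdk_reactors_start() blocks
 * until spdk_reactors_stop() is invoked, typically from an event or a
 * signal handler.
 *
 *	if (spdk_reactors_init() == 0) {
 *		spdk_reactors_start();	// blocks until spdk_reactors_stop()
 *		spdk_reactors_fini();
 *	}
 */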
void
spdk_reactors_fini(void)
{
	uint32_t i;
	struct spdk_reactor *reactor;

	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
		return;
	}

	if (g_scheduler->deinit != NULL) {
		g_scheduler->deinit(&g_governor);
	}

	spdk_thread_lib_fini();

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		assert(reactor->thread_count == 0);
		if (reactor->events != NULL) {
			spdk_ring_free(reactor->events);
		}

		reactor_interrupt_fini(reactor);

		if (g_core_infos != NULL) {
			free(g_core_infos[i].threads);
		}
	}

	spdk_mempool_free(g_spdk_event_mempool);

	free(g_reactors);
	g_reactors = NULL;
	free(g_core_infos);
	g_core_infos = NULL;
}

static void _reactor_set_interrupt_mode(void *arg1, void *arg2);

static void
_reactor_set_notify_cpuset(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;
	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());

	spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt);
}

static void
_reactor_set_notify_cpuset_cpl(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;

	if (target->new_in_interrupt == false) {
		target->set_interrupt_mode_in_progress = false;
		spdk_thread_send_msg(_spdk_get_app_thread(), target->set_interrupt_mode_cb_fn,
				     target->set_interrupt_mode_cb_arg);
	} else {
		struct spdk_event *ev;

		ev = spdk_event_allocate(target->lcore, _reactor_set_interrupt_mode, target, NULL);
		assert(ev);
		spdk_event_call(ev);
	}
}

static void
_reactor_set_interrupt_mode(void *arg1, void *arg2)
{
	struct spdk_reactor *target = arg1;

	assert(target == spdk_reactor_get(spdk_env_get_current_core()));
	assert(target != NULL);
	assert(target->in_interrupt != target->new_in_interrupt);
	assert(TAILQ_EMPTY(&target->threads));
	SPDK_DEBUGLOG(reactor, "Do reactor set on core %u from %s to state %s\n",
		      target->lcore, !target->in_interrupt ? "intr" : "poll",
		      target->new_in_interrupt ? "intr" : "poll");

	target->in_interrupt = target->new_in_interrupt;

	if (target->new_in_interrupt == false) {
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	} else {
		uint64_t notify = 1;
		int rc = 0;

		/* Always trigger the spdk_event and resched event in case of a race condition */
		rc = write(target->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
		rc = write(target->resched_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}

		target->set_interrupt_mode_in_progress = false;
		spdk_thread_send_msg(_spdk_get_app_thread(), target->set_interrupt_mode_cb_fn,
				     target->set_interrupt_mode_cb_arg);
	}
}
int
spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt,
				spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg)
{
	struct spdk_reactor *target;

	target = spdk_reactor_get(lcore);
	if (target == NULL) {
		return -EINVAL;
	}

	if (spdk_get_thread() != _spdk_get_app_thread()) {
		SPDK_ERRLOG("This function is only permitted from the SPDK application thread.\n");
		return -EPERM;
	}

	if (target->in_interrupt == new_in_interrupt) {
		return 0;
	}

	if (target->set_interrupt_mode_in_progress) {
		SPDK_NOTICELOG("Reactor(%u) is already in the process of changing interrupt mode\n", lcore);
		return -EBUSY;
	}
	target->set_interrupt_mode_in_progress = true;

	target->new_in_interrupt = new_in_interrupt;
	target->set_interrupt_mode_cb_fn = cb_fn;
	target->set_interrupt_mode_cb_arg = cb_arg;

	SPDK_DEBUGLOG(reactor, "Starting reactor event from %d to %d\n",
		      spdk_env_get_current_core(), lcore);

	if (new_in_interrupt == false) {
		/* For potential race cases, when setting the reactor to poll mode,
		 * first change the mode of the reactor and then clear the corresponding
		 * bit of the notify_cpuset of each reactor.
		 */
		struct spdk_event *ev;

		ev = spdk_event_allocate(lcore, _reactor_set_interrupt_mode, target, NULL);
		assert(ev);
		spdk_event_call(ev);
	} else {
		/* For race cases, when setting the reactor to interrupt mode, first set the
		 * corresponding bit of the notify_cpuset of each reactor and then change the mode.
		 */
		spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
	}

	return 0;
}
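/*
 * Illustrative usage sketch (editorial, not part of the original file):
 * switching a reactor between poll and interrupt mode from the application
 * thread. The callback name "intr_mode_done" and the core id are
 * hypothetical; the callback type matches spdk_thread_send_msg() usage above,
 * i.e. void (*)(void *).
 *
 *	static void
 *	intr_mode_done(void *cb_arg)
 *	{
 *		SPDK_NOTICELOG("reactor mode switch completed\n");
 *	}
 *
 *	// Must be called from the SPDK application thread; completion is
 *	// reported asynchronously via the callback.
 *	int rc = spdk_reactor_set_interrupt_mode(1, true, intr_mode_done, NULL);
 *	if (rc != 0) {
 *		// -EINVAL, -EPERM, or -EBUSY per the checks above
 *	}
 */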
struct spdk_event *
spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
{
	struct spdk_event *event = NULL;
	struct spdk_reactor *reactor = spdk_reactor_get(lcore);

	if (!reactor) {
		assert(false);
		return NULL;
	}

	event = spdk_mempool_get(g_spdk_event_mempool);
	if (event == NULL) {
		assert(false);
		return NULL;
	}

	event->lcore = lcore;
	event->fn = fn;
	event->arg1 = arg1;
	event->arg2 = arg2;

	return event;
}

void
spdk_event_call(struct spdk_event *event)
{
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_core = spdk_env_get_current_core();

	reactor = spdk_reactor_get(event->lcore);

	assert(reactor != NULL);
	assert(reactor->events != NULL);

	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
	if (rc != 1) {
		assert(false);
	}

	if (current_core != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_core);
	}

	/* If spdk_event_call isn't called on a reactor, always send a notification.
	 * If it is called on a reactor, send a notification only if the destination
	 * reactor is marked as being in interrupt mode.
	 */
	if (spdk_unlikely(local_reactor == NULL) ||
	    spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) {
		uint64_t notify = 1;

		rc = write(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
	}
}
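/*
 * Illustrative usage sketch (editorial, not part of the original file):
 * passing a function call to another reactor. "hello" and the destination
 * core are hypothetical; the allocate/call pair is the API defined above.
 *
 *	static void
 *	hello(void *arg1, void *arg2)
 *	{
 *		printf("running on core %u\n", spdk_env_get_current_core());
 *	}
 *
 *	struct spdk_event *ev = spdk_event_allocate(2, hello, NULL, NULL);
 *	if (ev != NULL) {
 *		spdk_event_call(ev);	// enqueues and, if needed, kicks the eventfd
 *	}
 */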
static inline uint32_t
event_queue_run_batch(struct spdk_reactor *reactor)
{
	unsigned count, i;
	void *events[SPDK_EVENT_BATCH_SIZE];
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(events, 0, sizeof(events));
#endif

	/* Handle the eventfd notification if this reactor currently runs in interrupt mode */
	if (spdk_unlikely(reactor->in_interrupt)) {
		uint64_t notify = 1;
		int rc;

		/* There may be a race between this reactor's acknowledgement (read) and
		 * another producer's notification (write), so acknowledge first and only
		 * then dequeue. This avoids missing an event notification.
		 */
		rc = read(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno));
			return -errno;
		}

		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);

		if (spdk_ring_count(reactor->events) != 0) {
			/* Trigger a new notification if there are still events in the queue waiting for processing. */
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
				return -errno;
			}
		}
	} else {
		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
	}

	if (count == 0) {
		return 0;
	}

	/* Execute the events. There are still some remaining events
	 * that must occur on an SPDK thread. To accommodate those, try to
	 * run them on the first thread in the list, if it exists. */
	lw_thread = TAILQ_FIRST(&reactor->threads);
	if (lw_thread) {
		thread = spdk_thread_get_from_ctx(lw_thread);
	} else {
		thread = NULL;
	}

	spdk_set_thread(thread);

	for (i = 0; i < count; i++) {
		struct spdk_event *event = events[i];

		assert(event != NULL);
		event->fn(event->arg1, event->arg2);
	}

	spdk_set_thread(NULL);

	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);

	return count;
}

/* 1s */
#define CONTEXT_SWITCH_MONITOR_PERIOD 1000000

static int
get_rusage(struct spdk_reactor *reactor)
{
	struct rusage rusage;

	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
		return -1;
	}

	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
		SPDK_INFOLOG(reactor,
			     "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
	}
	reactor->rusage = rusage;

	return 0;
}

void
spdk_framework_enable_context_switch_monitor(bool enable)
{
	/* This global is being read by multiple threads, so this isn't
	 * strictly thread safe. However, we're toggling between true and
	 * false here, and if a thread sees the value update later than it
	 * should, it's no big deal.
	 */
	g_framework_context_switch_monitor_enabled = enable;
}

bool
spdk_framework_context_switch_monitor_enabled(void)
{
	return g_framework_context_switch_monitor_enabled;
}

static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
	pthread_setname_np(pthread_self(), thread_name);
#endif
}

static void
_init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);

	lw_thread->lcore = reactor->lcore;

	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->current_stats);
}

static void
_threads_reschedule(struct spdk_scheduler_core_info *cores_info)
{
	struct spdk_scheduler_core_info *core;
	struct spdk_lw_thread *lw_thread;
	uint32_t i, j;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &cores_info[i];
		for (j = 0; j < core->threads_count; j++) {
			lw_thread = core->threads[j];
			if (lw_thread->lcore != lw_thread->new_lcore) {
				_spdk_lw_thread_set_core(lw_thread, lw_thread->new_lcore);
			}
		}
	}
}

static void
_reactors_scheduler_fini(void)
{
	struct spdk_reactor *reactor;
	uint32_t i;

	/* Reschedule based on the balancing output */
	_threads_reschedule(g_core_infos);

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		reactor->flags.is_scheduling = false;
	}
}

static void
_reactors_scheduler_update_core_mode(void *ctx)
{
	struct spdk_reactor *reactor;
	int rc = 0;

	if (g_scheduler_core_number == SPDK_ENV_LCORE_ID_ANY) {
		g_scheduler_core_number = spdk_env_get_first_core();
	} else {
		g_scheduler_core_number = spdk_env_get_next_core(g_scheduler_core_number);
	}

	if (g_scheduler_core_number == SPDK_ENV_LCORE_ID_ANY) {
		_reactors_scheduler_fini();
		return;
	}

	reactor = spdk_reactor_get(g_scheduler_core_number);
	if (reactor->in_interrupt != g_core_infos[g_scheduler_core_number].interrupt_mode) {
		/* Switch the next found reactor to the new state */
		rc = spdk_reactor_set_interrupt_mode(g_scheduler_core_number,
						     g_core_infos[g_scheduler_core_number].interrupt_mode,
						     _reactors_scheduler_update_core_mode, NULL);
		if (rc == 0) {
			return;
		}
	}

	_reactors_scheduler_update_core_mode(NULL);
}

static void
_reactors_scheduler_balance(void *arg1, void *arg2)
{
	if (g_reactor_state == SPDK_REACTOR_STATE_RUNNING) {
		g_scheduler->balance(g_core_infos, g_reactor_count, &g_governor);

		g_scheduler_core_number = SPDK_ENV_LCORE_ID_ANY;
		_reactors_scheduler_update_core_mode(NULL);
	}
}

static void
_reactors_scheduler_cancel(void *arg1, void *arg2)
{
	struct spdk_reactor *reactor;
	uint32_t i;

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		reactor->flags.is_scheduling = false;
	}
}
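/*
 * Added overview (editorial, based on the surrounding functions): one
 * scheduling round walks the cores as a chain of events.
 *
 *   1. _reactors_scheduler_gather_metrics() hops core to core, snapshotting
 *      idle/busy TSC and per-thread stats into g_core_infos.
 *   2. Back on the scheduling reactor, _reactors_scheduler_balance() lets the
 *      active scheduler fill in new_lcore/interrupt_mode decisions.
 *   3. _reactors_scheduler_update_core_mode() applies interrupt-mode changes
 *      one core at a time, then _reactors_scheduler_fini() reschedules the
 *      moved threads.
 */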
/* Phase 1 of thread scheduling is to gather metrics on the existing threads */
static void
_reactors_scheduler_gather_metrics(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core_info;
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	struct spdk_event *evt;
	uint32_t next_core;
	uint32_t i;

	reactor = spdk_reactor_get(spdk_env_get_current_core());
	assert(reactor != NULL);
	reactor->flags.is_scheduling = true;
	core_info = &g_core_infos[reactor->lcore];
	core_info->lcore = reactor->lcore;
	core_info->core_idle_tsc = reactor->idle_tsc;
	core_info->core_busy_tsc = reactor->busy_tsc;
	core_info->interrupt_mode = reactor->in_interrupt;

	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);

	free(core_info->threads);
	core_info->threads = NULL;

	i = 0;

	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		_init_thread_stats(reactor, lw_thread);
		i++;
	}

	core_info->threads_count = i;

	if (core_info->threads_count > 0) {
		core_info->threads = calloc(core_info->threads_count, sizeof(struct spdk_lw_thread *));
		if (core_info->threads == NULL) {
			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);

			/* Cancel this round of schedule work */
			evt = spdk_event_allocate(g_scheduling_reactor->lcore, _reactors_scheduler_cancel, NULL, NULL);
			spdk_event_call(evt);
			return;
		}

		i = 0;
		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
			core_info->threads[i] = lw_thread;
			_spdk_lw_thread_get_current_stats(lw_thread, &lw_thread->snapshot_stats);
			i++;
		}
	}

	next_core = spdk_env_get_next_core(reactor->lcore);
	if (next_core == UINT32_MAX) {
		next_core = spdk_env_get_first_core();
	}

	/* If we've looped back around to the scheduler thread, move to the next phase */
	if (next_core == g_scheduling_reactor->lcore) {
		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
		evt = spdk_event_allocate(next_core, _reactors_scheduler_balance, NULL, NULL);
		spdk_event_call(evt);
		return;
	}

	evt = spdk_event_allocate(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
	spdk_event_call(evt);
}

static int _reactor_schedule_thread(struct spdk_thread *thread);
static uint64_t g_rusage_period;

static void
_reactor_remove_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
	int efd;

	TAILQ_REMOVE(&reactor->threads, lw_thread, link);
	assert(reactor->thread_count > 0);
	reactor->thread_count--;

	/* Remove the thread's interrupt fd if running with full interrupt ability */
	if (spdk_interrupt_mode_is_enabled()) {
		efd = spdk_thread_get_interrupt_fd(thread);
		spdk_fd_group_remove(reactor->fgrp, efd);
	}
}

static bool
reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);

	if (spdk_unlikely(lw_thread->resched)) {
		lw_thread->resched = false;
		_reactor_remove_lw_thread(reactor, lw_thread);
		_reactor_schedule_thread(thread);
		return true;
	}

	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
			  spdk_thread_is_idle(thread))) {
		if (reactor->flags.is_scheduling == false) {
			_reactor_remove_lw_thread(reactor, lw_thread);
			spdk_thread_destroy(thread);
			return true;
		}
	}

	return false;
}

static void
reactor_interrupt_run(struct spdk_reactor *reactor)
{
	int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */

	spdk_fd_group_wait(reactor->fgrp, block_timeout);
}
static void
_reactor_run(struct spdk_reactor *reactor)
{
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread, *tmp;
	uint64_t now;
	int rc;

	event_queue_run_batch(reactor);

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);

		now = spdk_thread_get_last_tsc(thread);
		if (rc == 0) {
			reactor->idle_tsc += now - reactor->tsc_last;
		} else if (rc > 0) {
			reactor->busy_tsc += now - reactor->tsc_last;
		}
		reactor->tsc_last = now;

		reactor_post_process_lw_thread(reactor, lw_thread);
	}
}

static int
reactor_run(void *arg)
{
	struct spdk_reactor *reactor = arg;
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread, *tmp;
	char thread_name[32];
	uint64_t last_sched = 0;

	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);

	/* Rename the POSIX thread because the reactor is tied to the POSIX
	 * thread in the SPDK event library.
	 */
	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
	_set_thread_name(thread_name);

	reactor->tsc_last = spdk_get_ticks();

	while (1) {
		/* Execute the interrupt process fn if this reactor currently runs in interrupt mode */
		if (spdk_unlikely(reactor->in_interrupt)) {
			reactor_interrupt_run(reactor);
		} else {
			_reactor_run(reactor);
		}

		if (g_framework_context_switch_monitor_enabled) {
			if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
				get_rusage(reactor);
				reactor->last_rusage = reactor->tsc_last;
			}
		}

		if (spdk_unlikely(g_scheduler_period > 0 &&
				  (reactor->tsc_last - last_sched) > g_scheduler_period &&
				  reactor == g_scheduling_reactor &&
				  !reactor->flags.is_scheduling)) {
			if (spdk_unlikely(g_scheduler != g_new_scheduler)) {
				if (g_scheduler->deinit != NULL) {
					g_scheduler->deinit(&g_governor);
				}
				g_scheduler = g_new_scheduler;
			}

			if (spdk_unlikely(g_scheduler->balance != NULL)) {
				last_sched = reactor->tsc_last;
				_reactors_scheduler_gather_metrics(NULL, NULL);
			}
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}
	}

	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		spdk_set_thread(thread);
		spdk_thread_exit(thread);
	}

	while (!TAILQ_EMPTY(&reactor->threads)) {
		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			spdk_set_thread(thread);
			if (spdk_thread_is_exited(thread)) {
				_reactor_remove_lw_thread(reactor, lw_thread);
				spdk_thread_destroy(thread);
			} else {
				spdk_thread_poll(thread, 0, 0);
			}
		}
	}

	return 0;
}

int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int ret;
	const struct spdk_cpuset *validmask;

	ret = spdk_cpuset_parse(cpumask, mask);
	if (ret < 0) {
		return ret;
	}

	validmask = spdk_app_get_core_mask();
	spdk_cpuset_and(cpumask, validmask);

	return 0;
}

const struct spdk_cpuset *
spdk_app_get_core_mask(void)
{
	return &g_reactor_core_mask;
}
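/*
 * Illustrative usage sketch (editorial, not part of the original file):
 * parsing a user-supplied core mask and clamping it to the cores the app
 * was started with. "0x0F" is an arbitrary example mask.
 *
 *	struct spdk_cpuset cpumask;
 *
 *	if (spdk_app_parse_core_mask("0x0F", &cpumask) == 0) {
 *		// cpumask now holds cores 0-3 intersected with
 *		// spdk_app_get_core_mask()
 *	}
 */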
void
spdk_reactors_start(void)
{
	struct spdk_reactor *reactor;
	uint32_t i, current_core;
	int rc;

	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;

	current_core = spdk_env_get_current_core();
	SPDK_ENV_FOREACH_CORE(i) {
		if (i != current_core) {
			reactor = spdk_reactor_get(i);
			if (reactor == NULL) {
				continue;
			}

			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
			if (rc < 0) {
				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
				assert(false);
				return;
			}
		}
		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
	}

	/* Start the main reactor */
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	reactor_run(reactor);

	spdk_env_thread_wait_all();

	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
}

void
spdk_reactors_stop(void *arg1)
{
	uint32_t i;
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor;
	uint64_t notify = 1;

	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
	local_reactor = spdk_reactor_get(spdk_env_get_current_core());

	SPDK_ENV_FOREACH_CORE(i) {
		/* If this isn't called on a reactor, always send a notification.
		 * If it is called on a reactor, send a notification only if the
		 * destination reactor is marked as being in interrupt mode.
		 */
		if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) {
			reactor = spdk_reactor_get(i);
			assert(reactor != NULL);
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno));
				continue;
			}
		}
	}
}

static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
static uint32_t g_next_core = UINT32_MAX;
static int
thread_process_interrupts(void *arg)
{
	struct spdk_thread *thread = arg;
	struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());
	uint64_t now;
	int rc;

	/* Update idle_tsc between the end of the last intr_fn and the start of this intr_fn. */
	now = spdk_get_ticks();
	reactor->idle_tsc += now - reactor->tsc_last;
	reactor->tsc_last = now;

	rc = spdk_thread_poll(thread, 0, now);

	/* Update tsc between the start and the end of this intr_fn. */
	now = spdk_thread_get_last_tsc(thread);
	if (rc == 0) {
		reactor->idle_tsc += now - reactor->tsc_last;
	} else if (rc > 0) {
		reactor->busy_tsc += now - reactor->tsc_last;
	}
	reactor->tsc_last = now;

	return rc;
}

static void
_schedule_thread(void *arg1, void *arg2)
{
	struct spdk_lw_thread *lw_thread = arg1;
	struct spdk_reactor *reactor;
	uint32_t current_core;
	int efd;

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
	reactor->thread_count++;

	/* Register the thread's interrupt fd if running with full interrupt ability */
	if (spdk_interrupt_mode_is_enabled()) {
		int rc;
		struct spdk_thread *thread;

		thread = spdk_thread_get_from_ctx(lw_thread);
		efd = spdk_thread_get_interrupt_fd(thread);
		rc = spdk_fd_group_add(reactor->fgrp, efd, thread_process_interrupts, thread);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
		}
	}
}
static int
_reactor_schedule_thread(struct spdk_thread *thread)
{
	uint32_t core;
	struct spdk_lw_thread *lw_thread;
	struct spdk_thread_stats last_stats;
	struct spdk_event *evt = NULL;
	struct spdk_cpuset *cpumask;
	uint32_t i;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_lcore = spdk_env_get_current_core();
	struct spdk_cpuset polling_cpumask;
	struct spdk_cpuset valid_cpumask;

	cpumask = spdk_thread_get_cpumask(thread);

	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);
	core = lw_thread->lcore;
	last_stats = lw_thread->last_stats;
	memset(lw_thread, 0, sizeof(*lw_thread));
	lw_thread->last_stats = last_stats;

	if (current_lcore != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_lcore);
		assert(local_reactor);
	}

	/* When the interrupt ability of spdk_thread is not enabled and the current
	 * reactor runs on a DPDK thread, skip reactors which are in interrupt mode.
	 */
	if (!spdk_interrupt_mode_is_enabled() && local_reactor != NULL) {
		/* Get the cpumask of all reactors in polling mode */
		spdk_cpuset_zero(&polling_cpumask);
		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&polling_cpumask, i, true);
		}
		spdk_cpuset_xor(&polling_cpumask, &local_reactor->notify_cpuset);

		if (core == SPDK_ENV_LCORE_ID_ANY) {
			/* Get the cpumask of all valid reactors which are suggested and also in polling mode */
			spdk_cpuset_copy(&valid_cpumask, &polling_cpumask);
			spdk_cpuset_and(&valid_cpumask, spdk_thread_get_cpumask(thread));

			/* If there are any valid reactors, the spdk_thread should be scheduled
			 * onto one of them.
			 * If there are no valid reactors, the spdk_thread should be scheduled
			 * onto one of the polling reactors.
			 */
			if (spdk_cpuset_count(&valid_cpumask) != 0) {
				cpumask = &valid_cpumask;
			} else {
				cpumask = &polling_cpumask;
			}
		} else if (!spdk_cpuset_get_cpu(&polling_cpumask, core)) {
			/* If the specified reactor is not polling, the spdk_thread should be
			 * scheduled onto one of the polling reactors.
			 */
			core = SPDK_ENV_LCORE_ID_ANY;
			cpumask = &polling_cpumask;
		}
	}

	pthread_mutex_lock(&g_scheduler_mtx);
	if (core == SPDK_ENV_LCORE_ID_ANY) {
		for (i = 0; i < spdk_env_get_core_count(); i++) {
			if (g_next_core >= g_reactor_count) {
				g_next_core = spdk_env_get_first_core();
			}
			core = g_next_core;
			g_next_core = spdk_env_get_next_core(g_next_core);

			if (spdk_cpuset_get_cpu(cpumask, core)) {
				break;
			}
		}
	}

	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);

	pthread_mutex_unlock(&g_scheduler_mtx);

	assert(evt != NULL);
	if (evt == NULL) {
		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
		return -1;
	}

	lw_thread->tsc_start = spdk_get_ticks();

	spdk_event_call(evt);

	return 0;
}

static void
_reactor_request_thread_reschedule(struct spdk_thread *thread)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;

	assert(thread == spdk_get_thread());

	lw_thread = spdk_thread_get_ctx(thread);

	_spdk_lw_thread_set_core(lw_thread, SPDK_ENV_LCORE_ID_ANY);

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	/* Send a notification if the local reactor is marked as being in interrupt mode */
	if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) {
		uint64_t notify = 1;

		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}
	}
}

static int
reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
{
	struct spdk_lw_thread *lw_thread;

	switch (op) {
	case SPDK_THREAD_OP_NEW:
		lw_thread = spdk_thread_get_ctx(thread);
		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
		return _reactor_schedule_thread(thread);
	case SPDK_THREAD_OP_RESCHED:
		_reactor_request_thread_reschedule(thread);
		return 0;
	default:
		return -ENOTSUP;
	}
}

static bool
reactor_thread_op_supported(enum spdk_thread_op op)
{
	switch (op) {
	case SPDK_THREAD_OP_NEW:
	case SPDK_THREAD_OP_RESCHED:
		return true;
	default:
		return false;
	}
}

struct call_reactor {
	uint32_t cur_core;
	spdk_event_fn fn;
	void *arg1;
	void *arg2;

	uint32_t orig_core;
	spdk_event_fn cpl;
};

static void
on_reactor(void *arg1, void *arg2)
{
	struct call_reactor *cr = arg1;
	struct spdk_event *evt;

	cr->fn(cr->arg1, cr->arg2);

	cr->cur_core = spdk_env_get_next_core(cr->cur_core);

	if (cr->cur_core >= g_reactor_count) {
		SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");

		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
		free(cr);
	} else {
		SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %d\n",
			      cr->cur_core);

		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
	}
	assert(evt != NULL);
	spdk_event_call(evt);
}

void
spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
{
	struct call_reactor *cr;
	struct spdk_event *evt;

	cr = calloc(1, sizeof(*cr));
	if (!cr) {
		SPDK_ERRLOG("Unable to perform reactor iteration\n");
		cpl(arg1, arg2);
		return;
	}

	cr->fn = fn;
	cr->arg1 = arg1;
	cr->arg2 = arg2;
	cr->cpl = cpl;
	cr->orig_core = spdk_env_get_current_core();
	cr->cur_core = spdk_env_get_first_core();

	SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %d\n", cr->orig_core);

	evt = spdk_event_allocate(cr->cur_core, on_reactor, cr, NULL);
	assert(evt != NULL);

	spdk_event_call(evt);
}
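/*
 * Illustrative usage sketch (editorial, not part of the original file):
 * running a function on every reactor in core order, with a completion
 * delivered back on the originating core. "set_flag" and "all_done" are
 * hypothetical names.
 *
 *	static void
 *	set_flag(void *arg1, void *arg2)
 *	{
 *		// runs once on each reactor, in core order
 *	}
 *
 *	static void
 *	all_done(void *arg1, void *arg2)
 *	{
 *		// runs on the core that called spdk_for_each_reactor()
 *	}
 *
 *	spdk_for_each_reactor(set_flag, ctx, NULL, all_done);
 */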
#ifdef __linux__
static int
reactor_schedule_thread_event(void *arg)
{
	struct spdk_reactor *reactor = arg;
	struct spdk_lw_thread *lw_thread, *tmp;
	uint32_t count = 0;
	uint64_t notify = 1;

	assert(reactor->in_interrupt);

	if (read(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
		SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno));
		return -errno;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
	}

	return count;
}

static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	int rc;

	rc = spdk_fd_group_create(&reactor->fgrp);
	if (rc != 0) {
		return rc;
	}

	reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->resched_fd < 0) {
		rc = -EBADF;
		goto err;
	}

	rc = spdk_fd_group_add(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event,
			       reactor);
	if (rc) {
		close(reactor->resched_fd);
		goto err;
	}

	reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->events_fd < 0) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);

		rc = -EBADF;
		goto err;
	}

	rc = spdk_fd_group_add(reactor->fgrp, reactor->events_fd,
			       (spdk_fd_fn)event_queue_run_batch, reactor);
	if (rc) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);
		close(reactor->events_fd);
		goto err;
	}

	return 0;

err:
	spdk_fd_group_destroy(reactor->fgrp);
	reactor->fgrp = NULL;
	return rc;
}
#else
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	return -ENOTSUP;
}
#endif

static void
reactor_interrupt_fini(struct spdk_reactor *reactor)
{
	struct spdk_fd_group *fgrp = reactor->fgrp;

	if (!fgrp) {
		return;
	}

	spdk_fd_group_remove(fgrp, reactor->events_fd);
	spdk_fd_group_remove(fgrp, reactor->resched_fd);

	close(reactor->events_fd);
	close(reactor->resched_fd);

	spdk_fd_group_destroy(fgrp);
	reactor->fgrp = NULL;
}

void
_spdk_lw_thread_set_core(struct spdk_lw_thread *thread, uint32_t lcore)
{
	assert(thread != NULL);
	thread->lcore = lcore;
	thread->resched = true;
}

void
_spdk_lw_thread_get_current_stats(struct spdk_lw_thread *thread, struct spdk_thread_stats *stats)
{
	assert(thread != NULL);
	*stats = thread->current_stats;
}

static int
_governor_get_capabilities(uint32_t lcore_id, struct spdk_governor_capabilities *capabilities)
{
	capabilities->freq_change = false;
	capabilities->freq_getset = false;
	capabilities->freq_up = false;
	capabilities->freq_down = false;
	capabilities->freq_max = false;
	capabilities->freq_min = false;
	capabilities->turbo_set = false;
	capabilities->priority = false;
	capabilities->turbo_available = false;

	return 0;
}

static struct spdk_governor *
_governor_find(char *name)
{
	struct spdk_governor *governor, *tmp;

	TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
		if (strcmp(name, governor->name) == 0) {
			return governor;
		}
	}

	return NULL;
}

int
_spdk_governor_set(char *name)
{
	struct spdk_governor *governor;
	uint32_t i;
	int rc;

	governor = _governor_find(name);
	if (governor == NULL) {
		return -EINVAL;
	}

	g_governor = *governor;

	if (g_governor.init) {
		rc = g_governor.init();
		if (rc != 0) {
			return rc;
		}
	}

	SPDK_ENV_FOREACH_CORE(i) {
		if (g_governor.init_core) {
			rc = g_governor.init_core(i);
			if (rc != 0) {
				return rc;
			}
		}
	}

	return 0;
}

struct spdk_governor *
_spdk_governor_get(void)
{
	return &g_governor;
}

void
_spdk_governor_list_add(struct spdk_governor *governor)
{
	if (_governor_find(governor->name)) {
		SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_governor_list, governor, link);
}
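/*
 * Illustrative sketch (editorial, not part of the original file): registering
 * a custom power governor and making it active. "my_governor" and its init
 * callback are hypothetical; the init/init_core fields are inferred from
 * their use in _spdk_governor_set() above, and other callbacks are omitted.
 *
 *	static int
 *	my_governor_init(void)
 *	{
 *		return 0;
 *	}
 *
 *	static struct spdk_governor my_governor = {
 *		.name = "my_governor",
 *		.init = my_governor_init,
 *	};
 *
 *	static void __attribute__((constructor))
 *	register_my_governor(void)
 *	{
 *		_spdk_governor_list_add(&my_governor);
 *	}
 *
 *	// Later, e.g. from RPC handling:
 *	//	_spdk_governor_set("my_governor");
 */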
SPDK_LOG_REGISTER_COMPONENT(reactor)