1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 */

#include "spdk/stdinc.h"
#include "spdk/likely.h"

#include "spdk_internal/event.h"

#include "spdk/log.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/string.h"
#include "spdk/fd_group.h"

#ifdef __linux__
#include <sys/prctl.h>
#include <sys/eventfd.h>
#endif

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

/* Maximum number of events dequeued and executed per reactor iteration. */
#define SPDK_EVENT_BATCH_SIZE 8

/* Per-core reactor array, indexed by lcore id; allocated in spdk_reactors_init(). */
static struct spdk_reactor *g_reactors;
/* Cores on which reactors were started; built up in spdk_reactors_start(). */
static struct spdk_cpuset g_reactor_core_mask;
static enum spdk_reactor_state g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;

static bool g_framework_context_switch_monitor_enabled = true;

/* Pool that backs spdk_event_allocate()/spdk_mempool_put_bulk() in the event path. */
static struct spdk_mempool *g_spdk_event_mempool = NULL;

/* All schedulers registered via _spdk_scheduler_list_add(). */
TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);

/* Active scheduler, and the scheduler to promote once any in-flight
 * scheduling round finishes (see _spdk_scheduler_set() and reactor_run()).
 */
static struct spdk_scheduler *g_scheduler;
static struct spdk_scheduler *g_new_scheduler;
/* Reactor that drives periodic scheduling; set in spdk_reactors_start(). */
static struct spdk_reactor *g_scheduling_reactor;
/* Minimum ticks between scheduling rounds; 0 until _spdk_scheduler_period_set(). */
static uint32_t g_scheduler_period;
/* Per-core metrics handed to the scheduler's balance() callback. */
static struct spdk_scheduler_core_info *g_core_infos = NULL;

/* All governors registered via _spdk_governor_list_add(). */
TAILQ_HEAD(, spdk_governor) g_governor_list
	= TAILQ_HEAD_INITIALIZER(g_governor_list);

static int _governor_get_capabilities(uint32_t lcore_id,
				      struct spdk_governor_capabilities *capabilities);

/* Default governor: reports every capability as unavailable. */
static struct spdk_governor g_governor = {
	.name = "default",
	.get_core_capabilities = _governor_get_capabilities,
};

static int reactor_interrupt_init(struct spdk_reactor *reactor);
static void reactor_interrupt_fini(struct spdk_reactor *reactor);

/* Look up a registered scheduler by name; NULL if not registered. */
static struct spdk_scheduler *
_scheduler_find(char *name)
{
	struct spdk_scheduler *tmp;

	TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
		if (strcmp(name, tmp->name) == 0) {
			return tmp;
		}
	}

	return NULL;
}

/* Select the active scheduler by name.
 *
 * Returns 0 on success, -ENOENT if no scheduler with that name is
 * registered. Deinitializes the previous scheduler (unless a scheduling
 * round is in flight, in which case the switch is deferred to reactor_run())
 * and initializes the new one.
 */
int
_spdk_scheduler_set(char *name)
{
	struct spdk_scheduler *scheduler;

	scheduler = _scheduler_find(name);
	if (scheduler == NULL) {
		SPDK_ERRLOG("Requested scheduler is missing\n");
		return -ENOENT;
	}

	/* Reactors not up yet: nothing to deinit, just record the selection. */
	if (g_reactors == NULL || g_scheduling_reactor == NULL) {
		g_new_scheduler = scheduler;
		g_scheduler = scheduler;
		return 0;
	}

	if (g_scheduling_reactor->flags.is_scheduling) {
		/* A scheduling round is in flight; reactor_run() will deinit
		 * the old scheduler and promote g_new_scheduler afterwards.
		 */
		g_new_scheduler = scheduler;
	} else {
		if (g_scheduler->deinit != NULL) {
			g_scheduler->deinit(&g_governor);
		}

		g_new_scheduler = scheduler;
		g_scheduler = scheduler;
	}

	if (scheduler->init != NULL) {
		scheduler->init(&g_governor);
	}

	return 0;
}

/* Set the minimum number of ticks between scheduling rounds. */
void
_spdk_scheduler_period_set(uint32_t period)
{
	g_scheduler_period = period;
}

/* Register a scheduler. Names must be unique; duplicates are rejected
 * (and assert in debug builds).
 */
void
_spdk_scheduler_list_add(struct spdk_scheduler *scheduler)
{
	if (_scheduler_find(scheduler->name)) {
		SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
}

/* Initialize the reactor for the given core: empty thread list plus an
 * MP/SC event ring. In interrupt mode also sets up the fd group/eventfds.
 */
static void
reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
{
	reactor->lcore = lcore;
	reactor->flags.is_valid = true;

	TAILQ_INIT(&reactor->threads);
	reactor->thread_count = 0;

	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	assert(reactor->events != NULL);

	if (spdk_interrupt_mode_is_enabled()) {
		reactor_interrupt_init(reactor);
	}
}

/* Return the reactor for the given core, or NULL if reactors are not
 * initialized or the core is not part of the core mask.
 */
struct spdk_reactor *
spdk_reactor_get(uint32_t lcore)
{
	struct spdk_reactor *reactor;

	if (g_reactors == NULL) {
		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
		return NULL;
	}

	reactor = &g_reactors[lcore];

	if (reactor->flags.is_valid == false) {
		return NULL;
	}

	return reactor;
}

static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
static bool
reactor_thread_op_supported(enum spdk_thread_op op); 193 194 int 195 spdk_reactors_init(void) 196 { 197 int rc; 198 uint32_t i, last_core; 199 char mempool_name[32]; 200 201 rc = _spdk_scheduler_set("static"); 202 if (rc != 0) { 203 SPDK_ERRLOG("Failed setting up scheduler\n"); 204 return rc; 205 } 206 207 snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid()); 208 g_spdk_event_mempool = spdk_mempool_create(mempool_name, 209 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 210 sizeof(struct spdk_event), 211 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 212 SPDK_ENV_SOCKET_ID_ANY); 213 214 if (g_spdk_event_mempool == NULL) { 215 SPDK_ERRLOG("spdk_event_mempool creation failed\n"); 216 return -1; 217 } 218 219 /* struct spdk_reactor must be aligned on 64 byte boundary */ 220 last_core = spdk_env_get_last_core(); 221 rc = posix_memalign((void **)&g_reactors, 64, 222 (last_core + 1) * sizeof(struct spdk_reactor)); 223 if (rc != 0) { 224 SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n", 225 last_core + 1); 226 spdk_mempool_free(g_spdk_event_mempool); 227 return -1; 228 } 229 230 g_core_infos = calloc(last_core + 1, sizeof(*g_core_infos)); 231 if (g_core_infos == NULL) { 232 SPDK_ERRLOG("Could not allocate memory for g_core_infos\n"); 233 spdk_mempool_free(g_spdk_event_mempool); 234 free(g_reactors); 235 return -ENOMEM; 236 } 237 238 memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor)); 239 240 spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported, 241 sizeof(struct spdk_lw_thread)); 242 243 SPDK_ENV_FOREACH_CORE(i) { 244 reactor_construct(&g_reactors[i], i); 245 } 246 247 g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED; 248 249 return 0; 250 } 251 252 void 253 spdk_reactors_fini(void) 254 { 255 uint32_t i; 256 struct spdk_reactor *reactor; 257 258 if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) { 259 return; 260 } 261 262 if (g_scheduler->deinit != NULL) { 263 
g_scheduler->deinit(&g_governor); 264 } 265 266 spdk_thread_lib_fini(); 267 268 SPDK_ENV_FOREACH_CORE(i) { 269 reactor = spdk_reactor_get(i); 270 assert(reactor != NULL); 271 assert(reactor->thread_count == 0); 272 if (reactor->events != NULL) { 273 spdk_ring_free(reactor->events); 274 } 275 276 if (reactor->interrupt_mode) { 277 reactor_interrupt_fini(reactor); 278 } 279 280 if (g_core_infos != NULL) { 281 free(g_core_infos[i].threads); 282 } 283 } 284 285 spdk_mempool_free(g_spdk_event_mempool); 286 287 free(g_reactors); 288 g_reactors = NULL; 289 free(g_core_infos); 290 g_core_infos = NULL; 291 } 292 293 struct spdk_event * 294 spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2) 295 { 296 struct spdk_event *event = NULL; 297 struct spdk_reactor *reactor = spdk_reactor_get(lcore); 298 299 if (!reactor) { 300 assert(false); 301 return NULL; 302 } 303 304 event = spdk_mempool_get(g_spdk_event_mempool); 305 if (event == NULL) { 306 assert(false); 307 return NULL; 308 } 309 310 event->lcore = lcore; 311 event->fn = fn; 312 event->arg1 = arg1; 313 event->arg2 = arg2; 314 315 return event; 316 } 317 318 void 319 spdk_event_call(struct spdk_event *event) 320 { 321 int rc; 322 struct spdk_reactor *reactor; 323 324 reactor = spdk_reactor_get(event->lcore); 325 326 assert(reactor != NULL); 327 assert(reactor->events != NULL); 328 329 rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL); 330 if (rc != 1) { 331 assert(false); 332 } 333 334 if (reactor->interrupt_mode) { 335 uint64_t notify = 1; 336 337 rc = write(reactor->events_fd, ¬ify, sizeof(notify)); 338 if (rc < 0) { 339 SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno)); 340 } 341 } 342 } 343 344 static inline uint32_t 345 event_queue_run_batch(struct spdk_reactor *reactor) 346 { 347 unsigned count, i; 348 void *events[SPDK_EVENT_BATCH_SIZE]; 349 struct spdk_thread *thread; 350 struct spdk_lw_thread *lw_thread; 351 352 #ifdef DEBUG 353 /* 354 * 
spdk_ring_dequeue() fills events and returns how many entries it wrote, 355 * so we will never actually read uninitialized data from events, but just to be sure 356 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 357 */ 358 memset(events, 0, sizeof(events)); 359 #endif 360 361 if (reactor->interrupt_mode) { 362 uint64_t notify = 1; 363 int rc; 364 365 /* There may be race between event_acknowledge and another producer's event_notify, 366 * so event_acknowledge should be applied ahead. And then check for self's event_notify. 367 * This can avoid event notification missing. 368 */ 369 rc = read(reactor->events_fd, ¬ify, sizeof(notify)); 370 if (rc < 0) { 371 SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno)); 372 return -errno; 373 } 374 375 count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE); 376 377 if (spdk_ring_count(reactor->events) != 0) { 378 /* Trigger new notification if there are still events in event-queue waiting for processing. */ 379 rc = write(reactor->events_fd, ¬ify, sizeof(notify)); 380 if (rc < 0) { 381 SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno)); 382 return -errno; 383 } 384 } 385 } else { 386 count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE); 387 } 388 389 if (count == 0) { 390 return 0; 391 } 392 393 /* Execute the events. There are still some remaining events 394 * that must occur on an SPDK thread. To accomodate those, try to 395 * run them on the first thread in the list, if it exists. 
*/ 396 lw_thread = TAILQ_FIRST(&reactor->threads); 397 if (lw_thread) { 398 thread = spdk_thread_get_from_ctx(lw_thread); 399 } else { 400 thread = NULL; 401 } 402 403 spdk_set_thread(thread); 404 405 for (i = 0; i < count; i++) { 406 struct spdk_event *event = events[i]; 407 408 assert(event != NULL); 409 event->fn(event->arg1, event->arg2); 410 } 411 412 spdk_set_thread(NULL); 413 414 spdk_mempool_put_bulk(g_spdk_event_mempool, events, count); 415 416 return count; 417 } 418 419 /* 1s */ 420 #define CONTEXT_SWITCH_MONITOR_PERIOD 1000000 421 422 static int 423 get_rusage(struct spdk_reactor *reactor) 424 { 425 struct rusage rusage; 426 427 if (getrusage(RUSAGE_THREAD, &rusage) != 0) { 428 return -1; 429 } 430 431 if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) { 432 SPDK_INFOLOG(reactor, 433 "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n", 434 reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw, 435 rusage.ru_nivcsw - reactor->rusage.ru_nivcsw); 436 } 437 reactor->rusage = rusage; 438 439 return -1; 440 } 441 442 void 443 spdk_framework_enable_context_switch_monitor(bool enable) 444 { 445 /* This global is being read by multiple threads, so this isn't 446 * strictly thread safe. However, we're toggling between true and 447 * false here, and if a thread sees the value update later than it 448 * should, it's no big deal. 
	 */
	g_framework_context_switch_monitor_enabled = enable;
}

bool
spdk_framework_context_switch_monitor_enabled(void)
{
	return g_framework_context_switch_monitor_enabled;
}

/* Name the calling POSIX thread (platform-specific; fails the build on
 * unsupported platforms). */
static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
#error missing platform support for thread name
#endif
}

/* Record the thread's current core and snapshot its stats into the
 * lw_thread for the scheduler to consume. */
static void
_init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);

	lw_thread->lcore = reactor->lcore;

	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->current_stats);
}

/* Apply the scheduler's balancing output: flag every thread whose target
 * core (new_lcore) differs from its current core for rescheduling. */
static void
_threads_reschedule(struct spdk_scheduler_core_info *cores_info)
{
	struct spdk_scheduler_core_info *core;
	struct spdk_lw_thread *lw_thread;
	uint32_t i, j;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &cores_info[i];
		for (j = 0; j < core->threads_count; j++) {
			lw_thread = core->threads[j];
			if (lw_thread->lcore != lw_thread->new_lcore) {
				_spdk_lw_thread_set_core(lw_thread, lw_thread->new_lcore);
			}
		}
	}
}

/* Final phase of a scheduling round: run the scheduler's balance()
 * callback, reschedule threads accordingly, and clear every reactor's
 * is_scheduling flag. Runs as an event on the scheduling reactor. */
static void
_reactors_scheduler_fini(void *arg1, void *arg2)
{
	struct spdk_reactor *reactor;
	uint32_t last_core;
	uint32_t i;

	if (g_reactor_state == SPDK_REACTOR_STATE_RUNNING) {
		last_core = spdk_env_get_last_core();
		g_scheduler->balance(g_core_infos, last_core + 1, &g_governor);

		/* Reschedule based on the balancing output */
		_threads_reschedule(g_core_infos);

		SPDK_ENV_FOREACH_CORE(i) {
			reactor = spdk_reactor_get(i);
			reactor->flags.is_scheduling = false;
		}
	}
}

/* Abort an in-flight scheduling round (used on allocation failure during
 * metric gathering): clear every reactor's is_scheduling flag. */
static void
_reactors_scheduler_cancel(void *arg1, void *arg2)
{
	struct spdk_reactor *reactor;
	uint32_t i;

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		reactor->flags.is_scheduling = false;
	}
}

/* Phase 1 of thread scheduling is to gather metrics on the existing threads */
static void
_reactors_scheduler_gather_metrics(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core_info;
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	struct spdk_event *evt;
	uint32_t next_core;
	uint32_t i;

	reactor = spdk_reactor_get(spdk_env_get_current_core());
	reactor->flags.is_scheduling = true;
	core_info = &g_core_infos[reactor->lcore];
	core_info->lcore = reactor->lcore;
	core_info->core_idle_tsc = reactor->idle_tsc;
	core_info->core_busy_tsc = reactor->busy_tsc;

	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);

	/* Drop the thread array from the previous round before rebuilding. */
	free(core_info->threads);
	core_info->threads = NULL;

	i = 0;

	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		_init_thread_stats(reactor, lw_thread);
		i++;
	}

	core_info->threads_count = i;

	if (core_info->threads_count > 0) {
		core_info->threads = calloc(core_info->threads_count, sizeof(struct spdk_lw_thread *));
		if (core_info->threads == NULL) {
			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);

			/* Cancel this round of schedule work */
			evt = spdk_event_allocate(g_scheduling_reactor->lcore, _reactors_scheduler_cancel, NULL, NULL);
			spdk_event_call(evt);
			return;
		}

		i = 0;
		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
			core_info->threads[i] = lw_thread;
			i++;
		}
	}

	/* Hop to the next core in the mask, wrapping at the end. */
	next_core = spdk_env_get_next_core(reactor->lcore);
	if (next_core == UINT32_MAX) {
		next_core = spdk_env_get_first_core();
	}

	/* If we've looped back around to the scheduler thread, move to the next phase */
	if (next_core == g_scheduling_reactor->lcore) {
		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
		evt = spdk_event_allocate(next_core, _reactors_scheduler_fini, NULL, NULL);
		spdk_event_call(evt);
		return;
	}

	evt = spdk_event_allocate(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
	spdk_event_call(evt);
}

static int _reactor_schedule_thread(struct spdk_thread *thread);
/* Minimum ticks between get_rusage() samples; set in spdk_reactors_start(). */
static uint64_t g_rusage_period;

/* Handle a thread that needs attention after polling: migrate it if a
 * reschedule was requested, or destroy it if it has exited and is idle.
 * Returns true if the thread was removed from this reactor. */
static bool
reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
	int efd;

	if (spdk_unlikely(lw_thread->resched)) {
		lw_thread->resched = false;
		TAILQ_REMOVE(&reactor->threads, lw_thread, link);
		assert(reactor->thread_count > 0);
		reactor->thread_count--;

		if (reactor->interrupt_mode) {
			efd = spdk_thread_get_interrupt_fd(thread);
			spdk_fd_group_remove(reactor->fgrp, efd);
		}
		_reactor_schedule_thread(thread);
		return true;
	}

	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
			  spdk_thread_is_idle(thread))) {
		/* Destruction is skipped while is_scheduling is set —
		 * presumably because an in-flight round may still reference
		 * this thread via g_core_infos; verify against scheduler. */
		if (reactor->flags.is_scheduling == false) {
			TAILQ_REMOVE(&reactor->threads, lw_thread, link);
			assert(reactor->thread_count > 0);
			reactor->thread_count--;

			if (reactor->interrupt_mode) {
				efd = spdk_thread_get_interrupt_fd(thread);
				spdk_fd_group_remove(reactor->fgrp, efd);
			}
			spdk_thread_destroy(thread);
			return true;
		}
	}

	return false;
}

/* Interrupt-mode loop body: block until any fd in the reactor's group
 * fires; the fd group dispatches the registered callbacks. */
static void
reactor_interrupt_run(struct spdk_reactor *reactor)
{
	int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */

	spdk_fd_group_wait(reactor->fgrp, block_timeout);

	/* TODO: add tsc records and g_framework_context_switch_monitor_enabled */
}

/* One poll-mode iteration: drain a batch of events, poll each thread once
 * while accounting busy/idle ticks, then post-process each thread. */
static void
_reactor_run(struct spdk_reactor *reactor)
{
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread, *tmp;
	uint64_t now;
	int rc;

	event_queue_run_batch(reactor);

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);

		/* rc == 0: the poll found no work (idle); rc > 0: work was
		 * done (busy). Attribute the elapsed ticks accordingly. */
		now = spdk_thread_get_last_tsc(thread);
		if (rc == 0) {
			reactor->idle_tsc += now - reactor->tsc_last;
		} else if (rc > 0) {
			reactor->busy_tsc += now - reactor->tsc_last;
		}
		reactor->tsc_last = now;

		reactor_post_process_lw_thread(reactor, lw_thread);
	}

	if (g_framework_context_switch_monitor_enabled) {
		if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
			get_rusage(reactor);
			reactor->last_rusage = reactor->tsc_last;
		}
	}
}

/* Reactor main loop, pinned to reactor->lcore. Runs until g_reactor_state
 * leaves RUNNING, then asks every thread on this reactor to exit and polls
 * until all of them drain and are destroyed. Always returns 0. */
static int
reactor_run(void *arg)
{
	struct spdk_reactor *reactor = arg;
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread, *tmp;
	char thread_name[32];
	uint64_t last_sched = 0;

	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);

	/* Rename the POSIX thread because the reactor is tied to the POSIX
	 * thread in the SPDK event library.
	 */
	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
	_set_thread_name(thread_name);

	reactor->tsc_last = spdk_get_ticks();

	while (1) {
		if (spdk_unlikely(reactor->interrupt_mode)) {
			reactor_interrupt_run(reactor);
		} else {
			_reactor_run(reactor);
		}

		/* Periodically kick off a scheduling round — only on the
		 * designated scheduling reactor and only when the previous
		 * round has completed. */
		if (spdk_unlikely((reactor->tsc_last - last_sched) > g_scheduler_period &&
				  reactor == g_scheduling_reactor &&
				  !reactor->flags.is_scheduling)) {
			/* A scheduler switch deferred by _spdk_scheduler_set()
			 * takes effect here. */
			if (spdk_unlikely(g_scheduler != g_new_scheduler)) {
				if (g_scheduler->deinit != NULL) {
					g_scheduler->deinit(&g_governor);
				}
				g_scheduler = g_new_scheduler;
			}

			if (spdk_unlikely(g_scheduler->balance != NULL)) {
				last_sched = reactor->tsc_last;
				_reactors_scheduler_gather_metrics(NULL, NULL);
			}
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}
	}

	/* Shutdown: request exit from every thread on this reactor... */
	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		spdk_set_thread(thread);
		spdk_thread_exit(thread);
	}

	/* ...then keep polling until each one reports exited and can be
	 * destroyed. */
	while (!TAILQ_EMPTY(&reactor->threads)) {
		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			spdk_set_thread(thread);
			if (spdk_thread_is_exited(thread)) {
				TAILQ_REMOVE(&reactor->threads, lw_thread, link);
				assert(reactor->thread_count > 0);
				reactor->thread_count--;
				if (reactor->interrupt_mode) {
					int efd = spdk_thread_get_interrupt_fd(thread);

					spdk_fd_group_remove(reactor->fgrp, efd);
				}
				spdk_thread_destroy(thread);
			} else {
				spdk_thread_poll(thread, 0, 0);
			}
		}
	}

	return 0;
}

/* Parse a core-mask string into cpumask, then restrict it to the cores the
 * app is actually running reactors on. Returns spdk_cpuset_parse()'s error
 * on bad input, 0 otherwise. */
int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int ret;
	const struct spdk_cpuset *validmask;

	ret = spdk_cpuset_parse(cpumask, mask);
	if (ret < 0) {
		return ret;
	}

	validmask = spdk_app_get_core_mask();
	spdk_cpuset_and(cpumask, validmask);

	return 0;
}

/* Return the set of cores reactors were started on. */
const struct spdk_cpuset *
spdk_app_get_core_mask(void)
{
	return &g_reactor_core_mask;
}

/* Launch one pinned reactor per remote core (each with an initial SPDK
 * thread), then run the master reactor on the current core. Blocks until
 * every reactor has exited. */
void
spdk_reactors_start(void)
{
	struct spdk_reactor *reactor;
	struct spdk_cpuset tmp_cpumask = {};
	uint32_t i, current_core;
	int rc;
	char thread_name[32];

	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;

	current_core = spdk_env_get_current_core();
	SPDK_ENV_FOREACH_CORE(i) {
		if (i != current_core) {
			reactor = spdk_reactor_get(i);
			if (reactor == NULL) {
				continue;
			}

			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
			if (rc < 0) {
				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
				assert(false);
				return;
			}

			/* For now, for each reactor spawn one thread. */
			snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);

			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);

			spdk_thread_create(thread_name, &tmp_cpumask);
		}
		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
	}

	/* Start the master reactor */
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	g_scheduling_reactor = reactor;
	reactor_run(reactor);

	spdk_env_thread_wait_all();

	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
}

/* Move the framework to EXITING so every reactor_run() loop breaks; in
 * interrupt mode also wake each reactor via its events eventfd. */
void
spdk_reactors_stop(void *arg1)
{
	uint32_t i;
	int rc;
	struct spdk_reactor *reactor;
	uint64_t notify = 1;

	g_reactor_state = SPDK_REACTOR_STATE_EXITING;

	if (spdk_interrupt_mode_is_enabled()) {
		SPDK_ENV_FOREACH_CORE(i) {
			reactor = spdk_reactor_get(i);

			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno));
				continue;
			}
		}
	}
}

/* Protects g_next_core, the round-robin cursor used when a thread may run
 * on any core. */
static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
static uint32_t g_next_core = UINT32_MAX;

/* fd group callback: poll the thread whose interrupt fd fired. */
static int
thread_process_interrupts(void *arg)
{
	struct spdk_thread *thread = arg;

	return spdk_thread_poll(thread, 0, 0);
}

/* Event handler executed on the destination core: attach the thread to this
 * reactor (and, in interrupt mode, register its interrupt fd). */
static void
_schedule_thread(void *arg1, void *arg2)
{
	struct spdk_lw_thread *lw_thread = arg1;
	struct spdk_reactor *reactor;
	uint32_t current_core;
	int efd;

	current_core = spdk_env_get_current_core();

	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
	reactor->thread_count++;

	if (reactor->interrupt_mode) {
		int rc;
		struct spdk_thread *thread;

		thread = spdk_thread_get_from_ctx(lw_thread);
		efd = spdk_thread_get_interrupt_fd(thread);
		rc = spdk_fd_group_add(reactor->fgrp, efd, thread_process_interrupts, thread);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
		}
	}
}

/* Pick a destination core for the thread — round-robin over its cpumask
 * when no explicit core was recorded — and send a _schedule_thread event
 * to that core. Returns 0 on success, -1 on event allocation failure. */
static int
_reactor_schedule_thread(struct spdk_thread *thread)
{
	uint32_t core;
	struct spdk_lw_thread *lw_thread;
	struct spdk_event *evt = NULL;
	struct spdk_cpuset *cpumask;
	uint32_t i;

	cpumask = spdk_thread_get_cpumask(thread);

	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);
	core = lw_thread->lcore;
	/* Reset the lw_thread context for its new reactor. */
	memset(lw_thread, 0, sizeof(*lw_thread));

	pthread_mutex_lock(&g_scheduler_mtx);
	if (core == SPDK_ENV_LCORE_ID_ANY) {
		for (i = 0; i < spdk_env_get_core_count(); i++) {
			if (g_next_core > spdk_env_get_last_core()) {
				g_next_core = spdk_env_get_first_core();
			}
			core = g_next_core;
			g_next_core = spdk_env_get_next_core(g_next_core);

			if (spdk_cpuset_get_cpu(cpumask, core)) {
				break;
			}
		}
	}

	evt =
		spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);

	pthread_mutex_unlock(&g_scheduler_mtx);

	assert(evt != NULL);
	if (evt == NULL) {
		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
		return -1;
	}

	lw_thread->tsc_start = spdk_get_ticks();

	spdk_event_call(evt);

	return 0;
}

/* Mark the current thread for migration to any core; in interrupt mode
 * wake its reactor via the resched eventfd so the flag is acted on. */
static void
_reactor_request_thread_reschedule(struct spdk_thread *thread)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;

	assert(thread == spdk_get_thread());

	lw_thread = spdk_thread_get_ctx(thread);

	_spdk_lw_thread_set_core(lw_thread, SPDK_ENV_LCORE_ID_ANY);

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	if (reactor->interrupt_mode) {
		uint64_t notify = 1;

		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}
	}
}

/* spdk_thread library hook: dispatch thread lifecycle operations. */
static int
reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
{
	struct spdk_lw_thread *lw_thread;

	switch (op) {
	case SPDK_THREAD_OP_NEW:
		lw_thread = spdk_thread_get_ctx(thread);
		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
		return _reactor_schedule_thread(thread);
	case SPDK_THREAD_OP_RESCHED:
		_reactor_request_thread_reschedule(thread);
		return 0;
	default:
		return -ENOTSUP;
	}
}

/* spdk_thread library hook: which thread operations this module supports. */
static bool
reactor_thread_op_supported(enum spdk_thread_op op)
{
	switch (op) {
	case SPDK_THREAD_OP_NEW:
	case SPDK_THREAD_OP_RESCHED:
		return true;
	default:
		return false;
	}
}

/* State carried from reactor to reactor by spdk_for_each_reactor(). */
struct call_reactor {
	uint32_t cur_core;	/* core currently being visited */
	spdk_event_fn fn;	/* function to run on each reactor */
	void *arg1;
	void *arg2;

	uint32_t orig_core;	/* core that started the iteration */
	spdk_event_fn cpl;	/* completion, run on orig_core */
};

/* Run cr->fn on this core, then forward the iteration to the next core or,
 * when past the last core, send the completion back to the origin. */
static void
on_reactor(void *arg1, void *arg2)
{
	struct call_reactor *cr = arg1;
	struct spdk_event *evt;

	cr->fn(cr->arg1, cr->arg2);

	cr->cur_core = spdk_env_get_next_core(cr->cur_core);

	if (cr->cur_core > spdk_env_get_last_core()) {
		SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");

		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
		free(cr);
	} else {
		SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %d\n",
			      cr->cur_core);

		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
	}
	assert(evt != NULL);
	spdk_event_call(evt);
}

/* Run fn(arg1, arg2) once on every reactor core in order, then
 * cpl(arg1, arg2) on the calling core. If the iteration context cannot be
 * allocated, cpl is invoked immediately on the current core. */
void
spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
{
	struct call_reactor *cr;
	struct spdk_event *evt;

	cr = calloc(1, sizeof(*cr));
	if (!cr) {
		SPDK_ERRLOG("Unable to perform reactor iteration\n");
		cpl(arg1, arg2);
		return;
	}

	cr->fn = fn;
	cr->arg1 = arg1;
	cr->arg2 = arg2;
	cr->cpl = cpl;
	cr->orig_core = spdk_env_get_current_core();
	cr->cur_core = spdk_env_get_first_core();

	SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %d\n", cr->orig_core);

	evt = spdk_event_allocate(cr->cur_core, on_reactor, cr, NULL);
	assert(evt != NULL);

	spdk_event_call(evt);
}

#ifdef __linux__
/* resched eventfd callback: acknowledge the wakeup, then post-process every
 * thread on this reactor. Returns the number of threads moved or destroyed,
 * or -errno if the eventfd read fails. */
static int
reactor_schedule_thread_event(void *arg)
{
	struct spdk_reactor *reactor = arg;
	struct spdk_lw_thread *lw_thread, *tmp;
	uint32_t count = 0;
	uint64_t notify = 1;

	assert(reactor->interrupt_mode);

	if (read(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
		SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno));
		return -errno;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
	}

	return count;
}

/* Set up interrupt-mode plumbing for a reactor: an fd group plus two
 * eventfds — resched_fd for reschedule wakeups and events_fd for event-ring
 * notifications. Unwinds everything created so far on any failure. */
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	int rc;

	rc = spdk_fd_group_create(&reactor->fgrp);
	if (rc != 0) {
		return rc;
	}

	reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->resched_fd < 0) {
		rc = -EBADF;
		goto err;
	}

	rc = spdk_fd_group_add(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event,
			       reactor);
	if (rc) {
		close(reactor->resched_fd);
		goto err;
	}

	reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->events_fd < 0) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);

		rc = -EBADF;
		goto err;
	}

	rc = spdk_fd_group_add(reactor->fgrp, reactor->events_fd,
			       (spdk_fd_fn)event_queue_run_batch, reactor);
	if (rc) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);
		close(reactor->events_fd);
		goto err;
	}

	reactor->interrupt_mode = true;
	return 0;

err:
	spdk_fd_group_destroy(reactor->fgrp);
	return rc;
}
#else
/* Interrupt mode depends on Linux eventfd; unsupported elsewhere. */
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	return -ENOTSUP;
}
#endif

/* Tear down the interrupt-mode fd group and both eventfds; no-op when
 * interrupt mode was never initialized. */
static void
reactor_interrupt_fini(struct spdk_reactor *reactor)
{
	struct spdk_fd_group *fgrp = reactor->fgrp;

	if (!fgrp) {
		return;
	}

	spdk_fd_group_remove(fgrp, reactor->events_fd);
	spdk_fd_group_remove(fgrp, reactor->resched_fd);

	close(reactor->events_fd);
	close(reactor->resched_fd);

	spdk_fd_group_destroy(fgrp);
	reactor->fgrp = NULL;
}

/* Record the thread's target core and flag it for rescheduling; the owning
 * reactor acts on the flag in reactor_post_process_lw_thread(). */
void
_spdk_lw_thread_set_core(struct spdk_lw_thread *thread, uint32_t lcore)
{
	assert(thread != NULL);
	thread->lcore = lcore;
	thread->resched = true;
}

/* Copy out the stats snapshot taken by _init_thread_stats(). */
void
_spdk_lw_thread_get_current_stats(struct spdk_lw_thread *thread, struct spdk_thread_stats *stats)
{
	assert(thread != NULL);
	*stats = thread->current_stats;
}

/* Default governor capability callback: nothing is supported. */
static int
_governor_get_capabilities(uint32_t lcore_id, struct spdk_governor_capabilities *capabilities)
{
	capabilities->freq_change = false;
	capabilities->freq_getset = false;
	capabilities->freq_up = false;
	capabilities->freq_down = false;
	capabilities->freq_max = false;
	capabilities->freq_min = false;
	capabilities->turbo_set = false;
	capabilities->priority = false;
	capabilities->turbo_available = false;

	return 0;
}

/* Look up a registered governor by name; NULL if not registered. */
static struct spdk_governor *
_governor_find(char *name)
{
	struct spdk_governor *governor, *tmp;

	TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
		if (strcmp(name, governor->name) == 0) {
			return governor;
		}
	}

	return NULL;
}

/* Activate the named governor: copy it into g_governor, then run its global
 * init and per-core init callbacks. Returns -EINVAL for an unknown name, or
 * the first nonzero callback result. */
int
_spdk_governor_set(char *name)
{
	struct spdk_governor *governor;
	uint32_t i;
	int rc;

	governor = _governor_find(name);
	if (governor == NULL) {
		return -EINVAL;
	}

	g_governor = *governor;

	if (g_governor.init) {
		rc = g_governor.init();
		if (rc != 0) {
			return rc;
		}
	}

	SPDK_ENV_FOREACH_CORE(i) {
		if (g_governor.init_core) {
			rc = g_governor.init_core(i);
			if (rc != 0) {
				return rc;
			}
		}
	}

	return 0;
}

/* Register a governor. Names must be unique; duplicates are rejected
 * (and assert in debug builds). */
void
_spdk_governor_list_add(struct spdk_governor *governor)
{
	if (_governor_find(governor->name)) {
		SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_governor_list, governor, link);
}

SPDK_LOG_REGISTER_COMPONENT(reactor)