/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/likely.h"

#include "spdk_internal/event.h"

#include "spdk/log.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/string.h"
#include "spdk/fd_group.h"

#ifdef __linux__
#include <sys/prctl.h>
#include <sys/eventfd.h>
#endif

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

#define SPDK_EVENT_BATCH_SIZE 8

static struct spdk_reactor *g_reactors;
static struct spdk_cpuset g_reactor_core_mask;
static enum spdk_reactor_state g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;

static bool g_framework_context_switch_monitor_enabled = true;

static struct spdk_mempool *g_spdk_event_mempool = NULL;

TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);

static struct spdk_scheduler *g_scheduler;
static struct spdk_scheduler *g_new_scheduler;
static struct spdk_reactor *g_scheduling_reactor;
static uint32_t g_scheduler_period;
static struct spdk_scheduler_core_info *g_core_infos = NULL;

TAILQ_HEAD(, spdk_governor) g_governor_list
	= TAILQ_HEAD_INITIALIZER(g_governor_list);

static int _governor_get_capabilities(uint32_t lcore_id,
				      struct spdk_governor_capabilities *capabilities);

static struct spdk_governor g_governor = {
	.name = "default",
	.get_core_capabilities = _governor_get_capabilities,
};

static int reactor_interrupt_init(struct spdk_reactor *reactor);
static void reactor_interrupt_fini(struct spdk_reactor *reactor);

static struct spdk_scheduler *
_scheduler_find(char *name)
{
	struct spdk_scheduler *tmp;

	TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
		if (strcmp(name, tmp->name) == 0) {
			return tmp;
		}
	}

	return NULL;
}
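
/*
 * Select the active scheduler by name. If a scheduling round is in progress
 * on the scheduling reactor, the switch is deferred: the new scheduler is
 * staged in g_new_scheduler and swapped in by reactor_run() once the current
 * round completes.
 */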
int
_spdk_scheduler_set(char *name)
{
	struct spdk_scheduler *scheduler;

	scheduler = _scheduler_find(name);
	if (scheduler == NULL) {
		SPDK_ERRLOG("Requested scheduler is missing\n");
		return -ENOENT;
	}

	if (g_reactors == NULL || g_scheduling_reactor == NULL) {
		g_new_scheduler = scheduler;
		g_scheduler = scheduler;
		return 0;
	}

	if (g_scheduling_reactor->flags.is_scheduling) {
		g_new_scheduler = scheduler;
	} else {
		if (g_scheduler->deinit != NULL) {
			g_scheduler->deinit(&g_governor);
		}

		g_new_scheduler = scheduler;
		g_scheduler = scheduler;
	}

	if (scheduler->init != NULL) {
		scheduler->init(&g_governor);
	}

	return 0;
}

void
_spdk_scheduler_period_set(uint32_t period)
{
	g_scheduler_period = period;
}

void
_spdk_scheduler_list_add(struct spdk_scheduler *scheduler)
{
	if (_scheduler_find(scheduler->name)) {
		SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
		assert(false);
		return;
	}

	TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
}

static void
reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
{
	reactor->lcore = lcore;
	reactor->flags.is_valid = true;

	TAILQ_INIT(&reactor->threads);
	reactor->thread_count = 0;

	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (reactor->events == NULL) {
		SPDK_ERRLOG("Failed to allocate events ring\n");
		assert(false);
	}

	if (spdk_interrupt_mode_is_enabled()) {
		reactor_interrupt_init(reactor);
	}
}

struct spdk_reactor *
spdk_reactor_get(uint32_t lcore)
{
	struct spdk_reactor *reactor;

	if (g_reactors == NULL) {
		SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
		return NULL;
	}

	reactor = &g_reactors[lcore];

	if (reactor->flags.is_valid == false) {
		return NULL;
	}

	return reactor;
}

static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
static bool reactor_thread_op_supported(enum spdk_thread_op op);

int
spdk_reactors_init(void)
{
	int rc;
	uint32_t i, last_core;
	char mempool_name[32];

	rc = _spdk_scheduler_set("static");
	if (rc != 0) {
		SPDK_ERRLOG("Failed setting up scheduler\n");
		return rc;
	}

	snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
	g_spdk_event_mempool = spdk_mempool_create(mempool_name,
			       262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			       sizeof(struct spdk_event),
			       SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
			       SPDK_ENV_SOCKET_ID_ANY);

	if (g_spdk_event_mempool == NULL) {
		SPDK_ERRLOG("spdk_event_mempool creation failed\n");
		return -1;
	}

	/* struct spdk_reactor must be aligned on 64 byte boundary */
	last_core = spdk_env_get_last_core();
	rc = posix_memalign((void **)&g_reactors, 64,
			    (last_core + 1) * sizeof(struct spdk_reactor));
	if (rc != 0) {
		SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
			    last_core + 1);
		spdk_mempool_free(g_spdk_event_mempool);
		return -1;
	}

	g_core_infos = calloc(last_core + 1, sizeof(*g_core_infos));
	if (g_core_infos == NULL) {
		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		return -ENOMEM;
	}
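
	/* posix_memalign() does not zero the allocation, so clear the array
	 * before constructing reactors in it. */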
	memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor));

	spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
				 sizeof(struct spdk_lw_thread));

	SPDK_ENV_FOREACH_CORE(i) {
		reactor_construct(&g_reactors[i], i);
	}

	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;

	return 0;
}

void
spdk_reactors_fini(void)
{
	uint32_t i;
	struct spdk_reactor *reactor;

	if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
		return;
	}

	if (g_scheduler->deinit != NULL) {
		g_scheduler->deinit(&g_governor);
	}

	spdk_thread_lib_fini();

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);
		assert(reactor->thread_count == 0);
		if (reactor->events != NULL) {
			spdk_ring_free(reactor->events);
		}

		if (reactor->interrupt_mode) {
			reactor_interrupt_fini(reactor);
		}

		if (g_core_infos != NULL) {
			free(g_core_infos[i].threads);
		}
	}

	spdk_mempool_free(g_spdk_event_mempool);

	free(g_reactors);
	g_reactors = NULL;
	free(g_core_infos);
	g_core_infos = NULL;
}

struct spdk_event *
spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
{
	struct spdk_event *event = NULL;
	struct spdk_reactor *reactor = spdk_reactor_get(lcore);

	if (!reactor) {
		assert(false);
		return NULL;
	}

	event = spdk_mempool_get(g_spdk_event_mempool);
	if (event == NULL) {
		assert(false);
		return NULL;
	}

	event->lcore = lcore;
	event->fn = fn;
	event->arg1 = arg1;
	event->arg2 = arg2;

	return event;
}

void
spdk_event_call(struct spdk_event *event)
{
	int rc;
	struct spdk_reactor *reactor;

	reactor = spdk_reactor_get(event->lcore);

	assert(reactor != NULL);
	assert(reactor->events != NULL);

	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
	if (rc != 1) {
		assert(false);
	}

	if (reactor->interrupt_mode) {
		uint64_t notify = 1;

		rc = write(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
		}
	}
}
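
/*
 * Dequeue and execute up to SPDK_EVENT_BATCH_SIZE events from the reactor's
 * MP/SC event ring. Typical producer-side usage (illustrative):
 *
 *	struct spdk_event *evt = spdk_event_allocate(lcore, fn, arg1, arg2);
 *	spdk_event_call(evt);
 */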
static inline int
event_queue_run_batch(struct spdk_reactor *reactor)
{
	unsigned count, i;
	void *events[SPDK_EVENT_BATCH_SIZE];
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread;

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(events, 0, sizeof(events));
#endif

	if (reactor->interrupt_mode) {
		uint64_t notify = 1;
		int rc;

		/* There may be a race between this acknowledgement and another
		 * producer's event_notify, so acknowledge first and only then
		 * dequeue. If events remain in the ring afterwards, re-notify
		 * ourselves below so that no event notification is lost.
		 */
		rc = read(reactor->events_fd, &notify, sizeof(notify));
		if (rc < 0) {
			SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno));
			return -errno;
		}

		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);

		if (spdk_ring_count(reactor->events) != 0) {
			/* Trigger new notification if there are still events in event-queue waiting for processing. */
			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
				return -errno;
			}
		}
	} else {
		count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
	}

	if (count == 0) {
		return 0;
	}

	/* Execute the events. There are still some remaining events
	 * that must occur on an SPDK thread. To accommodate those, try to
	 * run them on the first thread in the list, if it exists. */
	lw_thread = TAILQ_FIRST(&reactor->threads);
	if (lw_thread) {
		thread = spdk_thread_get_from_ctx(lw_thread);
	} else {
		thread = NULL;
	}

	spdk_set_thread(thread);

	for (i = 0; i < count; i++) {
		struct spdk_event *event = events[i];

		assert(event != NULL);
		event->fn(event->arg1, event->arg2);
	}

	spdk_set_thread(NULL);

	spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);

	return count;
}

/* 1s */
#define CONTEXT_SWITCH_MONITOR_PERIOD 1000000

static int
get_rusage(struct spdk_reactor *reactor)
{
	struct rusage rusage;

	if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
		return -1;
	}

	if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
		SPDK_INFOLOG(reactor,
			     "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
			     reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
			     rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
	}
	reactor->rusage = rusage;

	return 0;
}
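
/*
 * When enabled, each reactor samples getrusage() once per
 * CONTEXT_SWITCH_MONITOR_PERIOD and logs any voluntary/involuntary context
 * switches observed in that interval.
 */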
void
spdk_framework_enable_context_switch_monitor(bool enable)
{
	/* This global is being read by multiple threads, so this isn't
	 * strictly thread safe. However, we're toggling between true and
	 * false here, and if a thread sees the value update later than it
	 * should, it's no big deal. */
	g_framework_context_switch_monitor_enabled = enable;
}

bool
spdk_framework_context_switch_monitor_enabled(void)
{
	return g_framework_context_switch_monitor_enabled;
}

static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
#error missing platform support for thread name
#endif
}

static void
_init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);

	lw_thread->lcore = reactor->lcore;

	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->current_stats);
}

static void
_threads_reschedule(struct spdk_scheduler_core_info *cores_info)
{
	struct spdk_scheduler_core_info *core;
	struct spdk_lw_thread *lw_thread;
	uint32_t i, j;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &cores_info[i];
		for (j = 0; j < core->threads_count; j++) {
			lw_thread = core->threads[j];
			if (lw_thread->lcore != lw_thread->new_lcore) {
				_spdk_lw_thread_set_core(lw_thread, lw_thread->new_lcore);
			}
		}
	}
}

static void
_reactors_scheduler_fini(void *arg1, void *arg2)
{
	struct spdk_reactor *reactor;
	uint32_t last_core;
	uint32_t i;

	if (g_reactor_state == SPDK_REACTOR_STATE_RUNNING) {
		last_core = spdk_env_get_last_core();
		g_scheduler->balance(g_core_infos, last_core + 1, &g_governor);

		/* Reschedule based on the balancing output */
		_threads_reschedule(g_core_infos);

		SPDK_ENV_FOREACH_CORE(i) {
			reactor = spdk_reactor_get(i);
			reactor->flags.is_scheduling = false;
		}
	}
}

static void
_reactors_scheduler_cancel(void *arg1, void *arg2)
{
	struct spdk_reactor *reactor;
	uint32_t i;

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		reactor->flags.is_scheduling = false;
	}
}

/* Phase 1 of thread scheduling is to gather metrics on the existing threads */
static void
_reactors_scheduler_gather_metrics(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core_info;
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	struct spdk_event *evt;
	uint32_t next_core;
	uint32_t i;

	reactor = spdk_reactor_get(spdk_env_get_current_core());
	reactor->flags.is_scheduling = true;
	core_info = &g_core_infos[reactor->lcore];
	core_info->lcore = reactor->lcore;
	core_info->core_idle_tsc = reactor->idle_tsc;
	core_info->core_busy_tsc = reactor->busy_tsc;

	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);

	free(core_info->threads);
	core_info->threads = NULL;

	i = 0;

	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		_init_thread_stats(reactor, lw_thread);
		i++;
	}

	core_info->threads_count = i;

	if (core_info->threads_count > 0) {
		core_info->threads = calloc(core_info->threads_count, sizeof(struct spdk_lw_thread *));
		if (core_info->threads == NULL) {
			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);

			/* Cancel this round of schedule work */
			evt = spdk_event_allocate(g_scheduling_reactor->lcore, _reactors_scheduler_cancel, NULL, NULL);
			spdk_event_call(evt);
			return;
		}
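
		/* Snapshot the thread pointers for this core so that the
		 * scheduler's balance callback can inspect them in phase 2. */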
		i = 0;
		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
			core_info->threads[i] = lw_thread;
			i++;
		}
	}

	next_core = spdk_env_get_next_core(reactor->lcore);
	if (next_core == UINT32_MAX) {
		next_core = spdk_env_get_first_core();
	}

	/* If we've looped back around to the scheduler thread, move to the next phase */
	if (next_core == g_scheduling_reactor->lcore) {
		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
		evt = spdk_event_allocate(next_core, _reactors_scheduler_fini, NULL, NULL);
		spdk_event_call(evt);
		return;
	}

	evt = spdk_event_allocate(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
	spdk_event_call(evt);
}

static int _reactor_schedule_thread(struct spdk_thread *thread);
static uint64_t g_rusage_period;

static bool
reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
	int efd;

	if (spdk_unlikely(lw_thread->resched)) {
		lw_thread->resched = false;
		TAILQ_REMOVE(&reactor->threads, lw_thread, link);
		assert(reactor->thread_count > 0);
		reactor->thread_count--;

		if (reactor->interrupt_mode) {
			efd = spdk_thread_get_interrupt_fd(thread);
			spdk_fd_group_remove(reactor->fgrp, efd);
		}
		_reactor_schedule_thread(thread);
		return true;
	}

	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
			  spdk_thread_is_idle(thread))) {
		if (reactor->flags.is_scheduling == false) {
			TAILQ_REMOVE(&reactor->threads, lw_thread, link);
			assert(reactor->thread_count > 0);
			reactor->thread_count--;

			if (reactor->interrupt_mode) {
				efd = spdk_thread_get_interrupt_fd(thread);
				spdk_fd_group_remove(reactor->fgrp, efd);
			}
			spdk_thread_destroy(thread);
			return true;
		}
	}

	return false;
}

static void
reactor_interrupt_run(struct spdk_reactor *reactor)
{
	int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */

	spdk_fd_group_wait(reactor->fgrp, block_timeout);

	/* TODO: add tsc records and g_framework_context_switch_monitor_enabled */
}

static void
_reactor_run(struct spdk_reactor *reactor)
{
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread, *tmp;
	uint64_t now;
	int rc;

	event_queue_run_batch(reactor);

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		rc = spdk_thread_poll(thread, 0, reactor->tsc_last);

		now = spdk_thread_get_last_tsc(thread);
		if (rc == 0) {
			reactor->idle_tsc += now - reactor->tsc_last;
		} else if (rc > 0) {
			reactor->busy_tsc += now - reactor->tsc_last;
		}
		reactor->tsc_last = now;

		reactor_post_process_lw_thread(reactor, lw_thread);
	}

	if (g_framework_context_switch_monitor_enabled) {
		if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
			get_rusage(reactor);
			reactor->last_rusage = reactor->tsc_last;
		}
	}
}
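
/*
 * Main loop for each reactor: process queued events and poll the lightweight
 * threads (or block on the fd group in interrupt mode), periodically kick off
 * a scheduling round on the scheduling reactor, and on shutdown exit and
 * drain all lightweight threads before returning.
 */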
static int
reactor_run(void *arg)
{
	struct spdk_reactor *reactor = arg;
	struct spdk_thread *thread;
	struct spdk_lw_thread *lw_thread, *tmp;
	char thread_name[32];
	uint64_t last_sched = 0;

	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);

	/* Rename the POSIX thread because the reactor is tied to the POSIX
	 * thread in the SPDK event library.
	 */
	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
	_set_thread_name(thread_name);

	reactor->tsc_last = spdk_get_ticks();

	while (1) {
		if (spdk_unlikely(reactor->interrupt_mode)) {
			reactor_interrupt_run(reactor);
		} else {
			_reactor_run(reactor);
		}

		if (spdk_unlikely((reactor->tsc_last - last_sched) > g_scheduler_period &&
				  reactor == g_scheduling_reactor &&
				  !reactor->flags.is_scheduling)) {
			if (spdk_unlikely(g_scheduler != g_new_scheduler)) {
				if (g_scheduler->deinit != NULL) {
					g_scheduler->deinit(&g_governor);
				}
				g_scheduler = g_new_scheduler;
			}

			if (spdk_unlikely(g_scheduler->balance != NULL)) {
				last_sched = reactor->tsc_last;
				_reactors_scheduler_gather_metrics(NULL, NULL);
			}
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}
	}

	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		spdk_set_thread(thread);
		spdk_thread_exit(thread);
	}

	while (!TAILQ_EMPTY(&reactor->threads)) {
		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);
			spdk_set_thread(thread);
			if (spdk_thread_is_exited(thread)) {
				TAILQ_REMOVE(&reactor->threads, lw_thread, link);
				assert(reactor->thread_count > 0);
				reactor->thread_count--;
				if (reactor->interrupt_mode) {
					int efd = spdk_thread_get_interrupt_fd(thread);

					spdk_fd_group_remove(reactor->fgrp, efd);
				}
				spdk_thread_destroy(thread);
			} else {
				spdk_thread_poll(thread, 0, 0);
			}
		}
	}

	return 0;
}

int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int ret;
	const struct spdk_cpuset *validmask;

	ret = spdk_cpuset_parse(cpumask, mask);
	if (ret < 0) {
		return ret;
	}

	validmask = spdk_app_get_core_mask();
	spdk_cpuset_and(cpumask, validmask);

	return 0;
}

const struct spdk_cpuset *
spdk_app_get_core_mask(void)
{
	return &g_reactor_core_mask;
}
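
/*
 * Launch a pinned reactor thread on every core except the current one,
 * create one spdk_thread per remote reactor, then run the current core's
 * reactor in place as the scheduling reactor until shutdown.
 */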
void
spdk_reactors_start(void)
{
	struct spdk_reactor *reactor;
	struct spdk_cpuset tmp_cpumask = {};
	uint32_t i, current_core;
	int rc;
	char thread_name[32];

	g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;

	current_core = spdk_env_get_current_core();
	SPDK_ENV_FOREACH_CORE(i) {
		if (i != current_core) {
			reactor = spdk_reactor_get(i);
			if (reactor == NULL) {
				continue;
			}

			rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
			if (rc < 0) {
				SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
				assert(false);
				return;
			}

			/* For now, for each reactor spawn one thread. */
			snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);

			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);

			spdk_thread_create(thread_name, &tmp_cpumask);
		}
		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
	}

	/* Start the master reactor */
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	g_scheduling_reactor = reactor;
	reactor_run(reactor);

	spdk_env_thread_wait_all();

	g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
}

void
spdk_reactors_stop(void *arg1)
{
	uint32_t i;
	int rc;
	struct spdk_reactor *reactor;
	uint64_t notify = 1;

	g_reactor_state = SPDK_REACTOR_STATE_EXITING;

	if (spdk_interrupt_mode_is_enabled()) {
		SPDK_ENV_FOREACH_CORE(i) {
			reactor = spdk_reactor_get(i);

			rc = write(reactor->events_fd, &notify, sizeof(notify));
			if (rc < 0) {
				SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno));
				continue;
			}
		}
	}
}

static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
static uint32_t g_next_core = UINT32_MAX;

static int
thread_process_interrupts(void *arg)
{
	struct spdk_thread *thread = arg;

	return spdk_thread_poll(thread, 0, 0);
}

static void
_schedule_thread(void *arg1, void *arg2)
{
	struct spdk_lw_thread *lw_thread = arg1;
	struct spdk_reactor *reactor;
	uint32_t current_core;
	int efd;

	current_core = spdk_env_get_current_core();

	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
	reactor->thread_count++;

	if (reactor->interrupt_mode) {
		int rc;
		struct spdk_thread *thread;

		thread = spdk_thread_get_from_ctx(lw_thread);
		efd = spdk_thread_get_interrupt_fd(thread);
		rc = spdk_fd_group_add(reactor->fgrp, efd, thread_process_interrupts, thread);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
		}
	}
}

static int
_reactor_schedule_thread(struct spdk_thread *thread)
{
	uint32_t core;
	struct spdk_lw_thread *lw_thread;
	struct spdk_event *evt = NULL;
	struct spdk_cpuset *cpumask;
	uint32_t i;

	cpumask = spdk_thread_get_cpumask(thread);

	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);
	core = lw_thread->lcore;
	memset(lw_thread, 0, sizeof(*lw_thread));

	pthread_mutex_lock(&g_scheduler_mtx);
	if (core == SPDK_ENV_LCORE_ID_ANY) {
		for (i = 0; i < spdk_env_get_core_count(); i++) {
			if (g_next_core > spdk_env_get_last_core()) {
				g_next_core = spdk_env_get_first_core();
			}
			core = g_next_core;
			g_next_core = spdk_env_get_next_core(g_next_core);

			if (spdk_cpuset_get_cpu(cpumask, core)) {
				break;
			}
		}
	}

	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);

	pthread_mutex_unlock(&g_scheduler_mtx);

	assert(evt != NULL);
	if (evt == NULL) {
		SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
		return -1;
	}

	lw_thread->tsc_start = spdk_get_ticks();

	spdk_event_call(evt);

	return 0;
}
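
/*
 * Mark the current thread for rescheduling onto any allowed core. The move
 * itself happens in reactor_post_process_lw_thread(); in interrupt mode the
 * reactor additionally has to be woken up via its resched eventfd.
 */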
static void
_reactor_request_thread_reschedule(struct spdk_thread *thread)
{
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	uint32_t current_core;

	assert(thread == spdk_get_thread());

	lw_thread = spdk_thread_get_ctx(thread);

	_spdk_lw_thread_set_core(lw_thread, SPDK_ENV_LCORE_ID_ANY);

	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	if (reactor->interrupt_mode) {
		uint64_t notify = 1;

		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
			SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
		}
	}
}

static int
reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
{
	struct spdk_lw_thread *lw_thread;

	switch (op) {
	case SPDK_THREAD_OP_NEW:
		lw_thread = spdk_thread_get_ctx(thread);
		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
		return _reactor_schedule_thread(thread);
	case SPDK_THREAD_OP_RESCHED:
		_reactor_request_thread_reschedule(thread);
		return 0;
	default:
		return -ENOTSUP;
	}
}

static bool
reactor_thread_op_supported(enum spdk_thread_op op)
{
	switch (op) {
	case SPDK_THREAD_OP_NEW:
	case SPDK_THREAD_OP_RESCHED:
		return true;
	default:
		return false;
	}
}

struct call_reactor {
	uint32_t cur_core;
	spdk_event_fn fn;
	void *arg1;
	void *arg2;

	uint32_t orig_core;
	spdk_event_fn cpl;
};

static void
on_reactor(void *arg1, void *arg2)
{
	struct call_reactor *cr = arg1;
	struct spdk_event *evt;

	cr->fn(cr->arg1, cr->arg2);

	cr->cur_core = spdk_env_get_next_core(cr->cur_core);

	if (cr->cur_core > spdk_env_get_last_core()) {
		SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");

		evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
		free(cr);
	} else {
		SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %d\n",
			      cr->cur_core);

		evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
	}
	assert(evt != NULL);
	spdk_event_call(evt);
}

void
spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
{
	struct call_reactor *cr;
	struct spdk_event *evt;

	cr = calloc(1, sizeof(*cr));
	if (!cr) {
		SPDK_ERRLOG("Unable to perform reactor iteration\n");
		cpl(arg1, arg2);
		return;
	}

	cr->fn = fn;
	cr->arg1 = arg1;
	cr->arg2 = arg2;
	cr->cpl = cpl;
	cr->orig_core = spdk_env_get_current_core();
	cr->cur_core = spdk_env_get_first_core();

	SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %d\n", cr->orig_core);

	evt = spdk_event_allocate(cr->cur_core, on_reactor, cr, NULL);
	assert(evt != NULL);

	spdk_event_call(evt);
}
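
/*
 * Interrupt-mode plumbing (Linux only): each reactor owns an fd group with
 * two eventfds - events_fd wakes the reactor when events are enqueued to its
 * ring, and resched_fd wakes it when a thread asks to be rescheduled.
 */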
#ifdef __linux__
static int
reactor_schedule_thread_event(void *arg)
{
	struct spdk_reactor *reactor = arg;
	struct spdk_lw_thread *lw_thread, *tmp;
	uint32_t count = 0;
	uint64_t notify = 1;

	assert(reactor->interrupt_mode);

	if (read(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
		SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno));
		return -errno;
	}

	TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
		count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
	}

	return count;
}

static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	int rc;

	rc = spdk_fd_group_create(&reactor->fgrp);
	if (rc != 0) {
		return rc;
	}

	reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->resched_fd < 0) {
		rc = -EBADF;
		goto err;
	}

	rc = spdk_fd_group_add(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event,
			       reactor);
	if (rc) {
		close(reactor->resched_fd);
		goto err;
	}

	reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (reactor->events_fd < 0) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);

		rc = -EBADF;
		goto err;
	}

	rc = spdk_fd_group_add(reactor->fgrp, reactor->events_fd,
			       (spdk_fd_fn)event_queue_run_batch, reactor);
	if (rc) {
		spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
		close(reactor->resched_fd);
		close(reactor->events_fd);
		goto err;
	}

	reactor->interrupt_mode = true;
	return 0;

err:
	spdk_fd_group_destroy(reactor->fgrp);
	return rc;
}
#else
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	return -ENOTSUP;
}
#endif

static void
reactor_interrupt_fini(struct spdk_reactor *reactor)
{
	struct spdk_fd_group *fgrp = reactor->fgrp;

	if (!fgrp) {
		return;
	}

	spdk_fd_group_remove(fgrp, reactor->events_fd);
	spdk_fd_group_remove(fgrp, reactor->resched_fd);

	close(reactor->events_fd);
	close(reactor->resched_fd);

	spdk_fd_group_destroy(fgrp);
	reactor->fgrp = NULL;
}

void
_spdk_lw_thread_set_core(struct spdk_lw_thread *thread, uint32_t lcore)
{
	assert(thread != NULL);
	thread->lcore = lcore;
	thread->resched = true;
}

void
_spdk_lw_thread_get_current_stats(struct spdk_lw_thread *thread, struct spdk_thread_stats *stats)
{
	assert(thread != NULL);
	*stats = thread->current_stats;
}

static int
_governor_get_capabilities(uint32_t lcore_id, struct spdk_governor_capabilities *capabilities)
{
	capabilities->freq_change = false;
	capabilities->freq_getset = false;
	capabilities->freq_up = false;
	capabilities->freq_down = false;
	capabilities->freq_max = false;
	capabilities->freq_min = false;
	capabilities->turbo_set = false;
	capabilities->priority = false;
	capabilities->turbo_available = false;

	return 0;
}

static struct spdk_governor *
_governor_find(char *name)
{
	struct spdk_governor *governor, *tmp;

	TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
		if (strcmp(name, governor->name) == 0) {
			return governor;
		}
	}

	return NULL;
}
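
/*
 * Activate a registered governor by name. The governor struct is copied into
 * g_governor, then its init and per-core init_core callbacks are invoked.
 */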
0; 1238 } 1239 1240 void 1241 _spdk_governor_list_add(struct spdk_governor *governor) 1242 { 1243 if (_governor_find(governor->name)) { 1244 SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name); 1245 assert(false); 1246 return; 1247 } 1248 1249 TAILQ_INSERT_TAIL(&g_governor_list, governor, link); 1250 } 1251 1252 SPDK_LOG_REGISTER_COMPONENT(reactor) 1253