1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 #include "spdk/likely.h" 36 37 #include "spdk_internal/event.h" 38 #include "spdk_internal/log.h" 39 #include "spdk_internal/thread.h" 40 41 #include "spdk/log.h" 42 #include "spdk/thread.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 46 #ifdef __linux__ 47 #include <sys/prctl.h> 48 #endif 49 50 #ifdef __FreeBSD__ 51 #include <pthread_np.h> 52 #endif 53 54 #define SPDK_EVENT_BATCH_SIZE 8 55 56 static struct spdk_reactor *g_reactors; 57 static struct spdk_cpuset g_reactor_core_mask; 58 static enum spdk_reactor_state g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED; 59 60 static bool g_framework_context_switch_monitor_enabled = true; 61 62 static struct spdk_mempool *g_spdk_event_mempool = NULL; 63 64 static void 65 reactor_construct(struct spdk_reactor *reactor, uint32_t lcore) 66 { 67 reactor->lcore = lcore; 68 reactor->flags.is_valid = true; 69 70 TAILQ_INIT(&reactor->threads); 71 reactor->thread_count = 0; 72 73 reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 74 assert(reactor->events != NULL); 75 } 76 77 struct spdk_reactor * 78 spdk_reactor_get(uint32_t lcore) 79 { 80 struct spdk_reactor *reactor; 81 82 if (g_reactors == NULL) { 83 SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n"); 84 return NULL; 85 } 86 87 reactor = &g_reactors[lcore]; 88 89 if (reactor->flags.is_valid == false) { 90 return NULL; 91 } 92 93 return reactor; 94 } 95 96 static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op); 97 static bool reactor_thread_op_supported(enum spdk_thread_op op); 98 99 int 100 spdk_reactors_init(void) 101 { 102 int rc; 103 uint32_t i, last_core; 104 char mempool_name[32]; 105 106 snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid()); 107 g_spdk_event_mempool = spdk_mempool_create(mempool_name, 108 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 109 sizeof(struct spdk_event), 110 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 111 SPDK_ENV_SOCKET_ID_ANY); 112 113 if (g_spdk_event_mempool == NULL) { 114 SPDK_ERRLOG("spdk_event_mempool creation failed\n"); 115 return -1; 116 } 117 118 /* struct spdk_reactor must be aligned on 64 byte boundary */ 119 last_core = spdk_env_get_last_core(); 120 rc = posix_memalign((void **)&g_reactors, 64, 121 (last_core + 1) * sizeof(struct spdk_reactor)); 122 if (rc != 0) { 123 SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n", 124 last_core + 1); 125 spdk_mempool_free(g_spdk_event_mempool); 126 return -1; 127 } 128 129 memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor)); 130 131 spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported, 132 sizeof(struct spdk_lw_thread)); 133 134 SPDK_ENV_FOREACH_CORE(i) { 135 reactor_construct(&g_reactors[i], i); 136 } 137 138 g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED; 139 140 return 0; 141 } 142 143 void 144 spdk_reactors_fini(void) 145 { 146 uint32_t i; 147 struct spdk_reactor *reactor; 148 149 if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) { 150 return; 151 } 152 153 spdk_thread_lib_fini(); 154 155 SPDK_ENV_FOREACH_CORE(i) { 156 reactor = spdk_reactor_get(i); 157 assert(reactor != NULL); 158 assert(reactor->thread_count == 0); 159 if (reactor->events != NULL) { 160 spdk_ring_free(reactor->events); 161 } 162 } 163 164 spdk_mempool_free(g_spdk_event_mempool); 165 166 free(g_reactors); 167 g_reactors = NULL; 168 } 169 170 struct spdk_event * 171 spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2) 172 { 173 struct spdk_event *event = NULL; 174 struct spdk_reactor *reactor = spdk_reactor_get(lcore); 175 176 if (!reactor) { 177 assert(false); 178 return NULL; 179 } 180 181 event = spdk_mempool_get(g_spdk_event_mempool); 182 if (event == NULL) { 183 assert(false); 184 return NULL; 185 } 186 187 event->lcore = lcore; 188 event->fn = fn; 189 event->arg1 = arg1; 190 event->arg2 = arg2; 191 192 return event; 193 } 194 195 void 196 spdk_event_call(struct spdk_event *event) 197 { 198 int rc; 199 struct spdk_reactor *reactor; 200 201 reactor = spdk_reactor_get(event->lcore); 202 203 assert(reactor != NULL); 204 assert(reactor->events != NULL); 205 206 rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL); 207 if (rc != 1) { 208 assert(false); 209 } 210 } 211 212 static inline uint32_t 213 event_queue_run_batch(struct spdk_reactor *reactor) 214 { 215 unsigned count, i; 216 void *events[SPDK_EVENT_BATCH_SIZE]; 217 struct spdk_thread *thread; 218 struct spdk_lw_thread *lw_thread; 219 220 #ifdef DEBUG 221 /* 222 * spdk_ring_dequeue() fills events and returns how many entries it wrote, 223 * so we will never actually read uninitialized data from events, but just to be sure 224 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 225 */ 226 memset(events, 0, sizeof(events)); 227 #endif 228 229 count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE); 230 if (count == 0) { 231 return 0; 232 } 233 234 /* Execute the events. There are still some remaining events 235 * that must occur on an SPDK thread. To accomodate those, try to 236 * run them on the first thread in the list, if it exists. */ 237 lw_thread = TAILQ_FIRST(&reactor->threads); 238 if (lw_thread) { 239 thread = spdk_thread_get_from_ctx(lw_thread); 240 } else { 241 thread = NULL; 242 } 243 244 spdk_set_thread(thread); 245 246 for (i = 0; i < count; i++) { 247 struct spdk_event *event = events[i]; 248 249 assert(event != NULL); 250 event->fn(event->arg1, event->arg2); 251 } 252 253 spdk_set_thread(NULL); 254 255 spdk_mempool_put_bulk(g_spdk_event_mempool, events, count); 256 257 return count; 258 } 259 260 /* 1s */ 261 #define CONTEXT_SWITCH_MONITOR_PERIOD 1000000 262 263 static int 264 get_rusage(struct spdk_reactor *reactor) 265 { 266 struct rusage rusage; 267 268 if (getrusage(RUSAGE_THREAD, &rusage) != 0) { 269 return -1; 270 } 271 272 if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) { 273 SPDK_INFOLOG(SPDK_LOG_REACTOR, 274 "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n", 275 reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw, 276 rusage.ru_nivcsw - reactor->rusage.ru_nivcsw); 277 } 278 reactor->rusage = rusage; 279 280 return -1; 281 } 282 283 void 284 spdk_framework_enable_context_switch_monitor(bool enable) 285 { 286 /* This global is being read by multiple threads, so this isn't 287 * strictly thread safe. However, we're toggling between true and 288 * false here, and if a thread sees the value update later than it 289 * should, it's no big deal. */ 290 g_framework_context_switch_monitor_enabled = enable; 291 } 292 293 bool 294 spdk_framework_context_switch_monitor_enabled(void) 295 { 296 return g_framework_context_switch_monitor_enabled; 297 } 298 299 static void 300 _set_thread_name(const char *thread_name) 301 { 302 #if defined(__linux__) 303 prctl(PR_SET_NAME, thread_name, 0, 0, 0); 304 #elif defined(__FreeBSD__) 305 pthread_set_name_np(pthread_self(), thread_name); 306 #else 307 #error missing platform support for thread name 308 #endif 309 } 310 311 static int _reactor_schedule_thread(struct spdk_thread *thread); 312 static uint64_t g_rusage_period; 313 314 static void 315 _reactor_run(struct spdk_reactor *reactor) 316 { 317 struct spdk_thread *thread; 318 struct spdk_lw_thread *lw_thread, *tmp; 319 uint64_t now; 320 int rc; 321 322 event_queue_run_batch(reactor); 323 324 TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { 325 thread = spdk_thread_get_from_ctx(lw_thread); 326 rc = spdk_thread_poll(thread, 0, reactor->tsc_last); 327 328 now = spdk_thread_get_last_tsc(thread); 329 if (rc == 0) { 330 reactor->idle_tsc += now - reactor->tsc_last; 331 } else if (rc > 0) { 332 reactor->busy_tsc += now - reactor->tsc_last; 333 } 334 reactor->tsc_last = now; 335 336 if (spdk_unlikely(lw_thread->resched)) { 337 lw_thread->resched = false; 338 TAILQ_REMOVE(&reactor->threads, lw_thread, link); 339 assert(reactor->thread_count > 0); 340 reactor->thread_count--; 341 _reactor_schedule_thread(thread); 342 continue; 343 } 344 345 if (spdk_unlikely(spdk_thread_is_exited(thread) && 346 spdk_thread_is_idle(thread))) { 347 TAILQ_REMOVE(&reactor->threads, lw_thread, link); 348 assert(reactor->thread_count > 0); 349 reactor->thread_count--; 350 spdk_thread_destroy(thread); 351 continue; 352 } 353 } 354 355 if (g_framework_context_switch_monitor_enabled) { 356 if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) { 357 get_rusage(reactor); 358 reactor->last_rusage = reactor->tsc_last; 359 } 360 } 361 } 362 363 static int 364 reactor_run(void *arg) 365 { 366 struct spdk_reactor *reactor = arg; 367 struct spdk_thread *thread; 368 struct spdk_lw_thread *lw_thread, *tmp; 369 char thread_name[32]; 370 371 SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore); 372 373 /* Rename the POSIX thread because the reactor is tied to the POSIX 374 * thread in the SPDK event library. 375 */ 376 snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore); 377 _set_thread_name(thread_name); 378 379 reactor->tsc_last = spdk_get_ticks(); 380 381 while (1) { 382 _reactor_run(reactor); 383 384 if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) { 385 break; 386 } 387 } 388 389 TAILQ_FOREACH(lw_thread, &reactor->threads, link) { 390 thread = spdk_thread_get_from_ctx(lw_thread); 391 spdk_set_thread(thread); 392 spdk_thread_exit(thread); 393 } 394 395 while (!TAILQ_EMPTY(&reactor->threads)) { 396 TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { 397 thread = spdk_thread_get_from_ctx(lw_thread); 398 spdk_set_thread(thread); 399 if (spdk_thread_is_exited(thread)) { 400 TAILQ_REMOVE(&reactor->threads, lw_thread, link); 401 assert(reactor->thread_count > 0); 402 reactor->thread_count--; 403 spdk_thread_destroy(thread); 404 } else { 405 spdk_thread_poll(thread, 0, 0); 406 } 407 } 408 } 409 410 return 0; 411 } 412 413 int 414 spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask) 415 { 416 int ret; 417 const struct spdk_cpuset *validmask; 418 419 ret = spdk_cpuset_parse(cpumask, mask); 420 if (ret < 0) { 421 return ret; 422 } 423 424 validmask = spdk_app_get_core_mask(); 425 spdk_cpuset_and(cpumask, validmask); 426 427 return 0; 428 } 429 430 const struct spdk_cpuset * 431 spdk_app_get_core_mask(void) 432 { 433 return &g_reactor_core_mask; 434 } 435 436 void 437 spdk_reactors_start(void) 438 { 439 struct spdk_reactor *reactor; 440 struct spdk_cpuset tmp_cpumask = {}; 441 uint32_t i, current_core; 442 int rc; 443 char thread_name[32]; 444 445 g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC; 446 g_reactor_state = SPDK_REACTOR_STATE_RUNNING; 447 448 current_core = spdk_env_get_current_core(); 449 SPDK_ENV_FOREACH_CORE(i) { 450 if (i != current_core) { 451 reactor = spdk_reactor_get(i); 452 if (reactor == NULL) { 453 continue; 454 } 455 456 rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor); 457 if (rc < 0) { 458 SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore); 459 assert(false); 460 return; 461 } 462 463 /* For now, for each reactor spawn one thread. */ 464 snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore); 465 466 spdk_cpuset_zero(&tmp_cpumask); 467 spdk_cpuset_set_cpu(&tmp_cpumask, i, true); 468 469 spdk_thread_create(thread_name, &tmp_cpumask); 470 } 471 spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true); 472 } 473 474 /* Start the master reactor */ 475 reactor = spdk_reactor_get(current_core); 476 assert(reactor != NULL); 477 reactor_run(reactor); 478 479 spdk_env_thread_wait_all(); 480 481 g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN; 482 } 483 484 void 485 spdk_reactors_stop(void *arg1) 486 { 487 g_reactor_state = SPDK_REACTOR_STATE_EXITING; 488 } 489 490 static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER; 491 static uint32_t g_next_core = UINT32_MAX; 492 493 static void 494 _schedule_thread(void *arg1, void *arg2) 495 { 496 struct spdk_lw_thread *lw_thread = arg1; 497 struct spdk_reactor *reactor; 498 uint32_t current_core; 499 500 current_core = spdk_env_get_current_core(); 501 502 reactor = spdk_reactor_get(current_core); 503 assert(reactor != NULL); 504 505 TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link); 506 reactor->thread_count++; 507 } 508 509 static int 510 _reactor_schedule_thread(struct spdk_thread *thread) 511 { 512 uint32_t core; 513 struct spdk_lw_thread *lw_thread; 514 struct spdk_event *evt = NULL; 515 struct spdk_cpuset *cpumask; 516 uint32_t i; 517 518 cpumask = spdk_thread_get_cpumask(thread); 519 520 lw_thread = spdk_thread_get_ctx(thread); 521 assert(lw_thread != NULL); 522 memset(lw_thread, 0, sizeof(*lw_thread)); 523 524 pthread_mutex_lock(&g_scheduler_mtx); 525 for (i = 0; i < spdk_env_get_core_count(); i++) { 526 if (g_next_core > spdk_env_get_last_core()) { 527 g_next_core = spdk_env_get_first_core(); 528 } 529 core = g_next_core; 530 g_next_core = spdk_env_get_next_core(g_next_core); 531 532 if (spdk_cpuset_get_cpu(cpumask, core)) { 533 evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL); 534 break; 535 } 536 } 537 pthread_mutex_unlock(&g_scheduler_mtx); 538 539 assert(evt != NULL); 540 if (evt == NULL) { 541 SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n"); 542 return -1; 543 } 544 545 lw_thread->tsc_start = spdk_get_ticks(); 546 547 spdk_event_call(evt); 548 549 return 0; 550 } 551 552 static void 553 _reactor_request_thread_reschedule(struct spdk_thread *thread) 554 { 555 struct spdk_lw_thread *lw_thread; 556 557 assert(thread == spdk_get_thread()); 558 559 lw_thread = spdk_thread_get_ctx(thread); 560 561 assert(lw_thread != NULL); 562 563 lw_thread->resched = true; 564 } 565 566 static int 567 reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op) 568 { 569 switch (op) { 570 case SPDK_THREAD_OP_NEW: 571 return _reactor_schedule_thread(thread); 572 case SPDK_THREAD_OP_RESCHED: 573 _reactor_request_thread_reschedule(thread); 574 return 0; 575 default: 576 return -ENOTSUP; 577 } 578 } 579 580 static bool 581 reactor_thread_op_supported(enum spdk_thread_op op) 582 { 583 switch (op) { 584 case SPDK_THREAD_OP_NEW: 585 case SPDK_THREAD_OP_RESCHED: 586 return true; 587 default: 588 return false; 589 } 590 } 591 592 struct call_reactor { 593 uint32_t cur_core; 594 spdk_event_fn fn; 595 void *arg1; 596 void *arg2; 597 598 uint32_t orig_core; 599 spdk_event_fn cpl; 600 }; 601 602 static void 603 on_reactor(void *arg1, void *arg2) 604 { 605 struct call_reactor *cr = arg1; 606 struct spdk_event *evt; 607 608 cr->fn(cr->arg1, cr->arg2); 609 610 cr->cur_core = spdk_env_get_next_core(cr->cur_core); 611 612 if (cr->cur_core > spdk_env_get_last_core()) { 613 SPDK_DEBUGLOG(SPDK_LOG_REACTOR, "Completed reactor iteration\n"); 614 615 evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2); 616 free(cr); 617 } else { 618 SPDK_DEBUGLOG(SPDK_LOG_REACTOR, "Continuing reactor iteration to %d\n", 619 cr->cur_core); 620 621 evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL); 622 } 623 assert(evt != NULL); 624 spdk_event_call(evt); 625 } 626 627 void 628 spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl) 629 { 630 struct call_reactor *cr; 631 struct spdk_event *evt; 632 633 cr = calloc(1, sizeof(*cr)); 634 if (!cr) { 635 SPDK_ERRLOG("Unable to perform reactor iteration\n"); 636 cpl(arg1, arg2); 637 return; 638 } 639 640 cr->fn = fn; 641 cr->arg1 = arg1; 642 cr->arg2 = arg2; 643 cr->cpl = cpl; 644 cr->orig_core = spdk_env_get_current_core(); 645 cr->cur_core = spdk_env_get_first_core(); 646 647 SPDK_DEBUGLOG(SPDK_LOG_REACTOR, "Starting reactor iteration from %d\n", cr->orig_core); 648 649 evt = spdk_event_allocate(cr->cur_core, on_reactor, cr, NULL); 650 assert(evt != NULL); 651 652 spdk_event_call(evt); 653 } 654 655 SPDK_LOG_REGISTER_COMPONENT("reactor", SPDK_LOG_REACTOR) 656