1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2020 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 8 #include "spdk_internal/cunit.h" 9 #include "common/lib/test_env.c" 10 #include "event/reactor.c" 11 #include "spdk/thread.h" 12 #include "spdk_internal/thread.h" 13 #include "event/scheduler_static.c" 14 #include "../module/scheduler/dynamic/scheduler_dynamic.c" 15 16 static void 17 test_create_reactor(void) 18 { 19 struct spdk_reactor *reactor; 20 int rc; 21 22 /* See SPDK issue #3004. Seems like a bug with gcc + asan on Fedora 38, so we can't 23 * allocate g_reactors on the stack and need to explicitly used aligned allocation here. 24 */ 25 rc = posix_memalign((void **)&reactor, SPDK_CACHE_LINE_SIZE, sizeof(*reactor)); 26 SPDK_CU_ASSERT_FATAL(rc == 0); 27 28 g_reactors = reactor; 29 g_reactor_count = 1; 30 31 reactor_construct(reactor, 0); 32 33 CU_ASSERT(spdk_reactor_get(0) == reactor); 34 35 spdk_ring_free(reactor->events); 36 reactor_interrupt_fini(reactor); 37 free(reactor); 38 g_reactors = NULL; 39 } 40 41 static void 42 test_init_reactors(void) 43 { 44 uint32_t core; 45 46 MOCK_SET(spdk_env_get_current_core, 0); 47 48 allocate_cores(3); 49 50 CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0); 51 52 CU_ASSERT(g_reactor_state == SPDK_REACTOR_STATE_INITIALIZED); 53 for (core = 0; core < 3; core++) { 54 CU_ASSERT(spdk_reactor_get(core) != NULL); 55 } 56 57 spdk_reactors_fini(); 58 59 free_cores(); 60 61 MOCK_CLEAR(spdk_env_get_current_core); 62 } 63 64 static void 65 ut_event_fn(void *arg1, void *arg2) 66 { 67 uint8_t *test1 = arg1; 68 uint8_t *test2 = arg2; 69 70 *test1 = 1; 71 *test2 = 0xFF; 72 } 73 74 static void 75 test_event_call(void) 76 { 77 uint8_t test1 = 0, test2 = 0; 78 struct spdk_event *evt; 79 struct spdk_reactor *reactor; 80 81 MOCK_SET(spdk_env_get_current_core, 0); 82 83 allocate_cores(1); 84 85 CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0); 86 87 evt = spdk_event_allocate(0, ut_event_fn, &test1, &test2); 88 CU_ASSERT(evt != NULL); 89 90 MOCK_SET(spdk_env_get_current_core, 0); 91 92 spdk_event_call(evt); 93 94 reactor = spdk_reactor_get(0); 95 CU_ASSERT(reactor != NULL); 96 97 CU_ASSERT(event_queue_run_batch(reactor) == 1); 98 CU_ASSERT(test1 == 1); 99 CU_ASSERT(test2 == 0xFF); 100 101 MOCK_CLEAR(spdk_env_get_current_core); 102 103 spdk_reactors_fini(); 104 105 free_cores(); 106 107 MOCK_CLEAR(spdk_env_get_current_core); 108 } 109 110 static void 111 test_schedule_thread(void) 112 { 113 struct spdk_cpuset cpuset = {}; 114 struct spdk_thread *thread; 115 struct spdk_reactor *reactor; 116 struct spdk_lw_thread *lw_thread; 117 118 MOCK_SET(spdk_env_get_current_core, 0); 119 120 allocate_cores(5); 121 122 CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0); 123 124 spdk_cpuset_set_cpu(&cpuset, 3, true); 125 g_next_core = 4; 126 127 MOCK_SET(spdk_env_get_current_core, 3); 128 129 /* _reactor_schedule_thread() will be called in spdk_thread_create() 130 * at its end because it is passed to SPDK thread library by 131 * spdk_thread_lib_init(). 132 */ 133 thread = spdk_thread_create(NULL, &cpuset); 134 CU_ASSERT(thread != NULL); 135 136 reactor = spdk_reactor_get(3); 137 CU_ASSERT(reactor != NULL); 138 139 CU_ASSERT(event_queue_run_batch(reactor) == 1); 140 141 MOCK_CLEAR(spdk_env_get_current_core); 142 143 lw_thread = TAILQ_FIRST(&reactor->threads); 144 CU_ASSERT(lw_thread != NULL); 145 CU_ASSERT(spdk_thread_get_from_ctx(lw_thread) == thread); 146 147 TAILQ_REMOVE(&reactor->threads, lw_thread, link); 148 reactor->thread_count--; 149 spdk_set_thread(thread); 150 spdk_thread_exit(thread); 151 while (!spdk_thread_is_exited(thread)) { 152 spdk_thread_poll(thread, 0, 0); 153 } 154 spdk_thread_destroy(thread); 155 spdk_set_thread(NULL); 156 157 spdk_reactors_fini(); 158 159 free_cores(); 160 } 161 162 static void 163 test_reschedule_thread(void) 164 { 165 struct spdk_cpuset cpuset = {}; 166 struct spdk_thread *thread; 167 struct spdk_reactor *reactor; 168 struct spdk_lw_thread *lw_thread; 169 170 MOCK_SET(spdk_env_get_current_core, 0); 171 172 allocate_cores(3); 173 174 CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0); 175 176 spdk_cpuset_set_cpu(&g_reactor_core_mask, 0, true); 177 spdk_cpuset_set_cpu(&g_reactor_core_mask, 1, true); 178 spdk_cpuset_set_cpu(&g_reactor_core_mask, 2, true); 179 g_next_core = 0; 180 181 MOCK_SET(spdk_env_get_current_core, 1); 182 /* Create and schedule the thread to core 1. */ 183 spdk_cpuset_set_cpu(&cpuset, 1, true); 184 185 thread = spdk_thread_create(NULL, &cpuset); 186 CU_ASSERT(thread != NULL); 187 lw_thread = spdk_thread_get_ctx(thread); 188 189 reactor = spdk_reactor_get(1); 190 CU_ASSERT(reactor != NULL); 191 192 CU_ASSERT(event_queue_run_batch(reactor) == 1); 193 CU_ASSERT(TAILQ_FIRST(&reactor->threads) == lw_thread); 194 195 spdk_set_thread(thread); 196 197 /* Call spdk_thread_set_cpumask() twice with different cpumask values. 198 * The cpumask of the 2nd call will be used in reschedule operation. 199 */ 200 201 spdk_cpuset_zero(&cpuset); 202 spdk_cpuset_set_cpu(&cpuset, 0, true); 203 CU_ASSERT(spdk_thread_set_cpumask(&cpuset) == 0); 204 205 spdk_cpuset_zero(&cpuset); 206 spdk_cpuset_set_cpu(&cpuset, 2, true); 207 CU_ASSERT(spdk_thread_set_cpumask(&cpuset) == 0); 208 209 CU_ASSERT(lw_thread->resched == true); 210 211 reactor_run(reactor); 212 213 CU_ASSERT(lw_thread->resched == false); 214 CU_ASSERT(TAILQ_EMPTY(&reactor->threads)); 215 216 spdk_set_thread(NULL); 217 218 reactor = spdk_reactor_get(0); 219 CU_ASSERT(reactor != NULL); 220 MOCK_SET(spdk_env_get_current_core, 0); 221 222 CU_ASSERT(event_queue_run_batch(reactor) == 0); 223 224 reactor = spdk_reactor_get(2); 225 CU_ASSERT(reactor != NULL); 226 MOCK_SET(spdk_env_get_current_core, 2); 227 228 CU_ASSERT(event_queue_run_batch(reactor) == 1); 229 230 CU_ASSERT(TAILQ_FIRST(&reactor->threads) == lw_thread); 231 232 MOCK_CLEAR(spdk_env_get_current_core); 233 234 TAILQ_REMOVE(&reactor->threads, lw_thread, link); 235 reactor->thread_count--; 236 spdk_set_thread(thread); 237 spdk_thread_exit(thread); 238 while (!spdk_thread_is_exited(thread)) { 239 spdk_thread_poll(thread, 0, 0); 240 } 241 spdk_thread_destroy(thread); 242 spdk_set_thread(NULL); 243 244 spdk_reactors_fini(); 245 246 free_cores(); 247 } 248 249 static void 250 for_each_reactor_done(void *arg1, void *arg2) 251 { 252 uint32_t *count = arg1; 253 bool *done = arg2; 254 255 (*count)++; 256 *done = true; 257 } 258 259 static void 260 for_each_reactor_cb(void *arg1, void *arg2) 261 { 262 uint32_t *count = arg1; 263 264 (*count)++; 265 } 266 267 static void 268 test_for_each_reactor(void) 269 { 270 uint32_t count = 0, i; 271 bool done = false; 272 struct spdk_reactor *reactor; 273 274 MOCK_SET(spdk_env_get_current_core, 0); 275 276 allocate_cores(5); 277 278 CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0); 279 280 spdk_for_each_reactor(for_each_reactor_cb, &count, &done, for_each_reactor_done); 281 282 MOCK_CLEAR(spdk_env_get_current_core); 283 284 /* We have not processed any event yet, so count and done should be 0 and false, 285 * respectively. 286 */ 287 CU_ASSERT(count == 0); 288 289 /* Poll each reactor to verify the event is passed to each */ 290 for (i = 0; i < 5; i++) { 291 reactor = spdk_reactor_get(i); 292 CU_ASSERT(reactor != NULL); 293 MOCK_SET(spdk_env_get_current_core, i); 294 295 event_queue_run_batch(reactor); 296 CU_ASSERT(count == (i + 1)); 297 CU_ASSERT(done == false); 298 MOCK_CLEAR(spdk_env_get_current_core); 299 } 300 301 MOCK_SET(spdk_env_get_current_core, 0); 302 /* After each reactor is called, the completion calls it one more time. */ 303 reactor = spdk_reactor_get(0); 304 CU_ASSERT(reactor != NULL); 305 306 event_queue_run_batch(reactor); 307 CU_ASSERT(count == 6); 308 CU_ASSERT(done == true); 309 MOCK_CLEAR(spdk_env_get_current_core); 310 311 spdk_reactors_fini(); 312 313 free_cores(); 314 } 315 316 static int 317 poller_run_idle(void *ctx) 318 { 319 uint64_t delay_us = (uint64_t)ctx; 320 321 spdk_delay_us(delay_us); 322 323 return 0; 324 } 325 326 static int 327 poller_run_busy(void *ctx) 328 { 329 uint64_t delay_us = (uint64_t)ctx; 330 331 spdk_delay_us(delay_us); 332 333 return 1; 334 } 335 336 static void 337 test_reactor_stats(void) 338 { 339 struct spdk_cpuset cpuset = {}; 340 struct spdk_thread *thread1, *thread2; 341 struct spdk_reactor *reactor; 342 struct spdk_poller *busy1, *idle1, *busy2, *idle2; 343 struct spdk_thread_stats stats; 344 int rc __attribute__((unused)); 345 346 /* Test case is the following: 347 * Create a reactor on CPU core0. 348 * Create thread1 and thread2 simultaneously on reactor0 at TSC = 100. 349 * Reactor runs 350 * - thread1 for 100 with busy 351 * - thread2 for 200 with idle 352 * - thread1 for 300 with idle 353 * - thread2 for 400 with busy. 354 * Then, 355 * - both elapsed TSC of thread1 and thread2 should be 1100 (= 100 + 1000). 356 * - busy TSC of reactor should be 500 (= 100 + 400). 357 * - idle TSC of reactor should be 500 (= 200 + 300). 358 * 359 * After that reactor0 runs with no threads for 900 TSC. 360 * Create thread1 on reactor0 at TSC = 2000. 361 * Reactor runs 362 * - thread1 for 100 with busy 363 * Then, 364 * - elapsed TSC of thread1 should be 2100 (= 2000+ 100). 365 * - busy TSC of reactor should be 600 (= 500 + 100). 366 * - idle TSC of reactor should be 500 (= 500 + 900). 367 */ 368 369 MOCK_SET(spdk_env_get_current_core, 0); 370 371 allocate_cores(1); 372 373 CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0); 374 375 spdk_cpuset_set_cpu(&cpuset, 0, true); 376 377 reactor = spdk_reactor_get(0); 378 SPDK_CU_ASSERT_FATAL(reactor != NULL); 379 380 /* First reactor_run() sets the tsc_last. */ 381 MOCK_SET(spdk_get_ticks, 100); 382 reactor->tsc_last = spdk_get_ticks(); 383 384 thread1 = spdk_thread_create(NULL, &cpuset); 385 SPDK_CU_ASSERT_FATAL(thread1 != NULL); 386 387 thread2 = spdk_thread_create(NULL, &cpuset); 388 SPDK_CU_ASSERT_FATAL(thread2 != NULL); 389 390 spdk_set_thread(thread1); 391 busy1 = spdk_poller_register(poller_run_busy, (void *)100, 0); 392 CU_ASSERT(busy1 != NULL); 393 394 spdk_set_thread(thread2); 395 idle2 = spdk_poller_register(poller_run_idle, (void *)300, 0); 396 CU_ASSERT(idle2 != NULL); 397 398 spdk_set_thread(NULL); 399 _reactor_run(reactor); 400 401 spdk_set_thread(thread1); 402 CU_ASSERT(spdk_thread_get_last_tsc(thread1) == 200); 403 CU_ASSERT(spdk_thread_get_stats(&stats) == 0); 404 CU_ASSERT(stats.busy_tsc == 100); 405 CU_ASSERT(stats.idle_tsc == 0); 406 spdk_set_thread(thread2); 407 CU_ASSERT(spdk_thread_get_last_tsc(thread2) == 500); 408 CU_ASSERT(spdk_thread_get_stats(&stats) == 0); 409 CU_ASSERT(stats.busy_tsc == 0); 410 CU_ASSERT(stats.idle_tsc == 300); 411 412 CU_ASSERT(reactor->busy_tsc == 100); 413 CU_ASSERT(reactor->idle_tsc == 300); 414 415 /* 100 + 100 + 300 = 500 ticks elapsed */ 416 CU_ASSERT(reactor->tsc_last == 500); 417 418 spdk_set_thread(thread1); 419 spdk_poller_unregister(&busy1); 420 idle1 = spdk_poller_register(poller_run_idle, (void *)200, 0); 421 CU_ASSERT(idle1 != NULL); 422 423 spdk_set_thread(thread2); 424 spdk_poller_unregister(&idle2); 425 busy2 = spdk_poller_register(poller_run_busy, (void *)400, 0); 426 CU_ASSERT(busy2 != NULL); 427 428 _reactor_run(reactor); 429 430 spdk_set_thread(thread1); 431 CU_ASSERT(spdk_thread_get_last_tsc(thread1) == 700); 432 CU_ASSERT(spdk_thread_get_stats(&stats) == 0); 433 CU_ASSERT(stats.busy_tsc == 100); 434 CU_ASSERT(stats.idle_tsc == 200); 435 spdk_set_thread(thread2); 436 CU_ASSERT(spdk_thread_get_last_tsc(thread2) == 1100); 437 CU_ASSERT(spdk_thread_get_stats(&stats) == 0); 438 CU_ASSERT(stats.busy_tsc == 400); 439 CU_ASSERT(stats.idle_tsc == 300); 440 441 CU_ASSERT(reactor->busy_tsc == 500); 442 CU_ASSERT(reactor->idle_tsc == 500); 443 444 /* 500 + 200 + 400 = 1100 ticks elapsed */ 445 CU_ASSERT(reactor->tsc_last == 1100); 446 447 spdk_set_thread(thread1); 448 spdk_poller_unregister(&idle1); 449 spdk_thread_exit(thread1); 450 451 spdk_set_thread(thread2); 452 spdk_poller_unregister(&busy2); 453 spdk_thread_exit(thread2); 454 455 _reactor_run(reactor); 456 457 /* After 900 ticks new thread is created. */ 458 /* 1100 + 900 = 2000 ticks elapsed */ 459 MOCK_SET(spdk_get_ticks, 2000); 460 _reactor_run(reactor); 461 CU_ASSERT(reactor->tsc_last == 2000); 462 463 thread1 = spdk_thread_create(NULL, &cpuset); 464 SPDK_CU_ASSERT_FATAL(thread1 != NULL); 465 466 spdk_set_thread(thread1); 467 busy1 = spdk_poller_register(poller_run_busy, (void *)100, 0); 468 CU_ASSERT(busy1 != NULL); 469 470 spdk_set_thread(NULL); 471 _reactor_run(reactor); 472 473 spdk_set_thread(thread1); 474 CU_ASSERT(spdk_thread_get_last_tsc(thread1) == 2100); 475 CU_ASSERT(spdk_thread_get_stats(&stats) == 0); 476 CU_ASSERT(stats.busy_tsc == 100); 477 CU_ASSERT(stats.idle_tsc == 0); 478 479 CU_ASSERT(reactor->busy_tsc == 600); 480 CU_ASSERT(reactor->idle_tsc == 1400); 481 482 /* 2000 + 100 = 2100 ticks elapsed */ 483 CU_ASSERT(reactor->tsc_last == 2100); 484 485 spdk_set_thread(thread1); 486 spdk_poller_unregister(&busy1); 487 spdk_thread_exit(thread1); 488 489 _reactor_run(reactor); 490 491 CU_ASSERT(TAILQ_EMPTY(&reactor->threads)); 492 493 /* No further than 2100 ticks elapsed */ 494 CU_ASSERT(reactor->tsc_last == 2100); 495 496 spdk_reactors_fini(); 497 498 free_cores(); 499 500 MOCK_CLEAR(spdk_env_get_current_core); 501 } 502 503 static uint32_t 504 _run_events_till_completion(uint32_t reactor_count) 505 { 506 struct spdk_reactor *reactor; 507 uint32_t i, events; 508 uint32_t total_events = 0; 509 510 do { 511 events = 0; 512 for (i = 0; i < reactor_count; i++) { 513 reactor = spdk_reactor_get(i); 514 CU_ASSERT(reactor != NULL); 515 MOCK_SET(spdk_env_get_current_core, i); 516 events += event_queue_run_batch(reactor); 517 518 /* Some events require scheduling core to run */ 519 MOCK_SET(spdk_env_get_current_core, g_scheduling_reactor->lcore); 520 events += event_queue_run_batch(g_scheduling_reactor); 521 522 MOCK_CLEAR(spdk_env_get_current_core); 523 } 524 total_events += events; 525 } while (events > 0); 526 527 return total_events; 528 } 529 530 static void 531 test_scheduler(void) 532 { 533 struct spdk_cpuset cpuset = {}; 534 struct spdk_thread *thread[3]; 535 struct spdk_reactor *reactor; 536 struct spdk_poller *busy, *idle; 537 uint64_t reactor_busy_tsc[3], reactor_idle_tsc[3]; 538 uint64_t thread_busy_tsc[3], thread_idle_tsc[3]; 539 uint64_t current_time, end_time, busy_time, idle_time; 540 struct spdk_thread_stats stats; 541 int i; 542 543 MOCK_SET(spdk_env_get_current_core, 0); 544 545 allocate_cores(3); 546 547 CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0); 548 549 spdk_scheduler_set("dynamic"); 550 551 for (i = 0; i < 3; i++) { 552 spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true); 553 } 554 g_next_core = 0; 555 556 /* Create threads. */ 557 for (i = 0; i < 3; i++) { 558 spdk_cpuset_zero(&cpuset); 559 spdk_cpuset_set_cpu(&cpuset, i, true); 560 thread[i] = spdk_thread_create(NULL, &cpuset); 561 CU_ASSERT(thread[i] != NULL); 562 thread_busy_tsc[i] = 0; 563 thread_idle_tsc[i] = 0; 564 } 565 566 for (i = 0; i < 3; i++) { 567 reactor = spdk_reactor_get(i); 568 CU_ASSERT(reactor != NULL); 569 MOCK_SET(spdk_env_get_current_core, i); 570 event_queue_run_batch(reactor); 571 CU_ASSERT(!TAILQ_EMPTY(&reactor->threads)); 572 reactor_busy_tsc[i] = 0; 573 reactor_idle_tsc[i] = 0; 574 } 575 576 g_reactor_state = SPDK_REACTOR_STATE_RUNNING; 577 578 MOCK_SET(spdk_env_get_current_core, 0); 579 580 /* Init threads stats (low load) */ 581 /* Each reactor starts at 100 tsc, 582 * ends at 100 + 100 = 200 tsc. */ 583 current_time = 100; 584 idle_time = 100; 585 busy_time = 0; 586 end_time = current_time + idle_time + busy_time; 587 for (i = 0; i < 3; i++) { 588 spdk_set_thread(thread[i]); 589 idle = spdk_poller_register(poller_run_idle, (void *)idle_time, 0); 590 reactor = spdk_reactor_get(i); 591 CU_ASSERT(reactor != NULL); 592 MOCK_SET(spdk_get_ticks, current_time); 593 reactor->tsc_last = spdk_get_ticks(); 594 _reactor_run(reactor); 595 CU_ASSERT(reactor->tsc_last == end_time); 596 spdk_poller_unregister(&idle); 597 598 CU_ASSERT(spdk_thread_get_last_tsc(thread[i]) == end_time); 599 CU_ASSERT(spdk_thread_get_stats(&stats) == 0); 600 CU_ASSERT(stats.busy_tsc == busy_time); 601 thread_busy_tsc[i] = stats.busy_tsc; 602 CU_ASSERT(stats.idle_tsc == idle_time); 603 thread_idle_tsc[i] = stats.idle_tsc; 604 CU_ASSERT(reactor->busy_tsc == busy_time); 605 reactor_busy_tsc[i] = reactor->busy_tsc; 606 CU_ASSERT(reactor->idle_tsc == idle_time); 607 reactor_idle_tsc[i] = reactor->idle_tsc; 608 } 609 CU_ASSERT(spdk_get_ticks() == end_time); 610 current_time = 200; 611 612 MOCK_SET(spdk_env_get_current_core, 0); 613 _reactors_scheduler_gather_metrics(NULL, NULL); 614 615 _run_events_till_completion(3); 616 MOCK_SET(spdk_env_get_current_core, 0); 617 618 /* Threads were idle, so all of them should be placed on core 0. 619 * All reactors start and end at 200 tsc, since for this iteration 620 * the threads have no pollers (so they consume no idle or busy tsc). 621 */ 622 for (i = 0; i < 3; i++) { 623 reactor = spdk_reactor_get(i); 624 CU_ASSERT(reactor != NULL); 625 MOCK_SET(spdk_get_ticks, current_time); 626 _reactor_run(reactor); 627 CU_ASSERT(reactor->tsc_last == current_time); 628 CU_ASSERT(reactor->busy_tsc == reactor_busy_tsc[i]); 629 CU_ASSERT(reactor->idle_tsc == reactor_idle_tsc[i]); 630 spdk_set_thread(thread[i]); 631 CU_ASSERT(spdk_thread_get_last_tsc(thread[i]) == current_time); 632 CU_ASSERT(spdk_thread_get_stats(&stats) == 0); 633 CU_ASSERT(stats.busy_tsc == thread_busy_tsc[i]); 634 CU_ASSERT(stats.idle_tsc == thread_idle_tsc[i]); 635 } 636 CU_ASSERT(spdk_get_ticks() == current_time); 637 638 /* 2 threads should be scheduled to core 0 */ 639 reactor = spdk_reactor_get(0); 640 CU_ASSERT(reactor != NULL); 641 MOCK_SET(spdk_env_get_current_core, 0); 642 spdk_set_thread(NULL); 643 event_queue_run_batch(reactor); 644 645 reactor = spdk_reactor_get(0); 646 CU_ASSERT(reactor != NULL); 647 CU_ASSERT(!TAILQ_EMPTY(&reactor->threads)); 648 reactor = spdk_reactor_get(1); 649 CU_ASSERT(reactor != NULL); 650 CU_ASSERT(TAILQ_EMPTY(&reactor->threads)); 651 reactor = spdk_reactor_get(2); 652 CU_ASSERT(reactor != NULL); 653 CU_ASSERT(TAILQ_EMPTY(&reactor->threads)); 654 655 /* Make threads busy */ 656 reactor = spdk_reactor_get(0); 657 CU_ASSERT(reactor != NULL); 658 659 /* All threads run on single reactor, 660 * reactor 0 starts at 200 tsc, 661 * ending at 200 + (100 * 3) = 500 tsc. */ 662 MOCK_SET(spdk_get_ticks, current_time); 663 busy_time = 100; 664 idle_time = 0; 665 for (i = 0; i < 3; i++) { 666 spdk_set_thread(thread[i]); 667 busy = spdk_poller_register(poller_run_busy, (void *)busy_time, 0); 668 _reactor_run(reactor); 669 spdk_poller_unregister(&busy); 670 current_time += busy_time; 671 672 CU_ASSERT(reactor->tsc_last == current_time); 673 CU_ASSERT(spdk_thread_get_last_tsc(thread[i]) == current_time); 674 CU_ASSERT(spdk_thread_get_stats(&stats) == 0); 675 CU_ASSERT(stats.busy_tsc == thread_busy_tsc[i] + busy_time); 676 CU_ASSERT(stats.idle_tsc == thread_idle_tsc[i] + idle_time);; 677 } 678 CU_ASSERT(reactor->busy_tsc == reactor_busy_tsc[0] + 3 * busy_time); 679 CU_ASSERT(reactor->idle_tsc == reactor_idle_tsc[0] + 3 * idle_time); 680 CU_ASSERT(spdk_get_ticks() == current_time); 681 682 /* Run scheduler again, this time all threads are busy */ 683 MOCK_SET(spdk_env_get_current_core, 0); 684 _reactors_scheduler_gather_metrics(NULL, NULL); 685 686 _run_events_till_completion(3); 687 MOCK_SET(spdk_env_get_current_core, 0); 688 689 /* Threads were busy, 2 will stay on core 0, 1 will move to core 1 */ 690 for (i = 0; i < 3; i++) { 691 MOCK_SET(spdk_env_get_current_core, i); 692 reactor = spdk_reactor_get(i); 693 CU_ASSERT(reactor != NULL); 694 _reactor_run(reactor); 695 } 696 697 for (i = 0; i < 3; i++) { 698 reactor = spdk_reactor_get(i); 699 CU_ASSERT(reactor != NULL); 700 CU_ASSERT(!TAILQ_EMPTY(&reactor->threads)); 701 } 702 703 g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED; 704 705 /* Destroy threads */ 706 for (i = 0; i < 3; i++) { 707 spdk_set_thread(thread[i]); 708 spdk_thread_exit(thread[i]); 709 } 710 for (i = 0; i < 3; i++) { 711 reactor = spdk_reactor_get(i); 712 CU_ASSERT(reactor != NULL); 713 reactor_run(reactor); 714 } 715 716 spdk_set_thread(NULL); 717 718 MOCK_CLEAR(spdk_env_get_current_core); 719 720 spdk_reactors_fini(); 721 722 free_cores(); 723 } 724 725 static void 726 test_bind_thread(void) 727 { 728 struct spdk_cpuset cpuset = {}; 729 struct spdk_thread *thread[3]; 730 struct spdk_reactor *reactor; 731 struct spdk_poller *idle; 732 uint64_t reactor_busy_tsc[3], reactor_idle_tsc[3]; 733 uint64_t thread_busy_tsc[3], thread_idle_tsc[3]; 734 uint64_t current_time, end_time, busy_time, idle_time; 735 struct spdk_thread_stats stats; 736 int i; 737 738 MOCK_SET(spdk_env_get_current_core, 0); 739 740 allocate_cores(3); 741 742 CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0); 743 744 spdk_scheduler_set("dynamic"); 745 746 for (i = 0; i < 3; i++) { 747 spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true); 748 } 749 g_next_core = 0; 750 751 /* Create threads. */ 752 for (i = 0; i < 3; i++) { 753 spdk_cpuset_zero(&cpuset); 754 spdk_cpuset_set_cpu(&cpuset, i, true); 755 thread[i] = spdk_thread_create(NULL, &cpuset); 756 CU_ASSERT(thread[i] != NULL); 757 thread_busy_tsc[i] = 0; 758 thread_idle_tsc[i] = 0; 759 } 760 761 for (i = 0; i < 3; i++) { 762 reactor = spdk_reactor_get(i); 763 CU_ASSERT(reactor != NULL); 764 MOCK_SET(spdk_env_get_current_core, i); 765 event_queue_run_batch(reactor); 766 CU_ASSERT(!TAILQ_EMPTY(&reactor->threads)); 767 reactor_busy_tsc[i] = 0; 768 reactor_idle_tsc[i] = 0; 769 } 770 771 g_reactor_state = SPDK_REACTOR_STATE_RUNNING; 772 773 MOCK_SET(spdk_env_get_current_core, 0); 774 775 /* Init threads stats (low load) */ 776 /* Each reactor starts at 100 tsc, 777 * ends at 100 + 100 = 200 tsc. */ 778 current_time = 100; 779 idle_time = 100; 780 busy_time = 0; 781 end_time = current_time + idle_time + busy_time; 782 for (i = 0; i < 3; i++) { 783 spdk_set_thread(thread[i]); 784 idle = spdk_poller_register(poller_run_idle, (void *)idle_time, 0); 785 reactor = spdk_reactor_get(i); 786 CU_ASSERT(reactor != NULL); 787 MOCK_SET(spdk_get_ticks, current_time); 788 reactor->tsc_last = spdk_get_ticks(); 789 _reactor_run(reactor); 790 CU_ASSERT(reactor->tsc_last == end_time); 791 spdk_poller_unregister(&idle); 792 793 CU_ASSERT(spdk_thread_get_last_tsc(thread[i]) == end_time); 794 CU_ASSERT(spdk_thread_get_stats(&stats) == 0); 795 CU_ASSERT(stats.busy_tsc == busy_time); 796 thread_busy_tsc[i] = stats.busy_tsc; 797 CU_ASSERT(stats.idle_tsc == idle_time); 798 thread_idle_tsc[i] = stats.idle_tsc; 799 CU_ASSERT(reactor->busy_tsc == busy_time); 800 reactor_busy_tsc[i] = reactor->busy_tsc; 801 CU_ASSERT(reactor->idle_tsc == idle_time); 802 reactor_idle_tsc[i] = reactor->idle_tsc; 803 } 804 CU_ASSERT(spdk_get_ticks() == end_time); 805 current_time = 200; 806 /* Bind thread 1 */ 807 spdk_thread_bind(thread[1], true); 808 CU_ASSERT(spdk_thread_is_bound(thread[1]) == true); 809 MOCK_SET(spdk_env_get_current_core, 0); 810 _reactors_scheduler_gather_metrics(NULL, NULL); 811 _run_events_till_completion(3); 812 MOCK_SET(spdk_env_get_current_core, 0); 813 814 /* Threads were idle, so all of them should be placed on core 0 except thread 1 815 * since it has been limited on core 1 816 * All reactors start and end at 200 tsc, since for this iteration 817 * the threads have no pollers (so they consume no idle or busy tsc). 818 */ 819 for (i = 0; i < 3; i++) { 820 reactor = spdk_reactor_get(i); 821 CU_ASSERT(reactor != NULL); 822 MOCK_SET(spdk_get_ticks, current_time); 823 _reactor_run(reactor); 824 CU_ASSERT(reactor->tsc_last == current_time); 825 CU_ASSERT(reactor->busy_tsc == reactor_busy_tsc[i]); 826 CU_ASSERT(reactor->idle_tsc == reactor_idle_tsc[i]); 827 spdk_set_thread(thread[i]); 828 CU_ASSERT(spdk_thread_get_last_tsc(thread[i]) == current_time); 829 CU_ASSERT(spdk_thread_get_stats(&stats) == 0); 830 CU_ASSERT(stats.busy_tsc == thread_busy_tsc[i]); 831 CU_ASSERT(stats.idle_tsc == thread_idle_tsc[i]); 832 } 833 CU_ASSERT(spdk_get_ticks() == current_time); 834 835 spdk_set_thread(NULL); 836 837 /* Thread on core 2 should be scheduled to core 0 */ 838 reactor = spdk_reactor_get(0); 839 CU_ASSERT(reactor != NULL); 840 MOCK_SET(spdk_env_get_current_core, 0); 841 event_queue_run_batch(reactor); 842 843 reactor = spdk_reactor_get(0); 844 CU_ASSERT(reactor != NULL); 845 CU_ASSERT(!TAILQ_EMPTY(&reactor->threads)); 846 /* Thread 1 has been limited and stiil on core 0 */ 847 reactor = spdk_reactor_get(1); 848 CU_ASSERT(reactor != NULL); 849 CU_ASSERT(!TAILQ_EMPTY(&reactor->threads)); 850 851 reactor = spdk_reactor_get(2); 852 CU_ASSERT(reactor != NULL); 853 CU_ASSERT(TAILQ_EMPTY(&reactor->threads)); 854 855 g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED; 856 857 /* Destroy threads */ 858 for (i = 0; i < 3; i++) { 859 spdk_set_thread(thread[i]); 860 spdk_thread_exit(thread[i]); 861 } 862 for (i = 0; i < 3; i++) { 863 reactor = spdk_reactor_get(i); 864 CU_ASSERT(reactor != NULL); 865 reactor_run(reactor); 866 } 867 868 spdk_set_thread(NULL); 869 870 MOCK_CLEAR(spdk_env_get_current_core); 871 872 spdk_reactors_fini(); 873 874 free_cores(); 875 } 876 877 #ifndef __FreeBSD__ 878 uint8_t g_curr_freq; 879 880 static int 881 core_freq_up(uint32_t lcore) 882 { 883 if (g_curr_freq != UINT8_MAX) { 884 g_curr_freq++; 885 } 886 887 return 0; 888 } 889 890 static int 891 core_freq_down(uint32_t lcore) 892 { 893 if (g_curr_freq != 0) { 894 g_curr_freq--; 895 } 896 897 return 0; 898 } 899 900 static int 901 core_freq_max(uint32_t lcore) 902 { 903 g_curr_freq = UINT8_MAX; 904 905 return 0; 906 } 907 908 DEFINE_STUB(core_freq_min, int, (uint32_t lcore_id), 0); 909 DEFINE_STUB(core_caps, int, 910 (uint32_t lcore_id, struct spdk_governor_capabilities *capabilities), 0); 911 DEFINE_STUB(governor_init, int, (void), 0); 912 DEFINE_STUB_V(governor_deinit, (void)); 913 914 static struct spdk_governor governor = { 915 .name = "dpdk_governor", 916 .get_core_curr_freq = NULL, 917 .core_freq_up = core_freq_up, 918 .core_freq_down = core_freq_down, 919 .set_core_freq_max = core_freq_max, 920 .set_core_freq_min = core_freq_min, 921 .get_core_capabilities = core_caps, 922 .init = governor_init, 923 .deinit = governor_deinit, 924 }; 925 926 static void 927 test_governor(void) 928 { 929 struct spdk_cpuset cpuset = {}; 930 struct spdk_thread *thread[2]; 931 struct spdk_lw_thread *lw_thread; 932 struct spdk_reactor *reactor; 933 struct spdk_poller *busy, *idle; 934 uint8_t last_freq = 100; 935 int i; 936 937 MOCK_SET(spdk_env_get_current_core, 0); 938 939 g_curr_freq = last_freq; 940 spdk_governor_register(&governor); 941 942 allocate_cores(2); 943 944 CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0); 945 946 spdk_scheduler_set("dynamic"); 947 spdk_governor_set("dpdk_governor"); 948 949 for (i = 0; i < 2; i++) { 950 spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true); 951 } 952 953 /* Create threads. */ 954 for (i = 0; i < 2; i++) { 955 spdk_cpuset_zero(&cpuset); 956 spdk_cpuset_set_cpu(&cpuset, i, true); 957 thread[i] = spdk_thread_create(NULL, &cpuset); 958 CU_ASSERT(thread[i] != NULL); 959 } 960 961 for (i = 0; i < 2; i++) { 962 reactor = spdk_reactor_get(i); 963 CU_ASSERT(reactor != NULL); 964 MOCK_SET(spdk_env_get_current_core, i); 965 CU_ASSERT(event_queue_run_batch(reactor) == 1); 966 CU_ASSERT(!TAILQ_EMPTY(&reactor->threads)); 967 } 968 969 reactor = spdk_reactor_get(0); 970 CU_ASSERT(reactor != NULL); 971 MOCK_SET(spdk_env_get_current_core, 0); 972 973 g_reactor_state = SPDK_REACTOR_STATE_RUNNING; 974 975 /* TEST 1 */ 976 /* Init thread stats (low load) */ 977 MOCK_SET(spdk_get_ticks, 100); 978 reactor->tsc_last = 100; 979 980 for (i = 0; i < 2; i++) { 981 spdk_set_thread(thread[i]); 982 idle = spdk_poller_register(poller_run_idle, (void *)200, 0); 983 reactor = spdk_reactor_get(i); 984 CU_ASSERT(reactor != NULL); 985 MOCK_SET(spdk_env_get_current_core, i); 986 _reactor_run(reactor); 987 spdk_poller_unregister(&idle); 988 989 /* Update last stats so that we don't have to call scheduler twice */ 990 lw_thread = spdk_thread_get_ctx(thread[i]); 991 lw_thread->current_stats.idle_tsc = 1; 992 } 993 994 MOCK_SET(spdk_env_get_current_core, 0); 995 _reactors_scheduler_gather_metrics(NULL, NULL); 996 997 CU_ASSERT(_run_events_till_completion(2) == 2); 998 MOCK_SET(spdk_env_get_current_core, 0); 999 1000 /* Threads were idle, so all of them should be placed on core 0 */ 1001 for (i = 0; i < 2; i++) { 1002 reactor = spdk_reactor_get(i); 1003 CU_ASSERT(reactor != NULL); 1004 _reactor_run(reactor); 1005 } 1006 1007 /* 1 thread should be scheduled to core 0 */ 1008 reactor = spdk_reactor_get(0); 1009 CU_ASSERT(reactor != NULL); 1010 MOCK_SET(spdk_env_get_current_core, 0); 1011 CU_ASSERT(event_queue_run_batch(reactor) == 1); 1012 1013 /* Main core should be busy less than 50% time now - frequency should be lowered */ 1014 CU_ASSERT(g_curr_freq == last_freq - 1); 1015 1016 last_freq = g_curr_freq; 1017 1018 /* TEST 2 */ 1019 /* Make first threads busy - both threads will be still on core 0, but frequency will have to be raised */ 1020 spdk_set_thread(thread[0]); 1021 busy = spdk_poller_register(poller_run_busy, (void *)1000, 0); 1022 _reactor_run(reactor); 1023 spdk_poller_unregister(&busy); 1024 1025 spdk_set_thread(thread[1]); 1026 idle = spdk_poller_register(poller_run_idle, (void *)100, 0); 1027 _reactor_run(reactor); 1028 spdk_poller_unregister(&idle); 1029 1030 /* Run scheduler again */ 1031 MOCK_SET(spdk_env_get_current_core, 0); 1032 _reactors_scheduler_gather_metrics(NULL, NULL); 1033 1034 i = _run_events_till_completion(2); 1035 /* Six runs when interrupt mode is supported, two if not. */ 1036 CU_ASSERT(i == 7 || i == 3); 1037 MOCK_SET(spdk_env_get_current_core, 0); 1038 1039 /* Main core should be busy more than 50% time now - frequency should be raised */ 1040 CU_ASSERT(g_curr_freq == last_freq + 1); 1041 1042 /* TEST 3 */ 1043 /* Make second thread very busy so that it will be moved to second core */ 1044 spdk_set_thread(thread[1]); 1045 busy = spdk_poller_register(poller_run_busy, (void *)2000, 0); 1046 _reactor_run(reactor); 1047 spdk_poller_unregister(&busy); 1048 1049 /* Update first thread stats */ 1050 spdk_set_thread(thread[0]); 1051 idle = spdk_poller_register(poller_run_idle, (void *)100, 0); 1052 _reactor_run(reactor); 1053 spdk_poller_unregister(&idle); 1054 1055 /* Run scheduler again */ 1056 MOCK_SET(spdk_env_get_current_core, 0); 1057 _reactors_scheduler_gather_metrics(NULL, NULL); 1058 1059 i = _run_events_till_completion(2); 1060 /* Six runs when interrupt mode is supported, two if not. */ 1061 CU_ASSERT(i == 7 || i == 3); 1062 MOCK_SET(spdk_env_get_current_core, 0); 1063 1064 for (i = 0; i < 2; i++) { 1065 reactor = spdk_reactor_get(i); 1066 CU_ASSERT(reactor != NULL); 1067 _reactor_run(reactor); 1068 } 1069 1070 /* Main core frequency should be set to max when we have busy threads on other cores */ 1071 CU_ASSERT(g_curr_freq == UINT8_MAX); 1072 1073 g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED; 1074 1075 /* Destroy threads */ 1076 for (i = 0; i < 2; i++) { 1077 spdk_set_thread(thread[i]); 1078 spdk_thread_exit(thread[i]); 1079 } 1080 for (i = 0; i < 2; i++) { 1081 reactor = spdk_reactor_get(i); 1082 CU_ASSERT(reactor != NULL); 1083 reactor_run(reactor); 1084 } 1085 1086 spdk_set_thread(NULL); 1087 1088 MOCK_CLEAR(spdk_env_get_current_core); 1089 1090 spdk_reactors_fini(); 1091 1092 free_cores(); 1093 } 1094 #endif 1095 1096 static void 1097 test_scheduler_set_isolated_core_mask(void) 1098 { 1099 struct spdk_cpuset isolated_core_mask = {}; 1100 MOCK_SET(spdk_env_get_current_core, 0); 1101 1102 allocate_cores(3); 1103 1104 CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0); 1105 1106 spdk_cpuset_set_cpu(&g_reactor_core_mask, 0, true); 1107 spdk_cpuset_set_cpu(&g_reactor_core_mask, 1, true); 1108 spdk_cpuset_set_cpu(&g_reactor_core_mask, 2, true); 1109 1110 spdk_cpuset_set_cpu(&isolated_core_mask, 1, true); 1111 spdk_cpuset_set_cpu(&isolated_core_mask, 2, true); 1112 CU_ASSERT(scheduler_set_isolated_core_mask(isolated_core_mask) == true); 1113 1114 spdk_cpuset_zero(&isolated_core_mask); 1115 1116 spdk_cpuset_set_cpu(&isolated_core_mask, 4, true); 1117 CU_ASSERT(scheduler_set_isolated_core_mask(isolated_core_mask) == false); 1118 1119 spdk_cpuset_zero(&isolated_core_mask); 1120 1121 spdk_cpuset_set_cpu(&isolated_core_mask, 0, true); 1122 spdk_cpuset_set_cpu(&isolated_core_mask, 1, true); 1123 spdk_cpuset_set_cpu(&isolated_core_mask, 4, true); 1124 CU_ASSERT(scheduler_set_isolated_core_mask(isolated_core_mask) == false); 1125 } 1126 1127 int 1128 main(int argc, char **argv) 1129 { 1130 CU_pSuite suite = NULL; 1131 unsigned int num_failures; 1132 1133 CU_initialize_registry(); 1134 1135 suite = CU_add_suite("app_suite", NULL, NULL); 1136 1137 CU_ADD_TEST(suite, test_create_reactor); 1138 CU_ADD_TEST(suite, test_init_reactors); 1139 CU_ADD_TEST(suite, test_event_call); 1140 CU_ADD_TEST(suite, test_schedule_thread); 1141 CU_ADD_TEST(suite, test_reschedule_thread); 1142 CU_ADD_TEST(suite, test_bind_thread); 1143 CU_ADD_TEST(suite, test_for_each_reactor); 1144 CU_ADD_TEST(suite, test_reactor_stats); 1145 CU_ADD_TEST(suite, test_scheduler); 1146 #ifndef __FreeBSD__ 1147 /* governor is only supported on Linux, so don't run this specific unit test on FreeBSD */ 1148 CU_ADD_TEST(suite, test_governor); 1149 #endif 1150 CU_ADD_TEST(suite, test_scheduler_set_isolated_core_mask); 1151 1152 num_failures = spdk_ut_run_tests(argc, argv, NULL); 1153 CU_cleanup_registry(); 1154 1155 return num_failures; 1156 } 1157