/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 * Copyright(c) 2019 Arm Limited
 */

#include <stdio.h>
#include <inttypes.h>
#include <rte_ring.h>
#include <rte_cycles.h>
#include <rte_launch.h>
#include <rte_pause.h>
#include <string.h>

#include "test.h"
#include "test_ring.h"

/*
 * Ring performance test cases, measuring the performance of various
 * operations using rdtsc, for both legacy and 16B-element rings.
 */

#define RING_NAME "RING_PERF"
#define RING_SIZE 4096
#define MAX_BURST 256

/*
 * the sizes to enqueue and dequeue in testing
 * (marked volatile so they won't be seen as compile-time constants)
 */
static const volatile unsigned int bulk_sizes[] = { 8, 32, 64, 128, 256 };

struct lcore_pair {
	unsigned int c1, c2;
};

static volatile unsigned int lcore_count;
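/*
 * Illustrative sketch only (hypothetical helper, not used by the tests):
 * the measurement pattern applied throughout this file. rte_rdtsc() is read
 * before and after a fixed amount of work, and the cycle delta is divided by
 * the number of elements moved, giving cycles per element. Assumes the ring
 * capacity is at least n.
 */
static __rte_unused double
example_cycles_per_elem(struct rte_ring *r, void **burst, unsigned int n,
		unsigned int loops)
{
	unsigned int i;

	const uint64_t start = rte_rdtsc();
	for (i = 0; i < loops; i++) {
		/* bulk enqueue + dequeue of n object pointers (legacy API) */
		rte_ring_enqueue_bulk(r, burst, n, NULL);
		rte_ring_dequeue_bulk(r, burst, n, NULL);
	}
	const uint64_t end = rte_rdtsc();

	return ((double)(end - start)) / ((double)loops * n);
}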
static void
test_ring_print_test_string(unsigned int api_type, int esize,
		unsigned int bsz, double value)
{
	if (esize == -1)
		printf("legacy APIs");
	else
		printf("elem APIs (size:%2dB)", esize);

	if (api_type == TEST_RING_IGNORE_API_TYPE)
		return;

	if ((api_type & TEST_RING_THREAD_DEF) == TEST_RING_THREAD_DEF)
		printf(" - default enqueue/dequeue");
	else if ((api_type & TEST_RING_THREAD_SPSC) == TEST_RING_THREAD_SPSC)
		printf(" - SP/SC");
	else if ((api_type & TEST_RING_THREAD_MPMC) == TEST_RING_THREAD_MPMC)
		printf(" - MP/MC");

	if ((api_type & TEST_RING_ELEM_SINGLE) == TEST_RING_ELEM_SINGLE)
		printf(" - single - ");
	else if ((api_type & TEST_RING_ELEM_BULK) == TEST_RING_ELEM_BULK)
		printf(" - bulk (n:%-3u) - ", bsz);
	else if ((api_type & TEST_RING_ELEM_BURST) == TEST_RING_ELEM_BURST)
		printf(" - burst (n:%-3u) - ", bsz);
	else if ((api_type & (TEST_RING_ELEM_BURST_ZC |
			TEST_RING_ELEM_BURST_ZC_COMPRESS_PTR_16 |
			TEST_RING_ELEM_BURST_ZC_COMPRESS_PTR_32)) != 0)
		printf(" - burst zero copy (n:%-3u) - ", bsz);

	printf("cycles per elem: %.3F\n", value);
}

/**** Functions to analyse our core mask to get cores for different tests ***/

static int
get_two_hyperthreads(struct lcore_pair *lcp)
{
	unsigned int id1, id2;
	unsigned int c1, c2, s1, s2;

	RTE_LCORE_FOREACH(id1) {
		/* inner loop just re-reads all id's. We could skip the
		 * first few elements, but since the number of cores is
		 * small there is little point.
		 */
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;

			c1 = rte_lcore_to_cpu_id(id1);
			c2 = rte_lcore_to_cpu_id(id2);
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if ((c1 == c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_cores(struct lcore_pair *lcp)
{
	unsigned int id1, id2;
	unsigned int c1, c2, s1, s2;

	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;

			c1 = rte_lcore_to_cpu_id(id1);
			c2 = rte_lcore_to_cpu_id(id2);
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if ((c1 != c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_sockets(struct lcore_pair *lcp)
{
	unsigned int id1, id2;
	unsigned int s1, s2;

	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if (s1 != s2) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}
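/*
 * Illustrative sketch (hypothetical helper, not used by the tests): dump the
 * lcore-to-physical-core/socket mapping that the three selectors above
 * search through.
 */
static __rte_unused void
example_dump_lcore_topology(void)
{
	unsigned int id;

	RTE_LCORE_FOREACH(id)
		printf("lcore %u: cpu id %d, socket %u\n", id,
				rte_lcore_to_cpu_id(id),
				rte_lcore_to_socket_id(id));
}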
/* Get cycle counts for dequeuing from an empty ring. Should be 2 or 3 cycles */
static void
test_empty_dequeue(struct rte_ring *r, const int esize,
		const unsigned int api_type)
{
	const unsigned int iter_shift = 29;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int i = 0;
	void *burst[MAX_BURST];

	const unsigned int bulk_iterations = iterations / bulk_sizes[0];
	const uint64_t start = rte_rdtsc();
	for (i = 0; i < bulk_iterations; i++)
		test_ring_dequeue(r, burst, esize, bulk_sizes[0], api_type);
	const uint64_t end = rte_rdtsc();

	test_ring_print_test_string(api_type, esize, bulk_sizes[0],
			((double)end - start) / iterations);
}

/* describes the ring used by the enqueue and dequeue thread */
struct ring_params {
	struct rte_ring *r;
	unsigned int elem_size;
	unsigned int bulk_sizes_i; /* index into bulk_sizes array */
	unsigned int ring_flags; /* flags for test_ring_enqueue/dequeue */
};

/* Used to specify enqueue and dequeue ring operations and their results */
struct thread_params {
	struct ring_params *ring_params;
	double *results; /* result array size must be equal to bulk_sizes */
};

/*
 * Helper function to call bulk SP/MP enqueue/dequeue functions.
 * flag == 0 -> enqueue
 * flag == 1 -> dequeue
 */
static __rte_always_inline int
enqueue_dequeue_bulk_helper(const unsigned int flag, struct thread_params *p)
{
	int ret;
	const unsigned int iter_shift = 22;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int i;
	void *burst = NULL;
	unsigned int n_remaining;
	const unsigned int bulk_n = bulk_sizes[p->ring_params->bulk_sizes_i];

	/* rendezvous: both threads of the pair must be running before the
	 * timed loop starts
	 */
#ifdef RTE_USE_C11_MEM_MODEL
	if (rte_atomic_fetch_add_explicit(&lcore_count, 1,
			rte_memory_order_relaxed) + 1 != 2)
#else
	if (__sync_add_and_fetch(&lcore_count, 1) != 2)
#endif
		while (lcore_count != 2)
			rte_pause();

	burst = test_ring_calloc(MAX_BURST, p->ring_params->elem_size);
	if (burst == NULL)
		return -1;

	const uint64_t sp_start = rte_rdtsc();
	const unsigned int bulk_iterations = iterations / bulk_n;
	for (i = 0; i < bulk_iterations; i++) {
		n_remaining = bulk_n;
		do {
			if (flag == 0)
				ret = test_ring_enqueue(p->ring_params->r,
						burst,
						p->ring_params->elem_size,
						n_remaining,
						p->ring_params->ring_flags);
			else if (flag == 1)
				ret = test_ring_dequeue(p->ring_params->r,
						burst,
						p->ring_params->elem_size,
						n_remaining,
						p->ring_params->ring_flags);
			if (ret == 0)
				rte_pause();
			else
				n_remaining -= ret;
		} while (n_remaining > 0);
	}
	const uint64_t sp_end = rte_rdtsc();

	/* iterations counts elements moved, so this is cycles per element */
	p->results[p->ring_params->bulk_sizes_i] =
			((double)sp_end - sp_start) / iterations;

	rte_free(burst);

	return 0;
}

/*
 * Function that uses rdtsc to measure timing for ring enqueue. Needs a
 * paired thread running the dequeue_bulk function.
 */
static int
enqueue_bulk(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(0, params);
}

/*
 * Function that uses rdtsc to measure timing for ring dequeue. Needs a
 * paired thread running the enqueue_bulk function.
 */
static int
dequeue_bulk(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(1, params);
}
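/*
 * Minimal usage sketch (hypothetical, not called by the tests): pairing the
 * two wrappers above on two worker lcores. Assumes neither lcore is the main
 * lcore; run_on_core_pair() below does this for real, including result
 * bookkeeping and the main-lcore case.
 */
static __rte_unused int
example_launch_pair(struct thread_params *enq, struct thread_params *deq,
		unsigned int enq_lcore, unsigned int deq_lcore)
{
	lcore_count = 0;
	rte_eal_remote_launch(enqueue_bulk, enq, enq_lcore);
	rte_eal_remote_launch(dequeue_bulk, deq, deq_lcore);
	if (rte_eal_wait_lcore(enq_lcore) < 0 ||
			rte_eal_wait_lcore(deq_lcore) < 0)
		return -1;
	return 0;
}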
/*
 * Function that calls the enqueue and dequeue bulk functions on pairs of
 * cores. Used to measure ring perf between hyperthreads, cores and sockets.
 */
static int
run_on_core_pair(struct lcore_pair *cores,
		struct thread_params *param1, struct thread_params *param2)
{
	unsigned int i;
	struct ring_params *ring_params = param1->ring_params;

	for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
		lcore_count = 0;
		ring_params->bulk_sizes_i = i;
		if (cores->c1 == rte_get_main_lcore()) {
			rte_eal_remote_launch(dequeue_bulk, param2, cores->c2);
			enqueue_bulk(param1);
			rte_eal_wait_lcore(cores->c2);
		} else {
			rte_eal_remote_launch(enqueue_bulk, param1, cores->c1);
			rte_eal_remote_launch(dequeue_bulk, param2, cores->c2);
			if (rte_eal_wait_lcore(cores->c1) < 0)
				return -1;
			if (rte_eal_wait_lcore(cores->c2) < 0)
				return -1;
		}
		test_ring_print_test_string(
				ring_params->ring_flags,
				ring_params->elem_size,
				bulk_sizes[i],
				param1->results[i] + param2->results[i]);
	}

	return 0;
}

static RTE_ATOMIC(uint32_t) synchro;
static uint64_t queue_count[RTE_MAX_LCORE];

#define TIME_MS 100

static int
load_loop_fn_helper(struct thread_params *p, const int esize)
{
	uint64_t time_diff = 0;
	uint64_t begin = 0;
	uint64_t hz = rte_get_timer_hz();
	uint64_t lcount = 0;
	const unsigned int lcore = rte_lcore_id();
	struct ring_params *ring_params = p->ring_params;
	void *burst = NULL;

	burst = test_ring_calloc(MAX_BURST, esize);
	if (burst == NULL)
		return -1;

	/* wait synchro for workers */
	if (lcore != rte_get_main_lcore())
		rte_wait_until_equal_32((uint32_t *)(uintptr_t)&synchro, 1,
				rte_memory_order_relaxed);

	begin = rte_get_timer_cycles();
	while (time_diff < hz * TIME_MS / 1000) {
		test_ring_enqueue(ring_params->r, burst, esize,
				bulk_sizes[ring_params->bulk_sizes_i],
				TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);
		test_ring_dequeue(ring_params->r, burst, esize,
				bulk_sizes[ring_params->bulk_sizes_i],
				TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);
		lcount++;
		time_diff = rte_get_timer_cycles() - begin;
	}
	queue_count[lcore] = lcount;

	rte_free(burst);

	return 0;
}

static int
load_loop_fn(void *p)
{
	struct thread_params *params = p;

	return load_loop_fn_helper(params, -1);
}

static int
load_loop_fn_16B(void *p)
{
	struct thread_params *params = p;

	return load_loop_fn_helper(params, 16);
}

static int
run_on_all_cores(struct rte_ring *r, const int esize)
{
	uint64_t total;
	struct ring_params ring_params = {0};
	struct thread_params params = { .ring_params = &ring_params };
	lcore_function_t *lcore_f;
	unsigned int i, c;

	if (esize == -1)
		lcore_f = load_loop_fn;
	else
		lcore_f = load_loop_fn_16B;

	for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
		total = 0;
		printf("\nBulk enq/dequeue count on size %u\n", bulk_sizes[i]);
		params.ring_params->bulk_sizes_i = i;
		params.ring_params->r = r;

		/* clear synchro and start workers */
		rte_atomic_store_explicit(&synchro, 0, rte_memory_order_relaxed);
		if (rte_eal_mp_remote_launch(lcore_f, &params, SKIP_MAIN) < 0)
			return -1;

		/* start synchro and launch test on main */
		rte_atomic_store_explicit(&synchro, 1, rte_memory_order_relaxed);
		lcore_f(&params);

		rte_eal_mp_wait_lcore();

		RTE_LCORE_FOREACH(c) {
			printf("Core [%u] count = %"PRIu64"\n",
					c, queue_count[c]);
			total += queue_count[c];
		}

		printf("Total count (size: %u): %"PRIu64"\n",
				bulk_sizes[i], total);
	}

	return 0;
}
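/*
 * Exposition only (hypothetical helper, not used by the test): queue_count[]
 * holds the number of loop iterations a core completed in TIME_MS
 * milliseconds, and each iteration enqueues and dequeues one bulk of
 * bulk_sizes[i] elements. A count converts to elements/second as follows.
 */
static __rte_unused double
example_elems_per_sec(uint64_t loops, unsigned int bulk_n)
{
	return (double)loops * bulk_n * (1000.0 / TIME_MS);
}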
/*
 * Test function that determines how long an enqueue + dequeue of a single item
 * takes on a single lcore. Result is for comparison with the bulk enq+deq.
 */
static int
test_single_enqueue_dequeue(struct rte_ring *r, const int esize,
		const unsigned int api_type)
{
	const unsigned int iter_shift = 24;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int i = 0;
	void *burst = NULL;

	/* alloc dummy object pointers */
	burst = test_ring_calloc(1, esize);
	if (burst == NULL)
		return -1;

	const uint64_t start = rte_rdtsc();
	for (i = 0; i < iterations; i++) {
		test_ring_enqueue(r, burst, esize, 1, api_type);
		test_ring_dequeue(r, burst, esize, 1, api_type);
	}
	const uint64_t end = rte_rdtsc();

	test_ring_print_test_string(api_type, esize, 1,
			((double)(end - start)) / iterations);

	rte_free(burst);

	return 0;
}

/*
 * Test that does both enqueue and dequeue on a core using the burst/bulk API
 * calls. Results should be the same as for the bulk function called on a
 * single lcore.
 */
static int
test_burst_bulk_enqueue_dequeue(struct rte_ring *r, const int esize,
		const unsigned int api_type)
{
	const unsigned int iter_shift = 26;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int sz, i;
	void **burst = NULL;

	burst = test_ring_calloc(MAX_BURST, esize);
	if (burst == NULL)
		return -1;

	for (sz = 0; sz < RTE_DIM(bulk_sizes); sz++) {
		const unsigned int n = iterations / bulk_sizes[sz];
		const uint64_t start = rte_rdtsc();
		for (i = 0; i < n; i++) {
			test_ring_enqueue(r, burst, esize, bulk_sizes[sz],
					api_type);
			test_ring_dequeue(r, burst, esize, bulk_sizes[sz],
					api_type);
		}
		const uint64_t end = rte_rdtsc();

		test_ring_print_test_string(api_type, esize, bulk_sizes[sz],
				((double)end - start) / iterations);
	}

	rte_free(burst);

	return 0;
}

static __rte_always_inline int
test_ring_perf_esize_run_on_two_cores(
		struct thread_params *param1, struct thread_params *param2)
{
	struct lcore_pair cores;

	if (get_two_hyperthreads(&cores) == 0) {
		printf("\n### Testing using two hyperthreads ###\n");
		if (run_on_core_pair(&cores, param1, param2) < 0)
			return -1;
	}
	if (get_two_cores(&cores) == 0) {
		printf("\n### Testing using two physical cores ###\n");
		if (run_on_core_pair(&cores, param1, param2) < 0)
			return -1;
	}
	if (get_two_sockets(&cores) == 0) {
		printf("\n### Testing using two NUMA nodes ###\n");
		if (run_on_core_pair(&cores, param1, param2) < 0)
			return -1;
	}
	return 0;
}
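/*
 * For reference, the per-test lines produced by test_ring_print_test_string()
 * look like the following (cycle figures are machine dependent, shown here
 * as N.NNN):
 *
 *   legacy APIs - SP/SC - single - cycles per elem: N.NNN
 *   elem APIs (size:16B) - MP/MC - bulk (n:32 ) - cycles per elem: N.NNN
 */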
/* Run all tests for a given element size */
static __rte_always_inline int
test_ring_perf_esize(const int esize)
{
	struct rte_ring *r = NULL;
	double results_enq[RTE_DIM(bulk_sizes)];
	double results_deq[RTE_DIM(bulk_sizes)];
	struct ring_params ring_params = {
		.elem_size = esize, .ring_flags = TEST_RING_ELEM_BULK };
	struct thread_params param1 = {
		.ring_params = &ring_params, .results = results_enq };
	struct thread_params param2 = {
		.ring_params = &ring_params, .results = results_deq };

	/*
	 * Performance test for legacy/_elem APIs
	 * SP-SC/MP-MC, single
	 */
	r = test_ring_create(RING_NAME, esize, RING_SIZE, rte_socket_id(), 0);
	if (r == NULL)
		goto test_fail;

	printf("\n### Testing single element enq/deq ###\n");
	if (test_single_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_SINGLE) < 0)
		goto test_fail;
	if (test_single_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_SINGLE) < 0)
		goto test_fail;

	printf("\n### Testing burst enq/deq ###\n");
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BURST) < 0)
		goto test_fail;
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BURST) < 0)
		goto test_fail;

	printf("\n### Testing bulk enq/deq ###\n");
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK) < 0)
		goto test_fail;
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK) < 0)
		goto test_fail;

	printf("\n### Testing empty bulk deq ###\n");
	test_empty_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK);
	test_empty_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);

	ring_params.r = r;

	ring_params.ring_flags = TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK;
	if (test_ring_perf_esize_run_on_two_cores(&param1, &param2) < 0)
		goto test_fail;

	ring_params.ring_flags = TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK;
	if (test_ring_perf_esize_run_on_two_cores(&param1, &param2) < 0)
		goto test_fail;

	printf("\n### Testing using all worker lcores ###\n");
	if (run_on_all_cores(r, esize) < 0)
		goto test_fail;

	rte_ring_free(r);

	return 0;

test_fail:
	rte_ring_free(r);

	return -1;
}
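/*
 * Background sketch of what "compression" means below (my illustration; the
 * actual implementation lives in the zero-copy helpers in test_ring.h):
 * instead of passing full 64-bit pointers through the ring, store 32-bit
 * (or 16-bit) offsets from a common base, halving (or quartering) the bytes
 * moved per element. This is only valid while every object lies within the
 * offset-reachable window above the base.
 */
static __rte_unused uint32_t
example_compress_ptr32(const void *base, const void *obj)
{
	/* assumes obj sits no more than 4 GiB above base */
	return (uint32_t)((uintptr_t)obj - (uintptr_t)base);
}

static __rte_unused void *
example_decompress_ptr32(const void *base, uint32_t off)
{
	return (void *)((uintptr_t)base + off);
}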
static __rte_always_inline int
test_ring_perf_compression(void)
{
	double results1[RTE_DIM(bulk_sizes)];
	double results2[RTE_DIM(bulk_sizes)];
	double results1_comp[2][RTE_DIM(bulk_sizes)];
	double results2_comp[2][RTE_DIM(bulk_sizes)];

	struct lcore_pair cores;
	int ret = -1;
	unsigned int i, j;
	struct ring_params ring_params = { .elem_size = sizeof(void *) };
	struct thread_params param1 = {
		.ring_params = &ring_params, .results = results1 };
	struct thread_params param2 = {
		.ring_params = &ring_params, .results = results2 };

	printf("\n### Testing compression gain ###");

	ring_params.r = rte_ring_create_elem(
			RING_NAME, sizeof(void *),
			RING_SIZE, rte_socket_id(),
			RING_F_SP_ENQ | RING_F_SC_DEQ);

	if (ring_params.r == NULL)
		return -1;

	if (get_two_cores(&cores) == 0) {
		printf("\n### Testing zero copy ###\n");
		ring_params.ring_flags = TEST_RING_ELEM_BURST_ZC;
		ret = run_on_core_pair(&cores, &param1, &param2);
	}

	rte_ring_free(ring_params.r);

	if (ret != 0)
		return ret;

	/* rings allow only multiples of 4 as element sizes, so we allocate
	 * 4 bytes per element despite only using 2, and use half of
	 * RING_SIZE as the number of elements
	 */
	ring_params.r = rte_ring_create_elem(
			RING_NAME, sizeof(uint32_t),
			RING_SIZE / 2, rte_socket_id(),
			RING_F_SP_ENQ | RING_F_SC_DEQ);

	if (ring_params.r == NULL)
		return -1;

	param1.results = results1_comp[0];
	param2.results = results2_comp[0];

	if (get_two_cores(&cores) == 0) {
		printf("\n### Testing zero copy with compression (16b) ###\n");
		ring_params.ring_flags =
				TEST_RING_ELEM_BURST_ZC_COMPRESS_PTR_16;
		ret = run_on_core_pair(&cores, &param1, &param2);
	}

	rte_ring_free(ring_params.r);

	if (ret != 0)
		return ret;

	ring_params.r = rte_ring_create_elem(
			RING_NAME, sizeof(uint32_t),
			RING_SIZE, rte_socket_id(),
			RING_F_SP_ENQ | RING_F_SC_DEQ);

	if (ring_params.r == NULL)
		return -1;

	param1.results = results1_comp[1];
	param2.results = results2_comp[1];

	if (get_two_cores(&cores) == 0) {
		printf("\n### Testing zero copy with compression (32b) ###\n");
		ring_params.ring_flags =
				TEST_RING_ELEM_BURST_ZC_COMPRESS_PTR_32;
		ret = run_on_core_pair(&cores, &param1, &param2);
	}

	rte_ring_free(ring_params.r);

	for (j = 0; j < 2; j++) {
		printf("\n### Potential gain from compression (%d-bit offsets) "
				"###\n", (j + 1) * 16);
		for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
			const double result = results1[i] + results2[i];
			const double result_comp = results1_comp[j][i] +
					results2_comp[j][i];
			const double gain = 100 - (result_comp / result) * 100;

			printf("Gain of %5.1F%% for burst of %-3u elems\n",
					gain, bulk_sizes[i]);
		}
	}

	return ret;
}

static int
test_ring_perf(void)
{
	/* Run all the tests for different element sizes */
	if (test_ring_perf_esize(-1) == -1)
		return -1;

	if (test_ring_perf_esize(16) == -1)
		return -1;

	/* Test for performance gain of compression */
	if (test_ring_perf_compression() == -1)
		return -1;

	return 0;
}

REGISTER_PERF_TEST(ring_perf_autotest, test_ring_perf);
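/*
 * Usage sketch (invocation details are assumptions; adjust the binary path
 * and core list for your build and machine):
 *   $ ./build/app/test/dpdk-test -l 0-3
 *   RTE>>ring_perf_autotest
 * or non-interactively:
 *   $ DPDK_TEST=ring_perf_autotest ./build/app/test/dpdk-test -l 0-3
 */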