1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2010-2014 Intel Corporation 3 * Copyright(c) 2019 Arm Limited 4 */ 5 6 7 #include <stdio.h> 8 #include <inttypes.h> 9 #include <rte_ring.h> 10 #include <rte_cycles.h> 11 #include <rte_launch.h> 12 #include <rte_pause.h> 13 #include <string.h> 14 15 #include "test.h" 16 #include "test_ring.h" 17 18 /* 19 * Ring performance test cases, measures performance of various operations 20 * using rdtsc for legacy and 16B size ring elements. 21 */ 22 23 #define RING_NAME "RING_PERF" 24 #define RING_SIZE 4096 25 #define MAX_BURST 32 26 27 /* 28 * the sizes to enqueue and dequeue in testing 29 * (marked volatile so they won't be seen as compile-time constants) 30 */ 31 static const volatile unsigned bulk_sizes[] = { 8, 32 }; 32 33 struct lcore_pair { 34 unsigned c1, c2; 35 }; 36 37 static volatile unsigned lcore_count = 0; 38 39 static void 40 test_ring_print_test_string(unsigned int api_type, int esize, 41 unsigned int bsz, double value) 42 { 43 if (esize == -1) 44 printf("legacy APIs"); 45 else 46 printf("elem APIs: element size %dB", esize); 47 48 if (api_type == TEST_RING_IGNORE_API_TYPE) 49 return; 50 51 if ((api_type & TEST_RING_THREAD_DEF) == TEST_RING_THREAD_DEF) 52 printf(": default enqueue/dequeue: "); 53 else if ((api_type & TEST_RING_THREAD_SPSC) == TEST_RING_THREAD_SPSC) 54 printf(": SP/SC: "); 55 else if ((api_type & TEST_RING_THREAD_MPMC) == TEST_RING_THREAD_MPMC) 56 printf(": MP/MC: "); 57 58 if ((api_type & TEST_RING_ELEM_SINGLE) == TEST_RING_ELEM_SINGLE) 59 printf("single: "); 60 else if ((api_type & TEST_RING_ELEM_BULK) == TEST_RING_ELEM_BULK) 61 printf("bulk (size: %u): ", bsz); 62 else if ((api_type & TEST_RING_ELEM_BURST) == TEST_RING_ELEM_BURST) 63 printf("burst (size: %u): ", bsz); 64 65 printf("%.2F\n", value); 66 } 67 68 /**** Functions to analyse our core mask to get cores for different tests ***/ 69 70 static int 71 get_two_hyperthreads(struct lcore_pair *lcp) 72 { 73 unsigned id1, id2; 74 unsigned c1, c2, s1, s2; 75 RTE_LCORE_FOREACH(id1) { 76 /* inner loop just re-reads all id's. We could skip the first few 77 * elements, but since number of cores is small there is little point 78 */ 79 RTE_LCORE_FOREACH(id2) { 80 if (id1 == id2) 81 continue; 82 83 c1 = rte_lcore_to_cpu_id(id1); 84 c2 = rte_lcore_to_cpu_id(id2); 85 s1 = rte_lcore_to_socket_id(id1); 86 s2 = rte_lcore_to_socket_id(id2); 87 if ((c1 == c2) && (s1 == s2)){ 88 lcp->c1 = id1; 89 lcp->c2 = id2; 90 return 0; 91 } 92 } 93 } 94 return 1; 95 } 96 97 static int 98 get_two_cores(struct lcore_pair *lcp) 99 { 100 unsigned id1, id2; 101 unsigned c1, c2, s1, s2; 102 RTE_LCORE_FOREACH(id1) { 103 RTE_LCORE_FOREACH(id2) { 104 if (id1 == id2) 105 continue; 106 107 c1 = rte_lcore_to_cpu_id(id1); 108 c2 = rte_lcore_to_cpu_id(id2); 109 s1 = rte_lcore_to_socket_id(id1); 110 s2 = rte_lcore_to_socket_id(id2); 111 if ((c1 != c2) && (s1 == s2)){ 112 lcp->c1 = id1; 113 lcp->c2 = id2; 114 return 0; 115 } 116 } 117 } 118 return 1; 119 } 120 121 static int 122 get_two_sockets(struct lcore_pair *lcp) 123 { 124 unsigned id1, id2; 125 unsigned s1, s2; 126 RTE_LCORE_FOREACH(id1) { 127 RTE_LCORE_FOREACH(id2) { 128 if (id1 == id2) 129 continue; 130 s1 = rte_lcore_to_socket_id(id1); 131 s2 = rte_lcore_to_socket_id(id2); 132 if (s1 != s2){ 133 lcp->c1 = id1; 134 lcp->c2 = id2; 135 return 0; 136 } 137 } 138 } 139 return 1; 140 } 141 142 /* Get cycle counts for dequeuing from an empty ring. Should be 2 or 3 cycles */ 143 static void 144 test_empty_dequeue(struct rte_ring *r, const int esize, 145 const unsigned int api_type) 146 { 147 const unsigned int iter_shift = 26; 148 const unsigned int iterations = 1 << iter_shift; 149 unsigned int i = 0; 150 void *burst[MAX_BURST]; 151 152 const uint64_t start = rte_rdtsc(); 153 for (i = 0; i < iterations; i++) 154 test_ring_dequeue(r, burst, esize, bulk_sizes[0], api_type); 155 const uint64_t end = rte_rdtsc(); 156 157 test_ring_print_test_string(api_type, esize, bulk_sizes[0], 158 ((double)(end - start)) / iterations); 159 } 160 161 /* 162 * for the separate enqueue and dequeue threads they take in one param 163 * and return two. Input = burst size, output = cycle average for sp/sc & mp/mc 164 */ 165 struct thread_params { 166 struct rte_ring *r; 167 unsigned size; /* input value, the burst size */ 168 double spsc, mpmc; /* output value, the single or multi timings */ 169 }; 170 171 /* 172 * Helper function to call bulk SP/MP enqueue functions. 173 * flag == 0 -> enqueue 174 * flag == 1 -> dequeue 175 */ 176 static __rte_always_inline int 177 enqueue_dequeue_bulk_helper(const unsigned int flag, const int esize, 178 struct thread_params *p) 179 { 180 int ret; 181 const unsigned int iter_shift = 15; 182 const unsigned int iterations = 1 << iter_shift; 183 struct rte_ring *r = p->r; 184 unsigned int bsize = p->size; 185 unsigned int i; 186 void *burst = NULL; 187 188 #ifdef RTE_USE_C11_MEM_MODEL 189 if (__atomic_add_fetch(&lcore_count, 1, __ATOMIC_RELAXED) != 2) 190 #else 191 if (__sync_add_and_fetch(&lcore_count, 1) != 2) 192 #endif 193 while(lcore_count != 2) 194 rte_pause(); 195 196 burst = test_ring_calloc(MAX_BURST, esize); 197 if (burst == NULL) 198 return -1; 199 200 const uint64_t sp_start = rte_rdtsc(); 201 for (i = 0; i < iterations; i++) 202 do { 203 if (flag == 0) 204 ret = test_ring_enqueue(r, burst, esize, bsize, 205 TEST_RING_THREAD_SPSC | 206 TEST_RING_ELEM_BULK); 207 else if (flag == 1) 208 ret = test_ring_dequeue(r, burst, esize, bsize, 209 TEST_RING_THREAD_SPSC | 210 TEST_RING_ELEM_BULK); 211 if (ret == 0) 212 rte_pause(); 213 } while (!ret); 214 const uint64_t sp_end = rte_rdtsc(); 215 216 const uint64_t mp_start = rte_rdtsc(); 217 for (i = 0; i < iterations; i++) 218 do { 219 if (flag == 0) 220 ret = test_ring_enqueue(r, burst, esize, bsize, 221 TEST_RING_THREAD_MPMC | 222 TEST_RING_ELEM_BULK); 223 else if (flag == 1) 224 ret = test_ring_dequeue(r, burst, esize, bsize, 225 TEST_RING_THREAD_MPMC | 226 TEST_RING_ELEM_BULK); 227 if (ret == 0) 228 rte_pause(); 229 } while (!ret); 230 const uint64_t mp_end = rte_rdtsc(); 231 232 p->spsc = ((double)(sp_end - sp_start))/(iterations * bsize); 233 p->mpmc = ((double)(mp_end - mp_start))/(iterations * bsize); 234 return 0; 235 } 236 237 /* 238 * Function that uses rdtsc to measure timing for ring enqueue. Needs pair 239 * thread running dequeue_bulk function 240 */ 241 static int 242 enqueue_bulk(void *p) 243 { 244 struct thread_params *params = p; 245 246 return enqueue_dequeue_bulk_helper(0, -1, params); 247 } 248 249 static int 250 enqueue_bulk_16B(void *p) 251 { 252 struct thread_params *params = p; 253 254 return enqueue_dequeue_bulk_helper(0, 16, params); 255 } 256 257 /* 258 * Function that uses rdtsc to measure timing for ring dequeue. Needs pair 259 * thread running enqueue_bulk function 260 */ 261 static int 262 dequeue_bulk(void *p) 263 { 264 struct thread_params *params = p; 265 266 return enqueue_dequeue_bulk_helper(1, -1, params); 267 } 268 269 static int 270 dequeue_bulk_16B(void *p) 271 { 272 struct thread_params *params = p; 273 274 return enqueue_dequeue_bulk_helper(1, 16, params); 275 } 276 277 /* 278 * Function that calls the enqueue and dequeue bulk functions on pairs of cores. 279 * used to measure ring perf between hyperthreads, cores and sockets. 280 */ 281 static int 282 run_on_core_pair(struct lcore_pair *cores, struct rte_ring *r, const int esize) 283 { 284 lcore_function_t *f1, *f2; 285 struct thread_params param1 = {0}, param2 = {0}; 286 unsigned i; 287 288 if (esize == -1) { 289 f1 = enqueue_bulk; 290 f2 = dequeue_bulk; 291 } else { 292 f1 = enqueue_bulk_16B; 293 f2 = dequeue_bulk_16B; 294 } 295 296 for (i = 0; i < RTE_DIM(bulk_sizes); i++) { 297 lcore_count = 0; 298 param1.size = param2.size = bulk_sizes[i]; 299 param1.r = param2.r = r; 300 if (cores->c1 == rte_get_main_lcore()) { 301 rte_eal_remote_launch(f2, ¶m2, cores->c2); 302 f1(¶m1); 303 rte_eal_wait_lcore(cores->c2); 304 } else { 305 rte_eal_remote_launch(f1, ¶m1, cores->c1); 306 rte_eal_remote_launch(f2, ¶m2, cores->c2); 307 if (rte_eal_wait_lcore(cores->c1) < 0) 308 return -1; 309 if (rte_eal_wait_lcore(cores->c2) < 0) 310 return -1; 311 } 312 test_ring_print_test_string( 313 TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK, 314 esize, bulk_sizes[i], param1.spsc + param2.spsc); 315 test_ring_print_test_string( 316 TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK, 317 esize, bulk_sizes[i], param1.mpmc + param2.mpmc); 318 } 319 320 return 0; 321 } 322 323 static uint32_t synchro; 324 static uint64_t queue_count[RTE_MAX_LCORE]; 325 326 #define TIME_MS 100 327 328 static int 329 load_loop_fn_helper(struct thread_params *p, const int esize) 330 { 331 uint64_t time_diff = 0; 332 uint64_t begin = 0; 333 uint64_t hz = rte_get_timer_hz(); 334 uint64_t lcount = 0; 335 const unsigned int lcore = rte_lcore_id(); 336 struct thread_params *params = p; 337 void *burst = NULL; 338 339 burst = test_ring_calloc(MAX_BURST, esize); 340 if (burst == NULL) 341 return -1; 342 343 /* wait synchro for workers */ 344 if (lcore != rte_get_main_lcore()) 345 rte_wait_until_equal_32(&synchro, 1, __ATOMIC_RELAXED); 346 347 begin = rte_get_timer_cycles(); 348 while (time_diff < hz * TIME_MS / 1000) { 349 test_ring_enqueue(params->r, burst, esize, params->size, 350 TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK); 351 test_ring_dequeue(params->r, burst, esize, params->size, 352 TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK); 353 lcount++; 354 time_diff = rte_get_timer_cycles() - begin; 355 } 356 queue_count[lcore] = lcount; 357 358 rte_free(burst); 359 360 return 0; 361 } 362 363 static int 364 load_loop_fn(void *p) 365 { 366 struct thread_params *params = p; 367 368 return load_loop_fn_helper(params, -1); 369 } 370 371 static int 372 load_loop_fn_16B(void *p) 373 { 374 struct thread_params *params = p; 375 376 return load_loop_fn_helper(params, 16); 377 } 378 379 static int 380 run_on_all_cores(struct rte_ring *r, const int esize) 381 { 382 uint64_t total; 383 struct thread_params param; 384 lcore_function_t *lcore_f; 385 unsigned int i, c; 386 387 if (esize == -1) 388 lcore_f = load_loop_fn; 389 else 390 lcore_f = load_loop_fn_16B; 391 392 memset(¶m, 0, sizeof(struct thread_params)); 393 for (i = 0; i < RTE_DIM(bulk_sizes); i++) { 394 total = 0; 395 printf("\nBulk enq/dequeue count on size %u\n", bulk_sizes[i]); 396 param.size = bulk_sizes[i]; 397 param.r = r; 398 399 /* clear synchro and start workers */ 400 __atomic_store_n(&synchro, 0, __ATOMIC_RELAXED); 401 if (rte_eal_mp_remote_launch(lcore_f, ¶m, SKIP_MAIN) < 0) 402 return -1; 403 404 /* start synchro and launch test on main */ 405 __atomic_store_n(&synchro, 1, __ATOMIC_RELAXED); 406 lcore_f(¶m); 407 408 rte_eal_mp_wait_lcore(); 409 410 RTE_LCORE_FOREACH(c) { 411 printf("Core [%u] count = %"PRIu64"\n", 412 c, queue_count[c]); 413 total += queue_count[c]; 414 } 415 416 printf("Total count (size: %u): %"PRIu64"\n", 417 bulk_sizes[i], total); 418 } 419 420 return 0; 421 } 422 423 /* 424 * Test function that determines how long an enqueue + dequeue of a single item 425 * takes on a single lcore. Result is for comparison with the bulk enq+deq. 426 */ 427 static int 428 test_single_enqueue_dequeue(struct rte_ring *r, const int esize, 429 const unsigned int api_type) 430 { 431 const unsigned int iter_shift = 24; 432 const unsigned int iterations = 1 << iter_shift; 433 unsigned int i = 0; 434 void *burst = NULL; 435 436 /* alloc dummy object pointers */ 437 burst = test_ring_calloc(1, esize); 438 if (burst == NULL) 439 return -1; 440 441 const uint64_t start = rte_rdtsc(); 442 for (i = 0; i < iterations; i++) { 443 test_ring_enqueue(r, burst, esize, 1, api_type); 444 test_ring_dequeue(r, burst, esize, 1, api_type); 445 } 446 const uint64_t end = rte_rdtsc(); 447 448 test_ring_print_test_string(api_type, esize, 1, 449 ((double)(end - start)) / iterations); 450 451 rte_free(burst); 452 453 return 0; 454 } 455 456 /* 457 * Test that does both enqueue and dequeue on a core using the burst/bulk API 458 * calls Results should be the same as for the bulk function called on a 459 * single lcore. 460 */ 461 static int 462 test_burst_bulk_enqueue_dequeue(struct rte_ring *r, const int esize, 463 const unsigned int api_type) 464 { 465 const unsigned int iter_shift = 23; 466 const unsigned int iterations = 1 << iter_shift; 467 unsigned int sz, i = 0; 468 void **burst = NULL; 469 470 burst = test_ring_calloc(MAX_BURST, esize); 471 if (burst == NULL) 472 return -1; 473 474 for (sz = 0; sz < RTE_DIM(bulk_sizes); sz++) { 475 const uint64_t start = rte_rdtsc(); 476 for (i = 0; i < iterations; i++) { 477 test_ring_enqueue(r, burst, esize, bulk_sizes[sz], 478 api_type); 479 test_ring_dequeue(r, burst, esize, bulk_sizes[sz], 480 api_type); 481 } 482 const uint64_t end = rte_rdtsc(); 483 484 test_ring_print_test_string(api_type, esize, bulk_sizes[sz], 485 ((double)(end - start)) / iterations); 486 } 487 488 rte_free(burst); 489 490 return 0; 491 } 492 493 /* Run all tests for a given element size */ 494 static __rte_always_inline int 495 test_ring_perf_esize(const int esize) 496 { 497 struct lcore_pair cores; 498 struct rte_ring *r = NULL; 499 500 /* 501 * Performance test for legacy/_elem APIs 502 * SP-SC/MP-MC, single 503 */ 504 r = test_ring_create(RING_NAME, esize, RING_SIZE, rte_socket_id(), 0); 505 if (r == NULL) 506 goto test_fail; 507 508 printf("\n### Testing single element enq/deq ###\n"); 509 if (test_single_enqueue_dequeue(r, esize, 510 TEST_RING_THREAD_SPSC | TEST_RING_ELEM_SINGLE) < 0) 511 goto test_fail; 512 if (test_single_enqueue_dequeue(r, esize, 513 TEST_RING_THREAD_MPMC | TEST_RING_ELEM_SINGLE) < 0) 514 goto test_fail; 515 516 printf("\n### Testing burst enq/deq ###\n"); 517 if (test_burst_bulk_enqueue_dequeue(r, esize, 518 TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BURST) < 0) 519 goto test_fail; 520 if (test_burst_bulk_enqueue_dequeue(r, esize, 521 TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BURST) < 0) 522 goto test_fail; 523 524 printf("\n### Testing bulk enq/deq ###\n"); 525 if (test_burst_bulk_enqueue_dequeue(r, esize, 526 TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK) < 0) 527 goto test_fail; 528 if (test_burst_bulk_enqueue_dequeue(r, esize, 529 TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK) < 0) 530 goto test_fail; 531 532 printf("\n### Testing empty bulk deq ###\n"); 533 test_empty_dequeue(r, esize, 534 TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK); 535 test_empty_dequeue(r, esize, 536 TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK); 537 538 if (get_two_hyperthreads(&cores) == 0) { 539 printf("\n### Testing using two hyperthreads ###\n"); 540 if (run_on_core_pair(&cores, r, esize) < 0) 541 goto test_fail; 542 } 543 544 if (get_two_cores(&cores) == 0) { 545 printf("\n### Testing using two physical cores ###\n"); 546 if (run_on_core_pair(&cores, r, esize) < 0) 547 goto test_fail; 548 } 549 if (get_two_sockets(&cores) == 0) { 550 printf("\n### Testing using two NUMA nodes ###\n"); 551 if (run_on_core_pair(&cores, r, esize) < 0) 552 goto test_fail; 553 } 554 555 printf("\n### Testing using all worker nodes ###\n"); 556 if (run_on_all_cores(r, esize) < 0) 557 goto test_fail; 558 559 rte_ring_free(r); 560 561 return 0; 562 563 test_fail: 564 rte_ring_free(r); 565 566 return -1; 567 } 568 569 static int 570 test_ring_perf(void) 571 { 572 /* Run all the tests for different element sizes */ 573 if (test_ring_perf_esize(-1) == -1) 574 return -1; 575 576 if (test_ring_perf_esize(16) == -1) 577 return -1; 578 579 return 0; 580 } 581 582 REGISTER_TEST_COMMAND(ring_perf_autotest, test_ring_perf); 583