/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 * Copyright(c) 2019 Arm Limited
 */

#include <stdio.h>
#include <inttypes.h>
#include <rte_ring.h>
#include <rte_cycles.h>
#include <rte_launch.h>
#include <rte_pause.h>
#include <string.h>

#include "test.h"
#include "test_ring.h"

/*
 * Ring performance test cases: measure the performance of various ring
 * operations using rdtsc, for both legacy and 16B ring elements.
 */

#define RING_NAME "RING_PERF"
#define RING_SIZE 4096
#define MAX_BURST 32

/*
 * The sizes to enqueue and dequeue in testing
 * (marked volatile so they won't be seen as compile-time constants).
 */
static const volatile unsigned int bulk_sizes[] = { 8, 32 };

struct lcore_pair {
	unsigned int c1, c2;
};

static volatile unsigned int lcore_count;

static void
test_ring_print_test_string(unsigned int api_type, int esize,
	unsigned int bsz, double value)
{
	if (esize == -1)
		printf("legacy APIs");
	else
		printf("elem APIs: element size %dB", esize);

	if (api_type == TEST_RING_IGNORE_API_TYPE)
		return;

	if ((api_type & TEST_RING_THREAD_DEF) == TEST_RING_THREAD_DEF)
		printf(": default enqueue/dequeue: ");
	else if ((api_type & TEST_RING_THREAD_SPSC) == TEST_RING_THREAD_SPSC)
		printf(": SP/SC: ");
	else if ((api_type & TEST_RING_THREAD_MPMC) == TEST_RING_THREAD_MPMC)
		printf(": MP/MC: ");

	if ((api_type & TEST_RING_ELEM_SINGLE) == TEST_RING_ELEM_SINGLE)
		printf("single: ");
	else if ((api_type & TEST_RING_ELEM_BULK) == TEST_RING_ELEM_BULK)
		printf("bulk (size: %u): ", bsz);
	else if ((api_type & TEST_RING_ELEM_BURST) == TEST_RING_ELEM_BURST)
		printf("burst (size: %u): ", bsz);

	printf("%.2F\n", value);
}

/**** Functions to analyse our core mask to get cores for different tests ***/

static int
get_two_hyperthreads(struct lcore_pair *lcp)
{
	unsigned int id1, id2;
	unsigned int c1, c2, s1, s2;

	RTE_LCORE_FOREACH(id1) {
		/* The inner loop just re-reads all ids. We could skip the
		 * first few elements, but since the number of cores is small
		 * there is little point.
		 */
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;

			c1 = rte_lcore_to_cpu_id(id1);
			c2 = rte_lcore_to_cpu_id(id2);
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if ((c1 == c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_cores(struct lcore_pair *lcp)
{
	unsigned int id1, id2;
	unsigned int c1, c2, s1, s2;

	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;

			c1 = rte_lcore_to_cpu_id(id1);
			c2 = rte_lcore_to_cpu_id(id2);
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if ((c1 != c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_sockets(struct lcore_pair *lcp)
{
	unsigned int id1, id2;
	unsigned int s1, s2;

	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if (s1 != s2) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

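/*
 * Each helper above returns 0 and fills 'lcp' when a matching pair of
 * lcores exists in the core mask, or 1 when no such pair is available.
 * test_ring_perf_esize() further down probes all three topologies and
 * runs the core-pair test for each one that is present, e.g. (sketch):
 *
 *	struct lcore_pair cores;
 *
 *	if (get_two_hyperthreads(&cores) == 0)
 *		run_on_core_pair(&cores, r, esize);
 */
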
/* Get cycle counts for dequeuing from an empty ring.
 * Should be 2 or 3 cycles.
 */
static void
test_empty_dequeue(struct rte_ring *r, const int esize,
	const unsigned int api_type)
{
	const unsigned int iter_shift = 26;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int i = 0;
	void *burst[MAX_BURST];

	const uint64_t start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		test_ring_dequeue(r, burst, esize, bulk_sizes[0], api_type);
	const uint64_t end = rte_rdtsc();

	test_ring_print_test_string(api_type, esize, bulk_sizes[0],
			((double)(end - start)) / iterations);
}

/*
 * The separate enqueue and dequeue threads each take one parameter in and
 * return two: input = burst size, output = average cycle counts for SP/SC
 * and MP/MC operation.
 */
struct thread_params {
	struct rte_ring *r;
	unsigned int size;	/* input value, the burst size */
	double spsc, mpmc;	/* output value, the single or multi timings */
};

/*
 * Helper function to call the bulk SP/SC and MP/MC enqueue/dequeue functions.
 * flag == 0 -> enqueue
 * flag == 1 -> dequeue
 */
static __rte_always_inline int
enqueue_dequeue_bulk_helper(const unsigned int flag, const int esize,
	struct thread_params *p)
{
	int ret;
	const unsigned int iter_shift = 15;
	const unsigned int iterations = 1 << iter_shift;
	struct rte_ring *r = p->r;
	unsigned int bsize = p->size;
	unsigned int i;
	void *burst = NULL;

	/* Rendezvous: both lcores bump the counter and the first arrival
	 * spins until the pair is complete, so that the enqueuing and
	 * dequeuing threads start at the same time.
	 */
#ifdef RTE_USE_C11_MEM_MODEL
	if (__atomic_add_fetch(&lcore_count, 1, __ATOMIC_RELAXED) != 2)
#else
	if (__sync_add_and_fetch(&lcore_count, 1) != 2)
#endif
		while (lcore_count != 2)
			rte_pause();

	burst = test_ring_calloc(MAX_BURST, esize);
	if (burst == NULL)
		return -1;

	const uint64_t sp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		do {
			if (flag == 0)
				ret = test_ring_enqueue(r, burst, esize, bsize,
						TEST_RING_THREAD_SPSC |
						TEST_RING_ELEM_BULK);
			else if (flag == 1)
				ret = test_ring_dequeue(r, burst, esize, bsize,
						TEST_RING_THREAD_SPSC |
						TEST_RING_ELEM_BULK);
			if (ret == 0)
				rte_pause();
		} while (!ret);
	const uint64_t sp_end = rte_rdtsc();

	const uint64_t mp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		do {
			if (flag == 0)
				ret = test_ring_enqueue(r, burst, esize, bsize,
						TEST_RING_THREAD_MPMC |
						TEST_RING_ELEM_BULK);
			else if (flag == 1)
				ret = test_ring_dequeue(r, burst, esize, bsize,
						TEST_RING_THREAD_MPMC |
						TEST_RING_ELEM_BULK);
			if (ret == 0)
				rte_pause();
		} while (!ret);
	const uint64_t mp_end = rte_rdtsc();

	p->spsc = ((double)(sp_end - sp_start)) / (iterations * bsize);
	p->mpmc = ((double)(mp_end - mp_start)) / (iterations * bsize);

	/* free the scratch buffer */
	rte_free(burst);

	return 0;
}

/*
 * Function that uses rdtsc to measure timing for ring enqueue. Needs a
 * paired thread running the dequeue_bulk function.
 */
static int
enqueue_bulk(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(0, -1, params);
}

static int
enqueue_bulk_16B(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(0, 16, params);
}

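/*
 * The helper above synchronizes the enqueuing and dequeuing lcores with an
 * atomic increment-and-spin. A minimal standalone equivalent of that
 * rendezvous using C11 <stdatomic.h> (an illustrative sketch only; the
 * test itself uses the GCC builtins so it can honour RTE_USE_C11_MEM_MODEL)
 * would be:
 *
 *	static atomic_uint count;
 *
 *	if (atomic_fetch_add_explicit(&count, 1, memory_order_relaxed) + 1 != 2)
 *		while (atomic_load_explicit(&count, memory_order_relaxed) != 2)
 *			rte_pause();
 */
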
/*
 * Function that uses rdtsc to measure timing for ring dequeue. Needs a
 * paired thread running the enqueue_bulk function.
 */
static int
dequeue_bulk(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(1, -1, params);
}

static int
dequeue_bulk_16B(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(1, 16, params);
}

/*
 * Function that calls the enqueue and dequeue bulk functions on pairs of
 * cores. Used to measure ring perf between hyperthreads, cores and sockets.
 */
static int
run_on_core_pair(struct lcore_pair *cores, struct rte_ring *r, const int esize)
{
	lcore_function_t *f1, *f2;
	struct thread_params param1 = {0}, param2 = {0};
	unsigned int i;

	if (esize == -1) {
		f1 = enqueue_bulk;
		f2 = dequeue_bulk;
	} else {
		f1 = enqueue_bulk_16B;
		f2 = dequeue_bulk_16B;
	}

	for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
		lcore_count = 0;
		param1.size = param2.size = bulk_sizes[i];
		param1.r = param2.r = r;
		if (cores->c1 == rte_get_main_lcore()) {
			rte_eal_remote_launch(f2, &param2, cores->c2);
			f1(&param1);
			rte_eal_wait_lcore(cores->c2);
		} else {
			rte_eal_remote_launch(f1, &param1, cores->c1);
			rte_eal_remote_launch(f2, &param2, cores->c2);
			if (rte_eal_wait_lcore(cores->c1) < 0)
				return -1;
			if (rte_eal_wait_lcore(cores->c2) < 0)
				return -1;
		}
		test_ring_print_test_string(
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK,
			esize, bulk_sizes[i], param1.spsc + param2.spsc);
		test_ring_print_test_string(
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK,
			esize, bulk_sizes[i], param1.mpmc + param2.mpmc);
	}

	return 0;
}

static rte_atomic32_t synchro;
static uint64_t queue_count[RTE_MAX_LCORE];

#define TIME_MS 100

static int
load_loop_fn_helper(struct thread_params *p, const int esize)
{
	uint64_t time_diff = 0;
	uint64_t begin = 0;
	uint64_t hz = rte_get_timer_hz();
	uint64_t lcount = 0;
	const unsigned int lcore = rte_lcore_id();
	struct thread_params *params = p;
	void *burst = NULL;

	burst = test_ring_calloc(MAX_BURST, esize);
	if (burst == NULL)
		return -1;

	/* workers wait for the main lcore to set the synchro flag */
	if (lcore != rte_get_main_lcore())
		while (rte_atomic32_read(&synchro) == 0)
			rte_pause();

	begin = rte_get_timer_cycles();
	while (time_diff < hz * TIME_MS / 1000) {
		test_ring_enqueue(params->r, burst, esize, params->size,
				TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);
		test_ring_dequeue(params->r, burst, esize, params->size,
				TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);
		lcount++;
		time_diff = rte_get_timer_cycles() - begin;
	}
	queue_count[lcore] = lcount;

	rte_free(burst);

	return 0;
}

static int
load_loop_fn(void *p)
{
	struct thread_params *params = p;

	return load_loop_fn_helper(params, -1);
}

static int
load_loop_fn_16B(void *p)
{
	struct thread_params *params = p;

	return load_loop_fn_helper(params, 16);
}

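/*
 * run_on_all_cores() below follows the usual DPDK fan-out pattern: launch
 * the load loop on every worker lcore with rte_eal_mp_remote_launch(),
 * release the workers by setting 'synchro' to 1, run the same loop on the
 * main lcore, then collect all lcores with rte_eal_mp_wait_lcore().
 * Workers spin on 'synchro' (see load_loop_fn_helper() above), so all
 * lcores hammer the ring during the same TIME_MS window.
 */
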
static int
run_on_all_cores(struct rte_ring *r, const int esize)
{
	uint64_t total;
	struct thread_params param;
	lcore_function_t *lcore_f;
	unsigned int i, c;

	if (esize == -1)
		lcore_f = load_loop_fn;
	else
		lcore_f = load_loop_fn_16B;

	memset(&param, 0, sizeof(struct thread_params));
	for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
		total = 0;
		printf("\nBulk enq/dequeue count on size %u\n", bulk_sizes[i]);
		param.size = bulk_sizes[i];
		param.r = r;

		/* clear synchro and start workers */
		rte_atomic32_set(&synchro, 0);
		if (rte_eal_mp_remote_launch(lcore_f, &param, SKIP_MAIN) < 0)
			return -1;

		/* start synchro and launch test on main */
		rte_atomic32_set(&synchro, 1);
		lcore_f(&param);

		rte_eal_mp_wait_lcore();

		RTE_LCORE_FOREACH(c) {
			printf("Core [%u] count = %"PRIu64"\n",
					c, queue_count[c]);
			total += queue_count[c];
		}

		printf("Total count (size: %u): %"PRIu64"\n",
				bulk_sizes[i], total);
	}

	return 0;
}

/*
 * Test function that determines how long an enqueue + dequeue of a single
 * item takes on a single lcore. Result is for comparison with the bulk
 * enq+deq.
 */
static int
test_single_enqueue_dequeue(struct rte_ring *r, const int esize,
	const unsigned int api_type)
{
	const unsigned int iter_shift = 24;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int i = 0;
	void *burst = NULL;

	/* alloc dummy object pointers */
	burst = test_ring_calloc(1, esize);
	if (burst == NULL)
		return -1;

	const uint64_t start = rte_rdtsc();
	for (i = 0; i < iterations; i++) {
		test_ring_enqueue(r, burst, esize, 1, api_type);
		test_ring_dequeue(r, burst, esize, 1, api_type);
	}
	const uint64_t end = rte_rdtsc();

	test_ring_print_test_string(api_type, esize, 1,
			((double)(end - start)) / iterations);

	rte_free(burst);

	return 0;
}

/*
 * Test that does both enqueue and dequeue on a core using the burst/bulk
 * API calls. Results should be the same as for the bulk function called
 * on a single lcore.
 */
static int
test_burst_bulk_enqueue_dequeue(struct rte_ring *r, const int esize,
	const unsigned int api_type)
{
	const unsigned int iter_shift = 23;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int sz, i = 0;
	void **burst = NULL;

	burst = test_ring_calloc(MAX_BURST, esize);
	if (burst == NULL)
		return -1;

	for (sz = 0; sz < RTE_DIM(bulk_sizes); sz++) {
		const uint64_t start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			test_ring_enqueue(r, burst, esize, bulk_sizes[sz],
					api_type);
			test_ring_dequeue(r, burst, esize, bulk_sizes[sz],
					api_type);
		}
		const uint64_t end = rte_rdtsc();

		test_ring_print_test_string(api_type, esize, bulk_sizes[sz],
				((double)(end - start)) / iterations);
	}

	rte_free(burst);

	return 0;
}

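/*
 * Note on reported units: test_burst_bulk_enqueue_dequeue() above prints
 * cycles per iteration (one enqueue plus one dequeue of a whole burst),
 * whereas enqueue_dequeue_bulk_helper() divides by iterations * burst size
 * and therefore reports cycles per element. Divide the burst/bulk figure
 * by the burst size to compare the two.
 */
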
/* Run all tests for a given element size */
static __rte_always_inline int
test_ring_perf_esize(const int esize)
{
	struct lcore_pair cores;
	struct rte_ring *r = NULL;

	/*
	 * Performance test for legacy/_elem APIs
	 * SP-SC/MP-MC, single
	 */
	r = test_ring_create(RING_NAME, esize, RING_SIZE, rte_socket_id(), 0);
	if (r == NULL)
		goto test_fail;

	printf("\n### Testing single element enq/deq ###\n");
	if (test_single_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_SINGLE) < 0)
		goto test_fail;
	if (test_single_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_SINGLE) < 0)
		goto test_fail;

	printf("\n### Testing burst enq/deq ###\n");
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BURST) < 0)
		goto test_fail;
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BURST) < 0)
		goto test_fail;

	printf("\n### Testing bulk enq/deq ###\n");
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK) < 0)
		goto test_fail;
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK) < 0)
		goto test_fail;

	printf("\n### Testing empty bulk deq ###\n");
	test_empty_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK);
	test_empty_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);

	if (get_two_hyperthreads(&cores) == 0) {
		printf("\n### Testing using two hyperthreads ###\n");
		if (run_on_core_pair(&cores, r, esize) < 0)
			goto test_fail;
	}

	if (get_two_cores(&cores) == 0) {
		printf("\n### Testing using two physical cores ###\n");
		if (run_on_core_pair(&cores, r, esize) < 0)
			goto test_fail;
	}

	if (get_two_sockets(&cores) == 0) {
		printf("\n### Testing using two NUMA nodes ###\n");
		if (run_on_core_pair(&cores, r, esize) < 0)
			goto test_fail;
	}

	printf("\n### Testing using all worker lcores ###\n");
	if (run_on_all_cores(r, esize) < 0)
		goto test_fail;

	rte_ring_free(r);

	return 0;

test_fail:
	rte_ring_free(r);

	return -1;
}

static int
test_ring_perf(void)
{
	/* Run all the tests for different element sizes */
	if (test_ring_perf_esize(-1) == -1)
		return -1;

	if (test_ring_perf_esize(16) == -1)
		return -1;

	return 0;
}

REGISTER_TEST_COMMAND(ring_perf_autotest, test_ring_perf);
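
/*
 * Example invocation, assuming the standard dpdk-test binary built from
 * this tree (the app name and prompt may differ by DPDK version):
 *
 *	$ ./dpdk-test -l 0-3
 *	RTE>>ring_perf_autotest
 */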