/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 */

#include <stdio.h>
#include <inttypes.h>
#include <rte_ring.h>
#include <rte_cycles.h>
#include <rte_launch.h>
#include <rte_lcore.h>
#include <rte_pause.h>

#include "test.h"

/*
 * Ring
 * ====
 *
 * Measures performance of various operations using rdtsc
 *  * Empty ring dequeue
 *  * Enqueue/dequeue of bursts in 1 thread
 *  * Enqueue/dequeue of bursts in 2 threads
 */

#define RING_NAME "RING_PERF"
#define RING_SIZE 4096
#define MAX_BURST 32

/*
 * The sizes to enqueue and dequeue in testing
 * (marked volatile so they won't be seen as compile-time constants).
 */
static const volatile unsigned bulk_sizes[] = { 8, 32 };

struct lcore_pair {
	unsigned c1, c2;
};

static volatile unsigned lcore_count = 0;

/**** Functions to analyse our core mask to get cores for different tests ***/

static int
get_two_hyperthreads(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned c1, c2, s1, s2;
	RTE_LCORE_FOREACH(id1) {
		/* The inner loop just re-reads all ids. We could skip the
		 * first few elements, but since the number of cores is small
		 * there is little point.
		 */
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;

			c1 = rte_lcore_to_cpu_id(id1);
			c2 = rte_lcore_to_cpu_id(id2);
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if ((c1 == c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_cores(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned c1, c2, s1, s2;
	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;

			c1 = rte_lcore_to_cpu_id(id1);
			c2 = rte_lcore_to_cpu_id(id2);
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if ((c1 != c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_sockets(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned s1, s2;
	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if (s1 != s2) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

/* Get cycle counts for dequeuing from an empty ring. Should be 2 or 3 cycles */
static void
test_empty_dequeue(struct rte_ring *r)
{
	const unsigned iter_shift = 26;
	const unsigned iterations = 1<<iter_shift;
	unsigned i = 0;
	void *burst[MAX_BURST];

	const uint64_t sc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[0], NULL);
	const uint64_t sc_end = rte_rdtsc();

	const uint64_t mc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		rte_ring_mc_dequeue_bulk(r, burst, bulk_sizes[0], NULL);
	const uint64_t mc_end = rte_rdtsc();

	printf("SC empty dequeue: %.2F\n",
			(double)(sc_end-sc_start) / iterations);
	printf("MC empty dequeue: %.2F\n",
			(double)(mc_end-mc_start) / iterations);
}
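/*
 * Illustration of the get_two_*() selectors above, on a hypothetical
 * 2-socket machine with hyperthreading (the lcore numbering here is an
 * assumption for the example; real mappings depend on the machine and
 * the EAL core mask):
 *
 *     lcore 0 -> socket 0, physical cpu 0
 *     lcore 1 -> socket 0, physical cpu 0  (hyperthread sibling of lcore 0)
 *     lcore 2 -> socket 0, physical cpu 1
 *     lcore 3 -> socket 1, physical cpu 4
 *
 * get_two_hyperthreads() would pair {0,1} (same cpu, same socket),
 * get_two_cores() would pair {0,2} (different cpu, same socket) and
 * get_two_sockets() would pair {0,3} (different sockets).
 */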
/*
 * The separate enqueue and dequeue threads each take one parameter in and
 * return two: input = burst size, output = the average cycle counts for
 * the sp/sc and mp/mc cases.
 */
struct thread_params {
	struct rte_ring *r;
	unsigned size;		/* input value, the burst size */
	double spsc, mpmc;	/* output value, the single or multi timings */
};

/*
 * Function that uses rdtsc to measure timing for ring enqueue. Needs a
 * paired thread running the dequeue_bulk function.
 */
static int
enqueue_bulk(void *p)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1<<iter_shift;
	struct thread_params *params = p;
	struct rte_ring *r = params->r;
	const unsigned size = params->size;
	unsigned i;
	void *burst[MAX_BURST] = {0};

#ifdef RTE_USE_C11_MEM_MODEL
	if (__atomic_add_fetch(&lcore_count, 1, __ATOMIC_RELAXED) != 2)
#else
	if (__sync_add_and_fetch(&lcore_count, 1) != 2)
#endif
		while (lcore_count != 2)
			rte_pause();

	const uint64_t sp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_sp_enqueue_bulk(r, burst, size, NULL) == 0)
			rte_pause();
	const uint64_t sp_end = rte_rdtsc();

	const uint64_t mp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_mp_enqueue_bulk(r, burst, size, NULL) == 0)
			rte_pause();
	const uint64_t mp_end = rte_rdtsc();

	params->spsc = ((double)(sp_end - sp_start))/(iterations*size);
	params->mpmc = ((double)(mp_end - mp_start))/(iterations*size);
	return 0;
}

/*
 * Function that uses rdtsc to measure timing for ring dequeue. Needs a
 * paired thread running the enqueue_bulk function.
 */
static int
dequeue_bulk(void *p)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1<<iter_shift;
	struct thread_params *params = p;
	struct rte_ring *r = params->r;
	const unsigned size = params->size;
	unsigned i;
	void *burst[MAX_BURST] = {0};

#ifdef RTE_USE_C11_MEM_MODEL
	if (__atomic_add_fetch(&lcore_count, 1, __ATOMIC_RELAXED) != 2)
#else
	if (__sync_add_and_fetch(&lcore_count, 1) != 2)
#endif
		while (lcore_count != 2)
			rte_pause();

	const uint64_t sc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_sc_dequeue_bulk(r, burst, size, NULL) == 0)
			rte_pause();
	const uint64_t sc_end = rte_rdtsc();

	const uint64_t mc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_mc_dequeue_bulk(r, burst, size, NULL) == 0)
			rte_pause();
	const uint64_t mc_end = rte_rdtsc();

	params->spsc = ((double)(sc_end - sc_start))/(iterations*size);
	params->mpmc = ((double)(mc_end - mc_start))/(iterations*size);
	return 0;
}
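/*
 * Note on the start-up synchronisation in the two workers above: each
 * thread atomically increments lcore_count and then spins until it reaches
 * 2, so the rdtsc measurement windows of the enqueue and dequeue sides
 * overlap. The stored spsc/mpmc figures are cycles per element: with
 * iter_shift = 23 and a burst size of 8, each measured loop moves
 * 2^23 * 8 = 2^26 (about 67 million) elements through the ring, and the
 * pair test below reports the sum of the enqueue-side and dequeue-side
 * per-element costs.
 */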
/*
 * Function that calls the enqueue and dequeue bulk functions on pairs of
 * cores. Used to measure ring perf between hyperthreads, cores and sockets.
 */
static void
run_on_core_pair(struct lcore_pair *cores, struct rte_ring *r,
		lcore_function_t f1, lcore_function_t f2)
{
	struct thread_params param1 = {0}, param2 = {0};
	unsigned i;
	for (i = 0; i < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); i++) {
		lcore_count = 0;
		param1.size = param2.size = bulk_sizes[i];
		param1.r = param2.r = r;
		if (cores->c1 == rte_get_master_lcore()) {
			rte_eal_remote_launch(f2, &param2, cores->c2);
			f1(&param1);
			rte_eal_wait_lcore(cores->c2);
		} else {
			rte_eal_remote_launch(f1, &param1, cores->c1);
			rte_eal_remote_launch(f2, &param2, cores->c2);
			rte_eal_wait_lcore(cores->c1);
			rte_eal_wait_lcore(cores->c2);
		}
		printf("SP/SC bulk enq/dequeue (size: %u): %.2F\n",
				bulk_sizes[i], param1.spsc + param2.spsc);
		printf("MP/MC bulk enq/dequeue (size: %u): %.2F\n",
				bulk_sizes[i], param1.mpmc + param2.mpmc);
	}
}

/*
 * Test function that determines how long an enqueue + dequeue of a single
 * item takes on a single lcore. Result is for comparison with the bulk
 * enq+deq.
 */
static void
test_single_enqueue_dequeue(struct rte_ring *r)
{
	const unsigned iter_shift = 24;
	const unsigned iterations = 1<<iter_shift;
	unsigned i = 0;
	void *burst = NULL;

	const uint64_t sc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++) {
		rte_ring_sp_enqueue(r, burst);
		rte_ring_sc_dequeue(r, &burst);
	}
	const uint64_t sc_end = rte_rdtsc();

	const uint64_t mc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++) {
		rte_ring_mp_enqueue(r, burst);
		rte_ring_mc_dequeue(r, &burst);
	}
	const uint64_t mc_end = rte_rdtsc();

	printf("SP/SC single enq/dequeue: %"PRIu64"\n",
			(sc_end-sc_start) >> iter_shift);
	printf("MP/MC single enq/dequeue: %"PRIu64"\n",
			(mc_end-mc_start) >> iter_shift);
}
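/*
 * A note on the bulk vs. burst APIs used in the two tests below: the bulk
 * calls (e.g. rte_ring_sp_enqueue_bulk) operate on all n objects or none,
 * returning 0 on failure, while the burst calls
 * (e.g. rte_ring_sp_enqueue_burst) move as many objects as possible and
 * return that count. On a single lcore the ring can neither fill up nor
 * run dry between the paired calls, so both APIs always transfer the full
 * burst and the two tests should report similar figures.
 */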
/*
 * Test that does both enqueue and dequeue on a core using the burst() API
 * calls instead of the bulk() calls used in other tests. Results should be
 * the same as for the bulk function called on a single lcore.
 */
static void
test_burst_enqueue_dequeue(struct rte_ring *r)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1<<iter_shift;
	unsigned sz, i = 0;
	void *burst[MAX_BURST] = {0};

	for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
		const uint64_t sc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_sp_enqueue_burst(r, burst,
					bulk_sizes[sz], NULL);
			rte_ring_sc_dequeue_burst(r, burst,
					bulk_sizes[sz], NULL);
		}
		const uint64_t sc_end = rte_rdtsc();

		const uint64_t mc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_mp_enqueue_burst(r, burst,
					bulk_sizes[sz], NULL);
			rte_ring_mc_dequeue_burst(r, burst,
					bulk_sizes[sz], NULL);
		}
		const uint64_t mc_end = rte_rdtsc();

		uint64_t mc_avg = ((mc_end-mc_start) >> iter_shift) /
				bulk_sizes[sz];
		uint64_t sc_avg = ((sc_end-sc_start) >> iter_shift) /
				bulk_sizes[sz];

		printf("SP/SC burst enq/dequeue (size: %u): %"PRIu64"\n",
				bulk_sizes[sz], sc_avg);
		printf("MP/MC burst enq/dequeue (size: %u): %"PRIu64"\n",
				bulk_sizes[sz], mc_avg);
	}
}

/* Times enqueue and dequeue on a single lcore */
static void
test_bulk_enqueue_dequeue(struct rte_ring *r)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1<<iter_shift;
	unsigned sz, i = 0;
	void *burst[MAX_BURST] = {0};

	for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
		const uint64_t sc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_sp_enqueue_bulk(r, burst,
					bulk_sizes[sz], NULL);
			rte_ring_sc_dequeue_bulk(r, burst,
					bulk_sizes[sz], NULL);
		}
		const uint64_t sc_end = rte_rdtsc();

		const uint64_t mc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_mp_enqueue_bulk(r, burst,
					bulk_sizes[sz], NULL);
			rte_ring_mc_dequeue_bulk(r, burst,
					bulk_sizes[sz], NULL);
		}
		const uint64_t mc_end = rte_rdtsc();

		double sc_avg = ((double)(sc_end-sc_start) /
				(iterations * bulk_sizes[sz]));
		double mc_avg = ((double)(mc_end-mc_start) /
				(iterations * bulk_sizes[sz]));

		printf("SP/SC bulk enq/dequeue (size: %u): %.2F\n",
				bulk_sizes[sz], sc_avg);
		printf("MP/MC bulk enq/dequeue (size: %u): %.2F\n",
				bulk_sizes[sz], mc_avg);
	}
}

static int
test_ring_perf(void)
{
	struct lcore_pair cores;
	struct rte_ring *r = NULL;

	r = rte_ring_create(RING_NAME, RING_SIZE, rte_socket_id(), 0);
	if (r == NULL)
		return -1;

	printf("### Testing single element and burst enq/deq ###\n");
	test_single_enqueue_dequeue(r);
	test_burst_enqueue_dequeue(r);

	printf("\n### Testing empty dequeue ###\n");
	test_empty_dequeue(r);

	printf("\n### Testing using a single lcore ###\n");
	test_bulk_enqueue_dequeue(r);

	if (get_two_hyperthreads(&cores) == 0) {
		printf("\n### Testing using two hyperthreads ###\n");
		run_on_core_pair(&cores, r, enqueue_bulk, dequeue_bulk);
	}
	if (get_two_cores(&cores) == 0) {
		printf("\n### Testing using two physical cores ###\n");
		run_on_core_pair(&cores, r, enqueue_bulk, dequeue_bulk);
	}
	if (get_two_sockets(&cores) == 0) {
		printf("\n### Testing using two NUMA nodes ###\n");
		run_on_core_pair(&cores, r, enqueue_bulk, dequeue_bulk);
	}
	rte_ring_free(r);
	return 0;
}

REGISTER_TEST_COMMAND(ring_perf_autotest, test_ring_perf);
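/*
 * Usage sketch: the test is registered above as "ring_perf_autotest", so
 * it is typically run from the DPDK test binary's interactive prompt, e.g.
 *
 *     RTE>>ring_perf_autotest
 *
 * The binary name (dpdk-test vs. test) and prompt may differ between DPDK
 * releases; the command name comes from REGISTER_TEST_COMMAND.
 */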