/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2020 Intel Corporation
 */

#include <stdalign.h>

#include "test_ring_stress.h"

/**
 * Stress test for ring enqueue/dequeue operations.
 * Each worker repeatedly performs the following pattern:
 * dequeue a burst of objects, read and update the dequeued objects,
 * then enqueue them back.
 * Serves as both a functional and a performance test of ring
 * enqueue/dequeue operations under high contention
 * (for both over-committed and non-over-committed scenarios).
 */

#define RING_NAME	"RING_STRESS"
#define BULK_NUM	32
#define RING_SIZE	(2 * BULK_NUM * RTE_MAX_LCORE)

enum {
	WRK_CMD_STOP,
	WRK_CMD_RUN,
};

static alignas(RTE_CACHE_LINE_SIZE) RTE_ATOMIC(uint32_t) wrk_cmd = WRK_CMD_STOP;

/* test run-time in seconds */
static const uint32_t run_time = 60;
static const uint32_t verbose;

struct lcore_stat {
	uint64_t nb_cycle;
	struct {
		uint64_t nb_call;
		uint64_t nb_obj;
		uint64_t nb_cycle;
		uint64_t max_cycle;
		uint64_t min_cycle;
	} op;
};

struct __rte_cache_aligned lcore_arg {
	struct rte_ring *rng;
	struct lcore_stat stats;
};

struct __rte_cache_aligned ring_elem {
	uint32_t cnt[RTE_CACHE_LINE_SIZE / sizeof(uint32_t)];
};

/*
 * Redefinable functions: the test source that includes this file has to
 * provide the actual enqueue/dequeue/init implementations
 * (see the usage sketch at the end of this file).
 */
static uint32_t
_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
	uint32_t *avail);

static uint32_t
_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
	uint32_t *free);

static int
_st_ring_init(struct rte_ring *r, const char *name, uint32_t num);

static void
lcore_stat_update(struct lcore_stat *ls, uint64_t call, uint64_t obj,
	uint64_t tm, int32_t prcs)
{
	ls->op.nb_call += call;
	ls->op.nb_obj += obj;
	ls->op.nb_cycle += tm;
	if (prcs) {
		ls->op.max_cycle = RTE_MAX(ls->op.max_cycle, tm);
		ls->op.min_cycle = RTE_MIN(ls->op.min_cycle, tm);
	}
}

static void
lcore_op_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
{
	ms->op.nb_call += ls->op.nb_call;
	ms->op.nb_obj += ls->op.nb_obj;
	ms->op.nb_cycle += ls->op.nb_cycle;
	ms->op.max_cycle = RTE_MAX(ms->op.max_cycle, ls->op.max_cycle);
	ms->op.min_cycle = RTE_MIN(ms->op.min_cycle, ls->op.min_cycle);
}

static void
lcore_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
{
	ms->nb_cycle = RTE_MAX(ms->nb_cycle, ls->nb_cycle);
	lcore_op_stat_aggr(ms, ls);
}

static void
lcore_stat_dump(FILE *f, uint32_t lc, const struct lcore_stat *ls)
{
	long double st;

	st = (long double)rte_get_timer_hz() / US_PER_S;

	if (lc == UINT32_MAX)
		fprintf(f, "%s(AGGREGATE)={\n", __func__);
	else
		fprintf(f, "%s(lcore=%u)={\n", __func__, lc);

	fprintf(f, "\tnb_cycle=%" PRIu64 "(%.2Lf usec),\n",
		ls->nb_cycle, (long double)ls->nb_cycle / st);

	fprintf(f, "\tDEQ+ENQ={\n");

	fprintf(f, "\t\tnb_call=%" PRIu64 ",\n", ls->op.nb_call);
	fprintf(f, "\t\tnb_obj=%" PRIu64 ",\n", ls->op.nb_obj);
	fprintf(f, "\t\tnb_cycle=%" PRIu64 ",\n", ls->op.nb_cycle);
	fprintf(f, "\t\tobj/call(avg): %.2Lf\n",
		(long double)ls->op.nb_obj / ls->op.nb_call);
	fprintf(f, "\t\tcycles/obj(avg): %.2Lf\n",
		(long double)ls->op.nb_cycle / ls->op.nb_obj);
	fprintf(f, "\t\tcycles/call(avg): %.2Lf\n",
		(long double)ls->op.nb_cycle / ls->op.nb_call);

	/* if min/max cycles per call stats were collected */
	if (ls->op.min_cycle != UINT64_MAX) {
		fprintf(f, "\t\tmax cycles/call=%" PRIu64 "(%.2Lf usec),\n",
			ls->op.max_cycle,
			(long double)ls->op.max_cycle / st);
		fprintf(f, "\t\tmin cycles/call=%" PRIu64 "(%.2Lf usec),\n",
			ls->op.min_cycle,
			(long double)ls->op.min_cycle / st);
	}

	fprintf(f, "\t},\n");
	fprintf(f, "};\n");
}

static void
fill_ring_elm(struct ring_elem *elm, uint32_t fill)
{
	uint32_t i;

	for (i = 0; i != RTE_DIM(elm->cnt); i++)
		elm->cnt[i] = fill;
}

static int32_t
check_updt_elem(struct ring_elem *elm[], uint32_t num,
	const struct ring_elem *check, const struct ring_elem *fill)
{
	uint32_t i;

	static rte_spinlock_t dump_lock;

	for (i = 0; i != num; i++) {
		if (memcmp(check, elm[i], sizeof(*check)) != 0) {
			rte_spinlock_lock(&dump_lock);
			printf("%s(lc=%u, num=%u) failed at %u-th iter, "
				"offending object: %p\n",
				__func__, rte_lcore_id(), num, i, elm[i]);
			rte_memdump(stdout, "expected", check, sizeof(*check));
			rte_memdump(stdout, "result", elm[i], sizeof(*elm[i]));
			rte_spinlock_unlock(&dump_lock);
			return -EINVAL;
		}
		memcpy(elm[i], fill, sizeof(*elm[i]));
	}

	return 0;
}

static int
check_ring_op(uint32_t exp, uint32_t res, uint32_t lc,
	const char *fname, const char *opname)
{
	if (exp != res) {
		printf("%s(lc=%u) failure: %s expected: %u, returned %u\n",
			fname, lc, opname, exp, res);
		return -ENOSPC;
	}
	return 0;
}

static int
test_worker(void *arg, const char *fname, int32_t prcs)
{
	int32_t rc;
	uint32_t lc, n, num;
	uint64_t cl, tm0, tm1;
	struct lcore_arg *la;
	struct ring_elem def_elm, loc_elm;
	struct ring_elem *obj[2 * BULK_NUM];

	la = arg;
	lc = rte_lcore_id();

	fill_ring_elm(&def_elm, UINT32_MAX);
	fill_ring_elm(&loc_elm, lc);

	/* Acquire ordering is not required as the main is not
	 * really releasing any data through 'wrk_cmd' to
	 * the worker.
	 */
	while (rte_atomic_load_explicit(&wrk_cmd, rte_memory_order_relaxed) != WRK_CMD_RUN)
		rte_pause();

	cl = rte_rdtsc_precise();

	do {
		/* num in interval [7/8, 11/8] of BULK_NUM */
		num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2);

		/* reset all pointer values */
		memset(obj, 0, sizeof(obj));

		/* dequeue num elems */
		tm0 = (prcs != 0) ? rte_rdtsc_precise() : 0;
		n = _st_ring_dequeue_bulk(la->rng, (void **)obj, num, NULL);
		tm0 = (prcs != 0) ? rte_rdtsc_precise() - tm0 : 0;

		/* check return value and objects */
		rc = check_ring_op(num, n, lc, fname,
			RTE_STR(_st_ring_dequeue_bulk));
		if (rc == 0)
			rc = check_updt_elem(obj, num, &def_elm, &loc_elm);
		if (rc != 0)
			break;

		/* enqueue num elems */
		rte_compiler_barrier();
		rc = check_updt_elem(obj, num, &loc_elm, &def_elm);
		if (rc != 0)
			break;

		tm1 = (prcs != 0) ? rte_rdtsc_precise() : 0;
		n = _st_ring_enqueue_bulk(la->rng, (void **)obj, num, NULL);
		tm1 = (prcs != 0) ? rte_rdtsc_precise() - tm1 : 0;

		/* check return value */
		rc = check_ring_op(num, n, lc, fname,
			RTE_STR(_st_ring_enqueue_bulk));
		if (rc != 0)
			break;

		lcore_stat_update(&la->stats, 1, num, tm0 + tm1, prcs);

	} while (rte_atomic_load_explicit(&wrk_cmd, rte_memory_order_relaxed) == WRK_CMD_RUN);

	cl = rte_rdtsc_precise() - cl;
	if (prcs == 0)
		lcore_stat_update(&la->stats, 0, 0, cl, 0);
	la->stats.nb_cycle = cl;
	return rc;
}

static int
test_worker_prcs(void *arg)
{
	return test_worker(arg, __func__, 1);
}

static int
test_worker_avg(void *arg)
{
	return test_worker(arg, __func__, 0);
}

static void
mt1_fini(struct rte_ring *rng, void *data)
{
	rte_free(rng);
	rte_free(data);
}

static int
mt1_init(struct rte_ring **rng, void **data, uint32_t num)
{
	int32_t rc;
	size_t sz;
	uint32_t i, nr;
	struct rte_ring *r;
	struct ring_elem *elm;
	void *p;

	*rng = NULL;
	*data = NULL;

	sz = num * sizeof(*elm);
	elm = rte_zmalloc(NULL, sz, alignof(typeof(*elm)));
	if (elm == NULL) {
		printf("%s: alloc(%zu) for %u elems data failed\n",
			__func__, sz, num);
		return -ENOMEM;
	}

	*data = elm;

	/* alloc ring */
	nr = rte_align32pow2(2 * num);
	sz = rte_ring_get_memsize(nr);
	r = rte_zmalloc(NULL, sz, alignof(typeof(*r)));
	if (r == NULL) {
		printf("%s: alloc(%zu) for FIFO with %u elems failed\n",
			__func__, sz, nr);
		return -ENOMEM;
	}

	*rng = r;

	rc = _st_ring_init(r, RING_NAME, nr);
	if (rc != 0) {
		printf("%s: _st_ring_init(%p, %u) failed, error: %d(%s)\n",
			__func__, r, nr, rc, strerror(-rc));
		return rc;
	}

	for (i = 0; i != num; i++) {
		fill_ring_elm(elm + i, UINT32_MAX);
		p = elm + i;
		if (_st_ring_enqueue_bulk(r, &p, 1, NULL) != 1)
			break;
	}

	if (i != num) {
		printf("%s: _st_ring_enqueue(%p, %u) returned %u\n",
			__func__, r, num, i);
		return -ENOSPC;
	}

	return 0;
}

static int
test_mt1(int (*test)(void *))
{
	int32_t rc;
	uint32_t lc, mc;
	struct rte_ring *r;
	void *data;
	struct lcore_arg arg[RTE_MAX_LCORE];

	static const struct lcore_stat init_stat = {
		.op.min_cycle = UINT64_MAX,
	};

	rc = mt1_init(&r, &data, RING_SIZE);
	if (rc != 0) {
		mt1_fini(r, data);
		return rc;
	}

	memset(arg, 0, sizeof(arg));

	/* launch on all workers */
	RTE_LCORE_FOREACH_WORKER(lc) {
		arg[lc].rng = r;
		arg[lc].stats = init_stat;
		rte_eal_remote_launch(test, &arg[lc], lc);
	}

	/* signal workers to start the test */
	rte_atomic_store_explicit(&wrk_cmd, WRK_CMD_RUN, rte_memory_order_release);

	rte_delay_us(run_time * US_PER_S);

	/* signal workers to stop the test */
	rte_atomic_store_explicit(&wrk_cmd, WRK_CMD_STOP, rte_memory_order_release);

	/* wait for workers and collect stats. */
	mc = rte_lcore_id();
	arg[mc].stats = init_stat;

	rc = 0;
	RTE_LCORE_FOREACH_WORKER(lc) {
		rc |= rte_eal_wait_lcore(lc);
		lcore_stat_aggr(&arg[mc].stats, &arg[lc].stats);
		if (verbose != 0)
			lcore_stat_dump(stdout, lc, &arg[lc].stats);
	}

	lcore_stat_dump(stdout, UINT32_MAX, &arg[mc].stats);
	rte_ring_dump(stdout, r);
	mt1_fini(r, data);
	return rc;
}

static const struct test_case tests[] = {
	{
		.name = "MT-WRK_ENQ_DEQ-MST_NONE-PRCS",
		.func = test_mt1,
		.wfunc = test_worker_prcs,
	},
	{
		.name = "MT-WRK_ENQ_DEQ-MST_NONE-AVG",
		.func = test_mt1,
		.wfunc = test_worker_avg,
	},
};
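
/*
 * Usage sketch (illustrative, kept as a comment so it is not compiled):
 * a test source that includes this file has to supply the _st_ring_*
 * wrappers declared near the top. For example, an MP/MC flavour of the
 * stress test might map them onto the standard rte_ring API roughly
 * as follows:
 *
 *	static inline uint32_t
 *	_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
 *		uint32_t *avail)
 *	{
 *		return rte_ring_mc_dequeue_bulk(r, obj, n, avail);
 *	}
 *
 *	static inline uint32_t
 *	_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
 *		uint32_t *free)
 *	{
 *		return rte_ring_mp_enqueue_bulk(r, obj, n, free);
 *	}
 *
 *	static int
 *	_st_ring_init(struct rte_ring *r, const char *name, uint32_t num)
 *	{
 *		return rte_ring_init(r, name, num, 0);
 *	}
 */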