xref: /dpdk/app/test/test_soring_stress_impl.h (revision 70581c355d6965f7be2dbf1c4fc0d30778c53b98)
/* SPDX-License-Identifier: BSD-3-Clause
 */

#include <stdalign.h>

#include "test_soring_stress.h"

/**
 * Stress test for soring enqueue/dequeue/acquire/release operations.
 * Depending on its role, each worker performs at least one of the
 * following patterns:
 * - dequeue/read-write data from/to the dequeued objects/enqueue.
 * - acquire/read-write data from/to the acquired objects/release.
 * Serves as both a functional and a performance test of the soring
 * data-path API under high contention
 * (for both over-committed and non-over-committed scenarios).
 */

#define RING_NAME	"SORING_STRESS"
#define BULK_NUM	32
#define RING_SIZE	(2 * BULK_NUM * RTE_MAX_LCORE)

#define MAX_STAGES	16

enum {
	WRK_CMD_STOP,
	WRK_CMD_RUN,
};

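/*
 * Start/stop flag: set by the main lcore in test_mt() and polled by the
 * workers in test_worker() with relaxed memory ordering.
 */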
static alignas(RTE_CACHE_LINE_SIZE) RTE_ATOMIC(uint32_t) wrk_cmd = WRK_CMD_STOP;

/* test run-time in seconds */
static const uint32_t run_time = 60;
static const uint32_t verbose;

static rte_spinlock_t dump_lock;

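/*
 * Per-operation statistics collected by each worker: number of calls,
 * number of processed objects and consumed cycles. min_cycle/max_cycle
 * are updated only when per-call ("precise") timing is enabled.
 */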
struct lcore_op_stat {
	uint64_t nb_lcore;
	uint64_t nb_call;
	uint64_t nb_obj;
	uint64_t nb_cycle;
	uint64_t max_cycle;
	uint64_t min_cycle;
};

struct lcore_stat {
	uint64_t nb_cycle;
	struct lcore_op_stat deqenq;
	uint32_t role_mask;
	uint32_t nb_stage;
	struct lcore_op_stat stage[MAX_STAGES];
};

#define	ROLE_DEQENQ	RTE_BIT32(0)
#define	ROLE_STAGE(n)	RTE_BIT32(n + 1)

struct __rte_cache_aligned lcore_arg {
	struct rte_soring *rng;
	struct lcore_stat stats;
};

struct __rte_cache_aligned ring_elem {
	uint32_t cnt[RTE_CACHE_LINE_SIZE / sizeof(uint32_t)];
};

/*
 * redefinable functions
 */

static uint32_t
_st_ring_dequeue_burst(struct rte_soring *r, void **obj, uint32_t n,
	uint32_t *avail);

static uint32_t
_st_ring_enqueue_bulk(struct rte_soring *r, void * const *obj, uint32_t n,
	uint32_t *free);

static uint32_t
_st_ring_acquire_burst(struct rte_soring *r, uint32_t stage, void **obj,
	uint32_t num, uint32_t *token, uint32_t *avail);

static void
_st_ring_release(struct rte_soring *r, uint32_t stage, uint32_t token,
	void * const *obj, uint32_t num);
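
/*
 * The four wrappers above are only declared here: every test source that
 * includes this header is expected to define them, which lets the same
 * stress-test body run on top of different soring
 * enqueue/dequeue/acquire/release flavours.
 * For illustration only (an assumption about the including file, not
 * something defined here), such a wrapper could simply forward to the
 * corresponding rte_soring_* call, e.g.:
 *
 *	static uint32_t
 *	_st_ring_dequeue_burst(struct rte_soring *r, void **obj, uint32_t n,
 *		uint32_t *avail)
 *	{
 *		return rte_soring_dequeue_burst(r, obj, n, avail);
 *	}
 *
 * See rte_soring.h for the authoritative prototypes.
 */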

static void
lcore_op_stat_update(struct lcore_op_stat *ls, uint64_t call, uint64_t obj,
	uint64_t tm, int32_t prcs)
{
	ls->nb_call += call;
	ls->nb_obj += obj;
	ls->nb_cycle += tm;
	if (prcs) {
		ls->max_cycle = RTE_MAX(ls->max_cycle, tm);
		ls->min_cycle = RTE_MIN(ls->min_cycle, tm);
	}
}

static void
lcore_stat_update(struct lcore_stat *ls, uint64_t call, uint64_t obj,
	uint64_t tm, int32_t prcs)
{
	uint32_t i;

	ls->nb_cycle += tm;
	lcore_op_stat_update(&ls->deqenq, call, obj, tm, prcs);
	for (i = 0; i != ls->nb_stage; i++)
		lcore_op_stat_update(ls->stage + i, call, obj, tm, prcs);
}

static void
lcore_op_stat_aggr(struct lcore_op_stat *ms, const struct lcore_op_stat *ls)
{
	ms->nb_call += ls->nb_call;
	ms->nb_obj += ls->nb_obj;
	ms->nb_cycle += ls->nb_cycle;
	ms->max_cycle = RTE_MAX(ms->max_cycle, ls->max_cycle);
	ms->min_cycle = RTE_MIN(ms->min_cycle, ls->min_cycle);
}

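/*
 * Merge one worker's stats into the aggregate entry: per-operation
 * counters are summed, the overall nb_cycle takes the maximum across
 * lcores, and nb_lcore counts how many workers performed each role.
 */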
static void
lcore_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
{
	uint32_t i;

	ms->nb_cycle = RTE_MAX(ms->nb_cycle, ls->nb_cycle);
	lcore_op_stat_aggr(&ms->deqenq, &ls->deqenq);
	ms->deqenq.nb_lcore += ((ls->role_mask & ROLE_DEQENQ) != 0);
	for (i = 0; i != ms->nb_stage; i++) {
		lcore_op_stat_aggr(ms->stage + i, ls->stage + i);
		ms->stage[i].nb_lcore += ((ls->role_mask & ROLE_STAGE(i)) != 0);
	}
}

static void
lcore_op_stat_dump(FILE *f, const struct lcore_op_stat *ls, const char *cap,
	long double st)
{
	fprintf(f, "\t%s={\n", cap);

	fprintf(f, "\t\tnb_lcore=%" PRIu64 ",\n", ls->nb_lcore);
	fprintf(f, "\t\tnb_call=%" PRIu64 ",\n", ls->nb_call);
	fprintf(f, "\t\tnb_obj=%" PRIu64 ",\n", ls->nb_obj);
	fprintf(f, "\t\tnb_cycle=%" PRIu64 ",\n", ls->nb_cycle);
	fprintf(f, "\t\tobj/call(avg): %.2Lf\n",
		(long double)ls->nb_obj / ls->nb_call);
	fprintf(f, "\t\tcycles/obj(avg): %.2Lf\n",
		(long double)ls->nb_cycle / ls->nb_obj);
	fprintf(f, "\t\tcycles/call(avg): %.2Lf\n",
		(long double)ls->nb_cycle / ls->nb_call);

	/* if min/max cycles per call stats were collected */
	if (ls->min_cycle != UINT64_MAX) {
		fprintf(f, "\t\tmax cycles/call=%" PRIu64 "(%.2Lf usec),\n",
			ls->max_cycle,
			(long double)ls->max_cycle / st);
		fprintf(f, "\t\tmin cycles/call=%" PRIu64 "(%.2Lf usec),\n",
			ls->min_cycle,
			(long double)ls->min_cycle / st);
	}

	fprintf(f, "\t},\n");
}

static void
lcore_stat_dump(FILE *f, uint32_t lc, const struct lcore_stat *ls)
{
	uint32_t i;
	long double st;
	char cap[64];

	st = (long double)rte_get_timer_hz() / US_PER_S;

	if (lc == UINT32_MAX)
		fprintf(f, "%s(AGGREGATE)={\n", __func__);
	else
		fprintf(f, "%s(lcore=%u)={\n", __func__, lc);

	fprintf(f, "\tnb_cycle=%" PRIu64 "(%.2Lf usec),\n",
		ls->nb_cycle, (long double)ls->nb_cycle / st);

	lcore_op_stat_dump(f, &ls->deqenq, "DEQ+ENQ", st);
	for (i = 0; i != ls->nb_stage; i++) {
		snprintf(cap, sizeof(cap), "%s#%u", "STAGE", i);
		lcore_op_stat_dump(f, ls->stage + i, cap, st);
	}

	fprintf(f, "};\n");
}

static void
fill_ring_elm(struct ring_elem *elm, uint32_t fill)
{
	uint32_t i;

	for (i = 0; i != RTE_DIM(elm->cnt); i++)
		elm->cnt[i] = fill;
}

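/*
 * Verify that each given object still holds the expected pattern, then
 * overwrite it with the new one. On mismatch, dump the expected and
 * actual contents plus the soring state under dump_lock.
 */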
static int32_t
check_updt_elem(struct ring_elem *elm[], uint32_t num,
	const struct ring_elem *check, const struct ring_elem *fill,
	const char *fname, const char *opname, const struct rte_soring *sor)
{
	uint32_t i;

	for (i = 0; i != num; i++) {
		if (memcmp(check, elm[i], sizeof(*check)) != 0) {
			rte_spinlock_lock(&dump_lock);
			printf("%s:%s: %s(lc=%u, num=%u) failed at %u-th iter, "
				"offending object: %p\n",
				fname, opname, __func__, rte_lcore_id(), num, i,
				elm[i]);
			rte_memdump(stdout, "expected", check, sizeof(*check));
			rte_memdump(stdout, "result", elm[i], sizeof(*elm[i]));
			rte_soring_dump(stdout, sor);
			rte_spinlock_unlock(&dump_lock);
			return -EINVAL;
		}
		memcpy(elm[i], fill, sizeof(*elm[i]));
	}

	return 0;
}

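/*
 * Validate the return value of a ring operation: with FIXED behaviour
 * the operation must handle exactly 'exp' objects, with VARIABLE
 * behaviour it may handle fewer, but never more.
 */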
static int
check_ring_op(uint32_t exp, uint32_t res, uint32_t lc,
	enum rte_ring_queue_behavior bhv, const char *fname, const char *opname,
	const struct rte_soring *sor)
{
	if ((bhv == RTE_RING_QUEUE_FIXED && exp != res) ||
			(bhv == RTE_RING_QUEUE_VARIABLE && exp < res)) {
		rte_spinlock_lock(&dump_lock);
		printf("%s(lc=%u) failure: %s expected: %u, returned %u\n",
			fname, lc, opname, exp, res);
		rte_soring_dump(stdout, sor);
		rte_spinlock_unlock(&dump_lock);
		return -ENOSPC;
	}
	return 0;
}

/* num in interval [7/8, 11/8] of BULK_NUM */
static inline uint32_t
rand_elem_num(void)
{
	uint32_t num;

	num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2);
	return num;
}

/*
 * for each enabled stage do:
 *   acquire burst of objects
 *   read and check their contents
 *   update and check their contents
 *   release burst of objects
 * done
 */
static int32_t
test_worker_stages(struct lcore_arg *la, uint32_t lc, const char *fname,
	struct ring_elem *obj[2 * BULK_NUM],
	const struct ring_elem *def_elm, const struct ring_elem *loc_elm,
	const struct ring_elem stg_elm[MAX_STAGES], int32_t prcs)
{
	int32_t rc;
	uint32_t i, n, num, tkn;
	uint64_t tm0, tm1;
	const struct ring_elem *celm, *pelm;

	num = rand_elem_num();

	rc = 0;
	tkn = 0;
	for (i = 0, pelm = def_elm; i != la->stats.nb_stage; pelm = celm, i++) {

		celm = stg_elm + i;

		/* given stage is not enabled on that lcore */
		if ((la->stats.role_mask & ROLE_STAGE(i)) == 0)
			continue;

		/* reset all pointer values */
		memset(obj, 0, sizeof(*obj) * num);

		/* acquire num elems */
		tm0 = (prcs != 0) ? rte_rdtsc_precise() : 0;
		n = _st_ring_acquire_burst(la->rng, i, (void **)obj, num,
				&tkn, NULL);
		tm0 = (prcs != 0) ? rte_rdtsc_precise() - tm0 : 0;

		/* check return value and objects */
		rc = check_ring_op(num, n, lc, RTE_RING_QUEUE_VARIABLE, fname,
			RTE_STR(_st_ring_stage_acquire), la->rng);
		if (rc == 0)
			rc = check_updt_elem(obj, n, pelm, loc_elm, fname,
				RTE_STR(_st_ring_stage_acquire), la->rng);
		if (rc != 0)
			break;

		/* release num elems */
		rte_compiler_barrier();
		rc = check_updt_elem(obj, n, loc_elm, celm, fname,
			RTE_STR(_st_ring_stage_release), la->rng);
		if (rc != 0)
			break;

		if (n == 0)
			tm1 = 0;
		else {
			tm1 = (prcs != 0) ? rte_rdtsc_precise() : 0;
			_st_ring_release(la->rng, i, tkn,
					(void **)obj, n);
			tm1 = (prcs != 0) ? rte_rdtsc_precise() - tm1 : 0;
		}
		lcore_op_stat_update(la->stats.stage + i, 1, n, tm0 + tm1,
				prcs);
	}

	return rc;
}

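/*
 * dequeue burst of objects (expected to carry the last stage pattern),
 * check and overwrite their contents with the lcore-local pattern,
 * then restore the default pattern and enqueue them back.
 */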
static int32_t
test_worker_deqenq(struct lcore_arg *la, uint32_t lc, const char *fname,
	struct ring_elem *obj[2 * BULK_NUM],
	const struct ring_elem *def_elm, const struct ring_elem *loc_elm,
	const struct ring_elem *pelm, int32_t prcs)
{
	int32_t rc;
	uint32_t k, n, num;
	uint64_t tm0, tm1;

	num = rand_elem_num();

	/* reset all pointer values */
	memset(obj, 0, sizeof(*obj) * num);

	/* dequeue num elems */
	tm0 = (prcs != 0) ? rte_rdtsc_precise() : 0;
	n = _st_ring_dequeue_burst(la->rng, (void **)obj, num, NULL);

	tm0 = (prcs != 0) ? rte_rdtsc_precise() - tm0 : 0;

	/* check return value and objects */
	rc = check_ring_op(num, n, lc, RTE_RING_QUEUE_VARIABLE, fname,
			RTE_STR(_st_ring_dequeue_bulk), la->rng);
	if (rc == 0)
		rc = check_updt_elem(obj, n, pelm, loc_elm, fname,
			RTE_STR(_st_ring_dequeue_bulk), la->rng);
	if (rc != 0)
		return rc;

	/* enqueue n elems */
	rte_compiler_barrier();
	rc = check_updt_elem(obj, n, loc_elm, def_elm, fname,
		RTE_STR(_st_ring_enqueue_bulk), la->rng);
	if (rc != 0)
		return rc;

	tm1 = (prcs != 0) ? rte_rdtsc_precise() : 0;
	k = _st_ring_enqueue_bulk(la->rng, (void **)obj, n, NULL);
	tm1 = (prcs != 0) ? rte_rdtsc_precise() - tm1 : 0;

	/* check return value */
	rc = check_ring_op(n, k, lc, RTE_RING_QUEUE_FIXED, fname,
			RTE_STR(_st_ring_enqueue_bulk), la->rng);
	if (rc != 0)
		return rc;

	lcore_op_stat_update(&la->stats.deqenq, 1, n, tm0 + tm1, prcs);
	return 0;
}

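/*
 * Worker body shared by the "precise" and "average" variants:
 * prepare the reference patterns, wait for the main lcore to set wrk_cmd
 * to WRK_CMD_RUN, then keep doing the stage and/or dequeue+enqueue work
 * selected by role_mask until WRK_CMD_STOP is observed, accounting the
 * consumed cycles along the way.
 */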
static int
test_worker(void *arg, const char *fname, int32_t prcs)
{
	int32_t rc;
	uint32_t i, lc;
	uint64_t cl;
	struct lcore_arg *la;
	struct ring_elem *obj[2 * BULK_NUM];
	struct ring_elem *pelm, def_elm, loc_elm, stg_elm[MAX_STAGES];

	la = arg;
	lc = rte_lcore_id();

	fill_ring_elm(&def_elm, UINT32_MAX);
	fill_ring_elm(&loc_elm, lc);

	for (i = 0; i != RTE_DIM(stg_elm); i++)
		fill_ring_elm(stg_elm + i, (i + 1) << 24);

	pelm = stg_elm + la->stats.nb_stage - 1;

	/* Acquire ordering is not required as the main lcore is not
	 * really releasing any data through 'wrk_cmd' to
	 * the workers.
	 */
	while (rte_atomic_load_explicit(&wrk_cmd, rte_memory_order_relaxed) !=
			WRK_CMD_RUN)
		rte_pause();

	cl = rte_rdtsc_precise();

	do {
		if ((la->stats.role_mask & ~ROLE_DEQENQ) != 0) {
			rc = test_worker_stages(la, lc, fname, obj,
				&def_elm, &loc_elm, stg_elm, prcs);
			if (rc != 0)
				break;
		}

		if ((la->stats.role_mask & ROLE_DEQENQ) != 0) {
			rc = test_worker_deqenq(la, lc, fname, obj,
				&def_elm, &loc_elm, pelm, prcs);
			if (rc != 0)
				break;
		}

	} while (rte_atomic_load_explicit(&wrk_cmd,
				rte_memory_order_relaxed) == WRK_CMD_RUN);

	cl = rte_rdtsc_precise() - cl;
	if (prcs == 0)
		lcore_stat_update(&la->stats, 0, 0, cl, 0);
	la->stats.nb_cycle = cl;
	return rc;
}

static int
test_worker_prcs(void *arg)
{
	return test_worker(arg, __func__, 1);
}

static int
test_worker_avg(void *arg)
{
	return test_worker(arg, __func__, 0);
}

static void
mt1_fini(struct rte_soring *rng, void *data)
{
	rte_free(rng);
	rte_free(data);
}

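/*
 * Create the test soring and its backing data: allocate 'num' ring
 * elements, create and initialize a soring with the requested sync types
 * and number of stages, then pre-fill it with pointers to all elements
 * (each element initialized with the default pattern).
 */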
static int
mt1_init(struct rte_soring **rng, void **data, uint32_t num,
	enum rte_ring_sync_type prod_synt, enum rte_ring_sync_type cons_synt,
	uint32_t nb_stages)
{
	int32_t rc;
	size_t sz;
	uint32_t i;
	struct rte_soring *r;
	struct ring_elem *elm;
	void *p;
	struct rte_soring_param prm;

	*rng = NULL;
	*data = NULL;

	sz = num * sizeof(*elm);
	elm = rte_zmalloc(NULL, sz, alignof(typeof(*elm)));
	if (elm == NULL) {
		printf("%s: alloc(%zu) for %u elems of data failed\n",
			__func__, sz, num);
		return -ENOMEM;
	}

	*data = elm;

	/* alloc soring */
	memset(&prm, 0, sizeof(prm));

	prm.name = __func__;
	prm.elems = num;
	prm.elem_size = sizeof(uintptr_t);
	prm.stages = nb_stages;
	prm.prod_synt = prod_synt;
	prm.cons_synt = cons_synt;

	sz = rte_soring_get_memsize(&prm);
	r = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
	if (r == NULL) {
		printf("%s: alloc(%zu) for FIFO with %u elems failed\n",
			__func__, sz, prm.elems);
		return -ENOMEM;
	}

	*rng = r;

	rc = rte_soring_init(r, &prm);
	if (rc != 0) {
		printf("%s: rte_soring_init(r=%p,elems=%u,stages=%u) failed, "
			"error: %d(%s)\n",
			__func__, r, prm.elems, prm.stages, rc, strerror(-rc));
		return rc;
	}

	for (i = 0; i != num; i++) {
		fill_ring_elm(elm + i, UINT32_MAX);
		p = elm + i;
		if (_st_ring_enqueue_bulk(r, &p, 1, NULL) != 1)
			break;
	}

	if (i != num) {
		printf("%s: _st_ring_enqueue(%p, %u) returned %u\n",
			__func__, r, num, i);
		return -ENOSPC;
	}

	return 0;
}

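/*
 * Run one test case: set up the soring, launch the given worker function
 * on every worker lcore with its per-lcore role mask, let it run for
 * run_time seconds, signal the workers to stop, then collect, aggregate
 * and dump the statistics.
 */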
static int
test_mt(int (*test)(void *), enum rte_ring_sync_type prod_synt,
	enum rte_ring_sync_type cons_synt, uint32_t nb_stage,
	const uint32_t role_mask[RTE_MAX_LCORE])
{
	int32_t rc;
	uint32_t i, lc, mc;
	struct rte_soring *r;
	void *data;
	struct lcore_arg arg[RTE_MAX_LCORE];

	static const struct lcore_op_stat init_stat = {
		.min_cycle = UINT64_MAX,
	};

	rc = mt1_init(&r, &data, RING_SIZE, prod_synt, cons_synt, nb_stage);

	if (rc != 0) {
		mt1_fini(r, data);
		return rc;
	}

	memset(arg, 0, sizeof(arg));

	/* launch on all workers */
	RTE_LCORE_FOREACH_WORKER(lc) {
		arg[lc].rng = r;
		arg[lc].stats.deqenq = init_stat;
		arg[lc].stats.nb_stage = nb_stage;
		arg[lc].stats.role_mask = role_mask[lc];
		for (i = 0; i != arg[lc].stats.nb_stage; i++)
			arg[lc].stats.stage[i] = init_stat;
		rte_eal_remote_launch(test, &arg[lc], lc);
	}

	/* signal workers to start test */
	rte_atomic_store_explicit(&wrk_cmd, WRK_CMD_RUN,
			rte_memory_order_release);

	rte_delay_us(run_time * US_PER_S);

	/* signal workers to stop test */
	rte_atomic_store_explicit(&wrk_cmd, WRK_CMD_STOP,
			rte_memory_order_release);

	/* wait for workers and collect stats. */
	mc = rte_lcore_id();
	arg[mc].stats.deqenq = init_stat;
	arg[mc].stats.nb_stage = nb_stage;
	for (i = 0; i != arg[mc].stats.nb_stage; i++)
		arg[mc].stats.stage[i] = init_stat;

	rc = 0;
	RTE_LCORE_FOREACH_WORKER(lc) {
		rc |= rte_eal_wait_lcore(lc);
		lcore_stat_aggr(&arg[mc].stats, &arg[lc].stats);
		if (verbose != 0)
			lcore_stat_dump(stdout, lc, &arg[lc].stats);
	}

	lcore_stat_dump(stdout, UINT32_MAX, &arg[mc].stats);
	rte_soring_dump(stdout, r);
	mt1_fini(r, data);
	return rc;
}

/*
 * launch all stages and deq+enq on all worker lcores
 */
static void
role_mask_sym(uint32_t nb_stage, uint32_t role_mask[RTE_MAX_LCORE])
{
	uint32_t lc;
	const uint32_t msk = RTE_BIT32(nb_stage + 2) - 1;

	memset(role_mask, 0, sizeof(role_mask[0]) * RTE_MAX_LCORE);
	RTE_LCORE_FOREACH_WORKER(lc)
		role_mask[lc] = msk;
}

/*
 * Divide all workers in two (nearly) equal groups:
 * - workers from the 'even' group do dequeue+enqueue
 * - workers from the 'odd' group do acquire/release (for all stages)
 */
static void
role_mask_even_odd(uint32_t nb_stage, uint32_t role_mask[RTE_MAX_LCORE])
{
	uint32_t i, lc;
	const uint32_t msk[2] = {
		[0] = ROLE_DEQENQ,
		[1] = RTE_GENMASK32(nb_stage + 1, 1),
	};

	memset(role_mask, 0, sizeof(role_mask[0]) * RTE_MAX_LCORE);

	i = 0;
	RTE_LCORE_FOREACH_WORKER(lc) {
		role_mask[lc] = msk[i & 1];
		i++;
	}
	if (i == 1) {
		lc = rte_get_next_lcore(-1, 1, 0);
		role_mask[lc] |= msk[i & 1];
	}
}

/*
 * Divide all workers (nearly) evenly among all possible stages
 */
static void
role_mask_div(uint32_t nb_stage, uint32_t role_mask[RTE_MAX_LCORE])
{
	uint32_t i, lc;
	uint32_t msk[nb_stage + 1];

	memset(role_mask, 0, sizeof(role_mask[0]) * RTE_MAX_LCORE);

	for (i = 0; i != RTE_DIM(msk); i++)
		msk[i] = RTE_BIT32(i);

	i = 0;
	RTE_LCORE_FOREACH_WORKER(lc) {
		role_mask[lc] = msk[i % RTE_DIM(msk)];
		i++;
	}
	if (i < RTE_DIM(msk)) {
		lc = rte_get_next_lcore(-1, 1, 0);
		for (; i != RTE_DIM(msk); i++)
			role_mask[lc] |= msk[i % RTE_DIM(msk)];
	}
}

/*
 * one worker does ST enqueue+dequeue, while all the others do stage
 * processing.
 */
static void
role_mask_denq_st(uint32_t nb_stage, uint32_t role_mask[RTE_MAX_LCORE])
{
	uint32_t i, lc;
	const uint32_t msk[2] = {
		[0] = ROLE_DEQENQ,
		[1] = RTE_GENMASK32(nb_stage + 1, 1),
	};

	memset(role_mask, 0, sizeof(role_mask[0]) * RTE_MAX_LCORE);

	i = 0;
	RTE_LCORE_FOREACH_WORKER(lc) {
		if (i == 0)
			role_mask[lc] = msk[0];
		else
			role_mask[lc] = msk[1];
		i++;
	}
	if (i == 1) {
		lc = rte_get_next_lcore(-1, 1, 0);
		role_mask[lc] |= msk[1];
	}
}

static int
test_sym_mt1(int (*test)(void *))
{
	uint32_t role_mask[RTE_MAX_LCORE];
	const uint32_t nb_stage = 1;

	role_mask_sym(nb_stage, role_mask);
	return test_mt(test, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT,
			nb_stage, role_mask);
}

static int
test_sym_mt4(int (*test)(void *))
{
	uint32_t role_mask[RTE_MAX_LCORE];

	const uint32_t nb_stage = 4;

	role_mask_sym(nb_stage, role_mask);
	return test_mt(test, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT,
			nb_stage, role_mask);
}

static int
test_sym_mt_rts4(int (*test)(void *))
{
	uint32_t role_mask[RTE_MAX_LCORE];

	const uint32_t nb_stage = 4;

	role_mask_sym(nb_stage, role_mask);
	return test_mt(test, RTE_RING_SYNC_MT_RTS, RTE_RING_SYNC_MT_RTS,
			nb_stage, role_mask);
}

static int
test_sym_mt_hts4(int (*test)(void *))
{
	uint32_t role_mask[RTE_MAX_LCORE];

	const uint32_t nb_stage = 4;

	role_mask_sym(nb_stage, role_mask);
	return test_mt(test, RTE_RING_SYNC_MT_HTS, RTE_RING_SYNC_MT_HTS,
			nb_stage, role_mask);
}

static int
test_stdenq_stage4(int (*test)(void *))
{
	uint32_t role_mask[RTE_MAX_LCORE];

	const uint32_t nb_stage = 4;

	role_mask_denq_st(nb_stage, role_mask);
	return test_mt(test, RTE_RING_SYNC_ST, RTE_RING_SYNC_ST,
			nb_stage, role_mask);
}

static int
test_even_odd_mt5(int (*test)(void *))
{
	uint32_t role_mask[RTE_MAX_LCORE];

	const uint32_t nb_stage = 5;

	role_mask_even_odd(nb_stage, role_mask);
	return test_mt(test, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT,
			nb_stage, role_mask);
}

static int
test_div_mt3(int (*test)(void *))
{
	uint32_t role_mask[RTE_MAX_LCORE];

	const uint32_t nb_stage = 3;

	role_mask_div(nb_stage, role_mask);
	return test_mt(test, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT,
			nb_stage, role_mask);
}

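/*
 * Test matrix: every scenario comes in two variants.
 * "-PRCS" runs test_worker_prcs() (prcs=1): each ring operation is timed
 * with rte_rdtsc_precise() and min/max cycles per call are recorded.
 * "-AVG" runs test_worker_avg() (prcs=0): only the whole-run cycle count
 * is measured (per-call timestamping is skipped).
 */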
static const struct test_case tests[] = {
	{
		.name = "MT_DEQENQ-MT_STG1-PRCS",
		.func = test_sym_mt1,
		.wfunc = test_worker_prcs,
	},
	{
		.name = "MT_DEQENQ-MT_STG1-AVG",
		.func = test_sym_mt1,
		.wfunc = test_worker_avg,
	},
	{
		.name = "MT_DEQENQ-MT_STG4-PRCS",
		.func = test_sym_mt4,
		.wfunc = test_worker_prcs,
	},
	{
		.name = "MT_DEQENQ-MT_STG4-AVG",
		.func = test_sym_mt4,
		.wfunc = test_worker_avg,
	},
	{
		.name = "MTRTS_DEQENQ-MT_STG4-PRCS",
		.func = test_sym_mt_rts4,
		.wfunc = test_worker_prcs,
	},
	{
		.name = "MTRTS_DEQENQ-MT_STG4-AVG",
		.func = test_sym_mt_rts4,
		.wfunc = test_worker_avg,
	},
	{
		.name = "MTHTS_DEQENQ-MT_STG4-PRCS",
		.func = test_sym_mt_hts4,
		.wfunc = test_worker_prcs,
	},
	{
		.name = "MTHTS_DEQENQ-MT_STG4-AVG",
		.func = test_sym_mt_hts4,
		.wfunc = test_worker_avg,
	},
	{
		.name = "MT_DEQENQ-MT_STG5-1:1-PRCS",
		.func = test_even_odd_mt5,
		.wfunc = test_worker_prcs,
	},
	{
		.name = "MT_DEQENQ-MT_STG5-1:1-AVG",
		.func = test_even_odd_mt5,
		.wfunc = test_worker_avg,
	},
	{
		.name = "MT_DEQENQ-MT_STG3-1:3-PRCS",
		.func = test_div_mt3,
		.wfunc = test_worker_prcs,
	},
	{
		.name = "MT_DEQENQ-MT_STG3-1:3-AVG",
		.func = test_div_mt3,
		.wfunc = test_worker_avg,
	},
	{
		.name = "ST_DEQENQ-MT_STG4-PRCS",
		.func = test_stdenq_stage4,
		.wfunc = test_worker_prcs,
	},
	{
		.name = "ST_DEQENQ-MT_STG4-AVG",
		.func = test_stdenq_stage4,
		.wfunc = test_worker_avg,
	},
};