/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2020 Intel Corporation
 */

#include <stdalign.h>

#include "test_ring_stress.h"

/**
 * Stress test for ring enqueue/dequeue operations.
 * Each worker repeatedly performs the following pattern:
 * dequeue a burst of objects, read and update the dequeued objects,
 * then enqueue the burst back into the ring.
 * Serves as both a functional and a performance test of ring
 * enqueue/dequeue operations under high contention
 * (for both over-committed and non-over-committed scenarios).
 */

#define RING_NAME	"RING_STRESS"
#define BULK_NUM	32
#define RING_SIZE	(2 * BULK_NUM * RTE_MAX_LCORE)

enum {
	WRK_CMD_STOP,
	WRK_CMD_RUN,
};

static alignas(RTE_CACHE_LINE_SIZE) RTE_ATOMIC(uint32_t) wrk_cmd = WRK_CMD_STOP;

/* test run-time in seconds */
static const uint32_t run_time = 60;
static const uint32_t verbose;

struct lcore_stat {
	uint64_t nb_cycle;
	struct {
		uint64_t nb_call;
		uint64_t nb_obj;
		uint64_t nb_cycle;
		uint64_t max_cycle;
		uint64_t min_cycle;
	} op;
};

struct __rte_cache_aligned lcore_arg {
	struct rte_ring *rng;
	struct lcore_stat stats;
};

struct __rte_cache_aligned ring_elem {
	uint32_t cnt[RTE_CACHE_LINE_SIZE / sizeof(uint32_t)];
};

/*
 * ring flavour specific functions, defined by each test
 * that includes this file
 */
static uint32_t
_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
	uint32_t *avail);

static uint32_t
_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
	uint32_t *free);

static int
_st_ring_init(struct rte_ring *r, const char *name, uint32_t num);
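
/*
 * For illustration only: a test that includes this file could, for example,
 * map these hooks onto the standard MP/MC ring API roughly as follows
 * (sketch, not part of this file):
 *
 *	static uint32_t
 *	_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
 *		uint32_t *avail)
 *	{
 *		return rte_ring_mc_dequeue_bulk(r, obj, n, avail);
 *	}
 *
 *	static uint32_t
 *	_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
 *		uint32_t *free)
 *	{
 *		return rte_ring_mp_enqueue_bulk(r, obj, n, free);
 *	}
 *
 *	static int
 *	_st_ring_init(struct rte_ring *r, const char *name, uint32_t num)
 *	{
 *		return rte_ring_init(r, name, num, 0);
 *	}
 */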
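/* update one worker's operation counters; track min/max cycles only in precise mode */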
static void
lcore_stat_update(struct lcore_stat *ls, uint64_t call, uint64_t obj,
	uint64_t tm, int32_t prcs)
{
	ls->op.nb_call += call;
	ls->op.nb_obj += obj;
	ls->op.nb_cycle += tm;
	if (prcs) {
		ls->op.max_cycle = RTE_MAX(ls->op.max_cycle, tm);
		ls->op.min_cycle = RTE_MIN(ls->op.min_cycle, tm);
	}
}

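/* merge per-operation counters from one worker into the aggregate */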
static void
lcore_op_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
{
	ms->op.nb_call += ls->op.nb_call;
	ms->op.nb_obj += ls->op.nb_obj;
	ms->op.nb_cycle += ls->op.nb_cycle;
	ms->op.max_cycle = RTE_MAX(ms->op.max_cycle, ls->op.max_cycle);
	ms->op.min_cycle = RTE_MIN(ms->op.min_cycle, ls->op.min_cycle);
}

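/* aggregate one worker's stats; total run time is the maximum across workers */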
static void
lcore_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
{
	ms->nb_cycle = RTE_MAX(ms->nb_cycle, ls->nb_cycle);
	lcore_op_stat_aggr(ms, ls);
}

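/* print collected stats for one lcore, or the aggregate when lc == UINT32_MAX */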
static void
lcore_stat_dump(FILE *f, uint32_t lc, const struct lcore_stat *ls)
{
	long double st;

	st = (long double)rte_get_timer_hz() / US_PER_S;

	if (lc == UINT32_MAX)
		fprintf(f, "%s(AGGREGATE)={\n", __func__);
	else
		fprintf(f, "%s(lcore=%u)={\n", __func__, lc);

	fprintf(f, "\tnb_cycle=%" PRIu64 "(%.2Lf usec),\n",
		ls->nb_cycle, (long double)ls->nb_cycle / st);

	fprintf(f, "\tDEQ+ENQ={\n");

	fprintf(f, "\t\tnb_call=%" PRIu64 ",\n", ls->op.nb_call);
	fprintf(f, "\t\tnb_obj=%" PRIu64 ",\n", ls->op.nb_obj);
	fprintf(f, "\t\tnb_cycle=%" PRIu64 ",\n", ls->op.nb_cycle);
	fprintf(f, "\t\tobj/call(avg): %.2Lf\n",
		(long double)ls->op.nb_obj / ls->op.nb_call);
	fprintf(f, "\t\tcycles/obj(avg): %.2Lf\n",
		(long double)ls->op.nb_cycle / ls->op.nb_obj);
	fprintf(f, "\t\tcycles/call(avg): %.2Lf\n",
		(long double)ls->op.nb_cycle / ls->op.nb_call);

	/* if min/max cycles per call stats were collected */
	if (ls->op.min_cycle != UINT64_MAX) {
		fprintf(f, "\t\tmax cycles/call=%" PRIu64 "(%.2Lf usec),\n",
			ls->op.max_cycle,
			(long double)ls->op.max_cycle / st);
		fprintf(f, "\t\tmin cycles/call=%" PRIu64 "(%.2Lf usec),\n",
			ls->op.min_cycle,
			(long double)ls->op.min_cycle / st);
	}

	fprintf(f, "\t},\n");
	fprintf(f, "};\n");
}

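/* fill all counters of a ring element with the given value */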
static void
fill_ring_elm(struct ring_elem *elm, uint32_t fill)
{
	uint32_t i;

	for (i = 0; i != RTE_DIM(elm->cnt); i++)
		elm->cnt[i] = fill;
}

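/* verify that each dequeued object matches 'check', then overwrite it with 'fill' */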
static int32_t
check_updt_elem(struct ring_elem *elm[], uint32_t num,
	const struct ring_elem *check, const struct ring_elem *fill)
{
	uint32_t i;

	static rte_spinlock_t dump_lock;

	for (i = 0; i != num; i++) {
		if (memcmp(check, elm[i], sizeof(*check)) != 0) {
			rte_spinlock_lock(&dump_lock);
			printf("%s(lc=%u, num=%u) failed at %u-th iter, "
				"offending object: %p\n",
				__func__, rte_lcore_id(), num, i, elm[i]);
			rte_memdump(stdout, "expected", check, sizeof(*check));
			rte_memdump(stdout, "result", elm[i], sizeof(*elm[i]));
			rte_spinlock_unlock(&dump_lock);
			return -EINVAL;
		}
		memcpy(elm[i], fill, sizeof(*elm[i]));
	}

	return 0;
}

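/* report an error if the ring operation handled fewer objects than requested */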
static int
check_ring_op(uint32_t exp, uint32_t res, uint32_t lc,
	const char *fname, const char *opname)
{
	if (exp != res) {
		printf("%s(lc=%u) failure: %s expected: %u, returned %u\n",
			fname, lc, opname, exp, res);
		return -ENOSPC;
	}
	return 0;
}

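/*
 * worker main loop: dequeue a burst, verify and update the objects,
 * enqueue the burst back, and update stats until the main lcore signals stop
 */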
static int
test_worker(void *arg, const char *fname, int32_t prcs)
{
	int32_t rc;
	uint32_t lc, n, num;
	uint64_t cl, tm0, tm1;
	struct lcore_arg *la;
	struct ring_elem def_elm, loc_elm;
	struct ring_elem *obj[2 * BULK_NUM];

	la = arg;
	lc = rte_lcore_id();

	fill_ring_elm(&def_elm, UINT32_MAX);
	fill_ring_elm(&loc_elm, lc);

	/* Acquire ordering is not required as the main lcore is not
	 * really releasing any data through 'wrk_cmd' to
	 * the worker.
	 */
	while (rte_atomic_load_explicit(&wrk_cmd, rte_memory_order_relaxed) != WRK_CMD_RUN)
		rte_pause();

	cl = rte_rdtsc_precise();

	do {
		/* num in interval [7/8 * BULK_NUM, 11/8 * BULK_NUM) */
		num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2);

		/* reset all pointer values */
		memset(obj, 0, sizeof(obj));

		/* dequeue num elems */
		tm0 = (prcs != 0) ? rte_rdtsc_precise() : 0;
		n = _st_ring_dequeue_bulk(la->rng, (void **)obj, num, NULL);
		tm0 = (prcs != 0) ? rte_rdtsc_precise() - tm0 : 0;

		/* check return value and objects */
		rc = check_ring_op(num, n, lc, fname,
			RTE_STR(_st_ring_dequeue_bulk));
		if (rc == 0)
			rc = check_updt_elem(obj, num, &def_elm, &loc_elm);
		if (rc != 0)
			break;

		/* enqueue num elems */
		rte_compiler_barrier();
		rc = check_updt_elem(obj, num, &loc_elm, &def_elm);
		if (rc != 0)
			break;

		tm1 = (prcs != 0) ? rte_rdtsc_precise() : 0;
		n = _st_ring_enqueue_bulk(la->rng, (void **)obj, num, NULL);
		tm1 = (prcs != 0) ? rte_rdtsc_precise() - tm1 : 0;

		/* check return value */
		rc = check_ring_op(num, n, lc, fname,
			RTE_STR(_st_ring_enqueue_bulk));
		if (rc != 0)
			break;

		lcore_stat_update(&la->stats, 1, num, tm0 + tm1, prcs);

	} while (rte_atomic_load_explicit(&wrk_cmd, rte_memory_order_relaxed) == WRK_CMD_RUN);

	cl = rte_rdtsc_precise() - cl;
	if (prcs == 0)
		lcore_stat_update(&la->stats, 0, 0, cl, 0);
	la->stats.nb_cycle = cl;
	return rc;
}
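
/* worker entry points: with (PRCS) and without (AVG) per-call cycle measurement */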
static int
test_worker_prcs(void *arg)
{
	return test_worker(arg, __func__, 1);
}

static int
test_worker_avg(void *arg)
{
	return test_worker(arg, __func__, 0);
}

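/* free the ring and the backing element array */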
static void
mt1_fini(struct rte_ring *rng, void *data)
{
	rte_free(rng);
	rte_free(data);
}

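/*
 * allocate the element array and the ring, initialise the ring and
 * enqueue all elements into it
 */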
static int
mt1_init(struct rte_ring **rng, void **data, uint32_t num)
{
	int32_t rc;
	size_t sz;
	uint32_t i, nr;
	struct rte_ring *r;
	struct ring_elem *elm;
	void *p;

	*rng = NULL;
	*data = NULL;

	sz = num * sizeof(*elm);
	elm = rte_zmalloc(NULL, sz, alignof(typeof(*elm)));
	if (elm == NULL) {
		printf("%s: alloc(%zu) for %u elems of data failed\n",
			__func__, sz, num);
		return -ENOMEM;
	}

	*data = elm;

	/* alloc ring */
	nr = rte_align32pow2(2 * num);
	sz = rte_ring_get_memsize(nr);
	r = rte_zmalloc(NULL, sz, alignof(typeof(*r)));
	if (r == NULL) {
		printf("%s: alloc(%zu) for FIFO with %u elems failed\n",
			__func__, sz, nr);
		return -ENOMEM;
	}

	*rng = r;

	rc = _st_ring_init(r, RING_NAME, nr);
	if (rc != 0) {
		printf("%s: _st_ring_init(%p, %u) failed, error: %d(%s)\n",
			__func__, r, nr, rc, strerror(-rc));
		return rc;
	}

	for (i = 0; i != num; i++) {
		fill_ring_elm(elm + i, UINT32_MAX);
		p = elm + i;
		if (_st_ring_enqueue_bulk(r, &p, 1, NULL) != 1)
			break;
	}

	if (i != num) {
		printf("%s: _st_ring_enqueue(%p, %u) returned %u\n",
			__func__, r, num, i);
		return -ENOSPC;
	}

	return 0;
}

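/*
 * test driver: launch the given worker on all worker lcores, let it run
 * for 'run_time' seconds, then stop the workers and report aggregated stats
 */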
static int
test_mt1(int (*test)(void *))
{
	int32_t rc;
	uint32_t lc, mc;
	struct rte_ring *r;
	void *data;
	struct lcore_arg arg[RTE_MAX_LCORE];

	static const struct lcore_stat init_stat = {
		.op.min_cycle = UINT64_MAX,
	};

	rc = mt1_init(&r, &data, RING_SIZE);
	if (rc != 0) {
		mt1_fini(r, data);
		return rc;
	}

	memset(arg, 0, sizeof(arg));

	/* launch on all workers */
	RTE_LCORE_FOREACH_WORKER(lc) {
		arg[lc].rng = r;
		arg[lc].stats = init_stat;
		rte_eal_remote_launch(test, &arg[lc], lc);
	}

	/* signal workers to start the test */
	rte_atomic_store_explicit(&wrk_cmd, WRK_CMD_RUN, rte_memory_order_release);

	rte_delay_us(run_time * US_PER_S);

	/* signal workers to stop the test */
	rte_atomic_store_explicit(&wrk_cmd, WRK_CMD_STOP, rte_memory_order_release);

	/* wait for workers and collect stats. */
	mc = rte_lcore_id();
	arg[mc].stats = init_stat;

	rc = 0;
	RTE_LCORE_FOREACH_WORKER(lc) {
		rc |= rte_eal_wait_lcore(lc);
		lcore_stat_aggr(&arg[mc].stats, &arg[lc].stats);
		if (verbose != 0)
			lcore_stat_dump(stdout, lc, &arg[lc].stats);
	}

	lcore_stat_dump(stdout, UINT32_MAX, &arg[mc].stats);
	rte_ring_dump(stdout, r);
	mt1_fini(r, data);
	return rc;
}

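/* test cases provided by this ring flavour; run through the common test_ring_stress harness */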
static const struct test_case tests[] = {
	{
		.name = "MT-WRK_ENQ_DEQ-MST_NONE-PRCS",
		.func = test_mt1,
		.wfunc = test_worker_prcs,
	},
	{
		.name = "MT-WRK_ENQ_DEQ-MST_NONE-AVG",
		.func = test_mt1,
		.wfunc = test_worker_avg,
	},
};