/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 * Copyright(c) 2019 Arm Limited
 */


#include <stdio.h>
#include <inttypes.h>
#include <rte_ring.h>
#include <rte_cycles.h>
#include <rte_launch.h>
#include <rte_pause.h>
#include <string.h>

#include "test.h"
#include "test_ring.h"

/*
 * Ring performance test cases: measure the performance of various ring
 * operations using rdtsc, for both the legacy APIs and 16B ring elements.
 */

#define RING_NAME "RING_PERF"
#define RING_SIZE 4096
#define MAX_BURST 32

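/*
 * Every rdtsc-timed test below follows the same idiom: snapshot the TSC,
 * run a large power-of-two number of iterations, snapshot again and divide
 * to get cycles per iteration. A minimal standalone sketch of that idiom
 * (the demo_* name is illustrative; this function is not called by the
 * test):
 */
static __rte_unused double
demo_cycles_per_iter(void)
{
	const unsigned int iter_shift = 16;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int i;
	volatile uint64_t sink = 0;

	const uint64_t start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		sink += i;	/* stand-in for the operation under test */
	const uint64_t end = rte_rdtsc();

	return ((double)(end - start)) / iterations;
}
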
/*
 * the sizes to enqueue and dequeue in testing
 * (marked volatile so they won't be seen as compile-time constants)
 */
static const volatile unsigned bulk_sizes[] = { 8, 32 };

struct lcore_pair {
	unsigned c1, c2;
};

static volatile unsigned lcore_count = 0;

static void
test_ring_print_test_string(unsigned int api_type, int esize,
	unsigned int bsz, double value)
{
	if (esize == -1)
		printf("legacy APIs");
	else
		printf("elem APIs: element size %dB", esize);

	if (api_type == TEST_RING_IGNORE_API_TYPE)
		return;

	if ((api_type & TEST_RING_THREAD_DEF) == TEST_RING_THREAD_DEF)
		printf(": default enqueue/dequeue: ");
	else if ((api_type & TEST_RING_THREAD_SPSC) == TEST_RING_THREAD_SPSC)
		printf(": SP/SC: ");
	else if ((api_type & TEST_RING_THREAD_MPMC) == TEST_RING_THREAD_MPMC)
		printf(": MP/MC: ");

	if ((api_type & TEST_RING_ELEM_SINGLE) == TEST_RING_ELEM_SINGLE)
		printf("single: ");
	else if ((api_type & TEST_RING_ELEM_BULK) == TEST_RING_ELEM_BULK)
		printf("bulk (size: %u): ", bsz);
	else if ((api_type & TEST_RING_ELEM_BURST) == TEST_RING_ELEM_BURST)
		printf("burst (size: %u): ", bsz);

	printf("%.2F\n", value);
}

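/*
 * For reference, a result line printed by the helper above looks like
 * (numbers illustrative):
 *
 *   legacy APIs: SP/SC: bulk (size: 8): 12.34
 *   elem APIs: element size 16B: MP/MC: burst (size: 32): 56.78
 */
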
/**** Functions to analyse our core mask to get cores for different tests ***/

static int
get_two_hyperthreads(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned c1, c2, s1, s2;
	RTE_LCORE_FOREACH(id1) {
		/* inner loop just re-reads all ids. We could skip the first
		 * few elements, but since the number of cores is small there
		 * is little point.
		 */
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;

			c1 = rte_lcore_to_cpu_id(id1);
			c2 = rte_lcore_to_cpu_id(id2);
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if ((c1 == c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_cores(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned c1, c2, s1, s2;
	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;

			c1 = rte_lcore_to_cpu_id(id1);
			c2 = rte_lcore_to_cpu_id(id2);
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if ((c1 != c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_sockets(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned s1, s2;
	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if (s1 != s2) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}
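
/*
 * Usage sketch for the three helpers above (illustrative only, never
 * called by the test): callers try the tightest topology first and fall
 * back, exactly as test_ring_perf_esize() does below.
 */
static __rte_unused void
demo_pick_core_pair(void)
{
	struct lcore_pair lcp;

	if (get_two_hyperthreads(&lcp) == 0)
		printf("hyperthread pair: lcores %u and %u\n", lcp.c1, lcp.c2);
	else if (get_two_cores(&lcp) == 0)
		printf("physical core pair: lcores %u and %u\n",
				lcp.c1, lcp.c2);
	else if (get_two_sockets(&lcp) == 0)
		printf("cross-socket pair: lcores %u and %u\n",
				lcp.c1, lcp.c2);
}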

/* Get cycle counts for dequeuing from an empty ring. Should be 2 or 3 cycles */
static void
test_empty_dequeue(struct rte_ring *r, const int esize,
			const unsigned int api_type)
{
	const unsigned int iter_shift = 26;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int i = 0;
	void *burst[MAX_BURST];

	const uint64_t start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		test_ring_dequeue(r, burst, esize, bulk_sizes[0], api_type);
	const uint64_t end = rte_rdtsc();

	test_ring_print_test_string(api_type, esize, bulk_sizes[0],
					((double)(end - start)) / iterations);
}

/*
 * The separate enqueue and dequeue threads each take one input parameter
 * and return two results. Input = burst size, output = cycle averages for
 * SP/SC and MP/MC.
 */
struct thread_params {
	struct rte_ring *r;
	unsigned size;        /* input value, the burst size */
	double spsc, mpmc;    /* output value, the single or multi timings */
};
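
/*
 * Note: both outputs are in cycles per element, not per call; the helper
 * below divides the measured cycles by (iterations * burst size).
 */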

/*
 * Helper function to call the bulk SP/SC and MP/MC enqueue/dequeue
 * functions.
 * flag == 0 -> enqueue
 * flag == 1 -> dequeue
 */
static __rte_always_inline int
enqueue_dequeue_bulk_helper(const unsigned int flag, const int esize,
	struct thread_params *p)
{
	int ret;
	const unsigned int iter_shift = 15;
	const unsigned int iterations = 1 << iter_shift;
	struct rte_ring *r = p->r;
	unsigned int bsize = p->size;
	unsigned int i;
	void *burst = NULL;

#ifdef RTE_USE_C11_MEM_MODEL
	if (__atomic_add_fetch(&lcore_count, 1, __ATOMIC_RELAXED) != 2)
#else
	if (__sync_add_and_fetch(&lcore_count, 1) != 2)
#endif
		while (lcore_count != 2)
			rte_pause();

	burst = test_ring_calloc(MAX_BURST, esize);
	if (burst == NULL)
		return -1;

	const uint64_t sp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		do {
			if (flag == 0)
				ret = test_ring_enqueue(r, burst, esize, bsize,
						TEST_RING_THREAD_SPSC |
						TEST_RING_ELEM_BULK);
			else if (flag == 1)
				ret = test_ring_dequeue(r, burst, esize, bsize,
						TEST_RING_THREAD_SPSC |
						TEST_RING_ELEM_BULK);
			if (ret == 0)
				rte_pause();
		} while (!ret);
	const uint64_t sp_end = rte_rdtsc();

	const uint64_t mp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		do {
			if (flag == 0)
				ret = test_ring_enqueue(r, burst, esize, bsize,
						TEST_RING_THREAD_MPMC |
						TEST_RING_ELEM_BULK);
			else if (flag == 1)
				ret = test_ring_dequeue(r, burst, esize, bsize,
						TEST_RING_THREAD_MPMC |
						TEST_RING_ELEM_BULK);
			if (ret == 0)
				rte_pause();
		} while (!ret);
	const uint64_t mp_end = rte_rdtsc();

	p->spsc = ((double)(sp_end - sp_start))/(iterations * bsize);
	p->mpmc = ((double)(mp_end - mp_start))/(iterations * bsize);

	/* was leaked in the original; test_ring_calloc() memory is rte_free'd */
	rte_free(burst);

	return 0;
}
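
/*
 * The #ifdef block at the top of the helper above is a two-thread
 * rendezvous: each lcore atomically increments lcore_count and the first
 * arrival spins until the second one shows up, so both timed loops start
 * together. A minimal standalone sketch of the same pattern (hypothetical
 * demo_rendezvous; assumes exactly two participating threads):
 */
static __rte_unused void
demo_rendezvous(volatile unsigned int *count)
{
	/* returns only once both threads have incremented *count */
	if (__atomic_add_fetch(count, 1, __ATOMIC_RELAXED) != 2)
		while (*count != 2)
			rte_pause();
}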

/*
 * Function that uses rdtsc to measure timing for ring enqueue. Needs a
 * paired thread running the dequeue_bulk function.
 */
static int
enqueue_bulk(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(0, -1, params);
}

static int
enqueue_bulk_16B(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(0, 16, params);
}

/*
 * Function that uses rdtsc to measure timing for ring dequeue. Needs a
 * paired thread running the enqueue_bulk function.
 */
static int
dequeue_bulk(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(1, -1, params);
}

static int
dequeue_bulk_16B(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(1, 16, params);
}

/*
 * Function that calls the enqueue and dequeue bulk functions on pairs of
 * cores. Used to measure ring performance between hyperthreads, physical
 * cores and sockets.
 */
static int
run_on_core_pair(struct lcore_pair *cores, struct rte_ring *r, const int esize)
{
	lcore_function_t *f1, *f2;
	struct thread_params param1 = {0}, param2 = {0};
	unsigned i;

	if (esize == -1) {
		f1 = enqueue_bulk;
		f2 = dequeue_bulk;
	} else {
		f1 = enqueue_bulk_16B;
		f2 = dequeue_bulk_16B;
	}

	for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
		lcore_count = 0;
		param1.size = param2.size = bulk_sizes[i];
		param1.r = param2.r = r;
		if (cores->c1 == rte_get_main_lcore()) {
			rte_eal_remote_launch(f2, &param2, cores->c2);
			f1(&param1);
			rte_eal_wait_lcore(cores->c2);
		} else {
			rte_eal_remote_launch(f1, &param1, cores->c1);
			rte_eal_remote_launch(f2, &param2, cores->c2);
			if (rte_eal_wait_lcore(cores->c1) < 0)
				return -1;
			if (rte_eal_wait_lcore(cores->c2) < 0)
				return -1;
		}
		test_ring_print_test_string(
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK,
			esize, bulk_sizes[i], param1.spsc + param2.spsc);
		test_ring_print_test_string(
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK,
			esize, bulk_sizes[i], param1.mpmc + param2.mpmc);
	}

	return 0;
}

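/*
 * The launch pattern above is the standard EAL one: if the main lcore is
 * one half of the pair it runs its function inline, otherwise both halves
 * are farmed out with rte_eal_remote_launch() and joined with
 * rte_eal_wait_lcore(). A minimal sketch (hypothetical demo_* names;
 * assumes neither lcore in the pair is the main lcore):
 */
static int
demo_worker(void *arg)
{
	unsigned int *hits = arg;

	(*hits)++;	/* stand-in for the real measurement function */
	return 0;
}

static __rte_unused int
demo_launch_pair(struct lcore_pair *cores)
{
	unsigned int v1 = 0, v2 = 0;

	rte_eal_remote_launch(demo_worker, &v1, cores->c1);
	rte_eal_remote_launch(demo_worker, &v2, cores->c2);
	if (rte_eal_wait_lcore(cores->c1) < 0 ||
			rte_eal_wait_lcore(cores->c2) < 0)
		return -1;
	return (v1 == 1 && v2 == 1) ? 0 : -1;
}
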
static uint32_t synchro;
static uint64_t queue_count[RTE_MAX_LCORE];

#define TIME_MS 100

static int
load_loop_fn_helper(struct thread_params *p, const int esize)
{
	uint64_t time_diff = 0;
	uint64_t begin = 0;
	uint64_t hz = rte_get_timer_hz();
	uint64_t lcount = 0;
	const unsigned int lcore = rte_lcore_id();
	struct thread_params *params = p;
	void *burst = NULL;

	burst = test_ring_calloc(MAX_BURST, esize);
	if (burst == NULL)
		return -1;

	/* workers wait for the start signal from the main lcore */
	if (lcore != rte_get_main_lcore())
		rte_wait_until_equal_32(&synchro, 1, __ATOMIC_RELAXED);

	begin = rte_get_timer_cycles();
	while (time_diff < hz * TIME_MS / 1000) {
		test_ring_enqueue(params->r, burst, esize, params->size,
				TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);
		test_ring_dequeue(params->r, burst, esize, params->size,
				TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);
		lcount++;
		time_diff = rte_get_timer_cycles() - begin;
	}
	queue_count[lcore] = lcount;

	rte_free(burst);

	return 0;
}

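/*
 * Unlike the rdtsc loops elsewhere in this file, the load loop above
 * measures throughput over a fixed wall-clock window: it counts
 * enqueue+dequeue pairs until TIME_MS worth of timer cycles has elapsed.
 * A minimal standalone sketch of that windowing (illustrative demo_*
 * name only):
 */
static __rte_unused uint64_t
demo_ops_in_window(unsigned int window_ms)
{
	const uint64_t hz = rte_get_timer_hz();
	const uint64_t begin = rte_get_timer_cycles();
	uint64_t ops = 0;

	while (rte_get_timer_cycles() - begin < hz * window_ms / 1000)
		ops++;	/* stand-in for one enqueue+dequeue pair */

	return ops;
}
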
static int
load_loop_fn(void *p)
{
	struct thread_params *params = p;

	return load_loop_fn_helper(params, -1);
}

static int
load_loop_fn_16B(void *p)
{
	struct thread_params *params = p;

	return load_loop_fn_helper(params, 16);
}

static int
run_on_all_cores(struct rte_ring *r, const int esize)
{
	uint64_t total;
	struct thread_params param;
	lcore_function_t *lcore_f;
	unsigned int i, c;

	if (esize == -1)
		lcore_f = load_loop_fn;
	else
		lcore_f = load_loop_fn_16B;

	memset(&param, 0, sizeof(struct thread_params));
	for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
		total = 0;
		printf("\nBulk enq/dequeue count on size %u\n", bulk_sizes[i]);
		param.size = bulk_sizes[i];
		param.r = r;

		/* clear synchro and start workers */
		__atomic_store_n(&synchro, 0, __ATOMIC_RELAXED);
		if (rte_eal_mp_remote_launch(lcore_f, &param, SKIP_MAIN) < 0)
			return -1;

		/* start synchro and launch test on main */
		__atomic_store_n(&synchro, 1, __ATOMIC_RELAXED);
		lcore_f(&param);

		rte_eal_mp_wait_lcore();

		RTE_LCORE_FOREACH(c) {
			printf("Core [%u] count = %"PRIu64"\n",
					c, queue_count[c]);
			total += queue_count[c];
		}

		printf("Total count (size: %u): %"PRIu64"\n",
				bulk_sizes[i], total);
	}

	return 0;
}

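/*
 * The synchro flag used above is a one-shot start line: workers launched
 * with SKIP_MAIN park in rte_wait_until_equal_32() while the main lcore
 * finishes launching, then a single relaxed store releases them all so
 * every lcore enters its load loop at roughly the same time. Sketch of
 * both sides (hypothetical demo_start_line; assumes the flag was cleared
 * to 0 before the workers were launched):
 */
static __rte_unused void
demo_start_line(uint32_t *flag, int is_main)
{
	if (is_main)
		__atomic_store_n(flag, 1, __ATOMIC_RELAXED); /* release all */
	else
		rte_wait_until_equal_32(flag, 1, __ATOMIC_RELAXED); /* park */
}
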
/*
 * Test function that determines how long an enqueue + dequeue of a single item
 * takes on a single lcore. Result is for comparison with the bulk enq+deq.
 */
static int
test_single_enqueue_dequeue(struct rte_ring *r, const int esize,
	const unsigned int api_type)
{
	const unsigned int iter_shift = 24;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int i = 0;
	void *burst = NULL;

	/* alloc dummy object pointers */
	burst = test_ring_calloc(1, esize);
	if (burst == NULL)
		return -1;

	const uint64_t start = rte_rdtsc();
	for (i = 0; i < iterations; i++) {
		test_ring_enqueue(r, burst, esize, 1, api_type);
		test_ring_dequeue(r, burst, esize, 1, api_type);
	}
	const uint64_t end = rte_rdtsc();

	test_ring_print_test_string(api_type, esize, 1,
					((double)(end - start)) / iterations);

	rte_free(burst);

	return 0;
}

/*
 * Test that does both enqueue and dequeue on a core using the burst/bulk
 * API calls. Results should be the same as for the bulk function called
 * on a single lcore.
 */
static int
test_burst_bulk_enqueue_dequeue(struct rte_ring *r, const int esize,
	const unsigned int api_type)
{
	const unsigned int iter_shift = 23;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int sz, i = 0;
	void **burst = NULL;

	burst = test_ring_calloc(MAX_BURST, esize);
	if (burst == NULL)
		return -1;

	for (sz = 0; sz < RTE_DIM(bulk_sizes); sz++) {
		const uint64_t start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			test_ring_enqueue(r, burst, esize, bulk_sizes[sz],
						api_type);
			test_ring_dequeue(r, burst, esize, bulk_sizes[sz],
						api_type);
		}
		const uint64_t end = rte_rdtsc();

		test_ring_print_test_string(api_type, esize, bulk_sizes[sz],
					((double)(end - start)) / iterations);
	}

	rte_free(burst);

	return 0;
}

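/*
 * For readers new to the ring API: "bulk" is all-or-nothing while "burst"
 * moves as many elements as currently fit, which is why the two variants
 * are timed separately above. A sketch with the public legacy API (names
 * and fallback policy illustrative only):
 */
static __rte_unused unsigned int
demo_bulk_vs_burst_enqueue(struct rte_ring *r, void **objs, unsigned int n)
{
	/* bulk: returns n on success, 0 if there was not room for all n */
	if (rte_ring_enqueue_bulk(r, objs, n, NULL) == n)
		return n;

	/* burst: best effort, returns how many were actually enqueued */
	return rte_ring_enqueue_burst(r, objs, n, NULL);
}
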
/* Run all tests for a given element size */
static __rte_always_inline int
test_ring_perf_esize(const int esize)
{
	struct lcore_pair cores;
	struct rte_ring *r = NULL;

	/*
	 * Performance test for legacy/_elem APIs
	 * SP-SC/MP-MC, single
	 */
	r = test_ring_create(RING_NAME, esize, RING_SIZE, rte_socket_id(), 0);
	if (r == NULL)
		goto test_fail;

	printf("\n### Testing single element enq/deq ###\n");
	if (test_single_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_SINGLE) < 0)
		goto test_fail;
	if (test_single_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_SINGLE) < 0)
		goto test_fail;

	printf("\n### Testing burst enq/deq ###\n");
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BURST) < 0)
		goto test_fail;
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BURST) < 0)
		goto test_fail;

	printf("\n### Testing bulk enq/deq ###\n");
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK) < 0)
		goto test_fail;
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK) < 0)
		goto test_fail;

	printf("\n### Testing empty bulk deq ###\n");
	test_empty_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK);
	test_empty_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);

	if (get_two_hyperthreads(&cores) == 0) {
		printf("\n### Testing using two hyperthreads ###\n");
		if (run_on_core_pair(&cores, r, esize) < 0)
			goto test_fail;
	}

	if (get_two_cores(&cores) == 0) {
		printf("\n### Testing using two physical cores ###\n");
		if (run_on_core_pair(&cores, r, esize) < 0)
			goto test_fail;
	}

	if (get_two_sockets(&cores) == 0) {
		printf("\n### Testing using two NUMA nodes ###\n");
		if (run_on_core_pair(&cores, r, esize) < 0)
			goto test_fail;
	}

	printf("\n### Testing using all worker lcores ###\n");
	if (run_on_all_cores(r, esize) < 0)
		goto test_fail;

	rte_ring_free(r);

	return 0;

test_fail:
	rte_ring_free(r);

	return -1;
}

static int
test_ring_perf(void)
{
	/* Run all the tests for different element sizes */
	if (test_ring_perf_esize(-1) == -1)
		return -1;

	if (test_ring_perf_esize(16) == -1)
		return -1;

	return 0;
}

REGISTER_TEST_COMMAND(ring_perf_autotest, test_ring_perf);
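
/*
 * To run this benchmark, start the dpdk-test application and issue
 * "ring_perf_autotest" at its RTE>> prompt (the exact invocation of the
 * test binary varies with the DPDK version and build setup).
 */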