xref: /dpdk/app/test/test_distributor.c (revision b6a7e6852e9ab82ae0e05e2d2a0b83abca17de3b)
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2017 Intel Corporation
 */

#include "test.h"

#include <unistd.h>
#include <stdalign.h>
#include <string.h>
#include <rte_cycles.h>
#include <rte_errno.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>
#include <rte_mbuf_dyn.h>

#ifdef RTE_EXEC_ENV_WINDOWS
static int
test_distributor(void)
{
	printf("distributor not supported on Windows, skipping test\n");
	return TEST_SKIPPED;
}

#else

#include <rte_distributor.h>
#include <rte_string_fns.h>

#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
#define BURST 32
#define BIG_BATCH 1024

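/* Dynamic mbuf field used by the marked-packets test below: each packet
 * carries its sequence number shifted left by a few bits, and workers add
 * their mark into the freed-up low bits.
 */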
typedef uint32_t seq_dynfield_t;
static int seq_dynfield_offset = -1;

static inline seq_dynfield_t *
seq_field(struct rte_mbuf *mbuf)
{
	return RTE_MBUF_DYNFIELD(mbuf, seq_dynfield_offset, seq_dynfield_t *);
}

struct worker_params {
	char name[64];
	struct rte_distributor *dist;
};

struct worker_params worker_params;

/* statics - all zero-initialized by default */
static volatile RTE_ATOMIC(int) quit;      /**< general quit variable for all threads */
static volatile RTE_ATOMIC(int) zero_quit; /**< var for when we just want thr0 to quit*/
static volatile RTE_ATOMIC(int) zero_sleep; /**< thr0 has quit basic loop and is sleeping*/
static volatile RTE_ATOMIC(unsigned int) worker_idx;
static volatile RTE_ATOMIC(unsigned int) zero_idx;

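/* Per-worker packet counters, cache-line aligned so that each worker's
 * counter lives on its own cache line.
 */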
struct __rte_cache_aligned worker_stats {
	volatile RTE_ATOMIC(unsigned int) handled_packets;
};
struct worker_stats worker_stats[RTE_MAX_LCORE];

/* returns the total number of packets handled by the worker
 * functions given below.
 */
static inline unsigned
total_packet_count(void)
{
	unsigned i, count = 0;
	for (i = 0; i < worker_idx; i++)
		count += rte_atomic_load_explicit(&worker_stats[i].handled_packets,
				rte_memory_order_relaxed);
	return count;
}

/* resets the packet counts for a new test */
static inline void
clear_packet_count(void)
{
	unsigned int i;
	for (i = 0; i < RTE_MAX_LCORE; i++)
		rte_atomic_store_explicit(&worker_stats[i].handled_packets, 0,
			rte_memory_order_relaxed);
}

/* this is the basic worker function for the sanity test;
 * it does nothing but return packets and count them.
 */
static int
handle_work(void *arg)
{
	alignas(RTE_CACHE_LINE_SIZE) struct rte_mbuf *buf[8];
	struct worker_params *wp = arg;
	struct rte_distributor *db = wp->dist;
	unsigned int num;
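	/* claim a unique worker id by atomically incrementing the shared worker_idx */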
	unsigned int id = rte_atomic_fetch_add_explicit(&worker_idx, 1, rte_memory_order_relaxed);

	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
	while (!quit) {
		rte_atomic_fetch_add_explicit(&worker_stats[id].handled_packets, num,
				rte_memory_order_relaxed);
		num = rte_distributor_get_pkt(db, id,
				buf, buf, num);
	}
	rte_atomic_fetch_add_explicit(&worker_stats[id].handled_packets, num,
			rte_memory_order_relaxed);
	rte_distributor_return_pkt(db, id, buf, num);
	return 0;
}

/* do basic sanity testing of the distributor. This test checks the following:
 * - send 32 packets through the distributor with the same tag and ensure they
 *   all go to the one worker
 * - send 32 packets through the distributor with two different tags and
 *   verify that they go equally to two different workers.
 * - send 32 packets with different tags through the distributor and
 *   just verify we get all packets back.
 * - send 1024 packets through the distributor, gathering the returned packets
 *   as we go. Then verify that we correctly got all 1024 pointers back again,
 *   not necessarily in the same order (as different flows).
 */
static int
sanity_test(struct worker_params *wp, struct rte_mempool *p)
{
	struct rte_distributor *db = wp->dist;
	struct rte_mbuf *bufs[BURST];
	struct rte_mbuf *returns[BURST*2];
	unsigned int i, count;
	unsigned int retries;
	unsigned int processed;

	printf("=== Basic distributor sanity tests ===\n");
	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* now set all hash values in all buffers to zero, so all pkts go to the
	 * one worker thread */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 0;

	processed = 0;
	while (processed < BURST)
		processed += rte_distributor_process(db, &bufs[processed],
			BURST - processed);

	count = 0;
	do {

		rte_distributor_flush(db);
		count += rte_distributor_returned_pkts(db,
				returns, BURST*2);
	} while (count < BURST);

	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		rte_mempool_put_bulk(p, (void *)bufs, BURST);
		return -1;
	}

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			rte_atomic_load_explicit(&worker_stats[i].handled_packets,
					rte_memory_order_relaxed));
	printf("Sanity test with all zero hashes done.\n");

	/* pick two flows and check they go correctly */
	if (rte_lcore_count() >= 3) {
		clear_packet_count();
		for (i = 0; i < BURST; i++)
			bufs[i]->hash.usr = (i & 1) << 8;

		rte_distributor_process(db, bufs, BURST);
		count = 0;
		do {
			rte_distributor_flush(db);
			count += rte_distributor_returned_pkts(db,
					returns, BURST*2);
		} while (count < BURST);
		if (total_packet_count() != BURST) {
			printf("Line %d: Error, not all packets flushed. "
					"Expected %u, got %u\n",
					__LINE__, BURST, total_packet_count());
			rte_mempool_put_bulk(p, (void *)bufs, BURST);
			return -1;
		}

		for (i = 0; i < rte_lcore_count() - 1; i++)
			printf("Worker %u handled %u packets\n", i,
				rte_atomic_load_explicit(
					&worker_stats[i].handled_packets,
					rte_memory_order_relaxed));
		printf("Sanity test with two hash values done\n");
	}

	/* give a different hash value to each packet,
	 * so load gets distributed */
	clear_packet_count();
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = i+1;

	rte_distributor_process(db, bufs, BURST);
	count = 0;
	do {
		rte_distributor_flush(db);
		count += rte_distributor_returned_pkts(db,
				returns, BURST*2);
	} while (count < BURST);
	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		rte_mempool_put_bulk(p, (void *)bufs, BURST);
		return -1;
	}

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			rte_atomic_load_explicit(&worker_stats[i].handled_packets,
					rte_memory_order_relaxed));
	printf("Sanity test with non-zero hashes done\n");

	rte_mempool_put_bulk(p, (void *)bufs, BURST);

	/* sanity test with BIG_BATCH packets to ensure they all arrived back
	 * from the returned packets function */
	clear_packet_count();
	struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
	unsigned num_returned = 0;
	unsigned int num_being_processed = 0;
	unsigned int return_buffer_capacity = 127;/* RTE_DISTRIB_RETURNS_MASK */

	/* flush out any remaining packets */
	rte_distributor_flush(db);
	rte_distributor_clear_returns(db);

	if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}
	for (i = 0; i < BIG_BATCH; i++)
		many_bufs[i]->hash.usr = i << 2;

	printf("=== testing big burst (%s) ===\n", wp->name);
	for (i = 0; i < BIG_BATCH/BURST; i++) {
		rte_distributor_process(db,
				&many_bufs[i*BURST], BURST);
		num_being_processed += BURST;
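		/* keep draining returned packets until there is room for another
		 * burst in the distributor's 127-entry return buffer
		 */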
		do {
			count = rte_distributor_returned_pkts(db,
					&return_bufs[num_returned],
					BIG_BATCH - num_returned);
			num_being_processed -= count;
			num_returned += count;
			rte_distributor_flush(db);
		} while (num_being_processed + BURST > return_buffer_capacity);
	}
	retries = 0;
	do {
		rte_distributor_flush(db);
		count = rte_distributor_returned_pkts(db,
				&return_bufs[num_returned],
				BIG_BATCH - num_returned);
		num_returned += count;
		retries++;
	} while ((num_returned < BIG_BATCH) && (retries < 100));

	if (num_returned != BIG_BATCH) {
		printf("line %d: Missing packets, expected %d, got %d\n",
				__LINE__, BIG_BATCH, num_returned);
		rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
		return -1;
	}

	/* big check -  make sure all packets made it back!! */
	for (i = 0; i < BIG_BATCH; i++) {
		unsigned j;
		struct rte_mbuf *src = many_bufs[i];
		for (j = 0; j < BIG_BATCH; j++) {
			if (return_bufs[j] == src)
				break;
		}

		if (j == BIG_BATCH) {
			printf("Error: could not find source packet #%u\n", i);
			rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
			return -1;
		}
	}
	printf("Sanity test of returned packets done\n");

	rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);

	printf("\n");
	return 0;
}


/* to test that the distributor does not lose packets, we use this worker
 * function which frees mbufs when it gets them. The distributor thread does
 * the mbuf allocation. If the distributor drops packets we'll eventually run
 * out of mbufs.
 */
static int
handle_work_with_free_mbufs(void *arg)
{
	alignas(RTE_CACHE_LINE_SIZE) struct rte_mbuf *buf[8];
	struct worker_params *wp = arg;
	struct rte_distributor *d = wp->dist;
	unsigned int i;
	unsigned int num;
	unsigned int id = rte_atomic_fetch_add_explicit(&worker_idx, 1, rte_memory_order_relaxed);

	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
	while (!quit) {
		rte_atomic_fetch_add_explicit(&worker_stats[id].handled_packets, num,
				rte_memory_order_relaxed);
		for (i = 0; i < num; i++)
			rte_pktmbuf_free(buf[i]);
		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
	}
	rte_atomic_fetch_add_explicit(&worker_stats[id].handled_packets, num,
			rte_memory_order_relaxed);
	rte_distributor_return_pkt(d, id, buf, num);
	return 0;
}

/* Perform a sanity test of the distributor with a large number of packets,
 * where we allocate a new set of mbufs for each burst. The workers then
 * free the mbufs. This ensures that we don't have any packet leaks in the
 * library.
 */
static int
sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	unsigned i;
	struct rte_mbuf *bufs[BURST];
	unsigned int processed;

	printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name);

	clear_packet_count();
	for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
		unsigned j;
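		/* if the pool is temporarily empty, keep calling process() with no
		 * new packets so the workers can free mbufs back to the pool
		 */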
		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
			rte_distributor_process(d, NULL, 0);
		for (j = 0; j < BURST; j++) {
			bufs[j]->hash.usr = (i+j) << 1;
		}

		processed = 0;
		while (processed < BURST)
			processed += rte_distributor_process(d,
				&bufs[processed], BURST - processed);
	}

	rte_distributor_flush(d);

	rte_delay_us(10000);

	if (total_packet_count() < (1<<ITER_POWER)) {
		printf("Line %u: Packet count is incorrect, %u, expected %u\n",
				__LINE__, total_packet_count(),
				(1<<ITER_POWER));
		return -1;
	}

	printf("Sanity test with mbuf alloc/free passed\n\n");
	return 0;
}

static int
handle_work_for_shutdown_test(void *arg)
{
	alignas(RTE_CACHE_LINE_SIZE) struct rte_mbuf *buf[8];
	struct worker_params *wp = arg;
	struct rte_distributor *d = wp->dist;
	unsigned int num;
	unsigned int zero_id = 0;
	unsigned int zero_unset;
	const unsigned int id = rte_atomic_fetch_add_explicit(&worker_idx, 1,
			rte_memory_order_relaxed);

	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

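	/* the first worker to actually receive packets claims zero_idx (CAS from
	 * the RTE_MAX_LCORE sentinel); that worker is the one the shutdown tests
	 * later ask to quit via zero_quit
	 */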
	if (num > 0) {
		zero_unset = RTE_MAX_LCORE;
		rte_atomic_compare_exchange_strong_explicit(&zero_idx, &zero_unset, id,
			rte_memory_order_acq_rel, rte_memory_order_acquire);
	}
	zero_id = rte_atomic_load_explicit(&zero_idx, rte_memory_order_acquire);

	/* wait for the quit signal globally, or, for worker zero, wait
	 * for zero_quit */
	while (!quit && !(id == zero_id && zero_quit)) {
		rte_atomic_fetch_add_explicit(&worker_stats[id].handled_packets, num,
				rte_memory_order_relaxed);
		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

		if (num > 0) {
			zero_unset = RTE_MAX_LCORE;
			rte_atomic_compare_exchange_strong_explicit(&zero_idx, &zero_unset, id,
				rte_memory_order_acq_rel, rte_memory_order_acquire);
		}
		zero_id = rte_atomic_load_explicit(&zero_idx, rte_memory_order_acquire);
	}

	rte_atomic_fetch_add_explicit(&worker_stats[id].handled_packets, num,
			rte_memory_order_relaxed);
	if (id == zero_id) {
		rte_distributor_return_pkt(d, id, NULL, 0);

		/* for worker zero, allow it to restart to pick up last packet
		 * when all workers are shutting down.
		 */
		rte_atomic_store_explicit(&zero_sleep, 1, rte_memory_order_release);
		while (zero_quit)
			usleep(100);
		rte_atomic_store_explicit(&zero_sleep, 0, rte_memory_order_release);

		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

		while (!quit) {
			rte_atomic_fetch_add_explicit(&worker_stats[id].handled_packets,
					num, rte_memory_order_relaxed);
			num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
		}
	}
	rte_distributor_return_pkt(d, id, buf, num);
	return 0;
}


/* Sanity test of worker shutdown: queue two bursts of packets for a single
 * worker, then shut that worker down and verify that its backlog is moved to
 * the remaining workers and that no packets are lost.
 */
static int
sanity_test_with_worker_shutdown(struct worker_params *wp,
		struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	struct rte_mbuf *bufs[BURST];
	struct rte_mbuf *bufs2[BURST];
	unsigned int i;
	unsigned int failed = 0;
	unsigned int processed = 0;

	printf("=== Sanity test of worker shutdown ===\n");

	clear_packet_count();

	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/*
	 * Now set all hash values in all buffers to the same value so all
	 * pkts go to the one worker thread
	 */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 1;

	processed = 0;
	while (processed < BURST)
		processed += rte_distributor_process(d, &bufs[processed],
			BURST - processed);
	rte_distributor_flush(d);

	/* at this point, we will have processed some packets and have a full
	 * backlog for the other ones at worker 0.
	 */

	/* get more buffers to queue up, again setting them to the same flow */
	if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		rte_mempool_put_bulk(p, (void *)bufs, BURST);
		return -1;
	}
	for (i = 0; i < BURST; i++)
		bufs2[i]->hash.usr = 1;

	/* get worker zero to quit */
	zero_quit = 1;
	rte_distributor_process(d, bufs2, BURST);

	/* flush the distributor */
	rte_distributor_flush(d);
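	/* keep flushing until worker zero signals, via zero_sleep, that it has
	 * returned its backlog and gone to sleep
	 */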
	while (!rte_atomic_load_explicit(&zero_sleep, rte_memory_order_acquire))
		rte_distributor_flush(d);

	zero_quit = 0;
	while (rte_atomic_load_explicit(&zero_sleep, rte_memory_order_acquire))
		rte_delay_us(100);

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			rte_atomic_load_explicit(&worker_stats[i].handled_packets,
					rte_memory_order_relaxed));

	if (total_packet_count() != BURST * 2) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST * 2, total_packet_count());
		failed = 1;
	}

	rte_mempool_put_bulk(p, (void *)bufs, BURST);
	rte_mempool_put_bulk(p, (void *)bufs2, BURST);

	if (failed)
		return -1;

	printf("Sanity test with worker shutdown passed\n\n");
	return 0;
}

/* Test that the flush function is able to move packets between workers when
 * one worker shuts down.
 */
static int
test_flush_with_worker_shutdown(struct worker_params *wp,
		struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	struct rte_mbuf *bufs[BURST];
	unsigned int i;
	unsigned int failed = 0;
	unsigned int processed;

	printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name);

	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* now set all hash values in all buffers to zero, so all pkts go to the
	 * one worker thread */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 0;

	processed = 0;
	while (processed < BURST)
		processed += rte_distributor_process(d, &bufs[processed],
			BURST - processed);
	/* at this point, we will have processed some packets and have a full
	 * backlog for the other ones at worker 0.
	 */

	/* get worker zero to quit */
	zero_quit = 1;

	/* flush the distributor */
	rte_distributor_flush(d);

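	/* wait until worker zero reports, via zero_sleep, that its backlog has
	 * been handed back and it has gone to sleep
	 */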
	while (!rte_atomic_load_explicit(&zero_sleep, rte_memory_order_acquire))
		rte_distributor_flush(d);

	zero_quit = 0;

	while (rte_atomic_load_explicit(&zero_sleep, rte_memory_order_acquire))
		rte_delay_us(100);

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			rte_atomic_load_explicit(&worker_stats[i].handled_packets,
					rte_memory_order_relaxed));

	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		failed = 1;
	}

	rte_mempool_put_bulk(p, (void *)bufs, BURST);

	if (failed)
		return -1;

	printf("Flush test with worker shutdown passed\n\n");
	return 0;
}

static int
handle_and_mark_work(void *arg)
{
	alignas(RTE_CACHE_LINE_SIZE) struct rte_mbuf *buf[8];
	struct worker_params *wp = arg;
	struct rte_distributor *db = wp->dist;
	unsigned int num, i;
	unsigned int id = rte_atomic_fetch_add_explicit(&worker_idx, 1, rte_memory_order_relaxed);
	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
	while (!quit) {
		rte_atomic_fetch_add_explicit(&worker_stats[id].handled_packets, num,
				rte_memory_order_relaxed);
		for (i = 0; i < num; i++)
			*seq_field(buf[i]) += id + 1;
		num = rte_distributor_get_pkt(db, id,
				buf, buf, num);
	}
	rte_atomic_fetch_add_explicit(&worker_stats[id].handled_packets, num,
			rte_memory_order_relaxed);
	rte_distributor_return_pkt(db, id, buf, num);
	return 0;
}

/* sanity_mark_test sends packets to workers which mark them.
 * Every packet also carries an encoded sequence number.
 * The returned packets are sorted and checked to verify that they were
 * handled by the proper workers.
 */
static int
sanity_mark_test(struct worker_params *wp, struct rte_mempool *p)
{
	const unsigned int buf_count = 24;
	const unsigned int burst = 8;
	const unsigned int shift = 12;
	const unsigned int seq_shift = 10;

	struct rte_distributor *db = wp->dist;
	struct rte_mbuf *bufs[buf_count];
	struct rte_mbuf *returns[buf_count];
	unsigned int i, count, id;
	unsigned int sorted[buf_count], seq;
	unsigned int failed = 0;
	unsigned int processed;

	printf("=== Marked packets test ===\n");
	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* bufs' hashes will be like these below, but shifted left.
	 * The shifting is for avoiding collisions with backlogs
	 * and in-flight tags left by previous tests.
	 * [1, 1, 1, 1, 1, 1, 1, 1
	 *  1, 1, 1, 1, 2, 2, 2, 2
	 *  2, 2, 2, 2, 1, 1, 1, 1]
	 */
	for (i = 0; i < burst; i++) {
		bufs[0 * burst + i]->hash.usr = 1 << shift;
		bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2)
			<< shift;
		bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1)
			<< shift;
	}
	/* Assign a sequence number to each packet. The sequence is shifted,
	 * so that the lower bits will hold the mark added by the worker.
	 */
	for (i = 0; i < buf_count; i++)
		*seq_field(bufs[i]) = i << seq_shift;

	count = 0;
	for (i = 0; i < buf_count/burst; i++) {
		processed = 0;
		while (processed < burst)
			processed += rte_distributor_process(db,
				&bufs[i * burst + processed],
				burst - processed);
		count += rte_distributor_returned_pkts(db, &returns[count],
			buf_count - count);
	}

	do {
		rte_distributor_flush(db);
		count += rte_distributor_returned_pkts(db, &returns[count],
			buf_count - count);
	} while (count < buf_count);

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			rte_atomic_load_explicit(&worker_stats[i].handled_packets,
					rte_memory_order_relaxed));

	/* Sort returned packets by sent order (sequence numbers). */
	for (i = 0; i < buf_count; i++) {
		seq = *seq_field(returns[i]) >> seq_shift;
		id = *seq_field(returns[i]) - (seq << seq_shift);
		sorted[seq] = id;
	}

	/* Verify that packets [0-11] and [20-23] were processed
	 * by the same worker
	 */
	for (i = 1; i < 12; i++) {
		if (sorted[i] != sorted[0]) {
			printf("Packet number %u processed by worker %u,"
				" but should be processed by worker %u\n",
				i, sorted[i], sorted[0]);
			failed = 1;
		}
	}
	for (i = 20; i < 24; i++) {
		if (sorted[i] != sorted[0]) {
			printf("Packet number %u processed by worker %u,"
				" but should be processed by worker %u\n",
				i, sorted[i], sorted[0]);
			failed = 1;
		}
	}
	/* And verify that packets [12-19] were processed
	 * by another worker
	 */
	for (i = 13; i < 20; i++) {
		if (sorted[i] != sorted[12]) {
			printf("Packet number %u processed by worker %u,"
				" but should be processed by worker %u\n",
				i, sorted[i], sorted[12]);
			failed = 1;
		}
	}

	rte_mempool_put_bulk(p, (void *)bufs, buf_count);

	if (failed)
		return -1;

	printf("Marked packets test passed\n");
	return 0;
}

static
int test_error_distributor_create_name(void)
{
	struct rte_distributor *d = NULL;
	struct rte_distributor *db = NULL;
	char *name = NULL;

	d = rte_distributor_create(name, rte_socket_id(),
			rte_lcore_count() - 1,
			RTE_DIST_ALG_SINGLE);
	if (d != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with NULL name param\n");
		return -1;
	}

	db = rte_distributor_create(name, rte_socket_id(),
			rte_lcore_count() - 1,
			RTE_DIST_ALG_BURST);
	if (db != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with NULL param\n");
		return -1;
	}

	return 0;
}


static
int test_error_distributor_create_numworkers(void)
{
	struct rte_distributor *ds = NULL;
	struct rte_distributor *db = NULL;

	ds = rte_distributor_create("test_numworkers", rte_socket_id(),
			RTE_MAX_LCORE + 10,
			RTE_DIST_ALG_SINGLE);
	if (ds != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with num_workers > MAX\n");
		return -1;
	}

	db = rte_distributor_create("test_numworkers", rte_socket_id(),
			RTE_MAX_LCORE + 10,
			RTE_DIST_ALG_BURST);
	if (db != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() num_workers > MAX\n");
		return -1;
	}

	return 0;
}


/* Useful function which ensures that all worker functions terminate */
static void
quit_workers(struct worker_params *wp, struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	const unsigned num_workers = rte_lcore_count() - 1;
	unsigned i;
	struct rte_mbuf *bufs[RTE_MAX_LCORE];
	struct rte_mbuf *returns[RTE_MAX_LCORE];
	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return;
	}

	zero_quit = 0;
	quit = 1;
	for (i = 0; i < num_workers; i++) {
		bufs[i]->hash.usr = i << 1;
		rte_distributor_process(d, &bufs[i], 1);
	}

	rte_distributor_process(d, NULL, 0);
	rte_distributor_flush(d);
	rte_eal_mp_wait_lcore();

	while (rte_distributor_returned_pkts(d, returns, RTE_MAX_LCORE))
		;

	rte_distributor_clear_returns(d);
	rte_mempool_put_bulk(p, (void *)bufs, num_workers);

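	/* reset the shared state so the next test run starts from a clean slate */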
	quit = 0;
	worker_idx = 0;
	zero_idx = RTE_MAX_LCORE;
	zero_quit = 0;
	zero_sleep = 0;
}

static int
test_distributor(void)
{
	static struct rte_distributor *ds;
	static struct rte_distributor *db;
	static struct rte_distributor *dist[2];
	static struct rte_mempool *p;
	int i;

	static const struct rte_mbuf_dynfield seq_dynfield_desc = {
		.name = "test_distributor_dynfield_seq",
		.size = sizeof(seq_dynfield_t),
		.align = alignof(seq_dynfield_t),
	};
	seq_dynfield_offset =
		rte_mbuf_dynfield_register(&seq_dynfield_desc);
	if (seq_dynfield_offset < 0) {
		printf("Error registering mbuf field\n");
		return TEST_FAILED;
	}

	if (rte_lcore_count() < 2) {
		printf("Not enough cores for distributor_autotest, expecting at least 2\n");
		return TEST_SKIPPED;
	}

	if (db == NULL) {
		db = rte_distributor_create("Test_dist_burst", rte_socket_id(),
				rte_lcore_count() - 1,
				RTE_DIST_ALG_BURST);
		if (db == NULL) {
			printf("Error creating burst distributor\n");
			return -1;
		}
	} else {
		rte_distributor_flush(db);
		rte_distributor_clear_returns(db);
	}

	if (ds == NULL) {
		ds = rte_distributor_create("Test_dist_single",
				rte_socket_id(),
				rte_lcore_count() - 1,
			RTE_DIST_ALG_SINGLE);
		if (ds == NULL) {
			printf("Error creating single distributor\n");
			return -1;
		}
	} else {
		rte_distributor_flush(ds);
		rte_distributor_clear_returns(ds);
	}

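	/* size the pool so there are enough mbufs for the BIG_BATCH test even
	 * when only a couple of lcores are available
	 */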
	const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
			(BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
	if (p == NULL) {
		p = rte_pktmbuf_pool_create("DT_MBUF_POOL", nb_bufs, BURST,
			0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
		if (p == NULL) {
			printf("Error creating mempool\n");
			return -1;
		}
	}

	dist[0] = ds;
	dist[1] = db;

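	/* run the whole test sequence twice: first against the single-packet
	 * distributor, then against the burst distributor
	 */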
	for (i = 0; i < 2; i++) {

		worker_params.dist = dist[i];
		if (i)
			strlcpy(worker_params.name, "burst",
					sizeof(worker_params.name));
		else
			strlcpy(worker_params.name, "single",
					sizeof(worker_params.name));

		rte_eal_mp_remote_launch(handle_work,
				&worker_params, SKIP_MAIN);
		if (sanity_test(&worker_params, p) < 0)
			goto err;
		quit_workers(&worker_params, p);

		rte_eal_mp_remote_launch(handle_work_with_free_mbufs,
				&worker_params, SKIP_MAIN);
		if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0)
			goto err;
		quit_workers(&worker_params, p);

		if (rte_lcore_count() > 2) {
			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
					&worker_params,
					SKIP_MAIN);
			if (sanity_test_with_worker_shutdown(&worker_params,
					p) < 0)
				goto err;
			quit_workers(&worker_params, p);

			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
					&worker_params,
					SKIP_MAIN);
			if (test_flush_with_worker_shutdown(&worker_params,
					p) < 0)
				goto err;
			quit_workers(&worker_params, p);

			rte_eal_mp_remote_launch(handle_and_mark_work,
					&worker_params, SKIP_MAIN);
			if (sanity_mark_test(&worker_params, p) < 0)
				goto err;
			quit_workers(&worker_params, p);

		} else {
			printf("Too few cores to run worker shutdown test\n");
		}

	}

	if (test_error_distributor_create_numworkers() == -1 ||
			test_error_distributor_create_name() == -1) {
		printf("rte_distributor_create parameter check tests failed\n");
		return -1;
	}

	return 0;

err:
	quit_workers(&worker_params, p);
	return -1;
}

#endif /* !RTE_EXEC_ENV_WINDOWS */

REGISTER_FAST_TEST(distributor_autotest, false, true, test_distributor);
957