xref: /dpdk/app/test/test_distributor.c (revision 089e5ed727a15da2729cfee9b63533dd120bd04c)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2017 Intel Corporation
3  */
4 
5 #include "test.h"
6 
7 #include <unistd.h>
8 #include <string.h>
9 #include <rte_cycles.h>
10 #include <rte_errno.h>
11 #include <rte_mempool.h>
12 #include <rte_mbuf.h>
13 #include <rte_distributor.h>
14 #include <rte_string_fns.h>
15 
16 #define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
17 #define BURST 32
18 #define BIG_BATCH 1024
19 
/* Context handed to every worker lcore: a human-readable name for the
 * distributor variant under test (used in log output) and the distributor
 * instance itself.
 */
struct worker_params {
	char name[64];
	struct rte_distributor *dist;
};

/* single shared instance, re-filled by test_distributor() for each variant */
struct worker_params worker_params;
26 
/* statics - all zero-initialized by default */
static volatile int quit;      /**< general quit variable for all threads */
static volatile int zero_quit; /**< var for when we just want thr0 to quit*/
static volatile unsigned worker_idx; /**< next worker id; each launched worker claims one */

/* Per-worker packet counters. The struct is cache-aligned so that workers
 * updating adjacent entries do not false-share a cache line.
 */
struct worker_stats {
	volatile unsigned handled_packets; /**< packets this worker has processed */
} __rte_cache_aligned;
struct worker_stats worker_stats[RTE_MAX_LCORE];
36 
37 /* returns the total count of the number of packets handled by the worker
38  * functions given below.
39  */
40 static inline unsigned
41 total_packet_count(void)
42 {
43 	unsigned i, count = 0;
44 	for (i = 0; i < worker_idx; i++)
45 		count += worker_stats[i].handled_packets;
46 	return count;
47 }
48 
49 /* resets the packet counts for a new test */
50 static inline void
51 clear_packet_count(void)
52 {
53 	memset(&worker_stats, 0, sizeof(worker_stats));
54 }
55 
56 /* this is the basic worker function for sanity test
57  * it does nothing but return packets and count them.
58  */
59 static int
60 handle_work(void *arg)
61 {
62 	struct rte_mbuf *buf[8] __rte_cache_aligned;
63 	struct worker_params *wp = arg;
64 	struct rte_distributor *db = wp->dist;
65 	unsigned int count = 0, num = 0;
66 	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
67 	int i;
68 
69 	for (i = 0; i < 8; i++)
70 		buf[i] = NULL;
71 	num = rte_distributor_get_pkt(db, id, buf, buf, num);
72 	while (!quit) {
73 		worker_stats[id].handled_packets += num;
74 		count += num;
75 		num = rte_distributor_get_pkt(db, id,
76 				buf, buf, num);
77 	}
78 	worker_stats[id].handled_packets += num;
79 	count += num;
80 	rte_distributor_return_pkt(db, id, buf, num);
81 	return 0;
82 }
83 
84 /* do basic sanity testing of the distributor. This test tests the following:
85  * - send 32 packets through distributor with the same tag and ensure they
86  *   all go to the one worker
87  * - send 32 packets through the distributor with two different tags and
88  *   verify that they go equally to two different workers.
89  * - send 32 packets with different tags through the distributors and
90  *   just verify we get all packets back.
91  * - send 1024 packets through the distributor, gathering the returned packets
92  *   as we go. Then verify that we correctly got all 1024 pointers back again,
93  *   not necessarily in the same order (as different flows).
94  */
95 static int
96 sanity_test(struct worker_params *wp, struct rte_mempool *p)
97 {
98 	struct rte_distributor *db = wp->dist;
99 	struct rte_mbuf *bufs[BURST];
100 	struct rte_mbuf *returns[BURST*2];
101 	unsigned int i, count;
102 	unsigned int retries;
103 
104 	printf("=== Basic distributor sanity tests ===\n");
105 	clear_packet_count();
106 	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
107 		printf("line %d: Error getting mbufs from pool\n", __LINE__);
108 		return -1;
109 	}
110 
111 	/* now set all hash values in all buffers to zero, so all pkts go to the
112 	 * one worker thread */
113 	for (i = 0; i < BURST; i++)
114 		bufs[i]->hash.usr = 0;
115 
116 	rte_distributor_process(db, bufs, BURST);
117 	count = 0;
118 	do {
119 
120 		rte_distributor_flush(db);
121 		count += rte_distributor_returned_pkts(db,
122 				returns, BURST*2);
123 	} while (count < BURST);
124 
125 	if (total_packet_count() != BURST) {
126 		printf("Line %d: Error, not all packets flushed. "
127 				"Expected %u, got %u\n",
128 				__LINE__, BURST, total_packet_count());
129 		return -1;
130 	}
131 
132 	for (i = 0; i < rte_lcore_count() - 1; i++)
133 		printf("Worker %u handled %u packets\n", i,
134 				worker_stats[i].handled_packets);
135 	printf("Sanity test with all zero hashes done.\n");
136 
137 	/* pick two flows and check they go correctly */
138 	if (rte_lcore_count() >= 3) {
139 		clear_packet_count();
140 		for (i = 0; i < BURST; i++)
141 			bufs[i]->hash.usr = (i & 1) << 8;
142 
143 		rte_distributor_process(db, bufs, BURST);
144 		count = 0;
145 		do {
146 			rte_distributor_flush(db);
147 			count += rte_distributor_returned_pkts(db,
148 					returns, BURST*2);
149 		} while (count < BURST);
150 		if (total_packet_count() != BURST) {
151 			printf("Line %d: Error, not all packets flushed. "
152 					"Expected %u, got %u\n",
153 					__LINE__, BURST, total_packet_count());
154 			return -1;
155 		}
156 
157 		for (i = 0; i < rte_lcore_count() - 1; i++)
158 			printf("Worker %u handled %u packets\n", i,
159 					worker_stats[i].handled_packets);
160 		printf("Sanity test with two hash values done\n");
161 	}
162 
163 	/* give a different hash value to each packet,
164 	 * so load gets distributed */
165 	clear_packet_count();
166 	for (i = 0; i < BURST; i++)
167 		bufs[i]->hash.usr = i+1;
168 
169 	rte_distributor_process(db, bufs, BURST);
170 	count = 0;
171 	do {
172 		rte_distributor_flush(db);
173 		count += rte_distributor_returned_pkts(db,
174 				returns, BURST*2);
175 	} while (count < BURST);
176 	if (total_packet_count() != BURST) {
177 		printf("Line %d: Error, not all packets flushed. "
178 				"Expected %u, got %u\n",
179 				__LINE__, BURST, total_packet_count());
180 		return -1;
181 	}
182 
183 	for (i = 0; i < rte_lcore_count() - 1; i++)
184 		printf("Worker %u handled %u packets\n", i,
185 				worker_stats[i].handled_packets);
186 	printf("Sanity test with non-zero hashes done\n");
187 
188 	rte_mempool_put_bulk(p, (void *)bufs, BURST);
189 
190 	/* sanity test with BIG_BATCH packets to ensure they all arrived back
191 	 * from the returned packets function */
192 	clear_packet_count();
193 	struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
194 	unsigned num_returned = 0;
195 
196 	/* flush out any remaining packets */
197 	rte_distributor_flush(db);
198 	rte_distributor_clear_returns(db);
199 
200 	if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
201 		printf("line %d: Error getting mbufs from pool\n", __LINE__);
202 		return -1;
203 	}
204 	for (i = 0; i < BIG_BATCH; i++)
205 		many_bufs[i]->hash.usr = i << 2;
206 
207 	printf("=== testing big burst (%s) ===\n", wp->name);
208 	for (i = 0; i < BIG_BATCH/BURST; i++) {
209 		rte_distributor_process(db,
210 				&many_bufs[i*BURST], BURST);
211 		count = rte_distributor_returned_pkts(db,
212 				&return_bufs[num_returned],
213 				BIG_BATCH - num_returned);
214 		num_returned += count;
215 	}
216 	rte_distributor_flush(db);
217 	count = rte_distributor_returned_pkts(db,
218 		&return_bufs[num_returned],
219 			BIG_BATCH - num_returned);
220 	num_returned += count;
221 	retries = 0;
222 	do {
223 		rte_distributor_flush(db);
224 		count = rte_distributor_returned_pkts(db,
225 				&return_bufs[num_returned],
226 				BIG_BATCH - num_returned);
227 		num_returned += count;
228 		retries++;
229 	} while ((num_returned < BIG_BATCH) && (retries < 100));
230 
231 	if (num_returned != BIG_BATCH) {
232 		printf("line %d: Missing packets, expected %d\n",
233 				__LINE__, num_returned);
234 		return -1;
235 	}
236 
237 	/* big check -  make sure all packets made it back!! */
238 	for (i = 0; i < BIG_BATCH; i++) {
239 		unsigned j;
240 		struct rte_mbuf *src = many_bufs[i];
241 		for (j = 0; j < BIG_BATCH; j++) {
242 			if (return_bufs[j] == src)
243 				break;
244 		}
245 
246 		if (j == BIG_BATCH) {
247 			printf("Error: could not find source packet #%u\n", i);
248 			return -1;
249 		}
250 	}
251 	printf("Sanity test of returned packets done\n");
252 
253 	rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
254 
255 	printf("\n");
256 	return 0;
257 }
258 
259 
260 /* to test that the distributor does not lose packets, we use this worker
261  * function which frees mbufs when it gets them. The distributor thread does
262  * the mbuf allocation. If distributor drops packets we'll eventually run out
263  * of mbufs.
264  */
265 static int
266 handle_work_with_free_mbufs(void *arg)
267 {
268 	struct rte_mbuf *buf[8] __rte_cache_aligned;
269 	struct worker_params *wp = arg;
270 	struct rte_distributor *d = wp->dist;
271 	unsigned int count = 0;
272 	unsigned int i;
273 	unsigned int num = 0;
274 	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
275 
276 	for (i = 0; i < 8; i++)
277 		buf[i] = NULL;
278 	num = rte_distributor_get_pkt(d, id, buf, buf, num);
279 	while (!quit) {
280 		worker_stats[id].handled_packets += num;
281 		count += num;
282 		for (i = 0; i < num; i++)
283 			rte_pktmbuf_free(buf[i]);
284 		num = rte_distributor_get_pkt(d,
285 				id, buf, buf, num);
286 	}
287 	worker_stats[id].handled_packets += num;
288 	count += num;
289 	rte_distributor_return_pkt(d, id, buf, num);
290 	return 0;
291 }
292 
293 /* Perform a sanity test of the distributor with a large number of packets,
294  * where we allocate a new set of mbufs for each burst. The workers then
295  * free the mbufs. This ensures that we don't have any packet leaks in the
296  * library.
297  */
298 static int
299 sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
300 {
301 	struct rte_distributor *d = wp->dist;
302 	unsigned i;
303 	struct rte_mbuf *bufs[BURST];
304 
305 	printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name);
306 
307 	clear_packet_count();
308 	for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
309 		unsigned j;
310 		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
311 			rte_distributor_process(d, NULL, 0);
312 		for (j = 0; j < BURST; j++) {
313 			bufs[j]->hash.usr = (i+j) << 1;
314 			rte_mbuf_refcnt_set(bufs[j], 1);
315 		}
316 
317 		rte_distributor_process(d, bufs, BURST);
318 	}
319 
320 	rte_distributor_flush(d);
321 
322 	rte_delay_us(10000);
323 
324 	if (total_packet_count() < (1<<ITER_POWER)) {
325 		printf("Line %u: Packet count is incorrect, %u, expected %u\n",
326 				__LINE__, total_packet_count(),
327 				(1<<ITER_POWER));
328 		return -1;
329 	}
330 
331 	printf("Sanity test with mbuf alloc/free passed\n\n");
332 	return 0;
333 }
334 
335 static int
336 handle_work_for_shutdown_test(void *arg)
337 {
338 	struct rte_mbuf *pkt = NULL;
339 	struct rte_mbuf *buf[8] __rte_cache_aligned;
340 	struct worker_params *wp = arg;
341 	struct rte_distributor *d = wp->dist;
342 	unsigned int count = 0;
343 	unsigned int num = 0;
344 	unsigned int total = 0;
345 	unsigned int i;
346 	unsigned int returned = 0;
347 	const unsigned int id = __atomic_fetch_add(&worker_idx, 1,
348 			__ATOMIC_RELAXED);
349 
350 	num = rte_distributor_get_pkt(d, id, buf, buf, num);
351 
352 	/* wait for quit single globally, or for worker zero, wait
353 	 * for zero_quit */
354 	while (!quit && !(id == 0 && zero_quit)) {
355 		worker_stats[id].handled_packets += num;
356 		count += num;
357 		for (i = 0; i < num; i++)
358 			rte_pktmbuf_free(buf[i]);
359 		num = rte_distributor_get_pkt(d,
360 				id, buf, buf, num);
361 		total += num;
362 	}
363 	worker_stats[id].handled_packets += num;
364 	count += num;
365 	returned = rte_distributor_return_pkt(d, id, buf, num);
366 
367 	if (id == 0) {
368 		/* for worker zero, allow it to restart to pick up last packet
369 		 * when all workers are shutting down.
370 		 */
371 		while (zero_quit)
372 			usleep(100);
373 
374 		num = rte_distributor_get_pkt(d,
375 				id, buf, buf, num);
376 
377 		while (!quit) {
378 			worker_stats[id].handled_packets += num;
379 			count += num;
380 			rte_pktmbuf_free(pkt);
381 			num = rte_distributor_get_pkt(d, id, buf, buf, num);
382 		}
383 		returned = rte_distributor_return_pkt(d,
384 				id, buf, num);
385 		printf("Num returned = %d\n", returned);
386 	}
387 	return 0;
388 }
389 
390 
/* Sanity test of worker shutdown: direct all packets at a single worker,
 * tell that worker to quit mid-stream, and verify that the distributor
 * moves its backlog to the remaining workers so no packet is lost.
 */
static int
sanity_test_with_worker_shutdown(struct worker_params *wp,
		struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	struct rte_mbuf *bufs[BURST];
	unsigned i;

	printf("=== Sanity test of worker shutdown ===\n");

	clear_packet_count();

	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/*
	 * Now set all hash values in all buffers to same value so all
	 * pkts go to the one worker thread
	 */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 1;

	rte_distributor_process(d, bufs, BURST);
	rte_distributor_flush(d);

	/* at this point, we will have processed some packets and have a full
	 * backlog for the other ones at worker 0.
	 */

	/* get more buffers to queue up, again setting them to the same flow */
	/* NOTE(review): on this failure path the first BURST mbufs queued
	 * above are not returned to the pool - tolerable in a test, but it
	 * is a leak. */
	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 1;

	/* get worker zero to quit */
	zero_quit = 1;
	rte_distributor_process(d, bufs, BURST);

	/* flush the distributor */
	rte_distributor_flush(d);
	/* give the remaining workers time to absorb worker 0's backlog
	 * before counting (zero_quit is reset later by quit_workers()) */
	rte_delay_us(10000);

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
				worker_stats[i].handled_packets);

	/* both bursts must have been processed despite the shutdown */
	if (total_packet_count() != BURST * 2) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST * 2, total_packet_count());
		return -1;
	}

	printf("Sanity test with worker shutdown passed\n\n");
	return 0;
}
457 
/* Test that the flush function is able to move packets between workers when
 * one worker shuts down.
 */
static int
test_flush_with_worker_shutdown(struct worker_params *wp,
		struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	struct rte_mbuf *bufs[BURST];
	unsigned i;

	printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name);

	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* now set all hash values in all buffers to zero, so all pkts go to the
	 * one worker thread */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 0;

	rte_distributor_process(d, bufs, BURST);
	/* at this point, we will have processed some packets and have a full
	 * backlog for the other ones at worker 0.
	 */

	/* get worker zero to quit */
	zero_quit = 1;

	/* flush the distributor - this alone must redistribute worker 0's
	 * backlog to the surviving workers */
	rte_distributor_flush(d);

	/* give the workers time to drain before counting */
	rte_delay_us(10000);

	/* let worker zero's handler resume so quit_workers() can finish it */
	zero_quit = 0;
	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
				worker_stats[i].handled_packets);

	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		return -1;
	}

	printf("Flush test with worker shutdown passed\n\n");
	return 0;
}
510 
511 static
512 int test_error_distributor_create_name(void)
513 {
514 	struct rte_distributor *d = NULL;
515 	struct rte_distributor *db = NULL;
516 	char *name = NULL;
517 
518 	d = rte_distributor_create(name, rte_socket_id(),
519 			rte_lcore_count() - 1,
520 			RTE_DIST_ALG_SINGLE);
521 	if (d != NULL || rte_errno != EINVAL) {
522 		printf("ERROR: No error on create() with NULL name param\n");
523 		return -1;
524 	}
525 
526 	db = rte_distributor_create(name, rte_socket_id(),
527 			rte_lcore_count() - 1,
528 			RTE_DIST_ALG_BURST);
529 	if (db != NULL || rte_errno != EINVAL) {
530 		printf("ERROR: No error on create() with NULL param\n");
531 		return -1;
532 	}
533 
534 	return 0;
535 }
536 
537 
538 static
539 int test_error_distributor_create_numworkers(void)
540 {
541 	struct rte_distributor *ds = NULL;
542 	struct rte_distributor *db = NULL;
543 
544 	ds = rte_distributor_create("test_numworkers", rte_socket_id(),
545 			RTE_MAX_LCORE + 10,
546 			RTE_DIST_ALG_SINGLE);
547 	if (ds != NULL || rte_errno != EINVAL) {
548 		printf("ERROR: No error on create() with num_workers > MAX\n");
549 		return -1;
550 	}
551 
552 	db = rte_distributor_create("test_numworkers", rte_socket_id(),
553 			RTE_MAX_LCORE + 10,
554 			RTE_DIST_ALG_BURST);
555 	if (db != NULL || rte_errno != EINVAL) {
556 		printf("ERROR: No error on create() num_workers > MAX\n");
557 		return -1;
558 	}
559 
560 	return 0;
561 }
562 
563 
564 /* Useful function which ensures that all worker functions terminate */
565 static void
566 quit_workers(struct worker_params *wp, struct rte_mempool *p)
567 {
568 	struct rte_distributor *d = wp->dist;
569 	const unsigned num_workers = rte_lcore_count() - 1;
570 	unsigned i;
571 	struct rte_mbuf *bufs[RTE_MAX_LCORE];
572 	rte_mempool_get_bulk(p, (void *)bufs, num_workers);
573 
574 	zero_quit = 0;
575 	quit = 1;
576 	for (i = 0; i < num_workers; i++)
577 		bufs[i]->hash.usr = i << 1;
578 	rte_distributor_process(d, bufs, num_workers);
579 
580 	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
581 
582 	rte_distributor_process(d, NULL, 0);
583 	rte_distributor_flush(d);
584 	rte_eal_mp_wait_lcore();
585 	quit = 0;
586 	worker_idx = 0;
587 }
588 
/* Top-level autotest entry: runs the full test suite against both the
 * single-packet and burst distributor implementations.
 */
static int
test_distributor(void)
{
	/* statics: the distributors and pool persist across repeated
	 * invocations of the autotest and are reused, not recreated */
	static struct rte_distributor *ds;
	static struct rte_distributor *db;
	static struct rte_distributor *dist[2];
	static struct rte_mempool *p;
	int i;

	if (rte_lcore_count() < 2) {
		printf("Not enough cores for distributor_autotest, expecting at least 2\n");
		return TEST_SKIPPED;
	}

	/* create (first run) or drain (reruns) the burst distributor */
	if (db == NULL) {
		db = rte_distributor_create("Test_dist_burst", rte_socket_id(),
				rte_lcore_count() - 1,
				RTE_DIST_ALG_BURST);
		if (db == NULL) {
			printf("Error creating burst distributor\n");
			return -1;
		}
	} else {
		rte_distributor_flush(db);
		rte_distributor_clear_returns(db);
	}

	/* same for the single-packet distributor */
	if (ds == NULL) {
		ds = rte_distributor_create("Test_dist_single",
				rte_socket_id(),
				rte_lcore_count() - 1,
			RTE_DIST_ALG_SINGLE);
		if (ds == NULL) {
			printf("Error creating single distributor\n");
			return -1;
		}
	} else {
		rte_distributor_flush(ds);
		rte_distributor_clear_returns(ds);
	}

	/* size the pool to cover the worst case: either per-lcore demand or
	 * nearly twice BIG_BATCH, whichever is larger */
	const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
			(BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
	if (p == NULL) {
		p = rte_pktmbuf_pool_create("DT_MBUF_POOL", nb_bufs, BURST,
			0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
		if (p == NULL) {
			printf("Error creating mempool\n");
			return -1;
		}
	}

	dist[0] = ds;
	dist[1] = db;

	/* run the whole suite once per distributor variant */
	for (i = 0; i < 2; i++) {

		worker_params.dist = dist[i];
		if (i)
			strlcpy(worker_params.name, "burst",
					sizeof(worker_params.name));
		else
			strlcpy(worker_params.name, "single",
					sizeof(worker_params.name));

		/* launch workers on every other lcore, run the test on this
		 * one, then shut the workers down between tests */
		rte_eal_mp_remote_launch(handle_work,
				&worker_params, SKIP_MASTER);
		if (sanity_test(&worker_params, p) < 0)
			goto err;
		quit_workers(&worker_params, p);

		rte_eal_mp_remote_launch(handle_work_with_free_mbufs,
				&worker_params, SKIP_MASTER);
		if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0)
			goto err;
		quit_workers(&worker_params, p);

		/* shutdown tests need a worker to kill plus a survivor,
		 * i.e. at least 3 lcores in total */
		if (rte_lcore_count() > 2) {
			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
					&worker_params,
					SKIP_MASTER);
			if (sanity_test_with_worker_shutdown(&worker_params,
					p) < 0)
				goto err;
			quit_workers(&worker_params, p);

			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
					&worker_params,
					SKIP_MASTER);
			if (test_flush_with_worker_shutdown(&worker_params,
					p) < 0)
				goto err;
			quit_workers(&worker_params, p);

		} else {
			printf("Too few cores to run worker shutdown test\n");
		}

	}

	/* parameter-validation tests need no workers */
	if (test_error_distributor_create_numworkers() == -1 ||
			test_error_distributor_create_name() == -1) {
		printf("rte_distributor_create parameter check tests failed");
		return -1;
	}

	return 0;

err:
	/* make sure the workers terminate before reporting failure */
	quit_workers(&worker_params, p);
	return -1;
}
701 
/* register with the test framework under the name "distributor_autotest" */
REGISTER_TEST_COMMAND(distributor_autotest, test_distributor);
703