xref: /dpdk/app/test-eventdev/test_pipeline_atq.c (revision daa02b5cddbb8e11b31d41e2bf7bb1ae64dcae2f)
1 /*
2  * SPDX-License-Identifier: BSD-3-Clause
3  * Copyright 2017 Cavium, Inc.
4  */
5 
6 #include "test_pipeline_common.h"
7 
8 /* See http://doc.dpdk.org/guides/tools/testeventdev.html for test details */
9 
10 static __rte_always_inline int
11 pipeline_atq_nb_event_queues(struct evt_options *opt)
12 {
13 	RTE_SET_USED(opt);
14 
15 	return rte_eth_dev_count_avail();
16 }
17 
18 typedef int (*pipeline_atq_worker_t)(void *arg);
19 
20 static __rte_noinline int
21 pipeline_atq_worker_single_stage_tx(void *arg)
22 {
23 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
24 
25 	while (t->done == false) {
26 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
27 
28 		if (!event) {
29 			rte_pause();
30 			continue;
31 		}
32 
33 		pipeline_event_tx(dev, port, &ev);
34 		w->processed_pkts++;
35 	}
36 
37 	return 0;
38 }
39 
40 static __rte_noinline int
41 pipeline_atq_worker_single_stage_fwd(void *arg)
42 {
43 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
44 	const uint8_t *tx_queue = t->tx_evqueue_id;
45 
46 	while (t->done == false) {
47 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
48 
49 		if (!event) {
50 			rte_pause();
51 			continue;
52 		}
53 
54 		ev.queue_id = tx_queue[ev.mbuf->port];
55 		pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
56 		pipeline_event_enqueue(dev, port, &ev);
57 		w->processed_pkts++;
58 	}
59 
60 	return 0;
61 }
62 
63 static __rte_noinline int
64 pipeline_atq_worker_single_stage_burst_tx(void *arg)
65 {
66 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
67 
68 	while (t->done == false) {
69 		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
70 				BURST_SIZE, 0);
71 
72 		if (!nb_rx) {
73 			rte_pause();
74 			continue;
75 		}
76 
77 		for (i = 0; i < nb_rx; i++) {
78 			rte_prefetch0(ev[i + 1].mbuf);
79 			rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
80 		}
81 
82 		pipeline_event_tx_burst(dev, port, ev, nb_rx);
83 		w->processed_pkts += nb_rx;
84 	}
85 
86 	return 0;
87 }
88 
89 static __rte_noinline int
90 pipeline_atq_worker_single_stage_burst_fwd(void *arg)
91 {
92 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
93 	const uint8_t *tx_queue = t->tx_evqueue_id;
94 
95 	while (t->done == false) {
96 		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
97 				BURST_SIZE, 0);
98 
99 		if (!nb_rx) {
100 			rte_pause();
101 			continue;
102 		}
103 
104 		for (i = 0; i < nb_rx; i++) {
105 			rte_prefetch0(ev[i + 1].mbuf);
106 			rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
107 			ev[i].queue_id = tx_queue[ev[i].mbuf->port];
108 			pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
109 		}
110 
111 		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
112 		w->processed_pkts += nb_rx;
113 	}
114 
115 	return 0;
116 }
117 
118 static __rte_noinline int
119 pipeline_atq_worker_single_stage_tx_vector(void *arg)
120 {
121 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
122 	uint16_t vector_sz;
123 
124 	while (!t->done) {
125 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
126 
127 		if (!event) {
128 			rte_pause();
129 			continue;
130 		}
131 		vector_sz = ev.vec->nb_elem;
132 		pipeline_event_tx_vector(dev, port, &ev);
133 		w->processed_pkts += vector_sz;
134 	}
135 
136 	return 0;
137 }
138 
139 static __rte_noinline int
140 pipeline_atq_worker_single_stage_fwd_vector(void *arg)
141 {
142 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
143 	const uint8_t *tx_queue = t->tx_evqueue_id;
144 	uint16_t vector_sz;
145 
146 	while (!t->done) {
147 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
148 
149 		if (!event) {
150 			rte_pause();
151 			continue;
152 		}
153 
154 		vector_sz = ev.vec->nb_elem;
155 		ev.queue_id = tx_queue[ev.vec->port];
156 		ev.vec->queue = 0;
157 		pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
158 		pipeline_event_enqueue(dev, port, &ev);
159 		w->processed_pkts += vector_sz;
160 	}
161 
162 	return 0;
163 }
164 
165 static __rte_noinline int
166 pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
167 {
168 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
169 	uint16_t vector_sz;
170 
171 	while (!t->done) {
172 		uint16_t nb_rx =
173 			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
174 
175 		if (!nb_rx) {
176 			rte_pause();
177 			continue;
178 		}
179 		vector_sz = 0;
180 		for (i = 0; i < nb_rx; i++) {
181 			vector_sz += ev[i].vec->nb_elem;
182 			ev[i].vec->queue = 0;
183 		}
184 
185 		pipeline_event_tx_burst(dev, port, ev, nb_rx);
186 		w->processed_pkts += vector_sz;
187 	}
188 
189 	return 0;
190 }
191 
192 static __rte_noinline int
193 pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
194 {
195 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
196 	const uint8_t *tx_queue = t->tx_evqueue_id;
197 	uint16_t vector_sz;
198 
199 	while (!t->done) {
200 		uint16_t nb_rx =
201 			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
202 
203 		if (!nb_rx) {
204 			rte_pause();
205 			continue;
206 		}
207 
208 		vector_sz = 0;
209 		for (i = 0; i < nb_rx; i++) {
210 			ev[i].queue_id = tx_queue[ev[i].vec->port];
211 			ev[i].vec->queue = 0;
212 			vector_sz += ev[i].vec->nb_elem;
213 			pipeline_fwd_event_vector(&ev[i],
214 						  RTE_SCHED_TYPE_ATOMIC);
215 		}
216 
217 		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
218 		w->processed_pkts += vector_sz;
219 	}
220 
221 	return 0;
222 }
223 
224 static __rte_noinline int
225 pipeline_atq_worker_multi_stage_tx(void *arg)
226 {
227 	PIPELINE_WORKER_MULTI_STAGE_INIT;
228 
229 	while (t->done == false) {
230 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
231 
232 		if (!event) {
233 			rte_pause();
234 			continue;
235 		}
236 
237 		cq_id = ev.sub_event_type % nb_stages;
238 
239 		if (cq_id == last_queue) {
240 			pipeline_event_tx(dev, port, &ev);
241 			w->processed_pkts++;
242 			continue;
243 		}
244 
245 		ev.sub_event_type++;
246 		pipeline_fwd_event(&ev, sched_type_list[cq_id]);
247 		pipeline_event_enqueue(dev, port, &ev);
248 	}
249 
250 	return 0;
251 }
252 
253 static __rte_noinline int
254 pipeline_atq_worker_multi_stage_fwd(void *arg)
255 {
256 	PIPELINE_WORKER_MULTI_STAGE_INIT;
257 	const uint8_t *tx_queue = t->tx_evqueue_id;
258 
259 	while (t->done == false) {
260 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
261 
262 		if (!event) {
263 			rte_pause();
264 			continue;
265 		}
266 
267 		cq_id = ev.sub_event_type % nb_stages;
268 
269 		if (cq_id == last_queue) {
270 			ev.queue_id = tx_queue[ev.mbuf->port];
271 			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
272 			w->processed_pkts++;
273 		} else {
274 			ev.sub_event_type++;
275 			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
276 		}
277 
278 		pipeline_event_enqueue(dev, port, &ev);
279 	}
280 
281 	return 0;
282 }
283 
284 static __rte_noinline int
285 pipeline_atq_worker_multi_stage_burst_tx(void *arg)
286 {
287 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
288 
289 	while (t->done == false) {
290 		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
291 				BURST_SIZE, 0);
292 
293 		if (!nb_rx) {
294 			rte_pause();
295 			continue;
296 		}
297 
298 		for (i = 0; i < nb_rx; i++) {
299 			rte_prefetch0(ev[i + 1].mbuf);
300 			cq_id = ev[i].sub_event_type % nb_stages;
301 
302 			if (cq_id == last_queue) {
303 				pipeline_event_tx(dev, port, &ev[i]);
304 				ev[i].op = RTE_EVENT_OP_RELEASE;
305 				w->processed_pkts++;
306 				continue;
307 			}
308 
309 			ev[i].sub_event_type++;
310 			pipeline_fwd_event(&ev[i], sched_type_list[cq_id]);
311 		}
312 
313 		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
314 	}
315 
316 	return 0;
317 }
318 
319 static __rte_noinline int
320 pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
321 {
322 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
323 	const uint8_t *tx_queue = t->tx_evqueue_id;
324 
325 	while (t->done == false) {
326 		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
327 				BURST_SIZE, 0);
328 
329 		if (!nb_rx) {
330 			rte_pause();
331 			continue;
332 		}
333 
334 		for (i = 0; i < nb_rx; i++) {
335 			rte_prefetch0(ev[i + 1].mbuf);
336 			cq_id = ev[i].sub_event_type % nb_stages;
337 
338 			if (cq_id == last_queue) {
339 				w->processed_pkts++;
340 				ev[i].queue_id = tx_queue[ev[i].mbuf->port];
341 				pipeline_fwd_event(&ev[i],
342 						RTE_SCHED_TYPE_ATOMIC);
343 			} else {
344 				ev[i].sub_event_type++;
345 				pipeline_fwd_event(&ev[i],
346 						sched_type_list[cq_id]);
347 			}
348 		}
349 
350 		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
351 	}
352 
353 	return 0;
354 }
355 
356 static __rte_noinline int
357 pipeline_atq_worker_multi_stage_tx_vector(void *arg)
358 {
359 	PIPELINE_WORKER_MULTI_STAGE_INIT;
360 	uint16_t vector_sz;
361 
362 	while (!t->done) {
363 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
364 
365 		if (!event) {
366 			rte_pause();
367 			continue;
368 		}
369 
370 		cq_id = ev.sub_event_type % nb_stages;
371 
372 		if (cq_id == last_queue) {
373 			vector_sz = ev.vec->nb_elem;
374 			pipeline_event_tx_vector(dev, port, &ev);
375 			w->processed_pkts += vector_sz;
376 			continue;
377 		}
378 
379 		ev.sub_event_type++;
380 		pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
381 		pipeline_event_enqueue(dev, port, &ev);
382 	}
383 
384 	return 0;
385 }
386 
387 static __rte_noinline int
388 pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
389 {
390 	PIPELINE_WORKER_MULTI_STAGE_INIT;
391 	const uint8_t *tx_queue = t->tx_evqueue_id;
392 	uint16_t vector_sz;
393 
394 	while (!t->done) {
395 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
396 
397 		if (!event) {
398 			rte_pause();
399 			continue;
400 		}
401 
402 		cq_id = ev.sub_event_type % nb_stages;
403 
404 		if (cq_id == last_queue) {
405 			ev.queue_id = tx_queue[ev.vec->port];
406 			ev.vec->queue = 0;
407 			vector_sz = ev.vec->nb_elem;
408 			pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
409 			pipeline_event_enqueue(dev, port, &ev);
410 			w->processed_pkts += vector_sz;
411 		} else {
412 			ev.sub_event_type++;
413 			pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
414 			pipeline_event_enqueue(dev, port, &ev);
415 		}
416 	}
417 
418 	return 0;
419 }
420 
421 static __rte_noinline int
422 pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
423 {
424 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
425 	uint16_t vector_sz;
426 
427 	while (!t->done) {
428 		uint16_t nb_rx =
429 			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
430 
431 		if (!nb_rx) {
432 			rte_pause();
433 			continue;
434 		}
435 
436 		for (i = 0; i < nb_rx; i++) {
437 			cq_id = ev[i].sub_event_type % nb_stages;
438 
439 			if (cq_id == last_queue) {
440 				vector_sz = ev[i].vec->nb_elem;
441 				pipeline_event_tx_vector(dev, port, &ev[i]);
442 				ev[i].op = RTE_EVENT_OP_RELEASE;
443 				w->processed_pkts += vector_sz;
444 				continue;
445 			}
446 
447 			ev[i].sub_event_type++;
448 			pipeline_fwd_event_vector(&ev[i],
449 						  sched_type_list[cq_id]);
450 		}
451 
452 		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
453 	}
454 
455 	return 0;
456 }
457 
458 static __rte_noinline int
459 pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
460 {
461 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
462 	const uint8_t *tx_queue = t->tx_evqueue_id;
463 	uint16_t vector_sz;
464 
465 	while (!t->done) {
466 		uint16_t nb_rx =
467 			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
468 
469 		if (!nb_rx) {
470 			rte_pause();
471 			continue;
472 		}
473 
474 		for (i = 0; i < nb_rx; i++) {
475 			cq_id = ev[i].sub_event_type % nb_stages;
476 
477 			if (cq_id == last_queue) {
478 				vector_sz = ev[i].vec->nb_elem;
479 				ev[i].queue_id = tx_queue[ev[i].vec->port];
480 				ev[i].vec->queue = 0;
481 				pipeline_fwd_event_vector(
482 					&ev[i], RTE_SCHED_TYPE_ATOMIC);
483 				w->processed_pkts += vector_sz;
484 			} else {
485 				ev[i].sub_event_type++;
486 				pipeline_fwd_event_vector(
487 					&ev[i], sched_type_list[cq_id]);
488 			}
489 		}
490 
491 		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
492 	}
493 
494 	return 0;
495 }
496 
497 static int
498 worker_wrapper(void *arg)
499 {
500 	struct worker_data *w  = arg;
501 	struct evt_options *opt = w->t->opt;
502 	const bool burst = evt_has_burst_mode(w->dev_id);
503 	const bool internal_port = w->t->internal_port;
504 	const uint8_t nb_stages = opt->nb_stages;
505 	/*vector/burst/internal_port*/
506 	const pipeline_atq_worker_t
507 	pipeline_atq_worker_single_stage[2][2][2] = {
508 		[0][0][0] = pipeline_atq_worker_single_stage_fwd,
509 		[0][0][1] = pipeline_atq_worker_single_stage_tx,
510 		[0][1][0] = pipeline_atq_worker_single_stage_burst_fwd,
511 		[0][1][1] = pipeline_atq_worker_single_stage_burst_tx,
512 		[1][0][0] = pipeline_atq_worker_single_stage_fwd_vector,
513 		[1][0][1] = pipeline_atq_worker_single_stage_tx_vector,
514 		[1][1][0] = pipeline_atq_worker_single_stage_burst_fwd_vector,
515 		[1][1][1] = pipeline_atq_worker_single_stage_burst_tx_vector,
516 	};
517 	const pipeline_atq_worker_t
518 	pipeline_atq_worker_multi_stage[2][2][2] = {
519 		[0][0][0] = pipeline_atq_worker_multi_stage_fwd,
520 		[0][0][1] = pipeline_atq_worker_multi_stage_tx,
521 		[0][1][0] = pipeline_atq_worker_multi_stage_burst_fwd,
522 		[0][1][1] = pipeline_atq_worker_multi_stage_burst_tx,
523 		[1][0][0] = pipeline_atq_worker_multi_stage_fwd_vector,
524 		[1][0][1] = pipeline_atq_worker_multi_stage_tx_vector,
525 		[1][1][0] = pipeline_atq_worker_multi_stage_burst_fwd_vector,
526 		[1][1][1] = pipeline_atq_worker_multi_stage_burst_tx_vector,
527 	};
528 
529 	if (nb_stages == 1)
530 		return (pipeline_atq_worker_single_stage[opt->ena_vector][burst]
531 							[internal_port])(arg);
532 	else
533 		return (pipeline_atq_worker_multi_stage[opt->ena_vector][burst]
534 						       [internal_port])(arg);
535 
536 	rte_panic("invalid worker\n");
537 }
538 
539 static int
540 pipeline_atq_launch_lcores(struct evt_test *test, struct evt_options *opt)
541 {
542 	return pipeline_launch_lcores(test, opt, worker_wrapper);
543 }
544 
545 static int
546 pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
547 {
548 	int ret;
549 	int nb_ports;
550 	int nb_queues;
551 	uint8_t queue, is_prod;
552 	uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS];
553 	uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
554 	uint8_t nb_worker_queues = 0;
555 	uint8_t tx_evport_id = 0;
556 	uint16_t prod = 0;
557 	struct rte_event_dev_info info;
558 	struct test_pipeline *t = evt_test_priv(test);
559 
560 	nb_ports = evt_nr_active_lcores(opt->wlcores);
561 	nb_queues = rte_eth_dev_count_avail();
562 
563 	memset(tx_evqueue_id, 0, sizeof(uint8_t) * RTE_MAX_ETHPORTS);
564 	memset(queue_arr, 0, sizeof(uint8_t) * RTE_EVENT_MAX_QUEUES_PER_DEV);
565 	/* One queue for Tx adapter per port */
566 	if (!t->internal_port) {
567 		RTE_ETH_FOREACH_DEV(prod) {
568 			tx_evqueue_id[prod] = nb_queues;
569 			nb_queues++;
570 		}
571 	}
572 
573 	rte_event_dev_info_get(opt->dev_id, &info);
574 
575 	ret = evt_configure_eventdev(opt, nb_queues, nb_ports);
576 	if (ret) {
577 		evt_err("failed to configure eventdev %d", opt->dev_id);
578 		return ret;
579 	}
580 
581 	struct rte_event_queue_conf q_conf = {
582 		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
583 		.nb_atomic_flows = opt->nb_flows,
584 		.nb_atomic_order_sequences = opt->nb_flows,
585 	};
586 	/* queue configurations */
587 	for (queue = 0; queue < nb_queues; queue++) {
588 		q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES;
589 
590 		if (!t->internal_port) {
591 			is_prod = false;
592 			RTE_ETH_FOREACH_DEV(prod) {
593 				if (queue == tx_evqueue_id[prod]) {
594 					q_conf.event_queue_cfg =
595 						RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
596 					is_prod = true;
597 					break;
598 				}
599 			}
600 			if (!is_prod) {
601 				queue_arr[nb_worker_queues] = queue;
602 				nb_worker_queues++;
603 			}
604 		}
605 
606 		ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
607 		if (ret) {
608 			evt_err("failed to setup queue=%d", queue);
609 			return ret;
610 		}
611 	}
612 
613 	if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
614 		opt->wkr_deq_dep = info.max_event_port_dequeue_depth;
615 
616 	/* port configuration */
617 	const struct rte_event_port_conf p_conf = {
618 		.dequeue_depth = opt->wkr_deq_dep,
619 		.enqueue_depth = info.max_event_port_dequeue_depth,
620 		.new_event_threshold = info.max_num_events,
621 	};
622 
623 	if (!t->internal_port)
624 		ret = pipeline_event_port_setup(test, opt, queue_arr,
625 				nb_worker_queues, p_conf);
626 	else
627 		ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
628 				p_conf);
629 
630 	if (ret)
631 		return ret;
632 
633 	/*
634 	 * The pipelines are setup in the following manner:
635 	 *
636 	 * eth_dev_count = 2, nb_stages = 2, atq mode
637 	 *
638 	 * eth0, eth1 have Internal port capability :
639 	 *	queues = 2
640 	 *	stride = 1
641 	 *
642 	 *	event queue pipelines:
643 	 *	eth0 -> q0 ->Tx
644 	 *	eth1 -> q1 ->Tx
645 	 *
646 	 *	q0, q1 are configured as ATQ so, all the different stages can
647 	 *	be enqueued on the same queue.
648 	 *
649 	 * eth0, eth1 use Tx adapters service core :
650 	 *	queues = 4
651 	 *	stride = 1
652 	 *
653 	 *	event queue pipelines:
654 	 *	eth0 -> q0  -> q2 -> Tx
655 	 *	eth1 -> q1  -> q3 -> Tx
656 	 *
657 	 *	q0, q1 are configured as stated above.
658 	 *	q2, q3 configured as SINGLE_LINK.
659 	 */
660 	ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf);
661 	if (ret)
662 		return ret;
663 	ret = pipeline_event_tx_adapter_setup(opt, p_conf);
664 	if (ret)
665 		return ret;
666 
667 	if (!evt_has_distributed_sched(opt->dev_id)) {
668 		uint32_t service_id;
669 		rte_event_dev_service_id_get(opt->dev_id, &service_id);
670 		ret = evt_service_setup(service_id);
671 		if (ret) {
672 			evt_err("No service lcore found to run event dev.");
673 			return ret;
674 		}
675 	}
676 
677 	/* Connect the tx_evqueue_id to the Tx adapter port */
678 	if (!t->internal_port) {
679 		RTE_ETH_FOREACH_DEV(prod) {
680 			ret = rte_event_eth_tx_adapter_event_port_get(prod,
681 					&tx_evport_id);
682 			if (ret) {
683 				evt_err("Unable to get Tx adapter[%d]", prod);
684 				return ret;
685 			}
686 
687 			if (rte_event_port_link(opt->dev_id, tx_evport_id,
688 						&tx_evqueue_id[prod],
689 						NULL, 1) != 1) {
690 				evt_err("Unable to link Tx adptr[%d] evprt[%d]",
691 						prod, tx_evport_id);
692 				return ret;
693 			}
694 		}
695 	}
696 
697 	ret = rte_event_dev_start(opt->dev_id);
698 	if (ret) {
699 		evt_err("failed to start eventdev %d", opt->dev_id);
700 		return ret;
701 	}
702 
703 
704 	RTE_ETH_FOREACH_DEV(prod) {
705 		ret = rte_eth_dev_start(prod);
706 		if (ret) {
707 			evt_err("Ethernet dev [%d] failed to start."
708 					" Using synthetic producer", prod);
709 			return ret;
710 		}
711 	}
712 
713 	RTE_ETH_FOREACH_DEV(prod) {
714 		ret = rte_event_eth_rx_adapter_start(prod);
715 		if (ret) {
716 			evt_err("Rx adapter[%d] start failed", prod);
717 			return ret;
718 		}
719 
720 		ret = rte_event_eth_tx_adapter_start(prod);
721 		if (ret) {
722 			evt_err("Tx adapter[%d] start failed", prod);
723 			return ret;
724 		}
725 	}
726 
727 	memcpy(t->tx_evqueue_id, tx_evqueue_id, sizeof(uint8_t) *
728 			RTE_MAX_ETHPORTS);
729 
730 	return 0;
731 }
732 
733 static void
734 pipeline_atq_opt_dump(struct evt_options *opt)
735 {
736 	pipeline_opt_dump(opt, pipeline_atq_nb_event_queues(opt));
737 }
738 
739 static int
740 pipeline_atq_opt_check(struct evt_options *opt)
741 {
742 	return pipeline_opt_check(opt, pipeline_atq_nb_event_queues(opt));
743 }
744 
745 static bool
746 pipeline_atq_capability_check(struct evt_options *opt)
747 {
748 	struct rte_event_dev_info dev_info;
749 
750 	rte_event_dev_info_get(opt->dev_id, &dev_info);
751 	if (dev_info.max_event_queues < pipeline_atq_nb_event_queues(opt) ||
752 			dev_info.max_event_ports <
753 			evt_nr_active_lcores(opt->wlcores)) {
754 		evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
755 			pipeline_atq_nb_event_queues(opt),
756 			dev_info.max_event_queues,
757 			evt_nr_active_lcores(opt->wlcores),
758 			dev_info.max_event_ports);
759 	}
760 	if (!evt_has_all_types_queue(opt->dev_id))
761 		return false;
762 
763 	return true;
764 }
765 
766 static const struct evt_test_ops pipeline_atq =  {
767 	.cap_check          = pipeline_atq_capability_check,
768 	.opt_check          = pipeline_atq_opt_check,
769 	.opt_dump           = pipeline_atq_opt_dump,
770 	.test_setup         = pipeline_test_setup,
771 	.mempool_setup      = pipeline_mempool_setup,
772 	.ethdev_setup	    = pipeline_ethdev_setup,
773 	.eventdev_setup     = pipeline_atq_eventdev_setup,
774 	.launch_lcores      = pipeline_atq_launch_lcores,
775 	.eventdev_destroy   = pipeline_eventdev_destroy,
776 	.mempool_destroy    = pipeline_mempool_destroy,
777 	.ethdev_destroy	    = pipeline_ethdev_destroy,
778 	.test_result        = pipeline_test_result,
779 	.test_destroy       = pipeline_test_destroy,
780 };
781 
782 EVT_TEST_REGISTER(pipeline_atq);
783