xref: /dpdk/app/test-eventdev/test_pipeline_queue.c (revision 99a2dd955fba6e4cc23b77d590a033650ced9c45)
/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2017 Cavium, Inc.
 */

#include "test_pipeline_common.h"

/* See http://doc.dpdk.org/guides/tools/testeventdev.html for test details */

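/*
 * Illustrative invocation (example options only, assuming two worker
 * lcores and an ethdev producer):
 *
 * dpdk-test-eventdev -l 0-3 -- --test=pipeline_queue --wlcores=2-3 \
 *	--stlist=a --nb_stages=2 --prod_type_ethdev
 *
 * Each ethdev contributes one event queue per stage plus one Tx queue,
 * so nb_queues = eth_count * nb_stages + eth_count.
 */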
static __rte_always_inline int
pipeline_queue_nb_event_queues(struct evt_options *opt)
{
	uint16_t eth_count = rte_eth_dev_count_avail();

	return (eth_count * opt->nb_stages) + eth_count;
}

typedef int (*pipeline_queue_worker_t)(void *arg);

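/*
 * Single-stage worker, internal Tx port: an ATOMIC event has already
 * been forwarded to the per-port Tx queue and is transmitted directly;
 * any other event is forwarded to the next queue as ATOMIC first.
 */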
static __rte_noinline int
pipeline_queue_worker_single_stage_tx(void *arg)
{
	PIPELINE_WORKER_SINGLE_STAGE_INIT;

	while (t->done == false) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
			pipeline_event_tx(dev, port, &ev);
			w->processed_pkts++;
		} else {
			ev.queue_id++;
			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			pipeline_event_enqueue(dev, port, &ev);
		}
	}

	return 0;
}

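/*
 * Single-stage worker, no internal Tx port: retag the event with the
 * Tx adapter's event queue for the mbuf's port, pin it to Tx queue 0
 * and enqueue it for the Tx adapter to transmit.
 */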
static __rte_noinline int
pipeline_queue_worker_single_stage_fwd(void *arg)
{
	PIPELINE_WORKER_SINGLE_STAGE_INIT;
	const uint8_t *tx_queue = t->tx_evqueue_id;

	while (t->done == false) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		ev.queue_id = tx_queue[ev.mbuf->port];
		rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
		pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
		pipeline_event_enqueue(dev, port, &ev);
		w->processed_pkts++;
	}

	return 0;
}

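/*
 * Burst variants of the same two workers: dequeue up to BURST_SIZE
 * events at a time and prefetch the next event's mbuf while handling
 * the current one.
 */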
static __rte_noinline int
pipeline_queue_worker_single_stage_burst_tx(void *arg)
{
	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;

	while (t->done == false) {
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
				pipeline_event_tx(dev, port, &ev[i]);
				/* Transmitted already; release on enqueue */
				ev[i].op = RTE_EVENT_OP_RELEASE;
				w->processed_pkts++;
			} else {
				ev[i].queue_id++;
				pipeline_fwd_event(&ev[i],
						RTE_SCHED_TYPE_ATOMIC);
			}
		}

		/* Enqueue the burst once, after the whole loop */
		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
	}

	return 0;
}

static __rte_noinline int
pipeline_queue_worker_single_stage_burst_fwd(void *arg)
{
	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
	const uint8_t *tx_queue = t->tx_evqueue_id;

	while (t->done == false) {
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			ev[i].queue_id = tx_queue[ev[i].mbuf->port];
			rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
			pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
		w->processed_pkts += nb_rx;
	}

	return 0;
}

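/*
 * Vector variants: each event carries an rte_event_vector of mbufs,
 * so the packet counter is advanced by vec->nb_elem rather than by 1.
 */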
static __rte_noinline int
pipeline_queue_worker_single_stage_tx_vector(void *arg)
{
	PIPELINE_WORKER_SINGLE_STAGE_INIT;
	uint16_t vector_sz;

	while (!t->done) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
			vector_sz = ev.vec->nb_elem;
			pipeline_event_tx_vector(dev, port, &ev);
			w->processed_pkts += vector_sz;
		} else {
			ev.queue_id++;
			pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
			pipeline_event_enqueue(dev, port, &ev);
		}
	}

	return 0;
}

static __rte_noinline int
pipeline_queue_worker_single_stage_fwd_vector(void *arg)
{
	PIPELINE_WORKER_SINGLE_STAGE_INIT;
	const uint8_t *tx_queue = t->tx_evqueue_id;
	uint16_t vector_sz;

	while (!t->done) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		ev.queue_id = tx_queue[ev.vec->port];
		ev.vec->queue = 0;
		vector_sz = ev.vec->nb_elem;
		pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
		pipeline_event_enqueue(dev, port, &ev);
		w->processed_pkts += vector_sz;
	}

	return 0;
}

static __rte_noinline int
pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
{
	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
	uint16_t vector_sz;

	while (!t->done) {
		uint16_t nb_rx =
			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
				vector_sz = ev[i].vec->nb_elem;
				pipeline_event_tx_vector(dev, port, &ev[i]);
				ev[i].op = RTE_EVENT_OP_RELEASE;
				w->processed_pkts += vector_sz;
			} else {
				ev[i].queue_id++;
				pipeline_fwd_event_vector(
					&ev[i], RTE_SCHED_TYPE_ATOMIC);
			}
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
	}

	return 0;
}

static __rte_noinline int
pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg)
{
	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
	const uint8_t *tx_queue = t->tx_evqueue_id;
	uint16_t vector_sz;

	while (!t->done) {
		uint16_t nb_rx =
			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		vector_sz = 0;
		for (i = 0; i < nb_rx; i++) {
			ev[i].queue_id = tx_queue[ev[i].vec->port];
			ev[i].vec->queue = 0;
			vector_sz += ev[i].vec->nb_elem;
			pipeline_fwd_event_vector(&ev[i],
						  RTE_SCHED_TYPE_ATOMIC);
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
		w->processed_pkts += vector_sz;
	}

	return 0;
}

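/*
 * Multi-stage workers: cq_id = queue_id % nb_stages identifies the
 * pipeline stage. Events on the last stage are routed to the Tx queue
 * (or transmitted directly when the device has an internal Tx port);
 * all other events advance one queue, keeping that stage's sched type.
 */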
static __rte_noinline int
pipeline_queue_worker_multi_stage_tx(void *arg)
{
	PIPELINE_WORKER_MULTI_STAGE_INIT;
	const uint8_t *tx_queue = t->tx_evqueue_id;

	while (t->done == false) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		cq_id = ev.queue_id % nb_stages;

		if (ev.queue_id == tx_queue[ev.mbuf->port]) {
			pipeline_event_tx(dev, port, &ev);
			w->processed_pkts++;
			continue;
		}

		ev.queue_id++;
		pipeline_fwd_event(&ev, cq_id != last_queue ?
				sched_type_list[cq_id] :
				RTE_SCHED_TYPE_ATOMIC);
		pipeline_event_enqueue(dev, port, &ev);
	}

	return 0;
}

static __rte_noinline int
pipeline_queue_worker_multi_stage_fwd(void *arg)
{
	PIPELINE_WORKER_MULTI_STAGE_INIT;
	const uint8_t *tx_queue = t->tx_evqueue_id;

	while (t->done == false) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		cq_id = ev.queue_id % nb_stages;

		if (cq_id == last_queue) {
			ev.queue_id = tx_queue[ev.mbuf->port];
			rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			pipeline_event_enqueue(dev, port, &ev);
			w->processed_pkts++;
		} else {
			ev.queue_id++;
			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
			pipeline_event_enqueue(dev, port, &ev);
		}
	}

	return 0;
}

static __rte_noinline int
pipeline_queue_worker_multi_stage_burst_tx(void *arg)
{
	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
	const uint8_t *tx_queue = t->tx_evqueue_id;

	while (t->done == false) {
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			cq_id = ev[i].queue_id % nb_stages;

			if (ev[i].queue_id == tx_queue[ev[i].mbuf->port]) {
				pipeline_event_tx(dev, port, &ev[i]);
				/* Transmitted already; release on enqueue */
				ev[i].op = RTE_EVENT_OP_RELEASE;
				w->processed_pkts++;
				continue;
			}

			ev[i].queue_id++;
			pipeline_fwd_event(&ev[i], cq_id != last_queue ?
					sched_type_list[cq_id] :
					RTE_SCHED_TYPE_ATOMIC);
		}

		/* Enqueue the burst once, after the whole loop */
		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
	}

	return 0;
}

static __rte_noinline int
pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
{
	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
	const uint8_t *tx_queue = t->tx_evqueue_id;

	while (t->done == false) {
		uint16_t processed_pkts = 0;
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			cq_id = ev[i].queue_id % nb_stages;

			if (cq_id == last_queue) {
				ev[i].queue_id = tx_queue[ev[i].mbuf->port];
				rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
				pipeline_fwd_event(&ev[i],
						RTE_SCHED_TYPE_ATOMIC);
				processed_pkts++;
			} else {
				ev[i].queue_id++;
				pipeline_fwd_event(&ev[i],
						sched_type_list[cq_id]);
			}
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
		w->processed_pkts += processed_pkts;
	}

	return 0;
}

static __rte_noinline int
pipeline_queue_worker_multi_stage_tx_vector(void *arg)
{
	PIPELINE_WORKER_MULTI_STAGE_INIT;
	const uint8_t *tx_queue = t->tx_evqueue_id;
	uint16_t vector_sz;

	while (!t->done) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		cq_id = ev.queue_id % nb_stages;

		if (ev.queue_id == tx_queue[ev.vec->port]) {
			vector_sz = ev.vec->nb_elem;
			pipeline_event_tx_vector(dev, port, &ev);
			w->processed_pkts += vector_sz;
			continue;
		}

		ev.queue_id++;
		pipeline_fwd_event_vector(&ev, cq_id != last_queue
						       ? sched_type_list[cq_id]
						       : RTE_SCHED_TYPE_ATOMIC);
		pipeline_event_enqueue(dev, port, &ev);
	}

	return 0;
}

static __rte_noinline int
pipeline_queue_worker_multi_stage_fwd_vector(void *arg)
{
	PIPELINE_WORKER_MULTI_STAGE_INIT;
	const uint8_t *tx_queue = t->tx_evqueue_id;
	uint16_t vector_sz;

	while (!t->done) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		cq_id = ev.queue_id % nb_stages;

		if (cq_id == last_queue) {
			vector_sz = ev.vec->nb_elem;
			ev.queue_id = tx_queue[ev.vec->port];
			/* Pin the vector to Tx queue 0, as the scalar path does */
			ev.vec->queue = 0;
			pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
			w->processed_pkts += vector_sz;
		} else {
			ev.queue_id++;
			pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
		}

		pipeline_event_enqueue(dev, port, &ev);
	}

	return 0;
}

static __rte_noinline int
pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
{
	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
	const uint8_t *tx_queue = t->tx_evqueue_id;
	uint16_t vector_sz;

	while (!t->done) {
		uint16_t nb_rx =
			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			cq_id = ev[i].queue_id % nb_stages;

			if (ev[i].queue_id == tx_queue[ev[i].vec->port]) {
				vector_sz = ev[i].vec->nb_elem;
				pipeline_event_tx_vector(dev, port, &ev[i]);
				ev[i].op = RTE_EVENT_OP_RELEASE;
				w->processed_pkts += vector_sz;
				continue;
			}

			ev[i].queue_id++;
			pipeline_fwd_event_vector(
				&ev[i], cq_id != last_queue
						? sched_type_list[cq_id]
						: RTE_SCHED_TYPE_ATOMIC);
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
	}

	return 0;
}

static __rte_noinline int
pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg)
{
	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
	const uint8_t *tx_queue = t->tx_evqueue_id;
	uint16_t vector_sz;

	while (!t->done) {
		uint16_t nb_rx =
			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			cq_id = ev[i].queue_id % nb_stages;

			if (cq_id == last_queue) {
				ev[i].queue_id = tx_queue[ev[i].vec->port];
				/* Pin the vector to Tx queue 0, as the scalar path does */
				ev[i].vec->queue = 0;
				vector_sz = ev[i].vec->nb_elem;
				pipeline_fwd_event_vector(
					&ev[i], RTE_SCHED_TYPE_ATOMIC);
				w->processed_pkts += vector_sz;
			} else {
				ev[i].queue_id++;
				pipeline_fwd_event_vector(
					&ev[i], sched_type_list[cq_id]);
			}
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
	}

	return 0;
}

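/*
 * Pick the worker at launch time from a [vector][burst][internal_port]
 * dispatch table, based on the eventdev's capabilities and the number
 * of pipeline stages.
 */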
static int
worker_wrapper(void *arg)
{
	struct worker_data *w = arg;
	struct evt_options *opt = w->t->opt;
	const bool burst = evt_has_burst_mode(w->dev_id);
	const bool internal_port = w->t->internal_port;
	const uint8_t nb_stages = opt->nb_stages;
	/* [vector][burst][internal_port] */
	const pipeline_queue_worker_t
	pipeline_queue_worker_single_stage[2][2][2] = {
		[0][0][0] = pipeline_queue_worker_single_stage_fwd,
		[0][0][1] = pipeline_queue_worker_single_stage_tx,
		[0][1][0] = pipeline_queue_worker_single_stage_burst_fwd,
		[0][1][1] = pipeline_queue_worker_single_stage_burst_tx,
		[1][0][0] = pipeline_queue_worker_single_stage_fwd_vector,
		[1][0][1] = pipeline_queue_worker_single_stage_tx_vector,
		[1][1][0] = pipeline_queue_worker_single_stage_burst_fwd_vector,
		[1][1][1] = pipeline_queue_worker_single_stage_burst_tx_vector,
	};
	const pipeline_queue_worker_t
	pipeline_queue_worker_multi_stage[2][2][2] = {
		[0][0][0] = pipeline_queue_worker_multi_stage_fwd,
		[0][0][1] = pipeline_queue_worker_multi_stage_tx,
		[0][1][0] = pipeline_queue_worker_multi_stage_burst_fwd,
		[0][1][1] = pipeline_queue_worker_multi_stage_burst_tx,
		[1][0][0] = pipeline_queue_worker_multi_stage_fwd_vector,
		[1][0][1] = pipeline_queue_worker_multi_stage_tx_vector,
		[1][1][0] = pipeline_queue_worker_multi_stage_burst_fwd_vector,
		[1][1][1] = pipeline_queue_worker_multi_stage_burst_tx_vector,
	};

	if (nb_stages == 1)
		return (pipeline_queue_worker_single_stage[opt->ena_vector]
							  [burst]
							  [internal_port])(arg);

	return (pipeline_queue_worker_multi_stage[opt->ena_vector]
						 [burst]
						 [internal_port])(arg);
}

static int
pipeline_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
{
	return pipeline_launch_lcores(test, opt, worker_wrapper);
}

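/*
 * Eventdev setup: eth_count * (nb_stages + 1) queues in total. Every
 * (nb_stages + 1)th queue is the ATOMIC Tx queue of one ethdev
 * (single-link when there is no internal Tx port); the rest are worker
 * stage queues. Ports, the Rx/Tx adapters and the scheduler service
 * are then configured, and everything is started.
 */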
static int
pipeline_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
{
	int ret;
	int nb_ports;
	int nb_queues;
	int nb_stages = opt->nb_stages;
	uint8_t queue;
	uint8_t tx_evport_id = 0;
	uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS];
	uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
	uint8_t nb_worker_queues = 0;
	uint16_t prod = 0;
	struct rte_event_dev_info info;
	struct test_pipeline *t = evt_test_priv(test);

	nb_ports = evt_nr_active_lcores(opt->wlcores);
	nb_queues = rte_eth_dev_count_avail() * (nb_stages);

	/* One queue for Tx adapter per port */
	nb_queues += rte_eth_dev_count_avail();

	memset(tx_evqueue_id, 0, sizeof(uint8_t) * RTE_MAX_ETHPORTS);
	memset(queue_arr, 0, sizeof(uint8_t) * RTE_EVENT_MAX_QUEUES_PER_DEV);

	rte_event_dev_info_get(opt->dev_id, &info);
	ret = evt_configure_eventdev(opt, nb_queues, nb_ports);
	if (ret) {
		evt_err("failed to configure eventdev %d", opt->dev_id);
		return ret;
	}

	struct rte_event_queue_conf q_conf = {
			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
			.nb_atomic_flows = opt->nb_flows,
			.nb_atomic_order_sequences = opt->nb_flows,
	};
	/* queue configurations */
	for (queue = 0; queue < nb_queues; queue++) {
		uint8_t slot;

		q_conf.event_queue_cfg = 0;
		slot = queue % (nb_stages + 1);
		if (slot == nb_stages) {
			q_conf.schedule_type = RTE_SCHED_TYPE_ATOMIC;
			if (!t->internal_port) {
				q_conf.event_queue_cfg =
					RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
			}
			tx_evqueue_id[prod++] = queue;
		} else {
			q_conf.schedule_type = opt->sched_type_list[slot];
			queue_arr[nb_worker_queues] = queue;
			nb_worker_queues++;
		}

		ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
		if (ret) {
			evt_err("failed to setup queue=%d", queue);
			return ret;
		}
	}

	if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
		opt->wkr_deq_dep = info.max_event_port_dequeue_depth;

	/* port configuration */
	const struct rte_event_port_conf p_conf = {
			.dequeue_depth = opt->wkr_deq_dep,
			.enqueue_depth = info.max_event_port_dequeue_depth,
			.new_event_threshold = info.max_num_events,
	};

	if (!t->internal_port)
		ret = pipeline_event_port_setup(test, opt, queue_arr,
				nb_worker_queues, p_conf);
	else
		ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
				p_conf);

	if (ret)
		return ret;

	/*
	 * The pipelines are set up in the following manner:
	 *
	 * eth_dev_count = 2, nb_stages = 2.
	 *
	 *	queues = 6
	 *	stride = 3
	 *
	 *	event queue pipelines:
	 *	eth0 -> q0 -> q1 -> (q2->tx)
	 *	eth1 -> q3 -> q4 -> (q5->tx)
	 *
	 *	q2, q5 configured as ATOMIC | SINGLE_LINK
	 */
	ret = pipeline_event_rx_adapter_setup(opt, nb_stages + 1, p_conf);
	if (ret)
		return ret;

	ret = pipeline_event_tx_adapter_setup(opt, p_conf);
	if (ret)
		return ret;

	if (!evt_has_distributed_sched(opt->dev_id)) {
		uint32_t service_id;

		rte_event_dev_service_id_get(opt->dev_id, &service_id);
		ret = evt_service_setup(service_id);
		if (ret) {
			evt_err("No service lcore found to run event dev.");
			return ret;
		}
	}

	/* Connect the tx_evqueue_id to the Tx adapter port */
	if (!t->internal_port) {
		RTE_ETH_FOREACH_DEV(prod) {
			ret = rte_event_eth_tx_adapter_event_port_get(prod,
					&tx_evport_id);
			if (ret) {
				evt_err("Unable to get Tx adptr[%d] evprt[%d]",
						prod, tx_evport_id);
				return ret;
			}

			if (rte_event_port_link(opt->dev_id, tx_evport_id,
						&tx_evqueue_id[prod],
						NULL, 1) != 1) {
				/* ret is 0 here; report the failed link explicitly */
				evt_err("Unable to link Tx adptr[%d] evprt[%d]",
						prod, tx_evport_id);
				return -EINVAL;
			}
		}
	}

	ret = rte_event_dev_start(opt->dev_id);
	if (ret) {
		evt_err("failed to start eventdev %d", opt->dev_id);
		return ret;
	}

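	/* Start all ethernet devices before the Rx/Tx adapters */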
	RTE_ETH_FOREACH_DEV(prod) {
		ret = rte_eth_dev_start(prod);
		if (ret) {
			evt_err("Ethernet dev [%d] failed to start", prod);
			return ret;
		}
	}

	RTE_ETH_FOREACH_DEV(prod) {
		ret = rte_event_eth_rx_adapter_start(prod);
		if (ret) {
			evt_err("Rx adapter[%d] start failed", prod);
			return ret;
		}

		ret = rte_event_eth_tx_adapter_start(prod);
		if (ret) {
			evt_err("Tx adapter[%d] start failed", prod);
			return ret;
		}
	}

	memcpy(t->tx_evqueue_id, tx_evqueue_id, sizeof(uint8_t) *
			RTE_MAX_ETHPORTS);

	return 0;
}

static void
pipeline_queue_opt_dump(struct evt_options *opt)
{
	pipeline_opt_dump(opt, pipeline_queue_nb_event_queues(opt));
}

static int
pipeline_queue_opt_check(struct evt_options *opt)
{
	return pipeline_opt_check(opt, pipeline_queue_nb_event_queues(opt));
}

static bool
pipeline_queue_capability_check(struct evt_options *opt)
{
	struct rte_event_dev_info dev_info;

	rte_event_dev_info_get(opt->dev_id, &dev_info);
	if (dev_info.max_event_queues < pipeline_queue_nb_event_queues(opt) ||
			dev_info.max_event_ports <
			evt_nr_active_lcores(opt->wlcores)) {
		evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
			pipeline_queue_nb_event_queues(opt),
			dev_info.max_event_queues,
			evt_nr_active_lcores(opt->wlcores),
			dev_info.max_event_ports);
		/* Fail the capability check instead of reporting success */
		return false;
	}

	return true;
}

static const struct evt_test_ops pipeline_queue = {
	.cap_check          = pipeline_queue_capability_check,
	.opt_check          = pipeline_queue_opt_check,
	.opt_dump           = pipeline_queue_opt_dump,
	.test_setup         = pipeline_test_setup,
	.mempool_setup      = pipeline_mempool_setup,
	.ethdev_setup       = pipeline_ethdev_setup,
	.eventdev_setup     = pipeline_queue_eventdev_setup,
	.launch_lcores      = pipeline_queue_launch_lcores,
	.eventdev_destroy   = pipeline_eventdev_destroy,
	.mempool_destroy    = pipeline_mempool_destroy,
	.ethdev_destroy     = pipeline_ethdev_destroy,
	.test_result        = pipeline_test_result,
	.test_destroy       = pipeline_test_destroy,
};

EVT_TEST_REGISTER(pipeline_queue);