xref: /dpdk/app/test-eventdev/test_pipeline_atq.c (revision f0b68c0b2af72465559445ac7548bfe0f1c005e3)
1 /*
2  * SPDX-License-Identifier: BSD-3-Clause
3  * Copyright 2017 Cavium, Inc.
4  */
5 
6 #include "test_pipeline_common.h"
7 
8 /* See http://doc.dpdk.org/guides/tools/testeventdev.html for test details */
9 
10 static __rte_always_inline int
pipeline_atq_nb_event_queues(struct evt_options * opt)11 pipeline_atq_nb_event_queues(struct evt_options *opt)
12 {
13 	RTE_SET_USED(opt);
14 
15 	return rte_eth_dev_count_avail();
16 }
17 
/* Signature shared by every ATQ worker loop; used by worker_wrapper() to dispatch. */
typedef int (*pipeline_atq_worker_t)(void *arg);
19 
20 static __rte_noinline int
pipeline_atq_worker_single_stage_tx(void * arg)21 pipeline_atq_worker_single_stage_tx(void *arg)
22 {
23 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
24 	uint8_t enq = 0, deq = 0;
25 
26 	while (t->done == false) {
27 		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
28 
29 		if (!deq) {
30 			rte_pause();
31 			continue;
32 		}
33 
34 		deq = pipeline_event_tx(dev, port, &ev, t);
35 		w->processed_pkts++;
36 	}
37 	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
38 
39 	return 0;
40 }
41 
42 static __rte_noinline int
pipeline_atq_worker_single_stage_fwd(void * arg)43 pipeline_atq_worker_single_stage_fwd(void *arg)
44 {
45 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
46 	const uint8_t *tx_queue = t->tx_evqueue_id;
47 	uint8_t enq = 0, deq = 0;
48 
49 	while (t->done == false) {
50 		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
51 
52 		if (!deq) {
53 			rte_pause();
54 			continue;
55 		}
56 
57 		ev.queue_id = tx_queue[ev.mbuf->port];
58 		pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
59 		enq = pipeline_event_enqueue(dev, port, &ev, t);
60 		w->processed_pkts++;
61 	}
62 	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
63 
64 	return 0;
65 }
66 
67 static __rte_noinline int
pipeline_atq_worker_single_stage_burst_tx(void * arg)68 pipeline_atq_worker_single_stage_burst_tx(void *arg)
69 {
70 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
71 	uint16_t nb_rx = 0, nb_tx = 0;
72 
73 	while (t->done == false) {
74 		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
75 
76 		if (!nb_rx) {
77 			rte_pause();
78 			continue;
79 		}
80 
81 		for (i = 0; i < nb_rx; i++) {
82 			rte_prefetch0(ev[i + 1].mbuf);
83 			rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
84 		}
85 
86 		nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t);
87 		w->processed_pkts += nb_tx;
88 	}
89 	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
90 
91 	return 0;
92 }
93 
94 static __rte_noinline int
pipeline_atq_worker_single_stage_burst_fwd(void * arg)95 pipeline_atq_worker_single_stage_burst_fwd(void *arg)
96 {
97 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
98 	const uint8_t *tx_queue = t->tx_evqueue_id;
99 	uint16_t nb_rx = 0, nb_tx = 0;
100 
101 	while (t->done == false) {
102 		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
103 
104 		if (!nb_rx) {
105 			rte_pause();
106 			continue;
107 		}
108 
109 		for (i = 0; i < nb_rx; i++) {
110 			rte_prefetch0(ev[i + 1].mbuf);
111 			rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
112 			ev[i].queue_id = tx_queue[ev[i].mbuf->port];
113 			pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
114 		}
115 
116 		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
117 		w->processed_pkts += nb_tx;
118 	}
119 	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
120 
121 	return 0;
122 }
123 
124 static __rte_noinline int
pipeline_atq_worker_single_stage_tx_vector(void * arg)125 pipeline_atq_worker_single_stage_tx_vector(void *arg)
126 {
127 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
128 	uint8_t enq = 0, deq = 0;
129 	uint16_t vector_sz;
130 
131 	while (!t->done) {
132 		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
133 
134 		if (!deq) {
135 			rte_pause();
136 			continue;
137 		}
138 		vector_sz = ev.vec->nb_elem;
139 		enq = pipeline_event_tx_vector(dev, port, &ev, t);
140 		w->processed_pkts += vector_sz;
141 	}
142 	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
143 
144 	return 0;
145 }
146 
147 static __rte_noinline int
pipeline_atq_worker_single_stage_fwd_vector(void * arg)148 pipeline_atq_worker_single_stage_fwd_vector(void *arg)
149 {
150 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
151 	const uint8_t *tx_queue = t->tx_evqueue_id;
152 	uint8_t enq = 0, deq = 0;
153 	uint16_t vector_sz;
154 
155 	while (!t->done) {
156 		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
157 
158 		if (!deq) {
159 			rte_pause();
160 			continue;
161 		}
162 
163 		vector_sz = ev.vec->nb_elem;
164 		ev.queue_id = tx_queue[ev.vec->port];
165 		ev.vec->queue = 0;
166 		pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
167 		enq = pipeline_event_enqueue(dev, port, &ev, t);
168 		w->processed_pkts += vector_sz;
169 	}
170 	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
171 
172 	return 0;
173 }
174 
175 static __rte_noinline int
pipeline_atq_worker_single_stage_burst_tx_vector(void * arg)176 pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
177 {
178 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
179 	uint16_t nb_rx = 0, nb_tx = 0;
180 	uint16_t vector_sz;
181 
182 	while (!t->done) {
183 		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
184 
185 		if (!nb_rx) {
186 			rte_pause();
187 			continue;
188 		}
189 		vector_sz = 0;
190 		for (i = 0; i < nb_rx; i++) {
191 			vector_sz += ev[i].vec->nb_elem;
192 			ev[i].vec->queue = 0;
193 		}
194 
195 		nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t);
196 		w->processed_pkts += vector_sz;
197 	}
198 	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
199 
200 	return 0;
201 }
202 
203 static __rte_noinline int
pipeline_atq_worker_single_stage_burst_fwd_vector(void * arg)204 pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
205 {
206 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
207 	const uint8_t *tx_queue = t->tx_evqueue_id;
208 	uint16_t nb_rx = 0, nb_tx = 0;
209 	uint16_t vector_sz;
210 
211 	while (!t->done) {
212 		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
213 
214 		if (!nb_rx) {
215 			rte_pause();
216 			continue;
217 		}
218 
219 		vector_sz = 0;
220 		for (i = 0; i < nb_rx; i++) {
221 			ev[i].queue_id = tx_queue[ev[i].vec->port];
222 			ev[i].vec->queue = 0;
223 			vector_sz += ev[i].vec->nb_elem;
224 			pipeline_fwd_event_vector(&ev[i],
225 						  RTE_SCHED_TYPE_ATOMIC);
226 		}
227 
228 		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
229 		w->processed_pkts += vector_sz;
230 	}
231 	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
232 
233 	return 0;
234 }
235 
236 static __rte_noinline int
pipeline_atq_worker_multi_stage_tx(void * arg)237 pipeline_atq_worker_multi_stage_tx(void *arg)
238 {
239 	PIPELINE_WORKER_MULTI_STAGE_INIT;
240 	uint8_t enq = 0, deq = 0;
241 
242 	while (t->done == false) {
243 		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
244 
245 		if (!deq) {
246 			rte_pause();
247 			continue;
248 		}
249 
250 		cq_id = ev.sub_event_type % nb_stages;
251 
252 		if (cq_id == last_queue) {
253 			enq = pipeline_event_tx(dev, port, &ev, t);
254 			w->processed_pkts++;
255 			continue;
256 		}
257 
258 		ev.sub_event_type++;
259 		pipeline_fwd_event(&ev, sched_type_list[cq_id]);
260 		enq = pipeline_event_enqueue(dev, port, &ev, t);
261 	}
262 	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
263 
264 	return 0;
265 }
266 
267 static __rte_noinline int
pipeline_atq_worker_multi_stage_fwd(void * arg)268 pipeline_atq_worker_multi_stage_fwd(void *arg)
269 {
270 	PIPELINE_WORKER_MULTI_STAGE_INIT;
271 	const uint8_t *tx_queue = t->tx_evqueue_id;
272 	uint8_t enq = 0, deq = 0;
273 
274 	while (t->done == false) {
275 		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
276 
277 		if (!deq) {
278 			rte_pause();
279 			continue;
280 		}
281 
282 		cq_id = ev.sub_event_type % nb_stages;
283 
284 		if (cq_id == last_queue) {
285 			ev.queue_id = tx_queue[ev.mbuf->port];
286 			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
287 			w->processed_pkts++;
288 		} else {
289 			ev.sub_event_type++;
290 			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
291 		}
292 
293 		enq = pipeline_event_enqueue(dev, port, &ev, t);
294 	}
295 	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
296 
297 	return 0;
298 }
299 
300 static __rte_noinline int
pipeline_atq_worker_multi_stage_burst_tx(void * arg)301 pipeline_atq_worker_multi_stage_burst_tx(void *arg)
302 {
303 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
304 	uint16_t nb_rx = 0, nb_tx = 0;
305 
306 	while (t->done == false) {
307 		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
308 
309 		if (!nb_rx) {
310 			rte_pause();
311 			continue;
312 		}
313 
314 		for (i = 0; i < nb_rx; i++) {
315 			rte_prefetch0(ev[i + 1].mbuf);
316 			cq_id = ev[i].sub_event_type % nb_stages;
317 
318 			if (cq_id == last_queue) {
319 				pipeline_event_tx(dev, port, &ev[i], t);
320 				ev[i].op = RTE_EVENT_OP_RELEASE;
321 				w->processed_pkts++;
322 				continue;
323 			}
324 
325 			ev[i].sub_event_type++;
326 			pipeline_fwd_event(&ev[i], sched_type_list[cq_id]);
327 		}
328 
329 		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
330 	}
331 	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
332 
333 	return 0;
334 }
335 
336 static __rte_noinline int
pipeline_atq_worker_multi_stage_burst_fwd(void * arg)337 pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
338 {
339 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
340 	const uint8_t *tx_queue = t->tx_evqueue_id;
341 	uint16_t nb_rx = 0, nb_tx = 0;
342 
343 	while (t->done == false) {
344 		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
345 
346 		if (!nb_rx) {
347 			rte_pause();
348 			continue;
349 		}
350 
351 		for (i = 0; i < nb_rx; i++) {
352 			rte_prefetch0(ev[i + 1].mbuf);
353 			cq_id = ev[i].sub_event_type % nb_stages;
354 
355 			if (cq_id == last_queue) {
356 				w->processed_pkts++;
357 				ev[i].queue_id = tx_queue[ev[i].mbuf->port];
358 				pipeline_fwd_event(&ev[i],
359 						RTE_SCHED_TYPE_ATOMIC);
360 			} else {
361 				ev[i].sub_event_type++;
362 				pipeline_fwd_event(&ev[i],
363 						sched_type_list[cq_id]);
364 			}
365 		}
366 
367 		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
368 	}
369 	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
370 
371 	return 0;
372 }
373 
374 static __rte_noinline int
pipeline_atq_worker_multi_stage_tx_vector(void * arg)375 pipeline_atq_worker_multi_stage_tx_vector(void *arg)
376 {
377 	PIPELINE_WORKER_MULTI_STAGE_INIT;
378 	uint8_t enq = 0, deq = 0;
379 	uint16_t vector_sz;
380 
381 	while (!t->done) {
382 		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
383 
384 		if (!deq) {
385 			rte_pause();
386 			continue;
387 		}
388 
389 		cq_id = ev.sub_event_type % nb_stages;
390 
391 		if (cq_id == last_queue) {
392 			vector_sz = ev.vec->nb_elem;
393 			enq = pipeline_event_tx_vector(dev, port, &ev, t);
394 			w->processed_pkts += vector_sz;
395 			continue;
396 		}
397 
398 		ev.sub_event_type++;
399 		pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
400 		enq = pipeline_event_enqueue(dev, port, &ev, t);
401 	}
402 	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
403 
404 	return 0;
405 }
406 
407 static __rte_noinline int
pipeline_atq_worker_multi_stage_fwd_vector(void * arg)408 pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
409 {
410 	PIPELINE_WORKER_MULTI_STAGE_INIT;
411 	const uint8_t *tx_queue = t->tx_evqueue_id;
412 	uint8_t enq = 0, deq = 0;
413 	uint16_t vector_sz;
414 
415 	while (!t->done) {
416 		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
417 
418 		if (!deq) {
419 			rte_pause();
420 			continue;
421 		}
422 
423 		cq_id = ev.sub_event_type % nb_stages;
424 
425 		if (cq_id == last_queue) {
426 			ev.queue_id = tx_queue[ev.vec->port];
427 			ev.vec->queue = 0;
428 			vector_sz = ev.vec->nb_elem;
429 			pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
430 			enq = pipeline_event_enqueue(dev, port, &ev, t);
431 			w->processed_pkts += vector_sz;
432 		} else {
433 			ev.sub_event_type++;
434 			pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
435 			enq = pipeline_event_enqueue(dev, port, &ev, t);
436 		}
437 	}
438 	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
439 
440 	return 0;
441 }
442 
443 static __rte_noinline int
pipeline_atq_worker_multi_stage_burst_tx_vector(void * arg)444 pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
445 {
446 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
447 	uint16_t nb_rx = 0, nb_tx = 0;
448 	uint16_t vector_sz;
449 
450 	while (!t->done) {
451 		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
452 
453 		if (!nb_rx) {
454 			rte_pause();
455 			continue;
456 		}
457 
458 		for (i = 0; i < nb_rx; i++) {
459 			cq_id = ev[i].sub_event_type % nb_stages;
460 
461 			if (cq_id == last_queue) {
462 				vector_sz = ev[i].vec->nb_elem;
463 				pipeline_event_tx_vector(dev, port, &ev[i], t);
464 				ev[i].op = RTE_EVENT_OP_RELEASE;
465 				w->processed_pkts += vector_sz;
466 				continue;
467 			}
468 
469 			ev[i].sub_event_type++;
470 			pipeline_fwd_event_vector(&ev[i],
471 						  sched_type_list[cq_id]);
472 		}
473 
474 		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
475 	}
476 	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
477 
478 	return 0;
479 }
480 
481 static __rte_noinline int
pipeline_atq_worker_multi_stage_burst_fwd_vector(void * arg)482 pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
483 {
484 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
485 	const uint8_t *tx_queue = t->tx_evqueue_id;
486 	uint16_t nb_rx = 0, nb_tx = 0;
487 	uint16_t vector_sz;
488 
489 	while (!t->done) {
490 		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
491 
492 		if (!nb_rx) {
493 			rte_pause();
494 			continue;
495 		}
496 
497 		for (i = 0; i < nb_rx; i++) {
498 			cq_id = ev[i].sub_event_type % nb_stages;
499 
500 			if (cq_id == last_queue) {
501 				vector_sz = ev[i].vec->nb_elem;
502 				ev[i].queue_id = tx_queue[ev[i].vec->port];
503 				ev[i].vec->queue = 0;
504 				pipeline_fwd_event_vector(
505 					&ev[i], RTE_SCHED_TYPE_ATOMIC);
506 				w->processed_pkts += vector_sz;
507 			} else {
508 				ev[i].sub_event_type++;
509 				pipeline_fwd_event_vector(
510 					&ev[i], sched_type_list[cq_id]);
511 			}
512 		}
513 
514 		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
515 	}
516 	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
517 
518 	return 0;
519 }
520 
521 static int
worker_wrapper(void * arg)522 worker_wrapper(void *arg)
523 {
524 	struct worker_data *w  = arg;
525 	struct evt_options *opt = w->t->opt;
526 	const bool burst = evt_has_burst_mode(w->dev_id);
527 	const bool internal_port = w->t->internal_port;
528 	const uint8_t nb_stages = opt->nb_stages;
529 	/*vector/burst/internal_port*/
530 	const pipeline_atq_worker_t
531 	pipeline_atq_worker_single_stage[2][2][2] = {
532 		[0][0][0] = pipeline_atq_worker_single_stage_fwd,
533 		[0][0][1] = pipeline_atq_worker_single_stage_tx,
534 		[0][1][0] = pipeline_atq_worker_single_stage_burst_fwd,
535 		[0][1][1] = pipeline_atq_worker_single_stage_burst_tx,
536 		[1][0][0] = pipeline_atq_worker_single_stage_fwd_vector,
537 		[1][0][1] = pipeline_atq_worker_single_stage_tx_vector,
538 		[1][1][0] = pipeline_atq_worker_single_stage_burst_fwd_vector,
539 		[1][1][1] = pipeline_atq_worker_single_stage_burst_tx_vector,
540 	};
541 	const pipeline_atq_worker_t
542 	pipeline_atq_worker_multi_stage[2][2][2] = {
543 		[0][0][0] = pipeline_atq_worker_multi_stage_fwd,
544 		[0][0][1] = pipeline_atq_worker_multi_stage_tx,
545 		[0][1][0] = pipeline_atq_worker_multi_stage_burst_fwd,
546 		[0][1][1] = pipeline_atq_worker_multi_stage_burst_tx,
547 		[1][0][0] = pipeline_atq_worker_multi_stage_fwd_vector,
548 		[1][0][1] = pipeline_atq_worker_multi_stage_tx_vector,
549 		[1][1][0] = pipeline_atq_worker_multi_stage_burst_fwd_vector,
550 		[1][1][1] = pipeline_atq_worker_multi_stage_burst_tx_vector,
551 	};
552 
553 	if (nb_stages == 1)
554 		return (pipeline_atq_worker_single_stage[opt->ena_vector][burst]
555 							[internal_port])(arg);
556 	else
557 		return (pipeline_atq_worker_multi_stage[opt->ena_vector][burst]
558 						       [internal_port])(arg);
559 
560 	rte_panic("invalid worker\n");
561 }
562 
563 static int
pipeline_atq_launch_lcores(struct evt_test * test,struct evt_options * opt)564 pipeline_atq_launch_lcores(struct evt_test *test, struct evt_options *opt)
565 {
566 	return pipeline_launch_lcores(test, opt, worker_wrapper);
567 }
568 
569 static int
pipeline_atq_eventdev_setup(struct evt_test * test,struct evt_options * opt)570 pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
571 {
572 	int ret;
573 	int nb_ports;
574 	int nb_queues;
575 	uint8_t queue, is_prod;
576 	uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS];
577 	uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
578 	uint8_t nb_worker_queues = 0;
579 	uint8_t tx_evport_id = 0;
580 	uint16_t prod = 0;
581 	struct rte_event_dev_info info;
582 	struct test_pipeline *t = evt_test_priv(test);
583 
584 	nb_ports = evt_nr_active_lcores(opt->wlcores);
585 	nb_queues = rte_eth_dev_count_avail();
586 
587 	memset(tx_evqueue_id, 0, sizeof(uint8_t) * RTE_MAX_ETHPORTS);
588 	memset(queue_arr, 0, sizeof(uint8_t) * RTE_EVENT_MAX_QUEUES_PER_DEV);
589 	/* One queue for Tx adapter per port */
590 	if (!t->internal_port) {
591 		RTE_ETH_FOREACH_DEV(prod) {
592 			tx_evqueue_id[prod] = nb_queues;
593 			nb_queues++;
594 		}
595 	}
596 
597 	rte_event_dev_info_get(opt->dev_id, &info);
598 
599 	ret = evt_configure_eventdev(opt, nb_queues, nb_ports);
600 	if (ret) {
601 		evt_err("failed to configure eventdev %d", opt->dev_id);
602 		return ret;
603 	}
604 
605 	struct rte_event_queue_conf q_conf = {
606 		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
607 		.nb_atomic_flows = opt->nb_flows,
608 		.nb_atomic_order_sequences = opt->nb_flows,
609 	};
610 	/* queue configurations */
611 	for (queue = 0; queue < nb_queues; queue++) {
612 		q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES;
613 
614 		if (!t->internal_port) {
615 			is_prod = false;
616 			RTE_ETH_FOREACH_DEV(prod) {
617 				if (queue == tx_evqueue_id[prod]) {
618 					q_conf.event_queue_cfg =
619 						RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
620 					is_prod = true;
621 					break;
622 				}
623 			}
624 			if (!is_prod) {
625 				queue_arr[nb_worker_queues] = queue;
626 				nb_worker_queues++;
627 			}
628 		}
629 
630 		ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
631 		if (ret) {
632 			evt_err("failed to setup queue=%d", queue);
633 			return ret;
634 		}
635 	}
636 
637 	if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
638 		opt->wkr_deq_dep = info.max_event_port_dequeue_depth;
639 
640 	/* port configuration */
641 	const struct rte_event_port_conf p_conf = {
642 		.dequeue_depth = opt->wkr_deq_dep,
643 		.enqueue_depth = info.max_event_port_dequeue_depth,
644 		.new_event_threshold = info.max_num_events,
645 	};
646 
647 	if (!t->internal_port)
648 		ret = pipeline_event_port_setup(test, opt, queue_arr,
649 				nb_worker_queues, p_conf);
650 	else
651 		ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
652 				p_conf);
653 
654 	if (ret)
655 		return ret;
656 
657 	/*
658 	 * The pipelines are setup in the following manner:
659 	 *
660 	 * eth_dev_count = 2, nb_stages = 2, atq mode
661 	 *
662 	 * eth0, eth1 have Internal port capability :
663 	 *	queues = 2
664 	 *	stride = 1
665 	 *
666 	 *	event queue pipelines:
667 	 *	eth0 -> q0 ->Tx
668 	 *	eth1 -> q1 ->Tx
669 	 *
670 	 *	q0, q1 are configured as ATQ so, all the different stages can
671 	 *	be enqueued on the same queue.
672 	 *
673 	 * eth0, eth1 use Tx adapters service core :
674 	 *	queues = 4
675 	 *	stride = 1
676 	 *
677 	 *	event queue pipelines:
678 	 *	eth0 -> q0  -> q2 -> Tx
679 	 *	eth1 -> q1  -> q3 -> Tx
680 	 *
681 	 *	q0, q1 are configured as stated above.
682 	 *	q2, q3 configured as SINGLE_LINK.
683 	 */
684 	ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf);
685 	if (ret)
686 		return ret;
687 	ret = pipeline_event_tx_adapter_setup(opt, p_conf);
688 	if (ret)
689 		return ret;
690 
691 	if (!evt_has_distributed_sched(opt->dev_id)) {
692 		uint32_t service_id;
693 		rte_event_dev_service_id_get(opt->dev_id, &service_id);
694 		ret = evt_service_setup(service_id);
695 		if (ret) {
696 			evt_err("No service lcore found to run event dev.");
697 			return ret;
698 		}
699 	}
700 
701 	/* Connect the tx_evqueue_id to the Tx adapter port */
702 	if (!t->internal_port) {
703 		RTE_ETH_FOREACH_DEV(prod) {
704 			ret = rte_event_eth_tx_adapter_event_port_get(prod,
705 					&tx_evport_id);
706 			if (ret) {
707 				evt_err("Unable to get Tx adapter[%d]", prod);
708 				return ret;
709 			}
710 
711 			if (rte_event_port_link(opt->dev_id, tx_evport_id,
712 						&tx_evqueue_id[prod],
713 						NULL, 1) != 1) {
714 				evt_err("Unable to link Tx adptr[%d] evprt[%d]",
715 						prod, tx_evport_id);
716 				return ret;
717 			}
718 		}
719 	}
720 
721 	ret = rte_event_dev_start(opt->dev_id);
722 	if (ret) {
723 		evt_err("failed to start eventdev %d", opt->dev_id);
724 		return ret;
725 	}
726 
727 
728 	RTE_ETH_FOREACH_DEV(prod) {
729 		ret = rte_eth_dev_start(prod);
730 		if (ret) {
731 			evt_err("Ethernet dev [%d] failed to start."
732 					" Using synthetic producer", prod);
733 			return ret;
734 		}
735 	}
736 
737 	RTE_ETH_FOREACH_DEV(prod) {
738 		ret = rte_event_eth_rx_adapter_start(prod);
739 		if (ret) {
740 			evt_err("Rx adapter[%d] start failed", prod);
741 			return ret;
742 		}
743 
744 		ret = rte_event_eth_tx_adapter_start(prod);
745 		if (ret) {
746 			evt_err("Tx adapter[%d] start failed", prod);
747 			return ret;
748 		}
749 	}
750 
751 	memcpy(t->tx_evqueue_id, tx_evqueue_id, sizeof(uint8_t) *
752 			RTE_MAX_ETHPORTS);
753 
754 	return 0;
755 }
756 
/*
 * opt_dump hook: passes the options, together with the ATQ event queue
 * count, to pipeline_opt_dump().
 */
static void
pipeline_atq_opt_dump(struct evt_options *opt)
{
	const int nb_queues = pipeline_atq_nb_event_queues(opt);

	pipeline_opt_dump(opt, nb_queues);
}
762 
/*
 * opt_check hook: passes the options, together with the ATQ event queue
 * count, to pipeline_opt_check() and returns its result.
 */
static int
pipeline_atq_opt_check(struct evt_options *opt)
{
	const int nb_queues = pipeline_atq_nb_event_queues(opt);

	return pipeline_opt_check(opt, nb_queues);
}
768 
769 static bool
pipeline_atq_capability_check(struct evt_options * opt)770 pipeline_atq_capability_check(struct evt_options *opt)
771 {
772 	struct rte_event_dev_info dev_info;
773 
774 	rte_event_dev_info_get(opt->dev_id, &dev_info);
775 	if (dev_info.max_event_queues < pipeline_atq_nb_event_queues(opt) ||
776 			dev_info.max_event_ports <
777 			evt_nr_active_lcores(opt->wlcores)) {
778 		evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
779 			pipeline_atq_nb_event_queues(opt),
780 			dev_info.max_event_queues,
781 			evt_nr_active_lcores(opt->wlcores),
782 			dev_info.max_event_ports);
783 	}
784 	if (!evt_has_all_types_queue(opt->dev_id))
785 		return false;
786 
787 	return true;
788 }
789 
790 static const struct evt_test_ops pipeline_atq =  {
791 	.cap_check          = pipeline_atq_capability_check,
792 	.opt_check          = pipeline_atq_opt_check,
793 	.opt_dump           = pipeline_atq_opt_dump,
794 	.test_setup         = pipeline_test_setup,
795 	.mempool_setup      = pipeline_mempool_setup,
796 	.ethdev_setup	    = pipeline_ethdev_setup,
797 	.eventdev_setup     = pipeline_atq_eventdev_setup,
798 	.launch_lcores      = pipeline_atq_launch_lcores,
799 	.ethdev_rx_stop     = pipeline_ethdev_rx_stop,
800 	.eventdev_destroy   = pipeline_eventdev_destroy,
801 	.mempool_destroy    = pipeline_mempool_destroy,
802 	.ethdev_destroy	    = pipeline_ethdev_destroy,
803 	.test_result        = pipeline_test_result,
804 	.test_destroy       = pipeline_test_destroy,
805 };
806 
807 EVT_TEST_REGISTER(pipeline_atq);
808