xref: /dpdk/examples/eventdev_pipeline/pipeline_worker_tx.c (revision 6d239dd5295a8249a296ae9f0a5bc9802fea073e)
/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 * Copyright 2017 Cavium, Inc.
 */

#include "pipeline_common.h"

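/*
 * Worker-Tx pipeline mode: there is no dedicated Tx core. Each worker
 * transmits a packet itself once its event reaches the final, atomic
 * stage of the pipeline. The helpers below wrap the forward, enqueue
 * and Tx primitives used by every worker loop in this file.
 */
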
static __rte_always_inline void
worker_fwd_event(struct rte_event *ev, uint8_t sched)
{
	ev->event_type = RTE_EVENT_TYPE_CPU;
	ev->op = RTE_EVENT_OP_FORWARD;
	ev->sched_type = sched;
}

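/*
 * Enqueue helpers: the event device may accept fewer events than
 * requested when it is back-pressured, so both variants retry until
 * everything has been enqueued.
 */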
static __rte_always_inline void
worker_event_enqueue(const uint8_t dev, const uint8_t port,
		struct rte_event *ev)
{
	while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
		rte_pause();
}

static __rte_always_inline void
worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
		struct rte_event *ev, const uint16_t nb_rx)
{
	uint16_t enq;

	enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
	while (enq < nb_rx) {
		enq += rte_event_enqueue_burst(dev, port,
						ev + enq, nb_rx - enq);
	}
}

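/*
 * Update the MAC addresses via exchange_mac() (from pipeline_common.h)
 * and transmit the packet on the port recorded in the mbuf, Tx queue 0,
 * retrying until the PMD accepts it.
 */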
static __rte_always_inline void
worker_tx_pkt(struct rte_mbuf *mbuf)
{
	exchange_mac(mbuf);
	while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
		rte_pause();
}

/* Single stage pipeline workers */

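/*
 * Single stage: an event dequeued with atomic scheduling has already
 * been processed and is ready for Tx; any other event is worked on,
 * marked atomic and forwarded so that a later dequeue transmits it.
 */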
static int
worker_do_tx_single(void *arg)
{
	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;
	struct rte_event ev;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;

		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
			worker_tx_pkt(ev.mbuf);
			tx++;
			continue;
		}
		work();
		ev.queue_id++;
		worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
		worker_event_enqueue(dev, port, &ev);
		fwd++;
	}

	if (!cdata.quiet)
		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

static int
worker_do_tx_single_atq(void *arg)
{
	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;
	struct rte_event ev;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;

		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
			worker_tx_pkt(ev.mbuf);
			tx++;
			continue;
		}
		work();
		worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
		worker_event_enqueue(dev, port, &ev);
		fwd++;
	}

	if (!cdata.quiet)
		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

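/*
 * The event array in the burst variants has one extra slot so that the
 * mbuf prefetch of ev[i + 1] inside the loop never reads past the end
 * of the array; the last prefetch may touch an uninitialised entry,
 * which is harmless.
 */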
static int
worker_do_tx_single_burst(void *arg)
{
	struct rte_event ev[BATCH_SIZE + 1];

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {
		uint16_t i;
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BATCH_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}
		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {

				worker_tx_pkt(ev[i].mbuf);
				ev[i].op = RTE_EVENT_OP_RELEASE;
				tx++;

			} else {
				ev[i].queue_id++;
				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			}
			work();
		}

		worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_rx;
	}

	if (!cdata.quiet)
		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

static int
worker_do_tx_single_burst_atq(void *arg)
{
	struct rte_event ev[BATCH_SIZE + 1];

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {
		uint16_t i;
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BATCH_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {

				worker_tx_pkt(ev[i].mbuf);
				ev[i].op = RTE_EVENT_OP_RELEASE;
				tx++;
			} else
				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			work();
		}

		worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_rx;
	}

	if (!cdata.quiet)
		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

/* Multi stage Pipeline Workers */

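/*
 * The stage an event is in is derived from its queue_id (or its
 * sub_event_type for all-type queues) modulo the number of stages.
 * Once an event reaches the last stage with atomic scheduling it is
 * transmitted; otherwise it is forwarded to the next stage, switching
 * to an atomic context for the final hop before Tx.
 */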
static int
worker_do_tx(void *arg)
{
	struct rte_event ev;

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	const uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;
		const uint8_t cq_id = ev.queue_id % cdata.num_stages;

		if (cq_id >= lst_qid) {
			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
				worker_tx_pkt(ev.mbuf);
				tx++;
				continue;
			}

			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			ev.queue_id = (cq_id == lst_qid) ?
				cdata.next_qid[ev.queue_id] : ev.queue_id;
		} else {
			ev.queue_id = cdata.next_qid[ev.queue_id];
			worker_fwd_event(&ev, cdata.queue_type);
		}
		work();

		worker_event_enqueue(dev, port, &ev);
		fwd++;
	}

	if (!cdata.quiet)
		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}

static int
worker_do_tx_atq(void *arg)
{
	struct rte_event ev;

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	const uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;
		const uint8_t cq_id = ev.sub_event_type % cdata.num_stages;

		if (cq_id == lst_qid) {
			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
				worker_tx_pkt(ev.mbuf);
				tx++;
				continue;
			}

			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
		} else {
			ev.sub_event_type++;
			worker_fwd_event(&ev, cdata.queue_type);
		}
		work();

		worker_event_enqueue(dev, port, &ev);
		fwd++;
	}

	if (!cdata.quiet)
		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}

static int
worker_do_tx_burst(void *arg)
{
	struct rte_event ev[BATCH_SIZE];

	struct worker_data *data = (struct worker_data *)arg;
	uint8_t dev = data->dev_id;
	uint8_t port = data->port_id;
	uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {
		uint16_t i;
		const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
				ev, BATCH_SIZE, 0);

		if (nb_rx == 0) {
			rte_pause();
			continue;
		}
		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			const uint8_t cq_id = ev[i].queue_id % cdata.num_stages;

			if (cq_id >= lst_qid) {
				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
					worker_tx_pkt(ev[i].mbuf);
					tx++;
					ev[i].op = RTE_EVENT_OP_RELEASE;
					continue;
				}
				ev[i].queue_id = (cq_id == lst_qid) ?
					cdata.next_qid[ev[i].queue_id] :
					ev[i].queue_id;

				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			} else {
				ev[i].queue_id = cdata.next_qid[ev[i].queue_id];
				worker_fwd_event(&ev[i], cdata.queue_type);
			}
			work();
		}
		worker_event_enqueue_burst(dev, port, ev, nb_rx);

		fwd += nb_rx;
	}

	if (!cdata.quiet)
		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}

static int
worker_do_tx_burst_atq(void *arg)
{
	struct rte_event ev[BATCH_SIZE];

	struct worker_data *data = (struct worker_data *)arg;
	uint8_t dev = data->dev_id;
	uint8_t port = data->port_id;
	uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {
		uint16_t i;

		const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
				ev, BATCH_SIZE, 0);

		if (nb_rx == 0) {
			rte_pause();
			continue;
		}
		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			const uint8_t cq_id = ev[i].sub_event_type %
				cdata.num_stages;

			if (cq_id == lst_qid) {
				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
					worker_tx_pkt(ev[i].mbuf);
					tx++;
					ev[i].op = RTE_EVENT_OP_RELEASE;
					continue;
				}

				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			} else {
				ev[i].sub_event_type++;
				worker_fwd_event(&ev[i], cdata.queue_type);
			}
			work();
		}

		worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_rx;
	}

	if (!cdata.quiet)
		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}

static int
setup_eventdev_worker_tx(struct cons_data *cons_data,
		struct worker_data *worker_data)
{
	RTE_SET_USED(cons_data);
	uint8_t i;
	const uint8_t atq = cdata.all_type_queues ? 1 : 0;
	const uint8_t dev_id = 0;
	const uint8_t nb_ports = cdata.num_workers;
	uint8_t nb_slots = 0;
	uint8_t nb_queues = rte_eth_dev_count();

	/*
	 * When all-type queues are not enabled, use one queue per stage per
	 * ethdev (num_stages * eth_dev_count) plus one extra Tx queue per
	 * pipeline.
	 */
	if (!atq) {
		nb_queues *= cdata.num_stages;
		nb_queues += rte_eth_dev_count();
	}
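	/*
	 * e.g. 2 stages and 2 ethdevs (non atq): nb_queues = (2 * 2) + 2 = 6,
	 * matching the Rx stride example further down.
	 */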

	struct rte_event_dev_config config = {
			.nb_event_queues = nb_queues,
			.nb_event_ports = nb_ports,
			.nb_events_limit = 4096,
			.nb_event_queue_flows = 1024,
			.nb_event_port_dequeue_depth = 128,
			.nb_event_port_enqueue_depth = 128,
	};
	struct rte_event_port_conf wkr_p_conf = {
			.dequeue_depth = cdata.worker_cq_depth,
			.enqueue_depth = 64,
			.new_event_threshold = 4096,
	};
	struct rte_event_queue_conf wkr_q_conf = {
			.schedule_type = cdata.queue_type,
			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
			.nb_atomic_flows = 1024,
			.nb_atomic_order_sequences = 1024,
	};

	int ret, ndev = rte_event_dev_count();

	if (ndev < 1) {
		printf("%d: No Eventdev Devices Found\n", __LINE__);
		return -1;
	}

	struct rte_event_dev_info dev_info;
	ret = rte_event_dev_info_get(dev_id, &dev_info);
	printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);

	if (dev_info.max_event_port_dequeue_depth <
			config.nb_event_port_dequeue_depth)
		config.nb_event_port_dequeue_depth =
				dev_info.max_event_port_dequeue_depth;
	if (dev_info.max_event_port_enqueue_depth <
			config.nb_event_port_enqueue_depth)
		config.nb_event_port_enqueue_depth =
				dev_info.max_event_port_enqueue_depth;

	ret = rte_event_dev_configure(dev_id, &config);
	if (ret < 0) {
		printf("%d: Error configuring device\n", __LINE__);
		return -1;
	}

	printf("  Stages:\n");
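	/*
	 * With all-type queues each queue serves every stage of its pipeline;
	 * otherwise each pipeline gets num_stages worker queues followed by
	 * one atomic Tx queue (slot == num_stages).
	 */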
	for (i = 0; i < nb_queues; i++) {

		if (atq) {

			nb_slots = cdata.num_stages;
			wkr_q_conf.event_queue_cfg =
				RTE_EVENT_QUEUE_CFG_ALL_TYPES;
		} else {
			uint8_t slot;

			nb_slots = cdata.num_stages + 1;
			slot = i % nb_slots;
			wkr_q_conf.schedule_type = slot == cdata.num_stages ?
				RTE_SCHED_TYPE_ATOMIC : cdata.queue_type;
		}

		if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
			printf("%d: error creating qid %d\n", __LINE__, i);
			return -1;
		}
		cdata.qid[i] = i;
		cdata.next_qid[i] = i+1;
		if (cdata.enable_queue_priorities) {
			const uint32_t prio_delta =
				(RTE_EVENT_DEV_PRIORITY_LOWEST) /
				nb_slots;

			/* higher priority for queues closer to tx */
			wkr_q_conf.priority =
				RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta *
				(i % nb_slots);
		}

		const char *type_str = "Atomic";
		switch (wkr_q_conf.schedule_type) {
		case RTE_SCHED_TYPE_ORDERED:
			type_str = "Ordered";
			break;
		case RTE_SCHED_TYPE_PARALLEL:
			type_str = "Parallel";
			break;
		}
		printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
				wkr_q_conf.priority);
	}

	printf("\n");
	if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
		wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
	if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
		wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;

	/* set up one port per worker, linking to all stage queues */
	for (i = 0; i < cdata.num_workers; i++) {
		struct worker_data *w = &worker_data[i];
		w->dev_id = dev_id;
		if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
			printf("Error setting up port %d\n", i);
			return -1;
		}

		if (rte_event_port_link(dev_id, i, NULL, NULL, 0)
				!= nb_queues) {
			printf("%d: error creating link for port %d\n",
					__LINE__, i);
			return -1;
		}
		w->port_id = i;
	}
	/*
	 * Reduce the load on the ingress event queue by splitting the
	 * traffic across multiple event queues.
	 * For example, with nb_stages = 2 and nb_ethdev = 2:
	 *
	 *	nb_queues = (2 * 2) + 2 = 6 (non atq)
	 *	rx_stride = 3
	 *
	 * Traffic is split across queue 0 and queue 3, since the Rx adapter
	 * queue id is chosen as <ethport_id> * <rx_stride>, i.e. in the above
	 * case eth ports 0 and 1 inject packets into event queues 0 and 3
	 * respectively.
	 *
	 * This forms two queue pipelines: 0->1->2->tx and 3->4->5->tx.
	 */
	cdata.rx_stride = atq ? 1 : nb_slots;
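	/*
	 * If the eventdev needs a software scheduler, expose its service so
	 * that the application's scheduler loop can run it directly;
	 * -ESRCH simply means no such service is required.
	 */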
	ret = rte_event_dev_service_id_get(dev_id,
				&fdata->evdev_service_id);
	if (ret != -ESRCH && ret != 0) {
		printf("Error getting the service ID\n");
		return -1;
	}
	rte_service_runstate_set(fdata->evdev_service_id, 1);
	rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
	if (rte_event_dev_start(dev_id) < 0) {
		printf("Error starting eventdev\n");
		return -1;
	}

	return dev_id;
}

struct rx_adptr_services {
	uint16_t nb_rx_adptrs;
	uint32_t *rx_adpt_arr;
};

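/*
 * Service callback that runs one iteration of every registered Rx
 * adapter service from the application's own lcore.
 */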
static int32_t
service_rx_adapter(void *arg)
{
	int i;
	struct rx_adptr_services *adptr_services = arg;

	for (i = 0; i < adptr_services->nb_rx_adptrs; i++)
		rte_service_run_iter_on_app_lcore(
				adptr_services->rx_adpt_arr[i], 1);
	return 0;
}

static void
init_rx_adapter(uint16_t nb_ports)
{
	int i;
	int ret;
	uint8_t evdev_id = 0;
	struct rx_adptr_services *adptr_services = NULL;
	struct rte_event_dev_info dev_info;

	ret = rte_event_dev_info_get(evdev_id, &dev_info);
	adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0);
	if (adptr_services == NULL)
		rte_exit(EXIT_FAILURE,
				"failed to allocate rx adapter service data");

	struct rte_event_port_conf rx_p_conf = {
		.dequeue_depth = 8,
		.enqueue_depth = 8,
		.new_event_threshold = 1200,
	};

	if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
		rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
	if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
		rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;

	struct rte_event_eth_rx_adapter_queue_conf queue_conf = {
		.ev.sched_type = cdata.queue_type,
	};

	for (i = 0; i < nb_ports; i++) {
		uint32_t cap;
		uint32_t service_id;

		ret = rte_event_eth_rx_adapter_create(i, evdev_id, &rx_p_conf);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"failed to create rx adapter[%d]",
					cdata.rx_adapter_id);

		ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"failed to get event rx adapter "
					"capabilities");

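		/*
		 * Each ethdev feeds the first queue of its own pipeline:
		 * event queue id = ethdev id * rx_stride (see the stride
		 * comment in setup_eventdev_worker_tx()).
		 */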
		queue_conf.ev.queue_id = cdata.rx_stride ?
			(i * cdata.rx_stride)
			: (uint8_t)cdata.qid[0];

		ret = rte_event_eth_rx_adapter_queue_add(i, i, -1, &queue_conf);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"Failed to add queues to Rx adapter");

		/*
		 * The adapter has no internal port, so its producer must be
		 * scheduled as a service by the application.
		 */
		if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
			ret = rte_event_eth_rx_adapter_service_id_get(i,
					&service_id);
			if (ret != -ESRCH && ret != 0) {
				rte_exit(EXIT_FAILURE,
				"Error getting the service ID for rx adptr\n");
			}

			rte_service_runstate_set(service_id, 1);
			rte_service_set_runstate_mapped_check(service_id, 0);

			adptr_services->nb_rx_adptrs++;
			adptr_services->rx_adpt_arr = rte_realloc(
					adptr_services->rx_adpt_arr,
					adptr_services->nb_rx_adptrs *
					sizeof(uint32_t), 0);
			adptr_services->rx_adpt_arr[
				adptr_services->nb_rx_adptrs - 1] =
				service_id;
		}

		ret = rte_event_eth_rx_adapter_start(i);
		if (ret)
			rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
					cdata.rx_adapter_id);
	}

	if (adptr_services->nb_rx_adptrs) {
		struct rte_service_spec service;

		memset(&service, 0, sizeof(struct rte_service_spec));
		snprintf(service.name, sizeof(service.name), "rx_service");
		service.callback = service_rx_adapter;
		service.callback_userdata = (void *)adptr_services;

		int32_t ret = rte_service_component_register(&service,
				&fdata->rxadptr_service_id);
		if (ret)
			rte_exit(EXIT_FAILURE,
				"Rx adapter[%d] service register failed",
				cdata.rx_adapter_id);

		rte_service_runstate_set(fdata->rxadptr_service_id, 1);
		rte_service_component_runstate_set(fdata->rxadptr_service_id,
				1);
		rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id,
				0);
	} else {
		memset(fdata->rx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);

		if (fdata->cap.consumer == NULL &&
				(dev_info.event_dev_cap &
				 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))
			fdata->cap.scheduler = NULL;

		rte_free(adptr_services);
	}

	if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
		memset(fdata->sched_core, 0,
				sizeof(unsigned int) * MAX_NUM_CORE);
}

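/*
 * Validate the core masks against device capabilities: an Rx core is
 * only needed when some ethdev lacks an internal-port Rx adapter, and a
 * scheduler core only when the eventdev cannot schedule on its own.
 */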
static void
worker_tx_opt_check(void)
{
	int i;
	int ret;
	uint32_t cap = 0;
	uint8_t rx_needed = 0;
	struct rte_event_dev_info eventdev_info;

	memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
	rte_event_dev_info_get(0, &eventdev_info);

	if (cdata.all_type_queues && !(eventdev_info.event_dev_cap &
				RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
		rte_exit(EXIT_FAILURE,
				"Event dev doesn't support all type queues\n");

	for (i = 0; i < rte_eth_dev_count(); i++) {
		ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"failed to get event rx adapter "
					"capabilities");
		rx_needed |=
			!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
	}

	if (cdata.worker_lcore_mask == 0 ||
			(rx_needed && cdata.rx_lcore_mask == 0) ||
			(cdata.sched_lcore_mask == 0 &&
			 !(eventdev_info.event_dev_cap &
				 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
		printf("A required part of the pipeline was not assigned any "
			"cores. This will stall the pipeline; please check the "
			"core masks (use -h for details on setting core masks):\n"
			"\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
			"\n\tworkers: %"PRIu64"\n",
			cdata.rx_lcore_mask, cdata.tx_lcore_mask,
			cdata.sched_lcore_mask,
			cdata.worker_lcore_mask);
		rte_exit(-1, "Fix core masks\n");
	}
}

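/*
 * Pick the worker loop variant matching the configuration: single or
 * multi stage, burst or single-event dequeue, and all-type queues (atq)
 * or one queue per stage.
 */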
static worker_loop
get_worker_loop_single_burst(uint8_t atq)
{
	if (atq)
		return worker_do_tx_single_burst_atq;

	return worker_do_tx_single_burst;
}

static worker_loop
get_worker_loop_single_non_burst(uint8_t atq)
{
	if (atq)
		return worker_do_tx_single_atq;

	return worker_do_tx_single;
}

static worker_loop
get_worker_loop_burst(uint8_t atq)
{
	if (atq)
		return worker_do_tx_burst_atq;

	return worker_do_tx_burst;
}

static worker_loop
get_worker_loop_non_burst(uint8_t atq)
{
	if (atq)
		return worker_do_tx_atq;

	return worker_do_tx;
}

static worker_loop
get_worker_single_stage(bool burst)
{
	uint8_t atq = cdata.all_type_queues ? 1 : 0;

	if (burst)
		return get_worker_loop_single_burst(atq);

	return get_worker_loop_single_non_burst(atq);
}

static worker_loop
get_worker_multi_stage(bool burst)
{
	uint8_t atq = cdata.all_type_queues ? 1 : 0;

	if (burst)
		return get_worker_loop_burst(atq);

	return get_worker_loop_non_burst(atq);
}

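/*
 * Entry point for this mode: plug the selected worker loop and the
 * setup/check callbacks into the application's setup_data. No Tx core
 * is used, so the Tx core mask is cleared and no consumer is set.
 */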
void
set_worker_tx_setup_data(struct setup_data *caps, bool burst)
{
	if (cdata.num_stages == 1)
		caps->worker = get_worker_single_stage(burst);
	else
		caps->worker = get_worker_multi_stage(burst);

	memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);

	caps->check_opt = worker_tx_opt_check;
	caps->consumer = NULL;
	caps->scheduler = schedule_devices;
	caps->evdev_setup = setup_eventdev_worker_tx;
	caps->adptr_setup = init_rx_adapter;
}