xref: /dpdk/lib/eventdev/rte_event_eth_rx_adapter.c (revision 515cd4a488b6a0c6e40d20e6b10d8e89657dc23f)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #include <ctype.h>
6 #include <stdlib.h>
7 #if defined(LINUX)
8 #include <sys/epoll.h>
9 #endif
10 #include <unistd.h>
11 
12 #include <rte_cycles.h>
13 #include <rte_common.h>
14 #include <dev_driver.h>
15 #include <rte_errno.h>
16 #include <ethdev_driver.h>
17 #include <rte_log.h>
18 #include <rte_malloc.h>
19 #include <rte_service_component.h>
20 #include <rte_thash.h>
21 #include <rte_interrupts.h>
22 #include <rte_mbuf_dyn.h>
23 #include <rte_telemetry.h>
24 
25 #include "rte_eventdev.h"
26 #include "eventdev_pmd.h"
27 #include "eventdev_trace.h"
28 #include "rte_event_eth_rx_adapter.h"
29 
30 #define BATCH_SIZE		32
31 #define BLOCK_CNT_THRESHOLD	10
32 #define ETH_EVENT_BUFFER_SIZE	(6*BATCH_SIZE)
33 #define MAX_VECTOR_SIZE		1024
34 #define MIN_VECTOR_SIZE		4
35 #define MAX_VECTOR_NS		1E9
36 #define MIN_VECTOR_NS		1E5
37 
38 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
39 #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
40 
41 #define RSS_KEY_SIZE	40
42 /* value written to intr thread pipe to signal thread exit */
43 #define ETH_BRIDGE_INTR_THREAD_EXIT	1
44 /* Sentinel value used to detect an uninitialized file handle */
45 #define INIT_FD		-1
46 
47 #define RXA_ADAPTER_ARRAY "rte_event_eth_rx_adapter_array"
48 
49 /*
50  * Used to store port and queue ID of interrupting Rx queue
51  */
52 union queue_data {
53 	RTE_STD_C11
54 	void *ptr;
55 	struct {
56 		uint16_t port;
57 		uint16_t queue;
58 	};
59 };
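
/*
 * Illustrative sketch, not part of the build: the <port, queue> pair is
 * packed into the epoll user data pointer when the interrupt is registered,
 * e.g. in rxa_config_intr():
 *
 *	union queue_data qd = { .port = eth_dev_id, .queue = rx_queue_id };
 *	rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id, epd,
 *				  RTE_INTR_EVENT_ADD, qd.ptr);
 *
 * and recovered in rxa_intr_ring_enqueue() via qd.ptr = epdata.data, so no
 * lookup table is needed to identify the interrupting Rx queue.
 */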
60 
61 /*
62  * There is an instance of this struct per polled Rx queue added to the
63  * adapter
64  */
65 struct eth_rx_poll_entry {
66 	/* Eth port to poll */
67 	uint16_t eth_dev_id;
68 	/* Eth rx queue to poll */
69 	uint16_t eth_rx_qid;
70 };
71 
72 struct eth_rx_vector_data {
73 	TAILQ_ENTRY(eth_rx_vector_data) next;
74 	uint16_t port;
75 	uint16_t queue;
76 	uint16_t max_vector_count;
77 	uint64_t event;
78 	uint64_t ts;
79 	uint64_t vector_timeout_ticks;
80 	struct rte_mempool *vector_pool;
81 	struct rte_event_vector *vector_ev;
82 } __rte_cache_aligned;
83 
84 TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
85 
86 /* Instance per adapter, or per Rx queue when use_queue_event_buf is set */
87 struct eth_event_enqueue_buffer {
88 	/* Count of events in this buffer */
89 	uint16_t count;
90 	/* Array of events in this buffer */
91 	struct rte_event *events;
92 	/* size of event buffer */
93 	uint16_t events_size;
94 	/* Event enqueue happens from head */
95 	uint16_t head;
96 	/* New packets from rte_eth_rx_burst() are enqueued at the tail */
97 	uint16_t tail;
98 	/* last element in the buffer before rollover */
99 	uint16_t last;
100 	uint16_t last_mask;
101 };
102 
103 struct event_eth_rx_adapter {
104 	/* RSS key */
105 	uint8_t rss_key_be[RSS_KEY_SIZE];
106 	/* Event device identifier */
107 	uint8_t eventdev_id;
108 	/* Event port identifier */
109 	uint8_t event_port_id;
110 	/* Flag indicating per rxq event buffer */
111 	bool use_queue_event_buf;
112 	/* Per ethernet device structure */
113 	struct eth_device_info *eth_devices;
114 	/* Lock to serialize config updates with service function */
115 	rte_spinlock_t rx_lock;
116 	/* Max mbufs processed in any service function invocation */
117 	uint32_t max_nb_rx;
118 	/* Receive queues that need to be polled */
119 	struct eth_rx_poll_entry *eth_rx_poll;
120 	/* Size of the eth_rx_poll array */
121 	uint16_t num_rx_polled;
122 	/* Weighted round robin schedule */
123 	uint32_t *wrr_sched;
124 	/* wrr_sched[] size */
125 	uint32_t wrr_len;
126 	/* Next entry in wrr[] to begin polling */
127 	uint32_t wrr_pos;
128 	/* Event burst buffer */
129 	struct eth_event_enqueue_buffer event_enqueue_buffer;
130 	/* Vector enable flag */
131 	uint8_t ena_vector;
132 	/* Timestamp of previous vector expiry list traversal */
133 	uint64_t prev_expiry_ts;
134 	/* Minimum ticks to wait before traversing expiry list */
135 	uint64_t vector_tmo_ticks;
136 	/* vector list */
137 	struct eth_rx_vector_data_list vector_list;
138 	/* Per adapter stats */
139 	struct rte_event_eth_rx_adapter_stats stats;
140 	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
141 	uint16_t enq_block_count;
142 	/* Block start ts */
143 	uint64_t rx_enq_block_start_ts;
144 	/* epoll fd used to wait for Rx interrupts */
145 	int epd;
146 	/* Number of interrupt driven Rx queues */
147 	uint32_t num_rx_intr;
148 	/* Used to send <dev id, queue id> of interrupting Rx queues from
149 	 * the interrupt thread to the Rx thread
150 	 */
151 	struct rte_ring *intr_ring;
152 	/* Rx Queue data (dev id, queue id) for the last non-empty
153 	 * queue polled
154 	 */
155 	union queue_data qd;
156 	/* queue_data is valid */
157 	int qd_valid;
158 	/* Interrupt ring lock, synchronizes Rx thread
159 	 * and interrupt thread
160 	 */
161 	rte_spinlock_t intr_ring_lock;
162 	/* Event array passed to rte_epoll_wait() */
163 	struct rte_epoll_event *epoll_events;
164 	/* Count of interrupt vectors in use */
165 	uint32_t num_intr_vec;
166 	/* Thread blocked on Rx interrupts */
167 	pthread_t rx_intr_thread;
168 	/* Configuration callback for rte_service configuration */
169 	rte_event_eth_rx_adapter_conf_cb conf_cb;
170 	/* Configuration callback argument */
171 	void *conf_arg;
172 	/* Set if the default conf callback is being used */
173 	int default_cb_arg;
174 	/* Service initialization state */
175 	uint8_t service_inited;
176 	/* Total count of Rx queues in adapter */
177 	uint32_t nb_queues;
178 	/* Memory allocation name */
179 	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
180 	/* Socket identifier cached from eventdev */
181 	int socket_id;
182 	/* Per adapter EAL service */
183 	uint32_t service_id;
184 	/* Adapter started flag */
185 	uint8_t rxa_started;
186 	/* Adapter ID */
187 	uint8_t id;
188 } __rte_cache_aligned;
189 
190 /* Per eth device */
191 struct eth_device_info {
192 	struct rte_eth_dev *dev;
193 	struct eth_rx_queue_info *rx_queue;
194 	/* Rx callback */
195 	rte_event_eth_rx_adapter_cb_fn cb_fn;
196 	/* Rx callback argument */
197 	void *cb_arg;
198 	/* Set if ethdev->eventdev packet transfer uses a
199 	 * hardware mechanism
200 	 */
201 	uint8_t internal_event_port;
202 	/* Set if the adapter is processing rx queues for
203 	 * this eth device and packet processing has been
204 	 * started, allows for the code to know if the PMD
205 	 * rx_adapter_stop callback needs to be invoked
206 	 */
207 	uint8_t dev_rx_started;
208 	/* Number of queues added for this device */
209 	uint16_t nb_dev_queues;
210 	/* Number of poll based queues
211 	 * If nb_rx_poll > 0, the start callback will
212 	 * be invoked if not already invoked
213 	 */
214 	uint16_t nb_rx_poll;
215 	/* Number of interrupt based queues
216 	 * If nb_rx_intr > 0, the start callback will
217 	 * be invoked if not already invoked.
218 	 */
219 	uint16_t nb_rx_intr;
220 	/* Number of queues that use the shared interrupt */
221 	uint16_t nb_shared_intr;
222 	/* sum(wrr(q)) for all queues within the device
223 	 * useful when deleting all device queues
224 	 */
225 	uint32_t wrr_len;
226 	/* Intr based queue index to start polling from; used when
227 	 * the number of shared interrupts is non-zero
228 	 */
229 	uint16_t next_q_idx;
230 	/* Intr based queue indices */
231 	uint16_t *intr_queue;
232 	/* Device generates a per Rx queue interrupt for
233 	 * queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
234 	 */
235 	int multi_intr_cap;
236 	/* shared interrupt enabled */
237 	int shared_intr_enabled;
238 };
239 
240 /* Per Rx queue */
241 struct eth_rx_queue_info {
242 	int queue_enabled;	/* True if added */
243 	int intr_enabled;
244 	uint8_t ena_vector;
245 	uint16_t wt;		/* Polling weight */
246 	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
247 	uint64_t event;
248 	struct eth_rx_vector_data vector_data;
249 	struct eth_event_enqueue_buffer *event_buf;
250 	/* Use the adapter stats struct for queue level stats,
251 	 * since the same stats need to be updated for the adapter and the queue
252 	 */
253 	struct rte_event_eth_rx_adapter_stats *stats;
254 };
255 
256 static struct event_eth_rx_adapter **event_eth_rx_adapter;
257 
258 /* Enable dynamic timestamp field in mbuf */
259 static uint64_t event_eth_rx_timestamp_dynflag;
260 static int event_eth_rx_timestamp_dynfield_offset = -1;
261 
262 static inline rte_mbuf_timestamp_t *
263 rxa_timestamp_dynfield(struct rte_mbuf *mbuf)
264 {
265 	return RTE_MBUF_DYNFIELD(mbuf,
266 		event_eth_rx_timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
267 }
268 
269 static inline int
270 rxa_validate_id(uint8_t id)
271 {
272 	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
273 }
274 
275 static inline struct eth_event_enqueue_buffer *
276 rxa_event_buf_get(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
277 		  uint16_t rx_queue_id,
278 		  struct rte_event_eth_rx_adapter_stats **stats)
279 {
280 	if (rx_adapter->use_queue_event_buf) {
281 		struct eth_device_info *dev_info =
282 			&rx_adapter->eth_devices[eth_dev_id];
283 		*stats = dev_info->rx_queue[rx_queue_id].stats;
284 		return dev_info->rx_queue[rx_queue_id].event_buf;
285 	} else {
286 		*stats = &rx_adapter->stats;
287 		return &rx_adapter->event_enqueue_buffer;
288 	}
289 }
290 
291 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
292 	if (!rxa_validate_id(id)) { \
293 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
294 		return retval; \
295 	} \
296 } while (0)
297 
298 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(id, retval) do { \
299 	if (!rxa_validate_id(id)) { \
300 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
301 		ret = retval; \
302 		goto error; \
303 	} \
304 } while (0)
305 
306 #define RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, retval) do { \
307 	if ((token) == NULL || strlen(token) == 0 || !isdigit(*token)) { \
308 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter token\n"); \
309 		ret = retval; \
310 		goto error; \
311 	} \
312 } while (0)
313 
314 #define RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(port_id, retval) do { \
315 	if (!rte_eth_dev_is_valid_port(port_id)) { \
316 		RTE_ETHDEV_LOG(ERR, "Invalid port_id=%u\n", port_id); \
317 		ret = retval; \
318 		goto error; \
319 	} \
320 } while (0)
321 
322 static inline int
323 rxa_sw_adapter_queue_count(struct event_eth_rx_adapter *rx_adapter)
324 {
325 	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
326 }
327 
328 /* Greatest common divisor */
329 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
330 {
331 	uint16_t r = a % b;
332 
333 	return r ? rxa_gcd_u16(b, r) : b;
334 }
335 
336 /* Returns the next queue in the polling sequence
337  *
338  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
339  */
340 static int
341 rxa_wrr_next(struct event_eth_rx_adapter *rx_adapter, unsigned int n, int *cw,
342 	     struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
343 	     uint16_t gcd, int prev)
344 {
345 	int i = prev;
346 	uint16_t w;
347 
348 	while (1) {
349 		uint16_t q;
350 		uint16_t d;
351 
352 		i = (i + 1) % n;
353 		if (i == 0) {
354 			*cw = *cw - gcd;
355 			if (*cw <= 0)
356 				*cw = max_wt;
357 		}
358 
359 		q = eth_rx_poll[i].eth_rx_qid;
360 		d = eth_rx_poll[i].eth_dev_id;
361 		w = rx_adapter->eth_devices[d].rx_queue[q].wt;
362 
363 		if ((int)w >= *cw)
364 			return i;
365 	}
366 }
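
/*
 * Worked example, illustrative only: with three polled queues A, B, C in
 * eth_rx_poll[] order and weights 3, 2, 1 (max_wt = 3, gcd = 1), repeated
 * calls to rxa_wrr_next() starting from prev = -1, cw = -1 return the
 * positions A A B A B C, i.e. sum(wt) = 6 slots per WRR cycle, so queue A
 * is polled three times as often as queue C. rxa_calc_wrr_sequence() below
 * precomputes exactly this sequence into the wrr_sched[] array.
 */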
367 
368 static inline int
369 rxa_shared_intr(struct eth_device_info *dev_info,
370 	int rx_queue_id)
371 {
372 	int multi_intr_cap;
373 
374 	if (dev_info->dev->intr_handle == NULL)
375 		return 0;
376 
377 	multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
378 	return !multi_intr_cap ||
379 		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
380 }
381 
382 static inline int
383 rxa_intr_queue(struct eth_device_info *dev_info,
384 	int rx_queue_id)
385 {
386 	struct eth_rx_queue_info *queue_info;
387 
388 	queue_info = &dev_info->rx_queue[rx_queue_id];
389 	return dev_info->rx_queue &&
390 		!dev_info->internal_event_port &&
391 		queue_info->queue_enabled && queue_info->wt == 0;
392 }
393 
394 static inline int
395 rxa_polled_queue(struct eth_device_info *dev_info,
396 	int rx_queue_id)
397 {
398 	struct eth_rx_queue_info *queue_info;
399 
400 	queue_info = &dev_info->rx_queue[rx_queue_id];
401 	return !dev_info->internal_event_port &&
402 		dev_info->rx_queue &&
403 		queue_info->queue_enabled && queue_info->wt != 0;
404 }
405 
406 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
407 static int
408 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
409 {
410 	uint16_t i;
411 	int n, s;
412 	uint16_t nbq;
413 
414 	nbq = dev_info->dev->data->nb_rx_queues;
415 	n = 0; /* non shared count */
416 	s = 0; /* shared count */
417 
418 	if (rx_queue_id == -1) {
419 		for (i = 0; i < nbq; i++) {
420 			if (!rxa_shared_intr(dev_info, i))
421 				n += add ? !rxa_intr_queue(dev_info, i) :
422 					rxa_intr_queue(dev_info, i);
423 			else
424 				s += add ? !rxa_intr_queue(dev_info, i) :
425 					rxa_intr_queue(dev_info, i);
426 		}
427 
428 		if (s > 0) {
429 			if ((add && dev_info->nb_shared_intr == 0) ||
430 				(!add && dev_info->nb_shared_intr))
431 				n += 1;
432 		}
433 	} else {
434 		if (!rxa_shared_intr(dev_info, rx_queue_id))
435 			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
436 				rxa_intr_queue(dev_info, rx_queue_id);
437 		else
438 			n = add ? !dev_info->nb_shared_intr :
439 				dev_info->nb_shared_intr == 1;
440 	}
441 
442 	return add ? n : -n;
443 }
444 
445 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
446  */
447 static void
448 rxa_calc_nb_post_intr_del(struct event_eth_rx_adapter *rx_adapter,
449 			  struct eth_device_info *dev_info, int rx_queue_id,
450 			  uint32_t *nb_rx_intr)
451 {
452 	uint32_t intr_diff;
453 
454 	if (rx_queue_id == -1)
455 		intr_diff = dev_info->nb_rx_intr;
456 	else
457 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
458 
459 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
460 }
461 
462 /* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
463  * interrupt queues could currently be poll mode Rx queues
464  */
465 static void
466 rxa_calc_nb_post_add_intr(struct event_eth_rx_adapter *rx_adapter,
467 			  struct eth_device_info *dev_info, int rx_queue_id,
468 			  uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
469 			  uint32_t *nb_wrr)
470 {
471 	uint32_t intr_diff;
472 	uint32_t poll_diff;
473 	uint32_t wrr_len_diff;
474 
475 	if (rx_queue_id == -1) {
476 		intr_diff = dev_info->dev->data->nb_rx_queues -
477 						dev_info->nb_rx_intr;
478 		poll_diff = dev_info->nb_rx_poll;
479 		wrr_len_diff = dev_info->wrr_len;
480 	} else {
481 		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
482 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
483 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
484 					0;
485 	}
486 
487 	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
488 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
489 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
490 }
491 
492 /* Calculate size of the eth_rx_poll and wrr_sched arrays
493  * after deleting poll mode rx queues
494  */
495 static void
496 rxa_calc_nb_post_poll_del(struct event_eth_rx_adapter *rx_adapter,
497 			  struct eth_device_info *dev_info, int rx_queue_id,
498 			  uint32_t *nb_rx_poll, uint32_t *nb_wrr)
499 {
500 	uint32_t poll_diff;
501 	uint32_t wrr_len_diff;
502 
503 	if (rx_queue_id == -1) {
504 		poll_diff = dev_info->nb_rx_poll;
505 		wrr_len_diff = dev_info->wrr_len;
506 	} else {
507 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
508 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
509 					0;
510 	}
511 
512 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
513 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
514 }
515 
516 /* Calculate nb_rx_* after adding poll mode rx queues
517  */
518 static void
519 rxa_calc_nb_post_add_poll(struct event_eth_rx_adapter *rx_adapter,
520 			  struct eth_device_info *dev_info, int rx_queue_id,
521 			  uint16_t wt, uint32_t *nb_rx_poll,
522 			  uint32_t *nb_rx_intr, uint32_t *nb_wrr)
523 {
524 	uint32_t intr_diff;
525 	uint32_t poll_diff;
526 	uint32_t wrr_len_diff;
527 
528 	if (rx_queue_id == -1) {
529 		intr_diff = dev_info->nb_rx_intr;
530 		poll_diff = dev_info->dev->data->nb_rx_queues -
531 						dev_info->nb_rx_poll;
532 		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
533 				- dev_info->wrr_len;
534 	} else {
535 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
536 		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
537 		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
538 				wt - dev_info->rx_queue[rx_queue_id].wt :
539 				wt;
540 	}
541 
542 	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
543 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
544 	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
545 }
546 
547 /* Calculate nb_rx_* after adding rx_queue_id */
548 static void
549 rxa_calc_nb_post_add(struct event_eth_rx_adapter *rx_adapter,
550 		     struct eth_device_info *dev_info, int rx_queue_id,
551 		     uint16_t wt, uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
552 		     uint32_t *nb_wrr)
553 {
554 	if (wt != 0)
555 		rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
556 					wt, nb_rx_poll, nb_rx_intr, nb_wrr);
557 	else
558 		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
559 					nb_rx_poll, nb_rx_intr, nb_wrr);
560 }
561 
562 /* Calculate nb_rx_* after deleting rx_queue_id */
563 static void
564 rxa_calc_nb_post_del(struct event_eth_rx_adapter *rx_adapter,
565 		     struct eth_device_info *dev_info, int rx_queue_id,
566 		     uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
567 		     uint32_t *nb_wrr)
568 {
569 	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
570 				nb_wrr);
571 	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
572 				nb_rx_intr);
573 }
574 
575 /*
576  * Allocate the rx_poll array
577  */
578 static struct eth_rx_poll_entry *
579 rxa_alloc_poll(struct event_eth_rx_adapter *rx_adapter, uint32_t num_rx_polled)
580 {
581 	size_t len;
582 
583 	len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
584 							RTE_CACHE_LINE_SIZE);
585 	return  rte_zmalloc_socket(rx_adapter->mem_name,
586 				len,
587 				RTE_CACHE_LINE_SIZE,
588 				rx_adapter->socket_id);
589 }
590 
591 /*
592  * Allocate the WRR array
593  */
594 static uint32_t *
595 rxa_alloc_wrr(struct event_eth_rx_adapter *rx_adapter, int nb_wrr)
596 {
597 	size_t len;
598 
599 	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
600 			RTE_CACHE_LINE_SIZE);
601 	return  rte_zmalloc_socket(rx_adapter->mem_name,
602 				len,
603 				RTE_CACHE_LINE_SIZE,
604 				rx_adapter->socket_id);
605 }
606 
607 static int
608 rxa_alloc_poll_arrays(struct event_eth_rx_adapter *rx_adapter, uint32_t nb_poll,
609 		      uint32_t nb_wrr, struct eth_rx_poll_entry **rx_poll,
610 		      uint32_t **wrr_sched)
611 {
612 
613 	if (nb_poll == 0) {
614 		*rx_poll = NULL;
615 		*wrr_sched = NULL;
616 		return 0;
617 	}
618 
619 	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
620 	if (*rx_poll == NULL) {
621 		*wrr_sched = NULL;
622 		return -ENOMEM;
623 	}
624 
625 	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
626 	if (*wrr_sched == NULL) {
627 		rte_free(*rx_poll);
628 		return -ENOMEM;
629 	}
630 	return 0;
631 }
632 
633 /* Precalculate WRR polling sequence for all queues in rx_adapter */
634 static void
635 rxa_calc_wrr_sequence(struct event_eth_rx_adapter *rx_adapter,
636 		      struct eth_rx_poll_entry *rx_poll, uint32_t *rx_wrr)
637 {
638 	uint16_t d;
639 	uint16_t q;
640 	unsigned int i;
641 	int prev = -1;
642 	int cw = -1;
643 
644 	/* Initialize variables for calculation of wrr schedule */
645 	uint16_t max_wrr_pos = 0;
646 	unsigned int poll_q = 0;
647 	uint16_t max_wt = 0;
648 	uint16_t gcd = 0;
649 
650 	if (rx_poll == NULL)
651 		return;
652 
653 	/* Generate the array of all queues to poll; the size of this
654 	 * array is poll_q
655 	 */
656 	RTE_ETH_FOREACH_DEV(d) {
657 		uint16_t nb_rx_queues;
658 		struct eth_device_info *dev_info =
659 				&rx_adapter->eth_devices[d];
660 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
661 		if (dev_info->rx_queue == NULL)
662 			continue;
663 		if (dev_info->internal_event_port)
664 			continue;
665 		dev_info->wrr_len = 0;
666 		for (q = 0; q < nb_rx_queues; q++) {
667 			struct eth_rx_queue_info *queue_info =
668 				&dev_info->rx_queue[q];
669 			uint16_t wt;
670 
671 			if (!rxa_polled_queue(dev_info, q))
672 				continue;
673 			wt = queue_info->wt;
674 			rx_poll[poll_q].eth_dev_id = d;
675 			rx_poll[poll_q].eth_rx_qid = q;
676 			max_wrr_pos += wt;
677 			dev_info->wrr_len += wt;
678 			max_wt = RTE_MAX(max_wt, wt);
679 			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
680 			poll_q++;
681 		}
682 	}
683 
684 	/* Generate polling sequence based on weights */
685 	prev = -1;
686 	cw = -1;
687 	for (i = 0; i < max_wrr_pos; i++) {
688 		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
689 				     rx_poll, max_wt, gcd, prev);
690 		prev = rx_wrr[i];
691 	}
692 }
693 
694 static inline void
695 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
696 	struct rte_ipv6_hdr **ipv6_hdr)
697 {
698 	struct rte_ether_hdr *eth_hdr =
699 		rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
700 	struct rte_vlan_hdr *vlan_hdr;
701 
702 	*ipv4_hdr = NULL;
703 	*ipv6_hdr = NULL;
704 
705 	switch (eth_hdr->ether_type) {
706 	case RTE_BE16(RTE_ETHER_TYPE_IPV4):
707 		*ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
708 		break;
709 
710 	case RTE_BE16(RTE_ETHER_TYPE_IPV6):
711 		*ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
712 		break;
713 
714 	case RTE_BE16(RTE_ETHER_TYPE_VLAN):
715 		vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
716 		switch (vlan_hdr->eth_proto) {
717 		case RTE_BE16(RTE_ETHER_TYPE_IPV4):
718 			*ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
719 			break;
720 		case RTE_BE16(RTE_ETHER_TYPE_IPV6):
721 			*ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
722 			break;
723 		default:
724 			break;
725 		}
726 		break;
727 
728 	default:
729 		break;
730 	}
731 }
732 
733 /* Calculate RSS hash for IPv4/6 */
734 static inline uint32_t
735 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
736 {
737 	uint32_t input_len;
738 	void *tuple;
739 	struct rte_ipv4_tuple ipv4_tuple;
740 	struct rte_ipv6_tuple ipv6_tuple;
741 	struct rte_ipv4_hdr *ipv4_hdr;
742 	struct rte_ipv6_hdr *ipv6_hdr;
743 
744 	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
745 
746 	if (ipv4_hdr) {
747 		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
748 		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
749 		tuple = &ipv4_tuple;
750 		input_len = RTE_THASH_V4_L3_LEN;
751 	} else if (ipv6_hdr) {
752 		rte_thash_load_v6_addrs(ipv6_hdr,
753 					(union rte_thash_tuple *)&ipv6_tuple);
754 		tuple = &ipv6_tuple;
755 		input_len = RTE_THASH_V6_L3_LEN;
756 	} else
757 		return 0;
758 
759 	return rte_softrss_be(tuple, input_len, rss_key_be);
760 }
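
/*
 * Illustrative note: rxa_buffer_mbufs() falls back to this software hash
 * only when the mbuf carries no RTE_MBUF_F_RX_RSS_HASH and the application
 * did not supply a flow id for the queue, roughly (sketch, for an mbuf m):
 *
 *	uint32_t flow = rxa_do_softrss(m, rx_adapter->rss_key_be);
 *
 * Non-IP packets simply hash to 0, per the early return above.
 */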
761 
762 static inline int
763 rxa_enq_blocked(struct event_eth_rx_adapter *rx_adapter)
764 {
765 	return !!rx_adapter->enq_block_count;
766 }
767 
768 static inline void
769 rxa_enq_block_start_ts(struct event_eth_rx_adapter *rx_adapter)
770 {
771 	if (rx_adapter->rx_enq_block_start_ts)
772 		return;
773 
774 	rx_adapter->enq_block_count++;
775 	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
776 		return;
777 
778 	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
779 }
780 
781 static inline void
782 rxa_enq_block_end_ts(struct event_eth_rx_adapter *rx_adapter,
783 		     struct rte_event_eth_rx_adapter_stats *stats)
784 {
785 	if (unlikely(!stats->rx_enq_start_ts))
786 		stats->rx_enq_start_ts = rte_get_tsc_cycles();
787 
788 	if (likely(!rxa_enq_blocked(rx_adapter)))
789 		return;
790 
791 	rx_adapter->enq_block_count = 0;
792 	if (rx_adapter->rx_enq_block_start_ts) {
793 		stats->rx_enq_end_ts = rte_get_tsc_cycles();
794 		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
795 		    rx_adapter->rx_enq_block_start_ts;
796 		rx_adapter->rx_enq_block_start_ts = 0;
797 	}
798 }
799 
800 /* Enqueue buffered events to event device */
801 static inline uint16_t
802 rxa_flush_event_buffer(struct event_eth_rx_adapter *rx_adapter,
803 		       struct eth_event_enqueue_buffer *buf,
804 		       struct rte_event_eth_rx_adapter_stats *stats)
805 {
806 	uint16_t count = buf->count;
807 	uint16_t n = 0;
808 
809 	if (!count)
810 		return 0;
811 
812 	if (buf->last)
813 		count = buf->last - buf->head;
814 
815 	if (count) {
816 		n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
817 						rx_adapter->event_port_id,
818 						&buf->events[buf->head],
819 						count);
820 		if (n != count)
821 			stats->rx_enq_retry++;
822 
823 		buf->head += n;
824 	}
825 
826 	if (buf->last && n == count) {
827 		uint16_t n1;
828 
829 		n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
830 					rx_adapter->event_port_id,
831 					&buf->events[0],
832 					buf->tail);
833 
834 		if (n1 != buf->tail)
835 			stats->rx_enq_retry++;
836 
837 		buf->last = 0;
838 		buf->head = n1;
839 		buf->last_mask = 0;
840 		n += n1;
841 	}
842 
843 	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
844 		rxa_enq_block_start_ts(rx_adapter);
845 
846 	buf->count -= n;
847 	stats->rx_enq_count += n;
848 
849 	return n;
850 }
851 
852 static inline void
853 rxa_init_vector(struct event_eth_rx_adapter *rx_adapter,
854 		struct eth_rx_vector_data *vec)
855 {
856 	vec->vector_ev->nb_elem = 0;
857 	vec->vector_ev->port = vec->port;
858 	vec->vector_ev->queue = vec->queue;
859 	vec->vector_ev->attr_valid = true;
860 	vec->vector_ev->elem_offset = 0;
861 	TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
862 }
863 
864 static inline uint16_t
865 rxa_create_event_vector(struct event_eth_rx_adapter *rx_adapter,
866 			struct eth_rx_queue_info *queue_info,
867 			struct eth_event_enqueue_buffer *buf,
868 			struct rte_mbuf **mbufs, uint16_t num)
869 {
870 	struct rte_event *ev = &buf->events[buf->count];
871 	struct eth_rx_vector_data *vec;
872 	uint16_t filled, space, sz;
873 
874 	filled = 0;
875 	vec = &queue_info->vector_data;
876 
877 	if (vec->vector_ev == NULL) {
878 		if (rte_mempool_get(vec->vector_pool,
879 				    (void **)&vec->vector_ev) < 0) {
880 			rte_pktmbuf_free_bulk(mbufs, num);
881 			return 0;
882 		}
883 		rxa_init_vector(rx_adapter, vec);
884 	}
885 	while (num) {
886 		if (vec->vector_ev->nb_elem == vec->max_vector_count) {
887 			/* Event ready. */
888 			ev->event = vec->event;
889 			ev->vec = vec->vector_ev;
890 			ev++;
891 			filled++;
892 			vec->vector_ev = NULL;
893 			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
894 			if (rte_mempool_get(vec->vector_pool,
895 					    (void **)&vec->vector_ev) < 0) {
896 				rte_pktmbuf_free_bulk(mbufs, num);
897 				return 0;
898 			}
899 			rxa_init_vector(rx_adapter, vec);
900 		}
901 
902 		space = vec->max_vector_count - vec->vector_ev->nb_elem;
903 		sz = num > space ? space : num;
904 		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
905 		       sizeof(void *) * sz);
906 		vec->vector_ev->nb_elem += sz;
907 		num -= sz;
908 		mbufs += sz;
909 		vec->ts = rte_rdtsc();
910 	}
911 
912 	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
913 		ev->event = vec->event;
914 		ev->vec = vec->vector_ev;
915 		ev++;
916 		filled++;
917 		vec->vector_ev = NULL;
918 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
919 	}
920 
921 	return filled;
922 }
923 
924 static inline void
925 rxa_buffer_mbufs(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
926 		 uint16_t rx_queue_id, struct rte_mbuf **mbufs, uint16_t num,
927 		 struct eth_event_enqueue_buffer *buf,
928 		 struct rte_event_eth_rx_adapter_stats *stats)
929 {
930 	uint32_t i;
931 	struct eth_device_info *dev_info =
932 					&rx_adapter->eth_devices[eth_dev_id];
933 	struct eth_rx_queue_info *eth_rx_queue_info =
934 					&dev_info->rx_queue[rx_queue_id];
935 	uint16_t new_tail = buf->tail;
936 	uint64_t event = eth_rx_queue_info->event;
937 	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
938 	struct rte_mbuf *m = mbufs[0];
939 	uint32_t rss_mask;
940 	uint32_t rss;
941 	int do_rss;
942 	uint16_t nb_cb;
943 	uint16_t dropped;
944 	uint64_t ts, ts_mask;
945 
946 	if (!eth_rx_queue_info->ena_vector) {
947 		ts = m->ol_flags & event_eth_rx_timestamp_dynflag ?
948 						0 : rte_get_tsc_cycles();
949 
950 		/* 0xffff ffff ffff ffff if RTE_MBUF_F_RX_TIMESTAMP is set,
951 		 * otherwise 0
952 		 */
953 		ts_mask = (uint64_t)(!(m->ol_flags &
954 				       event_eth_rx_timestamp_dynflag)) - 1ULL;
955 
956 		/* 0xffff ffff if RTE_MBUF_F_RX_RSS_HASH is set, otherwise 0 */
957 		rss_mask = ~(((m->ol_flags & RTE_MBUF_F_RX_RSS_HASH) != 0) - 1);
958 		do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
959 		for (i = 0; i < num; i++) {
960 			struct rte_event *ev;
961 
962 			m = mbufs[i];
963 			*rxa_timestamp_dynfield(m) = ts |
964 					(*rxa_timestamp_dynfield(m) & ts_mask);
965 
966 			ev = &buf->events[new_tail];
967 
968 			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
969 				     : m->hash.rss;
970 			ev->event = event;
971 			ev->flow_id = (rss & ~flow_id_mask) |
972 				      (ev->flow_id & flow_id_mask);
973 			ev->mbuf = m;
974 			new_tail++;
975 		}
976 	} else {
977 		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
978 					      buf, mbufs, num);
979 	}
980 
981 	if (num && dev_info->cb_fn) {
982 
983 		dropped = 0;
984 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
985 				       buf->last |
986 				       (buf->events_size & ~buf->last_mask),
987 				       buf->count >= BATCH_SIZE ?
988 						buf->count - BATCH_SIZE : 0,
989 				       &buf->events[buf->tail],
990 				       num,
991 				       dev_info->cb_arg,
992 				       &dropped);
993 		if (unlikely(nb_cb > num))
994 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
995 				nb_cb, num);
996 		else
997 			num = nb_cb;
998 		if (dropped)
999 			stats->rx_dropped += dropped;
1000 	}
1001 
1002 	buf->count += num;
1003 	buf->tail += num;
1004 }
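
/*
 * Worked example of the branchless masks used above, illustrative only: if
 * the application configured its own flow id for the queue, flow_id_mask is
 * ~0 and ev->flow_id keeps the configured value; otherwise flow_id_mask is 0
 * and the (hardware or software) RSS hash is used instead. Similarly,
 * ts_mask is ~0ULL when the Rx timestamp flag is already set, so a driver
 * supplied timestamp is preserved and the adapter's rte_get_tsc_cycles()
 * value is used only for mbufs that arrived without one.
 */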
1005 
1006 static inline bool
1007 rxa_pkt_buf_available(struct eth_event_enqueue_buffer *buf)
1008 {
1009 	uint32_t nb_req = buf->tail + BATCH_SIZE;
1010 
1011 	if (!buf->last) {
1012 		if (nb_req <= buf->events_size)
1013 			return true;
1014 
1015 		if (buf->head >= BATCH_SIZE) {
1016 			buf->last_mask = ~0;
1017 			buf->last = buf->tail;
1018 			buf->tail = 0;
1019 			return true;
1020 		}
1021 	}
1022 
1023 	return nb_req <= buf->head;
1024 }
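
/*
 * Illustrative example, assuming the default adapter buffer of
 * 6 * BATCH_SIZE = 192 events: with tail = 176, head = 64 and last = 0,
 * nb_req = 208 exceeds events_size, but head >= BATCH_SIZE so the buffer
 * wraps (last = 176, tail = 0, last_mask = ~0) and the next burst lands at
 * events[0]. rxa_flush_event_buffer() then drains events[head..last) first
 * and events[0..tail) afterwards, before clearing last.
 */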
1025 
1026 /* Enqueue packets from <port, q> to event buffer */
1027 static inline uint32_t
1028 rxa_eth_rx(struct event_eth_rx_adapter *rx_adapter, uint16_t port_id,
1029 	   uint16_t queue_id, uint32_t rx_count, uint32_t max_rx,
1030 	   int *rxq_empty, struct eth_event_enqueue_buffer *buf,
1031 	   struct rte_event_eth_rx_adapter_stats *stats)
1032 {
1033 	struct rte_mbuf *mbufs[BATCH_SIZE];
1034 	uint16_t n;
1035 	uint32_t nb_rx = 0;
1036 	uint32_t nb_flushed = 0;
1037 
1038 	if (rxq_empty)
1039 		*rxq_empty = 0;
1040 	/* Don't do a batch dequeue from the rx queue if there isn't
1041 	 * enough space in the enqueue buffer.
1042 	 */
1043 	while (rxa_pkt_buf_available(buf)) {
1044 		if (buf->count >= BATCH_SIZE)
1045 			nb_flushed +=
1046 				rxa_flush_event_buffer(rx_adapter, buf, stats);
1047 
1048 		stats->rx_poll_count++;
1049 		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
1050 		if (unlikely(!n)) {
1051 			if (rxq_empty)
1052 				*rxq_empty = 1;
1053 			break;
1054 		}
1055 		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n, buf,
1056 				 stats);
1057 		nb_rx += n;
1058 		if (rx_count + nb_rx > max_rx)
1059 			break;
1060 	}
1061 
1062 	if (buf->count > 0)
1063 		nb_flushed += rxa_flush_event_buffer(rx_adapter, buf, stats);
1064 
1065 	stats->rx_packets += nb_rx;
1066 	if (nb_flushed == 0)
1067 		rte_event_maintain(rx_adapter->eventdev_id,
1068 				   rx_adapter->event_port_id, 0);
1069 
1070 	return nb_rx;
1071 }
1072 
1073 static inline void
1074 rxa_intr_ring_enqueue(struct event_eth_rx_adapter *rx_adapter, void *data)
1075 {
1076 	uint16_t port_id;
1077 	uint16_t queue;
1078 	int err;
1079 	union queue_data qd;
1080 	struct eth_device_info *dev_info;
1081 	struct eth_rx_queue_info *queue_info;
1082 	int *intr_enabled;
1083 
1084 	qd.ptr = data;
1085 	port_id = qd.port;
1086 	queue = qd.queue;
1087 
1088 	dev_info = &rx_adapter->eth_devices[port_id];
1089 	queue_info = &dev_info->rx_queue[queue];
1090 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1091 	if (rxa_shared_intr(dev_info, queue))
1092 		intr_enabled = &dev_info->shared_intr_enabled;
1093 	else
1094 		intr_enabled = &queue_info->intr_enabled;
1095 
1096 	if (*intr_enabled) {
1097 		*intr_enabled = 0;
1098 		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
1099 		/* Entry should always be available.
1100 		 * The ring size equals the maximum number of interrupt
1101 		 * vectors supported (an interrupt vector is shared in
1102 		 * case of shared interrupts)
1103 		 */
1104 		if (err)
1105 			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
1106 				" to ring: %s", strerror(-err));
1107 		else
1108 			rte_eth_dev_rx_intr_disable(port_id, queue);
1109 	}
1110 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1111 }
1112 
1113 static int
1114 rxa_intr_ring_check_avail(struct event_eth_rx_adapter *rx_adapter,
1115 			  uint32_t num_intr_vec)
1116 {
1117 	if (rx_adapter->num_intr_vec + num_intr_vec >
1118 				RTE_EVENT_ETH_INTR_RING_SIZE) {
1119 		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1120 		" %d needed %d limit %d", rx_adapter->num_intr_vec,
1121 		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1122 		return -ENOSPC;
1123 	}
1124 
1125 	return 0;
1126 }
1127 
1128 /* Delete entries for (dev, queue) from the interrupt ring */
1129 static void
1130 rxa_intr_ring_del_entries(struct event_eth_rx_adapter *rx_adapter,
1131 			  struct eth_device_info *dev_info,
1132 			  uint16_t rx_queue_id)
1133 {
1134 	int i, n;
1135 	union queue_data qd;
1136 
1137 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1138 
1139 	n = rte_ring_count(rx_adapter->intr_ring);
1140 	for (i = 0; i < n; i++) {
1141 		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1142 		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1143 			if (qd.port == dev_info->dev->data->port_id &&
1144 				qd.queue == rx_queue_id)
1145 				continue;
1146 		} else {
1147 			if (qd.port == dev_info->dev->data->port_id)
1148 				continue;
1149 		}
1150 		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1151 	}
1152 
1153 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1154 }
1155 
1156 /* pthread callback handling interrupt mode receive queues
1157  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1158  * interrupting queue to the adapter's ring buffer for interrupt events.
1159  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1160  * the adapter service function.
1161  */
1162 static void *
1163 rxa_intr_thread(void *arg)
1164 {
1165 	struct event_eth_rx_adapter *rx_adapter = arg;
1166 	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1167 	int n, i;
1168 
1169 	while (1) {
1170 		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1171 				RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1172 		if (unlikely(n < 0))
1173 			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1174 					n);
1175 		for (i = 0; i < n; i++) {
1176 			rxa_intr_ring_enqueue(rx_adapter,
1177 					epoll_events[i].epdata.data);
1178 		}
1179 	}
1180 
1181 	return NULL;
1182 }
1183 
1184 /* Dequeue <port, q> from interrupt ring and enqueue received
1185  * mbufs to eventdev
1186  */
1187 static inline void
1188 rxa_intr_ring_dequeue(struct event_eth_rx_adapter *rx_adapter)
1189 {
1190 	uint32_t n;
1191 	uint32_t nb_rx = 0;
1192 	int rxq_empty;
1193 	struct eth_event_enqueue_buffer *buf;
1194 	struct rte_event_eth_rx_adapter_stats *stats;
1195 	rte_spinlock_t *ring_lock;
1196 	uint8_t max_done = 0;
1197 
1198 	if (rx_adapter->num_rx_intr == 0)
1199 		return;
1200 
1201 	if (rte_ring_count(rx_adapter->intr_ring) == 0
1202 		&& !rx_adapter->qd_valid)
1203 		return;
1204 
1205 	buf = &rx_adapter->event_enqueue_buffer;
1206 	stats = &rx_adapter->stats;
1207 	ring_lock = &rx_adapter->intr_ring_lock;
1208 
1209 	if (buf->count >= BATCH_SIZE)
1210 		rxa_flush_event_buffer(rx_adapter, buf, stats);
1211 
1212 	while (rxa_pkt_buf_available(buf)) {
1213 		struct eth_device_info *dev_info;
1214 		uint16_t port;
1215 		uint16_t queue;
1216 		union queue_data qd  = rx_adapter->qd;
1217 		int err;
1218 
1219 		if (!rx_adapter->qd_valid) {
1220 			struct eth_rx_queue_info *queue_info;
1221 
1222 			rte_spinlock_lock(ring_lock);
1223 			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1224 			if (err) {
1225 				rte_spinlock_unlock(ring_lock);
1226 				break;
1227 			}
1228 
1229 			port = qd.port;
1230 			queue = qd.queue;
1231 			rx_adapter->qd = qd;
1232 			rx_adapter->qd_valid = 1;
1233 			dev_info = &rx_adapter->eth_devices[port];
1234 			if (rxa_shared_intr(dev_info, queue))
1235 				dev_info->shared_intr_enabled = 1;
1236 			else {
1237 				queue_info = &dev_info->rx_queue[queue];
1238 				queue_info->intr_enabled = 1;
1239 			}
1240 			rte_eth_dev_rx_intr_enable(port, queue);
1241 			rte_spinlock_unlock(ring_lock);
1242 		} else {
1243 			port = qd.port;
1244 			queue = qd.queue;
1245 
1246 			dev_info = &rx_adapter->eth_devices[port];
1247 		}
1248 
1249 		if (rxa_shared_intr(dev_info, queue)) {
1250 			uint16_t i;
1251 			uint16_t nb_queues;
1252 
1253 			nb_queues = dev_info->dev->data->nb_rx_queues;
1254 			n = 0;
1255 			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1256 				uint8_t enq_buffer_full;
1257 
1258 				if (!rxa_intr_queue(dev_info, i))
1259 					continue;
1260 				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1261 					rx_adapter->max_nb_rx,
1262 					&rxq_empty, buf, stats);
1263 				nb_rx += n;
1264 
1265 				enq_buffer_full = !rxq_empty && n == 0;
1266 				max_done = nb_rx > rx_adapter->max_nb_rx;
1267 
1268 				if (enq_buffer_full || max_done) {
1269 					dev_info->next_q_idx = i;
1270 					goto done;
1271 				}
1272 			}
1273 
1274 			rx_adapter->qd_valid = 0;
1275 
1276 			/* Reinitialize for next interrupt */
1277 			dev_info->next_q_idx = dev_info->multi_intr_cap ?
1278 						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1279 						0;
1280 		} else {
1281 			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1282 				rx_adapter->max_nb_rx,
1283 				&rxq_empty, buf, stats);
1284 			rx_adapter->qd_valid = !rxq_empty;
1285 			nb_rx += n;
1286 			if (nb_rx > rx_adapter->max_nb_rx)
1287 				break;
1288 		}
1289 	}
1290 
1291 done:
1292 	rx_adapter->stats.rx_intr_packets += nb_rx;
1293 }
1294 
1295 /*
1296  * Polls receive queues added to the event adapter and enqueues received
1297  * packets to the event device.
1298  *
1299  * The receive code first enqueues to a temporary buffer; the temporary
1300  * buffer is drained whenever it holds >= BATCH_SIZE packets.
1301  *
1302  * If there isn't space available in the temporary buffer, packets are not
1303  * dequeued from the Rx queue of the eth device. This back pressures the
1304  * eth device; in virtual device environments the back pressure is relayed
1305  * to the hypervisor's switching layer, where adjustments can be made to
1306  * deal with it.
1307  */
1308 static inline void
1309 rxa_poll(struct event_eth_rx_adapter *rx_adapter)
1310 {
1311 	uint32_t num_queue;
1312 	uint32_t nb_rx = 0;
1313 	struct eth_event_enqueue_buffer *buf = NULL;
1314 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1315 	uint32_t wrr_pos;
1316 	uint32_t max_nb_rx;
1317 
1318 	wrr_pos = rx_adapter->wrr_pos;
1319 	max_nb_rx = rx_adapter->max_nb_rx;
1320 
1321 	/* Iterate through a WRR sequence */
1322 	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1323 		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1324 		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1325 		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1326 
1327 		buf = rxa_event_buf_get(rx_adapter, d, qid, &stats);
1328 
1329 		/* Don't do a batch dequeue from the rx queue if there isn't
1330 		 * enough space in the enqueue buffer.
1331 		 */
1332 		if (buf->count >= BATCH_SIZE)
1333 			rxa_flush_event_buffer(rx_adapter, buf, stats);
1334 		if (!rxa_pkt_buf_available(buf)) {
1335 			if (rx_adapter->use_queue_event_buf)
1336 				goto poll_next_entry;
1337 			else {
1338 				rx_adapter->wrr_pos = wrr_pos;
1339 				return;
1340 			}
1341 		}
1342 
1343 		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1344 				NULL, buf, stats);
1345 		if (nb_rx > max_nb_rx) {
1346 			rx_adapter->wrr_pos =
1347 				    (wrr_pos + 1) % rx_adapter->wrr_len;
1348 			break;
1349 		}
1350 
1351 poll_next_entry:
1352 		if (++wrr_pos == rx_adapter->wrr_len)
1353 			wrr_pos = 0;
1354 	}
1355 }
1356 
1357 static void
1358 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1359 {
1360 	struct event_eth_rx_adapter *rx_adapter = arg;
1361 	struct eth_event_enqueue_buffer *buf = NULL;
1362 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1363 	struct rte_event *ev;
1364 
1365 	buf = rxa_event_buf_get(rx_adapter, vec->port, vec->queue, &stats);
1366 
1367 	if (buf->count)
1368 		rxa_flush_event_buffer(rx_adapter, buf, stats);
1369 
1370 	if (vec->vector_ev->nb_elem == 0)
1371 		return;
1372 	ev = &buf->events[buf->count];
1373 
1374 	/* Event ready. */
1375 	ev->event = vec->event;
1376 	ev->vec = vec->vector_ev;
1377 	buf->count++;
1378 
1379 	vec->vector_ev = NULL;
1380 	vec->ts = 0;
1381 }
1382 
1383 static int
1384 rxa_service_func(void *args)
1385 {
1386 	struct event_eth_rx_adapter *rx_adapter = args;
1387 
1388 	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1389 		return 0;
1390 	if (!rx_adapter->rxa_started) {
1391 		rte_spinlock_unlock(&rx_adapter->rx_lock);
1392 		return 0;
1393 	}
1394 
1395 	if (rx_adapter->ena_vector) {
1396 		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1397 		    rx_adapter->vector_tmo_ticks) {
1398 			struct eth_rx_vector_data *vec;
1399 
1400 			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1401 				uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1402 
1403 				if (elapsed_time >= vec->vector_timeout_ticks) {
1404 					rxa_vector_expire(vec, rx_adapter);
1405 					TAILQ_REMOVE(&rx_adapter->vector_list,
1406 						     vec, next);
1407 				}
1408 			}
1409 			rx_adapter->prev_expiry_ts = rte_rdtsc();
1410 		}
1411 	}
1412 
1413 	rxa_intr_ring_dequeue(rx_adapter);
1414 	rxa_poll(rx_adapter);
1415 
1416 	rte_spinlock_unlock(&rx_adapter->rx_lock);
1417 
1418 	return 0;
1419 }
1420 
1421 static void *
1422 rxa_memzone_array_get(const char *name, unsigned int elt_size, int nb_elems)
1423 {
1424 	const struct rte_memzone *mz;
1425 	unsigned int sz;
1426 
1427 	sz = elt_size * nb_elems;
1428 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1429 
1430 	mz = rte_memzone_lookup(name);
1431 	if (mz == NULL) {
1432 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1433 						 RTE_CACHE_LINE_SIZE);
1434 		if (mz == NULL) {
1435 			RTE_EDEV_LOG_ERR("failed to reserve memzone"
1436 					 " name = %s, err = %"
1437 					 PRId32, name, rte_errno);
1438 			return NULL;
1439 		}
1440 	}
1441 
1442 	return mz->addr;
1443 }
1444 
1445 static int
1446 rte_event_eth_rx_adapter_init(void)
1447 {
1448 	uint8_t i;
1449 
1450 	if (event_eth_rx_adapter == NULL) {
1451 		event_eth_rx_adapter =
1452 			rxa_memzone_array_get(RXA_ADAPTER_ARRAY,
1453 					sizeof(*event_eth_rx_adapter),
1454 					RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE);
1455 		if (event_eth_rx_adapter == NULL)
1456 			return -ENOMEM;
1457 
1458 		for (i = 0; i < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE; i++)
1459 			event_eth_rx_adapter[i] = NULL;
1460 
1461 	}
1462 
1463 	return 0;
1464 }
1465 
1466 static int
1467 rxa_memzone_lookup(void)
1468 {
1469 	const struct rte_memzone *mz;
1470 
1471 	if (event_eth_rx_adapter == NULL) {
1472 		mz = rte_memzone_lookup(RXA_ADAPTER_ARRAY);
1473 		if (mz == NULL)
1474 			return -ENOMEM;
1475 
1476 		event_eth_rx_adapter = mz->addr;
1477 	}
1478 
1479 	return 0;
1480 }
1481 
1482 static inline struct event_eth_rx_adapter *
1483 rxa_id_to_adapter(uint8_t id)
1484 {
1485 	return event_eth_rx_adapter ?
1486 		event_eth_rx_adapter[id] : NULL;
1487 }
1488 
1489 static int
1490 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1491 		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1492 {
1493 	int ret;
1494 	struct rte_eventdev *dev;
1495 	struct rte_event_dev_config dev_conf;
1496 	int started;
1497 	uint8_t port_id;
1498 	struct rte_event_port_conf *port_conf = arg;
1499 	struct event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1500 
1501 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
1502 	dev_conf = dev->data->dev_conf;
1503 
1504 	started = dev->data->dev_started;
1505 	if (started)
1506 		rte_event_dev_stop(dev_id);
1507 	port_id = dev_conf.nb_event_ports;
1508 	dev_conf.nb_event_ports += 1;
1509 	ret = rte_event_dev_configure(dev_id, &dev_conf);
1510 	if (ret) {
1511 		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1512 						dev_id);
1513 		if (started) {
1514 			if (rte_event_dev_start(dev_id))
1515 				return -EIO;
1516 		}
1517 		return ret;
1518 	}
1519 
1520 	ret = rte_event_port_setup(dev_id, port_id, port_conf);
1521 	if (ret) {
1522 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1523 					port_id);
1524 		return ret;
1525 	}
1526 
1527 	conf->event_port_id = port_id;
1528 	conf->max_nb_rx = 128;
1529 	if (started)
1530 		ret = rte_event_dev_start(dev_id);
1531 	rx_adapter->default_cb_arg = 1;
1532 	return ret;
1533 }
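
/*
 * Usage sketch, not part of this file: applications normally reach this
 * callback through rte_event_eth_rx_adapter_create(), which registers
 * rxa_default_conf_cb() together with a copy of the supplied port config,
 * e.g. (example values only):
 *
 *	struct rte_event_port_conf pc = {
 *		.new_event_threshold = 4096,
 *		.dequeue_depth = 128,
 *		.enqueue_depth = 128,
 *	};
 *	rte_event_eth_rx_adapter_create(id, dev_id, &pc);
 *
 * The callback is then invoked from rxa_init_service(), reconfiguring the
 * event device with one extra event port for the adapter.
 */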
1534 
1535 static int
1536 rxa_epoll_create1(void)
1537 {
1538 #if defined(LINUX)
1539 	int fd;
1540 	fd = epoll_create1(EPOLL_CLOEXEC);
1541 	return fd < 0 ? -errno : fd;
1542 #elif defined(BSD)
1543 	return -ENOTSUP;
1544 #endif
1545 }
1546 
1547 static int
1548 rxa_init_epd(struct event_eth_rx_adapter *rx_adapter)
1549 {
1550 	if (rx_adapter->epd != INIT_FD)
1551 		return 0;
1552 
1553 	rx_adapter->epd = rxa_epoll_create1();
1554 	if (rx_adapter->epd < 0) {
1555 		int err = rx_adapter->epd;
1556 		rx_adapter->epd = INIT_FD;
1557 		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1558 		return err;
1559 	}
1560 
1561 	return 0;
1562 }
1563 
1564 static int
1565 rxa_create_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1566 {
1567 	int err;
1568 	char thread_name[RTE_MAX_THREAD_NAME_LEN];
1569 
1570 	if (rx_adapter->intr_ring)
1571 		return 0;
1572 
1573 	rx_adapter->intr_ring = rte_ring_create("intr_ring",
1574 					RTE_EVENT_ETH_INTR_RING_SIZE,
1575 					rte_socket_id(), 0);
1576 	if (!rx_adapter->intr_ring)
1577 		return -ENOMEM;
1578 
1579 	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1580 					RTE_EVENT_ETH_INTR_RING_SIZE *
1581 					sizeof(struct rte_epoll_event),
1582 					RTE_CACHE_LINE_SIZE,
1583 					rx_adapter->socket_id);
1584 	if (!rx_adapter->epoll_events) {
1585 		err = -ENOMEM;
1586 		goto error;
1587 	}
1588 
1589 	rte_spinlock_init(&rx_adapter->intr_ring_lock);
1590 
1591 	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1592 			"rx-intr-thread-%d", rx_adapter->id);
1593 
1594 	err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1595 				NULL, rxa_intr_thread, rx_adapter);
1596 	if (!err)
1597 		return 0;
1598 
1599 	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1600 	rte_free(rx_adapter->epoll_events);
1601 error:
1602 	rte_ring_free(rx_adapter->intr_ring);
1603 	rx_adapter->intr_ring = NULL;
1604 	rx_adapter->epoll_events = NULL;
1605 	return err;
1606 }
1607 
1608 static int
1609 rxa_destroy_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1610 {
1611 	int err;
1612 
1613 	err = pthread_cancel(rx_adapter->rx_intr_thread);
1614 	if (err)
1615 		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1616 				err);
1617 
1618 	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1619 	if (err)
1620 		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1621 
1622 	rte_free(rx_adapter->epoll_events);
1623 	rte_ring_free(rx_adapter->intr_ring);
1624 	rx_adapter->intr_ring = NULL;
1625 	rx_adapter->epoll_events = NULL;
1626 	return 0;
1627 }
1628 
1629 static int
1630 rxa_free_intr_resources(struct event_eth_rx_adapter *rx_adapter)
1631 {
1632 	int ret;
1633 
1634 	if (rx_adapter->num_rx_intr == 0)
1635 		return 0;
1636 
1637 	ret = rxa_destroy_intr_thread(rx_adapter);
1638 	if (ret)
1639 		return ret;
1640 
1641 	close(rx_adapter->epd);
1642 	rx_adapter->epd = INIT_FD;
1643 
1644 	return ret;
1645 }
1646 
1647 static int
1648 rxa_disable_intr(struct event_eth_rx_adapter *rx_adapter,
1649 		 struct eth_device_info *dev_info, uint16_t rx_queue_id)
1650 {
1651 	int err;
1652 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1653 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1654 
1655 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1656 	if (err) {
1657 		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1658 			rx_queue_id);
1659 		return err;
1660 	}
1661 
1662 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1663 					rx_adapter->epd,
1664 					RTE_INTR_EVENT_DEL,
1665 					0);
1666 	if (err)
1667 		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1668 
1669 	if (sintr)
1670 		dev_info->shared_intr_enabled = 0;
1671 	else
1672 		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1673 	return err;
1674 }
1675 
1676 static int
1677 rxa_del_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1678 		   struct eth_device_info *dev_info, int rx_queue_id)
1679 {
1680 	int err;
1681 	int i;
1682 	int s;
1683 
1684 	if (dev_info->nb_rx_intr == 0)
1685 		return 0;
1686 
1687 	err = 0;
1688 	if (rx_queue_id == -1) {
1689 		s = dev_info->nb_shared_intr;
1690 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1691 			int sintr;
1692 			uint16_t q;
1693 
1694 			q = dev_info->intr_queue[i];
1695 			sintr = rxa_shared_intr(dev_info, q);
1696 			s -= sintr;
1697 
1698 			if (!sintr || s == 0) {
1699 
1700 				err = rxa_disable_intr(rx_adapter, dev_info,
1701 						q);
1702 				if (err)
1703 					return err;
1704 				rxa_intr_ring_del_entries(rx_adapter, dev_info,
1705 							q);
1706 			}
1707 		}
1708 	} else {
1709 		if (!rxa_intr_queue(dev_info, rx_queue_id))
1710 			return 0;
1711 		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1712 				dev_info->nb_shared_intr == 1) {
1713 			err = rxa_disable_intr(rx_adapter, dev_info,
1714 					rx_queue_id);
1715 			if (err)
1716 				return err;
1717 			rxa_intr_ring_del_entries(rx_adapter, dev_info,
1718 						rx_queue_id);
1719 		}
1720 
1721 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1722 			if (dev_info->intr_queue[i] == rx_queue_id) {
1723 				for (; i < dev_info->nb_rx_intr - 1; i++)
1724 					dev_info->intr_queue[i] =
1725 						dev_info->intr_queue[i + 1];
1726 				break;
1727 			}
1728 		}
1729 	}
1730 
1731 	return err;
1732 }
1733 
1734 static int
1735 rxa_config_intr(struct event_eth_rx_adapter *rx_adapter,
1736 		struct eth_device_info *dev_info, uint16_t rx_queue_id)
1737 {
1738 	int err, err1;
1739 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1740 	union queue_data qd;
1741 	int init_fd;
1742 	uint16_t *intr_queue;
1743 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1744 
1745 	if (rxa_intr_queue(dev_info, rx_queue_id))
1746 		return 0;
1747 
1748 	intr_queue = dev_info->intr_queue;
1749 	if (dev_info->intr_queue == NULL) {
1750 		size_t len =
1751 			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1752 		dev_info->intr_queue =
1753 			rte_zmalloc_socket(
1754 				rx_adapter->mem_name,
1755 				len,
1756 				0,
1757 				rx_adapter->socket_id);
1758 		if (dev_info->intr_queue == NULL)
1759 			return -ENOMEM;
1760 	}
1761 
1762 	init_fd = rx_adapter->epd;
1763 	err = rxa_init_epd(rx_adapter);
1764 	if (err)
1765 		goto err_free_queue;
1766 
1767 	qd.port = eth_dev_id;
1768 	qd.queue = rx_queue_id;
1769 
1770 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1771 					rx_adapter->epd,
1772 					RTE_INTR_EVENT_ADD,
1773 					qd.ptr);
1774 	if (err) {
1775 		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1776 			" Rx Queue %u err %d", rx_queue_id, err);
1777 		goto err_del_fd;
1778 	}
1779 
1780 	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1781 	if (err) {
1782 		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1783 				" Rx Queue %u err %d", rx_queue_id, err);
1784 
1785 		goto err_del_event;
1786 	}
1787 
1788 	err = rxa_create_intr_thread(rx_adapter);
1789 	if (!err)  {
1790 		if (sintr)
1791 			dev_info->shared_intr_enabled = 1;
1792 		else
1793 			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1794 		return 0;
1795 	}
1796 
1797 
1798 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1799 	if (err)
1800 		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1801 				" Rx Queue %u err %d", rx_queue_id, err);
1802 err_del_event:
1803 	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1804 					rx_adapter->epd,
1805 					RTE_INTR_EVENT_DEL,
1806 					0);
1807 	if (err1) {
1808 		RTE_EDEV_LOG_ERR("Could not delete event for"
1809 				" Rx Queue %u err %d", rx_queue_id, err1);
1810 	}
1811 err_del_fd:
1812 	if (init_fd == INIT_FD) {
1813 		close(rx_adapter->epd);
1814 		rx_adapter->epd = -1;
1815 	}
1816 err_free_queue:
1817 	if (intr_queue == NULL)
1818 		rte_free(dev_info->intr_queue);
1819 
1820 	return err;
1821 }
1822 
1823 static int
1824 rxa_add_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1825 		   struct eth_device_info *dev_info, int rx_queue_id)
1826 
1827 {
1828 	int i, j, err;
1829 	int si = -1;
1830 	int shared_done = (dev_info->nb_shared_intr > 0);
1831 
1832 	if (rx_queue_id != -1) {
1833 		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1834 			return 0;
1835 		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1836 	}
1837 
1838 	err = 0;
1839 	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1840 
1841 		if (rxa_shared_intr(dev_info, i) && shared_done)
1842 			continue;
1843 
1844 		err = rxa_config_intr(rx_adapter, dev_info, i);
1845 
1846 		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1847 		if (shared_done) {
1848 			si = i;
1849 			dev_info->shared_intr_enabled = 1;
1850 		}
1851 		if (err)
1852 			break;
1853 	}
1854 
1855 	if (err == 0)
1856 		return 0;
1857 
1858 	shared_done = (dev_info->nb_shared_intr > 0);
1859 	for (j = 0; j < i; j++) {
1860 		if (rxa_intr_queue(dev_info, j))
1861 			continue;
1862 		if (rxa_shared_intr(dev_info, j) && si != j)
1863 			continue;
1864 		err = rxa_disable_intr(rx_adapter, dev_info, j);
1865 		if (err)
1866 			break;
1867 
1868 	}
1869 
1870 	return err;
1871 }
1872 
1873 static int
1874 rxa_init_service(struct event_eth_rx_adapter *rx_adapter, uint8_t id)
1875 {
1876 	int ret;
1877 	struct rte_service_spec service;
1878 	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1879 
1880 	if (rx_adapter->service_inited)
1881 		return 0;
1882 
1883 	memset(&service, 0, sizeof(service));
1884 	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1885 		"rte_event_eth_rx_adapter_%d", id);
1886 	service.socket_id = rx_adapter->socket_id;
1887 	service.callback = rxa_service_func;
1888 	service.callback_userdata = rx_adapter;
1889 	/* Service function handles locking for queue add/del updates */
1890 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1891 	ret = rte_service_component_register(&service, &rx_adapter->service_id);
1892 	if (ret) {
1893 		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1894 			service.name, ret);
1895 		return ret;
1896 	}
1897 
1898 	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1899 		&rx_adapter_conf, rx_adapter->conf_arg);
1900 	if (ret) {
1901 		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1902 			ret);
1903 		goto err_done;
1904 	}
1905 	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1906 	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1907 	rx_adapter->service_inited = 1;
1908 	rx_adapter->epd = INIT_FD;
1909 	return 0;
1910 
1911 err_done:
1912 	rte_service_component_unregister(rx_adapter->service_id);
1913 	return ret;
1914 }
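
/*
 * Usage sketch, illustrative only: when the PMD has no internal event port,
 * the adapter runs as the service registered above and the application is
 * expected to map it to a service lcore, e.g.
 *
 *	uint32_t service_id;
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(id, &service_id) == 0) {
 *		rte_service_map_lcore_set(service_id, lcore_id, 1);
 *		rte_service_runstate_set(service_id, 1);
 *	}
 *
 * where lcore_id is a service lcore previously set up with
 * rte_service_lcore_add() and rte_service_lcore_start().
 */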
1915 
1916 static void
1917 rxa_update_queue(struct event_eth_rx_adapter *rx_adapter,
1918 		 struct eth_device_info *dev_info, int32_t rx_queue_id,
1919 		 uint8_t add)
1920 {
1921 	struct eth_rx_queue_info *queue_info;
1922 	int enabled;
1923 	uint16_t i;
1924 
1925 	if (dev_info->rx_queue == NULL)
1926 		return;
1927 
1928 	if (rx_queue_id == -1) {
1929 		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1930 			rxa_update_queue(rx_adapter, dev_info, i, add);
1931 	} else {
1932 		queue_info = &dev_info->rx_queue[rx_queue_id];
1933 		enabled = queue_info->queue_enabled;
1934 		if (add) {
1935 			rx_adapter->nb_queues += !enabled;
1936 			dev_info->nb_dev_queues += !enabled;
1937 		} else {
1938 			rx_adapter->nb_queues -= enabled;
1939 			dev_info->nb_dev_queues -= enabled;
1940 		}
1941 		queue_info->queue_enabled = !!add;
1942 	}
1943 }
1944 
1945 static void
1946 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1947 		    uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1948 		    uint16_t port_id)
1949 {
1950 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1951 	struct eth_rx_vector_data *vector_data;
1952 	uint32_t flow_id;
1953 
1954 	vector_data = &queue_info->vector_data;
1955 	vector_data->max_vector_count = vector_count;
1956 	vector_data->port = port_id;
1957 	vector_data->queue = qid;
1958 	vector_data->vector_pool = mp;
1959 	vector_data->vector_timeout_ticks =
1960 		NSEC2TICK(vector_ns, rte_get_timer_hz());
1961 	vector_data->ts = 0;
1962 	flow_id = queue_info->event & 0xFFFFF;
1963 	flow_id =
1964 		flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1965 	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1966 }
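/*
 * Worked example of the synthetic flow id composed above (values assumed):
 * with ev.flow_id == 0, port_id = 1 and qid = 3, vector events from this
 * queue carry flow_id = (3 & 0xFFF) | ((1 & 0xFF) << 12) = 0x1003, i.e. the
 * queue id in bits [11:0] and the port id in bits [19:12] of the 20-bit
 * flow id field.
 */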
1967 
1968 static void
1969 rxa_sw_del(struct event_eth_rx_adapter *rx_adapter,
1970 	   struct eth_device_info *dev_info, int32_t rx_queue_id)
1971 {
1972 	struct eth_rx_vector_data *vec;
1973 	int pollq;
1974 	int intrq;
1975 	int sintrq;
1976 
1977 	if (rx_adapter->nb_queues == 0)
1978 		return;
1979 
1980 	if (rx_queue_id == -1) {
1981 		uint16_t nb_rx_queues;
1982 		uint16_t i;
1983 
1984 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1985 		for (i = 0; i < nb_rx_queues; i++)

1986 			rxa_sw_del(rx_adapter, dev_info, i);
1987 		return;
1988 	}
1989 
1990 	/* Push all the partial event vectors to event device. */
1991 	TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1992 		if (vec->queue != rx_queue_id)
1993 			continue;
1994 		rxa_vector_expire(vec, rx_adapter);
1995 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
1996 	}
1997 
1998 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
1999 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
2000 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
2001 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
2002 	rx_adapter->num_rx_polled -= pollq;
2003 	dev_info->nb_rx_poll -= pollq;
2004 	rx_adapter->num_rx_intr -= intrq;
2005 	dev_info->nb_rx_intr -= intrq;
2006 	dev_info->nb_shared_intr -= intrq && sintrq;
2007 	if (rx_adapter->use_queue_event_buf) {
2008 		struct eth_event_enqueue_buffer *event_buf =
2009 			dev_info->rx_queue[rx_queue_id].event_buf;
2010 		struct rte_event_eth_rx_adapter_stats *stats =
2011 			dev_info->rx_queue[rx_queue_id].stats;
2012 		rte_free(event_buf->events);
2013 		rte_free(event_buf);
2014 		rte_free(stats);
2015 		dev_info->rx_queue[rx_queue_id].event_buf = NULL;
2016 		dev_info->rx_queue[rx_queue_id].stats = NULL;
2017 	}
2018 }
2019 
2020 static int
2021 rxa_add_queue(struct event_eth_rx_adapter *rx_adapter,
2022 	      struct eth_device_info *dev_info, int32_t rx_queue_id,
2023 	      const struct rte_event_eth_rx_adapter_queue_conf *conf)
2024 {
2025 	struct eth_rx_queue_info *queue_info;
2026 	const struct rte_event *ev = &conf->ev;
2027 	int pollq;
2028 	int intrq;
2029 	int sintrq;
2030 	struct rte_event *qi_ev;
2031 	struct eth_event_enqueue_buffer *new_rx_buf = NULL;
2032 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
2033 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
2034 	int ret;
2035 
2036 	if (rx_queue_id == -1) {
2037 		uint16_t nb_rx_queues;
2038 		uint16_t i;
2039 
2040 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2041 		for (i = 0; i < nb_rx_queues; i++) {
2042 			ret = rxa_add_queue(rx_adapter, dev_info, i, conf);
2043 			if (ret)
2044 				return ret;
2045 		}
2046 		return 0;
2047 	}
2048 
2049 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
2050 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
2051 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
2052 
2053 	queue_info = &dev_info->rx_queue[rx_queue_id];
2054 	queue_info->wt = conf->servicing_weight;
2055 
2056 	qi_ev = (struct rte_event *)&queue_info->event;
2057 	qi_ev->event = ev->event;
2058 	qi_ev->op = RTE_EVENT_OP_NEW;
2059 	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
2060 
2061 	if (conf->rx_queue_flags &
2062 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
2063 		queue_info->flow_id_mask = ~0;
2064 	} else
2065 		qi_ev->flow_id = 0;
2066 
2067 	if (conf->rx_queue_flags &
2068 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2069 		queue_info->ena_vector = 1;
2070 		qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
2071 		rxa_set_vector_data(queue_info, conf->vector_sz,
2072 				    conf->vector_timeout_ns, conf->vector_mp,
2073 				    rx_queue_id, dev_info->dev->data->port_id);
2074 		rx_adapter->ena_vector = 1;
2075 		rx_adapter->vector_tmo_ticks =
2076 			rx_adapter->vector_tmo_ticks ?
2077 				      RTE_MIN(queue_info->vector_data
2078 							.vector_timeout_ticks >>
2079 						1,
2080 					rx_adapter->vector_tmo_ticks) :
2081 				queue_info->vector_data.vector_timeout_ticks >>
2082 					1;
2083 	}
2084 
2085 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
2086 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
2087 		rx_adapter->num_rx_polled += !pollq;
2088 		dev_info->nb_rx_poll += !pollq;
2089 		rx_adapter->num_rx_intr -= intrq;
2090 		dev_info->nb_rx_intr -= intrq;
2091 		dev_info->nb_shared_intr -= intrq && sintrq;
2092 	}
2093 
2094 	if (rxa_intr_queue(dev_info, rx_queue_id)) {
2095 		rx_adapter->num_rx_polled -= pollq;
2096 		dev_info->nb_rx_poll -= pollq;
2097 		rx_adapter->num_rx_intr += !intrq;
2098 		dev_info->nb_rx_intr += !intrq;
2099 		dev_info->nb_shared_intr += !intrq && sintrq;
2100 		if (dev_info->nb_shared_intr == 1) {
2101 			if (dev_info->multi_intr_cap)
2102 				dev_info->next_q_idx =
2103 					RTE_MAX_RXTX_INTR_VEC_ID - 1;
2104 			else
2105 				dev_info->next_q_idx = 0;
2106 		}
2107 	}
2108 
2109 	if (!rx_adapter->use_queue_event_buf)
2110 		return 0;
2111 
2112 	new_rx_buf = rte_zmalloc_socket("rx_buffer_meta",
2113 				sizeof(*new_rx_buf), 0,
2114 				rte_eth_dev_socket_id(eth_dev_id));
2115 	if (new_rx_buf == NULL) {
2116 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer meta for "
2117 				 "dev_id: %d queue_id: %d",
2118 				 eth_dev_id, rx_queue_id);
2119 		return -ENOMEM;
2120 	}
2121 
2122 	new_rx_buf->events_size = RTE_ALIGN(conf->event_buf_size, BATCH_SIZE);
2123 	new_rx_buf->events_size += (2 * BATCH_SIZE);
2124 	new_rx_buf->events = rte_zmalloc_socket("rx_buffer",
2125 				sizeof(struct rte_event) *
2126 				new_rx_buf->events_size, 0,
2127 				rte_eth_dev_socket_id(eth_dev_id));
2128 	if (new_rx_buf->events == NULL) {
2129 		rte_free(new_rx_buf);
2130 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer for "
2131 				 "dev_id: %d queue_id: %d",
2132 				 eth_dev_id, rx_queue_id);
2133 		return -ENOMEM;
2134 	}
2135 
2136 	queue_info->event_buf = new_rx_buf;
2137 
2138 	/* Allocate storage for adapter queue stats */
2139 	stats = rte_zmalloc_socket("rx_queue_stats",
2140 				sizeof(*stats), 0,
2141 				rte_eth_dev_socket_id(eth_dev_id));
2142 	if (stats == NULL) {
2143 		rte_free(new_rx_buf->events);
2144 		rte_free(new_rx_buf);
2145 		RTE_EDEV_LOG_ERR("Failed to allocate stats storage for"
2146 				 " dev_id: %d queue_id: %d",
2147 				 eth_dev_id, rx_queue_id);
2148 		return -ENOMEM;
2149 	}
2150 
2151 	queue_info->stats = stats;
2152 
2153 	return 0;
2154 }
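/*
 * Sizing example for the per-queue event buffer allocated above (request
 * size assumed): a requested event_buf_size of 100 is rounded up to
 * RTE_ALIGN(100, BATCH_SIZE) = 128 and then padded by 2 * BATCH_SIZE = 64,
 * giving events_size = 192 events.
 */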
2155 
2156 static int
2157 rxa_sw_add(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
2158 	   int rx_queue_id,
2159 	   const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2160 {
2161 	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
2162 	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
2163 	int ret;
2164 	struct eth_rx_poll_entry *rx_poll;
2165 	struct eth_rx_queue_info *rx_queue;
2166 	uint32_t *rx_wrr;
2167 	uint16_t nb_rx_queues;
2168 	uint32_t nb_rx_poll, nb_wrr;
2169 	uint32_t nb_rx_intr;
2170 	int num_intr_vec;
2171 	uint16_t wt;
2172 
2173 	if (queue_conf->servicing_weight == 0) {
2174 		struct rte_eth_dev_data *data = dev_info->dev->data;
2175 
2176 		temp_conf = *queue_conf;
2177 		if (!data->dev_conf.intr_conf.rxq) {
2178 			/* If Rx interrupts are disabled, set wt = 1 */
2179 			temp_conf.servicing_weight = 1;
2180 		}
2181 		queue_conf = &temp_conf;
2182 
2183 		if (queue_conf->servicing_weight == 0 &&
2184 		    rx_adapter->use_queue_event_buf) {
2185 
2186 			RTE_EDEV_LOG_ERR("Use of queue level event buffer "
2187 					 "not supported for interrupt queues "
2188 					 "dev_id: %d queue_id: %d",
2189 					 eth_dev_id, rx_queue_id);
2190 			return -EINVAL;
2191 		}
2192 	}
2193 
2194 	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2195 	rx_queue = dev_info->rx_queue;
2196 	wt = queue_conf->servicing_weight;
2197 
2198 	if (dev_info->rx_queue == NULL) {
2199 		dev_info->rx_queue =
2200 		    rte_zmalloc_socket(rx_adapter->mem_name,
2201 				       nb_rx_queues *
2202 				       sizeof(struct eth_rx_queue_info), 0,
2203 				       rx_adapter->socket_id);
2204 		if (dev_info->rx_queue == NULL)
2205 			return -ENOMEM;
2206 	}
2207 	rx_wrr = NULL;
2208 	rx_poll = NULL;
2209 
2210 	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2211 			queue_conf->servicing_weight,
2212 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2213 
2214 	if (dev_info->dev->intr_handle)
2215 		dev_info->multi_intr_cap =
2216 			rte_intr_cap_multiple(dev_info->dev->intr_handle);
2217 
2218 	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2219 				&rx_poll, &rx_wrr);
2220 	if (ret)
2221 		goto err_free_rxqueue;
2222 
2223 	if (wt == 0) {
2224 		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2225 
2226 		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2227 		if (ret)
2228 			goto err_free_rxqueue;
2229 
2230 		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2231 		if (ret)
2232 			goto err_free_rxqueue;
2233 	} else {
2234 
2235 		num_intr_vec = 0;
2236 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2237 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2238 						rx_queue_id, 0);
2239 			/* Interrupt-based queues are being converted to
2240 			 * poll-mode queues; delete the interrupt configuration
2241 			 * for those.
2242 			 */
2243 			ret = rxa_del_intr_queue(rx_adapter,
2244 						dev_info, rx_queue_id);
2245 			if (ret)
2246 				goto err_free_rxqueue;
2247 		}
2248 	}
2249 
2250 	if (nb_rx_intr == 0) {
2251 		ret = rxa_free_intr_resources(rx_adapter);
2252 		if (ret)
2253 			goto err_free_rxqueue;
2254 	}
2255 
2256 	if (wt == 0) {
2257 		uint16_t i;
2258 
2259 		if (rx_queue_id == -1) {
2260 			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2261 				dev_info->intr_queue[i] = i;
2262 		} else {
2263 			if (!rxa_intr_queue(dev_info, rx_queue_id))
2264 				dev_info->intr_queue[nb_rx_intr - 1] =
2265 					rx_queue_id;
2266 		}
2267 	}
2268 
2271 	ret = rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2272 	if (ret)
2273 		goto err_free_rxqueue;
2274 	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2275 
2276 	rte_free(rx_adapter->eth_rx_poll);
2277 	rte_free(rx_adapter->wrr_sched);
2278 
2279 	rx_adapter->eth_rx_poll = rx_poll;
2280 	rx_adapter->wrr_sched = rx_wrr;
2281 	rx_adapter->wrr_len = nb_wrr;
2282 	rx_adapter->num_intr_vec += num_intr_vec;
2283 	return 0;
2284 
2285 err_free_rxqueue:
2286 	if (rx_queue == NULL) {
2287 		rte_free(dev_info->rx_queue);
2288 		dev_info->rx_queue = NULL;
2289 	}
2290 
2291 	rte_free(rx_poll);
2292 	rte_free(rx_wrr);
2293 
2294 	return ret;
2295 }
2296 
2297 static int
2298 rxa_ctrl(uint8_t id, int start)
2299 {
2300 	struct event_eth_rx_adapter *rx_adapter;
2301 	struct rte_eventdev *dev;
2302 	struct eth_device_info *dev_info;
2303 	uint32_t i;
2304 	int use_service = 0;
2305 	int stop = !start;
2306 
2307 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2308 	rx_adapter = rxa_id_to_adapter(id);
2309 	if (rx_adapter == NULL)
2310 		return -EINVAL;
2311 
2312 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2313 
2314 	RTE_ETH_FOREACH_DEV(i) {
2315 		dev_info = &rx_adapter->eth_devices[i];
2316 		/* if start, check for num dev queues */
2317 		if (start && !dev_info->nb_dev_queues)
2318 			continue;
2319 		/* if stop, check if dev has been started */
2320 		if (stop && !dev_info->dev_rx_started)
2321 			continue;
2322 		use_service |= !dev_info->internal_event_port;
2323 		dev_info->dev_rx_started = start;
2324 		if (dev_info->internal_event_port == 0)
2325 			continue;
2326 		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2327 						&rte_eth_devices[i]) :
2328 			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
2329 						&rte_eth_devices[i]);
2330 	}
2331 
2332 	if (use_service) {
2333 		rte_spinlock_lock(&rx_adapter->rx_lock);
2334 		rx_adapter->rxa_started = start;
2335 		rte_service_runstate_set(rx_adapter->service_id, start);
2336 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2337 	}
2338 
2339 	return 0;
2340 }
2341 
2342 static int
2343 rxa_create(uint8_t id, uint8_t dev_id,
2344 	   struct rte_event_eth_rx_adapter_params *rxa_params,
2345 	   rte_event_eth_rx_adapter_conf_cb conf_cb,
2346 	   void *conf_arg)
2347 {
2348 	struct event_eth_rx_adapter *rx_adapter;
2349 	struct eth_event_enqueue_buffer *buf;
2350 	struct rte_event *events;
2351 	int ret;
2352 	int socket_id;
2353 	uint16_t i;
2354 	char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
2355 	const uint8_t default_rss_key[] = {
2356 		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2357 		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2358 		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2359 		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2360 		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2361 	};
2362 
2363 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2364 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2365 
2366 	if (conf_cb == NULL)
2367 		return -EINVAL;
2368 
2369 	if (event_eth_rx_adapter == NULL) {
2370 		ret = rte_event_eth_rx_adapter_init();
2371 		if (ret)
2372 			return ret;
2373 	}
2374 
2375 	rx_adapter = rxa_id_to_adapter(id);
2376 	if (rx_adapter != NULL) {
2377 		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2378 		return -EEXIST;
2379 	}
2380 
2381 	socket_id = rte_event_dev_socket_id(dev_id);
2382 	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2383 		"rte_event_eth_rx_adapter_%d",
2384 		id);
2385 
2386 	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2387 			RTE_CACHE_LINE_SIZE, socket_id);
2388 	if (rx_adapter == NULL) {
2389 		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2390 		return -ENOMEM;
2391 	}
2392 
2393 	rx_adapter->eventdev_id = dev_id;
2394 	rx_adapter->socket_id = socket_id;
2395 	rx_adapter->conf_cb = conf_cb;
2396 	rx_adapter->conf_arg = conf_arg;
2397 	rx_adapter->id = id;
2398 	TAILQ_INIT(&rx_adapter->vector_list);
2399 	strcpy(rx_adapter->mem_name, mem_name);
2400 	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2401 					RTE_MAX_ETHPORTS *
2402 					sizeof(struct eth_device_info), 0,
2403 					socket_id);
2404 	rte_convert_rss_key((const uint32_t *)default_rss_key,
2405 			(uint32_t *)rx_adapter->rss_key_be,
2406 			    RTE_DIM(default_rss_key));
2407 
2408 	if (rx_adapter->eth_devices == NULL) {
2409 		RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
2410 		rte_free(rx_adapter);
2411 		return -ENOMEM;
2412 	}
2413 
2414 	rte_spinlock_init(&rx_adapter->rx_lock);
2415 
2416 	for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2417 		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2418 
2419 	/* Rx adapter event buffer allocation */
2420 	rx_adapter->use_queue_event_buf = rxa_params->use_queue_event_buf;
2421 
2422 	if (!rx_adapter->use_queue_event_buf) {
2423 		buf = &rx_adapter->event_enqueue_buffer;
2424 		buf->events_size = rxa_params->event_buf_size;
2425 
2426 		events = rte_zmalloc_socket(rx_adapter->mem_name,
2427 					    buf->events_size * sizeof(*events),
2428 					    0, socket_id);
2429 		if (events == NULL) {
2430 			RTE_EDEV_LOG_ERR("Failed to allocate memory "
2431 					 "for adapter event buffer");
2432 			rte_free(rx_adapter->eth_devices);
2433 			rte_free(rx_adapter);
2434 			return -ENOMEM;
2435 		}
2436 
2437 		rx_adapter->event_enqueue_buffer.events = events;
2438 	}
2439 
2440 	event_eth_rx_adapter[id] = rx_adapter;
2441 
2442 	if (conf_cb == rxa_default_conf_cb)
2443 		rx_adapter->default_cb_arg = 1;
2444 
2445 	if (rte_mbuf_dyn_rx_timestamp_register(
2446 			&event_eth_rx_timestamp_dynfield_offset,
2447 			&event_eth_rx_timestamp_dynflag) != 0) {
2448 		RTE_EDEV_LOG_ERR("Error registering timestamp field in mbuf");
2449 		return -rte_errno;
2450 	}
2451 
2452 	rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2453 		conf_arg);
2454 	return 0;
2455 }
2456 
2457 int
2458 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2459 				rte_event_eth_rx_adapter_conf_cb conf_cb,
2460 				void *conf_arg)
2461 {
2462 	struct rte_event_eth_rx_adapter_params rxa_params = {0};
2463 
2464 	/* use default values for adapter params */
2465 	rxa_params.event_buf_size = ETH_EVENT_BUFFER_SIZE;
2466 	rxa_params.use_queue_event_buf = false;
2467 
2468 	return rxa_create(id, dev_id, &rxa_params, conf_cb, conf_arg);
2469 }
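/*
 * Minimal sketch of a custom configuration callback used with
 * rte_event_eth_rx_adapter_create_ext(); "app_event_port", "rxa_id" and
 * "evdev_id" are assumed, application-owned identifiers set up elsewhere.
 *
 *	static int
 *	app_rxa_conf_cb(uint8_t id, uint8_t dev_id,
 *			struct rte_event_eth_rx_adapter_conf *conf, void *arg)
 *	{
 *		RTE_SET_USED(id);
 *		RTE_SET_USED(dev_id);
 *		RTE_SET_USED(arg);
 *		conf->event_port_id = app_event_port;
 *		conf->max_nb_rx = 256;
 *		return 0;
 *	}
 *
 *	ret = rte_event_eth_rx_adapter_create_ext(rxa_id, evdev_id,
 *						  app_rxa_conf_cb, NULL);
 */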
2470 
2471 int
2472 rte_event_eth_rx_adapter_create_with_params(uint8_t id, uint8_t dev_id,
2473 			struct rte_event_port_conf *port_config,
2474 			struct rte_event_eth_rx_adapter_params *rxa_params)
2475 {
2476 	struct rte_event_port_conf *pc;
2477 	int ret;
2478 	struct rte_event_eth_rx_adapter_params temp_params = {0};
2479 
2480 	if (port_config == NULL)
2481 		return -EINVAL;
2482 
2483 	if (rxa_params == NULL) {
2484 		/* use default values if rxa_params is NULL */
2485 		rxa_params = &temp_params;
2486 		rxa_params->event_buf_size = ETH_EVENT_BUFFER_SIZE;
2487 		rxa_params->use_queue_event_buf = false;
2488 	} else if ((!rxa_params->use_queue_event_buf &&
2489 		    rxa_params->event_buf_size == 0) ||
2490 		   (rxa_params->use_queue_event_buf &&
2491 		    rxa_params->event_buf_size != 0)) {
2492 		RTE_EDEV_LOG_ERR("Invalid adapter params");
2493 		return -EINVAL;
2494 	} else if (!rxa_params->use_queue_event_buf) {
2495 		/* adjust event buffer size with BATCH_SIZE used for fetching
2496 		 * packets from NIC rx queues to get full buffer utilization
2497 		 * and prevent unnecessary rollovers.
2498 		 */
2499 
2500 		rxa_params->event_buf_size =
2501 			RTE_ALIGN(rxa_params->event_buf_size, BATCH_SIZE);
2502 		rxa_params->event_buf_size += (2 * BATCH_SIZE);
2503 	}
2504 
2505 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2506 	if (pc == NULL)
2507 		return -ENOMEM;
2508 
2509 	*pc = *port_config;
2510 
2511 	ret = rxa_create(id, dev_id, rxa_params, rxa_default_conf_cb, pc);
2512 	if (ret)
2513 		rte_free(pc);
2514 
2515 	return ret;
2516 }
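/*
 * Usage sketch (adapter/eventdev ids and port_conf assumed to be set up
 * elsewhere): create an adapter that keeps a separate event buffer per Rx
 * queue; in that case the global event_buf_size must be 0 and each queue
 * supplies its own size at queue add time.
 *
 *	struct rte_event_eth_rx_adapter_params params = {
 *		.event_buf_size = 0,
 *		.use_queue_event_buf = true,
 *	};
 *
 *	ret = rte_event_eth_rx_adapter_create_with_params(rxa_id, evdev_id,
 *							  &port_conf, &params);
 */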
2517 
2518 int
2519 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2520 		struct rte_event_port_conf *port_config)
2521 {
2522 	struct rte_event_port_conf *pc;
2523 	int ret;
2524 
2525 	if (port_config == NULL)
2526 		return -EINVAL;
2527 
2528 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2529 
2530 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2531 	if (pc == NULL)
2532 		return -ENOMEM;
2533 	*pc = *port_config;
2534 
2535 	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2536 					rxa_default_conf_cb,
2537 					pc);
2538 	if (ret)
2539 		rte_free(pc);
2540 	return ret;
2541 }
2542 
2543 int
2544 rte_event_eth_rx_adapter_free(uint8_t id)
2545 {
2546 	struct event_eth_rx_adapter *rx_adapter;
2547 
2548 	if (rxa_memzone_lookup())
2549 		return -ENOMEM;
2550 
2551 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2552 
2553 	rx_adapter = rxa_id_to_adapter(id);
2554 	if (rx_adapter == NULL)
2555 		return -EINVAL;
2556 
2557 	if (rx_adapter->nb_queues) {
2558 		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2559 				rx_adapter->nb_queues);
2560 		return -EBUSY;
2561 	}
2562 
2563 	if (rx_adapter->default_cb_arg)
2564 		rte_free(rx_adapter->conf_arg);
2565 	rte_free(rx_adapter->eth_devices);
2566 	if (!rx_adapter->use_queue_event_buf)
2567 		rte_free(rx_adapter->event_enqueue_buffer.events);
2568 	rte_free(rx_adapter);
2569 	event_eth_rx_adapter[id] = NULL;
2570 
2571 	rte_eventdev_trace_eth_rx_adapter_free(id);
2572 	return 0;
2573 }
2574 
2575 int
2576 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2577 		uint16_t eth_dev_id,
2578 		int32_t rx_queue_id,
2579 		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2580 {
2581 	int ret;
2582 	uint32_t cap;
2583 	struct event_eth_rx_adapter *rx_adapter;
2584 	struct rte_eventdev *dev;
2585 	struct eth_device_info *dev_info;
2586 	struct rte_event_eth_rx_adapter_vector_limits limits;
2587 
2588 	if (rxa_memzone_lookup())
2589 		return -ENOMEM;
2590 
2591 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2592 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2593 
2594 	rx_adapter = rxa_id_to_adapter(id);
2595 	if ((rx_adapter == NULL) || (queue_conf == NULL))
2596 		return -EINVAL;
2597 
2598 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2599 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2600 						eth_dev_id,
2601 						&cap);
2602 	if (ret) {
2603 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2604 			" eth port %" PRIu16, id, eth_dev_id);
2605 		return ret;
2606 	}
2607 
2608 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2609 		&& (queue_conf->rx_queue_flags &
2610 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2611 		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2612 				" eth port: %" PRIu16 " adapter id: %" PRIu8,
2613 				eth_dev_id, id);
2614 		return -EINVAL;
2615 	}
2616 
2617 	if (queue_conf->rx_queue_flags &
2618 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2619 
2620 		if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0) {
2621 			RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2622 					 " eth port: %" PRIu16
2623 					 " adapter id: %" PRIu8,
2624 					 eth_dev_id, id);
2625 			return -EINVAL;
2626 		}
2627 
2628 		ret = rte_event_eth_rx_adapter_vector_limits_get(
2629 			rx_adapter->eventdev_id, eth_dev_id, &limits);
2630 		if (ret < 0) {
2631 			RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
2632 					 " eth port: %" PRIu16
2633 					 " adapter id: %" PRIu8,
2634 					 eth_dev_id, id);
2635 			return -EINVAL;
2636 		}
2637 		if (queue_conf->vector_sz < limits.min_sz ||
2638 		    queue_conf->vector_sz > limits.max_sz ||
2639 		    queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
2640 		    queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
2641 		    queue_conf->vector_mp == NULL) {
2642 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2643 					 " eth port: %" PRIu16
2644 					 " adapter id: %" PRIu8,
2645 					 eth_dev_id, id);
2646 			return -EINVAL;
2647 		}
2648 		if (queue_conf->vector_mp->elt_size <
2649 		    (sizeof(struct rte_event_vector) +
2650 		     (sizeof(uintptr_t) * queue_conf->vector_sz))) {
2651 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2652 					 " eth port: %" PRIu16
2653 					 " adapter id: %" PRIu8,
2654 					 eth_dev_id, id);
2655 			return -EINVAL;
2656 		}
2657 	}
2658 
2659 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2660 		(rx_queue_id != -1)) {
2661 		RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
2662 			"event queue, eth port: %" PRIu16 " adapter id: %"
2663 			PRIu8, eth_dev_id, id);
2664 		return -EINVAL;
2665 	}
2666 
2667 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2668 			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2669 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2670 			 (uint16_t)rx_queue_id);
2671 		return -EINVAL;
2672 	}
2673 
2674 	if ((rx_adapter->use_queue_event_buf &&
2675 	     queue_conf->event_buf_size == 0) ||
2676 	    (!rx_adapter->use_queue_event_buf &&
2677 	     queue_conf->event_buf_size != 0)) {
2678 		RTE_EDEV_LOG_ERR("Invalid Event buffer size for the queue");
2679 		return -EINVAL;
2680 	}
2681 
2682 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2683 
2684 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2685 		if (*dev->dev_ops->eth_rx_adapter_queue_add == NULL)
2686 			return -ENOTSUP;
2687 		if (dev_info->rx_queue == NULL) {
2688 			dev_info->rx_queue =
2689 			    rte_zmalloc_socket(rx_adapter->mem_name,
2690 					dev_info->dev->data->nb_rx_queues *
2691 					sizeof(struct eth_rx_queue_info), 0,
2692 					rx_adapter->socket_id);
2693 			if (dev_info->rx_queue == NULL)
2694 				return -ENOMEM;
2695 		}
2696 
2697 		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2698 				&rte_eth_devices[eth_dev_id],
2699 				rx_queue_id, queue_conf);
2700 		if (ret == 0) {
2701 			dev_info->internal_event_port = 1;
2702 			rxa_update_queue(rx_adapter,
2703 					&rx_adapter->eth_devices[eth_dev_id],
2704 					rx_queue_id,
2705 					1);
2706 		}
2707 	} else {
2708 		rte_spinlock_lock(&rx_adapter->rx_lock);
2709 		dev_info->internal_event_port = 0;
2710 		ret = rxa_init_service(rx_adapter, id);
2711 		if (ret == 0) {
2712 			uint32_t service_id = rx_adapter->service_id;
2713 			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2714 					queue_conf);
2715 			rte_service_component_runstate_set(service_id,
2716 				rxa_sw_adapter_queue_count(rx_adapter));
2717 		}
2718 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2719 	}
2720 
2721 	rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2722 		rx_queue_id, queue_conf, ret);
2723 	if (ret)
2724 		return ret;
2725 
2726 	return 0;
2727 }
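/*
 * Usage sketch for the service-port path (ids, event queue and weight are
 * assumed application values): queue 0 of "eth_port" is added with an atomic
 * event definition, then the adapter is started.
 *
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = {0};
 *
 *	qconf.ev.queue_id = app_ev_queue;
 *	qconf.ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
 *	qconf.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
 *	qconf.servicing_weight = 1;
 *
 *	ret = rte_event_eth_rx_adapter_queue_add(rxa_id, eth_port, 0, &qconf);
 *	if (ret == 0)
 *		ret = rte_event_eth_rx_adapter_start(rxa_id);
 */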
2728 
2729 static int
2730 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2731 {
2732 	limits->max_sz = MAX_VECTOR_SIZE;
2733 	limits->min_sz = MIN_VECTOR_SIZE;
2734 	limits->max_timeout_ns = MAX_VECTOR_NS;
2735 	limits->min_timeout_ns = MIN_VECTOR_NS;
2736 
2737 	return 0;
2738 }
2739 
2740 int
2741 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2742 				int32_t rx_queue_id)
2743 {
2744 	int ret = 0;
2745 	struct rte_eventdev *dev;
2746 	struct event_eth_rx_adapter *rx_adapter;
2747 	struct eth_device_info *dev_info;
2748 	uint32_t cap;
2749 	uint32_t nb_rx_poll = 0;
2750 	uint32_t nb_wrr = 0;
2751 	uint32_t nb_rx_intr;
2752 	struct eth_rx_poll_entry *rx_poll = NULL;
2753 	uint32_t *rx_wrr = NULL;
2754 	int num_intr_vec;
2755 
2756 	if (rxa_memzone_lookup())
2757 		return -ENOMEM;
2758 
2759 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2760 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2761 
2762 	rx_adapter = rxa_id_to_adapter(id);
2763 	if (rx_adapter == NULL)
2764 		return -EINVAL;
2765 
2766 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2767 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2768 						eth_dev_id,
2769 						&cap);
2770 	if (ret)
2771 		return ret;
2772 
2773 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2774 		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2775 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2776 			 (uint16_t)rx_queue_id);
2777 		return -EINVAL;
2778 	}
2779 
2780 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2781 
2782 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2783 		if (*dev->dev_ops->eth_rx_adapter_queue_del == NULL)
2784 			return -ENOTSUP;
2785 		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2786 						&rte_eth_devices[eth_dev_id],
2787 						rx_queue_id);
2788 		if (ret == 0) {
2789 			rxa_update_queue(rx_adapter,
2790 					&rx_adapter->eth_devices[eth_dev_id],
2791 					rx_queue_id,
2792 					0);
2793 			if (dev_info->nb_dev_queues == 0) {
2794 				rte_free(dev_info->rx_queue);
2795 				dev_info->rx_queue = NULL;
2796 			}
2797 		}
2798 	} else {
2799 		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2800 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2801 
2802 		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2803 			&rx_poll, &rx_wrr);
2804 		if (ret)
2805 			return ret;
2806 
2807 		rte_spinlock_lock(&rx_adapter->rx_lock);
2808 
2809 		num_intr_vec = 0;
2810 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2811 
2812 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2813 						rx_queue_id, 0);
2814 			ret = rxa_del_intr_queue(rx_adapter, dev_info,
2815 					rx_queue_id);
2816 			if (ret)
2817 				goto unlock_ret;
2818 		}
2819 
2820 		if (nb_rx_intr == 0) {
2821 			ret = rxa_free_intr_resources(rx_adapter);
2822 			if (ret)
2823 				goto unlock_ret;
2824 		}
2825 
2826 		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2827 		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2828 
2829 		rte_free(rx_adapter->eth_rx_poll);
2830 		rte_free(rx_adapter->wrr_sched);
2831 
2832 		if (nb_rx_intr == 0) {
2833 			rte_free(dev_info->intr_queue);
2834 			dev_info->intr_queue = NULL;
2835 		}
2836 
2837 		rx_adapter->eth_rx_poll = rx_poll;
2838 		rx_adapter->wrr_sched = rx_wrr;
2839 		rx_adapter->wrr_len = nb_wrr;
2840 		/*
2841 		 * Reset the next poll start position (wrr_pos) to avoid a
2842 		 * buffer overrun when wrr_len is reduced by a queue delete.
2843 		 */
2844 		rx_adapter->wrr_pos = 0;
2845 		rx_adapter->num_intr_vec += num_intr_vec;
2846 
2847 		if (dev_info->nb_dev_queues == 0) {
2848 			rte_free(dev_info->rx_queue);
2849 			dev_info->rx_queue = NULL;
2850 		}
2851 unlock_ret:
2852 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2853 		if (ret) {
2854 			rte_free(rx_poll);
2855 			rte_free(rx_wrr);
2856 			return ret;
2857 		}
2858 
2859 		rte_service_component_runstate_set(rx_adapter->service_id,
2860 				rxa_sw_adapter_queue_count(rx_adapter));
2861 	}
2862 
2863 	rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2864 		rx_queue_id, ret);
2865 
2866 	return ret;
2867 }
2868 
2869 int
2870 rte_event_eth_rx_adapter_vector_limits_get(
2871 	uint8_t dev_id, uint16_t eth_port_id,
2872 	struct rte_event_eth_rx_adapter_vector_limits *limits)
2873 {
2874 	struct rte_eventdev *dev;
2875 	uint32_t cap;
2876 	int ret;
2877 
2878 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2879 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2880 
2881 	if (limits == NULL)
2882 		return -EINVAL;
2883 
2884 	dev = &rte_eventdevs[dev_id];
2885 
2886 	ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2887 	if (ret) {
2888 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2889 				 " eth port %" PRIu16,
2890 				 dev_id, eth_port_id);
2891 		return ret;
2892 	}
2893 
2894 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2895 		if (*dev->dev_ops->eth_rx_adapter_vector_limits_get == NULL)
2896 			return -ENOTSUP;
2897 		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2898 			dev, &rte_eth_devices[eth_port_id], limits);
2899 	} else {
2900 		ret = rxa_sw_vector_limits(limits);
2901 	}
2902 
2903 	return ret;
2904 }
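/*
 * Vector configuration sketch (ids, pool name and counts are illustrative;
 * "qconf" is a queue conf being prepared for queue add): query the limits
 * first, then request vectorization within them.
 *
 *	struct rte_event_eth_rx_adapter_vector_limits lim;
 *
 *	ret = rte_event_eth_rx_adapter_vector_limits_get(evdev_id, eth_port,
 *							 &lim);
 *	if (ret == 0) {
 *		qconf.rx_queue_flags |=
 *			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR;
 *		qconf.vector_sz = RTE_MIN(64, lim.max_sz);
 *		qconf.vector_timeout_ns = lim.min_timeout_ns;
 *		qconf.vector_mp = rte_event_vector_pool_create("rxa_vec_pool",
 *					1024, 0, qconf.vector_sz,
 *					rte_socket_id());
 *	}
 */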
2905 
2906 int
2907 rte_event_eth_rx_adapter_start(uint8_t id)
2908 {
2909 	rte_eventdev_trace_eth_rx_adapter_start(id);
2910 	return rxa_ctrl(id, 1);
2911 }
2912 
2913 int
2914 rte_event_eth_rx_adapter_stop(uint8_t id)
2915 {
2916 	rte_eventdev_trace_eth_rx_adapter_stop(id);
2917 	return rxa_ctrl(id, 0);
2918 }
2919 
2920 static inline void
2921 rxa_queue_stats_reset(struct eth_rx_queue_info *queue_info)
2922 {
2923 	struct rte_event_eth_rx_adapter_stats *q_stats;
2924 
2925 	q_stats = queue_info->stats;
2926 	memset(q_stats, 0, sizeof(*q_stats));
2927 }
2928 
2929 int
2930 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2931 			       struct rte_event_eth_rx_adapter_stats *stats)
2932 {
2933 	struct event_eth_rx_adapter *rx_adapter;
2934 	struct eth_event_enqueue_buffer *buf;
2935 	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2936 	struct rte_event_eth_rx_adapter_stats dev_stats;
2937 	struct rte_eventdev *dev;
2938 	struct eth_device_info *dev_info;
2939 	struct eth_rx_queue_info *queue_info;
2940 	struct rte_event_eth_rx_adapter_stats *q_stats;
2941 	uint32_t i, j;
2942 	int ret;
2943 
2944 	if (rxa_memzone_lookup())
2945 		return -ENOMEM;
2946 
2947 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2948 
2949 	rx_adapter = rxa_id_to_adapter(id);
2950 	if (rx_adapter == NULL || stats == NULL)
2951 		return -EINVAL;
2952 
2953 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2954 	memset(stats, 0, sizeof(*stats));
2955 
2956 	if (rx_adapter->service_inited)
2957 		*stats = rx_adapter->stats;
2958 
2959 	RTE_ETH_FOREACH_DEV(i) {
2960 		dev_info = &rx_adapter->eth_devices[i];
2961 
2962 		if (rx_adapter->use_queue_event_buf && dev_info->rx_queue) {
2963 
2964 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
2965 			     j++) {
2966 				queue_info = &dev_info->rx_queue[j];
2967 				if (!queue_info->queue_enabled)
2968 					continue;
2969 				q_stats = queue_info->stats;
2970 
2971 				stats->rx_packets += q_stats->rx_packets;
2972 				stats->rx_poll_count += q_stats->rx_poll_count;
2973 				stats->rx_enq_count += q_stats->rx_enq_count;
2974 				stats->rx_enq_retry += q_stats->rx_enq_retry;
2975 				stats->rx_dropped += q_stats->rx_dropped;
2976 				stats->rx_enq_block_cycles +=
2977 						q_stats->rx_enq_block_cycles;
2978 			}
2979 		}
2980 
2981 		if (dev_info->internal_event_port == 0 ||
2982 			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2983 			continue;
2984 		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2985 						&rte_eth_devices[i],
2986 						&dev_stats);
2987 		if (ret)
2988 			continue;
2989 		dev_stats_sum.rx_packets += dev_stats.rx_packets;
2990 		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2991 	}
2992 
2993 	buf = &rx_adapter->event_enqueue_buffer;
2994 	stats->rx_packets += dev_stats_sum.rx_packets;
2995 	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2996 	stats->rx_event_buf_count = buf->count;
2997 	stats->rx_event_buf_size = buf->events_size;
2998 
2999 	return 0;
3000 }
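/*
 * Stats sketch (adapter id assumed): the aggregate counters can be sampled
 * periodically; deltas between samples give the current receive and enqueue
 * rates.
 *
 *	struct rte_event_eth_rx_adapter_stats stats;
 *
 *	if (rte_event_eth_rx_adapter_stats_get(rxa_id, &stats) == 0)
 *		printf("rx %" PRIu64 " enq %" PRIu64 " dropped %" PRIu64 "\n",
 *		       stats.rx_packets, stats.rx_enq_count, stats.rx_dropped);
 */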
3001 
3002 int
3003 rte_event_eth_rx_adapter_queue_stats_get(uint8_t id,
3004 		uint16_t eth_dev_id,
3005 		uint16_t rx_queue_id,
3006 		struct rte_event_eth_rx_adapter_queue_stats *stats)
3007 {
3008 	struct event_eth_rx_adapter *rx_adapter;
3009 	struct eth_device_info *dev_info;
3010 	struct eth_rx_queue_info *queue_info;
3011 	struct eth_event_enqueue_buffer *event_buf;
3012 	struct rte_event_eth_rx_adapter_stats *q_stats;
3013 	struct rte_eventdev *dev;
3014 
3015 	if (rxa_memzone_lookup())
3016 		return -ENOMEM;
3017 
3018 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3019 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3020 
3021 	rx_adapter = rxa_id_to_adapter(id);
3022 
3023 	if (rx_adapter == NULL || stats == NULL)
3024 		return -EINVAL;
3025 
3026 	if (!rx_adapter->use_queue_event_buf)
3027 		return -EINVAL;
3028 
3029 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3030 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
3031 		return -EINVAL;
3032 	}
3033 
3034 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3035 	if (dev_info->rx_queue == NULL ||
3036 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3037 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3038 		return -EINVAL;
3039 	}
3040 
3041 	if (dev_info->internal_event_port == 0) {
3042 		queue_info = &dev_info->rx_queue[rx_queue_id];
3043 		event_buf = queue_info->event_buf;
3044 		q_stats = queue_info->stats;
3045 
3046 		stats->rx_event_buf_count = event_buf->count;
3047 		stats->rx_event_buf_size = event_buf->events_size;
3048 		stats->rx_packets = q_stats->rx_packets;
3049 		stats->rx_poll_count = q_stats->rx_poll_count;
3050 		stats->rx_dropped = q_stats->rx_dropped;
3051 	}
3052 
3053 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3054 	if (dev->dev_ops->eth_rx_adapter_queue_stats_get != NULL) {
3055 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_get)(dev,
3056 						&rte_eth_devices[eth_dev_id],
3057 						rx_queue_id, stats);
3058 	}
3059 
3060 	return 0;
3061 }
3062 
3063 int
3064 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
3065 {
3066 	struct event_eth_rx_adapter *rx_adapter;
3067 	struct rte_eventdev *dev;
3068 	struct eth_device_info *dev_info;
3069 	struct eth_rx_queue_info *queue_info;
3070 	uint32_t i, j;
3071 
3072 	if (rxa_memzone_lookup())
3073 		return -ENOMEM;
3074 
3075 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3076 
3077 	rx_adapter = rxa_id_to_adapter(id);
3078 	if (rx_adapter == NULL)
3079 		return -EINVAL;
3080 
3081 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3082 
3083 	RTE_ETH_FOREACH_DEV(i) {
3084 		dev_info = &rx_adapter->eth_devices[i];
3085 
3086 		if (rx_adapter->use_queue_event_buf && dev_info->rx_queue) {
3087 
3088 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
3089 						j++) {
3090 				queue_info = &dev_info->rx_queue[j];
3091 				if (!queue_info->queue_enabled)
3092 					continue;
3093 				rxa_queue_stats_reset(queue_info);
3094 			}
3095 		}
3096 
3097 		if (dev_info->internal_event_port == 0 ||
3098 			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
3099 			continue;
3100 		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
3101 							&rte_eth_devices[i]);
3102 	}
3103 
3104 	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
3105 
3106 	return 0;
3107 }
3108 
3109 int
3110 rte_event_eth_rx_adapter_queue_stats_reset(uint8_t id,
3111 		uint16_t eth_dev_id,
3112 		uint16_t rx_queue_id)
3113 {
3114 	struct event_eth_rx_adapter *rx_adapter;
3115 	struct eth_device_info *dev_info;
3116 	struct eth_rx_queue_info *queue_info;
3117 	struct rte_eventdev *dev;
3118 
3119 	if (rxa_memzone_lookup())
3120 		return -ENOMEM;
3121 
3122 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3123 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3124 
3125 	rx_adapter = rxa_id_to_adapter(id);
3126 	if (rx_adapter == NULL)
3127 		return -EINVAL;
3128 
3129 	if (!rx_adapter->use_queue_event_buf)
3130 		return -EINVAL;
3131 
3132 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3133 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
3134 		return -EINVAL;
3135 	}
3136 
3137 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3138 
3139 	if (dev_info->rx_queue == NULL ||
3140 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3141 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3142 		return -EINVAL;
3143 	}
3144 
3145 	if (dev_info->internal_event_port == 0) {
3146 		queue_info = &dev_info->rx_queue[rx_queue_id];
3147 		rxa_queue_stats_reset(queue_info);
3148 	}
3149 
3150 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3151 	if (dev->dev_ops->eth_rx_adapter_queue_stats_reset != NULL) {
3152 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_reset)(dev,
3153 						&rte_eth_devices[eth_dev_id],
3154 						rx_queue_id);
3155 	}
3156 
3157 	return 0;
3158 }
3159 
3160 int
3161 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
3162 {
3163 	struct event_eth_rx_adapter *rx_adapter;
3164 
3165 	if (rxa_memzone_lookup())
3166 		return -ENOMEM;
3167 
3168 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3169 
3170 	rx_adapter = rxa_id_to_adapter(id);
3171 	if (rx_adapter == NULL || service_id == NULL)
3172 		return -EINVAL;
3173 
3174 	if (rx_adapter->service_inited)
3175 		*service_id = rx_adapter->service_id;
3176 
3177 	return rx_adapter->service_inited ? 0 : -ESRCH;
3178 }
3179 
3180 int
3181 rte_event_eth_rx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
3182 {
3183 	struct event_eth_rx_adapter *rx_adapter;
3184 
3185 	if (rxa_memzone_lookup())
3186 		return -ENOMEM;
3187 
3188 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3189 
3190 	rx_adapter = rxa_id_to_adapter(id);
3191 	if (rx_adapter == NULL || event_port_id == NULL)
3192 		return -EINVAL;
3193 
3194 	if (rx_adapter->service_inited)
3195 		*event_port_id = rx_adapter->event_port_id;
3196 
3197 	return rx_adapter->service_inited ? 0 : -ESRCH;
3198 }
3199 
3200 int
3201 rte_event_eth_rx_adapter_cb_register(uint8_t id,
3202 					uint16_t eth_dev_id,
3203 					rte_event_eth_rx_adapter_cb_fn cb_fn,
3204 					void *cb_arg)
3205 {
3206 	struct event_eth_rx_adapter *rx_adapter;
3207 	struct eth_device_info *dev_info;
3208 	uint32_t cap;
3209 	int ret;
3210 
3211 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3212 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3213 
3214 	rx_adapter = rxa_id_to_adapter(id);
3215 	if (rx_adapter == NULL)
3216 		return -EINVAL;
3217 
3218 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3219 	if (dev_info->rx_queue == NULL)
3220 		return -EINVAL;
3221 
3222 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
3223 						eth_dev_id,
3224 						&cap);
3225 	if (ret) {
3226 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
3227 			" eth port %" PRIu16, id, eth_dev_id);
3228 		return ret;
3229 	}
3230 
3231 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
3232 		RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
3233 				PRIu16, eth_dev_id);
3234 		return -EINVAL;
3235 	}
3236 
3237 	rte_spinlock_lock(&rx_adapter->rx_lock);
3238 	dev_info->cb_fn = cb_fn;
3239 	dev_info->cb_arg = cb_arg;
3240 	rte_spinlock_unlock(&rx_adapter->rx_lock);
3241 
3242 	return 0;
3243 }
3244 
3245 int
3246 rte_event_eth_rx_adapter_queue_conf_get(uint8_t id,
3247 			uint16_t eth_dev_id,
3248 			uint16_t rx_queue_id,
3249 			struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
3250 {
3251 #define TICK2NSEC(_ticks, _freq) (((_ticks) * (1E9)) / (_freq))
3252 	struct rte_eventdev *dev;
3253 	struct event_eth_rx_adapter *rx_adapter;
3254 	struct eth_device_info *dev_info;
3255 	struct eth_rx_queue_info *queue_info;
3256 	int ret;
3257 
3258 	if (rxa_memzone_lookup())
3259 		return -ENOMEM;
3260 
3261 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3262 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3263 
3264 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3265 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3266 		return -EINVAL;
3267 	}
3268 
3269 	if (queue_conf == NULL) {
3270 		RTE_EDEV_LOG_ERR("Rx queue conf struct cannot be NULL");
3271 		return -EINVAL;
3272 	}
3273 
3274 	rx_adapter = rxa_id_to_adapter(id);
3275 	if (rx_adapter == NULL)
3276 		return -EINVAL;
3277 
3278 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3279 	if (dev_info->rx_queue == NULL ||
3280 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3281 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3282 		return -EINVAL;
3283 	}
3284 
3285 	queue_info = &dev_info->rx_queue[rx_queue_id];
3286 
3287 	memset(queue_conf, 0, sizeof(*queue_conf));
3288 	queue_conf->rx_queue_flags = 0;
3289 	if (queue_info->flow_id_mask != 0)
3290 		queue_conf->rx_queue_flags |=
3291 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID;
3292 	queue_conf->servicing_weight = queue_info->wt;
3293 
3294 	queue_conf->ev.event = queue_info->event;
3295 
3296 	queue_conf->vector_sz = queue_info->vector_data.max_vector_count;
3297 	queue_conf->vector_mp = queue_info->vector_data.vector_pool;
3298 	/* needs to be converted from ticks to ns */
3299 	queue_conf->vector_timeout_ns = TICK2NSEC(
3300 		queue_info->vector_data.vector_timeout_ticks, rte_get_timer_hz());
3301 
3302 	if (queue_info->event_buf != NULL)
3303 		queue_conf->event_buf_size = queue_info->event_buf->events_size;
3304 	else
3305 		queue_conf->event_buf_size = 0;
3306 
3307 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3308 	if (dev->dev_ops->eth_rx_adapter_queue_conf_get != NULL) {
3309 		ret = (*dev->dev_ops->eth_rx_adapter_queue_conf_get)(dev,
3310 						&rte_eth_devices[eth_dev_id],
3311 						rx_queue_id,
3312 						queue_conf);
3313 		return ret;
3314 	}
3315 
3316 	return 0;
3317 }
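/*
 * Read-back sketch (ids assumed): the returned conf mirrors what was set at
 * queue add time, with vector_timeout_ns converted back from ticks as above.
 *
 *	struct rte_event_eth_rx_adapter_queue_conf qc;
 *
 *	if (rte_event_eth_rx_adapter_queue_conf_get(rxa_id, eth_port, 0,
 *						    &qc) == 0)
 *		printf("weight %u ev queue %u\n",
 *		       qc.servicing_weight, qc.ev.queue_id);
 */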
3318 
3319 static int
3320 rxa_is_queue_added(struct event_eth_rx_adapter *rx_adapter,
3321 		   uint16_t eth_dev_id,
3322 		   uint16_t rx_queue_id)
3323 {
3324 	struct eth_device_info *dev_info;
3325 	struct eth_rx_queue_info *queue_info;
3326 
3327 	if (!rx_adapter->eth_devices)
3328 		return 0;
3329 
3330 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3331 	if (!dev_info || !dev_info->rx_queue)
3332 		return 0;
3333 
3334 	queue_info = &dev_info->rx_queue[rx_queue_id];
3335 
3336 	return queue_info && queue_info->queue_enabled;
3337 }
3338 
3339 #define rxa_evdev(rx_adapter) (&rte_eventdevs[(rx_adapter)->eventdev_id])
3340 
3341 #define rxa_dev_instance_get(rx_adapter) \
3342 		rxa_evdev((rx_adapter))->dev_ops->eth_rx_adapter_instance_get
3343 
3344 int
3345 rte_event_eth_rx_adapter_instance_get(uint16_t eth_dev_id,
3346 				      uint16_t rx_queue_id,
3347 				      uint8_t *rxa_inst_id)
3348 {
3349 	uint8_t id;
3350 	int ret = -EINVAL;
3351 	uint32_t caps;
3352 	struct event_eth_rx_adapter *rx_adapter;
3353 
3354 	if (rxa_memzone_lookup())
3355 		return -ENOMEM;
3356 
3357 	if (eth_dev_id >= rte_eth_dev_count_avail()) {
3358 		RTE_EDEV_LOG_ERR("Invalid ethernet port id %u", eth_dev_id);
3359 		return -EINVAL;
3360 	}
3361 
3362 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3363 		RTE_EDEV_LOG_ERR("Invalid Rx queue %u", rx_queue_id);
3364 		return -EINVAL;
3365 	}
3366 
3367 	if (rxa_inst_id == NULL) {
3368 		RTE_EDEV_LOG_ERR("rxa_inst_id cannot be NULL");
3369 		return -EINVAL;
3370 	}
3371 
3372 	/* Iterate through all adapter instances */
3373 	for (id = 0; id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE; id++) {
3374 		rx_adapter = rxa_id_to_adapter(id);
3375 		if (!rx_adapter)
3376 			continue;
3377 
3378 		if (rxa_is_queue_added(rx_adapter, eth_dev_id, rx_queue_id)) {
3379 			*rxa_inst_id = rx_adapter->id;
3380 			ret = 0;
3381 		}
3382 
3383 		/* Rx adapter internally maintains queue information
3384 		 * for both internal port and DPDK service port.
3385 		 * Eventdev PMD callback is called for future-proofing only and
3386 		 * overrides the above return value if defined.
3387 		 */
3388 		caps = 0;
3389 		if (!rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
3390 						      eth_dev_id,
3391 						      &caps)) {
3392 			if (caps & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
3393 				ret = rxa_dev_instance_get(rx_adapter) ?
3394 						rxa_dev_instance_get(rx_adapter)
3395 								(eth_dev_id,
3396 								 rx_queue_id,
3397 								 rxa_inst_id)
3398 							: -EINVAL;
3399 			}
3400 		}
3401 
3402 		/* return if entry found */
3403 		if (ret == 0)
3404 			return ret;
3405 	}
3406 
3407 	return -EINVAL;
3408 }
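/*
 * Lookup sketch (port and queue assumed): find which adapter instance, if
 * any, currently services a given ethdev Rx queue.
 *
 *	uint8_t inst_id;
 *
 *	if (rte_event_eth_rx_adapter_instance_get(eth_port, 0, &inst_id) == 0)
 *		printf("Rx queue 0 handled by adapter %u\n", inst_id);
 */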
3409 
3410 #define RXA_ADD_DICT(stats, s) rte_tel_data_add_dict_u64(d, #s, stats.s)
3411 
3412 static int
3413 handle_rxa_stats(const char *cmd __rte_unused,
3414 		 const char *params,
3415 		 struct rte_tel_data *d)
3416 {
3417 	uint8_t rx_adapter_id;
3418 	struct rte_event_eth_rx_adapter_stats rx_adptr_stats;
3419 
3420 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3421 		return -1;
3422 
3423 	/* Get Rx adapter ID from parameter string */
3424 	rx_adapter_id = atoi(params);
3425 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3426 
3427 	/* Get Rx adapter stats */
3428 	if (rte_event_eth_rx_adapter_stats_get(rx_adapter_id,
3429 					       &rx_adptr_stats)) {
3430 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter stats");
3431 		return -1;
3432 	}
3433 
3434 	rte_tel_data_start_dict(d);
3435 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3436 	RXA_ADD_DICT(rx_adptr_stats, rx_packets);
3437 	RXA_ADD_DICT(rx_adptr_stats, rx_poll_count);
3438 	RXA_ADD_DICT(rx_adptr_stats, rx_dropped);
3439 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_retry);
3440 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_count);
3441 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_size);
3442 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_count);
3443 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_start_ts);
3444 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_block_cycles);
3445 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_end_ts);
3446 	RXA_ADD_DICT(rx_adptr_stats, rx_intr_packets);
3447 
3448 	return 0;
3449 }
3450 
3451 static int
3452 handle_rxa_stats_reset(const char *cmd __rte_unused,
3453 		       const char *params,
3454 		       struct rte_tel_data *d __rte_unused)
3455 {
3456 	uint8_t rx_adapter_id;
3457 
3458 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3459 		return -1;
3460 
3461 	/* Get Rx adapter ID from parameter string */
3462 	rx_adapter_id = atoi(params);
3463 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3464 
3465 	/* Reset Rx adapter stats */
3466 	if (rte_event_eth_rx_adapter_stats_reset(rx_adapter_id)) {
3467 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter stats");
3468 		return -1;
3469 	}
3470 
3471 	return 0;
3472 }
3473 
3474 static int
3475 handle_rxa_get_queue_conf(const char *cmd __rte_unused,
3476 			  const char *params,
3477 			  struct rte_tel_data *d)
3478 {
3479 	uint8_t rx_adapter_id;
3480 	uint16_t rx_queue_id;
3481 	int eth_dev_id, ret = -1;
3482 	char *token, *l_params;
3483 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
3484 
3485 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3486 		return -1;
3487 
3488 	/* Get Rx adapter ID from parameter string */
3489 	l_params = strdup(params);
3490 	if (l_params == NULL)
3491 		return -ENOMEM;
3492 	token = strtok(l_params, ",");
3493 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3494 	rx_adapter_id = strtoul(token, NULL, 10);
3495 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);
3496 
3497 	token = strtok(NULL, ",");
3498 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3499 
3500 	/* Get device ID from parameter string */
3501 	eth_dev_id = strtoul(token, NULL, 10);
3502 	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3503 
3504 	token = strtok(NULL, ",");
3505 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3506 
3507 	/* Get Rx queue ID from parameter string */
3508 	rx_queue_id = strtoul(token, NULL, 10);
3509 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3510 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3511 		ret = -EINVAL;
3512 		goto error;
3513 	}
3514 
3515 	token = strtok(NULL, "\0");
3516 	if (token != NULL)
3517 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3518 				 " telemetry command, ignoring");
3519 	/* Parsing parameter finished */
3520 	free(l_params);
3521 
3522 	if (rte_event_eth_rx_adapter_queue_conf_get(rx_adapter_id, eth_dev_id,
3523 						    rx_queue_id, &queue_conf)) {
3524 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue config");
3525 		return -1;
3526 	}
3527 
3528 	rte_tel_data_start_dict(d);
3529 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3530 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3531 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3532 	RXA_ADD_DICT(queue_conf, rx_queue_flags);
3533 	RXA_ADD_DICT(queue_conf, servicing_weight);
3534 	RXA_ADD_DICT(queue_conf.ev, queue_id);
3535 	RXA_ADD_DICT(queue_conf.ev, sched_type);
3536 	RXA_ADD_DICT(queue_conf.ev, priority);
3537 	RXA_ADD_DICT(queue_conf.ev, flow_id);
3538 
3539 	return 0;
3540 
3541 error:
3542 	free(l_params);
3543 	return ret;
3544 }
3545 
3546 static int
3547 handle_rxa_get_queue_stats(const char *cmd __rte_unused,
3548 			   const char *params,
3549 			   struct rte_tel_data *d)
3550 {
3551 	uint8_t rx_adapter_id;
3552 	uint16_t rx_queue_id;
3553 	int eth_dev_id, ret = -1;
3554 	char *token, *l_params;
3555 	struct rte_event_eth_rx_adapter_queue_stats q_stats;
3556 
3557 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3558 		return -1;
3559 
3560 	/* Get Rx adapter ID from parameter string */
3561 	l_params = strdup(params);
3562 	if (l_params == NULL)
3563 		return -ENOMEM;
3564 	token = strtok(l_params, ",");
3565 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3566 	rx_adapter_id = strtoul(token, NULL, 10);
3567 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);
3568 
3569 	token = strtok(NULL, ",");
3570 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3571 
3572 	/* Get device ID from parameter string */
3573 	eth_dev_id = strtoul(token, NULL, 10);
3574 	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3575 
3576 	token = strtok(NULL, ",");
3577 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3578 
3579 	/* Get Rx queue ID from parameter string */
3580 	rx_queue_id = strtoul(token, NULL, 10);
3581 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3582 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3583 		ret = -EINVAL;
3584 		goto error;
3585 	}
3586 
3587 	token = strtok(NULL, "\0");
3588 	if (token != NULL)
3589 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3590 				 " telemetry command, ignoring");
3591 	/* Parsing parameter finished */
3592 	free(l_params);
3593 
3594 	if (rte_event_eth_rx_adapter_queue_stats_get(rx_adapter_id, eth_dev_id,
3595 						    rx_queue_id, &q_stats)) {
3596 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue stats");
3597 		return -1;
3598 	}
3599 
3600 	rte_tel_data_start_dict(d);
3601 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3602 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3603 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3604 	RXA_ADD_DICT(q_stats, rx_event_buf_count);
3605 	RXA_ADD_DICT(q_stats, rx_event_buf_size);
3606 	RXA_ADD_DICT(q_stats, rx_poll_count);
3607 	RXA_ADD_DICT(q_stats, rx_packets);
3608 	RXA_ADD_DICT(q_stats, rx_dropped);
3609 
3610 	return 0;
3611 
3612 error:
3613 	free(l_params);
3614 	return ret;
3615 }
3616 
3617 static int
3618 handle_rxa_queue_stats_reset(const char *cmd __rte_unused,
3619 			     const char *params,
3620 			     struct rte_tel_data *d __rte_unused)
3621 {
3622 	uint8_t rx_adapter_id;
3623 	uint16_t rx_queue_id;
3624 	int eth_dev_id, ret = -1;
3625 	char *token, *l_params;
3626 
3627 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3628 		return -1;
3629 
3630 	/* Get Rx adapter ID from parameter string */
3631 	l_params = strdup(params);
3632 	if (l_params == NULL)
3633 		return -ENOMEM;
3634 	token = strtok(l_params, ",");
3635 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3636 	rx_adapter_id = strtoul(token, NULL, 10);
3637 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);
3638 
3639 	token = strtok(NULL, ",");
3640 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3641 
3642 	/* Get device ID from parameter string */
3643 	eth_dev_id = strtoul(token, NULL, 10);
3644 	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3645 
3646 	token = strtok(NULL, ",");
3647 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3648 
3649 	/* Get Rx queue ID from parameter string */
3650 	rx_queue_id = strtoul(token, NULL, 10);
3651 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3652 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3653 		ret = -EINVAL;
3654 		goto error;
3655 	}
3656 
3657 	token = strtok(NULL, "\0");
3658 	if (token != NULL)
3659 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3660 				 " telemetry command, ignoring");
3661 	/* Parsing parameter finished */
3662 	free(l_params);
3663 
3664 	if (rte_event_eth_rx_adapter_queue_stats_reset(rx_adapter_id,
3665 						       eth_dev_id,
3666 						       rx_queue_id)) {
3667 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter queue stats");
3668 		return -1;
3669 	}
3670 
3671 	return 0;
3672 
3673 error:
3674 	free(l_params);
3675 	return ret;
3676 }
3677 
3678 static int
3679 handle_rxa_instance_get(const char *cmd __rte_unused,
3680 			const char *params,
3681 			struct rte_tel_data *d)
3682 {
3683 	uint8_t instance_id;
3684 	uint16_t rx_queue_id;
3685 	int eth_dev_id, ret = -1;
3686 	char *token, *l_params;
3687 
3688 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3689 		return -1;
3690 
3691 	l_params = strdup(params);
3692 	if (l_params == NULL)
3693 		return -ENOMEM;
3694 	token = strtok(l_params, ",");
3695 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3696 
3697 	/* Get device ID from parameter string */
3698 	eth_dev_id = strtoul(token, NULL, 10);
3699 	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3700 
3701 	token = strtok(NULL, ",");
3702 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3703 
3704 	/* Get Rx queue ID from parameter string */
3705 	rx_queue_id = strtoul(token, NULL, 10);
3706 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3707 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3708 		ret = -EINVAL;
3709 		goto error;
3710 	}
3711 
3712 	token = strtok(NULL, "\0");
3713 	if (token != NULL)
3714 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3715 				 " telemetry command, ignoring");
3716 
3717 	/* Parsing parameter finished */
3718 	free(l_params);
3719 
3720 	if (rte_event_eth_rx_adapter_instance_get(eth_dev_id,
3721 						  rx_queue_id,
3722 						  &instance_id)) {
3723 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter instance ID "
3724 				 "for rx_queue_id = %d", rx_queue_id);
3725 		return -1;
3726 	}
3727 
3728 	rte_tel_data_start_dict(d);
3729 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3730 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3731 	rte_tel_data_add_dict_u64(d, "rxa_instance_id", instance_id);
3732 
3733 	return 0;
3734 
3735 error:
3736 	free(l_params);
3737 	return ret;
3738 }
3739 
3740 RTE_INIT(rxa_init_telemetry)
3741 {
3742 	rte_telemetry_register_cmd("/eventdev/rxa_stats",
3743 		handle_rxa_stats,
3744 		"Returns Rx adapter stats. Parameter: rxa_id");
3745 
3746 	rte_telemetry_register_cmd("/eventdev/rxa_stats_reset",
3747 		handle_rxa_stats_reset,
3748 		"Reset Rx adapter stats. Parameter: rxa_id");
3749 
3750 	rte_telemetry_register_cmd("/eventdev/rxa_queue_conf",
3751 		handle_rxa_get_queue_conf,
3752 		"Returns Rx queue config. Parameter: rxa_id, dev_id, queue_id");
3753 
3754 	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats",
3755 		handle_rxa_get_queue_stats,
3756 		"Returns Rx queue stats. Parameter: rxa_id, dev_id, queue_id");
3757 
3758 	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats_reset",
3759 		handle_rxa_queue_stats_reset,
3760 		"Reset Rx queue stats. Parameter: rxa_id, dev_id, queue_id");
3761 
3762 	rte_telemetry_register_cmd("/eventdev/rxa_rxq_instance_get",
3763 		handle_rxa_instance_get,
3764 		"Returns Rx adapter instance id. Parameter: dev_id, queue_id");
3765 }
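/*
 * Telemetry sketch: the commands registered above can be exercised with the
 * standard telemetry client shipped in usertools/, e.g.
 *
 *	./usertools/dpdk-telemetry.py
 *	--> /eventdev/rxa_stats,0
 *	--> /eventdev/rxa_queue_stats,0,0,0
 *
 * where the parameters are rxa_id and, for the queue commands, dev_id and
 * queue_id as described in the registration strings.
 */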
3766