xref: /dpdk/lib/eventdev/rte_event_eth_rx_adapter.c (revision 97b914f4e715565d53d38ac6e04815b9be5e58a9)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9 
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <ethdev_driver.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20 #include <rte_mbuf_dyn.h>
21 #include <rte_telemetry.h>
22 
23 #include "rte_eventdev.h"
24 #include "eventdev_pmd.h"
25 #include "eventdev_trace.h"
26 #include "rte_event_eth_rx_adapter.h"
27 
/* Maximum number of mbufs requested from an Rx queue in a single burst */
#define BATCH_SIZE		32
/* Number of failed event buffer flushes counted before the start of an
 * enqueue-blocked period is timestamped
 */
#define BLOCK_CNT_THRESHOLD	10
/* Event buffer size, expressed as a multiple of the Rx burst size */
#define ETH_EVENT_BUFFER_SIZE	(6 * BATCH_SIZE)
/* Bounds applied to event vector size and vector timeout.
 * NOTE(review): the *_NS values are presumably nanoseconds - confirm
 * against the code that validates the vector limits.
 */
#define MAX_VECTOR_SIZE		1024
#define MIN_VECTOR_SIZE		4
#define MAX_VECTOR_NS		1E9
#define MIN_VECTOR_NS		1E5

#define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
#define ETH_RX_ADAPTER_MEM_NAME_LEN	32

/* Size in bytes of the RSS key stored per adapter */
#define RSS_KEY_SIZE	40
/* Value written to the interrupt thread pipe to signal thread exit */
#define ETH_BRIDGE_INTR_THREAD_EXIT	1
/* Sentinel file descriptor value.
 * NOTE(review): -1 presumably marks a handle that has not been opened
 * yet - confirm against the code that sets and tests it.
 */
#define INIT_FD		(-1)

#define RXA_ADAPTER_ARRAY "rte_event_eth_rx_adapter_array"
46 
47 /*
48  * Used to store port and queue ID of interrupting Rx queue
49  */
50 union queue_data {
51 	RTE_STD_C11
52 	void *ptr;
53 	struct {
54 		uint16_t port;
55 		uint16_t queue;
56 	};
57 };
58 
/*
 * One entry exists for every polled Rx queue that has been added to the
 * adapter.
 */
struct eth_rx_poll_entry {
	/* Ethernet port that owns the polled queue */
	uint16_t eth_dev_id;
	/* Rx queue index on that port */
	uint16_t eth_rx_qid;
};
69 
/* Event vector aggregation state of one Rx queue */
struct eth_rx_vector_data {
	/* Linkage in the adapter's list of vectors being filled */
	TAILQ_ENTRY(eth_rx_vector_data) next;
	/* Port and queue values stamped into each new vector */
	uint16_t port;
	uint16_t queue;
	/* Number of mbufs at which the vector is complete */
	uint16_t max_vector_count;
	/* Event word copied into the event that carries a vector */
	uint64_t event;
	/* Cycle count taken when mbufs were last appended */
	uint64_t ts;
	/* NOTE(review): presumably the age, in timer ticks, after which a
	 * partially filled vector is flushed - confirm against its users
	 */
	uint64_t vector_timeout_ticks;
	/* Mempool that vectors are allocated from */
	struct rte_mempool *vector_pool;
	/* Vector currently being filled, NULL if there is none */
	struct rte_event_vector *vector_ev;
} __rte_cache_aligned;

TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
83 
/* Buffer of events waiting to be enqueued to the event device; one is
 * embedded in the adapter and each Rx queue may reference its own
 */
struct eth_event_enqueue_buffer {
	/* Number of events currently held */
	uint16_t count;
	/* Storage for the buffered events */
	struct rte_event *events;
	/* Capacity of events[] */
	uint16_t events_size;
	/* Index that events are enqueued to the event device from */
	uint16_t head;
	/* Index that events for newly received packets are written at */
	uint16_t tail;
	/* Index one past the final valid event before the buffer rolled
	 * over, 0 when the buffer has not rolled over
	 */
	uint16_t last;
	/* All ones while rolled over, otherwise 0 */
	uint16_t last_mask;
};
100 
101 struct event_eth_rx_adapter {
102 	/* RSS key */
103 	uint8_t rss_key_be[RSS_KEY_SIZE];
104 	/* Event device identifier */
105 	uint8_t eventdev_id;
106 	/* Event port identifier */
107 	uint8_t event_port_id;
108 	/* Flag indicating per rxq event buffer */
109 	bool use_queue_event_buf;
110 	/* Per ethernet device structure */
111 	struct eth_device_info *eth_devices;
112 	/* Lock to serialize config updates with service function */
113 	rte_spinlock_t rx_lock;
114 	/* Max mbufs processed in any service function invocation */
115 	uint32_t max_nb_rx;
116 	/* Receive queues that need to be polled */
117 	struct eth_rx_poll_entry *eth_rx_poll;
118 	/* Size of the eth_rx_poll array */
119 	uint16_t num_rx_polled;
120 	/* Weighted round robin schedule */
121 	uint32_t *wrr_sched;
122 	/* wrr_sched[] size */
123 	uint32_t wrr_len;
124 	/* Next entry in wrr[] to begin polling */
125 	uint32_t wrr_pos;
126 	/* Event burst buffer */
127 	struct eth_event_enqueue_buffer event_enqueue_buffer;
128 	/* Vector enable flag */
129 	uint8_t ena_vector;
130 	/* Timestamp of previous vector expiry list traversal */
131 	uint64_t prev_expiry_ts;
132 	/* Minimum ticks to wait before traversing expiry list */
133 	uint64_t vector_tmo_ticks;
134 	/* vector list */
135 	struct eth_rx_vector_data_list vector_list;
136 	/* Per adapter stats */
137 	struct rte_event_eth_rx_adapter_stats stats;
138 	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
139 	uint16_t enq_block_count;
140 	/* Block start ts */
141 	uint64_t rx_enq_block_start_ts;
142 	/* epoll fd used to wait for Rx interrupts */
143 	int epd;
144 	/* Num of interrupt driven interrupt queues */
145 	uint32_t num_rx_intr;
146 	/* Used to send <dev id, queue id> of interrupting Rx queues from
147 	 * the interrupt thread to the Rx thread
148 	 */
149 	struct rte_ring *intr_ring;
150 	/* Rx Queue data (dev id, queue id) for the last non-empty
151 	 * queue polled
152 	 */
153 	union queue_data qd;
154 	/* queue_data is valid */
155 	int qd_valid;
156 	/* Interrupt ring lock, synchronizes Rx thread
157 	 * and interrupt thread
158 	 */
159 	rte_spinlock_t intr_ring_lock;
160 	/* event array passed to rte_poll_wait */
161 	struct rte_epoll_event *epoll_events;
162 	/* Count of interrupt vectors in use */
163 	uint32_t num_intr_vec;
164 	/* Thread blocked on Rx interrupts */
165 	pthread_t rx_intr_thread;
166 	/* Configuration callback for rte_service configuration */
167 	rte_event_eth_rx_adapter_conf_cb conf_cb;
168 	/* Configuration callback argument */
169 	void *conf_arg;
170 	/* Set if  default_cb is being used */
171 	int default_cb_arg;
172 	/* Service initialization state */
173 	uint8_t service_inited;
174 	/* Total count of Rx queues in adapter */
175 	uint32_t nb_queues;
176 	/* Memory allocation name */
177 	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
178 	/* Socket identifier cached from eventdev */
179 	int socket_id;
180 	/* Per adapter EAL service */
181 	uint32_t service_id;
182 	/* Adapter started flag */
183 	uint8_t rxa_started;
184 	/* Adapter ID */
185 	uint8_t id;
186 } __rte_cache_aligned;
187 
188 /* Per eth device */
189 struct eth_device_info {
190 	struct rte_eth_dev *dev;
191 	struct eth_rx_queue_info *rx_queue;
192 	/* Rx callback */
193 	rte_event_eth_rx_adapter_cb_fn cb_fn;
194 	/* Rx callback argument */
195 	void *cb_arg;
196 	/* Set if ethdev->eventdev packet transfer uses a
197 	 * hardware mechanism
198 	 */
199 	uint8_t internal_event_port;
200 	/* Set if the adapter is processing rx queues for
201 	 * this eth device and packet processing has been
202 	 * started, allows for the code to know if the PMD
203 	 * rx_adapter_stop callback needs to be invoked
204 	 */
205 	uint8_t dev_rx_started;
206 	/* Number of queues added for this device */
207 	uint16_t nb_dev_queues;
208 	/* Number of poll based queues
209 	 * If nb_rx_poll > 0, the start callback will
210 	 * be invoked if not already invoked
211 	 */
212 	uint16_t nb_rx_poll;
213 	/* Number of interrupt based queues
214 	 * If nb_rx_intr > 0, the start callback will
215 	 * be invoked if not already invoked.
216 	 */
217 	uint16_t nb_rx_intr;
218 	/* Number of queues that use the shared interrupt */
219 	uint16_t nb_shared_intr;
220 	/* sum(wrr(q)) for all queues within the device
221 	 * useful when deleting all device queues
222 	 */
223 	uint32_t wrr_len;
224 	/* Intr based queue index to start polling from, this is used
225 	 * if the number of shared interrupts is non-zero
226 	 */
227 	uint16_t next_q_idx;
228 	/* Intr based queue indices */
229 	uint16_t *intr_queue;
230 	/* device generates per Rx queue interrupt for queue index
231 	 * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
232 	 */
233 	int multi_intr_cap;
234 	/* shared interrupt enabled */
235 	int shared_intr_enabled;
236 };
237 
238 /* Per Rx queue */
239 struct eth_rx_queue_info {
240 	int queue_enabled;	/* True if added */
241 	int intr_enabled;
242 	uint8_t ena_vector;
243 	uint16_t wt;		/* Polling weight */
244 	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
245 	uint64_t event;
246 	struct eth_rx_vector_data vector_data;
247 	struct eth_event_enqueue_buffer *event_buf;
248 	/* use adapter stats struct for queue level stats,
249 	 * as same stats need to be updated for adapter and queue
250 	 */
251 	struct rte_event_eth_rx_adapter_stats *stats;
252 };
253 
/* Array of adapter instance pointers */
static struct event_eth_rx_adapter **event_eth_rx_adapter;

/* Flag and offset of the dynamic timestamp field in the mbuf; the offset
 * is -1 until it is assigned
 */
static uint64_t event_eth_rx_timestamp_dynflag;
static int event_eth_rx_timestamp_dynfield_offset = -1;
259 
260 static inline rte_mbuf_timestamp_t *
261 rxa_timestamp_dynfield(struct rte_mbuf *mbuf)
262 {
263 	return RTE_MBUF_DYNFIELD(mbuf,
264 		event_eth_rx_timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
265 }
266 
267 static inline int
268 rxa_validate_id(uint8_t id)
269 {
270 	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
271 }
272 
273 static inline struct eth_event_enqueue_buffer *
274 rxa_event_buf_get(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
275 		  uint16_t rx_queue_id,
276 		  struct rte_event_eth_rx_adapter_stats **stats)
277 {
278 	if (rx_adapter->use_queue_event_buf) {
279 		struct eth_device_info *dev_info =
280 			&rx_adapter->eth_devices[eth_dev_id];
281 		*stats = dev_info->rx_queue[rx_queue_id].stats;
282 		return dev_info->rx_queue[rx_queue_id].event_buf;
283 	} else {
284 		*stats = &rx_adapter->stats;
285 		return &rx_adapter->event_enqueue_buffer;
286 	}
287 }
288 
/* Log an error and return retval from the enclosing function when id is
 * not a valid adapter id
 */
#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
	if (!rxa_validate_id(id)) { \
		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
		return retval; \
	} \
} while (0)

/* Same check as above, but on failure stores retval in the enclosing
 * function's "ret" variable and jumps to its "error" label
 */
#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(id, retval) do { \
	if (!rxa_validate_id(id)) { \
		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
		ret = retval; \
		goto error; \
	} \
} while (0)

/* Reject a token that is NULL, empty or does not start with a digit; on
 * failure stores retval in "ret" and jumps to the "error" label.
 *
 * The character is converted to unsigned char before it is passed to
 * isdigit(), whose behavior is undefined for negative values other than
 * EOF.
 */
#define RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, retval) do { \
	if ((token) == NULL || strlen(token) == 0 || \
	    !isdigit((unsigned char)*(token))) { \
		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter token\n"); \
		ret = retval; \
		goto error; \
	} \
} while (0)

/* Log an error, store retval in "ret" and jump to the "error" label when
 * port_id is not a valid ethdev port
 */
#define RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(port_id, retval) do { \
	if (!rte_eth_dev_is_valid_port(port_id)) { \
		RTE_ETHDEV_LOG(ERR, "Invalid port_id=%u\n", port_id); \
		ret = retval; \
		goto error; \
	} \
} while (0)
319 
320 static inline int
321 rxa_sw_adapter_queue_count(struct event_eth_rx_adapter *rx_adapter)
322 {
323 	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
324 }
325 
/* Greatest common divisor of a and b (Euclid's algorithm).
 *
 * Iterates instead of recursing and never divides by zero: when b is 0
 * the result is a, so rxa_gcd_u16(x, 0) == x and rxa_gcd_u16(0, 0) == 0.
 */
static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
{
	while (b != 0) {
		uint16_t r = a % b;

		a = b;
		b = r;
	}

	return a;
}
333 
334 /* Returns the next queue in the polling sequence
335  *
336  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
337  */
338 static int
339 rxa_wrr_next(struct event_eth_rx_adapter *rx_adapter, unsigned int n, int *cw,
340 	     struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
341 	     uint16_t gcd, int prev)
342 {
343 	int i = prev;
344 	uint16_t w;
345 
346 	while (1) {
347 		uint16_t q;
348 		uint16_t d;
349 
350 		i = (i + 1) % n;
351 		if (i == 0) {
352 			*cw = *cw - gcd;
353 			if (*cw <= 0)
354 				*cw = max_wt;
355 		}
356 
357 		q = eth_rx_poll[i].eth_rx_qid;
358 		d = eth_rx_poll[i].eth_dev_id;
359 		w = rx_adapter->eth_devices[d].rx_queue[q].wt;
360 
361 		if ((int)w >= *cw)
362 			return i;
363 	}
364 }
365 
366 static inline int
367 rxa_shared_intr(struct eth_device_info *dev_info,
368 	int rx_queue_id)
369 {
370 	int multi_intr_cap;
371 
372 	if (dev_info->dev->intr_handle == NULL)
373 		return 0;
374 
375 	multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
376 	return !multi_intr_cap ||
377 		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
378 }
379 
380 static inline int
381 rxa_intr_queue(struct eth_device_info *dev_info,
382 	int rx_queue_id)
383 {
384 	struct eth_rx_queue_info *queue_info;
385 
386 	queue_info = &dev_info->rx_queue[rx_queue_id];
387 	return dev_info->rx_queue &&
388 		!dev_info->internal_event_port &&
389 		queue_info->queue_enabled && queue_info->wt == 0;
390 }
391 
392 static inline int
393 rxa_polled_queue(struct eth_device_info *dev_info,
394 	int rx_queue_id)
395 {
396 	struct eth_rx_queue_info *queue_info;
397 
398 	queue_info = &dev_info->rx_queue[rx_queue_id];
399 	return !dev_info->internal_event_port &&
400 		dev_info->rx_queue &&
401 		queue_info->queue_enabled && queue_info->wt != 0;
402 }
403 
404 /* Calculate change in number of vectors after Rx queue ID is add/deleted */
405 static int
406 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
407 {
408 	uint16_t i;
409 	int n, s;
410 	uint16_t nbq;
411 
412 	nbq = dev_info->dev->data->nb_rx_queues;
413 	n = 0; /* non shared count */
414 	s = 0; /* shared count */
415 
416 	if (rx_queue_id == -1) {
417 		for (i = 0; i < nbq; i++) {
418 			if (!rxa_shared_intr(dev_info, i))
419 				n += add ? !rxa_intr_queue(dev_info, i) :
420 					rxa_intr_queue(dev_info, i);
421 			else
422 				s += add ? !rxa_intr_queue(dev_info, i) :
423 					rxa_intr_queue(dev_info, i);
424 		}
425 
426 		if (s > 0) {
427 			if ((add && dev_info->nb_shared_intr == 0) ||
428 				(!add && dev_info->nb_shared_intr))
429 				n += 1;
430 		}
431 	} else {
432 		if (!rxa_shared_intr(dev_info, rx_queue_id))
433 			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
434 				rxa_intr_queue(dev_info, rx_queue_id);
435 		else
436 			n = add ? !dev_info->nb_shared_intr :
437 				dev_info->nb_shared_intr == 1;
438 	}
439 
440 	return add ? n : -n;
441 }
442 
443 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
444  */
445 static void
446 rxa_calc_nb_post_intr_del(struct event_eth_rx_adapter *rx_adapter,
447 			  struct eth_device_info *dev_info, int rx_queue_id,
448 			  uint32_t *nb_rx_intr)
449 {
450 	uint32_t intr_diff;
451 
452 	if (rx_queue_id == -1)
453 		intr_diff = dev_info->nb_rx_intr;
454 	else
455 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
456 
457 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
458 }
459 
460 /* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
461  * interrupt queues could currently be poll mode Rx queues
462  */
463 static void
464 rxa_calc_nb_post_add_intr(struct event_eth_rx_adapter *rx_adapter,
465 			  struct eth_device_info *dev_info, int rx_queue_id,
466 			  uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
467 			  uint32_t *nb_wrr)
468 {
469 	uint32_t intr_diff;
470 	uint32_t poll_diff;
471 	uint32_t wrr_len_diff;
472 
473 	if (rx_queue_id == -1) {
474 		intr_diff = dev_info->dev->data->nb_rx_queues -
475 						dev_info->nb_rx_intr;
476 		poll_diff = dev_info->nb_rx_poll;
477 		wrr_len_diff = dev_info->wrr_len;
478 	} else {
479 		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
480 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
481 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
482 					0;
483 	}
484 
485 	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
486 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
487 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
488 }
489 
490 /* Calculate size of the eth_rx_poll and wrr_sched arrays
491  * after deleting poll mode rx queues
492  */
493 static void
494 rxa_calc_nb_post_poll_del(struct event_eth_rx_adapter *rx_adapter,
495 			  struct eth_device_info *dev_info, int rx_queue_id,
496 			  uint32_t *nb_rx_poll, uint32_t *nb_wrr)
497 {
498 	uint32_t poll_diff;
499 	uint32_t wrr_len_diff;
500 
501 	if (rx_queue_id == -1) {
502 		poll_diff = dev_info->nb_rx_poll;
503 		wrr_len_diff = dev_info->wrr_len;
504 	} else {
505 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
506 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
507 					0;
508 	}
509 
510 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
511 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
512 }
513 
514 /* Calculate nb_rx_* after adding poll mode rx queues
515  */
516 static void
517 rxa_calc_nb_post_add_poll(struct event_eth_rx_adapter *rx_adapter,
518 			  struct eth_device_info *dev_info, int rx_queue_id,
519 			  uint16_t wt, uint32_t *nb_rx_poll,
520 			  uint32_t *nb_rx_intr, uint32_t *nb_wrr)
521 {
522 	uint32_t intr_diff;
523 	uint32_t poll_diff;
524 	uint32_t wrr_len_diff;
525 
526 	if (rx_queue_id == -1) {
527 		intr_diff = dev_info->nb_rx_intr;
528 		poll_diff = dev_info->dev->data->nb_rx_queues -
529 						dev_info->nb_rx_poll;
530 		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
531 				- dev_info->wrr_len;
532 	} else {
533 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
534 		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
535 		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
536 				wt - dev_info->rx_queue[rx_queue_id].wt :
537 				wt;
538 	}
539 
540 	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
541 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
542 	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
543 }
544 
/* Compute the adapter's queue counts after adding rx_queue_id; a weight
 * of 0 adds the queue in interrupt mode, any other weight in poll mode
 */
static void
rxa_calc_nb_post_add(struct event_eth_rx_adapter *rx_adapter,
		     struct eth_device_info *dev_info, int rx_queue_id,
		     uint16_t wt, uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
		     uint32_t *nb_wrr)
{
	if (wt == 0) {
		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
					nb_rx_poll, nb_rx_intr, nb_wrr);
		return;
	}

	rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
				wt, nb_rx_poll, nb_rx_intr, nb_wrr);
}
559 
/* Compute the adapter's polled queue count, interrupt queue count and WRR
 * schedule length after deleting rx_queue_id
 */
static void
rxa_calc_nb_post_del(struct event_eth_rx_adapter *rx_adapter,
		     struct eth_device_info *dev_info, int rx_queue_id,
		     uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
		     uint32_t *nb_wrr)
{
	/* Poll side: fills in *nb_rx_poll and *nb_wrr */
	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id,
				  nb_rx_poll, nb_wrr);

	/* Interrupt side: fills in *nb_rx_intr */
	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
				  nb_rx_intr);
}
572 
573 /*
574  * Allocate the rx_poll array
575  */
576 static struct eth_rx_poll_entry *
577 rxa_alloc_poll(struct event_eth_rx_adapter *rx_adapter, uint32_t num_rx_polled)
578 {
579 	size_t len;
580 
581 	len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
582 							RTE_CACHE_LINE_SIZE);
583 	return  rte_zmalloc_socket(rx_adapter->mem_name,
584 				len,
585 				RTE_CACHE_LINE_SIZE,
586 				rx_adapter->socket_id);
587 }
588 
589 /*
590  * Allocate the WRR array
591  */
592 static uint32_t *
593 rxa_alloc_wrr(struct event_eth_rx_adapter *rx_adapter, int nb_wrr)
594 {
595 	size_t len;
596 
597 	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
598 			RTE_CACHE_LINE_SIZE);
599 	return  rte_zmalloc_socket(rx_adapter->mem_name,
600 				len,
601 				RTE_CACHE_LINE_SIZE,
602 				rx_adapter->socket_id);
603 }
604 
/* Allocate the poll array and the WRR schedule array as a pair.
 *
 * When nb_poll is 0 nothing is allocated, both outputs are set to NULL
 * and 0 is returned. On allocation failure -ENOMEM is returned and both
 * outputs are NULL, so that a caller which frees its outputs on the error
 * path cannot free the poll array a second time.
 */
static int
rxa_alloc_poll_arrays(struct event_eth_rx_adapter *rx_adapter, uint32_t nb_poll,
		      uint32_t nb_wrr, struct eth_rx_poll_entry **rx_poll,
		      uint32_t **wrr_sched)
{

	if (nb_poll == 0) {
		*rx_poll = NULL;
		*wrr_sched = NULL;
		return 0;
	}

	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
	if (*rx_poll == NULL) {
		*wrr_sched = NULL;
		return -ENOMEM;
	}

	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
	if (*wrr_sched == NULL) {
		rte_free(*rx_poll);
		/* Do not hand a dangling pointer back to the caller */
		*rx_poll = NULL;
		return -ENOMEM;
	}
	return 0;
}
630 
631 /* Precalculate WRR polling sequence for all queues in rx_adapter */
632 static void
633 rxa_calc_wrr_sequence(struct event_eth_rx_adapter *rx_adapter,
634 		      struct eth_rx_poll_entry *rx_poll, uint32_t *rx_wrr)
635 {
636 	uint16_t d;
637 	uint16_t q;
638 	unsigned int i;
639 	int prev = -1;
640 	int cw = -1;
641 
642 	/* Initialize variables for calculation of wrr schedule */
643 	uint16_t max_wrr_pos = 0;
644 	unsigned int poll_q = 0;
645 	uint16_t max_wt = 0;
646 	uint16_t gcd = 0;
647 
648 	if (rx_poll == NULL)
649 		return;
650 
651 	/* Generate array of all queues to poll, the size of this
652 	 * array is poll_q
653 	 */
654 	RTE_ETH_FOREACH_DEV(d) {
655 		uint16_t nb_rx_queues;
656 		struct eth_device_info *dev_info =
657 				&rx_adapter->eth_devices[d];
658 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
659 		if (dev_info->rx_queue == NULL)
660 			continue;
661 		if (dev_info->internal_event_port)
662 			continue;
663 		dev_info->wrr_len = 0;
664 		for (q = 0; q < nb_rx_queues; q++) {
665 			struct eth_rx_queue_info *queue_info =
666 				&dev_info->rx_queue[q];
667 			uint16_t wt;
668 
669 			if (!rxa_polled_queue(dev_info, q))
670 				continue;
671 			wt = queue_info->wt;
672 			rx_poll[poll_q].eth_dev_id = d;
673 			rx_poll[poll_q].eth_rx_qid = q;
674 			max_wrr_pos += wt;
675 			dev_info->wrr_len += wt;
676 			max_wt = RTE_MAX(max_wt, wt);
677 			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
678 			poll_q++;
679 		}
680 	}
681 
682 	/* Generate polling sequence based on weights */
683 	prev = -1;
684 	cw = -1;
685 	for (i = 0; i < max_wrr_pos; i++) {
686 		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
687 				     rx_poll, max_wt, gcd, prev);
688 		prev = rx_wrr[i];
689 	}
690 }
691 
692 static inline void
693 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
694 	struct rte_ipv6_hdr **ipv6_hdr)
695 {
696 	struct rte_ether_hdr *eth_hdr =
697 		rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
698 	struct rte_vlan_hdr *vlan_hdr;
699 
700 	*ipv4_hdr = NULL;
701 	*ipv6_hdr = NULL;
702 
703 	switch (eth_hdr->ether_type) {
704 	case RTE_BE16(RTE_ETHER_TYPE_IPV4):
705 		*ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
706 		break;
707 
708 	case RTE_BE16(RTE_ETHER_TYPE_IPV6):
709 		*ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
710 		break;
711 
712 	case RTE_BE16(RTE_ETHER_TYPE_VLAN):
713 		vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
714 		switch (vlan_hdr->eth_proto) {
715 		case RTE_BE16(RTE_ETHER_TYPE_IPV4):
716 			*ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
717 			break;
718 		case RTE_BE16(RTE_ETHER_TYPE_IPV6):
719 			*ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
720 			break;
721 		default:
722 			break;
723 		}
724 		break;
725 
726 	default:
727 		break;
728 	}
729 }
730 
731 /* Calculate RSS hash for IPv4/6 */
732 static inline uint32_t
733 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
734 {
735 	uint32_t input_len;
736 	void *tuple;
737 	struct rte_ipv4_tuple ipv4_tuple;
738 	struct rte_ipv6_tuple ipv6_tuple;
739 	struct rte_ipv4_hdr *ipv4_hdr;
740 	struct rte_ipv6_hdr *ipv6_hdr;
741 
742 	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
743 
744 	if (ipv4_hdr) {
745 		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
746 		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
747 		tuple = &ipv4_tuple;
748 		input_len = RTE_THASH_V4_L3_LEN;
749 	} else if (ipv6_hdr) {
750 		rte_thash_load_v6_addrs(ipv6_hdr,
751 					(union rte_thash_tuple *)&ipv6_tuple);
752 		tuple = &ipv6_tuple;
753 		input_len = RTE_THASH_V6_L3_LEN;
754 	} else
755 		return 0;
756 
757 	return rte_softrss_be(tuple, input_len, rss_key_be);
758 }
759 
760 static inline int
761 rxa_enq_blocked(struct event_eth_rx_adapter *rx_adapter)
762 {
763 	return !!rx_adapter->enq_block_count;
764 }
765 
766 static inline void
767 rxa_enq_block_start_ts(struct event_eth_rx_adapter *rx_adapter)
768 {
769 	if (rx_adapter->rx_enq_block_start_ts)
770 		return;
771 
772 	rx_adapter->enq_block_count++;
773 	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
774 		return;
775 
776 	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
777 }
778 
779 static inline void
780 rxa_enq_block_end_ts(struct event_eth_rx_adapter *rx_adapter,
781 		     struct rte_event_eth_rx_adapter_stats *stats)
782 {
783 	if (unlikely(!stats->rx_enq_start_ts))
784 		stats->rx_enq_start_ts = rte_get_tsc_cycles();
785 
786 	if (likely(!rxa_enq_blocked(rx_adapter)))
787 		return;
788 
789 	rx_adapter->enq_block_count = 0;
790 	if (rx_adapter->rx_enq_block_start_ts) {
791 		stats->rx_enq_end_ts = rte_get_tsc_cycles();
792 		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
793 		    rx_adapter->rx_enq_block_start_ts;
794 		rx_adapter->rx_enq_block_start_ts = 0;
795 	}
796 }
797 
798 /* Enqueue buffered events to event device */
799 static inline uint16_t
800 rxa_flush_event_buffer(struct event_eth_rx_adapter *rx_adapter,
801 		       struct eth_event_enqueue_buffer *buf,
802 		       struct rte_event_eth_rx_adapter_stats *stats)
803 {
804 	uint16_t count = buf->count;
805 	uint16_t n = 0;
806 
807 	if (!count)
808 		return 0;
809 
810 	if (buf->last)
811 		count = buf->last - buf->head;
812 
813 	if (count) {
814 		n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
815 						rx_adapter->event_port_id,
816 						&buf->events[buf->head],
817 						count);
818 		if (n != count)
819 			stats->rx_enq_retry++;
820 
821 		buf->head += n;
822 	}
823 
824 	if (buf->last && n == count) {
825 		uint16_t n1;
826 
827 		n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
828 					rx_adapter->event_port_id,
829 					&buf->events[0],
830 					buf->tail);
831 
832 		if (n1 != buf->tail)
833 			stats->rx_enq_retry++;
834 
835 		buf->last = 0;
836 		buf->head = n1;
837 		buf->last_mask = 0;
838 		n += n1;
839 	}
840 
841 	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
842 		rxa_enq_block_start_ts(rx_adapter);
843 
844 	buf->count -= n;
845 	stats->rx_enq_count += n;
846 
847 	return n;
848 }
849 
850 static inline void
851 rxa_init_vector(struct event_eth_rx_adapter *rx_adapter,
852 		struct eth_rx_vector_data *vec)
853 {
854 	vec->vector_ev->nb_elem = 0;
855 	vec->vector_ev->port = vec->port;
856 	vec->vector_ev->queue = vec->queue;
857 	vec->vector_ev->attr_valid = true;
858 	TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
859 }
860 
861 static inline uint16_t
862 rxa_create_event_vector(struct event_eth_rx_adapter *rx_adapter,
863 			struct eth_rx_queue_info *queue_info,
864 			struct eth_event_enqueue_buffer *buf,
865 			struct rte_mbuf **mbufs, uint16_t num)
866 {
867 	struct rte_event *ev = &buf->events[buf->count];
868 	struct eth_rx_vector_data *vec;
869 	uint16_t filled, space, sz;
870 
871 	filled = 0;
872 	vec = &queue_info->vector_data;
873 
874 	if (vec->vector_ev == NULL) {
875 		if (rte_mempool_get(vec->vector_pool,
876 				    (void **)&vec->vector_ev) < 0) {
877 			rte_pktmbuf_free_bulk(mbufs, num);
878 			return 0;
879 		}
880 		rxa_init_vector(rx_adapter, vec);
881 	}
882 	while (num) {
883 		if (vec->vector_ev->nb_elem == vec->max_vector_count) {
884 			/* Event ready. */
885 			ev->event = vec->event;
886 			ev->vec = vec->vector_ev;
887 			ev++;
888 			filled++;
889 			vec->vector_ev = NULL;
890 			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
891 			if (rte_mempool_get(vec->vector_pool,
892 					    (void **)&vec->vector_ev) < 0) {
893 				rte_pktmbuf_free_bulk(mbufs, num);
894 				return 0;
895 			}
896 			rxa_init_vector(rx_adapter, vec);
897 		}
898 
899 		space = vec->max_vector_count - vec->vector_ev->nb_elem;
900 		sz = num > space ? space : num;
901 		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
902 		       sizeof(void *) * sz);
903 		vec->vector_ev->nb_elem += sz;
904 		num -= sz;
905 		mbufs += sz;
906 		vec->ts = rte_rdtsc();
907 	}
908 
909 	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
910 		ev->event = vec->event;
911 		ev->vec = vec->vector_ev;
912 		ev++;
913 		filled++;
914 		vec->vector_ev = NULL;
915 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
916 	}
917 
918 	return filled;
919 }
920 
921 static inline void
922 rxa_buffer_mbufs(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
923 		 uint16_t rx_queue_id, struct rte_mbuf **mbufs, uint16_t num,
924 		 struct eth_event_enqueue_buffer *buf,
925 		 struct rte_event_eth_rx_adapter_stats *stats)
926 {
927 	uint32_t i;
928 	struct eth_device_info *dev_info =
929 					&rx_adapter->eth_devices[eth_dev_id];
930 	struct eth_rx_queue_info *eth_rx_queue_info =
931 					&dev_info->rx_queue[rx_queue_id];
932 	uint16_t new_tail = buf->tail;
933 	uint64_t event = eth_rx_queue_info->event;
934 	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
935 	struct rte_mbuf *m = mbufs[0];
936 	uint32_t rss_mask;
937 	uint32_t rss;
938 	int do_rss;
939 	uint16_t nb_cb;
940 	uint16_t dropped;
941 	uint64_t ts, ts_mask;
942 
943 	if (!eth_rx_queue_info->ena_vector) {
944 		ts = m->ol_flags & event_eth_rx_timestamp_dynflag ?
945 						0 : rte_get_tsc_cycles();
946 
947 		/* 0xffff ffff ffff ffff if RTE_MBUF_F_RX_TIMESTAMP is set,
948 		 * otherwise 0
949 		 */
950 		ts_mask = (uint64_t)(!(m->ol_flags &
951 				       event_eth_rx_timestamp_dynflag)) - 1ULL;
952 
953 		/* 0xffff ffff if RTE_MBUF_F_RX_RSS_HASH is set, otherwise 0 */
954 		rss_mask = ~(((m->ol_flags & RTE_MBUF_F_RX_RSS_HASH) != 0) - 1);
955 		do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
956 		for (i = 0; i < num; i++) {
957 			struct rte_event *ev;
958 
959 			m = mbufs[i];
960 			*rxa_timestamp_dynfield(m) = ts |
961 					(*rxa_timestamp_dynfield(m) & ts_mask);
962 
963 			ev = &buf->events[new_tail];
964 
965 			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
966 				     : m->hash.rss;
967 			ev->event = event;
968 			ev->flow_id = (rss & ~flow_id_mask) |
969 				      (ev->flow_id & flow_id_mask);
970 			ev->mbuf = m;
971 			new_tail++;
972 		}
973 	} else {
974 		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
975 					      buf, mbufs, num);
976 	}
977 
978 	if (num && dev_info->cb_fn) {
979 
980 		dropped = 0;
981 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
982 				       buf->last |
983 				       (buf->events_size & ~buf->last_mask),
984 				       buf->count >= BATCH_SIZE ?
985 						buf->count - BATCH_SIZE : 0,
986 				       &buf->events[buf->tail],
987 				       num,
988 				       dev_info->cb_arg,
989 				       &dropped);
990 		if (unlikely(nb_cb > num))
991 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
992 				nb_cb, num);
993 		else
994 			num = nb_cb;
995 		if (dropped)
996 			stats->rx_dropped += dropped;
997 	}
998 
999 	buf->count += num;
1000 	buf->tail += num;
1001 }
1002 
1003 static inline bool
1004 rxa_pkt_buf_available(struct eth_event_enqueue_buffer *buf)
1005 {
1006 	uint32_t nb_req = buf->tail + BATCH_SIZE;
1007 
1008 	if (!buf->last) {
1009 		if (nb_req <= buf->events_size)
1010 			return true;
1011 
1012 		if (buf->head >= BATCH_SIZE) {
1013 			buf->last_mask = ~0;
1014 			buf->last = buf->tail;
1015 			buf->tail = 0;
1016 			return true;
1017 		}
1018 	}
1019 
1020 	return nb_req <= buf->head;
1021 }
1022 
1023 /* Enqueue packets from  <port, q>  to event buffer */
1024 static inline uint32_t
1025 rxa_eth_rx(struct event_eth_rx_adapter *rx_adapter, uint16_t port_id,
1026 	   uint16_t queue_id, uint32_t rx_count, uint32_t max_rx,
1027 	   int *rxq_empty, struct eth_event_enqueue_buffer *buf,
1028 	   struct rte_event_eth_rx_adapter_stats *stats)
1029 {
1030 	struct rte_mbuf *mbufs[BATCH_SIZE];
1031 	uint16_t n;
1032 	uint32_t nb_rx = 0;
1033 	uint32_t nb_flushed = 0;
1034 
1035 	if (rxq_empty)
1036 		*rxq_empty = 0;
1037 	/* Don't do a batch dequeue from the rx queue if there isn't
1038 	 * enough space in the enqueue buffer.
1039 	 */
1040 	while (rxa_pkt_buf_available(buf)) {
1041 		if (buf->count >= BATCH_SIZE)
1042 			nb_flushed +=
1043 				rxa_flush_event_buffer(rx_adapter, buf, stats);
1044 
1045 		stats->rx_poll_count++;
1046 		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
1047 		if (unlikely(!n)) {
1048 			if (rxq_empty)
1049 				*rxq_empty = 1;
1050 			break;
1051 		}
1052 		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n, buf,
1053 				 stats);
1054 		nb_rx += n;
1055 		if (rx_count + nb_rx > max_rx)
1056 			break;
1057 	}
1058 
1059 	if (buf->count > 0)
1060 		nb_flushed += rxa_flush_event_buffer(rx_adapter, buf, stats);
1061 
1062 	stats->rx_packets += nb_rx;
1063 	if (nb_flushed == 0)
1064 		rte_event_maintain(rx_adapter->eventdev_id,
1065 				   rx_adapter->event_port_id, 0);
1066 
1067 	return nb_rx;
1068 }
1069 
1070 static inline void
1071 rxa_intr_ring_enqueue(struct event_eth_rx_adapter *rx_adapter, void *data)
1072 {
1073 	uint16_t port_id;
1074 	uint16_t queue;
1075 	int err;
1076 	union queue_data qd;
1077 	struct eth_device_info *dev_info;
1078 	struct eth_rx_queue_info *queue_info;
1079 	int *intr_enabled;
1080 
1081 	qd.ptr = data;
1082 	port_id = qd.port;
1083 	queue = qd.queue;
1084 
1085 	dev_info = &rx_adapter->eth_devices[port_id];
1086 	queue_info = &dev_info->rx_queue[queue];
1087 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1088 	if (rxa_shared_intr(dev_info, queue))
1089 		intr_enabled = &dev_info->shared_intr_enabled;
1090 	else
1091 		intr_enabled = &queue_info->intr_enabled;
1092 
1093 	if (*intr_enabled) {
1094 		*intr_enabled = 0;
1095 		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
1096 		/* Entry should always be available.
1097 		 * The ring size equals the maximum number of interrupt
1098 		 * vectors supported (an interrupt vector is shared in
1099 		 * case of shared interrupts)
1100 		 */
1101 		if (err)
1102 			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
1103 				" to ring: %s", strerror(-err));
1104 		else
1105 			rte_eth_dev_rx_intr_disable(port_id, queue);
1106 	}
1107 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1108 }
1109 
1110 static int
1111 rxa_intr_ring_check_avail(struct event_eth_rx_adapter *rx_adapter,
1112 			  uint32_t num_intr_vec)
1113 {
1114 	if (rx_adapter->num_intr_vec + num_intr_vec >
1115 				RTE_EVENT_ETH_INTR_RING_SIZE) {
1116 		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1117 		" %d needed %d limit %d", rx_adapter->num_intr_vec,
1118 		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1119 		return -ENOSPC;
1120 	}
1121 
1122 	return 0;
1123 }
1124 
1125 /* Delete entries for (dev, queue) from the interrupt ring */
1126 static void
1127 rxa_intr_ring_del_entries(struct event_eth_rx_adapter *rx_adapter,
1128 			  struct eth_device_info *dev_info,
1129 			  uint16_t rx_queue_id)
1130 {
1131 	int i, n;
1132 	union queue_data qd;
1133 
1134 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1135 
1136 	n = rte_ring_count(rx_adapter->intr_ring);
1137 	for (i = 0; i < n; i++) {
1138 		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1139 		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1140 			if (qd.port == dev_info->dev->data->port_id &&
1141 				qd.queue == rx_queue_id)
1142 				continue;
1143 		} else {
1144 			if (qd.port == dev_info->dev->data->port_id)
1145 				continue;
1146 		}
1147 		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1148 	}
1149 
1150 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1151 }
1152 
1153 /* pthread callback handling interrupt mode receive queues
1154  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1155  * interrupting queue to the adapter's ring buffer for interrupt events.
1156  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1157  * the adapter service function.
1158  */
1159 static void *
1160 rxa_intr_thread(void *arg)
1161 {
1162 	struct event_eth_rx_adapter *rx_adapter = arg;
1163 	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1164 	int n, i;
1165 
1166 	while (1) {
1167 		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1168 				RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1169 		if (unlikely(n < 0))
1170 			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1171 					n);
1172 		for (i = 0; i < n; i++) {
1173 			rxa_intr_ring_enqueue(rx_adapter,
1174 					epoll_events[i].epdata.data);
1175 		}
1176 	}
1177 
1178 	return NULL;
1179 }
1180 
/* Dequeue <port, q> from interrupt ring and enqueue received
 * mbufs to eventdev
 *
 * Runs from the adapter service function. It returns immediately when the
 * adapter has no interrupt mode queues, or when the ring is empty and no
 * previously dequeued <port, q> is still pending (qd_valid == 0).
 *
 * rx_adapter->qd / qd_valid act as resume state: qd_valid is set when an
 * entry is taken off the ring and stays set while the corresponding queue
 * (or group of queues behind a shared interrupt) has not been drained, so
 * the next invocation continues with the same entry instead of reading the
 * ring again.
 *
 * The number of packets received here is added to stats.rx_intr_packets.
 */
static inline void
rxa_intr_ring_dequeue(struct event_eth_rx_adapter *rx_adapter)
{
	uint32_t n;
	uint32_t nb_rx = 0;
	int rxq_empty;
	struct eth_event_enqueue_buffer *buf;
	struct rte_event_eth_rx_adapter_stats *stats;
	rte_spinlock_t *ring_lock;
	uint8_t max_done = 0;

	if (rx_adapter->num_rx_intr == 0)
		return;

	if (rte_ring_count(rx_adapter->intr_ring) == 0
		&& !rx_adapter->qd_valid)
		return;

	/* Interrupt queues always use the adapter-level buffer and stats */
	buf = &rx_adapter->event_enqueue_buffer;
	stats = &rx_adapter->stats;
	ring_lock = &rx_adapter->intr_ring_lock;

	if (buf->count >= BATCH_SIZE)
		rxa_flush_event_buffer(rx_adapter, buf, stats);

	/* Keep going only while a full burst can still be buffered */
	while (rxa_pkt_buf_available(buf)) {
		struct eth_device_info *dev_info;
		uint16_t port;
		uint16_t queue;
		union queue_data qd  = rx_adapter->qd;
		int err;

		if (!rx_adapter->qd_valid) {
			struct eth_rx_queue_info *queue_info;

			/* No pending entry: fetch the next one from the ring.
			 * The ring lock also covers the "interrupt enabled"
			 * flags and re-arming the Rx interrupt, the same lock
			 * rxa_intr_ring_enqueue() takes.
			 */
			rte_spinlock_lock(ring_lock);
			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
			if (err) {
				rte_spinlock_unlock(ring_lock);
				break;
			}

			port = qd.port;
			queue = qd.queue;
			rx_adapter->qd = qd;
			rx_adapter->qd_valid = 1;
			dev_info = &rx_adapter->eth_devices[port];
			if (rxa_shared_intr(dev_info, queue))
				dev_info->shared_intr_enabled = 1;
			else {
				queue_info = &dev_info->rx_queue[queue];
				queue_info->intr_enabled = 1;
			}
			rte_eth_dev_rx_intr_enable(port, queue);
			rte_spinlock_unlock(ring_lock);
		} else {
			/* Resume the entry saved by an earlier pass */
			port = qd.port;
			queue = qd.queue;

			dev_info = &rx_adapter->eth_devices[port];
		}

		if (rxa_shared_intr(dev_info, queue)) {
			uint16_t i;
			uint16_t nb_queues;

			/* Shared interrupt: the interrupting queue is not
			 * known, so service every interrupt mode queue of the
			 * device starting at next_q_idx.
			 */
			nb_queues = dev_info->dev->data->nb_rx_queues;
			n = 0;
			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
				uint8_t enq_buffer_full;

				if (!rxa_intr_queue(dev_info, i))
					continue;
				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
					rx_adapter->max_nb_rx,
					&rxq_empty, buf, stats);
				nb_rx += n;

				/* Nothing received although the queue did not
				 * report empty: rxa_eth_rx() had no buffer
				 * space to receive into.
				 */
				enq_buffer_full = !rxq_empty && n == 0;
				max_done = nb_rx > rx_adapter->max_nb_rx;

				if (enq_buffer_full || max_done) {
					/* Remember where to resume; qd_valid
					 * stays set.
					 */
					dev_info->next_q_idx = i;
					goto done;
				}
			}

			rx_adapter->qd_valid = 0;

			/* Reinitialize for next interrupt */
			dev_info->next_q_idx = dev_info->multi_intr_cap ?
						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
						0;
		} else {
			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
				rx_adapter->max_nb_rx,
				&rxq_empty, buf, stats);
			/* Keep the entry pending unless the queue was drained */
			rx_adapter->qd_valid = !rxq_empty;
			nb_rx += n;
			if (nb_rx > rx_adapter->max_nb_rx)
				break;
		}
	}

done:
	rx_adapter->stats.rx_intr_packets += nb_rx;
}
1291 
/*
 * Polls receive queues added to the event adapter and enqueues received
 * packets to the event device.
 *
 * The receive code enqueues initially to a temporary buffer, the
 * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
 *
 * If there isn't space available in the temporary buffer, packets from the
 * Rx queue aren't dequeued from the eth device, this back pressures the
 * eth device, in virtual device environments this back pressure is relayed to
 * the hypervisor's switching layer where adjustments can be made to deal with
 * it.
 *
 * One call walks at most wrr_len entries of the weighted round robin
 * schedule, starting at the saved rx_adapter->wrr_pos, and stops early once
 * more than max_nb_rx packets have been received.
 */
static inline void
rxa_poll(struct event_eth_rx_adapter *rx_adapter)
{
	uint32_t num_queue;
	uint32_t nb_rx = 0;
	struct eth_event_enqueue_buffer *buf = NULL;
	struct rte_event_eth_rx_adapter_stats *stats = NULL;
	uint32_t wrr_pos;
	uint32_t max_nb_rx;

	wrr_pos = rx_adapter->wrr_pos;
	max_nb_rx = rx_adapter->max_nb_rx;

	/* Iterate through a WRR sequence */
	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
		/* wrr_sched[] holds indexes into the eth_rx_poll[] table */
		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;

		/* Buffer and stats to use for this <dev, queue> */
		buf = rxa_event_buf_get(rx_adapter, d, qid, &stats);

		/* Don't do a batch dequeue from the rx queue if there isn't
		 * enough space in the enqueue buffer.
		 */
		if (buf->count >= BATCH_SIZE)
			rxa_flush_event_buffer(rx_adapter, buf, stats);
		if (!rxa_pkt_buf_available(buf)) {
			/* With per-queue buffers only this queue is blocked,
			 * so move on to the next schedule entry. Otherwise
			 * save the position and give up for this invocation.
			 */
			if (rx_adapter->use_queue_event_buf)
				goto poll_next_entry;
			else {
				rx_adapter->wrr_pos = wrr_pos;
				return;
			}
		}

		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
				NULL, buf, stats);
		if (nb_rx > max_nb_rx) {
			/* Budget exceeded: resume after this entry next time */
			rx_adapter->wrr_pos =
				    (wrr_pos + 1) % rx_adapter->wrr_len;
			break;
		}

poll_next_entry:
		/* Advance the schedule cursor with wrap-around */
		if (++wrr_pos == rx_adapter->wrr_len)
			wrr_pos = 0;
	}
}
1353 
1354 static void
1355 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1356 {
1357 	struct event_eth_rx_adapter *rx_adapter = arg;
1358 	struct eth_event_enqueue_buffer *buf = NULL;
1359 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1360 	struct rte_event *ev;
1361 
1362 	buf = rxa_event_buf_get(rx_adapter, vec->port, vec->queue, &stats);
1363 
1364 	if (buf->count)
1365 		rxa_flush_event_buffer(rx_adapter, buf, stats);
1366 
1367 	if (vec->vector_ev->nb_elem == 0)
1368 		return;
1369 	ev = &buf->events[buf->count];
1370 
1371 	/* Event ready. */
1372 	ev->event = vec->event;
1373 	ev->vec = vec->vector_ev;
1374 	buf->count++;
1375 
1376 	vec->vector_ev = NULL;
1377 	vec->ts = 0;
1378 }
1379 
1380 static int
1381 rxa_service_func(void *args)
1382 {
1383 	struct event_eth_rx_adapter *rx_adapter = args;
1384 
1385 	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1386 		return 0;
1387 	if (!rx_adapter->rxa_started) {
1388 		rte_spinlock_unlock(&rx_adapter->rx_lock);
1389 		return 0;
1390 	}
1391 
1392 	if (rx_adapter->ena_vector) {
1393 		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1394 		    rx_adapter->vector_tmo_ticks) {
1395 			struct eth_rx_vector_data *vec;
1396 
1397 			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1398 				uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1399 
1400 				if (elapsed_time >= vec->vector_timeout_ticks) {
1401 					rxa_vector_expire(vec, rx_adapter);
1402 					TAILQ_REMOVE(&rx_adapter->vector_list,
1403 						     vec, next);
1404 				}
1405 			}
1406 			rx_adapter->prev_expiry_ts = rte_rdtsc();
1407 		}
1408 	}
1409 
1410 	rxa_intr_ring_dequeue(rx_adapter);
1411 	rxa_poll(rx_adapter);
1412 
1413 	rte_spinlock_unlock(&rx_adapter->rx_lock);
1414 
1415 	return 0;
1416 }
1417 
1418 static int
1419 rte_event_eth_rx_adapter_init(void)
1420 {
1421 	const char *name = RXA_ADAPTER_ARRAY;
1422 	const struct rte_memzone *mz;
1423 	unsigned int sz;
1424 
1425 	sz = sizeof(*event_eth_rx_adapter) *
1426 	    RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1427 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1428 
1429 	mz = rte_memzone_lookup(name);
1430 	if (mz == NULL) {
1431 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1432 						 RTE_CACHE_LINE_SIZE);
1433 		if (mz == NULL) {
1434 			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1435 					PRId32, rte_errno);
1436 			return -rte_errno;
1437 		}
1438 	}
1439 
1440 	event_eth_rx_adapter = mz->addr;
1441 	return 0;
1442 }
1443 
1444 static int
1445 rxa_memzone_lookup(void)
1446 {
1447 	const struct rte_memzone *mz;
1448 
1449 	if (event_eth_rx_adapter == NULL) {
1450 		mz = rte_memzone_lookup(RXA_ADAPTER_ARRAY);
1451 		if (mz == NULL)
1452 			return -ENOMEM;
1453 		event_eth_rx_adapter = mz->addr;
1454 	}
1455 
1456 	return 0;
1457 }
1458 
1459 static inline struct event_eth_rx_adapter *
1460 rxa_id_to_adapter(uint8_t id)
1461 {
1462 	return event_eth_rx_adapter ?
1463 		event_eth_rx_adapter[id] : NULL;
1464 }
1465 
1466 static int
1467 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1468 		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1469 {
1470 	int ret;
1471 	struct rte_eventdev *dev;
1472 	struct rte_event_dev_config dev_conf;
1473 	int started;
1474 	uint8_t port_id;
1475 	struct rte_event_port_conf *port_conf = arg;
1476 	struct event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1477 
1478 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
1479 	dev_conf = dev->data->dev_conf;
1480 
1481 	started = dev->data->dev_started;
1482 	if (started)
1483 		rte_event_dev_stop(dev_id);
1484 	port_id = dev_conf.nb_event_ports;
1485 	dev_conf.nb_event_ports += 1;
1486 	ret = rte_event_dev_configure(dev_id, &dev_conf);
1487 	if (ret) {
1488 		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1489 						dev_id);
1490 		if (started) {
1491 			if (rte_event_dev_start(dev_id))
1492 				return -EIO;
1493 		}
1494 		return ret;
1495 	}
1496 
1497 	ret = rte_event_port_setup(dev_id, port_id, port_conf);
1498 	if (ret) {
1499 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1500 					port_id);
1501 		return ret;
1502 	}
1503 
1504 	conf->event_port_id = port_id;
1505 	conf->max_nb_rx = 128;
1506 	if (started)
1507 		ret = rte_event_dev_start(dev_id);
1508 	rx_adapter->default_cb_arg = 1;
1509 	return ret;
1510 }
1511 
/*
 * Create an epoll instance for the Rx interrupt thread.
 *
 * Returns the new file descriptor (>= 0) on success or a negative errno
 * value on failure. Builds that do not define LINUX have no epoll support
 * here and always get -ENOTSUP, so every configuration returns a defined
 * value.
 *
 * NOTE(review): LINUX is not a compiler-predefined macro; confirm the build
 * system defines it, otherwise the epoll path is never compiled in.
 */
static int
rxa_epoll_create1(void)
{
#if defined(LINUX)
	int fd;

	fd = epoll_create1(EPOLL_CLOEXEC);
	return fd < 0 ? -errno : fd;
#else
	return -ENOTSUP;
#endif
}
1523 
1524 static int
1525 rxa_init_epd(struct event_eth_rx_adapter *rx_adapter)
1526 {
1527 	if (rx_adapter->epd != INIT_FD)
1528 		return 0;
1529 
1530 	rx_adapter->epd = rxa_epoll_create1();
1531 	if (rx_adapter->epd < 0) {
1532 		int err = rx_adapter->epd;
1533 		rx_adapter->epd = INIT_FD;
1534 		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1535 		return err;
1536 	}
1537 
1538 	return 0;
1539 }
1540 
1541 static int
1542 rxa_create_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1543 {
1544 	int err;
1545 	char thread_name[RTE_MAX_THREAD_NAME_LEN];
1546 
1547 	if (rx_adapter->intr_ring)
1548 		return 0;
1549 
1550 	rx_adapter->intr_ring = rte_ring_create("intr_ring",
1551 					RTE_EVENT_ETH_INTR_RING_SIZE,
1552 					rte_socket_id(), 0);
1553 	if (!rx_adapter->intr_ring)
1554 		return -ENOMEM;
1555 
1556 	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1557 					RTE_EVENT_ETH_INTR_RING_SIZE *
1558 					sizeof(struct rte_epoll_event),
1559 					RTE_CACHE_LINE_SIZE,
1560 					rx_adapter->socket_id);
1561 	if (!rx_adapter->epoll_events) {
1562 		err = -ENOMEM;
1563 		goto error;
1564 	}
1565 
1566 	rte_spinlock_init(&rx_adapter->intr_ring_lock);
1567 
1568 	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1569 			"rx-intr-thread-%d", rx_adapter->id);
1570 
1571 	err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1572 				NULL, rxa_intr_thread, rx_adapter);
1573 	if (!err)
1574 		return 0;
1575 
1576 	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1577 	rte_free(rx_adapter->epoll_events);
1578 error:
1579 	rte_ring_free(rx_adapter->intr_ring);
1580 	rx_adapter->intr_ring = NULL;
1581 	rx_adapter->epoll_events = NULL;
1582 	return err;
1583 }
1584 
1585 static int
1586 rxa_destroy_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1587 {
1588 	int err;
1589 
1590 	err = pthread_cancel(rx_adapter->rx_intr_thread);
1591 	if (err)
1592 		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1593 				err);
1594 
1595 	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1596 	if (err)
1597 		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1598 
1599 	rte_free(rx_adapter->epoll_events);
1600 	rte_ring_free(rx_adapter->intr_ring);
1601 	rx_adapter->intr_ring = NULL;
1602 	rx_adapter->epoll_events = NULL;
1603 	return 0;
1604 }
1605 
1606 static int
1607 rxa_free_intr_resources(struct event_eth_rx_adapter *rx_adapter)
1608 {
1609 	int ret;
1610 
1611 	if (rx_adapter->num_rx_intr == 0)
1612 		return 0;
1613 
1614 	ret = rxa_destroy_intr_thread(rx_adapter);
1615 	if (ret)
1616 		return ret;
1617 
1618 	close(rx_adapter->epd);
1619 	rx_adapter->epd = INIT_FD;
1620 
1621 	return ret;
1622 }
1623 
1624 static int
1625 rxa_disable_intr(struct event_eth_rx_adapter *rx_adapter,
1626 		 struct eth_device_info *dev_info, uint16_t rx_queue_id)
1627 {
1628 	int err;
1629 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1630 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1631 
1632 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1633 	if (err) {
1634 		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1635 			rx_queue_id);
1636 		return err;
1637 	}
1638 
1639 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1640 					rx_adapter->epd,
1641 					RTE_INTR_EVENT_DEL,
1642 					0);
1643 	if (err)
1644 		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1645 
1646 	if (sintr)
1647 		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1648 	else
1649 		dev_info->shared_intr_enabled = 0;
1650 	return err;
1651 }
1652 
1653 static int
1654 rxa_del_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1655 		   struct eth_device_info *dev_info, int rx_queue_id)
1656 {
1657 	int err;
1658 	int i;
1659 	int s;
1660 
1661 	if (dev_info->nb_rx_intr == 0)
1662 		return 0;
1663 
1664 	err = 0;
1665 	if (rx_queue_id == -1) {
1666 		s = dev_info->nb_shared_intr;
1667 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1668 			int sintr;
1669 			uint16_t q;
1670 
1671 			q = dev_info->intr_queue[i];
1672 			sintr = rxa_shared_intr(dev_info, q);
1673 			s -= sintr;
1674 
1675 			if (!sintr || s == 0) {
1676 
1677 				err = rxa_disable_intr(rx_adapter, dev_info,
1678 						q);
1679 				if (err)
1680 					return err;
1681 				rxa_intr_ring_del_entries(rx_adapter, dev_info,
1682 							q);
1683 			}
1684 		}
1685 	} else {
1686 		if (!rxa_intr_queue(dev_info, rx_queue_id))
1687 			return 0;
1688 		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1689 				dev_info->nb_shared_intr == 1) {
1690 			err = rxa_disable_intr(rx_adapter, dev_info,
1691 					rx_queue_id);
1692 			if (err)
1693 				return err;
1694 			rxa_intr_ring_del_entries(rx_adapter, dev_info,
1695 						rx_queue_id);
1696 		}
1697 
1698 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1699 			if (dev_info->intr_queue[i] == rx_queue_id) {
1700 				for (; i < dev_info->nb_rx_intr - 1; i++)
1701 					dev_info->intr_queue[i] =
1702 						dev_info->intr_queue[i + 1];
1703 				break;
1704 			}
1705 		}
1706 	}
1707 
1708 	return err;
1709 }
1710 
1711 static int
1712 rxa_config_intr(struct event_eth_rx_adapter *rx_adapter,
1713 		struct eth_device_info *dev_info, uint16_t rx_queue_id)
1714 {
1715 	int err, err1;
1716 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1717 	union queue_data qd;
1718 	int init_fd;
1719 	uint16_t *intr_queue;
1720 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1721 
1722 	if (rxa_intr_queue(dev_info, rx_queue_id))
1723 		return 0;
1724 
1725 	intr_queue = dev_info->intr_queue;
1726 	if (dev_info->intr_queue == NULL) {
1727 		size_t len =
1728 			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1729 		dev_info->intr_queue =
1730 			rte_zmalloc_socket(
1731 				rx_adapter->mem_name,
1732 				len,
1733 				0,
1734 				rx_adapter->socket_id);
1735 		if (dev_info->intr_queue == NULL)
1736 			return -ENOMEM;
1737 	}
1738 
1739 	init_fd = rx_adapter->epd;
1740 	err = rxa_init_epd(rx_adapter);
1741 	if (err)
1742 		goto err_free_queue;
1743 
1744 	qd.port = eth_dev_id;
1745 	qd.queue = rx_queue_id;
1746 
1747 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1748 					rx_adapter->epd,
1749 					RTE_INTR_EVENT_ADD,
1750 					qd.ptr);
1751 	if (err) {
1752 		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1753 			" Rx Queue %u err %d", rx_queue_id, err);
1754 		goto err_del_fd;
1755 	}
1756 
1757 	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1758 	if (err) {
1759 		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1760 				" Rx Queue %u err %d", rx_queue_id, err);
1761 
1762 		goto err_del_event;
1763 	}
1764 
1765 	err = rxa_create_intr_thread(rx_adapter);
1766 	if (!err)  {
1767 		if (sintr)
1768 			dev_info->shared_intr_enabled = 1;
1769 		else
1770 			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1771 		return 0;
1772 	}
1773 
1774 
1775 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1776 	if (err)
1777 		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1778 				" Rx Queue %u err %d", rx_queue_id, err);
1779 err_del_event:
1780 	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1781 					rx_adapter->epd,
1782 					RTE_INTR_EVENT_DEL,
1783 					0);
1784 	if (err1) {
1785 		RTE_EDEV_LOG_ERR("Could not delete event for"
1786 				" Rx Queue %u err %d", rx_queue_id, err1);
1787 	}
1788 err_del_fd:
1789 	if (init_fd == INIT_FD) {
1790 		close(rx_adapter->epd);
1791 		rx_adapter->epd = -1;
1792 	}
1793 err_free_queue:
1794 	if (intr_queue == NULL)
1795 		rte_free(dev_info->intr_queue);
1796 
1797 	return err;
1798 }
1799 
1800 static int
1801 rxa_add_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1802 		   struct eth_device_info *dev_info, int rx_queue_id)
1803 
1804 {
1805 	int i, j, err;
1806 	int si = -1;
1807 	int shared_done = (dev_info->nb_shared_intr > 0);
1808 
1809 	if (rx_queue_id != -1) {
1810 		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1811 			return 0;
1812 		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1813 	}
1814 
1815 	err = 0;
1816 	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1817 
1818 		if (rxa_shared_intr(dev_info, i) && shared_done)
1819 			continue;
1820 
1821 		err = rxa_config_intr(rx_adapter, dev_info, i);
1822 
1823 		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1824 		if (shared_done) {
1825 			si = i;
1826 			dev_info->shared_intr_enabled = 1;
1827 		}
1828 		if (err)
1829 			break;
1830 	}
1831 
1832 	if (err == 0)
1833 		return 0;
1834 
1835 	shared_done = (dev_info->nb_shared_intr > 0);
1836 	for (j = 0; j < i; j++) {
1837 		if (rxa_intr_queue(dev_info, j))
1838 			continue;
1839 		if (rxa_shared_intr(dev_info, j) && si != j)
1840 			continue;
1841 		err = rxa_disable_intr(rx_adapter, dev_info, j);
1842 		if (err)
1843 			break;
1844 
1845 	}
1846 
1847 	return err;
1848 }
1849 
1850 static int
1851 rxa_init_service(struct event_eth_rx_adapter *rx_adapter, uint8_t id)
1852 {
1853 	int ret;
1854 	struct rte_service_spec service;
1855 	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1856 
1857 	if (rx_adapter->service_inited)
1858 		return 0;
1859 
1860 	memset(&service, 0, sizeof(service));
1861 	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1862 		"rte_event_eth_rx_adapter_%d", id);
1863 	service.socket_id = rx_adapter->socket_id;
1864 	service.callback = rxa_service_func;
1865 	service.callback_userdata = rx_adapter;
1866 	/* Service function handles locking for queue add/del updates */
1867 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1868 	ret = rte_service_component_register(&service, &rx_adapter->service_id);
1869 	if (ret) {
1870 		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1871 			service.name, ret);
1872 		return ret;
1873 	}
1874 
1875 	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1876 		&rx_adapter_conf, rx_adapter->conf_arg);
1877 	if (ret) {
1878 		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1879 			ret);
1880 		goto err_done;
1881 	}
1882 	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1883 	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1884 	rx_adapter->service_inited = 1;
1885 	rx_adapter->epd = INIT_FD;
1886 	return 0;
1887 
1888 err_done:
1889 	rte_service_component_unregister(rx_adapter->service_id);
1890 	return ret;
1891 }
1892 
1893 static void
1894 rxa_update_queue(struct event_eth_rx_adapter *rx_adapter,
1895 		 struct eth_device_info *dev_info, int32_t rx_queue_id,
1896 		 uint8_t add)
1897 {
1898 	struct eth_rx_queue_info *queue_info;
1899 	int enabled;
1900 	uint16_t i;
1901 
1902 	if (dev_info->rx_queue == NULL)
1903 		return;
1904 
1905 	if (rx_queue_id == -1) {
1906 		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1907 			rxa_update_queue(rx_adapter, dev_info, i, add);
1908 	} else {
1909 		queue_info = &dev_info->rx_queue[rx_queue_id];
1910 		enabled = queue_info->queue_enabled;
1911 		if (add) {
1912 			rx_adapter->nb_queues += !enabled;
1913 			dev_info->nb_dev_queues += !enabled;
1914 		} else {
1915 			rx_adapter->nb_queues -= enabled;
1916 			dev_info->nb_dev_queues -= enabled;
1917 		}
1918 		queue_info->queue_enabled = !!add;
1919 	}
1920 }
1921 
1922 static void
1923 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1924 		    uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1925 		    uint16_t port_id)
1926 {
1927 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1928 	struct eth_rx_vector_data *vector_data;
1929 	uint32_t flow_id;
1930 
1931 	vector_data = &queue_info->vector_data;
1932 	vector_data->max_vector_count = vector_count;
1933 	vector_data->port = port_id;
1934 	vector_data->queue = qid;
1935 	vector_data->vector_pool = mp;
1936 	vector_data->vector_timeout_ticks =
1937 		NSEC2TICK(vector_ns, rte_get_timer_hz());
1938 	vector_data->ts = 0;
1939 	flow_id = queue_info->event & 0xFFFFF;
1940 	flow_id =
1941 		flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1942 	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1943 }
1944 
1945 static void
1946 rxa_sw_del(struct event_eth_rx_adapter *rx_adapter,
1947 	   struct eth_device_info *dev_info, int32_t rx_queue_id)
1948 {
1949 	struct eth_rx_vector_data *vec;
1950 	int pollq;
1951 	int intrq;
1952 	int sintrq;
1953 
1954 
1955 	if (rx_adapter->nb_queues == 0)
1956 		return;
1957 
1958 	if (rx_queue_id == -1) {
1959 		uint16_t nb_rx_queues;
1960 		uint16_t i;
1961 
1962 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1963 		for (i = 0; i <	nb_rx_queues; i++)
1964 			rxa_sw_del(rx_adapter, dev_info, i);
1965 		return;
1966 	}
1967 
1968 	/* Push all the partial event vectors to event device. */
1969 	TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1970 		if (vec->queue != rx_queue_id)
1971 			continue;
1972 		rxa_vector_expire(vec, rx_adapter);
1973 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
1974 	}
1975 
1976 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
1977 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
1978 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1979 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1980 	rx_adapter->num_rx_polled -= pollq;
1981 	dev_info->nb_rx_poll -= pollq;
1982 	rx_adapter->num_rx_intr -= intrq;
1983 	dev_info->nb_rx_intr -= intrq;
1984 	dev_info->nb_shared_intr -= intrq && sintrq;
1985 	if (rx_adapter->use_queue_event_buf) {
1986 		struct eth_event_enqueue_buffer *event_buf =
1987 			dev_info->rx_queue[rx_queue_id].event_buf;
1988 		struct rte_event_eth_rx_adapter_stats *stats =
1989 			dev_info->rx_queue[rx_queue_id].stats;
1990 		rte_free(event_buf->events);
1991 		rte_free(event_buf);
1992 		rte_free(stats);
1993 		dev_info->rx_queue[rx_queue_id].event_buf = NULL;
1994 		dev_info->rx_queue[rx_queue_id].stats = NULL;
1995 	}
1996 }
1997 
1998 static int
1999 rxa_add_queue(struct event_eth_rx_adapter *rx_adapter,
2000 	      struct eth_device_info *dev_info, int32_t rx_queue_id,
2001 	      const struct rte_event_eth_rx_adapter_queue_conf *conf)
2002 {
2003 	struct eth_rx_queue_info *queue_info;
2004 	const struct rte_event *ev = &conf->ev;
2005 	int pollq;
2006 	int intrq;
2007 	int sintrq;
2008 	struct rte_event *qi_ev;
2009 	struct eth_event_enqueue_buffer *new_rx_buf = NULL;
2010 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
2011 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
2012 	int ret;
2013 
2014 	if (rx_queue_id == -1) {
2015 		uint16_t nb_rx_queues;
2016 		uint16_t i;
2017 
2018 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2019 		for (i = 0; i <	nb_rx_queues; i++) {
2020 			ret = rxa_add_queue(rx_adapter, dev_info, i, conf);
2021 			if (ret)
2022 				return ret;
2023 		}
2024 		return 0;
2025 	}
2026 
2027 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
2028 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
2029 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
2030 
2031 	queue_info = &dev_info->rx_queue[rx_queue_id];
2032 	queue_info->wt = conf->servicing_weight;
2033 
2034 	qi_ev = (struct rte_event *)&queue_info->event;
2035 	qi_ev->event = ev->event;
2036 	qi_ev->op = RTE_EVENT_OP_NEW;
2037 	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
2038 	qi_ev->sub_event_type = 0;
2039 
2040 	if (conf->rx_queue_flags &
2041 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
2042 		queue_info->flow_id_mask = ~0;
2043 	} else
2044 		qi_ev->flow_id = 0;
2045 
2046 	if (conf->rx_queue_flags &
2047 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2048 		queue_info->ena_vector = 1;
2049 		qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
2050 		rxa_set_vector_data(queue_info, conf->vector_sz,
2051 				    conf->vector_timeout_ns, conf->vector_mp,
2052 				    rx_queue_id, dev_info->dev->data->port_id);
2053 		rx_adapter->ena_vector = 1;
2054 		rx_adapter->vector_tmo_ticks =
2055 			rx_adapter->vector_tmo_ticks ?
2056 				      RTE_MIN(queue_info->vector_data
2057 							.vector_timeout_ticks >>
2058 						1,
2059 					rx_adapter->vector_tmo_ticks) :
2060 				queue_info->vector_data.vector_timeout_ticks >>
2061 					1;
2062 	}
2063 
2064 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
2065 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
2066 		rx_adapter->num_rx_polled += !pollq;
2067 		dev_info->nb_rx_poll += !pollq;
2068 		rx_adapter->num_rx_intr -= intrq;
2069 		dev_info->nb_rx_intr -= intrq;
2070 		dev_info->nb_shared_intr -= intrq && sintrq;
2071 	}
2072 
2073 	if (rxa_intr_queue(dev_info, rx_queue_id)) {
2074 		rx_adapter->num_rx_polled -= pollq;
2075 		dev_info->nb_rx_poll -= pollq;
2076 		rx_adapter->num_rx_intr += !intrq;
2077 		dev_info->nb_rx_intr += !intrq;
2078 		dev_info->nb_shared_intr += !intrq && sintrq;
2079 		if (dev_info->nb_shared_intr == 1) {
2080 			if (dev_info->multi_intr_cap)
2081 				dev_info->next_q_idx =
2082 					RTE_MAX_RXTX_INTR_VEC_ID - 1;
2083 			else
2084 				dev_info->next_q_idx = 0;
2085 		}
2086 	}
2087 
2088 	if (!rx_adapter->use_queue_event_buf)
2089 		return 0;
2090 
2091 	new_rx_buf = rte_zmalloc_socket("rx_buffer_meta",
2092 				sizeof(*new_rx_buf), 0,
2093 				rte_eth_dev_socket_id(eth_dev_id));
2094 	if (new_rx_buf == NULL) {
2095 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer meta for "
2096 				 "dev_id: %d queue_id: %d",
2097 				 eth_dev_id, rx_queue_id);
2098 		return -ENOMEM;
2099 	}
2100 
2101 	new_rx_buf->events_size = RTE_ALIGN(conf->event_buf_size, BATCH_SIZE);
2102 	new_rx_buf->events_size += (2 * BATCH_SIZE);
2103 	new_rx_buf->events = rte_zmalloc_socket("rx_buffer",
2104 				sizeof(struct rte_event) *
2105 				new_rx_buf->events_size, 0,
2106 				rte_eth_dev_socket_id(eth_dev_id));
2107 	if (new_rx_buf->events == NULL) {
2108 		rte_free(new_rx_buf);
2109 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer for "
2110 				 "dev_id: %d queue_id: %d",
2111 				 eth_dev_id, rx_queue_id);
2112 		return -ENOMEM;
2113 	}
2114 
2115 	queue_info->event_buf = new_rx_buf;
2116 
2117 	/* Allocate storage for adapter queue stats */
2118 	stats = rte_zmalloc_socket("rx_queue_stats",
2119 				sizeof(*stats), 0,
2120 				rte_eth_dev_socket_id(eth_dev_id));
2121 	if (stats == NULL) {
2122 		rte_free(new_rx_buf->events);
2123 		rte_free(new_rx_buf);
2124 		RTE_EDEV_LOG_ERR("Failed to allocate stats storage for"
2125 				 " dev_id: %d queue_id: %d",
2126 				 eth_dev_id, rx_queue_id);
2127 		return -ENOMEM;
2128 	}
2129 
2130 	queue_info->stats = stats;
2131 
2132 	return 0;
2133 }
2134 
/*
 * Add an Rx queue (or all Rx queues of the port when rx_queue_id == -1) to
 * the adapter's software path.
 *
 * New poll and WRR arrays are allocated for the post-add queue counts, the
 * Rx interrupt configuration is created or removed as required by the
 * servicing weight, the queue is recorded through rxa_add_queue(), and only
 * then are the adapter's arrays replaced by the new ones.
 *
 * In the visible code the only caller is
 * rte_event_eth_rx_adapter_queue_add(), which holds rx_adapter->rx_lock
 * around this call.
 *
 * Returns 0 on success, -EINVAL when an interrupt mode queue is requested
 * on an adapter using per queue event buffers, -ENOMEM when the per device
 * queue array cannot be allocated, or the error returned by a helper.
 */
static int
rxa_sw_add(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
	   int rx_queue_id,
	   const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
	int ret;
	struct eth_rx_poll_entry *rx_poll;
	struct eth_rx_queue_info *rx_queue;
	uint32_t *rx_wrr;
	uint16_t nb_rx_queues;
	uint32_t nb_rx_poll, nb_wrr;
	uint32_t nb_rx_intr;
	int num_intr_vec;
	uint16_t wt;

	/*
	 * A servicing weight of zero requests interrupt mode; work on a local
	 * copy of the configuration so the caller's structure is not modified.
	 */
	if (queue_conf->servicing_weight == 0) {
		struct rte_eth_dev_data *data = dev_info->dev->data;

		temp_conf = *queue_conf;
		if (!data->dev_conf.intr_conf.rxq) {
			/* If Rx interrupts are disabled set wt = 1 */
			temp_conf.servicing_weight = 1;
		}
		queue_conf = &temp_conf;

		/* weight is still zero: this really is an interrupt queue */
		if (queue_conf->servicing_weight == 0 &&
		    rx_adapter->use_queue_event_buf) {

			RTE_EDEV_LOG_ERR("Use of queue level event buffer "
					 "not supported for interrupt queues "
					 "dev_id: %d queue_id: %d",
					 eth_dev_id, rx_queue_id);
			return -EINVAL;
		}
	}

	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
	/*
	 * Remember the queue array pointer as it was on entry: the error path
	 * frees dev_info->rx_queue only if it was allocated by this call.
	 */
	rx_queue = dev_info->rx_queue;
	wt = queue_conf->servicing_weight;

	if (dev_info->rx_queue == NULL) {
		dev_info->rx_queue =
		    rte_zmalloc_socket(rx_adapter->mem_name,
				       nb_rx_queues *
				       sizeof(struct eth_rx_queue_info), 0,
				       rx_adapter->socket_id);
		if (dev_info->rx_queue == NULL)
			return -ENOMEM;
	}
	rx_wrr = NULL;
	rx_poll = NULL;

	/* fills in the counts used to size the new arrays below */
	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
			queue_conf->servicing_weight,
			&nb_rx_poll, &nb_rx_intr, &nb_wrr);

	if (dev_info->dev->intr_handle)
		dev_info->multi_intr_cap =
			rte_intr_cap_multiple(dev_info->dev->intr_handle);

	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
				&rx_poll, &rx_wrr);
	if (ret)
		goto err_free_rxqueue;

	if (wt == 0) {
		/* interrupt mode: make sure the ring has room, then set up */
		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);

		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
		if (ret)
			goto err_free_rxqueue;

		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
		if (ret)
			goto err_free_rxqueue;
	} else {

		num_intr_vec = 0;
		if (rx_adapter->num_rx_intr > nb_rx_intr) {
			num_intr_vec = rxa_nb_intr_vect(dev_info,
						rx_queue_id, 0);
			/* interrupt based queues are being converted to
			 * poll mode queues, delete the interrupt configuration
			 * for those.
			 */
			ret = rxa_del_intr_queue(rx_adapter,
						dev_info, rx_queue_id);
			if (ret)
				goto err_free_rxqueue;
		}
	}

	/* no interrupt queues remain after this add */
	if (nb_rx_intr == 0) {
		ret = rxa_free_intr_resources(rx_adapter);
		if (ret)
			goto err_free_rxqueue;
	}

	if (wt == 0) {
		uint16_t i;

		/* record which queues of this device are in interrupt mode */
		if (rx_queue_id  == -1) {
			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
				dev_info->intr_queue[i] = i;
		} else {
			if (!rxa_intr_queue(dev_info, rx_queue_id))
				dev_info->intr_queue[nb_rx_intr - 1] =
					rx_queue_id;
		}
	}



	ret = rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
	if (ret)
		goto err_free_rxqueue;
	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);

	/* replace the previous poll/WRR arrays with the newly built ones */
	rte_free(rx_adapter->eth_rx_poll);
	rte_free(rx_adapter->wrr_sched);

	rx_adapter->eth_rx_poll = rx_poll;
	rx_adapter->wrr_sched = rx_wrr;
	rx_adapter->wrr_len = nb_wrr;
	rx_adapter->num_intr_vec += num_intr_vec;
	return 0;

err_free_rxqueue:
	/* release the queue array only if this call allocated it */
	if (rx_queue == NULL) {
		rte_free(dev_info->rx_queue);
		dev_info->rx_queue = NULL;
	}

	rte_free(rx_poll);
	rte_free(rx_wrr);

	return ret;
}
2275 
2276 static int
2277 rxa_ctrl(uint8_t id, int start)
2278 {
2279 	struct event_eth_rx_adapter *rx_adapter;
2280 	struct rte_eventdev *dev;
2281 	struct eth_device_info *dev_info;
2282 	uint32_t i;
2283 	int use_service = 0;
2284 	int stop = !start;
2285 
2286 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2287 	rx_adapter = rxa_id_to_adapter(id);
2288 	if (rx_adapter == NULL)
2289 		return -EINVAL;
2290 
2291 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2292 
2293 	RTE_ETH_FOREACH_DEV(i) {
2294 		dev_info = &rx_adapter->eth_devices[i];
2295 		/* if start  check for num dev queues */
2296 		if (start && !dev_info->nb_dev_queues)
2297 			continue;
2298 		/* if stop check if dev has been started */
2299 		if (stop && !dev_info->dev_rx_started)
2300 			continue;
2301 		use_service |= !dev_info->internal_event_port;
2302 		dev_info->dev_rx_started = start;
2303 		if (dev_info->internal_event_port == 0)
2304 			continue;
2305 		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2306 						&rte_eth_devices[i]) :
2307 			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
2308 						&rte_eth_devices[i]);
2309 	}
2310 
2311 	if (use_service) {
2312 		rte_spinlock_lock(&rx_adapter->rx_lock);
2313 		rx_adapter->rxa_started = start;
2314 		rte_service_runstate_set(rx_adapter->service_id, start);
2315 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2316 	}
2317 
2318 	return 0;
2319 }
2320 
2321 static int
2322 rxa_create(uint8_t id, uint8_t dev_id,
2323 	   struct rte_event_eth_rx_adapter_params *rxa_params,
2324 	   rte_event_eth_rx_adapter_conf_cb conf_cb,
2325 	   void *conf_arg)
2326 {
2327 	struct event_eth_rx_adapter *rx_adapter;
2328 	struct eth_event_enqueue_buffer *buf;
2329 	struct rte_event *events;
2330 	int ret;
2331 	int socket_id;
2332 	uint16_t i;
2333 	char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
2334 	const uint8_t default_rss_key[] = {
2335 		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2336 		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2337 		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2338 		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2339 		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2340 	};
2341 
2342 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2343 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2344 
2345 	if (conf_cb == NULL)
2346 		return -EINVAL;
2347 
2348 	if (event_eth_rx_adapter == NULL) {
2349 		ret = rte_event_eth_rx_adapter_init();
2350 		if (ret)
2351 			return ret;
2352 	}
2353 
2354 	rx_adapter = rxa_id_to_adapter(id);
2355 	if (rx_adapter != NULL) {
2356 		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2357 		return -EEXIST;
2358 	}
2359 
2360 	socket_id = rte_event_dev_socket_id(dev_id);
2361 	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2362 		"rte_event_eth_rx_adapter_%d",
2363 		id);
2364 
2365 	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2366 			RTE_CACHE_LINE_SIZE, socket_id);
2367 	if (rx_adapter == NULL) {
2368 		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2369 		return -ENOMEM;
2370 	}
2371 
2372 	rx_adapter->eventdev_id = dev_id;
2373 	rx_adapter->socket_id = socket_id;
2374 	rx_adapter->conf_cb = conf_cb;
2375 	rx_adapter->conf_arg = conf_arg;
2376 	rx_adapter->id = id;
2377 	TAILQ_INIT(&rx_adapter->vector_list);
2378 	strcpy(rx_adapter->mem_name, mem_name);
2379 	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2380 					RTE_MAX_ETHPORTS *
2381 					sizeof(struct eth_device_info), 0,
2382 					socket_id);
2383 	rte_convert_rss_key((const uint32_t *)default_rss_key,
2384 			(uint32_t *)rx_adapter->rss_key_be,
2385 			    RTE_DIM(default_rss_key));
2386 
2387 	if (rx_adapter->eth_devices == NULL) {
2388 		RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
2389 		rte_free(rx_adapter);
2390 		return -ENOMEM;
2391 	}
2392 
2393 	rte_spinlock_init(&rx_adapter->rx_lock);
2394 
2395 	for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2396 		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2397 
2398 	/* Rx adapter event buffer allocation */
2399 	rx_adapter->use_queue_event_buf = rxa_params->use_queue_event_buf;
2400 
2401 	if (!rx_adapter->use_queue_event_buf) {
2402 		buf = &rx_adapter->event_enqueue_buffer;
2403 		buf->events_size = rxa_params->event_buf_size;
2404 
2405 		events = rte_zmalloc_socket(rx_adapter->mem_name,
2406 					    buf->events_size * sizeof(*events),
2407 					    0, socket_id);
2408 		if (events == NULL) {
2409 			RTE_EDEV_LOG_ERR("Failed to allocate memory "
2410 					 "for adapter event buffer");
2411 			rte_free(rx_adapter->eth_devices);
2412 			rte_free(rx_adapter);
2413 			return -ENOMEM;
2414 		}
2415 
2416 		rx_adapter->event_enqueue_buffer.events = events;
2417 	}
2418 
2419 	event_eth_rx_adapter[id] = rx_adapter;
2420 
2421 	if (conf_cb == rxa_default_conf_cb)
2422 		rx_adapter->default_cb_arg = 1;
2423 
2424 	if (rte_mbuf_dyn_rx_timestamp_register(
2425 			&event_eth_rx_timestamp_dynfield_offset,
2426 			&event_eth_rx_timestamp_dynflag) != 0) {
2427 		RTE_EDEV_LOG_ERR("Error registering timestamp field in mbuf\n");
2428 		return -rte_errno;
2429 	}
2430 
2431 	rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2432 		conf_arg);
2433 	return 0;
2434 }
2435 
2436 int
2437 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2438 				rte_event_eth_rx_adapter_conf_cb conf_cb,
2439 				void *conf_arg)
2440 {
2441 	struct rte_event_eth_rx_adapter_params rxa_params = {0};
2442 
2443 	/* use default values for adapter params */
2444 	rxa_params.event_buf_size = ETH_EVENT_BUFFER_SIZE;
2445 	rxa_params.use_queue_event_buf = false;
2446 
2447 	return rxa_create(id, dev_id, &rxa_params, conf_cb, conf_arg);
2448 }
2449 
2450 int
2451 rte_event_eth_rx_adapter_create_with_params(uint8_t id, uint8_t dev_id,
2452 			struct rte_event_port_conf *port_config,
2453 			struct rte_event_eth_rx_adapter_params *rxa_params)
2454 {
2455 	struct rte_event_port_conf *pc;
2456 	int ret;
2457 	struct rte_event_eth_rx_adapter_params temp_params = {0};
2458 
2459 	if (port_config == NULL)
2460 		return -EINVAL;
2461 
2462 	if (rxa_params == NULL) {
2463 		/* use default values if rxa_params is NULL */
2464 		rxa_params = &temp_params;
2465 		rxa_params->event_buf_size = ETH_EVENT_BUFFER_SIZE;
2466 		rxa_params->use_queue_event_buf = false;
2467 	} else if ((!rxa_params->use_queue_event_buf &&
2468 		    rxa_params->event_buf_size == 0) ||
2469 		   (rxa_params->use_queue_event_buf &&
2470 		    rxa_params->event_buf_size != 0)) {
2471 		RTE_EDEV_LOG_ERR("Invalid adapter params\n");
2472 		return -EINVAL;
2473 	} else if (!rxa_params->use_queue_event_buf) {
2474 		/* adjust event buff size with BATCH_SIZE used for fetching
2475 		 * packets from NIC rx queues to get full buffer utilization
2476 		 * and prevent unnecessary rollovers.
2477 		 */
2478 
2479 		rxa_params->event_buf_size =
2480 			RTE_ALIGN(rxa_params->event_buf_size, BATCH_SIZE);
2481 		rxa_params->event_buf_size += (BATCH_SIZE + BATCH_SIZE);
2482 	}
2483 
2484 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2485 	if (pc == NULL)
2486 		return -ENOMEM;
2487 
2488 	*pc = *port_config;
2489 
2490 	ret = rxa_create(id, dev_id, rxa_params, rxa_default_conf_cb, pc);
2491 	if (ret)
2492 		rte_free(pc);
2493 
2494 	return ret;
2495 }
2496 
2497 int
2498 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2499 		struct rte_event_port_conf *port_config)
2500 {
2501 	struct rte_event_port_conf *pc;
2502 	int ret;
2503 
2504 	if (port_config == NULL)
2505 		return -EINVAL;
2506 
2507 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2508 
2509 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2510 	if (pc == NULL)
2511 		return -ENOMEM;
2512 	*pc = *port_config;
2513 
2514 	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2515 					rxa_default_conf_cb,
2516 					pc);
2517 	if (ret)
2518 		rte_free(pc);
2519 	return ret;
2520 }
2521 
2522 int
2523 rte_event_eth_rx_adapter_free(uint8_t id)
2524 {
2525 	struct event_eth_rx_adapter *rx_adapter;
2526 
2527 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2528 
2529 	rx_adapter = rxa_id_to_adapter(id);
2530 	if (rx_adapter == NULL)
2531 		return -EINVAL;
2532 
2533 	if (rx_adapter->nb_queues) {
2534 		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2535 				rx_adapter->nb_queues);
2536 		return -EBUSY;
2537 	}
2538 
2539 	if (rx_adapter->default_cb_arg)
2540 		rte_free(rx_adapter->conf_arg);
2541 	rte_free(rx_adapter->eth_devices);
2542 	if (!rx_adapter->use_queue_event_buf)
2543 		rte_free(rx_adapter->event_enqueue_buffer.events);
2544 	rte_free(rx_adapter);
2545 	event_eth_rx_adapter[id] = NULL;
2546 
2547 	rte_eventdev_trace_eth_rx_adapter_free(id);
2548 	return 0;
2549 }
2550 
2551 int
2552 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2553 		uint16_t eth_dev_id,
2554 		int32_t rx_queue_id,
2555 		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2556 {
2557 	int ret;
2558 	uint32_t cap;
2559 	struct event_eth_rx_adapter *rx_adapter;
2560 	struct rte_eventdev *dev;
2561 	struct eth_device_info *dev_info;
2562 	struct rte_event_eth_rx_adapter_vector_limits limits;
2563 
2564 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2565 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2566 
2567 	rx_adapter = rxa_id_to_adapter(id);
2568 	if ((rx_adapter == NULL) || (queue_conf == NULL))
2569 		return -EINVAL;
2570 
2571 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2572 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2573 						eth_dev_id,
2574 						&cap);
2575 	if (ret) {
2576 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2577 			"eth port %" PRIu16, id, eth_dev_id);
2578 		return ret;
2579 	}
2580 
2581 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2582 		&& (queue_conf->rx_queue_flags &
2583 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2584 		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2585 				" eth port: %" PRIu16 " adapter id: %" PRIu8,
2586 				eth_dev_id, id);
2587 		return -EINVAL;
2588 	}
2589 
2590 	if (queue_conf->rx_queue_flags &
2591 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2592 
2593 		if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0) {
2594 			RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2595 					 " eth port: %" PRIu16
2596 					 " adapter id: %" PRIu8,
2597 					 eth_dev_id, id);
2598 			return -EINVAL;
2599 		}
2600 
2601 		ret = rte_event_eth_rx_adapter_vector_limits_get(
2602 			rx_adapter->eventdev_id, eth_dev_id, &limits);
2603 		if (ret < 0) {
2604 			RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
2605 					 " eth port: %" PRIu16
2606 					 " adapter id: %" PRIu8,
2607 					 eth_dev_id, id);
2608 			return -EINVAL;
2609 		}
2610 		if (queue_conf->vector_sz < limits.min_sz ||
2611 		    queue_conf->vector_sz > limits.max_sz ||
2612 		    queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
2613 		    queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
2614 		    queue_conf->vector_mp == NULL) {
2615 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2616 					 " eth port: %" PRIu16
2617 					 " adapter id: %" PRIu8,
2618 					 eth_dev_id, id);
2619 			return -EINVAL;
2620 		}
2621 		if (queue_conf->vector_mp->elt_size <
2622 		    (sizeof(struct rte_event_vector) +
2623 		     (sizeof(uintptr_t) * queue_conf->vector_sz))) {
2624 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2625 					 " eth port: %" PRIu16
2626 					 " adapter id: %" PRIu8,
2627 					 eth_dev_id, id);
2628 			return -EINVAL;
2629 		}
2630 	}
2631 
2632 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2633 		(rx_queue_id != -1)) {
2634 		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2635 			"event queue, eth port: %" PRIu16 " adapter id: %"
2636 			PRIu8, eth_dev_id, id);
2637 		return -EINVAL;
2638 	}
2639 
2640 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2641 			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2642 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2643 			 (uint16_t)rx_queue_id);
2644 		return -EINVAL;
2645 	}
2646 
2647 	if ((rx_adapter->use_queue_event_buf &&
2648 	     queue_conf->event_buf_size == 0) ||
2649 	    (!rx_adapter->use_queue_event_buf &&
2650 	     queue_conf->event_buf_size != 0)) {
2651 		RTE_EDEV_LOG_ERR("Invalid Event buffer size for the queue");
2652 		return -EINVAL;
2653 	}
2654 
2655 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2656 
2657 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2658 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2659 					-ENOTSUP);
2660 		if (dev_info->rx_queue == NULL) {
2661 			dev_info->rx_queue =
2662 			    rte_zmalloc_socket(rx_adapter->mem_name,
2663 					dev_info->dev->data->nb_rx_queues *
2664 					sizeof(struct eth_rx_queue_info), 0,
2665 					rx_adapter->socket_id);
2666 			if (dev_info->rx_queue == NULL)
2667 				return -ENOMEM;
2668 		}
2669 
2670 		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2671 				&rte_eth_devices[eth_dev_id],
2672 				rx_queue_id, queue_conf);
2673 		if (ret == 0) {
2674 			dev_info->internal_event_port = 1;
2675 			rxa_update_queue(rx_adapter,
2676 					&rx_adapter->eth_devices[eth_dev_id],
2677 					rx_queue_id,
2678 					1);
2679 		}
2680 	} else {
2681 		rte_spinlock_lock(&rx_adapter->rx_lock);
2682 		dev_info->internal_event_port = 0;
2683 		ret = rxa_init_service(rx_adapter, id);
2684 		if (ret == 0) {
2685 			uint32_t service_id = rx_adapter->service_id;
2686 			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2687 					queue_conf);
2688 			rte_service_component_runstate_set(service_id,
2689 				rxa_sw_adapter_queue_count(rx_adapter));
2690 		}
2691 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2692 	}
2693 
2694 	rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2695 		rx_queue_id, queue_conf, ret);
2696 	if (ret)
2697 		return ret;
2698 
2699 	return 0;
2700 }
2701 
2702 static int
2703 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2704 {
2705 	limits->max_sz = MAX_VECTOR_SIZE;
2706 	limits->min_sz = MIN_VECTOR_SIZE;
2707 	limits->max_timeout_ns = MAX_VECTOR_NS;
2708 	limits->min_timeout_ns = MIN_VECTOR_NS;
2709 
2710 	return 0;
2711 }
2712 
2713 int
2714 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2715 				int32_t rx_queue_id)
2716 {
2717 	int ret = 0;
2718 	struct rte_eventdev *dev;
2719 	struct event_eth_rx_adapter *rx_adapter;
2720 	struct eth_device_info *dev_info;
2721 	uint32_t cap;
2722 	uint32_t nb_rx_poll = 0;
2723 	uint32_t nb_wrr = 0;
2724 	uint32_t nb_rx_intr;
2725 	struct eth_rx_poll_entry *rx_poll = NULL;
2726 	uint32_t *rx_wrr = NULL;
2727 	int num_intr_vec;
2728 
2729 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2730 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2731 
2732 	rx_adapter = rxa_id_to_adapter(id);
2733 	if (rx_adapter == NULL)
2734 		return -EINVAL;
2735 
2736 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2737 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2738 						eth_dev_id,
2739 						&cap);
2740 	if (ret)
2741 		return ret;
2742 
2743 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2744 		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2745 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2746 			 (uint16_t)rx_queue_id);
2747 		return -EINVAL;
2748 	}
2749 
2750 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2751 
2752 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2753 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2754 				 -ENOTSUP);
2755 		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2756 						&rte_eth_devices[eth_dev_id],
2757 						rx_queue_id);
2758 		if (ret == 0) {
2759 			rxa_update_queue(rx_adapter,
2760 					&rx_adapter->eth_devices[eth_dev_id],
2761 					rx_queue_id,
2762 					0);
2763 			if (dev_info->nb_dev_queues == 0) {
2764 				rte_free(dev_info->rx_queue);
2765 				dev_info->rx_queue = NULL;
2766 			}
2767 		}
2768 	} else {
2769 		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2770 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2771 
2772 		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2773 			&rx_poll, &rx_wrr);
2774 		if (ret)
2775 			return ret;
2776 
2777 		rte_spinlock_lock(&rx_adapter->rx_lock);
2778 
2779 		num_intr_vec = 0;
2780 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2781 
2782 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2783 						rx_queue_id, 0);
2784 			ret = rxa_del_intr_queue(rx_adapter, dev_info,
2785 					rx_queue_id);
2786 			if (ret)
2787 				goto unlock_ret;
2788 		}
2789 
2790 		if (nb_rx_intr == 0) {
2791 			ret = rxa_free_intr_resources(rx_adapter);
2792 			if (ret)
2793 				goto unlock_ret;
2794 		}
2795 
2796 		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2797 		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2798 
2799 		rte_free(rx_adapter->eth_rx_poll);
2800 		rte_free(rx_adapter->wrr_sched);
2801 
2802 		if (nb_rx_intr == 0) {
2803 			rte_free(dev_info->intr_queue);
2804 			dev_info->intr_queue = NULL;
2805 		}
2806 
2807 		rx_adapter->eth_rx_poll = rx_poll;
2808 		rx_adapter->wrr_sched = rx_wrr;
2809 		rx_adapter->wrr_len = nb_wrr;
2810 		/*
2811 		 * reset next poll start position (wrr_pos) to avoid buffer
2812 		 * overrun when wrr_len is reduced in case of queue delete
2813 		 */
2814 		rx_adapter->wrr_pos = 0;
2815 		rx_adapter->num_intr_vec += num_intr_vec;
2816 
2817 		if (dev_info->nb_dev_queues == 0) {
2818 			rte_free(dev_info->rx_queue);
2819 			dev_info->rx_queue = NULL;
2820 		}
2821 unlock_ret:
2822 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2823 		if (ret) {
2824 			rte_free(rx_poll);
2825 			rte_free(rx_wrr);
2826 			return ret;
2827 		}
2828 
2829 		rte_service_component_runstate_set(rx_adapter->service_id,
2830 				rxa_sw_adapter_queue_count(rx_adapter));
2831 	}
2832 
2833 	rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2834 		rx_queue_id, ret);
2835 	return ret;
2836 }
2837 
2838 int
2839 rte_event_eth_rx_adapter_vector_limits_get(
2840 	uint8_t dev_id, uint16_t eth_port_id,
2841 	struct rte_event_eth_rx_adapter_vector_limits *limits)
2842 {
2843 	struct rte_eventdev *dev;
2844 	uint32_t cap;
2845 	int ret;
2846 
2847 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2848 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2849 
2850 	if (limits == NULL)
2851 		return -EINVAL;
2852 
2853 	dev = &rte_eventdevs[dev_id];
2854 
2855 	ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2856 	if (ret) {
2857 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2858 				 "eth port %" PRIu16,
2859 				 dev_id, eth_port_id);
2860 		return ret;
2861 	}
2862 
2863 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2864 		RTE_FUNC_PTR_OR_ERR_RET(
2865 			*dev->dev_ops->eth_rx_adapter_vector_limits_get,
2866 			-ENOTSUP);
2867 		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2868 			dev, &rte_eth_devices[eth_port_id], limits);
2869 	} else {
2870 		ret = rxa_sw_vector_limits(limits);
2871 	}
2872 
2873 	return ret;
2874 }
2875 
/*
 * Start adapter 'id'. The trace point is emitted before rxa_ctrl()
 * validates the id. Returns the result of rxa_ctrl().
 */
int
rte_event_eth_rx_adapter_start(uint8_t id)
{
	int rc;

	rte_eventdev_trace_eth_rx_adapter_start(id);
	rc = rxa_ctrl(id, 1);

	return rc;
}
2882 
/*
 * Stop adapter 'id'. The trace point is emitted before rxa_ctrl()
 * validates the id. Returns the result of rxa_ctrl().
 */
int
rte_event_eth_rx_adapter_stop(uint8_t id)
{
	int rc;

	rte_eventdev_trace_eth_rx_adapter_stop(id);
	rc = rxa_ctrl(id, 0);

	return rc;
}
2889 
2890 static inline void
2891 rxa_queue_stats_reset(struct eth_rx_queue_info *queue_info)
2892 {
2893 	struct rte_event_eth_rx_adapter_stats *q_stats;
2894 
2895 	q_stats = queue_info->stats;
2896 	memset(q_stats, 0, sizeof(*q_stats));
2897 }
2898 
2899 int
2900 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2901 			       struct rte_event_eth_rx_adapter_stats *stats)
2902 {
2903 	struct event_eth_rx_adapter *rx_adapter;
2904 	struct eth_event_enqueue_buffer *buf;
2905 	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2906 	struct rte_event_eth_rx_adapter_stats dev_stats;
2907 	struct rte_eventdev *dev;
2908 	struct eth_device_info *dev_info;
2909 	struct eth_rx_queue_info *queue_info;
2910 	struct rte_event_eth_rx_adapter_stats *q_stats;
2911 	uint32_t i, j;
2912 	int ret;
2913 
2914 	if (rxa_memzone_lookup())
2915 		return -ENOMEM;
2916 
2917 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2918 
2919 	rx_adapter = rxa_id_to_adapter(id);
2920 	if (rx_adapter  == NULL || stats == NULL)
2921 		return -EINVAL;
2922 
2923 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2924 	memset(stats, 0, sizeof(*stats));
2925 
2926 	if (rx_adapter->service_inited)
2927 		*stats = rx_adapter->stats;
2928 
2929 	RTE_ETH_FOREACH_DEV(i) {
2930 		dev_info = &rx_adapter->eth_devices[i];
2931 
2932 		if (rx_adapter->use_queue_event_buf && dev_info->rx_queue) {
2933 
2934 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
2935 			     j++) {
2936 				queue_info = &dev_info->rx_queue[j];
2937 				if (!queue_info->queue_enabled)
2938 					continue;
2939 				q_stats = queue_info->stats;
2940 
2941 				stats->rx_packets += q_stats->rx_packets;
2942 				stats->rx_poll_count += q_stats->rx_poll_count;
2943 				stats->rx_enq_count += q_stats->rx_enq_count;
2944 				stats->rx_enq_retry += q_stats->rx_enq_retry;
2945 				stats->rx_dropped += q_stats->rx_dropped;
2946 				stats->rx_enq_block_cycles +=
2947 						q_stats->rx_enq_block_cycles;
2948 			}
2949 		}
2950 
2951 		if (dev_info->internal_event_port == 0 ||
2952 			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2953 			continue;
2954 		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2955 						&rte_eth_devices[i],
2956 						&dev_stats);
2957 		if (ret)
2958 			continue;
2959 		dev_stats_sum.rx_packets += dev_stats.rx_packets;
2960 		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2961 	}
2962 
2963 	buf = &rx_adapter->event_enqueue_buffer;
2964 	stats->rx_packets += dev_stats_sum.rx_packets;
2965 	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2966 	stats->rx_event_buf_count = buf->count;
2967 	stats->rx_event_buf_size = buf->events_size;
2968 
2969 	return 0;
2970 }
2971 
2972 int
2973 rte_event_eth_rx_adapter_queue_stats_get(uint8_t id,
2974 		uint16_t eth_dev_id,
2975 		uint16_t rx_queue_id,
2976 		struct rte_event_eth_rx_adapter_queue_stats *stats)
2977 {
2978 	struct event_eth_rx_adapter *rx_adapter;
2979 	struct eth_device_info *dev_info;
2980 	struct eth_rx_queue_info *queue_info;
2981 	struct eth_event_enqueue_buffer *event_buf;
2982 	struct rte_event_eth_rx_adapter_stats *q_stats;
2983 	struct rte_eventdev *dev;
2984 
2985 	if (rxa_memzone_lookup())
2986 		return -ENOMEM;
2987 
2988 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2989 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2990 
2991 	rx_adapter = rxa_id_to_adapter(id);
2992 
2993 	if (rx_adapter == NULL || stats == NULL)
2994 		return -EINVAL;
2995 
2996 	if (!rx_adapter->use_queue_event_buf)
2997 		return -EINVAL;
2998 
2999 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3000 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
3001 		return -EINVAL;
3002 	}
3003 
3004 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3005 	if (dev_info->rx_queue == NULL ||
3006 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3007 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3008 		return -EINVAL;
3009 	}
3010 
3011 	if (dev_info->internal_event_port == 0) {
3012 		queue_info = &dev_info->rx_queue[rx_queue_id];
3013 		event_buf = queue_info->event_buf;
3014 		q_stats = queue_info->stats;
3015 
3016 		stats->rx_event_buf_count = event_buf->count;
3017 		stats->rx_event_buf_size = event_buf->events_size;
3018 		stats->rx_packets = q_stats->rx_packets;
3019 		stats->rx_poll_count = q_stats->rx_poll_count;
3020 		stats->rx_dropped = q_stats->rx_dropped;
3021 	}
3022 
3023 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3024 	if (dev->dev_ops->eth_rx_adapter_queue_stats_get != NULL) {
3025 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_get)(dev,
3026 						&rte_eth_devices[eth_dev_id],
3027 						rx_queue_id, stats);
3028 	}
3029 
3030 	return 0;
3031 }
3032 
3033 int
3034 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
3035 {
3036 	struct event_eth_rx_adapter *rx_adapter;
3037 	struct rte_eventdev *dev;
3038 	struct eth_device_info *dev_info;
3039 	struct eth_rx_queue_info *queue_info;
3040 	uint32_t i, j;
3041 
3042 	if (rxa_memzone_lookup())
3043 		return -ENOMEM;
3044 
3045 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3046 
3047 	rx_adapter = rxa_id_to_adapter(id);
3048 	if (rx_adapter == NULL)
3049 		return -EINVAL;
3050 
3051 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3052 
3053 	RTE_ETH_FOREACH_DEV(i) {
3054 		dev_info = &rx_adapter->eth_devices[i];
3055 
3056 		if (rx_adapter->use_queue_event_buf  && dev_info->rx_queue) {
3057 
3058 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
3059 						j++) {
3060 				queue_info = &dev_info->rx_queue[j];
3061 				if (!queue_info->queue_enabled)
3062 					continue;
3063 				rxa_queue_stats_reset(queue_info);
3064 			}
3065 		}
3066 
3067 		if (dev_info->internal_event_port == 0 ||
3068 			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
3069 			continue;
3070 		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
3071 							&rte_eth_devices[i]);
3072 	}
3073 
3074 	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
3075 
3076 	return 0;
3077 }
3078 
3079 int
3080 rte_event_eth_rx_adapter_queue_stats_reset(uint8_t id,
3081 		uint16_t eth_dev_id,
3082 		uint16_t rx_queue_id)
3083 {
3084 	struct event_eth_rx_adapter *rx_adapter;
3085 	struct eth_device_info *dev_info;
3086 	struct eth_rx_queue_info *queue_info;
3087 	struct rte_eventdev *dev;
3088 
3089 	if (rxa_memzone_lookup())
3090 		return -ENOMEM;
3091 
3092 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3093 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3094 
3095 	rx_adapter = rxa_id_to_adapter(id);
3096 	if (rx_adapter == NULL)
3097 		return -EINVAL;
3098 
3099 	if (!rx_adapter->use_queue_event_buf)
3100 		return -EINVAL;
3101 
3102 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3103 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
3104 		return -EINVAL;
3105 	}
3106 
3107 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3108 
3109 	if (dev_info->rx_queue == NULL ||
3110 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3111 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3112 		return -EINVAL;
3113 	}
3114 
3115 	if (dev_info->internal_event_port == 0) {
3116 		queue_info = &dev_info->rx_queue[rx_queue_id];
3117 		rxa_queue_stats_reset(queue_info);
3118 	}
3119 
3120 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3121 	if (dev->dev_ops->eth_rx_adapter_queue_stats_reset != NULL) {
3122 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_reset)(dev,
3123 						&rte_eth_devices[eth_dev_id],
3124 						rx_queue_id);
3125 	}
3126 
3127 	return 0;
3128 }
3129 
3130 int
3131 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
3132 {
3133 	struct event_eth_rx_adapter *rx_adapter;
3134 
3135 	if (rxa_memzone_lookup())
3136 		return -ENOMEM;
3137 
3138 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3139 
3140 	rx_adapter = rxa_id_to_adapter(id);
3141 	if (rx_adapter == NULL || service_id == NULL)
3142 		return -EINVAL;
3143 
3144 	if (rx_adapter->service_inited)
3145 		*service_id = rx_adapter->service_id;
3146 
3147 	return rx_adapter->service_inited ? 0 : -ESRCH;
3148 }
3149 
3150 int
3151 rte_event_eth_rx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
3152 {
3153 	struct event_eth_rx_adapter *rx_adapter;
3154 
3155 	if (rxa_memzone_lookup())
3156 		return -ENOMEM;
3157 
3158 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3159 
3160 	rx_adapter = rxa_id_to_adapter(id);
3161 	if (rx_adapter == NULL || event_port_id == NULL)
3162 		return -EINVAL;
3163 
3164 	if (rx_adapter->service_inited)
3165 		*event_port_id = rx_adapter->event_port_id;
3166 
3167 	return rx_adapter->service_inited ? 0 : -ESRCH;
3168 }
3169 
3170 int
3171 rte_event_eth_rx_adapter_cb_register(uint8_t id,
3172 					uint16_t eth_dev_id,
3173 					rte_event_eth_rx_adapter_cb_fn cb_fn,
3174 					void *cb_arg)
3175 {
3176 	struct event_eth_rx_adapter *rx_adapter;
3177 	struct eth_device_info *dev_info;
3178 	uint32_t cap;
3179 	int ret;
3180 
3181 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3182 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3183 
3184 	rx_adapter = rxa_id_to_adapter(id);
3185 	if (rx_adapter == NULL)
3186 		return -EINVAL;
3187 
3188 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3189 	if (dev_info->rx_queue == NULL)
3190 		return -EINVAL;
3191 
3192 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
3193 						eth_dev_id,
3194 						&cap);
3195 	if (ret) {
3196 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
3197 			"eth port %" PRIu16, id, eth_dev_id);
3198 		return ret;
3199 	}
3200 
3201 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
3202 		RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
3203 				PRIu16, eth_dev_id);
3204 		return -EINVAL;
3205 	}
3206 
3207 	rte_spinlock_lock(&rx_adapter->rx_lock);
3208 	dev_info->cb_fn = cb_fn;
3209 	dev_info->cb_arg = cb_arg;
3210 	rte_spinlock_unlock(&rx_adapter->rx_lock);
3211 
3212 	return 0;
3213 }
3214 
3215 int
3216 rte_event_eth_rx_adapter_queue_conf_get(uint8_t id,
3217 			uint16_t eth_dev_id,
3218 			uint16_t rx_queue_id,
3219 			struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
3220 {
3221 #define TICK2NSEC(_ticks, _freq) (((_ticks) * (1E9)) / (_freq))
3222 	struct rte_eventdev *dev;
3223 	struct event_eth_rx_adapter *rx_adapter;
3224 	struct eth_device_info *dev_info;
3225 	struct eth_rx_queue_info *queue_info;
3226 	int ret;
3227 
3228 	if (rxa_memzone_lookup())
3229 		return -ENOMEM;
3230 
3231 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3232 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3233 
3234 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3235 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3236 		return -EINVAL;
3237 	}
3238 
3239 	if (queue_conf == NULL) {
3240 		RTE_EDEV_LOG_ERR("Rx queue conf struct cannot be NULL");
3241 		return -EINVAL;
3242 	}
3243 
3244 	rx_adapter = rxa_id_to_adapter(id);
3245 	if (rx_adapter == NULL)
3246 		return -EINVAL;
3247 
3248 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3249 	if (dev_info->rx_queue == NULL ||
3250 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3251 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3252 		return -EINVAL;
3253 	}
3254 
3255 	queue_info = &dev_info->rx_queue[rx_queue_id];
3256 
3257 	memset(queue_conf, 0, sizeof(*queue_conf));
3258 	queue_conf->rx_queue_flags = 0;
3259 	if (queue_info->flow_id_mask != 0)
3260 		queue_conf->rx_queue_flags |=
3261 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID;
3262 	queue_conf->servicing_weight = queue_info->wt;
3263 
3264 	queue_conf->ev.event = queue_info->event;
3265 
3266 	queue_conf->vector_sz = queue_info->vector_data.max_vector_count;
3267 	queue_conf->vector_mp = queue_info->vector_data.vector_pool;
3268 	/* need to be converted from ticks to ns */
3269 	queue_conf->vector_timeout_ns = TICK2NSEC(
3270 		queue_info->vector_data.vector_timeout_ticks, rte_get_timer_hz());
3271 
3272 	if (queue_info->event_buf != NULL)
3273 		queue_conf->event_buf_size = queue_info->event_buf->events_size;
3274 	else
3275 		queue_conf->event_buf_size = 0;
3276 
3277 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3278 	if (dev->dev_ops->eth_rx_adapter_queue_conf_get != NULL) {
3279 		ret = (*dev->dev_ops->eth_rx_adapter_queue_conf_get)(dev,
3280 						&rte_eth_devices[eth_dev_id],
3281 						rx_queue_id,
3282 						queue_conf);
3283 		return ret;
3284 	}
3285 
3286 	return 0;
3287 }
3288 
/*
 * Add member `field` of `obj` to the telemetry dictionary as an unsigned
 * 64-bit entry keyed by the member's own name. The dictionary is taken from
 * a variable named `d` that must be in scope where the macro is expanded.
 */
#define RXA_ADD_DICT(obj, field) \
	rte_tel_data_add_dict_u64(d, #field, (obj).field)
3290 
3291 static int
3292 handle_rxa_stats(const char *cmd __rte_unused,
3293 		 const char *params,
3294 		 struct rte_tel_data *d)
3295 {
3296 	uint8_t rx_adapter_id;
3297 	struct rte_event_eth_rx_adapter_stats rx_adptr_stats;
3298 
3299 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3300 		return -1;
3301 
3302 	/* Get Rx adapter ID from parameter string */
3303 	rx_adapter_id = atoi(params);
3304 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3305 
3306 	/* Get Rx adapter stats */
3307 	if (rte_event_eth_rx_adapter_stats_get(rx_adapter_id,
3308 					       &rx_adptr_stats)) {
3309 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter stats\n");
3310 		return -1;
3311 	}
3312 
3313 	rte_tel_data_start_dict(d);
3314 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3315 	RXA_ADD_DICT(rx_adptr_stats, rx_packets);
3316 	RXA_ADD_DICT(rx_adptr_stats, rx_poll_count);
3317 	RXA_ADD_DICT(rx_adptr_stats, rx_dropped);
3318 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_retry);
3319 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_count);
3320 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_size);
3321 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_count);
3322 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_start_ts);
3323 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_block_cycles);
3324 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_end_ts);
3325 	RXA_ADD_DICT(rx_adptr_stats, rx_intr_packets);
3326 
3327 	return 0;
3328 }
3329 
3330 static int
3331 handle_rxa_stats_reset(const char *cmd __rte_unused,
3332 		       const char *params,
3333 		       struct rte_tel_data *d __rte_unused)
3334 {
3335 	uint8_t rx_adapter_id;
3336 
3337 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3338 		return -1;
3339 
3340 	/* Get Rx adapter ID from parameter string */
3341 	rx_adapter_id = atoi(params);
3342 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3343 
3344 	/* Reset Rx adapter stats */
3345 	if (rte_event_eth_rx_adapter_stats_reset(rx_adapter_id)) {
3346 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter stats\n");
3347 		return -1;
3348 	}
3349 
3350 	return 0;
3351 }
3352 
3353 static int
3354 handle_rxa_get_queue_conf(const char *cmd __rte_unused,
3355 			  const char *params,
3356 			  struct rte_tel_data *d)
3357 {
3358 	uint8_t rx_adapter_id;
3359 	uint16_t rx_queue_id;
3360 	int eth_dev_id, ret = -1;
3361 	char *token, *l_params;
3362 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
3363 
3364 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3365 		return -1;
3366 
3367 	/* Get Rx adapter ID from parameter string */
3368 	l_params = strdup(params);
3369 	if (l_params == NULL)
3370 		return -ENOMEM;
3371 	token = strtok(l_params, ",");
3372 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3373 	rx_adapter_id = strtoul(token, NULL, 10);
3374 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);
3375 
3376 	token = strtok(NULL, ",");
3377 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3378 
3379 	/* Get device ID from parameter string */
3380 	eth_dev_id = strtoul(token, NULL, 10);
3381 	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3382 
3383 	token = strtok(NULL, ",");
3384 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3385 
3386 	/* Get Rx queue ID from parameter string */
3387 	rx_queue_id = strtoul(token, NULL, 10);
3388 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3389 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3390 		ret = -EINVAL;
3391 		goto error;
3392 	}
3393 
3394 	token = strtok(NULL, "\0");
3395 	if (token != NULL)
3396 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3397 				 " telemetry command, ignoring");
3398 	/* Parsing parameter finished */
3399 	free(l_params);
3400 
3401 	if (rte_event_eth_rx_adapter_queue_conf_get(rx_adapter_id, eth_dev_id,
3402 						    rx_queue_id, &queue_conf)) {
3403 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue config");
3404 		return -1;
3405 	}
3406 
3407 	rte_tel_data_start_dict(d);
3408 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3409 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3410 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3411 	RXA_ADD_DICT(queue_conf, rx_queue_flags);
3412 	RXA_ADD_DICT(queue_conf, servicing_weight);
3413 	RXA_ADD_DICT(queue_conf.ev, queue_id);
3414 	RXA_ADD_DICT(queue_conf.ev, sched_type);
3415 	RXA_ADD_DICT(queue_conf.ev, priority);
3416 	RXA_ADD_DICT(queue_conf.ev, flow_id);
3417 
3418 	return 0;
3419 
3420 error:
3421 	free(l_params);
3422 	return ret;
3423 }
3424 
3425 static int
3426 handle_rxa_get_queue_stats(const char *cmd __rte_unused,
3427 			   const char *params,
3428 			   struct rte_tel_data *d)
3429 {
3430 	uint8_t rx_adapter_id;
3431 	uint16_t rx_queue_id;
3432 	int eth_dev_id, ret = -1;
3433 	char *token, *l_params;
3434 	struct rte_event_eth_rx_adapter_queue_stats q_stats;
3435 
3436 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3437 		return -1;
3438 
3439 	/* Get Rx adapter ID from parameter string */
3440 	l_params = strdup(params);
3441 	if (l_params == NULL)
3442 		return -ENOMEM;
3443 	token = strtok(l_params, ",");
3444 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3445 	rx_adapter_id = strtoul(token, NULL, 10);
3446 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);
3447 
3448 	token = strtok(NULL, ",");
3449 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3450 
3451 	/* Get device ID from parameter string */
3452 	eth_dev_id = strtoul(token, NULL, 10);
3453 	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3454 
3455 	token = strtok(NULL, ",");
3456 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3457 
3458 	/* Get Rx queue ID from parameter string */
3459 	rx_queue_id = strtoul(token, NULL, 10);
3460 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3461 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3462 		ret = -EINVAL;
3463 		goto error;
3464 	}
3465 
3466 	token = strtok(NULL, "\0");
3467 	if (token != NULL)
3468 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3469 				 " telemetry command, ignoring");
3470 	/* Parsing parameter finished */
3471 	free(l_params);
3472 
3473 	if (rte_event_eth_rx_adapter_queue_stats_get(rx_adapter_id, eth_dev_id,
3474 						    rx_queue_id, &q_stats)) {
3475 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue stats");
3476 		return -1;
3477 	}
3478 
3479 	rte_tel_data_start_dict(d);
3480 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3481 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3482 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3483 	RXA_ADD_DICT(q_stats, rx_event_buf_count);
3484 	RXA_ADD_DICT(q_stats, rx_event_buf_size);
3485 	RXA_ADD_DICT(q_stats, rx_poll_count);
3486 	RXA_ADD_DICT(q_stats, rx_packets);
3487 	RXA_ADD_DICT(q_stats, rx_dropped);
3488 
3489 	return 0;
3490 
3491 error:
3492 	free(l_params);
3493 	return ret;
3494 }
3495 
3496 static int
3497 handle_rxa_queue_stats_reset(const char *cmd __rte_unused,
3498 			     const char *params,
3499 			     struct rte_tel_data *d __rte_unused)
3500 {
3501 	uint8_t rx_adapter_id;
3502 	uint16_t rx_queue_id;
3503 	int eth_dev_id, ret = -1;
3504 	char *token, *l_params;
3505 
3506 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3507 		return -1;
3508 
3509 	/* Get Rx adapter ID from parameter string */
3510 	l_params = strdup(params);
3511 	if (l_params == NULL)
3512 		return -ENOMEM;
3513 	token = strtok(l_params, ",");
3514 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3515 	rx_adapter_id = strtoul(token, NULL, 10);
3516 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);
3517 
3518 	token = strtok(NULL, ",");
3519 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3520 
3521 	/* Get device ID from parameter string */
3522 	eth_dev_id = strtoul(token, NULL, 10);
3523 	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3524 
3525 	token = strtok(NULL, ",");
3526 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3527 
3528 	/* Get Rx queue ID from parameter string */
3529 	rx_queue_id = strtoul(token, NULL, 10);
3530 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3531 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3532 		ret = -EINVAL;
3533 		goto error;
3534 	}
3535 
3536 	token = strtok(NULL, "\0");
3537 	if (token != NULL)
3538 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3539 				 " telemetry command, ignoring");
3540 	/* Parsing parameter finished */
3541 	free(l_params);
3542 
3543 	if (rte_event_eth_rx_adapter_queue_stats_reset(rx_adapter_id,
3544 						       eth_dev_id,
3545 						       rx_queue_id)) {
3546 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter queue stats");
3547 		return -1;
3548 	}
3549 
3550 	return 0;
3551 
3552 error:
3553 	free(l_params);
3554 	return ret;
3555 }
3556 
3557 RTE_INIT(rxa_init_telemetry)
3558 {
3559 	rte_telemetry_register_cmd("/eventdev/rxa_stats",
3560 		handle_rxa_stats,
3561 		"Returns Rx adapter stats. Parameter: rxa_id");
3562 
3563 	rte_telemetry_register_cmd("/eventdev/rxa_stats_reset",
3564 		handle_rxa_stats_reset,
3565 		"Reset Rx adapter stats. Parameter: rxa_id");
3566 
3567 	rte_telemetry_register_cmd("/eventdev/rxa_queue_conf",
3568 		handle_rxa_get_queue_conf,
3569 		"Returns Rx queue config. Parameter: rxa_id, dev_id, queue_id");
3570 
3571 	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats",
3572 		handle_rxa_get_queue_stats,
3573 		"Returns Rx queue stats. Parameter: rxa_id, dev_id, queue_id");
3574 
3575 	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats_reset",
3576 		handle_rxa_queue_stats_reset,
3577 		"Reset Rx queue stats. Parameter: rxa_id, dev_id, queue_id");
3578 }
3579