xref: /dpdk/lib/eventdev/rte_event_eth_rx_adapter.c (revision 8b8036a66e3d59ffa58afb8d96fa2c73262155a7)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9 
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <ethdev_driver.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20 #include <rte_mbuf_dyn.h>
21 #include <rte_telemetry.h>
22 
23 #include "rte_eventdev.h"
24 #include "eventdev_pmd.h"
25 #include "eventdev_trace.h"
26 #include "rte_event_eth_rx_adapter.h"
27 
/* Number of mbufs requested from rte_eth_rx_burst() per call; also the
 * granularity used when checking for free space in the event buffer
 */
#define BATCH_SIZE		32
/* Number of consecutive flushes that enqueue nothing before the start of
 * a blocked period is timestamped
 */
#define BLOCK_CNT_THRESHOLD	10
/* Six batches worth of events; presumably the default event buffer size
 * (its use is outside this part of the file - TODO confirm)
 */
#define ETH_EVENT_BUFFER_SIZE	(6*BATCH_SIZE)
/* Bounds for event vector size and timeout; presumably used to validate
 * vector configuration elsewhere in the file - TODO confirm
 */
#define MAX_VECTOR_SIZE		1024
#define MIN_VECTOR_SIZE		4
#define MAX_VECTOR_NS		1E9
#define MIN_VECTOR_NS		1E5

/* Buffer lengths for the service name and the memory allocation name */
#define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
#define ETH_RX_ADAPTER_MEM_NAME_LEN	32

/* Size in bytes of the adapter's RSS key */
#define RSS_KEY_SIZE	40
/* value written to intr thread pipe to signal thread exit */
#define ETH_BRIDGE_INTR_THREAD_EXIT	1
/* Sentinel value to detect initialized file handle */
#define INIT_FD		-1

/* NOTE(review): looks like the name under which the adapter array is
 * reserved - confirm against the code that uses it
 */
#define RXA_ADAPTER_ARRAY "rte_event_eth_rx_adapter_array"
46 
/*
 * Used to store port and queue ID of interrupting Rx queue
 *
 * The <port, queue> pair overlays a pointer so that it can be passed
 * through the interrupt ring, which carries void * entries.
 */
union queue_data {
	RTE_STD_C11
	/* Pointer-sized view, the form enqueued to / dequeued from the ring */
	void *ptr;
	struct {
		/* Ethernet port ID */
		uint16_t port;
		/* Rx queue ID on that port */
		uint16_t queue;
	};
};
58 
/*
 * There is an instance of this struct per polled Rx queue added to the
 * adapter. The WRR schedule stores indices into the array of these entries.
 */
struct eth_rx_poll_entry {
	/* Eth port to poll */
	uint16_t eth_dev_id;
	/* Eth rx queue to poll */
	uint16_t eth_rx_qid;
};
69 
/* Per Rx queue state used to aggregate received mbufs into event vectors */
struct eth_rx_vector_data {
	/* Link in the adapter's vector_list while a vector is being filled */
	TAILQ_ENTRY(eth_rx_vector_data) next;
	/* Port and queue IDs copied into each new vector's attributes */
	uint16_t port;
	uint16_t queue;
	/* Number of mbufs at which a vector is complete */
	uint16_t max_vector_count;
	/* Event word copied to the event that carries a completed vector */
	uint64_t event;
	/* TSC value taken when mbufs were last appended to the vector */
	uint64_t ts;
	/* presumably the age after which a partial vector is flushed; the
	 * expiry code is outside this part of the file - TODO confirm
	 */
	uint64_t vector_timeout_ticks;
	/* Mempool that vector_ev objects are allocated from */
	struct rte_mempool *vector_pool;
	/* Vector currently being filled, NULL if none is allocated */
	struct rte_event_vector *vector_ev;
} __rte_cache_aligned;

/* List type for vectors that are partially filled */
TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
83 
/* Event buffer between Rx burst and event enqueue. One instance is embedded
 * in the adapter; Rx queues reference their own instance when per queue
 * event buffers are in use.
 */
struct eth_event_enqueue_buffer {
	/* Count of events in this buffer */
	uint16_t count;
	/* Array of events in this buffer */
	struct rte_event *events;
	/* size of event buffer */
	uint16_t events_size;
	/* Event enqueue happens from head */
	uint16_t head;
	/* New packets from rte_eth_rx_burst are enqueued at tail */
	uint16_t tail;
	/* last element in the buffer before rollover, 0 when the buffer
	 * has not rolled over
	 */
	uint16_t last;
	/* All ones while the buffer is rolled over, otherwise 0 */
	uint16_t last_mask;
};
100 
/* State of one Rx adapter instance */
struct event_eth_rx_adapter {
	/* RSS key, passed to rte_softrss_be() when the flow ID has to be
	 * computed in software
	 */
	uint8_t rss_key_be[RSS_KEY_SIZE];
	/* Event device identifier */
	uint8_t eventdev_id;
	/* Event port identifier */
	uint8_t event_port_id;
	/* Flag indicating per rxq event buffer */
	bool use_queue_event_buf;
	/* Per ethernet device structure, indexed by ethernet port ID */
	struct eth_device_info *eth_devices;
	/* Lock to serialize config updates with service function */
	rte_spinlock_t rx_lock;
	/* Max mbufs processed in any service function invocation */
	uint32_t max_nb_rx;
	/* Receive queues that need to be polled */
	struct eth_rx_poll_entry *eth_rx_poll;
	/* Size of the eth_rx_poll array */
	uint16_t num_rx_polled;
	/* Weighted round robin schedule, entries index eth_rx_poll[] */
	uint32_t *wrr_sched;
	/* wrr_sched[] size */
	uint32_t wrr_len;
	/* Next entry in wrr[] to begin polling */
	uint32_t wrr_pos;
	/* Event burst buffer */
	struct eth_event_enqueue_buffer event_enqueue_buffer;
	/* Vector enable flag */
	uint8_t ena_vector;
	/* Timestamp of previous vector expiry list traversal */
	uint64_t prev_expiry_ts;
	/* Minimum ticks to wait before traversing expiry list */
	uint64_t vector_tmo_ticks;
	/* vector list */
	struct eth_rx_vector_data_list vector_list;
	/* Per adapter stats */
	struct rte_event_eth_rx_adapter_stats stats;
	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
	uint16_t enq_block_count;
	/* Block start ts, 0 when no blocked period is being timed */
	uint64_t rx_enq_block_start_ts;
	/* epoll fd used to wait for Rx interrupts */
	int epd;
	/* Number of interrupt driven Rx queues */
	uint32_t num_rx_intr;
	/* Used to send <dev id, queue id> of interrupting Rx queues from
	 * the interrupt thread to the Rx thread
	 */
	struct rte_ring *intr_ring;
	/* Rx Queue data (dev id, queue id) for the last non-empty
	 * queue polled
	 */
	union queue_data qd;
	/* queue_data is valid */
	int qd_valid;
	/* Interrupt ring lock, synchronizes Rx thread
	 * and interrupt thread
	 */
	rte_spinlock_t intr_ring_lock;
	/* event array passed to rte_poll_wait */
	struct rte_epoll_event *epoll_events;
	/* Count of interrupt vectors in use */
	uint32_t num_intr_vec;
	/* Thread blocked on Rx interrupts */
	pthread_t rx_intr_thread;
	/* Configuration callback for rte_service configuration */
	rte_event_eth_rx_adapter_conf_cb conf_cb;
	/* Configuration callback argument */
	void *conf_arg;
	/* Set if default_cb is being used */
	int default_cb_arg;
	/* Service initialization state */
	uint8_t service_inited;
	/* Total count of Rx queues in adapter */
	uint32_t nb_queues;
	/* Memory allocation name */
	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
	/* Socket identifier cached from eventdev */
	int socket_id;
	/* Per adapter EAL service */
	uint32_t service_id;
	/* Adapter started flag */
	uint8_t rxa_started;
	/* Adapter ID */
	uint8_t id;
} __rte_cache_aligned;
187 
/* Per eth device */
struct eth_device_info {
	/* Ethernet device this entry describes */
	struct rte_eth_dev *dev;
	/* Per Rx queue state indexed by Rx queue ID, NULL if no queue of
	 * this device has been added
	 */
	struct eth_rx_queue_info *rx_queue;
	/* Rx callback */
	rte_event_eth_rx_adapter_cb_fn cb_fn;
	/* Rx callback argument */
	void *cb_arg;
	/* Set if ethdev->eventdev packet transfer uses a
	 * hardware mechanism
	 */
	uint8_t internal_event_port;
	/* Set if the adapter is processing rx queues for
	 * this eth device and packet processing has been
	 * started, allows for the code to know if the PMD
	 * rx_adapter_stop callback needs to be invoked
	 */
	uint8_t dev_rx_started;
	/* Number of queues added for this device */
	uint16_t nb_dev_queues;
	/* Number of poll based queues
	 * If nb_rx_poll > 0, the start callback will
	 * be invoked if not already invoked
	 */
	uint16_t nb_rx_poll;
	/* Number of interrupt based queues
	 * If nb_rx_intr > 0, the start callback will
	 * be invoked if not already invoked.
	 */
	uint16_t nb_rx_intr;
	/* Number of queues that use the shared interrupt */
	uint16_t nb_shared_intr;
	/* sum(wrr(q)) for all queues within the device
	 * useful when deleting all device queues
	 */
	uint32_t wrr_len;
	/* Intr based queue index to start polling from, this is used
	 * if the number of shared interrupts is non-zero
	 */
	uint16_t next_q_idx;
	/* Intr based queue indices */
	uint16_t *intr_queue;
	/* device generates per Rx queue interrupt for queue index
	 * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
	 */
	int multi_intr_cap;
	/* shared interrupt enabled, cleared when an interrupt on the
	 * shared vector is queued to the interrupt ring
	 */
	int shared_intr_enabled;
};
237 
/* Per Rx queue */
struct eth_rx_queue_info {
	int queue_enabled;	/* True if added */
	int intr_enabled;	/* Cleared when an interrupt for this queue
				 * is queued to the interrupt ring
				 */
	uint8_t ena_vector;	/* Non-zero if mbufs are aggregated into
				 * event vectors
				 */
	uint16_t wt;		/* Polling weight, 0 for an interrupt
				 * mode queue
				 */
	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
	uint64_t event;		/* Event word copied to each generated event */
	/* Vector aggregation state, used when ena_vector is set */
	struct eth_rx_vector_data vector_data;
	/* Event buffer used when per queue event buffers are enabled */
	struct eth_event_enqueue_buffer *event_buf;
	/* use adapter stats struct for queue level stats,
	 * as same stats need to be updated for adapter and queue
	 */
	struct rte_event_eth_rx_adapter_stats *stats;
};
253 
/* Array of adapter instance pointers */
static struct event_eth_rx_adapter **event_eth_rx_adapter;

/* Enable dynamic timestamp field in mbuf */
/* ol_flags bit that marks an mbuf as already carrying an Rx timestamp */
static uint64_t event_eth_rx_timestamp_dynflag;
/* Offset of the timestamp dynamic field within the mbuf; starts at -1 */
static int event_eth_rx_timestamp_dynfield_offset = -1;
259 
260 static inline rte_mbuf_timestamp_t *
261 rxa_timestamp_dynfield(struct rte_mbuf *mbuf)
262 {
263 	return RTE_MBUF_DYNFIELD(mbuf,
264 		event_eth_rx_timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
265 }
266 
267 static inline int
268 rxa_validate_id(uint8_t id)
269 {
270 	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
271 }
272 
273 static inline struct eth_event_enqueue_buffer *
274 rxa_event_buf_get(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
275 		  uint16_t rx_queue_id,
276 		  struct rte_event_eth_rx_adapter_stats **stats)
277 {
278 	if (rx_adapter->use_queue_event_buf) {
279 		struct eth_device_info *dev_info =
280 			&rx_adapter->eth_devices[eth_dev_id];
281 		*stats = dev_info->rx_queue[rx_queue_id].stats;
282 		return dev_info->rx_queue[rx_queue_id].event_buf;
283 	} else {
284 		*stats = &rx_adapter->stats;
285 		return &rx_adapter->event_enqueue_buffer;
286 	}
287 }
288 
/* Log an error and return retval from the enclosing function when id fails
 * rxa_validate_id().
 */
#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
	if (!rxa_validate_id(id)) { \
		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
		return retval; \
	} \
} while (0)
295 
296 static inline int
297 rxa_sw_adapter_queue_count(struct event_eth_rx_adapter *rx_adapter)
298 {
299 	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
300 }
301 
/* Greatest common divisor of a and b (Euclid's algorithm).
 *
 * Computed iteratively. A zero operand is handled without dividing by it:
 * gcd(a, 0) == a, gcd(0, b) == b and gcd(0, 0) == 0.
 */
static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
{
	while (b != 0) {
		uint16_t r = a % b;

		a = b;
		b = r;
	}

	return a;
}
309 
310 /* Returns the next queue in the polling sequence
311  *
312  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
313  */
314 static int
315 rxa_wrr_next(struct event_eth_rx_adapter *rx_adapter, unsigned int n, int *cw,
316 	     struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
317 	     uint16_t gcd, int prev)
318 {
319 	int i = prev;
320 	uint16_t w;
321 
322 	while (1) {
323 		uint16_t q;
324 		uint16_t d;
325 
326 		i = (i + 1) % n;
327 		if (i == 0) {
328 			*cw = *cw - gcd;
329 			if (*cw <= 0)
330 				*cw = max_wt;
331 		}
332 
333 		q = eth_rx_poll[i].eth_rx_qid;
334 		d = eth_rx_poll[i].eth_dev_id;
335 		w = rx_adapter->eth_devices[d].rx_queue[q].wt;
336 
337 		if ((int)w >= *cw)
338 			return i;
339 	}
340 }
341 
342 static inline int
343 rxa_shared_intr(struct eth_device_info *dev_info,
344 	int rx_queue_id)
345 {
346 	int multi_intr_cap;
347 
348 	if (dev_info->dev->intr_handle == NULL)
349 		return 0;
350 
351 	multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
352 	return !multi_intr_cap ||
353 		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
354 }
355 
356 static inline int
357 rxa_intr_queue(struct eth_device_info *dev_info,
358 	int rx_queue_id)
359 {
360 	struct eth_rx_queue_info *queue_info;
361 
362 	queue_info = &dev_info->rx_queue[rx_queue_id];
363 	return dev_info->rx_queue &&
364 		!dev_info->internal_event_port &&
365 		queue_info->queue_enabled && queue_info->wt == 0;
366 }
367 
368 static inline int
369 rxa_polled_queue(struct eth_device_info *dev_info,
370 	int rx_queue_id)
371 {
372 	struct eth_rx_queue_info *queue_info;
373 
374 	queue_info = &dev_info->rx_queue[rx_queue_id];
375 	return !dev_info->internal_event_port &&
376 		dev_info->rx_queue &&
377 		queue_info->queue_enabled && queue_info->wt != 0;
378 }
379 
380 /* Calculate change in number of vectors after Rx queue ID is add/deleted */
381 static int
382 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
383 {
384 	uint16_t i;
385 	int n, s;
386 	uint16_t nbq;
387 
388 	nbq = dev_info->dev->data->nb_rx_queues;
389 	n = 0; /* non shared count */
390 	s = 0; /* shared count */
391 
392 	if (rx_queue_id == -1) {
393 		for (i = 0; i < nbq; i++) {
394 			if (!rxa_shared_intr(dev_info, i))
395 				n += add ? !rxa_intr_queue(dev_info, i) :
396 					rxa_intr_queue(dev_info, i);
397 			else
398 				s += add ? !rxa_intr_queue(dev_info, i) :
399 					rxa_intr_queue(dev_info, i);
400 		}
401 
402 		if (s > 0) {
403 			if ((add && dev_info->nb_shared_intr == 0) ||
404 				(!add && dev_info->nb_shared_intr))
405 				n += 1;
406 		}
407 	} else {
408 		if (!rxa_shared_intr(dev_info, rx_queue_id))
409 			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
410 				rxa_intr_queue(dev_info, rx_queue_id);
411 		else
412 			n = add ? !dev_info->nb_shared_intr :
413 				dev_info->nb_shared_intr == 1;
414 	}
415 
416 	return add ? n : -n;
417 }
418 
419 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
420  */
421 static void
422 rxa_calc_nb_post_intr_del(struct event_eth_rx_adapter *rx_adapter,
423 			  struct eth_device_info *dev_info, int rx_queue_id,
424 			  uint32_t *nb_rx_intr)
425 {
426 	uint32_t intr_diff;
427 
428 	if (rx_queue_id == -1)
429 		intr_diff = dev_info->nb_rx_intr;
430 	else
431 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
432 
433 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
434 }
435 
436 /* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
437  * interrupt queues could currently be poll mode Rx queues
438  */
439 static void
440 rxa_calc_nb_post_add_intr(struct event_eth_rx_adapter *rx_adapter,
441 			  struct eth_device_info *dev_info, int rx_queue_id,
442 			  uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
443 			  uint32_t *nb_wrr)
444 {
445 	uint32_t intr_diff;
446 	uint32_t poll_diff;
447 	uint32_t wrr_len_diff;
448 
449 	if (rx_queue_id == -1) {
450 		intr_diff = dev_info->dev->data->nb_rx_queues -
451 						dev_info->nb_rx_intr;
452 		poll_diff = dev_info->nb_rx_poll;
453 		wrr_len_diff = dev_info->wrr_len;
454 	} else {
455 		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
456 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
457 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
458 					0;
459 	}
460 
461 	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
462 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
463 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
464 }
465 
466 /* Calculate size of the eth_rx_poll and wrr_sched arrays
467  * after deleting poll mode rx queues
468  */
469 static void
470 rxa_calc_nb_post_poll_del(struct event_eth_rx_adapter *rx_adapter,
471 			  struct eth_device_info *dev_info, int rx_queue_id,
472 			  uint32_t *nb_rx_poll, uint32_t *nb_wrr)
473 {
474 	uint32_t poll_diff;
475 	uint32_t wrr_len_diff;
476 
477 	if (rx_queue_id == -1) {
478 		poll_diff = dev_info->nb_rx_poll;
479 		wrr_len_diff = dev_info->wrr_len;
480 	} else {
481 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
482 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
483 					0;
484 	}
485 
486 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
487 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
488 }
489 
490 /* Calculate nb_rx_* after adding poll mode rx queues
491  */
492 static void
493 rxa_calc_nb_post_add_poll(struct event_eth_rx_adapter *rx_adapter,
494 			  struct eth_device_info *dev_info, int rx_queue_id,
495 			  uint16_t wt, uint32_t *nb_rx_poll,
496 			  uint32_t *nb_rx_intr, uint32_t *nb_wrr)
497 {
498 	uint32_t intr_diff;
499 	uint32_t poll_diff;
500 	uint32_t wrr_len_diff;
501 
502 	if (rx_queue_id == -1) {
503 		intr_diff = dev_info->nb_rx_intr;
504 		poll_diff = dev_info->dev->data->nb_rx_queues -
505 						dev_info->nb_rx_poll;
506 		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
507 				- dev_info->wrr_len;
508 	} else {
509 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
510 		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
511 		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
512 				wt - dev_info->rx_queue[rx_queue_id].wt :
513 				wt;
514 	}
515 
516 	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
517 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
518 	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
519 }
520 
/* Calculate nb_rx_* after adding rx_queue_id
 *
 * A zero weight selects interrupt mode, any other weight poll mode.
 */
static void
rxa_calc_nb_post_add(struct event_eth_rx_adapter *rx_adapter,
		     struct eth_device_info *dev_info, int rx_queue_id,
		     uint16_t wt, uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
		     uint32_t *nb_wrr)
{
	if (wt == 0) {
		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
					nb_rx_poll, nb_rx_intr, nb_wrr);
		return;
	}

	rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
				wt, nb_rx_poll, nb_rx_intr, nb_wrr);
}
535 
/* Calculate nb_rx_* after deleting rx_queue_id
 *
 * The interrupt count and the poll/WRR counts are computed independently
 * of each other.
 */
static void
rxa_calc_nb_post_del(struct event_eth_rx_adapter *rx_adapter,
		     struct eth_device_info *dev_info, int rx_queue_id,
		     uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
		     uint32_t *nb_wrr)
{
	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
				nb_rx_intr);
	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id,
				nb_rx_poll, nb_wrr);
}
548 
549 /*
550  * Allocate the rx_poll array
551  */
552 static struct eth_rx_poll_entry *
553 rxa_alloc_poll(struct event_eth_rx_adapter *rx_adapter, uint32_t num_rx_polled)
554 {
555 	size_t len;
556 
557 	len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
558 							RTE_CACHE_LINE_SIZE);
559 	return  rte_zmalloc_socket(rx_adapter->mem_name,
560 				len,
561 				RTE_CACHE_LINE_SIZE,
562 				rx_adapter->socket_id);
563 }
564 
565 /*
566  * Allocate the WRR array
567  */
568 static uint32_t *
569 rxa_alloc_wrr(struct event_eth_rx_adapter *rx_adapter, int nb_wrr)
570 {
571 	size_t len;
572 
573 	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
574 			RTE_CACHE_LINE_SIZE);
575 	return  rte_zmalloc_socket(rx_adapter->mem_name,
576 				len,
577 				RTE_CACHE_LINE_SIZE,
578 				rx_adapter->socket_id);
579 }
580 
/* Allocate the poll entry array and the WRR schedule array together.
 *
 * When nb_poll is 0 nothing is allocated and both outputs are set to NULL.
 *
 * Returns 0 on success, -ENOMEM if either allocation fails. On failure
 * both *rx_poll and *wrr_sched are NULL, so the caller can pass them to
 * rte_free() without risking a double free.
 */
static int
rxa_alloc_poll_arrays(struct event_eth_rx_adapter *rx_adapter, uint32_t nb_poll,
		      uint32_t nb_wrr, struct eth_rx_poll_entry **rx_poll,
		      uint32_t **wrr_sched)
{

	if (nb_poll == 0) {
		*rx_poll = NULL;
		*wrr_sched = NULL;
		return 0;
	}

	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
	if (*rx_poll == NULL) {
		*wrr_sched = NULL;
		return -ENOMEM;
	}

	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
	if (*wrr_sched == NULL) {
		rte_free(*rx_poll);
		/* Do not hand a dangling pointer back to the caller */
		*rx_poll = NULL;
		return -ENOMEM;
	}
	return 0;
}
606 
607 /* Precalculate WRR polling sequence for all queues in rx_adapter */
608 static void
609 rxa_calc_wrr_sequence(struct event_eth_rx_adapter *rx_adapter,
610 		      struct eth_rx_poll_entry *rx_poll, uint32_t *rx_wrr)
611 {
612 	uint16_t d;
613 	uint16_t q;
614 	unsigned int i;
615 	int prev = -1;
616 	int cw = -1;
617 
618 	/* Initialize variables for calculation of wrr schedule */
619 	uint16_t max_wrr_pos = 0;
620 	unsigned int poll_q = 0;
621 	uint16_t max_wt = 0;
622 	uint16_t gcd = 0;
623 
624 	if (rx_poll == NULL)
625 		return;
626 
627 	/* Generate array of all queues to poll, the size of this
628 	 * array is poll_q
629 	 */
630 	RTE_ETH_FOREACH_DEV(d) {
631 		uint16_t nb_rx_queues;
632 		struct eth_device_info *dev_info =
633 				&rx_adapter->eth_devices[d];
634 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
635 		if (dev_info->rx_queue == NULL)
636 			continue;
637 		if (dev_info->internal_event_port)
638 			continue;
639 		dev_info->wrr_len = 0;
640 		for (q = 0; q < nb_rx_queues; q++) {
641 			struct eth_rx_queue_info *queue_info =
642 				&dev_info->rx_queue[q];
643 			uint16_t wt;
644 
645 			if (!rxa_polled_queue(dev_info, q))
646 				continue;
647 			wt = queue_info->wt;
648 			rx_poll[poll_q].eth_dev_id = d;
649 			rx_poll[poll_q].eth_rx_qid = q;
650 			max_wrr_pos += wt;
651 			dev_info->wrr_len += wt;
652 			max_wt = RTE_MAX(max_wt, wt);
653 			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
654 			poll_q++;
655 		}
656 	}
657 
658 	/* Generate polling sequence based on weights */
659 	prev = -1;
660 	cw = -1;
661 	for (i = 0; i < max_wrr_pos; i++) {
662 		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
663 				     rx_poll, max_wt, gcd, prev);
664 		prev = rx_wrr[i];
665 	}
666 }
667 
668 static inline void
669 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
670 	struct rte_ipv6_hdr **ipv6_hdr)
671 {
672 	struct rte_ether_hdr *eth_hdr =
673 		rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
674 	struct rte_vlan_hdr *vlan_hdr;
675 
676 	*ipv4_hdr = NULL;
677 	*ipv6_hdr = NULL;
678 
679 	switch (eth_hdr->ether_type) {
680 	case RTE_BE16(RTE_ETHER_TYPE_IPV4):
681 		*ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
682 		break;
683 
684 	case RTE_BE16(RTE_ETHER_TYPE_IPV6):
685 		*ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
686 		break;
687 
688 	case RTE_BE16(RTE_ETHER_TYPE_VLAN):
689 		vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
690 		switch (vlan_hdr->eth_proto) {
691 		case RTE_BE16(RTE_ETHER_TYPE_IPV4):
692 			*ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
693 			break;
694 		case RTE_BE16(RTE_ETHER_TYPE_IPV6):
695 			*ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
696 			break;
697 		default:
698 			break;
699 		}
700 		break;
701 
702 	default:
703 		break;
704 	}
705 }
706 
707 /* Calculate RSS hash for IPv4/6 */
708 static inline uint32_t
709 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
710 {
711 	uint32_t input_len;
712 	void *tuple;
713 	struct rte_ipv4_tuple ipv4_tuple;
714 	struct rte_ipv6_tuple ipv6_tuple;
715 	struct rte_ipv4_hdr *ipv4_hdr;
716 	struct rte_ipv6_hdr *ipv6_hdr;
717 
718 	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
719 
720 	if (ipv4_hdr) {
721 		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
722 		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
723 		tuple = &ipv4_tuple;
724 		input_len = RTE_THASH_V4_L3_LEN;
725 	} else if (ipv6_hdr) {
726 		rte_thash_load_v6_addrs(ipv6_hdr,
727 					(union rte_thash_tuple *)&ipv6_tuple);
728 		tuple = &ipv6_tuple;
729 		input_len = RTE_THASH_V6_L3_LEN;
730 	} else
731 		return 0;
732 
733 	return rte_softrss_be(tuple, input_len, rss_key_be);
734 }
735 
736 static inline int
737 rxa_enq_blocked(struct event_eth_rx_adapter *rx_adapter)
738 {
739 	return !!rx_adapter->enq_block_count;
740 }
741 
742 static inline void
743 rxa_enq_block_start_ts(struct event_eth_rx_adapter *rx_adapter)
744 {
745 	if (rx_adapter->rx_enq_block_start_ts)
746 		return;
747 
748 	rx_adapter->enq_block_count++;
749 	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
750 		return;
751 
752 	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
753 }
754 
755 static inline void
756 rxa_enq_block_end_ts(struct event_eth_rx_adapter *rx_adapter,
757 		     struct rte_event_eth_rx_adapter_stats *stats)
758 {
759 	if (unlikely(!stats->rx_enq_start_ts))
760 		stats->rx_enq_start_ts = rte_get_tsc_cycles();
761 
762 	if (likely(!rxa_enq_blocked(rx_adapter)))
763 		return;
764 
765 	rx_adapter->enq_block_count = 0;
766 	if (rx_adapter->rx_enq_block_start_ts) {
767 		stats->rx_enq_end_ts = rte_get_tsc_cycles();
768 		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
769 		    rx_adapter->rx_enq_block_start_ts;
770 		rx_adapter->rx_enq_block_start_ts = 0;
771 	}
772 }
773 
774 /* Enqueue buffered events to event device */
775 static inline uint16_t
776 rxa_flush_event_buffer(struct event_eth_rx_adapter *rx_adapter,
777 		       struct eth_event_enqueue_buffer *buf,
778 		       struct rte_event_eth_rx_adapter_stats *stats)
779 {
780 	uint16_t count = buf->last ? buf->last - buf->head : buf->count;
781 
782 	if (!count)
783 		return 0;
784 
785 	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
786 					rx_adapter->event_port_id,
787 					&buf->events[buf->head],
788 					count);
789 	if (n != count)
790 		stats->rx_enq_retry++;
791 
792 	buf->head += n;
793 
794 	if (buf->last && n == count) {
795 		uint16_t n1;
796 
797 		n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
798 					rx_adapter->event_port_id,
799 					&buf->events[0],
800 					buf->tail);
801 
802 		if (n1 != buf->tail)
803 			stats->rx_enq_retry++;
804 
805 		buf->last = 0;
806 		buf->head = n1;
807 		buf->last_mask = 0;
808 		n += n1;
809 	}
810 
811 	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
812 		rxa_enq_block_start_ts(rx_adapter);
813 
814 	buf->count -= n;
815 	stats->rx_enq_count += n;
816 
817 	return n;
818 }
819 
820 static inline void
821 rxa_init_vector(struct event_eth_rx_adapter *rx_adapter,
822 		struct eth_rx_vector_data *vec)
823 {
824 	vec->vector_ev->nb_elem = 0;
825 	vec->vector_ev->port = vec->port;
826 	vec->vector_ev->queue = vec->queue;
827 	vec->vector_ev->attr_valid = true;
828 	TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
829 }
830 
831 static inline uint16_t
832 rxa_create_event_vector(struct event_eth_rx_adapter *rx_adapter,
833 			struct eth_rx_queue_info *queue_info,
834 			struct eth_event_enqueue_buffer *buf,
835 			struct rte_mbuf **mbufs, uint16_t num)
836 {
837 	struct rte_event *ev = &buf->events[buf->count];
838 	struct eth_rx_vector_data *vec;
839 	uint16_t filled, space, sz;
840 
841 	filled = 0;
842 	vec = &queue_info->vector_data;
843 
844 	if (vec->vector_ev == NULL) {
845 		if (rte_mempool_get(vec->vector_pool,
846 				    (void **)&vec->vector_ev) < 0) {
847 			rte_pktmbuf_free_bulk(mbufs, num);
848 			return 0;
849 		}
850 		rxa_init_vector(rx_adapter, vec);
851 	}
852 	while (num) {
853 		if (vec->vector_ev->nb_elem == vec->max_vector_count) {
854 			/* Event ready. */
855 			ev->event = vec->event;
856 			ev->vec = vec->vector_ev;
857 			ev++;
858 			filled++;
859 			vec->vector_ev = NULL;
860 			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
861 			if (rte_mempool_get(vec->vector_pool,
862 					    (void **)&vec->vector_ev) < 0) {
863 				rte_pktmbuf_free_bulk(mbufs, num);
864 				return 0;
865 			}
866 			rxa_init_vector(rx_adapter, vec);
867 		}
868 
869 		space = vec->max_vector_count - vec->vector_ev->nb_elem;
870 		sz = num > space ? space : num;
871 		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
872 		       sizeof(void *) * sz);
873 		vec->vector_ev->nb_elem += sz;
874 		num -= sz;
875 		mbufs += sz;
876 		vec->ts = rte_rdtsc();
877 	}
878 
879 	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
880 		ev->event = vec->event;
881 		ev->vec = vec->vector_ev;
882 		ev++;
883 		filled++;
884 		vec->vector_ev = NULL;
885 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
886 	}
887 
888 	return filled;
889 }
890 
891 static inline void
892 rxa_buffer_mbufs(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
893 		 uint16_t rx_queue_id, struct rte_mbuf **mbufs, uint16_t num,
894 		 struct eth_event_enqueue_buffer *buf,
895 		 struct rte_event_eth_rx_adapter_stats *stats)
896 {
897 	uint32_t i;
898 	struct eth_device_info *dev_info =
899 					&rx_adapter->eth_devices[eth_dev_id];
900 	struct eth_rx_queue_info *eth_rx_queue_info =
901 					&dev_info->rx_queue[rx_queue_id];
902 	uint16_t new_tail = buf->tail;
903 	uint64_t event = eth_rx_queue_info->event;
904 	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
905 	struct rte_mbuf *m = mbufs[0];
906 	uint32_t rss_mask;
907 	uint32_t rss;
908 	int do_rss;
909 	uint16_t nb_cb;
910 	uint16_t dropped;
911 	uint64_t ts, ts_mask;
912 
913 	if (!eth_rx_queue_info->ena_vector) {
914 		ts = m->ol_flags & event_eth_rx_timestamp_dynflag ?
915 						0 : rte_get_tsc_cycles();
916 
917 		/* 0xffff ffff ffff ffff if RTE_MBUF_F_RX_TIMESTAMP is set,
918 		 * otherwise 0
919 		 */
920 		ts_mask = (uint64_t)(!(m->ol_flags &
921 				       event_eth_rx_timestamp_dynflag)) - 1ULL;
922 
923 		/* 0xffff ffff if RTE_MBUF_F_RX_RSS_HASH is set, otherwise 0 */
924 		rss_mask = ~(((m->ol_flags & RTE_MBUF_F_RX_RSS_HASH) != 0) - 1);
925 		do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
926 		for (i = 0; i < num; i++) {
927 			struct rte_event *ev;
928 
929 			m = mbufs[i];
930 			*rxa_timestamp_dynfield(m) = ts |
931 					(*rxa_timestamp_dynfield(m) & ts_mask);
932 
933 			ev = &buf->events[new_tail];
934 
935 			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
936 				     : m->hash.rss;
937 			ev->event = event;
938 			ev->flow_id = (rss & ~flow_id_mask) |
939 				      (ev->flow_id & flow_id_mask);
940 			ev->mbuf = m;
941 			new_tail++;
942 		}
943 	} else {
944 		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
945 					      buf, mbufs, num);
946 	}
947 
948 	if (num && dev_info->cb_fn) {
949 
950 		dropped = 0;
951 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
952 				       buf->last |
953 				       (buf->events_size & ~buf->last_mask),
954 				       buf->count >= BATCH_SIZE ?
955 						buf->count - BATCH_SIZE : 0,
956 				       &buf->events[buf->tail],
957 				       num,
958 				       dev_info->cb_arg,
959 				       &dropped);
960 		if (unlikely(nb_cb > num))
961 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
962 				nb_cb, num);
963 		else
964 			num = nb_cb;
965 		if (dropped)
966 			stats->rx_dropped += dropped;
967 	}
968 
969 	buf->count += num;
970 	buf->tail += num;
971 }
972 
973 static inline bool
974 rxa_pkt_buf_available(struct eth_event_enqueue_buffer *buf)
975 {
976 	uint32_t nb_req = buf->tail + BATCH_SIZE;
977 
978 	if (!buf->last) {
979 		if (nb_req <= buf->events_size)
980 			return true;
981 
982 		if (buf->head >= BATCH_SIZE) {
983 			buf->last_mask = ~0;
984 			buf->last = buf->tail;
985 			buf->tail = 0;
986 			return true;
987 		}
988 	}
989 
990 	return nb_req <= buf->head;
991 }
992 
993 /* Enqueue packets from  <port, q>  to event buffer */
994 static inline uint32_t
995 rxa_eth_rx(struct event_eth_rx_adapter *rx_adapter, uint16_t port_id,
996 	   uint16_t queue_id, uint32_t rx_count, uint32_t max_rx,
997 	   int *rxq_empty, struct eth_event_enqueue_buffer *buf,
998 	   struct rte_event_eth_rx_adapter_stats *stats)
999 {
1000 	struct rte_mbuf *mbufs[BATCH_SIZE];
1001 	uint16_t n;
1002 	uint32_t nb_rx = 0;
1003 	uint32_t nb_flushed = 0;
1004 
1005 	if (rxq_empty)
1006 		*rxq_empty = 0;
1007 	/* Don't do a batch dequeue from the rx queue if there isn't
1008 	 * enough space in the enqueue buffer.
1009 	 */
1010 	while (rxa_pkt_buf_available(buf)) {
1011 		if (buf->count >= BATCH_SIZE)
1012 			nb_flushed +=
1013 				rxa_flush_event_buffer(rx_adapter, buf, stats);
1014 
1015 		stats->rx_poll_count++;
1016 		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
1017 		if (unlikely(!n)) {
1018 			if (rxq_empty)
1019 				*rxq_empty = 1;
1020 			break;
1021 		}
1022 		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n, buf,
1023 				 stats);
1024 		nb_rx += n;
1025 		if (rx_count + nb_rx > max_rx)
1026 			break;
1027 	}
1028 
1029 	if (buf->count > 0)
1030 		nb_flushed += rxa_flush_event_buffer(rx_adapter, buf, stats);
1031 
1032 	stats->rx_packets += nb_rx;
1033 	if (nb_flushed == 0)
1034 		rte_event_maintain(rx_adapter->eventdev_id,
1035 				   rx_adapter->event_port_id, 0);
1036 
1037 	return nb_rx;
1038 }
1039 
1040 static inline void
1041 rxa_intr_ring_enqueue(struct event_eth_rx_adapter *rx_adapter, void *data)
1042 {
1043 	uint16_t port_id;
1044 	uint16_t queue;
1045 	int err;
1046 	union queue_data qd;
1047 	struct eth_device_info *dev_info;
1048 	struct eth_rx_queue_info *queue_info;
1049 	int *intr_enabled;
1050 
1051 	qd.ptr = data;
1052 	port_id = qd.port;
1053 	queue = qd.queue;
1054 
1055 	dev_info = &rx_adapter->eth_devices[port_id];
1056 	queue_info = &dev_info->rx_queue[queue];
1057 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1058 	if (rxa_shared_intr(dev_info, queue))
1059 		intr_enabled = &dev_info->shared_intr_enabled;
1060 	else
1061 		intr_enabled = &queue_info->intr_enabled;
1062 
1063 	if (*intr_enabled) {
1064 		*intr_enabled = 0;
1065 		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
1066 		/* Entry should always be available.
1067 		 * The ring size equals the maximum number of interrupt
1068 		 * vectors supported (an interrupt vector is shared in
1069 		 * case of shared interrupts)
1070 		 */
1071 		if (err)
1072 			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
1073 				" to ring: %s", strerror(-err));
1074 		else
1075 			rte_eth_dev_rx_intr_disable(port_id, queue);
1076 	}
1077 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1078 }
1079 
1080 static int
1081 rxa_intr_ring_check_avail(struct event_eth_rx_adapter *rx_adapter,
1082 			  uint32_t num_intr_vec)
1083 {
1084 	if (rx_adapter->num_intr_vec + num_intr_vec >
1085 				RTE_EVENT_ETH_INTR_RING_SIZE) {
1086 		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1087 		" %d needed %d limit %d", rx_adapter->num_intr_vec,
1088 		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1089 		return -ENOSPC;
1090 	}
1091 
1092 	return 0;
1093 }
1094 
1095 /* Delete entries for (dev, queue) from the interrupt ring */
1096 static void
1097 rxa_intr_ring_del_entries(struct event_eth_rx_adapter *rx_adapter,
1098 			  struct eth_device_info *dev_info,
1099 			  uint16_t rx_queue_id)
1100 {
1101 	int i, n;
1102 	union queue_data qd;
1103 
1104 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1105 
1106 	n = rte_ring_count(rx_adapter->intr_ring);
1107 	for (i = 0; i < n; i++) {
1108 		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1109 		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1110 			if (qd.port == dev_info->dev->data->port_id &&
1111 				qd.queue == rx_queue_id)
1112 				continue;
1113 		} else {
1114 			if (qd.port == dev_info->dev->data->port_id)
1115 				continue;
1116 		}
1117 		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1118 	}
1119 
1120 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1121 }
1122 
1123 /* pthread callback handling interrupt mode receive queues
1124  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1125  * interrupting queue to the adapter's ring buffer for interrupt events.
1126  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1127  * the adapter service function.
1128  */
1129 static void *
1130 rxa_intr_thread(void *arg)
1131 {
1132 	struct event_eth_rx_adapter *rx_adapter = arg;
1133 	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1134 	int n, i;
1135 
1136 	while (1) {
1137 		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1138 				RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1139 		if (unlikely(n < 0))
1140 			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1141 					n);
1142 		for (i = 0; i < n; i++) {
1143 			rxa_intr_ring_enqueue(rx_adapter,
1144 					epoll_events[i].epdata.data);
1145 		}
1146 	}
1147 
1148 	return NULL;
1149 }
1150 
1151 /* Dequeue <port, q> from interrupt ring and enqueue received
1152  * mbufs to eventdev
1153  */
1154 static inline void
1155 rxa_intr_ring_dequeue(struct event_eth_rx_adapter *rx_adapter)
1156 {
1157 	uint32_t n;
1158 	uint32_t nb_rx = 0;
1159 	int rxq_empty;
1160 	struct eth_event_enqueue_buffer *buf;
1161 	struct rte_event_eth_rx_adapter_stats *stats;
1162 	rte_spinlock_t *ring_lock;
1163 	uint8_t max_done = 0;
1164 
1165 	if (rx_adapter->num_rx_intr == 0)
1166 		return;
1167 
1168 	if (rte_ring_count(rx_adapter->intr_ring) == 0
1169 		&& !rx_adapter->qd_valid)
1170 		return;
1171 
1172 	buf = &rx_adapter->event_enqueue_buffer;
1173 	stats = &rx_adapter->stats;
1174 	ring_lock = &rx_adapter->intr_ring_lock;
1175 
1176 	if (buf->count >= BATCH_SIZE)
1177 		rxa_flush_event_buffer(rx_adapter, buf, stats);
1178 
1179 	while (rxa_pkt_buf_available(buf)) {
1180 		struct eth_device_info *dev_info;
1181 		uint16_t port;
1182 		uint16_t queue;
1183 		union queue_data qd  = rx_adapter->qd;
1184 		int err;
1185 
1186 		if (!rx_adapter->qd_valid) {
1187 			struct eth_rx_queue_info *queue_info;
1188 
1189 			rte_spinlock_lock(ring_lock);
1190 			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1191 			if (err) {
1192 				rte_spinlock_unlock(ring_lock);
1193 				break;
1194 			}
1195 
1196 			port = qd.port;
1197 			queue = qd.queue;
1198 			rx_adapter->qd = qd;
1199 			rx_adapter->qd_valid = 1;
1200 			dev_info = &rx_adapter->eth_devices[port];
1201 			if (rxa_shared_intr(dev_info, queue))
1202 				dev_info->shared_intr_enabled = 1;
1203 			else {
1204 				queue_info = &dev_info->rx_queue[queue];
1205 				queue_info->intr_enabled = 1;
1206 			}
1207 			rte_eth_dev_rx_intr_enable(port, queue);
1208 			rte_spinlock_unlock(ring_lock);
1209 		} else {
1210 			port = qd.port;
1211 			queue = qd.queue;
1212 
1213 			dev_info = &rx_adapter->eth_devices[port];
1214 		}
1215 
1216 		if (rxa_shared_intr(dev_info, queue)) {
1217 			uint16_t i;
1218 			uint16_t nb_queues;
1219 
1220 			nb_queues = dev_info->dev->data->nb_rx_queues;
1221 			n = 0;
1222 			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1223 				uint8_t enq_buffer_full;
1224 
1225 				if (!rxa_intr_queue(dev_info, i))
1226 					continue;
1227 				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1228 					rx_adapter->max_nb_rx,
1229 					&rxq_empty, buf, stats);
1230 				nb_rx += n;
1231 
1232 				enq_buffer_full = !rxq_empty && n == 0;
1233 				max_done = nb_rx > rx_adapter->max_nb_rx;
1234 
1235 				if (enq_buffer_full || max_done) {
1236 					dev_info->next_q_idx = i;
1237 					goto done;
1238 				}
1239 			}
1240 
1241 			rx_adapter->qd_valid = 0;
1242 
1243 			/* Reinitialize for next interrupt */
1244 			dev_info->next_q_idx = dev_info->multi_intr_cap ?
1245 						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1246 						0;
1247 		} else {
1248 			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1249 				rx_adapter->max_nb_rx,
1250 				&rxq_empty, buf, stats);
1251 			rx_adapter->qd_valid = !rxq_empty;
1252 			nb_rx += n;
1253 			if (nb_rx > rx_adapter->max_nb_rx)
1254 				break;
1255 		}
1256 	}
1257 
1258 done:
1259 	rx_adapter->stats.rx_intr_packets += nb_rx;
1260 }
1261 
1262 /*
1263  * Polls receive queues added to the event adapter and enqueues received
1264  * packets to the event device.
1265  *
1266  * The receive code enqueues initially to a temporary buffer, the
1267  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
1268  *
1269  * If there isn't space available in the temporary buffer, packets from the
1270  * Rx queue aren't dequeued from the eth device, this back pressures the
1271  * eth device, in virtual device environments this back pressure is relayed to
1272  * the hypervisor's switching layer where adjustments can be made to deal with
1273  * it.
1274  */
1275 static inline void
1276 rxa_poll(struct event_eth_rx_adapter *rx_adapter)
1277 {
1278 	uint32_t num_queue;
1279 	uint32_t nb_rx = 0;
1280 	struct eth_event_enqueue_buffer *buf = NULL;
1281 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1282 	uint32_t wrr_pos;
1283 	uint32_t max_nb_rx;
1284 
1285 	wrr_pos = rx_adapter->wrr_pos;
1286 	max_nb_rx = rx_adapter->max_nb_rx;
1287 
1288 	/* Iterate through a WRR sequence */
1289 	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1290 		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1291 		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1292 		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1293 
1294 		buf = rxa_event_buf_get(rx_adapter, d, qid, &stats);
1295 
1296 		/* Don't do a batch dequeue from the rx queue if there isn't
1297 		 * enough space in the enqueue buffer.
1298 		 */
1299 		if (buf->count >= BATCH_SIZE)
1300 			rxa_flush_event_buffer(rx_adapter, buf, stats);
1301 		if (!rxa_pkt_buf_available(buf)) {
1302 			if (rx_adapter->use_queue_event_buf)
1303 				goto poll_next_entry;
1304 			else {
1305 				rx_adapter->wrr_pos = wrr_pos;
1306 				return;
1307 			}
1308 		}
1309 
1310 		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1311 				NULL, buf, stats);
1312 		if (nb_rx > max_nb_rx) {
1313 			rx_adapter->wrr_pos =
1314 				    (wrr_pos + 1) % rx_adapter->wrr_len;
1315 			break;
1316 		}
1317 
1318 poll_next_entry:
1319 		if (++wrr_pos == rx_adapter->wrr_len)
1320 			wrr_pos = 0;
1321 	}
1322 }
1323 
1324 static void
1325 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1326 {
1327 	struct event_eth_rx_adapter *rx_adapter = arg;
1328 	struct eth_event_enqueue_buffer *buf = NULL;
1329 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1330 	struct rte_event *ev;
1331 
1332 	buf = rxa_event_buf_get(rx_adapter, vec->port, vec->queue, &stats);
1333 
1334 	if (buf->count)
1335 		rxa_flush_event_buffer(rx_adapter, buf, stats);
1336 
1337 	if (vec->vector_ev->nb_elem == 0)
1338 		return;
1339 	ev = &buf->events[buf->count];
1340 
1341 	/* Event ready. */
1342 	ev->event = vec->event;
1343 	ev->vec = vec->vector_ev;
1344 	buf->count++;
1345 
1346 	vec->vector_ev = NULL;
1347 	vec->ts = 0;
1348 }
1349 
1350 static int
1351 rxa_service_func(void *args)
1352 {
1353 	struct event_eth_rx_adapter *rx_adapter = args;
1354 
1355 	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1356 		return 0;
1357 	if (!rx_adapter->rxa_started) {
1358 		rte_spinlock_unlock(&rx_adapter->rx_lock);
1359 		return 0;
1360 	}
1361 
1362 	if (rx_adapter->ena_vector) {
1363 		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1364 		    rx_adapter->vector_tmo_ticks) {
1365 			struct eth_rx_vector_data *vec;
1366 
1367 			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1368 				uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1369 
1370 				if (elapsed_time >= vec->vector_timeout_ticks) {
1371 					rxa_vector_expire(vec, rx_adapter);
1372 					TAILQ_REMOVE(&rx_adapter->vector_list,
1373 						     vec, next);
1374 				}
1375 			}
1376 			rx_adapter->prev_expiry_ts = rte_rdtsc();
1377 		}
1378 	}
1379 
1380 	rxa_intr_ring_dequeue(rx_adapter);
1381 	rxa_poll(rx_adapter);
1382 
1383 	rte_spinlock_unlock(&rx_adapter->rx_lock);
1384 
1385 	return 0;
1386 }
1387 
1388 static int
1389 rte_event_eth_rx_adapter_init(void)
1390 {
1391 	const char *name = RXA_ADAPTER_ARRAY;
1392 	const struct rte_memzone *mz;
1393 	unsigned int sz;
1394 
1395 	sz = sizeof(*event_eth_rx_adapter) *
1396 	    RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1397 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1398 
1399 	mz = rte_memzone_lookup(name);
1400 	if (mz == NULL) {
1401 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1402 						 RTE_CACHE_LINE_SIZE);
1403 		if (mz == NULL) {
1404 			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1405 					PRId32, rte_errno);
1406 			return -rte_errno;
1407 		}
1408 	}
1409 
1410 	event_eth_rx_adapter = mz->addr;
1411 	return 0;
1412 }
1413 
1414 static int
1415 rxa_memzone_lookup(void)
1416 {
1417 	const struct rte_memzone *mz;
1418 
1419 	if (event_eth_rx_adapter == NULL) {
1420 		mz = rte_memzone_lookup(RXA_ADAPTER_ARRAY);
1421 		if (mz == NULL)
1422 			return -ENOMEM;
1423 		event_eth_rx_adapter = mz->addr;
1424 	}
1425 
1426 	return 0;
1427 }
1428 
1429 static inline struct event_eth_rx_adapter *
1430 rxa_id_to_adapter(uint8_t id)
1431 {
1432 	return event_eth_rx_adapter ?
1433 		event_eth_rx_adapter[id] : NULL;
1434 }
1435 
1436 static int
1437 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1438 		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1439 {
1440 	int ret;
1441 	struct rte_eventdev *dev;
1442 	struct rte_event_dev_config dev_conf;
1443 	int started;
1444 	uint8_t port_id;
1445 	struct rte_event_port_conf *port_conf = arg;
1446 	struct event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1447 
1448 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
1449 	dev_conf = dev->data->dev_conf;
1450 
1451 	started = dev->data->dev_started;
1452 	if (started)
1453 		rte_event_dev_stop(dev_id);
1454 	port_id = dev_conf.nb_event_ports;
1455 	dev_conf.nb_event_ports += 1;
1456 	ret = rte_event_dev_configure(dev_id, &dev_conf);
1457 	if (ret) {
1458 		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1459 						dev_id);
1460 		if (started) {
1461 			if (rte_event_dev_start(dev_id))
1462 				return -EIO;
1463 		}
1464 		return ret;
1465 	}
1466 
1467 	ret = rte_event_port_setup(dev_id, port_id, port_conf);
1468 	if (ret) {
1469 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1470 					port_id);
1471 		return ret;
1472 	}
1473 
1474 	conf->event_port_id = port_id;
1475 	conf->max_nb_rx = 128;
1476 	if (started)
1477 		ret = rte_event_dev_start(dev_id);
1478 	rx_adapter->default_cb_arg = 1;
1479 	return ret;
1480 }
1481 
/*
 * Create the epoll instance used to wait for Rx interrupts.
 *
 * Returns the new file descriptor, or a negative errno value on failure.
 * Builds without LINUX defined always get -ENOTSUP, so that every
 * configuration returns a defined value.
 */
static int
rxa_epoll_create1(void)
{
#if defined(LINUX)
	int fd;

	fd = epoll_create1(EPOLL_CLOEXEC);
	return fd < 0 ? -errno : fd;
#else
	return -ENOTSUP;
#endif
}
1493 
1494 static int
1495 rxa_init_epd(struct event_eth_rx_adapter *rx_adapter)
1496 {
1497 	if (rx_adapter->epd != INIT_FD)
1498 		return 0;
1499 
1500 	rx_adapter->epd = rxa_epoll_create1();
1501 	if (rx_adapter->epd < 0) {
1502 		int err = rx_adapter->epd;
1503 		rx_adapter->epd = INIT_FD;
1504 		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1505 		return err;
1506 	}
1507 
1508 	return 0;
1509 }
1510 
1511 static int
1512 rxa_create_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1513 {
1514 	int err;
1515 	char thread_name[RTE_MAX_THREAD_NAME_LEN];
1516 
1517 	if (rx_adapter->intr_ring)
1518 		return 0;
1519 
1520 	rx_adapter->intr_ring = rte_ring_create("intr_ring",
1521 					RTE_EVENT_ETH_INTR_RING_SIZE,
1522 					rte_socket_id(), 0);
1523 	if (!rx_adapter->intr_ring)
1524 		return -ENOMEM;
1525 
1526 	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1527 					RTE_EVENT_ETH_INTR_RING_SIZE *
1528 					sizeof(struct rte_epoll_event),
1529 					RTE_CACHE_LINE_SIZE,
1530 					rx_adapter->socket_id);
1531 	if (!rx_adapter->epoll_events) {
1532 		err = -ENOMEM;
1533 		goto error;
1534 	}
1535 
1536 	rte_spinlock_init(&rx_adapter->intr_ring_lock);
1537 
1538 	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1539 			"rx-intr-thread-%d", rx_adapter->id);
1540 
1541 	err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1542 				NULL, rxa_intr_thread, rx_adapter);
1543 	if (!err)
1544 		return 0;
1545 
1546 	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1547 	rte_free(rx_adapter->epoll_events);
1548 error:
1549 	rte_ring_free(rx_adapter->intr_ring);
1550 	rx_adapter->intr_ring = NULL;
1551 	rx_adapter->epoll_events = NULL;
1552 	return err;
1553 }
1554 
1555 static int
1556 rxa_destroy_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1557 {
1558 	int err;
1559 
1560 	err = pthread_cancel(rx_adapter->rx_intr_thread);
1561 	if (err)
1562 		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1563 				err);
1564 
1565 	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1566 	if (err)
1567 		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1568 
1569 	rte_free(rx_adapter->epoll_events);
1570 	rte_ring_free(rx_adapter->intr_ring);
1571 	rx_adapter->intr_ring = NULL;
1572 	rx_adapter->epoll_events = NULL;
1573 	return 0;
1574 }
1575 
1576 static int
1577 rxa_free_intr_resources(struct event_eth_rx_adapter *rx_adapter)
1578 {
1579 	int ret;
1580 
1581 	if (rx_adapter->num_rx_intr == 0)
1582 		return 0;
1583 
1584 	ret = rxa_destroy_intr_thread(rx_adapter);
1585 	if (ret)
1586 		return ret;
1587 
1588 	close(rx_adapter->epd);
1589 	rx_adapter->epd = INIT_FD;
1590 
1591 	return ret;
1592 }
1593 
1594 static int
1595 rxa_disable_intr(struct event_eth_rx_adapter *rx_adapter,
1596 		 struct eth_device_info *dev_info, uint16_t rx_queue_id)
1597 {
1598 	int err;
1599 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1600 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1601 
1602 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1603 	if (err) {
1604 		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1605 			rx_queue_id);
1606 		return err;
1607 	}
1608 
1609 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1610 					rx_adapter->epd,
1611 					RTE_INTR_EVENT_DEL,
1612 					0);
1613 	if (err)
1614 		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1615 
1616 	if (sintr)
1617 		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1618 	else
1619 		dev_info->shared_intr_enabled = 0;
1620 	return err;
1621 }
1622 
1623 static int
1624 rxa_del_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1625 		   struct eth_device_info *dev_info, int rx_queue_id)
1626 {
1627 	int err;
1628 	int i;
1629 	int s;
1630 
1631 	if (dev_info->nb_rx_intr == 0)
1632 		return 0;
1633 
1634 	err = 0;
1635 	if (rx_queue_id == -1) {
1636 		s = dev_info->nb_shared_intr;
1637 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1638 			int sintr;
1639 			uint16_t q;
1640 
1641 			q = dev_info->intr_queue[i];
1642 			sintr = rxa_shared_intr(dev_info, q);
1643 			s -= sintr;
1644 
1645 			if (!sintr || s == 0) {
1646 
1647 				err = rxa_disable_intr(rx_adapter, dev_info,
1648 						q);
1649 				if (err)
1650 					return err;
1651 				rxa_intr_ring_del_entries(rx_adapter, dev_info,
1652 							q);
1653 			}
1654 		}
1655 	} else {
1656 		if (!rxa_intr_queue(dev_info, rx_queue_id))
1657 			return 0;
1658 		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1659 				dev_info->nb_shared_intr == 1) {
1660 			err = rxa_disable_intr(rx_adapter, dev_info,
1661 					rx_queue_id);
1662 			if (err)
1663 				return err;
1664 			rxa_intr_ring_del_entries(rx_adapter, dev_info,
1665 						rx_queue_id);
1666 		}
1667 
1668 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1669 			if (dev_info->intr_queue[i] == rx_queue_id) {
1670 				for (; i < dev_info->nb_rx_intr - 1; i++)
1671 					dev_info->intr_queue[i] =
1672 						dev_info->intr_queue[i + 1];
1673 				break;
1674 			}
1675 		}
1676 	}
1677 
1678 	return err;
1679 }
1680 
1681 static int
1682 rxa_config_intr(struct event_eth_rx_adapter *rx_adapter,
1683 		struct eth_device_info *dev_info, uint16_t rx_queue_id)
1684 {
1685 	int err, err1;
1686 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1687 	union queue_data qd;
1688 	int init_fd;
1689 	uint16_t *intr_queue;
1690 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1691 
1692 	if (rxa_intr_queue(dev_info, rx_queue_id))
1693 		return 0;
1694 
1695 	intr_queue = dev_info->intr_queue;
1696 	if (dev_info->intr_queue == NULL) {
1697 		size_t len =
1698 			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1699 		dev_info->intr_queue =
1700 			rte_zmalloc_socket(
1701 				rx_adapter->mem_name,
1702 				len,
1703 				0,
1704 				rx_adapter->socket_id);
1705 		if (dev_info->intr_queue == NULL)
1706 			return -ENOMEM;
1707 	}
1708 
1709 	init_fd = rx_adapter->epd;
1710 	err = rxa_init_epd(rx_adapter);
1711 	if (err)
1712 		goto err_free_queue;
1713 
1714 	qd.port = eth_dev_id;
1715 	qd.queue = rx_queue_id;
1716 
1717 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1718 					rx_adapter->epd,
1719 					RTE_INTR_EVENT_ADD,
1720 					qd.ptr);
1721 	if (err) {
1722 		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1723 			" Rx Queue %u err %d", rx_queue_id, err);
1724 		goto err_del_fd;
1725 	}
1726 
1727 	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1728 	if (err) {
1729 		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1730 				" Rx Queue %u err %d", rx_queue_id, err);
1731 
1732 		goto err_del_event;
1733 	}
1734 
1735 	err = rxa_create_intr_thread(rx_adapter);
1736 	if (!err)  {
1737 		if (sintr)
1738 			dev_info->shared_intr_enabled = 1;
1739 		else
1740 			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1741 		return 0;
1742 	}
1743 
1744 
1745 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1746 	if (err)
1747 		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1748 				" Rx Queue %u err %d", rx_queue_id, err);
1749 err_del_event:
1750 	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1751 					rx_adapter->epd,
1752 					RTE_INTR_EVENT_DEL,
1753 					0);
1754 	if (err1) {
1755 		RTE_EDEV_LOG_ERR("Could not delete event for"
1756 				" Rx Queue %u err %d", rx_queue_id, err1);
1757 	}
1758 err_del_fd:
1759 	if (init_fd == INIT_FD) {
1760 		close(rx_adapter->epd);
1761 		rx_adapter->epd = -1;
1762 	}
1763 err_free_queue:
1764 	if (intr_queue == NULL)
1765 		rte_free(dev_info->intr_queue);
1766 
1767 	return err;
1768 }
1769 
1770 static int
1771 rxa_add_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1772 		   struct eth_device_info *dev_info, int rx_queue_id)
1773 
1774 {
1775 	int i, j, err;
1776 	int si = -1;
1777 	int shared_done = (dev_info->nb_shared_intr > 0);
1778 
1779 	if (rx_queue_id != -1) {
1780 		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1781 			return 0;
1782 		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1783 	}
1784 
1785 	err = 0;
1786 	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1787 
1788 		if (rxa_shared_intr(dev_info, i) && shared_done)
1789 			continue;
1790 
1791 		err = rxa_config_intr(rx_adapter, dev_info, i);
1792 
1793 		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1794 		if (shared_done) {
1795 			si = i;
1796 			dev_info->shared_intr_enabled = 1;
1797 		}
1798 		if (err)
1799 			break;
1800 	}
1801 
1802 	if (err == 0)
1803 		return 0;
1804 
1805 	shared_done = (dev_info->nb_shared_intr > 0);
1806 	for (j = 0; j < i; j++) {
1807 		if (rxa_intr_queue(dev_info, j))
1808 			continue;
1809 		if (rxa_shared_intr(dev_info, j) && si != j)
1810 			continue;
1811 		err = rxa_disable_intr(rx_adapter, dev_info, j);
1812 		if (err)
1813 			break;
1814 
1815 	}
1816 
1817 	return err;
1818 }
1819 
1820 static int
1821 rxa_init_service(struct event_eth_rx_adapter *rx_adapter, uint8_t id)
1822 {
1823 	int ret;
1824 	struct rte_service_spec service;
1825 	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1826 
1827 	if (rx_adapter->service_inited)
1828 		return 0;
1829 
1830 	memset(&service, 0, sizeof(service));
1831 	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1832 		"rte_event_eth_rx_adapter_%d", id);
1833 	service.socket_id = rx_adapter->socket_id;
1834 	service.callback = rxa_service_func;
1835 	service.callback_userdata = rx_adapter;
1836 	/* Service function handles locking for queue add/del updates */
1837 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1838 	ret = rte_service_component_register(&service, &rx_adapter->service_id);
1839 	if (ret) {
1840 		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1841 			service.name, ret);
1842 		return ret;
1843 	}
1844 
1845 	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1846 		&rx_adapter_conf, rx_adapter->conf_arg);
1847 	if (ret) {
1848 		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1849 			ret);
1850 		goto err_done;
1851 	}
1852 	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1853 	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1854 	rx_adapter->service_inited = 1;
1855 	rx_adapter->epd = INIT_FD;
1856 	return 0;
1857 
1858 err_done:
1859 	rte_service_component_unregister(rx_adapter->service_id);
1860 	return ret;
1861 }
1862 
1863 static void
1864 rxa_update_queue(struct event_eth_rx_adapter *rx_adapter,
1865 		 struct eth_device_info *dev_info, int32_t rx_queue_id,
1866 		 uint8_t add)
1867 {
1868 	struct eth_rx_queue_info *queue_info;
1869 	int enabled;
1870 	uint16_t i;
1871 
1872 	if (dev_info->rx_queue == NULL)
1873 		return;
1874 
1875 	if (rx_queue_id == -1) {
1876 		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1877 			rxa_update_queue(rx_adapter, dev_info, i, add);
1878 	} else {
1879 		queue_info = &dev_info->rx_queue[rx_queue_id];
1880 		enabled = queue_info->queue_enabled;
1881 		if (add) {
1882 			rx_adapter->nb_queues += !enabled;
1883 			dev_info->nb_dev_queues += !enabled;
1884 		} else {
1885 			rx_adapter->nb_queues -= enabled;
1886 			dev_info->nb_dev_queues -= enabled;
1887 		}
1888 		queue_info->queue_enabled = !!add;
1889 	}
1890 }
1891 
1892 static void
1893 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1894 		    uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1895 		    uint16_t port_id)
1896 {
1897 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1898 	struct eth_rx_vector_data *vector_data;
1899 	uint32_t flow_id;
1900 
1901 	vector_data = &queue_info->vector_data;
1902 	vector_data->max_vector_count = vector_count;
1903 	vector_data->port = port_id;
1904 	vector_data->queue = qid;
1905 	vector_data->vector_pool = mp;
1906 	vector_data->vector_timeout_ticks =
1907 		NSEC2TICK(vector_ns, rte_get_timer_hz());
1908 	vector_data->ts = 0;
1909 	flow_id = queue_info->event & 0xFFFFF;
1910 	flow_id =
1911 		flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1912 	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1913 }
1914 
1915 static void
1916 rxa_sw_del(struct event_eth_rx_adapter *rx_adapter,
1917 	   struct eth_device_info *dev_info, int32_t rx_queue_id)
1918 {
1919 	struct eth_rx_vector_data *vec;
1920 	int pollq;
1921 	int intrq;
1922 	int sintrq;
1923 
1924 
1925 	if (rx_adapter->nb_queues == 0)
1926 		return;
1927 
1928 	if (rx_queue_id == -1) {
1929 		uint16_t nb_rx_queues;
1930 		uint16_t i;
1931 
1932 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1933 		for (i = 0; i <	nb_rx_queues; i++)
1934 			rxa_sw_del(rx_adapter, dev_info, i);
1935 		return;
1936 	}
1937 
1938 	/* Push all the partial event vectors to event device. */
1939 	TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1940 		if (vec->queue != rx_queue_id)
1941 			continue;
1942 		rxa_vector_expire(vec, rx_adapter);
1943 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
1944 	}
1945 
1946 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
1947 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
1948 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1949 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1950 	rx_adapter->num_rx_polled -= pollq;
1951 	dev_info->nb_rx_poll -= pollq;
1952 	rx_adapter->num_rx_intr -= intrq;
1953 	dev_info->nb_rx_intr -= intrq;
1954 	dev_info->nb_shared_intr -= intrq && sintrq;
1955 	if (rx_adapter->use_queue_event_buf) {
1956 		struct eth_event_enqueue_buffer *event_buf =
1957 			dev_info->rx_queue[rx_queue_id].event_buf;
1958 		struct rte_event_eth_rx_adapter_stats *stats =
1959 			dev_info->rx_queue[rx_queue_id].stats;
1960 		rte_free(event_buf->events);
1961 		rte_free(event_buf);
1962 		rte_free(stats);
1963 		dev_info->rx_queue[rx_queue_id].event_buf = NULL;
1964 		dev_info->rx_queue[rx_queue_id].stats = NULL;
1965 	}
1966 }
1967 
1968 static int
1969 rxa_add_queue(struct event_eth_rx_adapter *rx_adapter,
1970 	      struct eth_device_info *dev_info, int32_t rx_queue_id,
1971 	      const struct rte_event_eth_rx_adapter_queue_conf *conf)
1972 {
1973 	struct eth_rx_queue_info *queue_info;
1974 	const struct rte_event *ev = &conf->ev;
1975 	int pollq;
1976 	int intrq;
1977 	int sintrq;
1978 	struct rte_event *qi_ev;
1979 	struct eth_event_enqueue_buffer *new_rx_buf = NULL;
1980 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1981 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1982 	int ret;
1983 
1984 	if (rx_queue_id == -1) {
1985 		uint16_t nb_rx_queues;
1986 		uint16_t i;
1987 
1988 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1989 		for (i = 0; i <	nb_rx_queues; i++) {
1990 			ret = rxa_add_queue(rx_adapter, dev_info, i, conf);
1991 			if (ret)
1992 				return ret;
1993 		}
1994 		return 0;
1995 	}
1996 
1997 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
1998 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
1999 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
2000 
2001 	queue_info = &dev_info->rx_queue[rx_queue_id];
2002 	queue_info->wt = conf->servicing_weight;
2003 
2004 	qi_ev = (struct rte_event *)&queue_info->event;
2005 	qi_ev->event = ev->event;
2006 	qi_ev->op = RTE_EVENT_OP_NEW;
2007 	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
2008 	qi_ev->sub_event_type = 0;
2009 
2010 	if (conf->rx_queue_flags &
2011 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
2012 		queue_info->flow_id_mask = ~0;
2013 	} else
2014 		qi_ev->flow_id = 0;
2015 
2016 	if (conf->rx_queue_flags &
2017 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2018 		queue_info->ena_vector = 1;
2019 		qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
2020 		rxa_set_vector_data(queue_info, conf->vector_sz,
2021 				    conf->vector_timeout_ns, conf->vector_mp,
2022 				    rx_queue_id, dev_info->dev->data->port_id);
2023 		rx_adapter->ena_vector = 1;
2024 		rx_adapter->vector_tmo_ticks =
2025 			rx_adapter->vector_tmo_ticks ?
2026 				      RTE_MIN(queue_info->vector_data
2027 							.vector_timeout_ticks >>
2028 						1,
2029 					rx_adapter->vector_tmo_ticks) :
2030 				queue_info->vector_data.vector_timeout_ticks >>
2031 					1;
2032 	}
2033 
2034 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
2035 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
2036 		rx_adapter->num_rx_polled += !pollq;
2037 		dev_info->nb_rx_poll += !pollq;
2038 		rx_adapter->num_rx_intr -= intrq;
2039 		dev_info->nb_rx_intr -= intrq;
2040 		dev_info->nb_shared_intr -= intrq && sintrq;
2041 	}
2042 
2043 	if (rxa_intr_queue(dev_info, rx_queue_id)) {
2044 		rx_adapter->num_rx_polled -= pollq;
2045 		dev_info->nb_rx_poll -= pollq;
2046 		rx_adapter->num_rx_intr += !intrq;
2047 		dev_info->nb_rx_intr += !intrq;
2048 		dev_info->nb_shared_intr += !intrq && sintrq;
2049 		if (dev_info->nb_shared_intr == 1) {
2050 			if (dev_info->multi_intr_cap)
2051 				dev_info->next_q_idx =
2052 					RTE_MAX_RXTX_INTR_VEC_ID - 1;
2053 			else
2054 				dev_info->next_q_idx = 0;
2055 		}
2056 	}
2057 
2058 	if (!rx_adapter->use_queue_event_buf)
2059 		return 0;
2060 
2061 	new_rx_buf = rte_zmalloc_socket("rx_buffer_meta",
2062 				sizeof(*new_rx_buf), 0,
2063 				rte_eth_dev_socket_id(eth_dev_id));
2064 	if (new_rx_buf == NULL) {
2065 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer meta for "
2066 				 "dev_id: %d queue_id: %d",
2067 				 eth_dev_id, rx_queue_id);
2068 		return -ENOMEM;
2069 	}
2070 
2071 	new_rx_buf->events_size = RTE_ALIGN(conf->event_buf_size, BATCH_SIZE);
2072 	new_rx_buf->events_size += (2 * BATCH_SIZE);
2073 	new_rx_buf->events = rte_zmalloc_socket("rx_buffer",
2074 				sizeof(struct rte_event) *
2075 				new_rx_buf->events_size, 0,
2076 				rte_eth_dev_socket_id(eth_dev_id));
2077 	if (new_rx_buf->events == NULL) {
2078 		rte_free(new_rx_buf);
2079 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer for "
2080 				 "dev_id: %d queue_id: %d",
2081 				 eth_dev_id, rx_queue_id);
2082 		return -ENOMEM;
2083 	}
2084 
2085 	queue_info->event_buf = new_rx_buf;
2086 
2087 	/* Allocate storage for adapter queue stats */
2088 	stats = rte_zmalloc_socket("rx_queue_stats",
2089 				sizeof(*stats), 0,
2090 				rte_eth_dev_socket_id(eth_dev_id));
2091 	if (stats == NULL) {
2092 		rte_free(new_rx_buf->events);
2093 		rte_free(new_rx_buf);
2094 		RTE_EDEV_LOG_ERR("Failed to allocate stats storage for"
2095 				 " dev_id: %d queue_id: %d",
2096 				 eth_dev_id, rx_queue_id);
2097 		return -ENOMEM;
2098 	}
2099 
2100 	queue_info->stats = stats;
2101 
2102 	return 0;
2103 }
2104 
/*
 * Add an Rx queue of an ethernet port to the adapter's software data path.
 * A rx_queue_id of -1 is treated as "all Rx queues of the port" by the
 * interrupt queue bookkeeping below.
 *
 * New poll and WRR arrays sized for the post-add configuration are allocated
 * first and are only installed in the adapter once every other step has
 * succeeded; on failure they are released again.
 *
 * Returns 0 on success, -EINVAL or -ENOMEM for the checks done here, or the
 * error code reported by one of the helpers.
 */
static int
rxa_sw_add(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
	   int rx_queue_id,
	   const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
	int ret;
	struct eth_rx_poll_entry *rx_poll;
	struct eth_rx_queue_info *rx_queue;
	uint32_t *rx_wrr;
	uint16_t nb_rx_queues;
	uint32_t nb_rx_poll, nb_wrr;
	uint32_t nb_rx_intr;
	int num_intr_vec;
	uint16_t wt;

	/*
	 * A zero servicing weight selects an interrupt driven queue. The
	 * caller's configuration is const, so any adjustment is made on a
	 * local copy which then replaces queue_conf for the rest of the
	 * function.
	 */
	if (queue_conf->servicing_weight == 0) {
		struct rte_eth_dev_data *data = dev_info->dev->data;

		temp_conf = *queue_conf;
		if (!data->dev_conf.intr_conf.rxq) {
			/* If Rx interrupts are disabled set wt = 1 */
			temp_conf.servicing_weight = 1;
		}
		queue_conf = &temp_conf;

		/* Weight is still zero: reject it for per queue event buffers */
		if (queue_conf->servicing_weight == 0 &&
		    rx_adapter->use_queue_event_buf) {

			RTE_EDEV_LOG_ERR("Use of queue level event buffer "
					 "not supported for interrupt queues "
					 "dev_id: %d queue_id: %d",
					 eth_dev_id, rx_queue_id);
			return -EINVAL;
		}
	}

	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
	/*
	 * Remember the queue array as it was on entry (possibly NULL) so the
	 * error path only frees an array that was allocated by this call.
	 */
	rx_queue = dev_info->rx_queue;
	wt = queue_conf->servicing_weight;

	if (dev_info->rx_queue == NULL) {
		dev_info->rx_queue =
		    rte_zmalloc_socket(rx_adapter->mem_name,
				       nb_rx_queues *
				       sizeof(struct eth_rx_queue_info), 0,
				       rx_adapter->socket_id);
		if (dev_info->rx_queue == NULL)
			return -ENOMEM;
	}
	/* NULL so the error path can call rte_free() unconditionally */
	rx_wrr = NULL;
	rx_poll = NULL;

	/* Number of polled queues, interrupt queues and WRR slots after the add */
	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
			queue_conf->servicing_weight,
			&nb_rx_poll, &nb_rx_intr, &nb_wrr);

	if (dev_info->dev->intr_handle)
		dev_info->multi_intr_cap =
			rte_intr_cap_multiple(dev_info->dev->intr_handle);

	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
				&rx_poll, &rx_wrr);
	if (ret)
		goto err_free_rxqueue;

	if (wt == 0) {
		/* Interrupt mode queue: check ring space, then set up the interrupt */
		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);

		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
		if (ret)
			goto err_free_rxqueue;

		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
		if (ret)
			goto err_free_rxqueue;
	} else {

		num_intr_vec = 0;
		if (rx_adapter->num_rx_intr > nb_rx_intr) {
			num_intr_vec = rxa_nb_intr_vect(dev_info,
						rx_queue_id, 0);
			/* interrupt based queues are being converted to
			 * poll mode queues, delete the interrupt configuration
			 * for those.
			 */
			ret = rxa_del_intr_queue(rx_adapter,
						dev_info, rx_queue_id);
			if (ret)
				goto err_free_rxqueue;
		}
	}

	/* No interrupt queues remain after this add: release interrupt resources */
	if (nb_rx_intr == 0) {
		ret = rxa_free_intr_resources(rx_adapter);
		if (ret)
			goto err_free_rxqueue;
	}

	/* Record the interrupt mode queue id(s) in the per device list */
	if (wt == 0) {
		uint16_t i;

		if (rx_queue_id  == -1) {
			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
				dev_info->intr_queue[i] = i;
		} else {
			if (!rxa_intr_queue(dev_info, rx_queue_id))
				dev_info->intr_queue[nb_rx_intr - 1] =
					rx_queue_id;
		}
	}



	ret = rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
	if (ret)
		goto err_free_rxqueue;
	/* Fill the new arrays, then swap them in place of the old ones */
	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);

	rte_free(rx_adapter->eth_rx_poll);
	rte_free(rx_adapter->wrr_sched);

	rx_adapter->eth_rx_poll = rx_poll;
	rx_adapter->wrr_sched = rx_wrr;
	rx_adapter->wrr_len = nb_wrr;
	rx_adapter->num_intr_vec += num_intr_vec;
	return 0;

err_free_rxqueue:
	/* Free the queue array only if it did not exist before this call */
	if (rx_queue == NULL) {
		rte_free(dev_info->rx_queue);
		dev_info->rx_queue = NULL;
	}

	rte_free(rx_poll);
	rte_free(rx_wrr);

	return ret;
}
2245 
2246 static int
2247 rxa_ctrl(uint8_t id, int start)
2248 {
2249 	struct event_eth_rx_adapter *rx_adapter;
2250 	struct rte_eventdev *dev;
2251 	struct eth_device_info *dev_info;
2252 	uint32_t i;
2253 	int use_service = 0;
2254 	int stop = !start;
2255 
2256 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2257 	rx_adapter = rxa_id_to_adapter(id);
2258 	if (rx_adapter == NULL)
2259 		return -EINVAL;
2260 
2261 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2262 
2263 	RTE_ETH_FOREACH_DEV(i) {
2264 		dev_info = &rx_adapter->eth_devices[i];
2265 		/* if start  check for num dev queues */
2266 		if (start && !dev_info->nb_dev_queues)
2267 			continue;
2268 		/* if stop check if dev has been started */
2269 		if (stop && !dev_info->dev_rx_started)
2270 			continue;
2271 		use_service |= !dev_info->internal_event_port;
2272 		dev_info->dev_rx_started = start;
2273 		if (dev_info->internal_event_port == 0)
2274 			continue;
2275 		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2276 						&rte_eth_devices[i]) :
2277 			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
2278 						&rte_eth_devices[i]);
2279 	}
2280 
2281 	if (use_service) {
2282 		rte_spinlock_lock(&rx_adapter->rx_lock);
2283 		rx_adapter->rxa_started = start;
2284 		rte_service_runstate_set(rx_adapter->service_id, start);
2285 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2286 	}
2287 
2288 	return 0;
2289 }
2290 
2291 static int
2292 rxa_create(uint8_t id, uint8_t dev_id,
2293 	   struct rte_event_eth_rx_adapter_params *rxa_params,
2294 	   rte_event_eth_rx_adapter_conf_cb conf_cb,
2295 	   void *conf_arg)
2296 {
2297 	struct event_eth_rx_adapter *rx_adapter;
2298 	struct eth_event_enqueue_buffer *buf;
2299 	struct rte_event *events;
2300 	int ret;
2301 	int socket_id;
2302 	uint16_t i;
2303 	char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
2304 	const uint8_t default_rss_key[] = {
2305 		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2306 		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2307 		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2308 		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2309 		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2310 	};
2311 
2312 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2313 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2314 
2315 	if (conf_cb == NULL)
2316 		return -EINVAL;
2317 
2318 	if (event_eth_rx_adapter == NULL) {
2319 		ret = rte_event_eth_rx_adapter_init();
2320 		if (ret)
2321 			return ret;
2322 	}
2323 
2324 	rx_adapter = rxa_id_to_adapter(id);
2325 	if (rx_adapter != NULL) {
2326 		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2327 		return -EEXIST;
2328 	}
2329 
2330 	socket_id = rte_event_dev_socket_id(dev_id);
2331 	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2332 		"rte_event_eth_rx_adapter_%d",
2333 		id);
2334 
2335 	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2336 			RTE_CACHE_LINE_SIZE, socket_id);
2337 	if (rx_adapter == NULL) {
2338 		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2339 		return -ENOMEM;
2340 	}
2341 
2342 	rx_adapter->eventdev_id = dev_id;
2343 	rx_adapter->socket_id = socket_id;
2344 	rx_adapter->conf_cb = conf_cb;
2345 	rx_adapter->conf_arg = conf_arg;
2346 	rx_adapter->id = id;
2347 	TAILQ_INIT(&rx_adapter->vector_list);
2348 	strcpy(rx_adapter->mem_name, mem_name);
2349 	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2350 					RTE_MAX_ETHPORTS *
2351 					sizeof(struct eth_device_info), 0,
2352 					socket_id);
2353 	rte_convert_rss_key((const uint32_t *)default_rss_key,
2354 			(uint32_t *)rx_adapter->rss_key_be,
2355 			    RTE_DIM(default_rss_key));
2356 
2357 	if (rx_adapter->eth_devices == NULL) {
2358 		RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
2359 		rte_free(rx_adapter);
2360 		return -ENOMEM;
2361 	}
2362 
2363 	rte_spinlock_init(&rx_adapter->rx_lock);
2364 
2365 	for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2366 		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2367 
2368 	/* Rx adapter event buffer allocation */
2369 	rx_adapter->use_queue_event_buf = rxa_params->use_queue_event_buf;
2370 
2371 	if (!rx_adapter->use_queue_event_buf) {
2372 		buf = &rx_adapter->event_enqueue_buffer;
2373 		buf->events_size = rxa_params->event_buf_size;
2374 
2375 		events = rte_zmalloc_socket(rx_adapter->mem_name,
2376 					    buf->events_size * sizeof(*events),
2377 					    0, socket_id);
2378 		if (events == NULL) {
2379 			RTE_EDEV_LOG_ERR("Failed to allocate memory "
2380 					 "for adapter event buffer");
2381 			rte_free(rx_adapter->eth_devices);
2382 			rte_free(rx_adapter);
2383 			return -ENOMEM;
2384 		}
2385 
2386 		rx_adapter->event_enqueue_buffer.events = events;
2387 	}
2388 
2389 	event_eth_rx_adapter[id] = rx_adapter;
2390 
2391 	if (conf_cb == rxa_default_conf_cb)
2392 		rx_adapter->default_cb_arg = 1;
2393 
2394 	if (rte_mbuf_dyn_rx_timestamp_register(
2395 			&event_eth_rx_timestamp_dynfield_offset,
2396 			&event_eth_rx_timestamp_dynflag) != 0) {
2397 		RTE_EDEV_LOG_ERR("Error registering timestamp field in mbuf\n");
2398 		return -rte_errno;
2399 	}
2400 
2401 	rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2402 		conf_arg);
2403 	return 0;
2404 }
2405 
2406 int
2407 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2408 				rte_event_eth_rx_adapter_conf_cb conf_cb,
2409 				void *conf_arg)
2410 {
2411 	struct rte_event_eth_rx_adapter_params rxa_params = {0};
2412 
2413 	/* use default values for adapter params */
2414 	rxa_params.event_buf_size = ETH_EVENT_BUFFER_SIZE;
2415 	rxa_params.use_queue_event_buf = false;
2416 
2417 	return rxa_create(id, dev_id, &rxa_params, conf_cb, conf_arg);
2418 }
2419 
2420 int
2421 rte_event_eth_rx_adapter_create_with_params(uint8_t id, uint8_t dev_id,
2422 			struct rte_event_port_conf *port_config,
2423 			struct rte_event_eth_rx_adapter_params *rxa_params)
2424 {
2425 	struct rte_event_port_conf *pc;
2426 	int ret;
2427 	struct rte_event_eth_rx_adapter_params temp_params = {0};
2428 
2429 	if (port_config == NULL)
2430 		return -EINVAL;
2431 
2432 	if (rxa_params == NULL) {
2433 		/* use default values if rxa_params is NULL */
2434 		rxa_params = &temp_params;
2435 		rxa_params->event_buf_size = ETH_EVENT_BUFFER_SIZE;
2436 		rxa_params->use_queue_event_buf = false;
2437 	} else if ((!rxa_params->use_queue_event_buf &&
2438 		    rxa_params->event_buf_size == 0) ||
2439 		   (rxa_params->use_queue_event_buf &&
2440 		    rxa_params->event_buf_size != 0)) {
2441 		RTE_EDEV_LOG_ERR("Invalid adapter params\n");
2442 		return -EINVAL;
2443 	} else if (!rxa_params->use_queue_event_buf) {
2444 		/* adjust event buff size with BATCH_SIZE used for fetching
2445 		 * packets from NIC rx queues to get full buffer utilization
2446 		 * and prevent unnecessary rollovers.
2447 		 */
2448 
2449 		rxa_params->event_buf_size =
2450 			RTE_ALIGN(rxa_params->event_buf_size, BATCH_SIZE);
2451 		rxa_params->event_buf_size += (BATCH_SIZE + BATCH_SIZE);
2452 	}
2453 
2454 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2455 	if (pc == NULL)
2456 		return -ENOMEM;
2457 
2458 	*pc = *port_config;
2459 
2460 	ret = rxa_create(id, dev_id, rxa_params, rxa_default_conf_cb, pc);
2461 	if (ret)
2462 		rte_free(pc);
2463 
2464 	return ret;
2465 }
2466 
2467 int
2468 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2469 		struct rte_event_port_conf *port_config)
2470 {
2471 	struct rte_event_port_conf *pc;
2472 	int ret;
2473 
2474 	if (port_config == NULL)
2475 		return -EINVAL;
2476 
2477 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2478 
2479 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2480 	if (pc == NULL)
2481 		return -ENOMEM;
2482 	*pc = *port_config;
2483 
2484 	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2485 					rxa_default_conf_cb,
2486 					pc);
2487 	if (ret)
2488 		rte_free(pc);
2489 	return ret;
2490 }
2491 
2492 int
2493 rte_event_eth_rx_adapter_free(uint8_t id)
2494 {
2495 	struct event_eth_rx_adapter *rx_adapter;
2496 
2497 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2498 
2499 	rx_adapter = rxa_id_to_adapter(id);
2500 	if (rx_adapter == NULL)
2501 		return -EINVAL;
2502 
2503 	if (rx_adapter->nb_queues) {
2504 		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2505 				rx_adapter->nb_queues);
2506 		return -EBUSY;
2507 	}
2508 
2509 	if (rx_adapter->default_cb_arg)
2510 		rte_free(rx_adapter->conf_arg);
2511 	rte_free(rx_adapter->eth_devices);
2512 	if (!rx_adapter->use_queue_event_buf)
2513 		rte_free(rx_adapter->event_enqueue_buffer.events);
2514 	rte_free(rx_adapter);
2515 	event_eth_rx_adapter[id] = NULL;
2516 
2517 	rte_eventdev_trace_eth_rx_adapter_free(id);
2518 	return 0;
2519 }
2520 
2521 int
2522 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2523 		uint16_t eth_dev_id,
2524 		int32_t rx_queue_id,
2525 		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2526 {
2527 	int ret;
2528 	uint32_t cap;
2529 	struct event_eth_rx_adapter *rx_adapter;
2530 	struct rte_eventdev *dev;
2531 	struct eth_device_info *dev_info;
2532 	struct rte_event_eth_rx_adapter_vector_limits limits;
2533 
2534 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2535 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2536 
2537 	rx_adapter = rxa_id_to_adapter(id);
2538 	if ((rx_adapter == NULL) || (queue_conf == NULL))
2539 		return -EINVAL;
2540 
2541 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2542 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2543 						eth_dev_id,
2544 						&cap);
2545 	if (ret) {
2546 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2547 			"eth port %" PRIu16, id, eth_dev_id);
2548 		return ret;
2549 	}
2550 
2551 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2552 		&& (queue_conf->rx_queue_flags &
2553 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2554 		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2555 				" eth port: %" PRIu16 " adapter id: %" PRIu8,
2556 				eth_dev_id, id);
2557 		return -EINVAL;
2558 	}
2559 
2560 	if (queue_conf->rx_queue_flags &
2561 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2562 
2563 		if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0) {
2564 			RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2565 					 " eth port: %" PRIu16
2566 					 " adapter id: %" PRIu8,
2567 					 eth_dev_id, id);
2568 			return -EINVAL;
2569 		}
2570 
2571 		ret = rte_event_eth_rx_adapter_vector_limits_get(
2572 			rx_adapter->eventdev_id, eth_dev_id, &limits);
2573 		if (ret < 0) {
2574 			RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
2575 					 " eth port: %" PRIu16
2576 					 " adapter id: %" PRIu8,
2577 					 eth_dev_id, id);
2578 			return -EINVAL;
2579 		}
2580 		if (queue_conf->vector_sz < limits.min_sz ||
2581 		    queue_conf->vector_sz > limits.max_sz ||
2582 		    queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
2583 		    queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
2584 		    queue_conf->vector_mp == NULL) {
2585 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2586 					 " eth port: %" PRIu16
2587 					 " adapter id: %" PRIu8,
2588 					 eth_dev_id, id);
2589 			return -EINVAL;
2590 		}
2591 		if (queue_conf->vector_mp->elt_size <
2592 		    (sizeof(struct rte_event_vector) +
2593 		     (sizeof(uintptr_t) * queue_conf->vector_sz))) {
2594 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2595 					 " eth port: %" PRIu16
2596 					 " adapter id: %" PRIu8,
2597 					 eth_dev_id, id);
2598 			return -EINVAL;
2599 		}
2600 	}
2601 
2602 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2603 		(rx_queue_id != -1)) {
2604 		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2605 			"event queue, eth port: %" PRIu16 " adapter id: %"
2606 			PRIu8, eth_dev_id, id);
2607 		return -EINVAL;
2608 	}
2609 
2610 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2611 			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2612 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2613 			 (uint16_t)rx_queue_id);
2614 		return -EINVAL;
2615 	}
2616 
2617 	if ((rx_adapter->use_queue_event_buf &&
2618 	     queue_conf->event_buf_size == 0) ||
2619 	    (!rx_adapter->use_queue_event_buf &&
2620 	     queue_conf->event_buf_size != 0)) {
2621 		RTE_EDEV_LOG_ERR("Invalid Event buffer size for the queue");
2622 		return -EINVAL;
2623 	}
2624 
2625 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2626 
2627 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2628 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2629 					-ENOTSUP);
2630 		if (dev_info->rx_queue == NULL) {
2631 			dev_info->rx_queue =
2632 			    rte_zmalloc_socket(rx_adapter->mem_name,
2633 					dev_info->dev->data->nb_rx_queues *
2634 					sizeof(struct eth_rx_queue_info), 0,
2635 					rx_adapter->socket_id);
2636 			if (dev_info->rx_queue == NULL)
2637 				return -ENOMEM;
2638 		}
2639 
2640 		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2641 				&rte_eth_devices[eth_dev_id],
2642 				rx_queue_id, queue_conf);
2643 		if (ret == 0) {
2644 			dev_info->internal_event_port = 1;
2645 			rxa_update_queue(rx_adapter,
2646 					&rx_adapter->eth_devices[eth_dev_id],
2647 					rx_queue_id,
2648 					1);
2649 		}
2650 	} else {
2651 		rte_spinlock_lock(&rx_adapter->rx_lock);
2652 		dev_info->internal_event_port = 0;
2653 		ret = rxa_init_service(rx_adapter, id);
2654 		if (ret == 0) {
2655 			uint32_t service_id = rx_adapter->service_id;
2656 			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2657 					queue_conf);
2658 			rte_service_component_runstate_set(service_id,
2659 				rxa_sw_adapter_queue_count(rx_adapter));
2660 		}
2661 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2662 	}
2663 
2664 	rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2665 		rx_queue_id, queue_conf, ret);
2666 	if (ret)
2667 		return ret;
2668 
2669 	return 0;
2670 }
2671 
2672 static int
2673 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2674 {
2675 	limits->max_sz = MAX_VECTOR_SIZE;
2676 	limits->min_sz = MIN_VECTOR_SIZE;
2677 	limits->max_timeout_ns = MAX_VECTOR_NS;
2678 	limits->min_timeout_ns = MIN_VECTOR_NS;
2679 
2680 	return 0;
2681 }
2682 
/*
 * Remove an Rx queue of an ethernet port from an adapter.
 *
 * id          - adapter identifier
 * eth_dev_id  - ethernet port identifier
 * rx_queue_id - Rx queue index; -1 skips the range check below and is
 *               passed through unchanged to the helpers / driver
 *
 * For a port with an internal event port the request is passed to the event
 * device driver. Otherwise new poll and WRR arrays sized for the post-delete
 * configuration are allocated, the queue is removed under the adapter lock
 * and the new arrays replace the old ones.
 *
 * Returns 0 on success, -EINVAL for invalid arguments, -ENOTSUP if the
 * driver lacks the queue delete callback, or the error returned by a helper
 * or the driver.
 */
int
rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
				int32_t rx_queue_id)
{
	int ret = 0;
	struct rte_eventdev *dev;
	struct event_eth_rx_adapter *rx_adapter;
	struct eth_device_info *dev_info;
	uint32_t cap;
	uint32_t nb_rx_poll = 0;
	uint32_t nb_wrr = 0;
	uint32_t nb_rx_intr;
	struct eth_rx_poll_entry *rx_poll = NULL;
	uint32_t *rx_wrr = NULL;
	int num_intr_vec;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret)
		return ret;

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			 (uint16_t)rx_queue_id);
		return -EINVAL;
	}

	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
				 -ENOTSUP);
		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
						&rte_eth_devices[eth_dev_id],
						rx_queue_id);
		if (ret == 0) {
			rxa_update_queue(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					0);
			/* last queue of the port removed: drop the queue array */
			if (dev_info->nb_dev_queues == 0) {
				rte_free(dev_info->rx_queue);
				dev_info->rx_queue = NULL;
			}
		}
	} else {
		/* Number of polled queues, interrupt queues and WRR slots after the delete */
		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
			&nb_rx_poll, &nb_rx_intr, &nb_wrr);

		/* Replacement arrays are allocated before the lock is taken */
		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
			&rx_poll, &rx_wrr);
		if (ret)
			return ret;

		rte_spinlock_lock(&rx_adapter->rx_lock);

		num_intr_vec = 0;
		/* The delete reduces the number of interrupt mode queues */
		if (rx_adapter->num_rx_intr > nb_rx_intr) {

			num_intr_vec = rxa_nb_intr_vect(dev_info,
						rx_queue_id, 0);
			ret = rxa_del_intr_queue(rx_adapter, dev_info,
					rx_queue_id);
			if (ret)
				goto unlock_ret;
		}

		/* No interrupt queues remain: release interrupt resources */
		if (nb_rx_intr == 0) {
			ret = rxa_free_intr_resources(rx_adapter);
			if (ret)
				goto unlock_ret;
		}

		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
		/* Fill the new arrays, then swap them in place of the old ones */
		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);

		rte_free(rx_adapter->eth_rx_poll);
		rte_free(rx_adapter->wrr_sched);

		if (nb_rx_intr == 0) {
			rte_free(dev_info->intr_queue);
			dev_info->intr_queue = NULL;
		}

		rx_adapter->eth_rx_poll = rx_poll;
		rx_adapter->wrr_sched = rx_wrr;
		rx_adapter->wrr_len = nb_wrr;
		/*
		 * reset next poll start position (wrr_pos) to avoid buffer
		 * overrun when wrr_len is reduced in case of queue delete
		 */
		rx_adapter->wrr_pos = 0;
		/*
		 * NOTE(review): the value from rxa_nb_intr_vect() is added,
		 * not subtracted, on delete; presumably the helper returns a
		 * non-positive count when called with add == 0 - confirm.
		 */
		rx_adapter->num_intr_vec += num_intr_vec;

		/* last queue of the port removed: drop the queue array */
		if (dev_info->nb_dev_queues == 0) {
			rte_free(dev_info->rx_queue);
			dev_info->rx_queue = NULL;
		}
unlock_ret:
		rte_spinlock_unlock(&rx_adapter->rx_lock);
		/* On failure the new arrays were never installed: free them */
		if (ret) {
			rte_free(rx_poll);
			rte_free(rx_wrr);
			return ret;
		}

		rte_service_component_runstate_set(rx_adapter->service_id,
				rxa_sw_adapter_queue_count(rx_adapter));
	}

	rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
		rx_queue_id, ret);
	return ret;
}
2807 
2808 int
2809 rte_event_eth_rx_adapter_vector_limits_get(
2810 	uint8_t dev_id, uint16_t eth_port_id,
2811 	struct rte_event_eth_rx_adapter_vector_limits *limits)
2812 {
2813 	struct rte_eventdev *dev;
2814 	uint32_t cap;
2815 	int ret;
2816 
2817 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2818 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2819 
2820 	if (limits == NULL)
2821 		return -EINVAL;
2822 
2823 	dev = &rte_eventdevs[dev_id];
2824 
2825 	ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2826 	if (ret) {
2827 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2828 				 "eth port %" PRIu16,
2829 				 dev_id, eth_port_id);
2830 		return ret;
2831 	}
2832 
2833 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2834 		RTE_FUNC_PTR_OR_ERR_RET(
2835 			*dev->dev_ops->eth_rx_adapter_vector_limits_get,
2836 			-ENOTSUP);
2837 		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2838 			dev, &rte_eth_devices[eth_port_id], limits);
2839 	} else {
2840 		ret = rxa_sw_vector_limits(limits);
2841 	}
2842 
2843 	return ret;
2844 }
2845 
/*
 * Start an adapter: emit the start trace point, then run the common control
 * path in start mode. Returns the result of rxa_ctrl().
 */
int
rte_event_eth_rx_adapter_start(uint8_t id)
{
	const int start = 1;

	rte_eventdev_trace_eth_rx_adapter_start(id);

	return rxa_ctrl(id, start);
}
2852 
/*
 * Stop an adapter: emit the stop trace point, then run the common control
 * path in stop mode. Returns the result of rxa_ctrl().
 */
int
rte_event_eth_rx_adapter_stop(uint8_t id)
{
	const int start = 0;

	rte_eventdev_trace_eth_rx_adapter_stop(id);

	return rxa_ctrl(id, start);
}
2859 
2860 static inline void
2861 rxa_queue_stats_reset(struct eth_rx_queue_info *queue_info)
2862 {
2863 	struct rte_event_eth_rx_adapter_stats *q_stats;
2864 
2865 	q_stats = queue_info->stats;
2866 	memset(q_stats, 0, sizeof(*q_stats));
2867 }
2868 
2869 int
2870 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2871 			       struct rte_event_eth_rx_adapter_stats *stats)
2872 {
2873 	struct event_eth_rx_adapter *rx_adapter;
2874 	struct eth_event_enqueue_buffer *buf;
2875 	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2876 	struct rte_event_eth_rx_adapter_stats dev_stats;
2877 	struct rte_eventdev *dev;
2878 	struct eth_device_info *dev_info;
2879 	struct eth_rx_queue_info *queue_info;
2880 	struct rte_event_eth_rx_adapter_stats *q_stats;
2881 	uint32_t i, j;
2882 	int ret;
2883 
2884 	if (rxa_memzone_lookup())
2885 		return -ENOMEM;
2886 
2887 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2888 
2889 	rx_adapter = rxa_id_to_adapter(id);
2890 	if (rx_adapter  == NULL || stats == NULL)
2891 		return -EINVAL;
2892 
2893 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2894 	memset(stats, 0, sizeof(*stats));
2895 
2896 	if (rx_adapter->service_inited)
2897 		*stats = rx_adapter->stats;
2898 
2899 	RTE_ETH_FOREACH_DEV(i) {
2900 		dev_info = &rx_adapter->eth_devices[i];
2901 
2902 		if (rx_adapter->use_queue_event_buf && dev_info->rx_queue) {
2903 
2904 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
2905 			     j++) {
2906 				queue_info = &dev_info->rx_queue[j];
2907 				if (!queue_info->queue_enabled)
2908 					continue;
2909 				q_stats = queue_info->stats;
2910 
2911 				stats->rx_packets += q_stats->rx_packets;
2912 				stats->rx_poll_count += q_stats->rx_poll_count;
2913 				stats->rx_enq_count += q_stats->rx_enq_count;
2914 				stats->rx_enq_retry += q_stats->rx_enq_retry;
2915 				stats->rx_dropped += q_stats->rx_dropped;
2916 				stats->rx_enq_block_cycles +=
2917 						q_stats->rx_enq_block_cycles;
2918 			}
2919 		}
2920 
2921 		if (dev_info->internal_event_port == 0 ||
2922 			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2923 			continue;
2924 		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2925 						&rte_eth_devices[i],
2926 						&dev_stats);
2927 		if (ret)
2928 			continue;
2929 		dev_stats_sum.rx_packets += dev_stats.rx_packets;
2930 		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2931 	}
2932 
2933 	buf = &rx_adapter->event_enqueue_buffer;
2934 	stats->rx_packets += dev_stats_sum.rx_packets;
2935 	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2936 	stats->rx_event_buf_count = buf->count;
2937 	stats->rx_event_buf_size = buf->events_size;
2938 
2939 	return 0;
2940 }
2941 
2942 int
2943 rte_event_eth_rx_adapter_queue_stats_get(uint8_t id,
2944 		uint16_t eth_dev_id,
2945 		uint16_t rx_queue_id,
2946 		struct rte_event_eth_rx_adapter_queue_stats *stats)
2947 {
2948 	struct event_eth_rx_adapter *rx_adapter;
2949 	struct eth_device_info *dev_info;
2950 	struct eth_rx_queue_info *queue_info;
2951 	struct eth_event_enqueue_buffer *event_buf;
2952 	struct rte_event_eth_rx_adapter_stats *q_stats;
2953 	struct rte_eventdev *dev;
2954 
2955 	if (rxa_memzone_lookup())
2956 		return -ENOMEM;
2957 
2958 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2959 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2960 
2961 	rx_adapter = rxa_id_to_adapter(id);
2962 
2963 	if (rx_adapter == NULL || stats == NULL)
2964 		return -EINVAL;
2965 
2966 	if (!rx_adapter->use_queue_event_buf)
2967 		return -EINVAL;
2968 
2969 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2970 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
2971 		return -EINVAL;
2972 	}
2973 
2974 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2975 	if (dev_info->rx_queue == NULL ||
2976 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
2977 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
2978 		return -EINVAL;
2979 	}
2980 
2981 	queue_info = &dev_info->rx_queue[rx_queue_id];
2982 	event_buf = queue_info->event_buf;
2983 	q_stats = queue_info->stats;
2984 
2985 	stats->rx_event_buf_count = event_buf->count;
2986 	stats->rx_event_buf_size = event_buf->events_size;
2987 	stats->rx_packets = q_stats->rx_packets;
2988 	stats->rx_poll_count = q_stats->rx_poll_count;
2989 	stats->rx_dropped = q_stats->rx_dropped;
2990 
2991 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2992 	if (dev->dev_ops->eth_rx_adapter_queue_stats_get != NULL) {
2993 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_get)(dev,
2994 						&rte_eth_devices[eth_dev_id],
2995 						rx_queue_id, stats);
2996 	}
2997 
2998 	return 0;
2999 }
3000 
3001 int
3002 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
3003 {
3004 	struct event_eth_rx_adapter *rx_adapter;
3005 	struct rte_eventdev *dev;
3006 	struct eth_device_info *dev_info;
3007 	struct eth_rx_queue_info *queue_info;
3008 	uint32_t i, j;
3009 
3010 	if (rxa_memzone_lookup())
3011 		return -ENOMEM;
3012 
3013 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3014 
3015 	rx_adapter = rxa_id_to_adapter(id);
3016 	if (rx_adapter == NULL)
3017 		return -EINVAL;
3018 
3019 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3020 
3021 	RTE_ETH_FOREACH_DEV(i) {
3022 		dev_info = &rx_adapter->eth_devices[i];
3023 
3024 		if (rx_adapter->use_queue_event_buf  && dev_info->rx_queue) {
3025 
3026 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
3027 						j++) {
3028 				queue_info = &dev_info->rx_queue[j];
3029 				if (!queue_info->queue_enabled)
3030 					continue;
3031 				rxa_queue_stats_reset(queue_info);
3032 			}
3033 		}
3034 
3035 		if (dev_info->internal_event_port == 0 ||
3036 			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
3037 			continue;
3038 		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
3039 							&rte_eth_devices[i]);
3040 	}
3041 
3042 	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
3043 
3044 	return 0;
3045 }
3046 
3047 int
3048 rte_event_eth_rx_adapter_queue_stats_reset(uint8_t id,
3049 		uint16_t eth_dev_id,
3050 		uint16_t rx_queue_id)
3051 {
3052 	struct event_eth_rx_adapter *rx_adapter;
3053 	struct eth_device_info *dev_info;
3054 	struct eth_rx_queue_info *queue_info;
3055 	struct rte_eventdev *dev;
3056 
3057 	if (rxa_memzone_lookup())
3058 		return -ENOMEM;
3059 
3060 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3061 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3062 
3063 	rx_adapter = rxa_id_to_adapter(id);
3064 	if (rx_adapter == NULL)
3065 		return -EINVAL;
3066 
3067 	if (!rx_adapter->use_queue_event_buf)
3068 		return -EINVAL;
3069 
3070 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3071 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
3072 		return -EINVAL;
3073 	}
3074 
3075 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3076 
3077 	if (dev_info->rx_queue == NULL ||
3078 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3079 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3080 		return -EINVAL;
3081 	}
3082 
3083 	queue_info = &dev_info->rx_queue[rx_queue_id];
3084 	rxa_queue_stats_reset(queue_info);
3085 
3086 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3087 	if (dev->dev_ops->eth_rx_adapter_queue_stats_reset != NULL) {
3088 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_reset)(dev,
3089 						&rte_eth_devices[eth_dev_id],
3090 						rx_queue_id);
3091 	}
3092 
3093 	return 0;
3094 }
3095 
3096 int
3097 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
3098 {
3099 	struct event_eth_rx_adapter *rx_adapter;
3100 
3101 	if (rxa_memzone_lookup())
3102 		return -ENOMEM;
3103 
3104 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3105 
3106 	rx_adapter = rxa_id_to_adapter(id);
3107 	if (rx_adapter == NULL || service_id == NULL)
3108 		return -EINVAL;
3109 
3110 	if (rx_adapter->service_inited)
3111 		*service_id = rx_adapter->service_id;
3112 
3113 	return rx_adapter->service_inited ? 0 : -ESRCH;
3114 }
3115 
3116 int
3117 rte_event_eth_rx_adapter_cb_register(uint8_t id,
3118 					uint16_t eth_dev_id,
3119 					rte_event_eth_rx_adapter_cb_fn cb_fn,
3120 					void *cb_arg)
3121 {
3122 	struct event_eth_rx_adapter *rx_adapter;
3123 	struct eth_device_info *dev_info;
3124 	uint32_t cap;
3125 	int ret;
3126 
3127 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3128 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3129 
3130 	rx_adapter = rxa_id_to_adapter(id);
3131 	if (rx_adapter == NULL)
3132 		return -EINVAL;
3133 
3134 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3135 	if (dev_info->rx_queue == NULL)
3136 		return -EINVAL;
3137 
3138 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
3139 						eth_dev_id,
3140 						&cap);
3141 	if (ret) {
3142 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
3143 			"eth port %" PRIu16, id, eth_dev_id);
3144 		return ret;
3145 	}
3146 
3147 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
3148 		RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
3149 				PRIu16, eth_dev_id);
3150 		return -EINVAL;
3151 	}
3152 
3153 	rte_spinlock_lock(&rx_adapter->rx_lock);
3154 	dev_info->cb_fn = cb_fn;
3155 	dev_info->cb_arg = cb_arg;
3156 	rte_spinlock_unlock(&rx_adapter->rx_lock);
3157 
3158 	return 0;
3159 }
3160 
3161 int
3162 rte_event_eth_rx_adapter_queue_conf_get(uint8_t id,
3163 			uint16_t eth_dev_id,
3164 			uint16_t rx_queue_id,
3165 			struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
3166 {
3167 	struct rte_eventdev *dev;
3168 	struct event_eth_rx_adapter *rx_adapter;
3169 	struct eth_device_info *dev_info;
3170 	struct eth_rx_queue_info *queue_info;
3171 	struct rte_event *qi_ev;
3172 	int ret;
3173 
3174 	if (rxa_memzone_lookup())
3175 		return -ENOMEM;
3176 
3177 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3178 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3179 
3180 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3181 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3182 		return -EINVAL;
3183 	}
3184 
3185 	if (queue_conf == NULL) {
3186 		RTE_EDEV_LOG_ERR("Rx queue conf struct cannot be NULL");
3187 		return -EINVAL;
3188 	}
3189 
3190 	rx_adapter = rxa_id_to_adapter(id);
3191 	if (rx_adapter == NULL)
3192 		return -EINVAL;
3193 
3194 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3195 	if (dev_info->rx_queue == NULL ||
3196 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3197 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3198 		return -EINVAL;
3199 	}
3200 
3201 	queue_info = &dev_info->rx_queue[rx_queue_id];
3202 	qi_ev = (struct rte_event *)&queue_info->event;
3203 
3204 	memset(queue_conf, 0, sizeof(*queue_conf));
3205 	queue_conf->rx_queue_flags = 0;
3206 	if (queue_info->flow_id_mask != 0)
3207 		queue_conf->rx_queue_flags |=
3208 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID;
3209 	queue_conf->servicing_weight = queue_info->wt;
3210 
3211 	memcpy(&queue_conf->ev, qi_ev, sizeof(*qi_ev));
3212 
3213 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3214 	if (dev->dev_ops->eth_rx_adapter_queue_conf_get != NULL) {
3215 		ret = (*dev->dev_ops->eth_rx_adapter_queue_conf_get)(dev,
3216 						&rte_eth_devices[eth_dev_id],
3217 						rx_queue_id,
3218 						queue_conf);
3219 		return ret;
3220 	}
3221 
3222 	return 0;
3223 }
3224 
/*
 * Add member 's' of 'stats' to a telemetry dictionary, using the member name
 * as the key. Expects a 'struct rte_tel_data *d' to be in scope at the point
 * of use. 'stats' is parenthesized so any lvalue expression may be passed.
 */
#define RXA_ADD_DICT(stats, s) rte_tel_data_add_dict_u64(d, #s, (stats).s)
3226 
3227 static int
3228 handle_rxa_stats(const char *cmd __rte_unused,
3229 		 const char *params,
3230 		 struct rte_tel_data *d)
3231 {
3232 	uint8_t rx_adapter_id;
3233 	struct rte_event_eth_rx_adapter_stats rx_adptr_stats;
3234 
3235 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3236 		return -1;
3237 
3238 	/* Get Rx adapter ID from parameter string */
3239 	rx_adapter_id = atoi(params);
3240 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3241 
3242 	/* Get Rx adapter stats */
3243 	if (rte_event_eth_rx_adapter_stats_get(rx_adapter_id,
3244 					       &rx_adptr_stats)) {
3245 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter stats\n");
3246 		return -1;
3247 	}
3248 
3249 	rte_tel_data_start_dict(d);
3250 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3251 	RXA_ADD_DICT(rx_adptr_stats, rx_packets);
3252 	RXA_ADD_DICT(rx_adptr_stats, rx_poll_count);
3253 	RXA_ADD_DICT(rx_adptr_stats, rx_dropped);
3254 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_retry);
3255 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_count);
3256 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_size);
3257 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_count);
3258 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_start_ts);
3259 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_block_cycles);
3260 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_end_ts);
3261 	RXA_ADD_DICT(rx_adptr_stats, rx_intr_packets);
3262 
3263 	return 0;
3264 }
3265 
3266 static int
3267 handle_rxa_stats_reset(const char *cmd __rte_unused,
3268 		       const char *params,
3269 		       struct rte_tel_data *d __rte_unused)
3270 {
3271 	uint8_t rx_adapter_id;
3272 
3273 	if (params == NULL || strlen(params) == 0 || ~isdigit(*params))
3274 		return -1;
3275 
3276 	/* Get Rx adapter ID from parameter string */
3277 	rx_adapter_id = atoi(params);
3278 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3279 
3280 	/* Reset Rx adapter stats */
3281 	if (rte_event_eth_rx_adapter_stats_reset(rx_adapter_id)) {
3282 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter stats\n");
3283 		return -1;
3284 	}
3285 
3286 	return 0;
3287 }
3288 
3289 static int
3290 handle_rxa_get_queue_conf(const char *cmd __rte_unused,
3291 			  const char *params,
3292 			  struct rte_tel_data *d)
3293 {
3294 	uint8_t rx_adapter_id;
3295 	uint16_t rx_queue_id;
3296 	int eth_dev_id;
3297 	char *token, *l_params;
3298 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
3299 
3300 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3301 		return -1;
3302 
3303 	/* Get Rx adapter ID from parameter string */
3304 	l_params = strdup(params);
3305 	token = strtok(l_params, ",");
3306 	rx_adapter_id = strtoul(token, NULL, 10);
3307 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3308 
3309 	token = strtok(NULL, ",");
3310 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3311 		return -1;
3312 
3313 	/* Get device ID from parameter string */
3314 	eth_dev_id = strtoul(token, NULL, 10);
3315 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(eth_dev_id, -EINVAL);
3316 
3317 	token = strtok(NULL, ",");
3318 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3319 		return -1;
3320 
3321 	/* Get Rx queue ID from parameter string */
3322 	rx_queue_id = strtoul(token, NULL, 10);
3323 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3324 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3325 		return -EINVAL;
3326 	}
3327 
3328 	token = strtok(NULL, "\0");
3329 	if (token != NULL)
3330 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3331 				 " telemetry command, igrnoring");
3332 
3333 	if (rte_event_eth_rx_adapter_queue_conf_get(rx_adapter_id, eth_dev_id,
3334 						    rx_queue_id, &queue_conf)) {
3335 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue config");
3336 		return -1;
3337 	}
3338 
3339 	rte_tel_data_start_dict(d);
3340 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3341 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3342 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3343 	RXA_ADD_DICT(queue_conf, rx_queue_flags);
3344 	RXA_ADD_DICT(queue_conf, servicing_weight);
3345 	RXA_ADD_DICT(queue_conf.ev, queue_id);
3346 	RXA_ADD_DICT(queue_conf.ev, sched_type);
3347 	RXA_ADD_DICT(queue_conf.ev, priority);
3348 	RXA_ADD_DICT(queue_conf.ev, flow_id);
3349 
3350 	return 0;
3351 }
3352 
3353 static int
3354 handle_rxa_get_queue_stats(const char *cmd __rte_unused,
3355 			   const char *params,
3356 			   struct rte_tel_data *d)
3357 {
3358 	uint8_t rx_adapter_id;
3359 	uint16_t rx_queue_id;
3360 	int eth_dev_id;
3361 	char *token, *l_params;
3362 	struct rte_event_eth_rx_adapter_queue_stats q_stats;
3363 
3364 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3365 		return -1;
3366 
3367 	/* Get Rx adapter ID from parameter string */
3368 	l_params = strdup(params);
3369 	token = strtok(l_params, ",");
3370 	rx_adapter_id = strtoul(token, NULL, 10);
3371 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3372 
3373 	token = strtok(NULL, ",");
3374 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3375 		return -1;
3376 
3377 	/* Get device ID from parameter string */
3378 	eth_dev_id = strtoul(token, NULL, 10);
3379 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(eth_dev_id, -EINVAL);
3380 
3381 	token = strtok(NULL, ",");
3382 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3383 		return -1;
3384 
3385 	/* Get Rx queue ID from parameter string */
3386 	rx_queue_id = strtoul(token, NULL, 10);
3387 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3388 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3389 		return -EINVAL;
3390 	}
3391 
3392 	token = strtok(NULL, "\0");
3393 	if (token != NULL)
3394 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3395 				 " telemetry command, igrnoring");
3396 
3397 	if (rte_event_eth_rx_adapter_queue_stats_get(rx_adapter_id, eth_dev_id,
3398 						    rx_queue_id, &q_stats)) {
3399 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue stats");
3400 		return -1;
3401 	}
3402 
3403 	rte_tel_data_start_dict(d);
3404 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3405 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3406 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3407 	RXA_ADD_DICT(q_stats, rx_event_buf_count);
3408 	RXA_ADD_DICT(q_stats, rx_event_buf_size);
3409 	RXA_ADD_DICT(q_stats, rx_poll_count);
3410 	RXA_ADD_DICT(q_stats, rx_packets);
3411 	RXA_ADD_DICT(q_stats, rx_dropped);
3412 
3413 	return 0;
3414 }
3415 
3416 static int
3417 handle_rxa_queue_stats_reset(const char *cmd __rte_unused,
3418 			     const char *params,
3419 			     struct rte_tel_data *d __rte_unused)
3420 {
3421 	uint8_t rx_adapter_id;
3422 	uint16_t rx_queue_id;
3423 	int eth_dev_id;
3424 	char *token, *l_params;
3425 
3426 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3427 		return -1;
3428 
3429 	/* Get Rx adapter ID from parameter string */
3430 	l_params = strdup(params);
3431 	token = strtok(l_params, ",");
3432 	rx_adapter_id = strtoul(token, NULL, 10);
3433 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3434 
3435 	token = strtok(NULL, ",");
3436 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3437 		return -1;
3438 
3439 	/* Get device ID from parameter string */
3440 	eth_dev_id = strtoul(token, NULL, 10);
3441 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(eth_dev_id, -EINVAL);
3442 
3443 	token = strtok(NULL, ",");
3444 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3445 		return -1;
3446 
3447 	/* Get Rx queue ID from parameter string */
3448 	rx_queue_id = strtoul(token, NULL, 10);
3449 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3450 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3451 		return -EINVAL;
3452 	}
3453 
3454 	token = strtok(NULL, "\0");
3455 	if (token != NULL)
3456 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3457 				 " telemetry command, igrnoring");
3458 
3459 	if (rte_event_eth_rx_adapter_queue_stats_reset(rx_adapter_id,
3460 						       eth_dev_id,
3461 						       rx_queue_id)) {
3462 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter queue stats");
3463 		return -1;
3464 	}
3465 
3466 	return 0;
3467 }
3468 
3469 RTE_INIT(rxa_init_telemetry)
3470 {
3471 	rte_telemetry_register_cmd("/eventdev/rxa_stats",
3472 		handle_rxa_stats,
3473 		"Returns Rx adapter stats. Parameter: rxa_id");
3474 
3475 	rte_telemetry_register_cmd("/eventdev/rxa_stats_reset",
3476 		handle_rxa_stats_reset,
3477 		"Reset Rx adapter stats. Parameter: rxa_id");
3478 
3479 	rte_telemetry_register_cmd("/eventdev/rxa_queue_conf",
3480 		handle_rxa_get_queue_conf,
3481 		"Returns Rx queue config. Parameter: rxa_id, dev_id, queue_id");
3482 
3483 	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats",
3484 		handle_rxa_get_queue_stats,
3485 		"Returns Rx queue stats. Parameter: rxa_id, dev_id, queue_id");
3486 
3487 	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats_reset",
3488 		handle_rxa_queue_stats_reset,
3489 		"Reset Rx queue stats. Parameter: rxa_id, dev_id, queue_id");
3490 }
3491