xref: /dpdk/lib/eventdev/rte_event_eth_rx_adapter.c (revision 30a1de105a5f40d77b344a891c4a68f79e815c43)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9 
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <ethdev_driver.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20 #include <rte_mbuf_dyn.h>
21 #include <rte_telemetry.h>
22 
23 #include "rte_eventdev.h"
24 #include "eventdev_pmd.h"
25 #include "eventdev_trace.h"
26 #include "rte_event_eth_rx_adapter.h"
27 
28 #define BATCH_SIZE		32
29 #define BLOCK_CNT_THRESHOLD	10
30 #define ETH_EVENT_BUFFER_SIZE	(6*BATCH_SIZE)
31 #define MAX_VECTOR_SIZE		1024
32 #define MIN_VECTOR_SIZE		4
33 #define MAX_VECTOR_NS		1E9
34 #define MIN_VECTOR_NS		1E5
35 
36 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
37 #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
38 
39 #define RSS_KEY_SIZE	40
40 /* value written to intr thread pipe to signal thread exit */
41 #define ETH_BRIDGE_INTR_THREAD_EXIT	1
42 /* Sentinel value to detect initialized file handle */
43 #define INIT_FD		-1
44 
45 #define RXA_ADAPTER_ARRAY "rte_event_eth_rx_adapter_array"
46 
47 /*
48  * Used to store port and queue ID of interrupting Rx queue
49  */
50 union queue_data {
51 	RTE_STD_C11
52 	void *ptr;
53 	struct {
54 		uint16_t port;
55 		uint16_t queue;
56 	};
57 };
58 
59 /*
60  * There is an instance of this struct per polled Rx queue added to the
61  * adapter
62  */
63 struct eth_rx_poll_entry {
64 	/* Eth port to poll */
65 	uint16_t eth_dev_id;
66 	/* Eth rx queue to poll */
67 	uint16_t eth_rx_qid;
68 };
69 
70 struct eth_rx_vector_data {
71 	TAILQ_ENTRY(eth_rx_vector_data) next;
72 	uint16_t port;
73 	uint16_t queue;
74 	uint16_t max_vector_count;
75 	uint64_t event;
76 	uint64_t ts;
77 	uint64_t vector_timeout_ticks;
78 	struct rte_mempool *vector_pool;
79 	struct rte_event_vector *vector_ev;
80 } __rte_cache_aligned;
81 
82 TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
83 
84 /* Instance per adapter */
85 struct eth_event_enqueue_buffer {
86 	/* Count of events in this buffer */
87 	uint16_t count;
88 	/* Array of events in this buffer */
89 	struct rte_event *events;
90 	/* size of event buffer */
91 	uint16_t events_size;
92 	/* Event enqueue happens from head */
93 	uint16_t head;
94 	/* New packets from rte_eth_rx_burst is enqued from tail */
95 	uint16_t tail;
96 	/* last element in the buffer before rollover */
97 	uint16_t last;
98 	uint16_t last_mask;
99 };
100 
101 struct event_eth_rx_adapter {
102 	/* RSS key */
103 	uint8_t rss_key_be[RSS_KEY_SIZE];
104 	/* Event device identifier */
105 	uint8_t eventdev_id;
106 	/* Event port identifier */
107 	uint8_t event_port_id;
108 	/* Flag indicating per rxq event buffer */
109 	bool use_queue_event_buf;
110 	/* Per ethernet device structure */
111 	struct eth_device_info *eth_devices;
112 	/* Lock to serialize config updates with service function */
113 	rte_spinlock_t rx_lock;
114 	/* Max mbufs processed in any service function invocation */
115 	uint32_t max_nb_rx;
116 	/* Receive queues that need to be polled */
117 	struct eth_rx_poll_entry *eth_rx_poll;
118 	/* Size of the eth_rx_poll array */
119 	uint16_t num_rx_polled;
120 	/* Weighted round robin schedule */
121 	uint32_t *wrr_sched;
122 	/* wrr_sched[] size */
123 	uint32_t wrr_len;
124 	/* Next entry in wrr[] to begin polling */
125 	uint32_t wrr_pos;
126 	/* Event burst buffer */
127 	struct eth_event_enqueue_buffer event_enqueue_buffer;
128 	/* Vector enable flag */
129 	uint8_t ena_vector;
130 	/* Timestamp of previous vector expiry list traversal */
131 	uint64_t prev_expiry_ts;
132 	/* Minimum ticks to wait before traversing expiry list */
133 	uint64_t vector_tmo_ticks;
134 	/* vector list */
135 	struct eth_rx_vector_data_list vector_list;
136 	/* Per adapter stats */
137 	struct rte_event_eth_rx_adapter_stats stats;
138 	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
139 	uint16_t enq_block_count;
140 	/* Block start ts */
141 	uint64_t rx_enq_block_start_ts;
142 	/* epoll fd used to wait for Rx interrupts */
143 	int epd;
144 	/* Num of interrupt driven interrupt queues */
145 	uint32_t num_rx_intr;
146 	/* Used to send <dev id, queue id> of interrupting Rx queues from
147 	 * the interrupt thread to the Rx thread
148 	 */
149 	struct rte_ring *intr_ring;
150 	/* Rx Queue data (dev id, queue id) for the last non-empty
151 	 * queue polled
152 	 */
153 	union queue_data qd;
154 	/* queue_data is valid */
155 	int qd_valid;
156 	/* Interrupt ring lock, synchronizes Rx thread
157 	 * and interrupt thread
158 	 */
159 	rte_spinlock_t intr_ring_lock;
160 	/* event array passed to rte_poll_wait */
161 	struct rte_epoll_event *epoll_events;
162 	/* Count of interrupt vectors in use */
163 	uint32_t num_intr_vec;
164 	/* Thread blocked on Rx interrupts */
165 	pthread_t rx_intr_thread;
166 	/* Configuration callback for rte_service configuration */
167 	rte_event_eth_rx_adapter_conf_cb conf_cb;
168 	/* Configuration callback argument */
169 	void *conf_arg;
170 	/* Set if  default_cb is being used */
171 	int default_cb_arg;
172 	/* Service initialization state */
173 	uint8_t service_inited;
174 	/* Total count of Rx queues in adapter */
175 	uint32_t nb_queues;
176 	/* Memory allocation name */
177 	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
178 	/* Socket identifier cached from eventdev */
179 	int socket_id;
180 	/* Per adapter EAL service */
181 	uint32_t service_id;
182 	/* Adapter started flag */
183 	uint8_t rxa_started;
184 	/* Adapter ID */
185 	uint8_t id;
186 } __rte_cache_aligned;
187 
188 /* Per eth device */
189 struct eth_device_info {
190 	struct rte_eth_dev *dev;
191 	struct eth_rx_queue_info *rx_queue;
192 	/* Rx callback */
193 	rte_event_eth_rx_adapter_cb_fn cb_fn;
194 	/* Rx callback argument */
195 	void *cb_arg;
196 	/* Set if ethdev->eventdev packet transfer uses a
197 	 * hardware mechanism
198 	 */
199 	uint8_t internal_event_port;
200 	/* Set if the adapter is processing rx queues for
201 	 * this eth device and packet processing has been
202 	 * started, allows for the code to know if the PMD
203 	 * rx_adapter_stop callback needs to be invoked
204 	 */
205 	uint8_t dev_rx_started;
206 	/* Number of queues added for this device */
207 	uint16_t nb_dev_queues;
208 	/* Number of poll based queues
209 	 * If nb_rx_poll > 0, the start callback will
210 	 * be invoked if not already invoked
211 	 */
212 	uint16_t nb_rx_poll;
213 	/* Number of interrupt based queues
214 	 * If nb_rx_intr > 0, the start callback will
215 	 * be invoked if not already invoked.
216 	 */
217 	uint16_t nb_rx_intr;
218 	/* Number of queues that use the shared interrupt */
219 	uint16_t nb_shared_intr;
220 	/* sum(wrr(q)) for all queues within the device
221 	 * useful when deleting all device queues
222 	 */
223 	uint32_t wrr_len;
224 	/* Intr based queue index to start polling from, this is used
225 	 * if the number of shared interrupts is non-zero
226 	 */
227 	uint16_t next_q_idx;
228 	/* Intr based queue indices */
229 	uint16_t *intr_queue;
230 	/* device generates per Rx queue interrupt for queue index
231 	 * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
232 	 */
233 	int multi_intr_cap;
234 	/* shared interrupt enabled */
235 	int shared_intr_enabled;
236 };
237 
238 /* Per Rx queue */
239 struct eth_rx_queue_info {
240 	int queue_enabled;	/* True if added */
241 	int intr_enabled;
242 	uint8_t ena_vector;
243 	uint16_t wt;		/* Polling weight */
244 	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
245 	uint64_t event;
246 	struct eth_rx_vector_data vector_data;
247 	struct eth_event_enqueue_buffer *event_buf;
248 	/* use adapter stats struct for queue level stats,
249 	 * as same stats need to be updated for adapter and queue
250 	 */
251 	struct rte_event_eth_rx_adapter_stats *stats;
252 };
253 
254 static struct event_eth_rx_adapter **event_eth_rx_adapter;
255 
256 /* Enable dynamic timestamp field in mbuf */
257 static uint64_t event_eth_rx_timestamp_dynflag;
258 static int event_eth_rx_timestamp_dynfield_offset = -1;
259 
260 static inline rte_mbuf_timestamp_t *
261 rxa_timestamp_dynfield(struct rte_mbuf *mbuf)
262 {
263 	return RTE_MBUF_DYNFIELD(mbuf,
264 		event_eth_rx_timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
265 }
266 
267 static inline int
268 rxa_validate_id(uint8_t id)
269 {
270 	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
271 }
272 
273 static inline struct eth_event_enqueue_buffer *
274 rxa_event_buf_get(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
275 		  uint16_t rx_queue_id,
276 		  struct rte_event_eth_rx_adapter_stats **stats)
277 {
278 	if (rx_adapter->use_queue_event_buf) {
279 		struct eth_device_info *dev_info =
280 			&rx_adapter->eth_devices[eth_dev_id];
281 		*stats = dev_info->rx_queue[rx_queue_id].stats;
282 		return dev_info->rx_queue[rx_queue_id].event_buf;
283 	} else {
284 		*stats = &rx_adapter->stats;
285 		return &rx_adapter->event_enqueue_buffer;
286 	}
287 }
288 
289 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
290 	if (!rxa_validate_id(id)) { \
291 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
292 		return retval; \
293 	} \
294 } while (0)
295 
296 static inline int
297 rxa_sw_adapter_queue_count(struct event_eth_rx_adapter *rx_adapter)
298 {
299 	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
300 }
301 
302 /* Greatest common divisor */
303 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
304 {
305 	uint16_t r = a % b;
306 
307 	return r ? rxa_gcd_u16(b, r) : b;
308 }
309 
310 /* Returns the next queue in the polling sequence
311  *
312  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
313  */
314 static int
315 rxa_wrr_next(struct event_eth_rx_adapter *rx_adapter, unsigned int n, int *cw,
316 	     struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
317 	     uint16_t gcd, int prev)
318 {
319 	int i = prev;
320 	uint16_t w;
321 
322 	while (1) {
323 		uint16_t q;
324 		uint16_t d;
325 
326 		i = (i + 1) % n;
327 		if (i == 0) {
328 			*cw = *cw - gcd;
329 			if (*cw <= 0)
330 				*cw = max_wt;
331 		}
332 
333 		q = eth_rx_poll[i].eth_rx_qid;
334 		d = eth_rx_poll[i].eth_dev_id;
335 		w = rx_adapter->eth_devices[d].rx_queue[q].wt;
336 
337 		if ((int)w >= *cw)
338 			return i;
339 	}
340 }
341 
342 static inline int
343 rxa_shared_intr(struct eth_device_info *dev_info,
344 	int rx_queue_id)
345 {
346 	int multi_intr_cap;
347 
348 	if (dev_info->dev->intr_handle == NULL)
349 		return 0;
350 
351 	multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
352 	return !multi_intr_cap ||
353 		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
354 }
355 
356 static inline int
357 rxa_intr_queue(struct eth_device_info *dev_info,
358 	int rx_queue_id)
359 {
360 	struct eth_rx_queue_info *queue_info;
361 
362 	queue_info = &dev_info->rx_queue[rx_queue_id];
363 	return dev_info->rx_queue &&
364 		!dev_info->internal_event_port &&
365 		queue_info->queue_enabled && queue_info->wt == 0;
366 }
367 
368 static inline int
369 rxa_polled_queue(struct eth_device_info *dev_info,
370 	int rx_queue_id)
371 {
372 	struct eth_rx_queue_info *queue_info;
373 
374 	queue_info = &dev_info->rx_queue[rx_queue_id];
375 	return !dev_info->internal_event_port &&
376 		dev_info->rx_queue &&
377 		queue_info->queue_enabled && queue_info->wt != 0;
378 }
379 
380 /* Calculate change in number of vectors after Rx queue ID is add/deleted */
381 static int
382 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
383 {
384 	uint16_t i;
385 	int n, s;
386 	uint16_t nbq;
387 
388 	nbq = dev_info->dev->data->nb_rx_queues;
389 	n = 0; /* non shared count */
390 	s = 0; /* shared count */
391 
392 	if (rx_queue_id == -1) {
393 		for (i = 0; i < nbq; i++) {
394 			if (!rxa_shared_intr(dev_info, i))
395 				n += add ? !rxa_intr_queue(dev_info, i) :
396 					rxa_intr_queue(dev_info, i);
397 			else
398 				s += add ? !rxa_intr_queue(dev_info, i) :
399 					rxa_intr_queue(dev_info, i);
400 		}
401 
402 		if (s > 0) {
403 			if ((add && dev_info->nb_shared_intr == 0) ||
404 				(!add && dev_info->nb_shared_intr))
405 				n += 1;
406 		}
407 	} else {
408 		if (!rxa_shared_intr(dev_info, rx_queue_id))
409 			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
410 				rxa_intr_queue(dev_info, rx_queue_id);
411 		else
412 			n = add ? !dev_info->nb_shared_intr :
413 				dev_info->nb_shared_intr == 1;
414 	}
415 
416 	return add ? n : -n;
417 }
418 
419 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
420  */
421 static void
422 rxa_calc_nb_post_intr_del(struct event_eth_rx_adapter *rx_adapter,
423 			  struct eth_device_info *dev_info, int rx_queue_id,
424 			  uint32_t *nb_rx_intr)
425 {
426 	uint32_t intr_diff;
427 
428 	if (rx_queue_id == -1)
429 		intr_diff = dev_info->nb_rx_intr;
430 	else
431 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
432 
433 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
434 }
435 
436 /* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
437  * interrupt queues could currently be poll mode Rx queues
438  */
439 static void
440 rxa_calc_nb_post_add_intr(struct event_eth_rx_adapter *rx_adapter,
441 			  struct eth_device_info *dev_info, int rx_queue_id,
442 			  uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
443 			  uint32_t *nb_wrr)
444 {
445 	uint32_t intr_diff;
446 	uint32_t poll_diff;
447 	uint32_t wrr_len_diff;
448 
449 	if (rx_queue_id == -1) {
450 		intr_diff = dev_info->dev->data->nb_rx_queues -
451 						dev_info->nb_rx_intr;
452 		poll_diff = dev_info->nb_rx_poll;
453 		wrr_len_diff = dev_info->wrr_len;
454 	} else {
455 		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
456 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
457 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
458 					0;
459 	}
460 
461 	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
462 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
463 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
464 }
465 
466 /* Calculate size of the eth_rx_poll and wrr_sched arrays
467  * after deleting poll mode rx queues
468  */
469 static void
470 rxa_calc_nb_post_poll_del(struct event_eth_rx_adapter *rx_adapter,
471 			  struct eth_device_info *dev_info, int rx_queue_id,
472 			  uint32_t *nb_rx_poll, uint32_t *nb_wrr)
473 {
474 	uint32_t poll_diff;
475 	uint32_t wrr_len_diff;
476 
477 	if (rx_queue_id == -1) {
478 		poll_diff = dev_info->nb_rx_poll;
479 		wrr_len_diff = dev_info->wrr_len;
480 	} else {
481 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
482 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
483 					0;
484 	}
485 
486 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
487 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
488 }
489 
490 /* Calculate nb_rx_* after adding poll mode rx queues
491  */
492 static void
493 rxa_calc_nb_post_add_poll(struct event_eth_rx_adapter *rx_adapter,
494 			  struct eth_device_info *dev_info, int rx_queue_id,
495 			  uint16_t wt, uint32_t *nb_rx_poll,
496 			  uint32_t *nb_rx_intr, uint32_t *nb_wrr)
497 {
498 	uint32_t intr_diff;
499 	uint32_t poll_diff;
500 	uint32_t wrr_len_diff;
501 
502 	if (rx_queue_id == -1) {
503 		intr_diff = dev_info->nb_rx_intr;
504 		poll_diff = dev_info->dev->data->nb_rx_queues -
505 						dev_info->nb_rx_poll;
506 		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
507 				- dev_info->wrr_len;
508 	} else {
509 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
510 		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
511 		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
512 				wt - dev_info->rx_queue[rx_queue_id].wt :
513 				wt;
514 	}
515 
516 	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
517 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
518 	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
519 }
520 
521 /* Calculate nb_rx_* after adding rx_queue_id */
522 static void
523 rxa_calc_nb_post_add(struct event_eth_rx_adapter *rx_adapter,
524 		     struct eth_device_info *dev_info, int rx_queue_id,
525 		     uint16_t wt, uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
526 		     uint32_t *nb_wrr)
527 {
528 	if (wt != 0)
529 		rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
530 					wt, nb_rx_poll, nb_rx_intr, nb_wrr);
531 	else
532 		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
533 					nb_rx_poll, nb_rx_intr, nb_wrr);
534 }
535 
536 /* Calculate nb_rx_* after deleting rx_queue_id */
537 static void
538 rxa_calc_nb_post_del(struct event_eth_rx_adapter *rx_adapter,
539 		     struct eth_device_info *dev_info, int rx_queue_id,
540 		     uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
541 		     uint32_t *nb_wrr)
542 {
543 	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
544 				nb_wrr);
545 	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
546 				nb_rx_intr);
547 }
548 
549 /*
550  * Allocate the rx_poll array
551  */
552 static struct eth_rx_poll_entry *
553 rxa_alloc_poll(struct event_eth_rx_adapter *rx_adapter, uint32_t num_rx_polled)
554 {
555 	size_t len;
556 
557 	len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
558 							RTE_CACHE_LINE_SIZE);
559 	return  rte_zmalloc_socket(rx_adapter->mem_name,
560 				len,
561 				RTE_CACHE_LINE_SIZE,
562 				rx_adapter->socket_id);
563 }
564 
565 /*
566  * Allocate the WRR array
567  */
568 static uint32_t *
569 rxa_alloc_wrr(struct event_eth_rx_adapter *rx_adapter, int nb_wrr)
570 {
571 	size_t len;
572 
573 	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
574 			RTE_CACHE_LINE_SIZE);
575 	return  rte_zmalloc_socket(rx_adapter->mem_name,
576 				len,
577 				RTE_CACHE_LINE_SIZE,
578 				rx_adapter->socket_id);
579 }
580 
581 static int
582 rxa_alloc_poll_arrays(struct event_eth_rx_adapter *rx_adapter, uint32_t nb_poll,
583 		      uint32_t nb_wrr, struct eth_rx_poll_entry **rx_poll,
584 		      uint32_t **wrr_sched)
585 {
586 
587 	if (nb_poll == 0) {
588 		*rx_poll = NULL;
589 		*wrr_sched = NULL;
590 		return 0;
591 	}
592 
593 	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
594 	if (*rx_poll == NULL) {
595 		*wrr_sched = NULL;
596 		return -ENOMEM;
597 	}
598 
599 	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
600 	if (*wrr_sched == NULL) {
601 		rte_free(*rx_poll);
602 		return -ENOMEM;
603 	}
604 	return 0;
605 }
606 
607 /* Precalculate WRR polling sequence for all queues in rx_adapter */
608 static void
609 rxa_calc_wrr_sequence(struct event_eth_rx_adapter *rx_adapter,
610 		      struct eth_rx_poll_entry *rx_poll, uint32_t *rx_wrr)
611 {
612 	uint16_t d;
613 	uint16_t q;
614 	unsigned int i;
615 	int prev = -1;
616 	int cw = -1;
617 
618 	/* Initialize variables for calculation of wrr schedule */
619 	uint16_t max_wrr_pos = 0;
620 	unsigned int poll_q = 0;
621 	uint16_t max_wt = 0;
622 	uint16_t gcd = 0;
623 
624 	if (rx_poll == NULL)
625 		return;
626 
627 	/* Generate array of all queues to poll, the size of this
628 	 * array is poll_q
629 	 */
630 	RTE_ETH_FOREACH_DEV(d) {
631 		uint16_t nb_rx_queues;
632 		struct eth_device_info *dev_info =
633 				&rx_adapter->eth_devices[d];
634 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
635 		if (dev_info->rx_queue == NULL)
636 			continue;
637 		if (dev_info->internal_event_port)
638 			continue;
639 		dev_info->wrr_len = 0;
640 		for (q = 0; q < nb_rx_queues; q++) {
641 			struct eth_rx_queue_info *queue_info =
642 				&dev_info->rx_queue[q];
643 			uint16_t wt;
644 
645 			if (!rxa_polled_queue(dev_info, q))
646 				continue;
647 			wt = queue_info->wt;
648 			rx_poll[poll_q].eth_dev_id = d;
649 			rx_poll[poll_q].eth_rx_qid = q;
650 			max_wrr_pos += wt;
651 			dev_info->wrr_len += wt;
652 			max_wt = RTE_MAX(max_wt, wt);
653 			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
654 			poll_q++;
655 		}
656 	}
657 
658 	/* Generate polling sequence based on weights */
659 	prev = -1;
660 	cw = -1;
661 	for (i = 0; i < max_wrr_pos; i++) {
662 		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
663 				     rx_poll, max_wt, gcd, prev);
664 		prev = rx_wrr[i];
665 	}
666 }
667 
668 static inline void
669 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
670 	struct rte_ipv6_hdr **ipv6_hdr)
671 {
672 	struct rte_ether_hdr *eth_hdr =
673 		rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
674 	struct rte_vlan_hdr *vlan_hdr;
675 
676 	*ipv4_hdr = NULL;
677 	*ipv6_hdr = NULL;
678 
679 	switch (eth_hdr->ether_type) {
680 	case RTE_BE16(RTE_ETHER_TYPE_IPV4):
681 		*ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
682 		break;
683 
684 	case RTE_BE16(RTE_ETHER_TYPE_IPV6):
685 		*ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
686 		break;
687 
688 	case RTE_BE16(RTE_ETHER_TYPE_VLAN):
689 		vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
690 		switch (vlan_hdr->eth_proto) {
691 		case RTE_BE16(RTE_ETHER_TYPE_IPV4):
692 			*ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
693 			break;
694 		case RTE_BE16(RTE_ETHER_TYPE_IPV6):
695 			*ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
696 			break;
697 		default:
698 			break;
699 		}
700 		break;
701 
702 	default:
703 		break;
704 	}
705 }
706 
707 /* Calculate RSS hash for IPv4/6 */
708 static inline uint32_t
709 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
710 {
711 	uint32_t input_len;
712 	void *tuple;
713 	struct rte_ipv4_tuple ipv4_tuple;
714 	struct rte_ipv6_tuple ipv6_tuple;
715 	struct rte_ipv4_hdr *ipv4_hdr;
716 	struct rte_ipv6_hdr *ipv6_hdr;
717 
718 	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
719 
720 	if (ipv4_hdr) {
721 		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
722 		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
723 		tuple = &ipv4_tuple;
724 		input_len = RTE_THASH_V4_L3_LEN;
725 	} else if (ipv6_hdr) {
726 		rte_thash_load_v6_addrs(ipv6_hdr,
727 					(union rte_thash_tuple *)&ipv6_tuple);
728 		tuple = &ipv6_tuple;
729 		input_len = RTE_THASH_V6_L3_LEN;
730 	} else
731 		return 0;
732 
733 	return rte_softrss_be(tuple, input_len, rss_key_be);
734 }
735 
736 static inline int
737 rxa_enq_blocked(struct event_eth_rx_adapter *rx_adapter)
738 {
739 	return !!rx_adapter->enq_block_count;
740 }
741 
742 static inline void
743 rxa_enq_block_start_ts(struct event_eth_rx_adapter *rx_adapter)
744 {
745 	if (rx_adapter->rx_enq_block_start_ts)
746 		return;
747 
748 	rx_adapter->enq_block_count++;
749 	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
750 		return;
751 
752 	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
753 }
754 
755 static inline void
756 rxa_enq_block_end_ts(struct event_eth_rx_adapter *rx_adapter,
757 		     struct rte_event_eth_rx_adapter_stats *stats)
758 {
759 	if (unlikely(!stats->rx_enq_start_ts))
760 		stats->rx_enq_start_ts = rte_get_tsc_cycles();
761 
762 	if (likely(!rxa_enq_blocked(rx_adapter)))
763 		return;
764 
765 	rx_adapter->enq_block_count = 0;
766 	if (rx_adapter->rx_enq_block_start_ts) {
767 		stats->rx_enq_end_ts = rte_get_tsc_cycles();
768 		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
769 		    rx_adapter->rx_enq_block_start_ts;
770 		rx_adapter->rx_enq_block_start_ts = 0;
771 	}
772 }
773 
774 /* Enqueue buffered events to event device */
775 static inline uint16_t
776 rxa_flush_event_buffer(struct event_eth_rx_adapter *rx_adapter,
777 		       struct eth_event_enqueue_buffer *buf,
778 		       struct rte_event_eth_rx_adapter_stats *stats)
779 {
780 	uint16_t count = buf->count;
781 	uint16_t n = 0;
782 
783 	if (!count)
784 		return 0;
785 
786 	if (buf->last)
787 		count = buf->last - buf->head;
788 
789 	if (count) {
790 		n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
791 						rx_adapter->event_port_id,
792 						&buf->events[buf->head],
793 						count);
794 		if (n != count)
795 			stats->rx_enq_retry++;
796 
797 		buf->head += n;
798 	}
799 
800 	if (buf->last && n == count) {
801 		uint16_t n1;
802 
803 		n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
804 					rx_adapter->event_port_id,
805 					&buf->events[0],
806 					buf->tail);
807 
808 		if (n1 != buf->tail)
809 			stats->rx_enq_retry++;
810 
811 		buf->last = 0;
812 		buf->head = n1;
813 		buf->last_mask = 0;
814 		n += n1;
815 	}
816 
817 	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
818 		rxa_enq_block_start_ts(rx_adapter);
819 
820 	buf->count -= n;
821 	stats->rx_enq_count += n;
822 
823 	return n;
824 }
825 
826 static inline void
827 rxa_init_vector(struct event_eth_rx_adapter *rx_adapter,
828 		struct eth_rx_vector_data *vec)
829 {
830 	vec->vector_ev->nb_elem = 0;
831 	vec->vector_ev->port = vec->port;
832 	vec->vector_ev->queue = vec->queue;
833 	vec->vector_ev->attr_valid = true;
834 	TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
835 }
836 
837 static inline uint16_t
838 rxa_create_event_vector(struct event_eth_rx_adapter *rx_adapter,
839 			struct eth_rx_queue_info *queue_info,
840 			struct eth_event_enqueue_buffer *buf,
841 			struct rte_mbuf **mbufs, uint16_t num)
842 {
843 	struct rte_event *ev = &buf->events[buf->count];
844 	struct eth_rx_vector_data *vec;
845 	uint16_t filled, space, sz;
846 
847 	filled = 0;
848 	vec = &queue_info->vector_data;
849 
850 	if (vec->vector_ev == NULL) {
851 		if (rte_mempool_get(vec->vector_pool,
852 				    (void **)&vec->vector_ev) < 0) {
853 			rte_pktmbuf_free_bulk(mbufs, num);
854 			return 0;
855 		}
856 		rxa_init_vector(rx_adapter, vec);
857 	}
858 	while (num) {
859 		if (vec->vector_ev->nb_elem == vec->max_vector_count) {
860 			/* Event ready. */
861 			ev->event = vec->event;
862 			ev->vec = vec->vector_ev;
863 			ev++;
864 			filled++;
865 			vec->vector_ev = NULL;
866 			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
867 			if (rte_mempool_get(vec->vector_pool,
868 					    (void **)&vec->vector_ev) < 0) {
869 				rte_pktmbuf_free_bulk(mbufs, num);
870 				return 0;
871 			}
872 			rxa_init_vector(rx_adapter, vec);
873 		}
874 
875 		space = vec->max_vector_count - vec->vector_ev->nb_elem;
876 		sz = num > space ? space : num;
877 		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
878 		       sizeof(void *) * sz);
879 		vec->vector_ev->nb_elem += sz;
880 		num -= sz;
881 		mbufs += sz;
882 		vec->ts = rte_rdtsc();
883 	}
884 
885 	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
886 		ev->event = vec->event;
887 		ev->vec = vec->vector_ev;
888 		ev++;
889 		filled++;
890 		vec->vector_ev = NULL;
891 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
892 	}
893 
894 	return filled;
895 }
896 
897 static inline void
898 rxa_buffer_mbufs(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
899 		 uint16_t rx_queue_id, struct rte_mbuf **mbufs, uint16_t num,
900 		 struct eth_event_enqueue_buffer *buf,
901 		 struct rte_event_eth_rx_adapter_stats *stats)
902 {
903 	uint32_t i;
904 	struct eth_device_info *dev_info =
905 					&rx_adapter->eth_devices[eth_dev_id];
906 	struct eth_rx_queue_info *eth_rx_queue_info =
907 					&dev_info->rx_queue[rx_queue_id];
908 	uint16_t new_tail = buf->tail;
909 	uint64_t event = eth_rx_queue_info->event;
910 	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
911 	struct rte_mbuf *m = mbufs[0];
912 	uint32_t rss_mask;
913 	uint32_t rss;
914 	int do_rss;
915 	uint16_t nb_cb;
916 	uint16_t dropped;
917 	uint64_t ts, ts_mask;
918 
919 	if (!eth_rx_queue_info->ena_vector) {
920 		ts = m->ol_flags & event_eth_rx_timestamp_dynflag ?
921 						0 : rte_get_tsc_cycles();
922 
923 		/* 0xffff ffff ffff ffff if RTE_MBUF_F_RX_TIMESTAMP is set,
924 		 * otherwise 0
925 		 */
926 		ts_mask = (uint64_t)(!(m->ol_flags &
927 				       event_eth_rx_timestamp_dynflag)) - 1ULL;
928 
929 		/* 0xffff ffff if RTE_MBUF_F_RX_RSS_HASH is set, otherwise 0 */
930 		rss_mask = ~(((m->ol_flags & RTE_MBUF_F_RX_RSS_HASH) != 0) - 1);
931 		do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
932 		for (i = 0; i < num; i++) {
933 			struct rte_event *ev;
934 
935 			m = mbufs[i];
936 			*rxa_timestamp_dynfield(m) = ts |
937 					(*rxa_timestamp_dynfield(m) & ts_mask);
938 
939 			ev = &buf->events[new_tail];
940 
941 			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
942 				     : m->hash.rss;
943 			ev->event = event;
944 			ev->flow_id = (rss & ~flow_id_mask) |
945 				      (ev->flow_id & flow_id_mask);
946 			ev->mbuf = m;
947 			new_tail++;
948 		}
949 	} else {
950 		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
951 					      buf, mbufs, num);
952 	}
953 
954 	if (num && dev_info->cb_fn) {
955 
956 		dropped = 0;
957 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
958 				       buf->last |
959 				       (buf->events_size & ~buf->last_mask),
960 				       buf->count >= BATCH_SIZE ?
961 						buf->count - BATCH_SIZE : 0,
962 				       &buf->events[buf->tail],
963 				       num,
964 				       dev_info->cb_arg,
965 				       &dropped);
966 		if (unlikely(nb_cb > num))
967 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
968 				nb_cb, num);
969 		else
970 			num = nb_cb;
971 		if (dropped)
972 			stats->rx_dropped += dropped;
973 	}
974 
975 	buf->count += num;
976 	buf->tail += num;
977 }
978 
979 static inline bool
980 rxa_pkt_buf_available(struct eth_event_enqueue_buffer *buf)
981 {
982 	uint32_t nb_req = buf->tail + BATCH_SIZE;
983 
984 	if (!buf->last) {
985 		if (nb_req <= buf->events_size)
986 			return true;
987 
988 		if (buf->head >= BATCH_SIZE) {
989 			buf->last_mask = ~0;
990 			buf->last = buf->tail;
991 			buf->tail = 0;
992 			return true;
993 		}
994 	}
995 
996 	return nb_req <= buf->head;
997 }
998 
999 /* Enqueue packets from  <port, q>  to event buffer */
1000 static inline uint32_t
1001 rxa_eth_rx(struct event_eth_rx_adapter *rx_adapter, uint16_t port_id,
1002 	   uint16_t queue_id, uint32_t rx_count, uint32_t max_rx,
1003 	   int *rxq_empty, struct eth_event_enqueue_buffer *buf,
1004 	   struct rte_event_eth_rx_adapter_stats *stats)
1005 {
1006 	struct rte_mbuf *mbufs[BATCH_SIZE];
1007 	uint16_t n;
1008 	uint32_t nb_rx = 0;
1009 	uint32_t nb_flushed = 0;
1010 
1011 	if (rxq_empty)
1012 		*rxq_empty = 0;
1013 	/* Don't do a batch dequeue from the rx queue if there isn't
1014 	 * enough space in the enqueue buffer.
1015 	 */
1016 	while (rxa_pkt_buf_available(buf)) {
1017 		if (buf->count >= BATCH_SIZE)
1018 			nb_flushed +=
1019 				rxa_flush_event_buffer(rx_adapter, buf, stats);
1020 
1021 		stats->rx_poll_count++;
1022 		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
1023 		if (unlikely(!n)) {
1024 			if (rxq_empty)
1025 				*rxq_empty = 1;
1026 			break;
1027 		}
1028 		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n, buf,
1029 				 stats);
1030 		nb_rx += n;
1031 		if (rx_count + nb_rx > max_rx)
1032 			break;
1033 	}
1034 
1035 	if (buf->count > 0)
1036 		nb_flushed += rxa_flush_event_buffer(rx_adapter, buf, stats);
1037 
1038 	stats->rx_packets += nb_rx;
1039 	if (nb_flushed == 0)
1040 		rte_event_maintain(rx_adapter->eventdev_id,
1041 				   rx_adapter->event_port_id, 0);
1042 
1043 	return nb_rx;
1044 }
1045 
1046 static inline void
1047 rxa_intr_ring_enqueue(struct event_eth_rx_adapter *rx_adapter, void *data)
1048 {
1049 	uint16_t port_id;
1050 	uint16_t queue;
1051 	int err;
1052 	union queue_data qd;
1053 	struct eth_device_info *dev_info;
1054 	struct eth_rx_queue_info *queue_info;
1055 	int *intr_enabled;
1056 
1057 	qd.ptr = data;
1058 	port_id = qd.port;
1059 	queue = qd.queue;
1060 
1061 	dev_info = &rx_adapter->eth_devices[port_id];
1062 	queue_info = &dev_info->rx_queue[queue];
1063 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1064 	if (rxa_shared_intr(dev_info, queue))
1065 		intr_enabled = &dev_info->shared_intr_enabled;
1066 	else
1067 		intr_enabled = &queue_info->intr_enabled;
1068 
1069 	if (*intr_enabled) {
1070 		*intr_enabled = 0;
1071 		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
1072 		/* Entry should always be available.
1073 		 * The ring size equals the maximum number of interrupt
1074 		 * vectors supported (an interrupt vector is shared in
1075 		 * case of shared interrupts)
1076 		 */
1077 		if (err)
1078 			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
1079 				" to ring: %s", strerror(-err));
1080 		else
1081 			rte_eth_dev_rx_intr_disable(port_id, queue);
1082 	}
1083 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1084 }
1085 
1086 static int
1087 rxa_intr_ring_check_avail(struct event_eth_rx_adapter *rx_adapter,
1088 			  uint32_t num_intr_vec)
1089 {
1090 	if (rx_adapter->num_intr_vec + num_intr_vec >
1091 				RTE_EVENT_ETH_INTR_RING_SIZE) {
1092 		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1093 		" %d needed %d limit %d", rx_adapter->num_intr_vec,
1094 		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1095 		return -ENOSPC;
1096 	}
1097 
1098 	return 0;
1099 }
1100 
1101 /* Delete entries for (dev, queue) from the interrupt ring */
1102 static void
1103 rxa_intr_ring_del_entries(struct event_eth_rx_adapter *rx_adapter,
1104 			  struct eth_device_info *dev_info,
1105 			  uint16_t rx_queue_id)
1106 {
1107 	int i, n;
1108 	union queue_data qd;
1109 
1110 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1111 
1112 	n = rte_ring_count(rx_adapter->intr_ring);
1113 	for (i = 0; i < n; i++) {
1114 		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1115 		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1116 			if (qd.port == dev_info->dev->data->port_id &&
1117 				qd.queue == rx_queue_id)
1118 				continue;
1119 		} else {
1120 			if (qd.port == dev_info->dev->data->port_id)
1121 				continue;
1122 		}
1123 		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1124 	}
1125 
1126 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1127 }
1128 
1129 /* pthread callback handling interrupt mode receive queues
1130  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1131  * interrupting queue to the adapter's ring buffer for interrupt events.
1132  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1133  * the adapter service function.
1134  */
1135 static void *
1136 rxa_intr_thread(void *arg)
1137 {
1138 	struct event_eth_rx_adapter *rx_adapter = arg;
1139 	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1140 	int n, i;
1141 
1142 	while (1) {
1143 		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1144 				RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1145 		if (unlikely(n < 0))
1146 			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1147 					n);
1148 		for (i = 0; i < n; i++) {
1149 			rxa_intr_ring_enqueue(rx_adapter,
1150 					epoll_events[i].epdata.data);
1151 		}
1152 	}
1153 
1154 	return NULL;
1155 }
1156 
1157 /* Dequeue <port, q> from interrupt ring and enqueue received
1158  * mbufs to eventdev
1159  */
1160 static inline void
1161 rxa_intr_ring_dequeue(struct event_eth_rx_adapter *rx_adapter)
1162 {
1163 	uint32_t n;
1164 	uint32_t nb_rx = 0;
1165 	int rxq_empty;
1166 	struct eth_event_enqueue_buffer *buf;
1167 	struct rte_event_eth_rx_adapter_stats *stats;
1168 	rte_spinlock_t *ring_lock;
1169 	uint8_t max_done = 0;
1170 
1171 	if (rx_adapter->num_rx_intr == 0)
1172 		return;
1173 
1174 	if (rte_ring_count(rx_adapter->intr_ring) == 0
1175 		&& !rx_adapter->qd_valid)
1176 		return;
1177 
1178 	buf = &rx_adapter->event_enqueue_buffer;
1179 	stats = &rx_adapter->stats;
1180 	ring_lock = &rx_adapter->intr_ring_lock;
1181 
1182 	if (buf->count >= BATCH_SIZE)
1183 		rxa_flush_event_buffer(rx_adapter, buf, stats);
1184 
1185 	while (rxa_pkt_buf_available(buf)) {
1186 		struct eth_device_info *dev_info;
1187 		uint16_t port;
1188 		uint16_t queue;
1189 		union queue_data qd  = rx_adapter->qd;
1190 		int err;
1191 
1192 		if (!rx_adapter->qd_valid) {
1193 			struct eth_rx_queue_info *queue_info;
1194 
1195 			rte_spinlock_lock(ring_lock);
1196 			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1197 			if (err) {
1198 				rte_spinlock_unlock(ring_lock);
1199 				break;
1200 			}
1201 
1202 			port = qd.port;
1203 			queue = qd.queue;
1204 			rx_adapter->qd = qd;
1205 			rx_adapter->qd_valid = 1;
1206 			dev_info = &rx_adapter->eth_devices[port];
1207 			if (rxa_shared_intr(dev_info, queue))
1208 				dev_info->shared_intr_enabled = 1;
1209 			else {
1210 				queue_info = &dev_info->rx_queue[queue];
1211 				queue_info->intr_enabled = 1;
1212 			}
1213 			rte_eth_dev_rx_intr_enable(port, queue);
1214 			rte_spinlock_unlock(ring_lock);
1215 		} else {
1216 			port = qd.port;
1217 			queue = qd.queue;
1218 
1219 			dev_info = &rx_adapter->eth_devices[port];
1220 		}
1221 
1222 		if (rxa_shared_intr(dev_info, queue)) {
1223 			uint16_t i;
1224 			uint16_t nb_queues;
1225 
1226 			nb_queues = dev_info->dev->data->nb_rx_queues;
1227 			n = 0;
1228 			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1229 				uint8_t enq_buffer_full;
1230 
1231 				if (!rxa_intr_queue(dev_info, i))
1232 					continue;
1233 				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1234 					rx_adapter->max_nb_rx,
1235 					&rxq_empty, buf, stats);
1236 				nb_rx += n;
1237 
1238 				enq_buffer_full = !rxq_empty && n == 0;
1239 				max_done = nb_rx > rx_adapter->max_nb_rx;
1240 
1241 				if (enq_buffer_full || max_done) {
1242 					dev_info->next_q_idx = i;
1243 					goto done;
1244 				}
1245 			}
1246 
1247 			rx_adapter->qd_valid = 0;
1248 
1249 			/* Reinitialize for next interrupt */
1250 			dev_info->next_q_idx = dev_info->multi_intr_cap ?
1251 						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1252 						0;
1253 		} else {
1254 			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1255 				rx_adapter->max_nb_rx,
1256 				&rxq_empty, buf, stats);
1257 			rx_adapter->qd_valid = !rxq_empty;
1258 			nb_rx += n;
1259 			if (nb_rx > rx_adapter->max_nb_rx)
1260 				break;
1261 		}
1262 	}
1263 
1264 done:
1265 	rx_adapter->stats.rx_intr_packets += nb_rx;
1266 }
1267 
1268 /*
1269  * Polls receive queues added to the event adapter and enqueues received
1270  * packets to the event device.
1271  *
1272  * The receive code enqueues initially to a temporary buffer, the
1273  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
1274  *
1275  * If there isn't space available in the temporary buffer, packets from the
1276  * Rx queue aren't dequeued from the eth device, this back pressures the
1277  * eth device, in virtual device environments this back pressure is relayed to
1278  * the hypervisor's switching layer where adjustments can be made to deal with
1279  * it.
1280  */
1281 static inline void
1282 rxa_poll(struct event_eth_rx_adapter *rx_adapter)
1283 {
1284 	uint32_t num_queue;
1285 	uint32_t nb_rx = 0;
1286 	struct eth_event_enqueue_buffer *buf = NULL;
1287 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1288 	uint32_t wrr_pos;
1289 	uint32_t max_nb_rx;
1290 
1291 	wrr_pos = rx_adapter->wrr_pos;
1292 	max_nb_rx = rx_adapter->max_nb_rx;
1293 
1294 	/* Iterate through a WRR sequence */
1295 	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1296 		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1297 		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1298 		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1299 
1300 		buf = rxa_event_buf_get(rx_adapter, d, qid, &stats);
1301 
1302 		/* Don't do a batch dequeue from the rx queue if there isn't
1303 		 * enough space in the enqueue buffer.
1304 		 */
1305 		if (buf->count >= BATCH_SIZE)
1306 			rxa_flush_event_buffer(rx_adapter, buf, stats);
1307 		if (!rxa_pkt_buf_available(buf)) {
1308 			if (rx_adapter->use_queue_event_buf)
1309 				goto poll_next_entry;
1310 			else {
1311 				rx_adapter->wrr_pos = wrr_pos;
1312 				return;
1313 			}
1314 		}
1315 
1316 		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1317 				NULL, buf, stats);
1318 		if (nb_rx > max_nb_rx) {
1319 			rx_adapter->wrr_pos =
1320 				    (wrr_pos + 1) % rx_adapter->wrr_len;
1321 			break;
1322 		}
1323 
1324 poll_next_entry:
1325 		if (++wrr_pos == rx_adapter->wrr_len)
1326 			wrr_pos = 0;
1327 	}
1328 }
1329 
1330 static void
1331 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1332 {
1333 	struct event_eth_rx_adapter *rx_adapter = arg;
1334 	struct eth_event_enqueue_buffer *buf = NULL;
1335 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1336 	struct rte_event *ev;
1337 
1338 	buf = rxa_event_buf_get(rx_adapter, vec->port, vec->queue, &stats);
1339 
1340 	if (buf->count)
1341 		rxa_flush_event_buffer(rx_adapter, buf, stats);
1342 
1343 	if (vec->vector_ev->nb_elem == 0)
1344 		return;
1345 	ev = &buf->events[buf->count];
1346 
1347 	/* Event ready. */
1348 	ev->event = vec->event;
1349 	ev->vec = vec->vector_ev;
1350 	buf->count++;
1351 
1352 	vec->vector_ev = NULL;
1353 	vec->ts = 0;
1354 }
1355 
1356 static int
1357 rxa_service_func(void *args)
1358 {
1359 	struct event_eth_rx_adapter *rx_adapter = args;
1360 
1361 	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1362 		return 0;
1363 	if (!rx_adapter->rxa_started) {
1364 		rte_spinlock_unlock(&rx_adapter->rx_lock);
1365 		return 0;
1366 	}
1367 
1368 	if (rx_adapter->ena_vector) {
1369 		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1370 		    rx_adapter->vector_tmo_ticks) {
1371 			struct eth_rx_vector_data *vec;
1372 
1373 			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1374 				uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1375 
1376 				if (elapsed_time >= vec->vector_timeout_ticks) {
1377 					rxa_vector_expire(vec, rx_adapter);
1378 					TAILQ_REMOVE(&rx_adapter->vector_list,
1379 						     vec, next);
1380 				}
1381 			}
1382 			rx_adapter->prev_expiry_ts = rte_rdtsc();
1383 		}
1384 	}
1385 
1386 	rxa_intr_ring_dequeue(rx_adapter);
1387 	rxa_poll(rx_adapter);
1388 
1389 	rte_spinlock_unlock(&rx_adapter->rx_lock);
1390 
1391 	return 0;
1392 }
1393 
1394 static int
1395 rte_event_eth_rx_adapter_init(void)
1396 {
1397 	const char *name = RXA_ADAPTER_ARRAY;
1398 	const struct rte_memzone *mz;
1399 	unsigned int sz;
1400 
1401 	sz = sizeof(*event_eth_rx_adapter) *
1402 	    RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1403 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1404 
1405 	mz = rte_memzone_lookup(name);
1406 	if (mz == NULL) {
1407 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1408 						 RTE_CACHE_LINE_SIZE);
1409 		if (mz == NULL) {
1410 			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1411 					PRId32, rte_errno);
1412 			return -rte_errno;
1413 		}
1414 	}
1415 
1416 	event_eth_rx_adapter = mz->addr;
1417 	return 0;
1418 }
1419 
1420 static int
1421 rxa_memzone_lookup(void)
1422 {
1423 	const struct rte_memzone *mz;
1424 
1425 	if (event_eth_rx_adapter == NULL) {
1426 		mz = rte_memzone_lookup(RXA_ADAPTER_ARRAY);
1427 		if (mz == NULL)
1428 			return -ENOMEM;
1429 		event_eth_rx_adapter = mz->addr;
1430 	}
1431 
1432 	return 0;
1433 }
1434 
1435 static inline struct event_eth_rx_adapter *
1436 rxa_id_to_adapter(uint8_t id)
1437 {
1438 	return event_eth_rx_adapter ?
1439 		event_eth_rx_adapter[id] : NULL;
1440 }
1441 
1442 static int
1443 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1444 		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1445 {
1446 	int ret;
1447 	struct rte_eventdev *dev;
1448 	struct rte_event_dev_config dev_conf;
1449 	int started;
1450 	uint8_t port_id;
1451 	struct rte_event_port_conf *port_conf = arg;
1452 	struct event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1453 
1454 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
1455 	dev_conf = dev->data->dev_conf;
1456 
1457 	started = dev->data->dev_started;
1458 	if (started)
1459 		rte_event_dev_stop(dev_id);
1460 	port_id = dev_conf.nb_event_ports;
1461 	dev_conf.nb_event_ports += 1;
1462 	ret = rte_event_dev_configure(dev_id, &dev_conf);
1463 	if (ret) {
1464 		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1465 						dev_id);
1466 		if (started) {
1467 			if (rte_event_dev_start(dev_id))
1468 				return -EIO;
1469 		}
1470 		return ret;
1471 	}
1472 
1473 	ret = rte_event_port_setup(dev_id, port_id, port_conf);
1474 	if (ret) {
1475 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1476 					port_id);
1477 		return ret;
1478 	}
1479 
1480 	conf->event_port_id = port_id;
1481 	conf->max_nb_rx = 128;
1482 	if (started)
1483 		ret = rte_event_dev_start(dev_id);
1484 	rx_adapter->default_cb_arg = 1;
1485 	return ret;
1486 }
1487 
1488 static int
1489 rxa_epoll_create1(void)
1490 {
1491 #if defined(LINUX)
1492 	int fd;
1493 	fd = epoll_create1(EPOLL_CLOEXEC);
1494 	return fd < 0 ? -errno : fd;
1495 #elif defined(BSD)
1496 	return -ENOTSUP;
1497 #endif
1498 }
1499 
1500 static int
1501 rxa_init_epd(struct event_eth_rx_adapter *rx_adapter)
1502 {
1503 	if (rx_adapter->epd != INIT_FD)
1504 		return 0;
1505 
1506 	rx_adapter->epd = rxa_epoll_create1();
1507 	if (rx_adapter->epd < 0) {
1508 		int err = rx_adapter->epd;
1509 		rx_adapter->epd = INIT_FD;
1510 		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1511 		return err;
1512 	}
1513 
1514 	return 0;
1515 }
1516 
1517 static int
1518 rxa_create_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1519 {
1520 	int err;
1521 	char thread_name[RTE_MAX_THREAD_NAME_LEN];
1522 
1523 	if (rx_adapter->intr_ring)
1524 		return 0;
1525 
1526 	rx_adapter->intr_ring = rte_ring_create("intr_ring",
1527 					RTE_EVENT_ETH_INTR_RING_SIZE,
1528 					rte_socket_id(), 0);
1529 	if (!rx_adapter->intr_ring)
1530 		return -ENOMEM;
1531 
1532 	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1533 					RTE_EVENT_ETH_INTR_RING_SIZE *
1534 					sizeof(struct rte_epoll_event),
1535 					RTE_CACHE_LINE_SIZE,
1536 					rx_adapter->socket_id);
1537 	if (!rx_adapter->epoll_events) {
1538 		err = -ENOMEM;
1539 		goto error;
1540 	}
1541 
1542 	rte_spinlock_init(&rx_adapter->intr_ring_lock);
1543 
1544 	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1545 			"rx-intr-thread-%d", rx_adapter->id);
1546 
1547 	err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1548 				NULL, rxa_intr_thread, rx_adapter);
1549 	if (!err)
1550 		return 0;
1551 
1552 	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1553 	rte_free(rx_adapter->epoll_events);
1554 error:
1555 	rte_ring_free(rx_adapter->intr_ring);
1556 	rx_adapter->intr_ring = NULL;
1557 	rx_adapter->epoll_events = NULL;
1558 	return err;
1559 }
1560 
1561 static int
1562 rxa_destroy_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1563 {
1564 	int err;
1565 
1566 	err = pthread_cancel(rx_adapter->rx_intr_thread);
1567 	if (err)
1568 		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1569 				err);
1570 
1571 	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1572 	if (err)
1573 		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1574 
1575 	rte_free(rx_adapter->epoll_events);
1576 	rte_ring_free(rx_adapter->intr_ring);
1577 	rx_adapter->intr_ring = NULL;
1578 	rx_adapter->epoll_events = NULL;
1579 	return 0;
1580 }
1581 
1582 static int
1583 rxa_free_intr_resources(struct event_eth_rx_adapter *rx_adapter)
1584 {
1585 	int ret;
1586 
1587 	if (rx_adapter->num_rx_intr == 0)
1588 		return 0;
1589 
1590 	ret = rxa_destroy_intr_thread(rx_adapter);
1591 	if (ret)
1592 		return ret;
1593 
1594 	close(rx_adapter->epd);
1595 	rx_adapter->epd = INIT_FD;
1596 
1597 	return ret;
1598 }
1599 
1600 static int
1601 rxa_disable_intr(struct event_eth_rx_adapter *rx_adapter,
1602 		 struct eth_device_info *dev_info, uint16_t rx_queue_id)
1603 {
1604 	int err;
1605 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1606 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1607 
1608 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1609 	if (err) {
1610 		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1611 			rx_queue_id);
1612 		return err;
1613 	}
1614 
1615 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1616 					rx_adapter->epd,
1617 					RTE_INTR_EVENT_DEL,
1618 					0);
1619 	if (err)
1620 		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1621 
1622 	if (sintr)
1623 		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1624 	else
1625 		dev_info->shared_intr_enabled = 0;
1626 	return err;
1627 }
1628 
1629 static int
1630 rxa_del_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1631 		   struct eth_device_info *dev_info, int rx_queue_id)
1632 {
1633 	int err;
1634 	int i;
1635 	int s;
1636 
1637 	if (dev_info->nb_rx_intr == 0)
1638 		return 0;
1639 
1640 	err = 0;
1641 	if (rx_queue_id == -1) {
1642 		s = dev_info->nb_shared_intr;
1643 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1644 			int sintr;
1645 			uint16_t q;
1646 
1647 			q = dev_info->intr_queue[i];
1648 			sintr = rxa_shared_intr(dev_info, q);
1649 			s -= sintr;
1650 
1651 			if (!sintr || s == 0) {
1652 
1653 				err = rxa_disable_intr(rx_adapter, dev_info,
1654 						q);
1655 				if (err)
1656 					return err;
1657 				rxa_intr_ring_del_entries(rx_adapter, dev_info,
1658 							q);
1659 			}
1660 		}
1661 	} else {
1662 		if (!rxa_intr_queue(dev_info, rx_queue_id))
1663 			return 0;
1664 		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1665 				dev_info->nb_shared_intr == 1) {
1666 			err = rxa_disable_intr(rx_adapter, dev_info,
1667 					rx_queue_id);
1668 			if (err)
1669 				return err;
1670 			rxa_intr_ring_del_entries(rx_adapter, dev_info,
1671 						rx_queue_id);
1672 		}
1673 
1674 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1675 			if (dev_info->intr_queue[i] == rx_queue_id) {
1676 				for (; i < dev_info->nb_rx_intr - 1; i++)
1677 					dev_info->intr_queue[i] =
1678 						dev_info->intr_queue[i + 1];
1679 				break;
1680 			}
1681 		}
1682 	}
1683 
1684 	return err;
1685 }
1686 
1687 static int
1688 rxa_config_intr(struct event_eth_rx_adapter *rx_adapter,
1689 		struct eth_device_info *dev_info, uint16_t rx_queue_id)
1690 {
1691 	int err, err1;
1692 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1693 	union queue_data qd;
1694 	int init_fd;
1695 	uint16_t *intr_queue;
1696 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1697 
1698 	if (rxa_intr_queue(dev_info, rx_queue_id))
1699 		return 0;
1700 
1701 	intr_queue = dev_info->intr_queue;
1702 	if (dev_info->intr_queue == NULL) {
1703 		size_t len =
1704 			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1705 		dev_info->intr_queue =
1706 			rte_zmalloc_socket(
1707 				rx_adapter->mem_name,
1708 				len,
1709 				0,
1710 				rx_adapter->socket_id);
1711 		if (dev_info->intr_queue == NULL)
1712 			return -ENOMEM;
1713 	}
1714 
1715 	init_fd = rx_adapter->epd;
1716 	err = rxa_init_epd(rx_adapter);
1717 	if (err)
1718 		goto err_free_queue;
1719 
1720 	qd.port = eth_dev_id;
1721 	qd.queue = rx_queue_id;
1722 
1723 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1724 					rx_adapter->epd,
1725 					RTE_INTR_EVENT_ADD,
1726 					qd.ptr);
1727 	if (err) {
1728 		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1729 			" Rx Queue %u err %d", rx_queue_id, err);
1730 		goto err_del_fd;
1731 	}
1732 
1733 	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1734 	if (err) {
1735 		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1736 				" Rx Queue %u err %d", rx_queue_id, err);
1737 
1738 		goto err_del_event;
1739 	}
1740 
1741 	err = rxa_create_intr_thread(rx_adapter);
1742 	if (!err)  {
1743 		if (sintr)
1744 			dev_info->shared_intr_enabled = 1;
1745 		else
1746 			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1747 		return 0;
1748 	}
1749 
1750 
1751 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1752 	if (err)
1753 		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1754 				" Rx Queue %u err %d", rx_queue_id, err);
1755 err_del_event:
1756 	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1757 					rx_adapter->epd,
1758 					RTE_INTR_EVENT_DEL,
1759 					0);
1760 	if (err1) {
1761 		RTE_EDEV_LOG_ERR("Could not delete event for"
1762 				" Rx Queue %u err %d", rx_queue_id, err1);
1763 	}
1764 err_del_fd:
1765 	if (init_fd == INIT_FD) {
1766 		close(rx_adapter->epd);
1767 		rx_adapter->epd = -1;
1768 	}
1769 err_free_queue:
1770 	if (intr_queue == NULL)
1771 		rte_free(dev_info->intr_queue);
1772 
1773 	return err;
1774 }
1775 
1776 static int
1777 rxa_add_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1778 		   struct eth_device_info *dev_info, int rx_queue_id)
1779 
1780 {
1781 	int i, j, err;
1782 	int si = -1;
1783 	int shared_done = (dev_info->nb_shared_intr > 0);
1784 
1785 	if (rx_queue_id != -1) {
1786 		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1787 			return 0;
1788 		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1789 	}
1790 
1791 	err = 0;
1792 	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1793 
1794 		if (rxa_shared_intr(dev_info, i) && shared_done)
1795 			continue;
1796 
1797 		err = rxa_config_intr(rx_adapter, dev_info, i);
1798 
1799 		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1800 		if (shared_done) {
1801 			si = i;
1802 			dev_info->shared_intr_enabled = 1;
1803 		}
1804 		if (err)
1805 			break;
1806 	}
1807 
1808 	if (err == 0)
1809 		return 0;
1810 
1811 	shared_done = (dev_info->nb_shared_intr > 0);
1812 	for (j = 0; j < i; j++) {
1813 		if (rxa_intr_queue(dev_info, j))
1814 			continue;
1815 		if (rxa_shared_intr(dev_info, j) && si != j)
1816 			continue;
1817 		err = rxa_disable_intr(rx_adapter, dev_info, j);
1818 		if (err)
1819 			break;
1820 
1821 	}
1822 
1823 	return err;
1824 }
1825 
1826 static int
1827 rxa_init_service(struct event_eth_rx_adapter *rx_adapter, uint8_t id)
1828 {
1829 	int ret;
1830 	struct rte_service_spec service;
1831 	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1832 
1833 	if (rx_adapter->service_inited)
1834 		return 0;
1835 
1836 	memset(&service, 0, sizeof(service));
1837 	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1838 		"rte_event_eth_rx_adapter_%d", id);
1839 	service.socket_id = rx_adapter->socket_id;
1840 	service.callback = rxa_service_func;
1841 	service.callback_userdata = rx_adapter;
1842 	/* Service function handles locking for queue add/del updates */
1843 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1844 	ret = rte_service_component_register(&service, &rx_adapter->service_id);
1845 	if (ret) {
1846 		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1847 			service.name, ret);
1848 		return ret;
1849 	}
1850 
1851 	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1852 		&rx_adapter_conf, rx_adapter->conf_arg);
1853 	if (ret) {
1854 		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1855 			ret);
1856 		goto err_done;
1857 	}
1858 	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1859 	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1860 	rx_adapter->service_inited = 1;
1861 	rx_adapter->epd = INIT_FD;
1862 	return 0;
1863 
1864 err_done:
1865 	rte_service_component_unregister(rx_adapter->service_id);
1866 	return ret;
1867 }
1868 
1869 static void
1870 rxa_update_queue(struct event_eth_rx_adapter *rx_adapter,
1871 		 struct eth_device_info *dev_info, int32_t rx_queue_id,
1872 		 uint8_t add)
1873 {
1874 	struct eth_rx_queue_info *queue_info;
1875 	int enabled;
1876 	uint16_t i;
1877 
1878 	if (dev_info->rx_queue == NULL)
1879 		return;
1880 
1881 	if (rx_queue_id == -1) {
1882 		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1883 			rxa_update_queue(rx_adapter, dev_info, i, add);
1884 	} else {
1885 		queue_info = &dev_info->rx_queue[rx_queue_id];
1886 		enabled = queue_info->queue_enabled;
1887 		if (add) {
1888 			rx_adapter->nb_queues += !enabled;
1889 			dev_info->nb_dev_queues += !enabled;
1890 		} else {
1891 			rx_adapter->nb_queues -= enabled;
1892 			dev_info->nb_dev_queues -= enabled;
1893 		}
1894 		queue_info->queue_enabled = !!add;
1895 	}
1896 }
1897 
1898 static void
1899 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1900 		    uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1901 		    uint16_t port_id)
1902 {
1903 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1904 	struct eth_rx_vector_data *vector_data;
1905 	uint32_t flow_id;
1906 
1907 	vector_data = &queue_info->vector_data;
1908 	vector_data->max_vector_count = vector_count;
1909 	vector_data->port = port_id;
1910 	vector_data->queue = qid;
1911 	vector_data->vector_pool = mp;
1912 	vector_data->vector_timeout_ticks =
1913 		NSEC2TICK(vector_ns, rte_get_timer_hz());
1914 	vector_data->ts = 0;
1915 	flow_id = queue_info->event & 0xFFFFF;
1916 	flow_id =
1917 		flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1918 	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1919 }
1920 
1921 static void
1922 rxa_sw_del(struct event_eth_rx_adapter *rx_adapter,
1923 	   struct eth_device_info *dev_info, int32_t rx_queue_id)
1924 {
1925 	struct eth_rx_vector_data *vec;
1926 	int pollq;
1927 	int intrq;
1928 	int sintrq;
1929 
1930 
1931 	if (rx_adapter->nb_queues == 0)
1932 		return;
1933 
1934 	if (rx_queue_id == -1) {
1935 		uint16_t nb_rx_queues;
1936 		uint16_t i;
1937 
1938 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1939 		for (i = 0; i <	nb_rx_queues; i++)
1940 			rxa_sw_del(rx_adapter, dev_info, i);
1941 		return;
1942 	}
1943 
1944 	/* Push all the partial event vectors to event device. */
1945 	TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1946 		if (vec->queue != rx_queue_id)
1947 			continue;
1948 		rxa_vector_expire(vec, rx_adapter);
1949 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
1950 	}
1951 
1952 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
1953 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
1954 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1955 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1956 	rx_adapter->num_rx_polled -= pollq;
1957 	dev_info->nb_rx_poll -= pollq;
1958 	rx_adapter->num_rx_intr -= intrq;
1959 	dev_info->nb_rx_intr -= intrq;
1960 	dev_info->nb_shared_intr -= intrq && sintrq;
1961 	if (rx_adapter->use_queue_event_buf) {
1962 		struct eth_event_enqueue_buffer *event_buf =
1963 			dev_info->rx_queue[rx_queue_id].event_buf;
1964 		struct rte_event_eth_rx_adapter_stats *stats =
1965 			dev_info->rx_queue[rx_queue_id].stats;
1966 		rte_free(event_buf->events);
1967 		rte_free(event_buf);
1968 		rte_free(stats);
1969 		dev_info->rx_queue[rx_queue_id].event_buf = NULL;
1970 		dev_info->rx_queue[rx_queue_id].stats = NULL;
1971 	}
1972 }
1973 
1974 static int
1975 rxa_add_queue(struct event_eth_rx_adapter *rx_adapter,
1976 	      struct eth_device_info *dev_info, int32_t rx_queue_id,
1977 	      const struct rte_event_eth_rx_adapter_queue_conf *conf)
1978 {
1979 	struct eth_rx_queue_info *queue_info;
1980 	const struct rte_event *ev = &conf->ev;
1981 	int pollq;
1982 	int intrq;
1983 	int sintrq;
1984 	struct rte_event *qi_ev;
1985 	struct eth_event_enqueue_buffer *new_rx_buf = NULL;
1986 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1987 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1988 	int ret;
1989 
1990 	if (rx_queue_id == -1) {
1991 		uint16_t nb_rx_queues;
1992 		uint16_t i;
1993 
1994 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1995 		for (i = 0; i <	nb_rx_queues; i++) {
1996 			ret = rxa_add_queue(rx_adapter, dev_info, i, conf);
1997 			if (ret)
1998 				return ret;
1999 		}
2000 		return 0;
2001 	}
2002 
2003 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
2004 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
2005 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
2006 
2007 	queue_info = &dev_info->rx_queue[rx_queue_id];
2008 	queue_info->wt = conf->servicing_weight;
2009 
2010 	qi_ev = (struct rte_event *)&queue_info->event;
2011 	qi_ev->event = ev->event;
2012 	qi_ev->op = RTE_EVENT_OP_NEW;
2013 	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
2014 	qi_ev->sub_event_type = 0;
2015 
2016 	if (conf->rx_queue_flags &
2017 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
2018 		queue_info->flow_id_mask = ~0;
2019 	} else
2020 		qi_ev->flow_id = 0;
2021 
2022 	if (conf->rx_queue_flags &
2023 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2024 		queue_info->ena_vector = 1;
2025 		qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
2026 		rxa_set_vector_data(queue_info, conf->vector_sz,
2027 				    conf->vector_timeout_ns, conf->vector_mp,
2028 				    rx_queue_id, dev_info->dev->data->port_id);
2029 		rx_adapter->ena_vector = 1;
2030 		rx_adapter->vector_tmo_ticks =
2031 			rx_adapter->vector_tmo_ticks ?
2032 				      RTE_MIN(queue_info->vector_data
2033 							.vector_timeout_ticks >>
2034 						1,
2035 					rx_adapter->vector_tmo_ticks) :
2036 				queue_info->vector_data.vector_timeout_ticks >>
2037 					1;
2038 	}
2039 
2040 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
2041 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
2042 		rx_adapter->num_rx_polled += !pollq;
2043 		dev_info->nb_rx_poll += !pollq;
2044 		rx_adapter->num_rx_intr -= intrq;
2045 		dev_info->nb_rx_intr -= intrq;
2046 		dev_info->nb_shared_intr -= intrq && sintrq;
2047 	}
2048 
2049 	if (rxa_intr_queue(dev_info, rx_queue_id)) {
2050 		rx_adapter->num_rx_polled -= pollq;
2051 		dev_info->nb_rx_poll -= pollq;
2052 		rx_adapter->num_rx_intr += !intrq;
2053 		dev_info->nb_rx_intr += !intrq;
2054 		dev_info->nb_shared_intr += !intrq && sintrq;
2055 		if (dev_info->nb_shared_intr == 1) {
2056 			if (dev_info->multi_intr_cap)
2057 				dev_info->next_q_idx =
2058 					RTE_MAX_RXTX_INTR_VEC_ID - 1;
2059 			else
2060 				dev_info->next_q_idx = 0;
2061 		}
2062 	}
2063 
2064 	if (!rx_adapter->use_queue_event_buf)
2065 		return 0;
2066 
2067 	new_rx_buf = rte_zmalloc_socket("rx_buffer_meta",
2068 				sizeof(*new_rx_buf), 0,
2069 				rte_eth_dev_socket_id(eth_dev_id));
2070 	if (new_rx_buf == NULL) {
2071 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer meta for "
2072 				 "dev_id: %d queue_id: %d",
2073 				 eth_dev_id, rx_queue_id);
2074 		return -ENOMEM;
2075 	}
2076 
2077 	new_rx_buf->events_size = RTE_ALIGN(conf->event_buf_size, BATCH_SIZE);
2078 	new_rx_buf->events_size += (2 * BATCH_SIZE);
2079 	new_rx_buf->events = rte_zmalloc_socket("rx_buffer",
2080 				sizeof(struct rte_event) *
2081 				new_rx_buf->events_size, 0,
2082 				rte_eth_dev_socket_id(eth_dev_id));
2083 	if (new_rx_buf->events == NULL) {
2084 		rte_free(new_rx_buf);
2085 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer for "
2086 				 "dev_id: %d queue_id: %d",
2087 				 eth_dev_id, rx_queue_id);
2088 		return -ENOMEM;
2089 	}
2090 
2091 	queue_info->event_buf = new_rx_buf;
2092 
2093 	/* Allocate storage for adapter queue stats */
2094 	stats = rte_zmalloc_socket("rx_queue_stats",
2095 				sizeof(*stats), 0,
2096 				rte_eth_dev_socket_id(eth_dev_id));
2097 	if (stats == NULL) {
2098 		rte_free(new_rx_buf->events);
2099 		rte_free(new_rx_buf);
2100 		RTE_EDEV_LOG_ERR("Failed to allocate stats storage for"
2101 				 " dev_id: %d queue_id: %d",
2102 				 eth_dev_id, rx_queue_id);
2103 		return -ENOMEM;
2104 	}
2105 
2106 	queue_info->stats = stats;
2107 
2108 	return 0;
2109 }
2110 
2111 static int
2112 rxa_sw_add(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
2113 	   int rx_queue_id,
2114 	   const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2115 {
2116 	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
2117 	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
2118 	int ret;
2119 	struct eth_rx_poll_entry *rx_poll;
2120 	struct eth_rx_queue_info *rx_queue;
2121 	uint32_t *rx_wrr;
2122 	uint16_t nb_rx_queues;
2123 	uint32_t nb_rx_poll, nb_wrr;
2124 	uint32_t nb_rx_intr;
2125 	int num_intr_vec;
2126 	uint16_t wt;
2127 
2128 	if (queue_conf->servicing_weight == 0) {
2129 		struct rte_eth_dev_data *data = dev_info->dev->data;
2130 
2131 		temp_conf = *queue_conf;
2132 		if (!data->dev_conf.intr_conf.rxq) {
2133 			/* If Rx interrupts are disabled set wt = 1 */
2134 			temp_conf.servicing_weight = 1;
2135 		}
2136 		queue_conf = &temp_conf;
2137 
2138 		if (queue_conf->servicing_weight == 0 &&
2139 		    rx_adapter->use_queue_event_buf) {
2140 
2141 			RTE_EDEV_LOG_ERR("Use of queue level event buffer "
2142 					 "not supported for interrupt queues "
2143 					 "dev_id: %d queue_id: %d",
2144 					 eth_dev_id, rx_queue_id);
2145 			return -EINVAL;
2146 		}
2147 	}
2148 
2149 	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2150 	rx_queue = dev_info->rx_queue;
2151 	wt = queue_conf->servicing_weight;
2152 
2153 	if (dev_info->rx_queue == NULL) {
2154 		dev_info->rx_queue =
2155 		    rte_zmalloc_socket(rx_adapter->mem_name,
2156 				       nb_rx_queues *
2157 				       sizeof(struct eth_rx_queue_info), 0,
2158 				       rx_adapter->socket_id);
2159 		if (dev_info->rx_queue == NULL)
2160 			return -ENOMEM;
2161 	}
2162 	rx_wrr = NULL;
2163 	rx_poll = NULL;
2164 
2165 	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2166 			queue_conf->servicing_weight,
2167 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2168 
2169 	if (dev_info->dev->intr_handle)
2170 		dev_info->multi_intr_cap =
2171 			rte_intr_cap_multiple(dev_info->dev->intr_handle);
2172 
2173 	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2174 				&rx_poll, &rx_wrr);
2175 	if (ret)
2176 		goto err_free_rxqueue;
2177 
2178 	if (wt == 0) {
2179 		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2180 
2181 		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2182 		if (ret)
2183 			goto err_free_rxqueue;
2184 
2185 		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2186 		if (ret)
2187 			goto err_free_rxqueue;
2188 	} else {
2189 
2190 		num_intr_vec = 0;
2191 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2192 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2193 						rx_queue_id, 0);
2194 			/* interrupt based queues are being converted to
2195 			 * poll mode queues, delete the interrupt configuration
2196 			 * for those.
2197 			 */
2198 			ret = rxa_del_intr_queue(rx_adapter,
2199 						dev_info, rx_queue_id);
2200 			if (ret)
2201 				goto err_free_rxqueue;
2202 		}
2203 	}
2204 
2205 	if (nb_rx_intr == 0) {
2206 		ret = rxa_free_intr_resources(rx_adapter);
2207 		if (ret)
2208 			goto err_free_rxqueue;
2209 	}
2210 
2211 	if (wt == 0) {
2212 		uint16_t i;
2213 
2214 		if (rx_queue_id  == -1) {
2215 			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2216 				dev_info->intr_queue[i] = i;
2217 		} else {
2218 			if (!rxa_intr_queue(dev_info, rx_queue_id))
2219 				dev_info->intr_queue[nb_rx_intr - 1] =
2220 					rx_queue_id;
2221 		}
2222 	}
2223 
2224 
2225 
2226 	ret = rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2227 	if (ret)
2228 		goto err_free_rxqueue;
2229 	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2230 
2231 	rte_free(rx_adapter->eth_rx_poll);
2232 	rte_free(rx_adapter->wrr_sched);
2233 
2234 	rx_adapter->eth_rx_poll = rx_poll;
2235 	rx_adapter->wrr_sched = rx_wrr;
2236 	rx_adapter->wrr_len = nb_wrr;
2237 	rx_adapter->num_intr_vec += num_intr_vec;
2238 	return 0;
2239 
2240 err_free_rxqueue:
2241 	if (rx_queue == NULL) {
2242 		rte_free(dev_info->rx_queue);
2243 		dev_info->rx_queue = NULL;
2244 	}
2245 
2246 	rte_free(rx_poll);
2247 	rte_free(rx_wrr);
2248 
2249 	return ret;
2250 }
2251 
2252 static int
2253 rxa_ctrl(uint8_t id, int start)
2254 {
2255 	struct event_eth_rx_adapter *rx_adapter;
2256 	struct rte_eventdev *dev;
2257 	struct eth_device_info *dev_info;
2258 	uint32_t i;
2259 	int use_service = 0;
2260 	int stop = !start;
2261 
2262 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2263 	rx_adapter = rxa_id_to_adapter(id);
2264 	if (rx_adapter == NULL)
2265 		return -EINVAL;
2266 
2267 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2268 
2269 	RTE_ETH_FOREACH_DEV(i) {
2270 		dev_info = &rx_adapter->eth_devices[i];
2271 		/* if start  check for num dev queues */
2272 		if (start && !dev_info->nb_dev_queues)
2273 			continue;
2274 		/* if stop check if dev has been started */
2275 		if (stop && !dev_info->dev_rx_started)
2276 			continue;
2277 		use_service |= !dev_info->internal_event_port;
2278 		dev_info->dev_rx_started = start;
2279 		if (dev_info->internal_event_port == 0)
2280 			continue;
2281 		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2282 						&rte_eth_devices[i]) :
2283 			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
2284 						&rte_eth_devices[i]);
2285 	}
2286 
2287 	if (use_service) {
2288 		rte_spinlock_lock(&rx_adapter->rx_lock);
2289 		rx_adapter->rxa_started = start;
2290 		rte_service_runstate_set(rx_adapter->service_id, start);
2291 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2292 	}
2293 
2294 	return 0;
2295 }
2296 
2297 static int
2298 rxa_create(uint8_t id, uint8_t dev_id,
2299 	   struct rte_event_eth_rx_adapter_params *rxa_params,
2300 	   rte_event_eth_rx_adapter_conf_cb conf_cb,
2301 	   void *conf_arg)
2302 {
2303 	struct event_eth_rx_adapter *rx_adapter;
2304 	struct eth_event_enqueue_buffer *buf;
2305 	struct rte_event *events;
2306 	int ret;
2307 	int socket_id;
2308 	uint16_t i;
2309 	char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
2310 	const uint8_t default_rss_key[] = {
2311 		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2312 		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2313 		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2314 		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2315 		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2316 	};
2317 
2318 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2319 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2320 
2321 	if (conf_cb == NULL)
2322 		return -EINVAL;
2323 
2324 	if (event_eth_rx_adapter == NULL) {
2325 		ret = rte_event_eth_rx_adapter_init();
2326 		if (ret)
2327 			return ret;
2328 	}
2329 
2330 	rx_adapter = rxa_id_to_adapter(id);
2331 	if (rx_adapter != NULL) {
2332 		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2333 		return -EEXIST;
2334 	}
2335 
2336 	socket_id = rte_event_dev_socket_id(dev_id);
2337 	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2338 		"rte_event_eth_rx_adapter_%d",
2339 		id);
2340 
2341 	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2342 			RTE_CACHE_LINE_SIZE, socket_id);
2343 	if (rx_adapter == NULL) {
2344 		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2345 		return -ENOMEM;
2346 	}
2347 
2348 	rx_adapter->eventdev_id = dev_id;
2349 	rx_adapter->socket_id = socket_id;
2350 	rx_adapter->conf_cb = conf_cb;
2351 	rx_adapter->conf_arg = conf_arg;
2352 	rx_adapter->id = id;
2353 	TAILQ_INIT(&rx_adapter->vector_list);
2354 	strcpy(rx_adapter->mem_name, mem_name);
2355 	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2356 					RTE_MAX_ETHPORTS *
2357 					sizeof(struct eth_device_info), 0,
2358 					socket_id);
2359 	rte_convert_rss_key((const uint32_t *)default_rss_key,
2360 			(uint32_t *)rx_adapter->rss_key_be,
2361 			    RTE_DIM(default_rss_key));
2362 
2363 	if (rx_adapter->eth_devices == NULL) {
2364 		RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
2365 		rte_free(rx_adapter);
2366 		return -ENOMEM;
2367 	}
2368 
2369 	rte_spinlock_init(&rx_adapter->rx_lock);
2370 
2371 	for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2372 		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2373 
2374 	/* Rx adapter event buffer allocation */
2375 	rx_adapter->use_queue_event_buf = rxa_params->use_queue_event_buf;
2376 
2377 	if (!rx_adapter->use_queue_event_buf) {
2378 		buf = &rx_adapter->event_enqueue_buffer;
2379 		buf->events_size = rxa_params->event_buf_size;
2380 
2381 		events = rte_zmalloc_socket(rx_adapter->mem_name,
2382 					    buf->events_size * sizeof(*events),
2383 					    0, socket_id);
2384 		if (events == NULL) {
2385 			RTE_EDEV_LOG_ERR("Failed to allocate memory "
2386 					 "for adapter event buffer");
2387 			rte_free(rx_adapter->eth_devices);
2388 			rte_free(rx_adapter);
2389 			return -ENOMEM;
2390 		}
2391 
2392 		rx_adapter->event_enqueue_buffer.events = events;
2393 	}
2394 
2395 	event_eth_rx_adapter[id] = rx_adapter;
2396 
2397 	if (conf_cb == rxa_default_conf_cb)
2398 		rx_adapter->default_cb_arg = 1;
2399 
2400 	if (rte_mbuf_dyn_rx_timestamp_register(
2401 			&event_eth_rx_timestamp_dynfield_offset,
2402 			&event_eth_rx_timestamp_dynflag) != 0) {
2403 		RTE_EDEV_LOG_ERR("Error registering timestamp field in mbuf\n");
2404 		return -rte_errno;
2405 	}
2406 
2407 	rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2408 		conf_arg);
2409 	return 0;
2410 }
2411 
2412 int
2413 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2414 				rte_event_eth_rx_adapter_conf_cb conf_cb,
2415 				void *conf_arg)
2416 {
2417 	struct rte_event_eth_rx_adapter_params rxa_params = {0};
2418 
2419 	/* use default values for adapter params */
2420 	rxa_params.event_buf_size = ETH_EVENT_BUFFER_SIZE;
2421 	rxa_params.use_queue_event_buf = false;
2422 
2423 	return rxa_create(id, dev_id, &rxa_params, conf_cb, conf_arg);
2424 }
2425 
2426 int
2427 rte_event_eth_rx_adapter_create_with_params(uint8_t id, uint8_t dev_id,
2428 			struct rte_event_port_conf *port_config,
2429 			struct rte_event_eth_rx_adapter_params *rxa_params)
2430 {
2431 	struct rte_event_port_conf *pc;
2432 	int ret;
2433 	struct rte_event_eth_rx_adapter_params temp_params = {0};
2434 
2435 	if (port_config == NULL)
2436 		return -EINVAL;
2437 
2438 	if (rxa_params == NULL) {
2439 		/* use default values if rxa_params is NULL */
2440 		rxa_params = &temp_params;
2441 		rxa_params->event_buf_size = ETH_EVENT_BUFFER_SIZE;
2442 		rxa_params->use_queue_event_buf = false;
2443 	} else if ((!rxa_params->use_queue_event_buf &&
2444 		    rxa_params->event_buf_size == 0) ||
2445 		   (rxa_params->use_queue_event_buf &&
2446 		    rxa_params->event_buf_size != 0)) {
2447 		RTE_EDEV_LOG_ERR("Invalid adapter params\n");
2448 		return -EINVAL;
2449 	} else if (!rxa_params->use_queue_event_buf) {
2450 		/* adjust event buff size with BATCH_SIZE used for fetching
2451 		 * packets from NIC rx queues to get full buffer utilization
2452 		 * and prevent unnecessary rollovers.
2453 		 */
2454 
2455 		rxa_params->event_buf_size =
2456 			RTE_ALIGN(rxa_params->event_buf_size, BATCH_SIZE);
2457 		rxa_params->event_buf_size += (BATCH_SIZE + BATCH_SIZE);
2458 	}
2459 
2460 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2461 	if (pc == NULL)
2462 		return -ENOMEM;
2463 
2464 	*pc = *port_config;
2465 
2466 	ret = rxa_create(id, dev_id, rxa_params, rxa_default_conf_cb, pc);
2467 	if (ret)
2468 		rte_free(pc);
2469 
2470 	return ret;
2471 }
2472 
2473 int
2474 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2475 		struct rte_event_port_conf *port_config)
2476 {
2477 	struct rte_event_port_conf *pc;
2478 	int ret;
2479 
2480 	if (port_config == NULL)
2481 		return -EINVAL;
2482 
2483 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2484 
2485 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2486 	if (pc == NULL)
2487 		return -ENOMEM;
2488 	*pc = *port_config;
2489 
2490 	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2491 					rxa_default_conf_cb,
2492 					pc);
2493 	if (ret)
2494 		rte_free(pc);
2495 	return ret;
2496 }
2497 
2498 int
2499 rte_event_eth_rx_adapter_free(uint8_t id)
2500 {
2501 	struct event_eth_rx_adapter *rx_adapter;
2502 
2503 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2504 
2505 	rx_adapter = rxa_id_to_adapter(id);
2506 	if (rx_adapter == NULL)
2507 		return -EINVAL;
2508 
2509 	if (rx_adapter->nb_queues) {
2510 		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2511 				rx_adapter->nb_queues);
2512 		return -EBUSY;
2513 	}
2514 
2515 	if (rx_adapter->default_cb_arg)
2516 		rte_free(rx_adapter->conf_arg);
2517 	rte_free(rx_adapter->eth_devices);
2518 	if (!rx_adapter->use_queue_event_buf)
2519 		rte_free(rx_adapter->event_enqueue_buffer.events);
2520 	rte_free(rx_adapter);
2521 	event_eth_rx_adapter[id] = NULL;
2522 
2523 	rte_eventdev_trace_eth_rx_adapter_free(id);
2524 	return 0;
2525 }
2526 
2527 int
2528 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2529 		uint16_t eth_dev_id,
2530 		int32_t rx_queue_id,
2531 		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2532 {
2533 	int ret;
2534 	uint32_t cap;
2535 	struct event_eth_rx_adapter *rx_adapter;
2536 	struct rte_eventdev *dev;
2537 	struct eth_device_info *dev_info;
2538 	struct rte_event_eth_rx_adapter_vector_limits limits;
2539 
2540 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2541 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2542 
2543 	rx_adapter = rxa_id_to_adapter(id);
2544 	if ((rx_adapter == NULL) || (queue_conf == NULL))
2545 		return -EINVAL;
2546 
2547 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2548 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2549 						eth_dev_id,
2550 						&cap);
2551 	if (ret) {
2552 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2553 			"eth port %" PRIu16, id, eth_dev_id);
2554 		return ret;
2555 	}
2556 
2557 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2558 		&& (queue_conf->rx_queue_flags &
2559 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2560 		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2561 				" eth port: %" PRIu16 " adapter id: %" PRIu8,
2562 				eth_dev_id, id);
2563 		return -EINVAL;
2564 	}
2565 
2566 	if (queue_conf->rx_queue_flags &
2567 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2568 
2569 		if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0) {
2570 			RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2571 					 " eth port: %" PRIu16
2572 					 " adapter id: %" PRIu8,
2573 					 eth_dev_id, id);
2574 			return -EINVAL;
2575 		}
2576 
2577 		ret = rte_event_eth_rx_adapter_vector_limits_get(
2578 			rx_adapter->eventdev_id, eth_dev_id, &limits);
2579 		if (ret < 0) {
2580 			RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
2581 					 " eth port: %" PRIu16
2582 					 " adapter id: %" PRIu8,
2583 					 eth_dev_id, id);
2584 			return -EINVAL;
2585 		}
2586 		if (queue_conf->vector_sz < limits.min_sz ||
2587 		    queue_conf->vector_sz > limits.max_sz ||
2588 		    queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
2589 		    queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
2590 		    queue_conf->vector_mp == NULL) {
2591 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2592 					 " eth port: %" PRIu16
2593 					 " adapter id: %" PRIu8,
2594 					 eth_dev_id, id);
2595 			return -EINVAL;
2596 		}
2597 		if (queue_conf->vector_mp->elt_size <
2598 		    (sizeof(struct rte_event_vector) +
2599 		     (sizeof(uintptr_t) * queue_conf->vector_sz))) {
2600 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2601 					 " eth port: %" PRIu16
2602 					 " adapter id: %" PRIu8,
2603 					 eth_dev_id, id);
2604 			return -EINVAL;
2605 		}
2606 	}
2607 
2608 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2609 		(rx_queue_id != -1)) {
2610 		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2611 			"event queue, eth port: %" PRIu16 " adapter id: %"
2612 			PRIu8, eth_dev_id, id);
2613 		return -EINVAL;
2614 	}
2615 
2616 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2617 			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2618 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2619 			 (uint16_t)rx_queue_id);
2620 		return -EINVAL;
2621 	}
2622 
2623 	if ((rx_adapter->use_queue_event_buf &&
2624 	     queue_conf->event_buf_size == 0) ||
2625 	    (!rx_adapter->use_queue_event_buf &&
2626 	     queue_conf->event_buf_size != 0)) {
2627 		RTE_EDEV_LOG_ERR("Invalid Event buffer size for the queue");
2628 		return -EINVAL;
2629 	}
2630 
2631 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2632 
2633 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2634 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2635 					-ENOTSUP);
2636 		if (dev_info->rx_queue == NULL) {
2637 			dev_info->rx_queue =
2638 			    rte_zmalloc_socket(rx_adapter->mem_name,
2639 					dev_info->dev->data->nb_rx_queues *
2640 					sizeof(struct eth_rx_queue_info), 0,
2641 					rx_adapter->socket_id);
2642 			if (dev_info->rx_queue == NULL)
2643 				return -ENOMEM;
2644 		}
2645 
2646 		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2647 				&rte_eth_devices[eth_dev_id],
2648 				rx_queue_id, queue_conf);
2649 		if (ret == 0) {
2650 			dev_info->internal_event_port = 1;
2651 			rxa_update_queue(rx_adapter,
2652 					&rx_adapter->eth_devices[eth_dev_id],
2653 					rx_queue_id,
2654 					1);
2655 		}
2656 	} else {
2657 		rte_spinlock_lock(&rx_adapter->rx_lock);
2658 		dev_info->internal_event_port = 0;
2659 		ret = rxa_init_service(rx_adapter, id);
2660 		if (ret == 0) {
2661 			uint32_t service_id = rx_adapter->service_id;
2662 			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2663 					queue_conf);
2664 			rte_service_component_runstate_set(service_id,
2665 				rxa_sw_adapter_queue_count(rx_adapter));
2666 		}
2667 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2668 	}
2669 
2670 	rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2671 		rx_queue_id, queue_conf, ret);
2672 	if (ret)
2673 		return ret;
2674 
2675 	return 0;
2676 }
2677 
2678 static int
2679 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2680 {
2681 	limits->max_sz = MAX_VECTOR_SIZE;
2682 	limits->min_sz = MIN_VECTOR_SIZE;
2683 	limits->max_timeout_ns = MAX_VECTOR_NS;
2684 	limits->min_timeout_ns = MIN_VECTOR_NS;
2685 
2686 	return 0;
2687 }
2688 
2689 int
2690 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2691 				int32_t rx_queue_id)
2692 {
2693 	int ret = 0;
2694 	struct rte_eventdev *dev;
2695 	struct event_eth_rx_adapter *rx_adapter;
2696 	struct eth_device_info *dev_info;
2697 	uint32_t cap;
2698 	uint32_t nb_rx_poll = 0;
2699 	uint32_t nb_wrr = 0;
2700 	uint32_t nb_rx_intr;
2701 	struct eth_rx_poll_entry *rx_poll = NULL;
2702 	uint32_t *rx_wrr = NULL;
2703 	int num_intr_vec;
2704 
2705 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2706 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2707 
2708 	rx_adapter = rxa_id_to_adapter(id);
2709 	if (rx_adapter == NULL)
2710 		return -EINVAL;
2711 
2712 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2713 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2714 						eth_dev_id,
2715 						&cap);
2716 	if (ret)
2717 		return ret;
2718 
2719 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2720 		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2721 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2722 			 (uint16_t)rx_queue_id);
2723 		return -EINVAL;
2724 	}
2725 
2726 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2727 
2728 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2729 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2730 				 -ENOTSUP);
2731 		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2732 						&rte_eth_devices[eth_dev_id],
2733 						rx_queue_id);
2734 		if (ret == 0) {
2735 			rxa_update_queue(rx_adapter,
2736 					&rx_adapter->eth_devices[eth_dev_id],
2737 					rx_queue_id,
2738 					0);
2739 			if (dev_info->nb_dev_queues == 0) {
2740 				rte_free(dev_info->rx_queue);
2741 				dev_info->rx_queue = NULL;
2742 			}
2743 		}
2744 	} else {
2745 		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2746 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2747 
2748 		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2749 			&rx_poll, &rx_wrr);
2750 		if (ret)
2751 			return ret;
2752 
2753 		rte_spinlock_lock(&rx_adapter->rx_lock);
2754 
2755 		num_intr_vec = 0;
2756 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2757 
2758 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2759 						rx_queue_id, 0);
2760 			ret = rxa_del_intr_queue(rx_adapter, dev_info,
2761 					rx_queue_id);
2762 			if (ret)
2763 				goto unlock_ret;
2764 		}
2765 
2766 		if (nb_rx_intr == 0) {
2767 			ret = rxa_free_intr_resources(rx_adapter);
2768 			if (ret)
2769 				goto unlock_ret;
2770 		}
2771 
2772 		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2773 		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2774 
2775 		rte_free(rx_adapter->eth_rx_poll);
2776 		rte_free(rx_adapter->wrr_sched);
2777 
2778 		if (nb_rx_intr == 0) {
2779 			rte_free(dev_info->intr_queue);
2780 			dev_info->intr_queue = NULL;
2781 		}
2782 
2783 		rx_adapter->eth_rx_poll = rx_poll;
2784 		rx_adapter->wrr_sched = rx_wrr;
2785 		rx_adapter->wrr_len = nb_wrr;
2786 		/*
2787 		 * reset next poll start position (wrr_pos) to avoid buffer
2788 		 * overrun when wrr_len is reduced in case of queue delete
2789 		 */
2790 		rx_adapter->wrr_pos = 0;
2791 		rx_adapter->num_intr_vec += num_intr_vec;
2792 
2793 		if (dev_info->nb_dev_queues == 0) {
2794 			rte_free(dev_info->rx_queue);
2795 			dev_info->rx_queue = NULL;
2796 		}
2797 unlock_ret:
2798 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2799 		if (ret) {
2800 			rte_free(rx_poll);
2801 			rte_free(rx_wrr);
2802 			return ret;
2803 		}
2804 
2805 		rte_service_component_runstate_set(rx_adapter->service_id,
2806 				rxa_sw_adapter_queue_count(rx_adapter));
2807 	}
2808 
2809 	rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2810 		rx_queue_id, ret);
2811 	return ret;
2812 }
2813 
2814 int
2815 rte_event_eth_rx_adapter_vector_limits_get(
2816 	uint8_t dev_id, uint16_t eth_port_id,
2817 	struct rte_event_eth_rx_adapter_vector_limits *limits)
2818 {
2819 	struct rte_eventdev *dev;
2820 	uint32_t cap;
2821 	int ret;
2822 
2823 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2824 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2825 
2826 	if (limits == NULL)
2827 		return -EINVAL;
2828 
2829 	dev = &rte_eventdevs[dev_id];
2830 
2831 	ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2832 	if (ret) {
2833 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2834 				 "eth port %" PRIu16,
2835 				 dev_id, eth_port_id);
2836 		return ret;
2837 	}
2838 
2839 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2840 		RTE_FUNC_PTR_OR_ERR_RET(
2841 			*dev->dev_ops->eth_rx_adapter_vector_limits_get,
2842 			-ENOTSUP);
2843 		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2844 			dev, &rte_eth_devices[eth_port_id], limits);
2845 	} else {
2846 		ret = rxa_sw_vector_limits(limits);
2847 	}
2848 
2849 	return ret;
2850 }
2851 
2852 int
2853 rte_event_eth_rx_adapter_start(uint8_t id)
2854 {
2855 	rte_eventdev_trace_eth_rx_adapter_start(id);
2856 	return rxa_ctrl(id, 1);
2857 }
2858 
2859 int
2860 rte_event_eth_rx_adapter_stop(uint8_t id)
2861 {
2862 	rte_eventdev_trace_eth_rx_adapter_stop(id);
2863 	return rxa_ctrl(id, 0);
2864 }
2865 
2866 static inline void
2867 rxa_queue_stats_reset(struct eth_rx_queue_info *queue_info)
2868 {
2869 	struct rte_event_eth_rx_adapter_stats *q_stats;
2870 
2871 	q_stats = queue_info->stats;
2872 	memset(q_stats, 0, sizeof(*q_stats));
2873 }
2874 
2875 int
2876 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2877 			       struct rte_event_eth_rx_adapter_stats *stats)
2878 {
2879 	struct event_eth_rx_adapter *rx_adapter;
2880 	struct eth_event_enqueue_buffer *buf;
2881 	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2882 	struct rte_event_eth_rx_adapter_stats dev_stats;
2883 	struct rte_eventdev *dev;
2884 	struct eth_device_info *dev_info;
2885 	struct eth_rx_queue_info *queue_info;
2886 	struct rte_event_eth_rx_adapter_stats *q_stats;
2887 	uint32_t i, j;
2888 	int ret;
2889 
2890 	if (rxa_memzone_lookup())
2891 		return -ENOMEM;
2892 
2893 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2894 
2895 	rx_adapter = rxa_id_to_adapter(id);
2896 	if (rx_adapter  == NULL || stats == NULL)
2897 		return -EINVAL;
2898 
2899 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2900 	memset(stats, 0, sizeof(*stats));
2901 
2902 	if (rx_adapter->service_inited)
2903 		*stats = rx_adapter->stats;
2904 
2905 	RTE_ETH_FOREACH_DEV(i) {
2906 		dev_info = &rx_adapter->eth_devices[i];
2907 
2908 		if (rx_adapter->use_queue_event_buf && dev_info->rx_queue) {
2909 
2910 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
2911 			     j++) {
2912 				queue_info = &dev_info->rx_queue[j];
2913 				if (!queue_info->queue_enabled)
2914 					continue;
2915 				q_stats = queue_info->stats;
2916 
2917 				stats->rx_packets += q_stats->rx_packets;
2918 				stats->rx_poll_count += q_stats->rx_poll_count;
2919 				stats->rx_enq_count += q_stats->rx_enq_count;
2920 				stats->rx_enq_retry += q_stats->rx_enq_retry;
2921 				stats->rx_dropped += q_stats->rx_dropped;
2922 				stats->rx_enq_block_cycles +=
2923 						q_stats->rx_enq_block_cycles;
2924 			}
2925 		}
2926 
2927 		if (dev_info->internal_event_port == 0 ||
2928 			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2929 			continue;
2930 		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2931 						&rte_eth_devices[i],
2932 						&dev_stats);
2933 		if (ret)
2934 			continue;
2935 		dev_stats_sum.rx_packets += dev_stats.rx_packets;
2936 		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2937 	}
2938 
2939 	buf = &rx_adapter->event_enqueue_buffer;
2940 	stats->rx_packets += dev_stats_sum.rx_packets;
2941 	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2942 	stats->rx_event_buf_count = buf->count;
2943 	stats->rx_event_buf_size = buf->events_size;
2944 
2945 	return 0;
2946 }
2947 
2948 int
2949 rte_event_eth_rx_adapter_queue_stats_get(uint8_t id,
2950 		uint16_t eth_dev_id,
2951 		uint16_t rx_queue_id,
2952 		struct rte_event_eth_rx_adapter_queue_stats *stats)
2953 {
2954 	struct event_eth_rx_adapter *rx_adapter;
2955 	struct eth_device_info *dev_info;
2956 	struct eth_rx_queue_info *queue_info;
2957 	struct eth_event_enqueue_buffer *event_buf;
2958 	struct rte_event_eth_rx_adapter_stats *q_stats;
2959 	struct rte_eventdev *dev;
2960 
2961 	if (rxa_memzone_lookup())
2962 		return -ENOMEM;
2963 
2964 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2965 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2966 
2967 	rx_adapter = rxa_id_to_adapter(id);
2968 
2969 	if (rx_adapter == NULL || stats == NULL)
2970 		return -EINVAL;
2971 
2972 	if (!rx_adapter->use_queue_event_buf)
2973 		return -EINVAL;
2974 
2975 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2976 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
2977 		return -EINVAL;
2978 	}
2979 
2980 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2981 	if (dev_info->rx_queue == NULL ||
2982 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
2983 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
2984 		return -EINVAL;
2985 	}
2986 
2987 	if (dev_info->internal_event_port == 0) {
2988 		queue_info = &dev_info->rx_queue[rx_queue_id];
2989 		event_buf = queue_info->event_buf;
2990 		q_stats = queue_info->stats;
2991 
2992 		stats->rx_event_buf_count = event_buf->count;
2993 		stats->rx_event_buf_size = event_buf->events_size;
2994 		stats->rx_packets = q_stats->rx_packets;
2995 		stats->rx_poll_count = q_stats->rx_poll_count;
2996 		stats->rx_dropped = q_stats->rx_dropped;
2997 	}
2998 
2999 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3000 	if (dev->dev_ops->eth_rx_adapter_queue_stats_get != NULL) {
3001 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_get)(dev,
3002 						&rte_eth_devices[eth_dev_id],
3003 						rx_queue_id, stats);
3004 	}
3005 
3006 	return 0;
3007 }
3008 
3009 int
3010 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
3011 {
3012 	struct event_eth_rx_adapter *rx_adapter;
3013 	struct rte_eventdev *dev;
3014 	struct eth_device_info *dev_info;
3015 	struct eth_rx_queue_info *queue_info;
3016 	uint32_t i, j;
3017 
3018 	if (rxa_memzone_lookup())
3019 		return -ENOMEM;
3020 
3021 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3022 
3023 	rx_adapter = rxa_id_to_adapter(id);
3024 	if (rx_adapter == NULL)
3025 		return -EINVAL;
3026 
3027 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3028 
3029 	RTE_ETH_FOREACH_DEV(i) {
3030 		dev_info = &rx_adapter->eth_devices[i];
3031 
3032 		if (rx_adapter->use_queue_event_buf  && dev_info->rx_queue) {
3033 
3034 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
3035 						j++) {
3036 				queue_info = &dev_info->rx_queue[j];
3037 				if (!queue_info->queue_enabled)
3038 					continue;
3039 				rxa_queue_stats_reset(queue_info);
3040 			}
3041 		}
3042 
3043 		if (dev_info->internal_event_port == 0 ||
3044 			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
3045 			continue;
3046 		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
3047 							&rte_eth_devices[i]);
3048 	}
3049 
3050 	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
3051 
3052 	return 0;
3053 }
3054 
3055 int
3056 rte_event_eth_rx_adapter_queue_stats_reset(uint8_t id,
3057 		uint16_t eth_dev_id,
3058 		uint16_t rx_queue_id)
3059 {
3060 	struct event_eth_rx_adapter *rx_adapter;
3061 	struct eth_device_info *dev_info;
3062 	struct eth_rx_queue_info *queue_info;
3063 	struct rte_eventdev *dev;
3064 
3065 	if (rxa_memzone_lookup())
3066 		return -ENOMEM;
3067 
3068 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3069 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3070 
3071 	rx_adapter = rxa_id_to_adapter(id);
3072 	if (rx_adapter == NULL)
3073 		return -EINVAL;
3074 
3075 	if (!rx_adapter->use_queue_event_buf)
3076 		return -EINVAL;
3077 
3078 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3079 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
3080 		return -EINVAL;
3081 	}
3082 
3083 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3084 
3085 	if (dev_info->rx_queue == NULL ||
3086 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3087 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3088 		return -EINVAL;
3089 	}
3090 
3091 	if (dev_info->internal_event_port == 0) {
3092 		queue_info = &dev_info->rx_queue[rx_queue_id];
3093 		rxa_queue_stats_reset(queue_info);
3094 	}
3095 
3096 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3097 	if (dev->dev_ops->eth_rx_adapter_queue_stats_reset != NULL) {
3098 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_reset)(dev,
3099 						&rte_eth_devices[eth_dev_id],
3100 						rx_queue_id);
3101 	}
3102 
3103 	return 0;
3104 }
3105 
3106 int
3107 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
3108 {
3109 	struct event_eth_rx_adapter *rx_adapter;
3110 
3111 	if (rxa_memzone_lookup())
3112 		return -ENOMEM;
3113 
3114 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3115 
3116 	rx_adapter = rxa_id_to_adapter(id);
3117 	if (rx_adapter == NULL || service_id == NULL)
3118 		return -EINVAL;
3119 
3120 	if (rx_adapter->service_inited)
3121 		*service_id = rx_adapter->service_id;
3122 
3123 	return rx_adapter->service_inited ? 0 : -ESRCH;
3124 }
3125 
3126 int
3127 rte_event_eth_rx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
3128 {
3129 	struct event_eth_rx_adapter *rx_adapter;
3130 
3131 	if (rxa_memzone_lookup())
3132 		return -ENOMEM;
3133 
3134 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3135 
3136 	rx_adapter = rxa_id_to_adapter(id);
3137 	if (rx_adapter == NULL || event_port_id == NULL)
3138 		return -EINVAL;
3139 
3140 	if (rx_adapter->service_inited)
3141 		*event_port_id = rx_adapter->event_port_id;
3142 
3143 	return rx_adapter->service_inited ? 0 : -ESRCH;
3144 }
3145 
3146 int
3147 rte_event_eth_rx_adapter_cb_register(uint8_t id,
3148 					uint16_t eth_dev_id,
3149 					rte_event_eth_rx_adapter_cb_fn cb_fn,
3150 					void *cb_arg)
3151 {
3152 	struct event_eth_rx_adapter *rx_adapter;
3153 	struct eth_device_info *dev_info;
3154 	uint32_t cap;
3155 	int ret;
3156 
3157 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3158 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3159 
3160 	rx_adapter = rxa_id_to_adapter(id);
3161 	if (rx_adapter == NULL)
3162 		return -EINVAL;
3163 
3164 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3165 	if (dev_info->rx_queue == NULL)
3166 		return -EINVAL;
3167 
3168 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
3169 						eth_dev_id,
3170 						&cap);
3171 	if (ret) {
3172 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
3173 			"eth port %" PRIu16, id, eth_dev_id);
3174 		return ret;
3175 	}
3176 
3177 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
3178 		RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
3179 				PRIu16, eth_dev_id);
3180 		return -EINVAL;
3181 	}
3182 
3183 	rte_spinlock_lock(&rx_adapter->rx_lock);
3184 	dev_info->cb_fn = cb_fn;
3185 	dev_info->cb_arg = cb_arg;
3186 	rte_spinlock_unlock(&rx_adapter->rx_lock);
3187 
3188 	return 0;
3189 }
3190 
3191 int
3192 rte_event_eth_rx_adapter_queue_conf_get(uint8_t id,
3193 			uint16_t eth_dev_id,
3194 			uint16_t rx_queue_id,
3195 			struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
3196 {
3197 	struct rte_eventdev *dev;
3198 	struct event_eth_rx_adapter *rx_adapter;
3199 	struct eth_device_info *dev_info;
3200 	struct eth_rx_queue_info *queue_info;
3201 	struct rte_event *qi_ev;
3202 	int ret;
3203 
3204 	if (rxa_memzone_lookup())
3205 		return -ENOMEM;
3206 
3207 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3208 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3209 
3210 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3211 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3212 		return -EINVAL;
3213 	}
3214 
3215 	if (queue_conf == NULL) {
3216 		RTE_EDEV_LOG_ERR("Rx queue conf struct cannot be NULL");
3217 		return -EINVAL;
3218 	}
3219 
3220 	rx_adapter = rxa_id_to_adapter(id);
3221 	if (rx_adapter == NULL)
3222 		return -EINVAL;
3223 
3224 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3225 	if (dev_info->rx_queue == NULL ||
3226 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3227 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3228 		return -EINVAL;
3229 	}
3230 
3231 	queue_info = &dev_info->rx_queue[rx_queue_id];
3232 	qi_ev = (struct rte_event *)&queue_info->event;
3233 
3234 	memset(queue_conf, 0, sizeof(*queue_conf));
3235 	queue_conf->rx_queue_flags = 0;
3236 	if (queue_info->flow_id_mask != 0)
3237 		queue_conf->rx_queue_flags |=
3238 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID;
3239 	queue_conf->servicing_weight = queue_info->wt;
3240 
3241 	memcpy(&queue_conf->ev, qi_ev, sizeof(*qi_ev));
3242 
3243 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3244 	if (dev->dev_ops->eth_rx_adapter_queue_conf_get != NULL) {
3245 		ret = (*dev->dev_ops->eth_rx_adapter_queue_conf_get)(dev,
3246 						&rte_eth_devices[eth_dev_id],
3247 						rx_queue_id,
3248 						queue_conf);
3249 		return ret;
3250 	}
3251 
3252 	return 0;
3253 }
3254 
3255 #define RXA_ADD_DICT(stats, s) rte_tel_data_add_dict_u64(d, #s, stats.s)
3256 
3257 static int
3258 handle_rxa_stats(const char *cmd __rte_unused,
3259 		 const char *params,
3260 		 struct rte_tel_data *d)
3261 {
3262 	uint8_t rx_adapter_id;
3263 	struct rte_event_eth_rx_adapter_stats rx_adptr_stats;
3264 
3265 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3266 		return -1;
3267 
3268 	/* Get Rx adapter ID from parameter string */
3269 	rx_adapter_id = atoi(params);
3270 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3271 
3272 	/* Get Rx adapter stats */
3273 	if (rte_event_eth_rx_adapter_stats_get(rx_adapter_id,
3274 					       &rx_adptr_stats)) {
3275 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter stats\n");
3276 		return -1;
3277 	}
3278 
3279 	rte_tel_data_start_dict(d);
3280 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3281 	RXA_ADD_DICT(rx_adptr_stats, rx_packets);
3282 	RXA_ADD_DICT(rx_adptr_stats, rx_poll_count);
3283 	RXA_ADD_DICT(rx_adptr_stats, rx_dropped);
3284 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_retry);
3285 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_count);
3286 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_size);
3287 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_count);
3288 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_start_ts);
3289 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_block_cycles);
3290 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_end_ts);
3291 	RXA_ADD_DICT(rx_adptr_stats, rx_intr_packets);
3292 
3293 	return 0;
3294 }
3295 
3296 static int
3297 handle_rxa_stats_reset(const char *cmd __rte_unused,
3298 		       const char *params,
3299 		       struct rte_tel_data *d __rte_unused)
3300 {
3301 	uint8_t rx_adapter_id;
3302 
3303 	if (params == NULL || strlen(params) == 0 || ~isdigit(*params))
3304 		return -1;
3305 
3306 	/* Get Rx adapter ID from parameter string */
3307 	rx_adapter_id = atoi(params);
3308 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3309 
3310 	/* Reset Rx adapter stats */
3311 	if (rte_event_eth_rx_adapter_stats_reset(rx_adapter_id)) {
3312 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter stats\n");
3313 		return -1;
3314 	}
3315 
3316 	return 0;
3317 }
3318 
3319 static int
3320 handle_rxa_get_queue_conf(const char *cmd __rte_unused,
3321 			  const char *params,
3322 			  struct rte_tel_data *d)
3323 {
3324 	uint8_t rx_adapter_id;
3325 	uint16_t rx_queue_id;
3326 	int eth_dev_id;
3327 	char *token, *l_params;
3328 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
3329 
3330 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3331 		return -1;
3332 
3333 	/* Get Rx adapter ID from parameter string */
3334 	l_params = strdup(params);
3335 	token = strtok(l_params, ",");
3336 	rx_adapter_id = strtoul(token, NULL, 10);
3337 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3338 
3339 	token = strtok(NULL, ",");
3340 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3341 		return -1;
3342 
3343 	/* Get device ID from parameter string */
3344 	eth_dev_id = strtoul(token, NULL, 10);
3345 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(eth_dev_id, -EINVAL);
3346 
3347 	token = strtok(NULL, ",");
3348 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3349 		return -1;
3350 
3351 	/* Get Rx queue ID from parameter string */
3352 	rx_queue_id = strtoul(token, NULL, 10);
3353 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3354 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3355 		return -EINVAL;
3356 	}
3357 
3358 	token = strtok(NULL, "\0");
3359 	if (token != NULL)
3360 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3361 				 " telemetry command, ignoring");
3362 
3363 	if (rte_event_eth_rx_adapter_queue_conf_get(rx_adapter_id, eth_dev_id,
3364 						    rx_queue_id, &queue_conf)) {
3365 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue config");
3366 		return -1;
3367 	}
3368 
3369 	rte_tel_data_start_dict(d);
3370 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3371 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3372 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3373 	RXA_ADD_DICT(queue_conf, rx_queue_flags);
3374 	RXA_ADD_DICT(queue_conf, servicing_weight);
3375 	RXA_ADD_DICT(queue_conf.ev, queue_id);
3376 	RXA_ADD_DICT(queue_conf.ev, sched_type);
3377 	RXA_ADD_DICT(queue_conf.ev, priority);
3378 	RXA_ADD_DICT(queue_conf.ev, flow_id);
3379 
3380 	return 0;
3381 }
3382 
3383 static int
3384 handle_rxa_get_queue_stats(const char *cmd __rte_unused,
3385 			   const char *params,
3386 			   struct rte_tel_data *d)
3387 {
3388 	uint8_t rx_adapter_id;
3389 	uint16_t rx_queue_id;
3390 	int eth_dev_id;
3391 	char *token, *l_params;
3392 	struct rte_event_eth_rx_adapter_queue_stats q_stats;
3393 
3394 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3395 		return -1;
3396 
3397 	/* Get Rx adapter ID from parameter string */
3398 	l_params = strdup(params);
3399 	token = strtok(l_params, ",");
3400 	rx_adapter_id = strtoul(token, NULL, 10);
3401 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3402 
3403 	token = strtok(NULL, ",");
3404 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3405 		return -1;
3406 
3407 	/* Get device ID from parameter string */
3408 	eth_dev_id = strtoul(token, NULL, 10);
3409 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(eth_dev_id, -EINVAL);
3410 
3411 	token = strtok(NULL, ",");
3412 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3413 		return -1;
3414 
3415 	/* Get Rx queue ID from parameter string */
3416 	rx_queue_id = strtoul(token, NULL, 10);
3417 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3418 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3419 		return -EINVAL;
3420 	}
3421 
3422 	token = strtok(NULL, "\0");
3423 	if (token != NULL)
3424 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3425 				 " telemetry command, ignoring");
3426 
3427 	if (rte_event_eth_rx_adapter_queue_stats_get(rx_adapter_id, eth_dev_id,
3428 						    rx_queue_id, &q_stats)) {
3429 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue stats");
3430 		return -1;
3431 	}
3432 
3433 	rte_tel_data_start_dict(d);
3434 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3435 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3436 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3437 	RXA_ADD_DICT(q_stats, rx_event_buf_count);
3438 	RXA_ADD_DICT(q_stats, rx_event_buf_size);
3439 	RXA_ADD_DICT(q_stats, rx_poll_count);
3440 	RXA_ADD_DICT(q_stats, rx_packets);
3441 	RXA_ADD_DICT(q_stats, rx_dropped);
3442 
3443 	return 0;
3444 }
3445 
3446 static int
3447 handle_rxa_queue_stats_reset(const char *cmd __rte_unused,
3448 			     const char *params,
3449 			     struct rte_tel_data *d __rte_unused)
3450 {
3451 	uint8_t rx_adapter_id;
3452 	uint16_t rx_queue_id;
3453 	int eth_dev_id;
3454 	char *token, *l_params;
3455 
3456 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3457 		return -1;
3458 
3459 	/* Get Rx adapter ID from parameter string */
3460 	l_params = strdup(params);
3461 	token = strtok(l_params, ",");
3462 	rx_adapter_id = strtoul(token, NULL, 10);
3463 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3464 
3465 	token = strtok(NULL, ",");
3466 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3467 		return -1;
3468 
3469 	/* Get device ID from parameter string */
3470 	eth_dev_id = strtoul(token, NULL, 10);
3471 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(eth_dev_id, -EINVAL);
3472 
3473 	token = strtok(NULL, ",");
3474 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3475 		return -1;
3476 
3477 	/* Get Rx queue ID from parameter string */
3478 	rx_queue_id = strtoul(token, NULL, 10);
3479 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3480 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3481 		return -EINVAL;
3482 	}
3483 
3484 	token = strtok(NULL, "\0");
3485 	if (token != NULL)
3486 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3487 				 " telemetry command, ignoring");
3488 
3489 	if (rte_event_eth_rx_adapter_queue_stats_reset(rx_adapter_id,
3490 						       eth_dev_id,
3491 						       rx_queue_id)) {
3492 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter queue stats");
3493 		return -1;
3494 	}
3495 
3496 	return 0;
3497 }
3498 
3499 RTE_INIT(rxa_init_telemetry)
3500 {
3501 	rte_telemetry_register_cmd("/eventdev/rxa_stats",
3502 		handle_rxa_stats,
3503 		"Returns Rx adapter stats. Parameter: rxa_id");
3504 
3505 	rte_telemetry_register_cmd("/eventdev/rxa_stats_reset",
3506 		handle_rxa_stats_reset,
3507 		"Reset Rx adapter stats. Parameter: rxa_id");
3508 
3509 	rte_telemetry_register_cmd("/eventdev/rxa_queue_conf",
3510 		handle_rxa_get_queue_conf,
3511 		"Returns Rx queue config. Parameter: rxa_id, dev_id, queue_id");
3512 
3513 	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats",
3514 		handle_rxa_get_queue_stats,
3515 		"Returns Rx queue stats. Parameter: rxa_id, dev_id, queue_id");
3516 
3517 	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats_reset",
3518 		handle_rxa_queue_stats_reset,
3519 		"Reset Rx queue stats. Parameter: rxa_id, dev_id, queue_id");
3520 }
3521