xref: /dpdk/lib/eventdev/rte_event_eth_rx_adapter.c (revision 8f1d23ece06adff5eae9f1b4365bdbbd3abee2b2)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #include <ctype.h>
6 #include <stdlib.h>
7 #if defined(LINUX)
8 #include <sys/epoll.h>
9 #endif
10 #include <unistd.h>
11 
12 #include <rte_cycles.h>
13 #include <rte_common.h>
14 #include <rte_dev.h>
15 #include <rte_errno.h>
16 #include <ethdev_driver.h>
17 #include <rte_log.h>
18 #include <rte_malloc.h>
19 #include <rte_service_component.h>
20 #include <rte_thash.h>
21 #include <rte_interrupts.h>
22 #include <rte_mbuf_dyn.h>
23 #include <rte_telemetry.h>
24 
25 #include "rte_eventdev.h"
26 #include "eventdev_pmd.h"
27 #include "eventdev_trace.h"
28 #include "rte_event_eth_rx_adapter.h"
29 
30 #define BATCH_SIZE		32
31 #define BLOCK_CNT_THRESHOLD	10
32 #define ETH_EVENT_BUFFER_SIZE	(6*BATCH_SIZE)
33 #define MAX_VECTOR_SIZE		1024
34 #define MIN_VECTOR_SIZE		4
35 #define MAX_VECTOR_NS		1E9
36 #define MIN_VECTOR_NS		1E5
37 
38 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
39 #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
40 
41 #define RSS_KEY_SIZE	40
42 /* value written to intr thread pipe to signal thread exit */
43 #define ETH_BRIDGE_INTR_THREAD_EXIT	1
44 /* Sentinel value to detect initialized file handle */
45 #define INIT_FD		-1
46 
47 #define RXA_ADAPTER_ARRAY "rte_event_eth_rx_adapter_array"
48 
49 /*
50  * Used to store port and queue ID of interrupting Rx queue
51  */
52 union queue_data {
53 	RTE_STD_C11
54 	void *ptr;
55 	struct {
56 		uint16_t port;
57 		uint16_t queue;
58 	};
59 };
60 
61 /*
62  * There is an instance of this struct per polled Rx queue added to the
63  * adapter
64  */
65 struct eth_rx_poll_entry {
66 	/* Eth port to poll */
67 	uint16_t eth_dev_id;
68 	/* Eth rx queue to poll */
69 	uint16_t eth_rx_qid;
70 };
71 
72 struct eth_rx_vector_data {
73 	TAILQ_ENTRY(eth_rx_vector_data) next;
74 	uint16_t port;
75 	uint16_t queue;
76 	uint16_t max_vector_count;
77 	uint64_t event;
78 	uint64_t ts;
79 	uint64_t vector_timeout_ticks;
80 	struct rte_mempool *vector_pool;
81 	struct rte_event_vector *vector_ev;
82 } __rte_cache_aligned;
83 
84 TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
85 
86 /* Instance per adapter */
87 struct eth_event_enqueue_buffer {
88 	/* Count of events in this buffer */
89 	uint16_t count;
90 	/* Array of events in this buffer */
91 	struct rte_event *events;
92 	/* size of event buffer */
93 	uint16_t events_size;
94 	/* Event enqueue happens from head */
95 	uint16_t head;
96 	/* New packets from rte_eth_rx_burst is enqued from tail */
97 	uint16_t tail;
98 	/* last element in the buffer before rollover */
99 	uint16_t last;
100 	uint16_t last_mask;
101 };
102 
103 struct event_eth_rx_adapter {
104 	/* RSS key */
105 	uint8_t rss_key_be[RSS_KEY_SIZE];
106 	/* Event device identifier */
107 	uint8_t eventdev_id;
108 	/* Event port identifier */
109 	uint8_t event_port_id;
110 	/* Flag indicating per rxq event buffer */
111 	bool use_queue_event_buf;
112 	/* Per ethernet device structure */
113 	struct eth_device_info *eth_devices;
114 	/* Lock to serialize config updates with service function */
115 	rte_spinlock_t rx_lock;
116 	/* Max mbufs processed in any service function invocation */
117 	uint32_t max_nb_rx;
118 	/* Receive queues that need to be polled */
119 	struct eth_rx_poll_entry *eth_rx_poll;
120 	/* Size of the eth_rx_poll array */
121 	uint16_t num_rx_polled;
122 	/* Weighted round robin schedule */
123 	uint32_t *wrr_sched;
124 	/* wrr_sched[] size */
125 	uint32_t wrr_len;
126 	/* Next entry in wrr[] to begin polling */
127 	uint32_t wrr_pos;
128 	/* Event burst buffer */
129 	struct eth_event_enqueue_buffer event_enqueue_buffer;
130 	/* Vector enable flag */
131 	uint8_t ena_vector;
132 	/* Timestamp of previous vector expiry list traversal */
133 	uint64_t prev_expiry_ts;
134 	/* Minimum ticks to wait before traversing expiry list */
135 	uint64_t vector_tmo_ticks;
136 	/* vector list */
137 	struct eth_rx_vector_data_list vector_list;
138 	/* Per adapter stats */
139 	struct rte_event_eth_rx_adapter_stats stats;
140 	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
141 	uint16_t enq_block_count;
142 	/* Block start ts */
143 	uint64_t rx_enq_block_start_ts;
144 	/* epoll fd used to wait for Rx interrupts */
145 	int epd;
146 	/* Num of interrupt driven interrupt queues */
147 	uint32_t num_rx_intr;
148 	/* Used to send <dev id, queue id> of interrupting Rx queues from
149 	 * the interrupt thread to the Rx thread
150 	 */
151 	struct rte_ring *intr_ring;
152 	/* Rx Queue data (dev id, queue id) for the last non-empty
153 	 * queue polled
154 	 */
155 	union queue_data qd;
156 	/* queue_data is valid */
157 	int qd_valid;
158 	/* Interrupt ring lock, synchronizes Rx thread
159 	 * and interrupt thread
160 	 */
161 	rte_spinlock_t intr_ring_lock;
162 	/* event array passed to rte_poll_wait */
163 	struct rte_epoll_event *epoll_events;
164 	/* Count of interrupt vectors in use */
165 	uint32_t num_intr_vec;
166 	/* Thread blocked on Rx interrupts */
167 	pthread_t rx_intr_thread;
168 	/* Configuration callback for rte_service configuration */
169 	rte_event_eth_rx_adapter_conf_cb conf_cb;
170 	/* Configuration callback argument */
171 	void *conf_arg;
172 	/* Set if  default_cb is being used */
173 	int default_cb_arg;
174 	/* Service initialization state */
175 	uint8_t service_inited;
176 	/* Total count of Rx queues in adapter */
177 	uint32_t nb_queues;
178 	/* Memory allocation name */
179 	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
180 	/* Socket identifier cached from eventdev */
181 	int socket_id;
182 	/* Per adapter EAL service */
183 	uint32_t service_id;
184 	/* Adapter started flag */
185 	uint8_t rxa_started;
186 	/* Adapter ID */
187 	uint8_t id;
188 } __rte_cache_aligned;
189 
190 /* Per eth device */
191 struct eth_device_info {
192 	struct rte_eth_dev *dev;
193 	struct eth_rx_queue_info *rx_queue;
194 	/* Rx callback */
195 	rte_event_eth_rx_adapter_cb_fn cb_fn;
196 	/* Rx callback argument */
197 	void *cb_arg;
198 	/* Set if ethdev->eventdev packet transfer uses a
199 	 * hardware mechanism
200 	 */
201 	uint8_t internal_event_port;
202 	/* Set if the adapter is processing rx queues for
203 	 * this eth device and packet processing has been
204 	 * started, allows for the code to know if the PMD
205 	 * rx_adapter_stop callback needs to be invoked
206 	 */
207 	uint8_t dev_rx_started;
208 	/* Number of queues added for this device */
209 	uint16_t nb_dev_queues;
210 	/* Number of poll based queues
211 	 * If nb_rx_poll > 0, the start callback will
212 	 * be invoked if not already invoked
213 	 */
214 	uint16_t nb_rx_poll;
215 	/* Number of interrupt based queues
216 	 * If nb_rx_intr > 0, the start callback will
217 	 * be invoked if not already invoked.
218 	 */
219 	uint16_t nb_rx_intr;
220 	/* Number of queues that use the shared interrupt */
221 	uint16_t nb_shared_intr;
222 	/* sum(wrr(q)) for all queues within the device
223 	 * useful when deleting all device queues
224 	 */
225 	uint32_t wrr_len;
226 	/* Intr based queue index to start polling from, this is used
227 	 * if the number of shared interrupts is non-zero
228 	 */
229 	uint16_t next_q_idx;
230 	/* Intr based queue indices */
231 	uint16_t *intr_queue;
232 	/* device generates per Rx queue interrupt for queue index
233 	 * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
234 	 */
235 	int multi_intr_cap;
236 	/* shared interrupt enabled */
237 	int shared_intr_enabled;
238 };
239 
240 /* Per Rx queue */
241 struct eth_rx_queue_info {
242 	int queue_enabled;	/* True if added */
243 	int intr_enabled;
244 	uint8_t ena_vector;
245 	uint16_t wt;		/* Polling weight */
246 	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
247 	uint64_t event;
248 	struct eth_rx_vector_data vector_data;
249 	struct eth_event_enqueue_buffer *event_buf;
250 	/* use adapter stats struct for queue level stats,
251 	 * as same stats need to be updated for adapter and queue
252 	 */
253 	struct rte_event_eth_rx_adapter_stats *stats;
254 };
255 
256 static struct event_eth_rx_adapter **event_eth_rx_adapter;
257 
258 /* Enable dynamic timestamp field in mbuf */
259 static uint64_t event_eth_rx_timestamp_dynflag;
260 static int event_eth_rx_timestamp_dynfield_offset = -1;
261 
262 static inline rte_mbuf_timestamp_t *
263 rxa_timestamp_dynfield(struct rte_mbuf *mbuf)
264 {
265 	return RTE_MBUF_DYNFIELD(mbuf,
266 		event_eth_rx_timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
267 }
268 
269 static inline int
270 rxa_validate_id(uint8_t id)
271 {
272 	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
273 }
274 
275 static inline struct eth_event_enqueue_buffer *
276 rxa_event_buf_get(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
277 		  uint16_t rx_queue_id,
278 		  struct rte_event_eth_rx_adapter_stats **stats)
279 {
280 	if (rx_adapter->use_queue_event_buf) {
281 		struct eth_device_info *dev_info =
282 			&rx_adapter->eth_devices[eth_dev_id];
283 		*stats = dev_info->rx_queue[rx_queue_id].stats;
284 		return dev_info->rx_queue[rx_queue_id].event_buf;
285 	} else {
286 		*stats = &rx_adapter->stats;
287 		return &rx_adapter->event_enqueue_buffer;
288 	}
289 }
290 
291 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
292 	if (!rxa_validate_id(id)) { \
293 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
294 		return retval; \
295 	} \
296 } while (0)
297 
298 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(id, retval) do { \
299 	if (!rxa_validate_id(id)) { \
300 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
301 		ret = retval; \
302 		goto error; \
303 	} \
304 } while (0)
305 
306 #define RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, retval) do { \
307 	if ((token) == NULL || strlen(token) == 0 || !isdigit(*token)) { \
308 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter token\n"); \
309 		ret = retval; \
310 		goto error; \
311 	} \
312 } while (0)
313 
314 #define RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(port_id, retval) do { \
315 	if (!rte_eth_dev_is_valid_port(port_id)) { \
316 		RTE_ETHDEV_LOG(ERR, "Invalid port_id=%u\n", port_id); \
317 		ret = retval; \
318 		goto error; \
319 	} \
320 } while (0)
321 
322 static inline int
323 rxa_sw_adapter_queue_count(struct event_eth_rx_adapter *rx_adapter)
324 {
325 	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
326 }
327 
328 /* Greatest common divisor */
329 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
330 {
331 	uint16_t r = a % b;
332 
333 	return r ? rxa_gcd_u16(b, r) : b;
334 }
335 
336 /* Returns the next queue in the polling sequence
337  *
338  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
339  */
340 static int
341 rxa_wrr_next(struct event_eth_rx_adapter *rx_adapter, unsigned int n, int *cw,
342 	     struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
343 	     uint16_t gcd, int prev)
344 {
345 	int i = prev;
346 	uint16_t w;
347 
348 	while (1) {
349 		uint16_t q;
350 		uint16_t d;
351 
352 		i = (i + 1) % n;
353 		if (i == 0) {
354 			*cw = *cw - gcd;
355 			if (*cw <= 0)
356 				*cw = max_wt;
357 		}
358 
359 		q = eth_rx_poll[i].eth_rx_qid;
360 		d = eth_rx_poll[i].eth_dev_id;
361 		w = rx_adapter->eth_devices[d].rx_queue[q].wt;
362 
363 		if ((int)w >= *cw)
364 			return i;
365 	}
366 }
367 
368 static inline int
369 rxa_shared_intr(struct eth_device_info *dev_info,
370 	int rx_queue_id)
371 {
372 	int multi_intr_cap;
373 
374 	if (dev_info->dev->intr_handle == NULL)
375 		return 0;
376 
377 	multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
378 	return !multi_intr_cap ||
379 		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
380 }
381 
382 static inline int
383 rxa_intr_queue(struct eth_device_info *dev_info,
384 	int rx_queue_id)
385 {
386 	struct eth_rx_queue_info *queue_info;
387 
388 	queue_info = &dev_info->rx_queue[rx_queue_id];
389 	return dev_info->rx_queue &&
390 		!dev_info->internal_event_port &&
391 		queue_info->queue_enabled && queue_info->wt == 0;
392 }
393 
394 static inline int
395 rxa_polled_queue(struct eth_device_info *dev_info,
396 	int rx_queue_id)
397 {
398 	struct eth_rx_queue_info *queue_info;
399 
400 	queue_info = &dev_info->rx_queue[rx_queue_id];
401 	return !dev_info->internal_event_port &&
402 		dev_info->rx_queue &&
403 		queue_info->queue_enabled && queue_info->wt != 0;
404 }
405 
406 /* Calculate change in number of vectors after Rx queue ID is add/deleted */
407 static int
408 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
409 {
410 	uint16_t i;
411 	int n, s;
412 	uint16_t nbq;
413 
414 	nbq = dev_info->dev->data->nb_rx_queues;
415 	n = 0; /* non shared count */
416 	s = 0; /* shared count */
417 
418 	if (rx_queue_id == -1) {
419 		for (i = 0; i < nbq; i++) {
420 			if (!rxa_shared_intr(dev_info, i))
421 				n += add ? !rxa_intr_queue(dev_info, i) :
422 					rxa_intr_queue(dev_info, i);
423 			else
424 				s += add ? !rxa_intr_queue(dev_info, i) :
425 					rxa_intr_queue(dev_info, i);
426 		}
427 
428 		if (s > 0) {
429 			if ((add && dev_info->nb_shared_intr == 0) ||
430 				(!add && dev_info->nb_shared_intr))
431 				n += 1;
432 		}
433 	} else {
434 		if (!rxa_shared_intr(dev_info, rx_queue_id))
435 			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
436 				rxa_intr_queue(dev_info, rx_queue_id);
437 		else
438 			n = add ? !dev_info->nb_shared_intr :
439 				dev_info->nb_shared_intr == 1;
440 	}
441 
442 	return add ? n : -n;
443 }
444 
445 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
446  */
447 static void
448 rxa_calc_nb_post_intr_del(struct event_eth_rx_adapter *rx_adapter,
449 			  struct eth_device_info *dev_info, int rx_queue_id,
450 			  uint32_t *nb_rx_intr)
451 {
452 	uint32_t intr_diff;
453 
454 	if (rx_queue_id == -1)
455 		intr_diff = dev_info->nb_rx_intr;
456 	else
457 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
458 
459 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
460 }
461 
462 /* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
463  * interrupt queues could currently be poll mode Rx queues
464  */
465 static void
466 rxa_calc_nb_post_add_intr(struct event_eth_rx_adapter *rx_adapter,
467 			  struct eth_device_info *dev_info, int rx_queue_id,
468 			  uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
469 			  uint32_t *nb_wrr)
470 {
471 	uint32_t intr_diff;
472 	uint32_t poll_diff;
473 	uint32_t wrr_len_diff;
474 
475 	if (rx_queue_id == -1) {
476 		intr_diff = dev_info->dev->data->nb_rx_queues -
477 						dev_info->nb_rx_intr;
478 		poll_diff = dev_info->nb_rx_poll;
479 		wrr_len_diff = dev_info->wrr_len;
480 	} else {
481 		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
482 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
483 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
484 					0;
485 	}
486 
487 	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
488 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
489 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
490 }
491 
492 /* Calculate size of the eth_rx_poll and wrr_sched arrays
493  * after deleting poll mode rx queues
494  */
495 static void
496 rxa_calc_nb_post_poll_del(struct event_eth_rx_adapter *rx_adapter,
497 			  struct eth_device_info *dev_info, int rx_queue_id,
498 			  uint32_t *nb_rx_poll, uint32_t *nb_wrr)
499 {
500 	uint32_t poll_diff;
501 	uint32_t wrr_len_diff;
502 
503 	if (rx_queue_id == -1) {
504 		poll_diff = dev_info->nb_rx_poll;
505 		wrr_len_diff = dev_info->wrr_len;
506 	} else {
507 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
508 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
509 					0;
510 	}
511 
512 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
513 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
514 }
515 
516 /* Calculate nb_rx_* after adding poll mode rx queues
517  */
518 static void
519 rxa_calc_nb_post_add_poll(struct event_eth_rx_adapter *rx_adapter,
520 			  struct eth_device_info *dev_info, int rx_queue_id,
521 			  uint16_t wt, uint32_t *nb_rx_poll,
522 			  uint32_t *nb_rx_intr, uint32_t *nb_wrr)
523 {
524 	uint32_t intr_diff;
525 	uint32_t poll_diff;
526 	uint32_t wrr_len_diff;
527 
528 	if (rx_queue_id == -1) {
529 		intr_diff = dev_info->nb_rx_intr;
530 		poll_diff = dev_info->dev->data->nb_rx_queues -
531 						dev_info->nb_rx_poll;
532 		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
533 				- dev_info->wrr_len;
534 	} else {
535 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
536 		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
537 		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
538 				wt - dev_info->rx_queue[rx_queue_id].wt :
539 				wt;
540 	}
541 
542 	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
543 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
544 	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
545 }
546 
547 /* Calculate nb_rx_* after adding rx_queue_id */
548 static void
549 rxa_calc_nb_post_add(struct event_eth_rx_adapter *rx_adapter,
550 		     struct eth_device_info *dev_info, int rx_queue_id,
551 		     uint16_t wt, uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
552 		     uint32_t *nb_wrr)
553 {
554 	if (wt != 0)
555 		rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
556 					wt, nb_rx_poll, nb_rx_intr, nb_wrr);
557 	else
558 		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
559 					nb_rx_poll, nb_rx_intr, nb_wrr);
560 }
561 
562 /* Calculate nb_rx_* after deleting rx_queue_id */
563 static void
564 rxa_calc_nb_post_del(struct event_eth_rx_adapter *rx_adapter,
565 		     struct eth_device_info *dev_info, int rx_queue_id,
566 		     uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
567 		     uint32_t *nb_wrr)
568 {
569 	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
570 				nb_wrr);
571 	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
572 				nb_rx_intr);
573 }
574 
575 /*
576  * Allocate the rx_poll array
577  */
578 static struct eth_rx_poll_entry *
579 rxa_alloc_poll(struct event_eth_rx_adapter *rx_adapter, uint32_t num_rx_polled)
580 {
581 	size_t len;
582 
583 	len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
584 							RTE_CACHE_LINE_SIZE);
585 	return  rte_zmalloc_socket(rx_adapter->mem_name,
586 				len,
587 				RTE_CACHE_LINE_SIZE,
588 				rx_adapter->socket_id);
589 }
590 
591 /*
592  * Allocate the WRR array
593  */
594 static uint32_t *
595 rxa_alloc_wrr(struct event_eth_rx_adapter *rx_adapter, int nb_wrr)
596 {
597 	size_t len;
598 
599 	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
600 			RTE_CACHE_LINE_SIZE);
601 	return  rte_zmalloc_socket(rx_adapter->mem_name,
602 				len,
603 				RTE_CACHE_LINE_SIZE,
604 				rx_adapter->socket_id);
605 }
606 
607 static int
608 rxa_alloc_poll_arrays(struct event_eth_rx_adapter *rx_adapter, uint32_t nb_poll,
609 		      uint32_t nb_wrr, struct eth_rx_poll_entry **rx_poll,
610 		      uint32_t **wrr_sched)
611 {
612 
613 	if (nb_poll == 0) {
614 		*rx_poll = NULL;
615 		*wrr_sched = NULL;
616 		return 0;
617 	}
618 
619 	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
620 	if (*rx_poll == NULL) {
621 		*wrr_sched = NULL;
622 		return -ENOMEM;
623 	}
624 
625 	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
626 	if (*wrr_sched == NULL) {
627 		rte_free(*rx_poll);
628 		return -ENOMEM;
629 	}
630 	return 0;
631 }
632 
633 /* Precalculate WRR polling sequence for all queues in rx_adapter */
634 static void
635 rxa_calc_wrr_sequence(struct event_eth_rx_adapter *rx_adapter,
636 		      struct eth_rx_poll_entry *rx_poll, uint32_t *rx_wrr)
637 {
638 	uint16_t d;
639 	uint16_t q;
640 	unsigned int i;
641 	int prev = -1;
642 	int cw = -1;
643 
644 	/* Initialize variables for calculation of wrr schedule */
645 	uint16_t max_wrr_pos = 0;
646 	unsigned int poll_q = 0;
647 	uint16_t max_wt = 0;
648 	uint16_t gcd = 0;
649 
650 	if (rx_poll == NULL)
651 		return;
652 
653 	/* Generate array of all queues to poll, the size of this
654 	 * array is poll_q
655 	 */
656 	RTE_ETH_FOREACH_DEV(d) {
657 		uint16_t nb_rx_queues;
658 		struct eth_device_info *dev_info =
659 				&rx_adapter->eth_devices[d];
660 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
661 		if (dev_info->rx_queue == NULL)
662 			continue;
663 		if (dev_info->internal_event_port)
664 			continue;
665 		dev_info->wrr_len = 0;
666 		for (q = 0; q < nb_rx_queues; q++) {
667 			struct eth_rx_queue_info *queue_info =
668 				&dev_info->rx_queue[q];
669 			uint16_t wt;
670 
671 			if (!rxa_polled_queue(dev_info, q))
672 				continue;
673 			wt = queue_info->wt;
674 			rx_poll[poll_q].eth_dev_id = d;
675 			rx_poll[poll_q].eth_rx_qid = q;
676 			max_wrr_pos += wt;
677 			dev_info->wrr_len += wt;
678 			max_wt = RTE_MAX(max_wt, wt);
679 			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
680 			poll_q++;
681 		}
682 	}
683 
684 	/* Generate polling sequence based on weights */
685 	prev = -1;
686 	cw = -1;
687 	for (i = 0; i < max_wrr_pos; i++) {
688 		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
689 				     rx_poll, max_wt, gcd, prev);
690 		prev = rx_wrr[i];
691 	}
692 }
693 
694 static inline void
695 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
696 	struct rte_ipv6_hdr **ipv6_hdr)
697 {
698 	struct rte_ether_hdr *eth_hdr =
699 		rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
700 	struct rte_vlan_hdr *vlan_hdr;
701 
702 	*ipv4_hdr = NULL;
703 	*ipv6_hdr = NULL;
704 
705 	switch (eth_hdr->ether_type) {
706 	case RTE_BE16(RTE_ETHER_TYPE_IPV4):
707 		*ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
708 		break;
709 
710 	case RTE_BE16(RTE_ETHER_TYPE_IPV6):
711 		*ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
712 		break;
713 
714 	case RTE_BE16(RTE_ETHER_TYPE_VLAN):
715 		vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
716 		switch (vlan_hdr->eth_proto) {
717 		case RTE_BE16(RTE_ETHER_TYPE_IPV4):
718 			*ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
719 			break;
720 		case RTE_BE16(RTE_ETHER_TYPE_IPV6):
721 			*ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
722 			break;
723 		default:
724 			break;
725 		}
726 		break;
727 
728 	default:
729 		break;
730 	}
731 }
732 
733 /* Calculate RSS hash for IPv4/6 */
734 static inline uint32_t
735 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
736 {
737 	uint32_t input_len;
738 	void *tuple;
739 	struct rte_ipv4_tuple ipv4_tuple;
740 	struct rte_ipv6_tuple ipv6_tuple;
741 	struct rte_ipv4_hdr *ipv4_hdr;
742 	struct rte_ipv6_hdr *ipv6_hdr;
743 
744 	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
745 
746 	if (ipv4_hdr) {
747 		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
748 		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
749 		tuple = &ipv4_tuple;
750 		input_len = RTE_THASH_V4_L3_LEN;
751 	} else if (ipv6_hdr) {
752 		rte_thash_load_v6_addrs(ipv6_hdr,
753 					(union rte_thash_tuple *)&ipv6_tuple);
754 		tuple = &ipv6_tuple;
755 		input_len = RTE_THASH_V6_L3_LEN;
756 	} else
757 		return 0;
758 
759 	return rte_softrss_be(tuple, input_len, rss_key_be);
760 }
761 
762 static inline int
763 rxa_enq_blocked(struct event_eth_rx_adapter *rx_adapter)
764 {
765 	return !!rx_adapter->enq_block_count;
766 }
767 
768 static inline void
769 rxa_enq_block_start_ts(struct event_eth_rx_adapter *rx_adapter)
770 {
771 	if (rx_adapter->rx_enq_block_start_ts)
772 		return;
773 
774 	rx_adapter->enq_block_count++;
775 	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
776 		return;
777 
778 	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
779 }
780 
781 static inline void
782 rxa_enq_block_end_ts(struct event_eth_rx_adapter *rx_adapter,
783 		     struct rte_event_eth_rx_adapter_stats *stats)
784 {
785 	if (unlikely(!stats->rx_enq_start_ts))
786 		stats->rx_enq_start_ts = rte_get_tsc_cycles();
787 
788 	if (likely(!rxa_enq_blocked(rx_adapter)))
789 		return;
790 
791 	rx_adapter->enq_block_count = 0;
792 	if (rx_adapter->rx_enq_block_start_ts) {
793 		stats->rx_enq_end_ts = rte_get_tsc_cycles();
794 		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
795 		    rx_adapter->rx_enq_block_start_ts;
796 		rx_adapter->rx_enq_block_start_ts = 0;
797 	}
798 }
799 
800 /* Enqueue buffered events to event device */
801 static inline uint16_t
802 rxa_flush_event_buffer(struct event_eth_rx_adapter *rx_adapter,
803 		       struct eth_event_enqueue_buffer *buf,
804 		       struct rte_event_eth_rx_adapter_stats *stats)
805 {
806 	uint16_t count = buf->count;
807 	uint16_t n = 0;
808 
809 	if (!count)
810 		return 0;
811 
812 	if (buf->last)
813 		count = buf->last - buf->head;
814 
815 	if (count) {
816 		n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
817 						rx_adapter->event_port_id,
818 						&buf->events[buf->head],
819 						count);
820 		if (n != count)
821 			stats->rx_enq_retry++;
822 
823 		buf->head += n;
824 	}
825 
826 	if (buf->last && n == count) {
827 		uint16_t n1;
828 
829 		n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
830 					rx_adapter->event_port_id,
831 					&buf->events[0],
832 					buf->tail);
833 
834 		if (n1 != buf->tail)
835 			stats->rx_enq_retry++;
836 
837 		buf->last = 0;
838 		buf->head = n1;
839 		buf->last_mask = 0;
840 		n += n1;
841 	}
842 
843 	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
844 		rxa_enq_block_start_ts(rx_adapter);
845 
846 	buf->count -= n;
847 	stats->rx_enq_count += n;
848 
849 	return n;
850 }
851 
852 static inline void
853 rxa_init_vector(struct event_eth_rx_adapter *rx_adapter,
854 		struct eth_rx_vector_data *vec)
855 {
856 	vec->vector_ev->nb_elem = 0;
857 	vec->vector_ev->port = vec->port;
858 	vec->vector_ev->queue = vec->queue;
859 	vec->vector_ev->attr_valid = true;
860 	TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
861 }
862 
863 static inline uint16_t
864 rxa_create_event_vector(struct event_eth_rx_adapter *rx_adapter,
865 			struct eth_rx_queue_info *queue_info,
866 			struct eth_event_enqueue_buffer *buf,
867 			struct rte_mbuf **mbufs, uint16_t num)
868 {
869 	struct rte_event *ev = &buf->events[buf->count];
870 	struct eth_rx_vector_data *vec;
871 	uint16_t filled, space, sz;
872 
873 	filled = 0;
874 	vec = &queue_info->vector_data;
875 
876 	if (vec->vector_ev == NULL) {
877 		if (rte_mempool_get(vec->vector_pool,
878 				    (void **)&vec->vector_ev) < 0) {
879 			rte_pktmbuf_free_bulk(mbufs, num);
880 			return 0;
881 		}
882 		rxa_init_vector(rx_adapter, vec);
883 	}
884 	while (num) {
885 		if (vec->vector_ev->nb_elem == vec->max_vector_count) {
886 			/* Event ready. */
887 			ev->event = vec->event;
888 			ev->vec = vec->vector_ev;
889 			ev++;
890 			filled++;
891 			vec->vector_ev = NULL;
892 			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
893 			if (rte_mempool_get(vec->vector_pool,
894 					    (void **)&vec->vector_ev) < 0) {
895 				rte_pktmbuf_free_bulk(mbufs, num);
896 				return 0;
897 			}
898 			rxa_init_vector(rx_adapter, vec);
899 		}
900 
901 		space = vec->max_vector_count - vec->vector_ev->nb_elem;
902 		sz = num > space ? space : num;
903 		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
904 		       sizeof(void *) * sz);
905 		vec->vector_ev->nb_elem += sz;
906 		num -= sz;
907 		mbufs += sz;
908 		vec->ts = rte_rdtsc();
909 	}
910 
911 	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
912 		ev->event = vec->event;
913 		ev->vec = vec->vector_ev;
914 		ev++;
915 		filled++;
916 		vec->vector_ev = NULL;
917 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
918 	}
919 
920 	return filled;
921 }
922 
923 static inline void
924 rxa_buffer_mbufs(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
925 		 uint16_t rx_queue_id, struct rte_mbuf **mbufs, uint16_t num,
926 		 struct eth_event_enqueue_buffer *buf,
927 		 struct rte_event_eth_rx_adapter_stats *stats)
928 {
929 	uint32_t i;
930 	struct eth_device_info *dev_info =
931 					&rx_adapter->eth_devices[eth_dev_id];
932 	struct eth_rx_queue_info *eth_rx_queue_info =
933 					&dev_info->rx_queue[rx_queue_id];
934 	uint16_t new_tail = buf->tail;
935 	uint64_t event = eth_rx_queue_info->event;
936 	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
937 	struct rte_mbuf *m = mbufs[0];
938 	uint32_t rss_mask;
939 	uint32_t rss;
940 	int do_rss;
941 	uint16_t nb_cb;
942 	uint16_t dropped;
943 	uint64_t ts, ts_mask;
944 
945 	if (!eth_rx_queue_info->ena_vector) {
946 		ts = m->ol_flags & event_eth_rx_timestamp_dynflag ?
947 						0 : rte_get_tsc_cycles();
948 
949 		/* 0xffff ffff ffff ffff if RTE_MBUF_F_RX_TIMESTAMP is set,
950 		 * otherwise 0
951 		 */
952 		ts_mask = (uint64_t)(!(m->ol_flags &
953 				       event_eth_rx_timestamp_dynflag)) - 1ULL;
954 
955 		/* 0xffff ffff if RTE_MBUF_F_RX_RSS_HASH is set, otherwise 0 */
956 		rss_mask = ~(((m->ol_flags & RTE_MBUF_F_RX_RSS_HASH) != 0) - 1);
957 		do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
958 		for (i = 0; i < num; i++) {
959 			struct rte_event *ev;
960 
961 			m = mbufs[i];
962 			*rxa_timestamp_dynfield(m) = ts |
963 					(*rxa_timestamp_dynfield(m) & ts_mask);
964 
965 			ev = &buf->events[new_tail];
966 
967 			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
968 				     : m->hash.rss;
969 			ev->event = event;
970 			ev->flow_id = (rss & ~flow_id_mask) |
971 				      (ev->flow_id & flow_id_mask);
972 			ev->mbuf = m;
973 			new_tail++;
974 		}
975 	} else {
976 		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
977 					      buf, mbufs, num);
978 	}
979 
980 	if (num && dev_info->cb_fn) {
981 
982 		dropped = 0;
983 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
984 				       buf->last |
985 				       (buf->events_size & ~buf->last_mask),
986 				       buf->count >= BATCH_SIZE ?
987 						buf->count - BATCH_SIZE : 0,
988 				       &buf->events[buf->tail],
989 				       num,
990 				       dev_info->cb_arg,
991 				       &dropped);
992 		if (unlikely(nb_cb > num))
993 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
994 				nb_cb, num);
995 		else
996 			num = nb_cb;
997 		if (dropped)
998 			stats->rx_dropped += dropped;
999 	}
1000 
1001 	buf->count += num;
1002 	buf->tail += num;
1003 }
1004 
1005 static inline bool
1006 rxa_pkt_buf_available(struct eth_event_enqueue_buffer *buf)
1007 {
1008 	uint32_t nb_req = buf->tail + BATCH_SIZE;
1009 
1010 	if (!buf->last) {
1011 		if (nb_req <= buf->events_size)
1012 			return true;
1013 
1014 		if (buf->head >= BATCH_SIZE) {
1015 			buf->last_mask = ~0;
1016 			buf->last = buf->tail;
1017 			buf->tail = 0;
1018 			return true;
1019 		}
1020 	}
1021 
1022 	return nb_req <= buf->head;
1023 }
1024 
1025 /* Enqueue packets from  <port, q>  to event buffer */
1026 static inline uint32_t
1027 rxa_eth_rx(struct event_eth_rx_adapter *rx_adapter, uint16_t port_id,
1028 	   uint16_t queue_id, uint32_t rx_count, uint32_t max_rx,
1029 	   int *rxq_empty, struct eth_event_enqueue_buffer *buf,
1030 	   struct rte_event_eth_rx_adapter_stats *stats)
1031 {
1032 	struct rte_mbuf *mbufs[BATCH_SIZE];
1033 	uint16_t n;
1034 	uint32_t nb_rx = 0;
1035 	uint32_t nb_flushed = 0;
1036 
1037 	if (rxq_empty)
1038 		*rxq_empty = 0;
1039 	/* Don't do a batch dequeue from the rx queue if there isn't
1040 	 * enough space in the enqueue buffer.
1041 	 */
1042 	while (rxa_pkt_buf_available(buf)) {
1043 		if (buf->count >= BATCH_SIZE)
1044 			nb_flushed +=
1045 				rxa_flush_event_buffer(rx_adapter, buf, stats);
1046 
1047 		stats->rx_poll_count++;
1048 		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
1049 		if (unlikely(!n)) {
1050 			if (rxq_empty)
1051 				*rxq_empty = 1;
1052 			break;
1053 		}
1054 		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n, buf,
1055 				 stats);
1056 		nb_rx += n;
1057 		if (rx_count + nb_rx > max_rx)
1058 			break;
1059 	}
1060 
1061 	if (buf->count > 0)
1062 		nb_flushed += rxa_flush_event_buffer(rx_adapter, buf, stats);
1063 
1064 	stats->rx_packets += nb_rx;
1065 	if (nb_flushed == 0)
1066 		rte_event_maintain(rx_adapter->eventdev_id,
1067 				   rx_adapter->event_port_id, 0);
1068 
1069 	return nb_rx;
1070 }
1071 
1072 static inline void
1073 rxa_intr_ring_enqueue(struct event_eth_rx_adapter *rx_adapter, void *data)
1074 {
1075 	uint16_t port_id;
1076 	uint16_t queue;
1077 	int err;
1078 	union queue_data qd;
1079 	struct eth_device_info *dev_info;
1080 	struct eth_rx_queue_info *queue_info;
1081 	int *intr_enabled;
1082 
1083 	qd.ptr = data;
1084 	port_id = qd.port;
1085 	queue = qd.queue;
1086 
1087 	dev_info = &rx_adapter->eth_devices[port_id];
1088 	queue_info = &dev_info->rx_queue[queue];
1089 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1090 	if (rxa_shared_intr(dev_info, queue))
1091 		intr_enabled = &dev_info->shared_intr_enabled;
1092 	else
1093 		intr_enabled = &queue_info->intr_enabled;
1094 
1095 	if (*intr_enabled) {
1096 		*intr_enabled = 0;
1097 		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
1098 		/* Entry should always be available.
1099 		 * The ring size equals the maximum number of interrupt
1100 		 * vectors supported (an interrupt vector is shared in
1101 		 * case of shared interrupts)
1102 		 */
1103 		if (err)
1104 			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
1105 				" to ring: %s", strerror(-err));
1106 		else
1107 			rte_eth_dev_rx_intr_disable(port_id, queue);
1108 	}
1109 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1110 }
1111 
1112 static int
1113 rxa_intr_ring_check_avail(struct event_eth_rx_adapter *rx_adapter,
1114 			  uint32_t num_intr_vec)
1115 {
1116 	if (rx_adapter->num_intr_vec + num_intr_vec >
1117 				RTE_EVENT_ETH_INTR_RING_SIZE) {
1118 		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1119 		" %d needed %d limit %d", rx_adapter->num_intr_vec,
1120 		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1121 		return -ENOSPC;
1122 	}
1123 
1124 	return 0;
1125 }
1126 
1127 /* Delete entries for (dev, queue) from the interrupt ring */
1128 static void
1129 rxa_intr_ring_del_entries(struct event_eth_rx_adapter *rx_adapter,
1130 			  struct eth_device_info *dev_info,
1131 			  uint16_t rx_queue_id)
1132 {
1133 	int i, n;
1134 	union queue_data qd;
1135 
1136 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1137 
1138 	n = rte_ring_count(rx_adapter->intr_ring);
1139 	for (i = 0; i < n; i++) {
1140 		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1141 		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1142 			if (qd.port == dev_info->dev->data->port_id &&
1143 				qd.queue == rx_queue_id)
1144 				continue;
1145 		} else {
1146 			if (qd.port == dev_info->dev->data->port_id)
1147 				continue;
1148 		}
1149 		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1150 	}
1151 
1152 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1153 }
1154 
1155 /* pthread callback handling interrupt mode receive queues
1156  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1157  * interrupting queue to the adapter's ring buffer for interrupt events.
1158  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1159  * the adapter service function.
1160  */
1161 static void *
1162 rxa_intr_thread(void *arg)
1163 {
1164 	struct event_eth_rx_adapter *rx_adapter = arg;
1165 	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1166 	int n, i;
1167 
1168 	while (1) {
1169 		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1170 				RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1171 		if (unlikely(n < 0))
1172 			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1173 					n);
1174 		for (i = 0; i < n; i++) {
1175 			rxa_intr_ring_enqueue(rx_adapter,
1176 					epoll_events[i].epdata.data);
1177 		}
1178 	}
1179 
1180 	return NULL;
1181 }
1182 
1183 /* Dequeue <port, q> from interrupt ring and enqueue received
1184  * mbufs to eventdev
1185  */
1186 static inline void
1187 rxa_intr_ring_dequeue(struct event_eth_rx_adapter *rx_adapter)
1188 {
1189 	uint32_t n;
1190 	uint32_t nb_rx = 0;
1191 	int rxq_empty;
1192 	struct eth_event_enqueue_buffer *buf;
1193 	struct rte_event_eth_rx_adapter_stats *stats;
1194 	rte_spinlock_t *ring_lock;
1195 	uint8_t max_done = 0;
1196 
1197 	if (rx_adapter->num_rx_intr == 0)
1198 		return;
1199 
1200 	if (rte_ring_count(rx_adapter->intr_ring) == 0
1201 		&& !rx_adapter->qd_valid)
1202 		return;
1203 
1204 	buf = &rx_adapter->event_enqueue_buffer;
1205 	stats = &rx_adapter->stats;
1206 	ring_lock = &rx_adapter->intr_ring_lock;
1207 
1208 	if (buf->count >= BATCH_SIZE)
1209 		rxa_flush_event_buffer(rx_adapter, buf, stats);
1210 
1211 	while (rxa_pkt_buf_available(buf)) {
1212 		struct eth_device_info *dev_info;
1213 		uint16_t port;
1214 		uint16_t queue;
1215 		union queue_data qd  = rx_adapter->qd;
1216 		int err;
1217 
1218 		if (!rx_adapter->qd_valid) {
1219 			struct eth_rx_queue_info *queue_info;
1220 
1221 			rte_spinlock_lock(ring_lock);
1222 			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1223 			if (err) {
1224 				rte_spinlock_unlock(ring_lock);
1225 				break;
1226 			}
1227 
1228 			port = qd.port;
1229 			queue = qd.queue;
1230 			rx_adapter->qd = qd;
1231 			rx_adapter->qd_valid = 1;
1232 			dev_info = &rx_adapter->eth_devices[port];
1233 			if (rxa_shared_intr(dev_info, queue))
1234 				dev_info->shared_intr_enabled = 1;
1235 			else {
1236 				queue_info = &dev_info->rx_queue[queue];
1237 				queue_info->intr_enabled = 1;
1238 			}
1239 			rte_eth_dev_rx_intr_enable(port, queue);
1240 			rte_spinlock_unlock(ring_lock);
1241 		} else {
1242 			port = qd.port;
1243 			queue = qd.queue;
1244 
1245 			dev_info = &rx_adapter->eth_devices[port];
1246 		}
1247 
1248 		if (rxa_shared_intr(dev_info, queue)) {
1249 			uint16_t i;
1250 			uint16_t nb_queues;
1251 
1252 			nb_queues = dev_info->dev->data->nb_rx_queues;
1253 			n = 0;
1254 			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1255 				uint8_t enq_buffer_full;
1256 
1257 				if (!rxa_intr_queue(dev_info, i))
1258 					continue;
1259 				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1260 					rx_adapter->max_nb_rx,
1261 					&rxq_empty, buf, stats);
1262 				nb_rx += n;
1263 
1264 				enq_buffer_full = !rxq_empty && n == 0;
1265 				max_done = nb_rx > rx_adapter->max_nb_rx;
1266 
1267 				if (enq_buffer_full || max_done) {
1268 					dev_info->next_q_idx = i;
1269 					goto done;
1270 				}
1271 			}
1272 
1273 			rx_adapter->qd_valid = 0;
1274 
1275 			/* Reinitialize for next interrupt */
1276 			dev_info->next_q_idx = dev_info->multi_intr_cap ?
1277 						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1278 						0;
1279 		} else {
1280 			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1281 				rx_adapter->max_nb_rx,
1282 				&rxq_empty, buf, stats);
1283 			rx_adapter->qd_valid = !rxq_empty;
1284 			nb_rx += n;
1285 			if (nb_rx > rx_adapter->max_nb_rx)
1286 				break;
1287 		}
1288 	}
1289 
1290 done:
1291 	rx_adapter->stats.rx_intr_packets += nb_rx;
1292 }
1293 
1294 /*
1295  * Polls receive queues added to the event adapter and enqueues received
1296  * packets to the event device.
1297  *
1298  * The receive code enqueues initially to a temporary buffer, the
1299  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
1300  *
1301  * If there isn't space available in the temporary buffer, packets from the
1302  * Rx queue aren't dequeued from the eth device, this back pressures the
1303  * eth device, in virtual device environments this back pressure is relayed to
1304  * the hypervisor's switching layer where adjustments can be made to deal with
1305  * it.
1306  */
1307 static inline void
1308 rxa_poll(struct event_eth_rx_adapter *rx_adapter)
1309 {
1310 	uint32_t num_queue;
1311 	uint32_t nb_rx = 0;
1312 	struct eth_event_enqueue_buffer *buf = NULL;
1313 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1314 	uint32_t wrr_pos;
1315 	uint32_t max_nb_rx;
1316 
1317 	wrr_pos = rx_adapter->wrr_pos;
1318 	max_nb_rx = rx_adapter->max_nb_rx;
1319 
1320 	/* Iterate through a WRR sequence */
1321 	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1322 		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1323 		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1324 		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1325 
1326 		buf = rxa_event_buf_get(rx_adapter, d, qid, &stats);
1327 
1328 		/* Don't do a batch dequeue from the rx queue if there isn't
1329 		 * enough space in the enqueue buffer.
1330 		 */
1331 		if (buf->count >= BATCH_SIZE)
1332 			rxa_flush_event_buffer(rx_adapter, buf, stats);
1333 		if (!rxa_pkt_buf_available(buf)) {
1334 			if (rx_adapter->use_queue_event_buf)
1335 				goto poll_next_entry;
1336 			else {
1337 				rx_adapter->wrr_pos = wrr_pos;
1338 				return;
1339 			}
1340 		}
1341 
1342 		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1343 				NULL, buf, stats);
1344 		if (nb_rx > max_nb_rx) {
1345 			rx_adapter->wrr_pos =
1346 				    (wrr_pos + 1) % rx_adapter->wrr_len;
1347 			break;
1348 		}
1349 
1350 poll_next_entry:
1351 		if (++wrr_pos == rx_adapter->wrr_len)
1352 			wrr_pos = 0;
1353 	}
1354 }
1355 
1356 static void
1357 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1358 {
1359 	struct event_eth_rx_adapter *rx_adapter = arg;
1360 	struct eth_event_enqueue_buffer *buf = NULL;
1361 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1362 	struct rte_event *ev;
1363 
1364 	buf = rxa_event_buf_get(rx_adapter, vec->port, vec->queue, &stats);
1365 
1366 	if (buf->count)
1367 		rxa_flush_event_buffer(rx_adapter, buf, stats);
1368 
1369 	if (vec->vector_ev->nb_elem == 0)
1370 		return;
1371 	ev = &buf->events[buf->count];
1372 
1373 	/* Event ready. */
1374 	ev->event = vec->event;
1375 	ev->vec = vec->vector_ev;
1376 	buf->count++;
1377 
1378 	vec->vector_ev = NULL;
1379 	vec->ts = 0;
1380 }
1381 
1382 static int
1383 rxa_service_func(void *args)
1384 {
1385 	struct event_eth_rx_adapter *rx_adapter = args;
1386 
1387 	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1388 		return 0;
1389 	if (!rx_adapter->rxa_started) {
1390 		rte_spinlock_unlock(&rx_adapter->rx_lock);
1391 		return 0;
1392 	}
1393 
1394 	if (rx_adapter->ena_vector) {
1395 		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1396 		    rx_adapter->vector_tmo_ticks) {
1397 			struct eth_rx_vector_data *vec;
1398 
1399 			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1400 				uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1401 
1402 				if (elapsed_time >= vec->vector_timeout_ticks) {
1403 					rxa_vector_expire(vec, rx_adapter);
1404 					TAILQ_REMOVE(&rx_adapter->vector_list,
1405 						     vec, next);
1406 				}
1407 			}
1408 			rx_adapter->prev_expiry_ts = rte_rdtsc();
1409 		}
1410 	}
1411 
1412 	rxa_intr_ring_dequeue(rx_adapter);
1413 	rxa_poll(rx_adapter);
1414 
1415 	rte_spinlock_unlock(&rx_adapter->rx_lock);
1416 
1417 	return 0;
1418 }
1419 
1420 static int
1421 rte_event_eth_rx_adapter_init(void)
1422 {
1423 	const char *name = RXA_ADAPTER_ARRAY;
1424 	const struct rte_memzone *mz;
1425 	unsigned int sz;
1426 
1427 	sz = sizeof(*event_eth_rx_adapter) *
1428 	    RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1429 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1430 
1431 	mz = rte_memzone_lookup(name);
1432 	if (mz == NULL) {
1433 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1434 						 RTE_CACHE_LINE_SIZE);
1435 		if (mz == NULL) {
1436 			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1437 					PRId32, rte_errno);
1438 			return -rte_errno;
1439 		}
1440 	}
1441 
1442 	event_eth_rx_adapter = mz->addr;
1443 	return 0;
1444 }
1445 
1446 static int
1447 rxa_memzone_lookup(void)
1448 {
1449 	const struct rte_memzone *mz;
1450 
1451 	if (event_eth_rx_adapter == NULL) {
1452 		mz = rte_memzone_lookup(RXA_ADAPTER_ARRAY);
1453 		if (mz == NULL)
1454 			return -ENOMEM;
1455 		event_eth_rx_adapter = mz->addr;
1456 	}
1457 
1458 	return 0;
1459 }
1460 
1461 static inline struct event_eth_rx_adapter *
1462 rxa_id_to_adapter(uint8_t id)
1463 {
1464 	return event_eth_rx_adapter ?
1465 		event_eth_rx_adapter[id] : NULL;
1466 }
1467 
1468 static int
1469 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1470 		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1471 {
1472 	int ret;
1473 	struct rte_eventdev *dev;
1474 	struct rte_event_dev_config dev_conf;
1475 	int started;
1476 	uint8_t port_id;
1477 	struct rte_event_port_conf *port_conf = arg;
1478 	struct event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1479 
1480 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
1481 	dev_conf = dev->data->dev_conf;
1482 
1483 	started = dev->data->dev_started;
1484 	if (started)
1485 		rte_event_dev_stop(dev_id);
1486 	port_id = dev_conf.nb_event_ports;
1487 	dev_conf.nb_event_ports += 1;
1488 	ret = rte_event_dev_configure(dev_id, &dev_conf);
1489 	if (ret) {
1490 		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1491 						dev_id);
1492 		if (started) {
1493 			if (rte_event_dev_start(dev_id))
1494 				return -EIO;
1495 		}
1496 		return ret;
1497 	}
1498 
1499 	ret = rte_event_port_setup(dev_id, port_id, port_conf);
1500 	if (ret) {
1501 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1502 					port_id);
1503 		return ret;
1504 	}
1505 
1506 	conf->event_port_id = port_id;
1507 	conf->max_nb_rx = 128;
1508 	if (started)
1509 		ret = rte_event_dev_start(dev_id);
1510 	rx_adapter->default_cb_arg = 1;
1511 	return ret;
1512 }
1513 
1514 static int
1515 rxa_epoll_create1(void)
1516 {
1517 #if defined(LINUX)
1518 	int fd;
1519 	fd = epoll_create1(EPOLL_CLOEXEC);
1520 	return fd < 0 ? -errno : fd;
1521 #elif defined(BSD)
1522 	return -ENOTSUP;
1523 #endif
1524 }
1525 
1526 static int
1527 rxa_init_epd(struct event_eth_rx_adapter *rx_adapter)
1528 {
1529 	if (rx_adapter->epd != INIT_FD)
1530 		return 0;
1531 
1532 	rx_adapter->epd = rxa_epoll_create1();
1533 	if (rx_adapter->epd < 0) {
1534 		int err = rx_adapter->epd;
1535 		rx_adapter->epd = INIT_FD;
1536 		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1537 		return err;
1538 	}
1539 
1540 	return 0;
1541 }
1542 
1543 static int
1544 rxa_create_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1545 {
1546 	int err;
1547 	char thread_name[RTE_MAX_THREAD_NAME_LEN];
1548 
1549 	if (rx_adapter->intr_ring)
1550 		return 0;
1551 
1552 	rx_adapter->intr_ring = rte_ring_create("intr_ring",
1553 					RTE_EVENT_ETH_INTR_RING_SIZE,
1554 					rte_socket_id(), 0);
1555 	if (!rx_adapter->intr_ring)
1556 		return -ENOMEM;
1557 
1558 	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1559 					RTE_EVENT_ETH_INTR_RING_SIZE *
1560 					sizeof(struct rte_epoll_event),
1561 					RTE_CACHE_LINE_SIZE,
1562 					rx_adapter->socket_id);
1563 	if (!rx_adapter->epoll_events) {
1564 		err = -ENOMEM;
1565 		goto error;
1566 	}
1567 
1568 	rte_spinlock_init(&rx_adapter->intr_ring_lock);
1569 
1570 	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1571 			"rx-intr-thread-%d", rx_adapter->id);
1572 
1573 	err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1574 				NULL, rxa_intr_thread, rx_adapter);
1575 	if (!err)
1576 		return 0;
1577 
1578 	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1579 	rte_free(rx_adapter->epoll_events);
1580 error:
1581 	rte_ring_free(rx_adapter->intr_ring);
1582 	rx_adapter->intr_ring = NULL;
1583 	rx_adapter->epoll_events = NULL;
1584 	return err;
1585 }
1586 
1587 static int
1588 rxa_destroy_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1589 {
1590 	int err;
1591 
1592 	err = pthread_cancel(rx_adapter->rx_intr_thread);
1593 	if (err)
1594 		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1595 				err);
1596 
1597 	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1598 	if (err)
1599 		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1600 
1601 	rte_free(rx_adapter->epoll_events);
1602 	rte_ring_free(rx_adapter->intr_ring);
1603 	rx_adapter->intr_ring = NULL;
1604 	rx_adapter->epoll_events = NULL;
1605 	return 0;
1606 }
1607 
1608 static int
1609 rxa_free_intr_resources(struct event_eth_rx_adapter *rx_adapter)
1610 {
1611 	int ret;
1612 
1613 	if (rx_adapter->num_rx_intr == 0)
1614 		return 0;
1615 
1616 	ret = rxa_destroy_intr_thread(rx_adapter);
1617 	if (ret)
1618 		return ret;
1619 
1620 	close(rx_adapter->epd);
1621 	rx_adapter->epd = INIT_FD;
1622 
1623 	return ret;
1624 }
1625 
1626 static int
1627 rxa_disable_intr(struct event_eth_rx_adapter *rx_adapter,
1628 		 struct eth_device_info *dev_info, uint16_t rx_queue_id)
1629 {
1630 	int err;
1631 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1632 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1633 
1634 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1635 	if (err) {
1636 		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1637 			rx_queue_id);
1638 		return err;
1639 	}
1640 
1641 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1642 					rx_adapter->epd,
1643 					RTE_INTR_EVENT_DEL,
1644 					0);
1645 	if (err)
1646 		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1647 
1648 	if (sintr)
1649 		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1650 	else
1651 		dev_info->shared_intr_enabled = 0;
1652 	return err;
1653 }
1654 
1655 static int
1656 rxa_del_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1657 		   struct eth_device_info *dev_info, int rx_queue_id)
1658 {
1659 	int err;
1660 	int i;
1661 	int s;
1662 
1663 	if (dev_info->nb_rx_intr == 0)
1664 		return 0;
1665 
1666 	err = 0;
1667 	if (rx_queue_id == -1) {
1668 		s = dev_info->nb_shared_intr;
1669 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1670 			int sintr;
1671 			uint16_t q;
1672 
1673 			q = dev_info->intr_queue[i];
1674 			sintr = rxa_shared_intr(dev_info, q);
1675 			s -= sintr;
1676 
1677 			if (!sintr || s == 0) {
1678 
1679 				err = rxa_disable_intr(rx_adapter, dev_info,
1680 						q);
1681 				if (err)
1682 					return err;
1683 				rxa_intr_ring_del_entries(rx_adapter, dev_info,
1684 							q);
1685 			}
1686 		}
1687 	} else {
1688 		if (!rxa_intr_queue(dev_info, rx_queue_id))
1689 			return 0;
1690 		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1691 				dev_info->nb_shared_intr == 1) {
1692 			err = rxa_disable_intr(rx_adapter, dev_info,
1693 					rx_queue_id);
1694 			if (err)
1695 				return err;
1696 			rxa_intr_ring_del_entries(rx_adapter, dev_info,
1697 						rx_queue_id);
1698 		}
1699 
1700 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1701 			if (dev_info->intr_queue[i] == rx_queue_id) {
1702 				for (; i < dev_info->nb_rx_intr - 1; i++)
1703 					dev_info->intr_queue[i] =
1704 						dev_info->intr_queue[i + 1];
1705 				break;
1706 			}
1707 		}
1708 	}
1709 
1710 	return err;
1711 }
1712 
1713 static int
1714 rxa_config_intr(struct event_eth_rx_adapter *rx_adapter,
1715 		struct eth_device_info *dev_info, uint16_t rx_queue_id)
1716 {
1717 	int err, err1;
1718 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1719 	union queue_data qd;
1720 	int init_fd;
1721 	uint16_t *intr_queue;
1722 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1723 
1724 	if (rxa_intr_queue(dev_info, rx_queue_id))
1725 		return 0;
1726 
1727 	intr_queue = dev_info->intr_queue;
1728 	if (dev_info->intr_queue == NULL) {
1729 		size_t len =
1730 			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1731 		dev_info->intr_queue =
1732 			rte_zmalloc_socket(
1733 				rx_adapter->mem_name,
1734 				len,
1735 				0,
1736 				rx_adapter->socket_id);
1737 		if (dev_info->intr_queue == NULL)
1738 			return -ENOMEM;
1739 	}
1740 
1741 	init_fd = rx_adapter->epd;
1742 	err = rxa_init_epd(rx_adapter);
1743 	if (err)
1744 		goto err_free_queue;
1745 
1746 	qd.port = eth_dev_id;
1747 	qd.queue = rx_queue_id;
1748 
1749 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1750 					rx_adapter->epd,
1751 					RTE_INTR_EVENT_ADD,
1752 					qd.ptr);
1753 	if (err) {
1754 		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1755 			" Rx Queue %u err %d", rx_queue_id, err);
1756 		goto err_del_fd;
1757 	}
1758 
1759 	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1760 	if (err) {
1761 		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1762 				" Rx Queue %u err %d", rx_queue_id, err);
1763 
1764 		goto err_del_event;
1765 	}
1766 
1767 	err = rxa_create_intr_thread(rx_adapter);
1768 	if (!err)  {
1769 		if (sintr)
1770 			dev_info->shared_intr_enabled = 1;
1771 		else
1772 			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1773 		return 0;
1774 	}
1775 
1776 
1777 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1778 	if (err)
1779 		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1780 				" Rx Queue %u err %d", rx_queue_id, err);
1781 err_del_event:
1782 	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1783 					rx_adapter->epd,
1784 					RTE_INTR_EVENT_DEL,
1785 					0);
1786 	if (err1) {
1787 		RTE_EDEV_LOG_ERR("Could not delete event for"
1788 				" Rx Queue %u err %d", rx_queue_id, err1);
1789 	}
1790 err_del_fd:
1791 	if (init_fd == INIT_FD) {
1792 		close(rx_adapter->epd);
1793 		rx_adapter->epd = -1;
1794 	}
1795 err_free_queue:
1796 	if (intr_queue == NULL)
1797 		rte_free(dev_info->intr_queue);
1798 
1799 	return err;
1800 }
1801 
1802 static int
1803 rxa_add_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1804 		   struct eth_device_info *dev_info, int rx_queue_id)
1805 
1806 {
1807 	int i, j, err;
1808 	int si = -1;
1809 	int shared_done = (dev_info->nb_shared_intr > 0);
1810 
1811 	if (rx_queue_id != -1) {
1812 		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1813 			return 0;
1814 		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1815 	}
1816 
1817 	err = 0;
1818 	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1819 
1820 		if (rxa_shared_intr(dev_info, i) && shared_done)
1821 			continue;
1822 
1823 		err = rxa_config_intr(rx_adapter, dev_info, i);
1824 
1825 		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1826 		if (shared_done) {
1827 			si = i;
1828 			dev_info->shared_intr_enabled = 1;
1829 		}
1830 		if (err)
1831 			break;
1832 	}
1833 
1834 	if (err == 0)
1835 		return 0;
1836 
1837 	shared_done = (dev_info->nb_shared_intr > 0);
1838 	for (j = 0; j < i; j++) {
1839 		if (rxa_intr_queue(dev_info, j))
1840 			continue;
1841 		if (rxa_shared_intr(dev_info, j) && si != j)
1842 			continue;
1843 		err = rxa_disable_intr(rx_adapter, dev_info, j);
1844 		if (err)
1845 			break;
1846 
1847 	}
1848 
1849 	return err;
1850 }
1851 
1852 static int
1853 rxa_init_service(struct event_eth_rx_adapter *rx_adapter, uint8_t id)
1854 {
1855 	int ret;
1856 	struct rte_service_spec service;
1857 	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1858 
1859 	if (rx_adapter->service_inited)
1860 		return 0;
1861 
1862 	memset(&service, 0, sizeof(service));
1863 	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1864 		"rte_event_eth_rx_adapter_%d", id);
1865 	service.socket_id = rx_adapter->socket_id;
1866 	service.callback = rxa_service_func;
1867 	service.callback_userdata = rx_adapter;
1868 	/* Service function handles locking for queue add/del updates */
1869 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1870 	ret = rte_service_component_register(&service, &rx_adapter->service_id);
1871 	if (ret) {
1872 		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1873 			service.name, ret);
1874 		return ret;
1875 	}
1876 
1877 	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1878 		&rx_adapter_conf, rx_adapter->conf_arg);
1879 	if (ret) {
1880 		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1881 			ret);
1882 		goto err_done;
1883 	}
1884 	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1885 	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1886 	rx_adapter->service_inited = 1;
1887 	rx_adapter->epd = INIT_FD;
1888 	return 0;
1889 
1890 err_done:
1891 	rte_service_component_unregister(rx_adapter->service_id);
1892 	return ret;
1893 }
1894 
1895 static void
1896 rxa_update_queue(struct event_eth_rx_adapter *rx_adapter,
1897 		 struct eth_device_info *dev_info, int32_t rx_queue_id,
1898 		 uint8_t add)
1899 {
1900 	struct eth_rx_queue_info *queue_info;
1901 	int enabled;
1902 	uint16_t i;
1903 
1904 	if (dev_info->rx_queue == NULL)
1905 		return;
1906 
1907 	if (rx_queue_id == -1) {
1908 		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1909 			rxa_update_queue(rx_adapter, dev_info, i, add);
1910 	} else {
1911 		queue_info = &dev_info->rx_queue[rx_queue_id];
1912 		enabled = queue_info->queue_enabled;
1913 		if (add) {
1914 			rx_adapter->nb_queues += !enabled;
1915 			dev_info->nb_dev_queues += !enabled;
1916 		} else {
1917 			rx_adapter->nb_queues -= enabled;
1918 			dev_info->nb_dev_queues -= enabled;
1919 		}
1920 		queue_info->queue_enabled = !!add;
1921 	}
1922 }
1923 
1924 static void
1925 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1926 		    uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1927 		    uint16_t port_id)
1928 {
1929 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1930 	struct eth_rx_vector_data *vector_data;
1931 	uint32_t flow_id;
1932 
1933 	vector_data = &queue_info->vector_data;
1934 	vector_data->max_vector_count = vector_count;
1935 	vector_data->port = port_id;
1936 	vector_data->queue = qid;
1937 	vector_data->vector_pool = mp;
1938 	vector_data->vector_timeout_ticks =
1939 		NSEC2TICK(vector_ns, rte_get_timer_hz());
1940 	vector_data->ts = 0;
1941 	flow_id = queue_info->event & 0xFFFFF;
1942 	flow_id =
1943 		flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1944 	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1945 }
1946 
1947 static void
1948 rxa_sw_del(struct event_eth_rx_adapter *rx_adapter,
1949 	   struct eth_device_info *dev_info, int32_t rx_queue_id)
1950 {
1951 	struct eth_rx_vector_data *vec;
1952 	int pollq;
1953 	int intrq;
1954 	int sintrq;
1955 
1956 
1957 	if (rx_adapter->nb_queues == 0)
1958 		return;
1959 
1960 	if (rx_queue_id == -1) {
1961 		uint16_t nb_rx_queues;
1962 		uint16_t i;
1963 
1964 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1965 		for (i = 0; i <	nb_rx_queues; i++)
1966 			rxa_sw_del(rx_adapter, dev_info, i);
1967 		return;
1968 	}
1969 
1970 	/* Push all the partial event vectors to event device. */
1971 	TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1972 		if (vec->queue != rx_queue_id)
1973 			continue;
1974 		rxa_vector_expire(vec, rx_adapter);
1975 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
1976 	}
1977 
1978 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
1979 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
1980 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1981 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1982 	rx_adapter->num_rx_polled -= pollq;
1983 	dev_info->nb_rx_poll -= pollq;
1984 	rx_adapter->num_rx_intr -= intrq;
1985 	dev_info->nb_rx_intr -= intrq;
1986 	dev_info->nb_shared_intr -= intrq && sintrq;
1987 	if (rx_adapter->use_queue_event_buf) {
1988 		struct eth_event_enqueue_buffer *event_buf =
1989 			dev_info->rx_queue[rx_queue_id].event_buf;
1990 		struct rte_event_eth_rx_adapter_stats *stats =
1991 			dev_info->rx_queue[rx_queue_id].stats;
1992 		rte_free(event_buf->events);
1993 		rte_free(event_buf);
1994 		rte_free(stats);
1995 		dev_info->rx_queue[rx_queue_id].event_buf = NULL;
1996 		dev_info->rx_queue[rx_queue_id].stats = NULL;
1997 	}
1998 }
1999 
2000 static int
2001 rxa_add_queue(struct event_eth_rx_adapter *rx_adapter,
2002 	      struct eth_device_info *dev_info, int32_t rx_queue_id,
2003 	      const struct rte_event_eth_rx_adapter_queue_conf *conf)
2004 {
2005 	struct eth_rx_queue_info *queue_info;
2006 	const struct rte_event *ev = &conf->ev;
2007 	int pollq;
2008 	int intrq;
2009 	int sintrq;
2010 	struct rte_event *qi_ev;
2011 	struct eth_event_enqueue_buffer *new_rx_buf = NULL;
2012 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
2013 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
2014 	int ret;
2015 
2016 	if (rx_queue_id == -1) {
2017 		uint16_t nb_rx_queues;
2018 		uint16_t i;
2019 
2020 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2021 		for (i = 0; i <	nb_rx_queues; i++) {
2022 			ret = rxa_add_queue(rx_adapter, dev_info, i, conf);
2023 			if (ret)
2024 				return ret;
2025 		}
2026 		return 0;
2027 	}
2028 
2029 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
2030 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
2031 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
2032 
2033 	queue_info = &dev_info->rx_queue[rx_queue_id];
2034 	queue_info->wt = conf->servicing_weight;
2035 
2036 	qi_ev = (struct rte_event *)&queue_info->event;
2037 	qi_ev->event = ev->event;
2038 	qi_ev->op = RTE_EVENT_OP_NEW;
2039 	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
2040 	qi_ev->sub_event_type = 0;
2041 
2042 	if (conf->rx_queue_flags &
2043 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
2044 		queue_info->flow_id_mask = ~0;
2045 	} else
2046 		qi_ev->flow_id = 0;
2047 
2048 	if (conf->rx_queue_flags &
2049 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2050 		queue_info->ena_vector = 1;
2051 		qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
2052 		rxa_set_vector_data(queue_info, conf->vector_sz,
2053 				    conf->vector_timeout_ns, conf->vector_mp,
2054 				    rx_queue_id, dev_info->dev->data->port_id);
2055 		rx_adapter->ena_vector = 1;
2056 		rx_adapter->vector_tmo_ticks =
2057 			rx_adapter->vector_tmo_ticks ?
2058 				      RTE_MIN(queue_info->vector_data
2059 							.vector_timeout_ticks >>
2060 						1,
2061 					rx_adapter->vector_tmo_ticks) :
2062 				queue_info->vector_data.vector_timeout_ticks >>
2063 					1;
2064 	}
2065 
2066 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
2067 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
2068 		rx_adapter->num_rx_polled += !pollq;
2069 		dev_info->nb_rx_poll += !pollq;
2070 		rx_adapter->num_rx_intr -= intrq;
2071 		dev_info->nb_rx_intr -= intrq;
2072 		dev_info->nb_shared_intr -= intrq && sintrq;
2073 	}
2074 
2075 	if (rxa_intr_queue(dev_info, rx_queue_id)) {
2076 		rx_adapter->num_rx_polled -= pollq;
2077 		dev_info->nb_rx_poll -= pollq;
2078 		rx_adapter->num_rx_intr += !intrq;
2079 		dev_info->nb_rx_intr += !intrq;
2080 		dev_info->nb_shared_intr += !intrq && sintrq;
2081 		if (dev_info->nb_shared_intr == 1) {
2082 			if (dev_info->multi_intr_cap)
2083 				dev_info->next_q_idx =
2084 					RTE_MAX_RXTX_INTR_VEC_ID - 1;
2085 			else
2086 				dev_info->next_q_idx = 0;
2087 		}
2088 	}
2089 
2090 	if (!rx_adapter->use_queue_event_buf)
2091 		return 0;
2092 
2093 	new_rx_buf = rte_zmalloc_socket("rx_buffer_meta",
2094 				sizeof(*new_rx_buf), 0,
2095 				rte_eth_dev_socket_id(eth_dev_id));
2096 	if (new_rx_buf == NULL) {
2097 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer meta for "
2098 				 "dev_id: %d queue_id: %d",
2099 				 eth_dev_id, rx_queue_id);
2100 		return -ENOMEM;
2101 	}
2102 
2103 	new_rx_buf->events_size = RTE_ALIGN(conf->event_buf_size, BATCH_SIZE);
2104 	new_rx_buf->events_size += (2 * BATCH_SIZE);
2105 	new_rx_buf->events = rte_zmalloc_socket("rx_buffer",
2106 				sizeof(struct rte_event) *
2107 				new_rx_buf->events_size, 0,
2108 				rte_eth_dev_socket_id(eth_dev_id));
2109 	if (new_rx_buf->events == NULL) {
2110 		rte_free(new_rx_buf);
2111 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer for "
2112 				 "dev_id: %d queue_id: %d",
2113 				 eth_dev_id, rx_queue_id);
2114 		return -ENOMEM;
2115 	}
2116 
2117 	queue_info->event_buf = new_rx_buf;
2118 
2119 	/* Allocate storage for adapter queue stats */
2120 	stats = rte_zmalloc_socket("rx_queue_stats",
2121 				sizeof(*stats), 0,
2122 				rte_eth_dev_socket_id(eth_dev_id));
2123 	if (stats == NULL) {
2124 		rte_free(new_rx_buf->events);
2125 		rte_free(new_rx_buf);
2126 		RTE_EDEV_LOG_ERR("Failed to allocate stats storage for"
2127 				 " dev_id: %d queue_id: %d",
2128 				 eth_dev_id, rx_queue_id);
2129 		return -ENOMEM;
2130 	}
2131 
2132 	queue_info->stats = stats;
2133 
2134 	return 0;
2135 }
2136 
2137 static int
2138 rxa_sw_add(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
2139 	   int rx_queue_id,
2140 	   const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2141 {
2142 	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
2143 	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
2144 	int ret;
2145 	struct eth_rx_poll_entry *rx_poll;
2146 	struct eth_rx_queue_info *rx_queue;
2147 	uint32_t *rx_wrr;
2148 	uint16_t nb_rx_queues;
2149 	uint32_t nb_rx_poll, nb_wrr;
2150 	uint32_t nb_rx_intr;
2151 	int num_intr_vec;
2152 	uint16_t wt;
2153 
2154 	if (queue_conf->servicing_weight == 0) {
2155 		struct rte_eth_dev_data *data = dev_info->dev->data;
2156 
2157 		temp_conf = *queue_conf;
2158 		if (!data->dev_conf.intr_conf.rxq) {
2159 			/* If Rx interrupts are disabled set wt = 1 */
2160 			temp_conf.servicing_weight = 1;
2161 		}
2162 		queue_conf = &temp_conf;
2163 
2164 		if (queue_conf->servicing_weight == 0 &&
2165 		    rx_adapter->use_queue_event_buf) {
2166 
2167 			RTE_EDEV_LOG_ERR("Use of queue level event buffer "
2168 					 "not supported for interrupt queues "
2169 					 "dev_id: %d queue_id: %d",
2170 					 eth_dev_id, rx_queue_id);
2171 			return -EINVAL;
2172 		}
2173 	}
2174 
2175 	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2176 	rx_queue = dev_info->rx_queue;
2177 	wt = queue_conf->servicing_weight;
2178 
2179 	if (dev_info->rx_queue == NULL) {
2180 		dev_info->rx_queue =
2181 		    rte_zmalloc_socket(rx_adapter->mem_name,
2182 				       nb_rx_queues *
2183 				       sizeof(struct eth_rx_queue_info), 0,
2184 				       rx_adapter->socket_id);
2185 		if (dev_info->rx_queue == NULL)
2186 			return -ENOMEM;
2187 	}
2188 	rx_wrr = NULL;
2189 	rx_poll = NULL;
2190 
2191 	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2192 			queue_conf->servicing_weight,
2193 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2194 
2195 	if (dev_info->dev->intr_handle)
2196 		dev_info->multi_intr_cap =
2197 			rte_intr_cap_multiple(dev_info->dev->intr_handle);
2198 
2199 	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2200 				&rx_poll, &rx_wrr);
2201 	if (ret)
2202 		goto err_free_rxqueue;
2203 
2204 	if (wt == 0) {
2205 		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2206 
2207 		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2208 		if (ret)
2209 			goto err_free_rxqueue;
2210 
2211 		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2212 		if (ret)
2213 			goto err_free_rxqueue;
2214 	} else {
2215 
2216 		num_intr_vec = 0;
2217 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2218 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2219 						rx_queue_id, 0);
2220 			/* interrupt based queues are being converted to
2221 			 * poll mode queues, delete the interrupt configuration
2222 			 * for those.
2223 			 */
2224 			ret = rxa_del_intr_queue(rx_adapter,
2225 						dev_info, rx_queue_id);
2226 			if (ret)
2227 				goto err_free_rxqueue;
2228 		}
2229 	}
2230 
2231 	if (nb_rx_intr == 0) {
2232 		ret = rxa_free_intr_resources(rx_adapter);
2233 		if (ret)
2234 			goto err_free_rxqueue;
2235 	}
2236 
2237 	if (wt == 0) {
2238 		uint16_t i;
2239 
2240 		if (rx_queue_id  == -1) {
2241 			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2242 				dev_info->intr_queue[i] = i;
2243 		} else {
2244 			if (!rxa_intr_queue(dev_info, rx_queue_id))
2245 				dev_info->intr_queue[nb_rx_intr - 1] =
2246 					rx_queue_id;
2247 		}
2248 	}
2249 
2250 
2251 
2252 	ret = rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2253 	if (ret)
2254 		goto err_free_rxqueue;
2255 	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2256 
2257 	rte_free(rx_adapter->eth_rx_poll);
2258 	rte_free(rx_adapter->wrr_sched);
2259 
2260 	rx_adapter->eth_rx_poll = rx_poll;
2261 	rx_adapter->wrr_sched = rx_wrr;
2262 	rx_adapter->wrr_len = nb_wrr;
2263 	rx_adapter->num_intr_vec += num_intr_vec;
2264 	return 0;
2265 
2266 err_free_rxqueue:
2267 	if (rx_queue == NULL) {
2268 		rte_free(dev_info->rx_queue);
2269 		dev_info->rx_queue = NULL;
2270 	}
2271 
2272 	rte_free(rx_poll);
2273 	rte_free(rx_wrr);
2274 
2275 	return ret;
2276 }
2277 
2278 static int
2279 rxa_ctrl(uint8_t id, int start)
2280 {
2281 	struct event_eth_rx_adapter *rx_adapter;
2282 	struct rte_eventdev *dev;
2283 	struct eth_device_info *dev_info;
2284 	uint32_t i;
2285 	int use_service = 0;
2286 	int stop = !start;
2287 
2288 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2289 	rx_adapter = rxa_id_to_adapter(id);
2290 	if (rx_adapter == NULL)
2291 		return -EINVAL;
2292 
2293 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2294 
2295 	RTE_ETH_FOREACH_DEV(i) {
2296 		dev_info = &rx_adapter->eth_devices[i];
2297 		/* if start  check for num dev queues */
2298 		if (start && !dev_info->nb_dev_queues)
2299 			continue;
2300 		/* if stop check if dev has been started */
2301 		if (stop && !dev_info->dev_rx_started)
2302 			continue;
2303 		use_service |= !dev_info->internal_event_port;
2304 		dev_info->dev_rx_started = start;
2305 		if (dev_info->internal_event_port == 0)
2306 			continue;
2307 		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2308 						&rte_eth_devices[i]) :
2309 			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
2310 						&rte_eth_devices[i]);
2311 	}
2312 
2313 	if (use_service) {
2314 		rte_spinlock_lock(&rx_adapter->rx_lock);
2315 		rx_adapter->rxa_started = start;
2316 		rte_service_runstate_set(rx_adapter->service_id, start);
2317 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2318 	}
2319 
2320 	return 0;
2321 }
2322 
2323 static int
2324 rxa_create(uint8_t id, uint8_t dev_id,
2325 	   struct rte_event_eth_rx_adapter_params *rxa_params,
2326 	   rte_event_eth_rx_adapter_conf_cb conf_cb,
2327 	   void *conf_arg)
2328 {
2329 	struct event_eth_rx_adapter *rx_adapter;
2330 	struct eth_event_enqueue_buffer *buf;
2331 	struct rte_event *events;
2332 	int ret;
2333 	int socket_id;
2334 	uint16_t i;
2335 	char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
2336 	const uint8_t default_rss_key[] = {
2337 		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2338 		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2339 		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2340 		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2341 		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2342 	};
2343 
2344 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2345 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2346 
2347 	if (conf_cb == NULL)
2348 		return -EINVAL;
2349 
2350 	if (event_eth_rx_adapter == NULL) {
2351 		ret = rte_event_eth_rx_adapter_init();
2352 		if (ret)
2353 			return ret;
2354 	}
2355 
2356 	rx_adapter = rxa_id_to_adapter(id);
2357 	if (rx_adapter != NULL) {
2358 		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2359 		return -EEXIST;
2360 	}
2361 
2362 	socket_id = rte_event_dev_socket_id(dev_id);
2363 	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2364 		"rte_event_eth_rx_adapter_%d",
2365 		id);
2366 
2367 	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2368 			RTE_CACHE_LINE_SIZE, socket_id);
2369 	if (rx_adapter == NULL) {
2370 		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2371 		return -ENOMEM;
2372 	}
2373 
2374 	rx_adapter->eventdev_id = dev_id;
2375 	rx_adapter->socket_id = socket_id;
2376 	rx_adapter->conf_cb = conf_cb;
2377 	rx_adapter->conf_arg = conf_arg;
2378 	rx_adapter->id = id;
2379 	TAILQ_INIT(&rx_adapter->vector_list);
2380 	strcpy(rx_adapter->mem_name, mem_name);
2381 	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2382 					RTE_MAX_ETHPORTS *
2383 					sizeof(struct eth_device_info), 0,
2384 					socket_id);
2385 	rte_convert_rss_key((const uint32_t *)default_rss_key,
2386 			(uint32_t *)rx_adapter->rss_key_be,
2387 			    RTE_DIM(default_rss_key));
2388 
2389 	if (rx_adapter->eth_devices == NULL) {
2390 		RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
2391 		rte_free(rx_adapter);
2392 		return -ENOMEM;
2393 	}
2394 
2395 	rte_spinlock_init(&rx_adapter->rx_lock);
2396 
2397 	for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2398 		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2399 
2400 	/* Rx adapter event buffer allocation */
2401 	rx_adapter->use_queue_event_buf = rxa_params->use_queue_event_buf;
2402 
2403 	if (!rx_adapter->use_queue_event_buf) {
2404 		buf = &rx_adapter->event_enqueue_buffer;
2405 		buf->events_size = rxa_params->event_buf_size;
2406 
2407 		events = rte_zmalloc_socket(rx_adapter->mem_name,
2408 					    buf->events_size * sizeof(*events),
2409 					    0, socket_id);
2410 		if (events == NULL) {
2411 			RTE_EDEV_LOG_ERR("Failed to allocate memory "
2412 					 "for adapter event buffer");
2413 			rte_free(rx_adapter->eth_devices);
2414 			rte_free(rx_adapter);
2415 			return -ENOMEM;
2416 		}
2417 
2418 		rx_adapter->event_enqueue_buffer.events = events;
2419 	}
2420 
2421 	event_eth_rx_adapter[id] = rx_adapter;
2422 
2423 	if (conf_cb == rxa_default_conf_cb)
2424 		rx_adapter->default_cb_arg = 1;
2425 
2426 	if (rte_mbuf_dyn_rx_timestamp_register(
2427 			&event_eth_rx_timestamp_dynfield_offset,
2428 			&event_eth_rx_timestamp_dynflag) != 0) {
2429 		RTE_EDEV_LOG_ERR("Error registering timestamp field in mbuf\n");
2430 		return -rte_errno;
2431 	}
2432 
2433 	rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2434 		conf_arg);
2435 	return 0;
2436 }
2437 
2438 int
2439 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2440 				rte_event_eth_rx_adapter_conf_cb conf_cb,
2441 				void *conf_arg)
2442 {
2443 	struct rte_event_eth_rx_adapter_params rxa_params = {0};
2444 
2445 	/* use default values for adapter params */
2446 	rxa_params.event_buf_size = ETH_EVENT_BUFFER_SIZE;
2447 	rxa_params.use_queue_event_buf = false;
2448 
2449 	return rxa_create(id, dev_id, &rxa_params, conf_cb, conf_arg);
2450 }
2451 
2452 int
2453 rte_event_eth_rx_adapter_create_with_params(uint8_t id, uint8_t dev_id,
2454 			struct rte_event_port_conf *port_config,
2455 			struct rte_event_eth_rx_adapter_params *rxa_params)
2456 {
2457 	struct rte_event_port_conf *pc;
2458 	int ret;
2459 	struct rte_event_eth_rx_adapter_params temp_params = {0};
2460 
2461 	if (port_config == NULL)
2462 		return -EINVAL;
2463 
2464 	if (rxa_params == NULL) {
2465 		/* use default values if rxa_params is NULL */
2466 		rxa_params = &temp_params;
2467 		rxa_params->event_buf_size = ETH_EVENT_BUFFER_SIZE;
2468 		rxa_params->use_queue_event_buf = false;
2469 	} else if ((!rxa_params->use_queue_event_buf &&
2470 		    rxa_params->event_buf_size == 0) ||
2471 		   (rxa_params->use_queue_event_buf &&
2472 		    rxa_params->event_buf_size != 0)) {
2473 		RTE_EDEV_LOG_ERR("Invalid adapter params\n");
2474 		return -EINVAL;
2475 	} else if (!rxa_params->use_queue_event_buf) {
2476 		/* adjust event buff size with BATCH_SIZE used for fetching
2477 		 * packets from NIC rx queues to get full buffer utilization
2478 		 * and prevent unnecessary rollovers.
2479 		 */
2480 
2481 		rxa_params->event_buf_size =
2482 			RTE_ALIGN(rxa_params->event_buf_size, BATCH_SIZE);
2483 		rxa_params->event_buf_size += (BATCH_SIZE + BATCH_SIZE);
2484 	}
2485 
2486 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2487 	if (pc == NULL)
2488 		return -ENOMEM;
2489 
2490 	*pc = *port_config;
2491 
2492 	ret = rxa_create(id, dev_id, rxa_params, rxa_default_conf_cb, pc);
2493 	if (ret)
2494 		rte_free(pc);
2495 
2496 	return ret;
2497 }
2498 
2499 int
2500 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2501 		struct rte_event_port_conf *port_config)
2502 {
2503 	struct rte_event_port_conf *pc;
2504 	int ret;
2505 
2506 	if (port_config == NULL)
2507 		return -EINVAL;
2508 
2509 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2510 
2511 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2512 	if (pc == NULL)
2513 		return -ENOMEM;
2514 	*pc = *port_config;
2515 
2516 	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2517 					rxa_default_conf_cb,
2518 					pc);
2519 	if (ret)
2520 		rte_free(pc);
2521 	return ret;
2522 }
2523 
2524 int
2525 rte_event_eth_rx_adapter_free(uint8_t id)
2526 {
2527 	struct event_eth_rx_adapter *rx_adapter;
2528 
2529 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2530 
2531 	rx_adapter = rxa_id_to_adapter(id);
2532 	if (rx_adapter == NULL)
2533 		return -EINVAL;
2534 
2535 	if (rx_adapter->nb_queues) {
2536 		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2537 				rx_adapter->nb_queues);
2538 		return -EBUSY;
2539 	}
2540 
2541 	if (rx_adapter->default_cb_arg)
2542 		rte_free(rx_adapter->conf_arg);
2543 	rte_free(rx_adapter->eth_devices);
2544 	if (!rx_adapter->use_queue_event_buf)
2545 		rte_free(rx_adapter->event_enqueue_buffer.events);
2546 	rte_free(rx_adapter);
2547 	event_eth_rx_adapter[id] = NULL;
2548 
2549 	rte_eventdev_trace_eth_rx_adapter_free(id);
2550 	return 0;
2551 }
2552 
2553 int
2554 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2555 		uint16_t eth_dev_id,
2556 		int32_t rx_queue_id,
2557 		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2558 {
2559 	int ret;
2560 	uint32_t cap;
2561 	struct event_eth_rx_adapter *rx_adapter;
2562 	struct rte_eventdev *dev;
2563 	struct eth_device_info *dev_info;
2564 	struct rte_event_eth_rx_adapter_vector_limits limits;
2565 
2566 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2567 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2568 
2569 	rx_adapter = rxa_id_to_adapter(id);
2570 	if ((rx_adapter == NULL) || (queue_conf == NULL))
2571 		return -EINVAL;
2572 
2573 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2574 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2575 						eth_dev_id,
2576 						&cap);
2577 	if (ret) {
2578 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2579 			"eth port %" PRIu16, id, eth_dev_id);
2580 		return ret;
2581 	}
2582 
2583 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2584 		&& (queue_conf->rx_queue_flags &
2585 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2586 		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2587 				" eth port: %" PRIu16 " adapter id: %" PRIu8,
2588 				eth_dev_id, id);
2589 		return -EINVAL;
2590 	}
2591 
2592 	if (queue_conf->rx_queue_flags &
2593 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2594 
2595 		if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0) {
2596 			RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2597 					 " eth port: %" PRIu16
2598 					 " adapter id: %" PRIu8,
2599 					 eth_dev_id, id);
2600 			return -EINVAL;
2601 		}
2602 
2603 		ret = rte_event_eth_rx_adapter_vector_limits_get(
2604 			rx_adapter->eventdev_id, eth_dev_id, &limits);
2605 		if (ret < 0) {
2606 			RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
2607 					 " eth port: %" PRIu16
2608 					 " adapter id: %" PRIu8,
2609 					 eth_dev_id, id);
2610 			return -EINVAL;
2611 		}
2612 		if (queue_conf->vector_sz < limits.min_sz ||
2613 		    queue_conf->vector_sz > limits.max_sz ||
2614 		    queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
2615 		    queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
2616 		    queue_conf->vector_mp == NULL) {
2617 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2618 					 " eth port: %" PRIu16
2619 					 " adapter id: %" PRIu8,
2620 					 eth_dev_id, id);
2621 			return -EINVAL;
2622 		}
2623 		if (queue_conf->vector_mp->elt_size <
2624 		    (sizeof(struct rte_event_vector) +
2625 		     (sizeof(uintptr_t) * queue_conf->vector_sz))) {
2626 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2627 					 " eth port: %" PRIu16
2628 					 " adapter id: %" PRIu8,
2629 					 eth_dev_id, id);
2630 			return -EINVAL;
2631 		}
2632 	}
2633 
2634 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2635 		(rx_queue_id != -1)) {
2636 		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2637 			"event queue, eth port: %" PRIu16 " adapter id: %"
2638 			PRIu8, eth_dev_id, id);
2639 		return -EINVAL;
2640 	}
2641 
2642 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2643 			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2644 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2645 			 (uint16_t)rx_queue_id);
2646 		return -EINVAL;
2647 	}
2648 
2649 	if ((rx_adapter->use_queue_event_buf &&
2650 	     queue_conf->event_buf_size == 0) ||
2651 	    (!rx_adapter->use_queue_event_buf &&
2652 	     queue_conf->event_buf_size != 0)) {
2653 		RTE_EDEV_LOG_ERR("Invalid Event buffer size for the queue");
2654 		return -EINVAL;
2655 	}
2656 
2657 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2658 
2659 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2660 		if (*dev->dev_ops->eth_rx_adapter_queue_add == NULL)
2661 			return -ENOTSUP;
2662 		if (dev_info->rx_queue == NULL) {
2663 			dev_info->rx_queue =
2664 			    rte_zmalloc_socket(rx_adapter->mem_name,
2665 					dev_info->dev->data->nb_rx_queues *
2666 					sizeof(struct eth_rx_queue_info), 0,
2667 					rx_adapter->socket_id);
2668 			if (dev_info->rx_queue == NULL)
2669 				return -ENOMEM;
2670 		}
2671 
2672 		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2673 				&rte_eth_devices[eth_dev_id],
2674 				rx_queue_id, queue_conf);
2675 		if (ret == 0) {
2676 			dev_info->internal_event_port = 1;
2677 			rxa_update_queue(rx_adapter,
2678 					&rx_adapter->eth_devices[eth_dev_id],
2679 					rx_queue_id,
2680 					1);
2681 		}
2682 	} else {
2683 		rte_spinlock_lock(&rx_adapter->rx_lock);
2684 		dev_info->internal_event_port = 0;
2685 		ret = rxa_init_service(rx_adapter, id);
2686 		if (ret == 0) {
2687 			uint32_t service_id = rx_adapter->service_id;
2688 			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2689 					queue_conf);
2690 			rte_service_component_runstate_set(service_id,
2691 				rxa_sw_adapter_queue_count(rx_adapter));
2692 		}
2693 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2694 	}
2695 
2696 	rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2697 		rx_queue_id, queue_conf, ret);
2698 	if (ret)
2699 		return ret;
2700 
2701 	return 0;
2702 }
2703 
2704 static int
2705 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2706 {
2707 	limits->max_sz = MAX_VECTOR_SIZE;
2708 	limits->min_sz = MIN_VECTOR_SIZE;
2709 	limits->max_timeout_ns = MAX_VECTOR_NS;
2710 	limits->min_timeout_ns = MIN_VECTOR_NS;
2711 
2712 	return 0;
2713 }
2714 
2715 int
2716 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2717 				int32_t rx_queue_id)
2718 {
2719 	int ret = 0;
2720 	struct rte_eventdev *dev;
2721 	struct event_eth_rx_adapter *rx_adapter;
2722 	struct eth_device_info *dev_info;
2723 	uint32_t cap;
2724 	uint32_t nb_rx_poll = 0;
2725 	uint32_t nb_wrr = 0;
2726 	uint32_t nb_rx_intr;
2727 	struct eth_rx_poll_entry *rx_poll = NULL;
2728 	uint32_t *rx_wrr = NULL;
2729 	int num_intr_vec;
2730 
2731 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2732 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2733 
2734 	rx_adapter = rxa_id_to_adapter(id);
2735 	if (rx_adapter == NULL)
2736 		return -EINVAL;
2737 
2738 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2739 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2740 						eth_dev_id,
2741 						&cap);
2742 	if (ret)
2743 		return ret;
2744 
2745 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2746 		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2747 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2748 			 (uint16_t)rx_queue_id);
2749 		return -EINVAL;
2750 	}
2751 
2752 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2753 
2754 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2755 		if (*dev->dev_ops->eth_rx_adapter_queue_del == NULL)
2756 			return -ENOTSUP;
2757 		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2758 						&rte_eth_devices[eth_dev_id],
2759 						rx_queue_id);
2760 		if (ret == 0) {
2761 			rxa_update_queue(rx_adapter,
2762 					&rx_adapter->eth_devices[eth_dev_id],
2763 					rx_queue_id,
2764 					0);
2765 			if (dev_info->nb_dev_queues == 0) {
2766 				rte_free(dev_info->rx_queue);
2767 				dev_info->rx_queue = NULL;
2768 			}
2769 		}
2770 	} else {
2771 		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2772 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2773 
2774 		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2775 			&rx_poll, &rx_wrr);
2776 		if (ret)
2777 			return ret;
2778 
2779 		rte_spinlock_lock(&rx_adapter->rx_lock);
2780 
2781 		num_intr_vec = 0;
2782 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2783 
2784 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2785 						rx_queue_id, 0);
2786 			ret = rxa_del_intr_queue(rx_adapter, dev_info,
2787 					rx_queue_id);
2788 			if (ret)
2789 				goto unlock_ret;
2790 		}
2791 
2792 		if (nb_rx_intr == 0) {
2793 			ret = rxa_free_intr_resources(rx_adapter);
2794 			if (ret)
2795 				goto unlock_ret;
2796 		}
2797 
2798 		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2799 		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2800 
2801 		rte_free(rx_adapter->eth_rx_poll);
2802 		rte_free(rx_adapter->wrr_sched);
2803 
2804 		if (nb_rx_intr == 0) {
2805 			rte_free(dev_info->intr_queue);
2806 			dev_info->intr_queue = NULL;
2807 		}
2808 
2809 		rx_adapter->eth_rx_poll = rx_poll;
2810 		rx_adapter->wrr_sched = rx_wrr;
2811 		rx_adapter->wrr_len = nb_wrr;
2812 		/*
2813 		 * reset next poll start position (wrr_pos) to avoid buffer
2814 		 * overrun when wrr_len is reduced in case of queue delete
2815 		 */
2816 		rx_adapter->wrr_pos = 0;
2817 		rx_adapter->num_intr_vec += num_intr_vec;
2818 
2819 		if (dev_info->nb_dev_queues == 0) {
2820 			rte_free(dev_info->rx_queue);
2821 			dev_info->rx_queue = NULL;
2822 		}
2823 unlock_ret:
2824 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2825 		if (ret) {
2826 			rte_free(rx_poll);
2827 			rte_free(rx_wrr);
2828 			return ret;
2829 		}
2830 
2831 		rte_service_component_runstate_set(rx_adapter->service_id,
2832 				rxa_sw_adapter_queue_count(rx_adapter));
2833 	}
2834 
2835 	rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2836 		rx_queue_id, ret);
2837 	return ret;
2838 }
2839 
2840 int
2841 rte_event_eth_rx_adapter_vector_limits_get(
2842 	uint8_t dev_id, uint16_t eth_port_id,
2843 	struct rte_event_eth_rx_adapter_vector_limits *limits)
2844 {
2845 	struct rte_eventdev *dev;
2846 	uint32_t cap;
2847 	int ret;
2848 
2849 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2850 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2851 
2852 	if (limits == NULL)
2853 		return -EINVAL;
2854 
2855 	dev = &rte_eventdevs[dev_id];
2856 
2857 	ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2858 	if (ret) {
2859 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2860 				 "eth port %" PRIu16,
2861 				 dev_id, eth_port_id);
2862 		return ret;
2863 	}
2864 
2865 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2866 		if (*dev->dev_ops->eth_rx_adapter_vector_limits_get == NULL)
2867 			return -ENOTSUP;
2868 		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2869 			dev, &rte_eth_devices[eth_port_id], limits);
2870 	} else {
2871 		ret = rxa_sw_vector_limits(limits);
2872 	}
2873 
2874 	return ret;
2875 }
2876 
2877 int
2878 rte_event_eth_rx_adapter_start(uint8_t id)
2879 {
2880 	rte_eventdev_trace_eth_rx_adapter_start(id);
2881 	return rxa_ctrl(id, 1);
2882 }
2883 
2884 int
2885 rte_event_eth_rx_adapter_stop(uint8_t id)
2886 {
2887 	rte_eventdev_trace_eth_rx_adapter_stop(id);
2888 	return rxa_ctrl(id, 0);
2889 }
2890 
2891 static inline void
2892 rxa_queue_stats_reset(struct eth_rx_queue_info *queue_info)
2893 {
2894 	struct rte_event_eth_rx_adapter_stats *q_stats;
2895 
2896 	q_stats = queue_info->stats;
2897 	memset(q_stats, 0, sizeof(*q_stats));
2898 }
2899 
2900 int
2901 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2902 			       struct rte_event_eth_rx_adapter_stats *stats)
2903 {
2904 	struct event_eth_rx_adapter *rx_adapter;
2905 	struct eth_event_enqueue_buffer *buf;
2906 	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2907 	struct rte_event_eth_rx_adapter_stats dev_stats;
2908 	struct rte_eventdev *dev;
2909 	struct eth_device_info *dev_info;
2910 	struct eth_rx_queue_info *queue_info;
2911 	struct rte_event_eth_rx_adapter_stats *q_stats;
2912 	uint32_t i, j;
2913 	int ret;
2914 
2915 	if (rxa_memzone_lookup())
2916 		return -ENOMEM;
2917 
2918 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2919 
2920 	rx_adapter = rxa_id_to_adapter(id);
2921 	if (rx_adapter  == NULL || stats == NULL)
2922 		return -EINVAL;
2923 
2924 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2925 	memset(stats, 0, sizeof(*stats));
2926 
2927 	if (rx_adapter->service_inited)
2928 		*stats = rx_adapter->stats;
2929 
2930 	RTE_ETH_FOREACH_DEV(i) {
2931 		dev_info = &rx_adapter->eth_devices[i];
2932 
2933 		if (rx_adapter->use_queue_event_buf && dev_info->rx_queue) {
2934 
2935 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
2936 			     j++) {
2937 				queue_info = &dev_info->rx_queue[j];
2938 				if (!queue_info->queue_enabled)
2939 					continue;
2940 				q_stats = queue_info->stats;
2941 
2942 				stats->rx_packets += q_stats->rx_packets;
2943 				stats->rx_poll_count += q_stats->rx_poll_count;
2944 				stats->rx_enq_count += q_stats->rx_enq_count;
2945 				stats->rx_enq_retry += q_stats->rx_enq_retry;
2946 				stats->rx_dropped += q_stats->rx_dropped;
2947 				stats->rx_enq_block_cycles +=
2948 						q_stats->rx_enq_block_cycles;
2949 			}
2950 		}
2951 
2952 		if (dev_info->internal_event_port == 0 ||
2953 			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2954 			continue;
2955 		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2956 						&rte_eth_devices[i],
2957 						&dev_stats);
2958 		if (ret)
2959 			continue;
2960 		dev_stats_sum.rx_packets += dev_stats.rx_packets;
2961 		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2962 	}
2963 
2964 	buf = &rx_adapter->event_enqueue_buffer;
2965 	stats->rx_packets += dev_stats_sum.rx_packets;
2966 	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2967 	stats->rx_event_buf_count = buf->count;
2968 	stats->rx_event_buf_size = buf->events_size;
2969 
2970 	return 0;
2971 }
2972 
2973 int
2974 rte_event_eth_rx_adapter_queue_stats_get(uint8_t id,
2975 		uint16_t eth_dev_id,
2976 		uint16_t rx_queue_id,
2977 		struct rte_event_eth_rx_adapter_queue_stats *stats)
2978 {
2979 	struct event_eth_rx_adapter *rx_adapter;
2980 	struct eth_device_info *dev_info;
2981 	struct eth_rx_queue_info *queue_info;
2982 	struct eth_event_enqueue_buffer *event_buf;
2983 	struct rte_event_eth_rx_adapter_stats *q_stats;
2984 	struct rte_eventdev *dev;
2985 
2986 	if (rxa_memzone_lookup())
2987 		return -ENOMEM;
2988 
2989 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2990 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2991 
2992 	rx_adapter = rxa_id_to_adapter(id);
2993 
2994 	if (rx_adapter == NULL || stats == NULL)
2995 		return -EINVAL;
2996 
2997 	if (!rx_adapter->use_queue_event_buf)
2998 		return -EINVAL;
2999 
3000 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3001 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
3002 		return -EINVAL;
3003 	}
3004 
3005 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3006 	if (dev_info->rx_queue == NULL ||
3007 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3008 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3009 		return -EINVAL;
3010 	}
3011 
3012 	if (dev_info->internal_event_port == 0) {
3013 		queue_info = &dev_info->rx_queue[rx_queue_id];
3014 		event_buf = queue_info->event_buf;
3015 		q_stats = queue_info->stats;
3016 
3017 		stats->rx_event_buf_count = event_buf->count;
3018 		stats->rx_event_buf_size = event_buf->events_size;
3019 		stats->rx_packets = q_stats->rx_packets;
3020 		stats->rx_poll_count = q_stats->rx_poll_count;
3021 		stats->rx_dropped = q_stats->rx_dropped;
3022 	}
3023 
3024 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3025 	if (dev->dev_ops->eth_rx_adapter_queue_stats_get != NULL) {
3026 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_get)(dev,
3027 						&rte_eth_devices[eth_dev_id],
3028 						rx_queue_id, stats);
3029 	}
3030 
3031 	return 0;
3032 }
3033 
3034 int
3035 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
3036 {
3037 	struct event_eth_rx_adapter *rx_adapter;
3038 	struct rte_eventdev *dev;
3039 	struct eth_device_info *dev_info;
3040 	struct eth_rx_queue_info *queue_info;
3041 	uint32_t i, j;
3042 
3043 	if (rxa_memzone_lookup())
3044 		return -ENOMEM;
3045 
3046 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3047 
3048 	rx_adapter = rxa_id_to_adapter(id);
3049 	if (rx_adapter == NULL)
3050 		return -EINVAL;
3051 
3052 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3053 
3054 	RTE_ETH_FOREACH_DEV(i) {
3055 		dev_info = &rx_adapter->eth_devices[i];
3056 
3057 		if (rx_adapter->use_queue_event_buf  && dev_info->rx_queue) {
3058 
3059 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
3060 						j++) {
3061 				queue_info = &dev_info->rx_queue[j];
3062 				if (!queue_info->queue_enabled)
3063 					continue;
3064 				rxa_queue_stats_reset(queue_info);
3065 			}
3066 		}
3067 
3068 		if (dev_info->internal_event_port == 0 ||
3069 			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
3070 			continue;
3071 		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
3072 							&rte_eth_devices[i]);
3073 	}
3074 
3075 	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
3076 
3077 	return 0;
3078 }
3079 
3080 int
3081 rte_event_eth_rx_adapter_queue_stats_reset(uint8_t id,
3082 		uint16_t eth_dev_id,
3083 		uint16_t rx_queue_id)
3084 {
3085 	struct event_eth_rx_adapter *rx_adapter;
3086 	struct eth_device_info *dev_info;
3087 	struct eth_rx_queue_info *queue_info;
3088 	struct rte_eventdev *dev;
3089 
3090 	if (rxa_memzone_lookup())
3091 		return -ENOMEM;
3092 
3093 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3094 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3095 
3096 	rx_adapter = rxa_id_to_adapter(id);
3097 	if (rx_adapter == NULL)
3098 		return -EINVAL;
3099 
3100 	if (!rx_adapter->use_queue_event_buf)
3101 		return -EINVAL;
3102 
3103 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3104 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
3105 		return -EINVAL;
3106 	}
3107 
3108 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3109 
3110 	if (dev_info->rx_queue == NULL ||
3111 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3112 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3113 		return -EINVAL;
3114 	}
3115 
3116 	if (dev_info->internal_event_port == 0) {
3117 		queue_info = &dev_info->rx_queue[rx_queue_id];
3118 		rxa_queue_stats_reset(queue_info);
3119 	}
3120 
3121 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3122 	if (dev->dev_ops->eth_rx_adapter_queue_stats_reset != NULL) {
3123 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_reset)(dev,
3124 						&rte_eth_devices[eth_dev_id],
3125 						rx_queue_id);
3126 	}
3127 
3128 	return 0;
3129 }
3130 
3131 int
3132 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
3133 {
3134 	struct event_eth_rx_adapter *rx_adapter;
3135 
3136 	if (rxa_memzone_lookup())
3137 		return -ENOMEM;
3138 
3139 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3140 
3141 	rx_adapter = rxa_id_to_adapter(id);
3142 	if (rx_adapter == NULL || service_id == NULL)
3143 		return -EINVAL;
3144 
3145 	if (rx_adapter->service_inited)
3146 		*service_id = rx_adapter->service_id;
3147 
3148 	return rx_adapter->service_inited ? 0 : -ESRCH;
3149 }
3150 
3151 int
3152 rte_event_eth_rx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
3153 {
3154 	struct event_eth_rx_adapter *rx_adapter;
3155 
3156 	if (rxa_memzone_lookup())
3157 		return -ENOMEM;
3158 
3159 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3160 
3161 	rx_adapter = rxa_id_to_adapter(id);
3162 	if (rx_adapter == NULL || event_port_id == NULL)
3163 		return -EINVAL;
3164 
3165 	if (rx_adapter->service_inited)
3166 		*event_port_id = rx_adapter->event_port_id;
3167 
3168 	return rx_adapter->service_inited ? 0 : -ESRCH;
3169 }
3170 
3171 int
3172 rte_event_eth_rx_adapter_cb_register(uint8_t id,
3173 					uint16_t eth_dev_id,
3174 					rte_event_eth_rx_adapter_cb_fn cb_fn,
3175 					void *cb_arg)
3176 {
3177 	struct event_eth_rx_adapter *rx_adapter;
3178 	struct eth_device_info *dev_info;
3179 	uint32_t cap;
3180 	int ret;
3181 
3182 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3183 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3184 
3185 	rx_adapter = rxa_id_to_adapter(id);
3186 	if (rx_adapter == NULL)
3187 		return -EINVAL;
3188 
3189 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3190 	if (dev_info->rx_queue == NULL)
3191 		return -EINVAL;
3192 
3193 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
3194 						eth_dev_id,
3195 						&cap);
3196 	if (ret) {
3197 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
3198 			"eth port %" PRIu16, id, eth_dev_id);
3199 		return ret;
3200 	}
3201 
3202 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
3203 		RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
3204 				PRIu16, eth_dev_id);
3205 		return -EINVAL;
3206 	}
3207 
3208 	rte_spinlock_lock(&rx_adapter->rx_lock);
3209 	dev_info->cb_fn = cb_fn;
3210 	dev_info->cb_arg = cb_arg;
3211 	rte_spinlock_unlock(&rx_adapter->rx_lock);
3212 
3213 	return 0;
3214 }
3215 
3216 int
3217 rte_event_eth_rx_adapter_queue_conf_get(uint8_t id,
3218 			uint16_t eth_dev_id,
3219 			uint16_t rx_queue_id,
3220 			struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
3221 {
3222 #define TICK2NSEC(_ticks, _freq) (((_ticks) * (1E9)) / (_freq))
3223 	struct rte_eventdev *dev;
3224 	struct event_eth_rx_adapter *rx_adapter;
3225 	struct eth_device_info *dev_info;
3226 	struct eth_rx_queue_info *queue_info;
3227 	int ret;
3228 
3229 	if (rxa_memzone_lookup())
3230 		return -ENOMEM;
3231 
3232 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3233 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3234 
3235 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3236 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3237 		return -EINVAL;
3238 	}
3239 
3240 	if (queue_conf == NULL) {
3241 		RTE_EDEV_LOG_ERR("Rx queue conf struct cannot be NULL");
3242 		return -EINVAL;
3243 	}
3244 
3245 	rx_adapter = rxa_id_to_adapter(id);
3246 	if (rx_adapter == NULL)
3247 		return -EINVAL;
3248 
3249 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3250 	if (dev_info->rx_queue == NULL ||
3251 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3252 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3253 		return -EINVAL;
3254 	}
3255 
3256 	queue_info = &dev_info->rx_queue[rx_queue_id];
3257 
3258 	memset(queue_conf, 0, sizeof(*queue_conf));
3259 	queue_conf->rx_queue_flags = 0;
3260 	if (queue_info->flow_id_mask != 0)
3261 		queue_conf->rx_queue_flags |=
3262 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID;
3263 	queue_conf->servicing_weight = queue_info->wt;
3264 
3265 	queue_conf->ev.event = queue_info->event;
3266 
3267 	queue_conf->vector_sz = queue_info->vector_data.max_vector_count;
3268 	queue_conf->vector_mp = queue_info->vector_data.vector_pool;
3269 	/* need to be converted from ticks to ns */
3270 	queue_conf->vector_timeout_ns = TICK2NSEC(
3271 		queue_info->vector_data.vector_timeout_ticks, rte_get_timer_hz());
3272 
3273 	if (queue_info->event_buf != NULL)
3274 		queue_conf->event_buf_size = queue_info->event_buf->events_size;
3275 	else
3276 		queue_conf->event_buf_size = 0;
3277 
3278 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3279 	if (dev->dev_ops->eth_rx_adapter_queue_conf_get != NULL) {
3280 		ret = (*dev->dev_ops->eth_rx_adapter_queue_conf_get)(dev,
3281 						&rte_eth_devices[eth_dev_id],
3282 						rx_queue_id,
3283 						queue_conf);
3284 		return ret;
3285 	}
3286 
3287 	return 0;
3288 }
3289 
3290 #define RXA_ADD_DICT(stats, s) rte_tel_data_add_dict_u64(d, #s, stats.s)
3291 
3292 static int
3293 handle_rxa_stats(const char *cmd __rte_unused,
3294 		 const char *params,
3295 		 struct rte_tel_data *d)
3296 {
3297 	uint8_t rx_adapter_id;
3298 	struct rte_event_eth_rx_adapter_stats rx_adptr_stats;
3299 
3300 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3301 		return -1;
3302 
3303 	/* Get Rx adapter ID from parameter string */
3304 	rx_adapter_id = atoi(params);
3305 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3306 
3307 	/* Get Rx adapter stats */
3308 	if (rte_event_eth_rx_adapter_stats_get(rx_adapter_id,
3309 					       &rx_adptr_stats)) {
3310 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter stats\n");
3311 		return -1;
3312 	}
3313 
3314 	rte_tel_data_start_dict(d);
3315 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3316 	RXA_ADD_DICT(rx_adptr_stats, rx_packets);
3317 	RXA_ADD_DICT(rx_adptr_stats, rx_poll_count);
3318 	RXA_ADD_DICT(rx_adptr_stats, rx_dropped);
3319 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_retry);
3320 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_count);
3321 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_size);
3322 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_count);
3323 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_start_ts);
3324 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_block_cycles);
3325 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_end_ts);
3326 	RXA_ADD_DICT(rx_adptr_stats, rx_intr_packets);
3327 
3328 	return 0;
3329 }
3330 
3331 static int
3332 handle_rxa_stats_reset(const char *cmd __rte_unused,
3333 		       const char *params,
3334 		       struct rte_tel_data *d __rte_unused)
3335 {
3336 	uint8_t rx_adapter_id;
3337 
3338 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3339 		return -1;
3340 
3341 	/* Get Rx adapter ID from parameter string */
3342 	rx_adapter_id = atoi(params);
3343 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3344 
3345 	/* Reset Rx adapter stats */
3346 	if (rte_event_eth_rx_adapter_stats_reset(rx_adapter_id)) {
3347 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter stats\n");
3348 		return -1;
3349 	}
3350 
3351 	return 0;
3352 }
3353 
3354 static int
3355 handle_rxa_get_queue_conf(const char *cmd __rte_unused,
3356 			  const char *params,
3357 			  struct rte_tel_data *d)
3358 {
3359 	uint8_t rx_adapter_id;
3360 	uint16_t rx_queue_id;
3361 	int eth_dev_id, ret = -1;
3362 	char *token, *l_params;
3363 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
3364 
3365 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3366 		return -1;
3367 
3368 	/* Get Rx adapter ID from parameter string */
3369 	l_params = strdup(params);
3370 	if (l_params == NULL)
3371 		return -ENOMEM;
3372 	token = strtok(l_params, ",");
3373 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3374 	rx_adapter_id = strtoul(token, NULL, 10);
3375 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);
3376 
3377 	token = strtok(NULL, ",");
3378 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3379 
3380 	/* Get device ID from parameter string */
3381 	eth_dev_id = strtoul(token, NULL, 10);
3382 	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3383 
3384 	token = strtok(NULL, ",");
3385 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3386 
3387 	/* Get Rx queue ID from parameter string */
3388 	rx_queue_id = strtoul(token, NULL, 10);
3389 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3390 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3391 		ret = -EINVAL;
3392 		goto error;
3393 	}
3394 
3395 	token = strtok(NULL, "\0");
3396 	if (token != NULL)
3397 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3398 				 " telemetry command, ignoring");
3399 	/* Parsing parameter finished */
3400 	free(l_params);
3401 
3402 	if (rte_event_eth_rx_adapter_queue_conf_get(rx_adapter_id, eth_dev_id,
3403 						    rx_queue_id, &queue_conf)) {
3404 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue config");
3405 		return -1;
3406 	}
3407 
3408 	rte_tel_data_start_dict(d);
3409 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3410 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3411 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3412 	RXA_ADD_DICT(queue_conf, rx_queue_flags);
3413 	RXA_ADD_DICT(queue_conf, servicing_weight);
3414 	RXA_ADD_DICT(queue_conf.ev, queue_id);
3415 	RXA_ADD_DICT(queue_conf.ev, sched_type);
3416 	RXA_ADD_DICT(queue_conf.ev, priority);
3417 	RXA_ADD_DICT(queue_conf.ev, flow_id);
3418 
3419 	return 0;
3420 
3421 error:
3422 	free(l_params);
3423 	return ret;
3424 }
3425 
3426 static int
3427 handle_rxa_get_queue_stats(const char *cmd __rte_unused,
3428 			   const char *params,
3429 			   struct rte_tel_data *d)
3430 {
3431 	uint8_t rx_adapter_id;
3432 	uint16_t rx_queue_id;
3433 	int eth_dev_id, ret = -1;
3434 	char *token, *l_params;
3435 	struct rte_event_eth_rx_adapter_queue_stats q_stats;
3436 
3437 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3438 		return -1;
3439 
3440 	/* Get Rx adapter ID from parameter string */
3441 	l_params = strdup(params);
3442 	if (l_params == NULL)
3443 		return -ENOMEM;
3444 	token = strtok(l_params, ",");
3445 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3446 	rx_adapter_id = strtoul(token, NULL, 10);
3447 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);
3448 
3449 	token = strtok(NULL, ",");
3450 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3451 
3452 	/* Get device ID from parameter string */
3453 	eth_dev_id = strtoul(token, NULL, 10);
3454 	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3455 
3456 	token = strtok(NULL, ",");
3457 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3458 
3459 	/* Get Rx queue ID from parameter string */
3460 	rx_queue_id = strtoul(token, NULL, 10);
3461 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3462 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3463 		ret = -EINVAL;
3464 		goto error;
3465 	}
3466 
3467 	token = strtok(NULL, "\0");
3468 	if (token != NULL)
3469 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3470 				 " telemetry command, ignoring");
3471 	/* Parsing parameter finished */
3472 	free(l_params);
3473 
3474 	if (rte_event_eth_rx_adapter_queue_stats_get(rx_adapter_id, eth_dev_id,
3475 						    rx_queue_id, &q_stats)) {
3476 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue stats");
3477 		return -1;
3478 	}
3479 
3480 	rte_tel_data_start_dict(d);
3481 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3482 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3483 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3484 	RXA_ADD_DICT(q_stats, rx_event_buf_count);
3485 	RXA_ADD_DICT(q_stats, rx_event_buf_size);
3486 	RXA_ADD_DICT(q_stats, rx_poll_count);
3487 	RXA_ADD_DICT(q_stats, rx_packets);
3488 	RXA_ADD_DICT(q_stats, rx_dropped);
3489 
3490 	return 0;
3491 
3492 error:
3493 	free(l_params);
3494 	return ret;
3495 }
3496 
3497 static int
3498 handle_rxa_queue_stats_reset(const char *cmd __rte_unused,
3499 			     const char *params,
3500 			     struct rte_tel_data *d __rte_unused)
3501 {
3502 	uint8_t rx_adapter_id;
3503 	uint16_t rx_queue_id;
3504 	int eth_dev_id, ret = -1;
3505 	char *token, *l_params;
3506 
3507 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3508 		return -1;
3509 
3510 	/* Get Rx adapter ID from parameter string */
3511 	l_params = strdup(params);
3512 	if (l_params == NULL)
3513 		return -ENOMEM;
3514 	token = strtok(l_params, ",");
3515 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3516 	rx_adapter_id = strtoul(token, NULL, 10);
3517 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);
3518 
3519 	token = strtok(NULL, ",");
3520 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3521 
3522 	/* Get device ID from parameter string */
3523 	eth_dev_id = strtoul(token, NULL, 10);
3524 	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3525 
3526 	token = strtok(NULL, ",");
3527 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3528 
3529 	/* Get Rx queue ID from parameter string */
3530 	rx_queue_id = strtoul(token, NULL, 10);
3531 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3532 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3533 		ret = -EINVAL;
3534 		goto error;
3535 	}
3536 
3537 	token = strtok(NULL, "\0");
3538 	if (token != NULL)
3539 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3540 				 " telemetry command, ignoring");
3541 	/* Parsing parameter finished */
3542 	free(l_params);
3543 
3544 	if (rte_event_eth_rx_adapter_queue_stats_reset(rx_adapter_id,
3545 						       eth_dev_id,
3546 						       rx_queue_id)) {
3547 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter queue stats");
3548 		return -1;
3549 	}
3550 
3551 	return 0;
3552 
3553 error:
3554 	free(l_params);
3555 	return ret;
3556 }
3557 
3558 RTE_INIT(rxa_init_telemetry)
3559 {
3560 	rte_telemetry_register_cmd("/eventdev/rxa_stats",
3561 		handle_rxa_stats,
3562 		"Returns Rx adapter stats. Parameter: rxa_id");
3563 
3564 	rte_telemetry_register_cmd("/eventdev/rxa_stats_reset",
3565 		handle_rxa_stats_reset,
3566 		"Reset Rx adapter stats. Parameter: rxa_id");
3567 
3568 	rte_telemetry_register_cmd("/eventdev/rxa_queue_conf",
3569 		handle_rxa_get_queue_conf,
3570 		"Returns Rx queue config. Parameter: rxa_id, dev_id, queue_id");
3571 
3572 	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats",
3573 		handle_rxa_get_queue_stats,
3574 		"Returns Rx queue stats. Parameter: rxa_id, dev_id, queue_id");
3575 
3576 	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats_reset",
3577 		handle_rxa_queue_stats_reset,
3578 		"Reset Rx queue stats. Parameter: rxa_id, dev_id, queue_id");
3579 }
3580