xref: /dpdk/lib/eventdev/rte_event_eth_rx_adapter.c (revision b53d106d34b5c638f5a2cbdfee0da5bd42d4383f)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9 
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <ethdev_driver.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20 #include <rte_mbuf_dyn.h>
21 #include <rte_telemetry.h>
22 
23 #include "rte_eventdev.h"
24 #include "eventdev_pmd.h"
25 #include "eventdev_trace.h"
26 #include "rte_event_eth_rx_adapter.h"
27 
/* Maximum number of mbufs requested per rte_eth_rx_burst() call */
#define BATCH_SIZE		32
/* Number of consecutive blocked enqueue attempts that are counted before
 * the enqueue block start timestamp is recorded
 */
#define BLOCK_CNT_THRESHOLD	10
/* Event buffer size, expressed as a multiple of BATCH_SIZE */
#define ETH_EVENT_BUFFER_SIZE	(6*BATCH_SIZE)
/* NOTE(review): the four limits below are not referenced in this part of
 * the file; presumably they bound the event vector size (elements) and the
 * vector timeout (nanoseconds) - confirm against the vector config code.
 */
#define MAX_VECTOR_SIZE		1024
#define MIN_VECTOR_SIZE		4
#define MAX_VECTOR_NS		1E9
#define MIN_VECTOR_NS		1E5

/* Buffer lengths for the service name and the memory allocation name */
#define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
#define ETH_RX_ADAPTER_MEM_NAME_LEN	32

/* Size in bytes of the RSS key stored per adapter */
#define RSS_KEY_SIZE	40
/* value written to intr thread pipe to signal thread exit */
#define ETH_BRIDGE_INTR_THREAD_EXIT	1
/* Sentinel value to detect initialized file handle */
#define INIT_FD		-1

/* NOTE(review): presumably the name under which the adapter pointer array
 * is registered/looked up - confirm at the point of use.
 */
#define RXA_ADAPTER_ARRAY "rte_event_eth_rx_adapter_array"
46 
/*
 * Used to store port and queue ID of interrupting Rx queue.
 *
 * The <port, queue> pair overlays a pointer so that it can be carried as a
 * single void * entry on the interrupt ring.
 */
union queue_data {
	RTE_STD_C11
	/* Pointer view, the form in which the value travels on the ring */
	void *ptr;
	struct {
		/* Ethernet port ID */
		uint16_t port;
		/* Rx queue ID on that port */
		uint16_t queue;
	};
};
58 
/*
 * There is an instance of this struct per polled Rx queue added to the
 * adapter; the WRR schedule stores indices into the array of these entries.
 */
struct eth_rx_poll_entry {
	/* Eth port to poll */
	uint16_t eth_dev_id;
	/* Eth rx queue to poll */
	uint16_t eth_rx_qid;
};
69 
/* State of the event vector being assembled for one Rx queue */
struct eth_rx_vector_data {
	/* Linkage in the adapter's list of vectors currently being filled */
	TAILQ_ENTRY(eth_rx_vector_data) next;
	/* Eth port ID copied into each vector */
	uint16_t port;
	/* Rx queue ID copied into each vector */
	uint16_t queue;
	/* Number of mbufs at which a vector is complete */
	uint16_t max_vector_count;
	/* Event word copied into the event that carries a completed vector */
	uint64_t event;
	/* TSC value taken when mbufs were last appended to the vector */
	uint64_t ts;
	/* NOTE(review): presumably the age, in TSC ticks, after which a
	 * partially filled vector is flushed - not used in this part of the
	 * file, confirm against the expiry code.
	 */
	uint64_t vector_timeout_ticks;
	/* Mempool from which vectors are obtained */
	struct rte_mempool *vector_pool;
	/* Vector currently being filled, NULL if there is none */
	struct rte_event_vector *vector_ev;
} __rte_cache_aligned;

/* List type used for the vectors that are currently being filled */
TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
83 
/* Circular buffer of events waiting to be enqueued to the event device.
 * There is an instance in the adapter and, when per Rx queue event buffers
 * are in use, one referenced from each Rx queue.
 */
struct eth_event_enqueue_buffer {
	/* Count of events in this buffer */
	uint16_t count;
	/* Array of events in this buffer */
	struct rte_event *events;
	/* size of event buffer */
	uint16_t events_size;
	/* Event enqueue happens from head */
	uint16_t head;
	/* New packets from rte_eth_rx_burst are enqueued from tail */
	uint16_t tail;
	/* last element in the buffer before rollover, 0 if the buffer has
	 * not rolled over
	 */
	uint16_t last;
	/* Set to all ones when the buffer rolls over and cleared once the
	 * rolled over part has been flushed
	 */
	uint16_t last_mask;
};
100 
/* Per adapter instance state */
struct event_eth_rx_adapter {
	/* RSS key */
	uint8_t rss_key_be[RSS_KEY_SIZE];
	/* Event device identifier */
	uint8_t eventdev_id;
	/* Event port identifier */
	uint8_t event_port_id;
	/* Flag indicating per rxq event buffer */
	bool use_queue_event_buf;
	/* Per ethernet device structure */
	struct eth_device_info *eth_devices;
	/* Lock to serialize config updates with service function */
	rte_spinlock_t rx_lock;
	/* Max mbufs processed in any service function invocation */
	uint32_t max_nb_rx;
	/* Receive queues that need to be polled */
	struct eth_rx_poll_entry *eth_rx_poll;
	/* Size of the eth_rx_poll array */
	uint16_t num_rx_polled;
	/* Weighted round robin schedule */
	uint32_t *wrr_sched;
	/* wrr_sched[] size */
	uint32_t wrr_len;
	/* Next entry in wrr[] to begin polling */
	uint32_t wrr_pos;
	/* Event burst buffer */
	struct eth_event_enqueue_buffer event_enqueue_buffer;
	/* Vector enable flag */
	uint8_t ena_vector;
	/* Timestamp of previous vector expiry list traversal */
	uint64_t prev_expiry_ts;
	/* Minimum ticks to wait before traversing expiry list */
	uint64_t vector_tmo_ticks;
	/* vector list */
	struct eth_rx_vector_data_list vector_list;
	/* Per adapter stats */
	struct rte_event_eth_rx_adapter_stats stats;
	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
	uint16_t enq_block_count;
	/* Block start ts */
	uint64_t rx_enq_block_start_ts;
	/* epoll fd used to wait for Rx interrupts */
	int epd;
	/* Number of interrupt driven Rx queues */
	uint32_t num_rx_intr;
	/* Used to send <dev id, queue id> of interrupting Rx queues from
	 * the interrupt thread to the Rx thread
	 */
	struct rte_ring *intr_ring;
	/* Rx Queue data (dev id, queue id) for the last non-empty
	 * queue polled
	 */
	union queue_data qd;
	/* queue_data is valid */
	int qd_valid;
	/* Interrupt ring lock, synchronizes Rx thread
	 * and interrupt thread
	 */
	rte_spinlock_t intr_ring_lock;
	/* event array passed to rte_epoll_wait */
	struct rte_epoll_event *epoll_events;
	/* Count of interrupt vectors in use */
	uint32_t num_intr_vec;
	/* Thread blocked on Rx interrupts */
	pthread_t rx_intr_thread;
	/* Configuration callback for rte_service configuration */
	rte_event_eth_rx_adapter_conf_cb conf_cb;
	/* Configuration callback argument */
	void *conf_arg;
	/* Set if default_cb is being used */
	int default_cb_arg;
	/* Service initialization state */
	uint8_t service_inited;
	/* Total count of Rx queues in adapter */
	uint32_t nb_queues;
	/* Memory allocation name */
	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
	/* Socket identifier cached from eventdev */
	int socket_id;
	/* Per adapter EAL service */
	uint32_t service_id;
	/* Adapter started flag */
	uint8_t rxa_started;
	/* Adapter ID */
	uint8_t id;
} __rte_cache_aligned;
187 
/* Per eth device */
struct eth_device_info {
	/* Underlying ethernet device */
	struct rte_eth_dev *dev;
	/* Per Rx queue state, indexed by Rx queue ID; NULL when no queue
	 * array has been allocated for this device
	 */
	struct eth_rx_queue_info *rx_queue;
	/* Rx callback */
	rte_event_eth_rx_adapter_cb_fn cb_fn;
	/* Rx callback argument */
	void *cb_arg;
	/* Set if ethdev->eventdev packet transfer uses a
	 * hardware mechanism
	 */
	uint8_t internal_event_port;
	/* Set if the adapter is processing rx queues for
	 * this eth device and packet processing has been
	 * started, allows for the code to know if the PMD
	 * rx_adapter_stop callback needs to be invoked
	 */
	uint8_t dev_rx_started;
	/* Number of queues added for this device */
	uint16_t nb_dev_queues;
	/* Number of poll based queues
	 * If nb_rx_poll > 0, the start callback will
	 * be invoked if not already invoked
	 */
	uint16_t nb_rx_poll;
	/* Number of interrupt based queues
	 * If nb_rx_intr > 0, the start callback will
	 * be invoked if not already invoked.
	 */
	uint16_t nb_rx_intr;
	/* Number of queues that use the shared interrupt */
	uint16_t nb_shared_intr;
	/* sum(wrr(q)) for all queues within the device
	 * useful when deleting all device queues
	 */
	uint32_t wrr_len;
	/* Intr based queue index to start polling from, this is used
	 * if the number of shared interrupts is non-zero
	 */
	uint16_t next_q_idx;
	/* Intr based queue indices */
	uint16_t *intr_queue;
	/* device generates per Rx queue interrupt for queue index
	 * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
	 */
	int multi_intr_cap;
	/* shared interrupt enabled */
	int shared_intr_enabled;
};
237 
/* Per Rx queue */
struct eth_rx_queue_info {
	int queue_enabled;	/* True if added */
	int intr_enabled;	/* Cleared when an interrupt is posted to the ring */
	uint8_t ena_vector;	/* Non-zero if events carry mbuf vectors */
	uint16_t wt;		/* Polling weight, 0 for an interrupt mode queue */
	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
	uint64_t event;		/* Event word copied into each generated event */
	/* Vector assembly state, used when ena_vector is set */
	struct eth_rx_vector_data vector_data;
	/* Event buffer used when the adapter uses per Rx queue buffers */
	struct eth_event_enqueue_buffer *event_buf;
	/* use adapter stats struct for queue level stats,
	 * as same stats need to be updated for adapter and queue
	 */
	struct rte_event_eth_rx_adapter_stats *stats;
};
253 
/* Array of adapter instance pointers.
 * NOTE(review): presumably indexed by adapter ID - confirm at the point
 * where the array is allocated.
 */
static struct event_eth_rx_adapter **event_eth_rx_adapter;

/* Enable dynamic timestamp field in mbuf */
/* ol_flags bit that marks an mbuf as already carrying a timestamp */
static uint64_t event_eth_rx_timestamp_dynflag;
/* Offset of the timestamp dynamic field within the mbuf, -1 until set */
static int event_eth_rx_timestamp_dynfield_offset = -1;
259 
260 static inline rte_mbuf_timestamp_t *
261 rxa_timestamp_dynfield(struct rte_mbuf *mbuf)
262 {
263 	return RTE_MBUF_DYNFIELD(mbuf,
264 		event_eth_rx_timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
265 }
266 
267 static inline int
268 rxa_validate_id(uint8_t id)
269 {
270 	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
271 }
272 
273 static inline struct eth_event_enqueue_buffer *
274 rxa_event_buf_get(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
275 		  uint16_t rx_queue_id,
276 		  struct rte_event_eth_rx_adapter_stats **stats)
277 {
278 	if (rx_adapter->use_queue_event_buf) {
279 		struct eth_device_info *dev_info =
280 			&rx_adapter->eth_devices[eth_dev_id];
281 		*stats = dev_info->rx_queue[rx_queue_id].stats;
282 		return dev_info->rx_queue[rx_queue_id].event_buf;
283 	} else {
284 		*stats = &rx_adapter->stats;
285 		return &rx_adapter->event_enqueue_buffer;
286 	}
287 }
288 
/* Log an error and return retval from the enclosing function if id fails
 * rxa_validate_id(). Must be used inside a function whose return type is
 * compatible with retval.
 */
#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
	if (!rxa_validate_id(id)) { \
		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
		return retval; \
	} \
} while (0)
295 
296 static inline int
297 rxa_sw_adapter_queue_count(struct event_eth_rx_adapter *rx_adapter)
298 {
299 	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
300 }
301 
/* Greatest common divisor of a and b (Euclid's algorithm).
 *
 * Iterative so that stack usage does not depend on the operands. A zero
 * operand yields the other operand (gcd(a, 0) == a, gcd(0, b) == b), which
 * avoids a division by zero when b is 0.
 */
static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
{
	while (b != 0) {
		uint16_t r = a % b;

		a = b;
		b = r;
	}

	return a;
}
309 
310 /* Returns the next queue in the polling sequence
311  *
312  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
313  */
314 static int
315 rxa_wrr_next(struct event_eth_rx_adapter *rx_adapter, unsigned int n, int *cw,
316 	     struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
317 	     uint16_t gcd, int prev)
318 {
319 	int i = prev;
320 	uint16_t w;
321 
322 	while (1) {
323 		uint16_t q;
324 		uint16_t d;
325 
326 		i = (i + 1) % n;
327 		if (i == 0) {
328 			*cw = *cw - gcd;
329 			if (*cw <= 0)
330 				*cw = max_wt;
331 		}
332 
333 		q = eth_rx_poll[i].eth_rx_qid;
334 		d = eth_rx_poll[i].eth_dev_id;
335 		w = rx_adapter->eth_devices[d].rx_queue[q].wt;
336 
337 		if ((int)w >= *cw)
338 			return i;
339 	}
340 }
341 
342 static inline int
343 rxa_shared_intr(struct eth_device_info *dev_info,
344 	int rx_queue_id)
345 {
346 	int multi_intr_cap;
347 
348 	if (dev_info->dev->intr_handle == NULL)
349 		return 0;
350 
351 	multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
352 	return !multi_intr_cap ||
353 		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
354 }
355 
356 static inline int
357 rxa_intr_queue(struct eth_device_info *dev_info,
358 	int rx_queue_id)
359 {
360 	struct eth_rx_queue_info *queue_info;
361 
362 	queue_info = &dev_info->rx_queue[rx_queue_id];
363 	return dev_info->rx_queue &&
364 		!dev_info->internal_event_port &&
365 		queue_info->queue_enabled && queue_info->wt == 0;
366 }
367 
368 static inline int
369 rxa_polled_queue(struct eth_device_info *dev_info,
370 	int rx_queue_id)
371 {
372 	struct eth_rx_queue_info *queue_info;
373 
374 	queue_info = &dev_info->rx_queue[rx_queue_id];
375 	return !dev_info->internal_event_port &&
376 		dev_info->rx_queue &&
377 		queue_info->queue_enabled && queue_info->wt != 0;
378 }
379 
380 /* Calculate change in number of vectors after Rx queue ID is add/deleted */
381 static int
382 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
383 {
384 	uint16_t i;
385 	int n, s;
386 	uint16_t nbq;
387 
388 	nbq = dev_info->dev->data->nb_rx_queues;
389 	n = 0; /* non shared count */
390 	s = 0; /* shared count */
391 
392 	if (rx_queue_id == -1) {
393 		for (i = 0; i < nbq; i++) {
394 			if (!rxa_shared_intr(dev_info, i))
395 				n += add ? !rxa_intr_queue(dev_info, i) :
396 					rxa_intr_queue(dev_info, i);
397 			else
398 				s += add ? !rxa_intr_queue(dev_info, i) :
399 					rxa_intr_queue(dev_info, i);
400 		}
401 
402 		if (s > 0) {
403 			if ((add && dev_info->nb_shared_intr == 0) ||
404 				(!add && dev_info->nb_shared_intr))
405 				n += 1;
406 		}
407 	} else {
408 		if (!rxa_shared_intr(dev_info, rx_queue_id))
409 			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
410 				rxa_intr_queue(dev_info, rx_queue_id);
411 		else
412 			n = add ? !dev_info->nb_shared_intr :
413 				dev_info->nb_shared_intr == 1;
414 	}
415 
416 	return add ? n : -n;
417 }
418 
419 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
420  */
421 static void
422 rxa_calc_nb_post_intr_del(struct event_eth_rx_adapter *rx_adapter,
423 			  struct eth_device_info *dev_info, int rx_queue_id,
424 			  uint32_t *nb_rx_intr)
425 {
426 	uint32_t intr_diff;
427 
428 	if (rx_queue_id == -1)
429 		intr_diff = dev_info->nb_rx_intr;
430 	else
431 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
432 
433 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
434 }
435 
436 /* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
437  * interrupt queues could currently be poll mode Rx queues
438  */
439 static void
440 rxa_calc_nb_post_add_intr(struct event_eth_rx_adapter *rx_adapter,
441 			  struct eth_device_info *dev_info, int rx_queue_id,
442 			  uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
443 			  uint32_t *nb_wrr)
444 {
445 	uint32_t intr_diff;
446 	uint32_t poll_diff;
447 	uint32_t wrr_len_diff;
448 
449 	if (rx_queue_id == -1) {
450 		intr_diff = dev_info->dev->data->nb_rx_queues -
451 						dev_info->nb_rx_intr;
452 		poll_diff = dev_info->nb_rx_poll;
453 		wrr_len_diff = dev_info->wrr_len;
454 	} else {
455 		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
456 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
457 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
458 					0;
459 	}
460 
461 	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
462 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
463 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
464 }
465 
466 /* Calculate size of the eth_rx_poll and wrr_sched arrays
467  * after deleting poll mode rx queues
468  */
469 static void
470 rxa_calc_nb_post_poll_del(struct event_eth_rx_adapter *rx_adapter,
471 			  struct eth_device_info *dev_info, int rx_queue_id,
472 			  uint32_t *nb_rx_poll, uint32_t *nb_wrr)
473 {
474 	uint32_t poll_diff;
475 	uint32_t wrr_len_diff;
476 
477 	if (rx_queue_id == -1) {
478 		poll_diff = dev_info->nb_rx_poll;
479 		wrr_len_diff = dev_info->wrr_len;
480 	} else {
481 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
482 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
483 					0;
484 	}
485 
486 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
487 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
488 }
489 
490 /* Calculate nb_rx_* after adding poll mode rx queues
491  */
492 static void
493 rxa_calc_nb_post_add_poll(struct event_eth_rx_adapter *rx_adapter,
494 			  struct eth_device_info *dev_info, int rx_queue_id,
495 			  uint16_t wt, uint32_t *nb_rx_poll,
496 			  uint32_t *nb_rx_intr, uint32_t *nb_wrr)
497 {
498 	uint32_t intr_diff;
499 	uint32_t poll_diff;
500 	uint32_t wrr_len_diff;
501 
502 	if (rx_queue_id == -1) {
503 		intr_diff = dev_info->nb_rx_intr;
504 		poll_diff = dev_info->dev->data->nb_rx_queues -
505 						dev_info->nb_rx_poll;
506 		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
507 				- dev_info->wrr_len;
508 	} else {
509 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
510 		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
511 		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
512 				wt - dev_info->rx_queue[rx_queue_id].wt :
513 				wt;
514 	}
515 
516 	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
517 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
518 	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
519 }
520 
/* Calculate nb_rx_* after adding rx_queue_id; a zero weight adds the queue
 * in interrupt mode, any other weight in poll mode
 */
static void
rxa_calc_nb_post_add(struct event_eth_rx_adapter *rx_adapter,
		     struct eth_device_info *dev_info, int rx_queue_id,
		     uint16_t wt, uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
		     uint32_t *nb_wrr)
{
	if (wt == 0) {
		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
					nb_rx_poll, nb_rx_intr, nb_wrr);
		return;
	}

	rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
				wt, nb_rx_poll, nb_rx_intr, nb_wrr);
}
535 
/* Calculate nb_rx_* after deleting rx_queue_id: the poll side yields the
 * polled queue count and WRR length, the interrupt side the interrupt
 * queue count
 */
static void
rxa_calc_nb_post_del(struct event_eth_rx_adapter *rx_adapter,
		     struct eth_device_info *dev_info, int rx_queue_id,
		     uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
		     uint32_t *nb_wrr)
{
	/* Poll mode counters */
	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id,
				nb_rx_poll, nb_wrr);

	/* Interrupt mode counter */
	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
				nb_rx_intr);
}
548 
549 /*
550  * Allocate the rx_poll array
551  */
552 static struct eth_rx_poll_entry *
553 rxa_alloc_poll(struct event_eth_rx_adapter *rx_adapter, uint32_t num_rx_polled)
554 {
555 	size_t len;
556 
557 	len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
558 							RTE_CACHE_LINE_SIZE);
559 	return  rte_zmalloc_socket(rx_adapter->mem_name,
560 				len,
561 				RTE_CACHE_LINE_SIZE,
562 				rx_adapter->socket_id);
563 }
564 
565 /*
566  * Allocate the WRR array
567  */
568 static uint32_t *
569 rxa_alloc_wrr(struct event_eth_rx_adapter *rx_adapter, int nb_wrr)
570 {
571 	size_t len;
572 
573 	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
574 			RTE_CACHE_LINE_SIZE);
575 	return  rte_zmalloc_socket(rx_adapter->mem_name,
576 				len,
577 				RTE_CACHE_LINE_SIZE,
578 				rx_adapter->socket_id);
579 }
580 
/* Allocate the rx_poll and WRR schedule arrays for nb_poll polled queues
 * and nb_wrr schedule slots.
 *
 * Returns 0 on success, with both arrays returned through *rx_poll and
 * *wrr_sched (both NULL if nb_poll is 0), or -ENOMEM on allocation failure.
 * On failure both output pointers are NULL, so the caller may pass them to
 * rte_free() in its cleanup path without freeing anything twice.
 */
static int
rxa_alloc_poll_arrays(struct event_eth_rx_adapter *rx_adapter, uint32_t nb_poll,
		      uint32_t nb_wrr, struct eth_rx_poll_entry **rx_poll,
		      uint32_t **wrr_sched)
{
	*rx_poll = NULL;
	*wrr_sched = NULL;

	/* Nothing to poll, no arrays needed */
	if (nb_poll == 0)
		return 0;

	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
	if (*rx_poll == NULL)
		return -ENOMEM;

	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
	if (*wrr_sched == NULL) {
		rte_free(*rx_poll);
		/* Do not hand a dangling pointer back to the caller */
		*rx_poll = NULL;
		return -ENOMEM;
	}

	return 0;
}
606 
607 /* Precalculate WRR polling sequence for all queues in rx_adapter */
608 static void
609 rxa_calc_wrr_sequence(struct event_eth_rx_adapter *rx_adapter,
610 		      struct eth_rx_poll_entry *rx_poll, uint32_t *rx_wrr)
611 {
612 	uint16_t d;
613 	uint16_t q;
614 	unsigned int i;
615 	int prev = -1;
616 	int cw = -1;
617 
618 	/* Initialize variables for calculation of wrr schedule */
619 	uint16_t max_wrr_pos = 0;
620 	unsigned int poll_q = 0;
621 	uint16_t max_wt = 0;
622 	uint16_t gcd = 0;
623 
624 	if (rx_poll == NULL)
625 		return;
626 
627 	/* Generate array of all queues to poll, the size of this
628 	 * array is poll_q
629 	 */
630 	RTE_ETH_FOREACH_DEV(d) {
631 		uint16_t nb_rx_queues;
632 		struct eth_device_info *dev_info =
633 				&rx_adapter->eth_devices[d];
634 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
635 		if (dev_info->rx_queue == NULL)
636 			continue;
637 		if (dev_info->internal_event_port)
638 			continue;
639 		dev_info->wrr_len = 0;
640 		for (q = 0; q < nb_rx_queues; q++) {
641 			struct eth_rx_queue_info *queue_info =
642 				&dev_info->rx_queue[q];
643 			uint16_t wt;
644 
645 			if (!rxa_polled_queue(dev_info, q))
646 				continue;
647 			wt = queue_info->wt;
648 			rx_poll[poll_q].eth_dev_id = d;
649 			rx_poll[poll_q].eth_rx_qid = q;
650 			max_wrr_pos += wt;
651 			dev_info->wrr_len += wt;
652 			max_wt = RTE_MAX(max_wt, wt);
653 			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
654 			poll_q++;
655 		}
656 	}
657 
658 	/* Generate polling sequence based on weights */
659 	prev = -1;
660 	cw = -1;
661 	for (i = 0; i < max_wrr_pos; i++) {
662 		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
663 				     rx_poll, max_wt, gcd, prev);
664 		prev = rx_wrr[i];
665 	}
666 }
667 
668 static inline void
669 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
670 	struct rte_ipv6_hdr **ipv6_hdr)
671 {
672 	struct rte_ether_hdr *eth_hdr =
673 		rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
674 	struct rte_vlan_hdr *vlan_hdr;
675 
676 	*ipv4_hdr = NULL;
677 	*ipv6_hdr = NULL;
678 
679 	switch (eth_hdr->ether_type) {
680 	case RTE_BE16(RTE_ETHER_TYPE_IPV4):
681 		*ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
682 		break;
683 
684 	case RTE_BE16(RTE_ETHER_TYPE_IPV6):
685 		*ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
686 		break;
687 
688 	case RTE_BE16(RTE_ETHER_TYPE_VLAN):
689 		vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
690 		switch (vlan_hdr->eth_proto) {
691 		case RTE_BE16(RTE_ETHER_TYPE_IPV4):
692 			*ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
693 			break;
694 		case RTE_BE16(RTE_ETHER_TYPE_IPV6):
695 			*ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
696 			break;
697 		default:
698 			break;
699 		}
700 		break;
701 
702 	default:
703 		break;
704 	}
705 }
706 
707 /* Calculate RSS hash for IPv4/6 */
708 static inline uint32_t
709 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
710 {
711 	uint32_t input_len;
712 	void *tuple;
713 	struct rte_ipv4_tuple ipv4_tuple;
714 	struct rte_ipv6_tuple ipv6_tuple;
715 	struct rte_ipv4_hdr *ipv4_hdr;
716 	struct rte_ipv6_hdr *ipv6_hdr;
717 
718 	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
719 
720 	if (ipv4_hdr) {
721 		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
722 		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
723 		tuple = &ipv4_tuple;
724 		input_len = RTE_THASH_V4_L3_LEN;
725 	} else if (ipv6_hdr) {
726 		rte_thash_load_v6_addrs(ipv6_hdr,
727 					(union rte_thash_tuple *)&ipv6_tuple);
728 		tuple = &ipv6_tuple;
729 		input_len = RTE_THASH_V6_L3_LEN;
730 	} else
731 		return 0;
732 
733 	return rte_softrss_be(tuple, input_len, rss_key_be);
734 }
735 
736 static inline int
737 rxa_enq_blocked(struct event_eth_rx_adapter *rx_adapter)
738 {
739 	return !!rx_adapter->enq_block_count;
740 }
741 
742 static inline void
743 rxa_enq_block_start_ts(struct event_eth_rx_adapter *rx_adapter)
744 {
745 	if (rx_adapter->rx_enq_block_start_ts)
746 		return;
747 
748 	rx_adapter->enq_block_count++;
749 	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
750 		return;
751 
752 	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
753 }
754 
755 static inline void
756 rxa_enq_block_end_ts(struct event_eth_rx_adapter *rx_adapter,
757 		     struct rte_event_eth_rx_adapter_stats *stats)
758 {
759 	if (unlikely(!stats->rx_enq_start_ts))
760 		stats->rx_enq_start_ts = rte_get_tsc_cycles();
761 
762 	if (likely(!rxa_enq_blocked(rx_adapter)))
763 		return;
764 
765 	rx_adapter->enq_block_count = 0;
766 	if (rx_adapter->rx_enq_block_start_ts) {
767 		stats->rx_enq_end_ts = rte_get_tsc_cycles();
768 		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
769 		    rx_adapter->rx_enq_block_start_ts;
770 		rx_adapter->rx_enq_block_start_ts = 0;
771 	}
772 }
773 
774 /* Enqueue buffered events to event device */
775 static inline uint16_t
776 rxa_flush_event_buffer(struct event_eth_rx_adapter *rx_adapter,
777 		       struct eth_event_enqueue_buffer *buf,
778 		       struct rte_event_eth_rx_adapter_stats *stats)
779 {
780 	uint16_t count = buf->count;
781 	uint16_t n = 0;
782 
783 	if (!count)
784 		return 0;
785 
786 	if (buf->last)
787 		count = buf->last - buf->head;
788 
789 	if (count) {
790 		n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
791 						rx_adapter->event_port_id,
792 						&buf->events[buf->head],
793 						count);
794 		if (n != count)
795 			stats->rx_enq_retry++;
796 
797 		buf->head += n;
798 	}
799 
800 	if (buf->last && n == count) {
801 		uint16_t n1;
802 
803 		n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
804 					rx_adapter->event_port_id,
805 					&buf->events[0],
806 					buf->tail);
807 
808 		if (n1 != buf->tail)
809 			stats->rx_enq_retry++;
810 
811 		buf->last = 0;
812 		buf->head = n1;
813 		buf->last_mask = 0;
814 		n += n1;
815 	}
816 
817 	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
818 		rxa_enq_block_start_ts(rx_adapter);
819 
820 	buf->count -= n;
821 	stats->rx_enq_count += n;
822 
823 	return n;
824 }
825 
826 static inline void
827 rxa_init_vector(struct event_eth_rx_adapter *rx_adapter,
828 		struct eth_rx_vector_data *vec)
829 {
830 	vec->vector_ev->nb_elem = 0;
831 	vec->vector_ev->port = vec->port;
832 	vec->vector_ev->queue = vec->queue;
833 	vec->vector_ev->attr_valid = true;
834 	TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
835 }
836 
837 static inline uint16_t
838 rxa_create_event_vector(struct event_eth_rx_adapter *rx_adapter,
839 			struct eth_rx_queue_info *queue_info,
840 			struct eth_event_enqueue_buffer *buf,
841 			struct rte_mbuf **mbufs, uint16_t num)
842 {
843 	struct rte_event *ev = &buf->events[buf->count];
844 	struct eth_rx_vector_data *vec;
845 	uint16_t filled, space, sz;
846 
847 	filled = 0;
848 	vec = &queue_info->vector_data;
849 
850 	if (vec->vector_ev == NULL) {
851 		if (rte_mempool_get(vec->vector_pool,
852 				    (void **)&vec->vector_ev) < 0) {
853 			rte_pktmbuf_free_bulk(mbufs, num);
854 			return 0;
855 		}
856 		rxa_init_vector(rx_adapter, vec);
857 	}
858 	while (num) {
859 		if (vec->vector_ev->nb_elem == vec->max_vector_count) {
860 			/* Event ready. */
861 			ev->event = vec->event;
862 			ev->vec = vec->vector_ev;
863 			ev++;
864 			filled++;
865 			vec->vector_ev = NULL;
866 			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
867 			if (rte_mempool_get(vec->vector_pool,
868 					    (void **)&vec->vector_ev) < 0) {
869 				rte_pktmbuf_free_bulk(mbufs, num);
870 				return 0;
871 			}
872 			rxa_init_vector(rx_adapter, vec);
873 		}
874 
875 		space = vec->max_vector_count - vec->vector_ev->nb_elem;
876 		sz = num > space ? space : num;
877 		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
878 		       sizeof(void *) * sz);
879 		vec->vector_ev->nb_elem += sz;
880 		num -= sz;
881 		mbufs += sz;
882 		vec->ts = rte_rdtsc();
883 	}
884 
885 	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
886 		ev->event = vec->event;
887 		ev->vec = vec->vector_ev;
888 		ev++;
889 		filled++;
890 		vec->vector_ev = NULL;
891 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
892 	}
893 
894 	return filled;
895 }
896 
/* Convert a burst of num received mbufs into events at the tail of buf.
 *
 * Without vectorization one event is generated per mbuf; with it the mbufs
 * are handed to rxa_create_event_vector() and num becomes the number of
 * vector events produced. The optional per device Rx callback may then
 * reduce the number of events that are kept. Finally buf->count and
 * buf->tail are advanced by the number of events kept.
 *
 * num must be non-zero, mbufs[0] is read unconditionally.
 */
static inline void
rxa_buffer_mbufs(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
		 uint16_t rx_queue_id, struct rte_mbuf **mbufs, uint16_t num,
		 struct eth_event_enqueue_buffer *buf,
		 struct rte_event_eth_rx_adapter_stats *stats)
{
	uint32_t i;
	struct eth_device_info *dev_info =
					&rx_adapter->eth_devices[eth_dev_id];
	struct eth_rx_queue_info *eth_rx_queue_info =
					&dev_info->rx_queue[rx_queue_id];
	uint16_t new_tail = buf->tail;
	uint64_t event = eth_rx_queue_info->event;
	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
	struct rte_mbuf *m = mbufs[0];
	uint32_t rss_mask;
	uint32_t rss;
	int do_rss;
	uint16_t nb_cb;
	uint16_t dropped;
	uint64_t ts, ts_mask;

	if (!eth_rx_queue_info->ena_vector) {
		/* The flags of the first mbuf decide for the whole burst:
		 * a software timestamp is only taken if the mbuf does not
		 * already carry one
		 */
		ts = m->ol_flags & event_eth_rx_timestamp_dynflag ?
						0 : rte_get_tsc_cycles();

		/* 0xffff ffff ffff ffff if RTE_MBUF_F_RX_TIMESTAMP is set,
		 * otherwise 0
		 */
		ts_mask = (uint64_t)(!(m->ol_flags &
				       event_eth_rx_timestamp_dynflag)) - 1ULL;

		/* 0xffff ffff if RTE_MBUF_F_RX_RSS_HASH is set, otherwise 0 */
		rss_mask = ~(((m->ol_flags & RTE_MBUF_F_RX_RSS_HASH) != 0) - 1);
		/* Software RSS is needed only if the mbuf has no hash and
		 * the application did not provide a flow id
		 */
		do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
		for (i = 0; i < num; i++) {
			struct rte_event *ev;

			m = mbufs[i];
			/* Keep the existing timestamp (ts is 0) or store
			 * the software one (ts_mask is 0)
			 */
			*rxa_timestamp_dynfield(m) = ts |
					(*rxa_timestamp_dynfield(m) & ts_mask);

			ev = &buf->events[new_tail];

			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
				     : m->hash.rss;
			ev->event = event;
			/* Flow id from the queue's event word if the
			 * application provided one, else from the hash
			 */
			ev->flow_id = (rss & ~flow_id_mask) |
				      (ev->flow_id & flow_id_mask);
			ev->mbuf = m;
			new_tail++;
		}
	} else {
		/* num is now the number of vector events generated */
		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
					      buf, mbufs, num);
	}

	if (num && dev_info->cb_fn) {

		dropped = 0;
		/* The callback is given the events just written at the
		 * buffer tail and returns how many of them to keep
		 */
		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
				       buf->last |
				       (buf->events_size & ~buf->last_mask),
				       buf->count >= BATCH_SIZE ?
						buf->count - BATCH_SIZE : 0,
				       &buf->events[buf->tail],
				       num,
				       dev_info->cb_arg,
				       &dropped);
		/* A count larger than what was offered is ignored */
		if (unlikely(nb_cb > num))
			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
				nb_cb, num);
		else
			num = nb_cb;
		if (dropped)
			stats->rx_dropped += dropped;
	}

	buf->count += num;
	buf->tail += num;
}
978 
979 static inline bool
980 rxa_pkt_buf_available(struct eth_event_enqueue_buffer *buf)
981 {
982 	uint32_t nb_req = buf->tail + BATCH_SIZE;
983 
984 	if (!buf->last) {
985 		if (nb_req <= buf->events_size)
986 			return true;
987 
988 		if (buf->head >= BATCH_SIZE) {
989 			buf->last_mask = ~0;
990 			buf->last = buf->tail;
991 			buf->tail = 0;
992 			return true;
993 		}
994 	}
995 
996 	return nb_req <= buf->head;
997 }
998 
999 /* Enqueue packets from  <port, q>  to event buffer */
1000 static inline uint32_t
1001 rxa_eth_rx(struct event_eth_rx_adapter *rx_adapter, uint16_t port_id,
1002 	   uint16_t queue_id, uint32_t rx_count, uint32_t max_rx,
1003 	   int *rxq_empty, struct eth_event_enqueue_buffer *buf,
1004 	   struct rte_event_eth_rx_adapter_stats *stats)
1005 {
1006 	struct rte_mbuf *mbufs[BATCH_SIZE];
1007 	uint16_t n;
1008 	uint32_t nb_rx = 0;
1009 	uint32_t nb_flushed = 0;
1010 
1011 	if (rxq_empty)
1012 		*rxq_empty = 0;
1013 	/* Don't do a batch dequeue from the rx queue if there isn't
1014 	 * enough space in the enqueue buffer.
1015 	 */
1016 	while (rxa_pkt_buf_available(buf)) {
1017 		if (buf->count >= BATCH_SIZE)
1018 			nb_flushed +=
1019 				rxa_flush_event_buffer(rx_adapter, buf, stats);
1020 
1021 		stats->rx_poll_count++;
1022 		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
1023 		if (unlikely(!n)) {
1024 			if (rxq_empty)
1025 				*rxq_empty = 1;
1026 			break;
1027 		}
1028 		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n, buf,
1029 				 stats);
1030 		nb_rx += n;
1031 		if (rx_count + nb_rx > max_rx)
1032 			break;
1033 	}
1034 
1035 	if (buf->count > 0)
1036 		nb_flushed += rxa_flush_event_buffer(rx_adapter, buf, stats);
1037 
1038 	stats->rx_packets += nb_rx;
1039 	if (nb_flushed == 0)
1040 		rte_event_maintain(rx_adapter->eventdev_id,
1041 				   rx_adapter->event_port_id, 0);
1042 
1043 	return nb_rx;
1044 }
1045 
1046 static inline void
1047 rxa_intr_ring_enqueue(struct event_eth_rx_adapter *rx_adapter, void *data)
1048 {
1049 	uint16_t port_id;
1050 	uint16_t queue;
1051 	int err;
1052 	union queue_data qd;
1053 	struct eth_device_info *dev_info;
1054 	struct eth_rx_queue_info *queue_info;
1055 	int *intr_enabled;
1056 
1057 	qd.ptr = data;
1058 	port_id = qd.port;
1059 	queue = qd.queue;
1060 
1061 	dev_info = &rx_adapter->eth_devices[port_id];
1062 	queue_info = &dev_info->rx_queue[queue];
1063 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1064 	if (rxa_shared_intr(dev_info, queue))
1065 		intr_enabled = &dev_info->shared_intr_enabled;
1066 	else
1067 		intr_enabled = &queue_info->intr_enabled;
1068 
1069 	if (*intr_enabled) {
1070 		*intr_enabled = 0;
1071 		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
1072 		/* Entry should always be available.
1073 		 * The ring size equals the maximum number of interrupt
1074 		 * vectors supported (an interrupt vector is shared in
1075 		 * case of shared interrupts)
1076 		 */
1077 		if (err)
1078 			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
1079 				" to ring: %s", strerror(-err));
1080 		else
1081 			rte_eth_dev_rx_intr_disable(port_id, queue);
1082 	}
1083 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1084 }
1085 
1086 static int
1087 rxa_intr_ring_check_avail(struct event_eth_rx_adapter *rx_adapter,
1088 			  uint32_t num_intr_vec)
1089 {
1090 	if (rx_adapter->num_intr_vec + num_intr_vec >
1091 				RTE_EVENT_ETH_INTR_RING_SIZE) {
1092 		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1093 		" %d needed %d limit %d", rx_adapter->num_intr_vec,
1094 		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1095 		return -ENOSPC;
1096 	}
1097 
1098 	return 0;
1099 }
1100 
1101 /* Delete entries for (dev, queue) from the interrupt ring */
1102 static void
1103 rxa_intr_ring_del_entries(struct event_eth_rx_adapter *rx_adapter,
1104 			  struct eth_device_info *dev_info,
1105 			  uint16_t rx_queue_id)
1106 {
1107 	int i, n;
1108 	union queue_data qd;
1109 
1110 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1111 
1112 	n = rte_ring_count(rx_adapter->intr_ring);
1113 	for (i = 0; i < n; i++) {
1114 		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1115 		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1116 			if (qd.port == dev_info->dev->data->port_id &&
1117 				qd.queue == rx_queue_id)
1118 				continue;
1119 		} else {
1120 			if (qd.port == dev_info->dev->data->port_id)
1121 				continue;
1122 		}
1123 		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1124 	}
1125 
1126 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1127 }
1128 
1129 /* pthread callback handling interrupt mode receive queues
1130  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1131  * interrupting queue to the adapter's ring buffer for interrupt events.
1132  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1133  * the adapter service function.
1134  */
1135 static void *
1136 rxa_intr_thread(void *arg)
1137 {
1138 	struct event_eth_rx_adapter *rx_adapter = arg;
1139 	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1140 	int n, i;
1141 
1142 	while (1) {
1143 		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1144 				RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1145 		if (unlikely(n < 0))
1146 			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1147 					n);
1148 		for (i = 0; i < n; i++) {
1149 			rxa_intr_ring_enqueue(rx_adapter,
1150 					epoll_events[i].epdata.data);
1151 		}
1152 	}
1153 
1154 	return NULL;
1155 }
1156 
1157 /* Dequeue <port, q> from interrupt ring and enqueue received
1158  * mbufs to eventdev
1159  */
1160 static inline void
1161 rxa_intr_ring_dequeue(struct event_eth_rx_adapter *rx_adapter)
1162 {
1163 	uint32_t n;
1164 	uint32_t nb_rx = 0;
1165 	int rxq_empty;
1166 	struct eth_event_enqueue_buffer *buf;
1167 	struct rte_event_eth_rx_adapter_stats *stats;
1168 	rte_spinlock_t *ring_lock;
1169 	uint8_t max_done = 0;
1170 
1171 	if (rx_adapter->num_rx_intr == 0)
1172 		return;
1173 
1174 	if (rte_ring_count(rx_adapter->intr_ring) == 0
1175 		&& !rx_adapter->qd_valid)
1176 		return;
1177 
1178 	buf = &rx_adapter->event_enqueue_buffer;
1179 	stats = &rx_adapter->stats;
1180 	ring_lock = &rx_adapter->intr_ring_lock;
1181 
1182 	if (buf->count >= BATCH_SIZE)
1183 		rxa_flush_event_buffer(rx_adapter, buf, stats);
1184 
1185 	while (rxa_pkt_buf_available(buf)) {
1186 		struct eth_device_info *dev_info;
1187 		uint16_t port;
1188 		uint16_t queue;
1189 		union queue_data qd  = rx_adapter->qd;
1190 		int err;
1191 
1192 		if (!rx_adapter->qd_valid) {
1193 			struct eth_rx_queue_info *queue_info;
1194 
1195 			rte_spinlock_lock(ring_lock);
1196 			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1197 			if (err) {
1198 				rte_spinlock_unlock(ring_lock);
1199 				break;
1200 			}
1201 
1202 			port = qd.port;
1203 			queue = qd.queue;
1204 			rx_adapter->qd = qd;
1205 			rx_adapter->qd_valid = 1;
1206 			dev_info = &rx_adapter->eth_devices[port];
1207 			if (rxa_shared_intr(dev_info, queue))
1208 				dev_info->shared_intr_enabled = 1;
1209 			else {
1210 				queue_info = &dev_info->rx_queue[queue];
1211 				queue_info->intr_enabled = 1;
1212 			}
1213 			rte_eth_dev_rx_intr_enable(port, queue);
1214 			rte_spinlock_unlock(ring_lock);
1215 		} else {
1216 			port = qd.port;
1217 			queue = qd.queue;
1218 
1219 			dev_info = &rx_adapter->eth_devices[port];
1220 		}
1221 
1222 		if (rxa_shared_intr(dev_info, queue)) {
1223 			uint16_t i;
1224 			uint16_t nb_queues;
1225 
1226 			nb_queues = dev_info->dev->data->nb_rx_queues;
1227 			n = 0;
1228 			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1229 				uint8_t enq_buffer_full;
1230 
1231 				if (!rxa_intr_queue(dev_info, i))
1232 					continue;
1233 				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1234 					rx_adapter->max_nb_rx,
1235 					&rxq_empty, buf, stats);
1236 				nb_rx += n;
1237 
1238 				enq_buffer_full = !rxq_empty && n == 0;
1239 				max_done = nb_rx > rx_adapter->max_nb_rx;
1240 
1241 				if (enq_buffer_full || max_done) {
1242 					dev_info->next_q_idx = i;
1243 					goto done;
1244 				}
1245 			}
1246 
1247 			rx_adapter->qd_valid = 0;
1248 
1249 			/* Reinitialize for next interrupt */
1250 			dev_info->next_q_idx = dev_info->multi_intr_cap ?
1251 						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1252 						0;
1253 		} else {
1254 			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1255 				rx_adapter->max_nb_rx,
1256 				&rxq_empty, buf, stats);
1257 			rx_adapter->qd_valid = !rxq_empty;
1258 			nb_rx += n;
1259 			if (nb_rx > rx_adapter->max_nb_rx)
1260 				break;
1261 		}
1262 	}
1263 
1264 done:
1265 	rx_adapter->stats.rx_intr_packets += nb_rx;
1266 }
1267 
1268 /*
1269  * Polls receive queues added to the event adapter and enqueues received
1270  * packets to the event device.
1271  *
1272  * The receive code enqueues initially to a temporary buffer, the
1273  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
1274  *
1275  * If there isn't space available in the temporary buffer, packets from the
1276  * Rx queue aren't dequeued from the eth device, this back pressures the
1277  * eth device, in virtual device environments this back pressure is relayed to
1278  * the hypervisor's switching layer where adjustments can be made to deal with
1279  * it.
1280  */
1281 static inline void
1282 rxa_poll(struct event_eth_rx_adapter *rx_adapter)
1283 {
1284 	uint32_t num_queue;
1285 	uint32_t nb_rx = 0;
1286 	struct eth_event_enqueue_buffer *buf = NULL;
1287 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1288 	uint32_t wrr_pos;
1289 	uint32_t max_nb_rx;
1290 
1291 	wrr_pos = rx_adapter->wrr_pos;
1292 	max_nb_rx = rx_adapter->max_nb_rx;
1293 
1294 	/* Iterate through a WRR sequence */
1295 	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1296 		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1297 		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1298 		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1299 
1300 		buf = rxa_event_buf_get(rx_adapter, d, qid, &stats);
1301 
1302 		/* Don't do a batch dequeue from the rx queue if there isn't
1303 		 * enough space in the enqueue buffer.
1304 		 */
1305 		if (buf->count >= BATCH_SIZE)
1306 			rxa_flush_event_buffer(rx_adapter, buf, stats);
1307 		if (!rxa_pkt_buf_available(buf)) {
1308 			if (rx_adapter->use_queue_event_buf)
1309 				goto poll_next_entry;
1310 			else {
1311 				rx_adapter->wrr_pos = wrr_pos;
1312 				return;
1313 			}
1314 		}
1315 
1316 		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1317 				NULL, buf, stats);
1318 		if (nb_rx > max_nb_rx) {
1319 			rx_adapter->wrr_pos =
1320 				    (wrr_pos + 1) % rx_adapter->wrr_len;
1321 			break;
1322 		}
1323 
1324 poll_next_entry:
1325 		if (++wrr_pos == rx_adapter->wrr_len)
1326 			wrr_pos = 0;
1327 	}
1328 }
1329 
1330 static void
1331 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1332 {
1333 	struct event_eth_rx_adapter *rx_adapter = arg;
1334 	struct eth_event_enqueue_buffer *buf = NULL;
1335 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1336 	struct rte_event *ev;
1337 
1338 	buf = rxa_event_buf_get(rx_adapter, vec->port, vec->queue, &stats);
1339 
1340 	if (buf->count)
1341 		rxa_flush_event_buffer(rx_adapter, buf, stats);
1342 
1343 	if (vec->vector_ev->nb_elem == 0)
1344 		return;
1345 	ev = &buf->events[buf->count];
1346 
1347 	/* Event ready. */
1348 	ev->event = vec->event;
1349 	ev->vec = vec->vector_ev;
1350 	buf->count++;
1351 
1352 	vec->vector_ev = NULL;
1353 	vec->ts = 0;
1354 }
1355 
1356 static int
1357 rxa_service_func(void *args)
1358 {
1359 	struct event_eth_rx_adapter *rx_adapter = args;
1360 
1361 	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1362 		return 0;
1363 	if (!rx_adapter->rxa_started) {
1364 		rte_spinlock_unlock(&rx_adapter->rx_lock);
1365 		return 0;
1366 	}
1367 
1368 	if (rx_adapter->ena_vector) {
1369 		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1370 		    rx_adapter->vector_tmo_ticks) {
1371 			struct eth_rx_vector_data *vec;
1372 
1373 			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1374 				uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1375 
1376 				if (elapsed_time >= vec->vector_timeout_ticks) {
1377 					rxa_vector_expire(vec, rx_adapter);
1378 					TAILQ_REMOVE(&rx_adapter->vector_list,
1379 						     vec, next);
1380 				}
1381 			}
1382 			rx_adapter->prev_expiry_ts = rte_rdtsc();
1383 		}
1384 	}
1385 
1386 	rxa_intr_ring_dequeue(rx_adapter);
1387 	rxa_poll(rx_adapter);
1388 
1389 	rte_spinlock_unlock(&rx_adapter->rx_lock);
1390 
1391 	return 0;
1392 }
1393 
1394 static int
1395 rte_event_eth_rx_adapter_init(void)
1396 {
1397 	const char *name = RXA_ADAPTER_ARRAY;
1398 	const struct rte_memzone *mz;
1399 	unsigned int sz;
1400 
1401 	sz = sizeof(*event_eth_rx_adapter) *
1402 	    RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1403 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1404 
1405 	mz = rte_memzone_lookup(name);
1406 	if (mz == NULL) {
1407 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1408 						 RTE_CACHE_LINE_SIZE);
1409 		if (mz == NULL) {
1410 			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1411 					PRId32, rte_errno);
1412 			return -rte_errno;
1413 		}
1414 	}
1415 
1416 	event_eth_rx_adapter = mz->addr;
1417 	return 0;
1418 }
1419 
1420 static int
1421 rxa_memzone_lookup(void)
1422 {
1423 	const struct rte_memzone *mz;
1424 
1425 	if (event_eth_rx_adapter == NULL) {
1426 		mz = rte_memzone_lookup(RXA_ADAPTER_ARRAY);
1427 		if (mz == NULL)
1428 			return -ENOMEM;
1429 		event_eth_rx_adapter = mz->addr;
1430 	}
1431 
1432 	return 0;
1433 }
1434 
1435 static inline struct event_eth_rx_adapter *
1436 rxa_id_to_adapter(uint8_t id)
1437 {
1438 	return event_eth_rx_adapter ?
1439 		event_eth_rx_adapter[id] : NULL;
1440 }
1441 
1442 static int
1443 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1444 		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1445 {
1446 	int ret;
1447 	struct rte_eventdev *dev;
1448 	struct rte_event_dev_config dev_conf;
1449 	int started;
1450 	uint8_t port_id;
1451 	struct rte_event_port_conf *port_conf = arg;
1452 	struct event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1453 
1454 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
1455 	dev_conf = dev->data->dev_conf;
1456 
1457 	started = dev->data->dev_started;
1458 	if (started)
1459 		rte_event_dev_stop(dev_id);
1460 	port_id = dev_conf.nb_event_ports;
1461 	dev_conf.nb_event_ports += 1;
1462 	ret = rte_event_dev_configure(dev_id, &dev_conf);
1463 	if (ret) {
1464 		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1465 						dev_id);
1466 		if (started) {
1467 			if (rte_event_dev_start(dev_id))
1468 				return -EIO;
1469 		}
1470 		return ret;
1471 	}
1472 
1473 	ret = rte_event_port_setup(dev_id, port_id, port_conf);
1474 	if (ret) {
1475 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1476 					port_id);
1477 		return ret;
1478 	}
1479 
1480 	conf->event_port_id = port_id;
1481 	conf->max_nb_rx = 128;
1482 	if (started)
1483 		ret = rte_event_dev_start(dev_id);
1484 	rx_adapter->default_cb_arg = 1;
1485 	return ret;
1486 }
1487 
/*
 * Create an epoll instance for Rx interrupt handling.
 *
 * Returns the new file descriptor, or a negative errno value on failure.
 * When LINUX is not defined this always returns -ENOTSUP. The fallback is a
 * plain #else so that a build defining neither LINUX nor BSD no longer runs
 * off the end of a non-void function.
 */
static int
rxa_epoll_create1(void)
{
#if defined(LINUX)
	int fd;

	fd = epoll_create1(EPOLL_CLOEXEC);
	return fd < 0 ? -errno : fd;
#else
	return -ENOTSUP;
#endif
}
1499 
1500 static int
1501 rxa_init_epd(struct event_eth_rx_adapter *rx_adapter)
1502 {
1503 	if (rx_adapter->epd != INIT_FD)
1504 		return 0;
1505 
1506 	rx_adapter->epd = rxa_epoll_create1();
1507 	if (rx_adapter->epd < 0) {
1508 		int err = rx_adapter->epd;
1509 		rx_adapter->epd = INIT_FD;
1510 		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1511 		return err;
1512 	}
1513 
1514 	return 0;
1515 }
1516 
1517 static int
1518 rxa_create_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1519 {
1520 	int err;
1521 	char thread_name[RTE_MAX_THREAD_NAME_LEN];
1522 
1523 	if (rx_adapter->intr_ring)
1524 		return 0;
1525 
1526 	rx_adapter->intr_ring = rte_ring_create("intr_ring",
1527 					RTE_EVENT_ETH_INTR_RING_SIZE,
1528 					rte_socket_id(), 0);
1529 	if (!rx_adapter->intr_ring)
1530 		return -ENOMEM;
1531 
1532 	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1533 					RTE_EVENT_ETH_INTR_RING_SIZE *
1534 					sizeof(struct rte_epoll_event),
1535 					RTE_CACHE_LINE_SIZE,
1536 					rx_adapter->socket_id);
1537 	if (!rx_adapter->epoll_events) {
1538 		err = -ENOMEM;
1539 		goto error;
1540 	}
1541 
1542 	rte_spinlock_init(&rx_adapter->intr_ring_lock);
1543 
1544 	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1545 			"rx-intr-thread-%d", rx_adapter->id);
1546 
1547 	err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1548 				NULL, rxa_intr_thread, rx_adapter);
1549 	if (!err)
1550 		return 0;
1551 
1552 	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1553 	rte_free(rx_adapter->epoll_events);
1554 error:
1555 	rte_ring_free(rx_adapter->intr_ring);
1556 	rx_adapter->intr_ring = NULL;
1557 	rx_adapter->epoll_events = NULL;
1558 	return err;
1559 }
1560 
1561 static int
1562 rxa_destroy_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1563 {
1564 	int err;
1565 
1566 	err = pthread_cancel(rx_adapter->rx_intr_thread);
1567 	if (err)
1568 		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1569 				err);
1570 
1571 	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1572 	if (err)
1573 		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1574 
1575 	rte_free(rx_adapter->epoll_events);
1576 	rte_ring_free(rx_adapter->intr_ring);
1577 	rx_adapter->intr_ring = NULL;
1578 	rx_adapter->epoll_events = NULL;
1579 	return 0;
1580 }
1581 
1582 static int
1583 rxa_free_intr_resources(struct event_eth_rx_adapter *rx_adapter)
1584 {
1585 	int ret;
1586 
1587 	if (rx_adapter->num_rx_intr == 0)
1588 		return 0;
1589 
1590 	ret = rxa_destroy_intr_thread(rx_adapter);
1591 	if (ret)
1592 		return ret;
1593 
1594 	close(rx_adapter->epd);
1595 	rx_adapter->epd = INIT_FD;
1596 
1597 	return ret;
1598 }
1599 
1600 static int
1601 rxa_disable_intr(struct event_eth_rx_adapter *rx_adapter,
1602 		 struct eth_device_info *dev_info, uint16_t rx_queue_id)
1603 {
1604 	int err;
1605 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1606 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1607 
1608 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1609 	if (err) {
1610 		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1611 			rx_queue_id);
1612 		return err;
1613 	}
1614 
1615 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1616 					rx_adapter->epd,
1617 					RTE_INTR_EVENT_DEL,
1618 					0);
1619 	if (err)
1620 		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1621 
1622 	if (sintr)
1623 		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1624 	else
1625 		dev_info->shared_intr_enabled = 0;
1626 	return err;
1627 }
1628 
1629 static int
1630 rxa_del_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1631 		   struct eth_device_info *dev_info, int rx_queue_id)
1632 {
1633 	int err;
1634 	int i;
1635 	int s;
1636 
1637 	if (dev_info->nb_rx_intr == 0)
1638 		return 0;
1639 
1640 	err = 0;
1641 	if (rx_queue_id == -1) {
1642 		s = dev_info->nb_shared_intr;
1643 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1644 			int sintr;
1645 			uint16_t q;
1646 
1647 			q = dev_info->intr_queue[i];
1648 			sintr = rxa_shared_intr(dev_info, q);
1649 			s -= sintr;
1650 
1651 			if (!sintr || s == 0) {
1652 
1653 				err = rxa_disable_intr(rx_adapter, dev_info,
1654 						q);
1655 				if (err)
1656 					return err;
1657 				rxa_intr_ring_del_entries(rx_adapter, dev_info,
1658 							q);
1659 			}
1660 		}
1661 	} else {
1662 		if (!rxa_intr_queue(dev_info, rx_queue_id))
1663 			return 0;
1664 		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1665 				dev_info->nb_shared_intr == 1) {
1666 			err = rxa_disable_intr(rx_adapter, dev_info,
1667 					rx_queue_id);
1668 			if (err)
1669 				return err;
1670 			rxa_intr_ring_del_entries(rx_adapter, dev_info,
1671 						rx_queue_id);
1672 		}
1673 
1674 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1675 			if (dev_info->intr_queue[i] == rx_queue_id) {
1676 				for (; i < dev_info->nb_rx_intr - 1; i++)
1677 					dev_info->intr_queue[i] =
1678 						dev_info->intr_queue[i + 1];
1679 				break;
1680 			}
1681 		}
1682 	}
1683 
1684 	return err;
1685 }
1686 
1687 static int
1688 rxa_config_intr(struct event_eth_rx_adapter *rx_adapter,
1689 		struct eth_device_info *dev_info, uint16_t rx_queue_id)
1690 {
1691 	int err, err1;
1692 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1693 	union queue_data qd;
1694 	int init_fd;
1695 	uint16_t *intr_queue;
1696 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1697 
1698 	if (rxa_intr_queue(dev_info, rx_queue_id))
1699 		return 0;
1700 
1701 	intr_queue = dev_info->intr_queue;
1702 	if (dev_info->intr_queue == NULL) {
1703 		size_t len =
1704 			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1705 		dev_info->intr_queue =
1706 			rte_zmalloc_socket(
1707 				rx_adapter->mem_name,
1708 				len,
1709 				0,
1710 				rx_adapter->socket_id);
1711 		if (dev_info->intr_queue == NULL)
1712 			return -ENOMEM;
1713 	}
1714 
1715 	init_fd = rx_adapter->epd;
1716 	err = rxa_init_epd(rx_adapter);
1717 	if (err)
1718 		goto err_free_queue;
1719 
1720 	qd.port = eth_dev_id;
1721 	qd.queue = rx_queue_id;
1722 
1723 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1724 					rx_adapter->epd,
1725 					RTE_INTR_EVENT_ADD,
1726 					qd.ptr);
1727 	if (err) {
1728 		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1729 			" Rx Queue %u err %d", rx_queue_id, err);
1730 		goto err_del_fd;
1731 	}
1732 
1733 	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1734 	if (err) {
1735 		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1736 				" Rx Queue %u err %d", rx_queue_id, err);
1737 
1738 		goto err_del_event;
1739 	}
1740 
1741 	err = rxa_create_intr_thread(rx_adapter);
1742 	if (!err)  {
1743 		if (sintr)
1744 			dev_info->shared_intr_enabled = 1;
1745 		else
1746 			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1747 		return 0;
1748 	}
1749 
1750 
1751 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1752 	if (err)
1753 		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1754 				" Rx Queue %u err %d", rx_queue_id, err);
1755 err_del_event:
1756 	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1757 					rx_adapter->epd,
1758 					RTE_INTR_EVENT_DEL,
1759 					0);
1760 	if (err1) {
1761 		RTE_EDEV_LOG_ERR("Could not delete event for"
1762 				" Rx Queue %u err %d", rx_queue_id, err1);
1763 	}
1764 err_del_fd:
1765 	if (init_fd == INIT_FD) {
1766 		close(rx_adapter->epd);
1767 		rx_adapter->epd = -1;
1768 	}
1769 err_free_queue:
1770 	if (intr_queue == NULL)
1771 		rte_free(dev_info->intr_queue);
1772 
1773 	return err;
1774 }
1775 
1776 static int
1777 rxa_add_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1778 		   struct eth_device_info *dev_info, int rx_queue_id)
1779 
1780 {
1781 	int i, j, err;
1782 	int si = -1;
1783 	int shared_done = (dev_info->nb_shared_intr > 0);
1784 
1785 	if (rx_queue_id != -1) {
1786 		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1787 			return 0;
1788 		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1789 	}
1790 
1791 	err = 0;
1792 	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1793 
1794 		if (rxa_shared_intr(dev_info, i) && shared_done)
1795 			continue;
1796 
1797 		err = rxa_config_intr(rx_adapter, dev_info, i);
1798 
1799 		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1800 		if (shared_done) {
1801 			si = i;
1802 			dev_info->shared_intr_enabled = 1;
1803 		}
1804 		if (err)
1805 			break;
1806 	}
1807 
1808 	if (err == 0)
1809 		return 0;
1810 
1811 	shared_done = (dev_info->nb_shared_intr > 0);
1812 	for (j = 0; j < i; j++) {
1813 		if (rxa_intr_queue(dev_info, j))
1814 			continue;
1815 		if (rxa_shared_intr(dev_info, j) && si != j)
1816 			continue;
1817 		err = rxa_disable_intr(rx_adapter, dev_info, j);
1818 		if (err)
1819 			break;
1820 
1821 	}
1822 
1823 	return err;
1824 }
1825 
1826 static int
1827 rxa_init_service(struct event_eth_rx_adapter *rx_adapter, uint8_t id)
1828 {
1829 	int ret;
1830 	struct rte_service_spec service;
1831 	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1832 
1833 	if (rx_adapter->service_inited)
1834 		return 0;
1835 
1836 	memset(&service, 0, sizeof(service));
1837 	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1838 		"rte_event_eth_rx_adapter_%d", id);
1839 	service.socket_id = rx_adapter->socket_id;
1840 	service.callback = rxa_service_func;
1841 	service.callback_userdata = rx_adapter;
1842 	/* Service function handles locking for queue add/del updates */
1843 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1844 	ret = rte_service_component_register(&service, &rx_adapter->service_id);
1845 	if (ret) {
1846 		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1847 			service.name, ret);
1848 		return ret;
1849 	}
1850 
1851 	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1852 		&rx_adapter_conf, rx_adapter->conf_arg);
1853 	if (ret) {
1854 		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1855 			ret);
1856 		goto err_done;
1857 	}
1858 	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1859 	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1860 	rx_adapter->service_inited = 1;
1861 	rx_adapter->epd = INIT_FD;
1862 	return 0;
1863 
1864 err_done:
1865 	rte_service_component_unregister(rx_adapter->service_id);
1866 	return ret;
1867 }
1868 
1869 static void
1870 rxa_update_queue(struct event_eth_rx_adapter *rx_adapter,
1871 		 struct eth_device_info *dev_info, int32_t rx_queue_id,
1872 		 uint8_t add)
1873 {
1874 	struct eth_rx_queue_info *queue_info;
1875 	int enabled;
1876 	uint16_t i;
1877 
1878 	if (dev_info->rx_queue == NULL)
1879 		return;
1880 
1881 	if (rx_queue_id == -1) {
1882 		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1883 			rxa_update_queue(rx_adapter, dev_info, i, add);
1884 	} else {
1885 		queue_info = &dev_info->rx_queue[rx_queue_id];
1886 		enabled = queue_info->queue_enabled;
1887 		if (add) {
1888 			rx_adapter->nb_queues += !enabled;
1889 			dev_info->nb_dev_queues += !enabled;
1890 		} else {
1891 			rx_adapter->nb_queues -= enabled;
1892 			dev_info->nb_dev_queues -= enabled;
1893 		}
1894 		queue_info->queue_enabled = !!add;
1895 	}
1896 }
1897 
1898 static void
1899 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1900 		    uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1901 		    uint16_t port_id)
1902 {
1903 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1904 	struct eth_rx_vector_data *vector_data;
1905 	uint32_t flow_id;
1906 
1907 	vector_data = &queue_info->vector_data;
1908 	vector_data->max_vector_count = vector_count;
1909 	vector_data->port = port_id;
1910 	vector_data->queue = qid;
1911 	vector_data->vector_pool = mp;
1912 	vector_data->vector_timeout_ticks =
1913 		NSEC2TICK(vector_ns, rte_get_timer_hz());
1914 	vector_data->ts = 0;
1915 	flow_id = queue_info->event & 0xFFFFF;
1916 	flow_id =
1917 		flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1918 	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1919 }
1920 
1921 static void
1922 rxa_sw_del(struct event_eth_rx_adapter *rx_adapter,
1923 	   struct eth_device_info *dev_info, int32_t rx_queue_id)
1924 {
1925 	struct eth_rx_vector_data *vec;
1926 	int pollq;
1927 	int intrq;
1928 	int sintrq;
1929 
1930 
1931 	if (rx_adapter->nb_queues == 0)
1932 		return;
1933 
1934 	if (rx_queue_id == -1) {
1935 		uint16_t nb_rx_queues;
1936 		uint16_t i;
1937 
1938 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1939 		for (i = 0; i <	nb_rx_queues; i++)
1940 			rxa_sw_del(rx_adapter, dev_info, i);
1941 		return;
1942 	}
1943 
1944 	/* Push all the partial event vectors to event device. */
1945 	TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1946 		if (vec->queue != rx_queue_id)
1947 			continue;
1948 		rxa_vector_expire(vec, rx_adapter);
1949 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
1950 	}
1951 
1952 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
1953 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
1954 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1955 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1956 	rx_adapter->num_rx_polled -= pollq;
1957 	dev_info->nb_rx_poll -= pollq;
1958 	rx_adapter->num_rx_intr -= intrq;
1959 	dev_info->nb_rx_intr -= intrq;
1960 	dev_info->nb_shared_intr -= intrq && sintrq;
1961 	if (rx_adapter->use_queue_event_buf) {
1962 		struct eth_event_enqueue_buffer *event_buf =
1963 			dev_info->rx_queue[rx_queue_id].event_buf;
1964 		struct rte_event_eth_rx_adapter_stats *stats =
1965 			dev_info->rx_queue[rx_queue_id].stats;
1966 		rte_free(event_buf->events);
1967 		rte_free(event_buf);
1968 		rte_free(stats);
1969 		dev_info->rx_queue[rx_queue_id].event_buf = NULL;
1970 		dev_info->rx_queue[rx_queue_id].stats = NULL;
1971 	}
1972 }
1973 
1974 static int
1975 rxa_add_queue(struct event_eth_rx_adapter *rx_adapter,
1976 	      struct eth_device_info *dev_info, int32_t rx_queue_id,
1977 	      const struct rte_event_eth_rx_adapter_queue_conf *conf)
1978 {
1979 	struct eth_rx_queue_info *queue_info;
1980 	const struct rte_event *ev = &conf->ev;
1981 	int pollq;
1982 	int intrq;
1983 	int sintrq;
1984 	struct rte_event *qi_ev;
1985 	struct eth_event_enqueue_buffer *new_rx_buf = NULL;
1986 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1987 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1988 	int ret;
1989 
1990 	if (rx_queue_id == -1) {
1991 		uint16_t nb_rx_queues;
1992 		uint16_t i;
1993 
1994 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1995 		for (i = 0; i <	nb_rx_queues; i++) {
1996 			ret = rxa_add_queue(rx_adapter, dev_info, i, conf);
1997 			if (ret)
1998 				return ret;
1999 		}
2000 		return 0;
2001 	}
2002 
2003 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
2004 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
2005 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
2006 
2007 	queue_info = &dev_info->rx_queue[rx_queue_id];
2008 	queue_info->wt = conf->servicing_weight;
2009 
2010 	qi_ev = (struct rte_event *)&queue_info->event;
2011 	qi_ev->event = ev->event;
2012 	qi_ev->op = RTE_EVENT_OP_NEW;
2013 	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
2014 	qi_ev->sub_event_type = 0;
2015 
2016 	if (conf->rx_queue_flags &
2017 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
2018 		queue_info->flow_id_mask = ~0;
2019 	} else
2020 		qi_ev->flow_id = 0;
2021 
2022 	if (conf->rx_queue_flags &
2023 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2024 		queue_info->ena_vector = 1;
2025 		qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
2026 		rxa_set_vector_data(queue_info, conf->vector_sz,
2027 				    conf->vector_timeout_ns, conf->vector_mp,
2028 				    rx_queue_id, dev_info->dev->data->port_id);
2029 		rx_adapter->ena_vector = 1;
2030 		rx_adapter->vector_tmo_ticks =
2031 			rx_adapter->vector_tmo_ticks ?
2032 				      RTE_MIN(queue_info->vector_data
2033 							.vector_timeout_ticks >>
2034 						1,
2035 					rx_adapter->vector_tmo_ticks) :
2036 				queue_info->vector_data.vector_timeout_ticks >>
2037 					1;
2038 	}
2039 
2040 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
2041 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
2042 		rx_adapter->num_rx_polled += !pollq;
2043 		dev_info->nb_rx_poll += !pollq;
2044 		rx_adapter->num_rx_intr -= intrq;
2045 		dev_info->nb_rx_intr -= intrq;
2046 		dev_info->nb_shared_intr -= intrq && sintrq;
2047 	}
2048 
2049 	if (rxa_intr_queue(dev_info, rx_queue_id)) {
2050 		rx_adapter->num_rx_polled -= pollq;
2051 		dev_info->nb_rx_poll -= pollq;
2052 		rx_adapter->num_rx_intr += !intrq;
2053 		dev_info->nb_rx_intr += !intrq;
2054 		dev_info->nb_shared_intr += !intrq && sintrq;
2055 		if (dev_info->nb_shared_intr == 1) {
2056 			if (dev_info->multi_intr_cap)
2057 				dev_info->next_q_idx =
2058 					RTE_MAX_RXTX_INTR_VEC_ID - 1;
2059 			else
2060 				dev_info->next_q_idx = 0;
2061 		}
2062 	}
2063 
2064 	if (!rx_adapter->use_queue_event_buf)
2065 		return 0;
2066 
2067 	new_rx_buf = rte_zmalloc_socket("rx_buffer_meta",
2068 				sizeof(*new_rx_buf), 0,
2069 				rte_eth_dev_socket_id(eth_dev_id));
2070 	if (new_rx_buf == NULL) {
2071 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer meta for "
2072 				 "dev_id: %d queue_id: %d",
2073 				 eth_dev_id, rx_queue_id);
2074 		return -ENOMEM;
2075 	}
2076 
2077 	new_rx_buf->events_size = RTE_ALIGN(conf->event_buf_size, BATCH_SIZE);
2078 	new_rx_buf->events_size += (2 * BATCH_SIZE);
2079 	new_rx_buf->events = rte_zmalloc_socket("rx_buffer",
2080 				sizeof(struct rte_event) *
2081 				new_rx_buf->events_size, 0,
2082 				rte_eth_dev_socket_id(eth_dev_id));
2083 	if (new_rx_buf->events == NULL) {
2084 		rte_free(new_rx_buf);
2085 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer for "
2086 				 "dev_id: %d queue_id: %d",
2087 				 eth_dev_id, rx_queue_id);
2088 		return -ENOMEM;
2089 	}
2090 
2091 	queue_info->event_buf = new_rx_buf;
2092 
2093 	/* Allocate storage for adapter queue stats */
2094 	stats = rte_zmalloc_socket("rx_queue_stats",
2095 				sizeof(*stats), 0,
2096 				rte_eth_dev_socket_id(eth_dev_id));
2097 	if (stats == NULL) {
2098 		rte_free(new_rx_buf->events);
2099 		rte_free(new_rx_buf);
2100 		RTE_EDEV_LOG_ERR("Failed to allocate stats storage for"
2101 				 " dev_id: %d queue_id: %d",
2102 				 eth_dev_id, rx_queue_id);
2103 		return -ENOMEM;
2104 	}
2105 
2106 	queue_info->stats = stats;
2107 
2108 	return 0;
2109 }
2110 
2111 static int
2112 rxa_sw_add(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
2113 	   int rx_queue_id,
2114 	   const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2115 {
2116 	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
2117 	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
2118 	int ret;
2119 	struct eth_rx_poll_entry *rx_poll;
2120 	struct eth_rx_queue_info *rx_queue;
2121 	uint32_t *rx_wrr;
2122 	uint16_t nb_rx_queues;
2123 	uint32_t nb_rx_poll, nb_wrr;
2124 	uint32_t nb_rx_intr;
2125 	int num_intr_vec;
2126 	uint16_t wt;
2127 
2128 	if (queue_conf->servicing_weight == 0) {
2129 		struct rte_eth_dev_data *data = dev_info->dev->data;
2130 
2131 		temp_conf = *queue_conf;
2132 		if (!data->dev_conf.intr_conf.rxq) {
2133 			/* If Rx interrupts are disabled set wt = 1 */
2134 			temp_conf.servicing_weight = 1;
2135 		}
2136 		queue_conf = &temp_conf;
2137 
2138 		if (queue_conf->servicing_weight == 0 &&
2139 		    rx_adapter->use_queue_event_buf) {
2140 
2141 			RTE_EDEV_LOG_ERR("Use of queue level event buffer "
2142 					 "not supported for interrupt queues "
2143 					 "dev_id: %d queue_id: %d",
2144 					 eth_dev_id, rx_queue_id);
2145 			return -EINVAL;
2146 		}
2147 	}
2148 
2149 	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2150 	rx_queue = dev_info->rx_queue;
2151 	wt = queue_conf->servicing_weight;
2152 
2153 	if (dev_info->rx_queue == NULL) {
2154 		dev_info->rx_queue =
2155 		    rte_zmalloc_socket(rx_adapter->mem_name,
2156 				       nb_rx_queues *
2157 				       sizeof(struct eth_rx_queue_info), 0,
2158 				       rx_adapter->socket_id);
2159 		if (dev_info->rx_queue == NULL)
2160 			return -ENOMEM;
2161 	}
2162 	rx_wrr = NULL;
2163 	rx_poll = NULL;
2164 
2165 	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2166 			queue_conf->servicing_weight,
2167 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2168 
2169 	if (dev_info->dev->intr_handle)
2170 		dev_info->multi_intr_cap =
2171 			rte_intr_cap_multiple(dev_info->dev->intr_handle);
2172 
2173 	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2174 				&rx_poll, &rx_wrr);
2175 	if (ret)
2176 		goto err_free_rxqueue;
2177 
2178 	if (wt == 0) {
2179 		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2180 
2181 		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2182 		if (ret)
2183 			goto err_free_rxqueue;
2184 
2185 		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2186 		if (ret)
2187 			goto err_free_rxqueue;
2188 	} else {
2189 
2190 		num_intr_vec = 0;
2191 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2192 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2193 						rx_queue_id, 0);
2194 			/* interrupt based queues are being converted to
2195 			 * poll mode queues, delete the interrupt configuration
2196 			 * for those.
2197 			 */
2198 			ret = rxa_del_intr_queue(rx_adapter,
2199 						dev_info, rx_queue_id);
2200 			if (ret)
2201 				goto err_free_rxqueue;
2202 		}
2203 	}
2204 
2205 	if (nb_rx_intr == 0) {
2206 		ret = rxa_free_intr_resources(rx_adapter);
2207 		if (ret)
2208 			goto err_free_rxqueue;
2209 	}
2210 
2211 	if (wt == 0) {
2212 		uint16_t i;
2213 
2214 		if (rx_queue_id  == -1) {
2215 			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2216 				dev_info->intr_queue[i] = i;
2217 		} else {
2218 			if (!rxa_intr_queue(dev_info, rx_queue_id))
2219 				dev_info->intr_queue[nb_rx_intr - 1] =
2220 					rx_queue_id;
2221 		}
2222 	}
2223 
2224 
2225 
2226 	ret = rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2227 	if (ret)
2228 		goto err_free_rxqueue;
2229 	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2230 
2231 	rte_free(rx_adapter->eth_rx_poll);
2232 	rte_free(rx_adapter->wrr_sched);
2233 
2234 	rx_adapter->eth_rx_poll = rx_poll;
2235 	rx_adapter->wrr_sched = rx_wrr;
2236 	rx_adapter->wrr_len = nb_wrr;
2237 	rx_adapter->num_intr_vec += num_intr_vec;
2238 	return 0;
2239 
2240 err_free_rxqueue:
2241 	if (rx_queue == NULL) {
2242 		rte_free(dev_info->rx_queue);
2243 		dev_info->rx_queue = NULL;
2244 	}
2245 
2246 	rte_free(rx_poll);
2247 	rte_free(rx_wrr);
2248 
2249 	return ret;
2250 }
2251 
2252 static int
2253 rxa_ctrl(uint8_t id, int start)
2254 {
2255 	struct event_eth_rx_adapter *rx_adapter;
2256 	struct rte_eventdev *dev;
2257 	struct eth_device_info *dev_info;
2258 	uint32_t i;
2259 	int use_service = 0;
2260 	int stop = !start;
2261 
2262 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2263 	rx_adapter = rxa_id_to_adapter(id);
2264 	if (rx_adapter == NULL)
2265 		return -EINVAL;
2266 
2267 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2268 
2269 	RTE_ETH_FOREACH_DEV(i) {
2270 		dev_info = &rx_adapter->eth_devices[i];
2271 		/* if start  check for num dev queues */
2272 		if (start && !dev_info->nb_dev_queues)
2273 			continue;
2274 		/* if stop check if dev has been started */
2275 		if (stop && !dev_info->dev_rx_started)
2276 			continue;
2277 		use_service |= !dev_info->internal_event_port;
2278 		dev_info->dev_rx_started = start;
2279 		if (dev_info->internal_event_port == 0)
2280 			continue;
2281 		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2282 						&rte_eth_devices[i]) :
2283 			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
2284 						&rte_eth_devices[i]);
2285 	}
2286 
2287 	if (use_service) {
2288 		rte_spinlock_lock(&rx_adapter->rx_lock);
2289 		rx_adapter->rxa_started = start;
2290 		rte_service_runstate_set(rx_adapter->service_id, start);
2291 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2292 	}
2293 
2294 	return 0;
2295 }
2296 
2297 static int
2298 rxa_create(uint8_t id, uint8_t dev_id,
2299 	   struct rte_event_eth_rx_adapter_params *rxa_params,
2300 	   rte_event_eth_rx_adapter_conf_cb conf_cb,
2301 	   void *conf_arg)
2302 {
2303 	struct event_eth_rx_adapter *rx_adapter;
2304 	struct eth_event_enqueue_buffer *buf;
2305 	struct rte_event *events;
2306 	int ret;
2307 	int socket_id;
2308 	uint16_t i;
2309 	char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
2310 	const uint8_t default_rss_key[] = {
2311 		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2312 		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2313 		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2314 		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2315 		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2316 	};
2317 
2318 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2319 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2320 
2321 	if (conf_cb == NULL)
2322 		return -EINVAL;
2323 
2324 	if (event_eth_rx_adapter == NULL) {
2325 		ret = rte_event_eth_rx_adapter_init();
2326 		if (ret)
2327 			return ret;
2328 	}
2329 
2330 	rx_adapter = rxa_id_to_adapter(id);
2331 	if (rx_adapter != NULL) {
2332 		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2333 		return -EEXIST;
2334 	}
2335 
2336 	socket_id = rte_event_dev_socket_id(dev_id);
2337 	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2338 		"rte_event_eth_rx_adapter_%d",
2339 		id);
2340 
2341 	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2342 			RTE_CACHE_LINE_SIZE, socket_id);
2343 	if (rx_adapter == NULL) {
2344 		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2345 		return -ENOMEM;
2346 	}
2347 
2348 	rx_adapter->eventdev_id = dev_id;
2349 	rx_adapter->socket_id = socket_id;
2350 	rx_adapter->conf_cb = conf_cb;
2351 	rx_adapter->conf_arg = conf_arg;
2352 	rx_adapter->id = id;
2353 	TAILQ_INIT(&rx_adapter->vector_list);
2354 	strcpy(rx_adapter->mem_name, mem_name);
2355 	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2356 					RTE_MAX_ETHPORTS *
2357 					sizeof(struct eth_device_info), 0,
2358 					socket_id);
2359 	rte_convert_rss_key((const uint32_t *)default_rss_key,
2360 			(uint32_t *)rx_adapter->rss_key_be,
2361 			    RTE_DIM(default_rss_key));
2362 
2363 	if (rx_adapter->eth_devices == NULL) {
2364 		RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
2365 		rte_free(rx_adapter);
2366 		return -ENOMEM;
2367 	}
2368 
2369 	rte_spinlock_init(&rx_adapter->rx_lock);
2370 
2371 	for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2372 		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2373 
2374 	/* Rx adapter event buffer allocation */
2375 	rx_adapter->use_queue_event_buf = rxa_params->use_queue_event_buf;
2376 
2377 	if (!rx_adapter->use_queue_event_buf) {
2378 		buf = &rx_adapter->event_enqueue_buffer;
2379 		buf->events_size = rxa_params->event_buf_size;
2380 
2381 		events = rte_zmalloc_socket(rx_adapter->mem_name,
2382 					    buf->events_size * sizeof(*events),
2383 					    0, socket_id);
2384 		if (events == NULL) {
2385 			RTE_EDEV_LOG_ERR("Failed to allocate memory "
2386 					 "for adapter event buffer");
2387 			rte_free(rx_adapter->eth_devices);
2388 			rte_free(rx_adapter);
2389 			return -ENOMEM;
2390 		}
2391 
2392 		rx_adapter->event_enqueue_buffer.events = events;
2393 	}
2394 
2395 	event_eth_rx_adapter[id] = rx_adapter;
2396 
2397 	if (conf_cb == rxa_default_conf_cb)
2398 		rx_adapter->default_cb_arg = 1;
2399 
2400 	if (rte_mbuf_dyn_rx_timestamp_register(
2401 			&event_eth_rx_timestamp_dynfield_offset,
2402 			&event_eth_rx_timestamp_dynflag) != 0) {
2403 		RTE_EDEV_LOG_ERR("Error registering timestamp field in mbuf\n");
2404 		return -rte_errno;
2405 	}
2406 
2407 	rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2408 		conf_arg);
2409 	return 0;
2410 }
2411 
2412 int
2413 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2414 				rte_event_eth_rx_adapter_conf_cb conf_cb,
2415 				void *conf_arg)
2416 {
2417 	struct rte_event_eth_rx_adapter_params rxa_params = {0};
2418 
2419 	/* use default values for adapter params */
2420 	rxa_params.event_buf_size = ETH_EVENT_BUFFER_SIZE;
2421 	rxa_params.use_queue_event_buf = false;
2422 
2423 	return rxa_create(id, dev_id, &rxa_params, conf_cb, conf_arg);
2424 }
2425 
2426 int
2427 rte_event_eth_rx_adapter_create_with_params(uint8_t id, uint8_t dev_id,
2428 			struct rte_event_port_conf *port_config,
2429 			struct rte_event_eth_rx_adapter_params *rxa_params)
2430 {
2431 	struct rte_event_port_conf *pc;
2432 	int ret;
2433 	struct rte_event_eth_rx_adapter_params temp_params = {0};
2434 
2435 	if (port_config == NULL)
2436 		return -EINVAL;
2437 
2438 	if (rxa_params == NULL) {
2439 		/* use default values if rxa_params is NULL */
2440 		rxa_params = &temp_params;
2441 		rxa_params->event_buf_size = ETH_EVENT_BUFFER_SIZE;
2442 		rxa_params->use_queue_event_buf = false;
2443 	} else if ((!rxa_params->use_queue_event_buf &&
2444 		    rxa_params->event_buf_size == 0) ||
2445 		   (rxa_params->use_queue_event_buf &&
2446 		    rxa_params->event_buf_size != 0)) {
2447 		RTE_EDEV_LOG_ERR("Invalid adapter params\n");
2448 		return -EINVAL;
2449 	} else if (!rxa_params->use_queue_event_buf) {
2450 		/* adjust event buff size with BATCH_SIZE used for fetching
2451 		 * packets from NIC rx queues to get full buffer utilization
2452 		 * and prevent unnecessary rollovers.
2453 		 */
2454 
2455 		rxa_params->event_buf_size =
2456 			RTE_ALIGN(rxa_params->event_buf_size, BATCH_SIZE);
2457 		rxa_params->event_buf_size += (BATCH_SIZE + BATCH_SIZE);
2458 	}
2459 
2460 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2461 	if (pc == NULL)
2462 		return -ENOMEM;
2463 
2464 	*pc = *port_config;
2465 
2466 	ret = rxa_create(id, dev_id, rxa_params, rxa_default_conf_cb, pc);
2467 	if (ret)
2468 		rte_free(pc);
2469 
2470 	return ret;
2471 }
2472 
2473 int
2474 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2475 		struct rte_event_port_conf *port_config)
2476 {
2477 	struct rte_event_port_conf *pc;
2478 	int ret;
2479 
2480 	if (port_config == NULL)
2481 		return -EINVAL;
2482 
2483 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2484 
2485 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2486 	if (pc == NULL)
2487 		return -ENOMEM;
2488 	*pc = *port_config;
2489 
2490 	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2491 					rxa_default_conf_cb,
2492 					pc);
2493 	if (ret)
2494 		rte_free(pc);
2495 	return ret;
2496 }
2497 
2498 int
2499 rte_event_eth_rx_adapter_free(uint8_t id)
2500 {
2501 	struct event_eth_rx_adapter *rx_adapter;
2502 
2503 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2504 
2505 	rx_adapter = rxa_id_to_adapter(id);
2506 	if (rx_adapter == NULL)
2507 		return -EINVAL;
2508 
2509 	if (rx_adapter->nb_queues) {
2510 		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2511 				rx_adapter->nb_queues);
2512 		return -EBUSY;
2513 	}
2514 
2515 	if (rx_adapter->default_cb_arg)
2516 		rte_free(rx_adapter->conf_arg);
2517 	rte_free(rx_adapter->eth_devices);
2518 	if (!rx_adapter->use_queue_event_buf)
2519 		rte_free(rx_adapter->event_enqueue_buffer.events);
2520 	rte_free(rx_adapter);
2521 	event_eth_rx_adapter[id] = NULL;
2522 
2523 	rte_eventdev_trace_eth_rx_adapter_free(id);
2524 	return 0;
2525 }
2526 
2527 int
2528 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2529 		uint16_t eth_dev_id,
2530 		int32_t rx_queue_id,
2531 		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2532 {
2533 	int ret;
2534 	uint32_t cap;
2535 	struct event_eth_rx_adapter *rx_adapter;
2536 	struct rte_eventdev *dev;
2537 	struct eth_device_info *dev_info;
2538 	struct rte_event_eth_rx_adapter_vector_limits limits;
2539 
2540 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2541 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2542 
2543 	rx_adapter = rxa_id_to_adapter(id);
2544 	if ((rx_adapter == NULL) || (queue_conf == NULL))
2545 		return -EINVAL;
2546 
2547 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2548 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2549 						eth_dev_id,
2550 						&cap);
2551 	if (ret) {
2552 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2553 			"eth port %" PRIu16, id, eth_dev_id);
2554 		return ret;
2555 	}
2556 
2557 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2558 		&& (queue_conf->rx_queue_flags &
2559 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2560 		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2561 				" eth port: %" PRIu16 " adapter id: %" PRIu8,
2562 				eth_dev_id, id);
2563 		return -EINVAL;
2564 	}
2565 
2566 	if (queue_conf->rx_queue_flags &
2567 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2568 
2569 		if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0) {
2570 			RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2571 					 " eth port: %" PRIu16
2572 					 " adapter id: %" PRIu8,
2573 					 eth_dev_id, id);
2574 			return -EINVAL;
2575 		}
2576 
2577 		ret = rte_event_eth_rx_adapter_vector_limits_get(
2578 			rx_adapter->eventdev_id, eth_dev_id, &limits);
2579 		if (ret < 0) {
2580 			RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
2581 					 " eth port: %" PRIu16
2582 					 " adapter id: %" PRIu8,
2583 					 eth_dev_id, id);
2584 			return -EINVAL;
2585 		}
2586 		if (queue_conf->vector_sz < limits.min_sz ||
2587 		    queue_conf->vector_sz > limits.max_sz ||
2588 		    queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
2589 		    queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
2590 		    queue_conf->vector_mp == NULL) {
2591 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2592 					 " eth port: %" PRIu16
2593 					 " adapter id: %" PRIu8,
2594 					 eth_dev_id, id);
2595 			return -EINVAL;
2596 		}
2597 		if (queue_conf->vector_mp->elt_size <
2598 		    (sizeof(struct rte_event_vector) +
2599 		     (sizeof(uintptr_t) * queue_conf->vector_sz))) {
2600 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2601 					 " eth port: %" PRIu16
2602 					 " adapter id: %" PRIu8,
2603 					 eth_dev_id, id);
2604 			return -EINVAL;
2605 		}
2606 	}
2607 
2608 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2609 		(rx_queue_id != -1)) {
2610 		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2611 			"event queue, eth port: %" PRIu16 " adapter id: %"
2612 			PRIu8, eth_dev_id, id);
2613 		return -EINVAL;
2614 	}
2615 
2616 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2617 			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2618 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2619 			 (uint16_t)rx_queue_id);
2620 		return -EINVAL;
2621 	}
2622 
2623 	if ((rx_adapter->use_queue_event_buf &&
2624 	     queue_conf->event_buf_size == 0) ||
2625 	    (!rx_adapter->use_queue_event_buf &&
2626 	     queue_conf->event_buf_size != 0)) {
2627 		RTE_EDEV_LOG_ERR("Invalid Event buffer size for the queue");
2628 		return -EINVAL;
2629 	}
2630 
2631 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2632 
2633 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2634 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2635 					-ENOTSUP);
2636 		if (dev_info->rx_queue == NULL) {
2637 			dev_info->rx_queue =
2638 			    rte_zmalloc_socket(rx_adapter->mem_name,
2639 					dev_info->dev->data->nb_rx_queues *
2640 					sizeof(struct eth_rx_queue_info), 0,
2641 					rx_adapter->socket_id);
2642 			if (dev_info->rx_queue == NULL)
2643 				return -ENOMEM;
2644 		}
2645 
2646 		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2647 				&rte_eth_devices[eth_dev_id],
2648 				rx_queue_id, queue_conf);
2649 		if (ret == 0) {
2650 			dev_info->internal_event_port = 1;
2651 			rxa_update_queue(rx_adapter,
2652 					&rx_adapter->eth_devices[eth_dev_id],
2653 					rx_queue_id,
2654 					1);
2655 		}
2656 	} else {
2657 		rte_spinlock_lock(&rx_adapter->rx_lock);
2658 		dev_info->internal_event_port = 0;
2659 		ret = rxa_init_service(rx_adapter, id);
2660 		if (ret == 0) {
2661 			uint32_t service_id = rx_adapter->service_id;
2662 			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2663 					queue_conf);
2664 			rte_service_component_runstate_set(service_id,
2665 				rxa_sw_adapter_queue_count(rx_adapter));
2666 		}
2667 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2668 	}
2669 
2670 	rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2671 		rx_queue_id, queue_conf, ret);
2672 	if (ret)
2673 		return ret;
2674 
2675 	return 0;
2676 }
2677 
2678 static int
2679 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2680 {
2681 	limits->max_sz = MAX_VECTOR_SIZE;
2682 	limits->min_sz = MIN_VECTOR_SIZE;
2683 	limits->max_timeout_ns = MAX_VECTOR_NS;
2684 	limits->min_timeout_ns = MIN_VECTOR_NS;
2685 
2686 	return 0;
2687 }
2688 
2689 int
2690 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2691 				int32_t rx_queue_id)
2692 {
2693 	int ret = 0;
2694 	struct rte_eventdev *dev;
2695 	struct event_eth_rx_adapter *rx_adapter;
2696 	struct eth_device_info *dev_info;
2697 	uint32_t cap;
2698 	uint32_t nb_rx_poll = 0;
2699 	uint32_t nb_wrr = 0;
2700 	uint32_t nb_rx_intr;
2701 	struct eth_rx_poll_entry *rx_poll = NULL;
2702 	uint32_t *rx_wrr = NULL;
2703 	int num_intr_vec;
2704 
2705 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2706 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2707 
2708 	rx_adapter = rxa_id_to_adapter(id);
2709 	if (rx_adapter == NULL)
2710 		return -EINVAL;
2711 
2712 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2713 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2714 						eth_dev_id,
2715 						&cap);
2716 	if (ret)
2717 		return ret;
2718 
2719 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2720 		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2721 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2722 			 (uint16_t)rx_queue_id);
2723 		return -EINVAL;
2724 	}
2725 
2726 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2727 
2728 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2729 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2730 				 -ENOTSUP);
2731 		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2732 						&rte_eth_devices[eth_dev_id],
2733 						rx_queue_id);
2734 		if (ret == 0) {
2735 			rxa_update_queue(rx_adapter,
2736 					&rx_adapter->eth_devices[eth_dev_id],
2737 					rx_queue_id,
2738 					0);
2739 			if (dev_info->nb_dev_queues == 0) {
2740 				rte_free(dev_info->rx_queue);
2741 				dev_info->rx_queue = NULL;
2742 			}
2743 		}
2744 	} else {
2745 		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2746 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2747 
2748 		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2749 			&rx_poll, &rx_wrr);
2750 		if (ret)
2751 			return ret;
2752 
2753 		rte_spinlock_lock(&rx_adapter->rx_lock);
2754 
2755 		num_intr_vec = 0;
2756 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2757 
2758 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2759 						rx_queue_id, 0);
2760 			ret = rxa_del_intr_queue(rx_adapter, dev_info,
2761 					rx_queue_id);
2762 			if (ret)
2763 				goto unlock_ret;
2764 		}
2765 
2766 		if (nb_rx_intr == 0) {
2767 			ret = rxa_free_intr_resources(rx_adapter);
2768 			if (ret)
2769 				goto unlock_ret;
2770 		}
2771 
2772 		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2773 		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2774 
2775 		rte_free(rx_adapter->eth_rx_poll);
2776 		rte_free(rx_adapter->wrr_sched);
2777 
2778 		if (nb_rx_intr == 0) {
2779 			rte_free(dev_info->intr_queue);
2780 			dev_info->intr_queue = NULL;
2781 		}
2782 
2783 		rx_adapter->eth_rx_poll = rx_poll;
2784 		rx_adapter->wrr_sched = rx_wrr;
2785 		rx_adapter->wrr_len = nb_wrr;
2786 		/*
2787 		 * reset next poll start position (wrr_pos) to avoid buffer
2788 		 * overrun when wrr_len is reduced in case of queue delete
2789 		 */
2790 		rx_adapter->wrr_pos = 0;
2791 		rx_adapter->num_intr_vec += num_intr_vec;
2792 
2793 		if (dev_info->nb_dev_queues == 0) {
2794 			rte_free(dev_info->rx_queue);
2795 			dev_info->rx_queue = NULL;
2796 		}
2797 unlock_ret:
2798 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2799 		if (ret) {
2800 			rte_free(rx_poll);
2801 			rte_free(rx_wrr);
2802 			return ret;
2803 		}
2804 
2805 		rte_service_component_runstate_set(rx_adapter->service_id,
2806 				rxa_sw_adapter_queue_count(rx_adapter));
2807 	}
2808 
2809 	rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2810 		rx_queue_id, ret);
2811 	return ret;
2812 }
2813 
2814 int
2815 rte_event_eth_rx_adapter_vector_limits_get(
2816 	uint8_t dev_id, uint16_t eth_port_id,
2817 	struct rte_event_eth_rx_adapter_vector_limits *limits)
2818 {
2819 	struct rte_eventdev *dev;
2820 	uint32_t cap;
2821 	int ret;
2822 
2823 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2824 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2825 
2826 	if (limits == NULL)
2827 		return -EINVAL;
2828 
2829 	dev = &rte_eventdevs[dev_id];
2830 
2831 	ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2832 	if (ret) {
2833 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2834 				 "eth port %" PRIu16,
2835 				 dev_id, eth_port_id);
2836 		return ret;
2837 	}
2838 
2839 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2840 		RTE_FUNC_PTR_OR_ERR_RET(
2841 			*dev->dev_ops->eth_rx_adapter_vector_limits_get,
2842 			-ENOTSUP);
2843 		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2844 			dev, &rte_eth_devices[eth_port_id], limits);
2845 	} else {
2846 		ret = rxa_sw_vector_limits(limits);
2847 	}
2848 
2849 	return ret;
2850 }
2851 
/* Emit the start trace point, then start the adapter through rxa_ctrl(). */
int
rte_event_eth_rx_adapter_start(uint8_t id)
{
	int rc;

	rte_eventdev_trace_eth_rx_adapter_start(id);
	rc = rxa_ctrl(id, 1);

	return rc;
}
2858 
/* Emit the stop trace point, then stop the adapter through rxa_ctrl(). */
int
rte_event_eth_rx_adapter_stop(uint8_t id)
{
	int rc;

	rte_eventdev_trace_eth_rx_adapter_stop(id);
	rc = rxa_ctrl(id, 0);

	return rc;
}
2865 
2866 static inline void
2867 rxa_queue_stats_reset(struct eth_rx_queue_info *queue_info)
2868 {
2869 	struct rte_event_eth_rx_adapter_stats *q_stats;
2870 
2871 	q_stats = queue_info->stats;
2872 	memset(q_stats, 0, sizeof(*q_stats));
2873 }
2874 
2875 int
2876 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2877 			       struct rte_event_eth_rx_adapter_stats *stats)
2878 {
2879 	struct event_eth_rx_adapter *rx_adapter;
2880 	struct eth_event_enqueue_buffer *buf;
2881 	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2882 	struct rte_event_eth_rx_adapter_stats dev_stats;
2883 	struct rte_eventdev *dev;
2884 	struct eth_device_info *dev_info;
2885 	struct eth_rx_queue_info *queue_info;
2886 	struct rte_event_eth_rx_adapter_stats *q_stats;
2887 	uint32_t i, j;
2888 	int ret;
2889 
2890 	if (rxa_memzone_lookup())
2891 		return -ENOMEM;
2892 
2893 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2894 
2895 	rx_adapter = rxa_id_to_adapter(id);
2896 	if (rx_adapter  == NULL || stats == NULL)
2897 		return -EINVAL;
2898 
2899 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2900 	memset(stats, 0, sizeof(*stats));
2901 
2902 	if (rx_adapter->service_inited)
2903 		*stats = rx_adapter->stats;
2904 
2905 	RTE_ETH_FOREACH_DEV(i) {
2906 		dev_info = &rx_adapter->eth_devices[i];
2907 
2908 		if (rx_adapter->use_queue_event_buf && dev_info->rx_queue) {
2909 
2910 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
2911 			     j++) {
2912 				queue_info = &dev_info->rx_queue[j];
2913 				if (!queue_info->queue_enabled)
2914 					continue;
2915 				q_stats = queue_info->stats;
2916 
2917 				stats->rx_packets += q_stats->rx_packets;
2918 				stats->rx_poll_count += q_stats->rx_poll_count;
2919 				stats->rx_enq_count += q_stats->rx_enq_count;
2920 				stats->rx_enq_retry += q_stats->rx_enq_retry;
2921 				stats->rx_dropped += q_stats->rx_dropped;
2922 				stats->rx_enq_block_cycles +=
2923 						q_stats->rx_enq_block_cycles;
2924 			}
2925 		}
2926 
2927 		if (dev_info->internal_event_port == 0 ||
2928 			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2929 			continue;
2930 		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2931 						&rte_eth_devices[i],
2932 						&dev_stats);
2933 		if (ret)
2934 			continue;
2935 		dev_stats_sum.rx_packets += dev_stats.rx_packets;
2936 		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2937 	}
2938 
2939 	buf = &rx_adapter->event_enqueue_buffer;
2940 	stats->rx_packets += dev_stats_sum.rx_packets;
2941 	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2942 	stats->rx_event_buf_count = buf->count;
2943 	stats->rx_event_buf_size = buf->events_size;
2944 
2945 	return 0;
2946 }
2947 
2948 int
2949 rte_event_eth_rx_adapter_queue_stats_get(uint8_t id,
2950 		uint16_t eth_dev_id,
2951 		uint16_t rx_queue_id,
2952 		struct rte_event_eth_rx_adapter_queue_stats *stats)
2953 {
2954 	struct event_eth_rx_adapter *rx_adapter;
2955 	struct eth_device_info *dev_info;
2956 	struct eth_rx_queue_info *queue_info;
2957 	struct eth_event_enqueue_buffer *event_buf;
2958 	struct rte_event_eth_rx_adapter_stats *q_stats;
2959 	struct rte_eventdev *dev;
2960 
2961 	if (rxa_memzone_lookup())
2962 		return -ENOMEM;
2963 
2964 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2965 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2966 
2967 	rx_adapter = rxa_id_to_adapter(id);
2968 
2969 	if (rx_adapter == NULL || stats == NULL)
2970 		return -EINVAL;
2971 
2972 	if (!rx_adapter->use_queue_event_buf)
2973 		return -EINVAL;
2974 
2975 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2976 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
2977 		return -EINVAL;
2978 	}
2979 
2980 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2981 	if (dev_info->rx_queue == NULL ||
2982 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
2983 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
2984 		return -EINVAL;
2985 	}
2986 
2987 	queue_info = &dev_info->rx_queue[rx_queue_id];
2988 	event_buf = queue_info->event_buf;
2989 	q_stats = queue_info->stats;
2990 
2991 	stats->rx_event_buf_count = event_buf->count;
2992 	stats->rx_event_buf_size = event_buf->events_size;
2993 	stats->rx_packets = q_stats->rx_packets;
2994 	stats->rx_poll_count = q_stats->rx_poll_count;
2995 	stats->rx_dropped = q_stats->rx_dropped;
2996 
2997 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2998 	if (dev->dev_ops->eth_rx_adapter_queue_stats_get != NULL) {
2999 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_get)(dev,
3000 						&rte_eth_devices[eth_dev_id],
3001 						rx_queue_id, stats);
3002 	}
3003 
3004 	return 0;
3005 }
3006 
3007 int
3008 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
3009 {
3010 	struct event_eth_rx_adapter *rx_adapter;
3011 	struct rte_eventdev *dev;
3012 	struct eth_device_info *dev_info;
3013 	struct eth_rx_queue_info *queue_info;
3014 	uint32_t i, j;
3015 
3016 	if (rxa_memzone_lookup())
3017 		return -ENOMEM;
3018 
3019 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3020 
3021 	rx_adapter = rxa_id_to_adapter(id);
3022 	if (rx_adapter == NULL)
3023 		return -EINVAL;
3024 
3025 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3026 
3027 	RTE_ETH_FOREACH_DEV(i) {
3028 		dev_info = &rx_adapter->eth_devices[i];
3029 
3030 		if (rx_adapter->use_queue_event_buf  && dev_info->rx_queue) {
3031 
3032 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
3033 						j++) {
3034 				queue_info = &dev_info->rx_queue[j];
3035 				if (!queue_info->queue_enabled)
3036 					continue;
3037 				rxa_queue_stats_reset(queue_info);
3038 			}
3039 		}
3040 
3041 		if (dev_info->internal_event_port == 0 ||
3042 			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
3043 			continue;
3044 		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
3045 							&rte_eth_devices[i]);
3046 	}
3047 
3048 	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
3049 
3050 	return 0;
3051 }
3052 
3053 int
3054 rte_event_eth_rx_adapter_queue_stats_reset(uint8_t id,
3055 		uint16_t eth_dev_id,
3056 		uint16_t rx_queue_id)
3057 {
3058 	struct event_eth_rx_adapter *rx_adapter;
3059 	struct eth_device_info *dev_info;
3060 	struct eth_rx_queue_info *queue_info;
3061 	struct rte_eventdev *dev;
3062 
3063 	if (rxa_memzone_lookup())
3064 		return -ENOMEM;
3065 
3066 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3067 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3068 
3069 	rx_adapter = rxa_id_to_adapter(id);
3070 	if (rx_adapter == NULL)
3071 		return -EINVAL;
3072 
3073 	if (!rx_adapter->use_queue_event_buf)
3074 		return -EINVAL;
3075 
3076 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3077 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
3078 		return -EINVAL;
3079 	}
3080 
3081 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3082 
3083 	if (dev_info->rx_queue == NULL ||
3084 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3085 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3086 		return -EINVAL;
3087 	}
3088 
3089 	queue_info = &dev_info->rx_queue[rx_queue_id];
3090 	rxa_queue_stats_reset(queue_info);
3091 
3092 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3093 	if (dev->dev_ops->eth_rx_adapter_queue_stats_reset != NULL) {
3094 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_reset)(dev,
3095 						&rte_eth_devices[eth_dev_id],
3096 						rx_queue_id);
3097 	}
3098 
3099 	return 0;
3100 }
3101 
3102 int
3103 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
3104 {
3105 	struct event_eth_rx_adapter *rx_adapter;
3106 
3107 	if (rxa_memzone_lookup())
3108 		return -ENOMEM;
3109 
3110 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3111 
3112 	rx_adapter = rxa_id_to_adapter(id);
3113 	if (rx_adapter == NULL || service_id == NULL)
3114 		return -EINVAL;
3115 
3116 	if (rx_adapter->service_inited)
3117 		*service_id = rx_adapter->service_id;
3118 
3119 	return rx_adapter->service_inited ? 0 : -ESRCH;
3120 }
3121 
3122 int
3123 rte_event_eth_rx_adapter_cb_register(uint8_t id,
3124 					uint16_t eth_dev_id,
3125 					rte_event_eth_rx_adapter_cb_fn cb_fn,
3126 					void *cb_arg)
3127 {
3128 	struct event_eth_rx_adapter *rx_adapter;
3129 	struct eth_device_info *dev_info;
3130 	uint32_t cap;
3131 	int ret;
3132 
3133 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3134 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3135 
3136 	rx_adapter = rxa_id_to_adapter(id);
3137 	if (rx_adapter == NULL)
3138 		return -EINVAL;
3139 
3140 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3141 	if (dev_info->rx_queue == NULL)
3142 		return -EINVAL;
3143 
3144 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
3145 						eth_dev_id,
3146 						&cap);
3147 	if (ret) {
3148 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
3149 			"eth port %" PRIu16, id, eth_dev_id);
3150 		return ret;
3151 	}
3152 
3153 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
3154 		RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
3155 				PRIu16, eth_dev_id);
3156 		return -EINVAL;
3157 	}
3158 
3159 	rte_spinlock_lock(&rx_adapter->rx_lock);
3160 	dev_info->cb_fn = cb_fn;
3161 	dev_info->cb_arg = cb_arg;
3162 	rte_spinlock_unlock(&rx_adapter->rx_lock);
3163 
3164 	return 0;
3165 }
3166 
3167 int
3168 rte_event_eth_rx_adapter_queue_conf_get(uint8_t id,
3169 			uint16_t eth_dev_id,
3170 			uint16_t rx_queue_id,
3171 			struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
3172 {
3173 	struct rte_eventdev *dev;
3174 	struct event_eth_rx_adapter *rx_adapter;
3175 	struct eth_device_info *dev_info;
3176 	struct eth_rx_queue_info *queue_info;
3177 	struct rte_event *qi_ev;
3178 	int ret;
3179 
3180 	if (rxa_memzone_lookup())
3181 		return -ENOMEM;
3182 
3183 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3184 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3185 
3186 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3187 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3188 		return -EINVAL;
3189 	}
3190 
3191 	if (queue_conf == NULL) {
3192 		RTE_EDEV_LOG_ERR("Rx queue conf struct cannot be NULL");
3193 		return -EINVAL;
3194 	}
3195 
3196 	rx_adapter = rxa_id_to_adapter(id);
3197 	if (rx_adapter == NULL)
3198 		return -EINVAL;
3199 
3200 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3201 	if (dev_info->rx_queue == NULL ||
3202 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3203 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3204 		return -EINVAL;
3205 	}
3206 
3207 	queue_info = &dev_info->rx_queue[rx_queue_id];
3208 	qi_ev = (struct rte_event *)&queue_info->event;
3209 
3210 	memset(queue_conf, 0, sizeof(*queue_conf));
3211 	queue_conf->rx_queue_flags = 0;
3212 	if (queue_info->flow_id_mask != 0)
3213 		queue_conf->rx_queue_flags |=
3214 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID;
3215 	queue_conf->servicing_weight = queue_info->wt;
3216 
3217 	memcpy(&queue_conf->ev, qi_ev, sizeof(*qi_ev));
3218 
3219 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3220 	if (dev->dev_ops->eth_rx_adapter_queue_conf_get != NULL) {
3221 		ret = (*dev->dev_ops->eth_rx_adapter_queue_conf_get)(dev,
3222 						&rte_eth_devices[eth_dev_id],
3223 						rx_queue_id,
3224 						queue_conf);
3225 		return ret;
3226 	}
3227 
3228 	return 0;
3229 }
3230 
/*
 * Add member 's' of 'stats' to a telemetry dictionary as a u64, using the
 * member name itself as the key. Expands to a use of a variable named 'd'
 * (the struct rte_tel_data pointer), which must be in scope at the call
 * site.
 */
#define RXA_ADD_DICT(stats, s) rte_tel_data_add_dict_u64(d, #s, stats.s)
3232 
3233 static int
3234 handle_rxa_stats(const char *cmd __rte_unused,
3235 		 const char *params,
3236 		 struct rte_tel_data *d)
3237 {
3238 	uint8_t rx_adapter_id;
3239 	struct rte_event_eth_rx_adapter_stats rx_adptr_stats;
3240 
3241 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3242 		return -1;
3243 
3244 	/* Get Rx adapter ID from parameter string */
3245 	rx_adapter_id = atoi(params);
3246 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3247 
3248 	/* Get Rx adapter stats */
3249 	if (rte_event_eth_rx_adapter_stats_get(rx_adapter_id,
3250 					       &rx_adptr_stats)) {
3251 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter stats\n");
3252 		return -1;
3253 	}
3254 
3255 	rte_tel_data_start_dict(d);
3256 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3257 	RXA_ADD_DICT(rx_adptr_stats, rx_packets);
3258 	RXA_ADD_DICT(rx_adptr_stats, rx_poll_count);
3259 	RXA_ADD_DICT(rx_adptr_stats, rx_dropped);
3260 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_retry);
3261 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_count);
3262 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_size);
3263 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_count);
3264 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_start_ts);
3265 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_block_cycles);
3266 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_end_ts);
3267 	RXA_ADD_DICT(rx_adptr_stats, rx_intr_packets);
3268 
3269 	return 0;
3270 }
3271 
3272 static int
3273 handle_rxa_stats_reset(const char *cmd __rte_unused,
3274 		       const char *params,
3275 		       struct rte_tel_data *d __rte_unused)
3276 {
3277 	uint8_t rx_adapter_id;
3278 
3279 	if (params == NULL || strlen(params) == 0 || ~isdigit(*params))
3280 		return -1;
3281 
3282 	/* Get Rx adapter ID from parameter string */
3283 	rx_adapter_id = atoi(params);
3284 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3285 
3286 	/* Reset Rx adapter stats */
3287 	if (rte_event_eth_rx_adapter_stats_reset(rx_adapter_id)) {
3288 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter stats\n");
3289 		return -1;
3290 	}
3291 
3292 	return 0;
3293 }
3294 
3295 static int
3296 handle_rxa_get_queue_conf(const char *cmd __rte_unused,
3297 			  const char *params,
3298 			  struct rte_tel_data *d)
3299 {
3300 	uint8_t rx_adapter_id;
3301 	uint16_t rx_queue_id;
3302 	int eth_dev_id;
3303 	char *token, *l_params;
3304 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
3305 
3306 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3307 		return -1;
3308 
3309 	/* Get Rx adapter ID from parameter string */
3310 	l_params = strdup(params);
3311 	token = strtok(l_params, ",");
3312 	rx_adapter_id = strtoul(token, NULL, 10);
3313 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3314 
3315 	token = strtok(NULL, ",");
3316 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3317 		return -1;
3318 
3319 	/* Get device ID from parameter string */
3320 	eth_dev_id = strtoul(token, NULL, 10);
3321 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(eth_dev_id, -EINVAL);
3322 
3323 	token = strtok(NULL, ",");
3324 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3325 		return -1;
3326 
3327 	/* Get Rx queue ID from parameter string */
3328 	rx_queue_id = strtoul(token, NULL, 10);
3329 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3330 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3331 		return -EINVAL;
3332 	}
3333 
3334 	token = strtok(NULL, "\0");
3335 	if (token != NULL)
3336 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3337 				 " telemetry command, igrnoring");
3338 
3339 	if (rte_event_eth_rx_adapter_queue_conf_get(rx_adapter_id, eth_dev_id,
3340 						    rx_queue_id, &queue_conf)) {
3341 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue config");
3342 		return -1;
3343 	}
3344 
3345 	rte_tel_data_start_dict(d);
3346 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3347 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3348 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3349 	RXA_ADD_DICT(queue_conf, rx_queue_flags);
3350 	RXA_ADD_DICT(queue_conf, servicing_weight);
3351 	RXA_ADD_DICT(queue_conf.ev, queue_id);
3352 	RXA_ADD_DICT(queue_conf.ev, sched_type);
3353 	RXA_ADD_DICT(queue_conf.ev, priority);
3354 	RXA_ADD_DICT(queue_conf.ev, flow_id);
3355 
3356 	return 0;
3357 }
3358 
3359 static int
3360 handle_rxa_get_queue_stats(const char *cmd __rte_unused,
3361 			   const char *params,
3362 			   struct rte_tel_data *d)
3363 {
3364 	uint8_t rx_adapter_id;
3365 	uint16_t rx_queue_id;
3366 	int eth_dev_id;
3367 	char *token, *l_params;
3368 	struct rte_event_eth_rx_adapter_queue_stats q_stats;
3369 
3370 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3371 		return -1;
3372 
3373 	/* Get Rx adapter ID from parameter string */
3374 	l_params = strdup(params);
3375 	token = strtok(l_params, ",");
3376 	rx_adapter_id = strtoul(token, NULL, 10);
3377 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3378 
3379 	token = strtok(NULL, ",");
3380 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3381 		return -1;
3382 
3383 	/* Get device ID from parameter string */
3384 	eth_dev_id = strtoul(token, NULL, 10);
3385 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(eth_dev_id, -EINVAL);
3386 
3387 	token = strtok(NULL, ",");
3388 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3389 		return -1;
3390 
3391 	/* Get Rx queue ID from parameter string */
3392 	rx_queue_id = strtoul(token, NULL, 10);
3393 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3394 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3395 		return -EINVAL;
3396 	}
3397 
3398 	token = strtok(NULL, "\0");
3399 	if (token != NULL)
3400 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3401 				 " telemetry command, igrnoring");
3402 
3403 	if (rte_event_eth_rx_adapter_queue_stats_get(rx_adapter_id, eth_dev_id,
3404 						    rx_queue_id, &q_stats)) {
3405 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue stats");
3406 		return -1;
3407 	}
3408 
3409 	rte_tel_data_start_dict(d);
3410 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3411 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3412 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3413 	RXA_ADD_DICT(q_stats, rx_event_buf_count);
3414 	RXA_ADD_DICT(q_stats, rx_event_buf_size);
3415 	RXA_ADD_DICT(q_stats, rx_poll_count);
3416 	RXA_ADD_DICT(q_stats, rx_packets);
3417 	RXA_ADD_DICT(q_stats, rx_dropped);
3418 
3419 	return 0;
3420 }
3421 
3422 static int
3423 handle_rxa_queue_stats_reset(const char *cmd __rte_unused,
3424 			     const char *params,
3425 			     struct rte_tel_data *d __rte_unused)
3426 {
3427 	uint8_t rx_adapter_id;
3428 	uint16_t rx_queue_id;
3429 	int eth_dev_id;
3430 	char *token, *l_params;
3431 
3432 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3433 		return -1;
3434 
3435 	/* Get Rx adapter ID from parameter string */
3436 	l_params = strdup(params);
3437 	token = strtok(l_params, ",");
3438 	rx_adapter_id = strtoul(token, NULL, 10);
3439 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3440 
3441 	token = strtok(NULL, ",");
3442 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3443 		return -1;
3444 
3445 	/* Get device ID from parameter string */
3446 	eth_dev_id = strtoul(token, NULL, 10);
3447 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(eth_dev_id, -EINVAL);
3448 
3449 	token = strtok(NULL, ",");
3450 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3451 		return -1;
3452 
3453 	/* Get Rx queue ID from parameter string */
3454 	rx_queue_id = strtoul(token, NULL, 10);
3455 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3456 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3457 		return -EINVAL;
3458 	}
3459 
3460 	token = strtok(NULL, "\0");
3461 	if (token != NULL)
3462 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3463 				 " telemetry command, igrnoring");
3464 
3465 	if (rte_event_eth_rx_adapter_queue_stats_reset(rx_adapter_id,
3466 						       eth_dev_id,
3467 						       rx_queue_id)) {
3468 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter queue stats");
3469 		return -1;
3470 	}
3471 
3472 	return 0;
3473 }
3474 
3475 RTE_INIT(rxa_init_telemetry)
3476 {
3477 	rte_telemetry_register_cmd("/eventdev/rxa_stats",
3478 		handle_rxa_stats,
3479 		"Returns Rx adapter stats. Parameter: rxa_id");
3480 
3481 	rte_telemetry_register_cmd("/eventdev/rxa_stats_reset",
3482 		handle_rxa_stats_reset,
3483 		"Reset Rx adapter stats. Parameter: rxa_id");
3484 
3485 	rte_telemetry_register_cmd("/eventdev/rxa_queue_conf",
3486 		handle_rxa_get_queue_conf,
3487 		"Returns Rx queue config. Parameter: rxa_id, dev_id, queue_id");
3488 
3489 	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats",
3490 		handle_rxa_get_queue_stats,
3491 		"Returns Rx queue stats. Parameter: rxa_id, dev_id, queue_id");
3492 
3493 	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats_reset",
3494 		handle_rxa_queue_stats_reset,
3495 		"Reset Rx queue stats. Parameter: rxa_id, dev_id, queue_id");
3496 }
3497