xref: /dpdk/lib/eventdev/rte_event_eth_rx_adapter.c (revision 952b24bd0475450e548d4aafae7d8cf48258402b)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #include <ctype.h>
6 #include <stdlib.h>
7 #include <pthread.h>
8 #if defined(__linux__)
9 #include <sys/epoll.h>
10 #endif
11 #include <unistd.h>
12 
13 #include <rte_cycles.h>
14 #include <rte_thread.h>
15 #include <rte_common.h>
16 #include <dev_driver.h>
17 #include <rte_errno.h>
18 #include <ethdev_driver.h>
19 #include <rte_log.h>
20 #include <rte_malloc.h>
21 #include <rte_service_component.h>
22 #include <rte_thash.h>
23 #include <rte_interrupts.h>
24 #include <rte_mbuf_dyn.h>
25 #include <rte_telemetry.h>
26 
27 #include "rte_eventdev.h"
28 #include "eventdev_pmd.h"
29 #include "eventdev_trace.h"
30 #include "rte_event_eth_rx_adapter.h"
31 
32 #define BATCH_SIZE		32
33 #define BLOCK_CNT_THRESHOLD	10
34 #define ETH_EVENT_BUFFER_SIZE	(6*BATCH_SIZE)
35 #define MAX_VECTOR_SIZE		1024
36 #define MIN_VECTOR_SIZE		4
37 #define MAX_VECTOR_NS		1E9
38 #define MIN_VECTOR_NS		1E5
39 
40 #define RXA_NB_RX_WORK_DEFAULT 128
41 
42 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
43 #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
44 
45 #define RSS_KEY_SIZE	40
46 /* value written to intr thread pipe to signal thread exit */
47 #define ETH_BRIDGE_INTR_THREAD_EXIT	1
48 /* Sentinel value to detect an uninitialized file handle */
49 #define INIT_FD		-1
50 
51 #define RXA_ADAPTER_ARRAY "rte_event_eth_rx_adapter_array"
52 
53 /*
54  * Used to store port and queue ID of interrupting Rx queue
55  */
56 union queue_data {
57 	void *ptr;
58 	struct {
59 		uint16_t port;
60 		uint16_t queue;
61 	};
62 };
63 
64 /*
65  * There is an instance of this struct per polled Rx queue added to the
66  * adapter
67  */
68 struct eth_rx_poll_entry {
69 	/* Eth port to poll */
70 	uint16_t eth_dev_id;
71 	/* Eth rx queue to poll */
72 	uint16_t eth_rx_qid;
73 };
74 
75 struct __rte_cache_aligned eth_rx_vector_data {
76 	TAILQ_ENTRY(eth_rx_vector_data) next;
77 	uint16_t port;
78 	uint16_t queue;
79 	uint16_t max_vector_count;
80 	uint64_t event;
81 	uint64_t ts;
82 	uint64_t vector_timeout_ticks;
83 	struct rte_mempool *vector_pool;
84 	struct rte_event_vector *vector_ev;
85 };
86 
87 TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
88 
89 /* One instance per adapter, or per Rx queue when use_queue_event_buf is set */
90 struct eth_event_enqueue_buffer {
91 	/* Count of events in this buffer */
92 	uint16_t count;
93 	/* Array of events in this buffer */
94 	struct rte_event *events;
95 	/* size of event buffer */
96 	uint16_t events_size;
97 	/* Event enqueue happens from head */
98 	uint16_t head;
99 	/* New packets from rte_eth_rx_burst are enqueued at the tail */
100 	uint16_t tail;
101 	/* Index just past the last valid element before rollover */
102 	uint16_t last;
103 	uint16_t last_mask;
104 };
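/* Informal sketch of the buffer indexing (see rxa_pkt_buf_available() and
 * rxa_flush_event_buffer() below): events are appended at 'tail' and
 * flushed starting from 'head'. When fewer than BATCH_SIZE free slots
 * remain at the end of the array but at least BATCH_SIZE events have
 * already been drained from the front, 'last' records the wrap point,
 * 'last_mask' is set to ~0 and 'tail' restarts at 0. For example, with the
 * default events_size of 192 (6 * BATCH_SIZE), head = 64 and tail = 176,
 * the next 32-event burst does not fit at the end, so last becomes 176 and
 * new events are written from index 0.
 */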
105 
106 struct __rte_cache_aligned event_eth_rx_adapter {
107 	/* RSS key */
108 	uint8_t rss_key_be[RSS_KEY_SIZE];
109 	/* Event device identifier */
110 	uint8_t eventdev_id;
111 	/* Event port identifier */
112 	uint8_t event_port_id;
113 	/* Flag indicating per rxq event buffer */
114 	bool use_queue_event_buf;
115 	/* Per ethernet device structure */
116 	struct eth_device_info *eth_devices;
117 	/* Lock to serialize config updates with service function */
118 	rte_spinlock_t rx_lock;
119 	/* Max mbufs processed in any service function invocation */
120 	uint32_t max_nb_rx;
121 	/* Receive queues that need to be polled */
122 	struct eth_rx_poll_entry *eth_rx_poll;
123 	/* Size of the eth_rx_poll array */
124 	uint16_t num_rx_polled;
125 	/* Weighted round robin schedule */
126 	uint32_t *wrr_sched;
127 	/* wrr_sched[] size */
128 	uint32_t wrr_len;
129 	/* Next entry in wrr[] to begin polling */
130 	uint32_t wrr_pos;
131 	/* Event burst buffer */
132 	struct eth_event_enqueue_buffer event_enqueue_buffer;
133 	/* Vector enable flag */
134 	uint8_t ena_vector;
135 	/* Timestamp of previous vector expiry list traversal */
136 	uint64_t prev_expiry_ts;
137 	/* Minimum ticks to wait before traversing expiry list */
138 	uint64_t vector_tmo_ticks;
139 	/* vector list */
140 	struct eth_rx_vector_data_list vector_list;
141 	/* Per adapter stats */
142 	struct rte_event_eth_rx_adapter_stats stats;
143 	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
144 	uint16_t enq_block_count;
145 	/* Block start ts */
146 	uint64_t rx_enq_block_start_ts;
147 	/* epoll fd used to wait for Rx interrupts */
148 	int epd;
149 	/* Number of interrupt driven Rx queues */
150 	uint32_t num_rx_intr;
151 	/* Used to send <dev id, queue id> of interrupting Rx queues from
152 	 * the interrupt thread to the Rx thread
153 	 */
154 	struct rte_ring *intr_ring;
155 	/* Rx Queue data (dev id, queue id) for the last non-empty
156 	 * queue polled
157 	 */
158 	union queue_data qd;
159 	/* queue_data is valid */
160 	int qd_valid;
161 	/* Interrupt ring lock, synchronizes Rx thread
162 	 * and interrupt thread
163 	 */
164 	rte_spinlock_t intr_ring_lock;
165 	/* event array passed to rte_epoll_wait */
166 	struct rte_epoll_event *epoll_events;
167 	/* Count of interrupt vectors in use */
168 	uint32_t num_intr_vec;
169 	/* Thread blocked on Rx interrupts */
170 	rte_thread_t rx_intr_thread;
171 	/* Configuration callback for rte_service configuration */
172 	rte_event_eth_rx_adapter_conf_cb conf_cb;
173 	/* Configuration callback argument */
174 	void *conf_arg;
175 	/* Set if the default conf_cb is being used */
176 	int default_cb_arg;
177 	/* Service initialization state */
178 	uint8_t service_inited;
179 	/* Total count of Rx queues in adapter */
180 	uint32_t nb_queues;
181 	/* Memory allocation name */
182 	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
183 	/* Socket identifier cached from eventdev */
184 	int socket_id;
185 	/* Per adapter EAL service */
186 	uint32_t service_id;
187 	/* Adapter started flag */
188 	uint8_t rxa_started;
189 	/* Adapter ID */
190 	uint8_t id;
191 };
192 
193 /* Per eth device */
194 struct eth_device_info {
195 	struct rte_eth_dev *dev;
196 	struct eth_rx_queue_info *rx_queue;
197 	/* Rx callback */
198 	rte_event_eth_rx_adapter_cb_fn cb_fn;
199 	/* Rx callback argument */
200 	void *cb_arg;
201 	/* Set if ethdev->eventdev packet transfer uses a
202 	 * hardware mechanism
203 	 */
204 	uint8_t internal_event_port;
205 	/* Set if the adapter is processing rx queues for
206 	 * this eth device and packet processing has been
207 	 * started, allows for the code to know if the PMD
208 	 * rx_adapter_stop callback needs to be invoked
209 	 */
210 	uint8_t dev_rx_started;
211 	/* Number of queues added for this device */
212 	uint16_t nb_dev_queues;
213 	/* Number of poll based queues
214 	 * If nb_rx_poll > 0, the start callback will
215 	 * be invoked if not already invoked
216 	 */
217 	uint16_t nb_rx_poll;
218 	/* Number of interrupt based queues
219 	 * If nb_rx_intr > 0, the start callback will
220 	 * be invoked if not already invoked.
221 	 */
222 	uint16_t nb_rx_intr;
223 	/* Number of queues that use the shared interrupt */
224 	uint16_t nb_shared_intr;
225 	/* sum(wrr(q)) for all queues within the device
226 	 * useful when deleting all device queues
227 	 */
228 	uint32_t wrr_len;
229 	/* Intr based queue index to start polling from, this is used
230 	 * if the number of shared interrupts is non-zero
231 	 */
232 	uint16_t next_q_idx;
233 	/* Intr based queue indices */
234 	uint16_t *intr_queue;
235 	/* Set if the device can generate a per Rx queue interrupt
236 	 * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
237 	 */
238 	int multi_intr_cap;
239 	/* shared interrupt enabled */
240 	int shared_intr_enabled;
241 };
242 
243 /* Per Rx queue */
244 struct eth_rx_queue_info {
245 	int queue_enabled;	/* True if added */
246 	int intr_enabled;
247 	uint8_t ena_vector;
248 	uint16_t wt;		/* Polling weight */
249 	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
250 	uint64_t event;
251 	struct eth_rx_vector_data vector_data;
252 	struct eth_event_enqueue_buffer *event_buf;
253 	/* Use the adapter stats struct for queue level stats,
254 	 * as the same stats need to be updated for both the adapter and the queue
255 	 */
256 	struct rte_event_eth_rx_adapter_stats *stats;
257 };
258 
259 static struct event_eth_rx_adapter **event_eth_rx_adapter;
260 
261 /* Enable dynamic timestamp field in mbuf */
262 static uint64_t event_eth_rx_timestamp_dynflag;
263 static int event_eth_rx_timestamp_dynfield_offset = -1;
264 
265 static inline rte_mbuf_timestamp_t *
266 rxa_timestamp_dynfield(struct rte_mbuf *mbuf)
267 {
268 	return RTE_MBUF_DYNFIELD(mbuf,
269 		event_eth_rx_timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
270 }
271 
272 static inline int
273 rxa_validate_id(uint8_t id)
274 {
275 	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
276 }
277 
278 static inline struct eth_event_enqueue_buffer *
279 rxa_event_buf_get(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
280 		  uint16_t rx_queue_id,
281 		  struct rte_event_eth_rx_adapter_stats **stats)
282 {
283 	if (rx_adapter->use_queue_event_buf) {
284 		struct eth_device_info *dev_info =
285 			&rx_adapter->eth_devices[eth_dev_id];
286 		*stats = dev_info->rx_queue[rx_queue_id].stats;
287 		return dev_info->rx_queue[rx_queue_id].event_buf;
288 	} else {
289 		*stats = &rx_adapter->stats;
290 		return &rx_adapter->event_enqueue_buffer;
291 	}
292 }
293 
294 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
295 	if (!rxa_validate_id(id)) { \
296 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d", id); \
297 		return retval; \
298 	} \
299 } while (0)
300 
301 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(id, retval) do { \
302 	if (!rxa_validate_id(id)) { \
303 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d", id); \
304 		ret = retval; \
305 		goto error; \
306 	} \
307 } while (0)
308 
309 #define RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, retval) do { \
310 	if ((token) == NULL || strlen(token) == 0 || !isdigit(*token)) { \
311 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter token"); \
312 		ret = retval; \
313 		goto error; \
314 	} \
315 } while (0)
316 
317 #define RTE_EVENT_ETH_RX_ADAPTER_PORTID_VALID_OR_GOTO_ERR_RET(port_id, retval) do { \
318 	if (!rte_eth_dev_is_valid_port(port_id)) { \
319 		RTE_EDEV_LOG_ERR("Invalid port_id=%u", port_id); \
320 		ret = retval; \
321 		goto error; \
322 	} \
323 } while (0)
324 
325 static inline int
326 rxa_sw_adapter_queue_count(struct event_eth_rx_adapter *rx_adapter)
327 {
328 	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
329 }
330 
331 /* Greatest common divisor */
332 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
333 {
334 	uint16_t r = a % b;
335 
336 	return r ? rxa_gcd_u16(b, r) : b;
337 }
338 
339 /* Returns the next queue in the polling sequence
340  *
341  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
342  */
343 static int
344 rxa_wrr_next(struct event_eth_rx_adapter *rx_adapter, unsigned int n, int *cw,
345 	     struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
346 	     uint16_t gcd, int prev)
347 {
348 	int i = prev;
349 	uint16_t w;
350 
351 	while (1) {
352 		uint16_t q;
353 		uint16_t d;
354 
355 		i = (i + 1) % n;
356 		if (i == 0) {
357 			*cw = *cw - gcd;
358 			if (*cw <= 0)
359 				*cw = max_wt;
360 		}
361 
362 		q = eth_rx_poll[i].eth_rx_qid;
363 		d = eth_rx_poll[i].eth_dev_id;
364 		w = rx_adapter->eth_devices[d].rx_queue[q].wt;
365 
366 		if ((int)w >= *cw)
367 			return i;
368 	}
369 }
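/* Illustrative example (hypothetical weights): for three polled queues with
 * weights 3, 2 and 1, max_wt = 3 and gcd = 1, successive calls to
 * rxa_wrr_next() produce the sequence q0 q0 q1 q0 q1 q2, i.e. each queue is
 * visited in proportion to its weight within a cycle of sum(weights) slots.
 */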
370 
371 static inline int
372 rxa_shared_intr(struct eth_device_info *dev_info,
373 	int rx_queue_id)
374 {
375 	int multi_intr_cap;
376 
377 	if (dev_info->dev->intr_handle == NULL)
378 		return 0;
379 
380 	multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
381 	return !multi_intr_cap ||
382 		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
383 }
384 
385 static inline int
386 rxa_intr_queue(struct eth_device_info *dev_info,
387 	int rx_queue_id)
388 {
389 	struct eth_rx_queue_info *queue_info;
390 
391 	queue_info = &dev_info->rx_queue[rx_queue_id];
392 	return dev_info->rx_queue &&
393 		!dev_info->internal_event_port &&
394 		queue_info->queue_enabled && queue_info->wt == 0;
395 }
396 
397 static inline int
398 rxa_polled_queue(struct eth_device_info *dev_info,
399 	int rx_queue_id)
400 {
401 	struct eth_rx_queue_info *queue_info;
402 
403 	queue_info = &dev_info->rx_queue[rx_queue_id];
404 	return !dev_info->internal_event_port &&
405 		dev_info->rx_queue &&
406 		queue_info->queue_enabled && queue_info->wt != 0;
407 }
408 
409 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
410 static int
411 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
412 {
413 	uint16_t i;
414 	int n, s;
415 	uint16_t nbq;
416 
417 	nbq = dev_info->dev->data->nb_rx_queues;
418 	n = 0; /* non shared count */
419 	s = 0; /* shared count */
420 
421 	if (rx_queue_id == -1) {
422 		for (i = 0; i < nbq; i++) {
423 			if (!rxa_shared_intr(dev_info, i))
424 				n += add ? !rxa_intr_queue(dev_info, i) :
425 					rxa_intr_queue(dev_info, i);
426 			else
427 				s += add ? !rxa_intr_queue(dev_info, i) :
428 					rxa_intr_queue(dev_info, i);
429 		}
430 
431 		if (s > 0) {
432 			if ((add && dev_info->nb_shared_intr == 0) ||
433 				(!add && dev_info->nb_shared_intr))
434 				n += 1;
435 		}
436 	} else {
437 		if (!rxa_shared_intr(dev_info, rx_queue_id))
438 			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
439 				rxa_intr_queue(dev_info, rx_queue_id);
440 		else
441 			n = add ? !dev_info->nb_shared_intr :
442 				dev_info->nb_shared_intr == 1;
443 	}
444 
445 	return add ? n : -n;
446 }
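/* Informal note: queues that share an interrupt vector are counted once.
 * For example, adding all queues of a device where two queues have their
 * own vectors and four share a single vector (and none was previously an
 * interrupt mode queue) changes the vector count by +3: +2 for the
 * non-shared queues and +1 for the shared vector.
 */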
447 
448 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
449  */
450 static void
451 rxa_calc_nb_post_intr_del(struct event_eth_rx_adapter *rx_adapter,
452 			  struct eth_device_info *dev_info, int rx_queue_id,
453 			  uint32_t *nb_rx_intr)
454 {
455 	uint32_t intr_diff;
456 
457 	if (rx_queue_id == -1)
458 		intr_diff = dev_info->nb_rx_intr;
459 	else
460 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
461 
462 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
463 }
464 
465 /* Calculate nb_rx_* after adding interrupt mode Rx queues; the newly added
466  * interrupt queues could currently be poll mode Rx queues
467  */
468 static void
469 rxa_calc_nb_post_add_intr(struct event_eth_rx_adapter *rx_adapter,
470 			  struct eth_device_info *dev_info, int rx_queue_id,
471 			  uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
472 			  uint32_t *nb_wrr)
473 {
474 	uint32_t intr_diff;
475 	uint32_t poll_diff;
476 	uint32_t wrr_len_diff;
477 
478 	if (rx_queue_id == -1) {
479 		intr_diff = dev_info->dev->data->nb_rx_queues -
480 						dev_info->nb_rx_intr;
481 		poll_diff = dev_info->nb_rx_poll;
482 		wrr_len_diff = dev_info->wrr_len;
483 	} else {
484 		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
485 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
486 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
487 					0;
488 	}
489 
490 	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
491 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
492 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
493 }
494 
495 /* Calculate size of the eth_rx_poll and wrr_sched arrays
496  * after deleting poll mode rx queues
497  */
498 static void
499 rxa_calc_nb_post_poll_del(struct event_eth_rx_adapter *rx_adapter,
500 			  struct eth_device_info *dev_info, int rx_queue_id,
501 			  uint32_t *nb_rx_poll, uint32_t *nb_wrr)
502 {
503 	uint32_t poll_diff;
504 	uint32_t wrr_len_diff;
505 
506 	if (rx_queue_id == -1) {
507 		poll_diff = dev_info->nb_rx_poll;
508 		wrr_len_diff = dev_info->wrr_len;
509 	} else {
510 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
511 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
512 					0;
513 	}
514 
515 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
516 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
517 }
518 
519 /* Calculate nb_rx_* after adding poll mode rx queues
520  */
521 static void
522 rxa_calc_nb_post_add_poll(struct event_eth_rx_adapter *rx_adapter,
523 			  struct eth_device_info *dev_info, int rx_queue_id,
524 			  uint16_t wt, uint32_t *nb_rx_poll,
525 			  uint32_t *nb_rx_intr, uint32_t *nb_wrr)
526 {
527 	uint32_t intr_diff;
528 	uint32_t poll_diff;
529 	uint32_t wrr_len_diff;
530 
531 	if (rx_queue_id == -1) {
532 		intr_diff = dev_info->nb_rx_intr;
533 		poll_diff = dev_info->dev->data->nb_rx_queues -
534 						dev_info->nb_rx_poll;
535 		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
536 				- dev_info->wrr_len;
537 	} else {
538 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
539 		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
540 		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
541 				wt - dev_info->rx_queue[rx_queue_id].wt :
542 				wt;
543 	}
544 
545 	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
546 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
547 	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
548 }
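/* Worked example (hypothetical numbers): converting a queue that is
 * currently in interrupt mode to poll mode with weight wt = 2 gives
 * intr_diff = 1, poll_diff = 1 and wrr_len_diff = 2, so nb_rx_intr drops by
 * one, nb_rx_poll grows by one and the WRR sequence grows by the new
 * weight.
 */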
549 
550 /* Calculate nb_rx_* after adding rx_queue_id */
551 static void
552 rxa_calc_nb_post_add(struct event_eth_rx_adapter *rx_adapter,
553 		     struct eth_device_info *dev_info, int rx_queue_id,
554 		     uint16_t wt, uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
555 		     uint32_t *nb_wrr)
556 {
557 	if (wt != 0)
558 		rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
559 					wt, nb_rx_poll, nb_rx_intr, nb_wrr);
560 	else
561 		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
562 					nb_rx_poll, nb_rx_intr, nb_wrr);
563 }
564 
565 /* Calculate nb_rx_* after deleting rx_queue_id */
566 static void
567 rxa_calc_nb_post_del(struct event_eth_rx_adapter *rx_adapter,
568 		     struct eth_device_info *dev_info, int rx_queue_id,
569 		     uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
570 		     uint32_t *nb_wrr)
571 {
572 	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
573 				nb_wrr);
574 	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
575 				nb_rx_intr);
576 }
577 
578 /*
579  * Allocate the rx_poll array
580  */
581 static struct eth_rx_poll_entry *
582 rxa_alloc_poll(struct event_eth_rx_adapter *rx_adapter, uint32_t num_rx_polled)
583 {
584 	size_t len;
585 
586 	len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
587 							RTE_CACHE_LINE_SIZE);
588 	return  rte_zmalloc_socket(rx_adapter->mem_name,
589 				len,
590 				RTE_CACHE_LINE_SIZE,
591 				rx_adapter->socket_id);
592 }
593 
594 /*
595  * Allocate the WRR array
596  */
597 static uint32_t *
598 rxa_alloc_wrr(struct event_eth_rx_adapter *rx_adapter, int nb_wrr)
599 {
600 	size_t len;
601 
602 	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
603 			RTE_CACHE_LINE_SIZE);
604 	return  rte_zmalloc_socket(rx_adapter->mem_name,
605 				len,
606 				RTE_CACHE_LINE_SIZE,
607 				rx_adapter->socket_id);
608 }
609 
610 static int
611 rxa_alloc_poll_arrays(struct event_eth_rx_adapter *rx_adapter, uint32_t nb_poll,
612 		      uint32_t nb_wrr, struct eth_rx_poll_entry **rx_poll,
613 		      uint32_t **wrr_sched)
614 {
615 
616 	if (nb_poll == 0) {
617 		*rx_poll = NULL;
618 		*wrr_sched = NULL;
619 		return 0;
620 	}
621 
622 	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
623 	if (*rx_poll == NULL) {
624 		*wrr_sched = NULL;
625 		return -ENOMEM;
626 	}
627 
628 	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
629 	if (*wrr_sched == NULL) {
630 		rte_free(*rx_poll);
631 		return -ENOMEM;
632 	}
633 	return 0;
634 }
635 
636 /* Precalculate WRR polling sequence for all queues in rx_adapter */
637 static void
638 rxa_calc_wrr_sequence(struct event_eth_rx_adapter *rx_adapter,
639 		      struct eth_rx_poll_entry *rx_poll, uint32_t *rx_wrr)
640 {
641 	uint16_t d;
642 	uint16_t q;
643 	unsigned int i;
644 	int prev = -1;
645 	int cw = -1;
646 
647 	/* Initialize variables for calculation of wrr schedule */
648 	uint16_t max_wrr_pos = 0;
649 	unsigned int poll_q = 0;
650 	uint16_t max_wt = 0;
651 	uint16_t gcd = 0;
652 
653 	if (rx_poll == NULL)
654 		return;
655 
656 	/* Generate the array of all queues to poll; the final size of
657 	 * this array is poll_q
658 	 */
659 	RTE_ETH_FOREACH_DEV(d) {
660 		uint16_t nb_rx_queues;
661 		struct eth_device_info *dev_info =
662 				&rx_adapter->eth_devices[d];
663 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
664 		if (dev_info->rx_queue == NULL)
665 			continue;
666 		if (dev_info->internal_event_port)
667 			continue;
668 		dev_info->wrr_len = 0;
669 		for (q = 0; q < nb_rx_queues; q++) {
670 			struct eth_rx_queue_info *queue_info =
671 				&dev_info->rx_queue[q];
672 			uint16_t wt;
673 
674 			if (!rxa_polled_queue(dev_info, q))
675 				continue;
676 			wt = queue_info->wt;
677 			rx_poll[poll_q].eth_dev_id = d;
678 			rx_poll[poll_q].eth_rx_qid = q;
679 			max_wrr_pos += wt;
680 			dev_info->wrr_len += wt;
681 			max_wt = RTE_MAX(max_wt, wt);
682 			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
683 			poll_q++;
684 		}
685 	}
686 
687 	/* Generate polling sequence based on weights */
688 	prev = -1;
689 	cw = -1;
690 	for (i = 0; i < max_wrr_pos; i++) {
691 		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
692 				     rx_poll, max_wt, gcd, prev);
693 		prev = rx_wrr[i];
694 	}
695 }
696 
697 static inline void
698 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
699 	struct rte_ipv6_hdr **ipv6_hdr)
700 {
701 	struct rte_ether_hdr *eth_hdr =
702 		rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
703 	struct rte_vlan_hdr *vlan_hdr;
704 
705 	*ipv4_hdr = NULL;
706 	*ipv6_hdr = NULL;
707 
708 	switch (eth_hdr->ether_type) {
709 	case RTE_BE16(RTE_ETHER_TYPE_IPV4):
710 		*ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
711 		break;
712 
713 	case RTE_BE16(RTE_ETHER_TYPE_IPV6):
714 		*ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
715 		break;
716 
717 	case RTE_BE16(RTE_ETHER_TYPE_VLAN):
718 		vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
719 		switch (vlan_hdr->eth_proto) {
720 		case RTE_BE16(RTE_ETHER_TYPE_IPV4):
721 			*ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
722 			break;
723 		case RTE_BE16(RTE_ETHER_TYPE_IPV6):
724 			*ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
725 			break;
726 		default:
727 			break;
728 		}
729 		break;
730 
731 	default:
732 		break;
733 	}
734 }
735 
736 /* Calculate RSS hash for IPv4/6 */
737 static inline uint32_t
738 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
739 {
740 	uint32_t input_len;
741 	void *tuple;
742 	struct rte_ipv4_tuple ipv4_tuple;
743 	struct rte_ipv6_tuple ipv6_tuple;
744 	struct rte_ipv4_hdr *ipv4_hdr;
745 	struct rte_ipv6_hdr *ipv6_hdr;
746 
747 	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
748 
749 	if (ipv4_hdr) {
750 		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
751 		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
752 		tuple = &ipv4_tuple;
753 		input_len = RTE_THASH_V4_L3_LEN;
754 	} else if (ipv6_hdr) {
755 		rte_thash_load_v6_addrs(ipv6_hdr,
756 					(union rte_thash_tuple *)&ipv6_tuple);
757 		tuple = &ipv6_tuple;
758 		input_len = RTE_THASH_V6_L3_LEN;
759 	} else
760 		return 0;
761 
762 	return rte_softrss_be(tuple, input_len, rss_key_be);
763 }
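/* Informal note: this software RSS hash is only computed when the mbuf
 * carries no NIC-provided hash (RTE_MBUF_F_RX_RSS_HASH not set) and the
 * application did not supply a flow id for the queue; the resulting
 * Toeplitz hash over the L3 addresses is then used as the event flow id in
 * rxa_buffer_mbufs(). rss_key_be is assumed to hold the RSS key already
 * converted to big-endian form, as rte_softrss_be() requires.
 */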
764 
765 static inline int
766 rxa_enq_blocked(struct event_eth_rx_adapter *rx_adapter)
767 {
768 	return !!rx_adapter->enq_block_count;
769 }
770 
771 static inline void
772 rxa_enq_block_start_ts(struct event_eth_rx_adapter *rx_adapter)
773 {
774 	if (rx_adapter->rx_enq_block_start_ts)
775 		return;
776 
777 	rx_adapter->enq_block_count++;
778 	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
779 		return;
780 
781 	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
782 }
783 
784 static inline void
785 rxa_enq_block_end_ts(struct event_eth_rx_adapter *rx_adapter,
786 		     struct rte_event_eth_rx_adapter_stats *stats)
787 {
788 	if (unlikely(!stats->rx_enq_start_ts))
789 		stats->rx_enq_start_ts = rte_get_tsc_cycles();
790 
791 	if (likely(!rxa_enq_blocked(rx_adapter)))
792 		return;
793 
794 	rx_adapter->enq_block_count = 0;
795 	if (rx_adapter->rx_enq_block_start_ts) {
796 		stats->rx_enq_end_ts = rte_get_tsc_cycles();
797 		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
798 		    rx_adapter->rx_enq_block_start_ts;
799 		rx_adapter->rx_enq_block_start_ts = 0;
800 	}
801 }
802 
803 /* Enqueue buffered events to event device */
804 static inline uint16_t
805 rxa_flush_event_buffer(struct event_eth_rx_adapter *rx_adapter,
806 		       struct eth_event_enqueue_buffer *buf,
807 		       struct rte_event_eth_rx_adapter_stats *stats)
808 {
809 	uint16_t count = buf->count;
810 	uint16_t n = 0;
811 
812 	if (!count)
813 		return 0;
814 
815 	if (buf->last)
816 		count = buf->last - buf->head;
817 
818 	if (count) {
819 		n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
820 						rx_adapter->event_port_id,
821 						&buf->events[buf->head],
822 						count);
823 		if (n != count)
824 			stats->rx_enq_retry++;
825 
826 		buf->head += n;
827 	}
828 
829 	if (buf->last && n == count) {
830 		uint16_t n1;
831 
832 		n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
833 					rx_adapter->event_port_id,
834 					&buf->events[0],
835 					buf->tail);
836 
837 		if (n1 != buf->tail)
838 			stats->rx_enq_retry++;
839 
840 		buf->last = 0;
841 		buf->head = n1;
842 		buf->last_mask = 0;
843 		n += n1;
844 	}
845 
846 	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
847 		rxa_enq_block_start_ts(rx_adapter);
848 
849 	buf->count -= n;
850 	stats->rx_enq_count += n;
851 
852 	return n;
853 }
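/* Informal note: when the buffer has wrapped (buf->last != 0), the flush is
 * done in two bursts, events[head..last-1] followed by events[0..tail-1];
 * the second burst is only attempted if the first one was enqueued in full.
 */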
854 
855 static inline void
856 rxa_init_vector(struct event_eth_rx_adapter *rx_adapter,
857 		struct eth_rx_vector_data *vec)
858 {
859 	vec->vector_ev->nb_elem = 0;
860 	vec->vector_ev->port = vec->port;
861 	vec->vector_ev->queue = vec->queue;
862 	vec->vector_ev->attr_valid = true;
863 	vec->vector_ev->elem_offset = 0;
864 	TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
865 }
866 
867 static inline uint16_t
868 rxa_create_event_vector(struct event_eth_rx_adapter *rx_adapter,
869 			struct eth_rx_queue_info *queue_info,
870 			struct eth_event_enqueue_buffer *buf,
871 			struct rte_mbuf **mbufs, uint16_t num)
872 {
873 	struct rte_event *ev = &buf->events[buf->count];
874 	struct eth_rx_vector_data *vec;
875 	uint16_t filled, space, sz;
876 
877 	filled = 0;
878 	vec = &queue_info->vector_data;
879 
880 	if (vec->vector_ev == NULL) {
881 		if (rte_mempool_get(vec->vector_pool,
882 				    (void **)&vec->vector_ev) < 0) {
883 			rte_pktmbuf_free_bulk(mbufs, num);
884 			return 0;
885 		}
886 		rxa_init_vector(rx_adapter, vec);
887 	}
888 	while (num) {
889 		if (vec->vector_ev->nb_elem == vec->max_vector_count) {
890 			/* Event ready. */
891 			ev->event = vec->event;
892 			ev->vec = vec->vector_ev;
893 			ev++;
894 			filled++;
895 			vec->vector_ev = NULL;
896 			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
897 			if (rte_mempool_get(vec->vector_pool,
898 					    (void **)&vec->vector_ev) < 0) {
899 				rte_pktmbuf_free_bulk(mbufs, num);
900 				return 0;
901 			}
902 			rxa_init_vector(rx_adapter, vec);
903 		}
904 
905 		space = vec->max_vector_count - vec->vector_ev->nb_elem;
906 		sz = num > space ? space : num;
907 		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
908 		       sizeof(void *) * sz);
909 		vec->vector_ev->nb_elem += sz;
910 		num -= sz;
911 		mbufs += sz;
912 		vec->ts = rte_rdtsc();
913 	}
914 
915 	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
916 		ev->event = vec->event;
917 		ev->vec = vec->vector_ev;
918 		ev++;
919 		filled++;
920 		vec->vector_ev = NULL;
921 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
922 	}
923 
924 	return filled;
925 }
926 
927 static inline void
928 rxa_buffer_mbufs(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
929 		 uint16_t rx_queue_id, struct rte_mbuf **mbufs, uint16_t num,
930 		 struct eth_event_enqueue_buffer *buf,
931 		 struct rte_event_eth_rx_adapter_stats *stats)
932 {
933 	uint32_t i;
934 	struct eth_device_info *dev_info =
935 					&rx_adapter->eth_devices[eth_dev_id];
936 	struct eth_rx_queue_info *eth_rx_queue_info =
937 					&dev_info->rx_queue[rx_queue_id];
938 	uint16_t new_tail = buf->tail;
939 	uint64_t event = eth_rx_queue_info->event;
940 	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
941 	struct rte_mbuf *m = mbufs[0];
942 	uint32_t rss_mask;
943 	uint32_t rss;
944 	int do_rss;
945 	uint16_t nb_cb;
946 	uint16_t dropped;
947 	uint64_t ts, ts_mask;
948 
949 	if (!eth_rx_queue_info->ena_vector) {
950 		ts = m->ol_flags & event_eth_rx_timestamp_dynflag ?
951 						0 : rte_get_tsc_cycles();
952 
953 		/* 0xffff ffff ffff ffff if RTE_MBUF_F_RX_TIMESTAMP is set,
954 		 * otherwise 0
955 		 */
956 		ts_mask = (uint64_t)(!(m->ol_flags &
957 				       event_eth_rx_timestamp_dynflag)) - 1ULL;
958 
959 		/* 0xffff ffff if RTE_MBUF_F_RX_RSS_HASH is set, otherwise 0 */
960 		rss_mask = ~(((m->ol_flags & RTE_MBUF_F_RX_RSS_HASH) != 0) - 1);
961 		do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
962 		for (i = 0; i < num; i++) {
963 			struct rte_event *ev;
964 
965 			m = mbufs[i];
966 			*rxa_timestamp_dynfield(m) = ts |
967 					(*rxa_timestamp_dynfield(m) & ts_mask);
968 
969 			ev = &buf->events[new_tail];
970 
971 			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
972 				     : m->hash.rss;
973 			ev->event = event;
974 			ev->flow_id = (rss & ~flow_id_mask) |
975 				      (ev->flow_id & flow_id_mask);
976 			ev->mbuf = m;
977 			new_tail++;
978 		}
979 	} else {
980 		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
981 					      buf, mbufs, num);
982 	}
983 
984 	if (num && dev_info->cb_fn) {
985 
986 		dropped = 0;
987 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
988 				       buf->last |
989 				       (buf->events_size & ~buf->last_mask),
990 				       buf->count >= BATCH_SIZE ?
991 						buf->count - BATCH_SIZE : 0,
992 				       &buf->events[buf->tail],
993 				       num,
994 				       dev_info->cb_arg,
995 				       &dropped);
996 		if (unlikely(nb_cb > num))
997 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
998 				nb_cb, num);
999 		else
1000 			num = nb_cb;
1001 		if (dropped)
1002 			stats->rx_dropped += dropped;
1003 	}
1004 
1005 	buf->count += num;
1006 	buf->tail += num;
1007 }
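/* Informal note on the mask arithmetic above: ts_mask is all ones when the
 * Rx timestamp flag is already set on the mbuf (the existing timestamp is
 * preserved) and zero otherwise (the freshly read TSC value is written).
 * Similarly, flow_id_mask is ~0 when the application supplied a flow id at
 * queue add time, so the configured flow id is kept; with flow_id_mask = 0
 * the expression reduces to ev->flow_id = rss, i.e. the hardware or
 * software RSS hash becomes the flow id.
 */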
1008 
1009 static inline bool
1010 rxa_pkt_buf_available(struct eth_event_enqueue_buffer *buf)
1011 {
1012 	uint32_t nb_req = buf->tail + BATCH_SIZE;
1013 
1014 	if (!buf->last) {
1015 		if (nb_req <= buf->events_size)
1016 			return true;
1017 
1018 		if (buf->head >= BATCH_SIZE) {
1019 			buf->last_mask = ~0;
1020 			buf->last = buf->tail;
1021 			buf->tail = 0;
1022 			return true;
1023 		}
1024 	}
1025 
1026 	return nb_req <= buf->head;
1027 }
1028 
1029 /* Enqueue packets from <port, q> to the event buffer */
1030 static inline uint32_t
1031 rxa_eth_rx(struct event_eth_rx_adapter *rx_adapter, uint16_t port_id,
1032 	   uint16_t queue_id, uint32_t rx_count, uint32_t max_rx,
1033 	   int *rxq_empty, struct eth_event_enqueue_buffer *buf,
1034 	   struct rte_event_eth_rx_adapter_stats *stats)
1035 {
1036 	struct rte_mbuf *mbufs[BATCH_SIZE];
1037 	uint16_t n;
1038 	uint32_t nb_rx = 0;
1039 	uint32_t nb_flushed = 0;
1040 
1041 	if (rxq_empty)
1042 		*rxq_empty = 0;
1043 	/* Don't do a batch dequeue from the rx queue if there isn't
1044 	 * enough space in the enqueue buffer.
1045 	 */
1046 	while (rxa_pkt_buf_available(buf)) {
1047 		if (buf->count >= BATCH_SIZE)
1048 			nb_flushed +=
1049 				rxa_flush_event_buffer(rx_adapter, buf, stats);
1050 
1051 		stats->rx_poll_count++;
1052 		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
1053 		if (unlikely(!n)) {
1054 			if (rxq_empty)
1055 				*rxq_empty = 1;
1056 			break;
1057 		}
1058 		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n, buf,
1059 				 stats);
1060 		nb_rx += n;
1061 		if (rx_count + nb_rx > max_rx)
1062 			break;
1063 	}
1064 
1065 	if (buf->count > 0)
1066 		nb_flushed += rxa_flush_event_buffer(rx_adapter, buf, stats);
1067 
1068 	stats->rx_packets += nb_rx;
1069 	if (nb_flushed == 0)
1070 		rte_event_maintain(rx_adapter->eventdev_id,
1071 				   rx_adapter->event_port_id, 0);
1072 
1073 	return nb_rx;
1074 }
1075 
1076 static inline void
1077 rxa_intr_ring_enqueue(struct event_eth_rx_adapter *rx_adapter, void *data)
1078 {
1079 	uint16_t port_id;
1080 	uint16_t queue;
1081 	int err;
1082 	union queue_data qd;
1083 	struct eth_device_info *dev_info;
1084 	struct eth_rx_queue_info *queue_info;
1085 	int *intr_enabled;
1086 
1087 	qd.ptr = data;
1088 	port_id = qd.port;
1089 	queue = qd.queue;
1090 
1091 	dev_info = &rx_adapter->eth_devices[port_id];
1092 	queue_info = &dev_info->rx_queue[queue];
1093 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1094 	if (rxa_shared_intr(dev_info, queue))
1095 		intr_enabled = &dev_info->shared_intr_enabled;
1096 	else
1097 		intr_enabled = &queue_info->intr_enabled;
1098 
1099 	if (*intr_enabled) {
1100 		*intr_enabled = 0;
1101 		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
1102 		/* Entry should always be available.
1103 		 * The ring size equals the maximum number of interrupt
1104 		 * vectors supported (an interrupt vector is shared in
1105 		 * case of shared interrupts)
1106 		 */
1107 		if (err)
1108 			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
1109 				" to ring: %s", strerror(-err));
1110 		else
1111 			rte_eth_dev_rx_intr_disable(port_id, queue);
1112 	}
1113 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1114 }
1115 
1116 static int
1117 rxa_intr_ring_check_avail(struct event_eth_rx_adapter *rx_adapter,
1118 			  uint32_t num_intr_vec)
1119 {
1120 	if (rx_adapter->num_intr_vec + num_intr_vec >
1121 				RTE_EVENT_ETH_INTR_RING_SIZE) {
1122 		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1123 		" %d needed %d limit %d", rx_adapter->num_intr_vec,
1124 		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1125 		return -ENOSPC;
1126 	}
1127 
1128 	return 0;
1129 }
1130 
1131 /* Delete entries for (dev, queue) from the interrupt ring */
1132 static void
1133 rxa_intr_ring_del_entries(struct event_eth_rx_adapter *rx_adapter,
1134 			  struct eth_device_info *dev_info,
1135 			  uint16_t rx_queue_id)
1136 {
1137 	int i, n;
1138 	union queue_data qd;
1139 
1140 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1141 
1142 	n = rte_ring_count(rx_adapter->intr_ring);
1143 	for (i = 0; i < n; i++) {
1144 		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1145 		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1146 			if (qd.port == dev_info->dev->data->port_id &&
1147 				qd.queue == rx_queue_id)
1148 				continue;
1149 		} else {
1150 			if (qd.port == dev_info->dev->data->port_id)
1151 				continue;
1152 		}
1153 		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1154 	}
1155 
1156 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1157 }
1158 
1159 /* Thread callback handling interrupt mode receive queues.
1160  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1161  * interrupting queue to the adapter's ring buffer for interrupt events.
1162  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1163  * the adapter service function.
1164  */
1165 static uint32_t
1166 rxa_intr_thread(void *arg)
1167 {
1168 	struct event_eth_rx_adapter *rx_adapter = arg;
1169 	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1170 	int n, i;
1171 
1172 	while (1) {
1173 		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1174 				RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1175 		if (unlikely(n < 0))
1176 			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1177 					n);
1178 		for (i = 0; i < n; i++) {
1179 			rxa_intr_ring_enqueue(rx_adapter,
1180 					epoll_events[i].epdata.data);
1181 		}
1182 	}
1183 
1184 	return 0;
1185 }
1186 
1187 /* Dequeue <port, q> from interrupt ring and enqueue received
1188  * mbufs to eventdev
1189  */
1190 static inline bool
1191 rxa_intr_ring_dequeue(struct event_eth_rx_adapter *rx_adapter)
1192 {
1193 	uint32_t n;
1194 	uint32_t nb_rx = 0;
1195 	int rxq_empty;
1196 	struct eth_event_enqueue_buffer *buf;
1197 	struct rte_event_eth_rx_adapter_stats *stats;
1198 	rte_spinlock_t *ring_lock;
1199 	uint8_t max_done = 0;
1200 	bool work = false;
1201 
1202 	if (rx_adapter->num_rx_intr == 0)
1203 		return work;
1204 
1205 	if (rte_ring_count(rx_adapter->intr_ring) == 0
1206 		&& !rx_adapter->qd_valid)
1207 		return work;
1208 
1209 	buf = &rx_adapter->event_enqueue_buffer;
1210 	stats = &rx_adapter->stats;
1211 	ring_lock = &rx_adapter->intr_ring_lock;
1212 
1213 	if (buf->count >= BATCH_SIZE) {
1214 		uint16_t n;
1215 
1216 		n = rxa_flush_event_buffer(rx_adapter, buf, stats);
1217 
1218 		if (likely(n > 0))
1219 			work = true;
1220 	}
1221 
1222 	while (rxa_pkt_buf_available(buf)) {
1223 		struct eth_device_info *dev_info;
1224 		uint16_t port;
1225 		uint16_t queue;
1226 		union queue_data qd  = rx_adapter->qd;
1227 		int err;
1228 
1229 		if (!rx_adapter->qd_valid) {
1230 			struct eth_rx_queue_info *queue_info;
1231 
1232 			rte_spinlock_lock(ring_lock);
1233 			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1234 			if (err) {
1235 				rte_spinlock_unlock(ring_lock);
1236 				break;
1237 			}
1238 
1239 			port = qd.port;
1240 			queue = qd.queue;
1241 			rx_adapter->qd = qd;
1242 			rx_adapter->qd_valid = 1;
1243 			dev_info = &rx_adapter->eth_devices[port];
1244 			if (rxa_shared_intr(dev_info, queue))
1245 				dev_info->shared_intr_enabled = 1;
1246 			else {
1247 				queue_info = &dev_info->rx_queue[queue];
1248 				queue_info->intr_enabled = 1;
1249 			}
1250 			rte_eth_dev_rx_intr_enable(port, queue);
1251 			rte_spinlock_unlock(ring_lock);
1252 		} else {
1253 			port = qd.port;
1254 			queue = qd.queue;
1255 
1256 			dev_info = &rx_adapter->eth_devices[port];
1257 		}
1258 
1259 		if (rxa_shared_intr(dev_info, queue)) {
1260 			uint16_t i;
1261 			uint16_t nb_queues;
1262 
1263 			nb_queues = dev_info->dev->data->nb_rx_queues;
1264 			n = 0;
1265 			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1266 				uint8_t enq_buffer_full;
1267 
1268 				if (!rxa_intr_queue(dev_info, i))
1269 					continue;
1270 				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1271 					rx_adapter->max_nb_rx,
1272 					&rxq_empty, buf, stats);
1273 				nb_rx += n;
1274 
1275 				enq_buffer_full = !rxq_empty && n == 0;
1276 				max_done = nb_rx > rx_adapter->max_nb_rx;
1277 
1278 				if (enq_buffer_full || max_done) {
1279 					dev_info->next_q_idx = i;
1280 					goto done;
1281 				}
1282 			}
1283 
1284 			rx_adapter->qd_valid = 0;
1285 
1286 			/* Reinitialize for next interrupt */
1287 			dev_info->next_q_idx = dev_info->multi_intr_cap ?
1288 						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1289 						0;
1290 		} else {
1291 			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1292 				rx_adapter->max_nb_rx,
1293 				&rxq_empty, buf, stats);
1294 			rx_adapter->qd_valid = !rxq_empty;
1295 			nb_rx += n;
1296 			if (nb_rx > rx_adapter->max_nb_rx)
1297 				break;
1298 		}
1299 	}
1300 
1301 done:
1302 	if (nb_rx > 0) {
1303 		rx_adapter->stats.rx_intr_packets += nb_rx;
1304 		work = true;
1305 	}
1306 
1307 	return work;
1308 }
1309 
1310 /*
1311  * Polls receive queues added to the event adapter and enqueues received
1312  * packets to the event device.
1313  *
1314  * The receive code enqueues initially to a temporary buffer; the
1315  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets.
1316  *
1317  * If there isn't space available in the temporary buffer, packets from the
1318  * Rx queue aren't dequeued from the eth device; this back pressures the
1319  * eth device. In virtual device environments this back pressure is relayed
1320  * to the hypervisor's switching layer, where adjustments can be made to
1321  * deal with it.
1322  */
1323 static inline bool
1324 rxa_poll(struct event_eth_rx_adapter *rx_adapter)
1325 {
1326 	uint32_t num_queue;
1327 	uint32_t nb_rx = 0;
1328 	struct eth_event_enqueue_buffer *buf = NULL;
1329 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1330 	uint32_t wrr_pos;
1331 	uint32_t max_nb_rx;
1332 	bool work = false;
1333 
1334 	wrr_pos = rx_adapter->wrr_pos;
1335 	max_nb_rx = rx_adapter->max_nb_rx;
1336 
1337 	/* Iterate through a WRR sequence */
1338 	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1339 		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1340 		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1341 		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1342 
1343 		buf = rxa_event_buf_get(rx_adapter, d, qid, &stats);
1344 
1345 		/* Don't do a batch dequeue from the rx queue if there isn't
1346 		 * enough space in the enqueue buffer.
1347 		 */
1348 		if (buf->count >= BATCH_SIZE) {
1349 			uint16_t n;
1350 
1351 			n = rxa_flush_event_buffer(rx_adapter, buf, stats);
1352 
1353 			if (likely(n > 0))
1354 				work = true;
1355 		}
1356 		if (!rxa_pkt_buf_available(buf)) {
1357 			if (rx_adapter->use_queue_event_buf)
1358 				goto poll_next_entry;
1359 			else {
1360 				rx_adapter->wrr_pos = wrr_pos;
1361 				break;
1362 			}
1363 		}
1364 
1365 		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1366 				NULL, buf, stats);
1367 		if (nb_rx > max_nb_rx) {
1368 			rx_adapter->wrr_pos =
1369 				    (wrr_pos + 1) % rx_adapter->wrr_len;
1370 			break;
1371 		}
1372 
1373 poll_next_entry:
1374 		if (++wrr_pos == rx_adapter->wrr_len)
1375 			wrr_pos = 0;
1376 	}
1377 
1378 	if (nb_rx > 0)
1379 		work = true;
1380 
1381 	return work;
1382 }
1383 
1384 static void
1385 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1386 {
1387 	struct event_eth_rx_adapter *rx_adapter = arg;
1388 	struct eth_event_enqueue_buffer *buf = NULL;
1389 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1390 	struct rte_event *ev;
1391 
1392 	buf = rxa_event_buf_get(rx_adapter, vec->port, vec->queue, &stats);
1393 
1394 	if (buf->count)
1395 		rxa_flush_event_buffer(rx_adapter, buf, stats);
1396 
1397 	if (vec->vector_ev->nb_elem == 0)
1398 		return;
1399 	ev = &buf->events[buf->count];
1400 
1401 	/* Event ready. */
1402 	ev->event = vec->event;
1403 	ev->vec = vec->vector_ev;
1404 	buf->count++;
1405 
1406 	vec->vector_ev = NULL;
1407 	vec->ts = 0;
1408 }
1409 
1410 static int
1411 rxa_service_func(void *args)
1412 {
1413 	struct event_eth_rx_adapter *rx_adapter = args;
1414 	bool intr_work;
1415 	bool poll_work;
1416 
1417 	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1418 		return -EAGAIN;
1419 	if (!rx_adapter->rxa_started) {
1420 		rte_spinlock_unlock(&rx_adapter->rx_lock);
1421 		return -EAGAIN;
1422 	}
1423 
1424 	if (rx_adapter->ena_vector) {
1425 		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1426 		    rx_adapter->vector_tmo_ticks) {
1427 			struct eth_rx_vector_data *vec;
1428 
1429 			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1430 				uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1431 
1432 				if (elapsed_time >= vec->vector_timeout_ticks) {
1433 					rxa_vector_expire(vec, rx_adapter);
1434 					TAILQ_REMOVE(&rx_adapter->vector_list,
1435 						     vec, next);
1436 				}
1437 			}
1438 			rx_adapter->prev_expiry_ts = rte_rdtsc();
1439 		}
1440 	}
1441 
1442 	intr_work = rxa_intr_ring_dequeue(rx_adapter);
1443 	poll_work = rxa_poll(rx_adapter);
1444 
1445 	rte_spinlock_unlock(&rx_adapter->rx_lock);
1446 
1447 	return intr_work || poll_work ? 0 : -EAGAIN;
1448 }
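/* Informal note: the service function returns -EAGAIN when it could not
 * take the lock or did no useful work; partially filled event vectors are
 * flushed here once vector_timeout_ticks have elapsed since their last
 * fill, with the expiry list scanned at most once per vector_tmo_ticks.
 */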
1449 
1450 static void *
1451 rxa_memzone_array_get(const char *name, unsigned int elt_size, int nb_elems)
1452 {
1453 	const struct rte_memzone *mz;
1454 	unsigned int sz;
1455 
1456 	sz = elt_size * nb_elems;
1457 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1458 
1459 	mz = rte_memzone_lookup(name);
1460 	if (mz == NULL) {
1461 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1462 						 RTE_CACHE_LINE_SIZE);
1463 		if (mz == NULL) {
1464 			RTE_EDEV_LOG_ERR("failed to reserve memzone"
1465 					 " name = %s, err = %"
1466 					 PRId32, name, rte_errno);
1467 			return NULL;
1468 		}
1469 	}
1470 
1471 	return mz->addr;
1472 }
1473 
1474 static int
1475 rte_event_eth_rx_adapter_init(void)
1476 {
1477 	uint8_t i;
1478 
1479 	if (event_eth_rx_adapter == NULL) {
1480 		event_eth_rx_adapter =
1481 			rxa_memzone_array_get(RXA_ADAPTER_ARRAY,
1482 					sizeof(*event_eth_rx_adapter),
1483 					RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE);
1484 		if (event_eth_rx_adapter == NULL)
1485 			return -ENOMEM;
1486 
1487 		for (i = 0; i < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE; i++)
1488 			event_eth_rx_adapter[i] = NULL;
1489 
1490 	}
1491 
1492 	return 0;
1493 }
1494 
1495 static int
1496 rxa_memzone_lookup(void)
1497 {
1498 	const struct rte_memzone *mz;
1499 
1500 	if (event_eth_rx_adapter == NULL) {
1501 		mz = rte_memzone_lookup(RXA_ADAPTER_ARRAY);
1502 		if (mz == NULL)
1503 			return -ENOMEM;
1504 
1505 		event_eth_rx_adapter = mz->addr;
1506 	}
1507 
1508 	return 0;
1509 }
1510 
1511 static inline struct event_eth_rx_adapter *
1512 rxa_id_to_adapter(uint8_t id)
1513 {
1514 	return event_eth_rx_adapter ?
1515 		event_eth_rx_adapter[id] : NULL;
1516 }
1517 
1518 static int
1519 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1520 		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1521 {
1522 	int ret;
1523 	struct rte_eventdev *dev;
1524 	struct rte_event_dev_config dev_conf;
1525 	int started;
1526 	uint8_t port_id;
1527 	struct rte_event_port_conf *port_conf = arg;
1528 	struct event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1529 
1530 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
1531 	dev_conf = dev->data->dev_conf;
1532 
1533 	started = dev->data->dev_started;
1534 	if (started)
1535 		rte_event_dev_stop(dev_id);
1536 	port_id = dev_conf.nb_event_ports;
1537 	dev_conf.nb_event_ports += 1;
1538 	if (port_conf->event_port_cfg & RTE_EVENT_PORT_CFG_SINGLE_LINK)
1539 		dev_conf.nb_single_link_event_port_queues += 1;
1540 
1541 	ret = rte_event_dev_configure(dev_id, &dev_conf);
1542 	if (ret) {
1543 		RTE_EDEV_LOG_ERR("failed to configure event dev %u",
1544 						dev_id);
1545 		if (started) {
1546 			if (rte_event_dev_start(dev_id))
1547 				return -EIO;
1548 		}
1549 		return ret;
1550 	}
1551 
1552 	ret = rte_event_port_setup(dev_id, port_id, port_conf);
1553 	if (ret) {
1554 		RTE_EDEV_LOG_ERR("failed to setup event port %u",
1555 					port_id);
1556 		return ret;
1557 	}
1558 
1559 	conf->event_port_id = port_id;
1560 	conf->max_nb_rx = RXA_NB_RX_WORK_DEFAULT;
1561 	if (started)
1562 		ret = rte_event_dev_start(dev_id);
1563 	rx_adapter->default_cb_arg = 1;
1564 	return ret;
1565 }
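/* Illustrative usage sketch (not part of this file; variable names are
 * application-defined): an application relying on the default
 * configuration callback above would typically do roughly the following:
 *
 *	struct rte_event_port_conf port_conf = { .new_event_threshold = 4096 };
 *	rte_event_eth_rx_adapter_create(id, dev_id, &port_conf);
 *	rte_event_eth_rx_adapter_queue_add(id, eth_port_id, -1, &queue_conf);
 *	rte_event_eth_rx_adapter_start(id);
 *
 * rte_event_eth_rx_adapter_create() is the variant that registers
 * rxa_default_conf_cb() along with the supplied port configuration.
 */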
1566 
1567 static int
1568 rxa_epoll_create1(void)
1569 {
1570 #if defined(__linux__)
1571 	int fd;
1572 	fd = epoll_create1(EPOLL_CLOEXEC);
1573 	return fd < 0 ? -errno : fd;
1574 #else
1575 	return -ENOTSUP;
1576 #endif
1577 }
1578 
1579 static int
1580 rxa_init_epd(struct event_eth_rx_adapter *rx_adapter)
1581 {
1582 	if (rx_adapter->epd != INIT_FD)
1583 		return 0;
1584 
1585 	rx_adapter->epd = rxa_epoll_create1();
1586 	if (rx_adapter->epd < 0) {
1587 		int err = rx_adapter->epd;
1588 		rx_adapter->epd = INIT_FD;
1589 		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1590 		return err;
1591 	}
1592 
1593 	return 0;
1594 }
1595 
1596 static int
1597 rxa_create_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1598 {
1599 	int err;
1600 	char thread_name[RTE_THREAD_INTERNAL_NAME_SIZE];
1601 
1602 	if (rx_adapter->intr_ring)
1603 		return 0;
1604 
1605 	rx_adapter->intr_ring = rte_ring_create("intr_ring",
1606 					RTE_EVENT_ETH_INTR_RING_SIZE,
1607 					rte_socket_id(), 0);
1608 	if (!rx_adapter->intr_ring)
1609 		return -ENOMEM;
1610 
1611 	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1612 					RTE_EVENT_ETH_INTR_RING_SIZE *
1613 					sizeof(struct rte_epoll_event),
1614 					RTE_CACHE_LINE_SIZE,
1615 					rx_adapter->socket_id);
1616 	if (!rx_adapter->epoll_events) {
1617 		err = -ENOMEM;
1618 		goto error;
1619 	}
1620 
1621 	rte_spinlock_init(&rx_adapter->intr_ring_lock);
1622 
1623 	snprintf(thread_name, sizeof(thread_name),
1624 			"evt-rx%d", rx_adapter->id);
1625 
1626 	err = rte_thread_create_internal_control(&rx_adapter->rx_intr_thread,
1627 			thread_name, rxa_intr_thread, rx_adapter);
1628 	if (!err)
1629 		return 0;
1630 
1631 	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d", err);
1632 	rte_free(rx_adapter->epoll_events);
1633 error:
1634 	rte_ring_free(rx_adapter->intr_ring);
1635 	rx_adapter->intr_ring = NULL;
1636 	rx_adapter->epoll_events = NULL;
1637 	return err;
1638 }
1639 
1640 static int
1641 rxa_destroy_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1642 {
1643 	int err;
1644 
1645 	err = pthread_cancel((pthread_t)rx_adapter->rx_intr_thread.opaque_id);
1646 	if (err)
1647 		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d",
1648 				err);
1649 
1650 	err = rte_thread_join(rx_adapter->rx_intr_thread, NULL);
1651 	if (err)
1652 		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d", err);
1653 
1654 	rte_free(rx_adapter->epoll_events);
1655 	rte_ring_free(rx_adapter->intr_ring);
1656 	rx_adapter->intr_ring = NULL;
1657 	rx_adapter->epoll_events = NULL;
1658 	return 0;
1659 }
1660 
1661 static int
1662 rxa_free_intr_resources(struct event_eth_rx_adapter *rx_adapter)
1663 {
1664 	int ret;
1665 
1666 	if (rx_adapter->num_rx_intr == 0)
1667 		return 0;
1668 
1669 	ret = rxa_destroy_intr_thread(rx_adapter);
1670 	if (ret)
1671 		return ret;
1672 
1673 	close(rx_adapter->epd);
1674 	rx_adapter->epd = INIT_FD;
1675 
1676 	return ret;
1677 }
1678 
1679 static int
1680 rxa_disable_intr(struct event_eth_rx_adapter *rx_adapter,
1681 		 struct eth_device_info *dev_info, uint16_t rx_queue_id)
1682 {
1683 	int err;
1684 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1685 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1686 
1687 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1688 	if (err) {
1689 		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1690 			rx_queue_id);
1691 		return err;
1692 	}
1693 
1694 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1695 					rx_adapter->epd,
1696 					RTE_INTR_EVENT_DEL,
1697 					0);
1698 	if (err)
1699 		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1700 
1701 	if (sintr)
1702 		dev_info->shared_intr_enabled = 0;
1703 	else
1704 		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1705 	return err;
1706 }
1707 
1708 static int
1709 rxa_del_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1710 		   struct eth_device_info *dev_info, int rx_queue_id)
1711 {
1712 	int err;
1713 	int i;
1714 	int s;
1715 
1716 	if (dev_info->nb_rx_intr == 0)
1717 		return 0;
1718 
1719 	err = 0;
1720 	if (rx_queue_id == -1) {
1721 		s = dev_info->nb_shared_intr;
1722 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1723 			int sintr;
1724 			uint16_t q;
1725 
1726 			q = dev_info->intr_queue[i];
1727 			sintr = rxa_shared_intr(dev_info, q);
1728 			s -= sintr;
1729 
1730 			if (!sintr || s == 0) {
1731 
1732 				err = rxa_disable_intr(rx_adapter, dev_info,
1733 						q);
1734 				if (err)
1735 					return err;
1736 				rxa_intr_ring_del_entries(rx_adapter, dev_info,
1737 							q);
1738 			}
1739 		}
1740 	} else {
1741 		if (!rxa_intr_queue(dev_info, rx_queue_id))
1742 			return 0;
1743 		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1744 				dev_info->nb_shared_intr == 1) {
1745 			err = rxa_disable_intr(rx_adapter, dev_info,
1746 					rx_queue_id);
1747 			if (err)
1748 				return err;
1749 			rxa_intr_ring_del_entries(rx_adapter, dev_info,
1750 						rx_queue_id);
1751 		}
1752 
1753 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1754 			if (dev_info->intr_queue[i] == rx_queue_id) {
1755 				for (; i < dev_info->nb_rx_intr - 1; i++)
1756 					dev_info->intr_queue[i] =
1757 						dev_info->intr_queue[i + 1];
1758 				break;
1759 			}
1760 		}
1761 	}
1762 
1763 	return err;
1764 }
1765 
1766 static int
1767 rxa_config_intr(struct event_eth_rx_adapter *rx_adapter,
1768 		struct eth_device_info *dev_info, uint16_t rx_queue_id)
1769 {
1770 	int err, err1;
1771 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1772 	union queue_data qd;
1773 	int init_fd;
1774 	uint16_t *intr_queue;
1775 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1776 
1777 	if (rxa_intr_queue(dev_info, rx_queue_id))
1778 		return 0;
1779 
1780 	intr_queue = dev_info->intr_queue;
1781 	if (dev_info->intr_queue == NULL) {
1782 		size_t len =
1783 			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1784 		dev_info->intr_queue =
1785 			rte_zmalloc_socket(
1786 				rx_adapter->mem_name,
1787 				len,
1788 				0,
1789 				rx_adapter->socket_id);
1790 		if (dev_info->intr_queue == NULL)
1791 			return -ENOMEM;
1792 	}
1793 
1794 	init_fd = rx_adapter->epd;
1795 	err = rxa_init_epd(rx_adapter);
1796 	if (err)
1797 		goto err_free_queue;
1798 
1799 	qd.port = eth_dev_id;
1800 	qd.queue = rx_queue_id;
1801 
1802 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1803 					rx_adapter->epd,
1804 					RTE_INTR_EVENT_ADD,
1805 					qd.ptr);
1806 	if (err) {
1807 		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1808 			" Rx Queue %u err %d", rx_queue_id, err);
1809 		goto err_del_fd;
1810 	}
1811 
1812 	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1813 	if (err) {
1814 		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1815 				" Rx Queue %u err %d", rx_queue_id, err);
1816 
1817 		goto err_del_event;
1818 	}
1819 
1820 	err = rxa_create_intr_thread(rx_adapter);
1821 	if (!err)  {
1822 		if (sintr)
1823 			dev_info->shared_intr_enabled = 1;
1824 		else
1825 			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1826 		return 0;
1827 	}
1828 
1829 
1830 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1831 	if (err)
1832 		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1833 				" Rx Queue %u err %d", rx_queue_id, err);
1834 err_del_event:
1835 	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1836 					rx_adapter->epd,
1837 					RTE_INTR_EVENT_DEL,
1838 					0);
1839 	if (err1) {
1840 		RTE_EDEV_LOG_ERR("Could not delete event for"
1841 				" Rx Queue %u err %d", rx_queue_id, err1);
1842 	}
1843 err_del_fd:
1844 	if (init_fd == INIT_FD) {
1845 		close(rx_adapter->epd);
1846 		rx_adapter->epd = INIT_FD;
1847 	}
1848 err_free_queue:
1849 	if (intr_queue == NULL)
1850 		rte_free(dev_info->intr_queue);
1851 
1852 	return err;
1853 }
1854 
1855 static int
1856 rxa_add_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1857 		   struct eth_device_info *dev_info, int rx_queue_id)
1858 
1859 {
1860 	int i, j, err;
1861 	int si = -1;
1862 	int shared_done = (dev_info->nb_shared_intr > 0);
1863 
1864 	if (rx_queue_id != -1) {
1865 		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1866 			return 0;
1867 		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1868 	}
1869 
1870 	err = 0;
1871 	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1872 
1873 		if (rxa_shared_intr(dev_info, i) && shared_done)
1874 			continue;
1875 
1876 		err = rxa_config_intr(rx_adapter, dev_info, i);
1877 
1878 		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1879 		if (shared_done) {
1880 			si = i;
1881 			dev_info->shared_intr_enabled = 1;
1882 		}
1883 		if (err)
1884 			break;
1885 	}
1886 
1887 	if (err == 0)
1888 		return 0;
1889 
1890 	shared_done = (dev_info->nb_shared_intr > 0);
1891 	for (j = 0; j < i; j++) {
1892 		if (rxa_intr_queue(dev_info, j))
1893 			continue;
1894 		if (rxa_shared_intr(dev_info, j) && si != j)
1895 			continue;
1896 		err = rxa_disable_intr(rx_adapter, dev_info, j);
1897 		if (err)
1898 			break;
1899 
1900 	}
1901 
1902 	return err;
1903 }
1904 
1905 static int
1906 rxa_init_service(struct event_eth_rx_adapter *rx_adapter, uint8_t id)
1907 {
1908 	int ret;
1909 	struct rte_service_spec service;
1910 	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1911 
1912 	if (rx_adapter->service_inited)
1913 		return 0;
1914 
1915 	if (rte_mbuf_dyn_rx_timestamp_register(
1916 			&event_eth_rx_timestamp_dynfield_offset,
1917 			&event_eth_rx_timestamp_dynflag) != 0) {
1918 		RTE_EDEV_LOG_ERR("Error registering timestamp field in mbuf");
1919 		return -rte_errno;
1920 	}
1921 
1922 	memset(&service, 0, sizeof(service));
1923 	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1924 		"rte_event_eth_rx_adapter_%d", id);
1925 	service.socket_id = rx_adapter->socket_id;
1926 	service.callback = rxa_service_func;
1927 	service.callback_userdata = rx_adapter;
1928 	/* Service function handles locking for queue add/del updates */
1929 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1930 	ret = rte_service_component_register(&service, &rx_adapter->service_id);
1931 	if (ret) {
1932 		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1933 			service.name, ret);
1934 		return ret;
1935 	}
1936 
1937 	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1938 		&rx_adapter_conf, rx_adapter->conf_arg);
1939 	if (ret) {
1940 		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1941 			ret);
1942 		goto err_done;
1943 	}
1944 	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1945 	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1946 	rx_adapter->service_inited = 1;
1947 	rx_adapter->epd = INIT_FD;
1948 	return 0;
1949 
1950 err_done:
1951 	rte_service_component_unregister(rx_adapter->service_id);
1952 	return ret;
1953 }
1954 
1955 static void
1956 rxa_update_queue(struct event_eth_rx_adapter *rx_adapter,
1957 		 struct eth_device_info *dev_info, int32_t rx_queue_id,
1958 		 uint8_t add)
1959 {
1960 	struct eth_rx_queue_info *queue_info;
1961 	int enabled;
1962 	uint16_t i;
1963 
1964 	if (dev_info->rx_queue == NULL)
1965 		return;
1966 
1967 	if (rx_queue_id == -1) {
1968 		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1969 			rxa_update_queue(rx_adapter, dev_info, i, add);
1970 	} else {
1971 		queue_info = &dev_info->rx_queue[rx_queue_id];
1972 		enabled = queue_info->queue_enabled;
1973 		if (add) {
1974 			rx_adapter->nb_queues += !enabled;
1975 			dev_info->nb_dev_queues += !enabled;
1976 		} else {
1977 			rx_adapter->nb_queues -= enabled;
1978 			dev_info->nb_dev_queues -= enabled;
1979 		}
1980 		queue_info->queue_enabled = !!add;
1981 	}
1982 }
1983 
1984 static void
1985 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1986 		    uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1987 		    uint16_t port_id)
1988 {
1989 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1990 	struct eth_rx_vector_data *vector_data;
1991 	uint32_t flow_id;
1992 
1993 	vector_data = &queue_info->vector_data;
1994 	vector_data->max_vector_count = vector_count;
1995 	vector_data->port = port_id;
1996 	vector_data->queue = qid;
1997 	vector_data->vector_pool = mp;
1998 	vector_data->vector_timeout_ticks =
1999 		NSEC2TICK(vector_ns, rte_get_timer_hz());
2000 	vector_data->ts = 0;
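	/*
	 * The flow id occupies the low 20 bits of the event word. If the
	 * application left it as zero, synthesize one from the queue and
	 * port ids (queue in bits 0-11, port in bits 12-19) so that vectors
	 * from different queues carry distinct flow ids.
	 */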
2001 	flow_id = queue_info->event & 0xFFFFF;
2002 	flow_id =
2003 		flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
2004 	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
2005 }
2006 
2007 static void
2008 rxa_sw_del(struct event_eth_rx_adapter *rx_adapter,
2009 	   struct eth_device_info *dev_info, int32_t rx_queue_id)
2010 {
2011 	struct eth_rx_vector_data *vec;
2012 	int pollq;
2013 	int intrq;
2014 	int sintrq;
2015 
2016 	if (rx_adapter->nb_queues == 0)
2017 		return;
2018 
2019 	if (rx_queue_id == -1) {
2020 		uint16_t nb_rx_queues;
2021 		uint16_t i;
2022 
2023 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2024 		for (i = 0; i < nb_rx_queues; i++)
2025 			rxa_sw_del(rx_adapter, dev_info, i);
2026 		return;
2027 	}
2028 
2029 	/* Push all the partial event vectors to event device. */
2030 	TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
2031 		if (vec->queue != rx_queue_id)
2032 			continue;
2033 		rxa_vector_expire(vec, rx_adapter);
2034 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
2035 	}
2036 
2037 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
2038 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
2039 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
2040 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
2041 	rx_adapter->num_rx_polled -= pollq;
2042 	dev_info->nb_rx_poll -= pollq;
2043 	rx_adapter->num_rx_intr -= intrq;
2044 	dev_info->nb_rx_intr -= intrq;
2045 	dev_info->nb_shared_intr -= intrq && sintrq;
2046 	if (rx_adapter->use_queue_event_buf) {
2047 		struct eth_event_enqueue_buffer *event_buf =
2048 			dev_info->rx_queue[rx_queue_id].event_buf;
2049 		struct rte_event_eth_rx_adapter_stats *stats =
2050 			dev_info->rx_queue[rx_queue_id].stats;
2051 		rte_free(event_buf->events);
2052 		rte_free(event_buf);
2053 		rte_free(stats);
2054 		dev_info->rx_queue[rx_queue_id].event_buf = NULL;
2055 		dev_info->rx_queue[rx_queue_id].stats = NULL;
2056 	}
2057 }
2058 
2059 static int
2060 rxa_add_queue(struct event_eth_rx_adapter *rx_adapter,
2061 	      struct eth_device_info *dev_info, int32_t rx_queue_id,
2062 	      const struct rte_event_eth_rx_adapter_queue_conf *conf)
2063 {
2064 	struct eth_rx_queue_info *queue_info;
2065 	const struct rte_event *ev = &conf->ev;
2066 	int pollq;
2067 	int intrq;
2068 	int sintrq;
2069 	struct rte_event *qi_ev;
2070 	struct eth_event_enqueue_buffer *new_rx_buf = NULL;
2071 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
2072 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
2073 	int ret;
2074 
2075 	if (rx_queue_id == -1) {
2076 		uint16_t nb_rx_queues;
2077 		uint16_t i;
2078 
2079 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2080 		for (i = 0; i < nb_rx_queues; i++) {
2081 			ret = rxa_add_queue(rx_adapter, dev_info, i, conf);
2082 			if (ret)
2083 				return ret;
2084 		}
2085 		return 0;
2086 	}
2087 
2088 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
2089 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
2090 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
2091 
2092 	queue_info = &dev_info->rx_queue[rx_queue_id];
2093 	queue_info->wt = conf->servicing_weight;
2094 
2095 	qi_ev = (struct rte_event *)&queue_info->event;
2096 	qi_ev->event = ev->event;
2097 	qi_ev->op = RTE_EVENT_OP_NEW;
2098 	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
2099 
2100 	if (conf->rx_queue_flags &
2101 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
2102 		queue_info->flow_id_mask = ~0;
2103 	} else
2104 		qi_ev->flow_id = 0;
2105 
2106 	if (conf->rx_queue_flags &
2107 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2108 		queue_info->ena_vector = 1;
2109 		qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
2110 		rxa_set_vector_data(queue_info, conf->vector_sz,
2111 				    conf->vector_timeout_ns, conf->vector_mp,
2112 				    rx_queue_id, dev_info->dev->data->port_id);
2113 		rx_adapter->ena_vector = 1;
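		/*
		 * Track the adapter wide flush interval as half of the
		 * smallest per queue vector timeout configured so far, so
		 * that timeout checks run often enough for every queue.
		 */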
2114 		rx_adapter->vector_tmo_ticks =
2115 			rx_adapter->vector_tmo_ticks ?
2116 				      RTE_MIN(queue_info->vector_data
2117 							.vector_timeout_ticks >>
2118 						1,
2119 					rx_adapter->vector_tmo_ticks) :
2120 				queue_info->vector_data.vector_timeout_ticks >>
2121 					1;
2122 	}
2123 
2124 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
2125 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
2126 		rx_adapter->num_rx_polled += !pollq;
2127 		dev_info->nb_rx_poll += !pollq;
2128 		rx_adapter->num_rx_intr -= intrq;
2129 		dev_info->nb_rx_intr -= intrq;
2130 		dev_info->nb_shared_intr -= intrq && sintrq;
2131 	}
2132 
2133 	if (rxa_intr_queue(dev_info, rx_queue_id)) {
2134 		rx_adapter->num_rx_polled -= pollq;
2135 		dev_info->nb_rx_poll -= pollq;
2136 		rx_adapter->num_rx_intr += !intrq;
2137 		dev_info->nb_rx_intr += !intrq;
2138 		dev_info->nb_shared_intr += !intrq && sintrq;
2139 		if (dev_info->nb_shared_intr == 1) {
2140 			if (dev_info->multi_intr_cap)
2141 				dev_info->next_q_idx =
2142 					RTE_MAX_RXTX_INTR_VEC_ID - 1;
2143 			else
2144 				dev_info->next_q_idx = 0;
2145 		}
2146 	}
2147 
2148 	if (!rx_adapter->use_queue_event_buf)
2149 		return 0;
2150 
2151 	new_rx_buf = rte_zmalloc_socket("rx_buffer_meta",
2152 				sizeof(*new_rx_buf), 0,
2153 				rte_eth_dev_socket_id(eth_dev_id));
2154 	if (new_rx_buf == NULL) {
2155 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer meta for "
2156 				 "dev_id: %d queue_id: %d",
2157 				 eth_dev_id, rx_queue_id);
2158 		return -ENOMEM;
2159 	}
2160 
2161 	new_rx_buf->events_size = RTE_ALIGN(conf->event_buf_size, BATCH_SIZE);
2162 	new_rx_buf->events_size += (2 * BATCH_SIZE);
2163 	new_rx_buf->events = rte_zmalloc_socket("rx_buffer",
2164 				sizeof(struct rte_event) *
2165 				new_rx_buf->events_size, 0,
2166 				rte_eth_dev_socket_id(eth_dev_id));
2167 	if (new_rx_buf->events == NULL) {
2168 		rte_free(new_rx_buf);
2169 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer for "
2170 				 "dev_id: %d queue_id: %d",
2171 				 eth_dev_id, rx_queue_id);
2172 		return -ENOMEM;
2173 	}
2174 
2175 	queue_info->event_buf = new_rx_buf;
2176 
2177 	/* Allocate storage for adapter queue stats */
2178 	stats = rte_zmalloc_socket("rx_queue_stats",
2179 				sizeof(*stats), 0,
2180 				rte_eth_dev_socket_id(eth_dev_id));
2181 	if (stats == NULL) {
2182 		rte_free(new_rx_buf->events);
2183 		rte_free(new_rx_buf);
2184 		RTE_EDEV_LOG_ERR("Failed to allocate stats storage for"
2185 				 " dev_id: %d queue_id: %d",
2186 				 eth_dev_id, rx_queue_id);
2187 		return -ENOMEM;
2188 	}
2189 
2190 	queue_info->stats = stats;
2191 
2192 	return 0;
2193 }
2194 
2195 static int
2196 rxa_sw_add(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
2197 	   int rx_queue_id,
2198 	   const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2199 {
2200 	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
2201 	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
2202 	int ret;
2203 	struct eth_rx_poll_entry *rx_poll;
2204 	struct eth_rx_queue_info *rx_queue;
2205 	uint32_t *rx_wrr;
2206 	uint16_t nb_rx_queues;
2207 	uint32_t nb_rx_poll, nb_wrr;
2208 	uint32_t nb_rx_intr;
2209 	int num_intr_vec;
2210 	uint16_t wt;
2211 
2212 	if (queue_conf->servicing_weight == 0) {
2213 		struct rte_eth_dev_data *data = dev_info->dev->data;
2214 
2215 		temp_conf = *queue_conf;
2216 		if (!data->dev_conf.intr_conf.rxq) {
2217 			/* If Rx interrupts are disabled, set wt = 1 */
2218 			temp_conf.servicing_weight = 1;
2219 		}
2220 		queue_conf = &temp_conf;
2221 
2222 		if (queue_conf->servicing_weight == 0 &&
2223 		    rx_adapter->use_queue_event_buf) {
2224 
2225 			RTE_EDEV_LOG_ERR("Use of queue level event buffer "
2226 					 "not supported for interrupt queues "
2227 					 "dev_id: %d queue_id: %d",
2228 					 eth_dev_id, rx_queue_id);
2229 			return -EINVAL;
2230 		}
2231 	}
2232 
2233 	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2234 	rx_queue = dev_info->rx_queue;
2235 	wt = queue_conf->servicing_weight;
2236 
2237 	if (dev_info->rx_queue == NULL) {
2238 		dev_info->rx_queue =
2239 		    rte_zmalloc_socket(rx_adapter->mem_name,
2240 				       nb_rx_queues *
2241 				       sizeof(struct eth_rx_queue_info), 0,
2242 				       rx_adapter->socket_id);
2243 		if (dev_info->rx_queue == NULL)
2244 			return -ENOMEM;
2245 	}
2246 	rx_wrr = NULL;
2247 	rx_poll = NULL;
2248 
2249 	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2250 			queue_conf->servicing_weight,
2251 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2252 
2253 	if (dev_info->dev->intr_handle)
2254 		dev_info->multi_intr_cap =
2255 			rte_intr_cap_multiple(dev_info->dev->intr_handle);
2256 
2257 	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2258 				&rx_poll, &rx_wrr);
2259 	if (ret)
2260 		goto err_free_rxqueue;
2261 
2262 	if (wt == 0) {
2263 		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2264 
2265 		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2266 		if (ret)
2267 			goto err_free_rxqueue;
2268 
2269 		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2270 		if (ret)
2271 			goto err_free_rxqueue;
2272 	} else {
2273 
2274 		num_intr_vec = 0;
2275 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2276 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2277 						rx_queue_id, 0);
2278 			/* Interrupt based queues are being converted to
2279 			 * poll mode queues; delete the interrupt configuration
2280 			 * for those queues.
2281 			 */
2282 			ret = rxa_del_intr_queue(rx_adapter,
2283 						dev_info, rx_queue_id);
2284 			if (ret)
2285 				goto err_free_rxqueue;
2286 		}
2287 	}
2288 
2289 	if (nb_rx_intr == 0) {
2290 		ret = rxa_free_intr_resources(rx_adapter);
2291 		if (ret)
2292 			goto err_free_rxqueue;
2293 	}
2294 
2295 	if (wt == 0) {
2296 		uint16_t i;
2297 
2298 		if (rx_queue_id == -1) {
2299 			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2300 				dev_info->intr_queue[i] = i;
2301 		} else {
2302 			if (!rxa_intr_queue(dev_info, rx_queue_id) && nb_rx_intr > 0)
2303 				dev_info->intr_queue[nb_rx_intr - 1] =
2304 					rx_queue_id;
2305 		}
2306 	}
2307 
2308 
2309 
2310 	ret = rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2311 	if (ret)
2312 		goto err_free_rxqueue;
2313 	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2314 
2315 	rte_free(rx_adapter->eth_rx_poll);
2316 	rte_free(rx_adapter->wrr_sched);
2317 
2318 	rx_adapter->eth_rx_poll = rx_poll;
2319 	rx_adapter->wrr_sched = rx_wrr;
2320 	rx_adapter->wrr_len = nb_wrr;
2321 	rx_adapter->num_intr_vec += num_intr_vec;
2322 	return 0;
2323 
2324 err_free_rxqueue:
2325 	if (rx_queue == NULL) {
2326 		rte_free(dev_info->rx_queue);
2327 		dev_info->rx_queue = NULL;
2328 	}
2329 
2330 	rte_free(rx_poll);
2331 	rte_free(rx_wrr);
2332 
2333 	return ret;
2334 }
2335 
2336 static int
2337 rxa_ctrl(uint8_t id, int start)
2338 {
2339 	struct event_eth_rx_adapter *rx_adapter;
2340 	struct rte_eventdev *dev;
2341 	struct eth_device_info *dev_info;
2342 	uint32_t i;
2343 	int use_service = 0;
2344 	int stop = !start;
2345 
2346 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2347 	rx_adapter = rxa_id_to_adapter(id);
2348 	if (rx_adapter == NULL)
2349 		return -EINVAL;
2350 
2351 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2352 
2353 	RTE_ETH_FOREACH_DEV(i) {
2354 		dev_info = &rx_adapter->eth_devices[i];
2355 		/* if starting, check for num dev queues */
2356 		if (start && !dev_info->nb_dev_queues)
2357 			continue;
2358 		/* if stopping, check if dev has been started */
2359 		if (stop && !dev_info->dev_rx_started)
2360 			continue;
2361 		use_service |= !dev_info->internal_event_port;
2362 		dev_info->dev_rx_started = start;
2363 		if (dev_info->internal_event_port == 0)
2364 			continue;
2365 		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2366 						&rte_eth_devices[i]) :
2367 			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
2368 						&rte_eth_devices[i]);
2369 	}
2370 
2371 	if (use_service) {
2372 		rte_spinlock_lock(&rx_adapter->rx_lock);
2373 		rx_adapter->rxa_started = start;
2374 		rte_service_runstate_set(rx_adapter->service_id, start);
2375 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2376 	}
2377 
2378 	return 0;
2379 }
2380 
2381 static int
2382 rxa_create(uint8_t id, uint8_t dev_id,
2383 	   struct rte_event_eth_rx_adapter_params *rxa_params,
2384 	   rte_event_eth_rx_adapter_conf_cb conf_cb,
2385 	   void *conf_arg)
2386 {
2387 	struct event_eth_rx_adapter *rx_adapter;
2388 	struct eth_event_enqueue_buffer *buf;
2389 	struct rte_event *events;
2390 	int ret;
2391 	int socket_id;
2392 	uint16_t i;
2393 	char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
2394 	const uint8_t default_rss_key[] = {
2395 		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2396 		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2397 		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2398 		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2399 		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2400 	};
2401 
2402 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2403 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2404 
2405 	if (conf_cb == NULL)
2406 		return -EINVAL;
2407 
2408 	if (event_eth_rx_adapter == NULL) {
2409 		ret = rte_event_eth_rx_adapter_init();
2410 		if (ret)
2411 			return ret;
2412 	}
2413 
2414 	rx_adapter = rxa_id_to_adapter(id);
2415 	if (rx_adapter != NULL) {
2416 		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2417 		return -EEXIST;
2418 	}
2419 
2420 	socket_id = rte_event_dev_socket_id(dev_id);
2421 	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2422 		"rte_event_eth_rx_adapter_%d",
2423 		id);
2424 
2425 	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2426 			RTE_CACHE_LINE_SIZE, socket_id);
2427 	if (rx_adapter == NULL) {
2428 		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2429 		return -ENOMEM;
2430 	}
2431 
2432 	rx_adapter->eventdev_id = dev_id;
2433 	rx_adapter->socket_id = socket_id;
2434 	rx_adapter->conf_cb = conf_cb;
2435 	rx_adapter->conf_arg = conf_arg;
2436 	rx_adapter->id = id;
2437 	TAILQ_INIT(&rx_adapter->vector_list);
2438 	strcpy(rx_adapter->mem_name, mem_name);
2439 	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2440 					RTE_MAX_ETHPORTS *
2441 					sizeof(struct eth_device_info), 0,
2442 					socket_id);
2443 	rte_convert_rss_key((const uint32_t *)default_rss_key,
2444 			(uint32_t *)rx_adapter->rss_key_be,
2445 			    RTE_DIM(default_rss_key));
2446 
2447 	if (rx_adapter->eth_devices == NULL) {
2448 		RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
2449 		rte_free(rx_adapter);
2450 		return -ENOMEM;
2451 	}
2452 
2453 	rte_spinlock_init(&rx_adapter->rx_lock);
2454 
2455 	for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2456 		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2457 
2458 	/* Rx adapter event buffer allocation */
2459 	rx_adapter->use_queue_event_buf = rxa_params->use_queue_event_buf;
2460 
2461 	if (!rx_adapter->use_queue_event_buf) {
2462 		buf = &rx_adapter->event_enqueue_buffer;
2463 		buf->events_size = rxa_params->event_buf_size;
2464 
2465 		events = rte_zmalloc_socket(rx_adapter->mem_name,
2466 					    buf->events_size * sizeof(*events),
2467 					    0, socket_id);
2468 		if (events == NULL) {
2469 			RTE_EDEV_LOG_ERR("Failed to allocate memory "
2470 					 "for adapter event buffer");
2471 			rte_free(rx_adapter->eth_devices);
2472 			rte_free(rx_adapter);
2473 			return -ENOMEM;
2474 		}
2475 
2476 		rx_adapter->event_enqueue_buffer.events = events;
2477 	}
2478 
2479 	event_eth_rx_adapter[id] = rx_adapter;
2480 
2481 	if (conf_cb == rxa_default_conf_cb)
2482 		rx_adapter->default_cb_arg = 1;
2483 
2484 	rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2485 		conf_arg);
2486 	return 0;
2487 }
2488 
2489 static int
2490 rxa_config_params_validate(struct rte_event_eth_rx_adapter_params *rxa_params,
2491 			   struct rte_event_eth_rx_adapter_params *temp_params)
2492 {
2493 	if (rxa_params == NULL) {
2494 		/* use default values if rxa_params is NULL */
2495 		temp_params->event_buf_size = ETH_EVENT_BUFFER_SIZE;
2496 		temp_params->use_queue_event_buf = false;
2497 		return 0;
2498 	} else if (!rxa_params->use_queue_event_buf &&
2499 		    rxa_params->event_buf_size == 0) {
2500 		RTE_EDEV_LOG_ERR("event buffer size can't be zero");
2501 		return -EINVAL;
2502 	} else if (rxa_params->use_queue_event_buf &&
2503 		   rxa_params->event_buf_size != 0) {
2504 		RTE_EDEV_LOG_ERR("event buffer size needs to be configured "
2505 				 "as part of queue add");
2506 		return -EINVAL;
2507 	}
2508 
2509 	*temp_params = *rxa_params;
2510 	/* Adjust the event buffer size by BATCH_SIZE, the unit used to fetch
2511 	 * packets from the NIC Rx queues, to get full buffer utilization and
2512 	 * prevent rollovers (e.g. a request of 100 becomes 128 + 2 * 32 = 192).
2513 	 */
2514 	if (!temp_params->use_queue_event_buf) {
2515 		temp_params->event_buf_size =
2516 			RTE_ALIGN(temp_params->event_buf_size, BATCH_SIZE);
2517 		temp_params->event_buf_size += (2 * BATCH_SIZE);
2518 	}
2519 
2520 	return 0;
2521 }
2522 
2523 int
2524 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2525 				rte_event_eth_rx_adapter_conf_cb conf_cb,
2526 				void *conf_arg)
2527 {
2528 	struct rte_event_eth_rx_adapter_params rxa_params = {0};
2529 
2530 	/* use default values for adapter params */
2531 	rxa_params.event_buf_size = ETH_EVENT_BUFFER_SIZE;
2532 	rxa_params.use_queue_event_buf = false;
2533 
2534 	return rxa_create(id, dev_id, &rxa_params, conf_cb, conf_arg);
2535 }
2536 
2537 int
2538 rte_event_eth_rx_adapter_create_with_params(uint8_t id, uint8_t dev_id,
2539 			struct rte_event_port_conf *port_config,
2540 			struct rte_event_eth_rx_adapter_params *rxa_params)
2541 {
2542 	struct rte_event_port_conf *pc;
2543 	int ret;
2544 	struct rte_event_eth_rx_adapter_params temp_params = {0};
2545 
2546 	if (port_config == NULL)
2547 		return -EINVAL;
2548 
2549 	ret = rxa_config_params_validate(rxa_params, &temp_params);
2550 	if (ret != 0)
2551 		return ret;
2552 
2553 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2554 	if (pc == NULL)
2555 		return -ENOMEM;
2556 
2557 	*pc = *port_config;
2558 
2559 	ret = rxa_create(id, dev_id, &temp_params, rxa_default_conf_cb, pc);
2560 	if (ret)
2561 		rte_free(pc);
2562 
2563 	rte_eventdev_trace_eth_rx_adapter_create_with_params(id, dev_id,
2564 		port_config, rxa_params, ret);
2565 
2566 	return ret;
2567 }
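
/*
 * A minimal usage sketch for this API, assuming the event device dev_id is
 * already configured; the port configuration values are illustrative, not
 * recommendations:
 *
 *	struct rte_event_port_conf port_conf = {
 *		.new_event_threshold = 4096,
 *		.dequeue_depth = 128,
 *		.enqueue_depth = 128,
 *	};
 *	struct rte_event_eth_rx_adapter_params params = {
 *		.event_buf_size = 1024,
 *		.use_queue_event_buf = false,
 *	};
 *	int ret;
 *
 *	ret = rte_event_eth_rx_adapter_create_with_params(id, dev_id,
 *							  &port_conf, &params);
 */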
2568 
2569 int
2570 rte_event_eth_rx_adapter_create_ext_with_params(uint8_t id, uint8_t dev_id,
2571 			rte_event_eth_rx_adapter_conf_cb conf_cb,
2572 			void *conf_arg,
2573 			struct rte_event_eth_rx_adapter_params *rxa_params)
2574 {
2575 	struct rte_event_eth_rx_adapter_params temp_params = {0};
2576 	int ret;
2577 
2578 	ret = rxa_config_params_validate(rxa_params, &temp_params);
2579 	if (ret != 0)
2580 		return ret;
2581 
2582 	return rxa_create(id, dev_id, &temp_params, conf_cb, conf_arg);
2583 }
2584 
2585 int
2586 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2587 		struct rte_event_port_conf *port_config)
2588 {
2589 	struct rte_event_port_conf *pc;
2590 	int ret;
2591 
2592 	if (port_config == NULL)
2593 		return -EINVAL;
2594 
2595 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2596 
2597 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2598 	if (pc == NULL)
2599 		return -ENOMEM;
2600 	*pc = *port_config;
2601 
2602 	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2603 					rxa_default_conf_cb,
2604 					pc);
2605 	if (ret)
2606 		rte_free(pc);
2607 	return ret;
2608 }
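
/*
 * The typical call sequence when the adapter runs as a software service
 * (i.e. the eventdev/ethdev pair lacks the INTERNAL_PORT capability);
 * id, dev_id, eth_dev_id, port_conf and queue_conf are placeholders:
 *
 *	rte_event_eth_rx_adapter_create(id, dev_id, &port_conf);
 *	rte_event_eth_rx_adapter_queue_add(id, eth_dev_id, -1, &queue_conf);
 *	rte_event_eth_rx_adapter_service_id_get(id, &service_id);
 *	(map service_id to a service lcore, see the sketch following
 *	 rte_event_eth_rx_adapter_service_id_get() below)
 *	rte_event_eth_rx_adapter_start(id);
 */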
2609 
2610 int
2611 rte_event_eth_rx_adapter_free(uint8_t id)
2612 {
2613 	struct event_eth_rx_adapter *rx_adapter;
2614 
2615 	if (rxa_memzone_lookup())
2616 		return -ENOMEM;
2617 
2618 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2619 
2620 	rx_adapter = rxa_id_to_adapter(id);
2621 	if (rx_adapter == NULL)
2622 		return -EINVAL;
2623 
2624 	if (rx_adapter->nb_queues) {
2625 		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2626 				rx_adapter->nb_queues);
2627 		return -EBUSY;
2628 	}
2629 
2630 	if (rx_adapter->default_cb_arg)
2631 		rte_free(rx_adapter->conf_arg);
2632 	rte_free(rx_adapter->eth_devices);
2633 	if (!rx_adapter->use_queue_event_buf)
2634 		rte_free(rx_adapter->event_enqueue_buffer.events);
2635 	rte_free(rx_adapter);
2636 	event_eth_rx_adapter[id] = NULL;
2637 
2638 	rte_eventdev_trace_eth_rx_adapter_free(id);
2639 	return 0;
2640 }
2641 
2642 int
2643 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2644 		uint16_t eth_dev_id,
2645 		int32_t rx_queue_id,
2646 		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2647 {
2648 	int ret;
2649 	uint32_t cap;
2650 	struct event_eth_rx_adapter *rx_adapter;
2651 	struct rte_eventdev *dev;
2652 	struct eth_device_info *dev_info;
2653 	struct rte_event_eth_rx_adapter_vector_limits limits;
2654 
2655 	if (rxa_memzone_lookup())
2656 		return -ENOMEM;
2657 
2658 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2659 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2660 
2661 	rx_adapter = rxa_id_to_adapter(id);
2662 	if ((rx_adapter == NULL) || (queue_conf == NULL))
2663 		return -EINVAL;
2664 
2665 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2666 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2667 						eth_dev_id,
2668 						&cap);
2669 	if (ret) {
2670 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2671 			"eth port %" PRIu16, id, eth_dev_id);
2672 		return ret;
2673 	}
2674 
2675 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2676 		&& (queue_conf->rx_queue_flags &
2677 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2678 		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2679 				" eth port: %" PRIu16 " adapter id: %" PRIu8,
2680 				eth_dev_id, id);
2681 		return -EINVAL;
2682 	}
2683 
2684 	if (queue_conf->rx_queue_flags &
2685 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2686 
2687 		if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0) {
2688 			RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2689 					 " eth port: %" PRIu16
2690 					 " adapter id: %" PRIu8,
2691 					 eth_dev_id, id);
2692 			return -EINVAL;
2693 		}
2694 
2695 		ret = rte_event_eth_rx_adapter_vector_limits_get(
2696 			rx_adapter->eventdev_id, eth_dev_id, &limits);
2697 		if (ret < 0) {
2698 			RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
2699 					 " eth port: %" PRIu16
2700 					 " adapter id: %" PRIu8,
2701 					 eth_dev_id, id);
2702 			return -EINVAL;
2703 		}
2704 		if (queue_conf->vector_sz < limits.min_sz ||
2705 		    queue_conf->vector_sz > limits.max_sz ||
2706 		    queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
2707 		    queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
2708 		    queue_conf->vector_mp == NULL) {
2709 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2710 					 " eth port: %" PRIu16
2711 					 " adapter id: %" PRIu8,
2712 					 eth_dev_id, id);
2713 			return -EINVAL;
2714 		}
2715 		if (queue_conf->vector_mp->elt_size <
2716 		    (sizeof(struct rte_event_vector) +
2717 		     (sizeof(uintptr_t) * queue_conf->vector_sz))) {
2718 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2719 					 " eth port: %" PRIu16
2720 					 " adapter id: %" PRIu8,
2721 					 eth_dev_id, id);
2722 			return -EINVAL;
2723 		}
2724 	}
2725 
2726 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2727 		(rx_queue_id != -1)) {
2728 		RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
2729 			"event queue, eth port: %" PRIu16 " adapter id: %"
2730 			PRIu8, eth_dev_id, id);
2731 		return -EINVAL;
2732 	}
2733 
2734 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2735 			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2736 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2737 			 (uint16_t)rx_queue_id);
2738 		return -EINVAL;
2739 	}
2740 
2741 	if ((rx_adapter->use_queue_event_buf &&
2742 	     queue_conf->event_buf_size == 0) ||
2743 	    (!rx_adapter->use_queue_event_buf &&
2744 	     queue_conf->event_buf_size != 0)) {
2745 		RTE_EDEV_LOG_ERR("Invalid Event buffer size for the queue");
2746 		return -EINVAL;
2747 	}
2748 
2749 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2750 
2751 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2752 		if (*dev->dev_ops->eth_rx_adapter_queue_add == NULL)
2753 			return -ENOTSUP;
2754 		if (dev_info->rx_queue == NULL) {
2755 			dev_info->rx_queue =
2756 			    rte_zmalloc_socket(rx_adapter->mem_name,
2757 					dev_info->dev->data->nb_rx_queues *
2758 					sizeof(struct eth_rx_queue_info), 0,
2759 					rx_adapter->socket_id);
2760 			if (dev_info->rx_queue == NULL)
2761 				return -ENOMEM;
2762 		}
2763 
2764 		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2765 				&rte_eth_devices[eth_dev_id],
2766 				rx_queue_id, queue_conf);
2767 		if (ret == 0) {
2768 			dev_info->internal_event_port = 1;
2769 			rxa_update_queue(rx_adapter,
2770 					&rx_adapter->eth_devices[eth_dev_id],
2771 					rx_queue_id,
2772 					1);
2773 		}
2774 	} else {
2775 		rte_spinlock_lock(&rx_adapter->rx_lock);
2776 		dev_info->internal_event_port = 0;
2777 		ret = rxa_init_service(rx_adapter, id);
2778 		if (ret == 0) {
2779 			uint32_t service_id = rx_adapter->service_id;
2780 			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2781 					queue_conf);
2782 			rte_service_component_runstate_set(service_id,
2783 				rxa_sw_adapter_queue_count(rx_adapter));
2784 		}
2785 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2786 	}
2787 
2788 	rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2789 		rx_queue_id, queue_conf, ret);
2790 	if (ret)
2791 		return ret;
2792 
2793 	return 0;
2794 }
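
/*
 * A minimal sketch of adding all Rx queues of an ethdev to an adapter as
 * polled queues feeding a single atomic event queue; id, eth_dev_id and
 * ev_queue_id are placeholders:
 *
 *	struct rte_event_eth_rx_adapter_queue_conf qconf;
 *	int ret;
 *
 *	memset(&qconf, 0, sizeof(qconf));
 *	qconf.ev.queue_id = ev_queue_id;
 *	qconf.ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
 *	qconf.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
 *	qconf.servicing_weight = 1;
 *	ret = rte_event_eth_rx_adapter_queue_add(id, eth_dev_id, -1, &qconf);
 *
 * Passing -1 as rx_queue_id applies the configuration to every Rx queue of
 * the device; adding individual queues additionally requires the
 * RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ capability, as checked above.
 */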
2795 
2796 static int
2797 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2798 {
2799 	limits->max_sz = MAX_VECTOR_SIZE;
2800 	limits->min_sz = MIN_VECTOR_SIZE;
2801 	limits->max_timeout_ns = MAX_VECTOR_NS;
2802 	limits->min_timeout_ns = MIN_VECTOR_NS;
2803 
2804 	return 0;
2805 }
2806 
2807 int
2808 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2809 				int32_t rx_queue_id)
2810 {
2811 	int ret = 0;
2812 	struct rte_eventdev *dev;
2813 	struct event_eth_rx_adapter *rx_adapter;
2814 	struct eth_device_info *dev_info;
2815 	uint32_t cap;
2816 	uint32_t nb_rx_poll = 0;
2817 	uint32_t nb_wrr = 0;
2818 	uint32_t nb_rx_intr;
2819 	struct eth_rx_poll_entry *rx_poll = NULL;
2820 	uint32_t *rx_wrr = NULL;
2821 	int num_intr_vec;
2822 
2823 	if (rxa_memzone_lookup())
2824 		return -ENOMEM;
2825 
2826 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2827 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2828 
2829 	rx_adapter = rxa_id_to_adapter(id);
2830 	if (rx_adapter == NULL)
2831 		return -EINVAL;
2832 
2833 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2834 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2835 						eth_dev_id,
2836 						&cap);
2837 	if (ret)
2838 		return ret;
2839 
2840 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2841 		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2842 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2843 			 (uint16_t)rx_queue_id);
2844 		return -EINVAL;
2845 	}
2846 
2847 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2848 
2849 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2850 		if (*dev->dev_ops->eth_rx_adapter_queue_del == NULL)
2851 			return -ENOTSUP;
2852 		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2853 						&rte_eth_devices[eth_dev_id],
2854 						rx_queue_id);
2855 		if (ret == 0) {
2856 			rxa_update_queue(rx_adapter,
2857 					&rx_adapter->eth_devices[eth_dev_id],
2858 					rx_queue_id,
2859 					0);
2860 			if (dev_info->nb_dev_queues == 0) {
2861 				rte_free(dev_info->rx_queue);
2862 				dev_info->rx_queue = NULL;
2863 			}
2864 		}
2865 	} else {
2866 		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2867 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2868 
2869 		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2870 			&rx_poll, &rx_wrr);
2871 		if (ret)
2872 			return ret;
2873 
2874 		rte_spinlock_lock(&rx_adapter->rx_lock);
2875 
2876 		num_intr_vec = 0;
2877 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2878 
2879 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2880 						rx_queue_id, 0);
2881 			ret = rxa_del_intr_queue(rx_adapter, dev_info,
2882 					rx_queue_id);
2883 			if (ret)
2884 				goto unlock_ret;
2885 		}
2886 
2887 		if (nb_rx_intr == 0) {
2888 			ret = rxa_free_intr_resources(rx_adapter);
2889 			if (ret)
2890 				goto unlock_ret;
2891 		}
2892 
2893 		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2894 		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2895 
2896 		rte_free(rx_adapter->eth_rx_poll);
2897 		rte_free(rx_adapter->wrr_sched);
2898 
2899 		if (nb_rx_intr == 0) {
2900 			rte_free(dev_info->intr_queue);
2901 			dev_info->intr_queue = NULL;
2902 		}
2903 
2904 		rx_adapter->eth_rx_poll = rx_poll;
2905 		rx_adapter->wrr_sched = rx_wrr;
2906 		rx_adapter->wrr_len = nb_wrr;
2907 		/*
2908 		 * reset next poll start position (wrr_pos) to avoid buffer
2909 		 * overrun when wrr_len is reduced in case of queue delete
2910 		 */
2911 		rx_adapter->wrr_pos = 0;
2912 		rx_adapter->num_intr_vec += num_intr_vec;
2913 
2914 		if (dev_info->nb_dev_queues == 0) {
2915 			rte_free(dev_info->rx_queue);
2916 			dev_info->rx_queue = NULL;
2917 		}
2918 unlock_ret:
2919 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2920 		if (ret) {
2921 			rte_free(rx_poll);
2922 			rte_free(rx_wrr);
2923 			return ret;
2924 		}
2925 
2926 		rte_service_component_runstate_set(rx_adapter->service_id,
2927 				rxa_sw_adapter_queue_count(rx_adapter));
2928 	}
2929 
2930 	rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2931 		rx_queue_id, ret);
2932 
2933 	return ret;
2934 }
2935 
2936 int
2937 rte_event_eth_rx_adapter_vector_limits_get(
2938 	uint8_t dev_id, uint16_t eth_port_id,
2939 	struct rte_event_eth_rx_adapter_vector_limits *limits)
2940 {
2941 	struct rte_eventdev *dev;
2942 	uint32_t cap;
2943 	int ret;
2944 
2945 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2946 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2947 
2948 	if (limits == NULL)
2949 		return -EINVAL;
2950 
2951 	dev = &rte_eventdevs[dev_id];
2952 
2953 	ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2954 	if (ret) {
2955 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2956 				 "eth port %" PRIu16,
2957 				 dev_id, eth_port_id);
2958 		return ret;
2959 	}
2960 
2961 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2962 		if (*dev->dev_ops->eth_rx_adapter_vector_limits_get == NULL)
2963 			return -ENOTSUP;
2964 		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2965 			dev, &rte_eth_devices[eth_port_id], limits);
2966 	} else {
2967 		ret = rxa_sw_vector_limits(limits);
2968 	}
2969 
2970 	rte_eventdev_trace_eth_rx_adapter_vector_limits_get(dev_id, eth_port_id,
2971 		limits->min_sz, limits->max_sz, limits->log2_sz,
2972 		limits->min_timeout_ns, limits->max_timeout_ns, ret);
2973 	return ret;
2974 }
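
/*
 * A sketch of enabling Rx event vectorization on an adapter queue within the
 * limits reported by this API; vec_pool is assumed to be an event vector
 * mempool whose element size can hold vector_sz mbuf pointers (see the
 * elt_size check in rte_event_eth_rx_adapter_queue_add()):
 *
 *	struct rte_event_eth_rx_adapter_vector_limits limits;
 *	struct rte_event_eth_rx_adapter_queue_conf qconf;
 *
 *	memset(&qconf, 0, sizeof(qconf));
 *	rte_event_eth_rx_adapter_vector_limits_get(dev_id, eth_port_id,
 *						    &limits);
 *	qconf.ev.queue_id = ev_queue_id;
 *	qconf.ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
 *	qconf.servicing_weight = 1;
 *	qconf.rx_queue_flags |= RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR;
 *	qconf.vector_sz = limits.max_sz;
 *	qconf.vector_timeout_ns = limits.min_timeout_ns;
 *	qconf.vector_mp = vec_pool;
 *	rte_event_eth_rx_adapter_queue_add(id, eth_port_id, -1, &qconf);
 */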
2975 
2976 int
2977 rte_event_eth_rx_adapter_start(uint8_t id)
2978 {
2979 	rte_eventdev_trace_eth_rx_adapter_start(id);
2980 	return rxa_ctrl(id, 1);
2981 }
2982 
2983 int
2984 rte_event_eth_rx_adapter_stop(uint8_t id)
2985 {
2986 	rte_eventdev_trace_eth_rx_adapter_stop(id);
2987 	return rxa_ctrl(id, 0);
2988 }
2989 
2990 static inline void
2991 rxa_queue_stats_reset(struct eth_rx_queue_info *queue_info)
2992 {
2993 	struct rte_event_eth_rx_adapter_stats *q_stats;
2994 
2995 	q_stats = queue_info->stats;
2996 	memset(q_stats, 0, sizeof(*q_stats));
2997 }
2998 
2999 int
3000 rte_event_eth_rx_adapter_stats_get(uint8_t id,
3001 			       struct rte_event_eth_rx_adapter_stats *stats)
3002 {
3003 	struct event_eth_rx_adapter *rx_adapter;
3004 	struct eth_event_enqueue_buffer *buf;
3005 	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
3006 	struct rte_event_eth_rx_adapter_stats dev_stats;
3007 	struct rte_eventdev *dev;
3008 	struct eth_device_info *dev_info;
3009 	struct eth_rx_queue_info *queue_info;
3010 	struct rte_event_eth_rx_adapter_stats *q_stats;
3011 	uint32_t i, j;
3012 	int ret;
3013 
3014 	rte_eventdev_trace_eth_rx_adapter_stats_get(id, stats);
3015 
3016 	if (rxa_memzone_lookup())
3017 		return -ENOMEM;
3018 
3019 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3020 
3021 	rx_adapter = rxa_id_to_adapter(id);
3022 	if (rx_adapter == NULL || stats == NULL)
3023 		return -EINVAL;
3024 
3025 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3026 	memset(stats, 0, sizeof(*stats));
3027 
3028 	if (rx_adapter->service_inited)
3029 		*stats = rx_adapter->stats;
3030 
3031 	RTE_ETH_FOREACH_DEV(i) {
3032 		dev_info = &rx_adapter->eth_devices[i];
3033 
3034 		if (rx_adapter->use_queue_event_buf && dev_info->rx_queue) {
3035 
3036 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
3037 			     j++) {
3038 				queue_info = &dev_info->rx_queue[j];
3039 				if (!queue_info->queue_enabled)
3040 					continue;
3041 				q_stats = queue_info->stats;
3042 
3043 				stats->rx_packets += q_stats->rx_packets;
3044 				stats->rx_poll_count += q_stats->rx_poll_count;
3045 				stats->rx_enq_count += q_stats->rx_enq_count;
3046 				stats->rx_enq_retry += q_stats->rx_enq_retry;
3047 				stats->rx_dropped += q_stats->rx_dropped;
3048 				stats->rx_enq_block_cycles +=
3049 						q_stats->rx_enq_block_cycles;
3050 			}
3051 		}
3052 
3053 		if (dev_info->internal_event_port == 0 ||
3054 			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
3055 			continue;
3056 		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
3057 						&rte_eth_devices[i],
3058 						&dev_stats);
3059 		if (ret)
3060 			continue;
3061 		dev_stats_sum.rx_packets += dev_stats.rx_packets;
3062 		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
3063 	}
3064 
3065 	buf = &rx_adapter->event_enqueue_buffer;
3066 	stats->rx_packets += dev_stats_sum.rx_packets;
3067 	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
3068 	stats->rx_event_buf_count = buf->count;
3069 	stats->rx_event_buf_size = buf->events_size;
3070 
3071 	return 0;
3072 }
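
/*
 * A sketch of reading adapter level statistics; the fields below are a
 * subset of struct rte_event_eth_rx_adapter_stats:
 *
 *	struct rte_event_eth_rx_adapter_stats stats;
 *
 *	if (rte_event_eth_rx_adapter_stats_get(id, &stats) == 0)
 *		printf("packets %" PRIu64 " enqueued %" PRIu64
 *		       " dropped %" PRIu64 "\n", stats.rx_packets,
 *		       stats.rx_enq_count, stats.rx_dropped);
 */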
3073 
3074 int
3075 rte_event_eth_rx_adapter_queue_stats_get(uint8_t id,
3076 		uint16_t eth_dev_id,
3077 		uint16_t rx_queue_id,
3078 		struct rte_event_eth_rx_adapter_queue_stats *stats)
3079 {
3080 	struct event_eth_rx_adapter *rx_adapter;
3081 	struct eth_device_info *dev_info;
3082 	struct eth_rx_queue_info *queue_info;
3083 	struct eth_event_enqueue_buffer *event_buf;
3084 	struct rte_event_eth_rx_adapter_stats *q_stats;
3085 	struct rte_eventdev *dev;
3086 
3087 	rte_eventdev_trace_eth_rx_adapter_queue_stats_get(id, eth_dev_id,
3088 							  rx_queue_id, stats);
3089 
3090 	if (rxa_memzone_lookup())
3091 		return -ENOMEM;
3092 
3093 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3094 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3095 
3096 	rx_adapter = rxa_id_to_adapter(id);
3097 
3098 	if (rx_adapter == NULL || stats == NULL)
3099 		return -EINVAL;
3100 
3101 	if (!rx_adapter->use_queue_event_buf)
3102 		return -EINVAL;
3103 
3104 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3105 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
3106 		return -EINVAL;
3107 	}
3108 
3109 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3110 	if (dev_info->rx_queue == NULL ||
3111 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3112 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3113 		return -EINVAL;
3114 	}
3115 
3116 	if (dev_info->internal_event_port == 0) {
3117 		queue_info = &dev_info->rx_queue[rx_queue_id];
3118 		event_buf = queue_info->event_buf;
3119 		q_stats = queue_info->stats;
3120 
3121 		stats->rx_event_buf_count = event_buf->count;
3122 		stats->rx_event_buf_size = event_buf->events_size;
3123 		stats->rx_packets = q_stats->rx_packets;
3124 		stats->rx_poll_count = q_stats->rx_poll_count;
3125 		stats->rx_dropped = q_stats->rx_dropped;
3126 	}
3127 
3128 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3129 	if (dev->dev_ops->eth_rx_adapter_queue_stats_get != NULL) {
3130 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_get)(dev,
3131 						&rte_eth_devices[eth_dev_id],
3132 						rx_queue_id, stats);
3133 	}
3134 
3135 	return 0;
3136 }
3137 
3138 int
3139 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
3140 {
3141 	struct event_eth_rx_adapter *rx_adapter;
3142 	struct rte_eventdev *dev;
3143 	struct eth_device_info *dev_info;
3144 	struct eth_rx_queue_info *queue_info;
3145 	uint32_t i, j;
3146 
3147 	rte_eventdev_trace_eth_rx_adapter_stats_reset(id);
3148 
3149 	if (rxa_memzone_lookup())
3150 		return -ENOMEM;
3151 
3152 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3153 
3154 	rx_adapter = rxa_id_to_adapter(id);
3155 	if (rx_adapter == NULL)
3156 		return -EINVAL;
3157 
3158 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3159 
3160 	RTE_ETH_FOREACH_DEV(i) {
3161 		dev_info = &rx_adapter->eth_devices[i];
3162 
3163 		if (rx_adapter->use_queue_event_buf && dev_info->rx_queue) {
3164 
3165 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
3166 						j++) {
3167 				queue_info = &dev_info->rx_queue[j];
3168 				if (!queue_info->queue_enabled)
3169 					continue;
3170 				rxa_queue_stats_reset(queue_info);
3171 			}
3172 		}
3173 
3174 		if (dev_info->internal_event_port == 0 ||
3175 			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
3176 			continue;
3177 		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
3178 							&rte_eth_devices[i]);
3179 	}
3180 
3181 	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
3182 
3183 	return 0;
3184 }
3185 
3186 int
3187 rte_event_eth_rx_adapter_queue_stats_reset(uint8_t id,
3188 		uint16_t eth_dev_id,
3189 		uint16_t rx_queue_id)
3190 {
3191 	struct event_eth_rx_adapter *rx_adapter;
3192 	struct eth_device_info *dev_info;
3193 	struct eth_rx_queue_info *queue_info;
3194 	struct rte_eventdev *dev;
3195 
3196 	rte_eventdev_trace_eth_rx_adapter_queue_stats_reset(id, eth_dev_id,
3197 							    rx_queue_id);
3198 
3199 	if (rxa_memzone_lookup())
3200 		return -ENOMEM;
3201 
3202 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3203 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3204 
3205 	rx_adapter = rxa_id_to_adapter(id);
3206 	if (rx_adapter == NULL)
3207 		return -EINVAL;
3208 
3209 	if (!rx_adapter->use_queue_event_buf)
3210 		return -EINVAL;
3211 
3212 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3213 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
3214 		return -EINVAL;
3215 	}
3216 
3217 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3218 
3219 	if (dev_info->rx_queue == NULL ||
3220 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3221 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3222 		return -EINVAL;
3223 	}
3224 
3225 	if (dev_info->internal_event_port == 0) {
3226 		queue_info = &dev_info->rx_queue[rx_queue_id];
3227 		rxa_queue_stats_reset(queue_info);
3228 	}
3229 
3230 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3231 	if (dev->dev_ops->eth_rx_adapter_queue_stats_reset != NULL) {
3232 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_reset)(dev,
3233 						&rte_eth_devices[eth_dev_id],
3234 						rx_queue_id);
3235 	}
3236 
3237 	return 0;
3238 }
3239 
3240 int
3241 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
3242 {
3243 	struct event_eth_rx_adapter *rx_adapter;
3244 
3245 	if (rxa_memzone_lookup())
3246 		return -ENOMEM;
3247 
3248 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3249 
3250 	rx_adapter = rxa_id_to_adapter(id);
3251 	if (rx_adapter == NULL || service_id == NULL)
3252 		return -EINVAL;
3253 
3254 	if (rx_adapter->service_inited)
3255 		*service_id = rx_adapter->service_id;
3256 
3257 	rte_eventdev_trace_eth_rx_adapter_service_id_get(id, *service_id);
3258 
3259 	return rx_adapter->service_inited ? 0 : -ESRCH;
3260 }
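
/*
 * When the adapter runs as a software service (no INTERNAL_PORT capability),
 * the application must map the returned service id to a service lcore for
 * the adapter to make progress. A minimal sketch, with service_lcore assumed
 * to be an lcore reserved for services:
 *
 *	uint32_t service_id;
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(id, &service_id) == 0) {
 *		rte_service_lcore_add(service_lcore);
 *		rte_service_map_lcore_set(service_id, service_lcore, 1);
 *		rte_service_runstate_set(service_id, 1);
 *		rte_service_lcore_start(service_lcore);
 *	}
 */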
3261 
3262 int
3263 rte_event_eth_rx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
3264 {
3265 	struct event_eth_rx_adapter *rx_adapter;
3266 
3267 	if (rxa_memzone_lookup())
3268 		return -ENOMEM;
3269 
3270 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3271 
3272 	rx_adapter = rxa_id_to_adapter(id);
3273 	if (rx_adapter == NULL || event_port_id == NULL)
3274 		return -EINVAL;
3275 
3276 	if (rx_adapter->service_inited)
3277 		*event_port_id = rx_adapter->event_port_id;
3278 
3279 	rte_eventdev_trace_eth_rx_adapter_event_port_get(id, *event_port_id);
3280 
3281 	return rx_adapter->service_inited ? 0 : -ESRCH;
3282 }
3283 
3284 int
3285 rte_event_eth_rx_adapter_cb_register(uint8_t id,
3286 					uint16_t eth_dev_id,
3287 					rte_event_eth_rx_adapter_cb_fn cb_fn,
3288 					void *cb_arg)
3289 {
3290 	struct event_eth_rx_adapter *rx_adapter;
3291 	struct eth_device_info *dev_info;
3292 	uint32_t cap;
3293 	int ret;
3294 
3295 	rte_eventdev_trace_eth_rx_adapter_cb_register(id, eth_dev_id, cb_fn,
3296 						      cb_arg);
3297 
3298 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3299 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3300 
3301 	rx_adapter = rxa_id_to_adapter(id);
3302 	if (rx_adapter == NULL)
3303 		return -EINVAL;
3304 
3305 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3306 	if (dev_info->rx_queue == NULL)
3307 		return -EINVAL;
3308 
3309 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
3310 						eth_dev_id,
3311 						&cap);
3312 	if (ret) {
3313 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
3314 			"eth port %" PRIu16, id, eth_dev_id);
3315 		return ret;
3316 	}
3317 
3318 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
3319 		RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
3320 				PRIu16, eth_dev_id);
3321 		return -EINVAL;
3322 	}
3323 
3324 	rte_spinlock_lock(&rx_adapter->rx_lock);
3325 	dev_info->cb_fn = cb_fn;
3326 	dev_info->cb_arg = cb_arg;
3327 	rte_spinlock_unlock(&rx_adapter->rx_lock);
3328 
3329 	return 0;
3330 }
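
/*
 * Registration sketch: app_rx_cb is an application function matching the
 * rte_event_eth_rx_adapter_cb_fn prototype declared in
 * rte_event_eth_rx_adapter.h and app_arg is an opaque pointer passed back to
 * it. As checked above, this is only supported when the port does not use an
 * internal event port:
 *
 *	ret = rte_event_eth_rx_adapter_cb_register(id, eth_dev_id,
 *						   app_rx_cb, app_arg);
 */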
3331 
3332 int
3333 rte_event_eth_rx_adapter_queue_conf_get(uint8_t id,
3334 			uint16_t eth_dev_id,
3335 			uint16_t rx_queue_id,
3336 			struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
3337 {
3338 #define TICK2NSEC(_ticks, _freq) (((_ticks) * (1E9)) / (_freq))
3339 	struct rte_eventdev *dev;
3340 	struct event_eth_rx_adapter *rx_adapter;
3341 	struct eth_device_info *dev_info;
3342 	struct eth_rx_queue_info *queue_info;
3343 	int ret;
3344 
3345 	rte_eventdev_trace_eth_rx_adapter_queue_conf_get(id, eth_dev_id,
3346 							 rx_queue_id, queue_conf);
3347 
3348 	if (rxa_memzone_lookup())
3349 		return -ENOMEM;
3350 
3351 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3352 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3353 
3354 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3355 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3356 		return -EINVAL;
3357 	}
3358 
3359 	if (queue_conf == NULL) {
3360 		RTE_EDEV_LOG_ERR("Rx queue conf struct cannot be NULL");
3361 		return -EINVAL;
3362 	}
3363 
3364 	rx_adapter = rxa_id_to_adapter(id);
3365 	if (rx_adapter == NULL)
3366 		return -EINVAL;
3367 
3368 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3369 	if (dev_info->rx_queue == NULL ||
3370 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3371 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3372 		return -EINVAL;
3373 	}
3374 
3375 	queue_info = &dev_info->rx_queue[rx_queue_id];
3376 
3377 	memset(queue_conf, 0, sizeof(*queue_conf));
3378 	queue_conf->rx_queue_flags = 0;
3379 	if (queue_info->flow_id_mask != 0)
3380 		queue_conf->rx_queue_flags |=
3381 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID;
3382 	queue_conf->servicing_weight = queue_info->wt;
3383 
3384 	queue_conf->ev.event = queue_info->event;
3385 
3386 	queue_conf->vector_sz = queue_info->vector_data.max_vector_count;
3387 	queue_conf->vector_mp = queue_info->vector_data.vector_pool;
3388 	/* needs to be converted from ticks to ns */
3389 	queue_conf->vector_timeout_ns = TICK2NSEC(
3390 		queue_info->vector_data.vector_timeout_ticks, rte_get_timer_hz());
3391 
3392 	if (queue_info->event_buf != NULL)
3393 		queue_conf->event_buf_size = queue_info->event_buf->events_size;
3394 	else
3395 		queue_conf->event_buf_size = 0;
3396 
3397 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3398 	if (dev->dev_ops->eth_rx_adapter_queue_conf_get != NULL) {
3399 		ret = (*dev->dev_ops->eth_rx_adapter_queue_conf_get)(dev,
3400 						&rte_eth_devices[eth_dev_id],
3401 						rx_queue_id,
3402 						queue_conf);
3403 		return ret;
3404 	}
3405 
3406 	return 0;
3407 }
3408 
3409 static int
3410 rxa_is_queue_added(struct event_eth_rx_adapter *rx_adapter,
3411 		   uint16_t eth_dev_id,
3412 		   uint16_t rx_queue_id)
3413 {
3414 	struct eth_device_info *dev_info;
3415 	struct eth_rx_queue_info *queue_info;
3416 
3417 	if (!rx_adapter->eth_devices)
3418 		return 0;
3419 
3420 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3421 	if (!dev_info || !dev_info->rx_queue)
3422 		return 0;
3423 
3424 	queue_info = &dev_info->rx_queue[rx_queue_id];
3425 
3426 	return queue_info && queue_info->queue_enabled;
3427 }
3428 
3429 #define rxa_evdev(rx_adapter) (&rte_eventdevs[(rx_adapter)->eventdev_id])
3430 
3431 #define rxa_dev_instance_get(rx_adapter) \
3432 		rxa_evdev((rx_adapter))->dev_ops->eth_rx_adapter_instance_get
3433 
3434 int
3435 rte_event_eth_rx_adapter_instance_get(uint16_t eth_dev_id,
3436 				      uint16_t rx_queue_id,
3437 				      uint8_t *rxa_inst_id)
3438 {
3439 	uint8_t id;
3440 	int ret = -EINVAL;
3441 	uint32_t caps;
3442 	struct event_eth_rx_adapter *rx_adapter;
3443 
3444 	if (rxa_memzone_lookup())
3445 		return -ENOMEM;
3446 
3447 	if (eth_dev_id >= rte_eth_dev_count_avail()) {
3448 		RTE_EDEV_LOG_ERR("Invalid ethernet port id %u", eth_dev_id);
3449 		return -EINVAL;
3450 	}
3451 
3452 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3453 		RTE_EDEV_LOG_ERR("Invalid Rx queue %u", rx_queue_id);
3454 		return -EINVAL;
3455 	}
3456 
3457 	if (rxa_inst_id == NULL) {
3458 		RTE_EDEV_LOG_ERR("rxa_inst_id cannot be NULL");
3459 		return -EINVAL;
3460 	}
3461 
3462 	/* Iterate through all adapter instances */
3463 	for (id = 0; id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE; id++) {
3464 		rx_adapter = rxa_id_to_adapter(id);
3465 		if (!rx_adapter)
3466 			continue;
3467 
3468 		if (rxa_is_queue_added(rx_adapter, eth_dev_id, rx_queue_id)) {
3469 			*rxa_inst_id = rx_adapter->id;
3470 			ret = 0;
3471 		}
3472 
3473 		/* The Rx adapter internally maintains queue information
3474 		 * for both internal port and DPDK service port.
3475 		 * The eventdev PMD callback is called for future-proofing
3476 		 * only and overrides the above return value if defined.
3477 		 */
3478 		caps = 0;
3479 		if (!rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
3480 						      eth_dev_id,
3481 						      &caps)) {
3482 			if (caps & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT &&
3483 			    rxa_dev_instance_get(rx_adapter))
3484 				ret = rxa_dev_instance_get(rx_adapter)(eth_dev_id, rx_queue_id,
3485 								       rxa_inst_id);
3486 		}
3487 
3488 		/* return if entry found */
3489 		if (ret == 0) {
3490 			rte_eventdev_trace_eth_rx_adapter_instance_get(eth_dev_id, rx_queue_id,
3491 								       *rxa_inst_id);
3492 			return ret;
3493 		}
3494 	}
3495 
3496 	return -EINVAL;
3497 }
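
/*
 * Lookup sketch: given an ethdev port and an Rx queue that were added to
 * some adapter, retrieve the id of the owning adapter instance:
 *
 *	uint8_t inst_id;
 *
 *	if (rte_event_eth_rx_adapter_instance_get(eth_dev_id, rx_queue_id,
 *						  &inst_id) == 0)
 *		printf("Rx queue %u of port %u belongs to adapter %u\n",
 *		       rx_queue_id, eth_dev_id, inst_id);
 */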
3498 
3499 static int
3500 rxa_caps_check(struct event_eth_rx_adapter *rxa)
3501 {
3502 	if (!rxa->nb_queues)
3503 		return -EINVAL;
3504 
3505 	/* Check if there is at least one non-internal ethernet port. */
3506 	if (rxa->service_inited)
3507 		return 0;
3508 
3509 	return -ENOTSUP;
3510 }
3511 
3512 int
3513 rte_event_eth_rx_adapter_runtime_params_init(
3514 		struct rte_event_eth_rx_adapter_runtime_params *params)
3515 {
3516 	if (params == NULL)
3517 		return -EINVAL;
3518 
3519 	memset(params, 0, sizeof(struct rte_event_eth_rx_adapter_runtime_params));
3520 	params->max_nb_rx = RXA_NB_RX_WORK_DEFAULT;
3521 
3522 	return 0;
3523 }
3524 
3525 int
3526 rte_event_eth_rx_adapter_runtime_params_set(uint8_t id,
3527 		struct rte_event_eth_rx_adapter_runtime_params *params)
3528 {
3529 	struct event_eth_rx_adapter *rxa;
3530 	int ret;
3531 
3532 	if (params == NULL)
3533 		return -EINVAL;
3534 
3535 	if (rxa_memzone_lookup())
3536 		return -ENOMEM;
3537 
3538 	rxa = rxa_id_to_adapter(id);
3539 	if (rxa == NULL)
3540 		return -EINVAL;
3541 
3542 	ret = rxa_caps_check(rxa);
3543 	if (ret)
3544 		return ret;
3545 
3546 	rte_spinlock_lock(&rxa->rx_lock);
3547 	rxa->max_nb_rx = params->max_nb_rx;
3548 	rte_spinlock_unlock(&rxa->rx_lock);
3549 
3550 	return 0;
3551 }
3552 
3553 int
3554 rte_event_eth_rx_adapter_runtime_params_get(uint8_t id,
3555 		struct rte_event_eth_rx_adapter_runtime_params *params)
3556 {
3557 	struct event_eth_rx_adapter *rxa;
3558 	int ret;
3559 
3560 	if (params == NULL)
3561 		return -EINVAL;
3562 
3563 	if (rxa_memzone_lookup())
3564 		return -ENOMEM;
3565 
3566 	rxa = rxa_id_to_adapter(id);
3567 	if (rxa == NULL)
3568 		return -EINVAL;
3569 
3570 	ret = rxa_caps_check(rxa);
3571 	if (ret)
3572 		return ret;
3573 
3574 	params->max_nb_rx = rxa->max_nb_rx;
3575 
3576 	return 0;
3577 }
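
/*
 * A sketch of raising the per service call work limit at run time; 256 is an
 * arbitrary illustrative value (the default is RXA_NB_RX_WORK_DEFAULT):
 *
 *	struct rte_event_eth_rx_adapter_runtime_params rp;
 *
 *	rte_event_eth_rx_adapter_runtime_params_init(&rp);
 *	rp.max_nb_rx = 256;
 *	rte_event_eth_rx_adapter_runtime_params_set(id, &rp);
 */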
3578 
3579 /* RX-adapter telemetry callbacks */
3580 #define RXA_ADD_DICT(stats, s) rte_tel_data_add_dict_uint(d, #s, stats.s)
3581 
3582 static int
3583 handle_rxa_stats(const char *cmd __rte_unused,
3584 		 const char *params,
3585 		 struct rte_tel_data *d)
3586 {
3587 	uint8_t rx_adapter_id;
3588 	struct rte_event_eth_rx_adapter_stats rx_adptr_stats;
3589 
3590 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3591 		return -1;
3592 
3593 	/* Get Rx adapter ID from parameter string */
3594 	rx_adapter_id = atoi(params);
3595 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3596 
3597 	/* Get Rx adapter stats */
3598 	if (rte_event_eth_rx_adapter_stats_get(rx_adapter_id,
3599 					       &rx_adptr_stats)) {
3600 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter stats");
3601 		return -1;
3602 	}
3603 
3604 	rte_tel_data_start_dict(d);
3605 	rte_tel_data_add_dict_uint(d, "rx_adapter_id", rx_adapter_id);
3606 	RXA_ADD_DICT(rx_adptr_stats, rx_packets);
3607 	RXA_ADD_DICT(rx_adptr_stats, rx_poll_count);
3608 	RXA_ADD_DICT(rx_adptr_stats, rx_dropped);
3609 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_retry);
3610 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_count);
3611 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_size);
3612 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_count);
3613 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_start_ts);
3614 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_block_cycles);
3615 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_end_ts);
3616 	RXA_ADD_DICT(rx_adptr_stats, rx_intr_packets);
3619 
3620 	return 0;
3621 }
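
/*
 * This handler is reached over the telemetry socket; the parameter string is
 * expected to be the decimal Rx adapter id, e.g. "0".
 */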
3622 
3623 static int
3624 handle_rxa_stats_reset(const char *cmd __rte_unused,
3625 		       const char *params,
3626 		       struct rte_tel_data *d __rte_unused)
3627 {
3628 	uint8_t rx_adapter_id;
3629 
3630 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3631 		return -1;
3632 
3633 	/* Get Rx adapter ID from parameter string */
3634 	rx_adapter_id = atoi(params);
3635 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3636 
3637 	/* Reset Rx adapter stats */
3638 	if (rte_event_eth_rx_adapter_stats_reset(rx_adapter_id)) {
3639 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter stats");
3640 		return -1;
3641 	}
3642 
3643 	return 0;
3644 }
3645 
3646 static int
3647 handle_rxa_get_queue_conf(const char *cmd __rte_unused,
3648 			  const char *params,
3649 			  struct rte_tel_data *d)
3650 {
3651 	uint8_t rx_adapter_id;
3652 	uint16_t rx_queue_id;
3653 	int eth_dev_id, ret = -1;
3654 	char *token, *l_params;
3655 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
3656 
3657 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3658 		return -1;
3659 
3660 	/* Get Rx adapter ID from parameter string */
3661 	l_params = strdup(params);
3662 	if (l_params == NULL)
3663 		return -ENOMEM;
3664 	token = strtok(l_params, ",");
3665 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3666 	rx_adapter_id = strtoul(token, NULL, 10);
3667 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);
3668 
3669 	token = strtok(NULL, ",");
3670 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3671 
3672 	/* Get device ID from parameter string */
3673 	eth_dev_id = strtoul(token, NULL, 10);
3674 	RTE_EVENT_ETH_RX_ADAPTER_PORTID_VALID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3675 
3676 	token = strtok(NULL, ",");
3677 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3678 
3679 	/* Get Rx queue ID from parameter string */
3680 	rx_queue_id = strtoul(token, NULL, 10);
3681 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3682 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3683 		ret = -EINVAL;
3684 		goto error;
3685 	}
3686 
3687 	token = strtok(NULL, "\0");
3688 	if (token != NULL)
3689 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3690 				 " telemetry command, ignoring");
3691 	/* Parsing parameter finished */
3692 	free(l_params);
3693 
3694 	if (rte_event_eth_rx_adapter_queue_conf_get(rx_adapter_id, eth_dev_id,
3695 						    rx_queue_id, &queue_conf)) {
3696 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue config");
3697 		return -1;
3698 	}
3699 
3700 	rte_tel_data_start_dict(d);
3701 	rte_tel_data_add_dict_uint(d, "rx_adapter_id", rx_adapter_id);
3702 	rte_tel_data_add_dict_uint(d, "eth_dev_id", eth_dev_id);
3703 	rte_tel_data_add_dict_uint(d, "rx_queue_id", rx_queue_id);
3704 	RXA_ADD_DICT(queue_conf, rx_queue_flags);
3705 	RXA_ADD_DICT(queue_conf, servicing_weight);
3706 	RXA_ADD_DICT(queue_conf.ev, queue_id);
3707 	RXA_ADD_DICT(queue_conf.ev, sched_type);
3708 	RXA_ADD_DICT(queue_conf.ev, priority);
3709 	RXA_ADD_DICT(queue_conf.ev, flow_id);
3710 
3711 	return 0;
3712 
3713 error:
3714 	free(l_params);
3715 	return ret;
3716 }
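
/*
 * The queue level telemetry handlers (this one and the two that follow)
 * parse a comma separated parameter string of the form
 * "<rx_adapter_id>,<eth_dev_id>,<rx_queue_id>", e.g. "0,1,3"; any text after
 * the third token is ignored with a warning.
 */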
3717 
3718 static int
3719 handle_rxa_get_queue_stats(const char *cmd __rte_unused,
3720 			   const char *params,
3721 			   struct rte_tel_data *d)
3722 {
3723 	uint8_t rx_adapter_id;
3724 	uint16_t rx_queue_id;
3725 	int eth_dev_id, ret = -1;
3726 	char *token, *l_params;
3727 	struct rte_event_eth_rx_adapter_queue_stats q_stats;
3728 
3729 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3730 		return -1;
3731 
3732 	/* Get Rx adapter ID from parameter string */
3733 	l_params = strdup(params);
3734 	if (l_params == NULL)
3735 		return -ENOMEM;
3736 	token = strtok(l_params, ",");
3737 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3738 	rx_adapter_id = strtoul(token, NULL, 10);
3739 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);
3740 
3741 	token = strtok(NULL, ",");
3742 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3743 
3744 	/* Get device ID from parameter string */
3745 	eth_dev_id = strtoul(token, NULL, 10);
3746 	RTE_EVENT_ETH_RX_ADAPTER_PORTID_VALID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3747 
3748 	token = strtok(NULL, ",");
3749 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3750 
3751 	/* Get Rx queue ID from parameter string */
3752 	rx_queue_id = strtoul(token, NULL, 10);
3753 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3754 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3755 		ret = -EINVAL;
3756 		goto error;
3757 	}
3758 
3759 	token = strtok(NULL, "\0");
3760 	if (token != NULL)
3761 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3762 				 " telemetry command, ignoring");
3763 	/* Parameter parsing finished */
3764 	free(l_params);
3765 
3766 	if (rte_event_eth_rx_adapter_queue_stats_get(rx_adapter_id, eth_dev_id,
3767 						    rx_queue_id, &q_stats)) {
3768 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue stats");
3769 		return -1;
3770 	}
3771 
3772 	rte_tel_data_start_dict(d);
3773 	rte_tel_data_add_dict_uint(d, "rx_adapter_id", rx_adapter_id);
3774 	rte_tel_data_add_dict_uint(d, "eth_dev_id", eth_dev_id);
3775 	rte_tel_data_add_dict_uint(d, "rx_queue_id", rx_queue_id);
3776 	RXA_ADD_DICT(q_stats, rx_event_buf_count);
3777 	RXA_ADD_DICT(q_stats, rx_event_buf_size);
3778 	RXA_ADD_DICT(q_stats, rx_poll_count);
3779 	RXA_ADD_DICT(q_stats, rx_packets);
3780 	RXA_ADD_DICT(q_stats, rx_dropped);
3781 
3782 	return 0;
3783 
3784 error:
3785 	free(l_params);
3786 	return ret;
3787 }
3788 
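/*
 * Telemetry callback for "/eventdev/rxa_queue_stats_reset".
 * Expects a parameter string of the form "rxa_id,dev_id,queue_id" and
 * resets the per-queue statistics of the given Rx queue.
 */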
3789 static int
3790 handle_rxa_queue_stats_reset(const char *cmd __rte_unused,
3791 			     const char *params,
3792 			     struct rte_tel_data *d __rte_unused)
3793 {
3794 	uint8_t rx_adapter_id;
3795 	uint16_t rx_queue_id;
3796 	int eth_dev_id, ret = -1;
3797 	char *token, *l_params;
3798 
3799 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3800 		return -1;
3801 
3802 	/* Get Rx adapter ID from parameter string */
3803 	l_params = strdup(params);
3804 	if (l_params == NULL)
3805 		return -ENOMEM;
3806 	token = strtok(l_params, ",");
3807 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3808 	rx_adapter_id = strtoul(token, NULL, 10);
3809 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);
3810 
3811 	token = strtok(NULL, ",");
3812 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3813 
3814 	/* Get device ID from parameter string */
3815 	eth_dev_id = strtoul(token, NULL, 10);
3816 	RTE_EVENT_ETH_RX_ADAPTER_PORTID_VALID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3817 
3818 	token = strtok(NULL, ",");
3819 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3820 
3821 	/* Get Rx queue ID from parameter string */
3822 	rx_queue_id = strtoul(token, NULL, 10);
3823 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3824 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3825 		ret = -EINVAL;
3826 		goto error;
3827 	}
3828 
3829 	token = strtok(NULL, "\0");
3830 	if (token != NULL)
3831 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3832 				 " telemetry command, ignoring");
3833 	/* Parameter parsing finished */
3834 	free(l_params);
3835 
3836 	if (rte_event_eth_rx_adapter_queue_stats_reset(rx_adapter_id,
3837 						       eth_dev_id,
3838 						       rx_queue_id)) {
3839 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter queue stats");
3840 		return -1;
3841 	}
3842 
3843 	return 0;
3844 
3845 error:
3846 	free(l_params);
3847 	return ret;
3848 }
3849 
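/*
 * Telemetry callback for "/eventdev/rxa_rxq_instance_get".
 * Expects a parameter string of the form "dev_id,queue_id" and returns
 * the ID of the Rx adapter instance servicing the given Rx queue.
 */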
3850 static int
3851 handle_rxa_instance_get(const char *cmd __rte_unused,
3852 			const char *params,
3853 			struct rte_tel_data *d)
3854 {
3855 	uint8_t instance_id;
3856 	uint16_t rx_queue_id;
3857 	int eth_dev_id, ret = -1;
3858 	char *token, *l_params;
3859 
3860 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3861 		return -1;
3862 
3863 	l_params = strdup(params);
3864 	if (l_params == NULL)
3865 		return -ENOMEM;
3866 	token = strtok(l_params, ",");
3867 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3868 
3869 	/* Get device ID from parameter string */
3870 	eth_dev_id = strtoul(token, NULL, 10);
3871 	RTE_EVENT_ETH_RX_ADAPTER_PORTID_VALID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3872 
3873 	token = strtok(NULL, ",");
3874 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3875 
3876 	/* Get Rx queue ID from parameter string */
3877 	rx_queue_id = strtoul(token, NULL, 10);
3878 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3879 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3880 		ret = -EINVAL;
3881 		goto error;
3882 	}
3883 
3884 	token = strtok(NULL, "\0");
3885 	if (token != NULL)
3886 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3887 				 " telemetry command, ignoring");
3888 
3889 	/* Parameter parsing finished */
3890 	free(l_params);
3891 
3892 	if (rte_event_eth_rx_adapter_instance_get(eth_dev_id,
3893 						  rx_queue_id,
3894 						  &instance_id)) {
3895 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter instance ID"
3896 				 " for rx_queue_id = %d", rx_queue_id);
3897 		return -1;
3898 	}
3899 
3900 	rte_tel_data_start_dict(d);
3901 	rte_tel_data_add_dict_uint(d, "eth_dev_id", eth_dev_id);
3902 	rte_tel_data_add_dict_uint(d, "rx_queue_id", rx_queue_id);
3903 	rte_tel_data_add_dict_uint(d, "rxa_instance_id", instance_id);
3904 
3905 	return 0;
3906 
3907 error:
3908 	free(l_params);
3909 	return ret;
3910 }
3911 
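/*
 * Register the Rx adapter telemetry endpoints at startup.
 * Illustrative usage (an assumption for the example only: adapter ID 0,
 * ethdev port 0, Rx queue 0), e.g. via the usertools/dpdk-telemetry.py
 * client:
 *   /eventdev/rxa_stats,0
 *   /eventdev/rxa_queue_stats,0,0,0
 *   /eventdev/rxa_rxq_instance_get,0,0
 */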
3912 RTE_INIT(rxa_init_telemetry)
3913 {
3914 	rte_telemetry_register_cmd("/eventdev/rxa_stats",
3915 		handle_rxa_stats,
3916 		"Returns Rx adapter stats. Parameter: rxa_id");
3917 
3918 	rte_telemetry_register_cmd("/eventdev/rxa_stats_reset",
3919 		handle_rxa_stats_reset,
3920 		"Resets Rx adapter stats. Parameter: rxa_id");
3921 
3922 	rte_telemetry_register_cmd("/eventdev/rxa_queue_conf",
3923 		handle_rxa_get_queue_conf,
3924 		"Returns Rx queue config. Parameters: rxa_id, dev_id, queue_id");
3925 
3926 	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats",
3927 		handle_rxa_get_queue_stats,
3928 		"Returns Rx queue stats. Parameters: rxa_id, dev_id, queue_id");
3929 
3930 	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats_reset",
3931 		handle_rxa_queue_stats_reset,
3932 		"Resets Rx queue stats. Parameters: rxa_id, dev_id, queue_id");
3933 
3934 	rte_telemetry_register_cmd("/eventdev/rxa_rxq_instance_get",
3935 		handle_rxa_instance_get,
3936 		"Returns Rx adapter instance ID. Parameters: dev_id, queue_id");
3937 }
3938