xref: /dpdk/lib/eventdev/rte_event_eth_rx_adapter.c (revision 665b49c51639a10c553433bc2bcd85c7331c631e)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #include <ctype.h>
6 #include <stdlib.h>
7 #if defined(LINUX)
8 #include <sys/epoll.h>
9 #endif
10 #include <unistd.h>
11 
12 #include <rte_cycles.h>
13 #include <rte_common.h>
14 #include <dev_driver.h>
15 #include <rte_errno.h>
16 #include <ethdev_driver.h>
17 #include <rte_log.h>
18 #include <rte_malloc.h>
19 #include <rte_service_component.h>
20 #include <rte_thash.h>
21 #include <rte_interrupts.h>
22 #include <rte_mbuf_dyn.h>
23 #include <rte_telemetry.h>
24 
25 #include "rte_eventdev.h"
26 #include "eventdev_pmd.h"
27 #include "eventdev_trace.h"
28 #include "rte_event_eth_rx_adapter.h"
29 
30 #define BATCH_SIZE		32
31 #define BLOCK_CNT_THRESHOLD	10
32 #define ETH_EVENT_BUFFER_SIZE	(6*BATCH_SIZE)
33 #define MAX_VECTOR_SIZE		1024
34 #define MIN_VECTOR_SIZE		4
35 #define MAX_VECTOR_NS		1E9
36 #define MIN_VECTOR_NS		1E5
37 
38 #define RXA_NB_RX_WORK_DEFAULT 128
39 
40 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
41 #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
42 
43 #define RSS_KEY_SIZE	40
44 /* value written to intr thread pipe to signal thread exit */
45 #define ETH_BRIDGE_INTR_THREAD_EXIT	1
46 /* Sentinel value to detect an uninitialized file handle */
47 #define INIT_FD		-1
48 
49 #define RXA_ADAPTER_ARRAY "rte_event_eth_rx_adapter_array"
50 
51 /*
52  * Used to store port and queue ID of interrupting Rx queue
53  */
54 union queue_data {
55 	RTE_STD_C11
56 	void *ptr;
57 	struct {
58 		uint16_t port;
59 		uint16_t queue;
60 	};
61 };
62 
63 /*
64  * There is an instance of this struct per polled Rx queue added to the
65  * adapter
66  */
67 struct eth_rx_poll_entry {
68 	/* Eth port to poll */
69 	uint16_t eth_dev_id;
70 	/* Eth rx queue to poll */
71 	uint16_t eth_rx_qid;
72 };
73 
74 struct eth_rx_vector_data {
75 	TAILQ_ENTRY(eth_rx_vector_data) next;
76 	uint16_t port;
77 	uint16_t queue;
78 	uint16_t max_vector_count;
79 	uint64_t event;
80 	uint64_t ts;
81 	uint64_t vector_timeout_ticks;
82 	struct rte_mempool *vector_pool;
83 	struct rte_event_vector *vector_ev;
84 } __rte_cache_aligned;
85 
86 TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
87 
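/*
 * The enqueue buffer is used as a circular buffer: events are flushed to the
 * event device starting at 'head', new events from rte_eth_rx_burst() are
 * appended at 'tail', and 'last' records the wrap point when 'tail' rolls
 * over before 'head' has caught up (see rxa_pkt_buf_available() and
 * rxa_flush_event_buffer()).
 */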
88 /* One instance per adapter, or one per Rx queue when use_queue_event_buf is set */
89 struct eth_event_enqueue_buffer {
90 	/* Count of events in this buffer */
91 	uint16_t count;
92 	/* Array of events in this buffer */
93 	struct rte_event *events;
94 	/* size of event buffer */
95 	uint16_t events_size;
96 	/* Event enqueue happens from head */
97 	uint16_t head;
98 	/* New packets from rte_eth_rx_burst() are enqueued at the tail */
99 	uint16_t tail;
100 	/* Index one past the last valid event before rollover */
101 	uint16_t last;
102 	uint16_t last_mask;
103 };
104 
105 struct event_eth_rx_adapter {
106 	/* RSS key */
107 	uint8_t rss_key_be[RSS_KEY_SIZE];
108 	/* Event device identifier */
109 	uint8_t eventdev_id;
110 	/* Event port identifier */
111 	uint8_t event_port_id;
112 	/* Flag indicating per rxq event buffer */
113 	bool use_queue_event_buf;
114 	/* Per ethernet device structure */
115 	struct eth_device_info *eth_devices;
116 	/* Lock to serialize config updates with service function */
117 	rte_spinlock_t rx_lock;
118 	/* Max mbufs processed in any service function invocation */
119 	uint32_t max_nb_rx;
120 	/* Receive queues that need to be polled */
121 	struct eth_rx_poll_entry *eth_rx_poll;
122 	/* Size of the eth_rx_poll array */
123 	uint16_t num_rx_polled;
124 	/* Weighted round robin schedule */
125 	uint32_t *wrr_sched;
126 	/* wrr_sched[] size */
127 	uint32_t wrr_len;
128 	/* Next entry in wrr[] to begin polling */
129 	uint32_t wrr_pos;
130 	/* Event burst buffer */
131 	struct eth_event_enqueue_buffer event_enqueue_buffer;
132 	/* Vector enable flag */
133 	uint8_t ena_vector;
134 	/* Timestamp of previous vector expiry list traversal */
135 	uint64_t prev_expiry_ts;
136 	/* Minimum ticks to wait before traversing expiry list */
137 	uint64_t vector_tmo_ticks;
138 	/* vector list */
139 	struct eth_rx_vector_data_list vector_list;
140 	/* Per adapter stats */
141 	struct rte_event_eth_rx_adapter_stats stats;
142 	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
143 	uint16_t enq_block_count;
144 	/* Block start ts */
145 	uint64_t rx_enq_block_start_ts;
146 	/* epoll fd used to wait for Rx interrupts */
147 	int epd;
148 	/* Number of interrupt driven Rx queues */
149 	uint32_t num_rx_intr;
150 	/* Used to send <dev id, queue id> of interrupting Rx queues from
151 	 * the interrupt thread to the Rx thread
152 	 */
153 	struct rte_ring *intr_ring;
154 	/* Rx Queue data (dev id, queue id) for the last non-empty
155 	 * queue polled
156 	 */
157 	union queue_data qd;
158 	/* queue_data is valid */
159 	int qd_valid;
160 	/* Interrupt ring lock, synchronizes Rx thread
161 	 * and interrupt thread
162 	 */
163 	rte_spinlock_t intr_ring_lock;
164 	/* Event array passed to rte_epoll_wait() */
165 	struct rte_epoll_event *epoll_events;
166 	/* Count of interrupt vectors in use */
167 	uint32_t num_intr_vec;
168 	/* Thread blocked on Rx interrupts */
169 	pthread_t rx_intr_thread;
170 	/* Configuration callback for rte_service configuration */
171 	rte_event_eth_rx_adapter_conf_cb conf_cb;
172 	/* Configuration callback argument */
173 	void *conf_arg;
174 	/* Set if the default conf callback is being used */
175 	int default_cb_arg;
176 	/* Service initialization state */
177 	uint8_t service_inited;
178 	/* Total count of Rx queues in adapter */
179 	uint32_t nb_queues;
180 	/* Memory allocation name */
181 	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
182 	/* Socket identifier cached from eventdev */
183 	int socket_id;
184 	/* Per adapter EAL service */
185 	uint32_t service_id;
186 	/* Adapter started flag */
187 	uint8_t rxa_started;
188 	/* Adapter ID */
189 	uint8_t id;
190 } __rte_cache_aligned;
191 
192 /* Per eth device */
193 struct eth_device_info {
194 	struct rte_eth_dev *dev;
195 	struct eth_rx_queue_info *rx_queue;
196 	/* Rx callback */
197 	rte_event_eth_rx_adapter_cb_fn cb_fn;
198 	/* Rx callback argument */
199 	void *cb_arg;
200 	/* Set if ethdev->eventdev packet transfer uses a
201 	 * hardware mechanism
202 	 */
203 	uint8_t internal_event_port;
204 	/* Set if the adapter is processing rx queues for
205 	 * this eth device and packet processing has been
206 	 * started; this lets the code know whether the PMD
207 	 * rx_adapter_stop callback needs to be invoked
208 	 */
209 	uint8_t dev_rx_started;
210 	/* Number of queues added for this device */
211 	uint16_t nb_dev_queues;
212 	/* Number of poll based queues
213 	 * If nb_rx_poll > 0, the start callback will
214 	 * be invoked if not already invoked
215 	 */
216 	uint16_t nb_rx_poll;
217 	/* Number of interrupt based queues
218 	 * If nb_rx_intr > 0, the start callback will
219 	 * be invoked if not already invoked.
220 	 */
221 	uint16_t nb_rx_intr;
222 	/* Number of queues that use the shared interrupt */
223 	uint16_t nb_shared_intr;
224 	/* sum(wrr(q)) for all queues within the device
225 	 * useful when deleting all device queues
226 	 */
227 	uint32_t wrr_len;
228 	/* Intr based queue index to start polling from; this is used
229 	 * when the number of shared interrupts is non-zero
230 	 */
231 	uint16_t next_q_idx;
232 	/* Intr based queue indices */
233 	uint16_t *intr_queue;
234 	/* Device generates a per Rx queue interrupt for queue
235 	 * indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
236 	 */
237 	int multi_intr_cap;
238 	/* shared interrupt enabled */
239 	int shared_intr_enabled;
240 };
241 
242 /* Per Rx queue */
243 struct eth_rx_queue_info {
244 	int queue_enabled;	/* True if added */
245 	int intr_enabled;
246 	uint8_t ena_vector;
247 	uint16_t wt;		/* Polling weight */
248 	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
249 	uint64_t event;
250 	struct eth_rx_vector_data vector_data;
251 	struct eth_event_enqueue_buffer *event_buf;
252 	/* use adapter stats struct for queue level stats,
253 	 * as same stats need to be updated for adapter and queue
254 	 */
255 	struct rte_event_eth_rx_adapter_stats *stats;
256 };
257 
258 static struct event_eth_rx_adapter **event_eth_rx_adapter;
259 
260 /* Enable dynamic timestamp field in mbuf */
261 static uint64_t event_eth_rx_timestamp_dynflag;
262 static int event_eth_rx_timestamp_dynfield_offset = -1;
263 
264 static inline rte_mbuf_timestamp_t *
265 rxa_timestamp_dynfield(struct rte_mbuf *mbuf)
266 {
267 	return RTE_MBUF_DYNFIELD(mbuf,
268 		event_eth_rx_timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
269 }
270 
271 static inline int
272 rxa_validate_id(uint8_t id)
273 {
274 	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
275 }
276 
277 static inline struct eth_event_enqueue_buffer *
278 rxa_event_buf_get(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
279 		  uint16_t rx_queue_id,
280 		  struct rte_event_eth_rx_adapter_stats **stats)
281 {
282 	if (rx_adapter->use_queue_event_buf) {
283 		struct eth_device_info *dev_info =
284 			&rx_adapter->eth_devices[eth_dev_id];
285 		*stats = dev_info->rx_queue[rx_queue_id].stats;
286 		return dev_info->rx_queue[rx_queue_id].event_buf;
287 	} else {
288 		*stats = &rx_adapter->stats;
289 		return &rx_adapter->event_enqueue_buffer;
290 	}
291 }
292 
293 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
294 	if (!rxa_validate_id(id)) { \
295 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
296 		return retval; \
297 	} \
298 } while (0)
299 
300 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(id, retval) do { \
301 	if (!rxa_validate_id(id)) { \
302 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
303 		ret = retval; \
304 		goto error; \
305 	} \
306 } while (0)
307 
308 #define RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, retval) do { \
309 	if ((token) == NULL || strlen(token) == 0 || !isdigit(*token)) { \
310 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter token\n"); \
311 		ret = retval; \
312 		goto error; \
313 	} \
314 } while (0)
315 
316 #define RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(port_id, retval) do { \
317 	if (!rte_eth_dev_is_valid_port(port_id)) { \
318 		RTE_ETHDEV_LOG(ERR, "Invalid port_id=%u\n", port_id); \
319 		ret = retval; \
320 		goto error; \
321 	} \
322 } while (0)
323 
324 static inline int
325 rxa_sw_adapter_queue_count(struct event_eth_rx_adapter *rx_adapter)
326 {
327 	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
328 }
329 
330 /* Greatest common divisor */
331 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
332 {
333 	uint16_t r = a % b;
334 
335 	return r ? rxa_gcd_u16(b, r) : b;
336 }
337 
338 /* Returns the next queue in the polling sequence
339  *
340  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
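 *
 * For example, with two polled queues of weights {2, 1} (gcd = 1,
 * max_wt = 2), successive calls return queue 0, queue 0, queue 1,
 * queue 0, queue 0, queue 1, ... so each queue is polled in proportion
 * to its weight.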
341  */
342 static int
343 rxa_wrr_next(struct event_eth_rx_adapter *rx_adapter, unsigned int n, int *cw,
344 	     struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
345 	     uint16_t gcd, int prev)
346 {
347 	int i = prev;
348 	uint16_t w;
349 
350 	while (1) {
351 		uint16_t q;
352 		uint16_t d;
353 
354 		i = (i + 1) % n;
355 		if (i == 0) {
356 			*cw = *cw - gcd;
357 			if (*cw <= 0)
358 				*cw = max_wt;
359 		}
360 
361 		q = eth_rx_poll[i].eth_rx_qid;
362 		d = eth_rx_poll[i].eth_dev_id;
363 		w = rx_adapter->eth_devices[d].rx_queue[q].wt;
364 
365 		if ((int)w >= *cw)
366 			return i;
367 	}
368 }
369 
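/* Return true if the Rx queue must use the device's shared interrupt,
 * i.e. the PMD cannot provide a separate interrupt vector per queue or
 * the queue index is RTE_MAX_RXTX_INTR_VEC_ID - 1 or higher.
 */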
370 static inline int
371 rxa_shared_intr(struct eth_device_info *dev_info,
372 	int rx_queue_id)
373 {
374 	int multi_intr_cap;
375 
376 	if (dev_info->dev->intr_handle == NULL)
377 		return 0;
378 
379 	multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
380 	return !multi_intr_cap ||
381 		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
382 }
383 
384 static inline int
385 rxa_intr_queue(struct eth_device_info *dev_info,
386 	int rx_queue_id)
387 {
388 	struct eth_rx_queue_info *queue_info;
389 
390 	queue_info = &dev_info->rx_queue[rx_queue_id];
391 	return dev_info->rx_queue &&
392 		!dev_info->internal_event_port &&
393 		queue_info->queue_enabled && queue_info->wt == 0;
394 }
395 
396 static inline int
397 rxa_polled_queue(struct eth_device_info *dev_info,
398 	int rx_queue_id)
399 {
400 	struct eth_rx_queue_info *queue_info;
401 
402 	queue_info = &dev_info->rx_queue[rx_queue_id];
403 	return !dev_info->internal_event_port &&
404 		dev_info->rx_queue &&
405 		queue_info->queue_enabled && queue_info->wt != 0;
406 }
407 
408 /* Calculate change in number of interrupt vectors after Rx queue ID is added/deleted */
409 static int
410 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
411 {
412 	uint16_t i;
413 	int n, s;
414 	uint16_t nbq;
415 
416 	nbq = dev_info->dev->data->nb_rx_queues;
417 	n = 0; /* non shared count */
418 	s = 0; /* shared count */
419 
420 	if (rx_queue_id == -1) {
421 		for (i = 0; i < nbq; i++) {
422 			if (!rxa_shared_intr(dev_info, i))
423 				n += add ? !rxa_intr_queue(dev_info, i) :
424 					rxa_intr_queue(dev_info, i);
425 			else
426 				s += add ? !rxa_intr_queue(dev_info, i) :
427 					rxa_intr_queue(dev_info, i);
428 		}
429 
430 		if (s > 0) {
431 			if ((add && dev_info->nb_shared_intr == 0) ||
432 				(!add && dev_info->nb_shared_intr))
433 				n += 1;
434 		}
435 	} else {
436 		if (!rxa_shared_intr(dev_info, rx_queue_id))
437 			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
438 				rxa_intr_queue(dev_info, rx_queue_id);
439 		else
440 			n = add ? !dev_info->nb_shared_intr :
441 				dev_info->nb_shared_intr == 1;
442 	}
443 
444 	return add ? n : -n;
445 }
446 
447 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
448  */
449 static void
450 rxa_calc_nb_post_intr_del(struct event_eth_rx_adapter *rx_adapter,
451 			  struct eth_device_info *dev_info, int rx_queue_id,
452 			  uint32_t *nb_rx_intr)
453 {
454 	uint32_t intr_diff;
455 
456 	if (rx_queue_id == -1)
457 		intr_diff = dev_info->nb_rx_intr;
458 	else
459 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
460 
461 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
462 }
463 
464 /* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
465  * interrupt queues could currently be poll mode Rx queues
466  */
467 static void
468 rxa_calc_nb_post_add_intr(struct event_eth_rx_adapter *rx_adapter,
469 			  struct eth_device_info *dev_info, int rx_queue_id,
470 			  uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
471 			  uint32_t *nb_wrr)
472 {
473 	uint32_t intr_diff;
474 	uint32_t poll_diff;
475 	uint32_t wrr_len_diff;
476 
477 	if (rx_queue_id == -1) {
478 		intr_diff = dev_info->dev->data->nb_rx_queues -
479 						dev_info->nb_rx_intr;
480 		poll_diff = dev_info->nb_rx_poll;
481 		wrr_len_diff = dev_info->wrr_len;
482 	} else {
483 		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
484 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
485 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
486 					0;
487 	}
488 
489 	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
490 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
491 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
492 }
493 
494 /* Calculate size of the eth_rx_poll and wrr_sched arrays
495  * after deleting poll mode rx queues
496  */
497 static void
498 rxa_calc_nb_post_poll_del(struct event_eth_rx_adapter *rx_adapter,
499 			  struct eth_device_info *dev_info, int rx_queue_id,
500 			  uint32_t *nb_rx_poll, uint32_t *nb_wrr)
501 {
502 	uint32_t poll_diff;
503 	uint32_t wrr_len_diff;
504 
505 	if (rx_queue_id == -1) {
506 		poll_diff = dev_info->nb_rx_poll;
507 		wrr_len_diff = dev_info->wrr_len;
508 	} else {
509 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
510 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
511 					0;
512 	}
513 
514 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
515 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
516 }
517 
518 /* Calculate nb_rx_* after adding poll mode rx queues
519  */
520 static void
521 rxa_calc_nb_post_add_poll(struct event_eth_rx_adapter *rx_adapter,
522 			  struct eth_device_info *dev_info, int rx_queue_id,
523 			  uint16_t wt, uint32_t *nb_rx_poll,
524 			  uint32_t *nb_rx_intr, uint32_t *nb_wrr)
525 {
526 	uint32_t intr_diff;
527 	uint32_t poll_diff;
528 	uint32_t wrr_len_diff;
529 
530 	if (rx_queue_id == -1) {
531 		intr_diff = dev_info->nb_rx_intr;
532 		poll_diff = dev_info->dev->data->nb_rx_queues -
533 						dev_info->nb_rx_poll;
534 		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
535 				- dev_info->wrr_len;
536 	} else {
537 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
538 		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
539 		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
540 				wt - dev_info->rx_queue[rx_queue_id].wt :
541 				wt;
542 	}
543 
544 	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
545 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
546 	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
547 }
548 
549 /* Calculate nb_rx_* after adding rx_queue_id */
550 static void
551 rxa_calc_nb_post_add(struct event_eth_rx_adapter *rx_adapter,
552 		     struct eth_device_info *dev_info, int rx_queue_id,
553 		     uint16_t wt, uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
554 		     uint32_t *nb_wrr)
555 {
556 	if (wt != 0)
557 		rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
558 					wt, nb_rx_poll, nb_rx_intr, nb_wrr);
559 	else
560 		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
561 					nb_rx_poll, nb_rx_intr, nb_wrr);
562 }
563 
564 /* Calculate nb_rx_* after deleting rx_queue_id */
565 static void
566 rxa_calc_nb_post_del(struct event_eth_rx_adapter *rx_adapter,
567 		     struct eth_device_info *dev_info, int rx_queue_id,
568 		     uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
569 		     uint32_t *nb_wrr)
570 {
571 	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
572 				nb_wrr);
573 	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
574 				nb_rx_intr);
575 }
576 
577 /*
578  * Allocate the rx_poll array
579  */
580 static struct eth_rx_poll_entry *
581 rxa_alloc_poll(struct event_eth_rx_adapter *rx_adapter, uint32_t num_rx_polled)
582 {
583 	size_t len;
584 
585 	len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
586 							RTE_CACHE_LINE_SIZE);
587 	return  rte_zmalloc_socket(rx_adapter->mem_name,
588 				len,
589 				RTE_CACHE_LINE_SIZE,
590 				rx_adapter->socket_id);
591 }
592 
593 /*
594  * Allocate the WRR array
595  */
596 static uint32_t *
597 rxa_alloc_wrr(struct event_eth_rx_adapter *rx_adapter, int nb_wrr)
598 {
599 	size_t len;
600 
601 	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
602 			RTE_CACHE_LINE_SIZE);
603 	return  rte_zmalloc_socket(rx_adapter->mem_name,
604 				len,
605 				RTE_CACHE_LINE_SIZE,
606 				rx_adapter->socket_id);
607 }
608 
609 static int
610 rxa_alloc_poll_arrays(struct event_eth_rx_adapter *rx_adapter, uint32_t nb_poll,
611 		      uint32_t nb_wrr, struct eth_rx_poll_entry **rx_poll,
612 		      uint32_t **wrr_sched)
613 {
614 
615 	if (nb_poll == 0) {
616 		*rx_poll = NULL;
617 		*wrr_sched = NULL;
618 		return 0;
619 	}
620 
621 	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
622 	if (*rx_poll == NULL) {
623 		*wrr_sched = NULL;
624 		return -ENOMEM;
625 	}
626 
627 	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
628 	if (*wrr_sched == NULL) {
629 		rte_free(*rx_poll);
630 		return -ENOMEM;
631 	}
632 	return 0;
633 }
634 
635 /* Precalculate WRR polling sequence for all queues in rx_adapter */
636 static void
637 rxa_calc_wrr_sequence(struct event_eth_rx_adapter *rx_adapter,
638 		      struct eth_rx_poll_entry *rx_poll, uint32_t *rx_wrr)
639 {
640 	uint16_t d;
641 	uint16_t q;
642 	unsigned int i;
643 	int prev = -1;
644 	int cw = -1;
645 
646 	/* Initialize variables for calculation of wrr schedule */
647 	uint16_t max_wrr_pos = 0;
648 	unsigned int poll_q = 0;
649 	uint16_t max_wt = 0;
650 	uint16_t gcd = 0;
651 
652 	if (rx_poll == NULL)
653 		return;
654 
655 	/* Generate an array of all queues to poll; the size of this
656 	 * array is poll_q
657 	 */
658 	RTE_ETH_FOREACH_DEV(d) {
659 		uint16_t nb_rx_queues;
660 		struct eth_device_info *dev_info =
661 				&rx_adapter->eth_devices[d];
662 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
663 		if (dev_info->rx_queue == NULL)
664 			continue;
665 		if (dev_info->internal_event_port)
666 			continue;
667 		dev_info->wrr_len = 0;
668 		for (q = 0; q < nb_rx_queues; q++) {
669 			struct eth_rx_queue_info *queue_info =
670 				&dev_info->rx_queue[q];
671 			uint16_t wt;
672 
673 			if (!rxa_polled_queue(dev_info, q))
674 				continue;
675 			wt = queue_info->wt;
676 			rx_poll[poll_q].eth_dev_id = d;
677 			rx_poll[poll_q].eth_rx_qid = q;
678 			max_wrr_pos += wt;
679 			dev_info->wrr_len += wt;
680 			max_wt = RTE_MAX(max_wt, wt);
681 			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
682 			poll_q++;
683 		}
684 	}
685 
686 	/* Generate polling sequence based on weights */
687 	prev = -1;
688 	cw = -1;
689 	for (i = 0; i < max_wrr_pos; i++) {
690 		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
691 				     rx_poll, max_wt, gcd, prev);
692 		prev = rx_wrr[i];
693 	}
694 }
695 
696 static inline void
697 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
698 	struct rte_ipv6_hdr **ipv6_hdr)
699 {
700 	struct rte_ether_hdr *eth_hdr =
701 		rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
702 	struct rte_vlan_hdr *vlan_hdr;
703 
704 	*ipv4_hdr = NULL;
705 	*ipv6_hdr = NULL;
706 
707 	switch (eth_hdr->ether_type) {
708 	case RTE_BE16(RTE_ETHER_TYPE_IPV4):
709 		*ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
710 		break;
711 
712 	case RTE_BE16(RTE_ETHER_TYPE_IPV6):
713 		*ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
714 		break;
715 
716 	case RTE_BE16(RTE_ETHER_TYPE_VLAN):
717 		vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
718 		switch (vlan_hdr->eth_proto) {
719 		case RTE_BE16(RTE_ETHER_TYPE_IPV4):
720 			*ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
721 			break;
722 		case RTE_BE16(RTE_ETHER_TYPE_IPV6):
723 			*ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
724 			break;
725 		default:
726 			break;
727 		}
728 		break;
729 
730 	default:
731 		break;
732 	}
733 }
734 
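/* Software RSS is only needed when the mbuf carries no NIC computed hash
 * (RTE_MBUF_F_RX_RSS_HASH not set) and the application has not supplied a
 * flow id for the queue; see the do_rss computation in rxa_buffer_mbufs().
 */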
735 /* Calculate RSS hash for IPv4/6 */
736 static inline uint32_t
737 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
738 {
739 	uint32_t input_len;
740 	void *tuple;
741 	struct rte_ipv4_tuple ipv4_tuple;
742 	struct rte_ipv6_tuple ipv6_tuple;
743 	struct rte_ipv4_hdr *ipv4_hdr;
744 	struct rte_ipv6_hdr *ipv6_hdr;
745 
746 	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
747 
748 	if (ipv4_hdr) {
749 		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
750 		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
751 		tuple = &ipv4_tuple;
752 		input_len = RTE_THASH_V4_L3_LEN;
753 	} else if (ipv6_hdr) {
754 		rte_thash_load_v6_addrs(ipv6_hdr,
755 					(union rte_thash_tuple *)&ipv6_tuple);
756 		tuple = &ipv6_tuple;
757 		input_len = RTE_THASH_V6_L3_LEN;
758 	} else
759 		return 0;
760 
761 	return rte_softrss_be(tuple, input_len, rss_key_be);
762 }
763 
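/*
 * Helpers to track the time the adapter spends blocked on event device
 * enqueue: a block period starts once BLOCK_CNT_THRESHOLD consecutive
 * flushes enqueue nothing, and ends on the next successful enqueue; the
 * elapsed cycles are accumulated in stats->rx_enq_block_cycles.
 */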
764 static inline int
765 rxa_enq_blocked(struct event_eth_rx_adapter *rx_adapter)
766 {
767 	return !!rx_adapter->enq_block_count;
768 }
769 
770 static inline void
771 rxa_enq_block_start_ts(struct event_eth_rx_adapter *rx_adapter)
772 {
773 	if (rx_adapter->rx_enq_block_start_ts)
774 		return;
775 
776 	rx_adapter->enq_block_count++;
777 	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
778 		return;
779 
780 	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
781 }
782 
783 static inline void
784 rxa_enq_block_end_ts(struct event_eth_rx_adapter *rx_adapter,
785 		     struct rte_event_eth_rx_adapter_stats *stats)
786 {
787 	if (unlikely(!stats->rx_enq_start_ts))
788 		stats->rx_enq_start_ts = rte_get_tsc_cycles();
789 
790 	if (likely(!rxa_enq_blocked(rx_adapter)))
791 		return;
792 
793 	rx_adapter->enq_block_count = 0;
794 	if (rx_adapter->rx_enq_block_start_ts) {
795 		stats->rx_enq_end_ts = rte_get_tsc_cycles();
796 		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
797 		    rx_adapter->rx_enq_block_start_ts;
798 		rx_adapter->rx_enq_block_start_ts = 0;
799 	}
800 }
801 
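/*
 * If the circular buffer has wrapped (buf->last != 0), the buffered events
 * are enqueued in two bursts: first the range [head, last) and then, if
 * that completes in full, the range [0, tail).
 */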
802 /* Enqueue buffered events to event device */
803 static inline uint16_t
804 rxa_flush_event_buffer(struct event_eth_rx_adapter *rx_adapter,
805 		       struct eth_event_enqueue_buffer *buf,
806 		       struct rte_event_eth_rx_adapter_stats *stats)
807 {
808 	uint16_t count = buf->count;
809 	uint16_t n = 0;
810 
811 	if (!count)
812 		return 0;
813 
814 	if (buf->last)
815 		count = buf->last - buf->head;
816 
817 	if (count) {
818 		n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
819 						rx_adapter->event_port_id,
820 						&buf->events[buf->head],
821 						count);
822 		if (n != count)
823 			stats->rx_enq_retry++;
824 
825 		buf->head += n;
826 	}
827 
828 	if (buf->last && n == count) {
829 		uint16_t n1;
830 
831 		n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
832 					rx_adapter->event_port_id,
833 					&buf->events[0],
834 					buf->tail);
835 
836 		if (n1 != buf->tail)
837 			stats->rx_enq_retry++;
838 
839 		buf->last = 0;
840 		buf->head = n1;
841 		buf->last_mask = 0;
842 		n += n1;
843 	}
844 
845 	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
846 		rxa_enq_block_start_ts(rx_adapter);
847 
848 	buf->count -= n;
849 	stats->rx_enq_count += n;
850 
851 	return n;
852 }
853 
854 static inline void
855 rxa_init_vector(struct event_eth_rx_adapter *rx_adapter,
856 		struct eth_rx_vector_data *vec)
857 {
858 	vec->vector_ev->nb_elem = 0;
859 	vec->vector_ev->port = vec->port;
860 	vec->vector_ev->queue = vec->queue;
861 	vec->vector_ev->attr_valid = true;
862 	vec->vector_ev->elem_offset = 0;
863 	TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
864 }
865 
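/* Aggregate received mbufs into the queue's current rte_event_vector,
 * taking a new vector from the mempool each time the current one reaches
 * max_vector_count and emitting the completed vector as an event into buf.
 * Partially filled vectors stay on the adapter's vector_list until the
 * expiry logic in rxa_service_func() flushes them.
 */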
866 static inline uint16_t
867 rxa_create_event_vector(struct event_eth_rx_adapter *rx_adapter,
868 			struct eth_rx_queue_info *queue_info,
869 			struct eth_event_enqueue_buffer *buf,
870 			struct rte_mbuf **mbufs, uint16_t num)
871 {
872 	struct rte_event *ev = &buf->events[buf->count];
873 	struct eth_rx_vector_data *vec;
874 	uint16_t filled, space, sz;
875 
876 	filled = 0;
877 	vec = &queue_info->vector_data;
878 
879 	if (vec->vector_ev == NULL) {
880 		if (rte_mempool_get(vec->vector_pool,
881 				    (void **)&vec->vector_ev) < 0) {
882 			rte_pktmbuf_free_bulk(mbufs, num);
883 			return 0;
884 		}
885 		rxa_init_vector(rx_adapter, vec);
886 	}
887 	while (num) {
888 		if (vec->vector_ev->nb_elem == vec->max_vector_count) {
889 			/* Event ready. */
890 			ev->event = vec->event;
891 			ev->vec = vec->vector_ev;
892 			ev++;
893 			filled++;
894 			vec->vector_ev = NULL;
895 			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
896 			if (rte_mempool_get(vec->vector_pool,
897 					    (void **)&vec->vector_ev) < 0) {
898 				rte_pktmbuf_free_bulk(mbufs, num);
899 				return 0;
900 			}
901 			rxa_init_vector(rx_adapter, vec);
902 		}
903 
904 		space = vec->max_vector_count - vec->vector_ev->nb_elem;
905 		sz = num > space ? space : num;
906 		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
907 		       sizeof(void *) * sz);
908 		vec->vector_ev->nb_elem += sz;
909 		num -= sz;
910 		mbufs += sz;
911 		vec->ts = rte_rdtsc();
912 	}
913 
914 	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
915 		ev->event = vec->event;
916 		ev->vec = vec->vector_ev;
917 		ev++;
918 		filled++;
919 		vec->vector_ev = NULL;
920 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
921 	}
922 
923 	return filled;
924 }
925 
926 static inline void
927 rxa_buffer_mbufs(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
928 		 uint16_t rx_queue_id, struct rte_mbuf **mbufs, uint16_t num,
929 		 struct eth_event_enqueue_buffer *buf,
930 		 struct rte_event_eth_rx_adapter_stats *stats)
931 {
932 	uint32_t i;
933 	struct eth_device_info *dev_info =
934 					&rx_adapter->eth_devices[eth_dev_id];
935 	struct eth_rx_queue_info *eth_rx_queue_info =
936 					&dev_info->rx_queue[rx_queue_id];
937 	uint16_t new_tail = buf->tail;
938 	uint64_t event = eth_rx_queue_info->event;
939 	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
940 	struct rte_mbuf *m = mbufs[0];
941 	uint32_t rss_mask;
942 	uint32_t rss;
943 	int do_rss;
944 	uint16_t nb_cb;
945 	uint16_t dropped;
946 	uint64_t ts, ts_mask;
947 
948 	if (!eth_rx_queue_info->ena_vector) {
949 		ts = m->ol_flags & event_eth_rx_timestamp_dynflag ?
950 						0 : rte_get_tsc_cycles();
951 
952 		/* 0xffff ffff ffff ffff if RTE_MBUF_F_RX_TIMESTAMP is set,
953 		 * otherwise 0
954 		 */
955 		ts_mask = (uint64_t)(!(m->ol_flags &
956 				       event_eth_rx_timestamp_dynflag)) - 1ULL;
957 
958 		/* 0xffff ffff if RTE_MBUF_F_RX_RSS_HASH is set, otherwise 0 */
959 		rss_mask = ~(((m->ol_flags & RTE_MBUF_F_RX_RSS_HASH) != 0) - 1);
960 		do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
961 		for (i = 0; i < num; i++) {
962 			struct rte_event *ev;
963 
964 			m = mbufs[i];
965 			*rxa_timestamp_dynfield(m) = ts |
966 					(*rxa_timestamp_dynfield(m) & ts_mask);
967 
968 			ev = &buf->events[new_tail];
969 
970 			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
971 				     : m->hash.rss;
972 			ev->event = event;
973 			ev->flow_id = (rss & ~flow_id_mask) |
974 				      (ev->flow_id & flow_id_mask);
975 			ev->mbuf = m;
976 			new_tail++;
977 		}
978 	} else {
979 		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
980 					      buf, mbufs, num);
981 	}
982 
983 	if (num && dev_info->cb_fn) {
984 
985 		dropped = 0;
986 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
987 				       buf->last |
988 				       (buf->events_size & ~buf->last_mask),
989 				       buf->count >= BATCH_SIZE ?
990 						buf->count - BATCH_SIZE : 0,
991 				       &buf->events[buf->tail],
992 				       num,
993 				       dev_info->cb_arg,
994 				       &dropped);
995 		if (unlikely(nb_cb > num))
996 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
997 				nb_cb, num);
998 		else
999 			num = nb_cb;
1000 		if (dropped)
1001 			stats->rx_dropped += dropped;
1002 	}
1003 
1004 	buf->count += num;
1005 	buf->tail += num;
1006 }
1007 
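/* Return true if the enqueue buffer has room for another burst of
 * BATCH_SIZE events, wrapping 'tail' back to the start of the buffer
 * (and recording the wrap point in buf->last) when the linear space at
 * the end has been exhausted.
 */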
1008 static inline bool
1009 rxa_pkt_buf_available(struct eth_event_enqueue_buffer *buf)
1010 {
1011 	uint32_t nb_req = buf->tail + BATCH_SIZE;
1012 
1013 	if (!buf->last) {
1014 		if (nb_req <= buf->events_size)
1015 			return true;
1016 
1017 		if (buf->head >= BATCH_SIZE) {
1018 			buf->last_mask = ~0;
1019 			buf->last = buf->tail;
1020 			buf->tail = 0;
1021 			return true;
1022 		}
1023 	}
1024 
1025 	return nb_req <= buf->head;
1026 }
1027 
1028 /* Enqueue packets from <port, q> to the event buffer */
1029 static inline uint32_t
1030 rxa_eth_rx(struct event_eth_rx_adapter *rx_adapter, uint16_t port_id,
1031 	   uint16_t queue_id, uint32_t rx_count, uint32_t max_rx,
1032 	   int *rxq_empty, struct eth_event_enqueue_buffer *buf,
1033 	   struct rte_event_eth_rx_adapter_stats *stats)
1034 {
1035 	struct rte_mbuf *mbufs[BATCH_SIZE];
1036 	uint16_t n;
1037 	uint32_t nb_rx = 0;
1038 	uint32_t nb_flushed = 0;
1039 
1040 	if (rxq_empty)
1041 		*rxq_empty = 0;
1042 	/* Don't do a batch dequeue from the rx queue if there isn't
1043 	 * enough space in the enqueue buffer.
1044 	 */
1045 	while (rxa_pkt_buf_available(buf)) {
1046 		if (buf->count >= BATCH_SIZE)
1047 			nb_flushed +=
1048 				rxa_flush_event_buffer(rx_adapter, buf, stats);
1049 
1050 		stats->rx_poll_count++;
1051 		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
1052 		if (unlikely(!n)) {
1053 			if (rxq_empty)
1054 				*rxq_empty = 1;
1055 			break;
1056 		}
1057 		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n, buf,
1058 				 stats);
1059 		nb_rx += n;
1060 		if (rx_count + nb_rx > max_rx)
1061 			break;
1062 	}
1063 
1064 	if (buf->count > 0)
1065 		nb_flushed += rxa_flush_event_buffer(rx_adapter, buf, stats);
1066 
1067 	stats->rx_packets += nb_rx;
1068 	if (nb_flushed == 0)
1069 		rte_event_maintain(rx_adapter->eventdev_id,
1070 				   rx_adapter->event_port_id, 0);
1071 
1072 	return nb_rx;
1073 }
1074 
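/* Called from the interrupt thread: recover the <port, queue> pair from the
 * epoll user data, disable further interrupts for the queue (or the device's
 * shared interrupt) and post the pair on intr_ring for rxa_intr_ring_dequeue().
 */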
1075 static inline void
1076 rxa_intr_ring_enqueue(struct event_eth_rx_adapter *rx_adapter, void *data)
1077 {
1078 	uint16_t port_id;
1079 	uint16_t queue;
1080 	int err;
1081 	union queue_data qd;
1082 	struct eth_device_info *dev_info;
1083 	struct eth_rx_queue_info *queue_info;
1084 	int *intr_enabled;
1085 
1086 	qd.ptr = data;
1087 	port_id = qd.port;
1088 	queue = qd.queue;
1089 
1090 	dev_info = &rx_adapter->eth_devices[port_id];
1091 	queue_info = &dev_info->rx_queue[queue];
1092 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1093 	if (rxa_shared_intr(dev_info, queue))
1094 		intr_enabled = &dev_info->shared_intr_enabled;
1095 	else
1096 		intr_enabled = &queue_info->intr_enabled;
1097 
1098 	if (*intr_enabled) {
1099 		*intr_enabled = 0;
1100 		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
1101 		/* Entry should always be available.
1102 		 * The ring size equals the maximum number of interrupt
1103 		 * vectors supported (an interrupt vector is shared in
1104 		 * case of shared interrupts)
1105 		 */
1106 		if (err)
1107 			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
1108 				" to ring: %s", strerror(-err));
1109 		else
1110 			rte_eth_dev_rx_intr_disable(port_id, queue);
1111 	}
1112 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1113 }
1114 
1115 static int
1116 rxa_intr_ring_check_avail(struct event_eth_rx_adapter *rx_adapter,
1117 			  uint32_t num_intr_vec)
1118 {
1119 	if (rx_adapter->num_intr_vec + num_intr_vec >
1120 				RTE_EVENT_ETH_INTR_RING_SIZE) {
1121 		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1122 		" %d needed %d limit %d", rx_adapter->num_intr_vec,
1123 		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1124 		return -ENOSPC;
1125 	}
1126 
1127 	return 0;
1128 }
1129 
1130 /* Delete entries for (dev, queue) from the interrupt ring */
1131 static void
1132 rxa_intr_ring_del_entries(struct event_eth_rx_adapter *rx_adapter,
1133 			  struct eth_device_info *dev_info,
1134 			  uint16_t rx_queue_id)
1135 {
1136 	int i, n;
1137 	union queue_data qd;
1138 
1139 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1140 
1141 	n = rte_ring_count(rx_adapter->intr_ring);
1142 	for (i = 0; i < n; i++) {
1143 		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1144 		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1145 			if (qd.port == dev_info->dev->data->port_id &&
1146 				qd.queue == rx_queue_id)
1147 				continue;
1148 		} else {
1149 			if (qd.port == dev_info->dev->data->port_id)
1150 				continue;
1151 		}
1152 		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1153 	}
1154 
1155 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1156 }
1157 
1158 /* Interrupt thread function handling interrupt mode receive queues.
1159  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1160  * interrupting queue to the adapter's ring buffer for interrupt events.
1161  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1162  * the adapter service function.
1163  */
1164 static void *
1165 rxa_intr_thread(void *arg)
1166 {
1167 	struct event_eth_rx_adapter *rx_adapter = arg;
1168 	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1169 	int n, i;
1170 
1171 	while (1) {
1172 		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1173 				RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1174 		if (unlikely(n < 0))
1175 			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1176 					n);
1177 		for (i = 0; i < n; i++) {
1178 			rxa_intr_ring_enqueue(rx_adapter,
1179 					epoll_events[i].epdata.data);
1180 		}
1181 	}
1182 
1183 	return NULL;
1184 }
1185 
1186 /* Dequeue <port, q> from interrupt ring and enqueue received
1187  * mbufs to eventdev
1188  */
1189 static inline bool
1190 rxa_intr_ring_dequeue(struct event_eth_rx_adapter *rx_adapter)
1191 {
1192 	uint32_t n;
1193 	uint32_t nb_rx = 0;
1194 	int rxq_empty;
1195 	struct eth_event_enqueue_buffer *buf;
1196 	struct rte_event_eth_rx_adapter_stats *stats;
1197 	rte_spinlock_t *ring_lock;
1198 	uint8_t max_done = 0;
1199 	bool work = false;
1200 
1201 	if (rx_adapter->num_rx_intr == 0)
1202 		return work;
1203 
1204 	if (rte_ring_count(rx_adapter->intr_ring) == 0
1205 		&& !rx_adapter->qd_valid)
1206 		return work;
1207 
1208 	buf = &rx_adapter->event_enqueue_buffer;
1209 	stats = &rx_adapter->stats;
1210 	ring_lock = &rx_adapter->intr_ring_lock;
1211 
1212 	if (buf->count >= BATCH_SIZE) {
1213 		uint16_t n;
1214 
1215 		n = rxa_flush_event_buffer(rx_adapter, buf, stats);
1216 
1217 		if (likely(n > 0))
1218 			work = true;
1219 	}
1220 
1221 	while (rxa_pkt_buf_available(buf)) {
1222 		struct eth_device_info *dev_info;
1223 		uint16_t port;
1224 		uint16_t queue;
1225 		union queue_data qd  = rx_adapter->qd;
1226 		int err;
1227 
1228 		if (!rx_adapter->qd_valid) {
1229 			struct eth_rx_queue_info *queue_info;
1230 
1231 			rte_spinlock_lock(ring_lock);
1232 			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1233 			if (err) {
1234 				rte_spinlock_unlock(ring_lock);
1235 				break;
1236 			}
1237 
1238 			port = qd.port;
1239 			queue = qd.queue;
1240 			rx_adapter->qd = qd;
1241 			rx_adapter->qd_valid = 1;
1242 			dev_info = &rx_adapter->eth_devices[port];
1243 			if (rxa_shared_intr(dev_info, queue))
1244 				dev_info->shared_intr_enabled = 1;
1245 			else {
1246 				queue_info = &dev_info->rx_queue[queue];
1247 				queue_info->intr_enabled = 1;
1248 			}
1249 			rte_eth_dev_rx_intr_enable(port, queue);
1250 			rte_spinlock_unlock(ring_lock);
1251 		} else {
1252 			port = qd.port;
1253 			queue = qd.queue;
1254 
1255 			dev_info = &rx_adapter->eth_devices[port];
1256 		}
1257 
1258 		if (rxa_shared_intr(dev_info, queue)) {
1259 			uint16_t i;
1260 			uint16_t nb_queues;
1261 
1262 			nb_queues = dev_info->dev->data->nb_rx_queues;
1263 			n = 0;
1264 			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1265 				uint8_t enq_buffer_full;
1266 
1267 				if (!rxa_intr_queue(dev_info, i))
1268 					continue;
1269 				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1270 					rx_adapter->max_nb_rx,
1271 					&rxq_empty, buf, stats);
1272 				nb_rx += n;
1273 
1274 				enq_buffer_full = !rxq_empty && n == 0;
1275 				max_done = nb_rx > rx_adapter->max_nb_rx;
1276 
1277 				if (enq_buffer_full || max_done) {
1278 					dev_info->next_q_idx = i;
1279 					goto done;
1280 				}
1281 			}
1282 
1283 			rx_adapter->qd_valid = 0;
1284 
1285 			/* Reinitialize for next interrupt */
1286 			dev_info->next_q_idx = dev_info->multi_intr_cap ?
1287 						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1288 						0;
1289 		} else {
1290 			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1291 				rx_adapter->max_nb_rx,
1292 				&rxq_empty, buf, stats);
1293 			rx_adapter->qd_valid = !rxq_empty;
1294 			nb_rx += n;
1295 			if (nb_rx > rx_adapter->max_nb_rx)
1296 				break;
1297 		}
1298 	}
1299 
1300 done:
1301 	if (nb_rx > 0) {
1302 		rx_adapter->stats.rx_intr_packets += nb_rx;
1303 		work = true;
1304 	}
1305 
1306 	return work;
1307 }
1308 
1309 /*
1310  * Polls receive queues added to the event adapter and enqueues received
1311  * packets to the event device.
1312  *
1313  * The receive code enqueues initially to a temporary buffer; the
1314  * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
1315  *
1316  * If there isn't space available in the temporary buffer, packets from the
1317  * Rx queue aren't dequeued from the eth device; this back pressures the
1318  * eth device. In virtual device environments this back pressure is relayed
1319  * to the hypervisor's switching layer, where adjustments can be made to
1320  * deal with it.
1321  */
1322 static inline bool
1323 rxa_poll(struct event_eth_rx_adapter *rx_adapter)
1324 {
1325 	uint32_t num_queue;
1326 	uint32_t nb_rx = 0;
1327 	struct eth_event_enqueue_buffer *buf = NULL;
1328 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1329 	uint32_t wrr_pos;
1330 	uint32_t max_nb_rx;
1331 	bool work = false;
1332 
1333 	wrr_pos = rx_adapter->wrr_pos;
1334 	max_nb_rx = rx_adapter->max_nb_rx;
1335 
1336 	/* Iterate through a WRR sequence */
1337 	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1338 		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1339 		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1340 		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1341 
1342 		buf = rxa_event_buf_get(rx_adapter, d, qid, &stats);
1343 
1344 		/* Don't do a batch dequeue from the rx queue if there isn't
1345 		 * enough space in the enqueue buffer.
1346 		 */
1347 		if (buf->count >= BATCH_SIZE) {
1348 			uint16_t n;
1349 
1350 			n = rxa_flush_event_buffer(rx_adapter, buf, stats);
1351 
1352 			if (likely(n > 0))
1353 				work = true;
1354 		}
1355 		if (!rxa_pkt_buf_available(buf)) {
1356 			if (rx_adapter->use_queue_event_buf)
1357 				goto poll_next_entry;
1358 			else {
1359 				rx_adapter->wrr_pos = wrr_pos;
1360 				break;
1361 			}
1362 		}
1363 
1364 		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1365 				NULL, buf, stats);
1366 		if (nb_rx > max_nb_rx) {
1367 			rx_adapter->wrr_pos =
1368 				    (wrr_pos + 1) % rx_adapter->wrr_len;
1369 			break;
1370 		}
1371 
1372 poll_next_entry:
1373 		if (++wrr_pos == rx_adapter->wrr_len)
1374 			wrr_pos = 0;
1375 	}
1376 
1377 	if (nb_rx > 0)
1378 		work = true;
1379 
1380 	return work;
1381 }
1382 
1383 static void
1384 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1385 {
1386 	struct event_eth_rx_adapter *rx_adapter = arg;
1387 	struct eth_event_enqueue_buffer *buf = NULL;
1388 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1389 	struct rte_event *ev;
1390 
1391 	buf = rxa_event_buf_get(rx_adapter, vec->port, vec->queue, &stats);
1392 
1393 	if (buf->count)
1394 		rxa_flush_event_buffer(rx_adapter, buf, stats);
1395 
1396 	if (vec->vector_ev->nb_elem == 0)
1397 		return;
1398 	ev = &buf->events[buf->count];
1399 
1400 	/* Event ready. */
1401 	ev->event = vec->event;
1402 	ev->vec = vec->vector_ev;
1403 	buf->count++;
1404 
1405 	vec->vector_ev = NULL;
1406 	vec->ts = 0;
1407 }
1408 
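/* Adapter service function: expires aged partial event vectors, drains the
 * interrupt ring and polls the WRR scheduled Rx queues; returns -EAGAIN when
 * it could not take the lock, the adapter is not started or no work was done.
 */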
1409 static int
1410 rxa_service_func(void *args)
1411 {
1412 	struct event_eth_rx_adapter *rx_adapter = args;
1413 	bool intr_work;
1414 	bool poll_work;
1415 
1416 	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1417 		return -EAGAIN;
1418 	if (!rx_adapter->rxa_started) {
1419 		rte_spinlock_unlock(&rx_adapter->rx_lock);
1420 		return -EAGAIN;
1421 	}
1422 
1423 	if (rx_adapter->ena_vector) {
1424 		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1425 		    rx_adapter->vector_tmo_ticks) {
1426 			struct eth_rx_vector_data *vec;
1427 
1428 			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1429 				uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1430 
1431 				if (elapsed_time >= vec->vector_timeout_ticks) {
1432 					rxa_vector_expire(vec, rx_adapter);
1433 					TAILQ_REMOVE(&rx_adapter->vector_list,
1434 						     vec, next);
1435 				}
1436 			}
1437 			rx_adapter->prev_expiry_ts = rte_rdtsc();
1438 		}
1439 	}
1440 
1441 	intr_work = rxa_intr_ring_dequeue(rx_adapter);
1442 	poll_work = rxa_poll(rx_adapter);
1443 
1444 	rte_spinlock_unlock(&rx_adapter->rx_lock);
1445 
1446 	return intr_work || poll_work ? 0 : -EAGAIN;
1447 }
1448 
1449 static void *
1450 rxa_memzone_array_get(const char *name, unsigned int elt_size, int nb_elems)
1451 {
1452 	const struct rte_memzone *mz;
1453 	unsigned int sz;
1454 
1455 	sz = elt_size * nb_elems;
1456 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1457 
1458 	mz = rte_memzone_lookup(name);
1459 	if (mz == NULL) {
1460 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1461 						 RTE_CACHE_LINE_SIZE);
1462 		if (mz == NULL) {
1463 			RTE_EDEV_LOG_ERR("failed to reserve memzone"
1464 					 " name = %s, err = %"
1465 					 PRId32, name, rte_errno);
1466 			return NULL;
1467 		}
1468 	}
1469 
1470 	return mz->addr;
1471 }
1472 
1473 static int
1474 rte_event_eth_rx_adapter_init(void)
1475 {
1476 	uint8_t i;
1477 
1478 	if (event_eth_rx_adapter == NULL) {
1479 		event_eth_rx_adapter =
1480 			rxa_memzone_array_get(RXA_ADAPTER_ARRAY,
1481 					sizeof(*event_eth_rx_adapter),
1482 					RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE);
1483 		if (event_eth_rx_adapter == NULL)
1484 			return -ENOMEM;
1485 
1486 		for (i = 0; i < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE; i++)
1487 			event_eth_rx_adapter[i] = NULL;
1488 
1489 	}
1490 
1491 	return 0;
1492 }
1493 
1494 static int
1495 rxa_memzone_lookup(void)
1496 {
1497 	const struct rte_memzone *mz;
1498 
1499 	if (event_eth_rx_adapter == NULL) {
1500 		mz = rte_memzone_lookup(RXA_ADAPTER_ARRAY);
1501 		if (mz == NULL)
1502 			return -ENOMEM;
1503 
1504 		event_eth_rx_adapter = mz->addr;
1505 	}
1506 
1507 	return 0;
1508 }
1509 
1510 static inline struct event_eth_rx_adapter *
1511 rxa_id_to_adapter(uint8_t id)
1512 {
1513 	return event_eth_rx_adapter ?
1514 		event_eth_rx_adapter[id] : NULL;
1515 }
1516 
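/* Default adapter configuration callback: stops the event device if it is
 * running, reconfigures it with one additional event port, sets that port up
 * with the supplied rte_event_port_conf and restarts the device; the new
 * port id is returned in conf->event_port_id.
 */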
1517 static int
1518 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1519 		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1520 {
1521 	int ret;
1522 	struct rte_eventdev *dev;
1523 	struct rte_event_dev_config dev_conf;
1524 	int started;
1525 	uint8_t port_id;
1526 	struct rte_event_port_conf *port_conf = arg;
1527 	struct event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1528 
1529 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
1530 	dev_conf = dev->data->dev_conf;
1531 
1532 	started = dev->data->dev_started;
1533 	if (started)
1534 		rte_event_dev_stop(dev_id);
1535 	port_id = dev_conf.nb_event_ports;
1536 	dev_conf.nb_event_ports += 1;
1537 	if (port_conf->event_port_cfg & RTE_EVENT_PORT_CFG_SINGLE_LINK)
1538 		dev_conf.nb_single_link_event_port_queues += 1;
1539 
1540 	ret = rte_event_dev_configure(dev_id, &dev_conf);
1541 	if (ret) {
1542 		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1543 						dev_id);
1544 		if (started) {
1545 			if (rte_event_dev_start(dev_id))
1546 				return -EIO;
1547 		}
1548 		return ret;
1549 	}
1550 
1551 	ret = rte_event_port_setup(dev_id, port_id, port_conf);
1552 	if (ret) {
1553 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1554 					port_id);
1555 		return ret;
1556 	}
1557 
1558 	conf->event_port_id = port_id;
1559 	conf->max_nb_rx = RXA_NB_RX_WORK_DEFAULT;
1560 	if (started)
1561 		ret = rte_event_dev_start(dev_id);
1562 	rx_adapter->default_cb_arg = 1;
1563 	return ret;
1564 }
1565 
1566 static int
1567 rxa_epoll_create1(void)
1568 {
1569 #if defined(LINUX)
1570 	int fd;
1571 	fd = epoll_create1(EPOLL_CLOEXEC);
1572 	return fd < 0 ? -errno : fd;
1573 #elif defined(BSD)
1574 	return -ENOTSUP;
1575 #endif
1576 }
1577 
1578 static int
1579 rxa_init_epd(struct event_eth_rx_adapter *rx_adapter)
1580 {
1581 	if (rx_adapter->epd != INIT_FD)
1582 		return 0;
1583 
1584 	rx_adapter->epd = rxa_epoll_create1();
1585 	if (rx_adapter->epd < 0) {
1586 		int err = rx_adapter->epd;
1587 		rx_adapter->epd = INIT_FD;
1588 		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1589 		return err;
1590 	}
1591 
1592 	return 0;
1593 }
1594 
1595 static int
1596 rxa_create_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1597 {
1598 	int err;
1599 	char thread_name[RTE_MAX_THREAD_NAME_LEN];
1600 
1601 	if (rx_adapter->intr_ring)
1602 		return 0;
1603 
1604 	rx_adapter->intr_ring = rte_ring_create("intr_ring",
1605 					RTE_EVENT_ETH_INTR_RING_SIZE,
1606 					rte_socket_id(), 0);
1607 	if (!rx_adapter->intr_ring)
1608 		return -ENOMEM;
1609 
1610 	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1611 					RTE_EVENT_ETH_INTR_RING_SIZE *
1612 					sizeof(struct rte_epoll_event),
1613 					RTE_CACHE_LINE_SIZE,
1614 					rx_adapter->socket_id);
1615 	if (!rx_adapter->epoll_events) {
1616 		err = -ENOMEM;
1617 		goto error;
1618 	}
1619 
1620 	rte_spinlock_init(&rx_adapter->intr_ring_lock);
1621 
1622 	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1623 			"rx-intr-thread-%d", rx_adapter->id);
1624 
1625 	err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1626 				NULL, rxa_intr_thread, rx_adapter);
1627 	if (!err)
1628 		return 0;
1629 
1630 	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1631 	rte_free(rx_adapter->epoll_events);
1632 error:
1633 	rte_ring_free(rx_adapter->intr_ring);
1634 	rx_adapter->intr_ring = NULL;
1635 	rx_adapter->epoll_events = NULL;
1636 	return err;
1637 }
1638 
1639 static int
1640 rxa_destroy_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1641 {
1642 	int err;
1643 
1644 	err = pthread_cancel(rx_adapter->rx_intr_thread);
1645 	if (err)
1646 		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1647 				err);
1648 
1649 	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1650 	if (err)
1651 		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1652 
1653 	rte_free(rx_adapter->epoll_events);
1654 	rte_ring_free(rx_adapter->intr_ring);
1655 	rx_adapter->intr_ring = NULL;
1656 	rx_adapter->epoll_events = NULL;
1657 	return 0;
1658 }
1659 
1660 static int
1661 rxa_free_intr_resources(struct event_eth_rx_adapter *rx_adapter)
1662 {
1663 	int ret;
1664 
1665 	if (rx_adapter->num_rx_intr == 0)
1666 		return 0;
1667 
1668 	ret = rxa_destroy_intr_thread(rx_adapter);
1669 	if (ret)
1670 		return ret;
1671 
1672 	close(rx_adapter->epd);
1673 	rx_adapter->epd = INIT_FD;
1674 
1675 	return ret;
1676 }
1677 
1678 static int
1679 rxa_disable_intr(struct event_eth_rx_adapter *rx_adapter,
1680 		 struct eth_device_info *dev_info, uint16_t rx_queue_id)
1681 {
1682 	int err;
1683 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1684 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1685 
1686 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1687 	if (err) {
1688 		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1689 			rx_queue_id);
1690 		return err;
1691 	}
1692 
1693 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1694 					rx_adapter->epd,
1695 					RTE_INTR_EVENT_DEL,
1696 					0);
1697 	if (err)
1698 		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1699 
1700 	if (sintr)
1701 		dev_info->shared_intr_enabled = 0;
1702 	else
1703 		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1704 	return err;
1705 }
1706 
1707 static int
1708 rxa_del_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1709 		   struct eth_device_info *dev_info, int rx_queue_id)
1710 {
1711 	int err;
1712 	int i;
1713 	int s;
1714 
1715 	if (dev_info->nb_rx_intr == 0)
1716 		return 0;
1717 
1718 	err = 0;
1719 	if (rx_queue_id == -1) {
1720 		s = dev_info->nb_shared_intr;
1721 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1722 			int sintr;
1723 			uint16_t q;
1724 
1725 			q = dev_info->intr_queue[i];
1726 			sintr = rxa_shared_intr(dev_info, q);
1727 			s -= sintr;
1728 
1729 			if (!sintr || s == 0) {
1730 
1731 				err = rxa_disable_intr(rx_adapter, dev_info,
1732 						q);
1733 				if (err)
1734 					return err;
1735 				rxa_intr_ring_del_entries(rx_adapter, dev_info,
1736 							q);
1737 			}
1738 		}
1739 	} else {
1740 		if (!rxa_intr_queue(dev_info, rx_queue_id))
1741 			return 0;
1742 		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1743 				dev_info->nb_shared_intr == 1) {
1744 			err = rxa_disable_intr(rx_adapter, dev_info,
1745 					rx_queue_id);
1746 			if (err)
1747 				return err;
1748 			rxa_intr_ring_del_entries(rx_adapter, dev_info,
1749 						rx_queue_id);
1750 		}
1751 
1752 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1753 			if (dev_info->intr_queue[i] == rx_queue_id) {
1754 				for (; i < dev_info->nb_rx_intr - 1; i++)
1755 					dev_info->intr_queue[i] =
1756 						dev_info->intr_queue[i + 1];
1757 				break;
1758 			}
1759 		}
1760 	}
1761 
1762 	return err;
1763 }
1764 
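/* Enable interrupt mode for an Rx queue: register the queue with the
 * adapter's epoll fd, enable the ethdev Rx interrupt and create the
 * interrupt thread on first use; on failure each step taken so far is
 * rolled back.
 */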
1765 static int
1766 rxa_config_intr(struct event_eth_rx_adapter *rx_adapter,
1767 		struct eth_device_info *dev_info, uint16_t rx_queue_id)
1768 {
1769 	int err, err1;
1770 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1771 	union queue_data qd;
1772 	int init_fd;
1773 	uint16_t *intr_queue;
1774 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1775 
1776 	if (rxa_intr_queue(dev_info, rx_queue_id))
1777 		return 0;
1778 
1779 	intr_queue = dev_info->intr_queue;
1780 	if (dev_info->intr_queue == NULL) {
1781 		size_t len =
1782 			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1783 		dev_info->intr_queue =
1784 			rte_zmalloc_socket(
1785 				rx_adapter->mem_name,
1786 				len,
1787 				0,
1788 				rx_adapter->socket_id);
1789 		if (dev_info->intr_queue == NULL)
1790 			return -ENOMEM;
1791 	}
1792 
1793 	init_fd = rx_adapter->epd;
1794 	err = rxa_init_epd(rx_adapter);
1795 	if (err)
1796 		goto err_free_queue;
1797 
1798 	qd.port = eth_dev_id;
1799 	qd.queue = rx_queue_id;
1800 
1801 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1802 					rx_adapter->epd,
1803 					RTE_INTR_EVENT_ADD,
1804 					qd.ptr);
1805 	if (err) {
1806 		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1807 			" Rx Queue %u err %d", rx_queue_id, err);
1808 		goto err_del_fd;
1809 	}
1810 
1811 	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1812 	if (err) {
1813 		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1814 				" Rx Queue %u err %d", rx_queue_id, err);
1815 
1816 		goto err_del_event;
1817 	}
1818 
1819 	err = rxa_create_intr_thread(rx_adapter);
1820 	if (!err)  {
1821 		if (sintr)
1822 			dev_info->shared_intr_enabled = 1;
1823 		else
1824 			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1825 		return 0;
1826 	}
1827 
1828 
1829 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1830 	if (err)
1831 		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1832 				" Rx Queue %u err %d", rx_queue_id, err);
1833 err_del_event:
1834 	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1835 					rx_adapter->epd,
1836 					RTE_INTR_EVENT_DEL,
1837 					0);
1838 	if (err1) {
1839 		RTE_EDEV_LOG_ERR("Could not delete event for"
1840 				" Rx Queue %u err %d", rx_queue_id, err1);
1841 	}
1842 err_del_fd:
1843 	if (init_fd == INIT_FD) {
1844 		close(rx_adapter->epd);
1845 		rx_adapter->epd = INIT_FD;
1846 	}
1847 err_free_queue:
1848 	if (intr_queue == NULL)
1849 		rte_free(dev_info->intr_queue);
1850 
1851 	return err;
1852 }
1853 
1854 static int
1855 rxa_add_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1856 		   struct eth_device_info *dev_info, int rx_queue_id)
1857 
1858 {
1859 	int i, j, err;
1860 	int si = -1;
1861 	int shared_done = (dev_info->nb_shared_intr > 0);
1862 
1863 	if (rx_queue_id != -1) {
1864 		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1865 			return 0;
1866 		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1867 	}
1868 
1869 	err = 0;
1870 	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1871 
1872 		if (rxa_shared_intr(dev_info, i) && shared_done)
1873 			continue;
1874 
1875 		err = rxa_config_intr(rx_adapter, dev_info, i);
1876 
1877 		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1878 		if (shared_done) {
1879 			si = i;
1880 			dev_info->shared_intr_enabled = 1;
1881 		}
1882 		if (err)
1883 			break;
1884 	}
1885 
1886 	if (err == 0)
1887 		return 0;
1888 
1889 	shared_done = (dev_info->nb_shared_intr > 0);
1890 	for (j = 0; j < i; j++) {
1891 		if (rxa_intr_queue(dev_info, j))
1892 			continue;
1893 		if (rxa_shared_intr(dev_info, j) && si != j)
1894 			continue;
1895 		err = rxa_disable_intr(rx_adapter, dev_info, j);
1896 		if (err)
1897 			break;
1898 
1899 	}
1900 
1901 	return err;
1902 }
1903 
1904 static int
1905 rxa_init_service(struct event_eth_rx_adapter *rx_adapter, uint8_t id)
1906 {
1907 	int ret;
1908 	struct rte_service_spec service;
1909 	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1910 
1911 	if (rx_adapter->service_inited)
1912 		return 0;
1913 
1914 	memset(&service, 0, sizeof(service));
1915 	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1916 		"rte_event_eth_rx_adapter_%d", id);
1917 	service.socket_id = rx_adapter->socket_id;
1918 	service.callback = rxa_service_func;
1919 	service.callback_userdata = rx_adapter;
1920 	/* Service function handles locking for queue add/del updates */
1921 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1922 	ret = rte_service_component_register(&service, &rx_adapter->service_id);
1923 	if (ret) {
1924 		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1925 			service.name, ret);
1926 		return ret;
1927 	}
1928 
1929 	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1930 		&rx_adapter_conf, rx_adapter->conf_arg);
1931 	if (ret) {
1932 		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1933 			ret);
1934 		goto err_done;
1935 	}
1936 	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1937 	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1938 	rx_adapter->service_inited = 1;
1939 	rx_adapter->epd = INIT_FD;
1940 	return 0;
1941 
1942 err_done:
1943 	rte_service_component_unregister(rx_adapter->service_id);
1944 	return ret;
1945 }
1946 
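/*
 * Update the per-queue enabled flag and the adapter/device queue counts for
 * a queue add (add != 0) or delete. An rx_queue_id of -1 applies the update
 * to every Rx queue of the device.
 */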
1947 static void
1948 rxa_update_queue(struct event_eth_rx_adapter *rx_adapter,
1949 		 struct eth_device_info *dev_info, int32_t rx_queue_id,
1950 		 uint8_t add)
1951 {
1952 	struct eth_rx_queue_info *queue_info;
1953 	int enabled;
1954 	uint16_t i;
1955 
1956 	if (dev_info->rx_queue == NULL)
1957 		return;
1958 
1959 	if (rx_queue_id == -1) {
1960 		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1961 			rxa_update_queue(rx_adapter, dev_info, i, add);
1962 	} else {
1963 		queue_info = &dev_info->rx_queue[rx_queue_id];
1964 		enabled = queue_info->queue_enabled;
1965 		if (add) {
1966 			rx_adapter->nb_queues += !enabled;
1967 			dev_info->nb_dev_queues += !enabled;
1968 		} else {
1969 			rx_adapter->nb_queues -= enabled;
1970 			dev_info->nb_dev_queues -= enabled;
1971 		}
1972 		queue_info->queue_enabled = !!add;
1973 	}
1974 }
1975 
1976 static void
1977 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1978 		    uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1979 		    uint16_t port_id)
1980 {
1981 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1982 	struct eth_rx_vector_data *vector_data;
1983 	uint32_t flow_id;
1984 
1985 	vector_data = &queue_info->vector_data;
1986 	vector_data->max_vector_count = vector_count;
1987 	vector_data->port = port_id;
1988 	vector_data->queue = qid;
1989 	vector_data->vector_pool = mp;
1990 	vector_data->vector_timeout_ticks =
1991 		NSEC2TICK(vector_ns, rte_get_timer_hz());
1992 	vector_data->ts = 0;
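	/*
	 * If the application did not supply a flow id (lower 20 bits of the
	 * event are zero), synthesize one from the queue and port ids so
	 * that vectors from different queues carry distinct flow ids.
	 */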
1993 	flow_id = queue_info->event & 0xFFFFF;
1994 	flow_id =
1995 		flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1996 	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1997 }
1998 
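/*
 * Service (SW) path for queue delete: flush partially filled event vectors
 * belonging to the queue, update the polled/interrupt queue counters and,
 * when per-queue event buffers are in use, free the queue's event buffer
 * and stats. An rx_queue_id of -1 deletes all Rx queues of the device.
 */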
1999 static void
2000 rxa_sw_del(struct event_eth_rx_adapter *rx_adapter,
2001 	   struct eth_device_info *dev_info, int32_t rx_queue_id)
2002 {
2003 	struct eth_rx_vector_data *vec;
2004 	int pollq;
2005 	int intrq;
2006 	int sintrq;
2007 
2008 	if (rx_adapter->nb_queues == 0)
2009 		return;
2010 
2011 	if (rx_queue_id == -1) {
2012 		uint16_t nb_rx_queues;
2013 		uint16_t i;
2014 
2015 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2016 		for (i = 0; i <	nb_rx_queues; i++)
2017 			rxa_sw_del(rx_adapter, dev_info, i);
2018 		return;
2019 	}
2020 
2021 	/* Push all the partial event vectors to event device. */
2022 	TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
2023 		if (vec->queue != rx_queue_id)
2024 			continue;
2025 		rxa_vector_expire(vec, rx_adapter);
2026 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
2027 	}
2028 
2029 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
2030 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
2031 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
2032 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
2033 	rx_adapter->num_rx_polled -= pollq;
2034 	dev_info->nb_rx_poll -= pollq;
2035 	rx_adapter->num_rx_intr -= intrq;
2036 	dev_info->nb_rx_intr -= intrq;
2037 	dev_info->nb_shared_intr -= intrq && sintrq;
2038 	if (rx_adapter->use_queue_event_buf) {
2039 		struct eth_event_enqueue_buffer *event_buf =
2040 			dev_info->rx_queue[rx_queue_id].event_buf;
2041 		struct rte_event_eth_rx_adapter_stats *stats =
2042 			dev_info->rx_queue[rx_queue_id].stats;
2043 		rte_free(event_buf->events);
2044 		rte_free(event_buf);
2045 		rte_free(stats);
2046 		dev_info->rx_queue[rx_queue_id].event_buf = NULL;
2047 		dev_info->rx_queue[rx_queue_id].stats = NULL;
2048 	}
2049 }
2050 
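/*
 * Service (SW) path for queue add: store the per-queue event template,
 * servicing weight and vectorization settings, update the polled/interrupt
 * queue counters and, when per-queue event buffers are enabled, allocate
 * the queue's event buffer and stats. An rx_queue_id of -1 adds all Rx
 * queues of the device.
 */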
2051 static int
2052 rxa_add_queue(struct event_eth_rx_adapter *rx_adapter,
2053 	      struct eth_device_info *dev_info, int32_t rx_queue_id,
2054 	      const struct rte_event_eth_rx_adapter_queue_conf *conf)
2055 {
2056 	struct eth_rx_queue_info *queue_info;
2057 	const struct rte_event *ev = &conf->ev;
2058 	int pollq;
2059 	int intrq;
2060 	int sintrq;
2061 	struct rte_event *qi_ev;
2062 	struct eth_event_enqueue_buffer *new_rx_buf = NULL;
2063 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
2064 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
2065 	int ret;
2066 
2067 	if (rx_queue_id == -1) {
2068 		uint16_t nb_rx_queues;
2069 		uint16_t i;
2070 
2071 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2072 		for (i = 0; i <	nb_rx_queues; i++) {
2073 			ret = rxa_add_queue(rx_adapter, dev_info, i, conf);
2074 			if (ret)
2075 				return ret;
2076 		}
2077 		return 0;
2078 	}
2079 
2080 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
2081 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
2082 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
2083 
2084 	queue_info = &dev_info->rx_queue[rx_queue_id];
2085 	queue_info->wt = conf->servicing_weight;
2086 
2087 	qi_ev = (struct rte_event *)&queue_info->event;
2088 	qi_ev->event = ev->event;
2089 	qi_ev->op = RTE_EVENT_OP_NEW;
2090 	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
2091 
2092 	if (conf->rx_queue_flags &
2093 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
2094 		queue_info->flow_id_mask = ~0;
2095 	} else
2096 		qi_ev->flow_id = 0;
2097 
2098 	if (conf->rx_queue_flags &
2099 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2100 		queue_info->ena_vector = 1;
2101 		qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
2102 		rxa_set_vector_data(queue_info, conf->vector_sz,
2103 				    conf->vector_timeout_ns, conf->vector_mp,
2104 				    rx_queue_id, dev_info->dev->data->port_id);
2105 		rx_adapter->ena_vector = 1;
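		/*
		 * Track the adapter wide vector timeout as the minimum over
		 * all added queues of half the per-queue timeout in ticks.
		 */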
2106 		rx_adapter->vector_tmo_ticks =
2107 			rx_adapter->vector_tmo_ticks ?
2108 				      RTE_MIN(queue_info->vector_data
2109 							.vector_timeout_ticks >>
2110 						1,
2111 					rx_adapter->vector_tmo_ticks) :
2112 				queue_info->vector_data.vector_timeout_ticks >>
2113 					1;
2114 	}
2115 
2116 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
2117 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
2118 		rx_adapter->num_rx_polled += !pollq;
2119 		dev_info->nb_rx_poll += !pollq;
2120 		rx_adapter->num_rx_intr -= intrq;
2121 		dev_info->nb_rx_intr -= intrq;
2122 		dev_info->nb_shared_intr -= intrq && sintrq;
2123 	}
2124 
2125 	if (rxa_intr_queue(dev_info, rx_queue_id)) {
2126 		rx_adapter->num_rx_polled -= pollq;
2127 		dev_info->nb_rx_poll -= pollq;
2128 		rx_adapter->num_rx_intr += !intrq;
2129 		dev_info->nb_rx_intr += !intrq;
2130 		dev_info->nb_shared_intr += !intrq && sintrq;
2131 		if (dev_info->nb_shared_intr == 1) {
2132 			if (dev_info->multi_intr_cap)
2133 				dev_info->next_q_idx =
2134 					RTE_MAX_RXTX_INTR_VEC_ID - 1;
2135 			else
2136 				dev_info->next_q_idx = 0;
2137 		}
2138 	}
2139 
2140 	if (!rx_adapter->use_queue_event_buf)
2141 		return 0;
2142 
2143 	new_rx_buf = rte_zmalloc_socket("rx_buffer_meta",
2144 				sizeof(*new_rx_buf), 0,
2145 				rte_eth_dev_socket_id(eth_dev_id));
2146 	if (new_rx_buf == NULL) {
2147 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer meta for "
2148 				 "dev_id: %d queue_id: %d",
2149 				 eth_dev_id, rx_queue_id);
2150 		return -ENOMEM;
2151 	}
2152 
2153 	new_rx_buf->events_size = RTE_ALIGN(conf->event_buf_size, BATCH_SIZE);
2154 	new_rx_buf->events_size += (2 * BATCH_SIZE);
2155 	new_rx_buf->events = rte_zmalloc_socket("rx_buffer",
2156 				sizeof(struct rte_event) *
2157 				new_rx_buf->events_size, 0,
2158 				rte_eth_dev_socket_id(eth_dev_id));
2159 	if (new_rx_buf->events == NULL) {
2160 		rte_free(new_rx_buf);
2161 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer for "
2162 				 "dev_id: %d queue_id: %d",
2163 				 eth_dev_id, rx_queue_id);
2164 		return -ENOMEM;
2165 	}
2166 
2167 	queue_info->event_buf = new_rx_buf;
2168 
2169 	/* Allocate storage for adapter queue stats */
2170 	stats = rte_zmalloc_socket("rx_queue_stats",
2171 				sizeof(*stats), 0,
2172 				rte_eth_dev_socket_id(eth_dev_id));
2173 	if (stats == NULL) {
2174 		rte_free(new_rx_buf->events);
2175 		rte_free(new_rx_buf);
2176 		RTE_EDEV_LOG_ERR("Failed to allocate stats storage for"
2177 				 " dev_id: %d queue_id: %d",
2178 				 eth_dev_id, rx_queue_id);
2179 		return -ENOMEM;
2180 	}
2181 
2182 	queue_info->stats = stats;
2183 
2184 	return 0;
2185 }
2186 
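/*
 * Service (SW) path for rte_event_eth_rx_adapter_queue_add(): recompute the
 * poll and WRR arrays, configure or remove Rx interrupts depending on the
 * servicing weight and install the new polling schedule.
 */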
2187 static int
2188 rxa_sw_add(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
2189 	   int rx_queue_id,
2190 	   const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2191 {
2192 	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
2193 	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
2194 	int ret;
2195 	struct eth_rx_poll_entry *rx_poll;
2196 	struct eth_rx_queue_info *rx_queue;
2197 	uint32_t *rx_wrr;
2198 	uint16_t nb_rx_queues;
2199 	uint32_t nb_rx_poll, nb_wrr;
2200 	uint32_t nb_rx_intr;
2201 	int num_intr_vec;
2202 	uint16_t wt;
2203 
2204 	if (queue_conf->servicing_weight == 0) {
2205 		struct rte_eth_dev_data *data = dev_info->dev->data;
2206 
2207 		temp_conf = *queue_conf;
2208 		if (!data->dev_conf.intr_conf.rxq) {
2209 			/* If Rx interrupts are disabled, set wt = 1 */
2210 			temp_conf.servicing_weight = 1;
2211 		}
2212 		queue_conf = &temp_conf;
2213 
2214 		if (queue_conf->servicing_weight == 0 &&
2215 		    rx_adapter->use_queue_event_buf) {
2216 
2217 			RTE_EDEV_LOG_ERR("Use of queue level event buffer "
2218 					 "not supported for interrupt queues "
2219 					 "dev_id: %d queue_id: %d",
2220 					 eth_dev_id, rx_queue_id);
2221 			return -EINVAL;
2222 		}
2223 	}
2224 
2225 	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2226 	rx_queue = dev_info->rx_queue;
2227 	wt = queue_conf->servicing_weight;
2228 
2229 	if (dev_info->rx_queue == NULL) {
2230 		dev_info->rx_queue =
2231 		    rte_zmalloc_socket(rx_adapter->mem_name,
2232 				       nb_rx_queues *
2233 				       sizeof(struct eth_rx_queue_info), 0,
2234 				       rx_adapter->socket_id);
2235 		if (dev_info->rx_queue == NULL)
2236 			return -ENOMEM;
2237 	}
2238 	rx_wrr = NULL;
2239 	rx_poll = NULL;
2240 
2241 	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2242 			queue_conf->servicing_weight,
2243 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2244 
2245 	if (dev_info->dev->intr_handle)
2246 		dev_info->multi_intr_cap =
2247 			rte_intr_cap_multiple(dev_info->dev->intr_handle);
2248 
2249 	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2250 				&rx_poll, &rx_wrr);
2251 	if (ret)
2252 		goto err_free_rxqueue;
2253 
2254 	if (wt == 0) {
2255 		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2256 
2257 		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2258 		if (ret)
2259 			goto err_free_rxqueue;
2260 
2261 		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2262 		if (ret)
2263 			goto err_free_rxqueue;
2264 	} else {
2265 
2266 		num_intr_vec = 0;
2267 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2268 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2269 						rx_queue_id, 0);
2270 			/* Interrupt-based queues are being converted to
2271 			 * poll mode queues; delete the interrupt configuration
2272 			 * for those.
2273 			 */
2274 			ret = rxa_del_intr_queue(rx_adapter,
2275 						dev_info, rx_queue_id);
2276 			if (ret)
2277 				goto err_free_rxqueue;
2278 		}
2279 	}
2280 
2281 	if (nb_rx_intr == 0) {
2282 		ret = rxa_free_intr_resources(rx_adapter);
2283 		if (ret)
2284 			goto err_free_rxqueue;
2285 	}
2286 
2287 	if (wt == 0) {
2288 		uint16_t i;
2289 
2290 		if (rx_queue_id == -1) {
2291 			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2292 				dev_info->intr_queue[i] = i;
2293 		} else {
2294 			if (!rxa_intr_queue(dev_info, rx_queue_id))
2295 				dev_info->intr_queue[nb_rx_intr - 1] =
2296 					rx_queue_id;
2297 		}
2298 	}
2299 
2300 
2301 
2302 	ret = rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2303 	if (ret)
2304 		goto err_free_rxqueue;
2305 	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2306 
2307 	rte_free(rx_adapter->eth_rx_poll);
2308 	rte_free(rx_adapter->wrr_sched);
2309 
2310 	rx_adapter->eth_rx_poll = rx_poll;
2311 	rx_adapter->wrr_sched = rx_wrr;
2312 	rx_adapter->wrr_len = nb_wrr;
2313 	rx_adapter->num_intr_vec += num_intr_vec;
2314 	return 0;
2315 
2316 err_free_rxqueue:
2317 	if (rx_queue == NULL) {
2318 		rte_free(dev_info->rx_queue);
2319 		dev_info->rx_queue = NULL;
2320 	}
2321 
2322 	rte_free(rx_poll);
2323 	rte_free(rx_wrr);
2324 
2325 	return ret;
2326 }
2327 
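/*
 * Common start/stop handler: devices with an internal event port use the
 * eventdev PMD start/stop ops, all others toggle the run state of the
 * adapter service under rx_lock.
 */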
2328 static int
2329 rxa_ctrl(uint8_t id, int start)
2330 {
2331 	struct event_eth_rx_adapter *rx_adapter;
2332 	struct rte_eventdev *dev;
2333 	struct eth_device_info *dev_info;
2334 	uint32_t i;
2335 	int use_service = 0;
2336 	int stop = !start;
2337 
2338 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2339 	rx_adapter = rxa_id_to_adapter(id);
2340 	if (rx_adapter == NULL)
2341 		return -EINVAL;
2342 
2343 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2344 
2345 	RTE_ETH_FOREACH_DEV(i) {
2346 		dev_info = &rx_adapter->eth_devices[i];
2347 		/* if starting, check the number of device queues */
2348 		if (start && !dev_info->nb_dev_queues)
2349 			continue;
2350 		/* if stopping, check whether the device has been started */
2351 		if (stop && !dev_info->dev_rx_started)
2352 			continue;
2353 		use_service |= !dev_info->internal_event_port;
2354 		dev_info->dev_rx_started = start;
2355 		if (dev_info->internal_event_port == 0)
2356 			continue;
2357 		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2358 						&rte_eth_devices[i]) :
2359 			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
2360 						&rte_eth_devices[i]);
2361 	}
2362 
2363 	if (use_service) {
2364 		rte_spinlock_lock(&rx_adapter->rx_lock);
2365 		rx_adapter->rxa_started = start;
2366 		rte_service_runstate_set(rx_adapter->service_id, start);
2367 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2368 	}
2369 
2370 	return 0;
2371 }
2372 
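/*
 * Allocate and initialize an adapter instance: per-ethdev state, the RSS
 * key used for flow id generation and, unless per-queue event buffers are
 * requested, the adapter level event enqueue buffer. Also registers the
 * mbuf dynamic timestamp field.
 */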
2373 static int
2374 rxa_create(uint8_t id, uint8_t dev_id,
2375 	   struct rte_event_eth_rx_adapter_params *rxa_params,
2376 	   rte_event_eth_rx_adapter_conf_cb conf_cb,
2377 	   void *conf_arg)
2378 {
2379 	struct event_eth_rx_adapter *rx_adapter;
2380 	struct eth_event_enqueue_buffer *buf;
2381 	struct rte_event *events;
2382 	int ret;
2383 	int socket_id;
2384 	uint16_t i;
2385 	char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
2386 	const uint8_t default_rss_key[] = {
2387 		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2388 		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2389 		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2390 		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2391 		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2392 	};
2393 
2394 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2395 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2396 
2397 	if (conf_cb == NULL)
2398 		return -EINVAL;
2399 
2400 	if (event_eth_rx_adapter == NULL) {
2401 		ret = rte_event_eth_rx_adapter_init();
2402 		if (ret)
2403 			return ret;
2404 	}
2405 
2406 	rx_adapter = rxa_id_to_adapter(id);
2407 	if (rx_adapter != NULL) {
2408 		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2409 		return -EEXIST;
2410 	}
2411 
2412 	socket_id = rte_event_dev_socket_id(dev_id);
2413 	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2414 		"rte_event_eth_rx_adapter_%d",
2415 		id);
2416 
2417 	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2418 			RTE_CACHE_LINE_SIZE, socket_id);
2419 	if (rx_adapter == NULL) {
2420 		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2421 		return -ENOMEM;
2422 	}
2423 
2424 	rx_adapter->eventdev_id = dev_id;
2425 	rx_adapter->socket_id = socket_id;
2426 	rx_adapter->conf_cb = conf_cb;
2427 	rx_adapter->conf_arg = conf_arg;
2428 	rx_adapter->id = id;
2429 	TAILQ_INIT(&rx_adapter->vector_list);
2430 	strcpy(rx_adapter->mem_name, mem_name);
2431 	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2432 					RTE_MAX_ETHPORTS *
2433 					sizeof(struct eth_device_info), 0,
2434 					socket_id);
2435 	rte_convert_rss_key((const uint32_t *)default_rss_key,
2436 			(uint32_t *)rx_adapter->rss_key_be,
2437 			    RTE_DIM(default_rss_key));
2438 
2439 	if (rx_adapter->eth_devices == NULL) {
2440 		RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
2441 		rte_free(rx_adapter);
2442 		return -ENOMEM;
2443 	}
2444 
2445 	rte_spinlock_init(&rx_adapter->rx_lock);
2446 
2447 	for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2448 		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2449 
2450 	/* Rx adapter event buffer allocation */
2451 	rx_adapter->use_queue_event_buf = rxa_params->use_queue_event_buf;
2452 
2453 	if (!rx_adapter->use_queue_event_buf) {
2454 		buf = &rx_adapter->event_enqueue_buffer;
2455 		buf->events_size = rxa_params->event_buf_size;
2456 
2457 		events = rte_zmalloc_socket(rx_adapter->mem_name,
2458 					    buf->events_size * sizeof(*events),
2459 					    0, socket_id);
2460 		if (events == NULL) {
2461 			RTE_EDEV_LOG_ERR("Failed to allocate memory "
2462 					 "for adapter event buffer");
2463 			rte_free(rx_adapter->eth_devices);
2464 			rte_free(rx_adapter);
2465 			return -ENOMEM;
2466 		}
2467 
2468 		rx_adapter->event_enqueue_buffer.events = events;
2469 	}
2470 
2471 	event_eth_rx_adapter[id] = rx_adapter;
2472 
2473 	if (conf_cb == rxa_default_conf_cb)
2474 		rx_adapter->default_cb_arg = 1;
2475 
2476 	if (rte_mbuf_dyn_rx_timestamp_register(
2477 			&event_eth_rx_timestamp_dynfield_offset,
2478 			&event_eth_rx_timestamp_dynflag) != 0) {
2479 		RTE_EDEV_LOG_ERR("Error registering timestamp field in mbuf");
2480 		return -rte_errno;
2481 	}
2482 
2483 	rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2484 		conf_arg);
2485 	return 0;
2486 }
2487 
2488 int
2489 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2490 				rte_event_eth_rx_adapter_conf_cb conf_cb,
2491 				void *conf_arg)
2492 {
2493 	struct rte_event_eth_rx_adapter_params rxa_params = {0};
2494 
2495 	/* use default values for adapter params */
2496 	rxa_params.event_buf_size = ETH_EVENT_BUFFER_SIZE;
2497 	rxa_params.use_queue_event_buf = false;
2498 
2499 	return rxa_create(id, dev_id, &rxa_params, conf_cb, conf_arg);
2500 }
2501 
2502 int
2503 rte_event_eth_rx_adapter_create_with_params(uint8_t id, uint8_t dev_id,
2504 			struct rte_event_port_conf *port_config,
2505 			struct rte_event_eth_rx_adapter_params *rxa_params)
2506 {
2507 	struct rte_event_port_conf *pc;
2508 	int ret;
2509 	struct rte_event_eth_rx_adapter_params temp_params = {0};
2510 
2511 	if (port_config == NULL)
2512 		return -EINVAL;
2513 
2514 	if (rxa_params == NULL) {
2515 		/* use default values if rxa_params is NULL */
2516 		rxa_params = &temp_params;
2517 		rxa_params->event_buf_size = ETH_EVENT_BUFFER_SIZE;
2518 		rxa_params->use_queue_event_buf = false;
2519 	} else if ((!rxa_params->use_queue_event_buf &&
2520 		    rxa_params->event_buf_size == 0) ||
2521 		   (rxa_params->use_queue_event_buf &&
2522 		    rxa_params->event_buf_size != 0)) {
2523 		RTE_EDEV_LOG_ERR("Invalid adapter params");
2524 		return -EINVAL;
2525 	} else if (!rxa_params->use_queue_event_buf) {
2526 		/* Adjust the event buffer size with BATCH_SIZE (used when
2527 		 * fetching packets from the NIC Rx queues) to get full buffer
2528 		 * utilization and prevent unnecessary rollovers.
2529 		 */
2530 
2531 		rxa_params->event_buf_size =
2532 			RTE_ALIGN(rxa_params->event_buf_size, BATCH_SIZE);
2533 		rxa_params->event_buf_size += (BATCH_SIZE + BATCH_SIZE);
2534 	}
2535 
2536 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2537 	if (pc == NULL)
2538 		return -ENOMEM;
2539 
2540 	*pc = *port_config;
2541 
2542 	ret = rxa_create(id, dev_id, rxa_params, rxa_default_conf_cb, pc);
2543 	if (ret)
2544 		rte_free(pc);
2545 
2546 	rte_eventdev_trace_eth_rx_adapter_create_with_params(id, dev_id,
2547 		port_config, rxa_params, ret);
2548 
2549 	return ret;
2550 }
2551 
2552 int
2553 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2554 		struct rte_event_port_conf *port_config)
2555 {
2556 	struct rte_event_port_conf *pc;
2557 	int ret;
2558 
2559 	if (port_config == NULL)
2560 		return -EINVAL;
2561 
2562 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2563 
2564 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2565 	if (pc == NULL)
2566 		return -ENOMEM;
2567 	*pc = *port_config;
2568 
2569 	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2570 					rxa_default_conf_cb,
2571 					pc);
2572 	if (ret)
2573 		rte_free(pc);
2574 	return ret;
2575 }
2576 
2577 int
2578 rte_event_eth_rx_adapter_free(uint8_t id)
2579 {
2580 	struct event_eth_rx_adapter *rx_adapter;
2581 
2582 	if (rxa_memzone_lookup())
2583 		return -ENOMEM;
2584 
2585 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2586 
2587 	rx_adapter = rxa_id_to_adapter(id);
2588 	if (rx_adapter == NULL)
2589 		return -EINVAL;
2590 
2591 	if (rx_adapter->nb_queues) {
2592 		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2593 				rx_adapter->nb_queues);
2594 		return -EBUSY;
2595 	}
2596 
2597 	if (rx_adapter->default_cb_arg)
2598 		rte_free(rx_adapter->conf_arg);
2599 	rte_free(rx_adapter->eth_devices);
2600 	if (!rx_adapter->use_queue_event_buf)
2601 		rte_free(rx_adapter->event_enqueue_buffer.events);
2602 	rte_free(rx_adapter);
2603 	event_eth_rx_adapter[id] = NULL;
2604 
2605 	rte_eventdev_trace_eth_rx_adapter_free(id);
2606 	return 0;
2607 }
2608 
2609 int
2610 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2611 		uint16_t eth_dev_id,
2612 		int32_t rx_queue_id,
2613 		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2614 {
2615 	int ret;
2616 	uint32_t cap;
2617 	struct event_eth_rx_adapter *rx_adapter;
2618 	struct rte_eventdev *dev;
2619 	struct eth_device_info *dev_info;
2620 	struct rte_event_eth_rx_adapter_vector_limits limits;
2621 
2622 	if (rxa_memzone_lookup())
2623 		return -ENOMEM;
2624 
2625 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2626 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2627 
2628 	rx_adapter = rxa_id_to_adapter(id);
2629 	if ((rx_adapter == NULL) || (queue_conf == NULL))
2630 		return -EINVAL;
2631 
2632 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2633 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2634 						eth_dev_id,
2635 						&cap);
2636 	if (ret) {
2637 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2638 			" eth port %" PRIu16, id, eth_dev_id);
2639 		return ret;
2640 	}
2641 
2642 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2643 		&& (queue_conf->rx_queue_flags &
2644 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2645 		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2646 				" eth port: %" PRIu16 " adapter id: %" PRIu8,
2647 				eth_dev_id, id);
2648 		return -EINVAL;
2649 	}
2650 
2651 	if (queue_conf->rx_queue_flags &
2652 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2653 
2654 		if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0) {
2655 			RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2656 					 " eth port: %" PRIu16
2657 					 " adapter id: %" PRIu8,
2658 					 eth_dev_id, id);
2659 			return -EINVAL;
2660 		}
2661 
2662 		ret = rte_event_eth_rx_adapter_vector_limits_get(
2663 			rx_adapter->eventdev_id, eth_dev_id, &limits);
2664 		if (ret < 0) {
2665 			RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
2666 					 " eth port: %" PRIu16
2667 					 " adapter id: %" PRIu8,
2668 					 eth_dev_id, id);
2669 			return -EINVAL;
2670 		}
2671 		if (queue_conf->vector_sz < limits.min_sz ||
2672 		    queue_conf->vector_sz > limits.max_sz ||
2673 		    queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
2674 		    queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
2675 		    queue_conf->vector_mp == NULL) {
2676 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2677 					 " eth port: %" PRIu16
2678 					 " adapter id: %" PRIu8,
2679 					 eth_dev_id, id);
2680 			return -EINVAL;
2681 		}
2682 		if (queue_conf->vector_mp->elt_size <
2683 		    (sizeof(struct rte_event_vector) +
2684 		     (sizeof(uintptr_t) * queue_conf->vector_sz))) {
2685 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2686 					 " eth port: %" PRIu16
2687 					 " adapter id: %" PRIu8,
2688 					 eth_dev_id, id);
2689 			return -EINVAL;
2690 		}
2691 	}
2692 
2693 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2694 		(rx_queue_id != -1)) {
2695 		RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
2696 			"event queue, eth port: %" PRIu16 " adapter id: %"
2697 			PRIu8, eth_dev_id, id);
2698 		return -EINVAL;
2699 	}
2700 
2701 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2702 			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2703 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2704 			 (uint16_t)rx_queue_id);
2705 		return -EINVAL;
2706 	}
2707 
2708 	if ((rx_adapter->use_queue_event_buf &&
2709 	     queue_conf->event_buf_size == 0) ||
2710 	    (!rx_adapter->use_queue_event_buf &&
2711 	     queue_conf->event_buf_size != 0)) {
2712 		RTE_EDEV_LOG_ERR("Invalid Event buffer size for the queue");
2713 		return -EINVAL;
2714 	}
2715 
2716 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2717 
2718 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2719 		if (*dev->dev_ops->eth_rx_adapter_queue_add == NULL)
2720 			return -ENOTSUP;
2721 		if (dev_info->rx_queue == NULL) {
2722 			dev_info->rx_queue =
2723 			    rte_zmalloc_socket(rx_adapter->mem_name,
2724 					dev_info->dev->data->nb_rx_queues *
2725 					sizeof(struct eth_rx_queue_info), 0,
2726 					rx_adapter->socket_id);
2727 			if (dev_info->rx_queue == NULL)
2728 				return -ENOMEM;
2729 		}
2730 
2731 		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2732 				&rte_eth_devices[eth_dev_id],
2733 				rx_queue_id, queue_conf);
2734 		if (ret == 0) {
2735 			dev_info->internal_event_port = 1;
2736 			rxa_update_queue(rx_adapter,
2737 					&rx_adapter->eth_devices[eth_dev_id],
2738 					rx_queue_id,
2739 					1);
2740 		}
2741 	} else {
2742 		rte_spinlock_lock(&rx_adapter->rx_lock);
2743 		dev_info->internal_event_port = 0;
2744 		ret = rxa_init_service(rx_adapter, id);
2745 		if (ret == 0) {
2746 			uint32_t service_id = rx_adapter->service_id;
2747 			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2748 					queue_conf);
2749 			rte_service_component_runstate_set(service_id,
2750 				rxa_sw_adapter_queue_count(rx_adapter));
2751 		}
2752 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2753 	}
2754 
2755 	rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2756 		rx_queue_id, queue_conf, ret);
2757 	if (ret)
2758 		return ret;
2759 
2760 	return 0;
2761 }
2762 
2763 static int
2764 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2765 {
2766 	limits->max_sz = MAX_VECTOR_SIZE;
2767 	limits->min_sz = MIN_VECTOR_SIZE;
2768 	limits->max_timeout_ns = MAX_VECTOR_NS;
2769 	limits->min_timeout_ns = MIN_VECTOR_NS;
2770 
2771 	return 0;
2772 }
2773 
2774 int
2775 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2776 				int32_t rx_queue_id)
2777 {
2778 	int ret = 0;
2779 	struct rte_eventdev *dev;
2780 	struct event_eth_rx_adapter *rx_adapter;
2781 	struct eth_device_info *dev_info;
2782 	uint32_t cap;
2783 	uint32_t nb_rx_poll = 0;
2784 	uint32_t nb_wrr = 0;
2785 	uint32_t nb_rx_intr;
2786 	struct eth_rx_poll_entry *rx_poll = NULL;
2787 	uint32_t *rx_wrr = NULL;
2788 	int num_intr_vec;
2789 
2790 	if (rxa_memzone_lookup())
2791 		return -ENOMEM;
2792 
2793 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2794 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2795 
2796 	rx_adapter = rxa_id_to_adapter(id);
2797 	if (rx_adapter == NULL)
2798 		return -EINVAL;
2799 
2800 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2801 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2802 						eth_dev_id,
2803 						&cap);
2804 	if (ret)
2805 		return ret;
2806 
2807 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2808 		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2809 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2810 			 (uint16_t)rx_queue_id);
2811 		return -EINVAL;
2812 	}
2813 
2814 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2815 
2816 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2817 		if (*dev->dev_ops->eth_rx_adapter_queue_del == NULL)
2818 			return -ENOTSUP;
2819 		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2820 						&rte_eth_devices[eth_dev_id],
2821 						rx_queue_id);
2822 		if (ret == 0) {
2823 			rxa_update_queue(rx_adapter,
2824 					&rx_adapter->eth_devices[eth_dev_id],
2825 					rx_queue_id,
2826 					0);
2827 			if (dev_info->nb_dev_queues == 0) {
2828 				rte_free(dev_info->rx_queue);
2829 				dev_info->rx_queue = NULL;
2830 			}
2831 		}
2832 	} else {
2833 		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2834 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2835 
2836 		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2837 			&rx_poll, &rx_wrr);
2838 		if (ret)
2839 			return ret;
2840 
2841 		rte_spinlock_lock(&rx_adapter->rx_lock);
2842 
2843 		num_intr_vec = 0;
2844 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2845 
2846 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2847 						rx_queue_id, 0);
2848 			ret = rxa_del_intr_queue(rx_adapter, dev_info,
2849 					rx_queue_id);
2850 			if (ret)
2851 				goto unlock_ret;
2852 		}
2853 
2854 		if (nb_rx_intr == 0) {
2855 			ret = rxa_free_intr_resources(rx_adapter);
2856 			if (ret)
2857 				goto unlock_ret;
2858 		}
2859 
2860 		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2861 		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2862 
2863 		rte_free(rx_adapter->eth_rx_poll);
2864 		rte_free(rx_adapter->wrr_sched);
2865 
2866 		if (nb_rx_intr == 0) {
2867 			rte_free(dev_info->intr_queue);
2868 			dev_info->intr_queue = NULL;
2869 		}
2870 
2871 		rx_adapter->eth_rx_poll = rx_poll;
2872 		rx_adapter->wrr_sched = rx_wrr;
2873 		rx_adapter->wrr_len = nb_wrr;
2874 		/*
2875 		 * Reset the next poll start position (wrr_pos) to avoid a
2876 		 * buffer overrun when wrr_len is reduced on queue delete.
2877 		 */
2878 		rx_adapter->wrr_pos = 0;
2879 		rx_adapter->num_intr_vec += num_intr_vec;
2880 
2881 		if (dev_info->nb_dev_queues == 0) {
2882 			rte_free(dev_info->rx_queue);
2883 			dev_info->rx_queue = NULL;
2884 		}
2885 unlock_ret:
2886 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2887 		if (ret) {
2888 			rte_free(rx_poll);
2889 			rte_free(rx_wrr);
2890 			return ret;
2891 		}
2892 
2893 		rte_service_component_runstate_set(rx_adapter->service_id,
2894 				rxa_sw_adapter_queue_count(rx_adapter));
2895 	}
2896 
2897 	rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2898 		rx_queue_id, ret);
2899 
2900 	return ret;
2901 }
2902 
2903 int
2904 rte_event_eth_rx_adapter_vector_limits_get(
2905 	uint8_t dev_id, uint16_t eth_port_id,
2906 	struct rte_event_eth_rx_adapter_vector_limits *limits)
2907 {
2908 	struct rte_eventdev *dev;
2909 	uint32_t cap;
2910 	int ret;
2911 
2912 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2913 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2914 
2915 	if (limits == NULL)
2916 		return -EINVAL;
2917 
2918 	dev = &rte_eventdevs[dev_id];
2919 
2920 	ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2921 	if (ret) {
2922 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2923 				 " eth port %" PRIu16,
2924 				 dev_id, eth_port_id);
2925 		return ret;
2926 	}
2927 
2928 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2929 		if (*dev->dev_ops->eth_rx_adapter_vector_limits_get == NULL)
2930 			return -ENOTSUP;
2931 		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2932 			dev, &rte_eth_devices[eth_port_id], limits);
2933 	} else {
2934 		ret = rxa_sw_vector_limits(limits);
2935 	}
2936 
2937 	rte_eventdev_trace_eth_rx_adapter_vector_limits_get(dev_id, eth_port_id,
2938 		limits->min_sz, limits->max_sz, limits->log2_sz,
2939 		limits->min_timeout_ns, limits->max_timeout_ns, ret);
2940 	return ret;
2941 }
2942 
2943 int
2944 rte_event_eth_rx_adapter_start(uint8_t id)
2945 {
2946 	rte_eventdev_trace_eth_rx_adapter_start(id);
2947 	return rxa_ctrl(id, 1);
2948 }
2949 
2950 int
2951 rte_event_eth_rx_adapter_stop(uint8_t id)
2952 {
2953 	rte_eventdev_trace_eth_rx_adapter_stop(id);
2954 	return rxa_ctrl(id, 0);
2955 }
2956 
2957 static inline void
2958 rxa_queue_stats_reset(struct eth_rx_queue_info *queue_info)
2959 {
2960 	struct rte_event_eth_rx_adapter_stats *q_stats;
2961 
2962 	q_stats = queue_info->stats;
2963 	memset(q_stats, 0, sizeof(*q_stats));
2964 }
2965 
2966 int
2967 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2968 			       struct rte_event_eth_rx_adapter_stats *stats)
2969 {
2970 	struct event_eth_rx_adapter *rx_adapter;
2971 	struct eth_event_enqueue_buffer *buf;
2972 	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2973 	struct rte_event_eth_rx_adapter_stats dev_stats;
2974 	struct rte_eventdev *dev;
2975 	struct eth_device_info *dev_info;
2976 	struct eth_rx_queue_info *queue_info;
2977 	struct rte_event_eth_rx_adapter_stats *q_stats;
2978 	uint32_t i, j;
2979 	int ret;
2980 
2981 	rte_eventdev_trace_eth_rx_adapter_stats_get(id, stats);
2982 
2983 	if (rxa_memzone_lookup())
2984 		return -ENOMEM;
2985 
2986 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2987 
2988 	rx_adapter = rxa_id_to_adapter(id);
2989 	if (rx_adapter == NULL || stats == NULL)
2990 		return -EINVAL;
2991 
2992 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2993 	memset(stats, 0, sizeof(*stats));
2994 
2995 	if (rx_adapter->service_inited)
2996 		*stats = rx_adapter->stats;
2997 
2998 	RTE_ETH_FOREACH_DEV(i) {
2999 		dev_info = &rx_adapter->eth_devices[i];
3000 
3001 		if (rx_adapter->use_queue_event_buf && dev_info->rx_queue) {
3002 
3003 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
3004 			     j++) {
3005 				queue_info = &dev_info->rx_queue[j];
3006 				if (!queue_info->queue_enabled)
3007 					continue;
3008 				q_stats = queue_info->stats;
3009 
3010 				stats->rx_packets += q_stats->rx_packets;
3011 				stats->rx_poll_count += q_stats->rx_poll_count;
3012 				stats->rx_enq_count += q_stats->rx_enq_count;
3013 				stats->rx_enq_retry += q_stats->rx_enq_retry;
3014 				stats->rx_dropped += q_stats->rx_dropped;
3015 				stats->rx_enq_block_cycles +=
3016 						q_stats->rx_enq_block_cycles;
3017 			}
3018 		}
3019 
3020 		if (dev_info->internal_event_port == 0 ||
3021 			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
3022 			continue;
3023 		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
3024 						&rte_eth_devices[i],
3025 						&dev_stats);
3026 		if (ret)
3027 			continue;
3028 		dev_stats_sum.rx_packets += dev_stats.rx_packets;
3029 		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
3030 	}
3031 
3032 	buf = &rx_adapter->event_enqueue_buffer;
3033 	stats->rx_packets += dev_stats_sum.rx_packets;
3034 	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
3035 	stats->rx_event_buf_count = buf->count;
3036 	stats->rx_event_buf_size = buf->events_size;
3037 
3038 	return 0;
3039 }
3040 
3041 int
3042 rte_event_eth_rx_adapter_queue_stats_get(uint8_t id,
3043 		uint16_t eth_dev_id,
3044 		uint16_t rx_queue_id,
3045 		struct rte_event_eth_rx_adapter_queue_stats *stats)
3046 {
3047 	struct event_eth_rx_adapter *rx_adapter;
3048 	struct eth_device_info *dev_info;
3049 	struct eth_rx_queue_info *queue_info;
3050 	struct eth_event_enqueue_buffer *event_buf;
3051 	struct rte_event_eth_rx_adapter_stats *q_stats;
3052 	struct rte_eventdev *dev;
3053 
3054 	rte_eventdev_trace_eth_rx_adapter_queue_stats_get(id, eth_dev_id,
3055 							  rx_queue_id, stats);
3056 
3057 	if (rxa_memzone_lookup())
3058 		return -ENOMEM;
3059 
3060 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3061 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3062 
3063 	rx_adapter = rxa_id_to_adapter(id);
3064 
3065 	if (rx_adapter == NULL || stats == NULL)
3066 		return -EINVAL;
3067 
3068 	if (!rx_adapter->use_queue_event_buf)
3069 		return -EINVAL;
3070 
3071 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3072 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
3073 		return -EINVAL;
3074 	}
3075 
3076 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3077 	if (dev_info->rx_queue == NULL ||
3078 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3079 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3080 		return -EINVAL;
3081 	}
3082 
3083 	if (dev_info->internal_event_port == 0) {
3084 		queue_info = &dev_info->rx_queue[rx_queue_id];
3085 		event_buf = queue_info->event_buf;
3086 		q_stats = queue_info->stats;
3087 
3088 		stats->rx_event_buf_count = event_buf->count;
3089 		stats->rx_event_buf_size = event_buf->events_size;
3090 		stats->rx_packets = q_stats->rx_packets;
3091 		stats->rx_poll_count = q_stats->rx_poll_count;
3092 		stats->rx_dropped = q_stats->rx_dropped;
3093 	}
3094 
3095 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3096 	if (dev->dev_ops->eth_rx_adapter_queue_stats_get != NULL) {
3097 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_get)(dev,
3098 						&rte_eth_devices[eth_dev_id],
3099 						rx_queue_id, stats);
3100 	}
3101 
3102 	return 0;
3103 }
3104 
3105 int
3106 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
3107 {
3108 	struct event_eth_rx_adapter *rx_adapter;
3109 	struct rte_eventdev *dev;
3110 	struct eth_device_info *dev_info;
3111 	struct eth_rx_queue_info *queue_info;
3112 	uint32_t i, j;
3113 
3114 	rte_eventdev_trace_eth_rx_adapter_stats_reset(id);
3115 
3116 	if (rxa_memzone_lookup())
3117 		return -ENOMEM;
3118 
3119 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3120 
3121 	rx_adapter = rxa_id_to_adapter(id);
3122 	if (rx_adapter == NULL)
3123 		return -EINVAL;
3124 
3125 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3126 
3127 	RTE_ETH_FOREACH_DEV(i) {
3128 		dev_info = &rx_adapter->eth_devices[i];
3129 
3130 		if (rx_adapter->use_queue_event_buf && dev_info->rx_queue) {
3131 
3132 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
3133 						j++) {
3134 				queue_info = &dev_info->rx_queue[j];
3135 				if (!queue_info->queue_enabled)
3136 					continue;
3137 				rxa_queue_stats_reset(queue_info);
3138 			}
3139 		}
3140 
3141 		if (dev_info->internal_event_port == 0 ||
3142 			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
3143 			continue;
3144 		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
3145 							&rte_eth_devices[i]);
3146 	}
3147 
3148 	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
3149 
3150 	return 0;
3151 }
3152 
3153 int
3154 rte_event_eth_rx_adapter_queue_stats_reset(uint8_t id,
3155 		uint16_t eth_dev_id,
3156 		uint16_t rx_queue_id)
3157 {
3158 	struct event_eth_rx_adapter *rx_adapter;
3159 	struct eth_device_info *dev_info;
3160 	struct eth_rx_queue_info *queue_info;
3161 	struct rte_eventdev *dev;
3162 
3163 	rte_eventdev_trace_eth_rx_adapter_queue_stats_reset(id, eth_dev_id,
3164 							    rx_queue_id);
3165 
3166 	if (rxa_memzone_lookup())
3167 		return -ENOMEM;
3168 
3169 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3170 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3171 
3172 	rx_adapter = rxa_id_to_adapter(id);
3173 	if (rx_adapter == NULL)
3174 		return -EINVAL;
3175 
3176 	if (!rx_adapter->use_queue_event_buf)
3177 		return -EINVAL;
3178 
3179 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3180 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
3181 		return -EINVAL;
3182 	}
3183 
3184 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3185 
3186 	if (dev_info->rx_queue == NULL ||
3187 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3188 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3189 		return -EINVAL;
3190 	}
3191 
3192 	if (dev_info->internal_event_port == 0) {
3193 		queue_info = &dev_info->rx_queue[rx_queue_id];
3194 		rxa_queue_stats_reset(queue_info);
3195 	}
3196 
3197 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3198 	if (dev->dev_ops->eth_rx_adapter_queue_stats_reset != NULL) {
3199 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_reset)(dev,
3200 						&rte_eth_devices[eth_dev_id],
3201 						rx_queue_id);
3202 	}
3203 
3204 	return 0;
3205 }
3206 
3207 int
3208 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
3209 {
3210 	struct event_eth_rx_adapter *rx_adapter;
3211 
3212 	if (rxa_memzone_lookup())
3213 		return -ENOMEM;
3214 
3215 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3216 
3217 	rx_adapter = rxa_id_to_adapter(id);
3218 	if (rx_adapter == NULL || service_id == NULL)
3219 		return -EINVAL;
3220 
3221 	if (rx_adapter->service_inited)
3222 		*service_id = rx_adapter->service_id;
3223 
3224 	rte_eventdev_trace_eth_rx_adapter_service_id_get(id, *service_id);
3225 
3226 	return rx_adapter->service_inited ? 0 : -ESRCH;
3227 }
3228 
3229 int
3230 rte_event_eth_rx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
3231 {
3232 	struct event_eth_rx_adapter *rx_adapter;
3233 
3234 	if (rxa_memzone_lookup())
3235 		return -ENOMEM;
3236 
3237 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3238 
3239 	rx_adapter = rxa_id_to_adapter(id);
3240 	if (rx_adapter == NULL || event_port_id == NULL)
3241 		return -EINVAL;
3242 
3243 	if (rx_adapter->service_inited)
3244 		*event_port_id = rx_adapter->event_port_id;
3245 
3246 	rte_eventdev_trace_eth_rx_adapter_event_port_get(id, *event_port_id);
3247 
3248 	return rx_adapter->service_inited ? 0 : -ESRCH;
3249 }
3250 
3251 int
3252 rte_event_eth_rx_adapter_cb_register(uint8_t id,
3253 					uint16_t eth_dev_id,
3254 					rte_event_eth_rx_adapter_cb_fn cb_fn,
3255 					void *cb_arg)
3256 {
3257 	struct event_eth_rx_adapter *rx_adapter;
3258 	struct eth_device_info *dev_info;
3259 	uint32_t cap;
3260 	int ret;
3261 
3262 	rte_eventdev_trace_eth_rx_adapter_cb_register(id, eth_dev_id, cb_fn,
3263 						      cb_arg);
3264 
3265 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3266 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3267 
3268 	rx_adapter = rxa_id_to_adapter(id);
3269 	if (rx_adapter == NULL)
3270 		return -EINVAL;
3271 
3272 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3273 	if (dev_info->rx_queue == NULL)
3274 		return -EINVAL;
3275 
3276 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
3277 						eth_dev_id,
3278 						&cap);
3279 	if (ret) {
3280 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
3281 			" eth port %" PRIu16, id, eth_dev_id);
3282 		return ret;
3283 	}
3284 
3285 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
3286 		RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
3287 				PRIu16, eth_dev_id);
3288 		return -EINVAL;
3289 	}
3290 
3291 	rte_spinlock_lock(&rx_adapter->rx_lock);
3292 	dev_info->cb_fn = cb_fn;
3293 	dev_info->cb_arg = cb_arg;
3294 	rte_spinlock_unlock(&rx_adapter->rx_lock);
3295 
3296 	return 0;
3297 }
3298 
3299 int
3300 rte_event_eth_rx_adapter_queue_conf_get(uint8_t id,
3301 			uint16_t eth_dev_id,
3302 			uint16_t rx_queue_id,
3303 			struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
3304 {
3305 #define TICK2NSEC(_ticks, _freq) (((_ticks) * (1E9)) / (_freq))
3306 	struct rte_eventdev *dev;
3307 	struct event_eth_rx_adapter *rx_adapter;
3308 	struct eth_device_info *dev_info;
3309 	struct eth_rx_queue_info *queue_info;
3310 	int ret;
3311 
3312 	rte_eventdev_trace_eth_rx_adapter_queue_conf_get(id, eth_dev_id,
3313 							 rx_queue_id, queue_conf);
3314 
3315 	if (rxa_memzone_lookup())
3316 		return -ENOMEM;
3317 
3318 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3319 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3320 
3321 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3322 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3323 		return -EINVAL;
3324 	}
3325 
3326 	if (queue_conf == NULL) {
3327 		RTE_EDEV_LOG_ERR("Rx queue conf struct cannot be NULL");
3328 		return -EINVAL;
3329 	}
3330 
3331 	rx_adapter = rxa_id_to_adapter(id);
3332 	if (rx_adapter == NULL)
3333 		return -EINVAL;
3334 
3335 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3336 	if (dev_info->rx_queue == NULL ||
3337 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3338 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3339 		return -EINVAL;
3340 	}
3341 
3342 	queue_info = &dev_info->rx_queue[rx_queue_id];
3343 
3344 	memset(queue_conf, 0, sizeof(*queue_conf));
3345 	queue_conf->rx_queue_flags = 0;
3346 	if (queue_info->flow_id_mask != 0)
3347 		queue_conf->rx_queue_flags |=
3348 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID;
3349 	queue_conf->servicing_weight = queue_info->wt;
3350 
3351 	queue_conf->ev.event = queue_info->event;
3352 
3353 	queue_conf->vector_sz = queue_info->vector_data.max_vector_count;
3354 	queue_conf->vector_mp = queue_info->vector_data.vector_pool;
3355 	/* needs to be converted from ticks to ns */
3356 	queue_conf->vector_timeout_ns = TICK2NSEC(
3357 		queue_info->vector_data.vector_timeout_ticks, rte_get_timer_hz());
3358 
3359 	if (queue_info->event_buf != NULL)
3360 		queue_conf->event_buf_size = queue_info->event_buf->events_size;
3361 	else
3362 		queue_conf->event_buf_size = 0;
3363 
3364 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3365 	if (dev->dev_ops->eth_rx_adapter_queue_conf_get != NULL) {
3366 		ret = (*dev->dev_ops->eth_rx_adapter_queue_conf_get)(dev,
3367 						&rte_eth_devices[eth_dev_id],
3368 						rx_queue_id,
3369 						queue_conf);
3370 		return ret;
3371 	}
3372 
3373 	return 0;
3374 }
3375 
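/* Return non-zero if the given Rx queue has been added to this adapter */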
3376 static int
3377 rxa_is_queue_added(struct event_eth_rx_adapter *rx_adapter,
3378 		   uint16_t eth_dev_id,
3379 		   uint16_t rx_queue_id)
3380 {
3381 	struct eth_device_info *dev_info;
3382 	struct eth_rx_queue_info *queue_info;
3383 
3384 	if (!rx_adapter->eth_devices)
3385 		return 0;
3386 
3387 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3388 	if (!dev_info || !dev_info->rx_queue)
3389 		return 0;
3390 
3391 	queue_info = &dev_info->rx_queue[rx_queue_id];
3392 
3393 	return queue_info && queue_info->queue_enabled;
3394 }
3395 
3396 #define rxa_evdev(rx_adapter) (&rte_eventdevs[(rx_adapter)->eventdev_id])
3397 
3398 #define rxa_dev_instance_get(rx_adapter) \
3399 		rxa_evdev((rx_adapter))->dev_ops->eth_rx_adapter_instance_get
3400 
3401 int
3402 rte_event_eth_rx_adapter_instance_get(uint16_t eth_dev_id,
3403 				      uint16_t rx_queue_id,
3404 				      uint8_t *rxa_inst_id)
3405 {
3406 	uint8_t id;
3407 	int ret = -EINVAL;
3408 	uint32_t caps;
3409 	struct event_eth_rx_adapter *rx_adapter;
3410 
3411 	if (rxa_memzone_lookup())
3412 		return -ENOMEM;
3413 
3414 	if (eth_dev_id >= rte_eth_dev_count_avail()) {
3415 		RTE_EDEV_LOG_ERR("Invalid ethernet port id %u", eth_dev_id);
3416 		return -EINVAL;
3417 	}
3418 
3419 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3420 		RTE_EDEV_LOG_ERR("Invalid Rx queue %u", rx_queue_id);
3421 		return -EINVAL;
3422 	}
3423 
3424 	if (rxa_inst_id == NULL) {
3425 		RTE_EDEV_LOG_ERR("rxa_inst_id cannot be NULL");
3426 		return -EINVAL;
3427 	}
3428 
3429 	/* Iterate through all adapter instances */
3430 	for (id = 0; id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE; id++) {
3431 		rx_adapter = rxa_id_to_adapter(id);
3432 		if (!rx_adapter)
3433 			continue;
3434 
3435 		if (rxa_is_queue_added(rx_adapter, eth_dev_id, rx_queue_id)) {
3436 			*rxa_inst_id = rx_adapter->id;
3437 			ret = 0;
3438 		}
3439 
3440 		/* Rx adapter internally maintains queue information
3441 		 * for both internal port and DPDK service port.
3442 		 * The eventdev PMD callback is called only for future proofing
3443 		 * and overrides the above return value if defined.
3444 		 */
3445 		caps = 0;
3446 		if (!rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
3447 						      eth_dev_id,
3448 						      &caps)) {
3449 			if (caps & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT &&
3450 			    rxa_dev_instance_get(rx_adapter))
3451 				ret = rxa_dev_instance_get(rx_adapter)(eth_dev_id, rx_queue_id,
3452 								       rxa_inst_id);
3453 		}
3454 
3455 		/* return if entry found */
3456 		if (ret == 0) {
3457 			rte_eventdev_trace_eth_rx_adapter_instance_get(eth_dev_id, rx_queue_id,
3458 								       *rxa_inst_id);
3459 			return ret;
3460 		}
3461 	}
3462 
3463 	return -EINVAL;
3464 }
3465 
3466 static int
3467 rxa_caps_check(struct event_eth_rx_adapter *rxa)
3468 {
3469 	uint16_t eth_dev_id;
3470 	uint32_t caps = 0;
3471 	int ret;
3472 
3473 	if (!rxa->nb_queues)
3474 		return -EINVAL;
3475 
3476 	/* The eth_dev used is always of the same type.
3477 	 * Hence eth_dev_id is taken from the first entry of the poll array.
3478 	 */
3479 	eth_dev_id = rxa->eth_rx_poll[0].eth_dev_id;
3480 	ret = rte_event_eth_rx_adapter_caps_get(rxa->eventdev_id,
3481 						eth_dev_id,
3482 						&caps);
3483 	if (ret) {
3484 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
3485 			" eth port %" PRIu16, rxa->eventdev_id, eth_dev_id);
3486 		return ret;
3487 	}
3488 
3489 	if (caps & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)
3490 		return -ENOTSUP;
3491 
3492 	return 0;
3493 }
3494 
3495 int
3496 rte_event_eth_rx_adapter_runtime_params_init(
3497 		struct rte_event_eth_rx_adapter_runtime_params *params)
3498 {
3499 	if (params == NULL)
3500 		return -EINVAL;
3501 
3502 	memset(params, 0, sizeof(struct rte_event_eth_rx_adapter_runtime_params));
3503 	params->max_nb_rx = RXA_NB_RX_WORK_DEFAULT;
3504 
3505 	return 0;
3506 }
3507 
3508 int
3509 rte_event_eth_rx_adapter_runtime_params_set(uint8_t id,
3510 		struct rte_event_eth_rx_adapter_runtime_params *params)
3511 {
3512 	struct event_eth_rx_adapter *rxa;
3513 	int ret;
3514 
3515 	if (params == NULL)
3516 		return -EINVAL;
3517 
3518 	if (rxa_memzone_lookup())
3519 		return -ENOMEM;
3520 
3521 	rxa = rxa_id_to_adapter(id);
3522 	if (rxa == NULL)
3523 		return -EINVAL;
3524 
3525 	ret = rxa_caps_check(rxa);
3526 	if (ret)
3527 		return ret;
3528 
3529 	rte_spinlock_lock(&rxa->rx_lock);
3530 	rxa->max_nb_rx = params->max_nb_rx;
3531 	rte_spinlock_unlock(&rxa->rx_lock);
3532 
3533 	return 0;
3534 }
3535 
3536 int
3537 rte_event_eth_rx_adapter_runtime_params_get(uint8_t id,
3538 		struct rte_event_eth_rx_adapter_runtime_params *params)
3539 {
3540 	struct event_eth_rx_adapter *rxa;
3541 	int ret;
3542 
3543 	if (params == NULL)
3544 		return -EINVAL;
3545 
3546 	if (rxa_memzone_lookup())
3547 		return -ENOMEM;
3548 
3549 	rxa = rxa_id_to_adapter(id);
3550 	if (rxa == NULL)
3551 		return -EINVAL;
3552 
3553 	ret = rxa_caps_check(rxa);
3554 	if (ret)
3555 		return ret;
3556 
3557 	params->max_nb_rx = rxa->max_nb_rx;
3558 
3559 	return 0;
3560 }
3561 
3562 /* RX-adapter telemetry callbacks */
3563 #define RXA_ADD_DICT(stats, s) rte_tel_data_add_dict_uint(d, #s, stats.s)
3564 
3565 static int
3566 handle_rxa_stats(const char *cmd __rte_unused,
3567 		 const char *params,
3568 		 struct rte_tel_data *d)
3569 {
3570 	uint8_t rx_adapter_id;
3571 	struct rte_event_eth_rx_adapter_stats rx_adptr_stats;
3572 
3573 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3574 		return -1;
3575 
3576 	/* Get Rx adapter ID from parameter string */
3577 	rx_adapter_id = atoi(params);
3578 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3579 
3580 	/* Get Rx adapter stats */
3581 	if (rte_event_eth_rx_adapter_stats_get(rx_adapter_id,
3582 					       &rx_adptr_stats)) {
3583 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter stats");
3584 		return -1;
3585 	}
3586 
3587 	rte_tel_data_start_dict(d);
3588 	rte_tel_data_add_dict_uint(d, "rx_adapter_id", rx_adapter_id);
3589 	RXA_ADD_DICT(rx_adptr_stats, rx_packets);
3590 	RXA_ADD_DICT(rx_adptr_stats, rx_poll_count);
3591 	RXA_ADD_DICT(rx_adptr_stats, rx_dropped);
3592 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_retry);
3593 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_count);
3594 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_size);
3595 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_count);
3596 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_start_ts);
3597 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_block_cycles);
3598 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_end_ts);
3599 	RXA_ADD_DICT(rx_adptr_stats, rx_intr_packets);
3600 
3601 	return 0;
3602 }
3603 
3604 static int
3605 handle_rxa_stats_reset(const char *cmd __rte_unused,
3606 		       const char *params,
3607 		       struct rte_tel_data *d __rte_unused)
3608 {
3609 	uint8_t rx_adapter_id;
3610 
3611 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3612 		return -1;
3613 
3614 	/* Get Rx adapter ID from parameter string */
3615 	rx_adapter_id = atoi(params);
3616 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3617 
3618 	/* Reset Rx adapter stats */
3619 	if (rte_event_eth_rx_adapter_stats_reset(rx_adapter_id)) {
3620 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter stats");
3621 		return -1;
3622 	}
3623 
3624 	return 0;
3625 }
3626 
3627 static int
3628 handle_rxa_get_queue_conf(const char *cmd __rte_unused,
3629 			  const char *params,
3630 			  struct rte_tel_data *d)
3631 {
3632 	uint8_t rx_adapter_id;
3633 	uint16_t rx_queue_id;
3634 	int eth_dev_id, ret = -1;
3635 	char *token, *l_params;
3636 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
3637 
3638 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3639 		return -1;
3640 
3641 	/* Get Rx adapter ID from parameter string */
3642 	l_params = strdup(params);
3643 	if (l_params == NULL)
3644 		return -ENOMEM;
3645 	token = strtok(l_params, ",");
3646 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3647 	rx_adapter_id = strtoul(token, NULL, 10);
3648 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);
3649 
3650 	token = strtok(NULL, ",");
3651 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3652 
3653 	/* Get device ID from parameter string */
3654 	eth_dev_id = strtoul(token, NULL, 10);
3655 	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3656 
3657 	token = strtok(NULL, ",");
3658 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3659 
3660 	/* Get Rx queue ID from parameter string */
3661 	rx_queue_id = strtoul(token, NULL, 10);
3662 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3663 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3664 		ret = -EINVAL;
3665 		goto error;
3666 	}
3667 
3668 	token = strtok(NULL, "\0");
3669 	if (token != NULL)
3670 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3671 				 " telemetry command, ignoring");
3672 	/* Parsing parameter finished */
3673 	free(l_params);
3674 
3675 	if (rte_event_eth_rx_adapter_queue_conf_get(rx_adapter_id, eth_dev_id,
3676 						    rx_queue_id, &queue_conf)) {
3677 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue config");
3678 		return -1;
3679 	}
3680 
3681 	rte_tel_data_start_dict(d);
3682 	rte_tel_data_add_dict_uint(d, "rx_adapter_id", rx_adapter_id);
3683 	rte_tel_data_add_dict_uint(d, "eth_dev_id", eth_dev_id);
3684 	rte_tel_data_add_dict_uint(d, "rx_queue_id", rx_queue_id);
3685 	RXA_ADD_DICT(queue_conf, rx_queue_flags);
3686 	RXA_ADD_DICT(queue_conf, servicing_weight);
3687 	RXA_ADD_DICT(queue_conf.ev, queue_id);
3688 	RXA_ADD_DICT(queue_conf.ev, sched_type);
3689 	RXA_ADD_DICT(queue_conf.ev, priority);
3690 	RXA_ADD_DICT(queue_conf.ev, flow_id);
3691 
3692 	return 0;
3693 
3694 error:
3695 	free(l_params);
3696 	return ret;
3697 }
3698 
static int
handle_rxa_get_queue_stats(const char *cmd __rte_unused,
			   const char *params,
			   struct rte_tel_data *d)
{
	uint8_t rx_adapter_id;
	uint16_t rx_queue_id;
	int eth_dev_id, ret = -1;
	char *token, *l_params;
	struct rte_event_eth_rx_adapter_queue_stats q_stats;

	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
		return -1;

	/* Get Rx adapter ID from parameter string */
	l_params = strdup(params);
	if (l_params == NULL)
		return -ENOMEM;
	token = strtok(l_params, ",");
	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
	rx_adapter_id = strtoul(token, NULL, 10);
	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);

	token = strtok(NULL, ",");
	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);

	/* Get device ID from parameter string */
	eth_dev_id = strtoul(token, NULL, 10);
	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);

	token = strtok(NULL, ",");
	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);

	/* Get Rx queue ID from parameter string */
	rx_queue_id = strtoul(token, NULL, 10);
	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
		ret = -EINVAL;
		goto error;
	}

	token = strtok(NULL, "\0");
	if (token != NULL)
		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
				 " telemetry command, ignoring");
	/* Finished parsing parameters */
	free(l_params);

	if (rte_event_eth_rx_adapter_queue_stats_get(rx_adapter_id, eth_dev_id,
						    rx_queue_id, &q_stats)) {
		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue stats");
		return -1;
	}

	rte_tel_data_start_dict(d);
	rte_tel_data_add_dict_uint(d, "rx_adapter_id", rx_adapter_id);
	rte_tel_data_add_dict_uint(d, "eth_dev_id", eth_dev_id);
	rte_tel_data_add_dict_uint(d, "rx_queue_id", rx_queue_id);
	RXA_ADD_DICT(q_stats, rx_event_buf_count);
	RXA_ADD_DICT(q_stats, rx_event_buf_size);
	RXA_ADD_DICT(q_stats, rx_poll_count);
	RXA_ADD_DICT(q_stats, rx_packets);
	RXA_ADD_DICT(q_stats, rx_dropped);

	return 0;

error:
	free(l_params);
	return ret;
}

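/*
 * Telemetry callback for "/eventdev/rxa_queue_stats_reset".
 * Expects the parameter string "rxa_id,dev_id,queue_id" and resets the
 * per-queue statistics via rte_event_eth_rx_adapter_queue_stats_reset().
 */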
static int
handle_rxa_queue_stats_reset(const char *cmd __rte_unused,
			     const char *params,
			     struct rte_tel_data *d __rte_unused)
{
	uint8_t rx_adapter_id;
	uint16_t rx_queue_id;
	int eth_dev_id, ret = -1;
	char *token, *l_params;

	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
		return -1;

	/* Get Rx adapter ID from parameter string */
	l_params = strdup(params);
	if (l_params == NULL)
		return -ENOMEM;
	token = strtok(l_params, ",");
	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
	rx_adapter_id = strtoul(token, NULL, 10);
	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);

	token = strtok(NULL, ",");
	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);

	/* Get device ID from parameter string */
	eth_dev_id = strtoul(token, NULL, 10);
	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);

	token = strtok(NULL, ",");
	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);

	/* Get Rx queue ID from parameter string */
	rx_queue_id = strtoul(token, NULL, 10);
	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
		ret = -EINVAL;
		goto error;
	}

	token = strtok(NULL, "\0");
	if (token != NULL)
		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
				 " telemetry command, ignoring");
	/* Finished parsing parameters */
	free(l_params);

	if (rte_event_eth_rx_adapter_queue_stats_reset(rx_adapter_id,
						       eth_dev_id,
						       rx_queue_id)) {
		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter queue stats");
		return -1;
	}

	return 0;

error:
	free(l_params);
	return ret;
}

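/*
 * Telemetry callback for "/eventdev/rxa_rxq_instance_get".
 * Expects the parameter string "dev_id,queue_id" and reports the Rx adapter
 * instance ID servicing that Rx queue, as returned by
 * rte_event_eth_rx_adapter_instance_get().
 */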
static int
handle_rxa_instance_get(const char *cmd __rte_unused,
			const char *params,
			struct rte_tel_data *d)
{
	uint8_t instance_id;
	uint16_t rx_queue_id;
	int eth_dev_id, ret = -1;
	char *token, *l_params;

	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
		return -1;

	l_params = strdup(params);
	if (l_params == NULL)
		return -ENOMEM;
	token = strtok(l_params, ",");
	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);

	/* Get device ID from parameter string */
	eth_dev_id = strtoul(token, NULL, 10);
	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);

	token = strtok(NULL, ",");
	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);

	/* Get Rx queue ID from parameter string */
	rx_queue_id = strtoul(token, NULL, 10);
	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
		ret = -EINVAL;
		goto error;
	}

	token = strtok(NULL, "\0");
	if (token != NULL)
		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
				 " telemetry command, ignoring");

	/* Finished parsing parameters */
	free(l_params);

	if (rte_event_eth_rx_adapter_instance_get(eth_dev_id,
						  rx_queue_id,
						  &instance_id)) {
3876 		RTE_EDEV_LOG_ERR("Failed to get RX adapter instance ID "
3877 				 " for rx_queue_id = %d", rx_queue_id);
		return -1;
	}

	rte_tel_data_start_dict(d);
	rte_tel_data_add_dict_uint(d, "eth_dev_id", eth_dev_id);
	rte_tel_data_add_dict_uint(d, "rx_queue_id", rx_queue_id);
	rte_tel_data_add_dict_uint(d, "rxa_instance_id", instance_id);

	return 0;

error:
	free(l_params);
	return ret;
}

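/*
 * Constructor: register the Rx adapter telemetry endpoints with the
 * telemetry library at startup.
 */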
RTE_INIT(rxa_init_telemetry)
{
	rte_telemetry_register_cmd("/eventdev/rxa_stats",
		handle_rxa_stats,
		"Returns Rx adapter stats. Parameter: rxa_id");

	rte_telemetry_register_cmd("/eventdev/rxa_stats_reset",
		handle_rxa_stats_reset,
		"Reset Rx adapter stats. Parameter: rxa_id");

	rte_telemetry_register_cmd("/eventdev/rxa_queue_conf",
		handle_rxa_get_queue_conf,
		"Returns Rx queue config. Parameter: rxa_id, dev_id, queue_id");

	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats",
		handle_rxa_get_queue_stats,
		"Returns Rx queue stats. Parameter: rxa_id, dev_id, queue_id");

	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats_reset",
		handle_rxa_queue_stats_reset,
		"Reset Rx queue stats. Parameter: rxa_id, dev_id, queue_id");

	rte_telemetry_register_cmd("/eventdev/rxa_rxq_instance_get",
		handle_rxa_instance_get,
		"Returns Rx adapter instance id. Parameter: dev_id, queue_id");
}
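
/*
 * Usage sketch (illustrative only): the commands registered above can be
 * issued from the dpdk-telemetry.py client shipped with DPDK. Assuming
 * Rx adapter 0, ethdev port 0 and Rx queue 0:
 *
 *   --> /eventdev/rxa_stats,0
 *   --> /eventdev/rxa_queue_stats,0,0,0
 *   --> /eventdev/rxa_rxq_instance_get,0,0
 *
 * Each command returns a JSON dictionary built by the handlers above; the
 * comma-separated values are the parameter string parsed by those handlers.
 */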