1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #include <ctype.h>
6 #include <stdlib.h>
7 #if defined(LINUX)
8 #include <sys/epoll.h>
9 #endif
10 #include <unistd.h>
11 
12 #include <rte_cycles.h>
13 #include <rte_common.h>
14 #include <dev_driver.h>
15 #include <rte_errno.h>
16 #include <ethdev_driver.h>
17 #include <rte_log.h>
18 #include <rte_malloc.h>
19 #include <rte_service_component.h>
20 #include <rte_thash.h>
21 #include <rte_interrupts.h>
22 #include <rte_mbuf_dyn.h>
23 #include <rte_telemetry.h>
24 
25 #include "rte_eventdev.h"
26 #include "eventdev_pmd.h"
27 #include "eventdev_trace.h"
28 #include "rte_event_eth_rx_adapter.h"
29 
30 #define BATCH_SIZE		32
31 #define BLOCK_CNT_THRESHOLD	10
32 #define ETH_EVENT_BUFFER_SIZE	(6*BATCH_SIZE)
33 #define MAX_VECTOR_SIZE		1024
34 #define MIN_VECTOR_SIZE		4
35 #define MAX_VECTOR_NS		1E9
36 #define MIN_VECTOR_NS		1E5
37 
38 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
39 #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
40 
41 #define RSS_KEY_SIZE	40
42 /* value written to intr thread pipe to signal thread exit */
43 #define ETH_BRIDGE_INTR_THREAD_EXIT	1
44 /* Sentinel value to detect an uninitialized file descriptor */
45 #define INIT_FD		-1
46 
47 #define RXA_ADAPTER_ARRAY "rte_event_eth_rx_adapter_array"
48 
49 /*
50  * Used to store port and queue ID of interrupting Rx queue
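 * The pair is packed into a void * so it can be passed through the epoll
 * user data and the interrupt ring without extra allocation.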
51  */
52 union queue_data {
53 	RTE_STD_C11
54 	void *ptr;
55 	struct {
56 		uint16_t port;
57 		uint16_t queue;
58 	};
59 };
60 
61 /*
62  * There is an instance of this struct per polled Rx queue added to the
63  * adapter
64  */
65 struct eth_rx_poll_entry {
66 	/* Eth port to poll */
67 	uint16_t eth_dev_id;
68 	/* Eth rx queue to poll */
69 	uint16_t eth_rx_qid;
70 };
71 
72 struct eth_rx_vector_data {
73 	TAILQ_ENTRY(eth_rx_vector_data) next;
74 	uint16_t port;
75 	uint16_t queue;
76 	uint16_t max_vector_count;
77 	uint64_t event;
78 	uint64_t ts;
79 	uint64_t vector_timeout_ticks;
80 	struct rte_mempool *vector_pool;
81 	struct rte_event_vector *vector_ev;
82 } __rte_cache_aligned;
83 
84 TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
85 
86 /* Per adapter instance; one per Rx queue when use_queue_event_buf is set */
87 struct eth_event_enqueue_buffer {
88 	/* Count of events in this buffer */
89 	uint16_t count;
90 	/* Array of events in this buffer */
91 	struct rte_event *events;
92 	/* size of event buffer */
93 	uint16_t events_size;
94 	/* Event enqueue happens from head */
95 	uint16_t head;
96 	/* New packets from rte_eth_rx_burst are enqueued at the tail */
97 	uint16_t tail;
98 	/* last element in the buffer before rollover */
99 	uint16_t last;
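	/* ~0 once the buffer has wrapped (i.e. last is valid), otherwise 0 */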
100 	uint16_t last_mask;
101 };
102 
103 struct event_eth_rx_adapter {
104 	/* RSS key */
105 	uint8_t rss_key_be[RSS_KEY_SIZE];
106 	/* Event device identifier */
107 	uint8_t eventdev_id;
108 	/* Event port identifier */
109 	uint8_t event_port_id;
110 	/* Flag indicating per rxq event buffer */
111 	bool use_queue_event_buf;
112 	/* Per ethernet device structure */
113 	struct eth_device_info *eth_devices;
114 	/* Lock to serialize config updates with service function */
115 	rte_spinlock_t rx_lock;
116 	/* Max mbufs processed in any service function invocation */
117 	uint32_t max_nb_rx;
118 	/* Receive queues that need to be polled */
119 	struct eth_rx_poll_entry *eth_rx_poll;
120 	/* Size of the eth_rx_poll array */
121 	uint16_t num_rx_polled;
122 	/* Weighted round robin schedule */
123 	uint32_t *wrr_sched;
124 	/* wrr_sched[] size */
125 	uint32_t wrr_len;
126 	/* Next entry in wrr_sched[] to begin polling */
127 	uint32_t wrr_pos;
128 	/* Event burst buffer */
129 	struct eth_event_enqueue_buffer event_enqueue_buffer;
130 	/* Vector enable flag */
131 	uint8_t ena_vector;
132 	/* Timestamp of previous vector expiry list traversal */
133 	uint64_t prev_expiry_ts;
134 	/* Minimum ticks to wait before traversing expiry list */
135 	uint64_t vector_tmo_ticks;
136 	/* vector list */
137 	struct eth_rx_vector_data_list vector_list;
138 	/* Per adapter stats */
139 	struct rte_event_eth_rx_adapter_stats stats;
140 	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
141 	uint16_t enq_block_count;
142 	/* Block start ts */
143 	uint64_t rx_enq_block_start_ts;
144 	/* epoll fd used to wait for Rx interrupts */
145 	int epd;
146 	/* Number of interrupt-driven Rx queues */
147 	uint32_t num_rx_intr;
148 	/* Used to send <dev id, queue id> of interrupting Rx queues from
149 	 * the interrupt thread to the Rx thread
150 	 */
151 	struct rte_ring *intr_ring;
152 	/* Rx Queue data (dev id, queue id) for the last non-empty
153 	 * queue polled
154 	 */
155 	union queue_data qd;
156 	/* queue_data is valid */
157 	int qd_valid;
158 	/* Interrupt ring lock, synchronizes Rx thread
159 	 * and interrupt thread
160 	 */
161 	rte_spinlock_t intr_ring_lock;
162 	/* Event array passed to rte_epoll_wait */
163 	struct rte_epoll_event *epoll_events;
164 	/* Count of interrupt vectors in use */
165 	uint32_t num_intr_vec;
166 	/* Thread blocked on Rx interrupts */
167 	pthread_t rx_intr_thread;
168 	/* Configuration callback for rte_service configuration */
169 	rte_event_eth_rx_adapter_conf_cb conf_cb;
170 	/* Configuration callback argument */
171 	void *conf_arg;
172 	/* Set if the default conf callback is being used */
173 	int default_cb_arg;
174 	/* Service initialization state */
175 	uint8_t service_inited;
176 	/* Total count of Rx queues in adapter */
177 	uint32_t nb_queues;
178 	/* Memory allocation name */
179 	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
180 	/* Socket identifier cached from eventdev */
181 	int socket_id;
182 	/* Per adapter EAL service */
183 	uint32_t service_id;
184 	/* Adapter started flag */
185 	uint8_t rxa_started;
186 	/* Adapter ID */
187 	uint8_t id;
188 } __rte_cache_aligned;
189 
190 /* Per eth device */
191 struct eth_device_info {
192 	struct rte_eth_dev *dev;
193 	struct eth_rx_queue_info *rx_queue;
194 	/* Rx callback */
195 	rte_event_eth_rx_adapter_cb_fn cb_fn;
196 	/* Rx callback argument */
197 	void *cb_arg;
198 	/* Set if ethdev->eventdev packet transfer uses a
199 	 * hardware mechanism
200 	 */
201 	uint8_t internal_event_port;
202 	/* Set if the adapter is processing rx queues for
203 	 * this eth device and packet processing has been
204 	 * started; this lets the code know whether the PMD
205 	 * rx_adapter_stop callback needs to be invoked
206 	 */
207 	uint8_t dev_rx_started;
208 	/* Number of queues added for this device */
209 	uint16_t nb_dev_queues;
210 	/* Number of poll based queues
211 	 * If nb_rx_poll > 0, the start callback will
212 	 * be invoked if not already invoked
213 	 */
214 	uint16_t nb_rx_poll;
215 	/* Number of interrupt based queues
216 	 * If nb_rx_intr > 0, the start callback will
217 	 * be invoked if not already invoked.
218 	 */
219 	uint16_t nb_rx_intr;
220 	/* Number of queues that use the shared interrupt */
221 	uint16_t nb_shared_intr;
222 	/* sum(wrr(q)) for all queues within the device
223 	 * useful when deleting all device queues
224 	 */
225 	uint32_t wrr_len;
226 	/* Intr based queue index to start polling from; used when
227 	 * the number of shared interrupts is non-zero
228 	 */
229 	uint16_t next_q_idx;
230 	/* Intr based queue indices */
231 	uint16_t *intr_queue;
232 	/* Set if the device generates a per Rx queue interrupt
233 	 * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
234 	 */
235 	int multi_intr_cap;
236 	/* shared interrupt enabled */
237 	int shared_intr_enabled;
238 };
239 
240 /* Per Rx queue */
241 struct eth_rx_queue_info {
242 	int queue_enabled;	/* True if added */
243 	int intr_enabled;
244 	uint8_t ena_vector;
245 	uint16_t wt;		/* Polling weight */
246 	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
247 	uint64_t event;
248 	struct eth_rx_vector_data vector_data;
249 	struct eth_event_enqueue_buffer *event_buf;
250 	/* use adapter stats struct for queue level stats,
251 	 * as the same stats need to be updated for the adapter and the queue
252 	 */
253 	struct rte_event_eth_rx_adapter_stats *stats;
254 };
255 
256 static struct event_eth_rx_adapter **event_eth_rx_adapter;
257 
258 /* Enable dynamic timestamp field in mbuf */
259 static uint64_t event_eth_rx_timestamp_dynflag;
260 static int event_eth_rx_timestamp_dynfield_offset = -1;
261 
262 static inline rte_mbuf_timestamp_t *
263 rxa_timestamp_dynfield(struct rte_mbuf *mbuf)
264 {
265 	return RTE_MBUF_DYNFIELD(mbuf,
266 		event_eth_rx_timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
267 }
268 
269 static inline int
270 rxa_validate_id(uint8_t id)
271 {
272 	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
273 }
274 
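/* Return the event enqueue buffer and stats to use for an Rx queue: the
 * per-queue buffer when use_queue_event_buf is set, otherwise the
 * per-adapter buffer.
 */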
275 static inline struct eth_event_enqueue_buffer *
276 rxa_event_buf_get(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
277 		  uint16_t rx_queue_id,
278 		  struct rte_event_eth_rx_adapter_stats **stats)
279 {
280 	if (rx_adapter->use_queue_event_buf) {
281 		struct eth_device_info *dev_info =
282 			&rx_adapter->eth_devices[eth_dev_id];
283 		*stats = dev_info->rx_queue[rx_queue_id].stats;
284 		return dev_info->rx_queue[rx_queue_id].event_buf;
285 	} else {
286 		*stats = &rx_adapter->stats;
287 		return &rx_adapter->event_enqueue_buffer;
288 	}
289 }
290 
291 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
292 	if (!rxa_validate_id(id)) { \
293 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
294 		return retval; \
295 	} \
296 } while (0)
297 
298 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(id, retval) do { \
299 	if (!rxa_validate_id(id)) { \
300 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
301 		ret = retval; \
302 		goto error; \
303 	} \
304 } while (0)
305 
306 #define RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, retval) do { \
307 	if ((token) == NULL || strlen(token) == 0 || !isdigit(*token)) { \
308 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter token\n"); \
309 		ret = retval; \
310 		goto error; \
311 	} \
312 } while (0)
313 
314 #define RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(port_id, retval) do { \
315 	if (!rte_eth_dev_is_valid_port(port_id)) { \
316 		RTE_ETHDEV_LOG(ERR, "Invalid port_id=%u\n", port_id); \
317 		ret = retval; \
318 		goto error; \
319 	} \
320 } while (0)
321 
322 static inline int
323 rxa_sw_adapter_queue_count(struct event_eth_rx_adapter *rx_adapter)
324 {
325 	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
326 }
327 
328 /* Greatest common divisor */
329 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
330 {
331 	uint16_t r = a % b;
332 
333 	return r ? rxa_gcd_u16(b, r) : b;
334 }
335 
336 /* Returns the next queue in the polling sequence
337  *
338  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
339  */
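/*
 * Example (a sketch): with two polled queues of weights {2, 1}, max_wt = 2
 * and gcd = 1, successive calls return the sequence q0, q0, q1, q0, q0, q1,
 * ... so queue 0 is polled twice as often as queue 1.
 */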
340 static int
341 rxa_wrr_next(struct event_eth_rx_adapter *rx_adapter, unsigned int n, int *cw,
342 	     struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
343 	     uint16_t gcd, int prev)
344 {
345 	int i = prev;
346 	uint16_t w;
347 
348 	while (1) {
349 		uint16_t q;
350 		uint16_t d;
351 
352 		i = (i + 1) % n;
353 		if (i == 0) {
354 			*cw = *cw - gcd;
355 			if (*cw <= 0)
356 				*cw = max_wt;
357 		}
358 
359 		q = eth_rx_poll[i].eth_rx_qid;
360 		d = eth_rx_poll[i].eth_dev_id;
361 		w = rx_adapter->eth_devices[d].rx_queue[q].wt;
362 
363 		if ((int)w >= *cw)
364 			return i;
365 	}
366 }
367 
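/* Return true if the Rx queue shares an interrupt vector: either the
 * device cannot allocate an interrupt vector per queue, or the queue index
 * is beyond the per-queue vector range.
 */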
368 static inline int
369 rxa_shared_intr(struct eth_device_info *dev_info,
370 	int rx_queue_id)
371 {
372 	int multi_intr_cap;
373 
374 	if (dev_info->dev->intr_handle == NULL)
375 		return 0;
376 
377 	multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
378 	return !multi_intr_cap ||
379 		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
380 }
381 
382 static inline int
383 rxa_intr_queue(struct eth_device_info *dev_info,
384 	int rx_queue_id)
385 {
386 	struct eth_rx_queue_info *queue_info;
387 
388 	queue_info = &dev_info->rx_queue[rx_queue_id];
389 	return dev_info->rx_queue &&
390 		!dev_info->internal_event_port &&
391 		queue_info->queue_enabled && queue_info->wt == 0;
392 }
393 
394 static inline int
395 rxa_polled_queue(struct eth_device_info *dev_info,
396 	int rx_queue_id)
397 {
398 	struct eth_rx_queue_info *queue_info;
399 
400 	queue_info = &dev_info->rx_queue[rx_queue_id];
401 	return !dev_info->internal_event_port &&
402 		dev_info->rx_queue &&
403 		queue_info->queue_enabled && queue_info->wt != 0;
404 }
405 
406 /* Calculate the change in the number of interrupt vectors after an Rx queue is added/deleted */
407 static int
408 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
409 {
410 	uint16_t i;
411 	int n, s;
412 	uint16_t nbq;
413 
414 	nbq = dev_info->dev->data->nb_rx_queues;
415 	n = 0; /* non shared count */
416 	s = 0; /* shared count */
417 
418 	if (rx_queue_id == -1) {
419 		for (i = 0; i < nbq; i++) {
420 			if (!rxa_shared_intr(dev_info, i))
421 				n += add ? !rxa_intr_queue(dev_info, i) :
422 					rxa_intr_queue(dev_info, i);
423 			else
424 				s += add ? !rxa_intr_queue(dev_info, i) :
425 					rxa_intr_queue(dev_info, i);
426 		}
427 
428 		if (s > 0) {
429 			if ((add && dev_info->nb_shared_intr == 0) ||
430 				(!add && dev_info->nb_shared_intr))
431 				n += 1;
432 		}
433 	} else {
434 		if (!rxa_shared_intr(dev_info, rx_queue_id))
435 			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
436 				rxa_intr_queue(dev_info, rx_queue_id);
437 		else
438 			n = add ? !dev_info->nb_shared_intr :
439 				dev_info->nb_shared_intr == 1;
440 	}
441 
442 	return add ? n : -n;
443 }
444 
445 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
446  */
447 static void
448 rxa_calc_nb_post_intr_del(struct event_eth_rx_adapter *rx_adapter,
449 			  struct eth_device_info *dev_info, int rx_queue_id,
450 			  uint32_t *nb_rx_intr)
451 {
452 	uint32_t intr_diff;
453 
454 	if (rx_queue_id == -1)
455 		intr_diff = dev_info->nb_rx_intr;
456 	else
457 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
458 
459 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
460 }
461 
462 /* Calculate nb_rx_* after adding interrupt mode Rx queues; the newly added
463  * interrupt queues could currently be poll mode Rx queues
464  */
465 static void
466 rxa_calc_nb_post_add_intr(struct event_eth_rx_adapter *rx_adapter,
467 			  struct eth_device_info *dev_info, int rx_queue_id,
468 			  uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
469 			  uint32_t *nb_wrr)
470 {
471 	uint32_t intr_diff;
472 	uint32_t poll_diff;
473 	uint32_t wrr_len_diff;
474 
475 	if (rx_queue_id == -1) {
476 		intr_diff = dev_info->dev->data->nb_rx_queues -
477 						dev_info->nb_rx_intr;
478 		poll_diff = dev_info->nb_rx_poll;
479 		wrr_len_diff = dev_info->wrr_len;
480 	} else {
481 		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
482 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
483 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
484 					0;
485 	}
486 
487 	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
488 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
489 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
490 }
491 
492 /* Calculate size of the eth_rx_poll and wrr_sched arrays
493  * after deleting poll mode rx queues
494  */
495 static void
496 rxa_calc_nb_post_poll_del(struct event_eth_rx_adapter *rx_adapter,
497 			  struct eth_device_info *dev_info, int rx_queue_id,
498 			  uint32_t *nb_rx_poll, uint32_t *nb_wrr)
499 {
500 	uint32_t poll_diff;
501 	uint32_t wrr_len_diff;
502 
503 	if (rx_queue_id == -1) {
504 		poll_diff = dev_info->nb_rx_poll;
505 		wrr_len_diff = dev_info->wrr_len;
506 	} else {
507 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
508 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
509 					0;
510 	}
511 
512 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
513 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
514 }
515 
516 /* Calculate nb_rx_* after adding poll mode rx queues
517  */
518 static void
519 rxa_calc_nb_post_add_poll(struct event_eth_rx_adapter *rx_adapter,
520 			  struct eth_device_info *dev_info, int rx_queue_id,
521 			  uint16_t wt, uint32_t *nb_rx_poll,
522 			  uint32_t *nb_rx_intr, uint32_t *nb_wrr)
523 {
524 	uint32_t intr_diff;
525 	uint32_t poll_diff;
526 	uint32_t wrr_len_diff;
527 
528 	if (rx_queue_id == -1) {
529 		intr_diff = dev_info->nb_rx_intr;
530 		poll_diff = dev_info->dev->data->nb_rx_queues -
531 						dev_info->nb_rx_poll;
532 		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
533 				- dev_info->wrr_len;
534 	} else {
535 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
536 		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
537 		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
538 				wt - dev_info->rx_queue[rx_queue_id].wt :
539 				wt;
540 	}
541 
542 	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
543 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
544 	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
545 }
546 
547 /* Calculate nb_rx_* after adding rx_queue_id */
548 static void
549 rxa_calc_nb_post_add(struct event_eth_rx_adapter *rx_adapter,
550 		     struct eth_device_info *dev_info, int rx_queue_id,
551 		     uint16_t wt, uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
552 		     uint32_t *nb_wrr)
553 {
554 	if (wt != 0)
555 		rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
556 					wt, nb_rx_poll, nb_rx_intr, nb_wrr);
557 	else
558 		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
559 					nb_rx_poll, nb_rx_intr, nb_wrr);
560 }
561 
562 /* Calculate nb_rx_* after deleting rx_queue_id */
563 static void
564 rxa_calc_nb_post_del(struct event_eth_rx_adapter *rx_adapter,
565 		     struct eth_device_info *dev_info, int rx_queue_id,
566 		     uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
567 		     uint32_t *nb_wrr)
568 {
569 	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
570 				nb_wrr);
571 	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
572 				nb_rx_intr);
573 }
574 
575 /*
576  * Allocate the rx_poll array
577  */
578 static struct eth_rx_poll_entry *
579 rxa_alloc_poll(struct event_eth_rx_adapter *rx_adapter, uint32_t num_rx_polled)
580 {
581 	size_t len;
582 
583 	len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
584 							RTE_CACHE_LINE_SIZE);
585 	return  rte_zmalloc_socket(rx_adapter->mem_name,
586 				len,
587 				RTE_CACHE_LINE_SIZE,
588 				rx_adapter->socket_id);
589 }
590 
591 /*
592  * Allocate the WRR array
593  */
594 static uint32_t *
595 rxa_alloc_wrr(struct event_eth_rx_adapter *rx_adapter, int nb_wrr)
596 {
597 	size_t len;
598 
599 	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
600 			RTE_CACHE_LINE_SIZE);
601 	return  rte_zmalloc_socket(rx_adapter->mem_name,
602 				len,
603 				RTE_CACHE_LINE_SIZE,
604 				rx_adapter->socket_id);
605 }
606 
607 static int
608 rxa_alloc_poll_arrays(struct event_eth_rx_adapter *rx_adapter, uint32_t nb_poll,
609 		      uint32_t nb_wrr, struct eth_rx_poll_entry **rx_poll,
610 		      uint32_t **wrr_sched)
611 {
612 
613 	if (nb_poll == 0) {
614 		*rx_poll = NULL;
615 		*wrr_sched = NULL;
616 		return 0;
617 	}
618 
619 	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
620 	if (*rx_poll == NULL) {
621 		*wrr_sched = NULL;
622 		return -ENOMEM;
623 	}
624 
625 	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
626 	if (*wrr_sched == NULL) {
627 		rte_free(*rx_poll);
628 		return -ENOMEM;
629 	}
630 	return 0;
631 }
632 
633 /* Precalculate WRR polling sequence for all queues in rx_adapter */
634 static void
635 rxa_calc_wrr_sequence(struct event_eth_rx_adapter *rx_adapter,
636 		      struct eth_rx_poll_entry *rx_poll, uint32_t *rx_wrr)
637 {
638 	uint16_t d;
639 	uint16_t q;
640 	unsigned int i;
641 	int prev = -1;
642 	int cw = -1;
643 
644 	/* Initialize variables for calculation of wrr schedule */
645 	uint16_t max_wrr_pos = 0;
646 	unsigned int poll_q = 0;
647 	uint16_t max_wt = 0;
648 	uint16_t gcd = 0;
649 
650 	if (rx_poll == NULL)
651 		return;
652 
653 	/* Generate array of all queues to poll, the size of this
654 	 * array is poll_q
655 	 */
656 	RTE_ETH_FOREACH_DEV(d) {
657 		uint16_t nb_rx_queues;
658 		struct eth_device_info *dev_info =
659 				&rx_adapter->eth_devices[d];
660 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
661 		if (dev_info->rx_queue == NULL)
662 			continue;
663 		if (dev_info->internal_event_port)
664 			continue;
665 		dev_info->wrr_len = 0;
666 		for (q = 0; q < nb_rx_queues; q++) {
667 			struct eth_rx_queue_info *queue_info =
668 				&dev_info->rx_queue[q];
669 			uint16_t wt;
670 
671 			if (!rxa_polled_queue(dev_info, q))
672 				continue;
673 			wt = queue_info->wt;
674 			rx_poll[poll_q].eth_dev_id = d;
675 			rx_poll[poll_q].eth_rx_qid = q;
676 			max_wrr_pos += wt;
677 			dev_info->wrr_len += wt;
678 			max_wt = RTE_MAX(max_wt, wt);
679 			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
680 			poll_q++;
681 		}
682 	}
683 
684 	/* Generate polling sequence based on weights */
685 	prev = -1;
686 	cw = -1;
687 	for (i = 0; i < max_wrr_pos; i++) {
688 		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
689 				     rx_poll, max_wt, gcd, prev);
690 		prev = rx_wrr[i];
691 	}
692 }
693 
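/* Extract pointers to the IPv4/IPv6 header of the mbuf, if present;
 * at most one VLAN tag is handled.
 */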
694 static inline void
695 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
696 	struct rte_ipv6_hdr **ipv6_hdr)
697 {
698 	struct rte_ether_hdr *eth_hdr =
699 		rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
700 	struct rte_vlan_hdr *vlan_hdr;
701 
702 	*ipv4_hdr = NULL;
703 	*ipv6_hdr = NULL;
704 
705 	switch (eth_hdr->ether_type) {
706 	case RTE_BE16(RTE_ETHER_TYPE_IPV4):
707 		*ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
708 		break;
709 
710 	case RTE_BE16(RTE_ETHER_TYPE_IPV6):
711 		*ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
712 		break;
713 
714 	case RTE_BE16(RTE_ETHER_TYPE_VLAN):
715 		vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
716 		switch (vlan_hdr->eth_proto) {
717 		case RTE_BE16(RTE_ETHER_TYPE_IPV4):
718 			*ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
719 			break;
720 		case RTE_BE16(RTE_ETHER_TYPE_IPV6):
721 			*ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
722 			break;
723 		default:
724 			break;
725 		}
726 		break;
727 
728 	default:
729 		break;
730 	}
731 }
732 
733 /* Calculate RSS hash for IPv4/6 */
734 static inline uint32_t
735 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
736 {
737 	uint32_t input_len;
738 	void *tuple;
739 	struct rte_ipv4_tuple ipv4_tuple;
740 	struct rte_ipv6_tuple ipv6_tuple;
741 	struct rte_ipv4_hdr *ipv4_hdr;
742 	struct rte_ipv6_hdr *ipv6_hdr;
743 
744 	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
745 
746 	if (ipv4_hdr) {
747 		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
748 		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
749 		tuple = &ipv4_tuple;
750 		input_len = RTE_THASH_V4_L3_LEN;
751 	} else if (ipv6_hdr) {
752 		rte_thash_load_v6_addrs(ipv6_hdr,
753 					(union rte_thash_tuple *)&ipv6_tuple);
754 		tuple = &ipv6_tuple;
755 		input_len = RTE_THASH_V6_L3_LEN;
756 	} else
757 		return 0;
758 
759 	return rte_softrss_be(tuple, input_len, rss_key_be);
760 }
761 
762 static inline int
763 rxa_enq_blocked(struct event_eth_rx_adapter *rx_adapter)
764 {
765 	return !!rx_adapter->enq_block_count;
766 }
767 
768 static inline void
769 rxa_enq_block_start_ts(struct event_eth_rx_adapter *rx_adapter)
770 {
771 	if (rx_adapter->rx_enq_block_start_ts)
772 		return;
773 
774 	rx_adapter->enq_block_count++;
775 	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
776 		return;
777 
778 	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
779 }
780 
781 static inline void
782 rxa_enq_block_end_ts(struct event_eth_rx_adapter *rx_adapter,
783 		     struct rte_event_eth_rx_adapter_stats *stats)
784 {
785 	if (unlikely(!stats->rx_enq_start_ts))
786 		stats->rx_enq_start_ts = rte_get_tsc_cycles();
787 
788 	if (likely(!rxa_enq_blocked(rx_adapter)))
789 		return;
790 
791 	rx_adapter->enq_block_count = 0;
792 	if (rx_adapter->rx_enq_block_start_ts) {
793 		stats->rx_enq_end_ts = rte_get_tsc_cycles();
794 		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
795 		    rx_adapter->rx_enq_block_start_ts;
796 		rx_adapter->rx_enq_block_start_ts = 0;
797 	}
798 }
799 
800 /* Enqueue buffered events to event device */
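/* If the buffer has wrapped, events in [head, last) are enqueued first,
 * followed by the wrapped portion in [0, tail).
 */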
801 static inline uint16_t
802 rxa_flush_event_buffer(struct event_eth_rx_adapter *rx_adapter,
803 		       struct eth_event_enqueue_buffer *buf,
804 		       struct rte_event_eth_rx_adapter_stats *stats)
805 {
806 	uint16_t count = buf->count;
807 	uint16_t n = 0;
808 
809 	if (!count)
810 		return 0;
811 
812 	if (buf->last)
813 		count = buf->last - buf->head;
814 
815 	if (count) {
816 		n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
817 						rx_adapter->event_port_id,
818 						&buf->events[buf->head],
819 						count);
820 		if (n != count)
821 			stats->rx_enq_retry++;
822 
823 		buf->head += n;
824 	}
825 
826 	if (buf->last && n == count) {
827 		uint16_t n1;
828 
829 		n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
830 					rx_adapter->event_port_id,
831 					&buf->events[0],
832 					buf->tail);
833 
834 		if (n1 != buf->tail)
835 			stats->rx_enq_retry++;
836 
837 		buf->last = 0;
838 		buf->head = n1;
839 		buf->last_mask = 0;
840 		n += n1;
841 	}
842 
843 	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
844 		rxa_enq_block_start_ts(rx_adapter);
845 
846 	buf->count -= n;
847 	stats->rx_enq_count += n;
848 
849 	return n;
850 }
851 
852 static inline void
853 rxa_init_vector(struct event_eth_rx_adapter *rx_adapter,
854 		struct eth_rx_vector_data *vec)
855 {
856 	vec->vector_ev->nb_elem = 0;
857 	vec->vector_ev->port = vec->port;
858 	vec->vector_ev->queue = vec->queue;
859 	vec->vector_ev->attr_valid = true;
860 	vec->vector_ev->elem_offset = 0;
861 	TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
862 }
863 
864 static inline uint16_t
865 rxa_create_event_vector(struct event_eth_rx_adapter *rx_adapter,
866 			struct eth_rx_queue_info *queue_info,
867 			struct eth_event_enqueue_buffer *buf,
868 			struct rte_mbuf **mbufs, uint16_t num)
869 {
870 	struct rte_event *ev = &buf->events[buf->count];
871 	struct eth_rx_vector_data *vec;
872 	uint16_t filled, space, sz;
873 
874 	filled = 0;
875 	vec = &queue_info->vector_data;
876 
877 	if (vec->vector_ev == NULL) {
878 		if (rte_mempool_get(vec->vector_pool,
879 				    (void **)&vec->vector_ev) < 0) {
880 			rte_pktmbuf_free_bulk(mbufs, num);
881 			return 0;
882 		}
883 		rxa_init_vector(rx_adapter, vec);
884 	}
885 	while (num) {
886 		if (vec->vector_ev->nb_elem == vec->max_vector_count) {
887 			/* Event ready. */
888 			ev->event = vec->event;
889 			ev->vec = vec->vector_ev;
890 			ev++;
891 			filled++;
892 			vec->vector_ev = NULL;
893 			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
894 			if (rte_mempool_get(vec->vector_pool,
895 					    (void **)&vec->vector_ev) < 0) {
896 				rte_pktmbuf_free_bulk(mbufs, num);
897 				return 0;
898 			}
899 			rxa_init_vector(rx_adapter, vec);
900 		}
901 
902 		space = vec->max_vector_count - vec->vector_ev->nb_elem;
903 		sz = num > space ? space : num;
904 		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
905 		       sizeof(void *) * sz);
906 		vec->vector_ev->nb_elem += sz;
907 		num -= sz;
908 		mbufs += sz;
909 		vec->ts = rte_rdtsc();
910 	}
911 
912 	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
913 		ev->event = vec->event;
914 		ev->vec = vec->vector_ev;
915 		ev++;
916 		filled++;
917 		vec->vector_ev = NULL;
918 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
919 	}
920 
921 	return filled;
922 }
923 
924 static inline void
925 rxa_buffer_mbufs(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
926 		 uint16_t rx_queue_id, struct rte_mbuf **mbufs, uint16_t num,
927 		 struct eth_event_enqueue_buffer *buf,
928 		 struct rte_event_eth_rx_adapter_stats *stats)
929 {
930 	uint32_t i;
931 	struct eth_device_info *dev_info =
932 					&rx_adapter->eth_devices[eth_dev_id];
933 	struct eth_rx_queue_info *eth_rx_queue_info =
934 					&dev_info->rx_queue[rx_queue_id];
935 	uint16_t new_tail = buf->tail;
936 	uint64_t event = eth_rx_queue_info->event;
937 	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
938 	struct rte_mbuf *m = mbufs[0];
939 	uint32_t rss_mask;
940 	uint32_t rss;
941 	int do_rss;
942 	uint16_t nb_cb;
943 	uint16_t dropped;
944 	uint64_t ts, ts_mask;
945 
946 	if (!eth_rx_queue_info->ena_vector) {
947 		ts = m->ol_flags & event_eth_rx_timestamp_dynflag ?
948 						0 : rte_get_tsc_cycles();
949 
950 		/* 0xffff ffff ffff ffff if RTE_MBUF_F_RX_TIMESTAMP is set,
951 		 * otherwise 0
952 		 */
953 		ts_mask = (uint64_t)(!(m->ol_flags &
954 				       event_eth_rx_timestamp_dynflag)) - 1ULL;
955 
956 		/* 0xffff ffff if RTE_MBUF_F_RX_RSS_HASH is set, otherwise 0 */
957 		rss_mask = ~(((m->ol_flags & RTE_MBUF_F_RX_RSS_HASH) != 0) - 1);
958 		do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
959 		for (i = 0; i < num; i++) {
960 			struct rte_event *ev;
961 
962 			m = mbufs[i];
963 			*rxa_timestamp_dynfield(m) = ts |
964 					(*rxa_timestamp_dynfield(m) & ts_mask);
965 
966 			ev = &buf->events[new_tail];
967 
968 			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
969 				     : m->hash.rss;
970 			ev->event = event;
971 			ev->flow_id = (rss & ~flow_id_mask) |
972 				      (ev->flow_id & flow_id_mask);
973 			ev->mbuf = m;
974 			new_tail++;
975 		}
976 	} else {
977 		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
978 					      buf, mbufs, num);
979 	}
980 
981 	if (num && dev_info->cb_fn) {
982 
983 		dropped = 0;
984 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
985 				       buf->last |
986 				       (buf->events_size & ~buf->last_mask),
987 				       buf->count >= BATCH_SIZE ?
988 						buf->count - BATCH_SIZE : 0,
989 				       &buf->events[buf->tail],
990 				       num,
991 				       dev_info->cb_arg,
992 				       &dropped);
993 		if (unlikely(nb_cb > num))
994 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
995 				nb_cb, num);
996 		else
997 			num = nb_cb;
998 		if (dropped)
999 			stats->rx_dropped += dropped;
1000 	}
1001 
1002 	buf->count += num;
1003 	buf->tail += num;
1004 }
1005 
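/* Return true if the enqueue buffer has room for another BATCH_SIZE events,
 * wrapping the tail back to the start of the buffer once the head has
 * advanced far enough.
 */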
1006 static inline bool
1007 rxa_pkt_buf_available(struct eth_event_enqueue_buffer *buf)
1008 {
1009 	uint32_t nb_req = buf->tail + BATCH_SIZE;
1010 
1011 	if (!buf->last) {
1012 		if (nb_req <= buf->events_size)
1013 			return true;
1014 
1015 		if (buf->head >= BATCH_SIZE) {
1016 			buf->last_mask = ~0;
1017 			buf->last = buf->tail;
1018 			buf->tail = 0;
1019 			return true;
1020 		}
1021 	}
1022 
1023 	return nb_req <= buf->head;
1024 }
1025 
1026 /* Enqueue packets from <port, q> to the event buffer */
1027 static inline uint32_t
1028 rxa_eth_rx(struct event_eth_rx_adapter *rx_adapter, uint16_t port_id,
1029 	   uint16_t queue_id, uint32_t rx_count, uint32_t max_rx,
1030 	   int *rxq_empty, struct eth_event_enqueue_buffer *buf,
1031 	   struct rte_event_eth_rx_adapter_stats *stats)
1032 {
1033 	struct rte_mbuf *mbufs[BATCH_SIZE];
1034 	uint16_t n;
1035 	uint32_t nb_rx = 0;
1036 	uint32_t nb_flushed = 0;
1037 
1038 	if (rxq_empty)
1039 		*rxq_empty = 0;
1040 	/* Don't do a batch dequeue from the rx queue if there isn't
1041 	 * enough space in the enqueue buffer.
1042 	 */
1043 	while (rxa_pkt_buf_available(buf)) {
1044 		if (buf->count >= BATCH_SIZE)
1045 			nb_flushed +=
1046 				rxa_flush_event_buffer(rx_adapter, buf, stats);
1047 
1048 		stats->rx_poll_count++;
1049 		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
1050 		if (unlikely(!n)) {
1051 			if (rxq_empty)
1052 				*rxq_empty = 1;
1053 			break;
1054 		}
1055 		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n, buf,
1056 				 stats);
1057 		nb_rx += n;
1058 		if (rx_count + nb_rx > max_rx)
1059 			break;
1060 	}
1061 
1062 	if (buf->count > 0)
1063 		nb_flushed += rxa_flush_event_buffer(rx_adapter, buf, stats);
1064 
1065 	stats->rx_packets += nb_rx;
1066 	if (nb_flushed == 0)
1067 		rte_event_maintain(rx_adapter->eventdev_id,
1068 				   rx_adapter->event_port_id, 0);
1069 
1070 	return nb_rx;
1071 }
1072 
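/* Called from the interrupt thread: enqueue the interrupting <port, queue>
 * pair on the interrupt ring and disable that queue's interrupt until it is
 * serviced by the adapter.
 */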
1073 static inline void
1074 rxa_intr_ring_enqueue(struct event_eth_rx_adapter *rx_adapter, void *data)
1075 {
1076 	uint16_t port_id;
1077 	uint16_t queue;
1078 	int err;
1079 	union queue_data qd;
1080 	struct eth_device_info *dev_info;
1081 	struct eth_rx_queue_info *queue_info;
1082 	int *intr_enabled;
1083 
1084 	qd.ptr = data;
1085 	port_id = qd.port;
1086 	queue = qd.queue;
1087 
1088 	dev_info = &rx_adapter->eth_devices[port_id];
1089 	queue_info = &dev_info->rx_queue[queue];
1090 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1091 	if (rxa_shared_intr(dev_info, queue))
1092 		intr_enabled = &dev_info->shared_intr_enabled;
1093 	else
1094 		intr_enabled = &queue_info->intr_enabled;
1095 
1096 	if (*intr_enabled) {
1097 		*intr_enabled = 0;
1098 		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
1099 		/* Entry should always be available.
1100 		 * The ring size equals the maximum number of interrupt
1101 		 * vectors supported (an interrupt vector is shared in
1102 		 * case of shared interrupts)
1103 		 */
1104 		if (err)
1105 			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
1106 				" to ring: %s", strerror(-err));
1107 		else
1108 			rte_eth_dev_rx_intr_disable(port_id, queue);
1109 	}
1110 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1111 }
1112 
1113 static int
1114 rxa_intr_ring_check_avail(struct event_eth_rx_adapter *rx_adapter,
1115 			  uint32_t num_intr_vec)
1116 {
1117 	if (rx_adapter->num_intr_vec + num_intr_vec >
1118 				RTE_EVENT_ETH_INTR_RING_SIZE) {
1119 		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1120 		" %d needed %d limit %d", rx_adapter->num_intr_vec,
1121 		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1122 		return -ENOSPC;
1123 	}
1124 
1125 	return 0;
1126 }
1127 
1128 /* Delete entries for (dev, queue) from the interrupt ring */
1129 static void
1130 rxa_intr_ring_del_entries(struct event_eth_rx_adapter *rx_adapter,
1131 			  struct eth_device_info *dev_info,
1132 			  uint16_t rx_queue_id)
1133 {
1134 	int i, n;
1135 	union queue_data qd;
1136 
1137 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1138 
1139 	n = rte_ring_count(rx_adapter->intr_ring);
1140 	for (i = 0; i < n; i++) {
1141 		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1142 		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1143 			if (qd.port == dev_info->dev->data->port_id &&
1144 				qd.queue == rx_queue_id)
1145 				continue;
1146 		} else {
1147 			if (qd.port == dev_info->dev->data->port_id)
1148 				continue;
1149 		}
1150 		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1151 	}
1152 
1153 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1154 }
1155 
1156 /* pthread callback handling interrupt mode receive queues
1157  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1158  * interrupting queue to the adapter's ring buffer for interrupt events.
1159  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1160  * the adapter service function.
1161  */
1162 static void *
1163 rxa_intr_thread(void *arg)
1164 {
1165 	struct event_eth_rx_adapter *rx_adapter = arg;
1166 	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1167 	int n, i;
1168 
1169 	while (1) {
1170 		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1171 				RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1172 		if (unlikely(n < 0))
1173 			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1174 					n);
1175 		for (i = 0; i < n; i++) {
1176 			rxa_intr_ring_enqueue(rx_adapter,
1177 					epoll_events[i].epdata.data);
1178 		}
1179 	}
1180 
1181 	return NULL;
1182 }
1183 
1184 /* Dequeue <port, q> from interrupt ring and enqueue received
1185  * mbufs to eventdev
1186  */
1187 static inline bool
1188 rxa_intr_ring_dequeue(struct event_eth_rx_adapter *rx_adapter)
1189 {
1190 	uint32_t n;
1191 	uint32_t nb_rx = 0;
1192 	int rxq_empty;
1193 	struct eth_event_enqueue_buffer *buf;
1194 	struct rte_event_eth_rx_adapter_stats *stats;
1195 	rte_spinlock_t *ring_lock;
1196 	uint8_t max_done = 0;
1197 	bool work = false;
1198 
1199 	if (rx_adapter->num_rx_intr == 0)
1200 		return work;
1201 
1202 	if (rte_ring_count(rx_adapter->intr_ring) == 0
1203 		&& !rx_adapter->qd_valid)
1204 		return work;
1205 
1206 	buf = &rx_adapter->event_enqueue_buffer;
1207 	stats = &rx_adapter->stats;
1208 	ring_lock = &rx_adapter->intr_ring_lock;
1209 
1210 	if (buf->count >= BATCH_SIZE) {
1211 		uint16_t n;
1212 
1213 		n = rxa_flush_event_buffer(rx_adapter, buf, stats);
1214 
1215 		if (likely(n > 0))
1216 			work = true;
1217 	}
1218 
1219 	while (rxa_pkt_buf_available(buf)) {
1220 		struct eth_device_info *dev_info;
1221 		uint16_t port;
1222 		uint16_t queue;
1223 		union queue_data qd  = rx_adapter->qd;
1224 		int err;
1225 
1226 		if (!rx_adapter->qd_valid) {
1227 			struct eth_rx_queue_info *queue_info;
1228 
1229 			rte_spinlock_lock(ring_lock);
1230 			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1231 			if (err) {
1232 				rte_spinlock_unlock(ring_lock);
1233 				break;
1234 			}
1235 
1236 			port = qd.port;
1237 			queue = qd.queue;
1238 			rx_adapter->qd = qd;
1239 			rx_adapter->qd_valid = 1;
1240 			dev_info = &rx_adapter->eth_devices[port];
1241 			if (rxa_shared_intr(dev_info, queue))
1242 				dev_info->shared_intr_enabled = 1;
1243 			else {
1244 				queue_info = &dev_info->rx_queue[queue];
1245 				queue_info->intr_enabled = 1;
1246 			}
1247 			rte_eth_dev_rx_intr_enable(port, queue);
1248 			rte_spinlock_unlock(ring_lock);
1249 		} else {
1250 			port = qd.port;
1251 			queue = qd.queue;
1252 
1253 			dev_info = &rx_adapter->eth_devices[port];
1254 		}
1255 
1256 		if (rxa_shared_intr(dev_info, queue)) {
1257 			uint16_t i;
1258 			uint16_t nb_queues;
1259 
1260 			nb_queues = dev_info->dev->data->nb_rx_queues;
1261 			n = 0;
1262 			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1263 				uint8_t enq_buffer_full;
1264 
1265 				if (!rxa_intr_queue(dev_info, i))
1266 					continue;
1267 				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1268 					rx_adapter->max_nb_rx,
1269 					&rxq_empty, buf, stats);
1270 				nb_rx += n;
1271 
1272 				enq_buffer_full = !rxq_empty && n == 0;
1273 				max_done = nb_rx > rx_adapter->max_nb_rx;
1274 
1275 				if (enq_buffer_full || max_done) {
1276 					dev_info->next_q_idx = i;
1277 					goto done;
1278 				}
1279 			}
1280 
1281 			rx_adapter->qd_valid = 0;
1282 
1283 			/* Reinitialize for next interrupt */
1284 			dev_info->next_q_idx = dev_info->multi_intr_cap ?
1285 						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1286 						0;
1287 		} else {
1288 			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1289 				rx_adapter->max_nb_rx,
1290 				&rxq_empty, buf, stats);
1291 			rx_adapter->qd_valid = !rxq_empty;
1292 			nb_rx += n;
1293 			if (nb_rx > rx_adapter->max_nb_rx)
1294 				break;
1295 		}
1296 	}
1297 
1298 done:
1299 	if (nb_rx > 0) {
1300 		rx_adapter->stats.rx_intr_packets += nb_rx;
1301 		work = true;
1302 	}
1303 
1304 	return work;
1305 }
1306 
1307 /*
1308  * Polls receive queues added to the event adapter and enqueues received
1309  * packets to the event device.
1310  *
1311  * The receive code enqueues initially to a temporary buffer; the
1312  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets.
1313  *
1314  * If there isn't space available in the temporary buffer, packets from the
1315  * Rx queue aren't dequeued from the eth device; this back pressures the
1316  * eth device. In virtual device environments this back pressure is relayed
1317  * to the hypervisor's switching layer, where adjustments can be made to
1318  * deal with it.
1319  */
1320 static inline bool
1321 rxa_poll(struct event_eth_rx_adapter *rx_adapter)
1322 {
1323 	uint32_t num_queue;
1324 	uint32_t nb_rx = 0;
1325 	struct eth_event_enqueue_buffer *buf = NULL;
1326 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1327 	uint32_t wrr_pos;
1328 	uint32_t max_nb_rx;
1329 	bool work = false;
1330 
1331 	wrr_pos = rx_adapter->wrr_pos;
1332 	max_nb_rx = rx_adapter->max_nb_rx;
1333 
1334 	/* Iterate through a WRR sequence */
1335 	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1336 		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1337 		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1338 		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1339 
1340 		buf = rxa_event_buf_get(rx_adapter, d, qid, &stats);
1341 
1342 		/* Don't do a batch dequeue from the rx queue if there isn't
1343 		 * enough space in the enqueue buffer.
1344 		 */
1345 		if (buf->count >= BATCH_SIZE) {
1346 			uint16_t n;
1347 
1348 			n = rxa_flush_event_buffer(rx_adapter, buf, stats);
1349 
1350 			if (likely(n > 0))
1351 				work = true;
1352 		}
1353 		if (!rxa_pkt_buf_available(buf)) {
1354 			if (rx_adapter->use_queue_event_buf)
1355 				goto poll_next_entry;
1356 			else {
1357 				rx_adapter->wrr_pos = wrr_pos;
1358 				break;
1359 			}
1360 		}
1361 
1362 		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1363 				NULL, buf, stats);
1364 		if (nb_rx > max_nb_rx) {
1365 			rx_adapter->wrr_pos =
1366 				    (wrr_pos + 1) % rx_adapter->wrr_len;
1367 			break;
1368 		}
1369 
1370 poll_next_entry:
1371 		if (++wrr_pos == rx_adapter->wrr_len)
1372 			wrr_pos = 0;
1373 	}
1374 
1375 	if (nb_rx > 0)
1376 		work = true;
1377 
1378 	return work;
1379 }
1380 
1381 static void
1382 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1383 {
1384 	struct event_eth_rx_adapter *rx_adapter = arg;
1385 	struct eth_event_enqueue_buffer *buf = NULL;
1386 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
1387 	struct rte_event *ev;
1388 
1389 	buf = rxa_event_buf_get(rx_adapter, vec->port, vec->queue, &stats);
1390 
1391 	if (buf->count)
1392 		rxa_flush_event_buffer(rx_adapter, buf, stats);
1393 
1394 	if (vec->vector_ev->nb_elem == 0)
1395 		return;
1396 	ev = &buf->events[buf->count];
1397 
1398 	/* Event ready. */
1399 	ev->event = vec->event;
1400 	ev->vec = vec->vector_ev;
1401 	buf->count++;
1402 
1403 	vec->vector_ev = NULL;
1404 	vec->ts = 0;
1405 }
1406 
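/* Adapter service function: flush expired event vectors, drain the
 * interrupt-driven queues, then poll the WRR schedule. Returns -EAGAIN
 * when no work was done so the service core can back off.
 */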
1407 static int
1408 rxa_service_func(void *args)
1409 {
1410 	struct event_eth_rx_adapter *rx_adapter = args;
1411 	bool intr_work;
1412 	bool poll_work;
1413 
1414 	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1415 		return -EAGAIN;
1416 	if (!rx_adapter->rxa_started) {
1417 		rte_spinlock_unlock(&rx_adapter->rx_lock);
1418 		return -EAGAIN;
1419 	}
1420 
1421 	if (rx_adapter->ena_vector) {
1422 		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1423 		    rx_adapter->vector_tmo_ticks) {
1424 			struct eth_rx_vector_data *vec;
1425 
1426 			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1427 				uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1428 
1429 				if (elapsed_time >= vec->vector_timeout_ticks) {
1430 					rxa_vector_expire(vec, rx_adapter);
1431 					TAILQ_REMOVE(&rx_adapter->vector_list,
1432 						     vec, next);
1433 				}
1434 			}
1435 			rx_adapter->prev_expiry_ts = rte_rdtsc();
1436 		}
1437 	}
1438 
1439 	intr_work = rxa_intr_ring_dequeue(rx_adapter);
1440 	poll_work = rxa_poll(rx_adapter);
1441 
1442 	rte_spinlock_unlock(&rx_adapter->rx_lock);
1443 
1444 	return intr_work || poll_work ? 0 : -EAGAIN;
1445 }
1446 
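/* Return the address of a cache line aligned array in a memzone, reserving
 * it on first use; the lookup-or-reserve pattern lets secondary processes
 * attach to the same array.
 */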
1447 static void *
1448 rxa_memzone_array_get(const char *name, unsigned int elt_size, int nb_elems)
1449 {
1450 	const struct rte_memzone *mz;
1451 	unsigned int sz;
1452 
1453 	sz = elt_size * nb_elems;
1454 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1455 
1456 	mz = rte_memzone_lookup(name);
1457 	if (mz == NULL) {
1458 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1459 						 RTE_CACHE_LINE_SIZE);
1460 		if (mz == NULL) {
1461 			RTE_EDEV_LOG_ERR("failed to reserve memzone"
1462 					 " name = %s, err = %"
1463 					 PRId32, name, rte_errno);
1464 			return NULL;
1465 		}
1466 	}
1467 
1468 	return mz->addr;
1469 }
1470 
1471 static int
1472 rte_event_eth_rx_adapter_init(void)
1473 {
1474 	uint8_t i;
1475 
1476 	if (event_eth_rx_adapter == NULL) {
1477 		event_eth_rx_adapter =
1478 			rxa_memzone_array_get(RXA_ADAPTER_ARRAY,
1479 					sizeof(*event_eth_rx_adapter),
1480 					RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE);
1481 		if (event_eth_rx_adapter == NULL)
1482 			return -ENOMEM;
1483 
1484 		for (i = 0; i < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE; i++)
1485 			event_eth_rx_adapter[i] = NULL;
1486 
1487 	}
1488 
1489 	return 0;
1490 }
1491 
1492 static int
1493 rxa_memzone_lookup(void)
1494 {
1495 	const struct rte_memzone *mz;
1496 
1497 	if (event_eth_rx_adapter == NULL) {
1498 		mz = rte_memzone_lookup(RXA_ADAPTER_ARRAY);
1499 		if (mz == NULL)
1500 			return -ENOMEM;
1501 
1502 		event_eth_rx_adapter = mz->addr;
1503 	}
1504 
1505 	return 0;
1506 }
1507 
1508 static inline struct event_eth_rx_adapter *
1509 rxa_id_to_adapter(uint8_t id)
1510 {
1511 	return event_eth_rx_adapter ?
1512 		event_eth_rx_adapter[id] : NULL;
1513 }
1514 
1515 static int
1516 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1517 		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1518 {
1519 	int ret;
1520 	struct rte_eventdev *dev;
1521 	struct rte_event_dev_config dev_conf;
1522 	int started;
1523 	uint8_t port_id;
1524 	struct rte_event_port_conf *port_conf = arg;
1525 	struct event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1526 
1527 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
1528 	dev_conf = dev->data->dev_conf;
1529 
1530 	started = dev->data->dev_started;
1531 	if (started)
1532 		rte_event_dev_stop(dev_id);
1533 	port_id = dev_conf.nb_event_ports;
1534 	dev_conf.nb_event_ports += 1;
1535 	if (port_conf->event_port_cfg & RTE_EVENT_PORT_CFG_SINGLE_LINK)
1536 		dev_conf.nb_single_link_event_port_queues += 1;
1537 
1538 	ret = rte_event_dev_configure(dev_id, &dev_conf);
1539 	if (ret) {
1540 		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1541 						dev_id);
1542 		if (started) {
1543 			if (rte_event_dev_start(dev_id))
1544 				return -EIO;
1545 		}
1546 		return ret;
1547 	}
1548 
1549 	ret = rte_event_port_setup(dev_id, port_id, port_conf);
1550 	if (ret) {
1551 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1552 					port_id);
1553 		return ret;
1554 	}
1555 
1556 	conf->event_port_id = port_id;
1557 	conf->max_nb_rx = 128;
1558 	if (started)
1559 		ret = rte_event_dev_start(dev_id);
1560 	rx_adapter->default_cb_arg = 1;
1561 	return ret;
1562 }
1563 
1564 static int
1565 rxa_epoll_create1(void)
1566 {
1567 #if defined(LINUX)
1568 	int fd;
1569 	fd = epoll_create1(EPOLL_CLOEXEC);
1570 	return fd < 0 ? -errno : fd;
1571 #elif defined(BSD)
1572 	return -ENOTSUP;
1573 #endif
1574 }
1575 
1576 static int
1577 rxa_init_epd(struct event_eth_rx_adapter *rx_adapter)
1578 {
1579 	if (rx_adapter->epd != INIT_FD)
1580 		return 0;
1581 
1582 	rx_adapter->epd = rxa_epoll_create1();
1583 	if (rx_adapter->epd < 0) {
1584 		int err = rx_adapter->epd;
1585 		rx_adapter->epd = INIT_FD;
1586 		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1587 		return err;
1588 	}
1589 
1590 	return 0;
1591 }
1592 
1593 static int
1594 rxa_create_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1595 {
1596 	int err;
1597 	char thread_name[RTE_MAX_THREAD_NAME_LEN];
1598 
1599 	if (rx_adapter->intr_ring)
1600 		return 0;
1601 
1602 	rx_adapter->intr_ring = rte_ring_create("intr_ring",
1603 					RTE_EVENT_ETH_INTR_RING_SIZE,
1604 					rte_socket_id(), 0);
1605 	if (!rx_adapter->intr_ring)
1606 		return -ENOMEM;
1607 
1608 	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1609 					RTE_EVENT_ETH_INTR_RING_SIZE *
1610 					sizeof(struct rte_epoll_event),
1611 					RTE_CACHE_LINE_SIZE,
1612 					rx_adapter->socket_id);
1613 	if (!rx_adapter->epoll_events) {
1614 		err = -ENOMEM;
1615 		goto error;
1616 	}
1617 
1618 	rte_spinlock_init(&rx_adapter->intr_ring_lock);
1619 
1620 	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1621 			"rx-intr-thread-%d", rx_adapter->id);
1622 
1623 	err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1624 				NULL, rxa_intr_thread, rx_adapter);
1625 	if (!err)
1626 		return 0;
1627 
1628 	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1629 	rte_free(rx_adapter->epoll_events);
1630 error:
1631 	rte_ring_free(rx_adapter->intr_ring);
1632 	rx_adapter->intr_ring = NULL;
1633 	rx_adapter->epoll_events = NULL;
1634 	return err;
1635 }
1636 
1637 static int
1638 rxa_destroy_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1639 {
1640 	int err;
1641 
1642 	err = pthread_cancel(rx_adapter->rx_intr_thread);
1643 	if (err)
1644 		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1645 				err);
1646 
1647 	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1648 	if (err)
1649 		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1650 
1651 	rte_free(rx_adapter->epoll_events);
1652 	rte_ring_free(rx_adapter->intr_ring);
1653 	rx_adapter->intr_ring = NULL;
1654 	rx_adapter->epoll_events = NULL;
1655 	return 0;
1656 }
1657 
1658 static int
1659 rxa_free_intr_resources(struct event_eth_rx_adapter *rx_adapter)
1660 {
1661 	int ret;
1662 
1663 	if (rx_adapter->num_rx_intr == 0)
1664 		return 0;
1665 
1666 	ret = rxa_destroy_intr_thread(rx_adapter);
1667 	if (ret)
1668 		return ret;
1669 
1670 	close(rx_adapter->epd);
1671 	rx_adapter->epd = INIT_FD;
1672 
1673 	return ret;
1674 }
1675 
1676 static int
1677 rxa_disable_intr(struct event_eth_rx_adapter *rx_adapter,
1678 		 struct eth_device_info *dev_info, uint16_t rx_queue_id)
1679 {
1680 	int err;
1681 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1682 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1683 
1684 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1685 	if (err) {
1686 		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1687 			rx_queue_id);
1688 		return err;
1689 	}
1690 
1691 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1692 					rx_adapter->epd,
1693 					RTE_INTR_EVENT_DEL,
1694 					0);
1695 	if (err)
1696 		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1697 
1698 	if (sintr)
1699 		dev_info->shared_intr_enabled = 0;
1700 	else
1701 		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1702 	return err;
1703 }
1704 
1705 static int
1706 rxa_del_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1707 		   struct eth_device_info *dev_info, int rx_queue_id)
1708 {
1709 	int err;
1710 	int i;
1711 	int s;
1712 
1713 	if (dev_info->nb_rx_intr == 0)
1714 		return 0;
1715 
1716 	err = 0;
1717 	if (rx_queue_id == -1) {
1718 		s = dev_info->nb_shared_intr;
1719 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1720 			int sintr;
1721 			uint16_t q;
1722 
1723 			q = dev_info->intr_queue[i];
1724 			sintr = rxa_shared_intr(dev_info, q);
1725 			s -= sintr;
1726 
1727 			if (!sintr || s == 0) {
1728 
1729 				err = rxa_disable_intr(rx_adapter, dev_info,
1730 						q);
1731 				if (err)
1732 					return err;
1733 				rxa_intr_ring_del_entries(rx_adapter, dev_info,
1734 							q);
1735 			}
1736 		}
1737 	} else {
1738 		if (!rxa_intr_queue(dev_info, rx_queue_id))
1739 			return 0;
1740 		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1741 				dev_info->nb_shared_intr == 1) {
1742 			err = rxa_disable_intr(rx_adapter, dev_info,
1743 					rx_queue_id);
1744 			if (err)
1745 				return err;
1746 			rxa_intr_ring_del_entries(rx_adapter, dev_info,
1747 						rx_queue_id);
1748 		}
1749 
1750 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1751 			if (dev_info->intr_queue[i] == rx_queue_id) {
1752 				for (; i < dev_info->nb_rx_intr - 1; i++)
1753 					dev_info->intr_queue[i] =
1754 						dev_info->intr_queue[i + 1];
1755 				break;
1756 			}
1757 		}
1758 	}
1759 
1760 	return err;
1761 }
1762 
1763 static int
1764 rxa_config_intr(struct event_eth_rx_adapter *rx_adapter,
1765 		struct eth_device_info *dev_info, uint16_t rx_queue_id)
1766 {
1767 	int err, err1;
1768 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1769 	union queue_data qd;
1770 	int init_fd;
1771 	uint16_t *intr_queue;
1772 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1773 
1774 	if (rxa_intr_queue(dev_info, rx_queue_id))
1775 		return 0;
1776 
1777 	intr_queue = dev_info->intr_queue;
1778 	if (dev_info->intr_queue == NULL) {
1779 		size_t len =
1780 			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1781 		dev_info->intr_queue =
1782 			rte_zmalloc_socket(
1783 				rx_adapter->mem_name,
1784 				len,
1785 				0,
1786 				rx_adapter->socket_id);
1787 		if (dev_info->intr_queue == NULL)
1788 			return -ENOMEM;
1789 	}
1790 
1791 	init_fd = rx_adapter->epd;
1792 	err = rxa_init_epd(rx_adapter);
1793 	if (err)
1794 		goto err_free_queue;
1795 
1796 	qd.port = eth_dev_id;
1797 	qd.queue = rx_queue_id;
1798 
1799 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1800 					rx_adapter->epd,
1801 					RTE_INTR_EVENT_ADD,
1802 					qd.ptr);
1803 	if (err) {
1804 		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1805 			" Rx Queue %u err %d", rx_queue_id, err);
1806 		goto err_del_fd;
1807 	}
1808 
1809 	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1810 	if (err) {
1811 		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1812 				" Rx Queue %u err %d", rx_queue_id, err);
1813 
1814 		goto err_del_event;
1815 	}
1816 
1817 	err = rxa_create_intr_thread(rx_adapter);
1818 	if (!err)  {
1819 		if (sintr)
1820 			dev_info->shared_intr_enabled = 1;
1821 		else
1822 			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1823 		return 0;
1824 	}
1825 
1826 
1827 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1828 	if (err)
1829 		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1830 				" Rx Queue %u err %d", rx_queue_id, err);
1831 err_del_event:
1832 	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1833 					rx_adapter->epd,
1834 					RTE_INTR_EVENT_DEL,
1835 					0);
1836 	if (err1) {
1837 		RTE_EDEV_LOG_ERR("Could not delete event for"
1838 				" Rx Queue %u err %d", rx_queue_id, err1);
1839 	}
1840 err_del_fd:
1841 	if (init_fd == INIT_FD) {
1842 		close(rx_adapter->epd);
1843 		rx_adapter->epd = INIT_FD;
1844 	}
1845 err_free_queue:
1846 	if (intr_queue == NULL) {
1847 		rte_free(dev_info->intr_queue);
		dev_info->intr_queue = NULL;
	}
1848 
1849 	return err;
1850 }
1851 
1852 static int
1853 rxa_add_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1854 		   struct eth_device_info *dev_info, int rx_queue_id)
1855 
1856 {
1857 	int i, j, err;
1858 	int si = -1;
1859 	int shared_done = (dev_info->nb_shared_intr > 0);
1860 
1861 	if (rx_queue_id != -1) {
1862 		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1863 			return 0;
1864 		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1865 	}
1866 
1867 	err = 0;
1868 	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1869 
1870 		if (rxa_shared_intr(dev_info, i) && shared_done)
1871 			continue;
1872 
1873 		err = rxa_config_intr(rx_adapter, dev_info, i);
1874 
1875 		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1876 		if (shared_done) {
1877 			si = i;
1878 			dev_info->shared_intr_enabled = 1;
1879 		}
1880 		if (err)
1881 			break;
1882 	}
1883 
1884 	if (err == 0)
1885 		return 0;
1886 
1887 	shared_done = (dev_info->nb_shared_intr > 0);
1888 	for (j = 0; j < i; j++) {
1889 		if (rxa_intr_queue(dev_info, j))
1890 			continue;
1891 		if (rxa_shared_intr(dev_info, j) && si != j)
1892 			continue;
1893 		err = rxa_disable_intr(rx_adapter, dev_info, j);
1894 		if (err)
1895 			break;
1896 
1897 	}
1898 
1899 	return err;
1900 }
1901 
1902 static int
1903 rxa_init_service(struct event_eth_rx_adapter *rx_adapter, uint8_t id)
1904 {
1905 	int ret;
1906 	struct rte_service_spec service;
1907 	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1908 
1909 	if (rx_adapter->service_inited)
1910 		return 0;
1911 
1912 	memset(&service, 0, sizeof(service));
1913 	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1914 		"rte_event_eth_rx_adapter_%d", id);
1915 	service.socket_id = rx_adapter->socket_id;
1916 	service.callback = rxa_service_func;
1917 	service.callback_userdata = rx_adapter;
1918 	/* Service function handles locking for queue add/del updates */
1919 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1920 	ret = rte_service_component_register(&service, &rx_adapter->service_id);
1921 	if (ret) {
1922 		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1923 			service.name, ret);
1924 		return ret;
1925 	}
1926 
1927 	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1928 		&rx_adapter_conf, rx_adapter->conf_arg);
1929 	if (ret) {
1930 		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1931 			ret);
1932 		goto err_done;
1933 	}
1934 	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1935 	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1936 	rx_adapter->service_inited = 1;
1937 	rx_adapter->epd = INIT_FD;
1938 	return 0;
1939 
1940 err_done:
1941 	rte_service_component_unregister(rx_adapter->service_id);
1942 	return ret;
1943 }
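
/*
 * Illustrative sketch (not part of the adapter): rxa_init_service() relies on
 * the registered configuration callback to supply the event port used by the
 * service function and the per-invocation mbuf limit (max_nb_rx). A minimal
 * callback, assuming the application has already created an event port and
 * passes its id as conf_arg (app_event_port_id is a hypothetical name), and
 * where 128 is an arbitrary cap on mbufs handled per service run:
 *
 *	static int
 *	app_rxa_conf_cb(uint8_t id, uint8_t evdev_id,
 *			struct rte_event_eth_rx_adapter_conf *conf, void *arg)
 *	{
 *		uint8_t *app_event_port_id = arg;
 *
 *		RTE_SET_USED(id);
 *		RTE_SET_USED(evdev_id);
 *		conf->event_port_id = *app_event_port_id;
 *		conf->max_nb_rx = 128;
 *		return 0;
 *	}
 *
 * The default callback (rxa_default_conf_cb) instead reconfigures the event
 * device to create an additional event port on the adapter's behalf.
 */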
1944 
1945 static void
1946 rxa_update_queue(struct event_eth_rx_adapter *rx_adapter,
1947 		 struct eth_device_info *dev_info, int32_t rx_queue_id,
1948 		 uint8_t add)
1949 {
1950 	struct eth_rx_queue_info *queue_info;
1951 	int enabled;
1952 	uint16_t i;
1953 
1954 	if (dev_info->rx_queue == NULL)
1955 		return;
1956 
1957 	if (rx_queue_id == -1) {
1958 		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1959 			rxa_update_queue(rx_adapter, dev_info, i, add);
1960 	} else {
1961 		queue_info = &dev_info->rx_queue[rx_queue_id];
1962 		enabled = queue_info->queue_enabled;
1963 		if (add) {
1964 			rx_adapter->nb_queues += !enabled;
1965 			dev_info->nb_dev_queues += !enabled;
1966 		} else {
1967 			rx_adapter->nb_queues -= enabled;
1968 			dev_info->nb_dev_queues -= enabled;
1969 		}
1970 		queue_info->queue_enabled = !!add;
1971 	}
1972 }
1973 
1974 static void
1975 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1976 		    uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1977 		    uint16_t port_id)
1978 {
1979 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1980 	struct eth_rx_vector_data *vector_data;
1981 	uint32_t flow_id;
1982 
1983 	vector_data = &queue_info->vector_data;
1984 	vector_data->max_vector_count = vector_count;
1985 	vector_data->port = port_id;
1986 	vector_data->queue = qid;
1987 	vector_data->vector_pool = mp;
1988 	vector_data->vector_timeout_ticks =
1989 		NSEC2TICK(vector_ns, rte_get_timer_hz());
1990 	vector_data->ts = 0;
1991 	flow_id = queue_info->event & 0xFFFFF;
1992 	flow_id =
1993 		flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1994 	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1995 }
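
/*
 * Worked example (illustrative, assuming rte_get_timer_hz() is 2 GHz):
 * a vector_ns of 100000 (100 us) converts to
 * NSEC2TICK(100000, 2E9) = (100000 * 2E9) / 1E9 = 200000 timer ticks.
 * For the synthesized flow id, if the queue event's flow_id is 0 with
 * port_id 1 and qid 3, the fallback value is
 * (3 & 0xFFF) | ((1 & 0xFF) << 12) = 0x1003, so events from different
 * port/queue pairs still map to distinct flows.
 */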
1996 
1997 static void
1998 rxa_sw_del(struct event_eth_rx_adapter *rx_adapter,
1999 	   struct eth_device_info *dev_info, int32_t rx_queue_id)
2000 {
2001 	struct eth_rx_vector_data *vec;
2002 	int pollq;
2003 	int intrq;
2004 	int sintrq;
2005 
2006 	if (rx_adapter->nb_queues == 0)
2007 		return;
2008 
2009 	if (rx_queue_id == -1) {
2010 		uint16_t nb_rx_queues;
2011 		uint16_t i;
2012 
2013 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2014 		for (i = 0; i < nb_rx_queues; i++)
2015 			rxa_sw_del(rx_adapter, dev_info, i);
2016 		return;
2017 	}
2018 
2019 	/* Push all the partial event vectors to event device. */
2020 	TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
2021 		if (vec->queue != rx_queue_id)
2022 			continue;
2023 		rxa_vector_expire(vec, rx_adapter);
2024 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
2025 	}
2026 
2027 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
2028 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
2029 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
2030 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
2031 	rx_adapter->num_rx_polled -= pollq;
2032 	dev_info->nb_rx_poll -= pollq;
2033 	rx_adapter->num_rx_intr -= intrq;
2034 	dev_info->nb_rx_intr -= intrq;
2035 	dev_info->nb_shared_intr -= intrq && sintrq;
2036 	if (rx_adapter->use_queue_event_buf) {
2037 		struct eth_event_enqueue_buffer *event_buf =
2038 			dev_info->rx_queue[rx_queue_id].event_buf;
2039 		struct rte_event_eth_rx_adapter_stats *stats =
2040 			dev_info->rx_queue[rx_queue_id].stats;
2041 		rte_free(event_buf->events);
2042 		rte_free(event_buf);
2043 		rte_free(stats);
2044 		dev_info->rx_queue[rx_queue_id].event_buf = NULL;
2045 		dev_info->rx_queue[rx_queue_id].stats = NULL;
2046 	}
2047 }
2048 
2049 static int
2050 rxa_add_queue(struct event_eth_rx_adapter *rx_adapter,
2051 	      struct eth_device_info *dev_info, int32_t rx_queue_id,
2052 	      const struct rte_event_eth_rx_adapter_queue_conf *conf)
2053 {
2054 	struct eth_rx_queue_info *queue_info;
2055 	const struct rte_event *ev = &conf->ev;
2056 	int pollq;
2057 	int intrq;
2058 	int sintrq;
2059 	struct rte_event *qi_ev;
2060 	struct eth_event_enqueue_buffer *new_rx_buf = NULL;
2061 	struct rte_event_eth_rx_adapter_stats *stats = NULL;
2062 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
2063 	int ret;
2064 
2065 	if (rx_queue_id == -1) {
2066 		uint16_t nb_rx_queues;
2067 		uint16_t i;
2068 
2069 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2070 		for (i = 0; i < nb_rx_queues; i++) {
2071 			ret = rxa_add_queue(rx_adapter, dev_info, i, conf);
2072 			if (ret)
2073 				return ret;
2074 		}
2075 		return 0;
2076 	}
2077 
2078 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
2079 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
2080 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
2081 
2082 	queue_info = &dev_info->rx_queue[rx_queue_id];
2083 	queue_info->wt = conf->servicing_weight;
2084 
2085 	qi_ev = (struct rte_event *)&queue_info->event;
2086 	qi_ev->event = ev->event;
2087 	qi_ev->op = RTE_EVENT_OP_NEW;
2088 	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
2089 
2090 	if (conf->rx_queue_flags &
2091 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
2092 		queue_info->flow_id_mask = ~0;
2093 	} else
2094 		qi_ev->flow_id = 0;
2095 
2096 	if (conf->rx_queue_flags &
2097 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2098 		queue_info->ena_vector = 1;
2099 		qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
2100 		rxa_set_vector_data(queue_info, conf->vector_sz,
2101 				    conf->vector_timeout_ns, conf->vector_mp,
2102 				    rx_queue_id, dev_info->dev->data->port_id);
2103 		rx_adapter->ena_vector = 1;
2104 		rx_adapter->vector_tmo_ticks =
2105 			rx_adapter->vector_tmo_ticks ?
2106 				      RTE_MIN(queue_info->vector_data
2107 							.vector_timeout_ticks >>
2108 						1,
2109 					rx_adapter->vector_tmo_ticks) :
2110 				queue_info->vector_data.vector_timeout_ticks >>
2111 					1;
2112 	}
2113 
2114 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
2115 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
2116 		rx_adapter->num_rx_polled += !pollq;
2117 		dev_info->nb_rx_poll += !pollq;
2118 		rx_adapter->num_rx_intr -= intrq;
2119 		dev_info->nb_rx_intr -= intrq;
2120 		dev_info->nb_shared_intr -= intrq && sintrq;
2121 	}
2122 
2123 	if (rxa_intr_queue(dev_info, rx_queue_id)) {
2124 		rx_adapter->num_rx_polled -= pollq;
2125 		dev_info->nb_rx_poll -= pollq;
2126 		rx_adapter->num_rx_intr += !intrq;
2127 		dev_info->nb_rx_intr += !intrq;
2128 		dev_info->nb_shared_intr += !intrq && sintrq;
2129 		if (dev_info->nb_shared_intr == 1) {
2130 			if (dev_info->multi_intr_cap)
2131 				dev_info->next_q_idx =
2132 					RTE_MAX_RXTX_INTR_VEC_ID - 1;
2133 			else
2134 				dev_info->next_q_idx = 0;
2135 		}
2136 	}
2137 
2138 	if (!rx_adapter->use_queue_event_buf)
2139 		return 0;
2140 
2141 	new_rx_buf = rte_zmalloc_socket("rx_buffer_meta",
2142 				sizeof(*new_rx_buf), 0,
2143 				rte_eth_dev_socket_id(eth_dev_id));
2144 	if (new_rx_buf == NULL) {
2145 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer meta for "
2146 				 "dev_id: %d queue_id: %d",
2147 				 eth_dev_id, rx_queue_id);
2148 		return -ENOMEM;
2149 	}
2150 
2151 	new_rx_buf->events_size = RTE_ALIGN(conf->event_buf_size, BATCH_SIZE);
2152 	new_rx_buf->events_size += (2 * BATCH_SIZE);
2153 	new_rx_buf->events = rte_zmalloc_socket("rx_buffer",
2154 				sizeof(struct rte_event) *
2155 				new_rx_buf->events_size, 0,
2156 				rte_eth_dev_socket_id(eth_dev_id));
2157 	if (new_rx_buf->events == NULL) {
2158 		rte_free(new_rx_buf);
2159 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer for "
2160 				 "dev_id: %d queue_id: %d",
2161 				 eth_dev_id, rx_queue_id);
2162 		return -ENOMEM;
2163 	}
2164 
2165 	queue_info->event_buf = new_rx_buf;
2166 
2167 	/* Allocate storage for adapter queue stats */
2168 	stats = rte_zmalloc_socket("rx_queue_stats",
2169 				sizeof(*stats), 0,
2170 				rte_eth_dev_socket_id(eth_dev_id));
2171 	if (stats == NULL) {
2172 		rte_free(new_rx_buf->events);
2173 		rte_free(new_rx_buf);
2174 		RTE_EDEV_LOG_ERR("Failed to allocate stats storage for"
2175 				 " dev_id: %d queue_id: %d",
2176 				 eth_dev_id, rx_queue_id);
2177 		return -ENOMEM;
2178 	}
2179 
2180 	queue_info->stats = stats;
2181 
2182 	return 0;
2183 }
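
/*
 * Sizing note (illustrative): with BATCH_SIZE of 32, a per-queue request of
 * conf->event_buf_size = 100 results in an allocation of
 * RTE_ALIGN(100, 32) + 2 * 32 = 128 + 64 = 192 events, i.e. the request
 * rounded up to a BATCH_SIZE multiple plus two batches of headroom, matching
 * the adjustment rte_event_eth_rx_adapter_create_with_params() applies to the
 * adapter-level buffer to get full utilization and avoid rollovers.
 */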
2184 
2185 static int
2186 rxa_sw_add(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
2187 	   int rx_queue_id,
2188 	   const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2189 {
2190 	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
2191 	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
2192 	int ret;
2193 	struct eth_rx_poll_entry *rx_poll;
2194 	struct eth_rx_queue_info *rx_queue;
2195 	uint32_t *rx_wrr;
2196 	uint16_t nb_rx_queues;
2197 	uint32_t nb_rx_poll, nb_wrr;
2198 	uint32_t nb_rx_intr;
2199 	int num_intr_vec;
2200 	uint16_t wt;
2201 
2202 	if (queue_conf->servicing_weight == 0) {
2203 		struct rte_eth_dev_data *data = dev_info->dev->data;
2204 
2205 		temp_conf = *queue_conf;
2206 		if (!data->dev_conf.intr_conf.rxq) {
2207 			/* If Rx interrupts are disabled, set wt = 1 */
2208 			temp_conf.servicing_weight = 1;
2209 		}
2210 		queue_conf = &temp_conf;
2211 
2212 		if (queue_conf->servicing_weight == 0 &&
2213 		    rx_adapter->use_queue_event_buf) {
2214 
2215 			RTE_EDEV_LOG_ERR("Use of queue level event buffer "
2216 					 "not supported for interrupt queues "
2217 					 "dev_id: %d queue_id: %d",
2218 					 eth_dev_id, rx_queue_id);
2219 			return -EINVAL;
2220 		}
2221 	}
2222 
2223 	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2224 	rx_queue = dev_info->rx_queue;
2225 	wt = queue_conf->servicing_weight;
2226 
2227 	if (dev_info->rx_queue == NULL) {
2228 		dev_info->rx_queue =
2229 		    rte_zmalloc_socket(rx_adapter->mem_name,
2230 				       nb_rx_queues *
2231 				       sizeof(struct eth_rx_queue_info), 0,
2232 				       rx_adapter->socket_id);
2233 		if (dev_info->rx_queue == NULL)
2234 			return -ENOMEM;
2235 	}
2236 	rx_wrr = NULL;
2237 	rx_poll = NULL;
2238 
2239 	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2240 			queue_conf->servicing_weight,
2241 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2242 
2243 	if (dev_info->dev->intr_handle)
2244 		dev_info->multi_intr_cap =
2245 			rte_intr_cap_multiple(dev_info->dev->intr_handle);
2246 
2247 	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2248 				&rx_poll, &rx_wrr);
2249 	if (ret)
2250 		goto err_free_rxqueue;
2251 
2252 	if (wt == 0) {
2253 		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2254 
2255 		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2256 		if (ret)
2257 			goto err_free_rxqueue;
2258 
2259 		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2260 		if (ret)
2261 			goto err_free_rxqueue;
2262 	} else {
2263 
2264 		num_intr_vec = 0;
2265 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2266 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2267 						rx_queue_id, 0);
2268 			/* Interrupt-based queues are being converted to
2269 			 * poll mode queues; delete the interrupt configuration
2270 			 * for those queues.
2271 			 */
2272 			ret = rxa_del_intr_queue(rx_adapter,
2273 						dev_info, rx_queue_id);
2274 			if (ret)
2275 				goto err_free_rxqueue;
2276 		}
2277 	}
2278 
2279 	if (nb_rx_intr == 0) {
2280 		ret = rxa_free_intr_resources(rx_adapter);
2281 		if (ret)
2282 			goto err_free_rxqueue;
2283 	}
2284 
2285 	if (wt == 0) {
2286 		uint16_t i;
2287 
2288 		if (rx_queue_id == -1) {
2289 			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2290 				dev_info->intr_queue[i] = i;
2291 		} else {
2292 			if (!rxa_intr_queue(dev_info, rx_queue_id))
2293 				dev_info->intr_queue[nb_rx_intr - 1] =
2294 					rx_queue_id;
2295 		}
2296 	}
2297 
2298 
2299 
2300 	ret = rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2301 	if (ret)
2302 		goto err_free_rxqueue;
2303 	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2304 
2305 	rte_free(rx_adapter->eth_rx_poll);
2306 	rte_free(rx_adapter->wrr_sched);
2307 
2308 	rx_adapter->eth_rx_poll = rx_poll;
2309 	rx_adapter->wrr_sched = rx_wrr;
2310 	rx_adapter->wrr_len = nb_wrr;
2311 	rx_adapter->num_intr_vec += num_intr_vec;
2312 	return 0;
2313 
2314 err_free_rxqueue:
2315 	if (rx_queue == NULL) {
2316 		rte_free(dev_info->rx_queue);
2317 		dev_info->rx_queue = NULL;
2318 	}
2319 
2320 	rte_free(rx_poll);
2321 	rte_free(rx_wrr);
2322 
2323 	return ret;
2324 }
2325 
2326 static int
2327 rxa_ctrl(uint8_t id, int start)
2328 {
2329 	struct event_eth_rx_adapter *rx_adapter;
2330 	struct rte_eventdev *dev;
2331 	struct eth_device_info *dev_info;
2332 	uint32_t i;
2333 	int use_service = 0;
2334 	int stop = !start;
2335 
2336 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2337 	rx_adapter = rxa_id_to_adapter(id);
2338 	if (rx_adapter == NULL)
2339 		return -EINVAL;
2340 
2341 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2342 
2343 	RTE_ETH_FOREACH_DEV(i) {
2344 		dev_info = &rx_adapter->eth_devices[i];
2345 		/* if starting, check that the device has queues added */
2346 		if (start && !dev_info->nb_dev_queues)
2347 			continue;
2348 		/* if stopping, check that the device has been started */
2349 		if (stop && !dev_info->dev_rx_started)
2350 			continue;
2351 		use_service |= !dev_info->internal_event_port;
2352 		dev_info->dev_rx_started = start;
2353 		if (dev_info->internal_event_port == 0)
2354 			continue;
2355 		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2356 						&rte_eth_devices[i]) :
2357 			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
2358 						&rte_eth_devices[i]);
2359 	}
2360 
2361 	if (use_service) {
2362 		rte_spinlock_lock(&rx_adapter->rx_lock);
2363 		rx_adapter->rxa_started = start;
2364 		rte_service_runstate_set(rx_adapter->service_id, start);
2365 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2366 	}
2367 
2368 	return 0;
2369 }
2370 
2371 static int
2372 rxa_create(uint8_t id, uint8_t dev_id,
2373 	   struct rte_event_eth_rx_adapter_params *rxa_params,
2374 	   rte_event_eth_rx_adapter_conf_cb conf_cb,
2375 	   void *conf_arg)
2376 {
2377 	struct event_eth_rx_adapter *rx_adapter;
2378 	struct eth_event_enqueue_buffer *buf;
2379 	struct rte_event *events;
2380 	int ret;
2381 	int socket_id;
2382 	uint16_t i;
2383 	char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
2384 	const uint8_t default_rss_key[] = {
2385 		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2386 		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2387 		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2388 		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2389 		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2390 	};
2391 
2392 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2393 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2394 
2395 	if (conf_cb == NULL)
2396 		return -EINVAL;
2397 
2398 	if (event_eth_rx_adapter == NULL) {
2399 		ret = rte_event_eth_rx_adapter_init();
2400 		if (ret)
2401 			return ret;
2402 	}
2403 
2404 	rx_adapter = rxa_id_to_adapter(id);
2405 	if (rx_adapter != NULL) {
2406 		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2407 		return -EEXIST;
2408 	}
2409 
2410 	socket_id = rte_event_dev_socket_id(dev_id);
2411 	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2412 		"rte_event_eth_rx_adapter_%d",
2413 		id);
2414 
2415 	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2416 			RTE_CACHE_LINE_SIZE, socket_id);
2417 	if (rx_adapter == NULL) {
2418 		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2419 		return -ENOMEM;
2420 	}
2421 
2422 	rx_adapter->eventdev_id = dev_id;
2423 	rx_adapter->socket_id = socket_id;
2424 	rx_adapter->conf_cb = conf_cb;
2425 	rx_adapter->conf_arg = conf_arg;
2426 	rx_adapter->id = id;
2427 	TAILQ_INIT(&rx_adapter->vector_list);
2428 	strcpy(rx_adapter->mem_name, mem_name);
2429 	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2430 					RTE_MAX_ETHPORTS *
2431 					sizeof(struct eth_device_info), 0,
2432 					socket_id);
2433 	rte_convert_rss_key((const uint32_t *)default_rss_key,
2434 			(uint32_t *)rx_adapter->rss_key_be,
2435 			    RTE_DIM(default_rss_key));
2436 
2437 	if (rx_adapter->eth_devices == NULL) {
2438 		RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
2439 		rte_free(rx_adapter);
2440 		return -ENOMEM;
2441 	}
2442 
2443 	rte_spinlock_init(&rx_adapter->rx_lock);
2444 
2445 	for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2446 		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2447 
2448 	/* Rx adapter event buffer allocation */
2449 	rx_adapter->use_queue_event_buf = rxa_params->use_queue_event_buf;
2450 
2451 	if (!rx_adapter->use_queue_event_buf) {
2452 		buf = &rx_adapter->event_enqueue_buffer;
2453 		buf->events_size = rxa_params->event_buf_size;
2454 
2455 		events = rte_zmalloc_socket(rx_adapter->mem_name,
2456 					    buf->events_size * sizeof(*events),
2457 					    0, socket_id);
2458 		if (events == NULL) {
2459 			RTE_EDEV_LOG_ERR("Failed to allocate memory "
2460 					 "for adapter event buffer");
2461 			rte_free(rx_adapter->eth_devices);
2462 			rte_free(rx_adapter);
2463 			return -ENOMEM;
2464 		}
2465 
2466 		rx_adapter->event_enqueue_buffer.events = events;
2467 	}
2468 
2469 	event_eth_rx_adapter[id] = rx_adapter;
2470 
2471 	if (conf_cb == rxa_default_conf_cb)
2472 		rx_adapter->default_cb_arg = 1;
2473 
2474 	if (rte_mbuf_dyn_rx_timestamp_register(
2475 			&event_eth_rx_timestamp_dynfield_offset,
2476 			&event_eth_rx_timestamp_dynflag) != 0) {
2477 		RTE_EDEV_LOG_ERR("Error registering timestamp field in mbuf");
2478 		return -rte_errno;
2479 	}
2480 
2481 	rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2482 		conf_arg);
2483 	return 0;
2484 }
2485 
2486 int
2487 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2488 				rte_event_eth_rx_adapter_conf_cb conf_cb,
2489 				void *conf_arg)
2490 {
2491 	struct rte_event_eth_rx_adapter_params rxa_params = {0};
2492 
2493 	/* use default values for adapter params */
2494 	rxa_params.event_buf_size = ETH_EVENT_BUFFER_SIZE;
2495 	rxa_params.use_queue_event_buf = false;
2496 
2497 	return rxa_create(id, dev_id, &rxa_params, conf_cb, conf_arg);
2498 }
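
/*
 * Usage sketch (illustrative, names hypothetical): creating an adapter with
 * an application-supplied configuration callback such as the app_rxa_conf_cb
 * sketch shown earlier in this file's comments. This variant uses the default
 * adapter-level event buffer of ETH_EVENT_BUFFER_SIZE events.
 *
 *	uint8_t rxa_id = 0, evdev_id = 0;
 *	static uint8_t app_event_port_id;
 *	int ret;
 *
 *	ret = rte_event_eth_rx_adapter_create_ext(rxa_id, evdev_id,
 *						  app_rxa_conf_cb,
 *						  &app_event_port_id);
 */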
2499 
2500 int
2501 rte_event_eth_rx_adapter_create_with_params(uint8_t id, uint8_t dev_id,
2502 			struct rte_event_port_conf *port_config,
2503 			struct rte_event_eth_rx_adapter_params *rxa_params)
2504 {
2505 	struct rte_event_port_conf *pc;
2506 	int ret;
2507 	struct rte_event_eth_rx_adapter_params temp_params = {0};
2508 
2509 	if (port_config == NULL)
2510 		return -EINVAL;
2511 
2512 	if (rxa_params == NULL) {
2513 		/* use default values if rxa_params is NULL */
2514 		rxa_params = &temp_params;
2515 		rxa_params->event_buf_size = ETH_EVENT_BUFFER_SIZE;
2516 		rxa_params->use_queue_event_buf = false;
2517 	} else if ((!rxa_params->use_queue_event_buf &&
2518 		    rxa_params->event_buf_size == 0) ||
2519 		   (rxa_params->use_queue_event_buf &&
2520 		    rxa_params->event_buf_size != 0)) {
2521 		RTE_EDEV_LOG_ERR("Invalid adapter params");
2522 		return -EINVAL;
2523 	} else if (!rxa_params->use_queue_event_buf) {
2524 		/* Adjust the event buffer size to a multiple of BATCH_SIZE, the
2525 		 * burst size used to fetch packets from the NIC Rx queues, to get
2526 		 * full buffer utilization and prevent unnecessary rollovers.
2527 		 */
2528 
2529 		rxa_params->event_buf_size =
2530 			RTE_ALIGN(rxa_params->event_buf_size, BATCH_SIZE);
2531 		rxa_params->event_buf_size += (BATCH_SIZE + BATCH_SIZE);
2532 	}
2533 
2534 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2535 	if (pc == NULL)
2536 		return -ENOMEM;
2537 
2538 	*pc = *port_config;
2539 
2540 	ret = rxa_create(id, dev_id, rxa_params, rxa_default_conf_cb, pc);
2541 	if (ret)
2542 		rte_free(pc);
2543 
2544 	return ret;
2545 }
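
/*
 * Usage sketch (illustrative, names hypothetical): selecting per-queue event
 * buffers. Per the validation above, event_buf_size must be 0 when
 * use_queue_event_buf is set; each queue then supplies its own size through
 * rte_event_eth_rx_adapter_queue_conf::event_buf_size at queue add time.
 *
 *	struct rte_event_port_conf pconf = {
 *		.new_event_threshold = 4096,
 *		.dequeue_depth = 32,
 *		.enqueue_depth = 32,
 *	};
 *	struct rte_event_eth_rx_adapter_params params = {
 *		.event_buf_size = 0,
 *		.use_queue_event_buf = true,
 *	};
 *	int ret;
 *
 *	ret = rte_event_eth_rx_adapter_create_with_params(rxa_id, evdev_id,
 *							   &pconf, &params);
 */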
2546 
2547 int
2548 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2549 		struct rte_event_port_conf *port_config)
2550 {
2551 	struct rte_event_port_conf *pc;
2552 	int ret;
2553 
2554 	if (port_config == NULL)
2555 		return -EINVAL;
2556 
2557 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2558 
2559 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2560 	if (pc == NULL)
2561 		return -ENOMEM;
2562 	*pc = *port_config;
2563 
2564 	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2565 					rxa_default_conf_cb,
2566 					pc);
2567 	if (ret)
2568 		rte_free(pc);
2569 	return ret;
2570 }
2571 
2572 int
2573 rte_event_eth_rx_adapter_free(uint8_t id)
2574 {
2575 	struct event_eth_rx_adapter *rx_adapter;
2576 
2577 	if (rxa_memzone_lookup())
2578 		return -ENOMEM;
2579 
2580 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2581 
2582 	rx_adapter = rxa_id_to_adapter(id);
2583 	if (rx_adapter == NULL)
2584 		return -EINVAL;
2585 
2586 	if (rx_adapter->nb_queues) {
2587 		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2588 				rx_adapter->nb_queues);
2589 		return -EBUSY;
2590 	}
2591 
2592 	if (rx_adapter->default_cb_arg)
2593 		rte_free(rx_adapter->conf_arg);
2594 	rte_free(rx_adapter->eth_devices);
2595 	if (!rx_adapter->use_queue_event_buf)
2596 		rte_free(rx_adapter->event_enqueue_buffer.events);
2597 	rte_free(rx_adapter);
2598 	event_eth_rx_adapter[id] = NULL;
2599 
2600 	rte_eventdev_trace_eth_rx_adapter_free(id);
2601 	return 0;
2602 }
2603 
2604 int
2605 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2606 		uint16_t eth_dev_id,
2607 		int32_t rx_queue_id,
2608 		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2609 {
2610 	int ret;
2611 	uint32_t cap;
2612 	struct event_eth_rx_adapter *rx_adapter;
2613 	struct rte_eventdev *dev;
2614 	struct eth_device_info *dev_info;
2615 	struct rte_event_eth_rx_adapter_vector_limits limits;
2616 
2617 	if (rxa_memzone_lookup())
2618 		return -ENOMEM;
2619 
2620 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2621 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2622 
2623 	rx_adapter = rxa_id_to_adapter(id);
2624 	if ((rx_adapter == NULL) || (queue_conf == NULL))
2625 		return -EINVAL;
2626 
2627 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2628 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2629 						eth_dev_id,
2630 						&cap);
2631 	if (ret) {
2632 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2633 			" eth port %" PRIu16, id, eth_dev_id);
2634 		return ret;
2635 	}
2636 
2637 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2638 		&& (queue_conf->rx_queue_flags &
2639 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2640 		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2641 				" eth port: %" PRIu16 " adapter id: %" PRIu8,
2642 				eth_dev_id, id);
2643 		return -EINVAL;
2644 	}
2645 
2646 	if (queue_conf->rx_queue_flags &
2647 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2648 
2649 		if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0) {
2650 			RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2651 					 " eth port: %" PRIu16
2652 					 " adapter id: %" PRIu8,
2653 					 eth_dev_id, id);
2654 			return -EINVAL;
2655 		}
2656 
2657 		ret = rte_event_eth_rx_adapter_vector_limits_get(
2658 			rx_adapter->eventdev_id, eth_dev_id, &limits);
2659 		if (ret < 0) {
2660 			RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
2661 					 " eth port: %" PRIu16
2662 					 " adapter id: %" PRIu8,
2663 					 eth_dev_id, id);
2664 			return -EINVAL;
2665 		}
2666 		if (queue_conf->vector_sz < limits.min_sz ||
2667 		    queue_conf->vector_sz > limits.max_sz ||
2668 		    queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
2669 		    queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
2670 		    queue_conf->vector_mp == NULL) {
2671 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2672 					 " eth port: %" PRIu16
2673 					 " adapter id: %" PRIu8,
2674 					 eth_dev_id, id);
2675 			return -EINVAL;
2676 		}
2677 		if (queue_conf->vector_mp->elt_size <
2678 		    (sizeof(struct rte_event_vector) +
2679 		     (sizeof(uintptr_t) * queue_conf->vector_sz))) {
2680 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2681 					 " eth port: %" PRIu16
2682 					 " adapter id: %" PRIu8,
2683 					 eth_dev_id, id);
2684 			return -EINVAL;
2685 		}
2686 	}
2687 
2688 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2689 		(rx_queue_id != -1)) {
2690 		RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
2691 			"event queue, eth port: %" PRIu16 " adapter id: %"
2692 			PRIu8, eth_dev_id, id);
2693 		return -EINVAL;
2694 	}
2695 
2696 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2697 			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2698 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2699 			 (uint16_t)rx_queue_id);
2700 		return -EINVAL;
2701 	}
2702 
2703 	if ((rx_adapter->use_queue_event_buf &&
2704 	     queue_conf->event_buf_size == 0) ||
2705 	    (!rx_adapter->use_queue_event_buf &&
2706 	     queue_conf->event_buf_size != 0)) {
2707 		RTE_EDEV_LOG_ERR("Invalid Event buffer size for the queue");
2708 		return -EINVAL;
2709 	}
2710 
2711 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2712 
2713 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2714 		if (*dev->dev_ops->eth_rx_adapter_queue_add == NULL)
2715 			return -ENOTSUP;
2716 		if (dev_info->rx_queue == NULL) {
2717 			dev_info->rx_queue =
2718 			    rte_zmalloc_socket(rx_adapter->mem_name,
2719 					dev_info->dev->data->nb_rx_queues *
2720 					sizeof(struct eth_rx_queue_info), 0,
2721 					rx_adapter->socket_id);
2722 			if (dev_info->rx_queue == NULL)
2723 				return -ENOMEM;
2724 		}
2725 
2726 		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2727 				&rte_eth_devices[eth_dev_id],
2728 				rx_queue_id, queue_conf);
2729 		if (ret == 0) {
2730 			dev_info->internal_event_port = 1;
2731 			rxa_update_queue(rx_adapter,
2732 					&rx_adapter->eth_devices[eth_dev_id],
2733 					rx_queue_id,
2734 					1);
2735 		}
2736 	} else {
2737 		rte_spinlock_lock(&rx_adapter->rx_lock);
2738 		dev_info->internal_event_port = 0;
2739 		ret = rxa_init_service(rx_adapter, id);
2740 		if (ret == 0) {
2741 			uint32_t service_id = rx_adapter->service_id;
2742 			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2743 					queue_conf);
2744 			rte_service_component_runstate_set(service_id,
2745 				rxa_sw_adapter_queue_count(rx_adapter));
2746 		}
2747 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2748 	}
2749 
2750 	rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2751 		rx_queue_id, queue_conf, ret);
2752 	if (ret)
2753 		return ret;
2754 
2755 	return 0;
2756 }
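
/*
 * Usage sketch (illustrative, names hypothetical): adding all Rx queues of an
 * ethdev with servicing weight 1, targeting one event queue. An rx_queue_id
 * of -1 adds every queue of the port in a single call, and is also the only
 * accepted value when the PMD lacks the MULTI_EVENTQ capability (see above).
 *
 *	struct rte_event_eth_rx_adapter_queue_conf qconf;
 *	int ret;
 *
 *	memset(&qconf, 0, sizeof(qconf));
 *	qconf.servicing_weight = 1;
 *	qconf.ev.queue_id = app_ev_queue_id;
 *	qconf.ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
 *	qconf.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
 *
 *	ret = rte_event_eth_rx_adapter_queue_add(rxa_id, eth_port_id, -1,
 *						 &qconf);
 */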
2757 
2758 static int
2759 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2760 {
2761 	limits->max_sz = MAX_VECTOR_SIZE;
2762 	limits->min_sz = MIN_VECTOR_SIZE;
2763 	limits->max_timeout_ns = MAX_VECTOR_NS;
2764 	limits->min_timeout_ns = MIN_VECTOR_NS;
2765 
2766 	return 0;
2767 }
2768 
2769 int
2770 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2771 				int32_t rx_queue_id)
2772 {
2773 	int ret = 0;
2774 	struct rte_eventdev *dev;
2775 	struct event_eth_rx_adapter *rx_adapter;
2776 	struct eth_device_info *dev_info;
2777 	uint32_t cap;
2778 	uint32_t nb_rx_poll = 0;
2779 	uint32_t nb_wrr = 0;
2780 	uint32_t nb_rx_intr;
2781 	struct eth_rx_poll_entry *rx_poll = NULL;
2782 	uint32_t *rx_wrr = NULL;
2783 	int num_intr_vec;
2784 
2785 	if (rxa_memzone_lookup())
2786 		return -ENOMEM;
2787 
2788 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2789 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2790 
2791 	rx_adapter = rxa_id_to_adapter(id);
2792 	if (rx_adapter == NULL)
2793 		return -EINVAL;
2794 
2795 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2796 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2797 						eth_dev_id,
2798 						&cap);
2799 	if (ret)
2800 		return ret;
2801 
2802 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2803 		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2804 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2805 			 (uint16_t)rx_queue_id);
2806 		return -EINVAL;
2807 	}
2808 
2809 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2810 
2811 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2812 		if (*dev->dev_ops->eth_rx_adapter_queue_del == NULL)
2813 			return -ENOTSUP;
2814 		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2815 						&rte_eth_devices[eth_dev_id],
2816 						rx_queue_id);
2817 		if (ret == 0) {
2818 			rxa_update_queue(rx_adapter,
2819 					&rx_adapter->eth_devices[eth_dev_id],
2820 					rx_queue_id,
2821 					0);
2822 			if (dev_info->nb_dev_queues == 0) {
2823 				rte_free(dev_info->rx_queue);
2824 				dev_info->rx_queue = NULL;
2825 			}
2826 		}
2827 	} else {
2828 		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2829 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2830 
2831 		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2832 			&rx_poll, &rx_wrr);
2833 		if (ret)
2834 			return ret;
2835 
2836 		rte_spinlock_lock(&rx_adapter->rx_lock);
2837 
2838 		num_intr_vec = 0;
2839 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2840 
2841 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2842 						rx_queue_id, 0);
2843 			ret = rxa_del_intr_queue(rx_adapter, dev_info,
2844 					rx_queue_id);
2845 			if (ret)
2846 				goto unlock_ret;
2847 		}
2848 
2849 		if (nb_rx_intr == 0) {
2850 			ret = rxa_free_intr_resources(rx_adapter);
2851 			if (ret)
2852 				goto unlock_ret;
2853 		}
2854 
2855 		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2856 		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2857 
2858 		rte_free(rx_adapter->eth_rx_poll);
2859 		rte_free(rx_adapter->wrr_sched);
2860 
2861 		if (nb_rx_intr == 0) {
2862 			rte_free(dev_info->intr_queue);
2863 			dev_info->intr_queue = NULL;
2864 		}
2865 
2866 		rx_adapter->eth_rx_poll = rx_poll;
2867 		rx_adapter->wrr_sched = rx_wrr;
2868 		rx_adapter->wrr_len = nb_wrr;
2869 		/*
2870 		 * reset next poll start position (wrr_pos) to avoid buffer
2871 		 * overrun when wrr_len is reduced in case of queue delete
2872 		 */
2873 		rx_adapter->wrr_pos = 0;
2874 		rx_adapter->num_intr_vec += num_intr_vec;
2875 
2876 		if (dev_info->nb_dev_queues == 0) {
2877 			rte_free(dev_info->rx_queue);
2878 			dev_info->rx_queue = NULL;
2879 		}
2880 unlock_ret:
2881 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2882 		if (ret) {
2883 			rte_free(rx_poll);
2884 			rte_free(rx_wrr);
2885 			return ret;
2886 		}
2887 
2888 		rte_service_component_runstate_set(rx_adapter->service_id,
2889 				rxa_sw_adapter_queue_count(rx_adapter));
2890 	}
2891 
2892 	rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2893 		rx_queue_id, ret);
2894 
2895 	return ret;
2896 }
2897 
2898 int
2899 rte_event_eth_rx_adapter_vector_limits_get(
2900 	uint8_t dev_id, uint16_t eth_port_id,
2901 	struct rte_event_eth_rx_adapter_vector_limits *limits)
2902 {
2903 	struct rte_eventdev *dev;
2904 	uint32_t cap;
2905 	int ret;
2906 
2907 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2908 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2909 
2910 	if (limits == NULL)
2911 		return -EINVAL;
2912 
2913 	dev = &rte_eventdevs[dev_id];
2914 
2915 	ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2916 	if (ret) {
2917 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2918 				 " eth port %" PRIu16,
2919 				 dev_id, eth_port_id);
2920 		return ret;
2921 	}
2922 
2923 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2924 		if (*dev->dev_ops->eth_rx_adapter_vector_limits_get == NULL)
2925 			return -ENOTSUP;
2926 		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2927 			dev, &rte_eth_devices[eth_port_id], limits);
2928 	} else {
2929 		ret = rxa_sw_vector_limits(limits);
2930 	}
2931 
2932 	return ret;
2933 }
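
/*
 * Usage sketch (illustrative): bounding the application's vector settings by
 * the reported limits before enabling event vectorization on a queue, reusing
 * the qconf from the queue add sketch above. desired_sz, desired_ns and
 * vector_pool are hypothetical; the mempool is assumed to come from
 * rte_event_vector_pool_create() so each element can hold vector_sz mbufs.
 * A complete application would also honour lim.min_sz and lim.max_timeout_ns.
 *
 *	struct rte_event_eth_rx_adapter_vector_limits lim;
 *
 *	if (rte_event_eth_rx_adapter_vector_limits_get(evdev_id, eth_port_id,
 *						       &lim) == 0) {
 *		qconf.rx_queue_flags |=
 *			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR;
 *		qconf.vector_sz = RTE_MIN(desired_sz, lim.max_sz);
 *		qconf.vector_timeout_ns = RTE_MAX(desired_ns,
 *						  lim.min_timeout_ns);
 *		qconf.vector_mp = vector_pool;
 *	}
 */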
2934 
2935 int
2936 rte_event_eth_rx_adapter_start(uint8_t id)
2937 {
2938 	rte_eventdev_trace_eth_rx_adapter_start(id);
2939 	return rxa_ctrl(id, 1);
2940 }
2941 
2942 int
2943 rte_event_eth_rx_adapter_stop(uint8_t id)
2944 {
2945 	rte_eventdev_trace_eth_rx_adapter_stop(id);
2946 	return rxa_ctrl(id, 0);
2947 }
2948 
2949 static inline void
2950 rxa_queue_stats_reset(struct eth_rx_queue_info *queue_info)
2951 {
2952 	struct rte_event_eth_rx_adapter_stats *q_stats;
2953 
2954 	q_stats = queue_info->stats;
2955 	memset(q_stats, 0, sizeof(*q_stats));
2956 }
2957 
2958 int
2959 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2960 			       struct rte_event_eth_rx_adapter_stats *stats)
2961 {
2962 	struct event_eth_rx_adapter *rx_adapter;
2963 	struct eth_event_enqueue_buffer *buf;
2964 	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2965 	struct rte_event_eth_rx_adapter_stats dev_stats;
2966 	struct rte_eventdev *dev;
2967 	struct eth_device_info *dev_info;
2968 	struct eth_rx_queue_info *queue_info;
2969 	struct rte_event_eth_rx_adapter_stats *q_stats;
2970 	uint32_t i, j;
2971 	int ret;
2972 
2973 	if (rxa_memzone_lookup())
2974 		return -ENOMEM;
2975 
2976 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2977 
2978 	rx_adapter = rxa_id_to_adapter(id);
2979 	if (rx_adapter == NULL || stats == NULL)
2980 		return -EINVAL;
2981 
2982 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2983 	memset(stats, 0, sizeof(*stats));
2984 
2985 	if (rx_adapter->service_inited)
2986 		*stats = rx_adapter->stats;
2987 
2988 	RTE_ETH_FOREACH_DEV(i) {
2989 		dev_info = &rx_adapter->eth_devices[i];
2990 
2991 		if (rx_adapter->use_queue_event_buf && dev_info->rx_queue) {
2992 
2993 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
2994 			     j++) {
2995 				queue_info = &dev_info->rx_queue[j];
2996 				if (!queue_info->queue_enabled)
2997 					continue;
2998 				q_stats = queue_info->stats;
2999 
3000 				stats->rx_packets += q_stats->rx_packets;
3001 				stats->rx_poll_count += q_stats->rx_poll_count;
3002 				stats->rx_enq_count += q_stats->rx_enq_count;
3003 				stats->rx_enq_retry += q_stats->rx_enq_retry;
3004 				stats->rx_dropped += q_stats->rx_dropped;
3005 				stats->rx_enq_block_cycles +=
3006 						q_stats->rx_enq_block_cycles;
3007 			}
3008 		}
3009 
3010 		if (dev_info->internal_event_port == 0 ||
3011 			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
3012 			continue;
3013 		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
3014 						&rte_eth_devices[i],
3015 						&dev_stats);
3016 		if (ret)
3017 			continue;
3018 		dev_stats_sum.rx_packets += dev_stats.rx_packets;
3019 		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
3020 	}
3021 
3022 	buf = &rx_adapter->event_enqueue_buffer;
3023 	stats->rx_packets += dev_stats_sum.rx_packets;
3024 	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
3025 	stats->rx_event_buf_count = buf->count;
3026 	stats->rx_event_buf_size = buf->events_size;
3027 
3028 	return 0;
3029 }
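
/*
 * Usage sketch (illustrative): sampling the aggregate counters. When
 * per-queue event buffers are in use, the loop above folds the per-queue
 * counters into the returned totals.
 *
 *	struct rte_event_eth_rx_adapter_stats stats;
 *
 *	if (rte_event_eth_rx_adapter_stats_get(rxa_id, &stats) == 0)
 *		printf("rx_packets=%" PRIu64 " rx_dropped=%" PRIu64
 *		       " rx_enq_retry=%" PRIu64 "\n",
 *		       stats.rx_packets, stats.rx_dropped,
 *		       stats.rx_enq_retry);
 */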
3030 
3031 int
3032 rte_event_eth_rx_adapter_queue_stats_get(uint8_t id,
3033 		uint16_t eth_dev_id,
3034 		uint16_t rx_queue_id,
3035 		struct rte_event_eth_rx_adapter_queue_stats *stats)
3036 {
3037 	struct event_eth_rx_adapter *rx_adapter;
3038 	struct eth_device_info *dev_info;
3039 	struct eth_rx_queue_info *queue_info;
3040 	struct eth_event_enqueue_buffer *event_buf;
3041 	struct rte_event_eth_rx_adapter_stats *q_stats;
3042 	struct rte_eventdev *dev;
3043 
3044 	if (rxa_memzone_lookup())
3045 		return -ENOMEM;
3046 
3047 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3048 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3049 
3050 	rx_adapter = rxa_id_to_adapter(id);
3051 
3052 	if (rx_adapter == NULL || stats == NULL)
3053 		return -EINVAL;
3054 
3055 	if (!rx_adapter->use_queue_event_buf)
3056 		return -EINVAL;
3057 
3058 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3059 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
3060 		return -EINVAL;
3061 	}
3062 
3063 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3064 	if (dev_info->rx_queue == NULL ||
3065 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3066 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3067 		return -EINVAL;
3068 	}
3069 
3070 	if (dev_info->internal_event_port == 0) {
3071 		queue_info = &dev_info->rx_queue[rx_queue_id];
3072 		event_buf = queue_info->event_buf;
3073 		q_stats = queue_info->stats;
3074 
3075 		stats->rx_event_buf_count = event_buf->count;
3076 		stats->rx_event_buf_size = event_buf->events_size;
3077 		stats->rx_packets = q_stats->rx_packets;
3078 		stats->rx_poll_count = q_stats->rx_poll_count;
3079 		stats->rx_dropped = q_stats->rx_dropped;
3080 	}
3081 
3082 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3083 	if (dev->dev_ops->eth_rx_adapter_queue_stats_get != NULL) {
3084 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_get)(dev,
3085 						&rte_eth_devices[eth_dev_id],
3086 						rx_queue_id, stats);
3087 	}
3088 
3089 	return 0;
3090 }
3091 
3092 int
3093 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
3094 {
3095 	struct event_eth_rx_adapter *rx_adapter;
3096 	struct rte_eventdev *dev;
3097 	struct eth_device_info *dev_info;
3098 	struct eth_rx_queue_info *queue_info;
3099 	uint32_t i, j;
3100 
3101 	if (rxa_memzone_lookup())
3102 		return -ENOMEM;
3103 
3104 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3105 
3106 	rx_adapter = rxa_id_to_adapter(id);
3107 	if (rx_adapter == NULL)
3108 		return -EINVAL;
3109 
3110 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3111 
3112 	RTE_ETH_FOREACH_DEV(i) {
3113 		dev_info = &rx_adapter->eth_devices[i];
3114 
3115 		if (rx_adapter->use_queue_event_buf && dev_info->rx_queue) {
3116 
3117 			for (j = 0; j < dev_info->dev->data->nb_rx_queues;
3118 						j++) {
3119 				queue_info = &dev_info->rx_queue[j];
3120 				if (!queue_info->queue_enabled)
3121 					continue;
3122 				rxa_queue_stats_reset(queue_info);
3123 			}
3124 		}
3125 
3126 		if (dev_info->internal_event_port == 0 ||
3127 			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
3128 			continue;
3129 		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
3130 							&rte_eth_devices[i]);
3131 	}
3132 
3133 	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
3134 
3135 	return 0;
3136 }
3137 
3138 int
3139 rte_event_eth_rx_adapter_queue_stats_reset(uint8_t id,
3140 		uint16_t eth_dev_id,
3141 		uint16_t rx_queue_id)
3142 {
3143 	struct event_eth_rx_adapter *rx_adapter;
3144 	struct eth_device_info *dev_info;
3145 	struct eth_rx_queue_info *queue_info;
3146 	struct rte_eventdev *dev;
3147 
3148 	if (rxa_memzone_lookup())
3149 		return -ENOMEM;
3150 
3151 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3152 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3153 
3154 	rx_adapter = rxa_id_to_adapter(id);
3155 	if (rx_adapter == NULL)
3156 		return -EINVAL;
3157 
3158 	if (!rx_adapter->use_queue_event_buf)
3159 		return -EINVAL;
3160 
3161 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3162 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, rx_queue_id);
3163 		return -EINVAL;
3164 	}
3165 
3166 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3167 
3168 	if (dev_info->rx_queue == NULL ||
3169 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3170 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3171 		return -EINVAL;
3172 	}
3173 
3174 	if (dev_info->internal_event_port == 0) {
3175 		queue_info = &dev_info->rx_queue[rx_queue_id];
3176 		rxa_queue_stats_reset(queue_info);
3177 	}
3178 
3179 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3180 	if (dev->dev_ops->eth_rx_adapter_queue_stats_reset != NULL) {
3181 		return (*dev->dev_ops->eth_rx_adapter_queue_stats_reset)(dev,
3182 						&rte_eth_devices[eth_dev_id],
3183 						rx_queue_id);
3184 	}
3185 
3186 	return 0;
3187 }
3188 
3189 int
3190 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
3191 {
3192 	struct event_eth_rx_adapter *rx_adapter;
3193 
3194 	if (rxa_memzone_lookup())
3195 		return -ENOMEM;
3196 
3197 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3198 
3199 	rx_adapter = rxa_id_to_adapter(id);
3200 	if (rx_adapter == NULL || service_id == NULL)
3201 		return -EINVAL;
3202 
3203 	if (rx_adapter->service_inited)
3204 		*service_id = rx_adapter->service_id;
3205 
3206 	return rx_adapter->service_inited ? 0 : -ESRCH;
3207 }
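
/*
 * Usage sketch (illustrative): when the adapter runs as a software service
 * (no INTERNAL_PORT capability), the application must map that service to a
 * service lcore for packets to flow; lcore 1 below is an arbitrary choice.
 *
 *	uint32_t service_id;
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(rxa_id, &service_id) == 0) {
 *		rte_service_lcore_add(1);
 *		rte_service_map_lcore_set(service_id, 1, 1);
 *		rte_service_lcore_start(1);
 *	}
 *	rte_event_eth_rx_adapter_start(rxa_id);
 */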
3208 
3209 int
3210 rte_event_eth_rx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
3211 {
3212 	struct event_eth_rx_adapter *rx_adapter;
3213 
3214 	if (rxa_memzone_lookup())
3215 		return -ENOMEM;
3216 
3217 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3218 
3219 	rx_adapter = rxa_id_to_adapter(id);
3220 	if (rx_adapter == NULL || event_port_id == NULL)
3221 		return -EINVAL;
3222 
3223 	if (rx_adapter->service_inited)
3224 		*event_port_id = rx_adapter->event_port_id;
3225 
3226 	return rx_adapter->service_inited ? 0 : -ESRCH;
3227 }
3228 
3229 int
3230 rte_event_eth_rx_adapter_cb_register(uint8_t id,
3231 					uint16_t eth_dev_id,
3232 					rte_event_eth_rx_adapter_cb_fn cb_fn,
3233 					void *cb_arg)
3234 {
3235 	struct event_eth_rx_adapter *rx_adapter;
3236 	struct eth_device_info *dev_info;
3237 	uint32_t cap;
3238 	int ret;
3239 
3240 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3241 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3242 
3243 	rx_adapter = rxa_id_to_adapter(id);
3244 	if (rx_adapter == NULL)
3245 		return -EINVAL;
3246 
3247 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3248 	if (dev_info->rx_queue == NULL)
3249 		return -EINVAL;
3250 
3251 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
3252 						eth_dev_id,
3253 						&cap);
3254 	if (ret) {
3255 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
3256 			" eth port %" PRIu16, id, eth_dev_id);
3257 		return ret;
3258 	}
3259 
3260 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
3261 		RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
3262 				PRIu16, eth_dev_id);
3263 		return -EINVAL;
3264 	}
3265 
3266 	rte_spinlock_lock(&rx_adapter->rx_lock);
3267 	dev_info->cb_fn = cb_fn;
3268 	dev_info->cb_arg = cb_arg;
3269 	rte_spinlock_unlock(&rx_adapter->rx_lock);
3270 
3271 	return 0;
3272 }
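
/*
 * Usage sketch (illustrative): installing a per-port callback that may drop
 * or rewrite events before they are enqueued. app_rx_cb and app_cb_ctx are
 * hypothetical names; the callback is assumed to be assigned elsewhere and
 * must match rte_event_eth_rx_adapter_cb_fn. Per the check above, callbacks
 * are only supported for ports without the INTERNAL_PORT capability.
 *
 *	static rte_event_eth_rx_adapter_cb_fn app_rx_cb;
 *	int ret;
 *
 *	ret = rte_event_eth_rx_adapter_cb_register(rxa_id, eth_port_id,
 *						   app_rx_cb, app_cb_ctx);
 */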
3273 
3274 int
3275 rte_event_eth_rx_adapter_queue_conf_get(uint8_t id,
3276 			uint16_t eth_dev_id,
3277 			uint16_t rx_queue_id,
3278 			struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
3279 {
3280 #define TICK2NSEC(_ticks, _freq) (((_ticks) * (1E9)) / (_freq))
3281 	struct rte_eventdev *dev;
3282 	struct event_eth_rx_adapter *rx_adapter;
3283 	struct eth_device_info *dev_info;
3284 	struct eth_rx_queue_info *queue_info;
3285 	int ret;
3286 
3287 	if (rxa_memzone_lookup())
3288 		return -ENOMEM;
3289 
3290 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3291 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3292 
3293 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3294 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3295 		return -EINVAL;
3296 	}
3297 
3298 	if (queue_conf == NULL) {
3299 		RTE_EDEV_LOG_ERR("Rx queue conf struct cannot be NULL");
3300 		return -EINVAL;
3301 	}
3302 
3303 	rx_adapter = rxa_id_to_adapter(id);
3304 	if (rx_adapter == NULL)
3305 		return -EINVAL;
3306 
3307 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3308 	if (dev_info->rx_queue == NULL ||
3309 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3310 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3311 		return -EINVAL;
3312 	}
3313 
3314 	queue_info = &dev_info->rx_queue[rx_queue_id];
3315 
3316 	memset(queue_conf, 0, sizeof(*queue_conf));
3317 	queue_conf->rx_queue_flags = 0;
3318 	if (queue_info->flow_id_mask != 0)
3319 		queue_conf->rx_queue_flags |=
3320 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID;
3321 	queue_conf->servicing_weight = queue_info->wt;
3322 
3323 	queue_conf->ev.event = queue_info->event;
3324 
3325 	queue_conf->vector_sz = queue_info->vector_data.max_vector_count;
3326 	queue_conf->vector_mp = queue_info->vector_data.vector_pool;
3327 	/* need to be converted from ticks to ns */
3328 	queue_conf->vector_timeout_ns = TICK2NSEC(
3329 		queue_info->vector_data.vector_timeout_ticks, rte_get_timer_hz());
3330 
3331 	if (queue_info->event_buf != NULL)
3332 		queue_conf->event_buf_size = queue_info->event_buf->events_size;
3333 	else
3334 		queue_conf->event_buf_size = 0;
3335 
3336 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3337 	if (dev->dev_ops->eth_rx_adapter_queue_conf_get != NULL) {
3338 		ret = (*dev->dev_ops->eth_rx_adapter_queue_conf_get)(dev,
3339 						&rte_eth_devices[eth_dev_id],
3340 						rx_queue_id,
3341 						queue_conf);
3342 		return ret;
3343 	}
3344 
3345 	return 0;
3346 }
3347 
3348 static int
3349 rxa_is_queue_added(struct event_eth_rx_adapter *rx_adapter,
3350 		   uint16_t eth_dev_id,
3351 		   uint16_t rx_queue_id)
3352 {
3353 	struct eth_device_info *dev_info;
3354 	struct eth_rx_queue_info *queue_info;
3355 
3356 	if (!rx_adapter->eth_devices)
3357 		return 0;
3358 
3359 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3360 	if (!dev_info || !dev_info->rx_queue)
3361 		return 0;
3362 
3363 	queue_info = &dev_info->rx_queue[rx_queue_id];
3364 
3365 	return queue_info && queue_info->queue_enabled;
3366 }
3367 
3368 #define rxa_evdev(rx_adapter) (&rte_eventdevs[(rx_adapter)->eventdev_id])
3369 
3370 #define rxa_dev_instance_get(rx_adapter) \
3371 		rxa_evdev((rx_adapter))->dev_ops->eth_rx_adapter_instance_get
3372 
3373 int
3374 rte_event_eth_rx_adapter_instance_get(uint16_t eth_dev_id,
3375 				      uint16_t rx_queue_id,
3376 				      uint8_t *rxa_inst_id)
3377 {
3378 	uint8_t id;
3379 	int ret = -EINVAL;
3380 	uint32_t caps;
3381 	struct event_eth_rx_adapter *rx_adapter;
3382 
3383 	if (rxa_memzone_lookup())
3384 		return -ENOMEM;
3385 
3386 	if (eth_dev_id >= rte_eth_dev_count_avail()) {
3387 		RTE_EDEV_LOG_ERR("Invalid ethernet port id %u", eth_dev_id);
3388 		return -EINVAL;
3389 	}
3390 
3391 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3392 		RTE_EDEV_LOG_ERR("Invalid Rx queue %u", rx_queue_id);
3393 		return -EINVAL;
3394 	}
3395 
3396 	if (rxa_inst_id == NULL) {
3397 		RTE_EDEV_LOG_ERR("rxa_inst_id cannot be NULL");
3398 		return -EINVAL;
3399 	}
3400 
3401 	/* Iterate through all adapter instances */
3402 	for (id = 0; id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE; id++) {
3403 		rx_adapter = rxa_id_to_adapter(id);
3404 		if (!rx_adapter)
3405 			continue;
3406 
3407 		if (rxa_is_queue_added(rx_adapter, eth_dev_id, rx_queue_id)) {
3408 			*rxa_inst_id = rx_adapter->id;
3409 			ret = 0;
3410 		}
3411 
3412 		/* The Rx adapter internally maintains queue information
3413 		 * for both internal port and DPDK service port based adapters.
3414 		 * The eventdev PMD callback is invoked only for future proofing
3415 		 * and overrides the above return value if defined.
3416 		 */
3417 		caps = 0;
3418 		if (!rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
3419 						      eth_dev_id,
3420 						      &caps)) {
3421 			if (caps & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
3422 				ret = rxa_dev_instance_get(rx_adapter) ?
3423 						rxa_dev_instance_get(rx_adapter)
3424 								(eth_dev_id,
3425 								 rx_queue_id,
3426 								 rxa_inst_id)
3427 							: -EINVAL;
3428 			}
3429 		}
3430 
3431 		/* return if entry found */
3432 		if (ret == 0)
3433 			return ret;
3434 	}
3435 
3436 	return -EINVAL;
3437 }
3438 
3439 #define RXA_ADD_DICT(stats, s) rte_tel_data_add_dict_u64(d, #s, stats.s)
3440 
3441 static int
3442 handle_rxa_stats(const char *cmd __rte_unused,
3443 		 const char *params,
3444 		 struct rte_tel_data *d)
3445 {
3446 	uint8_t rx_adapter_id;
3447 	struct rte_event_eth_rx_adapter_stats rx_adptr_stats;
3448 
3449 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3450 		return -1;
3451 
3452 	/* Get Rx adapter ID from parameter string */
3453 	rx_adapter_id = atoi(params);
3454 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3455 
3456 	/* Get Rx adapter stats */
3457 	if (rte_event_eth_rx_adapter_stats_get(rx_adapter_id,
3458 					       &rx_adptr_stats)) {
3459 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter stats");
3460 		return -1;
3461 	}
3462 
3463 	rte_tel_data_start_dict(d);
3464 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3465 	RXA_ADD_DICT(rx_adptr_stats, rx_packets);
3466 	RXA_ADD_DICT(rx_adptr_stats, rx_poll_count);
3467 	RXA_ADD_DICT(rx_adptr_stats, rx_dropped);
3468 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_retry);
3469 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_count);
3470 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_size);
3471 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_count);
3472 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_start_ts);
3473 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_block_cycles);
3474 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_end_ts);
3475 	RXA_ADD_DICT(rx_adptr_stats, rx_intr_packets);
3476 
3477 	return 0;
3478 }
3479 
3480 static int
3481 handle_rxa_stats_reset(const char *cmd __rte_unused,
3482 		       const char *params,
3483 		       struct rte_tel_data *d __rte_unused)
3484 {
3485 	uint8_t rx_adapter_id;
3486 
3487 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3488 		return -1;
3489 
3490 	/* Get Rx adapter ID from parameter string */
3491 	rx_adapter_id = atoi(params);
3492 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3493 
3494 	/* Reset Rx adapter stats */
3495 	if (rte_event_eth_rx_adapter_stats_reset(rx_adapter_id)) {
3496 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter stats");
3497 		return -1;
3498 	}
3499 
3500 	return 0;
3501 }
3502 
3503 static int
3504 handle_rxa_get_queue_conf(const char *cmd __rte_unused,
3505 			  const char *params,
3506 			  struct rte_tel_data *d)
3507 {
3508 	uint8_t rx_adapter_id;
3509 	uint16_t rx_queue_id;
3510 	int eth_dev_id, ret = -1;
3511 	char *token, *l_params;
3512 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
3513 
3514 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3515 		return -1;
3516 
3517 	/* Get Rx adapter ID from parameter string */
3518 	l_params = strdup(params);
3519 	if (l_params == NULL)
3520 		return -ENOMEM;
3521 	token = strtok(l_params, ",");
3522 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3523 	rx_adapter_id = strtoul(token, NULL, 10);
3524 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);
3525 
3526 	token = strtok(NULL, ",");
3527 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3528 
3529 	/* Get device ID from parameter string */
3530 	eth_dev_id = strtoul(token, NULL, 10);
3531 	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3532 
3533 	token = strtok(NULL, ",");
3534 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3535 
3536 	/* Get Rx queue ID from parameter string */
3537 	rx_queue_id = strtoul(token, NULL, 10);
3538 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3539 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3540 		ret = -EINVAL;
3541 		goto error;
3542 	}
3543 
3544 	token = strtok(NULL, "\0");
3545 	if (token != NULL)
3546 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3547 				 " telemetry command, ignoring");
3548 	/* Parsing parameter finished */
3549 	free(l_params);
3550 
3551 	if (rte_event_eth_rx_adapter_queue_conf_get(rx_adapter_id, eth_dev_id,
3552 						    rx_queue_id, &queue_conf)) {
3553 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue config");
3554 		return -1;
3555 	}
3556 
3557 	rte_tel_data_start_dict(d);
3558 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3559 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3560 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3561 	RXA_ADD_DICT(queue_conf, rx_queue_flags);
3562 	RXA_ADD_DICT(queue_conf, servicing_weight);
3563 	RXA_ADD_DICT(queue_conf.ev, queue_id);
3564 	RXA_ADD_DICT(queue_conf.ev, sched_type);
3565 	RXA_ADD_DICT(queue_conf.ev, priority);
3566 	RXA_ADD_DICT(queue_conf.ev, flow_id);
3567 
3568 	return 0;
3569 
3570 error:
3571 	free(l_params);
3572 	return ret;
3573 }
3574 
3575 static int
3576 handle_rxa_get_queue_stats(const char *cmd __rte_unused,
3577 			   const char *params,
3578 			   struct rte_tel_data *d)
3579 {
3580 	uint8_t rx_adapter_id;
3581 	uint16_t rx_queue_id;
3582 	int eth_dev_id, ret = -1;
3583 	char *token, *l_params;
3584 	struct rte_event_eth_rx_adapter_queue_stats q_stats;
3585 
3586 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3587 		return -1;
3588 
3589 	/* Get Rx adapter ID from parameter string */
3590 	l_params = strdup(params);
3591 	if (l_params == NULL)
3592 		return -ENOMEM;
3593 	token = strtok(l_params, ",");
3594 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3595 	rx_adapter_id = strtoul(token, NULL, 10);
3596 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);
3597 
3598 	token = strtok(NULL, ",");
3599 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3600 
3601 	/* Get device ID from parameter string */
3602 	eth_dev_id = strtoul(token, NULL, 10);
3603 	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3604 
3605 	token = strtok(NULL, ",");
3606 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3607 
3608 	/* Get Rx queue ID from parameter string */
3609 	rx_queue_id = strtoul(token, NULL, 10);
3610 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3611 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3612 		ret = -EINVAL;
3613 		goto error;
3614 	}
3615 
3616 	token = strtok(NULL, "\0");
3617 	if (token != NULL)
3618 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3619 				 " telemetry command, ignoring");
3620 	/* Parsing parameter finished */
3621 	free(l_params);
3622 
3623 	if (rte_event_eth_rx_adapter_queue_stats_get(rx_adapter_id, eth_dev_id,
3624 						    rx_queue_id, &q_stats)) {
3625 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue stats");
3626 		return -1;
3627 	}
3628 
3629 	rte_tel_data_start_dict(d);
3630 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3631 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3632 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3633 	RXA_ADD_DICT(q_stats, rx_event_buf_count);
3634 	RXA_ADD_DICT(q_stats, rx_event_buf_size);
3635 	RXA_ADD_DICT(q_stats, rx_poll_count);
3636 	RXA_ADD_DICT(q_stats, rx_packets);
3637 	RXA_ADD_DICT(q_stats, rx_dropped);
3638 
3639 	return 0;
3640 
3641 error:
3642 	free(l_params);
3643 	return ret;
3644 }
3645 
3646 static int
3647 handle_rxa_queue_stats_reset(const char *cmd __rte_unused,
3648 			     const char *params,
3649 			     struct rte_tel_data *d __rte_unused)
3650 {
3651 	uint8_t rx_adapter_id;
3652 	uint16_t rx_queue_id;
3653 	int eth_dev_id, ret = -1;
3654 	char *token, *l_params;
3655 
3656 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3657 		return -1;
3658 
3659 	/* Get Rx adapter ID from parameter string */
3660 	l_params = strdup(params);
3661 	if (l_params == NULL)
3662 		return -ENOMEM;
3663 	token = strtok(l_params, ",");
3664 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3665 	rx_adapter_id = strtoul(token, NULL, 10);
3666 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_GOTO_ERR_RET(rx_adapter_id, -EINVAL);
3667 
3668 	token = strtok(NULL, ",");
3669 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3670 
3671 	/* Get device ID from parameter string */
3672 	eth_dev_id = strtoul(token, NULL, 10);
3673 	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3674 
3675 	token = strtok(NULL, ",");
3676 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3677 
3678 	/* Get Rx queue ID from parameter string */
3679 	rx_queue_id = strtoul(token, NULL, 10);
3680 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3681 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3682 		ret = -EINVAL;
3683 		goto error;
3684 	}
3685 
3686 	token = strtok(NULL, "\0");
3687 	if (token != NULL)
3688 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3689 				 " telemetry command, ignoring");
3690 	/* Parsing parameter finished */
3691 	free(l_params);
3692 
3693 	if (rte_event_eth_rx_adapter_queue_stats_reset(rx_adapter_id,
3694 						       eth_dev_id,
3695 						       rx_queue_id)) {
3696 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter queue stats");
3697 		return -1;
3698 	}
3699 
3700 	return 0;
3701 
3702 error:
3703 	free(l_params);
3704 	return ret;
3705 }
3706 
3707 static int
3708 handle_rxa_instance_get(const char *cmd __rte_unused,
3709 			const char *params,
3710 			struct rte_tel_data *d)
3711 {
3712 	uint8_t instance_id;
3713 	uint16_t rx_queue_id;
3714 	int eth_dev_id, ret = -1;
3715 	char *token, *l_params;
3716 
3717 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3718 		return -1;
3719 
3720 	l_params = strdup(params);
3721 	if (l_params == NULL)
3722 		return -ENOMEM;
3723 	token = strtok(l_params, ",");
3724 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3725 
3726 	/* Get device ID from parameter string */
3727 	eth_dev_id = strtoul(token, NULL, 10);
3728 	RTE_ETH_VALID_PORTID_OR_GOTO_ERR_RET(eth_dev_id, -EINVAL);
3729 
3730 	token = strtok(NULL, ",");
3731 	RTE_EVENT_ETH_RX_ADAPTER_TOKEN_VALID_OR_GOTO_ERR_RET(token, -1);
3732 
3733 	/* Get Rx queue ID from parameter string */
3734 	rx_queue_id = strtoul(token, NULL, 10);
3735 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3736 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3737 		ret = -EINVAL;
3738 		goto error;
3739 	}
3740 
3741 	token = strtok(NULL, "\0");
3742 	if (token != NULL)
3743 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3744 				 " telemetry command, ignoring");
3745 
3746 	/* Parsing parameter finished */
3747 	free(l_params);
3748 
3749 	if (rte_event_eth_rx_adapter_instance_get(eth_dev_id,
3750 						  rx_queue_id,
3751 						  &instance_id)) {
3752 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter instance ID"
3753 				 " for rx_queue_id = %d", rx_queue_id);
3754 		return -1;
3755 	}
3756 
3757 	rte_tel_data_start_dict(d);
3758 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3759 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3760 	rte_tel_data_add_dict_u64(d, "rxa_instance_id", instance_id);
3761 
3762 	return 0;
3763 
3764 error:
3765 	free(l_params);
3766 	return ret;
3767 }
3768 
3769 RTE_INIT(rxa_init_telemetry)
3770 {
3771 	rte_telemetry_register_cmd("/eventdev/rxa_stats",
3772 		handle_rxa_stats,
3773 		"Returns Rx adapter stats. Parameter: rxa_id");
3774 
3775 	rte_telemetry_register_cmd("/eventdev/rxa_stats_reset",
3776 		handle_rxa_stats_reset,
3777 		"Reset Rx adapter stats. Parameter: rxa_id");
3778 
3779 	rte_telemetry_register_cmd("/eventdev/rxa_queue_conf",
3780 		handle_rxa_get_queue_conf,
3781 		"Returns Rx queue config. Parameter: rxa_id, dev_id, queue_id");
3782 
3783 	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats",
3784 		handle_rxa_get_queue_stats,
3785 		"Returns Rx queue stats. Parameter: rxa_id, dev_id, queue_id");
3786 
3787 	rte_telemetry_register_cmd("/eventdev/rxa_queue_stats_reset",
3788 		handle_rxa_queue_stats_reset,
3789 		"Reset Rx queue stats. Parameter: rxa_id, dev_id, queue_id");
3790 
3791 	rte_telemetry_register_cmd("/eventdev/rxa_rxq_instance_get",
3792 		handle_rxa_instance_get,
3793 		"Returns Rx adapter instance id. Parameter: dev_id, queue_id");
3794 }
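
/*
 * The commands registered above can be exercised with the DPDK telemetry
 * client (usertools/dpdk-telemetry.py); for example, for adapter 0, ethdev 0,
 * Rx queue 0:
 *
 *	--> /eventdev/rxa_stats,0
 *	--> /eventdev/rxa_queue_stats,0,0,0
 *	--> /eventdev/rxa_rxq_instance_get,0,0
 *
 * Parameter order follows the help strings passed to
 * rte_telemetry_register_cmd() above.
 */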
3795