xref: /dpdk/lib/eventdev/rte_event_eth_rx_adapter.c (revision 5208d68d30cbc36dc453703e58084f33af0320ef)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9 
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <ethdev_driver.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20 #include <rte_mbuf_dyn.h>
21 #include <rte_telemetry.h>
22 
23 #include "rte_eventdev.h"
24 #include "eventdev_pmd.h"
25 #include "eventdev_trace.h"
26 #include "rte_event_eth_rx_adapter.h"
27 
28 #define BATCH_SIZE		32
29 #define BLOCK_CNT_THRESHOLD	10
30 #define ETH_EVENT_BUFFER_SIZE	(6*BATCH_SIZE)
31 #define MAX_VECTOR_SIZE		1024
32 #define MIN_VECTOR_SIZE		4
33 #define MAX_VECTOR_NS		1E9
34 #define MIN_VECTOR_NS		1E5
35 
36 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
37 #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
38 
39 #define RSS_KEY_SIZE	40
40 /* value written to intr thread pipe to signal thread exit */
41 #define ETH_BRIDGE_INTR_THREAD_EXIT	1
42 /* Sentinel value to detect initialized file handle */
43 #define INIT_FD		-1
44 
45 #define RXA_ADAPTER_ARRAY "rte_event_eth_rx_adapter_array"
46 
47 /*
48  * Used to store port and queue ID of interrupting Rx queue
49  */
50 union queue_data {
51 	RTE_STD_C11
52 	void *ptr;
53 	struct {
54 		uint16_t port;
55 		uint16_t queue;
56 	};
57 };
58 
59 /*
60  * There is an instance of this struct per polled Rx queue added to the
61  * adapter
62  */
63 struct eth_rx_poll_entry {
64 	/* Eth port to poll */
65 	uint16_t eth_dev_id;
66 	/* Eth rx queue to poll */
67 	uint16_t eth_rx_qid;
68 };
69 
70 struct eth_rx_vector_data {
71 	TAILQ_ENTRY(eth_rx_vector_data) next;
72 	uint16_t port;
73 	uint16_t queue;
74 	uint16_t max_vector_count;
75 	uint64_t event;
76 	uint64_t ts;
77 	uint64_t vector_timeout_ticks;
78 	struct rte_mempool *vector_pool;
79 	struct rte_event_vector *vector_ev;
80 } __rte_cache_aligned;
81 
82 TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
83 
84 /* Instance per adapter */
85 struct eth_event_enqueue_buffer {
86 	/* Count of events in this buffer */
87 	uint16_t count;
88 	/* Array of events in this buffer */
89 	struct rte_event *events;
90 	/* size of event buffer */
91 	uint16_t events_size;
92 	/* Event enqueue happens from head */
93 	uint16_t head;
94 	/* New packets from rte_eth_rx_burst is enqued from tail */
95 	uint16_t tail;
96 	/* last element in the buffer before rollover */
97 	uint16_t last;
98 	uint16_t last_mask;
99 };
100 
101 struct event_eth_rx_adapter {
102 	/* RSS key */
103 	uint8_t rss_key_be[RSS_KEY_SIZE];
104 	/* Event device identifier */
105 	uint8_t eventdev_id;
106 	/* Event port identifier */
107 	uint8_t event_port_id;
108 	/* Flag indicating per rxq event buffer */
109 	bool use_queue_event_buf;
110 	/* Per ethernet device structure */
111 	struct eth_device_info *eth_devices;
112 	/* Lock to serialize config updates with service function */
113 	rte_spinlock_t rx_lock;
114 	/* Max mbufs processed in any service function invocation */
115 	uint32_t max_nb_rx;
116 	/* Receive queues that need to be polled */
117 	struct eth_rx_poll_entry *eth_rx_poll;
118 	/* Size of the eth_rx_poll array */
119 	uint16_t num_rx_polled;
120 	/* Weighted round robin schedule */
121 	uint32_t *wrr_sched;
122 	/* wrr_sched[] size */
123 	uint32_t wrr_len;
124 	/* Next entry in wrr[] to begin polling */
125 	uint32_t wrr_pos;
126 	/* Event burst buffer */
127 	struct eth_event_enqueue_buffer event_enqueue_buffer;
128 	/* Vector enable flag */
129 	uint8_t ena_vector;
130 	/* Timestamp of previous vector expiry list traversal */
131 	uint64_t prev_expiry_ts;
132 	/* Minimum ticks to wait before traversing expiry list */
133 	uint64_t vector_tmo_ticks;
134 	/* vector list */
135 	struct eth_rx_vector_data_list vector_list;
136 	/* Per adapter stats */
137 	struct rte_event_eth_rx_adapter_stats stats;
138 	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
139 	uint16_t enq_block_count;
140 	/* Block start ts */
141 	uint64_t rx_enq_block_start_ts;
142 	/* epoll fd used to wait for Rx interrupts */
143 	int epd;
144 	/* Num of interrupt driven interrupt queues */
145 	uint32_t num_rx_intr;
146 	/* Used to send <dev id, queue id> of interrupting Rx queues from
147 	 * the interrupt thread to the Rx thread
148 	 */
149 	struct rte_ring *intr_ring;
150 	/* Rx Queue data (dev id, queue id) for the last non-empty
151 	 * queue polled
152 	 */
153 	union queue_data qd;
154 	/* queue_data is valid */
155 	int qd_valid;
156 	/* Interrupt ring lock, synchronizes Rx thread
157 	 * and interrupt thread
158 	 */
159 	rte_spinlock_t intr_ring_lock;
160 	/* event array passed to rte_poll_wait */
161 	struct rte_epoll_event *epoll_events;
162 	/* Count of interrupt vectors in use */
163 	uint32_t num_intr_vec;
164 	/* Thread blocked on Rx interrupts */
165 	pthread_t rx_intr_thread;
166 	/* Configuration callback for rte_service configuration */
167 	rte_event_eth_rx_adapter_conf_cb conf_cb;
168 	/* Configuration callback argument */
169 	void *conf_arg;
170 	/* Set if  default_cb is being used */
171 	int default_cb_arg;
172 	/* Service initialization state */
173 	uint8_t service_inited;
174 	/* Total count of Rx queues in adapter */
175 	uint32_t nb_queues;
176 	/* Memory allocation name */
177 	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
178 	/* Socket identifier cached from eventdev */
179 	int socket_id;
180 	/* Per adapter EAL service */
181 	uint32_t service_id;
182 	/* Adapter started flag */
183 	uint8_t rxa_started;
184 	/* Adapter ID */
185 	uint8_t id;
186 } __rte_cache_aligned;
187 
188 /* Per eth device */
189 struct eth_device_info {
190 	struct rte_eth_dev *dev;
191 	struct eth_rx_queue_info *rx_queue;
192 	/* Rx callback */
193 	rte_event_eth_rx_adapter_cb_fn cb_fn;
194 	/* Rx callback argument */
195 	void *cb_arg;
196 	/* Set if ethdev->eventdev packet transfer uses a
197 	 * hardware mechanism
198 	 */
199 	uint8_t internal_event_port;
200 	/* Set if the adapter is processing rx queues for
201 	 * this eth device and packet processing has been
202 	 * started, allows for the code to know if the PMD
203 	 * rx_adapter_stop callback needs to be invoked
204 	 */
205 	uint8_t dev_rx_started;
206 	/* Number of queues added for this device */
207 	uint16_t nb_dev_queues;
208 	/* Number of poll based queues
209 	 * If nb_rx_poll > 0, the start callback will
210 	 * be invoked if not already invoked
211 	 */
212 	uint16_t nb_rx_poll;
213 	/* Number of interrupt based queues
214 	 * If nb_rx_intr > 0, the start callback will
215 	 * be invoked if not already invoked.
216 	 */
217 	uint16_t nb_rx_intr;
218 	/* Number of queues that use the shared interrupt */
219 	uint16_t nb_shared_intr;
220 	/* sum(wrr(q)) for all queues within the device
221 	 * useful when deleting all device queues
222 	 */
223 	uint32_t wrr_len;
224 	/* Intr based queue index to start polling from, this is used
225 	 * if the number of shared interrupts is non-zero
226 	 */
227 	uint16_t next_q_idx;
228 	/* Intr based queue indices */
229 	uint16_t *intr_queue;
230 	/* device generates per Rx queue interrupt for queue index
231 	 * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
232 	 */
233 	int multi_intr_cap;
234 	/* shared interrupt enabled */
235 	int shared_intr_enabled;
236 };
237 
238 /* Per Rx queue */
239 struct eth_rx_queue_info {
240 	int queue_enabled;	/* True if added */
241 	int intr_enabled;
242 	uint8_t ena_vector;
243 	uint16_t wt;		/* Polling weight */
244 	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
245 	uint64_t event;
246 	struct eth_rx_vector_data vector_data;
247 	struct eth_event_enqueue_buffer *event_buf;
248 };
249 
250 static struct event_eth_rx_adapter **event_eth_rx_adapter;
251 
252 /* Enable dynamic timestamp field in mbuf */
253 static uint64_t event_eth_rx_timestamp_dynflag;
254 static int event_eth_rx_timestamp_dynfield_offset = -1;
255 
256 static inline rte_mbuf_timestamp_t *
257 rxa_timestamp_dynfield(struct rte_mbuf *mbuf)
258 {
259 	return RTE_MBUF_DYNFIELD(mbuf,
260 		event_eth_rx_timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
261 }
262 
263 static inline int
264 rxa_validate_id(uint8_t id)
265 {
266 	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
267 }
268 
269 static inline struct eth_event_enqueue_buffer *
270 rxa_event_buf_get(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
271 		  uint16_t rx_queue_id)
272 {
273 	if (rx_adapter->use_queue_event_buf) {
274 		struct eth_device_info *dev_info =
275 			&rx_adapter->eth_devices[eth_dev_id];
276 		return dev_info->rx_queue[rx_queue_id].event_buf;
277 	} else
278 		return &rx_adapter->event_enqueue_buffer;
279 }
280 
281 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
282 	if (!rxa_validate_id(id)) { \
283 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
284 		return retval; \
285 	} \
286 } while (0)
287 
288 static inline int
289 rxa_sw_adapter_queue_count(struct event_eth_rx_adapter *rx_adapter)
290 {
291 	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
292 }
293 
294 /* Greatest common divisor */
295 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
296 {
297 	uint16_t r = a % b;
298 
299 	return r ? rxa_gcd_u16(b, r) : b;
300 }
301 
302 /* Returns the next queue in the polling sequence
303  *
304  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
305  */
306 static int
307 rxa_wrr_next(struct event_eth_rx_adapter *rx_adapter, unsigned int n, int *cw,
308 	     struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
309 	     uint16_t gcd, int prev)
310 {
311 	int i = prev;
312 	uint16_t w;
313 
314 	while (1) {
315 		uint16_t q;
316 		uint16_t d;
317 
318 		i = (i + 1) % n;
319 		if (i == 0) {
320 			*cw = *cw - gcd;
321 			if (*cw <= 0)
322 				*cw = max_wt;
323 		}
324 
325 		q = eth_rx_poll[i].eth_rx_qid;
326 		d = eth_rx_poll[i].eth_dev_id;
327 		w = rx_adapter->eth_devices[d].rx_queue[q].wt;
328 
329 		if ((int)w >= *cw)
330 			return i;
331 	}
332 }
333 
334 static inline int
335 rxa_shared_intr(struct eth_device_info *dev_info,
336 	int rx_queue_id)
337 {
338 	int multi_intr_cap;
339 
340 	if (dev_info->dev->intr_handle == NULL)
341 		return 0;
342 
343 	multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
344 	return !multi_intr_cap ||
345 		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
346 }
347 
348 static inline int
349 rxa_intr_queue(struct eth_device_info *dev_info,
350 	int rx_queue_id)
351 {
352 	struct eth_rx_queue_info *queue_info;
353 
354 	queue_info = &dev_info->rx_queue[rx_queue_id];
355 	return dev_info->rx_queue &&
356 		!dev_info->internal_event_port &&
357 		queue_info->queue_enabled && queue_info->wt == 0;
358 }
359 
360 static inline int
361 rxa_polled_queue(struct eth_device_info *dev_info,
362 	int rx_queue_id)
363 {
364 	struct eth_rx_queue_info *queue_info;
365 
366 	queue_info = &dev_info->rx_queue[rx_queue_id];
367 	return !dev_info->internal_event_port &&
368 		dev_info->rx_queue &&
369 		queue_info->queue_enabled && queue_info->wt != 0;
370 }
371 
372 /* Calculate change in number of vectors after Rx queue ID is add/deleted */
373 static int
374 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
375 {
376 	uint16_t i;
377 	int n, s;
378 	uint16_t nbq;
379 
380 	nbq = dev_info->dev->data->nb_rx_queues;
381 	n = 0; /* non shared count */
382 	s = 0; /* shared count */
383 
384 	if (rx_queue_id == -1) {
385 		for (i = 0; i < nbq; i++) {
386 			if (!rxa_shared_intr(dev_info, i))
387 				n += add ? !rxa_intr_queue(dev_info, i) :
388 					rxa_intr_queue(dev_info, i);
389 			else
390 				s += add ? !rxa_intr_queue(dev_info, i) :
391 					rxa_intr_queue(dev_info, i);
392 		}
393 
394 		if (s > 0) {
395 			if ((add && dev_info->nb_shared_intr == 0) ||
396 				(!add && dev_info->nb_shared_intr))
397 				n += 1;
398 		}
399 	} else {
400 		if (!rxa_shared_intr(dev_info, rx_queue_id))
401 			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
402 				rxa_intr_queue(dev_info, rx_queue_id);
403 		else
404 			n = add ? !dev_info->nb_shared_intr :
405 				dev_info->nb_shared_intr == 1;
406 	}
407 
408 	return add ? n : -n;
409 }
410 
411 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
412  */
413 static void
414 rxa_calc_nb_post_intr_del(struct event_eth_rx_adapter *rx_adapter,
415 			  struct eth_device_info *dev_info, int rx_queue_id,
416 			  uint32_t *nb_rx_intr)
417 {
418 	uint32_t intr_diff;
419 
420 	if (rx_queue_id == -1)
421 		intr_diff = dev_info->nb_rx_intr;
422 	else
423 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
424 
425 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
426 }
427 
428 /* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
429  * interrupt queues could currently be poll mode Rx queues
430  */
431 static void
432 rxa_calc_nb_post_add_intr(struct event_eth_rx_adapter *rx_adapter,
433 			  struct eth_device_info *dev_info, int rx_queue_id,
434 			  uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
435 			  uint32_t *nb_wrr)
436 {
437 	uint32_t intr_diff;
438 	uint32_t poll_diff;
439 	uint32_t wrr_len_diff;
440 
441 	if (rx_queue_id == -1) {
442 		intr_diff = dev_info->dev->data->nb_rx_queues -
443 						dev_info->nb_rx_intr;
444 		poll_diff = dev_info->nb_rx_poll;
445 		wrr_len_diff = dev_info->wrr_len;
446 	} else {
447 		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
448 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
449 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
450 					0;
451 	}
452 
453 	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
454 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
455 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
456 }
457 
458 /* Calculate size of the eth_rx_poll and wrr_sched arrays
459  * after deleting poll mode rx queues
460  */
461 static void
462 rxa_calc_nb_post_poll_del(struct event_eth_rx_adapter *rx_adapter,
463 			  struct eth_device_info *dev_info, int rx_queue_id,
464 			  uint32_t *nb_rx_poll, uint32_t *nb_wrr)
465 {
466 	uint32_t poll_diff;
467 	uint32_t wrr_len_diff;
468 
469 	if (rx_queue_id == -1) {
470 		poll_diff = dev_info->nb_rx_poll;
471 		wrr_len_diff = dev_info->wrr_len;
472 	} else {
473 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
474 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
475 					0;
476 	}
477 
478 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
479 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
480 }
481 
482 /* Calculate nb_rx_* after adding poll mode rx queues
483  */
484 static void
485 rxa_calc_nb_post_add_poll(struct event_eth_rx_adapter *rx_adapter,
486 			  struct eth_device_info *dev_info, int rx_queue_id,
487 			  uint16_t wt, uint32_t *nb_rx_poll,
488 			  uint32_t *nb_rx_intr, uint32_t *nb_wrr)
489 {
490 	uint32_t intr_diff;
491 	uint32_t poll_diff;
492 	uint32_t wrr_len_diff;
493 
494 	if (rx_queue_id == -1) {
495 		intr_diff = dev_info->nb_rx_intr;
496 		poll_diff = dev_info->dev->data->nb_rx_queues -
497 						dev_info->nb_rx_poll;
498 		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
499 				- dev_info->wrr_len;
500 	} else {
501 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
502 		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
503 		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
504 				wt - dev_info->rx_queue[rx_queue_id].wt :
505 				wt;
506 	}
507 
508 	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
509 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
510 	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
511 }
512 
513 /* Calculate nb_rx_* after adding rx_queue_id */
514 static void
515 rxa_calc_nb_post_add(struct event_eth_rx_adapter *rx_adapter,
516 		     struct eth_device_info *dev_info, int rx_queue_id,
517 		     uint16_t wt, uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
518 		     uint32_t *nb_wrr)
519 {
520 	if (wt != 0)
521 		rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
522 					wt, nb_rx_poll, nb_rx_intr, nb_wrr);
523 	else
524 		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
525 					nb_rx_poll, nb_rx_intr, nb_wrr);
526 }
527 
528 /* Calculate nb_rx_* after deleting rx_queue_id */
529 static void
530 rxa_calc_nb_post_del(struct event_eth_rx_adapter *rx_adapter,
531 		     struct eth_device_info *dev_info, int rx_queue_id,
532 		     uint32_t *nb_rx_poll, uint32_t *nb_rx_intr,
533 		     uint32_t *nb_wrr)
534 {
535 	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
536 				nb_wrr);
537 	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
538 				nb_rx_intr);
539 }
540 
541 /*
542  * Allocate the rx_poll array
543  */
544 static struct eth_rx_poll_entry *
545 rxa_alloc_poll(struct event_eth_rx_adapter *rx_adapter, uint32_t num_rx_polled)
546 {
547 	size_t len;
548 
549 	len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
550 							RTE_CACHE_LINE_SIZE);
551 	return  rte_zmalloc_socket(rx_adapter->mem_name,
552 				len,
553 				RTE_CACHE_LINE_SIZE,
554 				rx_adapter->socket_id);
555 }
556 
557 /*
558  * Allocate the WRR array
559  */
560 static uint32_t *
561 rxa_alloc_wrr(struct event_eth_rx_adapter *rx_adapter, int nb_wrr)
562 {
563 	size_t len;
564 
565 	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
566 			RTE_CACHE_LINE_SIZE);
567 	return  rte_zmalloc_socket(rx_adapter->mem_name,
568 				len,
569 				RTE_CACHE_LINE_SIZE,
570 				rx_adapter->socket_id);
571 }
572 
573 static int
574 rxa_alloc_poll_arrays(struct event_eth_rx_adapter *rx_adapter, uint32_t nb_poll,
575 		      uint32_t nb_wrr, struct eth_rx_poll_entry **rx_poll,
576 		      uint32_t **wrr_sched)
577 {
578 
579 	if (nb_poll == 0) {
580 		*rx_poll = NULL;
581 		*wrr_sched = NULL;
582 		return 0;
583 	}
584 
585 	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
586 	if (*rx_poll == NULL) {
587 		*wrr_sched = NULL;
588 		return -ENOMEM;
589 	}
590 
591 	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
592 	if (*wrr_sched == NULL) {
593 		rte_free(*rx_poll);
594 		return -ENOMEM;
595 	}
596 	return 0;
597 }
598 
599 /* Precalculate WRR polling sequence for all queues in rx_adapter */
600 static void
601 rxa_calc_wrr_sequence(struct event_eth_rx_adapter *rx_adapter,
602 		      struct eth_rx_poll_entry *rx_poll, uint32_t *rx_wrr)
603 {
604 	uint16_t d;
605 	uint16_t q;
606 	unsigned int i;
607 	int prev = -1;
608 	int cw = -1;
609 
610 	/* Initialize variables for calculation of wrr schedule */
611 	uint16_t max_wrr_pos = 0;
612 	unsigned int poll_q = 0;
613 	uint16_t max_wt = 0;
614 	uint16_t gcd = 0;
615 
616 	if (rx_poll == NULL)
617 		return;
618 
619 	/* Generate array of all queues to poll, the size of this
620 	 * array is poll_q
621 	 */
622 	RTE_ETH_FOREACH_DEV(d) {
623 		uint16_t nb_rx_queues;
624 		struct eth_device_info *dev_info =
625 				&rx_adapter->eth_devices[d];
626 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
627 		if (dev_info->rx_queue == NULL)
628 			continue;
629 		if (dev_info->internal_event_port)
630 			continue;
631 		dev_info->wrr_len = 0;
632 		for (q = 0; q < nb_rx_queues; q++) {
633 			struct eth_rx_queue_info *queue_info =
634 				&dev_info->rx_queue[q];
635 			uint16_t wt;
636 
637 			if (!rxa_polled_queue(dev_info, q))
638 				continue;
639 			wt = queue_info->wt;
640 			rx_poll[poll_q].eth_dev_id = d;
641 			rx_poll[poll_q].eth_rx_qid = q;
642 			max_wrr_pos += wt;
643 			dev_info->wrr_len += wt;
644 			max_wt = RTE_MAX(max_wt, wt);
645 			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
646 			poll_q++;
647 		}
648 	}
649 
650 	/* Generate polling sequence based on weights */
651 	prev = -1;
652 	cw = -1;
653 	for (i = 0; i < max_wrr_pos; i++) {
654 		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
655 				     rx_poll, max_wt, gcd, prev);
656 		prev = rx_wrr[i];
657 	}
658 }
659 
660 static inline void
661 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
662 	struct rte_ipv6_hdr **ipv6_hdr)
663 {
664 	struct rte_ether_hdr *eth_hdr =
665 		rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
666 	struct rte_vlan_hdr *vlan_hdr;
667 
668 	*ipv4_hdr = NULL;
669 	*ipv6_hdr = NULL;
670 
671 	switch (eth_hdr->ether_type) {
672 	case RTE_BE16(RTE_ETHER_TYPE_IPV4):
673 		*ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
674 		break;
675 
676 	case RTE_BE16(RTE_ETHER_TYPE_IPV6):
677 		*ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
678 		break;
679 
680 	case RTE_BE16(RTE_ETHER_TYPE_VLAN):
681 		vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
682 		switch (vlan_hdr->eth_proto) {
683 		case RTE_BE16(RTE_ETHER_TYPE_IPV4):
684 			*ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
685 			break;
686 		case RTE_BE16(RTE_ETHER_TYPE_IPV6):
687 			*ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
688 			break;
689 		default:
690 			break;
691 		}
692 		break;
693 
694 	default:
695 		break;
696 	}
697 }
698 
699 /* Calculate RSS hash for IPv4/6 */
700 static inline uint32_t
701 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
702 {
703 	uint32_t input_len;
704 	void *tuple;
705 	struct rte_ipv4_tuple ipv4_tuple;
706 	struct rte_ipv6_tuple ipv6_tuple;
707 	struct rte_ipv4_hdr *ipv4_hdr;
708 	struct rte_ipv6_hdr *ipv6_hdr;
709 
710 	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
711 
712 	if (ipv4_hdr) {
713 		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
714 		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
715 		tuple = &ipv4_tuple;
716 		input_len = RTE_THASH_V4_L3_LEN;
717 	} else if (ipv6_hdr) {
718 		rte_thash_load_v6_addrs(ipv6_hdr,
719 					(union rte_thash_tuple *)&ipv6_tuple);
720 		tuple = &ipv6_tuple;
721 		input_len = RTE_THASH_V6_L3_LEN;
722 	} else
723 		return 0;
724 
725 	return rte_softrss_be(tuple, input_len, rss_key_be);
726 }
727 
728 static inline int
729 rxa_enq_blocked(struct event_eth_rx_adapter *rx_adapter)
730 {
731 	return !!rx_adapter->enq_block_count;
732 }
733 
734 static inline void
735 rxa_enq_block_start_ts(struct event_eth_rx_adapter *rx_adapter)
736 {
737 	if (rx_adapter->rx_enq_block_start_ts)
738 		return;
739 
740 	rx_adapter->enq_block_count++;
741 	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
742 		return;
743 
744 	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
745 }
746 
747 static inline void
748 rxa_enq_block_end_ts(struct event_eth_rx_adapter *rx_adapter,
749 		     struct rte_event_eth_rx_adapter_stats *stats)
750 {
751 	if (unlikely(!stats->rx_enq_start_ts))
752 		stats->rx_enq_start_ts = rte_get_tsc_cycles();
753 
754 	if (likely(!rxa_enq_blocked(rx_adapter)))
755 		return;
756 
757 	rx_adapter->enq_block_count = 0;
758 	if (rx_adapter->rx_enq_block_start_ts) {
759 		stats->rx_enq_end_ts = rte_get_tsc_cycles();
760 		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
761 		    rx_adapter->rx_enq_block_start_ts;
762 		rx_adapter->rx_enq_block_start_ts = 0;
763 	}
764 }
765 
766 /* Enqueue buffered events to event device */
767 static inline uint16_t
768 rxa_flush_event_buffer(struct event_eth_rx_adapter *rx_adapter,
769 		       struct eth_event_enqueue_buffer *buf)
770 {
771 	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
772 	uint16_t count = buf->last ? buf->last - buf->head : buf->count;
773 
774 	if (!count)
775 		return 0;
776 
777 	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
778 					rx_adapter->event_port_id,
779 					&buf->events[buf->head],
780 					count);
781 	if (n != count)
782 		stats->rx_enq_retry++;
783 
784 	buf->head += n;
785 
786 	if (buf->last && n == count) {
787 		uint16_t n1;
788 
789 		n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
790 					rx_adapter->event_port_id,
791 					&buf->events[0],
792 					buf->tail);
793 
794 		if (n1 != buf->tail)
795 			stats->rx_enq_retry++;
796 
797 		buf->last = 0;
798 		buf->head = n1;
799 		buf->last_mask = 0;
800 		n += n1;
801 	}
802 
803 	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
804 		rxa_enq_block_start_ts(rx_adapter);
805 
806 	buf->count -= n;
807 	stats->rx_enq_count += n;
808 
809 	return n;
810 }
811 
812 static inline void
813 rxa_init_vector(struct event_eth_rx_adapter *rx_adapter,
814 		struct eth_rx_vector_data *vec)
815 {
816 	vec->vector_ev->nb_elem = 0;
817 	vec->vector_ev->port = vec->port;
818 	vec->vector_ev->queue = vec->queue;
819 	vec->vector_ev->attr_valid = true;
820 	TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
821 }
822 
823 static inline uint16_t
824 rxa_create_event_vector(struct event_eth_rx_adapter *rx_adapter,
825 			struct eth_rx_queue_info *queue_info,
826 			struct eth_event_enqueue_buffer *buf,
827 			struct rte_mbuf **mbufs, uint16_t num)
828 {
829 	struct rte_event *ev = &buf->events[buf->count];
830 	struct eth_rx_vector_data *vec;
831 	uint16_t filled, space, sz;
832 
833 	filled = 0;
834 	vec = &queue_info->vector_data;
835 
836 	if (vec->vector_ev == NULL) {
837 		if (rte_mempool_get(vec->vector_pool,
838 				    (void **)&vec->vector_ev) < 0) {
839 			rte_pktmbuf_free_bulk(mbufs, num);
840 			return 0;
841 		}
842 		rxa_init_vector(rx_adapter, vec);
843 	}
844 	while (num) {
845 		if (vec->vector_ev->nb_elem == vec->max_vector_count) {
846 			/* Event ready. */
847 			ev->event = vec->event;
848 			ev->vec = vec->vector_ev;
849 			ev++;
850 			filled++;
851 			vec->vector_ev = NULL;
852 			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
853 			if (rte_mempool_get(vec->vector_pool,
854 					    (void **)&vec->vector_ev) < 0) {
855 				rte_pktmbuf_free_bulk(mbufs, num);
856 				return 0;
857 			}
858 			rxa_init_vector(rx_adapter, vec);
859 		}
860 
861 		space = vec->max_vector_count - vec->vector_ev->nb_elem;
862 		sz = num > space ? space : num;
863 		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
864 		       sizeof(void *) * sz);
865 		vec->vector_ev->nb_elem += sz;
866 		num -= sz;
867 		mbufs += sz;
868 		vec->ts = rte_rdtsc();
869 	}
870 
871 	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
872 		ev->event = vec->event;
873 		ev->vec = vec->vector_ev;
874 		ev++;
875 		filled++;
876 		vec->vector_ev = NULL;
877 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
878 	}
879 
880 	return filled;
881 }
882 
883 static inline void
884 rxa_buffer_mbufs(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
885 		 uint16_t rx_queue_id, struct rte_mbuf **mbufs, uint16_t num,
886 		 struct eth_event_enqueue_buffer *buf)
887 {
888 	uint32_t i;
889 	struct eth_device_info *dev_info =
890 					&rx_adapter->eth_devices[eth_dev_id];
891 	struct eth_rx_queue_info *eth_rx_queue_info =
892 					&dev_info->rx_queue[rx_queue_id];
893 	uint16_t new_tail = buf->tail;
894 	uint64_t event = eth_rx_queue_info->event;
895 	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
896 	struct rte_mbuf *m = mbufs[0];
897 	uint32_t rss_mask;
898 	uint32_t rss;
899 	int do_rss;
900 	uint16_t nb_cb;
901 	uint16_t dropped;
902 	uint64_t ts, ts_mask;
903 
904 	if (!eth_rx_queue_info->ena_vector) {
905 		ts = m->ol_flags & event_eth_rx_timestamp_dynflag ?
906 						0 : rte_get_tsc_cycles();
907 
908 		/* 0xffff ffff ffff ffff if PKT_RX_TIMESTAMP is set,
909 		 * otherwise 0
910 		 */
911 		ts_mask = (uint64_t)(!(m->ol_flags &
912 				       event_eth_rx_timestamp_dynflag)) - 1ULL;
913 
914 		/* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
915 		rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
916 		do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
917 		for (i = 0; i < num; i++) {
918 			struct rte_event *ev;
919 
920 			m = mbufs[i];
921 			*rxa_timestamp_dynfield(m) = ts |
922 					(*rxa_timestamp_dynfield(m) & ts_mask);
923 
924 			ev = &buf->events[new_tail];
925 
926 			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
927 				     : m->hash.rss;
928 			ev->event = event;
929 			ev->flow_id = (rss & ~flow_id_mask) |
930 				      (ev->flow_id & flow_id_mask);
931 			ev->mbuf = m;
932 			new_tail++;
933 		}
934 	} else {
935 		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
936 					      buf, mbufs, num);
937 	}
938 
939 	if (num && dev_info->cb_fn) {
940 
941 		dropped = 0;
942 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
943 				       buf->last |
944 				       (buf->events_size & ~buf->last_mask),
945 				       buf->count >= BATCH_SIZE ?
946 						buf->count - BATCH_SIZE : 0,
947 				       &buf->events[buf->tail],
948 				       num,
949 				       dev_info->cb_arg,
950 				       &dropped);
951 		if (unlikely(nb_cb > num))
952 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
953 				nb_cb, num);
954 		else
955 			num = nb_cb;
956 		if (dropped)
957 			rx_adapter->stats.rx_dropped += dropped;
958 	}
959 
960 	buf->count += num;
961 	buf->tail += num;
962 }
963 
964 static inline bool
965 rxa_pkt_buf_available(struct eth_event_enqueue_buffer *buf)
966 {
967 	uint32_t nb_req = buf->tail + BATCH_SIZE;
968 
969 	if (!buf->last) {
970 		if (nb_req <= buf->events_size)
971 			return true;
972 
973 		if (buf->head >= BATCH_SIZE) {
974 			buf->last_mask = ~0;
975 			buf->last = buf->tail;
976 			buf->tail = 0;
977 			return true;
978 		}
979 	}
980 
981 	return nb_req <= buf->head;
982 }
983 
984 /* Enqueue packets from  <port, q>  to event buffer */
985 static inline uint32_t
986 rxa_eth_rx(struct event_eth_rx_adapter *rx_adapter, uint16_t port_id,
987 	   uint16_t queue_id, uint32_t rx_count, uint32_t max_rx,
988 	   int *rxq_empty, struct eth_event_enqueue_buffer *buf)
989 {
990 	struct rte_mbuf *mbufs[BATCH_SIZE];
991 	struct rte_event_eth_rx_adapter_stats *stats =
992 					&rx_adapter->stats;
993 	uint16_t n;
994 	uint32_t nb_rx = 0;
995 
996 	if (rxq_empty)
997 		*rxq_empty = 0;
998 	/* Don't do a batch dequeue from the rx queue if there isn't
999 	 * enough space in the enqueue buffer.
1000 	 */
1001 	while (rxa_pkt_buf_available(buf)) {
1002 		if (buf->count >= BATCH_SIZE)
1003 			rxa_flush_event_buffer(rx_adapter, buf);
1004 
1005 		stats->rx_poll_count++;
1006 		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
1007 		if (unlikely(!n)) {
1008 			if (rxq_empty)
1009 				*rxq_empty = 1;
1010 			break;
1011 		}
1012 		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n, buf);
1013 		nb_rx += n;
1014 		if (rx_count + nb_rx > max_rx)
1015 			break;
1016 	}
1017 
1018 	if (buf->count > 0)
1019 		rxa_flush_event_buffer(rx_adapter, buf);
1020 
1021 	return nb_rx;
1022 }
1023 
1024 static inline void
1025 rxa_intr_ring_enqueue(struct event_eth_rx_adapter *rx_adapter, void *data)
1026 {
1027 	uint16_t port_id;
1028 	uint16_t queue;
1029 	int err;
1030 	union queue_data qd;
1031 	struct eth_device_info *dev_info;
1032 	struct eth_rx_queue_info *queue_info;
1033 	int *intr_enabled;
1034 
1035 	qd.ptr = data;
1036 	port_id = qd.port;
1037 	queue = qd.queue;
1038 
1039 	dev_info = &rx_adapter->eth_devices[port_id];
1040 	queue_info = &dev_info->rx_queue[queue];
1041 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1042 	if (rxa_shared_intr(dev_info, queue))
1043 		intr_enabled = &dev_info->shared_intr_enabled;
1044 	else
1045 		intr_enabled = &queue_info->intr_enabled;
1046 
1047 	if (*intr_enabled) {
1048 		*intr_enabled = 0;
1049 		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
1050 		/* Entry should always be available.
1051 		 * The ring size equals the maximum number of interrupt
1052 		 * vectors supported (an interrupt vector is shared in
1053 		 * case of shared interrupts)
1054 		 */
1055 		if (err)
1056 			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
1057 				" to ring: %s", strerror(-err));
1058 		else
1059 			rte_eth_dev_rx_intr_disable(port_id, queue);
1060 	}
1061 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1062 }
1063 
1064 static int
1065 rxa_intr_ring_check_avail(struct event_eth_rx_adapter *rx_adapter,
1066 			  uint32_t num_intr_vec)
1067 {
1068 	if (rx_adapter->num_intr_vec + num_intr_vec >
1069 				RTE_EVENT_ETH_INTR_RING_SIZE) {
1070 		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1071 		" %d needed %d limit %d", rx_adapter->num_intr_vec,
1072 		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1073 		return -ENOSPC;
1074 	}
1075 
1076 	return 0;
1077 }
1078 
1079 /* Delete entries for (dev, queue) from the interrupt ring */
1080 static void
1081 rxa_intr_ring_del_entries(struct event_eth_rx_adapter *rx_adapter,
1082 			  struct eth_device_info *dev_info,
1083 			  uint16_t rx_queue_id)
1084 {
1085 	int i, n;
1086 	union queue_data qd;
1087 
1088 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1089 
1090 	n = rte_ring_count(rx_adapter->intr_ring);
1091 	for (i = 0; i < n; i++) {
1092 		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1093 		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1094 			if (qd.port == dev_info->dev->data->port_id &&
1095 				qd.queue == rx_queue_id)
1096 				continue;
1097 		} else {
1098 			if (qd.port == dev_info->dev->data->port_id)
1099 				continue;
1100 		}
1101 		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1102 	}
1103 
1104 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1105 }
1106 
1107 /* pthread callback handling interrupt mode receive queues
1108  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1109  * interrupting queue to the adapter's ring buffer for interrupt events.
1110  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1111  * the adapter service function.
1112  */
1113 static void *
1114 rxa_intr_thread(void *arg)
1115 {
1116 	struct event_eth_rx_adapter *rx_adapter = arg;
1117 	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1118 	int n, i;
1119 
1120 	while (1) {
1121 		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1122 				RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1123 		if (unlikely(n < 0))
1124 			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1125 					n);
1126 		for (i = 0; i < n; i++) {
1127 			rxa_intr_ring_enqueue(rx_adapter,
1128 					epoll_events[i].epdata.data);
1129 		}
1130 	}
1131 
1132 	return NULL;
1133 }
1134 
1135 /* Dequeue <port, q> from interrupt ring and enqueue received
1136  * mbufs to eventdev
1137  */
1138 static inline uint32_t
1139 rxa_intr_ring_dequeue(struct event_eth_rx_adapter *rx_adapter)
1140 {
1141 	uint32_t n;
1142 	uint32_t nb_rx = 0;
1143 	int rxq_empty;
1144 	struct eth_event_enqueue_buffer *buf;
1145 	rte_spinlock_t *ring_lock;
1146 	uint8_t max_done = 0;
1147 
1148 	if (rx_adapter->num_rx_intr == 0)
1149 		return 0;
1150 
1151 	if (rte_ring_count(rx_adapter->intr_ring) == 0
1152 		&& !rx_adapter->qd_valid)
1153 		return 0;
1154 
1155 	buf = &rx_adapter->event_enqueue_buffer;
1156 	ring_lock = &rx_adapter->intr_ring_lock;
1157 
1158 	if (buf->count >= BATCH_SIZE)
1159 		rxa_flush_event_buffer(rx_adapter, buf);
1160 
1161 	while (rxa_pkt_buf_available(buf)) {
1162 		struct eth_device_info *dev_info;
1163 		uint16_t port;
1164 		uint16_t queue;
1165 		union queue_data qd  = rx_adapter->qd;
1166 		int err;
1167 
1168 		if (!rx_adapter->qd_valid) {
1169 			struct eth_rx_queue_info *queue_info;
1170 
1171 			rte_spinlock_lock(ring_lock);
1172 			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1173 			if (err) {
1174 				rte_spinlock_unlock(ring_lock);
1175 				break;
1176 			}
1177 
1178 			port = qd.port;
1179 			queue = qd.queue;
1180 			rx_adapter->qd = qd;
1181 			rx_adapter->qd_valid = 1;
1182 			dev_info = &rx_adapter->eth_devices[port];
1183 			if (rxa_shared_intr(dev_info, queue))
1184 				dev_info->shared_intr_enabled = 1;
1185 			else {
1186 				queue_info = &dev_info->rx_queue[queue];
1187 				queue_info->intr_enabled = 1;
1188 			}
1189 			rte_eth_dev_rx_intr_enable(port, queue);
1190 			rte_spinlock_unlock(ring_lock);
1191 		} else {
1192 			port = qd.port;
1193 			queue = qd.queue;
1194 
1195 			dev_info = &rx_adapter->eth_devices[port];
1196 		}
1197 
1198 		if (rxa_shared_intr(dev_info, queue)) {
1199 			uint16_t i;
1200 			uint16_t nb_queues;
1201 
1202 			nb_queues = dev_info->dev->data->nb_rx_queues;
1203 			n = 0;
1204 			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1205 				uint8_t enq_buffer_full;
1206 
1207 				if (!rxa_intr_queue(dev_info, i))
1208 					continue;
1209 				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1210 					rx_adapter->max_nb_rx,
1211 					&rxq_empty, buf);
1212 				nb_rx += n;
1213 
1214 				enq_buffer_full = !rxq_empty && n == 0;
1215 				max_done = nb_rx > rx_adapter->max_nb_rx;
1216 
1217 				if (enq_buffer_full || max_done) {
1218 					dev_info->next_q_idx = i;
1219 					goto done;
1220 				}
1221 			}
1222 
1223 			rx_adapter->qd_valid = 0;
1224 
1225 			/* Reinitialize for next interrupt */
1226 			dev_info->next_q_idx = dev_info->multi_intr_cap ?
1227 						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1228 						0;
1229 		} else {
1230 			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1231 				rx_adapter->max_nb_rx,
1232 				&rxq_empty, buf);
1233 			rx_adapter->qd_valid = !rxq_empty;
1234 			nb_rx += n;
1235 			if (nb_rx > rx_adapter->max_nb_rx)
1236 				break;
1237 		}
1238 	}
1239 
1240 done:
1241 	rx_adapter->stats.rx_intr_packets += nb_rx;
1242 	return nb_rx;
1243 }
1244 
1245 /*
1246  * Polls receive queues added to the event adapter and enqueues received
1247  * packets to the event device.
1248  *
1249  * The receive code enqueues initially to a temporary buffer, the
1250  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
1251  *
1252  * If there isn't space available in the temporary buffer, packets from the
1253  * Rx queue aren't dequeued from the eth device, this back pressures the
1254  * eth device, in virtual device environments this back pressure is relayed to
1255  * the hypervisor's switching layer where adjustments can be made to deal with
1256  * it.
1257  */
1258 static inline uint32_t
1259 rxa_poll(struct event_eth_rx_adapter *rx_adapter)
1260 {
1261 	uint32_t num_queue;
1262 	uint32_t nb_rx = 0;
1263 	struct eth_event_enqueue_buffer *buf = NULL;
1264 	uint32_t wrr_pos;
1265 	uint32_t max_nb_rx;
1266 
1267 	wrr_pos = rx_adapter->wrr_pos;
1268 	max_nb_rx = rx_adapter->max_nb_rx;
1269 
1270 	/* Iterate through a WRR sequence */
1271 	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1272 		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1273 		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1274 		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1275 
1276 		buf = rxa_event_buf_get(rx_adapter, d, qid);
1277 
1278 		/* Don't do a batch dequeue from the rx queue if there isn't
1279 		 * enough space in the enqueue buffer.
1280 		 */
1281 		if (buf->count >= BATCH_SIZE)
1282 			rxa_flush_event_buffer(rx_adapter, buf);
1283 		if (!rxa_pkt_buf_available(buf)) {
1284 			if (rx_adapter->use_queue_event_buf)
1285 				goto poll_next_entry;
1286 			else {
1287 				rx_adapter->wrr_pos = wrr_pos;
1288 				return nb_rx;
1289 			}
1290 		}
1291 
1292 		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1293 				NULL, buf);
1294 		if (nb_rx > max_nb_rx) {
1295 			rx_adapter->wrr_pos =
1296 				    (wrr_pos + 1) % rx_adapter->wrr_len;
1297 			break;
1298 		}
1299 
1300 poll_next_entry:
1301 		if (++wrr_pos == rx_adapter->wrr_len)
1302 			wrr_pos = 0;
1303 	}
1304 	return nb_rx;
1305 }
1306 
1307 static void
1308 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1309 {
1310 	struct event_eth_rx_adapter *rx_adapter = arg;
1311 	struct eth_event_enqueue_buffer *buf = NULL;
1312 	struct rte_event *ev;
1313 
1314 	buf = rxa_event_buf_get(rx_adapter, vec->port, vec->queue);
1315 
1316 	if (buf->count)
1317 		rxa_flush_event_buffer(rx_adapter, buf);
1318 
1319 	if (vec->vector_ev->nb_elem == 0)
1320 		return;
1321 	ev = &buf->events[buf->count];
1322 
1323 	/* Event ready. */
1324 	ev->event = vec->event;
1325 	ev->vec = vec->vector_ev;
1326 	buf->count++;
1327 
1328 	vec->vector_ev = NULL;
1329 	vec->ts = 0;
1330 }
1331 
1332 static int
1333 rxa_service_func(void *args)
1334 {
1335 	struct event_eth_rx_adapter *rx_adapter = args;
1336 	struct rte_event_eth_rx_adapter_stats *stats;
1337 
1338 	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1339 		return 0;
1340 	if (!rx_adapter->rxa_started) {
1341 		rte_spinlock_unlock(&rx_adapter->rx_lock);
1342 		return 0;
1343 	}
1344 
1345 	if (rx_adapter->ena_vector) {
1346 		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1347 		    rx_adapter->vector_tmo_ticks) {
1348 			struct eth_rx_vector_data *vec;
1349 
1350 			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1351 				uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1352 
1353 				if (elapsed_time >= vec->vector_timeout_ticks) {
1354 					rxa_vector_expire(vec, rx_adapter);
1355 					TAILQ_REMOVE(&rx_adapter->vector_list,
1356 						     vec, next);
1357 				}
1358 			}
1359 			rx_adapter->prev_expiry_ts = rte_rdtsc();
1360 		}
1361 	}
1362 
1363 	stats = &rx_adapter->stats;
1364 	stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1365 	stats->rx_packets += rxa_poll(rx_adapter);
1366 	rte_spinlock_unlock(&rx_adapter->rx_lock);
1367 	return 0;
1368 }
1369 
1370 static int
1371 rte_event_eth_rx_adapter_init(void)
1372 {
1373 	const char *name = RXA_ADAPTER_ARRAY;
1374 	const struct rte_memzone *mz;
1375 	unsigned int sz;
1376 
1377 	sz = sizeof(*event_eth_rx_adapter) *
1378 	    RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1379 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1380 
1381 	mz = rte_memzone_lookup(name);
1382 	if (mz == NULL) {
1383 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1384 						 RTE_CACHE_LINE_SIZE);
1385 		if (mz == NULL) {
1386 			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1387 					PRId32, rte_errno);
1388 			return -rte_errno;
1389 		}
1390 	}
1391 
1392 	event_eth_rx_adapter = mz->addr;
1393 	return 0;
1394 }
1395 
1396 static int
1397 rxa_memzone_lookup(void)
1398 {
1399 	const struct rte_memzone *mz;
1400 
1401 	if (event_eth_rx_adapter == NULL) {
1402 		mz = rte_memzone_lookup(RXA_ADAPTER_ARRAY);
1403 		if (mz == NULL)
1404 			return -ENOMEM;
1405 		event_eth_rx_adapter = mz->addr;
1406 	}
1407 
1408 	return 0;
1409 }
1410 
1411 static inline struct event_eth_rx_adapter *
1412 rxa_id_to_adapter(uint8_t id)
1413 {
1414 	return event_eth_rx_adapter ?
1415 		event_eth_rx_adapter[id] : NULL;
1416 }
1417 
1418 static int
1419 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1420 		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1421 {
1422 	int ret;
1423 	struct rte_eventdev *dev;
1424 	struct rte_event_dev_config dev_conf;
1425 	int started;
1426 	uint8_t port_id;
1427 	struct rte_event_port_conf *port_conf = arg;
1428 	struct event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1429 
1430 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
1431 	dev_conf = dev->data->dev_conf;
1432 
1433 	started = dev->data->dev_started;
1434 	if (started)
1435 		rte_event_dev_stop(dev_id);
1436 	port_id = dev_conf.nb_event_ports;
1437 	dev_conf.nb_event_ports += 1;
1438 	ret = rte_event_dev_configure(dev_id, &dev_conf);
1439 	if (ret) {
1440 		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1441 						dev_id);
1442 		if (started) {
1443 			if (rte_event_dev_start(dev_id))
1444 				return -EIO;
1445 		}
1446 		return ret;
1447 	}
1448 
1449 	ret = rte_event_port_setup(dev_id, port_id, port_conf);
1450 	if (ret) {
1451 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1452 					port_id);
1453 		return ret;
1454 	}
1455 
1456 	conf->event_port_id = port_id;
1457 	conf->max_nb_rx = 128;
1458 	if (started)
1459 		ret = rte_event_dev_start(dev_id);
1460 	rx_adapter->default_cb_arg = 1;
1461 	return ret;
1462 }
1463 
1464 static int
1465 rxa_epoll_create1(void)
1466 {
1467 #if defined(LINUX)
1468 	int fd;
1469 	fd = epoll_create1(EPOLL_CLOEXEC);
1470 	return fd < 0 ? -errno : fd;
1471 #elif defined(BSD)
1472 	return -ENOTSUP;
1473 #endif
1474 }
1475 
1476 static int
1477 rxa_init_epd(struct event_eth_rx_adapter *rx_adapter)
1478 {
1479 	if (rx_adapter->epd != INIT_FD)
1480 		return 0;
1481 
1482 	rx_adapter->epd = rxa_epoll_create1();
1483 	if (rx_adapter->epd < 0) {
1484 		int err = rx_adapter->epd;
1485 		rx_adapter->epd = INIT_FD;
1486 		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1487 		return err;
1488 	}
1489 
1490 	return 0;
1491 }
1492 
1493 static int
1494 rxa_create_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1495 {
1496 	int err;
1497 	char thread_name[RTE_MAX_THREAD_NAME_LEN];
1498 
1499 	if (rx_adapter->intr_ring)
1500 		return 0;
1501 
1502 	rx_adapter->intr_ring = rte_ring_create("intr_ring",
1503 					RTE_EVENT_ETH_INTR_RING_SIZE,
1504 					rte_socket_id(), 0);
1505 	if (!rx_adapter->intr_ring)
1506 		return -ENOMEM;
1507 
1508 	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1509 					RTE_EVENT_ETH_INTR_RING_SIZE *
1510 					sizeof(struct rte_epoll_event),
1511 					RTE_CACHE_LINE_SIZE,
1512 					rx_adapter->socket_id);
1513 	if (!rx_adapter->epoll_events) {
1514 		err = -ENOMEM;
1515 		goto error;
1516 	}
1517 
1518 	rte_spinlock_init(&rx_adapter->intr_ring_lock);
1519 
1520 	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1521 			"rx-intr-thread-%d", rx_adapter->id);
1522 
1523 	err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1524 				NULL, rxa_intr_thread, rx_adapter);
1525 	if (!err)
1526 		return 0;
1527 
1528 	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1529 	rte_free(rx_adapter->epoll_events);
1530 error:
1531 	rte_ring_free(rx_adapter->intr_ring);
1532 	rx_adapter->intr_ring = NULL;
1533 	rx_adapter->epoll_events = NULL;
1534 	return err;
1535 }
1536 
1537 static int
1538 rxa_destroy_intr_thread(struct event_eth_rx_adapter *rx_adapter)
1539 {
1540 	int err;
1541 
1542 	err = pthread_cancel(rx_adapter->rx_intr_thread);
1543 	if (err)
1544 		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1545 				err);
1546 
1547 	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1548 	if (err)
1549 		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1550 
1551 	rte_free(rx_adapter->epoll_events);
1552 	rte_ring_free(rx_adapter->intr_ring);
1553 	rx_adapter->intr_ring = NULL;
1554 	rx_adapter->epoll_events = NULL;
1555 	return 0;
1556 }
1557 
1558 static int
1559 rxa_free_intr_resources(struct event_eth_rx_adapter *rx_adapter)
1560 {
1561 	int ret;
1562 
1563 	if (rx_adapter->num_rx_intr == 0)
1564 		return 0;
1565 
1566 	ret = rxa_destroy_intr_thread(rx_adapter);
1567 	if (ret)
1568 		return ret;
1569 
1570 	close(rx_adapter->epd);
1571 	rx_adapter->epd = INIT_FD;
1572 
1573 	return ret;
1574 }
1575 
1576 static int
1577 rxa_disable_intr(struct event_eth_rx_adapter *rx_adapter,
1578 		 struct eth_device_info *dev_info, uint16_t rx_queue_id)
1579 {
1580 	int err;
1581 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1582 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1583 
1584 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1585 	if (err) {
1586 		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1587 			rx_queue_id);
1588 		return err;
1589 	}
1590 
1591 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1592 					rx_adapter->epd,
1593 					RTE_INTR_EVENT_DEL,
1594 					0);
1595 	if (err)
1596 		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1597 
1598 	if (sintr)
1599 		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1600 	else
1601 		dev_info->shared_intr_enabled = 0;
1602 	return err;
1603 }
1604 
1605 static int
1606 rxa_del_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1607 		   struct eth_device_info *dev_info, int rx_queue_id)
1608 {
1609 	int err;
1610 	int i;
1611 	int s;
1612 
1613 	if (dev_info->nb_rx_intr == 0)
1614 		return 0;
1615 
1616 	err = 0;
1617 	if (rx_queue_id == -1) {
1618 		s = dev_info->nb_shared_intr;
1619 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1620 			int sintr;
1621 			uint16_t q;
1622 
1623 			q = dev_info->intr_queue[i];
1624 			sintr = rxa_shared_intr(dev_info, q);
1625 			s -= sintr;
1626 
1627 			if (!sintr || s == 0) {
1628 
1629 				err = rxa_disable_intr(rx_adapter, dev_info,
1630 						q);
1631 				if (err)
1632 					return err;
1633 				rxa_intr_ring_del_entries(rx_adapter, dev_info,
1634 							q);
1635 			}
1636 		}
1637 	} else {
1638 		if (!rxa_intr_queue(dev_info, rx_queue_id))
1639 			return 0;
1640 		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1641 				dev_info->nb_shared_intr == 1) {
1642 			err = rxa_disable_intr(rx_adapter, dev_info,
1643 					rx_queue_id);
1644 			if (err)
1645 				return err;
1646 			rxa_intr_ring_del_entries(rx_adapter, dev_info,
1647 						rx_queue_id);
1648 		}
1649 
1650 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1651 			if (dev_info->intr_queue[i] == rx_queue_id) {
1652 				for (; i < dev_info->nb_rx_intr - 1; i++)
1653 					dev_info->intr_queue[i] =
1654 						dev_info->intr_queue[i + 1];
1655 				break;
1656 			}
1657 		}
1658 	}
1659 
1660 	return err;
1661 }
1662 
1663 static int
1664 rxa_config_intr(struct event_eth_rx_adapter *rx_adapter,
1665 		struct eth_device_info *dev_info, uint16_t rx_queue_id)
1666 {
1667 	int err, err1;
1668 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1669 	union queue_data qd;
1670 	int init_fd;
1671 	uint16_t *intr_queue;
1672 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1673 
1674 	if (rxa_intr_queue(dev_info, rx_queue_id))
1675 		return 0;
1676 
1677 	intr_queue = dev_info->intr_queue;
1678 	if (dev_info->intr_queue == NULL) {
1679 		size_t len =
1680 			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1681 		dev_info->intr_queue =
1682 			rte_zmalloc_socket(
1683 				rx_adapter->mem_name,
1684 				len,
1685 				0,
1686 				rx_adapter->socket_id);
1687 		if (dev_info->intr_queue == NULL)
1688 			return -ENOMEM;
1689 	}
1690 
1691 	init_fd = rx_adapter->epd;
1692 	err = rxa_init_epd(rx_adapter);
1693 	if (err)
1694 		goto err_free_queue;
1695 
1696 	qd.port = eth_dev_id;
1697 	qd.queue = rx_queue_id;
1698 
1699 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1700 					rx_adapter->epd,
1701 					RTE_INTR_EVENT_ADD,
1702 					qd.ptr);
1703 	if (err) {
1704 		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1705 			" Rx Queue %u err %d", rx_queue_id, err);
1706 		goto err_del_fd;
1707 	}
1708 
1709 	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1710 	if (err) {
1711 		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1712 				" Rx Queue %u err %d", rx_queue_id, err);
1713 
1714 		goto err_del_event;
1715 	}
1716 
1717 	err = rxa_create_intr_thread(rx_adapter);
1718 	if (!err)  {
1719 		if (sintr)
1720 			dev_info->shared_intr_enabled = 1;
1721 		else
1722 			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1723 		return 0;
1724 	}
1725 
1726 
1727 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1728 	if (err)
1729 		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1730 				" Rx Queue %u err %d", rx_queue_id, err);
1731 err_del_event:
1732 	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1733 					rx_adapter->epd,
1734 					RTE_INTR_EVENT_DEL,
1735 					0);
1736 	if (err1) {
1737 		RTE_EDEV_LOG_ERR("Could not delete event for"
1738 				" Rx Queue %u err %d", rx_queue_id, err1);
1739 	}
1740 err_del_fd:
1741 	if (init_fd == INIT_FD) {
1742 		close(rx_adapter->epd);
1743 		rx_adapter->epd = -1;
1744 	}
1745 err_free_queue:
1746 	if (intr_queue == NULL)
1747 		rte_free(dev_info->intr_queue);
1748 
1749 	return err;
1750 }
1751 
1752 static int
1753 rxa_add_intr_queue(struct event_eth_rx_adapter *rx_adapter,
1754 		   struct eth_device_info *dev_info, int rx_queue_id)
1755 
1756 {
1757 	int i, j, err;
1758 	int si = -1;
1759 	int shared_done = (dev_info->nb_shared_intr > 0);
1760 
1761 	if (rx_queue_id != -1) {
1762 		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1763 			return 0;
1764 		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1765 	}
1766 
1767 	err = 0;
1768 	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1769 
1770 		if (rxa_shared_intr(dev_info, i) && shared_done)
1771 			continue;
1772 
1773 		err = rxa_config_intr(rx_adapter, dev_info, i);
1774 
1775 		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1776 		if (shared_done) {
1777 			si = i;
1778 			dev_info->shared_intr_enabled = 1;
1779 		}
1780 		if (err)
1781 			break;
1782 	}
1783 
1784 	if (err == 0)
1785 		return 0;
1786 
1787 	shared_done = (dev_info->nb_shared_intr > 0);
1788 	for (j = 0; j < i; j++) {
1789 		if (rxa_intr_queue(dev_info, j))
1790 			continue;
1791 		if (rxa_shared_intr(dev_info, j) && si != j)
1792 			continue;
1793 		err = rxa_disable_intr(rx_adapter, dev_info, j);
1794 		if (err)
1795 			break;
1796 
1797 	}
1798 
1799 	return err;
1800 }
1801 
1802 static int
1803 rxa_init_service(struct event_eth_rx_adapter *rx_adapter, uint8_t id)
1804 {
1805 	int ret;
1806 	struct rte_service_spec service;
1807 	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1808 
1809 	if (rx_adapter->service_inited)
1810 		return 0;
1811 
1812 	memset(&service, 0, sizeof(service));
1813 	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1814 		"rte_event_eth_rx_adapter_%d", id);
1815 	service.socket_id = rx_adapter->socket_id;
1816 	service.callback = rxa_service_func;
1817 	service.callback_userdata = rx_adapter;
1818 	/* Service function handles locking for queue add/del updates */
1819 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1820 	ret = rte_service_component_register(&service, &rx_adapter->service_id);
1821 	if (ret) {
1822 		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1823 			service.name, ret);
1824 		return ret;
1825 	}
1826 
1827 	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1828 		&rx_adapter_conf, rx_adapter->conf_arg);
1829 	if (ret) {
1830 		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1831 			ret);
1832 		goto err_done;
1833 	}
1834 	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1835 	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1836 	rx_adapter->service_inited = 1;
1837 	rx_adapter->epd = INIT_FD;
1838 	return 0;
1839 
1840 err_done:
1841 	rte_service_component_unregister(rx_adapter->service_id);
1842 	return ret;
1843 }
1844 
1845 static void
1846 rxa_update_queue(struct event_eth_rx_adapter *rx_adapter,
1847 		 struct eth_device_info *dev_info, int32_t rx_queue_id,
1848 		 uint8_t add)
1849 {
1850 	struct eth_rx_queue_info *queue_info;
1851 	int enabled;
1852 	uint16_t i;
1853 
1854 	if (dev_info->rx_queue == NULL)
1855 		return;
1856 
1857 	if (rx_queue_id == -1) {
1858 		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1859 			rxa_update_queue(rx_adapter, dev_info, i, add);
1860 	} else {
1861 		queue_info = &dev_info->rx_queue[rx_queue_id];
1862 		enabled = queue_info->queue_enabled;
1863 		if (add) {
1864 			rx_adapter->nb_queues += !enabled;
1865 			dev_info->nb_dev_queues += !enabled;
1866 		} else {
1867 			rx_adapter->nb_queues -= enabled;
1868 			dev_info->nb_dev_queues -= enabled;
1869 		}
1870 		queue_info->queue_enabled = !!add;
1871 	}
1872 }
1873 
1874 static void
1875 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1876 		    uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1877 		    uint16_t port_id)
1878 {
1879 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1880 	struct eth_rx_vector_data *vector_data;
1881 	uint32_t flow_id;
1882 
1883 	vector_data = &queue_info->vector_data;
1884 	vector_data->max_vector_count = vector_count;
1885 	vector_data->port = port_id;
1886 	vector_data->queue = qid;
1887 	vector_data->vector_pool = mp;
1888 	vector_data->vector_timeout_ticks =
1889 		NSEC2TICK(vector_ns, rte_get_timer_hz());
1890 	vector_data->ts = 0;
1891 	flow_id = queue_info->event & 0xFFFFF;
1892 	flow_id =
1893 		flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1894 	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1895 }
1896 
1897 static void
1898 rxa_sw_del(struct event_eth_rx_adapter *rx_adapter,
1899 	   struct eth_device_info *dev_info, int32_t rx_queue_id)
1900 {
1901 	struct eth_rx_vector_data *vec;
1902 	int pollq;
1903 	int intrq;
1904 	int sintrq;
1905 
1906 
1907 	if (rx_adapter->nb_queues == 0)
1908 		return;
1909 
1910 	if (rx_queue_id == -1) {
1911 		uint16_t nb_rx_queues;
1912 		uint16_t i;
1913 
1914 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1915 		for (i = 0; i <	nb_rx_queues; i++)
1916 			rxa_sw_del(rx_adapter, dev_info, i);
1917 		return;
1918 	}
1919 
1920 	/* Push all the partial event vectors to event device. */
1921 	TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1922 		if (vec->queue != rx_queue_id)
1923 			continue;
1924 		rxa_vector_expire(vec, rx_adapter);
1925 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
1926 	}
1927 
1928 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
1929 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
1930 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1931 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1932 	rx_adapter->num_rx_polled -= pollq;
1933 	dev_info->nb_rx_poll -= pollq;
1934 	rx_adapter->num_rx_intr -= intrq;
1935 	dev_info->nb_rx_intr -= intrq;
1936 	dev_info->nb_shared_intr -= intrq && sintrq;
1937 	if (rx_adapter->use_queue_event_buf) {
1938 		struct eth_event_enqueue_buffer *event_buf =
1939 			dev_info->rx_queue[rx_queue_id].event_buf;
1940 		rte_free(event_buf->events);
1941 		rte_free(event_buf);
1942 		dev_info->rx_queue[rx_queue_id].event_buf = NULL;
1943 	}
1944 }
1945 
1946 static int
1947 rxa_add_queue(struct event_eth_rx_adapter *rx_adapter,
1948 	      struct eth_device_info *dev_info, int32_t rx_queue_id,
1949 	      const struct rte_event_eth_rx_adapter_queue_conf *conf)
1950 {
1951 	struct eth_rx_queue_info *queue_info;
1952 	const struct rte_event *ev = &conf->ev;
1953 	int pollq;
1954 	int intrq;
1955 	int sintrq;
1956 	struct rte_event *qi_ev;
1957 	struct eth_event_enqueue_buffer *new_rx_buf = NULL;
1958 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1959 	int ret;
1960 
1961 	if (rx_queue_id == -1) {
1962 		uint16_t nb_rx_queues;
1963 		uint16_t i;
1964 
1965 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1966 		for (i = 0; i <	nb_rx_queues; i++) {
1967 			ret = rxa_add_queue(rx_adapter, dev_info, i, conf);
1968 			if (ret)
1969 				return ret;
1970 		}
1971 		return 0;
1972 	}
1973 
1974 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
1975 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
1976 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1977 
1978 	queue_info = &dev_info->rx_queue[rx_queue_id];
1979 	queue_info->wt = conf->servicing_weight;
1980 
1981 	qi_ev = (struct rte_event *)&queue_info->event;
1982 	qi_ev->event = ev->event;
1983 	qi_ev->op = RTE_EVENT_OP_NEW;
1984 	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1985 	qi_ev->sub_event_type = 0;
1986 
1987 	if (conf->rx_queue_flags &
1988 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1989 		queue_info->flow_id_mask = ~0;
1990 	} else
1991 		qi_ev->flow_id = 0;
1992 
1993 	if (conf->rx_queue_flags &
1994 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
1995 		queue_info->ena_vector = 1;
1996 		qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
1997 		rxa_set_vector_data(queue_info, conf->vector_sz,
1998 				    conf->vector_timeout_ns, conf->vector_mp,
1999 				    rx_queue_id, dev_info->dev->data->port_id);
2000 		rx_adapter->ena_vector = 1;
2001 		rx_adapter->vector_tmo_ticks =
2002 			rx_adapter->vector_tmo_ticks ?
2003 				      RTE_MIN(queue_info->vector_data
2004 							.vector_timeout_ticks >>
2005 						1,
2006 					rx_adapter->vector_tmo_ticks) :
2007 				queue_info->vector_data.vector_timeout_ticks >>
2008 					1;
2009 	}
2010 
2011 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
2012 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
2013 		rx_adapter->num_rx_polled += !pollq;
2014 		dev_info->nb_rx_poll += !pollq;
2015 		rx_adapter->num_rx_intr -= intrq;
2016 		dev_info->nb_rx_intr -= intrq;
2017 		dev_info->nb_shared_intr -= intrq && sintrq;
2018 	}
2019 
2020 	if (rxa_intr_queue(dev_info, rx_queue_id)) {
2021 		rx_adapter->num_rx_polled -= pollq;
2022 		dev_info->nb_rx_poll -= pollq;
2023 		rx_adapter->num_rx_intr += !intrq;
2024 		dev_info->nb_rx_intr += !intrq;
2025 		dev_info->nb_shared_intr += !intrq && sintrq;
2026 		if (dev_info->nb_shared_intr == 1) {
2027 			if (dev_info->multi_intr_cap)
2028 				dev_info->next_q_idx =
2029 					RTE_MAX_RXTX_INTR_VEC_ID - 1;
2030 			else
2031 				dev_info->next_q_idx = 0;
2032 		}
2033 	}
2034 
2035 	if (!rx_adapter->use_queue_event_buf)
2036 		return 0;
2037 
2038 	new_rx_buf = rte_zmalloc_socket("rx_buffer_meta",
2039 				sizeof(*new_rx_buf), 0,
2040 				rte_eth_dev_socket_id(eth_dev_id));
2041 	if (new_rx_buf == NULL) {
2042 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer meta for "
2043 				 "dev_id: %d queue_id: %d",
2044 				 eth_dev_id, rx_queue_id);
2045 		return -ENOMEM;
2046 	}
2047 
2048 	new_rx_buf->events_size = RTE_ALIGN(conf->event_buf_size, BATCH_SIZE);
2049 	new_rx_buf->events_size += (2 * BATCH_SIZE);
2050 	new_rx_buf->events = rte_zmalloc_socket("rx_buffer",
2051 				sizeof(struct rte_event) *
2052 				new_rx_buf->events_size, 0,
2053 				rte_eth_dev_socket_id(eth_dev_id));
2054 	if (new_rx_buf->events == NULL) {
2055 		rte_free(new_rx_buf);
2056 		RTE_EDEV_LOG_ERR("Failed to allocate event buffer for "
2057 				 "dev_id: %d queue_id: %d",
2058 				 eth_dev_id, rx_queue_id);
2059 		return -ENOMEM;
2060 	}
2061 
2062 	queue_info->event_buf = new_rx_buf;
2063 
2064 	return 0;
2065 }
2066 
2067 static int
2068 rxa_sw_add(struct event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
2069 	   int rx_queue_id,
2070 	   const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2071 {
2072 	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
2073 	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
2074 	int ret;
2075 	struct eth_rx_poll_entry *rx_poll;
2076 	struct eth_rx_queue_info *rx_queue;
2077 	uint32_t *rx_wrr;
2078 	uint16_t nb_rx_queues;
2079 	uint32_t nb_rx_poll, nb_wrr;
2080 	uint32_t nb_rx_intr;
2081 	int num_intr_vec;
2082 	uint16_t wt;
2083 
2084 	if (queue_conf->servicing_weight == 0) {
2085 		struct rte_eth_dev_data *data = dev_info->dev->data;
2086 
2087 		temp_conf = *queue_conf;
2088 		if (!data->dev_conf.intr_conf.rxq) {
2089 			/* If Rx interrupts are disabled set wt = 1 */
2090 			temp_conf.servicing_weight = 1;
2091 		}
2092 		queue_conf = &temp_conf;
2093 
2094 		if (queue_conf->servicing_weight == 0 &&
2095 		    rx_adapter->use_queue_event_buf) {
2096 
2097 			RTE_EDEV_LOG_ERR("Use of queue level event buffer "
2098 					 "not supported for interrupt queues "
2099 					 "dev_id: %d queue_id: %d",
2100 					 eth_dev_id, rx_queue_id);
2101 			return -EINVAL;
2102 		}
2103 	}
2104 
2105 	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2106 	rx_queue = dev_info->rx_queue;
2107 	wt = queue_conf->servicing_weight;
2108 
2109 	if (dev_info->rx_queue == NULL) {
2110 		dev_info->rx_queue =
2111 		    rte_zmalloc_socket(rx_adapter->mem_name,
2112 				       nb_rx_queues *
2113 				       sizeof(struct eth_rx_queue_info), 0,
2114 				       rx_adapter->socket_id);
2115 		if (dev_info->rx_queue == NULL)
2116 			return -ENOMEM;
2117 	}
2118 	rx_wrr = NULL;
2119 	rx_poll = NULL;
2120 
2121 	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2122 			queue_conf->servicing_weight,
2123 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2124 
2125 	if (dev_info->dev->intr_handle)
2126 		dev_info->multi_intr_cap =
2127 			rte_intr_cap_multiple(dev_info->dev->intr_handle);
2128 
2129 	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2130 				&rx_poll, &rx_wrr);
2131 	if (ret)
2132 		goto err_free_rxqueue;
2133 
2134 	if (wt == 0) {
2135 		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2136 
2137 		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2138 		if (ret)
2139 			goto err_free_rxqueue;
2140 
2141 		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2142 		if (ret)
2143 			goto err_free_rxqueue;
2144 	} else {
2145 
2146 		num_intr_vec = 0;
2147 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2148 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2149 						rx_queue_id, 0);
2150 			/* interrupt based queues are being converted to
2151 			 * poll mode queues, delete the interrupt configuration
2152 			 * for those.
2153 			 */
2154 			ret = rxa_del_intr_queue(rx_adapter,
2155 						dev_info, rx_queue_id);
2156 			if (ret)
2157 				goto err_free_rxqueue;
2158 		}
2159 	}
2160 
2161 	if (nb_rx_intr == 0) {
2162 		ret = rxa_free_intr_resources(rx_adapter);
2163 		if (ret)
2164 			goto err_free_rxqueue;
2165 	}
2166 
2167 	if (wt == 0) {
2168 		uint16_t i;
2169 
2170 		if (rx_queue_id  == -1) {
2171 			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2172 				dev_info->intr_queue[i] = i;
2173 		} else {
2174 			if (!rxa_intr_queue(dev_info, rx_queue_id))
2175 				dev_info->intr_queue[nb_rx_intr - 1] =
2176 					rx_queue_id;
2177 		}
2178 	}
2179 
2180 
2181 
2182 	ret = rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2183 	if (ret)
2184 		goto err_free_rxqueue;
2185 	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2186 
2187 	rte_free(rx_adapter->eth_rx_poll);
2188 	rte_free(rx_adapter->wrr_sched);
2189 
2190 	rx_adapter->eth_rx_poll = rx_poll;
2191 	rx_adapter->wrr_sched = rx_wrr;
2192 	rx_adapter->wrr_len = nb_wrr;
2193 	rx_adapter->num_intr_vec += num_intr_vec;
2194 	return 0;
2195 
2196 err_free_rxqueue:
2197 	if (rx_queue == NULL) {
2198 		rte_free(dev_info->rx_queue);
2199 		dev_info->rx_queue = NULL;
2200 	}
2201 
2202 	rte_free(rx_poll);
2203 	rte_free(rx_wrr);
2204 
2205 	return ret;
2206 }
2207 
2208 static int
2209 rxa_ctrl(uint8_t id, int start)
2210 {
2211 	struct event_eth_rx_adapter *rx_adapter;
2212 	struct rte_eventdev *dev;
2213 	struct eth_device_info *dev_info;
2214 	uint32_t i;
2215 	int use_service = 0;
2216 	int stop = !start;
2217 
2218 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2219 	rx_adapter = rxa_id_to_adapter(id);
2220 	if (rx_adapter == NULL)
2221 		return -EINVAL;
2222 
2223 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2224 
2225 	RTE_ETH_FOREACH_DEV(i) {
2226 		dev_info = &rx_adapter->eth_devices[i];
2227 		/* if start  check for num dev queues */
2228 		if (start && !dev_info->nb_dev_queues)
2229 			continue;
2230 		/* if stop check if dev has been started */
2231 		if (stop && !dev_info->dev_rx_started)
2232 			continue;
2233 		use_service |= !dev_info->internal_event_port;
2234 		dev_info->dev_rx_started = start;
2235 		if (dev_info->internal_event_port == 0)
2236 			continue;
2237 		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2238 						&rte_eth_devices[i]) :
2239 			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
2240 						&rte_eth_devices[i]);
2241 	}
2242 
2243 	if (use_service) {
2244 		rte_spinlock_lock(&rx_adapter->rx_lock);
2245 		rx_adapter->rxa_started = start;
2246 		rte_service_runstate_set(rx_adapter->service_id, start);
2247 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2248 	}
2249 
2250 	return 0;
2251 }
2252 
2253 static int
2254 rxa_create(uint8_t id, uint8_t dev_id,
2255 	   struct rte_event_eth_rx_adapter_params *rxa_params,
2256 	   rte_event_eth_rx_adapter_conf_cb conf_cb,
2257 	   void *conf_arg)
2258 {
2259 	struct event_eth_rx_adapter *rx_adapter;
2260 	struct eth_event_enqueue_buffer *buf;
2261 	struct rte_event *events;
2262 	int ret;
2263 	int socket_id;
2264 	uint16_t i;
2265 	char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
2266 	const uint8_t default_rss_key[] = {
2267 		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2268 		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2269 		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2270 		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2271 		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2272 	};
2273 
2274 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2275 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2276 
2277 	if (conf_cb == NULL)
2278 		return -EINVAL;
2279 
2280 	if (event_eth_rx_adapter == NULL) {
2281 		ret = rte_event_eth_rx_adapter_init();
2282 		if (ret)
2283 			return ret;
2284 	}
2285 
2286 	rx_adapter = rxa_id_to_adapter(id);
2287 	if (rx_adapter != NULL) {
2288 		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2289 		return -EEXIST;
2290 	}
2291 
2292 	socket_id = rte_event_dev_socket_id(dev_id);
2293 	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2294 		"rte_event_eth_rx_adapter_%d",
2295 		id);
2296 
2297 	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2298 			RTE_CACHE_LINE_SIZE, socket_id);
2299 	if (rx_adapter == NULL) {
2300 		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2301 		return -ENOMEM;
2302 	}
2303 
2304 	rx_adapter->eventdev_id = dev_id;
2305 	rx_adapter->socket_id = socket_id;
2306 	rx_adapter->conf_cb = conf_cb;
2307 	rx_adapter->conf_arg = conf_arg;
2308 	rx_adapter->id = id;
2309 	TAILQ_INIT(&rx_adapter->vector_list);
2310 	strcpy(rx_adapter->mem_name, mem_name);
2311 	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2312 					RTE_MAX_ETHPORTS *
2313 					sizeof(struct eth_device_info), 0,
2314 					socket_id);
2315 	rte_convert_rss_key((const uint32_t *)default_rss_key,
2316 			(uint32_t *)rx_adapter->rss_key_be,
2317 			    RTE_DIM(default_rss_key));
2318 
2319 	if (rx_adapter->eth_devices == NULL) {
2320 		RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
2321 		rte_free(rx_adapter);
2322 		return -ENOMEM;
2323 	}
2324 
2325 	rte_spinlock_init(&rx_adapter->rx_lock);
2326 
2327 	for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2328 		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2329 
2330 	/* Rx adapter event buffer allocation */
2331 	rx_adapter->use_queue_event_buf = rxa_params->use_queue_event_buf;
2332 
2333 	if (!rx_adapter->use_queue_event_buf) {
2334 		buf = &rx_adapter->event_enqueue_buffer;
2335 		buf->events_size = rxa_params->event_buf_size;
2336 
2337 		events = rte_zmalloc_socket(rx_adapter->mem_name,
2338 					    buf->events_size * sizeof(*events),
2339 					    0, socket_id);
2340 		if (events == NULL) {
2341 			RTE_EDEV_LOG_ERR("Failed to allocate memory "
2342 					 "for adapter event buffer");
2343 			rte_free(rx_adapter->eth_devices);
2344 			rte_free(rx_adapter);
2345 			return -ENOMEM;
2346 		}
2347 
2348 		rx_adapter->event_enqueue_buffer.events = events;
2349 	}
2350 
2351 	event_eth_rx_adapter[id] = rx_adapter;
2352 
2353 	if (conf_cb == rxa_default_conf_cb)
2354 		rx_adapter->default_cb_arg = 1;
2355 
2356 	if (rte_mbuf_dyn_rx_timestamp_register(
2357 			&event_eth_rx_timestamp_dynfield_offset,
2358 			&event_eth_rx_timestamp_dynflag) != 0) {
2359 		RTE_EDEV_LOG_ERR("Error registering timestamp field in mbuf\n");
2360 		return -rte_errno;
2361 	}
2362 
2363 	rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2364 		conf_arg);
2365 	return 0;
2366 }
2367 
2368 int
2369 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2370 				rte_event_eth_rx_adapter_conf_cb conf_cb,
2371 				void *conf_arg)
2372 {
2373 	struct rte_event_eth_rx_adapter_params rxa_params = {0};
2374 
2375 	/* use default values for adapter params */
2376 	rxa_params.event_buf_size = ETH_EVENT_BUFFER_SIZE;
2377 	rxa_params.use_queue_event_buf = false;
2378 
2379 	return rxa_create(id, dev_id, &rxa_params, conf_cb, conf_arg);
2380 }
2381 
2382 int
2383 rte_event_eth_rx_adapter_create_with_params(uint8_t id, uint8_t dev_id,
2384 			struct rte_event_port_conf *port_config,
2385 			struct rte_event_eth_rx_adapter_params *rxa_params)
2386 {
2387 	struct rte_event_port_conf *pc;
2388 	int ret;
2389 	struct rte_event_eth_rx_adapter_params temp_params = {0};
2390 
2391 	if (port_config == NULL)
2392 		return -EINVAL;
2393 
2394 	if (rxa_params == NULL) {
2395 		/* use default values if rxa_params is NULL */
2396 		rxa_params = &temp_params;
2397 		rxa_params->event_buf_size = ETH_EVENT_BUFFER_SIZE;
2398 		rxa_params->use_queue_event_buf = false;
2399 	} else if ((!rxa_params->use_queue_event_buf &&
2400 		    rxa_params->event_buf_size == 0) ||
2401 		   (rxa_params->use_queue_event_buf &&
2402 		    rxa_params->event_buf_size != 0)) {
2403 		RTE_EDEV_LOG_ERR("Invalid adapter params\n");
2404 		return -EINVAL;
2405 	} else if (!rxa_params->use_queue_event_buf) {
2406 		/* adjust event buff size with BATCH_SIZE used for fetching
2407 		 * packets from NIC rx queues to get full buffer utilization
2408 		 * and prevent unnecessary rollovers.
2409 		 */
2410 
2411 		rxa_params->event_buf_size =
2412 			RTE_ALIGN(rxa_params->event_buf_size, BATCH_SIZE);
2413 		rxa_params->event_buf_size += (BATCH_SIZE + BATCH_SIZE);
2414 	}
2415 
2416 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2417 	if (pc == NULL)
2418 		return -ENOMEM;
2419 
2420 	*pc = *port_config;
2421 
2422 	ret = rxa_create(id, dev_id, rxa_params, rxa_default_conf_cb, pc);
2423 	if (ret)
2424 		rte_free(pc);
2425 
2426 	return ret;
2427 }
2428 
2429 int
2430 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2431 		struct rte_event_port_conf *port_config)
2432 {
2433 	struct rte_event_port_conf *pc;
2434 	int ret;
2435 
2436 	if (port_config == NULL)
2437 		return -EINVAL;
2438 
2439 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2440 
2441 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2442 	if (pc == NULL)
2443 		return -ENOMEM;
2444 	*pc = *port_config;
2445 
2446 	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2447 					rxa_default_conf_cb,
2448 					pc);
2449 	if (ret)
2450 		rte_free(pc);
2451 	return ret;
2452 }
2453 
2454 int
2455 rte_event_eth_rx_adapter_free(uint8_t id)
2456 {
2457 	struct event_eth_rx_adapter *rx_adapter;
2458 
2459 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2460 
2461 	rx_adapter = rxa_id_to_adapter(id);
2462 	if (rx_adapter == NULL)
2463 		return -EINVAL;
2464 
2465 	if (rx_adapter->nb_queues) {
2466 		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2467 				rx_adapter->nb_queues);
2468 		return -EBUSY;
2469 	}
2470 
2471 	if (rx_adapter->default_cb_arg)
2472 		rte_free(rx_adapter->conf_arg);
2473 	rte_free(rx_adapter->eth_devices);
2474 	if (!rx_adapter->use_queue_event_buf)
2475 		rte_free(rx_adapter->event_enqueue_buffer.events);
2476 	rte_free(rx_adapter);
2477 	event_eth_rx_adapter[id] = NULL;
2478 
2479 	rte_eventdev_trace_eth_rx_adapter_free(id);
2480 	return 0;
2481 }
2482 
2483 int
2484 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2485 		uint16_t eth_dev_id,
2486 		int32_t rx_queue_id,
2487 		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2488 {
2489 	int ret;
2490 	uint32_t cap;
2491 	struct event_eth_rx_adapter *rx_adapter;
2492 	struct rte_eventdev *dev;
2493 	struct eth_device_info *dev_info;
2494 	struct rte_event_eth_rx_adapter_vector_limits limits;
2495 
2496 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2497 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2498 
2499 	rx_adapter = rxa_id_to_adapter(id);
2500 	if ((rx_adapter == NULL) || (queue_conf == NULL))
2501 		return -EINVAL;
2502 
2503 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2504 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2505 						eth_dev_id,
2506 						&cap);
2507 	if (ret) {
2508 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2509 			"eth port %" PRIu16, id, eth_dev_id);
2510 		return ret;
2511 	}
2512 
2513 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2514 		&& (queue_conf->rx_queue_flags &
2515 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2516 		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2517 				" eth port: %" PRIu16 " adapter id: %" PRIu8,
2518 				eth_dev_id, id);
2519 		return -EINVAL;
2520 	}
2521 
2522 	if (queue_conf->rx_queue_flags &
2523 	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2524 
2525 		if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0) {
2526 			RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2527 					 " eth port: %" PRIu16
2528 					 " adapter id: %" PRIu8,
2529 					 eth_dev_id, id);
2530 			return -EINVAL;
2531 		}
2532 
2533 		ret = rte_event_eth_rx_adapter_vector_limits_get(
2534 			rx_adapter->eventdev_id, eth_dev_id, &limits);
2535 		if (ret < 0) {
2536 			RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
2537 					 " eth port: %" PRIu16
2538 					 " adapter id: %" PRIu8,
2539 					 eth_dev_id, id);
2540 			return -EINVAL;
2541 		}
2542 		if (queue_conf->vector_sz < limits.min_sz ||
2543 		    queue_conf->vector_sz > limits.max_sz ||
2544 		    queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
2545 		    queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
2546 		    queue_conf->vector_mp == NULL) {
2547 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2548 					 " eth port: %" PRIu16
2549 					 " adapter id: %" PRIu8,
2550 					 eth_dev_id, id);
2551 			return -EINVAL;
2552 		}
2553 		if (queue_conf->vector_mp->elt_size <
2554 		    (sizeof(struct rte_event_vector) +
2555 		     (sizeof(uintptr_t) * queue_conf->vector_sz))) {
2556 			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2557 					 " eth port: %" PRIu16
2558 					 " adapter id: %" PRIu8,
2559 					 eth_dev_id, id);
2560 			return -EINVAL;
2561 		}
2562 	}
2563 
2564 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2565 		(rx_queue_id != -1)) {
2566 		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2567 			"event queue, eth port: %" PRIu16 " adapter id: %"
2568 			PRIu8, eth_dev_id, id);
2569 		return -EINVAL;
2570 	}
2571 
2572 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2573 			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2574 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2575 			 (uint16_t)rx_queue_id);
2576 		return -EINVAL;
2577 	}
2578 
2579 	if ((rx_adapter->use_queue_event_buf &&
2580 	     queue_conf->event_buf_size == 0) ||
2581 	    (!rx_adapter->use_queue_event_buf &&
2582 	     queue_conf->event_buf_size != 0)) {
2583 		RTE_EDEV_LOG_ERR("Invalid Event buffer size for the queue");
2584 		return -EINVAL;
2585 	}
2586 
2587 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2588 
2589 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2590 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2591 					-ENOTSUP);
2592 		if (dev_info->rx_queue == NULL) {
2593 			dev_info->rx_queue =
2594 			    rte_zmalloc_socket(rx_adapter->mem_name,
2595 					dev_info->dev->data->nb_rx_queues *
2596 					sizeof(struct eth_rx_queue_info), 0,
2597 					rx_adapter->socket_id);
2598 			if (dev_info->rx_queue == NULL)
2599 				return -ENOMEM;
2600 		}
2601 
2602 		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2603 				&rte_eth_devices[eth_dev_id],
2604 				rx_queue_id, queue_conf);
2605 		if (ret == 0) {
2606 			dev_info->internal_event_port = 1;
2607 			rxa_update_queue(rx_adapter,
2608 					&rx_adapter->eth_devices[eth_dev_id],
2609 					rx_queue_id,
2610 					1);
2611 		}
2612 	} else {
2613 		rte_spinlock_lock(&rx_adapter->rx_lock);
2614 		dev_info->internal_event_port = 0;
2615 		ret = rxa_init_service(rx_adapter, id);
2616 		if (ret == 0) {
2617 			uint32_t service_id = rx_adapter->service_id;
2618 			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2619 					queue_conf);
2620 			rte_service_component_runstate_set(service_id,
2621 				rxa_sw_adapter_queue_count(rx_adapter));
2622 		}
2623 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2624 	}
2625 
2626 	rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2627 		rx_queue_id, queue_conf, ret);
2628 	if (ret)
2629 		return ret;
2630 
2631 	return 0;
2632 }
2633 
2634 static int
2635 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2636 {
2637 	limits->max_sz = MAX_VECTOR_SIZE;
2638 	limits->min_sz = MIN_VECTOR_SIZE;
2639 	limits->max_timeout_ns = MAX_VECTOR_NS;
2640 	limits->min_timeout_ns = MIN_VECTOR_NS;
2641 
2642 	return 0;
2643 }
2644 
2645 int
2646 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2647 				int32_t rx_queue_id)
2648 {
2649 	int ret = 0;
2650 	struct rte_eventdev *dev;
2651 	struct event_eth_rx_adapter *rx_adapter;
2652 	struct eth_device_info *dev_info;
2653 	uint32_t cap;
2654 	uint32_t nb_rx_poll = 0;
2655 	uint32_t nb_wrr = 0;
2656 	uint32_t nb_rx_intr;
2657 	struct eth_rx_poll_entry *rx_poll = NULL;
2658 	uint32_t *rx_wrr = NULL;
2659 	int num_intr_vec;
2660 
2661 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2662 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2663 
2664 	rx_adapter = rxa_id_to_adapter(id);
2665 	if (rx_adapter == NULL)
2666 		return -EINVAL;
2667 
2668 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2669 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2670 						eth_dev_id,
2671 						&cap);
2672 	if (ret)
2673 		return ret;
2674 
2675 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2676 		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2677 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2678 			 (uint16_t)rx_queue_id);
2679 		return -EINVAL;
2680 	}
2681 
2682 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2683 
2684 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2685 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2686 				 -ENOTSUP);
2687 		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2688 						&rte_eth_devices[eth_dev_id],
2689 						rx_queue_id);
2690 		if (ret == 0) {
2691 			rxa_update_queue(rx_adapter,
2692 					&rx_adapter->eth_devices[eth_dev_id],
2693 					rx_queue_id,
2694 					0);
2695 			if (dev_info->nb_dev_queues == 0) {
2696 				rte_free(dev_info->rx_queue);
2697 				dev_info->rx_queue = NULL;
2698 			}
2699 		}
2700 	} else {
2701 		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2702 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2703 
2704 		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2705 			&rx_poll, &rx_wrr);
2706 		if (ret)
2707 			return ret;
2708 
2709 		rte_spinlock_lock(&rx_adapter->rx_lock);
2710 
2711 		num_intr_vec = 0;
2712 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2713 
2714 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2715 						rx_queue_id, 0);
2716 			ret = rxa_del_intr_queue(rx_adapter, dev_info,
2717 					rx_queue_id);
2718 			if (ret)
2719 				goto unlock_ret;
2720 		}
2721 
2722 		if (nb_rx_intr == 0) {
2723 			ret = rxa_free_intr_resources(rx_adapter);
2724 			if (ret)
2725 				goto unlock_ret;
2726 		}
2727 
2728 		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2729 		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2730 
2731 		rte_free(rx_adapter->eth_rx_poll);
2732 		rte_free(rx_adapter->wrr_sched);
2733 
2734 		if (nb_rx_intr == 0) {
2735 			rte_free(dev_info->intr_queue);
2736 			dev_info->intr_queue = NULL;
2737 		}
2738 
2739 		rx_adapter->eth_rx_poll = rx_poll;
2740 		rx_adapter->wrr_sched = rx_wrr;
2741 		rx_adapter->wrr_len = nb_wrr;
2742 		/*
2743 		 * reset next poll start position (wrr_pos) to avoid buffer
2744 		 * overrun when wrr_len is reduced in case of queue delete
2745 		 */
2746 		rx_adapter->wrr_pos = 0;
2747 		rx_adapter->num_intr_vec += num_intr_vec;
2748 
2749 		if (dev_info->nb_dev_queues == 0) {
2750 			rte_free(dev_info->rx_queue);
2751 			dev_info->rx_queue = NULL;
2752 		}
2753 unlock_ret:
2754 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2755 		if (ret) {
2756 			rte_free(rx_poll);
2757 			rte_free(rx_wrr);
2758 			return ret;
2759 		}
2760 
2761 		rte_service_component_runstate_set(rx_adapter->service_id,
2762 				rxa_sw_adapter_queue_count(rx_adapter));
2763 	}
2764 
2765 	rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2766 		rx_queue_id, ret);
2767 	return ret;
2768 }
2769 
2770 int
2771 rte_event_eth_rx_adapter_vector_limits_get(
2772 	uint8_t dev_id, uint16_t eth_port_id,
2773 	struct rte_event_eth_rx_adapter_vector_limits *limits)
2774 {
2775 	struct rte_eventdev *dev;
2776 	uint32_t cap;
2777 	int ret;
2778 
2779 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2780 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2781 
2782 	if (limits == NULL)
2783 		return -EINVAL;
2784 
2785 	dev = &rte_eventdevs[dev_id];
2786 
2787 	ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2788 	if (ret) {
2789 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2790 				 "eth port %" PRIu16,
2791 				 dev_id, eth_port_id);
2792 		return ret;
2793 	}
2794 
2795 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2796 		RTE_FUNC_PTR_OR_ERR_RET(
2797 			*dev->dev_ops->eth_rx_adapter_vector_limits_get,
2798 			-ENOTSUP);
2799 		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2800 			dev, &rte_eth_devices[eth_port_id], limits);
2801 	} else {
2802 		ret = rxa_sw_vector_limits(limits);
2803 	}
2804 
2805 	return ret;
2806 }
2807 
2808 int
2809 rte_event_eth_rx_adapter_start(uint8_t id)
2810 {
2811 	rte_eventdev_trace_eth_rx_adapter_start(id);
2812 	return rxa_ctrl(id, 1);
2813 }
2814 
2815 int
2816 rte_event_eth_rx_adapter_stop(uint8_t id)
2817 {
2818 	rte_eventdev_trace_eth_rx_adapter_stop(id);
2819 	return rxa_ctrl(id, 0);
2820 }
2821 
2822 int
2823 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2824 			       struct rte_event_eth_rx_adapter_stats *stats)
2825 {
2826 	struct event_eth_rx_adapter *rx_adapter;
2827 	struct eth_event_enqueue_buffer *buf;
2828 	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2829 	struct rte_event_eth_rx_adapter_stats dev_stats;
2830 	struct rte_eventdev *dev;
2831 	struct eth_device_info *dev_info;
2832 	uint32_t i;
2833 	int ret;
2834 
2835 	if (rxa_memzone_lookup())
2836 		return -ENOMEM;
2837 
2838 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2839 
2840 	rx_adapter = rxa_id_to_adapter(id);
2841 	if (rx_adapter  == NULL || stats == NULL)
2842 		return -EINVAL;
2843 
2844 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2845 	memset(stats, 0, sizeof(*stats));
2846 	RTE_ETH_FOREACH_DEV(i) {
2847 		dev_info = &rx_adapter->eth_devices[i];
2848 		if (dev_info->internal_event_port == 0 ||
2849 			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2850 			continue;
2851 		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2852 						&rte_eth_devices[i],
2853 						&dev_stats);
2854 		if (ret)
2855 			continue;
2856 		dev_stats_sum.rx_packets += dev_stats.rx_packets;
2857 		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2858 	}
2859 
2860 	if (rx_adapter->service_inited)
2861 		*stats = rx_adapter->stats;
2862 
2863 	stats->rx_packets += dev_stats_sum.rx_packets;
2864 	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2865 
2866 	if (!rx_adapter->use_queue_event_buf) {
2867 		buf = &rx_adapter->event_enqueue_buffer;
2868 		stats->rx_event_buf_count = buf->count;
2869 		stats->rx_event_buf_size = buf->events_size;
2870 	} else {
2871 		stats->rx_event_buf_count = 0;
2872 		stats->rx_event_buf_size = 0;
2873 	}
2874 
2875 	return 0;
2876 }
2877 
2878 int
2879 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2880 {
2881 	struct event_eth_rx_adapter *rx_adapter;
2882 	struct rte_eventdev *dev;
2883 	struct eth_device_info *dev_info;
2884 	uint32_t i;
2885 
2886 	if (rxa_memzone_lookup())
2887 		return -ENOMEM;
2888 
2889 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2890 
2891 	rx_adapter = rxa_id_to_adapter(id);
2892 	if (rx_adapter == NULL)
2893 		return -EINVAL;
2894 
2895 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2896 	RTE_ETH_FOREACH_DEV(i) {
2897 		dev_info = &rx_adapter->eth_devices[i];
2898 		if (dev_info->internal_event_port == 0 ||
2899 			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2900 			continue;
2901 		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2902 							&rte_eth_devices[i]);
2903 	}
2904 
2905 	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2906 	return 0;
2907 }
2908 
2909 int
2910 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2911 {
2912 	struct event_eth_rx_adapter *rx_adapter;
2913 
2914 	if (rxa_memzone_lookup())
2915 		return -ENOMEM;
2916 
2917 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2918 
2919 	rx_adapter = rxa_id_to_adapter(id);
2920 	if (rx_adapter == NULL || service_id == NULL)
2921 		return -EINVAL;
2922 
2923 	if (rx_adapter->service_inited)
2924 		*service_id = rx_adapter->service_id;
2925 
2926 	return rx_adapter->service_inited ? 0 : -ESRCH;
2927 }
2928 
2929 int
2930 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2931 					uint16_t eth_dev_id,
2932 					rte_event_eth_rx_adapter_cb_fn cb_fn,
2933 					void *cb_arg)
2934 {
2935 	struct event_eth_rx_adapter *rx_adapter;
2936 	struct eth_device_info *dev_info;
2937 	uint32_t cap;
2938 	int ret;
2939 
2940 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2941 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2942 
2943 	rx_adapter = rxa_id_to_adapter(id);
2944 	if (rx_adapter == NULL)
2945 		return -EINVAL;
2946 
2947 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2948 	if (dev_info->rx_queue == NULL)
2949 		return -EINVAL;
2950 
2951 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2952 						eth_dev_id,
2953 						&cap);
2954 	if (ret) {
2955 		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2956 			"eth port %" PRIu16, id, eth_dev_id);
2957 		return ret;
2958 	}
2959 
2960 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2961 		RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2962 				PRIu16, eth_dev_id);
2963 		return -EINVAL;
2964 	}
2965 
2966 	rte_spinlock_lock(&rx_adapter->rx_lock);
2967 	dev_info->cb_fn = cb_fn;
2968 	dev_info->cb_arg = cb_arg;
2969 	rte_spinlock_unlock(&rx_adapter->rx_lock);
2970 
2971 	return 0;
2972 }
2973 
2974 int
2975 rte_event_eth_rx_adapter_queue_conf_get(uint8_t id,
2976 			uint16_t eth_dev_id,
2977 			uint16_t rx_queue_id,
2978 			struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2979 {
2980 	struct rte_eventdev *dev;
2981 	struct event_eth_rx_adapter *rx_adapter;
2982 	struct eth_device_info *dev_info;
2983 	struct eth_rx_queue_info *queue_info;
2984 	struct rte_event *qi_ev;
2985 	int ret;
2986 
2987 	if (rxa_memzone_lookup())
2988 		return -ENOMEM;
2989 
2990 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2991 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2992 
2993 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2994 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
2995 		return -EINVAL;
2996 	}
2997 
2998 	if (queue_conf == NULL) {
2999 		RTE_EDEV_LOG_ERR("Rx queue conf struct cannot be NULL");
3000 		return -EINVAL;
3001 	}
3002 
3003 	rx_adapter = rxa_id_to_adapter(id);
3004 	if (rx_adapter == NULL)
3005 		return -EINVAL;
3006 
3007 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
3008 	if (dev_info->rx_queue == NULL ||
3009 	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3010 		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3011 		return -EINVAL;
3012 	}
3013 
3014 	queue_info = &dev_info->rx_queue[rx_queue_id];
3015 	qi_ev = (struct rte_event *)&queue_info->event;
3016 
3017 	memset(queue_conf, 0, sizeof(*queue_conf));
3018 	queue_conf->rx_queue_flags = 0;
3019 	if (queue_info->flow_id_mask != 0)
3020 		queue_conf->rx_queue_flags |=
3021 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID;
3022 	queue_conf->servicing_weight = queue_info->wt;
3023 
3024 	memcpy(&queue_conf->ev, qi_ev, sizeof(*qi_ev));
3025 
3026 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
3027 	if (dev->dev_ops->eth_rx_adapter_queue_conf_get != NULL) {
3028 		ret = (*dev->dev_ops->eth_rx_adapter_queue_conf_get)(dev,
3029 						&rte_eth_devices[eth_dev_id],
3030 						rx_queue_id,
3031 						queue_conf);
3032 		return ret;
3033 	}
3034 
3035 	return 0;
3036 }
3037 
3038 #define RXA_ADD_DICT(stats, s) rte_tel_data_add_dict_u64(d, #s, stats.s)
3039 
3040 static int
3041 handle_rxa_stats(const char *cmd __rte_unused,
3042 		 const char *params,
3043 		 struct rte_tel_data *d)
3044 {
3045 	uint8_t rx_adapter_id;
3046 	struct rte_event_eth_rx_adapter_stats rx_adptr_stats;
3047 
3048 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3049 		return -1;
3050 
3051 	/* Get Rx adapter ID from parameter string */
3052 	rx_adapter_id = atoi(params);
3053 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3054 
3055 	/* Get Rx adapter stats */
3056 	if (rte_event_eth_rx_adapter_stats_get(rx_adapter_id,
3057 					       &rx_adptr_stats)) {
3058 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter stats\n");
3059 		return -1;
3060 	}
3061 
3062 	rte_tel_data_start_dict(d);
3063 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3064 	RXA_ADD_DICT(rx_adptr_stats, rx_packets);
3065 	RXA_ADD_DICT(rx_adptr_stats, rx_poll_count);
3066 	RXA_ADD_DICT(rx_adptr_stats, rx_dropped);
3067 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_retry);
3068 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_count);
3069 	RXA_ADD_DICT(rx_adptr_stats, rx_event_buf_size);
3070 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_count);
3071 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_start_ts);
3072 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_block_cycles);
3073 	RXA_ADD_DICT(rx_adptr_stats, rx_enq_end_ts);
3074 	RXA_ADD_DICT(rx_adptr_stats, rx_intr_packets);
3075 
3076 	return 0;
3077 }
3078 
3079 static int
3080 handle_rxa_stats_reset(const char *cmd __rte_unused,
3081 		       const char *params,
3082 		       struct rte_tel_data *d __rte_unused)
3083 {
3084 	uint8_t rx_adapter_id;
3085 
3086 	if (params == NULL || strlen(params) == 0 || ~isdigit(*params))
3087 		return -1;
3088 
3089 	/* Get Rx adapter ID from parameter string */
3090 	rx_adapter_id = atoi(params);
3091 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3092 
3093 	/* Reset Rx adapter stats */
3094 	if (rte_event_eth_rx_adapter_stats_reset(rx_adapter_id)) {
3095 		RTE_EDEV_LOG_ERR("Failed to reset Rx adapter stats\n");
3096 		return -1;
3097 	}
3098 
3099 	return 0;
3100 }
3101 
3102 static int
3103 handle_rxa_get_queue_conf(const char *cmd __rte_unused,
3104 			  const char *params,
3105 			  struct rte_tel_data *d)
3106 {
3107 	uint8_t rx_adapter_id;
3108 	uint16_t rx_queue_id;
3109 	int eth_dev_id;
3110 	char *token, *l_params;
3111 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
3112 
3113 	if (params == NULL || strlen(params) == 0 || !isdigit(*params))
3114 		return -1;
3115 
3116 	/* Get Rx adapter ID from parameter string */
3117 	l_params = strdup(params);
3118 	token = strtok(l_params, ",");
3119 	rx_adapter_id = strtoul(token, NULL, 10);
3120 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(rx_adapter_id, -EINVAL);
3121 
3122 	token = strtok(NULL, ",");
3123 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3124 		return -1;
3125 
3126 	/* Get device ID from parameter string */
3127 	eth_dev_id = strtoul(token, NULL, 10);
3128 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(eth_dev_id, -EINVAL);
3129 
3130 	token = strtok(NULL, ",");
3131 	if (token == NULL || strlen(token) == 0 || !isdigit(*token))
3132 		return -1;
3133 
3134 	/* Get Rx queue ID from parameter string */
3135 	rx_queue_id = strtoul(token, NULL, 10);
3136 	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3137 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3138 		return -EINVAL;
3139 	}
3140 
3141 	token = strtok(NULL, "\0");
3142 	if (token != NULL)
3143 		RTE_EDEV_LOG_ERR("Extra parameters passed to eventdev"
3144 				 " telemetry command, igrnoring");
3145 
3146 	if (rte_event_eth_rx_adapter_queue_conf_get(rx_adapter_id, eth_dev_id,
3147 						    rx_queue_id, &queue_conf)) {
3148 		RTE_EDEV_LOG_ERR("Failed to get Rx adapter queue config");
3149 		return -1;
3150 	}
3151 
3152 	rte_tel_data_start_dict(d);
3153 	rte_tel_data_add_dict_u64(d, "rx_adapter_id", rx_adapter_id);
3154 	rte_tel_data_add_dict_u64(d, "eth_dev_id", eth_dev_id);
3155 	rte_tel_data_add_dict_u64(d, "rx_queue_id", rx_queue_id);
3156 	RXA_ADD_DICT(queue_conf, rx_queue_flags);
3157 	RXA_ADD_DICT(queue_conf, servicing_weight);
3158 	RXA_ADD_DICT(queue_conf.ev, queue_id);
3159 	RXA_ADD_DICT(queue_conf.ev, sched_type);
3160 	RXA_ADD_DICT(queue_conf.ev, priority);
3161 	RXA_ADD_DICT(queue_conf.ev, flow_id);
3162 
3163 	return 0;
3164 }
3165 
3166 RTE_INIT(rxa_init_telemetry)
3167 {
3168 	rte_telemetry_register_cmd("/eventdev/rxa_stats",
3169 		handle_rxa_stats,
3170 		"Returns Rx adapter stats. Parameter: rxa_id");
3171 
3172 	rte_telemetry_register_cmd("/eventdev/rxa_stats_reset",
3173 		handle_rxa_stats_reset,
3174 		"Reset Rx adapter stats. Parameter: rxa_id");
3175 
3176 	rte_telemetry_register_cmd("/eventdev/rxa_queue_conf",
3177 		handle_rxa_get_queue_conf,
3178 		"Returns Rx queue config. Parameter: rxa_id, dev_id, queue_id");
3179 }
3180