xref: /dpdk/lib/eventdev/rte_event_eth_rx_adapter.c (revision 081e42dab11d1add2d038fdf2bd4c86b20043d08)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9 
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <ethdev_driver.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20 
21 #include "rte_eventdev.h"
22 #include "eventdev_pmd.h"
23 #include "rte_eventdev_trace.h"
24 #include "rte_event_eth_rx_adapter.h"
25 
26 #define BATCH_SIZE		32
27 #define BLOCK_CNT_THRESHOLD	10
28 #define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)
29 #define MAX_VECTOR_SIZE		1024
30 #define MIN_VECTOR_SIZE		4
31 #define MAX_VECTOR_NS		1E9
32 #define MIN_VECTOR_NS		1E5
33 
34 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
35 #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
36 
37 #define RSS_KEY_SIZE	40
38 /* value written to intr thread pipe to signal thread exit */
39 #define ETH_BRIDGE_INTR_THREAD_EXIT	1
40 /* Sentinel value to detect an uninitialized file descriptor */
41 #define INIT_FD		-1
42 
43 /*
44  * Used to store port and queue ID of interrupting Rx queue
45  */
46 union queue_data {
47 	RTE_STD_C11
48 	void *ptr;
49 	struct {
50 		uint16_t port;
51 		uint16_t queue;
52 	};
53 };
54 
55 /*
56  * There is an instance of this struct per polled Rx queue added to the
57  * adapter
58  */
59 struct eth_rx_poll_entry {
60 	/* Eth port to poll */
61 	uint16_t eth_dev_id;
62 	/* Eth rx queue to poll */
63 	uint16_t eth_rx_qid;
64 };
65 
66 struct eth_rx_vector_data {
67 	TAILQ_ENTRY(eth_rx_vector_data) next;
68 	uint16_t port;
69 	uint16_t queue;
70 	uint16_t max_vector_count;
71 	uint64_t event;
72 	uint64_t ts;
73 	uint64_t vector_timeout_ticks;
74 	struct rte_mempool *vector_pool;
75 	struct rte_event_vector *vector_ev;
76 } __rte_cache_aligned;
77 
78 TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
79 
80 /* Instance per adapter */
81 struct rte_eth_event_enqueue_buffer {
82 	/* Count of events in this buffer */
83 	uint16_t count;
84 	/* Array of events in this buffer */
85 	struct rte_event events[ETH_EVENT_BUFFER_SIZE];
86 };
87 
88 struct rte_event_eth_rx_adapter {
89 	/* RSS key */
90 	uint8_t rss_key_be[RSS_KEY_SIZE];
91 	/* Event device identifier */
92 	uint8_t eventdev_id;
93 	/* Per ethernet device structure */
94 	struct eth_device_info *eth_devices;
95 	/* Event port identifier */
96 	uint8_t event_port_id;
97 	/* Lock to serialize config updates with service function */
98 	rte_spinlock_t rx_lock;
99 	/* Max mbufs processed in any service function invocation */
100 	uint32_t max_nb_rx;
101 	/* Receive queues that need to be polled */
102 	struct eth_rx_poll_entry *eth_rx_poll;
103 	/* Size of the eth_rx_poll array */
104 	uint16_t num_rx_polled;
105 	/* Weighted round robin schedule */
106 	uint32_t *wrr_sched;
107 	/* wrr_sched[] size */
108 	uint32_t wrr_len;
109 	/* Next entry in wrr[] to begin polling */
110 	uint32_t wrr_pos;
111 	/* Event burst buffer */
112 	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
113 	/* Vector enable flag */
114 	uint8_t ena_vector;
115 	/* Timestamp of previous vector expiry list traversal */
116 	uint64_t prev_expiry_ts;
117 	/* Minimum ticks to wait before traversing expiry list */
118 	uint64_t vector_tmo_ticks;
119 	/* vector list */
120 	struct eth_rx_vector_data_list vector_list;
121 	/* Per adapter stats */
122 	struct rte_event_eth_rx_adapter_stats stats;
123 	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
124 	uint16_t enq_block_count;
125 	/* Block start ts */
126 	uint64_t rx_enq_block_start_ts;
127 	/* epoll fd used to wait for Rx interrupts */
128 	int epd;
129 	/* Number of interrupt driven Rx queues */
130 	uint32_t num_rx_intr;
131 	/* Used to send <dev id, queue id> of interrupting Rx queues from
132 	 * the interrupt thread to the Rx thread
133 	 */
134 	struct rte_ring *intr_ring;
135 	/* Rx Queue data (dev id, queue id) for the last non-empty
136 	 * queue polled
137 	 */
138 	union queue_data qd;
139 	/* queue_data is valid */
140 	int qd_valid;
141 	/* Interrupt ring lock, synchronizes Rx thread
142 	 * and interrupt thread
143 	 */
144 	rte_spinlock_t intr_ring_lock;
145 	/* Event array passed to rte_epoll_wait() */
146 	struct rte_epoll_event *epoll_events;
147 	/* Count of interrupt vectors in use */
148 	uint32_t num_intr_vec;
149 	/* Thread blocked on Rx interrupts */
150 	pthread_t rx_intr_thread;
151 	/* Configuration callback for rte_service configuration */
152 	rte_event_eth_rx_adapter_conf_cb conf_cb;
153 	/* Configuration callback argument */
154 	void *conf_arg;
155 	/* Set if the default conf callback is being used */
156 	int default_cb_arg;
157 	/* Service initialization state */
158 	uint8_t service_inited;
159 	/* Total count of Rx queues in adapter */
160 	uint32_t nb_queues;
161 	/* Memory allocation name */
162 	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
163 	/* Socket identifier cached from eventdev */
164 	int socket_id;
165 	/* Per adapter EAL service */
166 	uint32_t service_id;
167 	/* Adapter started flag */
168 	uint8_t rxa_started;
169 	/* Adapter ID */
170 	uint8_t id;
171 } __rte_cache_aligned;
172 
173 /* Per eth device */
174 struct eth_device_info {
175 	struct rte_eth_dev *dev;
176 	struct eth_rx_queue_info *rx_queue;
177 	/* Rx callback */
178 	rte_event_eth_rx_adapter_cb_fn cb_fn;
179 	/* Rx callback argument */
180 	void *cb_arg;
181 	/* Set if ethdev->eventdev packet transfer uses a
182 	 * hardware mechanism
183 	 */
184 	uint8_t internal_event_port;
185 	/* Set if the adapter is processing Rx queues for
186 	 * this eth device and packet processing has been
187 	 * started; this lets the code know whether the PMD
188 	 * rx_adapter_stop callback needs to be invoked
189 	 */
190 	uint8_t dev_rx_started;
191 	/* Number of queues added for this device */
192 	uint16_t nb_dev_queues;
193 	/* Number of poll based queues
194 	 * If nb_rx_poll > 0, the start callback will
195 	 * be invoked if not already invoked
196 	 */
197 	uint16_t nb_rx_poll;
198 	/* Number of interrupt based queues
199 	 * If nb_rx_intr > 0, the start callback will
200 	 * be invoked if not already invoked.
201 	 */
202 	uint16_t nb_rx_intr;
203 	/* Number of queues that use the shared interrupt */
204 	uint16_t nb_shared_intr;
205 	/* sum(wrr(q)) for all queues within the device
206 	 * useful when deleting all device queues
207 	 */
208 	uint32_t wrr_len;
209 	/* Intr based queue index to start polling from; this is used
210 	 * when the number of shared interrupts is non-zero
211 	 */
212 	uint16_t next_q_idx;
213 	/* Intr based queue indices */
214 	uint16_t *intr_queue;
215 	/* Set if the device can generate a per Rx queue interrupt;
216 	 * this applies only to queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
217 	 */
218 	int multi_intr_cap;
219 	/* shared interrupt enabled */
220 	int shared_intr_enabled;
221 };
222 
223 /* Per Rx queue */
224 struct eth_rx_queue_info {
225 	int queue_enabled;	/* True if added */
226 	int intr_enabled;
227 	uint8_t ena_vector;
228 	uint16_t wt;		/* Polling weight */
229 	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
230 	uint64_t event;
231 	struct eth_rx_vector_data vector_data;
232 };
233 
234 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
235 
236 static inline int
237 rxa_validate_id(uint8_t id)
238 {
239 	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
240 }
241 
242 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
243 	if (!rxa_validate_id(id)) { \
244 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
245 		return retval; \
246 	} \
247 } while (0)
248 
249 static inline int
250 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
251 {
252 	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
253 }
254 
255 /* Greatest common divisor */
256 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
257 {
258 	uint16_t r = a % b;
259 
260 	return r ? rxa_gcd_u16(b, r) : b;
261 }
262 
263 /* Returns the next queue in the polling sequence
264  *
265  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
266  */
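/* Example: with three polled queues of weights {4, 2, 1}
 * (max_wt = 4, gcd = 1), one full period of the generated schedule
 * visits eth_rx_poll indices in the order 0, 0, 0, 1, 0, 1, 2
 * before repeating.
 */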
267 static int
268 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
269 	 unsigned int n, int *cw,
270 	 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
271 	 uint16_t gcd, int prev)
272 {
273 	int i = prev;
274 	uint16_t w;
275 
276 	while (1) {
277 		uint16_t q;
278 		uint16_t d;
279 
280 		i = (i + 1) % n;
281 		if (i == 0) {
282 			*cw = *cw - gcd;
283 			if (*cw <= 0)
284 				*cw = max_wt;
285 		}
286 
287 		q = eth_rx_poll[i].eth_rx_qid;
288 		d = eth_rx_poll[i].eth_dev_id;
289 		w = rx_adapter->eth_devices[d].rx_queue[q].wt;
290 
291 		if ((int)w >= *cw)
292 			return i;
293 	}
294 }
295 
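/* Return 1 if the Rx queue uses a shared interrupt, i.e. the device
 * cannot generate per queue interrupts or the queue index is
 * >= RTE_MAX_RXTX_INTR_VEC_ID - 1
 */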
296 static inline int
297 rxa_shared_intr(struct eth_device_info *dev_info,
298 	int rx_queue_id)
299 {
300 	int multi_intr_cap;
301 
302 	if (dev_info->dev->intr_handle == NULL)
303 		return 0;
304 
305 	multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
306 	return !multi_intr_cap ||
307 		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
308 }
309 
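/* Return 1 if the queue has been added to the adapter in interrupt
 * mode, i.e. with a servicing weight of zero
 */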
310 static inline int
311 rxa_intr_queue(struct eth_device_info *dev_info,
312 	int rx_queue_id)
313 {
314 	struct eth_rx_queue_info *queue_info;
315 
316 	queue_info = &dev_info->rx_queue[rx_queue_id];
317 	return dev_info->rx_queue &&
318 		!dev_info->internal_event_port &&
319 		queue_info->queue_enabled && queue_info->wt == 0;
320 }
321 
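/* Return 1 if the queue has been added to the adapter in poll mode,
 * i.e. with a non-zero servicing weight
 */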
322 static inline int
323 rxa_polled_queue(struct eth_device_info *dev_info,
324 	int rx_queue_id)
325 {
326 	struct eth_rx_queue_info *queue_info;
327 
328 	queue_info = &dev_info->rx_queue[rx_queue_id];
329 	return !dev_info->internal_event_port &&
330 		dev_info->rx_queue &&
331 		queue_info->queue_enabled && queue_info->wt != 0;
332 }
333 
334 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
335 static int
336 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
337 {
338 	uint16_t i;
339 	int n, s;
340 	uint16_t nbq;
341 
342 	nbq = dev_info->dev->data->nb_rx_queues;
343 	n = 0; /* non shared count */
344 	s = 0; /* shared count */
345 
346 	if (rx_queue_id == -1) {
347 		for (i = 0; i < nbq; i++) {
348 			if (!rxa_shared_intr(dev_info, i))
349 				n += add ? !rxa_intr_queue(dev_info, i) :
350 					rxa_intr_queue(dev_info, i);
351 			else
352 				s += add ? !rxa_intr_queue(dev_info, i) :
353 					rxa_intr_queue(dev_info, i);
354 		}
355 
356 		if (s > 0) {
357 			if ((add && dev_info->nb_shared_intr == 0) ||
358 				(!add && dev_info->nb_shared_intr))
359 				n += 1;
360 		}
361 	} else {
362 		if (!rxa_shared_intr(dev_info, rx_queue_id))
363 			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
364 				rxa_intr_queue(dev_info, rx_queue_id);
365 		else
366 			n = add ? !dev_info->nb_shared_intr :
367 				dev_info->nb_shared_intr == 1;
368 	}
369 
370 	return add ? n : -n;
371 }
372 
373 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
374  */
375 static void
376 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
377 			struct eth_device_info *dev_info,
378 			int rx_queue_id,
379 			uint32_t *nb_rx_intr)
380 {
381 	uint32_t intr_diff;
382 
383 	if (rx_queue_id == -1)
384 		intr_diff = dev_info->nb_rx_intr;
385 	else
386 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
387 
388 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
389 }
390 
391 /* Calculate nb_rx_* after adding interrupt mode Rx queues; the newly added
392  * interrupt queues could currently be poll mode Rx queues
393  */
394 static void
395 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
396 			struct eth_device_info *dev_info,
397 			int rx_queue_id,
398 			uint32_t *nb_rx_poll,
399 			uint32_t *nb_rx_intr,
400 			uint32_t *nb_wrr)
401 {
402 	uint32_t intr_diff;
403 	uint32_t poll_diff;
404 	uint32_t wrr_len_diff;
405 
406 	if (rx_queue_id == -1) {
407 		intr_diff = dev_info->dev->data->nb_rx_queues -
408 						dev_info->nb_rx_intr;
409 		poll_diff = dev_info->nb_rx_poll;
410 		wrr_len_diff = dev_info->wrr_len;
411 	} else {
412 		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
413 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
414 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
415 					0;
416 	}
417 
418 	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
419 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
420 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
421 }
422 
423 /* Calculate size of the eth_rx_poll and wrr_sched arrays
424  * after deleting poll mode rx queues
425  */
426 static void
427 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
428 			struct eth_device_info *dev_info,
429 			int rx_queue_id,
430 			uint32_t *nb_rx_poll,
431 			uint32_t *nb_wrr)
432 {
433 	uint32_t poll_diff;
434 	uint32_t wrr_len_diff;
435 
436 	if (rx_queue_id == -1) {
437 		poll_diff = dev_info->nb_rx_poll;
438 		wrr_len_diff = dev_info->wrr_len;
439 	} else {
440 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
441 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
442 					0;
443 	}
444 
445 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
446 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
447 }
448 
449 /* Calculate nb_rx_* after adding poll mode rx queues
450  */
451 static void
452 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
453 			struct eth_device_info *dev_info,
454 			int rx_queue_id,
455 			uint16_t wt,
456 			uint32_t *nb_rx_poll,
457 			uint32_t *nb_rx_intr,
458 			uint32_t *nb_wrr)
459 {
460 	uint32_t intr_diff;
461 	uint32_t poll_diff;
462 	uint32_t wrr_len_diff;
463 
464 	if (rx_queue_id == -1) {
465 		intr_diff = dev_info->nb_rx_intr;
466 		poll_diff = dev_info->dev->data->nb_rx_queues -
467 						dev_info->nb_rx_poll;
468 		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
469 				- dev_info->wrr_len;
470 	} else {
471 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
472 		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
473 		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
474 				wt - dev_info->rx_queue[rx_queue_id].wt :
475 				wt;
476 	}
477 
478 	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
479 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
480 	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
481 }
482 
483 /* Calculate nb_rx_* after adding rx_queue_id */
484 static void
485 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
486 		struct eth_device_info *dev_info,
487 		int rx_queue_id,
488 		uint16_t wt,
489 		uint32_t *nb_rx_poll,
490 		uint32_t *nb_rx_intr,
491 		uint32_t *nb_wrr)
492 {
493 	if (wt != 0)
494 		rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
495 					wt, nb_rx_poll, nb_rx_intr, nb_wrr);
496 	else
497 		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
498 					nb_rx_poll, nb_rx_intr, nb_wrr);
499 }
500 
501 /* Calculate nb_rx_* after deleting rx_queue_id */
502 static void
503 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
504 		struct eth_device_info *dev_info,
505 		int rx_queue_id,
506 		uint32_t *nb_rx_poll,
507 		uint32_t *nb_rx_intr,
508 		uint32_t *nb_wrr)
509 {
510 	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
511 				nb_wrr);
512 	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
513 				nb_rx_intr);
514 }
515 
516 /*
517  * Allocate the rx_poll array
518  */
519 static struct eth_rx_poll_entry *
520 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
521 	uint32_t num_rx_polled)
522 {
523 	size_t len;
524 
525 	len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
526 							RTE_CACHE_LINE_SIZE);
527 	return  rte_zmalloc_socket(rx_adapter->mem_name,
528 				len,
529 				RTE_CACHE_LINE_SIZE,
530 				rx_adapter->socket_id);
531 }
532 
533 /*
534  * Allocate the WRR array
535  */
536 static uint32_t *
537 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
538 {
539 	size_t len;
540 
541 	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
542 			RTE_CACHE_LINE_SIZE);
543 	return  rte_zmalloc_socket(rx_adapter->mem_name,
544 				len,
545 				RTE_CACHE_LINE_SIZE,
546 				rx_adapter->socket_id);
547 }
548 
549 static int
550 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
551 		uint32_t nb_poll,
552 		uint32_t nb_wrr,
553 		struct eth_rx_poll_entry **rx_poll,
554 		uint32_t **wrr_sched)
555 {
556 
557 	if (nb_poll == 0) {
558 		*rx_poll = NULL;
559 		*wrr_sched = NULL;
560 		return 0;
561 	}
562 
563 	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
564 	if (*rx_poll == NULL) {
565 		*wrr_sched = NULL;
566 		return -ENOMEM;
567 	}
568 
569 	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
570 	if (*wrr_sched == NULL) {
571 		rte_free(*rx_poll);
572 		return -ENOMEM;
573 	}
574 	return 0;
575 }
576 
577 /* Precalculate WRR polling sequence for all queues in rx_adapter */
578 static void
579 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
580 		struct eth_rx_poll_entry *rx_poll,
581 		uint32_t *rx_wrr)
582 {
583 	uint16_t d;
584 	uint16_t q;
585 	unsigned int i;
586 	int prev = -1;
587 	int cw = -1;
588 
589 	/* Initialize variables for calculation of wrr schedule */
590 	uint16_t max_wrr_pos = 0;
591 	unsigned int poll_q = 0;
592 	uint16_t max_wt = 0;
593 	uint16_t gcd = 0;
594 
595 	if (rx_poll == NULL)
596 		return;
597 
598 	/* Generate the array of all queues to poll; the final size of
599 	 * this array is poll_q
600 	 */
601 	RTE_ETH_FOREACH_DEV(d) {
602 		uint16_t nb_rx_queues;
603 		struct eth_device_info *dev_info =
604 				&rx_adapter->eth_devices[d];
605 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
606 		if (dev_info->rx_queue == NULL)
607 			continue;
608 		if (dev_info->internal_event_port)
609 			continue;
610 		dev_info->wrr_len = 0;
611 		for (q = 0; q < nb_rx_queues; q++) {
612 			struct eth_rx_queue_info *queue_info =
613 				&dev_info->rx_queue[q];
614 			uint16_t wt;
615 
616 			if (!rxa_polled_queue(dev_info, q))
617 				continue;
618 			wt = queue_info->wt;
619 			rx_poll[poll_q].eth_dev_id = d;
620 			rx_poll[poll_q].eth_rx_qid = q;
621 			max_wrr_pos += wt;
622 			dev_info->wrr_len += wt;
623 			max_wt = RTE_MAX(max_wt, wt);
624 			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
625 			poll_q++;
626 		}
627 	}
628 
629 	/* Generate polling sequence based on weights */
630 	prev = -1;
631 	cw = -1;
632 	for (i = 0; i < max_wrr_pos; i++) {
633 		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
634 				     rx_poll, max_wt, gcd, prev);
635 		prev = rx_wrr[i];
636 	}
637 }
638 
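/* Extract the IPv4 or IPv6 header (possibly behind a single VLAN tag)
 * from the mbuf; the header pointer that does not apply is set to NULL
 */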
639 static inline void
640 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
641 	struct rte_ipv6_hdr **ipv6_hdr)
642 {
643 	struct rte_ether_hdr *eth_hdr =
644 		rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
645 	struct rte_vlan_hdr *vlan_hdr;
646 
647 	*ipv4_hdr = NULL;
648 	*ipv6_hdr = NULL;
649 
650 	switch (eth_hdr->ether_type) {
651 	case RTE_BE16(RTE_ETHER_TYPE_IPV4):
652 		*ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
653 		break;
654 
655 	case RTE_BE16(RTE_ETHER_TYPE_IPV6):
656 		*ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
657 		break;
658 
659 	case RTE_BE16(RTE_ETHER_TYPE_VLAN):
660 		vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
661 		switch (vlan_hdr->eth_proto) {
662 		case RTE_BE16(RTE_ETHER_TYPE_IPV4):
663 			*ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
664 			break;
665 		case RTE_BE16(RTE_ETHER_TYPE_IPV6):
666 			*ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
667 			break;
668 		default:
669 			break;
670 		}
671 		break;
672 
673 	default:
674 		break;
675 	}
676 }
677 
678 /* Calculate RSS hash for IPv4/6 */
679 static inline uint32_t
680 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
681 {
682 	uint32_t input_len;
683 	void *tuple;
684 	struct rte_ipv4_tuple ipv4_tuple;
685 	struct rte_ipv6_tuple ipv6_tuple;
686 	struct rte_ipv4_hdr *ipv4_hdr;
687 	struct rte_ipv6_hdr *ipv6_hdr;
688 
689 	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
690 
691 	if (ipv4_hdr) {
692 		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
693 		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
694 		tuple = &ipv4_tuple;
695 		input_len = RTE_THASH_V4_L3_LEN;
696 	} else if (ipv6_hdr) {
697 		rte_thash_load_v6_addrs(ipv6_hdr,
698 					(union rte_thash_tuple *)&ipv6_tuple);
699 		tuple = &ipv6_tuple;
700 		input_len = RTE_THASH_V6_L3_LEN;
701 	} else
702 		return 0;
703 
704 	return rte_softrss_be(tuple, input_len, rss_key_be);
705 }
706 
707 static inline int
708 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
709 {
710 	return !!rx_adapter->enq_block_count;
711 }
712 
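/* Record the timestamp at which enqueues started blocking, once the
 * buffer has failed to drain BLOCK_CNT_THRESHOLD times in a row
 */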
713 static inline void
714 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
715 {
716 	if (rx_adapter->rx_enq_block_start_ts)
717 		return;
718 
719 	rx_adapter->enq_block_count++;
720 	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
721 		return;
722 
723 	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
724 }
725 
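/* Called after a successful enqueue: accumulate the cycles spent
 * blocked into the adapter stats and reset the block tracking state
 */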
726 static inline void
727 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
728 		    struct rte_event_eth_rx_adapter_stats *stats)
729 {
730 	if (unlikely(!stats->rx_enq_start_ts))
731 		stats->rx_enq_start_ts = rte_get_tsc_cycles();
732 
733 	if (likely(!rxa_enq_blocked(rx_adapter)))
734 		return;
735 
736 	rx_adapter->enq_block_count = 0;
737 	if (rx_adapter->rx_enq_block_start_ts) {
738 		stats->rx_enq_end_ts = rte_get_tsc_cycles();
739 		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
740 		    rx_adapter->rx_enq_block_start_ts;
741 		rx_adapter->rx_enq_block_start_ts = 0;
742 	}
743 }
744 
745 /* Enqueue buffered events to event device */
746 static inline uint16_t
747 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
748 {
749 	struct rte_eth_event_enqueue_buffer *buf =
750 	    &rx_adapter->event_enqueue_buffer;
751 	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
752 
753 	if (!buf->count)
754 		return 0;
755 
756 	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
757 					rx_adapter->event_port_id,
758 					buf->events,
759 					buf->count);
760 	if (n != buf->count) {
761 		memmove(buf->events,
762 			&buf->events[n],
763 			(buf->count - n) * sizeof(struct rte_event));
764 		stats->rx_enq_retry++;
765 	}
766 
767 	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
768 		rxa_enq_block_start_ts(rx_adapter);
769 
770 	buf->count -= n;
771 	stats->rx_enq_count += n;
772 
773 	return n;
774 }
775 
776 static inline void
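/* Initialize a newly allocated event vector and add it to the
 * adapter's vector expiry list
 */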
777 rxa_init_vector(struct rte_event_eth_rx_adapter *rx_adapter,
778 		struct eth_rx_vector_data *vec)
779 {
780 	vec->vector_ev->nb_elem = 0;
781 	vec->vector_ev->port = vec->port;
782 	vec->vector_ev->queue = vec->queue;
783 	vec->vector_ev->attr_valid = true;
784 	TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
785 }
786 
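/* Aggregate mbufs into the queue's event vector; events for completed
 * vectors are written at the tail of the enqueue buffer and the number
 * of such events is returned (the caller updates the buffer count)
 */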
787 static inline uint16_t
788 rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
789 			struct eth_rx_queue_info *queue_info,
790 			struct rte_eth_event_enqueue_buffer *buf,
791 			struct rte_mbuf **mbufs, uint16_t num)
792 {
793 	struct rte_event *ev = &buf->events[buf->count];
794 	struct eth_rx_vector_data *vec;
795 	uint16_t filled, space, sz;
796 
797 	filled = 0;
798 	vec = &queue_info->vector_data;
799 
800 	if (vec->vector_ev == NULL) {
801 		if (rte_mempool_get(vec->vector_pool,
802 				    (void **)&vec->vector_ev) < 0) {
803 			rte_pktmbuf_free_bulk(mbufs, num);
804 			return 0;
805 		}
806 		rxa_init_vector(rx_adapter, vec);
807 	}
808 	while (num) {
809 		if (vec->vector_ev->nb_elem == vec->max_vector_count) {
810 			/* Event ready. */
811 			ev->event = vec->event;
812 			ev->vec = vec->vector_ev;
813 			ev++;
814 			filled++;
815 			vec->vector_ev = NULL;
816 			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
817 			if (rte_mempool_get(vec->vector_pool,
818 					    (void **)&vec->vector_ev) < 0) {
819 				rte_pktmbuf_free_bulk(mbufs, num);
820 				return 0;
821 			}
822 			rxa_init_vector(rx_adapter, vec);
823 		}
824 
825 		space = vec->max_vector_count - vec->vector_ev->nb_elem;
826 		sz = num > space ? space : num;
827 		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
828 		       sizeof(void *) * sz);
829 		vec->vector_ev->nb_elem += sz;
830 		num -= sz;
831 		mbufs += sz;
832 		vec->ts = rte_rdtsc();
833 	}
834 
835 	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
836 		ev->event = vec->event;
837 		ev->vec = vec->vector_ev;
838 		ev++;
839 		filled++;
840 		vec->vector_ev = NULL;
841 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
842 	}
843 
844 	return filled;
845 }
846 
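/* Convert a burst of received mbufs into events (or event vectors),
 * apply the optional per device Rx callback and append the result to
 * the adapter's enqueue buffer
 */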
847 static inline void
848 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
849 		uint16_t eth_dev_id,
850 		uint16_t rx_queue_id,
851 		struct rte_mbuf **mbufs,
852 		uint16_t num)
853 {
854 	uint32_t i;
855 	struct eth_device_info *dev_info =
856 					&rx_adapter->eth_devices[eth_dev_id];
857 	struct eth_rx_queue_info *eth_rx_queue_info =
858 					&dev_info->rx_queue[rx_queue_id];
859 	struct rte_eth_event_enqueue_buffer *buf =
860 					&rx_adapter->event_enqueue_buffer;
861 	struct rte_event *ev = &buf->events[buf->count];
862 	uint64_t event = eth_rx_queue_info->event;
863 	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
864 	struct rte_mbuf *m = mbufs[0];
865 	uint32_t rss_mask;
866 	uint32_t rss;
867 	int do_rss;
868 	uint16_t nb_cb;
869 	uint16_t dropped;
870 
871 	if (!eth_rx_queue_info->ena_vector) {
872 		/* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
873 		rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
874 		do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
875 		for (i = 0; i < num; i++) {
876 			m = mbufs[i];
877 
878 			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
879 				     : m->hash.rss;
880 			ev->event = event;
881 			ev->flow_id = (rss & ~flow_id_mask) |
882 				      (ev->flow_id & flow_id_mask);
883 			ev->mbuf = m;
884 			ev++;
885 		}
886 	} else {
887 		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
888 					      buf, mbufs, num);
889 	}
890 
891 	if (num && dev_info->cb_fn) {
892 
893 		dropped = 0;
894 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
895 					ETH_EVENT_BUFFER_SIZE, buf->count,
896 					&buf->events[buf->count], num,
897 					dev_info->cb_arg, &dropped);
898 		if (unlikely(nb_cb > num))
899 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
900 				nb_cb, num);
901 		else
902 			num = nb_cb;
903 		if (dropped)
904 			rx_adapter->stats.rx_dropped += dropped;
905 	}
906 
907 	buf->count += num;
908 }
909 
910 /* Enqueue packets from <port, q> to event buffer */
911 static inline uint32_t
912 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
913 	uint16_t port_id,
914 	uint16_t queue_id,
915 	uint32_t rx_count,
916 	uint32_t max_rx,
917 	int *rxq_empty)
918 {
919 	struct rte_mbuf *mbufs[BATCH_SIZE];
920 	struct rte_eth_event_enqueue_buffer *buf =
921 					&rx_adapter->event_enqueue_buffer;
922 	struct rte_event_eth_rx_adapter_stats *stats =
923 					&rx_adapter->stats;
924 	uint16_t n;
925 	uint32_t nb_rx = 0;
926 
927 	if (rxq_empty)
928 		*rxq_empty = 0;
929 	/* Don't do a batch dequeue from the rx queue if there isn't
930 	 * enough space in the enqueue buffer.
931 	 */
932 	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
933 		if (buf->count >= BATCH_SIZE)
934 			rxa_flush_event_buffer(rx_adapter);
935 
936 		stats->rx_poll_count++;
937 		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
938 		if (unlikely(!n)) {
939 			if (rxq_empty)
940 				*rxq_empty = 1;
941 			break;
942 		}
943 		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
944 		nb_rx += n;
945 		if (rx_count + nb_rx > max_rx)
946 			break;
947 	}
948 
949 	if (buf->count > 0)
950 		rxa_flush_event_buffer(rx_adapter);
951 
952 	return nb_rx;
953 }
954 
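/* Called from the interrupt thread: post the <port, queue> pair of an
 * interrupting Rx queue to the interrupt ring and disable further
 * interrupts for that queue (or shared interrupt) until it is serviced
 */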
955 static inline void
956 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
957 		void *data)
958 {
959 	uint16_t port_id;
960 	uint16_t queue;
961 	int err;
962 	union queue_data qd;
963 	struct eth_device_info *dev_info;
964 	struct eth_rx_queue_info *queue_info;
965 	int *intr_enabled;
966 
967 	qd.ptr = data;
968 	port_id = qd.port;
969 	queue = qd.queue;
970 
971 	dev_info = &rx_adapter->eth_devices[port_id];
972 	queue_info = &dev_info->rx_queue[queue];
973 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
974 	if (rxa_shared_intr(dev_info, queue))
975 		intr_enabled = &dev_info->shared_intr_enabled;
976 	else
977 		intr_enabled = &queue_info->intr_enabled;
978 
979 	if (*intr_enabled) {
980 		*intr_enabled = 0;
981 		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
982 		/* Entry should always be available.
983 		 * The ring size equals the maximum number of interrupt
984 		 * vectors supported (an interrupt vector is shared in
985 		 * case of shared interrupts)
986 		 */
987 		if (err)
988 			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
989 				" to ring: %s", strerror(-err));
990 		else
991 			rte_eth_dev_rx_intr_disable(port_id, queue);
992 	}
993 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
994 }
995 
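/* Check that adding num_intr_vec interrupt vectors does not exceed
 * the capacity of the interrupt ring
 */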
996 static int
997 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
998 			uint32_t num_intr_vec)
999 {
1000 	if (rx_adapter->num_intr_vec + num_intr_vec >
1001 				RTE_EVENT_ETH_INTR_RING_SIZE) {
1002 		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1003 		" %d needed %d limit %d", rx_adapter->num_intr_vec,
1004 		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1005 		return -ENOSPC;
1006 	}
1007 
1008 	return 0;
1009 }
1010 
1011 /* Delete entries for (dev, queue) from the interrupt ring */
1012 static void
1013 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
1014 			struct eth_device_info *dev_info,
1015 			uint16_t rx_queue_id)
1016 {
1017 	int i, n;
1018 	union queue_data qd;
1019 
1020 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1021 
1022 	n = rte_ring_count(rx_adapter->intr_ring);
1023 	for (i = 0; i < n; i++) {
1024 		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1025 		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1026 			if (qd.port == dev_info->dev->data->port_id &&
1027 				qd.queue == rx_queue_id)
1028 				continue;
1029 		} else {
1030 			if (qd.port == dev_info->dev->data->port_id)
1031 				continue;
1032 		}
1033 		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1034 	}
1035 
1036 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1037 }
1038 
1039 /* pthread callback handling interrupt mode receive queues
1040  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1041  * interrupting queue to the adapter's ring buffer for interrupt events.
1042  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1043  * the adapter service function.
1044  */
1045 static void *
1046 rxa_intr_thread(void *arg)
1047 {
1048 	struct rte_event_eth_rx_adapter *rx_adapter = arg;
1049 	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1050 	int n, i;
1051 
1052 	while (1) {
1053 		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1054 				RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1055 		if (unlikely(n < 0))
1056 			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1057 					n);
1058 		for (i = 0; i < n; i++) {
1059 			rxa_intr_ring_enqueue(rx_adapter,
1060 					epoll_events[i].epdata.data);
1061 		}
1062 	}
1063 
1064 	return NULL;
1065 }
1066 
1067 /* Dequeue <port, q> from interrupt ring and enqueue received
1068  * mbufs to eventdev
1069  */
1070 static inline uint32_t
1071 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
1072 {
1073 	uint32_t n;
1074 	uint32_t nb_rx = 0;
1075 	int rxq_empty;
1076 	struct rte_eth_event_enqueue_buffer *buf;
1077 	rte_spinlock_t *ring_lock;
1078 	uint8_t max_done = 0;
1079 
1080 	if (rx_adapter->num_rx_intr == 0)
1081 		return 0;
1082 
1083 	if (rte_ring_count(rx_adapter->intr_ring) == 0
1084 		&& !rx_adapter->qd_valid)
1085 		return 0;
1086 
1087 	buf = &rx_adapter->event_enqueue_buffer;
1088 	ring_lock = &rx_adapter->intr_ring_lock;
1089 
1090 	if (buf->count >= BATCH_SIZE)
1091 		rxa_flush_event_buffer(rx_adapter);
1092 
1093 	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
1094 		struct eth_device_info *dev_info;
1095 		uint16_t port;
1096 		uint16_t queue;
1097 		union queue_data qd  = rx_adapter->qd;
1098 		int err;
1099 
1100 		if (!rx_adapter->qd_valid) {
1101 			struct eth_rx_queue_info *queue_info;
1102 
1103 			rte_spinlock_lock(ring_lock);
1104 			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1105 			if (err) {
1106 				rte_spinlock_unlock(ring_lock);
1107 				break;
1108 			}
1109 
1110 			port = qd.port;
1111 			queue = qd.queue;
1112 			rx_adapter->qd = qd;
1113 			rx_adapter->qd_valid = 1;
1114 			dev_info = &rx_adapter->eth_devices[port];
1115 			if (rxa_shared_intr(dev_info, queue))
1116 				dev_info->shared_intr_enabled = 1;
1117 			else {
1118 				queue_info = &dev_info->rx_queue[queue];
1119 				queue_info->intr_enabled = 1;
1120 			}
1121 			rte_eth_dev_rx_intr_enable(port, queue);
1122 			rte_spinlock_unlock(ring_lock);
1123 		} else {
1124 			port = qd.port;
1125 			queue = qd.queue;
1126 
1127 			dev_info = &rx_adapter->eth_devices[port];
1128 		}
1129 
1130 		if (rxa_shared_intr(dev_info, queue)) {
1131 			uint16_t i;
1132 			uint16_t nb_queues;
1133 
1134 			nb_queues = dev_info->dev->data->nb_rx_queues;
1135 			n = 0;
1136 			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1137 				uint8_t enq_buffer_full;
1138 
1139 				if (!rxa_intr_queue(dev_info, i))
1140 					continue;
1141 				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1142 					rx_adapter->max_nb_rx,
1143 					&rxq_empty);
1144 				nb_rx += n;
1145 
1146 				enq_buffer_full = !rxq_empty && n == 0;
1147 				max_done = nb_rx > rx_adapter->max_nb_rx;
1148 
1149 				if (enq_buffer_full || max_done) {
1150 					dev_info->next_q_idx = i;
1151 					goto done;
1152 				}
1153 			}
1154 
1155 			rx_adapter->qd_valid = 0;
1156 
1157 			/* Reinitialize for next interrupt */
1158 			dev_info->next_q_idx = dev_info->multi_intr_cap ?
1159 						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1160 						0;
1161 		} else {
1162 			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1163 				rx_adapter->max_nb_rx,
1164 				&rxq_empty);
1165 			rx_adapter->qd_valid = !rxq_empty;
1166 			nb_rx += n;
1167 			if (nb_rx > rx_adapter->max_nb_rx)
1168 				break;
1169 		}
1170 	}
1171 
1172 done:
1173 	rx_adapter->stats.rx_intr_packets += nb_rx;
1174 	return nb_rx;
1175 }
1176 
1177 /*
1178  * Polls receive queues added to the event adapter and enqueues received
1179  * packets to the event device.
1180  *
1181  * The receive code enqueues initially to a temporary buffer; the
1182  * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
1183  *
1184  * If there isn't space available in the temporary buffer, packets from the
1185  * Rx queue aren't dequeued from the eth device; this back pressures the
1186  * eth device. In virtual device environments, this back pressure is relayed
1187  * to the hypervisor's switching layer where adjustments can be made to deal
1188  * with it.
1189  */
1190 static inline uint32_t
1191 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1192 {
1193 	uint32_t num_queue;
1194 	uint32_t nb_rx = 0;
1195 	struct rte_eth_event_enqueue_buffer *buf;
1196 	uint32_t wrr_pos;
1197 	uint32_t max_nb_rx;
1198 
1199 	wrr_pos = rx_adapter->wrr_pos;
1200 	max_nb_rx = rx_adapter->max_nb_rx;
1201 	buf = &rx_adapter->event_enqueue_buffer;
1202 
1203 	/* Iterate through a WRR sequence */
1204 	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1205 		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1206 		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1207 		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1208 
1209 		/* Don't do a batch dequeue from the rx queue if there isn't
1210 		 * enough space in the enqueue buffer.
1211 		 */
1212 		if (buf->count >= BATCH_SIZE)
1213 			rxa_flush_event_buffer(rx_adapter);
1214 		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1215 			rx_adapter->wrr_pos = wrr_pos;
1216 			return nb_rx;
1217 		}
1218 
1219 		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1220 				NULL);
1221 		if (nb_rx > max_nb_rx) {
1222 			rx_adapter->wrr_pos =
1223 				    (wrr_pos + 1) % rx_adapter->wrr_len;
1224 			break;
1225 		}
1226 
1227 		if (++wrr_pos == rx_adapter->wrr_len)
1228 			wrr_pos = 0;
1229 	}
1230 	return nb_rx;
1231 }
1232 
1233 static void
1234 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1235 {
1236 	struct rte_event_eth_rx_adapter *rx_adapter = arg;
1237 	struct rte_eth_event_enqueue_buffer *buf =
1238 		&rx_adapter->event_enqueue_buffer;
1239 	struct rte_event *ev;
1240 
1241 	if (buf->count)
1242 		rxa_flush_event_buffer(rx_adapter);
1243 
1244 	if (vec->vector_ev->nb_elem == 0)
1245 		return;
1246 	ev = &buf->events[buf->count];
1247 
1248 	/* Event ready. */
1249 	ev->event = vec->event;
1250 	ev->vec = vec->vector_ev;
1251 	buf->count++;
1252 
1253 	vec->vector_ev = NULL;
1254 	vec->ts = 0;
1255 }
1256 
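/* Adapter service function: flushes expired event vectors, drains the
 * interrupt ring and polls the WRR sequence of Rx queues
 */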
1257 static int
1258 rxa_service_func(void *args)
1259 {
1260 	struct rte_event_eth_rx_adapter *rx_adapter = args;
1261 	struct rte_event_eth_rx_adapter_stats *stats;
1262 
1263 	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1264 		return 0;
1265 	if (!rx_adapter->rxa_started) {
1266 		rte_spinlock_unlock(&rx_adapter->rx_lock);
1267 		return 0;
1268 	}
1269 
1270 	if (rx_adapter->ena_vector) {
1271 		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1272 		    rx_adapter->vector_tmo_ticks) {
1273 			struct eth_rx_vector_data *vec;
1274 
1275 			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1276 				uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1277 
1278 				if (elapsed_time >= vec->vector_timeout_ticks) {
1279 					rxa_vector_expire(vec, rx_adapter);
1280 					TAILQ_REMOVE(&rx_adapter->vector_list,
1281 						     vec, next);
1282 				}
1283 			}
1284 			rx_adapter->prev_expiry_ts = rte_rdtsc();
1285 		}
1286 	}
1287 
1288 	stats = &rx_adapter->stats;
1289 	stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1290 	stats->rx_packets += rxa_poll(rx_adapter);
1291 	rte_spinlock_unlock(&rx_adapter->rx_lock);
1292 	return 0;
1293 }
1294 
1295 static int
1296 rte_event_eth_rx_adapter_init(void)
1297 {
1298 	const char *name = "rte_event_eth_rx_adapter_array";
1299 	const struct rte_memzone *mz;
1300 	unsigned int sz;
1301 
1302 	sz = sizeof(*event_eth_rx_adapter) *
1303 	    RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1304 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1305 
1306 	mz = rte_memzone_lookup(name);
1307 	if (mz == NULL) {
1308 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1309 						 RTE_CACHE_LINE_SIZE);
1310 		if (mz == NULL) {
1311 			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1312 					PRId32, rte_errno);
1313 			return -rte_errno;
1314 		}
1315 	}
1316 
1317 	event_eth_rx_adapter = mz->addr;
1318 	return 0;
1319 }
1320 
1321 static inline struct rte_event_eth_rx_adapter *
1322 rxa_id_to_adapter(uint8_t id)
1323 {
1324 	return event_eth_rx_adapter ?
1325 		event_eth_rx_adapter[id] : NULL;
1326 }
1327 
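/* Default configuration callback: reconfigure the event device with
 * one additional event port, set that port up with the supplied port
 * configuration and hand it to the adapter
 */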
1328 static int
1329 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1330 		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1331 {
1332 	int ret;
1333 	struct rte_eventdev *dev;
1334 	struct rte_event_dev_config dev_conf;
1335 	int started;
1336 	uint8_t port_id;
1337 	struct rte_event_port_conf *port_conf = arg;
1338 	struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1339 
1340 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
1341 	dev_conf = dev->data->dev_conf;
1342 
1343 	started = dev->data->dev_started;
1344 	if (started)
1345 		rte_event_dev_stop(dev_id);
1346 	port_id = dev_conf.nb_event_ports;
1347 	dev_conf.nb_event_ports += 1;
1348 	ret = rte_event_dev_configure(dev_id, &dev_conf);
1349 	if (ret) {
1350 		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1351 						dev_id);
1352 		if (started) {
1353 			if (rte_event_dev_start(dev_id))
1354 				return -EIO;
1355 		}
1356 		return ret;
1357 	}
1358 
1359 	ret = rte_event_port_setup(dev_id, port_id, port_conf);
1360 	if (ret) {
1361 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1362 					port_id);
1363 		return ret;
1364 	}
1365 
1366 	conf->event_port_id = port_id;
1367 	conf->max_nb_rx = 128;
1368 	if (started)
1369 		ret = rte_event_dev_start(dev_id);
1370 	rx_adapter->default_cb_arg = 1;
1371 	return ret;
1372 }
1373 
1374 static int
1375 rxa_epoll_create1(void)
1376 {
1377 #if defined(LINUX)
1378 	int fd;
1379 	fd = epoll_create1(EPOLL_CLOEXEC);
1380 	return fd < 0 ? -errno : fd;
1381 #elif defined(BSD)
1382 	return -ENOTSUP;
1383 #endif
1384 }
1385 
1386 static int
1387 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1388 {
1389 	if (rx_adapter->epd != INIT_FD)
1390 		return 0;
1391 
1392 	rx_adapter->epd = rxa_epoll_create1();
1393 	if (rx_adapter->epd < 0) {
1394 		int err = rx_adapter->epd;
1395 		rx_adapter->epd = INIT_FD;
1396 		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1397 		return err;
1398 	}
1399 
1400 	return 0;
1401 }
1402 
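/* Allocate the interrupt ring and epoll event array and spawn the
 * control thread that blocks on Rx interrupts
 */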
1403 static int
1404 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1405 {
1406 	int err;
1407 	char thread_name[RTE_MAX_THREAD_NAME_LEN];
1408 
1409 	if (rx_adapter->intr_ring)
1410 		return 0;
1411 
1412 	rx_adapter->intr_ring = rte_ring_create("intr_ring",
1413 					RTE_EVENT_ETH_INTR_RING_SIZE,
1414 					rte_socket_id(), 0);
1415 	if (!rx_adapter->intr_ring)
1416 		return -ENOMEM;
1417 
1418 	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1419 					RTE_EVENT_ETH_INTR_RING_SIZE *
1420 					sizeof(struct rte_epoll_event),
1421 					RTE_CACHE_LINE_SIZE,
1422 					rx_adapter->socket_id);
1423 	if (!rx_adapter->epoll_events) {
1424 		err = -ENOMEM;
1425 		goto error;
1426 	}
1427 
1428 	rte_spinlock_init(&rx_adapter->intr_ring_lock);
1429 
1430 	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1431 			"rx-intr-thread-%d", rx_adapter->id);
1432 
1433 	err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1434 				NULL, rxa_intr_thread, rx_adapter);
1435 	if (!err)
1436 		return 0;
1437 
1438 	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1439 	rte_free(rx_adapter->epoll_events);
1440 error:
1441 	rte_ring_free(rx_adapter->intr_ring);
1442 	rx_adapter->intr_ring = NULL;
1443 	rx_adapter->epoll_events = NULL;
1444 	return err;
1445 }
1446 
1447 static int
1448 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1449 {
1450 	int err;
1451 
1452 	err = pthread_cancel(rx_adapter->rx_intr_thread);
1453 	if (err)
1454 		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1455 				err);
1456 
1457 	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1458 	if (err)
1459 		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1460 
1461 	rte_free(rx_adapter->epoll_events);
1462 	rte_ring_free(rx_adapter->intr_ring);
1463 	rx_adapter->intr_ring = NULL;
1464 	rx_adapter->epoll_events = NULL;
1465 	return 0;
1466 }
1467 
1468 static int
1469 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1470 {
1471 	int ret;
1472 
1473 	if (rx_adapter->num_rx_intr == 0)
1474 		return 0;
1475 
1476 	ret = rxa_destroy_intr_thread(rx_adapter);
1477 	if (ret)
1478 		return ret;
1479 
1480 	close(rx_adapter->epd);
1481 	rx_adapter->epd = INIT_FD;
1482 
1483 	return ret;
1484 }
1485 
1486 static int
1487 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1488 	struct eth_device_info *dev_info,
1489 	uint16_t rx_queue_id)
1490 {
1491 	int err;
1492 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1493 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1494 
1495 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1496 	if (err) {
1497 		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1498 			rx_queue_id);
1499 		return err;
1500 	}
1501 
1502 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1503 					rx_adapter->epd,
1504 					RTE_INTR_EVENT_DEL,
1505 					0);
1506 	if (err)
1507 		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1508 
1509 	if (sintr)
1510 		dev_info->shared_intr_enabled = 0;
1511 	else
1512 		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1513 	return err;
1514 }
1515 
1516 static int
1517 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1518 		struct eth_device_info *dev_info,
1519 		int rx_queue_id)
1520 {
1521 	int err;
1522 	int i;
1523 	int s;
1524 
1525 	if (dev_info->nb_rx_intr == 0)
1526 		return 0;
1527 
1528 	err = 0;
1529 	if (rx_queue_id == -1) {
1530 		s = dev_info->nb_shared_intr;
1531 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1532 			int sintr;
1533 			uint16_t q;
1534 
1535 			q = dev_info->intr_queue[i];
1536 			sintr = rxa_shared_intr(dev_info, q);
1537 			s -= sintr;
1538 
1539 			if (!sintr || s == 0) {
1540 
1541 				err = rxa_disable_intr(rx_adapter, dev_info,
1542 						q);
1543 				if (err)
1544 					return err;
1545 				rxa_intr_ring_del_entries(rx_adapter, dev_info,
1546 							q);
1547 			}
1548 		}
1549 	} else {
1550 		if (!rxa_intr_queue(dev_info, rx_queue_id))
1551 			return 0;
1552 		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1553 				dev_info->nb_shared_intr == 1) {
1554 			err = rxa_disable_intr(rx_adapter, dev_info,
1555 					rx_queue_id);
1556 			if (err)
1557 				return err;
1558 			rxa_intr_ring_del_entries(rx_adapter, dev_info,
1559 						rx_queue_id);
1560 		}
1561 
1562 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1563 			if (dev_info->intr_queue[i] == rx_queue_id) {
1564 				for (; i < dev_info->nb_rx_intr - 1; i++)
1565 					dev_info->intr_queue[i] =
1566 						dev_info->intr_queue[i + 1];
1567 				break;
1568 			}
1569 		}
1570 	}
1571 
1572 	return err;
1573 }
1574 
1575 static int
1576 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1577 	struct eth_device_info *dev_info,
1578 	uint16_t rx_queue_id)
1579 {
1580 	int err, err1;
1581 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1582 	union queue_data qd;
1583 	int init_fd;
1584 	uint16_t *intr_queue;
1585 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1586 
1587 	if (rxa_intr_queue(dev_info, rx_queue_id))
1588 		return 0;
1589 
1590 	intr_queue = dev_info->intr_queue;
1591 	if (dev_info->intr_queue == NULL) {
1592 		size_t len =
1593 			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1594 		dev_info->intr_queue =
1595 			rte_zmalloc_socket(
1596 				rx_adapter->mem_name,
1597 				len,
1598 				0,
1599 				rx_adapter->socket_id);
1600 		if (dev_info->intr_queue == NULL)
1601 			return -ENOMEM;
1602 	}
1603 
1604 	init_fd = rx_adapter->epd;
1605 	err = rxa_init_epd(rx_adapter);
1606 	if (err)
1607 		goto err_free_queue;
1608 
1609 	qd.port = eth_dev_id;
1610 	qd.queue = rx_queue_id;
1611 
1612 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1613 					rx_adapter->epd,
1614 					RTE_INTR_EVENT_ADD,
1615 					qd.ptr);
1616 	if (err) {
1617 		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1618 			" Rx Queue %u err %d", rx_queue_id, err);
1619 		goto err_del_fd;
1620 	}
1621 
1622 	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1623 	if (err) {
1624 		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1625 				" Rx Queue %u err %d", rx_queue_id, err);
1626 
1627 		goto err_del_event;
1628 	}
1629 
1630 	err = rxa_create_intr_thread(rx_adapter);
1631 	if (!err)  {
1632 		if (sintr)
1633 			dev_info->shared_intr_enabled = 1;
1634 		else
1635 			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1636 		return 0;
1637 	}
1638 
1639 
1640 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1641 	if (err)
1642 		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1643 				" Rx Queue %u err %d", rx_queue_id, err);
1644 err_del_event:
1645 	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1646 					rx_adapter->epd,
1647 					RTE_INTR_EVENT_DEL,
1648 					0);
1649 	if (err1) {
1650 		RTE_EDEV_LOG_ERR("Could not delete event for"
1651 				" Rx Queue %u err %d", rx_queue_id, err1);
1652 	}
1653 err_del_fd:
1654 	if (init_fd == INIT_FD) {
1655 		close(rx_adapter->epd);
1656 		rx_adapter->epd = INIT_FD;
1657 	}
1658 err_free_queue:
1659 	if (intr_queue == NULL)
1660 		rte_free(dev_info->intr_queue);
1661 
1662 	return err;
1663 }
1664 
1665 static int
1666 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1667 	struct eth_device_info *dev_info,
1668 	int rx_queue_id)
1669 
1670 {
1671 	int i, j, err;
1672 	int si = -1;
1673 	int shared_done = (dev_info->nb_shared_intr > 0);
1674 
1675 	if (rx_queue_id != -1) {
1676 		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1677 			return 0;
1678 		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1679 	}
1680 
1681 	err = 0;
1682 	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1683 
1684 		if (rxa_shared_intr(dev_info, i) && shared_done)
1685 			continue;
1686 
1687 		err = rxa_config_intr(rx_adapter, dev_info, i);
1688 
1689 		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1690 		if (shared_done) {
1691 			si = i;
1692 			dev_info->shared_intr_enabled = 1;
1693 		}
1694 		if (err)
1695 			break;
1696 	}
1697 
1698 	if (err == 0)
1699 		return 0;
1700 
1701 	shared_done = (dev_info->nb_shared_intr > 0);
1702 	for (j = 0; j < i; j++) {
1703 		if (rxa_intr_queue(dev_info, j))
1704 			continue;
1705 		if (rxa_shared_intr(dev_info, j) && si != j)
1706 			continue;
1707 		err = rxa_disable_intr(rx_adapter, dev_info, j);
1708 		if (err)
1709 			break;
1710 
1711 	}
1712 
1713 	return err;
1714 }
1715 
1716 
1717 static int
1718 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1719 {
1720 	int ret;
1721 	struct rte_service_spec service;
1722 	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1723 
1724 	if (rx_adapter->service_inited)
1725 		return 0;
1726 
1727 	memset(&service, 0, sizeof(service));
1728 	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1729 		"rte_event_eth_rx_adapter_%d", id);
1730 	service.socket_id = rx_adapter->socket_id;
1731 	service.callback = rxa_service_func;
1732 	service.callback_userdata = rx_adapter;
1733 	/* Service function handles locking for queue add/del updates */
1734 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1735 	ret = rte_service_component_register(&service, &rx_adapter->service_id);
1736 	if (ret) {
1737 		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1738 			service.name, ret);
1739 		return ret;
1740 	}
1741 
1742 	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1743 		&rx_adapter_conf, rx_adapter->conf_arg);
1744 	if (ret) {
1745 		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1746 			ret);
1747 		goto err_done;
1748 	}
1749 	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1750 	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1751 	rx_adapter->service_inited = 1;
1752 	rx_adapter->epd = INIT_FD;
1753 	return 0;
1754 
1755 err_done:
1756 	rte_service_component_unregister(rx_adapter->service_id);
1757 	return ret;
1758 }
1759 
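/* Mark a queue (or all queues when rx_queue_id is -1) as added to or
 * removed from the adapter and keep the per adapter and per device
 * queue counts consistent
 */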
1760 static void
1761 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1762 		struct eth_device_info *dev_info,
1763 		int32_t rx_queue_id,
1764 		uint8_t add)
1765 {
1766 	struct eth_rx_queue_info *queue_info;
1767 	int enabled;
1768 	uint16_t i;
1769 
1770 	if (dev_info->rx_queue == NULL)
1771 		return;
1772 
1773 	if (rx_queue_id == -1) {
1774 		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1775 			rxa_update_queue(rx_adapter, dev_info, i, add);
1776 	} else {
1777 		queue_info = &dev_info->rx_queue[rx_queue_id];
1778 		enabled = queue_info->queue_enabled;
1779 		if (add) {
1780 			rx_adapter->nb_queues += !enabled;
1781 			dev_info->nb_dev_queues += !enabled;
1782 		} else {
1783 			rx_adapter->nb_queues -= enabled;
1784 			dev_info->nb_dev_queues -= enabled;
1785 		}
1786 		queue_info->queue_enabled = !!add;
1787 	}
1788 }
1789 
1790 static void
1791 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1792 		    uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1793 		    uint16_t port_id)
1794 {
1795 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1796 	struct eth_rx_vector_data *vector_data;
1797 	uint32_t flow_id;
1798 
1799 	vector_data = &queue_info->vector_data;
1800 	vector_data->max_vector_count = vector_count;
1801 	vector_data->port = port_id;
1802 	vector_data->queue = qid;
1803 	vector_data->vector_pool = mp;
1804 	vector_data->vector_timeout_ticks =
1805 		NSEC2TICK(vector_ns, rte_get_timer_hz());
1806 	vector_data->ts = 0;
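	/* If the application did not supply a flow id, synthesize one from
	 * the queue id (bits 0-11) and port id (bits 12-19) of the queue
	 */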
1807 	flow_id = queue_info->event & 0xFFFFF;
1808 	flow_id =
1809 		flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1810 	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1811 }
1812 
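/* Remove a queue (or all queues when rx_queue_id is -1) from the
 * software adapter: flush any partial event vectors for the queue and
 * update the poll/interrupt queue accounting
 */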
1813 static void
1814 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1815 	struct eth_device_info *dev_info,
1816 	int32_t rx_queue_id)
1817 {
1818 	struct eth_rx_vector_data *vec;
1819 	int pollq;
1820 	int intrq;
1821 	int sintrq;
1822 
1823 
1824 	if (rx_adapter->nb_queues == 0)
1825 		return;
1826 
1827 	if (rx_queue_id == -1) {
1828 		uint16_t nb_rx_queues;
1829 		uint16_t i;
1830 
1831 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1832 		for (i = 0; i <	nb_rx_queues; i++)
1833 			rxa_sw_del(rx_adapter, dev_info, i);
1834 		return;
1835 	}
1836 
1837 	/* Push all the partial event vectors to event device. */
1838 	TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1839 		if (vec->queue != rx_queue_id)
1840 			continue;
1841 		rxa_vector_expire(vec, rx_adapter);
1842 		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
1843 	}
1844 
1845 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
1846 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
1847 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1848 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1849 	rx_adapter->num_rx_polled -= pollq;
1850 	dev_info->nb_rx_poll -= pollq;
1851 	rx_adapter->num_rx_intr -= intrq;
1852 	dev_info->nb_rx_intr -= intrq;
1853 	dev_info->nb_shared_intr -= intrq && sintrq;
1854 }
1855 
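/* Record the event template and servicing weight for a queue (or all
 * queues when rx_queue_id is -1) and update the poll/interrupt queue
 * accounting for the device and the adapter
 */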
1856 static void
1857 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1858 	struct eth_device_info *dev_info,
1859 	int32_t rx_queue_id,
1860 	const struct rte_event_eth_rx_adapter_queue_conf *conf)
1861 {
1862 	struct eth_rx_queue_info *queue_info;
1863 	const struct rte_event *ev = &conf->ev;
1864 	int pollq;
1865 	int intrq;
1866 	int sintrq;
1867 	struct rte_event *qi_ev;
1868 
1869 	if (rx_queue_id == -1) {
1870 		uint16_t nb_rx_queues;
1871 		uint16_t i;
1872 
1873 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1874 		for (i = 0; i <	nb_rx_queues; i++)
1875 			rxa_add_queue(rx_adapter, dev_info, i, conf);
1876 		return;
1877 	}
1878 
1879 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
1880 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
1881 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1882 
1883 	queue_info = &dev_info->rx_queue[rx_queue_id];
1884 	queue_info->wt = conf->servicing_weight;
1885 
1886 	qi_ev = (struct rte_event *)&queue_info->event;
1887 	qi_ev->event = ev->event;
1888 	qi_ev->op = RTE_EVENT_OP_NEW;
1889 	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1890 	qi_ev->sub_event_type = 0;
1891 
1892 	if (conf->rx_queue_flags &
1893 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1894 		queue_info->flow_id_mask = ~0;
1895 	} else
1896 		qi_ev->flow_id = 0;
1897 
1898 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1899 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
1900 		rx_adapter->num_rx_polled += !pollq;
1901 		dev_info->nb_rx_poll += !pollq;
1902 		rx_adapter->num_rx_intr -= intrq;
1903 		dev_info->nb_rx_intr -= intrq;
1904 		dev_info->nb_shared_intr -= intrq && sintrq;
1905 	}
1906 
1907 	if (rxa_intr_queue(dev_info, rx_queue_id)) {
1908 		rx_adapter->num_rx_polled -= pollq;
1909 		dev_info->nb_rx_poll -= pollq;
1910 		rx_adapter->num_rx_intr += !intrq;
1911 		dev_info->nb_rx_intr += !intrq;
1912 		dev_info->nb_shared_intr += !intrq && sintrq;
1913 		if (dev_info->nb_shared_intr == 1) {
1914 			if (dev_info->multi_intr_cap)
1915 				dev_info->next_q_idx =
1916 					RTE_MAX_RXTX_INTR_VEC_ID - 1;
1917 			else
1918 				dev_info->next_q_idx = 0;
1919 		}
1920 	}
1921 }
1922 
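/* Enable event vectorization for a queue (or all queues when
 * rx_queue_id is -1) and update the adapter wide vector expiry period
 */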
1923 static void
1924 rxa_sw_event_vector_configure(
1925 	struct rte_event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
1926 	int rx_queue_id,
1927 	const struct rte_event_eth_rx_adapter_event_vector_config *config)
1928 {
1929 	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1930 	struct eth_rx_queue_info *queue_info;
1931 	struct rte_event *qi_ev;
1932 
1933 	if (rx_queue_id == -1) {
1934 		uint16_t nb_rx_queues;
1935 		uint16_t i;
1936 
1937 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1938 		for (i = 0; i < nb_rx_queues; i++)
1939 			rxa_sw_event_vector_configure(rx_adapter, eth_dev_id, i,
1940 						      config);
1941 		return;
1942 	}
1943 
1944 	queue_info = &dev_info->rx_queue[rx_queue_id];
1945 	qi_ev = (struct rte_event *)&queue_info->event;
1946 	queue_info->ena_vector = 1;
1947 	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
1948 	rxa_set_vector_data(queue_info, config->vector_sz,
1949 			    config->vector_timeout_ns, config->vector_mp,
1950 			    rx_queue_id, dev_info->dev->data->port_id);
1951 	rx_adapter->ena_vector = 1;
1952 	rx_adapter->vector_tmo_ticks =
1953 		rx_adapter->vector_tmo_ticks ?
1954 			      RTE_MIN(config->vector_timeout_ns >> 1,
1955 				rx_adapter->vector_tmo_ticks) :
1956 			      config->vector_timeout_ns >> 1;
1957 }
1958 
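/*
 * Add a queue to the service function based (software) adapter: allocate
 * the per-device queue array if needed, recompute the poll/interrupt and
 * WRR requirements, migrate queues between interrupt and poll mode as
 * required and install the new weighted round robin schedule.
 */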
1959 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1960 		uint16_t eth_dev_id,
1961 		int rx_queue_id,
1962 		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1963 {
1964 	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1965 	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1966 	int ret;
1967 	struct eth_rx_poll_entry *rx_poll;
1968 	struct eth_rx_queue_info *rx_queue;
1969 	uint32_t *rx_wrr;
1970 	uint16_t nb_rx_queues;
1971 	uint32_t nb_rx_poll, nb_wrr;
1972 	uint32_t nb_rx_intr;
1973 	int num_intr_vec;
1974 	uint16_t wt;
1975 
1976 	if (queue_conf->servicing_weight == 0) {
1977 		struct rte_eth_dev_data *data = dev_info->dev->data;
1978 
1979 		temp_conf = *queue_conf;
1980 		if (!data->dev_conf.intr_conf.rxq) {
1981 			/* If Rx interrupts are disabled set wt = 1 */
1982 			/* If Rx interrupts are disabled, set wt = 1 */
1983 		}
1984 		queue_conf = &temp_conf;
1985 	}
1986 
1987 	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1988 	rx_queue = dev_info->rx_queue;
1989 	wt = queue_conf->servicing_weight;
1990 
1991 	if (dev_info->rx_queue == NULL) {
1992 		dev_info->rx_queue =
1993 		    rte_zmalloc_socket(rx_adapter->mem_name,
1994 				       nb_rx_queues *
1995 				       sizeof(struct eth_rx_queue_info), 0,
1996 				       rx_adapter->socket_id);
1997 		if (dev_info->rx_queue == NULL)
1998 			return -ENOMEM;
1999 	}
2000 	rx_wrr = NULL;
2001 	rx_poll = NULL;
2002 
2003 	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2004 			queue_conf->servicing_weight,
2005 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2006 
2007 	if (dev_info->dev->intr_handle)
2008 		dev_info->multi_intr_cap =
2009 			rte_intr_cap_multiple(dev_info->dev->intr_handle);
2010 
2011 	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2012 				&rx_poll, &rx_wrr);
2013 	if (ret)
2014 		goto err_free_rxqueue;
2015 
2016 	if (wt == 0) {
2017 		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2018 
2019 		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2020 		if (ret)
2021 			goto err_free_rxqueue;
2022 
2023 		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2024 		if (ret)
2025 			goto err_free_rxqueue;
2026 	} else {
2028 		num_intr_vec = 0;
2029 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2030 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2031 						rx_queue_id, 0);
2032 			/* interrupt based queues are being converted to
2033 			 * poll mode queues, delete the interrupt configuration
2034 			 * for those.
2035 			 */
2036 			ret = rxa_del_intr_queue(rx_adapter,
2037 						dev_info, rx_queue_id);
2038 			if (ret)
2039 				goto err_free_rxqueue;
2040 		}
2041 	}
2042 
2043 	if (nb_rx_intr == 0) {
2044 		ret = rxa_free_intr_resources(rx_adapter);
2045 		if (ret)
2046 			goto err_free_rxqueue;
2047 	}
2048 
2049 	if (wt == 0) {
2050 		uint16_t i;
2051 
2052 		if (rx_queue_id == -1) {
2053 			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2054 				dev_info->intr_queue[i] = i;
2055 		} else {
2056 			if (!rxa_intr_queue(dev_info, rx_queue_id))
2057 				dev_info->intr_queue[nb_rx_intr - 1] =
2058 					rx_queue_id;
2059 		}
2060 	}
2061 
2064 	rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2065 	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2066 
2067 	rte_free(rx_adapter->eth_rx_poll);
2068 	rte_free(rx_adapter->wrr_sched);
2069 
2070 	rx_adapter->eth_rx_poll = rx_poll;
2071 	rx_adapter->wrr_sched = rx_wrr;
2072 	rx_adapter->wrr_len = nb_wrr;
2073 	rx_adapter->num_intr_vec += num_intr_vec;
2074 	return 0;
2075 
2076 err_free_rxqueue:
2077 	if (rx_queue == NULL) {
2078 		rte_free(dev_info->rx_queue);
2079 		dev_info->rx_queue = NULL;
2080 	}
2081 
2082 	rte_free(rx_poll);
2083 	rte_free(rx_wrr);
2084 
2085 	return ret;
2086 }
2087 
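/*
 * Common start/stop handler: invoke the PMD start/stop op for ports with
 * an internal event port and, if any port is handled by the service
 * function, update the adapter run state and service run state under
 * rx_lock.
 */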
2088 static int
2089 rxa_ctrl(uint8_t id, int start)
2090 {
2091 	struct rte_event_eth_rx_adapter *rx_adapter;
2092 	struct rte_eventdev *dev;
2093 	struct eth_device_info *dev_info;
2094 	uint32_t i;
2095 	int use_service = 0;
2096 	int stop = !start;
2097 
2098 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2099 	rx_adapter = rxa_id_to_adapter(id);
2100 	if (rx_adapter == NULL)
2101 		return -EINVAL;
2102 
2103 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2104 
2105 	RTE_ETH_FOREACH_DEV(i) {
2106 		dev_info = &rx_adapter->eth_devices[i];
2107 		/* if start, check for num dev queues */
2108 		if (start && !dev_info->nb_dev_queues)
2109 			continue;
2110 		/* if stop, check if dev has been started */
2111 		if (stop && !dev_info->dev_rx_started)
2112 			continue;
2113 		use_service |= !dev_info->internal_event_port;
2114 		dev_info->dev_rx_started = start;
2115 		if (dev_info->internal_event_port == 0)
2116 			continue;
2117 		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2118 						&rte_eth_devices[i]) :
2119 			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
2120 						&rte_eth_devices[i]);
2121 	}
2122 
2123 	if (use_service) {
2124 		rte_spinlock_lock(&rx_adapter->rx_lock);
2125 		rx_adapter->rxa_started = start;
2126 		rte_service_runstate_set(rx_adapter->service_id, start);
2127 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2128 	}
2129 
2130 	return 0;
2131 }
2132 
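/*
 * Create an adapter instance; conf_cb is used later (on the first
 * software-serviced queue add) to create and configure the event port
 * used by the service function. Adapter and per-device state are
 * allocated on the event device's socket and the default RSS key is
 * converted to the byte-swapped layout used by the rte_thash helpers.
 */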
2133 int
2134 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2135 				rte_event_eth_rx_adapter_conf_cb conf_cb,
2136 				void *conf_arg)
2137 {
2138 	struct rte_event_eth_rx_adapter *rx_adapter;
2139 	int ret;
2140 	int socket_id;
2141 	uint16_t i;
2142 	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
2143 	const uint8_t default_rss_key[] = {
2144 		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2145 		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2146 		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2147 		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2148 		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2149 	};
2150 
2151 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2152 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2153 	if (conf_cb == NULL)
2154 		return -EINVAL;
2155 
2156 	if (event_eth_rx_adapter == NULL) {
2157 		ret = rte_event_eth_rx_adapter_init();
2158 		if (ret)
2159 			return ret;
2160 	}
2161 
2162 	rx_adapter = rxa_id_to_adapter(id);
2163 	if (rx_adapter != NULL) {
2164 		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2165 		return -EEXIST;
2166 	}
2167 
2168 	socket_id = rte_event_dev_socket_id(dev_id);
2169 	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2170 		"rte_event_eth_rx_adapter_%d",
2171 		id);
2172 
2173 	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2174 			RTE_CACHE_LINE_SIZE, socket_id);
2175 	if (rx_adapter == NULL) {
2176 		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2177 		return -ENOMEM;
2178 	}
2179 
2180 	rx_adapter->eventdev_id = dev_id;
2181 	rx_adapter->socket_id = socket_id;
2182 	rx_adapter->conf_cb = conf_cb;
2183 	rx_adapter->conf_arg = conf_arg;
2184 	rx_adapter->id = id;
2185 	TAILQ_INIT(&rx_adapter->vector_list);
2186 	strcpy(rx_adapter->mem_name, mem_name);
2187 	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2188 					RTE_MAX_ETHPORTS *
2189 					sizeof(struct eth_device_info), 0,
2190 					socket_id);
2191 	rte_convert_rss_key((const uint32_t *)default_rss_key,
2192 			(uint32_t *)rx_adapter->rss_key_be,
2193 			RTE_DIM(default_rss_key));
2194 
2195 	if (rx_adapter->eth_devices == NULL) {
2196 		RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
2197 		rte_free(rx_adapter);
2198 		return -ENOMEM;
2199 	}
2200 	rte_spinlock_init(&rx_adapter->rx_lock);
2201 	for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2202 		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2203 
2204 	event_eth_rx_adapter[id] = rx_adapter;
2205 	if (conf_cb == rxa_default_conf_cb)
2206 		rx_adapter->default_cb_arg = 1;
2207 	rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2208 		conf_arg);
2209 	return 0;
2210 }
2211 
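/*
 * Convenience wrapper around rte_event_eth_rx_adapter_create_ext() that
 * installs the default conf_cb with a private copy of the caller's event
 * port configuration.
 *
 * Illustrative usage (application-side sketch only; adapter_id, evdev_id
 * and the numeric values are placeholders, not recommendations):
 *
 *	struct rte_event_port_conf pc = {
 *		.new_event_threshold = 4096,
 *		.dequeue_depth = 16,
 *		.enqueue_depth = 16,
 *	};
 *	int rc = rte_event_eth_rx_adapter_create(adapter_id, evdev_id, &pc);
 */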
2212 int
2213 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2214 		struct rte_event_port_conf *port_config)
2215 {
2216 	struct rte_event_port_conf *pc;
2217 	int ret;
2218 
2219 	if (port_config == NULL)
2220 		return -EINVAL;
2221 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2222 
2223 	pc = rte_malloc(NULL, sizeof(*pc), 0);
2224 	if (pc == NULL)
2225 		return -ENOMEM;
2226 	*pc = *port_config;
2227 	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2228 					rxa_default_conf_cb,
2229 					pc);
2230 	if (ret)
2231 		rte_free(pc);
2232 	return ret;
2233 }
2234 
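/*
 * Free an adapter; fails with -EBUSY while Rx queues are still attached.
 * The port configuration copied by rte_event_eth_rx_adapter_create() is
 * released here when the default conf_cb is in use.
 */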
2235 int
2236 rte_event_eth_rx_adapter_free(uint8_t id)
2237 {
2238 	struct rte_event_eth_rx_adapter *rx_adapter;
2239 
2240 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2241 
2242 	rx_adapter = rxa_id_to_adapter(id);
2243 	if (rx_adapter == NULL)
2244 		return -EINVAL;
2245 
2246 	if (rx_adapter->nb_queues) {
2247 		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2248 				rx_adapter->nb_queues);
2249 		return -EBUSY;
2250 	}
2251 
2252 	if (rx_adapter->default_cb_arg)
2253 		rte_free(rx_adapter->conf_arg);
2254 	rte_free(rx_adapter->eth_devices);
2255 	rte_free(rx_adapter);
2256 	event_eth_rx_adapter[id] = NULL;
2257 
2258 	rte_eventdev_trace_eth_rx_adapter_free(id);
2259 	return 0;
2260 }
2261 
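/*
 * Add an Rx queue (or all queues of the port with rx_queue_id == -1) to
 * the adapter. The eventdev capability flags decide whether the request
 * is handed to the PMD (internal event port) or to the software path,
 * which also updates the service component run state from the number of
 * software-serviced queues.
 *
 * Illustrative usage (application-side sketch only; id, eth_port_id and
 * ev_qid are placeholders):
 *
 *	struct rte_event_eth_rx_adapter_queue_conf qconf;
 *
 *	memset(&qconf, 0, sizeof(qconf));
 *	qconf.ev.queue_id = ev_qid;
 *	qconf.ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
 *	qconf.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
 *	qconf.servicing_weight = 1;
 *	ret = rte_event_eth_rx_adapter_queue_add(id, eth_port_id, -1, &qconf);
 */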
2262 int
2263 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2264 		uint16_t eth_dev_id,
2265 		int32_t rx_queue_id,
2266 		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2267 {
2268 	int ret;
2269 	uint32_t cap;
2270 	struct rte_event_eth_rx_adapter *rx_adapter;
2271 	struct rte_eventdev *dev;
2272 	struct eth_device_info *dev_info;
2273 
2274 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2275 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2276 
2277 	rx_adapter = rxa_id_to_adapter(id);
2278 	if ((rx_adapter == NULL) || (queue_conf == NULL))
2279 		return -EINVAL;
2280 
2281 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2282 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2283 						eth_dev_id,
2284 						&cap);
2285 	if (ret) {
2286 		RTE_EDEV_LOG_ERR("Failed to get adapter caps, adapter id: %" PRIu8
2287 			" eth port: %" PRIu16, id, eth_dev_id);
2288 		return ret;
2289 	}
2290 
2291 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2292 		&& (queue_conf->rx_queue_flags &
2293 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2294 		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2295 				" eth port: %" PRIu16 " adapter id: %" PRIu8,
2296 				eth_dev_id, id);
2297 		return -EINVAL;
2298 	}
2299 
2300 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
2301 	    (queue_conf->rx_queue_flags &
2302 	     RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
2303 		RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2304 				 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2305 				 eth_dev_id, id);
2306 		return -EINVAL;
2307 	}
2308 
2309 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2310 		(rx_queue_id != -1)) {
2311 		RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
2312 			"event queue, eth port: %" PRIu16 " adapter id: %"
2313 			PRIu8, eth_dev_id, id);
2314 		return -EINVAL;
2315 	}
2316 
2317 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2318 			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2319 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2320 			 (uint16_t)rx_queue_id);
2321 		return -EINVAL;
2322 	}
2323 
2324 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2325 
2326 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2327 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2328 					-ENOTSUP);
2329 		if (dev_info->rx_queue == NULL) {
2330 			dev_info->rx_queue =
2331 			    rte_zmalloc_socket(rx_adapter->mem_name,
2332 					dev_info->dev->data->nb_rx_queues *
2333 					sizeof(struct eth_rx_queue_info), 0,
2334 					rx_adapter->socket_id);
2335 			if (dev_info->rx_queue == NULL)
2336 				return -ENOMEM;
2337 		}
2338 
2339 		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2340 				&rte_eth_devices[eth_dev_id],
2341 				rx_queue_id, queue_conf);
2342 		if (ret == 0) {
2343 			dev_info->internal_event_port = 1;
2344 			rxa_update_queue(rx_adapter,
2345 					&rx_adapter->eth_devices[eth_dev_id],
2346 					rx_queue_id,
2347 					1);
2348 		}
2349 	} else {
2350 		rte_spinlock_lock(&rx_adapter->rx_lock);
2351 		dev_info->internal_event_port = 0;
2352 		ret = rxa_init_service(rx_adapter, id);
2353 		if (ret == 0) {
2354 			uint32_t service_id = rx_adapter->service_id;
2355 			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2356 					queue_conf);
2357 			rte_service_component_runstate_set(service_id,
2358 				rxa_sw_adapter_queue_count(rx_adapter));
2359 		}
2360 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2361 	}
2362 
2363 	rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2364 		rx_queue_id, queue_conf, ret);
2365 	return ret;
2369 }
2370 
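/* Vector limits advertised for the software (service function) data path */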
2371 static int
2372 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2373 {
2374 	limits->max_sz = MAX_VECTOR_SIZE;
2375 	limits->min_sz = MIN_VECTOR_SIZE;
2376 	limits->max_timeout_ns = MAX_VECTOR_NS;
2377 	limits->min_timeout_ns = MIN_VECTOR_NS;
2378 
2379 	return 0;
2380 }
2381 
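/*
 * Delete an Rx queue (or all queues of the port with rx_queue_id == -1)
 * from the adapter. For the software path the poll/interrupt arrays and
 * the WRR schedule are recomputed and swapped in under rx_lock, and the
 * service component run state is updated from the remaining queue count.
 */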
2382 int
2383 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2384 				int32_t rx_queue_id)
2385 {
2386 	int ret = 0;
2387 	struct rte_eventdev *dev;
2388 	struct rte_event_eth_rx_adapter *rx_adapter;
2389 	struct eth_device_info *dev_info;
2390 	uint32_t cap;
2391 	uint32_t nb_rx_poll = 0;
2392 	uint32_t nb_wrr = 0;
2393 	uint32_t nb_rx_intr;
2394 	struct eth_rx_poll_entry *rx_poll = NULL;
2395 	uint32_t *rx_wrr = NULL;
2396 	int num_intr_vec;
2397 
2398 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2399 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2400 
2401 	rx_adapter = rxa_id_to_adapter(id);
2402 	if (rx_adapter == NULL)
2403 		return -EINVAL;
2404 
2405 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2406 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2407 						eth_dev_id,
2408 						&cap);
2409 	if (ret)
2410 		return ret;
2411 
2412 	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2413 		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2414 		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2415 			 (uint16_t)rx_queue_id);
2416 		return -EINVAL;
2417 	}
2418 
2419 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2420 
2421 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2422 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2423 				 -ENOTSUP);
2424 		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2425 						&rte_eth_devices[eth_dev_id],
2426 						rx_queue_id);
2427 		if (ret == 0) {
2428 			rxa_update_queue(rx_adapter,
2429 					&rx_adapter->eth_devices[eth_dev_id],
2430 					rx_queue_id,
2431 					0);
2432 			if (dev_info->nb_dev_queues == 0) {
2433 				rte_free(dev_info->rx_queue);
2434 				dev_info->rx_queue = NULL;
2435 			}
2436 		}
2437 	} else {
2438 		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2439 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
2440 
2441 		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2442 			&rx_poll, &rx_wrr);
2443 		if (ret)
2444 			return ret;
2445 
2446 		rte_spinlock_lock(&rx_adapter->rx_lock);
2447 
2448 		num_intr_vec = 0;
2449 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
2451 			num_intr_vec = rxa_nb_intr_vect(dev_info,
2452 						rx_queue_id, 0);
2453 			ret = rxa_del_intr_queue(rx_adapter, dev_info,
2454 					rx_queue_id);
2455 			if (ret)
2456 				goto unlock_ret;
2457 		}
2458 
2459 		if (nb_rx_intr == 0) {
2460 			ret = rxa_free_intr_resources(rx_adapter);
2461 			if (ret)
2462 				goto unlock_ret;
2463 		}
2464 
2465 		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2466 		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2467 
2468 		rte_free(rx_adapter->eth_rx_poll);
2469 		rte_free(rx_adapter->wrr_sched);
2470 
2471 		if (nb_rx_intr == 0) {
2472 			rte_free(dev_info->intr_queue);
2473 			dev_info->intr_queue = NULL;
2474 		}
2475 
2476 		rx_adapter->eth_rx_poll = rx_poll;
2477 		rx_adapter->wrr_sched = rx_wrr;
2478 		rx_adapter->wrr_len = nb_wrr;
2479 		rx_adapter->num_intr_vec += num_intr_vec;
2480 
2481 		if (dev_info->nb_dev_queues == 0) {
2482 			rte_free(dev_info->rx_queue);
2483 			dev_info->rx_queue = NULL;
2484 		}
2485 unlock_ret:
2486 		rte_spinlock_unlock(&rx_adapter->rx_lock);
2487 		if (ret) {
2488 			rte_free(rx_poll);
2489 			rte_free(rx_wrr);
2490 			return ret;
2491 		}
2492 
2493 		rte_service_component_runstate_set(rx_adapter->service_id,
2494 				rxa_sw_adapter_queue_count(rx_adapter));
2495 	}
2496 
2497 	rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2498 		rx_queue_id, ret);
2499 	return ret;
2500 }
2501 
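/*
 * Validate a per-queue event vector configuration against the advertised
 * limits and the mempool element size, then apply it through the PMD op
 * (internal event port) or the software adapter.
 */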
2502 int
2503 rte_event_eth_rx_adapter_queue_event_vector_config(
2504 	uint8_t id, uint16_t eth_dev_id, int32_t rx_queue_id,
2505 	struct rte_event_eth_rx_adapter_event_vector_config *config)
2506 {
2507 	struct rte_event_eth_rx_adapter_vector_limits limits;
2508 	struct rte_event_eth_rx_adapter *rx_adapter;
2509 	struct rte_eventdev *dev;
2510 	uint32_t cap;
2511 	int ret;
2512 
2513 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2514 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2515 
2516 	rx_adapter = rxa_id_to_adapter(id);
2517 	if ((rx_adapter == NULL) || (config == NULL))
2518 		return -EINVAL;
2519 
2520 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2521 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2522 						eth_dev_id, &cap);
2523 	if (ret) {
2524 		RTE_EDEV_LOG_ERR("Failed to get adapter caps, adapter id: %" PRIu8
2525 				 " eth port: %" PRIu16,
2526 				 id, eth_dev_id);
2527 		return ret;
2528 	}
2529 
2530 	if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR)) {
2531 		RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2532 				 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2533 				 eth_dev_id, id);
2534 		return -EINVAL;
2535 	}
2536 
2537 	ret = rte_event_eth_rx_adapter_vector_limits_get(
2538 		rx_adapter->eventdev_id, eth_dev_id, &limits);
2539 	if (ret) {
2540 		RTE_EDEV_LOG_ERR("Failed to get vector limits, edev: %" PRIu8
2541 				 " eth port: %" PRIu16,
2542 				 rx_adapter->eventdev_id, eth_dev_id);
2543 		return ret;
2544 	}
2545 
2546 	if (config->vector_sz < limits.min_sz ||
2547 	    config->vector_sz > limits.max_sz ||
2548 	    config->vector_timeout_ns < limits.min_timeout_ns ||
2549 	    config->vector_timeout_ns > limits.max_timeout_ns ||
2550 	    config->vector_mp == NULL) {
2551 		RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2552 				 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2553 				 eth_dev_id, id);
2554 		return -EINVAL;
2555 	}
2556 	if (config->vector_mp->elt_size <
2557 	    (sizeof(struct rte_event_vector) +
2558 	     (sizeof(uintptr_t) * config->vector_sz))) {
2559 		RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2560 				 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2561 				 eth_dev_id, id);
2562 		return -EINVAL;
2563 	}
2564 
2565 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2566 		RTE_FUNC_PTR_OR_ERR_RET(
2567 			*dev->dev_ops->eth_rx_adapter_event_vector_config,
2568 			-ENOTSUP);
2569 		ret = dev->dev_ops->eth_rx_adapter_event_vector_config(
2570 			dev, &rte_eth_devices[eth_dev_id], rx_queue_id, config);
2571 	} else {
2572 		rxa_sw_event_vector_configure(rx_adapter, eth_dev_id,
2573 					      rx_queue_id, config);
2574 	}
2575 
2576 	return ret;
2577 }
2578 
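/*
 * Report event vector limits, either from the eventdev PMD (internal
 * event port capability) or the software adapter defaults.
 */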
2579 int
2580 rte_event_eth_rx_adapter_vector_limits_get(
2581 	uint8_t dev_id, uint16_t eth_port_id,
2582 	struct rte_event_eth_rx_adapter_vector_limits *limits)
2583 {
2584 	struct rte_eventdev *dev;
2585 	uint32_t cap;
2586 	int ret;
2587 
2588 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2589 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2590 
2591 	if (limits == NULL)
2592 		return -EINVAL;
2593 
2594 	dev = &rte_eventdevs[dev_id];
2595 
2596 	ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2597 	if (ret) {
2598 		RTE_EDEV_LOG_ERR("Failed to get adapter caps, edev: %" PRIu8
2599 				 " eth port: %" PRIu16,
2600 				 dev_id, eth_port_id);
2601 		return ret;
2602 	}
2603 
2604 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2605 		RTE_FUNC_PTR_OR_ERR_RET(
2606 			*dev->dev_ops->eth_rx_adapter_vector_limits_get,
2607 			-ENOTSUP);
2608 		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2609 			dev, &rte_eth_devices[eth_port_id], limits);
2610 	} else {
2611 		ret = rxa_sw_vector_limits(limits);
2612 	}
2613 
2614 	return ret;
2615 }
2616 
2617 int
2618 rte_event_eth_rx_adapter_start(uint8_t id)
2619 {
2620 	rte_eventdev_trace_eth_rx_adapter_start(id);
2621 	return rxa_ctrl(id, 1);
2622 }
2623 
2624 int
2625 rte_event_eth_rx_adapter_stop(uint8_t id)
2626 {
2627 	rte_eventdev_trace_eth_rx_adapter_stop(id);
2628 	return rxa_ctrl(id, 0);
2629 }
2630 
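/*
 * Aggregate adapter statistics: sum the counters reported by PMDs with
 * internal event ports and add the software adapter's own counters when
 * the service function has been initialized.
 */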
2631 int
2632 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2633 			       struct rte_event_eth_rx_adapter_stats *stats)
2634 {
2635 	struct rte_event_eth_rx_adapter *rx_adapter;
2636 	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2637 	struct rte_event_eth_rx_adapter_stats dev_stats;
2638 	struct rte_eventdev *dev;
2639 	struct eth_device_info *dev_info;
2640 	uint32_t i;
2641 	int ret;
2642 
2643 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2644 
2645 	rx_adapter = rxa_id_to_adapter(id);
2646 	if (rx_adapter == NULL || stats == NULL)
2647 		return -EINVAL;
2648 
2649 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2650 	memset(stats, 0, sizeof(*stats));
2651 	RTE_ETH_FOREACH_DEV(i) {
2652 		dev_info = &rx_adapter->eth_devices[i];
2653 		if (dev_info->internal_event_port == 0 ||
2654 			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2655 			continue;
2656 		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2657 						&rte_eth_devices[i],
2658 						&dev_stats);
2659 		if (ret)
2660 			continue;
2661 		dev_stats_sum.rx_packets += dev_stats.rx_packets;
2662 		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2663 	}
2664 
2665 	if (rx_adapter->service_inited)
2666 		*stats = rx_adapter->stats;
2667 
2668 	stats->rx_packets += dev_stats_sum.rx_packets;
2669 	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2670 	return 0;
2671 }
2672 
2673 int
2674 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2675 {
2676 	struct rte_event_eth_rx_adapter *rx_adapter;
2677 	struct rte_eventdev *dev;
2678 	struct eth_device_info *dev_info;
2679 	uint32_t i;
2680 
2681 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2682 
2683 	rx_adapter = rxa_id_to_adapter(id);
2684 	if (rx_adapter == NULL)
2685 		return -EINVAL;
2686 
2687 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
2688 	RTE_ETH_FOREACH_DEV(i) {
2689 		dev_info = &rx_adapter->eth_devices[i];
2690 		if (dev_info->internal_event_port == 0 ||
2691 			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2692 			continue;
2693 		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2694 							&rte_eth_devices[i]);
2695 	}
2696 
2697 	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2698 	return 0;
2699 }
2700 
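/*
 * Return the service id of the adapter's service function; -ESRCH if the
 * service function has not been initialized (no software-serviced queue
 * added yet).
 */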
2701 int
2702 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2703 {
2704 	struct rte_event_eth_rx_adapter *rx_adapter;
2705 
2706 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2707 
2708 	rx_adapter = rxa_id_to_adapter(id);
2709 	if (rx_adapter == NULL || service_id == NULL)
2710 		return -EINVAL;
2711 
2712 	if (rx_adapter->service_inited)
2713 		*service_id = rx_adapter->service_id;
2714 
2715 	return rx_adapter->service_inited ? 0 : -ESRCH;
2716 }
2717 
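/*
 * Register a per-port callback that the service function invokes on
 * received packet batches before the resulting events are enqueued; not
 * available for ports handled through an internal event port.
 */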
2718 int
2719 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2720 					uint16_t eth_dev_id,
2721 					rte_event_eth_rx_adapter_cb_fn cb_fn,
2722 					void *cb_arg)
2723 {
2724 	struct rte_event_eth_rx_adapter *rx_adapter;
2725 	struct eth_device_info *dev_info;
2726 	uint32_t cap;
2727 	int ret;
2728 
2729 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2730 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2731 
2732 	rx_adapter = rxa_id_to_adapter(id);
2733 	if (rx_adapter == NULL)
2734 		return -EINVAL;
2735 
2736 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
2737 	if (dev_info->rx_queue == NULL)
2738 		return -EINVAL;
2739 
2740 	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2741 						eth_dev_id,
2742 						&cap);
2743 	if (ret) {
2744 		RTE_EDEV_LOG_ERR("Failed to get adapter caps, adapter id: %" PRIu8
2745 			" eth port: %" PRIu16, id, eth_dev_id);
2746 		return ret;
2747 	}
2748 
2749 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2750 		RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2751 				PRIu16, eth_dev_id);
2752 		return -EINVAL;
2753 	}
2754 
2755 	rte_spinlock_lock(&rx_adapter->rx_lock);
2756 	dev_info->cb_fn = cb_fn;
2757 	dev_info->cb_arg = cb_arg;
2758 	rte_spinlock_unlock(&rx_adapter->rx_lock);
2759 
2760 	return 0;
2761 }
2762