xref: /dpdk/drivers/net/af_xdp/rte_eth_af_xdp.c (revision 2b843cac232eb3f2fa79e4254e21766817e2019f)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2019-2020 Intel Corporation.
3  */
4 #include <unistd.h>
5 #include <errno.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <netinet/in.h>
9 #include <net/if.h>
10 #include <sys/un.h>
11 #include <sys/socket.h>
12 #include <sys/ioctl.h>
13 #include <linux/if_ether.h>
14 #include <linux/if_xdp.h>
15 #include <linux/if_link.h>
16 #include <linux/ethtool.h>
17 #include <linux/sockios.h>
18 #include "af_xdp_deps.h"
19 
20 #include <rte_ethdev.h>
21 #include <ethdev_driver.h>
22 #include <ethdev_vdev.h>
23 #include <rte_kvargs.h>
24 #include <bus_vdev_driver.h>
25 #include <rte_string_fns.h>
26 #include <rte_branch_prediction.h>
27 #include <rte_common.h>
28 #include <dev_driver.h>
29 #include <rte_eal.h>
30 #include <rte_ether.h>
31 #include <rte_lcore.h>
32 #include <rte_log.h>
33 #include <rte_memory.h>
34 #include <rte_memzone.h>
35 #include <rte_mempool.h>
36 #include <rte_mbuf.h>
37 #include <rte_malloc.h>
38 #include <rte_ring.h>
39 #include <rte_spinlock.h>
40 #include <rte_power_intrinsics.h>
41 
42 #include "compat.h"
43 #include "eal_filesystem.h"
44 
45 #ifndef SO_PREFER_BUSY_POLL
46 #define SO_PREFER_BUSY_POLL 69
47 #endif
48 #ifndef SO_BUSY_POLL_BUDGET
49 #define SO_BUSY_POLL_BUDGET 70
50 #endif
51 
52 
53 #ifndef SOL_XDP
54 #define SOL_XDP 283
55 #endif
56 
57 #ifndef AF_XDP
58 #define AF_XDP 44
59 #endif
60 
61 #ifndef PF_XDP
62 #define PF_XDP AF_XDP
63 #endif
64 
65 RTE_LOG_REGISTER_DEFAULT(af_xdp_logtype, NOTICE);
66 #define RTE_LOGTYPE_NET_AF_XDP af_xdp_logtype
67 
68 #define AF_XDP_LOG_LINE(level, ...) \
69 	RTE_LOG_LINE_PREFIX(level, NET_AF_XDP, "%s(): ", __func__, __VA_ARGS__)
70 
71 #define ETH_AF_XDP_FRAME_SIZE		2048
72 #define ETH_AF_XDP_NUM_BUFFERS		4096
73 #define ETH_AF_XDP_DFLT_NUM_DESCS	XSK_RING_CONS__DEFAULT_NUM_DESCS
74 #define ETH_AF_XDP_DFLT_START_QUEUE_IDX	0
75 #define ETH_AF_XDP_DFLT_QUEUE_COUNT	1
76 #define ETH_AF_XDP_DFLT_BUSY_BUDGET	64
77 #define ETH_AF_XDP_DFLT_BUSY_TIMEOUT	20
78 
79 #define ETH_AF_XDP_RX_BATCH_SIZE	XSK_RING_CONS__DEFAULT_NUM_DESCS
80 #define ETH_AF_XDP_TX_BATCH_SIZE	XSK_RING_CONS__DEFAULT_NUM_DESCS
81 
82 #define ETH_AF_XDP_ETH_OVERHEAD		(RTE_ETHER_HDR_LEN + RTE_ETHER_CRC_LEN)
83 
84 #define ETH_AF_XDP_MP_KEY "afxdp_mp_send_fds"
85 
86 #define DP_BASE_PATH			"/tmp/afxdp_dp"
87 #define DP_UDS_SOCK			"afxdp.sock"
88 #define DP_XSK_MAP				"xsks_map"
89 #define MAX_LONG_OPT_SZ			64
90 #define UDS_MAX_FD_NUM			2
91 #define UDS_MAX_CMD_LEN			64
92 #define UDS_MAX_CMD_RESP		128
93 #define UDS_XSK_MAP_FD_MSG		"/xsk_map_fd"
94 #define UDS_CONNECT_MSG			"/connect"
95 #define UDS_HOST_OK_MSG			"/host_ok"
96 #define UDS_HOST_NAK_MSG		"/host_nak"
97 #define UDS_VERSION_MSG			"/version"
99 #define UDS_XSK_SOCKET_MSG		"/xsk_socket"
100 #define UDS_FD_ACK_MSG			"/fd_ack"
101 #define UDS_FD_NAK_MSG			"/fd_nak"
102 #define UDS_FIN_MSG			"/fin"
103 #define UDS_FIN_ACK_MSG			"/fin_ack"
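/*
 * Illustrative Device Plugin handshake using the messages above, as
 * exercised by uds_get_xskmap_fd() below (sequence only; the exact wire
 * format is defined by the AF_XDP Device Plugin):
 *   PMD -> DP: /connect,<hostname>     DP -> PMD: /host_ok | /host_nak
 *   PMD -> DP: /version                DP -> PMD: <version string>
 *   PMD -> DP: /xsk_map_fd,<ifname>    DP -> PMD: /fd_ack (+ map fd) | /fd_nak
 *   PMD -> DP: /fin                    DP -> PMD: /fin_ack
 */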
104 
105 static int afxdp_dev_count;
106 
107 /* Message header to synchronize fds via IPC */
108 struct ipc_hdr {
109 	char port_name[RTE_DEV_NAME_MAX_LEN];
110 	/* The file descriptors are carried in the ancillary data
111 	 * of the Unix message, where they are translated by the kernel.
112 	 */
113 };
114 
115 struct xsk_umem_info {
116 	struct xsk_umem *umem;
117 	struct rte_ring *buf_ring;
118 	const struct rte_memzone *mz;
119 	struct rte_mempool *mb_pool;
120 	void *buffer;
121 	RTE_ATOMIC(uint8_t) refcnt;
122 	uint32_t max_xsks;
123 };
124 
125 struct rx_stats {
126 	uint64_t rx_pkts;
127 	uint64_t rx_bytes;
128 	uint64_t imissed_offset;
129 };
130 
131 struct pkt_rx_queue {
132 	struct xsk_ring_cons rx;
133 	struct xsk_umem_info *umem;
134 	struct xsk_socket *xsk;
135 	struct rte_mempool *mb_pool;
136 	uint16_t port;
137 
138 	struct rx_stats stats;
139 
140 	struct xsk_ring_prod fq;
141 	struct xsk_ring_cons cq;
142 
143 	struct pkt_tx_queue *pair;
144 	struct pollfd fds[1];
145 	int xsk_queue_idx;
146 	int busy_budget;
147 };
148 
149 struct tx_stats {
150 	uint64_t tx_pkts;
151 	uint64_t tx_bytes;
152 	uint64_t tx_dropped;
153 };
154 
155 struct pkt_tx_queue {
156 	struct xsk_ring_prod tx;
157 	struct xsk_umem_info *umem;
158 
159 	struct tx_stats stats;
160 
161 	struct pkt_rx_queue *pair;
162 	int xsk_queue_idx;
163 };
164 
165 struct pmd_internals {
166 	int if_index;
167 	char if_name[IFNAMSIZ];
168 	int start_queue_idx;
169 	int queue_cnt;
170 	int max_queue_cnt;
171 	int combined_queue_cnt;
172 	bool shared_umem;
173 	char prog_path[PATH_MAX];
174 	bool custom_prog_configured;
175 	bool force_copy;
176 	bool use_cni;
177 	bool use_pinned_map;
178 	char dp_path[PATH_MAX];
179 	struct bpf_map *map;
180 
181 	struct rte_ether_addr eth_addr;
182 
183 	struct pkt_rx_queue *rx_queues;
184 	struct pkt_tx_queue *tx_queues;
185 };
186 
187 struct pmd_process_private {
188 	int rxq_xsk_fds[RTE_MAX_QUEUES_PER_PORT];
189 };
190 
191 #define ETH_AF_XDP_IFACE_ARG			"iface"
192 #define ETH_AF_XDP_START_QUEUE_ARG		"start_queue"
193 #define ETH_AF_XDP_QUEUE_COUNT_ARG		"queue_count"
194 #define ETH_AF_XDP_SHARED_UMEM_ARG		"shared_umem"
195 #define ETH_AF_XDP_PROG_ARG			"xdp_prog"
196 #define ETH_AF_XDP_BUDGET_ARG			"busy_budget"
197 #define ETH_AF_XDP_FORCE_COPY_ARG		"force_copy"
198 #define ETH_AF_XDP_USE_CNI_ARG			"use_cni"
199 #define ETH_AF_XDP_USE_PINNED_MAP_ARG	"use_pinned_map"
200 #define ETH_AF_XDP_DP_PATH_ARG			"dp_path"
201 
202 static const char * const valid_arguments[] = {
203 	ETH_AF_XDP_IFACE_ARG,
204 	ETH_AF_XDP_START_QUEUE_ARG,
205 	ETH_AF_XDP_QUEUE_COUNT_ARG,
206 	ETH_AF_XDP_SHARED_UMEM_ARG,
207 	ETH_AF_XDP_PROG_ARG,
208 	ETH_AF_XDP_BUDGET_ARG,
209 	ETH_AF_XDP_FORCE_COPY_ARG,
210 	ETH_AF_XDP_USE_CNI_ARG,
211 	ETH_AF_XDP_USE_PINNED_MAP_ARG,
212 	ETH_AF_XDP_DP_PATH_ARG,
213 	NULL
214 };
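/*
 * Illustrative vdev usage combining the devargs above (interface name and
 * values are placeholders):
 *   --vdev net_af_xdp,iface=eth0,start_queue=0,queue_count=1,busy_budget=64
 */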
215 
216 static const struct rte_eth_link pmd_link = {
217 	.link_speed = RTE_ETH_SPEED_NUM_10G,
218 	.link_duplex = RTE_ETH_LINK_FULL_DUPLEX,
219 	.link_status = RTE_ETH_LINK_DOWN,
220 	.link_autoneg = RTE_ETH_LINK_AUTONEG
221 };
222 
223 /* List which tracks PMDs to facilitate sharing UMEMs across them. */
224 struct internal_list {
225 	TAILQ_ENTRY(internal_list) next;
226 	struct rte_eth_dev *eth_dev;
227 };
228 
229 TAILQ_HEAD(internal_list_head, internal_list);
230 static struct internal_list_head internal_list =
231 	TAILQ_HEAD_INITIALIZER(internal_list);
232 
233 static pthread_mutex_t internal_list_lock = PTHREAD_MUTEX_INITIALIZER;
234 
235 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
236 static inline int
237 reserve_fill_queue_zc(struct xsk_umem_info *umem, uint16_t reserve_size,
238 		      struct rte_mbuf **bufs, struct xsk_ring_prod *fq)
239 {
240 	uint32_t idx;
241 	uint16_t i;
242 
243 	if (unlikely(!xsk_ring_prod__reserve(fq, reserve_size, &idx))) {
244 		for (i = 0; i < reserve_size; i++)
245 			rte_pktmbuf_free(bufs[i]);
246 		AF_XDP_LOG_LINE(DEBUG, "Failed to reserve enough fq descs.");
247 		return -1;
248 	}
249 
250 	for (i = 0; i < reserve_size; i++) {
251 		__u64 *fq_addr;
252 		uint64_t addr;
253 
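		/*
		 * The fill queue takes addresses relative to the start of the
		 * umem area: the mbuf's virtual address minus the umem base,
		 * minus the mempool object header that precedes each mbuf.
		 */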
254 		fq_addr = xsk_ring_prod__fill_addr(fq, idx++);
255 		addr = (uint64_t)bufs[i] - (uint64_t)umem->buffer -
256 				umem->mb_pool->header_size;
257 		*fq_addr = addr;
258 	}
259 
260 	xsk_ring_prod__submit(fq, reserve_size);
261 
262 	return 0;
263 }
264 #else
265 static inline int
266 reserve_fill_queue_cp(struct xsk_umem_info *umem, uint16_t reserve_size,
267 		      struct rte_mbuf **bufs __rte_unused,
268 		      struct xsk_ring_prod *fq)
269 {
270 	void *addrs[reserve_size];
271 	uint32_t idx;
272 	uint16_t i;
273 
274 	if (rte_ring_dequeue_bulk(umem->buf_ring, addrs, reserve_size, NULL)
275 		    != reserve_size) {
276 		AF_XDP_LOG_LINE(DEBUG, "Failed to get enough buffers for fq.");
277 		return -1;
278 	}
279 
280 	if (unlikely(!xsk_ring_prod__reserve(fq, reserve_size, &idx))) {
281 		AF_XDP_LOG_LINE(DEBUG, "Failed to reserve enough fq descs.");
282 		rte_ring_enqueue_bulk(umem->buf_ring, addrs,
283 				reserve_size, NULL);
284 		return -1;
285 	}
286 
287 	for (i = 0; i < reserve_size; i++) {
288 		__u64 *fq_addr;
289 
290 		fq_addr = xsk_ring_prod__fill_addr(fq, idx++);
291 		*fq_addr = (uint64_t)addrs[i];
292 	}
293 
294 	xsk_ring_prod__submit(fq, reserve_size);
295 
296 	return 0;
297 }
298 #endif
299 
300 static inline int
301 reserve_fill_queue(struct xsk_umem_info *umem, uint16_t reserve_size,
302 		   struct rte_mbuf **bufs, struct xsk_ring_prod *fq)
303 {
304 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
305 	return reserve_fill_queue_zc(umem, reserve_size, bufs, fq);
306 #else
307 	return reserve_fill_queue_cp(umem, reserve_size, bufs, fq);
308 #endif
309 }
310 
311 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
312 static uint16_t
313 af_xdp_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
314 {
315 	struct pkt_rx_queue *rxq = queue;
316 	struct xsk_ring_cons *rx = &rxq->rx;
317 	struct xsk_ring_prod *fq = &rxq->fq;
318 	struct xsk_umem_info *umem = rxq->umem;
319 	uint32_t idx_rx = 0;
320 	unsigned long rx_bytes = 0;
321 	int i;
322 	struct rte_mbuf *fq_bufs[ETH_AF_XDP_RX_BATCH_SIZE];
323 	struct rte_eth_dev *dev = &rte_eth_devices[rxq->port];
324 
325 	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
326 
327 	if (nb_pkts == 0) {
328 		/* we can assume a kernel >= 5.11 is in use if busy polling is
329 		 * enabled and thus we can safely use the recvfrom() syscall
330 		 * which is only supported for AF_XDP sockets in kernels >=
331 		 * 5.11.
332 		 */
333 		if (rxq->busy_budget) {
334 			(void)recvfrom(xsk_socket__fd(rxq->xsk), NULL, 0,
335 				       MSG_DONTWAIT, NULL, NULL);
336 		} else if (xsk_ring_prod__needs_wakeup(fq)) {
337 			(void)poll(&rxq->fds[0], 1, 1000);
338 		}
339 
340 		return 0;
341 	}
342 
343 	/* allocate bufs for fill queue replenishment after rx */
344 	if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
345 		AF_XDP_LOG_LINE(DEBUG,
346 			"Failed to get enough buffers for fq.");
347 		/* roll back cached_cons, which was advanced by
348 		 * xsk_ring_cons__peek()
349 		 */
350 		rx->cached_cons -= nb_pkts;
351 		dev->data->rx_mbuf_alloc_failed += nb_pkts;
352 
353 		return 0;
354 	}
355 
356 	for (i = 0; i < nb_pkts; i++) {
357 		const struct xdp_desc *desc;
358 		uint64_t addr;
359 		uint32_t len;
360 		uint64_t offset;
361 
362 		desc = xsk_ring_cons__rx_desc(rx, idx_rx++);
363 		addr = desc->addr;
364 		len = desc->len;
365 
366 		offset = xsk_umem__extract_offset(addr);
367 		addr = xsk_umem__extract_addr(addr);
368 
369 		bufs[i] = (struct rte_mbuf *)
370 				xsk_umem__get_data(umem->buffer, addr +
371 					umem->mb_pool->header_size);
372 		bufs[i]->data_off = offset - sizeof(struct rte_mbuf) -
373 			rte_pktmbuf_priv_size(umem->mb_pool) -
374 			umem->mb_pool->header_size;
375 		bufs[i]->port = rxq->port;
376 
377 		rte_pktmbuf_pkt_len(bufs[i]) = len;
378 		rte_pktmbuf_data_len(bufs[i]) = len;
379 		rx_bytes += len;
380 	}
381 
382 	xsk_ring_cons__release(rx, nb_pkts);
383 	(void)reserve_fill_queue(umem, nb_pkts, fq_bufs, fq);
384 
385 	/* statistics */
386 	rxq->stats.rx_pkts += nb_pkts;
387 	rxq->stats.rx_bytes += rx_bytes;
388 
389 	return nb_pkts;
390 }
391 #else
392 static uint16_t
393 af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
394 {
395 	struct pkt_rx_queue *rxq = queue;
396 	struct xsk_ring_cons *rx = &rxq->rx;
397 	struct xsk_umem_info *umem = rxq->umem;
398 	struct xsk_ring_prod *fq = &rxq->fq;
399 	uint32_t idx_rx = 0;
400 	unsigned long rx_bytes = 0;
401 	int i;
402 	uint32_t free_thresh = fq->size >> 1;
403 	struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
404 	struct rte_eth_dev *dev = &rte_eth_devices[rxq->port];
405 
406 	if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
407 		(void)reserve_fill_queue(umem, nb_pkts, NULL, fq);
408 
409 	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
410 	if (nb_pkts == 0) {
411 #if defined(XDP_USE_NEED_WAKEUP)
412 		if (xsk_ring_prod__needs_wakeup(fq))
413 			(void)poll(rxq->fds, 1, 1000);
414 #endif
415 		return 0;
416 	}
417 
418 	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts))) {
419 		/* roll back cached_cons, which was advanced by
420 		 * xsk_ring_cons__peek()
421 		 */
422 		rx->cached_cons -= nb_pkts;
423 		dev->data->rx_mbuf_alloc_failed += nb_pkts;
424 		return 0;
425 	}
426 
427 	for (i = 0; i < nb_pkts; i++) {
428 		const struct xdp_desc *desc;
429 		uint64_t addr;
430 		uint32_t len;
431 		void *pkt;
432 
433 		desc = xsk_ring_cons__rx_desc(rx, idx_rx++);
434 		addr = desc->addr;
435 		len = desc->len;
436 		pkt = xsk_umem__get_data(rxq->umem->mz->addr, addr);
437 
438 		rte_memcpy(rte_pktmbuf_mtod(mbufs[i], void *), pkt, len);
439 		rte_ring_enqueue(umem->buf_ring, (void *)addr);
440 		rte_pktmbuf_pkt_len(mbufs[i]) = len;
441 		rte_pktmbuf_data_len(mbufs[i]) = len;
442 		rx_bytes += len;
443 		bufs[i] = mbufs[i];
444 		bufs[i]->port = rxq->port;
445 	}
446 
447 	xsk_ring_cons__release(rx, nb_pkts);
448 
449 	/* statistics */
450 	rxq->stats.rx_pkts += nb_pkts;
451 	rxq->stats.rx_bytes += rx_bytes;
452 
453 	return nb_pkts;
454 }
455 #endif
456 
457 static uint16_t
458 af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
459 {
460 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
461 	return af_xdp_rx_zc(queue, bufs, nb_pkts);
462 #else
463 	return af_xdp_rx_cp(queue, bufs, nb_pkts);
464 #endif
465 }
466 
467 static uint16_t
468 eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
469 {
470 	uint16_t nb_rx;
471 
472 	if (likely(nb_pkts <= ETH_AF_XDP_RX_BATCH_SIZE))
473 		return af_xdp_rx(queue, bufs, nb_pkts);
474 
475 	/* Split larger batch into smaller batches of size
476 	 * ETH_AF_XDP_RX_BATCH_SIZE or less.
477 	 */
478 	nb_rx = 0;
479 	while (nb_pkts) {
480 		uint16_t ret, n;
481 
482 		n = (uint16_t)RTE_MIN(nb_pkts, ETH_AF_XDP_RX_BATCH_SIZE);
483 		ret = af_xdp_rx(queue, &bufs[nb_rx], n);
484 		nb_rx = (uint16_t)(nb_rx + ret);
485 		nb_pkts = (uint16_t)(nb_pkts - ret);
486 		if (ret < n)
487 			break;
488 	}
489 
490 	return nb_rx;
491 }
492 
493 static void
494 pull_umem_cq(struct xsk_umem_info *umem, int size, struct xsk_ring_cons *cq)
495 {
496 	size_t i, n;
497 	uint32_t idx_cq = 0;
498 
499 	n = xsk_ring_cons__peek(cq, size, &idx_cq);
500 
501 	for (i = 0; i < n; i++) {
502 		uint64_t addr;
503 		addr = *xsk_ring_cons__comp_addr(cq, idx_cq++);
504 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
505 		addr = xsk_umem__extract_addr(addr);
506 		rte_pktmbuf_free((struct rte_mbuf *)
507 					xsk_umem__get_data(umem->buffer,
508 					addr + umem->mb_pool->header_size));
509 #else
510 		rte_ring_enqueue(umem->buf_ring, (void *)addr);
511 #endif
512 	}
513 
514 	xsk_ring_cons__release(cq, n);
515 }
516 
517 static void
518 kick_tx(struct pkt_tx_queue *txq, struct xsk_ring_cons *cq)
519 {
520 	struct xsk_umem_info *umem = txq->umem;
521 
522 	pull_umem_cq(umem, XSK_RING_CONS__DEFAULT_NUM_DESCS, cq);
523 
524 	if (tx_syscall_needed(&txq->tx))
525 		while (send(xsk_socket__fd(txq->pair->xsk), NULL,
526 			    0, MSG_DONTWAIT) < 0) {
527 			/* something unexpected */
528 			if (errno != EBUSY && errno != EAGAIN && errno != EINTR)
529 				break;
530 
531 			/* pull from completion queue to leave more space */
532 			if (errno == EAGAIN)
533 				pull_umem_cq(umem,
534 					     XSK_RING_CONS__DEFAULT_NUM_DESCS,
535 					     cq);
536 		}
537 }
538 
539 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
540 static uint16_t
541 af_xdp_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
542 {
543 	struct pkt_tx_queue *txq = queue;
544 	struct xsk_umem_info *umem = txq->umem;
545 	struct rte_mbuf *mbuf;
546 	unsigned long tx_bytes = 0;
547 	int i;
548 	uint32_t idx_tx;
549 	uint16_t count = 0;
550 	struct xdp_desc *desc;
551 	uint64_t addr, offset;
552 	struct xsk_ring_cons *cq = &txq->pair->cq;
553 	uint32_t free_thresh = cq->size >> 1;
554 
555 	if (xsk_cons_nb_avail(cq, free_thresh) >= free_thresh)
556 		pull_umem_cq(umem, XSK_RING_CONS__DEFAULT_NUM_DESCS, cq);
557 
558 	for (i = 0; i < nb_pkts; i++) {
559 		mbuf = bufs[i];
560 
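		/*
		 * Zero-copy path: mbufs from the umem-backed pool are passed by
		 * reference. With XDP_UMEM_UNALIGNED_CHUNK_FLAG the descriptor
		 * address carries the buffer address in the low bits and the
		 * data offset shifted by XSK_UNALIGNED_BUF_OFFSET_SHIFT in the
		 * upper bits; mbufs from other pools are copied below.
		 */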
561 		if (mbuf->pool == umem->mb_pool) {
562 			if (!xsk_ring_prod__reserve(&txq->tx, 1, &idx_tx)) {
563 				kick_tx(txq, cq);
564 				if (!xsk_ring_prod__reserve(&txq->tx, 1,
565 							    &idx_tx))
566 					goto out;
567 			}
568 			desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx);
569 			desc->len = mbuf->pkt_len;
570 			addr = (uint64_t)mbuf - (uint64_t)umem->buffer -
571 					umem->mb_pool->header_size;
572 			offset = rte_pktmbuf_mtod(mbuf, uint64_t) -
573 					(uint64_t)mbuf +
574 					umem->mb_pool->header_size;
575 			offset = offset << XSK_UNALIGNED_BUF_OFFSET_SHIFT;
576 			desc->addr = addr | offset;
577 			count++;
578 		} else {
579 			struct rte_mbuf *local_mbuf =
580 					rte_pktmbuf_alloc(umem->mb_pool);
581 			void *pkt;
582 
583 			if (local_mbuf == NULL)
584 				goto out;
585 
586 			if (!xsk_ring_prod__reserve(&txq->tx, 1, &idx_tx)) {
587 				rte_pktmbuf_free(local_mbuf);
588 				goto out;
589 			}
590 
591 			desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx);
592 			desc->len = mbuf->pkt_len;
593 
594 			addr = (uint64_t)local_mbuf - (uint64_t)umem->buffer -
595 					umem->mb_pool->header_size;
596 			offset = rte_pktmbuf_mtod(local_mbuf, uint64_t) -
597 					(uint64_t)local_mbuf +
598 					umem->mb_pool->header_size;
599 			pkt = xsk_umem__get_data(umem->buffer, addr + offset);
600 			offset = offset << XSK_UNALIGNED_BUF_OFFSET_SHIFT;
601 			desc->addr = addr | offset;
602 			rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
603 					desc->len);
604 			rte_pktmbuf_free(mbuf);
605 			count++;
606 		}
607 
608 		tx_bytes += desc->len;	/* mbuf may already be freed in the copy path */
609 	}
610 
611 out:
612 	xsk_ring_prod__submit(&txq->tx, count);
613 	kick_tx(txq, cq);
614 
615 	txq->stats.tx_pkts += count;
616 	txq->stats.tx_bytes += tx_bytes;
617 	txq->stats.tx_dropped += nb_pkts - count;
618 
619 	return count;
620 }
621 #else
622 static uint16_t
623 af_xdp_tx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
624 {
625 	struct pkt_tx_queue *txq = queue;
626 	struct xsk_umem_info *umem = txq->umem;
627 	struct rte_mbuf *mbuf;
628 	void *addrs[ETH_AF_XDP_TX_BATCH_SIZE];
629 	unsigned long tx_bytes = 0;
630 	int i;
631 	uint32_t idx_tx;
632 	struct xsk_ring_cons *cq = &txq->pair->cq;
633 
634 	pull_umem_cq(umem, nb_pkts, cq);
635 
636 	nb_pkts = rte_ring_dequeue_bulk(umem->buf_ring, addrs,
637 					nb_pkts, NULL);
638 	if (nb_pkts == 0)
639 		return 0;
640 
641 	if (xsk_ring_prod__reserve(&txq->tx, nb_pkts, &idx_tx) != nb_pkts) {
642 		kick_tx(txq, cq);
643 		rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_pkts, NULL);
644 		return 0;
645 	}
646 
647 	for (i = 0; i < nb_pkts; i++) {
648 		struct xdp_desc *desc;
649 		void *pkt;
650 
651 		desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i);
652 		mbuf = bufs[i];
653 		desc->len = mbuf->pkt_len;
654 
655 		desc->addr = (uint64_t)addrs[i];
656 		pkt = xsk_umem__get_data(umem->mz->addr,
657 					 desc->addr);
658 		rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *), desc->len);
659 		tx_bytes += mbuf->pkt_len;
660 		rte_pktmbuf_free(mbuf);
661 	}
662 
663 	xsk_ring_prod__submit(&txq->tx, nb_pkts);
664 
665 	kick_tx(txq, cq);
666 
667 	txq->stats.tx_pkts += nb_pkts;
668 	txq->stats.tx_bytes += tx_bytes;
669 
670 	return nb_pkts;
671 }
672 
673 static uint16_t
674 af_xdp_tx_cp_batch(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
675 {
676 	uint16_t nb_tx;
677 
678 	if (likely(nb_pkts <= ETH_AF_XDP_TX_BATCH_SIZE))
679 		return af_xdp_tx_cp(queue, bufs, nb_pkts);
680 
681 	nb_tx = 0;
682 	while (nb_pkts) {
683 		uint16_t ret, n;
684 
685 		/* Split larger batch into smaller batches of size
686 		 * ETH_AF_XDP_TX_BATCH_SIZE or less.
687 		 */
688 		n = (uint16_t)RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
689 		ret = af_xdp_tx_cp(queue, &bufs[nb_tx], n);
690 		nb_tx = (uint16_t)(nb_tx + ret);
691 		nb_pkts = (uint16_t)(nb_pkts - ret);
692 		if (ret < n)
693 			break;
694 	}
695 
696 	return nb_tx;
697 }
698 #endif
699 
700 static uint16_t
701 eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
702 {
703 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
704 	return af_xdp_tx_zc(queue, bufs, nb_pkts);
705 #else
706 	return af_xdp_tx_cp_batch(queue, bufs, nb_pkts);
707 #endif
708 }
709 
710 static int
711 eth_dev_start(struct rte_eth_dev *dev)
712 {
713 	uint16_t i;
714 
715 	dev->data->dev_link.link_status = RTE_ETH_LINK_UP;
716 	for (i = 0; i < dev->data->nb_rx_queues; i++) {
717 		dev->data->rx_queue_state[i] = RTE_ETH_QUEUE_STATE_STARTED;
718 		dev->data->tx_queue_state[i] = RTE_ETH_QUEUE_STATE_STARTED;
719 	}
720 
721 	return 0;
722 }
723 
724 /* This function gets called when the current port gets stopped. */
725 static int
726 eth_dev_stop(struct rte_eth_dev *dev)
727 {
728 	uint16_t i;
729 
730 	dev->data->dev_link.link_status = RTE_ETH_LINK_DOWN;
731 	for (i = 0; i < dev->data->nb_rx_queues; i++) {
732 		dev->data->rx_queue_state[i] = RTE_ETH_QUEUE_STATE_STOPPED;
733 		dev->data->tx_queue_state[i] = RTE_ETH_QUEUE_STATE_STOPPED;
734 	}
735 
736 	return 0;
737 }
738 
739 /* Find ethdev in list */
740 static inline struct internal_list *
741 find_internal_resource(struct pmd_internals *port_int)
742 {
743 	int found = 0;
744 	struct internal_list *list = NULL;
745 
746 	if (port_int == NULL)
747 		return NULL;
748 
749 	pthread_mutex_lock(&internal_list_lock);
750 
751 	TAILQ_FOREACH(list, &internal_list, next) {
752 		struct pmd_internals *list_int =
753 				list->eth_dev->data->dev_private;
754 		if (list_int == port_int) {
755 			found = 1;
756 			break;
757 		}
758 	}
759 
760 	pthread_mutex_unlock(&internal_list_lock);
761 
762 	if (!found)
763 		return NULL;
764 
765 	return list;
766 }
767 
768 static int
769 eth_dev_configure(struct rte_eth_dev *dev)
770 {
771 	struct pmd_internals *internal = dev->data->dev_private;
772 
773 	/* rx/tx must be paired */
774 	if (dev->data->nb_rx_queues != dev->data->nb_tx_queues)
775 		return -EINVAL;
776 
777 	if (internal->shared_umem) {
778 		struct internal_list *list = NULL;
779 		const char *name = dev->device->name;
780 
781 		/* Ensure PMD is not already inserted into the list */
782 		list = find_internal_resource(internal);
783 		if (list)
784 			return 0;
785 
786 		list = rte_zmalloc_socket(name, sizeof(*list), 0,
787 					dev->device->numa_node);
788 		if (list == NULL)
789 			return -1;
790 
791 		list->eth_dev = dev;
792 		pthread_mutex_lock(&internal_list_lock);
793 		TAILQ_INSERT_TAIL(&internal_list, list, next);
794 		pthread_mutex_unlock(&internal_list_lock);
795 	}
796 
797 	return 0;
798 }
799 
800 #define CLB_VAL_IDX 0
801 static int
802 eth_monitor_callback(const uint64_t value,
803 		const uint64_t opaque[RTE_POWER_MONITOR_OPAQUE_SZ])
804 {
805 	const uint64_t v = opaque[CLB_VAL_IDX];
806 	const uint64_t m = (uint32_t)~0;
807 
808 	/* if the value has changed, abort entering power optimized state */
809 	return (value & m) == v ? 0 : -1;
810 }
811 
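/*
 * Supply the monitor condition for rte_power_monitor(): the lcore waits on
 * the RX ring's producer index and wakes once the kernel advances it,
 * i.e. when new descriptors have arrived.
 */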
812 static int
813 eth_get_monitor_addr(void *rx_queue, struct rte_power_monitor_cond *pmc)
814 {
815 	struct pkt_rx_queue *rxq = rx_queue;
816 	unsigned int *prod = rxq->rx.producer;
817 	const uint32_t cur_val = rxq->rx.cached_prod; /* use cached value */
818 
819 	/* watch for changes in producer ring */
820 	pmc->addr = (void *)prod;
821 
822 	/* store current value */
823 	pmc->opaque[CLB_VAL_IDX] = cur_val;
824 	pmc->fn = eth_monitor_callback;
825 
826 	/* AF_XDP producer ring index is 32-bit */
827 	pmc->size = sizeof(uint32_t);
828 
829 	return 0;
830 }
831 
832 static int
833 eth_dev_info(struct rte_eth_dev *dev, struct rte_eth_dev_info *dev_info)
834 {
835 	struct pmd_internals *internals = dev->data->dev_private;
836 
837 	dev_info->if_index = internals->if_index;
838 	dev_info->max_mac_addrs = 1;
839 	dev_info->max_rx_queues = internals->queue_cnt;
840 	dev_info->max_tx_queues = internals->queue_cnt;
841 
842 	dev_info->min_mtu = RTE_ETHER_MIN_MTU;
843 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
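	/* Zero-copy: a received frame must fit in one page-sized umem chunk
	 * together with the mempool object header, the mbuf struct, and both
	 * the mbuf and XDP headrooms.
	 */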
844 	dev_info->max_rx_pktlen = getpagesize() -
845 				  sizeof(struct rte_mempool_objhdr) -
846 				  sizeof(struct rte_mbuf) -
847 				  RTE_PKTMBUF_HEADROOM - XDP_PACKET_HEADROOM;
848 #else
849 	dev_info->max_rx_pktlen = ETH_AF_XDP_FRAME_SIZE - XDP_PACKET_HEADROOM;
850 #endif
851 	dev_info->max_mtu = dev_info->max_rx_pktlen - ETH_AF_XDP_ETH_OVERHEAD;
852 
853 	dev_info->default_rxportconf.burst_size = ETH_AF_XDP_DFLT_BUSY_BUDGET;
854 	dev_info->default_txportconf.burst_size = ETH_AF_XDP_DFLT_BUSY_BUDGET;
855 	dev_info->default_rxportconf.nb_queues = 1;
856 	dev_info->default_txportconf.nb_queues = 1;
857 	dev_info->default_rxportconf.ring_size = ETH_AF_XDP_DFLT_NUM_DESCS;
858 	dev_info->default_txportconf.ring_size = ETH_AF_XDP_DFLT_NUM_DESCS;
859 
860 	return 0;
861 }
862 
863 static int
864 eth_stats_get(struct rte_eth_dev *dev, struct rte_eth_stats *stats)
865 {
866 	struct pmd_internals *internals = dev->data->dev_private;
867 	struct pmd_process_private *process_private = dev->process_private;
868 	struct xdp_statistics xdp_stats;
869 	struct pkt_rx_queue *rxq;
870 	struct pkt_tx_queue *txq;
871 	socklen_t optlen;
872 	int i, ret, fd;
873 
874 	for (i = 0; i < dev->data->nb_rx_queues; i++) {
875 		optlen = sizeof(struct xdp_statistics);
876 		rxq = &internals->rx_queues[i];
877 		txq = rxq->pair;
878 		stats->q_ipackets[i] = rxq->stats.rx_pkts;
879 		stats->q_ibytes[i] = rxq->stats.rx_bytes;
880 
881 		stats->q_opackets[i] = txq->stats.tx_pkts;
882 		stats->q_obytes[i] = txq->stats.tx_bytes;
883 
884 		stats->ipackets += stats->q_ipackets[i];
885 		stats->ibytes += stats->q_ibytes[i];
886 		stats->oerrors += txq->stats.tx_dropped;
887 		fd = process_private->rxq_xsk_fds[i];
888 		ret = fd >= 0 ? getsockopt(fd, SOL_XDP, XDP_STATISTICS,
889 					   &xdp_stats, &optlen) : -1;
890 		if (ret != 0) {
891 			AF_XDP_LOG_LINE(ERR, "getsockopt() failed for XDP_STATISTICS.");
892 			return -1;
893 		}
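		/* xdp_stats.rx_dropped is cumulative since socket creation, so
		 * subtract the snapshot taken at the last stats_reset().
		 */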
894 		stats->imissed += xdp_stats.rx_dropped - rxq->stats.imissed_offset;
895 
896 		stats->opackets += stats->q_opackets[i];
897 		stats->obytes += stats->q_obytes[i];
898 	}
899 
900 	return 0;
901 }
902 
903 static int
904 eth_stats_reset(struct rte_eth_dev *dev)
905 {
906 	struct pmd_internals *internals = dev->data->dev_private;
907 	struct pmd_process_private *process_private = dev->process_private;
908 	struct xdp_statistics xdp_stats;
909 	socklen_t optlen;
910 	int i, ret, fd;
911 
912 	for (i = 0; i < internals->queue_cnt; i++) {
913 		memset(&internals->rx_queues[i].stats, 0,
914 					sizeof(struct rx_stats));
915 		memset(&internals->tx_queues[i].stats, 0,
916 					sizeof(struct tx_stats));
917 		fd = process_private->rxq_xsk_fds[i];
918 		optlen = sizeof(struct xdp_statistics);
919 		ret = fd >= 0 ? getsockopt(fd, SOL_XDP, XDP_STATISTICS,
920 					   &xdp_stats, &optlen) : -1;
921 		if (ret != 0) {
922 			AF_XDP_LOG_LINE(ERR, "getsockopt() failed for XDP_STATISTICS.");
923 			return -1;
924 		}
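		/* Record the kernel's cumulative drop count so that subsequent
		 * stats_get() calls report drops relative to this reset.
		 */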
925 		internals->rx_queues[i].stats.imissed_offset = xdp_stats.rx_dropped;
926 	}
927 
928 	return 0;
929 }
930 
931 #ifdef RTE_NET_AF_XDP_LIBBPF_XDP_ATTACH
932 
933 static int link_xdp_prog_with_dev(int ifindex, int fd, __u32 flags)
934 {
935 	return bpf_xdp_attach(ifindex, fd, flags, NULL);
936 }
937 
938 static int
939 remove_xdp_program(struct pmd_internals *internals)
940 {
941 	uint32_t curr_prog_id = 0;
942 	int ret;
943 
944 	ret = bpf_xdp_query_id(internals->if_index, XDP_FLAGS_UPDATE_IF_NOEXIST,
945 			       &curr_prog_id);
946 	if (ret != 0) {
947 		AF_XDP_LOG_LINE(ERR, "bpf_xdp_query_id failed");
948 		return ret;
949 	}
950 
951 	ret = bpf_xdp_detach(internals->if_index, XDP_FLAGS_UPDATE_IF_NOEXIST,
952 			     NULL);
953 	if (ret != 0)
954 		AF_XDP_LOG_LINE(ERR, "bpf_xdp_detach failed");
955 	return ret;
956 }
957 
958 #else
959 
960 static int link_xdp_prog_with_dev(int ifindex, int fd, __u32 flags)
961 {
962 	return bpf_set_link_xdp_fd(ifindex, fd, flags);
963 }
964 
965 static int
966 remove_xdp_program(struct pmd_internals *internals)
967 {
968 	uint32_t curr_prog_id = 0;
969 	int ret;
970 
971 	ret = bpf_get_link_xdp_id(internals->if_index, &curr_prog_id,
972 				  XDP_FLAGS_UPDATE_IF_NOEXIST);
973 	if (ret != 0) {
974 		AF_XDP_LOG_LINE(ERR, "bpf_get_link_xdp_id failed");
975 		return ret;
976 	}
977 
978 	ret = bpf_set_link_xdp_fd(internals->if_index, -1,
979 				  XDP_FLAGS_UPDATE_IF_NOEXIST);
980 	if (ret != 0)
981 		AF_XDP_LOG_LINE(ERR, "bpf_set_link_xdp_fd failed");
982 	return ret;
983 }
984 
985 #endif
986 
987 static void
988 xdp_umem_destroy(struct xsk_umem_info *umem)
989 {
990 	(void)xsk_umem__delete(umem->umem);
991 	umem->umem = NULL;
992 
993 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
994 	umem->mb_pool = NULL;
995 #else
996 	rte_memzone_free(umem->mz);
997 	umem->mz = NULL;
998 
999 	rte_ring_free(umem->buf_ring);
1000 	umem->buf_ring = NULL;
1001 #endif
1002 
1003 	rte_free(umem);
1004 }
1005 
1006 static int
1007 eth_dev_close(struct rte_eth_dev *dev)
1008 {
1009 	struct pmd_internals *internals = dev->data->dev_private;
1010 	struct pkt_rx_queue *rxq;
1011 	int i;
1012 
1013 	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
1014 		goto out;
1015 
1016 	AF_XDP_LOG_LINE(INFO, "Closing AF_XDP ethdev on numa socket %u",
1017 		rte_socket_id());
1018 
1019 	for (i = 0; i < internals->queue_cnt; i++) {
1020 		rxq = &internals->rx_queues[i];
1021 		if (rxq->umem == NULL)
1022 			break;
1023 		xsk_socket__delete(rxq->xsk);
1024 
1025 		if (rte_atomic_fetch_sub_explicit(&rxq->umem->refcnt, 1,
1026 				rte_memory_order_acquire) - 1 == 0)
1027 			xdp_umem_destroy(rxq->umem);
1028 
1029 		/* free pkt_tx_queue */
1030 		rte_free(rxq->pair);
1031 		rte_free(rxq);
1032 	}
1033 
1034 	/*
1035 	 * The MAC address is not allocated dynamically; set it to NULL
1036 	 * to prevent rte_eth_dev_release_port() from attempting to free it.
1037 	 */
1038 	dev->data->mac_addrs = NULL;
1039 
1040 	if (remove_xdp_program(internals) != 0)
1041 		AF_XDP_LOG_LINE(ERR, "Error while removing XDP program.");
1042 
1043 	if (internals->shared_umem) {
1044 		struct internal_list *list;
1045 
1046 		/* Remove ethdev from list used to track and share UMEMs */
1047 		list = find_internal_resource(internals);
1048 		if (list) {
1049 			pthread_mutex_lock(&internal_list_lock);
1050 			TAILQ_REMOVE(&internal_list, list, next);
1051 			pthread_mutex_unlock(&internal_list_lock);
1052 			rte_free(list);
1053 		}
1054 	}
1055 
1056 out:
1057 	rte_free(dev->process_private);
1058 
1059 	return 0;
1060 }
1061 
1062 static int
1063 eth_link_update(struct rte_eth_dev *dev __rte_unused,
1064 		int wait_to_complete __rte_unused)
1065 {
1066 	return 0;
1067 }
1068 
1069 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
1070 /* Check if the netdev,qid context already exists */
1071 static inline bool
1072 ctx_exists(struct pkt_rx_queue *rxq, const char *ifname,
1073 		struct pkt_rx_queue *list_rxq, const char *list_ifname)
1074 {
1075 	bool exists = false;
1076 
1077 	if (rxq->xsk_queue_idx == list_rxq->xsk_queue_idx &&
1078 			!strncmp(ifname, list_ifname, IFNAMSIZ)) {
1079 		AF_XDP_LOG_LINE(ERR, "ctx %s,%i already exists, cannot share umem",
1080 					ifname, rxq->xsk_queue_idx);
1081 		exists = true;
1082 	}
1083 
1084 	return exists;
1085 }
1086 
1087 /* Get a pointer to an existing UMEM which overlays the rxq's mb_pool */
1088 static inline int
1089 get_shared_umem(struct pkt_rx_queue *rxq, const char *ifname,
1090 			struct xsk_umem_info **umem)
1091 {
1092 	struct internal_list *list;
1093 	struct pmd_internals *internals;
1094 	int i = 0, ret = 0;
1095 	struct rte_mempool *mb_pool = rxq->mb_pool;
1096 
1097 	if (mb_pool == NULL)
1098 		return ret;
1099 
1100 	pthread_mutex_lock(&internal_list_lock);
1101 
1102 	TAILQ_FOREACH(list, &internal_list, next) {
1103 		internals = list->eth_dev->data->dev_private;
1104 		for (i = 0; i < internals->queue_cnt; i++) {
1105 			struct pkt_rx_queue *list_rxq =
1106 						&internals->rx_queues[i];
1107 			if (rxq == list_rxq)
1108 				continue;
1109 			if (mb_pool == internals->rx_queues[i].mb_pool) {
1110 				if (ctx_exists(rxq, ifname, list_rxq,
1111 						internals->if_name)) {
1112 					ret = -1;
1113 					goto out;
1114 				}
1115 				if (rte_atomic_load_explicit(&internals->rx_queues[i].umem->refcnt,
1116 						    rte_memory_order_acquire)) {
1117 					*umem = internals->rx_queues[i].umem;
1118 					goto out;
1119 				}
1120 			}
1121 		}
1122 	}
1123 
1124 out:
1125 	pthread_mutex_unlock(&internal_list_lock);
1126 
1127 	return ret;
1128 }
1129 
1130 static struct
1131 xsk_umem_info *xdp_umem_configure(struct pmd_internals *internals,
1132 				  struct pkt_rx_queue *rxq)
1133 {
1134 	struct xsk_umem_info *umem = NULL;
1135 	int ret;
1136 	struct xsk_umem_config usr_config = {
1137 		.fill_size = ETH_AF_XDP_DFLT_NUM_DESCS * 2,
1138 		.comp_size = ETH_AF_XDP_DFLT_NUM_DESCS,
1139 		.flags = XDP_UMEM_UNALIGNED_CHUNK_FLAG};
1140 	struct rte_mempool *mb_pool = rxq->mb_pool;
1141 	void *aligned_addr;
1142 	uint64_t umem_size;
1143 	struct rte_mempool_mem_range_info range;
1144 
1145 	if (internals->shared_umem) {
1146 		if (get_shared_umem(rxq, internals->if_name, &umem) < 0)
1147 			return NULL;
1148 
1149 		if (umem != NULL &&
1150 			rte_atomic_load_explicit(&umem->refcnt, rte_memory_order_acquire) <
1151 					umem->max_xsks) {
1152 			AF_XDP_LOG_LINE(INFO, "%s,qid%i sharing UMEM",
1153 					internals->if_name, rxq->xsk_queue_idx);
1154 			rte_atomic_fetch_add_explicit(&umem->refcnt, 1, rte_memory_order_acquire);
1155 		}
1156 	}
1157 
1158 	if (umem == NULL) {
1159 		usr_config.frame_size =
1160 			rte_mempool_calc_obj_size(mb_pool->elt_size,
1161 						  mb_pool->flags, NULL);
1162 		usr_config.frame_headroom = mb_pool->header_size +
1163 						sizeof(struct rte_mbuf) +
1164 						rte_pktmbuf_priv_size(mb_pool) +
1165 						RTE_PKTMBUF_HEADROOM;
1166 
1167 		umem = rte_zmalloc_socket("umem", sizeof(*umem), 0,
1168 					  rte_socket_id());
1169 		if (umem == NULL) {
1170 			AF_XDP_LOG_LINE(ERR, "Failed to allocate umem info");
1171 			return NULL;
1172 		}
1173 
1174 		umem->mb_pool = mb_pool;
1175 		ret = rte_mempool_get_mem_range(mb_pool, &range);
1176 		if (ret < 0) {
1177 			AF_XDP_LOG_LINE(ERR, "Failed(%d) to get range from mempool", ret);
1178 			goto err;
1179 		}
1180 		if (!range.is_contiguous) {
1181 			AF_XDP_LOG_LINE(ERR, "Cannot map mempool to umem as it is not contiguous");
1182 			goto err;
1183 		}
1184 		/*
1185 		 * The umem requires the memory area to be page aligned. Mapping a larger
1186 		 * area is safe, as the memory pointer for each XSK TX/RX descriptor is
1187 		 * derived from the mbuf data area.
1188 		 */
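		/*
		 * Illustrative example with 4 KiB pages and a hypothetical
		 * range.start of 0x...1800: aligned_addr becomes 0x...1000 and
		 * umem_size grows by 0x800, keeping the whole mempool inside
		 * the registered umem.
		 */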
1188 		aligned_addr = (void *)RTE_ALIGN_FLOOR((uintptr_t)range.start, getpagesize());
1189 		umem_size = range.length + RTE_PTR_DIFF(range.start, aligned_addr);
1190 		ret = xsk_umem__create(&umem->umem, aligned_addr, umem_size,
1191 				&rxq->fq, &rxq->cq, &usr_config);
1192 		if (ret) {
1193 			AF_XDP_LOG_LINE(ERR, "Failed to create umem [%d]: [%s]",
1194 				   errno, strerror(errno));
1195 			goto err;
1196 		}
1197 		umem->buffer = aligned_addr;
1198 
1199 		if (internals->shared_umem) {
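			/* Cap the number of sockets sharing this UMEM so that
			 * each one still has roughly ETH_AF_XDP_NUM_BUFFERS
			 * buffers of the pool to itself.
			 */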
1200 			umem->max_xsks = mb_pool->populated_size /
1201 						ETH_AF_XDP_NUM_BUFFERS;
1202 			AF_XDP_LOG_LINE(INFO, "Max xsks for UMEM %s: %u",
1203 						mb_pool->name, umem->max_xsks);
1204 		}
1205 
1206 		rte_atomic_store_explicit(&umem->refcnt, 1, rte_memory_order_release);
1207 	}
1208 
1209 	return umem;
1210 
1211 err:
1212 	xdp_umem_destroy(umem);
1213 	return NULL;
1214 }
1215 #else
1216 static struct
1217 xsk_umem_info *xdp_umem_configure(struct pmd_internals *internals,
1218 				  struct pkt_rx_queue *rxq)
1219 {
1220 	struct xsk_umem_info *umem;
1221 	const struct rte_memzone *mz;
1222 	struct xsk_umem_config usr_config = {
1223 		.fill_size = ETH_AF_XDP_DFLT_NUM_DESCS,
1224 		.comp_size = ETH_AF_XDP_DFLT_NUM_DESCS,
1225 		.frame_size = ETH_AF_XDP_FRAME_SIZE,
1226 		.frame_headroom = 0 };
1227 	char ring_name[RTE_RING_NAMESIZE];
1228 	char mz_name[RTE_MEMZONE_NAMESIZE];
1229 	int ret;
1230 	uint64_t i;
1231 
1232 	umem = rte_zmalloc_socket("umem", sizeof(*umem), 0, rte_socket_id());
1233 	if (umem == NULL) {
1234 		AF_XDP_LOG_LINE(ERR, "Failed to allocate umem info");
1235 		return NULL;
1236 	}
1237 
1238 	snprintf(ring_name, sizeof(ring_name), "af_xdp_ring_%s_%u",
1239 		       internals->if_name, rxq->xsk_queue_idx);
1240 	umem->buf_ring = rte_ring_create(ring_name,
1241 					 ETH_AF_XDP_NUM_BUFFERS,
1242 					 rte_socket_id(),
1243 					 0x0);
1244 	if (umem->buf_ring == NULL) {
1245 		AF_XDP_LOG_LINE(ERR, "Failed to create rte_ring");
1246 		goto err;
1247 	}
1248 
1249 	for (i = 0; i < ETH_AF_XDP_NUM_BUFFERS; i++)
1250 		rte_ring_enqueue(umem->buf_ring,
1251 				 (void *)(i * ETH_AF_XDP_FRAME_SIZE));
1252 
1253 	snprintf(mz_name, sizeof(mz_name), "af_xdp_umem_%s_%u",
1254 		       internals->if_name, rxq->xsk_queue_idx);
1255 	mz = rte_memzone_reserve_aligned(mz_name,
1256 			ETH_AF_XDP_NUM_BUFFERS * ETH_AF_XDP_FRAME_SIZE,
1257 			rte_socket_id(), RTE_MEMZONE_IOVA_CONTIG,
1258 			getpagesize());
1259 	if (mz == NULL) {
1260 		AF_XDP_LOG_LINE(ERR, "Failed to reserve memzone for af_xdp umem.");
1261 		goto err;
1262 	}
1263 	umem->mz = mz;
1264 
1265 	ret = xsk_umem__create(&umem->umem, mz->addr,
1266 			       ETH_AF_XDP_NUM_BUFFERS * ETH_AF_XDP_FRAME_SIZE,
1267 			       &rxq->fq, &rxq->cq,
1268 			       &usr_config);
1269 
1270 	if (ret) {
1271 		AF_XDP_LOG_LINE(ERR, "Failed to create umem");
1272 		goto err;
1273 	}
1274 
1275 	return umem;
1276 
1277 err:
1278 	xdp_umem_destroy(umem);
1279 	return NULL;
1280 }
1281 #endif
1282 
1283 static int
1284 get_pinned_map(const char *dp_path, int *map_fd)
1285 {
1286 	*map_fd = bpf_obj_get(dp_path);
1287 	if (*map_fd < 0) {
1288 		AF_XDP_LOG_LINE(ERR, "Failed to find xsks_map in %s", dp_path);
1289 		return -1;
1290 	}
1291 
1292 	AF_XDP_LOG_LINE(INFO, "Successfully retrieved map %s with fd %d",
1293 				dp_path, *map_fd);
1294 
1295 	return 0;
1296 }
1297 
1298 static int
1299 load_custom_xdp_prog(const char *prog_path, int if_index, struct bpf_map **map)
1300 {
1301 	int ret, prog_fd;
1302 	struct bpf_object *obj;
1303 
1304 	prog_fd = load_program(prog_path, &obj);
1305 	if (prog_fd < 0) {
1306 		AF_XDP_LOG_LINE(ERR, "Failed to load program %s", prog_path);
1307 		return -1;
1308 	}
1309 
1310 	/*
1311 	 * The loaded program must provide a map of xsks ("xsks_map") so
1312 	 * that traffic can be redirected to userspace.
1313 	 */
1314 	*map = bpf_object__find_map_by_name(obj, "xsks_map");
1315 	if (!*map) {
1316 		AF_XDP_LOG_LINE(ERR, "Failed to find xsks_map in %s", prog_path);
1317 		return -1;
1318 	}
1319 
1320 	/* Link the program with the given network device */
1321 	ret = link_xdp_prog_with_dev(if_index, prog_fd,
1322 					XDP_FLAGS_UPDATE_IF_NOEXIST);
1323 	if (ret) {
1324 		AF_XDP_LOG_LINE(ERR, "Failed to set prog fd %d on interface",
1325 				prog_fd);
1326 		return -1;
1327 	}
1328 
1329 	AF_XDP_LOG_LINE(INFO, "Successfully loaded XDP program %s with fd %d",
1330 				prog_path, prog_fd);
1331 
1332 	return 0;
1333 }
1334 
1335 /* Detect support for busy polling through setsockopt(). */
1336 static int
1337 configure_preferred_busy_poll(struct pkt_rx_queue *rxq)
1338 {
1339 	int sock_opt = 1;
1340 	int fd = xsk_socket__fd(rxq->xsk);
1341 	int ret = 0;
1342 
1343 	ret = setsockopt(fd, SOL_SOCKET, SO_PREFER_BUSY_POLL,
1344 			(void *)&sock_opt, sizeof(sock_opt));
1345 	if (ret < 0) {
1346 		AF_XDP_LOG_LINE(DEBUG, "Failed to set SO_PREFER_BUSY_POLL");
1347 		goto err_prefer;
1348 	}
1349 
1350 	sock_opt = ETH_AF_XDP_DFLT_BUSY_TIMEOUT;
1351 	ret = setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL, (void *)&sock_opt,
1352 			sizeof(sock_opt));
1353 	if (ret < 0) {
1354 		AF_XDP_LOG_LINE(DEBUG, "Failed to set SO_BUSY_POLL");
1355 		goto err_timeout;
1356 	}
1357 
1358 	sock_opt = rxq->busy_budget;
1359 	ret = setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL_BUDGET,
1360 			(void *)&sock_opt, sizeof(sock_opt));
1361 	if (ret < 0) {
1362 		AF_XDP_LOG_LINE(DEBUG, "Failed to set SO_BUSY_POLL_BUDGET");
1363 	} else {
1364 		AF_XDP_LOG_LINE(INFO, "Busy polling budget set to: %u",
1365 					rxq->busy_budget);
1366 		return 0;
1367 	}
1368 
1369 	/* setsockopt failure - attempt to restore xsk to default state and
1370 	 * proceed without busy polling support.
1371 	 */
1372 	sock_opt = 0;
1373 	ret = setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL, (void *)&sock_opt,
1374 			sizeof(sock_opt));
1375 	if (ret < 0) {
1376 		AF_XDP_LOG_LINE(ERR, "Failed to unset SO_BUSY_POLL");
1377 		return -1;
1378 	}
1379 
1380 err_timeout:
1381 	sock_opt = 0;
1382 	ret = setsockopt(fd, SOL_SOCKET, SO_PREFER_BUSY_POLL,
1383 			(void *)&sock_opt, sizeof(sock_opt));
1384 	if (ret < 0) {
1385 		AF_XDP_LOG_LINE(ERR, "Failed to unset SO_PREFER_BUSY_POLL");
1386 		return -1;
1387 	}
1388 
1389 err_prefer:
1390 	rxq->busy_budget = 0;
1391 	return 0;
1392 }
1393 
1394 static int
1395 init_uds_sock(struct sockaddr_un *server, const char *dp_path)
1396 {
1397 	int sock;
1398 
1399 	sock = socket(AF_UNIX, SOCK_SEQPACKET, 0);
1400 	if (sock < 0) {
1401 		AF_XDP_LOG_LINE(ERR, "Failed to open stream socket");
1402 		return -1;
1403 	}
1404 
1405 	server->sun_family = AF_UNIX;
1406 	strlcpy(server->sun_path, dp_path, sizeof(server->sun_path));
1407 
1408 	if (connect(sock, (struct sockaddr *)server, sizeof(struct sockaddr_un)) < 0) {
1409 		close(sock);
1410 		AF_XDP_LOG_LINE(ERR, "Error connecting stream socket errno = [%d]: [%s]",
1411 			   errno, strerror(errno));
1412 		return -1;
1413 	}
1414 
1415 	return sock;
1416 }
1417 
1418 struct msg_internal {
1419 	char response[UDS_MAX_CMD_RESP];
1420 	int len_param;
1421 	int num_fds;
1422 	int fds[UDS_MAX_FD_NUM];
1423 };
1424 
1425 static int
1426 send_msg(int sock, char *request, int *fd, const char *dp_path)
1427 {
1428 	int snd;
1429 	struct iovec iov;
1430 	struct msghdr msgh;
1431 	struct cmsghdr *cmsg;
1432 	struct sockaddr_un dst;
1433 	char control[CMSG_SPACE(sizeof(*fd))];
1434 
1435 	memset(&dst, 0, sizeof(dst));
1436 	dst.sun_family = AF_UNIX;
1437 	strlcpy(dst.sun_path, dp_path, sizeof(dst.sun_path));
1438 
1439 	/* Initialize message header structure */
1440 	memset(&msgh, 0, sizeof(msgh));
1441 	memset(control, 0, sizeof(control));
1442 	iov.iov_base = request;
1443 	iov.iov_len = strlen(request);
1444 
1445 	msgh.msg_name = &dst;
1446 	msgh.msg_namelen = sizeof(dst);
1447 	msgh.msg_iov = &iov;
1448 	msgh.msg_iovlen = 1;
1449 	msgh.msg_control = control;
1450 	msgh.msg_controllen = sizeof(control);
1451 
1452 	/* Translate the FD. */
1453 	cmsg = CMSG_FIRSTHDR(&msgh);
1454 	cmsg->cmsg_len = CMSG_LEN(sizeof(*fd));
1455 	cmsg->cmsg_level = SOL_SOCKET;
1456 	cmsg->cmsg_type = SCM_RIGHTS;
1457 	memcpy(CMSG_DATA(cmsg), fd, sizeof(*fd));
1458 
1459 	/* Send the request message. */
1460 	do {
1461 		snd = sendmsg(sock, &msgh, 0);
1462 	} while (snd < 0 && errno == EINTR);
1463 
1464 	return snd;
1465 }
1466 
1467 static int
1468 read_msg(int sock, char *response, struct sockaddr_un *s, int *fd)
1469 {
1470 	int msglen;
1471 	struct msghdr msgh;
1472 	struct iovec iov;
1473 	char control[CMSG_SPACE(sizeof(*fd))];
1474 	struct cmsghdr *cmsg;
1475 
1476 	/* Initialize message header structure */
1477 	memset(&msgh, 0, sizeof(msgh));
1478 	iov.iov_base = response;
1479 	iov.iov_len = UDS_MAX_CMD_RESP - 1;	/* leave room for the NUL terminator */
1480 
1481 	msgh.msg_name = s;
1482 	msgh.msg_namelen = sizeof(*s);
1483 	msgh.msg_iov = &iov;
1484 	msgh.msg_iovlen = 1;
1485 	msgh.msg_control = control;
1486 	msgh.msg_controllen = sizeof(control);
1487 
1488 	msglen = recvmsg(sock, &msgh, 0);
1489 
1490 	/* zero length message means socket was closed */
1491 	if (msglen == 0)
1492 		return 0;
1493 
1494 	if (msglen < 0) {
1495 		AF_XDP_LOG_LINE(ERR, "recvmsg failed, %s", strerror(errno));
1496 		return -1;
1497 	}
1498 
1499 	/* read auxiliary FDs if any */
1500 	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
1501 			cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
1502 		if (cmsg->cmsg_level == SOL_SOCKET &&
1503 				cmsg->cmsg_type == SCM_RIGHTS) {
1504 			memcpy(fd, CMSG_DATA(cmsg), sizeof(*fd));
1505 			break;
1506 		}
1507 	}
1508 
1509 	response[msglen] = '\0';
1510 	return msglen;
1511 }
1512 
1513 static int
1514 make_request_dp(int sock, struct sockaddr_un *server, char *request,
1515 		 int *req_fd, char *response, int *out_fd, const char *dp_path)
1516 {
1517 	int rval;
1518 
1519 	AF_XDP_LOG_LINE(DEBUG, "Request: [%s]", request);
1520 
1521 	/* If there is no file descriptor to send, write directly to the
1522 	 * socket; otherwise use sendmsg() to pass the file descriptor.
1523 	 */
1524 	if (req_fd == NULL)
1525 		rval = write(sock, request, strlen(request));
1526 	else
1527 		rval = send_msg(sock, request, req_fd, dp_path);
1528 
1529 	if (rval < 0) {
1530 		AF_XDP_LOG_LINE(ERR, "Write error %s", strerror(errno));
1531 		return -1;
1532 	}
1533 
1534 	rval = read_msg(sock, response, server, out_fd);
1535 	if (rval <= 0) {
1536 		AF_XDP_LOG_LINE(ERR, "Read error %d", rval);
1537 		return -1;
1538 	}
1539 	AF_XDP_LOG_LINE(DEBUG, "Response: [%s]", response);
1540 
1541 	return 0;
1542 }
1543 
1544 static int
1545 check_response(char *response, char *exp_resp, long size)
1546 {
1547 	return strncmp(response, exp_resp, size);
1548 }
1549 
1550 static int
1551 uds_get_xskmap_fd(char *if_name, const char *dp_path)
1552 {
1553 	char request[UDS_MAX_CMD_LEN], response[UDS_MAX_CMD_RESP];
1554 	char hostname[MAX_LONG_OPT_SZ], exp_resp[UDS_MAX_CMD_RESP];
1555 	struct sockaddr_un server;
1556 	int xsk_map_fd = -1, out_fd = 0;
1557 	int sock, err;
1558 
1559 	err = gethostname(hostname, MAX_LONG_OPT_SZ - 1);
1560 	if (err)
1561 		return -1;
1562 
1563 	memset(&server, 0, sizeof(server));
1564 	sock = init_uds_sock(&server, dp_path);
1565 	if (sock < 0)
1566 		return -1;
1567 
1568 	/* Initiate the handshake with the AF_XDP Device Plugin; send: /connect,hostname */
1569 	snprintf(request, sizeof(request), "%s,%s", UDS_CONNECT_MSG, hostname);
1570 	memset(response, 0, sizeof(response));
1571 	if (make_request_dp(sock, &server, request, NULL, response, &out_fd, dp_path) < 0) {
1572 		AF_XDP_LOG_LINE(ERR, "Error in processing cmd [%s]", request);
1573 		goto err_close;
1574 	}
1575 
1576 	/* Expect /host_ok */
1577 	strlcpy(exp_resp, UDS_HOST_OK_MSG, UDS_MAX_CMD_LEN);
1578 	if (check_response(response, exp_resp, strlen(exp_resp)) != 0) {
1579 		AF_XDP_LOG_LINE(ERR, "Unexpected response [%s]", response);
1580 		goto err_close;
1581 	}
1582 	/* Request for "/version" */
1583 	strlcpy(request, UDS_VERSION_MSG, UDS_MAX_CMD_LEN);
1584 	memset(response, 0, sizeof(response));
1585 	if (make_request_dp(sock, &server, request, NULL, response, &out_fd, dp_path) < 0) {
1586 		AF_XDP_LOG_LINE(ERR, "Error in processing cmd [%s]", request);
1587 		goto err_close;
1588 	}
1589 
1590 	/* Request the xsk map file descriptor for the netdev name */
1591 	snprintf(request, sizeof(request), "%s,%s", UDS_XSK_MAP_FD_MSG, if_name);
1592 	memset(response, 0, sizeof(response));
1593 	if (make_request_dp(sock, &server, request, NULL, response, &out_fd, dp_path) < 0) {
1594 		AF_XDP_LOG_LINE(ERR, "Error in processing cmd [%s]", request);
1595 		goto err_close;
1596 	}
1597 
1598 	if (out_fd < 0) {
1599 		AF_XDP_LOG_LINE(ERR, "Error in processing cmd [%s]", request);
1600 		goto err_close;
1601 	}
1602 
1603 	xsk_map_fd = out_fd;
1604 
1605 	/* Expect fd_ack with file descriptor */
1606 	strlcpy(exp_resp, UDS_FD_ACK_MSG, UDS_MAX_CMD_LEN);
1607 	if (check_response(response, exp_resp, strlen(exp_resp)) != 0) {
1608 		AF_XDP_LOG_LINE(ERR, "Unexpected response [%s]", response);
1609 		goto err_close;
1610 	}
1611 
1612 	/* Initiate close connection */
1613 	strlcpy(request, UDS_FIN_MSG, UDS_MAX_CMD_LEN);
1614 	memset(response, 0, sizeof(response));
1615 	if (make_request_dp(sock, &server, request, NULL, response, &out_fd, dp_path) < 0) {
1616 		AF_XDP_LOG_LINE(ERR, "Error in processing cmd [%s]", request);
1617 		goto err_close;
1618 	}
1619 
1620 	/* Connection close */
1621 	strlcpy(exp_resp, UDS_FIN_ACK_MSG, UDS_MAX_CMD_LEN);
1622 	if (check_response(response, exp_resp, strlen(exp_resp)) != 0) {
1623 		AF_XDP_LOG_LINE(ERR, "Unexpected response [%s]", response);
1624 		goto err_close;
1625 	}
1626 	close(sock);
1627 
1628 	return xsk_map_fd;
1629 
1630 err_close:
1631 	close(sock);
1632 	return -1;
1633 }
1634 
1635 static int
1636 xsk_configure(struct pmd_internals *internals, struct pkt_rx_queue *rxq,
1637 	      int ring_size)
1638 {
1639 	struct xsk_socket_config cfg;
1640 	struct pkt_tx_queue *txq = rxq->pair;
1641 	int ret = 0;
1642 	int reserve_size = ETH_AF_XDP_DFLT_NUM_DESCS;
1643 	struct rte_mbuf *fq_bufs[reserve_size];
1644 	bool reserve_before;
1645 
1646 	rxq->umem = xdp_umem_configure(internals, rxq);
1647 	if (rxq->umem == NULL)
1648 		return -ENOMEM;
1649 	txq->umem = rxq->umem;
1650 	reserve_before = rte_atomic_load_explicit(&rxq->umem->refcnt,
1651 			rte_memory_order_acquire) <= 1;
1652 
1653 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
1654 	ret = rte_pktmbuf_alloc_bulk(rxq->umem->mb_pool, fq_bufs, reserve_size);
1655 	if (ret) {
1656 		AF_XDP_LOG_LINE(DEBUG, "Failed to get enough buffers for fq.");
1657 		goto out_umem;
1658 	}
1659 #endif
1660 
1661 	/* reserve fill queue of queues not (yet) sharing UMEM */
1662 	if (reserve_before) {
1663 		ret = reserve_fill_queue(rxq->umem, reserve_size, fq_bufs, &rxq->fq);
1664 		if (ret) {
1665 			AF_XDP_LOG_LINE(ERR, "Failed to reserve fill queue.");
1666 			goto out_umem;
1667 		}
1668 	}
1669 
1670 	cfg.rx_size = ring_size;
1671 	cfg.tx_size = ring_size;
1672 	cfg.libbpf_flags = 0;
1673 	cfg.xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST;
1674 	cfg.bind_flags = 0;
1675 
1676 	/* Force AF_XDP socket into copy mode when users want it */
1677 	if (internals->force_copy)
1678 		cfg.bind_flags |= XDP_COPY;
1679 
1680 #if defined(XDP_USE_NEED_WAKEUP)
1681 	cfg.bind_flags |= XDP_USE_NEED_WAKEUP;
1682 #endif
1683 
1684 	/* Disable libbpf from loading XDP program */
1685 	if (internals->use_cni || internals->use_pinned_map)
1686 		cfg.libbpf_flags |= XSK_LIBBPF_FLAGS__INHIBIT_PROG_LOAD;
1687 
1688 	if (strnlen(internals->prog_path, PATH_MAX)) {
1689 		if (!internals->custom_prog_configured) {
1690 			ret = load_custom_xdp_prog(internals->prog_path,
1691 							internals->if_index,
1692 							&internals->map);
1693 			if (ret) {
1694 				AF_XDP_LOG_LINE(ERR, "Failed to load custom XDP program %s",
1695 						internals->prog_path);
1696 				goto out_umem;
1697 			}
1698 			internals->custom_prog_configured = 1;
1699 		}
1700 		cfg.libbpf_flags |= XSK_LIBBPF_FLAGS__INHIBIT_PROG_LOAD;
1701 	}
1702 
1703 	if (internals->shared_umem)
1704 		ret = create_shared_socket(&rxq->xsk, internals->if_name,
1705 				rxq->xsk_queue_idx, rxq->umem->umem, &rxq->rx,
1706 				&txq->tx, &rxq->fq, &rxq->cq, &cfg);
1707 	else
1708 		ret = xsk_socket__create(&rxq->xsk, internals->if_name,
1709 				rxq->xsk_queue_idx, rxq->umem->umem, &rxq->rx,
1710 				&txq->tx, &cfg);
1711 
1712 	if (ret) {
1713 		AF_XDP_LOG_LINE(ERR, "Failed to create xsk socket.");
1714 		goto out_umem;
1715 	}
1716 
1717 	if (!reserve_before) {
1718 		/* reserve fill queue of queues sharing UMEM */
1719 		ret = reserve_fill_queue(rxq->umem, reserve_size, fq_bufs, &rxq->fq);
1720 		if (ret) {
1721 			AF_XDP_LOG_LINE(ERR, "Failed to reserve fill queue.");
1722 			goto out_xsk;
1723 		}
1724 	}
1725 
1726 	/* insert the xsk into the xsks_map */
1727 	if (internals->custom_prog_configured) {
1728 		int err, fd;
1729 
1730 		fd = xsk_socket__fd(rxq->xsk);
1731 		err = bpf_map_update_elem(bpf_map__fd(internals->map),
1732 					  &rxq->xsk_queue_idx, &fd, 0);
1733 		if (err) {
1734 			AF_XDP_LOG_LINE(ERR, "Failed to insert xsk in map.");
1735 			goto out_xsk;
1736 		}
1737 	}
1738 
1739 	if (internals->use_cni || internals->use_pinned_map) {
1740 		int err, map_fd;
1741 
1742 		if (internals->use_cni) {
1743 			/* get xsk map fd from the AF_XDP Device Plugin */
1744 			map_fd = uds_get_xskmap_fd(internals->if_name, internals->dp_path);
1745 			if (map_fd < 0) {
1746 				AF_XDP_LOG_LINE(ERR, "Failed to receive xskmap fd from AF_XDP Device Plugin");
1747 				goto out_xsk;
1748 			}
1749 		} else {
1750 			/* get xsk map fd from the pinned map */
1751 			err = get_pinned_map(internals->dp_path, &map_fd);
1752 			if (err < 0 || map_fd < 0) {
1753 				AF_XDP_LOG_LINE(ERR, "Failed to retrieve pinned map fd");
1754 				goto out_xsk;
1755 			}
1756 		}
1757 
1758 		err = update_xskmap(rxq->xsk, map_fd, rxq->xsk_queue_idx);
1759 		if (err) {
1760 			AF_XDP_LOG_LINE(ERR, "Failed to insert xsk in map.");
1761 			goto out_xsk;
1762 		}
1763 
1764 	} else if (rxq->busy_budget) {
1765 		ret = configure_preferred_busy_poll(rxq);
1766 		if (ret) {
1767 			AF_XDP_LOG_LINE(ERR, "Failed to configure busy polling.");
1768 			goto out_xsk;
1769 		}
1770 	}
1771 
1772 	return 0;
1773 
1774 out_xsk:
1775 	xsk_socket__delete(rxq->xsk);
1776 out_umem:
1777 	if (rte_atomic_fetch_sub_explicit(&rxq->umem->refcnt, 1, rte_memory_order_acquire) - 1 == 0)
1778 		xdp_umem_destroy(rxq->umem);
1779 
1780 	return ret;
1781 }
1782 
1783 static int
1784 eth_rx_queue_setup(struct rte_eth_dev *dev,
1785 		   uint16_t rx_queue_id,
1786 		   uint16_t nb_rx_desc,
1787 		   unsigned int socket_id __rte_unused,
1788 		   const struct rte_eth_rxconf *rx_conf __rte_unused,
1789 		   struct rte_mempool *mb_pool)
1790 {
1791 	struct pmd_internals *internals = dev->data->dev_private;
1792 	struct pmd_process_private *process_private = dev->process_private;
1793 	struct pkt_rx_queue *rxq;
1794 	int ret;
1795 
1796 	rxq = &internals->rx_queues[rx_queue_id];
1797 
1798 	AF_XDP_LOG_LINE(INFO, "Set up rx queue, rx queue id: %d, xsk queue id: %d",
1799 		   rx_queue_id, rxq->xsk_queue_idx);
1800 
1801 #ifndef XDP_UMEM_UNALIGNED_CHUNK_FLAG
1802 	uint32_t buf_size, data_size;
1803 
1804 	/* Now get the space available for data in the mbuf */
1805 	buf_size = rte_pktmbuf_data_room_size(mb_pool) -
1806 		RTE_PKTMBUF_HEADROOM;
1807 	data_size = ETH_AF_XDP_FRAME_SIZE;
1808 
1809 	if (data_size > buf_size) {
1810 		AF_XDP_LOG_LINE(ERR, "%s: %d bytes will not fit in mbuf (%d bytes)",
1811 			dev->device->name, data_size, buf_size);
1812 		ret = -ENOMEM;
1813 		goto err;
1814 	}
1815 #endif
1816 
1817 	rxq->mb_pool = mb_pool;
1818 
1819 	if (xsk_configure(internals, rxq, nb_rx_desc)) {
1820 		AF_XDP_LOG_LINE(ERR, "Failed to configure xdp socket");
1821 		ret = -EINVAL;
1822 		goto err;
1823 	}
1824 
1825 	if (!rxq->busy_budget)
1826 		AF_XDP_LOG_LINE(DEBUG, "Preferred busy polling not enabled");
1827 
1828 	rxq->fds[0].fd = xsk_socket__fd(rxq->xsk);
1829 	rxq->fds[0].events = POLLIN;
1830 
1831 	process_private->rxq_xsk_fds[rx_queue_id] = rxq->fds[0].fd;
1832 
1833 	rxq->port = dev->data->port_id;
1834 
1835 	dev->data->rx_queues[rx_queue_id] = rxq;
1836 	return 0;
1837 
1838 err:
1839 	return ret;
1840 }
1841 
1842 static int
1843 eth_tx_queue_setup(struct rte_eth_dev *dev,
1844 		   uint16_t tx_queue_id,
1845 		   uint16_t nb_tx_desc __rte_unused,
1846 		   unsigned int socket_id __rte_unused,
1847 		   const struct rte_eth_txconf *tx_conf __rte_unused)
1848 {
1849 	struct pmd_internals *internals = dev->data->dev_private;
1850 	struct pkt_tx_queue *txq;
1851 
1852 	txq = &internals->tx_queues[tx_queue_id];
1853 
1854 	dev->data->tx_queues[tx_queue_id] = txq;
1855 	return 0;
1856 }
1857 
1858 static int
1859 eth_dev_mtu_set(struct rte_eth_dev *dev, uint16_t mtu)
1860 {
1861 	struct pmd_internals *internals = dev->data->dev_private;
1862 	struct ifreq ifr = { .ifr_mtu = mtu };
1863 	int ret;
1864 	int s;
1865 
1866 	s = socket(PF_INET, SOCK_DGRAM, 0);
1867 	if (s < 0)
1868 		return -EINVAL;
1869 
1870 	strlcpy(ifr.ifr_name, internals->if_name, IFNAMSIZ);
1871 	ret = ioctl(s, SIOCSIFMTU, &ifr);
1872 	close(s);
1873 
1874 	return (ret < 0) ? -errno : 0;
1875 }
1876 
1877 static int
1878 eth_dev_change_flags(char *if_name, uint32_t flags, uint32_t mask)
1879 {
1880 	struct ifreq ifr;
1881 	int ret = 0;
1882 	int s;
1883 
1884 	s = socket(PF_INET, SOCK_DGRAM, 0);
1885 	if (s < 0)
1886 		return -errno;
1887 
1888 	strlcpy(ifr.ifr_name, if_name, IFNAMSIZ);
1889 	if (ioctl(s, SIOCGIFFLAGS, &ifr) < 0) {
1890 		ret = -errno;
1891 		goto out;
1892 	}
1893 	ifr.ifr_flags &= mask;
1894 	ifr.ifr_flags |= flags;
1895 	if (ioctl(s, SIOCSIFFLAGS, &ifr) < 0) {
1896 		ret = -errno;
1897 		goto out;
1898 	}
1899 out:
1900 	close(s);
1901 	return ret;
1902 }
1903 
1904 static int
1905 eth_dev_promiscuous_enable(struct rte_eth_dev *dev)
1906 {
1907 	struct pmd_internals *internals = dev->data->dev_private;
1908 
1909 	return eth_dev_change_flags(internals->if_name, IFF_PROMISC, ~0);
1910 }
1911 
1912 static int
1913 eth_dev_promiscuous_disable(struct rte_eth_dev *dev)
1914 {
1915 	struct pmd_internals *internals = dev->data->dev_private;
1916 
1917 	return eth_dev_change_flags(internals->if_name, 0, ~IFF_PROMISC);
1918 }
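
/*
 * Editor's sketch (illustrative only, not used by the driver): the
 * flags/mask convention of eth_dev_change_flags() generalises to other
 * interface flags, e.g. toggling all-multicast mode.
 */
static int __rte_unused
example_allmulticast(struct pmd_internals *internals, int enable)
{
	if (enable)
		/* keep all existing flags and add IFF_ALLMULTI */
		return eth_dev_change_flags(internals->if_name,
					    IFF_ALLMULTI, ~0);
	/* add nothing and strip IFF_ALLMULTI from the existing flags */
	return eth_dev_change_flags(internals->if_name, 0, ~IFF_ALLMULTI);
}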
1919 
1920 static const struct eth_dev_ops ops = {
1921 	.dev_start = eth_dev_start,
1922 	.dev_stop = eth_dev_stop,
1923 	.dev_close = eth_dev_close,
1924 	.dev_configure = eth_dev_configure,
1925 	.dev_infos_get = eth_dev_info,
1926 	.mtu_set = eth_dev_mtu_set,
1927 	.promiscuous_enable = eth_dev_promiscuous_enable,
1928 	.promiscuous_disable = eth_dev_promiscuous_disable,
1929 	.rx_queue_setup = eth_rx_queue_setup,
1930 	.tx_queue_setup = eth_tx_queue_setup,
1931 	.link_update = eth_link_update,
1932 	.stats_get = eth_stats_get,
1933 	.stats_reset = eth_stats_reset,
1934 	.get_monitor_addr = eth_get_monitor_addr,
1935 };
1936 
1937 /* The AF_XDP Device Plugin option works in unprivileged
1938  * container environments, where Ethernet device functionality
1939  * is reduced. A separate, customised eth_dev_ops struct is
1940  * therefore needed for the Device Plugin: the promiscuous
1941  * enable and disable callbacks are removed.
1942  */
1943 static const struct eth_dev_ops ops_afxdp_dp = {
1944 	.dev_start = eth_dev_start,
1945 	.dev_stop = eth_dev_stop,
1946 	.dev_close = eth_dev_close,
1947 	.dev_configure = eth_dev_configure,
1948 	.dev_infos_get = eth_dev_info,
1949 	.mtu_set = eth_dev_mtu_set,
1950 	.rx_queue_setup = eth_rx_queue_setup,
1951 	.tx_queue_setup = eth_tx_queue_setup,
1952 	.link_update = eth_link_update,
1953 	.stats_get = eth_stats_get,
1954 	.stats_reset = eth_stats_reset,
1955 	.get_monitor_addr = eth_get_monitor_addr,
1956 };
1957 
1958 /** parse busy_budget argument */
1959 static int
1960 parse_budget_arg(const char *key __rte_unused,
1961 		  const char *value, void *extra_args)
1962 {
1963 	int *i = (int *)extra_args;
1964 	char *end;
1965 
1966 	*i = strtol(value, &end, 10);
1967 	if (*i < 0 || *i > UINT16_MAX) {
1968 		AF_XDP_LOG_LINE(ERR, "Invalid busy_budget %i, must be >= 0 and <= %u",
1969 				*i, UINT16_MAX);
1970 		return -EINVAL;
1971 	}
1972 
1973 	return 0;
1974 }
1975 
1976 /** parse an integer argument */
1977 static int
1978 parse_integer_arg(const char *key __rte_unused,
1979 		  const char *value, void *extra_args)
1980 {
1981 	int *i = (int *)extra_args;
1982 	char *end;
1983 
1984 	*i = strtol(value, &end, 10);
1985 	if (*i < 0) {
1986 		AF_XDP_LOG_LINE(ERR, "Argument has to be non-negative.");
1987 		return -EINVAL;
1988 	}
1989 
1990 	return 0;
1991 }
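
/*
 * Editor's sketch: the parsers above do not inspect the strtol() end
 * pointer or errno, so input such as "12abc" is silently accepted as 12.
 * A stricter variant (illustrative, not wired into the kvargs table)
 * could look like this:
 */
static int __rte_unused
example_parse_strict_long(const char *value, long *out)
{
	char *end = NULL;

	errno = 0;
	*out = strtol(value, &end, 10);
	if (errno != 0 || end == value || *end != '\0' || *out < 0)
		return -EINVAL; /* reject garbage, overflow and negatives */
	return 0;
}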
1992 
1993 /** parse name argument */
1994 static int
1995 parse_name_arg(const char *key __rte_unused,
1996 	       const char *value, void *extra_args)
1997 {
1998 	char *name = extra_args;
1999 
2000 	if (strnlen(value, IFNAMSIZ) > IFNAMSIZ - 1) {
2001 		AF_XDP_LOG_LINE(ERR, "Invalid name %s, should be less than %u bytes.",
2002 			   value, IFNAMSIZ);
2003 		return -EINVAL;
2004 	}
2005 
2006 	strlcpy(name, value, IFNAMSIZ);
2007 
2008 	return 0;
2009 }
2010 
2011 /** parse xdp prog argument */
2012 static int
2013 parse_prog_arg(const char *key __rte_unused,
2014 	       const char *value, void *extra_args)
2015 {
2016 	char *path = extra_args;
2017 
2018 	if (strnlen(value, PATH_MAX) == PATH_MAX) {
2019 		AF_XDP_LOG_LINE(ERR, "Invalid path %s, should be less than %u bytes.",
2020 			   value, PATH_MAX);
2021 		return -EINVAL;
2022 	}
2023 
2024 	if (access(value, F_OK) != 0) {
2025 		AF_XDP_LOG_LINE(ERR, "Error accessing %s: %s",
2026 			   value, strerror(errno));
2027 		return -EINVAL;
2028 	}
2029 
2030 	strlcpy(path, value, PATH_MAX);
2031 
2032 	return 0;
2033 }
2034 
2035 static int
2036 xdp_get_channels_info(const char *if_name, int *max_queues,
2037 				int *combined_queues)
2038 {
2039 	struct ethtool_channels channels;
2040 	struct ifreq ifr;
2041 	int fd, ret;
2042 
2043 	fd = socket(AF_INET, SOCK_DGRAM, 0);
2044 	if (fd < 0)
2045 		return -1;
2046 
2047 	channels.cmd = ETHTOOL_GCHANNELS;
2048 	ifr.ifr_data = (void *)&channels;
2049 	strlcpy(ifr.ifr_name, if_name, IFNAMSIZ);
2050 	ret = ioctl(fd, SIOCETHTOOL, &ifr);
2051 	if (ret) {
2052 		if (errno == EOPNOTSUPP) {
2053 			ret = 0;
2054 		} else {
2055 			ret = -errno;
2056 			goto out;
2057 		}
2058 	}
2059 
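	/* Note: on EOPNOTSUPP the kernel does not fill in 'channels', so
	 * errno is re-checked below instead of reading the (indeterminate)
	 * max_combined value.
	 */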
2060 	if (channels.max_combined == 0 || errno == EOPNOTSUPP) {
2061 		/* If the device says it has no channels, then all traffic
2062 		 * is sent to a single stream, so max queues = 1.
2063 		 */
2064 		*max_queues = 1;
2065 		*combined_queues = 1;
2066 	} else {
2067 		*max_queues = channels.max_combined;
2068 		*combined_queues = channels.combined_count;
2069 	}
2070 
2071  out:
2072 	close(fd);
2073 	return ret;
2074 }
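
/*
 * Editor's sketch: querying the channel counts of an arbitrary netdev
 * with the helper above ("eth0" is a placeholder interface name).
 */
static void __rte_unused
example_print_channels(void)
{
	int max_queues = 0, combined_queues = 0;

	if (xdp_get_channels_info("eth0", &max_queues, &combined_queues) == 0)
		AF_XDP_LOG_LINE(INFO, "max combined channels %d, currently configured %d",
				max_queues, combined_queues);
}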
2075 
2076 static int
2077 parse_parameters(struct rte_kvargs *kvlist, char *if_name, int *start_queue,
2078 		 int *queue_cnt, int *shared_umem, char *prog_path,
2079 		 int *busy_budget, int *force_copy, int *use_cni,
2080 		 int *use_pinned_map, char *dp_path)
2081 {
2082 	int ret;
2083 
2084 	ret = rte_kvargs_process(kvlist, ETH_AF_XDP_IFACE_ARG,
2085 				 &parse_name_arg, if_name);
2086 	if (ret < 0)
2087 		goto free_kvlist;
2088 
2089 	ret = rte_kvargs_process(kvlist, ETH_AF_XDP_START_QUEUE_ARG,
2090 				 &parse_integer_arg, start_queue);
2091 	if (ret < 0)
2092 		goto free_kvlist;
2093 
2094 	ret = rte_kvargs_process(kvlist, ETH_AF_XDP_QUEUE_COUNT_ARG,
2095 				 &parse_integer_arg, queue_cnt);
2096 	if (ret < 0 || *queue_cnt <= 0) {
2097 		ret = -EINVAL;
2098 		goto free_kvlist;
2099 	}
2100 
2101 	ret = rte_kvargs_process(kvlist, ETH_AF_XDP_SHARED_UMEM_ARG,
2102 				&parse_integer_arg, shared_umem);
2103 	if (ret < 0)
2104 		goto free_kvlist;
2105 
2106 	ret = rte_kvargs_process(kvlist, ETH_AF_XDP_PROG_ARG,
2107 				 &parse_prog_arg, prog_path);
2108 	if (ret < 0)
2109 		goto free_kvlist;
2110 
2111 	ret = rte_kvargs_process(kvlist, ETH_AF_XDP_BUDGET_ARG,
2112 				&parse_budget_arg, busy_budget);
2113 	if (ret < 0)
2114 		goto free_kvlist;
2115 
2116 	ret = rte_kvargs_process(kvlist, ETH_AF_XDP_FORCE_COPY_ARG,
2117 				&parse_integer_arg, force_copy);
2118 	if (ret < 0)
2119 		goto free_kvlist;
2120 
2121 	ret = rte_kvargs_process(kvlist, ETH_AF_XDP_USE_CNI_ARG,
2122 				 &parse_integer_arg, use_cni);
2123 	if (ret < 0)
2124 		goto free_kvlist;
2125 
2126 	ret = rte_kvargs_process(kvlist, ETH_AF_XDP_USE_PINNED_MAP_ARG,
2127 				 &parse_integer_arg, use_pinned_map);
2128 	if (ret < 0)
2129 		goto free_kvlist;
2130 
2131 	ret = rte_kvargs_process(kvlist, ETH_AF_XDP_DP_PATH_ARG,
2132 				 &parse_prog_arg, dp_path);
2133 	if (ret < 0)
2134 		goto free_kvlist;
2135 
2136 free_kvlist:
2137 	rte_kvargs_free(kvlist);
2138 	return ret;
2139 }
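
/*
 * Editor's note: the keys processed above map onto a vdev argument
 * string such as (values are illustrative):
 *   --vdev=net_af_xdp0,iface=ens786f1,start_queue=0,queue_count=1,busy_budget=64
 */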
2140 
2141 static int
2142 get_iface_info(const char *if_name,
2143 	       struct rte_ether_addr *eth_addr,
2144 	       int *if_index)
2145 {
2146 	struct ifreq ifr;
2147 	int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_IP);
2148 
2149 	if (sock < 0)
2150 		return -1;
2151 
2152 	strlcpy(ifr.ifr_name, if_name, IFNAMSIZ);
2153 	if (ioctl(sock, SIOCGIFINDEX, &ifr))
2154 		goto error;
2155 
2156 	*if_index = ifr.ifr_ifindex;
2157 
2158 	if (ioctl(sock, SIOCGIFHWADDR, &ifr))
2159 		goto error;
2160 
2161 	rte_memcpy(eth_addr, ifr.ifr_hwaddr.sa_data, RTE_ETHER_ADDR_LEN);
2162 
2163 	close(sock);
2164 	return 0;
2165 
2166 error:
2167 	close(sock);
2168 	return -1;
2169 }
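
/*
 * Editor's sketch: resolving the ifindex and MAC address of an
 * interface with the helper above ("eth0" is a placeholder).
 */
static void __rte_unused
example_iface_info(void)
{
	struct rte_ether_addr addr;
	int if_index = 0;

	if (get_iface_info("eth0", &addr, &if_index) == 0)
		AF_XDP_LOG_LINE(INFO, "ifindex %d, MAC " RTE_ETHER_ADDR_PRT_FMT,
				if_index, RTE_ETHER_ADDR_BYTES(&addr));
}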
2170 
2171 static struct rte_eth_dev *
2172 init_internals(struct rte_vdev_device *dev, const char *if_name,
2173 	       int start_queue_idx, int queue_cnt, int shared_umem,
2174 	       const char *prog_path, int busy_budget, int force_copy,
2175 	       int use_cni, int use_pinned_map, const char *dp_path)
2176 {
2177 	const char *name = rte_vdev_device_name(dev);
2178 	const unsigned int numa_node = dev->device.numa_node;
2179 	struct pmd_process_private *process_private;
2180 	struct pmd_internals *internals;
2181 	struct rte_eth_dev *eth_dev;
2182 	int ret;
2183 	int i;
2184 
2185 	internals = rte_zmalloc_socket(name, sizeof(*internals), 0, numa_node);
2186 	if (internals == NULL)
2187 		return NULL;
2188 
2189 	internals->start_queue_idx = start_queue_idx;
2190 	internals->queue_cnt = queue_cnt;
2191 	strlcpy(internals->if_name, if_name, IFNAMSIZ);
2192 	strlcpy(internals->prog_path, prog_path, PATH_MAX);
2193 	internals->custom_prog_configured = 0;
2194 
2195 #ifndef ETH_AF_XDP_SHARED_UMEM
2196 	if (shared_umem) {
2197 		AF_XDP_LOG_LINE(ERR, "Shared UMEM feature not available. "
2198 				"Check kernel and libbpf version");
2199 		goto err_free_internals;
2200 	}
2201 #endif
2202 	internals->shared_umem = shared_umem;
2203 	internals->force_copy = force_copy;
2204 	internals->use_cni = use_cni;
2205 	internals->use_pinned_map = use_pinned_map;
2206 	strlcpy(internals->dp_path, dp_path, PATH_MAX);
2207 
2208 	if (xdp_get_channels_info(if_name, &internals->max_queue_cnt,
2209 				  &internals->combined_queue_cnt)) {
2210 		AF_XDP_LOG_LINE(ERR, "Failed to get channel info of interface: %s",
2211 				if_name);
2212 		goto err_free_internals;
2213 	}
2214 
2215 	if (queue_cnt > internals->combined_queue_cnt) {
2216 		AF_XDP_LOG_LINE(ERR, "Specified queue count %d is larger than combined queue count %d.",
2217 				queue_cnt, internals->combined_queue_cnt);
2218 		goto err_free_internals;
2219 	}
2220 
2221 	internals->rx_queues = rte_zmalloc_socket(NULL,
2222 					sizeof(struct pkt_rx_queue) * queue_cnt,
2223 					0, numa_node);
2224 	if (internals->rx_queues == NULL) {
2225 		AF_XDP_LOG_LINE(ERR, "Failed to allocate memory for rx queues.");
2226 		goto err_free_internals;
2227 	}
2228 
2229 	internals->tx_queues = rte_zmalloc_socket(NULL,
2230 					sizeof(struct pkt_tx_queue) * queue_cnt,
2231 					0, numa_node);
2232 	if (internals->tx_queues == NULL) {
2233 		AF_XDP_LOG_LINE(ERR, "Failed to allocate memory for tx queues.");
2234 		goto err_free_rx;
2235 	}
2236 	for (i = 0; i < queue_cnt; i++) {
2237 		internals->tx_queues[i].pair = &internals->rx_queues[i];
2238 		internals->rx_queues[i].pair = &internals->tx_queues[i];
2239 		internals->rx_queues[i].xsk_queue_idx = start_queue_idx + i;
2240 		internals->tx_queues[i].xsk_queue_idx = start_queue_idx + i;
2241 		internals->rx_queues[i].busy_budget = busy_budget;
2242 	}
2243 
2244 	ret = get_iface_info(if_name, &internals->eth_addr,
2245 			     &internals->if_index);
2246 	if (ret)
2247 		goto err_free_tx;
2248 
2249 	process_private = (struct pmd_process_private *)
2250 		rte_zmalloc_socket(name, sizeof(struct pmd_process_private),
2251 				   RTE_CACHE_LINE_SIZE, numa_node);
2252 	if (process_private == NULL) {
2253 		AF_XDP_LOG_LINE(ERR, "Failed to alloc memory for process private");
2254 		goto err_free_tx;
2255 	}
2256 
2257 	eth_dev = rte_eth_vdev_allocate(dev, 0);
2258 	if (eth_dev == NULL)
2259 		goto err_free_pp;
2260 
2261 	eth_dev->data->dev_private = internals;
2262 	eth_dev->data->dev_link = pmd_link;
2263 	eth_dev->data->mac_addrs = &internals->eth_addr;
2264 	eth_dev->data->dev_flags |= RTE_ETH_DEV_AUTOFILL_QUEUE_XSTATS;
2265 	if (!internals->use_cni && !internals->use_pinned_map)
2266 		eth_dev->dev_ops = &ops;
2267 	else
2268 		eth_dev->dev_ops = &ops_afxdp_dp;
2269 
2270 	eth_dev->rx_pkt_burst = eth_af_xdp_rx;
2271 	eth_dev->tx_pkt_burst = eth_af_xdp_tx;
2272 	eth_dev->process_private = process_private;
2273 
2274 	for (i = 0; i < queue_cnt; i++)
2275 		process_private->rxq_xsk_fds[i] = -1;
2276 
2277 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
2278 	AF_XDP_LOG_LINE(INFO, "Zero copy between umem and mbuf enabled.");
2279 #endif
2280 
2281 	return eth_dev;
2282 
2283 err_free_pp:
2284 	rte_free(process_private);
2285 err_free_tx:
2286 	rte_free(internals->tx_queues);
2287 err_free_rx:
2288 	rte_free(internals->rx_queues);
2289 err_free_internals:
2290 	rte_free(internals);
2291 	return NULL;
2292 }
2293 
2294 /* Secondary process requests rxq fds from primary. */
2295 static int
2296 afxdp_mp_request_fds(const char *name, struct rte_eth_dev *dev)
2297 {
2298 	struct pmd_process_private *process_private = dev->process_private;
2299 	struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0};
2300 	struct rte_mp_msg request, *reply;
2301 	struct rte_mp_reply replies;
2302 	struct ipc_hdr *request_param = (struct ipc_hdr *)request.param;
2303 	int i, ret;
2304 
2305 	/* Prepare the request */
2306 	memset(&request, 0, sizeof(request));
2307 	strlcpy(request.name, ETH_AF_XDP_MP_KEY, sizeof(request.name));
2308 	strlcpy(request_param->port_name, name,
2309 		sizeof(request_param->port_name));
2310 	request.len_param = sizeof(*request_param);
2311 
2312 	/* Send the request and receive the reply */
2313 	AF_XDP_LOG_LINE(DEBUG, "Sending multi-process IPC request for %s", name);
2314 	ret = rte_mp_request_sync(&request, &replies, &timeout);
2315 	if (ret < 0 || replies.nb_received != 1) {
2316 		AF_XDP_LOG_LINE(ERR, "Failed to request fds from primary: %d",
2317 			   rte_errno);
2318 		return -1;
2319 	}
2320 	reply = replies.msgs;
2321 	AF_XDP_LOG_LINE(DEBUG, "Received multi-process IPC reply for %s", name);
2322 	if (dev->data->nb_rx_queues != reply->num_fds) {
2323 		AF_XDP_LOG_LINE(ERR, "Incorrect number of fds received: %d != %d",
2324 			   reply->num_fds, dev->data->nb_rx_queues);
2325 		return -EINVAL;
2326 	}
2327 
2328 	for (i = 0; i < reply->num_fds; i++)
2329 		process_private->rxq_xsk_fds[i] = reply->fds[i];
2330 
2331 	free(reply);
2332 	return 0;
2333 }
2334 
2335 /* Primary process sends rxq fds to secondary. */
2336 static int
2337 afxdp_mp_send_fds(const struct rte_mp_msg *request, const void *peer)
2338 {
2339 	struct rte_eth_dev *dev;
2340 	struct pmd_process_private *process_private;
2341 	struct rte_mp_msg reply;
2342 	const struct ipc_hdr *request_param =
2343 		(const struct ipc_hdr *)request->param;
2344 	struct ipc_hdr *reply_param =
2345 		(struct ipc_hdr *)reply.param;
2346 	const char *request_name = request_param->port_name;
2347 	int i;
2348 
2349 	AF_XDP_LOG_LINE(DEBUG, "Received multi-process IPC request for %s",
2350 		   request_name);
2351 
2352 	/* Find the requested port */
2353 	dev = rte_eth_dev_get_by_name(request_name);
2354 	if (!dev) {
2355 		AF_XDP_LOG_LINE(ERR, "Failed to get port id for %s", request_name);
2356 		return -1;
2357 	}
2358 	process_private = dev->process_private;
2359 
2360 	/* Populate the reply with the xsk fd for each queue */
2361 	reply.num_fds = 0;
2362 	if (dev->data->nb_rx_queues > RTE_MP_MAX_FD_NUM) {
2363 		AF_XDP_LOG_LINE(ERR, "Number of rx queues (%d) exceeds max number of fds (%d)",
2364 			   dev->data->nb_rx_queues, RTE_MP_MAX_FD_NUM);
2365 		return -EINVAL;
2366 	}
2367 
2368 	for (i = 0; i < dev->data->nb_rx_queues; i++)
2369 		reply.fds[reply.num_fds++] = process_private->rxq_xsk_fds[i];
2370 
2371 	/* Send the reply */
2372 	strlcpy(reply.name, request->name, sizeof(reply.name));
2373 	strlcpy(reply_param->port_name, request_name,
2374 		sizeof(reply_param->port_name));
2375 	reply.len_param = sizeof(*reply_param);
2376 	AF_XDP_LOG_LINE(DEBUG, "Sending multi-process IPC reply for %s",
2377 		   reply_param->port_name);
2378 	if (rte_mp_reply(&reply, peer) < 0) {
2379 		AF_XDP_LOG_LINE(ERR, "Failed to reply to multi-process IPC request");
2380 		return -1;
2381 	}
2382 	return 0;
2383 }
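
/*
 * Editor's note: together, afxdp_mp_request_fds() and afxdp_mp_send_fds()
 * implement a one-shot fd-passing handshake over the EAL multi-process
 * channel keyed by ETH_AF_XDP_MP_KEY: the secondary sends the port name,
 * the primary replies with one xsk fd per configured rx queue. The fds
 * travel as ancillary data on the underlying Unix socket, so the kernel
 * translates them into descriptors valid in the secondary process.
 */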
2384 
2385 static int
2386 rte_pmd_af_xdp_probe(struct rte_vdev_device *dev)
2387 {
2388 	struct rte_kvargs *kvlist;
2389 	char if_name[IFNAMSIZ] = {'\0'};
2390 	int xsk_start_queue_idx = ETH_AF_XDP_DFLT_START_QUEUE_IDX;
2391 	int xsk_queue_cnt = ETH_AF_XDP_DFLT_QUEUE_COUNT;
2392 	int shared_umem = 0;
2393 	char prog_path[PATH_MAX] = {'\0'};
2394 	int busy_budget = -1, ret;
2395 	int force_copy = 0;
2396 	int use_cni = 0;
2397 	int use_pinned_map = 0;
2398 	char dp_path[PATH_MAX] = {'\0'};
2399 	struct rte_eth_dev *eth_dev = NULL;
2400 	const char *name = rte_vdev_device_name(dev);
2401 
2402 	AF_XDP_LOG_LINE(INFO, "Initializing pmd_af_xdp for %s", name);
2403 
2404 	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
2405 		eth_dev = rte_eth_dev_attach_secondary(name);
2406 		if (eth_dev == NULL) {
2407 			AF_XDP_LOG_LINE(ERR, "Failed to probe %s", name);
2408 			return -EINVAL;
2409 		}
2410 		eth_dev->dev_ops = &ops;
2411 		eth_dev->device = &dev->device;
2412 		eth_dev->rx_pkt_burst = rte_eth_pkt_burst_dummy;
2413 		eth_dev->tx_pkt_burst = rte_eth_pkt_burst_dummy;
2414 		eth_dev->process_private = (struct pmd_process_private *)
2415 			rte_zmalloc_socket(name,
2416 					   sizeof(struct pmd_process_private),
2417 					   RTE_CACHE_LINE_SIZE,
2418 					   eth_dev->device->numa_node);
2419 		if (eth_dev->process_private == NULL) {
2420 			AF_XDP_LOG_LINE(ERR,
2421 				"Failed to alloc memory for process private");
2422 			return -ENOMEM;
2423 		}
2424 
2425 		/* Obtain the xsk fds from the primary process. */
2426 		if (afxdp_mp_request_fds(name, eth_dev))
2427 			return -1;
2428 
2429 		rte_eth_dev_probing_finish(eth_dev);
2430 		return 0;
2431 	}
2432 
2433 	kvlist = rte_kvargs_parse(rte_vdev_device_args(dev), valid_arguments);
2434 	if (kvlist == NULL) {
2435 		AF_XDP_LOG_LINE(ERR, "Invalid kvargs key");
2436 		return -EINVAL;
2437 	}
2438 
2439 	if (parse_parameters(kvlist, if_name, &xsk_start_queue_idx,
2440 			     &xsk_queue_cnt, &shared_umem, prog_path,
2441 			     &busy_budget, &force_copy, &use_cni, &use_pinned_map,
2442 			     dp_path) < 0) {
2443 		AF_XDP_LOG_LINE(ERR, "Invalid kvargs value");
2444 		return -EINVAL;
2445 	}
2446 
2447 	if (use_cni && use_pinned_map) {
2448 		AF_XDP_LOG_LINE(ERR, "When '%s' parameter is used, '%s' parameter is not valid",
2449 			ETH_AF_XDP_USE_CNI_ARG, ETH_AF_XDP_USE_PINNED_MAP_ARG);
2450 		return -EINVAL;
2451 	}
2452 
2453 	if ((use_cni || use_pinned_map) && busy_budget > 0) {
2454 		AF_XDP_LOG_LINE(ERR, "When '%s' or '%s' parameter is used, '%s' parameter is not valid",
2455 			ETH_AF_XDP_USE_CNI_ARG, ETH_AF_XDP_USE_PINNED_MAP_ARG,
2456 			ETH_AF_XDP_BUDGET_ARG);
2457 		return -EINVAL;
2458 	}
2459 
2460 	if ((use_cni || use_pinned_map) && strnlen(prog_path, PATH_MAX)) {
2461 		AF_XDP_LOG_LINE(ERR, "When '%s' or '%s' parameter is used, '%s' parameter is not valid",
2462 			ETH_AF_XDP_USE_CNI_ARG, ETH_AF_XDP_USE_PINNED_MAP_ARG,
2463 			ETH_AF_XDP_PROG_ARG);
2464 		return -EINVAL;
2465 	}
2466 
2467 	if (use_cni && !strnlen(dp_path, PATH_MAX)) {
2468 		snprintf(dp_path, sizeof(dp_path), "%s/%s/%s", DP_BASE_PATH, if_name, DP_UDS_SOCK);
2469 		AF_XDP_LOG_LINE(INFO, "'%s' parameter not provided, setting value to '%s'",
2470 			ETH_AF_XDP_DP_PATH_ARG, dp_path);
2471 	}
2472 
2473 	if (use_pinned_map && !strnlen(dp_path, PATH_MAX)) {
2474 		snprintf(dp_path, sizeof(dp_path), "%s/%s/%s", DP_BASE_PATH, if_name, DP_XSK_MAP);
2475 		AF_XDP_LOG_LINE(INFO, "'%s' parameter not provided, setting value to '%s'",
2476 			ETH_AF_XDP_DP_PATH_ARG, dp_path);
2477 	}
2478 
2479 	if ((!use_cni && !use_pinned_map) && strnlen(dp_path, PATH_MAX)) {
2480 		AF_XDP_LOG_LINE(ERR, "'%s' parameter is set, but neither '%s' nor '%s' is enabled",
2481 			ETH_AF_XDP_DP_PATH_ARG, ETH_AF_XDP_USE_CNI_ARG,
2482 			ETH_AF_XDP_USE_PINNED_MAP_ARG);
2483 		return -EINVAL;
2484 	}
2485 
2486 	if (strlen(if_name) == 0) {
2487 		AF_XDP_LOG_LINE(ERR, "Network interface must be specified");
2488 		return -EINVAL;
2489 	}
2490 
2491 	/* get numa node id from net sysfs */
2492 	if (dev->device.numa_node == SOCKET_ID_ANY) {
2493 		unsigned long numa = 0;
2494 		char numa_path[PATH_MAX];
2495 
2496 		snprintf(numa_path, sizeof(numa_path), "/sys/class/net/%s/device/numa_node",
2497 			 if_name);
2498 		if (access(numa_path, R_OK) != 0 || eal_parse_sysfs_value(numa_path, &numa) != 0)
2499 			dev->device.numa_node = rte_socket_id();
2500 		else
2501 			dev->device.numa_node = numa;
2502 	}
2503 
2504 	busy_budget = busy_budget == -1 ? ETH_AF_XDP_DFLT_BUSY_BUDGET :
2505 					busy_budget;
2506 
2507 	eth_dev = init_internals(dev, if_name, xsk_start_queue_idx,
2508 				 xsk_queue_cnt, shared_umem, prog_path,
2509 				 busy_budget, force_copy, use_cni, use_pinned_map,
2510 				 dp_path);
2511 	if (eth_dev == NULL) {
2512 		AF_XDP_LOG_LINE(ERR, "Failed to init internals");
2513 		return -1;
2514 	}
2515 
2516 	/* Register IPC callback which shares xsk fds from primary to secondary */
2517 	if (!afxdp_dev_count) {
2518 		ret = rte_mp_action_register(ETH_AF_XDP_MP_KEY, afxdp_mp_send_fds);
2519 		if (ret < 0 && rte_errno != ENOTSUP) {
2520 			AF_XDP_LOG_LINE(ERR, "%s: Failed to register multi-process IPC callback: %s",
2521 				   name, strerror(rte_errno));
2522 			return -1;
2523 		}
2524 	}
2525 	afxdp_dev_count++;
2526 
2527 	rte_eth_dev_probing_finish(eth_dev);
2528 
2529 	return 0;
2530 }
2531 
2532 static int
2533 rte_pmd_af_xdp_remove(struct rte_vdev_device *dev)
2534 {
2535 	struct rte_eth_dev *eth_dev = NULL;
2536 
2537 	AF_XDP_LOG_LINE(INFO, "Removing AF_XDP ethdev on numa socket %u",
2538 		rte_socket_id());
2539 
2540 	if (dev == NULL)
2541 		return -1;
2542 
2543 	/* find the ethdev entry */
2544 	eth_dev = rte_eth_dev_allocated(rte_vdev_device_name(dev));
2545 	if (eth_dev == NULL)
2546 		return 0;
2547 
2548 	eth_dev_close(eth_dev);
2549 	if (afxdp_dev_count == 1)
2550 		rte_mp_action_unregister(ETH_AF_XDP_MP_KEY);
2551 	afxdp_dev_count--;
2552 	rte_eth_dev_release_port(eth_dev);
2553 
2554 	return 0;
2555 }
2556 
2557 static struct rte_vdev_driver pmd_af_xdp_drv = {
2558 	.probe = rte_pmd_af_xdp_probe,
2559 	.remove = rte_pmd_af_xdp_remove,
2560 };
2561 
2562 RTE_PMD_REGISTER_VDEV(net_af_xdp, pmd_af_xdp_drv);
2563 RTE_PMD_REGISTER_PARAM_STRING(net_af_xdp,
2564 			      "iface=<string> "
2565 			      "start_queue=<int> "
2566 			      "queue_count=<int> "
2567 			      "shared_umem=<int> "
2568 			      "xdp_prog=<string> "
2569 			      "busy_budget=<int> "
2570 			      "force_copy=<int> "
2571 			      "use_cni=<int> "
2572 			      "use_pinned_map=<int> "
2573 			      "dp_path=<string> ");
2574
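
/*
 * Editor's note: an illustrative invocation using the parameters
 * registered above (interface name and values are placeholders):
 *
 *   dpdk-testpmd --vdev=net_af_xdp0,iface=ens786f1,queue_count=2 -- -i
 */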