xref: /dpdk/drivers/net/af_xdp/rte_eth_af_xdp.c (revision 68a03efeed657e6e05f281479b33b51102797e15)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2019-2020 Intel Corporation.
3  */
4 #include <unistd.h>
5 #include <errno.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <poll.h>
9 #include <netinet/in.h>
10 #include <net/if.h>
11 #include <sys/socket.h>
12 #include <sys/ioctl.h>
13 #include <linux/if_ether.h>
14 #include <linux/if_xdp.h>
15 #include <linux/if_link.h>
16 #include <linux/ethtool.h>
17 #include <linux/sockios.h>
18 #include "af_xdp_deps.h"
19 #include <bpf/xsk.h>
20 
21 #include <rte_ethdev.h>
22 #include <ethdev_driver.h>
23 #include <ethdev_vdev.h>
24 #include <rte_kvargs.h>
25 #include <rte_bus_vdev.h>
26 #include <rte_string_fns.h>
27 #include <rte_branch_prediction.h>
28 #include <rte_common.h>
29 #include <rte_dev.h>
30 #include <rte_eal.h>
31 #include <rte_ether.h>
32 #include <rte_lcore.h>
33 #include <rte_log.h>
34 #include <rte_memory.h>
35 #include <rte_memzone.h>
36 #include <rte_mempool.h>
37 #include <rte_mbuf.h>
38 #include <rte_malloc.h>
39 #include <rte_ring.h>
40 #include <rte_spinlock.h>
41 
42 #include "compat.h"
43 
/*
 * Fallback definitions, used only when the system headers do not already
 * provide these constants.
 * NOTE(review): the numeric values are presumably the Linux UAPI values -
 * confirm against the target kernel headers.
 */
#ifndef SO_PREFER_BUSY_POLL
#define SO_PREFER_BUSY_POLL 69
#endif
#ifndef SO_BUSY_POLL_BUDGET
#define SO_BUSY_POLL_BUDGET 70
#endif


#ifndef SOL_XDP
#define SOL_XDP 283
#endif

#ifndef AF_XDP
#define AF_XDP 44
#endif

/* The protocol family constant is an alias of the address family one. */
#ifndef PF_XDP
#define PF_XDP AF_XDP
#endif
63 
/* Register the driver's log type "pmd.net.af_xdp" with NOTICE as level. */
RTE_LOG_REGISTER(af_xdp_logtype, pmd.net.af_xdp, NOTICE);

/*
 * Driver logging helper: forwards to rte_log() with the driver log type and
 * prefixes every message with the name of the calling function. The caller
 * supplies the trailing newline in fmt.
 */
#define AF_XDP_LOG(level, fmt, args...)			\
	rte_log(RTE_LOG_ ## level, af_xdp_logtype,	\
		"%s(): " fmt, __func__, ##args)
69 
/* Frame size and frame count of the UMEM created in copy mode. */
#define ETH_AF_XDP_FRAME_SIZE		2048
#define ETH_AF_XDP_NUM_BUFFERS		4096
/* Default ring size; also reported as the default Rx/Tx ring size. */
#define ETH_AF_XDP_DFLT_NUM_DESCS	XSK_RING_CONS__DEFAULT_NUM_DESCS
/* NOTE(review): presumably the defaults for the queue devargs - confirm. */
#define ETH_AF_XDP_DFLT_START_QUEUE_IDX	0
#define ETH_AF_XDP_DFLT_QUEUE_COUNT	1
/* Reported as default burst size; timeout is the SO_BUSY_POLL value. */
#define ETH_AF_XDP_DFLT_BUSY_BUDGET	64
#define ETH_AF_XDP_DFLT_BUSY_TIMEOUT	20

/* Largest burst handled by one internal Rx/Tx call; bigger ones are split. */
#define ETH_AF_XDP_RX_BATCH_SIZE	XSK_RING_CONS__DEFAULT_NUM_DESCS
#define ETH_AF_XDP_TX_BATCH_SIZE	XSK_RING_CONS__DEFAULT_NUM_DESCS
80 
81 
/* Driver-side bookkeeping for one UMEM (packet memory shared with the kernel). */
struct xsk_umem_info {
	struct xsk_umem *umem;		/* handle filled in by xsk_umem__create() */
	struct rte_ring *buf_ring;	/* copy mode: ring of free frame offsets */
	const struct rte_memzone *mz;	/* copy mode: memzone backing the UMEM */
	struct rte_mempool *mb_pool;	/* zero-copy: mempool registered as UMEM */
	void *buffer;			/* zero-copy: page-aligned UMEM base address */
	uint8_t refcnt;			/* users of this UMEM; destroyed when it hits 0 */
	uint32_t max_xsks;		/* shared UMEM: upper bound checked against refcnt */
};
91 
/* Software Rx counters kept per queue. */
struct rx_stats {
	uint64_t rx_pkts;	/* packets handed to the application */
	uint64_t rx_bytes;	/* sum of the descriptor lengths of those packets */
	uint64_t rx_dropped;	/* added to imissed when stats are read */
};
97 
/* Per-queue Rx state; also owns the socket and the fill/completion rings. */
struct pkt_rx_queue {
	struct xsk_ring_cons rx;	/* Rx descriptor ring */
	struct xsk_umem_info *umem;	/* UMEM used by this queue, NULL until configured */
	struct xsk_socket *xsk;		/* AF_XDP socket; its fd is used by Rx and Tx */
	struct rte_mempool *mb_pool;	/* pool for Rx mbufs; also the UMEM sharing key */

	struct rx_stats stats;

	struct xsk_ring_prod fq;	/* fill ring, passed to xsk_umem__create() */
	struct xsk_ring_cons cq;	/* completion ring, drained on the Tx path */

	struct pkt_tx_queue *pair;	/* Tx queue paired with this Rx queue */
	struct pollfd fds[1];
	int xsk_queue_idx;		/* queue id, used in names and context checks */
	int busy_budget;		/* busy-poll budget passed to syscall_needed() */
};
114 
/* Software Tx counters kept per queue. */
struct tx_stats {
	uint64_t tx_pkts;	/* packets placed on the Tx ring */
	uint64_t tx_bytes;	/* sum of pkt_len of those packets */
	uint64_t tx_dropped;	/* reported as oerrors when stats are read */
};
120 
/* Per-queue Tx state; socket and completion ring are reached through pair. */
struct pkt_tx_queue {
	struct xsk_ring_prod tx;	/* Tx descriptor ring */
	struct xsk_umem_info *umem;	/* UMEM the Tx frames live in */

	struct tx_stats stats;

	struct pkt_rx_queue *pair;	/* Rx queue paired with this Tx queue */
	int xsk_queue_idx;
};
130 
/* Private data of one AF_XDP ethdev port (dev->data->dev_private). */
struct pmd_internals {
	int if_index;			/* kernel interface index of the netdev */
	char if_name[IFNAMSIZ];		/* kernel interface name */
	int start_queue_idx;
	int queue_cnt;			/* queue pairs; reported as max Rx/Tx queues */
	int max_queue_cnt;
	int combined_queue_cnt;
	bool shared_umem;		/* when set, the port is tracked for UMEM sharing */
	char prog_path[PATH_MAX];	/* NOTE(review): presumably the xdp_prog devarg - confirm */
	bool custom_prog_configured;

	struct rte_ether_addr eth_addr;

	struct pkt_rx_queue *rx_queues;	/* indexed 0..queue_cnt-1 */
	struct pkt_tx_queue *tx_queues;	/* indexed 0..queue_cnt-1 */
};
147 
/* Device argument (devargs) key names. */
#define ETH_AF_XDP_IFACE_ARG			"iface"
#define ETH_AF_XDP_START_QUEUE_ARG		"start_queue"
#define ETH_AF_XDP_QUEUE_COUNT_ARG		"queue_count"
#define ETH_AF_XDP_SHARED_UMEM_ARG		"shared_umem"
#define ETH_AF_XDP_PROG_ARG			"xdp_prog"
#define ETH_AF_XDP_BUDGET_ARG			"busy_budget"

/* NULL-terminated list of all the keys above. */
static const char * const valid_arguments[] = {
	ETH_AF_XDP_IFACE_ARG,
	ETH_AF_XDP_START_QUEUE_ARG,
	ETH_AF_XDP_QUEUE_COUNT_ARG,
	ETH_AF_XDP_SHARED_UMEM_ARG,
	ETH_AF_XDP_PROG_ARG,
	ETH_AF_XDP_BUDGET_ARG,
	NULL
};
164 
/*
 * Link description template: 10G, full duplex, autonegotiated, initially
 * down. The link status of a port is flipped by eth_dev_start()/stop().
 */
static const struct rte_eth_link pmd_link = {
	.link_speed = ETH_SPEED_NUM_10G,
	.link_duplex = ETH_LINK_FULL_DUPLEX,
	.link_status = ETH_LINK_DOWN,
	.link_autoneg = ETH_LINK_AUTONEG
};
171 
/* List which tracks PMDs to facilitate sharing UMEMs across them. */
struct internal_list {
	TAILQ_ENTRY(internal_list) next;	/* list linkage */
	struct rte_eth_dev *eth_dev;		/* port represented by this node */
};

/* Global list head; ports with shared_umem set are appended on configure. */
TAILQ_HEAD(internal_list_head, internal_list);
static struct internal_list_head internal_list =
	TAILQ_HEAD_INITIALIZER(internal_list);

/* Held while the list is walked or modified. */
static pthread_mutex_t internal_list_lock = PTHREAD_MUTEX_INITIALIZER;
183 
184 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
185 static inline int
186 reserve_fill_queue_zc(struct xsk_umem_info *umem, uint16_t reserve_size,
187 		      struct rte_mbuf **bufs, struct xsk_ring_prod *fq)
188 {
189 	uint32_t idx;
190 	uint16_t i;
191 
192 	if (unlikely(!xsk_ring_prod__reserve(fq, reserve_size, &idx))) {
193 		for (i = 0; i < reserve_size; i++)
194 			rte_pktmbuf_free(bufs[i]);
195 		AF_XDP_LOG(DEBUG, "Failed to reserve enough fq descs.\n");
196 		return -1;
197 	}
198 
199 	for (i = 0; i < reserve_size; i++) {
200 		__u64 *fq_addr;
201 		uint64_t addr;
202 
203 		fq_addr = xsk_ring_prod__fill_addr(fq, idx++);
204 		addr = (uint64_t)bufs[i] - (uint64_t)umem->buffer -
205 				umem->mb_pool->header_size;
206 		*fq_addr = addr;
207 	}
208 
209 	xsk_ring_prod__submit(fq, reserve_size);
210 
211 	return 0;
212 }
213 #else
214 static inline int
215 reserve_fill_queue_cp(struct xsk_umem_info *umem, uint16_t reserve_size,
216 		      struct rte_mbuf **bufs __rte_unused,
217 		      struct xsk_ring_prod *fq)
218 {
219 	void *addrs[reserve_size];
220 	uint32_t idx;
221 	uint16_t i;
222 
223 	if (rte_ring_dequeue_bulk(umem->buf_ring, addrs, reserve_size, NULL)
224 		    != reserve_size) {
225 		AF_XDP_LOG(DEBUG, "Failed to get enough buffers for fq.\n");
226 		return -1;
227 	}
228 
229 	if (unlikely(!xsk_ring_prod__reserve(fq, reserve_size, &idx))) {
230 		AF_XDP_LOG(DEBUG, "Failed to reserve enough fq descs.\n");
231 		rte_ring_enqueue_bulk(umem->buf_ring, addrs,
232 				reserve_size, NULL);
233 		return -1;
234 	}
235 
236 	for (i = 0; i < reserve_size; i++) {
237 		__u64 *fq_addr;
238 
239 		fq_addr = xsk_ring_prod__fill_addr(fq, idx++);
240 		*fq_addr = (uint64_t)addrs[i];
241 	}
242 
243 	xsk_ring_prod__submit(fq, reserve_size);
244 
245 	return 0;
246 }
247 #endif
248 
/*
 * Replenish the fill ring using whichever implementation was compiled in:
 * zero-copy when XDP_UMEM_UNALIGNED_CHUNK_FLAG is available, copy otherwise.
 * Returns the result of that implementation (0 or -1).
 */
static inline int
reserve_fill_queue(struct xsk_umem_info *umem, uint16_t reserve_size,
		   struct rte_mbuf **bufs, struct xsk_ring_prod *fq)
{
	int status;

#if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
	status = reserve_fill_queue_zc(umem, reserve_size, bufs, fq);
#else
	status = reserve_fill_queue_cp(umem, reserve_size, bufs, fq);
#endif

	return status;
}
259 
260 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
261 static uint16_t
262 af_xdp_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
263 {
264 	struct pkt_rx_queue *rxq = queue;
265 	struct xsk_ring_cons *rx = &rxq->rx;
266 	struct xsk_ring_prod *fq = &rxq->fq;
267 	struct xsk_umem_info *umem = rxq->umem;
268 	uint32_t idx_rx = 0;
269 	unsigned long rx_bytes = 0;
270 	int i;
271 	struct rte_mbuf *fq_bufs[ETH_AF_XDP_RX_BATCH_SIZE];
272 
273 	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
274 
275 	if (nb_pkts == 0) {
276 		if (syscall_needed(&rxq->fq, rxq->busy_budget))
277 			(void)recvfrom(xsk_socket__fd(rxq->xsk), NULL, 0,
278 				MSG_DONTWAIT, NULL, NULL);
279 
280 		return 0;
281 	}
282 
283 	/* allocate bufs for fill queue replenishment after rx */
284 	if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
285 		AF_XDP_LOG(DEBUG,
286 			"Failed to get enough buffers for fq.\n");
287 		/* rollback cached_cons which is added by
288 		 * xsk_ring_cons__peek
289 		 */
290 		rx->cached_cons -= nb_pkts;
291 		return 0;
292 	}
293 
294 	for (i = 0; i < nb_pkts; i++) {
295 		const struct xdp_desc *desc;
296 		uint64_t addr;
297 		uint32_t len;
298 		uint64_t offset;
299 
300 		desc = xsk_ring_cons__rx_desc(rx, idx_rx++);
301 		addr = desc->addr;
302 		len = desc->len;
303 
304 		offset = xsk_umem__extract_offset(addr);
305 		addr = xsk_umem__extract_addr(addr);
306 
307 		bufs[i] = (struct rte_mbuf *)
308 				xsk_umem__get_data(umem->buffer, addr +
309 					umem->mb_pool->header_size);
310 		bufs[i]->data_off = offset - sizeof(struct rte_mbuf) -
311 			rte_pktmbuf_priv_size(umem->mb_pool) -
312 			umem->mb_pool->header_size;
313 
314 		rte_pktmbuf_pkt_len(bufs[i]) = len;
315 		rte_pktmbuf_data_len(bufs[i]) = len;
316 		rx_bytes += len;
317 	}
318 
319 	xsk_ring_cons__release(rx, nb_pkts);
320 	(void)reserve_fill_queue(umem, nb_pkts, fq_bufs, fq);
321 
322 	/* statistics */
323 	rxq->stats.rx_pkts += nb_pkts;
324 	rxq->stats.rx_bytes += rx_bytes;
325 
326 	return nb_pkts;
327 }
328 #else
329 static uint16_t
330 af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
331 {
332 	struct pkt_rx_queue *rxq = queue;
333 	struct xsk_ring_cons *rx = &rxq->rx;
334 	struct xsk_umem_info *umem = rxq->umem;
335 	struct xsk_ring_prod *fq = &rxq->fq;
336 	uint32_t idx_rx = 0;
337 	unsigned long rx_bytes = 0;
338 	int i;
339 	uint32_t free_thresh = fq->size >> 1;
340 	struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
341 
342 	if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
343 		(void)reserve_fill_queue(umem, nb_pkts, NULL, fq);
344 
345 	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
346 	if (nb_pkts == 0) {
347 #if defined(XDP_USE_NEED_WAKEUP)
348 		if (xsk_ring_prod__needs_wakeup(fq))
349 			(void)recvfrom(xsk_socket__fd(rxq->xsk), NULL, 0,
350 				MSG_DONTWAIT, NULL, NULL);
351 #endif
352 		return 0;
353 	}
354 
355 	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts))) {
356 		/* rollback cached_cons which is added by
357 		 * xsk_ring_cons__peek
358 		 */
359 		rx->cached_cons -= nb_pkts;
360 		return 0;
361 	}
362 
363 	for (i = 0; i < nb_pkts; i++) {
364 		const struct xdp_desc *desc;
365 		uint64_t addr;
366 		uint32_t len;
367 		void *pkt;
368 
369 		desc = xsk_ring_cons__rx_desc(rx, idx_rx++);
370 		addr = desc->addr;
371 		len = desc->len;
372 		pkt = xsk_umem__get_data(rxq->umem->mz->addr, addr);
373 
374 		rte_memcpy(rte_pktmbuf_mtod(mbufs[i], void *), pkt, len);
375 		rte_ring_enqueue(umem->buf_ring, (void *)addr);
376 		rte_pktmbuf_pkt_len(mbufs[i]) = len;
377 		rte_pktmbuf_data_len(mbufs[i]) = len;
378 		rx_bytes += len;
379 		bufs[i] = mbufs[i];
380 	}
381 
382 	xsk_ring_cons__release(rx, nb_pkts);
383 
384 	/* statistics */
385 	rxq->stats.rx_pkts += nb_pkts;
386 	rxq->stats.rx_bytes += rx_bytes;
387 
388 	return nb_pkts;
389 }
390 #endif
391 
/*
 * Receive one batch with the Rx implementation selected at build time
 * (zero-copy when XDP_UMEM_UNALIGNED_CHUNK_FLAG is defined, copy otherwise).
 */
static uint16_t
af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
{
	uint16_t received;

#if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
	received = af_xdp_rx_zc(queue, bufs, nb_pkts);
#else
	received = af_xdp_rx_cp(queue, bufs, nb_pkts);
#endif

	return received;
}
401 
402 static uint16_t
403 eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
404 {
405 	uint16_t nb_rx;
406 
407 	if (likely(nb_pkts <= ETH_AF_XDP_RX_BATCH_SIZE))
408 		return af_xdp_rx(queue, bufs, nb_pkts);
409 
410 	/* Split larger batch into smaller batches of size
411 	 * ETH_AF_XDP_RX_BATCH_SIZE or less.
412 	 */
413 	nb_rx = 0;
414 	while (nb_pkts) {
415 		uint16_t ret, n;
416 
417 		n = (uint16_t)RTE_MIN(nb_pkts, ETH_AF_XDP_RX_BATCH_SIZE);
418 		ret = af_xdp_rx(queue, &bufs[nb_rx], n);
419 		nb_rx = (uint16_t)(nb_rx + ret);
420 		nb_pkts = (uint16_t)(nb_pkts - ret);
421 		if (ret < n)
422 			break;
423 	}
424 
425 	return nb_rx;
426 }
427 
428 static void
429 pull_umem_cq(struct xsk_umem_info *umem, int size, struct xsk_ring_cons *cq)
430 {
431 	size_t i, n;
432 	uint32_t idx_cq = 0;
433 
434 	n = xsk_ring_cons__peek(cq, size, &idx_cq);
435 
436 	for (i = 0; i < n; i++) {
437 		uint64_t addr;
438 		addr = *xsk_ring_cons__comp_addr(cq, idx_cq++);
439 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
440 		addr = xsk_umem__extract_addr(addr);
441 		rte_pktmbuf_free((struct rte_mbuf *)
442 					xsk_umem__get_data(umem->buffer,
443 					addr + umem->mb_pool->header_size));
444 #else
445 		rte_ring_enqueue(umem->buf_ring, (void *)addr);
446 #endif
447 	}
448 
449 	xsk_ring_cons__release(cq, n);
450 }
451 
452 static void
453 kick_tx(struct pkt_tx_queue *txq, struct xsk_ring_cons *cq)
454 {
455 	struct xsk_umem_info *umem = txq->umem;
456 
457 	pull_umem_cq(umem, XSK_RING_CONS__DEFAULT_NUM_DESCS, cq);
458 
459 	if (syscall_needed(&txq->tx, txq->pair->busy_budget))
460 		while (send(xsk_socket__fd(txq->pair->xsk), NULL,
461 			    0, MSG_DONTWAIT) < 0) {
462 			/* some thing unexpected */
463 			if (errno != EBUSY && errno != EAGAIN && errno != EINTR)
464 				break;
465 
466 			/* pull from completion queue to leave more space */
467 			if (errno == EAGAIN)
468 				pull_umem_cq(umem,
469 					     XSK_RING_CONS__DEFAULT_NUM_DESCS,
470 					     cq);
471 		}
472 }
473 
474 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
475 static uint16_t
476 af_xdp_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
477 {
478 	struct pkt_tx_queue *txq = queue;
479 	struct xsk_umem_info *umem = txq->umem;
480 	struct rte_mbuf *mbuf;
481 	unsigned long tx_bytes = 0;
482 	int i;
483 	uint32_t idx_tx;
484 	uint16_t count = 0;
485 	struct xdp_desc *desc;
486 	uint64_t addr, offset;
487 	struct xsk_ring_cons *cq = &txq->pair->cq;
488 	uint32_t free_thresh = cq->size >> 1;
489 
490 	if (xsk_cons_nb_avail(cq, free_thresh) >= free_thresh)
491 		pull_umem_cq(umem, XSK_RING_CONS__DEFAULT_NUM_DESCS, cq);
492 
493 	for (i = 0; i < nb_pkts; i++) {
494 		mbuf = bufs[i];
495 
496 		if (mbuf->pool == umem->mb_pool) {
497 			if (!xsk_ring_prod__reserve(&txq->tx, 1, &idx_tx)) {
498 				kick_tx(txq, cq);
499 				if (!xsk_ring_prod__reserve(&txq->tx, 1,
500 							    &idx_tx))
501 					goto out;
502 			}
503 			desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx);
504 			desc->len = mbuf->pkt_len;
505 			addr = (uint64_t)mbuf - (uint64_t)umem->buffer -
506 					umem->mb_pool->header_size;
507 			offset = rte_pktmbuf_mtod(mbuf, uint64_t) -
508 					(uint64_t)mbuf +
509 					umem->mb_pool->header_size;
510 			offset = offset << XSK_UNALIGNED_BUF_OFFSET_SHIFT;
511 			desc->addr = addr | offset;
512 			count++;
513 		} else {
514 			struct rte_mbuf *local_mbuf =
515 					rte_pktmbuf_alloc(umem->mb_pool);
516 			void *pkt;
517 
518 			if (local_mbuf == NULL)
519 				goto out;
520 
521 			if (!xsk_ring_prod__reserve(&txq->tx, 1, &idx_tx)) {
522 				rte_pktmbuf_free(local_mbuf);
523 				kick_tx(txq, cq);
524 				goto out;
525 			}
526 
527 			desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx);
528 			desc->len = mbuf->pkt_len;
529 
530 			addr = (uint64_t)local_mbuf - (uint64_t)umem->buffer -
531 					umem->mb_pool->header_size;
532 			offset = rte_pktmbuf_mtod(local_mbuf, uint64_t) -
533 					(uint64_t)local_mbuf +
534 					umem->mb_pool->header_size;
535 			pkt = xsk_umem__get_data(umem->buffer, addr + offset);
536 			offset = offset << XSK_UNALIGNED_BUF_OFFSET_SHIFT;
537 			desc->addr = addr | offset;
538 			rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
539 					desc->len);
540 			rte_pktmbuf_free(mbuf);
541 			count++;
542 		}
543 
544 		tx_bytes += mbuf->pkt_len;
545 	}
546 
547 	kick_tx(txq, cq);
548 
549 out:
550 	xsk_ring_prod__submit(&txq->tx, count);
551 
552 	txq->stats.tx_pkts += count;
553 	txq->stats.tx_bytes += tx_bytes;
554 	txq->stats.tx_dropped += nb_pkts - count;
555 
556 	return count;
557 }
558 #else
559 static uint16_t
560 af_xdp_tx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
561 {
562 	struct pkt_tx_queue *txq = queue;
563 	struct xsk_umem_info *umem = txq->umem;
564 	struct rte_mbuf *mbuf;
565 	void *addrs[ETH_AF_XDP_TX_BATCH_SIZE];
566 	unsigned long tx_bytes = 0;
567 	int i;
568 	uint32_t idx_tx;
569 	struct xsk_ring_cons *cq = &txq->pair->cq;
570 
571 	pull_umem_cq(umem, nb_pkts, cq);
572 
573 	nb_pkts = rte_ring_dequeue_bulk(umem->buf_ring, addrs,
574 					nb_pkts, NULL);
575 	if (nb_pkts == 0)
576 		return 0;
577 
578 	if (xsk_ring_prod__reserve(&txq->tx, nb_pkts, &idx_tx) != nb_pkts) {
579 		kick_tx(txq, cq);
580 		rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_pkts, NULL);
581 		return 0;
582 	}
583 
584 	for (i = 0; i < nb_pkts; i++) {
585 		struct xdp_desc *desc;
586 		void *pkt;
587 
588 		desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i);
589 		mbuf = bufs[i];
590 		desc->len = mbuf->pkt_len;
591 
592 		desc->addr = (uint64_t)addrs[i];
593 		pkt = xsk_umem__get_data(umem->mz->addr,
594 					 desc->addr);
595 		rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *), desc->len);
596 		tx_bytes += mbuf->pkt_len;
597 		rte_pktmbuf_free(mbuf);
598 	}
599 
600 	xsk_ring_prod__submit(&txq->tx, nb_pkts);
601 
602 	kick_tx(txq, cq);
603 
604 	txq->stats.tx_pkts += nb_pkts;
605 	txq->stats.tx_bytes += tx_bytes;
606 
607 	return nb_pkts;
608 }
609 
610 static uint16_t
611 af_xdp_tx_cp_batch(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
612 {
613 	uint16_t nb_tx;
614 
615 	if (likely(nb_pkts <= ETH_AF_XDP_TX_BATCH_SIZE))
616 		return af_xdp_tx_cp(queue, bufs, nb_pkts);
617 
618 	nb_tx = 0;
619 	while (nb_pkts) {
620 		uint16_t ret, n;
621 
622 		/* Split larger batch into smaller batches of size
623 		 * ETH_AF_XDP_TX_BATCH_SIZE or less.
624 		 */
625 		n = (uint16_t)RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
626 		ret = af_xdp_tx_cp(queue, &bufs[nb_tx], n);
627 		nb_tx = (uint16_t)(nb_tx + ret);
628 		nb_pkts = (uint16_t)(nb_pkts - ret);
629 		if (ret < n)
630 			break;
631 	}
632 
633 	return nb_tx;
634 }
635 #endif
636 
/*
 * Tx burst entry point: dispatches to the zero-copy implementation when
 * XDP_UMEM_UNALIGNED_CHUNK_FLAG is defined, to the batched copy one
 * otherwise.
 */
static uint16_t
eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
{
	uint16_t sent;

#if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
	sent = af_xdp_tx_zc(queue, bufs, nb_pkts);
#else
	sent = af_xdp_tx_cp_batch(queue, bufs, nb_pkts);
#endif

	return sent;
}
646 
647 static int
648 eth_dev_start(struct rte_eth_dev *dev)
649 {
650 	dev->data->dev_link.link_status = ETH_LINK_UP;
651 
652 	return 0;
653 }
654 
655 /* This function gets called when the current port gets stopped. */
656 static int
657 eth_dev_stop(struct rte_eth_dev *dev)
658 {
659 	dev->data->dev_link.link_status = ETH_LINK_DOWN;
660 	return 0;
661 }
662 
663 /* Find ethdev in list */
664 static inline struct internal_list *
665 find_internal_resource(struct pmd_internals *port_int)
666 {
667 	int found = 0;
668 	struct internal_list *list = NULL;
669 
670 	if (port_int == NULL)
671 		return NULL;
672 
673 	pthread_mutex_lock(&internal_list_lock);
674 
675 	TAILQ_FOREACH(list, &internal_list, next) {
676 		struct pmd_internals *list_int =
677 				list->eth_dev->data->dev_private;
678 		if (list_int == port_int) {
679 			found = 1;
680 			break;
681 		}
682 	}
683 
684 	pthread_mutex_unlock(&internal_list_lock);
685 
686 	if (!found)
687 		return NULL;
688 
689 	return list;
690 }
691 
692 /* Check if the netdev,qid context already exists */
693 static inline bool
694 ctx_exists(struct pkt_rx_queue *rxq, const char *ifname,
695 		struct pkt_rx_queue *list_rxq, const char *list_ifname)
696 {
697 	bool exists = false;
698 
699 	if (rxq->xsk_queue_idx == list_rxq->xsk_queue_idx &&
700 			!strncmp(ifname, list_ifname, IFNAMSIZ)) {
701 		AF_XDP_LOG(ERR, "ctx %s,%i already exists, cannot share umem\n",
702 					ifname, rxq->xsk_queue_idx);
703 		exists = true;
704 	}
705 
706 	return exists;
707 }
708 
709 /* Get a pointer to an existing UMEM which overlays the rxq's mb_pool */
710 static inline int
711 get_shared_umem(struct pkt_rx_queue *rxq, const char *ifname,
712 			struct xsk_umem_info **umem)
713 {
714 	struct internal_list *list;
715 	struct pmd_internals *internals;
716 	int i = 0, ret = 0;
717 	struct rte_mempool *mb_pool = rxq->mb_pool;
718 
719 	if (mb_pool == NULL)
720 		return ret;
721 
722 	pthread_mutex_lock(&internal_list_lock);
723 
724 	TAILQ_FOREACH(list, &internal_list, next) {
725 		internals = list->eth_dev->data->dev_private;
726 		for (i = 0; i < internals->queue_cnt; i++) {
727 			struct pkt_rx_queue *list_rxq =
728 						&internals->rx_queues[i];
729 			if (rxq == list_rxq)
730 				continue;
731 			if (mb_pool == internals->rx_queues[i].mb_pool) {
732 				if (ctx_exists(rxq, ifname, list_rxq,
733 						internals->if_name)) {
734 					ret = -1;
735 					goto out;
736 				}
737 				if (__atomic_load_n(
738 					&internals->rx_queues[i].umem->refcnt,
739 							__ATOMIC_ACQUIRE)) {
740 					*umem = internals->rx_queues[i].umem;
741 					goto out;
742 				}
743 			}
744 		}
745 	}
746 
747 out:
748 	pthread_mutex_unlock(&internal_list_lock);
749 
750 	return ret;
751 }
752 
753 static int
754 eth_dev_configure(struct rte_eth_dev *dev)
755 {
756 	struct pmd_internals *internal = dev->data->dev_private;
757 
758 	/* rx/tx must be paired */
759 	if (dev->data->nb_rx_queues != dev->data->nb_tx_queues)
760 		return -EINVAL;
761 
762 	if (internal->shared_umem) {
763 		struct internal_list *list = NULL;
764 		const char *name = dev->device->name;
765 
766 		/* Ensure PMD is not already inserted into the list */
767 		list = find_internal_resource(internal);
768 		if (list)
769 			return 0;
770 
771 		list = rte_zmalloc_socket(name, sizeof(*list), 0,
772 					dev->device->numa_node);
773 		if (list == NULL)
774 			return -1;
775 
776 		list->eth_dev = dev;
777 		pthread_mutex_lock(&internal_list_lock);
778 		TAILQ_INSERT_TAIL(&internal_list, list, next);
779 		pthread_mutex_unlock(&internal_list_lock);
780 	}
781 
782 	return 0;
783 }
784 
785 static int
786 eth_dev_info(struct rte_eth_dev *dev, struct rte_eth_dev_info *dev_info)
787 {
788 	struct pmd_internals *internals = dev->data->dev_private;
789 
790 	dev_info->if_index = internals->if_index;
791 	dev_info->max_mac_addrs = 1;
792 	dev_info->max_rx_pktlen = ETH_FRAME_LEN;
793 	dev_info->max_rx_queues = internals->queue_cnt;
794 	dev_info->max_tx_queues = internals->queue_cnt;
795 
796 	dev_info->min_mtu = RTE_ETHER_MIN_MTU;
797 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
798 	dev_info->max_mtu = getpagesize() -
799 				sizeof(struct rte_mempool_objhdr) -
800 				sizeof(struct rte_mbuf) -
801 				RTE_PKTMBUF_HEADROOM - XDP_PACKET_HEADROOM;
802 #else
803 	dev_info->max_mtu = ETH_AF_XDP_FRAME_SIZE - XDP_PACKET_HEADROOM;
804 #endif
805 
806 	dev_info->default_rxportconf.burst_size = ETH_AF_XDP_DFLT_BUSY_BUDGET;
807 	dev_info->default_txportconf.burst_size = ETH_AF_XDP_DFLT_BUSY_BUDGET;
808 	dev_info->default_rxportconf.nb_queues = 1;
809 	dev_info->default_txportconf.nb_queues = 1;
810 	dev_info->default_rxportconf.ring_size = ETH_AF_XDP_DFLT_NUM_DESCS;
811 	dev_info->default_txportconf.ring_size = ETH_AF_XDP_DFLT_NUM_DESCS;
812 
813 	return 0;
814 }
815 
816 static int
817 eth_stats_get(struct rte_eth_dev *dev, struct rte_eth_stats *stats)
818 {
819 	struct pmd_internals *internals = dev->data->dev_private;
820 	struct xdp_statistics xdp_stats;
821 	struct pkt_rx_queue *rxq;
822 	struct pkt_tx_queue *txq;
823 	socklen_t optlen;
824 	int i, ret;
825 
826 	for (i = 0; i < dev->data->nb_rx_queues; i++) {
827 		optlen = sizeof(struct xdp_statistics);
828 		rxq = &internals->rx_queues[i];
829 		txq = rxq->pair;
830 		stats->q_ipackets[i] = rxq->stats.rx_pkts;
831 		stats->q_ibytes[i] = rxq->stats.rx_bytes;
832 
833 		stats->q_opackets[i] = txq->stats.tx_pkts;
834 		stats->q_obytes[i] = txq->stats.tx_bytes;
835 
836 		stats->ipackets += stats->q_ipackets[i];
837 		stats->ibytes += stats->q_ibytes[i];
838 		stats->imissed += rxq->stats.rx_dropped;
839 		stats->oerrors += txq->stats.tx_dropped;
840 		ret = getsockopt(xsk_socket__fd(rxq->xsk), SOL_XDP,
841 				XDP_STATISTICS, &xdp_stats, &optlen);
842 		if (ret != 0) {
843 			AF_XDP_LOG(ERR, "getsockopt() failed for XDP_STATISTICS.\n");
844 			return -1;
845 		}
846 		stats->imissed += xdp_stats.rx_dropped;
847 
848 		stats->opackets += stats->q_opackets[i];
849 		stats->obytes += stats->q_obytes[i];
850 	}
851 
852 	return 0;
853 }
854 
855 static int
856 eth_stats_reset(struct rte_eth_dev *dev)
857 {
858 	struct pmd_internals *internals = dev->data->dev_private;
859 	int i;
860 
861 	for (i = 0; i < internals->queue_cnt; i++) {
862 		memset(&internals->rx_queues[i].stats, 0,
863 					sizeof(struct rx_stats));
864 		memset(&internals->tx_queues[i].stats, 0,
865 					sizeof(struct tx_stats));
866 	}
867 
868 	return 0;
869 }
870 
871 static void
872 remove_xdp_program(struct pmd_internals *internals)
873 {
874 	uint32_t curr_prog_id = 0;
875 
876 	if (bpf_get_link_xdp_id(internals->if_index, &curr_prog_id,
877 				XDP_FLAGS_UPDATE_IF_NOEXIST)) {
878 		AF_XDP_LOG(ERR, "bpf_get_link_xdp_id failed\n");
879 		return;
880 	}
881 	bpf_set_link_xdp_fd(internals->if_index, -1,
882 			XDP_FLAGS_UPDATE_IF_NOEXIST);
883 }
884 
/*
 * Release the driver-side resources of a UMEM and free the info structure.
 *
 * Zero-copy builds only drop the mempool reference (the pool is not freed
 * here). Copy builds free the backing memzone and the buffer ring. The
 * kernel-side UMEM handle (umem->umem) is not touched by this function.
 * umem must not be NULL.
 */
static void
xdp_umem_destroy(struct xsk_umem_info *umem)
{
#if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
	umem->mb_pool = NULL;
#else
	rte_memzone_free(umem->mz);
	umem->mz = NULL;

	rte_ring_free(umem->buf_ring);
	umem->buf_ring = NULL;
#endif

	rte_free(umem);
}
900 
901 static int
902 eth_dev_close(struct rte_eth_dev *dev)
903 {
904 	struct pmd_internals *internals = dev->data->dev_private;
905 	struct pkt_rx_queue *rxq;
906 	int i;
907 
908 	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
909 		return 0;
910 
911 	AF_XDP_LOG(INFO, "Closing AF_XDP ethdev on numa socket %u\n",
912 		rte_socket_id());
913 
914 	for (i = 0; i < internals->queue_cnt; i++) {
915 		rxq = &internals->rx_queues[i];
916 		if (rxq->umem == NULL)
917 			break;
918 		xsk_socket__delete(rxq->xsk);
919 
920 		if (__atomic_sub_fetch(&rxq->umem->refcnt, 1, __ATOMIC_ACQUIRE)
921 				== 0) {
922 			(void)xsk_umem__delete(rxq->umem->umem);
923 			xdp_umem_destroy(rxq->umem);
924 		}
925 
926 		/* free pkt_tx_queue */
927 		rte_free(rxq->pair);
928 		rte_free(rxq);
929 	}
930 
931 	/*
932 	 * MAC is not allocated dynamically, setting it to NULL would prevent
933 	 * from releasing it in rte_eth_dev_release_port.
934 	 */
935 	dev->data->mac_addrs = NULL;
936 
937 	remove_xdp_program(internals);
938 
939 	if (internals->shared_umem) {
940 		struct internal_list *list;
941 
942 		/* Remove ethdev from list used to track and share UMEMs */
943 		list = find_internal_resource(internals);
944 		if (list) {
945 			pthread_mutex_lock(&internal_list_lock);
946 			TAILQ_REMOVE(&internal_list, list, next);
947 			pthread_mutex_unlock(&internal_list_lock);
948 			rte_free(list);
949 		}
950 	}
951 
952 	return 0;
953 }
954 
/*
 * Queue release callback: intentionally empty. Per-queue resources are
 * released in eth_dev_close().
 */
static void
eth_queue_release(void *q __rte_unused)
{
}
959 
/*
 * Link update callback: does nothing and returns 0. The link status is only
 * changed by eth_dev_start() and eth_dev_stop().
 */
static int
eth_link_update(struct rte_eth_dev *dev __rte_unused,
		int wait_to_complete __rte_unused)
{
	return 0;
}
966 
967 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
968 static inline uintptr_t get_base_addr(struct rte_mempool *mp, uint64_t *align)
969 {
970 	struct rte_mempool_memhdr *memhdr;
971 	uintptr_t memhdr_addr, aligned_addr;
972 
973 	memhdr = STAILQ_FIRST(&mp->mem_list);
974 	memhdr_addr = (uintptr_t)memhdr->addr;
975 	aligned_addr = memhdr_addr & ~(getpagesize() - 1);
976 	*align = memhdr_addr - aligned_addr;
977 
978 	return aligned_addr;
979 }
980 
981 static struct
982 xsk_umem_info *xdp_umem_configure(struct pmd_internals *internals,
983 				  struct pkt_rx_queue *rxq)
984 {
985 	struct xsk_umem_info *umem = NULL;
986 	int ret;
987 	struct xsk_umem_config usr_config = {
988 		.fill_size = ETH_AF_XDP_DFLT_NUM_DESCS * 2,
989 		.comp_size = ETH_AF_XDP_DFLT_NUM_DESCS,
990 		.flags = XDP_UMEM_UNALIGNED_CHUNK_FLAG};
991 	void *base_addr = NULL;
992 	struct rte_mempool *mb_pool = rxq->mb_pool;
993 	uint64_t umem_size, align = 0;
994 
995 	if (internals->shared_umem) {
996 		if (get_shared_umem(rxq, internals->if_name, &umem) < 0)
997 			return NULL;
998 
999 		if (umem != NULL &&
1000 			__atomic_load_n(&umem->refcnt, __ATOMIC_ACQUIRE) <
1001 					umem->max_xsks) {
1002 			AF_XDP_LOG(INFO, "%s,qid%i sharing UMEM\n",
1003 					internals->if_name, rxq->xsk_queue_idx);
1004 			__atomic_fetch_add(&umem->refcnt, 1, __ATOMIC_ACQUIRE);
1005 		}
1006 	}
1007 
1008 	if (umem == NULL) {
1009 		usr_config.frame_size =
1010 			rte_mempool_calc_obj_size(mb_pool->elt_size,
1011 						  mb_pool->flags, NULL);
1012 		usr_config.frame_headroom = mb_pool->header_size +
1013 						sizeof(struct rte_mbuf) +
1014 						rte_pktmbuf_priv_size(mb_pool) +
1015 						RTE_PKTMBUF_HEADROOM;
1016 
1017 		umem = rte_zmalloc_socket("umem", sizeof(*umem), 0,
1018 					  rte_socket_id());
1019 		if (umem == NULL) {
1020 			AF_XDP_LOG(ERR, "Failed to allocate umem info");
1021 			return NULL;
1022 		}
1023 
1024 		umem->mb_pool = mb_pool;
1025 		base_addr = (void *)get_base_addr(mb_pool, &align);
1026 		umem_size = (uint64_t)mb_pool->populated_size *
1027 				(uint64_t)usr_config.frame_size +
1028 				align;
1029 
1030 		ret = xsk_umem__create(&umem->umem, base_addr, umem_size,
1031 				&rxq->fq, &rxq->cq, &usr_config);
1032 		if (ret) {
1033 			AF_XDP_LOG(ERR, "Failed to create umem");
1034 			goto err;
1035 		}
1036 		umem->buffer = base_addr;
1037 
1038 		if (internals->shared_umem) {
1039 			umem->max_xsks = mb_pool->populated_size /
1040 						ETH_AF_XDP_NUM_BUFFERS;
1041 			AF_XDP_LOG(INFO, "Max xsks for UMEM %s: %u\n",
1042 						mb_pool->name, umem->max_xsks);
1043 		}
1044 
1045 		__atomic_store_n(&umem->refcnt, 1, __ATOMIC_RELEASE);
1046 	}
1047 
1048 #else
1049 static struct
1050 xsk_umem_info *xdp_umem_configure(struct pmd_internals *internals,
1051 				  struct pkt_rx_queue *rxq)
1052 {
1053 	struct xsk_umem_info *umem;
1054 	const struct rte_memzone *mz;
1055 	struct xsk_umem_config usr_config = {
1056 		.fill_size = ETH_AF_XDP_DFLT_NUM_DESCS,
1057 		.comp_size = ETH_AF_XDP_DFLT_NUM_DESCS,
1058 		.frame_size = ETH_AF_XDP_FRAME_SIZE,
1059 		.frame_headroom = 0 };
1060 	char ring_name[RTE_RING_NAMESIZE];
1061 	char mz_name[RTE_MEMZONE_NAMESIZE];
1062 	int ret;
1063 	uint64_t i;
1064 
1065 	umem = rte_zmalloc_socket("umem", sizeof(*umem), 0, rte_socket_id());
1066 	if (umem == NULL) {
1067 		AF_XDP_LOG(ERR, "Failed to allocate umem info");
1068 		return NULL;
1069 	}
1070 
1071 	snprintf(ring_name, sizeof(ring_name), "af_xdp_ring_%s_%u",
1072 		       internals->if_name, rxq->xsk_queue_idx);
1073 	umem->buf_ring = rte_ring_create(ring_name,
1074 					 ETH_AF_XDP_NUM_BUFFERS,
1075 					 rte_socket_id(),
1076 					 0x0);
1077 	if (umem->buf_ring == NULL) {
1078 		AF_XDP_LOG(ERR, "Failed to create rte_ring\n");
1079 		goto err;
1080 	}
1081 
1082 	for (i = 0; i < ETH_AF_XDP_NUM_BUFFERS; i++)
1083 		rte_ring_enqueue(umem->buf_ring,
1084 				 (void *)(i * ETH_AF_XDP_FRAME_SIZE));
1085 
1086 	snprintf(mz_name, sizeof(mz_name), "af_xdp_umem_%s_%u",
1087 		       internals->if_name, rxq->xsk_queue_idx);
1088 	mz = rte_memzone_reserve_aligned(mz_name,
1089 			ETH_AF_XDP_NUM_BUFFERS * ETH_AF_XDP_FRAME_SIZE,
1090 			rte_socket_id(), RTE_MEMZONE_IOVA_CONTIG,
1091 			getpagesize());
1092 	if (mz == NULL) {
1093 		AF_XDP_LOG(ERR, "Failed to reserve memzone for af_xdp umem.\n");
1094 		goto err;
1095 	}
1096 
1097 	ret = xsk_umem__create(&umem->umem, mz->addr,
1098 			       ETH_AF_XDP_NUM_BUFFERS * ETH_AF_XDP_FRAME_SIZE,
1099 			       &rxq->fq, &rxq->cq,
1100 			       &usr_config);
1101 
1102 	if (ret) {
1103 		AF_XDP_LOG(ERR, "Failed to create umem");
1104 		goto err;
1105 	}
1106 	umem->mz = mz;
1107 
1108 #endif
1109 	return umem;
1110 
1111 err:
1112 	xdp_umem_destroy(umem);
1113 	return NULL;
1114 }
1115 
1116 static int
1117 load_custom_xdp_prog(const char *prog_path, int if_index)
1118 {
1119 	int ret, prog_fd = -1;
1120 	struct bpf_object *obj;
1121 	struct bpf_map *map;
1122 
1123 	ret = bpf_prog_load(prog_path, BPF_PROG_TYPE_XDP, &obj, &prog_fd);
1124 	if (ret) {
1125 		AF_XDP_LOG(ERR, "Failed to load program %s\n", prog_path);
1126 		return ret;
1127 	}
1128 
1129 	/*
1130 	 * The loaded program must provision for a map of xsks, such that some
1131 	 * traffic can be redirected to userspace. When the xsk is created,
1132 	 * libbpf inserts it into the map.
1133 	 */
1134 	map = bpf_object__find_map_by_name(obj, "xsks_map");
1135 	if (!map) {
1136 		AF_XDP_LOG(ERR, "Failed to find xsks_map in %s\n", prog_path);
1137 		return -1;
1138 	}
1139 
1140 	/* Link the program with the given network device */
1141 	ret = bpf_set_link_xdp_fd(if_index, prog_fd,
1142 					XDP_FLAGS_UPDATE_IF_NOEXIST);
1143 	if (ret) {
1144 		AF_XDP_LOG(ERR, "Failed to set prog fd %d on interface\n",
1145 				prog_fd);
1146 		return -1;
1147 	}
1148 
1149 	AF_XDP_LOG(INFO, "Successfully loaded XDP program %s with fd %d\n",
1150 				prog_path, prog_fd);
1151 
1152 	return 0;
1153 }
1154 
1155 /* Detect support for busy polling through setsockopt(). */
1156 static int
1157 configure_preferred_busy_poll(struct pkt_rx_queue *rxq)
1158 {
1159 	int sock_opt = 1;
1160 	int fd = xsk_socket__fd(rxq->xsk);
1161 	int ret = 0;
1162 
1163 	ret = setsockopt(fd, SOL_SOCKET, SO_PREFER_BUSY_POLL,
1164 			(void *)&sock_opt, sizeof(sock_opt));
1165 	if (ret < 0) {
1166 		AF_XDP_LOG(DEBUG, "Failed to set SO_PREFER_BUSY_POLL\n");
1167 		goto err_prefer;
1168 	}
1169 
1170 	sock_opt = ETH_AF_XDP_DFLT_BUSY_TIMEOUT;
1171 	ret = setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL, (void *)&sock_opt,
1172 			sizeof(sock_opt));
1173 	if (ret < 0) {
1174 		AF_XDP_LOG(DEBUG, "Failed to set SO_BUSY_POLL\n");
1175 		goto err_timeout;
1176 	}
1177 
1178 	sock_opt = rxq->busy_budget;
1179 	ret = setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL_BUDGET,
1180 			(void *)&sock_opt, sizeof(sock_opt));
1181 	if (ret < 0) {
1182 		AF_XDP_LOG(DEBUG, "Failed to set SO_BUSY_POLL_BUDGET\n");
1183 	} else {
1184 		AF_XDP_LOG(INFO, "Busy polling budget set to: %u\n",
1185 					rxq->busy_budget);
1186 		return 0;
1187 	}
1188 
1189 	/* setsockopt failure - attempt to restore xsk to default state and
1190 	 * proceed without busy polling support.
1191 	 */
1192 	sock_opt = 0;
1193 	ret = setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL, (void *)&sock_opt,
1194 			sizeof(sock_opt));
1195 	if (ret < 0) {
1196 		AF_XDP_LOG(ERR, "Failed to unset SO_BUSY_POLL\n");
1197 		return -1;
1198 	}
1199 
1200 err_timeout:
1201 	sock_opt = 0;
1202 	ret = setsockopt(fd, SOL_SOCKET, SO_PREFER_BUSY_POLL,
1203 			(void *)&sock_opt, sizeof(sock_opt));
1204 	if (ret < 0) {
1205 		AF_XDP_LOG(ERR, "Failed to unset SO_PREFER_BUSY_POLL\n");
1206 		return -1;
1207 	}
1208 
1209 err_prefer:
1210 	rxq->busy_budget = 0;
1211 	return 0;
1212 }
1213 
1214 static int
1215 xsk_configure(struct pmd_internals *internals, struct pkt_rx_queue *rxq,
1216 	      int ring_size)
1217 {
1218 	struct xsk_socket_config cfg;
1219 	struct pkt_tx_queue *txq = rxq->pair;
1220 	int ret = 0;
1221 	int reserve_size = ETH_AF_XDP_DFLT_NUM_DESCS;
1222 	struct rte_mbuf *fq_bufs[reserve_size];
1223 
1224 	rxq->umem = xdp_umem_configure(internals, rxq);
1225 	if (rxq->umem == NULL)
1226 		return -ENOMEM;
1227 	txq->umem = rxq->umem;
1228 
1229 	cfg.rx_size = ring_size;
1230 	cfg.tx_size = ring_size;
1231 	cfg.libbpf_flags = 0;
1232 	cfg.xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST;
1233 	cfg.bind_flags = 0;
1234 
1235 #if defined(XDP_USE_NEED_WAKEUP)
1236 	cfg.bind_flags |= XDP_USE_NEED_WAKEUP;
1237 #endif
1238 
1239 	if (strnlen(internals->prog_path, PATH_MAX) &&
1240 				!internals->custom_prog_configured) {
1241 		ret = load_custom_xdp_prog(internals->prog_path,
1242 					   internals->if_index);
1243 		if (ret) {
1244 			AF_XDP_LOG(ERR, "Failed to load custom XDP program %s\n",
1245 					internals->prog_path);
1246 			goto err;
1247 		}
1248 		internals->custom_prog_configured = 1;
1249 	}
1250 
1251 	if (internals->shared_umem)
1252 		ret = create_shared_socket(&rxq->xsk, internals->if_name,
1253 				rxq->xsk_queue_idx, rxq->umem->umem, &rxq->rx,
1254 				&txq->tx, &rxq->fq, &rxq->cq, &cfg);
1255 	else
1256 		ret = xsk_socket__create(&rxq->xsk, internals->if_name,
1257 				rxq->xsk_queue_idx, rxq->umem->umem, &rxq->rx,
1258 				&txq->tx, &cfg);
1259 
1260 	if (ret) {
1261 		AF_XDP_LOG(ERR, "Failed to create xsk socket.\n");
1262 		goto err;
1263 	}
1264 
1265 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
1266 	ret = rte_pktmbuf_alloc_bulk(rxq->umem->mb_pool, fq_bufs, reserve_size);
1267 	if (ret) {
1268 		AF_XDP_LOG(DEBUG, "Failed to get enough buffers for fq.\n");
1269 		goto err;
1270 	}
1271 #endif
1272 
1273 	if (rxq->busy_budget) {
1274 		ret = configure_preferred_busy_poll(rxq);
1275 		if (ret) {
1276 			AF_XDP_LOG(ERR, "Failed configure busy polling.\n");
1277 			goto err;
1278 		}
1279 	}
1280 
1281 	ret = reserve_fill_queue(rxq->umem, reserve_size, fq_bufs, &rxq->fq);
1282 	if (ret) {
1283 		xsk_socket__delete(rxq->xsk);
1284 		AF_XDP_LOG(ERR, "Failed to reserve fill queue.\n");
1285 		goto err;
1286 	}
1287 
1288 	return 0;
1289 
1290 err:
1291 	if (__atomic_sub_fetch(&rxq->umem->refcnt, 1, __ATOMIC_ACQUIRE) == 0)
1292 		xdp_umem_destroy(rxq->umem);
1293 
1294 	return ret;
1295 }
1296 
1297 static int
1298 eth_rx_queue_setup(struct rte_eth_dev *dev,
1299 		   uint16_t rx_queue_id,
1300 		   uint16_t nb_rx_desc,
1301 		   unsigned int socket_id __rte_unused,
1302 		   const struct rte_eth_rxconf *rx_conf __rte_unused,
1303 		   struct rte_mempool *mb_pool)
1304 {
1305 	struct pmd_internals *internals = dev->data->dev_private;
1306 	struct pkt_rx_queue *rxq;
1307 	int ret;
1308 
1309 	rxq = &internals->rx_queues[rx_queue_id];
1310 
1311 	AF_XDP_LOG(INFO, "Set up rx queue, rx queue id: %d, xsk queue id: %d\n",
1312 		   rx_queue_id, rxq->xsk_queue_idx);
1313 
1314 #ifndef XDP_UMEM_UNALIGNED_CHUNK_FLAG
1315 	uint32_t buf_size, data_size;
1316 
1317 	/* Now get the space available for data in the mbuf */
1318 	buf_size = rte_pktmbuf_data_room_size(mb_pool) -
1319 		RTE_PKTMBUF_HEADROOM;
1320 	data_size = ETH_AF_XDP_FRAME_SIZE;
1321 
1322 	if (data_size > buf_size) {
1323 		AF_XDP_LOG(ERR, "%s: %d bytes will not fit in mbuf (%d bytes)\n",
1324 			dev->device->name, data_size, buf_size);
1325 		ret = -ENOMEM;
1326 		goto err;
1327 	}
1328 #endif
1329 
1330 	rxq->mb_pool = mb_pool;
1331 
1332 	if (xsk_configure(internals, rxq, nb_rx_desc)) {
1333 		AF_XDP_LOG(ERR, "Failed to configure xdp socket\n");
1334 		ret = -EINVAL;
1335 		goto err;
1336 	}
1337 
1338 	if (!rxq->busy_budget)
1339 		AF_XDP_LOG(DEBUG, "Preferred busy polling not enabled\n");
1340 
1341 	rxq->fds[0].fd = xsk_socket__fd(rxq->xsk);
1342 	rxq->fds[0].events = POLLIN;
1343 
1344 	dev->data->rx_queues[rx_queue_id] = rxq;
1345 	return 0;
1346 
1347 err:
1348 	return ret;
1349 }
1350 
1351 static int
1352 eth_tx_queue_setup(struct rte_eth_dev *dev,
1353 		   uint16_t tx_queue_id,
1354 		   uint16_t nb_tx_desc __rte_unused,
1355 		   unsigned int socket_id __rte_unused,
1356 		   const struct rte_eth_txconf *tx_conf __rte_unused)
1357 {
1358 	struct pmd_internals *internals = dev->data->dev_private;
1359 	struct pkt_tx_queue *txq;
1360 
1361 	txq = &internals->tx_queues[tx_queue_id];
1362 
1363 	dev->data->tx_queues[tx_queue_id] = txq;
1364 	return 0;
1365 }
1366 
1367 static int
1368 eth_dev_mtu_set(struct rte_eth_dev *dev, uint16_t mtu)
1369 {
1370 	struct pmd_internals *internals = dev->data->dev_private;
1371 	struct ifreq ifr = { .ifr_mtu = mtu };
1372 	int ret;
1373 	int s;
1374 
1375 	s = socket(PF_INET, SOCK_DGRAM, 0);
1376 	if (s < 0)
1377 		return -EINVAL;
1378 
1379 	strlcpy(ifr.ifr_name, internals->if_name, IFNAMSIZ);
1380 	ret = ioctl(s, SIOCSIFMTU, &ifr);
1381 	close(s);
1382 
1383 	return (ret < 0) ? -errno : 0;
1384 }
1385 
1386 static int
1387 eth_dev_change_flags(char *if_name, uint32_t flags, uint32_t mask)
1388 {
1389 	struct ifreq ifr;
1390 	int ret = 0;
1391 	int s;
1392 
1393 	s = socket(PF_INET, SOCK_DGRAM, 0);
1394 	if (s < 0)
1395 		return -errno;
1396 
1397 	strlcpy(ifr.ifr_name, if_name, IFNAMSIZ);
1398 	if (ioctl(s, SIOCGIFFLAGS, &ifr) < 0) {
1399 		ret = -errno;
1400 		goto out;
1401 	}
1402 	ifr.ifr_flags &= mask;
1403 	ifr.ifr_flags |= flags;
1404 	if (ioctl(s, SIOCSIFFLAGS, &ifr) < 0) {
1405 		ret = -errno;
1406 		goto out;
1407 	}
1408 out:
1409 	close(s);
1410 	return ret;
1411 }
1412 
1413 static int
1414 eth_dev_promiscuous_enable(struct rte_eth_dev *dev)
1415 {
1416 	struct pmd_internals *internals = dev->data->dev_private;
1417 
1418 	return eth_dev_change_flags(internals->if_name, IFF_PROMISC, ~0);
1419 }
1420 
1421 static int
1422 eth_dev_promiscuous_disable(struct rte_eth_dev *dev)
1423 {
1424 	struct pmd_internals *internals = dev->data->dev_private;
1425 
1426 	return eth_dev_change_flags(internals->if_name, 0, ~IFF_PROMISC);
1427 }
1428 
1429 static const struct eth_dev_ops ops = {
1430 	.dev_start = eth_dev_start,
1431 	.dev_stop = eth_dev_stop,
1432 	.dev_close = eth_dev_close,
1433 	.dev_configure = eth_dev_configure,
1434 	.dev_infos_get = eth_dev_info,
1435 	.mtu_set = eth_dev_mtu_set,
1436 	.promiscuous_enable = eth_dev_promiscuous_enable,
1437 	.promiscuous_disable = eth_dev_promiscuous_disable,
1438 	.rx_queue_setup = eth_rx_queue_setup,
1439 	.tx_queue_setup = eth_tx_queue_setup,
1440 	.rx_queue_release = eth_queue_release,
1441 	.tx_queue_release = eth_queue_release,
1442 	.link_update = eth_link_update,
1443 	.stats_get = eth_stats_get,
1444 	.stats_reset = eth_stats_reset,
1445 };
1446 
1447 /** parse busy_budget argument */
1448 static int
1449 parse_budget_arg(const char *key __rte_unused,
1450 		  const char *value, void *extra_args)
1451 {
1452 	int *i = (int *)extra_args;
1453 	char *end;
1454 
1455 	*i = strtol(value, &end, 10);
1456 	if (*i < 0 || *i > UINT16_MAX) {
1457 		AF_XDP_LOG(ERR, "Invalid busy_budget %i, must be >= 0 and <= %u\n",
1458 				*i, UINT16_MAX);
1459 		return -EINVAL;
1460 	}
1461 
1462 	return 0;
1463 }
1464 
1465 /** parse integer from integer argument */
1466 static int
1467 parse_integer_arg(const char *key __rte_unused,
1468 		  const char *value, void *extra_args)
1469 {
1470 	int *i = (int *)extra_args;
1471 	char *end;
1472 
1473 	*i = strtol(value, &end, 10);
1474 	if (*i < 0) {
1475 		AF_XDP_LOG(ERR, "Argument has to be positive.\n");
1476 		return -EINVAL;
1477 	}
1478 
1479 	return 0;
1480 }
1481 
1482 /** parse name argument */
1483 static int
1484 parse_name_arg(const char *key __rte_unused,
1485 	       const char *value, void *extra_args)
1486 {
1487 	char *name = extra_args;
1488 
1489 	if (strnlen(value, IFNAMSIZ) > IFNAMSIZ - 1) {
1490 		AF_XDP_LOG(ERR, "Invalid name %s, should be less than %u bytes.\n",
1491 			   value, IFNAMSIZ);
1492 		return -EINVAL;
1493 	}
1494 
1495 	strlcpy(name, value, IFNAMSIZ);
1496 
1497 	return 0;
1498 }
1499 
1500 /** parse xdp prog argument */
1501 static int
1502 parse_prog_arg(const char *key __rte_unused,
1503 	       const char *value, void *extra_args)
1504 {
1505 	char *path = extra_args;
1506 
1507 	if (strnlen(value, PATH_MAX) == PATH_MAX) {
1508 		AF_XDP_LOG(ERR, "Invalid path %s, should be less than %u bytes.\n",
1509 			   value, PATH_MAX);
1510 		return -EINVAL;
1511 	}
1512 
1513 	if (access(value, F_OK) != 0) {
1514 		AF_XDP_LOG(ERR, "Error accessing %s: %s\n",
1515 			   value, strerror(errno));
1516 		return -EINVAL;
1517 	}
1518 
1519 	strlcpy(path, value, PATH_MAX);
1520 
1521 	return 0;
1522 }
1523 
1524 static int
1525 xdp_get_channels_info(const char *if_name, int *max_queues,
1526 				int *combined_queues)
1527 {
1528 	struct ethtool_channels channels;
1529 	struct ifreq ifr;
1530 	int fd, ret;
1531 
1532 	fd = socket(AF_INET, SOCK_DGRAM, 0);
1533 	if (fd < 0)
1534 		return -1;
1535 
1536 	channels.cmd = ETHTOOL_GCHANNELS;
1537 	ifr.ifr_data = (void *)&channels;
1538 	strlcpy(ifr.ifr_name, if_name, IFNAMSIZ);
1539 	ret = ioctl(fd, SIOCETHTOOL, &ifr);
1540 	if (ret) {
1541 		if (errno == EOPNOTSUPP) {
1542 			ret = 0;
1543 		} else {
1544 			ret = -errno;
1545 			goto out;
1546 		}
1547 	}
1548 
1549 	if (channels.max_combined == 0 || errno == EOPNOTSUPP) {
1550 		/* If the device says it has no channels, then all traffic
1551 		 * is sent to a single stream, so max queues = 1.
1552 		 */
1553 		*max_queues = 1;
1554 		*combined_queues = 1;
1555 	} else {
1556 		*max_queues = channels.max_combined;
1557 		*combined_queues = channels.combined_count;
1558 	}
1559 
1560  out:
1561 	close(fd);
1562 	return ret;
1563 }
1564 
1565 static int
1566 parse_parameters(struct rte_kvargs *kvlist, char *if_name, int *start_queue,
1567 			int *queue_cnt, int *shared_umem, char *prog_path,
1568 			int *busy_budget)
1569 {
1570 	int ret;
1571 
1572 	ret = rte_kvargs_process(kvlist, ETH_AF_XDP_IFACE_ARG,
1573 				 &parse_name_arg, if_name);
1574 	if (ret < 0)
1575 		goto free_kvlist;
1576 
1577 	ret = rte_kvargs_process(kvlist, ETH_AF_XDP_START_QUEUE_ARG,
1578 				 &parse_integer_arg, start_queue);
1579 	if (ret < 0)
1580 		goto free_kvlist;
1581 
1582 	ret = rte_kvargs_process(kvlist, ETH_AF_XDP_QUEUE_COUNT_ARG,
1583 				 &parse_integer_arg, queue_cnt);
1584 	if (ret < 0 || *queue_cnt <= 0) {
1585 		ret = -EINVAL;
1586 		goto free_kvlist;
1587 	}
1588 
1589 	ret = rte_kvargs_process(kvlist, ETH_AF_XDP_SHARED_UMEM_ARG,
1590 				&parse_integer_arg, shared_umem);
1591 	if (ret < 0)
1592 		goto free_kvlist;
1593 
1594 	ret = rte_kvargs_process(kvlist, ETH_AF_XDP_PROG_ARG,
1595 				 &parse_prog_arg, prog_path);
1596 	if (ret < 0)
1597 		goto free_kvlist;
1598 
1599 	ret = rte_kvargs_process(kvlist, ETH_AF_XDP_BUDGET_ARG,
1600 				&parse_budget_arg, busy_budget);
1601 	if (ret < 0)
1602 		goto free_kvlist;
1603 
1604 free_kvlist:
1605 	rte_kvargs_free(kvlist);
1606 	return ret;
1607 }
1608 
1609 static int
1610 get_iface_info(const char *if_name,
1611 	       struct rte_ether_addr *eth_addr,
1612 	       int *if_index)
1613 {
1614 	struct ifreq ifr;
1615 	int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_IP);
1616 
1617 	if (sock < 0)
1618 		return -1;
1619 
1620 	strlcpy(ifr.ifr_name, if_name, IFNAMSIZ);
1621 	if (ioctl(sock, SIOCGIFINDEX, &ifr))
1622 		goto error;
1623 
1624 	*if_index = ifr.ifr_ifindex;
1625 
1626 	if (ioctl(sock, SIOCGIFHWADDR, &ifr))
1627 		goto error;
1628 
1629 	rte_memcpy(eth_addr, ifr.ifr_hwaddr.sa_data, RTE_ETHER_ADDR_LEN);
1630 
1631 	close(sock);
1632 	return 0;
1633 
1634 error:
1635 	close(sock);
1636 	return -1;
1637 }
1638 
1639 static struct rte_eth_dev *
1640 init_internals(struct rte_vdev_device *dev, const char *if_name,
1641 		int start_queue_idx, int queue_cnt, int shared_umem,
1642 		const char *prog_path, int busy_budget)
1643 {
1644 	const char *name = rte_vdev_device_name(dev);
1645 	const unsigned int numa_node = dev->device.numa_node;
1646 	struct pmd_internals *internals;
1647 	struct rte_eth_dev *eth_dev;
1648 	int ret;
1649 	int i;
1650 
1651 	internals = rte_zmalloc_socket(name, sizeof(*internals), 0, numa_node);
1652 	if (internals == NULL)
1653 		return NULL;
1654 
1655 	internals->start_queue_idx = start_queue_idx;
1656 	internals->queue_cnt = queue_cnt;
1657 	strlcpy(internals->if_name, if_name, IFNAMSIZ);
1658 	strlcpy(internals->prog_path, prog_path, PATH_MAX);
1659 	internals->custom_prog_configured = 0;
1660 
1661 #ifndef ETH_AF_XDP_SHARED_UMEM
1662 	if (shared_umem) {
1663 		AF_XDP_LOG(ERR, "Shared UMEM feature not available. "
1664 				"Check kernel and libbpf version\n");
1665 		goto err_free_internals;
1666 	}
1667 #endif
1668 	internals->shared_umem = shared_umem;
1669 
1670 	if (xdp_get_channels_info(if_name, &internals->max_queue_cnt,
1671 				  &internals->combined_queue_cnt)) {
1672 		AF_XDP_LOG(ERR, "Failed to get channel info of interface: %s\n",
1673 				if_name);
1674 		goto err_free_internals;
1675 	}
1676 
1677 	if (queue_cnt > internals->combined_queue_cnt) {
1678 		AF_XDP_LOG(ERR, "Specified queue count %d is larger than combined queue count %d.\n",
1679 				queue_cnt, internals->combined_queue_cnt);
1680 		goto err_free_internals;
1681 	}
1682 
1683 	internals->rx_queues = rte_zmalloc_socket(NULL,
1684 					sizeof(struct pkt_rx_queue) * queue_cnt,
1685 					0, numa_node);
1686 	if (internals->rx_queues == NULL) {
1687 		AF_XDP_LOG(ERR, "Failed to allocate memory for rx queues.\n");
1688 		goto err_free_internals;
1689 	}
1690 
1691 	internals->tx_queues = rte_zmalloc_socket(NULL,
1692 					sizeof(struct pkt_tx_queue) * queue_cnt,
1693 					0, numa_node);
1694 	if (internals->tx_queues == NULL) {
1695 		AF_XDP_LOG(ERR, "Failed to allocate memory for tx queues.\n");
1696 		goto err_free_rx;
1697 	}
1698 	for (i = 0; i < queue_cnt; i++) {
1699 		internals->tx_queues[i].pair = &internals->rx_queues[i];
1700 		internals->rx_queues[i].pair = &internals->tx_queues[i];
1701 		internals->rx_queues[i].xsk_queue_idx = start_queue_idx + i;
1702 		internals->tx_queues[i].xsk_queue_idx = start_queue_idx + i;
1703 		internals->rx_queues[i].busy_budget = busy_budget;
1704 	}
1705 
1706 	ret = get_iface_info(if_name, &internals->eth_addr,
1707 			     &internals->if_index);
1708 	if (ret)
1709 		goto err_free_tx;
1710 
1711 	eth_dev = rte_eth_vdev_allocate(dev, 0);
1712 	if (eth_dev == NULL)
1713 		goto err_free_tx;
1714 
1715 	eth_dev->data->dev_private = internals;
1716 	eth_dev->data->dev_link = pmd_link;
1717 	eth_dev->data->mac_addrs = &internals->eth_addr;
1718 	eth_dev->data->dev_flags |= RTE_ETH_DEV_AUTOFILL_QUEUE_XSTATS;
1719 	eth_dev->dev_ops = &ops;
1720 	eth_dev->rx_pkt_burst = eth_af_xdp_rx;
1721 	eth_dev->tx_pkt_burst = eth_af_xdp_tx;
1722 
1723 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
1724 	AF_XDP_LOG(INFO, "Zero copy between umem and mbuf enabled.\n");
1725 #endif
1726 
1727 	return eth_dev;
1728 
1729 err_free_tx:
1730 	rte_free(internals->tx_queues);
1731 err_free_rx:
1732 	rte_free(internals->rx_queues);
1733 err_free_internals:
1734 	rte_free(internals);
1735 	return NULL;
1736 }
1737 
1738 static int
1739 rte_pmd_af_xdp_probe(struct rte_vdev_device *dev)
1740 {
1741 	struct rte_kvargs *kvlist;
1742 	char if_name[IFNAMSIZ] = {'\0'};
1743 	int xsk_start_queue_idx = ETH_AF_XDP_DFLT_START_QUEUE_IDX;
1744 	int xsk_queue_cnt = ETH_AF_XDP_DFLT_QUEUE_COUNT;
1745 	int shared_umem = 0;
1746 	char prog_path[PATH_MAX] = {'\0'};
1747 	int busy_budget = -1;
1748 	struct rte_eth_dev *eth_dev = NULL;
1749 	const char *name;
1750 
1751 	AF_XDP_LOG(INFO, "Initializing pmd_af_xdp for %s\n",
1752 		rte_vdev_device_name(dev));
1753 
1754 	name = rte_vdev_device_name(dev);
1755 	if (rte_eal_process_type() == RTE_PROC_SECONDARY &&
1756 		strlen(rte_vdev_device_args(dev)) == 0) {
1757 		eth_dev = rte_eth_dev_attach_secondary(name);
1758 		if (eth_dev == NULL) {
1759 			AF_XDP_LOG(ERR, "Failed to probe %s\n", name);
1760 			return -EINVAL;
1761 		}
1762 		eth_dev->dev_ops = &ops;
1763 		rte_eth_dev_probing_finish(eth_dev);
1764 		return 0;
1765 	}
1766 
1767 	kvlist = rte_kvargs_parse(rte_vdev_device_args(dev), valid_arguments);
1768 	if (kvlist == NULL) {
1769 		AF_XDP_LOG(ERR, "Invalid kvargs key\n");
1770 		return -EINVAL;
1771 	}
1772 
1773 	if (dev->device.numa_node == SOCKET_ID_ANY)
1774 		dev->device.numa_node = rte_socket_id();
1775 
1776 	if (parse_parameters(kvlist, if_name, &xsk_start_queue_idx,
1777 			     &xsk_queue_cnt, &shared_umem, prog_path,
1778 			     &busy_budget) < 0) {
1779 		AF_XDP_LOG(ERR, "Invalid kvargs value\n");
1780 		return -EINVAL;
1781 	}
1782 
1783 	if (strlen(if_name) == 0) {
1784 		AF_XDP_LOG(ERR, "Network interface must be specified\n");
1785 		return -EINVAL;
1786 	}
1787 
1788 	busy_budget = busy_budget == -1 ? ETH_AF_XDP_DFLT_BUSY_BUDGET :
1789 					busy_budget;
1790 
1791 	eth_dev = init_internals(dev, if_name, xsk_start_queue_idx,
1792 					xsk_queue_cnt, shared_umem, prog_path,
1793 					busy_budget);
1794 	if (eth_dev == NULL) {
1795 		AF_XDP_LOG(ERR, "Failed to init internals\n");
1796 		return -1;
1797 	}
1798 
1799 	rte_eth_dev_probing_finish(eth_dev);
1800 
1801 	return 0;
1802 }
1803 
1804 static int
1805 rte_pmd_af_xdp_remove(struct rte_vdev_device *dev)
1806 {
1807 	struct rte_eth_dev *eth_dev = NULL;
1808 
1809 	AF_XDP_LOG(INFO, "Removing AF_XDP ethdev on numa socket %u\n",
1810 		rte_socket_id());
1811 
1812 	if (dev == NULL)
1813 		return -1;
1814 
1815 	/* find the ethdev entry */
1816 	eth_dev = rte_eth_dev_allocated(rte_vdev_device_name(dev));
1817 	if (eth_dev == NULL)
1818 		return 0;
1819 
1820 	eth_dev_close(eth_dev);
1821 	rte_eth_dev_release_port(eth_dev);
1822 
1823 
1824 	return 0;
1825 }
1826 
1827 static struct rte_vdev_driver pmd_af_xdp_drv = {
1828 	.probe = rte_pmd_af_xdp_probe,
1829 	.remove = rte_pmd_af_xdp_remove,
1830 };
1831 
1832 RTE_PMD_REGISTER_VDEV(net_af_xdp, pmd_af_xdp_drv);
1833 RTE_PMD_REGISTER_PARAM_STRING(net_af_xdp,
1834 			      "iface=<string> "
1835 			      "start_queue=<int> "
1836 			      "queue_count=<int> "
1837 			      "shared_umem=<int> "
1838 			      "xdp_prog=<string> "
1839 			      "busy_budget=<int>");
1840