xref: /dpdk/lib/vhost/virtio_net.c (revision adec2a5ce47fa3fccd82c9796c71eeeb65e99700)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2016 Intel Corporation
3  */
4 
5 #include <stdint.h>
6 #include <stdbool.h>
7 #include <linux/virtio_net.h>
8 
9 #include <rte_mbuf.h>
10 #include <rte_memcpy.h>
11 #include <rte_net.h>
12 #include <rte_ether.h>
13 #include <rte_ip.h>
14 #include <rte_dmadev.h>
15 #include <rte_vhost.h>
16 #include <rte_tcp.h>
17 #include <rte_udp.h>
18 #include <rte_sctp.h>
19 #include <rte_arp.h>
20 #include <rte_spinlock.h>
21 #include <rte_malloc.h>
22 #include <rte_vhost_async.h>
23 
24 #include "iotlb.h"
25 #include "vhost.h"
26 
27 #define MAX_BATCH_LEN 256
28 
29 static __rte_always_inline uint16_t
30 async_poll_dequeue_completed(struct virtio_net *dev, struct vhost_virtqueue *vq,
31 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
32 		uint16_t vchan_id, bool legacy_ol_flags);
33 
34 /* DMA device copy operation tracking array. */
35 struct async_dma_info dma_copy_track[RTE_DMADEV_DEFAULT_MAX];
36 
37 static __rte_always_inline bool
38 rxvq_is_mergeable(struct virtio_net *dev)
39 {
40 	return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
41 }
42 
43 static __rte_always_inline bool
44 virtio_net_is_inorder(struct virtio_net *dev)
45 {
46 	return dev->features & (1ULL << VIRTIO_F_IN_ORDER);
47 }
48 
49 static bool
50 is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t nr_vring)
51 {
52 	return (is_tx ^ (idx & 1)) == 0 && idx < nr_vring;
53 }
54 
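/*
 * Update the per-virtqueue statistics (packets, bytes, size histogram and
 * multicast/broadcast counters) for a burst of packets. It is a no-op when
 * stats collection is not enabled for the device.
 */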
55 static inline void
56 vhost_queue_stats_update(const struct virtio_net *dev, struct vhost_virtqueue *vq,
57 		struct rte_mbuf **pkts, uint16_t count)
58 	__rte_shared_locks_required(&vq->access_lock)
59 {
60 	struct virtqueue_stats *stats = &vq->stats;
61 	int i;
62 
63 	if (!(dev->flags & VIRTIO_DEV_STATS_ENABLED))
64 		return;
65 
66 	for (i = 0; i < count; i++) {
67 		const struct rte_ether_addr *ea;
68 		const struct rte_mbuf *pkt = pkts[i];
69 		uint32_t pkt_len = rte_pktmbuf_pkt_len(pkt);
70 
71 		stats->packets++;
72 		stats->bytes += pkt_len;
73 
74 		if (pkt_len >= 1024)
75 			stats->size_bins[6 + (pkt_len > 1518)]++;
76 		else if (pkt_len <= 64)
77 			stats->size_bins[pkt_len >> 6]++;
78 		else
79 			stats->size_bins[32UL - rte_clz32(pkt_len) - 5]++;
80 
81 		ea = rte_pktmbuf_mtod(pkt, const struct rte_ether_addr *);
82 		RTE_BUILD_BUG_ON(offsetof(struct virtqueue_stats, broadcast) !=
83 				offsetof(struct virtqueue_stats, multicast) + sizeof(uint64_t));
84 		if (unlikely(rte_is_multicast_ether_addr(ea)))
85 			(&stats->multicast)[rte_is_broadcast_ether_addr(ea)]++;
86 	}
87 }
88 
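/*
 * Enqueue the DMA copies for one packet (one vhost_iov_iter) on the given
 * DMA vChannel. The completion flag address is stored only in the slot of
 * the last copy. Returns the number of segments enqueued, or -1 on failure.
 */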
89 static __rte_always_inline int64_t
90 vhost_async_dma_transfer_one(struct virtio_net *dev, struct vhost_virtqueue *vq,
91 		int16_t dma_id, uint16_t vchan_id, uint16_t flag_idx,
92 		struct vhost_iov_iter *pkt)
93 	__rte_shared_locks_required(&vq->access_lock)
94 {
95 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
96 	uint16_t ring_mask = dma_info->ring_mask;
97 	static bool vhost_async_dma_copy_log;
98 
99 
100 	struct vhost_iovec *iov = pkt->iov;
101 	int copy_idx = 0;
102 	uint32_t nr_segs = pkt->nr_segs;
103 	uint16_t i;
104 
105 	if (rte_dma_burst_capacity(dma_id, vchan_id) < nr_segs)
106 		return -1;
107 
108 	for (i = 0; i < nr_segs; i++) {
109 		copy_idx = rte_dma_copy(dma_id, vchan_id, (rte_iova_t)iov[i].src_addr,
110 				(rte_iova_t)iov[i].dst_addr, iov[i].len, RTE_DMA_OP_FLAG_LLC);
111 		/**
112 		 * Since all memory is pinned and the DMA vChannel
113 		 * ring has enough space, failure should be rare.
114 		 * If it does happen, the DMA device has hit a
115 		 * serious error; in that case, stop the async
116 		 * data path and check what has happened to the
117 		 * DMA device.
118 		 */
119 		if (unlikely(copy_idx < 0)) {
120 			if (!vhost_async_dma_copy_log) {
121 				VHOST_DATA_LOG(dev->ifname, ERR,
122 					"DMA copy failed for channel %d:%u",
123 					dma_id, vchan_id);
124 				vhost_async_dma_copy_log = true;
125 			}
126 			return -1;
127 		}
128 	}
129 
130 	/**
131 	 * Only store the packet completion flag address in the last copy's
132 	 * slot; the other slots are set to NULL.
133 	 */
134 	dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask] = &vq->async->pkts_cmpl_flag[flag_idx];
135 
136 	return nr_segs;
137 }
138 
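/*
 * Enqueue and submit the DMA copies for a burst of packets, under the
 * vChannel lock. Returns the number of packets whose copies were all
 * successfully enqueued.
 */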
139 static __rte_always_inline uint16_t
140 vhost_async_dma_transfer(struct virtio_net *dev, struct vhost_virtqueue *vq,
141 		int16_t dma_id, uint16_t vchan_id, uint16_t head_idx,
142 		struct vhost_iov_iter *pkts, uint16_t nr_pkts)
143 	__rte_shared_locks_required(&vq->access_lock)
144 {
145 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
146 	int64_t ret, nr_copies = 0;
147 	uint16_t pkt_idx;
148 
149 	rte_spinlock_lock(&dma_info->dma_lock);
150 
151 	for (pkt_idx = 0; pkt_idx < nr_pkts; pkt_idx++) {
152 		ret = vhost_async_dma_transfer_one(dev, vq, dma_id, vchan_id, head_idx,
153 				&pkts[pkt_idx]);
154 		if (unlikely(ret < 0))
155 			break;
156 
157 		nr_copies += ret;
158 		head_idx++;
159 		if (head_idx >= vq->size)
160 			head_idx -= vq->size;
161 	}
162 
163 	if (likely(nr_copies > 0))
164 		rte_dma_submit(dma_id, vchan_id);
165 
166 	rte_spinlock_unlock(&dma_info->dma_lock);
167 
168 	return pkt_idx;
169 }
170 
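/*
 * Poll DMA completions on the given vChannel and mark the completion flag
 * of each finished packet. DMA errors are only logged; they are not handled
 * at the vhost level.
 */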
171 static __rte_always_inline uint16_t
172 vhost_async_dma_check_completed(struct virtio_net *dev, int16_t dma_id, uint16_t vchan_id,
173 		uint16_t max_pkts)
174 {
175 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
176 	uint16_t ring_mask = dma_info->ring_mask;
177 	uint16_t last_idx = 0;
178 	uint16_t nr_copies;
179 	uint16_t copy_idx;
180 	uint16_t i;
181 	bool has_error = false;
182 	static bool vhost_async_dma_complete_log;
183 
184 	rte_spinlock_lock(&dma_info->dma_lock);
185 
186 	/**
187 	 * Print an error log for debugging if the DMA device reports an
188 	 * error during the transfer. Errors are not handled at the vhost level.
189 	 */
190 	nr_copies = rte_dma_completed(dma_id, vchan_id, max_pkts, &last_idx, &has_error);
191 	if (unlikely(!vhost_async_dma_complete_log && has_error)) {
192 		VHOST_DATA_LOG(dev->ifname, ERR,
193 			"DMA completion failure on channel %d:%u",
194 			dma_id, vchan_id);
195 		vhost_async_dma_complete_log = true;
196 	} else if (nr_copies == 0) {
197 		goto out;
198 	}
199 
200 	copy_idx = last_idx - nr_copies + 1;
201 	for (i = 0; i < nr_copies; i++) {
202 		bool *flag;
203 
204 		flag = dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask];
205 		if (flag) {
206 			/**
207 			 * Mark the packet flag as received. The flag
208 			 * could belong to another virtqueue, but the
209 			 * write is atomic.
210 			 */
211 			*flag = true;
212 			dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask] = NULL;
213 		}
214 		copy_idx++;
215 	}
216 
217 out:
218 	rte_spinlock_unlock(&dma_info->dma_lock);
219 	return nr_copies;
220 }
221 
222 static inline void
223 do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
224 	__rte_shared_locks_required(&vq->iotlb_lock)
225 {
226 	struct batch_copy_elem *elem = vq->batch_copy_elems;
227 	uint16_t count = vq->batch_copy_nb_elems;
228 	int i;
229 
230 	for (i = 0; i < count; i++) {
231 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
232 		vhost_log_cache_write_iova(dev, vq, elem[i].log_addr,
233 					   elem[i].len);
234 		PRINT_PACKET(dev, (uintptr_t)elem[i].dst, elem[i].len, 0);
235 	}
236 
237 	vq->batch_copy_nb_elems = 0;
238 }
239 
240 static inline void
241 do_data_copy_dequeue(struct vhost_virtqueue *vq)
242 {
243 	struct batch_copy_elem *elem = vq->batch_copy_elems;
244 	uint16_t count = vq->batch_copy_nb_elems;
245 	int i;
246 
247 	for (i = 0; i < count; i++)
248 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
249 
250 	vq->batch_copy_nb_elems = 0;
251 }
252 
253 static __rte_always_inline void
254 do_flush_shadow_used_ring_split(struct virtio_net *dev,
255 			struct vhost_virtqueue *vq,
256 			uint16_t to, uint16_t from, uint16_t size)
257 {
258 	rte_memcpy(&vq->used->ring[to],
259 			&vq->shadow_used_split[from],
260 			size * sizeof(struct vring_used_elem));
261 	vhost_log_cache_used_vring(dev, vq,
262 			offsetof(struct vring_used, ring[to]),
263 			size * sizeof(struct vring_used_elem));
264 }
265 
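/*
 * Flush the shadow used entries to the split used ring, wrapping around the
 * ring end when needed, then publish the new used index with release
 * semantics so the guest only sees complete entries.
 */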
266 static __rte_always_inline void
267 flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
268 {
269 	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
270 
271 	if (used_idx + vq->shadow_used_idx <= vq->size) {
272 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
273 					  vq->shadow_used_idx);
274 	} else {
275 		uint16_t size;
276 
277 		/* update used ring interval [used_idx, vq->size] */
278 		size = vq->size - used_idx;
279 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
280 
281 		/* update the remaining used ring interval [0, shadow_used_idx - size] */
282 		do_flush_shadow_used_ring_split(dev, vq, 0, size,
283 					  vq->shadow_used_idx - size);
284 	}
285 	vq->last_used_idx += vq->shadow_used_idx;
286 
287 	vhost_log_cache_sync(dev, vq);
288 
289 	rte_atomic_fetch_add_explicit((unsigned short __rte_atomic *)&vq->used->idx,
290 		vq->shadow_used_idx, rte_memory_order_release);
291 	vq->shadow_used_idx = 0;
292 	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
293 		sizeof(vq->used->idx));
294 }
295 
296 static __rte_always_inline void
297 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
298 			 uint16_t desc_idx, uint32_t len)
299 {
300 	uint16_t i = vq->shadow_used_idx++;
301 
302 	vq->shadow_used_split[i].id  = desc_idx;
303 	vq->shadow_used_split[i].len = len;
304 }
305 
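/*
 * Flush the shadowed enqueue entries to the packed descriptor ring: ids and
 * lengths are written first, then flags after a release fence, with the
 * head descriptor's flags written last as the guest-visible sync point.
 */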
306 static __rte_always_inline void
307 vhost_flush_enqueue_shadow_packed(struct virtio_net *dev,
308 				  struct vhost_virtqueue *vq)
309 {
310 	int i;
311 	uint16_t used_idx = vq->last_used_idx;
312 	uint16_t head_idx = vq->last_used_idx;
313 	uint16_t head_flags = 0;
314 
315 	/* Split loop in two to save memory barriers */
316 	for (i = 0; i < vq->shadow_used_idx; i++) {
317 		vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
318 		vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
319 
320 		used_idx += vq->shadow_used_packed[i].count;
321 		if (used_idx >= vq->size)
322 			used_idx -= vq->size;
323 	}
324 
325 	/* The ordering for storing desc flags needs to be enforced. */
326 	rte_atomic_thread_fence(rte_memory_order_release);
327 
328 	for (i = 0; i < vq->shadow_used_idx; i++) {
329 		uint16_t flags;
330 
331 		if (vq->shadow_used_packed[i].len)
332 			flags = VRING_DESC_F_WRITE;
333 		else
334 			flags = 0;
335 
336 		if (vq->used_wrap_counter) {
337 			flags |= VRING_DESC_F_USED;
338 			flags |= VRING_DESC_F_AVAIL;
339 		} else {
340 			flags &= ~VRING_DESC_F_USED;
341 			flags &= ~VRING_DESC_F_AVAIL;
342 		}
343 
344 		if (i > 0) {
345 			vq->desc_packed[vq->last_used_idx].flags = flags;
346 
347 			vhost_log_cache_used_vring(dev, vq,
348 					vq->last_used_idx *
349 					sizeof(struct vring_packed_desc),
350 					sizeof(struct vring_packed_desc));
351 		} else {
352 			head_idx = vq->last_used_idx;
353 			head_flags = flags;
354 		}
355 
356 		vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
357 	}
358 
359 	vq->desc_packed[head_idx].flags = head_flags;
360 
361 	vhost_log_cache_used_vring(dev, vq,
362 				head_idx *
363 				sizeof(struct vring_packed_desc),
364 				sizeof(struct vring_packed_desc));
365 
366 	vq->shadow_used_idx = 0;
367 	vhost_log_cache_sync(dev, vq);
368 }
369 
370 static __rte_always_inline void
371 vhost_flush_dequeue_shadow_packed(struct virtio_net *dev,
372 				  struct vhost_virtqueue *vq)
373 {
374 	struct vring_used_elem_packed *used_elem = &vq->shadow_used_packed[0];
375 
376 	vq->desc_packed[vq->shadow_last_used_idx].id = used_elem->id;
377 	/* The desc flags field is the synchronization point for the virtio packed vring */
378 	rte_atomic_store_explicit(
379 		(unsigned short __rte_atomic *)&vq->desc_packed[vq->shadow_last_used_idx].flags,
380 		used_elem->flags, rte_memory_order_release);
381 
382 	vhost_log_cache_used_vring(dev, vq, vq->shadow_last_used_idx *
383 				   sizeof(struct vring_packed_desc),
384 				   sizeof(struct vring_packed_desc));
385 	vq->shadow_used_idx = 0;
386 	vhost_log_cache_sync(dev, vq);
387 }
388 
389 static __rte_always_inline void
390 vhost_flush_enqueue_batch_packed(struct virtio_net *dev,
391 				 struct vhost_virtqueue *vq,
392 				 uint64_t *lens,
393 				 uint16_t *ids)
394 {
395 	uint16_t i;
396 	uint16_t flags;
397 	uint16_t last_used_idx;
398 	struct vring_packed_desc *desc_base;
399 
400 	last_used_idx = vq->last_used_idx;
401 	desc_base = &vq->desc_packed[last_used_idx];
402 
403 	flags = PACKED_DESC_ENQUEUE_USED_FLAG(vq->used_wrap_counter);
404 
405 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
406 		desc_base[i].id = ids[i];
407 		desc_base[i].len = lens[i];
408 	}
409 
410 	rte_atomic_thread_fence(rte_memory_order_release);
411 
412 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
413 		desc_base[i].flags = flags;
414 	}
415 
416 	vhost_log_cache_used_vring(dev, vq, last_used_idx *
417 				   sizeof(struct vring_packed_desc),
418 				   sizeof(struct vring_packed_desc) *
419 				   PACKED_BATCH_SIZE);
420 	vhost_log_cache_sync(dev, vq);
421 
422 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
423 }
424 
425 static __rte_always_inline void
426 vhost_async_shadow_enqueue_packed_batch(struct vhost_virtqueue *vq,
427 				 uint64_t *lens,
428 				 uint16_t *ids)
429 	__rte_exclusive_locks_required(&vq->access_lock)
430 {
431 	uint16_t i;
432 	struct vhost_async *async = vq->async;
433 
434 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
435 		async->buffers_packed[async->buffer_idx_packed].id  = ids[i];
436 		async->buffers_packed[async->buffer_idx_packed].len = lens[i];
437 		async->buffers_packed[async->buffer_idx_packed].count = 1;
438 		async->buffer_idx_packed++;
439 		if (async->buffer_idx_packed >= vq->size)
440 			async->buffer_idx_packed -= vq->size;
441 	}
442 }
443 
444 static __rte_always_inline void
445 vhost_async_shadow_dequeue_packed_batch(struct vhost_virtqueue *vq, uint16_t *ids)
446 	__rte_shared_locks_required(&vq->access_lock)
447 {
448 	uint16_t i;
449 	struct vhost_async *async = vq->async;
450 
451 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
452 		async->buffers_packed[async->buffer_idx_packed].id  = ids[i];
453 		async->buffers_packed[async->buffer_idx_packed].len = 0;
454 		async->buffers_packed[async->buffer_idx_packed].count = 1;
455 
456 		async->buffer_idx_packed++;
457 		if (async->buffer_idx_packed >= vq->size)
458 			async->buffer_idx_packed -= vq->size;
459 	}
460 }
461 
462 static __rte_always_inline void
463 vhost_shadow_dequeue_batch_packed_inorder(struct vhost_virtqueue *vq,
464 					  uint16_t id)
465 {
466 	vq->shadow_used_packed[0].id = id;
467 
468 	if (!vq->shadow_used_idx) {
469 		vq->shadow_last_used_idx = vq->last_used_idx;
470 		vq->shadow_used_packed[0].flags =
471 			PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
472 		vq->shadow_used_packed[0].len = 0;
473 		vq->shadow_used_packed[0].count = 1;
474 		vq->shadow_used_idx++;
475 	}
476 
477 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
478 }
479 
480 static __rte_always_inline void
481 vhost_shadow_dequeue_batch_packed(struct virtio_net *dev,
482 				  struct vhost_virtqueue *vq,
483 				  uint16_t *ids)
484 {
485 	uint16_t flags;
486 	uint16_t i;
487 	uint16_t begin;
488 
489 	flags = PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
490 
491 	if (!vq->shadow_used_idx) {
492 		vq->shadow_last_used_idx = vq->last_used_idx;
493 		vq->shadow_used_packed[0].id  = ids[0];
494 		vq->shadow_used_packed[0].len = 0;
495 		vq->shadow_used_packed[0].count = 1;
496 		vq->shadow_used_packed[0].flags = flags;
497 		vq->shadow_used_idx++;
498 		begin = 1;
499 	} else
500 		begin = 0;
501 
502 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE) {
503 		vq->desc_packed[vq->last_used_idx + i].id = ids[i];
504 		vq->desc_packed[vq->last_used_idx + i].len = 0;
505 	}
506 
507 	rte_atomic_thread_fence(rte_memory_order_release);
508 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE)
509 		vq->desc_packed[vq->last_used_idx + i].flags = flags;
510 
511 	vhost_log_cache_used_vring(dev, vq, vq->last_used_idx *
512 				   sizeof(struct vring_packed_desc),
513 				   sizeof(struct vring_packed_desc) *
514 				   PACKED_BATCH_SIZE);
515 	vhost_log_cache_sync(dev, vq);
516 
517 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
518 }
519 
520 static __rte_always_inline void
521 vhost_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
522 				   uint16_t buf_id,
523 				   uint16_t count)
524 {
525 	uint16_t flags;
526 
527 	flags = vq->desc_packed[vq->last_used_idx].flags;
528 	if (vq->used_wrap_counter) {
529 		flags |= VRING_DESC_F_USED;
530 		flags |= VRING_DESC_F_AVAIL;
531 	} else {
532 		flags &= ~VRING_DESC_F_USED;
533 		flags &= ~VRING_DESC_F_AVAIL;
534 	}
535 
536 	if (!vq->shadow_used_idx) {
537 		vq->shadow_last_used_idx = vq->last_used_idx;
538 
539 		vq->shadow_used_packed[0].id  = buf_id;
540 		vq->shadow_used_packed[0].len = 0;
541 		vq->shadow_used_packed[0].flags = flags;
542 		vq->shadow_used_idx++;
543 	} else {
544 		vq->desc_packed[vq->last_used_idx].id = buf_id;
545 		vq->desc_packed[vq->last_used_idx].len = 0;
546 		vq->desc_packed[vq->last_used_idx].flags = flags;
547 	}
548 
549 	vq_inc_last_used_packed(vq, count);
550 }
551 
552 static __rte_always_inline void
553 vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
554 					   uint16_t buf_id,
555 					   uint16_t count)
556 {
557 	uint16_t flags;
558 
559 	vq->shadow_used_packed[0].id = buf_id;
560 
561 	flags = vq->desc_packed[vq->last_used_idx].flags;
562 	if (vq->used_wrap_counter) {
563 		flags |= VRING_DESC_F_USED;
564 		flags |= VRING_DESC_F_AVAIL;
565 	} else {
566 		flags &= ~VRING_DESC_F_USED;
567 		flags &= ~VRING_DESC_F_AVAIL;
568 	}
569 
570 	if (!vq->shadow_used_idx) {
571 		vq->shadow_last_used_idx = vq->last_used_idx;
572 		vq->shadow_used_packed[0].len = 0;
573 		vq->shadow_used_packed[0].flags = flags;
574 		vq->shadow_used_idx++;
575 	}
576 
577 	vq_inc_last_used_packed(vq, count);
578 }
579 
580 static __rte_always_inline void
581 vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
582 				   uint32_t *len,
583 				   uint16_t *id,
584 				   uint16_t *count,
585 				   uint16_t num_buffers)
586 {
587 	uint16_t i;
588 
589 	for (i = 0; i < num_buffers; i++) {
590 		/* enqueue shadow flush action aligned with batch num */
591 		if (!vq->shadow_used_idx)
592 			vq->shadow_aligned_idx = vq->last_used_idx &
593 				PACKED_BATCH_MASK;
594 		vq->shadow_used_packed[vq->shadow_used_idx].id  = id[i];
595 		vq->shadow_used_packed[vq->shadow_used_idx].len = len[i];
596 		vq->shadow_used_packed[vq->shadow_used_idx].count = count[i];
597 		vq->shadow_aligned_idx += count[i];
598 		vq->shadow_used_idx++;
599 	}
600 }
601 
602 static __rte_always_inline void
603 vhost_async_shadow_enqueue_packed(struct vhost_virtqueue *vq,
604 				   uint32_t *len,
605 				   uint16_t *id,
606 				   uint16_t *count,
607 				   uint16_t num_buffers)
608 	__rte_exclusive_locks_required(&vq->access_lock)
609 {
610 	uint16_t i;
611 	struct vhost_async *async = vq->async;
612 
613 	for (i = 0; i < num_buffers; i++) {
614 		async->buffers_packed[async->buffer_idx_packed].id  = id[i];
615 		async->buffers_packed[async->buffer_idx_packed].len = len[i];
616 		async->buffers_packed[async->buffer_idx_packed].count = count[i];
617 		async->buffer_idx_packed++;
618 		if (async->buffer_idx_packed >= vq->size)
619 			async->buffer_idx_packed -= vq->size;
620 	}
621 }
622 
623 static __rte_always_inline void
624 vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
625 				   struct vhost_virtqueue *vq,
626 				   uint32_t *len,
627 				   uint16_t *id,
628 				   uint16_t *count,
629 				   uint16_t num_buffers)
630 	__rte_shared_locks_required(&vq->iotlb_lock)
631 {
632 	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
633 
634 	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
635 		do_data_copy_enqueue(dev, vq);
636 		vhost_flush_enqueue_shadow_packed(dev, vq);
637 	}
638 }
639 
640 /* avoid the write operation when it is not needed, to lessen cache issues */
641 #define ASSIGN_UNLESS_EQUAL(var, val) do {	\
642 	if ((var) != (val))			\
643 		(var) = (val);			\
644 } while (0)
645 
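/*
 * Translate the mbuf offload requests (L4 checksum, TCP/UDP segmentation)
 * into the virtio net header. The IPv4 header checksum is computed in
 * software here, as it cannot be expressed in the virtio header.
 */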
646 static __rte_always_inline void
647 virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
648 {
649 	uint64_t csum_l4 = m_buf->ol_flags & RTE_MBUF_F_TX_L4_MASK;
650 
651 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG)
652 		csum_l4 |= RTE_MBUF_F_TX_TCP_CKSUM;
653 
654 	if (csum_l4) {
655 		/*
656 		 * Pseudo-header checksum must be set as per Virtio spec.
657 		 *
658 		 * Note: We don't propagate rte_net_intel_cksum_prepare()
659 		 * errors, as it would have an impact on performance, and an
660 		 * error would mean the packet is dropped by the guest instead
661 		 * of being dropped here.
662 		 */
663 		rte_net_intel_cksum_prepare(m_buf);
664 
665 		net_hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
666 		net_hdr->csum_start = m_buf->l2_len + m_buf->l3_len;
667 
668 		switch (csum_l4) {
669 		case RTE_MBUF_F_TX_TCP_CKSUM:
670 			net_hdr->csum_offset = (offsetof(struct rte_tcp_hdr,
671 						cksum));
672 			break;
673 		case RTE_MBUF_F_TX_UDP_CKSUM:
674 			net_hdr->csum_offset = (offsetof(struct rte_udp_hdr,
675 						dgram_cksum));
676 			break;
677 		case RTE_MBUF_F_TX_SCTP_CKSUM:
678 			net_hdr->csum_offset = (offsetof(struct rte_sctp_hdr,
679 						cksum));
680 			break;
681 		}
682 	} else {
683 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_start, 0);
684 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_offset, 0);
685 		ASSIGN_UNLESS_EQUAL(net_hdr->flags, 0);
686 	}
687 
688 	/* IP cksum offload cannot be carried by the virtio header, so calculate it here */
689 	if (m_buf->ol_flags & RTE_MBUF_F_TX_IP_CKSUM) {
690 		struct rte_ipv4_hdr *ipv4_hdr;
691 
692 		ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
693 						   m_buf->l2_len);
694 		ipv4_hdr->hdr_checksum = 0;
695 		ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
696 	}
697 
698 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG) {
699 		if (m_buf->ol_flags & RTE_MBUF_F_TX_IPV4)
700 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
701 		else
702 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
703 		net_hdr->gso_size = m_buf->tso_segsz;
704 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
705 					+ m_buf->l4_len;
706 	} else if (m_buf->ol_flags & RTE_MBUF_F_TX_UDP_SEG) {
707 		net_hdr->gso_type = VIRTIO_NET_HDR_GSO_UDP;
708 		net_hdr->gso_size = m_buf->tso_segsz;
709 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len +
710 			m_buf->l4_len;
711 	} else {
712 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0);
713 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_size, 0);
714 		ASSIGN_UNLESS_EQUAL(net_hdr->hdr_len, 0);
715 	}
716 }
717 
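/*
 * Translate a guest IOVA range into one or more host-virtual chunks and
 * append them to buf_vec, splitting whenever the mapping is not contiguous.
 * Returns -1 if the vector is full or the translation fails.
 */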
718 static __rte_always_inline int
719 map_one_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
720 		struct buf_vector *buf_vec, uint16_t *vec_idx,
721 		uint64_t desc_iova, uint64_t desc_len, uint8_t perm)
722 	__rte_shared_locks_required(&vq->iotlb_lock)
723 {
724 	uint16_t vec_id = *vec_idx;
725 
726 	while (desc_len) {
727 		uint64_t desc_addr;
728 		uint64_t desc_chunck_len = desc_len;
729 
730 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
731 			return -1;
732 
733 		desc_addr = vhost_iova_to_vva(dev, vq,
734 				desc_iova,
735 				&desc_chunck_len,
736 				perm);
737 		if (unlikely(!desc_addr))
738 			return -1;
739 
740 		rte_prefetch0((void *)(uintptr_t)desc_addr);
741 
742 		buf_vec[vec_id].buf_iova = desc_iova;
743 		buf_vec[vec_id].buf_addr = desc_addr;
744 		buf_vec[vec_id].buf_len  = desc_chunck_len;
745 
746 		desc_len -= desc_chunck_len;
747 		desc_iova += desc_chunck_len;
748 		vec_id++;
749 	}
750 	*vec_idx = vec_id;
751 
752 	return 0;
753 }
754 
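/*
 * Walk one descriptor chain of the split ring, following an indirect table
 * when present (and copying it if it is not contiguous in process VA
 * space), and collect its buffers into buf_vec.
 */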
755 static __rte_always_inline int
756 fill_vec_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
757 			 uint32_t avail_idx, uint16_t *vec_idx,
758 			 struct buf_vector *buf_vec, uint16_t *desc_chain_head,
759 			 uint32_t *desc_chain_len, uint8_t perm)
760 	__rte_shared_locks_required(&vq->iotlb_lock)
761 {
762 	uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
763 	uint16_t vec_id = *vec_idx;
764 	uint32_t len    = 0;
765 	uint64_t dlen;
766 	uint32_t nr_descs = vq->size;
767 	uint32_t cnt    = 0;
768 	struct vring_desc *descs = vq->desc;
769 	struct vring_desc *idesc = NULL;
770 
771 	if (unlikely(idx >= vq->size))
772 		return -1;
773 
774 	*desc_chain_head = idx;
775 
776 	if (vq->desc[idx].flags & VRING_DESC_F_INDIRECT) {
777 		dlen = vq->desc[idx].len;
778 		nr_descs = dlen / sizeof(struct vring_desc);
779 		if (unlikely(nr_descs > vq->size))
780 			return -1;
781 
782 		descs = (struct vring_desc *)(uintptr_t)
783 			vhost_iova_to_vva(dev, vq, vq->desc[idx].addr,
784 						&dlen,
785 						VHOST_ACCESS_RO);
786 		if (unlikely(!descs))
787 			return -1;
788 
789 		if (unlikely(dlen < vq->desc[idx].len)) {
790 			/*
791 			 * The indirect desc table is not contiguous
792 			 * in process VA space, so we have to copy it.
793 			 */
794 			idesc = vhost_alloc_copy_ind_table(dev, vq,
795 					vq->desc[idx].addr, vq->desc[idx].len);
796 			if (unlikely(!idesc))
797 				return -1;
798 
799 			descs = idesc;
800 		}
801 
802 		idx = 0;
803 	}
804 
805 	while (1) {
806 		if (unlikely(idx >= nr_descs || cnt++ >= nr_descs)) {
807 			free_ind_table(idesc);
808 			return -1;
809 		}
810 
811 		dlen = descs[idx].len;
812 		len += dlen;
813 
814 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
815 						descs[idx].addr, dlen,
816 						perm))) {
817 			free_ind_table(idesc);
818 			return -1;
819 		}
820 
821 		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
822 			break;
823 
824 		idx = descs[idx].next;
825 	}
826 
827 	*desc_chain_len = len;
828 	*vec_idx = vec_id;
829 
830 	if (unlikely(!!idesc))
831 		free_ind_table(idesc);
832 
833 	return 0;
834 }
835 
836 /*
837  * Returns -1 on failure, 0 on success
838  */
839 static inline int
840 reserve_avail_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
841 				uint64_t size, struct buf_vector *buf_vec,
842 				uint16_t *num_buffers, uint16_t avail_head,
843 				uint16_t *nr_vec)
844 	__rte_shared_locks_required(&vq->iotlb_lock)
845 {
846 	uint16_t cur_idx;
847 	uint16_t vec_idx = 0;
848 	uint16_t max_tries, tries = 0;
849 
850 	uint16_t head_idx = 0;
851 	uint32_t len = 0;
852 
853 	*num_buffers = 0;
854 	cur_idx  = vq->last_avail_idx;
855 
856 	if (rxvq_is_mergeable(dev))
857 		max_tries = vq->size - 1;
858 	else
859 		max_tries = 1;
860 
861 	while (size > 0) {
862 		if (unlikely(cur_idx == avail_head))
863 			return -1;
864 		/*
865 		 * If we have tried all available ring items and still
866 		 * cannot get enough buffers, something abnormal has
867 		 * happened.
868 		 */
869 		if (unlikely(++tries > max_tries))
870 			return -1;
871 
872 		if (unlikely(fill_vec_buf_split(dev, vq, cur_idx,
873 						&vec_idx, buf_vec,
874 						&head_idx, &len,
875 						VHOST_ACCESS_RW) < 0))
876 			return -1;
877 		len = RTE_MIN(len, size);
878 		update_shadow_used_ring_split(vq, head_idx, len);
879 		size -= len;
880 
881 		cur_idx++;
882 		*num_buffers += 1;
883 	}
884 
885 	*nr_vec = vec_idx;
886 
887 	return 0;
888 }
889 
890 static __rte_always_inline int
891 fill_vec_buf_packed_indirect(struct virtio_net *dev,
892 			struct vhost_virtqueue *vq,
893 			struct vring_packed_desc *desc, uint16_t *vec_idx,
894 			struct buf_vector *buf_vec, uint32_t *len, uint8_t perm)
895 	__rte_shared_locks_required(&vq->iotlb_lock)
896 {
897 	uint16_t i;
898 	uint32_t nr_descs;
899 	uint16_t vec_id = *vec_idx;
900 	uint64_t dlen;
901 	struct vring_packed_desc *descs, *idescs = NULL;
902 
903 	dlen = desc->len;
904 	descs = (struct vring_packed_desc *)(uintptr_t)
905 		vhost_iova_to_vva(dev, vq, desc->addr, &dlen, VHOST_ACCESS_RO);
906 	if (unlikely(!descs))
907 		return -1;
908 
909 	if (unlikely(dlen < desc->len)) {
910 		/*
911 		 * The indirect desc table is not contiguous
912 		 * in process VA space, so we have to copy it.
913 		 */
914 		idescs = vhost_alloc_copy_ind_table(dev,
915 				vq, desc->addr, desc->len);
916 		if (unlikely(!idescs))
917 			return -1;
918 
919 		descs = idescs;
920 	}
921 
922 	nr_descs =  desc->len / sizeof(struct vring_packed_desc);
923 	if (unlikely(nr_descs >= vq->size)) {
924 		free_ind_table(idescs);
925 		return -1;
926 	}
927 
928 	for (i = 0; i < nr_descs; i++) {
929 		if (unlikely(vec_id >= BUF_VECTOR_MAX)) {
930 			free_ind_table(idescs);
931 			return -1;
932 		}
933 
934 		dlen = descs[i].len;
935 		*len += dlen;
936 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
937 						descs[i].addr, dlen,
938 						perm)))
939 			return -1;
940 	}
941 	*vec_idx = vec_id;
942 
943 	if (unlikely(!!idescs))
944 		free_ind_table(idescs);
945 
946 	return 0;
947 }
948 
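/*
 * Collect the buffers of one packed-ring descriptor chain starting at
 * avail_idx into buf_vec, returning the buffer id, the chain length and the
 * number of descriptors consumed.
 */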
949 static __rte_always_inline int
950 fill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
951 				uint16_t avail_idx, uint16_t *desc_count,
952 				struct buf_vector *buf_vec, uint16_t *vec_idx,
953 				uint16_t *buf_id, uint32_t *len, uint8_t perm)
954 	__rte_shared_locks_required(&vq->iotlb_lock)
955 {
956 	bool wrap_counter = vq->avail_wrap_counter;
957 	struct vring_packed_desc *descs = vq->desc_packed;
958 	uint16_t vec_id = *vec_idx;
959 	uint64_t dlen;
960 
961 	if (avail_idx < vq->last_avail_idx)
962 		wrap_counter ^= 1;
963 
964 	/*
965 	 * Perform a load-acquire barrier in desc_is_avail to
966 	 * enforce the ordering between desc flags and desc
967 	 * content.
968 	 */
969 	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)))
970 		return -1;
971 
972 	*desc_count = 0;
973 	*len = 0;
974 
975 	while (1) {
976 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
977 			return -1;
978 
979 		if (unlikely(*desc_count >= vq->size))
980 			return -1;
981 
982 		*desc_count += 1;
983 		*buf_id = descs[avail_idx].id;
984 
985 		if (descs[avail_idx].flags & VRING_DESC_F_INDIRECT) {
986 			if (unlikely(fill_vec_buf_packed_indirect(dev, vq,
987 							&descs[avail_idx],
988 							&vec_id, buf_vec,
989 							len, perm) < 0))
990 				return -1;
991 		} else {
992 			dlen = descs[avail_idx].len;
993 			*len += dlen;
994 
995 			if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
996 							descs[avail_idx].addr,
997 							dlen,
998 							perm)))
999 				return -1;
1000 		}
1001 
1002 		if ((descs[avail_idx].flags & VRING_DESC_F_NEXT) == 0)
1003 			break;
1004 
1005 		if (++avail_idx >= vq->size) {
1006 			avail_idx -= vq->size;
1007 			wrap_counter ^= 1;
1008 		}
1009 	}
1010 
1011 	*vec_idx = vec_id;
1012 
1013 	return 0;
1014 }
1015 
1016 static __rte_noinline void
1017 copy_vnet_hdr_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
1018 		struct buf_vector *buf_vec,
1019 		struct virtio_net_hdr_mrg_rxbuf *hdr)
1020 	__rte_shared_locks_required(&vq->iotlb_lock)
1021 {
1022 	uint64_t len;
1023 	uint64_t remain = dev->vhost_hlen;
1024 	uint64_t src = (uint64_t)(uintptr_t)hdr, dst;
1025 	uint64_t iova = buf_vec->buf_iova;
1026 
1027 	while (remain) {
1028 		len = RTE_MIN(remain,
1029 				buf_vec->buf_len);
1030 		dst = buf_vec->buf_addr;
1031 		rte_memcpy((void *)(uintptr_t)dst,
1032 				(void *)(uintptr_t)src,
1033 				len);
1034 
1035 		PRINT_PACKET(dev, (uintptr_t)dst,
1036 				(uint32_t)len, 0);
1037 		vhost_log_cache_write_iova(dev, vq,
1038 				iova, len);
1039 
1040 		remain -= len;
1041 		iova += len;
1042 		src += len;
1043 		buf_vec++;
1044 	}
1045 }
1046 
1047 static __rte_always_inline int
1048 async_iter_initialize(struct virtio_net *dev, struct vhost_async *async)
1049 {
1050 	struct vhost_iov_iter *iter;
1051 
1052 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
1053 		VHOST_DATA_LOG(dev->ifname, ERR, "no more async iovec available");
1054 		return -1;
1055 	}
1056 
1057 	iter = async->iov_iter + async->iter_idx;
1058 	iter->iov = async->iovec + async->iovec_idx;
1059 	iter->nr_segs = 0;
1060 
1061 	return 0;
1062 }
1063 
1064 static __rte_always_inline int
1065 async_iter_add_iovec(struct virtio_net *dev, struct vhost_async *async,
1066 		void *src, void *dst, size_t len)
1067 {
1068 	struct vhost_iov_iter *iter;
1069 	struct vhost_iovec *iovec;
1070 
1071 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
1072 		static bool vhost_max_async_vec_log;
1073 
1074 		if (!vhost_max_async_vec_log) {
1075 			VHOST_DATA_LOG(dev->ifname, ERR, "no more async iovec available");
1076 			vhost_max_async_vec_log = true;
1077 		}
1078 
1079 		return -1;
1080 	}
1081 
1082 	iter = async->iov_iter + async->iter_idx;
1083 	iovec = async->iovec + async->iovec_idx;
1084 
1085 	iovec->src_addr = src;
1086 	iovec->dst_addr = dst;
1087 	iovec->len = len;
1088 
1089 	iter->nr_segs++;
1090 	async->iovec_idx++;
1091 
1092 	return 0;
1093 }
1094 
1095 static __rte_always_inline void
1096 async_iter_finalize(struct vhost_async *async)
1097 {
1098 	async->iter_idx++;
1099 }
1100 
1101 static __rte_always_inline void
1102 async_iter_cancel(struct vhost_async *async)
1103 {
1104 	struct vhost_iov_iter *iter;
1105 
1106 	iter = async->iov_iter + async->iter_idx;
1107 	async->iovec_idx -= iter->nr_segs;
1108 	iter->nr_segs = 0;
1109 	iter->iov = NULL;
1110 }
1111 
1112 static __rte_always_inline void
1113 async_iter_reset(struct vhost_async *async)
1114 {
1115 	async->iter_idx = 0;
1116 	async->iovec_idx = 0;
1117 }
1118 
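/*
 * Build the async iovec entries for one copy between an mbuf segment and a
 * guest buffer, splitting on host-physical contiguity boundaries. The copy
 * direction is selected by to_desc.
 */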
1119 static __rte_always_inline int
1120 async_fill_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
1121 		struct rte_mbuf *m, uint32_t mbuf_offset,
1122 		uint64_t buf_iova, uint32_t cpy_len, bool to_desc)
1123 	__rte_shared_locks_required(&vq->access_lock)
1124 	__rte_shared_locks_required(&vq->iotlb_lock)
1125 {
1126 	struct vhost_async *async = vq->async;
1127 	uint64_t mapped_len;
1128 	uint32_t buf_offset = 0;
1129 	void *src, *dst;
1130 	void *host_iova;
1131 
1132 	while (cpy_len) {
1133 		host_iova = (void *)(uintptr_t)gpa_to_first_hpa(dev,
1134 				buf_iova + buf_offset, cpy_len, &mapped_len);
1135 		if (unlikely(!host_iova)) {
1136 			VHOST_DATA_LOG(dev->ifname, ERR,
1137 				"%s: failed to get host iova.",
1138 				__func__);
1139 			return -1;
1140 		}
1141 
1142 		if (to_desc) {
1143 			src = (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset);
1144 			dst = host_iova;
1145 		} else {
1146 			src = host_iova;
1147 			dst = (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset);
1148 		}
1149 
1150 		if (unlikely(async_iter_add_iovec(dev, async, src, dst, (size_t)mapped_len)))
1151 			return -1;
1152 
1153 		cpy_len -= (uint32_t)mapped_len;
1154 		mbuf_offset += (uint32_t)mapped_len;
1155 		buf_offset += (uint32_t)mapped_len;
1156 	}
1157 
1158 	return 0;
1159 }
1160 
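/*
 * CPU copy of one segment between an mbuf and a guest buffer. Large copies
 * are performed immediately; small ones are recorded in batch_copy_elems
 * and flushed later by do_data_copy_enqueue()/do_data_copy_dequeue().
 */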
1161 static __rte_always_inline void
1162 sync_fill_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
1163 		struct rte_mbuf *m, uint32_t mbuf_offset,
1164 		uint64_t buf_addr, uint64_t buf_iova, uint32_t cpy_len, bool to_desc)
1165 	__rte_shared_locks_required(&vq->iotlb_lock)
1166 {
1167 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
1168 
1169 	if (likely(cpy_len > MAX_BATCH_LEN || vq->batch_copy_nb_elems >= vq->size)) {
1170 		if (to_desc) {
1171 			rte_memcpy((void *)((uintptr_t)(buf_addr)),
1172 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
1173 				cpy_len);
1174 			vhost_log_cache_write_iova(dev, vq, buf_iova, cpy_len);
1175 			PRINT_PACKET(dev, (uintptr_t)(buf_addr), cpy_len, 0);
1176 		} else {
1177 			rte_memcpy(rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
1178 				(void *)((uintptr_t)(buf_addr)),
1179 				cpy_len);
1180 		}
1181 	} else {
1182 		if (to_desc) {
1183 			batch_copy[vq->batch_copy_nb_elems].dst =
1184 				(void *)((uintptr_t)(buf_addr));
1185 			batch_copy[vq->batch_copy_nb_elems].src =
1186 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
1187 			batch_copy[vq->batch_copy_nb_elems].log_addr = buf_iova;
1188 		} else {
1189 			batch_copy[vq->batch_copy_nb_elems].dst =
1190 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
1191 			batch_copy[vq->batch_copy_nb_elems].src =
1192 				(void *)((uintptr_t)(buf_addr));
1193 		}
1194 		batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
1195 		vq->batch_copy_nb_elems++;
1196 	}
1197 }
1198 
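/*
 * Copy one mbuf chain into the guest buffers described by buf_vec, writing
 * the virtio net header first. When is_async is set, the data copies are
 * recorded as iovec entries for the DMA engine instead of being done by the
 * CPU.
 */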
1199 static __rte_always_inline int
1200 mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
1201 		struct rte_mbuf *m, struct buf_vector *buf_vec,
1202 		uint16_t nr_vec, uint16_t num_buffers, bool is_async)
1203 	__rte_shared_locks_required(&vq->access_lock)
1204 	__rte_shared_locks_required(&vq->iotlb_lock)
1205 {
1206 	uint32_t vec_idx = 0;
1207 	uint32_t mbuf_offset, mbuf_avail;
1208 	uint32_t buf_offset, buf_avail;
1209 	uint64_t buf_addr, buf_iova, buf_len;
1210 	uint32_t cpy_len;
1211 	uint64_t hdr_addr;
1212 	struct rte_mbuf *hdr_mbuf;
1213 	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
1214 	struct vhost_async *async = vq->async;
1215 
1216 	if (unlikely(m == NULL))
1217 		return -1;
1218 
1219 	buf_addr = buf_vec[vec_idx].buf_addr;
1220 	buf_iova = buf_vec[vec_idx].buf_iova;
1221 	buf_len = buf_vec[vec_idx].buf_len;
1222 
1223 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
1224 		return -1;
1225 
1226 	hdr_mbuf = m;
1227 	hdr_addr = buf_addr;
1228 	if (unlikely(buf_len < dev->vhost_hlen)) {
1229 		memset(&tmp_hdr, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
1230 		hdr = &tmp_hdr;
1231 	} else
1232 		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
1233 
1234 	VHOST_DATA_LOG(dev->ifname, DEBUG, "RX: num merge buffers %d", num_buffers);
1235 
1236 	if (unlikely(buf_len < dev->vhost_hlen)) {
1237 		buf_offset = dev->vhost_hlen - buf_len;
1238 		vec_idx++;
1239 		buf_addr = buf_vec[vec_idx].buf_addr;
1240 		buf_iova = buf_vec[vec_idx].buf_iova;
1241 		buf_len = buf_vec[vec_idx].buf_len;
1242 		buf_avail = buf_len - buf_offset;
1243 	} else {
1244 		buf_offset = dev->vhost_hlen;
1245 		buf_avail = buf_len - dev->vhost_hlen;
1246 	}
1247 
1248 	mbuf_avail  = rte_pktmbuf_data_len(m);
1249 	mbuf_offset = 0;
1250 
1251 	if (is_async) {
1252 		if (async_iter_initialize(dev, async))
1253 			return -1;
1254 	}
1255 
1256 	while (mbuf_avail != 0 || m->next != NULL) {
1257 		/* done with current buf, get the next one */
1258 		if (buf_avail == 0) {
1259 			vec_idx++;
1260 			if (unlikely(vec_idx >= nr_vec))
1261 				goto error;
1262 
1263 			buf_addr = buf_vec[vec_idx].buf_addr;
1264 			buf_iova = buf_vec[vec_idx].buf_iova;
1265 			buf_len = buf_vec[vec_idx].buf_len;
1266 
1267 			buf_offset = 0;
1268 			buf_avail  = buf_len;
1269 		}
1270 
1271 		/* done with current mbuf, get the next one */
1272 		if (mbuf_avail == 0) {
1273 			m = m->next;
1274 
1275 			mbuf_offset = 0;
1276 			mbuf_avail  = rte_pktmbuf_data_len(m);
1277 		}
1278 
1279 		if (hdr_addr) {
1280 			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
1281 			if (rxvq_is_mergeable(dev))
1282 				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
1283 						num_buffers);
1284 
1285 			if (unlikely(hdr == &tmp_hdr)) {
1286 				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
1287 			} else {
1288 				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
1289 						dev->vhost_hlen, 0);
1290 				vhost_log_cache_write_iova(dev, vq,
1291 						buf_vec[0].buf_iova,
1292 						dev->vhost_hlen);
1293 			}
1294 
1295 			hdr_addr = 0;
1296 		}
1297 
1298 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
1299 
1300 		if (is_async) {
1301 			if (async_fill_seg(dev, vq, m, mbuf_offset,
1302 					   buf_iova + buf_offset, cpy_len, true) < 0)
1303 				goto error;
1304 		} else {
1305 			sync_fill_seg(dev, vq, m, mbuf_offset,
1306 				      buf_addr + buf_offset,
1307 				      buf_iova + buf_offset, cpy_len, true);
1308 		}
1309 
1310 		mbuf_avail  -= cpy_len;
1311 		mbuf_offset += cpy_len;
1312 		buf_avail  -= cpy_len;
1313 		buf_offset += cpy_len;
1314 	}
1315 
1316 	if (is_async)
1317 		async_iter_finalize(async);
1318 
1319 	return 0;
1320 error:
1321 	if (is_async)
1322 		async_iter_cancel(async);
1323 
1324 	return -1;
1325 }
1326 
1327 static __rte_always_inline int
1328 vhost_enqueue_single_packed(struct virtio_net *dev,
1329 			    struct vhost_virtqueue *vq,
1330 			    struct rte_mbuf *pkt,
1331 			    struct buf_vector *buf_vec,
1332 			    uint16_t *nr_descs)
1333 	__rte_shared_locks_required(&vq->access_lock)
1334 	__rte_shared_locks_required(&vq->iotlb_lock)
1335 {
1336 	uint16_t nr_vec = 0;
1337 	uint16_t avail_idx = vq->last_avail_idx;
1338 	uint16_t max_tries, tries = 0;
1339 	uint16_t buf_id = 0;
1340 	uint32_t len = 0;
1341 	uint16_t desc_count;
1342 	uint64_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1343 	uint16_t num_buffers = 0;
1344 	uint32_t buffer_len[vq->size];
1345 	uint16_t buffer_buf_id[vq->size];
1346 	uint16_t buffer_desc_count[vq->size];
1347 
1348 	if (rxvq_is_mergeable(dev))
1349 		max_tries = vq->size - 1;
1350 	else
1351 		max_tries = 1;
1352 
1353 	while (size > 0) {
1354 		/*
1355 		 * If we have tried all available ring items and still
1356 		 * cannot get enough buffers, something abnormal has
1357 		 * happened.
1358 		 */
1359 		if (unlikely(++tries > max_tries))
1360 			return -1;
1361 
1362 		if (unlikely(fill_vec_buf_packed(dev, vq,
1363 						avail_idx, &desc_count,
1364 						buf_vec, &nr_vec,
1365 						&buf_id, &len,
1366 						VHOST_ACCESS_RW) < 0))
1367 			return -1;
1368 
1369 		len = RTE_MIN(len, size);
1370 		size -= len;
1371 
1372 		buffer_len[num_buffers] = len;
1373 		buffer_buf_id[num_buffers] = buf_id;
1374 		buffer_desc_count[num_buffers] = desc_count;
1375 		num_buffers += 1;
1376 
1377 		*nr_descs += desc_count;
1378 		avail_idx += desc_count;
1379 		if (avail_idx >= vq->size)
1380 			avail_idx -= vq->size;
1381 	}
1382 
1383 	if (mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, num_buffers, false) < 0)
1384 		return -1;
1385 
1386 	vhost_shadow_enqueue_single_packed(dev, vq, buffer_len, buffer_buf_id,
1387 					   buffer_desc_count, num_buffers);
1388 
1389 	return 0;
1390 }
1391 
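/*
 * Sync (CPU copy) enqueue of a burst of packets into a split virtqueue,
 * followed by a flush of the shadow used ring and a guest notification if
 * anything was enqueued.
 */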
1392 static __rte_noinline uint32_t
1393 virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1394 	struct rte_mbuf **pkts, uint32_t count)
1395 	__rte_shared_locks_required(&vq->access_lock)
1396 	__rte_shared_locks_required(&vq->iotlb_lock)
1397 {
1398 	uint32_t pkt_idx = 0;
1399 	uint16_t num_buffers;
1400 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1401 	uint16_t avail_head;
1402 
1403 	/*
1404 	 * The ordering between avail index and
1405 	 * desc reads needs to be enforced.
1406 	 */
1407 	avail_head = rte_atomic_load_explicit((unsigned short __rte_atomic *)&vq->avail->idx,
1408 		rte_memory_order_acquire);
1409 
1410 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1411 
1412 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1413 		uint64_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1414 		uint16_t nr_vec = 0;
1415 
1416 		if (unlikely(reserve_avail_buf_split(dev, vq,
1417 						pkt_len, buf_vec, &num_buffers,
1418 						avail_head, &nr_vec) < 0)) {
1419 			VHOST_DATA_LOG(dev->ifname, DEBUG,
1420 				"failed to get enough desc from vring");
1421 			vq->shadow_used_idx -= num_buffers;
1422 			break;
1423 		}
1424 
1425 		VHOST_DATA_LOG(dev->ifname, DEBUG,
1426 			"current index %d | end index %d",
1427 			vq->last_avail_idx, vq->last_avail_idx + num_buffers);
1428 
1429 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec,
1430 					num_buffers, false) < 0) {
1431 			vq->shadow_used_idx -= num_buffers;
1432 			break;
1433 		}
1434 
1435 		vq->last_avail_idx += num_buffers;
1436 	}
1437 
1438 	do_data_copy_enqueue(dev, vq);
1439 
1440 	if (likely(vq->shadow_used_idx)) {
1441 		flush_shadow_used_ring_split(dev, vq);
1442 		vhost_vring_call_split(dev, vq);
1443 	}
1444 
1445 	return pkt_idx;
1446 }
1447 
1448 static __rte_always_inline int
1449 virtio_dev_rx_sync_batch_check(struct virtio_net *dev,
1450 			   struct vhost_virtqueue *vq,
1451 			   struct rte_mbuf **pkts,
1452 			   uint64_t *desc_addrs,
1453 			   uint64_t *lens)
1454 	__rte_shared_locks_required(&vq->iotlb_lock)
1455 {
1456 	bool wrap_counter = vq->avail_wrap_counter;
1457 	struct vring_packed_desc *descs = vq->desc_packed;
1458 	uint16_t avail_idx = vq->last_avail_idx;
1459 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1460 	uint16_t i;
1461 
1462 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1463 		return -1;
1464 
1465 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1466 		return -1;
1467 
1468 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1469 		if (unlikely(pkts[i]->next != NULL))
1470 			return -1;
1471 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1472 					    wrap_counter)))
1473 			return -1;
1474 	}
1475 
1476 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1477 		lens[i] = descs[avail_idx + i].len;
1478 
1479 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1480 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1481 			return -1;
1482 	}
1483 
1484 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1485 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
1486 						  descs[avail_idx + i].addr,
1487 						  &lens[i],
1488 						  VHOST_ACCESS_RW);
1489 
1490 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1491 		if (unlikely(!desc_addrs[i]))
1492 			return -1;
1493 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1494 			return -1;
1495 	}
1496 
1497 	return 0;
1498 }
1499 
1500 static __rte_always_inline int
1501 virtio_dev_rx_async_batch_check(struct vhost_virtqueue *vq,
1502 			   struct rte_mbuf **pkts,
1503 			   uint64_t *desc_addrs,
1504 			   uint64_t *lens,
1505 			   int16_t dma_id,
1506 			   uint16_t vchan_id)
1507 {
1508 	bool wrap_counter = vq->avail_wrap_counter;
1509 	struct vring_packed_desc *descs = vq->desc_packed;
1510 	uint16_t avail_idx = vq->last_avail_idx;
1511 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1512 	uint16_t i;
1513 
1514 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1515 		return -1;
1516 
1517 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1518 		return -1;
1519 
1520 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1521 		if (unlikely(pkts[i]->next != NULL))
1522 			return -1;
1523 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1524 					    wrap_counter)))
1525 			return -1;
1526 	}
1527 
1528 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1529 		lens[i] = descs[avail_idx + i].len;
1530 
1531 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1532 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1533 			return -1;
1534 	}
1535 
1536 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1537 		desc_addrs[i] =  descs[avail_idx + i].addr;
1538 
1539 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1540 		if (unlikely(!desc_addrs[i]))
1541 			return -1;
1542 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1543 			return -1;
1544 	}
1545 
1546 	if (rte_dma_burst_capacity(dma_id, vchan_id) < PACKED_BATCH_SIZE)
1547 		return -1;
1548 
1549 	return 0;
1550 }
1551 
1552 static __rte_always_inline void
1553 virtio_dev_rx_batch_packed_copy(struct virtio_net *dev,
1554 			   struct vhost_virtqueue *vq,
1555 			   struct rte_mbuf **pkts,
1556 			   uint64_t *desc_addrs,
1557 			   uint64_t *lens)
1558 	__rte_shared_locks_required(&vq->iotlb_lock)
1559 {
1560 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1561 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
1562 	struct vring_packed_desc *descs = vq->desc_packed;
1563 	uint16_t avail_idx = vq->last_avail_idx;
1564 	uint16_t ids[PACKED_BATCH_SIZE];
1565 	uint16_t i;
1566 
1567 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1568 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
1569 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
1570 					(uintptr_t)desc_addrs[i];
1571 		lens[i] = pkts[i]->pkt_len +
1572 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
1573 	}
1574 
1575 	if (rxvq_is_mergeable(dev)) {
1576 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1577 			ASSIGN_UNLESS_EQUAL(hdrs[i]->num_buffers, 1);
1578 		}
1579 	}
1580 
1581 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1582 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
1583 
1584 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
1585 
1586 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1587 		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
1588 			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
1589 			   pkts[i]->pkt_len);
1590 	}
1591 
1592 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1593 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr,
1594 					   lens[i]);
1595 
1596 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1597 		ids[i] = descs[avail_idx + i].id;
1598 
1599 	vhost_flush_enqueue_batch_packed(dev, vq, lens, ids);
1600 }
1601 
1602 static __rte_always_inline int
1603 virtio_dev_rx_sync_batch_packed(struct virtio_net *dev,
1604 			   struct vhost_virtqueue *vq,
1605 			   struct rte_mbuf **pkts)
1606 	__rte_shared_locks_required(&vq->iotlb_lock)
1607 {
1608 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
1609 	uint64_t lens[PACKED_BATCH_SIZE];
1610 
1611 	if (virtio_dev_rx_sync_batch_check(dev, vq, pkts, desc_addrs, lens) == -1)
1612 		return -1;
1613 
1614 	if (vq->shadow_used_idx) {
1615 		do_data_copy_enqueue(dev, vq);
1616 		vhost_flush_enqueue_shadow_packed(dev, vq);
1617 	}
1618 
1619 	virtio_dev_rx_batch_packed_copy(dev, vq, pkts, desc_addrs, lens);
1620 
1621 	return 0;
1622 }
1623 
1624 static __rte_always_inline int16_t
1625 virtio_dev_rx_single_packed(struct virtio_net *dev,
1626 			    struct vhost_virtqueue *vq,
1627 			    struct rte_mbuf *pkt)
1628 	__rte_shared_locks_required(&vq->access_lock)
1629 	__rte_shared_locks_required(&vq->iotlb_lock)
1630 {
1631 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1632 	uint16_t nr_descs = 0;
1633 
1634 	if (unlikely(vhost_enqueue_single_packed(dev, vq, pkt, buf_vec,
1635 						 &nr_descs) < 0)) {
1636 		VHOST_DATA_LOG(dev->ifname, DEBUG, "failed to get enough desc from vring");
1637 		return -1;
1638 	}
1639 
1640 	VHOST_DATA_LOG(dev->ifname, DEBUG,
1641 		"current index %d | end index %d",
1642 		vq->last_avail_idx, vq->last_avail_idx + nr_descs);
1643 
1644 	vq_inc_last_avail_packed(vq, nr_descs);
1645 
1646 	return 0;
1647 }
1648 
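/*
 * Sync enqueue of a burst of packets into a packed virtqueue: full batches
 * are tried first, with a fallback to single-packet enqueue.
 */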
1649 static __rte_noinline uint32_t
1650 virtio_dev_rx_packed(struct virtio_net *dev,
1651 		     struct vhost_virtqueue *__rte_restrict vq,
1652 		     struct rte_mbuf **__rte_restrict pkts,
1653 		     uint32_t count)
1654 	__rte_shared_locks_required(&vq->access_lock)
1655 	__rte_shared_locks_required(&vq->iotlb_lock)
1656 {
1657 	uint32_t pkt_idx = 0;
1658 
1659 	do {
1660 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1661 
1662 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
1663 			if (!virtio_dev_rx_sync_batch_packed(dev, vq,
1664 							&pkts[pkt_idx])) {
1665 				pkt_idx += PACKED_BATCH_SIZE;
1666 				continue;
1667 			}
1668 		}
1669 
1670 		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]))
1671 			break;
1672 		pkt_idx++;
1673 
1674 	} while (pkt_idx < count);
1675 
1676 	if (vq->shadow_used_idx) {
1677 		do_data_copy_enqueue(dev, vq);
1678 		vhost_flush_enqueue_shadow_packed(dev, vq);
1679 	}
1680 
1681 	if (pkt_idx)
1682 		vhost_vring_call_packed(dev, vq);
1683 
1684 	return pkt_idx;
1685 }
1686 
1687 static void
1688 virtio_dev_vring_translate(struct virtio_net *dev, struct vhost_virtqueue *vq)
1689 {
1690 	rte_rwlock_write_lock(&vq->access_lock);
1691 	vhost_user_iotlb_rd_lock(vq);
1692 	if (!vq->access_ok)
1693 		vring_translate(dev, vq);
1694 	vhost_user_iotlb_rd_unlock(vq);
1695 	rte_rwlock_write_unlock(&vq->access_lock);
1696 }
1697 
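/*
 * Common sync enqueue entry point: take the access and IOTLB locks,
 * translate the vring on first access if needed, and dispatch to the packed
 * or split implementation.
 */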
1698 static __rte_always_inline uint32_t
1699 virtio_dev_rx(struct virtio_net *dev, struct vhost_virtqueue *vq,
1700 	struct rte_mbuf **pkts, uint32_t count)
1701 {
1702 	uint32_t nb_tx = 0;
1703 
1704 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
1705 	rte_rwlock_read_lock(&vq->access_lock);
1706 
1707 	if (unlikely(!vq->enabled))
1708 		goto out_access_unlock;
1709 
1710 	vhost_user_iotlb_rd_lock(vq);
1711 
1712 	if (unlikely(!vq->access_ok)) {
1713 		vhost_user_iotlb_rd_unlock(vq);
1714 		rte_rwlock_read_unlock(&vq->access_lock);
1715 
1716 		virtio_dev_vring_translate(dev, vq);
1717 		goto out_no_unlock;
1718 	}
1719 
1720 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1721 	if (count == 0)
1722 		goto out;
1723 
1724 	if (vq_is_packed(dev))
1725 		nb_tx = virtio_dev_rx_packed(dev, vq, pkts, count);
1726 	else
1727 		nb_tx = virtio_dev_rx_split(dev, vq, pkts, count);
1728 
1729 	vhost_queue_stats_update(dev, vq, pkts, nb_tx);
1730 
1731 out:
1732 	vhost_user_iotlb_rd_unlock(vq);
1733 
1734 out_access_unlock:
1735 	rte_rwlock_read_unlock(&vq->access_lock);
1736 
1737 out_no_unlock:
1738 	return nb_tx;
1739 }
1740 
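/*
 * Public API for the sync enqueue path. A minimal usage sketch from an
 * application's point of view (illustrative only, assuming virtqueue 0 is a
 * guest RX queue):
 *
 *	uint16_t sent = rte_vhost_enqueue_burst(vid, 0, pkts, nb_pkts);
 *
 * Packet data is copied into guest buffers, so the caller keeps ownership
 * of all mbufs and may free them afterwards, enqueued or not.
 */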
1741 uint16_t
1742 rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
1743 	struct rte_mbuf **__rte_restrict pkts, uint16_t count)
1744 {
1745 	struct virtio_net *dev = get_device(vid);
1746 
1747 	if (!dev)
1748 		return 0;
1749 
1750 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
1751 		VHOST_DATA_LOG(dev->ifname, ERR,
1752 			"%s: built-in vhost net backend is disabled.",
1753 			__func__);
1754 		return 0;
1755 	}
1756 
1757 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1758 		VHOST_DATA_LOG(dev->ifname, ERR,
1759 			"%s: invalid virtqueue idx %d.",
1760 			__func__, queue_id);
1761 		return 0;
1762 	}
1763 
1764 	return virtio_dev_rx(dev, dev->virtqueue[queue_id], pkts, count);
1765 }
1766 
1767 static __rte_always_inline uint16_t
1768 async_get_first_inflight_pkt_idx(struct vhost_virtqueue *vq)
1769 	__rte_shared_locks_required(&vq->access_lock)
1770 {
1771 	struct vhost_async *async = vq->async;
1772 
1773 	if (async->pkts_idx >= async->pkts_inflight_n)
1774 		return async->pkts_idx - async->pkts_inflight_n;
1775 	else
1776 		return vq->size - async->pkts_inflight_n + async->pkts_idx;
1777 }
1778 
1779 static __rte_always_inline void
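/*
 * Copy used-element records from the shadow ring into the async descriptor
 * ring, handling wrap-around at the end of the destination ring.
 */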
1780 store_dma_desc_info_split(struct vring_used_elem *s_ring, struct vring_used_elem *d_ring,
1781 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1782 {
1783 	size_t elem_size = sizeof(struct vring_used_elem);
1784 
1785 	if (d_idx + count <= ring_size) {
1786 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1787 	} else {
1788 		uint16_t size = ring_size - d_idx;
1789 
1790 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1791 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1792 	}
1793 }
1794 
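/*
 * Async enqueue path for a split virtqueue: reserve guest buffers, build
 * the DMA iovecs for each packet, submit them to the DMA vChannel, and roll
 * back the shadow and avail indexes for packets whose copies could not be
 * enqueued.
 */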
1795 static __rte_noinline uint32_t
1796 virtio_dev_rx_async_submit_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1797 	struct rte_mbuf **pkts, uint32_t count, int16_t dma_id, uint16_t vchan_id)
1798 	__rte_exclusive_locks_required(&vq->access_lock)
1799 	__rte_shared_locks_required(&vq->iotlb_lock)
1800 {
1801 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1802 	uint32_t pkt_idx = 0;
1803 	uint16_t num_buffers;
1804 	uint16_t avail_head;
1805 
1806 	struct vhost_async *async = vq->async;
1807 	struct async_inflight_info *pkts_info = async->pkts_info;
1808 	uint32_t pkt_err = 0;
1809 	uint16_t n_xfer;
1810 	uint16_t slot_idx = 0;
1811 
1812 	/*
1813 	 * The ordering between avail index and desc reads needs to be enforced.
1814 	 */
1815 	avail_head = rte_atomic_load_explicit((unsigned short __rte_atomic *)&vq->avail->idx,
1816 		rte_memory_order_acquire);
1817 
1818 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1819 
1820 	async_iter_reset(async);
1821 
1822 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1823 		uint64_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1824 		uint16_t nr_vec = 0;
1825 
1826 		if (unlikely(reserve_avail_buf_split(dev, vq, pkt_len, buf_vec,
1827 						&num_buffers, avail_head, &nr_vec) < 0)) {
1828 			VHOST_DATA_LOG(dev->ifname, DEBUG,
1829 				"failed to get enough desc from vring");
1830 			vq->shadow_used_idx -= num_buffers;
1831 			break;
1832 		}
1833 
1834 		VHOST_DATA_LOG(dev->ifname, DEBUG,
1835 			"current index %d | end index %d",
1836 			vq->last_avail_idx, vq->last_avail_idx + num_buffers);
1837 
1838 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec, num_buffers, true) < 0) {
1839 			vq->shadow_used_idx -= num_buffers;
1840 			break;
1841 		}
1842 
1843 		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
1844 		pkts_info[slot_idx].descs = num_buffers;
1845 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1846 
1847 		vq->last_avail_idx += num_buffers;
1848 	}
1849 
1850 	if (unlikely(pkt_idx == 0))
1851 		return 0;
1852 
1853 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
1854 			async->iov_iter, pkt_idx);
1855 
1856 	pkt_err = pkt_idx - n_xfer;
1857 	if (unlikely(pkt_err)) {
1858 		uint16_t num_descs = 0;
1859 
1860 		VHOST_DATA_LOG(dev->ifname, DEBUG,
1861 			"%s: failed to transfer %u packets for queue %u.",
1862 			__func__, pkt_err, vq->index);
1863 
1864 		/* update number of completed packets */
1865 		pkt_idx = n_xfer;
1866 
1867 		/* calculate the sum of descriptors to revert */
1868 		while (pkt_err-- > 0) {
1869 			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
1870 			slot_idx--;
1871 		}
1872 
1873 		/* recover shadow used ring and available ring */
1874 		vq->shadow_used_idx -= num_descs;
1875 		vq->last_avail_idx -= num_descs;
1876 	}
1877 
1878 	/* keep used descriptors */
1879 	if (likely(vq->shadow_used_idx)) {
1880 		uint16_t to = async->desc_idx_split & (vq->size - 1);
1881 
1882 		store_dma_desc_info_split(vq->shadow_used_split,
1883 				async->descs_split, vq->size, 0, to,
1884 				vq->shadow_used_idx);
1885 
1886 		async->desc_idx_split += vq->shadow_used_idx;
1887 
1888 		async->pkts_idx += pkt_idx;
1889 		if (async->pkts_idx >= vq->size)
1890 			async->pkts_idx -= vq->size;
1891 
1892 		async->pkts_inflight_n += pkt_idx;
1893 		vq->shadow_used_idx = 0;
1894 	}
1895 
1896 	return pkt_idx;
1897 }
1898 
1899 
1900 static __rte_always_inline int
1901 vhost_enqueue_async_packed(struct virtio_net *dev,
1902 			    struct vhost_virtqueue *vq,
1903 			    struct rte_mbuf *pkt,
1904 			    struct buf_vector *buf_vec,
1905 			    uint16_t *nr_descs,
1906 			    uint16_t *nr_buffers)
1907 	__rte_exclusive_locks_required(&vq->access_lock)
1908 	__rte_shared_locks_required(&vq->iotlb_lock)
1909 {
1910 	uint16_t nr_vec = 0;
1911 	uint16_t avail_idx = vq->last_avail_idx;
1912 	uint16_t max_tries, tries = 0;
1913 	uint16_t buf_id = 0;
1914 	uint32_t len = 0;
1915 	uint16_t desc_count = 0;
1916 	uint64_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1917 	uint32_t buffer_len[vq->size];
1918 	uint16_t buffer_buf_id[vq->size];
1919 	uint16_t buffer_desc_count[vq->size];
1920 
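	/*
	 * With mergeable RX buffers the packet may span up to vq->size - 1
	 * buffers; otherwise it must fit into a single buffer.
	 */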
1921 	if (rxvq_is_mergeable(dev))
1922 		max_tries = vq->size - 1;
1923 	else
1924 		max_tries = 1;
1925 
1926 	do {
1927 		/*
1928 		 * if we tried all available ring items, and still
1929 		 * If we have tried all available ring items and still
1930 		 * cannot get enough buffers, something abnormal has
1931 		 * happened.
1932 		if (unlikely(++tries > max_tries))
1933 			return -1;
1934 
1935 		if (unlikely(fill_vec_buf_packed(dev, vq,
1936 						avail_idx, &desc_count,
1937 						buf_vec, &nr_vec,
1938 						&buf_id, &len,
1939 						VHOST_ACCESS_RW) < 0))
1940 			return -1;
1941 
1942 		len = RTE_MIN(len, size);
1943 		size -= len;
1944 
1945 		buffer_len[*nr_buffers] = len;
1946 		buffer_buf_id[*nr_buffers] = buf_id;
1947 		buffer_desc_count[*nr_buffers] = desc_count;
1948 		*nr_buffers += 1;
1949 		*nr_descs += desc_count;
1950 		avail_idx += desc_count;
1951 		if (avail_idx >= vq->size)
1952 			avail_idx -= vq->size;
1953 	} while (size > 0);
1954 
1955 	if (unlikely(mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers, true) < 0))
1956 		return -1;
1957 
1958 	vhost_async_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id,
1959 					buffer_desc_count, *nr_buffers);
1960 
1961 	return 0;
1962 }
1963 
1964 static __rte_always_inline int16_t
1965 virtio_dev_rx_async_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
1966 			    struct rte_mbuf *pkt, uint16_t *nr_descs, uint16_t *nr_buffers)
1967 	__rte_exclusive_locks_required(&vq->access_lock)
1968 	__rte_shared_locks_required(&vq->iotlb_lock)
1969 {
1970 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1971 
1972 	if (unlikely(vhost_enqueue_async_packed(dev, vq, pkt, buf_vec,
1973 					nr_descs, nr_buffers) < 0)) {
1974 		VHOST_DATA_LOG(dev->ifname, DEBUG, "failed to get enough desc from vring");
1975 		return -1;
1976 	}
1977 
1978 	VHOST_DATA_LOG(dev->ifname, DEBUG,
1979 		"current index %d | end index %d",
1980 		vq->last_avail_idx, vq->last_avail_idx + *nr_descs);
1981 
1982 	return 0;
1983 }
1984 
1985 static __rte_always_inline void
1986 virtio_dev_rx_async_packed_batch_enqueue(struct virtio_net *dev,
1987 			   struct vhost_virtqueue *vq,
1988 			   struct rte_mbuf **pkts,
1989 			   uint64_t *desc_addrs,
1990 			   uint64_t *lens)
1991 	__rte_exclusive_locks_required(&vq->access_lock)
1992 	__rte_shared_locks_required(&vq->iotlb_lock)
1993 {
1994 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1995 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
1996 	struct vring_packed_desc *descs = vq->desc_packed;
1997 	struct vhost_async *async = vq->async;
1998 	uint16_t avail_idx = vq->last_avail_idx;
1999 	uint32_t mbuf_offset = 0;
2000 	uint16_t ids[PACKED_BATCH_SIZE];
2001 	uint64_t mapped_len[PACKED_BATCH_SIZE];
2002 	void *host_iova[PACKED_BATCH_SIZE];
2003 	uintptr_t desc;
2004 	uint16_t i;
2005 
2006 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2007 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
2008 		desc = vhost_iova_to_vva(dev, vq, desc_addrs[i], &lens[i], VHOST_ACCESS_RW);
2009 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc;
2010 		lens[i] = pkts[i]->pkt_len +
2011 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
2012 	}
2013 
2014 	if (rxvq_is_mergeable(dev)) {
2015 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2016 			ASSIGN_UNLESS_EQUAL(hdrs[i]->num_buffers, 1);
2017 		}
2018 	}
2019 
2020 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2021 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
2022 
2023 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
2024 
2025 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2026 		host_iova[i] = (void *)(uintptr_t)gpa_to_first_hpa(dev,
2027 			desc_addrs[i] + buf_offset, lens[i], &mapped_len[i]);
2028 	}
2029 
2030 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2031 		async_iter_initialize(dev, async);
2032 		async_iter_add_iovec(dev, async,
2033 				(void *)(uintptr_t)rte_pktmbuf_iova_offset(pkts[i], mbuf_offset),
2034 				host_iova[i],
2035 				mapped_len[i]);
2036 		async->iter_idx++;
2037 	}
2038 
2039 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2040 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr, lens[i]);
2041 
2042 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2043 		ids[i] = descs[avail_idx + i].id;
2044 
2045 	vhost_async_shadow_enqueue_packed_batch(vq, lens, ids);
2046 }
2047 
2048 static __rte_always_inline int
2049 virtio_dev_rx_async_packed_batch(struct virtio_net *dev,
2050 			   struct vhost_virtqueue *vq,
2051 			   struct rte_mbuf **pkts,
2052 			   int16_t dma_id, uint16_t vchan_id)
2053 	__rte_exclusive_locks_required(&vq->access_lock)
2054 	__rte_shared_locks_required(&vq->iotlb_lock)
2055 {
2056 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
2057 	uint64_t lens[PACKED_BATCH_SIZE];
2058 
2059 	if (virtio_dev_rx_async_batch_check(vq, pkts, desc_addrs, lens, dma_id, vchan_id) == -1)
2060 		return -1;
2061 
2062 	virtio_dev_rx_async_packed_batch_enqueue(dev, vq, pkts, desc_addrs, lens);
2063 
2064 	return 0;
2065 }
2066 
2067 static __rte_always_inline void
2068 dma_error_handler_packed(struct vhost_virtqueue *vq, uint16_t slot_idx,
2069 			uint32_t nr_err, uint32_t *pkt_idx)
2070 	__rte_exclusive_locks_required(&vq->access_lock)
2071 {
2072 	uint16_t descs_err = 0;
2073 	uint16_t buffers_err = 0;
2074 	struct vhost_async *async = vq->async;
2075 	struct async_inflight_info *pkts_info = vq->async->pkts_info;
2076 
2077 	*pkt_idx -= nr_err;
2078 	/* calculate the sum of buffers and descs of DMA-error packets. */
2079 	while (nr_err-- > 0) {
2080 		descs_err += pkts_info[slot_idx % vq->size].descs;
2081 		buffers_err += pkts_info[slot_idx % vq->size].nr_buffers;
2082 		slot_idx--;
2083 	}
2084 
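	/* Roll back the avail index, toggling the wrap counter if the rollback wraps past the ring start. */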
2085 	if (vq->last_avail_idx >= descs_err) {
2086 		vq->last_avail_idx -= descs_err;
2087 	} else {
2088 		vq->last_avail_idx = vq->last_avail_idx + vq->size - descs_err;
2089 		vq->avail_wrap_counter ^= 1;
2090 	}
2091 
2092 	if (async->buffer_idx_packed >= buffers_err)
2093 		async->buffer_idx_packed -= buffers_err;
2094 	else
2095 		async->buffer_idx_packed = async->buffer_idx_packed + vq->size - buffers_err;
2096 }
2097 
2098 static __rte_noinline uint32_t
2099 virtio_dev_rx_async_submit_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
2100 	struct rte_mbuf **pkts, uint32_t count, int16_t dma_id, uint16_t vchan_id)
2101 	__rte_exclusive_locks_required(&vq->access_lock)
2102 	__rte_shared_locks_required(&vq->iotlb_lock)
2103 {
2104 	uint32_t pkt_idx = 0;
2105 	uint16_t n_xfer;
2106 	uint16_t num_buffers;
2107 	uint16_t num_descs;
2108 
2109 	struct vhost_async *async = vq->async;
2110 	struct async_inflight_info *pkts_info = async->pkts_info;
2111 	uint32_t pkt_err = 0;
2112 	uint16_t slot_idx = 0;
2113 	uint16_t i;
2114 
2115 	do {
2116 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
2117 
2118 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
2119 			if (!virtio_dev_rx_async_packed_batch(dev, vq, &pkts[pkt_idx],
2120 					dma_id, vchan_id)) {
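				/* In the batch path each packet uses exactly one descriptor and one buffer. */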
2121 				for (i = 0; i < PACKED_BATCH_SIZE; i++) {
2122 					slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
2123 					pkts_info[slot_idx].descs = 1;
2124 					pkts_info[slot_idx].nr_buffers = 1;
2125 					pkts_info[slot_idx].mbuf = pkts[pkt_idx];
2126 					pkt_idx++;
2127 				}
2128 				continue;
2129 			}
2130 		}
2131 
2132 		num_buffers = 0;
2133 		num_descs = 0;
2134 		if (unlikely(virtio_dev_rx_async_packed(dev, vq, pkts[pkt_idx],
2135 						&num_descs, &num_buffers) < 0))
2136 			break;
2137 
2138 		slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
2139 
2140 		pkts_info[slot_idx].descs = num_descs;
2141 		pkts_info[slot_idx].nr_buffers = num_buffers;
2142 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
2143 
2144 		pkt_idx++;
2145 		vq_inc_last_avail_packed(vq, num_descs);
2146 	} while (pkt_idx < count);
2147 
2148 	if (unlikely(pkt_idx == 0))
2149 		return 0;
2150 
2151 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
2152 			async->iov_iter, pkt_idx);
2153 
2154 	async_iter_reset(async);
2155 
2156 	pkt_err = pkt_idx - n_xfer;
2157 	if (unlikely(pkt_err)) {
2158 		VHOST_DATA_LOG(dev->ifname, DEBUG,
2159 			"%s: failed to transfer %u packets for queue %u.",
2160 			__func__, pkt_err, vq->index);
2161 		dma_error_handler_packed(vq, slot_idx, pkt_err, &pkt_idx);
2162 	}
2163 
2164 	async->pkts_idx += pkt_idx;
2165 	if (async->pkts_idx >= vq->size)
2166 		async->pkts_idx -= vq->size;
2167 
2168 	async->pkts_inflight_n += pkt_idx;
2169 
2170 	return pkt_idx;
2171 }
2172 
2173 static __rte_always_inline void
2174 write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
2175 	__rte_shared_locks_required(&vq->access_lock)
2176 {
2177 	struct vhost_async *async = vq->async;
2178 	uint16_t nr_left = n_descs;
2179 	uint16_t nr_copy;
2180 	uint16_t to, from;
2181 
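	/*
	 * Copy the completed descriptors into the used ring in chunks, handling
	 * wrap-around of both the async descriptor ring (source) and the used
	 * ring (destination).
	 */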
2182 	do {
2183 		from = async->last_desc_idx_split & (vq->size - 1);
2184 		nr_copy = nr_left + from <= vq->size ? nr_left : vq->size - from;
2185 		to = vq->last_used_idx & (vq->size - 1);
2186 
2187 		if (to + nr_copy <= vq->size) {
2188 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
2189 					nr_copy * sizeof(struct vring_used_elem));
2190 		} else {
2191 			uint16_t size = vq->size - to;
2192 
2193 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
2194 					size * sizeof(struct vring_used_elem));
2195 			rte_memcpy(&vq->used->ring[0], &async->descs_split[from + size],
2196 					(nr_copy - size) * sizeof(struct vring_used_elem));
2197 		}
2198 
2199 		async->last_desc_idx_split += nr_copy;
2200 		vq->last_used_idx += nr_copy;
2201 		nr_left -= nr_copy;
2202 	} while (nr_left > 0);
2203 }
2204 
2205 static __rte_always_inline void
2206 write_back_completed_descs_packed(struct vhost_virtqueue *vq,
2207 				uint16_t n_buffers)
2208 	__rte_shared_locks_required(&vq->access_lock)
2209 {
2210 	struct vhost_async *async = vq->async;
2211 	uint16_t from = async->last_buffer_idx_packed;
2212 	uint16_t used_idx = vq->last_used_idx;
2213 	uint16_t head_idx = vq->last_used_idx;
2214 	uint16_t head_flags = 0;
2215 	uint16_t i;
2216 
2217 	/* Split loop in two to save memory barriers */
2218 	for (i = 0; i < n_buffers; i++) {
2219 		vq->desc_packed[used_idx].id = async->buffers_packed[from].id;
2220 		vq->desc_packed[used_idx].len = async->buffers_packed[from].len;
2221 
2222 		used_idx += async->buffers_packed[from].count;
2223 		if (used_idx >= vq->size)
2224 			used_idx -= vq->size;
2225 
2226 		from++;
2227 		if (from >= vq->size)
2228 			from = 0;
2229 	}
2230 
2231 	/* The ordering for storing desc flags needs to be enforced. */
2232 	rte_atomic_thread_fence(rte_memory_order_release);
2233 
2234 	from = async->last_buffer_idx_packed;
2235 
2236 	for (i = 0; i < n_buffers; i++) {
2237 		uint16_t flags;
2238 
2239 		if (async->buffers_packed[from].len)
2240 			flags = VRING_DESC_F_WRITE;
2241 		else
2242 			flags = 0;
2243 
2244 		if (vq->used_wrap_counter) {
2245 			flags |= VRING_DESC_F_USED;
2246 			flags |= VRING_DESC_F_AVAIL;
2247 		} else {
2248 			flags &= ~VRING_DESC_F_USED;
2249 			flags &= ~VRING_DESC_F_AVAIL;
2250 		}
2251 
2252 		if (i > 0) {
2253 			vq->desc_packed[vq->last_used_idx].flags = flags;
2254 		} else {
2255 			head_idx = vq->last_used_idx;
2256 			head_flags = flags;
2257 		}
2258 
2259 		vq_inc_last_used_packed(vq, async->buffers_packed[from].count);
2260 
2261 		from++;
2262 		if (from == vq->size)
2263 			from = 0;
2264 	}
2265 
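	/*
	 * The head descriptor's flags are written last (after the release
	 * fence above), so the driver cannot observe the batch as used before
	 * all id/len/flag updates are visible.
	 */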
2266 	vq->desc_packed[head_idx].flags = head_flags;
2267 	async->last_buffer_idx_packed = from;
2268 }
2269 
2270 static __rte_always_inline uint16_t
2271 vhost_poll_enqueue_completed(struct virtio_net *dev, struct vhost_virtqueue *vq,
2272 	struct rte_mbuf **pkts, uint16_t count, int16_t dma_id, uint16_t vchan_id)
2273 	__rte_shared_locks_required(&vq->access_lock)
2274 {
2275 	struct vhost_async *async = vq->async;
2276 	struct async_inflight_info *pkts_info = async->pkts_info;
2277 	uint16_t nr_cpl_pkts = 0;
2278 	uint16_t n_descs = 0, n_buffers = 0;
2279 	uint16_t start_idx, from, i;
2280 
2281 	/* Check completed copies for the given DMA vChannel */
2282 	vhost_async_dma_check_completed(dev, dma_id, vchan_id, VHOST_DMA_MAX_COPY_COMPLETE);
2283 
2284 	start_idx = async_get_first_inflight_pkt_idx(vq);
2285 	/**
2286 	 * Calculate the number of copy-completed packets.
2287 	 * Note that there may be completed packets even if
2288 	 * no copies are reported done by the given DMA vChannel,
2289 	 * as it's possible that a virtqueue uses multiple DMA
2290 	 * vChannels.
2291 	 */
2292 	from = start_idx;
2293 	while (vq->async->pkts_cmpl_flag[from] && count--) {
2294 		vq->async->pkts_cmpl_flag[from] = false;
2295 		from++;
2296 		if (from >= vq->size)
2297 			from -= vq->size;
2298 		nr_cpl_pkts++;
2299 	}
2300 
2301 	if (nr_cpl_pkts == 0)
2302 		return 0;
2303 
2304 	for (i = 0; i < nr_cpl_pkts; i++) {
2305 		from = (start_idx + i) % vq->size;
2306 		/* Only used with packed ring */
2307 		n_buffers += pkts_info[from].nr_buffers;
2308 		/* Only used with split ring */
2309 		n_descs += pkts_info[from].descs;
2310 		pkts[i] = pkts_info[from].mbuf;
2311 	}
2312 
2313 	async->pkts_inflight_n -= nr_cpl_pkts;
2314 
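	/*
	 * If the ring cannot be accessed at the moment, only advance the
	 * internal indexes here; the used ring itself is left untouched.
	 */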
2315 	if (likely(vq->enabled && vq->access_ok)) {
2316 		if (vq_is_packed(dev)) {
2317 			write_back_completed_descs_packed(vq, n_buffers);
2318 			vhost_vring_call_packed(dev, vq);
2319 		} else {
2320 			write_back_completed_descs_split(vq, n_descs);
2321 			rte_atomic_fetch_add_explicit(
2322 				(unsigned short __rte_atomic *)&vq->used->idx,
2323 				n_descs, rte_memory_order_release);
2324 			vhost_vring_call_split(dev, vq);
2325 		}
2326 	} else {
2327 		if (vq_is_packed(dev)) {
2328 			async->last_buffer_idx_packed += n_buffers;
2329 			if (async->last_buffer_idx_packed >= vq->size)
2330 				async->last_buffer_idx_packed -= vq->size;
2331 		} else {
2332 			async->last_desc_idx_split += n_descs;
2333 		}
2334 	}
2335 
2336 	return nr_cpl_pkts;
2337 }
2338 
2339 uint16_t
2340 rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
2341 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2342 		uint16_t vchan_id)
2343 {
2344 	struct virtio_net *dev = get_device(vid);
2345 	struct vhost_virtqueue *vq;
2346 	uint16_t n_pkts_cpl = 0;
2347 
2348 	if (unlikely(!dev))
2349 		return 0;
2350 
2351 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
2352 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2353 		VHOST_DATA_LOG(dev->ifname, ERR,
2354 			"%s: invalid virtqueue idx %d.",
2355 			__func__, queue_id);
2356 		return 0;
2357 	}
2358 
2359 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2360 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2361 		VHOST_DATA_LOG(dev->ifname, ERR,
2362 			"%s: invalid channel %d:%u.",
2363 			__func__, dma_id, vchan_id);
2364 		return 0;
2365 	}
2366 
2367 	vq = dev->virtqueue[queue_id];
2368 
2369 	if (rte_rwlock_read_trylock(&vq->access_lock)) {
2370 		VHOST_DATA_LOG(dev->ifname, DEBUG,
2371 			"%s: virtqueue %u is busy.",
2372 			__func__, queue_id);
2373 		return 0;
2374 	}
2375 
2376 	if (unlikely(!vq->async)) {
2377 		VHOST_DATA_LOG(dev->ifname, ERR,
2378 			"%s: async not registered for virtqueue %d.",
2379 			__func__, queue_id);
2380 		goto out;
2381 	}
2382 
2383 	n_pkts_cpl = vhost_poll_enqueue_completed(dev, vq, pkts, count, dma_id, vchan_id);
2384 
2385 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2386 	vq->stats.inflight_completed += n_pkts_cpl;
2387 
2388 out:
2389 	rte_rwlock_read_unlock(&vq->access_lock);
2390 
2391 	return n_pkts_cpl;
2392 }
2393 
2394 uint16_t
2395 rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
2396 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2397 		uint16_t vchan_id)
2398 {
2399 	struct virtio_net *dev = get_device(vid);
2400 	struct vhost_virtqueue *vq;
2401 	uint16_t n_pkts_cpl = 0;
2402 
2403 	if (!dev)
2404 		return 0;
2405 
2406 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
2407 	if (unlikely(queue_id >= dev->nr_vring)) {
2408 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid virtqueue idx %d.",
2409 			__func__, queue_id);
2410 		return 0;
2411 	}
2412 
2413 	if (unlikely(dma_id < 0 || dma_id >= RTE_DMADEV_DEFAULT_MAX)) {
2414 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid dma id %d.",
2415 			__func__, dma_id);
2416 		return 0;
2417 	}
2418 
2419 	vq = dev->virtqueue[queue_id];
2420 
2421 	vq_assert_lock(dev, vq);
2422 
2423 	if (unlikely(!vq->async)) {
2424 		VHOST_DATA_LOG(dev->ifname, ERR,
2425 			"%s: async not registered for virtqueue %d.",
2426 			__func__, queue_id);
2427 		return 0;
2428 	}
2429 
2430 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2431 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2432 		VHOST_DATA_LOG(dev->ifname, ERR,
2433 			"%s: invalid channel %d:%u.",
2434 			__func__, dma_id, vchan_id);
2435 		return 0;
2436 	}
2437 
2438 	if ((queue_id & 1) == 0)
2439 		n_pkts_cpl = vhost_poll_enqueue_completed(dev, vq, pkts, count,
2440 			dma_id, vchan_id);
2441 	else
2442 		n_pkts_cpl = async_poll_dequeue_completed(dev, vq, pkts, count,
2443 			dma_id, vchan_id, dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS);
2444 
2445 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2446 	vq->stats.inflight_completed += n_pkts_cpl;
2447 
2448 	return n_pkts_cpl;
2449 }
2450 
2451 uint16_t
2452 rte_vhost_clear_queue(int vid, uint16_t queue_id, struct rte_mbuf **pkts,
2453 		uint16_t count, int16_t dma_id, uint16_t vchan_id)
2454 {
2455 	struct virtio_net *dev = get_device(vid);
2456 	struct vhost_virtqueue *vq;
2457 	uint16_t n_pkts_cpl = 0;
2458 
2459 	if (!dev)
2460 		return 0;
2461 
2462 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
2463 	if (unlikely(queue_id >= dev->nr_vring)) {
2464 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid virtqueue idx %u.",
2465 			__func__, queue_id);
2466 		return 0;
2467 	}
2468 
2469 	if (unlikely(dma_id < 0 || dma_id >= RTE_DMADEV_DEFAULT_MAX)) {
2470 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid dma id %d.",
2471 			__func__, dma_id);
2472 		return 0;
2473 	}
2474 
2475 	vq = dev->virtqueue[queue_id];
2476 
2477 	if (rte_rwlock_read_trylock(&vq->access_lock)) {
2478 		VHOST_DATA_LOG(dev->ifname, DEBUG, "%s: virtqueue %u is busy.",
2479 			__func__, queue_id);
2480 		return 0;
2481 	}
2482 
2483 	if (unlikely(!vq->async)) {
2484 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: async not registered for queue id %u.",
2485 			__func__, queue_id);
2486 		goto out_access_unlock;
2487 	}
2488 
2489 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2490 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2491 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid channel %d:%u.",
2492 			__func__, dma_id, vchan_id);
2493 		goto out_access_unlock;
2494 	}
2495 
2496 	if ((queue_id & 1) == 0)
2497 		n_pkts_cpl = vhost_poll_enqueue_completed(dev, vq, pkts, count,
2498 			dma_id, vchan_id);
2499 	else
2500 		n_pkts_cpl = async_poll_dequeue_completed(dev, vq, pkts, count,
2501 			dma_id, vchan_id, dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS);
2502 
2503 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2504 	vq->stats.inflight_completed += n_pkts_cpl;
2505 
2506 out_access_unlock:
2507 	rte_rwlock_read_unlock(&vq->access_lock);
2508 
2509 	return n_pkts_cpl;
2510 }
2511 
2512 static __rte_always_inline uint32_t
2513 virtio_dev_rx_async_submit(struct virtio_net *dev, struct vhost_virtqueue *vq,
2514 	struct rte_mbuf **pkts, uint32_t count, int16_t dma_id, uint16_t vchan_id)
2515 {
2516 	uint32_t nb_tx = 0;
2517 
2518 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
2519 
2520 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2521 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2522 		VHOST_DATA_LOG(dev->ifname, ERR,
2523 			"%s: invalid channel %d:%u.",
2524 			 __func__, dma_id, vchan_id);
2525 		return 0;
2526 	}
2527 
2528 	rte_rwlock_write_lock(&vq->access_lock);
2529 
2530 	if (unlikely(!vq->enabled || !vq->async))
2531 		goto out_access_unlock;
2532 
2533 	vhost_user_iotlb_rd_lock(vq);
2534 
2535 	if (unlikely(!vq->access_ok)) {
2536 		vhost_user_iotlb_rd_unlock(vq);
2537 		rte_rwlock_write_unlock(&vq->access_lock);
2538 
2539 		virtio_dev_vring_translate(dev, vq);
2540 		goto out_no_unlock;
2541 	}
2542 
2543 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
2544 	if (count == 0)
2545 		goto out;
2546 
2547 	if (vq_is_packed(dev))
2548 		nb_tx = virtio_dev_rx_async_submit_packed(dev, vq, pkts, count,
2549 			dma_id, vchan_id);
2550 	else
2551 		nb_tx = virtio_dev_rx_async_submit_split(dev, vq, pkts, count,
2552 			dma_id, vchan_id);
2553 
2554 	vq->stats.inflight_submitted += nb_tx;
2555 
2556 out:
2557 	vhost_user_iotlb_rd_unlock(vq);
2558 
2559 out_access_unlock:
2560 	rte_rwlock_write_unlock(&vq->access_lock);
2561 
2562 out_no_unlock:
2563 	return nb_tx;
2564 }
2565 
2566 uint16_t
2567 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
2568 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2569 		uint16_t vchan_id)
2570 {
2571 	struct virtio_net *dev = get_device(vid);
2572 
2573 	if (!dev)
2574 		return 0;
2575 
2576 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
2577 		VHOST_DATA_LOG(dev->ifname, ERR,
2578 			"%s: built-in vhost net backend is disabled.",
2579 			__func__);
2580 		return 0;
2581 	}
2582 
2583 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2584 		VHOST_DATA_LOG(dev->ifname, ERR,
2585 			"%s: invalid virtqueue idx %d.",
2586 			__func__, queue_id);
2587 		return 0;
2588 	}
2589 
2590 	return virtio_dev_rx_async_submit(dev, dev->virtqueue[queue_id], pkts, count,
2591 		dma_id, vchan_id);
2592 }
2593 
2594 static inline bool
2595 virtio_net_with_host_offload(struct virtio_net *dev)
2596 {
2597 	if (dev->features &
2598 			((1ULL << VIRTIO_NET_F_CSUM) |
2599 			 (1ULL << VIRTIO_NET_F_HOST_ECN) |
2600 			 (1ULL << VIRTIO_NET_F_HOST_TSO4) |
2601 			 (1ULL << VIRTIO_NET_F_HOST_TSO6) |
2602 			 (1ULL << VIRTIO_NET_F_HOST_UFO)))
2603 		return true;
2604 
2605 	return false;
2606 }
2607 
2608 static int
2609 parse_headers(struct rte_mbuf *m, uint8_t *l4_proto)
2610 {
2611 	struct rte_ipv4_hdr *ipv4_hdr;
2612 	struct rte_ipv6_hdr *ipv6_hdr;
2613 	struct rte_ether_hdr *eth_hdr;
2614 	uint16_t ethertype;
2615 	uint16_t data_len = rte_pktmbuf_data_len(m);
2616 
2617 	if (data_len < sizeof(struct rte_ether_hdr))
2618 		return -EINVAL;
2619 
2620 	eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
2621 
2622 	m->l2_len = sizeof(struct rte_ether_hdr);
2623 	ethertype = rte_be_to_cpu_16(eth_hdr->ether_type);
2624 
2625 	if (ethertype == RTE_ETHER_TYPE_VLAN) {
2626 		if (data_len < sizeof(struct rte_ether_hdr) +
2627 				sizeof(struct rte_vlan_hdr))
2628 			goto error;
2629 
2630 		struct rte_vlan_hdr *vlan_hdr =
2631 			(struct rte_vlan_hdr *)(eth_hdr + 1);
2632 
2633 		m->l2_len += sizeof(struct rte_vlan_hdr);
2634 		ethertype = rte_be_to_cpu_16(vlan_hdr->eth_proto);
2635 	}
2636 
2637 	switch (ethertype) {
2638 	case RTE_ETHER_TYPE_IPV4:
2639 		if (data_len < m->l2_len + sizeof(struct rte_ipv4_hdr))
2640 			goto error;
2641 		ipv4_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
2642 				m->l2_len);
2643 		m->l3_len = rte_ipv4_hdr_len(ipv4_hdr);
2644 		if (data_len < m->l2_len + m->l3_len)
2645 			goto error;
2646 		m->ol_flags |= RTE_MBUF_F_TX_IPV4;
2647 		*l4_proto = ipv4_hdr->next_proto_id;
2648 		break;
2649 	case RTE_ETHER_TYPE_IPV6:
2650 		if (data_len < m->l2_len + sizeof(struct rte_ipv6_hdr))
2651 			goto error;
2652 		ipv6_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv6_hdr *,
2653 				m->l2_len);
2654 		m->l3_len = sizeof(struct rte_ipv6_hdr);
2655 		m->ol_flags |= RTE_MBUF_F_TX_IPV6;
2656 		*l4_proto = ipv6_hdr->proto;
2657 		break;
2658 	default:
2659 		/* a valid L3 header is needed for further L4 parsing */
2660 		goto error;
2661 	}
2662 
2663 	/* both CSUM and GSO need a valid L4 header */
2664 	switch (*l4_proto) {
2665 	case IPPROTO_TCP:
2666 		if (data_len < m->l2_len + m->l3_len +
2667 				sizeof(struct rte_tcp_hdr))
2668 			goto error;
2669 		break;
2670 	case IPPROTO_UDP:
2671 		if (data_len < m->l2_len + m->l3_len +
2672 				sizeof(struct rte_udp_hdr))
2673 			goto error;
2674 		break;
2675 	case IPPROTO_SCTP:
2676 		if (data_len < m->l2_len + m->l3_len +
2677 				sizeof(struct rte_sctp_hdr))
2678 			goto error;
2679 		break;
2680 	default:
2681 		goto error;
2682 	}
2683 
2684 	return 0;
2685 
2686 error:
2687 	m->l2_len = 0;
2688 	m->l3_len = 0;
2689 	m->ol_flags = 0;
2690 	return -EINVAL;
2691 }
2692 
2693 static __rte_always_inline void
2694 vhost_dequeue_offload_legacy(struct virtio_net *dev, struct virtio_net_hdr *hdr,
2695 		struct rte_mbuf *m)
2696 {
2697 	uint8_t l4_proto = 0;
2698 	struct rte_tcp_hdr *tcp_hdr = NULL;
2699 	uint16_t tcp_len;
2700 	uint16_t data_len = rte_pktmbuf_data_len(m);
2701 
2702 	if (parse_headers(m, &l4_proto) < 0)
2703 		return;
2704 
2705 	if (hdr->flags == VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2706 		if (hdr->csum_start == (m->l2_len + m->l3_len)) {
2707 			switch (hdr->csum_offset) {
2708 			case (offsetof(struct rte_tcp_hdr, cksum)):
2709 				if (l4_proto != IPPROTO_TCP)
2710 					goto error;
2711 				m->ol_flags |= RTE_MBUF_F_TX_TCP_CKSUM;
2712 				break;
2713 			case (offsetof(struct rte_udp_hdr, dgram_cksum)):
2714 				if (l4_proto != IPPROTO_UDP)
2715 					goto error;
2716 				m->ol_flags |= RTE_MBUF_F_TX_UDP_CKSUM;
2717 				break;
2718 			case (offsetof(struct rte_sctp_hdr, cksum)):
2719 				if (l4_proto != IPPROTO_SCTP)
2720 					goto error;
2721 				m->ol_flags |= RTE_MBUF_F_TX_SCTP_CKSUM;
2722 				break;
2723 			default:
2724 				goto error;
2725 			}
2726 		} else {
2727 			goto error;
2728 		}
2729 	}
2730 
2731 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2732 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2733 		case VIRTIO_NET_HDR_GSO_TCPV4:
2734 		case VIRTIO_NET_HDR_GSO_TCPV6:
2735 			if (l4_proto != IPPROTO_TCP)
2736 				goto error;
2737 			tcp_hdr = rte_pktmbuf_mtod_offset(m,
2738 					struct rte_tcp_hdr *,
2739 					m->l2_len + m->l3_len);
2740 			tcp_len = (tcp_hdr->data_off & 0xf0) >> 2;
2741 			if (data_len < m->l2_len + m->l3_len + tcp_len)
2742 				goto error;
2743 			m->ol_flags |= RTE_MBUF_F_TX_TCP_SEG;
2744 			m->tso_segsz = hdr->gso_size;
2745 			m->l4_len = tcp_len;
2746 			break;
2747 		case VIRTIO_NET_HDR_GSO_UDP:
2748 			if (l4_proto != IPPROTO_UDP)
2749 				goto error;
2750 			m->ol_flags |= RTE_MBUF_F_TX_UDP_SEG;
2751 			m->tso_segsz = hdr->gso_size;
2752 			m->l4_len = sizeof(struct rte_udp_hdr);
2753 			break;
2754 		default:
2755 			VHOST_DATA_LOG(dev->ifname, WARNING,
2756 				"unsupported gso type %u.",
2757 				hdr->gso_type);
2758 			goto error;
2759 		}
2760 	}
2761 	return;
2762 
2763 error:
2764 	m->l2_len = 0;
2765 	m->l3_len = 0;
2766 	m->ol_flags = 0;
2767 }
2768 
2769 static __rte_always_inline void
2770 vhost_dequeue_offload(struct virtio_net *dev, struct virtio_net_hdr *hdr,
2771 		struct rte_mbuf *m, bool legacy_ol_flags)
2772 {
2773 	struct rte_net_hdr_lens hdr_lens;
2774 	int l4_supported = 0;
2775 	uint32_t ptype;
2776 
2777 	if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
2778 		return;
2779 
2780 	if (legacy_ol_flags) {
2781 		vhost_dequeue_offload_legacy(dev, hdr, m);
2782 		return;
2783 	}
2784 
2785 	m->ol_flags |= RTE_MBUF_F_RX_IP_CKSUM_UNKNOWN;
2786 
2787 	ptype = rte_net_get_ptype(m, &hdr_lens, RTE_PTYPE_ALL_MASK);
2788 	m->packet_type = ptype;
2789 	if ((ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_TCP ||
2790 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_UDP ||
2791 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_SCTP)
2792 		l4_supported = 1;
2793 
2794 	/* According to Virtio 1.1 spec, the device only needs to look at
2795 	 * VIRTIO_NET_HDR_F_NEEDS_CSUM in the packet transmission path.
2796 	 * This differs from the incoming-packet processing path, where the
2797 	 * driver can rely on the VIRTIO_NET_HDR_F_DATA_VALID flag set by the
2798 	 * device.
2799 	 *
2800 	 * 5.1.6.2.1 Driver Requirements: Packet Transmission
2801 	 * The driver MUST NOT set the VIRTIO_NET_HDR_F_DATA_VALID and
2802 	 * VIRTIO_NET_HDR_F_RSC_INFO bits in flags.
2803 	 *
2804 	 * 5.1.6.2.2 Device Requirements: Packet Transmission
2805 	 * The device MUST ignore flag bits that it does not recognize.
2806 	 */
2807 	if (hdr->flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2808 		uint32_t hdrlen;
2809 
2810 		hdrlen = hdr_lens.l2_len + hdr_lens.l3_len + hdr_lens.l4_len;
2811 		if (hdr->csum_start <= hdrlen && l4_supported != 0) {
2812 			m->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_NONE;
2813 		} else {
2814 			/* Unknown proto or tunnel, do sw cksum. We can assume
2815 			 * the cksum field is in the first segment since the
2816 			 * buffers we provided to the host are large enough.
2817 			 * In the case of SCTP this will be wrong since it is a CRC,
2818 			 * but there's nothing we can do.
2819 			 */
2820 			uint16_t csum = 0, off;
2821 
2822 			if (rte_raw_cksum_mbuf(m, hdr->csum_start,
2823 					rte_pktmbuf_pkt_len(m) - hdr->csum_start, &csum) < 0)
2824 				return;
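			/*
			 * Fold the raw sum into the final checksum. A sum of 0xffff is
			 * presumably kept as-is so that the stored value never becomes
			 * zero, which UDP uses to mean 'no checksum'.
			 */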
2825 			if (likely(csum != 0xffff))
2826 				csum = ~csum;
2827 			off = hdr->csum_offset + hdr->csum_start;
2828 			if (rte_pktmbuf_data_len(m) >= off + 1)
2829 				*rte_pktmbuf_mtod_offset(m, uint16_t *, off) = csum;
2830 		}
2831 	}
2832 
2833 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2834 		if (hdr->gso_size == 0)
2835 			return;
2836 
2837 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2838 		case VIRTIO_NET_HDR_GSO_TCPV4:
2839 		case VIRTIO_NET_HDR_GSO_TCPV6:
2840 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_TCP)
2841 				break;
2842 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2843 			m->tso_segsz = hdr->gso_size;
2844 			break;
2845 		case VIRTIO_NET_HDR_GSO_UDP:
2846 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_UDP)
2847 				break;
2848 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2849 			m->tso_segsz = hdr->gso_size;
2850 			break;
2851 		default:
2852 			break;
2853 		}
2854 	}
2855 }
2856 
2857 static __rte_noinline void
2858 copy_vnet_hdr_from_desc(struct virtio_net_hdr *hdr,
2859 		struct buf_vector *buf_vec)
2860 {
2861 	uint64_t len;
2862 	uint64_t remain = sizeof(struct virtio_net_hdr);
2863 	uint64_t src;
2864 	uint64_t dst = (uint64_t)(uintptr_t)hdr;
2865 
2866 	while (remain) {
2867 		len = RTE_MIN(remain, buf_vec->buf_len);
2868 		src = buf_vec->buf_addr;
2869 		rte_memcpy((void *)(uintptr_t)dst,
2870 				(void *)(uintptr_t)src, len);
2871 
2872 		remain -= len;
2873 		dst += len;
2874 		buf_vec++;
2875 	}
2876 }
2877 
2878 static __rte_always_inline int
2879 desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
2880 		  struct buf_vector *buf_vec, uint16_t nr_vec,
2881 		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
2882 		  bool legacy_ol_flags, uint16_t slot_idx, bool is_async)
2883 	__rte_shared_locks_required(&vq->access_lock)
2884 	__rte_shared_locks_required(&vq->iotlb_lock)
2885 {
2886 	uint32_t buf_avail, buf_offset, buf_len;
2887 	uint64_t buf_addr, buf_iova;
2888 	uint32_t mbuf_avail, mbuf_offset;
2889 	uint32_t hdr_remain = dev->vhost_hlen;
2890 	uint32_t cpy_len;
2891 	struct rte_mbuf *cur = m, *prev = m;
2892 	struct virtio_net_hdr tmp_hdr;
2893 	struct virtio_net_hdr *hdr = NULL;
2894 	uint16_t vec_idx;
2895 	struct vhost_async *async = vq->async;
2896 	struct async_inflight_info *pkts_info;
2897 
2898 	/*
2899 	 * The caller has checked that the descriptor chain is larger than the
2900 	 * header size.
2901 	 */
2902 
2903 	if (virtio_net_with_host_offload(dev)) {
2904 		if (unlikely(buf_vec[0].buf_len < sizeof(struct virtio_net_hdr))) {
2905 			/*
2906 			 * No luck, the virtio-net header doesn't fit
2907 			 * in a contiguous virtual area.
2908 			 */
2909 			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
2910 			hdr = &tmp_hdr;
2911 		} else {
2912 			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_vec[0].buf_addr);
2913 		}
2914 	}
2915 
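	/* Skip the virtio-net header, which may span several buffers of the chain. */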
2916 	for (vec_idx = 0; vec_idx < nr_vec; vec_idx++) {
2917 		if (buf_vec[vec_idx].buf_len > hdr_remain)
2918 			break;
2919 
2920 		hdr_remain -= buf_vec[vec_idx].buf_len;
2921 	}
2922 
2923 	buf_addr = buf_vec[vec_idx].buf_addr;
2924 	buf_iova = buf_vec[vec_idx].buf_iova;
2925 	buf_len = buf_vec[vec_idx].buf_len;
2926 	buf_offset = hdr_remain;
2927 	buf_avail = buf_vec[vec_idx].buf_len - hdr_remain;
2928 
2929 	PRINT_PACKET(dev,
2930 			(uintptr_t)(buf_addr + buf_offset),
2931 			(uint32_t)buf_avail, 0);
2932 
2933 	mbuf_offset = 0;
2934 	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
2935 
2936 	if (is_async) {
2937 		pkts_info = async->pkts_info;
2938 		if (async_iter_initialize(dev, async))
2939 			return -1;
2940 	}
2941 
2942 	while (1) {
2943 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
2944 
2945 		if (is_async) {
2946 			if (async_fill_seg(dev, vq, cur, mbuf_offset,
2947 					   buf_iova + buf_offset, cpy_len, false) < 0)
2948 				goto error;
2949 		} else if (likely(hdr && cur == m)) {
2950 			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, mbuf_offset),
2951 				(void *)((uintptr_t)(buf_addr + buf_offset)),
2952 				cpy_len);
2953 		} else {
2954 			sync_fill_seg(dev, vq, cur, mbuf_offset,
2955 				      buf_addr + buf_offset,
2956 				      buf_iova + buf_offset, cpy_len, false);
2957 		}
2958 
2959 		mbuf_avail  -= cpy_len;
2960 		mbuf_offset += cpy_len;
2961 		buf_avail -= cpy_len;
2962 		buf_offset += cpy_len;
2963 
2964 		/* This buf has reached its end, get the next one */
2965 		if (buf_avail == 0) {
2966 			if (++vec_idx >= nr_vec)
2967 				break;
2968 
2969 			buf_addr = buf_vec[vec_idx].buf_addr;
2970 			buf_iova = buf_vec[vec_idx].buf_iova;
2971 			buf_len = buf_vec[vec_idx].buf_len;
2972 
2973 			buf_offset = 0;
2974 			buf_avail  = buf_len;
2975 
2976 			PRINT_PACKET(dev, (uintptr_t)buf_addr,
2977 					(uint32_t)buf_avail, 0);
2978 		}
2979 
2980 		/*
2981 		 * This mbuf has reached its end, get a new one
2982 		 * to hold more data.
2983 		 */
2984 		if (mbuf_avail == 0) {
2985 			cur = rte_pktmbuf_alloc(mbuf_pool);
2986 			if (unlikely(cur == NULL)) {
2987 				vq->stats.mbuf_alloc_failed++;
2988 				VHOST_DATA_LOG(dev->ifname, ERR,
2989 					"failed to allocate memory for mbuf.");
2990 				goto error;
2991 			}
2992 
2993 			prev->next = cur;
2994 			prev->data_len = mbuf_offset;
2995 			m->nb_segs += 1;
2996 			m->pkt_len += mbuf_offset;
2997 			prev = cur;
2998 
2999 			mbuf_offset = 0;
3000 			mbuf_avail  = cur->buf_len - RTE_PKTMBUF_HEADROOM;
3001 		}
3002 	}
3003 
3004 	prev->data_len = mbuf_offset;
3005 	m->pkt_len    += mbuf_offset;
3006 
3007 	if (is_async) {
3008 		async_iter_finalize(async);
3009 		if (hdr)
3010 			pkts_info[slot_idx].nethdr = *hdr;
3011 	} else if (hdr) {
3012 		vhost_dequeue_offload(dev, hdr, m, legacy_ol_flags);
3013 	}
3014 
3015 	return 0;
3016 error:
3017 	if (is_async)
3018 		async_iter_cancel(async);
3019 
3020 	return -1;
3021 }
3022 
3023 static void
3024 virtio_dev_extbuf_free(void *addr __rte_unused, void *opaque)
3025 {
3026 	rte_free(opaque);
3027 }
3028 
3029 static int
3030 virtio_dev_extbuf_alloc(struct virtio_net *dev, struct rte_mbuf *pkt, uint32_t size)
3031 {
3032 	struct rte_mbuf_ext_shared_info *shinfo = NULL;
3033 	uint32_t total_len = RTE_PKTMBUF_HEADROOM + size;
3034 	uint16_t buf_len;
3035 	rte_iova_t iova;
3036 	void *buf;
3037 
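	/* Reserve room at the end of the buffer for the shared info structure, with pointer alignment. */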
3038 	total_len += sizeof(*shinfo) + sizeof(uintptr_t);
3039 	total_len = RTE_ALIGN_CEIL(total_len, sizeof(uintptr_t));
3040 
3041 	if (unlikely(total_len > UINT16_MAX))
3042 		return -ENOSPC;
3043 
3044 	buf_len = total_len;
3045 	buf = rte_malloc(NULL, buf_len, RTE_CACHE_LINE_SIZE);
3046 	if (unlikely(buf == NULL))
3047 		return -ENOMEM;
3048 
3049 	/* Initialize shinfo */
3050 	shinfo = rte_pktmbuf_ext_shinfo_init_helper(buf, &buf_len,
3051 						virtio_dev_extbuf_free, buf);
3052 	if (unlikely(shinfo == NULL)) {
3053 		rte_free(buf);
3054 		VHOST_DATA_LOG(dev->ifname, ERR, "failed to init shinfo");
3055 		return -1;
3056 	}
3057 
3058 	iova = rte_malloc_virt2iova(buf);
3059 	rte_pktmbuf_attach_extbuf(pkt, buf, iova, buf_len, shinfo);
3060 	rte_pktmbuf_reset_headroom(pkt);
3061 
3062 	return 0;
3063 }
3064 
3065 /*
3066  * Prepare a host-supported pktmbuf.
3067  */
3068 static __rte_always_inline int
3069 virtio_dev_pktmbuf_prep(struct virtio_net *dev, struct rte_mbuf *pkt,
3070 			 uint32_t data_len)
3071 {
3072 	if (rte_pktmbuf_tailroom(pkt) >= data_len)
3073 		return 0;
3074 
3075 	/* attach an external buffer if supported */
3076 	if (dev->extbuf && !virtio_dev_extbuf_alloc(dev, pkt, data_len))
3077 		return 0;
3078 
3079 	/* check if chained buffers are allowed */
3080 	if (!dev->linearbuf)
3081 		return 0;
3082 
3083 	return -1;
3084 }
3085 
3086 __rte_always_inline
3087 static uint16_t
3088 virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
3089 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
3090 	bool legacy_ol_flags)
3091 	__rte_shared_locks_required(&vq->access_lock)
3092 	__rte_shared_locks_required(&vq->iotlb_lock)
3093 {
3094 	uint16_t i;
3095 	uint16_t avail_entries;
3096 	static bool allocerr_warned;
3097 
3098 	/*
3099 	 * The ordering between avail index and
3100 	 * desc reads needs to be enforced.
3101 	 */
3102 	avail_entries = rte_atomic_load_explicit((unsigned short __rte_atomic *)&vq->avail->idx,
3103 		rte_memory_order_acquire) - vq->last_avail_idx;
3104 	if (avail_entries == 0)
3105 		return 0;
3106 
3107 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
3108 
3109 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
3110 
3111 	count = RTE_MIN(count, MAX_PKT_BURST);
3112 	count = RTE_MIN(count, avail_entries);
3113 	VHOST_DATA_LOG(dev->ifname, DEBUG, "about to dequeue %u buffers", count);
3114 
3115 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count)) {
3116 		vq->stats.mbuf_alloc_failed += count;
3117 		return 0;
3118 	}
3119 
3120 	for (i = 0; i < count; i++) {
3121 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
3122 		uint16_t head_idx;
3123 		uint32_t buf_len;
3124 		uint16_t nr_vec = 0;
3125 		int err;
3126 
3127 		if (unlikely(fill_vec_buf_split(dev, vq,
3128 						vq->last_avail_idx + i,
3129 						&nr_vec, buf_vec,
3130 						&head_idx, &buf_len,
3131 						VHOST_ACCESS_RO) < 0))
3132 			break;
3133 
3134 		update_shadow_used_ring_split(vq, head_idx, 0);
3135 
3136 		if (unlikely(buf_len <= dev->vhost_hlen))
3137 			break;
3138 
3139 		buf_len -= dev->vhost_hlen;
3140 
3141 		err = virtio_dev_pktmbuf_prep(dev, pkts[i], buf_len);
3142 		if (unlikely(err)) {
3143 			/*
3144 			 * mbuf allocation fails for jumbo packets when external
3145 			 * buffer allocation is not allowed and a linear buffer
3146 			 * is required. Drop this packet.
3147 			 */
3148 			if (!allocerr_warned) {
3149 				VHOST_DATA_LOG(dev->ifname, ERR,
3150 					"failed mbuf alloc of size %d from %s.",
3151 					buf_len, mbuf_pool->name);
3152 				allocerr_warned = true;
3153 			}
3154 			break;
3155 		}
3156 
3157 		err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
3158 				   mbuf_pool, legacy_ol_flags, 0, false);
3159 		if (unlikely(err)) {
3160 			if (!allocerr_warned) {
3161 				VHOST_DATA_LOG(dev->ifname, ERR, "failed to copy desc to mbuf.");
3162 				allocerr_warned = true;
3163 			}
3164 			break;
3165 		}
3166 	}
3167 
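	/* Free the mbufs that were bulk-allocated but not filled. */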
3168 	if (unlikely(count != i))
3169 		rte_pktmbuf_free_bulk(&pkts[i], count - i);
3170 
3171 	if (likely(vq->shadow_used_idx)) {
3172 		vq->last_avail_idx += vq->shadow_used_idx;
3173 		do_data_copy_dequeue(vq);
3174 		flush_shadow_used_ring_split(dev, vq);
3175 		vhost_vring_call_split(dev, vq);
3176 	}
3177 
3178 	return i;
3179 }
3180 
3181 __rte_noinline
3182 static uint16_t
3183 virtio_dev_tx_split_legacy(struct virtio_net *dev,
3184 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3185 	struct rte_mbuf **pkts, uint16_t count)
3186 	__rte_shared_locks_required(&vq->access_lock)
3187 	__rte_shared_locks_required(&vq->iotlb_lock)
3188 {
3189 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, true);
3190 }
3191 
3192 __rte_noinline
3193 static uint16_t
3194 virtio_dev_tx_split_compliant(struct virtio_net *dev,
3195 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3196 	struct rte_mbuf **pkts, uint16_t count)
3197 	__rte_shared_locks_required(&vq->access_lock)
3198 	__rte_shared_locks_required(&vq->iotlb_lock)
3199 {
3200 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, false);
3201 }
3202 
3203 static __rte_always_inline int
3204 vhost_reserve_avail_batch_packed(struct virtio_net *dev,
3205 				 struct vhost_virtqueue *vq,
3206 				 struct rte_mbuf **pkts,
3207 				 uint16_t avail_idx,
3208 				 uintptr_t *desc_addrs,
3209 				 uint16_t *ids)
3210 	__rte_shared_locks_required(&vq->iotlb_lock)
3211 {
3212 	bool wrap = vq->avail_wrap_counter;
3213 	struct vring_packed_desc *descs = vq->desc_packed;
3214 	uint64_t lens[PACKED_BATCH_SIZE];
3215 	uint64_t buf_lens[PACKED_BATCH_SIZE];
3216 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3217 	uint16_t flags, i;
3218 
3219 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
3220 		return -1;
3221 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
3222 		return -1;
3223 
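	/*
	 * A packed descriptor is usable for the batch only if its AVAIL bit
	 * matches the expected wrap counter, its USED bit does not, and it is
	 * not flagged for the single-packet dequeue path.
	 */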
3224 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3225 		flags = descs[avail_idx + i].flags;
3226 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
3227 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
3228 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
3229 			return -1;
3230 	}
3231 
3232 	rte_atomic_thread_fence(rte_memory_order_acquire);
3233 
3234 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3235 		lens[i] = descs[avail_idx + i].len;
3236 
3237 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3238 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
3239 						  descs[avail_idx + i].addr,
3240 						  &lens[i], VHOST_ACCESS_RW);
3241 	}
3242 
3243 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3244 		if (unlikely(!desc_addrs[i]))
3245 			return -1;
3246 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
3247 			return -1;
3248 	}
3249 
3250 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3251 		if (virtio_dev_pktmbuf_prep(dev, pkts[i], lens[i]))
3252 			goto err;
3253 	}
3254 
3255 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3256 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
3257 
3258 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3259 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
3260 			goto err;
3261 	}
3262 
3263 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3264 		pkts[i]->pkt_len = lens[i] - buf_offset;
3265 		pkts[i]->data_len = pkts[i]->pkt_len;
3266 		ids[i] = descs[avail_idx + i].id;
3267 	}
3268 
3269 	return 0;
3270 
3271 err:
3272 	return -1;
3273 }
3274 
3275 static __rte_always_inline int
3276 vhost_async_tx_batch_packed_check(struct virtio_net *dev,
3277 				 struct vhost_virtqueue *vq,
3278 				 struct rte_mbuf **pkts,
3279 				 uint16_t avail_idx,
3280 				 uintptr_t *desc_addrs,
3281 				 uint64_t *lens,
3282 				 uint16_t *ids,
3283 				 int16_t dma_id,
3284 				 uint16_t vchan_id)
3285 {
3286 	bool wrap = vq->avail_wrap_counter;
3287 	struct vring_packed_desc *descs = vq->desc_packed;
3288 	uint64_t buf_lens[PACKED_BATCH_SIZE];
3289 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3290 	uint16_t flags, i;
3291 
3292 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
3293 		return -1;
3294 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
3295 		return -1;
3296 
3297 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3298 		flags = descs[avail_idx + i].flags;
3299 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
3300 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
3301 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
3302 			return -1;
3303 	}
3304 
3305 	rte_atomic_thread_fence(rte_memory_order_acquire);
3306 
3307 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3308 		lens[i] = descs[avail_idx + i].len;
3309 
3310 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3311 		desc_addrs[i] = descs[avail_idx + i].addr;
3312 	}
3313 
3314 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3315 		if (unlikely(!desc_addrs[i]))
3316 			return -1;
3317 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
3318 			return -1;
3319 	}
3320 
3321 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3322 		if (virtio_dev_pktmbuf_prep(dev, pkts[i], lens[i]))
3323 			goto err;
3324 	}
3325 
3326 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3327 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
3328 
3329 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3330 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
3331 			goto err;
3332 	}
3333 
3334 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3335 		pkts[i]->pkt_len = lens[i] - buf_offset;
3336 		pkts[i]->data_len = pkts[i]->pkt_len;
3337 		ids[i] = descs[avail_idx + i].id;
3338 	}
3339 
3340 	if (rte_dma_burst_capacity(dma_id, vchan_id) < PACKED_BATCH_SIZE)
3341 		return -1;
3342 
3343 	return 0;
3344 
3345 err:
3346 	return -1;
3347 }
3348 
3349 static __rte_always_inline int
3350 virtio_dev_tx_batch_packed(struct virtio_net *dev,
3351 			   struct vhost_virtqueue *vq,
3352 			   struct rte_mbuf **pkts,
3353 			   bool legacy_ol_flags)
3354 	__rte_shared_locks_required(&vq->iotlb_lock)
3355 {
3356 	uint16_t avail_idx = vq->last_avail_idx;
3357 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3358 	struct virtio_net_hdr *hdr;
3359 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
3360 	uint16_t ids[PACKED_BATCH_SIZE];
3361 	uint16_t i;
3362 
3363 	if (vhost_reserve_avail_batch_packed(dev, vq, pkts, avail_idx,
3364 					     desc_addrs, ids))
3365 		return -1;
3366 
3367 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3368 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
3369 
3370 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3371 		rte_memcpy(rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
3372 			   (void *)(uintptr_t)(desc_addrs[i] + buf_offset),
3373 			   pkts[i]->pkt_len);
3374 
3375 	if (virtio_net_with_host_offload(dev)) {
3376 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3377 			hdr = (struct virtio_net_hdr *)(desc_addrs[i]);
3378 			vhost_dequeue_offload(dev, hdr, pkts[i], legacy_ol_flags);
3379 		}
3380 	}
3381 
3382 	if (virtio_net_is_inorder(dev))
3383 		vhost_shadow_dequeue_batch_packed_inorder(vq,
3384 			ids[PACKED_BATCH_SIZE - 1]);
3385 	else
3386 		vhost_shadow_dequeue_batch_packed(dev, vq, ids);
3387 
3388 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
3389 
3390 	return 0;
3391 }
3392 
3393 static __rte_always_inline int
3394 vhost_dequeue_single_packed(struct virtio_net *dev,
3395 			    struct vhost_virtqueue *vq,
3396 			    struct rte_mempool *mbuf_pool,
3397 			    struct rte_mbuf *pkts,
3398 			    uint16_t *buf_id,
3399 			    uint16_t *desc_count,
3400 			    bool legacy_ol_flags)
3401 	__rte_shared_locks_required(&vq->access_lock)
3402 	__rte_shared_locks_required(&vq->iotlb_lock)
3403 {
3404 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
3405 	uint32_t buf_len;
3406 	uint16_t nr_vec = 0;
3407 	int err;
3408 	static bool allocerr_warned;
3409 
3410 	if (unlikely(fill_vec_buf_packed(dev, vq,
3411 					 vq->last_avail_idx, desc_count,
3412 					 buf_vec, &nr_vec,
3413 					 buf_id, &buf_len,
3414 					 VHOST_ACCESS_RO) < 0))
3415 		return -1;
3416 
3417 	if (unlikely(buf_len <= dev->vhost_hlen))
3418 		return -1;
3419 
3420 	buf_len -= dev->vhost_hlen;
3421 
3422 	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
3423 		if (!allocerr_warned) {
3424 			VHOST_DATA_LOG(dev->ifname, ERR,
3425 				"failed mbuf alloc of size %d from %s.",
3426 				buf_len, mbuf_pool->name);
3427 			allocerr_warned = true;
3428 		}
3429 		return -1;
3430 	}
3431 
3432 	err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts,
3433 			   mbuf_pool, legacy_ol_flags, 0, false);
3434 	if (unlikely(err)) {
3435 		if (!allocerr_warned) {
3436 			VHOST_DATA_LOG(dev->ifname, ERR, "failed to copy desc to mbuf.");
3437 			allocerr_warned = true;
3438 		}
3439 		return -1;
3440 	}
3441 
3442 	return 0;
3443 }
3444 
3445 static __rte_always_inline int
3446 virtio_dev_tx_single_packed(struct virtio_net *dev,
3447 			    struct vhost_virtqueue *vq,
3448 			    struct rte_mempool *mbuf_pool,
3449 			    struct rte_mbuf *pkts,
3450 			    bool legacy_ol_flags)
3451 	__rte_shared_locks_required(&vq->access_lock)
3452 	__rte_shared_locks_required(&vq->iotlb_lock)
3453 {
3454 
3455 	uint16_t buf_id, desc_count = 0;
3456 	int ret;
3457 
3458 	ret = vhost_dequeue_single_packed(dev, vq, mbuf_pool, pkts, &buf_id,
3459 					&desc_count, legacy_ol_flags);
3460 
3461 	if (likely(desc_count > 0)) {
3462 		if (virtio_net_is_inorder(dev))
3463 			vhost_shadow_dequeue_single_packed_inorder(vq, buf_id,
3464 								   desc_count);
3465 		else
3466 			vhost_shadow_dequeue_single_packed(vq, buf_id,
3467 					desc_count);
3468 
3469 		vq_inc_last_avail_packed(vq, desc_count);
3470 	}
3471 
3472 	return ret;
3473 }
3474 
3475 static __rte_always_inline uint16_t
3476 get_nb_avail_entries_packed(const struct vhost_virtqueue *__rte_restrict vq,
3477 			    uint16_t max_nb_avail_entries)
3478 {
3479 	const struct vring_packed_desc *descs = vq->desc_packed;
3480 	bool avail_wrap = vq->avail_wrap_counter;
3481 	uint16_t avail_idx = vq->last_avail_idx;
3482 	uint16_t nb_avail_entries = 0;
3483 	uint16_t flags;
3484 
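	/*
	 * Count complete descriptor chains: a descriptor without
	 * VRING_DESC_F_NEXT terminates a chain, i.e. one packet.
	 */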
3485 	while (nb_avail_entries < max_nb_avail_entries) {
3486 		flags = descs[avail_idx].flags;
3487 
3488 		if ((avail_wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
3489 		    (avail_wrap == !!(flags & VRING_DESC_F_USED)))
3490 			return nb_avail_entries;
3491 
3492 		if (!(flags & VRING_DESC_F_NEXT))
3493 			++nb_avail_entries;
3494 
3495 		if (unlikely(++avail_idx >= vq->size)) {
3496 			avail_idx -= vq->size;
3497 			avail_wrap = !avail_wrap;
3498 		}
3499 	}
3500 
3501 	return nb_avail_entries;
3502 }
3503 
3504 __rte_always_inline
3505 static uint16_t
3506 virtio_dev_tx_packed(struct virtio_net *dev,
3507 		     struct vhost_virtqueue *__rte_restrict vq,
3508 		     struct rte_mempool *mbuf_pool,
3509 		     struct rte_mbuf **__rte_restrict pkts,
3510 		     uint32_t count,
3511 		     bool legacy_ol_flags)
3512 	__rte_shared_locks_required(&vq->access_lock)
3513 	__rte_shared_locks_required(&vq->iotlb_lock)
3514 {
3515 	uint32_t pkt_idx = 0;
3516 
3517 	count = get_nb_avail_entries_packed(vq, count);
3518 	if (count == 0)
3519 		return 0;
3520 
3521 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count)) {
3522 		vq->stats.mbuf_alloc_failed += count;
3523 		return 0;
3524 	}
3525 
3526 	do {
3527 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
3528 
3529 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
3530 			if (!virtio_dev_tx_batch_packed(dev, vq,
3531 							&pkts[pkt_idx],
3532 							legacy_ol_flags)) {
3533 				pkt_idx += PACKED_BATCH_SIZE;
3534 				continue;
3535 			}
3536 		}
3537 
3538 		if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
3539 						pkts[pkt_idx],
3540 						legacy_ol_flags))
3541 			break;
3542 		pkt_idx++;
3543 	} while (pkt_idx < count);
3544 
3545 	if (pkt_idx != count)
3546 		rte_pktmbuf_free_bulk(&pkts[pkt_idx], count - pkt_idx);
3547 
3548 	if (vq->shadow_used_idx) {
3549 		do_data_copy_dequeue(vq);
3550 
3551 		vhost_flush_dequeue_shadow_packed(dev, vq);
3552 		vhost_vring_call_packed(dev, vq);
3553 	}
3554 
3555 	return pkt_idx;
3556 }
3557 
3558 __rte_noinline
3559 static uint16_t
3560 virtio_dev_tx_packed_legacy(struct virtio_net *dev,
3561 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
3562 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
3563 	__rte_shared_locks_required(&vq->access_lock)
3564 	__rte_shared_locks_required(&vq->iotlb_lock)
3565 {
3566 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, true);
3567 }
3568 
3569 __rte_noinline
3570 static uint16_t
3571 virtio_dev_tx_packed_compliant(struct virtio_net *dev,
3572 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
3573 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
3574 	__rte_shared_locks_required(&vq->access_lock)
3575 	__rte_shared_locks_required(&vq->iotlb_lock)
3576 {
3577 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, false);
3578 }
3579 
3580 uint16_t
3581 rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
3582 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
3583 {
3584 	struct virtio_net *dev;
3585 	struct rte_mbuf *rarp_mbuf = NULL;
3586 	struct vhost_virtqueue *vq;
3587 	int16_t success = 1;
3588 
3589 	dev = get_device(vid);
3590 	if (!dev)
3591 		return 0;
3592 
3593 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
3594 		VHOST_DATA_LOG(dev->ifname, ERR,
3595 			"%s: built-in vhost net backend is disabled.",
3596 			__func__);
3597 		return 0;
3598 	}
3599 
3600 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
3601 		VHOST_DATA_LOG(dev->ifname, ERR,
3602 			"%s: invalid virtqueue idx %d.",
3603 			__func__, queue_id);
3604 		return 0;
3605 	}
3606 
3607 	vq = dev->virtqueue[queue_id];
3608 
3609 	if (unlikely(rte_rwlock_read_trylock(&vq->access_lock) != 0))
3610 		return 0;
3611 
3612 	if (unlikely(!vq->enabled)) {
3613 		count = 0;
3614 		goto out_access_unlock;
3615 	}
3616 
3617 	vhost_user_iotlb_rd_lock(vq);
3618 
3619 	if (unlikely(!vq->access_ok)) {
3620 		vhost_user_iotlb_rd_unlock(vq);
3621 		rte_rwlock_read_unlock(&vq->access_lock);
3622 
3623 		virtio_dev_vring_translate(dev, vq);
3624 		goto out_no_unlock;
3625 	}
3626 
3627 	/*
3628 	 * Construct a RARP broadcast packet and inject it into the "pkts"
3629 	 * array, to make it look like the guest actually sent such a packet.
3630 	 *
3631 	 * Check user_send_rarp() for more information.
3632 	 *
3633 	 * broadcast_rarp shares a cacheline in the virtio_net structure
3634 	 * with some fields that are accessed during enqueue, and
3635 	 * rte_atomic_compare_exchange_strong_explicit causes a write if it
3636 	 * performs the compare and exchange. This could result in false
3637 	 * sharing between enqueue and dequeue.
3638 	 *
3639 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
3640 	 * and only performing compare and exchange if the read indicates it
3641 	 * is likely to be set.
3642 	 */
3643 	if (unlikely(rte_atomic_load_explicit(&dev->broadcast_rarp, rte_memory_order_acquire) &&
3644 			rte_atomic_compare_exchange_strong_explicit(&dev->broadcast_rarp,
3645 			&success, 0, rte_memory_order_release, rte_memory_order_relaxed))) {
3646 
3647 		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
3648 		if (rarp_mbuf == NULL) {
3649 			VHOST_DATA_LOG(dev->ifname, ERR, "failed to make RARP packet.");
3650 			count = 0;
3651 			goto out;
3652 		}
3653 		/*
3654 		 * Inject it at the head of the "pkts" array, so that the switch's
3655 		 * MAC learning table gets updated first.
3656 		 */
3657 		pkts[0] = rarp_mbuf;
3658 		vhost_queue_stats_update(dev, vq, pkts, 1);
3659 		pkts++;
3660 		count -= 1;
3661 	}
3662 
3663 	if (vq_is_packed(dev)) {
3664 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
3665 			count = virtio_dev_tx_packed_legacy(dev, vq, mbuf_pool, pkts, count);
3666 		else
3667 			count = virtio_dev_tx_packed_compliant(dev, vq, mbuf_pool, pkts, count);
3668 	} else {
3669 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
3670 			count = virtio_dev_tx_split_legacy(dev, vq, mbuf_pool, pkts, count);
3671 		else
3672 			count = virtio_dev_tx_split_compliant(dev, vq, mbuf_pool, pkts, count);
3673 	}
3674 
3675 	vhost_queue_stats_update(dev, vq, pkts, count);
3676 
3677 out:
3678 	vhost_user_iotlb_rd_unlock(vq);
3679 
3680 out_access_unlock:
3681 	rte_rwlock_read_unlock(&vq->access_lock);
3682 
3683 	if (unlikely(rarp_mbuf != NULL))
3684 		count += 1;
3685 
3686 out_no_unlock:
3687 	return count;
3688 }
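
/*
 * Usage sketch for the API above (illustrative only, not part of the
 * library): a polling loop an application might run. "vid", "queue_id" and
 * "mbuf_pool" are assumed to have been set up through the vhost-user and
 * mempool APIs; BURST_SZ is a hypothetical constant. Note that queue_id must
 * reference a guest TX virtqueue (odd index), as checked above.
 *
 *	struct rte_mbuf *pkts[BURST_SZ];
 *	uint16_t nb, i;
 *
 *	nb = rte_vhost_dequeue_burst(vid, queue_id, mbuf_pool, pkts, BURST_SZ);
 *	for (i = 0; i < nb; i++) {
 *		process_pkt(pkts[i]);	// hypothetical application handler
 *		rte_pktmbuf_free(pkts[i]);
 *	}
 */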
3689 
3690 static __rte_always_inline uint16_t
3691 async_poll_dequeue_completed(struct virtio_net *dev, struct vhost_virtqueue *vq,
3692 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
3693 		uint16_t vchan_id, bool legacy_ol_flags)
3694 	__rte_shared_locks_required(&vq->access_lock)
3695 {
3696 	uint16_t start_idx, from, i;
3697 	uint16_t nr_cpl_pkts = 0;
3698 	struct async_inflight_info *pkts_info = vq->async->pkts_info;
3699 
3700 	vhost_async_dma_check_completed(dev, dma_id, vchan_id, VHOST_DMA_MAX_COPY_COMPLETE);
3701 
3702 	start_idx = async_get_first_inflight_pkt_idx(vq);
3703 
3704 	from = start_idx;
3705 	while (vq->async->pkts_cmpl_flag[from] && count--) {
3706 		vq->async->pkts_cmpl_flag[from] = false;
3707 		from = (from + 1) % vq->size;
3708 		nr_cpl_pkts++;
3709 	}
3710 
3711 	if (nr_cpl_pkts == 0)
3712 		return 0;
3713 
3714 	for (i = 0; i < nr_cpl_pkts; i++) {
3715 		from = (start_idx + i) % vq->size;
3716 		pkts[i] = pkts_info[from].mbuf;
3717 
3718 		if (virtio_net_with_host_offload(dev))
3719 			vhost_dequeue_offload(dev, &pkts_info[from].nethdr, pkts[i],
3720 					      legacy_ol_flags);
3721 	}
3722 
3723 	/* write back completed descs to used ring and update used idx */
3724 	if (vq_is_packed(dev)) {
3725 		write_back_completed_descs_packed(vq, nr_cpl_pkts);
3726 		vhost_vring_call_packed(dev, vq);
3727 	} else {
3728 		write_back_completed_descs_split(vq, nr_cpl_pkts);
3729 		rte_atomic_fetch_add_explicit((unsigned short __rte_atomic *)&vq->used->idx,
3730 			nr_cpl_pkts, rte_memory_order_release);
3731 		vhost_vring_call_split(dev, vq);
3732 	}
3733 	vq->async->pkts_inflight_n -= nr_cpl_pkts;
3734 
3735 	return nr_cpl_pkts;
3736 }
3737 
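/*
 * Dequeue up to "count" packets from a split virtqueue using DMA copies:
 * read the available index with an acquire load, preallocate mbufs, map each
 * descriptor chain to an mbuf and queue its copies through desc_to_mbuf() in
 * async mode, record the used descriptors in the async shadow ring, submit
 * the copies to the DMA vchannel and roll back any entries that could not be
 * submitted. Only packets whose copies have completed (possibly from earlier
 * calls) are returned, via async_poll_dequeue_completed().
 */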
3738 static __rte_always_inline uint16_t
3739 virtio_dev_tx_async_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
3740 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
3741 		int16_t dma_id, uint16_t vchan_id, bool legacy_ol_flags)
3742 	__rte_shared_locks_required(&vq->access_lock)
3743 	__rte_shared_locks_required(&vq->iotlb_lock)
3744 {
3745 	static bool allocerr_warned;
3746 	bool dropped = false;
3747 	uint16_t avail_entries;
3748 	uint16_t pkt_idx, slot_idx = 0;
3749 	uint16_t nr_done_pkts = 0;
3750 	uint16_t pkt_err = 0;
3751 	uint16_t n_xfer;
3752 	struct vhost_async *async = vq->async;
3753 	struct async_inflight_info *pkts_info = async->pkts_info;
3754 	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
3755 	uint16_t pkts_size = count;
3756 
3757 	/**
3758 	 * The ordering between avail index and
3759 	 * desc reads needs to be enforced.
3760 	 */
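	/*
	 * Sketch of the pairing this acquire load relies on (the guest side
	 * is hypothetical pseudo-code, shown only for illustration):
	 *
	 *	guest driver                     host (here)
	 *	------------                     -----------
	 *	write descriptors                idx = load-acquire(avail->idx)
	 *	store-release(avail->idx)        read descriptors up to idx
	 *
	 * Descriptor contents written before the guest's release store are
	 * guaranteed to be visible once the updated index is observed.
	 */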
3761 	avail_entries = rte_atomic_load_explicit((unsigned short __rte_atomic *)&vq->avail->idx,
3762 		rte_memory_order_acquire) - vq->last_avail_idx;
3763 	if (avail_entries == 0)
3764 		goto out;
3765 
3766 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
3767 
3768 	async_iter_reset(async);
3769 
3770 	count = RTE_MIN(count, MAX_PKT_BURST);
3771 	count = RTE_MIN(count, avail_entries);
3772 	VHOST_DATA_LOG(dev->ifname, DEBUG, "about to dequeue %u buffers", count);
3773 
3774 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count)) {
3775 		vq->stats.mbuf_alloc_failed += count;
3776 		goto out;
3777 	}
3778 
3779 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
3780 		uint16_t head_idx = 0;
3781 		uint16_t nr_vec = 0;
3782 		uint16_t to;
3783 		uint32_t buf_len;
3784 		int err;
3785 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
3786 		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
3787 
3788 		if (unlikely(fill_vec_buf_split(dev, vq, vq->last_avail_idx,
3789 						&nr_vec, buf_vec,
3790 						&head_idx, &buf_len,
3791 						VHOST_ACCESS_RO) < 0)) {
3792 			dropped = true;
3793 			break;
3794 		}
3795 
3796 		if (unlikely(buf_len <= dev->vhost_hlen)) {
3797 			dropped = true;
3798 			break;
3799 		}
3800 
3801 		buf_len -= dev->vhost_hlen;
3802 
3803 		err = virtio_dev_pktmbuf_prep(dev, pkt, buf_len);
3804 		if (unlikely(err)) {
3805 			/**
3806 			 * mbuf allocation fails for jumbo packets when external
3807 			 * buffer allocation is not allowed and a linear buffer
3808 			 * is required. Drop this packet.
3809 			 */
3810 			if (!allocerr_warned) {
3811 				VHOST_DATA_LOG(dev->ifname, ERR,
3812 					"%s: Failed mbuf alloc of size %u from %s",
3813 					__func__, buf_len, mbuf_pool->name);
3814 				allocerr_warned = true;
3815 			}
3816 			dropped = true;
3817 			slot_idx--;
3818 			break;
3819 		}
3820 
3821 		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
3822 		err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkt, mbuf_pool,
3823 					legacy_ol_flags, slot_idx, true);
3824 		if (unlikely(err)) {
3825 			if (!allocerr_warned) {
3826 				VHOST_DATA_LOG(dev->ifname, ERR,
3827 					"%s: Failed to offload copies to async channel.",
3828 					__func__);
3829 				allocerr_warned = true;
3830 			}
3831 			dropped = true;
3832 			slot_idx--;
3833 			break;
3834 		}
3835 
3836 		pkts_info[slot_idx].mbuf = pkt;
3837 
3838 		/* store used descs */
3839 		to = async->desc_idx_split & (vq->size - 1);
3840 		async->descs_split[to].id = head_idx;
3841 		async->descs_split[to].len = 0;
3842 		async->desc_idx_split++;
3843 
3844 		vq->last_avail_idx++;
3845 	}
3846 
3847 	if (unlikely(dropped))
3848 		rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
3849 
3850 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
3851 					  async->iov_iter, pkt_idx);
3852 
3853 	async->pkts_inflight_n += n_xfer;
3854 
3855 	pkt_err = pkt_idx - n_xfer;
3856 	if (unlikely(pkt_err)) {
3857 		VHOST_DATA_LOG(dev->ifname, DEBUG, "%s: failed to transfer data.",
3858 			__func__);
3859 
3860 		pkt_idx = n_xfer;
3861 		/* recover available ring */
3862 		vq->last_avail_idx -= pkt_err;
3863 
3864 		/**
3865 		 * Recover the async copy related structures and free the mbufs
3866 		 * of the failed packets.
3867 		 */
3868 		async->desc_idx_split -= pkt_err;
3869 		while (pkt_err-- > 0) {
3870 			rte_pktmbuf_free(pkts_info[slot_idx & (vq->size - 1)].mbuf);
3871 			slot_idx--;
3872 		}
3873 	}
3874 
3875 	async->pkts_idx += pkt_idx;
3876 	if (async->pkts_idx >= vq->size)
3877 		async->pkts_idx -= vq->size;
3878 
3879 out:
3880 	/* The DMA device may serve other queues, so unconditionally check for completed copies. */
3881 	nr_done_pkts = async_poll_dequeue_completed(dev, vq, pkts, pkts_size,
3882 							dma_id, vchan_id, legacy_ol_flags);
3883 
3884 	return nr_done_pkts;
3885 }
3886 
3887 __rte_noinline
3888 static uint16_t
3889 virtio_dev_tx_async_split_legacy(struct virtio_net *dev,
3890 		struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3891 		struct rte_mbuf **pkts, uint16_t count,
3892 		int16_t dma_id, uint16_t vchan_id)
3893 	__rte_shared_locks_required(&vq->access_lock)
3894 	__rte_shared_locks_required(&vq->iotlb_lock)
3895 {
3896 	return virtio_dev_tx_async_split(dev, vq, mbuf_pool,
3897 				pkts, count, dma_id, vchan_id, true);
3898 }
3899 
3900 __rte_noinline
3901 static uint16_t
3902 virtio_dev_tx_async_split_compliant(struct virtio_net *dev,
3903 		struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3904 		struct rte_mbuf **pkts, uint16_t count,
3905 		int16_t dma_id, uint16_t vchan_id)
3906 	__rte_shared_locks_required(&vq->access_lock)
3907 	__rte_shared_locks_required(&vq->iotlb_lock)
3908 {
3909 	return virtio_dev_tx_async_split(dev, vq, mbuf_pool,
3910 				pkts, count, dma_id, vchan_id, false);
3911 }
3912 
3913 static __rte_always_inline void
3914 vhost_async_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
3915 				uint16_t buf_id, uint16_t count)
3916 	__rte_shared_locks_required(&vq->access_lock)
3917 {
3918 	struct vhost_async *async = vq->async;
3919 	uint16_t idx = async->buffer_idx_packed;
3920 
3921 	async->buffers_packed[idx].id = buf_id;
3922 	async->buffers_packed[idx].len = 0;
3923 	async->buffers_packed[idx].count = count;
3924 
3925 	async->buffer_idx_packed++;
3926 	if (async->buffer_idx_packed >= vq->size)
3927 		async->buffer_idx_packed -= vq->size;
3928 
3929 }
3930 
3931 static __rte_always_inline int
3932 virtio_dev_tx_async_single_packed(struct virtio_net *dev,
3933 			struct vhost_virtqueue *vq,
3934 			struct rte_mempool *mbuf_pool,
3935 			struct rte_mbuf *pkts,
3936 			uint16_t slot_idx,
3937 			bool legacy_ol_flags)
3938 	__rte_shared_locks_required(&vq->access_lock)
3939 	__rte_shared_locks_required(&vq->iotlb_lock)
3940 {
3941 	int err;
3942 	uint16_t buf_id, desc_count = 0;
3943 	uint16_t nr_vec = 0;
3944 	uint32_t buf_len;
3945 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
3946 	struct vhost_async *async = vq->async;
3947 	struct async_inflight_info *pkts_info = async->pkts_info;
3948 	static bool allocerr_warned;
3949 
3950 	if (unlikely(fill_vec_buf_packed(dev, vq, vq->last_avail_idx, &desc_count,
3951 					 buf_vec, &nr_vec, &buf_id, &buf_len,
3952 					 VHOST_ACCESS_RO) < 0))
3953 		return -1;
3954 
3955 	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
3956 		if (!allocerr_warned) {
3957 			VHOST_DATA_LOG(dev->ifname, ERR, "Failed mbuf alloc of size %u from %s.",
3958 				buf_len, mbuf_pool->name);
3959 
3960 			allocerr_warned = true;
3961 		}
3962 		return -1;
3963 	}
3964 
3965 	err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts, mbuf_pool,
3966 		legacy_ol_flags, slot_idx, true);
3967 	if (unlikely(err)) {
3968 		rte_pktmbuf_free(pkts);
3969 		if (!allocerr_warned) {
3970 			VHOST_DATA_LOG(dev->ifname, ERR, "Failed to copy desc to mbuf.");
3971 			allocerr_warned = true;
3972 		}
3973 		return -1;
3974 	}
3975 
3976 	pkts_info[slot_idx].descs = desc_count;
3977 
3978 	/* update async shadow packed ring */
3979 	vhost_async_shadow_dequeue_single_packed(vq, buf_id, desc_count);
3980 
3981 	vq_inc_last_avail_packed(vq, desc_count);
3982 
3983 	return err;
3984 }
3985 
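/*
 * Try to dequeue a full batch of PACKED_BATCH_SIZE descriptors at once:
 * validate the batch, translate guest buffer addresses to host IOVAs, build
 * one single-segment DMA copy per packet, save the virtio-net headers when
 * host offload is negotiated, then advance the available index and record
 * the descriptor ids in the async shadow ring. Returns -1 if the descriptors
 * cannot be processed as a batch.
 */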
3986 static __rte_always_inline int
3987 virtio_dev_tx_async_packed_batch(struct virtio_net *dev,
3988 			   struct vhost_virtqueue *vq,
3989 			   struct rte_mbuf **pkts, uint16_t slot_idx,
3990 			   uint16_t dma_id, uint16_t vchan_id)
3991 	__rte_shared_locks_required(&vq->access_lock)
3992 	__rte_shared_locks_required(&vq->iotlb_lock)
3993 {
3994 	uint16_t avail_idx = vq->last_avail_idx;
3995 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3996 	struct vhost_async *async = vq->async;
3997 	struct async_inflight_info *pkts_info = async->pkts_info;
3998 	struct virtio_net_hdr *hdr;
3999 	uint32_t mbuf_offset = 0;
4000 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
4001 	uint64_t desc_vva;
4002 	uint64_t lens[PACKED_BATCH_SIZE];
4003 	void *host_iova[PACKED_BATCH_SIZE];
4004 	uint64_t mapped_len[PACKED_BATCH_SIZE];
4005 	uint16_t ids[PACKED_BATCH_SIZE];
4006 	uint16_t i;
4007 
4008 	if (vhost_async_tx_batch_packed_check(dev, vq, pkts, avail_idx,
4009 					     desc_addrs, lens, ids, dma_id, vchan_id))
4010 		return -1;
4011 
4012 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
4013 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
4014 
4015 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
4016 		host_iova[i] = (void *)(uintptr_t)gpa_to_first_hpa(dev,
4017 			desc_addrs[i] + buf_offset, pkts[i]->pkt_len, &mapped_len[i]);
4018 	}
4019 
4020 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
4021 		async_iter_initialize(dev, async);
4022 		async_iter_add_iovec(dev, async,
4023 		host_iova[i],
4024 		(void *)(uintptr_t)rte_pktmbuf_iova_offset(pkts[i], mbuf_offset),
4025 		mapped_len[i]);
4026 		async->iter_idx++;
4027 	}
4028 
4029 	if (virtio_net_with_host_offload(dev)) {
4030 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
4031 			desc_vva = vhost_iova_to_vva(dev, vq, desc_addrs[i],
4032 						&lens[i], VHOST_ACCESS_RO);
4033 			hdr = (struct virtio_net_hdr *)(uintptr_t)desc_vva;
4034 			pkts_info[slot_idx + i].nethdr = *hdr;
4035 		}
4036 	}
4037 
4038 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
4039 
4040 	vhost_async_shadow_dequeue_packed_batch(vq, ids);
4041 
4042 	return 0;
4043 }
4044 
4045 static __rte_always_inline uint16_t
4046 virtio_dev_tx_async_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
4047 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
4048 		uint16_t count, uint16_t dma_id, uint16_t vchan_id, bool legacy_ol_flags)
4049 	__rte_shared_locks_required(&vq->access_lock)
4050 	__rte_shared_locks_required(&vq->iotlb_lock)
4051 {
4052 	uint32_t pkt_idx = 0;
4053 	uint16_t slot_idx = 0;
4054 	uint16_t nr_done_pkts = 0;
4055 	uint16_t pkt_err = 0;
4056 	uint32_t n_xfer;
4057 	uint16_t i;
4058 	struct vhost_async *async = vq->async;
4059 	struct async_inflight_info *pkts_info = async->pkts_info;
4060 	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
4061 
4062 	VHOST_DATA_LOG(dev->ifname, DEBUG, "(%d) about to dequeue %u buffers", dev->vid, count);
4063 
4064 	async_iter_reset(async);
4065 
4066 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count)) {
4067 		vq->stats.mbuf_alloc_failed += count;
4068 		goto out;
4069 	}
4070 
4071 	do {
4072 		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
4073 
4074 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
4075 
4076 		slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
4077 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
4078 			if (!virtio_dev_tx_async_packed_batch(dev, vq, &pkts_prealloc[pkt_idx],
4079 						slot_idx, dma_id, vchan_id)) {
4080 				for (i = 0; i < PACKED_BATCH_SIZE; i++) {
4081 					slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
4082 					pkts_info[slot_idx].descs = 1;
4083 					pkts_info[slot_idx].nr_buffers = 1;
4084 					pkts_info[slot_idx].mbuf = pkts_prealloc[pkt_idx];
4085 					pkt_idx++;
4086 				}
4087 				continue;
4088 			}
4089 		}
4090 
4091 		if (unlikely(virtio_dev_tx_async_single_packed(dev, vq, mbuf_pool, pkt,
4092 				slot_idx, legacy_ol_flags))) {
4093 			rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
4094 
4095 			if (slot_idx == 0)
4096 				slot_idx = vq->size - 1;
4097 			else
4098 				slot_idx--;
4099 
4100 			break;
4101 		}
4102 
4103 		pkts_info[slot_idx].mbuf = pkt;
4104 		pkt_idx++;
4105 	} while (pkt_idx < count);
4106 
4107 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
4108 					async->iov_iter, pkt_idx);
4109 
4110 	async->pkts_inflight_n += n_xfer;
4111 
4112 	pkt_err = pkt_idx - n_xfer;
4113 
4114 	if (unlikely(pkt_err)) {
4115 		uint16_t descs_err = 0;
4116 
4117 		pkt_idx -= pkt_err;
4118 
4119 		/**
4120 		 * Recover the DMA-copy related structures and free the mbufs of the failed packets.
4121 		 */
4122 		if (async->buffer_idx_packed >= pkt_err)
4123 			async->buffer_idx_packed -= pkt_err;
4124 		else
4125 			async->buffer_idx_packed += vq->size - pkt_err;
4126 
4127 		while (pkt_err-- > 0) {
4128 			rte_pktmbuf_free(pkts_info[slot_idx].mbuf);
4129 			descs_err += pkts_info[slot_idx].descs;
4130 
4131 			if (slot_idx == 0)
4132 				slot_idx = vq->size - 1;
4133 			else
4134 				slot_idx--;
4135 		}
4136 
4137 		/* recover available ring */
4138 		if (vq->last_avail_idx >= descs_err) {
4139 			vq->last_avail_idx -= descs_err;
4140 		} else {
4141 			vq->last_avail_idx += vq->size - descs_err;
4142 			vq->avail_wrap_counter ^= 1;
4143 		}
4144 	}
4145 
4146 	async->pkts_idx += pkt_idx;
4147 	if (async->pkts_idx >= vq->size)
4148 		async->pkts_idx -= vq->size;
4149 
4150 out:
4151 	nr_done_pkts = async_poll_dequeue_completed(dev, vq, pkts, count,
4152 					dma_id, vchan_id, legacy_ol_flags);
4153 
4154 	return nr_done_pkts;
4155 }
4156 
4157 __rte_noinline
4158 static uint16_t
4159 virtio_dev_tx_async_packed_legacy(struct virtio_net *dev, struct vhost_virtqueue *vq,
4160 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
4161 		uint16_t count, uint16_t dma_id, uint16_t vchan_id)
4162 	__rte_shared_locks_required(&vq->access_lock)
4163 	__rte_shared_locks_required(&vq->iotlb_lock)
4164 {
4165 	return virtio_dev_tx_async_packed(dev, vq, mbuf_pool,
4166 				pkts, count, dma_id, vchan_id, true);
4167 }
4168 
4169 __rte_noinline
4170 static uint16_t
4171 virtio_dev_tx_async_packed_compliant(struct virtio_net *dev, struct vhost_virtqueue *vq,
4172 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
4173 		uint16_t count, uint16_t dma_id, uint16_t vchan_id)
4174 	__rte_shared_locks_required(&vq->access_lock)
4175 	__rte_shared_locks_required(&vq->iotlb_lock)
4176 {
4177 	return virtio_dev_tx_async_packed(dev, vq, mbuf_pool,
4178 				pkts, count, dma_id, vchan_id, false);
4179 }
4180 
4181 uint16_t
4182 rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
4183 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
4184 	int *nr_inflight, int16_t dma_id, uint16_t vchan_id)
4185 {
4186 	struct virtio_net *dev;
4187 	struct rte_mbuf *rarp_mbuf = NULL;
4188 	struct vhost_virtqueue *vq;
4189 	int16_t success = 1;
4190 
4191 	dev = get_device(vid);
4192 	if (!dev || !nr_inflight)
4193 		return 0;
4194 
4195 	*nr_inflight = -1;
4196 
4197 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
4198 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: built-in vhost net backend is disabled.",
4199 			__func__);
4200 		return 0;
4201 	}
4202 
4203 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
4204 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid virtqueue idx %d.",
4205 			__func__, queue_id);
4206 		return 0;
4207 	}
4208 
4209 	if (unlikely(dma_id < 0 || dma_id >= RTE_DMADEV_DEFAULT_MAX)) {
4210 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid dma id %d.",
4211 			__func__, dma_id);
4212 		return 0;
4213 	}
4214 
4215 	if (unlikely(!dma_copy_track[dma_id].vchans ||
4216 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
4217 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid channel %d:%u.",
4218 			__func__, dma_id, vchan_id);
4219 		return 0;
4220 	}
4221 
4222 	vq = dev->virtqueue[queue_id];
4223 
4224 	if (unlikely(rte_rwlock_read_trylock(&vq->access_lock) != 0))
4225 		return 0;
4226 
4227 	if (unlikely(vq->enabled == 0)) {
4228 		count = 0;
4229 		goto out_access_unlock;
4230 	}
4231 
4232 	if (unlikely(!vq->async)) {
4233 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: async not registered for queue id %d.",
4234 			__func__, queue_id);
4235 		count = 0;
4236 		goto out_access_unlock;
4237 	}
4238 
4239 	vhost_user_iotlb_rd_lock(vq);
4240 
4241 	if (unlikely(vq->access_ok == 0)) {
4242 		vhost_user_iotlb_rd_unlock(vq);
4243 		rte_rwlock_read_unlock(&vq->access_lock);
4244 
4245 		virtio_dev_vring_translate(dev, vq);
4246 		count = 0;
4247 		goto out_no_unlock;
4248 	}
4249 
4250 	/*
4251 	 * Construct a RARP broadcast packet and inject it into the "pkts"
4252 	 * array, so that it looks like the guest actually sent such a packet.
4253 	 *
4254 	 * Check user_send_rarp() for more information.
4255 	 *
4256 	 * broadcast_rarp shares a cacheline in the virtio_net structure
4257 	 * with some fields that are accessed during enqueue, and
4258 	 * rte_atomic_compare_exchange_strong_explicit causes a write if it
4259 	 * performs the compare and exchange. This could result in false
4260 	 * sharing between enqueue and dequeue.
4261 	 *
4262 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
4263 	 * and only performing compare and exchange if the read indicates it
4264 	 * is likely to be set.
4265 	 */
4266 	if (unlikely(rte_atomic_load_explicit(&dev->broadcast_rarp, rte_memory_order_acquire) &&
4267 			rte_atomic_compare_exchange_strong_explicit(&dev->broadcast_rarp,
4268 			&success, 0, rte_memory_order_release, rte_memory_order_relaxed))) {
4269 
4270 		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
4271 		if (rarp_mbuf == NULL) {
4272 			VHOST_DATA_LOG(dev->ifname, ERR, "failed to make RARP packet.");
4273 			count = 0;
4274 			goto out;
4275 		}
4276 		/*
4277 		 * Inject it at the head of the "pkts" array, so that the switch's
4278 		 * MAC learning table gets updated first.
4279 		 */
4280 		pkts[0] = rarp_mbuf;
4281 		vhost_queue_stats_update(dev, vq, pkts, 1);
4282 		pkts++;
4283 		count -= 1;
4284 	}
4285 
4286 	if (vq_is_packed(dev)) {
4287 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
4288 			count = virtio_dev_tx_async_packed_legacy(dev, vq, mbuf_pool,
4289 					pkts, count, dma_id, vchan_id);
4290 		else
4291 			count = virtio_dev_tx_async_packed_compliant(dev, vq, mbuf_pool,
4292 					pkts, count, dma_id, vchan_id);
4293 	} else {
4294 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
4295 			count = virtio_dev_tx_async_split_legacy(dev, vq, mbuf_pool,
4296 					pkts, count, dma_id, vchan_id);
4297 		else
4298 			count = virtio_dev_tx_async_split_compliant(dev, vq, mbuf_pool,
4299 					pkts, count, dma_id, vchan_id);
4300 	}
4301 
4302 	*nr_inflight = vq->async->pkts_inflight_n;
4303 	vhost_queue_stats_update(dev, vq, pkts, count);
4304 
4305 out:
4306 	vhost_user_iotlb_rd_unlock(vq);
4307 
4308 out_access_unlock:
4309 	rte_rwlock_read_unlock(&vq->access_lock);
4310 
4311 	if (unlikely(rarp_mbuf != NULL))
4312 		count += 1;
4313 
4314 out_no_unlock:
4315 	return count;
4316 }
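
/*
 * Usage sketch for the API above (illustrative only): a polling loop an
 * application might run once the queue has been registered for the async
 * data path (rte_vhost_async_channel_register()) and a DMA device/vchannel
 * has been configured. "vid", "queue_id", "mbuf_pool", "dma_id" and
 * "vchan_id" are assumed to be set up by the application; BURST_SZ is a
 * hypothetical constant. Returned packets may have been submitted in a
 * previous call, since the copies complete asynchronously.
 *
 *	struct rte_mbuf *pkts[BURST_SZ];
 *	int nr_inflight;
 *	uint16_t nb, i;
 *
 *	nb = rte_vhost_async_try_dequeue_burst(vid, queue_id, mbuf_pool, pkts,
 *			BURST_SZ, &nr_inflight, dma_id, vchan_id);
 *	for (i = 0; i < nb; i++) {
 *		process_pkt(pkts[i]);	// hypothetical application handler
 *		rte_pktmbuf_free(pkts[i]);
 *	}
 */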
4317