xref: /dpdk/lib/vhost/virtio_net.c (revision d029f35384d0844e9aeb5dbc46fbe1b063d649f7)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2016 Intel Corporation
3  */
4 
5 #include <stdint.h>
6 #include <stdbool.h>
7 #include <linux/virtio_net.h>
8 
9 #include <rte_mbuf.h>
10 #include <rte_memcpy.h>
11 #include <rte_net.h>
12 #include <rte_ether.h>
13 #include <rte_ip.h>
14 #include <rte_dmadev.h>
15 #include <rte_vhost.h>
16 #include <rte_tcp.h>
17 #include <rte_udp.h>
18 #include <rte_sctp.h>
19 #include <rte_arp.h>
20 #include <rte_spinlock.h>
21 #include <rte_malloc.h>
22 #include <rte_vhost_async.h>
23 
24 #include "iotlb.h"
25 #include "vhost.h"
26 
27 #define MAX_BATCH_LEN 256
28 
29 static __rte_always_inline uint16_t
30 async_poll_dequeue_completed(struct virtio_net *dev, struct vhost_virtqueue *vq,
31 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
32 		uint16_t vchan_id, bool legacy_ol_flags);
33 
34 /* DMA device copy operation tracking array. */
35 struct async_dma_info dma_copy_track[RTE_DMADEV_DEFAULT_MAX];
36 
37 static  __rte_always_inline bool
38 rxvq_is_mergeable(struct virtio_net *dev)
39 {
40 	return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
41 }
42 
43 static  __rte_always_inline bool
44 virtio_net_is_inorder(struct virtio_net *dev)
45 {
46 	return dev->features & (1ULL << VIRTIO_F_IN_ORDER);
47 }
48 
49 static bool
50 is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t nr_vring)
51 {
52 	return (is_tx ^ (idx & 1)) == 0 && idx < nr_vring;
53 }
54 
55 static inline void
56 vhost_queue_stats_update(struct virtio_net *dev, struct vhost_virtqueue *vq,
57 		struct rte_mbuf **pkts, uint16_t count)
58 	__rte_shared_locks_required(&vq->access_lock)
59 {
60 	struct virtqueue_stats *stats = &vq->stats;
61 	int i;
62 
63 	if (!(dev->flags & VIRTIO_DEV_STATS_ENABLED))
64 		return;
65 
66 	for (i = 0; i < count; i++) {
67 		struct rte_ether_addr *ea;
68 		struct rte_mbuf *pkt = pkts[i];
69 		uint32_t pkt_len = rte_pktmbuf_pkt_len(pkt);
70 
71 		stats->packets++;
72 		stats->bytes += pkt_len;
73 
74 		if (pkt_len == 64) {
75 			stats->size_bins[1]++;
76 		} else if (pkt_len > 64 && pkt_len < 1024) {
77 			uint32_t bin;
78 
79 			/* count zeros, and offset into correct bin */
80 			bin = (sizeof(pkt_len) * 8) - rte_clz32(pkt_len) - 5;
81 			stats->size_bins[bin]++;
82 		} else {
83 			if (pkt_len < 64)
84 				stats->size_bins[0]++;
85 			else if (pkt_len < 1519)
86 				stats->size_bins[6]++;
87 			else
88 				stats->size_bins[7]++;
89 		}
90 
91 		ea = rte_pktmbuf_mtod(pkt, struct rte_ether_addr *);
92 		if (rte_is_multicast_ether_addr(ea)) {
93 			if (rte_is_broadcast_ether_addr(ea))
94 				stats->broadcast++;
95 			else
96 				stats->multicast++;
97 		}
98 	}
99 }
100 
101 static __rte_always_inline int64_t
102 vhost_async_dma_transfer_one(struct virtio_net *dev, struct vhost_virtqueue *vq,
103 		int16_t dma_id, uint16_t vchan_id, uint16_t flag_idx,
104 		struct vhost_iov_iter *pkt)
105 	__rte_shared_locks_required(&vq->access_lock)
106 {
107 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
108 	uint16_t ring_mask = dma_info->ring_mask;
109 	static bool vhost_async_dma_copy_log;
110 
111 
112 	struct vhost_iovec *iov = pkt->iov;
113 	int copy_idx = 0;
114 	uint32_t nr_segs = pkt->nr_segs;
115 	uint16_t i;
116 
117 	if (rte_dma_burst_capacity(dma_id, vchan_id) < nr_segs)
118 		return -1;
119 
120 	for (i = 0; i < nr_segs; i++) {
121 		copy_idx = rte_dma_copy(dma_id, vchan_id, (rte_iova_t)iov[i].src_addr,
122 				(rte_iova_t)iov[i].dst_addr, iov[i].len, RTE_DMA_OP_FLAG_LLC);
123 		/**
124 		 * Since all memory is pinned and DMA vChannel
125 		 * ring has enough space, failure should be a
126 		 * rare case. If failure happens, it means DMA
127 		 * device encounters serious errors; in this
128 		 * case, please stop async data-path and check
129 		 * what has happened to DMA device.
130 		 */
131 		if (unlikely(copy_idx < 0)) {
132 			if (!vhost_async_dma_copy_log) {
133 				VHOST_DATA_LOG(dev->ifname, ERR,
134 					"DMA copy failed for channel %d:%u",
135 					dma_id, vchan_id);
136 				vhost_async_dma_copy_log = true;
137 			}
138 			return -1;
139 		}
140 	}
141 
142 	/**
143 	 * Only store packet completion flag address in the last copy's
144 	 * slot, and other slots are set to NULL.
145 	 */
146 	dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask] = &vq->async->pkts_cmpl_flag[flag_idx];
147 
148 	return nr_segs;
149 }
150 
151 static __rte_always_inline uint16_t
152 vhost_async_dma_transfer(struct virtio_net *dev, struct vhost_virtqueue *vq,
153 		int16_t dma_id, uint16_t vchan_id, uint16_t head_idx,
154 		struct vhost_iov_iter *pkts, uint16_t nr_pkts)
155 	__rte_shared_locks_required(&vq->access_lock)
156 {
157 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
158 	int64_t ret, nr_copies = 0;
159 	uint16_t pkt_idx;
160 
161 	rte_spinlock_lock(&dma_info->dma_lock);
162 
163 	for (pkt_idx = 0; pkt_idx < nr_pkts; pkt_idx++) {
164 		ret = vhost_async_dma_transfer_one(dev, vq, dma_id, vchan_id, head_idx,
165 				&pkts[pkt_idx]);
166 		if (unlikely(ret < 0))
167 			break;
168 
169 		nr_copies += ret;
170 		head_idx++;
171 		if (head_idx >= vq->size)
172 			head_idx -= vq->size;
173 	}
174 
175 	if (likely(nr_copies > 0))
176 		rte_dma_submit(dma_id, vchan_id);
177 
178 	rte_spinlock_unlock(&dma_info->dma_lock);
179 
180 	return pkt_idx;
181 }
182 
183 static __rte_always_inline uint16_t
184 vhost_async_dma_check_completed(struct virtio_net *dev, int16_t dma_id, uint16_t vchan_id,
185 		uint16_t max_pkts)
186 {
187 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
188 	uint16_t ring_mask = dma_info->ring_mask;
189 	uint16_t last_idx = 0;
190 	uint16_t nr_copies;
191 	uint16_t copy_idx;
192 	uint16_t i;
193 	bool has_error = false;
194 	static bool vhost_async_dma_complete_log;
195 
196 	rte_spinlock_lock(&dma_info->dma_lock);
197 
198 	/**
199 	 * Print error log for debugging, if DMA reports error during
200 	 * DMA transfer. We do not handle error in vhost level.
201 	 */
202 	nr_copies = rte_dma_completed(dma_id, vchan_id, max_pkts, &last_idx, &has_error);
203 	if (unlikely(!vhost_async_dma_complete_log && has_error)) {
204 		VHOST_DATA_LOG(dev->ifname, ERR,
205 			"DMA completion failure on channel %d:%u",
206 			dma_id, vchan_id);
207 		vhost_async_dma_complete_log = true;
208 	} else if (nr_copies == 0) {
209 		goto out;
210 	}
211 
212 	copy_idx = last_idx - nr_copies + 1;
213 	for (i = 0; i < nr_copies; i++) {
214 		bool *flag;
215 
216 		flag = dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask];
217 		if (flag) {
218 			/**
219 			 * Mark the packet flag as received. The flag
220 			 * could belong to another virtqueue but write
221 			 * is atomic.
222 			 */
223 			*flag = true;
224 			dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask] = NULL;
225 		}
226 		copy_idx++;
227 	}
228 
229 out:
230 	rte_spinlock_unlock(&dma_info->dma_lock);
231 	return nr_copies;
232 }
233 
234 static inline void
235 do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
236 	__rte_shared_locks_required(&vq->iotlb_lock)
237 {
238 	struct batch_copy_elem *elem = vq->batch_copy_elems;
239 	uint16_t count = vq->batch_copy_nb_elems;
240 	int i;
241 
242 	for (i = 0; i < count; i++) {
243 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
244 		vhost_log_cache_write_iova(dev, vq, elem[i].log_addr,
245 					   elem[i].len);
246 		PRINT_PACKET(dev, (uintptr_t)elem[i].dst, elem[i].len, 0);
247 	}
248 
249 	vq->batch_copy_nb_elems = 0;
250 }
251 
252 static inline void
253 do_data_copy_dequeue(struct vhost_virtqueue *vq)
254 {
255 	struct batch_copy_elem *elem = vq->batch_copy_elems;
256 	uint16_t count = vq->batch_copy_nb_elems;
257 	int i;
258 
259 	for (i = 0; i < count; i++)
260 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
261 
262 	vq->batch_copy_nb_elems = 0;
263 }
264 
265 static __rte_always_inline void
266 do_flush_shadow_used_ring_split(struct virtio_net *dev,
267 			struct vhost_virtqueue *vq,
268 			uint16_t to, uint16_t from, uint16_t size)
269 {
270 	rte_memcpy(&vq->used->ring[to],
271 			&vq->shadow_used_split[from],
272 			size * sizeof(struct vring_used_elem));
273 	vhost_log_cache_used_vring(dev, vq,
274 			offsetof(struct vring_used, ring[to]),
275 			size * sizeof(struct vring_used_elem));
276 }
277 
278 static __rte_always_inline void
279 flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
280 {
281 	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
282 
283 	if (used_idx + vq->shadow_used_idx <= vq->size) {
284 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
285 					  vq->shadow_used_idx);
286 	} else {
287 		uint16_t size;
288 
289 		/* update used ring interval [used_idx, vq->size] */
290 		size = vq->size - used_idx;
291 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
292 
293 		/* update the left half used ring interval [0, left_size] */
294 		do_flush_shadow_used_ring_split(dev, vq, 0, size,
295 					  vq->shadow_used_idx - size);
296 	}
297 	vq->last_used_idx += vq->shadow_used_idx;
298 
299 	vhost_log_cache_sync(dev, vq);
300 
301 	rte_atomic_fetch_add_explicit((unsigned short __rte_atomic *)&vq->used->idx,
302 		vq->shadow_used_idx, rte_memory_order_release);
303 	vq->shadow_used_idx = 0;
304 	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
305 		sizeof(vq->used->idx));
306 }
307 
308 static __rte_always_inline void
309 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
310 			 uint16_t desc_idx, uint32_t len)
311 {
312 	uint16_t i = vq->shadow_used_idx++;
313 
314 	vq->shadow_used_split[i].id  = desc_idx;
315 	vq->shadow_used_split[i].len = len;
316 }
317 
318 static __rte_always_inline void
319 vhost_flush_enqueue_shadow_packed(struct virtio_net *dev,
320 				  struct vhost_virtqueue *vq)
321 {
322 	int i;
323 	uint16_t used_idx = vq->last_used_idx;
324 	uint16_t head_idx = vq->last_used_idx;
325 	uint16_t head_flags = 0;
326 
327 	/* Split loop in two to save memory barriers */
328 	for (i = 0; i < vq->shadow_used_idx; i++) {
329 		vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
330 		vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
331 
332 		used_idx += vq->shadow_used_packed[i].count;
333 		if (used_idx >= vq->size)
334 			used_idx -= vq->size;
335 	}
336 
337 	/* The ordering for storing desc flags needs to be enforced. */
338 	rte_atomic_thread_fence(rte_memory_order_release);
339 
340 	for (i = 0; i < vq->shadow_used_idx; i++) {
341 		uint16_t flags;
342 
343 		if (vq->shadow_used_packed[i].len)
344 			flags = VRING_DESC_F_WRITE;
345 		else
346 			flags = 0;
347 
348 		if (vq->used_wrap_counter) {
349 			flags |= VRING_DESC_F_USED;
350 			flags |= VRING_DESC_F_AVAIL;
351 		} else {
352 			flags &= ~VRING_DESC_F_USED;
353 			flags &= ~VRING_DESC_F_AVAIL;
354 		}
355 
356 		if (i > 0) {
357 			vq->desc_packed[vq->last_used_idx].flags = flags;
358 
359 			vhost_log_cache_used_vring(dev, vq,
360 					vq->last_used_idx *
361 					sizeof(struct vring_packed_desc),
362 					sizeof(struct vring_packed_desc));
363 		} else {
364 			head_idx = vq->last_used_idx;
365 			head_flags = flags;
366 		}
367 
368 		vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
369 	}
370 
371 	vq->desc_packed[head_idx].flags = head_flags;
372 
373 	vhost_log_cache_used_vring(dev, vq,
374 				head_idx *
375 				sizeof(struct vring_packed_desc),
376 				sizeof(struct vring_packed_desc));
377 
378 	vq->shadow_used_idx = 0;
379 	vhost_log_cache_sync(dev, vq);
380 }
381 
382 static __rte_always_inline void
383 vhost_flush_dequeue_shadow_packed(struct virtio_net *dev,
384 				  struct vhost_virtqueue *vq)
385 {
386 	struct vring_used_elem_packed *used_elem = &vq->shadow_used_packed[0];
387 
388 	vq->desc_packed[vq->shadow_last_used_idx].id = used_elem->id;
389 	/* desc flags is the synchronization point for virtio packed vring */
390 	rte_atomic_store_explicit(
391 		(unsigned short __rte_atomic *)&vq->desc_packed[vq->shadow_last_used_idx].flags,
392 		used_elem->flags, rte_memory_order_release);
393 
394 	vhost_log_cache_used_vring(dev, vq, vq->shadow_last_used_idx *
395 				   sizeof(struct vring_packed_desc),
396 				   sizeof(struct vring_packed_desc));
397 	vq->shadow_used_idx = 0;
398 	vhost_log_cache_sync(dev, vq);
399 }
400 
401 static __rte_always_inline void
402 vhost_flush_enqueue_batch_packed(struct virtio_net *dev,
403 				 struct vhost_virtqueue *vq,
404 				 uint64_t *lens,
405 				 uint16_t *ids)
406 {
407 	uint16_t i;
408 	uint16_t flags;
409 	uint16_t last_used_idx;
410 	struct vring_packed_desc *desc_base;
411 
412 	last_used_idx = vq->last_used_idx;
413 	desc_base = &vq->desc_packed[last_used_idx];
414 
415 	flags = PACKED_DESC_ENQUEUE_USED_FLAG(vq->used_wrap_counter);
416 
417 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
418 		desc_base[i].id = ids[i];
419 		desc_base[i].len = lens[i];
420 	}
421 
422 	rte_atomic_thread_fence(rte_memory_order_release);
423 
424 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
425 		desc_base[i].flags = flags;
426 	}
427 
428 	vhost_log_cache_used_vring(dev, vq, last_used_idx *
429 				   sizeof(struct vring_packed_desc),
430 				   sizeof(struct vring_packed_desc) *
431 				   PACKED_BATCH_SIZE);
432 	vhost_log_cache_sync(dev, vq);
433 
434 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
435 }
436 
437 static __rte_always_inline void
438 vhost_async_shadow_enqueue_packed_batch(struct vhost_virtqueue *vq,
439 				 uint64_t *lens,
440 				 uint16_t *ids)
441 	__rte_exclusive_locks_required(&vq->access_lock)
442 {
443 	uint16_t i;
444 	struct vhost_async *async = vq->async;
445 
446 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
447 		async->buffers_packed[async->buffer_idx_packed].id  = ids[i];
448 		async->buffers_packed[async->buffer_idx_packed].len = lens[i];
449 		async->buffers_packed[async->buffer_idx_packed].count = 1;
450 		async->buffer_idx_packed++;
451 		if (async->buffer_idx_packed >= vq->size)
452 			async->buffer_idx_packed -= vq->size;
453 	}
454 }
455 
456 static __rte_always_inline void
457 vhost_async_shadow_dequeue_packed_batch(struct vhost_virtqueue *vq, uint16_t *ids)
458 	__rte_shared_locks_required(&vq->access_lock)
459 {
460 	uint16_t i;
461 	struct vhost_async *async = vq->async;
462 
463 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
464 		async->buffers_packed[async->buffer_idx_packed].id  = ids[i];
465 		async->buffers_packed[async->buffer_idx_packed].len = 0;
466 		async->buffers_packed[async->buffer_idx_packed].count = 1;
467 
468 		async->buffer_idx_packed++;
469 		if (async->buffer_idx_packed >= vq->size)
470 			async->buffer_idx_packed -= vq->size;
471 	}
472 }
473 
474 static __rte_always_inline void
475 vhost_shadow_dequeue_batch_packed_inorder(struct vhost_virtqueue *vq,
476 					  uint16_t id)
477 {
478 	vq->shadow_used_packed[0].id = id;
479 
480 	if (!vq->shadow_used_idx) {
481 		vq->shadow_last_used_idx = vq->last_used_idx;
482 		vq->shadow_used_packed[0].flags =
483 			PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
484 		vq->shadow_used_packed[0].len = 0;
485 		vq->shadow_used_packed[0].count = 1;
486 		vq->shadow_used_idx++;
487 	}
488 
489 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
490 }
491 
492 static __rte_always_inline void
493 vhost_shadow_dequeue_batch_packed(struct virtio_net *dev,
494 				  struct vhost_virtqueue *vq,
495 				  uint16_t *ids)
496 {
497 	uint16_t flags;
498 	uint16_t i;
499 	uint16_t begin;
500 
501 	flags = PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
502 
503 	if (!vq->shadow_used_idx) {
504 		vq->shadow_last_used_idx = vq->last_used_idx;
505 		vq->shadow_used_packed[0].id  = ids[0];
506 		vq->shadow_used_packed[0].len = 0;
507 		vq->shadow_used_packed[0].count = 1;
508 		vq->shadow_used_packed[0].flags = flags;
509 		vq->shadow_used_idx++;
510 		begin = 1;
511 	} else
512 		begin = 0;
513 
514 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE) {
515 		vq->desc_packed[vq->last_used_idx + i].id = ids[i];
516 		vq->desc_packed[vq->last_used_idx + i].len = 0;
517 	}
518 
519 	rte_atomic_thread_fence(rte_memory_order_release);
520 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE)
521 		vq->desc_packed[vq->last_used_idx + i].flags = flags;
522 
523 	vhost_log_cache_used_vring(dev, vq, vq->last_used_idx *
524 				   sizeof(struct vring_packed_desc),
525 				   sizeof(struct vring_packed_desc) *
526 				   PACKED_BATCH_SIZE);
527 	vhost_log_cache_sync(dev, vq);
528 
529 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
530 }
531 
532 static __rte_always_inline void
533 vhost_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
534 				   uint16_t buf_id,
535 				   uint16_t count)
536 {
537 	uint16_t flags;
538 
539 	flags = vq->desc_packed[vq->last_used_idx].flags;
540 	if (vq->used_wrap_counter) {
541 		flags |= VRING_DESC_F_USED;
542 		flags |= VRING_DESC_F_AVAIL;
543 	} else {
544 		flags &= ~VRING_DESC_F_USED;
545 		flags &= ~VRING_DESC_F_AVAIL;
546 	}
547 
548 	if (!vq->shadow_used_idx) {
549 		vq->shadow_last_used_idx = vq->last_used_idx;
550 
551 		vq->shadow_used_packed[0].id  = buf_id;
552 		vq->shadow_used_packed[0].len = 0;
553 		vq->shadow_used_packed[0].flags = flags;
554 		vq->shadow_used_idx++;
555 	} else {
556 		vq->desc_packed[vq->last_used_idx].id = buf_id;
557 		vq->desc_packed[vq->last_used_idx].len = 0;
558 		vq->desc_packed[vq->last_used_idx].flags = flags;
559 	}
560 
561 	vq_inc_last_used_packed(vq, count);
562 }
563 
564 static __rte_always_inline void
565 vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
566 					   uint16_t buf_id,
567 					   uint16_t count)
568 {
569 	uint16_t flags;
570 
571 	vq->shadow_used_packed[0].id = buf_id;
572 
573 	flags = vq->desc_packed[vq->last_used_idx].flags;
574 	if (vq->used_wrap_counter) {
575 		flags |= VRING_DESC_F_USED;
576 		flags |= VRING_DESC_F_AVAIL;
577 	} else {
578 		flags &= ~VRING_DESC_F_USED;
579 		flags &= ~VRING_DESC_F_AVAIL;
580 	}
581 
582 	if (!vq->shadow_used_idx) {
583 		vq->shadow_last_used_idx = vq->last_used_idx;
584 		vq->shadow_used_packed[0].len = 0;
585 		vq->shadow_used_packed[0].flags = flags;
586 		vq->shadow_used_idx++;
587 	}
588 
589 	vq_inc_last_used_packed(vq, count);
590 }
591 
592 static __rte_always_inline void
593 vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
594 				   uint32_t *len,
595 				   uint16_t *id,
596 				   uint16_t *count,
597 				   uint16_t num_buffers)
598 {
599 	uint16_t i;
600 
601 	for (i = 0; i < num_buffers; i++) {
602 		/* enqueue shadow flush action aligned with batch num */
603 		if (!vq->shadow_used_idx)
604 			vq->shadow_aligned_idx = vq->last_used_idx &
605 				PACKED_BATCH_MASK;
606 		vq->shadow_used_packed[vq->shadow_used_idx].id  = id[i];
607 		vq->shadow_used_packed[vq->shadow_used_idx].len = len[i];
608 		vq->shadow_used_packed[vq->shadow_used_idx].count = count[i];
609 		vq->shadow_aligned_idx += count[i];
610 		vq->shadow_used_idx++;
611 	}
612 }
613 
614 static __rte_always_inline void
615 vhost_async_shadow_enqueue_packed(struct vhost_virtqueue *vq,
616 				   uint32_t *len,
617 				   uint16_t *id,
618 				   uint16_t *count,
619 				   uint16_t num_buffers)
620 	__rte_exclusive_locks_required(&vq->access_lock)
621 {
622 	uint16_t i;
623 	struct vhost_async *async = vq->async;
624 
625 	for (i = 0; i < num_buffers; i++) {
626 		async->buffers_packed[async->buffer_idx_packed].id  = id[i];
627 		async->buffers_packed[async->buffer_idx_packed].len = len[i];
628 		async->buffers_packed[async->buffer_idx_packed].count = count[i];
629 		async->buffer_idx_packed++;
630 		if (async->buffer_idx_packed >= vq->size)
631 			async->buffer_idx_packed -= vq->size;
632 	}
633 }
634 
635 static __rte_always_inline void
636 vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
637 				   struct vhost_virtqueue *vq,
638 				   uint32_t *len,
639 				   uint16_t *id,
640 				   uint16_t *count,
641 				   uint16_t num_buffers)
642 	__rte_shared_locks_required(&vq->iotlb_lock)
643 {
644 	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
645 
646 	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
647 		do_data_copy_enqueue(dev, vq);
648 		vhost_flush_enqueue_shadow_packed(dev, vq);
649 	}
650 }
651 
652 /* avoid write operation when necessary, to lessen cache issues */
653 #define ASSIGN_UNLESS_EQUAL(var, val) do {	\
654 	if ((var) != (val))			\
655 		(var) = (val);			\
656 } while (0)
657 
658 static __rte_always_inline void
659 virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
660 {
661 	uint64_t csum_l4 = m_buf->ol_flags & RTE_MBUF_F_TX_L4_MASK;
662 
663 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG)
664 		csum_l4 |= RTE_MBUF_F_TX_TCP_CKSUM;
665 
666 	if (csum_l4) {
667 		/*
668 		 * Pseudo-header checksum must be set as per Virtio spec.
669 		 *
670 		 * Note: We don't propagate rte_net_intel_cksum_prepare()
671 		 * errors, as it would have an impact on performance, and an
672 		 * error would mean the packet is dropped by the guest instead
673 		 * of being dropped here.
674 		 */
675 		rte_net_intel_cksum_prepare(m_buf);
676 
677 		net_hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
678 		net_hdr->csum_start = m_buf->l2_len + m_buf->l3_len;
679 
680 		switch (csum_l4) {
681 		case RTE_MBUF_F_TX_TCP_CKSUM:
682 			net_hdr->csum_offset = (offsetof(struct rte_tcp_hdr,
683 						cksum));
684 			break;
685 		case RTE_MBUF_F_TX_UDP_CKSUM:
686 			net_hdr->csum_offset = (offsetof(struct rte_udp_hdr,
687 						dgram_cksum));
688 			break;
689 		case RTE_MBUF_F_TX_SCTP_CKSUM:
690 			net_hdr->csum_offset = (offsetof(struct rte_sctp_hdr,
691 						cksum));
692 			break;
693 		}
694 	} else {
695 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_start, 0);
696 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_offset, 0);
697 		ASSIGN_UNLESS_EQUAL(net_hdr->flags, 0);
698 	}
699 
700 	/* IP cksum verification cannot be bypassed, then calculate here */
701 	if (m_buf->ol_flags & RTE_MBUF_F_TX_IP_CKSUM) {
702 		struct rte_ipv4_hdr *ipv4_hdr;
703 
704 		ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
705 						   m_buf->l2_len);
706 		ipv4_hdr->hdr_checksum = 0;
707 		ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
708 	}
709 
710 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG) {
711 		if (m_buf->ol_flags & RTE_MBUF_F_TX_IPV4)
712 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
713 		else
714 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
715 		net_hdr->gso_size = m_buf->tso_segsz;
716 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
717 					+ m_buf->l4_len;
718 	} else if (m_buf->ol_flags & RTE_MBUF_F_TX_UDP_SEG) {
719 		net_hdr->gso_type = VIRTIO_NET_HDR_GSO_UDP;
720 		net_hdr->gso_size = m_buf->tso_segsz;
721 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len +
722 			m_buf->l4_len;
723 	} else {
724 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0);
725 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_size, 0);
726 		ASSIGN_UNLESS_EQUAL(net_hdr->hdr_len, 0);
727 	}
728 }
729 
730 static __rte_always_inline int
731 map_one_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
732 		struct buf_vector *buf_vec, uint16_t *vec_idx,
733 		uint64_t desc_iova, uint64_t desc_len, uint8_t perm)
734 	__rte_shared_locks_required(&vq->iotlb_lock)
735 {
736 	uint16_t vec_id = *vec_idx;
737 
738 	while (desc_len) {
739 		uint64_t desc_addr;
740 		uint64_t desc_chunck_len = desc_len;
741 
742 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
743 			return -1;
744 
745 		desc_addr = vhost_iova_to_vva(dev, vq,
746 				desc_iova,
747 				&desc_chunck_len,
748 				perm);
749 		if (unlikely(!desc_addr))
750 			return -1;
751 
752 		rte_prefetch0((void *)(uintptr_t)desc_addr);
753 
754 		buf_vec[vec_id].buf_iova = desc_iova;
755 		buf_vec[vec_id].buf_addr = desc_addr;
756 		buf_vec[vec_id].buf_len  = desc_chunck_len;
757 
758 		desc_len -= desc_chunck_len;
759 		desc_iova += desc_chunck_len;
760 		vec_id++;
761 	}
762 	*vec_idx = vec_id;
763 
764 	return 0;
765 }
766 
767 static __rte_always_inline int
768 fill_vec_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
769 			 uint32_t avail_idx, uint16_t *vec_idx,
770 			 struct buf_vector *buf_vec, uint16_t *desc_chain_head,
771 			 uint32_t *desc_chain_len, uint8_t perm)
772 	__rte_shared_locks_required(&vq->iotlb_lock)
773 {
774 	uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
775 	uint16_t vec_id = *vec_idx;
776 	uint32_t len    = 0;
777 	uint64_t dlen;
778 	uint32_t nr_descs = vq->size;
779 	uint32_t cnt    = 0;
780 	struct vring_desc *descs = vq->desc;
781 	struct vring_desc *idesc = NULL;
782 
783 	if (unlikely(idx >= vq->size))
784 		return -1;
785 
786 	*desc_chain_head = idx;
787 
788 	if (vq->desc[idx].flags & VRING_DESC_F_INDIRECT) {
789 		dlen = vq->desc[idx].len;
790 		nr_descs = dlen / sizeof(struct vring_desc);
791 		if (unlikely(nr_descs > vq->size))
792 			return -1;
793 
794 		descs = (struct vring_desc *)(uintptr_t)
795 			vhost_iova_to_vva(dev, vq, vq->desc[idx].addr,
796 						&dlen,
797 						VHOST_ACCESS_RO);
798 		if (unlikely(!descs))
799 			return -1;
800 
801 		if (unlikely(dlen < vq->desc[idx].len)) {
802 			/*
803 			 * The indirect desc table is not contiguous
804 			 * in process VA space, we have to copy it.
805 			 */
806 			idesc = vhost_alloc_copy_ind_table(dev, vq,
807 					vq->desc[idx].addr, vq->desc[idx].len);
808 			if (unlikely(!idesc))
809 				return -1;
810 
811 			descs = idesc;
812 		}
813 
814 		idx = 0;
815 	}
816 
817 	while (1) {
818 		if (unlikely(idx >= nr_descs || cnt++ >= nr_descs)) {
819 			free_ind_table(idesc);
820 			return -1;
821 		}
822 
823 		dlen = descs[idx].len;
824 		len += dlen;
825 
826 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
827 						descs[idx].addr, dlen,
828 						perm))) {
829 			free_ind_table(idesc);
830 			return -1;
831 		}
832 
833 		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
834 			break;
835 
836 		idx = descs[idx].next;
837 	}
838 
839 	*desc_chain_len = len;
840 	*vec_idx = vec_id;
841 
842 	if (unlikely(!!idesc))
843 		free_ind_table(idesc);
844 
845 	return 0;
846 }
847 
848 /*
849  * Returns -1 on fail, 0 on success
850  */
851 static inline int
852 reserve_avail_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
853 				uint64_t size, struct buf_vector *buf_vec,
854 				uint16_t *num_buffers, uint16_t avail_head,
855 				uint16_t *nr_vec)
856 	__rte_shared_locks_required(&vq->iotlb_lock)
857 {
858 	uint16_t cur_idx;
859 	uint16_t vec_idx = 0;
860 	uint16_t max_tries, tries = 0;
861 
862 	uint16_t head_idx = 0;
863 	uint32_t len = 0;
864 
865 	*num_buffers = 0;
866 	cur_idx  = vq->last_avail_idx;
867 
868 	if (rxvq_is_mergeable(dev))
869 		max_tries = vq->size - 1;
870 	else
871 		max_tries = 1;
872 
873 	while (size > 0) {
874 		if (unlikely(cur_idx == avail_head))
875 			return -1;
876 		/*
877 		 * if we tried all available ring items, and still
878 		 * can't get enough buf, it means something abnormal
879 		 * happened.
880 		 */
881 		if (unlikely(++tries > max_tries))
882 			return -1;
883 
884 		if (unlikely(fill_vec_buf_split(dev, vq, cur_idx,
885 						&vec_idx, buf_vec,
886 						&head_idx, &len,
887 						VHOST_ACCESS_RW) < 0))
888 			return -1;
889 		len = RTE_MIN(len, size);
890 		update_shadow_used_ring_split(vq, head_idx, len);
891 		size -= len;
892 
893 		cur_idx++;
894 		*num_buffers += 1;
895 	}
896 
897 	*nr_vec = vec_idx;
898 
899 	return 0;
900 }
901 
902 static __rte_always_inline int
903 fill_vec_buf_packed_indirect(struct virtio_net *dev,
904 			struct vhost_virtqueue *vq,
905 			struct vring_packed_desc *desc, uint16_t *vec_idx,
906 			struct buf_vector *buf_vec, uint32_t *len, uint8_t perm)
907 	__rte_shared_locks_required(&vq->iotlb_lock)
908 {
909 	uint16_t i;
910 	uint32_t nr_descs;
911 	uint16_t vec_id = *vec_idx;
912 	uint64_t dlen;
913 	struct vring_packed_desc *descs, *idescs = NULL;
914 
915 	dlen = desc->len;
916 	descs = (struct vring_packed_desc *)(uintptr_t)
917 		vhost_iova_to_vva(dev, vq, desc->addr, &dlen, VHOST_ACCESS_RO);
918 	if (unlikely(!descs))
919 		return -1;
920 
921 	if (unlikely(dlen < desc->len)) {
922 		/*
923 		 * The indirect desc table is not contiguous
924 		 * in process VA space, we have to copy it.
925 		 */
926 		idescs = vhost_alloc_copy_ind_table(dev,
927 				vq, desc->addr, desc->len);
928 		if (unlikely(!idescs))
929 			return -1;
930 
931 		descs = idescs;
932 	}
933 
934 	nr_descs =  desc->len / sizeof(struct vring_packed_desc);
935 	if (unlikely(nr_descs >= vq->size)) {
936 		free_ind_table(idescs);
937 		return -1;
938 	}
939 
940 	for (i = 0; i < nr_descs; i++) {
941 		if (unlikely(vec_id >= BUF_VECTOR_MAX)) {
942 			free_ind_table(idescs);
943 			return -1;
944 		}
945 
946 		dlen = descs[i].len;
947 		*len += dlen;
948 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
949 						descs[i].addr, dlen,
950 						perm)))
951 			return -1;
952 	}
953 	*vec_idx = vec_id;
954 
955 	if (unlikely(!!idescs))
956 		free_ind_table(idescs);
957 
958 	return 0;
959 }
960 
961 static __rte_always_inline int
962 fill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
963 				uint16_t avail_idx, uint16_t *desc_count,
964 				struct buf_vector *buf_vec, uint16_t *vec_idx,
965 				uint16_t *buf_id, uint32_t *len, uint8_t perm)
966 	__rte_shared_locks_required(&vq->iotlb_lock)
967 {
968 	bool wrap_counter = vq->avail_wrap_counter;
969 	struct vring_packed_desc *descs = vq->desc_packed;
970 	uint16_t vec_id = *vec_idx;
971 	uint64_t dlen;
972 
973 	if (avail_idx < vq->last_avail_idx)
974 		wrap_counter ^= 1;
975 
976 	/*
977 	 * Perform a load-acquire barrier in desc_is_avail to
978 	 * enforce the ordering between desc flags and desc
979 	 * content.
980 	 */
981 	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)))
982 		return -1;
983 
984 	*desc_count = 0;
985 	*len = 0;
986 
987 	while (1) {
988 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
989 			return -1;
990 
991 		if (unlikely(*desc_count >= vq->size))
992 			return -1;
993 
994 		*desc_count += 1;
995 		*buf_id = descs[avail_idx].id;
996 
997 		if (descs[avail_idx].flags & VRING_DESC_F_INDIRECT) {
998 			if (unlikely(fill_vec_buf_packed_indirect(dev, vq,
999 							&descs[avail_idx],
1000 							&vec_id, buf_vec,
1001 							len, perm) < 0))
1002 				return -1;
1003 		} else {
1004 			dlen = descs[avail_idx].len;
1005 			*len += dlen;
1006 
1007 			if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
1008 							descs[avail_idx].addr,
1009 							dlen,
1010 							perm)))
1011 				return -1;
1012 		}
1013 
1014 		if ((descs[avail_idx].flags & VRING_DESC_F_NEXT) == 0)
1015 			break;
1016 
1017 		if (++avail_idx >= vq->size) {
1018 			avail_idx -= vq->size;
1019 			wrap_counter ^= 1;
1020 		}
1021 	}
1022 
1023 	*vec_idx = vec_id;
1024 
1025 	return 0;
1026 }
1027 
1028 static __rte_noinline void
1029 copy_vnet_hdr_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
1030 		struct buf_vector *buf_vec,
1031 		struct virtio_net_hdr_mrg_rxbuf *hdr)
1032 	__rte_shared_locks_required(&vq->iotlb_lock)
1033 {
1034 	uint64_t len;
1035 	uint64_t remain = dev->vhost_hlen;
1036 	uint64_t src = (uint64_t)(uintptr_t)hdr, dst;
1037 	uint64_t iova = buf_vec->buf_iova;
1038 
1039 	while (remain) {
1040 		len = RTE_MIN(remain,
1041 				buf_vec->buf_len);
1042 		dst = buf_vec->buf_addr;
1043 		rte_memcpy((void *)(uintptr_t)dst,
1044 				(void *)(uintptr_t)src,
1045 				len);
1046 
1047 		PRINT_PACKET(dev, (uintptr_t)dst,
1048 				(uint32_t)len, 0);
1049 		vhost_log_cache_write_iova(dev, vq,
1050 				iova, len);
1051 
1052 		remain -= len;
1053 		iova += len;
1054 		src += len;
1055 		buf_vec++;
1056 	}
1057 }
1058 
1059 static __rte_always_inline int
1060 async_iter_initialize(struct virtio_net *dev, struct vhost_async *async)
1061 {
1062 	struct vhost_iov_iter *iter;
1063 
1064 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
1065 		VHOST_DATA_LOG(dev->ifname, ERR, "no more async iovec available");
1066 		return -1;
1067 	}
1068 
1069 	iter = async->iov_iter + async->iter_idx;
1070 	iter->iov = async->iovec + async->iovec_idx;
1071 	iter->nr_segs = 0;
1072 
1073 	return 0;
1074 }
1075 
1076 static __rte_always_inline int
1077 async_iter_add_iovec(struct virtio_net *dev, struct vhost_async *async,
1078 		void *src, void *dst, size_t len)
1079 {
1080 	struct vhost_iov_iter *iter;
1081 	struct vhost_iovec *iovec;
1082 
1083 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
1084 		static bool vhost_max_async_vec_log;
1085 
1086 		if (!vhost_max_async_vec_log) {
1087 			VHOST_DATA_LOG(dev->ifname, ERR, "no more async iovec available");
1088 			vhost_max_async_vec_log = true;
1089 		}
1090 
1091 		return -1;
1092 	}
1093 
1094 	iter = async->iov_iter + async->iter_idx;
1095 	iovec = async->iovec + async->iovec_idx;
1096 
1097 	iovec->src_addr = src;
1098 	iovec->dst_addr = dst;
1099 	iovec->len = len;
1100 
1101 	iter->nr_segs++;
1102 	async->iovec_idx++;
1103 
1104 	return 0;
1105 }
1106 
1107 static __rte_always_inline void
1108 async_iter_finalize(struct vhost_async *async)
1109 {
1110 	async->iter_idx++;
1111 }
1112 
1113 static __rte_always_inline void
1114 async_iter_cancel(struct vhost_async *async)
1115 {
1116 	struct vhost_iov_iter *iter;
1117 
1118 	iter = async->iov_iter + async->iter_idx;
1119 	async->iovec_idx -= iter->nr_segs;
1120 	iter->nr_segs = 0;
1121 	iter->iov = NULL;
1122 }
1123 
1124 static __rte_always_inline void
1125 async_iter_reset(struct vhost_async *async)
1126 {
1127 	async->iter_idx = 0;
1128 	async->iovec_idx = 0;
1129 }
1130 
1131 static __rte_always_inline int
1132 async_fill_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
1133 		struct rte_mbuf *m, uint32_t mbuf_offset,
1134 		uint64_t buf_iova, uint32_t cpy_len, bool to_desc)
1135 	__rte_shared_locks_required(&vq->access_lock)
1136 	__rte_shared_locks_required(&vq->iotlb_lock)
1137 {
1138 	struct vhost_async *async = vq->async;
1139 	uint64_t mapped_len;
1140 	uint32_t buf_offset = 0;
1141 	void *src, *dst;
1142 	void *host_iova;
1143 
1144 	while (cpy_len) {
1145 		host_iova = (void *)(uintptr_t)gpa_to_first_hpa(dev,
1146 				buf_iova + buf_offset, cpy_len, &mapped_len);
1147 		if (unlikely(!host_iova)) {
1148 			VHOST_DATA_LOG(dev->ifname, ERR,
1149 				"%s: failed to get host iova.",
1150 				__func__);
1151 			return -1;
1152 		}
1153 
1154 		if (to_desc) {
1155 			src = (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset);
1156 			dst = host_iova;
1157 		} else {
1158 			src = host_iova;
1159 			dst = (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset);
1160 		}
1161 
1162 		if (unlikely(async_iter_add_iovec(dev, async, src, dst, (size_t)mapped_len)))
1163 			return -1;
1164 
1165 		cpy_len -= (uint32_t)mapped_len;
1166 		mbuf_offset += (uint32_t)mapped_len;
1167 		buf_offset += (uint32_t)mapped_len;
1168 	}
1169 
1170 	return 0;
1171 }
1172 
1173 static __rte_always_inline void
1174 sync_fill_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
1175 		struct rte_mbuf *m, uint32_t mbuf_offset,
1176 		uint64_t buf_addr, uint64_t buf_iova, uint32_t cpy_len, bool to_desc)
1177 	__rte_shared_locks_required(&vq->iotlb_lock)
1178 {
1179 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
1180 
1181 	if (likely(cpy_len > MAX_BATCH_LEN || vq->batch_copy_nb_elems >= vq->size)) {
1182 		if (to_desc) {
1183 			rte_memcpy((void *)((uintptr_t)(buf_addr)),
1184 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
1185 				cpy_len);
1186 			vhost_log_cache_write_iova(dev, vq, buf_iova, cpy_len);
1187 			PRINT_PACKET(dev, (uintptr_t)(buf_addr), cpy_len, 0);
1188 		} else {
1189 			rte_memcpy(rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
1190 				(void *)((uintptr_t)(buf_addr)),
1191 				cpy_len);
1192 		}
1193 	} else {
1194 		if (to_desc) {
1195 			batch_copy[vq->batch_copy_nb_elems].dst =
1196 				(void *)((uintptr_t)(buf_addr));
1197 			batch_copy[vq->batch_copy_nb_elems].src =
1198 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
1199 			batch_copy[vq->batch_copy_nb_elems].log_addr = buf_iova;
1200 		} else {
1201 			batch_copy[vq->batch_copy_nb_elems].dst =
1202 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
1203 			batch_copy[vq->batch_copy_nb_elems].src =
1204 				(void *)((uintptr_t)(buf_addr));
1205 		}
1206 		batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
1207 		vq->batch_copy_nb_elems++;
1208 	}
1209 }
1210 
1211 static __rte_always_inline int
1212 mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
1213 		struct rte_mbuf *m, struct buf_vector *buf_vec,
1214 		uint16_t nr_vec, uint16_t num_buffers, bool is_async)
1215 	__rte_shared_locks_required(&vq->access_lock)
1216 	__rte_shared_locks_required(&vq->iotlb_lock)
1217 {
1218 	uint32_t vec_idx = 0;
1219 	uint32_t mbuf_offset, mbuf_avail;
1220 	uint32_t buf_offset, buf_avail;
1221 	uint64_t buf_addr, buf_iova, buf_len;
1222 	uint32_t cpy_len;
1223 	uint64_t hdr_addr;
1224 	struct rte_mbuf *hdr_mbuf;
1225 	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
1226 	struct vhost_async *async = vq->async;
1227 
1228 	if (unlikely(m == NULL))
1229 		return -1;
1230 
1231 	buf_addr = buf_vec[vec_idx].buf_addr;
1232 	buf_iova = buf_vec[vec_idx].buf_iova;
1233 	buf_len = buf_vec[vec_idx].buf_len;
1234 
1235 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
1236 		return -1;
1237 
1238 	hdr_mbuf = m;
1239 	hdr_addr = buf_addr;
1240 	if (unlikely(buf_len < dev->vhost_hlen)) {
1241 		memset(&tmp_hdr, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
1242 		hdr = &tmp_hdr;
1243 	} else
1244 		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
1245 
1246 	VHOST_DATA_LOG(dev->ifname, DEBUG, "RX: num merge buffers %d", num_buffers);
1247 
1248 	if (unlikely(buf_len < dev->vhost_hlen)) {
1249 		buf_offset = dev->vhost_hlen - buf_len;
1250 		vec_idx++;
1251 		buf_addr = buf_vec[vec_idx].buf_addr;
1252 		buf_iova = buf_vec[vec_idx].buf_iova;
1253 		buf_len = buf_vec[vec_idx].buf_len;
1254 		buf_avail = buf_len - buf_offset;
1255 	} else {
1256 		buf_offset = dev->vhost_hlen;
1257 		buf_avail = buf_len - dev->vhost_hlen;
1258 	}
1259 
1260 	mbuf_avail  = rte_pktmbuf_data_len(m);
1261 	mbuf_offset = 0;
1262 
1263 	if (is_async) {
1264 		if (async_iter_initialize(dev, async))
1265 			return -1;
1266 	}
1267 
1268 	while (mbuf_avail != 0 || m->next != NULL) {
1269 		/* done with current buf, get the next one */
1270 		if (buf_avail == 0) {
1271 			vec_idx++;
1272 			if (unlikely(vec_idx >= nr_vec))
1273 				goto error;
1274 
1275 			buf_addr = buf_vec[vec_idx].buf_addr;
1276 			buf_iova = buf_vec[vec_idx].buf_iova;
1277 			buf_len = buf_vec[vec_idx].buf_len;
1278 
1279 			buf_offset = 0;
1280 			buf_avail  = buf_len;
1281 		}
1282 
1283 		/* done with current mbuf, get the next one */
1284 		if (mbuf_avail == 0) {
1285 			m = m->next;
1286 
1287 			mbuf_offset = 0;
1288 			mbuf_avail  = rte_pktmbuf_data_len(m);
1289 		}
1290 
1291 		if (hdr_addr) {
1292 			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
1293 			if (rxvq_is_mergeable(dev))
1294 				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
1295 						num_buffers);
1296 
1297 			if (unlikely(hdr == &tmp_hdr)) {
1298 				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
1299 			} else {
1300 				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
1301 						dev->vhost_hlen, 0);
1302 				vhost_log_cache_write_iova(dev, vq,
1303 						buf_vec[0].buf_iova,
1304 						dev->vhost_hlen);
1305 			}
1306 
1307 			hdr_addr = 0;
1308 		}
1309 
1310 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
1311 
1312 		if (is_async) {
1313 			if (async_fill_seg(dev, vq, m, mbuf_offset,
1314 					   buf_iova + buf_offset, cpy_len, true) < 0)
1315 				goto error;
1316 		} else {
1317 			sync_fill_seg(dev, vq, m, mbuf_offset,
1318 				      buf_addr + buf_offset,
1319 				      buf_iova + buf_offset, cpy_len, true);
1320 		}
1321 
1322 		mbuf_avail  -= cpy_len;
1323 		mbuf_offset += cpy_len;
1324 		buf_avail  -= cpy_len;
1325 		buf_offset += cpy_len;
1326 	}
1327 
1328 	if (is_async)
1329 		async_iter_finalize(async);
1330 
1331 	return 0;
1332 error:
1333 	if (is_async)
1334 		async_iter_cancel(async);
1335 
1336 	return -1;
1337 }
1338 
1339 static __rte_always_inline int
1340 vhost_enqueue_single_packed(struct virtio_net *dev,
1341 			    struct vhost_virtqueue *vq,
1342 			    struct rte_mbuf *pkt,
1343 			    struct buf_vector *buf_vec,
1344 			    uint16_t *nr_descs)
1345 	__rte_shared_locks_required(&vq->access_lock)
1346 	__rte_shared_locks_required(&vq->iotlb_lock)
1347 {
1348 	uint16_t nr_vec = 0;
1349 	uint16_t avail_idx = vq->last_avail_idx;
1350 	uint16_t max_tries, tries = 0;
1351 	uint16_t buf_id = 0;
1352 	uint32_t len = 0;
1353 	uint16_t desc_count;
1354 	uint64_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1355 	uint16_t num_buffers = 0;
1356 	uint32_t buffer_len[vq->size];
1357 	uint16_t buffer_buf_id[vq->size];
1358 	uint16_t buffer_desc_count[vq->size];
1359 
1360 	if (rxvq_is_mergeable(dev))
1361 		max_tries = vq->size - 1;
1362 	else
1363 		max_tries = 1;
1364 
1365 	while (size > 0) {
1366 		/*
1367 		 * if we tried all available ring items, and still
1368 		 * can't get enough buf, it means something abnormal
1369 		 * happened.
1370 		 */
1371 		if (unlikely(++tries > max_tries))
1372 			return -1;
1373 
1374 		if (unlikely(fill_vec_buf_packed(dev, vq,
1375 						avail_idx, &desc_count,
1376 						buf_vec, &nr_vec,
1377 						&buf_id, &len,
1378 						VHOST_ACCESS_RW) < 0))
1379 			return -1;
1380 
1381 		len = RTE_MIN(len, size);
1382 		size -= len;
1383 
1384 		buffer_len[num_buffers] = len;
1385 		buffer_buf_id[num_buffers] = buf_id;
1386 		buffer_desc_count[num_buffers] = desc_count;
1387 		num_buffers += 1;
1388 
1389 		*nr_descs += desc_count;
1390 		avail_idx += desc_count;
1391 		if (avail_idx >= vq->size)
1392 			avail_idx -= vq->size;
1393 	}
1394 
1395 	if (mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, num_buffers, false) < 0)
1396 		return -1;
1397 
1398 	vhost_shadow_enqueue_single_packed(dev, vq, buffer_len, buffer_buf_id,
1399 					   buffer_desc_count, num_buffers);
1400 
1401 	return 0;
1402 }
1403 
1404 static __rte_noinline uint32_t
1405 virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1406 	struct rte_mbuf **pkts, uint32_t count)
1407 	__rte_shared_locks_required(&vq->access_lock)
1408 	__rte_shared_locks_required(&vq->iotlb_lock)
1409 {
1410 	uint32_t pkt_idx = 0;
1411 	uint16_t num_buffers;
1412 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1413 	uint16_t avail_head;
1414 
1415 	/*
1416 	 * The ordering between avail index and
1417 	 * desc reads needs to be enforced.
1418 	 */
1419 	avail_head = rte_atomic_load_explicit((unsigned short __rte_atomic *)&vq->avail->idx,
1420 		rte_memory_order_acquire);
1421 
1422 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1423 
1424 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1425 		uint64_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1426 		uint16_t nr_vec = 0;
1427 
1428 		if (unlikely(reserve_avail_buf_split(dev, vq,
1429 						pkt_len, buf_vec, &num_buffers,
1430 						avail_head, &nr_vec) < 0)) {
1431 			VHOST_DATA_LOG(dev->ifname, DEBUG,
1432 				"failed to get enough desc from vring");
1433 			vq->shadow_used_idx -= num_buffers;
1434 			break;
1435 		}
1436 
1437 		VHOST_DATA_LOG(dev->ifname, DEBUG,
1438 			"current index %d | end index %d",
1439 			vq->last_avail_idx, vq->last_avail_idx + num_buffers);
1440 
1441 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec,
1442 					num_buffers, false) < 0) {
1443 			vq->shadow_used_idx -= num_buffers;
1444 			break;
1445 		}
1446 
1447 		vq->last_avail_idx += num_buffers;
1448 	}
1449 
1450 	do_data_copy_enqueue(dev, vq);
1451 
1452 	if (likely(vq->shadow_used_idx)) {
1453 		flush_shadow_used_ring_split(dev, vq);
1454 		vhost_vring_call_split(dev, vq);
1455 	}
1456 
1457 	return pkt_idx;
1458 }
1459 
1460 static __rte_always_inline int
1461 virtio_dev_rx_sync_batch_check(struct virtio_net *dev,
1462 			   struct vhost_virtqueue *vq,
1463 			   struct rte_mbuf **pkts,
1464 			   uint64_t *desc_addrs,
1465 			   uint64_t *lens)
1466 	__rte_shared_locks_required(&vq->iotlb_lock)
1467 {
1468 	bool wrap_counter = vq->avail_wrap_counter;
1469 	struct vring_packed_desc *descs = vq->desc_packed;
1470 	uint16_t avail_idx = vq->last_avail_idx;
1471 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1472 	uint16_t i;
1473 
1474 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1475 		return -1;
1476 
1477 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1478 		return -1;
1479 
1480 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1481 		if (unlikely(pkts[i]->next != NULL))
1482 			return -1;
1483 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1484 					    wrap_counter)))
1485 			return -1;
1486 	}
1487 
1488 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1489 		lens[i] = descs[avail_idx + i].len;
1490 
1491 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1492 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1493 			return -1;
1494 	}
1495 
1496 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1497 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
1498 						  descs[avail_idx + i].addr,
1499 						  &lens[i],
1500 						  VHOST_ACCESS_RW);
1501 
1502 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1503 		if (unlikely(!desc_addrs[i]))
1504 			return -1;
1505 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1506 			return -1;
1507 	}
1508 
1509 	return 0;
1510 }
1511 
1512 static __rte_always_inline int
1513 virtio_dev_rx_async_batch_check(struct vhost_virtqueue *vq,
1514 			   struct rte_mbuf **pkts,
1515 			   uint64_t *desc_addrs,
1516 			   uint64_t *lens,
1517 			   int16_t dma_id,
1518 			   uint16_t vchan_id)
1519 {
1520 	bool wrap_counter = vq->avail_wrap_counter;
1521 	struct vring_packed_desc *descs = vq->desc_packed;
1522 	uint16_t avail_idx = vq->last_avail_idx;
1523 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1524 	uint16_t i;
1525 
1526 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1527 		return -1;
1528 
1529 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1530 		return -1;
1531 
1532 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1533 		if (unlikely(pkts[i]->next != NULL))
1534 			return -1;
1535 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1536 					    wrap_counter)))
1537 			return -1;
1538 	}
1539 
1540 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1541 		lens[i] = descs[avail_idx + i].len;
1542 
1543 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1544 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1545 			return -1;
1546 	}
1547 
1548 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1549 		desc_addrs[i] =  descs[avail_idx + i].addr;
1550 
1551 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1552 		if (unlikely(!desc_addrs[i]))
1553 			return -1;
1554 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1555 			return -1;
1556 	}
1557 
1558 	if (rte_dma_burst_capacity(dma_id, vchan_id) < PACKED_BATCH_SIZE)
1559 		return -1;
1560 
1561 	return 0;
1562 }
1563 
1564 static __rte_always_inline void
1565 virtio_dev_rx_batch_packed_copy(struct virtio_net *dev,
1566 			   struct vhost_virtqueue *vq,
1567 			   struct rte_mbuf **pkts,
1568 			   uint64_t *desc_addrs,
1569 			   uint64_t *lens)
1570 	__rte_shared_locks_required(&vq->iotlb_lock)
1571 {
1572 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1573 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
1574 	struct vring_packed_desc *descs = vq->desc_packed;
1575 	uint16_t avail_idx = vq->last_avail_idx;
1576 	uint16_t ids[PACKED_BATCH_SIZE];
1577 	uint16_t i;
1578 
1579 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1580 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
1581 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
1582 					(uintptr_t)desc_addrs[i];
1583 		lens[i] = pkts[i]->pkt_len +
1584 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
1585 	}
1586 
1587 	if (rxvq_is_mergeable(dev)) {
1588 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1589 			ASSIGN_UNLESS_EQUAL(hdrs[i]->num_buffers, 1);
1590 		}
1591 	}
1592 
1593 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1594 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
1595 
1596 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
1597 
1598 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1599 		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
1600 			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
1601 			   pkts[i]->pkt_len);
1602 	}
1603 
1604 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1605 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr,
1606 					   lens[i]);
1607 
1608 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1609 		ids[i] = descs[avail_idx + i].id;
1610 
1611 	vhost_flush_enqueue_batch_packed(dev, vq, lens, ids);
1612 }
1613 
1614 static __rte_always_inline int
1615 virtio_dev_rx_sync_batch_packed(struct virtio_net *dev,
1616 			   struct vhost_virtqueue *vq,
1617 			   struct rte_mbuf **pkts)
1618 	__rte_shared_locks_required(&vq->iotlb_lock)
1619 {
1620 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
1621 	uint64_t lens[PACKED_BATCH_SIZE];
1622 
1623 	if (virtio_dev_rx_sync_batch_check(dev, vq, pkts, desc_addrs, lens) == -1)
1624 		return -1;
1625 
1626 	if (vq->shadow_used_idx) {
1627 		do_data_copy_enqueue(dev, vq);
1628 		vhost_flush_enqueue_shadow_packed(dev, vq);
1629 	}
1630 
1631 	virtio_dev_rx_batch_packed_copy(dev, vq, pkts, desc_addrs, lens);
1632 
1633 	return 0;
1634 }
1635 
1636 static __rte_always_inline int16_t
1637 virtio_dev_rx_single_packed(struct virtio_net *dev,
1638 			    struct vhost_virtqueue *vq,
1639 			    struct rte_mbuf *pkt)
1640 	__rte_shared_locks_required(&vq->access_lock)
1641 	__rte_shared_locks_required(&vq->iotlb_lock)
1642 {
1643 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1644 	uint16_t nr_descs = 0;
1645 
1646 	if (unlikely(vhost_enqueue_single_packed(dev, vq, pkt, buf_vec,
1647 						 &nr_descs) < 0)) {
1648 		VHOST_DATA_LOG(dev->ifname, DEBUG, "failed to get enough desc from vring");
1649 		return -1;
1650 	}
1651 
1652 	VHOST_DATA_LOG(dev->ifname, DEBUG,
1653 		"current index %d | end index %d",
1654 		vq->last_avail_idx, vq->last_avail_idx + nr_descs);
1655 
1656 	vq_inc_last_avail_packed(vq, nr_descs);
1657 
1658 	return 0;
1659 }
1660 
1661 static __rte_noinline uint32_t
1662 virtio_dev_rx_packed(struct virtio_net *dev,
1663 		     struct vhost_virtqueue *__rte_restrict vq,
1664 		     struct rte_mbuf **__rte_restrict pkts,
1665 		     uint32_t count)
1666 	__rte_shared_locks_required(&vq->access_lock)
1667 	__rte_shared_locks_required(&vq->iotlb_lock)
1668 {
1669 	uint32_t pkt_idx = 0;
1670 
1671 	do {
1672 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1673 
1674 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
1675 			if (!virtio_dev_rx_sync_batch_packed(dev, vq,
1676 							&pkts[pkt_idx])) {
1677 				pkt_idx += PACKED_BATCH_SIZE;
1678 				continue;
1679 			}
1680 		}
1681 
1682 		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]))
1683 			break;
1684 		pkt_idx++;
1685 
1686 	} while (pkt_idx < count);
1687 
1688 	if (vq->shadow_used_idx) {
1689 		do_data_copy_enqueue(dev, vq);
1690 		vhost_flush_enqueue_shadow_packed(dev, vq);
1691 	}
1692 
1693 	if (pkt_idx)
1694 		vhost_vring_call_packed(dev, vq);
1695 
1696 	return pkt_idx;
1697 }
1698 
1699 static void
1700 virtio_dev_vring_translate(struct virtio_net *dev, struct vhost_virtqueue *vq)
1701 {
1702 	rte_rwlock_write_lock(&vq->access_lock);
1703 	vhost_user_iotlb_rd_lock(vq);
1704 	if (!vq->access_ok)
1705 		vring_translate(dev, vq);
1706 	vhost_user_iotlb_rd_unlock(vq);
1707 	rte_rwlock_write_unlock(&vq->access_lock);
1708 }
1709 
1710 static __rte_always_inline uint32_t
1711 virtio_dev_rx(struct virtio_net *dev, struct vhost_virtqueue *vq,
1712 	struct rte_mbuf **pkts, uint32_t count)
1713 {
1714 	uint32_t nb_tx = 0;
1715 
1716 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
1717 	rte_rwlock_read_lock(&vq->access_lock);
1718 
1719 	if (unlikely(!vq->enabled))
1720 		goto out_access_unlock;
1721 
1722 	vhost_user_iotlb_rd_lock(vq);
1723 
1724 	if (unlikely(!vq->access_ok)) {
1725 		vhost_user_iotlb_rd_unlock(vq);
1726 		rte_rwlock_read_unlock(&vq->access_lock);
1727 
1728 		virtio_dev_vring_translate(dev, vq);
1729 		goto out_no_unlock;
1730 	}
1731 
1732 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1733 	if (count == 0)
1734 		goto out;
1735 
1736 	if (vq_is_packed(dev))
1737 		nb_tx = virtio_dev_rx_packed(dev, vq, pkts, count);
1738 	else
1739 		nb_tx = virtio_dev_rx_split(dev, vq, pkts, count);
1740 
1741 	vhost_queue_stats_update(dev, vq, pkts, nb_tx);
1742 
1743 out:
1744 	vhost_user_iotlb_rd_unlock(vq);
1745 
1746 out_access_unlock:
1747 	rte_rwlock_read_unlock(&vq->access_lock);
1748 
1749 out_no_unlock:
1750 	return nb_tx;
1751 }
1752 
1753 uint16_t
1754 rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
1755 	struct rte_mbuf **__rte_restrict pkts, uint16_t count)
1756 {
1757 	struct virtio_net *dev = get_device(vid);
1758 
1759 	if (!dev)
1760 		return 0;
1761 
1762 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
1763 		VHOST_DATA_LOG(dev->ifname, ERR,
1764 			"%s: built-in vhost net backend is disabled.",
1765 			__func__);
1766 		return 0;
1767 	}
1768 
1769 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1770 		VHOST_DATA_LOG(dev->ifname, ERR,
1771 			"%s: invalid virtqueue idx %d.",
1772 			__func__, queue_id);
1773 		return 0;
1774 	}
1775 
1776 	return virtio_dev_rx(dev, dev->virtqueue[queue_id], pkts, count);
1777 }
1778 
1779 static __rte_always_inline uint16_t
1780 async_get_first_inflight_pkt_idx(struct vhost_virtqueue *vq)
1781 	__rte_shared_locks_required(&vq->access_lock)
1782 {
1783 	struct vhost_async *async = vq->async;
1784 
1785 	if (async->pkts_idx >= async->pkts_inflight_n)
1786 		return async->pkts_idx - async->pkts_inflight_n;
1787 	else
1788 		return vq->size - async->pkts_inflight_n + async->pkts_idx;
1789 }
1790 
1791 static __rte_always_inline void
1792 store_dma_desc_info_split(struct vring_used_elem *s_ring, struct vring_used_elem *d_ring,
1793 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1794 {
1795 	size_t elem_size = sizeof(struct vring_used_elem);
1796 
1797 	if (d_idx + count <= ring_size) {
1798 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1799 	} else {
1800 		uint16_t size = ring_size - d_idx;
1801 
1802 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1803 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1804 	}
1805 }
1806 
1807 static __rte_noinline uint32_t
1808 virtio_dev_rx_async_submit_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1809 	struct rte_mbuf **pkts, uint32_t count, int16_t dma_id, uint16_t vchan_id)
1810 	__rte_exclusive_locks_required(&vq->access_lock)
1811 	__rte_shared_locks_required(&vq->iotlb_lock)
1812 {
1813 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1814 	uint32_t pkt_idx = 0;
1815 	uint16_t num_buffers;
1816 	uint16_t avail_head;
1817 
1818 	struct vhost_async *async = vq->async;
1819 	struct async_inflight_info *pkts_info = async->pkts_info;
1820 	uint32_t pkt_err = 0;
1821 	uint16_t n_xfer;
1822 	uint16_t slot_idx = 0;
1823 
1824 	/*
1825 	 * The ordering between avail index and desc reads need to be enforced.
1826 	 */
1827 	avail_head = rte_atomic_load_explicit((unsigned short __rte_atomic *)&vq->avail->idx,
1828 		rte_memory_order_acquire);
1829 
1830 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1831 
1832 	async_iter_reset(async);
1833 
1834 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1835 		uint64_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1836 		uint16_t nr_vec = 0;
1837 
1838 		if (unlikely(reserve_avail_buf_split(dev, vq, pkt_len, buf_vec,
1839 						&num_buffers, avail_head, &nr_vec) < 0)) {
1840 			VHOST_DATA_LOG(dev->ifname, DEBUG,
1841 				"failed to get enough desc from vring");
1842 			vq->shadow_used_idx -= num_buffers;
1843 			break;
1844 		}
1845 
1846 		VHOST_DATA_LOG(dev->ifname, DEBUG,
1847 			"current index %d | end index %d",
1848 			vq->last_avail_idx, vq->last_avail_idx + num_buffers);
1849 
1850 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec, num_buffers, true) < 0) {
1851 			vq->shadow_used_idx -= num_buffers;
1852 			break;
1853 		}
1854 
1855 		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
1856 		pkts_info[slot_idx].descs = num_buffers;
1857 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1858 
1859 		vq->last_avail_idx += num_buffers;
1860 	}
1861 
1862 	if (unlikely(pkt_idx == 0))
1863 		return 0;
1864 
1865 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
1866 			async->iov_iter, pkt_idx);
1867 
1868 	pkt_err = pkt_idx - n_xfer;
1869 	if (unlikely(pkt_err)) {
1870 		uint16_t num_descs = 0;
1871 
1872 		VHOST_DATA_LOG(dev->ifname, DEBUG,
1873 			"%s: failed to transfer %u packets for queue %u.",
1874 			__func__, pkt_err, vq->index);
1875 
1876 		/* update number of completed packets */
1877 		pkt_idx = n_xfer;
1878 
1879 		/* calculate the sum of descriptors to revert */
1880 		while (pkt_err-- > 0) {
1881 			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
1882 			slot_idx--;
1883 		}
1884 
1885 		/* recover shadow used ring and available ring */
1886 		vq->shadow_used_idx -= num_descs;
1887 		vq->last_avail_idx -= num_descs;
1888 	}
1889 
1890 	/* keep used descriptors */
1891 	if (likely(vq->shadow_used_idx)) {
1892 		uint16_t to = async->desc_idx_split & (vq->size - 1);
1893 
1894 		store_dma_desc_info_split(vq->shadow_used_split,
1895 				async->descs_split, vq->size, 0, to,
1896 				vq->shadow_used_idx);
1897 
1898 		async->desc_idx_split += vq->shadow_used_idx;
1899 
1900 		async->pkts_idx += pkt_idx;
1901 		if (async->pkts_idx >= vq->size)
1902 			async->pkts_idx -= vq->size;
1903 
1904 		async->pkts_inflight_n += pkt_idx;
1905 		vq->shadow_used_idx = 0;
1906 	}
1907 
1908 	return pkt_idx;
1909 }
1910 
1911 
1912 static __rte_always_inline int
1913 vhost_enqueue_async_packed(struct virtio_net *dev,
1914 			    struct vhost_virtqueue *vq,
1915 			    struct rte_mbuf *pkt,
1916 			    struct buf_vector *buf_vec,
1917 			    uint16_t *nr_descs,
1918 			    uint16_t *nr_buffers)
1919 	__rte_exclusive_locks_required(&vq->access_lock)
1920 	__rte_shared_locks_required(&vq->iotlb_lock)
1921 {
1922 	uint16_t nr_vec = 0;
1923 	uint16_t avail_idx = vq->last_avail_idx;
1924 	uint16_t max_tries, tries = 0;
1925 	uint16_t buf_id = 0;
1926 	uint32_t len = 0;
1927 	uint16_t desc_count = 0;
1928 	uint64_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1929 	uint32_t buffer_len[vq->size];
1930 	uint16_t buffer_buf_id[vq->size];
1931 	uint16_t buffer_desc_count[vq->size];
1932 
1933 	if (rxvq_is_mergeable(dev))
1934 		max_tries = vq->size - 1;
1935 	else
1936 		max_tries = 1;
1937 
1938 	while (size > 0) {
1939 		/*
1940 		 * if we tried all available ring items, and still
1941 		 * can't get enough buf, it means something abnormal
1942 		 * happened.
1943 		 */
1944 		if (unlikely(++tries > max_tries))
1945 			return -1;
1946 
1947 		if (unlikely(fill_vec_buf_packed(dev, vq,
1948 						avail_idx, &desc_count,
1949 						buf_vec, &nr_vec,
1950 						&buf_id, &len,
1951 						VHOST_ACCESS_RW) < 0))
1952 			return -1;
1953 
1954 		len = RTE_MIN(len, size);
1955 		size -= len;
1956 
1957 		buffer_len[*nr_buffers] = len;
1958 		buffer_buf_id[*nr_buffers] = buf_id;
1959 		buffer_desc_count[*nr_buffers] = desc_count;
1960 		*nr_buffers += 1;
1961 		*nr_descs += desc_count;
1962 		avail_idx += desc_count;
1963 		if (avail_idx >= vq->size)
1964 			avail_idx -= vq->size;
1965 	}
1966 
1967 	if (unlikely(mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers, true) < 0))
1968 		return -1;
1969 
1970 	vhost_async_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id,
1971 					buffer_desc_count, *nr_buffers);
1972 
1973 	return 0;
1974 }
1975 
1976 static __rte_always_inline int16_t
1977 virtio_dev_rx_async_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
1978 			    struct rte_mbuf *pkt, uint16_t *nr_descs, uint16_t *nr_buffers)
1979 	__rte_exclusive_locks_required(&vq->access_lock)
1980 	__rte_shared_locks_required(&vq->iotlb_lock)
1981 {
1982 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1983 
1984 	if (unlikely(vhost_enqueue_async_packed(dev, vq, pkt, buf_vec,
1985 					nr_descs, nr_buffers) < 0)) {
1986 		VHOST_DATA_LOG(dev->ifname, DEBUG, "failed to get enough desc from vring");
1987 		return -1;
1988 	}
1989 
1990 	VHOST_DATA_LOG(dev->ifname, DEBUG,
1991 		"current index %d | end index %d",
1992 		vq->last_avail_idx, vq->last_avail_idx + *nr_descs);
1993 
1994 	return 0;
1995 }
1996 
1997 static __rte_always_inline void
1998 virtio_dev_rx_async_packed_batch_enqueue(struct virtio_net *dev,
1999 			   struct vhost_virtqueue *vq,
2000 			   struct rte_mbuf **pkts,
2001 			   uint64_t *desc_addrs,
2002 			   uint64_t *lens)
2003 	__rte_exclusive_locks_required(&vq->access_lock)
2004 	__rte_shared_locks_required(&vq->iotlb_lock)
2005 {
2006 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
2007 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
2008 	struct vring_packed_desc *descs = vq->desc_packed;
2009 	struct vhost_async *async = vq->async;
2010 	uint16_t avail_idx = vq->last_avail_idx;
2011 	uint32_t mbuf_offset = 0;
2012 	uint16_t ids[PACKED_BATCH_SIZE];
2013 	uint64_t mapped_len[PACKED_BATCH_SIZE];
2014 	void *host_iova[PACKED_BATCH_SIZE];
2015 	uintptr_t desc;
2016 	uint16_t i;
2017 
2018 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2019 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
2020 		desc = vhost_iova_to_vva(dev, vq, desc_addrs[i], &lens[i], VHOST_ACCESS_RW);
2021 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc;
2022 		lens[i] = pkts[i]->pkt_len +
2023 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
2024 	}
2025 
2026 	if (rxvq_is_mergeable(dev)) {
2027 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2028 			ASSIGN_UNLESS_EQUAL(hdrs[i]->num_buffers, 1);
2029 		}
2030 	}
2031 
2032 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2033 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
2034 
2035 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
2036 
2037 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2038 		host_iova[i] = (void *)(uintptr_t)gpa_to_first_hpa(dev,
2039 			desc_addrs[i] + buf_offset, lens[i], &mapped_len[i]);
2040 	}
2041 
2042 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2043 		async_iter_initialize(dev, async);
2044 		async_iter_add_iovec(dev, async,
2045 				(void *)(uintptr_t)rte_pktmbuf_iova_offset(pkts[i], mbuf_offset),
2046 				host_iova[i],
2047 				mapped_len[i]);
2048 		async->iter_idx++;
2049 	}
2050 
2051 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2052 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr, lens[i]);
2053 
2054 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2055 		ids[i] = descs[avail_idx + i].id;
2056 
2057 	vhost_async_shadow_enqueue_packed_batch(vq, lens, ids);
2058 }
2059 
2060 static __rte_always_inline int
2061 virtio_dev_rx_async_packed_batch(struct virtio_net *dev,
2062 			   struct vhost_virtqueue *vq,
2063 			   struct rte_mbuf **pkts,
2064 			   int16_t dma_id, uint16_t vchan_id)
2065 	__rte_exclusive_locks_required(&vq->access_lock)
2066 	__rte_shared_locks_required(&vq->iotlb_lock)
2067 {
2068 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
2069 	uint64_t lens[PACKED_BATCH_SIZE];
2070 
2071 	if (virtio_dev_rx_async_batch_check(vq, pkts, desc_addrs, lens, dma_id, vchan_id) == -1)
2072 		return -1;
2073 
2074 	virtio_dev_rx_async_packed_batch_enqueue(dev, vq, pkts, desc_addrs, lens);
2075 
2076 	return 0;
2077 }
2078 
2079 static __rte_always_inline void
2080 dma_error_handler_packed(struct vhost_virtqueue *vq, uint16_t slot_idx,
2081 			uint32_t nr_err, uint32_t *pkt_idx)
2082 	__rte_exclusive_locks_required(&vq->access_lock)
2083 {
2084 	uint16_t descs_err = 0;
2085 	uint16_t buffers_err = 0;
2086 	struct vhost_async *async = vq->async;
2087 	struct async_inflight_info *pkts_info = vq->async->pkts_info;
2088 
2089 	*pkt_idx -= nr_err;
2090 	/* calculate the sum of buffers and descs of DMA-error packets. */
2091 	while (nr_err-- > 0) {
2092 		descs_err += pkts_info[slot_idx % vq->size].descs;
2093 		buffers_err += pkts_info[slot_idx % vq->size].nr_buffers;
2094 		slot_idx--;
2095 	}
2096 
2097 	if (vq->last_avail_idx >= descs_err) {
2098 		vq->last_avail_idx -= descs_err;
2099 	} else {
2100 		vq->last_avail_idx = vq->last_avail_idx + vq->size - descs_err;
2101 		vq->avail_wrap_counter ^= 1;
2102 	}
2103 
2104 	if (async->buffer_idx_packed >= buffers_err)
2105 		async->buffer_idx_packed -= buffers_err;
2106 	else
2107 		async->buffer_idx_packed = async->buffer_idx_packed + vq->size - buffers_err;
2108 }
2109 
2110 static __rte_noinline uint32_t
2111 virtio_dev_rx_async_submit_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
2112 	struct rte_mbuf **pkts, uint32_t count, int16_t dma_id, uint16_t vchan_id)
2113 	__rte_exclusive_locks_required(&vq->access_lock)
2114 	__rte_shared_locks_required(&vq->iotlb_lock)
2115 {
2116 	uint32_t pkt_idx = 0;
2117 	uint16_t n_xfer;
2118 	uint16_t num_buffers;
2119 	uint16_t num_descs;
2120 
2121 	struct vhost_async *async = vq->async;
2122 	struct async_inflight_info *pkts_info = async->pkts_info;
2123 	uint32_t pkt_err = 0;
2124 	uint16_t slot_idx = 0;
2125 	uint16_t i;
2126 
2127 	do {
2128 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
2129 
2130 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
2131 			if (!virtio_dev_rx_async_packed_batch(dev, vq, &pkts[pkt_idx],
2132 					dma_id, vchan_id)) {
2133 				for (i = 0; i < PACKED_BATCH_SIZE; i++) {
2134 					slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
2135 					pkts_info[slot_idx].descs = 1;
2136 					pkts_info[slot_idx].nr_buffers = 1;
2137 					pkts_info[slot_idx].mbuf = pkts[pkt_idx];
2138 					pkt_idx++;
2139 				}
2140 				continue;
2141 			}
2142 		}
2143 
2144 		num_buffers = 0;
2145 		num_descs = 0;
2146 		if (unlikely(virtio_dev_rx_async_packed(dev, vq, pkts[pkt_idx],
2147 						&num_descs, &num_buffers) < 0))
2148 			break;
2149 
2150 		slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
2151 
2152 		pkts_info[slot_idx].descs = num_descs;
2153 		pkts_info[slot_idx].nr_buffers = num_buffers;
2154 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
2155 
2156 		pkt_idx++;
2157 		vq_inc_last_avail_packed(vq, num_descs);
2158 	} while (pkt_idx < count);
2159 
2160 	if (unlikely(pkt_idx == 0))
2161 		return 0;
2162 
2163 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
2164 			async->iov_iter, pkt_idx);
2165 
2166 	async_iter_reset(async);
2167 
2168 	pkt_err = pkt_idx - n_xfer;
2169 	if (unlikely(pkt_err)) {
2170 		VHOST_DATA_LOG(dev->ifname, DEBUG,
2171 			"%s: failed to transfer %u packets for queue %u.",
2172 			__func__, pkt_err, vq->index);
2173 		dma_error_handler_packed(vq, slot_idx, pkt_err, &pkt_idx);
2174 	}
2175 
2176 	async->pkts_idx += pkt_idx;
2177 	if (async->pkts_idx >= vq->size)
2178 		async->pkts_idx -= vq->size;
2179 
2180 	async->pkts_inflight_n += pkt_idx;
2181 
2182 	return pkt_idx;
2183 }
2184 
2185 static __rte_always_inline void
2186 write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
2187 	__rte_shared_locks_required(&vq->access_lock)
2188 {
2189 	struct vhost_async *async = vq->async;
2190 	uint16_t nr_left = n_descs;
2191 	uint16_t nr_copy;
2192 	uint16_t to, from;
2193 
2194 	do {
2195 		from = async->last_desc_idx_split & (vq->size - 1);
2196 		nr_copy = nr_left + from <= vq->size ? nr_left : vq->size - from;
2197 		to = vq->last_used_idx & (vq->size - 1);
2198 
2199 		if (to + nr_copy <= vq->size) {
2200 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
2201 					nr_copy * sizeof(struct vring_used_elem));
2202 		} else {
2203 			uint16_t size = vq->size - to;
2204 
2205 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
2206 					size * sizeof(struct vring_used_elem));
2207 			rte_memcpy(&vq->used->ring[0], &async->descs_split[from + size],
2208 					(nr_copy - size) * sizeof(struct vring_used_elem));
2209 		}
2210 
2211 		async->last_desc_idx_split += nr_copy;
2212 		vq->last_used_idx += nr_copy;
2213 		nr_left -= nr_copy;
2214 	} while (nr_left > 0);
2215 }
2216 
2217 static __rte_always_inline void
2218 write_back_completed_descs_packed(struct vhost_virtqueue *vq,
2219 				uint16_t n_buffers)
2220 	__rte_shared_locks_required(&vq->access_lock)
2221 {
2222 	struct vhost_async *async = vq->async;
2223 	uint16_t from = async->last_buffer_idx_packed;
2224 	uint16_t used_idx = vq->last_used_idx;
2225 	uint16_t head_idx = vq->last_used_idx;
2226 	uint16_t head_flags = 0;
2227 	uint16_t i;
2228 
2229 	/* Split loop in two to save memory barriers */
2230 	for (i = 0; i < n_buffers; i++) {
2231 		vq->desc_packed[used_idx].id = async->buffers_packed[from].id;
2232 		vq->desc_packed[used_idx].len = async->buffers_packed[from].len;
2233 
2234 		used_idx += async->buffers_packed[from].count;
2235 		if (used_idx >= vq->size)
2236 			used_idx -= vq->size;
2237 
2238 		from++;
2239 		if (from >= vq->size)
2240 			from = 0;
2241 	}
2242 
2243 	/* The ordering for storing desc flags needs to be enforced. */
2244 	rte_atomic_thread_fence(rte_memory_order_release);
2245 
2246 	from = async->last_buffer_idx_packed;
2247 
2248 	for (i = 0; i < n_buffers; i++) {
2249 		uint16_t flags;
2250 
2251 		if (async->buffers_packed[from].len)
2252 			flags = VRING_DESC_F_WRITE;
2253 		else
2254 			flags = 0;
2255 
2256 		if (vq->used_wrap_counter) {
2257 			flags |= VRING_DESC_F_USED;
2258 			flags |= VRING_DESC_F_AVAIL;
2259 		} else {
2260 			flags &= ~VRING_DESC_F_USED;
2261 			flags &= ~VRING_DESC_F_AVAIL;
2262 		}
2263 
2264 		if (i > 0) {
2265 			vq->desc_packed[vq->last_used_idx].flags = flags;
2266 		} else {
2267 			head_idx = vq->last_used_idx;
2268 			head_flags = flags;
2269 		}
2270 
2271 		vq_inc_last_used_packed(vq, async->buffers_packed[from].count);
2272 
2273 		from++;
2274 		if (from == vq->size)
2275 			from = 0;
2276 	}
2277 
2278 	vq->desc_packed[head_idx].flags = head_flags;
2279 	async->last_buffer_idx_packed = from;
2280 }
2281 
2282 static __rte_always_inline uint16_t
2283 vhost_poll_enqueue_completed(struct virtio_net *dev, struct vhost_virtqueue *vq,
2284 	struct rte_mbuf **pkts, uint16_t count, int16_t dma_id, uint16_t vchan_id)
2285 	__rte_shared_locks_required(&vq->access_lock)
2286 {
2287 	struct vhost_async *async = vq->async;
2288 	struct async_inflight_info *pkts_info = async->pkts_info;
2289 	uint16_t nr_cpl_pkts = 0;
2290 	uint16_t n_descs = 0, n_buffers = 0;
2291 	uint16_t start_idx, from, i;
2292 
2293 	/* Check completed copies for the given DMA vChannel */
2294 	vhost_async_dma_check_completed(dev, dma_id, vchan_id, VHOST_DMA_MAX_COPY_COMPLETE);
2295 
2296 	start_idx = async_get_first_inflight_pkt_idx(vq);
2297 	/**
2298 	 * Calculate the number of copy completed packets.
2299 	 * Note that there may be completed packets even if
2300 	 * no copies are reported done by the given DMA vChannel,
2301 	 * as it's possible that a virtqueue uses multiple DMA
2302 	 * vChannels.
2303 	 */
2304 	from = start_idx;
2305 	while (vq->async->pkts_cmpl_flag[from] && count--) {
2306 		vq->async->pkts_cmpl_flag[from] = false;
2307 		from++;
2308 		if (from >= vq->size)
2309 			from -= vq->size;
2310 		nr_cpl_pkts++;
2311 	}
2312 
2313 	if (nr_cpl_pkts == 0)
2314 		return 0;
2315 
2316 	for (i = 0; i < nr_cpl_pkts; i++) {
2317 		from = (start_idx + i) % vq->size;
2318 		/* Only used with packed ring */
2319 		n_buffers += pkts_info[from].nr_buffers;
2320 		/* Only used with split ring */
2321 		n_descs += pkts_info[from].descs;
2322 		pkts[i] = pkts_info[from].mbuf;
2323 	}
2324 
2325 	async->pkts_inflight_n -= nr_cpl_pkts;
2326 
2327 	if (likely(vq->enabled && vq->access_ok)) {
2328 		if (vq_is_packed(dev)) {
2329 			write_back_completed_descs_packed(vq, n_buffers);
2330 			vhost_vring_call_packed(dev, vq);
2331 		} else {
2332 			write_back_completed_descs_split(vq, n_descs);
2333 			rte_atomic_fetch_add_explicit(
2334 				(unsigned short __rte_atomic *)&vq->used->idx,
2335 				n_descs, rte_memory_order_release);
2336 			vhost_vring_call_split(dev, vq);
2337 		}
2338 	} else {
2339 		if (vq_is_packed(dev)) {
2340 			async->last_buffer_idx_packed += n_buffers;
2341 			if (async->last_buffer_idx_packed >= vq->size)
2342 				async->last_buffer_idx_packed -= vq->size;
2343 		} else {
2344 			async->last_desc_idx_split += n_descs;
2345 		}
2346 	}
2347 
2348 	return nr_cpl_pkts;
2349 }
2350 
2351 uint16_t
2352 rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
2353 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2354 		uint16_t vchan_id)
2355 {
2356 	struct virtio_net *dev = get_device(vid);
2357 	struct vhost_virtqueue *vq;
2358 	uint16_t n_pkts_cpl = 0;
2359 
2360 	if (unlikely(!dev))
2361 		return 0;
2362 
2363 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
2364 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2365 		VHOST_DATA_LOG(dev->ifname, ERR,
2366 			"%s: invalid virtqueue idx %d.",
2367 			__func__, queue_id);
2368 		return 0;
2369 	}
2370 
2371 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2372 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2373 		VHOST_DATA_LOG(dev->ifname, ERR,
2374 			"%s: invalid channel %d:%u.",
2375 			__func__, dma_id, vchan_id);
2376 		return 0;
2377 	}
2378 
2379 	vq = dev->virtqueue[queue_id];
2380 
2381 	if (rte_rwlock_read_trylock(&vq->access_lock)) {
2382 		VHOST_DATA_LOG(dev->ifname, DEBUG,
2383 			"%s: virtqueue %u is busy.",
2384 			__func__, queue_id);
2385 		return 0;
2386 	}
2387 
2388 	if (unlikely(!vq->async)) {
2389 		VHOST_DATA_LOG(dev->ifname, ERR,
2390 			"%s: async not registered for virtqueue %d.",
2391 			__func__, queue_id);
2392 		goto out;
2393 	}
2394 
2395 	n_pkts_cpl = vhost_poll_enqueue_completed(dev, vq, pkts, count, dma_id, vchan_id);
2396 
2397 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2398 	vq->stats.inflight_completed += n_pkts_cpl;
2399 
2400 out:
2401 	rte_rwlock_read_unlock(&vq->access_lock);
2402 
2403 	return n_pkts_cpl;
2404 }
2405 
2406 uint16_t
2407 rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
2408 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2409 		uint16_t vchan_id)
2410 {
2411 	struct virtio_net *dev = get_device(vid);
2412 	struct vhost_virtqueue *vq;
2413 	uint16_t n_pkts_cpl = 0;
2414 
2415 	if (!dev)
2416 		return 0;
2417 
2418 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
2419 	if (unlikely(queue_id >= dev->nr_vring)) {
2420 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid virtqueue idx %d.",
2421 			__func__, queue_id);
2422 		return 0;
2423 	}
2424 
2425 	if (unlikely(dma_id < 0 || dma_id >= RTE_DMADEV_DEFAULT_MAX)) {
2426 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid dma id %d.",
2427 			__func__, dma_id);
2428 		return 0;
2429 	}
2430 
2431 	vq = dev->virtqueue[queue_id];
2432 
2433 	vq_assert_lock(dev, vq);
2434 
2435 	if (unlikely(!vq->async)) {
2436 		VHOST_DATA_LOG(dev->ifname, ERR,
2437 			"%s: async not registered for virtqueue %d.",
2438 			__func__, queue_id);
2439 		return 0;
2440 	}
2441 
2442 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2443 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2444 		VHOST_DATA_LOG(dev->ifname, ERR,
2445 			"%s: invalid channel %d:%u.",
2446 			__func__, dma_id, vchan_id);
2447 		return 0;
2448 	}
2449 
2450 	if ((queue_id & 1) == 0)
2451 		n_pkts_cpl = vhost_poll_enqueue_completed(dev, vq, pkts, count,
2452 			dma_id, vchan_id);
2453 	else
2454 		n_pkts_cpl = async_poll_dequeue_completed(dev, vq, pkts, count,
2455 			dma_id, vchan_id, dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS);
2456 
2457 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2458 	vq->stats.inflight_completed += n_pkts_cpl;
2459 
2460 	return n_pkts_cpl;
2461 }
2462 
2463 uint16_t
2464 rte_vhost_clear_queue(int vid, uint16_t queue_id, struct rte_mbuf **pkts,
2465 		uint16_t count, int16_t dma_id, uint16_t vchan_id)
2466 {
2467 	struct virtio_net *dev = get_device(vid);
2468 	struct vhost_virtqueue *vq;
2469 	uint16_t n_pkts_cpl = 0;
2470 
2471 	if (!dev)
2472 		return 0;
2473 
2474 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
2475 	if (unlikely(queue_id >= dev->nr_vring)) {
2476 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid virtqueue idx %u.",
2477 			__func__, queue_id);
2478 		return 0;
2479 	}
2480 
2481 	if (unlikely(dma_id < 0 || dma_id >= RTE_DMADEV_DEFAULT_MAX)) {
2482 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid dma id %d.",
2483 			__func__, dma_id);
2484 		return 0;
2485 	}
2486 
2487 	vq = dev->virtqueue[queue_id];
2488 
2489 	if (rte_rwlock_read_trylock(&vq->access_lock)) {
2490 		VHOST_DATA_LOG(dev->ifname, DEBUG, "%s: virtqueue %u is busy.",
2491 			__func__, queue_id);
2492 		return 0;
2493 	}
2494 
2495 	if (unlikely(!vq->async)) {
2496 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: async not registered for queue id %u.",
2497 			__func__, queue_id);
2498 		goto out_access_unlock;
2499 	}
2500 
2501 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2502 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2503 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid channel %d:%u.",
2504 			__func__, dma_id, vchan_id);
2505 		goto out_access_unlock;
2506 	}
2507 
2508 	if ((queue_id & 1) == 0)
2509 		n_pkts_cpl = vhost_poll_enqueue_completed(dev, vq, pkts, count,
2510 			dma_id, vchan_id);
2511 	else
2512 		n_pkts_cpl = async_poll_dequeue_completed(dev, vq, pkts, count,
2513 			dma_id, vchan_id, dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS);
2514 
2515 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2516 	vq->stats.inflight_completed += n_pkts_cpl;
2517 
2518 out_access_unlock:
2519 	rte_rwlock_read_unlock(&vq->access_lock);
2520 
2521 	return n_pkts_cpl;
2522 }
2523 
2524 static __rte_always_inline uint32_t
2525 virtio_dev_rx_async_submit(struct virtio_net *dev, struct vhost_virtqueue *vq,
2526 	struct rte_mbuf **pkts, uint32_t count, int16_t dma_id, uint16_t vchan_id)
2527 {
2528 	uint32_t nb_tx = 0;
2529 
2530 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
2531 
2532 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2533 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2534 		VHOST_DATA_LOG(dev->ifname, ERR,
2535 			"%s: invalid channel %d:%u.",
2536 			 __func__, dma_id, vchan_id);
2537 		return 0;
2538 	}
2539 
2540 	rte_rwlock_write_lock(&vq->access_lock);
2541 
2542 	if (unlikely(!vq->enabled || !vq->async))
2543 		goto out_access_unlock;
2544 
2545 	vhost_user_iotlb_rd_lock(vq);
2546 
2547 	if (unlikely(!vq->access_ok)) {
2548 		vhost_user_iotlb_rd_unlock(vq);
2549 		rte_rwlock_read_unlock(&vq->access_lock);
2550 
2551 		virtio_dev_vring_translate(dev, vq);
2552 		goto out_no_unlock;
2553 	}
2554 
2555 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
2556 	if (count == 0)
2557 		goto out;
2558 
2559 	if (vq_is_packed(dev))
2560 		nb_tx = virtio_dev_rx_async_submit_packed(dev, vq, pkts, count,
2561 			dma_id, vchan_id);
2562 	else
2563 		nb_tx = virtio_dev_rx_async_submit_split(dev, vq, pkts, count,
2564 			dma_id, vchan_id);
2565 
2566 	vq->stats.inflight_submitted += nb_tx;
2567 
2568 out:
2569 	vhost_user_iotlb_rd_unlock(vq);
2570 
2571 out_access_unlock:
2572 	rte_rwlock_write_unlock(&vq->access_lock);
2573 
2574 out_no_unlock:
2575 	return nb_tx;
2576 }
2577 
2578 uint16_t
2579 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
2580 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2581 		uint16_t vchan_id)
2582 {
2583 	struct virtio_net *dev = get_device(vid);
2584 
2585 	if (!dev)
2586 		return 0;
2587 
2588 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
2589 		VHOST_DATA_LOG(dev->ifname, ERR,
2590 			"%s: built-in vhost net backend is disabled.",
2591 			__func__);
2592 		return 0;
2593 	}
2594 
2595 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2596 		VHOST_DATA_LOG(dev->ifname, ERR,
2597 			"%s: invalid virtqueue idx %d.",
2598 			__func__, queue_id);
2599 		return 0;
2600 	}
2601 
2602 	return virtio_dev_rx_async_submit(dev, dev->virtqueue[queue_id], pkts, count,
2603 		dma_id, vchan_id);
2604 }
2605 
2606 static inline bool
2607 virtio_net_with_host_offload(struct virtio_net *dev)
2608 {
2609 	if (dev->features &
2610 			((1ULL << VIRTIO_NET_F_CSUM) |
2611 			 (1ULL << VIRTIO_NET_F_HOST_ECN) |
2612 			 (1ULL << VIRTIO_NET_F_HOST_TSO4) |
2613 			 (1ULL << VIRTIO_NET_F_HOST_TSO6) |
2614 			 (1ULL << VIRTIO_NET_F_HOST_UFO)))
2615 		return true;
2616 
2617 	return false;
2618 }
2619 
2620 static int
2621 parse_headers(struct rte_mbuf *m, uint8_t *l4_proto)
2622 {
2623 	struct rte_ipv4_hdr *ipv4_hdr;
2624 	struct rte_ipv6_hdr *ipv6_hdr;
2625 	struct rte_ether_hdr *eth_hdr;
2626 	uint16_t ethertype;
2627 	uint16_t data_len = rte_pktmbuf_data_len(m);
2628 
2629 	if (data_len < sizeof(struct rte_ether_hdr))
2630 		return -EINVAL;
2631 
2632 	eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
2633 
2634 	m->l2_len = sizeof(struct rte_ether_hdr);
2635 	ethertype = rte_be_to_cpu_16(eth_hdr->ether_type);
2636 
2637 	if (ethertype == RTE_ETHER_TYPE_VLAN) {
2638 		if (data_len < sizeof(struct rte_ether_hdr) +
2639 				sizeof(struct rte_vlan_hdr))
2640 			goto error;
2641 
2642 		struct rte_vlan_hdr *vlan_hdr =
2643 			(struct rte_vlan_hdr *)(eth_hdr + 1);
2644 
2645 		m->l2_len += sizeof(struct rte_vlan_hdr);
2646 		ethertype = rte_be_to_cpu_16(vlan_hdr->eth_proto);
2647 	}
2648 
2649 	switch (ethertype) {
2650 	case RTE_ETHER_TYPE_IPV4:
2651 		if (data_len < m->l2_len + sizeof(struct rte_ipv4_hdr))
2652 			goto error;
2653 		ipv4_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
2654 				m->l2_len);
2655 		m->l3_len = rte_ipv4_hdr_len(ipv4_hdr);
2656 		if (data_len < m->l2_len + m->l3_len)
2657 			goto error;
2658 		m->ol_flags |= RTE_MBUF_F_TX_IPV4;
2659 		*l4_proto = ipv4_hdr->next_proto_id;
2660 		break;
2661 	case RTE_ETHER_TYPE_IPV6:
2662 		if (data_len < m->l2_len + sizeof(struct rte_ipv6_hdr))
2663 			goto error;
2664 		ipv6_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv6_hdr *,
2665 				m->l2_len);
2666 		m->l3_len = sizeof(struct rte_ipv6_hdr);
2667 		m->ol_flags |= RTE_MBUF_F_TX_IPV6;
2668 		*l4_proto = ipv6_hdr->proto;
2669 		break;
2670 	default:
2671 		/* a valid L3 header is needed for further L4 parsing */
2672 		goto error;
2673 	}
2674 
2675 	/* both CSUM and GSO need a valid L4 header */
2676 	switch (*l4_proto) {
2677 	case IPPROTO_TCP:
2678 		if (data_len < m->l2_len + m->l3_len +
2679 				sizeof(struct rte_tcp_hdr))
2680 			goto error;
2681 		break;
2682 	case IPPROTO_UDP:
2683 		if (data_len < m->l2_len + m->l3_len +
2684 				sizeof(struct rte_udp_hdr))
2685 			goto error;
2686 		break;
2687 	case IPPROTO_SCTP:
2688 		if (data_len < m->l2_len + m->l3_len +
2689 				sizeof(struct rte_sctp_hdr))
2690 			goto error;
2691 		break;
2692 	default:
2693 		goto error;
2694 	}
2695 
2696 	return 0;
2697 
2698 error:
2699 	m->l2_len = 0;
2700 	m->l3_len = 0;
2701 	m->ol_flags = 0;
2702 	return -EINVAL;
2703 }
2704 
2705 static __rte_always_inline void
2706 vhost_dequeue_offload_legacy(struct virtio_net *dev, struct virtio_net_hdr *hdr,
2707 		struct rte_mbuf *m)
2708 {
2709 	uint8_t l4_proto = 0;
2710 	struct rte_tcp_hdr *tcp_hdr = NULL;
2711 	uint16_t tcp_len;
2712 	uint16_t data_len = rte_pktmbuf_data_len(m);
2713 
2714 	if (parse_headers(m, &l4_proto) < 0)
2715 		return;
2716 
2717 	if (hdr->flags == VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2718 		if (hdr->csum_start == (m->l2_len + m->l3_len)) {
2719 			switch (hdr->csum_offset) {
2720 			case (offsetof(struct rte_tcp_hdr, cksum)):
2721 				if (l4_proto != IPPROTO_TCP)
2722 					goto error;
2723 				m->ol_flags |= RTE_MBUF_F_TX_TCP_CKSUM;
2724 				break;
2725 			case (offsetof(struct rte_udp_hdr, dgram_cksum)):
2726 				if (l4_proto != IPPROTO_UDP)
2727 					goto error;
2728 				m->ol_flags |= RTE_MBUF_F_TX_UDP_CKSUM;
2729 				break;
2730 			case (offsetof(struct rte_sctp_hdr, cksum)):
2731 				if (l4_proto != IPPROTO_SCTP)
2732 					goto error;
2733 				m->ol_flags |= RTE_MBUF_F_TX_SCTP_CKSUM;
2734 				break;
2735 			default:
2736 				goto error;
2737 			}
2738 		} else {
2739 			goto error;
2740 		}
2741 	}
2742 
2743 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2744 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2745 		case VIRTIO_NET_HDR_GSO_TCPV4:
2746 		case VIRTIO_NET_HDR_GSO_TCPV6:
2747 			if (l4_proto != IPPROTO_TCP)
2748 				goto error;
2749 			tcp_hdr = rte_pktmbuf_mtod_offset(m,
2750 					struct rte_tcp_hdr *,
2751 					m->l2_len + m->l3_len);
2752 			tcp_len = (tcp_hdr->data_off & 0xf0) >> 2;
2753 			if (data_len < m->l2_len + m->l3_len + tcp_len)
2754 				goto error;
2755 			m->ol_flags |= RTE_MBUF_F_TX_TCP_SEG;
2756 			m->tso_segsz = hdr->gso_size;
2757 			m->l4_len = tcp_len;
2758 			break;
2759 		case VIRTIO_NET_HDR_GSO_UDP:
2760 			if (l4_proto != IPPROTO_UDP)
2761 				goto error;
2762 			m->ol_flags |= RTE_MBUF_F_TX_UDP_SEG;
2763 			m->tso_segsz = hdr->gso_size;
2764 			m->l4_len = sizeof(struct rte_udp_hdr);
2765 			break;
2766 		default:
2767 			VHOST_DATA_LOG(dev->ifname, WARNING,
2768 				"unsupported gso type %u.",
2769 				hdr->gso_type);
2770 			goto error;
2771 		}
2772 	}
2773 	return;
2774 
2775 error:
2776 	m->l2_len = 0;
2777 	m->l3_len = 0;
2778 	m->ol_flags = 0;
2779 }
2780 
2781 static __rte_always_inline void
2782 vhost_dequeue_offload(struct virtio_net *dev, struct virtio_net_hdr *hdr,
2783 		struct rte_mbuf *m, bool legacy_ol_flags)
2784 {
2785 	struct rte_net_hdr_lens hdr_lens;
2786 	int l4_supported = 0;
2787 	uint32_t ptype;
2788 
2789 	if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
2790 		return;
2791 
2792 	if (legacy_ol_flags) {
2793 		vhost_dequeue_offload_legacy(dev, hdr, m);
2794 		return;
2795 	}
2796 
2797 	m->ol_flags |= RTE_MBUF_F_RX_IP_CKSUM_UNKNOWN;
2798 
2799 	ptype = rte_net_get_ptype(m, &hdr_lens, RTE_PTYPE_ALL_MASK);
2800 	m->packet_type = ptype;
2801 	if ((ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_TCP ||
2802 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_UDP ||
2803 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_SCTP)
2804 		l4_supported = 1;
2805 
2806 	/* According to Virtio 1.1 spec, the device only needs to look at
2807 	 * VIRTIO_NET_HDR_F_NEEDS_CSUM in the packet transmission path.
2808 	 * This differs from the processing incoming packets path where the
2809 	 * driver could rely on VIRTIO_NET_HDR_F_DATA_VALID flag set by the
2810 	 * device.
2811 	 *
2812 	 * 5.1.6.2.1 Driver Requirements: Packet Transmission
2813 	 * The driver MUST NOT set the VIRTIO_NET_HDR_F_DATA_VALID and
2814 	 * VIRTIO_NET_HDR_F_RSC_INFO bits in flags.
2815 	 *
2816 	 * 5.1.6.2.2 Device Requirements: Packet Transmission
2817 	 * The device MUST ignore flag bits that it does not recognize.
2818 	 */
2819 	if (hdr->flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2820 		uint32_t hdrlen;
2821 
2822 		hdrlen = hdr_lens.l2_len + hdr_lens.l3_len + hdr_lens.l4_len;
2823 		if (hdr->csum_start <= hdrlen && l4_supported != 0) {
2824 			m->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_NONE;
2825 		} else {
2826 			/* Unknown proto or tunnel, do sw cksum. We can assume
2827 			 * the cksum field is in the first segment since the
2828 			 * buffers we provided to the host are large enough.
2829 			 * In case of SCTP, this will be wrong since it's a CRC
2830 			 * but there's nothing we can do.
2831 			 */
2832 			uint16_t csum = 0, off;
2833 
2834 			if (rte_raw_cksum_mbuf(m, hdr->csum_start,
2835 					rte_pktmbuf_pkt_len(m) - hdr->csum_start, &csum) < 0)
2836 				return;
2837 			if (likely(csum != 0xffff))
2838 				csum = ~csum;
2839 			off = hdr->csum_offset + hdr->csum_start;
2840 			if (rte_pktmbuf_data_len(m) >= off + 1)
2841 				*rte_pktmbuf_mtod_offset(m, uint16_t *, off) = csum;
2842 		}
2843 	}
2844 
2845 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2846 		if (hdr->gso_size == 0)
2847 			return;
2848 
2849 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2850 		case VIRTIO_NET_HDR_GSO_TCPV4:
2851 		case VIRTIO_NET_HDR_GSO_TCPV6:
2852 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_TCP)
2853 				break;
2854 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2855 			m->tso_segsz = hdr->gso_size;
2856 			break;
2857 		case VIRTIO_NET_HDR_GSO_UDP:
2858 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_UDP)
2859 				break;
2860 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2861 			m->tso_segsz = hdr->gso_size;
2862 			break;
2863 		default:
2864 			break;
2865 		}
2866 	}
2867 }
2868 
2869 static __rte_noinline void
2870 copy_vnet_hdr_from_desc(struct virtio_net_hdr *hdr,
2871 		struct buf_vector *buf_vec)
2872 {
2873 	uint64_t len;
2874 	uint64_t remain = sizeof(struct virtio_net_hdr);
2875 	uint64_t src;
2876 	uint64_t dst = (uint64_t)(uintptr_t)hdr;
2877 
2878 	while (remain) {
2879 		len = RTE_MIN(remain, buf_vec->buf_len);
2880 		src = buf_vec->buf_addr;
2881 		rte_memcpy((void *)(uintptr_t)dst,
2882 				(void *)(uintptr_t)src, len);
2883 
2884 		remain -= len;
2885 		dst += len;
2886 		buf_vec++;
2887 	}
2888 }
2889 
2890 static __rte_always_inline int
2891 desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
2892 		  struct buf_vector *buf_vec, uint16_t nr_vec,
2893 		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
2894 		  bool legacy_ol_flags, uint16_t slot_idx, bool is_async)
2895 	__rte_shared_locks_required(&vq->access_lock)
2896 	__rte_shared_locks_required(&vq->iotlb_lock)
2897 {
2898 	uint32_t buf_avail, buf_offset, buf_len;
2899 	uint64_t buf_addr, buf_iova;
2900 	uint32_t mbuf_avail, mbuf_offset;
2901 	uint32_t hdr_remain = dev->vhost_hlen;
2902 	uint32_t cpy_len;
2903 	struct rte_mbuf *cur = m, *prev = m;
2904 	struct virtio_net_hdr tmp_hdr;
2905 	struct virtio_net_hdr *hdr = NULL;
2906 	uint16_t vec_idx;
2907 	struct vhost_async *async = vq->async;
2908 	struct async_inflight_info *pkts_info;
2909 
2910 	/*
2911 	 * The caller has checked the descriptors chain is larger than the
2912 	 * header size.
2913 	 */
2914 
2915 	if (virtio_net_with_host_offload(dev)) {
2916 		if (unlikely(buf_vec[0].buf_len < sizeof(struct virtio_net_hdr))) {
2917 			/*
2918 			 * No luck, the virtio-net header doesn't fit
2919 			 * in a contiguous virtual area.
2920 			 */
2921 			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
2922 			hdr = &tmp_hdr;
2923 		} else {
2924 			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_vec[0].buf_addr);
2925 		}
2926 	}
2927 
2928 	for (vec_idx = 0; vec_idx < nr_vec; vec_idx++) {
2929 		if (buf_vec[vec_idx].buf_len > hdr_remain)
2930 			break;
2931 
2932 		hdr_remain -= buf_vec[vec_idx].buf_len;
2933 	}
2934 
2935 	buf_addr = buf_vec[vec_idx].buf_addr;
2936 	buf_iova = buf_vec[vec_idx].buf_iova;
2937 	buf_len = buf_vec[vec_idx].buf_len;
2938 	buf_offset = hdr_remain;
2939 	buf_avail = buf_vec[vec_idx].buf_len - hdr_remain;
2940 
2941 	PRINT_PACKET(dev,
2942 			(uintptr_t)(buf_addr + buf_offset),
2943 			(uint32_t)buf_avail, 0);
2944 
2945 	mbuf_offset = 0;
2946 	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
2947 
2948 	if (is_async) {
2949 		pkts_info = async->pkts_info;
2950 		if (async_iter_initialize(dev, async))
2951 			return -1;
2952 	}
2953 
2954 	while (1) {
2955 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
2956 
2957 		if (is_async) {
2958 			if (async_fill_seg(dev, vq, cur, mbuf_offset,
2959 					   buf_iova + buf_offset, cpy_len, false) < 0)
2960 				goto error;
2961 		} else if (likely(hdr && cur == m)) {
2962 			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, mbuf_offset),
2963 				(void *)((uintptr_t)(buf_addr + buf_offset)),
2964 				cpy_len);
2965 		} else {
2966 			sync_fill_seg(dev, vq, cur, mbuf_offset,
2967 				      buf_addr + buf_offset,
2968 				      buf_iova + buf_offset, cpy_len, false);
2969 		}
2970 
2971 		mbuf_avail  -= cpy_len;
2972 		mbuf_offset += cpy_len;
2973 		buf_avail -= cpy_len;
2974 		buf_offset += cpy_len;
2975 
2976 		/* This buf reaches to its end, get the next one */
2977 		if (buf_avail == 0) {
2978 			if (++vec_idx >= nr_vec)
2979 				break;
2980 
2981 			buf_addr = buf_vec[vec_idx].buf_addr;
2982 			buf_iova = buf_vec[vec_idx].buf_iova;
2983 			buf_len = buf_vec[vec_idx].buf_len;
2984 
2985 			buf_offset = 0;
2986 			buf_avail  = buf_len;
2987 
2988 			PRINT_PACKET(dev, (uintptr_t)buf_addr,
2989 					(uint32_t)buf_avail, 0);
2990 		}
2991 
2992 		/*
2993 		 * This mbuf reaches to its end, get a new one
2994 		 * to hold more data.
2995 		 */
2996 		if (mbuf_avail == 0) {
2997 			cur = rte_pktmbuf_alloc(mbuf_pool);
2998 			if (unlikely(cur == NULL)) {
2999 				vq->stats.mbuf_alloc_failed++;
3000 				VHOST_DATA_LOG(dev->ifname, ERR,
3001 					"failed to allocate memory for mbuf.");
3002 				goto error;
3003 			}
3004 
3005 			prev->next = cur;
3006 			prev->data_len = mbuf_offset;
3007 			m->nb_segs += 1;
3008 			m->pkt_len += mbuf_offset;
3009 			prev = cur;
3010 
3011 			mbuf_offset = 0;
3012 			mbuf_avail  = cur->buf_len - RTE_PKTMBUF_HEADROOM;
3013 		}
3014 	}
3015 
3016 	prev->data_len = mbuf_offset;
3017 	m->pkt_len    += mbuf_offset;
3018 
3019 	if (is_async) {
3020 		async_iter_finalize(async);
3021 		if (hdr)
3022 			pkts_info[slot_idx].nethdr = *hdr;
3023 	} else if (hdr) {
3024 		vhost_dequeue_offload(dev, hdr, m, legacy_ol_flags);
3025 	}
3026 
3027 	return 0;
3028 error:
3029 	if (is_async)
3030 		async_iter_cancel(async);
3031 
3032 	return -1;
3033 }
3034 
3035 static void
3036 virtio_dev_extbuf_free(void *addr __rte_unused, void *opaque)
3037 {
3038 	rte_free(opaque);
3039 }
3040 
3041 static int
3042 virtio_dev_extbuf_alloc(struct virtio_net *dev, struct rte_mbuf *pkt, uint32_t size)
3043 {
3044 	struct rte_mbuf_ext_shared_info *shinfo = NULL;
3045 	uint32_t total_len = RTE_PKTMBUF_HEADROOM + size;
3046 	uint16_t buf_len;
3047 	rte_iova_t iova;
3048 	void *buf;
3049 
3050 	total_len += sizeof(*shinfo) + sizeof(uintptr_t);
3051 	total_len = RTE_ALIGN_CEIL(total_len, sizeof(uintptr_t));
3052 
3053 	if (unlikely(total_len > UINT16_MAX))
3054 		return -ENOSPC;
3055 
3056 	buf_len = total_len;
3057 	buf = rte_malloc(NULL, buf_len, RTE_CACHE_LINE_SIZE);
3058 	if (unlikely(buf == NULL))
3059 		return -ENOMEM;
3060 
3061 	/* Initialize shinfo */
3062 	shinfo = rte_pktmbuf_ext_shinfo_init_helper(buf, &buf_len,
3063 						virtio_dev_extbuf_free, buf);
3064 	if (unlikely(shinfo == NULL)) {
3065 		rte_free(buf);
3066 		VHOST_DATA_LOG(dev->ifname, ERR, "failed to init shinfo");
3067 		return -1;
3068 	}
3069 
3070 	iova = rte_malloc_virt2iova(buf);
3071 	rte_pktmbuf_attach_extbuf(pkt, buf, iova, buf_len, shinfo);
3072 	rte_pktmbuf_reset_headroom(pkt);
3073 
3074 	return 0;
3075 }
3076 
3077 /*
3078  * Prepare a host supported pktmbuf.
3079  */
3080 static __rte_always_inline int
3081 virtio_dev_pktmbuf_prep(struct virtio_net *dev, struct rte_mbuf *pkt,
3082 			 uint32_t data_len)
3083 {
3084 	if (rte_pktmbuf_tailroom(pkt) >= data_len)
3085 		return 0;
3086 
3087 	/* attach an external buffer if supported */
3088 	if (dev->extbuf && !virtio_dev_extbuf_alloc(dev, pkt, data_len))
3089 		return 0;
3090 
3091 	/* check if chained buffers are allowed */
3092 	if (!dev->linearbuf)
3093 		return 0;
3094 
3095 	return -1;
3096 }
3097 
3098 __rte_always_inline
3099 static uint16_t
3100 virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
3101 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
3102 	bool legacy_ol_flags)
3103 	__rte_shared_locks_required(&vq->access_lock)
3104 	__rte_shared_locks_required(&vq->iotlb_lock)
3105 {
3106 	uint16_t i;
3107 	uint16_t avail_entries;
3108 	static bool allocerr_warned;
3109 
3110 	/*
3111 	 * The ordering between avail index and
3112 	 * desc reads needs to be enforced.
3113 	 */
3114 	avail_entries = rte_atomic_load_explicit((unsigned short __rte_atomic *)&vq->avail->idx,
3115 		rte_memory_order_acquire) - vq->last_avail_idx;
3116 	if (avail_entries == 0)
3117 		return 0;
3118 
3119 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
3120 
3121 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
3122 
3123 	count = RTE_MIN(count, MAX_PKT_BURST);
3124 	count = RTE_MIN(count, avail_entries);
3125 	VHOST_DATA_LOG(dev->ifname, DEBUG, "about to dequeue %u buffers", count);
3126 
3127 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count)) {
3128 		vq->stats.mbuf_alloc_failed += count;
3129 		return 0;
3130 	}
3131 
3132 	for (i = 0; i < count; i++) {
3133 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
3134 		uint16_t head_idx;
3135 		uint32_t buf_len;
3136 		uint16_t nr_vec = 0;
3137 		int err;
3138 
3139 		if (unlikely(fill_vec_buf_split(dev, vq,
3140 						vq->last_avail_idx + i,
3141 						&nr_vec, buf_vec,
3142 						&head_idx, &buf_len,
3143 						VHOST_ACCESS_RO) < 0))
3144 			break;
3145 
3146 		update_shadow_used_ring_split(vq, head_idx, 0);
3147 
3148 		if (unlikely(buf_len <= dev->vhost_hlen))
3149 			break;
3150 
3151 		buf_len -= dev->vhost_hlen;
3152 
3153 		err = virtio_dev_pktmbuf_prep(dev, pkts[i], buf_len);
3154 		if (unlikely(err)) {
3155 			/*
3156 			 * mbuf allocation fails for jumbo packets when external
3157 			 * buffer allocation is not allowed and linear buffer
3158 			 * is required. Drop this packet.
3159 			 */
3160 			if (!allocerr_warned) {
3161 				VHOST_DATA_LOG(dev->ifname, ERR,
3162 					"failed mbuf alloc of size %d from %s.",
3163 					buf_len, mbuf_pool->name);
3164 				allocerr_warned = true;
3165 			}
3166 			break;
3167 		}
3168 
3169 		err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
3170 				   mbuf_pool, legacy_ol_flags, 0, false);
3171 		if (unlikely(err)) {
3172 			if (!allocerr_warned) {
3173 				VHOST_DATA_LOG(dev->ifname, ERR, "failed to copy desc to mbuf.");
3174 				allocerr_warned = true;
3175 			}
3176 			break;
3177 		}
3178 	}
3179 
3180 	if (unlikely(count != i))
3181 		rte_pktmbuf_free_bulk(&pkts[i], count - i);
3182 
3183 	if (likely(vq->shadow_used_idx)) {
3184 		vq->last_avail_idx += vq->shadow_used_idx;
3185 		do_data_copy_dequeue(vq);
3186 		flush_shadow_used_ring_split(dev, vq);
3187 		vhost_vring_call_split(dev, vq);
3188 	}
3189 
3190 	return i;
3191 }
3192 
3193 __rte_noinline
3194 static uint16_t
3195 virtio_dev_tx_split_legacy(struct virtio_net *dev,
3196 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3197 	struct rte_mbuf **pkts, uint16_t count)
3198 	__rte_shared_locks_required(&vq->access_lock)
3199 	__rte_shared_locks_required(&vq->iotlb_lock)
3200 {
3201 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, true);
3202 }
3203 
3204 __rte_noinline
3205 static uint16_t
3206 virtio_dev_tx_split_compliant(struct virtio_net *dev,
3207 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3208 	struct rte_mbuf **pkts, uint16_t count)
3209 	__rte_shared_locks_required(&vq->access_lock)
3210 	__rte_shared_locks_required(&vq->iotlb_lock)
3211 {
3212 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, false);
3213 }
3214 
3215 static __rte_always_inline int
3216 vhost_reserve_avail_batch_packed(struct virtio_net *dev,
3217 				 struct vhost_virtqueue *vq,
3218 				 struct rte_mbuf **pkts,
3219 				 uint16_t avail_idx,
3220 				 uintptr_t *desc_addrs,
3221 				 uint16_t *ids)
3222 	__rte_shared_locks_required(&vq->iotlb_lock)
3223 {
3224 	bool wrap = vq->avail_wrap_counter;
3225 	struct vring_packed_desc *descs = vq->desc_packed;
3226 	uint64_t lens[PACKED_BATCH_SIZE];
3227 	uint64_t buf_lens[PACKED_BATCH_SIZE];
3228 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3229 	uint16_t flags, i;
3230 
3231 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
3232 		return -1;
3233 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
3234 		return -1;
3235 
3236 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3237 		flags = descs[avail_idx + i].flags;
3238 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
3239 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
3240 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
3241 			return -1;
3242 	}
3243 
3244 	rte_atomic_thread_fence(rte_memory_order_acquire);
3245 
3246 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3247 		lens[i] = descs[avail_idx + i].len;
3248 
3249 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3250 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
3251 						  descs[avail_idx + i].addr,
3252 						  &lens[i], VHOST_ACCESS_RW);
3253 	}
3254 
3255 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3256 		if (unlikely(!desc_addrs[i]))
3257 			return -1;
3258 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
3259 			return -1;
3260 	}
3261 
3262 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3263 		if (virtio_dev_pktmbuf_prep(dev, pkts[i], lens[i]))
3264 			goto err;
3265 	}
3266 
3267 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3268 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
3269 
3270 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3271 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
3272 			goto err;
3273 	}
3274 
3275 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3276 		pkts[i]->pkt_len = lens[i] - buf_offset;
3277 		pkts[i]->data_len = pkts[i]->pkt_len;
3278 		ids[i] = descs[avail_idx + i].id;
3279 	}
3280 
3281 	return 0;
3282 
3283 err:
3284 	return -1;
3285 }
3286 
3287 static __rte_always_inline int
3288 vhost_async_tx_batch_packed_check(struct virtio_net *dev,
3289 				 struct vhost_virtqueue *vq,
3290 				 struct rte_mbuf **pkts,
3291 				 uint16_t avail_idx,
3292 				 uintptr_t *desc_addrs,
3293 				 uint64_t *lens,
3294 				 uint16_t *ids,
3295 				 int16_t dma_id,
3296 				 uint16_t vchan_id)
3297 {
3298 	bool wrap = vq->avail_wrap_counter;
3299 	struct vring_packed_desc *descs = vq->desc_packed;
3300 	uint64_t buf_lens[PACKED_BATCH_SIZE];
3301 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3302 	uint16_t flags, i;
3303 
3304 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
3305 		return -1;
3306 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
3307 		return -1;
3308 
3309 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3310 		flags = descs[avail_idx + i].flags;
3311 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
3312 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
3313 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
3314 			return -1;
3315 	}
3316 
3317 	rte_atomic_thread_fence(rte_memory_order_acquire);
3318 
3319 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3320 		lens[i] = descs[avail_idx + i].len;
3321 
3322 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3323 		desc_addrs[i] = descs[avail_idx + i].addr;
3324 	}
3325 
3326 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3327 		if (unlikely(!desc_addrs[i]))
3328 			return -1;
3329 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
3330 			return -1;
3331 	}
3332 
3333 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3334 		if (virtio_dev_pktmbuf_prep(dev, pkts[i], lens[i]))
3335 			goto err;
3336 	}
3337 
3338 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3339 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
3340 
3341 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3342 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
3343 			goto err;
3344 	}
3345 
3346 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3347 		pkts[i]->pkt_len = lens[i] - buf_offset;
3348 		pkts[i]->data_len = pkts[i]->pkt_len;
3349 		ids[i] = descs[avail_idx + i].id;
3350 	}
3351 
3352 	if (rte_dma_burst_capacity(dma_id, vchan_id) < PACKED_BATCH_SIZE)
3353 		return -1;
3354 
3355 	return 0;
3356 
3357 err:
3358 	return -1;
3359 }
3360 
3361 static __rte_always_inline int
3362 virtio_dev_tx_batch_packed(struct virtio_net *dev,
3363 			   struct vhost_virtqueue *vq,
3364 			   struct rte_mbuf **pkts,
3365 			   bool legacy_ol_flags)
3366 	__rte_shared_locks_required(&vq->iotlb_lock)
3367 {
3368 	uint16_t avail_idx = vq->last_avail_idx;
3369 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3370 	struct virtio_net_hdr *hdr;
3371 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
3372 	uint16_t ids[PACKED_BATCH_SIZE];
3373 	uint16_t i;
3374 
3375 	if (vhost_reserve_avail_batch_packed(dev, vq, pkts, avail_idx,
3376 					     desc_addrs, ids))
3377 		return -1;
3378 
3379 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3380 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
3381 
3382 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3383 		rte_memcpy(rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
3384 			   (void *)(uintptr_t)(desc_addrs[i] + buf_offset),
3385 			   pkts[i]->pkt_len);
3386 
3387 	if (virtio_net_with_host_offload(dev)) {
3388 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3389 			hdr = (struct virtio_net_hdr *)(desc_addrs[i]);
3390 			vhost_dequeue_offload(dev, hdr, pkts[i], legacy_ol_flags);
3391 		}
3392 	}
3393 
3394 	if (virtio_net_is_inorder(dev))
3395 		vhost_shadow_dequeue_batch_packed_inorder(vq,
3396 			ids[PACKED_BATCH_SIZE - 1]);
3397 	else
3398 		vhost_shadow_dequeue_batch_packed(dev, vq, ids);
3399 
3400 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
3401 
3402 	return 0;
3403 }
3404 
3405 static __rte_always_inline int
3406 vhost_dequeue_single_packed(struct virtio_net *dev,
3407 			    struct vhost_virtqueue *vq,
3408 			    struct rte_mempool *mbuf_pool,
3409 			    struct rte_mbuf *pkts,
3410 			    uint16_t *buf_id,
3411 			    uint16_t *desc_count,
3412 			    bool legacy_ol_flags)
3413 	__rte_shared_locks_required(&vq->access_lock)
3414 	__rte_shared_locks_required(&vq->iotlb_lock)
3415 {
3416 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
3417 	uint32_t buf_len;
3418 	uint16_t nr_vec = 0;
3419 	int err;
3420 	static bool allocerr_warned;
3421 
3422 	if (unlikely(fill_vec_buf_packed(dev, vq,
3423 					 vq->last_avail_idx, desc_count,
3424 					 buf_vec, &nr_vec,
3425 					 buf_id, &buf_len,
3426 					 VHOST_ACCESS_RO) < 0))
3427 		return -1;
3428 
3429 	if (unlikely(buf_len <= dev->vhost_hlen))
3430 		return -1;
3431 
3432 	buf_len -= dev->vhost_hlen;
3433 
3434 	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
3435 		if (!allocerr_warned) {
3436 			VHOST_DATA_LOG(dev->ifname, ERR,
3437 				"failed mbuf alloc of size %d from %s.",
3438 				buf_len, mbuf_pool->name);
3439 			allocerr_warned = true;
3440 		}
3441 		return -1;
3442 	}
3443 
3444 	err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts,
3445 			   mbuf_pool, legacy_ol_flags, 0, false);
3446 	if (unlikely(err)) {
3447 		if (!allocerr_warned) {
3448 			VHOST_DATA_LOG(dev->ifname, ERR, "failed to copy desc to mbuf.");
3449 			allocerr_warned = true;
3450 		}
3451 		return -1;
3452 	}
3453 
3454 	return 0;
3455 }
3456 
3457 static __rte_always_inline int
3458 virtio_dev_tx_single_packed(struct virtio_net *dev,
3459 			    struct vhost_virtqueue *vq,
3460 			    struct rte_mempool *mbuf_pool,
3461 			    struct rte_mbuf *pkts,
3462 			    bool legacy_ol_flags)
3463 	__rte_shared_locks_required(&vq->access_lock)
3464 	__rte_shared_locks_required(&vq->iotlb_lock)
3465 {
3466 
3467 	uint16_t buf_id, desc_count = 0;
3468 	int ret;
3469 
3470 	ret = vhost_dequeue_single_packed(dev, vq, mbuf_pool, pkts, &buf_id,
3471 					&desc_count, legacy_ol_flags);
3472 
3473 	if (likely(desc_count > 0)) {
3474 		if (virtio_net_is_inorder(dev))
3475 			vhost_shadow_dequeue_single_packed_inorder(vq, buf_id,
3476 								   desc_count);
3477 		else
3478 			vhost_shadow_dequeue_single_packed(vq, buf_id,
3479 					desc_count);
3480 
3481 		vq_inc_last_avail_packed(vq, desc_count);
3482 	}
3483 
3484 	return ret;
3485 }
3486 
3487 __rte_always_inline
3488 static uint16_t
3489 virtio_dev_tx_packed(struct virtio_net *dev,
3490 		     struct vhost_virtqueue *__rte_restrict vq,
3491 		     struct rte_mempool *mbuf_pool,
3492 		     struct rte_mbuf **__rte_restrict pkts,
3493 		     uint32_t count,
3494 		     bool legacy_ol_flags)
3495 	__rte_shared_locks_required(&vq->access_lock)
3496 	__rte_shared_locks_required(&vq->iotlb_lock)
3497 {
3498 	uint32_t pkt_idx = 0;
3499 
3500 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count)) {
3501 		vq->stats.mbuf_alloc_failed += count;
3502 		return 0;
3503 	}
3504 
3505 	do {
3506 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
3507 
3508 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
3509 			if (!virtio_dev_tx_batch_packed(dev, vq,
3510 							&pkts[pkt_idx],
3511 							legacy_ol_flags)) {
3512 				pkt_idx += PACKED_BATCH_SIZE;
3513 				continue;
3514 			}
3515 		}
3516 
3517 		if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
3518 						pkts[pkt_idx],
3519 						legacy_ol_flags))
3520 			break;
3521 		pkt_idx++;
3522 	} while (pkt_idx < count);
3523 
3524 	if (pkt_idx != count)
3525 		rte_pktmbuf_free_bulk(&pkts[pkt_idx], count - pkt_idx);
3526 
3527 	if (vq->shadow_used_idx) {
3528 		do_data_copy_dequeue(vq);
3529 
3530 		vhost_flush_dequeue_shadow_packed(dev, vq);
3531 		vhost_vring_call_packed(dev, vq);
3532 	}
3533 
3534 	return pkt_idx;
3535 }
3536 
3537 __rte_noinline
3538 static uint16_t
3539 virtio_dev_tx_packed_legacy(struct virtio_net *dev,
3540 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
3541 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
3542 	__rte_shared_locks_required(&vq->access_lock)
3543 	__rte_shared_locks_required(&vq->iotlb_lock)
3544 {
3545 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, true);
3546 }
3547 
3548 __rte_noinline
3549 static uint16_t
3550 virtio_dev_tx_packed_compliant(struct virtio_net *dev,
3551 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
3552 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
3553 	__rte_shared_locks_required(&vq->access_lock)
3554 	__rte_shared_locks_required(&vq->iotlb_lock)
3555 {
3556 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, false);
3557 }
3558 
3559 uint16_t
3560 rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
3561 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
3562 {
3563 	struct virtio_net *dev;
3564 	struct rte_mbuf *rarp_mbuf = NULL;
3565 	struct vhost_virtqueue *vq;
3566 	int16_t success = 1;
3567 
3568 	dev = get_device(vid);
3569 	if (!dev)
3570 		return 0;
3571 
3572 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
3573 		VHOST_DATA_LOG(dev->ifname, ERR,
3574 			"%s: built-in vhost net backend is disabled.",
3575 			__func__);
3576 		return 0;
3577 	}
3578 
3579 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
3580 		VHOST_DATA_LOG(dev->ifname, ERR,
3581 			"%s: invalid virtqueue idx %d.",
3582 			__func__, queue_id);
3583 		return 0;
3584 	}
3585 
3586 	vq = dev->virtqueue[queue_id];
3587 
3588 	if (unlikely(rte_rwlock_read_trylock(&vq->access_lock) != 0))
3589 		return 0;
3590 
3591 	if (unlikely(!vq->enabled)) {
3592 		count = 0;
3593 		goto out_access_unlock;
3594 	}
3595 
3596 	vhost_user_iotlb_rd_lock(vq);
3597 
3598 	if (unlikely(!vq->access_ok)) {
3599 		vhost_user_iotlb_rd_unlock(vq);
3600 		rte_rwlock_read_unlock(&vq->access_lock);
3601 
3602 		virtio_dev_vring_translate(dev, vq);
3603 		goto out_no_unlock;
3604 	}
3605 
3606 	/*
3607 	 * Construct a RARP broadcast packet, and inject it to the "pkts"
3608 	 * array, to looks like that guest actually send such packet.
3609 	 *
3610 	 * Check user_send_rarp() for more information.
3611 	 *
3612 	 * broadcast_rarp shares a cacheline in the virtio_net structure
3613 	 * with some fields that are accessed during enqueue and
3614 	 * rte_atomic_compare_exchange_strong_explicit causes a write if performed compare
3615 	 * and exchange. This could result in false sharing between enqueue
3616 	 * and dequeue.
3617 	 *
3618 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
3619 	 * and only performing compare and exchange if the read indicates it
3620 	 * is likely to be set.
3621 	 */
3622 	if (unlikely(rte_atomic_load_explicit(&dev->broadcast_rarp, rte_memory_order_acquire) &&
3623 			rte_atomic_compare_exchange_strong_explicit(&dev->broadcast_rarp,
3624 			&success, 0, rte_memory_order_release, rte_memory_order_relaxed))) {
3625 
3626 		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
3627 		if (rarp_mbuf == NULL) {
3628 			VHOST_DATA_LOG(dev->ifname, ERR, "failed to make RARP packet.");
3629 			count = 0;
3630 			goto out;
3631 		}
3632 		/*
3633 		 * Inject it to the head of "pkts" array, so that switch's mac
3634 		 * learning table will get updated first.
3635 		 */
3636 		pkts[0] = rarp_mbuf;
3637 		vhost_queue_stats_update(dev, vq, pkts, 1);
3638 		pkts++;
3639 		count -= 1;
3640 	}
3641 
3642 	if (vq_is_packed(dev)) {
3643 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
3644 			count = virtio_dev_tx_packed_legacy(dev, vq, mbuf_pool, pkts, count);
3645 		else
3646 			count = virtio_dev_tx_packed_compliant(dev, vq, mbuf_pool, pkts, count);
3647 	} else {
3648 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
3649 			count = virtio_dev_tx_split_legacy(dev, vq, mbuf_pool, pkts, count);
3650 		else
3651 			count = virtio_dev_tx_split_compliant(dev, vq, mbuf_pool, pkts, count);
3652 	}
3653 
3654 	vhost_queue_stats_update(dev, vq, pkts, count);
3655 
3656 out:
3657 	vhost_user_iotlb_rd_unlock(vq);
3658 
3659 out_access_unlock:
3660 	rte_rwlock_read_unlock(&vq->access_lock);
3661 
3662 	if (unlikely(rarp_mbuf != NULL))
3663 		count += 1;
3664 
3665 out_no_unlock:
3666 	return count;
3667 }
3668 
3669 static __rte_always_inline uint16_t
3670 async_poll_dequeue_completed(struct virtio_net *dev, struct vhost_virtqueue *vq,
3671 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
3672 		uint16_t vchan_id, bool legacy_ol_flags)
3673 	__rte_shared_locks_required(&vq->access_lock)
3674 {
3675 	uint16_t start_idx, from, i;
3676 	uint16_t nr_cpl_pkts = 0;
3677 	struct async_inflight_info *pkts_info = vq->async->pkts_info;
3678 
3679 	vhost_async_dma_check_completed(dev, dma_id, vchan_id, VHOST_DMA_MAX_COPY_COMPLETE);
3680 
3681 	start_idx = async_get_first_inflight_pkt_idx(vq);
3682 
3683 	from = start_idx;
3684 	while (vq->async->pkts_cmpl_flag[from] && count--) {
3685 		vq->async->pkts_cmpl_flag[from] = false;
3686 		from = (from + 1) % vq->size;
3687 		nr_cpl_pkts++;
3688 	}
3689 
3690 	if (nr_cpl_pkts == 0)
3691 		return 0;
3692 
3693 	for (i = 0; i < nr_cpl_pkts; i++) {
3694 		from = (start_idx + i) % vq->size;
3695 		pkts[i] = pkts_info[from].mbuf;
3696 
3697 		if (virtio_net_with_host_offload(dev))
3698 			vhost_dequeue_offload(dev, &pkts_info[from].nethdr, pkts[i],
3699 					      legacy_ol_flags);
3700 	}
3701 
3702 	/* write back completed descs to used ring and update used idx */
3703 	if (vq_is_packed(dev)) {
3704 		write_back_completed_descs_packed(vq, nr_cpl_pkts);
3705 		vhost_vring_call_packed(dev, vq);
3706 	} else {
3707 		write_back_completed_descs_split(vq, nr_cpl_pkts);
3708 		rte_atomic_fetch_add_explicit((unsigned short __rte_atomic *)&vq->used->idx,
3709 			nr_cpl_pkts, rte_memory_order_release);
3710 		vhost_vring_call_split(dev, vq);
3711 	}
3712 	vq->async->pkts_inflight_n -= nr_cpl_pkts;
3713 
3714 	return nr_cpl_pkts;
3715 }
3716 
3717 static __rte_always_inline uint16_t
3718 virtio_dev_tx_async_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
3719 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
3720 		int16_t dma_id, uint16_t vchan_id, bool legacy_ol_flags)
3721 	__rte_shared_locks_required(&vq->access_lock)
3722 	__rte_shared_locks_required(&vq->iotlb_lock)
3723 {
3724 	static bool allocerr_warned;
3725 	bool dropped = false;
3726 	uint16_t avail_entries;
3727 	uint16_t pkt_idx, slot_idx = 0;
3728 	uint16_t nr_done_pkts = 0;
3729 	uint16_t pkt_err = 0;
3730 	uint16_t n_xfer;
3731 	struct vhost_async *async = vq->async;
3732 	struct async_inflight_info *pkts_info = async->pkts_info;
3733 	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
3734 	uint16_t pkts_size = count;
3735 
3736 	/**
3737 	 * The ordering between avail index and
3738 	 * desc reads needs to be enforced.
3739 	 */
3740 	avail_entries = rte_atomic_load_explicit((unsigned short __rte_atomic *)&vq->avail->idx,
3741 		rte_memory_order_acquire) - vq->last_avail_idx;
3742 	if (avail_entries == 0)
3743 		goto out;
3744 
3745 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
3746 
3747 	async_iter_reset(async);
3748 
3749 	count = RTE_MIN(count, MAX_PKT_BURST);
3750 	count = RTE_MIN(count, avail_entries);
3751 	VHOST_DATA_LOG(dev->ifname, DEBUG, "about to dequeue %u buffers", count);
3752 
3753 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count)) {
3754 		vq->stats.mbuf_alloc_failed += count;
3755 		goto out;
3756 	}
3757 
3758 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
3759 		uint16_t head_idx = 0;
3760 		uint16_t nr_vec = 0;
3761 		uint16_t to;
3762 		uint32_t buf_len;
3763 		int err;
3764 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
3765 		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
3766 
3767 		if (unlikely(fill_vec_buf_split(dev, vq, vq->last_avail_idx,
3768 						&nr_vec, buf_vec,
3769 						&head_idx, &buf_len,
3770 						VHOST_ACCESS_RO) < 0)) {
3771 			dropped = true;
3772 			break;
3773 		}
3774 
3775 		if (unlikely(buf_len <= dev->vhost_hlen)) {
3776 			dropped = true;
3777 			break;
3778 		}
3779 
3780 		buf_len -= dev->vhost_hlen;
3781 
3782 		err = virtio_dev_pktmbuf_prep(dev, pkt, buf_len);
3783 		if (unlikely(err)) {
3784 			/**
3785 			 * mbuf allocation fails for jumbo packets when external
3786 			 * buffer allocation is not allowed and linear buffer
3787 			 * is required. Drop this packet.
3788 			 */
3789 			if (!allocerr_warned) {
3790 				VHOST_DATA_LOG(dev->ifname, ERR,
3791 					"%s: Failed mbuf alloc of size %d from %s",
3792 					__func__, buf_len, mbuf_pool->name);
3793 				allocerr_warned = true;
3794 			}
3795 			dropped = true;
3796 			slot_idx--;
3797 			break;
3798 		}
3799 
3800 		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
3801 		err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkt, mbuf_pool,
3802 					legacy_ol_flags, slot_idx, true);
3803 		if (unlikely(err)) {
3804 			if (!allocerr_warned) {
3805 				VHOST_DATA_LOG(dev->ifname, ERR,
3806 					"%s: Failed to offload copies to async channel.",
3807 					__func__);
3808 				allocerr_warned = true;
3809 			}
3810 			dropped = true;
3811 			slot_idx--;
3812 			break;
3813 		}
3814 
3815 		pkts_info[slot_idx].mbuf = pkt;
3816 
3817 		/* store used descs */
3818 		to = async->desc_idx_split & (vq->size - 1);
3819 		async->descs_split[to].id = head_idx;
3820 		async->descs_split[to].len = 0;
3821 		async->desc_idx_split++;
3822 
3823 		vq->last_avail_idx++;
3824 	}
3825 
3826 	if (unlikely(dropped))
3827 		rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
3828 
3829 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
3830 					  async->iov_iter, pkt_idx);
3831 
3832 	async->pkts_inflight_n += n_xfer;
3833 
3834 	pkt_err = pkt_idx - n_xfer;
3835 	if (unlikely(pkt_err)) {
3836 		VHOST_DATA_LOG(dev->ifname, DEBUG, "%s: failed to transfer data.",
3837 			__func__);
3838 
3839 		pkt_idx = n_xfer;
3840 		/* recover available ring */
3841 		vq->last_avail_idx -= pkt_err;
3842 
3843 		/**
3844 		 * recover async channel copy related structures and free pktmbufs
3845 		 * for error pkts.
3846 		 */
3847 		async->desc_idx_split -= pkt_err;
3848 		while (pkt_err-- > 0) {
3849 			rte_pktmbuf_free(pkts_info[slot_idx & (vq->size - 1)].mbuf);
3850 			slot_idx--;
3851 		}
3852 	}
3853 
3854 	async->pkts_idx += pkt_idx;
3855 	if (async->pkts_idx >= vq->size)
3856 		async->pkts_idx -= vq->size;
3857 
3858 out:
3859 	/* DMA device may serve other queues, unconditionally check completed. */
3860 	nr_done_pkts = async_poll_dequeue_completed(dev, vq, pkts, pkts_size,
3861 							dma_id, vchan_id, legacy_ol_flags);
3862 
3863 	return nr_done_pkts;
3864 }
3865 
3866 __rte_noinline
3867 static uint16_t
3868 virtio_dev_tx_async_split_legacy(struct virtio_net *dev,
3869 		struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3870 		struct rte_mbuf **pkts, uint16_t count,
3871 		int16_t dma_id, uint16_t vchan_id)
3872 	__rte_shared_locks_required(&vq->access_lock)
3873 	__rte_shared_locks_required(&vq->iotlb_lock)
3874 {
3875 	return virtio_dev_tx_async_split(dev, vq, mbuf_pool,
3876 				pkts, count, dma_id, vchan_id, true);
3877 }
3878 
3879 __rte_noinline
3880 static uint16_t
3881 virtio_dev_tx_async_split_compliant(struct virtio_net *dev,
3882 		struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3883 		struct rte_mbuf **pkts, uint16_t count,
3884 		int16_t dma_id, uint16_t vchan_id)
3885 	__rte_shared_locks_required(&vq->access_lock)
3886 	__rte_shared_locks_required(&vq->iotlb_lock)
3887 {
3888 	return virtio_dev_tx_async_split(dev, vq, mbuf_pool,
3889 				pkts, count, dma_id, vchan_id, false);
3890 }
3891 
3892 static __rte_always_inline void
3893 vhost_async_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
3894 				uint16_t buf_id, uint16_t count)
3895 	__rte_shared_locks_required(&vq->access_lock)
3896 {
3897 	struct vhost_async *async = vq->async;
3898 	uint16_t idx = async->buffer_idx_packed;
3899 
3900 	async->buffers_packed[idx].id = buf_id;
3901 	async->buffers_packed[idx].len = 0;
3902 	async->buffers_packed[idx].count = count;
3903 
3904 	async->buffer_idx_packed++;
3905 	if (async->buffer_idx_packed >= vq->size)
3906 		async->buffer_idx_packed -= vq->size;
3907 
3908 }
3909 
3910 static __rte_always_inline int
3911 virtio_dev_tx_async_single_packed(struct virtio_net *dev,
3912 			struct vhost_virtqueue *vq,
3913 			struct rte_mempool *mbuf_pool,
3914 			struct rte_mbuf *pkts,
3915 			uint16_t slot_idx,
3916 			bool legacy_ol_flags)
3917 	__rte_shared_locks_required(&vq->access_lock)
3918 	__rte_shared_locks_required(&vq->iotlb_lock)
3919 {
3920 	int err;
3921 	uint16_t buf_id, desc_count = 0;
3922 	uint16_t nr_vec = 0;
3923 	uint32_t buf_len;
3924 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
3925 	struct vhost_async *async = vq->async;
3926 	struct async_inflight_info *pkts_info = async->pkts_info;
3927 	static bool allocerr_warned;
3928 
3929 	if (unlikely(fill_vec_buf_packed(dev, vq, vq->last_avail_idx, &desc_count,
3930 					 buf_vec, &nr_vec, &buf_id, &buf_len,
3931 					 VHOST_ACCESS_RO) < 0))
3932 		return -1;
3933 
3934 	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
3935 		if (!allocerr_warned) {
3936 			VHOST_DATA_LOG(dev->ifname, ERR, "Failed mbuf alloc of size %d from %s.",
3937 				buf_len, mbuf_pool->name);
3938 
3939 			allocerr_warned = true;
3940 		}
3941 		return -1;
3942 	}
3943 
3944 	err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts, mbuf_pool,
3945 		legacy_ol_flags, slot_idx, true);
3946 	if (unlikely(err)) {
3947 		rte_pktmbuf_free(pkts);
3948 		if (!allocerr_warned) {
3949 			VHOST_DATA_LOG(dev->ifname, ERR, "Failed to copy desc to mbuf on.");
3950 			allocerr_warned = true;
3951 		}
3952 		return -1;
3953 	}
3954 
3955 	pkts_info[slot_idx].descs = desc_count;
3956 
3957 	/* update async shadow packed ring */
3958 	vhost_async_shadow_dequeue_single_packed(vq, buf_id, desc_count);
3959 
3960 	vq_inc_last_avail_packed(vq, desc_count);
3961 
3962 	return err;
3963 }
3964 
3965 static __rte_always_inline int
3966 virtio_dev_tx_async_packed_batch(struct virtio_net *dev,
3967 			   struct vhost_virtqueue *vq,
3968 			   struct rte_mbuf **pkts, uint16_t slot_idx,
3969 			   uint16_t dma_id, uint16_t vchan_id)
3970 	__rte_shared_locks_required(&vq->access_lock)
3971 	__rte_shared_locks_required(&vq->iotlb_lock)
3972 {
3973 	uint16_t avail_idx = vq->last_avail_idx;
3974 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3975 	struct vhost_async *async = vq->async;
3976 	struct async_inflight_info *pkts_info = async->pkts_info;
3977 	struct virtio_net_hdr *hdr;
3978 	uint32_t mbuf_offset = 0;
3979 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
3980 	uint64_t desc_vva;
3981 	uint64_t lens[PACKED_BATCH_SIZE];
3982 	void *host_iova[PACKED_BATCH_SIZE];
3983 	uint64_t mapped_len[PACKED_BATCH_SIZE];
3984 	uint16_t ids[PACKED_BATCH_SIZE];
3985 	uint16_t i;
3986 
3987 	if (vhost_async_tx_batch_packed_check(dev, vq, pkts, avail_idx,
3988 					     desc_addrs, lens, ids, dma_id, vchan_id))
3989 		return -1;
3990 
3991 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3992 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
3993 
3994 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3995 		host_iova[i] = (void *)(uintptr_t)gpa_to_first_hpa(dev,
3996 			desc_addrs[i] + buf_offset, pkts[i]->pkt_len, &mapped_len[i]);
3997 	}
3998 
3999 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
4000 		async_iter_initialize(dev, async);
4001 		async_iter_add_iovec(dev, async,
4002 		host_iova[i],
4003 		(void *)(uintptr_t)rte_pktmbuf_iova_offset(pkts[i], mbuf_offset),
4004 		mapped_len[i]);
4005 		async->iter_idx++;
4006 	}
4007 
4008 	if (virtio_net_with_host_offload(dev)) {
4009 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
4010 			desc_vva = vhost_iova_to_vva(dev, vq, desc_addrs[i],
4011 						&lens[i], VHOST_ACCESS_RO);
4012 			hdr = (struct virtio_net_hdr *)(uintptr_t)desc_vva;
4013 			pkts_info[slot_idx + i].nethdr = *hdr;
4014 		}
4015 	}
4016 
4017 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
4018 
4019 	vhost_async_shadow_dequeue_packed_batch(vq, ids);
4020 
4021 	return 0;
4022 }
4023 
4024 static __rte_always_inline uint16_t
4025 virtio_dev_tx_async_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
4026 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
4027 		uint16_t count, uint16_t dma_id, uint16_t vchan_id, bool legacy_ol_flags)
4028 	__rte_shared_locks_required(&vq->access_lock)
4029 	__rte_shared_locks_required(&vq->iotlb_lock)
4030 {
4031 	uint32_t pkt_idx = 0;
4032 	uint16_t slot_idx = 0;
4033 	uint16_t nr_done_pkts = 0;
4034 	uint16_t pkt_err = 0;
4035 	uint32_t n_xfer;
4036 	uint16_t i;
4037 	struct vhost_async *async = vq->async;
4038 	struct async_inflight_info *pkts_info = async->pkts_info;
4039 	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
4040 
4041 	VHOST_DATA_LOG(dev->ifname, DEBUG, "(%d) about to dequeue %u buffers", dev->vid, count);
4042 
4043 	async_iter_reset(async);
4044 
4045 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count)) {
4046 		vq->stats.mbuf_alloc_failed += count;
4047 		goto out;
4048 	}
4049 
4050 	do {
4051 		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
4052 
4053 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
4054 
4055 		slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
4056 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
4057 			if (!virtio_dev_tx_async_packed_batch(dev, vq, &pkts_prealloc[pkt_idx],
4058 						slot_idx, dma_id, vchan_id)) {
4059 				for (i = 0; i < PACKED_BATCH_SIZE; i++) {
4060 					slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
4061 					pkts_info[slot_idx].descs = 1;
4062 					pkts_info[slot_idx].nr_buffers = 1;
4063 					pkts_info[slot_idx].mbuf = pkts_prealloc[pkt_idx];
4064 					pkt_idx++;
4065 				}
4066 				continue;
4067 			}
4068 		}
4069 
4070 		if (unlikely(virtio_dev_tx_async_single_packed(dev, vq, mbuf_pool, pkt,
4071 				slot_idx, legacy_ol_flags))) {
4072 			rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
4073 
4074 			if (slot_idx == 0)
4075 				slot_idx = vq->size - 1;
4076 			else
4077 				slot_idx--;
4078 
4079 			break;
4080 		}
4081 
4082 		pkts_info[slot_idx].mbuf = pkt;
4083 		pkt_idx++;
4084 	} while (pkt_idx < count);
4085 
4086 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
4087 					async->iov_iter, pkt_idx);
4088 
4089 	async->pkts_inflight_n += n_xfer;
4090 
4091 	pkt_err = pkt_idx - n_xfer;
4092 
4093 	if (unlikely(pkt_err)) {
4094 		uint16_t descs_err = 0;
4095 
4096 		pkt_idx -= pkt_err;
4097 
4098 		/**
4099 		 * recover DMA-copy related structures and free pktmbuf for DMA-error pkts.
4100 		 */
4101 		if (async->buffer_idx_packed >= pkt_err)
4102 			async->buffer_idx_packed -= pkt_err;
4103 		else
4104 			async->buffer_idx_packed += vq->size - pkt_err;
4105 
4106 		while (pkt_err-- > 0) {
4107 			rte_pktmbuf_free(pkts_info[slot_idx].mbuf);
4108 			descs_err += pkts_info[slot_idx].descs;
4109 
4110 			if (slot_idx == 0)
4111 				slot_idx = vq->size - 1;
4112 			else
4113 				slot_idx--;
4114 		}
4115 
4116 		/* recover available ring */
4117 		if (vq->last_avail_idx >= descs_err) {
4118 			vq->last_avail_idx -= descs_err;
4119 		} else {
4120 			vq->last_avail_idx += vq->size - descs_err;
4121 			vq->avail_wrap_counter ^= 1;
4122 		}
4123 	}
4124 
4125 	async->pkts_idx += pkt_idx;
4126 	if (async->pkts_idx >= vq->size)
4127 		async->pkts_idx -= vq->size;
4128 
4129 out:
4130 	nr_done_pkts = async_poll_dequeue_completed(dev, vq, pkts, count,
4131 					dma_id, vchan_id, legacy_ol_flags);
4132 
4133 	return nr_done_pkts;
4134 }
4135 
4136 __rte_noinline
4137 static uint16_t
4138 virtio_dev_tx_async_packed_legacy(struct virtio_net *dev, struct vhost_virtqueue *vq,
4139 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
4140 		uint16_t count, uint16_t dma_id, uint16_t vchan_id)
4141 	__rte_shared_locks_required(&vq->access_lock)
4142 	__rte_shared_locks_required(&vq->iotlb_lock)
4143 {
4144 	return virtio_dev_tx_async_packed(dev, vq, mbuf_pool,
4145 				pkts, count, dma_id, vchan_id, true);
4146 }
4147 
4148 __rte_noinline
4149 static uint16_t
4150 virtio_dev_tx_async_packed_compliant(struct virtio_net *dev, struct vhost_virtqueue *vq,
4151 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
4152 		uint16_t count, uint16_t dma_id, uint16_t vchan_id)
4153 	__rte_shared_locks_required(&vq->access_lock)
4154 	__rte_shared_locks_required(&vq->iotlb_lock)
4155 {
4156 	return virtio_dev_tx_async_packed(dev, vq, mbuf_pool,
4157 				pkts, count, dma_id, vchan_id, false);
4158 }
4159 
4160 uint16_t
4161 rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
4162 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
4163 	int *nr_inflight, int16_t dma_id, uint16_t vchan_id)
4164 {
4165 	struct virtio_net *dev;
4166 	struct rte_mbuf *rarp_mbuf = NULL;
4167 	struct vhost_virtqueue *vq;
4168 	int16_t success = 1;
4169 
4170 	dev = get_device(vid);
4171 	if (!dev || !nr_inflight)
4172 		return 0;
4173 
4174 	*nr_inflight = -1;
4175 
4176 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
4177 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: built-in vhost net backend is disabled.",
4178 			__func__);
4179 		return 0;
4180 	}
4181 
4182 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
4183 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid virtqueue idx %d.",
4184 			__func__, queue_id);
4185 		return 0;
4186 	}
4187 
4188 	if (unlikely(dma_id < 0 || dma_id >= RTE_DMADEV_DEFAULT_MAX)) {
4189 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid dma id %d.",
4190 			__func__, dma_id);
4191 		return 0;
4192 	}
4193 
4194 	if (unlikely(!dma_copy_track[dma_id].vchans ||
4195 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
4196 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid channel %d:%u.",
4197 			__func__, dma_id, vchan_id);
4198 		return 0;
4199 	}
4200 
4201 	vq = dev->virtqueue[queue_id];
4202 
4203 	if (unlikely(rte_rwlock_read_trylock(&vq->access_lock) != 0))
4204 		return 0;
4205 
4206 	if (unlikely(vq->enabled == 0)) {
4207 		count = 0;
4208 		goto out_access_unlock;
4209 	}
4210 
4211 	if (unlikely(!vq->async)) {
4212 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: async not registered for queue id %d.",
4213 			__func__, queue_id);
4214 		count = 0;
4215 		goto out_access_unlock;
4216 	}
4217 
4218 	vhost_user_iotlb_rd_lock(vq);
4219 
4220 	if (unlikely(vq->access_ok == 0)) {
4221 		vhost_user_iotlb_rd_unlock(vq);
4222 		rte_rwlock_read_unlock(&vq->access_lock);
4223 
4224 		virtio_dev_vring_translate(dev, vq);
4225 		count = 0;
4226 		goto out_no_unlock;
4227 	}
4228 
4229 	/*
4230 	 * Construct a RARP broadcast packet, and inject it to the "pkts"
4231 	 * array, to looks like that guest actually send such packet.
4232 	 *
4233 	 * Check user_send_rarp() for more information.
4234 	 *
4235 	 * broadcast_rarp shares a cacheline in the virtio_net structure
4236 	 * with some fields that are accessed during enqueue and
4237 	 * rte_atomic_compare_exchange_strong_explicit causes a write if performed compare
4238 	 * and exchange. This could result in false sharing between enqueue
4239 	 * and dequeue.
4240 	 *
4241 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
4242 	 * and only performing compare and exchange if the read indicates it
4243 	 * is likely to be set.
4244 	 */
4245 	if (unlikely(rte_atomic_load_explicit(&dev->broadcast_rarp, rte_memory_order_acquire) &&
4246 			rte_atomic_compare_exchange_strong_explicit(&dev->broadcast_rarp,
4247 			&success, 0, rte_memory_order_release, rte_memory_order_relaxed))) {
4248 
4249 		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
4250 		if (rarp_mbuf == NULL) {
4251 			VHOST_DATA_LOG(dev->ifname, ERR, "failed to make RARP packet.");
4252 			count = 0;
4253 			goto out;
4254 		}
4255 		/*
4256 		 * Inject it to the head of "pkts" array, so that switch's mac
4257 		 * learning table will get updated first.
4258 		 */
4259 		pkts[0] = rarp_mbuf;
4260 		vhost_queue_stats_update(dev, vq, pkts, 1);
4261 		pkts++;
4262 		count -= 1;
4263 	}
4264 
4265 	if (vq_is_packed(dev)) {
4266 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
4267 			count = virtio_dev_tx_async_packed_legacy(dev, vq, mbuf_pool,
4268 					pkts, count, dma_id, vchan_id);
4269 		else
4270 			count = virtio_dev_tx_async_packed_compliant(dev, vq, mbuf_pool,
4271 					pkts, count, dma_id, vchan_id);
4272 	} else {
4273 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
4274 			count = virtio_dev_tx_async_split_legacy(dev, vq, mbuf_pool,
4275 					pkts, count, dma_id, vchan_id);
4276 		else
4277 			count = virtio_dev_tx_async_split_compliant(dev, vq, mbuf_pool,
4278 					pkts, count, dma_id, vchan_id);
4279 	}
4280 
4281 	*nr_inflight = vq->async->pkts_inflight_n;
4282 	vhost_queue_stats_update(dev, vq, pkts, count);
4283 
4284 out:
4285 	vhost_user_iotlb_rd_unlock(vq);
4286 
4287 out_access_unlock:
4288 	rte_rwlock_read_unlock(&vq->access_lock);
4289 
4290 	if (unlikely(rarp_mbuf != NULL))
4291 		count += 1;
4292 
4293 out_no_unlock:
4294 	return count;
4295 }
4296