xref: /dpdk/lib/vhost/virtio_net.c (revision 6d7e741be18ab1e6ecce46edb2516318305c3c73)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2016 Intel Corporation
3  */
4 
5 #include <stdint.h>
6 #include <stdbool.h>
7 #include <linux/virtio_net.h>
8 
9 #include <rte_mbuf.h>
10 #include <rte_memcpy.h>
11 #include <rte_net.h>
12 #include <rte_ether.h>
13 #include <rte_ip.h>
14 #include <rte_dmadev.h>
15 #include <rte_vhost.h>
16 #include <rte_tcp.h>
17 #include <rte_udp.h>
18 #include <rte_sctp.h>
19 #include <rte_arp.h>
20 #include <rte_spinlock.h>
21 #include <rte_malloc.h>
22 #include <rte_vhost_async.h>
23 
24 #include "iotlb.h"
25 #include "vhost.h"
26 
27 #define MAX_BATCH_LEN 256
28 
29 static __rte_always_inline uint16_t
30 async_poll_dequeue_completed(struct virtio_net *dev, struct vhost_virtqueue *vq,
31 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
32 		uint16_t vchan_id, bool legacy_ol_flags);
33 
34 /* DMA device copy operation tracking array. */
35 struct async_dma_info dma_copy_track[RTE_DMADEV_DEFAULT_MAX];
36 
37 static  __rte_always_inline bool
38 rxvq_is_mergeable(struct virtio_net *dev)
39 {
40 	return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
41 }
42 
43 static  __rte_always_inline bool
44 virtio_net_is_inorder(struct virtio_net *dev)
45 {
46 	return dev->features & (1ULL << VIRTIO_F_IN_ORDER);
47 }
48 
49 static bool
50 is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t nr_vring)
51 {
52 	return (is_tx ^ (idx & 1)) == 0 && idx < nr_vring;
53 }
54 
55 static inline void
56 vhost_queue_stats_update(const struct virtio_net *dev, struct vhost_virtqueue *vq,
57 		struct rte_mbuf **pkts, uint16_t count)
58 	__rte_shared_locks_required(&vq->access_lock)
59 {
60 	struct virtqueue_stats *stats = &vq->stats;
61 	int i;
62 
63 	if (!(dev->flags & VIRTIO_DEV_STATS_ENABLED))
64 		return;
65 
66 	for (i = 0; i < count; i++) {
67 		const struct rte_ether_addr *ea;
68 		const struct rte_mbuf *pkt = pkts[i];
69 		uint32_t pkt_len = rte_pktmbuf_pkt_len(pkt);
70 
71 		stats->packets++;
72 		stats->bytes += pkt_len;
73 
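		/*
		 * The bins below cover packet sizes 0-63, 64, 65-127, 128-255,
		 * 256-511, 512-1023, 1024-1518 and >1518 bytes, in that order.
		 */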
74 		if (pkt_len >= 1024)
75 			stats->size_bins[6 + (pkt_len > 1518)]++;
76 		else if (pkt_len <= 64)
77 			stats->size_bins[pkt_len >> 6]++;
78 		else
79 			stats->size_bins[32UL - rte_clz32(pkt_len) - 5]++;
80 
81 		ea = rte_pktmbuf_mtod(pkt, const struct rte_ether_addr *);
82 		RTE_BUILD_BUG_ON(offsetof(struct virtqueue_stats, broadcast) !=
83 				offsetof(struct virtqueue_stats, multicast) + sizeof(uint64_t));
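		/* The build-time check above guarantees that the broadcast
		 * counter directly follows the multicast one, so indexing
		 * &stats->multicast with a 0/1 boolean bumps the right field.
		 */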
84 		if (unlikely(rte_is_multicast_ether_addr(ea)))
85 			(&stats->multicast)[rte_is_broadcast_ether_addr(ea)]++;
86 	}
87 }
88 
89 static __rte_always_inline int64_t
90 vhost_async_dma_transfer_one(struct virtio_net *dev, struct vhost_virtqueue *vq,
91 		int16_t dma_id, uint16_t vchan_id, uint16_t flag_idx,
92 		struct vhost_iov_iter *pkt)
93 	__rte_shared_locks_required(&vq->access_lock)
94 {
95 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
96 	uint16_t ring_mask = dma_info->ring_mask;
97 	static bool vhost_async_dma_copy_log;
98 
99 
100 	struct vhost_iovec *iov = pkt->iov;
101 	int copy_idx = 0;
102 	uint32_t nr_segs = pkt->nr_segs;
103 	uint16_t i;
104 
105 	if (rte_dma_burst_capacity(dma_id, vchan_id) < nr_segs)
106 		return -1;
107 
108 	for (i = 0; i < nr_segs; i++) {
109 		copy_idx = rte_dma_copy(dma_id, vchan_id, (rte_iova_t)iov[i].src_addr,
110 				(rte_iova_t)iov[i].dst_addr, iov[i].len, RTE_DMA_OP_FLAG_LLC);
111 		/**
112 		 * Since all memory is pinned and the DMA vChannel
113 		 * ring has enough space, failure should be a
114 		 * rare case. If a failure does happen, the DMA
115 		 * device has encountered serious errors; in that
116 		 * case, stop the async data path and check what
117 		 * has happened to the DMA device.
118 		 */
119 		if (unlikely(copy_idx < 0)) {
120 			if (!vhost_async_dma_copy_log) {
121 				VHOST_DATA_LOG(dev->ifname, ERR,
122 					"DMA copy failed for channel %d:%u",
123 					dma_id, vchan_id);
124 				vhost_async_dma_copy_log = true;
125 			}
126 			return -1;
127 		}
128 	}
129 
130 	/**
131 	 * Only store the packet completion flag address in the last copy's
132 	 * slot; the other slots are set to NULL.
133 	 */
134 	dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask] = &vq->async->pkts_cmpl_flag[flag_idx];
135 
136 	return nr_segs;
137 }
138 
139 static __rte_always_inline uint16_t
140 vhost_async_dma_transfer(struct virtio_net *dev, struct vhost_virtqueue *vq,
141 		int16_t dma_id, uint16_t vchan_id, uint16_t head_idx,
142 		struct vhost_iov_iter *pkts, uint16_t nr_pkts)
143 	__rte_shared_locks_required(&vq->access_lock)
144 {
145 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
146 	int64_t ret, nr_copies = 0;
147 	uint16_t pkt_idx;
148 
149 	rte_spinlock_lock(&dma_info->dma_lock);
150 
151 	for (pkt_idx = 0; pkt_idx < nr_pkts; pkt_idx++) {
152 		ret = vhost_async_dma_transfer_one(dev, vq, dma_id, vchan_id, head_idx,
153 				&pkts[pkt_idx]);
154 		if (unlikely(ret < 0))
155 			break;
156 
157 		nr_copies += ret;
158 		head_idx++;
159 		if (head_idx >= vq->size)
160 			head_idx -= vq->size;
161 	}
162 
163 	if (likely(nr_copies > 0))
164 		rte_dma_submit(dma_id, vchan_id);
165 
166 	rte_spinlock_unlock(&dma_info->dma_lock);
167 
168 	return pkt_idx;
169 }
170 
171 static __rte_always_inline uint16_t
172 vhost_async_dma_check_completed(struct virtio_net *dev, int16_t dma_id, uint16_t vchan_id,
173 		uint16_t max_pkts)
174 {
175 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
176 	uint16_t ring_mask = dma_info->ring_mask;
177 	uint16_t last_idx = 0;
178 	uint16_t nr_copies;
179 	uint16_t copy_idx;
180 	uint16_t i;
181 	bool has_error = false;
182 	static bool vhost_async_dma_complete_log;
183 
184 	rte_spinlock_lock(&dma_info->dma_lock);
185 
186 	/**
187 	 * Print an error log for debugging if the DMA device reports an
188 	 * error during the transfer. Errors are not handled at the vhost level.
189 	 */
190 	nr_copies = rte_dma_completed(dma_id, vchan_id, max_pkts, &last_idx, &has_error);
191 	if (unlikely(!vhost_async_dma_complete_log && has_error)) {
192 		VHOST_DATA_LOG(dev->ifname, ERR,
193 			"DMA completion failure on channel %d:%u",
194 			dma_id, vchan_id);
195 		vhost_async_dma_complete_log = true;
196 	} else if (nr_copies == 0) {
197 		goto out;
198 	}
199 
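	/*
	 * rte_dma_completed() reports the ring index of the last finished
	 * copy, so the first finished one is nr_copies - 1 slots earlier.
	 */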
200 	copy_idx = last_idx - nr_copies + 1;
201 	for (i = 0; i < nr_copies; i++) {
202 		bool *flag;
203 
204 		flag = dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask];
205 		if (flag) {
206 			/**
207 			 * Mark the packet flag as received. The flag
208 			 * could belong to another virtqueue, but the
209 			 * write is atomic.
210 			 */
211 			*flag = true;
212 			dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask] = NULL;
213 		}
214 		copy_idx++;
215 	}
216 
217 out:
218 	rte_spinlock_unlock(&dma_info->dma_lock);
219 	return nr_copies;
220 }
221 
222 static inline void
223 do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
224 	__rte_shared_locks_required(&vq->iotlb_lock)
225 {
226 	struct batch_copy_elem *elem = vq->batch_copy_elems;
227 	uint16_t count = vq->batch_copy_nb_elems;
228 	int i;
229 
230 	for (i = 0; i < count; i++) {
231 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
232 		vhost_log_cache_write_iova(dev, vq, elem[i].log_addr,
233 					   elem[i].len);
234 		PRINT_PACKET(dev, (uintptr_t)elem[i].dst, elem[i].len, 0);
235 	}
236 
237 	vq->batch_copy_nb_elems = 0;
238 }
239 
240 static inline void
241 do_data_copy_dequeue(struct vhost_virtqueue *vq)
242 {
243 	struct batch_copy_elem *elem = vq->batch_copy_elems;
244 	uint16_t count = vq->batch_copy_nb_elems;
245 	int i;
246 
247 	for (i = 0; i < count; i++)
248 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
249 
250 	vq->batch_copy_nb_elems = 0;
251 }
252 
253 static __rte_always_inline void
254 do_flush_shadow_used_ring_split(struct virtio_net *dev,
255 			struct vhost_virtqueue *vq,
256 			uint16_t to, uint16_t from, uint16_t size)
257 {
258 	rte_memcpy(&vq->used->ring[to],
259 			&vq->shadow_used_split[from],
260 			size * sizeof(struct vring_used_elem));
261 	vhost_log_cache_used_vring(dev, vq,
262 			offsetof(struct vring_used, ring[to]),
263 			size * sizeof(struct vring_used_elem));
264 }
265 
266 static __rte_always_inline void
267 flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
268 {
269 	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
270 
271 	if (used_idx + vq->shadow_used_idx <= vq->size) {
272 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
273 					  vq->shadow_used_idx);
274 	} else {
275 		uint16_t size;
276 
277 		/* update the used ring interval [used_idx, vq->size) */
278 		size = vq->size - used_idx;
279 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
280 
281 		/* update the remaining used ring interval [0, shadow_used_idx - size) */
282 		do_flush_shadow_used_ring_split(dev, vq, 0, size,
283 					  vq->shadow_used_idx - size);
284 	}
285 	vq->last_used_idx += vq->shadow_used_idx;
286 
287 	vhost_log_cache_sync(dev, vq);
288 
289 	rte_atomic_fetch_add_explicit((unsigned short __rte_atomic *)&vq->used->idx,
290 		vq->shadow_used_idx, rte_memory_order_release);
291 	vq->shadow_used_idx = 0;
292 	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
293 		sizeof(vq->used->idx));
294 }
295 
296 static __rte_always_inline void
297 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
298 			 uint16_t desc_idx, uint32_t len)
299 {
300 	uint16_t i = vq->shadow_used_idx++;
301 
302 	vq->shadow_used_split[i].id  = desc_idx;
303 	vq->shadow_used_split[i].len = len;
304 }
305 
306 static __rte_always_inline void
307 vhost_flush_enqueue_shadow_packed(struct virtio_net *dev,
308 				  struct vhost_virtqueue *vq)
309 {
310 	int i;
311 	uint16_t used_idx = vq->last_used_idx;
312 	uint16_t head_idx = vq->last_used_idx;
313 	uint16_t head_flags = 0;
314 
315 	/* Split loop in two to save memory barriers */
316 	for (i = 0; i < vq->shadow_used_idx; i++) {
317 		vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
318 		vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
319 
320 		used_idx += vq->shadow_used_packed[i].count;
321 		if (used_idx >= vq->size)
322 			used_idx -= vq->size;
323 	}
324 
325 	/* The ordering for storing desc flags needs to be enforced. */
326 	rte_atomic_thread_fence(rte_memory_order_release);
327 
328 	for (i = 0; i < vq->shadow_used_idx; i++) {
329 		uint16_t flags;
330 
331 		if (vq->shadow_used_packed[i].len)
332 			flags = VRING_DESC_F_WRITE;
333 		else
334 			flags = 0;
335 
336 		if (vq->used_wrap_counter) {
337 			flags |= VRING_DESC_F_USED;
338 			flags |= VRING_DESC_F_AVAIL;
339 		} else {
340 			flags &= ~VRING_DESC_F_USED;
341 			flags &= ~VRING_DESC_F_AVAIL;
342 		}
343 
344 		if (i > 0) {
345 			vq->desc_packed[vq->last_used_idx].flags = flags;
346 
347 			vhost_log_cache_used_vring(dev, vq,
348 					vq->last_used_idx *
349 					sizeof(struct vring_packed_desc),
350 					sizeof(struct vring_packed_desc));
351 		} else {
352 			head_idx = vq->last_used_idx;
353 			head_flags = flags;
354 		}
355 
356 		vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
357 	}
358 
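	/*
	 * The head descriptor flags are written last, so the guest does not
	 * see the batch before all other descriptors are updated.
	 */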
359 	vq->desc_packed[head_idx].flags = head_flags;
360 
361 	vhost_log_cache_used_vring(dev, vq,
362 				head_idx *
363 				sizeof(struct vring_packed_desc),
364 				sizeof(struct vring_packed_desc));
365 
366 	vq->shadow_used_idx = 0;
367 	vhost_log_cache_sync(dev, vq);
368 }
369 
370 static __rte_always_inline void
371 vhost_flush_dequeue_shadow_packed(struct virtio_net *dev,
372 				  struct vhost_virtqueue *vq)
373 {
374 	struct vring_used_elem_packed *used_elem = &vq->shadow_used_packed[0];
375 
376 	vq->desc_packed[vq->shadow_last_used_idx].id = used_elem->id;
377 	/* The desc flags field is the synchronization point for the virtio packed vring */
378 	rte_atomic_store_explicit(
379 		(unsigned short __rte_atomic *)&vq->desc_packed[vq->shadow_last_used_idx].flags,
380 		used_elem->flags, rte_memory_order_release);
381 
382 	vhost_log_cache_used_vring(dev, vq, vq->shadow_last_used_idx *
383 				   sizeof(struct vring_packed_desc),
384 				   sizeof(struct vring_packed_desc));
385 	vq->shadow_used_idx = 0;
386 	vhost_log_cache_sync(dev, vq);
387 }
388 
389 static __rte_always_inline void
390 vhost_flush_enqueue_batch_packed(struct virtio_net *dev,
391 				 struct vhost_virtqueue *vq,
392 				 uint64_t *lens,
393 				 uint16_t *ids)
394 {
395 	uint16_t i;
396 	uint16_t flags;
397 	uint16_t last_used_idx;
398 	struct vring_packed_desc *desc_base;
399 
400 	last_used_idx = vq->last_used_idx;
401 	desc_base = &vq->desc_packed[last_used_idx];
402 
403 	flags = PACKED_DESC_ENQUEUE_USED_FLAG(vq->used_wrap_counter);
404 
405 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
406 		desc_base[i].id = ids[i];
407 		desc_base[i].len = lens[i];
408 	}
409 
410 	rte_atomic_thread_fence(rte_memory_order_release);
411 
412 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
413 		desc_base[i].flags = flags;
414 	}
415 
416 	vhost_log_cache_used_vring(dev, vq, last_used_idx *
417 				   sizeof(struct vring_packed_desc),
418 				   sizeof(struct vring_packed_desc) *
419 				   PACKED_BATCH_SIZE);
420 	vhost_log_cache_sync(dev, vq);
421 
422 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
423 }
424 
425 static __rte_always_inline void
426 vhost_async_shadow_enqueue_packed_batch(struct vhost_virtqueue *vq,
427 				 uint64_t *lens,
428 				 uint16_t *ids)
429 	__rte_exclusive_locks_required(&vq->access_lock)
430 {
431 	uint16_t i;
432 	struct vhost_async *async = vq->async;
433 
434 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
435 		async->buffers_packed[async->buffer_idx_packed].id  = ids[i];
436 		async->buffers_packed[async->buffer_idx_packed].len = lens[i];
437 		async->buffers_packed[async->buffer_idx_packed].count = 1;
438 		async->buffer_idx_packed++;
439 		if (async->buffer_idx_packed >= vq->size)
440 			async->buffer_idx_packed -= vq->size;
441 	}
442 }
443 
444 static __rte_always_inline void
445 vhost_async_shadow_dequeue_packed_batch(struct vhost_virtqueue *vq, uint16_t *ids)
446 	__rte_shared_locks_required(&vq->access_lock)
447 {
448 	uint16_t i;
449 	struct vhost_async *async = vq->async;
450 
451 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
452 		async->buffers_packed[async->buffer_idx_packed].id  = ids[i];
453 		async->buffers_packed[async->buffer_idx_packed].len = 0;
454 		async->buffers_packed[async->buffer_idx_packed].count = 1;
455 
456 		async->buffer_idx_packed++;
457 		if (async->buffer_idx_packed >= vq->size)
458 			async->buffer_idx_packed -= vq->size;
459 	}
460 }
461 
462 static __rte_always_inline void
463 vhost_shadow_dequeue_batch_packed_inorder(struct vhost_virtqueue *vq,
464 					  uint16_t id)
465 {
466 	vq->shadow_used_packed[0].id = id;
467 
468 	if (!vq->shadow_used_idx) {
469 		vq->shadow_last_used_idx = vq->last_used_idx;
470 		vq->shadow_used_packed[0].flags =
471 			PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
472 		vq->shadow_used_packed[0].len = 0;
473 		vq->shadow_used_packed[0].count = 1;
474 		vq->shadow_used_idx++;
475 	}
476 
477 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
478 }
479 
480 static __rte_always_inline void
481 vhost_shadow_dequeue_batch_packed(struct virtio_net *dev,
482 				  struct vhost_virtqueue *vq,
483 				  uint16_t *ids)
484 {
485 	uint16_t flags;
486 	uint16_t i;
487 	uint16_t begin;
488 
489 	flags = PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
490 
491 	if (!vq->shadow_used_idx) {
492 		vq->shadow_last_used_idx = vq->last_used_idx;
493 		vq->shadow_used_packed[0].id  = ids[0];
494 		vq->shadow_used_packed[0].len = 0;
495 		vq->shadow_used_packed[0].count = 1;
496 		vq->shadow_used_packed[0].flags = flags;
497 		vq->shadow_used_idx++;
498 		begin = 1;
499 	} else
500 		begin = 0;
501 
502 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE) {
503 		vq->desc_packed[vq->last_used_idx + i].id = ids[i];
504 		vq->desc_packed[vq->last_used_idx + i].len = 0;
505 	}
506 
507 	rte_atomic_thread_fence(rte_memory_order_release);
508 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE)
509 		vq->desc_packed[vq->last_used_idx + i].flags = flags;
510 
511 	vhost_log_cache_used_vring(dev, vq, vq->last_used_idx *
512 				   sizeof(struct vring_packed_desc),
513 				   sizeof(struct vring_packed_desc) *
514 				   PACKED_BATCH_SIZE);
515 	vhost_log_cache_sync(dev, vq);
516 
517 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
518 }
519 
520 static __rte_always_inline void
521 vhost_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
522 				   uint16_t buf_id,
523 				   uint16_t count)
524 {
525 	uint16_t flags;
526 
527 	flags = vq->desc_packed[vq->last_used_idx].flags;
528 	if (vq->used_wrap_counter) {
529 		flags |= VRING_DESC_F_USED;
530 		flags |= VRING_DESC_F_AVAIL;
531 	} else {
532 		flags &= ~VRING_DESC_F_USED;
533 		flags &= ~VRING_DESC_F_AVAIL;
534 	}
535 
536 	if (!vq->shadow_used_idx) {
537 		vq->shadow_last_used_idx = vq->last_used_idx;
538 
539 		vq->shadow_used_packed[0].id  = buf_id;
540 		vq->shadow_used_packed[0].len = 0;
541 		vq->shadow_used_packed[0].flags = flags;
542 		vq->shadow_used_idx++;
543 	} else {
544 		vq->desc_packed[vq->last_used_idx].id = buf_id;
545 		vq->desc_packed[vq->last_used_idx].len = 0;
546 		vq->desc_packed[vq->last_used_idx].flags = flags;
547 	}
548 
549 	vq_inc_last_used_packed(vq, count);
550 }
551 
552 static __rte_always_inline void
553 vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
554 					   uint16_t buf_id,
555 					   uint16_t count)
556 {
557 	uint16_t flags;
558 
559 	vq->shadow_used_packed[0].id = buf_id;
560 
561 	flags = vq->desc_packed[vq->last_used_idx].flags;
562 	if (vq->used_wrap_counter) {
563 		flags |= VRING_DESC_F_USED;
564 		flags |= VRING_DESC_F_AVAIL;
565 	} else {
566 		flags &= ~VRING_DESC_F_USED;
567 		flags &= ~VRING_DESC_F_AVAIL;
568 	}
569 
570 	if (!vq->shadow_used_idx) {
571 		vq->shadow_last_used_idx = vq->last_used_idx;
572 		vq->shadow_used_packed[0].len = 0;
573 		vq->shadow_used_packed[0].flags = flags;
574 		vq->shadow_used_idx++;
575 	}
576 
577 	vq_inc_last_used_packed(vq, count);
578 }
579 
580 static __rte_always_inline void
581 vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
582 				   uint32_t *len,
583 				   uint16_t *id,
584 				   uint16_t *count,
585 				   uint16_t num_buffers)
586 {
587 	uint16_t i;
588 
589 	for (i = 0; i < num_buffers; i++) {
590 		/* enqueue shadow flush action aligned with batch num */
591 		if (!vq->shadow_used_idx)
592 			vq->shadow_aligned_idx = vq->last_used_idx &
593 				PACKED_BATCH_MASK;
594 		vq->shadow_used_packed[vq->shadow_used_idx].id  = id[i];
595 		vq->shadow_used_packed[vq->shadow_used_idx].len = len[i];
596 		vq->shadow_used_packed[vq->shadow_used_idx].count = count[i];
597 		vq->shadow_aligned_idx += count[i];
598 		vq->shadow_used_idx++;
599 	}
600 }
601 
602 static __rte_always_inline void
603 vhost_async_shadow_enqueue_packed(struct vhost_virtqueue *vq,
604 				   uint32_t *len,
605 				   uint16_t *id,
606 				   uint16_t *count,
607 				   uint16_t num_buffers)
608 	__rte_exclusive_locks_required(&vq->access_lock)
609 {
610 	uint16_t i;
611 	struct vhost_async *async = vq->async;
612 
613 	for (i = 0; i < num_buffers; i++) {
614 		async->buffers_packed[async->buffer_idx_packed].id  = id[i];
615 		async->buffers_packed[async->buffer_idx_packed].len = len[i];
616 		async->buffers_packed[async->buffer_idx_packed].count = count[i];
617 		async->buffer_idx_packed++;
618 		if (async->buffer_idx_packed >= vq->size)
619 			async->buffer_idx_packed -= vq->size;
620 	}
621 }
622 
623 static __rte_always_inline void
624 vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
625 				   struct vhost_virtqueue *vq,
626 				   uint32_t *len,
627 				   uint16_t *id,
628 				   uint16_t *count,
629 				   uint16_t num_buffers)
630 	__rte_shared_locks_required(&vq->iotlb_lock)
631 {
632 	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
633 
634 	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
635 		do_data_copy_enqueue(dev, vq);
636 		vhost_flush_enqueue_shadow_packed(dev, vq);
637 	}
638 }
639 
640 /* avoid unnecessary write operations, to lessen cache issues */
641 #define ASSIGN_UNLESS_EQUAL(var, val) do {	\
642 	if ((var) != (val))			\
643 		(var) = (val);			\
644 } while (0)
645 
646 static __rte_always_inline void
647 virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
648 {
649 	uint64_t csum_l4 = m_buf->ol_flags & RTE_MBUF_F_TX_L4_MASK;
650 
651 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG)
652 		csum_l4 |= RTE_MBUF_F_TX_TCP_CKSUM;
653 
654 	if (csum_l4) {
655 		/*
656 		 * Pseudo-header checksum must be set as per Virtio spec.
657 		 *
658 		 * Note: We don't propagate rte_net_intel_cksum_prepare()
659 		 * errors, as it would have an impact on performance, and an
660 		 * error would mean the packet is dropped by the guest instead
661 		 * of being dropped here.
662 		 */
663 		rte_net_intel_cksum_prepare(m_buf);
664 
665 		net_hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
666 		net_hdr->csum_start = m_buf->l2_len + m_buf->l3_len;
667 
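		/*
		 * csum_offset is relative to csum_start; e.g. for TCP the
		 * checksum field sits 16 bytes into the L4 header.
		 */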
668 		switch (csum_l4) {
669 		case RTE_MBUF_F_TX_TCP_CKSUM:
670 			net_hdr->csum_offset = (offsetof(struct rte_tcp_hdr,
671 						cksum));
672 			break;
673 		case RTE_MBUF_F_TX_UDP_CKSUM:
674 			net_hdr->csum_offset = (offsetof(struct rte_udp_hdr,
675 						dgram_cksum));
676 			break;
677 		case RTE_MBUF_F_TX_SCTP_CKSUM:
678 			net_hdr->csum_offset = (offsetof(struct rte_sctp_hdr,
679 						cksum));
680 			break;
681 		}
682 	} else {
683 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_start, 0);
684 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_offset, 0);
685 		ASSIGN_UNLESS_EQUAL(net_hdr->flags, 0);
686 	}
687 
688 	/* IP cksum verification cannot be bypassed, so calculate it here */
689 	if (m_buf->ol_flags & RTE_MBUF_F_TX_IP_CKSUM) {
690 		struct rte_ipv4_hdr *ipv4_hdr;
691 
692 		ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
693 						   m_buf->l2_len);
694 		ipv4_hdr->hdr_checksum = 0;
695 		ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
696 	}
697 
698 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG) {
699 		if (m_buf->ol_flags & RTE_MBUF_F_TX_IPV4)
700 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
701 		else
702 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
703 		net_hdr->gso_size = m_buf->tso_segsz;
704 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
705 					+ m_buf->l4_len;
706 	} else if (m_buf->ol_flags & RTE_MBUF_F_TX_UDP_SEG) {
707 		net_hdr->gso_type = VIRTIO_NET_HDR_GSO_UDP;
708 		net_hdr->gso_size = m_buf->tso_segsz;
709 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len +
710 			m_buf->l4_len;
711 	} else {
712 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0);
713 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_size, 0);
714 		ASSIGN_UNLESS_EQUAL(net_hdr->hdr_len, 0);
715 	}
716 }
717 
718 static __rte_always_inline int
719 map_one_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
720 		struct buf_vector *buf_vec, uint16_t *vec_idx,
721 		uint64_t desc_iova, uint64_t desc_len, uint8_t perm)
722 	__rte_shared_locks_required(&vq->iotlb_lock)
723 {
724 	uint16_t vec_id = *vec_idx;
725 
726 	while (desc_len) {
727 		uint64_t desc_addr;
728 		uint64_t desc_chunck_len = desc_len;
729 
730 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
731 			return -1;
732 
733 		desc_addr = vhost_iova_to_vva(dev, vq,
734 				desc_iova,
735 				&desc_chunck_len,
736 				perm);
737 		if (unlikely(!desc_addr))
738 			return -1;
739 
740 		rte_prefetch0((void *)(uintptr_t)desc_addr);
741 
742 		buf_vec[vec_id].buf_iova = desc_iova;
743 		buf_vec[vec_id].buf_addr = desc_addr;
744 		buf_vec[vec_id].buf_len  = desc_chunck_len;
745 
746 		desc_len -= desc_chunck_len;
747 		desc_iova += desc_chunck_len;
748 		vec_id++;
749 	}
750 	*vec_idx = vec_id;
751 
752 	return 0;
753 }
754 
755 static __rte_always_inline int
756 fill_vec_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
757 			 uint32_t avail_idx, uint16_t *vec_idx,
758 			 struct buf_vector *buf_vec, uint16_t *desc_chain_head,
759 			 uint32_t *desc_chain_len, uint8_t perm)
760 	__rte_shared_locks_required(&vq->iotlb_lock)
761 {
762 	uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
763 	uint16_t vec_id = *vec_idx;
764 	uint32_t len    = 0;
765 	uint64_t dlen;
766 	uint32_t nr_descs = vq->size;
767 	uint32_t cnt    = 0;
768 	struct vring_desc *descs = vq->desc;
769 	struct vring_desc *idesc = NULL;
770 
771 	if (unlikely(idx >= vq->size))
772 		return -1;
773 
774 	*desc_chain_head = idx;
775 
776 	if (vq->desc[idx].flags & VRING_DESC_F_INDIRECT) {
777 		dlen = vq->desc[idx].len;
778 		nr_descs = dlen / sizeof(struct vring_desc);
779 		if (unlikely(nr_descs > vq->size))
780 			return -1;
781 
782 		descs = (struct vring_desc *)(uintptr_t)
783 			vhost_iova_to_vva(dev, vq, vq->desc[idx].addr,
784 						&dlen,
785 						VHOST_ACCESS_RO);
786 		if (unlikely(!descs))
787 			return -1;
788 
789 		if (unlikely(dlen < vq->desc[idx].len)) {
790 			/*
791 			 * The indirect desc table is not contiguous
792 			 * in process VA space, so we have to copy it.
793 			 */
794 			idesc = vhost_alloc_copy_ind_table(dev, vq,
795 					vq->desc[idx].addr, vq->desc[idx].len);
796 			if (unlikely(!idesc))
797 				return -1;
798 
799 			descs = idesc;
800 		}
801 
802 		idx = 0;
803 	}
804 
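	/*
	 * cnt guards against descriptor chains that loop back on themselves,
	 * e.g. because of a buggy or malicious guest.
	 */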
805 	while (1) {
806 		if (unlikely(idx >= nr_descs || cnt++ >= nr_descs)) {
807 			free_ind_table(idesc);
808 			return -1;
809 		}
810 
811 		dlen = descs[idx].len;
812 		len += dlen;
813 
814 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
815 						descs[idx].addr, dlen,
816 						perm))) {
817 			free_ind_table(idesc);
818 			return -1;
819 		}
820 
821 		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
822 			break;
823 
824 		idx = descs[idx].next;
825 	}
826 
827 	*desc_chain_len = len;
828 	*vec_idx = vec_id;
829 
830 	if (unlikely(!!idesc))
831 		free_ind_table(idesc);
832 
833 	return 0;
834 }
835 
836 /*
837  * Returns -1 on failure, 0 on success
838  */
839 static inline int
840 reserve_avail_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
841 				uint64_t size, struct buf_vector *buf_vec,
842 				uint16_t *num_buffers, uint16_t avail_head,
843 				uint16_t *nr_vec)
844 	__rte_shared_locks_required(&vq->iotlb_lock)
845 {
846 	uint16_t cur_idx;
847 	uint16_t vec_idx = 0;
848 	uint16_t max_tries, tries = 0;
849 
850 	uint16_t head_idx = 0;
851 	uint32_t len = 0;
852 
853 	*num_buffers = 0;
854 	cur_idx  = vq->last_avail_idx;
855 
856 	if (rxvq_is_mergeable(dev))
857 		max_tries = vq->size - 1;
858 	else
859 		max_tries = 1;
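	/*
	 * Without mergeable RX buffers the packet must fit into a single
	 * descriptor chain, hence a single try.
	 */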
860 
861 	while (size > 0) {
862 		if (unlikely(cur_idx == avail_head))
863 			return -1;
864 		/*
865 		 * If we have tried all available ring items and still
866 		 * can't get enough buffers, something abnormal has
867 		 * happened.
868 		 */
869 		if (unlikely(++tries > max_tries))
870 			return -1;
871 
872 		if (unlikely(fill_vec_buf_split(dev, vq, cur_idx,
873 						&vec_idx, buf_vec,
874 						&head_idx, &len,
875 						VHOST_ACCESS_RW) < 0))
876 			return -1;
877 		len = RTE_MIN(len, size);
878 		update_shadow_used_ring_split(vq, head_idx, len);
879 		size -= len;
880 
881 		cur_idx++;
882 		*num_buffers += 1;
883 	}
884 
885 	*nr_vec = vec_idx;
886 
887 	return 0;
888 }
889 
890 static __rte_always_inline int
891 fill_vec_buf_packed_indirect(struct virtio_net *dev,
892 			struct vhost_virtqueue *vq,
893 			struct vring_packed_desc *desc, uint16_t *vec_idx,
894 			struct buf_vector *buf_vec, uint32_t *len, uint8_t perm)
895 	__rte_shared_locks_required(&vq->iotlb_lock)
896 {
897 	uint16_t i;
898 	uint32_t nr_descs;
899 	uint16_t vec_id = *vec_idx;
900 	uint64_t dlen;
901 	struct vring_packed_desc *descs, *idescs = NULL;
902 
903 	dlen = desc->len;
904 	descs = (struct vring_packed_desc *)(uintptr_t)
905 		vhost_iova_to_vva(dev, vq, desc->addr, &dlen, VHOST_ACCESS_RO);
906 	if (unlikely(!descs))
907 		return -1;
908 
909 	if (unlikely(dlen < desc->len)) {
910 		/*
911 		 * The indirect desc table is not contiguous
912 		 * in process VA space, so we have to copy it.
913 		 */
914 		idescs = vhost_alloc_copy_ind_table(dev,
915 				vq, desc->addr, desc->len);
916 		if (unlikely(!idescs))
917 			return -1;
918 
919 		descs = idescs;
920 	}
921 
922 	nr_descs =  desc->len / sizeof(struct vring_packed_desc);
923 	if (unlikely(nr_descs >= vq->size)) {
924 		free_ind_table(idescs);
925 		return -1;
926 	}
927 
928 	for (i = 0; i < nr_descs; i++) {
929 		if (unlikely(vec_id >= BUF_VECTOR_MAX)) {
930 			free_ind_table(idescs);
931 			return -1;
932 		}
933 
934 		dlen = descs[i].len;
935 		*len += dlen;
936 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
937 						descs[i].addr, dlen,
938 						perm)))
939 			return -1;
940 	}
941 	*vec_idx = vec_id;
942 
943 	if (unlikely(!!idescs))
944 		free_ind_table(idescs);
945 
946 	return 0;
947 }
948 
949 static __rte_always_inline int
950 fill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
951 				uint16_t avail_idx, uint16_t *desc_count,
952 				struct buf_vector *buf_vec, uint16_t *vec_idx,
953 				uint16_t *buf_id, uint32_t *len, uint8_t perm)
954 	__rte_shared_locks_required(&vq->iotlb_lock)
955 {
956 	bool wrap_counter = vq->avail_wrap_counter;
957 	struct vring_packed_desc *descs = vq->desc_packed;
958 	uint16_t vec_id = *vec_idx;
959 	uint64_t dlen;
960 
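	/*
	 * avail_idx below last_avail_idx means the ring index has wrapped,
	 * so the expected wrap counter has to be flipped.
	 */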
961 	if (avail_idx < vq->last_avail_idx)
962 		wrap_counter ^= 1;
963 
964 	/*
965 	 * Perform a load-acquire barrier in desc_is_avail to
966 	 * enforce the ordering between desc flags and desc
967 	 * content.
968 	 */
969 	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)))
970 		return -1;
971 
972 	*desc_count = 0;
973 	*len = 0;
974 
975 	while (1) {
976 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
977 			return -1;
978 
979 		if (unlikely(*desc_count >= vq->size))
980 			return -1;
981 
982 		*desc_count += 1;
983 		*buf_id = descs[avail_idx].id;
984 
985 		if (descs[avail_idx].flags & VRING_DESC_F_INDIRECT) {
986 			if (unlikely(fill_vec_buf_packed_indirect(dev, vq,
987 							&descs[avail_idx],
988 							&vec_id, buf_vec,
989 							len, perm) < 0))
990 				return -1;
991 		} else {
992 			dlen = descs[avail_idx].len;
993 			*len += dlen;
994 
995 			if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
996 							descs[avail_idx].addr,
997 							dlen,
998 							perm)))
999 				return -1;
1000 		}
1001 
1002 		if ((descs[avail_idx].flags & VRING_DESC_F_NEXT) == 0)
1003 			break;
1004 
1005 		if (++avail_idx >= vq->size) {
1006 			avail_idx -= vq->size;
1007 			wrap_counter ^= 1;
1008 		}
1009 	}
1010 
1011 	*vec_idx = vec_id;
1012 
1013 	return 0;
1014 }
1015 
1016 static __rte_noinline void
1017 copy_vnet_hdr_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
1018 		struct buf_vector *buf_vec,
1019 		struct virtio_net_hdr_mrg_rxbuf *hdr)
1020 	__rte_shared_locks_required(&vq->iotlb_lock)
1021 {
1022 	uint64_t len;
1023 	uint64_t remain = dev->vhost_hlen;
1024 	uint64_t src = (uint64_t)(uintptr_t)hdr, dst;
1025 	uint64_t iova = buf_vec->buf_iova;
1026 
1027 	while (remain) {
1028 		len = RTE_MIN(remain,
1029 				buf_vec->buf_len);
1030 		dst = buf_vec->buf_addr;
1031 		rte_memcpy((void *)(uintptr_t)dst,
1032 				(void *)(uintptr_t)src,
1033 				len);
1034 
1035 		PRINT_PACKET(dev, (uintptr_t)dst,
1036 				(uint32_t)len, 0);
1037 		vhost_log_cache_write_iova(dev, vq,
1038 				iova, len);
1039 
1040 		remain -= len;
1041 		iova += len;
1042 		src += len;
1043 		buf_vec++;
1044 	}
1045 }
1046 
1047 static __rte_always_inline int
1048 async_iter_initialize(struct virtio_net *dev, struct vhost_async *async)
1049 {
1050 	struct vhost_iov_iter *iter;
1051 
1052 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
1053 		VHOST_DATA_LOG(dev->ifname, ERR, "no more async iovec available");
1054 		return -1;
1055 	}
1056 
1057 	iter = async->iov_iter + async->iter_idx;
1058 	iter->iov = async->iovec + async->iovec_idx;
1059 	iter->nr_segs = 0;
1060 
1061 	return 0;
1062 }
1063 
1064 static __rte_always_inline int
1065 async_iter_add_iovec(struct virtio_net *dev, struct vhost_async *async,
1066 		void *src, void *dst, size_t len)
1067 {
1068 	struct vhost_iov_iter *iter;
1069 	struct vhost_iovec *iovec;
1070 
1071 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
1072 		static bool vhost_max_async_vec_log;
1073 
1074 		if (!vhost_max_async_vec_log) {
1075 			VHOST_DATA_LOG(dev->ifname, ERR, "no more async iovec available");
1076 			vhost_max_async_vec_log = true;
1077 		}
1078 
1079 		return -1;
1080 	}
1081 
1082 	iter = async->iov_iter + async->iter_idx;
1083 	iovec = async->iovec + async->iovec_idx;
1084 
1085 	iovec->src_addr = src;
1086 	iovec->dst_addr = dst;
1087 	iovec->len = len;
1088 
1089 	iter->nr_segs++;
1090 	async->iovec_idx++;
1091 
1092 	return 0;
1093 }
1094 
1095 static __rte_always_inline void
1096 async_iter_finalize(struct vhost_async *async)
1097 {
1098 	async->iter_idx++;
1099 }
1100 
1101 static __rte_always_inline void
1102 async_iter_cancel(struct vhost_async *async)
1103 {
1104 	struct vhost_iov_iter *iter;
1105 
1106 	iter = async->iov_iter + async->iter_idx;
1107 	async->iovec_idx -= iter->nr_segs;
1108 	iter->nr_segs = 0;
1109 	iter->iov = NULL;
1110 }
1111 
1112 static __rte_always_inline void
1113 async_iter_reset(struct vhost_async *async)
1114 {
1115 	async->iter_idx = 0;
1116 	async->iovec_idx = 0;
1117 }
1118 
1119 static __rte_always_inline int
1120 async_fill_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
1121 		struct rte_mbuf *m, uint32_t mbuf_offset,
1122 		uint64_t buf_iova, uint32_t cpy_len, bool to_desc)
1123 	__rte_shared_locks_required(&vq->access_lock)
1124 	__rte_shared_locks_required(&vq->iotlb_lock)
1125 {
1126 	struct vhost_async *async = vq->async;
1127 	uint64_t mapped_len;
1128 	uint32_t buf_offset = 0;
1129 	void *src, *dst;
1130 	void *host_iova;
1131 
1132 	while (cpy_len) {
1133 		host_iova = (void *)(uintptr_t)gpa_to_first_hpa(dev,
1134 				buf_iova + buf_offset, cpy_len, &mapped_len);
1135 		if (unlikely(!host_iova)) {
1136 			VHOST_DATA_LOG(dev->ifname, ERR,
1137 				"%s: failed to get host iova.",
1138 				__func__);
1139 			return -1;
1140 		}
1141 
1142 		if (to_desc) {
1143 			src = (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset);
1144 			dst = host_iova;
1145 		} else {
1146 			src = host_iova;
1147 			dst = (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset);
1148 		}
1149 
1150 		if (unlikely(async_iter_add_iovec(dev, async, src, dst, (size_t)mapped_len)))
1151 			return -1;
1152 
1153 		cpy_len -= (uint32_t)mapped_len;
1154 		mbuf_offset += (uint32_t)mapped_len;
1155 		buf_offset += (uint32_t)mapped_len;
1156 	}
1157 
1158 	return 0;
1159 }
1160 
1161 static __rte_always_inline void
1162 sync_fill_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
1163 		struct rte_mbuf *m, uint32_t mbuf_offset,
1164 		uint64_t buf_addr, uint64_t buf_iova, uint32_t cpy_len, bool to_desc)
1165 	__rte_shared_locks_required(&vq->iotlb_lock)
1166 {
1167 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
1168 
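	/*
	 * Large copies, or copies once the batch array is full, are done
	 * immediately; small ones are queued in batch_copy_elems and flushed
	 * later by do_data_copy_enqueue() / do_data_copy_dequeue().
	 */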
1169 	if (likely(cpy_len > MAX_BATCH_LEN || vq->batch_copy_nb_elems >= vq->size)) {
1170 		if (to_desc) {
1171 			rte_memcpy((void *)((uintptr_t)(buf_addr)),
1172 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
1173 				cpy_len);
1174 			vhost_log_cache_write_iova(dev, vq, buf_iova, cpy_len);
1175 			PRINT_PACKET(dev, (uintptr_t)(buf_addr), cpy_len, 0);
1176 		} else {
1177 			rte_memcpy(rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
1178 				(void *)((uintptr_t)(buf_addr)),
1179 				cpy_len);
1180 		}
1181 	} else {
1182 		if (to_desc) {
1183 			batch_copy[vq->batch_copy_nb_elems].dst =
1184 				(void *)((uintptr_t)(buf_addr));
1185 			batch_copy[vq->batch_copy_nb_elems].src =
1186 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
1187 			batch_copy[vq->batch_copy_nb_elems].log_addr = buf_iova;
1188 		} else {
1189 			batch_copy[vq->batch_copy_nb_elems].dst =
1190 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
1191 			batch_copy[vq->batch_copy_nb_elems].src =
1192 				(void *)((uintptr_t)(buf_addr));
1193 		}
1194 		batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
1195 		vq->batch_copy_nb_elems++;
1196 	}
1197 }
1198 
1199 static __rte_always_inline int
1200 mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
1201 		struct rte_mbuf *m, struct buf_vector *buf_vec,
1202 		uint16_t nr_vec, uint16_t num_buffers, bool is_async)
1203 	__rte_shared_locks_required(&vq->access_lock)
1204 	__rte_shared_locks_required(&vq->iotlb_lock)
1205 {
1206 	uint32_t vec_idx = 0;
1207 	uint32_t mbuf_offset, mbuf_avail;
1208 	uint32_t buf_offset, buf_avail;
1209 	uint64_t buf_addr, buf_iova, buf_len;
1210 	uint32_t cpy_len;
1211 	uint64_t hdr_addr;
1212 	struct rte_mbuf *hdr_mbuf;
1213 	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
1214 	struct vhost_async *async = vq->async;
1215 
1216 	if (unlikely(m == NULL))
1217 		return -1;
1218 
1219 	buf_addr = buf_vec[vec_idx].buf_addr;
1220 	buf_iova = buf_vec[vec_idx].buf_iova;
1221 	buf_len = buf_vec[vec_idx].buf_len;
1222 
1223 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
1224 		return -1;
1225 
1226 	hdr_mbuf = m;
1227 	hdr_addr = buf_addr;
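	/*
	 * If the first buffer cannot hold the whole virtio-net header, build
	 * the header in tmp_hdr and copy it to the guest buffers piecewise
	 * later via copy_vnet_hdr_to_desc().
	 */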
1228 	if (unlikely(buf_len < dev->vhost_hlen)) {
1229 		memset(&tmp_hdr, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
1230 		hdr = &tmp_hdr;
1231 	} else
1232 		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
1233 
1234 	VHOST_DATA_LOG(dev->ifname, DEBUG, "RX: num merge buffers %d", num_buffers);
1235 
1236 	if (unlikely(buf_len < dev->vhost_hlen)) {
1237 		buf_offset = dev->vhost_hlen - buf_len;
1238 		vec_idx++;
1239 		buf_addr = buf_vec[vec_idx].buf_addr;
1240 		buf_iova = buf_vec[vec_idx].buf_iova;
1241 		buf_len = buf_vec[vec_idx].buf_len;
1242 		buf_avail = buf_len - buf_offset;
1243 	} else {
1244 		buf_offset = dev->vhost_hlen;
1245 		buf_avail = buf_len - dev->vhost_hlen;
1246 	}
1247 
1248 	mbuf_avail  = rte_pktmbuf_data_len(m);
1249 	mbuf_offset = 0;
1250 
1251 	if (is_async) {
1252 		if (async_iter_initialize(dev, async))
1253 			return -1;
1254 	}
1255 
1256 	while (mbuf_avail != 0 || m->next != NULL) {
1257 		/* done with current buf, get the next one */
1258 		if (buf_avail == 0) {
1259 			vec_idx++;
1260 			if (unlikely(vec_idx >= nr_vec))
1261 				goto error;
1262 
1263 			buf_addr = buf_vec[vec_idx].buf_addr;
1264 			buf_iova = buf_vec[vec_idx].buf_iova;
1265 			buf_len = buf_vec[vec_idx].buf_len;
1266 
1267 			buf_offset = 0;
1268 			buf_avail  = buf_len;
1269 		}
1270 
1271 		/* done with current mbuf, get the next one */
1272 		if (mbuf_avail == 0) {
1273 			m = m->next;
1274 
1275 			mbuf_offset = 0;
1276 			mbuf_avail  = rte_pktmbuf_data_len(m);
1277 		}
1278 
1279 		if (hdr_addr) {
1280 			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
1281 			if (rxvq_is_mergeable(dev))
1282 				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
1283 						num_buffers);
1284 
1285 			if (unlikely(hdr == &tmp_hdr)) {
1286 				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
1287 			} else {
1288 				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
1289 						dev->vhost_hlen, 0);
1290 				vhost_log_cache_write_iova(dev, vq,
1291 						buf_vec[0].buf_iova,
1292 						dev->vhost_hlen);
1293 			}
1294 
1295 			hdr_addr = 0;
1296 		}
1297 
1298 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
1299 
1300 		if (is_async) {
1301 			if (async_fill_seg(dev, vq, m, mbuf_offset,
1302 					   buf_iova + buf_offset, cpy_len, true) < 0)
1303 				goto error;
1304 		} else {
1305 			sync_fill_seg(dev, vq, m, mbuf_offset,
1306 				      buf_addr + buf_offset,
1307 				      buf_iova + buf_offset, cpy_len, true);
1308 		}
1309 
1310 		mbuf_avail  -= cpy_len;
1311 		mbuf_offset += cpy_len;
1312 		buf_avail  -= cpy_len;
1313 		buf_offset += cpy_len;
1314 	}
1315 
1316 	if (is_async)
1317 		async_iter_finalize(async);
1318 
1319 	return 0;
1320 error:
1321 	if (is_async)
1322 		async_iter_cancel(async);
1323 
1324 	return -1;
1325 }
1326 
1327 static __rte_always_inline int
1328 vhost_enqueue_single_packed(struct virtio_net *dev,
1329 			    struct vhost_virtqueue *vq,
1330 			    struct rte_mbuf *pkt,
1331 			    struct buf_vector *buf_vec,
1332 			    uint16_t *nr_descs)
1333 	__rte_shared_locks_required(&vq->access_lock)
1334 	__rte_shared_locks_required(&vq->iotlb_lock)
1335 {
1336 	uint16_t nr_vec = 0;
1337 	uint16_t avail_idx = vq->last_avail_idx;
1338 	uint16_t max_tries, tries = 0;
1339 	uint16_t buf_id = 0;
1340 	uint32_t len = 0;
1341 	uint16_t desc_count;
1342 	uint64_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1343 	uint16_t num_buffers = 0;
1344 	uint32_t buffer_len[vq->size];
1345 	uint16_t buffer_buf_id[vq->size];
1346 	uint16_t buffer_desc_count[vq->size];
1347 
1348 	if (rxvq_is_mergeable(dev))
1349 		max_tries = vq->size - 1;
1350 	else
1351 		max_tries = 1;
1352 
1353 	while (size > 0) {
1354 		/*
1355 		 * If we have tried all available ring items and still
1356 		 * can't get enough buffers, something abnormal has
1357 		 * happened.
1358 		 */
1359 		if (unlikely(++tries > max_tries))
1360 			return -1;
1361 
1362 		if (unlikely(fill_vec_buf_packed(dev, vq,
1363 						avail_idx, &desc_count,
1364 						buf_vec, &nr_vec,
1365 						&buf_id, &len,
1366 						VHOST_ACCESS_RW) < 0))
1367 			return -1;
1368 
1369 		len = RTE_MIN(len, size);
1370 		size -= len;
1371 
1372 		buffer_len[num_buffers] = len;
1373 		buffer_buf_id[num_buffers] = buf_id;
1374 		buffer_desc_count[num_buffers] = desc_count;
1375 		num_buffers += 1;
1376 
1377 		*nr_descs += desc_count;
1378 		avail_idx += desc_count;
1379 		if (avail_idx >= vq->size)
1380 			avail_idx -= vq->size;
1381 	}
1382 
1383 	if (mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, num_buffers, false) < 0)
1384 		return -1;
1385 
1386 	vhost_shadow_enqueue_single_packed(dev, vq, buffer_len, buffer_buf_id,
1387 					   buffer_desc_count, num_buffers);
1388 
1389 	return 0;
1390 }
1391 
1392 static __rte_noinline uint32_t
1393 virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1394 	struct rte_mbuf **pkts, uint32_t count)
1395 	__rte_shared_locks_required(&vq->access_lock)
1396 	__rte_shared_locks_required(&vq->iotlb_lock)
1397 {
1398 	uint32_t pkt_idx = 0;
1399 	uint16_t num_buffers;
1400 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1401 	uint16_t avail_head;
1402 
1403 	/*
1404 	 * The ordering between avail index and
1405 	 * desc reads needs to be enforced.
1406 	 */
1407 	avail_head = rte_atomic_load_explicit((unsigned short __rte_atomic *)&vq->avail->idx,
1408 		rte_memory_order_acquire);
1409 
1410 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1411 
1412 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1413 		uint64_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1414 		uint16_t nr_vec = 0;
1415 
1416 		if (unlikely(reserve_avail_buf_split(dev, vq,
1417 						pkt_len, buf_vec, &num_buffers,
1418 						avail_head, &nr_vec) < 0)) {
1419 			VHOST_DATA_LOG(dev->ifname, DEBUG,
1420 				"failed to get enough desc from vring");
1421 			vq->shadow_used_idx -= num_buffers;
1422 			break;
1423 		}
1424 
1425 		VHOST_DATA_LOG(dev->ifname, DEBUG,
1426 			"current index %d | end index %d",
1427 			vq->last_avail_idx, vq->last_avail_idx + num_buffers);
1428 
1429 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec,
1430 					num_buffers, false) < 0) {
1431 			vq->shadow_used_idx -= num_buffers;
1432 			break;
1433 		}
1434 
1435 		vq->last_avail_idx += num_buffers;
1436 		vhost_virtqueue_reconnect_log_split(vq);
1437 	}
1438 
1439 	do_data_copy_enqueue(dev, vq);
1440 
1441 	if (likely(vq->shadow_used_idx)) {
1442 		flush_shadow_used_ring_split(dev, vq);
1443 		vhost_vring_call_split(dev, vq);
1444 	}
1445 
1446 	return pkt_idx;
1447 }
1448 
1449 static __rte_always_inline int
1450 virtio_dev_rx_sync_batch_check(struct virtio_net *dev,
1451 			   struct vhost_virtqueue *vq,
1452 			   struct rte_mbuf **pkts,
1453 			   uint64_t *desc_addrs,
1454 			   uint64_t *lens)
1455 	__rte_shared_locks_required(&vq->iotlb_lock)
1456 {
1457 	bool wrap_counter = vq->avail_wrap_counter;
1458 	struct vring_packed_desc *descs = vq->desc_packed;
1459 	uint16_t avail_idx = vq->last_avail_idx;
1460 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1461 	uint16_t i;
1462 
1463 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1464 		return -1;
1465 
1466 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1467 		return -1;
1468 
1469 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1470 		if (unlikely(pkts[i]->next != NULL))
1471 			return -1;
1472 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1473 					    wrap_counter)))
1474 			return -1;
1475 	}
1476 
1477 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1478 		lens[i] = descs[avail_idx + i].len;
1479 
1480 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1481 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1482 			return -1;
1483 	}
1484 
1485 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1486 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
1487 						  descs[avail_idx + i].addr,
1488 						  &lens[i],
1489 						  VHOST_ACCESS_RW);
1490 
1491 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1492 		if (unlikely(!desc_addrs[i]))
1493 			return -1;
1494 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1495 			return -1;
1496 	}
1497 
1498 	return 0;
1499 }
1500 
1501 static __rte_always_inline int
1502 virtio_dev_rx_async_batch_check(struct vhost_virtqueue *vq,
1503 			   struct rte_mbuf **pkts,
1504 			   uint64_t *desc_addrs,
1505 			   uint64_t *lens,
1506 			   int16_t dma_id,
1507 			   uint16_t vchan_id)
1508 {
1509 	bool wrap_counter = vq->avail_wrap_counter;
1510 	struct vring_packed_desc *descs = vq->desc_packed;
1511 	uint16_t avail_idx = vq->last_avail_idx;
1512 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1513 	uint16_t i;
1514 
1515 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1516 		return -1;
1517 
1518 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1519 		return -1;
1520 
1521 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1522 		if (unlikely(pkts[i]->next != NULL))
1523 			return -1;
1524 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1525 					    wrap_counter)))
1526 			return -1;
1527 	}
1528 
1529 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1530 		lens[i] = descs[avail_idx + i].len;
1531 
1532 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1533 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1534 			return -1;
1535 	}
1536 
1537 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1538 		desc_addrs[i] =  descs[avail_idx + i].addr;
1539 
1540 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1541 		if (unlikely(!desc_addrs[i]))
1542 			return -1;
1543 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1544 			return -1;
1545 	}
1546 
1547 	if (rte_dma_burst_capacity(dma_id, vchan_id) < PACKED_BATCH_SIZE)
1548 		return -1;
1549 
1550 	return 0;
1551 }
1552 
1553 static __rte_always_inline void
1554 virtio_dev_rx_batch_packed_copy(struct virtio_net *dev,
1555 			   struct vhost_virtqueue *vq,
1556 			   struct rte_mbuf **pkts,
1557 			   uint64_t *desc_addrs,
1558 			   uint64_t *lens)
1559 	__rte_shared_locks_required(&vq->iotlb_lock)
1560 {
1561 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1562 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
1563 	struct vring_packed_desc *descs = vq->desc_packed;
1564 	uint16_t avail_idx = vq->last_avail_idx;
1565 	uint16_t ids[PACKED_BATCH_SIZE];
1566 	uint16_t i;
1567 
1568 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1569 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
1570 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
1571 					(uintptr_t)desc_addrs[i];
1572 		lens[i] = pkts[i]->pkt_len +
1573 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
1574 	}
1575 
1576 	if (rxvq_is_mergeable(dev)) {
1577 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1578 			ASSIGN_UNLESS_EQUAL(hdrs[i]->num_buffers, 1);
1579 		}
1580 	}
1581 
1582 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1583 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
1584 
1585 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
1586 
1587 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1588 		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
1589 			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
1590 			   pkts[i]->pkt_len);
1591 	}
1592 
1593 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1594 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr,
1595 					   lens[i]);
1596 
1597 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1598 		ids[i] = descs[avail_idx + i].id;
1599 
1600 	vhost_flush_enqueue_batch_packed(dev, vq, lens, ids);
1601 }
1602 
1603 static __rte_always_inline int
1604 virtio_dev_rx_sync_batch_packed(struct virtio_net *dev,
1605 			   struct vhost_virtqueue *vq,
1606 			   struct rte_mbuf **pkts)
1607 	__rte_shared_locks_required(&vq->iotlb_lock)
1608 {
1609 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
1610 	uint64_t lens[PACKED_BATCH_SIZE];
1611 
1612 	if (virtio_dev_rx_sync_batch_check(dev, vq, pkts, desc_addrs, lens) == -1)
1613 		return -1;
1614 
1615 	if (vq->shadow_used_idx) {
1616 		do_data_copy_enqueue(dev, vq);
1617 		vhost_flush_enqueue_shadow_packed(dev, vq);
1618 	}
1619 
1620 	virtio_dev_rx_batch_packed_copy(dev, vq, pkts, desc_addrs, lens);
1621 
1622 	return 0;
1623 }
1624 
1625 static __rte_always_inline int16_t
1626 virtio_dev_rx_single_packed(struct virtio_net *dev,
1627 			    struct vhost_virtqueue *vq,
1628 			    struct rte_mbuf *pkt)
1629 	__rte_shared_locks_required(&vq->access_lock)
1630 	__rte_shared_locks_required(&vq->iotlb_lock)
1631 {
1632 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1633 	uint16_t nr_descs = 0;
1634 
1635 	if (unlikely(vhost_enqueue_single_packed(dev, vq, pkt, buf_vec,
1636 						 &nr_descs) < 0)) {
1637 		VHOST_DATA_LOG(dev->ifname, DEBUG, "failed to get enough desc from vring");
1638 		return -1;
1639 	}
1640 
1641 	VHOST_DATA_LOG(dev->ifname, DEBUG,
1642 		"current index %d | end index %d",
1643 		vq->last_avail_idx, vq->last_avail_idx + nr_descs);
1644 
1645 	vq_inc_last_avail_packed(vq, nr_descs);
1646 
1647 	return 0;
1648 }
1649 
1650 static __rte_noinline uint32_t
1651 virtio_dev_rx_packed(struct virtio_net *dev,
1652 		     struct vhost_virtqueue *__rte_restrict vq,
1653 		     struct rte_mbuf **__rte_restrict pkts,
1654 		     uint32_t count)
1655 	__rte_shared_locks_required(&vq->access_lock)
1656 	__rte_shared_locks_required(&vq->iotlb_lock)
1657 {
1658 	uint32_t pkt_idx = 0;
1659 
1660 	do {
1661 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1662 
1663 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
1664 			if (!virtio_dev_rx_sync_batch_packed(dev, vq,
1665 							&pkts[pkt_idx])) {
1666 				pkt_idx += PACKED_BATCH_SIZE;
1667 				continue;
1668 			}
1669 		}
1670 
1671 		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]))
1672 			break;
1673 		pkt_idx++;
1674 
1675 	} while (pkt_idx < count);
1676 
1677 	if (vq->shadow_used_idx) {
1678 		do_data_copy_enqueue(dev, vq);
1679 		vhost_flush_enqueue_shadow_packed(dev, vq);
1680 	}
1681 
1682 	if (pkt_idx)
1683 		vhost_vring_call_packed(dev, vq);
1684 
1685 	return pkt_idx;
1686 }
1687 
1688 static void
1689 virtio_dev_vring_translate(struct virtio_net *dev, struct vhost_virtqueue *vq)
1690 {
1691 	rte_rwlock_write_lock(&vq->access_lock);
1692 	vhost_user_iotlb_rd_lock(vq);
1693 	if (!vq->access_ok)
1694 		vring_translate(dev, vq);
1695 	vhost_user_iotlb_rd_unlock(vq);
1696 	rte_rwlock_write_unlock(&vq->access_lock);
1697 }
1698 
1699 static __rte_always_inline uint32_t
1700 virtio_dev_rx(struct virtio_net *dev, struct vhost_virtqueue *vq,
1701 	struct rte_mbuf **pkts, uint32_t count)
1702 {
1703 	uint32_t nb_tx = 0;
1704 
1705 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
1706 	rte_rwlock_read_lock(&vq->access_lock);
1707 
1708 	if (unlikely(!vq->enabled))
1709 		goto out_access_unlock;
1710 
1711 	vhost_user_iotlb_rd_lock(vq);
1712 
1713 	if (unlikely(!vq->access_ok)) {
1714 		vhost_user_iotlb_rd_unlock(vq);
1715 		rte_rwlock_read_unlock(&vq->access_lock);
1716 
1717 		virtio_dev_vring_translate(dev, vq);
1718 		goto out_no_unlock;
1719 	}
1720 
1721 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1722 	if (count == 0)
1723 		goto out;
1724 
1725 	if (vq_is_packed(dev))
1726 		nb_tx = virtio_dev_rx_packed(dev, vq, pkts, count);
1727 	else
1728 		nb_tx = virtio_dev_rx_split(dev, vq, pkts, count);
1729 
1730 	vhost_queue_stats_update(dev, vq, pkts, nb_tx);
1731 
1732 out:
1733 	vhost_user_iotlb_rd_unlock(vq);
1734 
1735 out_access_unlock:
1736 	rte_rwlock_read_unlock(&vq->access_lock);
1737 
1738 out_no_unlock:
1739 	return nb_tx;
1740 }
1741 
1742 uint16_t
1743 rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
1744 	struct rte_mbuf **__rte_restrict pkts, uint16_t count)
1745 {
1746 	struct virtio_net *dev = get_device(vid);
1747 
1748 	if (!dev)
1749 		return 0;
1750 
1751 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
1752 		VHOST_DATA_LOG(dev->ifname, ERR,
1753 			"%s: built-in vhost net backend is disabled.",
1754 			__func__);
1755 		return 0;
1756 	}
1757 
1758 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1759 		VHOST_DATA_LOG(dev->ifname, ERR,
1760 			"%s: invalid virtqueue idx %d.",
1761 			__func__, queue_id);
1762 		return 0;
1763 	}
1764 
1765 	return virtio_dev_rx(dev, dev->virtqueue[queue_id], pkts, count);
1766 }
1767 
1768 static __rte_always_inline uint16_t
1769 async_get_first_inflight_pkt_idx(struct vhost_virtqueue *vq)
1770 	__rte_shared_locks_required(&vq->access_lock)
1771 {
1772 	struct vhost_async *async = vq->async;
1773 
1774 	if (async->pkts_idx >= async->pkts_inflight_n)
1775 		return async->pkts_idx - async->pkts_inflight_n;
1776 	else
1777 		return vq->size - async->pkts_inflight_n + async->pkts_idx;
1778 }
1779 
1780 static __rte_always_inline void
1781 store_dma_desc_info_split(struct vring_used_elem *s_ring, struct vring_used_elem *d_ring,
1782 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1783 {
1784 	size_t elem_size = sizeof(struct vring_used_elem);
1785 
1786 	if (d_idx + count <= ring_size) {
1787 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1788 	} else {
1789 		uint16_t size = ring_size - d_idx;
1790 
1791 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1792 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1793 	}
1794 }
1795 
1796 static __rte_noinline uint32_t
1797 virtio_dev_rx_async_submit_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1798 	struct rte_mbuf **pkts, uint32_t count, int16_t dma_id, uint16_t vchan_id)
1799 	__rte_exclusive_locks_required(&vq->access_lock)
1800 	__rte_shared_locks_required(&vq->iotlb_lock)
1801 {
1802 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1803 	uint32_t pkt_idx = 0;
1804 	uint16_t num_buffers;
1805 	uint16_t avail_head;
1806 
1807 	struct vhost_async *async = vq->async;
1808 	struct async_inflight_info *pkts_info = async->pkts_info;
1809 	uint32_t pkt_err = 0;
1810 	uint16_t n_xfer;
1811 	uint16_t slot_idx = 0;
1812 
1813 	/*
1814 	 * The ordering between avail index and desc reads needs to be enforced.
1815 	 */
1816 	avail_head = rte_atomic_load_explicit((unsigned short __rte_atomic *)&vq->avail->idx,
1817 		rte_memory_order_acquire);
1818 
1819 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1820 
1821 	async_iter_reset(async);
1822 
1823 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1824 		uint64_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1825 		uint16_t nr_vec = 0;
1826 
1827 		if (unlikely(reserve_avail_buf_split(dev, vq, pkt_len, buf_vec,
1828 						&num_buffers, avail_head, &nr_vec) < 0)) {
1829 			VHOST_DATA_LOG(dev->ifname, DEBUG,
1830 				"failed to get enough desc from vring");
1831 			vq->shadow_used_idx -= num_buffers;
1832 			break;
1833 		}
1834 
1835 		VHOST_DATA_LOG(dev->ifname, DEBUG,
1836 			"current index %d | end index %d",
1837 			vq->last_avail_idx, vq->last_avail_idx + num_buffers);
1838 
1839 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec, num_buffers, true) < 0) {
1840 			vq->shadow_used_idx -= num_buffers;
1841 			break;
1842 		}
1843 
1844 		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
1845 		pkts_info[slot_idx].descs = num_buffers;
1846 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1847 
1848 		vq->last_avail_idx += num_buffers;
1849 		vhost_virtqueue_reconnect_log_split(vq);
1850 	}
1851 
1852 	if (unlikely(pkt_idx == 0))
1853 		return 0;
1854 
1855 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
1856 			async->iov_iter, pkt_idx);
1857 
1858 	pkt_err = pkt_idx - n_xfer;
1859 	if (unlikely(pkt_err)) {
1860 		uint16_t num_descs = 0;
1861 
1862 		VHOST_DATA_LOG(dev->ifname, DEBUG,
1863 			"%s: failed to transfer %u packets for queue %u.",
1864 			__func__, pkt_err, vq->index);
1865 
1866 		/* update number of completed packets */
1867 		pkt_idx = n_xfer;
1868 
1869 		/* calculate the sum of descriptors to revert */
1870 		while (pkt_err-- > 0) {
1871 			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
1872 			slot_idx--;
1873 		}
1874 
1875 		/* recover shadow used ring and available ring */
1876 		vq->shadow_used_idx -= num_descs;
1877 		vq->last_avail_idx -= num_descs;
1878 		vhost_virtqueue_reconnect_log_split(vq);
1879 	}
1880 
1881 	/* keep used descriptors */
1882 	if (likely(vq->shadow_used_idx)) {
1883 		uint16_t to = async->desc_idx_split & (vq->size - 1);
1884 
1885 		store_dma_desc_info_split(vq->shadow_used_split,
1886 				async->descs_split, vq->size, 0, to,
1887 				vq->shadow_used_idx);
1888 
1889 		async->desc_idx_split += vq->shadow_used_idx;
1890 
1891 		async->pkts_idx += pkt_idx;
1892 		if (async->pkts_idx >= vq->size)
1893 			async->pkts_idx -= vq->size;
1894 
1895 		async->pkts_inflight_n += pkt_idx;
1896 		vq->shadow_used_idx = 0;
1897 	}
1898 
1899 	return pkt_idx;
1900 }
1901 
1902 
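/*
 * Reserve enough packed-ring descriptors to hold 'pkt', build the async
 * copy iovecs for it and record the buffer info in the async shadow ring.
 * Returns 0 on success, -1 if not enough descriptors are available.
 */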
1903 static __rte_always_inline int
1904 vhost_enqueue_async_packed(struct virtio_net *dev,
1905 			    struct vhost_virtqueue *vq,
1906 			    struct rte_mbuf *pkt,
1907 			    struct buf_vector *buf_vec,
1908 			    uint16_t *nr_descs,
1909 			    uint16_t *nr_buffers)
1910 	__rte_exclusive_locks_required(&vq->access_lock)
1911 	__rte_shared_locks_required(&vq->iotlb_lock)
1912 {
1913 	uint16_t nr_vec = 0;
1914 	uint16_t avail_idx = vq->last_avail_idx;
1915 	uint16_t max_tries, tries = 0;
1916 	uint16_t buf_id = 0;
1917 	uint32_t len = 0;
1918 	uint16_t desc_count = 0;
1919 	uint64_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1920 	uint32_t buffer_len[vq->size];
1921 	uint16_t buffer_buf_id[vq->size];
1922 	uint16_t buffer_desc_count[vq->size];
1923 
1924 	if (rxvq_is_mergeable(dev))
1925 		max_tries = vq->size - 1;
1926 	else
1927 		max_tries = 1;
1928 
1929 	do {
1930 		/*
1931 		 * If we have tried all available ring items and still
1932 		 * can't get enough buffers, something abnormal has
1933 		 * happened.
1934 		 */
1935 		if (unlikely(++tries > max_tries))
1936 			return -1;
1937 
1938 		if (unlikely(fill_vec_buf_packed(dev, vq,
1939 						avail_idx, &desc_count,
1940 						buf_vec, &nr_vec,
1941 						&buf_id, &len,
1942 						VHOST_ACCESS_RW) < 0))
1943 			return -1;
1944 
1945 		len = RTE_MIN(len, size);
1946 		size -= len;
1947 
1948 		buffer_len[*nr_buffers] = len;
1949 		buffer_buf_id[*nr_buffers] = buf_id;
1950 		buffer_desc_count[*nr_buffers] = desc_count;
1951 		*nr_buffers += 1;
1952 		*nr_descs += desc_count;
1953 		avail_idx += desc_count;
1954 		if (avail_idx >= vq->size)
1955 			avail_idx -= vq->size;
1956 	} while (size > 0);
1957 
1958 	if (unlikely(mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers, true) < 0))
1959 		return -1;
1960 
1961 	vhost_async_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id,
1962 					buffer_desc_count, *nr_buffers);
1963 
1964 	return 0;
1965 }
1966 
1967 static __rte_always_inline int16_t
1968 virtio_dev_rx_async_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
1969 			    struct rte_mbuf *pkt, uint16_t *nr_descs, uint16_t *nr_buffers)
1970 	__rte_exclusive_locks_required(&vq->access_lock)
1971 	__rte_shared_locks_required(&vq->iotlb_lock)
1972 {
1973 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1974 
1975 	if (unlikely(vhost_enqueue_async_packed(dev, vq, pkt, buf_vec,
1976 					nr_descs, nr_buffers) < 0)) {
1977 		VHOST_DATA_LOG(dev->ifname, DEBUG, "failed to get enough desc from vring");
1978 		return -1;
1979 	}
1980 
1981 	VHOST_DATA_LOG(dev->ifname, DEBUG,
1982 		"current index %d | end index %d",
1983 		vq->last_avail_idx, vq->last_avail_idx + *nr_descs);
1984 
1985 	return 0;
1986 }
1987 
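/*
 * Prepare a full batch of PACKED_BATCH_SIZE packets for async enqueue:
 * write the virtio-net headers, build one iovec per packet for the DMA
 * copies, log the guest writes and shadow the used ring entries.
 */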
1988 static __rte_always_inline void
1989 virtio_dev_rx_async_packed_batch_enqueue(struct virtio_net *dev,
1990 			   struct vhost_virtqueue *vq,
1991 			   struct rte_mbuf **pkts,
1992 			   uint64_t *desc_addrs,
1993 			   uint64_t *lens)
1994 	__rte_exclusive_locks_required(&vq->access_lock)
1995 	__rte_shared_locks_required(&vq->iotlb_lock)
1996 {
1997 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1998 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
1999 	struct vring_packed_desc *descs = vq->desc_packed;
2000 	struct vhost_async *async = vq->async;
2001 	uint16_t avail_idx = vq->last_avail_idx;
2002 	uint32_t mbuf_offset = 0;
2003 	uint16_t ids[PACKED_BATCH_SIZE];
2004 	uint64_t mapped_len[PACKED_BATCH_SIZE];
2005 	void *host_iova[PACKED_BATCH_SIZE];
2006 	uintptr_t desc;
2007 	uint16_t i;
2008 
2009 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2010 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
2011 		desc = vhost_iova_to_vva(dev, vq, desc_addrs[i], &lens[i], VHOST_ACCESS_RW);
2012 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc;
2013 		lens[i] = pkts[i]->pkt_len +
2014 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
2015 	}
2016 
2017 	if (rxvq_is_mergeable(dev)) {
2018 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2019 			ASSIGN_UNLESS_EQUAL(hdrs[i]->num_buffers, 1);
2020 		}
2021 	}
2022 
2023 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2024 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
2025 
2026 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
2027 
2028 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2029 		host_iova[i] = (void *)(uintptr_t)gpa_to_first_hpa(dev,
2030 			desc_addrs[i] + buf_offset, lens[i], &mapped_len[i]);
2031 	}
2032 
2033 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2034 		async_iter_initialize(dev, async);
2035 		async_iter_add_iovec(dev, async,
2036 				(void *)(uintptr_t)rte_pktmbuf_iova_offset(pkts[i], mbuf_offset),
2037 				host_iova[i],
2038 				mapped_len[i]);
2039 		async->iter_idx++;
2040 	}
2041 
2042 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2043 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr, lens[i]);
2044 
2045 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2046 		ids[i] = descs[avail_idx + i].id;
2047 
2048 	vhost_async_shadow_enqueue_packed_batch(vq, lens, ids);
2049 }
2050 
2051 static __rte_always_inline int
2052 virtio_dev_rx_async_packed_batch(struct virtio_net *dev,
2053 			   struct vhost_virtqueue *vq,
2054 			   struct rte_mbuf **pkts,
2055 			   int16_t dma_id, uint16_t vchan_id)
2056 	__rte_exclusive_locks_required(&vq->access_lock)
2057 	__rte_shared_locks_required(&vq->iotlb_lock)
2058 {
2059 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
2060 	uint64_t lens[PACKED_BATCH_SIZE];
2061 
2062 	if (virtio_dev_rx_async_batch_check(vq, pkts, desc_addrs, lens, dma_id, vchan_id) == -1)
2063 		return -1;
2064 
2065 	virtio_dev_rx_async_packed_batch_enqueue(dev, vq, pkts, desc_addrs, lens);
2066 
2067 	return 0;
2068 }
2069 
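/*
 * Roll back the packed virtqueue state for the last 'nr_err' packets whose
 * DMA copies could not be submitted: adjust the completed packet count and
 * rewind the available index (and wrap counter) as well as the async
 * shadow buffer index.
 */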
2070 static __rte_always_inline void
2071 dma_error_handler_packed(struct vhost_virtqueue *vq, uint16_t slot_idx,
2072 			uint32_t nr_err, uint32_t *pkt_idx)
2073 	__rte_exclusive_locks_required(&vq->access_lock)
2074 {
2075 	uint16_t descs_err = 0;
2076 	uint16_t buffers_err = 0;
2077 	struct vhost_async *async = vq->async;
2078 	struct async_inflight_info *pkts_info = vq->async->pkts_info;
2079 
2080 	*pkt_idx -= nr_err;
2081 	/* calculate the sum of buffers and descs of DMA-error packets. */
2082 	while (nr_err-- > 0) {
2083 		descs_err += pkts_info[slot_idx % vq->size].descs;
2084 		buffers_err += pkts_info[slot_idx % vq->size].nr_buffers;
2085 		slot_idx--;
2086 	}
2087 
2088 	if (vq->last_avail_idx >= descs_err) {
2089 		vq->last_avail_idx -= descs_err;
2090 	} else {
2091 		vq->last_avail_idx = vq->last_avail_idx + vq->size - descs_err;
2092 		vq->avail_wrap_counter ^= 1;
2093 	}
2094 	vhost_virtqueue_reconnect_log_packed(vq);
2095 
2096 	if (async->buffer_idx_packed >= buffers_err)
2097 		async->buffer_idx_packed -= buffers_err;
2098 	else
2099 		async->buffer_idx_packed = async->buffer_idx_packed + vq->size - buffers_err;
2100 }
2101 
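/*
 * Packed-ring counterpart of the async submit path: use batched enqueue
 * where a full batch of descriptors is available, submit the copy jobs to
 * the DMA vChannel and roll back any packets that could not be transferred.
 * Returns the number of packets successfully submitted.
 */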
2102 static __rte_noinline uint32_t
2103 virtio_dev_rx_async_submit_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
2104 	struct rte_mbuf **pkts, uint32_t count, int16_t dma_id, uint16_t vchan_id)
2105 	__rte_exclusive_locks_required(&vq->access_lock)
2106 	__rte_shared_locks_required(&vq->iotlb_lock)
2107 {
2108 	uint32_t pkt_idx = 0;
2109 	uint16_t n_xfer;
2110 	uint16_t num_buffers;
2111 	uint16_t num_descs;
2112 
2113 	struct vhost_async *async = vq->async;
2114 	struct async_inflight_info *pkts_info = async->pkts_info;
2115 	uint32_t pkt_err = 0;
2116 	uint16_t slot_idx = 0;
2117 	uint16_t i;
2118 
2119 	do {
2120 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
2121 
2122 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
2123 			if (!virtio_dev_rx_async_packed_batch(dev, vq, &pkts[pkt_idx],
2124 					dma_id, vchan_id)) {
2125 				for (i = 0; i < PACKED_BATCH_SIZE; i++) {
2126 					slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
2127 					pkts_info[slot_idx].descs = 1;
2128 					pkts_info[slot_idx].nr_buffers = 1;
2129 					pkts_info[slot_idx].mbuf = pkts[pkt_idx];
2130 					pkt_idx++;
2131 				}
2132 				continue;
2133 			}
2134 		}
2135 
2136 		num_buffers = 0;
2137 		num_descs = 0;
2138 		if (unlikely(virtio_dev_rx_async_packed(dev, vq, pkts[pkt_idx],
2139 						&num_descs, &num_buffers) < 0))
2140 			break;
2141 
2142 		slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
2143 
2144 		pkts_info[slot_idx].descs = num_descs;
2145 		pkts_info[slot_idx].nr_buffers = num_buffers;
2146 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
2147 
2148 		pkt_idx++;
2149 		vq_inc_last_avail_packed(vq, num_descs);
2150 	} while (pkt_idx < count);
2151 
2152 	if (unlikely(pkt_idx == 0))
2153 		return 0;
2154 
2155 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
2156 			async->iov_iter, pkt_idx);
2157 
2158 	async_iter_reset(async);
2159 
2160 	pkt_err = pkt_idx - n_xfer;
2161 	if (unlikely(pkt_err)) {
2162 		VHOST_DATA_LOG(dev->ifname, DEBUG,
2163 			"%s: failed to transfer %u packets for queue %u.",
2164 			__func__, pkt_err, vq->index);
2165 		dma_error_handler_packed(vq, slot_idx, pkt_err, &pkt_idx);
2166 	}
2167 
2168 	async->pkts_idx += pkt_idx;
2169 	if (async->pkts_idx >= vq->size)
2170 		async->pkts_idx -= vq->size;
2171 
2172 	async->pkts_inflight_n += pkt_idx;
2173 
2174 	return pkt_idx;
2175 }
2176 
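/*
 * Flush 'n_descs' completed entries from the async descriptor ring into
 * the split virtqueue's used ring, handling wrap-around of both rings.
 */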
2177 static __rte_always_inline void
2178 write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
2179 	__rte_shared_locks_required(&vq->access_lock)
2180 {
2181 	struct vhost_async *async = vq->async;
2182 	uint16_t nr_left = n_descs;
2183 	uint16_t nr_copy;
2184 	uint16_t to, from;
2185 
2186 	do {
2187 		from = async->last_desc_idx_split & (vq->size - 1);
2188 		nr_copy = nr_left + from <= vq->size ? nr_left : vq->size - from;
2189 		to = vq->last_used_idx & (vq->size - 1);
2190 
2191 		if (to + nr_copy <= vq->size) {
2192 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
2193 					nr_copy * sizeof(struct vring_used_elem));
2194 		} else {
2195 			uint16_t size = vq->size - to;
2196 
2197 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
2198 					size * sizeof(struct vring_used_elem));
2199 			rte_memcpy(&vq->used->ring[0], &async->descs_split[from + size],
2200 					(nr_copy - size) * sizeof(struct vring_used_elem));
2201 		}
2202 
2203 		async->last_desc_idx_split += nr_copy;
2204 		vq->last_used_idx += nr_copy;
2205 		nr_left -= nr_copy;
2206 	} while (nr_left > 0);
2207 }
2208 
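/*
 * Write 'n_buffers' completed buffers back to the packed ring. The id and
 * len fields are written first; the flags, which make the descriptors
 * visible to the driver, are written after a release fence, with the head
 * descriptor flags written last.
 */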
2209 static __rte_always_inline void
2210 write_back_completed_descs_packed(struct vhost_virtqueue *vq,
2211 				uint16_t n_buffers)
2212 	__rte_shared_locks_required(&vq->access_lock)
2213 {
2214 	struct vhost_async *async = vq->async;
2215 	uint16_t from = async->last_buffer_idx_packed;
2216 	uint16_t used_idx = vq->last_used_idx;
2217 	uint16_t head_idx = vq->last_used_idx;
2218 	uint16_t head_flags = 0;
2219 	uint16_t i;
2220 
2221 	/* Split the loop in two to save memory barriers */
2222 	for (i = 0; i < n_buffers; i++) {
2223 		vq->desc_packed[used_idx].id = async->buffers_packed[from].id;
2224 		vq->desc_packed[used_idx].len = async->buffers_packed[from].len;
2225 
2226 		used_idx += async->buffers_packed[from].count;
2227 		if (used_idx >= vq->size)
2228 			used_idx -= vq->size;
2229 
2230 		from++;
2231 		if (from >= vq->size)
2232 			from = 0;
2233 	}
2234 
2235 	/* The ordering for storing desc flags needs to be enforced. */
2236 	rte_atomic_thread_fence(rte_memory_order_release);
2237 
2238 	from = async->last_buffer_idx_packed;
2239 
2240 	for (i = 0; i < n_buffers; i++) {
2241 		uint16_t flags;
2242 
2243 		if (async->buffers_packed[from].len)
2244 			flags = VRING_DESC_F_WRITE;
2245 		else
2246 			flags = 0;
2247 
2248 		if (vq->used_wrap_counter) {
2249 			flags |= VRING_DESC_F_USED;
2250 			flags |= VRING_DESC_F_AVAIL;
2251 		} else {
2252 			flags &= ~VRING_DESC_F_USED;
2253 			flags &= ~VRING_DESC_F_AVAIL;
2254 		}
2255 
2256 		if (i > 0) {
2257 			vq->desc_packed[vq->last_used_idx].flags = flags;
2258 		} else {
2259 			head_idx = vq->last_used_idx;
2260 			head_flags = flags;
2261 		}
2262 
2263 		vq_inc_last_used_packed(vq, async->buffers_packed[from].count);
2264 
2265 		from++;
2266 		if (from == vq->size)
2267 			from = 0;
2268 	}
2269 
2270 	vq->desc_packed[head_idx].flags = head_flags;
2271 	async->last_buffer_idx_packed = from;
2272 }
2273 
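/*
 * Collect packets whose async enqueue copies have completed: poll the DMA
 * vChannel, gather the completed mbufs into 'pkts' and write the used
 * descriptors back to the guest (only the async write-back indices are
 * advanced if the virtqueue is not currently accessible).
 * Returns the number of completed packets.
 */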
2274 static __rte_always_inline uint16_t
2275 vhost_poll_enqueue_completed(struct virtio_net *dev, struct vhost_virtqueue *vq,
2276 	struct rte_mbuf **pkts, uint16_t count, int16_t dma_id, uint16_t vchan_id)
2277 	__rte_shared_locks_required(&vq->access_lock)
2278 {
2279 	struct vhost_async *async = vq->async;
2280 	struct async_inflight_info *pkts_info = async->pkts_info;
2281 	uint16_t nr_cpl_pkts = 0;
2282 	uint16_t n_descs = 0, n_buffers = 0;
2283 	uint16_t start_idx, from, i;
2284 
2285 	/* Check completed copies for the given DMA vChannel */
2286 	vhost_async_dma_check_completed(dev, dma_id, vchan_id, VHOST_DMA_MAX_COPY_COMPLETE);
2287 
2288 	start_idx = async_get_first_inflight_pkt_idx(vq);
2289 	/**
2290 	 * Calculate the number of copy-completed packets.
2291 	 * Note that there may be completed packets even if
2292 	 * no copies are reported as done by the given DMA vChannel,
2293 	 * as it's possible that a virtqueue uses multiple DMA
2294 	 * vChannels.
2295 	 */
2296 	from = start_idx;
2297 	while (vq->async->pkts_cmpl_flag[from] && count--) {
2298 		vq->async->pkts_cmpl_flag[from] = false;
2299 		from++;
2300 		if (from >= vq->size)
2301 			from -= vq->size;
2302 		nr_cpl_pkts++;
2303 	}
2304 
2305 	if (nr_cpl_pkts == 0)
2306 		return 0;
2307 
2308 	for (i = 0; i < nr_cpl_pkts; i++) {
2309 		from = (start_idx + i) % vq->size;
2310 		/* Only used with packed ring */
2311 		n_buffers += pkts_info[from].nr_buffers;
2312 		/* Only used with split ring */
2313 		n_descs += pkts_info[from].descs;
2314 		pkts[i] = pkts_info[from].mbuf;
2315 	}
2316 
2317 	async->pkts_inflight_n -= nr_cpl_pkts;
2318 
2319 	if (likely(vq->enabled && vq->access_ok)) {
2320 		if (vq_is_packed(dev)) {
2321 			write_back_completed_descs_packed(vq, n_buffers);
2322 			vhost_vring_call_packed(dev, vq);
2323 		} else {
2324 			write_back_completed_descs_split(vq, n_descs);
2325 			rte_atomic_fetch_add_explicit(
2326 				(unsigned short __rte_atomic *)&vq->used->idx,
2327 				n_descs, rte_memory_order_release);
2328 			vhost_vring_call_split(dev, vq);
2329 		}
2330 	} else {
2331 		if (vq_is_packed(dev)) {
2332 			async->last_buffer_idx_packed += n_buffers;
2333 			if (async->last_buffer_idx_packed >= vq->size)
2334 				async->last_buffer_idx_packed -= vq->size;
2335 		} else {
2336 			async->last_desc_idx_split += n_descs;
2337 		}
2338 	}
2339 
2340 	return nr_cpl_pkts;
2341 }
2342 
2343 uint16_t
2344 rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
2345 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2346 		uint16_t vchan_id)
2347 {
2348 	struct virtio_net *dev = get_device(vid);
2349 	struct vhost_virtqueue *vq;
2350 	uint16_t n_pkts_cpl = 0;
2351 
2352 	if (unlikely(!dev))
2353 		return 0;
2354 
2355 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
2356 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2357 		VHOST_DATA_LOG(dev->ifname, ERR,
2358 			"%s: invalid virtqueue idx %d.",
2359 			__func__, queue_id);
2360 		return 0;
2361 	}
2362 
2363 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2364 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2365 		VHOST_DATA_LOG(dev->ifname, ERR,
2366 			"%s: invalid channel %d:%u.",
2367 			__func__, dma_id, vchan_id);
2368 		return 0;
2369 	}
2370 
2371 	vq = dev->virtqueue[queue_id];
2372 
2373 	if (rte_rwlock_read_trylock(&vq->access_lock)) {
2374 		VHOST_DATA_LOG(dev->ifname, DEBUG,
2375 			"%s: virtqueue %u is busy.",
2376 			__func__, queue_id);
2377 		return 0;
2378 	}
2379 
2380 	if (unlikely(!vq->async)) {
2381 		VHOST_DATA_LOG(dev->ifname, ERR,
2382 			"%s: async not registered for virtqueue %d.",
2383 			__func__, queue_id);
2384 		goto out;
2385 	}
2386 
2387 	n_pkts_cpl = vhost_poll_enqueue_completed(dev, vq, pkts, count, dma_id, vchan_id);
2388 
2389 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2390 	vq->stats.inflight_completed += n_pkts_cpl;
2391 
2392 out:
2393 	rte_rwlock_read_unlock(&vq->access_lock);
2394 
2395 	return n_pkts_cpl;
2396 }
2397 
2398 uint16_t
2399 rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
2400 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2401 		uint16_t vchan_id)
2402 {
2403 	struct virtio_net *dev = get_device(vid);
2404 	struct vhost_virtqueue *vq;
2405 	uint16_t n_pkts_cpl = 0;
2406 
2407 	if (!dev)
2408 		return 0;
2409 
2410 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
2411 	if (unlikely(queue_id >= dev->nr_vring)) {
2412 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid virtqueue idx %d.",
2413 			__func__, queue_id);
2414 		return 0;
2415 	}
2416 
2417 	if (unlikely(dma_id < 0 || dma_id >= RTE_DMADEV_DEFAULT_MAX)) {
2418 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid dma id %d.",
2419 			__func__, dma_id);
2420 		return 0;
2421 	}
2422 
2423 	vq = dev->virtqueue[queue_id];
2424 
2425 	vq_assert_lock(dev, vq);
2426 
2427 	if (unlikely(!vq->async)) {
2428 		VHOST_DATA_LOG(dev->ifname, ERR,
2429 			"%s: async not registered for virtqueue %d.",
2430 			__func__, queue_id);
2431 		return 0;
2432 	}
2433 
2434 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2435 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2436 		VHOST_DATA_LOG(dev->ifname, ERR,
2437 			"%s: invalid channel %d:%u.",
2438 			__func__, dma_id, vchan_id);
2439 		return 0;
2440 	}
2441 
2442 	if ((queue_id & 1) == 0)
2443 		n_pkts_cpl = vhost_poll_enqueue_completed(dev, vq, pkts, count,
2444 			dma_id, vchan_id);
2445 	else
2446 		n_pkts_cpl = async_poll_dequeue_completed(dev, vq, pkts, count,
2447 			dma_id, vchan_id, dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS);
2448 
2449 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2450 	vq->stats.inflight_completed += n_pkts_cpl;
2451 
2452 	return n_pkts_cpl;
2453 }
2454 
2455 uint16_t
2456 rte_vhost_clear_queue(int vid, uint16_t queue_id, struct rte_mbuf **pkts,
2457 		uint16_t count, int16_t dma_id, uint16_t vchan_id)
2458 {
2459 	struct virtio_net *dev = get_device(vid);
2460 	struct vhost_virtqueue *vq;
2461 	uint16_t n_pkts_cpl = 0;
2462 
2463 	if (!dev)
2464 		return 0;
2465 
2466 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
2467 	if (unlikely(queue_id >= dev->nr_vring)) {
2468 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid virtqueue idx %u.",
2469 			__func__, queue_id);
2470 		return 0;
2471 	}
2472 
2473 	if (unlikely(dma_id < 0 || dma_id >= RTE_DMADEV_DEFAULT_MAX)) {
2474 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid dma id %d.",
2475 			__func__, dma_id);
2476 		return 0;
2477 	}
2478 
2479 	vq = dev->virtqueue[queue_id];
2480 
2481 	if (rte_rwlock_read_trylock(&vq->access_lock)) {
2482 		VHOST_DATA_LOG(dev->ifname, DEBUG, "%s: virtqueue %u is busy.",
2483 			__func__, queue_id);
2484 		return 0;
2485 	}
2486 
2487 	if (unlikely(!vq->async)) {
2488 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: async not registered for queue id %u.",
2489 			__func__, queue_id);
2490 		goto out_access_unlock;
2491 	}
2492 
2493 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2494 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2495 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid channel %d:%u.",
2496 			__func__, dma_id, vchan_id);
2497 		goto out_access_unlock;
2498 	}
2499 
2500 	if ((queue_id & 1) == 0)
2501 		n_pkts_cpl = vhost_poll_enqueue_completed(dev, vq, pkts, count,
2502 			dma_id, vchan_id);
2503 	else
2504 		n_pkts_cpl = async_poll_dequeue_completed(dev, vq, pkts, count,
2505 			dma_id, vchan_id, dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS);
2506 
2507 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2508 	vq->stats.inflight_completed += n_pkts_cpl;
2509 
2510 out_access_unlock:
2511 	rte_rwlock_read_unlock(&vq->access_lock);
2512 
2513 	return n_pkts_cpl;
2514 }
2515 
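/*
 * Common entry point for async enqueue: validate the DMA vChannel, take
 * the virtqueue locks and dispatch to the split or packed submit path.
 */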
2516 static __rte_always_inline uint32_t
2517 virtio_dev_rx_async_submit(struct virtio_net *dev, struct vhost_virtqueue *vq,
2518 	struct rte_mbuf **pkts, uint32_t count, int16_t dma_id, uint16_t vchan_id)
2519 {
2520 	uint32_t nb_tx = 0;
2521 
2522 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
2523 
2524 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2525 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2526 		VHOST_DATA_LOG(dev->ifname, ERR,
2527 			"%s: invalid channel %d:%u.",
2528 			 __func__, dma_id, vchan_id);
2529 		return 0;
2530 	}
2531 
2532 	rte_rwlock_write_lock(&vq->access_lock);
2533 
2534 	if (unlikely(!vq->enabled || !vq->async))
2535 		goto out_access_unlock;
2536 
2537 	vhost_user_iotlb_rd_lock(vq);
2538 
2539 	if (unlikely(!vq->access_ok)) {
2540 		vhost_user_iotlb_rd_unlock(vq);
2541 		rte_rwlock_write_unlock(&vq->access_lock);
2542 
2543 		virtio_dev_vring_translate(dev, vq);
2544 		goto out_no_unlock;
2545 	}
2546 
2547 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
2548 	if (count == 0)
2549 		goto out;
2550 
2551 	if (vq_is_packed(dev))
2552 		nb_tx = virtio_dev_rx_async_submit_packed(dev, vq, pkts, count,
2553 			dma_id, vchan_id);
2554 	else
2555 		nb_tx = virtio_dev_rx_async_submit_split(dev, vq, pkts, count,
2556 			dma_id, vchan_id);
2557 
2558 	vq->stats.inflight_submitted += nb_tx;
2559 
2560 out:
2561 	vhost_user_iotlb_rd_unlock(vq);
2562 
2563 out_access_unlock:
2564 	rte_rwlock_write_unlock(&vq->access_lock);
2565 
2566 out_no_unlock:
2567 	return nb_tx;
2568 }
2569 
2570 uint16_t
2571 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
2572 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2573 		uint16_t vchan_id)
2574 {
2575 	struct virtio_net *dev = get_device(vid);
2576 
2577 	if (!dev)
2578 		return 0;
2579 
2580 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
2581 		VHOST_DATA_LOG(dev->ifname, ERR,
2582 			"%s: built-in vhost net backend is disabled.",
2583 			__func__);
2584 		return 0;
2585 	}
2586 
2587 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2588 		VHOST_DATA_LOG(dev->ifname, ERR,
2589 			"%s: invalid virtqueue idx %d.",
2590 			__func__, queue_id);
2591 		return 0;
2592 	}
2593 
2594 	return virtio_dev_rx_async_submit(dev, dev->virtqueue[queue_id], pkts, count,
2595 		dma_id, vchan_id);
2596 }
2597 
2598 static inline bool
2599 virtio_net_with_host_offload(struct virtio_net *dev)
2600 {
2601 	if (dev->features &
2602 			((1ULL << VIRTIO_NET_F_CSUM) |
2603 			 (1ULL << VIRTIO_NET_F_HOST_ECN) |
2604 			 (1ULL << VIRTIO_NET_F_HOST_TSO4) |
2605 			 (1ULL << VIRTIO_NET_F_HOST_TSO6) |
2606 			 (1ULL << VIRTIO_NET_F_HOST_UFO)))
2607 		return true;
2608 
2609 	return false;
2610 }
2611 
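/*
 * Parse the Ethernet, L3 and L4 headers of 'm', set l2_len/l3_len and the
 * IPv4/IPv6 Tx offload flags, and return the L4 protocol through
 * 'l4_proto'. Returns 0 on success, -EINVAL if the headers are unsupported
 * or the packet is too short.
 */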
2612 static int
2613 parse_headers(struct rte_mbuf *m, uint8_t *l4_proto)
2614 {
2615 	struct rte_ipv4_hdr *ipv4_hdr;
2616 	struct rte_ipv6_hdr *ipv6_hdr;
2617 	struct rte_ether_hdr *eth_hdr;
2618 	uint16_t ethertype;
2619 	uint16_t data_len = rte_pktmbuf_data_len(m);
2620 
2621 	if (data_len < sizeof(struct rte_ether_hdr))
2622 		return -EINVAL;
2623 
2624 	eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
2625 
2626 	m->l2_len = sizeof(struct rte_ether_hdr);
2627 	ethertype = rte_be_to_cpu_16(eth_hdr->ether_type);
2628 
2629 	if (ethertype == RTE_ETHER_TYPE_VLAN) {
2630 		if (data_len < sizeof(struct rte_ether_hdr) +
2631 				sizeof(struct rte_vlan_hdr))
2632 			goto error;
2633 
2634 		struct rte_vlan_hdr *vlan_hdr =
2635 			(struct rte_vlan_hdr *)(eth_hdr + 1);
2636 
2637 		m->l2_len += sizeof(struct rte_vlan_hdr);
2638 		ethertype = rte_be_to_cpu_16(vlan_hdr->eth_proto);
2639 	}
2640 
2641 	switch (ethertype) {
2642 	case RTE_ETHER_TYPE_IPV4:
2643 		if (data_len < m->l2_len + sizeof(struct rte_ipv4_hdr))
2644 			goto error;
2645 		ipv4_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
2646 				m->l2_len);
2647 		m->l3_len = rte_ipv4_hdr_len(ipv4_hdr);
2648 		if (data_len < m->l2_len + m->l3_len)
2649 			goto error;
2650 		m->ol_flags |= RTE_MBUF_F_TX_IPV4;
2651 		*l4_proto = ipv4_hdr->next_proto_id;
2652 		break;
2653 	case RTE_ETHER_TYPE_IPV6:
2654 		if (data_len < m->l2_len + sizeof(struct rte_ipv6_hdr))
2655 			goto error;
2656 		ipv6_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv6_hdr *,
2657 				m->l2_len);
2658 		m->l3_len = sizeof(struct rte_ipv6_hdr);
2659 		m->ol_flags |= RTE_MBUF_F_TX_IPV6;
2660 		*l4_proto = ipv6_hdr->proto;
2661 		break;
2662 	default:
2663 		/* a valid L3 header is needed for further L4 parsing */
2664 		goto error;
2665 	}
2666 
2667 	/* both CSUM and GSO need a valid L4 header */
2668 	switch (*l4_proto) {
2669 	case IPPROTO_TCP:
2670 		if (data_len < m->l2_len + m->l3_len +
2671 				sizeof(struct rte_tcp_hdr))
2672 			goto error;
2673 		break;
2674 	case IPPROTO_UDP:
2675 		if (data_len < m->l2_len + m->l3_len +
2676 				sizeof(struct rte_udp_hdr))
2677 			goto error;
2678 		break;
2679 	case IPPROTO_SCTP:
2680 		if (data_len < m->l2_len + m->l3_len +
2681 				sizeof(struct rte_sctp_hdr))
2682 			goto error;
2683 		break;
2684 	default:
2685 		goto error;
2686 	}
2687 
2688 	return 0;
2689 
2690 error:
2691 	m->l2_len = 0;
2692 	m->l3_len = 0;
2693 	m->ol_flags = 0;
2694 	return -EINVAL;
2695 }
2696 
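/*
 * Translate the virtio-net header of a dequeued packet into legacy Tx
 * offload flags (RTE_MBUF_F_TX_*), validating the checksum and GSO
 * requests against the parsed headers. On any inconsistency the offload
 * information of the mbuf is cleared.
 */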
2697 static __rte_always_inline void
2698 vhost_dequeue_offload_legacy(struct virtio_net *dev, struct virtio_net_hdr *hdr,
2699 		struct rte_mbuf *m)
2700 {
2701 	uint8_t l4_proto = 0;
2702 	struct rte_tcp_hdr *tcp_hdr = NULL;
2703 	uint16_t tcp_len;
2704 	uint16_t data_len = rte_pktmbuf_data_len(m);
2705 
2706 	if (parse_headers(m, &l4_proto) < 0)
2707 		return;
2708 
2709 	if (hdr->flags == VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2710 		if (hdr->csum_start == (m->l2_len + m->l3_len)) {
2711 			switch (hdr->csum_offset) {
2712 			case (offsetof(struct rte_tcp_hdr, cksum)):
2713 				if (l4_proto != IPPROTO_TCP)
2714 					goto error;
2715 				m->ol_flags |= RTE_MBUF_F_TX_TCP_CKSUM;
2716 				break;
2717 			case (offsetof(struct rte_udp_hdr, dgram_cksum)):
2718 				if (l4_proto != IPPROTO_UDP)
2719 					goto error;
2720 				m->ol_flags |= RTE_MBUF_F_TX_UDP_CKSUM;
2721 				break;
2722 			case (offsetof(struct rte_sctp_hdr, cksum)):
2723 				if (l4_proto != IPPROTO_SCTP)
2724 					goto error;
2725 				m->ol_flags |= RTE_MBUF_F_TX_SCTP_CKSUM;
2726 				break;
2727 			default:
2728 				goto error;
2729 			}
2730 		} else {
2731 			goto error;
2732 		}
2733 	}
2734 
2735 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2736 		if (hdr->gso_size == 0)
2737 			goto error;
2738 
2739 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2740 		case VIRTIO_NET_HDR_GSO_TCPV4:
2741 		case VIRTIO_NET_HDR_GSO_TCPV6:
2742 			if (l4_proto != IPPROTO_TCP)
2743 				goto error;
2744 			tcp_hdr = rte_pktmbuf_mtod_offset(m,
2745 					struct rte_tcp_hdr *,
2746 					m->l2_len + m->l3_len);
2747 			tcp_len = (tcp_hdr->data_off & 0xf0) >> 2;
2748 			if (data_len < m->l2_len + m->l3_len + tcp_len)
2749 				goto error;
2750 			m->ol_flags |= RTE_MBUF_F_TX_TCP_SEG;
2751 			m->tso_segsz = hdr->gso_size;
2752 			m->l4_len = tcp_len;
2753 			break;
2754 		case VIRTIO_NET_HDR_GSO_UDP:
2755 			if (l4_proto != IPPROTO_UDP)
2756 				goto error;
2757 			m->ol_flags |= RTE_MBUF_F_TX_UDP_SEG;
2758 			m->tso_segsz = hdr->gso_size;
2759 			m->l4_len = sizeof(struct rte_udp_hdr);
2760 			break;
2761 		default:
2762 			VHOST_DATA_LOG(dev->ifname, WARNING,
2763 				"unsupported gso type %u.",
2764 				hdr->gso_type);
2765 			goto error;
2766 		}
2767 	}
2768 	return;
2769 
2770 error:
2771 	m->l2_len = 0;
2772 	m->l3_len = 0;
2773 	m->ol_flags = 0;
2774 }
2775 
2776 static __rte_always_inline void
2777 vhost_dequeue_offload(struct virtio_net *dev, struct virtio_net_hdr *hdr,
2778 		struct rte_mbuf *m, bool legacy_ol_flags)
2779 {
2780 	struct rte_net_hdr_lens hdr_lens;
2781 	int l4_supported = 0;
2782 	uint32_t ptype;
2783 
2784 	if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
2785 		return;
2786 
2787 	if (legacy_ol_flags) {
2788 		vhost_dequeue_offload_legacy(dev, hdr, m);
2789 		return;
2790 	}
2791 
2792 	m->ol_flags |= RTE_MBUF_F_RX_IP_CKSUM_UNKNOWN;
2793 
2794 	ptype = rte_net_get_ptype(m, &hdr_lens, RTE_PTYPE_ALL_MASK);
2795 	m->packet_type = ptype;
2796 	if ((ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_TCP ||
2797 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_UDP ||
2798 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_SCTP)
2799 		l4_supported = 1;
2800 
2801 	/* According to Virtio 1.1 spec, the device only needs to look at
2802 	 * VIRTIO_NET_HDR_F_NEEDS_CSUM in the packet transmission path.
2803 	 * This differs from the path processing incoming packets, where the
2804 	 * driver could rely on the VIRTIO_NET_HDR_F_DATA_VALID flag set by the
2805 	 * device.
2806 	 *
2807 	 * 5.1.6.2.1 Driver Requirements: Packet Transmission
2808 	 * The driver MUST NOT set the VIRTIO_NET_HDR_F_DATA_VALID and
2809 	 * VIRTIO_NET_HDR_F_RSC_INFO bits in flags.
2810 	 *
2811 	 * 5.1.6.2.2 Device Requirements: Packet Transmission
2812 	 * The device MUST ignore flag bits that it does not recognize.
2813 	 */
2814 	if (hdr->flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2815 		uint32_t hdrlen;
2816 
2817 		hdrlen = hdr_lens.l2_len + hdr_lens.l3_len + hdr_lens.l4_len;
2818 		if (hdr->csum_start <= hdrlen && l4_supported != 0) {
2819 			m->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_NONE;
2820 		} else {
2821 			/* Unknown proto or tunnel, do sw cksum. We can assume
2822 			 * the cksum field is in the first segment since the
2823 			 * buffers we provided to the host are large enough.
2824 			 * In case of SCTP, this will be wrong since it's a CRC,
2825 			 * but there's nothing we can do.
2826 			 */
2827 			uint16_t csum = 0, off;
2828 
2829 			if (hdr->csum_start >= rte_pktmbuf_pkt_len(m))
2830 				return;
2831 
2832 			if (rte_raw_cksum_mbuf(m, hdr->csum_start,
2833 					rte_pktmbuf_pkt_len(m) - hdr->csum_start, &csum) < 0)
2834 				return;
2835 			if (likely(csum != 0xffff))
2836 				csum = ~csum;
2837 			off = hdr->csum_offset + hdr->csum_start;
2838 			if (rte_pktmbuf_data_len(m) >= off + 1)
2839 				*rte_pktmbuf_mtod_offset(m, uint16_t *, off) = csum;
2840 		}
2841 	}
2842 
2843 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2844 		if (hdr->gso_size == 0)
2845 			return;
2846 
2847 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2848 		case VIRTIO_NET_HDR_GSO_TCPV4:
2849 		case VIRTIO_NET_HDR_GSO_TCPV6:
2850 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_TCP)
2851 				break;
2852 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2853 			m->tso_segsz = hdr->gso_size;
2854 			break;
2855 		case VIRTIO_NET_HDR_GSO_UDP:
2856 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_UDP)
2857 				break;
2858 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2859 			m->tso_segsz = hdr->gso_size;
2860 			break;
2861 		default:
2862 			break;
2863 		}
2864 	}
2865 }
2866 
2867 static __rte_noinline void
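/*
 * Gather a virtio-net header that is scattered across several buffer
 * vectors into the contiguous copy pointed to by 'hdr'.
 */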
2868 copy_vnet_hdr_from_desc(struct virtio_net_hdr *hdr,
2869 		struct buf_vector *buf_vec)
2870 {
2871 	uint64_t len;
2872 	uint64_t remain = sizeof(struct virtio_net_hdr);
2873 	uint64_t src;
2874 	uint64_t dst = (uint64_t)(uintptr_t)hdr;
2875 
2876 	while (remain) {
2877 		len = RTE_MIN(remain, buf_vec->buf_len);
2878 		src = buf_vec->buf_addr;
2879 		rte_memcpy((void *)(uintptr_t)dst,
2880 				(void *)(uintptr_t)src, len);
2881 
2882 		remain -= len;
2883 		dst += len;
2884 		buf_vec++;
2885 	}
2886 }
2887 
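/*
 * Copy one descriptor chain into an mbuf chain, allocating extra mbufs
 * from 'mbuf_pool' as needed. In async mode the copies are described as
 * iovecs for the DMA engine and the virtio-net header is saved for
 * completion time; in sync mode the header offloads are applied directly.
 */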
2888 static __rte_always_inline int
2889 desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
2890 		  struct buf_vector *buf_vec, uint16_t nr_vec,
2891 		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
2892 		  bool legacy_ol_flags, uint16_t slot_idx, bool is_async)
2893 	__rte_shared_locks_required(&vq->access_lock)
2894 	__rte_shared_locks_required(&vq->iotlb_lock)
2895 {
2896 	uint32_t buf_avail, buf_offset, buf_len;
2897 	uint64_t buf_addr, buf_iova;
2898 	uint32_t mbuf_avail, mbuf_offset;
2899 	uint32_t hdr_remain = dev->vhost_hlen;
2900 	uint32_t cpy_len;
2901 	struct rte_mbuf *cur = m, *prev = m;
2902 	struct virtio_net_hdr tmp_hdr;
2903 	struct virtio_net_hdr *hdr = NULL;
2904 	uint16_t vec_idx;
2905 	struct vhost_async *async = vq->async;
2906 	struct async_inflight_info *pkts_info;
2907 
2908 	/*
2909 	 * The caller has checked that the descriptor chain is larger than the
2910 	 * header size.
2911 	 */
2912 
2913 	if (virtio_net_with_host_offload(dev)) {
2914 		if (unlikely(buf_vec[0].buf_len < sizeof(struct virtio_net_hdr))) {
2915 			/*
2916 			 * No luck, the virtio-net header doesn't fit
2917 			 * in a contiguous virtual area.
2918 			 */
2919 			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
2920 			hdr = &tmp_hdr;
2921 		} else {
2922 			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_vec[0].buf_addr);
2923 		}
2924 	}
2925 
2926 	for (vec_idx = 0; vec_idx < nr_vec; vec_idx++) {
2927 		if (buf_vec[vec_idx].buf_len > hdr_remain)
2928 			break;
2929 
2930 		hdr_remain -= buf_vec[vec_idx].buf_len;
2931 	}
2932 
2933 	buf_addr = buf_vec[vec_idx].buf_addr;
2934 	buf_iova = buf_vec[vec_idx].buf_iova;
2935 	buf_len = buf_vec[vec_idx].buf_len;
2936 	buf_offset = hdr_remain;
2937 	buf_avail = buf_vec[vec_idx].buf_len - hdr_remain;
2938 
2939 	PRINT_PACKET(dev,
2940 			(uintptr_t)(buf_addr + buf_offset),
2941 			(uint32_t)buf_avail, 0);
2942 
2943 	mbuf_offset = 0;
2944 	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
2945 
2946 	if (is_async) {
2947 		pkts_info = async->pkts_info;
2948 		if (async_iter_initialize(dev, async))
2949 			return -1;
2950 	}
2951 
2952 	while (1) {
2953 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
2954 
2955 		if (is_async) {
2956 			if (async_fill_seg(dev, vq, cur, mbuf_offset,
2957 					   buf_iova + buf_offset, cpy_len, false) < 0)
2958 				goto error;
2959 		} else if (likely(hdr && cur == m)) {
2960 			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, mbuf_offset),
2961 				(void *)((uintptr_t)(buf_addr + buf_offset)),
2962 				cpy_len);
2963 		} else {
2964 			sync_fill_seg(dev, vq, cur, mbuf_offset,
2965 				      buf_addr + buf_offset,
2966 				      buf_iova + buf_offset, cpy_len, false);
2967 		}
2968 
2969 		mbuf_avail  -= cpy_len;
2970 		mbuf_offset += cpy_len;
2971 		buf_avail -= cpy_len;
2972 		buf_offset += cpy_len;
2973 
2974 		/* This buffer has reached its end, get the next one */
2975 		if (buf_avail == 0) {
2976 			if (++vec_idx >= nr_vec)
2977 				break;
2978 
2979 			buf_addr = buf_vec[vec_idx].buf_addr;
2980 			buf_iova = buf_vec[vec_idx].buf_iova;
2981 			buf_len = buf_vec[vec_idx].buf_len;
2982 
2983 			buf_offset = 0;
2984 			buf_avail  = buf_len;
2985 
2986 			PRINT_PACKET(dev, (uintptr_t)buf_addr,
2987 					(uint32_t)buf_avail, 0);
2988 		}
2989 
2990 		/*
2991 		 * This mbuf has reached its end, get a new one
2992 		 * to hold more data.
2993 		 */
2994 		if (mbuf_avail == 0) {
2995 			cur = rte_pktmbuf_alloc(mbuf_pool);
2996 			if (unlikely(cur == NULL)) {
2997 				vq->stats.mbuf_alloc_failed++;
2998 				VHOST_DATA_LOG(dev->ifname, ERR,
2999 					"failed to allocate memory for mbuf.");
3000 				goto error;
3001 			}
3002 
3003 			prev->next = cur;
3004 			prev->data_len = mbuf_offset;
3005 			m->nb_segs += 1;
3006 			m->pkt_len += mbuf_offset;
3007 			prev = cur;
3008 
3009 			mbuf_offset = 0;
3010 			mbuf_avail  = cur->buf_len - RTE_PKTMBUF_HEADROOM;
3011 		}
3012 	}
3013 
3014 	prev->data_len = mbuf_offset;
3015 	m->pkt_len    += mbuf_offset;
3016 
3017 	if (is_async) {
3018 		async_iter_finalize(async);
3019 		if (hdr)
3020 			pkts_info[slot_idx].nethdr = *hdr;
3021 	} else if (hdr) {
3022 		vhost_dequeue_offload(dev, hdr, m, legacy_ol_flags);
3023 	}
3024 
3025 	return 0;
3026 error:
3027 	if (is_async)
3028 		async_iter_cancel(async);
3029 
3030 	return -1;
3031 }
3032 
3033 static void
3034 virtio_dev_extbuf_free(void *addr __rte_unused, void *opaque)
3035 {
3036 	rte_free(opaque);
3037 }
3038 
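/*
 * Allocate an external buffer large enough for 'size' bytes plus headroom
 * and shared info, and attach it to 'pkt'.
 */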
3039 static int
3040 virtio_dev_extbuf_alloc(struct virtio_net *dev, struct rte_mbuf *pkt, uint32_t size)
3041 {
3042 	struct rte_mbuf_ext_shared_info *shinfo = NULL;
3043 	uint32_t total_len = RTE_PKTMBUF_HEADROOM + size;
3044 	uint16_t buf_len;
3045 	rte_iova_t iova;
3046 	void *buf;
3047 
3048 	total_len += sizeof(*shinfo) + sizeof(uintptr_t);
3049 	total_len = RTE_ALIGN_CEIL(total_len, sizeof(uintptr_t));
3050 
3051 	if (unlikely(total_len > UINT16_MAX))
3052 		return -ENOSPC;
3053 
3054 	buf_len = total_len;
3055 	buf = rte_malloc(NULL, buf_len, RTE_CACHE_LINE_SIZE);
3056 	if (unlikely(buf == NULL))
3057 		return -ENOMEM;
3058 
3059 	/* Initialize shinfo */
3060 	shinfo = rte_pktmbuf_ext_shinfo_init_helper(buf, &buf_len,
3061 						virtio_dev_extbuf_free, buf);
3062 	if (unlikely(shinfo == NULL)) {
3063 		rte_free(buf);
3064 		VHOST_DATA_LOG(dev->ifname, ERR, "failed to init shinfo");
3065 		return -1;
3066 	}
3067 
3068 	iova = rte_malloc_virt2iova(buf);
3069 	rte_pktmbuf_attach_extbuf(pkt, buf, iova, buf_len, shinfo);
3070 	rte_pktmbuf_reset_headroom(pkt);
3071 
3072 	return 0;
3073 }
3074 
3075 /*
3076  * Prepare a host-supported pktmbuf.
3077  */
3078 static __rte_always_inline int
3079 virtio_dev_pktmbuf_prep(struct virtio_net *dev, struct rte_mbuf *pkt,
3080 			 uint32_t data_len)
3081 {
3082 	if (rte_pktmbuf_tailroom(pkt) >= data_len)
3083 		return 0;
3084 
3085 	/* attach an external buffer if supported */
3086 	if (dev->extbuf && !virtio_dev_extbuf_alloc(dev, pkt, data_len))
3087 		return 0;
3088 
3089 	/* check if chained buffers are allowed */
3090 	if (!dev->linearbuf)
3091 		return 0;
3092 
3093 	return -1;
3094 }
3095 
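/*
 * Dequeue up to 'count' packets from a split virtqueue, copying each
 * descriptor chain into mbufs allocated from 'mbuf_pool' and applying the
 * virtio-net header offloads. Returns the number of packets dequeued.
 */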
3096 __rte_always_inline
3097 static uint16_t
3098 virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
3099 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
3100 	bool legacy_ol_flags)
3101 	__rte_shared_locks_required(&vq->access_lock)
3102 	__rte_shared_locks_required(&vq->iotlb_lock)
3103 {
3104 	uint16_t i;
3105 	uint16_t avail_entries;
3106 	static bool allocerr_warned;
3107 
3108 	/*
3109 	 * The ordering between avail index and
3110 	 * desc reads needs to be enforced.
3111 	 */
3112 	avail_entries = rte_atomic_load_explicit((unsigned short __rte_atomic *)&vq->avail->idx,
3113 		rte_memory_order_acquire) - vq->last_avail_idx;
3114 	if (avail_entries == 0)
3115 		return 0;
3116 
3117 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
3118 
3119 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
3120 
3121 	count = RTE_MIN(count, MAX_PKT_BURST);
3122 	count = RTE_MIN(count, avail_entries);
3123 	VHOST_DATA_LOG(dev->ifname, DEBUG, "about to dequeue %u buffers", count);
3124 
3125 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count)) {
3126 		vq->stats.mbuf_alloc_failed += count;
3127 		return 0;
3128 	}
3129 
3130 	for (i = 0; i < count; i++) {
3131 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
3132 		uint16_t head_idx;
3133 		uint32_t buf_len;
3134 		uint16_t nr_vec = 0;
3135 		int err;
3136 
3137 		if (unlikely(fill_vec_buf_split(dev, vq,
3138 						vq->last_avail_idx + i,
3139 						&nr_vec, buf_vec,
3140 						&head_idx, &buf_len,
3141 						VHOST_ACCESS_RO) < 0))
3142 			break;
3143 
3144 		update_shadow_used_ring_split(vq, head_idx, 0);
3145 
3146 		if (unlikely(buf_len <= dev->vhost_hlen))
3147 			break;
3148 
3149 		buf_len -= dev->vhost_hlen;
3150 
3151 		err = virtio_dev_pktmbuf_prep(dev, pkts[i], buf_len);
3152 		if (unlikely(err)) {
3153 			/*
3154 			 * mbuf allocation fails for jumbo packets when external
3155 			 * buffer allocation is not allowed and a linear buffer
3156 			 * is required. Drop this packet.
3157 			 */
3158 			if (!allocerr_warned) {
3159 				VHOST_DATA_LOG(dev->ifname, ERR,
3160 					"failed mbuf alloc of size %d from %s.",
3161 					buf_len, mbuf_pool->name);
3162 				allocerr_warned = true;
3163 			}
3164 			break;
3165 		}
3166 
3167 		err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
3168 				   mbuf_pool, legacy_ol_flags, 0, false);
3169 		if (unlikely(err)) {
3170 			if (!allocerr_warned) {
3171 				VHOST_DATA_LOG(dev->ifname, ERR, "failed to copy desc to mbuf.");
3172 				allocerr_warned = true;
3173 			}
3174 			break;
3175 		}
3176 	}
3177 
3178 	if (unlikely(count != i))
3179 		rte_pktmbuf_free_bulk(&pkts[i], count - i);
3180 
3181 	if (likely(vq->shadow_used_idx)) {
3182 		vq->last_avail_idx += vq->shadow_used_idx;
3183 		vhost_virtqueue_reconnect_log_split(vq);
3184 		do_data_copy_dequeue(vq);
3185 		flush_shadow_used_ring_split(dev, vq);
3186 		vhost_vring_call_split(dev, vq);
3187 	}
3188 
3189 	return i;
3190 }
3191 
3192 __rte_noinline
3193 static uint16_t
3194 virtio_dev_tx_split_legacy(struct virtio_net *dev,
3195 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3196 	struct rte_mbuf **pkts, uint16_t count)
3197 	__rte_shared_locks_required(&vq->access_lock)
3198 	__rte_shared_locks_required(&vq->iotlb_lock)
3199 {
3200 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, true);
3201 }
3202 
3203 __rte_noinline
3204 static uint16_t
3205 virtio_dev_tx_split_compliant(struct virtio_net *dev,
3206 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3207 	struct rte_mbuf **pkts, uint16_t count)
3208 	__rte_shared_locks_required(&vq->access_lock)
3209 	__rte_shared_locks_required(&vq->iotlb_lock)
3210 {
3211 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, false);
3212 }
3213 
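/*
 * Check whether the next PACKED_BATCH_SIZE descriptors can be dequeued as
 * a batch: all must be available single descriptors with valid,
 * translatable addresses whose contents fit in the provided mbufs. Fills
 * in the descriptor addresses and ids on success.
 */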
3214 static __rte_always_inline int
3215 vhost_reserve_avail_batch_packed(struct virtio_net *dev,
3216 				 struct vhost_virtqueue *vq,
3217 				 struct rte_mbuf **pkts,
3218 				 uint16_t avail_idx,
3219 				 uintptr_t *desc_addrs,
3220 				 uint16_t *ids)
3221 	__rte_shared_locks_required(&vq->iotlb_lock)
3222 {
3223 	bool wrap = vq->avail_wrap_counter;
3224 	struct vring_packed_desc *descs = vq->desc_packed;
3225 	uint64_t lens[PACKED_BATCH_SIZE];
3226 	uint64_t buf_lens[PACKED_BATCH_SIZE];
3227 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3228 	uint16_t flags, i;
3229 
3230 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
3231 		return -1;
3232 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
3233 		return -1;
3234 
3235 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3236 		flags = descs[avail_idx + i].flags;
3237 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
3238 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
3239 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
3240 			return -1;
3241 	}
3242 
3243 	rte_atomic_thread_fence(rte_memory_order_acquire);
3244 
3245 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3246 		lens[i] = descs[avail_idx + i].len;
3247 
3248 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3249 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
3250 						  descs[avail_idx + i].addr,
3251 						  &lens[i], VHOST_ACCESS_RW);
3252 	}
3253 
3254 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3255 		if (unlikely(!desc_addrs[i]))
3256 			return -1;
3257 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
3258 			return -1;
3259 	}
3260 
3261 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3262 		if (virtio_dev_pktmbuf_prep(dev, pkts[i], lens[i]))
3263 			goto err;
3264 	}
3265 
3266 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3267 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
3268 
3269 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3270 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
3271 			goto err;
3272 	}
3273 
3274 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3275 		pkts[i]->pkt_len = lens[i] - buf_offset;
3276 		pkts[i]->data_len = pkts[i]->pkt_len;
3277 		ids[i] = descs[avail_idx + i].id;
3278 	}
3279 
3280 	return 0;
3281 
3282 err:
3283 	return -1;
3284 }
3285 
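/*
 * Async variant of the batch-dequeue check: same descriptor validation as
 * the synchronous helper above, plus a check that the DMA vChannel has
 * enough burst capacity for the whole batch. Descriptor addresses are
 * returned untranslated for use by the DMA copies.
 */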
3286 static __rte_always_inline int
3287 vhost_async_tx_batch_packed_check(struct virtio_net *dev,
3288 				 struct vhost_virtqueue *vq,
3289 				 struct rte_mbuf **pkts,
3290 				 uint16_t avail_idx,
3291 				 uintptr_t *desc_addrs,
3292 				 uint64_t *lens,
3293 				 uint16_t *ids,
3294 				 int16_t dma_id,
3295 				 uint16_t vchan_id)
3296 {
3297 	bool wrap = vq->avail_wrap_counter;
3298 	struct vring_packed_desc *descs = vq->desc_packed;
3299 	uint64_t buf_lens[PACKED_BATCH_SIZE];
3300 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3301 	uint16_t flags, i;
3302 
3303 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
3304 		return -1;
3305 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
3306 		return -1;
3307 
3308 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3309 		flags = descs[avail_idx + i].flags;
3310 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
3311 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
3312 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
3313 			return -1;
3314 	}
3315 
3316 	rte_atomic_thread_fence(rte_memory_order_acquire);
3317 
3318 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3319 		lens[i] = descs[avail_idx + i].len;
3320 
3321 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3322 		desc_addrs[i] = descs[avail_idx + i].addr;
3323 	}
3324 
3325 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3326 		if (unlikely(!desc_addrs[i]))
3327 			return -1;
3328 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
3329 			return -1;
3330 	}
3331 
3332 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3333 		if (virtio_dev_pktmbuf_prep(dev, pkts[i], lens[i]))
3334 			goto err;
3335 	}
3336 
3337 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3338 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
3339 
3340 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3341 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
3342 			goto err;
3343 	}
3344 
3345 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3346 		pkts[i]->pkt_len = lens[i] - buf_offset;
3347 		pkts[i]->data_len = pkts[i]->pkt_len;
3348 		ids[i] = descs[avail_idx + i].id;
3349 	}
3350 
3351 	if (rte_dma_burst_capacity(dma_id, vchan_id) < PACKED_BATCH_SIZE)
3352 		return -1;
3353 
3354 	return 0;
3355 
3356 err:
3357 	return -1;
3358 }
3359 
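/*
 * Dequeue a full batch of PACKED_BATCH_SIZE packets with synchronous
 * copies, apply the header offloads and shadow the used descriptors.
 */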
3360 static __rte_always_inline int
3361 virtio_dev_tx_batch_packed(struct virtio_net *dev,
3362 			   struct vhost_virtqueue *vq,
3363 			   struct rte_mbuf **pkts,
3364 			   bool legacy_ol_flags)
3365 	__rte_shared_locks_required(&vq->iotlb_lock)
3366 {
3367 	uint16_t avail_idx = vq->last_avail_idx;
3368 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3369 	struct virtio_net_hdr *hdr;
3370 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
3371 	uint16_t ids[PACKED_BATCH_SIZE];
3372 	uint16_t i;
3373 
3374 	if (vhost_reserve_avail_batch_packed(dev, vq, pkts, avail_idx,
3375 					     desc_addrs, ids))
3376 		return -1;
3377 
3378 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3379 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
3380 
3381 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3382 		rte_memcpy(rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
3383 			   (void *)(uintptr_t)(desc_addrs[i] + buf_offset),
3384 			   pkts[i]->pkt_len);
3385 
3386 	if (virtio_net_with_host_offload(dev)) {
3387 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3388 			hdr = (struct virtio_net_hdr *)(desc_addrs[i]);
3389 			vhost_dequeue_offload(dev, hdr, pkts[i], legacy_ol_flags);
3390 		}
3391 	}
3392 
3393 	if (virtio_net_is_inorder(dev))
3394 		vhost_shadow_dequeue_batch_packed_inorder(vq,
3395 			ids[PACKED_BATCH_SIZE - 1]);
3396 	else
3397 		vhost_shadow_dequeue_batch_packed(dev, vq, ids);
3398 
3399 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
3400 
3401 	return 0;
3402 }
3403 
3404 static __rte_always_inline int
3405 vhost_dequeue_single_packed(struct virtio_net *dev,
3406 			    struct vhost_virtqueue *vq,
3407 			    struct rte_mempool *mbuf_pool,
3408 			    struct rte_mbuf *pkts,
3409 			    uint16_t *buf_id,
3410 			    uint16_t *desc_count,
3411 			    bool legacy_ol_flags)
3412 	__rte_shared_locks_required(&vq->access_lock)
3413 	__rte_shared_locks_required(&vq->iotlb_lock)
3414 {
3415 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
3416 	uint32_t buf_len;
3417 	uint16_t nr_vec = 0;
3418 	int err;
3419 	static bool allocerr_warned;
3420 
3421 	if (unlikely(fill_vec_buf_packed(dev, vq,
3422 					 vq->last_avail_idx, desc_count,
3423 					 buf_vec, &nr_vec,
3424 					 buf_id, &buf_len,
3425 					 VHOST_ACCESS_RO) < 0))
3426 		return -1;
3427 
3428 	if (unlikely(buf_len <= dev->vhost_hlen))
3429 		return -1;
3430 
3431 	buf_len -= dev->vhost_hlen;
3432 
3433 	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
3434 		if (!allocerr_warned) {
3435 			VHOST_DATA_LOG(dev->ifname, ERR,
3436 				"failed mbuf alloc of size %d from %s.",
3437 				buf_len, mbuf_pool->name);
3438 			allocerr_warned = true;
3439 		}
3440 		return -1;
3441 	}
3442 
3443 	err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts,
3444 			   mbuf_pool, legacy_ol_flags, 0, false);
3445 	if (unlikely(err)) {
3446 		if (!allocerr_warned) {
3447 			VHOST_DATA_LOG(dev->ifname, ERR, "failed to copy desc to mbuf.");
3448 			allocerr_warned = true;
3449 		}
3450 		return -1;
3451 	}
3452 
3453 	return 0;
3454 }
3455 
3456 static __rte_always_inline int
3457 virtio_dev_tx_single_packed(struct virtio_net *dev,
3458 			    struct vhost_virtqueue *vq,
3459 			    struct rte_mempool *mbuf_pool,
3460 			    struct rte_mbuf *pkts,
3461 			    bool legacy_ol_flags)
3462 	__rte_shared_locks_required(&vq->access_lock)
3463 	__rte_shared_locks_required(&vq->iotlb_lock)
3464 {
3465 
3466 	uint16_t buf_id, desc_count = 0;
3467 	int ret;
3468 
3469 	ret = vhost_dequeue_single_packed(dev, vq, mbuf_pool, pkts, &buf_id,
3470 					&desc_count, legacy_ol_flags);
3471 
3472 	if (likely(desc_count > 0)) {
3473 		if (virtio_net_is_inorder(dev))
3474 			vhost_shadow_dequeue_single_packed_inorder(vq, buf_id,
3475 								   desc_count);
3476 		else
3477 			vhost_shadow_dequeue_single_packed(vq, buf_id,
3478 					desc_count);
3479 
3480 		vq_inc_last_avail_packed(vq, desc_count);
3481 	}
3482 
3483 	return ret;
3484 }
3485 
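/*
 * Count how many complete descriptor chains are currently available on
 * the packed ring, up to 'max_nb_avail_entries'.
 */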
3486 static __rte_always_inline uint16_t
3487 get_nb_avail_entries_packed(const struct vhost_virtqueue *__rte_restrict vq,
3488 			    uint16_t max_nb_avail_entries)
3489 {
3490 	const struct vring_packed_desc *descs = vq->desc_packed;
3491 	bool avail_wrap = vq->avail_wrap_counter;
3492 	uint16_t avail_idx = vq->last_avail_idx;
3493 	uint16_t nb_avail_entries = 0;
3494 	uint16_t flags;
3495 
3496 	while (nb_avail_entries < max_nb_avail_entries) {
3497 		flags = descs[avail_idx].flags;
3498 
3499 		if ((avail_wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
3500 		    (avail_wrap == !!(flags & VRING_DESC_F_USED)))
3501 			return nb_avail_entries;
3502 
3503 		if (!(flags & VRING_DESC_F_NEXT))
3504 			++nb_avail_entries;
3505 
3506 		if (unlikely(++avail_idx >= vq->size)) {
3507 			avail_idx -= vq->size;
3508 			avail_wrap = !avail_wrap;
3509 		}
3510 	}
3511 
3512 	return nb_avail_entries;
3513 }
3514 
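/*
 * Dequeue up to 'count' packets from a packed virtqueue, using batched
 * copies when a full batch of descriptors is available and falling back
 * to single-packet dequeue otherwise.
 */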
3515 __rte_always_inline
3516 static uint16_t
3517 virtio_dev_tx_packed(struct virtio_net *dev,
3518 		     struct vhost_virtqueue *__rte_restrict vq,
3519 		     struct rte_mempool *mbuf_pool,
3520 		     struct rte_mbuf **__rte_restrict pkts,
3521 		     uint32_t count,
3522 		     bool legacy_ol_flags)
3523 	__rte_shared_locks_required(&vq->access_lock)
3524 	__rte_shared_locks_required(&vq->iotlb_lock)
3525 {
3526 	uint32_t pkt_idx = 0;
3527 
3528 	count = get_nb_avail_entries_packed(vq, count);
3529 	if (count == 0)
3530 		return 0;
3531 
3532 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count)) {
3533 		vq->stats.mbuf_alloc_failed += count;
3534 		return 0;
3535 	}
3536 
3537 	do {
3538 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
3539 
3540 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
3541 			if (!virtio_dev_tx_batch_packed(dev, vq,
3542 							&pkts[pkt_idx],
3543 							legacy_ol_flags)) {
3544 				pkt_idx += PACKED_BATCH_SIZE;
3545 				continue;
3546 			}
3547 		}
3548 
3549 		if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
3550 						pkts[pkt_idx],
3551 						legacy_ol_flags))
3552 			break;
3553 		pkt_idx++;
3554 	} while (pkt_idx < count);
3555 
3556 	if (pkt_idx != count)
3557 		rte_pktmbuf_free_bulk(&pkts[pkt_idx], count - pkt_idx);
3558 
3559 	if (vq->shadow_used_idx) {
3560 		do_data_copy_dequeue(vq);
3561 
3562 		vhost_flush_dequeue_shadow_packed(dev, vq);
3563 		vhost_vring_call_packed(dev, vq);
3564 	}
3565 
3566 	return pkt_idx;
3567 }
3568 
3569 __rte_noinline
3570 static uint16_t
3571 virtio_dev_tx_packed_legacy(struct virtio_net *dev,
3572 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
3573 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
3574 	__rte_shared_locks_required(&vq->access_lock)
3575 	__rte_shared_locks_required(&vq->iotlb_lock)
3576 {
3577 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, true);
3578 }
3579 
3580 __rte_noinline
3581 static uint16_t
3582 virtio_dev_tx_packed_compliant(struct virtio_net *dev,
3583 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
3584 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
3585 	__rte_shared_locks_required(&vq->access_lock)
3586 	__rte_shared_locks_required(&vq->iotlb_lock)
3587 {
3588 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, false);
3589 }
3590 
3591 uint16_t
3592 rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
3593 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
3594 {
3595 	struct virtio_net *dev;
3596 	struct vhost_virtqueue *vq;
3597 	int16_t success = 1;
3598 	uint16_t nb_rx = 0;
3599 
3600 	dev = get_device(vid);
3601 	if (!dev)
3602 		return 0;
3603 
3604 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
3605 		VHOST_DATA_LOG(dev->ifname, ERR,
3606 			"%s: built-in vhost net backend is disabled.",
3607 			__func__);
3608 		goto out_no_unlock;
3609 	}
3610 
3611 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
3612 		VHOST_DATA_LOG(dev->ifname, ERR,
3613 			"%s: invalid virtqueue idx %d.",
3614 			__func__, queue_id);
3615 		goto out_no_unlock;
3616 	}
3617 
3618 	vq = dev->virtqueue[queue_id];
3619 
3620 	if (unlikely(rte_rwlock_read_trylock(&vq->access_lock) != 0))
3621 		goto out_no_unlock;
3622 
3623 	if (unlikely(!vq->enabled))
3624 		goto out_access_unlock;
3625 
3626 	vhost_user_iotlb_rd_lock(vq);
3627 
3628 	if (unlikely(!vq->access_ok)) {
3629 		vhost_user_iotlb_rd_unlock(vq);
3630 		rte_rwlock_read_unlock(&vq->access_lock);
3631 
3632 		virtio_dev_vring_translate(dev, vq);
3633 
3634 		goto out_no_unlock;
3635 	}
3636 
3637 	/*
3638 	 * Construct a RARP broadcast packet and inject it into the "pkts"
3639 	 * array, so it looks like the guest actually sent such a packet.
3640 	 *
3641 	 * Check user_send_rarp() for more information.
3642 	 *
3643 	 * broadcast_rarp shares a cacheline in the virtio_net structure
3644 	 * with some fields that are accessed during enqueue and
3645 	 * rte_atomic_compare_exchange_strong_explicit causes a write if it performs
3646 	 * the compare and exchange. This could result in false sharing between enqueue
3647 	 * and dequeue.
3648 	 *
3649 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
3650 	 * and only performing compare and exchange if the read indicates it
3651 	 * is likely to be set.
3652 	 */
3653 	if (unlikely(rte_atomic_load_explicit(&dev->broadcast_rarp, rte_memory_order_acquire) &&
3654 			rte_atomic_compare_exchange_strong_explicit(&dev->broadcast_rarp,
3655 			&success, 0, rte_memory_order_release, rte_memory_order_relaxed))) {
3656 		/*
3657 		 * Inject the RARP packet at the head of the "pkts" array,
3658 		 * so that the switch's MAC learning table gets updated first.
3659 		 */
3660 		pkts[nb_rx] = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
3661 		if (pkts[nb_rx] == NULL) {
3662 			VHOST_DATA_LOG(dev->ifname, ERR, "failed to make RARP packet.");
3663 			goto out;
3664 		}
3665 		nb_rx += 1;
3666 	}
3667 
3668 	if (vq_is_packed(dev)) {
3669 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
3670 			nb_rx += virtio_dev_tx_packed_legacy(dev, vq, mbuf_pool,
3671 					pkts + nb_rx, count - nb_rx);
3672 		else
3673 			nb_rx += virtio_dev_tx_packed_compliant(dev, vq, mbuf_pool,
3674 					pkts + nb_rx, count - nb_rx);
3675 	} else {
3676 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
3677 			nb_rx += virtio_dev_tx_split_legacy(dev, vq, mbuf_pool,
3678 					pkts + nb_rx, count - nb_rx);
3679 		else
3680 			nb_rx += virtio_dev_tx_split_compliant(dev, vq, mbuf_pool,
3681 					pkts + nb_rx, count - nb_rx);
3682 	}
3683 
3684 	vhost_queue_stats_update(dev, vq, pkts, nb_rx);
3685 
3686 out:
3687 	vhost_user_iotlb_rd_unlock(vq);
3688 
3689 out_access_unlock:
3690 	rte_rwlock_read_unlock(&vq->access_lock);
3691 
3692 out_no_unlock:
3693 	return nb_rx;
3694 }
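
/*
 * A minimal usage sketch for rte_vhost_dequeue_burst(), assuming "vid" and
 * "mbuf_pool" come from the application's vhost setup and that virtqueue 1
 * (the first TX ring from the guest's point of view) is being polled:
 *
 *	struct rte_mbuf *pkts[32];
 *	uint16_t nb, i;
 *
 *	nb = rte_vhost_dequeue_burst(vid, 1, mbuf_pool, pkts, 32);
 *	for (i = 0; i < nb; i++) {
 *		// forward pkts[i], or simply drop it:
 *		rte_pktmbuf_free(pkts[i]);
 *	}
 */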
3695 
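/*
 * Poll the DMA vchannel for completed dequeue copies: finished in-flight
 * slots are collected in order, offload info is applied to the mbufs when
 * host offload is negotiated, and the corresponding descriptors are written
 * back to the used ring (packed or split) before kicking the guest.
 * Returns the number of completed packets placed in "pkts".
 */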
3696 static __rte_always_inline uint16_t
3697 async_poll_dequeue_completed(struct virtio_net *dev, struct vhost_virtqueue *vq,
3698 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
3699 		uint16_t vchan_id, bool legacy_ol_flags)
3700 	__rte_shared_locks_required(&vq->access_lock)
3701 {
3702 	uint16_t start_idx, from, i;
3703 	uint16_t nr_cpl_pkts = 0;
3704 	struct async_inflight_info *pkts_info = vq->async->pkts_info;
3705 
3706 	vhost_async_dma_check_completed(dev, dma_id, vchan_id, VHOST_DMA_MAX_COPY_COMPLETE);
3707 
3708 	start_idx = async_get_first_inflight_pkt_idx(vq);
3709 
3710 	from = start_idx;
3711 	while (vq->async->pkts_cmpl_flag[from] && count--) {
3712 		vq->async->pkts_cmpl_flag[from] = false;
3713 		from = (from + 1) % vq->size;
3714 		nr_cpl_pkts++;
3715 	}
3716 
3717 	if (nr_cpl_pkts == 0)
3718 		return 0;
3719 
3720 	for (i = 0; i < nr_cpl_pkts; i++) {
3721 		from = (start_idx + i) % vq->size;
3722 		pkts[i] = pkts_info[from].mbuf;
3723 
3724 		if (virtio_net_with_host_offload(dev))
3725 			vhost_dequeue_offload(dev, &pkts_info[from].nethdr, pkts[i],
3726 					      legacy_ol_flags);
3727 	}
3728 
3729 	/* write back completed descs to used ring and update used idx */
3730 	if (vq_is_packed(dev)) {
3731 		write_back_completed_descs_packed(vq, nr_cpl_pkts);
3732 		vhost_vring_call_packed(dev, vq);
3733 	} else {
3734 		write_back_completed_descs_split(vq, nr_cpl_pkts);
3735 		rte_atomic_fetch_add_explicit((unsigned short __rte_atomic *)&vq->used->idx,
3736 			nr_cpl_pkts, rte_memory_order_release);
3737 		vhost_vring_call_split(dev, vq);
3738 	}
3739 	vq->async->pkts_inflight_n -= nr_cpl_pkts;
3740 
3741 	return nr_cpl_pkts;
3742 }
3743 
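/*
 * DMA-assisted dequeue on a split ring: for each available descriptor chain,
 * prepare an mbuf and queue the descriptor-to-mbuf copies through
 * desc_to_mbuf()/vhost_async_dma_transfer(). Used descriptors are parked in
 * the async shadow ring until their copies complete. If only part of the
 * copies could be submitted, the avail index, shadow ring and mbufs of the
 * failed tail are rolled back. Finally, packets whose copies have completed
 * (possibly from earlier calls) are returned via
 * async_poll_dequeue_completed().
 */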
3744 static __rte_always_inline uint16_t
3745 virtio_dev_tx_async_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
3746 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
3747 		int16_t dma_id, uint16_t vchan_id, bool legacy_ol_flags)
3748 	__rte_shared_locks_required(&vq->access_lock)
3749 	__rte_shared_locks_required(&vq->iotlb_lock)
3750 {
3751 	static bool allocerr_warned;
3752 	bool dropped = false;
3753 	uint16_t avail_entries;
3754 	uint16_t pkt_idx, slot_idx = 0;
3755 	uint16_t nr_done_pkts = 0;
3756 	uint16_t pkt_err = 0;
3757 	uint16_t n_xfer;
3758 	struct vhost_async *async = vq->async;
3759 	struct async_inflight_info *pkts_info = async->pkts_info;
3760 	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
3761 	uint16_t pkts_size = count;
3762 
3763 	/**
3764 	 * The ordering between avail index and
3765 	 * desc reads needs to be enforced.
3766 	 */
3767 	avail_entries = rte_atomic_load_explicit((unsigned short __rte_atomic *)&vq->avail->idx,
3768 		rte_memory_order_acquire) - vq->last_avail_idx;
3769 	if (avail_entries == 0)
3770 		goto out;
3771 
3772 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
3773 
3774 	async_iter_reset(async);
3775 
3776 	count = RTE_MIN(count, MAX_PKT_BURST);
3777 	count = RTE_MIN(count, avail_entries);
3778 	VHOST_DATA_LOG(dev->ifname, DEBUG, "about to dequeue %u buffers", count);
3779 
3780 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count)) {
3781 		vq->stats.mbuf_alloc_failed += count;
3782 		goto out;
3783 	}
3784 
3785 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
3786 		uint16_t head_idx = 0;
3787 		uint16_t nr_vec = 0;
3788 		uint16_t to;
3789 		uint32_t buf_len;
3790 		int err;
3791 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
3792 		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
3793 
3794 		if (unlikely(fill_vec_buf_split(dev, vq, vq->last_avail_idx,
3795 						&nr_vec, buf_vec,
3796 						&head_idx, &buf_len,
3797 						VHOST_ACCESS_RO) < 0)) {
3798 			dropped = true;
3799 			break;
3800 		}
3801 
3802 		if (unlikely(buf_len <= dev->vhost_hlen)) {
3803 			dropped = true;
3804 			break;
3805 		}
3806 
3807 		buf_len -= dev->vhost_hlen;
3808 
3809 		err = virtio_dev_pktmbuf_prep(dev, pkt, buf_len);
3810 		if (unlikely(err)) {
3811 			/**
3812 			 * mbuf allocation fails for jumbo packets when external
3813 			 * buffer allocation is not allowed and linear buffer
3814 			 * is required. Drop this packet.
3815 			 */
3816 			if (!allocerr_warned) {
3817 				VHOST_DATA_LOG(dev->ifname, ERR,
3818 					"%s: Failed mbuf alloc of size %u from %s",
3819 					__func__, buf_len, mbuf_pool->name);
3820 				allocerr_warned = true;
3821 			}
3822 			dropped = true;
3823 			slot_idx--;
3824 			break;
3825 		}
3826 
3827 		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
3828 		err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkt, mbuf_pool,
3829 					legacy_ol_flags, slot_idx, true);
3830 		if (unlikely(err)) {
3831 			if (!allocerr_warned) {
3832 				VHOST_DATA_LOG(dev->ifname, ERR,
3833 					"%s: Failed to offload copies to async channel.",
3834 					__func__);
3835 				allocerr_warned = true;
3836 			}
3837 			dropped = true;
3838 			slot_idx--;
3839 			break;
3840 		}
3841 
3842 		pkts_info[slot_idx].mbuf = pkt;
3843 
3844 		/* store used descs */
3845 		to = async->desc_idx_split & (vq->size - 1);
3846 		async->descs_split[to].id = head_idx;
3847 		async->descs_split[to].len = 0;
3848 		async->desc_idx_split++;
3849 
3850 		vq->last_avail_idx++;
3851 		vhost_virtqueue_reconnect_log_split(vq);
3852 	}
3853 
3854 	if (unlikely(dropped))
3855 		rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
3856 
3857 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
3858 					  async->iov_iter, pkt_idx);
3859 
3860 	async->pkts_inflight_n += n_xfer;
3861 
3862 	pkt_err = pkt_idx - n_xfer;
3863 	if (unlikely(pkt_err)) {
3864 		VHOST_DATA_LOG(dev->ifname, DEBUG, "%s: failed to transfer data.",
3865 			__func__);
3866 
3867 		pkt_idx = n_xfer;
3868 		/* recover available ring */
3869 		vq->last_avail_idx -= pkt_err;
3870 		vhost_virtqueue_reconnect_log_split(vq);
3871 
3872 		/**
3873 		 * recover async channel copy related structures and free pktmbufs
3874 		 * for error pkts.
3875 		 */
3876 		async->desc_idx_split -= pkt_err;
3877 		while (pkt_err-- > 0) {
3878 			rte_pktmbuf_free(pkts_info[slot_idx & (vq->size - 1)].mbuf);
3879 			slot_idx--;
3880 		}
3881 	}
3882 
3883 	async->pkts_idx += pkt_idx;
3884 	if (async->pkts_idx >= vq->size)
3885 		async->pkts_idx -= vq->size;
3886 
3887 out:
3888 	/* The DMA device may serve other queues, so unconditionally check for completed copies. */
3889 	nr_done_pkts = async_poll_dequeue_completed(dev, vq, pkts, pkts_size,
3890 							dma_id, vchan_id, legacy_ol_flags);
3891 
3892 	return nr_done_pkts;
3893 }
3894 
3895 __rte_noinline
3896 static uint16_t
3897 virtio_dev_tx_async_split_legacy(struct virtio_net *dev,
3898 		struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3899 		struct rte_mbuf **pkts, uint16_t count,
3900 		int16_t dma_id, uint16_t vchan_id)
3901 	__rte_shared_locks_required(&vq->access_lock)
3902 	__rte_shared_locks_required(&vq->iotlb_lock)
3903 {
3904 	return virtio_dev_tx_async_split(dev, vq, mbuf_pool,
3905 				pkts, count, dma_id, vchan_id, true);
3906 }
3907 
3908 __rte_noinline
3909 static uint16_t
3910 virtio_dev_tx_async_split_compliant(struct virtio_net *dev,
3911 		struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3912 		struct rte_mbuf **pkts, uint16_t count,
3913 		int16_t dma_id, uint16_t vchan_id)
3914 	__rte_shared_locks_required(&vq->access_lock)
3915 	__rte_shared_locks_required(&vq->iotlb_lock)
3916 {
3917 	return virtio_dev_tx_async_split(dev, vq, mbuf_pool,
3918 				pkts, count, dma_id, vchan_id, false);
3919 }
3920 
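/*
 * Record the buffer id and descriptor count of a dequeued packed-ring chain
 * in the async shadow ring; the entry is flushed to the used ring once the
 * DMA copy for that packet has completed.
 */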
3921 static __rte_always_inline void
3922 vhost_async_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
3923 				uint16_t buf_id, uint16_t count)
3924 	__rte_shared_locks_required(&vq->access_lock)
3925 {
3926 	struct vhost_async *async = vq->async;
3927 	uint16_t idx = async->buffer_idx_packed;
3928 
3929 	async->buffers_packed[idx].id = buf_id;
3930 	async->buffers_packed[idx].len = 0;
3931 	async->buffers_packed[idx].count = count;
3932 
3933 	async->buffer_idx_packed++;
3934 	if (async->buffer_idx_packed >= vq->size)
3935 		async->buffer_idx_packed -= vq->size;
3936 
3937 }
3938 
3939 static __rte_always_inline int
3940 virtio_dev_tx_async_single_packed(struct virtio_net *dev,
3941 			struct vhost_virtqueue *vq,
3942 			struct rte_mempool *mbuf_pool,
3943 			struct rte_mbuf *pkts,
3944 			uint16_t slot_idx,
3945 			bool legacy_ol_flags)
3946 	__rte_shared_locks_required(&vq->access_lock)
3947 	__rte_shared_locks_required(&vq->iotlb_lock)
3948 {
3949 	int err;
3950 	uint16_t buf_id, desc_count = 0;
3951 	uint16_t nr_vec = 0;
3952 	uint32_t buf_len;
3953 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
3954 	struct vhost_async *async = vq->async;
3955 	struct async_inflight_info *pkts_info = async->pkts_info;
3956 	static bool allocerr_warned;
3957 
3958 	if (unlikely(fill_vec_buf_packed(dev, vq, vq->last_avail_idx, &desc_count,
3959 					 buf_vec, &nr_vec, &buf_id, &buf_len,
3960 					 VHOST_ACCESS_RO) < 0))
3961 		return -1;
3962 
3963 	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
3964 		if (!allocerr_warned) {
3965 			VHOST_DATA_LOG(dev->ifname, ERR, "Failed mbuf alloc of size %u from %s.",
3966 				buf_len, mbuf_pool->name);
3967 
3968 			allocerr_warned = true;
3969 		}
3970 		return -1;
3971 	}
3972 
3973 	err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts, mbuf_pool,
3974 		legacy_ol_flags, slot_idx, true);
3975 	if (unlikely(err)) {
3976 		rte_pktmbuf_free(pkts);
3977 		if (!allocerr_warned) {
3978 			VHOST_DATA_LOG(dev->ifname, ERR, "Failed to copy desc to mbuf.");
3979 			allocerr_warned = true;
3980 		}
3981 		return -1;
3982 	}
3983 
3984 	pkts_info[slot_idx].descs = desc_count;
3985 
3986 	/* update async shadow packed ring */
3987 	vhost_async_shadow_dequeue_single_packed(vq, buf_id, desc_count);
3988 
3989 	vq_inc_last_avail_packed(vq, desc_count);
3990 
3991 	return err;
3992 }
3993 
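/*
 * Batch path for PACKED_BATCH_SIZE single-descriptor buffers: once
 * vhost_async_tx_batch_packed_check() accepts the batch, one iovec per packet
 * (host IOVA -> mbuf) is queued for DMA, the virtio-net headers are read by
 * the CPU and stashed in pkts_info so offload info can be applied at
 * completion time, and the buffer ids are pushed to the async shadow ring.
 * Returns 0 on success, -1 if the batch conditions are not met.
 */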
3994 static __rte_always_inline int
3995 virtio_dev_tx_async_packed_batch(struct virtio_net *dev,
3996 			   struct vhost_virtqueue *vq,
3997 			   struct rte_mbuf **pkts, uint16_t slot_idx,
3998 			   uint16_t dma_id, uint16_t vchan_id)
3999 	__rte_shared_locks_required(&vq->access_lock)
4000 	__rte_shared_locks_required(&vq->iotlb_lock)
4001 {
4002 	uint16_t avail_idx = vq->last_avail_idx;
4003 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
4004 	struct vhost_async *async = vq->async;
4005 	struct async_inflight_info *pkts_info = async->pkts_info;
4006 	struct virtio_net_hdr *hdr;
4007 	uint32_t mbuf_offset = 0;
4008 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
4009 	uint64_t desc_vva;
4010 	uint64_t lens[PACKED_BATCH_SIZE];
4011 	void *host_iova[PACKED_BATCH_SIZE];
4012 	uint64_t mapped_len[PACKED_BATCH_SIZE];
4013 	uint16_t ids[PACKED_BATCH_SIZE];
4014 	uint16_t i;
4015 
4016 	if (vhost_async_tx_batch_packed_check(dev, vq, pkts, avail_idx,
4017 					     desc_addrs, lens, ids, dma_id, vchan_id))
4018 		return -1;
4019 
4020 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
4021 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
4022 
4023 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
4024 		host_iova[i] = (void *)(uintptr_t)gpa_to_first_hpa(dev,
4025 			desc_addrs[i] + buf_offset, pkts[i]->pkt_len, &mapped_len[i]);
4026 	}
4027 
4028 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
4029 		async_iter_initialize(dev, async);
4030 		async_iter_add_iovec(dev, async,
4031 				host_iova[i],
4032 				(void *)(uintptr_t)rte_pktmbuf_iova_offset(pkts[i], mbuf_offset),
4033 				mapped_len[i]);
4034 		async->iter_idx++;
4035 	}
4036 
4037 	if (virtio_net_with_host_offload(dev)) {
4038 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
4039 			desc_vva = vhost_iova_to_vva(dev, vq, desc_addrs[i],
4040 						&lens[i], VHOST_ACCESS_RO);
4041 			hdr = (struct virtio_net_hdr *)(uintptr_t)desc_vva;
4042 			pkts_info[slot_idx + i].nethdr = *hdr;
4043 		}
4044 	}
4045 
4046 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
4047 
4048 	vhost_async_shadow_dequeue_packed_batch(vq, ids);
4049 
4050 	return 0;
4051 }
4052 
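/*
 * DMA-assisted dequeue on a packed ring: try the batch path first and fall
 * back to virtio_dev_tx_async_single_packed(), then submit all gathered
 * copies with a single vhost_async_dma_transfer() call. On partial
 * submission, the shadow buffer index, last_avail index (and wrap counter)
 * and the mbufs of the failed tail are rolled back. Completed packets are
 * returned via async_poll_dequeue_completed().
 */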
4053 static __rte_always_inline uint16_t
4054 virtio_dev_tx_async_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
4055 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
4056 		uint16_t count, uint16_t dma_id, uint16_t vchan_id, bool legacy_ol_flags)
4057 	__rte_shared_locks_required(&vq->access_lock)
4058 	__rte_shared_locks_required(&vq->iotlb_lock)
4059 {
4060 	uint32_t pkt_idx = 0;
4061 	uint16_t slot_idx = 0;
4062 	uint16_t nr_done_pkts = 0;
4063 	uint16_t pkt_err = 0;
4064 	uint32_t n_xfer;
4065 	uint16_t i;
4066 	struct vhost_async *async = vq->async;
4067 	struct async_inflight_info *pkts_info = async->pkts_info;
4068 	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
4069 
4070 	VHOST_DATA_LOG(dev->ifname, DEBUG, "(%d) about to dequeue %u buffers", dev->vid, count);
4071 
4072 	async_iter_reset(async);
4073 
4074 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count)) {
4075 		vq->stats.mbuf_alloc_failed += count;
4076 		goto out;
4077 	}
4078 
4079 	do {
4080 		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
4081 
4082 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
4083 
4084 		slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
4085 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
4086 			if (!virtio_dev_tx_async_packed_batch(dev, vq, &pkts_prealloc[pkt_idx],
4087 						slot_idx, dma_id, vchan_id)) {
4088 				for (i = 0; i < PACKED_BATCH_SIZE; i++) {
4089 					slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
4090 					pkts_info[slot_idx].descs = 1;
4091 					pkts_info[slot_idx].nr_buffers = 1;
4092 					pkts_info[slot_idx].mbuf = pkts_prealloc[pkt_idx];
4093 					pkt_idx++;
4094 				}
4095 				continue;
4096 			}
4097 		}
4098 
4099 		if (unlikely(virtio_dev_tx_async_single_packed(dev, vq, mbuf_pool, pkt,
4100 				slot_idx, legacy_ol_flags))) {
4101 			rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
4102 
4103 			if (slot_idx == 0)
4104 				slot_idx = vq->size - 1;
4105 			else
4106 				slot_idx--;
4107 
4108 			break;
4109 		}
4110 
4111 		pkts_info[slot_idx].mbuf = pkt;
4112 		pkt_idx++;
4113 	} while (pkt_idx < count);
4114 
4115 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
4116 					async->iov_iter, pkt_idx);
4117 
4118 	async->pkts_inflight_n += n_xfer;
4119 
4120 	pkt_err = pkt_idx - n_xfer;
4121 
4122 	if (unlikely(pkt_err)) {
4123 		uint16_t descs_err = 0;
4124 
4125 		pkt_idx -= pkt_err;
4126 
4127 		/**
4128 		 * recover DMA-copy related structures and free pktmbuf for DMA-error pkts.
4129 		 */
4130 		if (async->buffer_idx_packed >= pkt_err)
4131 			async->buffer_idx_packed -= pkt_err;
4132 		else
4133 			async->buffer_idx_packed += vq->size - pkt_err;
4134 
4135 		while (pkt_err-- > 0) {
4136 			rte_pktmbuf_free(pkts_info[slot_idx].mbuf);
4137 			descs_err += pkts_info[slot_idx].descs;
4138 
4139 			if (slot_idx == 0)
4140 				slot_idx = vq->size - 1;
4141 			else
4142 				slot_idx--;
4143 		}
4144 
4145 		/* recover available ring */
4146 		if (vq->last_avail_idx >= descs_err) {
4147 			vq->last_avail_idx -= descs_err;
4148 		} else {
4149 			vq->last_avail_idx += vq->size - descs_err;
4150 			vq->avail_wrap_counter ^= 1;
4151 		}
4152 		vhost_virtqueue_reconnect_log_packed(vq);
4153 	}
4154 
4155 	async->pkts_idx += pkt_idx;
4156 	if (async->pkts_idx >= vq->size)
4157 		async->pkts_idx -= vq->size;
4158 
4159 out:
4160 	nr_done_pkts = async_poll_dequeue_completed(dev, vq, pkts, count,
4161 					dma_id, vchan_id, legacy_ol_flags);
4162 
4163 	return nr_done_pkts;
4164 }
4165 
4166 __rte_noinline
4167 static uint16_t
4168 virtio_dev_tx_async_packed_legacy(struct virtio_net *dev, struct vhost_virtqueue *vq,
4169 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
4170 		uint16_t count, uint16_t dma_id, uint16_t vchan_id)
4171 	__rte_shared_locks_required(&vq->access_lock)
4172 	__rte_shared_locks_required(&vq->iotlb_lock)
4173 {
4174 	return virtio_dev_tx_async_packed(dev, vq, mbuf_pool,
4175 				pkts, count, dma_id, vchan_id, true);
4176 }
4177 
4178 __rte_noinline
4179 static uint16_t
4180 virtio_dev_tx_async_packed_compliant(struct virtio_net *dev, struct vhost_virtqueue *vq,
4181 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
4182 		uint16_t count, uint16_t dma_id, uint16_t vchan_id)
4183 	__rte_shared_locks_required(&vq->access_lock)
4184 	__rte_shared_locks_required(&vq->iotlb_lock)
4185 {
4186 	return virtio_dev_tx_async_packed(dev, vq, mbuf_pool,
4187 				pkts, count, dma_id, vchan_id, false);
4188 }
4189 
4190 uint16_t
4191 rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
4192 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
4193 	int *nr_inflight, int16_t dma_id, uint16_t vchan_id)
4194 {
4195 	struct virtio_net *dev;
4196 	struct vhost_virtqueue *vq;
4197 	int16_t success = 1;
4198 	uint16_t nb_rx = 0;
4199 
4200 	dev = get_device(vid);
4201 	if (!dev || !nr_inflight)
4202 		goto out_no_unlock;
4203 
4204 	*nr_inflight = -1;
4205 
4206 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
4207 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: built-in vhost net backend is disabled.",
4208 			__func__);
4209 		goto out_no_unlock;
4210 	}
4211 
4212 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
4213 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid virtqueue idx %d.",
4214 			__func__, queue_id);
4215 		goto out_no_unlock;
4216 	}
4217 
4218 	if (unlikely(dma_id < 0 || dma_id >= RTE_DMADEV_DEFAULT_MAX)) {
4219 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid dma id %d.",
4220 			__func__, dma_id);
4221 		goto out_no_unlock;
4222 	}
4223 
4224 	if (unlikely(!dma_copy_track[dma_id].vchans ||
4225 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
4226 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid channel %d:%u.",
4227 			__func__, dma_id, vchan_id);
4228 		goto out_no_unlock;
4229 	}
4230 
4231 	vq = dev->virtqueue[queue_id];
4232 
4233 	if (unlikely(rte_rwlock_read_trylock(&vq->access_lock) != 0))
4234 		goto out_no_unlock;
4235 
4236 	if (unlikely(vq->enabled == 0))
4237 		goto out_access_unlock;
4238 
4239 	if (unlikely(!vq->async)) {
4240 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: async not registered for queue id %d.",
4241 			__func__, queue_id);
4242 		goto out_access_unlock;
4243 	}
4244 
4245 	vhost_user_iotlb_rd_lock(vq);
4246 
4247 	if (unlikely(vq->access_ok == 0)) {
4248 		vhost_user_iotlb_rd_unlock(vq);
4249 		rte_rwlock_read_unlock(&vq->access_lock);
4250 
4251 		virtio_dev_vring_translate(dev, vq);
4252 		goto out_no_unlock;
4253 	}
4254 
4255 	/*
4256 	 * Construct a RARP broadcast packet, and inject it to the "pkts"
4257 	 * Construct a RARP broadcast packet and inject it into the "pkts"
4258 	 * array, so it looks like the guest actually sent such a packet.
4259 	 * Check user_send_rarp() for more information.
4260 	 *
4261 	 * broadcast_rarp shares a cacheline in the virtio_net structure
4262 	 * with some fields that are accessed during enqueue and
4263 	 * rte_atomic_compare_exchange_strong_explicit causes a write if it performs
4264 	 * the compare and exchange. This could result in false sharing between enqueue
4265 	 * and dequeue.
4266 	 *
4267 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
4268 	 * and only performing compare and exchange if the read indicates it
4269 	 * is likely to be set.
4270 	 */
4271 	if (unlikely(rte_atomic_load_explicit(&dev->broadcast_rarp, rte_memory_order_acquire) &&
4272 			rte_atomic_compare_exchange_strong_explicit(&dev->broadcast_rarp,
4273 			&success, 0, rte_memory_order_release, rte_memory_order_relaxed))) {
4274 		/*
4275 		 * Inject the RARP packet at the head of the "pkts" array,
4276 		 * so that the switch's MAC learning table gets updated first.
4277 		 */
4278 		pkts[nb_rx] = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
4279 		if (pkts[nb_rx] == NULL) {
4280 			VHOST_DATA_LOG(dev->ifname, ERR, "failed to make RARP packet.");
4281 			goto out;
4282 		}
4283 		nb_rx += 1;
4284 	}
4285 
4286 	if (vq_is_packed(dev)) {
4287 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
4288 			nb_rx += virtio_dev_tx_async_packed_legacy(dev, vq, mbuf_pool,
4289 					pkts + nb_rx, count - nb_rx, dma_id, vchan_id);
4290 		else
4291 			nb_rx += virtio_dev_tx_async_packed_compliant(dev, vq, mbuf_pool,
4292 					pkts + nb_rx, count - nb_rx, dma_id, vchan_id);
4293 	} else {
4294 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
4295 			nb_rx += virtio_dev_tx_async_split_legacy(dev, vq, mbuf_pool,
4296 					pkts + nb_rx, count - nb_rx, dma_id, vchan_id);
4297 		else
4298 			nb_rx += virtio_dev_tx_async_split_compliant(dev, vq, mbuf_pool,
4299 					pkts + nb_rx, count - nb_rx, dma_id, vchan_id);
4300 	}
4301 
4302 	*nr_inflight = vq->async->pkts_inflight_n;
4303 	vhost_queue_stats_update(dev, vq, pkts, nb_rx);
4304 
4305 out:
4306 	vhost_user_iotlb_rd_unlock(vq);
4307 
4308 out_access_unlock:
4309 	rte_rwlock_read_unlock(&vq->access_lock);
4310 
4311 out_no_unlock:
4312 	return nb_rx;
4313 }
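
/*
 * A minimal usage sketch for rte_vhost_async_try_dequeue_burst(), assuming
 * the DMA vchannel and queue have already been set up for async use (e.g.
 * with rte_vhost_async_dma_configure() and rte_vhost_async_channel_register()),
 * and that "vid", "mbuf_pool" and "dma_id" are provided by the application:
 *
 *	struct rte_mbuf *pkts[32];
 *	int nr_inflight = 0;
 *	uint16_t nb, i;
 *
 *	nb = rte_vhost_async_try_dequeue_burst(vid, 1, mbuf_pool, pkts, 32,
 *			&nr_inflight, dma_id, 0);
 *	// "nb" packets are fully copied and ready; "nr_inflight" copies are
 *	// still pending on the DMA channel and will be returned by a later call.
 *	for (i = 0; i < nb; i++)
 *		rte_pktmbuf_free(pkts[i]);
 */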
4314