xref: /dpdk/lib/vhost/virtio_net.c (revision c56185fc183fc0532d2f03aaf04bbf0989ea91a5)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2016 Intel Corporation
3  */
4 
5 #include <stdint.h>
6 #include <stdbool.h>
7 #include <linux/virtio_net.h>
8 
9 #include <rte_mbuf.h>
10 #include <rte_memcpy.h>
11 #include <rte_net.h>
12 #include <rte_ether.h>
13 #include <rte_ip.h>
14 #include <rte_dmadev.h>
15 #include <rte_vhost.h>
16 #include <rte_tcp.h>
17 #include <rte_udp.h>
18 #include <rte_sctp.h>
19 #include <rte_arp.h>
20 #include <rte_spinlock.h>
21 #include <rte_malloc.h>
22 #include <rte_vhost_async.h>
23 
24 #include "iotlb.h"
25 #include "vhost.h"
26 
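/*
 * Copies of at most this many bytes are accumulated in the per-virtqueue
 * batch_copy_elems array and flushed in one pass by do_data_copy_enqueue()
 * or do_data_copy_dequeue(); longer copies (or copies arriving when the
 * batch array is full) are performed immediately.
 */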
27 #define MAX_BATCH_LEN 256
28 
29 static __rte_always_inline uint16_t
30 async_poll_dequeue_completed(struct virtio_net *dev, struct vhost_virtqueue *vq,
31 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
32 		uint16_t vchan_id, bool legacy_ol_flags);
33 
34 /* DMA device copy operation tracking array. */
35 struct async_dma_info dma_copy_track[RTE_DMADEV_DEFAULT_MAX];
36 
37 static __rte_always_inline bool
38 rxvq_is_mergeable(struct virtio_net *dev)
39 {
40 	return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
41 }
42 
43 static __rte_always_inline bool
44 virtio_net_is_inorder(struct virtio_net *dev)
45 {
46 	return dev->features & (1ULL << VIRTIO_F_IN_ORDER);
47 }
48 
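/*
 * A virtqueue index is valid when it is below the number of vrings and its
 * parity matches the direction: even indexes when is_tx is 0 (the enqueue
 * path), odd indexes when is_tx is 1 (the dequeue path).
 */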
49 static bool
50 is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t nr_vring)
51 {
52 	return (is_tx ^ (idx & 1)) == 0 && idx < nr_vring;
53 }
54 
55 static inline void
56 vhost_queue_stats_update(struct virtio_net *dev, struct vhost_virtqueue *vq,
57 		struct rte_mbuf **pkts, uint16_t count)
58 	__rte_shared_locks_required(&vq->access_lock)
59 {
60 	struct virtqueue_stats *stats = &vq->stats;
61 	int i;
62 
63 	if (!(dev->flags & VIRTIO_DEV_STATS_ENABLED))
64 		return;
65 
66 	for (i = 0; i < count; i++) {
67 		struct rte_ether_addr *ea;
68 		struct rte_mbuf *pkt = pkts[i];
69 		uint32_t pkt_len = rte_pktmbuf_pkt_len(pkt);
70 
71 		stats->packets++;
72 		stats->bytes += pkt_len;
73 
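		/*
		 * Packet size histogram: bin 0 is < 64B, bin 1 exactly 64B,
		 * bins 2-5 cover 65-127, 128-255, 256-511 and 512-1023 bytes,
		 * bin 6 is 1024-1518 bytes and bin 7 is 1519 bytes and above.
		 */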
74 		if (pkt_len == 64) {
75 			stats->size_bins[1]++;
76 		} else if (pkt_len > 64 && pkt_len < 1024) {
77 			uint32_t bin;
78 
79 			/* count leading zeros to offset into the correct bin */
80 			bin = (sizeof(pkt_len) * 8) - rte_clz32(pkt_len) - 5;
81 			stats->size_bins[bin]++;
82 		} else {
83 			if (pkt_len < 64)
84 				stats->size_bins[0]++;
85 			else if (pkt_len < 1519)
86 				stats->size_bins[6]++;
87 			else
88 				stats->size_bins[7]++;
89 		}
90 
91 		ea = rte_pktmbuf_mtod(pkt, struct rte_ether_addr *);
92 		if (rte_is_multicast_ether_addr(ea)) {
93 			if (rte_is_broadcast_ether_addr(ea))
94 				stats->broadcast++;
95 			else
96 				stats->multicast++;
97 		}
98 	}
99 }
100 
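/*
 * Enqueue one packet (one vhost_iov_iter, possibly several segments) on the
 * given DMA vchannel. Returns the number of copies issued, or -1 if the
 * vchannel lacks capacity or a copy submission fails. Only the slot of the
 * last copy records the packet's completion flag address.
 */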
101 static __rte_always_inline int64_t
102 vhost_async_dma_transfer_one(struct virtio_net *dev, struct vhost_virtqueue *vq,
103 		int16_t dma_id, uint16_t vchan_id, uint16_t flag_idx,
104 		struct vhost_iov_iter *pkt)
105 	__rte_shared_locks_required(&vq->access_lock)
106 {
107 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
108 	uint16_t ring_mask = dma_info->ring_mask;
109 	static bool vhost_async_dma_copy_log;
110 
111 
112 	struct vhost_iovec *iov = pkt->iov;
113 	int copy_idx = 0;
114 	uint32_t nr_segs = pkt->nr_segs;
115 	uint16_t i;
116 
117 	if (rte_dma_burst_capacity(dma_id, vchan_id) < nr_segs)
118 		return -1;
119 
120 	for (i = 0; i < nr_segs; i++) {
121 		copy_idx = rte_dma_copy(dma_id, vchan_id, (rte_iova_t)iov[i].src_addr,
122 				(rte_iova_t)iov[i].dst_addr, iov[i].len, RTE_DMA_OP_FLAG_LLC);
123 		/**
124 		 * Since all memory is pinned and the DMA vChannel
125 		 * ring has enough space, failures should be rare.
126 		 * If one does happen, the DMA device has hit a
127 		 * serious error; in that case, stop the async
128 		 * data path and check what has happened to the
129 		 * DMA device.
130 		 */
131 		if (unlikely(copy_idx < 0)) {
132 			if (!vhost_async_dma_copy_log) {
133 				VHOST_LOG_DATA(dev->ifname, ERR,
134 					"DMA copy failed for channel %d:%u\n",
135 					dma_id, vchan_id);
136 				vhost_async_dma_copy_log = true;
137 			}
138 			return -1;
139 		}
140 	}
141 
142 	/**
143 	 * Only store the packet completion flag address in the last copy's
144 	 * slot; the other slots are left NULL.
145 	 */
146 	dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask] = &vq->async->pkts_cmpl_flag[flag_idx];
147 
148 	return nr_segs;
149 }
150 
151 static __rte_always_inline uint16_t
152 vhost_async_dma_transfer(struct virtio_net *dev, struct vhost_virtqueue *vq,
153 		int16_t dma_id, uint16_t vchan_id, uint16_t head_idx,
154 		struct vhost_iov_iter *pkts, uint16_t nr_pkts)
155 	__rte_shared_locks_required(&vq->access_lock)
156 {
157 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
158 	int64_t ret, nr_copies = 0;
159 	uint16_t pkt_idx;
160 
161 	rte_spinlock_lock(&dma_info->dma_lock);
162 
163 	for (pkt_idx = 0; pkt_idx < nr_pkts; pkt_idx++) {
164 		ret = vhost_async_dma_transfer_one(dev, vq, dma_id, vchan_id, head_idx,
165 				&pkts[pkt_idx]);
166 		if (unlikely(ret < 0))
167 			break;
168 
169 		nr_copies += ret;
170 		head_idx++;
171 		if (head_idx >= vq->size)
172 			head_idx -= vq->size;
173 	}
174 
175 	if (likely(nr_copies > 0))
176 		rte_dma_submit(dma_id, vchan_id);
177 
178 	rte_spinlock_unlock(&dma_info->dma_lock);
179 
180 	return pkt_idx;
181 }
182 
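/*
 * Poll completed copies on the given DMA vchannel and mark the completion
 * flag of every packet whose last copy has finished. Returns the number of
 * completed copies (not packets).
 */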
183 static __rte_always_inline uint16_t
184 vhost_async_dma_check_completed(struct virtio_net *dev, int16_t dma_id, uint16_t vchan_id,
185 		uint16_t max_pkts)
186 {
187 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
188 	uint16_t ring_mask = dma_info->ring_mask;
189 	uint16_t last_idx = 0;
190 	uint16_t nr_copies;
191 	uint16_t copy_idx;
192 	uint16_t i;
193 	bool has_error = false;
194 	static bool vhost_async_dma_complete_log;
195 
196 	rte_spinlock_lock(&dma_info->dma_lock);
197 
198 	/**
199 	 * Print an error log for debugging if the DMA device reports an
200 	 * error during the transfer. Errors are not handled at the vhost level.
201 	 */
202 	nr_copies = rte_dma_completed(dma_id, vchan_id, max_pkts, &last_idx, &has_error);
203 	if (unlikely(!vhost_async_dma_complete_log && has_error)) {
204 		VHOST_LOG_DATA(dev->ifname, ERR,
205 			"DMA completion failure on channel %d:%u\n",
206 			dma_id, vchan_id);
207 		vhost_async_dma_complete_log = true;
208 	} else if (nr_copies == 0) {
209 		goto out;
210 	}
211 
212 	copy_idx = last_idx - nr_copies + 1;
213 	for (i = 0; i < nr_copies; i++) {
214 		bool *flag;
215 
216 		flag = dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask];
217 		if (flag) {
218 			/**
219 			 * Mark the packet flag as received. The flag
220 			 * could belong to another virtqueue, but the
221 			 * write is atomic.
222 			 */
223 			*flag = true;
224 			dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask] = NULL;
225 		}
226 		copy_idx++;
227 	}
228 
229 out:
230 	rte_spinlock_unlock(&dma_info->dma_lock);
231 	return nr_copies;
232 }
233 
234 static inline void
235 do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
236 	__rte_shared_locks_required(&vq->iotlb_lock)
237 {
238 	struct batch_copy_elem *elem = vq->batch_copy_elems;
239 	uint16_t count = vq->batch_copy_nb_elems;
240 	int i;
241 
242 	for (i = 0; i < count; i++) {
243 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
244 		vhost_log_cache_write_iova(dev, vq, elem[i].log_addr,
245 					   elem[i].len);
246 		PRINT_PACKET(dev, (uintptr_t)elem[i].dst, elem[i].len, 0);
247 	}
248 
249 	vq->batch_copy_nb_elems = 0;
250 }
251 
252 static inline void
253 do_data_copy_dequeue(struct vhost_virtqueue *vq)
254 {
255 	struct batch_copy_elem *elem = vq->batch_copy_elems;
256 	uint16_t count = vq->batch_copy_nb_elems;
257 	int i;
258 
259 	for (i = 0; i < count; i++)
260 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
261 
262 	vq->batch_copy_nb_elems = 0;
263 }
264 
265 static __rte_always_inline void
266 do_flush_shadow_used_ring_split(struct virtio_net *dev,
267 			struct vhost_virtqueue *vq,
268 			uint16_t to, uint16_t from, uint16_t size)
269 {
270 	rte_memcpy(&vq->used->ring[to],
271 			&vq->shadow_used_split[from],
272 			size * sizeof(struct vring_used_elem));
273 	vhost_log_cache_used_vring(dev, vq,
274 			offsetof(struct vring_used, ring[to]),
275 			size * sizeof(struct vring_used_elem));
276 }
277 
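/*
 * Flush the shadow used entries to the split used ring, wrapping around the
 * end of the ring if needed, then publish the new used index with release
 * semantics and log the writes for live migration.
 */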
278 static __rte_always_inline void
279 flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
280 {
281 	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
282 
283 	if (used_idx + vq->shadow_used_idx <= vq->size) {
284 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
285 					  vq->shadow_used_idx);
286 	} else {
287 		uint16_t size;
288 
289 		/* update the used ring interval [used_idx, vq->size) */
290 		size = vq->size - used_idx;
291 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
292 
293 		/* update the remaining used ring interval [0, left_size) */
294 		do_flush_shadow_used_ring_split(dev, vq, 0, size,
295 					  vq->shadow_used_idx - size);
296 	}
297 	vq->last_used_idx += vq->shadow_used_idx;
298 
299 	vhost_log_cache_sync(dev, vq);
300 
301 	__atomic_fetch_add(&vq->used->idx, vq->shadow_used_idx,
302 			   __ATOMIC_RELEASE);
303 	vq->shadow_used_idx = 0;
304 	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
305 		sizeof(vq->used->idx));
306 }
307 
308 static __rte_always_inline void
309 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
310 			 uint16_t desc_idx, uint32_t len)
311 {
312 	uint16_t i = vq->shadow_used_idx++;
313 
314 	vq->shadow_used_split[i].id  = desc_idx;
315 	vq->shadow_used_split[i].len = len;
316 }
317 
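/*
 * Flush the shadow used entries to the packed ring: descriptor ids and
 * lengths are written first, then, after a release fence, the flags; the
 * head descriptor's flags are written last so the whole batch becomes
 * visible to the guest at once.
 */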
318 static __rte_always_inline void
319 vhost_flush_enqueue_shadow_packed(struct virtio_net *dev,
320 				  struct vhost_virtqueue *vq)
321 {
322 	int i;
323 	uint16_t used_idx = vq->last_used_idx;
324 	uint16_t head_idx = vq->last_used_idx;
325 	uint16_t head_flags = 0;
326 
327 	/* Split the loop in two to save memory barriers */
328 	for (i = 0; i < vq->shadow_used_idx; i++) {
329 		vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
330 		vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
331 
332 		used_idx += vq->shadow_used_packed[i].count;
333 		if (used_idx >= vq->size)
334 			used_idx -= vq->size;
335 	}
336 
337 	/* The ordering for storing desc flags needs to be enforced. */
338 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
339 
340 	for (i = 0; i < vq->shadow_used_idx; i++) {
341 		uint16_t flags;
342 
343 		if (vq->shadow_used_packed[i].len)
344 			flags = VRING_DESC_F_WRITE;
345 		else
346 			flags = 0;
347 
348 		if (vq->used_wrap_counter) {
349 			flags |= VRING_DESC_F_USED;
350 			flags |= VRING_DESC_F_AVAIL;
351 		} else {
352 			flags &= ~VRING_DESC_F_USED;
353 			flags &= ~VRING_DESC_F_AVAIL;
354 		}
355 
356 		if (i > 0) {
357 			vq->desc_packed[vq->last_used_idx].flags = flags;
358 
359 			vhost_log_cache_used_vring(dev, vq,
360 					vq->last_used_idx *
361 					sizeof(struct vring_packed_desc),
362 					sizeof(struct vring_packed_desc));
363 		} else {
364 			head_idx = vq->last_used_idx;
365 			head_flags = flags;
366 		}
367 
368 		vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
369 	}
370 
371 	vq->desc_packed[head_idx].flags = head_flags;
372 
373 	vhost_log_cache_used_vring(dev, vq,
374 				head_idx *
375 				sizeof(struct vring_packed_desc),
376 				sizeof(struct vring_packed_desc));
377 
378 	vq->shadow_used_idx = 0;
379 	vhost_log_cache_sync(dev, vq);
380 }
381 
382 static __rte_always_inline void
383 vhost_flush_dequeue_shadow_packed(struct virtio_net *dev,
384 				  struct vhost_virtqueue *vq)
385 {
386 	struct vring_used_elem_packed *used_elem = &vq->shadow_used_packed[0];
387 
388 	vq->desc_packed[vq->shadow_last_used_idx].id = used_elem->id;
389 	/* desc flags are the synchronization point for the virtio packed vring */
390 	__atomic_store_n(&vq->desc_packed[vq->shadow_last_used_idx].flags,
391 			 used_elem->flags, __ATOMIC_RELEASE);
392 
393 	vhost_log_cache_used_vring(dev, vq, vq->shadow_last_used_idx *
394 				   sizeof(struct vring_packed_desc),
395 				   sizeof(struct vring_packed_desc));
396 	vq->shadow_used_idx = 0;
397 	vhost_log_cache_sync(dev, vq);
398 }
399 
400 static __rte_always_inline void
401 vhost_flush_enqueue_batch_packed(struct virtio_net *dev,
402 				 struct vhost_virtqueue *vq,
403 				 uint64_t *lens,
404 				 uint16_t *ids)
405 {
406 	uint16_t i;
407 	uint16_t flags;
408 	uint16_t last_used_idx;
409 	struct vring_packed_desc *desc_base;
410 
411 	last_used_idx = vq->last_used_idx;
412 	desc_base = &vq->desc_packed[last_used_idx];
413 
414 	flags = PACKED_DESC_ENQUEUE_USED_FLAG(vq->used_wrap_counter);
415 
416 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
417 		desc_base[i].id = ids[i];
418 		desc_base[i].len = lens[i];
419 	}
420 
421 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
422 
423 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
424 		desc_base[i].flags = flags;
425 	}
426 
427 	vhost_log_cache_used_vring(dev, vq, last_used_idx *
428 				   sizeof(struct vring_packed_desc),
429 				   sizeof(struct vring_packed_desc) *
430 				   PACKED_BATCH_SIZE);
431 	vhost_log_cache_sync(dev, vq);
432 
433 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
434 }
435 
436 static __rte_always_inline void
437 vhost_async_shadow_enqueue_packed_batch(struct vhost_virtqueue *vq,
438 				 uint64_t *lens,
439 				 uint16_t *ids)
440 	__rte_exclusive_locks_required(&vq->access_lock)
441 {
442 	uint16_t i;
443 	struct vhost_async *async = vq->async;
444 
445 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
446 		async->buffers_packed[async->buffer_idx_packed].id  = ids[i];
447 		async->buffers_packed[async->buffer_idx_packed].len = lens[i];
448 		async->buffers_packed[async->buffer_idx_packed].count = 1;
449 		async->buffer_idx_packed++;
450 		if (async->buffer_idx_packed >= vq->size)
451 			async->buffer_idx_packed -= vq->size;
452 	}
453 }
454 
455 static __rte_always_inline void
456 vhost_async_shadow_dequeue_packed_batch(struct vhost_virtqueue *vq, uint16_t *ids)
457 	__rte_shared_locks_required(&vq->access_lock)
458 {
459 	uint16_t i;
460 	struct vhost_async *async = vq->async;
461 
462 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
463 		async->buffers_packed[async->buffer_idx_packed].id  = ids[i];
464 		async->buffers_packed[async->buffer_idx_packed].len = 0;
465 		async->buffers_packed[async->buffer_idx_packed].count = 1;
466 
467 		async->buffer_idx_packed++;
468 		if (async->buffer_idx_packed >= vq->size)
469 			async->buffer_idx_packed -= vq->size;
470 	}
471 }
472 
473 static __rte_always_inline void
474 vhost_shadow_dequeue_batch_packed_inorder(struct vhost_virtqueue *vq,
475 					  uint16_t id)
476 {
477 	vq->shadow_used_packed[0].id = id;
478 
479 	if (!vq->shadow_used_idx) {
480 		vq->shadow_last_used_idx = vq->last_used_idx;
481 		vq->shadow_used_packed[0].flags =
482 			PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
483 		vq->shadow_used_packed[0].len = 0;
484 		vq->shadow_used_packed[0].count = 1;
485 		vq->shadow_used_idx++;
486 	}
487 
488 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
489 }
490 
491 static __rte_always_inline void
492 vhost_shadow_dequeue_batch_packed(struct virtio_net *dev,
493 				  struct vhost_virtqueue *vq,
494 				  uint16_t *ids)
495 {
496 	uint16_t flags;
497 	uint16_t i;
498 	uint16_t begin;
499 
500 	flags = PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
501 
502 	if (!vq->shadow_used_idx) {
503 		vq->shadow_last_used_idx = vq->last_used_idx;
504 		vq->shadow_used_packed[0].id  = ids[0];
505 		vq->shadow_used_packed[0].len = 0;
506 		vq->shadow_used_packed[0].count = 1;
507 		vq->shadow_used_packed[0].flags = flags;
508 		vq->shadow_used_idx++;
509 		begin = 1;
510 	} else
511 		begin = 0;
512 
513 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE) {
514 		vq->desc_packed[vq->last_used_idx + i].id = ids[i];
515 		vq->desc_packed[vq->last_used_idx + i].len = 0;
516 	}
517 
518 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
519 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE)
520 		vq->desc_packed[vq->last_used_idx + i].flags = flags;
521 
522 	vhost_log_cache_used_vring(dev, vq, vq->last_used_idx *
523 				   sizeof(struct vring_packed_desc),
524 				   sizeof(struct vring_packed_desc) *
525 				   PACKED_BATCH_SIZE);
526 	vhost_log_cache_sync(dev, vq);
527 
528 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
529 }
530 
531 static __rte_always_inline void
532 vhost_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
533 				   uint16_t buf_id,
534 				   uint16_t count)
535 {
536 	uint16_t flags;
537 
538 	flags = vq->desc_packed[vq->last_used_idx].flags;
539 	if (vq->used_wrap_counter) {
540 		flags |= VRING_DESC_F_USED;
541 		flags |= VRING_DESC_F_AVAIL;
542 	} else {
543 		flags &= ~VRING_DESC_F_USED;
544 		flags &= ~VRING_DESC_F_AVAIL;
545 	}
546 
547 	if (!vq->shadow_used_idx) {
548 		vq->shadow_last_used_idx = vq->last_used_idx;
549 
550 		vq->shadow_used_packed[0].id  = buf_id;
551 		vq->shadow_used_packed[0].len = 0;
552 		vq->shadow_used_packed[0].flags = flags;
553 		vq->shadow_used_idx++;
554 	} else {
555 		vq->desc_packed[vq->last_used_idx].id = buf_id;
556 		vq->desc_packed[vq->last_used_idx].len = 0;
557 		vq->desc_packed[vq->last_used_idx].flags = flags;
558 	}
559 
560 	vq_inc_last_used_packed(vq, count);
561 }
562 
563 static __rte_always_inline void
564 vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
565 					   uint16_t buf_id,
566 					   uint16_t count)
567 {
568 	uint16_t flags;
569 
570 	vq->shadow_used_packed[0].id = buf_id;
571 
572 	flags = vq->desc_packed[vq->last_used_idx].flags;
573 	if (vq->used_wrap_counter) {
574 		flags |= VRING_DESC_F_USED;
575 		flags |= VRING_DESC_F_AVAIL;
576 	} else {
577 		flags &= ~VRING_DESC_F_USED;
578 		flags &= ~VRING_DESC_F_AVAIL;
579 	}
580 
581 	if (!vq->shadow_used_idx) {
582 		vq->shadow_last_used_idx = vq->last_used_idx;
583 		vq->shadow_used_packed[0].len = 0;
584 		vq->shadow_used_packed[0].flags = flags;
585 		vq->shadow_used_idx++;
586 	}
587 
588 	vq_inc_last_used_packed(vq, count);
589 }
590 
591 static __rte_always_inline void
592 vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
593 				   uint32_t *len,
594 				   uint16_t *id,
595 				   uint16_t *count,
596 				   uint16_t num_buffers)
597 {
598 	uint16_t i;
599 
600 	for (i = 0; i < num_buffers; i++) {
601 		/* track alignment so shadow flushes line up with the batch size */
602 		if (!vq->shadow_used_idx)
603 			vq->shadow_aligned_idx = vq->last_used_idx &
604 				PACKED_BATCH_MASK;
605 		vq->shadow_used_packed[vq->shadow_used_idx].id  = id[i];
606 		vq->shadow_used_packed[vq->shadow_used_idx].len = len[i];
607 		vq->shadow_used_packed[vq->shadow_used_idx].count = count[i];
608 		vq->shadow_aligned_idx += count[i];
609 		vq->shadow_used_idx++;
610 	}
611 }
612 
613 static __rte_always_inline void
614 vhost_async_shadow_enqueue_packed(struct vhost_virtqueue *vq,
615 				   uint32_t *len,
616 				   uint16_t *id,
617 				   uint16_t *count,
618 				   uint16_t num_buffers)
619 	__rte_exclusive_locks_required(&vq->access_lock)
620 {
621 	uint16_t i;
622 	struct vhost_async *async = vq->async;
623 
624 	for (i = 0; i < num_buffers; i++) {
625 		async->buffers_packed[async->buffer_idx_packed].id  = id[i];
626 		async->buffers_packed[async->buffer_idx_packed].len = len[i];
627 		async->buffers_packed[async->buffer_idx_packed].count = count[i];
628 		async->buffer_idx_packed++;
629 		if (async->buffer_idx_packed >= vq->size)
630 			async->buffer_idx_packed -= vq->size;
631 	}
632 }
633 
634 static __rte_always_inline void
635 vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
636 				   struct vhost_virtqueue *vq,
637 				   uint32_t *len,
638 				   uint16_t *id,
639 				   uint16_t *count,
640 				   uint16_t num_buffers)
641 	__rte_shared_locks_required(&vq->iotlb_lock)
642 {
643 	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
644 
645 	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
646 		do_data_copy_enqueue(dev, vq);
647 		vhost_flush_enqueue_shadow_packed(dev, vq);
648 	}
649 }
650 
651 /* skip the write when the value is already equal, to lessen cache issues */
652 #define ASSIGN_UNLESS_EQUAL(var, val) do {	\
653 	if ((var) != (val))			\
654 		(var) = (val);			\
655 } while (0)
656 
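/*
 * Translate the mbuf Tx offload flags into the virtio net header: request
 * checksum completion for L4 offloads, compute the IPv4 header checksum in
 * software, and fill in the GSO fields for TSO/UFO packets.
 */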
657 static __rte_always_inline void
658 virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
659 {
660 	uint64_t csum_l4 = m_buf->ol_flags & RTE_MBUF_F_TX_L4_MASK;
661 
662 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG)
663 		csum_l4 |= RTE_MBUF_F_TX_TCP_CKSUM;
664 
665 	if (csum_l4) {
666 		/*
667 		 * Pseudo-header checksum must be set as per Virtio spec.
668 		 *
669 		 * Note: We don't propagate rte_net_intel_cksum_prepare()
670 		 * errors, as it would have an impact on performance, and an
671 		 * error would mean the packet is dropped by the guest instead
672 		 * of being dropped here.
673 		 */
674 		rte_net_intel_cksum_prepare(m_buf);
675 
676 		net_hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
677 		net_hdr->csum_start = m_buf->l2_len + m_buf->l3_len;
678 
679 		switch (csum_l4) {
680 		case RTE_MBUF_F_TX_TCP_CKSUM:
681 			net_hdr->csum_offset = (offsetof(struct rte_tcp_hdr,
682 						cksum));
683 			break;
684 		case RTE_MBUF_F_TX_UDP_CKSUM:
685 			net_hdr->csum_offset = (offsetof(struct rte_udp_hdr,
686 						dgram_cksum));
687 			break;
688 		case RTE_MBUF_F_TX_SCTP_CKSUM:
689 			net_hdr->csum_offset = (offsetof(struct rte_sctp_hdr,
690 						cksum));
691 			break;
692 		}
693 	} else {
694 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_start, 0);
695 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_offset, 0);
696 		ASSIGN_UNLESS_EQUAL(net_hdr->flags, 0);
697 	}
698 
699 	/* IP checksum offload cannot be passed to the guest, so calculate it here */
700 	if (m_buf->ol_flags & RTE_MBUF_F_TX_IP_CKSUM) {
701 		struct rte_ipv4_hdr *ipv4_hdr;
702 
703 		ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
704 						   m_buf->l2_len);
705 		ipv4_hdr->hdr_checksum = 0;
706 		ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
707 	}
708 
709 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG) {
710 		if (m_buf->ol_flags & RTE_MBUF_F_TX_IPV4)
711 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
712 		else
713 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
714 		net_hdr->gso_size = m_buf->tso_segsz;
715 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
716 					+ m_buf->l4_len;
717 	} else if (m_buf->ol_flags & RTE_MBUF_F_TX_UDP_SEG) {
718 		net_hdr->gso_type = VIRTIO_NET_HDR_GSO_UDP;
719 		net_hdr->gso_size = m_buf->tso_segsz;
720 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len +
721 			m_buf->l4_len;
722 	} else {
723 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0);
724 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_size, 0);
725 		ASSIGN_UNLESS_EQUAL(net_hdr->hdr_len, 0);
726 	}
727 }
728 
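/*
 * Translate one guest descriptor (IOVA + length) into buf_vec entries, one
 * per host-virtually contiguous chunk returned by the IOTLB translation.
 */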
729 static __rte_always_inline int
730 map_one_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
731 		struct buf_vector *buf_vec, uint16_t *vec_idx,
732 		uint64_t desc_iova, uint64_t desc_len, uint8_t perm)
733 	__rte_shared_locks_required(&vq->iotlb_lock)
734 {
735 	uint16_t vec_id = *vec_idx;
736 
737 	while (desc_len) {
738 		uint64_t desc_addr;
739 		uint64_t desc_chunck_len = desc_len;
740 
741 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
742 			return -1;
743 
744 		desc_addr = vhost_iova_to_vva(dev, vq,
745 				desc_iova,
746 				&desc_chunck_len,
747 				perm);
748 		if (unlikely(!desc_addr))
749 			return -1;
750 
751 		rte_prefetch0((void *)(uintptr_t)desc_addr);
752 
753 		buf_vec[vec_id].buf_iova = desc_iova;
754 		buf_vec[vec_id].buf_addr = desc_addr;
755 		buf_vec[vec_id].buf_len  = desc_chunck_len;
756 
757 		desc_len -= desc_chunck_len;
758 		desc_iova += desc_chunck_len;
759 		vec_id++;
760 	}
761 	*vec_idx = vec_id;
762 
763 	return 0;
764 }
765 
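/*
 * Walk one descriptor chain of the split ring (following an indirect table
 * if present) and translate it into buf_vec entries, returning the chain
 * head index and the total chain length.
 */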
766 static __rte_always_inline int
767 fill_vec_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
768 			 uint32_t avail_idx, uint16_t *vec_idx,
769 			 struct buf_vector *buf_vec, uint16_t *desc_chain_head,
770 			 uint32_t *desc_chain_len, uint8_t perm)
771 	__rte_shared_locks_required(&vq->iotlb_lock)
772 {
773 	uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
774 	uint16_t vec_id = *vec_idx;
775 	uint32_t len    = 0;
776 	uint64_t dlen;
777 	uint32_t nr_descs = vq->size;
778 	uint32_t cnt    = 0;
779 	struct vring_desc *descs = vq->desc;
780 	struct vring_desc *idesc = NULL;
781 
782 	if (unlikely(idx >= vq->size))
783 		return -1;
784 
785 	*desc_chain_head = idx;
786 
787 	if (vq->desc[idx].flags & VRING_DESC_F_INDIRECT) {
788 		dlen = vq->desc[idx].len;
789 		nr_descs = dlen / sizeof(struct vring_desc);
790 		if (unlikely(nr_descs > vq->size))
791 			return -1;
792 
793 		descs = (struct vring_desc *)(uintptr_t)
794 			vhost_iova_to_vva(dev, vq, vq->desc[idx].addr,
795 						&dlen,
796 						VHOST_ACCESS_RO);
797 		if (unlikely(!descs))
798 			return -1;
799 
800 		if (unlikely(dlen < vq->desc[idx].len)) {
801 			/*
802 			 * The indirect desc table is not contiguous
803 			 * in process VA space, so we have to copy it.
804 			 */
805 			idesc = vhost_alloc_copy_ind_table(dev, vq,
806 					vq->desc[idx].addr, vq->desc[idx].len);
807 			if (unlikely(!idesc))
808 				return -1;
809 
810 			descs = idesc;
811 		}
812 
813 		idx = 0;
814 	}
815 
816 	while (1) {
817 		if (unlikely(idx >= nr_descs || cnt++ >= nr_descs)) {
818 			free_ind_table(idesc);
819 			return -1;
820 		}
821 
822 		dlen = descs[idx].len;
823 		len += dlen;
824 
825 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
826 						descs[idx].addr, dlen,
827 						perm))) {
828 			free_ind_table(idesc);
829 			return -1;
830 		}
831 
832 		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
833 			break;
834 
835 		idx = descs[idx].next;
836 	}
837 
838 	*desc_chain_len = len;
839 	*vec_idx = vec_id;
840 
841 	if (unlikely(!!idesc))
842 		free_ind_table(idesc);
843 
844 	return 0;
845 }
846 
847 /*
848  * Returns -1 on failure, 0 on success.
849  */
850 static inline int
851 reserve_avail_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
852 				uint64_t size, struct buf_vector *buf_vec,
853 				uint16_t *num_buffers, uint16_t avail_head,
854 				uint16_t *nr_vec)
855 	__rte_shared_locks_required(&vq->iotlb_lock)
856 {
857 	uint16_t cur_idx;
858 	uint16_t vec_idx = 0;
859 	uint16_t max_tries, tries = 0;
860 
861 	uint16_t head_idx = 0;
862 	uint32_t len = 0;
863 
864 	*num_buffers = 0;
865 	cur_idx  = vq->last_avail_idx;
866 
867 	if (rxvq_is_mergeable(dev))
868 		max_tries = vq->size - 1;
869 	else
870 		max_tries = 1;
871 
872 	while (size > 0) {
873 		if (unlikely(cur_idx == avail_head))
874 			return -1;
875 		/*
876 		 * If we have tried all available ring items and still
877 		 * can't get enough buffers, something abnormal has
878 		 * happened.
879 		 */
880 		if (unlikely(++tries > max_tries))
881 			return -1;
882 
883 		if (unlikely(fill_vec_buf_split(dev, vq, cur_idx,
884 						&vec_idx, buf_vec,
885 						&head_idx, &len,
886 						VHOST_ACCESS_RW) < 0))
887 			return -1;
888 		len = RTE_MIN(len, size);
889 		update_shadow_used_ring_split(vq, head_idx, len);
890 		size -= len;
891 
892 		cur_idx++;
893 		*num_buffers += 1;
894 	}
895 
896 	*nr_vec = vec_idx;
897 
898 	return 0;
899 }
900 
901 static __rte_always_inline int
902 fill_vec_buf_packed_indirect(struct virtio_net *dev,
903 			struct vhost_virtqueue *vq,
904 			struct vring_packed_desc *desc, uint16_t *vec_idx,
905 			struct buf_vector *buf_vec, uint32_t *len, uint8_t perm)
906 	__rte_shared_locks_required(&vq->iotlb_lock)
907 {
908 	uint16_t i;
909 	uint32_t nr_descs;
910 	uint16_t vec_id = *vec_idx;
911 	uint64_t dlen;
912 	struct vring_packed_desc *descs, *idescs = NULL;
913 
914 	dlen = desc->len;
915 	descs = (struct vring_packed_desc *)(uintptr_t)
916 		vhost_iova_to_vva(dev, vq, desc->addr, &dlen, VHOST_ACCESS_RO);
917 	if (unlikely(!descs))
918 		return -1;
919 
920 	if (unlikely(dlen < desc->len)) {
921 		/*
922 		 * The indirect desc table is not contiguous
923 		 * in process VA space, so we have to copy it.
924 		 */
925 		idescs = vhost_alloc_copy_ind_table(dev,
926 				vq, desc->addr, desc->len);
927 		if (unlikely(!idescs))
928 			return -1;
929 
930 		descs = idescs;
931 	}
932 
933 	nr_descs = desc->len / sizeof(struct vring_packed_desc);
934 	if (unlikely(nr_descs >= vq->size)) {
935 		free_ind_table(idescs);
936 		return -1;
937 	}
938 
939 	for (i = 0; i < nr_descs; i++) {
940 		if (unlikely(vec_id >= BUF_VECTOR_MAX)) {
941 			free_ind_table(idescs);
942 			return -1;
943 		}
944 
945 		dlen = descs[i].len;
946 		*len += dlen;
947 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
948 						descs[i].addr, dlen,
949 						perm)))
950 			return -1;
951 	}
952 	*vec_idx = vec_id;
953 
954 	if (unlikely(!!idescs))
955 		free_ind_table(idescs);
956 
957 	return 0;
958 }
959 
960 static __rte_always_inline int
961 fill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
962 				uint16_t avail_idx, uint16_t *desc_count,
963 				struct buf_vector *buf_vec, uint16_t *vec_idx,
964 				uint16_t *buf_id, uint32_t *len, uint8_t perm)
965 	__rte_shared_locks_required(&vq->iotlb_lock)
966 {
967 	bool wrap_counter = vq->avail_wrap_counter;
968 	struct vring_packed_desc *descs = vq->desc_packed;
969 	uint16_t vec_id = *vec_idx;
970 	uint64_t dlen;
971 
972 	if (avail_idx < vq->last_avail_idx)
973 		wrap_counter ^= 1;
974 
975 	/*
976 	 * Perform a load-acquire barrier in desc_is_avail to
977 	 * enforce the ordering between desc flags and desc
978 	 * content.
979 	 */
980 	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)))
981 		return -1;
982 
983 	*desc_count = 0;
984 	*len = 0;
985 
986 	while (1) {
987 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
988 			return -1;
989 
990 		if (unlikely(*desc_count >= vq->size))
991 			return -1;
992 
993 		*desc_count += 1;
994 		*buf_id = descs[avail_idx].id;
995 
996 		if (descs[avail_idx].flags & VRING_DESC_F_INDIRECT) {
997 			if (unlikely(fill_vec_buf_packed_indirect(dev, vq,
998 							&descs[avail_idx],
999 							&vec_id, buf_vec,
1000 							len, perm) < 0))
1001 				return -1;
1002 		} else {
1003 			dlen = descs[avail_idx].len;
1004 			*len += dlen;
1005 
1006 			if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
1007 							descs[avail_idx].addr,
1008 							dlen,
1009 							perm)))
1010 				return -1;
1011 		}
1012 
1013 		if ((descs[avail_idx].flags & VRING_DESC_F_NEXT) == 0)
1014 			break;
1015 
1016 		if (++avail_idx >= vq->size) {
1017 			avail_idx -= vq->size;
1018 			wrap_counter ^= 1;
1019 		}
1020 	}
1021 
1022 	*vec_idx = vec_id;
1023 
1024 	return 0;
1025 }
1026 
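/*
 * Slow path used when the first descriptor buffer is smaller than the
 * virtio net header: copy the header piecewise across consecutive buffers.
 */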
1027 static __rte_noinline void
1028 copy_vnet_hdr_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
1029 		struct buf_vector *buf_vec,
1030 		struct virtio_net_hdr_mrg_rxbuf *hdr)
1031 	__rte_shared_locks_required(&vq->iotlb_lock)
1032 {
1033 	uint64_t len;
1034 	uint64_t remain = dev->vhost_hlen;
1035 	uint64_t src = (uint64_t)(uintptr_t)hdr, dst;
1036 	uint64_t iova = buf_vec->buf_iova;
1037 
1038 	while (remain) {
1039 		len = RTE_MIN(remain,
1040 				buf_vec->buf_len);
1041 		dst = buf_vec->buf_addr;
1042 		rte_memcpy((void *)(uintptr_t)dst,
1043 				(void *)(uintptr_t)src,
1044 				len);
1045 
1046 		PRINT_PACKET(dev, (uintptr_t)dst,
1047 				(uint32_t)len, 0);
1048 		vhost_log_cache_write_iova(dev, vq,
1049 				iova, len);
1050 
1051 		remain -= len;
1052 		iova += len;
1053 		src += len;
1054 		buf_vec++;
1055 	}
1056 }
1057 
1058 static __rte_always_inline int
1059 async_iter_initialize(struct virtio_net *dev, struct vhost_async *async)
1060 {
1061 	struct vhost_iov_iter *iter;
1062 
1063 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
1064 		VHOST_LOG_DATA(dev->ifname, ERR, "no more async iovec available\n");
1065 		return -1;
1066 	}
1067 
1068 	iter = async->iov_iter + async->iter_idx;
1069 	iter->iov = async->iovec + async->iovec_idx;
1070 	iter->nr_segs = 0;
1071 
1072 	return 0;
1073 }
1074 
1075 static __rte_always_inline int
1076 async_iter_add_iovec(struct virtio_net *dev, struct vhost_async *async,
1077 		void *src, void *dst, size_t len)
1078 {
1079 	struct vhost_iov_iter *iter;
1080 	struct vhost_iovec *iovec;
1081 
1082 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
1083 		static bool vhost_max_async_vec_log;
1084 
1085 		if (!vhost_max_async_vec_log) {
1086 			VHOST_LOG_DATA(dev->ifname, ERR, "no more async iovec available\n");
1087 			vhost_max_async_vec_log = true;
1088 		}
1089 
1090 		return -1;
1091 	}
1092 
1093 	iter = async->iov_iter + async->iter_idx;
1094 	iovec = async->iovec + async->iovec_idx;
1095 
1096 	iovec->src_addr = src;
1097 	iovec->dst_addr = dst;
1098 	iovec->len = len;
1099 
1100 	iter->nr_segs++;
1101 	async->iovec_idx++;
1102 
1103 	return 0;
1104 }
1105 
1106 static __rte_always_inline void
1107 async_iter_finalize(struct vhost_async *async)
1108 {
1109 	async->iter_idx++;
1110 }
1111 
1112 static __rte_always_inline void
1113 async_iter_cancel(struct vhost_async *async)
1114 {
1115 	struct vhost_iov_iter *iter;
1116 
1117 	iter = async->iov_iter + async->iter_idx;
1118 	async->iovec_idx -= iter->nr_segs;
1119 	iter->nr_segs = 0;
1120 	iter->iov = NULL;
1121 }
1122 
1123 static __rte_always_inline void
1124 async_iter_reset(struct vhost_async *async)
1125 {
1126 	async->iter_idx = 0;
1127 	async->iovec_idx = 0;
1128 }
1129 
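/*
 * Record the async copy job for one contiguous guest buffer region: split
 * it into host-physically contiguous chunks and append one iovec per chunk,
 * in the direction selected by to_desc (mbuf to descriptor or the reverse).
 */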
1130 static __rte_always_inline int
1131 async_fill_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
1132 		struct rte_mbuf *m, uint32_t mbuf_offset,
1133 		uint64_t buf_iova, uint32_t cpy_len, bool to_desc)
1134 	__rte_shared_locks_required(&vq->access_lock)
1135 	__rte_shared_locks_required(&vq->iotlb_lock)
1136 {
1137 	struct vhost_async *async = vq->async;
1138 	uint64_t mapped_len;
1139 	uint32_t buf_offset = 0;
1140 	void *src, *dst;
1141 	void *host_iova;
1142 
1143 	while (cpy_len) {
1144 		host_iova = (void *)(uintptr_t)gpa_to_first_hpa(dev,
1145 				buf_iova + buf_offset, cpy_len, &mapped_len);
1146 		if (unlikely(!host_iova)) {
1147 			VHOST_LOG_DATA(dev->ifname, ERR,
1148 				"%s: failed to get host iova.\n",
1149 				__func__);
1150 			return -1;
1151 		}
1152 
1153 		if (to_desc) {
1154 			src = (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset);
1155 			dst = host_iova;
1156 		} else {
1157 			src = host_iova;
1158 			dst = (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset);
1159 		}
1160 
1161 		if (unlikely(async_iter_add_iovec(dev, async, src, dst, (size_t)mapped_len)))
1162 			return -1;
1163 
1164 		cpy_len -= (uint32_t)mapped_len;
1165 		mbuf_offset += (uint32_t)mapped_len;
1166 		buf_offset += (uint32_t)mapped_len;
1167 	}
1168 
1169 	return 0;
1170 }
1171 
1172 static __rte_always_inline void
1173 sync_fill_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
1174 		struct rte_mbuf *m, uint32_t mbuf_offset,
1175 		uint64_t buf_addr, uint64_t buf_iova, uint32_t cpy_len, bool to_desc)
1176 	__rte_shared_locks_required(&vq->iotlb_lock)
1177 {
1178 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
1179 
1180 	if (likely(cpy_len > MAX_BATCH_LEN || vq->batch_copy_nb_elems >= vq->size)) {
1181 		if (to_desc) {
1182 			rte_memcpy((void *)((uintptr_t)(buf_addr)),
1183 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
1184 				cpy_len);
1185 			vhost_log_cache_write_iova(dev, vq, buf_iova, cpy_len);
1186 			PRINT_PACKET(dev, (uintptr_t)(buf_addr), cpy_len, 0);
1187 		} else {
1188 			rte_memcpy(rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
1189 				(void *)((uintptr_t)(buf_addr)),
1190 				cpy_len);
1191 		}
1192 	} else {
1193 		if (to_desc) {
1194 			batch_copy[vq->batch_copy_nb_elems].dst =
1195 				(void *)((uintptr_t)(buf_addr));
1196 			batch_copy[vq->batch_copy_nb_elems].src =
1197 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
1198 			batch_copy[vq->batch_copy_nb_elems].log_addr = buf_iova;
1199 		} else {
1200 			batch_copy[vq->batch_copy_nb_elems].dst =
1201 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
1202 			batch_copy[vq->batch_copy_nb_elems].src =
1203 				(void *)((uintptr_t)(buf_addr));
1204 		}
1205 		batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
1206 		vq->batch_copy_nb_elems++;
1207 	}
1208 }
1209 
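/*
 * Copy one mbuf chain into the descriptor buffers described by buf_vec,
 * filling in the virtio net header first (spilled across buffers by
 * copy_vnet_hdr_to_desc() when the first buffer is too small). With
 * is_async, the data copies are recorded as iovecs for DMA instead of
 * being performed synchronously.
 */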
1210 static __rte_always_inline int
1211 mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
1212 		struct rte_mbuf *m, struct buf_vector *buf_vec,
1213 		uint16_t nr_vec, uint16_t num_buffers, bool is_async)
1214 	__rte_shared_locks_required(&vq->access_lock)
1215 	__rte_shared_locks_required(&vq->iotlb_lock)
1216 {
1217 	uint32_t vec_idx = 0;
1218 	uint32_t mbuf_offset, mbuf_avail;
1219 	uint32_t buf_offset, buf_avail;
1220 	uint64_t buf_addr, buf_iova, buf_len;
1221 	uint32_t cpy_len;
1222 	uint64_t hdr_addr;
1223 	struct rte_mbuf *hdr_mbuf;
1224 	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
1225 	struct vhost_async *async = vq->async;
1226 
1227 	if (unlikely(m == NULL))
1228 		return -1;
1229 
1230 	buf_addr = buf_vec[vec_idx].buf_addr;
1231 	buf_iova = buf_vec[vec_idx].buf_iova;
1232 	buf_len = buf_vec[vec_idx].buf_len;
1233 
1234 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
1235 		return -1;
1236 
1237 	hdr_mbuf = m;
1238 	hdr_addr = buf_addr;
1239 	if (unlikely(buf_len < dev->vhost_hlen)) {
1240 		memset(&tmp_hdr, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
1241 		hdr = &tmp_hdr;
1242 	} else
1243 		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
1244 
1245 	VHOST_LOG_DATA(dev->ifname, DEBUG, "RX: num merge buffers %d\n", num_buffers);
1246 
1247 	if (unlikely(buf_len < dev->vhost_hlen)) {
1248 		buf_offset = dev->vhost_hlen - buf_len;
1249 		vec_idx++;
1250 		buf_addr = buf_vec[vec_idx].buf_addr;
1251 		buf_iova = buf_vec[vec_idx].buf_iova;
1252 		buf_len = buf_vec[vec_idx].buf_len;
1253 		buf_avail = buf_len - buf_offset;
1254 	} else {
1255 		buf_offset = dev->vhost_hlen;
1256 		buf_avail = buf_len - dev->vhost_hlen;
1257 	}
1258 
1259 	mbuf_avail  = rte_pktmbuf_data_len(m);
1260 	mbuf_offset = 0;
1261 
1262 	if (is_async) {
1263 		if (async_iter_initialize(dev, async))
1264 			return -1;
1265 	}
1266 
1267 	while (mbuf_avail != 0 || m->next != NULL) {
1268 		/* done with current buf, get the next one */
1269 		if (buf_avail == 0) {
1270 			vec_idx++;
1271 			if (unlikely(vec_idx >= nr_vec))
1272 				goto error;
1273 
1274 			buf_addr = buf_vec[vec_idx].buf_addr;
1275 			buf_iova = buf_vec[vec_idx].buf_iova;
1276 			buf_len = buf_vec[vec_idx].buf_len;
1277 
1278 			buf_offset = 0;
1279 			buf_avail  = buf_len;
1280 		}
1281 
1282 		/* done with current mbuf, get the next one */
1283 		if (mbuf_avail == 0) {
1284 			m = m->next;
1285 
1286 			mbuf_offset = 0;
1287 			mbuf_avail  = rte_pktmbuf_data_len(m);
1288 		}
1289 
1290 		if (hdr_addr) {
1291 			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
1292 			if (rxvq_is_mergeable(dev))
1293 				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
1294 						num_buffers);
1295 
1296 			if (unlikely(hdr == &tmp_hdr)) {
1297 				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
1298 			} else {
1299 				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
1300 						dev->vhost_hlen, 0);
1301 				vhost_log_cache_write_iova(dev, vq,
1302 						buf_vec[0].buf_iova,
1303 						dev->vhost_hlen);
1304 			}
1305 
1306 			hdr_addr = 0;
1307 		}
1308 
1309 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
1310 
1311 		if (is_async) {
1312 			if (async_fill_seg(dev, vq, m, mbuf_offset,
1313 					   buf_iova + buf_offset, cpy_len, true) < 0)
1314 				goto error;
1315 		} else {
1316 			sync_fill_seg(dev, vq, m, mbuf_offset,
1317 				      buf_addr + buf_offset,
1318 				      buf_iova + buf_offset, cpy_len, true);
1319 		}
1320 
1321 		mbuf_avail  -= cpy_len;
1322 		mbuf_offset += cpy_len;
1323 		buf_avail  -= cpy_len;
1324 		buf_offset += cpy_len;
1325 	}
1326 
1327 	if (is_async)
1328 		async_iter_finalize(async);
1329 
1330 	return 0;
1331 error:
1332 	if (is_async)
1333 		async_iter_cancel(async);
1334 
1335 	return -1;
1336 }
1337 
1338 static __rte_always_inline int
1339 vhost_enqueue_single_packed(struct virtio_net *dev,
1340 			    struct vhost_virtqueue *vq,
1341 			    struct rte_mbuf *pkt,
1342 			    struct buf_vector *buf_vec,
1343 			    uint16_t *nr_descs)
1344 	__rte_shared_locks_required(&vq->access_lock)
1345 	__rte_shared_locks_required(&vq->iotlb_lock)
1346 {
1347 	uint16_t nr_vec = 0;
1348 	uint16_t avail_idx = vq->last_avail_idx;
1349 	uint16_t max_tries, tries = 0;
1350 	uint16_t buf_id = 0;
1351 	uint32_t len = 0;
1352 	uint16_t desc_count;
1353 	uint64_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1354 	uint16_t num_buffers = 0;
1355 	uint32_t buffer_len[vq->size];
1356 	uint16_t buffer_buf_id[vq->size];
1357 	uint16_t buffer_desc_count[vq->size];
1358 
1359 	if (rxvq_is_mergeable(dev))
1360 		max_tries = vq->size - 1;
1361 	else
1362 		max_tries = 1;
1363 
1364 	while (size > 0) {
1365 		/*
1366 		 * If we have tried all available ring items and still
1367 		 * can't get enough buffers, something abnormal has
1368 		 * happened.
1369 		 */
1370 		if (unlikely(++tries > max_tries))
1371 			return -1;
1372 
1373 		if (unlikely(fill_vec_buf_packed(dev, vq,
1374 						avail_idx, &desc_count,
1375 						buf_vec, &nr_vec,
1376 						&buf_id, &len,
1377 						VHOST_ACCESS_RW) < 0))
1378 			return -1;
1379 
1380 		len = RTE_MIN(len, size);
1381 		size -= len;
1382 
1383 		buffer_len[num_buffers] = len;
1384 		buffer_buf_id[num_buffers] = buf_id;
1385 		buffer_desc_count[num_buffers] = desc_count;
1386 		num_buffers += 1;
1387 
1388 		*nr_descs += desc_count;
1389 		avail_idx += desc_count;
1390 		if (avail_idx >= vq->size)
1391 			avail_idx -= vq->size;
1392 	}
1393 
1394 	if (mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, num_buffers, false) < 0)
1395 		return -1;
1396 
1397 	vhost_shadow_enqueue_single_packed(dev, vq, buffer_len, buffer_buf_id,
1398 					   buffer_desc_count, num_buffers);
1399 
1400 	return 0;
1401 }
1402 
1403 static __rte_noinline uint32_t
1404 virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1405 	struct rte_mbuf **pkts, uint32_t count)
1406 	__rte_shared_locks_required(&vq->access_lock)
1407 	__rte_shared_locks_required(&vq->iotlb_lock)
1408 {
1409 	uint32_t pkt_idx = 0;
1410 	uint16_t num_buffers;
1411 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1412 	uint16_t avail_head;
1413 
1414 	/*
1415 	 * The ordering between avail index and
1416 	 * desc reads needs to be enforced.
1417 	 */
1418 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
1419 
1420 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1421 
1422 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1423 		uint64_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1424 		uint16_t nr_vec = 0;
1425 
1426 		if (unlikely(reserve_avail_buf_split(dev, vq,
1427 						pkt_len, buf_vec, &num_buffers,
1428 						avail_head, &nr_vec) < 0)) {
1429 			VHOST_LOG_DATA(dev->ifname, DEBUG,
1430 				"failed to get enough desc from vring\n");
1431 			vq->shadow_used_idx -= num_buffers;
1432 			break;
1433 		}
1434 
1435 		VHOST_LOG_DATA(dev->ifname, DEBUG,
1436 			"current index %d | end index %d\n",
1437 			vq->last_avail_idx, vq->last_avail_idx + num_buffers);
1438 
1439 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec,
1440 					num_buffers, false) < 0) {
1441 			vq->shadow_used_idx -= num_buffers;
1442 			break;
1443 		}
1444 
1445 		vq->last_avail_idx += num_buffers;
1446 	}
1447 
1448 	do_data_copy_enqueue(dev, vq);
1449 
1450 	if (likely(vq->shadow_used_idx)) {
1451 		flush_shadow_used_ring_split(dev, vq);
1452 		vhost_vring_call_split(dev, vq);
1453 	}
1454 
1455 	return pkt_idx;
1456 }
1457 
1458 static __rte_always_inline int
1459 virtio_dev_rx_sync_batch_check(struct virtio_net *dev,
1460 			   struct vhost_virtqueue *vq,
1461 			   struct rte_mbuf **pkts,
1462 			   uint64_t *desc_addrs,
1463 			   uint64_t *lens)
1464 	__rte_shared_locks_required(&vq->iotlb_lock)
1465 {
1466 	bool wrap_counter = vq->avail_wrap_counter;
1467 	struct vring_packed_desc *descs = vq->desc_packed;
1468 	uint16_t avail_idx = vq->last_avail_idx;
1469 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1470 	uint16_t i;
1471 
1472 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1473 		return -1;
1474 
1475 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1476 		return -1;
1477 
1478 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1479 		if (unlikely(pkts[i]->next != NULL))
1480 			return -1;
1481 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1482 					    wrap_counter)))
1483 			return -1;
1484 	}
1485 
1486 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1487 		lens[i] = descs[avail_idx + i].len;
1488 
1489 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1490 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1491 			return -1;
1492 	}
1493 
1494 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1495 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
1496 						  descs[avail_idx + i].addr,
1497 						  &lens[i],
1498 						  VHOST_ACCESS_RW);
1499 
1500 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1501 		if (unlikely(!desc_addrs[i]))
1502 			return -1;
1503 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1504 			return -1;
1505 	}
1506 
1507 	return 0;
1508 }
1509 
1510 static __rte_always_inline int
1511 virtio_dev_rx_async_batch_check(struct vhost_virtqueue *vq,
1512 			   struct rte_mbuf **pkts,
1513 			   uint64_t *desc_addrs,
1514 			   uint64_t *lens,
1515 			   int16_t dma_id,
1516 			   uint16_t vchan_id)
1517 {
1518 	bool wrap_counter = vq->avail_wrap_counter;
1519 	struct vring_packed_desc *descs = vq->desc_packed;
1520 	uint16_t avail_idx = vq->last_avail_idx;
1521 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1522 	uint16_t i;
1523 
1524 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1525 		return -1;
1526 
1527 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1528 		return -1;
1529 
1530 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1531 		if (unlikely(pkts[i]->next != NULL))
1532 			return -1;
1533 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1534 					    wrap_counter)))
1535 			return -1;
1536 	}
1537 
1538 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1539 		lens[i] = descs[avail_idx + i].len;
1540 
1541 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1542 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1543 			return -1;
1544 	}
1545 
1546 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1547 		desc_addrs[i] = descs[avail_idx + i].addr;
1548 
1549 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1550 		if (unlikely(!desc_addrs[i]))
1551 			return -1;
1552 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1553 			return -1;
1554 	}
1555 
1556 	if (rte_dma_burst_capacity(dma_id, vchan_id) < PACKED_BATCH_SIZE)
1557 		return -1;
1558 
1559 	return 0;
1560 }
1561 
1562 static __rte_always_inline void
1563 virtio_dev_rx_batch_packed_copy(struct virtio_net *dev,
1564 			   struct vhost_virtqueue *vq,
1565 			   struct rte_mbuf **pkts,
1566 			   uint64_t *desc_addrs,
1567 			   uint64_t *lens)
1568 	__rte_shared_locks_required(&vq->iotlb_lock)
1569 {
1570 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1571 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
1572 	struct vring_packed_desc *descs = vq->desc_packed;
1573 	uint16_t avail_idx = vq->last_avail_idx;
1574 	uint16_t ids[PACKED_BATCH_SIZE];
1575 	uint16_t i;
1576 
1577 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1578 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
1579 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
1580 					(uintptr_t)desc_addrs[i];
1581 		lens[i] = pkts[i]->pkt_len +
1582 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
1583 	}
1584 
1585 	if (rxvq_is_mergeable(dev)) {
1586 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1587 			ASSIGN_UNLESS_EQUAL(hdrs[i]->num_buffers, 1);
1588 		}
1589 	}
1590 
1591 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1592 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
1593 
1594 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
1595 
1596 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1597 		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
1598 			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
1599 			   pkts[i]->pkt_len);
1600 	}
1601 
1602 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1603 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr,
1604 					   lens[i]);
1605 
1606 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1607 		ids[i] = descs[avail_idx + i].id;
1608 
1609 	vhost_flush_enqueue_batch_packed(dev, vq, lens, ids);
1610 }
1611 
1612 static __rte_always_inline int
1613 virtio_dev_rx_sync_batch_packed(struct virtio_net *dev,
1614 			   struct vhost_virtqueue *vq,
1615 			   struct rte_mbuf **pkts)
1616 	__rte_shared_locks_required(&vq->iotlb_lock)
1617 {
1618 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
1619 	uint64_t lens[PACKED_BATCH_SIZE];
1620 
1621 	if (virtio_dev_rx_sync_batch_check(dev, vq, pkts, desc_addrs, lens) == -1)
1622 		return -1;
1623 
1624 	if (vq->shadow_used_idx) {
1625 		do_data_copy_enqueue(dev, vq);
1626 		vhost_flush_enqueue_shadow_packed(dev, vq);
1627 	}
1628 
1629 	virtio_dev_rx_batch_packed_copy(dev, vq, pkts, desc_addrs, lens);
1630 
1631 	return 0;
1632 }
1633 
1634 static __rte_always_inline int16_t
1635 virtio_dev_rx_single_packed(struct virtio_net *dev,
1636 			    struct vhost_virtqueue *vq,
1637 			    struct rte_mbuf *pkt)
1638 	__rte_shared_locks_required(&vq->access_lock)
1639 	__rte_shared_locks_required(&vq->iotlb_lock)
1640 {
1641 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1642 	uint16_t nr_descs = 0;
1643 
1644 	if (unlikely(vhost_enqueue_single_packed(dev, vq, pkt, buf_vec,
1645 						 &nr_descs) < 0)) {
1646 		VHOST_LOG_DATA(dev->ifname, DEBUG, "failed to get enough desc from vring\n");
1647 		return -1;
1648 	}
1649 
1650 	VHOST_LOG_DATA(dev->ifname, DEBUG,
1651 		"current index %d | end index %d\n",
1652 		vq->last_avail_idx, vq->last_avail_idx + nr_descs);
1653 
1654 	vq_inc_last_avail_packed(vq, nr_descs);
1655 
1656 	return 0;
1657 }
1658 
1659 static __rte_noinline uint32_t
1660 virtio_dev_rx_packed(struct virtio_net *dev,
1661 		     struct vhost_virtqueue *__rte_restrict vq,
1662 		     struct rte_mbuf **__rte_restrict pkts,
1663 		     uint32_t count)
1664 	__rte_shared_locks_required(&vq->access_lock)
1665 	__rte_shared_locks_required(&vq->iotlb_lock)
1666 {
1667 	uint32_t pkt_idx = 0;
1668 
1669 	do {
1670 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1671 
1672 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
1673 			if (!virtio_dev_rx_sync_batch_packed(dev, vq,
1674 							&pkts[pkt_idx])) {
1675 				pkt_idx += PACKED_BATCH_SIZE;
1676 				continue;
1677 			}
1678 		}
1679 
1680 		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]))
1681 			break;
1682 		pkt_idx++;
1683 
1684 	} while (pkt_idx < count);
1685 
1686 	if (vq->shadow_used_idx) {
1687 		do_data_copy_enqueue(dev, vq);
1688 		vhost_flush_enqueue_shadow_packed(dev, vq);
1689 	}
1690 
1691 	if (pkt_idx)
1692 		vhost_vring_call_packed(dev, vq);
1693 
1694 	return pkt_idx;
1695 }
1696 
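/*
 * Common enqueue entry point: take the virtqueue access lock (shared) and
 * the IOTLB read lock, translate the ring addresses if needed, then
 * dispatch to the packed or split ring implementation and update the
 * statistics.
 */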
1697 static __rte_always_inline uint32_t
1698 virtio_dev_rx(struct virtio_net *dev, struct vhost_virtqueue *vq,
1699 	struct rte_mbuf **pkts, uint32_t count)
1700 {
1701 	uint32_t nb_tx = 0;
1702 
1703 	VHOST_LOG_DATA(dev->ifname, DEBUG, "%s\n", __func__);
1704 	rte_rwlock_read_lock(&vq->access_lock);
1705 
1706 	if (unlikely(!vq->enabled))
1707 		goto out_access_unlock;
1708 
1709 	vhost_user_iotlb_rd_lock(vq);
1710 
1711 	if (unlikely(!vq->access_ok))
1712 		if (unlikely(vring_translate(dev, vq) < 0))
1713 			goto out;
1714 
1715 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1716 	if (count == 0)
1717 		goto out;
1718 
1719 	if (vq_is_packed(dev))
1720 		nb_tx = virtio_dev_rx_packed(dev, vq, pkts, count);
1721 	else
1722 		nb_tx = virtio_dev_rx_split(dev, vq, pkts, count);
1723 
1724 	vhost_queue_stats_update(dev, vq, pkts, nb_tx);
1725 
1726 out:
1727 	vhost_user_iotlb_rd_unlock(vq);
1728 
1729 out_access_unlock:
1730 	rte_rwlock_read_unlock(&vq->access_lock);
1731 
1732 	return nb_tx;
1733 }
1734 
1735 uint16_t
1736 rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
1737 	struct rte_mbuf **__rte_restrict pkts, uint16_t count)
1738 {
1739 	struct virtio_net *dev = get_device(vid);
1740 
1741 	if (!dev)
1742 		return 0;
1743 
1744 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
1745 		VHOST_LOG_DATA(dev->ifname, ERR,
1746 			"%s: built-in vhost net backend is disabled.\n",
1747 			__func__);
1748 		return 0;
1749 	}
1750 
1751 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1752 		VHOST_LOG_DATA(dev->ifname, ERR,
1753 			"%s: invalid virtqueue idx %d.\n",
1754 			__func__, queue_id);
1755 		return 0;
1756 	}
1757 
1758 	return virtio_dev_rx(dev, dev->virtqueue[queue_id], pkts, count);
1759 }
1760 
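/*
 * Minimal usage sketch for rte_vhost_enqueue_burst() above, assuming the
 * application obtained "vid" from its vhost-user registration and filled
 * "pkts"/"nb_pkts" itself. Data is copied into the guest buffers, so the
 * caller keeps ownership of all mbufs and frees them afterwards:
 *
 *	nb_enq = rte_vhost_enqueue_burst(vid, VIRTIO_RXQ, pkts, nb_pkts);
 *	rte_pktmbuf_free_bulk(pkts, nb_pkts);
 */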
1761 static __rte_always_inline uint16_t
1762 async_get_first_inflight_pkt_idx(struct vhost_virtqueue *vq)
1763 	__rte_shared_locks_required(&vq->access_lock)
1764 {
1765 	struct vhost_async *async = vq->async;
1766 
1767 	if (async->pkts_idx >= async->pkts_inflight_n)
1768 		return async->pkts_idx - async->pkts_inflight_n;
1769 	else
1770 		return vq->size - async->pkts_inflight_n + async->pkts_idx;
1771 }
1772 
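/*
 * Copy used-element records from the shadow ring into the async descriptor
 * ring, wrapping at ring_size when the destination window crosses the end.
 */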
1773 static __rte_always_inline void
1774 store_dma_desc_info_split(struct vring_used_elem *s_ring, struct vring_used_elem *d_ring,
1775 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1776 {
1777 	size_t elem_size = sizeof(struct vring_used_elem);
1778 
1779 	if (d_idx + count <= ring_size) {
1780 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1781 	} else {
1782 		uint16_t size = ring_size - d_idx;
1783 
1784 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1785 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1786 	}
1787 }
1788 
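/*
 * DMA-assisted enqueue on a split ring: reserve buffers and build copy
 * iovecs as in the sync path, hand them to the DMA vchannel, roll back the
 * descriptors of packets that could not be submitted, and park the used
 * entries in the async shadow until the copies complete.
 */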
1789 static __rte_noinline uint32_t
1790 virtio_dev_rx_async_submit_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1791 	struct rte_mbuf **pkts, uint32_t count, int16_t dma_id, uint16_t vchan_id)
1792 	__rte_exclusive_locks_required(&vq->access_lock)
1793 	__rte_shared_locks_required(&vq->iotlb_lock)
1794 {
1795 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1796 	uint32_t pkt_idx = 0;
1797 	uint16_t num_buffers;
1798 	uint16_t avail_head;
1799 
1800 	struct vhost_async *async = vq->async;
1801 	struct async_inflight_info *pkts_info = async->pkts_info;
1802 	uint32_t pkt_err = 0;
1803 	uint16_t n_xfer;
1804 	uint16_t slot_idx = 0;
1805 
1806 	/*
1807 	 * The ordering between avail index and desc reads needs to be enforced.
1808 	 */
1809 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
1810 
1811 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1812 
1813 	async_iter_reset(async);
1814 
1815 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1816 		uint64_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1817 		uint16_t nr_vec = 0;
1818 
1819 		if (unlikely(reserve_avail_buf_split(dev, vq, pkt_len, buf_vec,
1820 						&num_buffers, avail_head, &nr_vec) < 0)) {
1821 			VHOST_LOG_DATA(dev->ifname, DEBUG,
1822 				"failed to get enough desc from vring\n");
1823 			vq->shadow_used_idx -= num_buffers;
1824 			break;
1825 		}
1826 
1827 		VHOST_LOG_DATA(dev->ifname, DEBUG,
1828 			"current index %d | end index %d\n",
1829 			vq->last_avail_idx, vq->last_avail_idx + num_buffers);
1830 
1831 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec, num_buffers, true) < 0) {
1832 			vq->shadow_used_idx -= num_buffers;
1833 			break;
1834 		}
1835 
1836 		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
1837 		pkts_info[slot_idx].descs = num_buffers;
1838 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1839 
1840 		vq->last_avail_idx += num_buffers;
1841 	}
1842 
1843 	if (unlikely(pkt_idx == 0))
1844 		return 0;
1845 
1846 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
1847 			async->iov_iter, pkt_idx);
1848 
1849 	pkt_err = pkt_idx - n_xfer;
1850 	if (unlikely(pkt_err)) {
1851 		uint16_t num_descs = 0;
1852 
1853 		VHOST_LOG_DATA(dev->ifname, DEBUG,
1854 			"%s: failed to transfer %u packets for queue %u.\n",
1855 			__func__, pkt_err, vq->index);
1856 
1857 		/* update number of completed packets */
1858 		pkt_idx = n_xfer;
1859 
1860 		/* calculate the sum of descriptors to revert */
1861 		while (pkt_err-- > 0) {
1862 			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
1863 			slot_idx--;
1864 		}
1865 
1866 		/* recover shadow used ring and available ring */
1867 		vq->shadow_used_idx -= num_descs;
1868 		vq->last_avail_idx -= num_descs;
1869 	}
1870 
1871 	/* keep used descriptors */
1872 	if (likely(vq->shadow_used_idx)) {
1873 		uint16_t to = async->desc_idx_split & (vq->size - 1);
1874 
1875 		store_dma_desc_info_split(vq->shadow_used_split,
1876 				async->descs_split, vq->size, 0, to,
1877 				vq->shadow_used_idx);
1878 
1879 		async->desc_idx_split += vq->shadow_used_idx;
1880 
1881 		async->pkts_idx += pkt_idx;
1882 		if (async->pkts_idx >= vq->size)
1883 			async->pkts_idx -= vq->size;
1884 
1885 		async->pkts_inflight_n += pkt_idx;
1886 		vq->shadow_used_idx = 0;
1887 	}
1888 
1889 	return pkt_idx;
1890 }
1891 
1892 
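/*
 * Reserve packed-ring buffers for one mbuf (several buffers may be used
 * when mergeable Rx buffers are negotiated), queue the data copies via
 * the async iovec path, and shadow-enqueue the buffers for later
 * used-ring write-back.
 */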
1893 static __rte_always_inline int
1894 vhost_enqueue_async_packed(struct virtio_net *dev,
1895 			    struct vhost_virtqueue *vq,
1896 			    struct rte_mbuf *pkt,
1897 			    struct buf_vector *buf_vec,
1898 			    uint16_t *nr_descs,
1899 			    uint16_t *nr_buffers)
1900 	__rte_exclusive_locks_required(&vq->access_lock)
1901 	__rte_shared_locks_required(&vq->iotlb_lock)
1902 {
1903 	uint16_t nr_vec = 0;
1904 	uint16_t avail_idx = vq->last_avail_idx;
1905 	uint16_t max_tries, tries = 0;
1906 	uint16_t buf_id = 0;
1907 	uint32_t len = 0;
1908 	uint16_t desc_count = 0;
1909 	uint64_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1910 	uint32_t buffer_len[vq->size];
1911 	uint16_t buffer_buf_id[vq->size];
1912 	uint16_t buffer_desc_count[vq->size];
1913 
1914 	if (rxvq_is_mergeable(dev))
1915 		max_tries = vq->size - 1;
1916 	else
1917 		max_tries = 1;
1918 
1919 	while (size > 0) {
1920 		/*
1921 		 * If we have tried all available ring items and still
1922 		 * can't get enough buffers, something abnormal has
1923 		 * happened.
1924 		 */
1925 		if (unlikely(++tries > max_tries))
1926 			return -1;
1927 
1928 		if (unlikely(fill_vec_buf_packed(dev, vq,
1929 						avail_idx, &desc_count,
1930 						buf_vec, &nr_vec,
1931 						&buf_id, &len,
1932 						VHOST_ACCESS_RW) < 0))
1933 			return -1;
1934 
1935 		len = RTE_MIN(len, size);
1936 		size -= len;
1937 
1938 		buffer_len[*nr_buffers] = len;
1939 		buffer_buf_id[*nr_buffers] = buf_id;
1940 		buffer_desc_count[*nr_buffers] = desc_count;
1941 		*nr_buffers += 1;
1942 		*nr_descs += desc_count;
1943 		avail_idx += desc_count;
1944 		if (avail_idx >= vq->size)
1945 			avail_idx -= vq->size;
1946 	}
1947 
1948 	if (unlikely(mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers, true) < 0))
1949 		return -1;
1950 
1951 	vhost_async_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id,
1952 					buffer_desc_count, *nr_buffers);
1953 
1954 	return 0;
1955 }
1956 
1957 static __rte_always_inline int16_t
1958 virtio_dev_rx_async_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
1959 			    struct rte_mbuf *pkt, uint16_t *nr_descs, uint16_t *nr_buffers)
1960 	__rte_exclusive_locks_required(&vq->access_lock)
1961 	__rte_shared_locks_required(&vq->iotlb_lock)
1962 {
1963 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1964 
1965 	if (unlikely(vhost_enqueue_async_packed(dev, vq, pkt, buf_vec,
1966 					nr_descs, nr_buffers) < 0)) {
1967 		VHOST_LOG_DATA(dev->ifname, DEBUG, "failed to get enough desc from vring\n");
1968 		return -1;
1969 	}
1970 
1971 	VHOST_LOG_DATA(dev->ifname, DEBUG,
1972 		"current index %d | end index %d\n",
1973 		vq->last_avail_idx, vq->last_avail_idx + *nr_descs);
1974 
1975 	return 0;
1976 }
1977 
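/*
 * Enqueue a full batch of single-descriptor packets: the virtio-net
 * headers are written directly by the CPU, while each packet payload is
 * queued as one DMA copy job, then the batch is shadow-enqueued.
 */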
1978 static __rte_always_inline void
1979 virtio_dev_rx_async_packed_batch_enqueue(struct virtio_net *dev,
1980 			   struct vhost_virtqueue *vq,
1981 			   struct rte_mbuf **pkts,
1982 			   uint64_t *desc_addrs,
1983 			   uint64_t *lens)
1984 	__rte_exclusive_locks_required(&vq->access_lock)
1985 	__rte_shared_locks_required(&vq->iotlb_lock)
1986 {
1987 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1988 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
1989 	struct vring_packed_desc *descs = vq->desc_packed;
1990 	struct vhost_async *async = vq->async;
1991 	uint16_t avail_idx = vq->last_avail_idx;
1992 	uint32_t mbuf_offset = 0;
1993 	uint16_t ids[PACKED_BATCH_SIZE];
1994 	uint64_t mapped_len[PACKED_BATCH_SIZE];
1995 	void *host_iova[PACKED_BATCH_SIZE];
1996 	uintptr_t desc;
1997 	uint16_t i;
1998 
1999 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2000 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
2001 		desc = vhost_iova_to_vva(dev, vq, desc_addrs[i], &lens[i], VHOST_ACCESS_RW);
2002 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc;
2003 		lens[i] = pkts[i]->pkt_len +
2004 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
2005 	}
2006 
2007 	if (rxvq_is_mergeable(dev)) {
2008 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2009 			ASSIGN_UNLESS_EQUAL(hdrs[i]->num_buffers, 1);
2010 		}
2011 	}
2012 
2013 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2014 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
2015 
2016 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
2017 
2018 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2019 		host_iova[i] = (void *)(uintptr_t)gpa_to_first_hpa(dev,
2020 			desc_addrs[i] + buf_offset, lens[i], &mapped_len[i]);
2021 	}
2022 
2023 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2024 		async_iter_initialize(dev, async);
2025 		async_iter_add_iovec(dev, async,
2026 				(void *)(uintptr_t)rte_pktmbuf_iova_offset(pkts[i], mbuf_offset),
2027 				host_iova[i],
2028 				mapped_len[i]);
2029 		async->iter_idx++;
2030 	}
2031 
2032 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2033 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr, lens[i]);
2034 
2035 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2036 		ids[i] = descs[avail_idx + i].id;
2037 
2038 	vhost_async_shadow_enqueue_packed_batch(vq, lens, ids);
2039 }
2040 
2041 static __rte_always_inline int
2042 virtio_dev_rx_async_packed_batch(struct virtio_net *dev,
2043 			   struct vhost_virtqueue *vq,
2044 			   struct rte_mbuf **pkts,
2045 			   int16_t dma_id, uint16_t vchan_id)
2046 	__rte_exclusive_locks_required(&vq->access_lock)
2047 	__rte_shared_locks_required(&vq->iotlb_lock)
2048 {
2049 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
2050 	uint64_t lens[PACKED_BATCH_SIZE];
2051 
2052 	if (virtio_dev_rx_async_batch_check(vq, pkts, desc_addrs, lens, dma_id, vchan_id) == -1)
2053 		return -1;
2054 
2055 	virtio_dev_rx_async_packed_batch_enqueue(dev, vq, pkts, desc_addrs, lens);
2056 
2057 	return 0;
2058 }
2059 
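/*
 * Roll back state for packets whose copies could not be submitted to the
 * DMA device: rewind the completed packet count, the avail index
 * (toggling the wrap counter on underflow) and the async shadow buffer
 * index.
 */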
2060 static __rte_always_inline void
2061 dma_error_handler_packed(struct vhost_virtqueue *vq, uint16_t slot_idx,
2062 			uint32_t nr_err, uint32_t *pkt_idx)
2063 	__rte_exclusive_locks_required(&vq->access_lock)
2064 {
2065 	uint16_t descs_err = 0;
2066 	uint16_t buffers_err = 0;
2067 	struct vhost_async *async = vq->async;
2068 	struct async_inflight_info *pkts_info = vq->async->pkts_info;
2069 
2070 	*pkt_idx -= nr_err;
2071 	/* calculate the sum of buffers and descs of DMA-error packets. */
2072 	while (nr_err-- > 0) {
2073 		descs_err += pkts_info[slot_idx % vq->size].descs;
2074 		buffers_err += pkts_info[slot_idx % vq->size].nr_buffers;
2075 		slot_idx--;
2076 	}
2077 
2078 	if (vq->last_avail_idx >= descs_err) {
2079 		vq->last_avail_idx -= descs_err;
2080 	} else {
2081 		vq->last_avail_idx = vq->last_avail_idx + vq->size - descs_err;
2082 		vq->avail_wrap_counter ^= 1;
2083 	}
2084 
2085 	if (async->buffer_idx_packed >= buffers_err)
2086 		async->buffer_idx_packed -= buffers_err;
2087 	else
2088 		async->buffer_idx_packed = async->buffer_idx_packed + vq->size - buffers_err;
2089 }
2090 
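/*
 * Async enqueue path for packed rings: try the single-descriptor batch
 * path first and fall back to per-packet enqueue, then submit the
 * accumulated copy jobs to the DMA vChannel and roll back any packets
 * whose submission failed.
 */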
2091 static __rte_noinline uint32_t
2092 virtio_dev_rx_async_submit_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
2093 	struct rte_mbuf **pkts, uint32_t count, int16_t dma_id, uint16_t vchan_id)
2094 	__rte_exclusive_locks_required(&vq->access_lock)
2095 	__rte_shared_locks_required(&vq->iotlb_lock)
2096 {
2097 	uint32_t pkt_idx = 0;
2098 	uint16_t n_xfer;
2099 	uint16_t num_buffers;
2100 	uint16_t num_descs;
2101 
2102 	struct vhost_async *async = vq->async;
2103 	struct async_inflight_info *pkts_info = async->pkts_info;
2104 	uint32_t pkt_err = 0;
2105 	uint16_t slot_idx = 0;
2106 	uint16_t i;
2107 
2108 	do {
2109 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
2110 
2111 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
2112 			if (!virtio_dev_rx_async_packed_batch(dev, vq, &pkts[pkt_idx],
2113 					dma_id, vchan_id)) {
2114 				for (i = 0; i < PACKED_BATCH_SIZE; i++) {
2115 					slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
2116 					pkts_info[slot_idx].descs = 1;
2117 					pkts_info[slot_idx].nr_buffers = 1;
2118 					pkts_info[slot_idx].mbuf = pkts[pkt_idx];
2119 					pkt_idx++;
2120 				}
2121 				continue;
2122 			}
2123 		}
2124 
2125 		num_buffers = 0;
2126 		num_descs = 0;
2127 		if (unlikely(virtio_dev_rx_async_packed(dev, vq, pkts[pkt_idx],
2128 						&num_descs, &num_buffers) < 0))
2129 			break;
2130 
2131 		slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
2132 
2133 		pkts_info[slot_idx].descs = num_descs;
2134 		pkts_info[slot_idx].nr_buffers = num_buffers;
2135 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
2136 
2137 		pkt_idx++;
2138 		vq_inc_last_avail_packed(vq, num_descs);
2139 	} while (pkt_idx < count);
2140 
2141 	if (unlikely(pkt_idx == 0))
2142 		return 0;
2143 
2144 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
2145 			async->iov_iter, pkt_idx);
2146 
2147 	async_iter_reset(async);
2148 
2149 	pkt_err = pkt_idx - n_xfer;
2150 	if (unlikely(pkt_err)) {
2151 		VHOST_LOG_DATA(dev->ifname, DEBUG,
2152 			"%s: failed to transfer %u packets for queue %u.\n",
2153 			__func__, pkt_err, vq->index);
2154 		dma_error_handler_packed(vq, slot_idx, pkt_err, &pkt_idx);
2155 	}
2156 
2157 	async->pkts_idx += pkt_idx;
2158 	if (async->pkts_idx >= vq->size)
2159 		async->pkts_idx -= vq->size;
2160 
2161 	async->pkts_inflight_n += pkt_idx;
2162 
2163 	return pkt_idx;
2164 }
2165 
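/*
 * Flush 'n_descs' completed elements from the async descriptor ring into
 * the split used ring, copying in chunks that respect both rings' wrap
 * boundaries.
 */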
2166 static __rte_always_inline void
2167 write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
2168 	__rte_shared_locks_required(&vq->access_lock)
2169 {
2170 	struct vhost_async *async = vq->async;
2171 	uint16_t nr_left = n_descs;
2172 	uint16_t nr_copy;
2173 	uint16_t to, from;
2174 
2175 	do {
2176 		from = async->last_desc_idx_split & (vq->size - 1);
2177 		nr_copy = nr_left + from <= vq->size ? nr_left : vq->size - from;
2178 		to = vq->last_used_idx & (vq->size - 1);
2179 
2180 		if (to + nr_copy <= vq->size) {
2181 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
2182 					nr_copy * sizeof(struct vring_used_elem));
2183 		} else {
2184 			uint16_t size = vq->size - to;
2185 
2186 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
2187 					size * sizeof(struct vring_used_elem));
2188 			rte_memcpy(&vq->used->ring[0], &async->descs_split[from + size],
2189 					(nr_copy - size) * sizeof(struct vring_used_elem));
2190 		}
2191 
2192 		async->last_desc_idx_split += nr_copy;
2193 		vq->last_used_idx += nr_copy;
2194 		nr_left -= nr_copy;
2195 	} while (nr_left > 0);
2196 }
2197 
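/*
 * Write completed buffers back to the packed descriptor ring: store the
 * id/len fields first, issue a release fence, then update the flags,
 * leaving the head descriptor's flags for last so the whole chain is
 * exposed to the guest at once.
 */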
2198 static __rte_always_inline void
2199 write_back_completed_descs_packed(struct vhost_virtqueue *vq,
2200 				uint16_t n_buffers)
2201 	__rte_shared_locks_required(&vq->access_lock)
2202 {
2203 	struct vhost_async *async = vq->async;
2204 	uint16_t from = async->last_buffer_idx_packed;
2205 	uint16_t used_idx = vq->last_used_idx;
2206 	uint16_t head_idx = vq->last_used_idx;
2207 	uint16_t head_flags = 0;
2208 	uint16_t i;
2209 
2210 	/* Split loop in two to save memory barriers */
2211 	for (i = 0; i < n_buffers; i++) {
2212 		vq->desc_packed[used_idx].id = async->buffers_packed[from].id;
2213 		vq->desc_packed[used_idx].len = async->buffers_packed[from].len;
2214 
2215 		used_idx += async->buffers_packed[from].count;
2216 		if (used_idx >= vq->size)
2217 			used_idx -= vq->size;
2218 
2219 		from++;
2220 		if (from >= vq->size)
2221 			from = 0;
2222 	}
2223 
2224 	/* The ordering for storing desc flags needs to be enforced. */
2225 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
2226 
2227 	from = async->last_buffer_idx_packed;
2228 
2229 	for (i = 0; i < n_buffers; i++) {
2230 		uint16_t flags;
2231 
2232 		if (async->buffers_packed[from].len)
2233 			flags = VRING_DESC_F_WRITE;
2234 		else
2235 			flags = 0;
2236 
2237 		if (vq->used_wrap_counter) {
2238 			flags |= VRING_DESC_F_USED;
2239 			flags |= VRING_DESC_F_AVAIL;
2240 		} else {
2241 			flags &= ~VRING_DESC_F_USED;
2242 			flags &= ~VRING_DESC_F_AVAIL;
2243 		}
2244 
2245 		if (i > 0) {
2246 			vq->desc_packed[vq->last_used_idx].flags = flags;
2247 		} else {
2248 			head_idx = vq->last_used_idx;
2249 			head_flags = flags;
2250 		}
2251 
2252 		vq_inc_last_used_packed(vq, async->buffers_packed[from].count);
2253 
2254 		from++;
2255 		if (from == vq->size)
2256 			from = 0;
2257 	}
2258 
2259 	vq->desc_packed[head_idx].flags = head_flags;
2260 	async->last_buffer_idx_packed = from;
2261 }
2262 
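/*
 * Harvest enqueue completions: poll the DMA vChannel, count packets whose
 * per-slot completion flag is set, write their descriptors back to the
 * used ring (or just advance the async indexes if the ring is not
 * accessible) and return the completed mbufs.
 */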
2263 static __rte_always_inline uint16_t
2264 vhost_poll_enqueue_completed(struct virtio_net *dev, struct vhost_virtqueue *vq,
2265 	struct rte_mbuf **pkts, uint16_t count, int16_t dma_id, uint16_t vchan_id)
2266 	__rte_shared_locks_required(&vq->access_lock)
2267 {
2268 	struct vhost_async *async = vq->async;
2269 	struct async_inflight_info *pkts_info = async->pkts_info;
2270 	uint16_t nr_cpl_pkts = 0;
2271 	uint16_t n_descs = 0, n_buffers = 0;
2272 	uint16_t start_idx, from, i;
2273 
2274 	/* Check completed copies for the given DMA vChannel */
2275 	vhost_async_dma_check_completed(dev, dma_id, vchan_id, VHOST_DMA_MAX_COPY_COMPLETE);
2276 
2277 	start_idx = async_get_first_inflight_pkt_idx(vq);
2278 	/**
2279 	 * Calculate the number of copy-completed packets.
2280 	 * Note that there may be completed packets even if
2281 	 * no copies are reported done by the given DMA vChannel,
2282 	 * as it's possible that a virtqueue uses multiple DMA
2283 	 * vChannels.
2284 	 */
2285 	from = start_idx;
2286 	while (vq->async->pkts_cmpl_flag[from] && count--) {
2287 		vq->async->pkts_cmpl_flag[from] = false;
2288 		from++;
2289 		if (from >= vq->size)
2290 			from -= vq->size;
2291 		nr_cpl_pkts++;
2292 	}
2293 
2294 	if (nr_cpl_pkts == 0)
2295 		return 0;
2296 
2297 	for (i = 0; i < nr_cpl_pkts; i++) {
2298 		from = (start_idx + i) % vq->size;
2299 		/* Only used with packed ring */
2300 		n_buffers += pkts_info[from].nr_buffers;
2301 		/* Only used with split ring */
2302 		n_descs += pkts_info[from].descs;
2303 		pkts[i] = pkts_info[from].mbuf;
2304 	}
2305 
2306 	async->pkts_inflight_n -= nr_cpl_pkts;
2307 
2308 	if (likely(vq->enabled && vq->access_ok)) {
2309 		if (vq_is_packed(dev)) {
2310 			write_back_completed_descs_packed(vq, n_buffers);
2311 			vhost_vring_call_packed(dev, vq);
2312 		} else {
2313 			write_back_completed_descs_split(vq, n_descs);
2314 			__atomic_fetch_add(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
2315 			vhost_vring_call_split(dev, vq);
2316 		}
2317 	} else {
2318 		if (vq_is_packed(dev)) {
2319 			async->last_buffer_idx_packed += n_buffers;
2320 			if (async->last_buffer_idx_packed >= vq->size)
2321 				async->last_buffer_idx_packed -= vq->size;
2322 		} else {
2323 			async->last_desc_idx_split += n_descs;
2324 		}
2325 	}
2326 
2327 	return nr_cpl_pkts;
2328 }
2329 
2330 uint16_t
2331 rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
2332 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2333 		uint16_t vchan_id)
2334 {
2335 	struct virtio_net *dev = get_device(vid);
2336 	struct vhost_virtqueue *vq;
2337 	uint16_t n_pkts_cpl = 0;
2338 
2339 	if (unlikely(!dev))
2340 		return 0;
2341 
2342 	VHOST_LOG_DATA(dev->ifname, DEBUG, "%s\n", __func__);
2343 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2344 		VHOST_LOG_DATA(dev->ifname, ERR,
2345 			"%s: invalid virtqueue idx %d.\n",
2346 			__func__, queue_id);
2347 		return 0;
2348 	}
2349 
2350 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2351 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2352 		VHOST_LOG_DATA(dev->ifname, ERR,
2353 			"%s: invalid channel %d:%u.\n",
2354 			__func__, dma_id, vchan_id);
2355 		return 0;
2356 	}
2357 
2358 	vq = dev->virtqueue[queue_id];
2359 
2360 	if (rte_rwlock_read_trylock(&vq->access_lock)) {
2361 		VHOST_LOG_DATA(dev->ifname, DEBUG,
2362 			"%s: virtqueue %u is busy.\n",
2363 			__func__, queue_id);
2364 		return 0;
2365 	}
2366 
2367 	if (unlikely(!vq->async)) {
2368 		VHOST_LOG_DATA(dev->ifname, ERR,
2369 			"%s: async not registered for virtqueue %d.\n",
2370 			__func__, queue_id);
2371 		goto out;
2372 	}
2373 
2374 	n_pkts_cpl = vhost_poll_enqueue_completed(dev, vq, pkts, count, dma_id, vchan_id);
2375 
2376 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2377 	vq->stats.inflight_completed += n_pkts_cpl;
2378 
2379 out:
2380 	rte_rwlock_read_unlock(&vq->access_lock);
2381 
2382 	return n_pkts_cpl;
2383 }
2384 
2385 uint16_t
2386 rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
2387 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2388 		uint16_t vchan_id)
2389 {
2390 	struct virtio_net *dev = get_device(vid);
2391 	struct vhost_virtqueue *vq;
2392 	uint16_t n_pkts_cpl = 0;
2393 
2394 	if (!dev)
2395 		return 0;
2396 
2397 	VHOST_LOG_DATA(dev->ifname, DEBUG, "%s\n", __func__);
2398 	if (unlikely(queue_id >= dev->nr_vring)) {
2399 		VHOST_LOG_DATA(dev->ifname, ERR, "%s: invalid virtqueue idx %d.\n",
2400 			__func__, queue_id);
2401 		return 0;
2402 	}
2403 
2404 	if (unlikely(dma_id < 0 || dma_id >= RTE_DMADEV_DEFAULT_MAX)) {
2405 		VHOST_LOG_DATA(dev->ifname, ERR, "%s: invalid dma id %d.\n",
2406 			__func__, dma_id);
2407 		return 0;
2408 	}
2409 
2410 	vq = dev->virtqueue[queue_id];
2411 
2412 	vq_assert_lock(dev, vq);
2413 
2414 	if (unlikely(!vq->async)) {
2415 		VHOST_LOG_DATA(dev->ifname, ERR,
2416 			"%s: async not registered for virtqueue %d.\n",
2417 			__func__, queue_id);
2418 		return 0;
2419 	}
2420 
2421 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2422 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2423 		VHOST_LOG_DATA(dev->ifname, ERR,
2424 			"%s: invalid channel %d:%u.\n",
2425 			__func__, dma_id, vchan_id);
2426 		return 0;
2427 	}
2428 
2429 	if ((queue_id & 1) == 0)
2430 		n_pkts_cpl = vhost_poll_enqueue_completed(dev, vq, pkts, count,
2431 			dma_id, vchan_id);
2432 	else
2433 		n_pkts_cpl = async_poll_dequeue_completed(dev, vq, pkts, count,
2434 			dma_id, vchan_id, dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS);
2435 
2436 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2437 	vq->stats.inflight_completed += n_pkts_cpl;
2438 
2439 	return n_pkts_cpl;
2440 }
2441 
2442 uint16_t
2443 rte_vhost_clear_queue(int vid, uint16_t queue_id, struct rte_mbuf **pkts,
2444 		uint16_t count, int16_t dma_id, uint16_t vchan_id)
2445 {
2446 	struct virtio_net *dev = get_device(vid);
2447 	struct vhost_virtqueue *vq;
2448 	uint16_t n_pkts_cpl = 0;
2449 
2450 	if (!dev)
2451 		return 0;
2452 
2453 	VHOST_LOG_DATA(dev->ifname, DEBUG, "%s\n", __func__);
2454 	if (unlikely(queue_id >= dev->nr_vring)) {
2455 		VHOST_LOG_DATA(dev->ifname, ERR, "%s: invalid virtqueue idx %u.\n",
2456 			__func__, queue_id);
2457 		return 0;
2458 	}
2459 
2460 	if (unlikely(dma_id < 0 || dma_id >= RTE_DMADEV_DEFAULT_MAX)) {
2461 		VHOST_LOG_DATA(dev->ifname, ERR, "%s: invalid dma id %d.\n",
2462 			__func__, dma_id);
2463 		return 0;
2464 	}
2465 
2466 	vq = dev->virtqueue[queue_id];
2467 
2468 	if (rte_rwlock_read_trylock(&vq->access_lock)) {
2469 		VHOST_LOG_DATA(dev->ifname, DEBUG, "%s: virtqueue %u is busy.\n",
2470 			__func__, queue_id);
2471 		return 0;
2472 	}
2473 
2474 	if (unlikely(!vq->async)) {
2475 		VHOST_LOG_DATA(dev->ifname, ERR, "%s: async not registered for queue id %u.\n",
2476 			__func__, queue_id);
2477 		goto out_access_unlock;
2478 	}
2479 
2480 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2481 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2482 		VHOST_LOG_DATA(dev->ifname, ERR, "%s: invalid channel %d:%u.\n",
2483 			__func__, dma_id, vchan_id);
2484 		goto out_access_unlock;
2485 	}
2486 
2487 	if ((queue_id & 1) == 0)
2488 		n_pkts_cpl = vhost_poll_enqueue_completed(dev, vq, pkts, count,
2489 			dma_id, vchan_id);
2490 	else
2491 		n_pkts_cpl = async_poll_dequeue_completed(dev, vq, pkts, count,
2492 			dma_id, vchan_id, dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS);
2493 
2494 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2495 	vq->stats.inflight_completed += n_pkts_cpl;
2496 
2497 out_access_unlock:
2498 	rte_rwlock_read_unlock(&vq->access_lock);
2499 
2500 	return n_pkts_cpl;
2501 }
2502 
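/*
 * Common async enqueue entry point: validate the DMA vChannel, take the
 * virtqueue and IOTLB locks and dispatch to the packed or split
 * submission path.
 */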
2503 static __rte_always_inline uint32_t
2504 virtio_dev_rx_async_submit(struct virtio_net *dev, struct vhost_virtqueue *vq,
2505 	struct rte_mbuf **pkts, uint32_t count, int16_t dma_id, uint16_t vchan_id)
2506 {
2507 	uint32_t nb_tx = 0;
2508 
2509 	VHOST_LOG_DATA(dev->ifname, DEBUG, "%s\n", __func__);
2510 
2511 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2512 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2513 		VHOST_LOG_DATA(dev->ifname, ERR,
2514 			"%s: invalid channel %d:%u.\n",
2515 			 __func__, dma_id, vchan_id);
2516 		return 0;
2517 	}
2518 
2519 	rte_rwlock_write_lock(&vq->access_lock);
2520 
2521 	if (unlikely(!vq->enabled || !vq->async))
2522 		goto out_access_unlock;
2523 
2524 	vhost_user_iotlb_rd_lock(vq);
2525 
2526 	if (unlikely(!vq->access_ok))
2527 		if (unlikely(vring_translate(dev, vq) < 0))
2528 			goto out;
2529 
2530 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
2531 	if (count == 0)
2532 		goto out;
2533 
2534 	if (vq_is_packed(dev))
2535 		nb_tx = virtio_dev_rx_async_submit_packed(dev, vq, pkts, count,
2536 			dma_id, vchan_id);
2537 	else
2538 		nb_tx = virtio_dev_rx_async_submit_split(dev, vq, pkts, count,
2539 			dma_id, vchan_id);
2540 
2541 	vq->stats.inflight_submitted += nb_tx;
2542 
2543 out:
2544 	vhost_user_iotlb_rd_unlock(vq);
2545 
2546 out_access_unlock:
2547 	rte_rwlock_write_unlock(&vq->access_lock);
2548 
2549 	return nb_tx;
2550 }
2551 
2552 uint16_t
2553 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
2554 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2555 		uint16_t vchan_id)
2556 {
2557 	struct virtio_net *dev = get_device(vid);
2558 
2559 	if (!dev)
2560 		return 0;
2561 
2562 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
2563 		VHOST_LOG_DATA(dev->ifname, ERR,
2564 			"%s: built-in vhost net backend is disabled.\n",
2565 			__func__);
2566 		return 0;
2567 	}
2568 
2569 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2570 		VHOST_LOG_DATA(dev->ifname, ERR,
2571 			"%s: invalid virtqueue idx %d.\n",
2572 			__func__, queue_id);
2573 		return 0;
2574 	}
2575 
2576 	return virtio_dev_rx_async_submit(dev, dev->virtqueue[queue_id], pkts, count,
2577 		dma_id, vchan_id);
2578 }
2579 
2580 static inline bool
2581 virtio_net_with_host_offload(struct virtio_net *dev)
2582 {
2583 	if (dev->features &
2584 			((1ULL << VIRTIO_NET_F_CSUM) |
2585 			 (1ULL << VIRTIO_NET_F_HOST_ECN) |
2586 			 (1ULL << VIRTIO_NET_F_HOST_TSO4) |
2587 			 (1ULL << VIRTIO_NET_F_HOST_TSO6) |
2588 			 (1ULL << VIRTIO_NET_F_HOST_UFO)))
2589 		return true;
2590 
2591 	return false;
2592 }
2593 
2594 static int
2595 parse_headers(struct rte_mbuf *m, uint8_t *l4_proto)
2596 {
2597 	struct rte_ipv4_hdr *ipv4_hdr;
2598 	struct rte_ipv6_hdr *ipv6_hdr;
2599 	struct rte_ether_hdr *eth_hdr;
2600 	uint16_t ethertype;
2601 	uint16_t data_len = rte_pktmbuf_data_len(m);
2602 
2603 	if (data_len < sizeof(struct rte_ether_hdr))
2604 		return -EINVAL;
2605 
2606 	eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
2607 
2608 	m->l2_len = sizeof(struct rte_ether_hdr);
2609 	ethertype = rte_be_to_cpu_16(eth_hdr->ether_type);
2610 
2611 	if (ethertype == RTE_ETHER_TYPE_VLAN) {
2612 		if (data_len < sizeof(struct rte_ether_hdr) +
2613 				sizeof(struct rte_vlan_hdr))
2614 			goto error;
2615 
2616 		struct rte_vlan_hdr *vlan_hdr =
2617 			(struct rte_vlan_hdr *)(eth_hdr + 1);
2618 
2619 		m->l2_len += sizeof(struct rte_vlan_hdr);
2620 		ethertype = rte_be_to_cpu_16(vlan_hdr->eth_proto);
2621 	}
2622 
2623 	switch (ethertype) {
2624 	case RTE_ETHER_TYPE_IPV4:
2625 		if (data_len < m->l2_len + sizeof(struct rte_ipv4_hdr))
2626 			goto error;
2627 		ipv4_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
2628 				m->l2_len);
2629 		m->l3_len = rte_ipv4_hdr_len(ipv4_hdr);
2630 		if (data_len < m->l2_len + m->l3_len)
2631 			goto error;
2632 		m->ol_flags |= RTE_MBUF_F_TX_IPV4;
2633 		*l4_proto = ipv4_hdr->next_proto_id;
2634 		break;
2635 	case RTE_ETHER_TYPE_IPV6:
2636 		if (data_len < m->l2_len + sizeof(struct rte_ipv6_hdr))
2637 			goto error;
2638 		ipv6_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv6_hdr *,
2639 				m->l2_len);
2640 		m->l3_len = sizeof(struct rte_ipv6_hdr);
2641 		m->ol_flags |= RTE_MBUF_F_TX_IPV6;
2642 		*l4_proto = ipv6_hdr->proto;
2643 		break;
2644 	default:
2645 		/* a valid L3 header is needed for further L4 parsing */
2646 		goto error;
2647 	}
2648 
2649 	/* both CSUM and GSO need a valid L4 header */
2650 	switch (*l4_proto) {
2651 	case IPPROTO_TCP:
2652 		if (data_len < m->l2_len + m->l3_len +
2653 				sizeof(struct rte_tcp_hdr))
2654 			goto error;
2655 		break;
2656 	case IPPROTO_UDP:
2657 		if (data_len < m->l2_len + m->l3_len +
2658 				sizeof(struct rte_udp_hdr))
2659 			goto error;
2660 		break;
2661 	case IPPROTO_SCTP:
2662 		if (data_len < m->l2_len + m->l3_len +
2663 				sizeof(struct rte_sctp_hdr))
2664 			goto error;
2665 		break;
2666 	default:
2667 		goto error;
2668 	}
2669 
2670 	return 0;
2671 
2672 error:
2673 	m->l2_len = 0;
2674 	m->l3_len = 0;
2675 	m->ol_flags = 0;
2676 	return -EINVAL;
2677 }
2678 
2679 static __rte_always_inline void
2680 vhost_dequeue_offload_legacy(struct virtio_net *dev, struct virtio_net_hdr *hdr,
2681 		struct rte_mbuf *m)
2682 {
2683 	uint8_t l4_proto = 0;
2684 	struct rte_tcp_hdr *tcp_hdr = NULL;
2685 	uint16_t tcp_len;
2686 	uint16_t data_len = rte_pktmbuf_data_len(m);
2687 
2688 	if (parse_headers(m, &l4_proto) < 0)
2689 		return;
2690 
2691 	if (hdr->flags == VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2692 		if (hdr->csum_start == (m->l2_len + m->l3_len)) {
2693 			switch (hdr->csum_offset) {
2694 			case (offsetof(struct rte_tcp_hdr, cksum)):
2695 				if (l4_proto != IPPROTO_TCP)
2696 					goto error;
2697 				m->ol_flags |= RTE_MBUF_F_TX_TCP_CKSUM;
2698 				break;
2699 			case (offsetof(struct rte_udp_hdr, dgram_cksum)):
2700 				if (l4_proto != IPPROTO_UDP)
2701 					goto error;
2702 				m->ol_flags |= RTE_MBUF_F_TX_UDP_CKSUM;
2703 				break;
2704 			case (offsetof(struct rte_sctp_hdr, cksum)):
2705 				if (l4_proto != IPPROTO_SCTP)
2706 					goto error;
2707 				m->ol_flags |= RTE_MBUF_F_TX_SCTP_CKSUM;
2708 				break;
2709 			default:
2710 				goto error;
2711 			}
2712 		} else {
2713 			goto error;
2714 		}
2715 	}
2716 
2717 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2718 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2719 		case VIRTIO_NET_HDR_GSO_TCPV4:
2720 		case VIRTIO_NET_HDR_GSO_TCPV6:
2721 			if (l4_proto != IPPROTO_TCP)
2722 				goto error;
2723 			tcp_hdr = rte_pktmbuf_mtod_offset(m,
2724 					struct rte_tcp_hdr *,
2725 					m->l2_len + m->l3_len);
2726 			tcp_len = (tcp_hdr->data_off & 0xf0) >> 2;
2727 			if (data_len < m->l2_len + m->l3_len + tcp_len)
2728 				goto error;
2729 			m->ol_flags |= RTE_MBUF_F_TX_TCP_SEG;
2730 			m->tso_segsz = hdr->gso_size;
2731 			m->l4_len = tcp_len;
2732 			break;
2733 		case VIRTIO_NET_HDR_GSO_UDP:
2734 			if (l4_proto != IPPROTO_UDP)
2735 				goto error;
2736 			m->ol_flags |= RTE_MBUF_F_TX_UDP_SEG;
2737 			m->tso_segsz = hdr->gso_size;
2738 			m->l4_len = sizeof(struct rte_udp_hdr);
2739 			break;
2740 		default:
2741 			VHOST_LOG_DATA(dev->ifname, WARNING,
2742 				"unsupported gso type %u.\n",
2743 				hdr->gso_type);
2744 			goto error;
2745 		}
2746 	}
2747 	return;
2748 
2749 error:
2750 	m->l2_len = 0;
2751 	m->l3_len = 0;
2752 	m->ol_flags = 0;
2753 }
2754 
2755 static __rte_always_inline void
2756 vhost_dequeue_offload(struct virtio_net *dev, struct virtio_net_hdr *hdr,
2757 		struct rte_mbuf *m, bool legacy_ol_flags)
2758 {
2759 	struct rte_net_hdr_lens hdr_lens;
2760 	int l4_supported = 0;
2761 	uint32_t ptype;
2762 
2763 	if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
2764 		return;
2765 
2766 	if (legacy_ol_flags) {
2767 		vhost_dequeue_offload_legacy(dev, hdr, m);
2768 		return;
2769 	}
2770 
2771 	m->ol_flags |= RTE_MBUF_F_RX_IP_CKSUM_UNKNOWN;
2772 
2773 	ptype = rte_net_get_ptype(m, &hdr_lens, RTE_PTYPE_ALL_MASK);
2774 	m->packet_type = ptype;
2775 	if ((ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_TCP ||
2776 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_UDP ||
2777 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_SCTP)
2778 		l4_supported = 1;
2779 
2780 	/* According to Virtio 1.1 spec, the device only needs to look at
2781 	 * VIRTIO_NET_HDR_F_NEEDS_CSUM in the packet transmission path.
2782 	 * This differs from the processing incoming packets path where the
2783 	 * driver could rely on VIRTIO_NET_HDR_F_DATA_VALID flag set by the
2784 	 * device.
2785 	 *
2786 	 * 5.1.6.2.1 Driver Requirements: Packet Transmission
2787 	 * The driver MUST NOT set the VIRTIO_NET_HDR_F_DATA_VALID and
2788 	 * VIRTIO_NET_HDR_F_RSC_INFO bits in flags.
2789 	 *
2790 	 * 5.1.6.2.2 Device Requirements: Packet Transmission
2791 	 * The device MUST ignore flag bits that it does not recognize.
2792 	 */
2793 	if (hdr->flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2794 		uint32_t hdrlen;
2795 
2796 		hdrlen = hdr_lens.l2_len + hdr_lens.l3_len + hdr_lens.l4_len;
2797 		if (hdr->csum_start <= hdrlen && l4_supported != 0) {
2798 			m->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_NONE;
2799 		} else {
2800 			/* Unknown proto or tunnel, do sw cksum. We can assume
2801 			 * the cksum field is in the first segment since the
2802 			 * buffers we provided to the host are large enough.
2803 			 * In case of SCTP, this will be wrong since it's a CRC
2804 			 * but there's nothing we can do.
2805 			 */
2806 			uint16_t csum = 0, off;
2807 
2808 			if (rte_raw_cksum_mbuf(m, hdr->csum_start,
2809 					rte_pktmbuf_pkt_len(m) - hdr->csum_start, &csum) < 0)
2810 				return;
2811 			if (likely(csum != 0xffff))
2812 				csum = ~csum;
2813 			off = hdr->csum_offset + hdr->csum_start;
2814 			if (rte_pktmbuf_data_len(m) >= off + 1)
2815 				*rte_pktmbuf_mtod_offset(m, uint16_t *, off) = csum;
2816 		}
2817 	}
2818 
2819 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2820 		if (hdr->gso_size == 0)
2821 			return;
2822 
2823 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2824 		case VIRTIO_NET_HDR_GSO_TCPV4:
2825 		case VIRTIO_NET_HDR_GSO_TCPV6:
2826 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_TCP)
2827 				break;
2828 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2829 			m->tso_segsz = hdr->gso_size;
2830 			break;
2831 		case VIRTIO_NET_HDR_GSO_UDP:
2832 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_UDP)
2833 				break;
2834 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2835 			m->tso_segsz = hdr->gso_size;
2836 			break;
2837 		default:
2838 			break;
2839 		}
2840 	}
2841 }
2842 
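/*
 * Gather a virtio-net header that spans multiple descriptor buffers into
 * the contiguous 'hdr' copy.
 */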
2843 static __rte_noinline void
2844 copy_vnet_hdr_from_desc(struct virtio_net_hdr *hdr,
2845 		struct buf_vector *buf_vec)
2846 {
2847 	uint64_t len;
2848 	uint64_t remain = sizeof(struct virtio_net_hdr);
2849 	uint64_t src;
2850 	uint64_t dst = (uint64_t)(uintptr_t)hdr;
2851 
2852 	while (remain) {
2853 		len = RTE_MIN(remain, buf_vec->buf_len);
2854 		src = buf_vec->buf_addr;
2855 		rte_memcpy((void *)(uintptr_t)dst,
2856 				(void *)(uintptr_t)src, len);
2857 
2858 		remain -= len;
2859 		dst += len;
2860 		buf_vec++;
2861 	}
2862 }
2863 
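/*
 * Copy one descriptor chain into an mbuf chain (dequeue direction),
 * skipping the virtio-net header. Synchronous copies are performed
 * immediately or batched; asynchronous ones are queued as DMA iovecs and
 * the header is saved in pkts_info[slot_idx] so offloads can be applied
 * once the copy completes.
 */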
2864 static __rte_always_inline int
2865 desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
2866 		  struct buf_vector *buf_vec, uint16_t nr_vec,
2867 		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
2868 		  bool legacy_ol_flags, uint16_t slot_idx, bool is_async)
2869 	__rte_shared_locks_required(&vq->access_lock)
2870 	__rte_shared_locks_required(&vq->iotlb_lock)
2871 {
2872 	uint32_t buf_avail, buf_offset, buf_len;
2873 	uint64_t buf_addr, buf_iova;
2874 	uint32_t mbuf_avail, mbuf_offset;
2875 	uint32_t hdr_remain = dev->vhost_hlen;
2876 	uint32_t cpy_len;
2877 	struct rte_mbuf *cur = m, *prev = m;
2878 	struct virtio_net_hdr tmp_hdr;
2879 	struct virtio_net_hdr *hdr = NULL;
2880 	uint16_t vec_idx;
2881 	struct vhost_async *async = vq->async;
2882 	struct async_inflight_info *pkts_info;
2883 
2884 	/*
2885 	 * The caller has checked that the descriptor chain is larger than the
2886 	 * header size.
2887 	 */
2888 
2889 	if (virtio_net_with_host_offload(dev)) {
2890 		if (unlikely(buf_vec[0].buf_len < sizeof(struct virtio_net_hdr))) {
2891 			/*
2892 			 * No luck, the virtio-net header doesn't fit
2893 			 * in a contiguous virtual area.
2894 			 */
2895 			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
2896 			hdr = &tmp_hdr;
2897 		} else {
2898 			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_vec[0].buf_addr);
2899 		}
2900 	}
2901 
2902 	for (vec_idx = 0; vec_idx < nr_vec; vec_idx++) {
2903 		if (buf_vec[vec_idx].buf_len > hdr_remain)
2904 			break;
2905 
2906 		hdr_remain -= buf_vec[vec_idx].buf_len;
2907 	}
2908 
2909 	buf_addr = buf_vec[vec_idx].buf_addr;
2910 	buf_iova = buf_vec[vec_idx].buf_iova;
2911 	buf_len = buf_vec[vec_idx].buf_len;
2912 	buf_offset = hdr_remain;
2913 	buf_avail = buf_vec[vec_idx].buf_len - hdr_remain;
2914 
2915 	PRINT_PACKET(dev,
2916 			(uintptr_t)(buf_addr + buf_offset),
2917 			(uint32_t)buf_avail, 0);
2918 
2919 	mbuf_offset = 0;
2920 	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
2921 
2922 	if (is_async) {
2923 		pkts_info = async->pkts_info;
2924 		if (async_iter_initialize(dev, async))
2925 			return -1;
2926 	}
2927 
2928 	while (1) {
2929 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
2930 
2931 		if (is_async) {
2932 			if (async_fill_seg(dev, vq, cur, mbuf_offset,
2933 					   buf_iova + buf_offset, cpy_len, false) < 0)
2934 				goto error;
2935 		} else if (likely(hdr && cur == m)) {
2936 			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, mbuf_offset),
2937 				(void *)((uintptr_t)(buf_addr + buf_offset)),
2938 				cpy_len);
2939 		} else {
2940 			sync_fill_seg(dev, vq, cur, mbuf_offset,
2941 				      buf_addr + buf_offset,
2942 				      buf_iova + buf_offset, cpy_len, false);
2943 		}
2944 
2945 		mbuf_avail  -= cpy_len;
2946 		mbuf_offset += cpy_len;
2947 		buf_avail -= cpy_len;
2948 		buf_offset += cpy_len;
2949 
2950 		/* This buf has reached its end, get the next one */
2951 		if (buf_avail == 0) {
2952 			if (++vec_idx >= nr_vec)
2953 				break;
2954 
2955 			buf_addr = buf_vec[vec_idx].buf_addr;
2956 			buf_iova = buf_vec[vec_idx].buf_iova;
2957 			buf_len = buf_vec[vec_idx].buf_len;
2958 
2959 			buf_offset = 0;
2960 			buf_avail  = buf_len;
2961 
2962 			PRINT_PACKET(dev, (uintptr_t)buf_addr,
2963 					(uint32_t)buf_avail, 0);
2964 		}
2965 
2966 		/*
2967 		 * This mbuf has reached its end, get a new one
2968 		 * to hold more data.
2969 		 */
2970 		if (mbuf_avail == 0) {
2971 			cur = rte_pktmbuf_alloc(mbuf_pool);
2972 			if (unlikely(cur == NULL)) {
2973 				VHOST_LOG_DATA(dev->ifname, ERR,
2974 					"failed to allocate memory for mbuf.\n");
2975 				goto error;
2976 			}
2977 
2978 			prev->next = cur;
2979 			prev->data_len = mbuf_offset;
2980 			m->nb_segs += 1;
2981 			m->pkt_len += mbuf_offset;
2982 			prev = cur;
2983 
2984 			mbuf_offset = 0;
2985 			mbuf_avail  = cur->buf_len - RTE_PKTMBUF_HEADROOM;
2986 		}
2987 	}
2988 
2989 	prev->data_len = mbuf_offset;
2990 	m->pkt_len    += mbuf_offset;
2991 
2992 	if (is_async) {
2993 		async_iter_finalize(async);
2994 		if (hdr)
2995 			pkts_info[slot_idx].nethdr = *hdr;
2996 	} else if (hdr) {
2997 		vhost_dequeue_offload(dev, hdr, m, legacy_ol_flags);
2998 	}
2999 
3000 	return 0;
3001 error:
3002 	if (is_async)
3003 		async_iter_cancel(async);
3004 
3005 	return -1;
3006 }
3007 
3008 static void
3009 virtio_dev_extbuf_free(void *addr __rte_unused, void *opaque)
3010 {
3011 	rte_free(opaque);
3012 }
3013 
3014 static int
3015 virtio_dev_extbuf_alloc(struct virtio_net *dev, struct rte_mbuf *pkt, uint32_t size)
3016 {
3017 	struct rte_mbuf_ext_shared_info *shinfo = NULL;
3018 	uint32_t total_len = RTE_PKTMBUF_HEADROOM + size;
3019 	uint16_t buf_len;
3020 	rte_iova_t iova;
3021 	void *buf;
3022 
3023 	total_len += sizeof(*shinfo) + sizeof(uintptr_t);
3024 	total_len = RTE_ALIGN_CEIL(total_len, sizeof(uintptr_t));
3025 
3026 	if (unlikely(total_len > UINT16_MAX))
3027 		return -ENOSPC;
3028 
3029 	buf_len = total_len;
3030 	buf = rte_malloc(NULL, buf_len, RTE_CACHE_LINE_SIZE);
3031 	if (unlikely(buf == NULL))
3032 		return -ENOMEM;
3033 
3034 	/* Initialize shinfo */
3035 	shinfo = rte_pktmbuf_ext_shinfo_init_helper(buf, &buf_len,
3036 						virtio_dev_extbuf_free, buf);
3037 	if (unlikely(shinfo == NULL)) {
3038 		rte_free(buf);
3039 		VHOST_LOG_DATA(dev->ifname, ERR, "failed to init shinfo\n");
3040 		return -1;
3041 	}
3042 
3043 	iova = rte_malloc_virt2iova(buf);
3044 	rte_pktmbuf_attach_extbuf(pkt, buf, iova, buf_len, shinfo);
3045 	rte_pktmbuf_reset_headroom(pkt);
3046 
3047 	return 0;
3048 }
3049 
3050 /*
3051  * Prepare a pktmbuf capable of holding a packet of the given size.
3052  */
3053 static __rte_always_inline int
3054 virtio_dev_pktmbuf_prep(struct virtio_net *dev, struct rte_mbuf *pkt,
3055 			 uint32_t data_len)
3056 {
3057 	if (rte_pktmbuf_tailroom(pkt) >= data_len)
3058 		return 0;
3059 
3060 	/* attach an external buffer if supported */
3061 	if (dev->extbuf && !virtio_dev_extbuf_alloc(dev, pkt, data_len))
3062 		return 0;
3063 
3064 	/* check if chained buffers are allowed */
3065 	if (!dev->linearbuf)
3066 		return 0;
3067 
3068 	return -1;
3069 }
3070 
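/*
 * Split-ring dequeue: pull up to 'count' descriptor chains into
 * preallocated mbufs, dropping chains that cannot be copied, then flush
 * the shadow used ring and kick the guest.
 */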
3071 __rte_always_inline
3072 static uint16_t
3073 virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
3074 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
3075 	bool legacy_ol_flags)
3076 	__rte_shared_locks_required(&vq->access_lock)
3077 	__rte_shared_locks_required(&vq->iotlb_lock)
3078 {
3079 	uint16_t i;
3080 	uint16_t avail_entries;
3081 	uint16_t dropped = 0;
3082 	static bool allocerr_warned;
3083 
3084 	/*
3085 	 * The ordering between avail index and
3086 	 * desc reads needs to be enforced.
3087 	 */
3088 	avail_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) -
3089 			vq->last_avail_idx;
3090 	if (avail_entries == 0)
3091 		return 0;
3092 
3093 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
3094 
3095 	VHOST_LOG_DATA(dev->ifname, DEBUG, "%s\n", __func__);
3096 
3097 	count = RTE_MIN(count, MAX_PKT_BURST);
3098 	count = RTE_MIN(count, avail_entries);
3099 	VHOST_LOG_DATA(dev->ifname, DEBUG, "about to dequeue %u buffers\n", count);
3100 
3101 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count))
3102 		return 0;
3103 
3104 	for (i = 0; i < count; i++) {
3105 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
3106 		uint16_t head_idx;
3107 		uint32_t buf_len;
3108 		uint16_t nr_vec = 0;
3109 		int err;
3110 
3111 		if (unlikely(fill_vec_buf_split(dev, vq,
3112 						vq->last_avail_idx + i,
3113 						&nr_vec, buf_vec,
3114 						&head_idx, &buf_len,
3115 						VHOST_ACCESS_RO) < 0))
3116 			break;
3117 
3118 		update_shadow_used_ring_split(vq, head_idx, 0);
3119 
3120 		if (unlikely(buf_len <= dev->vhost_hlen)) {
3121 			dropped += 1;
3122 			i++;
3123 			break;
3124 		}
3125 
3126 		buf_len -= dev->vhost_hlen;
3127 
3128 		err = virtio_dev_pktmbuf_prep(dev, pkts[i], buf_len);
3129 		if (unlikely(err)) {
3130 			/*
3131 			 * mbuf allocation fails for jumbo packets when external
3132 			 * buffer allocation is not allowed and a linear buffer
3133 			 * is required. Drop this packet.
3134 			 */
3135 			if (!allocerr_warned) {
3136 				VHOST_LOG_DATA(dev->ifname, ERR,
3137 					"failed mbuf alloc of size %d from %s.\n",
3138 					buf_len, mbuf_pool->name);
3139 				allocerr_warned = true;
3140 			}
3141 			dropped += 1;
3142 			i++;
3143 			break;
3144 		}
3145 
3146 		err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
3147 				   mbuf_pool, legacy_ol_flags, 0, false);
3148 		if (unlikely(err)) {
3149 			if (!allocerr_warned) {
3150 				VHOST_LOG_DATA(dev->ifname, ERR, "failed to copy desc to mbuf.\n");
3151 				allocerr_warned = true;
3152 			}
3153 			dropped += 1;
3154 			i++;
3155 			break;
3156 		}
3157 
3158 	}
3159 
3160 	if (dropped)
3161 		rte_pktmbuf_free_bulk(&pkts[i - 1], count - i + 1);
3162 
3163 	vq->last_avail_idx += i;
3164 
3165 	do_data_copy_dequeue(vq);
3166 	if (unlikely(i < count))
3167 		vq->shadow_used_idx = i;
3168 	if (likely(vq->shadow_used_idx)) {
3169 		flush_shadow_used_ring_split(dev, vq);
3170 		vhost_vring_call_split(dev, vq);
3171 	}
3172 
3173 	return (i - dropped);
3174 }
3175 
3176 __rte_noinline
3177 static uint16_t
3178 virtio_dev_tx_split_legacy(struct virtio_net *dev,
3179 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3180 	struct rte_mbuf **pkts, uint16_t count)
3181 	__rte_shared_locks_required(&vq->access_lock)
3182 	__rte_shared_locks_required(&vq->iotlb_lock)
3183 {
3184 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, true);
3185 }
3186 
3187 __rte_noinline
3188 static uint16_t
3189 virtio_dev_tx_split_compliant(struct virtio_net *dev,
3190 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3191 	struct rte_mbuf **pkts, uint16_t count)
3192 	__rte_shared_locks_required(&vq->access_lock)
3193 	__rte_shared_locks_required(&vq->iotlb_lock)
3194 {
3195 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, false);
3196 }
3197 
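/*
 * Check that the next PACKED_BATCH_SIZE descriptors can be dequeued as a
 * batch: all must be available single descriptors that translate to valid
 * host addresses and fit in the provided mbufs. On success the descriptor
 * addresses and ids are returned and the mbuf lengths are initialized.
 */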
3198 static __rte_always_inline int
3199 vhost_reserve_avail_batch_packed(struct virtio_net *dev,
3200 				 struct vhost_virtqueue *vq,
3201 				 struct rte_mbuf **pkts,
3202 				 uint16_t avail_idx,
3203 				 uintptr_t *desc_addrs,
3204 				 uint16_t *ids)
3205 	__rte_shared_locks_required(&vq->iotlb_lock)
3206 {
3207 	bool wrap = vq->avail_wrap_counter;
3208 	struct vring_packed_desc *descs = vq->desc_packed;
3209 	uint64_t lens[PACKED_BATCH_SIZE];
3210 	uint64_t buf_lens[PACKED_BATCH_SIZE];
3211 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3212 	uint16_t flags, i;
3213 
3214 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
3215 		return -1;
3216 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
3217 		return -1;
3218 
3219 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3220 		flags = descs[avail_idx + i].flags;
3221 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
3222 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
3223 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
3224 			return -1;
3225 	}
3226 
3227 	rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
3228 
3229 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3230 		lens[i] = descs[avail_idx + i].len;
3231 
3232 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3233 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
3234 						  descs[avail_idx + i].addr,
3235 						  &lens[i], VHOST_ACCESS_RW);
3236 	}
3237 
3238 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3239 		if (unlikely(!desc_addrs[i]))
3240 			return -1;
3241 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
3242 			return -1;
3243 	}
3244 
3245 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3246 		if (virtio_dev_pktmbuf_prep(dev, pkts[i], lens[i]))
3247 			goto err;
3248 	}
3249 
3250 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3251 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
3252 
3253 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3254 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
3255 			goto err;
3256 	}
3257 
3258 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3259 		pkts[i]->pkt_len = lens[i] - buf_offset;
3260 		pkts[i]->data_len = pkts[i]->pkt_len;
3261 		ids[i] = descs[avail_idx + i].id;
3262 	}
3263 
3264 	return 0;
3265 
3266 err:
3267 	return -1;
3268 }
3269 
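/*
 * Batch eligibility check for the async dequeue path: same constraints as
 * the sync batch check, but the descriptor guest addresses are kept as-is
 * for DMA and the vChannel must have capacity for a full batch of copies.
 */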
3270 static __rte_always_inline int
3271 vhost_async_tx_batch_packed_check(struct virtio_net *dev,
3272 				 struct vhost_virtqueue *vq,
3273 				 struct rte_mbuf **pkts,
3274 				 uint16_t avail_idx,
3275 				 uintptr_t *desc_addrs,
3276 				 uint64_t *lens,
3277 				 uint16_t *ids,
3278 				 int16_t dma_id,
3279 				 uint16_t vchan_id)
3280 {
3281 	bool wrap = vq->avail_wrap_counter;
3282 	struct vring_packed_desc *descs = vq->desc_packed;
3283 	uint64_t buf_lens[PACKED_BATCH_SIZE];
3284 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3285 	uint16_t flags, i;
3286 
3287 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
3288 		return -1;
3289 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
3290 		return -1;
3291 
3292 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3293 		flags = descs[avail_idx + i].flags;
3294 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
3295 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
3296 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
3297 			return -1;
3298 	}
3299 
3300 	rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
3301 
3302 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3303 		lens[i] = descs[avail_idx + i].len;
3304 
3305 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3306 		desc_addrs[i] = descs[avail_idx + i].addr;
3307 	}
3308 
3309 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3310 		if (unlikely(!desc_addrs[i]))
3311 			return -1;
3312 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
3313 			return -1;
3314 	}
3315 
3316 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3317 		if (virtio_dev_pktmbuf_prep(dev, pkts[i], lens[i]))
3318 			goto err;
3319 	}
3320 
3321 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3322 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
3323 
3324 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3325 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
3326 			goto err;
3327 	}
3328 
3329 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3330 		pkts[i]->pkt_len = lens[i] - buf_offset;
3331 		pkts[i]->data_len = pkts[i]->pkt_len;
3332 		ids[i] = descs[avail_idx + i].id;
3333 	}
3334 
3335 	if (rte_dma_burst_capacity(dma_id, vchan_id) < PACKED_BATCH_SIZE)
3336 		return -1;
3337 
3338 	return 0;
3339 
3340 err:
3341 	return -1;
3342 }
3343 
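/*
 * Dequeue a full batch of single-descriptor packets with CPU copies,
 * apply Rx offloads from the in-guest headers and shadow the used
 * descriptors for a later flush.
 */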
3344 static __rte_always_inline int
3345 virtio_dev_tx_batch_packed(struct virtio_net *dev,
3346 			   struct vhost_virtqueue *vq,
3347 			   struct rte_mbuf **pkts,
3348 			   bool legacy_ol_flags)
3349 	__rte_shared_locks_required(&vq->iotlb_lock)
3350 {
3351 	uint16_t avail_idx = vq->last_avail_idx;
3352 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3353 	struct virtio_net_hdr *hdr;
3354 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
3355 	uint16_t ids[PACKED_BATCH_SIZE];
3356 	uint16_t i;
3357 
3358 	if (vhost_reserve_avail_batch_packed(dev, vq, pkts, avail_idx,
3359 					     desc_addrs, ids))
3360 		return -1;
3361 
3362 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3363 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
3364 
3365 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3366 		rte_memcpy(rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
3367 			   (void *)(uintptr_t)(desc_addrs[i] + buf_offset),
3368 			   pkts[i]->pkt_len);
3369 
3370 	if (virtio_net_with_host_offload(dev)) {
3371 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3372 			hdr = (struct virtio_net_hdr *)(desc_addrs[i]);
3373 			vhost_dequeue_offload(dev, hdr, pkts[i], legacy_ol_flags);
3374 		}
3375 	}
3376 
3377 	if (virtio_net_is_inorder(dev))
3378 		vhost_shadow_dequeue_batch_packed_inorder(vq,
3379 			ids[PACKED_BATCH_SIZE - 1]);
3380 	else
3381 		vhost_shadow_dequeue_batch_packed(dev, vq, ids);
3382 
3383 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
3384 
3385 	return 0;
3386 }
3387 
3388 static __rte_always_inline int
3389 vhost_dequeue_single_packed(struct virtio_net *dev,
3390 			    struct vhost_virtqueue *vq,
3391 			    struct rte_mempool *mbuf_pool,
3392 			    struct rte_mbuf *pkts,
3393 			    uint16_t *buf_id,
3394 			    uint16_t *desc_count,
3395 			    bool legacy_ol_flags)
3396 	__rte_shared_locks_required(&vq->access_lock)
3397 	__rte_shared_locks_required(&vq->iotlb_lock)
3398 {
3399 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
3400 	uint32_t buf_len;
3401 	uint16_t nr_vec = 0;
3402 	int err;
3403 	static bool allocerr_warned;
3404 
3405 	if (unlikely(fill_vec_buf_packed(dev, vq,
3406 					 vq->last_avail_idx, desc_count,
3407 					 buf_vec, &nr_vec,
3408 					 buf_id, &buf_len,
3409 					 VHOST_ACCESS_RO) < 0))
3410 		return -1;
3411 
3412 	if (unlikely(buf_len <= dev->vhost_hlen))
3413 		return -1;
3414 
3415 	buf_len -= dev->vhost_hlen;
3416 
3417 	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
3418 		if (!allocerr_warned) {
3419 			VHOST_LOG_DATA(dev->ifname, ERR,
3420 				"failed mbuf alloc of size %d from %s.\n",
3421 				buf_len, mbuf_pool->name);
3422 			allocerr_warned = true;
3423 		}
3424 		return -1;
3425 	}
3426 
3427 	err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts,
3428 			   mbuf_pool, legacy_ol_flags, 0, false);
3429 	if (unlikely(err)) {
3430 		if (!allocerr_warned) {
3431 			VHOST_LOG_DATA(dev->ifname, ERR, "failed to copy desc to mbuf.\n");
3432 			allocerr_warned = true;
3433 		}
3434 		return -1;
3435 	}
3436 
3437 	return 0;
3438 }
3439 
3440 static __rte_always_inline int
3441 virtio_dev_tx_single_packed(struct virtio_net *dev,
3442 			    struct vhost_virtqueue *vq,
3443 			    struct rte_mempool *mbuf_pool,
3444 			    struct rte_mbuf *pkts,
3445 			    bool legacy_ol_flags)
3446 	__rte_shared_locks_required(&vq->access_lock)
3447 	__rte_shared_locks_required(&vq->iotlb_lock)
3448 {
3449 
3450 	uint16_t buf_id, desc_count = 0;
3451 	int ret;
3452 
3453 	ret = vhost_dequeue_single_packed(dev, vq, mbuf_pool, pkts, &buf_id,
3454 					&desc_count, legacy_ol_flags);
3455 
3456 	if (likely(desc_count > 0)) {
3457 		if (virtio_net_is_inorder(dev))
3458 			vhost_shadow_dequeue_single_packed_inorder(vq, buf_id,
3459 								   desc_count);
3460 		else
3461 			vhost_shadow_dequeue_single_packed(vq, buf_id,
3462 					desc_count);
3463 
3464 		vq_inc_last_avail_packed(vq, desc_count);
3465 	}
3466 
3467 	return ret;
3468 }
3469 
3470 __rte_always_inline
3471 static uint16_t
3472 virtio_dev_tx_packed(struct virtio_net *dev,
3473 		     struct vhost_virtqueue *__rte_restrict vq,
3474 		     struct rte_mempool *mbuf_pool,
3475 		     struct rte_mbuf **__rte_restrict pkts,
3476 		     uint32_t count,
3477 		     bool legacy_ol_flags)
3478 	__rte_shared_locks_required(&vq->access_lock)
3479 	__rte_shared_locks_required(&vq->iotlb_lock)
3480 {
3481 	uint32_t pkt_idx = 0;
3482 
3483 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count))
3484 		return 0;
3485 
3486 	do {
3487 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
3488 
3489 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
3490 			if (!virtio_dev_tx_batch_packed(dev, vq,
3491 							&pkts[pkt_idx],
3492 							legacy_ol_flags)) {
3493 				pkt_idx += PACKED_BATCH_SIZE;
3494 				continue;
3495 			}
3496 		}
3497 
3498 		if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
3499 						pkts[pkt_idx],
3500 						legacy_ol_flags))
3501 			break;
3502 		pkt_idx++;
3503 	} while (pkt_idx < count);
3504 
3505 	if (pkt_idx != count)
3506 		rte_pktmbuf_free_bulk(&pkts[pkt_idx], count - pkt_idx);
3507 
3508 	if (vq->shadow_used_idx) {
3509 		do_data_copy_dequeue(vq);
3510 
3511 		vhost_flush_dequeue_shadow_packed(dev, vq);
3512 		vhost_vring_call_packed(dev, vq);
3513 	}
3514 
3515 	return pkt_idx;
3516 }
3517 
3518 __rte_noinline
3519 static uint16_t
3520 virtio_dev_tx_packed_legacy(struct virtio_net *dev,
3521 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
3522 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
3523 	__rte_shared_locks_required(&vq->access_lock)
3524 	__rte_shared_locks_required(&vq->iotlb_lock)
3525 {
3526 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, true);
3527 }
3528 
3529 __rte_noinline
3530 static uint16_t
3531 virtio_dev_tx_packed_compliant(struct virtio_net *dev,
3532 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
3533 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
3534 	__rte_shared_locks_required(&vq->access_lock)
3535 	__rte_shared_locks_required(&vq->iotlb_lock)
3536 {
3537 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, false);
3538 }
3539 
3540 uint16_t
3541 rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
3542 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
3543 {
3544 	struct virtio_net *dev;
3545 	struct rte_mbuf *rarp_mbuf = NULL;
3546 	struct vhost_virtqueue *vq;
3547 	int16_t success = 1;
3548 
3549 	dev = get_device(vid);
3550 	if (!dev)
3551 		return 0;
3552 
3553 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
3554 		VHOST_LOG_DATA(dev->ifname, ERR,
3555 			"%s: built-in vhost net backend is disabled.\n",
3556 			__func__);
3557 		return 0;
3558 	}
3559 
3560 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
3561 		VHOST_LOG_DATA(dev->ifname, ERR,
3562 			"%s: invalid virtqueue idx %d.\n",
3563 			__func__, queue_id);
3564 		return 0;
3565 	}
3566 
3567 	vq = dev->virtqueue[queue_id];
3568 
3569 	if (unlikely(rte_rwlock_read_trylock(&vq->access_lock) != 0))
3570 		return 0;
3571 
3572 	if (unlikely(!vq->enabled)) {
3573 		count = 0;
3574 		goto out_access_unlock;
3575 	}
3576 
3577 	vhost_user_iotlb_rd_lock(vq);
3578 
3579 	if (unlikely(!vq->access_ok))
3580 		if (unlikely(vring_translate(dev, vq) < 0)) {
3581 			count = 0;
3582 			goto out;
3583 		}
3584 
3585 	/*
3586 	 * Construct a RARP broadcast packet and inject it into the "pkts"
3587 	 * array, so it looks like the guest actually sent such a packet.
3588 	 *
3589 	 * Check user_send_rarp() for more information.
3590 	 *
3591 	 * broadcast_rarp shares a cacheline in the virtio_net structure
3592 	 * with some fields that are accessed during enqueue and
3593 	 * __atomic_compare_exchange_n causes a write when it performs the compare
3594 	 * and exchange. This could result in false sharing between enqueue
3595 	 * and dequeue.
3596 	 *
3597 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
3598 	 * and only performing compare and exchange if the read indicates it
3599 	 * is likely to be set.
3600 	 */
3601 	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
3602 			__atomic_compare_exchange_n(&dev->broadcast_rarp,
3603 			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
3604 
3605 		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
3606 		if (rarp_mbuf == NULL) {
3607 			VHOST_LOG_DATA(dev->ifname, ERR, "failed to make RARP packet.\n");
3608 			count = 0;
3609 			goto out;
3610 		}
3611 		/*
3612 		 * Inject it at the head of the "pkts" array, so that the switch's MAC
3613 		 * learning table gets updated first.
3614 		 */
3615 		pkts[0] = rarp_mbuf;
3616 		vhost_queue_stats_update(dev, vq, pkts, 1);
3617 		pkts++;
3618 		count -= 1;
3619 	}
3620 
3621 	if (vq_is_packed(dev)) {
3622 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
3623 			count = virtio_dev_tx_packed_legacy(dev, vq, mbuf_pool, pkts, count);
3624 		else
3625 			count = virtio_dev_tx_packed_compliant(dev, vq, mbuf_pool, pkts, count);
3626 	} else {
3627 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
3628 			count = virtio_dev_tx_split_legacy(dev, vq, mbuf_pool, pkts, count);
3629 		else
3630 			count = virtio_dev_tx_split_compliant(dev, vq, mbuf_pool, pkts, count);
3631 	}
3632 
3633 	vhost_queue_stats_update(dev, vq, pkts, count);
3634 
3635 out:
3636 	vhost_user_iotlb_rd_unlock(vq);
3637 
3638 out_access_unlock:
3639 	rte_rwlock_read_unlock(&vq->access_lock);
3640 
3641 	if (unlikely(rarp_mbuf != NULL))
3642 		count += 1;
3643 
3644 	return count;
3645 }
3646 
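/*
 * Dequeue-side completion polling: harvest finished DMA copies, apply the
 * saved virtio-net header offloads to the completed mbufs and write the
 * freed descriptors back to the used ring.
 */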
3647 static __rte_always_inline uint16_t
3648 async_poll_dequeue_completed(struct virtio_net *dev, struct vhost_virtqueue *vq,
3649 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
3650 		uint16_t vchan_id, bool legacy_ol_flags)
3651 	__rte_shared_locks_required(&vq->access_lock)
3652 {
3653 	uint16_t start_idx, from, i;
3654 	uint16_t nr_cpl_pkts = 0;
3655 	struct async_inflight_info *pkts_info = vq->async->pkts_info;
3656 
3657 	vhost_async_dma_check_completed(dev, dma_id, vchan_id, VHOST_DMA_MAX_COPY_COMPLETE);
3658 
3659 	start_idx = async_get_first_inflight_pkt_idx(vq);
3660 
3661 	from = start_idx;
3662 	while (vq->async->pkts_cmpl_flag[from] && count--) {
3663 		vq->async->pkts_cmpl_flag[from] = false;
3664 		from = (from + 1) % vq->size;
3665 		nr_cpl_pkts++;
3666 	}
3667 
3668 	if (nr_cpl_pkts == 0)
3669 		return 0;
3670 
3671 	for (i = 0; i < nr_cpl_pkts; i++) {
3672 		from = (start_idx + i) % vq->size;
3673 		pkts[i] = pkts_info[from].mbuf;
3674 
3675 		if (virtio_net_with_host_offload(dev))
3676 			vhost_dequeue_offload(dev, &pkts_info[from].nethdr, pkts[i],
3677 					      legacy_ol_flags);
3678 	}
3679 
3680 	/* write back completed descs to used ring and update used idx */
3681 	if (vq_is_packed(dev)) {
3682 		write_back_completed_descs_packed(vq, nr_cpl_pkts);
3683 		vhost_vring_call_packed(dev, vq);
3684 	} else {
3685 		write_back_completed_descs_split(vq, nr_cpl_pkts);
3686 		__atomic_fetch_add(&vq->used->idx, nr_cpl_pkts, __ATOMIC_RELEASE);
3687 		vhost_vring_call_split(dev, vq);
3688 	}
3689 	vq->async->pkts_inflight_n -= nr_cpl_pkts;
3690 
3691 	return nr_cpl_pkts;
3692 }
3693 
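/*
 * Async dequeue for a split virtqueue: map each available descriptor chain to
 * a pre-allocated mbuf, enqueue the copies to the DMA channel, and return the
 * packets whose copies have already completed.
 */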
3694 static __rte_always_inline uint16_t
3695 virtio_dev_tx_async_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
3696 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
3697 		int16_t dma_id, uint16_t vchan_id, bool legacy_ol_flags)
3698 	__rte_shared_locks_required(&vq->access_lock)
3699 	__rte_shared_locks_required(&vq->iotlb_lock)
3700 {
3701 	static bool allocerr_warned;
3702 	bool dropped = false;
3703 	uint16_t avail_entries;
3704 	uint16_t pkt_idx, slot_idx = 0;
3705 	uint16_t nr_done_pkts = 0;
3706 	uint16_t pkt_err = 0;
3707 	uint16_t n_xfer;
3708 	struct vhost_async *async = vq->async;
3709 	struct async_inflight_info *pkts_info = async->pkts_info;
3710 	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
3711 	uint16_t pkts_size = count;
3712 
3713 	/**
3714 	 * The ordering between avail index and
3715 	 * desc reads needs to be enforced.
3716 	 */
3717 	avail_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) -
3718 			vq->last_avail_idx;
3719 	if (avail_entries == 0)
3720 		goto out;
3721 
3722 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
3723 
3724 	async_iter_reset(async);
3725 
3726 	count = RTE_MIN(count, MAX_PKT_BURST);
3727 	count = RTE_MIN(count, avail_entries);
3728 	VHOST_LOG_DATA(dev->ifname, DEBUG, "about to dequeue %u buffers\n", count);
3729 
3730 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count))
3731 		goto out;
3732 
3733 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
3734 		uint16_t head_idx = 0;
3735 		uint16_t nr_vec = 0;
3736 		uint16_t to;
3737 		uint32_t buf_len;
3738 		int err;
3739 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
3740 		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
3741 
3742 		if (unlikely(fill_vec_buf_split(dev, vq, vq->last_avail_idx,
3743 						&nr_vec, buf_vec,
3744 						&head_idx, &buf_len,
3745 						VHOST_ACCESS_RO) < 0)) {
3746 			dropped = true;
3747 			break;
3748 		}
3749 
3750 		if (unlikely(buf_len <= dev->vhost_hlen)) {
3751 			dropped = true;
3752 			break;
3753 		}
3754 
3755 		buf_len -= dev->vhost_hlen;
3756 
3757 		err = virtio_dev_pktmbuf_prep(dev, pkt, buf_len);
3758 		if (unlikely(err)) {
3759 			/**
3760 			 * mbuf allocation fails for jumbo packets when external
3761 			 * buffer allocation is not allowed and linear buffer
3762 			 * is required. Drop this packet.
3763 			 */
3764 			if (!allocerr_warned) {
3765 				VHOST_LOG_DATA(dev->ifname, ERR,
3766 					"%s: Failed mbuf alloc of size %d from %s\n",
3767 					__func__, buf_len, mbuf_pool->name);
3768 				allocerr_warned = true;
3769 			}
3770 			dropped = true;
3771 			slot_idx--;
3772 			break;
3773 		}
3774 
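		/*
		 * Split ring sizes are powers of two, so masking with
		 * (vq->size - 1) wraps the in-flight slot index.
		 */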
3775 		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
3776 		err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkt, mbuf_pool,
3777 					legacy_ol_flags, slot_idx, true);
3778 		if (unlikely(err)) {
3779 			if (!allocerr_warned) {
3780 				VHOST_LOG_DATA(dev->ifname, ERR,
3781 					"%s: Failed to offload copies to async channel.\n",
3782 					__func__);
3783 				allocerr_warned = true;
3784 			}
3785 			dropped = true;
3786 			slot_idx--;
3787 			break;
3788 		}
3789 
3790 		pkts_info[slot_idx].mbuf = pkt;
3791 
3792 		/* store used descs */
3793 		to = async->desc_idx_split & (vq->size - 1);
3794 		async->descs_split[to].id = head_idx;
3795 		async->descs_split[to].len = 0;
3796 		async->desc_idx_split++;
3797 
3798 		vq->last_avail_idx++;
3799 	}
3800 
3801 	if (unlikely(dropped))
3802 		rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
3803 
3804 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
3805 					  async->iov_iter, pkt_idx);
3806 
3807 	async->pkts_inflight_n += n_xfer;
3808 
3809 	pkt_err = pkt_idx - n_xfer;
3810 	if (unlikely(pkt_err)) {
3811 		VHOST_LOG_DATA(dev->ifname, DEBUG, "%s: failed to transfer data.\n",
3812 			__func__);
3813 
3814 		pkt_idx = n_xfer;
3815 		/* recover available ring */
3816 		vq->last_avail_idx -= pkt_err;
3817 
3818 		/**
3819 		 * Recover async-channel copy related structures and free the
3820 		 * mbufs of the failed packets.
3821 		 */
3822 		async->desc_idx_split -= pkt_err;
3823 		while (pkt_err-- > 0) {
3824 			rte_pktmbuf_free(pkts_info[slot_idx & (vq->size - 1)].mbuf);
3825 			slot_idx--;
3826 		}
3827 	}
3828 
3829 	async->pkts_idx += pkt_idx;
3830 	if (async->pkts_idx >= vq->size)
3831 		async->pkts_idx -= vq->size;
3832 
3833 out:
3834 	/* The DMA device may serve other queues, so unconditionally check for completed copies. */
3835 	nr_done_pkts = async_poll_dequeue_completed(dev, vq, pkts, pkts_size,
3836 							dma_id, vchan_id, legacy_ol_flags);
3837 
3838 	return nr_done_pkts;
3839 }
3840 
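/*
 * The __rte_noinline wrappers below pass legacy_ol_flags as a compile-time
 * constant so that the always-inlined virtio_dev_tx_async_split() is
 * specialized into separate legacy and compliant offload-flag variants.
 */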
3841 __rte_noinline
3842 static uint16_t
3843 virtio_dev_tx_async_split_legacy(struct virtio_net *dev,
3844 		struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3845 		struct rte_mbuf **pkts, uint16_t count,
3846 		int16_t dma_id, uint16_t vchan_id)
3847 	__rte_shared_locks_required(&vq->access_lock)
3848 	__rte_shared_locks_required(&vq->iotlb_lock)
3849 {
3850 	return virtio_dev_tx_async_split(dev, vq, mbuf_pool,
3851 				pkts, count, dma_id, vchan_id, true);
3852 }
3853 
3854 __rte_noinline
3855 static uint16_t
3856 virtio_dev_tx_async_split_compliant(struct virtio_net *dev,
3857 		struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3858 		struct rte_mbuf **pkts, uint16_t count,
3859 		int16_t dma_id, uint16_t vchan_id)
3860 	__rte_shared_locks_required(&vq->access_lock)
3861 	__rte_shared_locks_required(&vq->iotlb_lock)
3862 {
3863 	return virtio_dev_tx_async_split(dev, vq, mbuf_pool,
3864 				pkts, count, dma_id, vchan_id, false);
3865 }
3866 
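/*
 * Record a dequeued descriptor in the async shadow ring; it is written back
 * to the used ring once its DMA copy completes.
 */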
3867 static __rte_always_inline void
3868 vhost_async_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
3869 				uint16_t buf_id, uint16_t count)
3870 	__rte_shared_locks_required(&vq->access_lock)
3871 {
3872 	struct vhost_async *async = vq->async;
3873 	uint16_t idx = async->buffer_idx_packed;
3874 
3875 	async->buffers_packed[idx].id = buf_id;
3876 	async->buffers_packed[idx].len = 0;
3877 	async->buffers_packed[idx].count = count;
3878 
3879 	async->buffer_idx_packed++;
3880 	if (async->buffer_idx_packed >= vq->size)
3881 		async->buffer_idx_packed -= vq->size;
3882 
3883 }
3884 
3885 static __rte_always_inline int
3886 virtio_dev_tx_async_single_packed(struct virtio_net *dev,
3887 			struct vhost_virtqueue *vq,
3888 			struct rte_mempool *mbuf_pool,
3889 			struct rte_mbuf *pkts,
3890 			uint16_t slot_idx,
3891 			bool legacy_ol_flags)
3892 	__rte_shared_locks_required(&vq->access_lock)
3893 	__rte_shared_locks_required(&vq->iotlb_lock)
3894 {
3895 	int err;
3896 	uint16_t buf_id, desc_count = 0;
3897 	uint16_t nr_vec = 0;
3898 	uint32_t buf_len;
3899 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
3900 	struct vhost_async *async = vq->async;
3901 	struct async_inflight_info *pkts_info = async->pkts_info;
3902 	static bool allocerr_warned;
3903 
3904 	if (unlikely(fill_vec_buf_packed(dev, vq, vq->last_avail_idx, &desc_count,
3905 					 buf_vec, &nr_vec, &buf_id, &buf_len,
3906 					 VHOST_ACCESS_RO) < 0))
3907 		return -1;
3908 
3909 	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
3910 		if (!allocerr_warned) {
3911 			VHOST_LOG_DATA(dev->ifname, ERR, "Failed mbuf alloc of size %d from %s.\n",
3912 				buf_len, mbuf_pool->name);
3913 
3914 			allocerr_warned = true;
3915 		}
3916 		return -1;
3917 	}
3918 
3919 	err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts, mbuf_pool,
3920 		legacy_ol_flags, slot_idx, true);
3921 	if (unlikely(err)) {
3922 		rte_pktmbuf_free(pkts);
3923 		if (!allocerr_warned) {
3924 			VHOST_LOG_DATA(dev->ifname, ERR, "Failed to copy desc to mbuf.\n");
3925 			allocerr_warned = true;
3926 		}
3927 		return -1;
3928 	}
3929 
3930 	pkts_info[slot_idx].descs = desc_count;
3931 
3932 	/* update async shadow packed ring */
3933 	vhost_async_shadow_dequeue_single_packed(vq, buf_id, desc_count);
3934 
3935 	vq_inc_last_avail_packed(vq, desc_count);
3936 
3937 	return err;
3938 }
3939 
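/*
 * Batched async dequeue for a packed virtqueue: validate PACKED_BATCH_SIZE
 * descriptors at once, translate each guest buffer to a host IOVA and set up
 * one DMA copy iovec per packet.
 */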
3940 static __rte_always_inline int
3941 virtio_dev_tx_async_packed_batch(struct virtio_net *dev,
3942 			   struct vhost_virtqueue *vq,
3943 			   struct rte_mbuf **pkts, uint16_t slot_idx,
3944 			   uint16_t dma_id, uint16_t vchan_id)
3945 	__rte_shared_locks_required(&vq->access_lock)
3946 	__rte_shared_locks_required(&vq->iotlb_lock)
3947 {
3948 	uint16_t avail_idx = vq->last_avail_idx;
3949 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3950 	struct vhost_async *async = vq->async;
3951 	struct async_inflight_info *pkts_info = async->pkts_info;
3952 	struct virtio_net_hdr *hdr;
3953 	uint32_t mbuf_offset = 0;
3954 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
3955 	uint64_t desc_vva;
3956 	uint64_t lens[PACKED_BATCH_SIZE];
3957 	void *host_iova[PACKED_BATCH_SIZE];
3958 	uint64_t mapped_len[PACKED_BATCH_SIZE];
3959 	uint16_t ids[PACKED_BATCH_SIZE];
3960 	uint16_t i;
3961 
3962 	if (vhost_async_tx_batch_packed_check(dev, vq, pkts, avail_idx,
3963 					     desc_addrs, lens, ids, dma_id, vchan_id))
3964 		return -1;
3965 
3966 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3967 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
3968 
3969 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3970 		host_iova[i] = (void *)(uintptr_t)gpa_to_first_hpa(dev,
3971 			desc_addrs[i] + buf_offset, pkts[i]->pkt_len, &mapped_len[i]);
3972 	}
3973 
3974 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3975 		async_iter_initialize(dev, async);
3976 		async_iter_add_iovec(dev, async,
3977 		host_iova[i],
3978 		(void *)(uintptr_t)rte_pktmbuf_iova_offset(pkts[i], mbuf_offset),
3979 		mapped_len[i]);
3980 		async->iter_idx++;
3981 	}
3982 
3983 	if (virtio_net_with_host_offload(dev)) {
3984 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3985 			desc_vva = vhost_iova_to_vva(dev, vq, desc_addrs[i],
3986 						&lens[i], VHOST_ACCESS_RO);
3987 			hdr = (struct virtio_net_hdr *)(uintptr_t)desc_vva;
3988 			pkts_info[slot_idx + i].nethdr = *hdr;
3989 		}
3990 	}
3991 
3992 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
3993 
3994 	vhost_async_shadow_dequeue_packed_batch(vq, ids);
3995 
3996 	return 0;
3997 }
3998 
3999 static __rte_always_inline uint16_t
4000 virtio_dev_tx_async_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
4001 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
4002 		uint16_t count, uint16_t dma_id, uint16_t vchan_id, bool legacy_ol_flags)
4003 	__rte_shared_locks_required(&vq->access_lock)
4004 	__rte_shared_locks_required(&vq->iotlb_lock)
4005 {
4006 	uint32_t pkt_idx = 0;
4007 	uint16_t slot_idx = 0;
4008 	uint16_t nr_done_pkts = 0;
4009 	uint16_t pkt_err = 0;
4010 	uint32_t n_xfer;
4011 	uint16_t i;
4012 	struct vhost_async *async = vq->async;
4013 	struct async_inflight_info *pkts_info = async->pkts_info;
4014 	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
4015 
4016 	VHOST_LOG_DATA(dev->ifname, DEBUG, "(%d) about to dequeue %u buffers\n", dev->vid, count);
4017 
4018 	async_iter_reset(async);
4019 
4020 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count))
4021 		goto out;
4022 
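	/*
	 * Try to dequeue a full batch of PACKED_BATCH_SIZE descriptors first;
	 * fall back to single-packet processing when fewer packets remain or
	 * the batch check fails.
	 */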
4023 	do {
4024 		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
4025 
4026 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
4027 
4028 		slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
4029 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
4030 			if (!virtio_dev_tx_async_packed_batch(dev, vq, &pkts_prealloc[pkt_idx],
4031 						slot_idx, dma_id, vchan_id)) {
4032 				for (i = 0; i < PACKED_BATCH_SIZE; i++) {
4033 					slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
4034 					pkts_info[slot_idx].descs = 1;
4035 					pkts_info[slot_idx].nr_buffers = 1;
4036 					pkts_info[slot_idx].mbuf = pkts_prealloc[pkt_idx];
4037 					pkt_idx++;
4038 				}
4039 				continue;
4040 			}
4041 		}
4042 
4043 		if (unlikely(virtio_dev_tx_async_single_packed(dev, vq, mbuf_pool, pkt,
4044 				slot_idx, legacy_ol_flags))) {
4045 			rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
4046 
4047 			if (slot_idx == 0)
4048 				slot_idx = vq->size - 1;
4049 			else
4050 				slot_idx--;
4051 
4052 			break;
4053 		}
4054 
4055 		pkts_info[slot_idx].mbuf = pkt;
4056 		pkt_idx++;
4057 	} while (pkt_idx < count);
4058 
4059 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
4060 					async->iov_iter, pkt_idx);
4061 
4062 	async->pkts_inflight_n += n_xfer;
4063 
4064 	pkt_err = pkt_idx - n_xfer;
4065 
4066 	if (unlikely(pkt_err)) {
4067 		uint16_t descs_err = 0;
4068 
4069 		pkt_idx -= pkt_err;
4070 
4071 		/**
4072 		 * Recover DMA-copy related structures and free the mbufs of
4073 		 * packets whose DMA copy failed.
4074 		 */
4074 		if (async->buffer_idx_packed >= pkt_err)
4075 			async->buffer_idx_packed -= pkt_err;
4076 		else
4077 			async->buffer_idx_packed += vq->size - pkt_err;
4078 
4079 		while (pkt_err-- > 0) {
4080 			rte_pktmbuf_free(pkts_info[slot_idx].mbuf);
4081 			descs_err += pkts_info[slot_idx].descs;
4082 
4083 			if (slot_idx == 0)
4084 				slot_idx = vq->size - 1;
4085 			else
4086 				slot_idx--;
4087 		}
4088 
4089 		/* recover available ring */
4090 		if (vq->last_avail_idx >= descs_err) {
4091 			vq->last_avail_idx -= descs_err;
4092 		} else {
4093 			vq->last_avail_idx += vq->size - descs_err;
4094 			vq->avail_wrap_counter ^= 1;
4095 		}
4096 	}
4097 
4098 	async->pkts_idx += pkt_idx;
4099 	if (async->pkts_idx >= vq->size)
4100 		async->pkts_idx -= vq->size;
4101 
4102 out:
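	/* The DMA device may serve other queues, so unconditionally check for completed copies. */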
4103 	nr_done_pkts = async_poll_dequeue_completed(dev, vq, pkts, count,
4104 					dma_id, vchan_id, legacy_ol_flags);
4105 
4106 	return nr_done_pkts;
4107 }
4108 
4109 __rte_noinline
4110 static uint16_t
4111 virtio_dev_tx_async_packed_legacy(struct virtio_net *dev, struct vhost_virtqueue *vq,
4112 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
4113 		uint16_t count, uint16_t dma_id, uint16_t vchan_id)
4114 	__rte_shared_locks_required(&vq->access_lock)
4115 	__rte_shared_locks_required(&vq->iotlb_lock)
4116 {
4117 	return virtio_dev_tx_async_packed(dev, vq, mbuf_pool,
4118 				pkts, count, dma_id, vchan_id, true);
4119 }
4120 
4121 __rte_noinline
4122 static uint16_t
4123 virtio_dev_tx_async_packed_compliant(struct virtio_net *dev, struct vhost_virtqueue *vq,
4124 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
4125 		uint16_t count, uint16_t dma_id, uint16_t vchan_id)
4126 	__rte_shared_locks_required(&vq->access_lock)
4127 	__rte_shared_locks_required(&vq->iotlb_lock)
4128 {
4129 	return virtio_dev_tx_async_packed(dev, vq, mbuf_pool,
4130 				pkts, count, dma_id, vchan_id, false);
4131 }
4132 
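/*
 * Public async dequeue entry point. *nr_inflight reports the number of DMA
 * copies still in flight after the call, or stays at -1 when the dequeue path
 * is not reached.
 *
 * Illustrative caller sketch ("vid", "virtqueue_id", "dma_id" and "mbuf_pool"
 * are application-provided placeholders; vchan 0 is used as an example):
 *
 *	struct rte_mbuf *pkts[MAX_PKT_BURST];
 *	int nr_inflight;
 *	uint16_t nb;
 *
 *	nb = rte_vhost_async_try_dequeue_burst(vid, virtqueue_id, mbuf_pool,
 *			pkts, MAX_PKT_BURST, &nr_inflight, dma_id, 0);
 *
 * The caller forwards or frees the "nb" returned mbufs, then keeps polling
 * while nr_inflight indicates copies are still pending.
 */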
4133 uint16_t
4134 rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
4135 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
4136 	int *nr_inflight, int16_t dma_id, uint16_t vchan_id)
4137 {
4138 	struct virtio_net *dev;
4139 	struct rte_mbuf *rarp_mbuf = NULL;
4140 	struct vhost_virtqueue *vq;
4141 	int16_t success = 1;
4142 
4143 	dev = get_device(vid);
4144 	if (!dev || !nr_inflight)
4145 		return 0;
4146 
4147 	*nr_inflight = -1;
4148 
4149 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
4150 		VHOST_LOG_DATA(dev->ifname, ERR, "%s: built-in vhost net backend is disabled.\n",
4151 			__func__);
4152 		return 0;
4153 	}
4154 
4155 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
4156 		VHOST_LOG_DATA(dev->ifname, ERR, "%s: invalid virtqueue idx %d.\n",
4157 			__func__, queue_id);
4158 		return 0;
4159 	}
4160 
4161 	if (unlikely(dma_id < 0 || dma_id >= RTE_DMADEV_DEFAULT_MAX)) {
4162 		VHOST_LOG_DATA(dev->ifname, ERR, "%s: invalid dma id %d.\n",
4163 			__func__, dma_id);
4164 		return 0;
4165 	}
4166 
4167 	if (unlikely(!dma_copy_track[dma_id].vchans ||
4168 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
4169 		VHOST_LOG_DATA(dev->ifname, ERR, "%s: invalid channel %d:%u.\n",
4170 			__func__, dma_id, vchan_id);
4171 		return 0;
4172 	}
4173 
4174 	vq = dev->virtqueue[queue_id];
4175 
4176 	if (unlikely(rte_rwlock_read_trylock(&vq->access_lock) != 0))
4177 		return 0;
4178 
4179 	if (unlikely(vq->enabled == 0)) {
4180 		count = 0;
4181 		goto out_access_unlock;
4182 	}
4183 
4184 	if (unlikely(!vq->async)) {
4185 		VHOST_LOG_DATA(dev->ifname, ERR, "%s: async not registered for queue id %d.\n",
4186 			__func__, queue_id);
4187 		count = 0;
4188 		goto out_access_unlock;
4189 	}
4190 
4191 	vhost_user_iotlb_rd_lock(vq);
4192 
4193 	if (unlikely(vq->access_ok == 0))
4194 		if (unlikely(vring_translate(dev, vq) < 0)) {
4195 			count = 0;
4196 			goto out;
4197 		}
4198 
4199 	/*
4200 	 * Construct a RARP broadcast packet and inject it into the "pkts"
4201 	 * array, so that it looks like the guest actually sent such a packet.
4202 	 *
4203 	 * Check user_send_rarp() for more information.
4204 	 *
4205 	 * broadcast_rarp shares a cacheline in the virtio_net structure
4206 	 * with some fields that are accessed during enqueue, and
4207 	 * __atomic_compare_exchange_n causes a write if it performs the
4208 	 * compare and exchange. This could result in false sharing between
4209 	 * enqueue and dequeue.
4210 	 *
4211 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
4212 	 * and only performing compare and exchange if the read indicates it
4213 	 * is likely to be set.
4214 	 */
4215 	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
4216 			__atomic_compare_exchange_n(&dev->broadcast_rarp,
4217 			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
4218 
4219 		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
4220 		if (rarp_mbuf == NULL) {
4221 			VHOST_LOG_DATA(dev->ifname, ERR, "failed to make RARP packet.\n");
4222 			count = 0;
4223 			goto out;
4224 		}
4225 		/*
4226 		 * Inject it at the head of the "pkts" array, so that the
4227 		 * switch's MAC learning table gets updated first.
4228 		 */
4229 		pkts[0] = rarp_mbuf;
4230 		vhost_queue_stats_update(dev, vq, pkts, 1);
4231 		pkts++;
4232 		count -= 1;
4233 	}
4234 
4235 	if (vq_is_packed(dev)) {
4236 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
4237 			count = virtio_dev_tx_async_packed_legacy(dev, vq, mbuf_pool,
4238 					pkts, count, dma_id, vchan_id);
4239 		else
4240 			count = virtio_dev_tx_async_packed_compliant(dev, vq, mbuf_pool,
4241 					pkts, count, dma_id, vchan_id);
4242 	} else {
4243 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
4244 			count = virtio_dev_tx_async_split_legacy(dev, vq, mbuf_pool,
4245 					pkts, count, dma_id, vchan_id);
4246 		else
4247 			count = virtio_dev_tx_async_split_compliant(dev, vq, mbuf_pool,
4248 					pkts, count, dma_id, vchan_id);
4249 	}
4250 
4251 	*nr_inflight = vq->async->pkts_inflight_n;
4252 	vhost_queue_stats_update(dev, vq, pkts, count);
4253 
4254 out:
4255 	vhost_user_iotlb_rd_unlock(vq);
4256 
4257 out_access_unlock:
4258 	rte_rwlock_read_unlock(&vq->access_lock);
4259 
4260 	if (unlikely(rarp_mbuf != NULL))
4261 		count += 1;
4262 
4263 	return count;
4264 }
4265