xref: /dpdk/lib/vhost/virtio_net.c (revision 63f3f7cd445694200f0ff8110d7132353db2ff0f)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2016 Intel Corporation
3  */
4 
5 #include <stdint.h>
6 #include <stdbool.h>
7 #include <linux/virtio_net.h>
8 
9 #include <rte_mbuf.h>
10 #include <rte_memcpy.h>
11 #include <rte_net.h>
12 #include <rte_ether.h>
13 #include <rte_ip.h>
14 #include <rte_dmadev.h>
15 #include <rte_vhost.h>
16 #include <rte_tcp.h>
17 #include <rte_udp.h>
18 #include <rte_sctp.h>
19 #include <rte_arp.h>
20 #include <rte_spinlock.h>
21 #include <rte_malloc.h>
22 #include <rte_vhost_async.h>
23 
24 #include "iotlb.h"
25 #include "vhost.h"
26 
27 #define MAX_BATCH_LEN 256
28 
29 /* DMA device copy operation tracking array. */
30 struct async_dma_info dma_copy_track[RTE_DMADEV_DEFAULT_MAX];
31 
32 static __rte_always_inline bool
33 rxvq_is_mergeable(struct virtio_net *dev)
34 {
35 	return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
36 }
37 
38 static __rte_always_inline bool
39 virtio_net_is_inorder(struct virtio_net *dev)
40 {
41 	return dev->features & (1ULL << VIRTIO_F_IN_ORDER);
42 }
43 
44 static bool
45 is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t nr_vring)
46 {
47 	return (is_tx ^ (idx & 1)) == 0 && idx < nr_vring;
48 }
49 
50 /*
51  * This function must be called with virtqueue's access_lock taken.
52  */
53 static inline void
54 vhost_queue_stats_update(struct virtio_net *dev, struct vhost_virtqueue *vq,
55 		struct rte_mbuf **pkts, uint16_t count)
56 {
57 	struct virtqueue_stats *stats = &vq->stats;
58 	int i;
59 
60 	if (!(dev->flags & VIRTIO_DEV_STATS_ENABLED))
61 		return;
62 
63 	for (i = 0; i < count; i++) {
64 		struct rte_ether_addr *ea;
65 		struct rte_mbuf *pkt = pkts[i];
66 		uint32_t pkt_len = rte_pktmbuf_pkt_len(pkt);
67 
68 		stats->packets++;
69 		stats->bytes += pkt_len;
70 
71 		if (pkt_len == 64) {
72 			stats->size_bins[1]++;
73 		} else if (pkt_len > 64 && pkt_len < 1024) {
74 			uint32_t bin;
75 
76 			/* count leading zeros to offset into the correct bin */
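			/*
			 * Example: pkt_len = 128 -> __builtin_clz() = 24,
			 * bin = 32 - 24 - 5 = 3, so size_bins[3] counts
			 * 128..255 byte packets.
			 */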
77 			bin = (sizeof(pkt_len) * 8) - __builtin_clz(pkt_len) - 5;
78 			stats->size_bins[bin]++;
79 		} else {
80 			if (pkt_len < 64)
81 				stats->size_bins[0]++;
82 			else if (pkt_len < 1519)
83 				stats->size_bins[6]++;
84 			else
85 				stats->size_bins[7]++;
86 		}
87 
88 		ea = rte_pktmbuf_mtod(pkt, struct rte_ether_addr *);
89 		if (rte_is_multicast_ether_addr(ea)) {
90 			if (rte_is_broadcast_ether_addr(ea))
91 				stats->broadcast++;
92 			else
93 				stats->multicast++;
94 		}
95 	}
96 }
97 
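/*
 * Enqueue all iovec segments of one packet onto the DMA vChannel with
 * rte_dma_copy(). The address of the packet's completion flag is stored in
 * the ring slot of the last copy, so that vhost_async_dma_check_completed()
 * can mark the whole packet as done. Returns the number of segments
 * enqueued, or -1 on lack of capacity or copy failure.
 */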
98 static __rte_always_inline int64_t
99 vhost_async_dma_transfer_one(struct virtio_net *dev, struct vhost_virtqueue *vq,
100 		int16_t dma_id, uint16_t vchan_id, uint16_t flag_idx,
101 		struct vhost_iov_iter *pkt)
102 {
103 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
104 	uint16_t ring_mask = dma_info->ring_mask;
105 	static bool vhost_async_dma_copy_log;
106 
107 
108 	struct vhost_iovec *iov = pkt->iov;
109 	int copy_idx = 0;
110 	uint32_t nr_segs = pkt->nr_segs;
111 	uint16_t i;
112 
113 	if (rte_dma_burst_capacity(dma_id, vchan_id) < nr_segs)
114 		return -1;
115 
116 	for (i = 0; i < nr_segs; i++) {
117 		copy_idx = rte_dma_copy(dma_id, vchan_id, (rte_iova_t)iov[i].src_addr,
118 				(rte_iova_t)iov[i].dst_addr, iov[i].len, RTE_DMA_OP_FLAG_LLC);
119 		/**
120 		 * Since all memory is pinned and the DMA vChannel
121 		 * ring has enough space, failure should be rare.
122 		 * If it does happen, the DMA device has hit a
123 		 * serious error; in that case, stop the async
124 		 * data path and check what has happened to the
125 		 * DMA device.
126 		 */
127 		if (unlikely(copy_idx < 0)) {
128 			if (!vhost_async_dma_copy_log) {
129 				VHOST_LOG_DATA(ERR, "(%s) DMA copy failed for channel %d:%u\n",
130 						dev->ifname, dma_id, vchan_id);
131 				vhost_async_dma_copy_log = true;
132 			}
133 			return -1;
134 		}
135 	}
136 
137 	/**
138 	 * Only store the packet completion flag address in the last copy's
139 	 * slot; the other slots remain NULL.
140 	 */
141 	dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask] = &vq->async->pkts_cmpl_flag[flag_idx];
142 
143 	return nr_segs;
144 }
145 
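/*
 * Submit a burst of packet iterators to the DMA vChannel under dma_lock and
 * kick the device with rte_dma_submit(). Returns the number of packets whose
 * copies were fully enqueued; submission stops at the first packet that
 * cannot be enqueued.
 */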
146 static __rte_always_inline uint16_t
147 vhost_async_dma_transfer(struct virtio_net *dev, struct vhost_virtqueue *vq,
148 		int16_t dma_id, uint16_t vchan_id, uint16_t head_idx,
149 		struct vhost_iov_iter *pkts, uint16_t nr_pkts)
150 {
151 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
152 	int64_t ret, nr_copies = 0;
153 	uint16_t pkt_idx;
154 
155 	rte_spinlock_lock(&dma_info->dma_lock);
156 
157 	for (pkt_idx = 0; pkt_idx < nr_pkts; pkt_idx++) {
158 		ret = vhost_async_dma_transfer_one(dev, vq, dma_id, vchan_id, head_idx,
159 				&pkts[pkt_idx]);
160 		if (unlikely(ret < 0))
161 			break;
162 
163 		nr_copies += ret;
164 		head_idx++;
165 		if (head_idx >= vq->size)
166 			head_idx -= vq->size;
167 	}
168 
169 	if (likely(nr_copies > 0))
170 		rte_dma_submit(dma_id, vchan_id);
171 
172 	rte_spinlock_unlock(&dma_info->dma_lock);
173 
174 	return pkt_idx;
175 }
176 
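/*
 * Poll the DMA vChannel with rte_dma_completed() and set the per-packet
 * completion flags recorded by vhost_async_dma_transfer_one(). Returns the
 * number of completed copy operations.
 */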
177 static __rte_always_inline uint16_t
178 vhost_async_dma_check_completed(struct virtio_net *dev, int16_t dma_id, uint16_t vchan_id,
179 		uint16_t max_pkts)
180 {
181 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
182 	uint16_t ring_mask = dma_info->ring_mask;
183 	uint16_t last_idx = 0;
184 	uint16_t nr_copies;
185 	uint16_t copy_idx;
186 	uint16_t i;
187 	bool has_error = false;
188 	static bool vhost_async_dma_complete_log;
189 
190 	rte_spinlock_lock(&dma_info->dma_lock);
191 
192 	/**
193 	 * Print an error log for debugging if the DMA device reports an
194 	 * error during the transfer. Errors are not handled at the vhost level.
195 	 */
196 	nr_copies = rte_dma_completed(dma_id, vchan_id, max_pkts, &last_idx, &has_error);
197 	if (unlikely(!vhost_async_dma_complete_log && has_error)) {
198 		VHOST_LOG_DATA(ERR, "(%s) DMA completion failure on channel %d:%u\n", dev->ifname,
199 				dma_id, vchan_id);
200 		vhost_async_dma_complete_log = true;
201 	} else if (nr_copies == 0) {
202 		goto out;
203 	}
204 
205 	copy_idx = last_idx - nr_copies + 1;
206 	for (i = 0; i < nr_copies; i++) {
207 		bool *flag;
208 
209 		flag = dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask];
210 		if (flag) {
211 			/**
212 			 * Mark the packet's completion flag. The flag
213 			 * may belong to another virtqueue, but the
214 			 * write is atomic.
215 			 */
216 			*flag = true;
217 			dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask] = NULL;
218 		}
219 		copy_idx++;
220 	}
221 
222 out:
223 	rte_spinlock_unlock(&dma_info->dma_lock);
224 	return nr_copies;
225 }
226 
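/*
 * Flush the small copies deferred to the per-virtqueue batch copy list by
 * sync_fill_seg(). The enqueue variant also logs the written guest pages
 * for live migration.
 */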
227 static inline void
228 do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
229 {
230 	struct batch_copy_elem *elem = vq->batch_copy_elems;
231 	uint16_t count = vq->batch_copy_nb_elems;
232 	int i;
233 
234 	for (i = 0; i < count; i++) {
235 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
236 		vhost_log_cache_write_iova(dev, vq, elem[i].log_addr,
237 					   elem[i].len);
238 		PRINT_PACKET(dev, (uintptr_t)elem[i].dst, elem[i].len, 0);
239 	}
240 
241 	vq->batch_copy_nb_elems = 0;
242 }
243 
244 static inline void
245 do_data_copy_dequeue(struct vhost_virtqueue *vq)
246 {
247 	struct batch_copy_elem *elem = vq->batch_copy_elems;
248 	uint16_t count = vq->batch_copy_nb_elems;
249 	int i;
250 
251 	for (i = 0; i < count; i++)
252 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
253 
254 	vq->batch_copy_nb_elems = 0;
255 }
256 
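/*
 * Shadow used ring helpers for the split ring: the entries accumulated in
 * shadow_used_split are copied into the used ring (in two chunks when the
 * ring wraps), logged for live migration, and the used index is published
 * with a release store.
 */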
257 static __rte_always_inline void
258 do_flush_shadow_used_ring_split(struct virtio_net *dev,
259 			struct vhost_virtqueue *vq,
260 			uint16_t to, uint16_t from, uint16_t size)
261 {
262 	rte_memcpy(&vq->used->ring[to],
263 			&vq->shadow_used_split[from],
264 			size * sizeof(struct vring_used_elem));
265 	vhost_log_cache_used_vring(dev, vq,
266 			offsetof(struct vring_used, ring[to]),
267 			size * sizeof(struct vring_used_elem));
268 }
269 
270 static __rte_always_inline void
271 flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
272 {
273 	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
274 
275 	if (used_idx + vq->shadow_used_idx <= vq->size) {
276 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
277 					  vq->shadow_used_idx);
278 	} else {
279 		uint16_t size;
280 
281 		/* update the used ring interval [used_idx, vq->size) */
282 		size = vq->size - used_idx;
283 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
284 
285 		/* update the remaining used ring interval [0, shadow_used_idx - size) */
286 		do_flush_shadow_used_ring_split(dev, vq, 0, size,
287 					  vq->shadow_used_idx - size);
288 	}
289 	vq->last_used_idx += vq->shadow_used_idx;
290 
291 	vhost_log_cache_sync(dev, vq);
292 
293 	__atomic_add_fetch(&vq->used->idx, vq->shadow_used_idx,
294 			   __ATOMIC_RELEASE);
295 	vq->shadow_used_idx = 0;
296 	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
297 		sizeof(vq->used->idx));
298 }
299 
300 static __rte_always_inline void
301 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
302 			 uint16_t desc_idx, uint32_t len)
303 {
304 	uint16_t i = vq->shadow_used_idx++;
305 
306 	vq->shadow_used_split[i].id  = desc_idx;
307 	vq->shadow_used_split[i].len = len;
308 }
309 
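/*
 * Write the shadow used entries back into the packed descriptor ring.
 * Descriptor ids and lengths are stored first, then the flags after a
 * release fence, and the head descriptor's flags are written last so the
 * guest only sees fully updated descriptors.
 */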
310 static __rte_always_inline void
311 vhost_flush_enqueue_shadow_packed(struct virtio_net *dev,
312 				  struct vhost_virtqueue *vq)
313 {
314 	int i;
315 	uint16_t used_idx = vq->last_used_idx;
316 	uint16_t head_idx = vq->last_used_idx;
317 	uint16_t head_flags = 0;
318 
319 	/* Split loop in two to save memory barriers */
320 	for (i = 0; i < vq->shadow_used_idx; i++) {
321 		vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
322 		vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
323 
324 		used_idx += vq->shadow_used_packed[i].count;
325 		if (used_idx >= vq->size)
326 			used_idx -= vq->size;
327 	}
328 
329 	/* The ordering for storing desc flags needs to be enforced. */
330 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
331 
332 	for (i = 0; i < vq->shadow_used_idx; i++) {
333 		uint16_t flags;
334 
335 		if (vq->shadow_used_packed[i].len)
336 			flags = VRING_DESC_F_WRITE;
337 		else
338 			flags = 0;
339 
340 		if (vq->used_wrap_counter) {
341 			flags |= VRING_DESC_F_USED;
342 			flags |= VRING_DESC_F_AVAIL;
343 		} else {
344 			flags &= ~VRING_DESC_F_USED;
345 			flags &= ~VRING_DESC_F_AVAIL;
346 		}
347 
348 		if (i > 0) {
349 			vq->desc_packed[vq->last_used_idx].flags = flags;
350 
351 			vhost_log_cache_used_vring(dev, vq,
352 					vq->last_used_idx *
353 					sizeof(struct vring_packed_desc),
354 					sizeof(struct vring_packed_desc));
355 		} else {
356 			head_idx = vq->last_used_idx;
357 			head_flags = flags;
358 		}
359 
360 		vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
361 	}
362 
363 	vq->desc_packed[head_idx].flags = head_flags;
364 
365 	vhost_log_cache_used_vring(dev, vq,
366 				head_idx *
367 				sizeof(struct vring_packed_desc),
368 				sizeof(struct vring_packed_desc));
369 
370 	vq->shadow_used_idx = 0;
371 	vhost_log_cache_sync(dev, vq);
372 }
373 
374 static __rte_always_inline void
375 vhost_flush_dequeue_shadow_packed(struct virtio_net *dev,
376 				  struct vhost_virtqueue *vq)
377 {
378 	struct vring_used_elem_packed *used_elem = &vq->shadow_used_packed[0];
379 
380 	vq->desc_packed[vq->shadow_last_used_idx].id = used_elem->id;
381 	/* The desc flags field is the synchronization point for the virtio packed vring */
382 	__atomic_store_n(&vq->desc_packed[vq->shadow_last_used_idx].flags,
383 			 used_elem->flags, __ATOMIC_RELEASE);
384 
385 	vhost_log_cache_used_vring(dev, vq, vq->shadow_last_used_idx *
386 				   sizeof(struct vring_packed_desc),
387 				   sizeof(struct vring_packed_desc));
388 	vq->shadow_used_idx = 0;
389 	vhost_log_cache_sync(dev, vq);
390 }
391 
392 static __rte_always_inline void
393 vhost_flush_enqueue_batch_packed(struct virtio_net *dev,
394 				 struct vhost_virtqueue *vq,
395 				 uint64_t *lens,
396 				 uint16_t *ids)
397 {
398 	uint16_t i;
399 	uint16_t flags;
400 	uint16_t last_used_idx;
401 	struct vring_packed_desc *desc_base;
402 
403 	last_used_idx = vq->last_used_idx;
404 	desc_base = &vq->desc_packed[last_used_idx];
405 
406 	flags = PACKED_DESC_ENQUEUE_USED_FLAG(vq->used_wrap_counter);
407 
408 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
409 		desc_base[i].id = ids[i];
410 		desc_base[i].len = lens[i];
411 	}
412 
413 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
414 
415 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
416 		desc_base[i].flags = flags;
417 	}
418 
419 	vhost_log_cache_used_vring(dev, vq, last_used_idx *
420 				   sizeof(struct vring_packed_desc),
421 				   sizeof(struct vring_packed_desc) *
422 				   PACKED_BATCH_SIZE);
423 	vhost_log_cache_sync(dev, vq);
424 
425 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
426 }
427 
428 static __rte_always_inline void
429 vhost_shadow_dequeue_batch_packed_inorder(struct vhost_virtqueue *vq,
430 					  uint16_t id)
431 {
432 	vq->shadow_used_packed[0].id = id;
433 
434 	if (!vq->shadow_used_idx) {
435 		vq->shadow_last_used_idx = vq->last_used_idx;
436 		vq->shadow_used_packed[0].flags =
437 			PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
438 		vq->shadow_used_packed[0].len = 0;
439 		vq->shadow_used_packed[0].count = 1;
440 		vq->shadow_used_idx++;
441 	}
442 
443 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
444 }
445 
446 static __rte_always_inline void
447 vhost_shadow_dequeue_batch_packed(struct virtio_net *dev,
448 				  struct vhost_virtqueue *vq,
449 				  uint16_t *ids)
450 {
451 	uint16_t flags;
452 	uint16_t i;
453 	uint16_t begin;
454 
455 	flags = PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
456 
457 	if (!vq->shadow_used_idx) {
458 		vq->shadow_last_used_idx = vq->last_used_idx;
459 		vq->shadow_used_packed[0].id  = ids[0];
460 		vq->shadow_used_packed[0].len = 0;
461 		vq->shadow_used_packed[0].count = 1;
462 		vq->shadow_used_packed[0].flags = flags;
463 		vq->shadow_used_idx++;
464 		begin = 1;
465 	} else
466 		begin = 0;
467 
468 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE) {
469 		vq->desc_packed[vq->last_used_idx + i].id = ids[i];
470 		vq->desc_packed[vq->last_used_idx + i].len = 0;
471 	}
472 
473 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
474 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE)
475 		vq->desc_packed[vq->last_used_idx + i].flags = flags;
476 
477 	vhost_log_cache_used_vring(dev, vq, vq->last_used_idx *
478 				   sizeof(struct vring_packed_desc),
479 				   sizeof(struct vring_packed_desc) *
480 				   PACKED_BATCH_SIZE);
481 	vhost_log_cache_sync(dev, vq);
482 
483 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
484 }
485 
486 static __rte_always_inline void
487 vhost_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
488 				   uint16_t buf_id,
489 				   uint16_t count)
490 {
491 	uint16_t flags;
492 
493 	flags = vq->desc_packed[vq->last_used_idx].flags;
494 	if (vq->used_wrap_counter) {
495 		flags |= VRING_DESC_F_USED;
496 		flags |= VRING_DESC_F_AVAIL;
497 	} else {
498 		flags &= ~VRING_DESC_F_USED;
499 		flags &= ~VRING_DESC_F_AVAIL;
500 	}
501 
502 	if (!vq->shadow_used_idx) {
503 		vq->shadow_last_used_idx = vq->last_used_idx;
504 
505 		vq->shadow_used_packed[0].id  = buf_id;
506 		vq->shadow_used_packed[0].len = 0;
507 		vq->shadow_used_packed[0].flags = flags;
508 		vq->shadow_used_idx++;
509 	} else {
510 		vq->desc_packed[vq->last_used_idx].id = buf_id;
511 		vq->desc_packed[vq->last_used_idx].len = 0;
512 		vq->desc_packed[vq->last_used_idx].flags = flags;
513 	}
514 
515 	vq_inc_last_used_packed(vq, count);
516 }
517 
518 static __rte_always_inline void
519 vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
520 					   uint16_t buf_id,
521 					   uint16_t count)
522 {
523 	uint16_t flags;
524 
525 	vq->shadow_used_packed[0].id = buf_id;
526 
527 	flags = vq->desc_packed[vq->last_used_idx].flags;
528 	if (vq->used_wrap_counter) {
529 		flags |= VRING_DESC_F_USED;
530 		flags |= VRING_DESC_F_AVAIL;
531 	} else {
532 		flags &= ~VRING_DESC_F_USED;
533 		flags &= ~VRING_DESC_F_AVAIL;
534 	}
535 
536 	if (!vq->shadow_used_idx) {
537 		vq->shadow_last_used_idx = vq->last_used_idx;
538 		vq->shadow_used_packed[0].len = 0;
539 		vq->shadow_used_packed[0].flags = flags;
540 		vq->shadow_used_idx++;
541 	}
542 
543 	vq_inc_last_used_packed(vq, count);
544 }
545 
546 static __rte_always_inline void
547 vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
548 				   uint32_t *len,
549 				   uint16_t *id,
550 				   uint16_t *count,
551 				   uint16_t num_buffers)
552 {
553 	uint16_t i;
554 
555 	for (i = 0; i < num_buffers; i++) {
556 		/* keep the shadow flush aligned with the packed batch size */
557 		if (!vq->shadow_used_idx)
558 			vq->shadow_aligned_idx = vq->last_used_idx &
559 				PACKED_BATCH_MASK;
560 		vq->shadow_used_packed[vq->shadow_used_idx].id  = id[i];
561 		vq->shadow_used_packed[vq->shadow_used_idx].len = len[i];
562 		vq->shadow_used_packed[vq->shadow_used_idx].count = count[i];
563 		vq->shadow_aligned_idx += count[i];
564 		vq->shadow_used_idx++;
565 	}
566 }
567 
568 static __rte_always_inline void
569 vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
570 				   struct vhost_virtqueue *vq,
571 				   uint32_t *len,
572 				   uint16_t *id,
573 				   uint16_t *count,
574 				   uint16_t num_buffers)
575 {
576 	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
577 
578 	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
579 		do_data_copy_enqueue(dev, vq);
580 		vhost_flush_enqueue_shadow_packed(dev, vq);
581 	}
582 }
583 
584 /* avoid the write operation when it is unnecessary, to lessen cache issues */
585 #define ASSIGN_UNLESS_EQUAL(var, val) do {	\
586 	if ((var) != (val))			\
587 		(var) = (val);			\
588 } while (0)
589 
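/*
 * Translate the mbuf Tx offload flags into the virtio_net_hdr seen by the
 * guest: checksum start/offset for L4 checksum offload and GSO type/size
 * for TSO/UFO. The IPv4 header checksum is computed in place when requested,
 * as it cannot be expressed in the virtio header.
 */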
590 static __rte_always_inline void
591 virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
592 {
593 	uint64_t csum_l4 = m_buf->ol_flags & RTE_MBUF_F_TX_L4_MASK;
594 
595 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG)
596 		csum_l4 |= RTE_MBUF_F_TX_TCP_CKSUM;
597 
598 	if (csum_l4) {
599 		net_hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
600 		net_hdr->csum_start = m_buf->l2_len + m_buf->l3_len;
601 
602 		switch (csum_l4) {
603 		case RTE_MBUF_F_TX_TCP_CKSUM:
604 			net_hdr->csum_offset = (offsetof(struct rte_tcp_hdr,
605 						cksum));
606 			break;
607 		case RTE_MBUF_F_TX_UDP_CKSUM:
608 			net_hdr->csum_offset = (offsetof(struct rte_udp_hdr,
609 						dgram_cksum));
610 			break;
611 		case RTE_MBUF_F_TX_SCTP_CKSUM:
612 			net_hdr->csum_offset = (offsetof(struct rte_sctp_hdr,
613 						cksum));
614 			break;
615 		}
616 	} else {
617 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_start, 0);
618 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_offset, 0);
619 		ASSIGN_UNLESS_EQUAL(net_hdr->flags, 0);
620 	}
621 
622 	/* IP cksum offload cannot be expressed in the virtio header, so calculate it here */
623 	if (m_buf->ol_flags & RTE_MBUF_F_TX_IP_CKSUM) {
624 		struct rte_ipv4_hdr *ipv4_hdr;
625 
626 		ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
627 						   m_buf->l2_len);
628 		ipv4_hdr->hdr_checksum = 0;
629 		ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
630 	}
631 
632 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG) {
633 		if (m_buf->ol_flags & RTE_MBUF_F_TX_IPV4)
634 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
635 		else
636 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
637 		net_hdr->gso_size = m_buf->tso_segsz;
638 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
639 					+ m_buf->l4_len;
640 	} else if (m_buf->ol_flags & RTE_MBUF_F_TX_UDP_SEG) {
641 		net_hdr->gso_type = VIRTIO_NET_HDR_GSO_UDP;
642 		net_hdr->gso_size = m_buf->tso_segsz;
643 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len +
644 			m_buf->l4_len;
645 	} else {
646 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0);
647 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_size, 0);
648 		ASSIGN_UNLESS_EQUAL(net_hdr->hdr_len, 0);
649 	}
650 }
651 
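/*
 * Map one guest buffer (IOVA + length) into host virtual address chunks via
 * vhost_iova_to_vva() and append them to buf_vec. A buffer that is not
 * contiguous in host VA space may produce several buf_vec entries.
 */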
652 static __rte_always_inline int
653 map_one_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
654 		struct buf_vector *buf_vec, uint16_t *vec_idx,
655 		uint64_t desc_iova, uint64_t desc_len, uint8_t perm)
656 {
657 	uint16_t vec_id = *vec_idx;
658 
659 	while (desc_len) {
660 		uint64_t desc_addr;
661 		uint64_t desc_chunck_len = desc_len;
662 
663 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
664 			return -1;
665 
666 		desc_addr = vhost_iova_to_vva(dev, vq,
667 				desc_iova,
668 				&desc_chunck_len,
669 				perm);
670 		if (unlikely(!desc_addr))
671 			return -1;
672 
673 		rte_prefetch0((void *)(uintptr_t)desc_addr);
674 
675 		buf_vec[vec_id].buf_iova = desc_iova;
676 		buf_vec[vec_id].buf_addr = desc_addr;
677 		buf_vec[vec_id].buf_len  = desc_chunck_len;
678 
679 		desc_len -= desc_chunck_len;
680 		desc_iova += desc_chunck_len;
681 		vec_id++;
682 	}
683 	*vec_idx = vec_id;
684 
685 	return 0;
686 }
687 
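/*
 * Walk one available descriptor chain of the split ring, following
 * VRING_DESC_F_NEXT links and indirect descriptor tables, and map every
 * descriptor into buf_vec. Reports the chain head index and the total
 * buffer length.
 */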
688 static __rte_always_inline int
689 fill_vec_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
690 			 uint32_t avail_idx, uint16_t *vec_idx,
691 			 struct buf_vector *buf_vec, uint16_t *desc_chain_head,
692 			 uint32_t *desc_chain_len, uint8_t perm)
693 {
694 	uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
695 	uint16_t vec_id = *vec_idx;
696 	uint32_t len    = 0;
697 	uint64_t dlen;
698 	uint32_t nr_descs = vq->size;
699 	uint32_t cnt    = 0;
700 	struct vring_desc *descs = vq->desc;
701 	struct vring_desc *idesc = NULL;
702 
703 	if (unlikely(idx >= vq->size))
704 		return -1;
705 
706 	*desc_chain_head = idx;
707 
708 	if (vq->desc[idx].flags & VRING_DESC_F_INDIRECT) {
709 		dlen = vq->desc[idx].len;
710 		nr_descs = dlen / sizeof(struct vring_desc);
711 		if (unlikely(nr_descs > vq->size))
712 			return -1;
713 
714 		descs = (struct vring_desc *)(uintptr_t)
715 			vhost_iova_to_vva(dev, vq, vq->desc[idx].addr,
716 						&dlen,
717 						VHOST_ACCESS_RO);
718 		if (unlikely(!descs))
719 			return -1;
720 
721 		if (unlikely(dlen < vq->desc[idx].len)) {
722 			/*
723 			 * The indirect desc table is not contiguous
724 			 * in the process VA space, so we have to copy it.
725 			 */
726 			idesc = vhost_alloc_copy_ind_table(dev, vq,
727 					vq->desc[idx].addr, vq->desc[idx].len);
728 			if (unlikely(!idesc))
729 				return -1;
730 
731 			descs = idesc;
732 		}
733 
734 		idx = 0;
735 	}
736 
737 	while (1) {
738 		if (unlikely(idx >= nr_descs || cnt++ >= nr_descs)) {
739 			free_ind_table(idesc);
740 			return -1;
741 		}
742 
743 		dlen = descs[idx].len;
744 		len += dlen;
745 
746 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
747 						descs[idx].addr, dlen,
748 						perm))) {
749 			free_ind_table(idesc);
750 			return -1;
751 		}
752 
753 		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
754 			break;
755 
756 		idx = descs[idx].next;
757 	}
758 
759 	*desc_chain_len = len;
760 	*vec_idx = vec_id;
761 
762 	if (unlikely(!!idesc))
763 		free_ind_table(idesc);
764 
765 	return 0;
766 }
767 
768 /*
769  * Returns -1 on failure, 0 on success
770  */
771 static inline int
772 reserve_avail_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
773 				uint32_t size, struct buf_vector *buf_vec,
774 				uint16_t *num_buffers, uint16_t avail_head,
775 				uint16_t *nr_vec)
776 {
777 	uint16_t cur_idx;
778 	uint16_t vec_idx = 0;
779 	uint16_t max_tries, tries = 0;
780 
781 	uint16_t head_idx = 0;
782 	uint32_t len = 0;
783 
784 	*num_buffers = 0;
785 	cur_idx  = vq->last_avail_idx;
786 
787 	if (rxvq_is_mergeable(dev))
788 		max_tries = vq->size - 1;
789 	else
790 		max_tries = 1;
791 
792 	while (size > 0) {
793 		if (unlikely(cur_idx == avail_head))
794 			return -1;
795 		/*
796 		 * If we have tried all available ring items and still
797 		 * cannot get enough buffers, something abnormal has
798 		 * happened.
799 		 */
800 		if (unlikely(++tries > max_tries))
801 			return -1;
802 
803 		if (unlikely(fill_vec_buf_split(dev, vq, cur_idx,
804 						&vec_idx, buf_vec,
805 						&head_idx, &len,
806 						VHOST_ACCESS_RW) < 0))
807 			return -1;
808 		len = RTE_MIN(len, size);
809 		update_shadow_used_ring_split(vq, head_idx, len);
810 		size -= len;
811 
812 		cur_idx++;
813 		*num_buffers += 1;
814 	}
815 
816 	*nr_vec = vec_idx;
817 
818 	return 0;
819 }
820 
821 static __rte_always_inline int
822 fill_vec_buf_packed_indirect(struct virtio_net *dev,
823 			struct vhost_virtqueue *vq,
824 			struct vring_packed_desc *desc, uint16_t *vec_idx,
825 			struct buf_vector *buf_vec, uint32_t *len, uint8_t perm)
826 {
827 	uint16_t i;
828 	uint32_t nr_descs;
829 	uint16_t vec_id = *vec_idx;
830 	uint64_t dlen;
831 	struct vring_packed_desc *descs, *idescs = NULL;
832 
833 	dlen = desc->len;
834 	descs = (struct vring_packed_desc *)(uintptr_t)
835 		vhost_iova_to_vva(dev, vq, desc->addr, &dlen, VHOST_ACCESS_RO);
836 	if (unlikely(!descs))
837 		return -1;
838 
839 	if (unlikely(dlen < desc->len)) {
840 		/*
841 		 * The indirect desc table is not contiguous
842 		 * in the process VA space, so we have to copy it.
843 		 */
844 		idescs = vhost_alloc_copy_ind_table(dev,
845 				vq, desc->addr, desc->len);
846 		if (unlikely(!idescs))
847 			return -1;
848 
849 		descs = idescs;
850 	}
851 
852 	nr_descs = desc->len / sizeof(struct vring_packed_desc);
853 	if (unlikely(nr_descs >= vq->size)) {
854 		free_ind_table(idescs);
855 		return -1;
856 	}
857 
858 	for (i = 0; i < nr_descs; i++) {
859 		if (unlikely(vec_id >= BUF_VECTOR_MAX)) {
860 			free_ind_table(idescs);
861 			return -1;
862 		}
863 
864 		dlen = descs[i].len;
865 		*len += dlen;
866 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
867 						descs[i].addr, dlen,
868 						perm)))
869 			return -1;
870 	}
871 	*vec_idx = vec_id;
872 
873 	if (unlikely(!!idescs))
874 		free_ind_table(idescs);
875 
876 	return 0;
877 }
878 
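/*
 * Collect one buffer from the packed ring starting at avail_idx: check
 * availability against the expected wrap counter, follow chained and
 * indirect descriptors, map them into buf_vec, and report the buffer id,
 * descriptor count and total length.
 */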
879 static __rte_always_inline int
880 fill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
881 				uint16_t avail_idx, uint16_t *desc_count,
882 				struct buf_vector *buf_vec, uint16_t *vec_idx,
883 				uint16_t *buf_id, uint32_t *len, uint8_t perm)
884 {
885 	bool wrap_counter = vq->avail_wrap_counter;
886 	struct vring_packed_desc *descs = vq->desc_packed;
887 	uint16_t vec_id = *vec_idx;
888 	uint64_t dlen;
889 
890 	if (avail_idx < vq->last_avail_idx)
891 		wrap_counter ^= 1;
892 
893 	/*
894 	 * Perform a load-acquire barrier in desc_is_avail to
895 	 * enforce the ordering between desc flags and desc
896 	 * content.
897 	 */
898 	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)))
899 		return -1;
900 
901 	*desc_count = 0;
902 	*len = 0;
903 
904 	while (1) {
905 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
906 			return -1;
907 
908 		if (unlikely(*desc_count >= vq->size))
909 			return -1;
910 
911 		*desc_count += 1;
912 		*buf_id = descs[avail_idx].id;
913 
914 		if (descs[avail_idx].flags & VRING_DESC_F_INDIRECT) {
915 			if (unlikely(fill_vec_buf_packed_indirect(dev, vq,
916 							&descs[avail_idx],
917 							&vec_id, buf_vec,
918 							len, perm) < 0))
919 				return -1;
920 		} else {
921 			dlen = descs[avail_idx].len;
922 			*len += dlen;
923 
924 			if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
925 							descs[avail_idx].addr,
926 							dlen,
927 							perm)))
928 				return -1;
929 		}
930 
931 		if ((descs[avail_idx].flags & VRING_DESC_F_NEXT) == 0)
932 			break;
933 
934 		if (++avail_idx >= vq->size) {
935 			avail_idx -= vq->size;
936 			wrap_counter ^= 1;
937 		}
938 	}
939 
940 	*vec_idx = vec_id;
941 
942 	return 0;
943 }
944 
945 static __rte_noinline void
946 copy_vnet_hdr_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
947 		struct buf_vector *buf_vec,
948 		struct virtio_net_hdr_mrg_rxbuf *hdr)
949 {
950 	uint64_t len;
951 	uint64_t remain = dev->vhost_hlen;
952 	uint64_t src = (uint64_t)(uintptr_t)hdr, dst;
953 	uint64_t iova = buf_vec->buf_iova;
954 
955 	while (remain) {
956 		len = RTE_MIN(remain,
957 				buf_vec->buf_len);
958 		dst = buf_vec->buf_addr;
959 		rte_memcpy((void *)(uintptr_t)dst,
960 				(void *)(uintptr_t)src,
961 				len);
962 
963 		PRINT_PACKET(dev, (uintptr_t)dst,
964 				(uint32_t)len, 0);
965 		vhost_log_cache_write_iova(dev, vq,
966 				iova, len);
967 
968 		remain -= len;
969 		iova += len;
970 		src += len;
971 		buf_vec++;
972 	}
973 }
974 
975 static __rte_always_inline int
976 async_iter_initialize(struct virtio_net *dev, struct vhost_async *async)
977 {
978 	struct vhost_iov_iter *iter;
979 
980 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
981 		VHOST_LOG_DATA(ERR, "(%s) no more async iovec available\n", dev->ifname);
982 		return -1;
983 	}
984 
985 	iter = async->iov_iter + async->iter_idx;
986 	iter->iov = async->iovec + async->iovec_idx;
987 	iter->nr_segs = 0;
988 
989 	return 0;
990 }
991 
992 static __rte_always_inline int
993 async_iter_add_iovec(struct virtio_net *dev, struct vhost_async *async,
994 		void *src, void *dst, size_t len)
995 {
996 	struct vhost_iov_iter *iter;
997 	struct vhost_iovec *iovec;
998 
999 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
1000 		static bool vhost_max_async_vec_log;
1001 
1002 		if (!vhost_max_async_vec_log) {
1003 			VHOST_LOG_DATA(ERR, "(%s) no more async iovec available\n", dev->ifname);
1004 			vhost_max_async_vec_log = true;
1005 		}
1006 
1007 		return -1;
1008 	}
1009 
1010 	iter = async->iov_iter + async->iter_idx;
1011 	iovec = async->iovec + async->iovec_idx;
1012 
1013 	iovec->src_addr = src;
1014 	iovec->dst_addr = dst;
1015 	iovec->len = len;
1016 
1017 	iter->nr_segs++;
1018 	async->iovec_idx++;
1019 
1020 	return 0;
1021 }
1022 
1023 static __rte_always_inline void
1024 async_iter_finalize(struct vhost_async *async)
1025 {
1026 	async->iter_idx++;
1027 }
1028 
1029 static __rte_always_inline void
1030 async_iter_cancel(struct vhost_async *async)
1031 {
1032 	struct vhost_iov_iter *iter;
1033 
1034 	iter = async->iov_iter + async->iter_idx;
1035 	async->iovec_idx -= iter->nr_segs;
1036 	iter->nr_segs = 0;
1037 	iter->iov = NULL;
1038 }
1039 
1040 static __rte_always_inline void
1041 async_iter_reset(struct vhost_async *async)
1042 {
1043 	async->iter_idx = 0;
1044 	async->iovec_idx = 0;
1045 }
1046 
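/*
 * Async copy path: split a copy of cpy_len bytes into host-contiguous chunks
 * with gpa_to_first_hpa() and append each chunk as a source/destination pair
 * to the async iovec iterator, in the direction selected by to_desc.
 */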
1047 static __rte_always_inline int
1048 async_fill_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
1049 		struct rte_mbuf *m, uint32_t mbuf_offset,
1050 		uint64_t buf_iova, uint32_t cpy_len, bool to_desc)
1051 {
1052 	struct vhost_async *async = vq->async;
1053 	uint64_t mapped_len;
1054 	uint32_t buf_offset = 0;
1055 	void *src, *dst;
1056 	void *host_iova;
1057 
1058 	while (cpy_len) {
1059 		host_iova = (void *)(uintptr_t)gpa_to_first_hpa(dev,
1060 				buf_iova + buf_offset, cpy_len, &mapped_len);
1061 		if (unlikely(!host_iova)) {
1062 			VHOST_LOG_DATA(ERR, "(%s) %s: failed to get host iova.\n",
1063 				       dev->ifname, __func__);
1064 			return -1;
1065 		}
1066 
1067 		if (to_desc) {
1068 			src = (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset);
1069 			dst = host_iova;
1070 		} else {
1071 			src = host_iova;
1072 			dst = (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset);
1073 		}
1074 
1075 		if (unlikely(async_iter_add_iovec(dev, async, src, dst, (size_t)mapped_len)))
1076 			return -1;
1077 
1078 		cpy_len -= (uint32_t)mapped_len;
1079 		mbuf_offset += (uint32_t)mapped_len;
1080 		buf_offset += (uint32_t)mapped_len;
1081 	}
1082 
1083 	return 0;
1084 }
1085 
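/*
 * Synchronous (CPU) copy path: copies larger than MAX_BATCH_LEN, or copies
 * arriving when the batch list is full, are performed immediately with
 * rte_memcpy(); smaller ones are deferred to the batch copy list that is
 * flushed by do_data_copy_enqueue()/do_data_copy_dequeue().
 */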
1086 static __rte_always_inline void
1087 sync_fill_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
1088 		struct rte_mbuf *m, uint32_t mbuf_offset,
1089 		uint64_t buf_addr, uint64_t buf_iova, uint32_t cpy_len, bool to_desc)
1090 {
1091 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
1092 
1093 	if (likely(cpy_len > MAX_BATCH_LEN || vq->batch_copy_nb_elems >= vq->size)) {
1094 		if (to_desc) {
1095 			rte_memcpy((void *)((uintptr_t)(buf_addr)),
1096 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
1097 				cpy_len);
1098 		} else {
1099 			rte_memcpy(rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
1100 				(void *)((uintptr_t)(buf_addr)),
1101 				cpy_len);
1102 		}
1103 		vhost_log_cache_write_iova(dev, vq, buf_iova, cpy_len);
1104 		PRINT_PACKET(dev, (uintptr_t)(buf_addr), cpy_len, 0);
1105 	} else {
1106 		if (to_desc) {
1107 			batch_copy[vq->batch_copy_nb_elems].dst =
1108 				(void *)((uintptr_t)(buf_addr));
1109 			batch_copy[vq->batch_copy_nb_elems].src =
1110 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
1111 		} else {
1112 			batch_copy[vq->batch_copy_nb_elems].dst =
1113 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
1114 			batch_copy[vq->batch_copy_nb_elems].src =
1115 				(void *)((uintptr_t)(buf_addr));
1116 		}
1117 		batch_copy[vq->batch_copy_nb_elems].log_addr = buf_iova;
1118 		batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
1119 		vq->batch_copy_nb_elems++;
1120 	}
1121 }
1122 
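/*
 * Copy one mbuf chain into the guest buffers described by buf_vec: write the
 * virtio-net header first (via copy_vnet_hdr_to_desc() when the first buffer
 * cannot hold it in one piece), then copy the payload segment by segment,
 * either through the sync batch-copy path or by building async DMA iovec
 * iterators.
 */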
1123 static __rte_always_inline int
1124 mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
1125 		struct rte_mbuf *m, struct buf_vector *buf_vec,
1126 		uint16_t nr_vec, uint16_t num_buffers, bool is_async)
1127 {
1128 	uint32_t vec_idx = 0;
1129 	uint32_t mbuf_offset, mbuf_avail;
1130 	uint32_t buf_offset, buf_avail;
1131 	uint64_t buf_addr, buf_iova, buf_len;
1132 	uint32_t cpy_len;
1133 	uint64_t hdr_addr;
1134 	struct rte_mbuf *hdr_mbuf;
1135 	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
1136 	struct vhost_async *async = vq->async;
1137 
1138 	if (unlikely(m == NULL))
1139 		return -1;
1140 
1141 	buf_addr = buf_vec[vec_idx].buf_addr;
1142 	buf_iova = buf_vec[vec_idx].buf_iova;
1143 	buf_len = buf_vec[vec_idx].buf_len;
1144 
1145 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
1146 		return -1;
1147 
1148 	hdr_mbuf = m;
1149 	hdr_addr = buf_addr;
1150 	if (unlikely(buf_len < dev->vhost_hlen)) {
1151 		memset(&tmp_hdr, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
1152 		hdr = &tmp_hdr;
1153 	} else
1154 		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
1155 
1156 	VHOST_LOG_DATA(DEBUG, "(%s) RX: num merge buffers %d\n",
1157 		dev->ifname, num_buffers);
1158 
1159 	if (unlikely(buf_len < dev->vhost_hlen)) {
1160 		buf_offset = dev->vhost_hlen - buf_len;
1161 		vec_idx++;
1162 		buf_addr = buf_vec[vec_idx].buf_addr;
1163 		buf_iova = buf_vec[vec_idx].buf_iova;
1164 		buf_len = buf_vec[vec_idx].buf_len;
1165 		buf_avail = buf_len - buf_offset;
1166 	} else {
1167 		buf_offset = dev->vhost_hlen;
1168 		buf_avail = buf_len - dev->vhost_hlen;
1169 	}
1170 
1171 	mbuf_avail  = rte_pktmbuf_data_len(m);
1172 	mbuf_offset = 0;
1173 
1174 	if (is_async) {
1175 		if (async_iter_initialize(dev, async))
1176 			return -1;
1177 	}
1178 
1179 	while (mbuf_avail != 0 || m->next != NULL) {
1180 		/* done with current buf, get the next one */
1181 		if (buf_avail == 0) {
1182 			vec_idx++;
1183 			if (unlikely(vec_idx >= nr_vec))
1184 				goto error;
1185 
1186 			buf_addr = buf_vec[vec_idx].buf_addr;
1187 			buf_iova = buf_vec[vec_idx].buf_iova;
1188 			buf_len = buf_vec[vec_idx].buf_len;
1189 
1190 			buf_offset = 0;
1191 			buf_avail  = buf_len;
1192 		}
1193 
1194 		/* done with current mbuf, get the next one */
1195 		if (mbuf_avail == 0) {
1196 			m = m->next;
1197 
1198 			mbuf_offset = 0;
1199 			mbuf_avail  = rte_pktmbuf_data_len(m);
1200 		}
1201 
1202 		if (hdr_addr) {
1203 			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
1204 			if (rxvq_is_mergeable(dev))
1205 				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
1206 						num_buffers);
1207 
1208 			if (unlikely(hdr == &tmp_hdr)) {
1209 				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
1210 			} else {
1211 				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
1212 						dev->vhost_hlen, 0);
1213 				vhost_log_cache_write_iova(dev, vq,
1214 						buf_vec[0].buf_iova,
1215 						dev->vhost_hlen);
1216 			}
1217 
1218 			hdr_addr = 0;
1219 		}
1220 
1221 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
1222 
1223 		if (is_async) {
1224 			if (async_fill_seg(dev, vq, m, mbuf_offset,
1225 					   buf_iova + buf_offset, cpy_len, true) < 0)
1226 				goto error;
1227 		} else {
1228 			sync_fill_seg(dev, vq, m, mbuf_offset,
1229 				      buf_addr + buf_offset,
1230 				      buf_iova + buf_offset, cpy_len, true);
1231 		}
1232 
1233 		mbuf_avail  -= cpy_len;
1234 		mbuf_offset += cpy_len;
1235 		buf_avail  -= cpy_len;
1236 		buf_offset += cpy_len;
1237 	}
1238 
1239 	if (is_async)
1240 		async_iter_finalize(async);
1241 
1242 	return 0;
1243 error:
1244 	if (is_async)
1245 		async_iter_cancel(async);
1246 
1247 	return -1;
1248 }
1249 
1250 static __rte_always_inline int
1251 vhost_enqueue_single_packed(struct virtio_net *dev,
1252 			    struct vhost_virtqueue *vq,
1253 			    struct rte_mbuf *pkt,
1254 			    struct buf_vector *buf_vec,
1255 			    uint16_t *nr_descs)
1256 {
1257 	uint16_t nr_vec = 0;
1258 	uint16_t avail_idx = vq->last_avail_idx;
1259 	uint16_t max_tries, tries = 0;
1260 	uint16_t buf_id = 0;
1261 	uint32_t len = 0;
1262 	uint16_t desc_count;
1263 	uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1264 	uint16_t num_buffers = 0;
1265 	uint32_t buffer_len[vq->size];
1266 	uint16_t buffer_buf_id[vq->size];
1267 	uint16_t buffer_desc_count[vq->size];
1268 
1269 	if (rxvq_is_mergeable(dev))
1270 		max_tries = vq->size - 1;
1271 	else
1272 		max_tries = 1;
1273 
1274 	while (size > 0) {
1275 		/*
1276 		 * If we have tried all available ring items and still
1277 		 * cannot get enough buffers, something abnormal has
1278 		 * happened.
1279 		 */
1280 		if (unlikely(++tries > max_tries))
1281 			return -1;
1282 
1283 		if (unlikely(fill_vec_buf_packed(dev, vq,
1284 						avail_idx, &desc_count,
1285 						buf_vec, &nr_vec,
1286 						&buf_id, &len,
1287 						VHOST_ACCESS_RW) < 0))
1288 			return -1;
1289 
1290 		len = RTE_MIN(len, size);
1291 		size -= len;
1292 
1293 		buffer_len[num_buffers] = len;
1294 		buffer_buf_id[num_buffers] = buf_id;
1295 		buffer_desc_count[num_buffers] = desc_count;
1296 		num_buffers += 1;
1297 
1298 		*nr_descs += desc_count;
1299 		avail_idx += desc_count;
1300 		if (avail_idx >= vq->size)
1301 			avail_idx -= vq->size;
1302 	}
1303 
1304 	if (mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, num_buffers, false) < 0)
1305 		return -1;
1306 
1307 	vhost_shadow_enqueue_single_packed(dev, vq, buffer_len, buffer_buf_id,
1308 					   buffer_desc_count, num_buffers);
1309 
1310 	return 0;
1311 }
1312 
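/*
 * Synchronous enqueue on a split virtqueue: reserve enough descriptors for
 * each packet (multiple buffers when VIRTIO_NET_F_MRG_RXBUF is negotiated),
 * copy the packets, then flush the shadow used ring and notify the guest.
 */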
1313 static __rte_noinline uint32_t
1314 virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1315 	struct rte_mbuf **pkts, uint32_t count)
1316 {
1317 	uint32_t pkt_idx = 0;
1318 	uint16_t num_buffers;
1319 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1320 	uint16_t avail_head;
1321 
1322 	/*
1323 	 * The ordering between avail index and
1324 	 * desc reads needs to be enforced.
1325 	 */
1326 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
1327 
1328 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1329 
1330 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1331 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1332 		uint16_t nr_vec = 0;
1333 
1334 		if (unlikely(reserve_avail_buf_split(dev, vq,
1335 						pkt_len, buf_vec, &num_buffers,
1336 						avail_head, &nr_vec) < 0)) {
1337 			VHOST_LOG_DATA(DEBUG,
1338 				"(%s) failed to get enough desc from vring\n",
1339 				dev->ifname);
1340 			vq->shadow_used_idx -= num_buffers;
1341 			break;
1342 		}
1343 
1344 		VHOST_LOG_DATA(DEBUG, "(%s) current index %d | end index %d\n",
1345 			dev->ifname, vq->last_avail_idx,
1346 			vq->last_avail_idx + num_buffers);
1347 
1348 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec,
1349 					num_buffers, false) < 0) {
1350 			vq->shadow_used_idx -= num_buffers;
1351 			break;
1352 		}
1353 
1354 		vq->last_avail_idx += num_buffers;
1355 	}
1356 
1357 	do_data_copy_enqueue(dev, vq);
1358 
1359 	if (likely(vq->shadow_used_idx)) {
1360 		flush_shadow_used_ring_split(dev, vq);
1361 		vhost_vring_call_split(dev, vq);
1362 	}
1363 
1364 	return pkt_idx;
1365 }
1366 
1367 static __rte_always_inline int
1368 virtio_dev_rx_sync_batch_check(struct virtio_net *dev,
1369 			   struct vhost_virtqueue *vq,
1370 			   struct rte_mbuf **pkts,
1371 			   uint64_t *desc_addrs,
1372 			   uint64_t *lens)
1373 {
1374 	bool wrap_counter = vq->avail_wrap_counter;
1375 	struct vring_packed_desc *descs = vq->desc_packed;
1376 	uint16_t avail_idx = vq->last_avail_idx;
1377 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1378 	uint16_t i;
1379 
1380 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1381 		return -1;
1382 
1383 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1384 		return -1;
1385 
1386 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1387 		if (unlikely(pkts[i]->next != NULL))
1388 			return -1;
1389 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1390 					    wrap_counter)))
1391 			return -1;
1392 	}
1393 
1394 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1395 		lens[i] = descs[avail_idx + i].len;
1396 
1397 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1398 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1399 			return -1;
1400 	}
1401 
1402 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1403 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
1404 						  descs[avail_idx + i].addr,
1405 						  &lens[i],
1406 						  VHOST_ACCESS_RW);
1407 
1408 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1409 		if (unlikely(!desc_addrs[i]))
1410 			return -1;
1411 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1412 			return -1;
1413 	}
1414 
1415 	return 0;
1416 }
1417 
1418 static __rte_always_inline void
1419 virtio_dev_rx_batch_packed_copy(struct virtio_net *dev,
1420 			   struct vhost_virtqueue *vq,
1421 			   struct rte_mbuf **pkts,
1422 			   uint64_t *desc_addrs,
1423 			   uint64_t *lens)
1424 {
1425 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1426 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
1427 	struct vring_packed_desc *descs = vq->desc_packed;
1428 	uint16_t avail_idx = vq->last_avail_idx;
1429 	uint16_t ids[PACKED_BATCH_SIZE];
1430 	uint16_t i;
1431 
1432 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1433 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
1434 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
1435 					(uintptr_t)desc_addrs[i];
1436 		lens[i] = pkts[i]->pkt_len +
1437 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
1438 	}
1439 
1440 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1441 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
1442 
1443 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
1444 
1445 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1446 		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
1447 			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
1448 			   pkts[i]->pkt_len);
1449 	}
1450 
1451 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1452 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr,
1453 					   lens[i]);
1454 
1455 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1456 		ids[i] = descs[avail_idx + i].id;
1457 
1458 	vhost_flush_enqueue_batch_packed(dev, vq, lens, ids);
1459 }
1460 
1461 static __rte_always_inline int
1462 virtio_dev_rx_sync_batch_packed(struct virtio_net *dev,
1463 			   struct vhost_virtqueue *vq,
1464 			   struct rte_mbuf **pkts)
1465 {
1466 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
1467 	uint64_t lens[PACKED_BATCH_SIZE];
1468 
1469 	if (virtio_dev_rx_sync_batch_check(dev, vq, pkts, desc_addrs, lens) == -1)
1470 		return -1;
1471 
1472 	if (vq->shadow_used_idx) {
1473 		do_data_copy_enqueue(dev, vq);
1474 		vhost_flush_enqueue_shadow_packed(dev, vq);
1475 	}
1476 
1477 	virtio_dev_rx_batch_packed_copy(dev, vq, pkts, desc_addrs, lens);
1478 
1479 	return 0;
1480 }
1481 
1482 static __rte_always_inline int16_t
1483 virtio_dev_rx_single_packed(struct virtio_net *dev,
1484 			    struct vhost_virtqueue *vq,
1485 			    struct rte_mbuf *pkt)
1486 {
1487 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1488 	uint16_t nr_descs = 0;
1489 
1490 	if (unlikely(vhost_enqueue_single_packed(dev, vq, pkt, buf_vec,
1491 						 &nr_descs) < 0)) {
1492 		VHOST_LOG_DATA(DEBUG, "(%s) failed to get enough desc from vring\n",
1493 				dev->ifname);
1494 		return -1;
1495 	}
1496 
1497 	VHOST_LOG_DATA(DEBUG, "(%s) current index %d | end index %d\n",
1498 			dev->ifname, vq->last_avail_idx,
1499 			vq->last_avail_idx + nr_descs);
1500 
1501 	vq_inc_last_avail_packed(vq, nr_descs);
1502 
1503 	return 0;
1504 }
1505 
1506 static __rte_noinline uint32_t
1507 virtio_dev_rx_packed(struct virtio_net *dev,
1508 		     struct vhost_virtqueue *__rte_restrict vq,
1509 		     struct rte_mbuf **__rte_restrict pkts,
1510 		     uint32_t count)
1511 {
1512 	uint32_t pkt_idx = 0;
1513 
1514 	do {
1515 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1516 
1517 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
1518 			if (!virtio_dev_rx_sync_batch_packed(dev, vq,
1519 							&pkts[pkt_idx])) {
1520 				pkt_idx += PACKED_BATCH_SIZE;
1521 				continue;
1522 			}
1523 		}
1524 
1525 		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]))
1526 			break;
1527 		pkt_idx++;
1528 
1529 	} while (pkt_idx < count);
1530 
1531 	if (vq->shadow_used_idx) {
1532 		do_data_copy_enqueue(dev, vq);
1533 		vhost_flush_enqueue_shadow_packed(dev, vq);
1534 	}
1535 
1536 	if (pkt_idx)
1537 		vhost_vring_call_packed(dev, vq);
1538 
1539 	return pkt_idx;
1540 }
1541 
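/*
 * Common enqueue entry point: validate the queue index, take the access lock
 * (and the IOTLB read lock when VIRTIO_F_IOMMU_PLATFORM is negotiated),
 * translate the ring if needed, and dispatch to the packed or split
 * implementation.
 */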
1542 static __rte_always_inline uint32_t
1543 virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
1544 	struct rte_mbuf **pkts, uint32_t count)
1545 {
1546 	struct vhost_virtqueue *vq;
1547 	uint32_t nb_tx = 0;
1548 
1549 	VHOST_LOG_DATA(DEBUG, "(%s) %s\n", dev->ifname, __func__);
1550 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1551 		VHOST_LOG_DATA(ERR, "(%s) %s: invalid virtqueue idx %d.\n",
1552 			dev->ifname, __func__, queue_id);
1553 		return 0;
1554 	}
1555 
1556 	vq = dev->virtqueue[queue_id];
1557 
1558 	rte_spinlock_lock(&vq->access_lock);
1559 
1560 	if (unlikely(!vq->enabled))
1561 		goto out_access_unlock;
1562 
1563 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1564 		vhost_user_iotlb_rd_lock(vq);
1565 
1566 	if (unlikely(!vq->access_ok))
1567 		if (unlikely(vring_translate(dev, vq) < 0))
1568 			goto out;
1569 
1570 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1571 	if (count == 0)
1572 		goto out;
1573 
1574 	if (vq_is_packed(dev))
1575 		nb_tx = virtio_dev_rx_packed(dev, vq, pkts, count);
1576 	else
1577 		nb_tx = virtio_dev_rx_split(dev, vq, pkts, count);
1578 
1579 	vhost_queue_stats_update(dev, vq, pkts, nb_tx);
1580 
1581 out:
1582 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1583 		vhost_user_iotlb_rd_unlock(vq);
1584 
1585 out_access_unlock:
1586 	rte_spinlock_unlock(&vq->access_lock);
1587 
1588 	return nb_tx;
1589 }
1590 
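/*
 * Public API: copy a burst of host mbufs into the guest's Rx virtqueue
 * (queue_id must refer to an Rx queue). A minimal usage sketch from an
 * application Tx path, where 'vid', 'rxq_id', 'mbufs' and 'nb' are
 * hypothetical application-side variables:
 *
 *	uint16_t sent = rte_vhost_enqueue_burst(vid, rxq_id, mbufs, nb);
 *
 *	// Data is copied into guest buffers, so the caller keeps ownership of
 *	// the mbufs; this sketch simply frees them all, dropping any packets
 *	// that could not be enqueued.
 *	rte_pktmbuf_free_bulk(mbufs, nb);
 */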
1591 uint16_t
1592 rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
1593 	struct rte_mbuf **__rte_restrict pkts, uint16_t count)
1594 {
1595 	struct virtio_net *dev = get_device(vid);
1596 
1597 	if (!dev)
1598 		return 0;
1599 
1600 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
1601 		VHOST_LOG_DATA(ERR, "(%s) %s: built-in vhost net backend is disabled.\n",
1602 			dev->ifname, __func__);
1603 		return 0;
1604 	}
1605 
1606 	return virtio_dev_rx(dev, queue_id, pkts, count);
1607 }
1608 
1609 static __rte_always_inline uint16_t
1610 async_get_first_inflight_pkt_idx(struct vhost_virtqueue *vq)
1611 {
1612 	struct vhost_async *async = vq->async;
1613 
1614 	if (async->pkts_idx >= async->pkts_inflight_n)
1615 		return async->pkts_idx - async->pkts_inflight_n;
1616 	else
1617 		return vq->size - async->pkts_inflight_n + async->pkts_idx;
1618 }
1619 
1620 static __rte_always_inline void
1621 store_dma_desc_info_split(struct vring_used_elem *s_ring, struct vring_used_elem *d_ring,
1622 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1623 {
1624 	size_t elem_size = sizeof(struct vring_used_elem);
1625 
1626 	if (d_idx + count <= ring_size) {
1627 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1628 	} else {
1629 		uint16_t size = ring_size - d_idx;
1630 
1631 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1632 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1633 	}
1634 }
1635 
1636 static __rte_always_inline void
1637 store_dma_desc_info_packed(struct vring_used_elem_packed *s_ring,
1638 		struct vring_used_elem_packed *d_ring,
1639 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1640 {
1641 	size_t elem_size = sizeof(struct vring_used_elem_packed);
1642 
1643 	if (d_idx + count <= ring_size) {
1644 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1645 	} else {
1646 		uint16_t size = ring_size - d_idx;
1647 
1648 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1649 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1650 	}
1651 }
1652 
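/*
 * DMA-accelerated enqueue on a split virtqueue: reserve descriptors and build
 * an iovec iterator per packet, hand the iterators to the DMA engine, roll
 * back descriptors for packets that could not be submitted, and keep the
 * shadow used entries aside until the copies complete (they are written back
 * by the completion path).
 */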
1653 static __rte_noinline uint32_t
1654 virtio_dev_rx_async_submit_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1655 		uint16_t queue_id, struct rte_mbuf **pkts, uint32_t count,
1656 		int16_t dma_id, uint16_t vchan_id)
1657 {
1658 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1659 	uint32_t pkt_idx = 0;
1660 	uint16_t num_buffers;
1661 	uint16_t avail_head;
1662 
1663 	struct vhost_async *async = vq->async;
1664 	struct async_inflight_info *pkts_info = async->pkts_info;
1665 	uint32_t pkt_err = 0;
1666 	uint16_t n_xfer;
1667 	uint16_t slot_idx = 0;
1668 
1669 	/*
1670 	 * The ordering between avail index and desc reads needs to be enforced.
1671 	 */
1672 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
1673 
1674 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1675 
1676 	async_iter_reset(async);
1677 
1678 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1679 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1680 		uint16_t nr_vec = 0;
1681 
1682 		if (unlikely(reserve_avail_buf_split(dev, vq, pkt_len, buf_vec,
1683 						&num_buffers, avail_head, &nr_vec) < 0)) {
1684 			VHOST_LOG_DATA(DEBUG, "(%s) failed to get enough desc from vring\n",
1685 					dev->ifname);
1686 			vq->shadow_used_idx -= num_buffers;
1687 			break;
1688 		}
1689 
1690 		VHOST_LOG_DATA(DEBUG, "(%s) current index %d | end index %d\n",
1691 			dev->ifname, vq->last_avail_idx, vq->last_avail_idx + num_buffers);
1692 
1693 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec, num_buffers, true) < 0) {
1694 			vq->shadow_used_idx -= num_buffers;
1695 			break;
1696 		}
1697 
1698 		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
1699 		pkts_info[slot_idx].descs = num_buffers;
1700 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1701 
1702 		vq->last_avail_idx += num_buffers;
1703 	}
1704 
1705 	if (unlikely(pkt_idx == 0))
1706 		return 0;
1707 
1708 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
1709 			async->iov_iter, pkt_idx);
1710 
1711 	pkt_err = pkt_idx - n_xfer;
1712 	if (unlikely(pkt_err)) {
1713 		uint16_t num_descs = 0;
1714 
1715 		VHOST_LOG_DATA(DEBUG, "(%s) %s: failed to transfer %u packets for queue %u.\n",
1716 				dev->ifname, __func__, pkt_err, queue_id);
1717 
1718 		/* update number of completed packets */
1719 		pkt_idx = n_xfer;
1720 
1721 		/* calculate the sum of descriptors to revert */
1722 		while (pkt_err-- > 0) {
1723 			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
1724 			slot_idx--;
1725 		}
1726 
1727 		/* recover shadow used ring and available ring */
1728 		vq->shadow_used_idx -= num_descs;
1729 		vq->last_avail_idx -= num_descs;
1730 	}
1731 
1732 	/* keep used descriptors */
1733 	if (likely(vq->shadow_used_idx)) {
1734 		uint16_t to = async->desc_idx_split & (vq->size - 1);
1735 
1736 		store_dma_desc_info_split(vq->shadow_used_split,
1737 				async->descs_split, vq->size, 0, to,
1738 				vq->shadow_used_idx);
1739 
1740 		async->desc_idx_split += vq->shadow_used_idx;
1741 
1742 		async->pkts_idx += pkt_idx;
1743 		if (async->pkts_idx >= vq->size)
1744 			async->pkts_idx -= vq->size;
1745 
1746 		async->pkts_inflight_n += pkt_idx;
1747 		vq->shadow_used_idx = 0;
1748 	}
1749 
1750 	return pkt_idx;
1751 }
1752 
1753 
1754 static __rte_always_inline int
1755 vhost_enqueue_async_packed(struct virtio_net *dev,
1756 			    struct vhost_virtqueue *vq,
1757 			    struct rte_mbuf *pkt,
1758 			    struct buf_vector *buf_vec,
1759 			    uint16_t *nr_descs,
1760 			    uint16_t *nr_buffers)
1761 {
1762 	uint16_t nr_vec = 0;
1763 	uint16_t avail_idx = vq->last_avail_idx;
1764 	uint16_t max_tries, tries = 0;
1765 	uint16_t buf_id = 0;
1766 	uint32_t len = 0;
1767 	uint16_t desc_count = 0;
1768 	uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1769 	uint32_t buffer_len[vq->size];
1770 	uint16_t buffer_buf_id[vq->size];
1771 	uint16_t buffer_desc_count[vq->size];
1772 
1773 	if (rxvq_is_mergeable(dev))
1774 		max_tries = vq->size - 1;
1775 	else
1776 		max_tries = 1;
1777 
1778 	while (size > 0) {
1779 		/*
1780 		 * If we have tried all available ring items and still
1781 		 * cannot get enough buffers, something abnormal has
1782 		 * happened.
1783 		 */
1784 		if (unlikely(++tries > max_tries))
1785 			return -1;
1786 
1787 		if (unlikely(fill_vec_buf_packed(dev, vq,
1788 						avail_idx, &desc_count,
1789 						buf_vec, &nr_vec,
1790 						&buf_id, &len,
1791 						VHOST_ACCESS_RW) < 0))
1792 			return -1;
1793 
1794 		len = RTE_MIN(len, size);
1795 		size -= len;
1796 
1797 		buffer_len[*nr_buffers] = len;
1798 		buffer_buf_id[*nr_buffers] = buf_id;
1799 		buffer_desc_count[*nr_buffers] = desc_count;
1800 		*nr_buffers += 1;
1801 		*nr_descs += desc_count;
1802 		avail_idx += desc_count;
1803 		if (avail_idx >= vq->size)
1804 			avail_idx -= vq->size;
1805 	}
1806 
1807 	if (unlikely(mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers, true) < 0))
1808 		return -1;
1809 
1810 	vhost_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id, buffer_desc_count, *nr_buffers);
1811 
1812 	return 0;
1813 }
1814 
1815 static __rte_always_inline int16_t
1816 virtio_dev_rx_async_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
1817 			    struct rte_mbuf *pkt, uint16_t *nr_descs, uint16_t *nr_buffers)
1818 {
1819 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1820 
1821 	if (unlikely(vhost_enqueue_async_packed(dev, vq, pkt, buf_vec,
1822 					nr_descs, nr_buffers) < 0)) {
1823 		VHOST_LOG_DATA(DEBUG, "(%s) failed to get enough desc from vring\n", dev->ifname);
1824 		return -1;
1825 	}
1826 
1827 	VHOST_LOG_DATA(DEBUG, "(%s) current index %d | end index %d\n",
1828 			dev->ifname, vq->last_avail_idx, vq->last_avail_idx + *nr_descs);
1829 
1830 	return 0;
1831 }
1832 
1833 static __rte_always_inline void
1834 dma_error_handler_packed(struct vhost_virtqueue *vq, uint16_t slot_idx,
1835 			uint32_t nr_err, uint32_t *pkt_idx)
1836 {
1837 	uint16_t descs_err = 0;
1838 	uint16_t buffers_err = 0;
1839 	struct async_inflight_info *pkts_info = vq->async->pkts_info;
1840 
1841 	*pkt_idx -= nr_err;
1842 	/* calculate the sum of buffers and descs of DMA-error packets. */
1843 	while (nr_err-- > 0) {
1844 		descs_err += pkts_info[slot_idx % vq->size].descs;
1845 		buffers_err += pkts_info[slot_idx % vq->size].nr_buffers;
1846 		slot_idx--;
1847 	}
1848 
1849 	if (vq->last_avail_idx >= descs_err) {
1850 		vq->last_avail_idx -= descs_err;
1851 	} else {
1852 		vq->last_avail_idx = vq->last_avail_idx + vq->size - descs_err;
1853 		vq->avail_wrap_counter ^= 1;
1854 	}
1855 
1856 	vq->shadow_used_idx -= buffers_err;
1857 }
1858 
1859 static __rte_noinline uint32_t
1860 virtio_dev_rx_async_submit_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
1861 		uint16_t queue_id, struct rte_mbuf **pkts, uint32_t count,
1862 		int16_t dma_id, uint16_t vchan_id)
1863 {
1864 	uint32_t pkt_idx = 0;
1865 	uint32_t remained = count;
1866 	uint16_t n_xfer;
1867 	uint16_t num_buffers;
1868 	uint16_t num_descs;
1869 
1870 	struct vhost_async *async = vq->async;
1871 	struct async_inflight_info *pkts_info = async->pkts_info;
1872 	uint32_t pkt_err = 0;
1873 	uint16_t slot_idx = 0;
1874 
1875 	do {
1876 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1877 
1878 		num_buffers = 0;
1879 		num_descs = 0;
1880 		if (unlikely(virtio_dev_rx_async_packed(dev, vq, pkts[pkt_idx],
1881 						&num_descs, &num_buffers) < 0))
1882 			break;
1883 
1884 		slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
1885 
1886 		pkts_info[slot_idx].descs = num_descs;
1887 		pkts_info[slot_idx].nr_buffers = num_buffers;
1888 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1889 
1890 		pkt_idx++;
1891 		remained--;
1892 		vq_inc_last_avail_packed(vq, num_descs);
1893 	} while (pkt_idx < count);
1894 
1895 	if (unlikely(pkt_idx == 0))
1896 		return 0;
1897 
1898 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
1899 			async->iov_iter, pkt_idx);
1900 
1901 	async_iter_reset(async);
1902 
1903 	pkt_err = pkt_idx - n_xfer;
1904 	if (unlikely(pkt_err)) {
1905 		VHOST_LOG_DATA(DEBUG, "(%s) %s: failed to transfer %u packets for queue %u.\n",
1906 				dev->ifname, __func__, pkt_err, queue_id);
1907 		dma_error_handler_packed(vq, slot_idx, pkt_err, &pkt_idx);
1908 	}
1909 
1910 	if (likely(vq->shadow_used_idx)) {
1911 		/* Keep the used descriptors for write-back once the DMA copies complete. */
1912 		store_dma_desc_info_packed(vq->shadow_used_packed, async->buffers_packed,
1913 					vq->size, 0, async->buffer_idx_packed,
1914 					vq->shadow_used_idx);
1915 
1916 		async->buffer_idx_packed += vq->shadow_used_idx;
1917 		if (async->buffer_idx_packed >= vq->size)
1918 			async->buffer_idx_packed -= vq->size;
1919 
1920 		async->pkts_idx += pkt_idx;
1921 		if (async->pkts_idx >= vq->size)
1922 			async->pkts_idx -= vq->size;
1923 
1924 		vq->shadow_used_idx = 0;
1925 		async->pkts_inflight_n += pkt_idx;
1926 	}
1927 
1928 	return pkt_idx;
1929 }
1930 
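/*
 * Copy the used-ring entries saved for completed async copies back into the
 * guest-visible used ring of a split virtqueue. Each iteration may need two
 * memcpy calls because both the source array and the used ring wrap around.
 *
 * Illustrative example (assumed values): with vq->size == 256, to == 250 and
 * nr_copy == 10, the first memcpy writes ring entries 250..255 and the
 * second writes entries 0..3.
 */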
1931 static __rte_always_inline void
1932 write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
1933 {
1934 	struct vhost_async *async = vq->async;
1935 	uint16_t nr_left = n_descs;
1936 	uint16_t nr_copy;
1937 	uint16_t to, from;
1938 
1939 	do {
1940 		from = async->last_desc_idx_split & (vq->size - 1);
1941 		nr_copy = nr_left + from <= vq->size ? nr_left : vq->size - from;
1942 		to = vq->last_used_idx & (vq->size - 1);
1943 
1944 		if (to + nr_copy <= vq->size) {
1945 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
1946 					nr_copy * sizeof(struct vring_used_elem));
1947 		} else {
1948 			uint16_t size = vq->size - to;
1949 
1950 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
1951 					size * sizeof(struct vring_used_elem));
1952 			rte_memcpy(&vq->used->ring[0], &async->descs_split[from + size],
1953 					(nr_copy - size) * sizeof(struct vring_used_elem));
1954 		}
1955 
1956 		async->last_desc_idx_split += nr_copy;
1957 		vq->last_used_idx += nr_copy;
1958 		nr_left -= nr_copy;
1959 	} while (nr_left > 0);
1960 }
1961 
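/*
 * Write completed buffers back to the packed ring in two passes: first fill
 * in the id/len fields, then, after a release fence, set the descriptor
 * flags. The flags of the first (head) descriptor are written last, so the
 * guest never observes a partially updated batch.
 */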
1962 static __rte_always_inline void
1963 write_back_completed_descs_packed(struct vhost_virtqueue *vq,
1964 				uint16_t n_buffers)
1965 {
1966 	struct vhost_async *async = vq->async;
1967 	uint16_t from = async->last_buffer_idx_packed;
1968 	uint16_t used_idx = vq->last_used_idx;
1969 	uint16_t head_idx = vq->last_used_idx;
1970 	uint16_t head_flags = 0;
1971 	uint16_t i;
1972 
1973 	/* Split loop in two to save memory barriers */
1974 	for (i = 0; i < n_buffers; i++) {
1975 		vq->desc_packed[used_idx].id = async->buffers_packed[from].id;
1976 		vq->desc_packed[used_idx].len = async->buffers_packed[from].len;
1977 
1978 		used_idx += async->buffers_packed[from].count;
1979 		if (used_idx >= vq->size)
1980 			used_idx -= vq->size;
1981 
1982 		from++;
1983 		if (from >= vq->size)
1984 			from = 0;
1985 	}
1986 
1987 	/* The ordering for storing desc flags needs to be enforced. */
1988 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
1989 
1990 	from = async->last_buffer_idx_packed;
1991 
1992 	for (i = 0; i < n_buffers; i++) {
1993 		uint16_t flags;
1994 
1995 		if (async->buffers_packed[from].len)
1996 			flags = VRING_DESC_F_WRITE;
1997 		else
1998 			flags = 0;
1999 
2000 		if (vq->used_wrap_counter) {
2001 			flags |= VRING_DESC_F_USED;
2002 			flags |= VRING_DESC_F_AVAIL;
2003 		} else {
2004 			flags &= ~VRING_DESC_F_USED;
2005 			flags &= ~VRING_DESC_F_AVAIL;
2006 		}
2007 
2008 		if (i > 0) {
2009 			vq->desc_packed[vq->last_used_idx].flags = flags;
2010 		} else {
2011 			head_idx = vq->last_used_idx;
2012 			head_flags = flags;
2013 		}
2014 
2015 		vq_inc_last_used_packed(vq, async->buffers_packed[from].count);
2016 
2017 		from++;
2018 		if (from == vq->size)
2019 			from = 0;
2020 	}
2021 
2022 	vq->desc_packed[head_idx].flags = head_flags;
2023 	async->last_buffer_idx_packed = from;
2024 }
2025 
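/*
 * Harvest enqueue packets whose async DMA copies have completed: poll the
 * DMA vChannel, walk the per-slot completion flags starting from the first
 * in-flight packet, return the corresponding mbufs, and write the used
 * descriptors back to the ring (or only update the bookkeeping indexes when
 * the ring is not accessible).
 */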
2026 static __rte_always_inline uint16_t
2027 vhost_poll_enqueue_completed(struct virtio_net *dev, uint16_t queue_id,
2028 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2029 		uint16_t vchan_id)
2030 {
2031 	struct vhost_virtqueue *vq = dev->virtqueue[queue_id];
2032 	struct vhost_async *async = vq->async;
2033 	struct async_inflight_info *pkts_info = async->pkts_info;
2034 	uint16_t nr_cpl_pkts = 0;
2035 	uint16_t n_descs = 0, n_buffers = 0;
2036 	uint16_t start_idx, from, i;
2037 
2038 	/* Check completed copies for the given DMA vChannel */
2039 	vhost_async_dma_check_completed(dev, dma_id, vchan_id, VHOST_DMA_MAX_COPY_COMPLETE);
2040 
2041 	start_idx = async_get_first_inflight_pkt_idx(vq);
2042 	/**
2043 	 * Count the packets whose copies have completed.
2044 	 * Note that there may be completed packets even if
2045 	 * no copies are reported done by the given DMA vChannel,
2046 	 * as it's possible that a virtqueue uses multiple DMA
2047 	 * vChannels.
2048 	 */
2049 	from = start_idx;
2050 	while (vq->async->pkts_cmpl_flag[from] && count--) {
2051 		vq->async->pkts_cmpl_flag[from] = false;
2052 		from++;
2053 		if (from >= vq->size)
2054 			from -= vq->size;
2055 		nr_cpl_pkts++;
2056 	}
2057 
2058 	if (nr_cpl_pkts == 0)
2059 		return 0;
2060 
2061 	for (i = 0; i < nr_cpl_pkts; i++) {
2062 		from = (start_idx + i) % vq->size;
2063 		/* Only used with packed ring */
2064 		n_buffers += pkts_info[from].nr_buffers;
2065 		/* Only used with split ring */
2066 		n_descs += pkts_info[from].descs;
2067 		pkts[i] = pkts_info[from].mbuf;
2068 	}
2069 
2070 	async->pkts_inflight_n -= nr_cpl_pkts;
2071 
2072 	if (likely(vq->enabled && vq->access_ok)) {
2073 		if (vq_is_packed(dev)) {
2074 			write_back_completed_descs_packed(vq, n_buffers);
2075 			vhost_vring_call_packed(dev, vq);
2076 		} else {
2077 			write_back_completed_descs_split(vq, n_descs);
2078 			__atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
2079 			vhost_vring_call_split(dev, vq);
2080 		}
2081 	} else {
2082 		if (vq_is_packed(dev)) {
2083 			async->last_buffer_idx_packed += n_buffers;
2084 			if (async->last_buffer_idx_packed >= vq->size)
2085 				async->last_buffer_idx_packed -= vq->size;
2086 		} else {
2087 			async->last_desc_idx_split += n_descs;
2088 		}
2089 	}
2090 
2091 	return nr_cpl_pkts;
2092 }
2093 
2094 uint16_t
2095 rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
2096 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2097 		uint16_t vchan_id)
2098 {
2099 	struct virtio_net *dev = get_device(vid);
2100 	struct vhost_virtqueue *vq;
2101 	uint16_t n_pkts_cpl = 0;
2102 
2103 	if (unlikely(!dev))
2104 		return 0;
2105 
2106 	VHOST_LOG_DATA(DEBUG, "(%s) %s\n", dev->ifname, __func__);
2107 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2108 		VHOST_LOG_DATA(ERR, "(%s) %s: invalid virtqueue idx %d.\n",
2109 			dev->ifname, __func__, queue_id);
2110 		return 0;
2111 	}
2112 
2113 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2114 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2115 		VHOST_LOG_DATA(ERR, "(%s) %s: invalid channel %d:%u.\n", dev->ifname, __func__,
2116 			       dma_id, vchan_id);
2117 		return 0;
2118 	}
2119 
2120 	vq = dev->virtqueue[queue_id];
2121 
2122 	if (!rte_spinlock_trylock(&vq->access_lock)) {
2123 		VHOST_LOG_DATA(DEBUG, "(%s) %s: virtqueue %u is busy.\n", dev->ifname, __func__,
2124 				queue_id);
2125 		return 0;
2126 	}
2127 
2128 	if (unlikely(!vq->async)) {
2129 		VHOST_LOG_DATA(ERR, "(%s) %s: async not registered for virtqueue %d.\n",
2130 				dev->ifname, __func__, queue_id);
2131 		goto out;
2132 	}
2133 
2134 	n_pkts_cpl = vhost_poll_enqueue_completed(dev, queue_id, pkts, count, dma_id, vchan_id);
2135 
2136 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2137 	vq->stats.inflight_completed += n_pkts_cpl;
2138 
2139 out:
2140 	rte_spinlock_unlock(&vq->access_lock);
2141 
2142 	return n_pkts_cpl;
2143 }
2144 
2145 uint16_t
2146 rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
2147 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2148 		uint16_t vchan_id)
2149 {
2150 	struct virtio_net *dev = get_device(vid);
2151 	struct vhost_virtqueue *vq;
2152 	uint16_t n_pkts_cpl = 0;
2153 
2154 	if (!dev)
2155 		return 0;
2156 
2157 	VHOST_LOG_DATA(DEBUG, "(%s) %s\n", dev->ifname, __func__);
2158 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2159 		VHOST_LOG_DATA(ERR, "(%s) %s: invalid virtqueue idx %d.\n",
2160 			dev->ifname, __func__, queue_id);
2161 		return 0;
2162 	}
2163 
2164 	vq = dev->virtqueue[queue_id];
2165 
2166 	if (unlikely(!rte_spinlock_is_locked(&vq->access_lock))) {
2167 		VHOST_LOG_DATA(ERR, "(%s) %s() called without access lock taken.\n",
2168 				dev->ifname, __func__);
2169 		return -1;
2170 	}
2171 
2172 	if (unlikely(!vq->async)) {
2173 		VHOST_LOG_DATA(ERR, "(%s) %s: async not registered for queue id %d.\n",
2174 			dev->ifname, __func__, queue_id);
2175 		return 0;
2176 	}
2177 
2178 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2179 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2180 		VHOST_LOG_DATA(ERR, "(%s) %s: invalid channel %d:%u.\n", dev->ifname, __func__,
2181 				dma_id, vchan_id);
2182 		return 0;
2183 	}
2184 
2185 	n_pkts_cpl = vhost_poll_enqueue_completed(dev, queue_id, pkts, count, dma_id, vchan_id);
2186 
2187 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2188 	vq->stats.inflight_completed += n_pkts_cpl;
2189 
2190 	return n_pkts_cpl;
2191 }
2192 
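/*
 * Common entry point for async enqueue: validate the virtqueue and DMA
 * vChannel, take the access lock (and the IOTLB read lock when an IOMMU is
 * in use), translate the rings if needed, then dispatch to the packed or
 * split submit routine.
 */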
2193 static __rte_always_inline uint32_t
2194 virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
2195 	struct rte_mbuf **pkts, uint32_t count, int16_t dma_id, uint16_t vchan_id)
2196 {
2197 	struct vhost_virtqueue *vq;
2198 	uint32_t nb_tx = 0;
2199 
2200 	VHOST_LOG_DATA(DEBUG, "(%s) %s\n", dev->ifname, __func__);
2201 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2202 		VHOST_LOG_DATA(ERR, "(%s) %s: invalid virtqueue idx %d.\n",
2203 			dev->ifname, __func__, queue_id);
2204 		return 0;
2205 	}
2206 
2207 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2208 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2209 		VHOST_LOG_DATA(ERR, "(%s) %s: invalid channel %d:%u.\n", dev->ifname, __func__,
2210 			       dma_id, vchan_id);
2211 		return 0;
2212 	}
2213 
2214 	vq = dev->virtqueue[queue_id];
2215 
2216 	rte_spinlock_lock(&vq->access_lock);
2217 
2218 	if (unlikely(!vq->enabled || !vq->async))
2219 		goto out_access_unlock;
2220 
2221 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
2222 		vhost_user_iotlb_rd_lock(vq);
2223 
2224 	if (unlikely(!vq->access_ok))
2225 		if (unlikely(vring_translate(dev, vq) < 0))
2226 			goto out;
2227 
2228 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
2229 	if (count == 0)
2230 		goto out;
2231 
2232 	if (vq_is_packed(dev))
2233 		nb_tx = virtio_dev_rx_async_submit_packed(dev, vq, queue_id,
2234 				pkts, count, dma_id, vchan_id);
2235 	else
2236 		nb_tx = virtio_dev_rx_async_submit_split(dev, vq, queue_id,
2237 				pkts, count, dma_id, vchan_id);
2238 
2239 	vq->stats.inflight_submitted += nb_tx;
2240 
2241 out:
2242 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
2243 		vhost_user_iotlb_rd_unlock(vq);
2244 
2245 out_access_unlock:
2246 	rte_spinlock_unlock(&vq->access_lock);
2247 
2248 	return nb_tx;
2249 }
2250 
2251 uint16_t
2252 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
2253 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2254 		uint16_t vchan_id)
2255 {
2256 	struct virtio_net *dev = get_device(vid);
2257 
2258 	if (!dev)
2259 		return 0;
2260 
2261 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
2262 		VHOST_LOG_DATA(ERR, "(%s) %s: built-in vhost net backend is disabled.\n",
2263 			dev->ifname, __func__);
2264 		return 0;
2265 	}
2266 
2267 	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count, dma_id, vchan_id);
2268 }
2269 
2270 static inline bool
2271 virtio_net_with_host_offload(struct virtio_net *dev)
2272 {
2273 	if (dev->features &
2274 			((1ULL << VIRTIO_NET_F_CSUM) |
2275 			 (1ULL << VIRTIO_NET_F_HOST_ECN) |
2276 			 (1ULL << VIRTIO_NET_F_HOST_TSO4) |
2277 			 (1ULL << VIRTIO_NET_F_HOST_TSO6) |
2278 			 (1ULL << VIRTIO_NET_F_HOST_UFO)))
2279 		return true;
2280 
2281 	return false;
2282 }
2283 
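/*
 * Validate and parse the Ethernet/L3/L4 headers of an mbuf, assuming they
 * all reside in the first segment. On success, l2_len/l3_len and the
 * RTE_MBUF_F_TX_IPV4/IPV6 flag are set and *l4_proto holds the L4 protocol;
 * on failure the offload fields are cleared and -EINVAL is returned.
 *
 * Hypothetical caller sketch, for illustration only:
 *
 *	uint8_t l4_proto = 0;
 *
 *	if (parse_headers(m, &l4_proto) == 0 && l4_proto == IPPROTO_TCP)
 *		m->ol_flags |= RTE_MBUF_F_TX_TCP_CKSUM;
 */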
2284 static int
2285 parse_headers(struct rte_mbuf *m, uint8_t *l4_proto)
2286 {
2287 	struct rte_ipv4_hdr *ipv4_hdr;
2288 	struct rte_ipv6_hdr *ipv6_hdr;
2289 	struct rte_ether_hdr *eth_hdr;
2290 	uint16_t ethertype;
2291 	uint16_t data_len = rte_pktmbuf_data_len(m);
2292 
2293 	if (data_len < sizeof(struct rte_ether_hdr))
2294 		return -EINVAL;
2295 
2296 	eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
2297 
2298 	m->l2_len = sizeof(struct rte_ether_hdr);
2299 	ethertype = rte_be_to_cpu_16(eth_hdr->ether_type);
2300 
2301 	if (ethertype == RTE_ETHER_TYPE_VLAN) {
2302 		if (data_len < sizeof(struct rte_ether_hdr) +
2303 				sizeof(struct rte_vlan_hdr))
2304 			goto error;
2305 
2306 		struct rte_vlan_hdr *vlan_hdr =
2307 			(struct rte_vlan_hdr *)(eth_hdr + 1);
2308 
2309 		m->l2_len += sizeof(struct rte_vlan_hdr);
2310 		ethertype = rte_be_to_cpu_16(vlan_hdr->eth_proto);
2311 	}
2312 
2313 	switch (ethertype) {
2314 	case RTE_ETHER_TYPE_IPV4:
2315 		if (data_len < m->l2_len + sizeof(struct rte_ipv4_hdr))
2316 			goto error;
2317 		ipv4_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
2318 				m->l2_len);
2319 		m->l3_len = rte_ipv4_hdr_len(ipv4_hdr);
2320 		if (data_len < m->l2_len + m->l3_len)
2321 			goto error;
2322 		m->ol_flags |= RTE_MBUF_F_TX_IPV4;
2323 		*l4_proto = ipv4_hdr->next_proto_id;
2324 		break;
2325 	case RTE_ETHER_TYPE_IPV6:
2326 		if (data_len < m->l2_len + sizeof(struct rte_ipv6_hdr))
2327 			goto error;
2328 		ipv6_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv6_hdr *,
2329 				m->l2_len);
2330 		m->l3_len = sizeof(struct rte_ipv6_hdr);
2331 		m->ol_flags |= RTE_MBUF_F_TX_IPV6;
2332 		*l4_proto = ipv6_hdr->proto;
2333 		break;
2334 	default:
2335 		/* a valid L3 header is needed for further L4 parsing */
2336 		goto error;
2337 	}
2338 
2339 	/* both CSUM and GSO need a valid L4 header */
2340 	switch (*l4_proto) {
2341 	case IPPROTO_TCP:
2342 		if (data_len < m->l2_len + m->l3_len +
2343 				sizeof(struct rte_tcp_hdr))
2344 			goto error;
2345 		break;
2346 	case IPPROTO_UDP:
2347 		if (data_len < m->l2_len + m->l3_len +
2348 				sizeof(struct rte_udp_hdr))
2349 			goto error;
2350 		break;
2351 	case IPPROTO_SCTP:
2352 		if (data_len < m->l2_len + m->l3_len +
2353 				sizeof(struct rte_sctp_hdr))
2354 			goto error;
2355 		break;
2356 	default:
2357 		goto error;
2358 	}
2359 
2360 	return 0;
2361 
2362 error:
2363 	m->l2_len = 0;
2364 	m->l3_len = 0;
2365 	m->ol_flags = 0;
2366 	return -EINVAL;
2367 }
2368 
2369 static __rte_always_inline void
2370 vhost_dequeue_offload_legacy(struct virtio_net *dev, struct virtio_net_hdr *hdr,
2371 		struct rte_mbuf *m)
2372 {
2373 	uint8_t l4_proto = 0;
2374 	struct rte_tcp_hdr *tcp_hdr = NULL;
2375 	uint16_t tcp_len;
2376 	uint16_t data_len = rte_pktmbuf_data_len(m);
2377 
2378 	if (parse_headers(m, &l4_proto) < 0)
2379 		return;
2380 
2381 	if (hdr->flags == VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2382 		if (hdr->csum_start == (m->l2_len + m->l3_len)) {
2383 			switch (hdr->csum_offset) {
2384 			case (offsetof(struct rte_tcp_hdr, cksum)):
2385 				if (l4_proto != IPPROTO_TCP)
2386 					goto error;
2387 				m->ol_flags |= RTE_MBUF_F_TX_TCP_CKSUM;
2388 				break;
2389 			case (offsetof(struct rte_udp_hdr, dgram_cksum)):
2390 				if (l4_proto != IPPROTO_UDP)
2391 					goto error;
2392 				m->ol_flags |= RTE_MBUF_F_TX_UDP_CKSUM;
2393 				break;
2394 			case (offsetof(struct rte_sctp_hdr, cksum)):
2395 				if (l4_proto != IPPROTO_SCTP)
2396 					goto error;
2397 				m->ol_flags |= RTE_MBUF_F_TX_SCTP_CKSUM;
2398 				break;
2399 			default:
2400 				goto error;
2401 			}
2402 		} else {
2403 			goto error;
2404 		}
2405 	}
2406 
2407 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2408 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2409 		case VIRTIO_NET_HDR_GSO_TCPV4:
2410 		case VIRTIO_NET_HDR_GSO_TCPV6:
2411 			if (l4_proto != IPPROTO_TCP)
2412 				goto error;
2413 			tcp_hdr = rte_pktmbuf_mtod_offset(m,
2414 					struct rte_tcp_hdr *,
2415 					m->l2_len + m->l3_len);
2416 			tcp_len = (tcp_hdr->data_off & 0xf0) >> 2;
2417 			if (data_len < m->l2_len + m->l3_len + tcp_len)
2418 				goto error;
2419 			m->ol_flags |= RTE_MBUF_F_TX_TCP_SEG;
2420 			m->tso_segsz = hdr->gso_size;
2421 			m->l4_len = tcp_len;
2422 			break;
2423 		case VIRTIO_NET_HDR_GSO_UDP:
2424 			if (l4_proto != IPPROTO_UDP)
2425 				goto error;
2426 			m->ol_flags |= RTE_MBUF_F_TX_UDP_SEG;
2427 			m->tso_segsz = hdr->gso_size;
2428 			m->l4_len = sizeof(struct rte_udp_hdr);
2429 			break;
2430 		default:
2431 			VHOST_LOG_DATA(WARNING, "(%s) unsupported gso type %u.\n",
2432 					dev->ifname, hdr->gso_type);
2433 			goto error;
2434 		}
2435 	}
2436 	return;
2437 
2438 error:
2439 	m->l2_len = 0;
2440 	m->l3_len = 0;
2441 	m->ol_flags = 0;
2442 }
2443 
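/*
 * Translate the virtio-net header of a dequeued packet into mbuf offload
 * metadata. With legacy_ol_flags the historical behaviour is kept and Tx
 * offload flags are set; otherwise Rx offload flags are used and, for
 * packets with an unrecognized checksum layout, the checksum is computed
 * in software.
 */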
2444 static __rte_always_inline void
2445 vhost_dequeue_offload(struct virtio_net *dev, struct virtio_net_hdr *hdr,
2446 		struct rte_mbuf *m, bool legacy_ol_flags)
2447 {
2448 	struct rte_net_hdr_lens hdr_lens;
2449 	int l4_supported = 0;
2450 	uint32_t ptype;
2451 
2452 	if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
2453 		return;
2454 
2455 	if (legacy_ol_flags) {
2456 		vhost_dequeue_offload_legacy(dev, hdr, m);
2457 		return;
2458 	}
2459 
2460 	m->ol_flags |= RTE_MBUF_F_RX_IP_CKSUM_UNKNOWN;
2461 
2462 	ptype = rte_net_get_ptype(m, &hdr_lens, RTE_PTYPE_ALL_MASK);
2463 	m->packet_type = ptype;
2464 	if ((ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_TCP ||
2465 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_UDP ||
2466 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_SCTP)
2467 		l4_supported = 1;
2468 
2469 	/* According to Virtio 1.1 spec, the device only needs to look at
2470 	 * VIRTIO_NET_HDR_F_NEEDS_CSUM in the packet transmission path.
2471 	 * This differs from the path for processing incoming packets, where
2472 	 * the driver can rely on the VIRTIO_NET_HDR_F_DATA_VALID flag set by
2473 	 * the device.
2474 	 *
2475 	 * 5.1.6.2.1 Driver Requirements: Packet Transmission
2476 	 * The driver MUST NOT set the VIRTIO_NET_HDR_F_DATA_VALID and
2477 	 * VIRTIO_NET_HDR_F_RSC_INFO bits in flags.
2478 	 *
2479 	 * 5.1.6.2.2 Device Requirements: Packet Transmission
2480 	 * The device MUST ignore flag bits that it does not recognize.
2481 	 */
2482 	if (hdr->flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2483 		uint32_t hdrlen;
2484 
2485 		hdrlen = hdr_lens.l2_len + hdr_lens.l3_len + hdr_lens.l4_len;
2486 		if (hdr->csum_start <= hdrlen && l4_supported != 0) {
2487 			m->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_NONE;
2488 		} else {
2489 			/* Unknown protocol or tunnel: do a software checksum.
2490 			 * We can assume the checksum field is in the first
2491 			 * segment, since the buffers we provided to the host
2492 			 * are large enough. In the case of SCTP this will be
2493 			 * wrong, as SCTP uses a CRC, but there is nothing we can do.
2494 			 */
2495 			uint16_t csum = 0, off;
2496 
2497 			if (rte_raw_cksum_mbuf(m, hdr->csum_start,
2498 					rte_pktmbuf_pkt_len(m) - hdr->csum_start, &csum) < 0)
2499 				return;
2500 			if (likely(csum != 0xffff))
2501 				csum = ~csum;
2502 			off = hdr->csum_offset + hdr->csum_start;
2503 			if (rte_pktmbuf_data_len(m) >= off + 1)
2504 				*rte_pktmbuf_mtod_offset(m, uint16_t *, off) = csum;
2505 		}
2506 	}
2507 
2508 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2509 		if (hdr->gso_size == 0)
2510 			return;
2511 
2512 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2513 		case VIRTIO_NET_HDR_GSO_TCPV4:
2514 		case VIRTIO_NET_HDR_GSO_TCPV6:
2515 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_TCP)
2516 				break;
2517 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2518 			m->tso_segsz = hdr->gso_size;
2519 			break;
2520 		case VIRTIO_NET_HDR_GSO_UDP:
2521 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_UDP)
2522 				break;
2523 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2524 			m->tso_segsz = hdr->gso_size;
2525 			break;
2526 		default:
2527 			break;
2528 		}
2529 	}
2530 }
2531 
2532 static __rte_noinline void
2533 copy_vnet_hdr_from_desc(struct virtio_net_hdr *hdr,
2534 		struct buf_vector *buf_vec)
2535 {
2536 	uint64_t len;
2537 	uint64_t remain = sizeof(struct virtio_net_hdr);
2538 	uint64_t src;
2539 	uint64_t dst = (uint64_t)(uintptr_t)hdr;
2540 
2541 	while (remain) {
2542 		len = RTE_MIN(remain, buf_vec->buf_len);
2543 		src = buf_vec->buf_addr;
2544 		rte_memcpy((void *)(uintptr_t)dst,
2545 				(void *)(uintptr_t)src, len);
2546 
2547 		remain -= len;
2548 		dst += len;
2549 		buf_vec++;
2550 	}
2551 }
2552 
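/*
 * Copy one descriptor chain into an mbuf, chaining extra mbufs from
 * mbuf_pool as needed. When is_async is true, the data copies are staged as
 * DMA segments and the virtio-net header is stashed in pkts_info[slot_idx]
 * so offloads can be applied once the copies complete; otherwise the copies
 * are done synchronously and offloads are applied immediately.
 */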
2553 static __rte_always_inline int
2554 desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
2555 		  struct buf_vector *buf_vec, uint16_t nr_vec,
2556 		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
2557 		  bool legacy_ol_flags, uint16_t slot_idx, bool is_async)
2558 {
2559 	uint32_t buf_avail, buf_offset, buf_len;
2560 	uint64_t buf_addr, buf_iova;
2561 	uint32_t mbuf_avail, mbuf_offset;
2562 	uint32_t cpy_len;
2563 	struct rte_mbuf *cur = m, *prev = m;
2564 	struct virtio_net_hdr tmp_hdr;
2565 	struct virtio_net_hdr *hdr = NULL;
2566 	/* A counter to guard against an endless descriptor chain loop */
2567 	uint16_t vec_idx = 0;
2568 	struct vhost_async *async = vq->async;
2569 	struct async_inflight_info *pkts_info;
2570 
2571 	buf_addr = buf_vec[vec_idx].buf_addr;
2572 	buf_iova = buf_vec[vec_idx].buf_iova;
2573 	buf_len = buf_vec[vec_idx].buf_len;
2574 
2575 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
2576 		return -1;
2577 
2578 	if (virtio_net_with_host_offload(dev)) {
2579 		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
2580 			/*
2581 			 * No luck, the virtio-net header doesn't fit
2582 			 * in a contiguous virtual area.
2583 			 */
2584 			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
2585 			hdr = &tmp_hdr;
2586 		} else {
2587 			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
2588 		}
2589 	}
2590 
2591 	/*
2592 	 * A virtio driver normally uses at least 2 desc buffers
2593 	 * for Tx: the first for storing the header, and the others
2594 	 * for storing the data.
2595 	 */
2596 	if (unlikely(buf_len < dev->vhost_hlen)) {
2597 		buf_offset = dev->vhost_hlen - buf_len;
2598 		vec_idx++;
2599 		buf_addr = buf_vec[vec_idx].buf_addr;
2600 		buf_iova = buf_vec[vec_idx].buf_iova;
2601 		buf_len = buf_vec[vec_idx].buf_len;
2602 		buf_avail  = buf_len - buf_offset;
2603 	} else if (buf_len == dev->vhost_hlen) {
2604 		if (unlikely(++vec_idx >= nr_vec))
2605 			goto error;
2606 		buf_addr = buf_vec[vec_idx].buf_addr;
2607 		buf_iova = buf_vec[vec_idx].buf_iova;
2608 		buf_len = buf_vec[vec_idx].buf_len;
2609 
2610 		buf_offset = 0;
2611 		buf_avail = buf_len;
2612 	} else {
2613 		buf_offset = dev->vhost_hlen;
2614 		buf_avail = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
2615 	}
2616 
2617 	PRINT_PACKET(dev,
2618 			(uintptr_t)(buf_addr + buf_offset),
2619 			(uint32_t)buf_avail, 0);
2620 
2621 	mbuf_offset = 0;
2622 	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
2623 
2624 	if (is_async) {
2625 		pkts_info = async->pkts_info;
2626 		if (async_iter_initialize(dev, async))
2627 			return -1;
2628 	}
2629 
2630 	while (1) {
2631 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
2632 
2633 		if (is_async) {
2634 			if (async_fill_seg(dev, vq, cur, mbuf_offset,
2635 					   buf_iova + buf_offset, cpy_len, false) < 0)
2636 				goto error;
2637 		} else {
2638 			sync_fill_seg(dev, vq, cur, mbuf_offset,
2639 				      buf_addr + buf_offset,
2640 				      buf_iova + buf_offset, cpy_len, false);
2641 		}
2642 
2643 		mbuf_avail  -= cpy_len;
2644 		mbuf_offset += cpy_len;
2645 		buf_avail -= cpy_len;
2646 		buf_offset += cpy_len;
2647 
2648 		/* This buffer is exhausted, get the next one */
2649 		if (buf_avail == 0) {
2650 			if (++vec_idx >= nr_vec)
2651 				break;
2652 
2653 			buf_addr = buf_vec[vec_idx].buf_addr;
2654 			buf_iova = buf_vec[vec_idx].buf_iova;
2655 			buf_len = buf_vec[vec_idx].buf_len;
2656 
2657 			buf_offset = 0;
2658 			buf_avail  = buf_len;
2659 
2660 			PRINT_PACKET(dev, (uintptr_t)buf_addr,
2661 					(uint32_t)buf_avail, 0);
2662 		}
2663 
2664 		/*
2665 		 * This mbuf is full, get a new one
2666 		 * to hold more data.
2667 		 */
2668 		if (mbuf_avail == 0) {
2669 			cur = rte_pktmbuf_alloc(mbuf_pool);
2670 			if (unlikely(cur == NULL)) {
2671 				VHOST_LOG_DATA(ERR, "(%s) failed to allocate memory for mbuf.\n",
2672 						dev->ifname);
2673 				goto error;
2674 			}
2675 
2676 			prev->next = cur;
2677 			prev->data_len = mbuf_offset;
2678 			m->nb_segs += 1;
2679 			m->pkt_len += mbuf_offset;
2680 			prev = cur;
2681 
2682 			mbuf_offset = 0;
2683 			mbuf_avail  = cur->buf_len - RTE_PKTMBUF_HEADROOM;
2684 		}
2685 	}
2686 
2687 	prev->data_len = mbuf_offset;
2688 	m->pkt_len    += mbuf_offset;
2689 
2690 	if (is_async) {
2691 		async_iter_finalize(async);
2692 		if (hdr)
2693 			pkts_info[slot_idx].nethdr = *hdr;
2694 	} else {
2695 		if (hdr)
2696 			vhost_dequeue_offload(dev, hdr, m, legacy_ol_flags);
2697 	}
2698 
2699 	return 0;
2700 error:
2701 	if (is_async)
2702 		async_iter_cancel(async);
2703 
2704 	return -1;
2705 }
2706 
2707 static void
2708 virtio_dev_extbuf_free(void *addr __rte_unused, void *opaque)
2709 {
2710 	rte_free(opaque);
2711 }
2712 
2713 static int
2714 virtio_dev_extbuf_alloc(struct virtio_net *dev, struct rte_mbuf *pkt, uint32_t size)
2715 {
2716 	struct rte_mbuf_ext_shared_info *shinfo = NULL;
2717 	uint32_t total_len = RTE_PKTMBUF_HEADROOM + size;
2718 	uint16_t buf_len;
2719 	rte_iova_t iova;
2720 	void *buf;
2721 
2722 	total_len += sizeof(*shinfo) + sizeof(uintptr_t);
2723 	total_len = RTE_ALIGN_CEIL(total_len, sizeof(uintptr_t));
2724 
2725 	if (unlikely(total_len > UINT16_MAX))
2726 		return -ENOSPC;
2727 
2728 	buf_len = total_len;
2729 	buf = rte_malloc(NULL, buf_len, RTE_CACHE_LINE_SIZE);
2730 	if (unlikely(buf == NULL))
2731 		return -ENOMEM;
2732 
2733 	/* Initialize shinfo */
2734 	shinfo = rte_pktmbuf_ext_shinfo_init_helper(buf, &buf_len,
2735 						virtio_dev_extbuf_free, buf);
2736 	if (unlikely(shinfo == NULL)) {
2737 		rte_free(buf);
2738 		VHOST_LOG_DATA(ERR, "(%s) failed to init shinfo\n", dev->ifname);
2739 		return -1;
2740 	}
2741 
2742 	iova = rte_malloc_virt2iova(buf);
2743 	rte_pktmbuf_attach_extbuf(pkt, buf, iova, buf_len, shinfo);
2744 	rte_pktmbuf_reset_headroom(pkt);
2745 
2746 	return 0;
2747 }
2748 
2749 /*
2750  * Prepare a pktmbuf capable of holding a packet of the given length.
2751  */
2752 static __rte_always_inline int
2753 virtio_dev_pktmbuf_prep(struct virtio_net *dev, struct rte_mbuf *pkt,
2754 			 uint32_t data_len)
2755 {
2756 	if (rte_pktmbuf_tailroom(pkt) >= data_len)
2757 		return 0;
2758 
2759 	/* attach an external buffer if supported */
2760 	if (dev->extbuf && !virtio_dev_extbuf_alloc(dev, pkt, data_len))
2761 		return 0;
2762 
2763 	/* check if chained buffers are allowed */
2764 	if (!dev->linearbuf)
2765 		return 0;
2766 
2767 	return -1;
2768 }
2769 
2770 __rte_always_inline
2771 static uint16_t
2772 virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
2773 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
2774 	bool legacy_ol_flags)
2775 {
2776 	uint16_t i;
2777 	uint16_t free_entries;
2778 	uint16_t dropped = 0;
2779 	static bool allocerr_warned;
2780 
2781 	/*
2782 	 * The ordering between avail index and
2783 	 * desc reads needs to be enforced.
2784 	 */
2785 	free_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) -
2786 			vq->last_avail_idx;
2787 	if (free_entries == 0)
2788 		return 0;
2789 
2790 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
2791 
2792 	VHOST_LOG_DATA(DEBUG, "(%s) %s\n", dev->ifname, __func__);
2793 
2794 	count = RTE_MIN(count, MAX_PKT_BURST);
2795 	count = RTE_MIN(count, free_entries);
2796 	VHOST_LOG_DATA(DEBUG, "(%s) about to dequeue %u buffers\n",
2797 			dev->ifname, count);
2798 
2799 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count))
2800 		return 0;
2801 
2802 	for (i = 0; i < count; i++) {
2803 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
2804 		uint16_t head_idx;
2805 		uint32_t buf_len;
2806 		uint16_t nr_vec = 0;
2807 		int err;
2808 
2809 		if (unlikely(fill_vec_buf_split(dev, vq,
2810 						vq->last_avail_idx + i,
2811 						&nr_vec, buf_vec,
2812 						&head_idx, &buf_len,
2813 						VHOST_ACCESS_RO) < 0))
2814 			break;
2815 
2816 		update_shadow_used_ring_split(vq, head_idx, 0);
2817 
2818 		err = virtio_dev_pktmbuf_prep(dev, pkts[i], buf_len);
2819 		if (unlikely(err)) {
2820 			/*
2821 			 * mbuf allocation fails for jumbo packets when external
2822 			 * buffer allocation is not allowed and a linear buffer
2823 			 * is required. Drop this packet.
2824 			 */
2825 			if (!allocerr_warned) {
2826 				VHOST_LOG_DATA(ERR, "(%s) failed mbuf alloc of size %d from %s.\n",
2827 					dev->ifname, buf_len, mbuf_pool->name);
2828 				allocerr_warned = true;
2829 			}
2830 			dropped += 1;
2831 			i++;
2832 			break;
2833 		}
2834 
2835 		err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
2836 				   mbuf_pool, legacy_ol_flags, 0, false);
2837 		if (unlikely(err)) {
2838 			if (!allocerr_warned) {
2839 				VHOST_LOG_DATA(ERR, "(%s) failed to copy desc to mbuf.\n",
2840 					dev->ifname);
2841 				allocerr_warned = true;
2842 			}
2843 			dropped += 1;
2844 			i++;
2845 			break;
2846 		}
2847 
2848 	}
2849 
2850 	if (dropped)
2851 		rte_pktmbuf_free_bulk(&pkts[i - 1], count - i + 1);
2852 
2853 	vq->last_avail_idx += i;
2854 
2855 	do_data_copy_dequeue(vq);
2856 	if (unlikely(i < count))
2857 		vq->shadow_used_idx = i;
2858 	if (likely(vq->shadow_used_idx)) {
2859 		flush_shadow_used_ring_split(dev, vq);
2860 		vhost_vring_call_split(dev, vq);
2861 	}
2862 
2863 	return (i - dropped);
2864 }
2865 
2866 __rte_noinline
2867 static uint16_t
2868 virtio_dev_tx_split_legacy(struct virtio_net *dev,
2869 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
2870 	struct rte_mbuf **pkts, uint16_t count)
2871 {
2872 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, true);
2873 }
2874 
2875 __rte_noinline
2876 static uint16_t
2877 virtio_dev_tx_split_compliant(struct virtio_net *dev,
2878 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
2879 	struct rte_mbuf **pkts, uint16_t count)
2880 {
2881 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, false);
2882 }
2883 
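/*
 * Check whether the next PACKED_BATCH_SIZE descriptors can be dequeued as a
 * batch: the available index must be batch aligned and must not wrap, every
 * descriptor must be available and suitable for the batched (single-buffer)
 * path, all addresses must translate, and each mbuf must have enough room
 * for its packet. On success, the translated addresses and buffer ids are
 * returned.
 */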
2884 static __rte_always_inline int
2885 vhost_reserve_avail_batch_packed(struct virtio_net *dev,
2886 				 struct vhost_virtqueue *vq,
2887 				 struct rte_mbuf **pkts,
2888 				 uint16_t avail_idx,
2889 				 uintptr_t *desc_addrs,
2890 				 uint16_t *ids)
2891 {
2892 	bool wrap = vq->avail_wrap_counter;
2893 	struct vring_packed_desc *descs = vq->desc_packed;
2894 	uint64_t lens[PACKED_BATCH_SIZE];
2895 	uint64_t buf_lens[PACKED_BATCH_SIZE];
2896 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
2897 	uint16_t flags, i;
2898 
2899 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
2900 		return -1;
2901 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
2902 		return -1;
2903 
2904 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2905 		flags = descs[avail_idx + i].flags;
2906 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
2907 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
2908 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
2909 			return -1;
2910 	}
2911 
2912 	rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
2913 
2914 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2915 		lens[i] = descs[avail_idx + i].len;
2916 
2917 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2918 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
2919 						  descs[avail_idx + i].addr,
2920 						  &lens[i], VHOST_ACCESS_RW);
2921 	}
2922 
2923 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2924 		if (unlikely(!desc_addrs[i]))
2925 			return -1;
2926 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
2927 			return -1;
2928 	}
2929 
2930 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2931 		if (virtio_dev_pktmbuf_prep(dev, pkts[i], lens[i]))
2932 			goto err;
2933 	}
2934 
2935 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2936 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
2937 
2938 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2939 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
2940 			goto err;
2941 	}
2942 
2943 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2944 		pkts[i]->pkt_len = lens[i] - buf_offset;
2945 		pkts[i]->data_len = pkts[i]->pkt_len;
2946 		ids[i] = descs[avail_idx + i].id;
2947 	}
2948 
2949 	return 0;
2950 
2951 err:
2952 	return -1;
2953 }
2954 
2955 static __rte_always_inline int
2956 virtio_dev_tx_batch_packed(struct virtio_net *dev,
2957 			   struct vhost_virtqueue *vq,
2958 			   struct rte_mbuf **pkts,
2959 			   bool legacy_ol_flags)
2960 {
2961 	uint16_t avail_idx = vq->last_avail_idx;
2962 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
2963 	struct virtio_net_hdr *hdr;
2964 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
2965 	uint16_t ids[PACKED_BATCH_SIZE];
2966 	uint16_t i;
2967 
2968 	if (vhost_reserve_avail_batch_packed(dev, vq, pkts, avail_idx,
2969 					     desc_addrs, ids))
2970 		return -1;
2971 
2972 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2973 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
2974 
2975 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2976 		rte_memcpy(rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
2977 			   (void *)(uintptr_t)(desc_addrs[i] + buf_offset),
2978 			   pkts[i]->pkt_len);
2979 
2980 	if (virtio_net_with_host_offload(dev)) {
2981 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2982 			hdr = (struct virtio_net_hdr *)(desc_addrs[i]);
2983 			vhost_dequeue_offload(dev, hdr, pkts[i], legacy_ol_flags);
2984 		}
2985 	}
2986 
2987 	if (virtio_net_is_inorder(dev))
2988 		vhost_shadow_dequeue_batch_packed_inorder(vq,
2989 			ids[PACKED_BATCH_SIZE - 1]);
2990 	else
2991 		vhost_shadow_dequeue_batch_packed(dev, vq, ids);
2992 
2993 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
2994 
2995 	return 0;
2996 }
2997 
2998 static __rte_always_inline int
2999 vhost_dequeue_single_packed(struct virtio_net *dev,
3000 			    struct vhost_virtqueue *vq,
3001 			    struct rte_mempool *mbuf_pool,
3002 			    struct rte_mbuf *pkts,
3003 			    uint16_t *buf_id,
3004 			    uint16_t *desc_count,
3005 			    bool legacy_ol_flags)
3006 {
3007 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
3008 	uint32_t buf_len;
3009 	uint16_t nr_vec = 0;
3010 	int err;
3011 	static bool allocerr_warned;
3012 
3013 	if (unlikely(fill_vec_buf_packed(dev, vq,
3014 					 vq->last_avail_idx, desc_count,
3015 					 buf_vec, &nr_vec,
3016 					 buf_id, &buf_len,
3017 					 VHOST_ACCESS_RO) < 0))
3018 		return -1;
3019 
3020 	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
3021 		if (!allocerr_warned) {
3022 			VHOST_LOG_DATA(ERR, "(%s) failed mbuf alloc of size %d from %s.\n",
3023 				dev->ifname, buf_len, mbuf_pool->name);
3024 			allocerr_warned = true;
3025 		}
3026 		return -1;
3027 	}
3028 
3029 	err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts,
3030 			   mbuf_pool, legacy_ol_flags, 0, false);
3031 	if (unlikely(err)) {
3032 		if (!allocerr_warned) {
3033 			VHOST_LOG_DATA(ERR, "(%s) failed to copy desc to mbuf.\n",
3034 				dev->ifname);
3035 			allocerr_warned = true;
3036 		}
3037 		return -1;
3038 	}
3039 
3040 	return 0;
3041 }
3042 
3043 static __rte_always_inline int
3044 virtio_dev_tx_single_packed(struct virtio_net *dev,
3045 			    struct vhost_virtqueue *vq,
3046 			    struct rte_mempool *mbuf_pool,
3047 			    struct rte_mbuf *pkts,
3048 			    bool legacy_ol_flags)
3049 {
3050 
3051 	uint16_t buf_id, desc_count = 0;
3052 	int ret;
3053 
3054 	ret = vhost_dequeue_single_packed(dev, vq, mbuf_pool, pkts, &buf_id,
3055 					&desc_count, legacy_ol_flags);
3056 
3057 	if (likely(desc_count > 0)) {
3058 		if (virtio_net_is_inorder(dev))
3059 			vhost_shadow_dequeue_single_packed_inorder(vq, buf_id,
3060 								   desc_count);
3061 		else
3062 			vhost_shadow_dequeue_single_packed(vq, buf_id,
3063 					desc_count);
3064 
3065 		vq_inc_last_avail_packed(vq, desc_count);
3066 	}
3067 
3068 	return ret;
3069 }
3070 
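/*
 * Packed-ring dequeue loop: try to dequeue PACKED_BATCH_SIZE packets at a
 * time and fall back to the single-packet path when a batch cannot be
 * formed. Unused pre-allocated mbufs are freed, and the shadow used ring is
 * flushed before notifying the guest.
 */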
3071 __rte_always_inline
3072 static uint16_t
3073 virtio_dev_tx_packed(struct virtio_net *dev,
3074 		     struct vhost_virtqueue *__rte_restrict vq,
3075 		     struct rte_mempool *mbuf_pool,
3076 		     struct rte_mbuf **__rte_restrict pkts,
3077 		     uint32_t count,
3078 		     bool legacy_ol_flags)
3079 {
3080 	uint32_t pkt_idx = 0;
3081 
3082 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count))
3083 		return 0;
3084 
3085 	do {
3086 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
3087 
3088 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
3089 			if (!virtio_dev_tx_batch_packed(dev, vq,
3090 							&pkts[pkt_idx],
3091 							legacy_ol_flags)) {
3092 				pkt_idx += PACKED_BATCH_SIZE;
3093 				continue;
3094 			}
3095 		}
3096 
3097 		if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
3098 						pkts[pkt_idx],
3099 						legacy_ol_flags))
3100 			break;
3101 		pkt_idx++;
3102 	} while (pkt_idx < count);
3103 
3104 	if (pkt_idx != count)
3105 		rte_pktmbuf_free_bulk(&pkts[pkt_idx], count - pkt_idx);
3106 
3107 	if (vq->shadow_used_idx) {
3108 		do_data_copy_dequeue(vq);
3109 
3110 		vhost_flush_dequeue_shadow_packed(dev, vq);
3111 		vhost_vring_call_packed(dev, vq);
3112 	}
3113 
3114 	return pkt_idx;
3115 }
3116 
3117 __rte_noinline
3118 static uint16_t
3119 virtio_dev_tx_packed_legacy(struct virtio_net *dev,
3120 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
3121 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
3122 {
3123 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, true);
3124 }
3125 
3126 __rte_noinline
3127 static uint16_t
3128 virtio_dev_tx_packed_compliant(struct virtio_net *dev,
3129 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
3130 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
3131 {
3132 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, false);
3133 }
3134 
3135 uint16_t
3136 rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
3137 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
3138 {
3139 	struct virtio_net *dev;
3140 	struct rte_mbuf *rarp_mbuf = NULL;
3141 	struct vhost_virtqueue *vq;
3142 	int16_t success = 1;
3143 
3144 	dev = get_device(vid);
3145 	if (!dev)
3146 		return 0;
3147 
3148 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
3149 		VHOST_LOG_DATA(ERR, "(%s) %s: built-in vhost net backend is disabled.\n",
3150 				dev->ifname, __func__);
3151 		return 0;
3152 	}
3153 
3154 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
3155 		VHOST_LOG_DATA(ERR, "(%s) %s: invalid virtqueue idx %d.\n",
3156 				dev->ifname, __func__, queue_id);
3157 		return 0;
3158 	}
3159 
3160 	vq = dev->virtqueue[queue_id];
3161 
3162 	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
3163 		return 0;
3164 
3165 	if (unlikely(!vq->enabled)) {
3166 		count = 0;
3167 		goto out_access_unlock;
3168 	}
3169 
3170 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
3171 		vhost_user_iotlb_rd_lock(vq);
3172 
3173 	if (unlikely(!vq->access_ok))
3174 		if (unlikely(vring_translate(dev, vq) < 0)) {
3175 			count = 0;
3176 			goto out;
3177 		}
3178 
3179 	/*
3180 	 * Construct a RARP broadcast packet and inject it into the "pkts"
3181 	 * array, so that it looks like the guest actually sent such a packet.
3182 	 *
3183 	 * Check user_send_rarp() for more information.
3184 	 *
3185 	 * broadcast_rarp shares a cacheline in the virtio_net structure
3186 	 * with some fields that are accessed during enqueue and
3187 	 * __atomic_compare_exchange_n causes a write when it performs the
3188 	 * compare and exchange. This could result in false sharing between enqueue
3189 	 * and dequeue.
3190 	 *
3191 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
3192 	 * and only performing compare and exchange if the read indicates it
3193 	 * is likely to be set.
3194 	 */
3195 	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
3196 			__atomic_compare_exchange_n(&dev->broadcast_rarp,
3197 			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
3198 
3199 		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
3200 		if (rarp_mbuf == NULL) {
3201 			VHOST_LOG_DATA(ERR, "(%s) failed to make RARP packet.\n", dev->ifname);
3202 			count = 0;
3203 			goto out;
3204 		}
3205 		/*
3206 		 * Inject it at the head of the "pkts" array, so that the switch's
3207 		 * MAC learning table gets updated first.
3208 		 */
3209 		pkts[0] = rarp_mbuf;
3210 		vhost_queue_stats_update(dev, vq, pkts, 1);
3211 		pkts++;
3212 		count -= 1;
3213 	}
3214 
3215 	if (vq_is_packed(dev)) {
3216 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
3217 			count = virtio_dev_tx_packed_legacy(dev, vq, mbuf_pool, pkts, count);
3218 		else
3219 			count = virtio_dev_tx_packed_compliant(dev, vq, mbuf_pool, pkts, count);
3220 	} else {
3221 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
3222 			count = virtio_dev_tx_split_legacy(dev, vq, mbuf_pool, pkts, count);
3223 		else
3224 			count = virtio_dev_tx_split_compliant(dev, vq, mbuf_pool, pkts, count);
3225 	}
3226 
3227 	vhost_queue_stats_update(dev, vq, pkts, count);
3228 
3229 out:
3230 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
3231 		vhost_user_iotlb_rd_unlock(vq);
3232 
3233 out_access_unlock:
3234 	rte_spinlock_unlock(&vq->access_lock);
3235 
3236 	if (unlikely(rarp_mbuf != NULL))
3237 		count += 1;
3238 
3239 	return count;
3240 }
3241 
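/*
 * Dequeue-side counterpart of the async completion poll: check the DMA
 * vChannel, collect the mbufs whose copies have finished, apply the offloads
 * recorded from their virtio-net headers, and write the used descriptors
 * back to the split ring.
 */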
3242 static __rte_always_inline uint16_t
3243 async_poll_dequeue_completed_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
3244 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
3245 		uint16_t vchan_id, bool legacy_ol_flags)
3246 {
3247 	uint16_t start_idx, from, i;
3248 	uint16_t nr_cpl_pkts = 0;
3249 	struct async_inflight_info *pkts_info = vq->async->pkts_info;
3250 
3251 	vhost_async_dma_check_completed(dev, dma_id, vchan_id, VHOST_DMA_MAX_COPY_COMPLETE);
3252 
3253 	start_idx = async_get_first_inflight_pkt_idx(vq);
3254 
3255 	from = start_idx;
3256 	while (vq->async->pkts_cmpl_flag[from] && count--) {
3257 		vq->async->pkts_cmpl_flag[from] = false;
3258 		from = (from + 1) & (vq->size - 1);
3259 		nr_cpl_pkts++;
3260 	}
3261 
3262 	if (nr_cpl_pkts == 0)
3263 		return 0;
3264 
3265 	for (i = 0; i < nr_cpl_pkts; i++) {
3266 		from = (start_idx + i) & (vq->size - 1);
3267 		pkts[i] = pkts_info[from].mbuf;
3268 
3269 		if (virtio_net_with_host_offload(dev))
3270 			vhost_dequeue_offload(dev, &pkts_info[from].nethdr, pkts[i],
3271 					      legacy_ol_flags);
3272 	}
3273 
3274 	/* write back completed descs to used ring and update used idx */
3275 	write_back_completed_descs_split(vq, nr_cpl_pkts);
3276 	__atomic_add_fetch(&vq->used->idx, nr_cpl_pkts, __ATOMIC_RELEASE);
3277 	vhost_vring_call_split(dev, vq);
3278 
3279 	vq->async->pkts_inflight_n -= nr_cpl_pkts;
3280 
3281 	return nr_cpl_pkts;
3282 }
3283 
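/*
 * Async dequeue on a split ring: reserve descriptors, prepare mbufs and
 * stage the DMA copies, then roll back the available ring and free mbufs
 * for any copies that could not be submitted. Completed packets are always
 * harvested at the end, since the DMA device may also be serving copies
 * submitted earlier. Slot and descriptor indexes are masked with
 * (vq->size - 1), which assumes the split ring size is a power of two.
 */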
3284 static __rte_always_inline uint16_t
3285 virtio_dev_tx_async_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
3286 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
3287 		int16_t dma_id, uint16_t vchan_id, bool legacy_ol_flags)
3288 {
3289 	static bool allocerr_warned;
3290 	bool dropped = false;
3291 	uint16_t free_entries;
3292 	uint16_t pkt_idx, slot_idx = 0;
3293 	uint16_t nr_done_pkts = 0;
3294 	uint16_t pkt_err = 0;
3295 	uint16_t n_xfer;
3296 	struct vhost_async *async = vq->async;
3297 	struct async_inflight_info *pkts_info = async->pkts_info;
3298 	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
3299 	uint16_t pkts_size = count;
3300 
3301 	/**
3302 	 * The ordering between avail index and
3303 	 * desc reads needs to be enforced.
3304 	 */
3305 	free_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) -
3306 			vq->last_avail_idx;
3307 	if (free_entries == 0)
3308 		goto out;
3309 
3310 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
3311 
3312 	async_iter_reset(async);
3313 
3314 	count = RTE_MIN(count, MAX_PKT_BURST);
3315 	count = RTE_MIN(count, free_entries);
3316 	VHOST_LOG_DATA(DEBUG, "(%s) about to dequeue %u buffers\n",
3317 			dev->ifname, count);
3318 
3319 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count))
3320 		goto out;
3321 
3322 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
3323 		uint16_t head_idx = 0;
3324 		uint16_t nr_vec = 0;
3325 		uint16_t to;
3326 		uint32_t buf_len;
3327 		int err;
3328 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
3329 		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
3330 
3331 		if (unlikely(fill_vec_buf_split(dev, vq, vq->last_avail_idx,
3332 						&nr_vec, buf_vec,
3333 						&head_idx, &buf_len,
3334 						VHOST_ACCESS_RO) < 0)) {
3335 			dropped = true;
3336 			break;
3337 		}
3338 
3339 		err = virtio_dev_pktmbuf_prep(dev, pkt, buf_len);
3340 		if (unlikely(err)) {
3341 			/**
3342 			 * mbuf allocation fails for jumbo packets when external
3343 			 * buffer allocation is not allowed and a linear buffer
3344 			 * is required. Drop this packet.
3345 			 */
3346 			if (!allocerr_warned) {
3347 				VHOST_LOG_DATA(ERR,
3348 					"(%s) %s: Failed mbuf alloc of size %d from %s\n",
3349 					dev->ifname, __func__, buf_len, mbuf_pool->name);
3350 				allocerr_warned = true;
3351 			}
3352 			dropped = true;
3353 			break;
3354 		}
3355 
3356 		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
3357 		err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkt, mbuf_pool,
3358 					legacy_ol_flags, slot_idx, true);
3359 		if (unlikely(err)) {
3360 			if (!allocerr_warned) {
3361 				VHOST_LOG_DATA(ERR,
3362 					"(%s) %s: Failed to offload copies to async channel.\n",
3363 					dev->ifname, __func__);
3364 				allocerr_warned = true;
3365 			}
3366 			dropped = true;
3367 			break;
3368 		}
3369 
3370 		pkts_info[slot_idx].mbuf = pkt;
3371 
3372 		/* store used descs */
3373 		to = async->desc_idx_split & (vq->size - 1);
3374 		async->descs_split[to].id = head_idx;
3375 		async->descs_split[to].len = 0;
3376 		async->desc_idx_split++;
3377 
3378 		vq->last_avail_idx++;
3379 	}
3380 
3381 	if (unlikely(dropped))
3382 		rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
3383 
3384 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
3385 					  async->iov_iter, pkt_idx);
3386 
3387 	async->pkts_inflight_n += n_xfer;
3388 
3389 	pkt_err = pkt_idx - n_xfer;
3390 	if (unlikely(pkt_err)) {
3391 		VHOST_LOG_DATA(DEBUG, "(%s) %s: failed to transfer data.\n",
3392 				dev->ifname, __func__);
3393 
3394 		pkt_idx = n_xfer;
3395 		/* recover available ring */
3396 		vq->last_avail_idx -= pkt_err;
3397 
3398 		/**
3399 		 * Recover the async channel copy-related structures and free the
3400 		 * pktmbufs of the failed packets.
3401 		 */
3402 		async->desc_idx_split -= pkt_err;
3403 		while (pkt_err-- > 0) {
3404 			rte_pktmbuf_free(pkts_info[slot_idx & (vq->size - 1)].mbuf);
3405 			slot_idx--;
3406 		}
3407 	}
3408 
3409 	async->pkts_idx += pkt_idx;
3410 	if (async->pkts_idx >= vq->size)
3411 		async->pkts_idx -= vq->size;
3412 
3413 out:
3414 	/* The DMA device may serve other queues, so unconditionally check for completed copies. */
3415 	nr_done_pkts = async_poll_dequeue_completed_split(dev, vq, pkts, pkts_size,
3416 							  dma_id, vchan_id, legacy_ol_flags);
3417 
3418 	return nr_done_pkts;
3419 }
3420 
3421 __rte_noinline
3422 static uint16_t
3423 virtio_dev_tx_async_split_legacy(struct virtio_net *dev,
3424 		struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3425 		struct rte_mbuf **pkts, uint16_t count,
3426 		int16_t dma_id, uint16_t vchan_id)
3427 {
3428 	return virtio_dev_tx_async_split(dev, vq, mbuf_pool,
3429 				pkts, count, dma_id, vchan_id, true);
3430 }
3431 
3432 __rte_noinline
3433 static uint16_t
3434 virtio_dev_tx_async_split_compliant(struct virtio_net *dev,
3435 		struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3436 		struct rte_mbuf **pkts, uint16_t count,
3437 		int16_t dma_id, uint16_t vchan_id)
3438 {
3439 	return virtio_dev_tx_async_split(dev, vq, mbuf_pool,
3440 				pkts, count, dma_id, vchan_id, false);
3441 }
3442 
3443 uint16_t
3444 rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
3445 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
3446 	int *nr_inflight, int16_t dma_id, uint16_t vchan_id)
3447 {
3448 	struct virtio_net *dev;
3449 	struct rte_mbuf *rarp_mbuf = NULL;
3450 	struct vhost_virtqueue *vq;
3451 	int16_t success = 1;
3452 
3453 	dev = get_device(vid);
3454 	if (!dev || !nr_inflight)
3455 		return 0;
3456 
3457 	*nr_inflight = -1;
3458 
3459 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
3460 		VHOST_LOG_DATA(ERR, "(%s) %s: built-in vhost net backend is disabled.\n",
3461 				dev->ifname, __func__);
3462 		return 0;
3463 	}
3464 
3465 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
3466 		VHOST_LOG_DATA(ERR, "(%s) %s: invalid virtqueue idx %d.\n",
3467 				dev->ifname, __func__, queue_id);
3468 		return 0;
3469 	}
3470 
3471 	if (unlikely(dma_id < 0 || dma_id >= RTE_DMADEV_DEFAULT_MAX)) {
3472 		VHOST_LOG_DATA(ERR, "(%s) %s: invalid dma id %d.\n",
3473 				dev->ifname, __func__, dma_id);
3474 		return 0;
3475 	}
3476 
3477 	if (unlikely(!dma_copy_track[dma_id].vchans ||
3478 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
3479 		VHOST_LOG_DATA(ERR, "(%s) %s: invalid channel %d:%u.\n", dev->ifname, __func__,
3480 				dma_id, vchan_id);
3481 		return 0;
3482 	}
3483 
3484 	vq = dev->virtqueue[queue_id];
3485 
3486 	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
3487 		return 0;
3488 
3489 	if (unlikely(vq->enabled == 0)) {
3490 		count = 0;
3491 		goto out_access_unlock;
3492 	}
3493 
3494 	if (unlikely(!vq->async)) {
3495 		VHOST_LOG_DATA(ERR, "(%s) %s: async not registered for queue id %d.\n",
3496 				dev->ifname, __func__, queue_id);
3497 		count = 0;
3498 		goto out_access_unlock;
3499 	}
3500 
3501 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
3502 		vhost_user_iotlb_rd_lock(vq);
3503 
3504 	if (unlikely(vq->access_ok == 0))
3505 		if (unlikely(vring_translate(dev, vq) < 0)) {
3506 			count = 0;
3507 			goto out;
3508 		}
3509 
3510 	/*
3511 	 * Construct a RARP broadcast packet and inject it into the "pkts"
3512 	 * array, so that it looks like the guest actually sent such a packet.
3513 	 *
3514 	 * Check user_send_rarp() for more information.
3515 	 *
3516 	 * broadcast_rarp shares a cacheline in the virtio_net structure
3517 	 * with some fields that are accessed during enqueue and
3518 	 * __atomic_compare_exchange_n causes a write when it performs the
3519 	 * compare and exchange. This could result in false sharing between enqueue
3520 	 * and dequeue.
3521 	 *
3522 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
3523 	 * and only performing compare and exchange if the read indicates it
3524 	 * is likely to be set.
3525 	 */
3526 	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
3527 			__atomic_compare_exchange_n(&dev->broadcast_rarp,
3528 			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
3529 
3530 		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
3531 		if (rarp_mbuf == NULL) {
3532 			VHOST_LOG_DATA(ERR, "Failed to make RARP packet.\n");
3533 			count = 0;
3534 			goto out;
3535 		}
3536 		/*
3537 		 * Inject it at the head of the "pkts" array, so that the switch's
3538 		 * MAC learning table gets updated first.
3539 		 */
3540 		pkts[0] = rarp_mbuf;
3541 		pkts++;
3542 		count -= 1;
3543 	}
3544 
3545 	if (unlikely(vq_is_packed(dev))) {
3546 		static bool not_support_pack_log;
3547 		if (!not_support_pack_log) {
3548 			VHOST_LOG_DATA(ERR,
3549 				"(%s) %s: async dequeue does not support packed ring.\n",
3550 				dev->ifname, __func__);
3551 			not_support_pack_log = true;
3552 		}
3553 		count = 0;
3554 		goto out;
3555 	}
3556 
3557 	if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
3558 		count = virtio_dev_tx_async_split_legacy(dev, vq, mbuf_pool, pkts,
3559 							 count, dma_id, vchan_id);
3560 	else
3561 		count = virtio_dev_tx_async_split_compliant(dev, vq, mbuf_pool, pkts,
3562 							    count, dma_id, vchan_id);
3563 
3564 	*nr_inflight = vq->async->pkts_inflight_n;
3565 
3566 out:
3567 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
3568 		vhost_user_iotlb_rd_unlock(vq);
3569 
3570 out_access_unlock:
3571 	rte_spinlock_unlock(&vq->access_lock);
3572 
3573 	if (unlikely(rarp_mbuf != NULL))
3574 		count += 1;
3575 
3576 	return count;
3577 }
3578