xref: /dpdk/lib/vhost/virtio_net.c (revision 1e472b5746aeb6189fa254ab82ce4cd27999f868)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2016 Intel Corporation
3  */
4 
5 #include <stdint.h>
6 #include <stdbool.h>
7 #include <linux/virtio_net.h>
8 
9 #include <rte_mbuf.h>
10 #include <rte_memcpy.h>
11 #include <rte_net.h>
12 #include <rte_ether.h>
13 #include <rte_ip.h>
14 #include <rte_dmadev.h>
15 #include <rte_vhost.h>
16 #include <rte_tcp.h>
17 #include <rte_udp.h>
18 #include <rte_sctp.h>
19 #include <rte_arp.h>
20 #include <rte_spinlock.h>
21 #include <rte_malloc.h>
22 #include <rte_vhost_async.h>
23 
24 #include "iotlb.h"
25 #include "vhost.h"
26 
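/*
 * Threshold for the batched-copy path: copies of at most this many bytes
 * are deferred into vq->batch_copy_elems and performed later in one pass,
 * while larger copies are performed immediately (see sync_fill_seg()).
 */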
27 #define MAX_BATCH_LEN 256
28 
29 static __rte_always_inline uint16_t
30 async_poll_dequeue_completed(struct virtio_net *dev, struct vhost_virtqueue *vq,
31 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
32 		uint16_t vchan_id, bool legacy_ol_flags);
33 
34 /* DMA device copy operation tracking array. */
35 struct async_dma_info dma_copy_track[RTE_DMADEV_DEFAULT_MAX];
36 
37 static __rte_always_inline bool
38 rxvq_is_mergeable(struct virtio_net *dev)
39 {
40 	return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
41 }
42 
43 static __rte_always_inline bool
44 virtio_net_is_inorder(struct virtio_net *dev)
45 {
46 	return dev->features & (1ULL << VIRTIO_F_IN_ORDER);
47 }
48 
49 static bool
50 is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t nr_vring)
51 {
52 	return (is_tx ^ (idx & 1)) == 0 && idx < nr_vring;
53 }
54 
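/*
 * Update the per-virtqueue statistics (packet/byte counters, size bins and
 * multicast/broadcast counters) for a burst of packets, if stats are enabled.
 */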
55 static inline void
56 vhost_queue_stats_update(const struct virtio_net *dev, struct vhost_virtqueue *vq,
57 		struct rte_mbuf **pkts, uint16_t count)
58 	__rte_shared_locks_required(&vq->access_lock)
59 {
60 	struct virtqueue_stats *stats = &vq->stats;
61 	int i;
62 
63 	if (!(dev->flags & VIRTIO_DEV_STATS_ENABLED))
64 		return;
65 
66 	for (i = 0; i < count; i++) {
67 		const struct rte_ether_addr *ea;
68 		const struct rte_mbuf *pkt = pkts[i];
69 		uint32_t pkt_len = rte_pktmbuf_pkt_len(pkt);
70 
71 		stats->packets++;
72 		stats->bytes += pkt_len;
73 
74 		if (pkt_len >= 1024)
75 			stats->size_bins[6 + (pkt_len > 1518)]++;
76 		else if (pkt_len <= 64)
77 			stats->size_bins[pkt_len >> 6]++;
78 		else
79 			stats->size_bins[32UL - rte_clz32(pkt_len) - 5]++;
80 
81 		ea = rte_pktmbuf_mtod(pkt, const struct rte_ether_addr *);
82 		RTE_BUILD_BUG_ON(offsetof(struct virtqueue_stats, broadcast) !=
83 				offsetof(struct virtqueue_stats, multicast) + sizeof(uint64_t));
84 		if (unlikely(rte_is_multicast_ether_addr(ea)))
85 			(&stats->multicast)[rte_is_broadcast_ether_addr(ea)]++;
86 	}
87 }
88 
89 static __rte_always_inline int64_t
90 vhost_async_dma_transfer_one(struct virtio_net *dev, struct vhost_virtqueue *vq,
91 		int16_t dma_id, uint16_t vchan_id, uint16_t flag_idx,
92 		struct vhost_iov_iter *pkt)
93 	__rte_shared_locks_required(&vq->access_lock)
94 {
95 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
96 	uint16_t ring_mask = dma_info->ring_mask;
97 	static bool vhost_async_dma_copy_log;
98 
100 	struct vhost_iovec *iov = pkt->iov;
101 	int copy_idx = 0;
102 	uint32_t nr_segs = pkt->nr_segs;
103 	uint16_t i;
104 
105 	if (rte_dma_burst_capacity(dma_id, vchan_id) < nr_segs)
106 		return -1;
107 
108 	for (i = 0; i < nr_segs; i++) {
109 		copy_idx = rte_dma_copy(dma_id, vchan_id, (rte_iova_t)iov[i].src_addr,
110 				(rte_iova_t)iov[i].dst_addr, iov[i].len, RTE_DMA_OP_FLAG_LLC);
111 		/**
112 		 * Since all memory is pinned and the DMA vChannel
113 		 * ring has enough space, failure should be rare.
114 		 * If it does happen, it means the DMA device has
115 		 * encountered a serious error; in that case, stop
116 		 * the async data path and check what has happened
117 		 * to the DMA device.
118 		 */
119 		if (unlikely(copy_idx < 0)) {
120 			if (!vhost_async_dma_copy_log) {
121 				VHOST_DATA_LOG(dev->ifname, ERR,
122 					"DMA copy failed for channel %d:%u",
123 					dma_id, vchan_id);
124 				vhost_async_dma_copy_log = true;
125 			}
126 			return -1;
127 		}
128 	}
129 
130 	/**
131 	 * Only store the packet completion flag address in the last
132 	 * copy's slot; the other slots are set to NULL.
133 	 */
134 	dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask] = &vq->async->pkts_cmpl_flag[flag_idx];
135 
136 	return nr_segs;
137 }
138 
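/*
 * Enqueue the copies described by nr_pkts iov iterators on the given DMA
 * vChannel and submit them. Returns the number of packets whose copies were
 * fully enqueued; callers treat the remaining packets as not transferred.
 */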
139 static __rte_always_inline uint16_t
140 vhost_async_dma_transfer(struct virtio_net *dev, struct vhost_virtqueue *vq,
141 		int16_t dma_id, uint16_t vchan_id, uint16_t head_idx,
142 		struct vhost_iov_iter *pkts, uint16_t nr_pkts)
143 	__rte_shared_locks_required(&vq->access_lock)
144 {
145 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
146 	int64_t ret, nr_copies = 0;
147 	uint16_t pkt_idx;
148 
149 	rte_spinlock_lock(&dma_info->dma_lock);
150 
151 	for (pkt_idx = 0; pkt_idx < nr_pkts; pkt_idx++) {
152 		ret = vhost_async_dma_transfer_one(dev, vq, dma_id, vchan_id, head_idx,
153 				&pkts[pkt_idx]);
154 		if (unlikely(ret < 0))
155 			break;
156 
157 		nr_copies += ret;
158 		head_idx++;
159 		if (head_idx >= vq->size)
160 			head_idx -= vq->size;
161 	}
162 
163 	if (likely(nr_copies > 0))
164 		rte_dma_submit(dma_id, vchan_id);
165 
166 	rte_spinlock_unlock(&dma_info->dma_lock);
167 
168 	return pkt_idx;
169 }
170 
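/*
 * Poll the DMA vChannel for finished copies and set the completion flag of
 * each packet whose last copy has completed. Returns the number of completed
 * copy operations (not packets).
 */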
171 static __rte_always_inline uint16_t
172 vhost_async_dma_check_completed(struct virtio_net *dev, int16_t dma_id, uint16_t vchan_id,
173 		uint16_t max_pkts)
174 {
175 	struct async_dma_vchan_info *dma_info = &dma_copy_track[dma_id].vchans[vchan_id];
176 	uint16_t ring_mask = dma_info->ring_mask;
177 	uint16_t last_idx = 0;
178 	uint16_t nr_copies;
179 	uint16_t copy_idx;
180 	uint16_t i;
181 	bool has_error = false;
182 	static bool vhost_async_dma_complete_log;
183 
184 	rte_spinlock_lock(&dma_info->dma_lock);
185 
186 	/**
187 	 * Log an error for debugging if the DMA device reports an error
188 	 * during the transfer. Errors are not handled at the vhost level.
189 	 */
190 	nr_copies = rte_dma_completed(dma_id, vchan_id, max_pkts, &last_idx, &has_error);
191 	if (unlikely(!vhost_async_dma_complete_log && has_error)) {
192 		VHOST_DATA_LOG(dev->ifname, ERR,
193 			"DMA completion failure on channel %d:%u",
194 			dma_id, vchan_id);
195 		vhost_async_dma_complete_log = true;
196 	} else if (nr_copies == 0) {
197 		goto out;
198 	}
199 
200 	copy_idx = last_idx - nr_copies + 1;
201 	for (i = 0; i < nr_copies; i++) {
202 		bool *flag;
203 
204 		flag = dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask];
205 		if (flag) {
206 			/**
207 			 * Mark the packet flag as received. The flag
208 			 * could belong to another virtqueue, but the
209 			 * write is atomic.
210 			 */
211 			*flag = true;
212 			dma_info->pkts_cmpl_flag_addr[copy_idx & ring_mask] = NULL;
213 		}
214 		copy_idx++;
215 	}
216 
217 out:
218 	rte_spinlock_unlock(&dma_info->dma_lock);
219 	return nr_copies;
220 }
221 
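/*
 * Perform the small copies accumulated in vq->batch_copy_elems for the
 * enqueue path and log the written guest pages, then reset the batch.
 */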
222 static inline void
223 do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
224 	__rte_shared_locks_required(&vq->iotlb_lock)
225 {
226 	struct batch_copy_elem *elem = vq->batch_copy_elems;
227 	uint16_t count = vq->batch_copy_nb_elems;
228 	int i;
229 
230 	for (i = 0; i < count; i++) {
231 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
232 		vhost_log_cache_write_iova(dev, vq, elem[i].log_addr,
233 					   elem[i].len);
234 		PRINT_PACKET(dev, (uintptr_t)elem[i].dst, elem[i].len, 0);
235 	}
236 
237 	vq->batch_copy_nb_elems = 0;
238 }
239 
240 static inline void
241 do_data_copy_dequeue(struct vhost_virtqueue *vq)
242 {
243 	struct batch_copy_elem *elem = vq->batch_copy_elems;
244 	uint16_t count = vq->batch_copy_nb_elems;
245 	int i;
246 
247 	for (i = 0; i < count; i++)
248 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
249 
250 	vq->batch_copy_nb_elems = 0;
251 }
252 
253 static __rte_always_inline void
254 do_flush_shadow_used_ring_split(struct virtio_net *dev,
255 			struct vhost_virtqueue *vq,
256 			uint16_t to, uint16_t from, uint16_t size)
257 {
258 	rte_memcpy(&vq->used->ring[to],
259 			&vq->shadow_used_split[from],
260 			size * sizeof(struct vring_used_elem));
261 	vhost_log_cache_used_vring(dev, vq,
262 			offsetof(struct vring_used, ring[to]),
263 			size * sizeof(struct vring_used_elem));
264 }
265 
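/*
 * Flush the shadow used entries into the split ring's used ring, handling
 * wrap-around, then publish the new used index with release ordering.
 */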
266 static __rte_always_inline void
267 flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
268 {
269 	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
270 
271 	if (used_idx + vq->shadow_used_idx <= vq->size) {
272 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
273 					  vq->shadow_used_idx);
274 	} else {
275 		uint16_t size;
276 
277 		/* update the used ring interval [used_idx, vq->size) */
278 		size = vq->size - used_idx;
279 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
280 
281 		/* update the remaining used ring interval [0, shadow_used_idx - size) */
282 		do_flush_shadow_used_ring_split(dev, vq, 0, size,
283 					  vq->shadow_used_idx - size);
284 	}
285 	vq->last_used_idx += vq->shadow_used_idx;
286 
287 	vhost_log_cache_sync(dev, vq);
288 
289 	rte_atomic_fetch_add_explicit((unsigned short __rte_atomic *)&vq->used->idx,
290 		vq->shadow_used_idx, rte_memory_order_release);
291 	vq->shadow_used_idx = 0;
292 	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
293 		sizeof(vq->used->idx));
294 }
295 
296 static __rte_always_inline void
297 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
298 			 uint16_t desc_idx, uint32_t len)
299 {
300 	uint16_t i = vq->shadow_used_idx++;
301 
302 	vq->shadow_used_split[i].id  = desc_idx;
303 	vq->shadow_used_split[i].len = len;
304 }
305 
306 static __rte_always_inline void
307 vhost_flush_enqueue_shadow_packed(struct virtio_net *dev,
308 				  struct vhost_virtqueue *vq)
309 {
310 	int i;
311 	uint16_t used_idx = vq->last_used_idx;
312 	uint16_t head_idx = vq->last_used_idx;
313 	uint16_t head_flags = 0;
314 
315 	/* Split loop in two to save memory barriers */
316 	for (i = 0; i < vq->shadow_used_idx; i++) {
317 		vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
318 		vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
319 
320 		used_idx += vq->shadow_used_packed[i].count;
321 		if (used_idx >= vq->size)
322 			used_idx -= vq->size;
323 	}
324 
325 	/* The ordering for storing desc flags needs to be enforced. */
326 	rte_atomic_thread_fence(rte_memory_order_release);
327 
328 	for (i = 0; i < vq->shadow_used_idx; i++) {
329 		uint16_t flags;
330 
331 		if (vq->shadow_used_packed[i].len)
332 			flags = VRING_DESC_F_WRITE;
333 		else
334 			flags = 0;
335 
336 		if (vq->used_wrap_counter) {
337 			flags |= VRING_DESC_F_USED;
338 			flags |= VRING_DESC_F_AVAIL;
339 		} else {
340 			flags &= ~VRING_DESC_F_USED;
341 			flags &= ~VRING_DESC_F_AVAIL;
342 		}
343 
344 		if (i > 0) {
345 			vq->desc_packed[vq->last_used_idx].flags = flags;
346 
347 			vhost_log_cache_used_vring(dev, vq,
348 					vq->last_used_idx *
349 					sizeof(struct vring_packed_desc),
350 					sizeof(struct vring_packed_desc));
351 		} else {
352 			head_idx = vq->last_used_idx;
353 			head_flags = flags;
354 		}
355 
356 		vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
357 	}
358 
359 	vq->desc_packed[head_idx].flags = head_flags;
360 
361 	vhost_log_cache_used_vring(dev, vq,
362 				head_idx *
363 				sizeof(struct vring_packed_desc),
364 				sizeof(struct vring_packed_desc));
365 
366 	vq->shadow_used_idx = 0;
367 	vhost_log_cache_sync(dev, vq);
368 }
369 
370 static __rte_always_inline void
371 vhost_flush_dequeue_shadow_packed(struct virtio_net *dev,
372 				  struct vhost_virtqueue *vq)
373 {
374 	struct vring_used_elem_packed *used_elem = &vq->shadow_used_packed[0];
375 
376 	vq->desc_packed[vq->shadow_last_used_idx].id = used_elem->id;
377 	/* The desc flags field is the synchronization point for the packed vring */
378 	rte_atomic_store_explicit(
379 		(unsigned short __rte_atomic *)&vq->desc_packed[vq->shadow_last_used_idx].flags,
380 		used_elem->flags, rte_memory_order_release);
381 
382 	vhost_log_cache_used_vring(dev, vq, vq->shadow_last_used_idx *
383 				   sizeof(struct vring_packed_desc),
384 				   sizeof(struct vring_packed_desc));
385 	vq->shadow_used_idx = 0;
386 	vhost_log_cache_sync(dev, vq);
387 }
388 
389 static __rte_always_inline void
390 vhost_flush_enqueue_batch_packed(struct virtio_net *dev,
391 				 struct vhost_virtqueue *vq,
392 				 uint64_t *lens,
393 				 uint16_t *ids)
394 {
395 	uint16_t i;
396 	uint16_t flags;
397 	uint16_t last_used_idx;
398 	struct vring_packed_desc *desc_base;
399 
400 	last_used_idx = vq->last_used_idx;
401 	desc_base = &vq->desc_packed[last_used_idx];
402 
403 	flags = PACKED_DESC_ENQUEUE_USED_FLAG(vq->used_wrap_counter);
404 
405 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
406 		desc_base[i].id = ids[i];
407 		desc_base[i].len = lens[i];
408 	}
409 
410 	rte_atomic_thread_fence(rte_memory_order_release);
411 
412 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
413 		desc_base[i].flags = flags;
414 	}
415 
416 	vhost_log_cache_used_vring(dev, vq, last_used_idx *
417 				   sizeof(struct vring_packed_desc),
418 				   sizeof(struct vring_packed_desc) *
419 				   PACKED_BATCH_SIZE);
420 	vhost_log_cache_sync(dev, vq);
421 
422 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
423 }
424 
425 static __rte_always_inline void
426 vhost_async_shadow_enqueue_packed_batch(struct vhost_virtqueue *vq,
427 				 uint64_t *lens,
428 				 uint16_t *ids)
429 	__rte_exclusive_locks_required(&vq->access_lock)
430 {
431 	uint16_t i;
432 	struct vhost_async *async = vq->async;
433 
434 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
435 		async->buffers_packed[async->buffer_idx_packed].id  = ids[i];
436 		async->buffers_packed[async->buffer_idx_packed].len = lens[i];
437 		async->buffers_packed[async->buffer_idx_packed].count = 1;
438 		async->buffer_idx_packed++;
439 		if (async->buffer_idx_packed >= vq->size)
440 			async->buffer_idx_packed -= vq->size;
441 	}
442 }
443 
444 static __rte_always_inline void
445 vhost_async_shadow_dequeue_packed_batch(struct vhost_virtqueue *vq, uint16_t *ids)
446 	__rte_shared_locks_required(&vq->access_lock)
447 {
448 	uint16_t i;
449 	struct vhost_async *async = vq->async;
450 
451 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
452 		async->buffers_packed[async->buffer_idx_packed].id  = ids[i];
453 		async->buffers_packed[async->buffer_idx_packed].len = 0;
454 		async->buffers_packed[async->buffer_idx_packed].count = 1;
455 
456 		async->buffer_idx_packed++;
457 		if (async->buffer_idx_packed >= vq->size)
458 			async->buffer_idx_packed -= vq->size;
459 	}
460 }
461 
462 static __rte_always_inline void
463 vhost_shadow_dequeue_batch_packed_inorder(struct vhost_virtqueue *vq,
464 					  uint16_t id)
465 {
466 	vq->shadow_used_packed[0].id = id;
467 
468 	if (!vq->shadow_used_idx) {
469 		vq->shadow_last_used_idx = vq->last_used_idx;
470 		vq->shadow_used_packed[0].flags =
471 			PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
472 		vq->shadow_used_packed[0].len = 0;
473 		vq->shadow_used_packed[0].count = 1;
474 		vq->shadow_used_idx++;
475 	}
476 
477 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
478 }
479 
480 static __rte_always_inline void
481 vhost_shadow_dequeue_batch_packed(struct virtio_net *dev,
482 				  struct vhost_virtqueue *vq,
483 				  uint16_t *ids)
484 {
485 	uint16_t flags;
486 	uint16_t i;
487 	uint16_t begin;
488 
489 	flags = PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
490 
491 	if (!vq->shadow_used_idx) {
492 		vq->shadow_last_used_idx = vq->last_used_idx;
493 		vq->shadow_used_packed[0].id  = ids[0];
494 		vq->shadow_used_packed[0].len = 0;
495 		vq->shadow_used_packed[0].count = 1;
496 		vq->shadow_used_packed[0].flags = flags;
497 		vq->shadow_used_idx++;
498 		begin = 1;
499 	} else
500 		begin = 0;
501 
502 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE) {
503 		vq->desc_packed[vq->last_used_idx + i].id = ids[i];
504 		vq->desc_packed[vq->last_used_idx + i].len = 0;
505 	}
506 
507 	rte_atomic_thread_fence(rte_memory_order_release);
508 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE)
509 		vq->desc_packed[vq->last_used_idx + i].flags = flags;
510 
511 	vhost_log_cache_used_vring(dev, vq, vq->last_used_idx *
512 				   sizeof(struct vring_packed_desc),
513 				   sizeof(struct vring_packed_desc) *
514 				   PACKED_BATCH_SIZE);
515 	vhost_log_cache_sync(dev, vq);
516 
517 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
518 }
519 
520 static __rte_always_inline void
521 vhost_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
522 				   uint16_t buf_id,
523 				   uint16_t count)
524 {
525 	uint16_t flags;
526 
527 	flags = vq->desc_packed[vq->last_used_idx].flags;
528 	if (vq->used_wrap_counter) {
529 		flags |= VRING_DESC_F_USED;
530 		flags |= VRING_DESC_F_AVAIL;
531 	} else {
532 		flags &= ~VRING_DESC_F_USED;
533 		flags &= ~VRING_DESC_F_AVAIL;
534 	}
535 
536 	if (!vq->shadow_used_idx) {
537 		vq->shadow_last_used_idx = vq->last_used_idx;
538 
539 		vq->shadow_used_packed[0].id  = buf_id;
540 		vq->shadow_used_packed[0].len = 0;
541 		vq->shadow_used_packed[0].flags = flags;
542 		vq->shadow_used_idx++;
543 	} else {
544 		vq->desc_packed[vq->last_used_idx].id = buf_id;
545 		vq->desc_packed[vq->last_used_idx].len = 0;
546 		vq->desc_packed[vq->last_used_idx].flags = flags;
547 	}
548 
549 	vq_inc_last_used_packed(vq, count);
550 }
551 
552 static __rte_always_inline void
553 vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
554 					   uint16_t buf_id,
555 					   uint16_t count)
556 {
557 	uint16_t flags;
558 
559 	vq->shadow_used_packed[0].id = buf_id;
560 
561 	flags = vq->desc_packed[vq->last_used_idx].flags;
562 	if (vq->used_wrap_counter) {
563 		flags |= VRING_DESC_F_USED;
564 		flags |= VRING_DESC_F_AVAIL;
565 	} else {
566 		flags &= ~VRING_DESC_F_USED;
567 		flags &= ~VRING_DESC_F_AVAIL;
568 	}
569 
570 	if (!vq->shadow_used_idx) {
571 		vq->shadow_last_used_idx = vq->last_used_idx;
572 		vq->shadow_used_packed[0].len = 0;
573 		vq->shadow_used_packed[0].flags = flags;
574 		vq->shadow_used_idx++;
575 	}
576 
577 	vq_inc_last_used_packed(vq, count);
578 }
579 
580 static __rte_always_inline void
581 vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
582 				   uint32_t *len,
583 				   uint16_t *id,
584 				   uint16_t *count,
585 				   uint16_t num_buffers)
586 {
587 	uint16_t i;
588 
589 	for (i = 0; i < num_buffers; i++) {
590 		/* align shadow used-ring flushes with the packed batch size */
591 		if (!vq->shadow_used_idx)
592 			vq->shadow_aligned_idx = vq->last_used_idx &
593 				PACKED_BATCH_MASK;
594 		vq->shadow_used_packed[vq->shadow_used_idx].id  = id[i];
595 		vq->shadow_used_packed[vq->shadow_used_idx].len = len[i];
596 		vq->shadow_used_packed[vq->shadow_used_idx].count = count[i];
597 		vq->shadow_aligned_idx += count[i];
598 		vq->shadow_used_idx++;
599 	}
600 }
601 
602 static __rte_always_inline void
603 vhost_async_shadow_enqueue_packed(struct vhost_virtqueue *vq,
604 				   uint32_t *len,
605 				   uint16_t *id,
606 				   uint16_t *count,
607 				   uint16_t num_buffers)
608 	__rte_exclusive_locks_required(&vq->access_lock)
609 {
610 	uint16_t i;
611 	struct vhost_async *async = vq->async;
612 
613 	for (i = 0; i < num_buffers; i++) {
614 		async->buffers_packed[async->buffer_idx_packed].id  = id[i];
615 		async->buffers_packed[async->buffer_idx_packed].len = len[i];
616 		async->buffers_packed[async->buffer_idx_packed].count = count[i];
617 		async->buffer_idx_packed++;
618 		if (async->buffer_idx_packed >= vq->size)
619 			async->buffer_idx_packed -= vq->size;
620 	}
621 }
622 
623 static __rte_always_inline void
624 vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
625 				   struct vhost_virtqueue *vq,
626 				   uint32_t *len,
627 				   uint16_t *id,
628 				   uint16_t *count,
629 				   uint16_t num_buffers)
630 	__rte_shared_locks_required(&vq->iotlb_lock)
631 {
632 	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
633 
634 	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
635 		do_data_copy_enqueue(dev, vq);
636 		vhost_flush_enqueue_shadow_packed(dev, vq);
637 	}
638 }
639 
640 /* Skip the write when the value is unchanged, to lessen cache pressure. */
641 #define ASSIGN_UNLESS_EQUAL(var, val) do {	\
642 	if ((var) != (val))			\
643 		(var) = (val);			\
644 } while (0)
645 
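/*
 * Translate the mbuf Tx offload flags (L4 checksum, TSO/UFO) into the
 * virtio-net header fields consumed by the guest.
 */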
646 static __rte_always_inline void
647 virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
648 {
649 	uint64_t csum_l4 = m_buf->ol_flags & RTE_MBUF_F_TX_L4_MASK;
650 
651 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG)
652 		csum_l4 |= RTE_MBUF_F_TX_TCP_CKSUM;
653 
654 	if (csum_l4) {
655 		/*
656 		 * Pseudo-header checksum must be set as per Virtio spec.
657 		 *
658 		 * Note: We don't propagate rte_net_intel_cksum_prepare()
659 		 * errors, as it would have an impact on performance, and an
660 		 * error would mean the packet is dropped by the guest instead
661 		 * of being dropped here.
662 		 */
663 		rte_net_intel_cksum_prepare(m_buf);
664 
665 		net_hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
666 		net_hdr->csum_start = m_buf->l2_len + m_buf->l3_len;
667 
668 		switch (csum_l4) {
669 		case RTE_MBUF_F_TX_TCP_CKSUM:
670 			net_hdr->csum_offset = (offsetof(struct rte_tcp_hdr,
671 						cksum));
672 			break;
673 		case RTE_MBUF_F_TX_UDP_CKSUM:
674 			net_hdr->csum_offset = (offsetof(struct rte_udp_hdr,
675 						dgram_cksum));
676 			break;
677 		case RTE_MBUF_F_TX_SCTP_CKSUM:
678 			net_hdr->csum_offset = (offsetof(struct rte_sctp_hdr,
679 						cksum));
680 			break;
681 		}
682 	} else {
683 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_start, 0);
684 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_offset, 0);
685 		ASSIGN_UNLESS_EQUAL(net_hdr->flags, 0);
686 	}
687 
688 	/* IP cksum verification cannot be bypassed, so calculate it here */
689 	if (m_buf->ol_flags & RTE_MBUF_F_TX_IP_CKSUM) {
690 		struct rte_ipv4_hdr *ipv4_hdr;
691 
692 		ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
693 						   m_buf->l2_len);
694 		ipv4_hdr->hdr_checksum = 0;
695 		ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
696 	}
697 
698 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG) {
699 		if (m_buf->ol_flags & RTE_MBUF_F_TX_IPV4)
700 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
701 		else
702 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
703 		net_hdr->gso_size = m_buf->tso_segsz;
704 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
705 					+ m_buf->l4_len;
706 	} else if (m_buf->ol_flags & RTE_MBUF_F_TX_UDP_SEG) {
707 		net_hdr->gso_type = VIRTIO_NET_HDR_GSO_UDP;
708 		net_hdr->gso_size = m_buf->tso_segsz;
709 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len +
710 			m_buf->l4_len;
711 	} else {
712 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0);
713 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_size, 0);
714 		ASSIGN_UNLESS_EQUAL(net_hdr->hdr_len, 0);
715 	}
716 }
717 
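/*
 * Translate one descriptor's IOVA range into host virtual address chunks
 * (an IOVA range may map to several non-contiguous VA chunks) and append
 * them to buf_vec.
 */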
718 static __rte_always_inline int
719 map_one_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
720 		struct buf_vector *buf_vec, uint16_t *vec_idx,
721 		uint64_t desc_iova, uint64_t desc_len, uint8_t perm)
722 	__rte_shared_locks_required(&vq->iotlb_lock)
723 {
724 	uint16_t vec_id = *vec_idx;
725 
726 	while (desc_len) {
727 		uint64_t desc_addr;
728 		uint64_t desc_chunck_len = desc_len;
729 
730 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
731 			return -1;
732 
733 		desc_addr = vhost_iova_to_vva(dev, vq,
734 				desc_iova,
735 				&desc_chunck_len,
736 				perm);
737 		if (unlikely(!desc_addr))
738 			return -1;
739 
740 		rte_prefetch0((void *)(uintptr_t)desc_addr);
741 
742 		buf_vec[vec_id].buf_iova = desc_iova;
743 		buf_vec[vec_id].buf_addr = desc_addr;
744 		buf_vec[vec_id].buf_len  = desc_chunck_len;
745 
746 		desc_len -= desc_chunck_len;
747 		desc_iova += desc_chunck_len;
748 		vec_id++;
749 	}
750 	*vec_idx = vec_id;
751 
752 	return 0;
753 }
754 
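/*
 * Collect the buffers of one split-ring descriptor chain, following an
 * indirect table if present, into buf_vec. Also returns the chain's head
 * index and total length.
 */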
755 static __rte_always_inline int
756 fill_vec_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
757 			 uint32_t avail_idx, uint16_t *vec_idx,
758 			 struct buf_vector *buf_vec, uint16_t *desc_chain_head,
759 			 uint32_t *desc_chain_len, uint8_t perm)
760 	__rte_shared_locks_required(&vq->iotlb_lock)
761 {
762 	uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
763 	uint16_t vec_id = *vec_idx;
764 	uint32_t len    = 0;
765 	uint64_t dlen;
766 	uint32_t nr_descs = vq->size;
767 	uint32_t cnt    = 0;
768 	struct vring_desc *descs = vq->desc;
769 	struct vring_desc *idesc = NULL;
770 
771 	if (unlikely(idx >= vq->size))
772 		return -1;
773 
774 	*desc_chain_head = idx;
775 
776 	if (vq->desc[idx].flags & VRING_DESC_F_INDIRECT) {
777 		dlen = vq->desc[idx].len;
778 		nr_descs = dlen / sizeof(struct vring_desc);
779 		if (unlikely(nr_descs > vq->size))
780 			return -1;
781 
782 		descs = (struct vring_desc *)(uintptr_t)
783 			vhost_iova_to_vva(dev, vq, vq->desc[idx].addr,
784 						&dlen,
785 						VHOST_ACCESS_RO);
786 		if (unlikely(!descs))
787 			return -1;
788 
789 		if (unlikely(dlen < vq->desc[idx].len)) {
790 			/*
791 			 * The indirect desc table is not contiguous
792 			 * in the process VA space, so we have to copy it.
793 			 */
794 			idesc = vhost_alloc_copy_ind_table(dev, vq,
795 					vq->desc[idx].addr, vq->desc[idx].len);
796 			if (unlikely(!idesc))
797 				return -1;
798 
799 			descs = idesc;
800 		}
801 
802 		idx = 0;
803 	}
804 
805 	while (1) {
806 		if (unlikely(idx >= nr_descs || cnt++ >= nr_descs)) {
807 			free_ind_table(idesc);
808 			return -1;
809 		}
810 
811 		dlen = descs[idx].len;
812 		len += dlen;
813 
814 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
815 						descs[idx].addr, dlen,
816 						perm))) {
817 			free_ind_table(idesc);
818 			return -1;
819 		}
820 
821 		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
822 			break;
823 
824 		idx = descs[idx].next;
825 	}
826 
827 	*desc_chain_len = len;
828 	*vec_idx = vec_id;
829 
830 	if (unlikely(!!idesc))
831 		free_ind_table(idesc);
832 
833 	return 0;
834 }
835 
836 /*
837  * Returns -1 on failure, 0 on success.
838  */
839 static inline int
840 reserve_avail_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
841 				uint64_t size, struct buf_vector *buf_vec,
842 				uint16_t *num_buffers, uint16_t avail_head,
843 				uint16_t *nr_vec)
844 	__rte_shared_locks_required(&vq->iotlb_lock)
845 {
846 	uint16_t cur_idx;
847 	uint16_t vec_idx = 0;
848 	uint16_t max_tries, tries = 0;
849 
850 	uint16_t head_idx = 0;
851 	uint32_t len = 0;
852 
853 	*num_buffers = 0;
854 	cur_idx  = vq->last_avail_idx;
855 
856 	if (rxvq_is_mergeable(dev))
857 		max_tries = vq->size - 1;
858 	else
859 		max_tries = 1;
860 
861 	while (size > 0) {
862 		if (unlikely(cur_idx == avail_head))
863 			return -1;
864 		/*
865 		 * If we have tried all available ring items and still
866 		 * cannot get enough buffers, something abnormal has
867 		 * happened.
868 		 */
869 		if (unlikely(++tries > max_tries))
870 			return -1;
871 
872 		if (unlikely(fill_vec_buf_split(dev, vq, cur_idx,
873 						&vec_idx, buf_vec,
874 						&head_idx, &len,
875 						VHOST_ACCESS_RW) < 0))
876 			return -1;
877 		len = RTE_MIN(len, size);
878 		update_shadow_used_ring_split(vq, head_idx, len);
879 		size -= len;
880 
881 		cur_idx++;
882 		*num_buffers += 1;
883 	}
884 
885 	*nr_vec = vec_idx;
886 
887 	return 0;
888 }
889 
890 static __rte_always_inline int
891 fill_vec_buf_packed_indirect(struct virtio_net *dev,
892 			struct vhost_virtqueue *vq,
893 			struct vring_packed_desc *desc, uint16_t *vec_idx,
894 			struct buf_vector *buf_vec, uint32_t *len, uint8_t perm)
895 	__rte_shared_locks_required(&vq->iotlb_lock)
896 {
897 	uint16_t i;
898 	uint32_t nr_descs;
899 	uint16_t vec_id = *vec_idx;
900 	uint64_t dlen;
901 	struct vring_packed_desc *descs, *idescs = NULL;
902 
903 	dlen = desc->len;
904 	descs = (struct vring_packed_desc *)(uintptr_t)
905 		vhost_iova_to_vva(dev, vq, desc->addr, &dlen, VHOST_ACCESS_RO);
906 	if (unlikely(!descs))
907 		return -1;
908 
909 	if (unlikely(dlen < desc->len)) {
910 		/*
911 		 * The indirect desc table is not contiguous
912 		 * in the process VA space, so we have to copy it.
913 		 */
914 		idescs = vhost_alloc_copy_ind_table(dev,
915 				vq, desc->addr, desc->len);
916 		if (unlikely(!idescs))
917 			return -1;
918 
919 		descs = idescs;
920 	}
921 
922 	nr_descs = desc->len / sizeof(struct vring_packed_desc);
923 	if (unlikely(nr_descs >= vq->size)) {
924 		free_ind_table(idescs);
925 		return -1;
926 	}
927 
928 	for (i = 0; i < nr_descs; i++) {
929 		if (unlikely(vec_id >= BUF_VECTOR_MAX)) {
930 			free_ind_table(idescs);
931 			return -1;
932 		}
933 
934 		dlen = descs[i].len;
935 		*len += dlen;
936 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
937 						descs[i].addr, dlen,
938 						perm)))
939 			return -1;
940 	}
941 	*vec_idx = vec_id;
942 
943 	if (unlikely(!!idescs))
944 		free_ind_table(idescs);
945 
946 	return 0;
947 }
948 
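/*
 * Collect the buffers of one packed-ring descriptor chain starting at
 * avail_idx into buf_vec, following indirect tables if present, and return
 * the buffer id, descriptor count and total length of the chain.
 */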
949 static __rte_always_inline int
950 fill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
951 				uint16_t avail_idx, uint16_t *desc_count,
952 				struct buf_vector *buf_vec, uint16_t *vec_idx,
953 				uint16_t *buf_id, uint32_t *len, uint8_t perm)
954 	__rte_shared_locks_required(&vq->iotlb_lock)
955 {
956 	bool wrap_counter = vq->avail_wrap_counter;
957 	struct vring_packed_desc *descs = vq->desc_packed;
958 	uint16_t vec_id = *vec_idx;
959 	uint64_t dlen;
960 
961 	if (avail_idx < vq->last_avail_idx)
962 		wrap_counter ^= 1;
963 
964 	/*
965 	 * Perform a load-acquire barrier in desc_is_avail to
966 	 * enforce the ordering between desc flags and desc
967 	 * content.
968 	 */
969 	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)))
970 		return -1;
971 
972 	*desc_count = 0;
973 	*len = 0;
974 
975 	while (1) {
976 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
977 			return -1;
978 
979 		if (unlikely(*desc_count >= vq->size))
980 			return -1;
981 
982 		*desc_count += 1;
983 		*buf_id = descs[avail_idx].id;
984 
985 		if (descs[avail_idx].flags & VRING_DESC_F_INDIRECT) {
986 			if (unlikely(fill_vec_buf_packed_indirect(dev, vq,
987 							&descs[avail_idx],
988 							&vec_id, buf_vec,
989 							len, perm) < 0))
990 				return -1;
991 		} else {
992 			dlen = descs[avail_idx].len;
993 			*len += dlen;
994 
995 			if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
996 							descs[avail_idx].addr,
997 							dlen,
998 							perm)))
999 				return -1;
1000 		}
1001 
1002 		if ((descs[avail_idx].flags & VRING_DESC_F_NEXT) == 0)
1003 			break;
1004 
1005 		if (++avail_idx >= vq->size) {
1006 			avail_idx -= vq->size;
1007 			wrap_counter ^= 1;
1008 		}
1009 	}
1010 
1011 	*vec_idx = vec_id;
1012 
1013 	return 0;
1014 }
1015 
1016 static __rte_noinline void
1017 copy_vnet_hdr_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
1018 		struct buf_vector *buf_vec,
1019 		struct virtio_net_hdr_mrg_rxbuf *hdr)
1020 	__rte_shared_locks_required(&vq->iotlb_lock)
1021 {
1022 	uint64_t len;
1023 	uint64_t remain = dev->vhost_hlen;
1024 	uint64_t src = (uint64_t)(uintptr_t)hdr, dst;
1025 	uint64_t iova = buf_vec->buf_iova;
1026 
1027 	while (remain) {
1028 		len = RTE_MIN(remain,
1029 				buf_vec->buf_len);
1030 		dst = buf_vec->buf_addr;
1031 		rte_memcpy((void *)(uintptr_t)dst,
1032 				(void *)(uintptr_t)src,
1033 				len);
1034 
1035 		PRINT_PACKET(dev, (uintptr_t)dst,
1036 				(uint32_t)len, 0);
1037 		vhost_log_cache_write_iova(dev, vq,
1038 				iova, len);
1039 
1040 		remain -= len;
1041 		iova += len;
1042 		src += len;
1043 		buf_vec++;
1044 	}
1045 }
1046 
1047 static __rte_always_inline int
1048 async_iter_initialize(struct virtio_net *dev, struct vhost_async *async)
1049 {
1050 	struct vhost_iov_iter *iter;
1051 
1052 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
1053 		VHOST_DATA_LOG(dev->ifname, ERR, "no more async iovec available");
1054 		return -1;
1055 	}
1056 
1057 	iter = async->iov_iter + async->iter_idx;
1058 	iter->iov = async->iovec + async->iovec_idx;
1059 	iter->nr_segs = 0;
1060 
1061 	return 0;
1062 }
1063 
1064 static __rte_always_inline int
1065 async_iter_add_iovec(struct virtio_net *dev, struct vhost_async *async,
1066 		void *src, void *dst, size_t len)
1067 {
1068 	struct vhost_iov_iter *iter;
1069 	struct vhost_iovec *iovec;
1070 
1071 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
1072 		static bool vhost_max_async_vec_log;
1073 
1074 		if (!vhost_max_async_vec_log) {
1075 			VHOST_DATA_LOG(dev->ifname, ERR, "no more async iovec available");
1076 			vhost_max_async_vec_log = true;
1077 		}
1078 
1079 		return -1;
1080 	}
1081 
1082 	iter = async->iov_iter + async->iter_idx;
1083 	iovec = async->iovec + async->iovec_idx;
1084 
1085 	iovec->src_addr = src;
1086 	iovec->dst_addr = dst;
1087 	iovec->len = len;
1088 
1089 	iter->nr_segs++;
1090 	async->iovec_idx++;
1091 
1092 	return 0;
1093 }
1094 
1095 static __rte_always_inline void
1096 async_iter_finalize(struct vhost_async *async)
1097 {
1098 	async->iter_idx++;
1099 }
1100 
1101 static __rte_always_inline void
1102 async_iter_cancel(struct vhost_async *async)
1103 {
1104 	struct vhost_iov_iter *iter;
1105 
1106 	iter = async->iov_iter + async->iter_idx;
1107 	async->iovec_idx -= iter->nr_segs;
1108 	iter->nr_segs = 0;
1109 	iter->iov = NULL;
1110 }
1111 
1112 static __rte_always_inline void
1113 async_iter_reset(struct vhost_async *async)
1114 {
1115 	async->iter_idx = 0;
1116 	async->iovec_idx = 0;
1117 }
1118 
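/*
 * Build DMA iovec segments for a copy of cpy_len bytes between an mbuf and
 * a guest buffer, translating the guest IOVA range into host-physical
 * chunks. Direction is mbuf-to-descriptor when to_desc is true.
 */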
1119 static __rte_always_inline int
1120 async_fill_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
1121 		struct rte_mbuf *m, uint32_t mbuf_offset,
1122 		uint64_t buf_iova, uint32_t cpy_len, bool to_desc)
1123 	__rte_shared_locks_required(&vq->access_lock)
1124 	__rte_shared_locks_required(&vq->iotlb_lock)
1125 {
1126 	struct vhost_async *async = vq->async;
1127 	uint64_t mapped_len;
1128 	uint32_t buf_offset = 0;
1129 	void *src, *dst;
1130 	void *host_iova;
1131 
1132 	while (cpy_len) {
1133 		host_iova = (void *)(uintptr_t)gpa_to_first_hpa(dev,
1134 				buf_iova + buf_offset, cpy_len, &mapped_len);
1135 		if (unlikely(!host_iova)) {
1136 			VHOST_DATA_LOG(dev->ifname, ERR,
1137 				"%s: failed to get host iova.",
1138 				__func__);
1139 			return -1;
1140 		}
1141 
1142 		if (to_desc) {
1143 			src = (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset);
1144 			dst = host_iova;
1145 		} else {
1146 			src = host_iova;
1147 			dst = (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset);
1148 		}
1149 
1150 		if (unlikely(async_iter_add_iovec(dev, async, src, dst, (size_t)mapped_len)))
1151 			return -1;
1152 
1153 		cpy_len -= (uint32_t)mapped_len;
1154 		mbuf_offset += (uint32_t)mapped_len;
1155 		buf_offset += (uint32_t)mapped_len;
1156 	}
1157 
1158 	return 0;
1159 }
1160 
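/*
 * CPU copy of cpy_len bytes between an mbuf and a guest buffer: large
 * copies are performed immediately, small ones are deferred to the batch
 * copy list. Direction is mbuf-to-descriptor when to_desc is true.
 */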
1161 static __rte_always_inline void
1162 sync_fill_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
1163 		struct rte_mbuf *m, uint32_t mbuf_offset,
1164 		uint64_t buf_addr, uint64_t buf_iova, uint32_t cpy_len, bool to_desc)
1165 	__rte_shared_locks_required(&vq->iotlb_lock)
1166 {
1167 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
1168 
1169 	if (likely(cpy_len > MAX_BATCH_LEN || vq->batch_copy_nb_elems >= vq->size)) {
1170 		if (to_desc) {
1171 			rte_memcpy((void *)((uintptr_t)(buf_addr)),
1172 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
1173 				cpy_len);
1174 			vhost_log_cache_write_iova(dev, vq, buf_iova, cpy_len);
1175 			PRINT_PACKET(dev, (uintptr_t)(buf_addr), cpy_len, 0);
1176 		} else {
1177 			rte_memcpy(rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
1178 				(void *)((uintptr_t)(buf_addr)),
1179 				cpy_len);
1180 		}
1181 	} else {
1182 		if (to_desc) {
1183 			batch_copy[vq->batch_copy_nb_elems].dst =
1184 				(void *)((uintptr_t)(buf_addr));
1185 			batch_copy[vq->batch_copy_nb_elems].src =
1186 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
1187 			batch_copy[vq->batch_copy_nb_elems].log_addr = buf_iova;
1188 		} else {
1189 			batch_copy[vq->batch_copy_nb_elems].dst =
1190 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
1191 			batch_copy[vq->batch_copy_nb_elems].src =
1192 				(void *)((uintptr_t)(buf_addr));
1193 		}
1194 		batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
1195 		vq->batch_copy_nb_elems++;
1196 	}
1197 }
1198 
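/*
 * Copy one mbuf chain into the guest buffers described by buf_vec, writing
 * the virtio-net header first. When is_async is set, the payload copies are
 * only described as DMA iovecs and performed later by the DMA engine.
 */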
1199 static __rte_always_inline int
1200 mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
1201 		struct rte_mbuf *m, struct buf_vector *buf_vec,
1202 		uint16_t nr_vec, uint16_t num_buffers, bool is_async)
1203 	__rte_shared_locks_required(&vq->access_lock)
1204 	__rte_shared_locks_required(&vq->iotlb_lock)
1205 {
1206 	uint32_t vec_idx = 0;
1207 	uint32_t mbuf_offset, mbuf_avail;
1208 	uint32_t buf_offset, buf_avail;
1209 	uint64_t buf_addr, buf_iova, buf_len;
1210 	uint32_t cpy_len;
1211 	uint64_t hdr_addr;
1212 	struct rte_mbuf *hdr_mbuf;
1213 	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
1214 	struct vhost_async *async = vq->async;
1215 
1216 	if (unlikely(m == NULL))
1217 		return -1;
1218 
1219 	buf_addr = buf_vec[vec_idx].buf_addr;
1220 	buf_iova = buf_vec[vec_idx].buf_iova;
1221 	buf_len = buf_vec[vec_idx].buf_len;
1222 
1223 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
1224 		return -1;
1225 
1226 	hdr_mbuf = m;
1227 	hdr_addr = buf_addr;
1228 	if (unlikely(buf_len < dev->vhost_hlen)) {
1229 		memset(&tmp_hdr, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
1230 		hdr = &tmp_hdr;
1231 	} else
1232 		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
1233 
1234 	VHOST_DATA_LOG(dev->ifname, DEBUG, "RX: num merge buffers %d", num_buffers);
1235 
1236 	if (unlikely(buf_len < dev->vhost_hlen)) {
1237 		buf_offset = dev->vhost_hlen - buf_len;
1238 		vec_idx++;
1239 		buf_addr = buf_vec[vec_idx].buf_addr;
1240 		buf_iova = buf_vec[vec_idx].buf_iova;
1241 		buf_len = buf_vec[vec_idx].buf_len;
1242 		buf_avail = buf_len - buf_offset;
1243 	} else {
1244 		buf_offset = dev->vhost_hlen;
1245 		buf_avail = buf_len - dev->vhost_hlen;
1246 	}
1247 
1248 	mbuf_avail  = rte_pktmbuf_data_len(m);
1249 	mbuf_offset = 0;
1250 
1251 	if (is_async) {
1252 		if (async_iter_initialize(dev, async))
1253 			return -1;
1254 	}
1255 
1256 	while (mbuf_avail != 0 || m->next != NULL) {
1257 		/* done with current buf, get the next one */
1258 		if (buf_avail == 0) {
1259 			vec_idx++;
1260 			if (unlikely(vec_idx >= nr_vec))
1261 				goto error;
1262 
1263 			buf_addr = buf_vec[vec_idx].buf_addr;
1264 			buf_iova = buf_vec[vec_idx].buf_iova;
1265 			buf_len = buf_vec[vec_idx].buf_len;
1266 
1267 			buf_offset = 0;
1268 			buf_avail  = buf_len;
1269 		}
1270 
1271 		/* done with current mbuf, get the next one */
1272 		if (mbuf_avail == 0) {
1273 			m = m->next;
1274 
1275 			mbuf_offset = 0;
1276 			mbuf_avail  = rte_pktmbuf_data_len(m);
1277 		}
1278 
1279 		if (hdr_addr) {
1280 			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
1281 			if (rxvq_is_mergeable(dev))
1282 				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
1283 						num_buffers);
1284 
1285 			if (unlikely(hdr == &tmp_hdr)) {
1286 				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
1287 			} else {
1288 				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
1289 						dev->vhost_hlen, 0);
1290 				vhost_log_cache_write_iova(dev, vq,
1291 						buf_vec[0].buf_iova,
1292 						dev->vhost_hlen);
1293 			}
1294 
1295 			hdr_addr = 0;
1296 		}
1297 
1298 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
1299 
1300 		if (is_async) {
1301 			if (async_fill_seg(dev, vq, m, mbuf_offset,
1302 					   buf_iova + buf_offset, cpy_len, true) < 0)
1303 				goto error;
1304 		} else {
1305 			sync_fill_seg(dev, vq, m, mbuf_offset,
1306 				      buf_addr + buf_offset,
1307 				      buf_iova + buf_offset, cpy_len, true);
1308 		}
1309 
1310 		mbuf_avail  -= cpy_len;
1311 		mbuf_offset += cpy_len;
1312 		buf_avail  -= cpy_len;
1313 		buf_offset += cpy_len;
1314 	}
1315 
1316 	if (is_async)
1317 		async_iter_finalize(async);
1318 
1319 	return 0;
1320 error:
1321 	if (is_async)
1322 		async_iter_cancel(async);
1323 
1324 	return -1;
1325 }
1326 
1327 static __rte_always_inline int
1328 vhost_enqueue_single_packed(struct virtio_net *dev,
1329 			    struct vhost_virtqueue *vq,
1330 			    struct rte_mbuf *pkt,
1331 			    struct buf_vector *buf_vec,
1332 			    uint16_t *nr_descs)
1333 	__rte_shared_locks_required(&vq->access_lock)
1334 	__rte_shared_locks_required(&vq->iotlb_lock)
1335 {
1336 	uint16_t nr_vec = 0;
1337 	uint16_t avail_idx = vq->last_avail_idx;
1338 	uint16_t max_tries, tries = 0;
1339 	uint16_t buf_id = 0;
1340 	uint32_t len = 0;
1341 	uint16_t desc_count;
1342 	uint64_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1343 	uint16_t num_buffers = 0;
1344 	uint32_t buffer_len[vq->size];
1345 	uint16_t buffer_buf_id[vq->size];
1346 	uint16_t buffer_desc_count[vq->size];
1347 
1348 	if (rxvq_is_mergeable(dev))
1349 		max_tries = vq->size - 1;
1350 	else
1351 		max_tries = 1;
1352 
1353 	while (size > 0) {
1354 		/*
1355 		 * If we have tried all available ring items and still
1356 		 * cannot get enough buffers, something abnormal has
1357 		 * happened.
1358 		 */
1359 		if (unlikely(++tries > max_tries))
1360 			return -1;
1361 
1362 		if (unlikely(fill_vec_buf_packed(dev, vq,
1363 						avail_idx, &desc_count,
1364 						buf_vec, &nr_vec,
1365 						&buf_id, &len,
1366 						VHOST_ACCESS_RW) < 0))
1367 			return -1;
1368 
1369 		len = RTE_MIN(len, size);
1370 		size -= len;
1371 
1372 		buffer_len[num_buffers] = len;
1373 		buffer_buf_id[num_buffers] = buf_id;
1374 		buffer_desc_count[num_buffers] = desc_count;
1375 		num_buffers += 1;
1376 
1377 		*nr_descs += desc_count;
1378 		avail_idx += desc_count;
1379 		if (avail_idx >= vq->size)
1380 			avail_idx -= vq->size;
1381 	}
1382 
1383 	if (mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, num_buffers, false) < 0)
1384 		return -1;
1385 
1386 	vhost_shadow_enqueue_single_packed(dev, vq, buffer_len, buffer_buf_id,
1387 					   buffer_desc_count, num_buffers);
1388 
1389 	return 0;
1390 }
1391 
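/*
 * Synchronous enqueue path for split virtqueues: reserve buffers, copy the
 * packets, flush the shadow used ring and kick the guest if needed.
 * Returns the number of packets enqueued.
 */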
1392 static __rte_noinline uint32_t
1393 virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1394 	struct rte_mbuf **pkts, uint32_t count)
1395 	__rte_shared_locks_required(&vq->access_lock)
1396 	__rte_shared_locks_required(&vq->iotlb_lock)
1397 {
1398 	uint32_t pkt_idx = 0;
1399 	uint16_t num_buffers;
1400 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1401 	uint16_t avail_head;
1402 
1403 	/*
1404 	 * The ordering between avail index and
1405 	 * desc reads needs to be enforced.
1406 	 */
1407 	avail_head = rte_atomic_load_explicit((unsigned short __rte_atomic *)&vq->avail->idx,
1408 		rte_memory_order_acquire);
1409 
1410 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1411 
1412 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1413 		uint64_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1414 		uint16_t nr_vec = 0;
1415 
1416 		if (unlikely(reserve_avail_buf_split(dev, vq,
1417 						pkt_len, buf_vec, &num_buffers,
1418 						avail_head, &nr_vec) < 0)) {
1419 			VHOST_DATA_LOG(dev->ifname, DEBUG,
1420 				"failed to get enough desc from vring");
1421 			vq->shadow_used_idx -= num_buffers;
1422 			break;
1423 		}
1424 
1425 		VHOST_DATA_LOG(dev->ifname, DEBUG,
1426 			"current index %d | end index %d",
1427 			vq->last_avail_idx, vq->last_avail_idx + num_buffers);
1428 
1429 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec,
1430 					num_buffers, false) < 0) {
1431 			vq->shadow_used_idx -= num_buffers;
1432 			break;
1433 		}
1434 
1435 		vq->last_avail_idx += num_buffers;
1436 		vhost_virtqueue_reconnect_log_split(vq);
1437 	}
1438 
1439 	do_data_copy_enqueue(dev, vq);
1440 
1441 	if (likely(vq->shadow_used_idx)) {
1442 		flush_shadow_used_ring_split(dev, vq);
1443 		vhost_vring_call_split(dev, vq);
1444 	}
1445 
1446 	return pkt_idx;
1447 }
1448 
1449 static __rte_always_inline int
1450 virtio_dev_rx_sync_batch_check(struct virtio_net *dev,
1451 			   struct vhost_virtqueue *vq,
1452 			   struct rte_mbuf **pkts,
1453 			   uint64_t *desc_addrs,
1454 			   uint64_t *lens)
1455 	__rte_shared_locks_required(&vq->iotlb_lock)
1456 {
1457 	bool wrap_counter = vq->avail_wrap_counter;
1458 	struct vring_packed_desc *descs = vq->desc_packed;
1459 	uint16_t avail_idx = vq->last_avail_idx;
1460 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1461 	uint16_t i;
1462 
1463 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1464 		return -1;
1465 
1466 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1467 		return -1;
1468 
1469 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1470 		if (unlikely(pkts[i]->next != NULL))
1471 			return -1;
1472 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1473 					    wrap_counter)))
1474 			return -1;
1475 	}
1476 
1477 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1478 		lens[i] = descs[avail_idx + i].len;
1479 
1480 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1481 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1482 			return -1;
1483 	}
1484 
1485 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1486 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
1487 						  descs[avail_idx + i].addr,
1488 						  &lens[i],
1489 						  VHOST_ACCESS_RW);
1490 
1491 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1492 		if (unlikely(!desc_addrs[i]))
1493 			return -1;
1494 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1495 			return -1;
1496 	}
1497 
1498 	return 0;
1499 }
1500 
1501 static __rte_always_inline int
1502 virtio_dev_rx_async_batch_check(struct vhost_virtqueue *vq,
1503 			   struct rte_mbuf **pkts,
1504 			   uint64_t *desc_addrs,
1505 			   uint64_t *lens,
1506 			   int16_t dma_id,
1507 			   uint16_t vchan_id)
1508 {
1509 	bool wrap_counter = vq->avail_wrap_counter;
1510 	struct vring_packed_desc *descs = vq->desc_packed;
1511 	uint16_t avail_idx = vq->last_avail_idx;
1512 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1513 	uint16_t i;
1514 
1515 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1516 		return -1;
1517 
1518 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1519 		return -1;
1520 
1521 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1522 		if (unlikely(pkts[i]->next != NULL))
1523 			return -1;
1524 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1525 					    wrap_counter)))
1526 			return -1;
1527 	}
1528 
1529 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1530 		lens[i] = descs[avail_idx + i].len;
1531 
1532 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1533 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1534 			return -1;
1535 	}
1536 
1537 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1538 		desc_addrs[i] = descs[avail_idx + i].addr;
1539 
1540 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1541 		if (unlikely(!desc_addrs[i]))
1542 			return -1;
1543 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1544 			return -1;
1545 	}
1546 
1547 	if (rte_dma_burst_capacity(dma_id, vchan_id) < PACKED_BATCH_SIZE)
1548 		return -1;
1549 
1550 	return 0;
1551 }
1552 
1553 static __rte_always_inline void
1554 virtio_dev_rx_batch_packed_copy(struct virtio_net *dev,
1555 			   struct vhost_virtqueue *vq,
1556 			   struct rte_mbuf **pkts,
1557 			   uint64_t *desc_addrs,
1558 			   uint64_t *lens)
1559 	__rte_shared_locks_required(&vq->iotlb_lock)
1560 {
1561 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1562 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
1563 	struct vring_packed_desc *descs = vq->desc_packed;
1564 	uint16_t avail_idx = vq->last_avail_idx;
1565 	uint16_t ids[PACKED_BATCH_SIZE];
1566 	uint16_t i;
1567 
1568 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1569 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
1570 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
1571 					(uintptr_t)desc_addrs[i];
1572 		lens[i] = pkts[i]->pkt_len +
1573 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
1574 	}
1575 
1576 	if (rxvq_is_mergeable(dev)) {
1577 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1578 			ASSIGN_UNLESS_EQUAL(hdrs[i]->num_buffers, 1);
1579 		}
1580 	}
1581 
1582 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1583 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
1584 
1585 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
1586 
1587 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1588 		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
1589 			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
1590 			   pkts[i]->pkt_len);
1591 	}
1592 
1593 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1594 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr,
1595 					   lens[i]);
1596 
1597 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1598 		ids[i] = descs[avail_idx + i].id;
1599 
1600 	vhost_flush_enqueue_batch_packed(dev, vq, lens, ids);
1601 }
1602 
1603 static __rte_always_inline int
1604 virtio_dev_rx_sync_batch_packed(struct virtio_net *dev,
1605 			   struct vhost_virtqueue *vq,
1606 			   struct rte_mbuf **pkts)
1607 	__rte_shared_locks_required(&vq->iotlb_lock)
1608 {
1609 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
1610 	uint64_t lens[PACKED_BATCH_SIZE];
1611 
1612 	if (virtio_dev_rx_sync_batch_check(dev, vq, pkts, desc_addrs, lens) == -1)
1613 		return -1;
1614 
1615 	if (vq->shadow_used_idx) {
1616 		do_data_copy_enqueue(dev, vq);
1617 		vhost_flush_enqueue_shadow_packed(dev, vq);
1618 	}
1619 
1620 	virtio_dev_rx_batch_packed_copy(dev, vq, pkts, desc_addrs, lens);
1621 
1622 	return 0;
1623 }
1624 
1625 static __rte_always_inline int16_t
1626 virtio_dev_rx_single_packed(struct virtio_net *dev,
1627 			    struct vhost_virtqueue *vq,
1628 			    struct rte_mbuf *pkt)
1629 	__rte_shared_locks_required(&vq->access_lock)
1630 	__rte_shared_locks_required(&vq->iotlb_lock)
1631 {
1632 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1633 	uint16_t nr_descs = 0;
1634 
1635 	if (unlikely(vhost_enqueue_single_packed(dev, vq, pkt, buf_vec,
1636 						 &nr_descs) < 0)) {
1637 		VHOST_DATA_LOG(dev->ifname, DEBUG, "failed to get enough desc from vring");
1638 		return -1;
1639 	}
1640 
1641 	VHOST_DATA_LOG(dev->ifname, DEBUG,
1642 		"current index %d | end index %d",
1643 		vq->last_avail_idx, vq->last_avail_idx + nr_descs);
1644 
1645 	vq_inc_last_avail_packed(vq, nr_descs);
1646 
1647 	return 0;
1648 }
1649 
1650 static __rte_noinline uint32_t
1651 virtio_dev_rx_packed(struct virtio_net *dev,
1652 		     struct vhost_virtqueue *__rte_restrict vq,
1653 		     struct rte_mbuf **__rte_restrict pkts,
1654 		     uint32_t count)
1655 	__rte_shared_locks_required(&vq->access_lock)
1656 	__rte_shared_locks_required(&vq->iotlb_lock)
1657 {
1658 	uint32_t pkt_idx = 0;
1659 
1660 	do {
1661 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1662 
1663 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
1664 			if (!virtio_dev_rx_sync_batch_packed(dev, vq,
1665 							&pkts[pkt_idx])) {
1666 				pkt_idx += PACKED_BATCH_SIZE;
1667 				continue;
1668 			}
1669 		}
1670 
1671 		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]))
1672 			break;
1673 		pkt_idx++;
1674 
1675 	} while (pkt_idx < count);
1676 
1677 	if (vq->shadow_used_idx) {
1678 		do_data_copy_enqueue(dev, vq);
1679 		vhost_flush_enqueue_shadow_packed(dev, vq);
1680 	}
1681 
1682 	if (pkt_idx)
1683 		vhost_vring_call_packed(dev, vq);
1684 
1685 	return pkt_idx;
1686 }
1687 
1688 static void
1689 virtio_dev_vring_translate(struct virtio_net *dev, struct vhost_virtqueue *vq)
1690 {
1691 	rte_rwlock_write_lock(&vq->access_lock);
1692 	vhost_user_iotlb_rd_lock(vq);
1693 	if (!vq->access_ok)
1694 		vring_translate(dev, vq);
1695 	vhost_user_iotlb_rd_unlock(vq);
1696 	rte_rwlock_write_unlock(&vq->access_lock);
1697 }
1698 
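/*
 * Common enqueue entry point: takes the virtqueue access lock, ensures the
 * ring is translated, then dispatches to the packed or split datapath and
 * updates the queue statistics.
 */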
1699 static __rte_always_inline uint32_t
1700 virtio_dev_rx(struct virtio_net *dev, struct vhost_virtqueue *vq,
1701 	struct rte_mbuf **pkts, uint32_t count)
1702 {
1703 	uint32_t nb_tx = 0;
1704 
1705 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
1706 	rte_rwlock_read_lock(&vq->access_lock);
1707 
1708 	if (unlikely(!vq->enabled))
1709 		goto out_access_unlock;
1710 
1711 	vhost_user_iotlb_rd_lock(vq);
1712 
1713 	if (unlikely(!vq->access_ok)) {
1714 		vhost_user_iotlb_rd_unlock(vq);
1715 		rte_rwlock_read_unlock(&vq->access_lock);
1716 
1717 		virtio_dev_vring_translate(dev, vq);
1718 		goto out_no_unlock;
1719 	}
1720 
1721 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1722 	if (count == 0)
1723 		goto out;
1724 
1725 	if (vq_is_packed(dev))
1726 		nb_tx = virtio_dev_rx_packed(dev, vq, pkts, count);
1727 	else
1728 		nb_tx = virtio_dev_rx_split(dev, vq, pkts, count);
1729 
1730 	vhost_queue_stats_update(dev, vq, pkts, nb_tx);
1731 
1732 out:
1733 	vhost_user_iotlb_rd_unlock(vq);
1734 
1735 out_access_unlock:
1736 	rte_rwlock_read_unlock(&vq->access_lock);
1737 
1738 out_no_unlock:
1739 	return nb_tx;
1740 }
1741 
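/*
 * Public entry point for the synchronous enqueue (guest Rx) path.
 *
 * Illustrative usage sketch (not from this file; "rxq_id" is a placeholder
 * queue index): the packet data is copied into guest buffers, so the caller
 * keeps ownership of the mbufs and may free them after the call:
 *
 *	uint16_t sent = rte_vhost_enqueue_burst(vid, rxq_id, pkts, nb_pkts);
 *
 *	// mbufs can be freed regardless of how many were enqueued
 *	rte_pktmbuf_free_bulk(pkts, nb_pkts);
 */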
1742 uint16_t
1743 rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
1744 	struct rte_mbuf **__rte_restrict pkts, uint16_t count)
1745 {
1746 	struct virtio_net *dev = get_device(vid);
1747 
1748 	if (!dev)
1749 		return 0;
1750 
1751 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
1752 		VHOST_DATA_LOG(dev->ifname, ERR,
1753 			"%s: built-in vhost net backend is disabled.",
1754 			__func__);
1755 		return 0;
1756 	}
1757 
1758 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1759 		VHOST_DATA_LOG(dev->ifname, ERR,
1760 			"%s: invalid virtqueue idx %d.",
1761 			__func__, queue_id);
1762 		return 0;
1763 	}
1764 
1765 	return virtio_dev_rx(dev, dev->virtqueue[queue_id], pkts, count);
1766 }
1767 
1768 static __rte_always_inline uint16_t
1769 async_get_first_inflight_pkt_idx(struct vhost_virtqueue *vq)
1770 	__rte_shared_locks_required(&vq->access_lock)
1771 {
1772 	struct vhost_async *async = vq->async;
1773 
1774 	if (async->pkts_idx >= async->pkts_inflight_n)
1775 		return async->pkts_idx - async->pkts_inflight_n;
1776 	else
1777 		return vq->size - async->pkts_inflight_n + async->pkts_idx;
1778 }
1779 
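/*
 * Copy used-element records from the shadow ring into the async descriptor
 * ring, handling wrap-around of the destination index.
 */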
1780 static __rte_always_inline void
1781 store_dma_desc_info_split(struct vring_used_elem *s_ring, struct vring_used_elem *d_ring,
1782 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1783 {
1784 	size_t elem_size = sizeof(struct vring_used_elem);
1785 
1786 	if (d_idx + count <= ring_size) {
1787 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1788 	} else {
1789 		uint16_t size = ring_size - d_idx;
1790 
1791 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1792 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1793 	}
1794 }
1795 
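/*
 * DMA-accelerated enqueue path for split virtqueues: reserve buffers, build
 * the DMA transfer descriptions, submit them to the DMA vChannel and stash
 * the used-ring updates until the transfers complete.
 */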
1796 static __rte_noinline uint32_t
1797 virtio_dev_rx_async_submit_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1798 	struct rte_mbuf **pkts, uint32_t count, int16_t dma_id, uint16_t vchan_id)
1799 	__rte_exclusive_locks_required(&vq->access_lock)
1800 	__rte_shared_locks_required(&vq->iotlb_lock)
1801 {
1802 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1803 	uint32_t pkt_idx = 0;
1804 	uint16_t num_buffers;
1805 	uint16_t avail_head;
1806 
1807 	struct vhost_async *async = vq->async;
1808 	struct async_inflight_info *pkts_info = async->pkts_info;
1809 	uint32_t pkt_err = 0;
1810 	uint16_t n_xfer;
1811 	uint16_t slot_idx = 0;
1812 
1813 	/*
1814 	 * The ordering between avail index and desc reads needs to be enforced.
1815 	 */
1816 	avail_head = rte_atomic_load_explicit((unsigned short __rte_atomic *)&vq->avail->idx,
1817 		rte_memory_order_acquire);
1818 
1819 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1820 
1821 	async_iter_reset(async);
1822 
1823 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1824 		uint64_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1825 		uint16_t nr_vec = 0;
1826 
1827 		if (unlikely(reserve_avail_buf_split(dev, vq, pkt_len, buf_vec,
1828 						&num_buffers, avail_head, &nr_vec) < 0)) {
1829 			VHOST_DATA_LOG(dev->ifname, DEBUG,
1830 				"failed to get enough desc from vring");
1831 			vq->shadow_used_idx -= num_buffers;
1832 			break;
1833 		}
1834 
1835 		VHOST_DATA_LOG(dev->ifname, DEBUG,
1836 			"current index %d | end index %d",
1837 			vq->last_avail_idx, vq->last_avail_idx + num_buffers);
1838 
1839 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec, num_buffers, true) < 0) {
1840 			vq->shadow_used_idx -= num_buffers;
1841 			break;
1842 		}
1843 
1844 		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
1845 		pkts_info[slot_idx].descs = num_buffers;
1846 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1847 
1848 		vq->last_avail_idx += num_buffers;
1849 		vhost_virtqueue_reconnect_log_split(vq);
1850 	}
1851 
1852 	if (unlikely(pkt_idx == 0))
1853 		return 0;
1854 
1855 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
1856 			async->iov_iter, pkt_idx);
1857 
1858 	pkt_err = pkt_idx - n_xfer;
1859 	if (unlikely(pkt_err)) {
1860 		uint16_t num_descs = 0;
1861 
1862 		VHOST_DATA_LOG(dev->ifname, DEBUG,
1863 			"%s: failed to transfer %u packets for queue %u.",
1864 			__func__, pkt_err, vq->index);
1865 
1866 		/* update number of completed packets */
1867 		pkt_idx = n_xfer;
1868 
1869 		/* calculate the sum of descriptors to revert */
1870 		while (pkt_err-- > 0) {
1871 			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
1872 			slot_idx--;
1873 		}
1874 
1875 		/* recover shadow used ring and available ring */
1876 		vq->shadow_used_idx -= num_descs;
1877 		vq->last_avail_idx -= num_descs;
1878 		vhost_virtqueue_reconnect_log_split(vq);
1879 	}
1880 
1881 	/* keep used descriptors */
1882 	if (likely(vq->shadow_used_idx)) {
1883 		uint16_t to = async->desc_idx_split & (vq->size - 1);
1884 
1885 		store_dma_desc_info_split(vq->shadow_used_split,
1886 				async->descs_split, vq->size, 0, to,
1887 				vq->shadow_used_idx);
1888 
1889 		async->desc_idx_split += vq->shadow_used_idx;
1890 
1891 		async->pkts_idx += pkt_idx;
1892 		if (async->pkts_idx >= vq->size)
1893 			async->pkts_idx -= vq->size;
1894 
1895 		async->pkts_inflight_n += pkt_idx;
1896 		vq->shadow_used_idx = 0;
1897 	}
1898 
1899 	return pkt_idx;
1900 }
1901 
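/*
 * Reserve enough packed ring buffers for one packet, build the async copy
 * iovecs from the mbuf, and shadow the used buffers so they can be written
 * back once the DMA transfers complete.
 */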
1903 static __rte_always_inline int
1904 vhost_enqueue_async_packed(struct virtio_net *dev,
1905 			    struct vhost_virtqueue *vq,
1906 			    struct rte_mbuf *pkt,
1907 			    struct buf_vector *buf_vec,
1908 			    uint16_t *nr_descs,
1909 			    uint16_t *nr_buffers)
1910 	__rte_exclusive_locks_required(&vq->access_lock)
1911 	__rte_shared_locks_required(&vq->iotlb_lock)
1912 {
1913 	uint16_t nr_vec = 0;
1914 	uint16_t avail_idx = vq->last_avail_idx;
1915 	uint16_t max_tries, tries = 0;
1916 	uint16_t buf_id = 0;
1917 	uint32_t len = 0;
1918 	uint16_t desc_count = 0;
1919 	uint64_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1920 	uint32_t buffer_len[vq->size];
1921 	uint16_t buffer_buf_id[vq->size];
1922 	uint16_t buffer_desc_count[vq->size];
1923 
1924 	if (rxvq_is_mergeable(dev))
1925 		max_tries = vq->size - 1;
1926 	else
1927 		max_tries = 1;
1928 
1929 	do {
1930 		/*
1931 		 * If we have tried all available ring items and still
1932 		 * can't get enough buffers, something abnormal
1933 		 * has happened.
1934 		 */
1935 		if (unlikely(++tries > max_tries))
1936 			return -1;
1937 
1938 		if (unlikely(fill_vec_buf_packed(dev, vq,
1939 						avail_idx, &desc_count,
1940 						buf_vec, &nr_vec,
1941 						&buf_id, &len,
1942 						VHOST_ACCESS_RW) < 0))
1943 			return -1;
1944 
1945 		len = RTE_MIN(len, size);
1946 		size -= len;
1947 
1948 		buffer_len[*nr_buffers] = len;
1949 		buffer_buf_id[*nr_buffers] = buf_id;
1950 		buffer_desc_count[*nr_buffers] = desc_count;
1951 		*nr_buffers += 1;
1952 		*nr_descs += desc_count;
1953 		avail_idx += desc_count;
1954 		if (avail_idx >= vq->size)
1955 			avail_idx -= vq->size;
1956 	} while (size > 0);
1957 
1958 	if (unlikely(mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers, true) < 0))
1959 		return -1;
1960 
1961 	vhost_async_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id,
1962 					buffer_desc_count, *nr_buffers);
1963 
1964 	return 0;
1965 }
1966 
1967 static __rte_always_inline int16_t
1968 virtio_dev_rx_async_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
1969 			    struct rte_mbuf *pkt, uint16_t *nr_descs, uint16_t *nr_buffers)
1970 	__rte_exclusive_locks_required(&vq->access_lock)
1971 	__rte_shared_locks_required(&vq->iotlb_lock)
1972 {
1973 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1974 
1975 	if (unlikely(vhost_enqueue_async_packed(dev, vq, pkt, buf_vec,
1976 					nr_descs, nr_buffers) < 0)) {
1977 		VHOST_DATA_LOG(dev->ifname, DEBUG, "failed to get enough desc from vring");
1978 		return -1;
1979 	}
1980 
1981 	VHOST_DATA_LOG(dev->ifname, DEBUG,
1982 		"current index %d | end index %d",
1983 		vq->last_avail_idx, vq->last_avail_idx + *nr_descs);
1984 
1985 	return 0;
1986 }
1987 
1988 static __rte_always_inline void
1989 virtio_dev_rx_async_packed_batch_enqueue(struct virtio_net *dev,
1990 			   struct vhost_virtqueue *vq,
1991 			   struct rte_mbuf **pkts,
1992 			   uint64_t *desc_addrs,
1993 			   uint64_t *lens)
1994 	__rte_exclusive_locks_required(&vq->access_lock)
1995 	__rte_shared_locks_required(&vq->iotlb_lock)
1996 {
1997 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1998 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
1999 	struct vring_packed_desc *descs = vq->desc_packed;
2000 	struct vhost_async *async = vq->async;
2001 	uint16_t avail_idx = vq->last_avail_idx;
2002 	uint32_t mbuf_offset = 0;
2003 	uint16_t ids[PACKED_BATCH_SIZE];
2004 	uint64_t mapped_len[PACKED_BATCH_SIZE];
2005 	void *host_iova[PACKED_BATCH_SIZE];
2006 	uintptr_t desc;
2007 	uint16_t i;
2008 
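	/*
	 * Map the descriptors so the virtio-net headers can be written by the
	 * CPU; the packet payload itself is copied by the DMA engine below.
	 */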
2009 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2010 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
2011 		desc = vhost_iova_to_vva(dev, vq, desc_addrs[i], &lens[i], VHOST_ACCESS_RW);
2012 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc;
2013 		lens[i] = pkts[i]->pkt_len +
2014 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
2015 	}
2016 
2017 	if (rxvq_is_mergeable(dev)) {
2018 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2019 			ASSIGN_UNLESS_EQUAL(hdrs[i]->num_buffers, 1);
2020 		}
2021 	}
2022 
2023 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2024 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
2025 
2026 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
2027 
2028 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2029 		host_iova[i] = (void *)(uintptr_t)gpa_to_first_hpa(dev,
2030 			desc_addrs[i] + buf_offset, lens[i], &mapped_len[i]);
2031 	}
2032 
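	/* One DMA iovec per packet: copy the mbuf data into the guest buffer following the header. */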
2033 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2034 		async_iter_initialize(dev, async);
2035 		async_iter_add_iovec(dev, async,
2036 				(void *)(uintptr_t)rte_pktmbuf_iova_offset(pkts[i], mbuf_offset),
2037 				host_iova[i],
2038 				mapped_len[i]);
2039 		async->iter_idx++;
2040 	}
2041 
2042 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2043 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr, lens[i]);
2044 
2045 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2046 		ids[i] = descs[avail_idx + i].id;
2047 
2048 	vhost_async_shadow_enqueue_packed_batch(vq, lens, ids);
2049 }
2050 
2051 static __rte_always_inline int
2052 virtio_dev_rx_async_packed_batch(struct virtio_net *dev,
2053 			   struct vhost_virtqueue *vq,
2054 			   struct rte_mbuf **pkts,
2055 			   int16_t dma_id, uint16_t vchan_id)
2056 	__rte_exclusive_locks_required(&vq->access_lock)
2057 	__rte_shared_locks_required(&vq->iotlb_lock)
2058 {
2059 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
2060 	uint64_t lens[PACKED_BATCH_SIZE];
2061 
2062 	if (virtio_dev_rx_async_batch_check(vq, pkts, desc_addrs, lens, dma_id, vchan_id) == -1)
2063 		return -1;
2064 
2065 	virtio_dev_rx_async_packed_batch_enqueue(dev, vq, pkts, desc_addrs, lens);
2066 
2067 	return 0;
2068 }
2069 
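/*
 * Roll back the avail index and the async shadow buffer index for packets
 * whose DMA copy submission failed, so that their descriptors are reused.
 */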
2070 static __rte_always_inline void
2071 dma_error_handler_packed(struct vhost_virtqueue *vq, uint16_t slot_idx,
2072 			uint32_t nr_err, uint32_t *pkt_idx)
2073 	__rte_exclusive_locks_required(&vq->access_lock)
2074 {
2075 	uint16_t descs_err = 0;
2076 	uint16_t buffers_err = 0;
2077 	struct vhost_async *async = vq->async;
2078 	struct async_inflight_info *pkts_info = vq->async->pkts_info;
2079 
2080 	*pkt_idx -= nr_err;
2081 	/* calculate the sum of buffers and descs of DMA-error packets. */
2082 	while (nr_err-- > 0) {
2083 		descs_err += pkts_info[slot_idx % vq->size].descs;
2084 		buffers_err += pkts_info[slot_idx % vq->size].nr_buffers;
2085 		slot_idx--;
2086 	}
2087 
2088 	if (vq->last_avail_idx >= descs_err) {
2089 		vq->last_avail_idx -= descs_err;
2090 	} else {
2091 		vq->last_avail_idx = vq->last_avail_idx + vq->size - descs_err;
2092 		vq->avail_wrap_counter ^= 1;
2093 	}
2094 	vhost_virtqueue_reconnect_log_packed(vq);
2095 
2096 	if (async->buffer_idx_packed >= buffers_err)
2097 		async->buffer_idx_packed -= buffers_err;
2098 	else
2099 		async->buffer_idx_packed = async->buffer_idx_packed + vq->size - buffers_err;
2100 }
2101 
2102 static __rte_noinline uint32_t
2103 virtio_dev_rx_async_submit_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
2104 	struct rte_mbuf **pkts, uint32_t count, int16_t dma_id, uint16_t vchan_id)
2105 	__rte_exclusive_locks_required(&vq->access_lock)
2106 	__rte_shared_locks_required(&vq->iotlb_lock)
2107 {
2108 	uint32_t pkt_idx = 0;
2109 	uint16_t n_xfer;
2110 	uint16_t num_buffers;
2111 	uint16_t num_descs;
2112 
2113 	struct vhost_async *async = vq->async;
2114 	struct async_inflight_info *pkts_info = async->pkts_info;
2115 	uint32_t pkt_err = 0;
2116 	uint16_t slot_idx = 0;
2117 	uint16_t i;
2118 
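	/*
	 * Try the batched path first; fall back to per-packet processing when
	 * fewer than PACKED_BATCH_SIZE packets remain or the batch check fails.
	 */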
2119 	do {
2120 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
2121 
2122 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
2123 			if (!virtio_dev_rx_async_packed_batch(dev, vq, &pkts[pkt_idx],
2124 					dma_id, vchan_id)) {
2125 				for (i = 0; i < PACKED_BATCH_SIZE; i++) {
2126 					slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
2127 					pkts_info[slot_idx].descs = 1;
2128 					pkts_info[slot_idx].nr_buffers = 1;
2129 					pkts_info[slot_idx].mbuf = pkts[pkt_idx];
2130 					pkt_idx++;
2131 				}
2132 				continue;
2133 			}
2134 		}
2135 
2136 		num_buffers = 0;
2137 		num_descs = 0;
2138 		if (unlikely(virtio_dev_rx_async_packed(dev, vq, pkts[pkt_idx],
2139 						&num_descs, &num_buffers) < 0))
2140 			break;
2141 
2142 		slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
2143 
2144 		pkts_info[slot_idx].descs = num_descs;
2145 		pkts_info[slot_idx].nr_buffers = num_buffers;
2146 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
2147 
2148 		pkt_idx++;
2149 		vq_inc_last_avail_packed(vq, num_descs);
2150 	} while (pkt_idx < count);
2151 
2152 	if (unlikely(pkt_idx == 0))
2153 		return 0;
2154 
2155 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
2156 			async->iov_iter, pkt_idx);
2157 
2158 	async_iter_reset(async);
2159 
2160 	pkt_err = pkt_idx - n_xfer;
2161 	if (unlikely(pkt_err)) {
2162 		VHOST_DATA_LOG(dev->ifname, DEBUG,
2163 			"%s: failed to transfer %u packets for queue %u.",
2164 			__func__, pkt_err, vq->index);
2165 		dma_error_handler_packed(vq, slot_idx, pkt_err, &pkt_idx);
2166 	}
2167 
2168 	async->pkts_idx += pkt_idx;
2169 	if (async->pkts_idx >= vq->size)
2170 		async->pkts_idx -= vq->size;
2171 
2172 	async->pkts_inflight_n += pkt_idx;
2173 
2174 	return pkt_idx;
2175 }
2176 
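/*
 * Flush n_descs completed descriptors from the async descriptor ring into
 * the split used ring, handling wrap-around on both rings.
 */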
2177 static __rte_always_inline void
2178 write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
2179 	__rte_shared_locks_required(&vq->access_lock)
2180 {
2181 	struct vhost_async *async = vq->async;
2182 	uint16_t nr_left = n_descs;
2183 	uint16_t nr_copy;
2184 	uint16_t to, from;
2185 
2186 	do {
2187 		from = async->last_desc_idx_split & (vq->size - 1);
2188 		nr_copy = nr_left + from <= vq->size ? nr_left : vq->size - from;
2189 		to = vq->last_used_idx & (vq->size - 1);
2190 
2191 		if (to + nr_copy <= vq->size) {
2192 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
2193 					nr_copy * sizeof(struct vring_used_elem));
2194 		} else {
2195 			uint16_t size = vq->size - to;
2196 
2197 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
2198 					size * sizeof(struct vring_used_elem));
2199 			rte_memcpy(&vq->used->ring[0], &async->descs_split[from + size],
2200 					(nr_copy - size) * sizeof(struct vring_used_elem));
2201 		}
2202 
2203 		async->last_desc_idx_split += nr_copy;
2204 		vq->last_used_idx += nr_copy;
2205 		nr_left -= nr_copy;
2206 	} while (nr_left > 0);
2207 }
2208 
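/*
 * Write n_buffers completed buffers back to the packed descriptor ring.
 * The head descriptor's flags are stored last so the guest never sees a
 * partially updated chain.
 */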
2209 static __rte_always_inline void
2210 write_back_completed_descs_packed(struct vhost_virtqueue *vq,
2211 				uint16_t n_buffers)
2212 	__rte_shared_locks_required(&vq->access_lock)
2213 {
2214 	struct vhost_async *async = vq->async;
2215 	uint16_t from = async->last_buffer_idx_packed;
2216 	uint16_t used_idx = vq->last_used_idx;
2217 	uint16_t head_idx = vq->last_used_idx;
2218 	uint16_t head_flags = 0;
2219 	uint16_t i;
2220 
2221 	/* Split loop in two to save memory barriers */
2222 	for (i = 0; i < n_buffers; i++) {
2223 		vq->desc_packed[used_idx].id = async->buffers_packed[from].id;
2224 		vq->desc_packed[used_idx].len = async->buffers_packed[from].len;
2225 
2226 		used_idx += async->buffers_packed[from].count;
2227 		if (used_idx >= vq->size)
2228 			used_idx -= vq->size;
2229 
2230 		from++;
2231 		if (from >= vq->size)
2232 			from = 0;
2233 	}
2234 
2235 	/* The ordering for storing desc flags needs to be enforced. */
2236 	rte_atomic_thread_fence(rte_memory_order_release);
2237 
2238 	from = async->last_buffer_idx_packed;
2239 
2240 	for (i = 0; i < n_buffers; i++) {
2241 		uint16_t flags;
2242 
2243 		if (async->buffers_packed[from].len)
2244 			flags = VRING_DESC_F_WRITE;
2245 		else
2246 			flags = 0;
2247 
2248 		if (vq->used_wrap_counter) {
2249 			flags |= VRING_DESC_F_USED;
2250 			flags |= VRING_DESC_F_AVAIL;
2251 		} else {
2252 			flags &= ~VRING_DESC_F_USED;
2253 			flags &= ~VRING_DESC_F_AVAIL;
2254 		}
2255 
2256 		if (i > 0) {
2257 			vq->desc_packed[vq->last_used_idx].flags = flags;
2258 		} else {
2259 			head_idx = vq->last_used_idx;
2260 			head_flags = flags;
2261 		}
2262 
2263 		vq_inc_last_used_packed(vq, async->buffers_packed[from].count);
2264 
2265 		from++;
2266 		if (from == vq->size)
2267 			from = 0;
2268 	}
2269 
2270 	vq->desc_packed[head_idx].flags = head_flags;
2271 	async->last_buffer_idx_packed = from;
2272 }
2273 
2274 static __rte_always_inline uint16_t
2275 vhost_poll_enqueue_completed(struct virtio_net *dev, struct vhost_virtqueue *vq,
2276 	struct rte_mbuf **pkts, uint16_t count, int16_t dma_id, uint16_t vchan_id)
2277 	__rte_shared_locks_required(&vq->access_lock)
2278 {
2279 	struct vhost_async *async = vq->async;
2280 	struct async_inflight_info *pkts_info = async->pkts_info;
2281 	uint16_t nr_cpl_pkts = 0;
2282 	uint16_t n_descs = 0, n_buffers = 0;
2283 	uint16_t start_idx, from, i;
2284 
2285 	/* Check completed copies for the given DMA vChannel */
2286 	vhost_async_dma_check_completed(dev, dma_id, vchan_id, VHOST_DMA_MAX_COPY_COMPLETE);
2287 
2288 	start_idx = async_get_first_inflight_pkt_idx(vq);
2289 	/**
2290 	 * Calculate the number of copy-completed packets.
2291 	 * Note that there may be completed packets even if
2292 	 * the given DMA vChannel reports no completed copies,
2293 	 * as it is possible that a virtqueue uses multiple
2294 	 * DMA vChannels.
2295 	 */
2296 	from = start_idx;
2297 	while (vq->async->pkts_cmpl_flag[from] && count--) {
2298 		vq->async->pkts_cmpl_flag[from] = false;
2299 		from++;
2300 		if (from >= vq->size)
2301 			from -= vq->size;
2302 		nr_cpl_pkts++;
2303 	}
2304 
2305 	if (nr_cpl_pkts == 0)
2306 		return 0;
2307 
2308 	for (i = 0; i < nr_cpl_pkts; i++) {
2309 		from = (start_idx + i) % vq->size;
2310 		/* Only used with packed ring */
2311 		n_buffers += pkts_info[from].nr_buffers;
2312 		/* Only used with split ring */
2313 		n_descs += pkts_info[from].descs;
2314 		pkts[i] = pkts_info[from].mbuf;
2315 	}
2316 
2317 	async->pkts_inflight_n -= nr_cpl_pkts;
2318 
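	/*
	 * If the vring is accessible, write completed descriptors back and
	 * kick the guest if needed; otherwise only record the progress locally.
	 */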
2319 	if (likely(vq->enabled && vq->access_ok)) {
2320 		if (vq_is_packed(dev)) {
2321 			write_back_completed_descs_packed(vq, n_buffers);
2322 			vhost_vring_call_packed(dev, vq);
2323 		} else {
2324 			write_back_completed_descs_split(vq, n_descs);
2325 			rte_atomic_fetch_add_explicit(
2326 				(unsigned short __rte_atomic *)&vq->used->idx,
2327 				n_descs, rte_memory_order_release);
2328 			vhost_vring_call_split(dev, vq);
2329 		}
2330 	} else {
2331 		if (vq_is_packed(dev)) {
2332 			async->last_buffer_idx_packed += n_buffers;
2333 			if (async->last_buffer_idx_packed >= vq->size)
2334 				async->last_buffer_idx_packed -= vq->size;
2335 		} else {
2336 			async->last_desc_idx_split += n_descs;
2337 		}
2338 	}
2339 
2340 	return nr_cpl_pkts;
2341 }
2342 
2343 uint16_t
2344 rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
2345 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2346 		uint16_t vchan_id)
2347 {
2348 	struct virtio_net *dev = get_device(vid);
2349 	struct vhost_virtqueue *vq;
2350 	uint16_t n_pkts_cpl = 0;
2351 
2352 	if (unlikely(!dev))
2353 		return 0;
2354 
2355 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
2356 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2357 		VHOST_DATA_LOG(dev->ifname, ERR,
2358 			"%s: invalid virtqueue idx %d.",
2359 			__func__, queue_id);
2360 		return 0;
2361 	}
2362 
2363 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2364 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2365 		VHOST_DATA_LOG(dev->ifname, ERR,
2366 			"%s: invalid channel %d:%u.",
2367 			__func__, dma_id, vchan_id);
2368 		return 0;
2369 	}
2370 
2371 	vq = dev->virtqueue[queue_id];
2372 
2373 	if (rte_rwlock_read_trylock(&vq->access_lock)) {
2374 		VHOST_DATA_LOG(dev->ifname, DEBUG,
2375 			"%s: virtqueue %u is busy.",
2376 			__func__, queue_id);
2377 		return 0;
2378 	}
2379 
2380 	if (unlikely(!vq->async)) {
2381 		VHOST_DATA_LOG(dev->ifname, ERR,
2382 			"%s: async not registered for virtqueue %d.",
2383 			__func__, queue_id);
2384 		goto out;
2385 	}
2386 
2387 	n_pkts_cpl = vhost_poll_enqueue_completed(dev, vq, pkts, count, dma_id, vchan_id);
2388 
2389 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2390 	vq->stats.inflight_completed += n_pkts_cpl;
2391 
2392 out:
2393 	rte_rwlock_read_unlock(&vq->access_lock);
2394 
2395 	return n_pkts_cpl;
2396 }
2397 
2398 uint16_t
2399 rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
2400 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2401 		uint16_t vchan_id)
2402 {
2403 	struct virtio_net *dev = get_device(vid);
2404 	struct vhost_virtqueue *vq;
2405 	uint16_t n_pkts_cpl = 0;
2406 
2407 	if (!dev)
2408 		return 0;
2409 
2410 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
2411 	if (unlikely(queue_id >= dev->nr_vring)) {
2412 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid virtqueue idx %d.",
2413 			__func__, queue_id);
2414 		return 0;
2415 	}
2416 
2417 	if (unlikely(dma_id < 0 || dma_id >= RTE_DMADEV_DEFAULT_MAX)) {
2418 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid dma id %d.",
2419 			__func__, dma_id);
2420 		return 0;
2421 	}
2422 
2423 	vq = dev->virtqueue[queue_id];
2424 
2425 	vq_assert_lock(dev, vq);
2426 
2427 	if (unlikely(!vq->async)) {
2428 		VHOST_DATA_LOG(dev->ifname, ERR,
2429 			"%s: async not registered for virtqueue %d.",
2430 			__func__, queue_id);
2431 		return 0;
2432 	}
2433 
2434 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2435 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2436 		VHOST_DATA_LOG(dev->ifname, ERR,
2437 			"%s: invalid channel %d:%u.",
2438 			__func__, dma_id, vchan_id);
2439 		return 0;
2440 	}
2441 
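	/* Even queue indexes are enqueue (guest Rx) queues, odd ones are dequeue (guest Tx). */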
2442 	if ((queue_id & 1) == 0)
2443 		n_pkts_cpl = vhost_poll_enqueue_completed(dev, vq, pkts, count,
2444 			dma_id, vchan_id);
2445 	else
2446 		n_pkts_cpl = async_poll_dequeue_completed(dev, vq, pkts, count,
2447 			dma_id, vchan_id, dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS);
2448 
2449 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2450 	vq->stats.inflight_completed += n_pkts_cpl;
2451 
2452 	return n_pkts_cpl;
2453 }
2454 
2455 uint16_t
2456 rte_vhost_clear_queue(int vid, uint16_t queue_id, struct rte_mbuf **pkts,
2457 		uint16_t count, int16_t dma_id, uint16_t vchan_id)
2458 {
2459 	struct virtio_net *dev = get_device(vid);
2460 	struct vhost_virtqueue *vq;
2461 	uint16_t n_pkts_cpl = 0;
2462 
2463 	if (!dev)
2464 		return 0;
2465 
2466 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
2467 	if (unlikely(queue_id >= dev->nr_vring)) {
2468 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid virtqueue idx %u.",
2469 			__func__, queue_id);
2470 		return 0;
2471 	}
2472 
2473 	if (unlikely(dma_id < 0 || dma_id >= RTE_DMADEV_DEFAULT_MAX)) {
2474 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid dma id %d.",
2475 			__func__, dma_id);
2476 		return 0;
2477 	}
2478 
2479 	vq = dev->virtqueue[queue_id];
2480 
2481 	if (rte_rwlock_read_trylock(&vq->access_lock)) {
2482 		VHOST_DATA_LOG(dev->ifname, DEBUG, "%s: virtqueue %u is busy.",
2483 			__func__, queue_id);
2484 		return 0;
2485 	}
2486 
2487 	if (unlikely(!vq->async)) {
2488 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: async not registered for queue id %u.",
2489 			__func__, queue_id);
2490 		goto out_access_unlock;
2491 	}
2492 
2493 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2494 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2495 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid channel %d:%u.",
2496 			__func__, dma_id, vchan_id);
2497 		goto out_access_unlock;
2498 	}
2499 
2500 	if ((queue_id & 1) == 0)
2501 		n_pkts_cpl = vhost_poll_enqueue_completed(dev, vq, pkts, count,
2502 			dma_id, vchan_id);
2503 	else
2504 		n_pkts_cpl = async_poll_dequeue_completed(dev, vq, pkts, count,
2505 			dma_id, vchan_id, dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS);
2506 
2507 	vhost_queue_stats_update(dev, vq, pkts, n_pkts_cpl);
2508 	vq->stats.inflight_completed += n_pkts_cpl;
2509 
2510 out_access_unlock:
2511 	rte_rwlock_read_unlock(&vq->access_lock);
2512 
2513 	return n_pkts_cpl;
2514 }
2515 
2516 static __rte_always_inline uint32_t
2517 virtio_dev_rx_async_submit(struct virtio_net *dev, struct vhost_virtqueue *vq,
2518 	struct rte_mbuf **pkts, uint32_t count, int16_t dma_id, uint16_t vchan_id)
2519 {
2520 	uint32_t nb_tx = 0;
2521 
2522 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
2523 
2524 	if (unlikely(!dma_copy_track[dma_id].vchans ||
2525 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
2526 		VHOST_DATA_LOG(dev->ifname, ERR,
2527 			"%s: invalid channel %d:%u.",
2528 			 __func__, dma_id, vchan_id);
2529 		return 0;
2530 	}
2531 
2532 	rte_rwlock_write_lock(&vq->access_lock);
2533 
2534 	if (unlikely(!vq->enabled || !vq->async))
2535 		goto out_access_unlock;
2536 
2537 	vhost_user_iotlb_rd_lock(vq);
2538 
2539 	if (unlikely(!vq->access_ok)) {
2540 		vhost_user_iotlb_rd_unlock(vq);
2541 		rte_rwlock_write_unlock(&vq->access_lock);
2542 
2543 		virtio_dev_vring_translate(dev, vq);
2544 		goto out_no_unlock;
2545 	}
2546 
2547 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
2548 	if (count == 0)
2549 		goto out;
2550 
2551 	if (vq_is_packed(dev))
2552 		nb_tx = virtio_dev_rx_async_submit_packed(dev, vq, pkts, count,
2553 			dma_id, vchan_id);
2554 	else
2555 		nb_tx = virtio_dev_rx_async_submit_split(dev, vq, pkts, count,
2556 			dma_id, vchan_id);
2557 
2558 	vq->stats.inflight_submitted += nb_tx;
2559 
2560 out:
2561 	vhost_user_iotlb_rd_unlock(vq);
2562 
2563 out_access_unlock:
2564 	rte_rwlock_write_unlock(&vq->access_lock);
2565 
2566 out_no_unlock:
2567 	return nb_tx;
2568 }
2569 
2570 uint16_t
2571 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
2572 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
2573 		uint16_t vchan_id)
2574 {
2575 	struct virtio_net *dev = get_device(vid);
2576 
2577 	if (!dev)
2578 		return 0;
2579 
2580 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
2581 		VHOST_DATA_LOG(dev->ifname, ERR,
2582 			"%s: built-in vhost net backend is disabled.",
2583 			__func__);
2584 		return 0;
2585 	}
2586 
2587 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2588 		VHOST_DATA_LOG(dev->ifname, ERR,
2589 			"%s: invalid virtqueue idx %d.",
2590 			__func__, queue_id);
2591 		return 0;
2592 	}
2593 
2594 	return virtio_dev_rx_async_submit(dev, dev->virtqueue[queue_id], pkts, count,
2595 		dma_id, vchan_id);
2596 }
2597 
2598 static inline bool
2599 virtio_net_with_host_offload(struct virtio_net *dev)
2600 {
2601 	if (dev->features &
2602 			((1ULL << VIRTIO_NET_F_CSUM) |
2603 			 (1ULL << VIRTIO_NET_F_HOST_ECN) |
2604 			 (1ULL << VIRTIO_NET_F_HOST_TSO4) |
2605 			 (1ULL << VIRTIO_NET_F_HOST_TSO6) |
2606 			 (1ULL << VIRTIO_NET_F_HOST_UFO)))
2607 		return true;
2608 
2609 	return false;
2610 }
2611 
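/*
 * Parse the Ethernet (and optional VLAN), L3 and L4 headers of mbuf m,
 * setting l2_len/l3_len and the Tx IPv4/IPv6 offload flags and returning
 * the L4 protocol. On truncated or unsupported headers, the offload state
 * is cleared and -EINVAL is returned.
 */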
2612 static int
2613 parse_headers(struct rte_mbuf *m, uint8_t *l4_proto)
2614 {
2615 	struct rte_ipv4_hdr *ipv4_hdr;
2616 	struct rte_ipv6_hdr *ipv6_hdr;
2617 	struct rte_ether_hdr *eth_hdr;
2618 	uint16_t ethertype;
2619 	uint16_t data_len = rte_pktmbuf_data_len(m);
2620 
2621 	if (data_len < sizeof(struct rte_ether_hdr))
2622 		return -EINVAL;
2623 
2624 	eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
2625 
2626 	m->l2_len = sizeof(struct rte_ether_hdr);
2627 	ethertype = rte_be_to_cpu_16(eth_hdr->ether_type);
2628 
2629 	if (ethertype == RTE_ETHER_TYPE_VLAN) {
2630 		if (data_len < sizeof(struct rte_ether_hdr) +
2631 				sizeof(struct rte_vlan_hdr))
2632 			goto error;
2633 
2634 		struct rte_vlan_hdr *vlan_hdr =
2635 			(struct rte_vlan_hdr *)(eth_hdr + 1);
2636 
2637 		m->l2_len += sizeof(struct rte_vlan_hdr);
2638 		ethertype = rte_be_to_cpu_16(vlan_hdr->eth_proto);
2639 	}
2640 
2641 	switch (ethertype) {
2642 	case RTE_ETHER_TYPE_IPV4:
2643 		if (data_len < m->l2_len + sizeof(struct rte_ipv4_hdr))
2644 			goto error;
2645 		ipv4_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
2646 				m->l2_len);
2647 		m->l3_len = rte_ipv4_hdr_len(ipv4_hdr);
2648 		if (data_len < m->l2_len + m->l3_len)
2649 			goto error;
2650 		m->ol_flags |= RTE_MBUF_F_TX_IPV4;
2651 		*l4_proto = ipv4_hdr->next_proto_id;
2652 		break;
2653 	case RTE_ETHER_TYPE_IPV6:
2654 		if (data_len < m->l2_len + sizeof(struct rte_ipv6_hdr))
2655 			goto error;
2656 		ipv6_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv6_hdr *,
2657 				m->l2_len);
2658 		m->l3_len = sizeof(struct rte_ipv6_hdr);
2659 		m->ol_flags |= RTE_MBUF_F_TX_IPV6;
2660 		*l4_proto = ipv6_hdr->proto;
2661 		break;
2662 	default:
2663 		/* a valid L3 header is needed for further L4 parsing */
2664 		goto error;
2665 	}
2666 
2667 	/* both CSUM and GSO need a valid L4 header */
2668 	switch (*l4_proto) {
2669 	case IPPROTO_TCP:
2670 		if (data_len < m->l2_len + m->l3_len +
2671 				sizeof(struct rte_tcp_hdr))
2672 			goto error;
2673 		break;
2674 	case IPPROTO_UDP:
2675 		if (data_len < m->l2_len + m->l3_len +
2676 				sizeof(struct rte_udp_hdr))
2677 			goto error;
2678 		break;
2679 	case IPPROTO_SCTP:
2680 		if (data_len < m->l2_len + m->l3_len +
2681 				sizeof(struct rte_sctp_hdr))
2682 			goto error;
2683 		break;
2684 	default:
2685 		goto error;
2686 	}
2687 
2688 	return 0;
2689 
2690 error:
2691 	m->l2_len = 0;
2692 	m->l3_len = 0;
2693 	m->ol_flags = 0;
2694 	return -EINVAL;
2695 }
2696 
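/*
 * Translate the virtio-net header checksum and GSO requests into the
 * legacy RTE_MBUF_F_TX_* offload flags on the dequeued mbuf.
 */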
2697 static __rte_always_inline void
2698 vhost_dequeue_offload_legacy(struct virtio_net *dev, struct virtio_net_hdr *hdr,
2699 		struct rte_mbuf *m)
2700 {
2701 	uint8_t l4_proto = 0;
2702 	struct rte_tcp_hdr *tcp_hdr = NULL;
2703 	uint16_t tcp_len;
2704 	uint16_t data_len = rte_pktmbuf_data_len(m);
2705 
2706 	if (parse_headers(m, &l4_proto) < 0)
2707 		return;
2708 
2709 	if (hdr->flags == VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2710 		if (hdr->csum_start == (m->l2_len + m->l3_len)) {
2711 			switch (hdr->csum_offset) {
2712 			case (offsetof(struct rte_tcp_hdr, cksum)):
2713 				if (l4_proto != IPPROTO_TCP)
2714 					goto error;
2715 				m->ol_flags |= RTE_MBUF_F_TX_TCP_CKSUM;
2716 				break;
2717 			case (offsetof(struct rte_udp_hdr, dgram_cksum)):
2718 				if (l4_proto != IPPROTO_UDP)
2719 					goto error;
2720 				m->ol_flags |= RTE_MBUF_F_TX_UDP_CKSUM;
2721 				break;
2722 			case (offsetof(struct rte_sctp_hdr, cksum)):
2723 				if (l4_proto != IPPROTO_SCTP)
2724 					goto error;
2725 				m->ol_flags |= RTE_MBUF_F_TX_SCTP_CKSUM;
2726 				break;
2727 			default:
2728 				goto error;
2729 			}
2730 		} else {
2731 			goto error;
2732 		}
2733 	}
2734 
2735 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2736 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2737 		case VIRTIO_NET_HDR_GSO_TCPV4:
2738 		case VIRTIO_NET_HDR_GSO_TCPV6:
2739 			if (l4_proto != IPPROTO_TCP)
2740 				goto error;
2741 			tcp_hdr = rte_pktmbuf_mtod_offset(m,
2742 					struct rte_tcp_hdr *,
2743 					m->l2_len + m->l3_len);
2744 			tcp_len = (tcp_hdr->data_off & 0xf0) >> 2;
2745 			if (data_len < m->l2_len + m->l3_len + tcp_len)
2746 				goto error;
2747 			m->ol_flags |= RTE_MBUF_F_TX_TCP_SEG;
2748 			m->tso_segsz = hdr->gso_size;
2749 			m->l4_len = tcp_len;
2750 			break;
2751 		case VIRTIO_NET_HDR_GSO_UDP:
2752 			if (l4_proto != IPPROTO_UDP)
2753 				goto error;
2754 			m->ol_flags |= RTE_MBUF_F_TX_UDP_SEG;
2755 			m->tso_segsz = hdr->gso_size;
2756 			m->l4_len = sizeof(struct rte_udp_hdr);
2757 			break;
2758 		default:
2759 			VHOST_DATA_LOG(dev->ifname, WARNING,
2760 				"unsupported gso type %u.",
2761 				hdr->gso_type);
2762 			goto error;
2763 		}
2764 	}
2765 	return;
2766 
2767 error:
2768 	m->l2_len = 0;
2769 	m->l3_len = 0;
2770 	m->ol_flags = 0;
2771 }
2772 
2773 static __rte_always_inline void
2774 vhost_dequeue_offload(struct virtio_net *dev, struct virtio_net_hdr *hdr,
2775 		struct rte_mbuf *m, bool legacy_ol_flags)
2776 {
2777 	struct rte_net_hdr_lens hdr_lens;
2778 	int l4_supported = 0;
2779 	uint32_t ptype;
2780 
2781 	if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
2782 		return;
2783 
2784 	if (legacy_ol_flags) {
2785 		vhost_dequeue_offload_legacy(dev, hdr, m);
2786 		return;
2787 	}
2788 
2789 	m->ol_flags |= RTE_MBUF_F_RX_IP_CKSUM_UNKNOWN;
2790 
2791 	ptype = rte_net_get_ptype(m, &hdr_lens, RTE_PTYPE_ALL_MASK);
2792 	m->packet_type = ptype;
2793 	if ((ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_TCP ||
2794 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_UDP ||
2795 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_SCTP)
2796 		l4_supported = 1;
2797 
2798 	/* According to Virtio 1.1 spec, the device only needs to look at
2799 	 * VIRTIO_NET_HDR_F_NEEDS_CSUM in the packet transmission path.
2800 	 * This differs from the incoming packet processing path, where the
2801 	 * driver can rely on the VIRTIO_NET_HDR_F_DATA_VALID flag set by the
2802 	 * device.
2803 	 *
2804 	 * 5.1.6.2.1 Driver Requirements: Packet Transmission
2805 	 * The driver MUST NOT set the VIRTIO_NET_HDR_F_DATA_VALID and
2806 	 * VIRTIO_NET_HDR_F_RSC_INFO bits in flags.
2807 	 *
2808 	 * 5.1.6.2.2 Device Requirements: Packet Transmission
2809 	 * The device MUST ignore flag bits that it does not recognize.
2810 	 */
2811 	if (hdr->flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2812 		uint32_t hdrlen;
2813 
2814 		hdrlen = hdr_lens.l2_len + hdr_lens.l3_len + hdr_lens.l4_len;
2815 		if (hdr->csum_start <= hdrlen && l4_supported != 0) {
2816 			m->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_NONE;
2817 		} else {
2818 			/* Unknown proto or tunnel, do sw cksum. We can assume
2819 			 * the cksum field is in the first segment since the
2820 			 * buffers we provided to the host are large enough.
2821 			 * In case of SCTP, this will be wrong since it's a CRC
2822 			 * but there's nothing we can do.
2823 			 */
2824 			uint16_t csum = 0, off;
2825 
2826 			if (rte_raw_cksum_mbuf(m, hdr->csum_start,
2827 					rte_pktmbuf_pkt_len(m) - hdr->csum_start, &csum) < 0)
2828 				return;
2829 			if (likely(csum != 0xffff))
2830 				csum = ~csum;
2831 			off = hdr->csum_offset + hdr->csum_start;
2832 			if (rte_pktmbuf_data_len(m) >= off + 1)
2833 				*rte_pktmbuf_mtod_offset(m, uint16_t *, off) = csum;
2834 		}
2835 	}
2836 
2837 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2838 		if (hdr->gso_size == 0)
2839 			return;
2840 
2841 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2842 		case VIRTIO_NET_HDR_GSO_TCPV4:
2843 		case VIRTIO_NET_HDR_GSO_TCPV6:
2844 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_TCP)
2845 				break;
2846 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2847 			m->tso_segsz = hdr->gso_size;
2848 			break;
2849 		case VIRTIO_NET_HDR_GSO_UDP:
2850 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_UDP)
2851 				break;
2852 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2853 			m->tso_segsz = hdr->gso_size;
2854 			break;
2855 		default:
2856 			break;
2857 		}
2858 	}
2859 }
2860 
2861 static __rte_noinline void
2862 copy_vnet_hdr_from_desc(struct virtio_net_hdr *hdr,
2863 		struct buf_vector *buf_vec)
2864 {
2865 	uint64_t len;
2866 	uint64_t remain = sizeof(struct virtio_net_hdr);
2867 	uint64_t src;
2868 	uint64_t dst = (uint64_t)(uintptr_t)hdr;
2869 
2870 	while (remain) {
2871 		len = RTE_MIN(remain, buf_vec->buf_len);
2872 		src = buf_vec->buf_addr;
2873 		rte_memcpy((void *)(uintptr_t)dst,
2874 				(void *)(uintptr_t)src, len);
2875 
2876 		remain -= len;
2877 		dst += len;
2878 		buf_vec++;
2879 	}
2880 }
2881 
2882 static __rte_always_inline int
2883 desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
2884 		  struct buf_vector *buf_vec, uint16_t nr_vec,
2885 		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
2886 		  bool legacy_ol_flags, uint16_t slot_idx, bool is_async)
2887 	__rte_shared_locks_required(&vq->access_lock)
2888 	__rte_shared_locks_required(&vq->iotlb_lock)
2889 {
2890 	uint32_t buf_avail, buf_offset, buf_len;
2891 	uint64_t buf_addr, buf_iova;
2892 	uint32_t mbuf_avail, mbuf_offset;
2893 	uint32_t hdr_remain = dev->vhost_hlen;
2894 	uint32_t cpy_len;
2895 	struct rte_mbuf *cur = m, *prev = m;
2896 	struct virtio_net_hdr tmp_hdr;
2897 	struct virtio_net_hdr *hdr = NULL;
2898 	uint16_t vec_idx;
2899 	struct vhost_async *async = vq->async;
2900 	struct async_inflight_info *pkts_info;
2901 
2902 	/*
2903 	 * The caller has already checked that the descriptor chain is larger
2904 	 * than the header size.
2905 	 */
2906 
2907 	if (virtio_net_with_host_offload(dev)) {
2908 		if (unlikely(buf_vec[0].buf_len < sizeof(struct virtio_net_hdr))) {
2909 			/*
2910 			 * No luck, the virtio-net header doesn't fit
2911 			 * in a contiguous virtual area.
2912 			 */
2913 			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
2914 			hdr = &tmp_hdr;
2915 		} else {
2916 			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_vec[0].buf_addr);
2917 		}
2918 	}
2919 
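	/* Skip the virtio-net header, which may span several descriptors. */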
2920 	for (vec_idx = 0; vec_idx < nr_vec; vec_idx++) {
2921 		if (buf_vec[vec_idx].buf_len > hdr_remain)
2922 			break;
2923 
2924 		hdr_remain -= buf_vec[vec_idx].buf_len;
2925 	}
2926 
2927 	buf_addr = buf_vec[vec_idx].buf_addr;
2928 	buf_iova = buf_vec[vec_idx].buf_iova;
2929 	buf_len = buf_vec[vec_idx].buf_len;
2930 	buf_offset = hdr_remain;
2931 	buf_avail = buf_vec[vec_idx].buf_len - hdr_remain;
2932 
2933 	PRINT_PACKET(dev,
2934 			(uintptr_t)(buf_addr + buf_offset),
2935 			(uint32_t)buf_avail, 0);
2936 
2937 	mbuf_offset = 0;
2938 	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
2939 
2940 	if (is_async) {
2941 		pkts_info = async->pkts_info;
2942 		if (async_iter_initialize(dev, async))
2943 			return -1;
2944 	}
2945 
2946 	while (1) {
2947 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
2948 
2949 		if (is_async) {
2950 			if (async_fill_seg(dev, vq, cur, mbuf_offset,
2951 					   buf_iova + buf_offset, cpy_len, false) < 0)
2952 				goto error;
2953 		} else if (likely(hdr && cur == m)) {
2954 			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, mbuf_offset),
2955 				(void *)((uintptr_t)(buf_addr + buf_offset)),
2956 				cpy_len);
2957 		} else {
2958 			sync_fill_seg(dev, vq, cur, mbuf_offset,
2959 				      buf_addr + buf_offset,
2960 				      buf_iova + buf_offset, cpy_len, false);
2961 		}
2962 
2963 		mbuf_avail  -= cpy_len;
2964 		mbuf_offset += cpy_len;
2965 		buf_avail -= cpy_len;
2966 		buf_offset += cpy_len;
2967 
2968 		/* This buffer has reached its end, get the next one */
2969 		if (buf_avail == 0) {
2970 			if (++vec_idx >= nr_vec)
2971 				break;
2972 
2973 			buf_addr = buf_vec[vec_idx].buf_addr;
2974 			buf_iova = buf_vec[vec_idx].buf_iova;
2975 			buf_len = buf_vec[vec_idx].buf_len;
2976 
2977 			buf_offset = 0;
2978 			buf_avail  = buf_len;
2979 
2980 			PRINT_PACKET(dev, (uintptr_t)buf_addr,
2981 					(uint32_t)buf_avail, 0);
2982 		}
2983 
2984 		/*
2985 		 * This mbuf has reached its end, get a new one
2986 		 * to hold more data.
2987 		 */
2988 		if (mbuf_avail == 0) {
2989 			cur = rte_pktmbuf_alloc(mbuf_pool);
2990 			if (unlikely(cur == NULL)) {
2991 				vq->stats.mbuf_alloc_failed++;
2992 				VHOST_DATA_LOG(dev->ifname, ERR,
2993 					"failed to allocate memory for mbuf.");
2994 				goto error;
2995 			}
2996 
2997 			prev->next = cur;
2998 			prev->data_len = mbuf_offset;
2999 			m->nb_segs += 1;
3000 			m->pkt_len += mbuf_offset;
3001 			prev = cur;
3002 
3003 			mbuf_offset = 0;
3004 			mbuf_avail  = cur->buf_len - RTE_PKTMBUF_HEADROOM;
3005 		}
3006 	}
3007 
3008 	prev->data_len = mbuf_offset;
3009 	m->pkt_len    += mbuf_offset;
3010 
3011 	if (is_async) {
3012 		async_iter_finalize(async);
3013 		if (hdr)
3014 			pkts_info[slot_idx].nethdr = *hdr;
3015 	} else if (hdr) {
3016 		vhost_dequeue_offload(dev, hdr, m, legacy_ol_flags);
3017 	}
3018 
3019 	return 0;
3020 error:
3021 	if (is_async)
3022 		async_iter_cancel(async);
3023 
3024 	return -1;
3025 }
3026 
3027 static void
3028 virtio_dev_extbuf_free(void *addr __rte_unused, void *opaque)
3029 {
3030 	rte_free(opaque);
3031 }
3032 
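/*
 * Allocate an external data buffer large enough for the packet, headroom
 * and shared info area, and attach it to the given mbuf.
 */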
3033 static int
3034 virtio_dev_extbuf_alloc(struct virtio_net *dev, struct rte_mbuf *pkt, uint32_t size)
3035 {
3036 	struct rte_mbuf_ext_shared_info *shinfo = NULL;
3037 	uint32_t total_len = RTE_PKTMBUF_HEADROOM + size;
3038 	uint16_t buf_len;
3039 	rte_iova_t iova;
3040 	void *buf;
3041 
3042 	total_len += sizeof(*shinfo) + sizeof(uintptr_t);
3043 	total_len = RTE_ALIGN_CEIL(total_len, sizeof(uintptr_t));
3044 
3045 	if (unlikely(total_len > UINT16_MAX))
3046 		return -ENOSPC;
3047 
3048 	buf_len = total_len;
3049 	buf = rte_malloc(NULL, buf_len, RTE_CACHE_LINE_SIZE);
3050 	if (unlikely(buf == NULL))
3051 		return -ENOMEM;
3052 
3053 	/* Initialize shinfo */
3054 	shinfo = rte_pktmbuf_ext_shinfo_init_helper(buf, &buf_len,
3055 						virtio_dev_extbuf_free, buf);
3056 	if (unlikely(shinfo == NULL)) {
3057 		rte_free(buf);
3058 		VHOST_DATA_LOG(dev->ifname, ERR, "failed to init shinfo");
3059 		return -1;
3060 	}
3061 
3062 	iova = rte_malloc_virt2iova(buf);
3063 	rte_pktmbuf_attach_extbuf(pkt, buf, iova, buf_len, shinfo);
3064 	rte_pktmbuf_reset_headroom(pkt);
3065 
3066 	return 0;
3067 }
3068 
3069 /*
3070  * Prepare a pktmbuf able to receive data_len bytes, attaching an external
 * buffer or relying on mbuf chaining when the tailroom is too small.
3071  */
3072 static __rte_always_inline int
3073 virtio_dev_pktmbuf_prep(struct virtio_net *dev, struct rte_mbuf *pkt,
3074 			 uint32_t data_len)
3075 {
3076 	if (rte_pktmbuf_tailroom(pkt) >= data_len)
3077 		return 0;
3078 
3079 	/* attach an external buffer if supported */
3080 	if (dev->extbuf && !virtio_dev_extbuf_alloc(dev, pkt, data_len))
3081 		return 0;
3082 
3083 	/* check if chained buffers are allowed */
3084 	if (!dev->linearbuf)
3085 		return 0;
3086 
3087 	return -1;
3088 }
3089 
3090 __rte_always_inline
3091 static uint16_t
3092 virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
3093 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
3094 	bool legacy_ol_flags)
3095 	__rte_shared_locks_required(&vq->access_lock)
3096 	__rte_shared_locks_required(&vq->iotlb_lock)
3097 {
3098 	uint16_t i;
3099 	uint16_t avail_entries;
3100 	static bool allocerr_warned;
3101 
3102 	/*
3103 	 * The ordering between avail index and
3104 	 * desc reads needs to be enforced.
3105 	 */
3106 	avail_entries = rte_atomic_load_explicit((unsigned short __rte_atomic *)&vq->avail->idx,
3107 		rte_memory_order_acquire) - vq->last_avail_idx;
3108 	if (avail_entries == 0)
3109 		return 0;
3110 
3111 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
3112 
3113 	VHOST_DATA_LOG(dev->ifname, DEBUG, "%s", __func__);
3114 
3115 	count = RTE_MIN(count, MAX_PKT_BURST);
3116 	count = RTE_MIN(count, avail_entries);
3117 	VHOST_DATA_LOG(dev->ifname, DEBUG, "about to dequeue %u buffers", count);
3118 
3119 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count)) {
3120 		vq->stats.mbuf_alloc_failed += count;
3121 		return 0;
3122 	}
3123 
3124 	for (i = 0; i < count; i++) {
3125 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
3126 		uint16_t head_idx;
3127 		uint32_t buf_len;
3128 		uint16_t nr_vec = 0;
3129 		int err;
3130 
3131 		if (unlikely(fill_vec_buf_split(dev, vq,
3132 						vq->last_avail_idx + i,
3133 						&nr_vec, buf_vec,
3134 						&head_idx, &buf_len,
3135 						VHOST_ACCESS_RO) < 0))
3136 			break;
3137 
3138 		update_shadow_used_ring_split(vq, head_idx, 0);
3139 
3140 		if (unlikely(buf_len <= dev->vhost_hlen))
3141 			break;
3142 
3143 		buf_len -= dev->vhost_hlen;
3144 
3145 		err = virtio_dev_pktmbuf_prep(dev, pkts[i], buf_len);
3146 		if (unlikely(err)) {
3147 			/*
3148 			 * mbuf preparation fails for jumbo packets when external
3149 			 * buffer allocation is not allowed and a linear buffer
3150 			 * is required. Drop this packet.
3151 			 */
3152 			if (!allocerr_warned) {
3153 				VHOST_DATA_LOG(dev->ifname, ERR,
3154 					"failed mbuf alloc of size %d from %s.",
3155 					buf_len, mbuf_pool->name);
3156 				allocerr_warned = true;
3157 			}
3158 			break;
3159 		}
3160 
3161 		err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
3162 				   mbuf_pool, legacy_ol_flags, 0, false);
3163 		if (unlikely(err)) {
3164 			if (!allocerr_warned) {
3165 				VHOST_DATA_LOG(dev->ifname, ERR, "failed to copy desc to mbuf.");
3166 				allocerr_warned = true;
3167 			}
3168 			break;
3169 		}
3170 	}
3171 
3172 	if (unlikely(count != i))
3173 		rte_pktmbuf_free_bulk(&pkts[i], count - i);
3174 
3175 	if (likely(vq->shadow_used_idx)) {
3176 		vq->last_avail_idx += vq->shadow_used_idx;
3177 		vhost_virtqueue_reconnect_log_split(vq);
3178 		do_data_copy_dequeue(vq);
3179 		flush_shadow_used_ring_split(dev, vq);
3180 		vhost_vring_call_split(dev, vq);
3181 	}
3182 
3183 	return i;
3184 }
3185 
3186 __rte_noinline
3187 static uint16_t
3188 virtio_dev_tx_split_legacy(struct virtio_net *dev,
3189 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3190 	struct rte_mbuf **pkts, uint16_t count)
3191 	__rte_shared_locks_required(&vq->access_lock)
3192 	__rte_shared_locks_required(&vq->iotlb_lock)
3193 {
3194 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, true);
3195 }
3196 
3197 __rte_noinline
3198 static uint16_t
3199 virtio_dev_tx_split_compliant(struct virtio_net *dev,
3200 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3201 	struct rte_mbuf **pkts, uint16_t count)
3202 	__rte_shared_locks_required(&vq->access_lock)
3203 	__rte_shared_locks_required(&vq->iotlb_lock)
3204 {
3205 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, false);
3206 }
3207 
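/*
 * Check that the next PACKED_BATCH_SIZE descriptors are ready for dequeue,
 * translate their addresses and size the mbufs so the whole batch can be
 * processed without further per-descriptor checks.
 */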
3208 static __rte_always_inline int
3209 vhost_reserve_avail_batch_packed(struct virtio_net *dev,
3210 				 struct vhost_virtqueue *vq,
3211 				 struct rte_mbuf **pkts,
3212 				 uint16_t avail_idx,
3213 				 uintptr_t *desc_addrs,
3214 				 uint16_t *ids)
3215 	__rte_shared_locks_required(&vq->iotlb_lock)
3216 {
3217 	bool wrap = vq->avail_wrap_counter;
3218 	struct vring_packed_desc *descs = vq->desc_packed;
3219 	uint64_t lens[PACKED_BATCH_SIZE];
3220 	uint64_t buf_lens[PACKED_BATCH_SIZE];
3221 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3222 	uint16_t flags, i;
3223 
3224 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
3225 		return -1;
3226 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
3227 		return -1;
3228 
3229 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3230 		flags = descs[avail_idx + i].flags;
3231 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
3232 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
3233 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
3234 			return -1;
3235 	}
3236 
3237 	rte_atomic_thread_fence(rte_memory_order_acquire);
3238 
3239 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3240 		lens[i] = descs[avail_idx + i].len;
3241 
3242 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3243 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
3244 						  descs[avail_idx + i].addr,
3245 						  &lens[i], VHOST_ACCESS_RW);
3246 	}
3247 
3248 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3249 		if (unlikely(!desc_addrs[i]))
3250 			return -1;
3251 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
3252 			return -1;
3253 	}
3254 
3255 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3256 		if (virtio_dev_pktmbuf_prep(dev, pkts[i], lens[i]))
3257 			goto err;
3258 	}
3259 
3260 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3261 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
3262 
3263 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3264 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
3265 			goto err;
3266 	}
3267 
3268 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3269 		pkts[i]->pkt_len = lens[i] - buf_offset;
3270 		pkts[i]->data_len = pkts[i]->pkt_len;
3271 		ids[i] = descs[avail_idx + i].id;
3272 	}
3273 
3274 	return 0;
3275 
3276 err:
3277 	return -1;
3278 }
3279 
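/*
 * Batch availability check for the async dequeue path: like
 * vhost_reserve_avail_batch_packed(), but it keeps the guest physical
 * addresses for DMA and verifies the DMA vChannel can take the whole batch.
 */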
3280 static __rte_always_inline int
3281 vhost_async_tx_batch_packed_check(struct virtio_net *dev,
3282 				 struct vhost_virtqueue *vq,
3283 				 struct rte_mbuf **pkts,
3284 				 uint16_t avail_idx,
3285 				 uintptr_t *desc_addrs,
3286 				 uint64_t *lens,
3287 				 uint16_t *ids,
3288 				 int16_t dma_id,
3289 				 uint16_t vchan_id)
3290 {
3291 	bool wrap = vq->avail_wrap_counter;
3292 	struct vring_packed_desc *descs = vq->desc_packed;
3293 	uint64_t buf_lens[PACKED_BATCH_SIZE];
3294 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3295 	uint16_t flags, i;
3296 
3297 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
3298 		return -1;
3299 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
3300 		return -1;
3301 
3302 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3303 		flags = descs[avail_idx + i].flags;
3304 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
3305 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
3306 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
3307 			return -1;
3308 	}
3309 
3310 	rte_atomic_thread_fence(rte_memory_order_acquire);
3311 
3312 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3313 		lens[i] = descs[avail_idx + i].len;
3314 
3315 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3316 		desc_addrs[i] = descs[avail_idx + i].addr;
3317 	}
3318 
3319 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3320 		if (unlikely(!desc_addrs[i]))
3321 			return -1;
3322 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
3323 			return -1;
3324 	}
3325 
3326 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3327 		if (virtio_dev_pktmbuf_prep(dev, pkts[i], lens[i]))
3328 			goto err;
3329 	}
3330 
3331 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3332 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
3333 
3334 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3335 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
3336 			goto err;
3337 	}
3338 
3339 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3340 		pkts[i]->pkt_len = lens[i] - buf_offset;
3341 		pkts[i]->data_len = pkts[i]->pkt_len;
3342 		ids[i] = descs[avail_idx + i].id;
3343 	}
3344 
3345 	if (rte_dma_burst_capacity(dma_id, vchan_id) < PACKED_BATCH_SIZE)
3346 		return -1;
3347 
3348 	return 0;
3349 
3350 err:
3351 	return -1;
3352 }
3353 
3354 static __rte_always_inline int
3355 virtio_dev_tx_batch_packed(struct virtio_net *dev,
3356 			   struct vhost_virtqueue *vq,
3357 			   struct rte_mbuf **pkts,
3358 			   bool legacy_ol_flags)
3359 	__rte_shared_locks_required(&vq->iotlb_lock)
3360 {
3361 	uint16_t avail_idx = vq->last_avail_idx;
3362 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
3363 	struct virtio_net_hdr *hdr;
3364 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
3365 	uint16_t ids[PACKED_BATCH_SIZE];
3366 	uint16_t i;
3367 
3368 	if (vhost_reserve_avail_batch_packed(dev, vq, pkts, avail_idx,
3369 					     desc_addrs, ids))
3370 		return -1;
3371 
3372 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3373 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
3374 
3375 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
3376 		rte_memcpy(rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
3377 			   (void *)(uintptr_t)(desc_addrs[i] + buf_offset),
3378 			   pkts[i]->pkt_len);
3379 
3380 	if (virtio_net_with_host_offload(dev)) {
3381 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
3382 			hdr = (struct virtio_net_hdr *)(desc_addrs[i]);
3383 			vhost_dequeue_offload(dev, hdr, pkts[i], legacy_ol_flags);
3384 		}
3385 	}
3386 
3387 	if (virtio_net_is_inorder(dev))
3388 		vhost_shadow_dequeue_batch_packed_inorder(vq,
3389 			ids[PACKED_BATCH_SIZE - 1]);
3390 	else
3391 		vhost_shadow_dequeue_batch_packed(dev, vq, ids);
3392 
3393 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
3394 
3395 	return 0;
3396 }
3397 
3398 static __rte_always_inline int
3399 vhost_dequeue_single_packed(struct virtio_net *dev,
3400 			    struct vhost_virtqueue *vq,
3401 			    struct rte_mempool *mbuf_pool,
3402 			    struct rte_mbuf *pkts,
3403 			    uint16_t *buf_id,
3404 			    uint16_t *desc_count,
3405 			    bool legacy_ol_flags)
3406 	__rte_shared_locks_required(&vq->access_lock)
3407 	__rte_shared_locks_required(&vq->iotlb_lock)
3408 {
3409 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
3410 	uint32_t buf_len;
3411 	uint16_t nr_vec = 0;
3412 	int err;
3413 	static bool allocerr_warned;
3414 
3415 	if (unlikely(fill_vec_buf_packed(dev, vq,
3416 					 vq->last_avail_idx, desc_count,
3417 					 buf_vec, &nr_vec,
3418 					 buf_id, &buf_len,
3419 					 VHOST_ACCESS_RO) < 0))
3420 		return -1;
3421 
3422 	if (unlikely(buf_len <= dev->vhost_hlen))
3423 		return -1;
3424 
3425 	buf_len -= dev->vhost_hlen;
3426 
3427 	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
3428 		if (!allocerr_warned) {
3429 			VHOST_DATA_LOG(dev->ifname, ERR,
3430 				"failed mbuf alloc of size %d from %s.",
3431 				buf_len, mbuf_pool->name);
3432 			allocerr_warned = true;
3433 		}
3434 		return -1;
3435 	}
3436 
3437 	err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts,
3438 			   mbuf_pool, legacy_ol_flags, 0, false);
3439 	if (unlikely(err)) {
3440 		if (!allocerr_warned) {
3441 			VHOST_DATA_LOG(dev->ifname, ERR, "failed to copy desc to mbuf.");
3442 			allocerr_warned = true;
3443 		}
3444 		return -1;
3445 	}
3446 
3447 	return 0;
3448 }
3449 
3450 static __rte_always_inline int
3451 virtio_dev_tx_single_packed(struct virtio_net *dev,
3452 			    struct vhost_virtqueue *vq,
3453 			    struct rte_mempool *mbuf_pool,
3454 			    struct rte_mbuf *pkts,
3455 			    bool legacy_ol_flags)
3456 	__rte_shared_locks_required(&vq->access_lock)
3457 	__rte_shared_locks_required(&vq->iotlb_lock)
3458 {
3459 
3460 	uint16_t buf_id, desc_count = 0;
3461 	int ret;
3462 
3463 	ret = vhost_dequeue_single_packed(dev, vq, mbuf_pool, pkts, &buf_id,
3464 					&desc_count, legacy_ol_flags);
3465 
3466 	if (likely(desc_count > 0)) {
3467 		if (virtio_net_is_inorder(dev))
3468 			vhost_shadow_dequeue_single_packed_inorder(vq, buf_id,
3469 								   desc_count);
3470 		else
3471 			vhost_shadow_dequeue_single_packed(vq, buf_id,
3472 					desc_count);
3473 
3474 		vq_inc_last_avail_packed(vq, desc_count);
3475 	}
3476 
3477 	return ret;
3478 }
3479 
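/*
 * Count how many complete descriptor chains are available in the packed
 * ring, up to max_nb_avail_entries.
 */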
3480 static __rte_always_inline uint16_t
3481 get_nb_avail_entries_packed(const struct vhost_virtqueue *__rte_restrict vq,
3482 			    uint16_t max_nb_avail_entries)
3483 {
3484 	const struct vring_packed_desc *descs = vq->desc_packed;
3485 	bool avail_wrap = vq->avail_wrap_counter;
3486 	uint16_t avail_idx = vq->last_avail_idx;
3487 	uint16_t nb_avail_entries = 0;
3488 	uint16_t flags;
3489 
3490 	while (nb_avail_entries < max_nb_avail_entries) {
3491 		flags = descs[avail_idx].flags;
3492 
3493 		if ((avail_wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
3494 		    (avail_wrap == !!(flags & VRING_DESC_F_USED)))
3495 			return nb_avail_entries;
3496 
3497 		if (!(flags & VRING_DESC_F_NEXT))
3498 			++nb_avail_entries;
3499 
3500 		if (unlikely(++avail_idx >= vq->size)) {
3501 			avail_idx -= vq->size;
3502 			avail_wrap = !avail_wrap;
3503 		}
3504 	}
3505 
3506 	return nb_avail_entries;
3507 }
3508 
3509 __rte_always_inline
3510 static uint16_t
3511 virtio_dev_tx_packed(struct virtio_net *dev,
3512 		     struct vhost_virtqueue *__rte_restrict vq,
3513 		     struct rte_mempool *mbuf_pool,
3514 		     struct rte_mbuf **__rte_restrict pkts,
3515 		     uint32_t count,
3516 		     bool legacy_ol_flags)
3517 	__rte_shared_locks_required(&vq->access_lock)
3518 	__rte_shared_locks_required(&vq->iotlb_lock)
3519 {
3520 	uint32_t pkt_idx = 0;
3521 
3522 	count = get_nb_avail_entries_packed(vq, count);
3523 	if (count == 0)
3524 		return 0;
3525 
3526 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count)) {
3527 		vq->stats.mbuf_alloc_failed += count;
3528 		return 0;
3529 	}
3530 
3531 	do {
3532 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
3533 
3534 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
3535 			if (!virtio_dev_tx_batch_packed(dev, vq,
3536 							&pkts[pkt_idx],
3537 							legacy_ol_flags)) {
3538 				pkt_idx += PACKED_BATCH_SIZE;
3539 				continue;
3540 			}
3541 		}
3542 
3543 		if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
3544 						pkts[pkt_idx],
3545 						legacy_ol_flags))
3546 			break;
3547 		pkt_idx++;
3548 	} while (pkt_idx < count);
3549 
3550 	if (pkt_idx != count)
3551 		rte_pktmbuf_free_bulk(&pkts[pkt_idx], count - pkt_idx);
3552 
3553 	if (vq->shadow_used_idx) {
3554 		do_data_copy_dequeue(vq);
3555 
3556 		vhost_flush_dequeue_shadow_packed(dev, vq);
3557 		vhost_vring_call_packed(dev, vq);
3558 	}
3559 
3560 	return pkt_idx;
3561 }
3562 
3563 __rte_noinline
3564 static uint16_t
3565 virtio_dev_tx_packed_legacy(struct virtio_net *dev,
3566 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
3567 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
3568 	__rte_shared_locks_required(&vq->access_lock)
3569 	__rte_shared_locks_required(&vq->iotlb_lock)
3570 {
3571 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, true);
3572 }
3573 
3574 __rte_noinline
3575 static uint16_t
3576 virtio_dev_tx_packed_compliant(struct virtio_net *dev,
3577 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
3578 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
3579 	__rte_shared_locks_required(&vq->access_lock)
3580 	__rte_shared_locks_required(&vq->iotlb_lock)
3581 {
3582 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, false);
3583 }
3584 
3585 uint16_t
3586 rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
3587 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
3588 {
3589 	struct virtio_net *dev;
3590 	struct rte_mbuf *rarp_mbuf = NULL;
3591 	struct vhost_virtqueue *vq;
3592 	int16_t success = 1;
3593 
3594 	dev = get_device(vid);
3595 	if (!dev)
3596 		return 0;
3597 
3598 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
3599 		VHOST_DATA_LOG(dev->ifname, ERR,
3600 			"%s: built-in vhost net backend is disabled.",
3601 			__func__);
3602 		return 0;
3603 	}
3604 
3605 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
3606 		VHOST_DATA_LOG(dev->ifname, ERR,
3607 			"%s: invalid virtqueue idx %d.",
3608 			__func__, queue_id);
3609 		return 0;
3610 	}
3611 
3612 	vq = dev->virtqueue[queue_id];
3613 
3614 	if (unlikely(rte_rwlock_read_trylock(&vq->access_lock) != 0))
3615 		return 0;
3616 
3617 	if (unlikely(!vq->enabled)) {
3618 		count = 0;
3619 		goto out_access_unlock;
3620 	}
3621 
3622 	vhost_user_iotlb_rd_lock(vq);
3623 
3624 	if (unlikely(!vq->access_ok)) {
3625 		vhost_user_iotlb_rd_unlock(vq);
3626 		rte_rwlock_read_unlock(&vq->access_lock);
3627 
3628 		virtio_dev_vring_translate(dev, vq);
3629 		goto out_no_unlock;
3630 	}
3631 
3632 	/*
3633 	 * Construct a RARP broadcast packet and inject it into the "pkts"
3634 	 * array, to make it look like the guest actually sent such a packet.
3635 	 *
3636 	 * Check user_send_rarp() for more information.
3637 	 *
3638 	 * broadcast_rarp shares a cacheline in the virtio_net structure
3639 	 * with some fields that are accessed during enqueue, and
3640 	 * rte_atomic_compare_exchange_strong_explicit causes a write if the compare
3641 	 * and exchange is performed. This could result in false sharing between
3642 	 * enqueue and dequeue.
3643 	 *
3644 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
3645 	 * and only performing compare and exchange if the read indicates it
3646 	 * is likely to be set.
3647 	 */
3648 	if (unlikely(rte_atomic_load_explicit(&dev->broadcast_rarp, rte_memory_order_acquire) &&
3649 			rte_atomic_compare_exchange_strong_explicit(&dev->broadcast_rarp,
3650 			&success, 0, rte_memory_order_release, rte_memory_order_relaxed))) {
3651 
3652 		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
3653 		if (rarp_mbuf == NULL) {
3654 			VHOST_DATA_LOG(dev->ifname, ERR, "failed to make RARP packet.");
3655 			count = 0;
3656 			goto out;
3657 		}
3658 		/*
3659 		 * Inject it at the head of the "pkts" array, so that the
3660 		 * switch's MAC learning table gets updated first.
3661 		 */
3662 		pkts[0] = rarp_mbuf;
3663 		vhost_queue_stats_update(dev, vq, pkts, 1);
3664 		pkts++;
3665 		count -= 1;
3666 	}
3667 
3668 	if (vq_is_packed(dev)) {
3669 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
3670 			count = virtio_dev_tx_packed_legacy(dev, vq, mbuf_pool, pkts, count);
3671 		else
3672 			count = virtio_dev_tx_packed_compliant(dev, vq, mbuf_pool, pkts, count);
3673 	} else {
3674 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
3675 			count = virtio_dev_tx_split_legacy(dev, vq, mbuf_pool, pkts, count);
3676 		else
3677 			count = virtio_dev_tx_split_compliant(dev, vq, mbuf_pool, pkts, count);
3678 	}
3679 
3680 	vhost_queue_stats_update(dev, vq, pkts, count);
3681 
3682 out:
3683 	vhost_user_iotlb_rd_unlock(vq);
3684 
3685 out_access_unlock:
3686 	rte_rwlock_read_unlock(&vq->access_lock);
3687 
3688 	if (unlikely(rarp_mbuf != NULL))
3689 		count += 1;
3690 
3691 out_no_unlock:
3692 	return count;
3693 }
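
/*
 * Illustrative usage sketch: a minimal polling loop draining one guest TX ring
 * with rte_vhost_dequeue_burst(). The "vid", "mp" and "handle_pkt" names are
 * hypothetical placeholders; queue index 1 is used because dequeue operates on
 * odd (guest TX) virtqueue indexes.
 *
 *	struct rte_mbuf *bufs[32];
 *	uint16_t i, nb;
 *
 *	nb = rte_vhost_dequeue_burst(vid, 1, mp, bufs, 32);
 *	for (i = 0; i < nb; i++)
 *		handle_pkt(bufs[i]);
 *
 * Each returned mbuf is owned by the caller, e.g. forward it or free it with
 * rte_pktmbuf_free().
 */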
3694 
3695 static __rte_always_inline uint16_t
3696 async_poll_dequeue_completed(struct virtio_net *dev, struct vhost_virtqueue *vq,
3697 		struct rte_mbuf **pkts, uint16_t count, int16_t dma_id,
3698 		uint16_t vchan_id, bool legacy_ol_flags)
3699 	__rte_shared_locks_required(&vq->access_lock)
3700 {
3701 	uint16_t start_idx, from, i;
3702 	uint16_t nr_cpl_pkts = 0;
3703 	struct async_inflight_info *pkts_info = vq->async->pkts_info;
3704 
3705 	vhost_async_dma_check_completed(dev, dma_id, vchan_id, VHOST_DMA_MAX_COPY_COMPLETE);
3706 
3707 	start_idx = async_get_first_inflight_pkt_idx(vq);
3708 
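	/*
	 * Count the contiguous run of in-flight packets whose DMA copies have
	 * completed, clearing each completion flag as it is consumed.
	 */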
3709 	from = start_idx;
3710 	while (vq->async->pkts_cmpl_flag[from] && count--) {
3711 		vq->async->pkts_cmpl_flag[from] = false;
3712 		from = (from + 1) % vq->size;
3713 		nr_cpl_pkts++;
3714 	}
3715 
3716 	if (nr_cpl_pkts == 0)
3717 		return 0;
3718 
3719 	for (i = 0; i < nr_cpl_pkts; i++) {
3720 		from = (start_idx + i) % vq->size;
3721 		pkts[i] = pkts_info[from].mbuf;
3722 
3723 		if (virtio_net_with_host_offload(dev))
3724 			vhost_dequeue_offload(dev, &pkts_info[from].nethdr, pkts[i],
3725 					      legacy_ol_flags);
3726 	}
3727 
3728 	/* write back completed descs to used ring and update used idx */
3729 	if (vq_is_packed(dev)) {
3730 		write_back_completed_descs_packed(vq, nr_cpl_pkts);
3731 		vhost_vring_call_packed(dev, vq);
3732 	} else {
3733 		write_back_completed_descs_split(vq, nr_cpl_pkts);
3734 		rte_atomic_fetch_add_explicit((unsigned short __rte_atomic *)&vq->used->idx,
3735 			nr_cpl_pkts, rte_memory_order_release);
3736 		vhost_vring_call_split(dev, vq);
3737 	}
3738 	vq->async->pkts_inflight_n -= nr_cpl_pkts;
3739 
3740 	return nr_cpl_pkts;
3741 }
3742 
3743 static __rte_always_inline uint16_t
3744 virtio_dev_tx_async_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
3745 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
3746 		int16_t dma_id, uint16_t vchan_id, bool legacy_ol_flags)
3747 	__rte_shared_locks_required(&vq->access_lock)
3748 	__rte_shared_locks_required(&vq->iotlb_lock)
3749 {
3750 	static bool allocerr_warned;
3751 	bool dropped = false;
3752 	uint16_t avail_entries;
3753 	uint16_t pkt_idx, slot_idx = 0;
3754 	uint16_t nr_done_pkts = 0;
3755 	uint16_t pkt_err = 0;
3756 	uint16_t n_xfer;
3757 	struct vhost_async *async = vq->async;
3758 	struct async_inflight_info *pkts_info = async->pkts_info;
3759 	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
3760 	uint16_t pkts_size = count;
3761 
3762 	/**
3763 	 * The ordering between avail index and
3764 	 * desc reads needs to be enforced.
3765 	 */
3766 	avail_entries = rte_atomic_load_explicit((unsigned short __rte_atomic *)&vq->avail->idx,
3767 		rte_memory_order_acquire) - vq->last_avail_idx;
3768 	if (avail_entries == 0)
3769 		goto out;
3770 
3771 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
3772 
3773 	async_iter_reset(async);
3774 
3775 	count = RTE_MIN(count, MAX_PKT_BURST);
3776 	count = RTE_MIN(count, avail_entries);
3777 	VHOST_DATA_LOG(dev->ifname, DEBUG, "about to dequeue %u buffers", count);
3778 
3779 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count)) {
3780 		vq->stats.mbuf_alloc_failed += count;
3781 		goto out;
3782 	}
3783 
3784 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
3785 		uint16_t head_idx = 0;
3786 		uint16_t nr_vec = 0;
3787 		uint16_t to;
3788 		uint32_t buf_len;
3789 		int err;
3790 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
3791 		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
3792 
3793 		if (unlikely(fill_vec_buf_split(dev, vq, vq->last_avail_idx,
3794 						&nr_vec, buf_vec,
3795 						&head_idx, &buf_len,
3796 						VHOST_ACCESS_RO) < 0)) {
3797 			dropped = true;
3798 			break;
3799 		}
3800 
3801 		if (unlikely(buf_len <= dev->vhost_hlen)) {
3802 			dropped = true;
3803 			break;
3804 		}
3805 
3806 		buf_len -= dev->vhost_hlen;
3807 
3808 		err = virtio_dev_pktmbuf_prep(dev, pkt, buf_len);
3809 		if (unlikely(err)) {
3810 			/**
3811 			 * mbuf allocation fails for jumbo packets when external
3812 			 * buffer allocation is not allowed and linear buffer
3813 			 * is required. Drop this packet.
3814 			 */
3815 			if (!allocerr_warned) {
3816 				VHOST_DATA_LOG(dev->ifname, ERR,
3817 					"%s: Failed mbuf alloc of size %u from %s.",
3818 					__func__, buf_len, mbuf_pool->name);
3819 				allocerr_warned = true;
3820 			}
3821 			dropped = true;
3822 			slot_idx--;
3823 			break;
3824 		}
3825 
3826 		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
3827 		err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkt, mbuf_pool,
3828 					legacy_ol_flags, slot_idx, true);
3829 		if (unlikely(err)) {
3830 			if (!allocerr_warned) {
3831 				VHOST_DATA_LOG(dev->ifname, ERR,
3832 					"%s: Failed to offload copies to async channel.",
3833 					__func__);
3834 				allocerr_warned = true;
3835 			}
3836 			dropped = true;
3837 			slot_idx--;
3838 			break;
3839 		}
3840 
3841 		pkts_info[slot_idx].mbuf = pkt;
3842 
3843 		/* store used descs */
3844 		to = async->desc_idx_split & (vq->size - 1);
3845 		async->descs_split[to].id = head_idx;
3846 		async->descs_split[to].len = 0;
3847 		async->desc_idx_split++;
3848 
3849 		vq->last_avail_idx++;
3850 		vhost_virtqueue_reconnect_log_split(vq);
3851 	}
3852 
3853 	if (unlikely(dropped))
3854 		rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
3855 
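	/*
	 * Submit the gathered copy jobs to the DMA vchannel; n_xfer is the
	 * number of packets whose copies were fully enqueued.
	 */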
3856 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
3857 					  async->iov_iter, pkt_idx);
3858 
3859 	async->pkts_inflight_n += n_xfer;
3860 
3861 	pkt_err = pkt_idx - n_xfer;
3862 	if (unlikely(pkt_err)) {
3863 		VHOST_DATA_LOG(dev->ifname, DEBUG, "%s: failed to transfer data.",
3864 			__func__);
3865 
3866 		pkt_idx = n_xfer;
3867 		/* recover available ring */
3868 		vq->last_avail_idx -= pkt_err;
3869 		vhost_virtqueue_reconnect_log_split(vq);
3870 
3871 		/**
3872 		 * Recover the async channel copy-related structures and free the
3873 		 * mbufs of the failed packets.
3874 		 */
3875 		async->desc_idx_split -= pkt_err;
3876 		while (pkt_err-- > 0) {
3877 			rte_pktmbuf_free(pkts_info[slot_idx & (vq->size - 1)].mbuf);
3878 			slot_idx--;
3879 		}
3880 	}
3881 
3882 	async->pkts_idx += pkt_idx;
3883 	if (async->pkts_idx >= vq->size)
3884 		async->pkts_idx -= vq->size;
3885 
3886 out:
3887 	/* The DMA device may serve other queues, so unconditionally poll for completed copies. */
3888 	nr_done_pkts = async_poll_dequeue_completed(dev, vq, pkts, pkts_size,
3889 							dma_id, vchan_id, legacy_ol_flags);
3890 
3891 	return nr_done_pkts;
3892 }
3893 
3894 __rte_noinline
3895 static uint16_t
3896 virtio_dev_tx_async_split_legacy(struct virtio_net *dev,
3897 		struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3898 		struct rte_mbuf **pkts, uint16_t count,
3899 		int16_t dma_id, uint16_t vchan_id)
3900 	__rte_shared_locks_required(&vq->access_lock)
3901 	__rte_shared_locks_required(&vq->iotlb_lock)
3902 {
3903 	return virtio_dev_tx_async_split(dev, vq, mbuf_pool,
3904 				pkts, count, dma_id, vchan_id, true);
3905 }
3906 
3907 __rte_noinline
3908 static uint16_t
3909 virtio_dev_tx_async_split_compliant(struct virtio_net *dev,
3910 		struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
3911 		struct rte_mbuf **pkts, uint16_t count,
3912 		int16_t dma_id, uint16_t vchan_id)
3913 	__rte_shared_locks_required(&vq->access_lock)
3914 	__rte_shared_locks_required(&vq->iotlb_lock)
3915 {
3916 	return virtio_dev_tx_async_split(dev, vq, mbuf_pool,
3917 				pkts, count, dma_id, vchan_id, false);
3918 }
3919 
3920 static __rte_always_inline void
3921 vhost_async_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
3922 				uint16_t buf_id, uint16_t count)
3923 	__rte_shared_locks_required(&vq->access_lock)
3924 {
3925 	struct vhost_async *async = vq->async;
3926 	uint16_t idx = async->buffer_idx_packed;
3927 
3928 	async->buffers_packed[idx].id = buf_id;
3929 	async->buffers_packed[idx].len = 0;
3930 	async->buffers_packed[idx].count = count;
3931 
3932 	async->buffer_idx_packed++;
3933 	if (async->buffer_idx_packed >= vq->size)
3934 		async->buffer_idx_packed -= vq->size;
3935 
3936 }
3937 
3938 static __rte_always_inline int
3939 virtio_dev_tx_async_single_packed(struct virtio_net *dev,
3940 			struct vhost_virtqueue *vq,
3941 			struct rte_mempool *mbuf_pool,
3942 			struct rte_mbuf *pkts,
3943 			uint16_t slot_idx,
3944 			bool legacy_ol_flags)
3945 	__rte_shared_locks_required(&vq->access_lock)
3946 	__rte_shared_locks_required(&vq->iotlb_lock)
3947 {
3948 	int err;
3949 	uint16_t buf_id, desc_count = 0;
3950 	uint16_t nr_vec = 0;
3951 	uint32_t buf_len;
3952 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
3953 	struct vhost_async *async = vq->async;
3954 	struct async_inflight_info *pkts_info = async->pkts_info;
3955 	static bool allocerr_warned;
3956 
3957 	if (unlikely(fill_vec_buf_packed(dev, vq, vq->last_avail_idx, &desc_count,
3958 					 buf_vec, &nr_vec, &buf_id, &buf_len,
3959 					 VHOST_ACCESS_RO) < 0))
3960 		return -1;
3961 
3962 	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
3963 		if (!allocerr_warned) {
3964 			VHOST_DATA_LOG(dev->ifname, ERR, "Failed mbuf alloc of size %u from %s.",
3965 				buf_len, mbuf_pool->name);
3966 
3967 			allocerr_warned = true;
3968 		}
3969 		return -1;
3970 	}
3971 
3972 	err = desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts, mbuf_pool,
3973 		legacy_ol_flags, slot_idx, true);
3974 	if (unlikely(err)) {
3975 		rte_pktmbuf_free(pkts);
3976 		if (!allocerr_warned) {
3977 			VHOST_DATA_LOG(dev->ifname, ERR, "Failed to copy desc to mbuf.");
3978 			allocerr_warned = true;
3979 		}
3980 		return -1;
3981 	}
3982 
3983 	pkts_info[slot_idx].descs = desc_count;
3984 
3985 	/* update async shadow packed ring */
3986 	vhost_async_shadow_dequeue_single_packed(vq, buf_id, desc_count);
3987 
3988 	vq_inc_last_avail_packed(vq, desc_count);
3989 
3990 	return err;
3991 }
3992 
3993 static __rte_always_inline int
3994 virtio_dev_tx_async_packed_batch(struct virtio_net *dev,
3995 			   struct vhost_virtqueue *vq,
3996 			   struct rte_mbuf **pkts, uint16_t slot_idx,
3997 			   uint16_t dma_id, uint16_t vchan_id)
3998 	__rte_shared_locks_required(&vq->access_lock)
3999 	__rte_shared_locks_required(&vq->iotlb_lock)
4000 {
4001 	uint16_t avail_idx = vq->last_avail_idx;
4002 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
4003 	struct vhost_async *async = vq->async;
4004 	struct async_inflight_info *pkts_info = async->pkts_info;
4005 	struct virtio_net_hdr *hdr;
4006 	uint32_t mbuf_offset = 0;
4007 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
4008 	uint64_t desc_vva;
4009 	uint64_t lens[PACKED_BATCH_SIZE];
4010 	void *host_iova[PACKED_BATCH_SIZE];
4011 	uint64_t mapped_len[PACKED_BATCH_SIZE];
4012 	uint16_t ids[PACKED_BATCH_SIZE];
4013 	uint16_t i;
4014 
4015 	if (vhost_async_tx_batch_packed_check(dev, vq, pkts, avail_idx,
4016 					     desc_addrs, lens, ids, dma_id, vchan_id))
4017 		return -1;
4018 
4019 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
4020 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
4021 
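	/*
	 * Resolve each packet buffer, past the virtio-net header, to a
	 * host IOVA the DMA engine will copy from.
	 */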
4022 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
4023 		host_iova[i] = (void *)(uintptr_t)gpa_to_first_hpa(dev,
4024 			desc_addrs[i] + buf_offset, pkts[i]->pkt_len, &mapped_len[i]);
4025 	}
4026 
4027 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
4028 		async_iter_initialize(dev, async);
4029 		async_iter_add_iovec(dev, async,
4030 				host_iova[i],
4031 				(void *)(uintptr_t)rte_pktmbuf_iova_offset(pkts[i], mbuf_offset),
4032 				mapped_len[i]);
4033 		async->iter_idx++;
4034 	}
4035 
4036 	if (virtio_net_with_host_offload(dev)) {
4037 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
4038 			desc_vva = vhost_iova_to_vva(dev, vq, desc_addrs[i],
4039 						&lens[i], VHOST_ACCESS_RO);
4040 			hdr = (struct virtio_net_hdr *)(uintptr_t)desc_vva;
4041 			pkts_info[slot_idx + i].nethdr = *hdr;
4042 		}
4043 	}
4044 
4045 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
4046 
4047 	vhost_async_shadow_dequeue_packed_batch(vq, ids);
4048 
4049 	return 0;
4050 }
4051 
4052 static __rte_always_inline uint16_t
4053 virtio_dev_tx_async_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
4054 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
4055 		uint16_t count, uint16_t dma_id, uint16_t vchan_id, bool legacy_ol_flags)
4056 	__rte_shared_locks_required(&vq->access_lock)
4057 	__rte_shared_locks_required(&vq->iotlb_lock)
4058 {
4059 	uint32_t pkt_idx = 0;
4060 	uint16_t slot_idx = 0;
4061 	uint16_t nr_done_pkts = 0;
4062 	uint16_t pkt_err = 0;
4063 	uint32_t n_xfer;
4064 	uint16_t i;
4065 	struct vhost_async *async = vq->async;
4066 	struct async_inflight_info *pkts_info = async->pkts_info;
4067 	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
4068 
4069 	VHOST_DATA_LOG(dev->ifname, DEBUG, "(%d) about to dequeue %u buffers", dev->vid, count);
4070 
4071 	async_iter_reset(async);
4072 
4073 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count)) {
4074 		vq->stats.mbuf_alloc_failed += count;
4075 		goto out;
4076 	}
4077 
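	/*
	 * Prefer the batched dequeue path while a full batch of descriptors
	 * remains; fall back to the single-packet path otherwise, or when the
	 * batch check fails.
	 */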
4078 	do {
4079 		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
4080 
4081 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
4082 
4083 		slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
4084 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
4085 			if (!virtio_dev_tx_async_packed_batch(dev, vq, &pkts_prealloc[pkt_idx],
4086 						slot_idx, dma_id, vchan_id)) {
4087 				for (i = 0; i < PACKED_BATCH_SIZE; i++) {
4088 					slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
4089 					pkts_info[slot_idx].descs = 1;
4090 					pkts_info[slot_idx].nr_buffers = 1;
4091 					pkts_info[slot_idx].mbuf = pkts_prealloc[pkt_idx];
4092 					pkt_idx++;
4093 				}
4094 				continue;
4095 			}
4096 		}
4097 
4098 		if (unlikely(virtio_dev_tx_async_single_packed(dev, vq, mbuf_pool, pkt,
4099 				slot_idx, legacy_ol_flags))) {
4100 			rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
4101 
4102 			if (slot_idx == 0)
4103 				slot_idx = vq->size - 1;
4104 			else
4105 				slot_idx--;
4106 
4107 			break;
4108 		}
4109 
4110 		pkts_info[slot_idx].mbuf = pkt;
4111 		pkt_idx++;
4112 	} while (pkt_idx < count);
4113 
4114 	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
4115 					async->iov_iter, pkt_idx);
4116 
4117 	async->pkts_inflight_n += n_xfer;
4118 
4119 	pkt_err = pkt_idx - n_xfer;
4120 
4121 	if (unlikely(pkt_err)) {
4122 		uint16_t descs_err = 0;
4123 
4124 		pkt_idx -= pkt_err;
4125 
4126 		/**
4127 		 * Recover the DMA-copy related structures and free the mbufs of the failed packets.
4128 		 */
4129 		if (async->buffer_idx_packed >= pkt_err)
4130 			async->buffer_idx_packed -= pkt_err;
4131 		else
4132 			async->buffer_idx_packed += vq->size - pkt_err;
4133 
4134 		while (pkt_err-- > 0) {
4135 			rte_pktmbuf_free(pkts_info[slot_idx].mbuf);
4136 			descs_err += pkts_info[slot_idx].descs;
4137 
4138 			if (slot_idx == 0)
4139 				slot_idx = vq->size - 1;
4140 			else
4141 				slot_idx--;
4142 		}
4143 
4144 		/* recover available ring */
4145 		if (vq->last_avail_idx >= descs_err) {
4146 			vq->last_avail_idx -= descs_err;
4147 		} else {
4148 			vq->last_avail_idx += vq->size - descs_err;
4149 			vq->avail_wrap_counter ^= 1;
4150 		}
4151 		vhost_virtqueue_reconnect_log_packed(vq);
4152 	}
4153 
4154 	async->pkts_idx += pkt_idx;
4155 	if (async->pkts_idx >= vq->size)
4156 		async->pkts_idx -= vq->size;
4157 
4158 out:
4159 	nr_done_pkts = async_poll_dequeue_completed(dev, vq, pkts, count,
4160 					dma_id, vchan_id, legacy_ol_flags);
4161 
4162 	return nr_done_pkts;
4163 }
4164 
4165 __rte_noinline
4166 static uint16_t
4167 virtio_dev_tx_async_packed_legacy(struct virtio_net *dev, struct vhost_virtqueue *vq,
4168 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
4169 		uint16_t count, uint16_t dma_id, uint16_t vchan_id)
4170 	__rte_shared_locks_required(&vq->access_lock)
4171 	__rte_shared_locks_required(&vq->iotlb_lock)
4172 {
4173 	return virtio_dev_tx_async_packed(dev, vq, mbuf_pool,
4174 				pkts, count, dma_id, vchan_id, true);
4175 }
4176 
4177 __rte_noinline
4178 static uint16_t
4179 virtio_dev_tx_async_packed_compliant(struct virtio_net *dev, struct vhost_virtqueue *vq,
4180 		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
4181 		uint16_t count, uint16_t dma_id, uint16_t vchan_id)
4182 	__rte_shared_locks_required(&vq->access_lock)
4183 	__rte_shared_locks_required(&vq->iotlb_lock)
4184 {
4185 	return virtio_dev_tx_async_packed(dev, vq, mbuf_pool,
4186 				pkts, count, dma_id, vchan_id, false);
4187 }
4188 
4189 uint16_t
4190 rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
4191 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
4192 	int *nr_inflight, int16_t dma_id, uint16_t vchan_id)
4193 {
4194 	struct virtio_net *dev;
4195 	struct rte_mbuf *rarp_mbuf = NULL;
4196 	struct vhost_virtqueue *vq;
4197 	int16_t success = 1;
4198 
4199 	dev = get_device(vid);
4200 	if (!dev || !nr_inflight)
4201 		return 0;
4202 
4203 	*nr_inflight = -1;
4204 
4205 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
4206 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: built-in vhost net backend is disabled.",
4207 			__func__);
4208 		return 0;
4209 	}
4210 
4211 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
4212 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid virtqueue idx %d.",
4213 			__func__, queue_id);
4214 		return 0;
4215 	}
4216 
4217 	if (unlikely(dma_id < 0 || dma_id >= RTE_DMADEV_DEFAULT_MAX)) {
4218 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid dma id %d.",
4219 			__func__, dma_id);
4220 		return 0;
4221 	}
4222 
4223 	if (unlikely(!dma_copy_track[dma_id].vchans ||
4224 				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
4225 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: invalid channel %d:%u.",
4226 			__func__, dma_id, vchan_id);
4227 		return 0;
4228 	}
4229 
4230 	vq = dev->virtqueue[queue_id];
4231 
4232 	if (unlikely(rte_rwlock_read_trylock(&vq->access_lock) != 0))
4233 		return 0;
4234 
4235 	if (unlikely(vq->enabled == 0)) {
4236 		count = 0;
4237 		goto out_access_unlock;
4238 	}
4239 
4240 	if (unlikely(!vq->async)) {
4241 		VHOST_DATA_LOG(dev->ifname, ERR, "%s: async not registered for queue id %d.",
4242 			__func__, queue_id);
4243 		count = 0;
4244 		goto out_access_unlock;
4245 	}
4246 
4247 	vhost_user_iotlb_rd_lock(vq);
4248 
4249 	if (unlikely(vq->access_ok == 0)) {
4250 		vhost_user_iotlb_rd_unlock(vq);
4251 		rte_rwlock_read_unlock(&vq->access_lock);
4252 
4253 		virtio_dev_vring_translate(dev, vq);
4254 		count = 0;
4255 		goto out_no_unlock;
4256 	}
4257 
4258 	/*
4259 	 * Construct a RARP broadcast packet and inject it into the "pkts"
4260 	 * array, so it looks like the guest actually sent such a packet.
4261 	 *
4262 	 * Check user_send_rarp() for more information.
4263 	 *
4264 	 * broadcast_rarp shares a cacheline in the virtio_net structure
4265 	 * with some fields that are accessed during enqueue, and
4266 	 * rte_atomic_compare_exchange_strong_explicit performs a write when
4267 	 * it executes the compare and exchange. This could result in false
4268 	 * sharing between enqueue and dequeue.
4269 	 *
4270 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
4271 	 * and only performing the compare and exchange if the read indicates
4272 	 * it is likely to be set.
4273 	 */
4274 	if (unlikely(rte_atomic_load_explicit(&dev->broadcast_rarp, rte_memory_order_acquire) &&
4275 			rte_atomic_compare_exchange_strong_explicit(&dev->broadcast_rarp,
4276 			&success, 0, rte_memory_order_release, rte_memory_order_relaxed))) {
4277 
4278 		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
4279 		if (rarp_mbuf == NULL) {
4280 			VHOST_DATA_LOG(dev->ifname, ERR, "failed to make RARP packet.");
4281 			count = 0;
4282 			goto out;
4283 		}
4284 		/*
4285 		 * Inject it at the head of the "pkts" array, so that the
4286 		 * switch's MAC learning table gets updated first.
4287 		 */
4288 		pkts[0] = rarp_mbuf;
4289 		vhost_queue_stats_update(dev, vq, pkts, 1);
4290 		pkts++;
4291 		count -= 1;
4292 	}
4293 
4294 	if (vq_is_packed(dev)) {
4295 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
4296 			count = virtio_dev_tx_async_packed_legacy(dev, vq, mbuf_pool,
4297 					pkts, count, dma_id, vchan_id);
4298 		else
4299 			count = virtio_dev_tx_async_packed_compliant(dev, vq, mbuf_pool,
4300 					pkts, count, dma_id, vchan_id);
4301 	} else {
4302 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
4303 			count = virtio_dev_tx_async_split_legacy(dev, vq, mbuf_pool,
4304 					pkts, count, dma_id, vchan_id);
4305 		else
4306 			count = virtio_dev_tx_async_split_compliant(dev, vq, mbuf_pool,
4307 					pkts, count, dma_id, vchan_id);
4308 	}
4309 
4310 	*nr_inflight = vq->async->pkts_inflight_n;
4311 	vhost_queue_stats_update(dev, vq, pkts, count);
4312 
4313 out:
4314 	vhost_user_iotlb_rd_unlock(vq);
4315 
4316 out_access_unlock:
4317 	rte_rwlock_read_unlock(&vq->access_lock);
4318 
4319 	if (unlikely(rarp_mbuf != NULL))
4320 		count += 1;
4321 
4322 out_no_unlock:
4323 	return count;
4324 }
4325
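/*
 * Illustrative usage sketch for the async dequeue path; "vid", "mp", "dma_id"
 * and "vchan" are hypothetical placeholders, and the DMA vchannel as well as
 * the async channel of the virtqueue are assumed to have been configured
 * beforehand.
 *
 *	struct rte_mbuf *bufs[32];
 *	int nr_inflight;
 *	uint16_t nb;
 *
 *	nb = rte_vhost_async_try_dequeue_burst(vid, 1, mp, bufs, 32,
 *			&nr_inflight, dma_id, vchan);
 *
 * Only packets whose DMA copies have completed are returned; keep polling
 * while nr_inflight is positive to collect the remaining in-flight packets.
 */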