xref: /dpdk/lib/vhost/virtio_net.c (revision 5a36ec735130f81a187ef65c4bc43c445a89568d)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2016 Intel Corporation
3  */
4 
5 #include <stdint.h>
6 #include <stdbool.h>
7 #include <linux/virtio_net.h>
8 
9 #include <rte_mbuf.h>
10 #include <rte_memcpy.h>
11 #include <rte_net.h>
12 #include <rte_ether.h>
13 #include <rte_ip.h>
14 #include <rte_vhost.h>
15 #include <rte_tcp.h>
16 #include <rte_udp.h>
17 #include <rte_sctp.h>
18 #include <rte_arp.h>
19 #include <rte_spinlock.h>
20 #include <rte_malloc.h>
21 #include <rte_vhost_async.h>
22 
23 #include "iotlb.h"
24 #include "vhost.h"
25 
26 #define MAX_BATCH_LEN 256
27 
28 #define VHOST_ASYNC_BATCH_THRESHOLD 32
29 
30 static __rte_always_inline bool
31 rxvq_is_mergeable(struct virtio_net *dev)
32 {
33 	return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
34 }
35 
36 static __rte_always_inline bool
37 virtio_net_is_inorder(struct virtio_net *dev)
38 {
39 	return dev->features & (1ULL << VIRTIO_F_IN_ORDER);
40 }
41 
42 static bool
43 is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t nr_vring)
44 {
45 	return (is_tx ^ (idx & 1)) == 0 && idx < nr_vring;
46 }
47 
48 static inline void
49 do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
50 {
51 	struct batch_copy_elem *elem = vq->batch_copy_elems;
52 	uint16_t count = vq->batch_copy_nb_elems;
53 	int i;
54 
55 	for (i = 0; i < count; i++) {
56 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
57 		vhost_log_cache_write_iova(dev, vq, elem[i].log_addr,
58 					   elem[i].len);
59 		PRINT_PACKET(dev, (uintptr_t)elem[i].dst, elem[i].len, 0);
60 	}
61 
62 	vq->batch_copy_nb_elems = 0;
63 }
64 
65 static inline void
66 do_data_copy_dequeue(struct vhost_virtqueue *vq)
67 {
68 	struct batch_copy_elem *elem = vq->batch_copy_elems;
69 	uint16_t count = vq->batch_copy_nb_elems;
70 	int i;
71 
72 	for (i = 0; i < count; i++)
73 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
74 
75 	vq->batch_copy_nb_elems = 0;
76 }
77 
78 static __rte_always_inline void
79 do_flush_shadow_used_ring_split(struct virtio_net *dev,
80 			struct vhost_virtqueue *vq,
81 			uint16_t to, uint16_t from, uint16_t size)
82 {
83 	rte_memcpy(&vq->used->ring[to],
84 			&vq->shadow_used_split[from],
85 			size * sizeof(struct vring_used_elem));
86 	vhost_log_cache_used_vring(dev, vq,
87 			offsetof(struct vring_used, ring[to]),
88 			size * sizeof(struct vring_used_elem));
89 }
90 
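/*
 * Flush the shadowed used entries to the split ring's used ring, copying
 * in two chunks when the update wraps past the end of the ring, then
 * publish the new used index with release semantics.
 */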
91 static __rte_always_inline void
92 flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
93 {
94 	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
95 
96 	if (used_idx + vq->shadow_used_idx <= vq->size) {
97 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
98 					  vq->shadow_used_idx);
99 	} else {
100 		uint16_t size;
101 
102 		/* update the used ring interval [used_idx, vq->size) */
103 		size = vq->size - used_idx;
104 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
105 
106 		/* update the remaining used ring interval [0, left_size) */
107 		do_flush_shadow_used_ring_split(dev, vq, 0, size,
108 					  vq->shadow_used_idx - size);
109 	}
110 	vq->last_used_idx += vq->shadow_used_idx;
111 
112 	vhost_log_cache_sync(dev, vq);
113 
114 	__atomic_add_fetch(&vq->used->idx, vq->shadow_used_idx,
115 			   __ATOMIC_RELEASE);
116 	vq->shadow_used_idx = 0;
117 	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
118 		sizeof(vq->used->idx));
119 }
120 
121 static __rte_always_inline void
122 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
123 			 uint16_t desc_idx, uint32_t len)
124 {
125 	uint16_t i = vq->shadow_used_idx++;
126 
127 	vq->shadow_used_split[i].id  = desc_idx;
128 	vq->shadow_used_split[i].len = len;
129 }
130 
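/*
 * Flush the shadowed enqueue entries to the packed descriptor ring:
 * store the ids and lengths first, then the flags after a release
 * fence, writing the head descriptor's flags last.
 */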
131 static __rte_always_inline void
132 vhost_flush_enqueue_shadow_packed(struct virtio_net *dev,
133 				  struct vhost_virtqueue *vq)
134 {
135 	int i;
136 	uint16_t used_idx = vq->last_used_idx;
137 	uint16_t head_idx = vq->last_used_idx;
138 	uint16_t head_flags = 0;
139 
140 	/* Split loop in two to save memory barriers */
141 	for (i = 0; i < vq->shadow_used_idx; i++) {
142 		vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
143 		vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
144 
145 		used_idx += vq->shadow_used_packed[i].count;
146 		if (used_idx >= vq->size)
147 			used_idx -= vq->size;
148 	}
149 
150 	/* The ordering for storing desc flags needs to be enforced. */
151 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
152 
153 	for (i = 0; i < vq->shadow_used_idx; i++) {
154 		uint16_t flags;
155 
156 		if (vq->shadow_used_packed[i].len)
157 			flags = VRING_DESC_F_WRITE;
158 		else
159 			flags = 0;
160 
161 		if (vq->used_wrap_counter) {
162 			flags |= VRING_DESC_F_USED;
163 			flags |= VRING_DESC_F_AVAIL;
164 		} else {
165 			flags &= ~VRING_DESC_F_USED;
166 			flags &= ~VRING_DESC_F_AVAIL;
167 		}
168 
169 		if (i > 0) {
170 			vq->desc_packed[vq->last_used_idx].flags = flags;
171 
172 			vhost_log_cache_used_vring(dev, vq,
173 					vq->last_used_idx *
174 					sizeof(struct vring_packed_desc),
175 					sizeof(struct vring_packed_desc));
176 		} else {
177 			head_idx = vq->last_used_idx;
178 			head_flags = flags;
179 		}
180 
181 		vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
182 	}
183 
184 	vq->desc_packed[head_idx].flags = head_flags;
185 
186 	vhost_log_cache_used_vring(dev, vq,
187 				head_idx *
188 				sizeof(struct vring_packed_desc),
189 				sizeof(struct vring_packed_desc));
190 
191 	vq->shadow_used_idx = 0;
192 	vhost_log_cache_sync(dev, vq);
193 }
194 
195 static __rte_always_inline void
196 vhost_flush_dequeue_shadow_packed(struct virtio_net *dev,
197 				  struct vhost_virtqueue *vq)
198 {
199 	struct vring_used_elem_packed *used_elem = &vq->shadow_used_packed[0];
200 
201 	vq->desc_packed[vq->shadow_last_used_idx].id = used_elem->id;
202 	/* The desc flags field is the synchronization point for the virtio packed vring */
203 	__atomic_store_n(&vq->desc_packed[vq->shadow_last_used_idx].flags,
204 			 used_elem->flags, __ATOMIC_RELEASE);
205 
206 	vhost_log_cache_used_vring(dev, vq, vq->shadow_last_used_idx *
207 				   sizeof(struct vring_packed_desc),
208 				   sizeof(struct vring_packed_desc));
209 	vq->shadow_used_idx = 0;
210 	vhost_log_cache_sync(dev, vq);
211 }
212 
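/*
 * Mark a full batch of enqueued descriptors as used: flush any pending
 * shadow entries first, then store the batch ids and lengths and, after
 * a release fence, the used flags.
 */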
213 static __rte_always_inline void
214 vhost_flush_enqueue_batch_packed(struct virtio_net *dev,
215 				 struct vhost_virtqueue *vq,
216 				 uint64_t *lens,
217 				 uint16_t *ids)
218 {
219 	uint16_t i;
220 	uint16_t flags;
221 	uint16_t last_used_idx;
222 	struct vring_packed_desc *desc_base;
223 
224 	if (vq->shadow_used_idx) {
225 		do_data_copy_enqueue(dev, vq);
226 		vhost_flush_enqueue_shadow_packed(dev, vq);
227 	}
228 
229 	last_used_idx = vq->last_used_idx;
230 	desc_base = &vq->desc_packed[last_used_idx];
231 
232 	flags = PACKED_DESC_ENQUEUE_USED_FLAG(vq->used_wrap_counter);
233 
234 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
235 		desc_base[i].id = ids[i];
236 		desc_base[i].len = lens[i];
237 	}
238 
239 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
240 
241 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
242 		desc_base[i].flags = flags;
243 	}
244 
245 	vhost_log_cache_used_vring(dev, vq, last_used_idx *
246 				   sizeof(struct vring_packed_desc),
247 				   sizeof(struct vring_packed_desc) *
248 				   PACKED_BATCH_SIZE);
249 	vhost_log_cache_sync(dev, vq);
250 
251 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
252 }
253 
254 static __rte_always_inline void
255 vhost_shadow_dequeue_batch_packed_inorder(struct vhost_virtqueue *vq,
256 					  uint16_t id)
257 {
258 	vq->shadow_used_packed[0].id = id;
259 
260 	if (!vq->shadow_used_idx) {
261 		vq->shadow_last_used_idx = vq->last_used_idx;
262 		vq->shadow_used_packed[0].flags =
263 			PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
264 		vq->shadow_used_packed[0].len = 0;
265 		vq->shadow_used_packed[0].count = 1;
266 		vq->shadow_used_idx++;
267 	}
268 
269 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
270 }
271 
272 static __rte_always_inline void
273 vhost_shadow_dequeue_batch_packed(struct virtio_net *dev,
274 				  struct vhost_virtqueue *vq,
275 				  uint16_t *ids)
276 {
277 	uint16_t flags;
278 	uint16_t i;
279 	uint16_t begin;
280 
281 	flags = PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
282 
283 	if (!vq->shadow_used_idx) {
284 		vq->shadow_last_used_idx = vq->last_used_idx;
285 		vq->shadow_used_packed[0].id  = ids[0];
286 		vq->shadow_used_packed[0].len = 0;
287 		vq->shadow_used_packed[0].count = 1;
288 		vq->shadow_used_packed[0].flags = flags;
289 		vq->shadow_used_idx++;
290 		begin = 1;
291 	} else
292 		begin = 0;
293 
294 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE) {
295 		vq->desc_packed[vq->last_used_idx + i].id = ids[i];
296 		vq->desc_packed[vq->last_used_idx + i].len = 0;
297 	}
298 
299 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
300 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE)
301 		vq->desc_packed[vq->last_used_idx + i].flags = flags;
302 
303 	vhost_log_cache_used_vring(dev, vq, vq->last_used_idx *
304 				   sizeof(struct vring_packed_desc),
305 				   sizeof(struct vring_packed_desc) *
306 				   PACKED_BATCH_SIZE);
307 	vhost_log_cache_sync(dev, vq);
308 
309 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
310 }
311 
312 static __rte_always_inline void
313 vhost_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
314 				   uint16_t buf_id,
315 				   uint16_t count)
316 {
317 	uint16_t flags;
318 
319 	flags = vq->desc_packed[vq->last_used_idx].flags;
320 	if (vq->used_wrap_counter) {
321 		flags |= VRING_DESC_F_USED;
322 		flags |= VRING_DESC_F_AVAIL;
323 	} else {
324 		flags &= ~VRING_DESC_F_USED;
325 		flags &= ~VRING_DESC_F_AVAIL;
326 	}
327 
328 	if (!vq->shadow_used_idx) {
329 		vq->shadow_last_used_idx = vq->last_used_idx;
330 
331 		vq->shadow_used_packed[0].id  = buf_id;
332 		vq->shadow_used_packed[0].len = 0;
333 		vq->shadow_used_packed[0].flags = flags;
334 		vq->shadow_used_idx++;
335 	} else {
336 		vq->desc_packed[vq->last_used_idx].id = buf_id;
337 		vq->desc_packed[vq->last_used_idx].len = 0;
338 		vq->desc_packed[vq->last_used_idx].flags = flags;
339 	}
340 
341 	vq_inc_last_used_packed(vq, count);
342 }
343 
344 static __rte_always_inline void
345 vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
346 					   uint16_t buf_id,
347 					   uint16_t count)
348 {
349 	uint16_t flags;
350 
351 	vq->shadow_used_packed[0].id = buf_id;
352 
353 	flags = vq->desc_packed[vq->last_used_idx].flags;
354 	if (vq->used_wrap_counter) {
355 		flags |= VRING_DESC_F_USED;
356 		flags |= VRING_DESC_F_AVAIL;
357 	} else {
358 		flags &= ~VRING_DESC_F_USED;
359 		flags &= ~VRING_DESC_F_AVAIL;
360 	}
361 
362 	if (!vq->shadow_used_idx) {
363 		vq->shadow_last_used_idx = vq->last_used_idx;
364 		vq->shadow_used_packed[0].len = 0;
365 		vq->shadow_used_packed[0].flags = flags;
366 		vq->shadow_used_idx++;
367 	}
368 
369 	vq_inc_last_used_packed(vq, count);
370 }
371 
372 static __rte_always_inline void
373 vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
374 				   uint32_t *len,
375 				   uint16_t *id,
376 				   uint16_t *count,
377 				   uint16_t num_buffers)
378 {
379 	uint16_t i;
380 
381 	for (i = 0; i < num_buffers; i++) {
382 		/* align the enqueue shadow flush with the batch boundary */
383 		if (!vq->shadow_used_idx)
384 			vq->shadow_aligned_idx = vq->last_used_idx &
385 				PACKED_BATCH_MASK;
386 		vq->shadow_used_packed[vq->shadow_used_idx].id  = id[i];
387 		vq->shadow_used_packed[vq->shadow_used_idx].len = len[i];
388 		vq->shadow_used_packed[vq->shadow_used_idx].count = count[i];
389 		vq->shadow_aligned_idx += count[i];
390 		vq->shadow_used_idx++;
391 	}
392 }
393 
394 static __rte_always_inline void
395 vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
396 				   struct vhost_virtqueue *vq,
397 				   uint32_t *len,
398 				   uint16_t *id,
399 				   uint16_t *count,
400 				   uint16_t num_buffers)
401 {
402 	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
403 
404 	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
405 		do_data_copy_enqueue(dev, vq);
406 		vhost_flush_enqueue_shadow_packed(dev, vq);
407 	}
408 }
409 
410 /* avoid unnecessary write operations, to lessen cache issues */
411 #define ASSIGN_UNLESS_EQUAL(var, val) do {	\
412 	if ((var) != (val))			\
413 		(var) = (val);			\
414 } while (0)
415 
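/*
 * Translate the mbuf Tx offload flags into the virtio net header: fill
 * csum_start/csum_offset for L4 checksum offload, compute the IPv4
 * header checksum when requested, and set the GSO fields for TSO/UFO.
 */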
416 static __rte_always_inline void
417 virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
418 {
419 	uint64_t csum_l4 = m_buf->ol_flags & PKT_TX_L4_MASK;
420 
421 	if (m_buf->ol_flags & PKT_TX_TCP_SEG)
422 		csum_l4 |= PKT_TX_TCP_CKSUM;
423 
424 	if (csum_l4) {
425 		net_hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
426 		net_hdr->csum_start = m_buf->l2_len + m_buf->l3_len;
427 
428 		switch (csum_l4) {
429 		case PKT_TX_TCP_CKSUM:
430 			net_hdr->csum_offset = (offsetof(struct rte_tcp_hdr,
431 						cksum));
432 			break;
433 		case PKT_TX_UDP_CKSUM:
434 			net_hdr->csum_offset = (offsetof(struct rte_udp_hdr,
435 						dgram_cksum));
436 			break;
437 		case PKT_TX_SCTP_CKSUM:
438 			net_hdr->csum_offset = (offsetof(struct rte_sctp_hdr,
439 						cksum));
440 			break;
441 		}
442 	} else {
443 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_start, 0);
444 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_offset, 0);
445 		ASSIGN_UNLESS_EQUAL(net_hdr->flags, 0);
446 	}
447 
448 	/* IP cksum verification cannot be bypassed, so calculate it here */
449 	if (m_buf->ol_flags & PKT_TX_IP_CKSUM) {
450 		struct rte_ipv4_hdr *ipv4_hdr;
451 
452 		ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
453 						   m_buf->l2_len);
454 		ipv4_hdr->hdr_checksum = 0;
455 		ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
456 	}
457 
458 	if (m_buf->ol_flags & PKT_TX_TCP_SEG) {
459 		if (m_buf->ol_flags & PKT_TX_IPV4)
460 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
461 		else
462 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
463 		net_hdr->gso_size = m_buf->tso_segsz;
464 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
465 					+ m_buf->l4_len;
466 	} else if (m_buf->ol_flags & PKT_TX_UDP_SEG) {
467 		net_hdr->gso_type = VIRTIO_NET_HDR_GSO_UDP;
468 		net_hdr->gso_size = m_buf->tso_segsz;
469 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len +
470 			m_buf->l4_len;
471 	} else {
472 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0);
473 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_size, 0);
474 		ASSIGN_UNLESS_EQUAL(net_hdr->hdr_len, 0);
475 	}
476 }
477 
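/*
 * Map one descriptor's IOVA range to host virtual addresses, splitting
 * it into multiple buf_vec entries when the backing memory is not
 * contiguous in the host address space.
 */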
478 static __rte_always_inline int
479 map_one_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
480 		struct buf_vector *buf_vec, uint16_t *vec_idx,
481 		uint64_t desc_iova, uint64_t desc_len, uint8_t perm)
482 {
483 	uint16_t vec_id = *vec_idx;
484 
485 	while (desc_len) {
486 		uint64_t desc_addr;
487 		uint64_t desc_chunck_len = desc_len;
488 
489 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
490 			return -1;
491 
492 		desc_addr = vhost_iova_to_vva(dev, vq,
493 				desc_iova,
494 				&desc_chunck_len,
495 				perm);
496 		if (unlikely(!desc_addr))
497 			return -1;
498 
499 		rte_prefetch0((void *)(uintptr_t)desc_addr);
500 
501 		buf_vec[vec_id].buf_iova = desc_iova;
502 		buf_vec[vec_id].buf_addr = desc_addr;
503 		buf_vec[vec_id].buf_len  = desc_chunck_len;
504 
505 		desc_len -= desc_chunck_len;
506 		desc_iova += desc_chunck_len;
507 		vec_id++;
508 	}
509 	*vec_idx = vec_id;
510 
511 	return 0;
512 }
513 
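/*
 * Walk the split-ring descriptor chain starting at avail_idx, following
 * an indirect table if present, and map every descriptor into buf_vec.
 * Returns the chain head index and the total chain length.
 */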
514 static __rte_always_inline int
515 fill_vec_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
516 			 uint32_t avail_idx, uint16_t *vec_idx,
517 			 struct buf_vector *buf_vec, uint16_t *desc_chain_head,
518 			 uint32_t *desc_chain_len, uint8_t perm)
519 {
520 	uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
521 	uint16_t vec_id = *vec_idx;
522 	uint32_t len    = 0;
523 	uint64_t dlen;
524 	uint32_t nr_descs = vq->size;
525 	uint32_t cnt    = 0;
526 	struct vring_desc *descs = vq->desc;
527 	struct vring_desc *idesc = NULL;
528 
529 	if (unlikely(idx >= vq->size))
530 		return -1;
531 
532 	*desc_chain_head = idx;
533 
534 	if (vq->desc[idx].flags & VRING_DESC_F_INDIRECT) {
535 		dlen = vq->desc[idx].len;
536 		nr_descs = dlen / sizeof(struct vring_desc);
537 		if (unlikely(nr_descs > vq->size))
538 			return -1;
539 
540 		descs = (struct vring_desc *)(uintptr_t)
541 			vhost_iova_to_vva(dev, vq, vq->desc[idx].addr,
542 						&dlen,
543 						VHOST_ACCESS_RO);
544 		if (unlikely(!descs))
545 			return -1;
546 
547 		if (unlikely(dlen < vq->desc[idx].len)) {
548 			/*
549 			 * The indirect desc table is not contiguous
550 			 * in process VA space, so we have to copy it.
551 			 */
552 			idesc = vhost_alloc_copy_ind_table(dev, vq,
553 					vq->desc[idx].addr, vq->desc[idx].len);
554 			if (unlikely(!idesc))
555 				return -1;
556 
557 			descs = idesc;
558 		}
559 
560 		idx = 0;
561 	}
562 
563 	while (1) {
564 		if (unlikely(idx >= nr_descs || cnt++ >= nr_descs)) {
565 			free_ind_table(idesc);
566 			return -1;
567 		}
568 
569 		dlen = descs[idx].len;
570 		len += dlen;
571 
572 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
573 						descs[idx].addr, dlen,
574 						perm))) {
575 			free_ind_table(idesc);
576 			return -1;
577 		}
578 
579 		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
580 			break;
581 
582 		idx = descs[idx].next;
583 	}
584 
585 	*desc_chain_len = len;
586 	*vec_idx = vec_id;
587 
588 	if (unlikely(!!idesc))
589 		free_ind_table(idesc);
590 
591 	return 0;
592 }
593 
594 /*
595  * Returns -1 on failure, 0 on success
596  */
597 static inline int
598 reserve_avail_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
599 				uint32_t size, struct buf_vector *buf_vec,
600 				uint16_t *num_buffers, uint16_t avail_head,
601 				uint16_t *nr_vec)
602 {
603 	uint16_t cur_idx;
604 	uint16_t vec_idx = 0;
605 	uint16_t max_tries, tries = 0;
606 
607 	uint16_t head_idx = 0;
608 	uint32_t len = 0;
609 
610 	*num_buffers = 0;
611 	cur_idx  = vq->last_avail_idx;
612 
613 	if (rxvq_is_mergeable(dev))
614 		max_tries = vq->size - 1;
615 	else
616 		max_tries = 1;
617 
618 	while (size > 0) {
619 		if (unlikely(cur_idx == avail_head))
620 			return -1;
621 		/*
622 		 * If we have tried all available ring items and still
623 		 * can't get enough buffers, something abnormal has
624 		 * happened.
625 		 */
626 		if (unlikely(++tries > max_tries))
627 			return -1;
628 
629 		if (unlikely(fill_vec_buf_split(dev, vq, cur_idx,
630 						&vec_idx, buf_vec,
631 						&head_idx, &len,
632 						VHOST_ACCESS_RW) < 0))
633 			return -1;
634 		len = RTE_MIN(len, size);
635 		update_shadow_used_ring_split(vq, head_idx, len);
636 		size -= len;
637 
638 		cur_idx++;
639 		*num_buffers += 1;
640 	}
641 
642 	*nr_vec = vec_idx;
643 
644 	return 0;
645 }
646 
647 static __rte_always_inline int
648 fill_vec_buf_packed_indirect(struct virtio_net *dev,
649 			struct vhost_virtqueue *vq,
650 			struct vring_packed_desc *desc, uint16_t *vec_idx,
651 			struct buf_vector *buf_vec, uint32_t *len, uint8_t perm)
652 {
653 	uint16_t i;
654 	uint32_t nr_descs;
655 	uint16_t vec_id = *vec_idx;
656 	uint64_t dlen;
657 	struct vring_packed_desc *descs, *idescs = NULL;
658 
659 	dlen = desc->len;
660 	descs = (struct vring_packed_desc *)(uintptr_t)
661 		vhost_iova_to_vva(dev, vq, desc->addr, &dlen, VHOST_ACCESS_RO);
662 	if (unlikely(!descs))
663 		return -1;
664 
665 	if (unlikely(dlen < desc->len)) {
666 		/*
667 		 * The indirect desc table is not contiguous
668 		 * in process VA space, so we have to copy it.
669 		 */
670 		idescs = vhost_alloc_copy_ind_table(dev,
671 				vq, desc->addr, desc->len);
672 		if (unlikely(!idescs))
673 			return -1;
674 
675 		descs = idescs;
676 	}
677 
678 	nr_descs = desc->len / sizeof(struct vring_packed_desc);
679 	if (unlikely(nr_descs >= vq->size)) {
680 		free_ind_table(idescs);
681 		return -1;
682 	}
683 
684 	for (i = 0; i < nr_descs; i++) {
685 		if (unlikely(vec_id >= BUF_VECTOR_MAX)) {
686 			free_ind_table(idescs);
687 			return -1;
688 		}
689 
690 		dlen = descs[i].len;
691 		*len += dlen;
692 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
693 						descs[i].addr, dlen,
694 						perm)))
695 			return -1;
696 	}
697 	*vec_idx = vec_id;
698 
699 	if (unlikely(!!idescs))
700 		free_ind_table(idescs);
701 
702 	return 0;
703 }
704 
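/*
 * Collect the packed-ring descriptor chain starting at avail_idx into
 * buf_vec, handling indirect descriptors and ring wrap-around, and
 * return the buffer id, descriptor count and total length.
 */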
705 static __rte_always_inline int
706 fill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
707 				uint16_t avail_idx, uint16_t *desc_count,
708 				struct buf_vector *buf_vec, uint16_t *vec_idx,
709 				uint16_t *buf_id, uint32_t *len, uint8_t perm)
710 {
711 	bool wrap_counter = vq->avail_wrap_counter;
712 	struct vring_packed_desc *descs = vq->desc_packed;
713 	uint16_t vec_id = *vec_idx;
714 	uint64_t dlen;
715 
716 	if (avail_idx < vq->last_avail_idx)
717 		wrap_counter ^= 1;
718 
719 	/*
720 	 * Perform a load-acquire barrier in desc_is_avail to
721 	 * enforce the ordering between desc flags and desc
722 	 * content.
723 	 */
724 	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)))
725 		return -1;
726 
727 	*desc_count = 0;
728 	*len = 0;
729 
730 	while (1) {
731 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
732 			return -1;
733 
734 		if (unlikely(*desc_count >= vq->size))
735 			return -1;
736 
737 		*desc_count += 1;
738 		*buf_id = descs[avail_idx].id;
739 
740 		if (descs[avail_idx].flags & VRING_DESC_F_INDIRECT) {
741 			if (unlikely(fill_vec_buf_packed_indirect(dev, vq,
742 							&descs[avail_idx],
743 							&vec_id, buf_vec,
744 							len, perm) < 0))
745 				return -1;
746 		} else {
747 			dlen = descs[avail_idx].len;
748 			*len += dlen;
749 
750 			if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
751 							descs[avail_idx].addr,
752 							dlen,
753 							perm)))
754 				return -1;
755 		}
756 
757 		if ((descs[avail_idx].flags & VRING_DESC_F_NEXT) == 0)
758 			break;
759 
760 		if (++avail_idx >= vq->size) {
761 			avail_idx -= vq->size;
762 			wrap_counter ^= 1;
763 		}
764 	}
765 
766 	*vec_idx = vec_id;
767 
768 	return 0;
769 }
770 
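/*
 * Copy the virtio net header into a guest buffer that is split across
 * several buf_vec segments, logging each written chunk.
 */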
771 static __rte_noinline void
772 copy_vnet_hdr_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
773 		struct buf_vector *buf_vec,
774 		struct virtio_net_hdr_mrg_rxbuf *hdr)
775 {
776 	uint64_t len;
777 	uint64_t remain = dev->vhost_hlen;
778 	uint64_t src = (uint64_t)(uintptr_t)hdr, dst;
779 	uint64_t iova = buf_vec->buf_iova;
780 
781 	while (remain) {
782 		len = RTE_MIN(remain,
783 				buf_vec->buf_len);
784 		dst = buf_vec->buf_addr;
785 		rte_memcpy((void *)(uintptr_t)dst,
786 				(void *)(uintptr_t)src,
787 				len);
788 
789 		PRINT_PACKET(dev, (uintptr_t)dst,
790 				(uint32_t)len, 0);
791 		vhost_log_cache_write_iova(dev, vq,
792 				iova, len);
793 
794 		remain -= len;
795 		iova += len;
796 		src += len;
797 		buf_vec++;
798 	}
799 }
800 
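/*
 * Copy an mbuf chain into the guest buffers described by buf_vec,
 * writing the virtio net header first. Large copies are done
 * immediately, small ones are deferred to the batched copy list.
 */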
801 static __rte_always_inline int
802 copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
803 			    struct rte_mbuf *m, struct buf_vector *buf_vec,
804 			    uint16_t nr_vec, uint16_t num_buffers)
805 {
806 	uint32_t vec_idx = 0;
807 	uint32_t mbuf_offset, mbuf_avail;
808 	uint32_t buf_offset, buf_avail;
809 	uint64_t buf_addr, buf_iova, buf_len;
810 	uint32_t cpy_len;
811 	uint64_t hdr_addr;
812 	struct rte_mbuf *hdr_mbuf;
813 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
814 	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
815 	int error = 0;
816 
817 	if (unlikely(m == NULL)) {
818 		error = -1;
819 		goto out;
820 	}
821 
822 	buf_addr = buf_vec[vec_idx].buf_addr;
823 	buf_iova = buf_vec[vec_idx].buf_iova;
824 	buf_len = buf_vec[vec_idx].buf_len;
825 
826 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
827 		error = -1;
828 		goto out;
829 	}
830 
831 	hdr_mbuf = m;
832 	hdr_addr = buf_addr;
833 	if (unlikely(buf_len < dev->vhost_hlen)) {
834 		memset(&tmp_hdr, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
835 		hdr = &tmp_hdr;
836 	} else
837 		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
838 
839 	VHOST_LOG_DATA(DEBUG, "(%d) RX: num merge buffers %d\n",
840 		dev->vid, num_buffers);
841 
842 	if (unlikely(buf_len < dev->vhost_hlen)) {
843 		buf_offset = dev->vhost_hlen - buf_len;
844 		vec_idx++;
845 		buf_addr = buf_vec[vec_idx].buf_addr;
846 		buf_iova = buf_vec[vec_idx].buf_iova;
847 		buf_len = buf_vec[vec_idx].buf_len;
848 		buf_avail = buf_len - buf_offset;
849 	} else {
850 		buf_offset = dev->vhost_hlen;
851 		buf_avail = buf_len - dev->vhost_hlen;
852 	}
853 
854 	mbuf_avail  = rte_pktmbuf_data_len(m);
855 	mbuf_offset = 0;
856 	while (mbuf_avail != 0 || m->next != NULL) {
857 		/* done with current buf, get the next one */
858 		if (buf_avail == 0) {
859 			vec_idx++;
860 			if (unlikely(vec_idx >= nr_vec)) {
861 				error = -1;
862 				goto out;
863 			}
864 
865 			buf_addr = buf_vec[vec_idx].buf_addr;
866 			buf_iova = buf_vec[vec_idx].buf_iova;
867 			buf_len = buf_vec[vec_idx].buf_len;
868 
869 			buf_offset = 0;
870 			buf_avail  = buf_len;
871 		}
872 
873 		/* done with current mbuf, get the next one */
874 		if (mbuf_avail == 0) {
875 			m = m->next;
876 
877 			mbuf_offset = 0;
878 			mbuf_avail  = rte_pktmbuf_data_len(m);
879 		}
880 
881 		if (hdr_addr) {
882 			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
883 			if (rxvq_is_mergeable(dev))
884 				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
885 						num_buffers);
886 
887 			if (unlikely(hdr == &tmp_hdr)) {
888 				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
889 			} else {
890 				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
891 						dev->vhost_hlen, 0);
892 				vhost_log_cache_write_iova(dev, vq,
893 						buf_vec[0].buf_iova,
894 						dev->vhost_hlen);
895 			}
896 
897 			hdr_addr = 0;
898 		}
899 
900 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
901 
902 		if (likely(cpy_len > MAX_BATCH_LEN ||
903 					vq->batch_copy_nb_elems >= vq->size)) {
904 			rte_memcpy((void *)((uintptr_t)(buf_addr + buf_offset)),
905 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
906 				cpy_len);
907 			vhost_log_cache_write_iova(dev, vq,
908 						   buf_iova + buf_offset,
909 						   cpy_len);
910 			PRINT_PACKET(dev, (uintptr_t)(buf_addr + buf_offset),
911 				cpy_len, 0);
912 		} else {
913 			batch_copy[vq->batch_copy_nb_elems].dst =
914 				(void *)((uintptr_t)(buf_addr + buf_offset));
915 			batch_copy[vq->batch_copy_nb_elems].src =
916 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
917 			batch_copy[vq->batch_copy_nb_elems].log_addr =
918 				buf_iova + buf_offset;
919 			batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
920 			vq->batch_copy_nb_elems++;
921 		}
922 
923 		mbuf_avail  -= cpy_len;
924 		mbuf_offset += cpy_len;
925 		buf_avail  -= cpy_len;
926 		buf_offset += cpy_len;
927 	}
928 
929 out:
930 
931 	return error;
932 }
933 
934 static __rte_always_inline void
935 async_fill_vec(struct iovec *v, void *base, size_t len)
936 {
937 	v->iov_base = base;
938 	v->iov_len = len;
939 }
940 
941 static __rte_always_inline void
942 async_fill_iter(struct rte_vhost_iov_iter *it, size_t count,
943 	struct iovec *vec, unsigned long nr_seg)
944 {
945 	it->offset = 0;
946 	it->count = count;
947 
948 	if (count) {
949 		it->iov = vec;
950 		it->nr_segs = nr_seg;
951 	} else {
952 		it->iov = 0;
953 		it->nr_segs = 0;
954 	}
955 }
956 
957 static __rte_always_inline void
958 async_fill_desc(struct rte_vhost_async_desc *desc,
959 	struct rte_vhost_iov_iter *src, struct rte_vhost_iov_iter *dst)
960 {
961 	desc->src = src;
962 	desc->dst = dst;
963 }
964 
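/*
 * Async variant of copy_mbuf_to_desc(): segments at or above the async
 * copy threshold are described as source/destination iovecs for the DMA
 * engine, while the remaining data is copied by the CPU.
 */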
965 static __rte_always_inline int
966 async_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
967 			struct rte_mbuf *m, struct buf_vector *buf_vec,
968 			uint16_t nr_vec, uint16_t num_buffers,
969 			struct iovec *src_iovec, struct iovec *dst_iovec,
970 			struct rte_vhost_iov_iter *src_it,
971 			struct rte_vhost_iov_iter *dst_it)
972 {
973 	uint32_t vec_idx = 0;
974 	uint32_t mbuf_offset, mbuf_avail;
975 	uint32_t buf_offset, buf_avail;
976 	uint64_t buf_addr, buf_iova, buf_len;
977 	uint32_t cpy_len, cpy_threshold;
978 	uint64_t hdr_addr;
979 	struct rte_mbuf *hdr_mbuf;
980 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
981 	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
982 	int error = 0;
983 	uint64_t mapped_len;
984 
985 	uint32_t tlen = 0;
986 	int tvec_idx = 0;
987 	void *hpa;
988 
989 	if (unlikely(m == NULL)) {
990 		error = -1;
991 		goto out;
992 	}
993 
994 	cpy_threshold = vq->async_threshold;
995 
996 	buf_addr = buf_vec[vec_idx].buf_addr;
997 	buf_iova = buf_vec[vec_idx].buf_iova;
998 	buf_len = buf_vec[vec_idx].buf_len;
999 
1000 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
1001 		error = -1;
1002 		goto out;
1003 	}
1004 
1005 	hdr_mbuf = m;
1006 	hdr_addr = buf_addr;
1007 	if (unlikely(buf_len < dev->vhost_hlen)) {
1008 		memset(&tmp_hdr, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
1009 		hdr = &tmp_hdr;
1010 	} else
1011 		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
1012 
1013 	VHOST_LOG_DATA(DEBUG, "(%d) RX: num merge buffers %d\n",
1014 		dev->vid, num_buffers);
1015 
1016 	if (unlikely(buf_len < dev->vhost_hlen)) {
1017 		buf_offset = dev->vhost_hlen - buf_len;
1018 		vec_idx++;
1019 		buf_addr = buf_vec[vec_idx].buf_addr;
1020 		buf_iova = buf_vec[vec_idx].buf_iova;
1021 		buf_len = buf_vec[vec_idx].buf_len;
1022 		buf_avail = buf_len - buf_offset;
1023 	} else {
1024 		buf_offset = dev->vhost_hlen;
1025 		buf_avail = buf_len - dev->vhost_hlen;
1026 	}
1027 
1028 	mbuf_avail  = rte_pktmbuf_data_len(m);
1029 	mbuf_offset = 0;
1030 
1031 	while (mbuf_avail != 0 || m->next != NULL) {
1032 		/* done with current buf, get the next one */
1033 		if (buf_avail == 0) {
1034 			vec_idx++;
1035 			if (unlikely(vec_idx >= nr_vec)) {
1036 				error = -1;
1037 				goto out;
1038 			}
1039 
1040 			buf_addr = buf_vec[vec_idx].buf_addr;
1041 			buf_iova = buf_vec[vec_idx].buf_iova;
1042 			buf_len = buf_vec[vec_idx].buf_len;
1043 
1044 			buf_offset = 0;
1045 			buf_avail  = buf_len;
1046 		}
1047 
1048 		/* done with current mbuf, get the next one */
1049 		if (mbuf_avail == 0) {
1050 			m = m->next;
1051 
1052 			mbuf_offset = 0;
1053 			mbuf_avail  = rte_pktmbuf_data_len(m);
1054 		}
1055 
1056 		if (hdr_addr) {
1057 			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
1058 			if (rxvq_is_mergeable(dev))
1059 				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
1060 						num_buffers);
1061 
1062 			if (unlikely(hdr == &tmp_hdr)) {
1063 				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
1064 			} else {
1065 				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
1066 						dev->vhost_hlen, 0);
1067 				vhost_log_cache_write_iova(dev, vq,
1068 						buf_vec[0].buf_iova,
1069 						dev->vhost_hlen);
1070 			}
1071 
1072 			hdr_addr = 0;
1073 		}
1074 
1075 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
1076 
1077 		while (unlikely(cpy_len && cpy_len >= cpy_threshold)) {
1078 			hpa = (void *)(uintptr_t)gpa_to_first_hpa(dev,
1079 					buf_iova + buf_offset,
1080 					cpy_len, &mapped_len);
1081 
1082 			if (unlikely(!hpa || mapped_len < cpy_threshold))
1083 				break;
1084 
1085 			async_fill_vec(src_iovec + tvec_idx,
1086 				(void *)(uintptr_t)rte_pktmbuf_iova_offset(m,
1087 				mbuf_offset), (size_t)mapped_len);
1088 
1089 			async_fill_vec(dst_iovec + tvec_idx,
1090 					hpa, (size_t)mapped_len);
1091 
1092 			tlen += (uint32_t)mapped_len;
1093 			cpy_len -= (uint32_t)mapped_len;
1094 			mbuf_avail  -= (uint32_t)mapped_len;
1095 			mbuf_offset += (uint32_t)mapped_len;
1096 			buf_avail  -= (uint32_t)mapped_len;
1097 			buf_offset += (uint32_t)mapped_len;
1098 			tvec_idx++;
1099 		}
1100 
1101 		if (likely(cpy_len)) {
1102 			if (unlikely(vq->batch_copy_nb_elems >= vq->size)) {
1103 				rte_memcpy(
1104 				(void *)((uintptr_t)(buf_addr + buf_offset)),
1105 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
1106 				cpy_len);
1107 
1108 				PRINT_PACKET(dev,
1109 					(uintptr_t)(buf_addr + buf_offset),
1110 					cpy_len, 0);
1111 			} else {
1112 				batch_copy[vq->batch_copy_nb_elems].dst =
1113 				(void *)((uintptr_t)(buf_addr + buf_offset));
1114 				batch_copy[vq->batch_copy_nb_elems].src =
1115 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
1116 				batch_copy[vq->batch_copy_nb_elems].log_addr =
1117 					buf_iova + buf_offset;
1118 				batch_copy[vq->batch_copy_nb_elems].len =
1119 					cpy_len;
1120 				vq->batch_copy_nb_elems++;
1121 			}
1122 
1123 			mbuf_avail  -= cpy_len;
1124 			mbuf_offset += cpy_len;
1125 			buf_avail  -= cpy_len;
1126 			buf_offset += cpy_len;
1127 		}
1128 
1129 	}
1130 
1131 out:
1132 	if (tlen) {
1133 		async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
1134 		async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
1135 	} else {
1136 		src_it->count = 0;
1137 	}
1138 
1139 	return error;
1140 }
1141 
1142 static __rte_always_inline int
1143 vhost_enqueue_single_packed(struct virtio_net *dev,
1144 			    struct vhost_virtqueue *vq,
1145 			    struct rte_mbuf *pkt,
1146 			    struct buf_vector *buf_vec,
1147 			    uint16_t *nr_descs)
1148 {
1149 	uint16_t nr_vec = 0;
1150 	uint16_t avail_idx = vq->last_avail_idx;
1151 	uint16_t max_tries, tries = 0;
1152 	uint16_t buf_id = 0;
1153 	uint32_t len = 0;
1154 	uint16_t desc_count;
1155 	uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1156 	uint16_t num_buffers = 0;
1157 	uint32_t buffer_len[vq->size];
1158 	uint16_t buffer_buf_id[vq->size];
1159 	uint16_t buffer_desc_count[vq->size];
1160 
1161 	if (rxvq_is_mergeable(dev))
1162 		max_tries = vq->size - 1;
1163 	else
1164 		max_tries = 1;
1165 
1166 	while (size > 0) {
1167 		/*
1168 		 * If we have tried all available ring items and still
1169 		 * can't get enough buffers, something abnormal has
1170 		 * happened.
1171 		 */
1172 		if (unlikely(++tries > max_tries))
1173 			return -1;
1174 
1175 		if (unlikely(fill_vec_buf_packed(dev, vq,
1176 						avail_idx, &desc_count,
1177 						buf_vec, &nr_vec,
1178 						&buf_id, &len,
1179 						VHOST_ACCESS_RW) < 0))
1180 			return -1;
1181 
1182 		len = RTE_MIN(len, size);
1183 		size -= len;
1184 
1185 		buffer_len[num_buffers] = len;
1186 		buffer_buf_id[num_buffers] = buf_id;
1187 		buffer_desc_count[num_buffers] = desc_count;
1188 		num_buffers += 1;
1189 
1190 		*nr_descs += desc_count;
1191 		avail_idx += desc_count;
1192 		if (avail_idx >= vq->size)
1193 			avail_idx -= vq->size;
1194 	}
1195 
1196 	if (copy_mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, num_buffers) < 0)
1197 		return -1;
1198 
1199 	vhost_shadow_enqueue_single_packed(dev, vq, buffer_len, buffer_buf_id,
1200 					   buffer_desc_count, num_buffers);
1201 
1202 	return 0;
1203 }
1204 
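/*
 * Enqueue a burst of packets on a split virtqueue: reserve enough
 * available buffers for each packet, copy the data, then flush the
 * shadow used ring and notify the guest.
 */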
1205 static __rte_noinline uint32_t
1206 virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1207 	struct rte_mbuf **pkts, uint32_t count)
1208 {
1209 	uint32_t pkt_idx = 0;
1210 	uint16_t num_buffers;
1211 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1212 	uint16_t avail_head;
1213 
1214 	/*
1215 	 * The ordering between avail index and
1216 	 * desc reads needs to be enforced.
1217 	 */
1218 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
1219 
1220 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1221 
1222 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1223 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1224 		uint16_t nr_vec = 0;
1225 
1226 		if (unlikely(reserve_avail_buf_split(dev, vq,
1227 						pkt_len, buf_vec, &num_buffers,
1228 						avail_head, &nr_vec) < 0)) {
1229 			VHOST_LOG_DATA(DEBUG,
1230 				"(%d) failed to get enough desc from vring\n",
1231 				dev->vid);
1232 			vq->shadow_used_idx -= num_buffers;
1233 			break;
1234 		}
1235 
1236 		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1237 			dev->vid, vq->last_avail_idx,
1238 			vq->last_avail_idx + num_buffers);
1239 
1240 		if (copy_mbuf_to_desc(dev, vq, pkts[pkt_idx],
1241 						buf_vec, nr_vec,
1242 						num_buffers) < 0) {
1243 			vq->shadow_used_idx -= num_buffers;
1244 			break;
1245 		}
1246 
1247 		vq->last_avail_idx += num_buffers;
1248 	}
1249 
1250 	do_data_copy_enqueue(dev, vq);
1251 
1252 	if (likely(vq->shadow_used_idx)) {
1253 		flush_shadow_used_ring_split(dev, vq);
1254 		vhost_vring_call_split(dev, vq);
1255 	}
1256 
1257 	return pkt_idx;
1258 }
1259 
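/*
 * Fast path enqueuing PACKED_BATCH_SIZE single-segment packets at once;
 * returns -1 so the caller can fall back to the single-packet path if
 * any descriptor or packet does not qualify.
 */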
1260 static __rte_always_inline int
1261 virtio_dev_rx_batch_packed(struct virtio_net *dev,
1262 			   struct vhost_virtqueue *vq,
1263 			   struct rte_mbuf **pkts)
1264 {
1265 	bool wrap_counter = vq->avail_wrap_counter;
1266 	struct vring_packed_desc *descs = vq->desc_packed;
1267 	uint16_t avail_idx = vq->last_avail_idx;
1268 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
1269 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
1270 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1271 	uint64_t lens[PACKED_BATCH_SIZE];
1272 	uint16_t ids[PACKED_BATCH_SIZE];
1273 	uint16_t i;
1274 
1275 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1276 		return -1;
1277 
1278 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1279 		return -1;
1280 
1281 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1282 		if (unlikely(pkts[i]->next != NULL))
1283 			return -1;
1284 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1285 					    wrap_counter)))
1286 			return -1;
1287 	}
1288 
1289 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1290 		lens[i] = descs[avail_idx + i].len;
1291 
1292 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1293 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1294 			return -1;
1295 	}
1296 
1297 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1298 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
1299 						  descs[avail_idx + i].addr,
1300 						  &lens[i],
1301 						  VHOST_ACCESS_RW);
1302 
1303 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1304 		if (unlikely(!desc_addrs[i]))
1305 			return -1;
1306 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1307 			return -1;
1308 	}
1309 
1310 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1311 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
1312 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
1313 					(uintptr_t)desc_addrs[i];
1314 		lens[i] = pkts[i]->pkt_len +
1315 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
1316 	}
1317 
1318 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1319 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
1320 
1321 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
1322 
1323 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1324 		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
1325 			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
1326 			   pkts[i]->pkt_len);
1327 	}
1328 
1329 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1330 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr,
1331 					   lens[i]);
1332 
1333 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1334 		ids[i] = descs[avail_idx + i].id;
1335 
1336 	vhost_flush_enqueue_batch_packed(dev, vq, lens, ids);
1337 
1338 	return 0;
1339 }
1340 
1341 static __rte_always_inline int16_t
1342 virtio_dev_rx_single_packed(struct virtio_net *dev,
1343 			    struct vhost_virtqueue *vq,
1344 			    struct rte_mbuf *pkt)
1345 {
1346 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1347 	uint16_t nr_descs = 0;
1348 
1349 	if (unlikely(vhost_enqueue_single_packed(dev, vq, pkt, buf_vec,
1350 						 &nr_descs) < 0)) {
1351 		VHOST_LOG_DATA(DEBUG,
1352 				"(%d) failed to get enough desc from vring\n",
1353 				dev->vid);
1354 		return -1;
1355 	}
1356 
1357 	VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1358 			dev->vid, vq->last_avail_idx,
1359 			vq->last_avail_idx + nr_descs);
1360 
1361 	vq_inc_last_avail_packed(vq, nr_descs);
1362 
1363 	return 0;
1364 }
1365 
1366 static __rte_noinline uint32_t
1367 virtio_dev_rx_packed(struct virtio_net *dev,
1368 		     struct vhost_virtqueue *__rte_restrict vq,
1369 		     struct rte_mbuf **__rte_restrict pkts,
1370 		     uint32_t count)
1371 {
1372 	uint32_t pkt_idx = 0;
1373 
1374 	do {
1375 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1376 
1377 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
1378 			if (!virtio_dev_rx_batch_packed(dev, vq,
1379 							&pkts[pkt_idx])) {
1380 				pkt_idx += PACKED_BATCH_SIZE;
1381 				continue;
1382 			}
1383 		}
1384 
1385 		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]))
1386 			break;
1387 		pkt_idx++;
1388 
1389 	} while (pkt_idx < count);
1390 
1391 	if (vq->shadow_used_idx) {
1392 		do_data_copy_enqueue(dev, vq);
1393 		vhost_flush_enqueue_shadow_packed(dev, vq);
1394 	}
1395 
1396 	if (pkt_idx)
1397 		vhost_vring_call_packed(dev, vq);
1398 
1399 	return pkt_idx;
1400 }
1401 
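/*
 * Common enqueue entry point: validate the queue index, take the access
 * lock (and the IOTLB read lock when an IOMMU is in use), then dispatch
 * to the packed or split ring implementation.
 */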
1402 static __rte_always_inline uint32_t
1403 virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
1404 	struct rte_mbuf **pkts, uint32_t count)
1405 {
1406 	struct vhost_virtqueue *vq;
1407 	uint32_t nb_tx = 0;
1408 
1409 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
1410 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1411 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
1412 			dev->vid, __func__, queue_id);
1413 		return 0;
1414 	}
1415 
1416 	vq = dev->virtqueue[queue_id];
1417 
1418 	rte_spinlock_lock(&vq->access_lock);
1419 
1420 	if (unlikely(!vq->enabled))
1421 		goto out_access_unlock;
1422 
1423 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1424 		vhost_user_iotlb_rd_lock(vq);
1425 
1426 	if (unlikely(!vq->access_ok))
1427 		if (unlikely(vring_translate(dev, vq) < 0))
1428 			goto out;
1429 
1430 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1431 	if (count == 0)
1432 		goto out;
1433 
1434 	if (vq_is_packed(dev))
1435 		nb_tx = virtio_dev_rx_packed(dev, vq, pkts, count);
1436 	else
1437 		nb_tx = virtio_dev_rx_split(dev, vq, pkts, count);
1438 
1439 out:
1440 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1441 		vhost_user_iotlb_rd_unlock(vq);
1442 
1443 out_access_unlock:
1444 	rte_spinlock_unlock(&vq->access_lock);
1445 
1446 	return nb_tx;
1447 }
1448 
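/*
 * Example usage (a minimal sketch; port_id and vid are illustrative):
 * forward a burst received on an ethdev port into RX queue 0 of the
 * vhost device. The data is copied into the guest buffers, so the
 * caller keeps ownership of the mbufs and frees them afterwards.
 *
 *	struct rte_mbuf *pkts[MAX_PKT_BURST];
 *	uint16_t nb_rx, i;
 *
 *	nb_rx = rte_eth_rx_burst(port_id, 0, pkts, MAX_PKT_BURST);
 *	rte_vhost_enqueue_burst(vid, 0, pkts, nb_rx);
 *	for (i = 0; i < nb_rx; i++)
 *		rte_pktmbuf_free(pkts[i]);
 */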
1449 uint16_t
1450 rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
1451 	struct rte_mbuf **__rte_restrict pkts, uint16_t count)
1452 {
1453 	struct virtio_net *dev = get_device(vid);
1454 
1455 	if (!dev)
1456 		return 0;
1457 
1458 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
1459 		VHOST_LOG_DATA(ERR,
1460 			"(%d) %s: built-in vhost net backend is disabled.\n",
1461 			dev->vid, __func__);
1462 		return 0;
1463 	}
1464 
1465 	return virtio_dev_rx(dev, queue_id, pkts, count);
1466 }
1467 
1468 static __rte_always_inline uint16_t
1469 virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
1470 	uint16_t vq_size, uint16_t n_inflight)
1471 {
1472 	return pkts_idx > n_inflight ? (pkts_idx - n_inflight) :
1473 		(vq_size - n_inflight + pkts_idx) & (vq_size - 1);
1474 }
1475 
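/*
 * Copy `count` used elements from the source ring to the destination
 * ring, splitting the copy in two when the destination index wraps
 * around the ring end.
 */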
1476 static __rte_always_inline void
1477 store_dma_desc_info_split(struct vring_used_elem *s_ring, struct vring_used_elem *d_ring,
1478 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1479 {
1480 	size_t elem_size = sizeof(struct vring_used_elem);
1481 
1482 	if (d_idx + count <= ring_size) {
1483 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1484 	} else {
1485 		uint16_t size = ring_size - d_idx;
1486 
1487 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1488 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1489 	}
1490 }
1491 
1492 static __rte_always_inline void
1493 store_dma_desc_info_packed(struct vring_used_elem_packed *s_ring,
1494 		struct vring_used_elem_packed *d_ring,
1495 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1496 {
1497 	size_t elem_size = sizeof(struct vring_used_elem_packed);
1498 
1499 	if (d_idx + count <= ring_size) {
1500 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1501 	} else {
1502 		uint16_t size = ring_size - d_idx;
1503 
1504 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1505 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1506 	}
1507 }
1508 
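/*
 * Async enqueue on a split ring: build DMA iovec iterators for each
 * packet, submit bursts of descriptors to the async channel, copy small
 * segments with the CPU, and roll back the ring state for packets the
 * channel failed to accept.
 */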
1509 static __rte_noinline uint32_t
1510 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
1511 	struct vhost_virtqueue *vq, uint16_t queue_id,
1512 	struct rte_mbuf **pkts, uint32_t count,
1513 	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
1514 {
1515 	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
1516 	uint16_t num_buffers;
1517 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1518 	uint16_t avail_head;
1519 
1520 	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
1521 	struct iovec *vec_pool = vq->vec_pool;
1522 	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
1523 	struct iovec *src_iovec = vec_pool;
1524 	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
1525 	uint16_t slot_idx = 0;
1526 	uint16_t segs_await = 0;
1527 	uint16_t iovec_idx = 0, it_idx = 0;
1528 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
1529 	uint32_t n_pkts = 0, pkt_err = 0;
1530 	uint32_t num_async_pkts = 0, num_done_pkts = 0;
1531 	struct {
1532 		uint16_t pkt_idx;
1533 		uint16_t last_avail_idx;
1534 	} async_pkts_log[MAX_PKT_BURST];
1535 
1536 	/*
1537 	 * The ordering between avail index and desc reads needs to be enforced.
1538 	 */
1539 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
1540 
1541 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1542 
1543 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1544 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1545 		uint16_t nr_vec = 0;
1546 
1547 		if (unlikely(reserve_avail_buf_split(dev, vq,
1548 						pkt_len, buf_vec, &num_buffers,
1549 						avail_head, &nr_vec) < 0)) {
1550 			VHOST_LOG_DATA(DEBUG,
1551 				"(%d) failed to get enough desc from vring\n",
1552 				dev->vid);
1553 			vq->shadow_used_idx -= num_buffers;
1554 			break;
1555 		}
1556 
1557 		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1558 			dev->vid, vq->last_avail_idx,
1559 			vq->last_avail_idx + num_buffers);
1560 
1561 		if (async_mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec, num_buffers,
1562 				&src_iovec[iovec_idx], &dst_iovec[iovec_idx],
1563 				&it_pool[it_idx], &it_pool[it_idx + 1]) < 0) {
1564 			vq->shadow_used_idx -= num_buffers;
1565 			break;
1566 		}
1567 
1568 		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
1569 			(vq->size - 1);
1570 		if (it_pool[it_idx].count) {
1571 			uint16_t from, to;
1572 
1573 			async_fill_desc(&tdes[pkt_burst_idx++],
1574 				&it_pool[it_idx], &it_pool[it_idx + 1]);
1575 			pkts_info[slot_idx].descs = num_buffers;
1576 			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1577 			async_pkts_log[num_async_pkts].pkt_idx = pkt_idx;
1578 			async_pkts_log[num_async_pkts++].last_avail_idx =
1579 				vq->last_avail_idx;
1580 
1581 			iovec_idx += it_pool[it_idx].nr_segs;
1582 			it_idx += 2;
1583 
1584 			segs_await += it_pool[it_idx].nr_segs;
1585 
1586 			/**
1587 			 * recover shadow used ring and keep DMA-occupied
1588 			 * descriptors.
1589 			 */
1590 			from = vq->shadow_used_idx - num_buffers;
1591 			to = vq->async_desc_idx_split & (vq->size - 1);
1592 
1593 			store_dma_desc_info_split(vq->shadow_used_split,
1594 					vq->async_descs_split, vq->size, from, to, num_buffers);
1595 
1596 			vq->async_desc_idx_split += num_buffers;
1597 			vq->shadow_used_idx -= num_buffers;
1598 		} else
1599 			comp_pkts[num_done_pkts++] = pkts[pkt_idx];
1600 
1601 		vq->last_avail_idx += num_buffers;
1602 
1603 		/*
1604 		 * conditions to trigger async device transfer:
1605 		 * - buffered packet number reaches transfer threshold
1606 		 * - unused async iov number is less than max vhost vector
1607 		 */
1608 		if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
1609 			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
1610 			BUF_VECTOR_MAX))) {
1611 			n_pkts = vq->async_ops.transfer_data(dev->vid,
1612 					queue_id, tdes, 0, pkt_burst_idx);
1613 			iovec_idx = 0;
1614 			it_idx = 0;
1615 
1616 			segs_await = 0;
1617 			vq->async_pkts_inflight_n += n_pkts;
1618 
1619 			if (unlikely(n_pkts < pkt_burst_idx)) {
1620 				/*
1621 				 * log error packets number here and do actual
1622 				 * error processing when applications poll
1623 				 * completion
1624 				 */
1625 				pkt_err = pkt_burst_idx - n_pkts;
1626 				pkt_burst_idx = 0;
1627 				break;
1628 			}
1629 
1630 			pkt_burst_idx = 0;
1631 		}
1632 	}
1633 
1634 	if (pkt_burst_idx) {
1635 		n_pkts = vq->async_ops.transfer_data(dev->vid,
1636 				queue_id, tdes, 0, pkt_burst_idx);
1637 		vq->async_pkts_inflight_n += n_pkts;
1638 
1639 		if (unlikely(n_pkts < pkt_burst_idx))
1640 			pkt_err = pkt_burst_idx - n_pkts;
1641 	}
1642 
1643 	do_data_copy_enqueue(dev, vq);
1644 
1645 	if (unlikely(pkt_err)) {
1646 		uint16_t num_descs = 0;
1647 
1648 		num_async_pkts -= pkt_err;
1649 		/* calculate the sum of descriptors of DMA-error packets. */
1650 		while (pkt_err-- > 0) {
1651 			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
1652 			slot_idx--;
1653 		}
1654 		vq->async_desc_idx_split -= num_descs;
1655 		/* recover shadow used ring and available ring */
1656 		vq->shadow_used_idx -= (vq->last_avail_idx -
1657 				async_pkts_log[num_async_pkts].last_avail_idx -
1658 				num_descs);
1659 		vq->last_avail_idx =
1660 			async_pkts_log[num_async_pkts].last_avail_idx;
1661 		pkt_idx = async_pkts_log[num_async_pkts].pkt_idx;
1662 		num_done_pkts = pkt_idx - num_async_pkts;
1663 	}
1664 
1665 	vq->async_pkts_idx += num_async_pkts;
1666 	*comp_count = num_done_pkts;
1667 
1668 	if (likely(vq->shadow_used_idx)) {
1669 		flush_shadow_used_ring_split(dev, vq);
1670 		vhost_vring_call_split(dev, vq);
1671 	}
1672 
1673 	return pkt_idx;
1674 }
1675 
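/*
 * Write the given shadow used elements back to the packed descriptor
 * ring; flags are stored after a release fence and the head
 * descriptor's flags are written last.
 */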
1676 static __rte_always_inline void
1677 vhost_update_used_packed(struct vhost_virtqueue *vq,
1678 			struct vring_used_elem_packed *shadow_ring,
1679 			uint16_t count)
1680 {
1681 	int i;
1682 	uint16_t used_idx = vq->last_used_idx;
1683 	uint16_t head_idx = vq->last_used_idx;
1684 	uint16_t head_flags = 0;
1685 
1686 	if (count == 0)
1687 		return;
1688 
1689 	/* Split loop in two to save memory barriers */
1690 	for (i = 0; i < count; i++) {
1691 		vq->desc_packed[used_idx].id = shadow_ring[i].id;
1692 		vq->desc_packed[used_idx].len = shadow_ring[i].len;
1693 
1694 		used_idx += shadow_ring[i].count;
1695 		if (used_idx >= vq->size)
1696 			used_idx -= vq->size;
1697 	}
1698 
1699 	/* The ordering for storing desc flags needs to be enforced. */
1700 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
1701 
1702 	for (i = 0; i < count; i++) {
1703 		uint16_t flags;
1704 
1705 		if (vq->shadow_used_packed[i].len)
1706 			flags = VRING_DESC_F_WRITE;
1707 		else
1708 			flags = 0;
1709 
1710 		if (vq->used_wrap_counter) {
1711 			flags |= VRING_DESC_F_USED;
1712 			flags |= VRING_DESC_F_AVAIL;
1713 		} else {
1714 			flags &= ~VRING_DESC_F_USED;
1715 			flags &= ~VRING_DESC_F_AVAIL;
1716 		}
1717 
1718 		if (i > 0) {
1719 			vq->desc_packed[vq->last_used_idx].flags = flags;
1720 		} else {
1721 			head_idx = vq->last_used_idx;
1722 			head_flags = flags;
1723 		}
1724 
1725 		vq_inc_last_used_packed(vq, shadow_ring[i].count);
1726 	}
1727 
1728 	vq->desc_packed[head_idx].flags = head_flags;
1729 }
1730 
1731 static __rte_always_inline int
1732 virtio_dev_rx_async_batch_packed(struct virtio_net *dev,
1733 			   struct vhost_virtqueue *vq,
1734 			   struct rte_mbuf **pkts,
1735 			   struct rte_mbuf **comp_pkts, uint32_t *pkt_done)
1736 {
1737 	uint16_t i;
1738 	uint32_t cpy_threshold = vq->async_threshold;
1739 
1740 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1741 		if (unlikely(pkts[i]->pkt_len >= cpy_threshold))
1742 			return -1;
1743 	}
1744 	if (!virtio_dev_rx_batch_packed(dev, vq, pkts)) {
1745 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1746 			comp_pkts[(*pkt_done)++] = pkts[i];
1747 
1748 		return 0;
1749 	}
1750 
1751 	return -1;
1752 }
1753 
1754 static __rte_always_inline int
1755 vhost_enqueue_async_single_packed(struct virtio_net *dev,
1756 			    struct vhost_virtqueue *vq,
1757 			    struct rte_mbuf *pkt,
1758 			    struct buf_vector *buf_vec,
1759 			    uint16_t *nr_descs,
1760 			    uint16_t *nr_buffers,
1761 			    struct vring_packed_desc *async_descs,
1762 			    struct iovec *src_iovec, struct iovec *dst_iovec,
1763 			    struct rte_vhost_iov_iter *src_it,
1764 			    struct rte_vhost_iov_iter *dst_it)
1765 {
1766 	uint16_t nr_vec = 0;
1767 	uint16_t avail_idx = vq->last_avail_idx;
1768 	uint16_t max_tries, tries = 0;
1769 	uint16_t buf_id = 0;
1770 	uint32_t len = 0;
1771 	uint16_t desc_count = 0;
1772 	uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1773 	uint32_t buffer_len[vq->size];
1774 	uint16_t buffer_buf_id[vq->size];
1775 	uint16_t buffer_desc_count[vq->size];
1776 
1777 	if (rxvq_is_mergeable(dev))
1778 		max_tries = vq->size - 1;
1779 	else
1780 		max_tries = 1;
1781 
1782 	while (size > 0) {
1783 		/*
1784 		 * If we have tried all available ring items and still
1785 		 * can't get enough buffers, something abnormal has
1786 		 * happened.
1787 		 */
1788 		if (unlikely(++tries > max_tries))
1789 			return -1;
1790 
1791 		if (unlikely(fill_vec_buf_packed(dev, vq, avail_idx, &desc_count, buf_vec, &nr_vec,
1792 						&buf_id, &len, VHOST_ACCESS_RW) < 0))
1793 			return -1;
1794 
1795 		len = RTE_MIN(len, size);
1796 		size -= len;
1797 
1798 		buffer_len[*nr_buffers] = len;
1799 		buffer_buf_id[*nr_buffers] = buf_id;
1800 		buffer_desc_count[*nr_buffers] = desc_count;
1801 		*nr_buffers += 1;
1802 
1803 		*nr_descs += desc_count;
1804 		avail_idx += desc_count;
1805 		if (avail_idx >= vq->size)
1806 			avail_idx -= vq->size;
1807 	}
1808 
1809 	if (async_mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers, src_iovec, dst_iovec,
1810 			src_it, dst_it) < 0)
1811 		return -1;
1812 	/* store descriptors for DMA */
1813 	if (avail_idx >= *nr_descs) {
1814 		rte_memcpy(async_descs, &vq->desc_packed[vq->last_avail_idx],
1815 			*nr_descs * sizeof(struct vring_packed_desc));
1816 	} else {
1817 		uint16_t nr_copy = vq->size - vq->last_avail_idx;
1818 
1819 		rte_memcpy(async_descs, &vq->desc_packed[vq->last_avail_idx],
1820 			nr_copy * sizeof(struct vring_packed_desc));
1821 		rte_memcpy(async_descs + nr_copy, vq->desc_packed,
1822 			(*nr_descs - nr_copy) * sizeof(struct vring_packed_desc));
1823 	}
1824 
1825 	vhost_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id, buffer_desc_count, *nr_buffers);
1826 
1827 	return 0;
1828 }
1829 
1830 static __rte_always_inline int16_t
1831 virtio_dev_rx_async_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
1832 			    struct rte_mbuf *pkt, uint16_t *nr_descs, uint16_t *nr_buffers,
1833 			    struct vring_packed_desc *async_descs,
1834 			    struct iovec *src_iovec, struct iovec *dst_iovec,
1835 			    struct rte_vhost_iov_iter *src_it, struct rte_vhost_iov_iter *dst_it)
1836 {
1837 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1838 
1839 	if (unlikely(vhost_enqueue_async_single_packed(dev, vq, pkt, buf_vec, nr_descs, nr_buffers,
1840 						 async_descs, src_iovec, dst_iovec,
1841 						 src_it, dst_it) < 0)) {
1842 		VHOST_LOG_DATA(DEBUG, "(%d) failed to get enough desc from vring\n", dev->vid);
1843 		return -1;
1844 	}
1845 
1846 	VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1847 			dev->vid, vq->last_avail_idx, vq->last_avail_idx + *nr_descs);
1848 
1849 	return 0;
1850 }
1851 
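/*
 * Roll back the packed ring state for packets the DMA channel failed to
 * accept: rewind the available index, restore the descriptors saved in
 * async_descs before submission and adjust the in-flight counters.
 */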
1852 static __rte_always_inline void
1853 dma_error_handler_packed(struct vhost_virtqueue *vq, struct vring_packed_desc *async_descs,
1854 			uint16_t async_descs_idx, uint16_t slot_idx, uint32_t nr_err,
1855 			uint32_t *pkt_idx, uint32_t *num_async_pkts, uint32_t *num_done_pkts)
1856 {
1857 	uint16_t descs_err = 0;
1858 	uint16_t buffers_err = 0;
1859 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
1860 
1861 	*num_async_pkts -= nr_err;
1862 	*pkt_idx -= nr_err;
1863 	/* calculate the sum of buffers and descs of DMA-error packets. */
1864 	while (nr_err-- > 0) {
1865 		descs_err += pkts_info[slot_idx % vq->size].descs;
1866 		buffers_err += pkts_info[slot_idx % vq->size].nr_buffers;
1867 		slot_idx--;
1868 	}
1869 
1870 	vq->async_buffer_idx_packed -= buffers_err;
1871 
1872 	if (vq->last_avail_idx >= descs_err) {
1873 		vq->last_avail_idx -= descs_err;
1874 
1875 		rte_memcpy(&vq->desc_packed[vq->last_avail_idx],
1876 			&async_descs[async_descs_idx - descs_err],
1877 			descs_err * sizeof(struct vring_packed_desc));
1878 	} else {
1879 		uint16_t nr_copy;
1880 
1881 		vq->last_avail_idx = vq->last_avail_idx + vq->size - descs_err;
1882 		nr_copy = vq->size - vq->last_avail_idx;
1883 		rte_memcpy(&vq->desc_packed[vq->last_avail_idx],
1884 			&async_descs[async_descs_idx - descs_err],
1885 			nr_copy * sizeof(struct vring_packed_desc));
1886 		descs_err -= nr_copy;
1887 		rte_memcpy(&vq->desc_packed[0], &async_descs[async_descs_idx - descs_err],
1888 			descs_err * sizeof(struct vring_packed_desc));
1889 		vq->avail_wrap_counter ^= 1;
1890 	}
1891 
1892 	*num_done_pkts = *pkt_idx - *num_async_pkts;
1893 }
1894 
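/*
 * Async enqueue path for packed virtqueues. Full batches are tried first;
 * the remaining packets are mapped to descriptors one at a time and their
 * copy jobs accumulated in tdes[]. The jobs are handed to the async channel
 * through the transfer_data() callback once the burst reaches
 * VHOST_ASYNC_BATCH_THRESHOLD or the iovec pool is nearly exhausted.
 * Packets copied synchronously by the CPU are returned in comp_pkts.
 */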
1895 static __rte_noinline uint32_t
1896 virtio_dev_rx_async_submit_packed(struct virtio_net *dev,
1897 	struct vhost_virtqueue *vq, uint16_t queue_id,
1898 	struct rte_mbuf **pkts, uint32_t count,
1899 	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
1900 {
1901 	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
1902 	uint32_t remained = count;
1903 	uint16_t async_descs_idx = 0;
1904 	uint16_t num_buffers;
1905 	uint16_t num_descs;
1906 
1907 	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
1908 	struct iovec *vec_pool = vq->vec_pool;
1909 	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
1910 	struct iovec *src_iovec = vec_pool;
1911 	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
1912 	uint16_t slot_idx = 0;
1913 	uint16_t segs_await = 0;
1914 	uint16_t iovec_idx = 0, it_idx = 0;
1915 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
1916 	uint32_t n_pkts = 0, pkt_err = 0;
1917 	uint32_t num_async_pkts = 0, num_done_pkts = 0;
1918 	struct vring_packed_desc async_descs[vq->size];
1919 
1920 	do {
1921 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1922 		if (remained >= PACKED_BATCH_SIZE) {
1923 			if (!virtio_dev_rx_async_batch_packed(dev, vq,
1924 				&pkts[pkt_idx], comp_pkts, &num_done_pkts)) {
1925 				pkt_idx += PACKED_BATCH_SIZE;
1926 				remained -= PACKED_BATCH_SIZE;
1927 				continue;
1928 			}
1929 		}
1930 
1931 		num_buffers = 0;
1932 		num_descs = 0;
1933 		if (unlikely(virtio_dev_rx_async_single_packed(dev, vq, pkts[pkt_idx],
1934 						&num_descs, &num_buffers,
1935 						&async_descs[async_descs_idx],
1936 						&src_iovec[iovec_idx], &dst_iovec[iovec_idx],
1937 						&it_pool[it_idx], &it_pool[it_idx + 1]) < 0))
1938 			break;
1939 
1940 		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1941 			dev->vid, vq->last_avail_idx,
1942 			vq->last_avail_idx + num_descs);
1943 
1944 		slot_idx = (vq->async_pkts_idx + num_async_pkts) % vq->size;
1945 		if (it_pool[it_idx].count) {
1946 			uint16_t from, to;
1947 
1948 			async_descs_idx += num_descs;
1949 			async_fill_desc(&tdes[pkt_burst_idx++],
1950 				&it_pool[it_idx], &it_pool[it_idx + 1]);
1951 			pkts_info[slot_idx].descs = num_descs;
1952 			pkts_info[slot_idx].nr_buffers = num_buffers;
1953 			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1954 			num_async_pkts++;
1955 			iovec_idx += it_pool[it_idx].nr_segs;
1956 			segs_await += it_pool[it_idx].nr_segs;
1957 			it_idx += 2;
1958 
1959 
1960 			/*
1961 			 * recover shadow used ring and keep DMA-occupied
1962 			 * descriptors.
1963 			 */
1964 			from = vq->shadow_used_idx - num_buffers;
1965 			to = vq->async_buffer_idx_packed % vq->size;
1966 			store_dma_desc_info_packed(vq->shadow_used_packed,
1967 					vq->async_buffers_packed, vq->size, from, to, num_buffers);
1968 
1969 			vq->async_buffer_idx_packed += num_buffers;
1970 			vq->shadow_used_idx -= num_buffers;
1971 		} else {
1972 			comp_pkts[num_done_pkts++] = pkts[pkt_idx];
1973 		}
1974 
1975 		pkt_idx++;
1976 		remained--;
1977 		vq_inc_last_avail_packed(vq, num_descs);
1978 
1979 		/*
1980 		 * conditions to trigger async device transfer:
1981 		 * - the number of buffered packets reaches the transfer threshold
1982 		 * - fewer unused async iovec slots remain than one max vhost vector
1983 		 */
1984 		if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
1985 			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await < BUF_VECTOR_MAX))) {
1986 			n_pkts = vq->async_ops.transfer_data(dev->vid, queue_id,
1987 				tdes, 0, pkt_burst_idx);
1988 			iovec_idx = 0;
1989 			it_idx = 0;
1990 			segs_await = 0;
1991 			vq->async_pkts_inflight_n += n_pkts;
1992 
1993 			if (unlikely(n_pkts < pkt_burst_idx)) {
1994 				/*
1995 				 * Log the number of error packets here; the
1996 				 * actual error processing is done when the
1997 				 * application polls for completions.
1998 				 */
1999 				pkt_err = pkt_burst_idx - n_pkts;
2000 				pkt_burst_idx = 0;
2001 				break;
2002 			}
2003 
2004 			pkt_burst_idx = 0;
2005 		}
2006 	} while (pkt_idx < count);
2007 
2008 	if (pkt_burst_idx) {
2009 		n_pkts = vq->async_ops.transfer_data(dev->vid, queue_id, tdes, 0, pkt_burst_idx);
2010 		vq->async_pkts_inflight_n += n_pkts;
2011 
2012 		if (unlikely(n_pkts < pkt_burst_idx))
2013 			pkt_err = pkt_burst_idx - n_pkts;
2014 	}
2015 
2016 	do_data_copy_enqueue(dev, vq);
2017 
2018 	if (unlikely(pkt_err))
2019 		dma_error_handler_packed(vq, async_descs, async_descs_idx, slot_idx, pkt_err,
2020 					&pkt_idx, &num_async_pkts, &num_done_pkts);
2021 	vq->async_pkts_idx += num_async_pkts;
2022 	*comp_count = num_done_pkts;
2023 
2024 	if (likely(vq->shadow_used_idx)) {
2025 		vhost_flush_enqueue_shadow_packed(dev, vq);
2026 		vhost_vring_call_packed(dev, vq);
2027 	}
2028 
2029 	return pkt_idx;
2030 }
2031 
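/*
 * Write the used-ring entries of completed async descriptors back to the
 * split virtqueue, handling wrap-around of both the async descriptor array
 * and the used ring.
 */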
2032 static __rte_always_inline void
2033 write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
2034 {
2035 	uint16_t nr_left = n_descs;
2036 	uint16_t nr_copy;
2037 	uint16_t to, from;
2038 
2039 	do {
2040 		from = vq->last_async_desc_idx_split & (vq->size - 1);
2041 		nr_copy = nr_left + from <= vq->size ? nr_left : vq->size - from;
2042 		to = vq->last_used_idx & (vq->size - 1);
2043 
2044 		if (to + nr_copy <= vq->size) {
2045 			rte_memcpy(&vq->used->ring[to], &vq->async_descs_split[from],
2046 					nr_copy * sizeof(struct vring_used_elem));
2047 		} else {
2048 			uint16_t size = vq->size - to;
2049 
2050 			rte_memcpy(&vq->used->ring[to], &vq->async_descs_split[from],
2051 					size * sizeof(struct vring_used_elem));
2052 			rte_memcpy(&vq->used->ring[0], &vq->async_descs_split[from + size],
2053 					(nr_copy - size) * sizeof(struct vring_used_elem));
2054 		}
2055 
2056 		vq->last_async_desc_idx_split += nr_copy;
2057 		vq->last_used_idx += nr_copy;
2058 		nr_left -= nr_copy;
2059 	} while (nr_left > 0);
2060 }
2061 
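/*
 * Flush completed async buffers back to the used descriptors of the packed
 * virtqueue, wrapping around at vq->size when necessary.
 */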
2062 static __rte_always_inline void
2063 write_back_completed_descs_packed(struct vhost_virtqueue *vq,
2064 				uint16_t n_buffers)
2065 {
2066 	uint16_t nr_left = n_buffers;
2067 	uint16_t from, to;
2068 
2069 	do {
2070 		from = vq->last_async_buffer_idx_packed % vq->size;
2071 		to = (from + nr_left) % vq->size;
2072 		if (to > from) {
2073 			vhost_update_used_packed(vq, vq->async_buffers_packed + from, to - from);
2074 			vq->last_async_buffer_idx_packed += nr_left;
2075 			nr_left = 0;
2076 		} else {
2077 			vhost_update_used_packed(vq, vq->async_buffers_packed + from,
2078 				vq->size - from);
2079 			vq->last_async_buffer_idx_packed += vq->size - from;
2080 			nr_left -= vq->size - from;
2081 		}
2082 	} while (nr_left > 0);
2083 }
2084 
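/*
 * Poll the async channel for completed enqueue copies and hand the
 * corresponding mbufs back to the caller. The completed descriptors are
 * written back to the ring and the guest is notified here, not at
 * submission time.
 *
 * A minimal polling sketch (illustrative only; vid, queue_id and the burst
 * size are assumed to come from the application, with MAX_PKT_BURST
 * standing in for the latter):
 *
 *	struct rte_mbuf *done[MAX_PKT_BURST];
 *	uint16_t n_done;
 *
 *	n_done = rte_vhost_poll_enqueue_completed(vid, queue_id, done,
 *			MAX_PKT_BURST);
 *	rte_pktmbuf_free_bulk(done, n_done);
 */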
2085 uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
2086 		struct rte_mbuf **pkts, uint16_t count)
2087 {
2088 	struct virtio_net *dev = get_device(vid);
2089 	struct vhost_virtqueue *vq;
2090 	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0, n_buffers = 0;
2091 	uint16_t start_idx, pkts_idx, vq_size;
2092 	struct async_inflight_info *pkts_info;
2093 	uint16_t from, i;
2094 
2095 	if (!dev)
2096 		return 0;
2097 
2098 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
2099 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2100 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
2101 			dev->vid, __func__, queue_id);
2102 		return 0;
2103 	}
2104 
2105 	vq = dev->virtqueue[queue_id];
2106 
2107 	if (unlikely(!vq->async_registered)) {
2108 		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
2109 			dev->vid, __func__, queue_id);
2110 		return 0;
2111 	}
2112 
2113 	rte_spinlock_lock(&vq->access_lock);
2114 
2115 	pkts_idx = vq->async_pkts_idx % vq->size;
2116 	pkts_info = vq->async_pkts_info;
2117 	vq_size = vq->size;
2118 	start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
2119 		vq_size, vq->async_pkts_inflight_n);
2120 
2121 	if (count > vq->async_last_pkts_n)
2122 		n_pkts_cpl = vq->async_ops.check_completed_copies(vid,
2123 			queue_id, 0, count - vq->async_last_pkts_n);
2124 	n_pkts_cpl += vq->async_last_pkts_n;
2125 
2126 	n_pkts_put = RTE_MIN(count, n_pkts_cpl);
2127 	if (unlikely(n_pkts_put == 0)) {
2128 		vq->async_last_pkts_n = n_pkts_cpl;
2129 		goto done;
2130 	}
2131 
2132 	if (vq_is_packed(dev)) {
2133 		for (i = 0; i < n_pkts_put; i++) {
2134 			from = (start_idx + i) & (vq_size - 1);
2135 			n_buffers += pkts_info[from].nr_buffers;
2136 			pkts[i] = pkts_info[from].mbuf;
2137 		}
2138 	} else {
2139 		for (i = 0; i < n_pkts_put; i++) {
2140 			from = (start_idx + i) & (vq_size - 1);
2141 			n_descs += pkts_info[from].descs;
2142 			pkts[i] = pkts_info[from].mbuf;
2143 		}
2144 	}
2145 
2146 	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
2147 	vq->async_pkts_inflight_n -= n_pkts_put;
2148 
2149 	if (likely(vq->enabled && vq->access_ok)) {
2150 		if (vq_is_packed(dev)) {
2151 			write_back_completed_descs_packed(vq, n_buffers);
2152 
2153 			vhost_vring_call_packed(dev, vq);
2154 		} else {
2155 			write_back_completed_descs_split(vq, n_descs);
2156 
2157 			__atomic_add_fetch(&vq->used->idx, n_descs,
2158 					__ATOMIC_RELEASE);
2159 			vhost_vring_call_split(dev, vq);
2160 		}
2161 	} else {
2162 		if (vq_is_packed(dev))
2163 			vq->last_async_buffer_idx_packed += n_buffers;
2164 		else
2165 			vq->last_async_desc_idx_split += n_descs;
2166 	}
2167 
2168 done:
2169 	rte_spinlock_unlock(&vq->access_lock);
2170 
2171 	return n_pkts_put;
2172 }
2173 
2174 static __rte_always_inline uint32_t
2175 virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
2176 	struct rte_mbuf **pkts, uint32_t count,
2177 	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
2178 {
2179 	struct vhost_virtqueue *vq;
2180 	uint32_t nb_tx = 0;
2181 
2182 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
2183 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2184 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
2185 			dev->vid, __func__, queue_id);
2186 		return 0;
2187 	}
2188 
2189 	vq = dev->virtqueue[queue_id];
2190 
2191 	rte_spinlock_lock(&vq->access_lock);
2192 
2193 	if (unlikely(!vq->enabled || !vq->async_registered))
2194 		goto out_access_unlock;
2195 
2196 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
2197 		vhost_user_iotlb_rd_lock(vq);
2198 
2199 	if (unlikely(!vq->access_ok))
2200 		if (unlikely(vring_translate(dev, vq) < 0))
2201 			goto out;
2202 
2203 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
2204 	if (count == 0)
2205 		goto out;
2206 
2207 	if (vq_is_packed(dev))
2208 		nb_tx = virtio_dev_rx_async_submit_packed(dev,
2209 				vq, queue_id, pkts, count, comp_pkts,
2210 				comp_count);
2211 	else
2212 		nb_tx = virtio_dev_rx_async_submit_split(dev,
2213 				vq, queue_id, pkts, count, comp_pkts,
2214 				comp_count);
2215 
2216 out:
2217 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
2218 		vhost_user_iotlb_rd_unlock(vq);
2219 
2220 out_access_unlock:
2221 	rte_spinlock_unlock(&vq->access_lock);
2222 
2223 	return nb_tx;
2224 }
2225 
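/*
 * Submit a burst of packets for async (DMA-assisted) enqueue. Packets that
 * were copied synchronously are returned in comp_pkts and may be freed
 * right away; the rest stay in flight until
 * rte_vhost_poll_enqueue_completed() reports them as done.
 *
 * Illustrative call sequence (vid, queue_id, pkts and nb_pkts are assumed
 * to be provided by the application):
 *
 *	struct rte_mbuf *comp_pkts[MAX_PKT_BURST];
 *	uint32_t comp_count = 0;
 *	uint16_t n_enq;
 *
 *	n_enq = rte_vhost_submit_enqueue_burst(vid, queue_id, pkts, nb_pkts,
 *			comp_pkts, &comp_count);
 *	rte_pktmbuf_free_bulk(comp_pkts, comp_count);
 */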
2226 uint16_t
2227 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
2228 		struct rte_mbuf **pkts, uint16_t count,
2229 		struct rte_mbuf **comp_pkts, uint32_t *comp_count)
2230 {
2231 	struct virtio_net *dev = get_device(vid);
2232 
2233 	*comp_count = 0;
2234 	if (!dev)
2235 		return 0;
2236 
2237 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
2238 		VHOST_LOG_DATA(ERR,
2239 			"(%d) %s: built-in vhost net backend is disabled.\n",
2240 			dev->vid, __func__);
2241 		return 0;
2242 	}
2243 
2244 	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count, comp_pkts,
2245 			comp_count);
2246 }
2247 
2248 static inline bool
2249 virtio_net_with_host_offload(struct virtio_net *dev)
2250 {
2251 	if (dev->features &
2252 			((1ULL << VIRTIO_NET_F_CSUM) |
2253 			 (1ULL << VIRTIO_NET_F_HOST_ECN) |
2254 			 (1ULL << VIRTIO_NET_F_HOST_TSO4) |
2255 			 (1ULL << VIRTIO_NET_F_HOST_TSO6) |
2256 			 (1ULL << VIRTIO_NET_F_HOST_UFO)))
2257 		return true;
2258 
2259 	return false;
2260 }
2261 
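/*
 * Parse the Ethernet (and optional single VLAN) header plus the L3 header
 * of mbuf m, returning the L4 protocol number and a pointer to the L4
 * header. m->l2_len, m->l3_len and the PKT_TX_IPV4/PKT_TX_IPV6 flags are
 * set as a side effect.
 */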
2262 static void
2263 parse_ethernet(struct rte_mbuf *m, uint16_t *l4_proto, void **l4_hdr)
2264 {
2265 	struct rte_ipv4_hdr *ipv4_hdr;
2266 	struct rte_ipv6_hdr *ipv6_hdr;
2267 	void *l3_hdr = NULL;
2268 	struct rte_ether_hdr *eth_hdr;
2269 	uint16_t ethertype;
2270 
2271 	eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
2272 
2273 	m->l2_len = sizeof(struct rte_ether_hdr);
2274 	ethertype = rte_be_to_cpu_16(eth_hdr->ether_type);
2275 
2276 	if (ethertype == RTE_ETHER_TYPE_VLAN) {
2277 		struct rte_vlan_hdr *vlan_hdr =
2278 			(struct rte_vlan_hdr *)(eth_hdr + 1);
2279 
2280 		m->l2_len += sizeof(struct rte_vlan_hdr);
2281 		ethertype = rte_be_to_cpu_16(vlan_hdr->eth_proto);
2282 	}
2283 
2284 	l3_hdr = (char *)eth_hdr + m->l2_len;
2285 
2286 	switch (ethertype) {
2287 	case RTE_ETHER_TYPE_IPV4:
2288 		ipv4_hdr = l3_hdr;
2289 		*l4_proto = ipv4_hdr->next_proto_id;
2290 		m->l3_len = rte_ipv4_hdr_len(ipv4_hdr);
2291 		*l4_hdr = (char *)l3_hdr + m->l3_len;
2292 		m->ol_flags |= PKT_TX_IPV4;
2293 		break;
2294 	case RTE_ETHER_TYPE_IPV6:
2295 		ipv6_hdr = l3_hdr;
2296 		*l4_proto = ipv6_hdr->proto;
2297 		m->l3_len = sizeof(struct rte_ipv6_hdr);
2298 		*l4_hdr = (char *)l3_hdr + m->l3_len;
2299 		m->ol_flags |= PKT_TX_IPV6;
2300 		break;
2301 	default:
2302 		m->l3_len = 0;
2303 		*l4_proto = 0;
2304 		*l4_hdr = NULL;
2305 		break;
2306 	}
2307 }
2308 
2309 static __rte_always_inline void
2310 vhost_dequeue_offload_legacy(struct virtio_net_hdr *hdr, struct rte_mbuf *m)
2311 {
2312 	uint16_t l4_proto = 0;
2313 	void *l4_hdr = NULL;
2314 	struct rte_tcp_hdr *tcp_hdr = NULL;
2315 
2316 	parse_ethernet(m, &l4_proto, &l4_hdr);
2317 	if (hdr->flags == VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2318 		if (hdr->csum_start == (m->l2_len + m->l3_len)) {
2319 			switch (hdr->csum_offset) {
2320 			case (offsetof(struct rte_tcp_hdr, cksum)):
2321 				if (l4_proto == IPPROTO_TCP)
2322 					m->ol_flags |= PKT_TX_TCP_CKSUM;
2323 				break;
2324 			case (offsetof(struct rte_udp_hdr, dgram_cksum)):
2325 				if (l4_proto == IPPROTO_UDP)
2326 					m->ol_flags |= PKT_TX_UDP_CKSUM;
2327 				break;
2328 			case (offsetof(struct rte_sctp_hdr, cksum)):
2329 				if (l4_proto == IPPROTO_SCTP)
2330 					m->ol_flags |= PKT_TX_SCTP_CKSUM;
2331 				break;
2332 			default:
2333 				break;
2334 			}
2335 		}
2336 	}
2337 
2338 	if (l4_hdr && hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2339 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2340 		case VIRTIO_NET_HDR_GSO_TCPV4:
2341 		case VIRTIO_NET_HDR_GSO_TCPV6:
2342 			tcp_hdr = l4_hdr;
2343 			m->ol_flags |= PKT_TX_TCP_SEG;
2344 			m->tso_segsz = hdr->gso_size;
2345 			m->l4_len = (tcp_hdr->data_off & 0xf0) >> 2;
2346 			break;
2347 		case VIRTIO_NET_HDR_GSO_UDP:
2348 			m->ol_flags |= PKT_TX_UDP_SEG;
2349 			m->tso_segsz = hdr->gso_size;
2350 			m->l4_len = sizeof(struct rte_udp_hdr);
2351 			break;
2352 		default:
2353 			VHOST_LOG_DATA(WARNING,
2354 				"unsupported gso type %u.\n", hdr->gso_type);
2355 			break;
2356 		}
2357 	}
2358 }
2359 
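/*
 * Translate the virtio-net header of a dequeued packet into mbuf offload
 * metadata. With legacy_ol_flags the historical (Tx-like) flag semantics
 * are kept; otherwise Rx flags are reported and, when a requested checksum
 * cannot be expressed that way, a software checksum is computed in place.
 */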
2360 static __rte_always_inline void
2361 vhost_dequeue_offload(struct virtio_net_hdr *hdr, struct rte_mbuf *m,
2362 	bool legacy_ol_flags)
2363 {
2364 	struct rte_net_hdr_lens hdr_lens;
2365 	int l4_supported = 0;
2366 	uint32_t ptype;
2367 
2368 	if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
2369 		return;
2370 
2371 	if (legacy_ol_flags) {
2372 		vhost_dequeue_offload_legacy(hdr, m);
2373 		return;
2374 	}
2375 
2376 	m->ol_flags |= PKT_RX_IP_CKSUM_UNKNOWN;
2377 
2378 	ptype = rte_net_get_ptype(m, &hdr_lens, RTE_PTYPE_ALL_MASK);
2379 	m->packet_type = ptype;
2380 	if ((ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_TCP ||
2381 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_UDP ||
2382 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_SCTP)
2383 		l4_supported = 1;
2384 
2385 	/* According to Virtio 1.1 spec, the device only needs to look at
2386 	 * VIRTIO_NET_HDR_F_NEEDS_CSUM in the packet transmission path.
2387 	 * This differs from the receive (incoming packets) path, where the
2388 	 * driver can rely on the VIRTIO_NET_HDR_F_DATA_VALID flag set by the
2389 	 * device.
2390 	 *
2391 	 * 5.1.6.2.1 Driver Requirements: Packet Transmission
2392 	 * The driver MUST NOT set the VIRTIO_NET_HDR_F_DATA_VALID and
2393 	 * VIRTIO_NET_HDR_F_RSC_INFO bits in flags.
2394 	 *
2395 	 * 5.1.6.2.2 Device Requirements: Packet Transmission
2396 	 * The device MUST ignore flag bits that it does not recognize.
2397 	 */
2398 	if (hdr->flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2399 		uint32_t hdrlen;
2400 
2401 		hdrlen = hdr_lens.l2_len + hdr_lens.l3_len + hdr_lens.l4_len;
2402 		if (hdr->csum_start <= hdrlen && l4_supported != 0) {
2403 			m->ol_flags |= PKT_RX_L4_CKSUM_NONE;
2404 		} else {
2405 			/* Unknown proto or tunnel, do sw cksum. We can assume
2406 			 * the cksum field is in the first segment since the
2407 			 * buffers we provided to the host are large enough.
2408 			 * In case of SCTP, this will be wrong since it's a CRC
2409 			 * but there's nothing we can do.
2410 			 */
2411 			uint16_t csum = 0, off;
2412 
2413 			if (rte_raw_cksum_mbuf(m, hdr->csum_start,
2414 					rte_pktmbuf_pkt_len(m) - hdr->csum_start, &csum) < 0)
2415 				return;
2416 			if (likely(csum != 0xffff))
2417 				csum = ~csum;
2418 			off = hdr->csum_offset + hdr->csum_start;
2419 			if (rte_pktmbuf_data_len(m) >= off + 1)
2420 				*rte_pktmbuf_mtod_offset(m, uint16_t *, off) = csum;
2421 		}
2422 	}
2423 
2424 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2425 		if (hdr->gso_size == 0)
2426 			return;
2427 
2428 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2429 		case VIRTIO_NET_HDR_GSO_TCPV4:
2430 		case VIRTIO_NET_HDR_GSO_TCPV6:
2431 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_TCP)
2432 				break;
2433 			m->ol_flags |= PKT_RX_LRO | PKT_RX_L4_CKSUM_NONE;
2434 			m->tso_segsz = hdr->gso_size;
2435 			break;
2436 		case VIRTIO_NET_HDR_GSO_UDP:
2437 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_UDP)
2438 				break;
2439 			m->ol_flags |= PKT_RX_LRO | PKT_RX_L4_CKSUM_NONE;
2440 			m->tso_segsz = hdr->gso_size;
2441 			break;
2442 		default:
2443 			break;
2444 		}
2445 	}
2446 }
2447 
2448 static __rte_noinline void
2449 copy_vnet_hdr_from_desc(struct virtio_net_hdr *hdr,
2450 		struct buf_vector *buf_vec)
2451 {
2452 	uint64_t len;
2453 	uint64_t remain = sizeof(struct virtio_net_hdr);
2454 	uint64_t src;
2455 	uint64_t dst = (uint64_t)(uintptr_t)hdr;
2456 
2457 	while (remain) {
2458 		len = RTE_MIN(remain, buf_vec->buf_len);
2459 		src = buf_vec->buf_addr;
2460 		rte_memcpy((void *)(uintptr_t)dst,
2461 				(void *)(uintptr_t)src, len);
2462 
2463 		remain -= len;
2464 		dst += len;
2465 		buf_vec++;
2466 	}
2467 }
2468 
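/*
 * Copy a descriptor chain into mbuf m, allocating extra segments from
 * mbuf_pool when the chain does not fit in one mbuf. Small copies are
 * deferred to the batch_copy array; the virtio-net header (reassembled with
 * copy_vnet_hdr_from_desc() if it spans descriptors) is applied as offload
 * metadata once the payload has been copied.
 */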
2469 static __rte_always_inline int
2470 copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
2471 		  struct buf_vector *buf_vec, uint16_t nr_vec,
2472 		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
2473 		  bool legacy_ol_flags)
2474 {
2475 	uint32_t buf_avail, buf_offset;
2476 	uint64_t buf_addr, buf_len;
2477 	uint32_t mbuf_avail, mbuf_offset;
2478 	uint32_t cpy_len;
2479 	struct rte_mbuf *cur = m, *prev = m;
2480 	struct virtio_net_hdr tmp_hdr;
2481 	struct virtio_net_hdr *hdr = NULL;
2482 	/* A counter to avoid desc dead loop chain */
2483 	/* A counter to avoid an endless descriptor chain loop */
2484 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
2485 	int error = 0;
2486 
2487 	buf_addr = buf_vec[vec_idx].buf_addr;
2488 	buf_len = buf_vec[vec_idx].buf_len;
2489 
2490 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
2491 		error = -1;
2492 		goto out;
2493 	}
2494 
2495 	if (virtio_net_with_host_offload(dev)) {
2496 		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
2497 			/*
2498 			 * No luck, the virtio-net header doesn't fit
2499 			 * in a contiguous virtual area.
2500 			 */
2501 			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
2502 			hdr = &tmp_hdr;
2503 		} else {
2504 			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
2505 		}
2506 	}
2507 
2508 	/*
2509 	 * A virtio driver normally uses at least 2 desc buffers
2510 	 * for Tx: the first for storing the header, and others
2511 	 * for storing the data.
2512 	 */
2513 	if (unlikely(buf_len < dev->vhost_hlen)) {
2514 		buf_offset = dev->vhost_hlen - buf_len;
2515 		vec_idx++;
2516 		buf_addr = buf_vec[vec_idx].buf_addr;
2517 		buf_len = buf_vec[vec_idx].buf_len;
2518 		buf_avail  = buf_len - buf_offset;
2519 	} else if (buf_len == dev->vhost_hlen) {
2520 		if (unlikely(++vec_idx >= nr_vec))
2521 			goto out;
2522 		buf_addr = buf_vec[vec_idx].buf_addr;
2523 		buf_len = buf_vec[vec_idx].buf_len;
2524 
2525 		buf_offset = 0;
2526 		buf_avail = buf_len;
2527 	} else {
2528 		buf_offset = dev->vhost_hlen;
2529 		buf_avail = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
2530 	}
2531 
2532 	PRINT_PACKET(dev,
2533 			(uintptr_t)(buf_addr + buf_offset),
2534 			(uint32_t)buf_avail, 0);
2535 
2536 	mbuf_offset = 0;
2537 	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
2538 	while (1) {
2539 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
2540 
2541 		if (likely(cpy_len > MAX_BATCH_LEN ||
2542 					vq->batch_copy_nb_elems >= vq->size ||
2543 					(hdr && cur == m))) {
2544 			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *,
2545 						mbuf_offset),
2546 					(void *)((uintptr_t)(buf_addr +
2547 							buf_offset)), cpy_len);
2548 		} else {
2549 			batch_copy[vq->batch_copy_nb_elems].dst =
2550 				rte_pktmbuf_mtod_offset(cur, void *,
2551 						mbuf_offset);
2552 			batch_copy[vq->batch_copy_nb_elems].src =
2553 				(void *)((uintptr_t)(buf_addr + buf_offset));
2554 			batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
2555 			vq->batch_copy_nb_elems++;
2556 		}
2557 
2558 		mbuf_avail  -= cpy_len;
2559 		mbuf_offset += cpy_len;
2560 		buf_avail -= cpy_len;
2561 		buf_offset += cpy_len;
2562 
2563 		/* This buf reaches its end, get the next one */
2564 		if (buf_avail == 0) {
2565 			if (++vec_idx >= nr_vec)
2566 				break;
2567 
2568 			buf_addr = buf_vec[vec_idx].buf_addr;
2569 			buf_len = buf_vec[vec_idx].buf_len;
2570 
2571 			buf_offset = 0;
2572 			buf_avail  = buf_len;
2573 
2574 			PRINT_PACKET(dev, (uintptr_t)buf_addr,
2575 					(uint32_t)buf_avail, 0);
2576 		}
2577 
2578 		/*
2579 		 * This mbuf reaches its end, get a new one
2580 		 * to hold more data.
2581 		 */
2582 		if (mbuf_avail == 0) {
2583 			cur = rte_pktmbuf_alloc(mbuf_pool);
2584 			if (unlikely(cur == NULL)) {
2585 				VHOST_LOG_DATA(ERR, "Failed to "
2586 					"allocate memory for mbuf.\n");
2587 				error = -1;
2588 				goto out;
2589 			}
2590 
2591 			prev->next = cur;
2592 			prev->data_len = mbuf_offset;
2593 			m->nb_segs += 1;
2594 			m->pkt_len += mbuf_offset;
2595 			prev = cur;
2596 
2597 			mbuf_offset = 0;
2598 			mbuf_avail  = cur->buf_len - RTE_PKTMBUF_HEADROOM;
2599 		}
2600 	}
2601 
2602 	prev->data_len = mbuf_offset;
2603 	m->pkt_len    += mbuf_offset;
2604 
2605 	if (hdr)
2606 		vhost_dequeue_offload(hdr, m, legacy_ol_flags);
2607 
2608 out:
2609 
2610 	return error;
2611 }
2612 
2613 static void
2614 virtio_dev_extbuf_free(void *addr __rte_unused, void *opaque)
2615 {
2616 	rte_free(opaque);
2617 }
2618 
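/*
 * Attach an external data buffer, allocated with rte_malloc(), to pkt so
 * that a packet larger than the mbuf data room can still be received into a
 * single linear buffer. The shared info needed to free the buffer later is
 * carved out of the same allocation.
 */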
2619 static int
2620 virtio_dev_extbuf_alloc(struct rte_mbuf *pkt, uint32_t size)
2621 {
2622 	struct rte_mbuf_ext_shared_info *shinfo = NULL;
2623 	uint32_t total_len = RTE_PKTMBUF_HEADROOM + size;
2624 	uint16_t buf_len;
2625 	rte_iova_t iova;
2626 	void *buf;
2627 
2628 	total_len += sizeof(*shinfo) + sizeof(uintptr_t);
2629 	total_len = RTE_ALIGN_CEIL(total_len, sizeof(uintptr_t));
2630 
2631 	if (unlikely(total_len > UINT16_MAX))
2632 		return -ENOSPC;
2633 
2634 	buf_len = total_len;
2635 	buf = rte_malloc(NULL, buf_len, RTE_CACHE_LINE_SIZE);
2636 	if (unlikely(buf == NULL))
2637 		return -ENOMEM;
2638 
2639 	/* Initialize shinfo */
2640 	shinfo = rte_pktmbuf_ext_shinfo_init_helper(buf, &buf_len,
2641 						virtio_dev_extbuf_free, buf);
2642 	if (unlikely(shinfo == NULL)) {
2643 		rte_free(buf);
2644 		VHOST_LOG_DATA(ERR, "Failed to init shinfo\n");
2645 		return -1;
2646 	}
2647 
2648 	iova = rte_malloc_virt2iova(buf);
2649 	rte_pktmbuf_attach_extbuf(pkt, buf, iova, buf_len, shinfo);
2650 	rte_pktmbuf_reset_headroom(pkt);
2651 
2652 	return 0;
2653 }
2654 
2655 static __rte_always_inline int
2656 virtio_dev_pktmbuf_prep(struct virtio_net *dev, struct rte_mbuf *pkt,
2657 			 uint32_t data_len)
2658 {
2659 	if (rte_pktmbuf_tailroom(pkt) >= data_len)
2660 		return 0;
2661 
2662 	/* attach an external buffer if supported */
2663 	if (dev->extbuf && !virtio_dev_extbuf_alloc(pkt, data_len))
2664 		return 0;
2665 
2666 	/* check if chained buffers are allowed */
2667 	if (!dev->linearbuf)
2668 		return 0;
2669 
2670 	return -1;
2671 }
2672 
2673 /*
2674  * Allocate a host-supported pktmbuf.
2675  */
2676 static __rte_always_inline struct rte_mbuf *
2677 virtio_dev_pktmbuf_alloc(struct virtio_net *dev, struct rte_mempool *mp,
2678 			 uint32_t data_len)
2679 {
2680 	struct rte_mbuf *pkt = rte_pktmbuf_alloc(mp);
2681 
2682 	if (unlikely(pkt == NULL)) {
2683 		VHOST_LOG_DATA(ERR,
2684 			"Failed to allocate memory for mbuf.\n");
2685 		return NULL;
2686 	}
2687 
2688 	if (virtio_dev_pktmbuf_prep(dev, pkt, data_len)) {
2689 		/* Data doesn't fit into the buffer and the host supports
2690 		 * only linear buffers
2691 		 */
2692 		rte_pktmbuf_free(pkt);
2693 		return NULL;
2694 	}
2695 
2696 	return pkt;
2697 }
2698 
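/*
 * Dequeue (guest Tx) path for split virtqueues: for each available
 * descriptor chain, allocate an mbuf sized to the chain, copy the data out,
 * and record the head index in the shadow used ring, which is flushed and
 * the guest notified at the end of the burst.
 */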
2699 __rte_always_inline
2700 static uint16_t
2701 virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
2702 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
2703 	bool legacy_ol_flags)
2704 {
2705 	uint16_t i;
2706 	uint16_t free_entries;
2707 	uint16_t dropped = 0;
2708 	static bool allocerr_warned;
2709 
2710 	/*
2711 	 * The ordering between avail index and
2712 	 * desc reads needs to be enforced.
2713 	 */
2714 	free_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) -
2715 			vq->last_avail_idx;
2716 	if (free_entries == 0)
2717 		return 0;
2718 
2719 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
2720 
2721 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
2722 
2723 	count = RTE_MIN(count, MAX_PKT_BURST);
2724 	count = RTE_MIN(count, free_entries);
2725 	VHOST_LOG_DATA(DEBUG, "(%d) about to dequeue %u buffers\n",
2726 			dev->vid, count);
2727 
2728 	for (i = 0; i < count; i++) {
2729 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
2730 		uint16_t head_idx;
2731 		uint32_t buf_len;
2732 		uint16_t nr_vec = 0;
2733 		int err;
2734 
2735 		if (unlikely(fill_vec_buf_split(dev, vq,
2736 						vq->last_avail_idx + i,
2737 						&nr_vec, buf_vec,
2738 						&head_idx, &buf_len,
2739 						VHOST_ACCESS_RO) < 0))
2740 			break;
2741 
2742 		update_shadow_used_ring_split(vq, head_idx, 0);
2743 
2744 		pkts[i] = virtio_dev_pktmbuf_alloc(dev, mbuf_pool, buf_len);
2745 		if (unlikely(pkts[i] == NULL)) {
2746 			/*
2747 			 * mbuf allocation fails for jumbo packets when external
2748 			 * buffer allocation is not allowed and a linear buffer
2749 			 * is required. Drop this packet.
2750 			 */
2751 			if (!allocerr_warned) {
2752 				VHOST_LOG_DATA(ERR,
2753 					"Failed mbuf alloc of size %d from %s on %s.\n",
2754 					buf_len, mbuf_pool->name, dev->ifname);
2755 				allocerr_warned = true;
2756 			}
2757 			dropped += 1;
2758 			i++;
2759 			break;
2760 		}
2761 
2762 		err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
2763 				mbuf_pool, legacy_ol_flags);
2764 		if (unlikely(err)) {
2765 			rte_pktmbuf_free(pkts[i]);
2766 			if (!allocerr_warned) {
2767 				VHOST_LOG_DATA(ERR,
2768 					"Failed to copy desc to mbuf on %s.\n",
2769 					dev->ifname);
2770 				allocerr_warned = true;
2771 			}
2772 			dropped += 1;
2773 			i++;
2774 			break;
2775 		}
2776 	}
2777 
2778 	vq->last_avail_idx += i;
2779 
2780 	do_data_copy_dequeue(vq);
2781 	if (unlikely(i < count))
2782 		vq->shadow_used_idx = i;
2783 	if (likely(vq->shadow_used_idx)) {
2784 		flush_shadow_used_ring_split(dev, vq);
2785 		vhost_vring_call_split(dev, vq);
2786 	}
2787 
2788 	return (i - dropped);
2789 }
2790 
2791 __rte_noinline
2792 static uint16_t
2793 virtio_dev_tx_split_legacy(struct virtio_net *dev,
2794 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
2795 	struct rte_mbuf **pkts, uint16_t count)
2796 {
2797 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, true);
2798 }
2799 
2800 __rte_noinline
2801 static uint16_t
2802 virtio_dev_tx_split_compliant(struct virtio_net *dev,
2803 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
2804 	struct rte_mbuf **pkts, uint16_t count)
2805 {
2806 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, false);
2807 }
2808 
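/*
 * Check that a full batch of PACKED_BATCH_SIZE descriptors is available and
 * usable for dequeue: verify the avail/used flags, translate the guest
 * addresses and make sure each destination mbuf has room for the payload.
 * On success, desc_addrs[] and ids[] describe the batch.
 */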
2809 static __rte_always_inline int
2810 vhost_reserve_avail_batch_packed(struct virtio_net *dev,
2811 				 struct vhost_virtqueue *vq,
2812 				 struct rte_mbuf **pkts,
2813 				 uint16_t avail_idx,
2814 				 uintptr_t *desc_addrs,
2815 				 uint16_t *ids)
2816 {
2817 	bool wrap = vq->avail_wrap_counter;
2818 	struct vring_packed_desc *descs = vq->desc_packed;
2819 	uint64_t lens[PACKED_BATCH_SIZE];
2820 	uint64_t buf_lens[PACKED_BATCH_SIZE];
2821 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
2822 	uint16_t flags, i;
2823 
2824 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
2825 		return -1;
2826 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
2827 		return -1;
2828 
2829 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2830 		flags = descs[avail_idx + i].flags;
2831 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
2832 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
2833 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
2834 			return -1;
2835 	}
2836 
2837 	rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
2838 
2839 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2840 		lens[i] = descs[avail_idx + i].len;
2841 
2842 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2843 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
2844 						  descs[avail_idx + i].addr,
2845 						  &lens[i], VHOST_ACCESS_RW);
2846 	}
2847 
2848 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2849 		if (unlikely(!desc_addrs[i]))
2850 			return -1;
2851 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
2852 			return -1;
2853 	}
2854 
2855 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2856 		if (virtio_dev_pktmbuf_prep(dev, pkts[i], lens[i]))
2857 			goto err;
2858 	}
2859 
2860 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2861 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
2862 
2863 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2864 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
2865 			goto err;
2866 	}
2867 
2868 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2869 		pkts[i]->pkt_len = lens[i] - buf_offset;
2870 		pkts[i]->data_len = pkts[i]->pkt_len;
2871 		ids[i] = descs[avail_idx + i].id;
2872 	}
2873 
2874 	return 0;
2875 
2876 err:
2877 	return -1;
2878 }
2879 
2880 static __rte_always_inline int
2881 virtio_dev_tx_batch_packed(struct virtio_net *dev,
2882 			   struct vhost_virtqueue *vq,
2883 			   struct rte_mbuf **pkts,
2884 			   bool legacy_ol_flags)
2885 {
2886 	uint16_t avail_idx = vq->last_avail_idx;
2887 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
2888 	struct virtio_net_hdr *hdr;
2889 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
2890 	uint16_t ids[PACKED_BATCH_SIZE];
2891 	uint16_t i;
2892 
2893 	if (vhost_reserve_avail_batch_packed(dev, vq, pkts, avail_idx,
2894 					     desc_addrs, ids))
2895 		return -1;
2896 
2897 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2898 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
2899 
2900 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2901 		rte_memcpy(rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
2902 			   (void *)(uintptr_t)(desc_addrs[i] + buf_offset),
2903 			   pkts[i]->pkt_len);
2904 
2905 	if (virtio_net_with_host_offload(dev)) {
2906 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2907 			hdr = (struct virtio_net_hdr *)(desc_addrs[i]);
2908 			vhost_dequeue_offload(hdr, pkts[i], legacy_ol_flags);
2909 		}
2910 	}
2911 
2912 	if (virtio_net_is_inorder(dev))
2913 		vhost_shadow_dequeue_batch_packed_inorder(vq,
2914 			ids[PACKED_BATCH_SIZE - 1]);
2915 	else
2916 		vhost_shadow_dequeue_batch_packed(dev, vq, ids);
2917 
2918 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
2919 
2920 	return 0;
2921 }
2922 
2923 static __rte_always_inline int
2924 vhost_dequeue_single_packed(struct virtio_net *dev,
2925 			    struct vhost_virtqueue *vq,
2926 			    struct rte_mempool *mbuf_pool,
2927 			    struct rte_mbuf *pkts,
2928 			    uint16_t *buf_id,
2929 			    uint16_t *desc_count,
2930 			    bool legacy_ol_flags)
2931 {
2932 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
2933 	uint32_t buf_len;
2934 	uint16_t nr_vec = 0;
2935 	int err;
2936 	static bool allocerr_warned;
2937 
2938 	if (unlikely(fill_vec_buf_packed(dev, vq,
2939 					 vq->last_avail_idx, desc_count,
2940 					 buf_vec, &nr_vec,
2941 					 buf_id, &buf_len,
2942 					 VHOST_ACCESS_RO) < 0))
2943 		return -1;
2944 
2945 	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
2946 		if (!allocerr_warned) {
2947 			VHOST_LOG_DATA(ERR,
2948 				"Failed mbuf alloc of size %d from %s on %s.\n",
2949 				buf_len, mbuf_pool->name, dev->ifname);
2950 			allocerr_warned = true;
2951 		}
2952 		return -1;
2953 	}
2954 
2955 	err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts,
2956 				mbuf_pool, legacy_ol_flags);
2957 	if (unlikely(err)) {
2958 		if (!allocerr_warned) {
2959 			VHOST_LOG_DATA(ERR,
2960 				"Failed to copy desc to mbuf on %s.\n",
2961 				dev->ifname);
2962 			allocerr_warned = true;
2963 		}
2964 		return -1;
2965 	}
2966 
2967 	return 0;
2968 }
2969 
2970 static __rte_always_inline int
2971 virtio_dev_tx_single_packed(struct virtio_net *dev,
2972 			    struct vhost_virtqueue *vq,
2973 			    struct rte_mempool *mbuf_pool,
2974 			    struct rte_mbuf *pkts,
2975 			    bool legacy_ol_flags)
2976 {
2977 
2978 	uint16_t buf_id, desc_count = 0;
2979 	int ret;
2980 
2981 	ret = vhost_dequeue_single_packed(dev, vq, mbuf_pool, pkts, &buf_id,
2982 					&desc_count, legacy_ol_flags);
2983 
2984 	if (likely(desc_count > 0)) {
2985 		if (virtio_net_is_inorder(dev))
2986 			vhost_shadow_dequeue_single_packed_inorder(vq, buf_id,
2987 								   desc_count);
2988 		else
2989 			vhost_shadow_dequeue_single_packed(vq, buf_id,
2990 					desc_count);
2991 
2992 		vq_inc_last_avail_packed(vq, desc_count);
2993 	}
2994 
2995 	return ret;
2996 }
2997 
2998 __rte_always_inline
2999 static uint16_t
3000 virtio_dev_tx_packed(struct virtio_net *dev,
3001 		     struct vhost_virtqueue *__rte_restrict vq,
3002 		     struct rte_mempool *mbuf_pool,
3003 		     struct rte_mbuf **__rte_restrict pkts,
3004 		     uint32_t count,
3005 		     bool legacy_ol_flags)
3006 {
3007 	uint32_t pkt_idx = 0;
3008 
3009 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count))
3010 		return 0;
3011 
3012 	do {
3013 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
3014 
3015 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
3016 			if (!virtio_dev_tx_batch_packed(dev, vq,
3017 							&pkts[pkt_idx],
3018 							legacy_ol_flags)) {
3019 				pkt_idx += PACKED_BATCH_SIZE;
3020 				continue;
3021 			}
3022 		}
3023 
3024 		if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
3025 						pkts[pkt_idx],
3026 						legacy_ol_flags))
3027 			break;
3028 		pkt_idx++;
3029 	} while (pkt_idx < count);
3030 
3031 	if (pkt_idx != count)
3032 		rte_pktmbuf_free_bulk(&pkts[pkt_idx], count - pkt_idx);
3033 
3034 	if (vq->shadow_used_idx) {
3035 		do_data_copy_dequeue(vq);
3036 
3037 		vhost_flush_dequeue_shadow_packed(dev, vq);
3038 		vhost_vring_call_packed(dev, vq);
3039 	}
3040 
3041 	return pkt_idx;
3042 }
3043 
3044 __rte_noinline
3045 static uint16_t
3046 virtio_dev_tx_packed_legacy(struct virtio_net *dev,
3047 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
3048 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
3049 {
3050 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, true);
3051 }
3052 
3053 __rte_noinline
3054 static uint16_t
3055 virtio_dev_tx_packed_compliant(struct virtio_net *dev,
3056 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
3057 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
3058 {
3059 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, false);
3060 }
3061 
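/*
 * Dequeue a burst of packets sent by the guest on the given virtqueue. A
 * RARP packet may be injected at the head of the returned array right after
 * a live migration so that switches relearn the guest's MAC address.
 *
 * A minimal receive-loop sketch (illustrative only; vid, queue_id and
 * mbuf_pool are assumed to be set up by the application):
 *
 *	struct rte_mbuf *pkts[MAX_PKT_BURST];
 *	uint16_t nb_rx, i;
 *
 *	nb_rx = rte_vhost_dequeue_burst(vid, queue_id, mbuf_pool, pkts,
 *			MAX_PKT_BURST);
 *	for (i = 0; i < nb_rx; i++)
 *		rte_pktmbuf_free(pkts[i]);
 */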
3062 uint16_t
3063 rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
3064 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
3065 {
3066 	struct virtio_net *dev;
3067 	struct rte_mbuf *rarp_mbuf = NULL;
3068 	struct vhost_virtqueue *vq;
3069 	int16_t success = 1;
3070 
3071 	dev = get_device(vid);
3072 	if (!dev)
3073 		return 0;
3074 
3075 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
3076 		VHOST_LOG_DATA(ERR,
3077 			"(%d) %s: built-in vhost net backend is disabled.\n",
3078 			dev->vid, __func__);
3079 		return 0;
3080 	}
3081 
3082 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
3083 		VHOST_LOG_DATA(ERR,
3084 			"(%d) %s: invalid virtqueue idx %d.\n",
3085 			dev->vid, __func__, queue_id);
3086 		return 0;
3087 	}
3088 
3089 	vq = dev->virtqueue[queue_id];
3090 
3091 	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
3092 		return 0;
3093 
3094 	if (unlikely(!vq->enabled)) {
3095 		count = 0;
3096 		goto out_access_unlock;
3097 	}
3098 
3099 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
3100 		vhost_user_iotlb_rd_lock(vq);
3101 
3102 	if (unlikely(!vq->access_ok))
3103 		if (unlikely(vring_translate(dev, vq) < 0)) {
3104 			count = 0;
3105 			goto out;
3106 		}
3107 
3108 	/*
3109 	 * Construct a RARP broadcast packet, and inject it into the "pkts"
3110 	 * array, so it looks like the guest actually sent such a packet.
3111 	 *
3112 	 * Check user_send_rarp() for more information.
3113 	 *
3114 	 * broadcast_rarp shares a cacheline in the virtio_net structure
3115 	 * with some fields that are accessed during enqueue and
3116 	 * __atomic_compare_exchange_n causes a write when it performs the
3117 	 * compare and exchange. This could result in false sharing between
3118 	 * enqueue and dequeue.
3119 	 *
3120 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
3121 	 * and only performing compare and exchange if the read indicates it
3122 	 * is likely to be set.
3123 	 */
3124 	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
3125 			__atomic_compare_exchange_n(&dev->broadcast_rarp,
3126 			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
3127 
3128 		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
3129 		if (rarp_mbuf == NULL) {
3130 			VHOST_LOG_DATA(ERR, "Failed to make RARP packet.\n");
3131 			count = 0;
3132 			goto out;
3133 		}
3134 		count -= 1;
3135 	}
3136 
3137 	if (vq_is_packed(dev)) {
3138 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
3139 			count = virtio_dev_tx_packed_legacy(dev, vq, mbuf_pool, pkts, count);
3140 		else
3141 			count = virtio_dev_tx_packed_compliant(dev, vq, mbuf_pool, pkts, count);
3142 	} else {
3143 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
3144 			count = virtio_dev_tx_split_legacy(dev, vq, mbuf_pool, pkts, count);
3145 		else
3146 			count = virtio_dev_tx_split_compliant(dev, vq, mbuf_pool, pkts, count);
3147 	}
3148 
3149 out:
3150 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
3151 		vhost_user_iotlb_rd_unlock(vq);
3152 
3153 out_access_unlock:
3154 	rte_spinlock_unlock(&vq->access_lock);
3155 
3156 	if (unlikely(rarp_mbuf != NULL)) {
3157 		/*
3158 		 * Inject it at the head of the "pkts" array, so that the
3159 		 * switch's MAC learning table gets updated first.
3160 		 */
3161 		memmove(&pkts[1], pkts, count * sizeof(struct rte_mbuf *));
3162 		pkts[0] = rarp_mbuf;
3163 		count += 1;
3164 	}
3165 
3166 	return count;
3167 }
3168