1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2016 Intel Corporation
3  */
4 
5 #include <stdint.h>
6 #include <stdbool.h>
7 #include <linux/virtio_net.h>
8 
9 #include <rte_mbuf.h>
10 #include <rte_memcpy.h>
11 #include <rte_ether.h>
12 #include <rte_ip.h>
13 #include <rte_vhost.h>
14 #include <rte_tcp.h>
15 #include <rte_udp.h>
16 #include <rte_sctp.h>
17 #include <rte_arp.h>
18 #include <rte_spinlock.h>
19 #include <rte_malloc.h>
20 #include <rte_vhost_async.h>
21 
22 #include "iotlb.h"
23 #include "vhost.h"
24 
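/*
 * Thresholds used below (descriptive note): copies longer than
 * MAX_BATCH_LEN bytes are performed immediately, while shorter copies
 * are deferred into vq->batch_copy_elems and flushed by
 * do_data_copy_enqueue()/do_data_copy_dequeue(). VHOST_ASYNC_BATCH_THRESHOLD
 * is the number of buffered async copy descriptors that triggers a call
 * to the registered async transfer_data callback.
 */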
25 #define MAX_BATCH_LEN 256
26 
27 #define VHOST_ASYNC_BATCH_THRESHOLD 32
28 
29 static __rte_always_inline bool
30 rxvq_is_mergeable(struct virtio_net *dev)
31 {
32 	return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
33 }
34 
35 static __rte_always_inline bool
36 virtio_net_is_inorder(struct virtio_net *dev)
37 {
38 	return dev->features & (1ULL << VIRTIO_F_IN_ORDER);
39 }
40 
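/*
 * Virtio-net virtqueues come in RX/TX pairs: by convention even indices
 * are the guest's receive queues (the enqueue path below passes
 * is_tx = 0) and odd indices the guest's transmit queues.
 */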
41 static bool
42 is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t nr_vring)
43 {
44 	return (is_tx ^ (idx & 1)) == 0 && idx < nr_vring;
45 }
46 
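/*
 * Flush the small copies that were deferred into vq->batch_copy_elems.
 * The enqueue variant also marks the written guest ranges dirty for
 * live-migration logging and dumps the data when debug is enabled.
 */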
47 static inline void
48 do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
49 {
50 	struct batch_copy_elem *elem = vq->batch_copy_elems;
51 	uint16_t count = vq->batch_copy_nb_elems;
52 	int i;
53 
54 	for (i = 0; i < count; i++) {
55 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
56 		vhost_log_cache_write_iova(dev, vq, elem[i].log_addr,
57 					   elem[i].len);
58 		PRINT_PACKET(dev, (uintptr_t)elem[i].dst, elem[i].len, 0);
59 	}
60 
61 	vq->batch_copy_nb_elems = 0;
62 }
63 
64 static inline void
65 do_data_copy_dequeue(struct vhost_virtqueue *vq)
66 {
67 	struct batch_copy_elem *elem = vq->batch_copy_elems;
68 	uint16_t count = vq->batch_copy_nb_elems;
69 	int i;
70 
71 	for (i = 0; i < count; i++)
72 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
73 
74 	vq->batch_copy_nb_elems = 0;
75 }
76 
77 static __rte_always_inline void
78 do_flush_shadow_used_ring_split(struct virtio_net *dev,
79 			struct vhost_virtqueue *vq,
80 			uint16_t to, uint16_t from, uint16_t size)
81 {
82 	rte_memcpy(&vq->used->ring[to],
83 			&vq->shadow_used_split[from],
84 			size * sizeof(struct vring_used_elem));
85 	vhost_log_cache_used_vring(dev, vq,
86 			offsetof(struct vring_used, ring[to]),
87 			size * sizeof(struct vring_used_elem));
88 }
89 
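/*
 * Publish the entries accumulated in the shadow used ring to the
 * guest-visible used ring: one memcpy when the entries fit before the
 * end of the ring, two when they wrap, then advance used->idx with a
 * release store so the guest never sees an index ahead of the entries.
 */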
90 static __rte_always_inline void
91 flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
92 {
93 	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
94 
95 	if (used_idx + vq->shadow_used_idx <= vq->size) {
96 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
97 					  vq->shadow_used_idx);
98 	} else {
99 		uint16_t size;
100 
101 		/* update the used ring interval [used_idx, vq->size) */
102 		size = vq->size - used_idx;
103 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
104 
105 		/* update the remaining used ring interval [0, shadow_used_idx - size) */
106 		do_flush_shadow_used_ring_split(dev, vq, 0, size,
107 					  vq->shadow_used_idx - size);
108 	}
109 	vq->last_used_idx += vq->shadow_used_idx;
110 
111 	vhost_log_cache_sync(dev, vq);
112 
113 	__atomic_add_fetch(&vq->used->idx, vq->shadow_used_idx,
114 			   __ATOMIC_RELEASE);
115 	vq->shadow_used_idx = 0;
116 	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
117 		sizeof(vq->used->idx));
118 }
119 
120 static __rte_always_inline void
121 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
122 			 uint16_t desc_idx, uint32_t len)
123 {
124 	uint16_t i = vq->shadow_used_idx++;
125 
126 	vq->shadow_used_split[i].id  = desc_idx;
127 	vq->shadow_used_split[i].len = len;
128 }
129 
130 static __rte_always_inline void
131 vhost_flush_enqueue_shadow_packed(struct virtio_net *dev,
132 				  struct vhost_virtqueue *vq)
133 {
134 	int i;
135 	uint16_t used_idx = vq->last_used_idx;
136 	uint16_t head_idx = vq->last_used_idx;
137 	uint16_t head_flags = 0;
138 
139 	/* Split loop in two to save memory barriers */
140 	for (i = 0; i < vq->shadow_used_idx; i++) {
141 		vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
142 		vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
143 
144 		used_idx += vq->shadow_used_packed[i].count;
145 		if (used_idx >= vq->size)
146 			used_idx -= vq->size;
147 	}
148 
149 	/* The ordering for storing desc flags needs to be enforced. */
150 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
151 
152 	for (i = 0; i < vq->shadow_used_idx; i++) {
153 		uint16_t flags;
154 
155 		if (vq->shadow_used_packed[i].len)
156 			flags = VRING_DESC_F_WRITE;
157 		else
158 			flags = 0;
159 
160 		if (vq->used_wrap_counter) {
161 			flags |= VRING_DESC_F_USED;
162 			flags |= VRING_DESC_F_AVAIL;
163 		} else {
164 			flags &= ~VRING_DESC_F_USED;
165 			flags &= ~VRING_DESC_F_AVAIL;
166 		}
167 
168 		if (i > 0) {
169 			vq->desc_packed[vq->last_used_idx].flags = flags;
170 
171 			vhost_log_cache_used_vring(dev, vq,
172 					vq->last_used_idx *
173 					sizeof(struct vring_packed_desc),
174 					sizeof(struct vring_packed_desc));
175 		} else {
176 			head_idx = vq->last_used_idx;
177 			head_flags = flags;
178 		}
179 
180 		vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
181 	}
182 
183 	vq->desc_packed[head_idx].flags = head_flags;
184 
185 	vhost_log_cache_used_vring(dev, vq,
186 				head_idx *
187 				sizeof(struct vring_packed_desc),
188 				sizeof(struct vring_packed_desc));
189 
190 	vq->shadow_used_idx = 0;
191 	vhost_log_cache_sync(dev, vq);
192 }
193 
194 static __rte_always_inline void
195 vhost_flush_dequeue_shadow_packed(struct virtio_net *dev,
196 				  struct vhost_virtqueue *vq)
197 {
198 	struct vring_used_elem_packed *used_elem = &vq->shadow_used_packed[0];
199 
200 	vq->desc_packed[vq->shadow_last_used_idx].id = used_elem->id;
201 	/* the desc flags field is the synchronization point for the virtio packed vring */
202 	__atomic_store_n(&vq->desc_packed[vq->shadow_last_used_idx].flags,
203 			 used_elem->flags, __ATOMIC_RELEASE);
204 
205 	vhost_log_cache_used_vring(dev, vq, vq->shadow_last_used_idx *
206 				   sizeof(struct vring_packed_desc),
207 				   sizeof(struct vring_packed_desc));
208 	vq->shadow_used_idx = 0;
209 	vhost_log_cache_sync(dev, vq);
210 }
211 
212 static __rte_always_inline void
213 vhost_flush_enqueue_batch_packed(struct virtio_net *dev,
214 				 struct vhost_virtqueue *vq,
215 				 uint64_t *lens,
216 				 uint16_t *ids)
217 {
218 	uint16_t i;
219 	uint16_t flags;
220 	uint16_t last_used_idx = vq->last_used_idx;
221 	struct vring_packed_desc *desc_base = &vq->desc_packed[last_used_idx];
222 
223 	if (vq->shadow_used_idx) {
224 		do_data_copy_enqueue(dev, vq);
225 		vhost_flush_enqueue_shadow_packed(dev, vq);
226 	}
227 
228 	flags = PACKED_DESC_ENQUEUE_USED_FLAG(vq->used_wrap_counter);
229 
230 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
231 		desc_base[i].id = ids[i];
232 		desc_base[i].len = lens[i];
233 	}
234 
235 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
236 
237 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
238 		desc_base[i].flags = flags;
239 	}
240 
241 	vhost_log_cache_used_vring(dev, vq, last_used_idx *
242 				   sizeof(struct vring_packed_desc),
243 				   sizeof(struct vring_packed_desc) *
244 				   PACKED_BATCH_SIZE);
245 	vhost_log_cache_sync(dev, vq);
246 
247 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
248 }
249 
250 static __rte_always_inline void
251 vhost_shadow_dequeue_batch_packed_inorder(struct vhost_virtqueue *vq,
252 					  uint16_t id)
253 {
254 	vq->shadow_used_packed[0].id = id;
255 
256 	if (!vq->shadow_used_idx) {
257 		vq->shadow_last_used_idx = vq->last_used_idx;
258 		vq->shadow_used_packed[0].flags =
259 			PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
260 		vq->shadow_used_packed[0].len = 0;
261 		vq->shadow_used_packed[0].count = 1;
262 		vq->shadow_used_idx++;
263 	}
264 
265 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
266 }
267 
268 static __rte_always_inline void
269 vhost_shadow_dequeue_batch_packed(struct virtio_net *dev,
270 				  struct vhost_virtqueue *vq,
271 				  uint16_t *ids)
272 {
273 	uint16_t flags;
274 	uint16_t i;
275 	uint16_t begin;
276 
277 	flags = PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
278 
279 	if (!vq->shadow_used_idx) {
280 		vq->shadow_last_used_idx = vq->last_used_idx;
281 		vq->shadow_used_packed[0].id  = ids[0];
282 		vq->shadow_used_packed[0].len = 0;
283 		vq->shadow_used_packed[0].count = 1;
284 		vq->shadow_used_packed[0].flags = flags;
285 		vq->shadow_used_idx++;
286 		begin = 1;
287 	} else
288 		begin = 0;
289 
290 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE) {
291 		vq->desc_packed[vq->last_used_idx + i].id = ids[i];
292 		vq->desc_packed[vq->last_used_idx + i].len = 0;
293 	}
294 
295 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
296 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE)
297 		vq->desc_packed[vq->last_used_idx + i].flags = flags;
298 
299 	vhost_log_cache_used_vring(dev, vq, vq->last_used_idx *
300 				   sizeof(struct vring_packed_desc),
301 				   sizeof(struct vring_packed_desc) *
302 				   PACKED_BATCH_SIZE);
303 	vhost_log_cache_sync(dev, vq);
304 
305 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
306 }
307 
308 static __rte_always_inline void
309 vhost_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
310 				   uint16_t buf_id,
311 				   uint16_t count)
312 {
313 	uint16_t flags;
314 
315 	flags = vq->desc_packed[vq->last_used_idx].flags;
316 	if (vq->used_wrap_counter) {
317 		flags |= VRING_DESC_F_USED;
318 		flags |= VRING_DESC_F_AVAIL;
319 	} else {
320 		flags &= ~VRING_DESC_F_USED;
321 		flags &= ~VRING_DESC_F_AVAIL;
322 	}
323 
324 	if (!vq->shadow_used_idx) {
325 		vq->shadow_last_used_idx = vq->last_used_idx;
326 
327 		vq->shadow_used_packed[0].id  = buf_id;
328 		vq->shadow_used_packed[0].len = 0;
329 		vq->shadow_used_packed[0].flags = flags;
330 		vq->shadow_used_idx++;
331 	} else {
332 		vq->desc_packed[vq->last_used_idx].id = buf_id;
333 		vq->desc_packed[vq->last_used_idx].len = 0;
334 		vq->desc_packed[vq->last_used_idx].flags = flags;
335 	}
336 
337 	vq_inc_last_used_packed(vq, count);
338 }
339 
340 static __rte_always_inline void
341 vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
342 					   uint16_t buf_id,
343 					   uint16_t count)
344 {
345 	uint16_t flags;
346 
347 	vq->shadow_used_packed[0].id = buf_id;
348 
349 	flags = vq->desc_packed[vq->last_used_idx].flags;
350 	if (vq->used_wrap_counter) {
351 		flags |= VRING_DESC_F_USED;
352 		flags |= VRING_DESC_F_AVAIL;
353 	} else {
354 		flags &= ~VRING_DESC_F_USED;
355 		flags &= ~VRING_DESC_F_AVAIL;
356 	}
357 
358 	if (!vq->shadow_used_idx) {
359 		vq->shadow_last_used_idx = vq->last_used_idx;
360 		vq->shadow_used_packed[0].len = 0;
361 		vq->shadow_used_packed[0].flags = flags;
362 		vq->shadow_used_idx++;
363 	}
364 
365 	vq_inc_last_used_packed(vq, count);
366 }
367 
368 static __rte_always_inline void
369 vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
370 				   uint32_t *len,
371 				   uint16_t *id,
372 				   uint16_t *count,
373 				   uint16_t num_buffers)
374 {
375 	uint16_t i;
376 
377 	for (i = 0; i < num_buffers; i++) {
378 		/* align the shadow flush with the packed batch boundary */
379 		if (!vq->shadow_used_idx)
380 			vq->shadow_aligned_idx = vq->last_used_idx &
381 				PACKED_BATCH_MASK;
382 		vq->shadow_used_packed[vq->shadow_used_idx].id  = id[i];
383 		vq->shadow_used_packed[vq->shadow_used_idx].len = len[i];
384 		vq->shadow_used_packed[vq->shadow_used_idx].count = count[i];
385 		vq->shadow_aligned_idx += count[i];
386 		vq->shadow_used_idx++;
387 	}
388 }
389 
390 static __rte_always_inline void
391 vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
392 				   struct vhost_virtqueue *vq,
393 				   uint32_t *len,
394 				   uint16_t *id,
395 				   uint16_t *count,
396 				   uint16_t num_buffers)
397 {
398 	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
399 
400 	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
401 		do_data_copy_enqueue(dev, vq);
402 		vhost_flush_enqueue_shadow_packed(dev, vq);
403 	}
404 }
405 
406 /* avoid the write operation when it is not needed, to lessen cache issues */
407 #define ASSIGN_UNLESS_EQUAL(var, val) do {	\
408 	if ((var) != (val))			\
409 		(var) = (val);			\
410 } while (0)
411 
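/*
 * Translate the mbuf TX offload flags into the virtio net header the
 * guest expects: checksum offloads become VIRTIO_NET_HDR_F_NEEDS_CSUM
 * with csum_start/csum_offset, and TSO/UFO become the gso_type/gso_size/
 * hdr_len fields (e.g. a PKT_TX_TCP_SEG + PKT_TX_IPV4 mbuf yields
 * VIRTIO_NET_HDR_GSO_TCPV4). Fields that stay zero are only written when
 * they differ, to avoid dirtying the header cache line.
 */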
412 static __rte_always_inline void
413 virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
414 {
415 	uint64_t csum_l4 = m_buf->ol_flags & PKT_TX_L4_MASK;
416 
417 	if (m_buf->ol_flags & PKT_TX_TCP_SEG)
418 		csum_l4 |= PKT_TX_TCP_CKSUM;
419 
420 	if (csum_l4) {
421 		net_hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
422 		net_hdr->csum_start = m_buf->l2_len + m_buf->l3_len;
423 
424 		switch (csum_l4) {
425 		case PKT_TX_TCP_CKSUM:
426 			net_hdr->csum_offset = (offsetof(struct rte_tcp_hdr,
427 						cksum));
428 			break;
429 		case PKT_TX_UDP_CKSUM:
430 			net_hdr->csum_offset = (offsetof(struct rte_udp_hdr,
431 						dgram_cksum));
432 			break;
433 		case PKT_TX_SCTP_CKSUM:
434 			net_hdr->csum_offset = (offsetof(struct rte_sctp_hdr,
435 						cksum));
436 			break;
437 		}
438 	} else {
439 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_start, 0);
440 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_offset, 0);
441 		ASSIGN_UNLESS_EQUAL(net_hdr->flags, 0);
442 	}
443 
444 	/* IP cksum verification cannot be bypassed, so calculate it here */
445 	if (m_buf->ol_flags & PKT_TX_IP_CKSUM) {
446 		struct rte_ipv4_hdr *ipv4_hdr;
447 
448 		ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
449 						   m_buf->l2_len);
450 		ipv4_hdr->hdr_checksum = 0;
451 		ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
452 	}
453 
454 	if (m_buf->ol_flags & PKT_TX_TCP_SEG) {
455 		if (m_buf->ol_flags & PKT_TX_IPV4)
456 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
457 		else
458 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
459 		net_hdr->gso_size = m_buf->tso_segsz;
460 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
461 					+ m_buf->l4_len;
462 	} else if (m_buf->ol_flags & PKT_TX_UDP_SEG) {
463 		net_hdr->gso_type = VIRTIO_NET_HDR_GSO_UDP;
464 		net_hdr->gso_size = m_buf->tso_segsz;
465 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len +
466 			m_buf->l4_len;
467 	} else {
468 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0);
469 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_size, 0);
470 		ASSIGN_UNLESS_EQUAL(net_hdr->hdr_len, 0);
471 	}
472 }
473 
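/*
 * Map one guest descriptor (IOVA + length) into host virtual addresses.
 * A single descriptor may produce several buf_vec entries when the
 * region is not contiguous in the host address space; fails when the
 * translation fails or buf_vec overflows.
 */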
474 static __rte_always_inline int
475 map_one_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
476 		struct buf_vector *buf_vec, uint16_t *vec_idx,
477 		uint64_t desc_iova, uint64_t desc_len, uint8_t perm)
478 {
479 	uint16_t vec_id = *vec_idx;
480 
481 	while (desc_len) {
482 		uint64_t desc_addr;
483 		uint64_t desc_chunck_len = desc_len;
484 
485 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
486 			return -1;
487 
488 		desc_addr = vhost_iova_to_vva(dev, vq,
489 				desc_iova,
490 				&desc_chunck_len,
491 				perm);
492 		if (unlikely(!desc_addr))
493 			return -1;
494 
495 		rte_prefetch0((void *)(uintptr_t)desc_addr);
496 
497 		buf_vec[vec_id].buf_iova = desc_iova;
498 		buf_vec[vec_id].buf_addr = desc_addr;
499 		buf_vec[vec_id].buf_len  = desc_chunck_len;
500 
501 		desc_len -= desc_chunck_len;
502 		desc_iova += desc_chunck_len;
503 		vec_id++;
504 	}
505 	*vec_idx = vec_id;
506 
507 	return 0;
508 }
509 
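/*
 * Resolve the split-ring descriptor chain referenced by avail ring slot
 * avail_idx into buf_vec: dereference indirect tables (copying them when
 * not contiguous in host memory), follow VRING_DESC_F_NEXT links, and
 * bail out if the chain is longer than the ring (loop protection).
 */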
510 static __rte_always_inline int
511 fill_vec_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
512 			 uint32_t avail_idx, uint16_t *vec_idx,
513 			 struct buf_vector *buf_vec, uint16_t *desc_chain_head,
514 			 uint32_t *desc_chain_len, uint8_t perm)
515 {
516 	uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
517 	uint16_t vec_id = *vec_idx;
518 	uint32_t len    = 0;
519 	uint64_t dlen;
520 	uint32_t nr_descs = vq->size;
521 	uint32_t cnt    = 0;
522 	struct vring_desc *descs = vq->desc;
523 	struct vring_desc *idesc = NULL;
524 
525 	if (unlikely(idx >= vq->size))
526 		return -1;
527 
528 	*desc_chain_head = idx;
529 
530 	if (vq->desc[idx].flags & VRING_DESC_F_INDIRECT) {
531 		dlen = vq->desc[idx].len;
532 		nr_descs = dlen / sizeof(struct vring_desc);
533 		if (unlikely(nr_descs > vq->size))
534 			return -1;
535 
536 		descs = (struct vring_desc *)(uintptr_t)
537 			vhost_iova_to_vva(dev, vq, vq->desc[idx].addr,
538 						&dlen,
539 						VHOST_ACCESS_RO);
540 		if (unlikely(!descs))
541 			return -1;
542 
543 		if (unlikely(dlen < vq->desc[idx].len)) {
544 			/*
545 			 * The indirect desc table is not contiguous
546 			 * in the process VA space, so we have to copy it.
547 			 */
548 			idesc = vhost_alloc_copy_ind_table(dev, vq,
549 					vq->desc[idx].addr, vq->desc[idx].len);
550 			if (unlikely(!idesc))
551 				return -1;
552 
553 			descs = idesc;
554 		}
555 
556 		idx = 0;
557 	}
558 
559 	while (1) {
560 		if (unlikely(idx >= nr_descs || cnt++ >= nr_descs)) {
561 			free_ind_table(idesc);
562 			return -1;
563 		}
564 
565 		dlen = descs[idx].len;
566 		len += dlen;
567 
568 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
569 						descs[idx].addr, dlen,
570 						perm))) {
571 			free_ind_table(idesc);
572 			return -1;
573 		}
574 
575 		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
576 			break;
577 
578 		idx = descs[idx].next;
579 	}
580 
581 	*desc_chain_len = len;
582 	*vec_idx = vec_id;
583 
584 	if (unlikely(!!idesc))
585 		free_ind_table(idesc);
586 
587 	return 0;
588 }
589 
590 /*
591  * Returns -1 on failure, 0 on success.
592  */
593 static inline int
594 reserve_avail_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
595 				uint32_t size, struct buf_vector *buf_vec,
596 				uint16_t *num_buffers, uint16_t avail_head,
597 				uint16_t *nr_vec)
598 {
599 	uint16_t cur_idx;
600 	uint16_t vec_idx = 0;
601 	uint16_t max_tries, tries = 0;
602 
603 	uint16_t head_idx = 0;
604 	uint32_t len = 0;
605 
606 	*num_buffers = 0;
607 	cur_idx  = vq->last_avail_idx;
608 
609 	if (rxvq_is_mergeable(dev))
610 		max_tries = vq->size - 1;
611 	else
612 		max_tries = 1;
613 
614 	while (size > 0) {
615 		if (unlikely(cur_idx == avail_head))
616 			return -1;
617 		/*
618 		 * if we tried all available ring items, and still
619 		 * can't get enough buf, it means something abnormal
620 		 * happened.
621 		 */
622 		if (unlikely(++tries > max_tries))
623 			return -1;
624 
625 		if (unlikely(fill_vec_buf_split(dev, vq, cur_idx,
626 						&vec_idx, buf_vec,
627 						&head_idx, &len,
628 						VHOST_ACCESS_RW) < 0))
629 			return -1;
630 		len = RTE_MIN(len, size);
631 		update_shadow_used_ring_split(vq, head_idx, len);
632 		size -= len;
633 
634 		cur_idx++;
635 		*num_buffers += 1;
636 	}
637 
638 	*nr_vec = vec_idx;
639 
640 	return 0;
641 }
642 
643 static __rte_always_inline int
644 fill_vec_buf_packed_indirect(struct virtio_net *dev,
645 			struct vhost_virtqueue *vq,
646 			struct vring_packed_desc *desc, uint16_t *vec_idx,
647 			struct buf_vector *buf_vec, uint32_t *len, uint8_t perm)
648 {
649 	uint16_t i;
650 	uint32_t nr_descs;
651 	uint16_t vec_id = *vec_idx;
652 	uint64_t dlen;
653 	struct vring_packed_desc *descs, *idescs = NULL;
654 
655 	dlen = desc->len;
656 	descs = (struct vring_packed_desc *)(uintptr_t)
657 		vhost_iova_to_vva(dev, vq, desc->addr, &dlen, VHOST_ACCESS_RO);
658 	if (unlikely(!descs))
659 		return -1;
660 
661 	if (unlikely(dlen < desc->len)) {
662 		/*
663 		 * The indirect desc table is not contiguous
664 		 * in the process VA space, so we have to copy it.
665 		 */
666 		idescs = vhost_alloc_copy_ind_table(dev,
667 				vq, desc->addr, desc->len);
668 		if (unlikely(!idescs))
669 			return -1;
670 
671 		descs = idescs;
672 	}
673 
674 	nr_descs = desc->len / sizeof(struct vring_packed_desc);
675 	if (unlikely(nr_descs >= vq->size)) {
676 		free_ind_table(idescs);
677 		return -1;
678 	}
679 
680 	for (i = 0; i < nr_descs; i++) {
681 		if (unlikely(vec_id >= BUF_VECTOR_MAX)) {
682 			free_ind_table(idescs);
683 			return -1;
684 		}
685 
686 		dlen = descs[i].len;
687 		*len += dlen;
688 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
689 						descs[i].addr, dlen,
690 						perm)))
691 			return -1;
692 	}
693 	*vec_idx = vec_id;
694 
695 	if (unlikely(!!idescs))
696 		free_ind_table(idescs);
697 
698 	return 0;
699 }
700 
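/*
 * Packed-ring counterpart of fill_vec_buf_split(): check availability
 * against the wrap counter, then walk the NEXT-chained (possibly
 * indirect) descriptors starting at avail_idx, returning the buffer id,
 * the number of descriptors consumed and the total buffer length.
 */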
701 static __rte_always_inline int
702 fill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
703 				uint16_t avail_idx, uint16_t *desc_count,
704 				struct buf_vector *buf_vec, uint16_t *vec_idx,
705 				uint16_t *buf_id, uint32_t *len, uint8_t perm)
706 {
707 	bool wrap_counter = vq->avail_wrap_counter;
708 	struct vring_packed_desc *descs = vq->desc_packed;
709 	uint16_t vec_id = *vec_idx;
710 	uint64_t dlen;
711 
712 	if (avail_idx < vq->last_avail_idx)
713 		wrap_counter ^= 1;
714 
715 	/*
716 	 * Perform a load-acquire barrier in desc_is_avail to
717 	 * enforce the ordering between desc flags and desc
718 	 * content.
719 	 */
720 	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)))
721 		return -1;
722 
723 	*desc_count = 0;
724 	*len = 0;
725 
726 	while (1) {
727 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
728 			return -1;
729 
730 		if (unlikely(*desc_count >= vq->size))
731 			return -1;
732 
733 		*desc_count += 1;
734 		*buf_id = descs[avail_idx].id;
735 
736 		if (descs[avail_idx].flags & VRING_DESC_F_INDIRECT) {
737 			if (unlikely(fill_vec_buf_packed_indirect(dev, vq,
738 							&descs[avail_idx],
739 							&vec_id, buf_vec,
740 							len, perm) < 0))
741 				return -1;
742 		} else {
743 			dlen = descs[avail_idx].len;
744 			*len += dlen;
745 
746 			if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
747 							descs[avail_idx].addr,
748 							dlen,
749 							perm)))
750 				return -1;
751 		}
752 
753 		if ((descs[avail_idx].flags & VRING_DESC_F_NEXT) == 0)
754 			break;
755 
756 		if (++avail_idx >= vq->size) {
757 			avail_idx -= vq->size;
758 			wrap_counter ^= 1;
759 		}
760 	}
761 
762 	*vec_idx = vec_id;
763 
764 	return 0;
765 }
766 
767 static __rte_noinline void
768 copy_vnet_hdr_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
769 		struct buf_vector *buf_vec,
770 		struct virtio_net_hdr_mrg_rxbuf *hdr)
771 {
772 	uint64_t len;
773 	uint64_t remain = dev->vhost_hlen;
774 	uint64_t src = (uint64_t)(uintptr_t)hdr, dst;
775 	uint64_t iova = buf_vec->buf_iova;
776 
777 	while (remain) {
778 		len = RTE_MIN(remain,
779 				buf_vec->buf_len);
780 		dst = buf_vec->buf_addr;
781 		rte_memcpy((void *)(uintptr_t)dst,
782 				(void *)(uintptr_t)src,
783 				len);
784 
785 		PRINT_PACKET(dev, (uintptr_t)dst,
786 				(uint32_t)len, 0);
787 		vhost_log_cache_write_iova(dev, vq,
788 				iova, len);
789 
790 		remain -= len;
791 		iova += len;
792 		src += len;
793 		buf_vec++;
794 	}
795 }
796 
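/*
 * Copy one mbuf chain into the guest buffers described by buf_vec.
 * The virtio net header is written first, through a bounce buffer
 * (copy_vnet_hdr_to_desc) when the first buffer is smaller than the
 * header, then the payload is streamed, batching copies of at most
 * MAX_BATCH_LEN bytes.
 */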
797 static __rte_always_inline int
798 copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
799 			    struct rte_mbuf *m, struct buf_vector *buf_vec,
800 			    uint16_t nr_vec, uint16_t num_buffers)
801 {
802 	uint32_t vec_idx = 0;
803 	uint32_t mbuf_offset, mbuf_avail;
804 	uint32_t buf_offset, buf_avail;
805 	uint64_t buf_addr, buf_iova, buf_len;
806 	uint32_t cpy_len;
807 	uint64_t hdr_addr;
808 	struct rte_mbuf *hdr_mbuf;
809 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
810 	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
811 	int error = 0;
812 
813 	if (unlikely(m == NULL)) {
814 		error = -1;
815 		goto out;
816 	}
817 
818 	buf_addr = buf_vec[vec_idx].buf_addr;
819 	buf_iova = buf_vec[vec_idx].buf_iova;
820 	buf_len = buf_vec[vec_idx].buf_len;
821 
822 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
823 		error = -1;
824 		goto out;
825 	}
826 
827 	hdr_mbuf = m;
828 	hdr_addr = buf_addr;
829 	if (unlikely(buf_len < dev->vhost_hlen)) {
830 		memset(&tmp_hdr, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
831 		hdr = &tmp_hdr;
832 	} else
833 		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
834 
835 	VHOST_LOG_DATA(DEBUG, "(%d) RX: num merge buffers %d\n",
836 		dev->vid, num_buffers);
837 
838 	if (unlikely(buf_len < dev->vhost_hlen)) {
839 		buf_offset = dev->vhost_hlen - buf_len;
840 		vec_idx++;
841 		buf_addr = buf_vec[vec_idx].buf_addr;
842 		buf_iova = buf_vec[vec_idx].buf_iova;
843 		buf_len = buf_vec[vec_idx].buf_len;
844 		buf_avail = buf_len - buf_offset;
845 	} else {
846 		buf_offset = dev->vhost_hlen;
847 		buf_avail = buf_len - dev->vhost_hlen;
848 	}
849 
850 	mbuf_avail  = rte_pktmbuf_data_len(m);
851 	mbuf_offset = 0;
852 	while (mbuf_avail != 0 || m->next != NULL) {
853 		/* done with current buf, get the next one */
854 		if (buf_avail == 0) {
855 			vec_idx++;
856 			if (unlikely(vec_idx >= nr_vec)) {
857 				error = -1;
858 				goto out;
859 			}
860 
861 			buf_addr = buf_vec[vec_idx].buf_addr;
862 			buf_iova = buf_vec[vec_idx].buf_iova;
863 			buf_len = buf_vec[vec_idx].buf_len;
864 
865 			buf_offset = 0;
866 			buf_avail  = buf_len;
867 		}
868 
869 		/* done with current mbuf, get the next one */
870 		if (mbuf_avail == 0) {
871 			m = m->next;
872 
873 			mbuf_offset = 0;
874 			mbuf_avail  = rte_pktmbuf_data_len(m);
875 		}
876 
877 		if (hdr_addr) {
878 			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
879 			if (rxvq_is_mergeable(dev))
880 				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
881 						num_buffers);
882 
883 			if (unlikely(hdr == &tmp_hdr)) {
884 				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
885 			} else {
886 				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
887 						dev->vhost_hlen, 0);
888 				vhost_log_cache_write_iova(dev, vq,
889 						buf_vec[0].buf_iova,
890 						dev->vhost_hlen);
891 			}
892 
893 			hdr_addr = 0;
894 		}
895 
896 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
897 
898 		if (likely(cpy_len > MAX_BATCH_LEN ||
899 					vq->batch_copy_nb_elems >= vq->size)) {
900 			rte_memcpy((void *)((uintptr_t)(buf_addr + buf_offset)),
901 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
902 				cpy_len);
903 			vhost_log_cache_write_iova(dev, vq,
904 						   buf_iova + buf_offset,
905 						   cpy_len);
906 			PRINT_PACKET(dev, (uintptr_t)(buf_addr + buf_offset),
907 				cpy_len, 0);
908 		} else {
909 			batch_copy[vq->batch_copy_nb_elems].dst =
910 				(void *)((uintptr_t)(buf_addr + buf_offset));
911 			batch_copy[vq->batch_copy_nb_elems].src =
912 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
913 			batch_copy[vq->batch_copy_nb_elems].log_addr =
914 				buf_iova + buf_offset;
915 			batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
916 			vq->batch_copy_nb_elems++;
917 		}
918 
919 		mbuf_avail  -= cpy_len;
920 		mbuf_offset += cpy_len;
921 		buf_avail  -= cpy_len;
922 		buf_offset += cpy_len;
923 	}
924 
925 out:
926 
927 	return error;
928 }
929 
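/*
 * Helpers that build the iovec arrays and the rte_vhost_iov_iter /
 * rte_vhost_async_desc structures handed to the registered async
 * (e.g. DMA) copy backend.
 */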
930 static __rte_always_inline void
931 async_fill_vec(struct iovec *v, void *base, size_t len)
932 {
933 	v->iov_base = base;
934 	v->iov_len = len;
935 }
936 
937 static __rte_always_inline void
938 async_fill_iter(struct rte_vhost_iov_iter *it, size_t count,
939 	struct iovec *vec, unsigned long nr_seg)
940 {
941 	it->offset = 0;
942 	it->count = count;
943 
944 	if (count) {
945 		it->iov = vec;
946 		it->nr_segs = nr_seg;
947 	} else {
948 		it->iov = 0;
949 		it->nr_segs = 0;
950 	}
951 }
952 
953 static __rte_always_inline void
954 async_fill_desc(struct rte_vhost_async_desc *desc,
955 	struct rte_vhost_iov_iter *src, struct rte_vhost_iov_iter *dst)
956 {
957 	desc->src = src;
958 	desc->dst = dst;
959 }
960 
961 static __rte_always_inline int
962 async_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
963 			struct rte_mbuf *m, struct buf_vector *buf_vec,
964 			uint16_t nr_vec, uint16_t num_buffers,
965 			struct iovec *src_iovec, struct iovec *dst_iovec,
966 			struct rte_vhost_iov_iter *src_it,
967 			struct rte_vhost_iov_iter *dst_it)
968 {
969 	uint32_t vec_idx = 0;
970 	uint32_t mbuf_offset, mbuf_avail;
971 	uint32_t buf_offset, buf_avail;
972 	uint64_t buf_addr, buf_iova, buf_len;
973 	uint32_t cpy_len, cpy_threshold;
974 	uint64_t hdr_addr;
975 	struct rte_mbuf *hdr_mbuf;
976 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
977 	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
978 	int error = 0;
979 	uint64_t mapped_len;
980 
981 	uint32_t tlen = 0;
982 	int tvec_idx = 0;
983 	void *hpa;
984 
985 	if (unlikely(m == NULL)) {
986 		error = -1;
987 		goto out;
988 	}
989 
990 	cpy_threshold = vq->async_threshold;
991 
992 	buf_addr = buf_vec[vec_idx].buf_addr;
993 	buf_iova = buf_vec[vec_idx].buf_iova;
994 	buf_len = buf_vec[vec_idx].buf_len;
995 
996 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
997 		error = -1;
998 		goto out;
999 	}
1000 
1001 	hdr_mbuf = m;
1002 	hdr_addr = buf_addr;
1003 	if (unlikely(buf_len < dev->vhost_hlen)) {
1004 		memset(&tmp_hdr, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
1005 		hdr = &tmp_hdr;
1006 	} else
1007 		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
1008 
1009 	VHOST_LOG_DATA(DEBUG, "(%d) RX: num merge buffers %d\n",
1010 		dev->vid, num_buffers);
1011 
1012 	if (unlikely(buf_len < dev->vhost_hlen)) {
1013 		buf_offset = dev->vhost_hlen - buf_len;
1014 		vec_idx++;
1015 		buf_addr = buf_vec[vec_idx].buf_addr;
1016 		buf_iova = buf_vec[vec_idx].buf_iova;
1017 		buf_len = buf_vec[vec_idx].buf_len;
1018 		buf_avail = buf_len - buf_offset;
1019 	} else {
1020 		buf_offset = dev->vhost_hlen;
1021 		buf_avail = buf_len - dev->vhost_hlen;
1022 	}
1023 
1024 	mbuf_avail  = rte_pktmbuf_data_len(m);
1025 	mbuf_offset = 0;
1026 
1027 	while (mbuf_avail != 0 || m->next != NULL) {
1028 		/* done with current buf, get the next one */
1029 		if (buf_avail == 0) {
1030 			vec_idx++;
1031 			if (unlikely(vec_idx >= nr_vec)) {
1032 				error = -1;
1033 				goto out;
1034 			}
1035 
1036 			buf_addr = buf_vec[vec_idx].buf_addr;
1037 			buf_iova = buf_vec[vec_idx].buf_iova;
1038 			buf_len = buf_vec[vec_idx].buf_len;
1039 
1040 			buf_offset = 0;
1041 			buf_avail  = buf_len;
1042 		}
1043 
1044 		/* done with current mbuf, get the next one */
1045 		if (mbuf_avail == 0) {
1046 			m = m->next;
1047 
1048 			mbuf_offset = 0;
1049 			mbuf_avail  = rte_pktmbuf_data_len(m);
1050 		}
1051 
1052 		if (hdr_addr) {
1053 			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
1054 			if (rxvq_is_mergeable(dev))
1055 				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
1056 						num_buffers);
1057 
1058 			if (unlikely(hdr == &tmp_hdr)) {
1059 				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
1060 			} else {
1061 				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
1062 						dev->vhost_hlen, 0);
1063 				vhost_log_cache_write_iova(dev, vq,
1064 						buf_vec[0].buf_iova,
1065 						dev->vhost_hlen);
1066 			}
1067 
1068 			hdr_addr = 0;
1069 		}
1070 
1071 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
1072 
1073 		while (unlikely(cpy_len && cpy_len >= cpy_threshold)) {
1074 			hpa = (void *)(uintptr_t)gpa_to_first_hpa(dev,
1075 					buf_iova + buf_offset,
1076 					cpy_len, &mapped_len);
1077 
1078 			if (unlikely(!hpa || mapped_len < cpy_threshold))
1079 				break;
1080 
1081 			async_fill_vec(src_iovec + tvec_idx,
1082 				(void *)(uintptr_t)rte_pktmbuf_iova_offset(m,
1083 				mbuf_offset), (size_t)mapped_len);
1084 
1085 			async_fill_vec(dst_iovec + tvec_idx,
1086 					hpa, (size_t)mapped_len);
1087 
1088 			tlen += (uint32_t)mapped_len;
1089 			cpy_len -= (uint32_t)mapped_len;
1090 			mbuf_avail  -= (uint32_t)mapped_len;
1091 			mbuf_offset += (uint32_t)mapped_len;
1092 			buf_avail  -= (uint32_t)mapped_len;
1093 			buf_offset += (uint32_t)mapped_len;
1094 			tvec_idx++;
1095 		}
1096 
1097 		if (likely(cpy_len)) {
1098 			if (unlikely(vq->batch_copy_nb_elems >= vq->size)) {
1099 				rte_memcpy(
1100 				(void *)((uintptr_t)(buf_addr + buf_offset)),
1101 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
1102 				cpy_len);
1103 
1104 				PRINT_PACKET(dev,
1105 					(uintptr_t)(buf_addr + buf_offset),
1106 					cpy_len, 0);
1107 			} else {
1108 				batch_copy[vq->batch_copy_nb_elems].dst =
1109 				(void *)((uintptr_t)(buf_addr + buf_offset));
1110 				batch_copy[vq->batch_copy_nb_elems].src =
1111 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
1112 				batch_copy[vq->batch_copy_nb_elems].log_addr =
1113 					buf_iova + buf_offset;
1114 				batch_copy[vq->batch_copy_nb_elems].len =
1115 					cpy_len;
1116 				vq->batch_copy_nb_elems++;
1117 			}
1118 
1119 			mbuf_avail  -= cpy_len;
1120 			mbuf_offset += cpy_len;
1121 			buf_avail  -= cpy_len;
1122 			buf_offset += cpy_len;
1123 		}
1124 
1125 	}
1126 
1127 out:
1128 	if (tlen) {
1129 		async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
1130 		async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
1131 	} else {
1132 		src_it->count = 0;
1133 	}
1134 
1135 	return error;
1136 }
1137 
1138 static __rte_always_inline int
1139 vhost_enqueue_single_packed(struct virtio_net *dev,
1140 			    struct vhost_virtqueue *vq,
1141 			    struct rte_mbuf *pkt,
1142 			    struct buf_vector *buf_vec,
1143 			    uint16_t *nr_descs)
1144 {
1145 	uint16_t nr_vec = 0;
1146 	uint16_t avail_idx = vq->last_avail_idx;
1147 	uint16_t max_tries, tries = 0;
1148 	uint16_t buf_id = 0;
1149 	uint32_t len = 0;
1150 	uint16_t desc_count;
1151 	uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1152 	uint16_t num_buffers = 0;
1153 	uint32_t buffer_len[vq->size];
1154 	uint16_t buffer_buf_id[vq->size];
1155 	uint16_t buffer_desc_count[vq->size];
1156 
1157 	if (rxvq_is_mergeable(dev))
1158 		max_tries = vq->size - 1;
1159 	else
1160 		max_tries = 1;
1161 
1162 	while (size > 0) {
1163 		/*
1164 		 * if we tried all available ring items, and still
1165 		 * can't get enough buf, it means something abnormal
1166 		 * happened.
1167 		 */
1168 		if (unlikely(++tries > max_tries))
1169 			return -1;
1170 
1171 		if (unlikely(fill_vec_buf_packed(dev, vq,
1172 						avail_idx, &desc_count,
1173 						buf_vec, &nr_vec,
1174 						&buf_id, &len,
1175 						VHOST_ACCESS_RW) < 0))
1176 			return -1;
1177 
1178 		len = RTE_MIN(len, size);
1179 		size -= len;
1180 
1181 		buffer_len[num_buffers] = len;
1182 		buffer_buf_id[num_buffers] = buf_id;
1183 		buffer_desc_count[num_buffers] = desc_count;
1184 		num_buffers += 1;
1185 
1186 		*nr_descs += desc_count;
1187 		avail_idx += desc_count;
1188 		if (avail_idx >= vq->size)
1189 			avail_idx -= vq->size;
1190 	}
1191 
1192 	if (copy_mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, num_buffers) < 0)
1193 		return -1;
1194 
1195 	vhost_shadow_enqueue_single_packed(dev, vq, buffer_len, buffer_buf_id,
1196 					   buffer_desc_count, num_buffers);
1197 
1198 	return 0;
1199 }
1200 
1201 static __rte_noinline uint32_t
1202 virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1203 	struct rte_mbuf **pkts, uint32_t count)
1204 {
1205 	uint32_t pkt_idx = 0;
1206 	uint16_t num_buffers;
1207 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1208 	uint16_t avail_head;
1209 
1210 	/*
1211 	 * The ordering between avail index and
1212 	 * desc reads needs to be enforced.
1213 	 */
1214 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
1215 
1216 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1217 
1218 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1219 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1220 		uint16_t nr_vec = 0;
1221 
1222 		if (unlikely(reserve_avail_buf_split(dev, vq,
1223 						pkt_len, buf_vec, &num_buffers,
1224 						avail_head, &nr_vec) < 0)) {
1225 			VHOST_LOG_DATA(DEBUG,
1226 				"(%d) failed to get enough desc from vring\n",
1227 				dev->vid);
1228 			vq->shadow_used_idx -= num_buffers;
1229 			break;
1230 		}
1231 
1232 		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1233 			dev->vid, vq->last_avail_idx,
1234 			vq->last_avail_idx + num_buffers);
1235 
1236 		if (copy_mbuf_to_desc(dev, vq, pkts[pkt_idx],
1237 						buf_vec, nr_vec,
1238 						num_buffers) < 0) {
1239 			vq->shadow_used_idx -= num_buffers;
1240 			break;
1241 		}
1242 
1243 		vq->last_avail_idx += num_buffers;
1244 	}
1245 
1246 	do_data_copy_enqueue(dev, vq);
1247 
1248 	if (likely(vq->shadow_used_idx)) {
1249 		flush_shadow_used_ring_split(dev, vq);
1250 		vhost_vring_call_split(dev, vq);
1251 	}
1252 
1253 	return pkt_idx;
1254 }
1255 
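/*
 * Fast path: place PACKED_BATCH_SIZE single-segment packets into the
 * next PACKED_BATCH_SIZE descriptors in one go. Requires a batch-aligned
 * avail index, no ring wrap inside the batch and packets that each fit
 * in a single descriptor; returns -1 so the caller falls back to the
 * single-packet path otherwise.
 */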
1256 static __rte_always_inline int
1257 virtio_dev_rx_batch_packed(struct virtio_net *dev,
1258 			   struct vhost_virtqueue *vq,
1259 			   struct rte_mbuf **pkts)
1260 {
1261 	bool wrap_counter = vq->avail_wrap_counter;
1262 	struct vring_packed_desc *descs = vq->desc_packed;
1263 	uint16_t avail_idx = vq->last_avail_idx;
1264 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
1265 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
1266 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1267 	uint64_t lens[PACKED_BATCH_SIZE];
1268 	uint16_t ids[PACKED_BATCH_SIZE];
1269 	uint16_t i;
1270 
1271 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1272 		return -1;
1273 
1274 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1275 		return -1;
1276 
1277 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1278 		if (unlikely(pkts[i]->next != NULL))
1279 			return -1;
1280 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1281 					    wrap_counter)))
1282 			return -1;
1283 	}
1284 
1285 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1286 		lens[i] = descs[avail_idx + i].len;
1287 
1288 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1289 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1290 			return -1;
1291 	}
1292 
1293 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1294 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
1295 						  descs[avail_idx + i].addr,
1296 						  &lens[i],
1297 						  VHOST_ACCESS_RW);
1298 
1299 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1300 		if (unlikely(!desc_addrs[i]))
1301 			return -1;
1302 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1303 			return -1;
1304 	}
1305 
1306 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1307 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
1308 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
1309 					(uintptr_t)desc_addrs[i];
1310 		lens[i] = pkts[i]->pkt_len +
1311 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
1312 	}
1313 
1314 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1315 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
1316 
1317 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
1318 
1319 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1320 		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
1321 			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
1322 			   pkts[i]->pkt_len);
1323 	}
1324 
1325 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1326 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr,
1327 					   lens[i]);
1328 
1329 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1330 		ids[i] = descs[avail_idx + i].id;
1331 
1332 	vhost_flush_enqueue_batch_packed(dev, vq, lens, ids);
1333 
1334 	return 0;
1335 }
1336 
1337 static __rte_always_inline int16_t
1338 virtio_dev_rx_single_packed(struct virtio_net *dev,
1339 			    struct vhost_virtqueue *vq,
1340 			    struct rte_mbuf *pkt)
1341 {
1342 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1343 	uint16_t nr_descs = 0;
1344 
1345 	if (unlikely(vhost_enqueue_single_packed(dev, vq, pkt, buf_vec,
1346 						 &nr_descs) < 0)) {
1347 		VHOST_LOG_DATA(DEBUG,
1348 				"(%d) failed to get enough desc from vring\n",
1349 				dev->vid);
1350 		return -1;
1351 	}
1352 
1353 	VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1354 			dev->vid, vq->last_avail_idx,
1355 			vq->last_avail_idx + nr_descs);
1356 
1357 	vq_inc_last_avail_packed(vq, nr_descs);
1358 
1359 	return 0;
1360 }
1361 
1362 static __rte_noinline uint32_t
1363 virtio_dev_rx_packed(struct virtio_net *dev,
1364 		     struct vhost_virtqueue *__rte_restrict vq,
1365 		     struct rte_mbuf **__rte_restrict pkts,
1366 		     uint32_t count)
1367 {
1368 	uint32_t pkt_idx = 0;
1369 
1370 	do {
1371 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1372 
1373 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
1374 			if (!virtio_dev_rx_batch_packed(dev, vq,
1375 							&pkts[pkt_idx])) {
1376 				pkt_idx += PACKED_BATCH_SIZE;
1377 				continue;
1378 			}
1379 		}
1380 
1381 		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]))
1382 			break;
1383 		pkt_idx++;
1384 
1385 	} while (pkt_idx < count);
1386 
1387 	if (vq->shadow_used_idx) {
1388 		do_data_copy_enqueue(dev, vq);
1389 		vhost_flush_enqueue_shadow_packed(dev, vq);
1390 	}
1391 
1392 	if (pkt_idx)
1393 		vhost_vring_call_packed(dev, vq);
1394 
1395 	return pkt_idx;
1396 }
1397 
1398 static __rte_always_inline uint32_t
1399 virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
1400 	struct rte_mbuf **pkts, uint32_t count)
1401 {
1402 	struct vhost_virtqueue *vq;
1403 	uint32_t nb_tx = 0;
1404 
1405 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
1406 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1407 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
1408 			dev->vid, __func__, queue_id);
1409 		return 0;
1410 	}
1411 
1412 	vq = dev->virtqueue[queue_id];
1413 
1414 	rte_spinlock_lock(&vq->access_lock);
1415 
1416 	if (unlikely(!vq->enabled))
1417 		goto out_access_unlock;
1418 
1419 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1420 		vhost_user_iotlb_rd_lock(vq);
1421 
1422 	if (unlikely(!vq->access_ok))
1423 		if (unlikely(vring_translate(dev, vq) < 0))
1424 			goto out;
1425 
1426 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1427 	if (count == 0)
1428 		goto out;
1429 
1430 	if (vq_is_packed(dev))
1431 		nb_tx = virtio_dev_rx_packed(dev, vq, pkts, count);
1432 	else
1433 		nb_tx = virtio_dev_rx_split(dev, vq, pkts, count);
1434 
1435 out:
1436 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1437 		vhost_user_iotlb_rd_unlock(vq);
1438 
1439 out_access_unlock:
1440 	rte_spinlock_unlock(&vq->access_lock);
1441 
1442 	return nb_tx;
1443 }
1444 
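/*
 * Illustrative usage sketch (not part of this file); "vid", "mbufs",
 * "nb_rx" and "i" are application-provided. Packet data is copied into
 * guest memory, so the caller keeps ownership of the mbufs and frees
 * them regardless of how many were accepted:
 *
 *	uint16_t sent = rte_vhost_enqueue_burst(vid, 0, mbufs, nb_rx);
 *	// queue 0 is the guest RX (even) virtqueue; sent may be less
 *	// than nb_rx when the guest ring runs out of descriptors
 *	for (i = 0; i < nb_rx; i++)
 *		rte_pktmbuf_free(mbufs[i]);
 */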
1445 uint16_t
1446 rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
1447 	struct rte_mbuf **__rte_restrict pkts, uint16_t count)
1448 {
1449 	struct virtio_net *dev = get_device(vid);
1450 
1451 	if (!dev)
1452 		return 0;
1453 
1454 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
1455 		VHOST_LOG_DATA(ERR,
1456 			"(%d) %s: built-in vhost net backend is disabled.\n",
1457 			dev->vid, __func__);
1458 		return 0;
1459 	}
1460 
1461 	return virtio_dev_rx(dev, queue_id, pkts, count);
1462 }
1463 
1464 static __rte_always_inline uint16_t
1465 virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
1466 	uint16_t vq_size, uint16_t n_inflight)
1467 {
1468 	return pkts_idx > n_inflight ? (pkts_idx - n_inflight) :
1469 		(vq_size - n_inflight + pkts_idx) & (vq_size - 1);
1470 }
1471 
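/*
 * Copy "count" used-ring elements from the shadow ring into the async
 * backup ring, splitting into two memcpys when the destination wraps
 * (e.g. ring_size = 8, d_idx = 6, count = 4 writes slots 6,7 then 0,1).
 */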
1472 static __rte_always_inline void
1473 store_dma_desc_info_split(struct vring_used_elem *s_ring, struct vring_used_elem *d_ring,
1474 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1475 {
1476 	uint16_t elem_size = sizeof(struct vring_used_elem);
1477 
1478 	if (d_idx + count <= ring_size) {
1479 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1480 	} else {
1481 		uint16_t size = ring_size - d_idx;
1482 
1483 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1484 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1485 	}
1486 }
1487 
1488 static __rte_always_inline void
1489 store_dma_desc_info_packed(struct vring_used_elem_packed *s_ring,
1490 		struct vring_used_elem_packed *d_ring,
1491 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1492 {
1493 	uint16_t elem_size = sizeof(struct vring_used_elem_packed);
1494 
1495 	if (d_idx + count <= ring_size) {
1496 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1497 	} else {
1498 		uint16_t size = ring_size - d_idx;
1499 
1500 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1501 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1502 	}
1503 }
1504 
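/*
 * Async enqueue for split rings: for each packet, reserve guest buffers
 * and let async_mbuf_to_desc() build source/destination iovecs for the
 * segments at or above vq->async_threshold, while smaller pieces stay on
 * the CPU batch-copy path. The buffered descriptors are handed to the
 * async transfer_data callback once VHOST_ASYNC_BATCH_THRESHOLD packets
 * accumulate or iovec space runs low. Packets copied entirely by the CPU
 * are reported through comp_pkts; partially submitted bursts are rolled
 * back in the pkt_err path below.
 */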
1505 static __rte_noinline uint32_t
1506 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
1507 	struct vhost_virtqueue *vq, uint16_t queue_id,
1508 	struct rte_mbuf **pkts, uint32_t count,
1509 	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
1510 {
1511 	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
1512 	uint16_t num_buffers;
1513 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1514 	uint16_t avail_head;
1515 
1516 	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
1517 	struct iovec *vec_pool = vq->vec_pool;
1518 	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
1519 	struct iovec *src_iovec = vec_pool;
1520 	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
1521 	uint16_t slot_idx = 0;
1522 	uint16_t segs_await = 0;
1523 	uint16_t iovec_idx = 0, it_idx = 0;
1524 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
1525 	uint32_t n_pkts = 0, pkt_err = 0;
1526 	uint32_t num_async_pkts = 0, num_done_pkts = 0;
1527 	struct {
1528 		uint16_t pkt_idx;
1529 		uint16_t last_avail_idx;
1530 	} async_pkts_log[MAX_PKT_BURST];
1531 
1532 	/*
1533 	 * The ordering between avail index and desc reads needs to be enforced.
1534 	 */
1535 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
1536 
1537 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1538 
1539 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1540 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1541 		uint16_t nr_vec = 0;
1542 
1543 		if (unlikely(reserve_avail_buf_split(dev, vq,
1544 						pkt_len, buf_vec, &num_buffers,
1545 						avail_head, &nr_vec) < 0)) {
1546 			VHOST_LOG_DATA(DEBUG,
1547 				"(%d) failed to get enough desc from vring\n",
1548 				dev->vid);
1549 			vq->shadow_used_idx -= num_buffers;
1550 			break;
1551 		}
1552 
1553 		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1554 			dev->vid, vq->last_avail_idx,
1555 			vq->last_avail_idx + num_buffers);
1556 
1557 		if (async_mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec, num_buffers,
1558 				&src_iovec[iovec_idx], &dst_iovec[iovec_idx],
1559 				&it_pool[it_idx], &it_pool[it_idx + 1]) < 0) {
1560 			vq->shadow_used_idx -= num_buffers;
1561 			break;
1562 		}
1563 
1564 		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
1565 			(vq->size - 1);
1566 		if (it_pool[it_idx].count) {
1567 			uint16_t from, to;
1568 
1569 			async_fill_desc(&tdes[pkt_burst_idx++],
1570 				&it_pool[it_idx], &it_pool[it_idx + 1]);
1571 			pkts_info[slot_idx].descs = num_buffers;
1572 			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1573 			async_pkts_log[num_async_pkts].pkt_idx = pkt_idx;
1574 			async_pkts_log[num_async_pkts++].last_avail_idx =
1575 				vq->last_avail_idx;
1576 
1577 			iovec_idx += it_pool[it_idx].nr_segs;
1578 			it_idx += 2;
1579 
1580 			segs_await += it_pool[it_idx].nr_segs;
1581 
1582 			/**
1583 			 * recover shadow used ring and keep DMA-occupied
1584 			 * descriptors.
1585 			 */
1586 			from = vq->shadow_used_idx - num_buffers;
1587 			to = vq->async_desc_idx_split & (vq->size - 1);
1588 
1589 			store_dma_desc_info_split(vq->shadow_used_split,
1590 					vq->async_descs_split, vq->size, from, to, num_buffers);
1591 
1592 			vq->async_desc_idx_split += num_buffers;
1593 			vq->shadow_used_idx -= num_buffers;
1594 		} else
1595 			comp_pkts[num_done_pkts++] = pkts[pkt_idx];
1596 
1597 		vq->last_avail_idx += num_buffers;
1598 
1599 		/*
1600 		 * Conditions that trigger an async device transfer:
1601 		 * - the number of buffered packets reaches the transfer threshold
1602 		 * - the remaining async iovec space drops below BUF_VECTOR_MAX
1603 		 */
1604 		if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
1605 			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
1606 			BUF_VECTOR_MAX))) {
1607 			n_pkts = vq->async_ops.transfer_data(dev->vid,
1608 					queue_id, tdes, 0, pkt_burst_idx);
1609 			iovec_idx = 0;
1610 			it_idx = 0;
1611 
1612 			segs_await = 0;
1613 			vq->async_pkts_inflight_n += n_pkts;
1614 
1615 			if (unlikely(n_pkts < pkt_burst_idx)) {
1616 				/*
1617 				 * Record the number of failed packets here; the
1618 				 * actual error handling happens when the
1619 				 * application polls for completions.
1620 				 */
1621 				pkt_err = pkt_burst_idx - n_pkts;
1622 				pkt_burst_idx = 0;
1623 				break;
1624 			}
1625 
1626 			pkt_burst_idx = 0;
1627 		}
1628 	}
1629 
1630 	if (pkt_burst_idx) {
1631 		n_pkts = vq->async_ops.transfer_data(dev->vid,
1632 				queue_id, tdes, 0, pkt_burst_idx);
1633 		vq->async_pkts_inflight_n += n_pkts;
1634 
1635 		if (unlikely(n_pkts < pkt_burst_idx))
1636 			pkt_err = pkt_burst_idx - n_pkts;
1637 	}
1638 
1639 	do_data_copy_enqueue(dev, vq);
1640 
1641 	if (unlikely(pkt_err)) {
1642 		uint16_t num_descs = 0;
1643 
1644 		num_async_pkts -= pkt_err;
1645 		/* calculate the sum of descriptors of DMA-error packets. */
1646 		while (pkt_err-- > 0) {
1647 			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
1648 			slot_idx--;
1649 		}
1650 		vq->async_desc_idx_split -= num_descs;
1651 		/* recover shadow used ring and available ring */
1652 		vq->shadow_used_idx -= (vq->last_avail_idx -
1653 				async_pkts_log[num_async_pkts].last_avail_idx -
1654 				num_descs);
1655 		vq->last_avail_idx =
1656 			async_pkts_log[num_async_pkts].last_avail_idx;
1657 		pkt_idx = async_pkts_log[num_async_pkts].pkt_idx;
1658 		num_done_pkts = pkt_idx - num_async_pkts;
1659 	}
1660 
1661 	vq->async_pkts_idx += num_async_pkts;
1662 	*comp_count = num_done_pkts;
1663 
1664 	if (likely(vq->shadow_used_idx)) {
1665 		flush_shadow_used_ring_split(dev, vq);
1666 		vhost_vring_call_split(dev, vq);
1667 	}
1668 
1669 	return pkt_idx;
1670 }
1671 
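/*
 * Write a batch of shadow used elements back to the packed descriptor
 * ring: ids and lengths first, then, after a release fence, the flags,
 * keeping the head descriptor's flags for last so the guest never
 * observes a partially updated batch. No dirty-page logging is done here.
 */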
1672 static __rte_always_inline void
1673 vhost_update_used_packed(struct vhost_virtqueue *vq,
1674 			struct vring_used_elem_packed *shadow_ring,
1675 			uint16_t count)
1676 {
1677 	int i;
1678 	uint16_t used_idx = vq->last_used_idx;
1679 	uint16_t head_idx = vq->last_used_idx;
1680 	uint16_t head_flags = 0;
1681 
1682 	if (count == 0)
1683 		return;
1684 
1685 	/* Split loop in two to save memory barriers */
1686 	for (i = 0; i < count; i++) {
1687 		vq->desc_packed[used_idx].id = shadow_ring[i].id;
1688 		vq->desc_packed[used_idx].len = shadow_ring[i].len;
1689 
1690 		used_idx += shadow_ring[i].count;
1691 		if (used_idx >= vq->size)
1692 			used_idx -= vq->size;
1693 	}
1694 
1695 	/* The ordering for storing desc flags needs to be enforced. */
1696 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
1697 
1698 	for (i = 0; i < count; i++) {
1699 		uint16_t flags;
1700 
1701 		if (vq->shadow_used_packed[i].len)
1702 			flags = VRING_DESC_F_WRITE;
1703 		else
1704 			flags = 0;
1705 
1706 		if (vq->used_wrap_counter) {
1707 			flags |= VRING_DESC_F_USED;
1708 			flags |= VRING_DESC_F_AVAIL;
1709 		} else {
1710 			flags &= ~VRING_DESC_F_USED;
1711 			flags &= ~VRING_DESC_F_AVAIL;
1712 		}
1713 
1714 		if (i > 0) {
1715 			vq->desc_packed[vq->last_used_idx].flags = flags;
1716 		} else {
1717 			head_idx = vq->last_used_idx;
1718 			head_flags = flags;
1719 		}
1720 
1721 		vq_inc_last_used_packed(vq, shadow_ring[i].count);
1722 	}
1723 
1724 	vq->desc_packed[head_idx].flags = head_flags;
1725 }
1726 
1727 static __rte_always_inline int
1728 virtio_dev_rx_async_batch_packed(struct virtio_net *dev,
1729 			   struct vhost_virtqueue *vq,
1730 			   struct rte_mbuf **pkts,
1731 			   struct rte_mbuf **comp_pkts, uint32_t *pkt_done)
1732 {
1733 	uint16_t i;
1734 	uint32_t cpy_threshold = vq->async_threshold;
1735 
1736 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1737 		if (unlikely(pkts[i]->pkt_len >= cpy_threshold))
1738 			return -1;
1739 	}
1740 	if (!virtio_dev_rx_batch_packed(dev, vq, pkts)) {
1741 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1742 			comp_pkts[(*pkt_done)++] = pkts[i];
1743 
1744 		return 0;
1745 	}
1746 
1747 	return -1;
1748 }
1749 
1750 static __rte_always_inline int
1751 vhost_enqueue_async_single_packed(struct virtio_net *dev,
1752 			    struct vhost_virtqueue *vq,
1753 			    struct rte_mbuf *pkt,
1754 			    struct buf_vector *buf_vec,
1755 			    uint16_t *nr_descs,
1756 			    uint16_t *nr_buffers,
1757 			    struct vring_packed_desc *async_descs,
1758 			    struct iovec *src_iovec, struct iovec *dst_iovec,
1759 			    struct rte_vhost_iov_iter *src_it,
1760 			    struct rte_vhost_iov_iter *dst_it)
1761 {
1762 	uint16_t nr_vec = 0;
1763 	uint16_t avail_idx = vq->last_avail_idx;
1764 	uint16_t max_tries, tries = 0;
1765 	uint16_t buf_id = 0;
1766 	uint32_t len = 0;
1767 	uint16_t desc_count = 0;
1768 	uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1769 	uint32_t buffer_len[vq->size];
1770 	uint16_t buffer_buf_id[vq->size];
1771 	uint16_t buffer_desc_count[vq->size];
1772 
1773 	if (rxvq_is_mergeable(dev))
1774 		max_tries = vq->size - 1;
1775 	else
1776 		max_tries = 1;
1777 
1778 	while (size > 0) {
1779 		/*
1780 		 * if we tried all available ring items, and still
1781 		 * can't get enough buf, it means something abnormal
1782 		 * happened.
1783 		 */
1784 		if (unlikely(++tries > max_tries))
1785 			return -1;
1786 
1787 		if (unlikely(fill_vec_buf_packed(dev, vq, avail_idx, &desc_count, buf_vec, &nr_vec,
1788 						&buf_id, &len, VHOST_ACCESS_RW) < 0))
1789 			return -1;
1790 
1791 		len = RTE_MIN(len, size);
1792 		size -= len;
1793 
1794 		buffer_len[*nr_buffers] = len;
1795 		buffer_buf_id[*nr_buffers] = buf_id;
1796 		buffer_desc_count[*nr_buffers] = desc_count;
1797 		*nr_buffers += 1;
1798 
1799 		*nr_descs += desc_count;
1800 		avail_idx += desc_count;
1801 		if (avail_idx >= vq->size)
1802 			avail_idx -= vq->size;
1803 	}
1804 
1805 	if (async_mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers, src_iovec, dst_iovec,
1806 			src_it, dst_it) < 0)
1807 		return -1;
1808 	/* store descriptors for DMA */
1809 	if (avail_idx >= *nr_descs) {
1810 		rte_memcpy(async_descs, &vq->desc_packed[vq->last_avail_idx],
1811 			*nr_descs * sizeof(struct vring_packed_desc));
1812 	} else {
1813 		uint16_t nr_copy = vq->size - vq->last_avail_idx;
1814 
1815 		rte_memcpy(async_descs, &vq->desc_packed[vq->last_avail_idx],
1816 			nr_copy * sizeof(struct vring_packed_desc));
1817 		rte_memcpy(async_descs + nr_copy, vq->desc_packed,
1818 			(*nr_descs - nr_copy) * sizeof(struct vring_packed_desc));
1819 	}
1820 
1821 	vhost_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id, buffer_desc_count, *nr_buffers);
1822 
1823 	return 0;
1824 }
1825 
1826 static __rte_always_inline int16_t
1827 virtio_dev_rx_async_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
1828 			    struct rte_mbuf *pkt, uint16_t *nr_descs, uint16_t *nr_buffers,
1829 			    struct vring_packed_desc *async_descs,
1830 			    struct iovec *src_iovec, struct iovec *dst_iovec,
1831 			    struct rte_vhost_iov_iter *src_it, struct rte_vhost_iov_iter *dst_it)
1832 {
1833 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1834 
1835 	if (unlikely(vhost_enqueue_async_single_packed(dev, vq, pkt, buf_vec, nr_descs, nr_buffers,
1836 						 async_descs, src_iovec, dst_iovec,
1837 						 src_it, dst_it) < 0)) {
1838 		VHOST_LOG_DATA(DEBUG, "(%d) failed to get enough desc from vring\n", dev->vid);
1839 		return -1;
1840 	}
1841 
1842 	VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1843 			dev->vid, vq->last_avail_idx, vq->last_avail_idx + *nr_descs);
1844 
1845 	return 0;
1846 }
1847 
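/*
 * Roll back the packets whose async copy submission failed: subtract
 * their buffer and descriptor counts, rewind async_buffer_idx_packed and
 * last_avail_idx, and restore the saved descriptors from async_descs,
 * flipping avail_wrap_counter when the rewind crosses the ring boundary.
 */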
1848 static __rte_always_inline void
1849 dma_error_handler_packed(struct vhost_virtqueue *vq, struct vring_packed_desc *async_descs,
1850 			uint16_t async_descs_idx, uint16_t slot_idx, uint32_t nr_err,
1851 			uint32_t *pkt_idx, uint32_t *num_async_pkts, uint32_t *num_done_pkts)
1852 {
1853 	uint16_t descs_err = 0;
1854 	uint16_t buffers_err = 0;
1855 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
1856 
1857 	*num_async_pkts -= nr_err;
1858 	*pkt_idx -= nr_err;
1859 	/* calculate the sum of buffers and descs of DMA-error packets. */
1860 	while (nr_err-- > 0) {
1861 		descs_err += pkts_info[slot_idx % vq->size].descs;
1862 		buffers_err += pkts_info[slot_idx % vq->size].nr_buffers;
1863 		slot_idx--;
1864 	}
1865 
1866 	vq->async_buffer_idx_packed -= buffers_err;
1867 
1868 	if (vq->last_avail_idx >= descs_err) {
1869 		vq->last_avail_idx -= descs_err;
1870 
1871 		rte_memcpy(&vq->desc_packed[vq->last_avail_idx],
1872 			&async_descs[async_descs_idx - descs_err],
1873 			descs_err * sizeof(struct vring_packed_desc));
1874 	} else {
1875 		uint16_t nr_copy;
1876 
1877 		vq->last_avail_idx = vq->last_avail_idx + vq->size - descs_err;
1878 		nr_copy = vq->size - vq->last_avail_idx;
1879 		rte_memcpy(&vq->desc_packed[vq->last_avail_idx],
1880 			&async_descs[async_descs_idx - descs_err],
1881 			nr_copy * sizeof(struct vring_packed_desc));
1882 		descs_err -= nr_copy;
1883 		rte_memcpy(&vq->desc_packed[0], &async_descs[async_descs_idx - descs_err],
1884 			descs_err * sizeof(struct vring_packed_desc));
1885 		vq->avail_wrap_counter ^= 1;
1886 	}
1887 
1888 	*num_done_pkts = *pkt_idx - *num_async_pkts;
1889 }
1890 
1891 static __rte_noinline uint32_t
1892 virtio_dev_rx_async_submit_packed(struct virtio_net *dev,
1893 	struct vhost_virtqueue *vq, uint16_t queue_id,
1894 	struct rte_mbuf **pkts, uint32_t count,
1895 	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
1896 {
1897 	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
1898 	uint32_t remained = count;
1899 	uint16_t async_descs_idx = 0;
1900 	uint16_t num_buffers;
1901 	uint16_t num_descs;
1902 
1903 	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
1904 	struct iovec *vec_pool = vq->vec_pool;
1905 	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
1906 	struct iovec *src_iovec = vec_pool;
1907 	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
1908 	uint16_t slot_idx = 0;
1909 	uint16_t segs_await = 0;
1910 	uint16_t iovec_idx = 0, it_idx = 0;
1911 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
1912 	uint32_t n_pkts = 0, pkt_err = 0;
1913 	uint32_t num_async_pkts = 0, num_done_pkts = 0;
1914 	struct vring_packed_desc async_descs[vq->size];
1915 
1916 	do {
1917 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1918 		if (remained >= PACKED_BATCH_SIZE) {
1919 			if (!virtio_dev_rx_async_batch_packed(dev, vq,
1920 				&pkts[pkt_idx], comp_pkts, &num_done_pkts)) {
1921 				pkt_idx += PACKED_BATCH_SIZE;
1922 				remained -= PACKED_BATCH_SIZE;
1923 				continue;
1924 			}
1925 		}
1926 
1927 		num_buffers = 0;
1928 		num_descs = 0;
1929 		if (unlikely(virtio_dev_rx_async_single_packed(dev, vq, pkts[pkt_idx],
1930 						&num_descs, &num_buffers,
1931 						&async_descs[async_descs_idx],
1932 						&src_iovec[iovec_idx], &dst_iovec[iovec_idx],
1933 						&it_pool[it_idx], &it_pool[it_idx + 1]) < 0))
1934 			break;
1935 
1936 		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1937 			dev->vid, vq->last_avail_idx,
1938 			vq->last_avail_idx + num_descs);
1939 
1940 		slot_idx = (vq->async_pkts_idx + num_async_pkts) % vq->size;
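		/*
		 * A non-empty src iov iterator means (part of) this packet is
		 * handed to the async copy engine; an empty one means it was
		 * fully copied by the CPU and completes immediately.
		 */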
1941 		if (it_pool[it_idx].count) {
1942 			uint16_t from, to;
1943 
1944 			async_descs_idx += num_descs;
1945 			async_fill_desc(&tdes[pkt_burst_idx++],
1946 				&it_pool[it_idx], &it_pool[it_idx + 1]);
1947 			pkts_info[slot_idx].descs = num_descs;
1948 			pkts_info[slot_idx].nr_buffers = num_buffers;
1949 			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1950 			num_async_pkts++;
1951 			iovec_idx += it_pool[it_idx].nr_segs;
1952 			segs_await += it_pool[it_idx].nr_segs;
1953 			it_idx += 2;
1954 
1955 
1956 			/**
1957 			 * recover shadow used ring and keep DMA-occupied
1958 			 * descriptors.
1959 			 */
1960 			from = vq->shadow_used_idx - num_buffers;
1961 			to = vq->async_buffer_idx_packed % vq->size;
1962 			store_dma_desc_info_packed(vq->shadow_used_packed,
1963 					vq->async_buffers_packed, vq->size, from, to, num_buffers);
1964 
1965 			vq->async_buffer_idx_packed += num_buffers;
1966 			vq->shadow_used_idx -= num_buffers;
1967 		} else {
1968 			comp_pkts[num_done_pkts++] = pkts[pkt_idx];
1969 		}
1970 
1971 		pkt_idx++;
1972 		remained--;
1973 		vq_inc_last_avail_packed(vq, num_descs);
1974 
1975 		/*
1976 		 * conditions to trigger async device transfer:
1977 		 * - the number of buffered packets reaches the transfer threshold
1978 		 * - fewer than BUF_VECTOR_MAX unused async iovec slots remain
1979 		 */
1980 		if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
1981 			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await < BUF_VECTOR_MAX))) {
1982 			n_pkts = vq->async_ops.transfer_data(dev->vid, queue_id,
1983 				tdes, 0, pkt_burst_idx);
1984 			iovec_idx = 0;
1985 			it_idx = 0;
1986 			segs_await = 0;
1987 			vq->async_pkts_inflight_n += n_pkts;
1988 
1989 			if (unlikely(n_pkts < pkt_burst_idx)) {
1990 				/*
1991 				 * record the number of failed packets here; the
1992 				 * actual error handling is done when the
1993 				 * application polls for completions
1994 				 */
1995 				pkt_err = pkt_burst_idx - n_pkts;
1996 				pkt_burst_idx = 0;
1997 				break;
1998 			}
1999 
2000 			pkt_burst_idx = 0;
2001 		}
2002 	} while (pkt_idx < count);
2003 
2004 	if (pkt_burst_idx) {
2005 		n_pkts = vq->async_ops.transfer_data(dev->vid, queue_id, tdes, 0, pkt_burst_idx);
2006 		vq->async_pkts_inflight_n += n_pkts;
2007 
2008 		if (unlikely(n_pkts < pkt_burst_idx))
2009 			pkt_err = pkt_burst_idx - n_pkts;
2010 	}
2011 
2012 	do_data_copy_enqueue(dev, vq);
2013 
2014 	if (unlikely(pkt_err))
2015 		dma_error_handler_packed(vq, async_descs, async_descs_idx, slot_idx, pkt_err,
2016 					&pkt_idx, &num_async_pkts, &num_done_pkts);
2017 	vq->async_pkts_idx += num_async_pkts;
2018 	*comp_count = num_done_pkts;
2019 
2020 	if (likely(vq->shadow_used_idx)) {
2021 		vhost_flush_enqueue_shadow_packed(dev, vq);
2022 		vhost_vring_call_packed(dev, vq);
2023 	}
2024 
2025 	return pkt_idx;
2026 }
2027 
2028 static __rte_always_inline void
2029 write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
2030 {
2031 	uint16_t nr_left = n_descs;
2032 	uint16_t nr_copy;
2033 	uint16_t to, from;
2034 
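	/*
	 * Copy the completed descriptors from the async shadow ring to the
	 * used ring, handling wrap-around of both rings.
	 */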
2035 	do {
2036 		from = vq->last_async_desc_idx_split & (vq->size - 1);
2037 		nr_copy = nr_left + from <= vq->size ? nr_left : vq->size - from;
2038 		to = vq->last_used_idx & (vq->size - 1);
2039 
2040 		if (to + nr_copy <= vq->size) {
2041 			rte_memcpy(&vq->used->ring[to], &vq->async_descs_split[from],
2042 					nr_copy * sizeof(struct vring_used_elem));
2043 		} else {
2044 			uint16_t size = vq->size - to;
2045 
2046 			rte_memcpy(&vq->used->ring[to], &vq->async_descs_split[from],
2047 					size * sizeof(struct vring_used_elem));
2048 			rte_memcpy(&vq->used->ring[0], &vq->async_descs_split[from + size],
2049 					(nr_copy - size) * sizeof(struct vring_used_elem));
2050 		}
2051 
2052 		vq->last_async_desc_idx_split += nr_copy;
2053 		vq->last_used_idx += nr_copy;
2054 		nr_left -= nr_copy;
2055 	} while (nr_left > 0);
2056 }
2057 
2058 static __rte_always_inline void
2059 write_back_completed_descs_packed(struct vhost_virtqueue *vq,
2060 				uint16_t n_buffers)
2061 {
2062 	uint16_t nr_left = n_buffers;
2063 	uint16_t from, to;
2064 
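	/*
	 * Write the completed buffers back as used descriptors, splitting the
	 * update when it wraps past the end of the ring.
	 */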
2065 	do {
2066 		from = vq->last_async_buffer_idx_packed % vq->size;
2067 		to = (from + nr_left) % vq->size;
2068 		if (to > from) {
2069 			vhost_update_used_packed(vq, vq->async_buffers_packed + from, to - from);
2070 			vq->last_async_buffer_idx_packed += nr_left;
2071 			nr_left = 0;
2072 		} else {
2073 			vhost_update_used_packed(vq, vq->async_buffers_packed + from,
2074 				vq->size - from);
2075 			vq->last_async_buffer_idx_packed += vq->size - from;
2076 			nr_left -= vq->size - from;
2077 		}
2078 	} while (nr_left > 0);
2079 }
2080 
2081 uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
2082 		struct rte_mbuf **pkts, uint16_t count)
2083 {
2084 	struct virtio_net *dev = get_device(vid);
2085 	struct vhost_virtqueue *vq;
2086 	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0, n_buffers = 0;
2087 	uint16_t start_idx, pkts_idx, vq_size;
2088 	struct async_inflight_info *pkts_info;
2089 	uint16_t from, i;
2090 
2091 	if (!dev)
2092 		return 0;
2093 
2094 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
2095 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2096 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
2097 			dev->vid, __func__, queue_id);
2098 		return 0;
2099 	}
2100 
2101 	vq = dev->virtqueue[queue_id];
2102 
2103 	if (unlikely(!vq->async_registered)) {
2104 		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
2105 			dev->vid, __func__, queue_id);
2106 		return 0;
2107 	}
2108 
2109 	rte_spinlock_lock(&vq->access_lock);
2110 
2111 	pkts_idx = vq->async_pkts_idx % vq->size;
2112 	pkts_info = vq->async_pkts_info;
2113 	vq_size = vq->size;
2114 	start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
2115 		vq_size, vq->async_pkts_inflight_n);
2116 
2117 	if (count > vq->async_last_pkts_n)
2118 		n_pkts_cpl = vq->async_ops.check_completed_copies(vid,
2119 			queue_id, 0, count - vq->async_last_pkts_n);
2120 	n_pkts_cpl += vq->async_last_pkts_n;
2121 
2122 	n_pkts_put = RTE_MIN(count, n_pkts_cpl);
2123 	if (unlikely(n_pkts_put == 0)) {
2124 		vq->async_last_pkts_n = n_pkts_cpl;
2125 		goto done;
2126 	}
2127 
2128 	if (vq_is_packed(dev)) {
2129 		for (i = 0; i < n_pkts_put; i++) {
2130 			from = (start_idx + i) & (vq_size - 1);
2131 			n_buffers += pkts_info[from].nr_buffers;
2132 			pkts[i] = pkts_info[from].mbuf;
2133 		}
2134 	} else {
2135 		for (i = 0; i < n_pkts_put; i++) {
2136 			from = (start_idx + i) & (vq_size - 1);
2137 			n_descs += pkts_info[from].descs;
2138 			pkts[i] = pkts_info[from].mbuf;
2139 		}
2140 	}
2141 
2142 	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
2143 	vq->async_pkts_inflight_n -= n_pkts_put;
2144 
2145 	if (likely(vq->enabled && vq->access_ok)) {
2146 		if (vq_is_packed(dev)) {
2147 			write_back_completed_descs_packed(vq, n_buffers);
2148 
2149 			vhost_vring_call_packed(dev, vq);
2150 		} else {
2151 			write_back_completed_descs_split(vq, n_descs);
2152 
2153 			__atomic_add_fetch(&vq->used->idx, n_descs,
2154 					__ATOMIC_RELEASE);
2155 			vhost_vring_call_split(dev, vq);
2156 		}
2157 	} else {
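		/*
		 * The vring is currently not accessible; skip the used ring
		 * write-back and only advance the internal indexes.
		 */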
2158 		if (vq_is_packed(dev))
2159 			vq->last_async_buffer_idx_packed += n_buffers;
2160 		else
2161 			vq->last_async_desc_idx_split += n_descs;
2162 	}
2163 
2164 done:
2165 	rte_spinlock_unlock(&vq->access_lock);
2166 
2167 	return n_pkts_put;
2168 }
2169 
2170 static __rte_always_inline uint32_t
2171 virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
2172 	struct rte_mbuf **pkts, uint32_t count,
2173 	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
2174 {
2175 	struct vhost_virtqueue *vq;
2176 	uint32_t nb_tx = 0;
2177 
2178 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
2179 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
2180 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
2181 			dev->vid, __func__, queue_id);
2182 		return 0;
2183 	}
2184 
2185 	vq = dev->virtqueue[queue_id];
2186 
2187 	rte_spinlock_lock(&vq->access_lock);
2188 
2189 	if (unlikely(!vq->enabled || !vq->async_registered))
2190 		goto out_access_unlock;
2191 
2192 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
2193 		vhost_user_iotlb_rd_lock(vq);
2194 
2195 	if (unlikely(!vq->access_ok))
2196 		if (unlikely(vring_translate(dev, vq) < 0))
2197 			goto out;
2198 
2199 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
2200 	if (count == 0)
2201 		goto out;
2202 
2203 	if (vq_is_packed(dev))
2204 		nb_tx = virtio_dev_rx_async_submit_packed(dev,
2205 				vq, queue_id, pkts, count, comp_pkts,
2206 				comp_count);
2207 	else
2208 		nb_tx = virtio_dev_rx_async_submit_split(dev,
2209 				vq, queue_id, pkts, count, comp_pkts,
2210 				comp_count);
2211 
2212 out:
2213 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
2214 		vhost_user_iotlb_rd_unlock(vq);
2215 
2216 out_access_unlock:
2217 	rte_spinlock_unlock(&vq->access_lock);
2218 
2219 	return nb_tx;
2220 }
2221 
2222 uint16_t
2223 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
2224 		struct rte_mbuf **pkts, uint16_t count,
2225 		struct rte_mbuf **comp_pkts, uint32_t *comp_count)
2226 {
2227 	struct virtio_net *dev = get_device(vid);
2228 
2229 	*comp_count = 0;
2230 	if (!dev)
2231 		return 0;
2232 
2233 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
2234 		VHOST_LOG_DATA(ERR,
2235 			"(%d) %s: built-in vhost net backend is disabled.\n",
2236 			dev->vid, __func__);
2237 		return 0;
2238 	}
2239 
2240 	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count, comp_pkts,
2241 			comp_count);
2242 }
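
/*
 * Illustrative usage sketch (not part of this file): an application typically
 * pairs rte_vhost_submit_enqueue_burst() with rte_vhost_poll_enqueue_completed().
 * The names vid, queue_id, pkts, nb_pkts and the burst-sized arrays below are
 * assumptions of the example, not definitions from this library.
 *
 *	struct rte_mbuf *comp_pkts[MAX_PKT_BURST], *done_pkts[MAX_PKT_BURST];
 *	uint32_t n_comp = 0;
 *	uint16_t n_enq, n_done;
 *
 *	n_enq = rte_vhost_submit_enqueue_burst(vid, queue_id, pkts, nb_pkts,
 *					comp_pkts, &n_comp);
 *
 * Packets returned in comp_pkts[0..n_comp) were copied synchronously by the
 * CPU and can be freed right away; the other accepted packets stay in flight
 * until returned by the completion poll:
 *
 *	rte_pktmbuf_free_bulk(comp_pkts, n_comp);
 *	n_done = rte_vhost_poll_enqueue_completed(vid, queue_id, done_pkts,
 *					MAX_PKT_BURST);
 *	rte_pktmbuf_free_bulk(done_pkts, n_done);
 */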
2243 
2244 static inline bool
2245 virtio_net_with_host_offload(struct virtio_net *dev)
2246 {
2247 	if (dev->features &
2248 			((1ULL << VIRTIO_NET_F_CSUM) |
2249 			 (1ULL << VIRTIO_NET_F_HOST_ECN) |
2250 			 (1ULL << VIRTIO_NET_F_HOST_TSO4) |
2251 			 (1ULL << VIRTIO_NET_F_HOST_TSO6) |
2252 			 (1ULL << VIRTIO_NET_F_HOST_UFO)))
2253 		return true;
2254 
2255 	return false;
2256 }
2257 
2258 static void
2259 parse_ethernet(struct rte_mbuf *m, uint16_t *l4_proto, void **l4_hdr)
2260 {
2261 	struct rte_ipv4_hdr *ipv4_hdr;
2262 	struct rte_ipv6_hdr *ipv6_hdr;
2263 	void *l3_hdr = NULL;
2264 	struct rte_ether_hdr *eth_hdr;
2265 	uint16_t ethertype;
2266 
2267 	eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
2268 
2269 	m->l2_len = sizeof(struct rte_ether_hdr);
2270 	ethertype = rte_be_to_cpu_16(eth_hdr->ether_type);
2271 
2272 	if (ethertype == RTE_ETHER_TYPE_VLAN) {
2273 		struct rte_vlan_hdr *vlan_hdr =
2274 			(struct rte_vlan_hdr *)(eth_hdr + 1);
2275 
2276 		m->l2_len += sizeof(struct rte_vlan_hdr);
2277 		ethertype = rte_be_to_cpu_16(vlan_hdr->eth_proto);
2278 	}
2279 
2280 	l3_hdr = (char *)eth_hdr + m->l2_len;
2281 
2282 	switch (ethertype) {
2283 	case RTE_ETHER_TYPE_IPV4:
2284 		ipv4_hdr = l3_hdr;
2285 		*l4_proto = ipv4_hdr->next_proto_id;
2286 		m->l3_len = rte_ipv4_hdr_len(ipv4_hdr);
2287 		*l4_hdr = (char *)l3_hdr + m->l3_len;
2288 		m->ol_flags |= PKT_TX_IPV4;
2289 		break;
2290 	case RTE_ETHER_TYPE_IPV6:
2291 		ipv6_hdr = l3_hdr;
2292 		*l4_proto = ipv6_hdr->proto;
2293 		m->l3_len = sizeof(struct rte_ipv6_hdr);
2294 		*l4_hdr = (char *)l3_hdr + m->l3_len;
2295 		m->ol_flags |= PKT_TX_IPV6;
2296 		break;
2297 	default:
2298 		m->l3_len = 0;
2299 		*l4_proto = 0;
2300 		*l4_hdr = NULL;
2301 		break;
2302 	}
2303 }
2304 
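/*
 * Worked example for the NEEDS_CSUM handling below: for a typical TCP/IPv4
 * packet (no VLAN, no IP options) the guest sets csum_start to
 * l2_len + l3_len = 14 + 20 = 34 and csum_offset to
 * offsetof(struct rte_tcp_hdr, cksum) = 16, which is translated here into
 * PKT_TX_TCP_CKSUM so that the checksum is completed later by hardware or
 * by the application.
 */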
2305 static __rte_always_inline void
2306 vhost_dequeue_offload(struct virtio_net_hdr *hdr, struct rte_mbuf *m)
2307 {
2308 	uint16_t l4_proto = 0;
2309 	void *l4_hdr = NULL;
2310 	struct rte_tcp_hdr *tcp_hdr = NULL;
2311 
2312 	if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
2313 		return;
2314 
2315 	parse_ethernet(m, &l4_proto, &l4_hdr);
2316 	if (hdr->flags == VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2317 		if (hdr->csum_start == (m->l2_len + m->l3_len)) {
2318 			switch (hdr->csum_offset) {
2319 			case (offsetof(struct rte_tcp_hdr, cksum)):
2320 				if (l4_proto == IPPROTO_TCP)
2321 					m->ol_flags |= PKT_TX_TCP_CKSUM;
2322 				break;
2323 			case (offsetof(struct rte_udp_hdr, dgram_cksum)):
2324 				if (l4_proto == IPPROTO_UDP)
2325 					m->ol_flags |= PKT_TX_UDP_CKSUM;
2326 				break;
2327 			case (offsetof(struct rte_sctp_hdr, cksum)):
2328 				if (l4_proto == IPPROTO_SCTP)
2329 					m->ol_flags |= PKT_TX_SCTP_CKSUM;
2330 				break;
2331 			default:
2332 				break;
2333 			}
2334 		}
2335 	}
2336 
2337 	if (l4_hdr && hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2338 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2339 		case VIRTIO_NET_HDR_GSO_TCPV4:
2340 		case VIRTIO_NET_HDR_GSO_TCPV6:
2341 			tcp_hdr = l4_hdr;
2342 			m->ol_flags |= PKT_TX_TCP_SEG;
2343 			m->tso_segsz = hdr->gso_size;
2344 			m->l4_len = (tcp_hdr->data_off & 0xf0) >> 2;
2345 			break;
2346 		case VIRTIO_NET_HDR_GSO_UDP:
2347 			m->ol_flags |= PKT_TX_UDP_SEG;
2348 			m->tso_segsz = hdr->gso_size;
2349 			m->l4_len = sizeof(struct rte_udp_hdr);
2350 			break;
2351 		default:
2352 			VHOST_LOG_DATA(WARNING,
2353 				"unsupported gso type %u.\n", hdr->gso_type);
2354 			break;
2355 		}
2356 	}
2357 }
2358 
2359 static __rte_noinline void
2360 copy_vnet_hdr_from_desc(struct virtio_net_hdr *hdr,
2361 		struct buf_vector *buf_vec)
2362 {
2363 	uint64_t len;
2364 	uint64_t remain = sizeof(struct virtio_net_hdr);
2365 	uint64_t src;
2366 	uint64_t dst = (uint64_t)(uintptr_t)hdr;
2367 
2368 	while (remain) {
2369 		len = RTE_MIN(remain, buf_vec->buf_len);
2370 		src = buf_vec->buf_addr;
2371 		rte_memcpy((void *)(uintptr_t)dst,
2372 				(void *)(uintptr_t)src, len);
2373 
2374 		remain -= len;
2375 		dst += len;
2376 		buf_vec++;
2377 	}
2378 }
2379 
2380 static __rte_always_inline int
2381 copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
2382 		  struct buf_vector *buf_vec, uint16_t nr_vec,
2383 		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool)
2384 {
2385 	uint32_t buf_avail, buf_offset;
2386 	uint64_t buf_addr, buf_len;
2387 	uint32_t mbuf_avail, mbuf_offset;
2388 	uint32_t cpy_len;
2389 	struct rte_mbuf *cur = m, *prev = m;
2390 	struct virtio_net_hdr tmp_hdr;
2391 	struct virtio_net_hdr *hdr = NULL;
2392 	/* A counter to avoid desc dead loop chain */
2393 	/* A counter to avoid an endless loop over the descriptor chain */
2394 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
2395 	int error = 0;
2396 
2397 	buf_addr = buf_vec[vec_idx].buf_addr;
2398 	buf_len = buf_vec[vec_idx].buf_len;
2399 
2400 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
2401 		error = -1;
2402 		goto out;
2403 	}
2404 
2405 	if (virtio_net_with_host_offload(dev)) {
2406 		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
2407 			/*
2408 			 * No luck, the virtio-net header doesn't fit
2409 			 * in a contiguous virtual area.
2410 			 */
2411 			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
2412 			hdr = &tmp_hdr;
2413 		} else {
2414 			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
2415 		}
2416 	}
2417 
2418 	/*
2419 	 * A virtio driver normally uses at least 2 desc buffers
2420 	 * for Tx: the first for storing the header, and the others
2421 	 * for storing the data.
2422 	 */
2423 	if (unlikely(buf_len < dev->vhost_hlen)) {
2424 		buf_offset = dev->vhost_hlen - buf_len;
2425 		vec_idx++;
2426 		buf_addr = buf_vec[vec_idx].buf_addr;
2427 		buf_len = buf_vec[vec_idx].buf_len;
2428 		buf_avail  = buf_len - buf_offset;
2429 	} else if (buf_len == dev->vhost_hlen) {
2430 		if (unlikely(++vec_idx >= nr_vec))
2431 			goto out;
2432 		buf_addr = buf_vec[vec_idx].buf_addr;
2433 		buf_len = buf_vec[vec_idx].buf_len;
2434 
2435 		buf_offset = 0;
2436 		buf_avail = buf_len;
2437 	} else {
2438 		buf_offset = dev->vhost_hlen;
2439 		buf_avail = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
2440 	}
2441 
2442 	PRINT_PACKET(dev,
2443 			(uintptr_t)(buf_addr + buf_offset),
2444 			(uint32_t)buf_avail, 0);
2445 
2446 	mbuf_offset = 0;
2447 	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
2448 	while (1) {
2449 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
2450 
2451 		if (likely(cpy_len > MAX_BATCH_LEN ||
2452 					vq->batch_copy_nb_elems >= vq->size ||
2453 					(hdr && cur == m))) {
2454 			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *,
2455 						mbuf_offset),
2456 					(void *)((uintptr_t)(buf_addr +
2457 							buf_offset)), cpy_len);
2458 		} else {
2459 			batch_copy[vq->batch_copy_nb_elems].dst =
2460 				rte_pktmbuf_mtod_offset(cur, void *,
2461 						mbuf_offset);
2462 			batch_copy[vq->batch_copy_nb_elems].src =
2463 				(void *)((uintptr_t)(buf_addr + buf_offset));
2464 			batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
2465 			vq->batch_copy_nb_elems++;
2466 		}
2467 
2468 		mbuf_avail  -= cpy_len;
2469 		mbuf_offset += cpy_len;
2470 		buf_avail -= cpy_len;
2471 		buf_offset += cpy_len;
2472 
2473 		/* This buffer has reached its end, get the next one */
2474 		if (buf_avail == 0) {
2475 			if (++vec_idx >= nr_vec)
2476 				break;
2477 
2478 			buf_addr = buf_vec[vec_idx].buf_addr;
2479 			buf_len = buf_vec[vec_idx].buf_len;
2480 
2481 			buf_offset = 0;
2482 			buf_avail  = buf_len;
2483 
2484 			PRINT_PACKET(dev, (uintptr_t)buf_addr,
2485 					(uint32_t)buf_avail, 0);
2486 		}
2487 
2488 		/*
2489 		 * This mbuf has reached its end, allocate a new one
2490 		 * to hold more data.
2491 		 */
2492 		if (mbuf_avail == 0) {
2493 			cur = rte_pktmbuf_alloc(mbuf_pool);
2494 			if (unlikely(cur == NULL)) {
2495 				VHOST_LOG_DATA(ERR, "Failed to "
2496 					"allocate memory for mbuf.\n");
2497 				error = -1;
2498 				goto out;
2499 			}
2500 
2501 			prev->next = cur;
2502 			prev->data_len = mbuf_offset;
2503 			m->nb_segs += 1;
2504 			m->pkt_len += mbuf_offset;
2505 			prev = cur;
2506 
2507 			mbuf_offset = 0;
2508 			mbuf_avail  = cur->buf_len - RTE_PKTMBUF_HEADROOM;
2509 		}
2510 	}
2511 
2512 	prev->data_len = mbuf_offset;
2513 	m->pkt_len    += mbuf_offset;
2514 
2515 	if (hdr)
2516 		vhost_dequeue_offload(hdr, m);
2517 
2518 out:
2519 
2520 	return error;
2521 }
2522 
2523 static void
2524 virtio_dev_extbuf_free(void *addr __rte_unused, void *opaque)
2525 {
2526 	rte_free(opaque);
2527 }
2528 
2529 static int
2530 virtio_dev_extbuf_alloc(struct rte_mbuf *pkt, uint32_t size)
2531 {
2532 	struct rte_mbuf_ext_shared_info *shinfo = NULL;
2533 	uint32_t total_len = RTE_PKTMBUF_HEADROOM + size;
2534 	uint16_t buf_len;
2535 	rte_iova_t iova;
2536 	void *buf;
2537 
2538 	total_len += sizeof(*shinfo) + sizeof(uintptr_t);
2539 	total_len = RTE_ALIGN_CEIL(total_len, sizeof(uintptr_t));
2540 
2541 	if (unlikely(total_len > UINT16_MAX))
2542 		return -ENOSPC;
2543 
2544 	buf_len = total_len;
2545 	buf = rte_malloc(NULL, buf_len, RTE_CACHE_LINE_SIZE);
2546 	if (unlikely(buf == NULL))
2547 		return -ENOMEM;
2548 
2549 	/* Initialize shinfo */
2550 	shinfo = rte_pktmbuf_ext_shinfo_init_helper(buf, &buf_len,
2551 						virtio_dev_extbuf_free, buf);
2552 	if (unlikely(shinfo == NULL)) {
2553 		rte_free(buf);
2554 		VHOST_LOG_DATA(ERR, "Failed to init shinfo\n");
2555 		return -1;
2556 	}
2557 
2558 	iova = rte_malloc_virt2iova(buf);
2559 	rte_pktmbuf_attach_extbuf(pkt, buf, iova, buf_len, shinfo);
2560 	rte_pktmbuf_reset_headroom(pkt);
2561 
2562 	return 0;
2563 }
2564 
2565 static __rte_always_inline int
2566 virtio_dev_pktmbuf_prep(struct virtio_net *dev, struct rte_mbuf *pkt,
2567 			 uint32_t data_len)
2568 {
2569 	if (rte_pktmbuf_tailroom(pkt) >= data_len)
2570 		return 0;
2571 
2572 	/* attach an external buffer if supported */
2573 	if (dev->extbuf && !virtio_dev_extbuf_alloc(pkt, data_len))
2574 		return 0;
2575 
2576 	/* check if chained buffers are allowed */
2577 	if (!dev->linearbuf)
2578 		return 0;
2579 
2580 	return -1;
2581 }
2582 
2583 /*
2584  * Allocate a host-supported pktmbuf.
2585  */
2586 static __rte_always_inline struct rte_mbuf *
2587 virtio_dev_pktmbuf_alloc(struct virtio_net *dev, struct rte_mempool *mp,
2588 			 uint32_t data_len)
2589 {
2590 	struct rte_mbuf *pkt = rte_pktmbuf_alloc(mp);
2591 
2592 	if (unlikely(pkt == NULL)) {
2593 		VHOST_LOG_DATA(ERR,
2594 			"Failed to allocate memory for mbuf.\n");
2595 		return NULL;
2596 	}
2597 
2598 	if (virtio_dev_pktmbuf_prep(dev, pkt, data_len)) {
2599 		/* Data doesn't fit into the buffer and the host supports
2600 		 * only linear buffers
2601 		 */
2602 		rte_pktmbuf_free(pkt);
2603 		return NULL;
2604 	}
2605 
2606 	return pkt;
2607 }
2608 
2609 static __rte_noinline uint16_t
2610 virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
2611 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
2612 {
2613 	uint16_t i;
2614 	uint16_t free_entries;
2615 	uint16_t dropped = 0;
2616 	static bool allocerr_warned;
2617 
2618 	/*
2619 	 * The ordering between avail index and
2620 	 * desc reads needs to be enforced.
2621 	 */
2622 	free_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) -
2623 			vq->last_avail_idx;
2624 	if (free_entries == 0)
2625 		return 0;
2626 
2627 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
2628 
2629 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
2630 
2631 	count = RTE_MIN(count, MAX_PKT_BURST);
2632 	count = RTE_MIN(count, free_entries);
2633 	VHOST_LOG_DATA(DEBUG, "(%d) about to dequeue %u buffers\n",
2634 			dev->vid, count);
2635 
2636 	for (i = 0; i < count; i++) {
2637 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
2638 		uint16_t head_idx;
2639 		uint32_t buf_len;
2640 		uint16_t nr_vec = 0;
2641 		int err;
2642 
2643 		if (unlikely(fill_vec_buf_split(dev, vq,
2644 						vq->last_avail_idx + i,
2645 						&nr_vec, buf_vec,
2646 						&head_idx, &buf_len,
2647 						VHOST_ACCESS_RO) < 0))
2648 			break;
2649 
2650 		update_shadow_used_ring_split(vq, head_idx, 0);
2651 
2652 		pkts[i] = virtio_dev_pktmbuf_alloc(dev, mbuf_pool, buf_len);
2653 		if (unlikely(pkts[i] == NULL)) {
2654 			/*
2655 			 * mbuf allocation fails for jumbo packets when external
2656 			 * buffer allocation is not allowed and a linear buffer
2657 			 * is required. Drop this packet.
2658 			 */
2659 			if (!allocerr_warned) {
2660 				VHOST_LOG_DATA(ERR,
2661 					"Failed mbuf alloc of size %d from %s on %s.\n",
2662 					buf_len, mbuf_pool->name, dev->ifname);
2663 				allocerr_warned = true;
2664 			}
2665 			dropped += 1;
2666 			i++;
2667 			break;
2668 		}
2669 
2670 		err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
2671 				mbuf_pool);
2672 		if (unlikely(err)) {
2673 			rte_pktmbuf_free(pkts[i]);
2674 			if (!allocerr_warned) {
2675 				VHOST_LOG_DATA(ERR,
2676 					"Failed to copy desc to mbuf on %s.\n",
2677 					dev->ifname);
2678 				allocerr_warned = true;
2679 			}
2680 			dropped += 1;
2681 			i++;
2682 			break;
2683 		}
2684 	}
2685 
2686 	vq->last_avail_idx += i;
2687 
2688 	do_data_copy_dequeue(vq);
2689 	if (unlikely(i < count))
2690 		vq->shadow_used_idx = i;
2691 	if (likely(vq->shadow_used_idx)) {
2692 		flush_shadow_used_ring_split(dev, vq);
2693 		vhost_vring_call_split(dev, vq);
2694 	}
2695 
2696 	return (i - dropped);
2697 }
2698 
2699 static __rte_always_inline int
2700 vhost_reserve_avail_batch_packed(struct virtio_net *dev,
2701 				 struct vhost_virtqueue *vq,
2702 				 struct rte_mbuf **pkts,
2703 				 uint16_t avail_idx,
2704 				 uintptr_t *desc_addrs,
2705 				 uint16_t *ids)
2706 {
2707 	bool wrap = vq->avail_wrap_counter;
2708 	struct vring_packed_desc *descs = vq->desc_packed;
2709 	uint64_t lens[PACKED_BATCH_SIZE];
2710 	uint64_t buf_lens[PACKED_BATCH_SIZE];
2711 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
2712 	uint16_t flags, i;
2713 
2714 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
2715 		return -1;
2716 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
2717 		return -1;
2718 
2719 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2720 		flags = descs[avail_idx + i].flags;
2721 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
2722 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
2723 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
2724 			return -1;
2725 	}
2726 
2727 	rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
2728 
2729 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2730 		lens[i] = descs[avail_idx + i].len;
2731 
2732 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2733 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
2734 						  descs[avail_idx + i].addr,
2735 						  &lens[i], VHOST_ACCESS_RW);
2736 	}
2737 
2738 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2739 		if (unlikely(!desc_addrs[i]))
2740 			return -1;
2741 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
2742 			return -1;
2743 	}
2744 
2745 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2746 		if (virtio_dev_pktmbuf_prep(dev, pkts[i], lens[i]))
2747 			goto err;
2748 	}
2749 
2750 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2751 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
2752 
2753 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2754 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
2755 			goto err;
2756 	}
2757 
2758 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2759 		pkts[i]->pkt_len = lens[i] - buf_offset;
2760 		pkts[i]->data_len = pkts[i]->pkt_len;
2761 		ids[i] = descs[avail_idx + i].id;
2762 	}
2763 
2764 	return 0;
2765 
2766 err:
2767 	return -1;
2768 }
2769 
2770 static __rte_always_inline int
2771 virtio_dev_tx_batch_packed(struct virtio_net *dev,
2772 			   struct vhost_virtqueue *vq,
2773 			   struct rte_mbuf **pkts)
2774 {
2775 	uint16_t avail_idx = vq->last_avail_idx;
2776 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
2777 	struct virtio_net_hdr *hdr;
2778 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
2779 	uint16_t ids[PACKED_BATCH_SIZE];
2780 	uint16_t i;
2781 
2782 	if (vhost_reserve_avail_batch_packed(dev, vq, pkts, avail_idx,
2783 					     desc_addrs, ids))
2784 		return -1;
2785 
2786 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2787 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
2788 
2789 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2790 		rte_memcpy(rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
2791 			   (void *)(uintptr_t)(desc_addrs[i] + buf_offset),
2792 			   pkts[i]->pkt_len);
2793 
2794 	if (virtio_net_with_host_offload(dev)) {
2795 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2796 			hdr = (struct virtio_net_hdr *)(desc_addrs[i]);
2797 			vhost_dequeue_offload(hdr, pkts[i]);
2798 		}
2799 	}
2800 
2801 	if (virtio_net_is_inorder(dev))
2802 		vhost_shadow_dequeue_batch_packed_inorder(vq,
2803 			ids[PACKED_BATCH_SIZE - 1]);
2804 	else
2805 		vhost_shadow_dequeue_batch_packed(dev, vq, ids);
2806 
2807 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
2808 
2809 	return 0;
2810 }
2811 
2812 static __rte_always_inline int
2813 vhost_dequeue_single_packed(struct virtio_net *dev,
2814 			    struct vhost_virtqueue *vq,
2815 			    struct rte_mempool *mbuf_pool,
2816 			    struct rte_mbuf *pkts,
2817 			    uint16_t *buf_id,
2818 			    uint16_t *desc_count)
2819 {
2820 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
2821 	uint32_t buf_len;
2822 	uint16_t nr_vec = 0;
2823 	int err;
2824 	static bool allocerr_warned;
2825 
2826 	if (unlikely(fill_vec_buf_packed(dev, vq,
2827 					 vq->last_avail_idx, desc_count,
2828 					 buf_vec, &nr_vec,
2829 					 buf_id, &buf_len,
2830 					 VHOST_ACCESS_RO) < 0))
2831 		return -1;
2832 
2833 	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
2834 		if (!allocerr_warned) {
2835 			VHOST_LOG_DATA(ERR,
2836 				"Failed mbuf alloc of size %d from %s on %s.\n",
2837 				buf_len, mbuf_pool->name, dev->ifname);
2838 			allocerr_warned = true;
2839 		}
2840 		return -1;
2841 	}
2842 
2843 	err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts,
2844 				mbuf_pool);
2845 	if (unlikely(err)) {
2846 		if (!allocerr_warned) {
2847 			VHOST_LOG_DATA(ERR,
2848 				"Failed to copy desc to mbuf on %s.\n",
2849 				dev->ifname);
2850 			allocerr_warned = true;
2851 		}
2852 		return -1;
2853 	}
2854 
2855 	return 0;
2856 }
2857 
2858 static __rte_always_inline int
2859 virtio_dev_tx_single_packed(struct virtio_net *dev,
2860 			    struct vhost_virtqueue *vq,
2861 			    struct rte_mempool *mbuf_pool,
2862 			    struct rte_mbuf *pkts)
2863 {
2864 
2865 	uint16_t buf_id, desc_count = 0;
2866 	int ret;
2867 
2868 	ret = vhost_dequeue_single_packed(dev, vq, mbuf_pool, pkts, &buf_id,
2869 					&desc_count);
2870 
2871 	if (likely(desc_count > 0)) {
2872 		if (virtio_net_is_inorder(dev))
2873 			vhost_shadow_dequeue_single_packed_inorder(vq, buf_id,
2874 								   desc_count);
2875 		else
2876 			vhost_shadow_dequeue_single_packed(vq, buf_id,
2877 					desc_count);
2878 
2879 		vq_inc_last_avail_packed(vq, desc_count);
2880 	}
2881 
2882 	return ret;
2883 }
2884 
2885 static __rte_noinline uint16_t
2886 virtio_dev_tx_packed(struct virtio_net *dev,
2887 		     struct vhost_virtqueue *__rte_restrict vq,
2888 		     struct rte_mempool *mbuf_pool,
2889 		     struct rte_mbuf **__rte_restrict pkts,
2890 		     uint32_t count)
2891 {
2892 	uint32_t pkt_idx = 0;
2893 
2894 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count))
2895 		return 0;
2896 
2897 	do {
2898 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
2899 
2900 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
2901 			if (!virtio_dev_tx_batch_packed(dev, vq,
2902 							&pkts[pkt_idx])) {
2903 				pkt_idx += PACKED_BATCH_SIZE;
2904 				continue;
2905 			}
2906 		}
2907 
2908 		if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
2909 						pkts[pkt_idx]))
2910 			break;
2911 		pkt_idx++;
2912 	} while (pkt_idx < count);
2913 
2914 	if (pkt_idx != count)
2915 		rte_pktmbuf_free_bulk(&pkts[pkt_idx], count - pkt_idx);
2916 
2917 	if (vq->shadow_used_idx) {
2918 		do_data_copy_dequeue(vq);
2919 
2920 		vhost_flush_dequeue_shadow_packed(dev, vq);
2921 		vhost_vring_call_packed(dev, vq);
2922 	}
2923 
2924 	return pkt_idx;
2925 }
2926 
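/*
 * Illustrative usage sketch (not part of this file): rte_vhost_dequeue_burst()
 * is typically called from the application's polling loop on a guest TX
 * virtqueue. The queue index, mempool, burst size and handle_pkt() below are
 * assumptions of the example.
 *
 *	struct rte_mbuf *pkts[32];
 *	uint16_t i, nb_rx;
 *
 *	nb_rx = rte_vhost_dequeue_burst(vid, 1, mbuf_pool, pkts, 32);
 *	for (i = 0; i < nb_rx; i++)
 *		handle_pkt(pkts[i]);
 *
 * where handle_pkt() is a hypothetical application handler that eventually
 * frees the mbuf.
 */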
2927 uint16_t
2928 rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
2929 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
2930 {
2931 	struct virtio_net *dev;
2932 	struct rte_mbuf *rarp_mbuf = NULL;
2933 	struct vhost_virtqueue *vq;
2934 	int16_t success = 1;
2935 
2936 	dev = get_device(vid);
2937 	if (!dev)
2938 		return 0;
2939 
2940 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
2941 		VHOST_LOG_DATA(ERR,
2942 			"(%d) %s: built-in vhost net backend is disabled.\n",
2943 			dev->vid, __func__);
2944 		return 0;
2945 	}
2946 
2947 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
2948 		VHOST_LOG_DATA(ERR,
2949 			"(%d) %s: invalid virtqueue idx %d.\n",
2950 			dev->vid, __func__, queue_id);
2951 		return 0;
2952 	}
2953 
2954 	vq = dev->virtqueue[queue_id];
2955 
2956 	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
2957 		return 0;
2958 
2959 	if (unlikely(!vq->enabled)) {
2960 		count = 0;
2961 		goto out_access_unlock;
2962 	}
2963 
2964 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
2965 		vhost_user_iotlb_rd_lock(vq);
2966 
2967 	if (unlikely(!vq->access_ok))
2968 		if (unlikely(vring_translate(dev, vq) < 0)) {
2969 			count = 0;
2970 			goto out;
2971 		}
2972 
2973 	/*
2974 	 * Construct a RARP broadcast packet and inject it into the "pkts"
2975 	 * array, so that it looks like the guest actually sent such a packet.
2976 	 *
2977 	 * Check user_send_rarp() for more information.
2978 	 *
2979 	 * broadcast_rarp shares a cacheline in the virtio_net structure
2980 	 * with some fields that are accessed during enqueue and
2981 	 * __atomic_compare_exchange_n performs a write whenever the compare
2982 	 * and exchange is executed. This could result in false sharing
2983 	 * between enqueue and dequeue.
2984 	 *
2985 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
2986 	 * and only performing compare and exchange if the read indicates it
2987 	 * is likely to be set.
2988 	 */
2989 	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
2990 			__atomic_compare_exchange_n(&dev->broadcast_rarp,
2991 			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
2992 
2993 		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
2994 		if (rarp_mbuf == NULL) {
2995 			VHOST_LOG_DATA(ERR, "Failed to make RARP packet.\n");
2996 			count = 0;
2997 			goto out;
2998 		}
2999 		count -= 1;
3000 	}
3001 
3002 	if (vq_is_packed(dev))
3003 		count = virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count);
3004 	else
3005 		count = virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count);
3006 
3007 out:
3008 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
3009 		vhost_user_iotlb_rd_unlock(vq);
3010 
3011 out_access_unlock:
3012 	rte_spinlock_unlock(&vq->access_lock);
3013 
3014 	if (unlikely(rarp_mbuf != NULL)) {
3015 		/*
3016 		 * Inject it at the head of the "pkts" array, so that the
3017 		 * switch's MAC learning table gets updated first.
3018 		 */
3019 		memmove(&pkts[1], pkts, count * sizeof(struct rte_mbuf *));
3020 		pkts[0] = rarp_mbuf;
3021 		count += 1;
3022 	}
3023 
3024 	return count;
3025 }
3026