xref: /dpdk/lib/vhost/virtio_net.c (revision cb43641e732a8c9a04e11a8ec7b04ab5e313b288)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2016 Intel Corporation
3  */
4 
5 #include <stdint.h>
6 #include <stdbool.h>
7 #include <linux/virtio_net.h>
8 
9 #include <rte_mbuf.h>
10 #include <rte_memcpy.h>
11 #include <rte_net.h>
12 #include <rte_ether.h>
13 #include <rte_ip.h>
14 #include <rte_vhost.h>
15 #include <rte_tcp.h>
16 #include <rte_udp.h>
17 #include <rte_sctp.h>
18 #include <rte_arp.h>
19 #include <rte_spinlock.h>
20 #include <rte_malloc.h>
21 #include <rte_vhost_async.h>
22 
23 #include "iotlb.h"
24 #include "vhost.h"
25 
26 #define MAX_BATCH_LEN 256
27 
28 static __rte_always_inline bool
29 rxvq_is_mergeable(struct virtio_net *dev)
30 {
31 	return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
32 }
33 
34 static __rte_always_inline bool
35 virtio_net_is_inorder(struct virtio_net *dev)
36 {
37 	return dev->features & (1ULL << VIRTIO_F_IN_ORDER);
38 }
39 
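/*
 * A virtqueue index is valid when it is below the number of vrings and its
 * parity matches the requested direction: guest RX virtqueues have even
 * indexes, guest TX virtqueues odd ones.
 */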
40 static bool
41 is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t nr_vring)
42 {
43 	return (is_tx ^ (idx & 1)) == 0 && idx < nr_vring;
44 }
45 
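/*
 * Perform the copies batched in vq->batch_copy_elems for the enqueue path,
 * logging each destination range for live-migration dirty page tracking,
 * then reset the batch.
 */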
46 static inline void
47 do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
48 {
49 	struct batch_copy_elem *elem = vq->batch_copy_elems;
50 	uint16_t count = vq->batch_copy_nb_elems;
51 	int i;
52 
53 	for (i = 0; i < count; i++) {
54 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
55 		vhost_log_cache_write_iova(dev, vq, elem[i].log_addr,
56 					   elem[i].len);
57 		PRINT_PACKET(dev, (uintptr_t)elem[i].dst, elem[i].len, 0);
58 	}
59 
60 	vq->batch_copy_nb_elems = 0;
61 }
62 
63 static inline void
64 do_data_copy_dequeue(struct vhost_virtqueue *vq)
65 {
66 	struct batch_copy_elem *elem = vq->batch_copy_elems;
67 	uint16_t count = vq->batch_copy_nb_elems;
68 	int i;
69 
70 	for (i = 0; i < count; i++)
71 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
72 
73 	vq->batch_copy_nb_elems = 0;
74 }
75 
76 static __rte_always_inline void
77 do_flush_shadow_used_ring_split(struct virtio_net *dev,
78 			struct vhost_virtqueue *vq,
79 			uint16_t to, uint16_t from, uint16_t size)
80 {
81 	rte_memcpy(&vq->used->ring[to],
82 			&vq->shadow_used_split[from],
83 			size * sizeof(struct vring_used_elem));
84 	vhost_log_cache_used_vring(dev, vq,
85 			offsetof(struct vring_used, ring[to]),
86 			size * sizeof(struct vring_used_elem));
87 }
88 
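/*
 * Flush the shadow used ring into the split virtqueue's used ring, splitting
 * the copy in two when it wraps past the end of the ring, then publish the
 * new used index with release semantics and log the update.
 */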
89 static __rte_always_inline void
90 flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
91 {
92 	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
93 
94 	if (used_idx + vq->shadow_used_idx <= vq->size) {
95 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
96 					  vq->shadow_used_idx);
97 	} else {
98 		uint16_t size;
99 
100 		/* flush entries into the used ring interval [used_idx, vq->size) */
101 		size = vq->size - used_idx;
102 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
103 
104 		/* flush the remaining entries into the start of the used ring */
105 		do_flush_shadow_used_ring_split(dev, vq, 0, size,
106 					  vq->shadow_used_idx - size);
107 	}
108 	vq->last_used_idx += vq->shadow_used_idx;
109 
110 	vhost_log_cache_sync(dev, vq);
111 
112 	__atomic_add_fetch(&vq->used->idx, vq->shadow_used_idx,
113 			   __ATOMIC_RELEASE);
114 	vq->shadow_used_idx = 0;
115 	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
116 		sizeof(vq->used->idx));
117 }
118 
119 static __rte_always_inline void
120 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
121 			 uint16_t desc_idx, uint32_t len)
122 {
123 	uint16_t i = vq->shadow_used_idx++;
124 
125 	vq->shadow_used_split[i].id  = desc_idx;
126 	vq->shadow_used_split[i].len = len;
127 }
128 
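/*
 * Write the shadowed used entries back to the packed descriptor ring.
 * Descriptor ids and lengths are stored first; after a release fence the
 * flags are written, with the head descriptor's flags written last so the
 * guest never sees a partially updated chain.
 */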
129 static __rte_always_inline void
130 vhost_flush_enqueue_shadow_packed(struct virtio_net *dev,
131 				  struct vhost_virtqueue *vq)
132 {
133 	int i;
134 	uint16_t used_idx = vq->last_used_idx;
135 	uint16_t head_idx = vq->last_used_idx;
136 	uint16_t head_flags = 0;
137 
138 	/* Split loop in two to save memory barriers */
139 	for (i = 0; i < vq->shadow_used_idx; i++) {
140 		vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
141 		vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
142 
143 		used_idx += vq->shadow_used_packed[i].count;
144 		if (used_idx >= vq->size)
145 			used_idx -= vq->size;
146 	}
147 
148 	/* The ordering for storing desc flags needs to be enforced. */
149 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
150 
151 	for (i = 0; i < vq->shadow_used_idx; i++) {
152 		uint16_t flags;
153 
154 		if (vq->shadow_used_packed[i].len)
155 			flags = VRING_DESC_F_WRITE;
156 		else
157 			flags = 0;
158 
159 		if (vq->used_wrap_counter) {
160 			flags |= VRING_DESC_F_USED;
161 			flags |= VRING_DESC_F_AVAIL;
162 		} else {
163 			flags &= ~VRING_DESC_F_USED;
164 			flags &= ~VRING_DESC_F_AVAIL;
165 		}
166 
167 		if (i > 0) {
168 			vq->desc_packed[vq->last_used_idx].flags = flags;
169 
170 			vhost_log_cache_used_vring(dev, vq,
171 					vq->last_used_idx *
172 					sizeof(struct vring_packed_desc),
173 					sizeof(struct vring_packed_desc));
174 		} else {
175 			head_idx = vq->last_used_idx;
176 			head_flags = flags;
177 		}
178 
179 		vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
180 	}
181 
182 	vq->desc_packed[head_idx].flags = head_flags;
183 
184 	vhost_log_cache_used_vring(dev, vq,
185 				head_idx *
186 				sizeof(struct vring_packed_desc),
187 				sizeof(struct vring_packed_desc));
188 
189 	vq->shadow_used_idx = 0;
190 	vhost_log_cache_sync(dev, vq);
191 }
192 
193 static __rte_always_inline void
194 vhost_flush_dequeue_shadow_packed(struct virtio_net *dev,
195 				  struct vhost_virtqueue *vq)
196 {
197 	struct vring_used_elem_packed *used_elem = &vq->shadow_used_packed[0];
198 
199 	vq->desc_packed[vq->shadow_last_used_idx].id = used_elem->id;
200 	/* The desc flags field is the synchronization point for the virtio packed ring */
201 	__atomic_store_n(&vq->desc_packed[vq->shadow_last_used_idx].flags,
202 			 used_elem->flags, __ATOMIC_RELEASE);
203 
204 	vhost_log_cache_used_vring(dev, vq, vq->shadow_last_used_idx *
205 				   sizeof(struct vring_packed_desc),
206 				   sizeof(struct vring_packed_desc));
207 	vq->shadow_used_idx = 0;
208 	vhost_log_cache_sync(dev, vq);
209 }
210 
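/*
 * Mark a full batch of descriptors as used in the packed ring: store the
 * ids and lengths, then, after a release fence, set the used/avail flags
 * for the whole batch and log the range.
 */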
211 static __rte_always_inline void
212 vhost_flush_enqueue_batch_packed(struct virtio_net *dev,
213 				 struct vhost_virtqueue *vq,
214 				 uint64_t *lens,
215 				 uint16_t *ids)
216 {
217 	uint16_t i;
218 	uint16_t flags;
219 	uint16_t last_used_idx;
220 	struct vring_packed_desc *desc_base;
221 
222 	last_used_idx = vq->last_used_idx;
223 	desc_base = &vq->desc_packed[last_used_idx];
224 
225 	flags = PACKED_DESC_ENQUEUE_USED_FLAG(vq->used_wrap_counter);
226 
227 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
228 		desc_base[i].id = ids[i];
229 		desc_base[i].len = lens[i];
230 	}
231 
232 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
233 
234 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
235 		desc_base[i].flags = flags;
236 	}
237 
238 	vhost_log_cache_used_vring(dev, vq, last_used_idx *
239 				   sizeof(struct vring_packed_desc),
240 				   sizeof(struct vring_packed_desc) *
241 				   PACKED_BATCH_SIZE);
242 	vhost_log_cache_sync(dev, vq);
243 
244 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
245 }
246 
247 static __rte_always_inline void
248 vhost_shadow_dequeue_batch_packed_inorder(struct vhost_virtqueue *vq,
249 					  uint16_t id)
250 {
251 	vq->shadow_used_packed[0].id = id;
252 
253 	if (!vq->shadow_used_idx) {
254 		vq->shadow_last_used_idx = vq->last_used_idx;
255 		vq->shadow_used_packed[0].flags =
256 			PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
257 		vq->shadow_used_packed[0].len = 0;
258 		vq->shadow_used_packed[0].count = 1;
259 		vq->shadow_used_idx++;
260 	}
261 
262 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
263 }
264 
265 static __rte_always_inline void
266 vhost_shadow_dequeue_batch_packed(struct virtio_net *dev,
267 				  struct vhost_virtqueue *vq,
268 				  uint16_t *ids)
269 {
270 	uint16_t flags;
271 	uint16_t i;
272 	uint16_t begin;
273 
274 	flags = PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
275 
276 	if (!vq->shadow_used_idx) {
277 		vq->shadow_last_used_idx = vq->last_used_idx;
278 		vq->shadow_used_packed[0].id  = ids[0];
279 		vq->shadow_used_packed[0].len = 0;
280 		vq->shadow_used_packed[0].count = 1;
281 		vq->shadow_used_packed[0].flags = flags;
282 		vq->shadow_used_idx++;
283 		begin = 1;
284 	} else
285 		begin = 0;
286 
287 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE) {
288 		vq->desc_packed[vq->last_used_idx + i].id = ids[i];
289 		vq->desc_packed[vq->last_used_idx + i].len = 0;
290 	}
291 
292 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
293 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE)
294 		vq->desc_packed[vq->last_used_idx + i].flags = flags;
295 
296 	vhost_log_cache_used_vring(dev, vq, vq->last_used_idx *
297 				   sizeof(struct vring_packed_desc),
298 				   sizeof(struct vring_packed_desc) *
299 				   PACKED_BATCH_SIZE);
300 	vhost_log_cache_sync(dev, vq);
301 
302 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
303 }
304 
305 static __rte_always_inline void
306 vhost_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
307 				   uint16_t buf_id,
308 				   uint16_t count)
309 {
310 	uint16_t flags;
311 
312 	flags = vq->desc_packed[vq->last_used_idx].flags;
313 	if (vq->used_wrap_counter) {
314 		flags |= VRING_DESC_F_USED;
315 		flags |= VRING_DESC_F_AVAIL;
316 	} else {
317 		flags &= ~VRING_DESC_F_USED;
318 		flags &= ~VRING_DESC_F_AVAIL;
319 	}
320 
321 	if (!vq->shadow_used_idx) {
322 		vq->shadow_last_used_idx = vq->last_used_idx;
323 
324 		vq->shadow_used_packed[0].id  = buf_id;
325 		vq->shadow_used_packed[0].len = 0;
326 		vq->shadow_used_packed[0].flags = flags;
327 		vq->shadow_used_idx++;
328 	} else {
329 		vq->desc_packed[vq->last_used_idx].id = buf_id;
330 		vq->desc_packed[vq->last_used_idx].len = 0;
331 		vq->desc_packed[vq->last_used_idx].flags = flags;
332 	}
333 
334 	vq_inc_last_used_packed(vq, count);
335 }
336 
337 static __rte_always_inline void
338 vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
339 					   uint16_t buf_id,
340 					   uint16_t count)
341 {
342 	uint16_t flags;
343 
344 	vq->shadow_used_packed[0].id = buf_id;
345 
346 	flags = vq->desc_packed[vq->last_used_idx].flags;
347 	if (vq->used_wrap_counter) {
348 		flags |= VRING_DESC_F_USED;
349 		flags |= VRING_DESC_F_AVAIL;
350 	} else {
351 		flags &= ~VRING_DESC_F_USED;
352 		flags &= ~VRING_DESC_F_AVAIL;
353 	}
354 
355 	if (!vq->shadow_used_idx) {
356 		vq->shadow_last_used_idx = vq->last_used_idx;
357 		vq->shadow_used_packed[0].len = 0;
358 		vq->shadow_used_packed[0].flags = flags;
359 		vq->shadow_used_idx++;
360 	}
361 
362 	vq_inc_last_used_packed(vq, count);
363 }
364 
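/*
 * Append the buffers of one packet to the packed shadow used ring, tracking
 * how many descriptors have accumulated since the last flush so that flushes
 * stay aligned with the batch size.
 */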
365 static __rte_always_inline void
366 vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
367 				   uint32_t *len,
368 				   uint16_t *id,
369 				   uint16_t *count,
370 				   uint16_t num_buffers)
371 {
372 	uint16_t i;
373 
374 	for (i = 0; i < num_buffers; i++) {
375 		/* keep the shadow flush boundary aligned with the packed batch size */
376 		if (!vq->shadow_used_idx)
377 			vq->shadow_aligned_idx = vq->last_used_idx &
378 				PACKED_BATCH_MASK;
379 		vq->shadow_used_packed[vq->shadow_used_idx].id  = id[i];
380 		vq->shadow_used_packed[vq->shadow_used_idx].len = len[i];
381 		vq->shadow_used_packed[vq->shadow_used_idx].count = count[i];
382 		vq->shadow_aligned_idx += count[i];
383 		vq->shadow_used_idx++;
384 	}
385 }
386 
387 static __rte_always_inline void
388 vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
389 				   struct vhost_virtqueue *vq,
390 				   uint32_t *len,
391 				   uint16_t *id,
392 				   uint16_t *count,
393 				   uint16_t num_buffers)
394 {
395 	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
396 
397 	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
398 		do_data_copy_enqueue(dev, vq);
399 		vhost_flush_enqueue_shadow_packed(dev, vq);
400 	}
401 }
402 
403 /* avoid a write when the value is already equal, to lessen cache pressure */
404 #define ASSIGN_UNLESS_EQUAL(var, val) do {	\
405 	if ((var) != (val))			\
406 		(var) = (val);			\
407 } while (0)
408 
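/*
 * Translate the mbuf TX offload flags into the virtio-net header: L4
 * checksum offload requests, the IPv4 header checksum (computed in place),
 * and TSO/UFO segmentation parameters.
 */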
409 static __rte_always_inline void
410 virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
411 {
412 	uint64_t csum_l4 = m_buf->ol_flags & RTE_MBUF_F_TX_L4_MASK;
413 
414 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG)
415 		csum_l4 |= RTE_MBUF_F_TX_TCP_CKSUM;
416 
417 	if (csum_l4) {
418 		net_hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
419 		net_hdr->csum_start = m_buf->l2_len + m_buf->l3_len;
420 
421 		switch (csum_l4) {
422 		case RTE_MBUF_F_TX_TCP_CKSUM:
423 			net_hdr->csum_offset = (offsetof(struct rte_tcp_hdr,
424 						cksum));
425 			break;
426 		case RTE_MBUF_F_TX_UDP_CKSUM:
427 			net_hdr->csum_offset = (offsetof(struct rte_udp_hdr,
428 						dgram_cksum));
429 			break;
430 		case RTE_MBUF_F_TX_SCTP_CKSUM:
431 			net_hdr->csum_offset = (offsetof(struct rte_sctp_hdr,
432 						cksum));
433 			break;
434 		}
435 	} else {
436 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_start, 0);
437 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_offset, 0);
438 		ASSIGN_UNLESS_EQUAL(net_hdr->flags, 0);
439 	}
440 
441 	/* The virtio-net header cannot carry an IP checksum offload request, so compute it here */
442 	if (m_buf->ol_flags & RTE_MBUF_F_TX_IP_CKSUM) {
443 		struct rte_ipv4_hdr *ipv4_hdr;
444 
445 		ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
446 						   m_buf->l2_len);
447 		ipv4_hdr->hdr_checksum = 0;
448 		ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
449 	}
450 
451 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG) {
452 		if (m_buf->ol_flags & RTE_MBUF_F_TX_IPV4)
453 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
454 		else
455 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
456 		net_hdr->gso_size = m_buf->tso_segsz;
457 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
458 					+ m_buf->l4_len;
459 	} else if (m_buf->ol_flags & RTE_MBUF_F_TX_UDP_SEG) {
460 		net_hdr->gso_type = VIRTIO_NET_HDR_GSO_UDP;
461 		net_hdr->gso_size = m_buf->tso_segsz;
462 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len +
463 			m_buf->l4_len;
464 	} else {
465 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0);
466 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_size, 0);
467 		ASSIGN_UNLESS_EQUAL(net_hdr->hdr_len, 0);
468 	}
469 }
470 
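/*
 * Map one guest descriptor (IOVA + length) into host virtual address space,
 * adding one buf_vec entry per contiguous chunk returned by the translation.
 * Returns -1 if the address cannot be translated or buf_vec overflows.
 */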
471 static __rte_always_inline int
472 map_one_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
473 		struct buf_vector *buf_vec, uint16_t *vec_idx,
474 		uint64_t desc_iova, uint64_t desc_len, uint8_t perm)
475 {
476 	uint16_t vec_id = *vec_idx;
477 
478 	while (desc_len) {
479 		uint64_t desc_addr;
480 		uint64_t desc_chunck_len = desc_len;
481 
482 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
483 			return -1;
484 
485 		desc_addr = vhost_iova_to_vva(dev, vq,
486 				desc_iova,
487 				&desc_chunck_len,
488 				perm);
489 		if (unlikely(!desc_addr))
490 			return -1;
491 
492 		rte_prefetch0((void *)(uintptr_t)desc_addr);
493 
494 		buf_vec[vec_id].buf_iova = desc_iova;
495 		buf_vec[vec_id].buf_addr = desc_addr;
496 		buf_vec[vec_id].buf_len  = desc_chunck_len;
497 
498 		desc_len -= desc_chunck_len;
499 		desc_iova += desc_chunck_len;
500 		vec_id++;
501 	}
502 	*vec_idx = vec_id;
503 
504 	return 0;
505 }
506 
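/*
 * Walk the split-ring descriptor chain starting at the given avail index,
 * following indirect tables when present, and fill buf_vec with the mapped
 * host addresses. Returns the chain head id and the total chain length.
 */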
507 static __rte_always_inline int
508 fill_vec_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
509 			 uint32_t avail_idx, uint16_t *vec_idx,
510 			 struct buf_vector *buf_vec, uint16_t *desc_chain_head,
511 			 uint32_t *desc_chain_len, uint8_t perm)
512 {
513 	uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
514 	uint16_t vec_id = *vec_idx;
515 	uint32_t len    = 0;
516 	uint64_t dlen;
517 	uint32_t nr_descs = vq->size;
518 	uint32_t cnt    = 0;
519 	struct vring_desc *descs = vq->desc;
520 	struct vring_desc *idesc = NULL;
521 
522 	if (unlikely(idx >= vq->size))
523 		return -1;
524 
525 	*desc_chain_head = idx;
526 
527 	if (vq->desc[idx].flags & VRING_DESC_F_INDIRECT) {
528 		dlen = vq->desc[idx].len;
529 		nr_descs = dlen / sizeof(struct vring_desc);
530 		if (unlikely(nr_descs > vq->size))
531 			return -1;
532 
533 		descs = (struct vring_desc *)(uintptr_t)
534 			vhost_iova_to_vva(dev, vq, vq->desc[idx].addr,
535 						&dlen,
536 						VHOST_ACCESS_RO);
537 		if (unlikely(!descs))
538 			return -1;
539 
540 		if (unlikely(dlen < vq->desc[idx].len)) {
541 			/*
542 			 * The indirect desc table is not contiguous
543 			 * in the process VA space, so we have to copy it.
544 			 */
545 			idesc = vhost_alloc_copy_ind_table(dev, vq,
546 					vq->desc[idx].addr, vq->desc[idx].len);
547 			if (unlikely(!idesc))
548 				return -1;
549 
550 			descs = idesc;
551 		}
552 
553 		idx = 0;
554 	}
555 
556 	while (1) {
557 		if (unlikely(idx >= nr_descs || cnt++ >= nr_descs)) {
558 			free_ind_table(idesc);
559 			return -1;
560 		}
561 
562 		dlen = descs[idx].len;
563 		len += dlen;
564 
565 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
566 						descs[idx].addr, dlen,
567 						perm))) {
568 			free_ind_table(idesc);
569 			return -1;
570 		}
571 
572 		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
573 			break;
574 
575 		idx = descs[idx].next;
576 	}
577 
578 	*desc_chain_len = len;
579 	*vec_idx = vec_id;
580 
581 	if (unlikely(!!idesc))
582 		free_ind_table(idesc);
583 
584 	return 0;
585 }
586 
587 /*
588  * Returns -1 on failure, 0 on success
589  */
590 static inline int
591 reserve_avail_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
592 				uint32_t size, struct buf_vector *buf_vec,
593 				uint16_t *num_buffers, uint16_t avail_head,
594 				uint16_t *nr_vec)
595 {
596 	uint16_t cur_idx;
597 	uint16_t vec_idx = 0;
598 	uint16_t max_tries, tries = 0;
599 
600 	uint16_t head_idx = 0;
601 	uint32_t len = 0;
602 
603 	*num_buffers = 0;
604 	cur_idx  = vq->last_avail_idx;
605 
606 	if (rxvq_is_mergeable(dev))
607 		max_tries = vq->size - 1;
608 	else
609 		max_tries = 1;
610 
611 	while (size > 0) {
612 		if (unlikely(cur_idx == avail_head))
613 			return -1;
614 		/*
615 		 * If we have tried all available ring items and still
616 		 * cannot get enough buffers, something abnormal has
617 		 * happened.
618 		 */
619 		if (unlikely(++tries > max_tries))
620 			return -1;
621 
622 		if (unlikely(fill_vec_buf_split(dev, vq, cur_idx,
623 						&vec_idx, buf_vec,
624 						&head_idx, &len,
625 						VHOST_ACCESS_RW) < 0))
626 			return -1;
627 		len = RTE_MIN(len, size);
628 		update_shadow_used_ring_split(vq, head_idx, len);
629 		size -= len;
630 
631 		cur_idx++;
632 		*num_buffers += 1;
633 	}
634 
635 	*nr_vec = vec_idx;
636 
637 	return 0;
638 }
639 
640 static __rte_always_inline int
641 fill_vec_buf_packed_indirect(struct virtio_net *dev,
642 			struct vhost_virtqueue *vq,
643 			struct vring_packed_desc *desc, uint16_t *vec_idx,
644 			struct buf_vector *buf_vec, uint32_t *len, uint8_t perm)
645 {
646 	uint16_t i;
647 	uint32_t nr_descs;
648 	uint16_t vec_id = *vec_idx;
649 	uint64_t dlen;
650 	struct vring_packed_desc *descs, *idescs = NULL;
651 
652 	dlen = desc->len;
653 	descs = (struct vring_packed_desc *)(uintptr_t)
654 		vhost_iova_to_vva(dev, vq, desc->addr, &dlen, VHOST_ACCESS_RO);
655 	if (unlikely(!descs))
656 		return -1;
657 
658 	if (unlikely(dlen < desc->len)) {
659 		/*
660 		 * The indirect desc table is not contiguous
661 		 * in the process VA space, so we have to copy it.
662 		 */
663 		idescs = vhost_alloc_copy_ind_table(dev,
664 				vq, desc->addr, desc->len);
665 		if (unlikely(!idescs))
666 			return -1;
667 
668 		descs = idescs;
669 	}
670 
671 	nr_descs = desc->len / sizeof(struct vring_packed_desc);
672 	if (unlikely(nr_descs >= vq->size)) {
673 		free_ind_table(idescs);
674 		return -1;
675 	}
676 
677 	for (i = 0; i < nr_descs; i++) {
678 		if (unlikely(vec_id >= BUF_VECTOR_MAX)) {
679 			free_ind_table(idescs);
680 			return -1;
681 		}
682 
683 		dlen = descs[i].len;
684 		*len += dlen;
685 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
686 						descs[i].addr, dlen,
687 						perm)))
688 			return -1;
689 	}
690 	*vec_idx = vec_id;
691 
692 	if (unlikely(!!idescs))
693 		free_ind_table(idescs);
694 
695 	return 0;
696 }
697 
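/*
 * Collect one available descriptor chain from the packed ring, handling
 * indirect descriptors and ring wrap-around, and map it into buf_vec.
 * Returns the number of descriptors consumed, the buffer id and the
 * total length.
 */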
698 static __rte_always_inline int
699 fill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
700 				uint16_t avail_idx, uint16_t *desc_count,
701 				struct buf_vector *buf_vec, uint16_t *vec_idx,
702 				uint16_t *buf_id, uint32_t *len, uint8_t perm)
703 {
704 	bool wrap_counter = vq->avail_wrap_counter;
705 	struct vring_packed_desc *descs = vq->desc_packed;
706 	uint16_t vec_id = *vec_idx;
707 	uint64_t dlen;
708 
709 	if (avail_idx < vq->last_avail_idx)
710 		wrap_counter ^= 1;
711 
712 	/*
713 	 * Perform a load-acquire barrier in desc_is_avail to
714 	 * enforce the ordering between desc flags and desc
715 	 * content.
716 	 */
717 	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)))
718 		return -1;
719 
720 	*desc_count = 0;
721 	*len = 0;
722 
723 	while (1) {
724 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
725 			return -1;
726 
727 		if (unlikely(*desc_count >= vq->size))
728 			return -1;
729 
730 		*desc_count += 1;
731 		*buf_id = descs[avail_idx].id;
732 
733 		if (descs[avail_idx].flags & VRING_DESC_F_INDIRECT) {
734 			if (unlikely(fill_vec_buf_packed_indirect(dev, vq,
735 							&descs[avail_idx],
736 							&vec_id, buf_vec,
737 							len, perm) < 0))
738 				return -1;
739 		} else {
740 			dlen = descs[avail_idx].len;
741 			*len += dlen;
742 
743 			if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
744 							descs[avail_idx].addr,
745 							dlen,
746 							perm)))
747 				return -1;
748 		}
749 
750 		if ((descs[avail_idx].flags & VRING_DESC_F_NEXT) == 0)
751 			break;
752 
753 		if (++avail_idx >= vq->size) {
754 			avail_idx -= vq->size;
755 			wrap_counter ^= 1;
756 		}
757 	}
758 
759 	*vec_idx = vec_id;
760 
761 	return 0;
762 }
763 
764 static __rte_noinline void
765 copy_vnet_hdr_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
766 		struct buf_vector *buf_vec,
767 		struct virtio_net_hdr_mrg_rxbuf *hdr)
768 {
769 	uint64_t len;
770 	uint64_t remain = dev->vhost_hlen;
771 	uint64_t src = (uint64_t)(uintptr_t)hdr, dst;
772 	uint64_t iova = buf_vec->buf_iova;
773 
774 	while (remain) {
775 		len = RTE_MIN(remain,
776 				buf_vec->buf_len);
777 		dst = buf_vec->buf_addr;
778 		rte_memcpy((void *)(uintptr_t)dst,
779 				(void *)(uintptr_t)src,
780 				len);
781 
782 		PRINT_PACKET(dev, (uintptr_t)dst,
783 				(uint32_t)len, 0);
784 		vhost_log_cache_write_iova(dev, vq,
785 				iova, len);
786 
787 		remain -= len;
788 		iova += len;
789 		src += len;
790 		buf_vec++;
791 	}
792 }
793 
794 static __rte_always_inline int
795 async_iter_initialize(struct virtio_net *dev, struct vhost_async *async)
796 {
797 	struct rte_vhost_iov_iter *iter;
798 
799 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
800 		VHOST_LOG_DATA(ERR, "(%s) no more async iovec available\n", dev->ifname);
801 		return -1;
802 	}
803 
804 	iter = async->iov_iter + async->iter_idx;
805 	iter->iov = async->iovec + async->iovec_idx;
806 	iter->nr_segs = 0;
807 
808 	return 0;
809 }
810 
811 static __rte_always_inline int
812 async_iter_add_iovec(struct virtio_net *dev, struct vhost_async *async,
813 		void *src, void *dst, size_t len)
814 {
815 	struct rte_vhost_iov_iter *iter;
816 	struct rte_vhost_iovec *iovec;
817 
818 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
819 		static bool vhost_max_async_vec_log;
820 
821 		if (!vhost_max_async_vec_log) {
822 			VHOST_LOG_DATA(ERR, "(%s) no more async iovec available\n", dev->ifname);
823 			vhost_max_async_vec_log = true;
824 		}
825 
826 		return -1;
827 	}
828 
829 	iter = async->iov_iter + async->iter_idx;
830 	iovec = async->iovec + async->iovec_idx;
831 
832 	iovec->src_addr = src;
833 	iovec->dst_addr = dst;
834 	iovec->len = len;
835 
836 	iter->nr_segs++;
837 	async->iovec_idx++;
838 
839 	return 0;
840 }
841 
842 static __rte_always_inline void
843 async_iter_finalize(struct vhost_async *async)
844 {
845 	async->iter_idx++;
846 }
847 
848 static __rte_always_inline void
849 async_iter_cancel(struct vhost_async *async)
850 {
851 	struct rte_vhost_iov_iter *iter;
852 
853 	iter = async->iov_iter + async->iter_idx;
854 	async->iovec_idx -= iter->nr_segs;
855 	iter->nr_segs = 0;
856 	iter->iov = NULL;
857 }
858 
859 static __rte_always_inline void
860 async_iter_reset(struct vhost_async *async)
861 {
862 	async->iter_idx = 0;
863 	async->iovec_idx = 0;
864 }
865 
866 static __rte_always_inline int
867 async_mbuf_to_desc_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
868 		struct rte_mbuf *m, uint32_t mbuf_offset,
869 		uint64_t buf_iova, uint32_t cpy_len)
870 {
871 	struct vhost_async *async = vq->async;
872 	uint64_t mapped_len;
873 	uint32_t buf_offset = 0;
874 	void *hpa;
875 
876 	while (cpy_len) {
877 		hpa = (void *)(uintptr_t)gpa_to_first_hpa(dev,
878 				buf_iova + buf_offset, cpy_len, &mapped_len);
879 		if (unlikely(!hpa)) {
880 			VHOST_LOG_DATA(ERR, "(%s) %s: failed to get hpa.\n", dev->ifname, __func__);
881 			return -1;
882 		}
883 
884 		if (unlikely(async_iter_add_iovec(dev, async,
885 						(void *)(uintptr_t)rte_pktmbuf_iova_offset(m,
886 							mbuf_offset),
887 						hpa, (size_t)mapped_len)))
888 			return -1;
889 
890 		cpy_len -= (uint32_t)mapped_len;
891 		mbuf_offset += (uint32_t)mapped_len;
892 		buf_offset += (uint32_t)mapped_len;
893 	}
894 
895 	return 0;
896 }
897 
898 static __rte_always_inline void
899 sync_mbuf_to_desc_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
900 		struct rte_mbuf *m, uint32_t mbuf_offset,
901 		uint64_t buf_addr, uint64_t buf_iova, uint32_t cpy_len)
902 {
903 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
904 
905 	if (likely(cpy_len > MAX_BATCH_LEN || vq->batch_copy_nb_elems >= vq->size)) {
906 		rte_memcpy((void *)((uintptr_t)(buf_addr)),
907 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
908 				cpy_len);
909 		vhost_log_cache_write_iova(dev, vq, buf_iova, cpy_len);
910 		PRINT_PACKET(dev, (uintptr_t)(buf_addr), cpy_len, 0);
911 	} else {
912 		batch_copy[vq->batch_copy_nb_elems].dst =
913 			(void *)((uintptr_t)(buf_addr));
914 		batch_copy[vq->batch_copy_nb_elems].src =
915 			rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
916 		batch_copy[vq->batch_copy_nb_elems].log_addr = buf_iova;
917 		batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
918 		vq->batch_copy_nb_elems++;
919 	}
920 }
921 
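/*
 * Copy an mbuf chain into the guest buffers described by buf_vec. The
 * virtio-net header is written into the first buffer (or through a bounce
 * buffer when the first buffer is too small). In async mode the data
 * copies are not performed here: they are recorded as iovec segments for
 * the async copy engine instead.
 */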
922 static __rte_always_inline int
923 mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
924 		struct rte_mbuf *m, struct buf_vector *buf_vec,
925 		uint16_t nr_vec, uint16_t num_buffers, bool is_async)
926 {
927 	uint32_t vec_idx = 0;
928 	uint32_t mbuf_offset, mbuf_avail;
929 	uint32_t buf_offset, buf_avail;
930 	uint64_t buf_addr, buf_iova, buf_len;
931 	uint32_t cpy_len;
932 	uint64_t hdr_addr;
933 	struct rte_mbuf *hdr_mbuf;
934 	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
935 	struct vhost_async *async = vq->async;
936 
937 	if (unlikely(m == NULL))
938 		return -1;
939 
940 	buf_addr = buf_vec[vec_idx].buf_addr;
941 	buf_iova = buf_vec[vec_idx].buf_iova;
942 	buf_len = buf_vec[vec_idx].buf_len;
943 
944 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
945 		return -1;
946 
947 	hdr_mbuf = m;
948 	hdr_addr = buf_addr;
949 	if (unlikely(buf_len < dev->vhost_hlen)) {
950 		memset(&tmp_hdr, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
951 		hdr = &tmp_hdr;
952 	} else
953 		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
954 
955 	VHOST_LOG_DATA(DEBUG, "(%s) RX: num merge buffers %d\n",
956 		dev->ifname, num_buffers);
957 
958 	if (unlikely(buf_len < dev->vhost_hlen)) {
959 		buf_offset = dev->vhost_hlen - buf_len;
960 		vec_idx++;
961 		buf_addr = buf_vec[vec_idx].buf_addr;
962 		buf_iova = buf_vec[vec_idx].buf_iova;
963 		buf_len = buf_vec[vec_idx].buf_len;
964 		buf_avail = buf_len - buf_offset;
965 	} else {
966 		buf_offset = dev->vhost_hlen;
967 		buf_avail = buf_len - dev->vhost_hlen;
968 	}
969 
970 	mbuf_avail  = rte_pktmbuf_data_len(m);
971 	mbuf_offset = 0;
972 
973 	if (is_async) {
974 		if (async_iter_initialize(dev, async))
975 			return -1;
976 	}
977 
978 	while (mbuf_avail != 0 || m->next != NULL) {
979 		/* done with current buf, get the next one */
980 		if (buf_avail == 0) {
981 			vec_idx++;
982 			if (unlikely(vec_idx >= nr_vec))
983 				goto error;
984 
985 			buf_addr = buf_vec[vec_idx].buf_addr;
986 			buf_iova = buf_vec[vec_idx].buf_iova;
987 			buf_len = buf_vec[vec_idx].buf_len;
988 
989 			buf_offset = 0;
990 			buf_avail  = buf_len;
991 		}
992 
993 		/* done with current mbuf, get the next one */
994 		if (mbuf_avail == 0) {
995 			m = m->next;
996 
997 			mbuf_offset = 0;
998 			mbuf_avail  = rte_pktmbuf_data_len(m);
999 		}
1000 
1001 		if (hdr_addr) {
1002 			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
1003 			if (rxvq_is_mergeable(dev))
1004 				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
1005 						num_buffers);
1006 
1007 			if (unlikely(hdr == &tmp_hdr)) {
1008 				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
1009 			} else {
1010 				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
1011 						dev->vhost_hlen, 0);
1012 				vhost_log_cache_write_iova(dev, vq,
1013 						buf_vec[0].buf_iova,
1014 						dev->vhost_hlen);
1015 			}
1016 
1017 			hdr_addr = 0;
1018 		}
1019 
1020 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
1021 
1022 		if (is_async) {
1023 			if (async_mbuf_to_desc_seg(dev, vq, m, mbuf_offset,
1024 						buf_iova + buf_offset, cpy_len) < 0)
1025 				goto error;
1026 		} else {
1027 			sync_mbuf_to_desc_seg(dev, vq, m, mbuf_offset,
1028 					buf_addr + buf_offset,
1029 					buf_iova + buf_offset, cpy_len);
1030 		}
1031 
1032 		mbuf_avail  -= cpy_len;
1033 		mbuf_offset += cpy_len;
1034 		buf_avail  -= cpy_len;
1035 		buf_offset += cpy_len;
1036 	}
1037 
1038 	if (is_async)
1039 		async_iter_finalize(async);
1040 
1041 	return 0;
1042 error:
1043 	if (is_async)
1044 		async_iter_cancel(async);
1045 
1046 	return -1;
1047 }
1048 
1049 static __rte_always_inline int
1050 vhost_enqueue_single_packed(struct virtio_net *dev,
1051 			    struct vhost_virtqueue *vq,
1052 			    struct rte_mbuf *pkt,
1053 			    struct buf_vector *buf_vec,
1054 			    uint16_t *nr_descs)
1055 {
1056 	uint16_t nr_vec = 0;
1057 	uint16_t avail_idx = vq->last_avail_idx;
1058 	uint16_t max_tries, tries = 0;
1059 	uint16_t buf_id = 0;
1060 	uint32_t len = 0;
1061 	uint16_t desc_count;
1062 	uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1063 	uint16_t num_buffers = 0;
1064 	uint32_t buffer_len[vq->size];
1065 	uint16_t buffer_buf_id[vq->size];
1066 	uint16_t buffer_desc_count[vq->size];
1067 
1068 	if (rxvq_is_mergeable(dev))
1069 		max_tries = vq->size - 1;
1070 	else
1071 		max_tries = 1;
1072 
1073 	while (size > 0) {
1074 		/*
1075 		 * If we have tried all available ring items and still
1076 		 * cannot get enough buffers, something abnormal has
1077 		 * happened.
1078 		 */
1079 		if (unlikely(++tries > max_tries))
1080 			return -1;
1081 
1082 		if (unlikely(fill_vec_buf_packed(dev, vq,
1083 						avail_idx, &desc_count,
1084 						buf_vec, &nr_vec,
1085 						&buf_id, &len,
1086 						VHOST_ACCESS_RW) < 0))
1087 			return -1;
1088 
1089 		len = RTE_MIN(len, size);
1090 		size -= len;
1091 
1092 		buffer_len[num_buffers] = len;
1093 		buffer_buf_id[num_buffers] = buf_id;
1094 		buffer_desc_count[num_buffers] = desc_count;
1095 		num_buffers += 1;
1096 
1097 		*nr_descs += desc_count;
1098 		avail_idx += desc_count;
1099 		if (avail_idx >= vq->size)
1100 			avail_idx -= vq->size;
1101 	}
1102 
1103 	if (mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, num_buffers, false) < 0)
1104 		return -1;
1105 
1106 	vhost_shadow_enqueue_single_packed(dev, vq, buffer_len, buffer_buf_id,
1107 					   buffer_desc_count, num_buffers);
1108 
1109 	return 0;
1110 }
1111 
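/*
 * Enqueue a burst of packets into a split virtqueue: reserve enough
 * descriptors for each packet, copy the data, then flush the shadow used
 * ring and kick the guest if needed.
 */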
1112 static __rte_noinline uint32_t
1113 virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1114 	struct rte_mbuf **pkts, uint32_t count)
1115 {
1116 	uint32_t pkt_idx = 0;
1117 	uint16_t num_buffers;
1118 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1119 	uint16_t avail_head;
1120 
1121 	/*
1122 	 * The ordering between avail index and
1123 	 * desc reads needs to be enforced.
1124 	 */
1125 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
1126 
1127 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1128 
1129 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1130 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1131 		uint16_t nr_vec = 0;
1132 
1133 		if (unlikely(reserve_avail_buf_split(dev, vq,
1134 						pkt_len, buf_vec, &num_buffers,
1135 						avail_head, &nr_vec) < 0)) {
1136 			VHOST_LOG_DATA(DEBUG,
1137 				"(%s) failed to get enough desc from vring\n",
1138 				dev->ifname);
1139 			vq->shadow_used_idx -= num_buffers;
1140 			break;
1141 		}
1142 
1143 		VHOST_LOG_DATA(DEBUG, "(%s) current index %d | end index %d\n",
1144 			dev->ifname, vq->last_avail_idx,
1145 			vq->last_avail_idx + num_buffers);
1146 
1147 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec,
1148 					num_buffers, false) < 0) {
1149 			vq->shadow_used_idx -= num_buffers;
1150 			break;
1151 		}
1152 
1153 		vq->last_avail_idx += num_buffers;
1154 	}
1155 
1156 	do_data_copy_enqueue(dev, vq);
1157 
1158 	if (likely(vq->shadow_used_idx)) {
1159 		flush_shadow_used_ring_split(dev, vq);
1160 		vhost_vring_call_split(dev, vq);
1161 	}
1162 
1163 	return pkt_idx;
1164 }
1165 
1166 static __rte_always_inline int
1167 virtio_dev_rx_sync_batch_check(struct virtio_net *dev,
1168 			   struct vhost_virtqueue *vq,
1169 			   struct rte_mbuf **pkts,
1170 			   uint64_t *desc_addrs,
1171 			   uint64_t *lens)
1172 {
1173 	bool wrap_counter = vq->avail_wrap_counter;
1174 	struct vring_packed_desc *descs = vq->desc_packed;
1175 	uint16_t avail_idx = vq->last_avail_idx;
1176 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1177 	uint16_t i;
1178 
1179 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1180 		return -1;
1181 
1182 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1183 		return -1;
1184 
1185 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1186 		if (unlikely(pkts[i]->next != NULL))
1187 			return -1;
1188 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1189 					    wrap_counter)))
1190 			return -1;
1191 	}
1192 
1193 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1194 		lens[i] = descs[avail_idx + i].len;
1195 
1196 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1197 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1198 			return -1;
1199 	}
1200 
1201 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1202 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
1203 						  descs[avail_idx + i].addr,
1204 						  &lens[i],
1205 						  VHOST_ACCESS_RW);
1206 
1207 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1208 		if (unlikely(!desc_addrs[i]))
1209 			return -1;
1210 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1211 			return -1;
1212 	}
1213 
1214 	return 0;
1215 }
1216 
1217 static __rte_always_inline void
1218 virtio_dev_rx_batch_packed_copy(struct virtio_net *dev,
1219 			   struct vhost_virtqueue *vq,
1220 			   struct rte_mbuf **pkts,
1221 			   uint64_t *desc_addrs,
1222 			   uint64_t *lens)
1223 {
1224 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1225 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
1226 	struct vring_packed_desc *descs = vq->desc_packed;
1227 	uint16_t avail_idx = vq->last_avail_idx;
1228 	uint16_t ids[PACKED_BATCH_SIZE];
1229 	uint16_t i;
1230 
1231 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1232 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
1233 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
1234 					(uintptr_t)desc_addrs[i];
1235 		lens[i] = pkts[i]->pkt_len +
1236 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
1237 	}
1238 
1239 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1240 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
1241 
1242 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
1243 
1244 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1245 		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
1246 			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
1247 			   pkts[i]->pkt_len);
1248 	}
1249 
1250 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1251 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr,
1252 					   lens[i]);
1253 
1254 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1255 		ids[i] = descs[avail_idx + i].id;
1256 
1257 	vhost_flush_enqueue_batch_packed(dev, vq, lens, ids);
1258 }
1259 
1260 static __rte_always_inline int
1261 virtio_dev_rx_sync_batch_packed(struct virtio_net *dev,
1262 			   struct vhost_virtqueue *vq,
1263 			   struct rte_mbuf **pkts)
1264 {
1265 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
1266 	uint64_t lens[PACKED_BATCH_SIZE];
1267 
1268 	if (virtio_dev_rx_sync_batch_check(dev, vq, pkts, desc_addrs, lens) == -1)
1269 		return -1;
1270 
1271 	if (vq->shadow_used_idx) {
1272 		do_data_copy_enqueue(dev, vq);
1273 		vhost_flush_enqueue_shadow_packed(dev, vq);
1274 	}
1275 
1276 	virtio_dev_rx_batch_packed_copy(dev, vq, pkts, desc_addrs, lens);
1277 
1278 	return 0;
1279 }
1280 
1281 static __rte_always_inline int16_t
1282 virtio_dev_rx_single_packed(struct virtio_net *dev,
1283 			    struct vhost_virtqueue *vq,
1284 			    struct rte_mbuf *pkt)
1285 {
1286 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1287 	uint16_t nr_descs = 0;
1288 
1289 	if (unlikely(vhost_enqueue_single_packed(dev, vq, pkt, buf_vec,
1290 						 &nr_descs) < 0)) {
1291 		VHOST_LOG_DATA(DEBUG, "(%s) failed to get enough desc from vring\n",
1292 				dev->ifname);
1293 		return -1;
1294 	}
1295 
1296 	VHOST_LOG_DATA(DEBUG, "(%s) current index %d | end index %d\n",
1297 			dev->ifname, vq->last_avail_idx,
1298 			vq->last_avail_idx + nr_descs);
1299 
1300 	vq_inc_last_avail_packed(vq, nr_descs);
1301 
1302 	return 0;
1303 }
1304 
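/*
 * Enqueue a burst of packets into a packed virtqueue, using the batched
 * fast path when a full batch of descriptors is available and falling
 * back to the single-packet path otherwise.
 */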
1305 static __rte_noinline uint32_t
1306 virtio_dev_rx_packed(struct virtio_net *dev,
1307 		     struct vhost_virtqueue *__rte_restrict vq,
1308 		     struct rte_mbuf **__rte_restrict pkts,
1309 		     uint32_t count)
1310 {
1311 	uint32_t pkt_idx = 0;
1312 
1313 	do {
1314 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1315 
1316 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
1317 			if (!virtio_dev_rx_sync_batch_packed(dev, vq,
1318 							&pkts[pkt_idx])) {
1319 				pkt_idx += PACKED_BATCH_SIZE;
1320 				continue;
1321 			}
1322 		}
1323 
1324 		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]))
1325 			break;
1326 		pkt_idx++;
1327 
1328 	} while (pkt_idx < count);
1329 
1330 	if (vq->shadow_used_idx) {
1331 		do_data_copy_enqueue(dev, vq);
1332 		vhost_flush_enqueue_shadow_packed(dev, vq);
1333 	}
1334 
1335 	if (pkt_idx)
1336 		vhost_vring_call_packed(dev, vq);
1337 
1338 	return pkt_idx;
1339 }
1340 
1341 static __rte_always_inline uint32_t
1342 virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
1343 	struct rte_mbuf **pkts, uint32_t count)
1344 {
1345 	struct vhost_virtqueue *vq;
1346 	uint32_t nb_tx = 0;
1347 
1348 	VHOST_LOG_DATA(DEBUG, "(%s) %s\n", dev->ifname, __func__);
1349 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1350 		VHOST_LOG_DATA(ERR, "(%s) %s: invalid virtqueue idx %d.\n",
1351 			dev->ifname, __func__, queue_id);
1352 		return 0;
1353 	}
1354 
1355 	vq = dev->virtqueue[queue_id];
1356 
1357 	rte_spinlock_lock(&vq->access_lock);
1358 
1359 	if (unlikely(!vq->enabled))
1360 		goto out_access_unlock;
1361 
1362 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1363 		vhost_user_iotlb_rd_lock(vq);
1364 
1365 	if (unlikely(!vq->access_ok))
1366 		if (unlikely(vring_translate(dev, vq) < 0))
1367 			goto out;
1368 
1369 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1370 	if (count == 0)
1371 		goto out;
1372 
1373 	if (vq_is_packed(dev))
1374 		nb_tx = virtio_dev_rx_packed(dev, vq, pkts, count);
1375 	else
1376 		nb_tx = virtio_dev_rx_split(dev, vq, pkts, count);
1377 
1378 out:
1379 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1380 		vhost_user_iotlb_rd_unlock(vq);
1381 
1382 out_access_unlock:
1383 	rte_spinlock_unlock(&vq->access_lock);
1384 
1385 	return nb_tx;
1386 }
1387 
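/*
 * Public synchronous enqueue entry point. A minimal usage sketch
 * (illustrative only; "vid", "pkts" and "nb_pkts" come from the
 * application, and virtqueue index 0 is assumed to be the guest's
 * first RX queue):
 *
 *	uint16_t sent = rte_vhost_enqueue_burst(vid, 0, pkts, nb_pkts);
 *	rte_pktmbuf_free_bulk(pkts, sent);  // data was copied, mbufs can go
 *
 * Packets not enqueued (sent < nb_pkts) remain owned by the caller.
 */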
1388 uint16_t
1389 rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
1390 	struct rte_mbuf **__rte_restrict pkts, uint16_t count)
1391 {
1392 	struct virtio_net *dev = get_device(vid);
1393 
1394 	if (!dev)
1395 		return 0;
1396 
1397 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
1398 		VHOST_LOG_DATA(ERR, "(%s) %s: built-in vhost net backend is disabled.\n",
1399 			dev->ifname, __func__);
1400 		return 0;
1401 	}
1402 
1403 	return virtio_dev_rx(dev, queue_id, pkts, count);
1404 }
1405 
1406 static __rte_always_inline uint16_t
1407 async_get_first_inflight_pkt_idx(struct vhost_virtqueue *vq)
1408 {
1409 	struct vhost_async *async = vq->async;
1410 
1411 	if (async->pkts_idx >= async->pkts_inflight_n)
1412 		return async->pkts_idx - async->pkts_inflight_n;
1413 	else
1414 		return vq->size - async->pkts_inflight_n + async->pkts_idx;
1415 }
1416 
1417 static __rte_always_inline void
1418 store_dma_desc_info_split(struct vring_used_elem *s_ring, struct vring_used_elem *d_ring,
1419 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1420 {
1421 	size_t elem_size = sizeof(struct vring_used_elem);
1422 
1423 	if (d_idx + count <= ring_size) {
1424 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1425 	} else {
1426 		uint16_t size = ring_size - d_idx;
1427 
1428 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1429 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1430 	}
1431 }
1432 
1433 static __rte_always_inline void
1434 store_dma_desc_info_packed(struct vring_used_elem_packed *s_ring,
1435 		struct vring_used_elem_packed *d_ring,
1436 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1437 {
1438 	size_t elem_size = sizeof(struct vring_used_elem_packed);
1439 
1440 	if (d_idx + count <= ring_size) {
1441 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1442 	} else {
1443 		uint16_t size = ring_size - d_idx;
1444 
1445 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1446 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1447 	}
1448 }
1449 
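/*
 * Async enqueue for a split virtqueue: reserve descriptors and build iovec
 * segments for each packet, hand the whole batch to the async copy engine
 * via transfer_data, roll back descriptors for packets the engine refused,
 * and stash the used-ring entries until the copies complete.
 */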
1450 static __rte_noinline uint32_t
1451 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
1452 	struct vhost_virtqueue *vq, uint16_t queue_id,
1453 	struct rte_mbuf **pkts, uint32_t count)
1454 {
1455 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1456 	uint32_t pkt_idx = 0;
1457 	uint16_t num_buffers;
1458 	uint16_t avail_head;
1459 
1460 	struct vhost_async *async = vq->async;
1461 	struct async_inflight_info *pkts_info = async->pkts_info;
1462 	uint32_t pkt_err = 0;
1463 	int32_t n_xfer;
1464 	uint16_t slot_idx = 0;
1465 
1466 	/*
1467 	 * The ordering between avail index and desc reads needs to be enforced.
1468 	 */
1469 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
1470 
1471 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1472 
1473 	async_iter_reset(async);
1474 
1475 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1476 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1477 		uint16_t nr_vec = 0;
1478 
1479 		if (unlikely(reserve_avail_buf_split(dev, vq, pkt_len, buf_vec,
1480 						&num_buffers, avail_head, &nr_vec) < 0)) {
1481 			VHOST_LOG_DATA(DEBUG, "(%s) failed to get enough desc from vring\n",
1482 					dev->ifname);
1483 			vq->shadow_used_idx -= num_buffers;
1484 			break;
1485 		}
1486 
1487 		VHOST_LOG_DATA(DEBUG, "(%s) current index %d | end index %d\n",
1488 			dev->ifname, vq->last_avail_idx, vq->last_avail_idx + num_buffers);
1489 
1490 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec, num_buffers, true) < 0) {
1491 			vq->shadow_used_idx -= num_buffers;
1492 			break;
1493 		}
1494 
1495 		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
1496 		pkts_info[slot_idx].descs = num_buffers;
1497 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1498 
1499 		vq->last_avail_idx += num_buffers;
1500 	}
1501 
1502 	if (unlikely(pkt_idx == 0))
1503 		return 0;
1504 
1505 	n_xfer = async->ops.transfer_data(dev->vid, queue_id, async->iov_iter, 0, pkt_idx);
1506 	if (unlikely(n_xfer < 0)) {
1507 		VHOST_LOG_DATA(ERR, "(%s) %s: failed to transfer data for queue id %d.\n",
1508 				dev->ifname, __func__, queue_id);
1509 		n_xfer = 0;
1510 	}
1511 
1512 	pkt_err = pkt_idx - n_xfer;
1513 	if (unlikely(pkt_err)) {
1514 		uint16_t num_descs = 0;
1515 
1516 		/* update number of completed packets */
1517 		pkt_idx = n_xfer;
1518 
1519 		/* calculate the sum of descriptors to revert */
1520 		while (pkt_err-- > 0) {
1521 			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
1522 			slot_idx--;
1523 		}
1524 
1525 		/* recover shadow used ring and available ring */
1526 		vq->shadow_used_idx -= num_descs;
1527 		vq->last_avail_idx -= num_descs;
1528 	}
1529 
1530 	/* keep used descriptors */
1531 	if (likely(vq->shadow_used_idx)) {
1532 		uint16_t to = async->desc_idx_split & (vq->size - 1);
1533 
1534 		store_dma_desc_info_split(vq->shadow_used_split,
1535 				async->descs_split, vq->size, 0, to,
1536 				vq->shadow_used_idx);
1537 
1538 		async->desc_idx_split += vq->shadow_used_idx;
1539 
1540 		async->pkts_idx += pkt_idx;
1541 		if (async->pkts_idx >= vq->size)
1542 			async->pkts_idx -= vq->size;
1543 
1544 		async->pkts_inflight_n += pkt_idx;
1545 		vq->shadow_used_idx = 0;
1546 	}
1547 
1548 	return pkt_idx;
1549 }
1550 
1552 static __rte_always_inline int
1553 vhost_enqueue_async_packed(struct virtio_net *dev,
1554 			    struct vhost_virtqueue *vq,
1555 			    struct rte_mbuf *pkt,
1556 			    struct buf_vector *buf_vec,
1557 			    uint16_t *nr_descs,
1558 			    uint16_t *nr_buffers)
1559 {
1560 	uint16_t nr_vec = 0;
1561 	uint16_t avail_idx = vq->last_avail_idx;
1562 	uint16_t max_tries, tries = 0;
1563 	uint16_t buf_id = 0;
1564 	uint32_t len = 0;
1565 	uint16_t desc_count = 0;
1566 	uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1567 	uint32_t buffer_len[vq->size];
1568 	uint16_t buffer_buf_id[vq->size];
1569 	uint16_t buffer_desc_count[vq->size];
1570 
1571 	if (rxvq_is_mergeable(dev))
1572 		max_tries = vq->size - 1;
1573 	else
1574 		max_tries = 1;
1575 
1576 	while (size > 0) {
1577 		/*
1578 		 * If we have tried all available ring items and still
1579 		 * cannot get enough buffers, something abnormal has
1580 		 * happened.
1581 		 */
1582 		if (unlikely(++tries > max_tries))
1583 			return -1;
1584 
1585 		if (unlikely(fill_vec_buf_packed(dev, vq,
1586 						avail_idx, &desc_count,
1587 						buf_vec, &nr_vec,
1588 						&buf_id, &len,
1589 						VHOST_ACCESS_RW) < 0))
1590 			return -1;
1591 
1592 		len = RTE_MIN(len, size);
1593 		size -= len;
1594 
1595 		buffer_len[*nr_buffers] = len;
1596 		buffer_buf_id[*nr_buffers] = buf_id;
1597 		buffer_desc_count[*nr_buffers] = desc_count;
1598 		*nr_buffers += 1;
1599 		*nr_descs += desc_count;
1600 		avail_idx += desc_count;
1601 		if (avail_idx >= vq->size)
1602 			avail_idx -= vq->size;
1603 	}
1604 
1605 	if (unlikely(mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers, true) < 0))
1606 		return -1;
1607 
1608 	vhost_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id, buffer_desc_count, *nr_buffers);
1609 
1610 	return 0;
1611 }
1612 
1613 static __rte_always_inline int16_t
1614 virtio_dev_rx_async_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
1615 			    struct rte_mbuf *pkt, uint16_t *nr_descs, uint16_t *nr_buffers)
1616 {
1617 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1618 
1619 	if (unlikely(vhost_enqueue_async_packed(dev, vq, pkt, buf_vec,
1620 					nr_descs, nr_buffers) < 0)) {
1621 		VHOST_LOG_DATA(DEBUG, "(%s) failed to get enough desc from vring\n", dev->ifname);
1622 		return -1;
1623 	}
1624 
1625 	VHOST_LOG_DATA(DEBUG, "(%s) current index %d | end index %d\n",
1626 			dev->ifname, vq->last_avail_idx, vq->last_avail_idx + *nr_descs);
1627 
1628 	return 0;
1629 }
1630 
1631 static __rte_always_inline void
1632 dma_error_handler_packed(struct vhost_virtqueue *vq, uint16_t slot_idx,
1633 			uint32_t nr_err, uint32_t *pkt_idx)
1634 {
1635 	uint16_t descs_err = 0;
1636 	uint16_t buffers_err = 0;
1637 	struct async_inflight_info *pkts_info = vq->async->pkts_info;
1638 
1639 	*pkt_idx -= nr_err;
1640 	/* calculate the sum of buffers and descs of DMA-error packets. */
1641 	while (nr_err-- > 0) {
1642 		descs_err += pkts_info[slot_idx % vq->size].descs;
1643 		buffers_err += pkts_info[slot_idx % vq->size].nr_buffers;
1644 		slot_idx--;
1645 	}
1646 
1647 	if (vq->last_avail_idx >= descs_err) {
1648 		vq->last_avail_idx -= descs_err;
1649 	} else {
1650 		vq->last_avail_idx = vq->last_avail_idx + vq->size - descs_err;
1651 		vq->avail_wrap_counter ^= 1;
1652 	}
1653 
1654 	vq->shadow_used_idx -= buffers_err;
1655 }
1656 
1657 static __rte_noinline uint32_t
1658 virtio_dev_rx_async_submit_packed(struct virtio_net *dev,
1659 	struct vhost_virtqueue *vq, uint16_t queue_id,
1660 	struct rte_mbuf **pkts, uint32_t count)
1661 {
1662 	uint32_t pkt_idx = 0;
1663 	uint32_t remained = count;
1664 	int32_t n_xfer;
1665 	uint16_t num_buffers;
1666 	uint16_t num_descs;
1667 
1668 	struct vhost_async *async = vq->async;
1669 	struct async_inflight_info *pkts_info = async->pkts_info;
1670 	uint32_t pkt_err = 0;
1671 	uint16_t slot_idx = 0;
1672 
1673 	do {
1674 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1675 
1676 		num_buffers = 0;
1677 		num_descs = 0;
1678 		if (unlikely(virtio_dev_rx_async_packed(dev, vq, pkts[pkt_idx],
1679 						&num_descs, &num_buffers) < 0))
1680 			break;
1681 
1682 		slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
1683 
1684 		pkts_info[slot_idx].descs = num_descs;
1685 		pkts_info[slot_idx].nr_buffers = num_buffers;
1686 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1687 
1688 		pkt_idx++;
1689 		remained--;
1690 		vq_inc_last_avail_packed(vq, num_descs);
1691 	} while (pkt_idx < count);
1692 
1693 	if (unlikely(pkt_idx == 0))
1694 		return 0;
1695 
1696 	n_xfer = async->ops.transfer_data(dev->vid, queue_id, async->iov_iter, 0, pkt_idx);
1697 	if (unlikely(n_xfer < 0)) {
1698 		VHOST_LOG_DATA(ERR, "(%s) %s: failed to transfer data for queue id %d.\n",
1699 				dev->ifname, __func__, queue_id);
1700 		n_xfer = 0;
1701 	}
1702 
1703 	pkt_err = pkt_idx - n_xfer;
1704 
1705 	async_iter_reset(async);
1706 
1707 	if (unlikely(pkt_err))
1708 		dma_error_handler_packed(vq, slot_idx, pkt_err, &pkt_idx);
1709 
1710 	if (likely(vq->shadow_used_idx)) {
1711 		/* keep used descriptors. */
1712 		store_dma_desc_info_packed(vq->shadow_used_packed, async->buffers_packed,
1713 					vq->size, 0, async->buffer_idx_packed,
1714 					vq->shadow_used_idx);
1715 
1716 		async->buffer_idx_packed += vq->shadow_used_idx;
1717 		if (async->buffer_idx_packed >= vq->size)
1718 			async->buffer_idx_packed -= vq->size;
1719 
1720 		async->pkts_idx += pkt_idx;
1721 		if (async->pkts_idx >= vq->size)
1722 			async->pkts_idx -= vq->size;
1723 
1724 		vq->shadow_used_idx = 0;
1725 		async->pkts_inflight_n += pkt_idx;
1726 	}
1727 
1728 	return pkt_idx;
1729 }
1730 
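/*
 * Write back to the split used ring the descriptors whose async copies
 * have completed, copying from the saved shadow entries and handling
 * wrap-around of both rings.
 */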
1731 static __rte_always_inline void
1732 write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
1733 {
1734 	struct vhost_async *async = vq->async;
1735 	uint16_t nr_left = n_descs;
1736 	uint16_t nr_copy;
1737 	uint16_t to, from;
1738 
1739 	do {
1740 		from = async->last_desc_idx_split & (vq->size - 1);
1741 		nr_copy = nr_left + from <= vq->size ? nr_left : vq->size - from;
1742 		to = vq->last_used_idx & (vq->size - 1);
1743 
1744 		if (to + nr_copy <= vq->size) {
1745 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
1746 					nr_copy * sizeof(struct vring_used_elem));
1747 		} else {
1748 			uint16_t size = vq->size - to;
1749 
1750 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
1751 					size * sizeof(struct vring_used_elem));
1752 			rte_memcpy(&vq->used->ring[0], &async->descs_split[from + size],
1753 					(nr_copy - size) * sizeof(struct vring_used_elem));
1754 		}
1755 
1756 		async->last_desc_idx_split += nr_copy;
1757 		vq->last_used_idx += nr_copy;
1758 		nr_left -= nr_copy;
1759 	} while (nr_left > 0);
1760 }
1761 
1762 static __rte_always_inline void
1763 write_back_completed_descs_packed(struct vhost_virtqueue *vq,
1764 				uint16_t n_buffers)
1765 {
1766 	struct vhost_async *async = vq->async;
1767 	uint16_t from = async->last_buffer_idx_packed;
1768 	uint16_t used_idx = vq->last_used_idx;
1769 	uint16_t head_idx = vq->last_used_idx;
1770 	uint16_t head_flags = 0;
1771 	uint16_t i;
1772 
1773 	/* Split loop in two to save memory barriers */
1774 	for (i = 0; i < n_buffers; i++) {
1775 		vq->desc_packed[used_idx].id = async->buffers_packed[from].id;
1776 		vq->desc_packed[used_idx].len = async->buffers_packed[from].len;
1777 
1778 		used_idx += async->buffers_packed[from].count;
1779 		if (used_idx >= vq->size)
1780 			used_idx -= vq->size;
1781 
1782 		from++;
1783 		if (from >= vq->size)
1784 			from = 0;
1785 	}
1786 
1787 	/* The ordering for storing desc flags needs to be enforced. */
1788 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
1789 
1790 	from = async->last_buffer_idx_packed;
1791 
1792 	for (i = 0; i < n_buffers; i++) {
1793 		uint16_t flags;
1794 
1795 		if (async->buffers_packed[from].len)
1796 			flags = VRING_DESC_F_WRITE;
1797 		else
1798 			flags = 0;
1799 
1800 		if (vq->used_wrap_counter) {
1801 			flags |= VRING_DESC_F_USED;
1802 			flags |= VRING_DESC_F_AVAIL;
1803 		} else {
1804 			flags &= ~VRING_DESC_F_USED;
1805 			flags &= ~VRING_DESC_F_AVAIL;
1806 		}
1807 
1808 		if (i > 0) {
1809 			vq->desc_packed[vq->last_used_idx].flags = flags;
1810 		} else {
1811 			head_idx = vq->last_used_idx;
1812 			head_flags = flags;
1813 		}
1814 
1815 		vq_inc_last_used_packed(vq, async->buffers_packed[from].count);
1816 
1817 		from++;
1818 		if (from == vq->size)
1819 			from = 0;
1820 	}
1821 
1822 	vq->desc_packed[head_idx].flags = head_flags;
1823 	async->last_buffer_idx_packed = from;
1824 }
1825 
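/*
 * Poll the async channel for completed enqueue copies, return the
 * corresponding mbufs to the caller and write the completed descriptors
 * back to the used ring (or only account for them when the ring is not
 * currently accessible).
 */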
1826 static __rte_always_inline uint16_t
1827 vhost_poll_enqueue_completed(struct virtio_net *dev, uint16_t queue_id,
1828 		struct rte_mbuf **pkts, uint16_t count)
1829 {
1830 	struct vhost_virtqueue *vq = dev->virtqueue[queue_id];
1831 	struct vhost_async *async = vq->async;
1832 	struct async_inflight_info *pkts_info = async->pkts_info;
1833 	int32_t n_cpl;
1834 	uint16_t n_descs = 0, n_buffers = 0;
1835 	uint16_t start_idx, from, i;
1836 
1837 	n_cpl = async->ops.check_completed_copies(dev->vid, queue_id, 0, count);
1838 	if (unlikely(n_cpl < 0)) {
1839 		VHOST_LOG_DATA(ERR, "(%s) %s: failed to check completed copies for queue id %d.\n",
1840 				dev->ifname, __func__, queue_id);
1841 		return 0;
1842 	}
1843 
1844 	if (n_cpl == 0)
1845 		return 0;
1846 
1847 	start_idx = async_get_first_inflight_pkt_idx(vq);
1848 
1849 	for (i = 0; i < n_cpl; i++) {
1850 		from = (start_idx + i) % vq->size;
1851 		/* Only used with packed ring */
1852 		n_buffers += pkts_info[from].nr_buffers;
1853 		/* Only used with split ring */
1854 		n_descs += pkts_info[from].descs;
1855 		pkts[i] = pkts_info[from].mbuf;
1856 	}
1857 
1858 	async->pkts_inflight_n -= n_cpl;
1859 
1860 	if (likely(vq->enabled && vq->access_ok)) {
1861 		if (vq_is_packed(dev)) {
1862 			write_back_completed_descs_packed(vq, n_buffers);
1863 			vhost_vring_call_packed(dev, vq);
1864 		} else {
1865 			write_back_completed_descs_split(vq, n_descs);
1866 			__atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
1867 			vhost_vring_call_split(dev, vq);
1868 		}
1869 	} else {
1870 		if (vq_is_packed(dev)) {
1871 			async->last_buffer_idx_packed += n_buffers;
1872 			if (async->last_buffer_idx_packed >= vq->size)
1873 				async->last_buffer_idx_packed -= vq->size;
1874 		} else {
1875 			async->last_desc_idx_split += n_descs;
1876 		}
1877 	}
1878 
1879 	return n_cpl;
1880 }
1881 
1882 uint16_t
1883 rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
1884 		struct rte_mbuf **pkts, uint16_t count)
1885 {
1886 	struct virtio_net *dev = get_device(vid);
1887 	struct vhost_virtqueue *vq;
1888 	uint16_t n_pkts_cpl = 0;
1889 
1890 	if (unlikely(!dev))
1891 		return 0;
1892 
1893 	VHOST_LOG_DATA(DEBUG, "(%s) %s\n", dev->ifname, __func__);
1894 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1895 		VHOST_LOG_DATA(ERR, "(%s) %s: invalid virtqueue idx %d.\n",
1896 			dev->ifname, __func__, queue_id);
1897 		return 0;
1898 	}
1899 
1900 	vq = dev->virtqueue[queue_id];
1901 
1902 	if (unlikely(!vq->async)) {
1903 		VHOST_LOG_DATA(ERR, "(%s) %s: async not registered for queue id %d.\n",
1904 			dev->ifname, __func__, queue_id);
1905 		return 0;
1906 	}
1907 
1908 	rte_spinlock_lock(&vq->access_lock);
1909 
1910 	n_pkts_cpl = vhost_poll_enqueue_completed(dev, queue_id, pkts, count);
1911 
1912 	rte_spinlock_unlock(&vq->access_lock);
1913 
1914 	return n_pkts_cpl;
1915 }
1916 
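/*
 * Same as rte_vhost_poll_enqueue_completed(), but without taking the
 * virtqueue access lock: the caller must guarantee that no other thread
 * accesses the virtqueue concurrently.
 */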
1917 uint16_t
1918 rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
1919 		struct rte_mbuf **pkts, uint16_t count)
1920 {
1921 	struct virtio_net *dev = get_device(vid);
1922 	struct vhost_virtqueue *vq;
1923 	uint16_t n_pkts_cpl = 0;
1924 
1925 	if (!dev)
1926 		return 0;
1927 
1928 	VHOST_LOG_DATA(DEBUG, "(%s) %s\n", dev->ifname, __func__);
1929 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1930 		VHOST_LOG_DATA(ERR, "(%s) %s: invalid virtqueue idx %d.\n",
1931 			dev->ifname, __func__, queue_id);
1932 		return 0;
1933 	}
1934 
1935 	vq = dev->virtqueue[queue_id];
1936 
1937 	if (unlikely(!vq->async)) {
1938 		VHOST_LOG_DATA(ERR, "(%s) %s: async not registered for queue id %d.\n",
1939 			dev->ifname, __func__, queue_id);
1940 		return 0;
1941 	}
1942 
1943 	n_pkts_cpl = vhost_poll_enqueue_completed(dev, queue_id, pkts, count);
1944 
1945 	return n_pkts_cpl;
1946 }
1947 
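/*
 * Common async enqueue entry point: validate the virtqueue, take the access
 * lock (and the IOTLB read lock when VIRTIO_F_IOMMU_PLATFORM is negotiated),
 * then dispatch to the packed or split submission path.
 */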
1948 static __rte_always_inline uint32_t
1949 virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
1950 	struct rte_mbuf **pkts, uint32_t count)
1951 {
1952 	struct vhost_virtqueue *vq;
1953 	uint32_t nb_tx = 0;
1954 
1955 	VHOST_LOG_DATA(DEBUG, "(%s) %s\n", dev->ifname, __func__);
1956 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1957 		VHOST_LOG_DATA(ERR, "(%s) %s: invalid virtqueue idx %d.\n",
1958 			dev->ifname, __func__, queue_id);
1959 		return 0;
1960 	}
1961 
1962 	vq = dev->virtqueue[queue_id];
1963 
1964 	rte_spinlock_lock(&vq->access_lock);
1965 
1966 	if (unlikely(!vq->enabled || !vq->async))
1967 		goto out_access_unlock;
1968 
1969 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1970 		vhost_user_iotlb_rd_lock(vq);
1971 
1972 	if (unlikely(!vq->access_ok))
1973 		if (unlikely(vring_translate(dev, vq) < 0))
1974 			goto out;
1975 
1976 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1977 	if (count == 0)
1978 		goto out;
1979 
1980 	if (vq_is_packed(dev))
1981 		nb_tx = virtio_dev_rx_async_submit_packed(dev, vq, queue_id,
1982 				pkts, count);
1983 	else
1984 		nb_tx = virtio_dev_rx_async_submit_split(dev, vq, queue_id,
1985 				pkts, count);
1986 
1987 out:
1988 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1989 		vhost_user_iotlb_rd_unlock(vq);
1990 
1991 out_access_unlock:
1992 	rte_spinlock_unlock(&vq->access_lock);
1993 
1994 	return nb_tx;
1995 }
1996 
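/*
 * Public API: submit a burst of packets for asynchronous enqueue. The data
 * copies are offloaded to the registered async channel; completions are
 * reaped later, e.g. with rte_vhost_poll_enqueue_completed().
 */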
1997 uint16_t
1998 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
1999 		struct rte_mbuf **pkts, uint16_t count)
2000 {
2001 	struct virtio_net *dev = get_device(vid);
2002 
2003 	if (!dev)
2004 		return 0;
2005 
2006 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
2007 		VHOST_LOG_DATA(ERR, "(%s) %s: built-in vhost net backend is disabled.\n",
2008 			dev->ifname, __func__);
2009 		return 0;
2010 	}
2011 
2012 	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count);
2013 }
2014 
2015 static inline bool
2016 virtio_net_with_host_offload(struct virtio_net *dev)
2017 {
2018 	if (dev->features &
2019 			((1ULL << VIRTIO_NET_F_CSUM) |
2020 			 (1ULL << VIRTIO_NET_F_HOST_ECN) |
2021 			 (1ULL << VIRTIO_NET_F_HOST_TSO4) |
2022 			 (1ULL << VIRTIO_NET_F_HOST_TSO6) |
2023 			 (1ULL << VIRTIO_NET_F_HOST_UFO)))
2024 		return true;
2025 
2026 	return false;
2027 }
2028 
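/*
 * Parse the Ethernet, optional VLAN, L3 and L4 headers of a packet coming
 * from the guest. On success, m->l2_len, m->l3_len and the Tx offload flags
 * are set and *l4_proto holds the L4 protocol; on a truncated or unsupported
 * header, the offload fields are cleared and -EINVAL is returned.
 */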
2029 static int
2030 parse_headers(struct rte_mbuf *m, uint8_t *l4_proto)
2031 {
2032 	struct rte_ipv4_hdr *ipv4_hdr;
2033 	struct rte_ipv6_hdr *ipv6_hdr;
2034 	struct rte_ether_hdr *eth_hdr;
2035 	uint16_t ethertype;
2036 	uint16_t data_len = rte_pktmbuf_data_len(m);
2037 
2038 	if (data_len < sizeof(struct rte_ether_hdr))
2039 		return -EINVAL;
2040 
2041 	eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
2042 
2043 	m->l2_len = sizeof(struct rte_ether_hdr);
2044 	ethertype = rte_be_to_cpu_16(eth_hdr->ether_type);
2045 
2046 	if (ethertype == RTE_ETHER_TYPE_VLAN) {
2047 		if (data_len < sizeof(struct rte_ether_hdr) +
2048 				sizeof(struct rte_vlan_hdr))
2049 			goto error;
2050 
2051 		struct rte_vlan_hdr *vlan_hdr =
2052 			(struct rte_vlan_hdr *)(eth_hdr + 1);
2053 
2054 		m->l2_len += sizeof(struct rte_vlan_hdr);
2055 		ethertype = rte_be_to_cpu_16(vlan_hdr->eth_proto);
2056 	}
2057 
2058 	switch (ethertype) {
2059 	case RTE_ETHER_TYPE_IPV4:
2060 		if (data_len < m->l2_len + sizeof(struct rte_ipv4_hdr))
2061 			goto error;
2062 		ipv4_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
2063 				m->l2_len);
2064 		m->l3_len = rte_ipv4_hdr_len(ipv4_hdr);
2065 		if (data_len < m->l2_len + m->l3_len)
2066 			goto error;
2067 		m->ol_flags |= RTE_MBUF_F_TX_IPV4;
2068 		*l4_proto = ipv4_hdr->next_proto_id;
2069 		break;
2070 	case RTE_ETHER_TYPE_IPV6:
2071 		if (data_len < m->l2_len + sizeof(struct rte_ipv6_hdr))
2072 			goto error;
2073 		ipv6_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv6_hdr *,
2074 				m->l2_len);
2075 		m->l3_len = sizeof(struct rte_ipv6_hdr);
2076 		m->ol_flags |= RTE_MBUF_F_TX_IPV6;
2077 		*l4_proto = ipv6_hdr->proto;
2078 		break;
2079 	default:
2080 		/* a valid L3 header is needed for further L4 parsing */
2081 		goto error;
2082 	}
2083 
2084 	/* both CSUM and GSO need a valid L4 header */
2085 	switch (*l4_proto) {
2086 	case IPPROTO_TCP:
2087 		if (data_len < m->l2_len + m->l3_len +
2088 				sizeof(struct rte_tcp_hdr))
2089 			goto error;
2090 		break;
2091 	case IPPROTO_UDP:
2092 		if (data_len < m->l2_len + m->l3_len +
2093 				sizeof(struct rte_udp_hdr))
2094 			goto error;
2095 		break;
2096 	case IPPROTO_SCTP:
2097 		if (data_len < m->l2_len + m->l3_len +
2098 				sizeof(struct rte_sctp_hdr))
2099 			goto error;
2100 		break;
2101 	default:
2102 		goto error;
2103 	}
2104 
2105 	return 0;
2106 
2107 error:
2108 	m->l2_len = 0;
2109 	m->l3_len = 0;
2110 	m->ol_flags = 0;
2111 	return -EINVAL;
2112 }
2113 
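/*
 * Legacy behaviour: translate the virtio-net header of a dequeued packet
 * directly into RTE_MBUF_F_TX_* offload flags, after checking that the
 * packet headers are consistent with what the header announces.
 */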
2114 static __rte_always_inline void
2115 vhost_dequeue_offload_legacy(struct virtio_net *dev, struct virtio_net_hdr *hdr,
2116 		struct rte_mbuf *m)
2117 {
2118 	uint8_t l4_proto = 0;
2119 	struct rte_tcp_hdr *tcp_hdr = NULL;
2120 	uint16_t tcp_len;
2121 	uint16_t data_len = rte_pktmbuf_data_len(m);
2122 
2123 	if (parse_headers(m, &l4_proto) < 0)
2124 		return;
2125 
2126 	if (hdr->flags == VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2127 		if (hdr->csum_start == (m->l2_len + m->l3_len)) {
2128 			switch (hdr->csum_offset) {
2129 			case (offsetof(struct rte_tcp_hdr, cksum)):
2130 				if (l4_proto != IPPROTO_TCP)
2131 					goto error;
2132 				m->ol_flags |= RTE_MBUF_F_TX_TCP_CKSUM;
2133 				break;
2134 			case (offsetof(struct rte_udp_hdr, dgram_cksum)):
2135 				if (l4_proto != IPPROTO_UDP)
2136 					goto error;
2137 				m->ol_flags |= RTE_MBUF_F_TX_UDP_CKSUM;
2138 				break;
2139 			case (offsetof(struct rte_sctp_hdr, cksum)):
2140 				if (l4_proto != IPPROTO_SCTP)
2141 					goto error;
2142 				m->ol_flags |= RTE_MBUF_F_TX_SCTP_CKSUM;
2143 				break;
2144 			default:
2145 				goto error;
2146 			}
2147 		} else {
2148 			goto error;
2149 		}
2150 	}
2151 
2152 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2153 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2154 		case VIRTIO_NET_HDR_GSO_TCPV4:
2155 		case VIRTIO_NET_HDR_GSO_TCPV6:
2156 			if (l4_proto != IPPROTO_TCP)
2157 				goto error;
2158 			tcp_hdr = rte_pktmbuf_mtod_offset(m,
2159 					struct rte_tcp_hdr *,
2160 					m->l2_len + m->l3_len);
2161 			tcp_len = (tcp_hdr->data_off & 0xf0) >> 2;
2162 			if (data_len < m->l2_len + m->l3_len + tcp_len)
2163 				goto error;
2164 			m->ol_flags |= RTE_MBUF_F_TX_TCP_SEG;
2165 			m->tso_segsz = hdr->gso_size;
2166 			m->l4_len = tcp_len;
2167 			break;
2168 		case VIRTIO_NET_HDR_GSO_UDP:
2169 			if (l4_proto != IPPROTO_UDP)
2170 				goto error;
2171 			m->ol_flags |= RTE_MBUF_F_TX_UDP_SEG;
2172 			m->tso_segsz = hdr->gso_size;
2173 			m->l4_len = sizeof(struct rte_udp_hdr);
2174 			break;
2175 		default:
2176 			VHOST_LOG_DATA(WARNING, "(%s) unsupported gso type %u.\n",
2177 					dev->ifname, hdr->gso_type);
2178 			goto error;
2179 		}
2180 	}
2181 	return;
2182 
2183 error:
2184 	m->l2_len = 0;
2185 	m->l3_len = 0;
2186 	m->ol_flags = 0;
2187 }
2188 
2189 static __rte_always_inline void
2190 vhost_dequeue_offload(struct virtio_net *dev, struct virtio_net_hdr *hdr,
2191 		struct rte_mbuf *m, bool legacy_ol_flags)
2192 {
2193 	struct rte_net_hdr_lens hdr_lens;
2194 	int l4_supported = 0;
2195 	uint32_t ptype;
2196 
2197 	if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
2198 		return;
2199 
2200 	if (legacy_ol_flags) {
2201 		vhost_dequeue_offload_legacy(dev, hdr, m);
2202 		return;
2203 	}
2204 
2205 	m->ol_flags |= RTE_MBUF_F_RX_IP_CKSUM_UNKNOWN;
2206 
2207 	ptype = rte_net_get_ptype(m, &hdr_lens, RTE_PTYPE_ALL_MASK);
2208 	m->packet_type = ptype;
2209 	if ((ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_TCP ||
2210 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_UDP ||
2211 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_SCTP)
2212 		l4_supported = 1;
2213 
2214 	/* According to Virtio 1.1 spec, the device only needs to look at
2215 	 * VIRTIO_NET_HDR_F_NEEDS_CSUM in the packet transmission path.
2216 	 * This differs from the incoming packet processing path, where the
2217 	 * driver can rely on the VIRTIO_NET_HDR_F_DATA_VALID flag set by the
2218 	 * device.
2219 	 *
2220 	 * 5.1.6.2.1 Driver Requirements: Packet Transmission
2221 	 * The driver MUST NOT set the VIRTIO_NET_HDR_F_DATA_VALID and
2222 	 * VIRTIO_NET_HDR_F_RSC_INFO bits in flags.
2223 	 *
2224 	 * 5.1.6.2.2 Device Requirements: Packet Transmission
2225 	 * The device MUST ignore flag bits that it does not recognize.
2226 	 */
2227 	if (hdr->flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2228 		uint32_t hdrlen;
2229 
2230 		hdrlen = hdr_lens.l2_len + hdr_lens.l3_len + hdr_lens.l4_len;
2231 		if (hdr->csum_start <= hdrlen && l4_supported != 0) {
2232 			m->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_NONE;
2233 		} else {
2234 			/* Unknown proto or tunnel: compute the checksum in
2235 			 * software. We can assume the cksum field is in the first
2236 			 * segment since the buffers we provided to the host are
2237 			 * large enough. In case of SCTP this will be wrong since
2238 			 * it's a CRC, but there's nothing we can do.
2239 			 */
2240 			uint16_t csum = 0, off;
2241 
2242 			if (rte_raw_cksum_mbuf(m, hdr->csum_start,
2243 					rte_pktmbuf_pkt_len(m) - hdr->csum_start, &csum) < 0)
2244 				return;
2245 			if (likely(csum != 0xffff))
2246 				csum = ~csum;
2247 			off = hdr->csum_offset + hdr->csum_start;
2248 			if (rte_pktmbuf_data_len(m) >= off + 1)
2249 				*rte_pktmbuf_mtod_offset(m, uint16_t *, off) = csum;
2250 		}
2251 	}
2252 
2253 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2254 		if (hdr->gso_size == 0)
2255 			return;
2256 
2257 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2258 		case VIRTIO_NET_HDR_GSO_TCPV4:
2259 		case VIRTIO_NET_HDR_GSO_TCPV6:
2260 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_TCP)
2261 				break;
2262 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2263 			m->tso_segsz = hdr->gso_size;
2264 			break;
2265 		case VIRTIO_NET_HDR_GSO_UDP:
2266 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_UDP)
2267 				break;
2268 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2269 			m->tso_segsz = hdr->gso_size;
2270 			break;
2271 		default:
2272 			break;
2273 		}
2274 	}
2275 }
2276 
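/*
 * Slow path: the virtio-net header spans several descriptor buffers, so
 * gather it piece by piece into a local copy.
 */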
2277 static __rte_noinline void
2278 copy_vnet_hdr_from_desc(struct virtio_net_hdr *hdr,
2279 		struct buf_vector *buf_vec)
2280 {
2281 	uint64_t len;
2282 	uint64_t remain = sizeof(struct virtio_net_hdr);
2283 	uint64_t src;
2284 	uint64_t dst = (uint64_t)(uintptr_t)hdr;
2285 
2286 	while (remain) {
2287 		len = RTE_MIN(remain, buf_vec->buf_len);
2288 		src = buf_vec->buf_addr;
2289 		rte_memcpy((void *)(uintptr_t)dst,
2290 				(void *)(uintptr_t)src, len);
2291 
2292 		remain -= len;
2293 		dst += len;
2294 		buf_vec++;
2295 	}
2296 }
2297 
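/*
 * Copy a guest descriptor chain into an mbuf chain, allocating extra mbufs
 * from 'mbuf_pool' when the packet does not fit in a single segment, then
 * apply the virtio-net header offloads to the resulting packet.
 */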
2298 static __rte_always_inline int
2299 copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
2300 		  struct buf_vector *buf_vec, uint16_t nr_vec,
2301 		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
2302 		  bool legacy_ol_flags)
2303 {
2304 	uint32_t buf_avail, buf_offset;
2305 	uint64_t buf_addr, buf_len;
2306 	uint32_t mbuf_avail, mbuf_offset;
2307 	uint32_t cpy_len;
2308 	struct rte_mbuf *cur = m, *prev = m;
2309 	struct virtio_net_hdr tmp_hdr;
2310 	struct virtio_net_hdr *hdr = NULL;
2311 	/* A counter to avoid an endless loop in the desc chain */
2312 	uint16_t vec_idx = 0;
2313 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
2314 	int error = 0;
2315 
2316 	buf_addr = buf_vec[vec_idx].buf_addr;
2317 	buf_len = buf_vec[vec_idx].buf_len;
2318 
2319 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
2320 		error = -1;
2321 		goto out;
2322 	}
2323 
2324 	if (virtio_net_with_host_offload(dev)) {
2325 		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
2326 			/*
2327 			 * No luck, the virtio-net header doesn't fit
2328 			 * in a contiguous virtual area.
2329 			 */
2330 			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
2331 			hdr = &tmp_hdr;
2332 		} else {
2333 			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
2334 		}
2335 	}
2336 
2337 	/*
2338 	 * A virtio driver normally uses at least 2 desc buffers
2339 	 * for Tx: the first for storing the header, and the others
2340 	 * for storing the data.
2341 	 */
2342 	if (unlikely(buf_len < dev->vhost_hlen)) {
2343 		buf_offset = dev->vhost_hlen - buf_len;
2344 		vec_idx++;
2345 		buf_addr = buf_vec[vec_idx].buf_addr;
2346 		buf_len = buf_vec[vec_idx].buf_len;
2347 		buf_avail  = buf_len - buf_offset;
2348 	} else if (buf_len == dev->vhost_hlen) {
2349 		if (unlikely(++vec_idx >= nr_vec))
2350 			goto out;
2351 		buf_addr = buf_vec[vec_idx].buf_addr;
2352 		buf_len = buf_vec[vec_idx].buf_len;
2353 
2354 		buf_offset = 0;
2355 		buf_avail = buf_len;
2356 	} else {
2357 		buf_offset = dev->vhost_hlen;
2358 		buf_avail = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
2359 	}
2360 
2361 	PRINT_PACKET(dev,
2362 			(uintptr_t)(buf_addr + buf_offset),
2363 			(uint32_t)buf_avail, 0);
2364 
2365 	mbuf_offset = 0;
2366 	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
2367 	while (1) {
2368 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
2369 
2370 		if (likely(cpy_len > MAX_BATCH_LEN ||
2371 					vq->batch_copy_nb_elems >= vq->size ||
2372 					(hdr && cur == m))) {
2373 			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *,
2374 						mbuf_offset),
2375 					(void *)((uintptr_t)(buf_addr +
2376 							buf_offset)), cpy_len);
2377 		} else {
2378 			batch_copy[vq->batch_copy_nb_elems].dst =
2379 				rte_pktmbuf_mtod_offset(cur, void *,
2380 						mbuf_offset);
2381 			batch_copy[vq->batch_copy_nb_elems].src =
2382 				(void *)((uintptr_t)(buf_addr + buf_offset));
2383 			batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
2384 			vq->batch_copy_nb_elems++;
2385 		}
2386 
2387 		mbuf_avail  -= cpy_len;
2388 		mbuf_offset += cpy_len;
2389 		buf_avail -= cpy_len;
2390 		buf_offset += cpy_len;
2391 
2392 		/* This buffer has reached its end, get the next one */
2393 		if (buf_avail == 0) {
2394 			if (++vec_idx >= nr_vec)
2395 				break;
2396 
2397 			buf_addr = buf_vec[vec_idx].buf_addr;
2398 			buf_len = buf_vec[vec_idx].buf_len;
2399 
2400 			buf_offset = 0;
2401 			buf_avail  = buf_len;
2402 
2403 			PRINT_PACKET(dev, (uintptr_t)buf_addr,
2404 					(uint32_t)buf_avail, 0);
2405 		}
2406 
2407 		/*
2408 		 * This mbuf has reached its end, allocate a new one
2409 		 * to hold more data.
2410 		 */
2411 		if (mbuf_avail == 0) {
2412 			cur = rte_pktmbuf_alloc(mbuf_pool);
2413 			if (unlikely(cur == NULL)) {
2414 				VHOST_LOG_DATA(ERR, "(%s) failed to allocate memory for mbuf.\n",
2415 						dev->ifname);
2416 				error = -1;
2417 				goto out;
2418 			}
2419 
2420 			prev->next = cur;
2421 			prev->data_len = mbuf_offset;
2422 			m->nb_segs += 1;
2423 			m->pkt_len += mbuf_offset;
2424 			prev = cur;
2425 
2426 			mbuf_offset = 0;
2427 			mbuf_avail  = cur->buf_len - RTE_PKTMBUF_HEADROOM;
2428 		}
2429 	}
2430 
2431 	prev->data_len = mbuf_offset;
2432 	m->pkt_len    += mbuf_offset;
2433 
2434 	if (hdr)
2435 		vhost_dequeue_offload(dev, hdr, m, legacy_ol_flags);
2436 
2437 out:
2438 
2439 	return error;
2440 }
2441 
2442 static void
2443 virtio_dev_extbuf_free(void *addr __rte_unused, void *opaque)
2444 {
2445 	rte_free(opaque);
2446 }
2447 
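/*
 * Allocate an external buffer large enough for the packet data, the mbuf
 * headroom and the shared info, and attach it to the mbuf so that oversized
 * packets can still be received in a single segment.
 */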
2448 static int
2449 virtio_dev_extbuf_alloc(struct virtio_net *dev, struct rte_mbuf *pkt, uint32_t size)
2450 {
2451 	struct rte_mbuf_ext_shared_info *shinfo = NULL;
2452 	uint32_t total_len = RTE_PKTMBUF_HEADROOM + size;
2453 	uint16_t buf_len;
2454 	rte_iova_t iova;
2455 	void *buf;
2456 
2457 	total_len += sizeof(*shinfo) + sizeof(uintptr_t);
2458 	total_len = RTE_ALIGN_CEIL(total_len, sizeof(uintptr_t));
2459 
2460 	if (unlikely(total_len > UINT16_MAX))
2461 		return -ENOSPC;
2462 
2463 	buf_len = total_len;
2464 	buf = rte_malloc(NULL, buf_len, RTE_CACHE_LINE_SIZE);
2465 	if (unlikely(buf == NULL))
2466 		return -ENOMEM;
2467 
2468 	/* Initialize shinfo */
2469 	shinfo = rte_pktmbuf_ext_shinfo_init_helper(buf, &buf_len,
2470 						virtio_dev_extbuf_free, buf);
2471 	if (unlikely(shinfo == NULL)) {
2472 		rte_free(buf);
2473 		VHOST_LOG_DATA(ERR, "(%s) failed to init shinfo\n", dev->ifname);
2474 		return -1;
2475 	}
2476 
2477 	iova = rte_malloc_virt2iova(buf);
2478 	rte_pktmbuf_attach_extbuf(pkt, buf, iova, buf_len, shinfo);
2479 	rte_pktmbuf_reset_headroom(pkt);
2480 
2481 	return 0;
2482 }
2483 
2484 /*
2485  * Prepare a host-supported pktmbuf: ensure it can hold 'data_len' bytes.
2486  */
2487 static __rte_always_inline int
2488 virtio_dev_pktmbuf_prep(struct virtio_net *dev, struct rte_mbuf *pkt,
2489 			 uint32_t data_len)
2490 {
2491 	if (rte_pktmbuf_tailroom(pkt) >= data_len)
2492 		return 0;
2493 
2494 	/* attach an external buffer if supported */
2495 	if (dev->extbuf && !virtio_dev_extbuf_alloc(dev, pkt, data_len))
2496 		return 0;
2497 
2498 	/* check if chained buffers are allowed */
2499 	if (!dev->linearbuf)
2500 		return 0;
2501 
2502 	return -1;
2503 }
2504 
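/*
 * Dequeue path for split virtqueues: reserve available descriptors, copy
 * their contents into pre-allocated mbufs, then flush the shadow used ring
 * and notify the guest.
 */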
2505 __rte_always_inline
2506 static uint16_t
2507 virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
2508 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
2509 	bool legacy_ol_flags)
2510 {
2511 	uint16_t i;
2512 	uint16_t free_entries;
2513 	uint16_t dropped = 0;
2514 	static bool allocerr_warned;
2515 
2516 	/*
2517 	 * The ordering between the avail index read and the
2518 	 * descriptor reads needs to be enforced.
2519 	 */
2520 	free_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) -
2521 			vq->last_avail_idx;
2522 	if (free_entries == 0)
2523 		return 0;
2524 
2525 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
2526 
2527 	VHOST_LOG_DATA(DEBUG, "(%s) %s\n", dev->ifname, __func__);
2528 
2529 	count = RTE_MIN(count, MAX_PKT_BURST);
2530 	count = RTE_MIN(count, free_entries);
2531 	VHOST_LOG_DATA(DEBUG, "(%s) about to dequeue %u buffers\n",
2532 			dev->ifname, count);
2533 
2534 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count))
2535 		return 0;
2536 
2537 	for (i = 0; i < count; i++) {
2538 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
2539 		uint16_t head_idx;
2540 		uint32_t buf_len;
2541 		uint16_t nr_vec = 0;
2542 		int err;
2543 
2544 		if (unlikely(fill_vec_buf_split(dev, vq,
2545 						vq->last_avail_idx + i,
2546 						&nr_vec, buf_vec,
2547 						&head_idx, &buf_len,
2548 						VHOST_ACCESS_RO) < 0))
2549 			break;
2550 
2551 		update_shadow_used_ring_split(vq, head_idx, 0);
2552 
2553 		err = virtio_dev_pktmbuf_prep(dev, pkts[i], buf_len);
2554 		if (unlikely(err)) {
2555 			/*
2556 			 * mbuf allocation fails for jumbo packets when external
2557 			 * buffer allocation is not allowed and a linear buffer
2558 			 * is required. Drop this packet.
2559 			 */
2560 			if (!allocerr_warned) {
2561 				VHOST_LOG_DATA(ERR, "(%s) failed mbuf alloc of size %d from %s.\n",
2562 					dev->ifname, buf_len, mbuf_pool->name);
2563 				allocerr_warned = true;
2564 			}
2565 			dropped += 1;
2566 			i++;
2567 			break;
2568 		}
2569 
2570 		err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
2571 				mbuf_pool, legacy_ol_flags);
2572 		if (unlikely(err)) {
2573 			if (!allocerr_warned) {
2574 				VHOST_LOG_DATA(ERR, "(%s) failed to copy desc to mbuf.\n",
2575 					dev->ifname);
2576 				allocerr_warned = true;
2577 			}
2578 			dropped += 1;
2579 			i++;
2580 			break;
2581 		}
2582 	}
2583 
2584 	if (dropped)
2585 		rte_pktmbuf_free_bulk(&pkts[i - 1], count - i + 1);
2586 
2587 	vq->last_avail_idx += i;
2588 
2589 	do_data_copy_dequeue(vq);
2590 	if (unlikely(i < count))
2591 		vq->shadow_used_idx = i;
2592 	if (likely(vq->shadow_used_idx)) {
2593 		flush_shadow_used_ring_split(dev, vq);
2594 		vhost_vring_call_split(dev, vq);
2595 	}
2596 
2597 	return (i - dropped);
2598 }
2599 
2600 __rte_noinline
2601 static uint16_t
2602 virtio_dev_tx_split_legacy(struct virtio_net *dev,
2603 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
2604 	struct rte_mbuf **pkts, uint16_t count)
2605 {
2606 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, true);
2607 }
2608 
2609 __rte_noinline
2610 static uint16_t
2611 virtio_dev_tx_split_compliant(struct virtio_net *dev,
2612 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
2613 	struct rte_mbuf **pkts, uint16_t count)
2614 {
2615 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, false);
2616 }
2617 
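/*
 * Check whether a whole batch of packed descriptors is available and
 * single-buffer, translate their addresses and size the destination mbufs.
 * Returns -1 if the batch cannot be processed as a whole.
 */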
2618 static __rte_always_inline int
2619 vhost_reserve_avail_batch_packed(struct virtio_net *dev,
2620 				 struct vhost_virtqueue *vq,
2621 				 struct rte_mbuf **pkts,
2622 				 uint16_t avail_idx,
2623 				 uintptr_t *desc_addrs,
2624 				 uint16_t *ids)
2625 {
2626 	bool wrap = vq->avail_wrap_counter;
2627 	struct vring_packed_desc *descs = vq->desc_packed;
2628 	uint64_t lens[PACKED_BATCH_SIZE];
2629 	uint64_t buf_lens[PACKED_BATCH_SIZE];
2630 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
2631 	uint16_t flags, i;
2632 
2633 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
2634 		return -1;
2635 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
2636 		return -1;
2637 
2638 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2639 		flags = descs[avail_idx + i].flags;
2640 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
2641 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
2642 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
2643 			return -1;
2644 	}
2645 
2646 	rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
2647 
2648 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2649 		lens[i] = descs[avail_idx + i].len;
2650 
2651 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2652 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
2653 						  descs[avail_idx + i].addr,
2654 						  &lens[i], VHOST_ACCESS_RW);
2655 	}
2656 
2657 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2658 		if (unlikely(!desc_addrs[i]))
2659 			return -1;
2660 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
2661 			return -1;
2662 	}
2663 
2664 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2665 		if (virtio_dev_pktmbuf_prep(dev, pkts[i], lens[i]))
2666 			goto err;
2667 	}
2668 
2669 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2670 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
2671 
2672 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2673 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
2674 			goto err;
2675 	}
2676 
2677 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2678 		pkts[i]->pkt_len = lens[i] - buf_offset;
2679 		pkts[i]->data_len = pkts[i]->pkt_len;
2680 		ids[i] = descs[avail_idx + i].id;
2681 	}
2682 
2683 	return 0;
2684 
2685 err:
2686 	return -1;
2687 }
2688 
2689 static __rte_always_inline int
2690 virtio_dev_tx_batch_packed(struct virtio_net *dev,
2691 			   struct vhost_virtqueue *vq,
2692 			   struct rte_mbuf **pkts,
2693 			   bool legacy_ol_flags)
2694 {
2695 	uint16_t avail_idx = vq->last_avail_idx;
2696 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
2697 	struct virtio_net_hdr *hdr;
2698 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
2699 	uint16_t ids[PACKED_BATCH_SIZE];
2700 	uint16_t i;
2701 
2702 	if (vhost_reserve_avail_batch_packed(dev, vq, pkts, avail_idx,
2703 					     desc_addrs, ids))
2704 		return -1;
2705 
2706 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2707 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
2708 
2709 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2710 		rte_memcpy(rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
2711 			   (void *)(uintptr_t)(desc_addrs[i] + buf_offset),
2712 			   pkts[i]->pkt_len);
2713 
2714 	if (virtio_net_with_host_offload(dev)) {
2715 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2716 			hdr = (struct virtio_net_hdr *)(desc_addrs[i]);
2717 			vhost_dequeue_offload(dev, hdr, pkts[i], legacy_ol_flags);
2718 		}
2719 	}
2720 
2721 	if (virtio_net_is_inorder(dev))
2722 		vhost_shadow_dequeue_batch_packed_inorder(vq,
2723 			ids[PACKED_BATCH_SIZE - 1]);
2724 	else
2725 		vhost_shadow_dequeue_batch_packed(dev, vq, ids);
2726 
2727 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
2728 
2729 	return 0;
2730 }
2731 
2732 static __rte_always_inline int
2733 vhost_dequeue_single_packed(struct virtio_net *dev,
2734 			    struct vhost_virtqueue *vq,
2735 			    struct rte_mempool *mbuf_pool,
2736 			    struct rte_mbuf *pkts,
2737 			    uint16_t *buf_id,
2738 			    uint16_t *desc_count,
2739 			    bool legacy_ol_flags)
2740 {
2741 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
2742 	uint32_t buf_len;
2743 	uint16_t nr_vec = 0;
2744 	int err;
2745 	static bool allocerr_warned;
2746 
2747 	if (unlikely(fill_vec_buf_packed(dev, vq,
2748 					 vq->last_avail_idx, desc_count,
2749 					 buf_vec, &nr_vec,
2750 					 buf_id, &buf_len,
2751 					 VHOST_ACCESS_RO) < 0))
2752 		return -1;
2753 
2754 	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
2755 		if (!allocerr_warned) {
2756 			VHOST_LOG_DATA(ERR, "(%s) failed mbuf alloc of size %d from %s.\n",
2757 				dev->ifname, buf_len, mbuf_pool->name);
2758 			allocerr_warned = true;
2759 		}
2760 		return -1;
2761 	}
2762 
2763 	err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts,
2764 				mbuf_pool, legacy_ol_flags);
2765 	if (unlikely(err)) {
2766 		if (!allocerr_warned) {
2767 			VHOST_LOG_DATA(ERR, "(%s) failed to copy desc to mbuf.\n",
2768 				dev->ifname);
2769 			allocerr_warned = true;
2770 		}
2771 		return -1;
2772 	}
2773 
2774 	return 0;
2775 }
2776 
2777 static __rte_always_inline int
2778 virtio_dev_tx_single_packed(struct virtio_net *dev,
2779 			    struct vhost_virtqueue *vq,
2780 			    struct rte_mempool *mbuf_pool,
2781 			    struct rte_mbuf *pkts,
2782 			    bool legacy_ol_flags)
2783 {
2784 
2785 	uint16_t buf_id, desc_count = 0;
2786 	int ret;
2787 
2788 	ret = vhost_dequeue_single_packed(dev, vq, mbuf_pool, pkts, &buf_id,
2789 					&desc_count, legacy_ol_flags);
2790 
2791 	if (likely(desc_count > 0)) {
2792 		if (virtio_net_is_inorder(dev))
2793 			vhost_shadow_dequeue_single_packed_inorder(vq, buf_id,
2794 								   desc_count);
2795 		else
2796 			vhost_shadow_dequeue_single_packed(vq, buf_id,
2797 					desc_count);
2798 
2799 		vq_inc_last_avail_packed(vq, desc_count);
2800 	}
2801 
2802 	return ret;
2803 }
2804 
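/*
 * Dequeue path for packed virtqueues: try the batched path first and fall
 * back to per-packet processing; unconsumed mbufs are freed before the
 * shadow used entries are flushed and the guest is notified.
 */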
2805 __rte_always_inline
2806 static uint16_t
2807 virtio_dev_tx_packed(struct virtio_net *dev,
2808 		     struct vhost_virtqueue *__rte_restrict vq,
2809 		     struct rte_mempool *mbuf_pool,
2810 		     struct rte_mbuf **__rte_restrict pkts,
2811 		     uint32_t count,
2812 		     bool legacy_ol_flags)
2813 {
2814 	uint32_t pkt_idx = 0;
2815 
2816 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count))
2817 		return 0;
2818 
2819 	do {
2820 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
2821 
2822 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
2823 			if (!virtio_dev_tx_batch_packed(dev, vq,
2824 							&pkts[pkt_idx],
2825 							legacy_ol_flags)) {
2826 				pkt_idx += PACKED_BATCH_SIZE;
2827 				continue;
2828 			}
2829 		}
2830 
2831 		if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
2832 						pkts[pkt_idx],
2833 						legacy_ol_flags))
2834 			break;
2835 		pkt_idx++;
2836 	} while (pkt_idx < count);
2837 
2838 	if (pkt_idx != count)
2839 		rte_pktmbuf_free_bulk(&pkts[pkt_idx], count - pkt_idx);
2840 
2841 	if (vq->shadow_used_idx) {
2842 		do_data_copy_dequeue(vq);
2843 
2844 		vhost_flush_dequeue_shadow_packed(dev, vq);
2845 		vhost_vring_call_packed(dev, vq);
2846 	}
2847 
2848 	return pkt_idx;
2849 }
2850 
2851 __rte_noinline
2852 static uint16_t
2853 virtio_dev_tx_packed_legacy(struct virtio_net *dev,
2854 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
2855 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
2856 {
2857 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, true);
2858 }
2859 
2860 __rte_noinline
2861 static uint16_t
2862 virtio_dev_tx_packed_compliant(struct virtio_net *dev,
2863 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
2864 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
2865 {
2866 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, false);
2867 }
2868 
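/*
 * Public API: dequeue a burst of packets sent by the guest on 'queue_id'
 * into mbufs allocated from 'mbuf_pool'. A pending RARP request, if any,
 * is injected at the head of the returned burst.
 *
 * Usage sketch (illustrative only; 'vid', 'qid' and 'mp' come from the
 * application and the queue numbering follows the usual
 * qid * 2 + VIRTIO_TXQ convention used by vhost applications):
 *
 *	struct rte_mbuf *pkts[MAX_PKT_BURST];
 *	uint16_t n = rte_vhost_dequeue_burst(vid, qid * 2 + VIRTIO_TXQ,
 *					mp, pkts, MAX_PKT_BURST);
 *
 * The application then forwards pkts[0..n) and frees them once transmitted.
 */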
2869 uint16_t
2870 rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
2871 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
2872 {
2873 	struct virtio_net *dev;
2874 	struct rte_mbuf *rarp_mbuf = NULL;
2875 	struct vhost_virtqueue *vq;
2876 	int16_t success = 1;
2877 
2878 	dev = get_device(vid);
2879 	if (!dev)
2880 		return 0;
2881 
2882 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
2883 		VHOST_LOG_DATA(ERR, "(%s) %s: built-in vhost net backend is disabled.\n",
2884 				dev->ifname, __func__);
2885 		return 0;
2886 	}
2887 
2888 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
2889 		VHOST_LOG_DATA(ERR, "(%s) %s: invalid virtqueue idx %d.\n",
2890 				dev->ifname, __func__, queue_id);
2891 		return 0;
2892 	}
2893 
2894 	vq = dev->virtqueue[queue_id];
2895 
2896 	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
2897 		return 0;
2898 
2899 	if (unlikely(!vq->enabled)) {
2900 		count = 0;
2901 		goto out_access_unlock;
2902 	}
2903 
2904 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
2905 		vhost_user_iotlb_rd_lock(vq);
2906 
2907 	if (unlikely(!vq->access_ok))
2908 		if (unlikely(vring_translate(dev, vq) < 0)) {
2909 			count = 0;
2910 			goto out;
2911 		}
2912 
2913 	/*
2914 	 * Construct a RARP broadcast packet and inject it into the "pkts"
2915 	 * array, so that it looks like the guest actually sent such a packet.
2916 	 *
2917 	 * Check user_send_rarp() for more information.
2918 	 *
2919 	 * broadcast_rarp shares a cacheline in the virtio_net structure
2920 	 * with some fields that are accessed during enqueue, and
2921 	 * __atomic_compare_exchange_n performs a write when it does the
2922 	 * compare and exchange. This could result in false sharing between
2923 	 * enqueue and dequeue.
2924 	 *
2925 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
2926 	 * and only performing the compare and exchange if the read indicates
2927 	 * it is likely to be set.
2928 	 */
2929 	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
2930 			__atomic_compare_exchange_n(&dev->broadcast_rarp,
2931 			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
2932 
2933 		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
2934 		if (rarp_mbuf == NULL) {
2935 			VHOST_LOG_DATA(ERR, "(%s) failed to make RARP packet.\n", dev->ifname);
2936 			count = 0;
2937 			goto out;
2938 		}
2939 		/*
2940 		 * Inject it at the head of the "pkts" array, so that the
2941 		 * switch's MAC learning table gets updated first.
2942 		 */
2943 		pkts[0] = rarp_mbuf;
2944 		pkts++;
2945 		count -= 1;
2946 	}
2947 
2948 	if (vq_is_packed(dev)) {
2949 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
2950 			count = virtio_dev_tx_packed_legacy(dev, vq, mbuf_pool, pkts, count);
2951 		else
2952 			count = virtio_dev_tx_packed_compliant(dev, vq, mbuf_pool, pkts, count);
2953 	} else {
2954 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
2955 			count = virtio_dev_tx_split_legacy(dev, vq, mbuf_pool, pkts, count);
2956 		else
2957 			count = virtio_dev_tx_split_compliant(dev, vq, mbuf_pool, pkts, count);
2958 	}
2959 
2960 out:
2961 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
2962 		vhost_user_iotlb_rd_unlock(vq);
2963 
2964 out_access_unlock:
2965 	rte_spinlock_unlock(&vq->access_lock);
2966 
2967 	if (unlikely(rarp_mbuf != NULL))
2968 		count += 1;
2969 
2970 	return count;
2971 }
2972