xref: /dpdk/lib/vhost/virtio_net.c (revision 69f9d8aa357d2299e057b7e335f340e20a0c5e7e)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2016 Intel Corporation
3  */
4 
5 #include <stdint.h>
6 #include <stdbool.h>
7 #include <linux/virtio_net.h>
8 
9 #include <rte_mbuf.h>
10 #include <rte_memcpy.h>
11 #include <rte_net.h>
12 #include <rte_ether.h>
13 #include <rte_ip.h>
14 #include <rte_vhost.h>
15 #include <rte_tcp.h>
16 #include <rte_udp.h>
17 #include <rte_sctp.h>
18 #include <rte_arp.h>
19 #include <rte_spinlock.h>
20 #include <rte_malloc.h>
21 #include <rte_vhost_async.h>
22 
23 #include "iotlb.h"
24 #include "vhost.h"
25 
26 #define MAX_BATCH_LEN 256
27 
28 static __rte_always_inline bool
29 rxvq_is_mergeable(struct virtio_net *dev)
30 {
31 	return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
32 }
33 
34 static __rte_always_inline bool
35 virtio_net_is_inorder(struct virtio_net *dev)
36 {
37 	return dev->features & (1ULL << VIRTIO_F_IN_ORDER);
38 }
39 
40 static bool
41 is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t nr_vring)
42 {
43 	return (is_tx ^ (idx & 1)) == 0 && idx < nr_vring;
44 }
45 
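/*
 * Flush the per-virtqueue batched copies: small enqueue copies are deferred
 * into vq->batch_copy_elems (see sync_mbuf_to_desc_seg()) and performed here
 * in one pass, logging every write for live-migration dirty tracking.
 * The dequeue variant below skips logging since it only reads guest memory.
 */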
46 static inline void
47 do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
48 {
49 	struct batch_copy_elem *elem = vq->batch_copy_elems;
50 	uint16_t count = vq->batch_copy_nb_elems;
51 	int i;
52 
53 	for (i = 0; i < count; i++) {
54 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
55 		vhost_log_cache_write_iova(dev, vq, elem[i].log_addr,
56 					   elem[i].len);
57 		PRINT_PACKET(dev, (uintptr_t)elem[i].dst, elem[i].len, 0);
58 	}
59 
60 	vq->batch_copy_nb_elems = 0;
61 }
62 
63 static inline void
64 do_data_copy_dequeue(struct vhost_virtqueue *vq)
65 {
66 	struct batch_copy_elem *elem = vq->batch_copy_elems;
67 	uint16_t count = vq->batch_copy_nb_elems;
68 	int i;
69 
70 	for (i = 0; i < count; i++)
71 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
72 
73 	vq->batch_copy_nb_elems = 0;
74 }
75 
76 static __rte_always_inline void
77 do_flush_shadow_used_ring_split(struct virtio_net *dev,
78 			struct vhost_virtqueue *vq,
79 			uint16_t to, uint16_t from, uint16_t size)
80 {
81 	rte_memcpy(&vq->used->ring[to],
82 			&vq->shadow_used_split[from],
83 			size * sizeof(struct vring_used_elem));
84 	vhost_log_cache_used_vring(dev, vq,
85 			offsetof(struct vring_used, ring[to]),
86 			size * sizeof(struct vring_used_elem));
87 }
88 
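/*
 * Publish the entries staged in vq->shadow_used_split to the guest-visible
 * split used ring. The copy is done in at most two chunks to handle ring
 * wrap-around, and used->idx is incremented with a release store so the
 * guest observes the ring entries before the updated index.
 */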
89 static __rte_always_inline void
90 flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
91 {
92 	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
93 
94 	if (used_idx + vq->shadow_used_idx <= vq->size) {
95 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
96 					  vq->shadow_used_idx);
97 	} else {
98 		uint16_t size;
99 
100 		/* update the used ring interval [used_idx, vq->size) */
101 		size = vq->size - used_idx;
102 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
103 
104 		/* update the remaining used ring interval [0, shadow_used_idx - size) */
105 		do_flush_shadow_used_ring_split(dev, vq, 0, size,
106 					  vq->shadow_used_idx - size);
107 	}
108 	vq->last_used_idx += vq->shadow_used_idx;
109 
110 	vhost_log_cache_sync(dev, vq);
111 
112 	__atomic_add_fetch(&vq->used->idx, vq->shadow_used_idx,
113 			   __ATOMIC_RELEASE);
114 	vq->shadow_used_idx = 0;
115 	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
116 		sizeof(vq->used->idx));
117 }
118 
119 static __rte_always_inline void
120 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
121 			 uint16_t desc_idx, uint32_t len)
122 {
123 	uint16_t i = vq->shadow_used_idx++;
124 
125 	vq->shadow_used_split[i].id  = desc_idx;
126 	vq->shadow_used_split[i].len = len;
127 }
128 
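/*
 * Flush the enqueue shadow ring to the packed descriptor ring. Descriptor
 * ids and lengths are written first; after a release fence the flags are
 * stored, with the head descriptor's flags written last, so a guest polling
 * the flags never sees a partially published batch.
 */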
129 static __rte_always_inline void
130 vhost_flush_enqueue_shadow_packed(struct virtio_net *dev,
131 				  struct vhost_virtqueue *vq)
132 {
133 	int i;
134 	uint16_t used_idx = vq->last_used_idx;
135 	uint16_t head_idx = vq->last_used_idx;
136 	uint16_t head_flags = 0;
137 
138 	/* Split loop in two to save memory barriers */
139 	for (i = 0; i < vq->shadow_used_idx; i++) {
140 		vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
141 		vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
142 
143 		used_idx += vq->shadow_used_packed[i].count;
144 		if (used_idx >= vq->size)
145 			used_idx -= vq->size;
146 	}
147 
148 	/* The ordering for storing desc flags needs to be enforced. */
149 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
150 
151 	for (i = 0; i < vq->shadow_used_idx; i++) {
152 		uint16_t flags;
153 
154 		if (vq->shadow_used_packed[i].len)
155 			flags = VRING_DESC_F_WRITE;
156 		else
157 			flags = 0;
158 
159 		if (vq->used_wrap_counter) {
160 			flags |= VRING_DESC_F_USED;
161 			flags |= VRING_DESC_F_AVAIL;
162 		} else {
163 			flags &= ~VRING_DESC_F_USED;
164 			flags &= ~VRING_DESC_F_AVAIL;
165 		}
166 
167 		if (i > 0) {
168 			vq->desc_packed[vq->last_used_idx].flags = flags;
169 
170 			vhost_log_cache_used_vring(dev, vq,
171 					vq->last_used_idx *
172 					sizeof(struct vring_packed_desc),
173 					sizeof(struct vring_packed_desc));
174 		} else {
175 			head_idx = vq->last_used_idx;
176 			head_flags = flags;
177 		}
178 
179 		vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
180 	}
181 
182 	vq->desc_packed[head_idx].flags = head_flags;
183 
184 	vhost_log_cache_used_vring(dev, vq,
185 				head_idx *
186 				sizeof(struct vring_packed_desc),
187 				sizeof(struct vring_packed_desc));
188 
189 	vq->shadow_used_idx = 0;
190 	vhost_log_cache_sync(dev, vq);
191 }
192 
193 static __rte_always_inline void
194 vhost_flush_dequeue_shadow_packed(struct virtio_net *dev,
195 				  struct vhost_virtqueue *vq)
196 {
197 	struct vring_used_elem_packed *used_elem = &vq->shadow_used_packed[0];
198 
199 	vq->desc_packed[vq->shadow_last_used_idx].id = used_elem->id;
200 	/* desc flags are the synchronization point for the virtio packed vring */
201 	__atomic_store_n(&vq->desc_packed[vq->shadow_last_used_idx].flags,
202 			 used_elem->flags, __ATOMIC_RELEASE);
203 
204 	vhost_log_cache_used_vring(dev, vq, vq->shadow_last_used_idx *
205 				   sizeof(struct vring_packed_desc),
206 				   sizeof(struct vring_packed_desc));
207 	vq->shadow_used_idx = 0;
208 	vhost_log_cache_sync(dev, vq);
209 }
210 
211 static __rte_always_inline void
212 vhost_flush_enqueue_batch_packed(struct virtio_net *dev,
213 				 struct vhost_virtqueue *vq,
214 				 uint64_t *lens,
215 				 uint16_t *ids)
216 {
217 	uint16_t i;
218 	uint16_t flags;
219 	uint16_t last_used_idx;
220 	struct vring_packed_desc *desc_base;
221 
222 	last_used_idx = vq->last_used_idx;
223 	desc_base = &vq->desc_packed[last_used_idx];
224 
225 	flags = PACKED_DESC_ENQUEUE_USED_FLAG(vq->used_wrap_counter);
226 
227 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
228 		desc_base[i].id = ids[i];
229 		desc_base[i].len = lens[i];
230 	}
231 
232 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
233 
234 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
235 		desc_base[i].flags = flags;
236 	}
237 
238 	vhost_log_cache_used_vring(dev, vq, last_used_idx *
239 				   sizeof(struct vring_packed_desc),
240 				   sizeof(struct vring_packed_desc) *
241 				   PACKED_BATCH_SIZE);
242 	vhost_log_cache_sync(dev, vq);
243 
244 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
245 }
246 
247 static __rte_always_inline void
248 vhost_shadow_dequeue_batch_packed_inorder(struct vhost_virtqueue *vq,
249 					  uint16_t id)
250 {
251 	vq->shadow_used_packed[0].id = id;
252 
253 	if (!vq->shadow_used_idx) {
254 		vq->shadow_last_used_idx = vq->last_used_idx;
255 		vq->shadow_used_packed[0].flags =
256 			PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
257 		vq->shadow_used_packed[0].len = 0;
258 		vq->shadow_used_packed[0].count = 1;
259 		vq->shadow_used_idx++;
260 	}
261 
262 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
263 }
264 
265 static __rte_always_inline void
266 vhost_shadow_dequeue_batch_packed(struct virtio_net *dev,
267 				  struct vhost_virtqueue *vq,
268 				  uint16_t *ids)
269 {
270 	uint16_t flags;
271 	uint16_t i;
272 	uint16_t begin;
273 
274 	flags = PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
275 
276 	if (!vq->shadow_used_idx) {
277 		vq->shadow_last_used_idx = vq->last_used_idx;
278 		vq->shadow_used_packed[0].id  = ids[0];
279 		vq->shadow_used_packed[0].len = 0;
280 		vq->shadow_used_packed[0].count = 1;
281 		vq->shadow_used_packed[0].flags = flags;
282 		vq->shadow_used_idx++;
283 		begin = 1;
284 	} else
285 		begin = 0;
286 
287 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE) {
288 		vq->desc_packed[vq->last_used_idx + i].id = ids[i];
289 		vq->desc_packed[vq->last_used_idx + i].len = 0;
290 	}
291 
292 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
293 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE)
294 		vq->desc_packed[vq->last_used_idx + i].flags = flags;
295 
296 	vhost_log_cache_used_vring(dev, vq, vq->last_used_idx *
297 				   sizeof(struct vring_packed_desc),
298 				   sizeof(struct vring_packed_desc) *
299 				   PACKED_BATCH_SIZE);
300 	vhost_log_cache_sync(dev, vq);
301 
302 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
303 }
304 
305 static __rte_always_inline void
306 vhost_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
307 				   uint16_t buf_id,
308 				   uint16_t count)
309 {
310 	uint16_t flags;
311 
312 	flags = vq->desc_packed[vq->last_used_idx].flags;
313 	if (vq->used_wrap_counter) {
314 		flags |= VRING_DESC_F_USED;
315 		flags |= VRING_DESC_F_AVAIL;
316 	} else {
317 		flags &= ~VRING_DESC_F_USED;
318 		flags &= ~VRING_DESC_F_AVAIL;
319 	}
320 
321 	if (!vq->shadow_used_idx) {
322 		vq->shadow_last_used_idx = vq->last_used_idx;
323 
324 		vq->shadow_used_packed[0].id  = buf_id;
325 		vq->shadow_used_packed[0].len = 0;
326 		vq->shadow_used_packed[0].flags = flags;
327 		vq->shadow_used_idx++;
328 	} else {
329 		vq->desc_packed[vq->last_used_idx].id = buf_id;
330 		vq->desc_packed[vq->last_used_idx].len = 0;
331 		vq->desc_packed[vq->last_used_idx].flags = flags;
332 	}
333 
334 	vq_inc_last_used_packed(vq, count);
335 }
336 
337 static __rte_always_inline void
338 vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
339 					   uint16_t buf_id,
340 					   uint16_t count)
341 {
342 	uint16_t flags;
343 
344 	vq->shadow_used_packed[0].id = buf_id;
345 
346 	flags = vq->desc_packed[vq->last_used_idx].flags;
347 	if (vq->used_wrap_counter) {
348 		flags |= VRING_DESC_F_USED;
349 		flags |= VRING_DESC_F_AVAIL;
350 	} else {
351 		flags &= ~VRING_DESC_F_USED;
352 		flags &= ~VRING_DESC_F_AVAIL;
353 	}
354 
355 	if (!vq->shadow_used_idx) {
356 		vq->shadow_last_used_idx = vq->last_used_idx;
357 		vq->shadow_used_packed[0].len = 0;
358 		vq->shadow_used_packed[0].flags = flags;
359 		vq->shadow_used_idx++;
360 	}
361 
362 	vq_inc_last_used_packed(vq, count);
363 }
364 
365 static __rte_always_inline void
366 vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
367 				   uint32_t *len,
368 				   uint16_t *id,
369 				   uint16_t *count,
370 				   uint16_t num_buffers)
371 {
372 	uint16_t i;
373 
374 	for (i = 0; i < num_buffers; i++) {
375 		/* keep the enqueue shadow flush aligned with the batch size */
376 		if (!vq->shadow_used_idx)
377 			vq->shadow_aligned_idx = vq->last_used_idx &
378 				PACKED_BATCH_MASK;
379 		vq->shadow_used_packed[vq->shadow_used_idx].id  = id[i];
380 		vq->shadow_used_packed[vq->shadow_used_idx].len = len[i];
381 		vq->shadow_used_packed[vq->shadow_used_idx].count = count[i];
382 		vq->shadow_aligned_idx += count[i];
383 		vq->shadow_used_idx++;
384 	}
385 }
386 
387 static __rte_always_inline void
388 vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
389 				   struct vhost_virtqueue *vq,
390 				   uint32_t *len,
391 				   uint16_t *id,
392 				   uint16_t *count,
393 				   uint16_t num_buffers)
394 {
395 	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
396 
397 	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
398 		do_data_copy_enqueue(dev, vq);
399 		vhost_flush_enqueue_shadow_packed(dev, vq);
400 	}
401 }
402 
403 /* avoid the write operation when it is not necessary, to lessen cache issues */
404 #define ASSIGN_UNLESS_EQUAL(var, val) do {	\
405 	if ((var) != (val))			\
406 		(var) = (val);			\
407 } while (0)
408 
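/*
 * Translate the mbuf Tx offload flags into the virtio_net_hdr seen by the
 * guest: L4 checksum offloads become VIRTIO_NET_HDR_F_NEEDS_CSUM with
 * csum_start/csum_offset (e.g. for TCP over IPv4 with l2_len = 14 and
 * l3_len = 20, csum_start = 34 and csum_offset = offsetof(struct
 * rte_tcp_hdr, cksum) = 16), TSO/UFO fill gso_type/gso_size/hdr_len, and a
 * requested IPv4 header checksum is computed in place in the packet. The
 * ASSIGN_UNLESS_EQUAL() calls avoid dirtying the header when no offload is
 * requested.
 */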
409 static __rte_always_inline void
410 virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
411 {
412 	uint64_t csum_l4 = m_buf->ol_flags & RTE_MBUF_F_TX_L4_MASK;
413 
414 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG)
415 		csum_l4 |= RTE_MBUF_F_TX_TCP_CKSUM;
416 
417 	if (csum_l4) {
418 		net_hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
419 		net_hdr->csum_start = m_buf->l2_len + m_buf->l3_len;
420 
421 		switch (csum_l4) {
422 		case RTE_MBUF_F_TX_TCP_CKSUM:
423 			net_hdr->csum_offset = (offsetof(struct rte_tcp_hdr,
424 						cksum));
425 			break;
426 		case RTE_MBUF_F_TX_UDP_CKSUM:
427 			net_hdr->csum_offset = (offsetof(struct rte_udp_hdr,
428 						dgram_cksum));
429 			break;
430 		case RTE_MBUF_F_TX_SCTP_CKSUM:
431 			net_hdr->csum_offset = (offsetof(struct rte_sctp_hdr,
432 						cksum));
433 			break;
434 		}
435 	} else {
436 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_start, 0);
437 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_offset, 0);
438 		ASSIGN_UNLESS_EQUAL(net_hdr->flags, 0);
439 	}
440 
441 	/* IP cksum verification cannot be bypassed, so calculate it here */
442 	if (m_buf->ol_flags & RTE_MBUF_F_TX_IP_CKSUM) {
443 		struct rte_ipv4_hdr *ipv4_hdr;
444 
445 		ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
446 						   m_buf->l2_len);
447 		ipv4_hdr->hdr_checksum = 0;
448 		ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
449 	}
450 
451 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG) {
452 		if (m_buf->ol_flags & RTE_MBUF_F_TX_IPV4)
453 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
454 		else
455 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
456 		net_hdr->gso_size = m_buf->tso_segsz;
457 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
458 					+ m_buf->l4_len;
459 	} else if (m_buf->ol_flags & RTE_MBUF_F_TX_UDP_SEG) {
460 		net_hdr->gso_type = VIRTIO_NET_HDR_GSO_UDP;
461 		net_hdr->gso_size = m_buf->tso_segsz;
462 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len +
463 			m_buf->l4_len;
464 	} else {
465 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0);
466 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_size, 0);
467 		ASSIGN_UNLESS_EQUAL(net_hdr->hdr_len, 0);
468 	}
469 }
470 
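/*
 * Map one descriptor (guest IOVA + length) into buf_vec entries. The IOVA
 * to VVA translation may cover only part of the descriptor when the guest
 * buffer is not contiguous in the host address space, so the descriptor is
 * sliced into as many chunks as needed, bounded by BUF_VECTOR_MAX.
 */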
471 static __rte_always_inline int
472 map_one_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
473 		struct buf_vector *buf_vec, uint16_t *vec_idx,
474 		uint64_t desc_iova, uint64_t desc_len, uint8_t perm)
475 {
476 	uint16_t vec_id = *vec_idx;
477 
478 	while (desc_len) {
479 		uint64_t desc_addr;
480 		uint64_t desc_chunck_len = desc_len;
481 
482 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
483 			return -1;
484 
485 		desc_addr = vhost_iova_to_vva(dev, vq,
486 				desc_iova,
487 				&desc_chunck_len,
488 				perm);
489 		if (unlikely(!desc_addr))
490 			return -1;
491 
492 		rte_prefetch0((void *)(uintptr_t)desc_addr);
493 
494 		buf_vec[vec_id].buf_iova = desc_iova;
495 		buf_vec[vec_id].buf_addr = desc_addr;
496 		buf_vec[vec_id].buf_len  = desc_chunck_len;
497 
498 		desc_len -= desc_chunck_len;
499 		desc_iova += desc_chunck_len;
500 		vec_id++;
501 	}
502 	*vec_idx = vec_id;
503 
504 	return 0;
505 }
506 
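/*
 * Resolve one split-ring available entry into buf_vec: read the head index
 * from avail->ring, follow an indirect table if present (copying it when it
 * is not contiguous in host VA space), then walk the VRING_DESC_F_NEXT
 * chain mapping each descriptor. The walk is bounded by the ring size so a
 * malformed chain cannot loop forever.
 */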
507 static __rte_always_inline int
508 fill_vec_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
509 			 uint32_t avail_idx, uint16_t *vec_idx,
510 			 struct buf_vector *buf_vec, uint16_t *desc_chain_head,
511 			 uint32_t *desc_chain_len, uint8_t perm)
512 {
513 	uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
514 	uint16_t vec_id = *vec_idx;
515 	uint32_t len    = 0;
516 	uint64_t dlen;
517 	uint32_t nr_descs = vq->size;
518 	uint32_t cnt    = 0;
519 	struct vring_desc *descs = vq->desc;
520 	struct vring_desc *idesc = NULL;
521 
522 	if (unlikely(idx >= vq->size))
523 		return -1;
524 
525 	*desc_chain_head = idx;
526 
527 	if (vq->desc[idx].flags & VRING_DESC_F_INDIRECT) {
528 		dlen = vq->desc[idx].len;
529 		nr_descs = dlen / sizeof(struct vring_desc);
530 		if (unlikely(nr_descs > vq->size))
531 			return -1;
532 
533 		descs = (struct vring_desc *)(uintptr_t)
534 			vhost_iova_to_vva(dev, vq, vq->desc[idx].addr,
535 						&dlen,
536 						VHOST_ACCESS_RO);
537 		if (unlikely(!descs))
538 			return -1;
539 
540 		if (unlikely(dlen < vq->desc[idx].len)) {
541 			/*
542 			 * The indirect desc table is not contiguous
543 			 * in process VA space, so we have to copy it.
544 			 */
545 			idesc = vhost_alloc_copy_ind_table(dev, vq,
546 					vq->desc[idx].addr, vq->desc[idx].len);
547 			if (unlikely(!idesc))
548 				return -1;
549 
550 			descs = idesc;
551 		}
552 
553 		idx = 0;
554 	}
555 
556 	while (1) {
557 		if (unlikely(idx >= nr_descs || cnt++ >= nr_descs)) {
558 			free_ind_table(idesc);
559 			return -1;
560 		}
561 
562 		dlen = descs[idx].len;
563 		len += dlen;
564 
565 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
566 						descs[idx].addr, dlen,
567 						perm))) {
568 			free_ind_table(idesc);
569 			return -1;
570 		}
571 
572 		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
573 			break;
574 
575 		idx = descs[idx].next;
576 	}
577 
578 	*desc_chain_len = len;
579 	*vec_idx = vec_id;
580 
581 	if (unlikely(!!idesc))
582 		free_ind_table(idesc);
583 
584 	return 0;
585 }
586 
587 /*
588  * Returns -1 on failure, 0 on success
589  */
590 static inline int
591 reserve_avail_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
592 				uint32_t size, struct buf_vector *buf_vec,
593 				uint16_t *num_buffers, uint16_t avail_head,
594 				uint16_t *nr_vec)
595 {
596 	uint16_t cur_idx;
597 	uint16_t vec_idx = 0;
598 	uint16_t max_tries, tries = 0;
599 
600 	uint16_t head_idx = 0;
601 	uint32_t len = 0;
602 
603 	*num_buffers = 0;
604 	cur_idx  = vq->last_avail_idx;
605 
606 	if (rxvq_is_mergeable(dev))
607 		max_tries = vq->size - 1;
608 	else
609 		max_tries = 1;
610 
611 	while (size > 0) {
612 		if (unlikely(cur_idx == avail_head))
613 			return -1;
614 		/*
615 		 * If we have tried all the available ring items and still
616 		 * cannot get enough buffers, something abnormal has
617 		 * happened.
618 		 */
619 		if (unlikely(++tries > max_tries))
620 			return -1;
621 
622 		if (unlikely(fill_vec_buf_split(dev, vq, cur_idx,
623 						&vec_idx, buf_vec,
624 						&head_idx, &len,
625 						VHOST_ACCESS_RW) < 0))
626 			return -1;
627 		len = RTE_MIN(len, size);
628 		update_shadow_used_ring_split(vq, head_idx, len);
629 		size -= len;
630 
631 		cur_idx++;
632 		*num_buffers += 1;
633 	}
634 
635 	*nr_vec = vec_idx;
636 
637 	return 0;
638 }
639 
640 static __rte_always_inline int
641 fill_vec_buf_packed_indirect(struct virtio_net *dev,
642 			struct vhost_virtqueue *vq,
643 			struct vring_packed_desc *desc, uint16_t *vec_idx,
644 			struct buf_vector *buf_vec, uint32_t *len, uint8_t perm)
645 {
646 	uint16_t i;
647 	uint32_t nr_descs;
648 	uint16_t vec_id = *vec_idx;
649 	uint64_t dlen;
650 	struct vring_packed_desc *descs, *idescs = NULL;
651 
652 	dlen = desc->len;
653 	descs = (struct vring_packed_desc *)(uintptr_t)
654 		vhost_iova_to_vva(dev, vq, desc->addr, &dlen, VHOST_ACCESS_RO);
655 	if (unlikely(!descs))
656 		return -1;
657 
658 	if (unlikely(dlen < desc->len)) {
659 		/*
660 		 * The indirect desc table is not contiguous
661 		 * in process VA space, so we have to copy it.
662 		 */
663 		idescs = vhost_alloc_copy_ind_table(dev,
664 				vq, desc->addr, desc->len);
665 		if (unlikely(!idescs))
666 			return -1;
667 
668 		descs = idescs;
669 	}
670 
671 	nr_descs = desc->len / sizeof(struct vring_packed_desc);
672 	if (unlikely(nr_descs >= vq->size)) {
673 		free_ind_table(idescs);
674 		return -1;
675 	}
676 
677 	for (i = 0; i < nr_descs; i++) {
678 		if (unlikely(vec_id >= BUF_VECTOR_MAX)) {
679 			free_ind_table(idescs);
680 			return -1;
681 		}
682 
683 		dlen = descs[i].len;
684 		*len += dlen;
685 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
686 						descs[i].addr, dlen,
687 						perm)))
688 			return -1;
689 	}
690 	*vec_idx = vec_id;
691 
692 	if (unlikely(!!idescs))
693 		free_ind_table(idescs);
694 
695 	return 0;
696 }
697 
698 static __rte_always_inline int
699 fill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
700 				uint16_t avail_idx, uint16_t *desc_count,
701 				struct buf_vector *buf_vec, uint16_t *vec_idx,
702 				uint16_t *buf_id, uint32_t *len, uint8_t perm)
703 {
704 	bool wrap_counter = vq->avail_wrap_counter;
705 	struct vring_packed_desc *descs = vq->desc_packed;
706 	uint16_t vec_id = *vec_idx;
707 	uint64_t dlen;
708 
709 	if (avail_idx < vq->last_avail_idx)
710 		wrap_counter ^= 1;
711 
712 	/*
713 	 * Perform a load-acquire barrier in desc_is_avail to
714 	 * enforce the ordering between desc flags and desc
715 	 * content.
716 	 */
717 	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)))
718 		return -1;
719 
720 	*desc_count = 0;
721 	*len = 0;
722 
723 	while (1) {
724 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
725 			return -1;
726 
727 		if (unlikely(*desc_count >= vq->size))
728 			return -1;
729 
730 		*desc_count += 1;
731 		*buf_id = descs[avail_idx].id;
732 
733 		if (descs[avail_idx].flags & VRING_DESC_F_INDIRECT) {
734 			if (unlikely(fill_vec_buf_packed_indirect(dev, vq,
735 							&descs[avail_idx],
736 							&vec_id, buf_vec,
737 							len, perm) < 0))
738 				return -1;
739 		} else {
740 			dlen = descs[avail_idx].len;
741 			*len += dlen;
742 
743 			if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
744 							descs[avail_idx].addr,
745 							dlen,
746 							perm)))
747 				return -1;
748 		}
749 
750 		if ((descs[avail_idx].flags & VRING_DESC_F_NEXT) == 0)
751 			break;
752 
753 		if (++avail_idx >= vq->size) {
754 			avail_idx -= vq->size;
755 			wrap_counter ^= 1;
756 		}
757 	}
758 
759 	*vec_idx = vec_id;
760 
761 	return 0;
762 }
763 
764 static __rte_noinline void
765 copy_vnet_hdr_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
766 		struct buf_vector *buf_vec,
767 		struct virtio_net_hdr_mrg_rxbuf *hdr)
768 {
769 	uint64_t len;
770 	uint64_t remain = dev->vhost_hlen;
771 	uint64_t src = (uint64_t)(uintptr_t)hdr, dst;
772 	uint64_t iova = buf_vec->buf_iova;
773 
774 	while (remain) {
775 		len = RTE_MIN(remain,
776 				buf_vec->buf_len);
777 		dst = buf_vec->buf_addr;
778 		rte_memcpy((void *)(uintptr_t)dst,
779 				(void *)(uintptr_t)src,
780 				len);
781 
782 		PRINT_PACKET(dev, (uintptr_t)dst,
783 				(uint32_t)len, 0);
784 		vhost_log_cache_write_iova(dev, vq,
785 				iova, len);
786 
787 		remain -= len;
788 		iova += len;
789 		src += len;
790 		buf_vec++;
791 	}
792 }
793 
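/*
 * Helpers building the rte_vhost_iov_iter/rte_vhost_iovec arrays that
 * describe the copies of one async burst: one iterator per packet, each
 * holding a list of src/dst/len segments later handed to the application's
 * transfer_data() callback. finalize() closes the current iterator,
 * cancel() drops its segments on error, reset() clears the whole set.
 */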
794 static __rte_always_inline int
795 async_iter_initialize(struct vhost_async *async)
796 {
797 	struct rte_vhost_iov_iter *iter;
798 
799 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
800 		VHOST_LOG_DATA(ERR, "no more async iovec available\n");
801 		return -1;
802 	}
803 
804 	iter = async->iov_iter + async->iter_idx;
805 	iter->iov = async->iovec + async->iovec_idx;
806 	iter->nr_segs = 0;
807 
808 	return 0;
809 }
810 
811 static __rte_always_inline int
812 async_iter_add_iovec(struct vhost_async *async, void *src, void *dst, size_t len)
813 {
814 	struct rte_vhost_iov_iter *iter;
815 	struct rte_vhost_iovec *iovec;
816 
817 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
818 		static bool vhost_max_async_vec_log;
819 
820 		if (!vhost_max_async_vec_log) {
821 			VHOST_LOG_DATA(ERR, "no more async iovec available\n");
822 			vhost_max_async_vec_log = true;
823 		}
824 
825 		return -1;
826 	}
827 
828 	iter = async->iov_iter + async->iter_idx;
829 	iovec = async->iovec + async->iovec_idx;
830 
831 	iovec->src_addr = src;
832 	iovec->dst_addr = dst;
833 	iovec->len = len;
834 
835 	iter->nr_segs++;
836 	async->iovec_idx++;
837 
838 	return 0;
839 }
840 
841 static __rte_always_inline void
842 async_iter_finalize(struct vhost_async *async)
843 {
844 	async->iter_idx++;
845 }
846 
847 static __rte_always_inline void
848 async_iter_cancel(struct vhost_async *async)
849 {
850 	struct rte_vhost_iov_iter *iter;
851 
852 	iter = async->iov_iter + async->iter_idx;
853 	async->iovec_idx -= iter->nr_segs;
854 	iter->nr_segs = 0;
855 	iter->iov = NULL;
856 }
857 
858 static __rte_always_inline void
859 async_iter_reset(struct vhost_async *async)
860 {
861 	async->iter_idx = 0;
862 	async->iovec_idx = 0;
863 }
864 
865 static __rte_always_inline int
866 async_mbuf_to_desc_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
867 		struct rte_mbuf *m, uint32_t mbuf_offset,
868 		uint64_t buf_iova, uint32_t cpy_len)
869 {
870 	struct vhost_async *async = vq->async;
871 	uint64_t mapped_len;
872 	uint32_t buf_offset = 0;
873 	void *hpa;
874 
875 	while (cpy_len) {
876 		hpa = (void *)(uintptr_t)gpa_to_first_hpa(dev,
877 				buf_iova + buf_offset, cpy_len, &mapped_len);
878 		if (unlikely(!hpa)) {
879 			VHOST_LOG_DATA(ERR, "(%d) %s: failed to get hpa.\n", dev->vid, __func__);
880 			return -1;
881 		}
882 
883 		if (unlikely(async_iter_add_iovec(async,
884 						(void *)(uintptr_t)rte_pktmbuf_iova_offset(m,
885 							mbuf_offset),
886 						hpa, (size_t)mapped_len)))
887 			return -1;
888 
889 		cpy_len -= (uint32_t)mapped_len;
890 		mbuf_offset += (uint32_t)mapped_len;
891 		buf_offset += (uint32_t)mapped_len;
892 	}
893 
894 	return 0;
895 }
896 
897 static __rte_always_inline void
898 sync_mbuf_to_desc_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
899 		struct rte_mbuf *m, uint32_t mbuf_offset,
900 		uint64_t buf_addr, uint64_t buf_iova, uint32_t cpy_len)
901 {
902 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
903 
904 	if (likely(cpy_len > MAX_BATCH_LEN || vq->batch_copy_nb_elems >= vq->size)) {
905 		rte_memcpy((void *)((uintptr_t)(buf_addr)),
906 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
907 				cpy_len);
908 		vhost_log_cache_write_iova(dev, vq, buf_iova, cpy_len);
909 		PRINT_PACKET(dev, (uintptr_t)(buf_addr), cpy_len, 0);
910 	} else {
911 		batch_copy[vq->batch_copy_nb_elems].dst =
912 			(void *)((uintptr_t)(buf_addr));
913 		batch_copy[vq->batch_copy_nb_elems].src =
914 			rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
915 		batch_copy[vq->batch_copy_nb_elems].log_addr = buf_iova;
916 		batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
917 		vq->batch_copy_nb_elems++;
918 	}
919 }
920 
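/*
 * Copy one mbuf chain into the guest buffers described by buf_vec. The
 * virtio-net header is written into the first descriptor, through a stack
 * bounce buffer (copy_vnet_hdr_to_desc()) when that descriptor is smaller
 * than the header. Payload is then copied either synchronously (possibly
 * batched) or, when is_async is set, recorded as iovecs for the DMA engine.
 */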
921 static __rte_always_inline int
922 mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
923 		struct rte_mbuf *m, struct buf_vector *buf_vec,
924 		uint16_t nr_vec, uint16_t num_buffers, bool is_async)
925 {
926 	uint32_t vec_idx = 0;
927 	uint32_t mbuf_offset, mbuf_avail;
928 	uint32_t buf_offset, buf_avail;
929 	uint64_t buf_addr, buf_iova, buf_len;
930 	uint32_t cpy_len;
931 	uint64_t hdr_addr;
932 	struct rte_mbuf *hdr_mbuf;
933 	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
934 	struct vhost_async *async = vq->async;
935 
936 	if (unlikely(m == NULL))
937 		return -1;
938 
939 	buf_addr = buf_vec[vec_idx].buf_addr;
940 	buf_iova = buf_vec[vec_idx].buf_iova;
941 	buf_len = buf_vec[vec_idx].buf_len;
942 
943 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
944 		return -1;
945 
946 	hdr_mbuf = m;
947 	hdr_addr = buf_addr;
948 	if (unlikely(buf_len < dev->vhost_hlen)) {
949 		memset(&tmp_hdr, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
950 		hdr = &tmp_hdr;
951 	} else
952 		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
953 
954 	VHOST_LOG_DATA(DEBUG, "(%d) RX: num merge buffers %d\n",
955 		dev->vid, num_buffers);
956 
957 	if (unlikely(buf_len < dev->vhost_hlen)) {
958 		buf_offset = dev->vhost_hlen - buf_len;
959 		vec_idx++;
960 		buf_addr = buf_vec[vec_idx].buf_addr;
961 		buf_iova = buf_vec[vec_idx].buf_iova;
962 		buf_len = buf_vec[vec_idx].buf_len;
963 		buf_avail = buf_len - buf_offset;
964 	} else {
965 		buf_offset = dev->vhost_hlen;
966 		buf_avail = buf_len - dev->vhost_hlen;
967 	}
968 
969 	mbuf_avail  = rte_pktmbuf_data_len(m);
970 	mbuf_offset = 0;
971 
972 	if (is_async) {
973 		if (async_iter_initialize(async))
974 			return -1;
975 	}
976 
977 	while (mbuf_avail != 0 || m->next != NULL) {
978 		/* done with current buf, get the next one */
979 		if (buf_avail == 0) {
980 			vec_idx++;
981 			if (unlikely(vec_idx >= nr_vec))
982 				goto error;
983 
984 			buf_addr = buf_vec[vec_idx].buf_addr;
985 			buf_iova = buf_vec[vec_idx].buf_iova;
986 			buf_len = buf_vec[vec_idx].buf_len;
987 
988 			buf_offset = 0;
989 			buf_avail  = buf_len;
990 		}
991 
992 		/* done with current mbuf, get the next one */
993 		if (mbuf_avail == 0) {
994 			m = m->next;
995 
996 			mbuf_offset = 0;
997 			mbuf_avail  = rte_pktmbuf_data_len(m);
998 		}
999 
1000 		if (hdr_addr) {
1001 			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
1002 			if (rxvq_is_mergeable(dev))
1003 				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
1004 						num_buffers);
1005 
1006 			if (unlikely(hdr == &tmp_hdr)) {
1007 				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
1008 			} else {
1009 				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
1010 						dev->vhost_hlen, 0);
1011 				vhost_log_cache_write_iova(dev, vq,
1012 						buf_vec[0].buf_iova,
1013 						dev->vhost_hlen);
1014 			}
1015 
1016 			hdr_addr = 0;
1017 		}
1018 
1019 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
1020 
1021 		if (is_async) {
1022 			if (async_mbuf_to_desc_seg(dev, vq, m, mbuf_offset,
1023 						buf_iova + buf_offset, cpy_len) < 0)
1024 				goto error;
1025 		} else {
1026 			sync_mbuf_to_desc_seg(dev, vq, m, mbuf_offset,
1027 					buf_addr + buf_offset,
1028 					buf_iova + buf_offset, cpy_len);
1029 		}
1030 
1031 		mbuf_avail  -= cpy_len;
1032 		mbuf_offset += cpy_len;
1033 		buf_avail  -= cpy_len;
1034 		buf_offset += cpy_len;
1035 	}
1036 
1037 	if (is_async)
1038 		async_iter_finalize(async);
1039 
1040 	return 0;
1041 error:
1042 	if (is_async)
1043 		async_iter_cancel(async);
1044 
1045 	return -1;
1046 }
1047 
1048 static __rte_always_inline int
1049 vhost_enqueue_single_packed(struct virtio_net *dev,
1050 			    struct vhost_virtqueue *vq,
1051 			    struct rte_mbuf *pkt,
1052 			    struct buf_vector *buf_vec,
1053 			    uint16_t *nr_descs)
1054 {
1055 	uint16_t nr_vec = 0;
1056 	uint16_t avail_idx = vq->last_avail_idx;
1057 	uint16_t max_tries, tries = 0;
1058 	uint16_t buf_id = 0;
1059 	uint32_t len = 0;
1060 	uint16_t desc_count;
1061 	uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1062 	uint16_t num_buffers = 0;
1063 	uint32_t buffer_len[vq->size];
1064 	uint16_t buffer_buf_id[vq->size];
1065 	uint16_t buffer_desc_count[vq->size];
1066 
1067 	if (rxvq_is_mergeable(dev))
1068 		max_tries = vq->size - 1;
1069 	else
1070 		max_tries = 1;
1071 
1072 	while (size > 0) {
1073 		/*
1074 		 * If we have tried all the available ring items and still
1075 		 * cannot get enough buffers, something abnormal has
1076 		 * happened.
1077 		 */
1078 		if (unlikely(++tries > max_tries))
1079 			return -1;
1080 
1081 		if (unlikely(fill_vec_buf_packed(dev, vq,
1082 						avail_idx, &desc_count,
1083 						buf_vec, &nr_vec,
1084 						&buf_id, &len,
1085 						VHOST_ACCESS_RW) < 0))
1086 			return -1;
1087 
1088 		len = RTE_MIN(len, size);
1089 		size -= len;
1090 
1091 		buffer_len[num_buffers] = len;
1092 		buffer_buf_id[num_buffers] = buf_id;
1093 		buffer_desc_count[num_buffers] = desc_count;
1094 		num_buffers += 1;
1095 
1096 		*nr_descs += desc_count;
1097 		avail_idx += desc_count;
1098 		if (avail_idx >= vq->size)
1099 			avail_idx -= vq->size;
1100 	}
1101 
1102 	if (mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, num_buffers, false) < 0)
1103 		return -1;
1104 
1105 	vhost_shadow_enqueue_single_packed(dev, vq, buffer_len, buffer_buf_id,
1106 					   buffer_desc_count, num_buffers);
1107 
1108 	return 0;
1109 }
1110 
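/*
 * Enqueue a burst into a split virtqueue: the acquire load of avail->idx
 * pairs with the guest's release store; for each mbuf enough buffers are
 * reserved (several when mergeable Rx buffers are negotiated), the packet
 * is copied in, then the shadow used ring is flushed and the guest notified.
 */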
1111 static __rte_noinline uint32_t
1112 virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1113 	struct rte_mbuf **pkts, uint32_t count)
1114 {
1115 	uint32_t pkt_idx = 0;
1116 	uint16_t num_buffers;
1117 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1118 	uint16_t avail_head;
1119 
1120 	/*
1121 	 * The ordering between avail index and
1122 	 * desc reads needs to be enforced.
1123 	 */
1124 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
1125 
1126 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1127 
1128 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1129 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1130 		uint16_t nr_vec = 0;
1131 
1132 		if (unlikely(reserve_avail_buf_split(dev, vq,
1133 						pkt_len, buf_vec, &num_buffers,
1134 						avail_head, &nr_vec) < 0)) {
1135 			VHOST_LOG_DATA(DEBUG,
1136 				"(%d) failed to get enough desc from vring\n",
1137 				dev->vid);
1138 			vq->shadow_used_idx -= num_buffers;
1139 			break;
1140 		}
1141 
1142 		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1143 			dev->vid, vq->last_avail_idx,
1144 			vq->last_avail_idx + num_buffers);
1145 
1146 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec,
1147 					num_buffers, false) < 0) {
1148 			vq->shadow_used_idx -= num_buffers;
1149 			break;
1150 		}
1151 
1152 		vq->last_avail_idx += num_buffers;
1153 	}
1154 
1155 	do_data_copy_enqueue(dev, vq);
1156 
1157 	if (likely(vq->shadow_used_idx)) {
1158 		flush_shadow_used_ring_split(dev, vq);
1159 		vhost_vring_call_split(dev, vq);
1160 	}
1161 
1162 	return pkt_idx;
1163 }
1164 
1165 static __rte_always_inline int
1166 virtio_dev_rx_sync_batch_check(struct virtio_net *dev,
1167 			   struct vhost_virtqueue *vq,
1168 			   struct rte_mbuf **pkts,
1169 			   uint64_t *desc_addrs,
1170 			   uint64_t *lens)
1171 {
1172 	bool wrap_counter = vq->avail_wrap_counter;
1173 	struct vring_packed_desc *descs = vq->desc_packed;
1174 	uint16_t avail_idx = vq->last_avail_idx;
1175 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1176 	uint16_t i;
1177 
1178 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1179 		return -1;
1180 
1181 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1182 		return -1;
1183 
1184 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1185 		if (unlikely(pkts[i]->next != NULL))
1186 			return -1;
1187 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1188 					    wrap_counter)))
1189 			return -1;
1190 	}
1191 
1192 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1193 		lens[i] = descs[avail_idx + i].len;
1194 
1195 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1196 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1197 			return -1;
1198 	}
1199 
1200 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1201 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
1202 						  descs[avail_idx + i].addr,
1203 						  &lens[i],
1204 						  VHOST_ACCESS_RW);
1205 
1206 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1207 		if (unlikely(!desc_addrs[i]))
1208 			return -1;
1209 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1210 			return -1;
1211 	}
1212 
1213 	return 0;
1214 }
1215 
1216 static __rte_always_inline void
1217 virtio_dev_rx_batch_packed_copy(struct virtio_net *dev,
1218 			   struct vhost_virtqueue *vq,
1219 			   struct rte_mbuf **pkts,
1220 			   uint64_t *desc_addrs,
1221 			   uint64_t *lens)
1222 {
1223 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1224 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
1225 	struct vring_packed_desc *descs = vq->desc_packed;
1226 	uint16_t avail_idx = vq->last_avail_idx;
1227 	uint16_t ids[PACKED_BATCH_SIZE];
1228 	uint16_t i;
1229 
1230 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1231 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
1232 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
1233 					(uintptr_t)desc_addrs[i];
1234 		lens[i] = pkts[i]->pkt_len +
1235 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
1236 	}
1237 
1238 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1239 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
1240 
1241 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
1242 
1243 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1244 		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
1245 			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
1246 			   pkts[i]->pkt_len);
1247 	}
1248 
1249 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1250 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr,
1251 					   lens[i]);
1252 
1253 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1254 		ids[i] = descs[avail_idx + i].id;
1255 
1256 	vhost_flush_enqueue_batch_packed(dev, vq, lens, ids);
1257 }
1258 
1259 static __rte_always_inline int
1260 virtio_dev_rx_sync_batch_packed(struct virtio_net *dev,
1261 			   struct vhost_virtqueue *vq,
1262 			   struct rte_mbuf **pkts)
1263 {
1264 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
1265 	uint64_t lens[PACKED_BATCH_SIZE];
1266 
1267 	if (virtio_dev_rx_sync_batch_check(dev, vq, pkts, desc_addrs, lens) == -1)
1268 		return -1;
1269 
1270 	if (vq->shadow_used_idx) {
1271 		do_data_copy_enqueue(dev, vq);
1272 		vhost_flush_enqueue_shadow_packed(dev, vq);
1273 	}
1274 
1275 	virtio_dev_rx_batch_packed_copy(dev, vq, pkts, desc_addrs, lens);
1276 
1277 	return 0;
1278 }
1279 
1280 static __rte_always_inline int16_t
1281 virtio_dev_rx_single_packed(struct virtio_net *dev,
1282 			    struct vhost_virtqueue *vq,
1283 			    struct rte_mbuf *pkt)
1284 {
1285 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1286 	uint16_t nr_descs = 0;
1287 
1288 	if (unlikely(vhost_enqueue_single_packed(dev, vq, pkt, buf_vec,
1289 						 &nr_descs) < 0)) {
1290 		VHOST_LOG_DATA(DEBUG,
1291 				"(%d) failed to get enough desc from vring\n",
1292 				dev->vid);
1293 		return -1;
1294 	}
1295 
1296 	VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1297 			dev->vid, vq->last_avail_idx,
1298 			vq->last_avail_idx + nr_descs);
1299 
1300 	vq_inc_last_avail_packed(vq, nr_descs);
1301 
1302 	return 0;
1303 }
1304 
1305 static __rte_noinline uint32_t
1306 virtio_dev_rx_packed(struct virtio_net *dev,
1307 		     struct vhost_virtqueue *__rte_restrict vq,
1308 		     struct rte_mbuf **__rte_restrict pkts,
1309 		     uint32_t count)
1310 {
1311 	uint32_t pkt_idx = 0;
1312 
1313 	do {
1314 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1315 
1316 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
1317 			if (!virtio_dev_rx_sync_batch_packed(dev, vq,
1318 							&pkts[pkt_idx])) {
1319 				pkt_idx += PACKED_BATCH_SIZE;
1320 				continue;
1321 			}
1322 		}
1323 
1324 		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]))
1325 			break;
1326 		pkt_idx++;
1327 
1328 	} while (pkt_idx < count);
1329 
1330 	if (vq->shadow_used_idx) {
1331 		do_data_copy_enqueue(dev, vq);
1332 		vhost_flush_enqueue_shadow_packed(dev, vq);
1333 	}
1334 
1335 	if (pkt_idx)
1336 		vhost_vring_call_packed(dev, vq);
1337 
1338 	return pkt_idx;
1339 }
1340 
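/*
 * Common synchronous enqueue entry point: validates the queue index, takes
 * the vring access lock and, when VIRTIO_F_IOMMU_PLATFORM was negotiated,
 * the IOTLB read lock, translates the ring addresses on demand, and then
 * dispatches to the packed or split implementation.
 */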
1341 static __rte_always_inline uint32_t
1342 virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
1343 	struct rte_mbuf **pkts, uint32_t count)
1344 {
1345 	struct vhost_virtqueue *vq;
1346 	uint32_t nb_tx = 0;
1347 
1348 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
1349 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1350 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
1351 			dev->vid, __func__, queue_id);
1352 		return 0;
1353 	}
1354 
1355 	vq = dev->virtqueue[queue_id];
1356 
1357 	rte_spinlock_lock(&vq->access_lock);
1358 
1359 	if (unlikely(!vq->enabled))
1360 		goto out_access_unlock;
1361 
1362 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1363 		vhost_user_iotlb_rd_lock(vq);
1364 
1365 	if (unlikely(!vq->access_ok))
1366 		if (unlikely(vring_translate(dev, vq) < 0))
1367 			goto out;
1368 
1369 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1370 	if (count == 0)
1371 		goto out;
1372 
1373 	if (vq_is_packed(dev))
1374 		nb_tx = virtio_dev_rx_packed(dev, vq, pkts, count);
1375 	else
1376 		nb_tx = virtio_dev_rx_split(dev, vq, pkts, count);
1377 
1378 out:
1379 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1380 		vhost_user_iotlb_rd_unlock(vq);
1381 
1382 out_access_unlock:
1383 	rte_spinlock_unlock(&vq->access_lock);
1384 
1385 	return nb_tx;
1386 }
1387 
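/*
 * Synchronous enqueue API. Packets are copied into the guest ring, so the
 * caller keeps ownership of the mbufs and may free or reuse them once the
 * call returns. Minimal usage sketch (variable names illustrative only):
 *
 *	uint16_t sent = rte_vhost_enqueue_burst(vid, 0, pkts, nb_pkts);
 *	// 'sent' packets were copied; all of pkts[] still belongs to the caller
 */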
1388 uint16_t
1389 rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
1390 	struct rte_mbuf **__rte_restrict pkts, uint16_t count)
1391 {
1392 	struct virtio_net *dev = get_device(vid);
1393 
1394 	if (!dev)
1395 		return 0;
1396 
1397 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
1398 		VHOST_LOG_DATA(ERR,
1399 			"(%d) %s: built-in vhost net backend is disabled.\n",
1400 			dev->vid, __func__);
1401 		return 0;
1402 	}
1403 
1404 	return virtio_dev_rx(dev, queue_id, pkts, count);
1405 }
1406 
1407 static __rte_always_inline uint16_t
1408 async_get_first_inflight_pkt_idx(struct vhost_virtqueue *vq)
1409 {
1410 	struct vhost_async *async = vq->async;
1411 
1412 	if (async->pkts_idx >= async->pkts_inflight_n)
1413 		return async->pkts_idx - async->pkts_inflight_n;
1414 	else
1415 		return vq->size - async->pkts_inflight_n + async->pkts_idx;
1416 }
1417 
1418 static __rte_always_inline void
1419 store_dma_desc_info_split(struct vring_used_elem *s_ring, struct vring_used_elem *d_ring,
1420 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1421 {
1422 	size_t elem_size = sizeof(struct vring_used_elem);
1423 
1424 	if (d_idx + count <= ring_size) {
1425 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1426 	} else {
1427 		uint16_t size = ring_size - d_idx;
1428 
1429 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1430 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1431 	}
1432 }
1433 
1434 static __rte_always_inline void
1435 store_dma_desc_info_packed(struct vring_used_elem_packed *s_ring,
1436 		struct vring_used_elem_packed *d_ring,
1437 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1438 {
1439 	size_t elem_size = sizeof(struct vring_used_elem_packed);
1440 
1441 	if (d_idx + count <= ring_size) {
1442 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1443 	} else {
1444 		uint16_t size = ring_size - d_idx;
1445 
1446 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1447 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1448 	}
1449 }
1450 
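/*
 * DMA-assisted enqueue for split rings: buffers are reserved and iovec
 * iterators built as in the sync path, then submitted through the
 * application's transfer_data() callback. Packets the engine refused are
 * rolled back (shadow used ring and last_avail_idx), while the used-ring
 * entries of accepted packets are stored in async->descs_split until their
 * copies complete and are written back from the completion path.
 */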
1451 static __rte_noinline uint32_t
1452 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
1453 	struct vhost_virtqueue *vq, uint16_t queue_id,
1454 	struct rte_mbuf **pkts, uint32_t count)
1455 {
1456 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1457 	uint32_t pkt_idx = 0;
1458 	uint16_t num_buffers;
1459 	uint16_t avail_head;
1460 
1461 	struct vhost_async *async = vq->async;
1462 	struct async_inflight_info *pkts_info = async->pkts_info;
1463 	uint32_t pkt_err = 0;
1464 	int32_t n_xfer;
1465 	uint16_t slot_idx = 0;
1466 
1467 	/*
1468 	 * The ordering between avail index and desc reads needs to be enforced.
1469 	 */
1470 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
1471 
1472 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1473 
1474 	async_iter_reset(async);
1475 
1476 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1477 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1478 		uint16_t nr_vec = 0;
1479 
1480 		if (unlikely(reserve_avail_buf_split(dev, vq, pkt_len, buf_vec,
1481 						&num_buffers, avail_head, &nr_vec) < 0)) {
1482 			VHOST_LOG_DATA(DEBUG, "(%d) failed to get enough desc from vring\n",
1483 					dev->vid);
1484 			vq->shadow_used_idx -= num_buffers;
1485 			break;
1486 		}
1487 
1488 		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1489 			dev->vid, vq->last_avail_idx, vq->last_avail_idx + num_buffers);
1490 
1491 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec, num_buffers, true) < 0) {
1492 			vq->shadow_used_idx -= num_buffers;
1493 			break;
1494 		}
1495 
1496 		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
1497 		pkts_info[slot_idx].descs = num_buffers;
1498 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1499 
1500 		vq->last_avail_idx += num_buffers;
1501 	}
1502 
1503 	if (unlikely(pkt_idx == 0))
1504 		return 0;
1505 
1506 	n_xfer = async->ops.transfer_data(dev->vid, queue_id, async->iov_iter, 0, pkt_idx);
1507 	if (unlikely(n_xfer < 0)) {
1508 		VHOST_LOG_DATA(ERR, "(%d) %s: failed to transfer data for queue id %d.\n",
1509 				dev->vid, __func__, queue_id);
1510 		n_xfer = 0;
1511 	}
1512 
1513 	pkt_err = pkt_idx - n_xfer;
1514 	if (unlikely(pkt_err)) {
1515 		uint16_t num_descs = 0;
1516 
1517 		/* update number of completed packets */
1518 		pkt_idx = n_xfer;
1519 
1520 		/* calculate the sum of descriptors to revert */
1521 		while (pkt_err-- > 0) {
1522 			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
1523 			slot_idx--;
1524 		}
1525 
1526 		/* recover shadow used ring and available ring */
1527 		vq->shadow_used_idx -= num_descs;
1528 		vq->last_avail_idx -= num_descs;
1529 	}
1530 
1531 	/* keep used descriptors */
1532 	if (likely(vq->shadow_used_idx)) {
1533 		uint16_t to = async->desc_idx_split & (vq->size - 1);
1534 
1535 		store_dma_desc_info_split(vq->shadow_used_split,
1536 				async->descs_split, vq->size, 0, to,
1537 				vq->shadow_used_idx);
1538 
1539 		async->desc_idx_split += vq->shadow_used_idx;
1540 
1541 		async->pkts_idx += pkt_idx;
1542 		if (async->pkts_idx >= vq->size)
1543 			async->pkts_idx -= vq->size;
1544 
1545 		async->pkts_inflight_n += pkt_idx;
1546 		vq->shadow_used_idx = 0;
1547 	}
1548 
1549 	return pkt_idx;
1550 }
1551 
1552 static __rte_always_inline void
1553 vhost_update_used_packed(struct vhost_virtqueue *vq,
1554 			struct vring_used_elem_packed *shadow_ring,
1555 			uint16_t count)
1556 {
1557 	int i;
1558 	uint16_t used_idx = vq->last_used_idx;
1559 	uint16_t head_idx = vq->last_used_idx;
1560 	uint16_t head_flags = 0;
1561 
1562 	if (count == 0)
1563 		return;
1564 
1565 	/* Split loop in two to save memory barriers */
1566 	for (i = 0; i < count; i++) {
1567 		vq->desc_packed[used_idx].id = shadow_ring[i].id;
1568 		vq->desc_packed[used_idx].len = shadow_ring[i].len;
1569 
1570 		used_idx += shadow_ring[i].count;
1571 		if (used_idx >= vq->size)
1572 			used_idx -= vq->size;
1573 	}
1574 
1575 	/* The ordering for storing desc flags needs to be enforced. */
1576 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
1577 
1578 	for (i = 0; i < count; i++) {
1579 		uint16_t flags;
1580 
1581 		if (vq->shadow_used_packed[i].len)
1582 			flags = VRING_DESC_F_WRITE;
1583 		else
1584 			flags = 0;
1585 
1586 		if (vq->used_wrap_counter) {
1587 			flags |= VRING_DESC_F_USED;
1588 			flags |= VRING_DESC_F_AVAIL;
1589 		} else {
1590 			flags &= ~VRING_DESC_F_USED;
1591 			flags &= ~VRING_DESC_F_AVAIL;
1592 		}
1593 
1594 		if (i > 0) {
1595 			vq->desc_packed[vq->last_used_idx].flags = flags;
1596 		} else {
1597 			head_idx = vq->last_used_idx;
1598 			head_flags = flags;
1599 		}
1600 
1601 		vq_inc_last_used_packed(vq, shadow_ring[i].count);
1602 	}
1603 
1604 	vq->desc_packed[head_idx].flags = head_flags;
1605 }
1606 
1607 static __rte_always_inline int
1608 vhost_enqueue_async_packed(struct virtio_net *dev,
1609 			    struct vhost_virtqueue *vq,
1610 			    struct rte_mbuf *pkt,
1611 			    struct buf_vector *buf_vec,
1612 			    uint16_t *nr_descs,
1613 			    uint16_t *nr_buffers)
1614 {
1615 	uint16_t nr_vec = 0;
1616 	uint16_t avail_idx = vq->last_avail_idx;
1617 	uint16_t max_tries, tries = 0;
1618 	uint16_t buf_id = 0;
1619 	uint32_t len = 0;
1620 	uint16_t desc_count = 0;
1621 	uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1622 	uint32_t buffer_len[vq->size];
1623 	uint16_t buffer_buf_id[vq->size];
1624 	uint16_t buffer_desc_count[vq->size];
1625 
1626 	if (rxvq_is_mergeable(dev))
1627 		max_tries = vq->size - 1;
1628 	else
1629 		max_tries = 1;
1630 
1631 	while (size > 0) {
1632 		/*
1633 		 * If we have tried all the available ring items and still
1634 		 * cannot get enough buffers, something abnormal has
1635 		 * happened.
1636 		 */
1637 		if (unlikely(++tries > max_tries))
1638 			return -1;
1639 
1640 		if (unlikely(fill_vec_buf_packed(dev, vq,
1641 						avail_idx, &desc_count,
1642 						buf_vec, &nr_vec,
1643 						&buf_id, &len,
1644 						VHOST_ACCESS_RW) < 0))
1645 			return -1;
1646 
1647 		len = RTE_MIN(len, size);
1648 		size -= len;
1649 
1650 		buffer_len[*nr_buffers] = len;
1651 		buffer_buf_id[*nr_buffers] = buf_id;
1652 		buffer_desc_count[*nr_buffers] = desc_count;
1653 		*nr_buffers += 1;
1654 		*nr_descs += desc_count;
1655 		avail_idx += desc_count;
1656 		if (avail_idx >= vq->size)
1657 			avail_idx -= vq->size;
1658 	}
1659 
1660 	if (unlikely(mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers, true) < 0))
1661 		return -1;
1662 
1663 	vhost_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id, buffer_desc_count, *nr_buffers);
1664 
1665 	return 0;
1666 }
1667 
1668 static __rte_always_inline int16_t
1669 virtio_dev_rx_async_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
1670 			    struct rte_mbuf *pkt, uint16_t *nr_descs, uint16_t *nr_buffers)
1671 {
1672 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1673 
1674 	if (unlikely(vhost_enqueue_async_packed(dev, vq, pkt, buf_vec,
1675 					nr_descs, nr_buffers) < 0)) {
1676 		VHOST_LOG_DATA(DEBUG, "(%d) failed to get enough desc from vring\n", dev->vid);
1677 		return -1;
1678 	}
1679 
1680 	VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1681 			dev->vid, vq->last_avail_idx, vq->last_avail_idx + *nr_descs);
1682 
1683 	return 0;
1684 }
1685 
1686 static __rte_always_inline void
1687 dma_error_handler_packed(struct vhost_virtqueue *vq, uint16_t slot_idx,
1688 			uint32_t nr_err, uint32_t *pkt_idx)
1689 {
1690 	uint16_t descs_err = 0;
1691 	uint16_t buffers_err = 0;
1692 	struct async_inflight_info *pkts_info = vq->async->pkts_info;
1693 
1694 	*pkt_idx -= nr_err;
1695 	/* calculate the sum of buffers and descs of DMA-error packets. */
1696 	while (nr_err-- > 0) {
1697 		descs_err += pkts_info[slot_idx % vq->size].descs;
1698 		buffers_err += pkts_info[slot_idx % vq->size].nr_buffers;
1699 		slot_idx--;
1700 	}
1701 
1702 	if (vq->last_avail_idx >= descs_err) {
1703 		vq->last_avail_idx -= descs_err;
1704 	} else {
1705 		vq->last_avail_idx = vq->last_avail_idx + vq->size - descs_err;
1706 		vq->avail_wrap_counter ^= 1;
1707 	}
1708 
1709 	vq->shadow_used_idx -= buffers_err;
1710 }
1711 
1712 static __rte_noinline uint32_t
1713 virtio_dev_rx_async_submit_packed(struct virtio_net *dev,
1714 	struct vhost_virtqueue *vq, uint16_t queue_id,
1715 	struct rte_mbuf **pkts, uint32_t count)
1716 {
1717 	uint32_t pkt_idx = 0;
1718 	uint32_t remained = count;
1719 	int32_t n_xfer;
1720 	uint16_t num_buffers;
1721 	uint16_t num_descs;
1722 
1723 	struct vhost_async *async = vq->async;
1724 	struct async_inflight_info *pkts_info = async->pkts_info;
1725 	uint32_t pkt_err = 0;
1726 	uint16_t slot_idx = 0;
1727 
1728 	do {
1729 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1730 
1731 		num_buffers = 0;
1732 		num_descs = 0;
1733 		if (unlikely(virtio_dev_rx_async_packed(dev, vq, pkts[pkt_idx],
1734 						&num_descs, &num_buffers) < 0))
1735 			break;
1736 
1737 		slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
1738 
1739 		pkts_info[slot_idx].descs = num_descs;
1740 		pkts_info[slot_idx].nr_buffers = num_buffers;
1741 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1742 
1743 		pkt_idx++;
1744 		remained--;
1745 		vq_inc_last_avail_packed(vq, num_descs);
1746 	} while (pkt_idx < count);
1747 
1748 	if (unlikely(pkt_idx == 0))
1749 		return 0;
1750 
1751 	n_xfer = async->ops.transfer_data(dev->vid, queue_id, async->iov_iter, 0, pkt_idx);
1752 	if (unlikely(n_xfer < 0)) {
1753 		VHOST_LOG_DATA(ERR, "(%d) %s: failed to transfer data for queue id %d.\n",
1754 				dev->vid, __func__, queue_id);
1755 		n_xfer = 0;
1756 	}
1757 
1758 	pkt_err = pkt_idx - n_xfer;
1759 
1760 	async_iter_reset(async);
1761 
1762 	if (unlikely(pkt_err))
1763 		dma_error_handler_packed(vq, slot_idx, pkt_err, &pkt_idx);
1764 
1765 	if (likely(vq->shadow_used_idx)) {
1766 		/* keep used descriptors. */
1767 		store_dma_desc_info_packed(vq->shadow_used_packed, async->buffers_packed,
1768 					vq->size, 0, async->buffer_idx_packed,
1769 					vq->shadow_used_idx);
1770 
1771 		async->buffer_idx_packed += vq->shadow_used_idx;
1772 		if (async->buffer_idx_packed >= vq->size)
1773 			async->buffer_idx_packed -= vq->size;
1774 
1775 		async->pkts_idx += pkt_idx;
1776 		if (async->pkts_idx >= vq->size)
1777 			async->pkts_idx -= vq->size;
1778 
1779 		vq->shadow_used_idx = 0;
1780 		async->pkts_inflight_n += pkt_idx;
1781 	}
1782 
1783 	return pkt_idx;
1784 }
1785 
1786 static __rte_always_inline void
1787 write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
1788 {
1789 	struct vhost_async *async = vq->async;
1790 	uint16_t nr_left = n_descs;
1791 	uint16_t nr_copy;
1792 	uint16_t to, from;
1793 
1794 	do {
1795 		from = async->last_desc_idx_split & (vq->size - 1);
1796 		nr_copy = nr_left + from <= vq->size ? nr_left : vq->size - from;
1797 		to = vq->last_used_idx & (vq->size - 1);
1798 
1799 		if (to + nr_copy <= vq->size) {
1800 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
1801 					nr_copy * sizeof(struct vring_used_elem));
1802 		} else {
1803 			uint16_t size = vq->size - to;
1804 
1805 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
1806 					size * sizeof(struct vring_used_elem));
1807 			rte_memcpy(&vq->used->ring[0], &async->descs_split[from + size],
1808 					(nr_copy - size) * sizeof(struct vring_used_elem));
1809 		}
1810 
1811 		async->last_desc_idx_split += nr_copy;
1812 		vq->last_used_idx += nr_copy;
1813 		nr_left -= nr_copy;
1814 	} while (nr_left > 0);
1815 }
1816 
1817 static __rte_always_inline void
1818 write_back_completed_descs_packed(struct vhost_virtqueue *vq,
1819 				uint16_t n_buffers)
1820 {
1821 	struct vhost_async *async = vq->async;
1822 	uint16_t nr_left = n_buffers;
1823 	uint16_t from, to;
1824 
1825 	do {
1826 		from = async->last_buffer_idx_packed;
1827 		to = (from + nr_left) % vq->size;
1828 		if (to > from) {
1829 			vhost_update_used_packed(vq, async->buffers_packed + from, to - from);
1830 			async->last_buffer_idx_packed += nr_left;
1831 			nr_left = 0;
1832 		} else {
1833 			vhost_update_used_packed(vq, async->buffers_packed + from,
1834 				vq->size - from);
1835 			async->last_buffer_idx_packed = 0;
1836 			nr_left -= vq->size - from;
1837 		}
1838 	} while (nr_left > 0);
1839 }
1840 
1841 static __rte_always_inline uint16_t
1842 vhost_poll_enqueue_completed(struct virtio_net *dev, uint16_t queue_id,
1843 		struct rte_mbuf **pkts, uint16_t count)
1844 {
1845 	struct vhost_virtqueue *vq = dev->virtqueue[queue_id];
1846 	struct vhost_async *async = vq->async;
1847 	struct async_inflight_info *pkts_info = async->pkts_info;
1848 	int32_t n_cpl;
1849 	uint16_t n_descs = 0, n_buffers = 0;
1850 	uint16_t start_idx, from, i;
1851 
1852 	n_cpl = async->ops.check_completed_copies(dev->vid, queue_id, 0, count);
1853 	if (unlikely(n_cpl < 0)) {
1854 		VHOST_LOG_DATA(ERR, "(%d) %s: failed to check completed copies for queue id %d.\n",
1855 				dev->vid, __func__, queue_id);
1856 		return 0;
1857 	}
1858 
1859 	if (n_cpl == 0)
1860 		return 0;
1861 
1862 	start_idx = async_get_first_inflight_pkt_idx(vq);
1863 
1864 	for (i = 0; i < n_cpl; i++) {
1865 		from = (start_idx + i) % vq->size;
1866 		/* Only used with packed ring */
1867 		n_buffers += pkts_info[from].nr_buffers;
1868 		/* Only used with split ring */
1869 		n_descs += pkts_info[from].descs;
1870 		pkts[i] = pkts_info[from].mbuf;
1871 	}
1872 
1873 	async->pkts_inflight_n -= n_cpl;
1874 
1875 	if (likely(vq->enabled && vq->access_ok)) {
1876 		if (vq_is_packed(dev)) {
1877 			write_back_completed_descs_packed(vq, n_buffers);
1878 			vhost_vring_call_packed(dev, vq);
1879 		} else {
1880 			write_back_completed_descs_split(vq, n_descs);
1881 			__atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
1882 			vhost_vring_call_split(dev, vq);
1883 		}
1884 	} else {
1885 		if (vq_is_packed(dev)) {
1886 			async->last_buffer_idx_packed += n_buffers;
1887 			if (async->last_buffer_idx_packed >= vq->size)
1888 				async->last_buffer_idx_packed -= vq->size;
1889 		} else {
1890 			async->last_desc_idx_split += n_descs;
1891 		}
1892 	}
1893 
1894 	return n_cpl;
1895 }
1896 
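/*
 * Poll for completed asynchronous enqueues. Rough sketch of the async flow
 * (names illustrative, error handling omitted); submitted mbufs must stay
 * valid until their completion is returned here, after which the caller
 * may free them:
 *
 *	n = rte_vhost_submit_enqueue_burst(vid, 0, pkts, nb_pkts);
 *	...
 *	done = rte_vhost_poll_enqueue_completed(vid, 0, comp, RTE_DIM(comp));
 *	rte_pktmbuf_free_bulk(comp, done);
 */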
1897 uint16_t
1898 rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
1899 		struct rte_mbuf **pkts, uint16_t count)
1900 {
1901 	struct virtio_net *dev = get_device(vid);
1902 	struct vhost_virtqueue *vq;
1903 	uint16_t n_pkts_cpl = 0;
1904 
1905 	if (unlikely(!dev))
1906 		return 0;
1907 
1908 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
1909 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1910 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
1911 			dev->vid, __func__, queue_id);
1912 		return 0;
1913 	}
1914 
1915 	vq = dev->virtqueue[queue_id];
1916 
1917 	if (unlikely(!vq->async)) {
1918 		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
1919 			dev->vid, __func__, queue_id);
1920 		return 0;
1921 	}
1922 
1923 	rte_spinlock_lock(&vq->access_lock);
1924 
1925 	n_pkts_cpl = vhost_poll_enqueue_completed(dev, queue_id, pkts, count);
1926 
1927 	rte_spinlock_unlock(&vq->access_lock);
1928 
1929 	return n_pkts_cpl;
1930 }
1931 
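/*
 * Same as rte_vhost_poll_enqueue_completed(), but without taking the
 * virtqueue access lock: the caller must guarantee exclusive access to the
 * virtqueue.
 */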
1932 uint16_t
1933 rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
1934 		struct rte_mbuf **pkts, uint16_t count)
1935 {
1936 	struct virtio_net *dev = get_device(vid);
1937 	struct vhost_virtqueue *vq;
1938 	uint16_t n_pkts_cpl = 0;
1939 
1940 	if (!dev)
1941 		return 0;
1942 
1943 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
1944 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1945 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
1946 			dev->vid, __func__, queue_id);
1947 		return 0;
1948 	}
1949 
1950 	vq = dev->virtqueue[queue_id];
1951 
1952 	if (unlikely(!vq->async)) {
1953 		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
1954 			dev->vid, __func__, queue_id);
1955 		return 0;
1956 	}
1957 
1958 	n_pkts_cpl = vhost_poll_enqueue_completed(dev, queue_id, pkts, count);
1959 
1960 	return n_pkts_cpl;
1961 }
1962 
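/*
 * Common entry point for the async enqueue path: validate the virtqueue,
 * take the access and IOTLB locks, then dispatch to the split or packed
 * ring implementation.
 */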
1963 static __rte_always_inline uint32_t
1964 virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
1965 	struct rte_mbuf **pkts, uint32_t count)
1966 {
1967 	struct vhost_virtqueue *vq;
1968 	uint32_t nb_tx = 0;
1969 
1970 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
1971 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1972 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
1973 			dev->vid, __func__, queue_id);
1974 		return 0;
1975 	}
1976 
1977 	vq = dev->virtqueue[queue_id];
1978 
1979 	rte_spinlock_lock(&vq->access_lock);
1980 
1981 	if (unlikely(!vq->enabled || !vq->async))
1982 		goto out_access_unlock;
1983 
1984 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1985 		vhost_user_iotlb_rd_lock(vq);
1986 
1987 	if (unlikely(!vq->access_ok))
1988 		if (unlikely(vring_translate(dev, vq) < 0))
1989 			goto out;
1990 
1991 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1992 	if (count == 0)
1993 		goto out;
1994 
1995 	if (vq_is_packed(dev))
1996 		nb_tx = virtio_dev_rx_async_submit_packed(dev, vq, queue_id,
1997 				pkts, count);
1998 	else
1999 		nb_tx = virtio_dev_rx_async_submit_split(dev, vq, queue_id,
2000 				pkts, count);
2001 
2002 out:
2003 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
2004 		vhost_user_iotlb_rd_unlock(vq);
2005 
2006 out_access_unlock:
2007 	rte_spinlock_unlock(&vq->access_lock);
2008 
2009 	return nb_tx;
2010 }
2011 
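/*
 * Submit a burst of packets to the guest through the async (DMA-assisted)
 * enqueue path. The submitted mbufs are handed back to the application
 * once their copies complete, via rte_vhost_poll_enqueue_completed().
 */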
2012 uint16_t
2013 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
2014 		struct rte_mbuf **pkts, uint16_t count)
2015 {
2016 	struct virtio_net *dev = get_device(vid);
2017 
2018 	if (!dev)
2019 		return 0;
2020 
2021 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
2022 		VHOST_LOG_DATA(ERR,
2023 			"(%d) %s: built-in vhost net backend is disabled.\n",
2024 			dev->vid, __func__);
2025 		return 0;
2026 	}
2027 
2028 	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count);
2029 }
2030 
2031 static inline bool
2032 virtio_net_with_host_offload(struct virtio_net *dev)
2033 {
2034 	if (dev->features &
2035 			((1ULL << VIRTIO_NET_F_CSUM) |
2036 			 (1ULL << VIRTIO_NET_F_HOST_ECN) |
2037 			 (1ULL << VIRTIO_NET_F_HOST_TSO4) |
2038 			 (1ULL << VIRTIO_NET_F_HOST_TSO6) |
2039 			 (1ULL << VIRTIO_NET_F_HOST_UFO)))
2040 		return true;
2041 
2042 	return false;
2043 }
2044 
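/*
 * Parse the Ethernet/VLAN, L3 and L4 headers of @m, filling m->l2_len,
 * m->l3_len and *l4_proto. Returns -EINVAL if a header does not fit in the
 * first segment or the L3/L4 protocol is not supported.
 */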
2045 static int
2046 parse_headers(struct rte_mbuf *m, uint8_t *l4_proto)
2047 {
2048 	struct rte_ipv4_hdr *ipv4_hdr;
2049 	struct rte_ipv6_hdr *ipv6_hdr;
2050 	struct rte_ether_hdr *eth_hdr;
2051 	uint16_t ethertype;
2052 	uint16_t data_len = rte_pktmbuf_data_len(m);
2053 
2054 	if (data_len < sizeof(struct rte_ether_hdr))
2055 		return -EINVAL;
2056 
2057 	eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
2058 
2059 	m->l2_len = sizeof(struct rte_ether_hdr);
2060 	ethertype = rte_be_to_cpu_16(eth_hdr->ether_type);
2061 
2062 	if (ethertype == RTE_ETHER_TYPE_VLAN) {
2063 		if (data_len < sizeof(struct rte_ether_hdr) +
2064 				sizeof(struct rte_vlan_hdr))
2065 			goto error;
2066 
2067 		struct rte_vlan_hdr *vlan_hdr =
2068 			(struct rte_vlan_hdr *)(eth_hdr + 1);
2069 
2070 		m->l2_len += sizeof(struct rte_vlan_hdr);
2071 		ethertype = rte_be_to_cpu_16(vlan_hdr->eth_proto);
2072 	}
2073 
2074 	switch (ethertype) {
2075 	case RTE_ETHER_TYPE_IPV4:
2076 		if (data_len < m->l2_len + sizeof(struct rte_ipv4_hdr))
2077 			goto error;
2078 		ipv4_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
2079 				m->l2_len);
2080 		m->l3_len = rte_ipv4_hdr_len(ipv4_hdr);
2081 		if (data_len < m->l2_len + m->l3_len)
2082 			goto error;
2083 		m->ol_flags |= RTE_MBUF_F_TX_IPV4;
2084 		*l4_proto = ipv4_hdr->next_proto_id;
2085 		break;
2086 	case RTE_ETHER_TYPE_IPV6:
2087 		if (data_len < m->l2_len + sizeof(struct rte_ipv6_hdr))
2088 			goto error;
2089 		ipv6_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv6_hdr *,
2090 				m->l2_len);
2091 		m->l3_len = sizeof(struct rte_ipv6_hdr);
2092 		m->ol_flags |= RTE_MBUF_F_TX_IPV6;
2093 		*l4_proto = ipv6_hdr->proto;
2094 		break;
2095 	default:
2096 		/* a valid L3 header is needed for further L4 parsing */
2097 		goto error;
2098 	}
2099 
2100 	/* both CSUM and GSO need a valid L4 header */
2101 	switch (*l4_proto) {
2102 	case IPPROTO_TCP:
2103 		if (data_len < m->l2_len + m->l3_len +
2104 				sizeof(struct rte_tcp_hdr))
2105 			goto error;
2106 		break;
2107 	case IPPROTO_UDP:
2108 		if (data_len < m->l2_len + m->l3_len +
2109 				sizeof(struct rte_udp_hdr))
2110 			goto error;
2111 		break;
2112 	case IPPROTO_SCTP:
2113 		if (data_len < m->l2_len + m->l3_len +
2114 				sizeof(struct rte_sctp_hdr))
2115 			goto error;
2116 		break;
2117 	default:
2118 		goto error;
2119 	}
2120 
2121 	return 0;
2122 
2123 error:
2124 	m->l2_len = 0;
2125 	m->l3_len = 0;
2126 	m->ol_flags = 0;
2127 	return -EINVAL;
2128 }
2129 
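/*
 * Legacy path: translate the virtio-net header of a dequeued packet into
 * Tx offload flags (RTE_MBUF_F_TX_*) on the mbuf, preserving the historical
 * vhost behaviour.
 */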
2130 static __rte_always_inline void
2131 vhost_dequeue_offload_legacy(struct virtio_net_hdr *hdr, struct rte_mbuf *m)
2132 {
2133 	uint8_t l4_proto = 0;
2134 	struct rte_tcp_hdr *tcp_hdr = NULL;
2135 	uint16_t tcp_len;
2136 	uint16_t data_len = rte_pktmbuf_data_len(m);
2137 
2138 	if (parse_headers(m, &l4_proto) < 0)
2139 		return;
2140 
2141 	if (hdr->flags == VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2142 		if (hdr->csum_start == (m->l2_len + m->l3_len)) {
2143 			switch (hdr->csum_offset) {
2144 			case (offsetof(struct rte_tcp_hdr, cksum)):
2145 				if (l4_proto != IPPROTO_TCP)
2146 					goto error;
2147 				m->ol_flags |= RTE_MBUF_F_TX_TCP_CKSUM;
2148 				break;
2149 			case (offsetof(struct rte_udp_hdr, dgram_cksum)):
2150 				if (l4_proto != IPPROTO_UDP)
2151 					goto error;
2152 				m->ol_flags |= RTE_MBUF_F_TX_UDP_CKSUM;
2153 				break;
2154 			case (offsetof(struct rte_sctp_hdr, cksum)):
2155 				if (l4_proto != IPPROTO_SCTP)
2156 					goto error;
2157 				m->ol_flags |= RTE_MBUF_F_TX_SCTP_CKSUM;
2158 				break;
2159 			default:
2160 				goto error;
2161 			}
2162 		} else {
2163 			goto error;
2164 		}
2165 	}
2166 
2167 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2168 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2169 		case VIRTIO_NET_HDR_GSO_TCPV4:
2170 		case VIRTIO_NET_HDR_GSO_TCPV6:
2171 			if (l4_proto != IPPROTO_TCP)
2172 				goto error;
2173 			tcp_hdr = rte_pktmbuf_mtod_offset(m,
2174 					struct rte_tcp_hdr *,
2175 					m->l2_len + m->l3_len);
2176 			tcp_len = (tcp_hdr->data_off & 0xf0) >> 2;
2177 			if (data_len < m->l2_len + m->l3_len + tcp_len)
2178 				goto error;
2179 			m->ol_flags |= RTE_MBUF_F_TX_TCP_SEG;
2180 			m->tso_segsz = hdr->gso_size;
2181 			m->l4_len = tcp_len;
2182 			break;
2183 		case VIRTIO_NET_HDR_GSO_UDP:
2184 			if (l4_proto != IPPROTO_UDP)
2185 				goto error;
2186 			m->ol_flags |= RTE_MBUF_F_TX_UDP_SEG;
2187 			m->tso_segsz = hdr->gso_size;
2188 			m->l4_len = sizeof(struct rte_udp_hdr);
2189 			break;
2190 		default:
2191 			VHOST_LOG_DATA(WARNING,
2192 				"unsupported gso type %u.\n", hdr->gso_type);
2193 			goto error;
2194 		}
2195 	}
2196 	return;
2197 
2198 error:
2199 	m->l2_len = 0;
2200 	m->l3_len = 0;
2201 	m->ol_flags = 0;
2202 }
2203 
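/*
 * Apply the checksum and GSO information carried in the virtio-net header
 * to the dequeued mbuf. The compliant path reports Rx offload flags
 * (RTE_MBUF_F_RX_*); when @legacy_ol_flags is set, the historical Tx flag
 * behaviour is used instead.
 */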
2204 static __rte_always_inline void
2205 vhost_dequeue_offload(struct virtio_net_hdr *hdr, struct rte_mbuf *m,
2206 	bool legacy_ol_flags)
2207 {
2208 	struct rte_net_hdr_lens hdr_lens;
2209 	int l4_supported = 0;
2210 	uint32_t ptype;
2211 
2212 	if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
2213 		return;
2214 
2215 	if (legacy_ol_flags) {
2216 		vhost_dequeue_offload_legacy(hdr, m);
2217 		return;
2218 	}
2219 
2220 	m->ol_flags |= RTE_MBUF_F_RX_IP_CKSUM_UNKNOWN;
2221 
2222 	ptype = rte_net_get_ptype(m, &hdr_lens, RTE_PTYPE_ALL_MASK);
2223 	m->packet_type = ptype;
2224 	if ((ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_TCP ||
2225 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_UDP ||
2226 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_SCTP)
2227 		l4_supported = 1;
2228 
2229 	/* According to the Virtio 1.1 spec, the device only needs to look at
2230 	 * VIRTIO_NET_HDR_F_NEEDS_CSUM in the packet transmission path.
2231 	 * This differs from the packet reception path, where the driver
2232 	 * could rely on the VIRTIO_NET_HDR_F_DATA_VALID flag set by the
2233 	 * device.
2234 	 *
2235 	 * 5.1.6.2.1 Driver Requirements: Packet Transmission
2236 	 * The driver MUST NOT set the VIRTIO_NET_HDR_F_DATA_VALID and
2237 	 * VIRTIO_NET_HDR_F_RSC_INFO bits in flags.
2238 	 *
2239 	 * 5.1.6.2.2 Device Requirements: Packet Transmission
2240 	 * The device MUST ignore flag bits that it does not recognize.
2241 	 */
2242 	if (hdr->flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2243 		uint32_t hdrlen;
2244 
2245 		hdrlen = hdr_lens.l2_len + hdr_lens.l3_len + hdr_lens.l4_len;
2246 		if (hdr->csum_start <= hdrlen && l4_supported != 0) {
2247 			m->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_NONE;
2248 		} else {
2249 			/* Unknown proto or tunnel: compute the checksum in
2250 			 * software. We can assume the cksum field is in the
2251 			 * first segment since the buffers we provided to the
2252 			 * host are large enough. For SCTP this will be wrong
2253 			 * since it uses a CRC, but there is nothing we can do.
2254 			 */
2255 			uint16_t csum = 0, off;
2256 
2257 			if (rte_raw_cksum_mbuf(m, hdr->csum_start,
2258 					rte_pktmbuf_pkt_len(m) - hdr->csum_start, &csum) < 0)
2259 				return;
2260 			if (likely(csum != 0xffff))
2261 				csum = ~csum;
2262 			off = hdr->csum_offset + hdr->csum_start;
2263 			if (rte_pktmbuf_data_len(m) >= off + 1)
2264 				*rte_pktmbuf_mtod_offset(m, uint16_t *, off) = csum;
2265 		}
2266 	}
2267 
2268 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2269 		if (hdr->gso_size == 0)
2270 			return;
2271 
2272 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2273 		case VIRTIO_NET_HDR_GSO_TCPV4:
2274 		case VIRTIO_NET_HDR_GSO_TCPV6:
2275 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_TCP)
2276 				break;
2277 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2278 			m->tso_segsz = hdr->gso_size;
2279 			break;
2280 		case VIRTIO_NET_HDR_GSO_UDP:
2281 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_UDP)
2282 				break;
2283 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2284 			m->tso_segsz = hdr->gso_size;
2285 			break;
2286 		default:
2287 			break;
2288 		}
2289 	}
2290 }
2291 
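/*
 * Gather a virtio-net header that is scattered across several descriptors
 * into the contiguous @hdr buffer.
 */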
2292 static __rte_noinline void
2293 copy_vnet_hdr_from_desc(struct virtio_net_hdr *hdr,
2294 		struct buf_vector *buf_vec)
2295 {
2296 	uint64_t len;
2297 	uint64_t remain = sizeof(struct virtio_net_hdr);
2298 	uint64_t src;
2299 	uint64_t dst = (uint64_t)(uintptr_t)hdr;
2300 
2301 	while (remain) {
2302 		len = RTE_MIN(remain, buf_vec->buf_len);
2303 		src = buf_vec->buf_addr;
2304 		rte_memcpy((void *)(uintptr_t)dst,
2305 				(void *)(uintptr_t)src, len);
2306 
2307 		remain -= len;
2308 		dst += len;
2309 		buf_vec++;
2310 	}
2311 }
2312 
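/*
 * Copy a descriptor chain into an mbuf, chaining extra mbufs from
 * @mbuf_pool as needed, skipping the virtio-net header and applying the
 * offload information it carries when host offloads are negotiated.
 */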
2313 static __rte_always_inline int
2314 copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
2315 		  struct buf_vector *buf_vec, uint16_t nr_vec,
2316 		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
2317 		  bool legacy_ol_flags)
2318 {
2319 	uint32_t buf_avail, buf_offset;
2320 	uint64_t buf_addr, buf_len;
2321 	uint32_t mbuf_avail, mbuf_offset;
2322 	uint32_t cpy_len;
2323 	struct rte_mbuf *cur = m, *prev = m;
2324 	struct virtio_net_hdr tmp_hdr;
2325 	struct virtio_net_hdr *hdr = NULL;
2326 	/* A counter to avoid an endless loop in the desc chain */
2327 	uint16_t vec_idx = 0;
2328 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
2329 	int error = 0;
2330 
2331 	buf_addr = buf_vec[vec_idx].buf_addr;
2332 	buf_len = buf_vec[vec_idx].buf_len;
2333 
2334 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
2335 		error = -1;
2336 		goto out;
2337 	}
2338 
2339 	if (virtio_net_with_host_offload(dev)) {
2340 		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
2341 			/*
2342 			 * No luck, the virtio-net header doesn't fit
2343 			 * in a contiguous virtual area.
2344 			 */
2345 			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
2346 			hdr = &tmp_hdr;
2347 		} else {
2348 			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
2349 		}
2350 	}
2351 
2352 	/*
2353 	 * A virtio driver normally uses at least 2 desc buffers
2354 	 * for Tx: the first for storing the header, and the others
2355 	 * for storing the data.
2356 	 */
2357 	if (unlikely(buf_len < dev->vhost_hlen)) {
2358 		buf_offset = dev->vhost_hlen - buf_len;
2359 		vec_idx++;
2360 		buf_addr = buf_vec[vec_idx].buf_addr;
2361 		buf_len = buf_vec[vec_idx].buf_len;
2362 		buf_avail  = buf_len - buf_offset;
2363 	} else if (buf_len == dev->vhost_hlen) {
2364 		if (unlikely(++vec_idx >= nr_vec))
2365 			goto out;
2366 		buf_addr = buf_vec[vec_idx].buf_addr;
2367 		buf_len = buf_vec[vec_idx].buf_len;
2368 
2369 		buf_offset = 0;
2370 		buf_avail = buf_len;
2371 	} else {
2372 		buf_offset = dev->vhost_hlen;
2373 		buf_avail = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
2374 	}
2375 
2376 	PRINT_PACKET(dev,
2377 			(uintptr_t)(buf_addr + buf_offset),
2378 			(uint32_t)buf_avail, 0);
2379 
2380 	mbuf_offset = 0;
2381 	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
2382 	while (1) {
2383 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
2384 
2385 		if (likely(cpy_len > MAX_BATCH_LEN ||
2386 					vq->batch_copy_nb_elems >= vq->size ||
2387 					(hdr && cur == m))) {
2388 			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *,
2389 						mbuf_offset),
2390 					(void *)((uintptr_t)(buf_addr +
2391 							buf_offset)), cpy_len);
2392 		} else {
2393 			batch_copy[vq->batch_copy_nb_elems].dst =
2394 				rte_pktmbuf_mtod_offset(cur, void *,
2395 						mbuf_offset);
2396 			batch_copy[vq->batch_copy_nb_elems].src =
2397 				(void *)((uintptr_t)(buf_addr + buf_offset));
2398 			batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
2399 			vq->batch_copy_nb_elems++;
2400 		}
2401 
2402 		mbuf_avail  -= cpy_len;
2403 		mbuf_offset += cpy_len;
2404 		buf_avail -= cpy_len;
2405 		buf_offset += cpy_len;
2406 
2407 		/* This buf has reached its end, get the next one */
2408 		if (buf_avail == 0) {
2409 			if (++vec_idx >= nr_vec)
2410 				break;
2411 
2412 			buf_addr = buf_vec[vec_idx].buf_addr;
2413 			buf_len = buf_vec[vec_idx].buf_len;
2414 
2415 			buf_offset = 0;
2416 			buf_avail  = buf_len;
2417 
2418 			PRINT_PACKET(dev, (uintptr_t)buf_addr,
2419 					(uint32_t)buf_avail, 0);
2420 		}
2421 
2422 		/*
2423 		 * This mbuf has reached its end, allocate a new one
2424 		 * to hold more data.
2425 		 */
2426 		if (mbuf_avail == 0) {
2427 			cur = rte_pktmbuf_alloc(mbuf_pool);
2428 			if (unlikely(cur == NULL)) {
2429 				VHOST_LOG_DATA(ERR,
2430 					"Failed to allocate memory for mbuf.\n");
2431 				error = -1;
2432 				goto out;
2433 			}
2434 
2435 			prev->next = cur;
2436 			prev->data_len = mbuf_offset;
2437 			m->nb_segs += 1;
2438 			m->pkt_len += mbuf_offset;
2439 			prev = cur;
2440 
2441 			mbuf_offset = 0;
2442 			mbuf_avail  = cur->buf_len - RTE_PKTMBUF_HEADROOM;
2443 		}
2444 	}
2445 
2446 	prev->data_len = mbuf_offset;
2447 	m->pkt_len    += mbuf_offset;
2448 
2449 	if (hdr)
2450 		vhost_dequeue_offload(hdr, m, legacy_ol_flags);
2451 
2452 out:
2453 
2454 	return error;
2455 }
2456 
2457 static void
2458 virtio_dev_extbuf_free(void *addr __rte_unused, void *opaque)
2459 {
2460 	rte_free(opaque);
2461 }
2462 
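/*
 * Allocate an external buffer large enough for @size bytes of packet data
 * plus headroom and shared info, and attach it to @pkt.
 */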
2463 static int
2464 virtio_dev_extbuf_alloc(struct rte_mbuf *pkt, uint32_t size)
2465 {
2466 	struct rte_mbuf_ext_shared_info *shinfo = NULL;
2467 	uint32_t total_len = RTE_PKTMBUF_HEADROOM + size;
2468 	uint16_t buf_len;
2469 	rte_iova_t iova;
2470 	void *buf;
2471 
2472 	total_len += sizeof(*shinfo) + sizeof(uintptr_t);
2473 	total_len = RTE_ALIGN_CEIL(total_len, sizeof(uintptr_t));
2474 
2475 	if (unlikely(total_len > UINT16_MAX))
2476 		return -ENOSPC;
2477 
2478 	buf_len = total_len;
2479 	buf = rte_malloc(NULL, buf_len, RTE_CACHE_LINE_SIZE);
2480 	if (unlikely(buf == NULL))
2481 		return -ENOMEM;
2482 
2483 	/* Initialize shinfo */
2484 	shinfo = rte_pktmbuf_ext_shinfo_init_helper(buf, &buf_len,
2485 						virtio_dev_extbuf_free, buf);
2486 	if (unlikely(shinfo == NULL)) {
2487 		rte_free(buf);
2488 		VHOST_LOG_DATA(ERR, "Failed to init shinfo\n");
2489 		return -1;
2490 	}
2491 
2492 	iova = rte_malloc_virt2iova(buf);
2493 	rte_pktmbuf_attach_extbuf(pkt, buf, iova, buf_len, shinfo);
2494 	rte_pktmbuf_reset_headroom(pkt);
2495 
2496 	return 0;
2497 }
2498 
2499 /*
2500  * Prepare a pktmbuf able to hold a packet of the given data length.
2501  */
2502 static __rte_always_inline int
2503 virtio_dev_pktmbuf_prep(struct virtio_net *dev, struct rte_mbuf *pkt,
2504 			 uint32_t data_len)
2505 {
2506 	if (rte_pktmbuf_tailroom(pkt) >= data_len)
2507 		return 0;
2508 
2509 	/* attach an external buffer if supported */
2510 	if (dev->extbuf && !virtio_dev_extbuf_alloc(pkt, data_len))
2511 		return 0;
2512 
2513 	/* check if chained buffers are allowed */
2514 	if (!dev->linearbuf)
2515 		return 0;
2516 
2517 	return -1;
2518 }
2519 
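/*
 * Dequeue up to @count packets from a split virtqueue: for each available
 * descriptor chain, prepare an mbuf and copy the data into it, then flush
 * the shadow used ring and notify the guest.
 */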
2520 __rte_always_inline
2521 static uint16_t
2522 virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
2523 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
2524 	bool legacy_ol_flags)
2525 {
2526 	uint16_t i;
2527 	uint16_t free_entries;
2528 	uint16_t dropped = 0;
2529 	static bool allocerr_warned;
2530 
2531 	/*
2532 	 * The ordering between avail index and
2533 	 * desc reads needs to be enforced.
2534 	 */
2535 	free_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) -
2536 			vq->last_avail_idx;
2537 	if (free_entries == 0)
2538 		return 0;
2539 
2540 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
2541 
2542 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
2543 
2544 	count = RTE_MIN(count, MAX_PKT_BURST);
2545 	count = RTE_MIN(count, free_entries);
2546 	VHOST_LOG_DATA(DEBUG, "(%d) about to dequeue %u buffers\n",
2547 			dev->vid, count);
2548 
2549 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count))
2550 		return 0;
2551 
2552 	for (i = 0; i < count; i++) {
2553 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
2554 		uint16_t head_idx;
2555 		uint32_t buf_len;
2556 		uint16_t nr_vec = 0;
2557 		int err;
2558 
2559 		if (unlikely(fill_vec_buf_split(dev, vq,
2560 						vq->last_avail_idx + i,
2561 						&nr_vec, buf_vec,
2562 						&head_idx, &buf_len,
2563 						VHOST_ACCESS_RO) < 0))
2564 			break;
2565 
2566 		update_shadow_used_ring_split(vq, head_idx, 0);
2567 
2568 		err = virtio_dev_pktmbuf_prep(dev, pkts[i], buf_len);
2569 		if (unlikely(err)) {
2570 			/*
2571 			 * mbuf allocation fails for jumbo packets when external
2572 			 * buffer allocation is not allowed and a linear buffer
2573 			 * is required. Drop this packet.
2574 			 */
2575 			if (!allocerr_warned) {
2576 				VHOST_LOG_DATA(ERR,
2577 					"Failed mbuf alloc of size %d from %s on %s.\n",
2578 					buf_len, mbuf_pool->name, dev->ifname);
2579 				allocerr_warned = true;
2580 			}
2581 			dropped += 1;
2582 			i++;
2583 			break;
2584 		}
2585 
2586 		err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
2587 				mbuf_pool, legacy_ol_flags);
2588 		if (unlikely(err)) {
2589 			if (!allocerr_warned) {
2590 				VHOST_LOG_DATA(ERR,
2591 					"Failed to copy desc to mbuf on %s.\n",
2592 					dev->ifname);
2593 				allocerr_warned = true;
2594 			}
2595 			dropped += 1;
2596 			i++;
2597 			break;
2598 		}
2599 	}
2600 
2601 	if (dropped)
2602 		rte_pktmbuf_free_bulk(&pkts[i - 1], count - i + 1);
2603 
2604 	vq->last_avail_idx += i;
2605 
2606 	do_data_copy_dequeue(vq);
2607 	if (unlikely(i < count))
2608 		vq->shadow_used_idx = i;
2609 	if (likely(vq->shadow_used_idx)) {
2610 		flush_shadow_used_ring_split(dev, vq);
2611 		vhost_vring_call_split(dev, vq);
2612 	}
2613 
2614 	return (i - dropped);
2615 }
2616 
2617 __rte_noinline
2618 static uint16_t
2619 virtio_dev_tx_split_legacy(struct virtio_net *dev,
2620 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
2621 	struct rte_mbuf **pkts, uint16_t count)
2622 {
2623 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, true);
2624 }
2625 
2626 __rte_noinline
2627 static uint16_t
2628 virtio_dev_tx_split_compliant(struct virtio_net *dev,
2629 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
2630 	struct rte_mbuf **pkts, uint16_t count)
2631 {
2632 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, false);
2633 }
2634 
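/*
 * Check whether the next PACKED_BATCH_SIZE descriptors can be dequeued as a
 * batch (all available, single-descriptor and mappable, with data fitting
 * in the provided mbufs); on success fill @desc_addrs and @ids.
 */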
2635 static __rte_always_inline int
2636 vhost_reserve_avail_batch_packed(struct virtio_net *dev,
2637 				 struct vhost_virtqueue *vq,
2638 				 struct rte_mbuf **pkts,
2639 				 uint16_t avail_idx,
2640 				 uintptr_t *desc_addrs,
2641 				 uint16_t *ids)
2642 {
2643 	bool wrap = vq->avail_wrap_counter;
2644 	struct vring_packed_desc *descs = vq->desc_packed;
2645 	uint64_t lens[PACKED_BATCH_SIZE];
2646 	uint64_t buf_lens[PACKED_BATCH_SIZE];
2647 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
2648 	uint16_t flags, i;
2649 
2650 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
2651 		return -1;
2652 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
2653 		return -1;
2654 
2655 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2656 		flags = descs[avail_idx + i].flags;
2657 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
2658 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
2659 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
2660 			return -1;
2661 	}
2662 
2663 	rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
2664 
2665 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2666 		lens[i] = descs[avail_idx + i].len;
2667 
2668 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2669 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
2670 						  descs[avail_idx + i].addr,
2671 						  &lens[i], VHOST_ACCESS_RW);
2672 	}
2673 
2674 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2675 		if (unlikely(!desc_addrs[i]))
2676 			return -1;
2677 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
2678 			return -1;
2679 	}
2680 
2681 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2682 		if (virtio_dev_pktmbuf_prep(dev, pkts[i], lens[i]))
2683 			goto err;
2684 	}
2685 
2686 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2687 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
2688 
2689 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2690 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
2691 			goto err;
2692 	}
2693 
2694 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2695 		pkts[i]->pkt_len = lens[i] - buf_offset;
2696 		pkts[i]->data_len = pkts[i]->pkt_len;
2697 		ids[i] = descs[avail_idx + i].id;
2698 	}
2699 
2700 	return 0;
2701 
2702 err:
2703 	return -1;
2704 }
2705 
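/*
 * Dequeue a full batch of PACKED_BATCH_SIZE packets from a packed virtqueue
 * using unrolled copies, then record the used descriptors in the shadow
 * ring.
 */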
2706 static __rte_always_inline int
2707 virtio_dev_tx_batch_packed(struct virtio_net *dev,
2708 			   struct vhost_virtqueue *vq,
2709 			   struct rte_mbuf **pkts,
2710 			   bool legacy_ol_flags)
2711 {
2712 	uint16_t avail_idx = vq->last_avail_idx;
2713 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
2714 	struct virtio_net_hdr *hdr;
2715 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
2716 	uint16_t ids[PACKED_BATCH_SIZE];
2717 	uint16_t i;
2718 
2719 	if (vhost_reserve_avail_batch_packed(dev, vq, pkts, avail_idx,
2720 					     desc_addrs, ids))
2721 		return -1;
2722 
2723 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2724 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
2725 
2726 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2727 		rte_memcpy(rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
2728 			   (void *)(uintptr_t)(desc_addrs[i] + buf_offset),
2729 			   pkts[i]->pkt_len);
2730 
2731 	if (virtio_net_with_host_offload(dev)) {
2732 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2733 			hdr = (struct virtio_net_hdr *)(desc_addrs[i]);
2734 			vhost_dequeue_offload(hdr, pkts[i], legacy_ol_flags);
2735 		}
2736 	}
2737 
2738 	if (virtio_net_is_inorder(dev))
2739 		vhost_shadow_dequeue_batch_packed_inorder(vq,
2740 			ids[PACKED_BATCH_SIZE - 1]);
2741 	else
2742 		vhost_shadow_dequeue_batch_packed(dev, vq, ids);
2743 
2744 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
2745 
2746 	return 0;
2747 }
2748 
2749 static __rte_always_inline int
2750 vhost_dequeue_single_packed(struct virtio_net *dev,
2751 			    struct vhost_virtqueue *vq,
2752 			    struct rte_mempool *mbuf_pool,
2753 			    struct rte_mbuf *pkts,
2754 			    uint16_t *buf_id,
2755 			    uint16_t *desc_count,
2756 			    bool legacy_ol_flags)
2757 {
2758 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
2759 	uint32_t buf_len;
2760 	uint16_t nr_vec = 0;
2761 	int err;
2762 	static bool allocerr_warned;
2763 
2764 	if (unlikely(fill_vec_buf_packed(dev, vq,
2765 					 vq->last_avail_idx, desc_count,
2766 					 buf_vec, &nr_vec,
2767 					 buf_id, &buf_len,
2768 					 VHOST_ACCESS_RO) < 0))
2769 		return -1;
2770 
2771 	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
2772 		if (!allocerr_warned) {
2773 			VHOST_LOG_DATA(ERR,
2774 				"Failed mbuf alloc of size %d from %s on %s.\n",
2775 				buf_len, mbuf_pool->name, dev->ifname);
2776 			allocerr_warned = true;
2777 		}
2778 		return -1;
2779 	}
2780 
2781 	err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts,
2782 				mbuf_pool, legacy_ol_flags);
2783 	if (unlikely(err)) {
2784 		if (!allocerr_warned) {
2785 			VHOST_LOG_DATA(ERR,
2786 				"Failed to copy desc to mbuf on %s.\n",
2787 				dev->ifname);
2788 			allocerr_warned = true;
2789 		}
2790 		return -1;
2791 	}
2792 
2793 	return 0;
2794 }
2795 
2796 static __rte_always_inline int
2797 virtio_dev_tx_single_packed(struct virtio_net *dev,
2798 			    struct vhost_virtqueue *vq,
2799 			    struct rte_mempool *mbuf_pool,
2800 			    struct rte_mbuf *pkts,
2801 			    bool legacy_ol_flags)
2802 {
2803 
2804 	uint16_t buf_id, desc_count = 0;
2805 	int ret;
2806 
2807 	ret = vhost_dequeue_single_packed(dev, vq, mbuf_pool, pkts, &buf_id,
2808 					&desc_count, legacy_ol_flags);
2809 
2810 	if (likely(desc_count > 0)) {
2811 		if (virtio_net_is_inorder(dev))
2812 			vhost_shadow_dequeue_single_packed_inorder(vq, buf_id,
2813 								   desc_count);
2814 		else
2815 			vhost_shadow_dequeue_single_packed(vq, buf_id,
2816 					desc_count);
2817 
2818 		vq_inc_last_avail_packed(vq, desc_count);
2819 	}
2820 
2821 	return ret;
2822 }
2823 
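/*
 * Packed ring dequeue: try the batched path first and fall back to
 * single-packet dequeue, then flush the shadow used ring and notify the
 * guest if anything was dequeued.
 */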
2824 __rte_always_inline
2825 static uint16_t
2826 virtio_dev_tx_packed(struct virtio_net *dev,
2827 		     struct vhost_virtqueue *__rte_restrict vq,
2828 		     struct rte_mempool *mbuf_pool,
2829 		     struct rte_mbuf **__rte_restrict pkts,
2830 		     uint32_t count,
2831 		     bool legacy_ol_flags)
2832 {
2833 	uint32_t pkt_idx = 0;
2834 
2835 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count))
2836 		return 0;
2837 
2838 	do {
2839 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
2840 
2841 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
2842 			if (!virtio_dev_tx_batch_packed(dev, vq,
2843 							&pkts[pkt_idx],
2844 							legacy_ol_flags)) {
2845 				pkt_idx += PACKED_BATCH_SIZE;
2846 				continue;
2847 			}
2848 		}
2849 
2850 		if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
2851 						pkts[pkt_idx],
2852 						legacy_ol_flags))
2853 			break;
2854 		pkt_idx++;
2855 	} while (pkt_idx < count);
2856 
2857 	if (pkt_idx != count)
2858 		rte_pktmbuf_free_bulk(&pkts[pkt_idx], count - pkt_idx);
2859 
2860 	if (vq->shadow_used_idx) {
2861 		do_data_copy_dequeue(vq);
2862 
2863 		vhost_flush_dequeue_shadow_packed(dev, vq);
2864 		vhost_vring_call_packed(dev, vq);
2865 	}
2866 
2867 	return pkt_idx;
2868 }
2869 
2870 __rte_noinline
2871 static uint16_t
2872 virtio_dev_tx_packed_legacy(struct virtio_net *dev,
2873 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
2874 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
2875 {
2876 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, true);
2877 }
2878 
2879 __rte_noinline
2880 static uint16_t
2881 virtio_dev_tx_packed_compliant(struct virtio_net *dev,
2882 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
2883 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
2884 {
2885 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, false);
2886 }
2887 
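/*
 * Dequeue packets sent by the guest on the given virtqueue into @pkts,
 * allocating mbufs from @mbuf_pool. A RARP packet may be injected at the
 * head of the burst right after a live migration. A minimal polling
 * sketch, with caller-provided vid, queue_id, mbuf_pool, pkts[] and
 * burst_size (handle_pkt() stands for an application-defined handler):
 *
 *	uint16_t nb_rx = rte_vhost_dequeue_burst(vid, queue_id, mbuf_pool,
 *			pkts, burst_size);
 *	for (uint16_t i = 0; i < nb_rx; i++)
 *		handle_pkt(pkts[i]);
 */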
2888 uint16_t
2889 rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
2890 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
2891 {
2892 	struct virtio_net *dev;
2893 	struct rte_mbuf *rarp_mbuf = NULL;
2894 	struct vhost_virtqueue *vq;
2895 	int16_t success = 1;
2896 
2897 	dev = get_device(vid);
2898 	if (!dev)
2899 		return 0;
2900 
2901 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
2902 		VHOST_LOG_DATA(ERR,
2903 			"(%d) %s: built-in vhost net backend is disabled.\n",
2904 			dev->vid, __func__);
2905 		return 0;
2906 	}
2907 
2908 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
2909 		VHOST_LOG_DATA(ERR,
2910 			"(%d) %s: invalid virtqueue idx %d.\n",
2911 			dev->vid, __func__, queue_id);
2912 		return 0;
2913 	}
2914 
2915 	vq = dev->virtqueue[queue_id];
2916 
2917 	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
2918 		return 0;
2919 
2920 	if (unlikely(!vq->enabled)) {
2921 		count = 0;
2922 		goto out_access_unlock;
2923 	}
2924 
2925 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
2926 		vhost_user_iotlb_rd_lock(vq);
2927 
2928 	if (unlikely(!vq->access_ok))
2929 		if (unlikely(vring_translate(dev, vq) < 0)) {
2930 			count = 0;
2931 			goto out;
2932 		}
2933 
2934 	/*
2935 	 * Construct a RARP broadcast packet and inject it into the "pkts"
2936 	 * array, to make it look like the guest actually sent such a packet.
2937 	 *
2938 	 * Check user_send_rarp() for more information.
2939 	 *
2940 	 * broadcast_rarp shares a cacheline in the virtio_net structure
2941 	 * with some fields that are accessed during enqueue and
2942 	 * __atomic_compare_exchange_n causes a write when it performs the
2943 	 * compare and exchange. This could result in false sharing between enqueue
2944 	 * and dequeue.
2945 	 *
2946 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
2947 	 * and only performing compare and exchange if the read indicates it
2948 	 * is likely to be set.
2949 	 */
2950 	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
2951 			__atomic_compare_exchange_n(&dev->broadcast_rarp,
2952 			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
2953 
2954 		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
2955 		if (rarp_mbuf == NULL) {
2956 			VHOST_LOG_DATA(ERR, "Failed to make RARP packet.\n");
2957 			count = 0;
2958 			goto out;
2959 		}
2960 		/*
2961 		 * Inject it at the head of the "pkts" array, so that the
2962 		 * switch's MAC learning table gets updated first.
2963 		 */
2964 		pkts[0] = rarp_mbuf;
2965 		pkts++;
2966 		count -= 1;
2967 	}
2968 
2969 	if (vq_is_packed(dev)) {
2970 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
2971 			count = virtio_dev_tx_packed_legacy(dev, vq, mbuf_pool, pkts, count);
2972 		else
2973 			count = virtio_dev_tx_packed_compliant(dev, vq, mbuf_pool, pkts, count);
2974 	} else {
2975 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
2976 			count = virtio_dev_tx_split_legacy(dev, vq, mbuf_pool, pkts, count);
2977 		else
2978 			count = virtio_dev_tx_split_compliant(dev, vq, mbuf_pool, pkts, count);
2979 	}
2980 
2981 out:
2982 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
2983 		vhost_user_iotlb_rd_unlock(vq);
2984 
2985 out_access_unlock:
2986 	rte_spinlock_unlock(&vq->access_lock);
2987 
2988 	if (unlikely(rarp_mbuf != NULL))
2989 		count += 1;
2990 
2991 	return count;
2992 }
2993