xref: /dpdk/lib/vhost/virtio_net.c (revision 02363e1f2c35c9ddf8fa47c955a655a53d280986)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2016 Intel Corporation
3  */
4 
5 #include <stdint.h>
6 #include <stdbool.h>
7 #include <linux/virtio_net.h>
8 
9 #include <rte_mbuf.h>
10 #include <rte_memcpy.h>
11 #include <rte_net.h>
12 #include <rte_ether.h>
13 #include <rte_ip.h>
14 #include <rte_vhost.h>
15 #include <rte_tcp.h>
16 #include <rte_udp.h>
17 #include <rte_sctp.h>
18 #include <rte_arp.h>
19 #include <rte_spinlock.h>
20 #include <rte_malloc.h>
21 #include <rte_vhost_async.h>
22 
23 #include "iotlb.h"
24 #include "vhost.h"
25 
26 #define MAX_BATCH_LEN 256
27 
28 static  __rte_always_inline bool
29 rxvq_is_mergeable(struct virtio_net *dev)
30 {
31 	return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
32 }
33 
34 static  __rte_always_inline bool
35 virtio_net_is_inorder(struct virtio_net *dev)
36 {
37 	return dev->features & (1ULL << VIRTIO_F_IN_ORDER);
38 }
39 
/*
 * Validate a virtqueue index.
 *
 * The index is accepted when it is below nr_vring and its lowest bit
 * equals is_tx (so is_tx == 0 accepts even indexes, is_tx == 1 odd ones).
 */
static bool
is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t nr_vring)
{
	if (idx >= nr_vring)
		return false;

	return (uint32_t)is_tx == (idx & 1);
}
45 
46 static inline void
47 do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
48 {
49 	struct batch_copy_elem *elem = vq->batch_copy_elems;
50 	uint16_t count = vq->batch_copy_nb_elems;
51 	int i;
52 
53 	for (i = 0; i < count; i++) {
54 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
55 		vhost_log_cache_write_iova(dev, vq, elem[i].log_addr,
56 					   elem[i].len);
57 		PRINT_PACKET(dev, (uintptr_t)elem[i].dst, elem[i].len, 0);
58 	}
59 
60 	vq->batch_copy_nb_elems = 0;
61 }
62 
63 static inline void
64 do_data_copy_dequeue(struct vhost_virtqueue *vq)
65 {
66 	struct batch_copy_elem *elem = vq->batch_copy_elems;
67 	uint16_t count = vq->batch_copy_nb_elems;
68 	int i;
69 
70 	for (i = 0; i < count; i++)
71 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
72 
73 	vq->batch_copy_nb_elems = 0;
74 }
75 
76 static __rte_always_inline void
77 do_flush_shadow_used_ring_split(struct virtio_net *dev,
78 			struct vhost_virtqueue *vq,
79 			uint16_t to, uint16_t from, uint16_t size)
80 {
81 	rte_memcpy(&vq->used->ring[to],
82 			&vq->shadow_used_split[from],
83 			size * sizeof(struct vring_used_elem));
84 	vhost_log_cache_used_vring(dev, vq,
85 			offsetof(struct vring_used, ring[to]),
86 			size * sizeof(struct vring_used_elem));
87 }
88 
89 static __rte_always_inline void
90 flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
91 {
92 	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
93 
94 	if (used_idx + vq->shadow_used_idx <= vq->size) {
95 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
96 					  vq->shadow_used_idx);
97 	} else {
98 		uint16_t size;
99 
100 		/* update used ring interval [used_idx, vq->size] */
101 		size = vq->size - used_idx;
102 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
103 
104 		/* update the left half used ring interval [0, left_size] */
105 		do_flush_shadow_used_ring_split(dev, vq, 0, size,
106 					  vq->shadow_used_idx - size);
107 	}
108 	vq->last_used_idx += vq->shadow_used_idx;
109 
110 	vhost_log_cache_sync(dev, vq);
111 
112 	__atomic_add_fetch(&vq->used->idx, vq->shadow_used_idx,
113 			   __ATOMIC_RELEASE);
114 	vq->shadow_used_idx = 0;
115 	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
116 		sizeof(vq->used->idx));
117 }
118 
119 static __rte_always_inline void
120 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
121 			 uint16_t desc_idx, uint32_t len)
122 {
123 	uint16_t i = vq->shadow_used_idx++;
124 
125 	vq->shadow_used_split[i].id  = desc_idx;
126 	vq->shadow_used_split[i].len = len;
127 }
128 
129 static __rte_always_inline void
130 vhost_flush_enqueue_shadow_packed(struct virtio_net *dev,
131 				  struct vhost_virtqueue *vq)
132 {
133 	int i;
134 	uint16_t used_idx = vq->last_used_idx;
135 	uint16_t head_idx = vq->last_used_idx;
136 	uint16_t head_flags = 0;
137 
138 	/* Split loop in two to save memory barriers */
139 	for (i = 0; i < vq->shadow_used_idx; i++) {
140 		vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
141 		vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
142 
143 		used_idx += vq->shadow_used_packed[i].count;
144 		if (used_idx >= vq->size)
145 			used_idx -= vq->size;
146 	}
147 
148 	/* The ordering for storing desc flags needs to be enforced. */
149 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
150 
151 	for (i = 0; i < vq->shadow_used_idx; i++) {
152 		uint16_t flags;
153 
154 		if (vq->shadow_used_packed[i].len)
155 			flags = VRING_DESC_F_WRITE;
156 		else
157 			flags = 0;
158 
159 		if (vq->used_wrap_counter) {
160 			flags |= VRING_DESC_F_USED;
161 			flags |= VRING_DESC_F_AVAIL;
162 		} else {
163 			flags &= ~VRING_DESC_F_USED;
164 			flags &= ~VRING_DESC_F_AVAIL;
165 		}
166 
167 		if (i > 0) {
168 			vq->desc_packed[vq->last_used_idx].flags = flags;
169 
170 			vhost_log_cache_used_vring(dev, vq,
171 					vq->last_used_idx *
172 					sizeof(struct vring_packed_desc),
173 					sizeof(struct vring_packed_desc));
174 		} else {
175 			head_idx = vq->last_used_idx;
176 			head_flags = flags;
177 		}
178 
179 		vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
180 	}
181 
182 	vq->desc_packed[head_idx].flags = head_flags;
183 
184 	vhost_log_cache_used_vring(dev, vq,
185 				head_idx *
186 				sizeof(struct vring_packed_desc),
187 				sizeof(struct vring_packed_desc));
188 
189 	vq->shadow_used_idx = 0;
190 	vhost_log_cache_sync(dev, vq);
191 }
192 
193 static __rte_always_inline void
194 vhost_flush_dequeue_shadow_packed(struct virtio_net *dev,
195 				  struct vhost_virtqueue *vq)
196 {
197 	struct vring_used_elem_packed *used_elem = &vq->shadow_used_packed[0];
198 
199 	vq->desc_packed[vq->shadow_last_used_idx].id = used_elem->id;
200 	/* desc flags is the synchronization point for virtio packed vring */
201 	__atomic_store_n(&vq->desc_packed[vq->shadow_last_used_idx].flags,
202 			 used_elem->flags, __ATOMIC_RELEASE);
203 
204 	vhost_log_cache_used_vring(dev, vq, vq->shadow_last_used_idx *
205 				   sizeof(struct vring_packed_desc),
206 				   sizeof(struct vring_packed_desc));
207 	vq->shadow_used_idx = 0;
208 	vhost_log_cache_sync(dev, vq);
209 }
210 
211 static __rte_always_inline void
212 vhost_flush_enqueue_batch_packed(struct virtio_net *dev,
213 				 struct vhost_virtqueue *vq,
214 				 uint64_t *lens,
215 				 uint16_t *ids)
216 {
217 	uint16_t i;
218 	uint16_t flags;
219 	uint16_t last_used_idx;
220 	struct vring_packed_desc *desc_base;
221 
222 	last_used_idx = vq->last_used_idx;
223 	desc_base = &vq->desc_packed[last_used_idx];
224 
225 	flags = PACKED_DESC_ENQUEUE_USED_FLAG(vq->used_wrap_counter);
226 
227 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
228 		desc_base[i].id = ids[i];
229 		desc_base[i].len = lens[i];
230 	}
231 
232 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
233 
234 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
235 		desc_base[i].flags = flags;
236 	}
237 
238 	vhost_log_cache_used_vring(dev, vq, last_used_idx *
239 				   sizeof(struct vring_packed_desc),
240 				   sizeof(struct vring_packed_desc) *
241 				   PACKED_BATCH_SIZE);
242 	vhost_log_cache_sync(dev, vq);
243 
244 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
245 }
246 
247 static __rte_always_inline void
248 vhost_shadow_dequeue_batch_packed_inorder(struct vhost_virtqueue *vq,
249 					  uint16_t id)
250 {
251 	vq->shadow_used_packed[0].id = id;
252 
253 	if (!vq->shadow_used_idx) {
254 		vq->shadow_last_used_idx = vq->last_used_idx;
255 		vq->shadow_used_packed[0].flags =
256 			PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
257 		vq->shadow_used_packed[0].len = 0;
258 		vq->shadow_used_packed[0].count = 1;
259 		vq->shadow_used_idx++;
260 	}
261 
262 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
263 }
264 
265 static __rte_always_inline void
266 vhost_shadow_dequeue_batch_packed(struct virtio_net *dev,
267 				  struct vhost_virtqueue *vq,
268 				  uint16_t *ids)
269 {
270 	uint16_t flags;
271 	uint16_t i;
272 	uint16_t begin;
273 
274 	flags = PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
275 
276 	if (!vq->shadow_used_idx) {
277 		vq->shadow_last_used_idx = vq->last_used_idx;
278 		vq->shadow_used_packed[0].id  = ids[0];
279 		vq->shadow_used_packed[0].len = 0;
280 		vq->shadow_used_packed[0].count = 1;
281 		vq->shadow_used_packed[0].flags = flags;
282 		vq->shadow_used_idx++;
283 		begin = 1;
284 	} else
285 		begin = 0;
286 
287 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE) {
288 		vq->desc_packed[vq->last_used_idx + i].id = ids[i];
289 		vq->desc_packed[vq->last_used_idx + i].len = 0;
290 	}
291 
292 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
293 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE)
294 		vq->desc_packed[vq->last_used_idx + i].flags = flags;
295 
296 	vhost_log_cache_used_vring(dev, vq, vq->last_used_idx *
297 				   sizeof(struct vring_packed_desc),
298 				   sizeof(struct vring_packed_desc) *
299 				   PACKED_BATCH_SIZE);
300 	vhost_log_cache_sync(dev, vq);
301 
302 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
303 }
304 
305 static __rte_always_inline void
306 vhost_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
307 				   uint16_t buf_id,
308 				   uint16_t count)
309 {
310 	uint16_t flags;
311 
312 	flags = vq->desc_packed[vq->last_used_idx].flags;
313 	if (vq->used_wrap_counter) {
314 		flags |= VRING_DESC_F_USED;
315 		flags |= VRING_DESC_F_AVAIL;
316 	} else {
317 		flags &= ~VRING_DESC_F_USED;
318 		flags &= ~VRING_DESC_F_AVAIL;
319 	}
320 
321 	if (!vq->shadow_used_idx) {
322 		vq->shadow_last_used_idx = vq->last_used_idx;
323 
324 		vq->shadow_used_packed[0].id  = buf_id;
325 		vq->shadow_used_packed[0].len = 0;
326 		vq->shadow_used_packed[0].flags = flags;
327 		vq->shadow_used_idx++;
328 	} else {
329 		vq->desc_packed[vq->last_used_idx].id = buf_id;
330 		vq->desc_packed[vq->last_used_idx].len = 0;
331 		vq->desc_packed[vq->last_used_idx].flags = flags;
332 	}
333 
334 	vq_inc_last_used_packed(vq, count);
335 }
336 
337 static __rte_always_inline void
338 vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
339 					   uint16_t buf_id,
340 					   uint16_t count)
341 {
342 	uint16_t flags;
343 
344 	vq->shadow_used_packed[0].id = buf_id;
345 
346 	flags = vq->desc_packed[vq->last_used_idx].flags;
347 	if (vq->used_wrap_counter) {
348 		flags |= VRING_DESC_F_USED;
349 		flags |= VRING_DESC_F_AVAIL;
350 	} else {
351 		flags &= ~VRING_DESC_F_USED;
352 		flags &= ~VRING_DESC_F_AVAIL;
353 	}
354 
355 	if (!vq->shadow_used_idx) {
356 		vq->shadow_last_used_idx = vq->last_used_idx;
357 		vq->shadow_used_packed[0].len = 0;
358 		vq->shadow_used_packed[0].flags = flags;
359 		vq->shadow_used_idx++;
360 	}
361 
362 	vq_inc_last_used_packed(vq, count);
363 }
364 
365 static __rte_always_inline void
366 vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
367 				   uint32_t *len,
368 				   uint16_t *id,
369 				   uint16_t *count,
370 				   uint16_t num_buffers)
371 {
372 	uint16_t i;
373 
374 	for (i = 0; i < num_buffers; i++) {
375 		/* enqueue shadow flush action aligned with batch num */
376 		if (!vq->shadow_used_idx)
377 			vq->shadow_aligned_idx = vq->last_used_idx &
378 				PACKED_BATCH_MASK;
379 		vq->shadow_used_packed[vq->shadow_used_idx].id  = id[i];
380 		vq->shadow_used_packed[vq->shadow_used_idx].len = len[i];
381 		vq->shadow_used_packed[vq->shadow_used_idx].count = count[i];
382 		vq->shadow_aligned_idx += count[i];
383 		vq->shadow_used_idx++;
384 	}
385 }
386 
387 static __rte_always_inline void
388 vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
389 				   struct vhost_virtqueue *vq,
390 				   uint32_t *len,
391 				   uint16_t *id,
392 				   uint16_t *count,
393 				   uint16_t num_buffers)
394 {
395 	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
396 
397 	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
398 		do_data_copy_enqueue(dev, vq);
399 		vhost_flush_enqueue_shadow_packed(dev, vq);
400 	}
401 }
402 
/*
 * Store val into var only when the two differ, so an unchanged value
 * does not cause a write (to lessen cache issues).
 */
#define ASSIGN_UNLESS_EQUAL(var, val) do {	\
	if ((var) == (val))			\
		break;				\
	(var) = (val);				\
} while (0)
408 
409 static __rte_always_inline void
410 virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
411 {
412 	uint64_t csum_l4 = m_buf->ol_flags & RTE_MBUF_F_TX_L4_MASK;
413 
414 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG)
415 		csum_l4 |= RTE_MBUF_F_TX_TCP_CKSUM;
416 
417 	if (csum_l4) {
418 		net_hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
419 		net_hdr->csum_start = m_buf->l2_len + m_buf->l3_len;
420 
421 		switch (csum_l4) {
422 		case RTE_MBUF_F_TX_TCP_CKSUM:
423 			net_hdr->csum_offset = (offsetof(struct rte_tcp_hdr,
424 						cksum));
425 			break;
426 		case RTE_MBUF_F_TX_UDP_CKSUM:
427 			net_hdr->csum_offset = (offsetof(struct rte_udp_hdr,
428 						dgram_cksum));
429 			break;
430 		case RTE_MBUF_F_TX_SCTP_CKSUM:
431 			net_hdr->csum_offset = (offsetof(struct rte_sctp_hdr,
432 						cksum));
433 			break;
434 		}
435 	} else {
436 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_start, 0);
437 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_offset, 0);
438 		ASSIGN_UNLESS_EQUAL(net_hdr->flags, 0);
439 	}
440 
441 	/* IP cksum verification cannot be bypassed, then calculate here */
442 	if (m_buf->ol_flags & RTE_MBUF_F_TX_IP_CKSUM) {
443 		struct rte_ipv4_hdr *ipv4_hdr;
444 
445 		ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
446 						   m_buf->l2_len);
447 		ipv4_hdr->hdr_checksum = 0;
448 		ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
449 	}
450 
451 	if (m_buf->ol_flags & RTE_MBUF_F_TX_TCP_SEG) {
452 		if (m_buf->ol_flags & RTE_MBUF_F_TX_IPV4)
453 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
454 		else
455 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
456 		net_hdr->gso_size = m_buf->tso_segsz;
457 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
458 					+ m_buf->l4_len;
459 	} else if (m_buf->ol_flags & RTE_MBUF_F_TX_UDP_SEG) {
460 		net_hdr->gso_type = VIRTIO_NET_HDR_GSO_UDP;
461 		net_hdr->gso_size = m_buf->tso_segsz;
462 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len +
463 			m_buf->l4_len;
464 	} else {
465 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0);
466 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_size, 0);
467 		ASSIGN_UNLESS_EQUAL(net_hdr->hdr_len, 0);
468 	}
469 }
470 
471 static __rte_always_inline int
472 map_one_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
473 		struct buf_vector *buf_vec, uint16_t *vec_idx,
474 		uint64_t desc_iova, uint64_t desc_len, uint8_t perm)
475 {
476 	uint16_t vec_id = *vec_idx;
477 
478 	while (desc_len) {
479 		uint64_t desc_addr;
480 		uint64_t desc_chunck_len = desc_len;
481 
482 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
483 			return -1;
484 
485 		desc_addr = vhost_iova_to_vva(dev, vq,
486 				desc_iova,
487 				&desc_chunck_len,
488 				perm);
489 		if (unlikely(!desc_addr))
490 			return -1;
491 
492 		rte_prefetch0((void *)(uintptr_t)desc_addr);
493 
494 		buf_vec[vec_id].buf_iova = desc_iova;
495 		buf_vec[vec_id].buf_addr = desc_addr;
496 		buf_vec[vec_id].buf_len  = desc_chunck_len;
497 
498 		desc_len -= desc_chunck_len;
499 		desc_iova += desc_chunck_len;
500 		vec_id++;
501 	}
502 	*vec_idx = vec_id;
503 
504 	return 0;
505 }
506 
507 static __rte_always_inline int
508 fill_vec_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
509 			 uint32_t avail_idx, uint16_t *vec_idx,
510 			 struct buf_vector *buf_vec, uint16_t *desc_chain_head,
511 			 uint32_t *desc_chain_len, uint8_t perm)
512 {
513 	uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
514 	uint16_t vec_id = *vec_idx;
515 	uint32_t len    = 0;
516 	uint64_t dlen;
517 	uint32_t nr_descs = vq->size;
518 	uint32_t cnt    = 0;
519 	struct vring_desc *descs = vq->desc;
520 	struct vring_desc *idesc = NULL;
521 
522 	if (unlikely(idx >= vq->size))
523 		return -1;
524 
525 	*desc_chain_head = idx;
526 
527 	if (vq->desc[idx].flags & VRING_DESC_F_INDIRECT) {
528 		dlen = vq->desc[idx].len;
529 		nr_descs = dlen / sizeof(struct vring_desc);
530 		if (unlikely(nr_descs > vq->size))
531 			return -1;
532 
533 		descs = (struct vring_desc *)(uintptr_t)
534 			vhost_iova_to_vva(dev, vq, vq->desc[idx].addr,
535 						&dlen,
536 						VHOST_ACCESS_RO);
537 		if (unlikely(!descs))
538 			return -1;
539 
540 		if (unlikely(dlen < vq->desc[idx].len)) {
541 			/*
542 			 * The indirect desc table is not contiguous
543 			 * in process VA space, we have to copy it.
544 			 */
545 			idesc = vhost_alloc_copy_ind_table(dev, vq,
546 					vq->desc[idx].addr, vq->desc[idx].len);
547 			if (unlikely(!idesc))
548 				return -1;
549 
550 			descs = idesc;
551 		}
552 
553 		idx = 0;
554 	}
555 
556 	while (1) {
557 		if (unlikely(idx >= nr_descs || cnt++ >= nr_descs)) {
558 			free_ind_table(idesc);
559 			return -1;
560 		}
561 
562 		dlen = descs[idx].len;
563 		len += dlen;
564 
565 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
566 						descs[idx].addr, dlen,
567 						perm))) {
568 			free_ind_table(idesc);
569 			return -1;
570 		}
571 
572 		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
573 			break;
574 
575 		idx = descs[idx].next;
576 	}
577 
578 	*desc_chain_len = len;
579 	*vec_idx = vec_id;
580 
581 	if (unlikely(!!idesc))
582 		free_ind_table(idesc);
583 
584 	return 0;
585 }
586 
587 /*
588  * Returns -1 on fail, 0 on success
589  */
590 static inline int
591 reserve_avail_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
592 				uint32_t size, struct buf_vector *buf_vec,
593 				uint16_t *num_buffers, uint16_t avail_head,
594 				uint16_t *nr_vec)
595 {
596 	uint16_t cur_idx;
597 	uint16_t vec_idx = 0;
598 	uint16_t max_tries, tries = 0;
599 
600 	uint16_t head_idx = 0;
601 	uint32_t len = 0;
602 
603 	*num_buffers = 0;
604 	cur_idx  = vq->last_avail_idx;
605 
606 	if (rxvq_is_mergeable(dev))
607 		max_tries = vq->size - 1;
608 	else
609 		max_tries = 1;
610 
611 	while (size > 0) {
612 		if (unlikely(cur_idx == avail_head))
613 			return -1;
614 		/*
615 		 * if we tried all available ring items, and still
616 		 * can't get enough buf, it means something abnormal
617 		 * happened.
618 		 */
619 		if (unlikely(++tries > max_tries))
620 			return -1;
621 
622 		if (unlikely(fill_vec_buf_split(dev, vq, cur_idx,
623 						&vec_idx, buf_vec,
624 						&head_idx, &len,
625 						VHOST_ACCESS_RW) < 0))
626 			return -1;
627 		len = RTE_MIN(len, size);
628 		update_shadow_used_ring_split(vq, head_idx, len);
629 		size -= len;
630 
631 		cur_idx++;
632 		*num_buffers += 1;
633 	}
634 
635 	*nr_vec = vec_idx;
636 
637 	return 0;
638 }
639 
640 static __rte_always_inline int
641 fill_vec_buf_packed_indirect(struct virtio_net *dev,
642 			struct vhost_virtqueue *vq,
643 			struct vring_packed_desc *desc, uint16_t *vec_idx,
644 			struct buf_vector *buf_vec, uint32_t *len, uint8_t perm)
645 {
646 	uint16_t i;
647 	uint32_t nr_descs;
648 	uint16_t vec_id = *vec_idx;
649 	uint64_t dlen;
650 	struct vring_packed_desc *descs, *idescs = NULL;
651 
652 	dlen = desc->len;
653 	descs = (struct vring_packed_desc *)(uintptr_t)
654 		vhost_iova_to_vva(dev, vq, desc->addr, &dlen, VHOST_ACCESS_RO);
655 	if (unlikely(!descs))
656 		return -1;
657 
658 	if (unlikely(dlen < desc->len)) {
659 		/*
660 		 * The indirect desc table is not contiguous
661 		 * in process VA space, we have to copy it.
662 		 */
663 		idescs = vhost_alloc_copy_ind_table(dev,
664 				vq, desc->addr, desc->len);
665 		if (unlikely(!idescs))
666 			return -1;
667 
668 		descs = idescs;
669 	}
670 
671 	nr_descs =  desc->len / sizeof(struct vring_packed_desc);
672 	if (unlikely(nr_descs >= vq->size)) {
673 		free_ind_table(idescs);
674 		return -1;
675 	}
676 
677 	for (i = 0; i < nr_descs; i++) {
678 		if (unlikely(vec_id >= BUF_VECTOR_MAX)) {
679 			free_ind_table(idescs);
680 			return -1;
681 		}
682 
683 		dlen = descs[i].len;
684 		*len += dlen;
685 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
686 						descs[i].addr, dlen,
687 						perm)))
688 			return -1;
689 	}
690 	*vec_idx = vec_id;
691 
692 	if (unlikely(!!idescs))
693 		free_ind_table(idescs);
694 
695 	return 0;
696 }
697 
698 static __rte_always_inline int
699 fill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
700 				uint16_t avail_idx, uint16_t *desc_count,
701 				struct buf_vector *buf_vec, uint16_t *vec_idx,
702 				uint16_t *buf_id, uint32_t *len, uint8_t perm)
703 {
704 	bool wrap_counter = vq->avail_wrap_counter;
705 	struct vring_packed_desc *descs = vq->desc_packed;
706 	uint16_t vec_id = *vec_idx;
707 	uint64_t dlen;
708 
709 	if (avail_idx < vq->last_avail_idx)
710 		wrap_counter ^= 1;
711 
712 	/*
713 	 * Perform a load-acquire barrier in desc_is_avail to
714 	 * enforce the ordering between desc flags and desc
715 	 * content.
716 	 */
717 	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)))
718 		return -1;
719 
720 	*desc_count = 0;
721 	*len = 0;
722 
723 	while (1) {
724 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
725 			return -1;
726 
727 		if (unlikely(*desc_count >= vq->size))
728 			return -1;
729 
730 		*desc_count += 1;
731 		*buf_id = descs[avail_idx].id;
732 
733 		if (descs[avail_idx].flags & VRING_DESC_F_INDIRECT) {
734 			if (unlikely(fill_vec_buf_packed_indirect(dev, vq,
735 							&descs[avail_idx],
736 							&vec_id, buf_vec,
737 							len, perm) < 0))
738 				return -1;
739 		} else {
740 			dlen = descs[avail_idx].len;
741 			*len += dlen;
742 
743 			if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
744 							descs[avail_idx].addr,
745 							dlen,
746 							perm)))
747 				return -1;
748 		}
749 
750 		if ((descs[avail_idx].flags & VRING_DESC_F_NEXT) == 0)
751 			break;
752 
753 		if (++avail_idx >= vq->size) {
754 			avail_idx -= vq->size;
755 			wrap_counter ^= 1;
756 		}
757 	}
758 
759 	*vec_idx = vec_id;
760 
761 	return 0;
762 }
763 
764 static __rte_noinline void
765 copy_vnet_hdr_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
766 		struct buf_vector *buf_vec,
767 		struct virtio_net_hdr_mrg_rxbuf *hdr)
768 {
769 	uint64_t len;
770 	uint64_t remain = dev->vhost_hlen;
771 	uint64_t src = (uint64_t)(uintptr_t)hdr, dst;
772 	uint64_t iova = buf_vec->buf_iova;
773 
774 	while (remain) {
775 		len = RTE_MIN(remain,
776 				buf_vec->buf_len);
777 		dst = buf_vec->buf_addr;
778 		rte_memcpy((void *)(uintptr_t)dst,
779 				(void *)(uintptr_t)src,
780 				len);
781 
782 		PRINT_PACKET(dev, (uintptr_t)dst,
783 				(uint32_t)len, 0);
784 		vhost_log_cache_write_iova(dev, vq,
785 				iova, len);
786 
787 		remain -= len;
788 		iova += len;
789 		src += len;
790 		buf_vec++;
791 	}
792 }
793 
794 static __rte_always_inline int
795 async_iter_initialize(struct vhost_async *async)
796 {
797 	struct rte_vhost_iov_iter *iter;
798 
799 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
800 		VHOST_LOG_DATA(ERR, "no more async iovec available\n");
801 		return -1;
802 	}
803 
804 	iter = async->iov_iter + async->iter_idx;
805 	iter->iov = async->iovec + async->iovec_idx;
806 	iter->nr_segs = 0;
807 
808 	return 0;
809 }
810 
811 static __rte_always_inline int
812 async_iter_add_iovec(struct vhost_async *async, void *src, void *dst, size_t len)
813 {
814 	struct rte_vhost_iov_iter *iter;
815 	struct rte_vhost_iovec *iovec;
816 
817 	if (unlikely(async->iovec_idx >= VHOST_MAX_ASYNC_VEC)) {
818 		static bool vhost_max_async_vec_log;
819 
820 		if (!vhost_max_async_vec_log) {
821 			VHOST_LOG_DATA(ERR, "no more async iovec available\n");
822 			vhost_max_async_vec_log = true;
823 		}
824 
825 		return -1;
826 	}
827 
828 	iter = async->iov_iter + async->iter_idx;
829 	iovec = async->iovec + async->iovec_idx;
830 
831 	iovec->src_addr = src;
832 	iovec->dst_addr = dst;
833 	iovec->len = len;
834 
835 	iter->nr_segs++;
836 	async->iovec_idx++;
837 
838 	return 0;
839 }
840 
841 static __rte_always_inline void
842 async_iter_finalize(struct vhost_async *async)
843 {
844 	async->iter_idx++;
845 }
846 
847 static __rte_always_inline void
848 async_iter_cancel(struct vhost_async *async)
849 {
850 	struct rte_vhost_iov_iter *iter;
851 
852 	iter = async->iov_iter + async->iter_idx;
853 	async->iovec_idx -= iter->nr_segs;
854 	iter->nr_segs = 0;
855 	iter->iov = NULL;
856 }
857 
858 static __rte_always_inline void
859 async_iter_reset(struct vhost_async *async)
860 {
861 	async->iter_idx = 0;
862 	async->iovec_idx = 0;
863 }
864 
865 static __rte_always_inline int
866 async_mbuf_to_desc_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
867 		struct rte_mbuf *m, uint32_t mbuf_offset,
868 		uint64_t buf_iova, uint32_t cpy_len)
869 {
870 	struct vhost_async *async = vq->async;
871 	uint64_t mapped_len;
872 	uint32_t buf_offset = 0;
873 	void *hpa;
874 
875 	while (cpy_len) {
876 		hpa = (void *)(uintptr_t)gpa_to_first_hpa(dev,
877 				buf_iova + buf_offset, cpy_len, &mapped_len);
878 		if (unlikely(!hpa)) {
879 			VHOST_LOG_DATA(ERR, "(%d) %s: failed to get hpa.\n", dev->vid, __func__);
880 			return -1;
881 		}
882 
883 		if (unlikely(async_iter_add_iovec(async,
884 						(void *)(uintptr_t)rte_pktmbuf_iova_offset(m,
885 							mbuf_offset),
886 						hpa, (size_t)mapped_len)))
887 			return -1;
888 
889 		cpy_len -= (uint32_t)mapped_len;
890 		mbuf_offset += (uint32_t)mapped_len;
891 		buf_offset += (uint32_t)mapped_len;
892 	}
893 
894 	return 0;
895 }
896 
897 static __rte_always_inline void
898 sync_mbuf_to_desc_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
899 		struct rte_mbuf *m, uint32_t mbuf_offset,
900 		uint64_t buf_addr, uint64_t buf_iova, uint32_t cpy_len)
901 {
902 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
903 
904 	if (likely(cpy_len > MAX_BATCH_LEN || vq->batch_copy_nb_elems >= vq->size)) {
905 		rte_memcpy((void *)((uintptr_t)(buf_addr)),
906 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
907 				cpy_len);
908 		vhost_log_cache_write_iova(dev, vq, buf_iova, cpy_len);
909 		PRINT_PACKET(dev, (uintptr_t)(buf_addr), cpy_len, 0);
910 	} else {
911 		batch_copy[vq->batch_copy_nb_elems].dst =
912 			(void *)((uintptr_t)(buf_addr));
913 		batch_copy[vq->batch_copy_nb_elems].src =
914 			rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
915 		batch_copy[vq->batch_copy_nb_elems].log_addr = buf_iova;
916 		batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
917 		vq->batch_copy_nb_elems++;
918 	}
919 }
920 
921 static __rte_always_inline int
922 mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
923 		struct rte_mbuf *m, struct buf_vector *buf_vec,
924 		uint16_t nr_vec, uint16_t num_buffers, bool is_async)
925 {
926 	uint32_t vec_idx = 0;
927 	uint32_t mbuf_offset, mbuf_avail;
928 	uint32_t buf_offset, buf_avail;
929 	uint64_t buf_addr, buf_iova, buf_len;
930 	uint32_t cpy_len;
931 	uint64_t hdr_addr;
932 	struct rte_mbuf *hdr_mbuf;
933 	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
934 	struct vhost_async *async = vq->async;
935 
936 	if (unlikely(m == NULL))
937 		return -1;
938 
939 	buf_addr = buf_vec[vec_idx].buf_addr;
940 	buf_iova = buf_vec[vec_idx].buf_iova;
941 	buf_len = buf_vec[vec_idx].buf_len;
942 
943 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
944 		return -1;
945 
946 	hdr_mbuf = m;
947 	hdr_addr = buf_addr;
948 	if (unlikely(buf_len < dev->vhost_hlen)) {
949 		memset(&tmp_hdr, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
950 		hdr = &tmp_hdr;
951 	} else
952 		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
953 
954 	VHOST_LOG_DATA(DEBUG, "(%d) RX: num merge buffers %d\n",
955 		dev->vid, num_buffers);
956 
957 	if (unlikely(buf_len < dev->vhost_hlen)) {
958 		buf_offset = dev->vhost_hlen - buf_len;
959 		vec_idx++;
960 		buf_addr = buf_vec[vec_idx].buf_addr;
961 		buf_iova = buf_vec[vec_idx].buf_iova;
962 		buf_len = buf_vec[vec_idx].buf_len;
963 		buf_avail = buf_len - buf_offset;
964 	} else {
965 		buf_offset = dev->vhost_hlen;
966 		buf_avail = buf_len - dev->vhost_hlen;
967 	}
968 
969 	mbuf_avail  = rte_pktmbuf_data_len(m);
970 	mbuf_offset = 0;
971 
972 	if (is_async) {
973 		if (async_iter_initialize(async))
974 			return -1;
975 	}
976 
977 	while (mbuf_avail != 0 || m->next != NULL) {
978 		/* done with current buf, get the next one */
979 		if (buf_avail == 0) {
980 			vec_idx++;
981 			if (unlikely(vec_idx >= nr_vec))
982 				goto error;
983 
984 			buf_addr = buf_vec[vec_idx].buf_addr;
985 			buf_iova = buf_vec[vec_idx].buf_iova;
986 			buf_len = buf_vec[vec_idx].buf_len;
987 
988 			buf_offset = 0;
989 			buf_avail  = buf_len;
990 		}
991 
992 		/* done with current mbuf, get the next one */
993 		if (mbuf_avail == 0) {
994 			m = m->next;
995 
996 			mbuf_offset = 0;
997 			mbuf_avail  = rte_pktmbuf_data_len(m);
998 		}
999 
1000 		if (hdr_addr) {
1001 			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
1002 			if (rxvq_is_mergeable(dev))
1003 				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
1004 						num_buffers);
1005 
1006 			if (unlikely(hdr == &tmp_hdr)) {
1007 				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
1008 			} else {
1009 				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
1010 						dev->vhost_hlen, 0);
1011 				vhost_log_cache_write_iova(dev, vq,
1012 						buf_vec[0].buf_iova,
1013 						dev->vhost_hlen);
1014 			}
1015 
1016 			hdr_addr = 0;
1017 		}
1018 
1019 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
1020 
1021 		if (is_async) {
1022 			if (async_mbuf_to_desc_seg(dev, vq, m, mbuf_offset,
1023 						buf_iova + buf_offset, cpy_len) < 0)
1024 				goto error;
1025 		} else {
1026 			sync_mbuf_to_desc_seg(dev, vq, m, mbuf_offset,
1027 					buf_addr + buf_offset,
1028 					buf_iova + buf_offset, cpy_len);
1029 		}
1030 
1031 		mbuf_avail  -= cpy_len;
1032 		mbuf_offset += cpy_len;
1033 		buf_avail  -= cpy_len;
1034 		buf_offset += cpy_len;
1035 	}
1036 
1037 	if (is_async)
1038 		async_iter_finalize(async);
1039 
1040 	return 0;
1041 error:
1042 	if (is_async)
1043 		async_iter_cancel(async);
1044 
1045 	return -1;
1046 }
1047 
1048 static __rte_always_inline int
1049 vhost_enqueue_single_packed(struct virtio_net *dev,
1050 			    struct vhost_virtqueue *vq,
1051 			    struct rte_mbuf *pkt,
1052 			    struct buf_vector *buf_vec,
1053 			    uint16_t *nr_descs)
1054 {
1055 	uint16_t nr_vec = 0;
1056 	uint16_t avail_idx = vq->last_avail_idx;
1057 	uint16_t max_tries, tries = 0;
1058 	uint16_t buf_id = 0;
1059 	uint32_t len = 0;
1060 	uint16_t desc_count;
1061 	uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1062 	uint16_t num_buffers = 0;
1063 	uint32_t buffer_len[vq->size];
1064 	uint16_t buffer_buf_id[vq->size];
1065 	uint16_t buffer_desc_count[vq->size];
1066 
1067 	if (rxvq_is_mergeable(dev))
1068 		max_tries = vq->size - 1;
1069 	else
1070 		max_tries = 1;
1071 
1072 	while (size > 0) {
1073 		/*
1074 		 * if we tried all available ring items, and still
1075 		 * can't get enough buf, it means something abnormal
1076 		 * happened.
1077 		 */
1078 		if (unlikely(++tries > max_tries))
1079 			return -1;
1080 
1081 		if (unlikely(fill_vec_buf_packed(dev, vq,
1082 						avail_idx, &desc_count,
1083 						buf_vec, &nr_vec,
1084 						&buf_id, &len,
1085 						VHOST_ACCESS_RW) < 0))
1086 			return -1;
1087 
1088 		len = RTE_MIN(len, size);
1089 		size -= len;
1090 
1091 		buffer_len[num_buffers] = len;
1092 		buffer_buf_id[num_buffers] = buf_id;
1093 		buffer_desc_count[num_buffers] = desc_count;
1094 		num_buffers += 1;
1095 
1096 		*nr_descs += desc_count;
1097 		avail_idx += desc_count;
1098 		if (avail_idx >= vq->size)
1099 			avail_idx -= vq->size;
1100 	}
1101 
1102 	if (mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, num_buffers, false) < 0)
1103 		return -1;
1104 
1105 	vhost_shadow_enqueue_single_packed(dev, vq, buffer_len, buffer_buf_id,
1106 					   buffer_desc_count, num_buffers);
1107 
1108 	return 0;
1109 }
1110 
1111 static __rte_noinline uint32_t
1112 virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1113 	struct rte_mbuf **pkts, uint32_t count)
1114 {
1115 	uint32_t pkt_idx = 0;
1116 	uint16_t num_buffers;
1117 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1118 	uint16_t avail_head;
1119 
1120 	/*
1121 	 * The ordering between avail index and
1122 	 * desc reads needs to be enforced.
1123 	 */
1124 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
1125 
1126 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1127 
1128 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1129 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1130 		uint16_t nr_vec = 0;
1131 
1132 		if (unlikely(reserve_avail_buf_split(dev, vq,
1133 						pkt_len, buf_vec, &num_buffers,
1134 						avail_head, &nr_vec) < 0)) {
1135 			VHOST_LOG_DATA(DEBUG,
1136 				"(%d) failed to get enough desc from vring\n",
1137 				dev->vid);
1138 			vq->shadow_used_idx -= num_buffers;
1139 			break;
1140 		}
1141 
1142 		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1143 			dev->vid, vq->last_avail_idx,
1144 			vq->last_avail_idx + num_buffers);
1145 
1146 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec,
1147 					num_buffers, false) < 0) {
1148 			vq->shadow_used_idx -= num_buffers;
1149 			break;
1150 		}
1151 
1152 		vq->last_avail_idx += num_buffers;
1153 	}
1154 
1155 	do_data_copy_enqueue(dev, vq);
1156 
1157 	if (likely(vq->shadow_used_idx)) {
1158 		flush_shadow_used_ring_split(dev, vq);
1159 		vhost_vring_call_split(dev, vq);
1160 	}
1161 
1162 	return pkt_idx;
1163 }
1164 
1165 static __rte_always_inline int
1166 virtio_dev_rx_sync_batch_check(struct virtio_net *dev,
1167 			   struct vhost_virtqueue *vq,
1168 			   struct rte_mbuf **pkts,
1169 			   uint64_t *desc_addrs,
1170 			   uint64_t *lens)
1171 {
1172 	bool wrap_counter = vq->avail_wrap_counter;
1173 	struct vring_packed_desc *descs = vq->desc_packed;
1174 	uint16_t avail_idx = vq->last_avail_idx;
1175 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1176 	uint16_t i;
1177 
1178 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1179 		return -1;
1180 
1181 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1182 		return -1;
1183 
1184 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1185 		if (unlikely(pkts[i]->next != NULL))
1186 			return -1;
1187 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1188 					    wrap_counter)))
1189 			return -1;
1190 	}
1191 
1192 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1193 		lens[i] = descs[avail_idx + i].len;
1194 
1195 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1196 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1197 			return -1;
1198 	}
1199 
1200 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1201 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
1202 						  descs[avail_idx + i].addr,
1203 						  &lens[i],
1204 						  VHOST_ACCESS_RW);
1205 
1206 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1207 		if (unlikely(!desc_addrs[i]))
1208 			return -1;
1209 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1210 			return -1;
1211 	}
1212 
1213 	return 0;
1214 }
1215 
1216 static __rte_always_inline void
1217 virtio_dev_rx_batch_packed_copy(struct virtio_net *dev,
1218 			   struct vhost_virtqueue *vq,
1219 			   struct rte_mbuf **pkts,
1220 			   uint64_t *desc_addrs,
1221 			   uint64_t *lens)
1222 {
1223 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1224 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
1225 	struct vring_packed_desc *descs = vq->desc_packed;
1226 	uint16_t avail_idx = vq->last_avail_idx;
1227 	uint16_t ids[PACKED_BATCH_SIZE];
1228 	uint16_t i;
1229 
1230 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1231 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
1232 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
1233 					(uintptr_t)desc_addrs[i];
1234 		lens[i] = pkts[i]->pkt_len +
1235 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
1236 	}
1237 
1238 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1239 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
1240 
1241 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
1242 
1243 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1244 		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
1245 			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
1246 			   pkts[i]->pkt_len);
1247 	}
1248 
1249 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1250 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr,
1251 					   lens[i]);
1252 
1253 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1254 		ids[i] = descs[avail_idx + i].id;
1255 
1256 	vhost_flush_enqueue_batch_packed(dev, vq, lens, ids);
1257 }
1258 
1259 static __rte_always_inline int
1260 virtio_dev_rx_sync_batch_packed(struct virtio_net *dev,
1261 			   struct vhost_virtqueue *vq,
1262 			   struct rte_mbuf **pkts)
1263 {
1264 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
1265 	uint64_t lens[PACKED_BATCH_SIZE];
1266 
1267 	if (virtio_dev_rx_sync_batch_check(dev, vq, pkts, desc_addrs, lens) == -1)
1268 		return -1;
1269 
1270 	if (vq->shadow_used_idx) {
1271 		do_data_copy_enqueue(dev, vq);
1272 		vhost_flush_enqueue_shadow_packed(dev, vq);
1273 	}
1274 
1275 	virtio_dev_rx_batch_packed_copy(dev, vq, pkts, desc_addrs, lens);
1276 
1277 	return 0;
1278 }
1279 
1280 static __rte_always_inline int16_t
1281 virtio_dev_rx_single_packed(struct virtio_net *dev,
1282 			    struct vhost_virtqueue *vq,
1283 			    struct rte_mbuf *pkt)
1284 {
1285 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1286 	uint16_t nr_descs = 0;
1287 
1288 	if (unlikely(vhost_enqueue_single_packed(dev, vq, pkt, buf_vec,
1289 						 &nr_descs) < 0)) {
1290 		VHOST_LOG_DATA(DEBUG,
1291 				"(%d) failed to get enough desc from vring\n",
1292 				dev->vid);
1293 		return -1;
1294 	}
1295 
1296 	VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1297 			dev->vid, vq->last_avail_idx,
1298 			vq->last_avail_idx + nr_descs);
1299 
1300 	vq_inc_last_avail_packed(vq, nr_descs);
1301 
1302 	return 0;
1303 }
1304 
1305 static __rte_noinline uint32_t
1306 virtio_dev_rx_packed(struct virtio_net *dev,
1307 		     struct vhost_virtqueue *__rte_restrict vq,
1308 		     struct rte_mbuf **__rte_restrict pkts,
1309 		     uint32_t count)
1310 {
1311 	uint32_t pkt_idx = 0;
1312 
1313 	do {
1314 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1315 
1316 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
1317 			if (!virtio_dev_rx_sync_batch_packed(dev, vq,
1318 							&pkts[pkt_idx])) {
1319 				pkt_idx += PACKED_BATCH_SIZE;
1320 				continue;
1321 			}
1322 		}
1323 
1324 		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]))
1325 			break;
1326 		pkt_idx++;
1327 
1328 	} while (pkt_idx < count);
1329 
1330 	if (vq->shadow_used_idx) {
1331 		do_data_copy_enqueue(dev, vq);
1332 		vhost_flush_enqueue_shadow_packed(dev, vq);
1333 	}
1334 
1335 	if (pkt_idx)
1336 		vhost_vring_call_packed(dev, vq);
1337 
1338 	return pkt_idx;
1339 }
1340 
1341 static __rte_always_inline uint32_t
1342 virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
1343 	struct rte_mbuf **pkts, uint32_t count)
1344 {
1345 	struct vhost_virtqueue *vq;
1346 	uint32_t nb_tx = 0;
1347 
1348 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
1349 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1350 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
1351 			dev->vid, __func__, queue_id);
1352 		return 0;
1353 	}
1354 
1355 	vq = dev->virtqueue[queue_id];
1356 
1357 	rte_spinlock_lock(&vq->access_lock);
1358 
1359 	if (unlikely(!vq->enabled))
1360 		goto out_access_unlock;
1361 
1362 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1363 		vhost_user_iotlb_rd_lock(vq);
1364 
1365 	if (unlikely(!vq->access_ok))
1366 		if (unlikely(vring_translate(dev, vq) < 0))
1367 			goto out;
1368 
1369 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1370 	if (count == 0)
1371 		goto out;
1372 
1373 	if (vq_is_packed(dev))
1374 		nb_tx = virtio_dev_rx_packed(dev, vq, pkts, count);
1375 	else
1376 		nb_tx = virtio_dev_rx_split(dev, vq, pkts, count);
1377 
1378 out:
1379 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1380 		vhost_user_iotlb_rd_unlock(vq);
1381 
1382 out_access_unlock:
1383 	rte_spinlock_unlock(&vq->access_lock);
1384 
1385 	return nb_tx;
1386 }
1387 
1388 uint16_t
1389 rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
1390 	struct rte_mbuf **__rte_restrict pkts, uint16_t count)
1391 {
1392 	struct virtio_net *dev = get_device(vid);
1393 
1394 	if (!dev)
1395 		return 0;
1396 
1397 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
1398 		VHOST_LOG_DATA(ERR,
1399 			"(%d) %s: built-in vhost net backend is disabled.\n",
1400 			dev->vid, __func__);
1401 		return 0;
1402 	}
1403 
1404 	return virtio_dev_rx(dev, queue_id, pkts, count);
1405 }
1406 
1407 static __rte_always_inline uint16_t
1408 async_get_first_inflight_pkt_idx(struct vhost_virtqueue *vq)
1409 {
1410 	struct vhost_async *async = vq->async;
1411 
1412 	if (async->pkts_idx >= async->pkts_inflight_n)
1413 		return async->pkts_idx - async->pkts_inflight_n;
1414 	else
1415 		return vq->size - async->pkts_inflight_n + async->pkts_idx;
1416 }
1417 
1418 static __rte_always_inline void
1419 store_dma_desc_info_split(struct vring_used_elem *s_ring, struct vring_used_elem *d_ring,
1420 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1421 {
1422 	size_t elem_size = sizeof(struct vring_used_elem);
1423 
1424 	if (d_idx + count <= ring_size) {
1425 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1426 	} else {
1427 		uint16_t size = ring_size - d_idx;
1428 
1429 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1430 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1431 	}
1432 }
1433 
1434 static __rte_always_inline void
1435 store_dma_desc_info_packed(struct vring_used_elem_packed *s_ring,
1436 		struct vring_used_elem_packed *d_ring,
1437 		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
1438 {
1439 	size_t elem_size = sizeof(struct vring_used_elem_packed);
1440 
1441 	if (d_idx + count <= ring_size) {
1442 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, count * elem_size);
1443 	} else {
1444 		uint16_t size = ring_size - d_idx;
1445 
1446 		rte_memcpy(d_ring + d_idx, s_ring + s_idx, size * elem_size);
1447 		rte_memcpy(d_ring, s_ring + s_idx + size, (count - size) * elem_size);
1448 	}
1449 }
1450 
1451 static __rte_noinline uint32_t
1452 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
1453 	struct vhost_virtqueue *vq, uint16_t queue_id,
1454 	struct rte_mbuf **pkts, uint32_t count)
1455 {
1456 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1457 	uint32_t pkt_idx = 0;
1458 	uint16_t num_buffers;
1459 	uint16_t avail_head;
1460 
1461 	struct vhost_async *async = vq->async;
1462 	struct async_inflight_info *pkts_info = async->pkts_info;
1463 	uint32_t pkt_err = 0;
1464 	int32_t n_xfer;
1465 	uint16_t slot_idx = 0;
1466 
1467 	/*
1468 	 * The ordering between avail index and desc reads need to be enforced.
1469 	 */
1470 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
1471 
1472 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1473 
1474 	async_iter_reset(async);
1475 
1476 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1477 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1478 		uint16_t nr_vec = 0;
1479 
1480 		if (unlikely(reserve_avail_buf_split(dev, vq, pkt_len, buf_vec,
1481 						&num_buffers, avail_head, &nr_vec) < 0)) {
1482 			VHOST_LOG_DATA(DEBUG, "(%d) failed to get enough desc from vring\n",
1483 					dev->vid);
1484 			vq->shadow_used_idx -= num_buffers;
1485 			break;
1486 		}
1487 
1488 		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1489 			dev->vid, vq->last_avail_idx, vq->last_avail_idx + num_buffers);
1490 
1491 		if (mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec, num_buffers, true) < 0) {
1492 			vq->shadow_used_idx -= num_buffers;
1493 			break;
1494 		}
1495 
1496 		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
1497 		pkts_info[slot_idx].descs = num_buffers;
1498 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1499 
1500 		vq->last_avail_idx += num_buffers;
1501 	}
1502 
1503 	if (unlikely(pkt_idx == 0))
1504 		return 0;
1505 
1506 	n_xfer = async->ops.transfer_data(dev->vid, queue_id, async->iov_iter, 0, pkt_idx);
1507 	if (unlikely(n_xfer < 0)) {
1508 		VHOST_LOG_DATA(ERR, "(%d) %s: failed to transfer data for queue id %d.\n",
1509 				dev->vid, __func__, queue_id);
1510 		n_xfer = 0;
1511 	}
1512 
1513 	pkt_err = pkt_idx - n_xfer;
1514 	if (unlikely(pkt_err)) {
1515 		uint16_t num_descs = 0;
1516 
1517 		/* update number of completed packets */
1518 		pkt_idx = n_xfer;
1519 
1520 		/* calculate the sum of descriptors to revert */
1521 		while (pkt_err-- > 0) {
1522 			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
1523 			slot_idx--;
1524 		}
1525 
1526 		/* recover shadow used ring and available ring */
1527 		vq->shadow_used_idx -= num_descs;
1528 		vq->last_avail_idx -= num_descs;
1529 	}
1530 
1531 	/* keep used descriptors */
1532 	if (likely(vq->shadow_used_idx)) {
1533 		uint16_t to = async->desc_idx_split & (vq->size - 1);
1534 
1535 		store_dma_desc_info_split(vq->shadow_used_split,
1536 				async->descs_split, vq->size, 0, to,
1537 				vq->shadow_used_idx);
1538 
1539 		async->desc_idx_split += vq->shadow_used_idx;
1540 
1541 		async->pkts_idx += pkt_idx;
1542 		if (async->pkts_idx >= vq->size)
1543 			async->pkts_idx -= vq->size;
1544 
1545 		async->pkts_inflight_n += pkt_idx;
1546 		vq->shadow_used_idx = 0;
1547 	}
1548 
1549 	return pkt_idx;
1550 }
1551 
1552 
1553 static __rte_always_inline int
1554 vhost_enqueue_async_packed(struct virtio_net *dev,
1555 			    struct vhost_virtqueue *vq,
1556 			    struct rte_mbuf *pkt,
1557 			    struct buf_vector *buf_vec,
1558 			    uint16_t *nr_descs,
1559 			    uint16_t *nr_buffers)
1560 {
1561 	uint16_t nr_vec = 0;
1562 	uint16_t avail_idx = vq->last_avail_idx;
1563 	uint16_t max_tries, tries = 0;
1564 	uint16_t buf_id = 0;
1565 	uint32_t len = 0;
1566 	uint16_t desc_count = 0;
1567 	uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1568 	uint32_t buffer_len[vq->size];
1569 	uint16_t buffer_buf_id[vq->size];
1570 	uint16_t buffer_desc_count[vq->size];
1571 
1572 	if (rxvq_is_mergeable(dev))
1573 		max_tries = vq->size - 1;
1574 	else
1575 		max_tries = 1;
1576 
1577 	while (size > 0) {
1578 		/*
1579 		 * if we tried all available ring items, and still
1580 		 * can't get enough buf, it means something abnormal
1581 		 * happened.
1582 		 */
1583 		if (unlikely(++tries > max_tries))
1584 			return -1;
1585 
1586 		if (unlikely(fill_vec_buf_packed(dev, vq,
1587 						avail_idx, &desc_count,
1588 						buf_vec, &nr_vec,
1589 						&buf_id, &len,
1590 						VHOST_ACCESS_RW) < 0))
1591 			return -1;
1592 
1593 		len = RTE_MIN(len, size);
1594 		size -= len;
1595 
1596 		buffer_len[*nr_buffers] = len;
1597 		buffer_buf_id[*nr_buffers] = buf_id;
1598 		buffer_desc_count[*nr_buffers] = desc_count;
1599 		*nr_buffers += 1;
1600 		*nr_descs += desc_count;
1601 		avail_idx += desc_count;
1602 		if (avail_idx >= vq->size)
1603 			avail_idx -= vq->size;
1604 	}
1605 
1606 	if (unlikely(mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers, true) < 0))
1607 		return -1;
1608 
1609 	vhost_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id, buffer_desc_count, *nr_buffers);
1610 
1611 	return 0;
1612 }
1613 
1614 static __rte_always_inline int16_t
1615 virtio_dev_rx_async_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
1616 			    struct rte_mbuf *pkt, uint16_t *nr_descs, uint16_t *nr_buffers)
1617 {
1618 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1619 
1620 	if (unlikely(vhost_enqueue_async_packed(dev, vq, pkt, buf_vec,
1621 					nr_descs, nr_buffers) < 0)) {
1622 		VHOST_LOG_DATA(DEBUG, "(%d) failed to get enough desc from vring\n", dev->vid);
1623 		return -1;
1624 	}
1625 
1626 	VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1627 			dev->vid, vq->last_avail_idx, vq->last_avail_idx + *nr_descs);
1628 
1629 	return 0;
1630 }
1631 
1632 static __rte_always_inline void
1633 dma_error_handler_packed(struct vhost_virtqueue *vq, uint16_t slot_idx,
1634 			uint32_t nr_err, uint32_t *pkt_idx)
1635 {
1636 	uint16_t descs_err = 0;
1637 	uint16_t buffers_err = 0;
1638 	struct async_inflight_info *pkts_info = vq->async->pkts_info;
1639 
1640 	*pkt_idx -= nr_err;
1641 	/* calculate the sum of buffers and descs of DMA-error packets. */
1642 	while (nr_err-- > 0) {
1643 		descs_err += pkts_info[slot_idx % vq->size].descs;
1644 		buffers_err += pkts_info[slot_idx % vq->size].nr_buffers;
1645 		slot_idx--;
1646 	}
1647 
1648 	if (vq->last_avail_idx >= descs_err) {
1649 		vq->last_avail_idx -= descs_err;
1650 	} else {
1651 		vq->last_avail_idx = vq->last_avail_idx + vq->size - descs_err;
1652 		vq->avail_wrap_counter ^= 1;
1653 	}
1654 
1655 	vq->shadow_used_idx -= buffers_err;
1656 }
1657 
1658 static __rte_noinline uint32_t
1659 virtio_dev_rx_async_submit_packed(struct virtio_net *dev,
1660 	struct vhost_virtqueue *vq, uint16_t queue_id,
1661 	struct rte_mbuf **pkts, uint32_t count)
1662 {
1663 	uint32_t pkt_idx = 0;
1664 	uint32_t remained = count;
1665 	int32_t n_xfer;
1666 	uint16_t num_buffers;
1667 	uint16_t num_descs;
1668 
1669 	struct vhost_async *async = vq->async;
1670 	struct async_inflight_info *pkts_info = async->pkts_info;
1671 	uint32_t pkt_err = 0;
1672 	uint16_t slot_idx = 0;
1673 
1674 	do {
1675 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1676 
1677 		num_buffers = 0;
1678 		num_descs = 0;
1679 		if (unlikely(virtio_dev_rx_async_packed(dev, vq, pkts[pkt_idx],
1680 						&num_descs, &num_buffers) < 0))
1681 			break;
1682 
1683 		slot_idx = (async->pkts_idx + pkt_idx) % vq->size;
1684 
1685 		pkts_info[slot_idx].descs = num_descs;
1686 		pkts_info[slot_idx].nr_buffers = num_buffers;
1687 		pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1688 
1689 		pkt_idx++;
1690 		remained--;
1691 		vq_inc_last_avail_packed(vq, num_descs);
1692 	} while (pkt_idx < count);
1693 
1694 	if (unlikely(pkt_idx == 0))
1695 		return 0;
1696 
1697 	n_xfer = async->ops.transfer_data(dev->vid, queue_id, async->iov_iter, 0, pkt_idx);
1698 	if (unlikely(n_xfer < 0)) {
1699 		VHOST_LOG_DATA(ERR, "(%d) %s: failed to transfer data for queue id %d.\n",
1700 				dev->vid, __func__, queue_id);
1701 		n_xfer = 0;
1702 	}
1703 
1704 	pkt_err = pkt_idx - n_xfer;
1705 
1706 	async_iter_reset(async);
1707 
1708 	if (unlikely(pkt_err))
1709 		dma_error_handler_packed(vq, slot_idx, pkt_err, &pkt_idx);
1710 
1711 	if (likely(vq->shadow_used_idx)) {
1712 		/* keep used descriptors. */
1713 		store_dma_desc_info_packed(vq->shadow_used_packed, async->buffers_packed,
1714 					vq->size, 0, async->buffer_idx_packed,
1715 					vq->shadow_used_idx);
1716 
1717 		async->buffer_idx_packed += vq->shadow_used_idx;
1718 		if (async->buffer_idx_packed >= vq->size)
1719 			async->buffer_idx_packed -= vq->size;
1720 
1721 		async->pkts_idx += pkt_idx;
1722 		if (async->pkts_idx >= vq->size)
1723 			async->pkts_idx -= vq->size;
1724 
1725 		vq->shadow_used_idx = 0;
1726 		async->pkts_inflight_n += pkt_idx;
1727 	}
1728 
1729 	return pkt_idx;
1730 }
1731 
1732 static __rte_always_inline void
1733 write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
1734 {
1735 	struct vhost_async *async = vq->async;
1736 	uint16_t nr_left = n_descs;
1737 	uint16_t nr_copy;
1738 	uint16_t to, from;
1739 
1740 	do {
1741 		from = async->last_desc_idx_split & (vq->size - 1);
1742 		nr_copy = nr_left + from <= vq->size ? nr_left : vq->size - from;
1743 		to = vq->last_used_idx & (vq->size - 1);
1744 
1745 		if (to + nr_copy <= vq->size) {
1746 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
1747 					nr_copy * sizeof(struct vring_used_elem));
1748 		} else {
1749 			uint16_t size = vq->size - to;
1750 
1751 			rte_memcpy(&vq->used->ring[to], &async->descs_split[from],
1752 					size * sizeof(struct vring_used_elem));
1753 			rte_memcpy(&vq->used->ring[0], &async->descs_split[from + size],
1754 					(nr_copy - size) * sizeof(struct vring_used_elem));
1755 		}
1756 
1757 		async->last_desc_idx_split += nr_copy;
1758 		vq->last_used_idx += nr_copy;
1759 		nr_left -= nr_copy;
1760 	} while (nr_left > 0);
1761 }
1762 
1763 static __rte_always_inline void
1764 write_back_completed_descs_packed(struct vhost_virtqueue *vq,
1765 				uint16_t n_buffers)
1766 {
1767 	struct vhost_async *async = vq->async;
1768 	uint16_t from = async->last_buffer_idx_packed;
1769 	uint16_t used_idx = vq->last_used_idx;
1770 	uint16_t head_idx = vq->last_used_idx;
1771 	uint16_t head_flags = 0;
1772 	uint16_t i;
1773 
1774 	/* Split loop in two to save memory barriers */
1775 	for (i = 0; i < n_buffers; i++) {
1776 		vq->desc_packed[used_idx].id = async->buffers_packed[from].id;
1777 		vq->desc_packed[used_idx].len = async->buffers_packed[from].len;
1778 
1779 		used_idx += async->buffers_packed[from].count;
1780 		if (used_idx >= vq->size)
1781 			used_idx -= vq->size;
1782 
1783 		from++;
1784 		if (from >= vq->size)
1785 			from = 0;
1786 	}
1787 
1788 	/* The ordering for storing desc flags needs to be enforced. */
1789 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
1790 
1791 	from = async->last_buffer_idx_packed;
1792 
1793 	for (i = 0; i < n_buffers; i++) {
1794 		uint16_t flags;
1795 
1796 		if (async->buffers_packed[from].len)
1797 			flags = VRING_DESC_F_WRITE;
1798 		else
1799 			flags = 0;
1800 
1801 		if (vq->used_wrap_counter) {
1802 			flags |= VRING_DESC_F_USED;
1803 			flags |= VRING_DESC_F_AVAIL;
1804 		} else {
1805 			flags &= ~VRING_DESC_F_USED;
1806 			flags &= ~VRING_DESC_F_AVAIL;
1807 		}
1808 
1809 		if (i > 0) {
1810 			vq->desc_packed[vq->last_used_idx].flags = flags;
1811 		} else {
1812 			head_idx = vq->last_used_idx;
1813 			head_flags = flags;
1814 		}
1815 
1816 		vq_inc_last_used_packed(vq, async->buffers_packed[from].count);
1817 
1818 		from++;
1819 		if (from == vq->size)
1820 			from = 0;
1821 	}
1822 
1823 	vq->desc_packed[head_idx].flags = head_flags;
1824 	async->last_buffer_idx_packed = from;
1825 }
1826 
1827 static __rte_always_inline uint16_t
1828 vhost_poll_enqueue_completed(struct virtio_net *dev, uint16_t queue_id,
1829 		struct rte_mbuf **pkts, uint16_t count)
1830 {
1831 	struct vhost_virtqueue *vq = dev->virtqueue[queue_id];
1832 	struct vhost_async *async = vq->async;
1833 	struct async_inflight_info *pkts_info = async->pkts_info;
1834 	int32_t n_cpl;
1835 	uint16_t n_descs = 0, n_buffers = 0;
1836 	uint16_t start_idx, from, i;
1837 
1838 	n_cpl = async->ops.check_completed_copies(dev->vid, queue_id, 0, count);
1839 	if (unlikely(n_cpl < 0)) {
1840 		VHOST_LOG_DATA(ERR, "(%d) %s: failed to check completed copies for queue id %d.\n",
1841 				dev->vid, __func__, queue_id);
1842 		return 0;
1843 	}
1844 
1845 	if (n_cpl == 0)
1846 		return 0;
1847 
1848 	start_idx = async_get_first_inflight_pkt_idx(vq);
1849 
1850 	for (i = 0; i < n_cpl; i++) {
1851 		from = (start_idx + i) % vq->size;
1852 		/* Only used with packed ring */
1853 		n_buffers += pkts_info[from].nr_buffers;
1854 		/* Only used with split ring */
1855 		n_descs += pkts_info[from].descs;
1856 		pkts[i] = pkts_info[from].mbuf;
1857 	}
1858 
1859 	async->pkts_inflight_n -= n_cpl;
1860 
1861 	if (likely(vq->enabled && vq->access_ok)) {
1862 		if (vq_is_packed(dev)) {
1863 			write_back_completed_descs_packed(vq, n_buffers);
1864 			vhost_vring_call_packed(dev, vq);
1865 		} else {
1866 			write_back_completed_descs_split(vq, n_descs);
1867 			__atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
1868 			vhost_vring_call_split(dev, vq);
1869 		}
1870 	} else {
1871 		if (vq_is_packed(dev)) {
1872 			async->last_buffer_idx_packed += n_buffers;
1873 			if (async->last_buffer_idx_packed >= vq->size)
1874 				async->last_buffer_idx_packed -= vq->size;
1875 		} else {
1876 			async->last_desc_idx_split += n_descs;
1877 		}
1878 	}
1879 
1880 	return n_cpl;
1881 }
1882 
1883 uint16_t
1884 rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
1885 		struct rte_mbuf **pkts, uint16_t count)
1886 {
1887 	struct virtio_net *dev = get_device(vid);
1888 	struct vhost_virtqueue *vq;
1889 	uint16_t n_pkts_cpl = 0;
1890 
1891 	if (unlikely(!dev))
1892 		return 0;
1893 
1894 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
1895 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1896 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
1897 			dev->vid, __func__, queue_id);
1898 		return 0;
1899 	}
1900 
1901 	vq = dev->virtqueue[queue_id];
1902 
1903 	if (unlikely(!vq->async)) {
1904 		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
1905 			dev->vid, __func__, queue_id);
1906 		return 0;
1907 	}
1908 
1909 	rte_spinlock_lock(&vq->access_lock);
1910 
1911 	n_pkts_cpl = vhost_poll_enqueue_completed(dev, queue_id, pkts, count);
1912 
1913 	rte_spinlock_unlock(&vq->access_lock);
1914 
1915 	return n_pkts_cpl;
1916 }
1917 
1918 uint16_t
1919 rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
1920 		struct rte_mbuf **pkts, uint16_t count)
1921 {
1922 	struct virtio_net *dev = get_device(vid);
1923 	struct vhost_virtqueue *vq;
1924 	uint16_t n_pkts_cpl = 0;
1925 
1926 	if (!dev)
1927 		return 0;
1928 
1929 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
1930 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1931 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
1932 			dev->vid, __func__, queue_id);
1933 		return 0;
1934 	}
1935 
1936 	vq = dev->virtqueue[queue_id];
1937 
1938 	if (unlikely(!vq->async)) {
1939 		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
1940 			dev->vid, __func__, queue_id);
1941 		return 0;
1942 	}
1943 
1944 	n_pkts_cpl = vhost_poll_enqueue_completed(dev, queue_id, pkts, count);
1945 
1946 	return n_pkts_cpl;
1947 }
1948 
1949 static __rte_always_inline uint32_t
1950 virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
1951 	struct rte_mbuf **pkts, uint32_t count)
1952 {
1953 	struct vhost_virtqueue *vq;
1954 	uint32_t nb_tx = 0;
1955 
1956 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
1957 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1958 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
1959 			dev->vid, __func__, queue_id);
1960 		return 0;
1961 	}
1962 
1963 	vq = dev->virtqueue[queue_id];
1964 
1965 	rte_spinlock_lock(&vq->access_lock);
1966 
1967 	if (unlikely(!vq->enabled || !vq->async))
1968 		goto out_access_unlock;
1969 
1970 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1971 		vhost_user_iotlb_rd_lock(vq);
1972 
1973 	if (unlikely(!vq->access_ok))
1974 		if (unlikely(vring_translate(dev, vq) < 0))
1975 			goto out;
1976 
1977 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1978 	if (count == 0)
1979 		goto out;
1980 
1981 	if (vq_is_packed(dev))
1982 		nb_tx = virtio_dev_rx_async_submit_packed(dev, vq, queue_id,
1983 				pkts, count);
1984 	else
1985 		nb_tx = virtio_dev_rx_async_submit_split(dev, vq, queue_id,
1986 				pkts, count);
1987 
1988 out:
1989 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1990 		vhost_user_iotlb_rd_unlock(vq);
1991 
1992 out_access_unlock:
1993 	rte_spinlock_unlock(&vq->access_lock);
1994 
1995 	return nb_tx;
1996 }
1997 
1998 uint16_t
1999 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
2000 		struct rte_mbuf **pkts, uint16_t count)
2001 {
2002 	struct virtio_net *dev = get_device(vid);
2003 
2004 	if (!dev)
2005 		return 0;
2006 
2007 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
2008 		VHOST_LOG_DATA(ERR,
2009 			"(%d) %s: built-in vhost net backend is disabled.\n",
2010 			dev->vid, __func__);
2011 		return 0;
2012 	}
2013 
2014 	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count);
2015 }
2016 
2017 static inline bool
2018 virtio_net_with_host_offload(struct virtio_net *dev)
2019 {
2020 	if (dev->features &
2021 			((1ULL << VIRTIO_NET_F_CSUM) |
2022 			 (1ULL << VIRTIO_NET_F_HOST_ECN) |
2023 			 (1ULL << VIRTIO_NET_F_HOST_TSO4) |
2024 			 (1ULL << VIRTIO_NET_F_HOST_TSO6) |
2025 			 (1ULL << VIRTIO_NET_F_HOST_UFO)))
2026 		return true;
2027 
2028 	return false;
2029 }
2030 
2031 static int
2032 parse_headers(struct rte_mbuf *m, uint8_t *l4_proto)
2033 {
2034 	struct rte_ipv4_hdr *ipv4_hdr;
2035 	struct rte_ipv6_hdr *ipv6_hdr;
2036 	struct rte_ether_hdr *eth_hdr;
2037 	uint16_t ethertype;
2038 	uint16_t data_len = rte_pktmbuf_data_len(m);
2039 
2040 	if (data_len < sizeof(struct rte_ether_hdr))
2041 		return -EINVAL;
2042 
2043 	eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
2044 
2045 	m->l2_len = sizeof(struct rte_ether_hdr);
2046 	ethertype = rte_be_to_cpu_16(eth_hdr->ether_type);
2047 
2048 	if (ethertype == RTE_ETHER_TYPE_VLAN) {
2049 		if (data_len < sizeof(struct rte_ether_hdr) +
2050 				sizeof(struct rte_vlan_hdr))
2051 			goto error;
2052 
2053 		struct rte_vlan_hdr *vlan_hdr =
2054 			(struct rte_vlan_hdr *)(eth_hdr + 1);
2055 
2056 		m->l2_len += sizeof(struct rte_vlan_hdr);
2057 		ethertype = rte_be_to_cpu_16(vlan_hdr->eth_proto);
2058 	}
2059 
2060 	switch (ethertype) {
2061 	case RTE_ETHER_TYPE_IPV4:
2062 		if (data_len < m->l2_len + sizeof(struct rte_ipv4_hdr))
2063 			goto error;
2064 		ipv4_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
2065 				m->l2_len);
2066 		m->l3_len = rte_ipv4_hdr_len(ipv4_hdr);
2067 		if (data_len < m->l2_len + m->l3_len)
2068 			goto error;
2069 		m->ol_flags |= RTE_MBUF_F_TX_IPV4;
2070 		*l4_proto = ipv4_hdr->next_proto_id;
2071 		break;
2072 	case RTE_ETHER_TYPE_IPV6:
2073 		if (data_len < m->l2_len + sizeof(struct rte_ipv6_hdr))
2074 			goto error;
2075 		ipv6_hdr = rte_pktmbuf_mtod_offset(m, struct rte_ipv6_hdr *,
2076 				m->l2_len);
2077 		m->l3_len = sizeof(struct rte_ipv6_hdr);
2078 		m->ol_flags |= RTE_MBUF_F_TX_IPV6;
2079 		*l4_proto = ipv6_hdr->proto;
2080 		break;
2081 	default:
2082 		/* a valid L3 header is needed for further L4 parsing */
2083 		goto error;
2084 	}
2085 
2086 	/* both CSUM and GSO need a valid L4 header */
2087 	switch (*l4_proto) {
2088 	case IPPROTO_TCP:
2089 		if (data_len < m->l2_len + m->l3_len +
2090 				sizeof(struct rte_tcp_hdr))
2091 			goto error;
2092 		break;
2093 	case IPPROTO_UDP:
2094 		if (data_len < m->l2_len + m->l3_len +
2095 				sizeof(struct rte_udp_hdr))
2096 			goto error;
2097 		break;
2098 	case IPPROTO_SCTP:
2099 		if (data_len < m->l2_len + m->l3_len +
2100 				sizeof(struct rte_sctp_hdr))
2101 			goto error;
2102 		break;
2103 	default:
2104 		goto error;
2105 	}
2106 
2107 	return 0;
2108 
2109 error:
2110 	m->l2_len = 0;
2111 	m->l3_len = 0;
2112 	m->ol_flags = 0;
2113 	return -EINVAL;
2114 }
2115 
2116 static __rte_always_inline void
2117 vhost_dequeue_offload_legacy(struct virtio_net_hdr *hdr, struct rte_mbuf *m)
2118 {
2119 	uint8_t l4_proto = 0;
2120 	struct rte_tcp_hdr *tcp_hdr = NULL;
2121 	uint16_t tcp_len;
2122 	uint16_t data_len = rte_pktmbuf_data_len(m);
2123 
2124 	if (parse_headers(m, &l4_proto) < 0)
2125 		return;
2126 
2127 	if (hdr->flags == VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2128 		if (hdr->csum_start == (m->l2_len + m->l3_len)) {
2129 			switch (hdr->csum_offset) {
2130 			case (offsetof(struct rte_tcp_hdr, cksum)):
2131 				if (l4_proto != IPPROTO_TCP)
2132 					goto error;
2133 				m->ol_flags |= RTE_MBUF_F_TX_TCP_CKSUM;
2134 				break;
2135 			case (offsetof(struct rte_udp_hdr, dgram_cksum)):
2136 				if (l4_proto != IPPROTO_UDP)
2137 					goto error;
2138 				m->ol_flags |= RTE_MBUF_F_TX_UDP_CKSUM;
2139 				break;
2140 			case (offsetof(struct rte_sctp_hdr, cksum)):
2141 				if (l4_proto != IPPROTO_SCTP)
2142 					goto error;
2143 				m->ol_flags |= RTE_MBUF_F_TX_SCTP_CKSUM;
2144 				break;
2145 			default:
2146 				goto error;
2147 			}
2148 		} else {
2149 			goto error;
2150 		}
2151 	}
2152 
2153 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2154 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2155 		case VIRTIO_NET_HDR_GSO_TCPV4:
2156 		case VIRTIO_NET_HDR_GSO_TCPV6:
2157 			if (l4_proto != IPPROTO_TCP)
2158 				goto error;
2159 			tcp_hdr = rte_pktmbuf_mtod_offset(m,
2160 					struct rte_tcp_hdr *,
2161 					m->l2_len + m->l3_len);
2162 			tcp_len = (tcp_hdr->data_off & 0xf0) >> 2;
2163 			if (data_len < m->l2_len + m->l3_len + tcp_len)
2164 				goto error;
2165 			m->ol_flags |= RTE_MBUF_F_TX_TCP_SEG;
2166 			m->tso_segsz = hdr->gso_size;
2167 			m->l4_len = tcp_len;
2168 			break;
2169 		case VIRTIO_NET_HDR_GSO_UDP:
2170 			if (l4_proto != IPPROTO_UDP)
2171 				goto error;
2172 			m->ol_flags |= RTE_MBUF_F_TX_UDP_SEG;
2173 			m->tso_segsz = hdr->gso_size;
2174 			m->l4_len = sizeof(struct rte_udp_hdr);
2175 			break;
2176 		default:
2177 			VHOST_LOG_DATA(WARNING,
2178 				"unsupported gso type %u.\n", hdr->gso_type);
2179 			goto error;
2180 		}
2181 	}
2182 	return;
2183 
2184 error:
2185 	m->l2_len = 0;
2186 	m->l3_len = 0;
2187 	m->ol_flags = 0;
2188 }
2189 
2190 static __rte_always_inline void
2191 vhost_dequeue_offload(struct virtio_net_hdr *hdr, struct rte_mbuf *m,
2192 	bool legacy_ol_flags)
2193 {
2194 	struct rte_net_hdr_lens hdr_lens;
2195 	int l4_supported = 0;
2196 	uint32_t ptype;
2197 
2198 	if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
2199 		return;
2200 
2201 	if (legacy_ol_flags) {
2202 		vhost_dequeue_offload_legacy(hdr, m);
2203 		return;
2204 	}
2205 
2206 	m->ol_flags |= RTE_MBUF_F_RX_IP_CKSUM_UNKNOWN;
2207 
2208 	ptype = rte_net_get_ptype(m, &hdr_lens, RTE_PTYPE_ALL_MASK);
2209 	m->packet_type = ptype;
2210 	if ((ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_TCP ||
2211 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_UDP ||
2212 	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_SCTP)
2213 		l4_supported = 1;
2214 
2215 	/* According to Virtio 1.1 spec, the device only needs to look at
2216 	 * VIRTIO_NET_HDR_F_NEEDS_CSUM in the packet transmission path.
2217 	 * This differs from the processing incoming packets path where the
2218 	 * driver could rely on VIRTIO_NET_HDR_F_DATA_VALID flag set by the
2219 	 * device.
2220 	 *
2221 	 * 5.1.6.2.1 Driver Requirements: Packet Transmission
2222 	 * The driver MUST NOT set the VIRTIO_NET_HDR_F_DATA_VALID and
2223 	 * VIRTIO_NET_HDR_F_RSC_INFO bits in flags.
2224 	 *
2225 	 * 5.1.6.2.2 Device Requirements: Packet Transmission
2226 	 * The device MUST ignore flag bits that it does not recognize.
2227 	 */
2228 	if (hdr->flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) {
2229 		uint32_t hdrlen;
2230 
2231 		hdrlen = hdr_lens.l2_len + hdr_lens.l3_len + hdr_lens.l4_len;
2232 		if (hdr->csum_start <= hdrlen && l4_supported != 0) {
2233 			m->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_NONE;
2234 		} else {
2235 			/* Unknown proto or tunnel, do sw cksum. We can assume
2236 			 * the cksum field is in the first segment since the
2237 			 * buffers we provided to the host are large enough.
2238 			 * In case of SCTP, this will be wrong since it's a CRC
2239 			 * but there's nothing we can do.
2240 			 */
2241 			uint16_t csum = 0, off;
2242 
2243 			if (rte_raw_cksum_mbuf(m, hdr->csum_start,
2244 					rte_pktmbuf_pkt_len(m) - hdr->csum_start, &csum) < 0)
2245 				return;
2246 			if (likely(csum != 0xffff))
2247 				csum = ~csum;
2248 			off = hdr->csum_offset + hdr->csum_start;
2249 			if (rte_pktmbuf_data_len(m) >= off + 1)
2250 				*rte_pktmbuf_mtod_offset(m, uint16_t *, off) = csum;
2251 		}
2252 	}
2253 
2254 	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
2255 		if (hdr->gso_size == 0)
2256 			return;
2257 
2258 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
2259 		case VIRTIO_NET_HDR_GSO_TCPV4:
2260 		case VIRTIO_NET_HDR_GSO_TCPV6:
2261 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_TCP)
2262 				break;
2263 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2264 			m->tso_segsz = hdr->gso_size;
2265 			break;
2266 		case VIRTIO_NET_HDR_GSO_UDP:
2267 			if ((ptype & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_UDP)
2268 				break;
2269 			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
2270 			m->tso_segsz = hdr->gso_size;
2271 			break;
2272 		default:
2273 			break;
2274 		}
2275 	}
2276 }
2277 
2278 static __rte_noinline void
2279 copy_vnet_hdr_from_desc(struct virtio_net_hdr *hdr,
2280 		struct buf_vector *buf_vec)
2281 {
2282 	uint64_t len;
2283 	uint64_t remain = sizeof(struct virtio_net_hdr);
2284 	uint64_t src;
2285 	uint64_t dst = (uint64_t)(uintptr_t)hdr;
2286 
2287 	while (remain) {
2288 		len = RTE_MIN(remain, buf_vec->buf_len);
2289 		src = buf_vec->buf_addr;
2290 		rte_memcpy((void *)(uintptr_t)dst,
2291 				(void *)(uintptr_t)src, len);
2292 
2293 		remain -= len;
2294 		dst += len;
2295 		buf_vec++;
2296 	}
2297 }
2298 
2299 static __rte_always_inline int
2300 copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
2301 		  struct buf_vector *buf_vec, uint16_t nr_vec,
2302 		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
2303 		  bool legacy_ol_flags)
2304 {
2305 	uint32_t buf_avail, buf_offset;
2306 	uint64_t buf_addr, buf_len;
2307 	uint32_t mbuf_avail, mbuf_offset;
2308 	uint32_t cpy_len;
2309 	struct rte_mbuf *cur = m, *prev = m;
2310 	struct virtio_net_hdr tmp_hdr;
2311 	struct virtio_net_hdr *hdr = NULL;
2312 	/* A counter to avoid desc dead loop chain */
2313 	uint16_t vec_idx = 0;
2314 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
2315 	int error = 0;
2316 
2317 	buf_addr = buf_vec[vec_idx].buf_addr;
2318 	buf_len = buf_vec[vec_idx].buf_len;
2319 
2320 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
2321 		error = -1;
2322 		goto out;
2323 	}
2324 
2325 	if (virtio_net_with_host_offload(dev)) {
2326 		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
2327 			/*
2328 			 * No luck, the virtio-net header doesn't fit
2329 			 * in a contiguous virtual area.
2330 			 */
2331 			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
2332 			hdr = &tmp_hdr;
2333 		} else {
2334 			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
2335 		}
2336 	}
2337 
2338 	/*
2339 	 * A virtio driver normally uses at least 2 desc buffers
2340 	 * for Tx: the first for storing the header, and others
2341 	 * for storing the data.
2342 	 */
2343 	if (unlikely(buf_len < dev->vhost_hlen)) {
2344 		buf_offset = dev->vhost_hlen - buf_len;
2345 		vec_idx++;
2346 		buf_addr = buf_vec[vec_idx].buf_addr;
2347 		buf_len = buf_vec[vec_idx].buf_len;
2348 		buf_avail  = buf_len - buf_offset;
2349 	} else if (buf_len == dev->vhost_hlen) {
2350 		if (unlikely(++vec_idx >= nr_vec))
2351 			goto out;
2352 		buf_addr = buf_vec[vec_idx].buf_addr;
2353 		buf_len = buf_vec[vec_idx].buf_len;
2354 
2355 		buf_offset = 0;
2356 		buf_avail = buf_len;
2357 	} else {
2358 		buf_offset = dev->vhost_hlen;
2359 		buf_avail = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
2360 	}
2361 
2362 	PRINT_PACKET(dev,
2363 			(uintptr_t)(buf_addr + buf_offset),
2364 			(uint32_t)buf_avail, 0);
2365 
2366 	mbuf_offset = 0;
2367 	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
2368 	while (1) {
2369 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
2370 
2371 		if (likely(cpy_len > MAX_BATCH_LEN ||
2372 					vq->batch_copy_nb_elems >= vq->size ||
2373 					(hdr && cur == m))) {
2374 			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *,
2375 						mbuf_offset),
2376 					(void *)((uintptr_t)(buf_addr +
2377 							buf_offset)), cpy_len);
2378 		} else {
2379 			batch_copy[vq->batch_copy_nb_elems].dst =
2380 				rte_pktmbuf_mtod_offset(cur, void *,
2381 						mbuf_offset);
2382 			batch_copy[vq->batch_copy_nb_elems].src =
2383 				(void *)((uintptr_t)(buf_addr + buf_offset));
2384 			batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
2385 			vq->batch_copy_nb_elems++;
2386 		}
2387 
2388 		mbuf_avail  -= cpy_len;
2389 		mbuf_offset += cpy_len;
2390 		buf_avail -= cpy_len;
2391 		buf_offset += cpy_len;
2392 
2393 		/* This buf reaches to its end, get the next one */
2394 		if (buf_avail == 0) {
2395 			if (++vec_idx >= nr_vec)
2396 				break;
2397 
2398 			buf_addr = buf_vec[vec_idx].buf_addr;
2399 			buf_len = buf_vec[vec_idx].buf_len;
2400 
2401 			buf_offset = 0;
2402 			buf_avail  = buf_len;
2403 
2404 			PRINT_PACKET(dev, (uintptr_t)buf_addr,
2405 					(uint32_t)buf_avail, 0);
2406 		}
2407 
2408 		/*
2409 		 * This mbuf reaches to its end, get a new one
2410 		 * to hold more data.
2411 		 */
2412 		if (mbuf_avail == 0) {
2413 			cur = rte_pktmbuf_alloc(mbuf_pool);
2414 			if (unlikely(cur == NULL)) {
2415 				VHOST_LOG_DATA(ERR, "Failed to "
2416 					"allocate memory for mbuf.\n");
2417 				error = -1;
2418 				goto out;
2419 			}
2420 
2421 			prev->next = cur;
2422 			prev->data_len = mbuf_offset;
2423 			m->nb_segs += 1;
2424 			m->pkt_len += mbuf_offset;
2425 			prev = cur;
2426 
2427 			mbuf_offset = 0;
2428 			mbuf_avail  = cur->buf_len - RTE_PKTMBUF_HEADROOM;
2429 		}
2430 	}
2431 
2432 	prev->data_len = mbuf_offset;
2433 	m->pkt_len    += mbuf_offset;
2434 
2435 	if (hdr)
2436 		vhost_dequeue_offload(hdr, m, legacy_ol_flags);
2437 
2438 out:
2439 
2440 	return error;
2441 }
2442 
2443 static void
2444 virtio_dev_extbuf_free(void *addr __rte_unused, void *opaque)
2445 {
2446 	rte_free(opaque);
2447 }
2448 
2449 static int
2450 virtio_dev_extbuf_alloc(struct rte_mbuf *pkt, uint32_t size)
2451 {
2452 	struct rte_mbuf_ext_shared_info *shinfo = NULL;
2453 	uint32_t total_len = RTE_PKTMBUF_HEADROOM + size;
2454 	uint16_t buf_len;
2455 	rte_iova_t iova;
2456 	void *buf;
2457 
2458 	total_len += sizeof(*shinfo) + sizeof(uintptr_t);
2459 	total_len = RTE_ALIGN_CEIL(total_len, sizeof(uintptr_t));
2460 
2461 	if (unlikely(total_len > UINT16_MAX))
2462 		return -ENOSPC;
2463 
2464 	buf_len = total_len;
2465 	buf = rte_malloc(NULL, buf_len, RTE_CACHE_LINE_SIZE);
2466 	if (unlikely(buf == NULL))
2467 		return -ENOMEM;
2468 
2469 	/* Initialize shinfo */
2470 	shinfo = rte_pktmbuf_ext_shinfo_init_helper(buf, &buf_len,
2471 						virtio_dev_extbuf_free, buf);
2472 	if (unlikely(shinfo == NULL)) {
2473 		rte_free(buf);
2474 		VHOST_LOG_DATA(ERR, "Failed to init shinfo\n");
2475 		return -1;
2476 	}
2477 
2478 	iova = rte_malloc_virt2iova(buf);
2479 	rte_pktmbuf_attach_extbuf(pkt, buf, iova, buf_len, shinfo);
2480 	rte_pktmbuf_reset_headroom(pkt);
2481 
2482 	return 0;
2483 }
2484 
2485 /*
2486  * Prepare a host supported pktmbuf.
2487  */
2488 static __rte_always_inline int
2489 virtio_dev_pktmbuf_prep(struct virtio_net *dev, struct rte_mbuf *pkt,
2490 			 uint32_t data_len)
2491 {
2492 	if (rte_pktmbuf_tailroom(pkt) >= data_len)
2493 		return 0;
2494 
2495 	/* attach an external buffer if supported */
2496 	if (dev->extbuf && !virtio_dev_extbuf_alloc(pkt, data_len))
2497 		return 0;
2498 
2499 	/* check if chained buffers are allowed */
2500 	if (!dev->linearbuf)
2501 		return 0;
2502 
2503 	return -1;
2504 }
2505 
2506 __rte_always_inline
2507 static uint16_t
2508 virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
2509 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
2510 	bool legacy_ol_flags)
2511 {
2512 	uint16_t i;
2513 	uint16_t free_entries;
2514 	uint16_t dropped = 0;
2515 	static bool allocerr_warned;
2516 
2517 	/*
2518 	 * The ordering between avail index and
2519 	 * desc reads needs to be enforced.
2520 	 */
2521 	free_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) -
2522 			vq->last_avail_idx;
2523 	if (free_entries == 0)
2524 		return 0;
2525 
2526 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
2527 
2528 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
2529 
2530 	count = RTE_MIN(count, MAX_PKT_BURST);
2531 	count = RTE_MIN(count, free_entries);
2532 	VHOST_LOG_DATA(DEBUG, "(%d) about to dequeue %u buffers\n",
2533 			dev->vid, count);
2534 
2535 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count))
2536 		return 0;
2537 
2538 	for (i = 0; i < count; i++) {
2539 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
2540 		uint16_t head_idx;
2541 		uint32_t buf_len;
2542 		uint16_t nr_vec = 0;
2543 		int err;
2544 
2545 		if (unlikely(fill_vec_buf_split(dev, vq,
2546 						vq->last_avail_idx + i,
2547 						&nr_vec, buf_vec,
2548 						&head_idx, &buf_len,
2549 						VHOST_ACCESS_RO) < 0))
2550 			break;
2551 
2552 		update_shadow_used_ring_split(vq, head_idx, 0);
2553 
2554 		err = virtio_dev_pktmbuf_prep(dev, pkts[i], buf_len);
2555 		if (unlikely(err)) {
2556 			/*
2557 			 * mbuf allocation fails for jumbo packets when external
2558 			 * buffer allocation is not allowed and linear buffer
2559 			 * is required. Drop this packet.
2560 			 */
2561 			if (!allocerr_warned) {
2562 				VHOST_LOG_DATA(ERR,
2563 					"Failed mbuf alloc of size %d from %s on %s.\n",
2564 					buf_len, mbuf_pool->name, dev->ifname);
2565 				allocerr_warned = true;
2566 			}
2567 			dropped += 1;
2568 			i++;
2569 			break;
2570 		}
2571 
2572 		err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
2573 				mbuf_pool, legacy_ol_flags);
2574 		if (unlikely(err)) {
2575 			if (!allocerr_warned) {
2576 				VHOST_LOG_DATA(ERR,
2577 					"Failed to copy desc to mbuf on %s.\n",
2578 					dev->ifname);
2579 				allocerr_warned = true;
2580 			}
2581 			dropped += 1;
2582 			i++;
2583 			break;
2584 		}
2585 	}
2586 
2587 	if (dropped)
2588 		rte_pktmbuf_free_bulk(&pkts[i - 1], count - i + 1);
2589 
2590 	vq->last_avail_idx += i;
2591 
2592 	do_data_copy_dequeue(vq);
2593 	if (unlikely(i < count))
2594 		vq->shadow_used_idx = i;
2595 	if (likely(vq->shadow_used_idx)) {
2596 		flush_shadow_used_ring_split(dev, vq);
2597 		vhost_vring_call_split(dev, vq);
2598 	}
2599 
2600 	return (i - dropped);
2601 }
2602 
2603 __rte_noinline
2604 static uint16_t
2605 virtio_dev_tx_split_legacy(struct virtio_net *dev,
2606 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
2607 	struct rte_mbuf **pkts, uint16_t count)
2608 {
2609 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, true);
2610 }
2611 
2612 __rte_noinline
2613 static uint16_t
2614 virtio_dev_tx_split_compliant(struct virtio_net *dev,
2615 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
2616 	struct rte_mbuf **pkts, uint16_t count)
2617 {
2618 	return virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count, false);
2619 }
2620 
2621 static __rte_always_inline int
2622 vhost_reserve_avail_batch_packed(struct virtio_net *dev,
2623 				 struct vhost_virtqueue *vq,
2624 				 struct rte_mbuf **pkts,
2625 				 uint16_t avail_idx,
2626 				 uintptr_t *desc_addrs,
2627 				 uint16_t *ids)
2628 {
2629 	bool wrap = vq->avail_wrap_counter;
2630 	struct vring_packed_desc *descs = vq->desc_packed;
2631 	uint64_t lens[PACKED_BATCH_SIZE];
2632 	uint64_t buf_lens[PACKED_BATCH_SIZE];
2633 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
2634 	uint16_t flags, i;
2635 
2636 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
2637 		return -1;
2638 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
2639 		return -1;
2640 
2641 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2642 		flags = descs[avail_idx + i].flags;
2643 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
2644 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
2645 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
2646 			return -1;
2647 	}
2648 
2649 	rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
2650 
2651 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2652 		lens[i] = descs[avail_idx + i].len;
2653 
2654 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2655 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
2656 						  descs[avail_idx + i].addr,
2657 						  &lens[i], VHOST_ACCESS_RW);
2658 	}
2659 
2660 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2661 		if (unlikely(!desc_addrs[i]))
2662 			return -1;
2663 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
2664 			return -1;
2665 	}
2666 
2667 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2668 		if (virtio_dev_pktmbuf_prep(dev, pkts[i], lens[i]))
2669 			goto err;
2670 	}
2671 
2672 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2673 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
2674 
2675 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2676 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
2677 			goto err;
2678 	}
2679 
2680 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2681 		pkts[i]->pkt_len = lens[i] - buf_offset;
2682 		pkts[i]->data_len = pkts[i]->pkt_len;
2683 		ids[i] = descs[avail_idx + i].id;
2684 	}
2685 
2686 	return 0;
2687 
2688 err:
2689 	return -1;
2690 }
2691 
2692 static __rte_always_inline int
2693 virtio_dev_tx_batch_packed(struct virtio_net *dev,
2694 			   struct vhost_virtqueue *vq,
2695 			   struct rte_mbuf **pkts,
2696 			   bool legacy_ol_flags)
2697 {
2698 	uint16_t avail_idx = vq->last_avail_idx;
2699 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
2700 	struct virtio_net_hdr *hdr;
2701 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
2702 	uint16_t ids[PACKED_BATCH_SIZE];
2703 	uint16_t i;
2704 
2705 	if (vhost_reserve_avail_batch_packed(dev, vq, pkts, avail_idx,
2706 					     desc_addrs, ids))
2707 		return -1;
2708 
2709 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2710 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
2711 
2712 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2713 		rte_memcpy(rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
2714 			   (void *)(uintptr_t)(desc_addrs[i] + buf_offset),
2715 			   pkts[i]->pkt_len);
2716 
2717 	if (virtio_net_with_host_offload(dev)) {
2718 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2719 			hdr = (struct virtio_net_hdr *)(desc_addrs[i]);
2720 			vhost_dequeue_offload(hdr, pkts[i], legacy_ol_flags);
2721 		}
2722 	}
2723 
2724 	if (virtio_net_is_inorder(dev))
2725 		vhost_shadow_dequeue_batch_packed_inorder(vq,
2726 			ids[PACKED_BATCH_SIZE - 1]);
2727 	else
2728 		vhost_shadow_dequeue_batch_packed(dev, vq, ids);
2729 
2730 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
2731 
2732 	return 0;
2733 }
2734 
2735 static __rte_always_inline int
2736 vhost_dequeue_single_packed(struct virtio_net *dev,
2737 			    struct vhost_virtqueue *vq,
2738 			    struct rte_mempool *mbuf_pool,
2739 			    struct rte_mbuf *pkts,
2740 			    uint16_t *buf_id,
2741 			    uint16_t *desc_count,
2742 			    bool legacy_ol_flags)
2743 {
2744 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
2745 	uint32_t buf_len;
2746 	uint16_t nr_vec = 0;
2747 	int err;
2748 	static bool allocerr_warned;
2749 
2750 	if (unlikely(fill_vec_buf_packed(dev, vq,
2751 					 vq->last_avail_idx, desc_count,
2752 					 buf_vec, &nr_vec,
2753 					 buf_id, &buf_len,
2754 					 VHOST_ACCESS_RO) < 0))
2755 		return -1;
2756 
2757 	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
2758 		if (!allocerr_warned) {
2759 			VHOST_LOG_DATA(ERR,
2760 				"Failed mbuf alloc of size %d from %s on %s.\n",
2761 				buf_len, mbuf_pool->name, dev->ifname);
2762 			allocerr_warned = true;
2763 		}
2764 		return -1;
2765 	}
2766 
2767 	err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts,
2768 				mbuf_pool, legacy_ol_flags);
2769 	if (unlikely(err)) {
2770 		if (!allocerr_warned) {
2771 			VHOST_LOG_DATA(ERR,
2772 				"Failed to copy desc to mbuf on %s.\n",
2773 				dev->ifname);
2774 			allocerr_warned = true;
2775 		}
2776 		return -1;
2777 	}
2778 
2779 	return 0;
2780 }
2781 
2782 static __rte_always_inline int
2783 virtio_dev_tx_single_packed(struct virtio_net *dev,
2784 			    struct vhost_virtqueue *vq,
2785 			    struct rte_mempool *mbuf_pool,
2786 			    struct rte_mbuf *pkts,
2787 			    bool legacy_ol_flags)
2788 {
2789 
2790 	uint16_t buf_id, desc_count = 0;
2791 	int ret;
2792 
2793 	ret = vhost_dequeue_single_packed(dev, vq, mbuf_pool, pkts, &buf_id,
2794 					&desc_count, legacy_ol_flags);
2795 
2796 	if (likely(desc_count > 0)) {
2797 		if (virtio_net_is_inorder(dev))
2798 			vhost_shadow_dequeue_single_packed_inorder(vq, buf_id,
2799 								   desc_count);
2800 		else
2801 			vhost_shadow_dequeue_single_packed(vq, buf_id,
2802 					desc_count);
2803 
2804 		vq_inc_last_avail_packed(vq, desc_count);
2805 	}
2806 
2807 	return ret;
2808 }
2809 
2810 __rte_always_inline
2811 static uint16_t
2812 virtio_dev_tx_packed(struct virtio_net *dev,
2813 		     struct vhost_virtqueue *__rte_restrict vq,
2814 		     struct rte_mempool *mbuf_pool,
2815 		     struct rte_mbuf **__rte_restrict pkts,
2816 		     uint32_t count,
2817 		     bool legacy_ol_flags)
2818 {
2819 	uint32_t pkt_idx = 0;
2820 
2821 	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts, count))
2822 		return 0;
2823 
2824 	do {
2825 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
2826 
2827 		if (count - pkt_idx >= PACKED_BATCH_SIZE) {
2828 			if (!virtio_dev_tx_batch_packed(dev, vq,
2829 							&pkts[pkt_idx],
2830 							legacy_ol_flags)) {
2831 				pkt_idx += PACKED_BATCH_SIZE;
2832 				continue;
2833 			}
2834 		}
2835 
2836 		if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
2837 						pkts[pkt_idx],
2838 						legacy_ol_flags))
2839 			break;
2840 		pkt_idx++;
2841 	} while (pkt_idx < count);
2842 
2843 	if (pkt_idx != count)
2844 		rte_pktmbuf_free_bulk(&pkts[pkt_idx], count - pkt_idx);
2845 
2846 	if (vq->shadow_used_idx) {
2847 		do_data_copy_dequeue(vq);
2848 
2849 		vhost_flush_dequeue_shadow_packed(dev, vq);
2850 		vhost_vring_call_packed(dev, vq);
2851 	}
2852 
2853 	return pkt_idx;
2854 }
2855 
2856 __rte_noinline
2857 static uint16_t
2858 virtio_dev_tx_packed_legacy(struct virtio_net *dev,
2859 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
2860 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
2861 {
2862 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, true);
2863 }
2864 
2865 __rte_noinline
2866 static uint16_t
2867 virtio_dev_tx_packed_compliant(struct virtio_net *dev,
2868 	struct vhost_virtqueue *__rte_restrict vq, struct rte_mempool *mbuf_pool,
2869 	struct rte_mbuf **__rte_restrict pkts, uint32_t count)
2870 {
2871 	return virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count, false);
2872 }
2873 
2874 uint16_t
2875 rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
2876 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
2877 {
2878 	struct virtio_net *dev;
2879 	struct rte_mbuf *rarp_mbuf = NULL;
2880 	struct vhost_virtqueue *vq;
2881 	int16_t success = 1;
2882 
2883 	dev = get_device(vid);
2884 	if (!dev)
2885 		return 0;
2886 
2887 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
2888 		VHOST_LOG_DATA(ERR,
2889 			"(%d) %s: built-in vhost net backend is disabled.\n",
2890 			dev->vid, __func__);
2891 		return 0;
2892 	}
2893 
2894 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
2895 		VHOST_LOG_DATA(ERR,
2896 			"(%d) %s: invalid virtqueue idx %d.\n",
2897 			dev->vid, __func__, queue_id);
2898 		return 0;
2899 	}
2900 
2901 	vq = dev->virtqueue[queue_id];
2902 
2903 	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
2904 		return 0;
2905 
2906 	if (unlikely(!vq->enabled)) {
2907 		count = 0;
2908 		goto out_access_unlock;
2909 	}
2910 
2911 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
2912 		vhost_user_iotlb_rd_lock(vq);
2913 
2914 	if (unlikely(!vq->access_ok))
2915 		if (unlikely(vring_translate(dev, vq) < 0)) {
2916 			count = 0;
2917 			goto out;
2918 		}
2919 
2920 	/*
2921 	 * Construct a RARP broadcast packet, and inject it to the "pkts"
2922 	 * array, to looks like that guest actually send such packet.
2923 	 *
2924 	 * Check user_send_rarp() for more information.
2925 	 *
2926 	 * broadcast_rarp shares a cacheline in the virtio_net structure
2927 	 * with some fields that are accessed during enqueue and
2928 	 * __atomic_compare_exchange_n causes a write if performed compare
2929 	 * and exchange. This could result in false sharing between enqueue
2930 	 * and dequeue.
2931 	 *
2932 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
2933 	 * and only performing compare and exchange if the read indicates it
2934 	 * is likely to be set.
2935 	 */
2936 	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
2937 			__atomic_compare_exchange_n(&dev->broadcast_rarp,
2938 			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
2939 
2940 		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
2941 		if (rarp_mbuf == NULL) {
2942 			VHOST_LOG_DATA(ERR, "Failed to make RARP packet.\n");
2943 			count = 0;
2944 			goto out;
2945 		}
2946 		/*
2947 		 * Inject it to the head of "pkts" array, so that switch's mac
2948 		 * learning table will get updated first.
2949 		 */
2950 		pkts[0] = rarp_mbuf;
2951 		pkts++;
2952 		count -= 1;
2953 	}
2954 
2955 	if (vq_is_packed(dev)) {
2956 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
2957 			count = virtio_dev_tx_packed_legacy(dev, vq, mbuf_pool, pkts, count);
2958 		else
2959 			count = virtio_dev_tx_packed_compliant(dev, vq, mbuf_pool, pkts, count);
2960 	} else {
2961 		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
2962 			count = virtio_dev_tx_split_legacy(dev, vq, mbuf_pool, pkts, count);
2963 		else
2964 			count = virtio_dev_tx_split_compliant(dev, vq, mbuf_pool, pkts, count);
2965 	}
2966 
2967 out:
2968 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
2969 		vhost_user_iotlb_rd_unlock(vq);
2970 
2971 out_access_unlock:
2972 	rte_spinlock_unlock(&vq->access_lock);
2973 
2974 	if (unlikely(rarp_mbuf != NULL))
2975 		count += 1;
2976 
2977 	return count;
2978 }
2979