xref: /dpdk/lib/vhost/virtio_net.c (revision 99a2dd955fba6e4cc23b77d590a033650ced9c45)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2016 Intel Corporation
3  */
4 
5 #include <stdint.h>
6 #include <stdbool.h>
7 #include <linux/virtio_net.h>
8 
9 #include <rte_mbuf.h>
10 #include <rte_memcpy.h>
11 #include <rte_ether.h>
12 #include <rte_ip.h>
13 #include <rte_vhost.h>
14 #include <rte_tcp.h>
15 #include <rte_udp.h>
16 #include <rte_sctp.h>
17 #include <rte_arp.h>
18 #include <rte_spinlock.h>
19 #include <rte_malloc.h>
20 #include <rte_vhost_async.h>
21 
22 #include "iotlb.h"
23 #include "vhost.h"
24 
25 #define MAX_BATCH_LEN 256
26 
27 #define VHOST_ASYNC_BATCH_THRESHOLD 32
28 
29 static __rte_always_inline bool
30 rxvq_is_mergeable(struct virtio_net *dev)
31 {
32 	return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
33 }
34 
35 static __rte_always_inline bool
36 virtio_net_is_inorder(struct virtio_net *dev)
37 {
38 	return dev->features & (1ULL << VIRTIO_F_IN_ORDER);
39 }
40 
41 static bool
42 is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t nr_vring)
43 {
44 	return (is_tx ^ (idx & 1)) == 0 && idx < nr_vring;
45 }
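
/*
 * Note (informational): in the virtio-net vring layout, even indices are
 * receive queues (the vhost enqueue direction, is_tx == 0) and odd indices
 * are transmit queues (the vhost dequeue direction, is_tx == 1). For
 * example, with nr_vring == 2, only queue 0 passes this check for enqueue
 * and only queue 1 for dequeue.
 */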
46 
47 static inline void
48 do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
49 {
50 	struct batch_copy_elem *elem = vq->batch_copy_elems;
51 	uint16_t count = vq->batch_copy_nb_elems;
52 	int i;
53 
54 	for (i = 0; i < count; i++) {
55 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
56 		vhost_log_cache_write_iova(dev, vq, elem[i].log_addr,
57 					   elem[i].len);
58 		PRINT_PACKET(dev, (uintptr_t)elem[i].dst, elem[i].len, 0);
59 	}
60 
61 	vq->batch_copy_nb_elems = 0;
62 }
63 
64 static inline void
65 do_data_copy_dequeue(struct vhost_virtqueue *vq)
66 {
67 	struct batch_copy_elem *elem = vq->batch_copy_elems;
68 	uint16_t count = vq->batch_copy_nb_elems;
69 	int i;
70 
71 	for (i = 0; i < count; i++)
72 		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
73 
74 	vq->batch_copy_nb_elems = 0;
75 }
76 
77 static __rte_always_inline void
78 do_flush_shadow_used_ring_split(struct virtio_net *dev,
79 			struct vhost_virtqueue *vq,
80 			uint16_t to, uint16_t from, uint16_t size)
81 {
82 	rte_memcpy(&vq->used->ring[to],
83 			&vq->shadow_used_split[from],
84 			size * sizeof(struct vring_used_elem));
85 	vhost_log_cache_used_vring(dev, vq,
86 			offsetof(struct vring_used, ring[to]),
87 			size * sizeof(struct vring_used_elem));
88 }
89 
90 static __rte_always_inline void
91 flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
92 {
93 	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
94 
95 	if (used_idx + vq->shadow_used_idx <= vq->size) {
96 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
97 					  vq->shadow_used_idx);
98 	} else {
99 		uint16_t size;
100 
101 		/* update the used ring interval [used_idx, vq->size) */
102 		size = vq->size - used_idx;
103 		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
104 
105 		/* update the remaining interval [0, shadow_used_idx - size) at the ring start */
106 		do_flush_shadow_used_ring_split(dev, vq, 0, size,
107 					  vq->shadow_used_idx - size);
108 	}
109 	vq->last_used_idx += vq->shadow_used_idx;
110 
111 	vhost_log_cache_sync(dev, vq);
112 
113 	__atomic_add_fetch(&vq->used->idx, vq->shadow_used_idx,
114 			   __ATOMIC_RELEASE);
115 	vq->shadow_used_idx = 0;
116 	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
117 		sizeof(vq->used->idx));
118 }
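
/*
 * Worked example (hypothetical values): with vq->size = 256,
 * last_used_idx = 250 and shadow_used_idx = 10, the flush above copies
 * shadow entries 0..5 into used->ring[250..255] and entries 6..9 into
 * used->ring[0..3], then publishes them by adding 10 to used->idx with
 * release semantics so the guest observes the ring contents first.
 */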
119 
120 static __rte_always_inline void
121 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
122 			 uint16_t desc_idx, uint32_t len)
123 {
124 	uint16_t i = vq->shadow_used_idx++;
125 
126 	vq->shadow_used_split[i].id  = desc_idx;
127 	vq->shadow_used_split[i].len = len;
128 }
129 
130 static __rte_always_inline void
131 vhost_flush_enqueue_shadow_packed(struct virtio_net *dev,
132 				  struct vhost_virtqueue *vq)
133 {
134 	int i;
135 	uint16_t used_idx = vq->last_used_idx;
136 	uint16_t head_idx = vq->last_used_idx;
137 	uint16_t head_flags = 0;
138 
139 	/* Split loop in two to save memory barriers */
140 	for (i = 0; i < vq->shadow_used_idx; i++) {
141 		vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
142 		vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
143 
144 		used_idx += vq->shadow_used_packed[i].count;
145 		if (used_idx >= vq->size)
146 			used_idx -= vq->size;
147 	}
148 
149 	/* The ordering for storing desc flags needs to be enforced. */
150 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
151 
152 	for (i = 0; i < vq->shadow_used_idx; i++) {
153 		uint16_t flags;
154 
155 		if (vq->shadow_used_packed[i].len)
156 			flags = VRING_DESC_F_WRITE;
157 		else
158 			flags = 0;
159 
160 		if (vq->used_wrap_counter) {
161 			flags |= VRING_DESC_F_USED;
162 			flags |= VRING_DESC_F_AVAIL;
163 		} else {
164 			flags &= ~VRING_DESC_F_USED;
165 			flags &= ~VRING_DESC_F_AVAIL;
166 		}
167 
168 		if (i > 0) {
169 			vq->desc_packed[vq->last_used_idx].flags = flags;
170 
171 			vhost_log_cache_used_vring(dev, vq,
172 					vq->last_used_idx *
173 					sizeof(struct vring_packed_desc),
174 					sizeof(struct vring_packed_desc));
175 		} else {
176 			head_idx = vq->last_used_idx;
177 			head_flags = flags;
178 		}
179 
180 		vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
181 	}
182 
183 	vq->desc_packed[head_idx].flags = head_flags;
184 
185 	vhost_log_cache_used_vring(dev, vq,
186 				head_idx *
187 				sizeof(struct vring_packed_desc),
188 				sizeof(struct vring_packed_desc));
189 
190 	vq->shadow_used_idx = 0;
191 	vhost_log_cache_sync(dev, vq);
192 }
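
/*
 * Note on the packed ring protocol (informational): a descriptor is marked
 * used by setting VRING_DESC_F_AVAIL and VRING_DESC_F_USED to the same
 * value, which follows used_wrap_counter. The head descriptor's flags are
 * written last, after the release fence, so the guest never observes a
 * partially updated chain of used descriptors.
 */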
193 
194 static __rte_always_inline void
195 vhost_flush_dequeue_shadow_packed(struct virtio_net *dev,
196 				  struct vhost_virtqueue *vq)
197 {
198 	struct vring_used_elem_packed *used_elem = &vq->shadow_used_packed[0];
199 
200 	vq->desc_packed[vq->shadow_last_used_idx].id = used_elem->id;
201 	/* The desc flags field is the synchronization point for the virtio packed vring */
202 	__atomic_store_n(&vq->desc_packed[vq->shadow_last_used_idx].flags,
203 			 used_elem->flags, __ATOMIC_RELEASE);
204 
205 	vhost_log_cache_used_vring(dev, vq, vq->shadow_last_used_idx *
206 				   sizeof(struct vring_packed_desc),
207 				   sizeof(struct vring_packed_desc));
208 	vq->shadow_used_idx = 0;
209 	vhost_log_cache_sync(dev, vq);
210 }
211 
212 static __rte_always_inline void
213 vhost_flush_enqueue_batch_packed(struct virtio_net *dev,
214 				 struct vhost_virtqueue *vq,
215 				 uint64_t *lens,
216 				 uint16_t *ids)
217 {
218 	uint16_t i;
219 	uint16_t flags;
220 
221 	if (vq->shadow_used_idx) {
222 		do_data_copy_enqueue(dev, vq);
223 		vhost_flush_enqueue_shadow_packed(dev, vq);
224 	}
225 
226 	flags = PACKED_DESC_ENQUEUE_USED_FLAG(vq->used_wrap_counter);
227 
228 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
229 		vq->desc_packed[vq->last_used_idx + i].id = ids[i];
230 		vq->desc_packed[vq->last_used_idx + i].len = lens[i];
231 	}
232 
233 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
234 
235 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
236 		vq->desc_packed[vq->last_used_idx + i].flags = flags;
237 
238 	vhost_log_cache_used_vring(dev, vq, vq->last_used_idx *
239 				   sizeof(struct vring_packed_desc),
240 				   sizeof(struct vring_packed_desc) *
241 				   PACKED_BATCH_SIZE);
242 	vhost_log_cache_sync(dev, vq);
243 
244 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
245 }
246 
247 static __rte_always_inline void
248 vhost_shadow_dequeue_batch_packed_inorder(struct vhost_virtqueue *vq,
249 					  uint16_t id)
250 {
251 	vq->shadow_used_packed[0].id = id;
252 
253 	if (!vq->shadow_used_idx) {
254 		vq->shadow_last_used_idx = vq->last_used_idx;
255 		vq->shadow_used_packed[0].flags =
256 			PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
257 		vq->shadow_used_packed[0].len = 0;
258 		vq->shadow_used_packed[0].count = 1;
259 		vq->shadow_used_idx++;
260 	}
261 
262 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
263 }
264 
265 static __rte_always_inline void
266 vhost_shadow_dequeue_batch_packed(struct virtio_net *dev,
267 				  struct vhost_virtqueue *vq,
268 				  uint16_t *ids)
269 {
270 	uint16_t flags;
271 	uint16_t i;
272 	uint16_t begin;
273 
274 	flags = PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
275 
276 	if (!vq->shadow_used_idx) {
277 		vq->shadow_last_used_idx = vq->last_used_idx;
278 		vq->shadow_used_packed[0].id  = ids[0];
279 		vq->shadow_used_packed[0].len = 0;
280 		vq->shadow_used_packed[0].count = 1;
281 		vq->shadow_used_packed[0].flags = flags;
282 		vq->shadow_used_idx++;
283 		begin = 1;
284 	} else
285 		begin = 0;
286 
287 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE) {
288 		vq->desc_packed[vq->last_used_idx + i].id = ids[i];
289 		vq->desc_packed[vq->last_used_idx + i].len = 0;
290 	}
291 
292 	rte_atomic_thread_fence(__ATOMIC_RELEASE);
293 	vhost_for_each_try_unroll(i, begin, PACKED_BATCH_SIZE)
294 		vq->desc_packed[vq->last_used_idx + i].flags = flags;
295 
296 	vhost_log_cache_used_vring(dev, vq, vq->last_used_idx *
297 				   sizeof(struct vring_packed_desc),
298 				   sizeof(struct vring_packed_desc) *
299 				   PACKED_BATCH_SIZE);
300 	vhost_log_cache_sync(dev, vq);
301 
302 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
303 }
304 
305 static __rte_always_inline void
306 vhost_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
307 				   uint16_t buf_id,
308 				   uint16_t count)
309 {
310 	uint16_t flags;
311 
312 	flags = vq->desc_packed[vq->last_used_idx].flags;
313 	if (vq->used_wrap_counter) {
314 		flags |= VRING_DESC_F_USED;
315 		flags |= VRING_DESC_F_AVAIL;
316 	} else {
317 		flags &= ~VRING_DESC_F_USED;
318 		flags &= ~VRING_DESC_F_AVAIL;
319 	}
320 
321 	if (!vq->shadow_used_idx) {
322 		vq->shadow_last_used_idx = vq->last_used_idx;
323 
324 		vq->shadow_used_packed[0].id  = buf_id;
325 		vq->shadow_used_packed[0].len = 0;
326 		vq->shadow_used_packed[0].flags = flags;
327 		vq->shadow_used_idx++;
328 	} else {
329 		vq->desc_packed[vq->last_used_idx].id = buf_id;
330 		vq->desc_packed[vq->last_used_idx].len = 0;
331 		vq->desc_packed[vq->last_used_idx].flags = flags;
332 	}
333 
334 	vq_inc_last_used_packed(vq, count);
335 }
336 
337 static __rte_always_inline void
338 vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
339 					   uint16_t buf_id,
340 					   uint16_t count)
341 {
342 	uint16_t flags;
343 
344 	vq->shadow_used_packed[0].id = buf_id;
345 
346 	flags = vq->desc_packed[vq->last_used_idx].flags;
347 	if (vq->used_wrap_counter) {
348 		flags |= VRING_DESC_F_USED;
349 		flags |= VRING_DESC_F_AVAIL;
350 	} else {
351 		flags &= ~VRING_DESC_F_USED;
352 		flags &= ~VRING_DESC_F_AVAIL;
353 	}
354 
355 	if (!vq->shadow_used_idx) {
356 		vq->shadow_last_used_idx = vq->last_used_idx;
357 		vq->shadow_used_packed[0].len = 0;
358 		vq->shadow_used_packed[0].flags = flags;
359 		vq->shadow_used_idx++;
360 	}
361 
362 	vq_inc_last_used_packed(vq, count);
363 }
364 
365 static __rte_always_inline void
366 vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
367 				   struct vhost_virtqueue *vq,
368 				   uint32_t len[],
369 				   uint16_t id[],
370 				   uint16_t count[],
371 				   uint16_t num_buffers)
372 {
373 	uint16_t i;
374 	for (i = 0; i < num_buffers; i++) {
375 		/* keep the shadow flush aligned with the packed batch size */
376 		if (!vq->shadow_used_idx)
377 			vq->shadow_aligned_idx = vq->last_used_idx &
378 				PACKED_BATCH_MASK;
379 		vq->shadow_used_packed[vq->shadow_used_idx].id  = id[i];
380 		vq->shadow_used_packed[vq->shadow_used_idx].len = len[i];
381 		vq->shadow_used_packed[vq->shadow_used_idx].count = count[i];
382 		vq->shadow_aligned_idx += count[i];
383 		vq->shadow_used_idx++;
384 	}
385 
386 	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
387 		do_data_copy_enqueue(dev, vq);
388 		vhost_flush_enqueue_shadow_packed(dev, vq);
389 	}
390 }
391 
392 /* Skip the write when the value is already set, to avoid needless cache-line dirtying */
393 #define ASSIGN_UNLESS_EQUAL(var, val) do {	\
394 	if ((var) != (val))			\
395 		(var) = (val);			\
396 } while (0)
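
/*
 * Illustrative usage (informational): the macro only performs the store
 * when the value actually changes, so a virtio-net header that already
 * holds zero is left untouched and the cache line shared with the guest
 * is not dirtied, e.g. ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0) below.
 */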
397 
398 static __rte_always_inline void
399 virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
400 {
401 	uint64_t csum_l4 = m_buf->ol_flags & PKT_TX_L4_MASK;
402 
403 	if (m_buf->ol_flags & PKT_TX_TCP_SEG)
404 		csum_l4 |= PKT_TX_TCP_CKSUM;
405 
406 	if (csum_l4) {
407 		net_hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
408 		net_hdr->csum_start = m_buf->l2_len + m_buf->l3_len;
409 
410 		switch (csum_l4) {
411 		case PKT_TX_TCP_CKSUM:
412 			net_hdr->csum_offset = (offsetof(struct rte_tcp_hdr,
413 						cksum));
414 			break;
415 		case PKT_TX_UDP_CKSUM:
416 			net_hdr->csum_offset = (offsetof(struct rte_udp_hdr,
417 						dgram_cksum));
418 			break;
419 		case PKT_TX_SCTP_CKSUM:
420 			net_hdr->csum_offset = (offsetof(struct rte_sctp_hdr,
421 						cksum));
422 			break;
423 		}
424 	} else {
425 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_start, 0);
426 		ASSIGN_UNLESS_EQUAL(net_hdr->csum_offset, 0);
427 		ASSIGN_UNLESS_EQUAL(net_hdr->flags, 0);
428 	}
429 
430 	/* The virtio-net header cannot request IP checksum offload, so compute it here */
431 	if (m_buf->ol_flags & PKT_TX_IP_CKSUM) {
432 		struct rte_ipv4_hdr *ipv4_hdr;
433 
434 		ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
435 						   m_buf->l2_len);
436 		ipv4_hdr->hdr_checksum = 0;
437 		ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
438 	}
439 
440 	if (m_buf->ol_flags & PKT_TX_TCP_SEG) {
441 		if (m_buf->ol_flags & PKT_TX_IPV4)
442 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
443 		else
444 			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
445 		net_hdr->gso_size = m_buf->tso_segsz;
446 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
447 					+ m_buf->l4_len;
448 	} else if (m_buf->ol_flags & PKT_TX_UDP_SEG) {
449 		net_hdr->gso_type = VIRTIO_NET_HDR_GSO_UDP;
450 		net_hdr->gso_size = m_buf->tso_segsz;
451 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len +
452 			m_buf->l4_len;
453 	} else {
454 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0);
455 		ASSIGN_UNLESS_EQUAL(net_hdr->gso_size, 0);
456 		ASSIGN_UNLESS_EQUAL(net_hdr->hdr_len, 0);
457 	}
458 }
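
/*
 * Worked example (hypothetical packet): for a TSO IPv4/TCP mbuf with
 * l2_len = 14, l3_len = 20, l4_len = 20 and tso_segsz = 1448, the code
 * above produces csum_start = 34, csum_offset = offsetof(struct
 * rte_tcp_hdr, cksum) = 16, gso_type = VIRTIO_NET_HDR_GSO_TCPV4,
 * gso_size = 1448 and hdr_len = 54.
 */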
459 
460 static __rte_always_inline int
461 map_one_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
462 		struct buf_vector *buf_vec, uint16_t *vec_idx,
463 		uint64_t desc_iova, uint64_t desc_len, uint8_t perm)
464 {
465 	uint16_t vec_id = *vec_idx;
466 
467 	while (desc_len) {
468 		uint64_t desc_addr;
469 		uint64_t desc_chunck_len = desc_len;
470 
471 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
472 			return -1;
473 
474 		desc_addr = vhost_iova_to_vva(dev, vq,
475 				desc_iova,
476 				&desc_chunck_len,
477 				perm);
478 		if (unlikely(!desc_addr))
479 			return -1;
480 
481 		rte_prefetch0((void *)(uintptr_t)desc_addr);
482 
483 		buf_vec[vec_id].buf_iova = desc_iova;
484 		buf_vec[vec_id].buf_addr = desc_addr;
485 		buf_vec[vec_id].buf_len  = desc_chunck_len;
486 
487 		desc_len -= desc_chunck_len;
488 		desc_iova += desc_chunck_len;
489 		vec_id++;
490 	}
491 	*vec_idx = vec_id;
492 
493 	return 0;
494 }
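
/*
 * Note (informational): a descriptor that is contiguous in guest physical
 * (IOVA) space may span several host-virtual chunks (memory regions or
 * IOTLB entries), so a single guest descriptor can consume more than one
 * buf_vec[] slot; the loop above splits it per vhost_iova_to_vva() chunk.
 */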
495 
496 static __rte_always_inline int
497 fill_vec_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
498 			 uint32_t avail_idx, uint16_t *vec_idx,
499 			 struct buf_vector *buf_vec, uint16_t *desc_chain_head,
500 			 uint32_t *desc_chain_len, uint8_t perm)
501 {
502 	uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
503 	uint16_t vec_id = *vec_idx;
504 	uint32_t len    = 0;
505 	uint64_t dlen;
506 	uint32_t nr_descs = vq->size;
507 	uint32_t cnt    = 0;
508 	struct vring_desc *descs = vq->desc;
509 	struct vring_desc *idesc = NULL;
510 
511 	if (unlikely(idx >= vq->size))
512 		return -1;
513 
514 	*desc_chain_head = idx;
515 
516 	if (vq->desc[idx].flags & VRING_DESC_F_INDIRECT) {
517 		dlen = vq->desc[idx].len;
518 		nr_descs = dlen / sizeof(struct vring_desc);
519 		if (unlikely(nr_descs > vq->size))
520 			return -1;
521 
522 		descs = (struct vring_desc *)(uintptr_t)
523 			vhost_iova_to_vva(dev, vq, vq->desc[idx].addr,
524 						&dlen,
525 						VHOST_ACCESS_RO);
526 		if (unlikely(!descs))
527 			return -1;
528 
529 		if (unlikely(dlen < vq->desc[idx].len)) {
530 			/*
531 			 * The indirect desc table is not contiguous
532 			 * in the process VA space, so we have to copy it.
533 			 */
534 			idesc = vhost_alloc_copy_ind_table(dev, vq,
535 					vq->desc[idx].addr, vq->desc[idx].len);
536 			if (unlikely(!idesc))
537 				return -1;
538 
539 			descs = idesc;
540 		}
541 
542 		idx = 0;
543 	}
544 
545 	while (1) {
546 		if (unlikely(idx >= nr_descs || cnt++ >= nr_descs)) {
547 			free_ind_table(idesc);
548 			return -1;
549 		}
550 
551 		dlen = descs[idx].len;
552 		len += dlen;
553 
554 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
555 						descs[idx].addr, dlen,
556 						perm))) {
557 			free_ind_table(idesc);
558 			return -1;
559 		}
560 
561 		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
562 			break;
563 
564 		idx = descs[idx].next;
565 	}
566 
567 	*desc_chain_len = len;
568 	*vec_idx = vec_id;
569 
570 	if (unlikely(!!idesc))
571 		free_ind_table(idesc);
572 
573 	return 0;
574 }
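
/*
 * Note (informational): the walk above follows VRING_DESC_F_NEXT links and
 * dereferences VRING_DESC_F_INDIRECT tables; the "cnt" guard bounds the
 * number of visited descriptors so a buggy or malicious guest cannot trap
 * vhost in an endless descriptor loop.
 */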
575 
576 /*
577  * Returns -1 on failure, 0 on success
578  */
579 static inline int
580 reserve_avail_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
581 				uint32_t size, struct buf_vector *buf_vec,
582 				uint16_t *num_buffers, uint16_t avail_head,
583 				uint16_t *nr_vec)
584 {
585 	uint16_t cur_idx;
586 	uint16_t vec_idx = 0;
587 	uint16_t max_tries, tries = 0;
588 
589 	uint16_t head_idx = 0;
590 	uint32_t len = 0;
591 
592 	*num_buffers = 0;
593 	cur_idx  = vq->last_avail_idx;
594 
595 	if (rxvq_is_mergeable(dev))
596 		max_tries = vq->size - 1;
597 	else
598 		max_tries = 1;
599 
600 	while (size > 0) {
601 		if (unlikely(cur_idx == avail_head))
602 			return -1;
603 		/*
604 		 * If we have tried all available ring items and still
605 		 * cannot get enough buffers, something abnormal has
606 		 * happened.
607 		 */
608 		if (unlikely(++tries > max_tries))
609 			return -1;
610 
611 		if (unlikely(fill_vec_buf_split(dev, vq, cur_idx,
612 						&vec_idx, buf_vec,
613 						&head_idx, &len,
614 						VHOST_ACCESS_RW) < 0))
615 			return -1;
616 		len = RTE_MIN(len, size);
617 		update_shadow_used_ring_split(vq, head_idx, len);
618 		size -= len;
619 
620 		cur_idx++;
621 		*num_buffers += 1;
622 	}
623 
624 	*nr_vec = vec_idx;
625 
626 	return 0;
627 }
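
/*
 * Worked example (hypothetical values): with mergeable RX buffers of
 * 2048 bytes each and a request of size = 3000 (packet plus virtio-net
 * header), the loop above reserves two descriptor chains: the first covers
 * 2048 bytes and the second the remaining 952, leaving *num_buffers = 2
 * and both chains recorded in the shadow used ring.
 */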
628 
629 static __rte_always_inline int
630 fill_vec_buf_packed_indirect(struct virtio_net *dev,
631 			struct vhost_virtqueue *vq,
632 			struct vring_packed_desc *desc, uint16_t *vec_idx,
633 			struct buf_vector *buf_vec, uint32_t *len, uint8_t perm)
634 {
635 	uint16_t i;
636 	uint32_t nr_descs;
637 	uint16_t vec_id = *vec_idx;
638 	uint64_t dlen;
639 	struct vring_packed_desc *descs, *idescs = NULL;
640 
641 	dlen = desc->len;
642 	descs = (struct vring_packed_desc *)(uintptr_t)
643 		vhost_iova_to_vva(dev, vq, desc->addr, &dlen, VHOST_ACCESS_RO);
644 	if (unlikely(!descs))
645 		return -1;
646 
647 	if (unlikely(dlen < desc->len)) {
648 		/*
649 		 * The indirect desc table is not contiguous
650 			 * in the process VA space, so we have to copy it.
651 		 */
652 		idescs = vhost_alloc_copy_ind_table(dev,
653 				vq, desc->addr, desc->len);
654 		if (unlikely(!idescs))
655 			return -1;
656 
657 		descs = idescs;
658 	}
659 
660 	nr_descs = desc->len / sizeof(struct vring_packed_desc);
661 	if (unlikely(nr_descs >= vq->size)) {
662 		free_ind_table(idescs);
663 		return -1;
664 	}
665 
666 	for (i = 0; i < nr_descs; i++) {
667 		if (unlikely(vec_id >= BUF_VECTOR_MAX)) {
668 			free_ind_table(idescs);
669 			return -1;
670 		}
671 
672 		dlen = descs[i].len;
673 		*len += dlen;
674 		if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
675 						descs[i].addr, dlen,
676 						perm)))
677 			return -1;
678 	}
679 	*vec_idx = vec_id;
680 
681 	if (unlikely(!!idescs))
682 		free_ind_table(idescs);
683 
684 	return 0;
685 }
686 
687 static __rte_always_inline int
688 fill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
689 				uint16_t avail_idx, uint16_t *desc_count,
690 				struct buf_vector *buf_vec, uint16_t *vec_idx,
691 				uint16_t *buf_id, uint32_t *len, uint8_t perm)
692 {
693 	bool wrap_counter = vq->avail_wrap_counter;
694 	struct vring_packed_desc *descs = vq->desc_packed;
695 	uint16_t vec_id = *vec_idx;
696 	uint64_t dlen;
697 
698 	if (avail_idx < vq->last_avail_idx)
699 		wrap_counter ^= 1;
700 
701 	/*
702 	 * Perform a load-acquire barrier in desc_is_avail to
703 	 * enforce the ordering between desc flags and desc
704 	 * content.
705 	 */
706 	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)))
707 		return -1;
708 
709 	*desc_count = 0;
710 	*len = 0;
711 
712 	while (1) {
713 		if (unlikely(vec_id >= BUF_VECTOR_MAX))
714 			return -1;
715 
716 		if (unlikely(*desc_count >= vq->size))
717 			return -1;
718 
719 		*desc_count += 1;
720 		*buf_id = descs[avail_idx].id;
721 
722 		if (descs[avail_idx].flags & VRING_DESC_F_INDIRECT) {
723 			if (unlikely(fill_vec_buf_packed_indirect(dev, vq,
724 							&descs[avail_idx],
725 							&vec_id, buf_vec,
726 							len, perm) < 0))
727 				return -1;
728 		} else {
729 			dlen = descs[avail_idx].len;
730 			*len += dlen;
731 
732 			if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
733 							descs[avail_idx].addr,
734 							dlen,
735 							perm)))
736 				return -1;
737 		}
738 
739 		if ((descs[avail_idx].flags & VRING_DESC_F_NEXT) == 0)
740 			break;
741 
742 		if (++avail_idx >= vq->size) {
743 			avail_idx -= vq->size;
744 			wrap_counter ^= 1;
745 		}
746 	}
747 
748 	*vec_idx = vec_id;
749 
750 	return 0;
751 }
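
/*
 * Note (informational): in a packed ring there is no shared available
 * index to read; an avail_idx below last_avail_idx simply means the
 * position already wrapped, so the expected wrap counter is toggled before
 * desc_is_avail() checks the AVAIL/USED flag pair of the descriptor.
 */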
752 
753 static __rte_noinline void
754 copy_vnet_hdr_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
755 		struct buf_vector *buf_vec,
756 		struct virtio_net_hdr_mrg_rxbuf *hdr)
757 {
758 	uint64_t len;
759 	uint64_t remain = dev->vhost_hlen;
760 	uint64_t src = (uint64_t)(uintptr_t)hdr, dst;
761 	uint64_t iova = buf_vec->buf_iova;
762 
763 	while (remain) {
764 		len = RTE_MIN(remain,
765 				buf_vec->buf_len);
766 		dst = buf_vec->buf_addr;
767 		rte_memcpy((void *)(uintptr_t)dst,
768 				(void *)(uintptr_t)src,
769 				len);
770 
771 		PRINT_PACKET(dev, (uintptr_t)dst,
772 				(uint32_t)len, 0);
773 		vhost_log_cache_write_iova(dev, vq,
774 				iova, len);
775 
776 		remain -= len;
777 		iova += len;
778 		src += len;
779 		buf_vec++;
780 	}
781 }
782 
783 static __rte_always_inline int
784 copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
785 			    struct rte_mbuf *m, struct buf_vector *buf_vec,
786 			    uint16_t nr_vec, uint16_t num_buffers)
787 {
788 	uint32_t vec_idx = 0;
789 	uint32_t mbuf_offset, mbuf_avail;
790 	uint32_t buf_offset, buf_avail;
791 	uint64_t buf_addr, buf_iova, buf_len;
792 	uint32_t cpy_len;
793 	uint64_t hdr_addr;
794 	struct rte_mbuf *hdr_mbuf;
795 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
796 	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
797 	int error = 0;
798 
799 	if (unlikely(m == NULL)) {
800 		error = -1;
801 		goto out;
802 	}
803 
804 	buf_addr = buf_vec[vec_idx].buf_addr;
805 	buf_iova = buf_vec[vec_idx].buf_iova;
806 	buf_len = buf_vec[vec_idx].buf_len;
807 
808 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
809 		error = -1;
810 		goto out;
811 	}
812 
813 	hdr_mbuf = m;
814 	hdr_addr = buf_addr;
815 	if (unlikely(buf_len < dev->vhost_hlen)) {
816 		memset(&tmp_hdr, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
817 		hdr = &tmp_hdr;
818 	} else
819 		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
820 
821 	VHOST_LOG_DATA(DEBUG, "(%d) RX: num merge buffers %d\n",
822 		dev->vid, num_buffers);
823 
824 	if (unlikely(buf_len < dev->vhost_hlen)) {
825 		buf_offset = dev->vhost_hlen - buf_len;
826 		vec_idx++;
827 		buf_addr = buf_vec[vec_idx].buf_addr;
828 		buf_iova = buf_vec[vec_idx].buf_iova;
829 		buf_len = buf_vec[vec_idx].buf_len;
830 		buf_avail = buf_len - buf_offset;
831 	} else {
832 		buf_offset = dev->vhost_hlen;
833 		buf_avail = buf_len - dev->vhost_hlen;
834 	}
835 
836 	mbuf_avail  = rte_pktmbuf_data_len(m);
837 	mbuf_offset = 0;
838 	while (mbuf_avail != 0 || m->next != NULL) {
839 		/* done with current buf, get the next one */
840 		if (buf_avail == 0) {
841 			vec_idx++;
842 			if (unlikely(vec_idx >= nr_vec)) {
843 				error = -1;
844 				goto out;
845 			}
846 
847 			buf_addr = buf_vec[vec_idx].buf_addr;
848 			buf_iova = buf_vec[vec_idx].buf_iova;
849 			buf_len = buf_vec[vec_idx].buf_len;
850 
851 			buf_offset = 0;
852 			buf_avail  = buf_len;
853 		}
854 
855 		/* done with current mbuf, get the next one */
856 		if (mbuf_avail == 0) {
857 			m = m->next;
858 
859 			mbuf_offset = 0;
860 			mbuf_avail  = rte_pktmbuf_data_len(m);
861 		}
862 
863 		if (hdr_addr) {
864 			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
865 			if (rxvq_is_mergeable(dev))
866 				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
867 						num_buffers);
868 
869 			if (unlikely(hdr == &tmp_hdr)) {
870 				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
871 			} else {
872 				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
873 						dev->vhost_hlen, 0);
874 				vhost_log_cache_write_iova(dev, vq,
875 						buf_vec[0].buf_iova,
876 						dev->vhost_hlen);
877 			}
878 
879 			hdr_addr = 0;
880 		}
881 
882 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
883 
884 		if (likely(cpy_len > MAX_BATCH_LEN ||
885 					vq->batch_copy_nb_elems >= vq->size)) {
886 			rte_memcpy((void *)((uintptr_t)(buf_addr + buf_offset)),
887 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
888 				cpy_len);
889 			vhost_log_cache_write_iova(dev, vq,
890 						   buf_iova + buf_offset,
891 						   cpy_len);
892 			PRINT_PACKET(dev, (uintptr_t)(buf_addr + buf_offset),
893 				cpy_len, 0);
894 		} else {
895 			batch_copy[vq->batch_copy_nb_elems].dst =
896 				(void *)((uintptr_t)(buf_addr + buf_offset));
897 			batch_copy[vq->batch_copy_nb_elems].src =
898 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
899 			batch_copy[vq->batch_copy_nb_elems].log_addr =
900 				buf_iova + buf_offset;
901 			batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
902 			vq->batch_copy_nb_elems++;
903 		}
904 
905 		mbuf_avail  -= cpy_len;
906 		mbuf_offset += cpy_len;
907 		buf_avail  -= cpy_len;
908 		buf_offset += cpy_len;
909 	}
910 
911 out:
912 
913 	return error;
914 }
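
/*
 * Note (informational): copies no larger than MAX_BATCH_LEN are staged in
 * vq->batch_copy_elems and carried out later by do_data_copy_enqueue(),
 * which also performs the dirty-page logging; larger copies, or copies
 * issued once the batch array is full, are done immediately with
 * rte_memcpy().
 */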
915 
916 static __rte_always_inline void
917 async_fill_vec(struct iovec *v, void *base, size_t len)
918 {
919 	v->iov_base = base;
920 	v->iov_len = len;
921 }
922 
923 static __rte_always_inline void
924 async_fill_iter(struct rte_vhost_iov_iter *it, size_t count,
925 	struct iovec *vec, unsigned long nr_seg)
926 {
927 	it->offset = 0;
928 	it->count = count;
929 
930 	if (count) {
931 		it->iov = vec;
932 		it->nr_segs = nr_seg;
933 	} else {
934 		it->iov = 0;
935 		it->nr_segs = 0;
936 	}
937 }
938 
939 static __rte_always_inline void
940 async_fill_desc(struct rte_vhost_async_desc *desc,
941 	struct rte_vhost_iov_iter *src, struct rte_vhost_iov_iter *dst)
942 {
943 	desc->src = src;
944 	desc->dst = dst;
945 }
946 
947 static __rte_always_inline int
948 async_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
949 			struct rte_mbuf *m, struct buf_vector *buf_vec,
950 			uint16_t nr_vec, uint16_t num_buffers,
951 			struct iovec *src_iovec, struct iovec *dst_iovec,
952 			struct rte_vhost_iov_iter *src_it,
953 			struct rte_vhost_iov_iter *dst_it)
954 {
955 	uint32_t vec_idx = 0;
956 	uint32_t mbuf_offset, mbuf_avail;
957 	uint32_t buf_offset, buf_avail;
958 	uint64_t buf_addr, buf_iova, buf_len;
959 	uint32_t cpy_len, cpy_threshold;
960 	uint64_t hdr_addr;
961 	struct rte_mbuf *hdr_mbuf;
962 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
963 	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
964 	int error = 0;
965 	uint64_t mapped_len;
966 
967 	uint32_t tlen = 0;
968 	int tvec_idx = 0;
969 	void *hpa;
970 
971 	if (unlikely(m == NULL)) {
972 		error = -1;
973 		goto out;
974 	}
975 
976 	cpy_threshold = vq->async_threshold;
977 
978 	buf_addr = buf_vec[vec_idx].buf_addr;
979 	buf_iova = buf_vec[vec_idx].buf_iova;
980 	buf_len = buf_vec[vec_idx].buf_len;
981 
982 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
983 		error = -1;
984 		goto out;
985 	}
986 
987 	hdr_mbuf = m;
988 	hdr_addr = buf_addr;
989 	if (unlikely(buf_len < dev->vhost_hlen)) {
990 		memset(&tmp_hdr, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
991 		hdr = &tmp_hdr;
992 	} else
993 		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
994 
995 	VHOST_LOG_DATA(DEBUG, "(%d) RX: num merge buffers %d\n",
996 		dev->vid, num_buffers);
997 
998 	if (unlikely(buf_len < dev->vhost_hlen)) {
999 		buf_offset = dev->vhost_hlen - buf_len;
1000 		vec_idx++;
1001 		buf_addr = buf_vec[vec_idx].buf_addr;
1002 		buf_iova = buf_vec[vec_idx].buf_iova;
1003 		buf_len = buf_vec[vec_idx].buf_len;
1004 		buf_avail = buf_len - buf_offset;
1005 	} else {
1006 		buf_offset = dev->vhost_hlen;
1007 		buf_avail = buf_len - dev->vhost_hlen;
1008 	}
1009 
1010 	mbuf_avail  = rte_pktmbuf_data_len(m);
1011 	mbuf_offset = 0;
1012 
1013 	while (mbuf_avail != 0 || m->next != NULL) {
1014 		/* done with current buf, get the next one */
1015 		if (buf_avail == 0) {
1016 			vec_idx++;
1017 			if (unlikely(vec_idx >= nr_vec)) {
1018 				error = -1;
1019 				goto out;
1020 			}
1021 
1022 			buf_addr = buf_vec[vec_idx].buf_addr;
1023 			buf_iova = buf_vec[vec_idx].buf_iova;
1024 			buf_len = buf_vec[vec_idx].buf_len;
1025 
1026 			buf_offset = 0;
1027 			buf_avail  = buf_len;
1028 		}
1029 
1030 		/* done with current mbuf, get the next one */
1031 		if (mbuf_avail == 0) {
1032 			m = m->next;
1033 
1034 			mbuf_offset = 0;
1035 			mbuf_avail  = rte_pktmbuf_data_len(m);
1036 		}
1037 
1038 		if (hdr_addr) {
1039 			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
1040 			if (rxvq_is_mergeable(dev))
1041 				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
1042 						num_buffers);
1043 
1044 			if (unlikely(hdr == &tmp_hdr)) {
1045 				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
1046 			} else {
1047 				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
1048 						dev->vhost_hlen, 0);
1049 				vhost_log_cache_write_iova(dev, vq,
1050 						buf_vec[0].buf_iova,
1051 						dev->vhost_hlen);
1052 			}
1053 
1054 			hdr_addr = 0;
1055 		}
1056 
1057 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
1058 
1059 		while (unlikely(cpy_len && cpy_len >= cpy_threshold)) {
1060 			hpa = (void *)(uintptr_t)gpa_to_first_hpa(dev,
1061 					buf_iova + buf_offset,
1062 					cpy_len, &mapped_len);
1063 
1064 			if (unlikely(!hpa || mapped_len < cpy_threshold))
1065 				break;
1066 
1067 			async_fill_vec(src_iovec + tvec_idx,
1068 				(void *)(uintptr_t)rte_pktmbuf_iova_offset(m,
1069 				mbuf_offset), (size_t)mapped_len);
1070 
1071 			async_fill_vec(dst_iovec + tvec_idx,
1072 					hpa, (size_t)mapped_len);
1073 
1074 			tlen += (uint32_t)mapped_len;
1075 			cpy_len -= (uint32_t)mapped_len;
1076 			mbuf_avail  -= (uint32_t)mapped_len;
1077 			mbuf_offset += (uint32_t)mapped_len;
1078 			buf_avail  -= (uint32_t)mapped_len;
1079 			buf_offset += (uint32_t)mapped_len;
1080 			tvec_idx++;
1081 		}
1082 
1083 		if (likely(cpy_len)) {
1084 			if (unlikely(vq->batch_copy_nb_elems >= vq->size)) {
1085 				rte_memcpy(
1086 				(void *)((uintptr_t)(buf_addr + buf_offset)),
1087 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
1088 				cpy_len);
1089 
1090 				PRINT_PACKET(dev,
1091 					(uintptr_t)(buf_addr + buf_offset),
1092 					cpy_len, 0);
1093 			} else {
1094 				batch_copy[vq->batch_copy_nb_elems].dst =
1095 				(void *)((uintptr_t)(buf_addr + buf_offset));
1096 				batch_copy[vq->batch_copy_nb_elems].src =
1097 				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
1098 				batch_copy[vq->batch_copy_nb_elems].log_addr =
1099 					buf_iova + buf_offset;
1100 				batch_copy[vq->batch_copy_nb_elems].len =
1101 					cpy_len;
1102 				vq->batch_copy_nb_elems++;
1103 			}
1104 
1105 			mbuf_avail  -= cpy_len;
1106 			mbuf_offset += cpy_len;
1107 			buf_avail  -= cpy_len;
1108 			buf_offset += cpy_len;
1109 		}
1110 
1111 	}
1112 
1113 out:
1114 	if (tlen) {
1115 		async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
1116 		async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
1117 	} else {
1118 		src_it->count = 0;
1119 	}
1120 
1121 	return error;
1122 }
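
/*
 * Note (informational): segments of at least vq->async_threshold bytes
 * whose guest pages are host-physically contiguous are described in
 * src_iovec/dst_iovec for the async (DMA) copy engine; everything else
 * falls back to the CPU copy path, so a single packet may be completed
 * partly by DMA and partly by the CPU.
 */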
1123 
1124 static __rte_always_inline int
1125 vhost_enqueue_single_packed(struct virtio_net *dev,
1126 			    struct vhost_virtqueue *vq,
1127 			    struct rte_mbuf *pkt,
1128 			    struct buf_vector *buf_vec,
1129 			    uint16_t *nr_descs)
1130 {
1131 	uint16_t nr_vec = 0;
1132 	uint16_t avail_idx = vq->last_avail_idx;
1133 	uint16_t max_tries, tries = 0;
1134 	uint16_t buf_id = 0;
1135 	uint32_t len = 0;
1136 	uint16_t desc_count;
1137 	uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
1138 	uint16_t num_buffers = 0;
1139 	uint32_t buffer_len[vq->size];
1140 	uint16_t buffer_buf_id[vq->size];
1141 	uint16_t buffer_desc_count[vq->size];
1142 
1143 	if (rxvq_is_mergeable(dev))
1144 		max_tries = vq->size - 1;
1145 	else
1146 		max_tries = 1;
1147 
1148 	while (size > 0) {
1149 		/*
1150 		 * If we have tried all available ring items and still
1151 		 * cannot get enough buffers, something abnormal has
1152 		 * happened.
1153 		 */
1154 		if (unlikely(++tries > max_tries))
1155 			return -1;
1156 
1157 		if (unlikely(fill_vec_buf_packed(dev, vq,
1158 						avail_idx, &desc_count,
1159 						buf_vec, &nr_vec,
1160 						&buf_id, &len,
1161 						VHOST_ACCESS_RW) < 0))
1162 			return -1;
1163 
1164 		len = RTE_MIN(len, size);
1165 		size -= len;
1166 
1167 		buffer_len[num_buffers] = len;
1168 		buffer_buf_id[num_buffers] = buf_id;
1169 		buffer_desc_count[num_buffers] = desc_count;
1170 		num_buffers += 1;
1171 
1172 		*nr_descs += desc_count;
1173 		avail_idx += desc_count;
1174 		if (avail_idx >= vq->size)
1175 			avail_idx -= vq->size;
1176 	}
1177 
1178 	if (copy_mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, num_buffers) < 0)
1179 		return -1;
1180 
1181 	vhost_shadow_enqueue_single_packed(dev, vq, buffer_len, buffer_buf_id,
1182 					   buffer_desc_count, num_buffers);
1183 
1184 	return 0;
1185 }
1186 
1187 static __rte_noinline uint32_t
1188 virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
1189 	struct rte_mbuf **pkts, uint32_t count)
1190 {
1191 	uint32_t pkt_idx = 0;
1192 	uint16_t num_buffers;
1193 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1194 	uint16_t avail_head;
1195 
1196 	/*
1197 	 * The ordering between avail index and
1198 	 * desc reads needs to be enforced.
1199 	 */
1200 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
1201 
1202 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1203 
1204 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1205 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1206 		uint16_t nr_vec = 0;
1207 
1208 		if (unlikely(reserve_avail_buf_split(dev, vq,
1209 						pkt_len, buf_vec, &num_buffers,
1210 						avail_head, &nr_vec) < 0)) {
1211 			VHOST_LOG_DATA(DEBUG,
1212 				"(%d) failed to get enough desc from vring\n",
1213 				dev->vid);
1214 			vq->shadow_used_idx -= num_buffers;
1215 			break;
1216 		}
1217 
1218 		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1219 			dev->vid, vq->last_avail_idx,
1220 			vq->last_avail_idx + num_buffers);
1221 
1222 		if (copy_mbuf_to_desc(dev, vq, pkts[pkt_idx],
1223 						buf_vec, nr_vec,
1224 						num_buffers) < 0) {
1225 			vq->shadow_used_idx -= num_buffers;
1226 			break;
1227 		}
1228 
1229 		vq->last_avail_idx += num_buffers;
1230 	}
1231 
1232 	do_data_copy_enqueue(dev, vq);
1233 
1234 	if (likely(vq->shadow_used_idx)) {
1235 		flush_shadow_used_ring_split(dev, vq);
1236 		vhost_vring_call_split(dev, vq);
1237 	}
1238 
1239 	return pkt_idx;
1240 }
1241 
1242 static __rte_always_inline int
1243 virtio_dev_rx_batch_packed(struct virtio_net *dev,
1244 			   struct vhost_virtqueue *vq,
1245 			   struct rte_mbuf **pkts)
1246 {
1247 	bool wrap_counter = vq->avail_wrap_counter;
1248 	struct vring_packed_desc *descs = vq->desc_packed;
1249 	uint16_t avail_idx = vq->last_avail_idx;
1250 	uint64_t desc_addrs[PACKED_BATCH_SIZE];
1251 	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
1252 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
1253 	uint64_t lens[PACKED_BATCH_SIZE];
1254 	uint16_t ids[PACKED_BATCH_SIZE];
1255 	uint16_t i;
1256 
1257 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
1258 		return -1;
1259 
1260 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
1261 		return -1;
1262 
1263 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1264 		if (unlikely(pkts[i]->next != NULL))
1265 			return -1;
1266 		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
1267 					    wrap_counter)))
1268 			return -1;
1269 	}
1270 
1271 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1272 		lens[i] = descs[avail_idx + i].len;
1273 
1274 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1275 		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
1276 			return -1;
1277 	}
1278 
1279 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1280 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
1281 						  descs[avail_idx + i].addr,
1282 						  &lens[i],
1283 						  VHOST_ACCESS_RW);
1284 
1285 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1286 		if (unlikely(!desc_addrs[i]))
1287 			return -1;
1288 		if (unlikely(lens[i] != descs[avail_idx + i].len))
1289 			return -1;
1290 	}
1291 
1292 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1293 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
1294 		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
1295 					(uintptr_t)desc_addrs[i];
1296 		lens[i] = pkts[i]->pkt_len +
1297 			sizeof(struct virtio_net_hdr_mrg_rxbuf);
1298 	}
1299 
1300 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1301 		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
1302 
1303 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
1304 
1305 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
1306 		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
1307 			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
1308 			   pkts[i]->pkt_len);
1309 	}
1310 
1311 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1312 		vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr,
1313 					   lens[i]);
1314 
1315 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
1316 		ids[i] = descs[avail_idx + i].id;
1317 
1318 	vhost_flush_enqueue_batch_packed(dev, vq, lens, ids);
1319 
1320 	return 0;
1321 }
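
/*
 * Note (informational): this fast path only succeeds when last_avail_idx
 * is aligned to PACKED_BATCH_SIZE, the whole batch fits before the ring
 * wrap, every mbuf is a single segment and each packet (header included)
 * fits into the single descriptor at its slot; otherwise it returns -1 and
 * the caller falls back to virtio_dev_rx_single_packed().
 */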
1322 
1323 static __rte_always_inline int16_t
1324 virtio_dev_rx_single_packed(struct virtio_net *dev,
1325 			    struct vhost_virtqueue *vq,
1326 			    struct rte_mbuf *pkt)
1327 {
1328 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1329 	uint16_t nr_descs = 0;
1330 
1331 	if (unlikely(vhost_enqueue_single_packed(dev, vq, pkt, buf_vec,
1332 						 &nr_descs) < 0)) {
1333 		VHOST_LOG_DATA(DEBUG,
1334 				"(%d) failed to get enough desc from vring\n",
1335 				dev->vid);
1336 		return -1;
1337 	}
1338 
1339 	VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1340 			dev->vid, vq->last_avail_idx,
1341 			vq->last_avail_idx + nr_descs);
1342 
1343 	vq_inc_last_avail_packed(vq, nr_descs);
1344 
1345 	return 0;
1346 }
1347 
1348 static __rte_noinline uint32_t
1349 virtio_dev_rx_packed(struct virtio_net *dev,
1350 		     struct vhost_virtqueue *__rte_restrict vq,
1351 		     struct rte_mbuf **__rte_restrict pkts,
1352 		     uint32_t count)
1353 {
1354 	uint32_t pkt_idx = 0;
1355 	uint32_t remained = count;
1356 
1357 	do {
1358 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
1359 
1360 		if (remained >= PACKED_BATCH_SIZE) {
1361 			if (!virtio_dev_rx_batch_packed(dev, vq,
1362 							&pkts[pkt_idx])) {
1363 				pkt_idx += PACKED_BATCH_SIZE;
1364 				remained -= PACKED_BATCH_SIZE;
1365 				continue;
1366 			}
1367 		}
1368 
1369 		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]))
1370 			break;
1371 		pkt_idx++;
1372 		remained--;
1373 
1374 	} while (pkt_idx < count);
1375 
1376 	if (vq->shadow_used_idx) {
1377 		do_data_copy_enqueue(dev, vq);
1378 		vhost_flush_enqueue_shadow_packed(dev, vq);
1379 	}
1380 
1381 	if (pkt_idx)
1382 		vhost_vring_call_packed(dev, vq);
1383 
1384 	return pkt_idx;
1385 }
1386 
1387 static __rte_always_inline uint32_t
1388 virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
1389 	struct rte_mbuf **pkts, uint32_t count)
1390 {
1391 	struct vhost_virtqueue *vq;
1392 	uint32_t nb_tx = 0;
1393 
1394 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
1395 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1396 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
1397 			dev->vid, __func__, queue_id);
1398 		return 0;
1399 	}
1400 
1401 	vq = dev->virtqueue[queue_id];
1402 
1403 	rte_spinlock_lock(&vq->access_lock);
1404 
1405 	if (unlikely(!vq->enabled))
1406 		goto out_access_unlock;
1407 
1408 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1409 		vhost_user_iotlb_rd_lock(vq);
1410 
1411 	if (unlikely(!vq->access_ok))
1412 		if (unlikely(vring_translate(dev, vq) < 0))
1413 			goto out;
1414 
1415 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1416 	if (count == 0)
1417 		goto out;
1418 
1419 	if (vq_is_packed(dev))
1420 		nb_tx = virtio_dev_rx_packed(dev, vq, pkts, count);
1421 	else
1422 		nb_tx = virtio_dev_rx_split(dev, vq, pkts, count);
1423 
1424 out:
1425 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1426 		vhost_user_iotlb_rd_unlock(vq);
1427 
1428 out_access_unlock:
1429 	rte_spinlock_unlock(&vq->access_lock);
1430 
1431 	return nb_tx;
1432 }
1433 
1434 uint16_t
1435 rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
1436 	struct rte_mbuf **__rte_restrict pkts, uint16_t count)
1437 {
1438 	struct virtio_net *dev = get_device(vid);
1439 
1440 	if (!dev)
1441 		return 0;
1442 
1443 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
1444 		VHOST_LOG_DATA(ERR,
1445 			"(%d) %s: built-in vhost net backend is disabled.\n",
1446 			dev->vid, __func__);
1447 		return 0;
1448 	}
1449 
1450 	return virtio_dev_rx(dev, queue_id, pkts, count);
1451 }
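
/*
 * Illustrative caller sketch (hypothetical application code; queue 0 is
 * assumed to be a guest RX queue):
 *
 *	nb_rx = rte_eth_rx_burst(port_id, 0, pkts, MAX_PKT_BURST);
 *	enq = rte_vhost_enqueue_burst(vid, 0, pkts, nb_rx);
 *
 * Packet data is copied into the guest buffers, so the caller keeps
 * ownership of all mbufs and frees them afterwards, regardless of how
 * many were actually enqueued (enq may be smaller than nb_rx).
 */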
1452 
1453 static __rte_always_inline uint16_t
1454 virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
1455 	uint16_t vq_size, uint16_t n_inflight)
1456 {
1457 	return pkts_idx > n_inflight ? (pkts_idx - n_inflight) :
1458 		(vq_size - n_inflight + pkts_idx) & (vq_size - 1);
1459 }
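
/*
 * Worked example (hypothetical values): with vq_size = 256, pkts_idx = 5
 * and n_inflight = 10, the oldest in-flight packet sits at
 * (256 - 10 + 5) & 255 = 251; with pkts_idx = 20 it is simply 20 - 10 = 10.
 */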
1460 
1461 static __rte_noinline uint32_t
1462 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
1463 	struct vhost_virtqueue *vq, uint16_t queue_id,
1464 	struct rte_mbuf **pkts, uint32_t count,
1465 	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
1466 {
1467 	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
1468 	uint16_t num_buffers;
1469 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
1470 	uint16_t avail_head;
1471 
1472 	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
1473 	struct iovec *vec_pool = vq->vec_pool;
1474 	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
1475 	struct iovec *src_iovec = vec_pool;
1476 	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
1477 	struct rte_vhost_iov_iter *src_it = it_pool;
1478 	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
1479 	uint16_t slot_idx = 0;
1480 	uint16_t segs_await = 0;
1481 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
1482 	uint32_t n_pkts = 0, pkt_err = 0;
1483 	uint32_t num_async_pkts = 0, num_done_pkts = 0;
1484 	struct {
1485 		uint16_t pkt_idx;
1486 		uint16_t last_avail_idx;
1487 	} async_pkts_log[MAX_PKT_BURST];
1488 
1489 	/*
1490 	 * The ordering between avail index and desc reads needs to be enforced.
1491 	 */
1492 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
1493 
1494 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
1495 
1496 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1497 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
1498 		uint16_t nr_vec = 0;
1499 
1500 		if (unlikely(reserve_avail_buf_split(dev, vq,
1501 						pkt_len, buf_vec, &num_buffers,
1502 						avail_head, &nr_vec) < 0)) {
1503 			VHOST_LOG_DATA(DEBUG,
1504 				"(%d) failed to get enough desc from vring\n",
1505 				dev->vid);
1506 			vq->shadow_used_idx -= num_buffers;
1507 			break;
1508 		}
1509 
1510 		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
1511 			dev->vid, vq->last_avail_idx,
1512 			vq->last_avail_idx + num_buffers);
1513 
1514 		if (async_mbuf_to_desc(dev, vq, pkts[pkt_idx],
1515 				buf_vec, nr_vec, num_buffers,
1516 				src_iovec, dst_iovec, src_it, dst_it) < 0) {
1517 			vq->shadow_used_idx -= num_buffers;
1518 			break;
1519 		}
1520 
1521 		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
1522 			(vq->size - 1);
1523 		if (src_it->count) {
1524 			uint16_t from, to;
1525 
1526 			async_fill_desc(&tdes[pkt_burst_idx++], src_it, dst_it);
1527 			pkts_info[slot_idx].descs = num_buffers;
1528 			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
1529 			async_pkts_log[num_async_pkts].pkt_idx = pkt_idx;
1530 			async_pkts_log[num_async_pkts++].last_avail_idx =
1531 				vq->last_avail_idx;
1532 			src_iovec += src_it->nr_segs;
1533 			dst_iovec += dst_it->nr_segs;
1534 			src_it += 2;
1535 			dst_it += 2;
1536 			segs_await += src_it->nr_segs;
1537 
1538 			/*
1539 			 * Take the DMA-occupied descriptors off the shadow
1540 			 * used ring and stash them until the copies complete.
1541 			 */
1542 			from = vq->shadow_used_idx - num_buffers;
1543 			to = vq->async_desc_idx & (vq->size - 1);
1544 			if (num_buffers + to <= vq->size) {
1545 				rte_memcpy(&vq->async_descs_split[to],
1546 						&vq->shadow_used_split[from],
1547 						num_buffers *
1548 						sizeof(struct vring_used_elem));
1549 			} else {
1550 				int size = vq->size - to;
1551 
1552 				rte_memcpy(&vq->async_descs_split[to],
1553 						&vq->shadow_used_split[from],
1554 						size *
1555 						sizeof(struct vring_used_elem));
1556 				rte_memcpy(vq->async_descs_split,
1557 						&vq->shadow_used_split[from +
1558 						size], (num_buffers - size) *
1559 					   sizeof(struct vring_used_elem));
1560 			}
1561 			vq->async_desc_idx += num_buffers;
1562 			vq->shadow_used_idx -= num_buffers;
1563 		} else
1564 			comp_pkts[num_done_pkts++] = pkts[pkt_idx];
1565 
1566 		vq->last_avail_idx += num_buffers;
1567 
1568 		/*
1569 		 * Trigger the async device transfer when:
1570 		 * - the number of buffered packets reaches the transfer threshold
1571 		 * - the number of free async iovecs drops below BUF_VECTOR_MAX
1572 		 */
1573 		if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
1574 			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
1575 			BUF_VECTOR_MAX))) {
1576 			n_pkts = vq->async_ops.transfer_data(dev->vid,
1577 					queue_id, tdes, 0, pkt_burst_idx);
1578 			src_iovec = vec_pool;
1579 			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
1580 			src_it = it_pool;
1581 			dst_it = it_pool + 1;
1582 			segs_await = 0;
1583 			vq->async_pkts_inflight_n += n_pkts;
1584 
1585 			if (unlikely(n_pkts < pkt_burst_idx)) {
1586 				/*
1587 				 * Record the number of failed packets here;
1588 				 * the actual error handling is done when the
1589 				 * application polls for completions.
1590 				 */
1591 				pkt_err = pkt_burst_idx - n_pkts;
1592 				pkt_burst_idx = 0;
1593 				break;
1594 			}
1595 
1596 			pkt_burst_idx = 0;
1597 		}
1598 	}
1599 
1600 	if (pkt_burst_idx) {
1601 		n_pkts = vq->async_ops.transfer_data(dev->vid,
1602 				queue_id, tdes, 0, pkt_burst_idx);
1603 		vq->async_pkts_inflight_n += n_pkts;
1604 
1605 		if (unlikely(n_pkts < pkt_burst_idx))
1606 			pkt_err = pkt_burst_idx - n_pkts;
1607 	}
1608 
1609 	do_data_copy_enqueue(dev, vq);
1610 
1611 	if (unlikely(pkt_err)) {
1612 		uint16_t num_descs = 0;
1613 
1614 		num_async_pkts -= pkt_err;
1615 		/* Count the descriptors used by the packets whose DMA transfer failed. */
1616 		while (pkt_err-- > 0) {
1617 			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
1618 			slot_idx--;
1619 		}
1620 		vq->async_desc_idx -= num_descs;
1621 		/* recover shadow used ring and available ring */
1622 		vq->shadow_used_idx -= (vq->last_avail_idx -
1623 				async_pkts_log[num_async_pkts].last_avail_idx -
1624 				num_descs);
1625 		vq->last_avail_idx =
1626 			async_pkts_log[num_async_pkts].last_avail_idx;
1627 		pkt_idx = async_pkts_log[num_async_pkts].pkt_idx;
1628 		num_done_pkts = pkt_idx - num_async_pkts;
1629 	}
1630 
1631 	vq->async_pkts_idx += num_async_pkts;
1632 	*comp_count = num_done_pkts;
1633 
1634 	if (likely(vq->shadow_used_idx)) {
1635 		flush_shadow_used_ring_split(dev, vq);
1636 		vhost_vring_call_split(dev, vq);
1637 	}
1638 
1639 	return pkt_idx;
1640 }
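
/*
 * Note (informational): when the async engine accepts fewer transfers than
 * were submitted, the tail packets are rolled back above: their
 * descriptors are removed from the async descriptor ring, last_avail_idx
 * and the shadow used ring are restored from async_pkts_log[], and pkt_idx
 * is rewound so those packets are reported as not enqueued.
 */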
1641 
1642 uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
1643 		struct rte_mbuf **pkts, uint16_t count)
1644 {
1645 	struct virtio_net *dev = get_device(vid);
1646 	struct vhost_virtqueue *vq;
1647 	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
1648 	uint16_t start_idx, pkts_idx, vq_size;
1649 	struct async_inflight_info *pkts_info;
1650 	uint16_t from, i;
1651 
1652 	if (!dev)
1653 		return 0;
1654 
1655 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
1656 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1657 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
1658 			dev->vid, __func__, queue_id);
1659 		return 0;
1660 	}
1661 
1662 	vq = dev->virtqueue[queue_id];
1663 
1664 	if (unlikely(!vq->async_registered)) {
1665 		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
1666 			dev->vid, __func__, queue_id);
1667 		return 0;
1668 	}
1669 
1670 	rte_spinlock_lock(&vq->access_lock);
1671 
1672 	pkts_idx = vq->async_pkts_idx & (vq->size - 1);
1673 	pkts_info = vq->async_pkts_info;
1674 	vq_size = vq->size;
1675 	start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
1676 		vq_size, vq->async_pkts_inflight_n);
1677 
1678 	if (count > vq->async_last_pkts_n)
1679 		n_pkts_cpl = vq->async_ops.check_completed_copies(vid,
1680 			queue_id, 0, count - vq->async_last_pkts_n);
1681 	n_pkts_cpl += vq->async_last_pkts_n;
1682 
1683 	n_pkts_put = RTE_MIN(count, n_pkts_cpl);
1684 	if (unlikely(n_pkts_put == 0)) {
1685 		vq->async_last_pkts_n = n_pkts_cpl;
1686 		goto done;
1687 	}
1688 
1689 	for (i = 0; i < n_pkts_put; i++) {
1690 		from = (start_idx + i) & (vq_size - 1);
1691 		n_descs += pkts_info[from].descs;
1692 		pkts[i] = pkts_info[from].mbuf;
1693 	}
1694 	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
1695 	vq->async_pkts_inflight_n -= n_pkts_put;
1696 
1697 	if (likely(vq->enabled && vq->access_ok)) {
1698 		uint16_t nr_left = n_descs;
1699 		uint16_t nr_copy;
1700 		uint16_t to;
1701 
1702 		/* write back completed descriptors to used ring */
1703 		do {
1704 			from = vq->last_async_desc_idx & (vq->size - 1);
1705 			nr_copy = nr_left + from <= vq->size ? nr_left :
1706 				vq->size - from;
1707 			to = vq->last_used_idx & (vq->size - 1);
1708 
1709 			if (to + nr_copy <= vq->size) {
1710 				rte_memcpy(&vq->used->ring[to],
1711 						&vq->async_descs_split[from],
1712 						nr_copy *
1713 						sizeof(struct vring_used_elem));
1714 			} else {
1715 				uint16_t size = vq->size - to;
1716 
1717 				rte_memcpy(&vq->used->ring[to],
1718 						&vq->async_descs_split[from],
1719 						size *
1720 						sizeof(struct vring_used_elem));
1721 				rte_memcpy(vq->used->ring,
1722 						&vq->async_descs_split[from +
1723 						size], (nr_copy - size) *
1724 						sizeof(struct vring_used_elem));
1725 			}
1726 
1727 			vq->last_async_desc_idx += nr_copy;
1728 			vq->last_used_idx += nr_copy;
1729 			nr_left -= nr_copy;
1730 		} while (nr_left > 0);
1731 
1732 		__atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
1733 		vhost_vring_call_split(dev, vq);
1734 	} else
1735 		vq->last_async_desc_idx += n_descs;
1736 
1737 done:
1738 	rte_spinlock_unlock(&vq->access_lock);
1739 
1740 	return n_pkts_put;
1741 }
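
/*
 * Note (informational): completed mbufs are handed back to the caller here
 * and their descriptors are written to the used ring in at most two chunks
 * (to handle the ring wrap), mirroring flush_shadow_used_ring_split() on
 * the synchronous path. The application is expected to free the returned
 * mbufs only after they show up in this completion poll.
 */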
1742 
1743 static __rte_always_inline uint32_t
1744 virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
1745 	struct rte_mbuf **pkts, uint32_t count,
1746 	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
1747 {
1748 	struct vhost_virtqueue *vq;
1749 	uint32_t nb_tx = 0;
1750 
1751 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
1752 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
1753 		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
1754 			dev->vid, __func__, queue_id);
1755 		return 0;
1756 	}
1757 
1758 	vq = dev->virtqueue[queue_id];
1759 
1760 	rte_spinlock_lock(&vq->access_lock);
1761 
1762 	if (unlikely(!vq->enabled || !vq->async_registered))
1763 		goto out_access_unlock;
1764 
1765 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1766 		vhost_user_iotlb_rd_lock(vq);
1767 
1768 	if (unlikely(!vq->access_ok))
1769 		if (unlikely(vring_translate(dev, vq) < 0))
1770 			goto out;
1771 
1772 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1773 	if (count == 0)
1774 		goto out;
1775 
1776 	/* TODO: packed queue not implemented */
1777 	if (vq_is_packed(dev))
1778 		nb_tx = 0;
1779 	else
1780 		nb_tx = virtio_dev_rx_async_submit_split(dev,
1781 				vq, queue_id, pkts, count, comp_pkts,
1782 				comp_count);
1783 
1784 out:
1785 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
1786 		vhost_user_iotlb_rd_unlock(vq);
1787 
1788 out_access_unlock:
1789 	rte_spinlock_unlock(&vq->access_lock);
1790 
1791 	return nb_tx;
1792 }
1793 
1794 uint16_t
1795 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
1796 		struct rte_mbuf **pkts, uint16_t count,
1797 		struct rte_mbuf **comp_pkts, uint32_t *comp_count)
1798 {
1799 	struct virtio_net *dev = get_device(vid);
1800 
1801 	*comp_count = 0;
1802 	if (!dev)
1803 		return 0;
1804 
1805 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
1806 		VHOST_LOG_DATA(ERR,
1807 			"(%d) %s: built-in vhost net backend is disabled.\n",
1808 			dev->vid, __func__);
1809 		return 0;
1810 	}
1811 
1812 	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count, comp_pkts,
1813 			comp_count);
1814 }
1815 
1816 static inline bool
1817 virtio_net_with_host_offload(struct virtio_net *dev)
1818 {
1819 	if (dev->features &
1820 			((1ULL << VIRTIO_NET_F_CSUM) |
1821 			 (1ULL << VIRTIO_NET_F_HOST_ECN) |
1822 			 (1ULL << VIRTIO_NET_F_HOST_TSO4) |
1823 			 (1ULL << VIRTIO_NET_F_HOST_TSO6) |
1824 			 (1ULL << VIRTIO_NET_F_HOST_UFO)))
1825 		return true;
1826 
1827 	return false;
1828 }
1829 
1830 static void
1831 parse_ethernet(struct rte_mbuf *m, uint16_t *l4_proto, void **l4_hdr)
1832 {
1833 	struct rte_ipv4_hdr *ipv4_hdr;
1834 	struct rte_ipv6_hdr *ipv6_hdr;
1835 	void *l3_hdr = NULL;
1836 	struct rte_ether_hdr *eth_hdr;
1837 	uint16_t ethertype;
1838 
1839 	eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
1840 
1841 	m->l2_len = sizeof(struct rte_ether_hdr);
1842 	ethertype = rte_be_to_cpu_16(eth_hdr->ether_type);
1843 
1844 	if (ethertype == RTE_ETHER_TYPE_VLAN) {
1845 		struct rte_vlan_hdr *vlan_hdr =
1846 			(struct rte_vlan_hdr *)(eth_hdr + 1);
1847 
1848 		m->l2_len += sizeof(struct rte_vlan_hdr);
1849 		ethertype = rte_be_to_cpu_16(vlan_hdr->eth_proto);
1850 	}
1851 
1852 	l3_hdr = (char *)eth_hdr + m->l2_len;
1853 
1854 	switch (ethertype) {
1855 	case RTE_ETHER_TYPE_IPV4:
1856 		ipv4_hdr = l3_hdr;
1857 		*l4_proto = ipv4_hdr->next_proto_id;
1858 		m->l3_len = rte_ipv4_hdr_len(ipv4_hdr);
1859 		*l4_hdr = (char *)l3_hdr + m->l3_len;
1860 		m->ol_flags |= PKT_TX_IPV4;
1861 		break;
1862 	case RTE_ETHER_TYPE_IPV6:
1863 		ipv6_hdr = l3_hdr;
1864 		*l4_proto = ipv6_hdr->proto;
1865 		m->l3_len = sizeof(struct rte_ipv6_hdr);
1866 		*l4_hdr = (char *)l3_hdr + m->l3_len;
1867 		m->ol_flags |= PKT_TX_IPV6;
1868 		break;
1869 	default:
1870 		m->l3_len = 0;
1871 		*l4_proto = 0;
1872 		*l4_hdr = NULL;
1873 		break;
1874 	}
1875 }
1876 
1877 static __rte_always_inline void
1878 vhost_dequeue_offload(struct virtio_net_hdr *hdr, struct rte_mbuf *m)
1879 {
1880 	uint16_t l4_proto = 0;
1881 	void *l4_hdr = NULL;
1882 	struct rte_tcp_hdr *tcp_hdr = NULL;
1883 
1884 	if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
1885 		return;
1886 
1887 	parse_ethernet(m, &l4_proto, &l4_hdr);
1888 	if (hdr->flags == VIRTIO_NET_HDR_F_NEEDS_CSUM) {
1889 		if (hdr->csum_start == (m->l2_len + m->l3_len)) {
1890 			switch (hdr->csum_offset) {
1891 			case (offsetof(struct rte_tcp_hdr, cksum)):
1892 				if (l4_proto == IPPROTO_TCP)
1893 					m->ol_flags |= PKT_TX_TCP_CKSUM;
1894 				break;
1895 			case (offsetof(struct rte_udp_hdr, dgram_cksum)):
1896 				if (l4_proto == IPPROTO_UDP)
1897 					m->ol_flags |= PKT_TX_UDP_CKSUM;
1898 				break;
1899 			case (offsetof(struct rte_sctp_hdr, cksum)):
1900 				if (l4_proto == IPPROTO_SCTP)
1901 					m->ol_flags |= PKT_TX_SCTP_CKSUM;
1902 				break;
1903 			default:
1904 				break;
1905 			}
1906 		}
1907 	}
1908 
1909 	if (l4_hdr && hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
1910 		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
1911 		case VIRTIO_NET_HDR_GSO_TCPV4:
1912 		case VIRTIO_NET_HDR_GSO_TCPV6:
1913 			tcp_hdr = l4_hdr;
1914 			m->ol_flags |= PKT_TX_TCP_SEG;
1915 			m->tso_segsz = hdr->gso_size;
1916 			m->l4_len = (tcp_hdr->data_off & 0xf0) >> 2;
1917 			break;
1918 		case VIRTIO_NET_HDR_GSO_UDP:
1919 			m->ol_flags |= PKT_TX_UDP_SEG;
1920 			m->tso_segsz = hdr->gso_size;
1921 			m->l4_len = sizeof(struct rte_udp_hdr);
1922 			break;
1923 		default:
1924 			VHOST_LOG_DATA(WARNING,
1925 				"unsupported gso type %u.\n", hdr->gso_type);
1926 			break;
1927 		}
1928 	}
1929 }
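
/*
 * Illustration of the translation above: a guest sending a TSO IPv4/TCP
 * packet typically fills the virtio-net header as
 *
 *	hdr->flags       = VIRTIO_NET_HDR_F_NEEDS_CSUM
 *	hdr->csum_start  = m->l2_len + m->l3_len
 *	hdr->csum_offset = offsetof(struct rte_tcp_hdr, cksum)
 *	hdr->gso_type    = VIRTIO_NET_HDR_GSO_TCPV4
 *	hdr->gso_size    = MSS
 *
 * which vhost_dequeue_offload() turns into PKT_TX_TCP_CKSUM and
 * PKT_TX_TCP_SEG on the mbuf, with m->tso_segsz set to the MSS and
 * m->l4_len taken from the TCP data offset field.
 */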
1930 
1931 static __rte_noinline void
1932 copy_vnet_hdr_from_desc(struct virtio_net_hdr *hdr,
1933 		struct buf_vector *buf_vec)
1934 {
1935 	uint64_t len;
1936 	uint64_t remain = sizeof(struct virtio_net_hdr);
1937 	uint64_t src;
1938 	uint64_t dst = (uint64_t)(uintptr_t)hdr;
1939 
1940 	while (remain) {
1941 		len = RTE_MIN(remain, buf_vec->buf_len);
1942 		src = buf_vec->buf_addr;
1943 		rte_memcpy((void *)(uintptr_t)dst,
1944 				(void *)(uintptr_t)src, len);
1945 
1946 		remain -= len;
1947 		dst += len;
1948 		buf_vec++;
1949 	}
1950 }
1951 
1952 static __rte_always_inline int
1953 copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
1954 		  struct buf_vector *buf_vec, uint16_t nr_vec,
1955 		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool)
1956 {
1957 	uint32_t buf_avail, buf_offset;
1958 	uint64_t buf_addr, buf_len;
1959 	uint32_t mbuf_avail, mbuf_offset;
1960 	uint32_t cpy_len;
1961 	struct rte_mbuf *cur = m, *prev = m;
1962 	struct virtio_net_hdr tmp_hdr;
1963 	struct virtio_net_hdr *hdr = NULL;
1964 	/* A counter to avoid a dead loop in the desc chain */
1965 	uint16_t vec_idx = 0;
1966 	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
1967 	int error = 0;
1968 
1969 	buf_addr = buf_vec[vec_idx].buf_addr;
1970 	buf_len = buf_vec[vec_idx].buf_len;
1971 
1972 	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
1973 		error = -1;
1974 		goto out;
1975 	}
1976 
1977 	if (virtio_net_with_host_offload(dev)) {
1978 		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
1979 			/*
1980 			 * No luck, the virtio-net header doesn't fit
1981 			 * in a contiguous virtual area.
1982 			 */
1983 			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
1984 			hdr = &tmp_hdr;
1985 		} else {
1986 			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
1987 		}
1988 	}
1989 
1990 	/*
1991 	 * A virtio driver normally uses at least 2 desc buffers
1992 	 * for Tx: the first for storing the header, and the others
1993 	 * for storing the data.
1994 	 */
1995 	if (unlikely(buf_len < dev->vhost_hlen)) {
1996 		buf_offset = dev->vhost_hlen - buf_len;
1997 		vec_idx++;
1998 		buf_addr = buf_vec[vec_idx].buf_addr;
1999 		buf_len = buf_vec[vec_idx].buf_len;
2000 		buf_avail  = buf_len - buf_offset;
2001 	} else if (buf_len == dev->vhost_hlen) {
2002 		if (unlikely(++vec_idx >= nr_vec))
2003 			goto out;
2004 		buf_addr = buf_vec[vec_idx].buf_addr;
2005 		buf_len = buf_vec[vec_idx].buf_len;
2006 
2007 		buf_offset = 0;
2008 		buf_avail = buf_len;
2009 	} else {
2010 		buf_offset = dev->vhost_hlen;
2011 		buf_avail = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
2012 	}
2013 
2014 	PRINT_PACKET(dev,
2015 			(uintptr_t)(buf_addr + buf_offset),
2016 			(uint32_t)buf_avail, 0);
2017 
2018 	mbuf_offset = 0;
2019 	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
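
	/*
	 * Copy strategy: chunks larger than MAX_BATCH_LEN, copies made while
	 * the batch array is full, and data landing in the first mbuf segment
	 * (which vhost_dequeue_offload() may parse before the deferred copies
	 * are flushed) are copied immediately; everything else is queued in
	 * vq->batch_copy_elems and flushed later by do_data_copy_dequeue().
	 */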
2020 	while (1) {
2021 		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
2022 
2023 		if (likely(cpy_len > MAX_BATCH_LEN ||
2024 					vq->batch_copy_nb_elems >= vq->size ||
2025 					(hdr && cur == m))) {
2026 			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *,
2027 						mbuf_offset),
2028 					(void *)((uintptr_t)(buf_addr +
2029 							buf_offset)), cpy_len);
2030 		} else {
2031 			batch_copy[vq->batch_copy_nb_elems].dst =
2032 				rte_pktmbuf_mtod_offset(cur, void *,
2033 						mbuf_offset);
2034 			batch_copy[vq->batch_copy_nb_elems].src =
2035 				(void *)((uintptr_t)(buf_addr + buf_offset));
2036 			batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
2037 			vq->batch_copy_nb_elems++;
2038 		}
2039 
2040 		mbuf_avail  -= cpy_len;
2041 		mbuf_offset += cpy_len;
2042 		buf_avail -= cpy_len;
2043 		buf_offset += cpy_len;
2044 
2045 		/* This buf reaches its end, get the next one */
2046 		if (buf_avail == 0) {
2047 			if (++vec_idx >= nr_vec)
2048 				break;
2049 
2050 			buf_addr = buf_vec[vec_idx].buf_addr;
2051 			buf_len = buf_vec[vec_idx].buf_len;
2052 
2053 			buf_offset = 0;
2054 			buf_avail  = buf_len;
2055 
2056 			PRINT_PACKET(dev, (uintptr_t)buf_addr,
2057 					(uint32_t)buf_avail, 0);
2058 		}
2059 
2060 		/*
2061 		 * This mbuf reaches its end, get a new one
2062 		 * to hold more data.
2063 		 */
2064 		if (mbuf_avail == 0) {
2065 			cur = rte_pktmbuf_alloc(mbuf_pool);
2066 			if (unlikely(cur == NULL)) {
2067 				VHOST_LOG_DATA(ERR, "Failed to "
2068 					"allocate memory for mbuf.\n");
2069 				error = -1;
2070 				goto out;
2071 			}
2072 
2073 			prev->next = cur;
2074 			prev->data_len = mbuf_offset;
2075 			m->nb_segs += 1;
2076 			m->pkt_len += mbuf_offset;
2077 			prev = cur;
2078 
2079 			mbuf_offset = 0;
2080 			mbuf_avail  = cur->buf_len - RTE_PKTMBUF_HEADROOM;
2081 		}
2082 	}
2083 
2084 	prev->data_len = mbuf_offset;
2085 	m->pkt_len    += mbuf_offset;
2086 
2087 	if (hdr)
2088 		vhost_dequeue_offload(hdr, m);
2089 
2090 out:
2091 
2092 	return error;
2093 }
2094 
2095 static void
2096 virtio_dev_extbuf_free(void *addr __rte_unused, void *opaque)
2097 {
2098 	rte_free(opaque);
2099 }
2100 
2101 static int
2102 virtio_dev_extbuf_alloc(struct rte_mbuf *pkt, uint32_t size)
2103 {
2104 	struct rte_mbuf_ext_shared_info *shinfo = NULL;
2105 	uint32_t total_len = RTE_PKTMBUF_HEADROOM + size;
2106 	uint16_t buf_len;
2107 	rte_iova_t iova;
2108 	void *buf;
2109 
2110 	total_len += sizeof(*shinfo) + sizeof(uintptr_t);
2111 	total_len = RTE_ALIGN_CEIL(total_len, sizeof(uintptr_t));
2112 
2113 	if (unlikely(total_len > UINT16_MAX))
2114 		return -ENOSPC;
2115 
2116 	buf_len = total_len;
2117 	buf = rte_malloc(NULL, buf_len, RTE_CACHE_LINE_SIZE);
2118 	if (unlikely(buf == NULL))
2119 		return -ENOMEM;
2120 
2121 	/* Initialize shinfo */
2122 	shinfo = rte_pktmbuf_ext_shinfo_init_helper(buf, &buf_len,
2123 						virtio_dev_extbuf_free, buf);
2124 	if (unlikely(shinfo == NULL)) {
2125 		rte_free(buf);
2126 		VHOST_LOG_DATA(ERR, "Failed to init shinfo\n");
2127 		return -1;
2128 	}
2129 
2130 	iova = rte_malloc_virt2iova(buf);
2131 	rte_pktmbuf_attach_extbuf(pkt, buf, iova, buf_len, shinfo);
2132 	rte_pktmbuf_reset_headroom(pkt);
2133 
2134 	return 0;
2135 }
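
/*
 * Sizing example for the external buffer above (illustrative): for a
 * 9000-byte packet, total_len is RTE_PKTMBUF_HEADROOM (128 by default) +
 * 9000 + sizeof(*shinfo) + sizeof(uintptr_t), rounded up to a multiple of
 * the pointer size.  Anything above UINT16_MAX is rejected because
 * rte_pktmbuf_attach_extbuf() takes a 16-bit buffer length.
 */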
2136 
2137 /*
2138  * Allocate a host-supported pktmbuf.
2139  */
2140 static __rte_always_inline struct rte_mbuf *
2141 virtio_dev_pktmbuf_alloc(struct virtio_net *dev, struct rte_mempool *mp,
2142 			 uint32_t data_len)
2143 {
2144 	struct rte_mbuf *pkt = rte_pktmbuf_alloc(mp);
2145 
2146 	if (unlikely(pkt == NULL)) {
2147 		VHOST_LOG_DATA(ERR,
2148 			"Failed to allocate memory for mbuf.\n");
2149 		return NULL;
2150 	}
2151 
2152 	if (rte_pktmbuf_tailroom(pkt) >= data_len)
2153 		return pkt;
2154 
2155 	/* attach an external buffer if supported */
2156 	if (dev->extbuf && !virtio_dev_extbuf_alloc(pkt, data_len))
2157 		return pkt;
2158 
2159 	/* check if chained buffers are allowed */
2160 	if (!dev->linearbuf)
2161 		return pkt;
2162 
2163 	/* Data doesn't fit into the buffer and the host supports
2164 	 * only linear buffers
2165 	 */
2166 	rte_pktmbuf_free(pkt);
2167 
2168 	return NULL;
2169 }
2170 
2171 static __rte_noinline uint16_t
2172 virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
2173 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
2174 {
2175 	uint16_t i;
2176 	uint16_t free_entries;
2177 	uint16_t dropped = 0;
2178 	static bool allocerr_warned;
2179 
2180 	/*
2181 	 * The ordering between avail index and
2182 	 * desc reads needs to be enforced.
2183 	 */
2184 	free_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) -
2185 			vq->last_avail_idx;
2186 	if (free_entries == 0)
2187 		return 0;
2188 
2189 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
2190 
2191 	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
2192 
2193 	count = RTE_MIN(count, MAX_PKT_BURST);
2194 	count = RTE_MIN(count, free_entries);
2195 	VHOST_LOG_DATA(DEBUG, "(%d) about to dequeue %u buffers\n",
2196 			dev->vid, count);
2197 
2198 	for (i = 0; i < count; i++) {
2199 		struct buf_vector buf_vec[BUF_VECTOR_MAX];
2200 		uint16_t head_idx;
2201 		uint32_t buf_len;
2202 		uint16_t nr_vec = 0;
2203 		int err;
2204 
2205 		if (unlikely(fill_vec_buf_split(dev, vq,
2206 						vq->last_avail_idx + i,
2207 						&nr_vec, buf_vec,
2208 						&head_idx, &buf_len,
2209 						VHOST_ACCESS_RO) < 0))
2210 			break;
2211 
2212 		update_shadow_used_ring_split(vq, head_idx, 0);
2213 
2214 		pkts[i] = virtio_dev_pktmbuf_alloc(dev, mbuf_pool, buf_len);
2215 		if (unlikely(pkts[i] == NULL)) {
2216 			/*
2217 			 * mbuf allocation fails for jumbo packets when external
2218 			 * buffer allocation is not allowed and a linear buffer
2219 			 * is required. Drop this packet.
2220 			 */
2221 			if (!allocerr_warned) {
2222 				VHOST_LOG_DATA(ERR,
2223 					"Failed mbuf alloc of size %d from %s on %s.\n",
2224 					buf_len, mbuf_pool->name, dev->ifname);
2225 				allocerr_warned = true;
2226 			}
2227 			dropped += 1;
2228 			i++;
2229 			break;
2230 		}
2231 
2232 		err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
2233 				mbuf_pool);
2234 		if (unlikely(err)) {
2235 			rte_pktmbuf_free(pkts[i]);
2236 			if (!allocerr_warned) {
2237 				VHOST_LOG_DATA(ERR,
2238 					"Failed to copy desc to mbuf on %s.\n",
2239 					dev->ifname);
2240 				allocerr_warned = true;
2241 			}
2242 			dropped += 1;
2243 			i++;
2244 			break;
2245 		}
2246 	}
2247 
2248 	vq->last_avail_idx += i;
2249 
2250 	do_data_copy_dequeue(vq);
2251 	if (unlikely(i < count))
2252 		vq->shadow_used_idx = i;
2253 	if (likely(vq->shadow_used_idx)) {
2254 		flush_shadow_used_ring_split(dev, vq);
2255 		vhost_vring_call_split(dev, vq);
2256 	}
2257 
2258 	return (i - dropped);
2259 }
2260 
2261 static __rte_always_inline int
2262 vhost_reserve_avail_batch_packed(struct virtio_net *dev,
2263 				 struct vhost_virtqueue *vq,
2264 				 struct rte_mempool *mbuf_pool,
2265 				 struct rte_mbuf **pkts,
2266 				 uint16_t avail_idx,
2267 				 uintptr_t *desc_addrs,
2268 				 uint16_t *ids)
2269 {
2270 	bool wrap = vq->avail_wrap_counter;
2271 	struct vring_packed_desc *descs = vq->desc_packed;
2272 	uint64_t lens[PACKED_BATCH_SIZE];
2273 	uint64_t buf_lens[PACKED_BATCH_SIZE];
2274 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
2275 	uint16_t flags, i;
2276 
2277 	if (unlikely(avail_idx & PACKED_BATCH_MASK))
2278 		return -1;
2279 	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
2280 		return -1;
2281 
2282 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2283 		flags = descs[avail_idx + i].flags;
2284 		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
2285 			     (wrap == !!(flags & VRING_DESC_F_USED))  ||
2286 			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
2287 			return -1;
2288 	}
2289 
2290 	rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
2291 
2292 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2293 		lens[i] = descs[avail_idx + i].len;
2294 
2295 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2296 		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
2297 						  descs[avail_idx + i].addr,
2298 						  &lens[i], VHOST_ACCESS_RW);
2299 	}
2300 
2301 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2302 		if (unlikely(!desc_addrs[i]))
2303 			return -1;
2304 		if (unlikely((lens[i] != descs[avail_idx + i].len)))
2305 			return -1;
2306 	}
2307 
2308 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2309 		pkts[i] = virtio_dev_pktmbuf_alloc(dev, mbuf_pool, lens[i]);
2310 		if (!pkts[i])
2311 			goto free_buf;
2312 	}
2313 
2314 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2315 		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
2316 
2317 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2318 		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
2319 			goto free_buf;
2320 	}
2321 
2322 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2323 		pkts[i]->pkt_len = lens[i] - buf_offset;
2324 		pkts[i]->data_len = pkts[i]->pkt_len;
2325 		ids[i] = descs[avail_idx + i].id;
2326 	}
2327 
2328 	return 0;
2329 
2330 free_buf:
2331 	for (i = 0; i < PACKED_BATCH_SIZE; i++)
2332 		rte_pktmbuf_free(pkts[i]);
2333 
2334 	return -1;
2335 }
2336 
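/*
 * Batch fast path helper: succeeds only when the next PACKED_BATCH_SIZE
 * descriptors start at a batch-aligned index, are all available, not yet
 * used and not flagged for single dequeue, translate to valid host
 * addresses of unchanged length, and fit into freshly allocated mbufs.
 * On success it fills pkts[], desc_addrs[] and ids[]; otherwise the caller
 * falls back to the single-descriptor path.
 */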
2337 static __rte_always_inline int
2338 virtio_dev_tx_batch_packed(struct virtio_net *dev,
2339 			   struct vhost_virtqueue *vq,
2340 			   struct rte_mempool *mbuf_pool,
2341 			   struct rte_mbuf **pkts)
2342 {
2343 	uint16_t avail_idx = vq->last_avail_idx;
2344 	uint32_t buf_offset = sizeof(struct virtio_net_hdr_mrg_rxbuf);
2345 	struct virtio_net_hdr *hdr;
2346 	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
2347 	uint16_t ids[PACKED_BATCH_SIZE];
2348 	uint16_t i;
2349 
2350 	if (vhost_reserve_avail_batch_packed(dev, vq, mbuf_pool, pkts,
2351 					     avail_idx, desc_addrs, ids))
2352 		return -1;
2353 
2354 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2355 		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
2356 
2357 	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
2358 		rte_memcpy(rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
2359 			   (void *)(uintptr_t)(desc_addrs[i] + buf_offset),
2360 			   pkts[i]->pkt_len);
2361 
2362 	if (virtio_net_with_host_offload(dev)) {
2363 		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
2364 			hdr = (struct virtio_net_hdr *)(desc_addrs[i]);
2365 			vhost_dequeue_offload(hdr, pkts[i]);
2366 		}
2367 	}
2368 
2369 	if (virtio_net_is_inorder(dev))
2370 		vhost_shadow_dequeue_batch_packed_inorder(vq,
2371 			ids[PACKED_BATCH_SIZE - 1]);
2372 	else
2373 		vhost_shadow_dequeue_batch_packed(dev, vq, ids);
2374 
2375 	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
2376 
2377 	return 0;
2378 }
2379 
2380 static __rte_always_inline int
2381 vhost_dequeue_single_packed(struct virtio_net *dev,
2382 			    struct vhost_virtqueue *vq,
2383 			    struct rte_mempool *mbuf_pool,
2384 			    struct rte_mbuf **pkts,
2385 			    uint16_t *buf_id,
2386 			    uint16_t *desc_count)
2387 {
2388 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
2389 	uint32_t buf_len;
2390 	uint16_t nr_vec = 0;
2391 	int err;
2392 	static bool allocerr_warned;
2393 
2394 	if (unlikely(fill_vec_buf_packed(dev, vq,
2395 					 vq->last_avail_idx, desc_count,
2396 					 buf_vec, &nr_vec,
2397 					 buf_id, &buf_len,
2398 					 VHOST_ACCESS_RO) < 0))
2399 		return -1;
2400 
2401 	*pkts = virtio_dev_pktmbuf_alloc(dev, mbuf_pool, buf_len);
2402 	if (unlikely(*pkts == NULL)) {
2403 		if (!allocerr_warned) {
2404 			VHOST_LOG_DATA(ERR,
2405 				"Failed mbuf alloc of size %d from %s on %s.\n",
2406 				buf_len, mbuf_pool->name, dev->ifname);
2407 			allocerr_warned = true;
2408 		}
2409 		return -1;
2410 	}
2411 
2412 	err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, *pkts,
2413 				mbuf_pool);
2414 	if (unlikely(err)) {
2415 		if (!allocerr_warned) {
2416 			VHOST_LOG_DATA(ERR,
2417 				"Failed to copy desc to mbuf on %s.\n",
2418 				dev->ifname);
2419 			allocerr_warned = true;
2420 		}
2421 		rte_pktmbuf_free(*pkts);
2422 		return -1;
2423 	}
2424 
2425 	return 0;
2426 }
2427 
2428 static __rte_always_inline int
2429 virtio_dev_tx_single_packed(struct virtio_net *dev,
2430 			    struct vhost_virtqueue *vq,
2431 			    struct rte_mempool *mbuf_pool,
2432 			    struct rte_mbuf **pkts)
2433 {
2435 	uint16_t buf_id, desc_count = 0;
2436 	int ret;
2437 
2438 	ret = vhost_dequeue_single_packed(dev, vq, mbuf_pool, pkts, &buf_id,
2439 					&desc_count);
2440 
2441 	if (likely(desc_count > 0)) {
2442 		if (virtio_net_is_inorder(dev))
2443 			vhost_shadow_dequeue_single_packed_inorder(vq, buf_id,
2444 								   desc_count);
2445 		else
2446 			vhost_shadow_dequeue_single_packed(vq, buf_id,
2447 					desc_count);
2448 
2449 		vq_inc_last_avail_packed(vq, desc_count);
2450 	}
2451 
2452 	return ret;
2453 }
2454 
2455 static __rte_noinline uint16_t
2456 virtio_dev_tx_packed(struct virtio_net *dev,
2457 		     struct vhost_virtqueue *__rte_restrict vq,
2458 		     struct rte_mempool *mbuf_pool,
2459 		     struct rte_mbuf **__rte_restrict pkts,
2460 		     uint32_t count)
2461 {
2462 	uint32_t pkt_idx = 0;
2463 	uint32_t remained = count;
2464 
2465 	do {
2466 		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
2467 
2468 		if (remained >= PACKED_BATCH_SIZE) {
2469 			if (!virtio_dev_tx_batch_packed(dev, vq, mbuf_pool,
2470 							&pkts[pkt_idx])) {
2471 				pkt_idx += PACKED_BATCH_SIZE;
2472 				remained -= PACKED_BATCH_SIZE;
2473 				continue;
2474 			}
2475 		}
2476 
2477 		if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
2478 						&pkts[pkt_idx]))
2479 			break;
2480 		pkt_idx++;
2481 		remained--;
2482 
2483 	} while (remained);
2484 
2485 	if (vq->shadow_used_idx) {
2486 		do_data_copy_dequeue(vq);
2487 
2488 		vhost_flush_dequeue_shadow_packed(dev, vq);
2489 		vhost_vring_call_packed(dev, vq);
2490 	}
2491 
2492 	return pkt_idx;
2493 }
2494 
2495 uint16_t
2496 rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
2497 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
2498 {
2499 	struct virtio_net *dev;
2500 	struct rte_mbuf *rarp_mbuf = NULL;
2501 	struct vhost_virtqueue *vq;
2502 	int16_t success = 1;
2503 
2504 	dev = get_device(vid);
2505 	if (!dev)
2506 		return 0;
2507 
2508 	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
2509 		VHOST_LOG_DATA(ERR,
2510 			"(%d) %s: built-in vhost net backend is disabled.\n",
2511 			dev->vid, __func__);
2512 		return 0;
2513 	}
2514 
2515 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
2516 		VHOST_LOG_DATA(ERR,
2517 			"(%d) %s: invalid virtqueue idx %d.\n",
2518 			dev->vid, __func__, queue_id);
2519 		return 0;
2520 	}
2521 
2522 	vq = dev->virtqueue[queue_id];
2523 
2524 	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
2525 		return 0;
2526 
2527 	if (unlikely(!vq->enabled)) {
2528 		count = 0;
2529 		goto out_access_unlock;
2530 	}
2531 
2532 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
2533 		vhost_user_iotlb_rd_lock(vq);
2534 
2535 	if (unlikely(!vq->access_ok))
2536 		if (unlikely(vring_translate(dev, vq) < 0)) {
2537 			count = 0;
2538 			goto out;
2539 		}
2540 
2541 	/*
2542 	 * Construct a RARP broadcast packet, and inject it to the "pkts"
2543 	 * array, to looks like that guest actually send such packet.
2544 	 *
2545 	 * Check user_send_rarp() for more information.
2546 	 *
2547 	 * broadcast_rarp shares a cacheline in the virtio_net structure
2548 	 * with some fields that are accessed during enqueue, and
2549 	 * __atomic_compare_exchange_n performs a write when the compare
2550 	 * succeeds. This could result in false sharing between enqueue
2551 	 * and dequeue.
2552 	 *
2553 	 * Prevent unnecessary false sharing by reading broadcast_rarp first
2554 	 * and only performing compare and exchange if the read indicates it
2555 	 * is likely to be set.
2556 	 */
2557 	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
2558 			__atomic_compare_exchange_n(&dev->broadcast_rarp,
2559 			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
2560 
2561 		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
2562 		if (rarp_mbuf == NULL) {
2563 			VHOST_LOG_DATA(ERR, "Failed to make RARP packet.\n");
2564 			count = 0;
2565 			goto out;
2566 		}
2567 		count -= 1;
2568 	}
2569 
2570 	if (vq_is_packed(dev))
2571 		count = virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count);
2572 	else
2573 		count = virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count);
2574 
2575 out:
2576 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
2577 		vhost_user_iotlb_rd_unlock(vq);
2578 
2579 out_access_unlock:
2580 	rte_spinlock_unlock(&vq->access_lock);
2581 
2582 	if (unlikely(rarp_mbuf != NULL)) {
2583 		/*
2584 		 * Inject it at the head of the "pkts" array, so that the switch's
2585 		 * MAC learning table gets updated first.
2586 		 */
2587 		memmove(&pkts[1], pkts, count * sizeof(struct rte_mbuf *));
2588 		pkts[0] = rarp_mbuf;
2589 		count += 1;
2590 	}
2591 
2592 	return count;
2593 }
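
/*
 * Illustrative usage sketch (not part of the library): a minimal polling
 * loop built on rte_vhost_dequeue_burst().  "vid", "queue_id" (which must
 * be an odd virtqueue index, i.e. a guest TX ring) and "mbuf_pool" are
 * assumed to come from the application's vhost-user setup; the dequeued
 * packets are simply dropped here instead of being forwarded.
 *
 *	struct rte_mbuf *pkts[MAX_PKT_BURST];
 *	uint16_t nb_rx;
 *
 *	nb_rx = rte_vhost_dequeue_burst(vid, queue_id, mbuf_pool,
 *			pkts, MAX_PKT_BURST);
 *	rte_pktmbuf_free_bulk(pkts, nb_rx);
 */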
2594