xref: /dpdk/examples/vhost/main.c (revision 62814bc2e923b3d1867a93a1a4cd6073f5065e41)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include <arpa/inet.h>
35 #include <getopt.h>
36 #include <linux/if_ether.h>
37 #include <linux/if_vlan.h>
38 #include <linux/virtio_net.h>
39 #include <linux/virtio_ring.h>
40 #include <signal.h>
41 #include <stdint.h>
42 #include <sys/eventfd.h>
43 #include <sys/param.h>
44 #include <unistd.h>
45 
46 #include <rte_atomic.h>
47 #include <rte_cycles.h>
48 #include <rte_ethdev.h>
49 #include <rte_log.h>
50 #include <rte_string_fns.h>
51 #include <rte_malloc.h>
52 
53 #include "main.h"
54 #include "virtio-net.h"
55 #include "vhost-net-cdev.h"
56 
57 #define MAX_QUEUES 128
58 
59 /* the maximum number of external ports supported */
60 #define MAX_SUP_PORTS 1
61 
62 /*
63  * Calculate the number of buffers needed per port
64  */
65 #define NUM_MBUFS_PER_PORT ((MAX_QUEUES*RTE_TEST_RX_DESC_DEFAULT) +  		\
66 							(num_switching_cores*MAX_PKT_BURST) +  			\
67 							(num_switching_cores*RTE_TEST_TX_DESC_DEFAULT) +\
68 							(num_switching_cores*MBUF_CACHE_SIZE))
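/*
 * Note: NUM_MBUFS_PER_PORT is a macro rather than a constant, so it expands at
 * its point of use and picks up whatever value num_switching_cores holds
 * there; the variable itself is defined further down in this file.
 */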
69 
70 #define MBUF_CACHE_SIZE 128
71 #define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
72 
73 /*
74  * No frame data buffers allocated by the host are required for the zero copy
75  * implementation; the guest allocates the frame data buffers and vhost
76  * uses them directly.
77  */
78 #define VIRTIO_DESCRIPTOR_LEN_ZCP 1518
79 #define MBUF_SIZE_ZCP (VIRTIO_DESCRIPTOR_LEN_ZCP + sizeof(struct rte_mbuf) \
80 	+ RTE_PKTMBUF_HEADROOM)
81 #define MBUF_CACHE_SIZE_ZCP 0
82 
83 /*
84  * RX and TX Prefetch, Host, and Write-back threshold values should be
85  * carefully set for optimal performance. Consult the network
86  * controller's datasheet and supporting DPDK documentation for guidance
87  * on how these parameters should be set.
88  */
89 #define RX_PTHRESH 8 /* Default values of RX prefetch threshold reg. */
90 #define RX_HTHRESH 8 /* Default values of RX host threshold reg. */
91 #define RX_WTHRESH 4 /* Default values of RX write-back threshold reg. */
92 
93 /*
94  * These default values are optimized for use with the Intel(R) 82599 10 GbE
95  * Controller and the DPDK ixgbe PMD. Consider using other values for other
96  * network controllers and/or network drivers.
97  */
98 #define TX_PTHRESH 36 /* Default values of TX prefetch threshold reg. */
99 #define TX_HTHRESH 0  /* Default values of TX host threshold reg. */
100 #define TX_WTHRESH 0  /* Default values of TX write-back threshold reg. */
101 
102 #define MAX_PKT_BURST 32 		/* Max burst size for RX/TX */
103 #define MAX_MRG_PKT_BURST 16 	/* Max burst for merge buffers. Set to 1 due to performance issue. */
104 #define BURST_TX_DRAIN_US 100 	/* TX drain every ~100us */
105 
106 #define BURST_RX_WAIT_US 15 	/* Defines how long we wait between retries on RX */
107 #define BURST_RX_RETRIES 4		/* Number of retries on RX. */
108 
109 #define JUMBO_FRAME_MAX_SIZE    0x2600
110 
111 /* State of virtio device. */
112 #define DEVICE_MAC_LEARNING 0
113 #define DEVICE_RX			1
114 #define DEVICE_SAFE_REMOVE	2
115 
116 /* Config_core_flag status definitions. */
117 #define REQUEST_DEV_REMOVAL 1
118 #define ACK_DEV_REMOVAL 0
119 
120 /* Configurable number of RX/TX ring descriptors */
121 #define RTE_TEST_RX_DESC_DEFAULT 1024
122 #define RTE_TEST_TX_DESC_DEFAULT 512
123 
124 /*
125  * These two macros need refinement for the legacy and DPDK-based front ends:
126  * take the max vring avail descriptors/entries from the guest minus MAX_PKT_BURST,
127  * then round to a power of 2.
128  */
129 /*
130  * For the legacy front end there are 128 descriptors:
131  * half for the virtio header, the other half for the mbuf.
132  */
133 #define RTE_TEST_RX_DESC_DEFAULT_ZCP 32   /* legacy: 32, DPDK virt FE: 128. */
134 #define RTE_TEST_TX_DESC_DEFAULT_ZCP 64   /* legacy: 64, DPDK virt FE: 64.  */
135 
136 /* Get first 4 bytes in mbuf headroom. */
137 #define MBUF_HEADROOM_UINT32(mbuf) (*(uint32_t *)((uint8_t *)(mbuf) \
138 		+ sizeof(struct rte_mbuf)))
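/*
 * The four bytes read here sit at the start of the mbuf headroom, which in
 * this mbuf layout begins immediately after struct rte_mbuf, so they can be
 * used as a small per-buffer scratch word without touching the packet data.
 */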
139 
140 /* true if x is a power of 2 */
141 #define POWEROF2(x) ((((x)-1) & (x)) == 0)
142 
143 #define INVALID_PORT_ID 0xFF
144 
145 /* Max number of devices. Limited by vmdq. */
146 #define MAX_DEVICES 64
147 
148 /* Size of buffers used for snprintfs. */
149 #define MAX_PRINT_BUFF 6072
150 
151 /* Maximum character device basename size. */
152 #define MAX_BASENAME_SZ 10
153 
154 /* Maximum long option length for option parsing. */
155 #define MAX_LONG_OPT_SZ 64
156 
157 /* Used to compare MAC addresses. */
158 #define MAC_ADDR_CMP 0xFFFFFFFFFFFFULL
159 
160 /* Number of descriptors per cacheline. */
161 #define DESC_PER_CACHELINE (CACHE_LINE_SIZE / sizeof(struct vring_desc))
162 
163 /* mask of enabled ports */
164 static uint32_t enabled_port_mask = 0;
165 
166 /*Number of switching cores enabled*/
167 static uint32_t num_switching_cores = 0;
168 
169 /* number of devices/queues to support*/
170 static uint32_t num_queues = 0;
171 uint32_t num_devices = 0;
172 
173 /*
174  * Enable zero copy: packet buffers are DMA'd directly to/from the HW
175  * descriptors. Disabled by default.
176  */
177 static uint32_t zero_copy;
178 
179 /* Number of RX/TX descriptors to use. */
180 static uint32_t num_rx_descriptor = RTE_TEST_RX_DESC_DEFAULT_ZCP;
181 static uint32_t num_tx_descriptor = RTE_TEST_TX_DESC_DEFAULT_ZCP;
182 
183 /* Max ring descriptors; ixgbe, i40e and e1000 all support 4096. */
184 #define MAX_RING_DESC 4096
185 
186 struct vpool {
187 	struct rte_mempool *pool;
188 	struct rte_ring *ring;
189 	uint32_t buf_size;
190 } vpool_array[MAX_QUEUES+MAX_QUEUES];
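/*
 * Each vpool pairs a mempool with a ring and records the buffer size used by
 * that pool; the array is sized MAX_QUEUES + MAX_QUEUES, which appears to
 * leave room for separate RX and TX pools on the zero copy path.
 */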
191 
192 /* Enable VM2VM communications. If this is disabled then the MAC address compare is skipped. */
193 typedef enum {
194 	VM2VM_DISABLED = 0,
195 	VM2VM_SOFTWARE = 1,
196 	VM2VM_HARDWARE = 2,
197 	VM2VM_LAST
198 } vm2vm_type;
199 static vm2vm_type vm2vm_mode = VM2VM_SOFTWARE;
200 
201 /* The type of host physical address translated from guest physical address. */
202 typedef enum {
203 	PHYS_ADDR_CONTINUOUS = 0,
204 	PHYS_ADDR_CROSS_SUBREG = 1,
205 	PHYS_ADDR_INVALID = 2,
206 	PHYS_ADDR_LAST
207 } hpa_type;
208 
209 /* Enable stats. */
210 static uint32_t enable_stats = 0;
211 /* Enable retries on RX. */
212 static uint32_t enable_retry = 1;
213 /* Specify the timeout (in microseconds) between retries on RX. */
214 static uint32_t burst_rx_delay_time = BURST_RX_WAIT_US;
215 /* Specify the number of retries on RX. */
216 static uint32_t burst_rx_retry_num = BURST_RX_RETRIES;
217 
218 /* Character device basename. Can be set by user. */
219 static char dev_basename[MAX_BASENAME_SZ] = "vhost-net";
220 
221 /* Character device index. Can be set by user. */
222 static uint32_t dev_index = 0;
223 
224 /* This can be set by the user so it is made available here. */
225 extern uint64_t VHOST_FEATURES;
226 
227 /* Default configuration for rx and tx thresholds etc. */
228 static struct rte_eth_rxconf rx_conf_default = {
229 	.rx_thresh = {
230 		.pthresh = RX_PTHRESH,
231 		.hthresh = RX_HTHRESH,
232 		.wthresh = RX_WTHRESH,
233 	},
234 	.rx_drop_en = 1,
235 };
236 
237 /*
238  * These default values are optimized for use with the Intel(R) 82599 10 GbE
239  * Controller and the DPDK ixgbe/igb PMD. Consider using other values for other
240  * network controllers and/or network drivers.
241  */
242 static struct rte_eth_txconf tx_conf_default = {
243 	.tx_thresh = {
244 		.pthresh = TX_PTHRESH,
245 		.hthresh = TX_HTHRESH,
246 		.wthresh = TX_WTHRESH,
247 	},
248 	.tx_free_thresh = 0, /* Use PMD default values */
249 	.tx_rs_thresh = 0, /* Use PMD default values */
250 };
251 
252 /* Empty VMDQ configuration structure. Filled in programmatically. */
253 static struct rte_eth_conf vmdq_conf_default = {
254 	.rxmode = {
255 		.mq_mode        = ETH_MQ_RX_VMDQ_ONLY,
256 		.split_hdr_size = 0,
257 		.header_split   = 0, /**< Header Split disabled */
258 		.hw_ip_checksum = 0, /**< IP checksum offload disabled */
259 		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
260 		/*
261 		 * This is necessary for 1G NICs such as the I350;
262 		 * it fixes a bug where IPv4 forwarding in the guest could not
263 		 * forward packets from one virtio device to another.
264 		 */
265 		.hw_vlan_strip  = 1, /**< VLAN strip enabled. */
266 		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
267 		.hw_strip_crc   = 0, /**< CRC stripped by hardware */
268 	},
269 
270 	.txmode = {
271 		.mq_mode = ETH_MQ_TX_NONE,
272 	},
273 	.rx_adv_conf = {
274 		/*
275 		 * should be overridden separately in code with
276 		 * appropriate values
277 		 */
278 		.vmdq_rx_conf = {
279 			.nb_queue_pools = ETH_8_POOLS,
280 			.enable_default_pool = 0,
281 			.default_pool = 0,
282 			.nb_pool_maps = 0,
283 			.pool_map = {{0, 0},},
284 		},
285 	},
286 };
287 
288 static unsigned lcore_ids[RTE_MAX_LCORE];
289 static uint8_t ports[RTE_MAX_ETHPORTS];
290 static unsigned num_ports = 0; /**< The number of ports specified in command line */
291 
292 static const uint16_t external_pkt_default_vlan_tag = 2000;
293 const uint16_t vlan_tags[] = {
294 	1000, 1001, 1002, 1003, 1004, 1005, 1006, 1007,
295 	1008, 1009, 1010, 1011,	1012, 1013, 1014, 1015,
296 	1016, 1017, 1018, 1019, 1020, 1021, 1022, 1023,
297 	1024, 1025, 1026, 1027, 1028, 1029, 1030, 1031,
298 	1032, 1033, 1034, 1035, 1036, 1037, 1038, 1039,
299 	1040, 1041, 1042, 1043, 1044, 1045, 1046, 1047,
300 	1048, 1049, 1050, 1051, 1052, 1053, 1054, 1055,
301 	1056, 1057, 1058, 1059, 1060, 1061, 1062, 1063,
302 };
303 
304 /* ethernet addresses of ports */
305 static struct ether_addr vmdq_ports_eth_addr[RTE_MAX_ETHPORTS];
306 
307 /* heads for the main used and free linked lists for the data path. */
308 static struct virtio_net_data_ll *ll_root_used = NULL;
309 static struct virtio_net_data_ll *ll_root_free = NULL;
310 
311 /* Array of data core structures containing information on individual core linked lists. */
312 static struct lcore_info lcore_info[RTE_MAX_LCORE];
313 
314 /* Used for queueing bursts of TX packets. */
315 struct mbuf_table {
316 	unsigned len;
317 	unsigned txq_id;
318 	struct rte_mbuf *m_table[MAX_PKT_BURST];
319 };
320 
321 /* TX queue for each data core. */
322 struct mbuf_table lcore_tx_queue[RTE_MAX_LCORE];
323 
324 /* TX queue for each virtio device for zero copy. */
325 struct mbuf_table tx_queue_zcp[MAX_QUEUES];
326 
327 /* Vlan header struct used to insert vlan tags on TX. */
328 struct vlan_ethhdr {
329 	unsigned char   h_dest[ETH_ALEN];
330 	unsigned char   h_source[ETH_ALEN];
331 	__be16          h_vlan_proto;
332 	__be16          h_vlan_TCI;
333 	__be16          h_vlan_encapsulated_proto;
334 };
335 
336 /* IPv4 Header */
337 struct ipv4_hdr {
338 	uint8_t  version_ihl;		/**< version and header length */
339 	uint8_t  type_of_service;	/**< type of service */
340 	uint16_t total_length;		/**< length of packet */
341 	uint16_t packet_id;		/**< packet ID */
342 	uint16_t fragment_offset;	/**< fragmentation offset */
343 	uint8_t  time_to_live;		/**< time to live */
344 	uint8_t  next_proto_id;		/**< protocol ID */
345 	uint16_t hdr_checksum;		/**< header checksum */
346 	uint32_t src_addr;		/**< source address */
347 	uint32_t dst_addr;		/**< destination address */
348 } __attribute__((__packed__));
349 
350 /* Header lengths. */
351 #define VLAN_HLEN       4
352 #define VLAN_ETH_HLEN   18
353 
354 /* Per-device statistics struct */
355 struct device_statistics {
356 	uint64_t tx_total;
357 	rte_atomic64_t rx_total_atomic;
358 	uint64_t rx_total;
359 	uint64_t tx;
360 	rte_atomic64_t rx_atomic;
361 	uint64_t rx;
362 } __rte_cache_aligned;
363 struct device_statistics dev_statistics[MAX_DEVICES];
364 
365 /*
366  * Builds up the correct configuration for VMDQ VLAN pool map
367  * according to the pool & queue limits.
368  */
369 static inline int
370 get_eth_conf(struct rte_eth_conf *eth_conf, uint32_t num_devices)
371 {
372 	struct rte_eth_vmdq_rx_conf conf;
373 	unsigned i;
374 
375 	memset(&conf, 0, sizeof(conf));
376 	conf.nb_queue_pools = (enum rte_eth_nb_pools)num_devices;
377 	conf.nb_pool_maps = num_devices;
378 	conf.enable_loop_back =
379 		vmdq_conf_default.rx_adv_conf.vmdq_rx_conf.enable_loop_back;
380 
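	/*
	 * Map each VLAN tag to its own VMDQ pool: packets tagged with
	 * vlan_tags[i] are steered into pool i (bit i of the pool mask).
	 */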
381 	for (i = 0; i < conf.nb_pool_maps; i++) {
382 		conf.pool_map[i].vlan_id = vlan_tags[ i ];
383 		conf.pool_map[i].pools = (1UL << i);
384 	}
385 
386 	(void)(rte_memcpy(eth_conf, &vmdq_conf_default, sizeof(*eth_conf)));
387 	(void)(rte_memcpy(&eth_conf->rx_adv_conf.vmdq_rx_conf, &conf,
388 		   sizeof(eth_conf->rx_adv_conf.vmdq_rx_conf)));
389 	return 0;
390 }
391 
392 /*
393  * Validate the device number against the max pool number obtained from
394  * dev_info. If the device number is invalid, print an error message and
395  * return -1. Each device must have its own pool.
396  */
397 static inline int
398 validate_num_devices(uint32_t max_nb_devices)
399 {
400 	if (num_devices > max_nb_devices) {
401 		RTE_LOG(ERR, VHOST_PORT, "invalid number of devices\n");
402 		return -1;
403 	}
404 	return 0;
405 }
406 
407 /*
408  * Initialises a given port using global settings, with the RX buffers
409  * coming from the per-queue mempools in vpool_array.
410  */
411 static inline int
412 port_init(uint8_t port)
413 {
414 	struct rte_eth_dev_info dev_info;
415 	struct rte_eth_conf port_conf;
416 	uint16_t rx_rings, tx_rings;
417 	uint16_t rx_ring_size, tx_ring_size;
418 	int retval;
419 	uint16_t q;
420 
421 	/* The max pool number from dev_info will be used to validate the pool number specified in cmd line */
422 	rte_eth_dev_info_get (port, &dev_info);
423 
424 	/*configure the number of supported virtio devices based on VMDQ limits */
425 	num_devices = dev_info.max_vmdq_pools;
426 	num_queues = dev_info.max_rx_queues;
427 
428 	if (zero_copy) {
429 		rx_ring_size = num_rx_descriptor;
430 		tx_ring_size = num_tx_descriptor;
431 		tx_rings = dev_info.max_tx_queues;
432 	} else {
433 		rx_ring_size = RTE_TEST_RX_DESC_DEFAULT;
434 		tx_ring_size = RTE_TEST_TX_DESC_DEFAULT;
435 		tx_rings = (uint16_t)rte_lcore_count();
436 	}
437 
438 	retval = validate_num_devices(MAX_DEVICES);
439 	if (retval < 0)
440 		return retval;
441 
442 	/* Get port configuration. */
443 	retval = get_eth_conf(&port_conf, num_devices);
444 	if (retval < 0)
445 		return retval;
446 
447 	if (port >= rte_eth_dev_count()) return -1;
448 
449 	rx_rings = (uint16_t)num_queues;
450 	/* Configure ethernet device. */
451 	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
452 	if (retval != 0)
453 		return retval;
454 
455 	/* Setup the queues. */
456 	for (q = 0; q < rx_rings; q ++) {
457 		retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
458 						rte_eth_dev_socket_id(port), &rx_conf_default,
459 						vpool_array[q].pool);
460 		if (retval < 0)
461 			return retval;
462 	}
463 	for (q = 0; q < tx_rings; q ++) {
464 		retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
465 						rte_eth_dev_socket_id(port), &tx_conf_default);
466 		if (retval < 0)
467 			return retval;
468 	}
469 
470 	/* Start the device. */
471 	retval  = rte_eth_dev_start(port);
472 	if (retval < 0) {
473 		RTE_LOG(ERR, VHOST_DATA, "Failed to start the device.\n");
474 		return retval;
475 	}
476 
477 	rte_eth_macaddr_get(port, &vmdq_ports_eth_addr[port]);
478 	RTE_LOG(INFO, VHOST_PORT, "Max virtio devices supported: %u\n", num_devices);
479 	RTE_LOG(INFO, VHOST_PORT, "Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
480 			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
481 			(unsigned)port,
482 			vmdq_ports_eth_addr[port].addr_bytes[0],
483 			vmdq_ports_eth_addr[port].addr_bytes[1],
484 			vmdq_ports_eth_addr[port].addr_bytes[2],
485 			vmdq_ports_eth_addr[port].addr_bytes[3],
486 			vmdq_ports_eth_addr[port].addr_bytes[4],
487 			vmdq_ports_eth_addr[port].addr_bytes[5]);
488 
489 	return 0;
490 }
491 
492 /*
493  * Set character device basename.
494  */
495 static int
496 us_vhost_parse_basename(const char *q_arg)
497 {
498 	/* Copy the basename string, rejecting names that do not fit. */
499 
500 	if (strnlen(q_arg, MAX_BASENAME_SZ) >= MAX_BASENAME_SZ)
501 		return -1;
502 	else
503 		snprintf((char*)&dev_basename, MAX_BASENAME_SZ, "%s", q_arg);
504 
505 	return 0;
506 }
507 
508 /*
509  * Parse the portmask provided at run time.
510  */
511 static int
512 parse_portmask(const char *portmask)
513 {
514 	char *end = NULL;
515 	unsigned long pm;
516 
517 	errno = 0;
518 
519 	/* parse hexadecimal string */
520 	pm = strtoul(portmask, &end, 16);
521 	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0') || (errno != 0))
522 		return -1;
523 
524 	if (pm == 0)
525 		return -1;
526 
527 	return pm;
528 
529 }
530 
531 /*
532  * Parse num options at run time.
533  */
534 static int
535 parse_num_opt(const char *q_arg, uint32_t max_valid_value)
536 {
537 	char *end = NULL;
538 	unsigned long num;
539 
540 	errno = 0;
541 
542 	/* parse unsigned int string */
543 	num = strtoul(q_arg, &end, 10);
544 	if ((q_arg[0] == '\0') || (end == NULL) || (*end != '\0') || (errno != 0))
545 		return -1;
546 
547 	if (num > max_valid_value)
548 		return -1;
549 
550 	return num;
551 
552 }
553 
554 /*
555  * Display usage
556  */
557 static void
558 us_vhost_usage(const char *prgname)
559 {
560 	RTE_LOG(INFO, VHOST_CONFIG, "%s [EAL options] -- -p PORTMASK\n"
561 	"		--vm2vm [0|1|2]\n"
562 	"		--rx_retry [0|1] --mergeable [0|1] --stats [0-N]\n"
563 	"		--dev-basename <name> --dev-index [0-N]\n"
564 	"		--nb-devices ND\n"
565 	"		-p PORTMASK: Set mask for ports to be used by application\n"
566 	"		--vm2vm [0|1|2]: disable/software(default)/hardware vm2vm comms\n"
567 	"		--rx-retry [0|1]: disable/enable(default) retries on rx. Enable retry if destintation queue is full\n"
568 	"		--rx-retry-delay [0-N]: timeout(in usecond) between retries on RX. This makes effect only if retries on rx enabled\n"
569 	"		--rx-retry-num [0-N]: the number of retries on rx. This makes effect only if retries on rx enabled\n"
570 	"		--mergeable [0|1]: disable(default)/enable RX mergeable buffers\n"
571 	"		--stats [0-N]: 0: Disable stats, N: Time in seconds to print stats\n"
572 	"		--dev-basename: The basename to be used for the character device.\n"
573 	"		--dev-index [0-N]: Defaults to zero if not used. Index is appended to basename.\n"
574 	"		--zero-copy [0|1]: disable(default)/enable rx/tx "
575 			"zero copy\n"
576 	"		--rx-desc-num [0-N]: the number of descriptors on rx, "
577 			"used only when zero copy is enabled.\n"
578 	"		--tx-desc-num [0-N]: the number of descriptors on tx, "
579 			"used only when zero copy is enabled.\n",
580 	       prgname);
581 }
582 
583 /*
584  * Parse the arguments given in the command line of the application.
585  */
586 static int
587 us_vhost_parse_args(int argc, char **argv)
588 {
589 	int opt, ret;
590 	int option_index;
591 	unsigned i;
592 	const char *prgname = argv[0];
593 	static struct option long_option[] = {
594 		{"vm2vm", required_argument, NULL, 0},
595 		{"rx-retry", required_argument, NULL, 0},
596 		{"rx-retry-delay", required_argument, NULL, 0},
597 		{"rx-retry-num", required_argument, NULL, 0},
598 		{"mergeable", required_argument, NULL, 0},
599 		{"stats", required_argument, NULL, 0},
600 		{"dev-basename", required_argument, NULL, 0},
601 		{"dev-index", required_argument, NULL, 0},
602 		{"zero-copy", required_argument, NULL, 0},
603 		{"rx-desc-num", required_argument, NULL, 0},
604 		{"tx-desc-num", required_argument, NULL, 0},
605 		{NULL, 0, 0, 0},
606 	};
607 
608 	/* Parse command line */
609 	while ((opt = getopt_long(argc, argv, "p:",long_option, &option_index)) != EOF) {
610 		switch (opt) {
611 		/* Portmask */
612 		case 'p':
613 			enabled_port_mask = parse_portmask(optarg);
614 			if (enabled_port_mask == 0) {
615 				RTE_LOG(INFO, VHOST_CONFIG, "Invalid portmask\n");
616 				us_vhost_usage(prgname);
617 				return -1;
618 			}
619 			break;
620 
621 		case 0:
622 			/* Enable/disable vm2vm comms. */
623 			if (!strncmp(long_option[option_index].name, "vm2vm",
624 				MAX_LONG_OPT_SZ)) {
625 				ret = parse_num_opt(optarg, (VM2VM_LAST - 1));
626 				if (ret == -1) {
627 					RTE_LOG(INFO, VHOST_CONFIG,
628 						"Invalid argument for "
629 						"vm2vm [0|1|2]\n");
630 					us_vhost_usage(prgname);
631 					return -1;
632 				} else {
633 					vm2vm_mode = (vm2vm_type)ret;
634 				}
635 			}
636 
637 			/* Enable/disable retries on RX. */
638 			if (!strncmp(long_option[option_index].name, "rx-retry", MAX_LONG_OPT_SZ)) {
639 				ret = parse_num_opt(optarg, 1);
640 				if (ret == -1) {
641 					RTE_LOG(INFO, VHOST_CONFIG, "Invalid argument for rx-retry [0|1]\n");
642 					us_vhost_usage(prgname);
643 					return -1;
644 				} else {
645 					enable_retry = ret;
646 				}
647 			}
648 
649 			/* Specify the retry delay time (in microseconds) on RX. */
650 			if (!strncmp(long_option[option_index].name, "rx-retry-delay", MAX_LONG_OPT_SZ)) {
651 				ret = parse_num_opt(optarg, INT32_MAX);
652 				if (ret == -1) {
653 					RTE_LOG(INFO, VHOST_CONFIG, "Invalid argument for rx-retry-delay [0-N]\n");
654 					us_vhost_usage(prgname);
655 					return -1;
656 				} else {
657 					burst_rx_delay_time = ret;
658 				}
659 			}
660 
661 			/* Specify the retries number on RX. */
662 			if (!strncmp(long_option[option_index].name, "rx-retry-num", MAX_LONG_OPT_SZ)) {
663 				ret = parse_num_opt(optarg, INT32_MAX);
664 				if (ret == -1) {
665 					RTE_LOG(INFO, VHOST_CONFIG, "Invalid argument for rx-retry-num [0-N]\n");
666 					us_vhost_usage(prgname);
667 					return -1;
668 				} else {
669 					burst_rx_retry_num = ret;
670 				}
671 			}
672 
673 			/* Enable/disable RX mergeable buffers. */
674 			if (!strncmp(long_option[option_index].name, "mergeable", MAX_LONG_OPT_SZ)) {
675 				ret = parse_num_opt(optarg, 1);
676 				if (ret == -1) {
677 					RTE_LOG(INFO, VHOST_CONFIG, "Invalid argument for mergeable [0|1]\n");
678 					us_vhost_usage(prgname);
679 					return -1;
680 				} else {
681 					if (ret) {
682 						vmdq_conf_default.rxmode.jumbo_frame = 1;
683 						vmdq_conf_default.rxmode.max_rx_pkt_len
684 							= JUMBO_FRAME_MAX_SIZE;
685 						VHOST_FEATURES = (1ULL << VIRTIO_NET_F_MRG_RXBUF);
686 					}
687 				}
688 			}
689 
690 			/* Enable/disable stats. */
691 			if (!strncmp(long_option[option_index].name, "stats", MAX_LONG_OPT_SZ)) {
692 				ret = parse_num_opt(optarg, INT32_MAX);
693 				if (ret == -1) {
694 					RTE_LOG(INFO, VHOST_CONFIG, "Invalid argument for stats [0..N]\n");
695 					us_vhost_usage(prgname);
696 					return -1;
697 				} else {
698 					enable_stats = ret;
699 				}
700 			}
701 
702 			/* Set character device basename. */
703 			if (!strncmp(long_option[option_index].name, "dev-basename", MAX_LONG_OPT_SZ)) {
704 				if (us_vhost_parse_basename(optarg) == -1) {
705 					RTE_LOG(INFO, VHOST_CONFIG, "Invalid argument for character device basename (Max %d characters)\n", MAX_BASENAME_SZ);
706 					us_vhost_usage(prgname);
707 					return -1;
708 				}
709 			}
710 
711 			/* Set character device index. */
712 			if (!strncmp(long_option[option_index].name, "dev-index", MAX_LONG_OPT_SZ)) {
713 				ret = parse_num_opt(optarg, INT32_MAX);
714 				if (ret == -1) {
715 					RTE_LOG(INFO, VHOST_CONFIG, "Invalid argument for character device index [0..N]\n");
716 					us_vhost_usage(prgname);
717 					return -1;
718 				} else
719 					dev_index = ret;
720 			}
721 
722 			/* Enable/disable rx/tx zero copy. */
723 			if (!strncmp(long_option[option_index].name,
724 				"zero-copy", MAX_LONG_OPT_SZ)) {
725 				ret = parse_num_opt(optarg, 1);
726 				if (ret == -1) {
727 					RTE_LOG(INFO, VHOST_CONFIG,
728 						"Invalid argument"
729 						" for zero-copy [0|1]\n");
730 					us_vhost_usage(prgname);
731 					return -1;
732 				} else
733 					zero_copy = ret;
734 
735 				if (zero_copy) {
736 #ifdef RTE_MBUF_REFCNT
737 					RTE_LOG(ERR, VHOST_CONFIG, "Before running "
738 					"zero copy vhost APP, please "
739 					"disable RTE_MBUF_REFCNT\n"
740 					"in config file and then rebuild DPDK "
741 					"core lib!\n"
742 					"Otherwise please disable zero copy "
743 					"flag in command line!\n");
744 					return -1;
745 #endif
746 				}
747 			}
748 
749 			/* Specify the descriptor number on RX. */
750 			if (!strncmp(long_option[option_index].name,
751 				"rx-desc-num", MAX_LONG_OPT_SZ)) {
752 				ret = parse_num_opt(optarg, MAX_RING_DESC);
753 				if ((ret == -1) || (!POWEROF2(ret))) {
754 					RTE_LOG(INFO, VHOST_CONFIG,
755 					"Invalid argument for rx-desc-num[0-N],"
756 					"power of 2 required.\n");
757 					us_vhost_usage(prgname);
758 					return -1;
759 				} else {
760 					num_rx_descriptor = ret;
761 				}
762 			}
763 
764 			/* Specify the descriptor number on TX. */
765 			if (!strncmp(long_option[option_index].name,
766 				"tx-desc-num", MAX_LONG_OPT_SZ)) {
767 				ret = parse_num_opt(optarg, MAX_RING_DESC);
768 				if ((ret == -1) || (!POWEROF2(ret))) {
769 					RTE_LOG(INFO, VHOST_CONFIG,
770 					"Invalid argument for tx-desc-num [0-N],"
771 					"power of 2 required.\n");
772 					us_vhost_usage(prgname);
773 					return -1;
774 				} else {
775 					num_tx_descriptor = ret;
776 				}
777 			}
778 
779 			break;
780 
781 			/* Invalid option - print options. */
782 		default:
783 			us_vhost_usage(prgname);
784 			return -1;
785 		}
786 	}
787 
788 	for (i = 0; i < RTE_MAX_ETHPORTS; i++) {
789 		if (enabled_port_mask & (1 << i))
790 			ports[num_ports++] = (uint8_t)i;
791 	}
792 
793 	if ((num_ports ==  0) || (num_ports > MAX_SUP_PORTS)) {
794 		RTE_LOG(INFO, VHOST_PORT, "Current enabled port number is %u,"
795 			"but only %u port can be enabled\n",num_ports, MAX_SUP_PORTS);
796 		return -1;
797 	}
798 
799 	if ((zero_copy == 1) && (vm2vm_mode == VM2VM_SOFTWARE)) {
800 		RTE_LOG(INFO, VHOST_PORT,
801 			"Vhost zero copy doesn't support software vm2vm,"
802 			"please specify 'vm2vm 2' to use hardware vm2vm.\n");
803 		return -1;
804 	}
805 
806 	if ((zero_copy == 1) && (vmdq_conf_default.rxmode.jumbo_frame == 1)) {
807 		RTE_LOG(INFO, VHOST_PORT,
808 			"Vhost zero copy doesn't support jumbo frame,"
809 			"please specify '--mergeable 0' to disable the "
810 			"mergeable feature.\n");
811 		return -1;
812 	}
813 
814 	return 0;
815 }
816 
817 /*
818  * Update the global num_ports variable and the ports array according to the
819  * number of ports in the system, and return the number of valid ports.
820  */
821 static unsigned check_ports_num(unsigned nb_ports)
822 {
823 	unsigned valid_num_ports = num_ports;
824 	unsigned portid;
825 
826 	if (num_ports > nb_ports) {
827 		RTE_LOG(INFO, VHOST_PORT, "\nSpecified port number(%u) exceeds total system port number(%u)\n",
828 			num_ports, nb_ports);
829 		num_ports = nb_ports;
830 	}
831 
832 	for (portid = 0; portid < num_ports; portid ++) {
833 		if (ports[portid] >= nb_ports) {
834 			RTE_LOG(INFO, VHOST_PORT, "\nSpecified port ID(%u) exceeds max system port ID(%u)\n",
835 				ports[portid], (nb_ports - 1));
836 			ports[portid] = INVALID_PORT_ID;
837 			valid_num_ports--;
838 		}
839 	}
840 	return valid_num_ports;
841 }
842 
843 /*
844  * Macro to print out packet contents. Wrapped in debug define so that the
845  * data path is not affected when debug is disabled.
846  */
847 #ifdef DEBUG
848 #define PRINT_PACKET(device, addr, size, header) do {																\
849 	char *pkt_addr = (char*)(addr);																					\
850 	unsigned int index;																								\
851 	char packet[MAX_PRINT_BUFF];																					\
852 																													\
853 	if ((header))																									\
854 		snprintf(packet, MAX_PRINT_BUFF, "(%"PRIu64") Header size %d: ", (device->device_fh), (size));				\
855 	else																											\
856 		snprintf(packet, MAX_PRINT_BUFF, "(%"PRIu64") Packet size %d: ", (device->device_fh), (size));				\
857 	for (index = 0; index < (size); index++) {																		\
858 		snprintf(packet + strnlen(packet, MAX_PRINT_BUFF), MAX_PRINT_BUFF - strnlen(packet, MAX_PRINT_BUFF),	\
859 			"%02hhx ", pkt_addr[index]);																			\
860 	}																												\
861 	snprintf(packet + strnlen(packet, MAX_PRINT_BUFF), MAX_PRINT_BUFF - strnlen(packet, MAX_PRINT_BUFF), "\n");	\
862 																													\
863 	LOG_DEBUG(VHOST_DATA, "%s", packet);																					\
864 } while(0)
865 #else
866 #define PRINT_PACKET(device, addr, size, header) do{} while(0)
867 #endif
868 
869 /*
870  * Function to convert guest physical addresses to vhost virtual addresses. This
871  * is used to convert virtio buffer addresses.
872  */
873 static inline uint64_t __attribute__((always_inline))
874 gpa_to_vva(struct virtio_net *dev, uint64_t guest_pa)
875 {
876 	struct virtio_memory_regions *region;
877 	uint32_t regionidx;
878 	uint64_t vhost_va = 0;
879 
880 	for (regionidx = 0; regionidx < dev->mem->nregions; regionidx++) {
881 		region = &dev->mem->regions[regionidx];
882 		if ((guest_pa >= region->guest_phys_address) &&
883 			(guest_pa <= region->guest_phys_address_end)) {
884 			vhost_va = region->address_offset + guest_pa;
885 			break;
886 		}
887 	}
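	/* vhost_va stays 0 if guest_pa is not covered by any registered region. */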
888 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") GPA %p| VVA %p\n",
889 		dev->device_fh, (void*)(uintptr_t)guest_pa, (void*)(uintptr_t)vhost_va);
890 
891 	return vhost_va;
892 }
893 
894 /*
895  * Function to convert guest physical addresses to vhost physical addresses.
896  * This is used to convert virtio buffer addresses.
897  */
898 static inline uint64_t __attribute__((always_inline))
899 gpa_to_hpa(struct virtio_net *dev, uint64_t guest_pa,
900 	uint32_t buf_len, hpa_type *addr_type)
901 {
902 	struct virtio_memory_regions_hpa *region;
903 	uint32_t regionidx;
904 	uint64_t vhost_pa = 0;
905 
906 	*addr_type = PHYS_ADDR_INVALID;
907 
908 	for (regionidx = 0; regionidx < dev->mem->nregions_hpa; regionidx++) {
909 		region = &dev->mem->regions_hpa[regionidx];
910 		if ((guest_pa >= region->guest_phys_address) &&
911 			(guest_pa <= region->guest_phys_address_end)) {
912 			vhost_pa = region->host_phys_addr_offset + guest_pa;
913 			if (likely((guest_pa + buf_len - 1)
914 				<= region->guest_phys_address_end))
915 				*addr_type = PHYS_ADDR_CONTINUOUS;
916 			else
917 				*addr_type = PHYS_ADDR_CROSS_SUBREG;
918 			break;
919 		}
920 	}
921 
922 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") GPA %p| HPA %p\n",
923 		dev->device_fh, (void *)(uintptr_t)guest_pa,
924 		(void *)(uintptr_t)vhost_pa);
925 
926 	return vhost_pa;
927 }
928 
929 /*
930  * This function adds buffers to the virtio device's RX virtqueue. Buffers can
931  * be received from the physical port or from another virtio device. A packet
932  * count is returned to indicate the number of packets that were successfully
933  * added to the RX queue. This function works when mergeable is disabled.
934  */
935 static inline uint32_t __attribute__((always_inline))
936 virtio_dev_rx(struct virtio_net *dev, struct rte_mbuf **pkts, uint32_t count)
937 {
938 	struct vhost_virtqueue *vq;
939 	struct vring_desc *desc;
940 	struct rte_mbuf *buff;
941 	/* The virtio_hdr is initialised to 0. */
942 	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0,0,0,0,0,0},0};
943 	uint64_t buff_addr = 0;
944 	uint64_t buff_hdr_addr = 0;
945 	uint32_t head[MAX_PKT_BURST], packet_len = 0;
946 	uint32_t head_idx, packet_success = 0;
947 	uint32_t retry = 0;
948 	uint16_t avail_idx, res_cur_idx;
949 	uint16_t res_base_idx, res_end_idx;
950 	uint16_t free_entries;
951 	uint8_t success = 0;
952 
953 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_rx()\n", dev->device_fh);
954 	vq = dev->virtqueue[VIRTIO_RXQ];
955 	count = (count > MAX_PKT_BURST) ? MAX_PKT_BURST : count;
956 
957 	/* As many data cores may want access to available buffers, they need to be reserved. */
958 	do {
959 		res_base_idx = vq->last_used_idx_res;
960 		avail_idx = *((volatile uint16_t *)&vq->avail->idx);
961 
962 		free_entries = (avail_idx - res_base_idx);
963 		/* If retry is enabled and the queue is full then we wait and retry to avoid packet loss. */
964 		if (enable_retry && unlikely(count > free_entries)) {
965 			for (retry = 0; retry < burst_rx_retry_num; retry++) {
966 				rte_delay_us(burst_rx_delay_time);
967 				avail_idx =
968 					*((volatile uint16_t *)&vq->avail->idx);
969 				free_entries = (avail_idx - res_base_idx);
970 				if (count <= free_entries)
971 					break;
972 			}
973 		}
974 
975 		/*check that we have enough buffers*/
976 		if (unlikely(count > free_entries))
977 			count = free_entries;
978 
979 		if (count == 0)
980 			return 0;
981 
982 		res_end_idx = res_base_idx + count;
983 		/* vq->last_used_idx_res is atomically updated. */
984 		success = rte_atomic16_cmpset(&vq->last_used_idx_res, res_base_idx,
985 									res_end_idx);
986 	} while (unlikely(success == 0));
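	/*
	 * The compare-and-set above reserves the range [res_base_idx, res_end_idx)
	 * for this core; other cores racing on the same virtqueue will reserve
	 * ranges that start at or after res_end_idx.
	 */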
987 	res_cur_idx = res_base_idx;
988 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| End Index %d\n", dev->device_fh, res_cur_idx, res_end_idx);
989 
990 	/* Prefetch available ring to retrieve indexes. */
991 	rte_prefetch0(&vq->avail->ring[res_cur_idx & (vq->size - 1)]);
992 
993 	/* Retrieve all of the head indexes first to avoid caching issues. */
994 	for (head_idx = 0; head_idx < count; head_idx++)
995 		head[head_idx] = vq->avail->ring[(res_cur_idx + head_idx) & (vq->size - 1)];
996 
997 	/*Prefetch descriptor index. */
998 	rte_prefetch0(&vq->desc[head[packet_success]]);
999 
1000 	while (res_cur_idx != res_end_idx) {
1001 		/* Get descriptor from available ring */
1002 		desc = &vq->desc[head[packet_success]];
1003 
1004 		buff = pkts[packet_success];
1005 
1006 		/* Convert from gpa to vva (guest physical addr -> vhost virtual addr) */
1007 		buff_addr = gpa_to_vva(dev, desc->addr);
1008 		/* Prefetch buffer address. */
1009 		rte_prefetch0((void*)(uintptr_t)buff_addr);
1010 
1011 		/* Copy virtio_hdr to packet and increment buffer address */
1012 		buff_hdr_addr = buff_addr;
1013 		packet_len = rte_pktmbuf_data_len(buff) + vq->vhost_hlen;
1014 
1015 		/*
1016 		 * If the descriptors are chained the header and data are
1017 		 * placed in separate buffers.
1018 		 */
1019 		if (desc->flags & VRING_DESC_F_NEXT) {
1020 			desc->len = vq->vhost_hlen;
1021 			desc = &vq->desc[desc->next];
1022 			/* Buffer address translation. */
1023 			buff_addr = gpa_to_vva(dev, desc->addr);
1024 			desc->len = rte_pktmbuf_data_len(buff);
1025 		} else {
1026 			buff_addr += vq->vhost_hlen;
1027 			desc->len = packet_len;
1028 		}
1029 
1030 		/* Update used ring with desc information */
1031 		vq->used->ring[res_cur_idx & (vq->size - 1)].id = head[packet_success];
1032 		vq->used->ring[res_cur_idx & (vq->size - 1)].len = packet_len;
1033 
1034 		/* Copy mbuf data to buffer */
1035 		rte_memcpy((void *)(uintptr_t)buff_addr,
1036 			(const void *)buff->pkt.data,
1037 			rte_pktmbuf_data_len(buff));
1038 		PRINT_PACKET(dev, (uintptr_t)buff_addr,
1039 			rte_pktmbuf_data_len(buff), 0);
1040 
1041 		res_cur_idx++;
1042 		packet_success++;
1043 
1044 		rte_memcpy((void *)(uintptr_t)buff_hdr_addr,
1045 			(const void *)&virtio_hdr, vq->vhost_hlen);
1046 
1047 		PRINT_PACKET(dev, (uintptr_t)buff_hdr_addr, vq->vhost_hlen, 1);
1048 
1049 		if (res_cur_idx < res_end_idx) {
1050 			/* Prefetch descriptor index. */
1051 			rte_prefetch0(&vq->desc[head[packet_success]]);
1052 		}
1053 	}
1054 
1055 	rte_compiler_barrier();
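	/*
	 * The compiler barrier keeps the buffer and header copies above from
	 * being reordered past the used->idx update below.
	 */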
1056 
1057 	/* Wait until it's our turn to add our buffer to the used ring. */
1058 	while (unlikely(vq->last_used_idx != res_base_idx))
1059 		rte_pause();
1060 
1061 	*(volatile uint16_t *)&vq->used->idx += count;
1062 	vq->last_used_idx = res_end_idx;
1063 
1064 	/* Kick the guest if necessary. */
1065 	if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
1066 		eventfd_write((int)vq->kickfd, 1);
1067 	return count;
1068 }
1069 
1070 static inline uint32_t __attribute__((always_inline))
1071 copy_from_mbuf_to_vring(struct virtio_net *dev,
1072 	uint16_t res_base_idx, uint16_t res_end_idx,
1073 	struct rte_mbuf *pkt)
1074 {
1075 	uint32_t vec_idx = 0;
1076 	uint32_t entry_success = 0;
1077 	struct vhost_virtqueue *vq;
1078 	/* The virtio_hdr is initialised to 0. */
1079 	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {
1080 		{0, 0, 0, 0, 0, 0}, 0};
1081 	uint16_t cur_idx = res_base_idx;
1082 	uint64_t vb_addr = 0;
1083 	uint64_t vb_hdr_addr = 0;
1084 	uint32_t seg_offset = 0;
1085 	uint32_t vb_offset = 0;
1086 	uint32_t seg_avail;
1087 	uint32_t vb_avail;
1088 	uint32_t cpy_len, entry_len;
1089 
1090 	if (pkt == NULL)
1091 		return 0;
1092 
1093 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| "
1094 		"End Index %d\n",
1095 		dev->device_fh, cur_idx, res_end_idx);
1096 
1097 	/*
1098 	 * Convert from gpa to vva
1099 	 * (guest physical addr -> vhost virtual addr)
1100 	 */
1101 	vq = dev->virtqueue[VIRTIO_RXQ];
1102 	vb_addr =
1103 		gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
1104 	vb_hdr_addr = vb_addr;
1105 
1106 	/* Prefetch buffer address. */
1107 	rte_prefetch0((void *)(uintptr_t)vb_addr);
1108 
1109 	virtio_hdr.num_buffers = res_end_idx - res_base_idx;
1110 
1111 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") RX: Num merge buffers %d\n",
1112 		dev->device_fh, virtio_hdr.num_buffers);
1113 
1114 	rte_memcpy((void *)(uintptr_t)vb_hdr_addr,
1115 		(const void *)&virtio_hdr, vq->vhost_hlen);
1116 
1117 	PRINT_PACKET(dev, (uintptr_t)vb_hdr_addr, vq->vhost_hlen, 1);
1118 
1119 	seg_avail = rte_pktmbuf_data_len(pkt);
1120 	vb_offset = vq->vhost_hlen;
1121 	vb_avail =
1122 		vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
1123 
1124 	entry_len = vq->vhost_hlen;
1125 
1126 	if (vb_avail == 0) {
1127 		uint32_t desc_idx =
1128 			vq->buf_vec[vec_idx].desc_idx;
1129 		vq->desc[desc_idx].len = vq->vhost_hlen;
1130 
1131 		if ((vq->desc[desc_idx].flags
1132 			& VRING_DESC_F_NEXT) == 0) {
1133 			/* Update used ring with desc information */
1134 			vq->used->ring[cur_idx & (vq->size - 1)].id
1135 				= vq->buf_vec[vec_idx].desc_idx;
1136 			vq->used->ring[cur_idx & (vq->size - 1)].len
1137 				= entry_len;
1138 
1139 			entry_len = 0;
1140 			cur_idx++;
1141 			entry_success++;
1142 		}
1143 
1144 		vec_idx++;
1145 		vb_addr =
1146 			gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
1147 
1148 		/* Prefetch buffer address. */
1149 		rte_prefetch0((void *)(uintptr_t)vb_addr);
1150 		vb_offset = 0;
1151 		vb_avail = vq->buf_vec[vec_idx].buf_len;
1152 	}
1153 
1154 	cpy_len = RTE_MIN(vb_avail, seg_avail);
1155 
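	/*
	 * Walk the mbuf segment chain and the guest buffer list in parallel,
	 * copying min(vb_avail, seg_avail) bytes at a time and advancing
	 * whichever side has been exhausted.
	 */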
1156 	while (cpy_len > 0) {
1157 		/* Copy mbuf data to vring buffer */
1158 		rte_memcpy((void *)(uintptr_t)(vb_addr + vb_offset),
1159 			(const void *)(rte_pktmbuf_mtod(pkt, char*) + seg_offset),
1160 			cpy_len);
1161 
1162 		PRINT_PACKET(dev,
1163 			(uintptr_t)(vb_addr + vb_offset),
1164 			cpy_len, 0);
1165 
1166 		seg_offset += cpy_len;
1167 		vb_offset += cpy_len;
1168 		seg_avail -= cpy_len;
1169 		vb_avail -= cpy_len;
1170 		entry_len += cpy_len;
1171 
1172 		if (seg_avail != 0) {
1173 			/*
1174 			 * The virtio buffer in this vring
1175 			 * entry is used up, but the mbuf
1176 			 * segment has not been fully copied yet.
1177 			 */
1178 			if ((vq->desc[vq->buf_vec[vec_idx].desc_idx].flags &
1179 				VRING_DESC_F_NEXT) == 0) {
1180 				/* Update used ring with desc information */
1181 				vq->used->ring[cur_idx & (vq->size - 1)].id
1182 					= vq->buf_vec[vec_idx].desc_idx;
1183 				vq->used->ring[cur_idx & (vq->size - 1)].len
1184 					= entry_len;
1185 				entry_len = 0;
1186 				cur_idx++;
1187 				entry_success++;
1188 			}
1189 
1190 			vec_idx++;
1191 			vb_addr = gpa_to_vva(dev,
1192 				vq->buf_vec[vec_idx].buf_addr);
1193 			vb_offset = 0;
1194 			vb_avail = vq->buf_vec[vec_idx].buf_len;
1195 			cpy_len = RTE_MIN(vb_avail, seg_avail);
1196 		} else {
1197 			/*
1198 			 * The current segment is complete; continue to
1199 			 * check whether the whole packet is complete.
1200 			 */
1201 			pkt = pkt->pkt.next;
1202 			if (pkt != NULL) {
1203 				/*
1204 				 * There are more segments.
1205 				 */
1206 				if (vb_avail == 0) {
1207 					/*
1208 					 * The current buffer from the vring is
1209 					 * used up; fetch the next buffer
1210 					 * from buf_vec.
1211 					 */
1212 					uint32_t desc_idx =
1213 						vq->buf_vec[vec_idx].desc_idx;
1214 					vq->desc[desc_idx].len = vb_offset;
1215 
1216 					if ((vq->desc[desc_idx].flags &
1217 						VRING_DESC_F_NEXT) == 0) {
1218 						uint16_t wrapped_idx =
1219 							cur_idx & (vq->size - 1);
1220 						/*
1221 						 * Update used ring with the
1222 						 * descriptor information
1223 						 */
1224 						vq->used->ring[wrapped_idx].id
1225 							= desc_idx;
1226 						vq->used->ring[wrapped_idx].len
1227 							= entry_len;
1228 						entry_success++;
1229 						entry_len = 0;
1230 						cur_idx++;
1231 					}
1232 
1233 					/* Get next buffer from buf_vec. */
1234 					vec_idx++;
1235 					vb_addr = gpa_to_vva(dev,
1236 						vq->buf_vec[vec_idx].buf_addr);
1237 					vb_avail =
1238 						vq->buf_vec[vec_idx].buf_len;
1239 					vb_offset = 0;
1240 				}
1241 
1242 				seg_offset = 0;
1243 				seg_avail = rte_pktmbuf_data_len(pkt);
1244 				cpy_len = RTE_MIN(vb_avail, seg_avail);
1245 			} else {
1246 				/*
1247 				 * This whole packet completes.
1248 				 */
1249 				uint32_t desc_idx =
1250 					vq->buf_vec[vec_idx].desc_idx;
1251 				vq->desc[desc_idx].len = vb_offset;
1252 
1253 				while (vq->desc[desc_idx].flags &
1254 					VRING_DESC_F_NEXT) {
1255 					desc_idx = vq->desc[desc_idx].next;
1256 					vq->desc[desc_idx].len = 0;
1257 				}
1258 
1259 				/* Update used ring with desc information */
1260 				vq->used->ring[cur_idx & (vq->size - 1)].id
1261 					= vq->buf_vec[vec_idx].desc_idx;
1262 				vq->used->ring[cur_idx & (vq->size - 1)].len
1263 					= entry_len;
1264 				entry_len = 0;
1265 				cur_idx++;
1266 				entry_success++;
1267 				seg_avail = 0;
1268 				cpy_len = RTE_MIN(vb_avail, seg_avail);
1269 			}
1270 		}
1271 	}
1272 
1273 	return entry_success;
1274 }
1275 
1276 /*
1277  * This function adds buffers to the virtio device's RX virtqueue. Buffers can
1278  * be received from the physical port or from another virtio device. A packet
1279  * count is returned to indicate the number of packets that were successfully
1280  * added to the RX queue. This function works for mergeable RX.
1281  */
1282 static inline uint32_t __attribute__((always_inline))
1283 virtio_dev_merge_rx(struct virtio_net *dev, struct rte_mbuf **pkts,
1284 	uint32_t count)
1285 {
1286 	struct vhost_virtqueue *vq;
1287 	uint32_t pkt_idx = 0, entry_success = 0;
1288 	uint32_t retry = 0;
1289 	uint16_t avail_idx, res_cur_idx;
1290 	uint16_t res_base_idx, res_end_idx;
1291 	uint8_t success = 0;
1292 
1293 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_merge_rx()\n",
1294 		dev->device_fh);
1295 	vq = dev->virtqueue[VIRTIO_RXQ];
1296 	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
1297 
1298 	if (count == 0)
1299 		return 0;
1300 
1301 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
1302 		uint32_t secure_len = 0;
1303 		uint16_t need_cnt;
1304 		uint32_t vec_idx = 0;
1305 		uint32_t pkt_len = pkts[pkt_idx]->pkt.pkt_len + vq->vhost_hlen;
1306 		uint16_t i, id;
1307 
1308 		do {
1309 			/*
1310 			 * As many data cores may want access to available
1311 			 * buffers, they need to be reserved.
1312 			 */
1313 			res_base_idx = vq->last_used_idx_res;
1314 			res_cur_idx = res_base_idx;
1315 
1316 			do {
1317 				avail_idx = *((volatile uint16_t *)&vq->avail->idx);
1318 				if (unlikely(res_cur_idx == avail_idx)) {
1319 					/*
1320 					 * If retry is enabled and the queue is
1321 					 * full then we wait and retry to avoid
1322 					 * packet loss.
1323 					 */
1324 					if (enable_retry) {
1325 						uint8_t cont = 0;
1326 						for (retry = 0; retry < burst_rx_retry_num; retry++) {
1327 							rte_delay_us(burst_rx_delay_time);
1328 							avail_idx =
1329 								*((volatile uint16_t *)&vq->avail->idx);
1330 							if (likely(res_cur_idx != avail_idx)) {
1331 								cont = 1;
1332 								break;
1333 							}
1334 						}
1335 						if (cont == 1)
1336 							continue;
1337 					}
1338 
1339 					LOG_DEBUG(VHOST_DATA,
1340 						"(%"PRIu64") Failed "
1341 						"to get enough desc from "
1342 						"vring\n",
1343 						dev->device_fh);
1344 					return pkt_idx;
1345 				} else {
1346 					uint16_t wrapped_idx =
1347 						(res_cur_idx) & (vq->size - 1);
1348 					uint32_t idx =
1349 						vq->avail->ring[wrapped_idx];
1350 					uint8_t next_desc;
1351 
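					/*
					 * Walk the descriptor chain starting at idx and
					 * accumulate its total length into secure_len.
					 */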
1352 					do {
1353 						next_desc = 0;
1354 						secure_len += vq->desc[idx].len;
1355 						if (vq->desc[idx].flags &
1356 							VRING_DESC_F_NEXT) {
1357 							idx = vq->desc[idx].next;
1358 							next_desc = 1;
1359 						}
1360 					} while (next_desc);
1361 
1362 					res_cur_idx++;
1363 				}
1364 			} while (pkt_len > secure_len);
1365 
1366 			/* vq->last_used_idx_res is atomically updated. */
1367 			success = rte_atomic16_cmpset(&vq->last_used_idx_res,
1368 							res_base_idx,
1369 							res_cur_idx);
1370 		} while (success == 0);
1371 
1372 		id = res_base_idx;
1373 		need_cnt = res_cur_idx - res_base_idx;
1374 
1375 		for (i = 0; i < need_cnt; i++, id++) {
1376 			uint16_t wrapped_idx = id & (vq->size - 1);
1377 			uint32_t idx = vq->avail->ring[wrapped_idx];
1378 			uint8_t next_desc;
1379 			do {
1380 				next_desc = 0;
1381 				vq->buf_vec[vec_idx].buf_addr =
1382 					vq->desc[idx].addr;
1383 				vq->buf_vec[vec_idx].buf_len =
1384 					vq->desc[idx].len;
1385 				vq->buf_vec[vec_idx].desc_idx = idx;
1386 				vec_idx++;
1387 
1388 				if (vq->desc[idx].flags & VRING_DESC_F_NEXT) {
1389 					idx = vq->desc[idx].next;
1390 					next_desc = 1;
1391 				}
1392 			} while (next_desc);
1393 		}
1394 
1395 		res_end_idx = res_cur_idx;
1396 
1397 		entry_success = copy_from_mbuf_to_vring(dev, res_base_idx,
1398 			res_end_idx, pkts[pkt_idx]);
1399 
1400 		rte_compiler_barrier();
1401 
1402 		/*
1403 		 * Wait until it's our turn to add our buffer
1404 		 * to the used ring.
1405 		 */
1406 		while (unlikely(vq->last_used_idx != res_base_idx))
1407 			rte_pause();
1408 
1409 		*(volatile uint16_t *)&vq->used->idx += entry_success;
1410 		vq->last_used_idx = res_end_idx;
1411 
1412 		/* Kick the guest if necessary. */
1413 		if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
1414 			eventfd_write((int)vq->kickfd, 1);
1415 	}
1416 
1417 	return count;
1418 }
1419 
1420 /*
1421  * Compares a packet destination MAC address to a device MAC address.
1422  */
1423 static inline int __attribute__((always_inline))
1424 ether_addr_cmp(struct ether_addr *ea, struct ether_addr *eb)
1425 {
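	/*
	 * Both addresses are loaded as 64-bit words; MAC_ADDR_CMP masks the XOR
	 * down to the low 48 bits so only the six MAC bytes are compared.
	 */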
1426 	return (((*(uint64_t *)ea ^ *(uint64_t *)eb) & MAC_ADDR_CMP) == 0);
1427 }
1428 
1429 /*
1430  * This function learns the MAC address of the device and registers it, along
1431  * with a VLAN tag, with a VMDQ pool.
1432  */
1433 static int
1434 link_vmdq(struct virtio_net *dev, struct rte_mbuf *m)
1435 {
1436 	struct ether_hdr *pkt_hdr;
1437 	struct virtio_net_data_ll *dev_ll;
1438 	int i, ret;
1439 
1440 	/* Learn MAC address of guest device from packet */
1441 	pkt_hdr = (struct ether_hdr *)m->pkt.data;
1442 
1443 	dev_ll = ll_root_used;
1444 
1445 	while (dev_ll != NULL) {
1446 		if (ether_addr_cmp(&(pkt_hdr->s_addr), &dev_ll->dev->mac_address)) {
1447 			RTE_LOG(INFO, VHOST_DATA, "(%"PRIu64") WARNING: This device is using an existing MAC address and has not been registered.\n", dev->device_fh);
1448 			return -1;
1449 		}
1450 		dev_ll = dev_ll->next;
1451 	}
1452 
1453 	for (i = 0; i < ETHER_ADDR_LEN; i++)
1454 		dev->mac_address.addr_bytes[i] = pkt_hdr->s_addr.addr_bytes[i];
1455 
1456 	/* vlan_tag currently uses the device_id. */
1457 	dev->vlan_tag = vlan_tags[dev->device_fh];
1458 
1459 	/* Print out VMDQ registration info. */
1460 	RTE_LOG(INFO, VHOST_DATA, "(%"PRIu64") MAC_ADDRESS %02x:%02x:%02x:%02x:%02x:%02x and VLAN_TAG %d registered\n",
1461 		dev->device_fh,
1462 		dev->mac_address.addr_bytes[0], dev->mac_address.addr_bytes[1],
1463 		dev->mac_address.addr_bytes[2], dev->mac_address.addr_bytes[3],
1464 		dev->mac_address.addr_bytes[4], dev->mac_address.addr_bytes[5],
1465 		dev->vlan_tag);
1466 
1467 	/* Register the MAC address. */
1468 	ret = rte_eth_dev_mac_addr_add(ports[0], &dev->mac_address, (uint32_t)dev->device_fh);
1469 	if (ret)
1470 		RTE_LOG(ERR, VHOST_DATA, "(%"PRIu64") Failed to add device MAC address to VMDQ\n",
1471 					dev->device_fh);
1472 
1473 	/* Enable stripping of the vlan tag as we handle routing. */
1474 	rte_eth_dev_set_vlan_strip_on_queue(ports[0], (uint16_t)dev->vmdq_rx_q, 1);
1475 
1476 	/* Set device as ready for RX. */
1477 	dev->ready = DEVICE_RX;
1478 
1479 	return 0;
1480 }
1481 
1482 /*
1483  * Removes MAC address and vlan tag from VMDQ. Ensures that nothing is adding buffers to the RX
1484  * queue before disabling RX on the device.
1485  */
1486 static inline void
1487 unlink_vmdq(struct virtio_net *dev)
1488 {
1489 	unsigned i = 0;
1490 	unsigned rx_count;
1491 	struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
1492 
1493 	if (dev->ready == DEVICE_RX) {
1494 		/*clear MAC and VLAN settings*/
1495 		rte_eth_dev_mac_addr_remove(ports[0], &dev->mac_address);
1496 		for (i = 0; i < 6; i++)
1497 			dev->mac_address.addr_bytes[i] = 0;
1498 
1499 		dev->vlan_tag = 0;
1500 
1501 		/*Clear out the receive buffers*/
1502 		rx_count = rte_eth_rx_burst(ports[0],
1503 					(uint16_t)dev->vmdq_rx_q, pkts_burst, MAX_PKT_BURST);
1504 
1505 		while (rx_count) {
1506 			for (i = 0; i < rx_count; i++)
1507 				rte_pktmbuf_free(pkts_burst[i]);
1508 
1509 			rx_count = rte_eth_rx_burst(ports[0],
1510 					(uint16_t)dev->vmdq_rx_q, pkts_burst, MAX_PKT_BURST);
1511 		}
1512 
1513 		dev->ready = DEVICE_MAC_LEARNING;
1514 	}
1515 }
1516 
1517 /*
1518  * Check if the packet destination MAC address is for a local device. If so then put
1519  * the packet on that device's RX queue. If not then return.
1520  */
1521 static inline unsigned __attribute__((always_inline))
1522 virtio_tx_local(struct virtio_net *dev, struct rte_mbuf *m)
1523 {
1524 	struct virtio_net_data_ll *dev_ll;
1525 	struct ether_hdr *pkt_hdr;
1526 	uint64_t ret = 0;
1527 
1528 	pkt_hdr = (struct ether_hdr *)m->pkt.data;
1529 
1530 	/*get the used devices list*/
1531 	dev_ll = ll_root_used;
1532 
1533 	while (dev_ll != NULL) {
1534 		if ((dev_ll->dev->ready == DEVICE_RX) && ether_addr_cmp(&(pkt_hdr->d_addr),
1535 				          &dev_ll->dev->mac_address)) {
1536 
1537 			/* Drop the packet if the TX packet is destined for the TX device. */
1538 			if (dev_ll->dev->device_fh == dev->device_fh) {
1539 				LOG_DEBUG(VHOST_DATA, "(%"PRIu64") TX: Source and destination MAC addresses are the same. Dropping packet.\n",
1540 							dev_ll->dev->device_fh);
1541 				return 0;
1542 			}
1543 
1544 
1545 			LOG_DEBUG(VHOST_DATA, "(%"PRIu64") TX: MAC address is local\n", dev_ll->dev->device_fh);
1546 
1547 			if (dev_ll->dev->remove) {
1548 				/*drop the packet if the device is marked for removal*/
1549 				LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Device is marked for removal\n", dev_ll->dev->device_fh);
1550 			} else {
1551 				uint32_t mergeable =
1552 					dev_ll->dev->features &
1553 					(1 << VIRTIO_NET_F_MRG_RXBUF);
1554 
1555 				/*send the packet to the local virtio device*/
1556 				if (likely(mergeable == 0))
1557 					ret = virtio_dev_rx(dev_ll->dev, &m, 1);
1558 				else
1559 					ret = virtio_dev_merge_rx(dev_ll->dev,
1560 						&m, 1);
1561 
1562 				if (enable_stats) {
1563 					rte_atomic64_add(
1564 					&dev_statistics[dev_ll->dev->device_fh].rx_total_atomic,
1565 					1);
1566 					rte_atomic64_add(
1567 					&dev_statistics[dev_ll->dev->device_fh].rx_atomic,
1568 					ret);
1569 					dev_statistics[dev->device_fh].tx_total++;
1570 					dev_statistics[dev->device_fh].tx += ret;
1571 				}
1572 			}
1573 
1574 			return 0;
1575 		}
1576 		dev_ll = dev_ll->next;
1577 	}
1578 
1579 	return -1;
1580 }
1581 
1582 /*
1583  * This function routes the TX packet to the correct interface. This may be a local device
1584  * or the physical port.
1585  */
1586 static inline void __attribute__((always_inline))
1587 virtio_tx_route(struct virtio_net* dev, struct rte_mbuf *m, struct rte_mempool *mbuf_pool, uint16_t vlan_tag)
1588 {
1589 	struct mbuf_table *tx_q;
1590 	struct vlan_ethhdr *vlan_hdr;
1591 	struct rte_mbuf **m_table;
1592 	struct rte_mbuf *mbuf, *prev;
1593 	unsigned len, ret, offset = 0;
1594 	const uint16_t lcore_id = rte_lcore_id();
1595 	struct virtio_net_data_ll *dev_ll = ll_root_used;
1596 	struct ether_hdr *pkt_hdr = (struct ether_hdr *)m->pkt.data;
1597 
1598 	/*check if destination is local VM*/
1599 	if ((vm2vm_mode == VM2VM_SOFTWARE) && (virtio_tx_local(dev, m) == 0))
1600 		return;
1601 
1602 	if (vm2vm_mode == VM2VM_HARDWARE) {
1603 		while (dev_ll != NULL) {
1604 			if ((dev_ll->dev->ready == DEVICE_RX)
1605 				&& ether_addr_cmp(&(pkt_hdr->d_addr),
1606 				&dev_ll->dev->mac_address)) {
1607 				/*
1608 				 * Drop the packet if the TX packet is
1609 				 * destined for the TX device.
1610 				 */
1611 				if (dev_ll->dev->device_fh == dev->device_fh) {
1612 					LOG_DEBUG(VHOST_DATA,
1613 					"(%"PRIu64") TX: Source and destination"
1614 					" MAC addresses are the same. Dropping "
1615 					"packet.\n",
1616 					dev_ll->dev->device_fh);
1617 					return;
1618 				}
1619 				offset = 4;
1620 				vlan_tag =
1621 				(uint16_t)
1622 				vlan_tags[(uint16_t)dev_ll->dev->device_fh];
1623 
1624 				LOG_DEBUG(VHOST_DATA,
1625 				"(%"PRIu64") TX: pkt to local VM device id:"
1626 				"(%"PRIu64") vlan tag: %d.\n",
1627 				dev->device_fh, dev_ll->dev->device_fh,
1628 				vlan_tag);
1629 
1630 				break;
1631 			}
1632 			dev_ll = dev_ll->next;
1633 		}
1634 	}
1635 
1636 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") TX: MAC address is external\n", dev->device_fh);
1637 
1638 	/*Add packet to the port tx queue*/
1639 	tx_q = &lcore_tx_queue[lcore_id];
1640 	len = tx_q->len;
1641 
1642 	/* Allocate an mbuf and populate the structure. */
1643 	mbuf = rte_pktmbuf_alloc(mbuf_pool);
1644 	if (unlikely(mbuf == NULL)) {
1645 		RTE_LOG(ERR, VHOST_DATA,
1646 			"Failed to allocate memory for mbuf.\n");
1647 		return;
1648 	}
1649 
1650 	mbuf->pkt.data_len = m->pkt.data_len + VLAN_HLEN + offset;
1651 	mbuf->pkt.pkt_len = m->pkt.pkt_len + VLAN_HLEN + offset;
1652 	mbuf->pkt.nb_segs = m->pkt.nb_segs;
1653 
1654 	/* Copy ethernet header to mbuf. */
1655 	rte_memcpy((void*)mbuf->pkt.data, (const void*)m->pkt.data, ETH_HLEN);
1656 
1657 
1658 	/* Set up the VLAN header. Multi-byte fields are converted to network byte order with htons(). */
1659 	vlan_hdr = (struct vlan_ethhdr *) mbuf->pkt.data;
1660 	vlan_hdr->h_vlan_encapsulated_proto = vlan_hdr->h_vlan_proto;
1661 	vlan_hdr->h_vlan_proto = htons(ETH_P_8021Q);
1662 	vlan_hdr->h_vlan_TCI = htons(vlan_tag);
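	/*
	 * The original EtherType has been moved into h_vlan_encapsulated_proto,
	 * so the frame now carries an 802.1Q tag (0x8100 plus the TCI) ahead of
	 * the untouched payload.
	 */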
1663 
1664 	/* Copy the remaining packet contents to the mbuf. */
1665 	rte_memcpy((void*) ((uint8_t*)mbuf->pkt.data + VLAN_ETH_HLEN),
1666 		(const void*) ((uint8_t*)m->pkt.data + ETH_HLEN), (m->pkt.data_len - ETH_HLEN));
1667 
1668 	/* Copy the remaining segments for the whole packet. */
1669 	prev = mbuf;
1670 	while (m->pkt.next) {
1671 		/* Allocate an mbuf and populate the structure. */
1672 		struct rte_mbuf *next_mbuf = rte_pktmbuf_alloc(mbuf_pool);
1673 		if (unlikely(next_mbuf == NULL)) {
1674 			rte_pktmbuf_free(mbuf);
1675 			RTE_LOG(ERR, VHOST_DATA,
1676 				"Failed to allocate memory for mbuf.\n");
1677 			return;
1678 		}
1679 
1680 		m = m->pkt.next;
1681 		prev->pkt.next = next_mbuf;
1682 		prev = next_mbuf;
1683 		next_mbuf->pkt.data_len = m->pkt.data_len;
1684 
1685 		/* Copy data to next mbuf. */
1686 		rte_memcpy(rte_pktmbuf_mtod(next_mbuf, void *),
1687 			rte_pktmbuf_mtod(m, const void *), m->pkt.data_len);
1688 	}
1689 
1690 	tx_q->m_table[len] = mbuf;
1691 	len++;
1692 	if (enable_stats) {
1693 		dev_statistics[dev->device_fh].tx_total++;
1694 		dev_statistics[dev->device_fh].tx++;
1695 	}
1696 
1697 	if (unlikely(len == MAX_PKT_BURST)) {
1698 		m_table = (struct rte_mbuf **)tx_q->m_table;
1699 		ret = rte_eth_tx_burst(ports[0], (uint16_t)tx_q->txq_id, m_table, (uint16_t) len);
1700 		/* Free any buffers not handled by TX and update the port stats. */
1701 		if (unlikely(ret < len)) {
1702 			do {
1703 				rte_pktmbuf_free(m_table[ret]);
1704 			} while (++ret < len);
1705 		}
1706 
1707 		len = 0;
1708 	}
1709 
1710 	tx_q->len = len;
1711 	return;
1712 }
1713 
1714 static inline void __attribute__((always_inline))
1715 virtio_dev_tx(struct virtio_net* dev, struct rte_mempool *mbuf_pool)
1716 {
1717 	struct rte_mbuf m;
1718 	struct vhost_virtqueue *vq;
1719 	struct vring_desc *desc;
1720 	uint64_t buff_addr = 0;
1721 	uint32_t head[MAX_PKT_BURST];
1722 	uint32_t used_idx;
1723 	uint32_t i;
1724 	uint16_t free_entries, packet_success = 0;
1725 	uint16_t avail_idx;
1726 
1727 	vq = dev->virtqueue[VIRTIO_TXQ];
1728 	avail_idx =  *((volatile uint16_t *)&vq->avail->idx);
1729 
1730 	/* If there are no available buffers then return. */
1731 	if (vq->last_used_idx == avail_idx)
1732 		return;
1733 
1734 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_tx()\n", dev->device_fh);
1735 
1736 	/* Prefetch available ring to retrieve head indexes. */
1737 	rte_prefetch0(&vq->avail->ring[vq->last_used_idx & (vq->size - 1)]);
1738 
1739 	/*get the number of free entries in the ring*/
1740 	free_entries = (avail_idx - vq->last_used_idx);
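	/* uint16_t arithmetic keeps the subtraction correct across ring index wrap-around. */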
1741 
1742 	/* Limit to MAX_PKT_BURST. */
1743 	if (free_entries > MAX_PKT_BURST)
1744 		free_entries = MAX_PKT_BURST;
1745 
1746 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Buffers available %d\n", dev->device_fh, free_entries);
1747 	/* Retrieve all of the head indexes first to avoid caching issues. */
1748 	for (i = 0; i < free_entries; i++)
1749 		head[i] = vq->avail->ring[(vq->last_used_idx + i) & (vq->size - 1)];
1750 
1751 	/* Prefetch descriptor index. */
1752 	rte_prefetch0(&vq->desc[head[packet_success]]);
1753 	rte_prefetch0(&vq->used->ring[vq->last_used_idx & (vq->size - 1)]);
1754 
1755 	while (packet_success < free_entries) {
1756 		desc = &vq->desc[head[packet_success]];
1757 
1758 		/* Discard first buffer as it is the virtio header */
1759 		desc = &vq->desc[desc->next];
1760 
1761 		/* Buffer address translation. */
1762 		buff_addr = gpa_to_vva(dev, desc->addr);
1763 		/* Prefetch buffer address. */
1764 		rte_prefetch0((void*)(uintptr_t)buff_addr);
1765 
1766 		used_idx = vq->last_used_idx & (vq->size - 1);
1767 
1768 		if (packet_success < (free_entries - 1)) {
1769 			/* Prefetch descriptor index. */
1770 			rte_prefetch0(&vq->desc[head[packet_success+1]]);
1771 			rte_prefetch0(&vq->used->ring[(used_idx + 1) & (vq->size - 1)]);
1772 		}
1773 
1774 		/* Update used index buffer information. */
1775 		vq->used->ring[used_idx].id = head[packet_success];
1776 		vq->used->ring[used_idx].len = 0;
1777 
1778 		/* Setup dummy mbuf. This is copied to a real mbuf if transmitted out the physical port. */
1779 		m.pkt.data_len = desc->len;
1780 		m.pkt.pkt_len = desc->len;
1781 		m.pkt.data = (void*)(uintptr_t)buff_addr;
1782 
1783 		PRINT_PACKET(dev, (uintptr_t)buff_addr, desc->len, 0);
1784 
1785 		/* If this is the first received packet we need to learn the MAC and setup VMDQ */
1786 		if (dev->ready == DEVICE_MAC_LEARNING) {
1787 			if (dev->remove || (link_vmdq(dev, &m) == -1)) {
1788 				/*discard frame if device is scheduled for removal or a duplicate MAC address is found. */
1789 				packet_success += free_entries;
1790 				vq->last_used_idx += packet_success;
1791 				break;
1792 			}
1793 		}
1794 		virtio_tx_route(dev, &m, mbuf_pool, (uint16_t)dev->device_fh);
1795 
1796 		vq->last_used_idx++;
1797 		packet_success++;
1798 	}
1799 
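	/*
	 * Ensure the used ring entries are written before the updated used
	 * index becomes visible to the guest.
	 */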
1800 	rte_compiler_barrier();
1801 	vq->used->idx += packet_success;
1802 	/* Kick guest if required. */
1803 	if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
1804 		eventfd_write((int)vq->kickfd, 1);
1805 }
1806 
1807 /* This function works for TX packets with mergeable feature enabled. */
1808 static inline void __attribute__((always_inline))
1809 virtio_dev_merge_tx(struct virtio_net *dev, struct rte_mempool *mbuf_pool)
1810 {
1811 	struct rte_mbuf *m, *prev;
1812 	struct vhost_virtqueue *vq;
1813 	struct vring_desc *desc;
1814 	uint64_t vb_addr = 0;
1815 	uint32_t head[MAX_PKT_BURST];
1816 	uint32_t used_idx;
1817 	uint32_t i;
1818 	uint16_t free_entries, entry_success = 0;
1819 	uint16_t avail_idx;
1820 	uint32_t buf_size = MBUF_SIZE - (sizeof(struct rte_mbuf)
1821 			+ RTE_PKTMBUF_HEADROOM);
1822 
1823 	vq = dev->virtqueue[VIRTIO_TXQ];
1824 	avail_idx =  *((volatile uint16_t *)&vq->avail->idx);
1825 
1826 	/* If there are no available buffers then return. */
1827 	if (vq->last_used_idx == avail_idx)
1828 		return;
1829 
1830 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_merge_tx()\n",
1831 		dev->device_fh);
1832 
1833 	/* Prefetch available ring to retrieve head indexes. */
1834 	rte_prefetch0(&vq->avail->ring[vq->last_used_idx & (vq->size - 1)]);
1835 
1836 	/*get the number of free entries in the ring*/
1837 	free_entries = (avail_idx - vq->last_used_idx);
1838 
1839 	/* Limit to MAX_PKT_BURST. */
1840 	free_entries = RTE_MIN(free_entries, MAX_PKT_BURST);
1841 
1842 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Buffers available %d\n",
1843 		dev->device_fh, free_entries);
1844 	/* Retrieve all of the head indexes first to avoid caching issues. */
1845 	for (i = 0; i < free_entries; i++)
1846 		head[i] = vq->avail->ring[(vq->last_used_idx + i) & (vq->size - 1)];
1847 
1848 	/* Prefetch descriptor index. */
1849 	rte_prefetch0(&vq->desc[head[entry_success]]);
1850 	rte_prefetch0(&vq->used->ring[vq->last_used_idx & (vq->size - 1)]);
1851 
1852 	while (entry_success < free_entries) {
1853 		uint32_t vb_avail, vb_offset;
1854 		uint32_t seg_avail, seg_offset;
1855 		uint32_t cpy_len;
1856 		uint32_t seg_num = 0;
1857 		struct rte_mbuf *cur;
1858 		uint8_t alloc_err = 0;
1859 
1860 		desc = &vq->desc[head[entry_success]];
1861 
1862 		/* Discard first buffer as it is the virtio header */
1863 		desc = &vq->desc[desc->next];
1864 
1865 		/* Buffer address translation. */
1866 		vb_addr = gpa_to_vva(dev, desc->addr);
1867 		/* Prefetch buffer address. */
1868 		rte_prefetch0((void *)(uintptr_t)vb_addr);
1869 
1870 		used_idx = vq->last_used_idx & (vq->size - 1);
1871 
1872 		if (entry_success < (free_entries - 1)) {
1873 			/* Prefetch descriptor index. */
1874 			rte_prefetch0(&vq->desc[head[entry_success+1]]);
1875 			rte_prefetch0(&vq->used->ring[(used_idx + 1) & (vq->size - 1)]);
1876 		}
1877 
1878 		/* Update used index buffer information. */
1879 		vq->used->ring[used_idx].id = head[entry_success];
1880 		vq->used->ring[used_idx].len = 0;
1881 
1882 		vb_offset = 0;
1883 		vb_avail = desc->len;
1884 		seg_offset = 0;
1885 		seg_avail = buf_size;
1886 		cpy_len = RTE_MIN(vb_avail, seg_avail);
1887 
1888 		PRINT_PACKET(dev, (uintptr_t)vb_addr, desc->len, 0);
1889 
1890 		/* Allocate an mbuf and populate the structure. */
1891 		m = rte_pktmbuf_alloc(mbuf_pool);
1892 		if (unlikely(m == NULL)) {
1893 			RTE_LOG(ERR, VHOST_DATA,
1894 				"Failed to allocate memory for mbuf.\n");
1895 			return;
1896 		}
1897 
1898 		seg_num++;
1899 		cur = m;
1900 		prev = m;
1901 		while (cpy_len != 0) {
1902 			rte_memcpy((void *)(rte_pktmbuf_mtod(cur, char *) + seg_offset),
1903 				(void *)((uintptr_t)(vb_addr + vb_offset)),
1904 				cpy_len);
1905 
1906 			seg_offset += cpy_len;
1907 			vb_offset += cpy_len;
1908 			vb_avail -= cpy_len;
1909 			seg_avail -= cpy_len;
1910 
1911 			if (vb_avail != 0) {
1912 				/*
1913 				 * The segment has reached its end, but the
1914 				 * virtio buffer in the TX vring still has
1915 				 * more data to be copied.
1916 				 */
1917 				cur->pkt.data_len = seg_offset;
1918 				m->pkt.pkt_len += seg_offset;
1919 				/* Allocate mbuf and populate the structure. */
1920 				cur = rte_pktmbuf_alloc(mbuf_pool);
1921 				if (unlikely(cur == NULL)) {
1922 					RTE_LOG(ERR, VHOST_DATA, "Failed to "
1923 						"allocate memory for mbuf.\n");
1924 					rte_pktmbuf_free(m);
1925 					alloc_err = 1;
1926 					break;
1927 				}
1928 
1929 				seg_num++;
1930 				prev->pkt.next = cur;
1931 				prev = cur;
1932 				seg_offset = 0;
1933 				seg_avail = buf_size;
1934 			} else {
1935 				if (desc->flags & VRING_DESC_F_NEXT) {
1936 					/*
1937 					 * There are more virtio buffers in the
1938 					 * same vring entry that need to be copied.
1939 					 */
1940 					if (seg_avail == 0) {
1941 						/*
1942 						 * The current segment has no
1943 						 * room to accommodate more
1944 						 * data.
1945 						 */
1946 						cur->pkt.data_len = seg_offset;
1947 						m->pkt.pkt_len += seg_offset;
1948 						/*
1949 						 * Allocate an mbuf and
1950 						 * populate the structure.
1951 						 */
1952 						cur = rte_pktmbuf_alloc(mbuf_pool);
1953 						if (unlikely(cur == NULL)) {
1954 							RTE_LOG(ERR,
1955 								VHOST_DATA,
1956 								"Failed to "
1957 								"allocate memory "
1958 								"for mbuf\n");
1959 							rte_pktmbuf_free(m);
1960 							alloc_err = 1;
1961 							break;
1962 						}
1963 						seg_num++;
1964 						prev->pkt.next = cur;
1965 						prev = cur;
1966 						seg_offset = 0;
1967 						seg_avail = buf_size;
1968 					}
1969 
1970 					desc = &vq->desc[desc->next];
1971 
1972 					/* Buffer address translation. */
1973 					vb_addr = gpa_to_vva(dev, desc->addr);
1974 					/* Prefetch buffer address. */
1975 					rte_prefetch0((void *)(uintptr_t)vb_addr);
1976 					vb_offset = 0;
1977 					vb_avail = desc->len;
1978 
1979 					PRINT_PACKET(dev, (uintptr_t)vb_addr,
1980 						desc->len, 0);
1981 				} else {
1982 					/* The whole packet completes. */
1983 					cur->pkt.data_len = seg_offset;
1984 					m->pkt.pkt_len += seg_offset;
1985 					vb_avail = 0;
1986 				}
1987 			}
1988 
1989 			cpy_len = RTE_MIN(vb_avail, seg_avail);
1990 		}
1991 
1992 		if (unlikely(alloc_err == 1))
1993 			break;
1994 
1995 		m->pkt.nb_segs = seg_num;
1996 
1997 		/*
1998 		 * If this is the first received packet we need to learn
1999 		 * the MAC and setup VMDQ
2000 		 */
2001 		if (dev->ready == DEVICE_MAC_LEARNING) {
2002 			if (dev->remove || (link_vmdq(dev, m) == -1)) {
2003 				/*
2004 				 * Discard frame if device is scheduled for
2005 				 * removal or a duplicate MAC address is found.
2006 				 */
2007 				entry_success = free_entries;
2008 				vq->last_used_idx += entry_success;
2009 				rte_pktmbuf_free(m);
2010 				break;
2011 			}
2012 		}
2013 
2014 		virtio_tx_route(dev, m, mbuf_pool, (uint16_t)dev->device_fh);
2015 		vq->last_used_idx++;
2016 		entry_success++;
2017 		rte_pktmbuf_free(m);
2018 	}
2019 
2020 	rte_compiler_barrier();
2021 	vq->used->idx += entry_success;
2022 	/* Kick guest if required. */
2023 	if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
2024 		eventfd_write((int)vq->kickfd, 1);
2025 
2026 }
2027 
2028 /*
2029  * This function is called by each data core. It handles all RX/TX registered with the
2030  * core. For TX the specific lcore linked list is used. For RX, MAC addresses are compared
2031  * with all devices in the main linked list.
2032  */
2033 static int
2034 switch_worker(__attribute__((unused)) void *arg)
2035 {
2036 	struct rte_mempool *mbuf_pool = arg;
2037 	struct virtio_net *dev = NULL;
2038 	struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
2039 	struct virtio_net_data_ll *dev_ll;
2040 	struct mbuf_table *tx_q;
2041 	volatile struct lcore_ll_info *lcore_ll;
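	/* TX drain period (BURST_TX_DRAIN_US microseconds) converted to TSC cycles, rounding the cycles-per-microsecond up. */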
2042 	const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S * BURST_TX_DRAIN_US;
2043 	uint64_t prev_tsc, diff_tsc, cur_tsc, ret_count = 0;
2044 	unsigned ret, i;
2045 	const uint16_t lcore_id = rte_lcore_id();
2046 	const uint16_t num_cores = (uint16_t)rte_lcore_count();
2047 	uint16_t rx_count = 0;
2048 	uint32_t mergeable = 0;
2049 
2050 	RTE_LOG(INFO, VHOST_DATA, "Processing on Core %u started\n", lcore_id);
2051 	lcore_ll = lcore_info[lcore_id].lcore_ll;
2052 	prev_tsc = 0;
2053 
2054 	tx_q = &lcore_tx_queue[lcore_id];
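	/*
	 * Use this core's position in lcore_ids as its physical TX queue
	 * index so that switching cores never share a TX queue.
	 */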
2055 	for (i = 0; i < num_cores; i ++) {
2056 		if (lcore_ids[i] == lcore_id) {
2057 			tx_q->txq_id = i;
2058 			break;
2059 		}
2060 	}
2061 
2062 	while(1) {
2063 		cur_tsc = rte_rdtsc();
2064 		/*
2065 		 * TX burst queue drain
2066 		 */
2067 		diff_tsc = cur_tsc - prev_tsc;
2068 		if (unlikely(diff_tsc > drain_tsc)) {
2069 
2070 			if (tx_q->len) {
2071 				LOG_DEBUG(VHOST_DATA, "TX queue drained after timeout with burst size %u \n", tx_q->len);
2072 
2073 				/*Tx any packets in the queue*/
2074 				ret = rte_eth_tx_burst(ports[0], (uint16_t)tx_q->txq_id,
2075 									   (struct rte_mbuf **)tx_q->m_table,
2076 									   (uint16_t)tx_q->len);
2077 				if (unlikely(ret < tx_q->len)) {
2078 					do {
2079 						rte_pktmbuf_free(tx_q->m_table[ret]);
2080 					} while (++ret < tx_q->len);
2081 				}
2082 
2083 				tx_q->len = 0;
2084 			}
2085 
2086 			prev_tsc = cur_tsc;
2087 
2088 		}
2089 
2090 		rte_prefetch0(lcore_ll->ll_root_used);
2091 		/*
2092 		 * Inform the configuration core that we have exited the linked list and that no devices are
2093 		 * in use if requested.
2094 		 */
2095 		if (lcore_ll->dev_removal_flag == REQUEST_DEV_REMOVAL)
2096 			lcore_ll->dev_removal_flag = ACK_DEV_REMOVAL;
2097 
2098 		/*
2099 		 * Process devices
2100 		 */
2101 		dev_ll = lcore_ll->ll_root_used;
2102 
2103 		while (dev_ll != NULL) {
2104 			/*get virtio device ID*/
2105 			dev = dev_ll->dev;
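			/*
			 * Check whether the guest negotiated mergeable RX
			 * buffers; this selects the plain or merge-capable
			 * RX/TX handlers below.
			 */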
2106 			mergeable =
2107 				dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF);
2108 
2109 			if (dev->remove) {
2110 				dev_ll = dev_ll->next;
2111 				unlink_vmdq(dev);
2112 				dev->ready = DEVICE_SAFE_REMOVE;
2113 				continue;
2114 			}
2115 			if (likely(dev->ready == DEVICE_RX)) {
2116 				/*Handle guest RX*/
2117 				rx_count = rte_eth_rx_burst(ports[0],
2118 					(uint16_t)dev->vmdq_rx_q, pkts_burst, MAX_PKT_BURST);
2119 
2120 				if (rx_count) {
2121 					if (likely(mergeable == 0))
2122 						ret_count =
2123 							virtio_dev_rx(dev,
2124 							pkts_burst, rx_count);
2125 					else
2126 						ret_count =
2127 							virtio_dev_merge_rx(dev,
2128 							pkts_burst, rx_count);
2129 
2130 					if (enable_stats) {
2131 						rte_atomic64_add(
2132 						&dev_statistics[dev_ll->dev->device_fh].rx_total_atomic,
2133 						rx_count);
2134 						rte_atomic64_add(
2135 						&dev_statistics[dev_ll->dev->device_fh].rx_atomic, ret_count);
2136 					}
2137 					while (likely(rx_count)) {
2138 						rx_count--;
2139 						rte_pktmbuf_free(pkts_burst[rx_count]);
2140 					}
2141 
2142 				}
2143 			}
2144 
2145 			if (!dev->remove) {
2146 				/*Handle guest TX*/
2147 				if (likely(mergeable == 0))
2148 					virtio_dev_tx(dev, mbuf_pool);
2149 				else
2150 					virtio_dev_merge_tx(dev, mbuf_pool);
2151 			}
2152 
2153 			/*move to the next device in the list*/
2154 			dev_ll = dev_ll->next;
2155 		}
2156 	}
2157 
2158 	return 0;
2159 }
2160 
2161 /*
2162  * This function gets the number of available ring entries for zero copy RX.
2163  * Only one thread will call this function for a particular virtio device,
2164  * so it is designed as a non-thread-safe function.
2165  */
2166 static inline uint32_t __attribute__((always_inline))
2167 get_available_ring_num_zcp(struct virtio_net *dev)
2168 {
2169 	struct vhost_virtqueue *vq = dev->virtqueue[VIRTIO_RXQ];
2170 	uint16_t avail_idx;
2171 
2172 	avail_idx = *((volatile uint16_t *)&vq->avail->idx);
2173 	return (uint32_t)(avail_idx - vq->last_used_idx_res);
2174 }
2175 
2176 /*
2177  * This function gets available ring indexes for zero copy RX,
2178  * retrying 'burst_rx_retry_num' times until it gets enough ring entries.
2179  * Only one thread will call this function for a particular virtio device,
2180  * so it is designed as a non-thread-safe function.
2181  */
2182 static inline uint32_t __attribute__((always_inline))
2183 get_available_ring_index_zcp(struct virtio_net *dev,
2184 	uint16_t *res_base_idx, uint32_t count)
2185 {
2186 	struct vhost_virtqueue *vq = dev->virtqueue[VIRTIO_RXQ];
2187 	uint16_t avail_idx;
2188 	uint32_t retry = 0;
2189 	uint16_t free_entries;
2190 
2191 	*res_base_idx = vq->last_used_idx_res;
2192 	avail_idx = *((volatile uint16_t *)&vq->avail->idx);
2193 	free_entries = (avail_idx - *res_base_idx);
2194 
2195 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") in get_available_ring_index_zcp: "
2196 			"avail idx: %d, "
2197 			"res base idx:%d, free entries:%d\n",
2198 			dev->device_fh, avail_idx, *res_base_idx,
2199 			free_entries);
2200 
2201 	/*
2202 	 * If retry is enabled and the queue is full then we wait
2203 	 * and retry to avoid packet loss.
2204 	 */
2205 	if (enable_retry && unlikely(count > free_entries)) {
2206 		for (retry = 0; retry < burst_rx_retry_num; retry++) {
2207 			rte_delay_us(burst_rx_delay_time);
2208 			avail_idx = *((volatile uint16_t *)&vq->avail->idx);
2209 			free_entries = (avail_idx - *res_base_idx);
2210 			if (count <= free_entries)
2211 				break;
2212 		}
2213 	}
2214 
2215 	/*check that we have enough buffers*/
2216 	if (unlikely(count > free_entries))
2217 		count = free_entries;
2218 
2219 	if (unlikely(count == 0)) {
2220 		LOG_DEBUG(VHOST_DATA,
2221 			"(%"PRIu64") Fail in get_available_ring_index_zcp: "
2222 			"avail idx: %d, res base idx:%d, free entries:%d\n",
2223 			dev->device_fh, avail_idx,
2224 			*res_base_idx, free_entries);
2225 		return 0;
2226 	}
2227 
2228 	vq->last_used_idx_res = *res_base_idx + count;
2229 
2230 	return count;
2231 }
2232 
2233 /*
2234  * This function puts a descriptor back into the used list.
2235  */
2236 static inline void __attribute__((always_inline))
2237 put_desc_to_used_list_zcp(struct vhost_virtqueue *vq, uint16_t desc_idx)
2238 {
2239 	uint16_t res_cur_idx = vq->last_used_idx;
2240 	vq->used->ring[res_cur_idx & (vq->size - 1)].id = (uint32_t)desc_idx;
2241 	vq->used->ring[res_cur_idx & (vq->size - 1)].len = 0;
2242 	rte_compiler_barrier();
2243 	*(volatile uint16_t *)&vq->used->idx += 1;
2244 	vq->last_used_idx += 1;
2245 
2246 	/* Kick the guest if necessary. */
2247 	if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
2248 		eventfd_write((int)vq->kickfd, 1);
2249 }
2250 
2251 /*
2252  * This function gets an available descriptor from the virtio vring and an
2253  * un-attached mbuf from vpool->ring, and then attaches them together. It must
2254  * adjust the offsets of buff_addr and phys_addr according to the PMD
2255  * implementation, otherwise the frame data may be put at the wrong location in the mbuf.
2256  */
2257 static inline void __attribute__((always_inline))
2258 attach_rxmbuf_zcp(struct virtio_net *dev)
2259 {
2260 	uint16_t res_base_idx, desc_idx;
2261 	uint64_t buff_addr, phys_addr;
2262 	struct vhost_virtqueue *vq;
2263 	struct vring_desc *desc;
2264 	struct rte_mbuf *mbuf = NULL;
2265 	struct vpool *vpool;
2266 	hpa_type addr_type;
2267 
2268 	vpool = &vpool_array[dev->vmdq_rx_q];
2269 	vq = dev->virtqueue[VIRTIO_RXQ];
2270 
2271 	do {
2272 		if (unlikely(get_available_ring_index_zcp(dev, &res_base_idx,
2273 				1) != 1))
2274 			return;
2275 		desc_idx = vq->avail->ring[(res_base_idx) & (vq->size - 1)];
2276 
2277 		desc = &vq->desc[desc_idx];
2278 		if (desc->flags & VRING_DESC_F_NEXT) {
2279 			desc = &vq->desc[desc->next];
2280 			buff_addr = gpa_to_vva(dev, desc->addr);
2281 			phys_addr = gpa_to_hpa(dev, desc->addr, desc->len,
2282 					&addr_type);
2283 		} else {
2284 			buff_addr = gpa_to_vva(dev,
2285 					desc->addr + vq->vhost_hlen);
2286 			phys_addr = gpa_to_hpa(dev,
2287 					desc->addr + vq->vhost_hlen,
2288 					desc->len, &addr_type);
2289 		}
2290 
2291 		if (unlikely(addr_type == PHYS_ADDR_INVALID)) {
2292 			RTE_LOG(ERR, VHOST_DATA, "(%"PRIu64") Invalid frame buffer"
2293 				" address found when attaching RX frame buffer"
2294 				" address!\n", dev->device_fh);
2295 			put_desc_to_used_list_zcp(vq, desc_idx);
2296 			continue;
2297 		}
2298 
2299 		/*
2300 		 * Check if the frame buffer address from guest crosses
2301 		 * sub-region or not.
2302 		 */
2303 		if (unlikely(addr_type == PHYS_ADDR_CROSS_SUBREG)) {
2304 			RTE_LOG(ERR, VHOST_DATA,
2305 				"(%"PRIu64") Frame buffer address cross "
2306 				"sub-region found when attaching RX frame "
2307 				"buffer address!\n",
2308 				dev->device_fh);
2309 			put_desc_to_used_list_zcp(vq, desc_idx);
2310 			continue;
2311 		}
2312 	} while (unlikely(phys_addr == 0));
2313 
2314 	rte_ring_sc_dequeue(vpool->ring, (void **)&mbuf);
2315 	if (unlikely(mbuf == NULL)) {
2316 		LOG_DEBUG(VHOST_DATA,
2317 			"(%"PRIu64") in attach_rxmbuf_zcp: "
2318 			"ring_sc_dequeue fail.\n",
2319 			dev->device_fh);
2320 		put_desc_to_used_list_zcp(vq, desc_idx);
2321 		return;
2322 	}
2323 
2324 	if (unlikely(vpool->buf_size > desc->len)) {
2325 		LOG_DEBUG(VHOST_DATA,
2326 			"(%"PRIu64") in attach_rxmbuf_zcp: frame buffer "
2327 			"length(%d) of descriptor idx: %d less than room "
2328 			"size required: %d\n",
2329 			dev->device_fh, desc->len, desc_idx, vpool->buf_size);
2330 		put_desc_to_used_list_zcp(vq, desc_idx);
2331 		rte_ring_sp_enqueue(vpool->ring, (void *)mbuf);
2332 		return;
2333 	}
2334 
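	/*
	 * Point the mbuf directly at the guest RX buffer so the NIC DMAs the
	 * frame into guest memory (zero copy). The descriptor index is stashed
	 * in the mbuf headroom for the later used ring update.
	 */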
2335 	mbuf->buf_addr = (void *)(uintptr_t)(buff_addr - RTE_PKTMBUF_HEADROOM);
2336 	mbuf->pkt.data = (void *)(uintptr_t)(buff_addr);
2337 	mbuf->buf_physaddr = phys_addr - RTE_PKTMBUF_HEADROOM;
2338 	mbuf->pkt.data_len = desc->len;
2339 	MBUF_HEADROOM_UINT32(mbuf) = (uint32_t)desc_idx;
2340 
2341 	LOG_DEBUG(VHOST_DATA,
2342 		"(%"PRIu64") in attach_rxmbuf_zcp: res base idx:%d, "
2343 		"descriptor idx:%d\n",
2344 		dev->device_fh, res_base_idx, desc_idx);
2345 
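	/*
	 * Return the attached mbuf to the mempool backing this RX queue (a raw
	 * free does not reset the buffer pointers), so the RX path can receive
	 * the next frame directly into the guest buffer.
	 */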
2346 	__rte_mbuf_raw_free(mbuf);
2347 
2348 	return;
2349 }
2350 
2351 /*
2352  * Detach an attached packet mbuf -
2353  *  - restore original mbuf address and length values.
2354  *  - reset pktmbuf data and data_len to their default values.
2355  *  All other fields of the given packet mbuf will be left intact.
2356  *
2357  * @param m
2358  *   The attached packet mbuf.
2359  */
2360 static inline void pktmbuf_detach_zcp(struct rte_mbuf *m)
2361 {
2362 	const struct rte_mempool *mp = m->pool;
2363 	void *buf = RTE_MBUF_TO_BADDR(m);
2364 	uint32_t buf_ofs;
2365 	uint32_t buf_len = mp->elt_size - sizeof(*m);
2366 	m->buf_physaddr = rte_mempool_virt2phy(mp, m) + sizeof(*m);
2367 
2368 	m->buf_addr = buf;
2369 	m->buf_len = (uint16_t)buf_len;
2370 
2371 	buf_ofs = (RTE_PKTMBUF_HEADROOM <= m->buf_len) ?
2372 			RTE_PKTMBUF_HEADROOM : m->buf_len;
2373 	m->pkt.data = (char *) m->buf_addr + buf_ofs;
2374 
2375 	m->pkt.data_len = 0;
2376 }
2377 
2378 /*
2379  * This function is called after packets have been transmitted. It fetches
2380  * mbufs from vpool->pool, detaches them and puts them into vpool->ring. It
2381  * also updates the used index and kicks the guest if necessary.
2382  */
2383 static inline uint32_t __attribute__((always_inline))
2384 txmbuf_clean_zcp(struct virtio_net *dev, struct vpool *vpool)
2385 {
2386 	struct rte_mbuf *mbuf;
2387 	struct vhost_virtqueue *vq = dev->virtqueue[VIRTIO_TXQ];
2388 	uint32_t used_idx = vq->last_used_idx & (vq->size - 1);
2389 	uint32_t index = 0;
2390 	uint32_t mbuf_count = rte_mempool_count(vpool->pool);
2391 
2392 	LOG_DEBUG(VHOST_DATA,
2393 		"(%"PRIu64") in txmbuf_clean_zcp: mbuf count in mempool before "
2394 		"clean is: %d\n",
2395 		dev->device_fh, mbuf_count);
2396 	LOG_DEBUG(VHOST_DATA,
2397 		"(%"PRIu64") in txmbuf_clean_zcp: mbuf count in  ring before "
2398 		"clean  is : %d\n",
2399 		dev->device_fh, rte_ring_count(vpool->ring));
2400 
2401 	for (index = 0; index < mbuf_count; index++) {
2402 		mbuf = __rte_mbuf_raw_alloc(vpool->pool);
2403 		if (likely(RTE_MBUF_INDIRECT(mbuf)))
2404 			pktmbuf_detach_zcp(mbuf);
2405 		rte_ring_sp_enqueue(vpool->ring, mbuf);
2406 
2407 		/* Update used index buffer information. */
2408 		vq->used->ring[used_idx].id = MBUF_HEADROOM_UINT32(mbuf);
2409 		vq->used->ring[used_idx].len = 0;
2410 
2411 		used_idx = (used_idx + 1) & (vq->size - 1);
2412 	}
2413 
2414 	LOG_DEBUG(VHOST_DATA,
2415 		"(%"PRIu64") in txmbuf_clean_zcp: mbuf count in mempool after "
2416 		"clean is: %d\n",
2417 		dev->device_fh, rte_mempool_count(vpool->pool));
2418 	LOG_DEBUG(VHOST_DATA,
2419 		"(%"PRIu64") in txmbuf_clean_zcp: mbuf count in  ring after "
2420 		"clean  is : %d\n",
2421 		dev->device_fh, rte_ring_count(vpool->ring));
2422 	LOG_DEBUG(VHOST_DATA,
2423 		"(%"PRIu64") in txmbuf_clean_zcp: before updated "
2424 		"vq->last_used_idx:%d\n",
2425 		dev->device_fh, vq->last_used_idx);
2426 
2427 	vq->last_used_idx += mbuf_count;
2428 
2429 	LOG_DEBUG(VHOST_DATA,
2430 		"(%"PRIu64") in txmbuf_clean_zcp: after updated "
2431 		"vq->last_used_idx:%d\n",
2432 		dev->device_fh, vq->last_used_idx);
2433 
2434 	rte_compiler_barrier();
2435 
2436 	*(volatile uint16_t *)&vq->used->idx += mbuf_count;
2437 
2438 	/* Kick guest if required. */
2439 	if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
2440 		eventfd_write((int)vq->kickfd, 1);
2441 
2442 	return 0;
2443 }
2444 
2445 /*
2446  * This function is called when a virtio device is destroyed.
2447  * It fetches mbufs from vpool->pool, detaches them, and puts them into vpool->ring.
2448  */
2449 static void mbuf_destroy_zcp(struct vpool *vpool)
2450 {
2451 	struct rte_mbuf *mbuf = NULL;
2452 	uint32_t index, mbuf_count = rte_mempool_count(vpool->pool);
2453 
2454 	LOG_DEBUG(VHOST_CONFIG,
2455 		"in mbuf_destroy_zcp: mbuf count in mempool before "
2456 		"mbuf_destroy_zcp is: %d\n",
2457 		mbuf_count);
2458 	LOG_DEBUG(VHOST_CONFIG,
2459 		"in mbuf_destroy_zcp: mbuf count in  ring before "
2460 		"mbuf_destroy_zcp  is : %d\n",
2461 		rte_ring_count(vpool->ring));
2462 
2463 	for (index = 0; index < mbuf_count; index++) {
2464 		mbuf = __rte_mbuf_raw_alloc(vpool->pool);
2465 		if (likely(mbuf != NULL)) {
2466 			if (likely(RTE_MBUF_INDIRECT(mbuf)))
2467 				pktmbuf_detach_zcp(mbuf);
2468 			rte_ring_sp_enqueue(vpool->ring, (void *)mbuf);
2469 		}
2470 	}
2471 
2472 	LOG_DEBUG(VHOST_CONFIG,
2473 		"in mbuf_destroy_zcp: mbuf count in mempool after "
2474 		"mbuf_destroy_zcp is: %d\n",
2475 		rte_mempool_count(vpool->pool));
2476 	LOG_DEBUG(VHOST_CONFIG,
2477 		"in mbuf_destroy_zcp: mbuf count in ring after "
2478 		"mbuf_destroy_zcp is : %d\n",
2479 		rte_ring_count(vpool->ring));
2480 }
2481 
2482 /*
2483  * This function completes zero copy RX: it writes the virtio header for each received packet, updates the used ring and kicks the guest if required.
2484  */
2485 static inline uint32_t __attribute__((always_inline))
2486 virtio_dev_rx_zcp(struct virtio_net *dev, struct rte_mbuf **pkts,
2487 	uint32_t count)
2488 {
2489 	struct vhost_virtqueue *vq;
2490 	struct vring_desc *desc;
2491 	struct rte_mbuf *buff;
2492 	/* The virtio_hdr is initialised to 0. */
2493 	struct virtio_net_hdr_mrg_rxbuf virtio_hdr
2494 		= {{0, 0, 0, 0, 0, 0}, 0};
2495 	uint64_t buff_hdr_addr = 0;
2496 	uint32_t head[MAX_PKT_BURST], packet_len = 0;
2497 	uint32_t head_idx, packet_success = 0;
2498 	uint16_t res_cur_idx;
2499 
2500 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_rx_zcp()\n", dev->device_fh);
2501 
2502 	if (count == 0)
2503 		return 0;
2504 
2505 	vq = dev->virtqueue[VIRTIO_RXQ];
2506 	count = (count > MAX_PKT_BURST) ? MAX_PKT_BURST : count;
2507 
2508 	res_cur_idx = vq->last_used_idx;
2509 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| End Index %d\n",
2510 		dev->device_fh, res_cur_idx, res_cur_idx + count);
2511 
2512 	/* Retrieve all of the head indexes first to avoid caching issues. */
2513 	for (head_idx = 0; head_idx < count; head_idx++)
2514 		head[head_idx] = MBUF_HEADROOM_UINT32(pkts[head_idx]);
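	/*
	 * head[] holds the descriptor indexes that attach_rxmbuf_zcp stashed
	 * in each mbuf's headroom when the buffers were attached.
	 */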
2515 
2516 	/*Prefetch descriptor index. */
2517 	rte_prefetch0(&vq->desc[head[packet_success]]);
2518 
2519 	while (packet_success != count) {
2520 		/* Get descriptor from available ring */
2521 		desc = &vq->desc[head[packet_success]];
2522 
2523 		buff = pkts[packet_success];
2524 		LOG_DEBUG(VHOST_DATA,
2525 			"(%"PRIu64") in dev_rx_zcp: update the used idx for "
2526 			"pkt[%d] descriptor idx: %d\n",
2527 			dev->device_fh, packet_success,
2528 			MBUF_HEADROOM_UINT32(buff));
2529 
2530 		PRINT_PACKET(dev,
2531 			(uintptr_t)(((uint64_t)(uintptr_t)buff->buf_addr)
2532 			+ RTE_PKTMBUF_HEADROOM),
2533 			rte_pktmbuf_data_len(buff), 0);
2534 
2535 		/* Buffer address translation for virtio header. */
2536 		buff_hdr_addr = gpa_to_vva(dev, desc->addr);
2537 		packet_len = rte_pktmbuf_data_len(buff) + vq->vhost_hlen;
2538 
2539 		/*
2540 		 * If the descriptors are chained the header and data are
2541 		 * placed in separate buffers.
2542 		 */
2543 		if (desc->flags & VRING_DESC_F_NEXT) {
2544 			desc->len = vq->vhost_hlen;
2545 			desc = &vq->desc[desc->next];
2546 			desc->len = rte_pktmbuf_data_len(buff);
2547 		} else {
2548 			desc->len = packet_len;
2549 		}
2550 
2551 		/* Update used ring with desc information */
2552 		vq->used->ring[res_cur_idx & (vq->size - 1)].id
2553 			= head[packet_success];
2554 		vq->used->ring[res_cur_idx & (vq->size - 1)].len
2555 			= packet_len;
2556 		res_cur_idx++;
2557 		packet_success++;
2558 
2559 		/* A header is required per buffer. */
2560 		rte_memcpy((void *)(uintptr_t)buff_hdr_addr,
2561 			(const void *)&virtio_hdr, vq->vhost_hlen);
2562 
2563 		PRINT_PACKET(dev, (uintptr_t)buff_hdr_addr, vq->vhost_hlen, 1);
2564 
2565 		if (likely(packet_success < count)) {
2566 			/* Prefetch descriptor index. */
2567 			rte_prefetch0(&vq->desc[head[packet_success]]);
2568 		}
2569 	}
2570 
2571 	rte_compiler_barrier();
2572 
2573 	LOG_DEBUG(VHOST_DATA,
2574 		"(%"PRIu64") in dev_rx_zcp: before update used idx: "
2575 		"vq.last_used_idx: %d, vq->used->idx: %d\n",
2576 		dev->device_fh, vq->last_used_idx, vq->used->idx);
2577 
2578 	*(volatile uint16_t *)&vq->used->idx += count;
2579 	vq->last_used_idx += count;
2580 
2581 	LOG_DEBUG(VHOST_DATA,
2582 		"(%"PRIu64") in dev_rx_zcp: after  update used idx: "
2583 		"vq.last_used_idx: %d, vq->used->idx: %d\n",
2584 		dev->device_fh, vq->last_used_idx, vq->used->idx);
2585 
2586 	/* Kick the guest if necessary. */
2587 	if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
2588 		eventfd_write((int)vq->kickfd, 1);
2589 
2590 	return count;
2591 }
2592 
2593 /*
2594  * This function routes the TX packet to the correct interface.
2595  * This may be a local device or the physical port.
2596  */
2597 static inline void __attribute__((always_inline))
2598 virtio_tx_route_zcp(struct virtio_net *dev, struct rte_mbuf *m,
2599 	uint32_t desc_idx, uint8_t need_copy)
2600 {
2601 	struct mbuf_table *tx_q;
2602 	struct rte_mbuf **m_table;
2603 	struct rte_mbuf *mbuf = NULL;
2604 	unsigned len, ret, offset = 0;
2605 	struct vpool *vpool;
2606 	struct virtio_net_data_ll *dev_ll = ll_root_used;
2607 	struct ether_hdr *pkt_hdr = (struct ether_hdr *)m->pkt.data;
2608 	uint16_t vlan_tag = (uint16_t)vlan_tags[(uint16_t)dev->device_fh];
2609 
2610 	/*Add packet to the port tx queue*/
2611 	tx_q = &tx_queue_zcp[(uint16_t)dev->vmdq_rx_q];
2612 	len = tx_q->len;
2613 
2614 	/* Allocate an mbuf and populate the structure. */
2615 	vpool = &vpool_array[MAX_QUEUES + (uint16_t)dev->vmdq_rx_q];
2616 	rte_ring_sc_dequeue(vpool->ring, (void **)&mbuf);
2617 	if (unlikely(mbuf == NULL)) {
2618 		struct vhost_virtqueue *vq = dev->virtqueue[VIRTIO_TXQ];
2619 		RTE_LOG(ERR, VHOST_DATA,
2620 			"(%"PRIu64") Failed to allocate memory for mbuf.\n",
2621 			dev->device_fh);
2622 		put_desc_to_used_list_zcp(vq, desc_idx);
2623 		return;
2624 	}
2625 
2626 	if (vm2vm_mode == VM2VM_HARDWARE) {
2627 		/* Avoid using a VLAN tag from any VM for an external packet, such
2628 		 * as vlan_tags[dev->device_fh]; otherwise it conflicts during pool
2629 		 * selection: the MAC address marks it as an external packet that
2630 		 * should go to the network, while the VLAN tag marks it as a
2631 		 * VM-to-VM packet that should be forwarded to another VM. The
2632 		 * hardware cannot resolve such an ambiguous situation, so the packet would be lost.
2633 		 */
2634 		vlan_tag = external_pkt_default_vlan_tag;
2635 		while (dev_ll != NULL) {
2636 			if (likely(dev_ll->dev->ready == DEVICE_RX) &&
2637 				ether_addr_cmp(&(pkt_hdr->d_addr),
2638 				&dev_ll->dev->mac_address)) {
2639 
2640 				/*
2641 				 * Drop the packet if the TX packet is destined
2642 				 * for the TX device.
2643 				 */
2644 				if (unlikely(dev_ll->dev->device_fh
2645 					== dev->device_fh)) {
2646 					LOG_DEBUG(VHOST_DATA,
2647 					"(%"PRIu64") TX: Source and destination"
2648 					" MAC addresses are the same. Dropping "
2649 					"packet.\n",
2650 					dev_ll->dev->device_fh);
2651 					MBUF_HEADROOM_UINT32(mbuf)
2652 						= (uint32_t)desc_idx;
2653 					__rte_mbuf_raw_free(mbuf);
2654 					return;
2655 				}
2656 
2657 				/*
2658 				 * Offset the packet length by 4 bytes to account
2659 				 * for the HW VLAN strip when the packet is L2-switched back.
2660 				 */
2661 				offset = 4;
2662 				vlan_tag =
2663 				(uint16_t)
2664 				vlan_tags[(uint16_t)dev_ll->dev->device_fh];
2665 
2666 				LOG_DEBUG(VHOST_DATA,
2667 				"(%"PRIu64") TX: pkt to local VM device id:"
2668 				"(%"PRIu64") vlan tag: %d.\n",
2669 				dev->device_fh, dev_ll->dev->device_fh,
2670 				vlan_tag);
2671 
2672 				break;
2673 			}
2674 			dev_ll = dev_ll->next;
2675 		}
2676 	}
2677 
2678 	mbuf->pkt.nb_segs = m->pkt.nb_segs;
2679 	mbuf->pkt.next = m->pkt.next;
2680 	mbuf->pkt.data_len = m->pkt.data_len + offset;
2681 	mbuf->pkt.pkt_len = mbuf->pkt.data_len;
2682 	if (unlikely(need_copy)) {
2683 		/* Copy the packet contents to the mbuf. */
2684 		rte_memcpy((void *)((uint8_t *)mbuf->pkt.data),
2685 			(const void *) ((uint8_t *)m->pkt.data),
2686 			m->pkt.data_len);
2687 	} else {
2688 		mbuf->pkt.data = m->pkt.data;
2689 		mbuf->buf_physaddr = m->buf_physaddr;
2690 		mbuf->buf_addr = m->buf_addr;
2691 	}
2692 	mbuf->ol_flags = PKT_TX_VLAN_PKT;
2693 	mbuf->pkt.vlan_macip.f.vlan_tci = vlan_tag;
2694 	mbuf->pkt.vlan_macip.f.l2_len = sizeof(struct ether_hdr);
2695 	mbuf->pkt.vlan_macip.f.l3_len = sizeof(struct ipv4_hdr);
2696 	MBUF_HEADROOM_UINT32(mbuf) = (uint32_t)desc_idx;
2697 
2698 	tx_q->m_table[len] = mbuf;
2699 	len++;
2700 
2701 	LOG_DEBUG(VHOST_DATA,
2702 		"(%"PRIu64") in tx_route_zcp: pkt: nb_seg: %d, next:%s\n",
2703 		dev->device_fh,
2704 		mbuf->pkt.nb_segs,
2705 		(mbuf->pkt.next == NULL) ? "null" : "non-null");
2706 
2707 	if (enable_stats) {
2708 		dev_statistics[dev->device_fh].tx_total++;
2709 		dev_statistics[dev->device_fh].tx++;
2710 	}
2711 
2712 	if (unlikely(len == MAX_PKT_BURST)) {
2713 		m_table = (struct rte_mbuf **)tx_q->m_table;
2714 		ret = rte_eth_tx_burst(ports[0],
2715 			(uint16_t)tx_q->txq_id, m_table, (uint16_t) len);
2716 
2717 		/*
2718 		 * Free any buffers not handled by TX and update
2719 		 * the port stats.
2720 		 */
2721 		if (unlikely(ret < len)) {
2722 			do {
2723 				rte_pktmbuf_free(m_table[ret]);
2724 			} while (++ret < len);
2725 		}
2726 
2727 		len = 0;
2728 		txmbuf_clean_zcp(dev, vpool);
2729 	}
2730 
2731 	tx_q->len = len;
2732 
2733 	return;
2734 }
2735 
2736 /*
2737  * This function transmits all available packets in the virtio TX queue for
2738  * one virtio-net device. If it is the first packet, it learns the MAC address
2739  * and sets up VMDQ.
2740  */
2741 static inline void __attribute__((always_inline))
2742 virtio_dev_tx_zcp(struct virtio_net *dev)
2743 {
2744 	struct rte_mbuf m;
2745 	struct vhost_virtqueue *vq;
2746 	struct vring_desc *desc;
2747 	uint64_t buff_addr = 0, phys_addr;
2748 	uint32_t head[MAX_PKT_BURST];
2749 	uint32_t i;
2750 	uint16_t free_entries, packet_success = 0;
2751 	uint16_t avail_idx;
2752 	uint8_t need_copy = 0;
2753 	hpa_type addr_type;
2754 
2755 	vq = dev->virtqueue[VIRTIO_TXQ];
2756 	avail_idx =  *((volatile uint16_t *)&vq->avail->idx);
2757 
2758 	/* If there are no available buffers then return. */
2759 	if (vq->last_used_idx_res == avail_idx)
2760 		return;
2761 
2762 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_tx_zcp()\n", dev->device_fh);
2763 
2764 	/* Prefetch available ring to retrieve head indexes. */
2765 	rte_prefetch0(&vq->avail->ring[vq->last_used_idx_res & (vq->size - 1)]);
2766 
2767 	/* Get the number of free entries in the ring */
2768 	free_entries = (avail_idx - vq->last_used_idx_res);
2769 
2770 	/* Limit to MAX_PKT_BURST. */
2771 	free_entries
2772 		= (free_entries > MAX_PKT_BURST) ? MAX_PKT_BURST : free_entries;
2773 
2774 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Buffers available %d\n",
2775 		dev->device_fh, free_entries);
2776 
2777 	/* Retrieve all of the head indexes first to avoid caching issues. */
2778 	for (i = 0; i < free_entries; i++)
2779 		head[i]
2780 			= vq->avail->ring[(vq->last_used_idx_res + i)
2781 			& (vq->size - 1)];
2782 
2783 	vq->last_used_idx_res += free_entries;
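	/*
	 * Reserve these entries now; the used ring itself is updated later,
	 * either when the transmitted mbufs are cleaned (txmbuf_clean_zcp) or
	 * when a descriptor is returned directly to the used list.
	 */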
2784 
2785 	/* Prefetch descriptor index. */
2786 	rte_prefetch0(&vq->desc[head[packet_success]]);
2787 	rte_prefetch0(&vq->used->ring[vq->last_used_idx & (vq->size - 1)]);
2788 
2789 	while (packet_success < free_entries) {
2790 		desc = &vq->desc[head[packet_success]];
2791 
2792 		/* Discard first buffer as it is the virtio header */
2793 		desc = &vq->desc[desc->next];
2794 
2795 		/* Buffer address translation. */
2796 		buff_addr = gpa_to_vva(dev, desc->addr);
2797 		phys_addr = gpa_to_hpa(dev, desc->addr, desc->len, &addr_type);
2798 
2799 		if (likely(packet_success < (free_entries - 1)))
2800 			/* Prefetch descriptor index. */
2801 			rte_prefetch0(&vq->desc[head[packet_success + 1]]);
2802 
2803 		if (unlikely(addr_type == PHYS_ADDR_INVALID)) {
2804 			RTE_LOG(ERR, VHOST_DATA,
2805 				"(%"PRIu64") Invalid frame buffer address found "
2806 				"when transmitting packets!\n",
2807 				dev->device_fh);
2808 			packet_success++;
2809 			continue;
2810 		}
2811 
2812 		/* Prefetch buffer address. */
2813 		rte_prefetch0((void *)(uintptr_t)buff_addr);
2814 
2815 		/*
2816 		 * Setup dummy mbuf. This is copied to a real mbuf if
2817 		 * transmitted out the physical port.
2818 		 */
2819 		m.pkt.data_len = desc->len;
2820 		m.pkt.nb_segs = 1;
2821 		m.pkt.next = NULL;
2822 		m.pkt.data = (void *)(uintptr_t)buff_addr;
2823 		m.buf_addr = m.pkt.data;
2824 		m.buf_physaddr = phys_addr;
2825 
2826 		/*
2827 		 * Check if the frame buffer address from guest crosses
2828 		 * sub-region or not.
2829 		 */
2830 		if (unlikely(addr_type == PHYS_ADDR_CROSS_SUBREG)) {
2831 			RTE_LOG(ERR, VHOST_DATA,
2832 				"(%"PRIu64") Frame buffer address cross "
2833 				"sub-region found when attaching TX frame "
2834 				"buffer address!\n",
2835 				dev->device_fh);
2836 			need_copy = 1;
2837 		} else
2838 			need_copy = 0;
2839 
2840 		PRINT_PACKET(dev, (uintptr_t)buff_addr, desc->len, 0);
2841 
2842 		/*
2843 		 * If this is the first received packet we need to learn
2844 		 * the MAC and setup VMDQ
2845 		 */
2846 		if (unlikely(dev->ready == DEVICE_MAC_LEARNING)) {
2847 			if (dev->remove || (link_vmdq(dev, &m) == -1)) {
2848 				/*
2849 				 * Discard frame if device is scheduled for
2850 				 * removal or a duplicate MAC address is found.
2851 				 */
2852 				packet_success += free_entries;
2853 				vq->last_used_idx += packet_success;
2854 				break;
2855 			}
2856 		}
2857 
2858 		virtio_tx_route_zcp(dev, &m, head[packet_success], need_copy);
2859 		packet_success++;
2860 	}
2861 }
2862 
2863 /*
2864  * This function is called by each data core. It handles all RX/TX registered
2865  * with the core. For TX the specific lcore linked list is used. For RX, MAC
2866  * addresses are compared with all devices in the main linked list.
2867  */
2868 static int
2869 switch_worker_zcp(__attribute__((unused)) void *arg)
2870 {
2871 	struct virtio_net *dev = NULL;
2872 	struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
2873 	struct virtio_net_data_ll *dev_ll;
2874 	struct mbuf_table *tx_q;
2875 	volatile struct lcore_ll_info *lcore_ll;
2876 	const uint64_t drain_tsc
2877 		= (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S
2878 		* BURST_TX_DRAIN_US;
2879 	uint64_t prev_tsc, diff_tsc, cur_tsc, ret_count = 0;
2880 	unsigned ret;
2881 	const uint16_t lcore_id = rte_lcore_id();
2882 	uint16_t count_in_ring, rx_count = 0;
2883 
2884 	RTE_LOG(INFO, VHOST_DATA, "Processing on Core %u started\n", lcore_id);
2885 
2886 	lcore_ll = lcore_info[lcore_id].lcore_ll;
2887 	prev_tsc = 0;
2888 
2889 	while (1) {
2890 		cur_tsc = rte_rdtsc();
2891 
2892 		/* TX burst queue drain */
2893 		diff_tsc = cur_tsc - prev_tsc;
2894 		if (unlikely(diff_tsc > drain_tsc)) {
2895 			/*
2896 			 * Get mbufs from vpool.pool, detach them and
2897 			 * put them back into vpool.ring.
2898 			 */
2899 			dev_ll = lcore_ll->ll_root_used;
2900 			while ((dev_ll != NULL) && (dev_ll->dev != NULL)) {
2901 				/* Get virtio device ID */
2902 				dev = dev_ll->dev;
2903 
2904 				if (likely(!dev->remove)) {
2905 					tx_q = &tx_queue_zcp[(uint16_t)dev->vmdq_rx_q];
2906 					if (tx_q->len) {
2907 						LOG_DEBUG(VHOST_DATA,
2908 						"TX queue drained after timeout"
2909 						" with burst size %u\n",
2910 						tx_q->len);
2911 
2912 						/*
2913 						 * Tx any packets in the queue
2914 						 */
2915 						ret = rte_eth_tx_burst(
2916 							ports[0],
2917 							(uint16_t)tx_q->txq_id,
2918 							(struct rte_mbuf **)
2919 							tx_q->m_table,
2920 							(uint16_t)tx_q->len);
2921 						if (unlikely(ret < tx_q->len)) {
2922 							do {
2923 								rte_pktmbuf_free(
2924 									tx_q->m_table[ret]);
2925 							} while (++ret < tx_q->len);
2926 						}
2927 						tx_q->len = 0;
2928 
2929 						txmbuf_clean_zcp(dev,
2930 							&vpool_array[MAX_QUEUES+dev->vmdq_rx_q]);
2931 					}
2932 				}
2933 				dev_ll = dev_ll->next;
2934 			}
2935 			prev_tsc = cur_tsc;
2936 		}
2937 
2938 		rte_prefetch0(lcore_ll->ll_root_used);
2939 
2940 		/*
2941 		 * Inform the configuration core that we have exited the linked
2942 		 * list and that no devices are in use if requested.
2943 		 */
2944 		if (lcore_ll->dev_removal_flag == REQUEST_DEV_REMOVAL)
2945 			lcore_ll->dev_removal_flag = ACK_DEV_REMOVAL;
2946 
2947 		/* Process devices */
2948 		dev_ll = lcore_ll->ll_root_used;
2949 
2950 		while ((dev_ll != NULL) && (dev_ll->dev != NULL)) {
2951 			dev = dev_ll->dev;
2952 			if (unlikely(dev->remove)) {
2953 				dev_ll = dev_ll->next;
2954 				unlink_vmdq(dev);
2955 				dev->ready = DEVICE_SAFE_REMOVE;
2956 				continue;
2957 			}
2958 
2959 			if (likely(dev->ready == DEVICE_RX)) {
2960 				uint32_t index = dev->vmdq_rx_q;
2961 				uint16_t i;
2962 				count_in_ring
2963 				= rte_ring_count(vpool_array[index].ring);
2964 				uint16_t free_entries
2965 				= (uint16_t)get_available_ring_num_zcp(dev);
2966 
2967 				/*
2968 				 * Attach all mbufs in vpool.ring and put back
2969 				 * into vpool.pool.
2970 				 */
2971 				for (i = 0;
2972 				i < RTE_MIN(free_entries,
2973 				RTE_MIN(count_in_ring, MAX_PKT_BURST));
2974 				i++)
2975 					attach_rxmbuf_zcp(dev);
2976 
2977 				/* Handle guest RX */
2978 				rx_count = rte_eth_rx_burst(ports[0],
2979 					(uint16_t)dev->vmdq_rx_q, pkts_burst,
2980 					MAX_PKT_BURST);
2981 
2982 				if (rx_count) {
2983 					ret_count = virtio_dev_rx_zcp(dev,
2984 							pkts_burst, rx_count);
2985 					if (enable_stats) {
2986 						dev_statistics[dev->device_fh].rx_total
2987 							+= rx_count;
2988 						dev_statistics[dev->device_fh].rx
2989 							+= ret_count;
2990 					}
2991 					while (likely(rx_count)) {
2992 						rx_count--;
2993 						pktmbuf_detach_zcp(
2994 							pkts_burst[rx_count]);
2995 						rte_ring_sp_enqueue(
2996 							vpool_array[index].ring,
2997 							(void *)pkts_burst[rx_count]);
2998 					}
2999 				}
3000 			}
3001 
3002 			if (likely(!dev->remove))
3003 				/* Handle guest TX */
3004 				virtio_dev_tx_zcp(dev);
3005 
3006 			/* Move to the next device in the list */
3007 			dev_ll = dev_ll->next;
3008 		}
3009 	}
3010 
3011 	return 0;
3012 }
3013 
3014 
3015 /*
3016  * Add an entry to a used linked list. A free entry must first be found
3017  * in the free linked list using get_data_ll_free_entry();
3018  */
3019 static void
3020 add_data_ll_entry(struct virtio_net_data_ll **ll_root_addr,
3021 	struct virtio_net_data_ll *ll_dev)
3022 {
3023 	struct virtio_net_data_ll *ll = *ll_root_addr;
3024 
3025 	/* Set next as NULL and use a compiler barrier to avoid reordering. */
3026 	ll_dev->next = NULL;
3027 	rte_compiler_barrier();
3028 
3029 	/* If ll == NULL then this is the first device. */
3030 	if (ll) {
3031 		/* Increment to the tail of the linked list. */
3032 		while ((ll->next != NULL) )
3033 			ll = ll->next;
3034 
3035 		ll->next = ll_dev;
3036 	} else {
3037 		*ll_root_addr = ll_dev;
3038 	}
3039 }
3040 
3041 /*
3042  * Remove an entry from a used linked list. The entry must then be added to
3043  * the free linked list using put_data_ll_free_entry().
3044  */
3045 static void
3046 rm_data_ll_entry(struct virtio_net_data_ll **ll_root_addr,
3047 	struct virtio_net_data_ll *ll_dev,
3048 	struct virtio_net_data_ll *ll_dev_last)
3049 {
3050 	struct virtio_net_data_ll *ll = *ll_root_addr;
3051 
3052 	if (unlikely((ll == NULL) || (ll_dev == NULL)))
3053 		return;
3054 
3055 	if (ll_dev == ll)
3056 		*ll_root_addr = ll_dev->next;
3057 	else
3058 		if (likely(ll_dev_last != NULL))
3059 			ll_dev_last->next = ll_dev->next;
3060 		else
3061 			RTE_LOG(ERR, VHOST_CONFIG, "Remove entry from ll failed.\n");
3062 }
3063 
3064 /*
3065  * Find and return an entry from the free linked list.
3066  */
3067 static struct virtio_net_data_ll *
3068 get_data_ll_free_entry(struct virtio_net_data_ll **ll_root_addr)
3069 {
3070 	struct virtio_net_data_ll *ll_free = *ll_root_addr;
3071 	struct virtio_net_data_ll *ll_dev;
3072 
3073 	if (ll_free == NULL)
3074 		return NULL;
3075 
3076 	ll_dev = ll_free;
3077 	*ll_root_addr = ll_free->next;
3078 
3079 	return ll_dev;
3080 }
3081 
3082 /*
3083  * Place an entry back on to the free linked list.
3084  */
3085 static void
3086 put_data_ll_free_entry(struct virtio_net_data_ll **ll_root_addr,
3087 	struct virtio_net_data_ll *ll_dev)
3088 {
3089 	struct virtio_net_data_ll *ll_free = *ll_root_addr;
3090 
3091 	if (ll_dev == NULL)
3092 		return;
3093 
3094 	ll_dev->next = ll_free;
3095 	*ll_root_addr = ll_dev;
3096 }
3097 
3098 /*
3099  * Creates a linked list of a given size.
3100  */
3101 static struct virtio_net_data_ll *
3102 alloc_data_ll(uint32_t size)
3103 {
3104 	struct virtio_net_data_ll *ll_new;
3105 	uint32_t i;
3106 
3107 	/* Malloc and then chain the linked list. */
3108 	ll_new = malloc(size * sizeof(struct virtio_net_data_ll));
3109 	if (ll_new == NULL) {
3110 		RTE_LOG(ERR, VHOST_CONFIG, "Failed to allocate memory for ll_new.\n");
3111 		return NULL;
3112 	}
3113 
3114 	for (i = 0; i < size - 1; i++) {
3115 		ll_new[i].dev = NULL;
3116 		ll_new[i].next = &ll_new[i+1];
3117 	}
3118 	ll_new[i].next = NULL;
3119 
3120 	return (ll_new);
3121 }
3122 
3123 /*
3124  * Create the main linked list along with each individual core's linked list. A used and a free list
3125  * are created to manage entries.
3126  */
3127 static int
3128 init_data_ll (void)
3129 {
3130 	int lcore;
3131 
3132 	RTE_LCORE_FOREACH_SLAVE(lcore) {
3133 		lcore_info[lcore].lcore_ll = malloc(sizeof(struct lcore_ll_info));
3134 		if (lcore_info[lcore].lcore_ll == NULL) {
3135 			RTE_LOG(ERR, VHOST_CONFIG, "Failed to allocate memory for lcore_ll.\n");
3136 			return -1;
3137 		}
3138 
3139 		lcore_info[lcore].lcore_ll->device_num = 0;
3140 		lcore_info[lcore].lcore_ll->dev_removal_flag = ACK_DEV_REMOVAL;
3141 		lcore_info[lcore].lcore_ll->ll_root_used = NULL;
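		/*
		 * Size each core's free list so that devices are spread evenly
		 * across the switching cores, rounding up when there is a
		 * remainder.
		 */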
3142 		if (num_devices % num_switching_cores)
3143 			lcore_info[lcore].lcore_ll->ll_root_free = alloc_data_ll((num_devices / num_switching_cores) + 1);
3144 		else
3145 			lcore_info[lcore].lcore_ll->ll_root_free = alloc_data_ll(num_devices / num_switching_cores);
3146 	}
3147 
3148 	/* Allocate devices up to a maximum of MAX_DEVICES. */
3149 	ll_root_free = alloc_data_ll(MIN((num_devices), MAX_DEVICES));
3150 
3151 	return 0;
3152 }
3153 
3154 /*
3155  * Set virtqueue flags so that we do not receive interrupts.
3156  */
3157 static void
3158 set_irq_status (struct virtio_net *dev)
3159 {
3160 	dev->virtqueue[VIRTIO_RXQ]->used->flags = VRING_USED_F_NO_NOTIFY;
3161 	dev->virtqueue[VIRTIO_TXQ]->used->flags = VRING_USED_F_NO_NOTIFY;
3162 }
3163 
3164 /*
3165  * Remove a device from the specific data core linked list and from the main linked list. Synchronization
3166  * occurs through the use of the lcore dev_removal_flag. Device is made volatile here to avoid re-ordering
3167  * of dev->remove=1 which can cause an infinite loop in the rte_pause loop.
3168  */
3169 static void
3170 destroy_device (volatile struct virtio_net *dev)
3171 {
3172 	struct virtio_net_data_ll *ll_lcore_dev_cur;
3173 	struct virtio_net_data_ll *ll_main_dev_cur;
3174 	struct virtio_net_data_ll *ll_lcore_dev_last = NULL;
3175 	struct virtio_net_data_ll *ll_main_dev_last = NULL;
3176 	int lcore;
3177 
3178 	dev->flags &= ~VIRTIO_DEV_RUNNING;
3179 
3180 	/*set the remove flag. */
3181 	dev->remove = 1;
3182 
3183 	while(dev->ready != DEVICE_SAFE_REMOVE) {
3184 		rte_pause();
3185 	}
3186 
3187 	/* Search for entry to be removed from lcore ll */
3188 	ll_lcore_dev_cur = lcore_info[dev->coreid].lcore_ll->ll_root_used;
3189 	while (ll_lcore_dev_cur != NULL) {
3190 		if (ll_lcore_dev_cur->dev == dev) {
3191 			break;
3192 		} else {
3193 			ll_lcore_dev_last = ll_lcore_dev_cur;
3194 			ll_lcore_dev_cur = ll_lcore_dev_cur->next;
3195 		}
3196 	}
3197 
3198 	if (ll_lcore_dev_cur == NULL) {
3199 		RTE_LOG(ERR, VHOST_CONFIG,
3200 			"(%"PRIu64") Failed to find the dev to be destroyed.\n",
3201 			dev->device_fh);
3202 		return;
3203 	}
3204 
3205 	/* Search for entry to be removed from main ll */
3206 	ll_main_dev_cur = ll_root_used;
3207 	ll_main_dev_last = NULL;
3208 	while (ll_main_dev_cur != NULL) {
3209 		if (ll_main_dev_cur->dev == dev) {
3210 			break;
3211 		} else {
3212 			ll_main_dev_last = ll_main_dev_cur;
3213 			ll_main_dev_cur = ll_main_dev_cur->next;
3214 		}
3215 	}
3216 
3217 	/* Remove entries from the lcore and main ll. */
3218 	rm_data_ll_entry(&lcore_info[ll_lcore_dev_cur->dev->coreid].lcore_ll->ll_root_used, ll_lcore_dev_cur, ll_lcore_dev_last);
3219 	rm_data_ll_entry(&ll_root_used, ll_main_dev_cur, ll_main_dev_last);
3220 
3221 	/* Set the dev_removal_flag on each lcore. */
3222 	RTE_LCORE_FOREACH_SLAVE(lcore) {
3223 		lcore_info[lcore].lcore_ll->dev_removal_flag = REQUEST_DEV_REMOVAL;
3224 	}
3225 
3226 	/*
3227 	 * Once each core has set the dev_removal_flag to ACK_DEV_REMOVAL we can be sure that
3228 	 * they can no longer access the device removed from the linked lists and that the devices
3229 	 * are no longer in use.
3230 	 */
3231 	RTE_LCORE_FOREACH_SLAVE(lcore) {
3232 		while (lcore_info[lcore].lcore_ll->dev_removal_flag != ACK_DEV_REMOVAL) {
3233 			rte_pause();
3234 		}
3235 	}
3236 
3237 	/* Add the entries back to the lcore and main free ll.*/
3238 	put_data_ll_free_entry(&lcore_info[ll_lcore_dev_cur->dev->coreid].lcore_ll->ll_root_free, ll_lcore_dev_cur);
3239 	put_data_ll_free_entry(&ll_root_free, ll_main_dev_cur);
3240 
3241 	/* Decrement number of device on the lcore. */
3242 	lcore_info[ll_lcore_dev_cur->dev->coreid].lcore_ll->device_num--;
3243 
3244 	RTE_LOG(INFO, VHOST_DATA, "(%"PRIu64") Device has been removed from data core\n", dev->device_fh);
3245 
3246 	if (zero_copy) {
3247 		struct vpool *vpool = &vpool_array[dev->vmdq_rx_q];
3248 
3249 		/* Stop the RX queue. */
3250 		if (rte_eth_dev_rx_queue_stop(ports[0], dev->vmdq_rx_q) != 0) {
3251 			LOG_DEBUG(VHOST_CONFIG,
3252 				"(%"PRIu64") In destroy_device: Failed to stop "
3253 				"rx queue:%d\n",
3254 				dev->device_fh,
3255 				dev->vmdq_rx_q);
3256 		}
3257 
3258 		LOG_DEBUG(VHOST_CONFIG,
3259 			"(%"PRIu64") in destroy_device: Start put mbuf in "
3260 			"mempool back to ring for RX queue: %d\n",
3261 			dev->device_fh, dev->vmdq_rx_q);
3262 
3263 		mbuf_destroy_zcp(vpool);
3264 
3265 		/* Stop the TX queue. */
3266 		if (rte_eth_dev_tx_queue_stop(ports[0], dev->vmdq_rx_q) != 0) {
3267 			LOG_DEBUG(VHOST_CONFIG,
3268 				"(%"PRIu64") In destroy_device: Failed to "
3269 				"stop tx queue:%d\n",
3270 				dev->device_fh, dev->vmdq_rx_q);
3271 		}
3272 
3273 		vpool = &vpool_array[dev->vmdq_rx_q + MAX_QUEUES];
3274 
3275 		LOG_DEBUG(VHOST_CONFIG,
3276 			"(%"PRIu64") destroy_device: Start put mbuf in mempool "
3277 			"back to ring for TX queue: %d, dev:(%"PRIu64")\n",
3278 			dev->device_fh, (dev->vmdq_rx_q + MAX_QUEUES),
3279 			dev->device_fh);
3280 
3281 		mbuf_destroy_zcp(vpool);
3282 	}
3283 
3284 }
3285 
3286 /*
3287  * A new device is added to a data core. First the device is added to the main linked list
3288  * and then allocated to a specific data core.
3289  */
3290 static int
3291 new_device (struct virtio_net *dev)
3292 {
3293 	struct virtio_net_data_ll *ll_dev;
3294 	int lcore, core_add = 0;
3295 	uint32_t device_num_min = num_devices;
3296 
3297 	/* Add device to main ll */
3298 	ll_dev = get_data_ll_free_entry(&ll_root_free);
3299 	if (ll_dev == NULL) {
3300 		RTE_LOG(INFO, VHOST_DATA, "(%"PRIu64") No free entry found in linked list. Device limit "
3301 			"of %d devices per core has been reached\n",
3302 			dev->device_fh, num_devices);
3303 		return -1;
3304 	}
3305 	ll_dev->dev = dev;
3306 	add_data_ll_entry(&ll_root_used, ll_dev);
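	/*
	 * Assign this device its own VMDQ RX queue, spaced by the number of
	 * queues available per device.
	 */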
3307 	ll_dev->dev->vmdq_rx_q
3308 		= ll_dev->dev->device_fh * (num_queues / num_devices);
3309 
3310 	if (zero_copy) {
3311 		uint32_t index = ll_dev->dev->vmdq_rx_q;
3312 		uint32_t count_in_ring, i;
3313 		struct mbuf_table *tx_q;
3314 
3315 		count_in_ring = rte_ring_count(vpool_array[index].ring);
3316 
3317 		LOG_DEBUG(VHOST_CONFIG,
3318 			"(%"PRIu64") in new_device: mbuf count in mempool "
3319 			"before attach is: %d\n",
3320 			dev->device_fh,
3321 			rte_mempool_count(vpool_array[index].pool));
3322 		LOG_DEBUG(VHOST_CONFIG,
3323 			"(%"PRIu64") in new_device: mbuf count in  ring "
3324 			"before attach  is : %d\n",
3325 			dev->device_fh, count_in_ring);
3326 
3327 		/*
3328 		 * Attach all mbufs in vpool.ring and put them back into vpool.pool.
3329 		 */
3330 		for (i = 0; i < count_in_ring; i++)
3331 			attach_rxmbuf_zcp(dev);
3332 
3333 		LOG_DEBUG(VHOST_CONFIG, "(%"PRIu64") in new_device: mbuf count in "
3334 			"mempool after attach is: %d\n",
3335 			dev->device_fh,
3336 			rte_mempool_count(vpool_array[index].pool));
3337 		LOG_DEBUG(VHOST_CONFIG, "(%"PRIu64") in new_device: mbuf count in "
3338 			"ring after attach  is : %d\n",
3339 			dev->device_fh,
3340 			rte_ring_count(vpool_array[index].ring));
3341 
3342 		tx_q = &tx_queue_zcp[(uint16_t)dev->vmdq_rx_q];
3343 		tx_q->txq_id = dev->vmdq_rx_q;
3344 
3345 		if (rte_eth_dev_tx_queue_start(ports[0], dev->vmdq_rx_q) != 0) {
3346 			struct vpool *vpool = &vpool_array[dev->vmdq_rx_q];
3347 
3348 			LOG_DEBUG(VHOST_CONFIG,
3349 				"(%"PRIu64") In new_device: Failed to start "
3350 				"tx queue:%d\n",
3351 				dev->device_fh, dev->vmdq_rx_q);
3352 
3353 			mbuf_destroy_zcp(vpool);
3354 			return -1;
3355 		}
3356 
3357 		if (rte_eth_dev_rx_queue_start(ports[0], dev->vmdq_rx_q) != 0) {
3358 			struct vpool *vpool = &vpool_array[dev->vmdq_rx_q];
3359 
3360 			LOG_DEBUG(VHOST_CONFIG,
3361 				"(%"PRIu64") In new_device: Failed to start "
3362 				"rx queue:%d\n",
3363 				dev->device_fh, dev->vmdq_rx_q);
3364 
3365 			/* Stop the TX queue. */
3366 			if (rte_eth_dev_tx_queue_stop(ports[0],
3367 				dev->vmdq_rx_q) != 0) {
3368 				LOG_DEBUG(VHOST_CONFIG,
3369 					"(%"PRIu64") In new_device: Failed to "
3370 					"stop tx queue:%d\n",
3371 					dev->device_fh, dev->vmdq_rx_q);
3372 			}
3373 
3374 			mbuf_destroy_zcp(vpool);
3375 			return -1;
3376 		}
3377 
3378 	}
3379 
3380 	/*reset ready flag*/
3381 	dev->ready = DEVICE_MAC_LEARNING;
3382 	dev->remove = 0;
3383 
3384 	/* Find a suitable lcore to add the device. */
3385 	RTE_LCORE_FOREACH_SLAVE(lcore) {
3386 		if (lcore_info[lcore].lcore_ll->device_num < device_num_min) {
3387 			device_num_min = lcore_info[lcore].lcore_ll->device_num;
3388 			core_add = lcore;
3389 		}
3390 	}
3391 	/* Add device to lcore ll */
3392 	ll_dev->dev->coreid = core_add;
3393 	ll_dev = get_data_ll_free_entry(&lcore_info[ll_dev->dev->coreid].lcore_ll->ll_root_free);
3394 	if (ll_dev == NULL) {
3395 		RTE_LOG(INFO, VHOST_DATA, "(%"PRIu64") Failed to add device to data core\n", dev->device_fh);
3396 		dev->ready = DEVICE_SAFE_REMOVE;
3397 		destroy_device(dev);
3398 		return -1;
3399 	}
3400 	ll_dev->dev = dev;
3401 	add_data_ll_entry(&lcore_info[ll_dev->dev->coreid].lcore_ll->ll_root_used, ll_dev);
3402 
3403 	/* Initialize device stats */
3404 	memset(&dev_statistics[dev->device_fh], 0, sizeof(struct device_statistics));
3405 
3406 	/* Disable notifications. */
3407 	set_irq_status(dev);
3408 	lcore_info[ll_dev->dev->coreid].lcore_ll->device_num++;
3409 	dev->flags |= VIRTIO_DEV_RUNNING;
3410 
3411 	RTE_LOG(INFO, VHOST_DATA, "(%"PRIu64") Device has been added to data core %d\n", dev->device_fh, dev->coreid);
3412 
3413 	return 0;
3414 }
3415 
3416 /*
3417  * These callbacks allow devices to be added to the data core when their
3418  * configuration has been fully completed.
3419  */
3420 static const struct virtio_net_device_ops virtio_net_device_ops =
3421 {
3422 	.new_device =  new_device,
3423 	.destroy_device = destroy_device,
3424 };
3425 
3426 /*
3427  * This thread wakes up periodically to print statistics if the user has
3428  * enabled them.
3429  */
3430 static void
3431 print_stats(void)
3432 {
3433 	struct virtio_net_data_ll *dev_ll;
3434 	uint64_t tx_dropped, rx_dropped;
3435 	uint64_t tx, tx_total, rx, rx_total;
3436 	uint32_t device_fh;
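	/* ANSI escape sequences: clear the screen and home the cursor. */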
3437 	const char clr[] = { 27, '[', '2', 'J', '\0' };
3438 	const char top_left[] = { 27, '[', '1', ';', '1', 'H','\0' };
3439 
3440 	while (1) {
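		/* enable_stats doubles as the refresh period, in seconds. */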
3441 		sleep(enable_stats);
3442 
3443 		/* Clear screen and move to top left */
3444 		printf("%s%s", clr, top_left);
3445 
3446 		printf("\nDevice statistics ====================================");
3447 
3448 		dev_ll = ll_root_used;
3449 		while (dev_ll != NULL) {
3450 			device_fh = (uint32_t)dev_ll->dev->device_fh;
3451 			tx_total = dev_statistics[device_fh].tx_total;
3452 			tx = dev_statistics[device_fh].tx;
3453 			tx_dropped = tx_total - tx;
3454 			if (zero_copy == 0) {
3455 				rx_total = rte_atomic64_read(
3456 					&dev_statistics[device_fh].rx_total_atomic);
3457 				rx = rte_atomic64_read(
3458 					&dev_statistics[device_fh].rx_atomic);
3459 			} else {
3460 				rx_total = dev_statistics[device_fh].rx_total;
3461 				rx = dev_statistics[device_fh].rx;
3462 			}
3463 			rx_dropped = rx_total - rx;
3464 
3465 			printf("\nStatistics for device %"PRIu32" ------------------------------"
3466 					"\nTX total: 		%"PRIu64""
3467 					"\nTX dropped: 		%"PRIu64""
3468 					"\nTX successful: 		%"PRIu64""
3469 					"\nRX total: 		%"PRIu64""
3470 					"\nRX dropped: 		%"PRIu64""
3471 					"\nRX successful: 		%"PRIu64"",
3472 					device_fh,
3473 					tx_total,
3474 					tx_dropped,
3475 					tx,
3476 					rx_total,
3477 					rx_dropped,
3478 					rx);
3479 
3480 			dev_ll = dev_ll->next;
3481 		}
3482 		printf("\n======================================================\n");
3483 	}
3484 }
3485 
3486 static void
3487 setup_mempool_tbl(int socket, uint32_t index, char *pool_name,
3488 	char *ring_name, uint32_t nb_mbuf)
3489 {
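	/*
	 * The data room must hold a maximum-sized zero-copy frame plus the
	 * mbuf headroom; it is passed to rte_pktmbuf_pool_init() below as
	 * the opaque init argument.
	 */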
3490 	uint16_t roomsize = VIRTIO_DESCRIPTOR_LEN_ZCP + RTE_PKTMBUF_HEADROOM;
3491 	vpool_array[index].pool
3492 		= rte_mempool_create(pool_name, nb_mbuf, MBUF_SIZE_ZCP,
3493 		MBUF_CACHE_SIZE_ZCP, sizeof(struct rte_pktmbuf_pool_private),
3494 		rte_pktmbuf_pool_init, (void *)(uintptr_t)roomsize,
3495 		rte_pktmbuf_init, NULL, socket, 0);
3496 	if (vpool_array[index].pool != NULL) {
3497 		vpool_array[index].ring
3498 			= rte_ring_create(ring_name,
3499 				rte_align32pow2(nb_mbuf + 1),
3500 				socket, RING_F_SP_ENQ | RING_F_SC_DEQ);
3501 		if (likely(vpool_array[index].ring != NULL)) {
3502 			LOG_DEBUG(VHOST_CONFIG,
3503 				"in setup_mempool_tbl: mbuf count in "
3504 				"mempool is: %d\n",
3505 				rte_mempool_count(vpool_array[index].pool));
3506 			LOG_DEBUG(VHOST_CONFIG,
3507 				"in setup_mempool_tbl: mbuf count in "
3508 				"ring   is: %d\n",
3509 				rte_ring_count(vpool_array[index].ring));
3510 		} else {
3511 			rte_exit(EXIT_FAILURE, "ring_create(%s) failed",
3512 				ring_name);
3513 		}
3514 
3515 		/* Need to account for the mbuf headroom. */
3516 		vpool_array[index].buf_size = roomsize - RTE_PKTMBUF_HEADROOM;
3517 	} else {
3518 		rte_exit(EXIT_FAILURE, "mempool_create(%s) failed", pool_name);
3519 	}
3520 }
3521 
3522 
3523 /*
3524  * Main function: does initialisation and calls the per-lcore functions. The CUSE
3525  * device is also registered here to handle the IOCTLs.
3526  */
3527 int
3528 MAIN(int argc, char *argv[])
3529 {
3530 	struct rte_mempool *mbuf_pool = NULL;
3531 	unsigned lcore_id, core_id = 0;
3532 	unsigned nb_ports, valid_num_ports;
3533 	int ret;
3534 	uint8_t portid, queue_id = 0;
3535 	static pthread_t tid;
3536 
3537 	/* init EAL */
3538 	ret = rte_eal_init(argc, argv);
3539 	if (ret < 0)
3540 		rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
3541 	argc -= ret;
3542 	argv += ret;
3543 
3544 	/* parse app arguments */
3545 	ret = us_vhost_parse_args(argc, argv);
3546 	if (ret < 0)
3547 		rte_exit(EXIT_FAILURE, "Invalid argument\n");
3548 
3549 	if (rte_eal_pci_probe() != 0)
3550 		rte_exit(EXIT_FAILURE, "Error with NIC driver initialization\n");
3551 
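	/* Build a dense table of the enabled lcore IDs. */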
3552 	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++)
3553 		if (rte_lcore_is_enabled(lcore_id))
3554 			lcore_ids[core_id++] = lcore_id;
3555 
3556 	if (rte_lcore_count() > RTE_MAX_LCORE)
3557 		rte_exit(EXIT_FAILURE, "Not enough cores\n");
3558 
3559 	/* Set the number of switching cores available. */
3560 	num_switching_cores = rte_lcore_count() - 1;
3561 
3562 	/* Get the number of physical ports. */
3563 	nb_ports = rte_eth_dev_count();
3564 	if (nb_ports > RTE_MAX_ETHPORTS)
3565 		nb_ports = RTE_MAX_ETHPORTS;
3566 
3567 	/*
3568 	 * Update the global var num_ports and the global array ports[], and
3569 	 * get the value of valid_num_ports from the number of system ports.
3570 	 */
3571 	valid_num_ports = check_ports_num(nb_ports);
3572 
3573 	if ((valid_num_ports ==  0) || (valid_num_ports > MAX_SUP_PORTS)) {
3574 		RTE_LOG(INFO, VHOST_PORT, "Current enabled port number is %u, "
3575 			"but only %u port can be enabled\n", num_ports, MAX_SUP_PORTS);
3576 		return -1;
3577 	}
3578 
3579 	if (zero_copy == 0) {
3580 		/* Create the mbuf pool. */
3581 		mbuf_pool = rte_mempool_create(
3582 				"MBUF_POOL",
3583 				NUM_MBUFS_PER_PORT
3584 				* valid_num_ports,
3585 				MBUF_SIZE, MBUF_CACHE_SIZE,
3586 				sizeof(struct rte_pktmbuf_pool_private),
3587 				rte_pktmbuf_pool_init, NULL,
3588 				rte_pktmbuf_init, NULL,
3589 				rte_socket_id(), 0);
3590 		if (mbuf_pool == NULL)
3591 			rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
3592 
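		/* Without zero copy, every queue shares the single mbuf pool. */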
3593 		for (queue_id = 0; queue_id < MAX_QUEUES + 1; queue_id++)
3594 			vpool_array[queue_id].pool = mbuf_pool;
3595 
3596 		if (vm2vm_mode == VM2VM_HARDWARE) {
3597 			/* Enable VT loop back to let L2 switch to do it. */
3598 			vmdq_conf_default.rx_adv_conf.vmdq_rx_conf.enable_loop_back = 1;
3599 			LOG_DEBUG(VHOST_CONFIG,
3600 				"Enable loop back for L2 switch in vmdq.\n");
3601 		}
3602 	} else {
3603 		uint32_t nb_mbuf;
3604 		char pool_name[RTE_MEMPOOL_NAMESIZE];
3605 		char ring_name[RTE_MEMPOOL_NAMESIZE];
3606 
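		/*
		 * Zero copy uses per-queue start; each device's RX/TX queues
		 * are started individually in new_device().
		 */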
3607 		rx_conf_default.start_rx_per_q = (uint8_t)zero_copy;
3608 		rx_conf_default.rx_drop_en = 0;
3609 		tx_conf_default.start_tx_per_q = (uint8_t)zero_copy;
3610 		nb_mbuf = num_rx_descriptor
3611 			+ num_switching_cores * MBUF_CACHE_SIZE_ZCP
3612 			+ num_switching_cores * MAX_PKT_BURST;
3613 
3614 		for (queue_id = 0; queue_id < MAX_QUEUES; queue_id++) {
3615 			snprintf(pool_name, sizeof(pool_name),
3616 				"rxmbuf_pool_%u", queue_id);
3617 			snprintf(ring_name, sizeof(ring_name),
3618 				"rxmbuf_ring_%u", queue_id);
3619 			setup_mempool_tbl(rte_socket_id(), queue_id,
3620 				pool_name, ring_name, nb_mbuf);
3621 		}
3622 
3623 		nb_mbuf = num_tx_descriptor
3624 				+ num_switching_cores * MBUF_CACHE_SIZE_ZCP
3625 				+ num_switching_cores * MAX_PKT_BURST;
3626 
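		/* TX pools and rings are stored after the RX entries in vpool_array. */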
3627 		for (queue_id = 0; queue_id < MAX_QUEUES; queue_id++) {
3628 			snprintf(pool_name, sizeof(pool_name),
3629 				"txmbuf_pool_%u", queue_id);
3630 			snprintf(ring_name, sizeof(ring_name),
3631 				"txmbuf_ring_%u", queue_id);
3632 			setup_mempool_tbl(rte_socket_id(),
3633 				(queue_id + MAX_QUEUES),
3634 				pool_name, ring_name, nb_mbuf);
3635 		}
3636 
3637 		if (vm2vm_mode == VM2VM_HARDWARE) {
3638 			/* Enable VT loop back to let L2 switch to do it. */
3639 			vmdq_conf_default.rx_adv_conf.vmdq_rx_conf.enable_loop_back = 1;
3640 			LOG_DEBUG(VHOST_CONFIG,
3641 				"Enable loop back for L2 switch in vmdq.\n");
3642 		}
3643 	}
3644 	/* Set log level. */
3645 	rte_set_log_level(LOG_LEVEL);
3646 
3647 	/* initialize all ports */
3648 	for (portid = 0; portid < nb_ports; portid++) {
3649 		/* skip ports that are not enabled */
3650 		if ((enabled_port_mask & (1 << portid)) == 0) {
3651 			RTE_LOG(INFO, VHOST_PORT,
3652 				"Skipping disabled port %d\n", portid);
3653 			continue;
3654 		}
3655 		if (port_init(portid) != 0)
3656 			rte_exit(EXIT_FAILURE,
3657 				"Cannot initialize network ports\n");
3658 	}
3659 
3660 	/* Initialise all linked lists. */
3661 	if (init_data_ll() == -1)
3662 		rte_exit(EXIT_FAILURE, "Failed to initialize linked list\n");
3663 
3664 	/* Initialize device stats */
3665 	memset(&dev_statistics, 0, sizeof(dev_statistics));
3666 
3667 	/* Enable stats if the user option is set. */
3668 	if (enable_stats)
3669 		pthread_create(&tid, NULL, (void*)print_stats, NULL);
3670 
3671 	/* Launch all data cores. */
3672 	if (zero_copy == 0) {
3673 		RTE_LCORE_FOREACH_SLAVE(lcore_id) {
3674 			rte_eal_remote_launch(switch_worker,
3675 				mbuf_pool, lcore_id);
3676 		}
3677 	} else {
3678 		uint32_t count_in_mempool, index, i;
3679 		for (index = 0; index < 2*MAX_QUEUES; index++) {
3680 			/* For all RX and TX queues. */
3681 			count_in_mempool
3682 				= rte_mempool_count(vpool_array[index].pool);
3683 
3684 			/*
3685 			 * Transfer all un-attached mbufs from vpool.pool
3686 			 * to vpool.ring.
3687 			 */
3688 			for (i = 0; i < count_in_mempool; i++) {
3689 				struct rte_mbuf *mbuf
3690 					= __rte_mbuf_raw_alloc(
3691 						vpool_array[index].pool);
3692 				rte_ring_sp_enqueue(vpool_array[index].ring,
3693 						(void *)mbuf);
3694 			}
3695 
3696 			LOG_DEBUG(VHOST_CONFIG,
3697 				"in MAIN: mbuf count in mempool at init "
3698 				"is: %d\n", count_in_mempool);
3699 			LOG_DEBUG(VHOST_CONFIG,
3700 				"in MAIN: mbuf count in ring at init is:"
3701 				" %d\n",
3702 				rte_ring_count(vpool_array[index].ring));
3703 		}
3704 
3705 		RTE_LCORE_FOREACH_SLAVE(lcore_id)
3706 			rte_eal_remote_launch(switch_worker_zcp, NULL,
3707 				lcore_id);
3708 	}
3709 
3710 	/* Register CUSE device to handle IOCTLs. */
3711 	ret = register_cuse_device((char*)&dev_basename, dev_index, get_virtio_net_callbacks());
3712 	if (ret != 0)
3713 		rte_exit(EXIT_FAILURE, "CUSE device setup failure.\n");
3714 
3715 	init_virtio_net(&virtio_net_device_ops);
3716 
3717 	/* Start CUSE session. */
3718 	start_cuse_session_loop();
3719 	return 0;
3720 
3721 }
3722 
3723