xref: /dpdk/drivers/net/memif/rte_eth_memif.c (revision f8dbaebbf1c9efcbb2e2354b341ed62175466a57)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright 2018-2019 Cisco Systems, Inc.  All rights reserved.
3  */
4 
5 #include <stdlib.h>
6 #include <fcntl.h>
7 #include <unistd.h>
8 #include <sys/types.h>
9 #include <sys/socket.h>
10 #include <sys/un.h>
11 #include <sys/ioctl.h>
12 #include <sys/mman.h>
13 #include <linux/if_ether.h>
14 #include <errno.h>
15 #include <sys/eventfd.h>
16 
17 #include <rte_version.h>
18 #include <rte_mbuf.h>
19 #include <rte_ether.h>
20 #include <ethdev_driver.h>
21 #include <ethdev_vdev.h>
22 #include <rte_malloc.h>
23 #include <rte_kvargs.h>
24 #include <rte_bus_vdev.h>
25 #include <rte_string_fns.h>
26 #include <rte_errno.h>
27 #include <rte_memory.h>
28 #include <rte_memzone.h>
29 #include <rte_eal_memconfig.h>
30 
31 #include "rte_eth_memif.h"
32 #include "memif_socket.h"
33 
34 #define ETH_MEMIF_ID_ARG		"id"
35 #define ETH_MEMIF_ROLE_ARG		"role"
36 #define ETH_MEMIF_PKT_BUFFER_SIZE_ARG	"bsize"
37 #define ETH_MEMIF_RING_SIZE_ARG		"rsize"
38 #define ETH_MEMIF_SOCKET_ARG		"socket"
39 #define ETH_MEMIF_SOCKET_ABSTRACT_ARG	"socket-abstract"
40 #define ETH_MEMIF_MAC_ARG		"mac"
41 #define ETH_MEMIF_ZC_ARG		"zero-copy"
42 #define ETH_MEMIF_SECRET_ARG		"secret"
43 
44 static const char * const valid_arguments[] = {
45 	ETH_MEMIF_ID_ARG,
46 	ETH_MEMIF_ROLE_ARG,
47 	ETH_MEMIF_PKT_BUFFER_SIZE_ARG,
48 	ETH_MEMIF_RING_SIZE_ARG,
49 	ETH_MEMIF_SOCKET_ARG,
50 	ETH_MEMIF_SOCKET_ABSTRACT_ARG,
51 	ETH_MEMIF_MAC_ARG,
52 	ETH_MEMIF_ZC_ARG,
53 	ETH_MEMIF_SECRET_ARG,
54 	NULL
55 };
56 
57 static const struct rte_eth_link pmd_link = {
58 	.link_speed = RTE_ETH_SPEED_NUM_10G,
59 	.link_duplex = RTE_ETH_LINK_FULL_DUPLEX,
60 	.link_status = RTE_ETH_LINK_DOWN,
61 	.link_autoneg = RTE_ETH_LINK_AUTONEG
62 };
63 
64 #define MEMIF_MP_SEND_REGION		"memif_mp_send_region"
65 
66 
67 static int memif_region_init_zc(const struct rte_memseg_list *msl,
68 				const struct rte_memseg *ms, void *arg);
69 
70 const char *
71 memif_version(void)
72 {
73 	return ("memif-" RTE_STR(MEMIF_VERSION_MAJOR) "." RTE_STR(MEMIF_VERSION_MINOR));
74 }
75 
76 /* Message header to synchronize regions */
77 struct mp_region_msg {
78 	char port_name[RTE_DEV_NAME_MAX_LEN];
79 	memif_region_index_t idx;
80 	memif_region_size_t size;
81 };
82 
83 static int
84 memif_mp_send_region(const struct rte_mp_msg *msg, const void *peer)
85 {
86 	struct rte_eth_dev *dev;
87 	struct pmd_process_private *proc_private;
88 	const struct mp_region_msg *msg_param = (const struct mp_region_msg *)msg->param;
89 	struct rte_mp_msg reply;
90 	struct mp_region_msg *reply_param = (struct mp_region_msg *)reply.param;
91 	uint16_t port_id;
92 	int ret;
93 
94 	/* Get requested port */
95 	ret = rte_eth_dev_get_port_by_name(msg_param->port_name, &port_id);
96 	if (ret) {
97 		MIF_LOG(ERR, "Failed to get port id for %s",
98 			msg_param->port_name);
99 		return -1;
100 	}
101 	dev = &rte_eth_devices[port_id];
102 	proc_private = dev->process_private;
103 
104 	memset(&reply, 0, sizeof(reply));
105 	strlcpy(reply.name, msg->name, sizeof(reply.name));
106 	reply_param->idx = msg_param->idx;
107 	if (proc_private->regions[msg_param->idx] != NULL) {
108 		reply_param->size = proc_private->regions[msg_param->idx]->region_size;
109 		reply.fds[0] = proc_private->regions[msg_param->idx]->fd;
110 		reply.num_fds = 1;
111 	}
112 	reply.len_param = sizeof(*reply_param);
113 	if (rte_mp_reply(&reply, peer) < 0) {
114 		MIF_LOG(ERR, "Failed to reply to an add region request");
115 		return -1;
116 	}
117 
118 	return 0;
119 }
120 
121 /*
122  * Request regions
123  * Called by the secondary process when the port's link status goes up.
124  */
125 static int
126 memif_mp_request_regions(struct rte_eth_dev *dev)
127 {
128 	int ret, i;
129 	struct timespec timeout = {.tv_sec = 5, .tv_nsec = 0};
130 	struct rte_mp_msg msg, *reply;
131 	struct rte_mp_reply replies;
132 	struct mp_region_msg *msg_param = (struct mp_region_msg *)msg.param;
133 	struct mp_region_msg *reply_param;
134 	struct memif_region *r;
135 	struct pmd_process_private *proc_private = dev->process_private;
136 	struct pmd_internals *pmd = dev->data->dev_private;
137 	/* in case of zero-copy client, only request region 0 */
138 	uint16_t max_region_num = (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) ?
139 				   1 : ETH_MEMIF_MAX_REGION_NUM;
140 
141 	MIF_LOG(DEBUG, "Requesting memory regions");
142 
143 	for (i = 0; i < max_region_num; i++) {
144 		/* Prepare the message */
145 		memset(&msg, 0, sizeof(msg));
146 		strlcpy(msg.name, MEMIF_MP_SEND_REGION, sizeof(msg.name));
147 		strlcpy(msg_param->port_name, dev->data->name,
148 			sizeof(msg_param->port_name));
149 		msg_param->idx = i;
150 		msg.len_param = sizeof(*msg_param);
151 
152 		/* Send message */
153 		ret = rte_mp_request_sync(&msg, &replies, &timeout);
154 		if (ret < 0 || replies.nb_received != 1) {
155 			MIF_LOG(ERR, "Failed to send mp msg: %d",
156 				rte_errno);
157 			return -1;
158 		}
159 
160 		reply = &replies.msgs[0];
161 		reply_param = (struct mp_region_msg *)reply->param;
162 
163 		if (reply_param->size > 0) {
164 			r = rte_zmalloc("region", sizeof(struct memif_region), 0);
165 			if (r == NULL) {
166 				MIF_LOG(ERR, "Failed to alloc memif region.");
167 				free(reply);
168 				return -ENOMEM;
169 			}
170 			r->region_size = reply_param->size;
171 			if (reply->num_fds < 1) {
172 				MIF_LOG(ERR, "Missing file descriptor.");
173 				free(reply);
174 				return -1;
175 			}
176 			r->fd = reply->fds[0];
177 			r->addr = NULL;
178 
179 			proc_private->regions[reply_param->idx] = r;
180 			proc_private->regions_num++;
181 		}
182 		free(reply);
183 	}
184 
185 	if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
186 		ret = rte_memseg_walk(memif_region_init_zc, (void *)proc_private);
187 		if (ret < 0)
188 			return ret;
189 	}
190 
191 	return memif_connect(dev);
192 }
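/*
 * Illustrative sketch of the region hand-off implemented by
 * memif_mp_request_regions() above together with memif_mp_send_region():
 *
 *   secondary process                          primary process
 *   memif_mp_request_regions()                 memif_mp_send_region()
 *   --------------------------                 ----------------------
 *   for idx in 0 .. max_region_num - 1:
 *       rte_mp_request_sync("memif_mp_send_region",
 *                           {port_name, idx})  -->  look up the port, reply
 *                                                   with {idx, region_size}
 *                                                   and the region fd, if any
 *       store the fd and size in proc_private->regions[idx]
 *   finally map the received fds in memif_connect().
 */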
193 
194 static int
195 memif_dev_info(struct rte_eth_dev *dev __rte_unused, struct rte_eth_dev_info *dev_info)
196 {
197 	dev_info->max_mac_addrs = 1;
198 	dev_info->max_rx_pktlen = RTE_ETHER_MAX_LEN;
199 	dev_info->max_rx_queues = ETH_MEMIF_MAX_NUM_Q_PAIRS;
200 	dev_info->max_tx_queues = ETH_MEMIF_MAX_NUM_Q_PAIRS;
201 	dev_info->min_rx_bufsize = 0;
202 	dev_info->tx_offload_capa = RTE_ETH_TX_OFFLOAD_MULTI_SEGS;
203 
204 	return 0;
205 }
206 
207 static memif_ring_t *
208 memif_get_ring(struct pmd_internals *pmd, struct pmd_process_private *proc_private,
209 	       memif_ring_type_t type, uint16_t ring_num)
210 {
211 	/* rings only in region 0 */
212 	void *p = proc_private->regions[0]->addr;
213 	int ring_size = sizeof(memif_ring_t) + sizeof(memif_desc_t) *
214 	    (1 << pmd->run.log2_ring_size);
215 
216 	p = (uint8_t *)p + (ring_num + type * pmd->run.num_c2s_rings) * ring_size;
217 
218 	return (memif_ring_t *)p;
219 }
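/*
 * Layout of region 0 assumed by memif_get_ring() above and built by
 * memif_region_init_shm() (non-zero-copy case; assumes MEMIF_RING_C2S is
 * enumerated before MEMIF_RING_S2C, as the offset arithmetic implies):
 *
 *   | C2S ring 0 .. N-1 | S2C ring 0 .. M-1 | packet buffers ... |
 *
 * Each ring takes
 *   sizeof(memif_ring_t) + sizeof(memif_desc_t) * (1 << log2_ring_size)
 * bytes and the packet buffer area starts at pkt_buffer_offset.
 */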
220 
221 static memif_region_offset_t
222 memif_get_ring_offset(struct rte_eth_dev *dev, struct memif_queue *mq,
223 		      memif_ring_type_t type, uint16_t num)
224 {
225 	struct pmd_internals *pmd = dev->data->dev_private;
226 	struct pmd_process_private *proc_private = dev->process_private;
227 
228 	return ((uint8_t *)memif_get_ring(pmd, proc_private, type, num) -
229 		(uint8_t *)proc_private->regions[mq->region]->addr);
230 }
231 
232 static memif_ring_t *
233 memif_get_ring_from_queue(struct pmd_process_private *proc_private,
234 			  struct memif_queue *mq)
235 {
236 	struct memif_region *r;
237 
238 	r = proc_private->regions[mq->region];
239 	if (r == NULL)
240 		return NULL;
241 
242 	return (memif_ring_t *)((uint8_t *)r->addr + mq->ring_offset);
243 }
244 
245 static void *
246 memif_get_buffer(struct pmd_process_private *proc_private, memif_desc_t *d)
247 {
248 	return ((uint8_t *)proc_private->regions[d->region]->addr + d->offset);
249 }
250 
251 /* Free mbufs received by server */
252 static void
253 memif_free_stored_mbufs(struct pmd_process_private *proc_private, struct memif_queue *mq)
254 {
255 	uint16_t cur_tail;
256 	uint16_t mask = (1 << mq->log2_ring_size) - 1;
257 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
258 
259 	/* FIXME: improve performance */
260 	/* The ring->tail acts as a guard variable between Tx and Rx
261 	 * threads, so the load-acquire below pairs with the store-release
262 	 * in eth_memif_rx for C2S queues.
263 	 */
264 	cur_tail = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);
265 	while (mq->last_tail != cur_tail) {
266 		RTE_MBUF_PREFETCH_TO_FREE(mq->buffers[(mq->last_tail + 1) & mask]);
267 		/* Decrement refcnt and free mbuf. (current segment) */
268 		rte_mbuf_refcnt_update(mq->buffers[mq->last_tail & mask], -1);
269 		rte_pktmbuf_free_seg(mq->buffers[mq->last_tail & mask]);
270 		mq->last_tail++;
271 	}
272 }
273 
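/*
 * Append 'tail' after 'cur_tail' and fold its segment count and length into
 * 'head'; used by both receive paths below. Returns -EOVERFLOW if the chain
 * would exceed RTE_MBUF_MAX_NB_SEGS.
 */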
274 static int
275 memif_pktmbuf_chain(struct rte_mbuf *head, struct rte_mbuf *cur_tail,
276 		    struct rte_mbuf *tail)
277 {
278 	/* Check for number-of-segments-overflow */
279 	if (unlikely(head->nb_segs + tail->nb_segs > RTE_MBUF_MAX_NB_SEGS))
280 		return -EOVERFLOW;
281 
282 	/* Chain 'tail' onto the old tail */
283 	cur_tail->next = tail;
284 
285 	/* accumulate number of segments and total length. */
286 	head->nb_segs = (uint16_t)(head->nb_segs + tail->nb_segs);
287 
288 	tail->pkt_len = tail->data_len;
289 	head->pkt_len += tail->pkt_len;
290 
291 	return 0;
292 }
293 
294 static uint16_t
295 eth_memif_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
296 {
297 	struct memif_queue *mq = queue;
298 	struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
299 	struct pmd_process_private *proc_private =
300 		rte_eth_devices[mq->in_port].process_private;
301 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
302 	uint16_t cur_slot, last_slot, n_slots, ring_size, mask, s0;
303 	uint16_t n_rx_pkts = 0;
304 	uint16_t mbuf_size = rte_pktmbuf_data_room_size(mq->mempool) -
305 		RTE_PKTMBUF_HEADROOM;
306 	uint16_t src_len, src_off, dst_len, dst_off, cp_len;
307 	memif_ring_type_t type = mq->type;
308 	memif_desc_t *d0;
309 	struct rte_mbuf *mbuf, *mbuf_head, *mbuf_tail;
310 	uint64_t b;
311 	ssize_t size __rte_unused;
312 	uint16_t head;
313 	int ret;
314 	struct rte_eth_link link;
315 
316 	if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
317 		return 0;
318 	if (unlikely(ring == NULL)) {
319 		/* Secondary process will attempt to request regions. */
320 		ret = rte_eth_link_get(mq->in_port, &link);
321 		if (ret < 0)
322 			MIF_LOG(ERR, "Failed to get port %u link info: %s",
323 				mq->in_port, rte_strerror(-ret));
324 		return 0;
325 	}
326 
327 	/* consume interrupt */
328 	if (((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) &&
329 	    (rte_intr_fd_get(mq->intr_handle) >= 0))
330 		size = read(rte_intr_fd_get(mq->intr_handle), &b,
331 			    sizeof(b));
332 
333 	ring_size = 1 << mq->log2_ring_size;
334 	mask = ring_size - 1;
335 
336 	if (type == MEMIF_RING_C2S) {
337 		cur_slot = mq->last_head;
338 		last_slot = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
339 	} else {
340 		cur_slot = mq->last_tail;
341 		last_slot = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);
342 	}
343 
344 	if (cur_slot == last_slot)
345 		goto refill;
346 	n_slots = last_slot - cur_slot;
347 
348 	while (n_slots && n_rx_pkts < nb_pkts) {
349 		mbuf_head = rte_pktmbuf_alloc(mq->mempool);
350 		if (unlikely(mbuf_head == NULL))
351 			goto no_free_bufs;
352 		mbuf = mbuf_head;
353 		mbuf->port = mq->in_port;
354 
355 next_slot:
356 		s0 = cur_slot & mask;
357 		d0 = &ring->desc[s0];
358 
359 		src_len = d0->length;
360 		dst_off = 0;
361 		src_off = 0;
362 
363 		do {
364 			dst_len = mbuf_size - dst_off;
365 			if (dst_len == 0) {
366 				dst_off = 0;
367 				dst_len = mbuf_size;
368 
369 				/* store pointer to tail */
370 				mbuf_tail = mbuf;
371 				mbuf = rte_pktmbuf_alloc(mq->mempool);
372 				if (unlikely(mbuf == NULL))
373 					goto no_free_bufs;
374 				mbuf->port = mq->in_port;
375 				ret = memif_pktmbuf_chain(mbuf_head, mbuf_tail, mbuf);
376 				if (unlikely(ret < 0)) {
377 					MIF_LOG(ERR, "number-of-segments-overflow");
378 					rte_pktmbuf_free(mbuf);
379 					goto no_free_bufs;
380 				}
381 			}
382 			cp_len = RTE_MIN(dst_len, src_len);
383 
384 			rte_pktmbuf_data_len(mbuf) += cp_len;
385 			rte_pktmbuf_pkt_len(mbuf) = rte_pktmbuf_data_len(mbuf);
386 			if (mbuf != mbuf_head)
387 				rte_pktmbuf_pkt_len(mbuf_head) += cp_len;
388 
389 			rte_memcpy(rte_pktmbuf_mtod_offset(mbuf, void *,
390 							   dst_off),
391 				(uint8_t *)memif_get_buffer(proc_private, d0) +
392 				src_off, cp_len);
393 
394 			src_off += cp_len;
395 			dst_off += cp_len;
396 			src_len -= cp_len;
397 		} while (src_len);
398 
399 		cur_slot++;
400 		n_slots--;
401 
402 		if (d0->flags & MEMIF_DESC_FLAG_NEXT)
403 			goto next_slot;
404 
405 		mq->n_bytes += rte_pktmbuf_pkt_len(mbuf_head);
406 		*bufs++ = mbuf_head;
407 		n_rx_pkts++;
408 	}
409 
410 no_free_bufs:
411 	if (type == MEMIF_RING_C2S) {
412 		__atomic_store_n(&ring->tail, cur_slot, __ATOMIC_RELEASE);
413 		mq->last_head = cur_slot;
414 	} else {
415 		mq->last_tail = cur_slot;
416 	}
417 
418 refill:
419 	if (type == MEMIF_RING_S2C) {
420 		/* ring->head is updated by the receiver and this function
421 		 * is called in the context of the receiver thread. The loads in
422 		 * the receiver do not need to synchronize with its own stores.
423 		 */
424 		head = __atomic_load_n(&ring->head, __ATOMIC_RELAXED);
425 		n_slots = ring_size - head + mq->last_tail;
426 
427 		while (n_slots--) {
428 			s0 = head++ & mask;
429 			d0 = &ring->desc[s0];
430 			d0->length = pmd->run.pkt_buffer_size;
431 		}
432 		__atomic_store_n(&ring->head, head, __ATOMIC_RELEASE);
433 	}
434 
435 	mq->n_pkts += n_rx_pkts;
436 	return n_rx_pkts;
437 }
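/*
 * Ring bookkeeping in the copy RX path above, in short:
 *   C2S: the peer advances ring->head as it enqueues; slots in
 *        [mq->last_head, head) are consumed here and handed back to the peer
 *        by store-releasing ring->tail.
 *   S2C: slots in [mq->last_tail, tail) are consumed, then the refill loop
 *        re-advertises ring_size - head + last_tail empty descriptors of
 *        pkt_buffer_size bytes by advancing ring->head.
 */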
438 
439 static uint16_t
440 eth_memif_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
441 {
442 	struct memif_queue *mq = queue;
443 	struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
444 	struct pmd_process_private *proc_private =
445 		rte_eth_devices[mq->in_port].process_private;
446 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
447 	uint16_t cur_slot, last_slot, n_slots, ring_size, mask, s0, head;
448 	uint16_t n_rx_pkts = 0;
449 	memif_desc_t *d0;
450 	struct rte_mbuf *mbuf, *mbuf_tail;
451 	struct rte_mbuf *mbuf_head = NULL;
452 	int ret;
453 	struct rte_eth_link link;
454 
455 	if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
456 		return 0;
457 	if (unlikely(ring == NULL)) {
458 		/* Secondary process will attempt to request regions. */
459 		rte_eth_link_get(mq->in_port, &link);
460 		return 0;
461 	}
462 
463 	/* consume interrupt */
464 	if ((rte_intr_fd_get(mq->intr_handle) >= 0) &&
465 	    ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0)) {
466 		uint64_t b;
467 		ssize_t size __rte_unused;
468 		size = read(rte_intr_fd_get(mq->intr_handle), &b,
469 			    sizeof(b));
470 	}
471 
472 	ring_size = 1 << mq->log2_ring_size;
473 	mask = ring_size - 1;
474 
475 	cur_slot = mq->last_tail;
476 	/* The ring->tail acts as a guard variable between Tx and Rx
477 	 * threads, so the load-acquire below pairs with the corresponding
478 	 * store-release to synchronize it between threads.
479 	 */
480 	last_slot = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);
481 	if (cur_slot == last_slot)
482 		goto refill;
483 	n_slots = last_slot - cur_slot;
484 
485 	while (n_slots && n_rx_pkts < nb_pkts) {
486 		s0 = cur_slot & mask;
487 
488 		d0 = &ring->desc[s0];
489 		mbuf_head = mq->buffers[s0];
490 		mbuf = mbuf_head;
491 
492 next_slot:
493 		/* prefetch next descriptor */
494 		if (n_rx_pkts + 1 < nb_pkts)
495 			rte_prefetch0(&ring->desc[(cur_slot + 1) & mask]);
496 
497 		mbuf->port = mq->in_port;
498 		rte_pktmbuf_data_len(mbuf) = d0->length;
499 		rte_pktmbuf_pkt_len(mbuf) = rte_pktmbuf_data_len(mbuf);
500 
501 		mq->n_bytes += rte_pktmbuf_data_len(mbuf);
502 
503 		cur_slot++;
504 		n_slots--;
505 		if (d0->flags & MEMIF_DESC_FLAG_NEXT) {
506 			s0 = cur_slot & mask;
507 			d0 = &ring->desc[s0];
508 			mbuf_tail = mbuf;
509 			mbuf = mq->buffers[s0];
510 			ret = memif_pktmbuf_chain(mbuf_head, mbuf_tail, mbuf);
511 			if (unlikely(ret < 0)) {
512 				MIF_LOG(ERR, "number-of-segments-overflow");
513 				goto refill;
514 			}
515 			goto next_slot;
516 		}
517 
518 		*bufs++ = mbuf_head;
519 		n_rx_pkts++;
520 	}
521 
522 	mq->last_tail = cur_slot;
523 
524 /* Supply server with new buffers */
525 refill:
526 	/* ring->head is updated by the receiver and this function
527 	 * is called in the context of the receiver thread. The loads in
528 	 * the receiver do not need to synchronize with its own stores.
529 	 */
530 	head = __atomic_load_n(&ring->head, __ATOMIC_RELAXED);
531 	n_slots = ring_size - head + mq->last_tail;
532 
533 	if (n_slots < 32)
534 		goto no_free_mbufs;
535 
536 	ret = rte_pktmbuf_alloc_bulk(mq->mempool, &mq->buffers[head & mask], n_slots);
537 	if (unlikely(ret < 0))
538 		goto no_free_mbufs;
539 
540 	while (n_slots--) {
541 		s0 = head++ & mask;
542 		if (n_slots > 0)
543 			rte_prefetch0(mq->buffers[head & mask]);
544 		d0 = &ring->desc[s0];
545 		/* get the mbuf (buffer header) for this slot */
546 		mbuf = mq->buffers[s0];
547 		/* populate descriptor */
548 		d0->length = rte_pktmbuf_data_room_size(mq->mempool) -
549 				RTE_PKTMBUF_HEADROOM;
550 		d0->region = 1;
551 		d0->offset = rte_pktmbuf_mtod(mbuf, uint8_t *) -
552 			(uint8_t *)proc_private->regions[d0->region]->addr;
553 	}
554 no_free_mbufs:
555 	/* The ring->head acts as a guard variable between Tx and Rx
556 	 * threads, so the store-release below pairs with the load-acquire
557 	 * in eth_memif_tx.
558 	 */
559 	__atomic_store_n(&ring->head, head, __ATOMIC_RELEASE);
560 
561 	mq->n_pkts += n_rx_pkts;
562 
563 	return n_rx_pkts;
564 }
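/*
 * Notes on the zero-copy RX path above: the refilled descriptors point
 * directly into mbuf data rooms (d0->region = 1, i.e. the first DPDK-memory
 * region after descriptor region 0), received mbufs are handed to the
 * application without a copy, and the refill is skipped when fewer than 32
 * slots are free, presumably to keep the bulk allocation worthwhile.
 */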
565 
566 static uint16_t
567 eth_memif_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
568 {
569 	struct memif_queue *mq = queue;
570 	struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
571 	struct pmd_process_private *proc_private =
572 		rte_eth_devices[mq->in_port].process_private;
573 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
574 	uint16_t slot, saved_slot, n_free, ring_size, mask, n_tx_pkts = 0;
575 	uint16_t src_len, src_off, dst_len, dst_off, cp_len, nb_segs;
576 	memif_ring_type_t type = mq->type;
577 	memif_desc_t *d0;
578 	struct rte_mbuf *mbuf;
579 	struct rte_mbuf *mbuf_head;
580 	uint64_t a;
581 	ssize_t size;
582 	struct rte_eth_link link;
583 
584 	if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
585 		return 0;
586 	if (unlikely(ring == NULL)) {
587 		int ret;
588 
589 		/* Secondary process will attempt to request regions. */
590 		ret = rte_eth_link_get(mq->in_port, &link);
591 		if (ret < 0)
592 			MIF_LOG(ERR, "Failed to get port %u link info: %s",
593 				mq->in_port, rte_strerror(-ret));
594 		return 0;
595 	}
596 
597 	ring_size = 1 << mq->log2_ring_size;
598 	mask = ring_size - 1;
599 
600 	if (type == MEMIF_RING_C2S) {
601 		/* For C2S queues ring->head is updated by the sender and
602 		 * this function is called in the context of the sending thread.
603 		 * The loads in the sender do not need to synchronize with
604 		 * its own stores. Hence, the following load can be a
605 		 * relaxed load.
606 		 */
607 		slot = __atomic_load_n(&ring->head, __ATOMIC_RELAXED);
608 		n_free = ring_size - slot +
609 				__atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);
610 	} else {
611 		/* For S2C queues ring->tail is updated by the sender and
612 		 * this function is called in the context of the sending thread.
613 		 * The loads in the sender do not need to synchronize with
614 		 * its own stores. Hence, the following load can be a
615 		 * relaxed load.
616 		 */
617 		slot = __atomic_load_n(&ring->tail, __ATOMIC_RELAXED);
618 		n_free = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE) - slot;
619 	}
620 
621 	while (n_tx_pkts < nb_pkts && n_free) {
622 		mbuf_head = *bufs++;
623 		nb_segs = mbuf_head->nb_segs;
624 		mbuf = mbuf_head;
625 
626 		saved_slot = slot;
627 		d0 = &ring->desc[slot & mask];
628 		dst_off = 0;
629 		dst_len = (type == MEMIF_RING_C2S) ?
630 			pmd->run.pkt_buffer_size : d0->length;
631 
632 next_in_chain:
633 		src_off = 0;
634 		src_len = rte_pktmbuf_data_len(mbuf);
635 
636 		while (src_len) {
637 			if (dst_len == 0) {
638 				if (n_free) {
639 					slot++;
640 					n_free--;
641 					d0->flags |= MEMIF_DESC_FLAG_NEXT;
642 					d0 = &ring->desc[slot & mask];
643 					dst_off = 0;
644 					dst_len = (type == MEMIF_RING_C2S) ?
645 					    pmd->run.pkt_buffer_size : d0->length;
646 					d0->flags = 0;
647 				} else {
648 					slot = saved_slot;
649 					goto no_free_slots;
650 				}
651 			}
652 			cp_len = RTE_MIN(dst_len, src_len);
653 
654 			rte_memcpy((uint8_t *)memif_get_buffer(proc_private,
655 							       d0) + dst_off,
656 				rte_pktmbuf_mtod_offset(mbuf, void *, src_off),
657 				cp_len);
658 
659 			mq->n_bytes += cp_len;
660 			src_off += cp_len;
661 			dst_off += cp_len;
662 			src_len -= cp_len;
663 			dst_len -= cp_len;
664 
665 			d0->length = dst_off;
666 		}
667 
668 		if (--nb_segs > 0) {
669 			mbuf = mbuf->next;
670 			goto next_in_chain;
671 		}
672 
673 		n_tx_pkts++;
674 		slot++;
675 		n_free--;
676 		rte_pktmbuf_free(mbuf_head);
677 	}
678 
679 no_free_slots:
680 	if (type == MEMIF_RING_C2S)
681 		__atomic_store_n(&ring->head, slot, __ATOMIC_RELEASE);
682 	else
683 		__atomic_store_n(&ring->tail, slot, __ATOMIC_RELEASE);
684 
685 	if (((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) &&
686 	    (rte_intr_fd_get(mq->intr_handle) >= 0)) {
687 		a = 1;
688 		size = write(rte_intr_fd_get(mq->intr_handle), &a,
689 			     sizeof(a));
690 		if (unlikely(size < 0)) {
691 			MIF_LOG(WARNING,
692 				"Failed to send interrupt. %s", strerror(errno));
693 		}
694 	}
695 
696 	mq->n_pkts += n_tx_pkts;
697 	return n_tx_pkts;
698 }
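/*
 * Copy TX path above, in short: packets are copied segment by segment into
 * the buffers referenced by the ring descriptors. For C2S rings the sender
 * owns ring->head and may fill slots up to the acquire-loaded ring->tail;
 * for S2C rings it owns ring->tail and may fill slots up to ring->head. An
 * eventfd write then signals the peer unless interrupts are masked via
 * MEMIF_RING_FLAG_MASK_INT.
 */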
699 
700 
701 static int
702 memif_tx_one_zc(struct pmd_process_private *proc_private, struct memif_queue *mq,
703 		memif_ring_t *ring, struct rte_mbuf *mbuf, const uint16_t mask,
704 		uint16_t slot, uint16_t n_free)
705 {
706 	memif_desc_t *d0;
707 	uint16_t nb_segs = mbuf->nb_segs;
708 	int used_slots = 1;
709 
710 next_in_chain:
711 	/* store pointer to mbuf to free it later */
712 	mq->buffers[slot & mask] = mbuf;
713 	/* Increment refcnt to make sure the buffer is not freed before server
714 	 * receives it. (current segment)
715 	 */
716 	rte_mbuf_refcnt_update(mbuf, 1);
717 	/* populate descriptor */
718 	d0 = &ring->desc[slot & mask];
719 	d0->length = rte_pktmbuf_data_len(mbuf);
720 	mq->n_bytes += rte_pktmbuf_data_len(mbuf);
721 	/* FIXME: get region index */
722 	d0->region = 1;
723 	d0->offset = rte_pktmbuf_mtod(mbuf, uint8_t *) -
724 		(uint8_t *)proc_private->regions[d0->region]->addr;
725 	d0->flags = 0;
726 
727 	/* check if buffer is chained */
728 	if (--nb_segs > 0) {
729 		if (n_free < 2)
730 			return 0;
731 		/* mark buffer as chained */
732 		d0->flags |= MEMIF_DESC_FLAG_NEXT;
733 		/* advance mbuf */
734 		mbuf = mbuf->next;
735 		/* update counters */
736 		used_slots++;
737 		slot++;
738 		n_free--;
739 		goto next_in_chain;
740 	}
741 	return used_slots;
742 }
743 
744 static uint16_t
745 eth_memif_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
746 {
747 	struct memif_queue *mq = queue;
748 	struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
749 	struct pmd_process_private *proc_private =
750 		rte_eth_devices[mq->in_port].process_private;
751 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
752 	uint16_t slot, n_free, ring_size, mask, n_tx_pkts = 0;
753 	struct rte_eth_link link;
754 
755 	if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
756 		return 0;
757 	if (unlikely(ring == NULL)) {
758 		/* Secondary process will attempt to request regions. */
759 		rte_eth_link_get(mq->in_port, &link);
760 		return 0;
761 	}
762 
763 	ring_size = 1 << mq->log2_ring_size;
764 	mask = ring_size - 1;
765 
766 	/* free mbufs received by server */
767 	memif_free_stored_mbufs(proc_private, mq);
768 
769 	/* ring type always MEMIF_RING_C2S */
770 	/* For C2S queues ring->head is updated by the sender and
771 	 * this function is called in the context of sending thread.
772 	 * this function is called in the context of the sending thread.
773 	 * its own stores. Hence, the following load can be a
774 	 * relaxed load.
775 	 */
776 	slot = __atomic_load_n(&ring->head, __ATOMIC_RELAXED);
777 	n_free = ring_size - slot + mq->last_tail;
778 
779 	int used_slots;
780 
781 	while (n_free && (n_tx_pkts < nb_pkts)) {
782 		while ((n_free > 4) && ((nb_pkts - n_tx_pkts) > 4)) {
783 			if ((nb_pkts - n_tx_pkts) > 8) {
784 				rte_prefetch0(*bufs + 4);
785 				rte_prefetch0(*bufs + 5);
786 				rte_prefetch0(*bufs + 6);
787 				rte_prefetch0(*bufs + 7);
788 			}
789 			used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
790 				mask, slot, n_free);
791 			if (unlikely(used_slots < 1))
792 				goto no_free_slots;
793 			n_tx_pkts++;
794 			slot += used_slots;
795 			n_free -= used_slots;
796 
797 			used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
798 				mask, slot, n_free);
799 			if (unlikely(used_slots < 1))
800 				goto no_free_slots;
801 			n_tx_pkts++;
802 			slot += used_slots;
803 			n_free -= used_slots;
804 
805 			used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
806 				mask, slot, n_free);
807 			if (unlikely(used_slots < 1))
808 				goto no_free_slots;
809 			n_tx_pkts++;
810 			slot += used_slots;
811 			n_free -= used_slots;
812 
813 			used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
814 				mask, slot, n_free);
815 			if (unlikely(used_slots < 1))
816 				goto no_free_slots;
817 			n_tx_pkts++;
818 			slot += used_slots;
819 			n_free -= used_slots;
820 		}
821 		used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
822 			mask, slot, n_free);
823 		if (unlikely(used_slots < 1))
824 			goto no_free_slots;
825 		n_tx_pkts++;
826 		slot += used_slots;
827 		n_free -= used_slots;
828 	}
829 
830 no_free_slots:
831 	/* ring type always MEMIF_RING_C2S */
832 	/* The ring->head acts as a guard variable between Tx and Rx
833 	 * threads, so the store-release below pairs with the load-acquire
834 	 * in eth_memif_rx for C2S rings.
835 	 */
836 	__atomic_store_n(&ring->head, slot, __ATOMIC_RELEASE);
837 
838 	/* Send interrupt, if enabled. */
839 	if ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) {
840 		uint64_t a = 1;
841 		if (rte_intr_fd_get(mq->intr_handle) < 0)
842 			return -1;
843 
844 		ssize_t size = write(rte_intr_fd_get(mq->intr_handle),
845 				     &a, sizeof(a));
846 		if (unlikely(size < 0)) {
847 			MIF_LOG(WARNING,
848 				"Failed to send interrupt. %s", strerror(errno));
849 		}
850 	}
851 
852 	/* increment queue counters */
853 	mq->n_pkts += n_tx_pkts;
854 
855 	return n_tx_pkts;
856 }
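/*
 * Zero-copy TX above never copies payload: each segment's mbuf is pinned by
 * bumping its refcount in memif_tx_one_zc() and remembered in mq->buffers[],
 * then released in memif_free_stored_mbufs() once the peer moves ring->tail
 * past it. The main loop is unrolled four packets at a time as a throughput
 * optimization.
 */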
857 
858 void
859 memif_free_regions(struct rte_eth_dev *dev)
860 {
861 	struct pmd_process_private *proc_private = dev->process_private;
862 	struct pmd_internals *pmd = dev->data->dev_private;
863 	int i;
864 	struct memif_region *r;
865 
866 	/* regions are allocated contiguously, so it's
867 	 * enough to loop until 'proc_private->regions_num'
868 	 */
869 	for (i = 0; i < proc_private->regions_num; i++) {
870 		r = proc_private->regions[i];
871 		if (r != NULL) {
872 			/* Zero-copy region: addr points to DPDK memseg memory, do not munmap it */
873 			if (i > 0 && (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY)) {
874 				r->addr = NULL;
875 				if (r->fd > 0)
876 					close(r->fd);
877 			}
878 			if (r->addr != NULL) {
879 				munmap(r->addr, r->region_size);
880 				if (r->fd > 0) {
881 					close(r->fd);
882 					r->fd = -1;
883 				}
884 			}
885 			rte_free(r);
886 			proc_private->regions[i] = NULL;
887 		}
888 	}
889 	proc_private->regions_num = 0;
890 }
891 
892 static int
893 memif_region_init_zc(const struct rte_memseg_list *msl, const struct rte_memseg *ms,
894 		     void *arg)
895 {
896 	struct pmd_process_private *proc_private = (struct pmd_process_private *)arg;
897 	struct memif_region *r;
898 
899 	if (proc_private->regions_num < 1) {
900 		MIF_LOG(ERR, "Missing descriptor region");
901 		return -1;
902 	}
903 
904 	r = proc_private->regions[proc_private->regions_num - 1];
905 
906 	if (r->addr != msl->base_va)
907 		r = proc_private->regions[++proc_private->regions_num - 1];
908 
909 	if (r == NULL) {
910 		r = rte_zmalloc("region", sizeof(struct memif_region), 0);
911 		if (r == NULL) {
912 			MIF_LOG(ERR, "Failed to alloc memif region.");
913 			return -ENOMEM;
914 		}
915 
916 		r->addr = msl->base_va;
917 		r->region_size = ms->len;
918 		r->fd = rte_memseg_get_fd(ms);
919 		if (r->fd < 0)
920 			return -1;
921 		r->pkt_buffer_offset = 0;
922 
923 		proc_private->regions[proc_private->regions_num - 1] = r;
924 	} else {
925 		r->region_size += ms->len;
926 	}
927 
928 	return 0;
929 }
930 
931 static int
932 memif_region_init_shm(struct rte_eth_dev *dev, uint8_t has_buffers)
933 {
934 	struct pmd_internals *pmd = dev->data->dev_private;
935 	struct pmd_process_private *proc_private = dev->process_private;
936 	char shm_name[ETH_MEMIF_SHM_NAME_SIZE];
937 	int ret = 0;
938 	struct memif_region *r;
939 
940 	if (proc_private->regions_num >= ETH_MEMIF_MAX_REGION_NUM) {
941 		MIF_LOG(ERR, "Too many regions.");
942 		return -1;
943 	}
944 
945 	r = rte_zmalloc("region", sizeof(struct memif_region), 0);
946 	if (r == NULL) {
947 		MIF_LOG(ERR, "Failed to alloc memif region.");
948 		return -ENOMEM;
949 	}
950 
951 	/* calculate buffer offset */
952 	r->pkt_buffer_offset = (pmd->run.num_c2s_rings + pmd->run.num_s2c_rings) *
953 	    (sizeof(memif_ring_t) + sizeof(memif_desc_t) *
954 	    (1 << pmd->run.log2_ring_size));
955 
956 	r->region_size = r->pkt_buffer_offset;
957 	/* if region has buffers, add buffers size to region_size */
958 	if (has_buffers == 1)
959 		r->region_size += (uint32_t)(pmd->run.pkt_buffer_size *
960 			(1 << pmd->run.log2_ring_size) *
961 			(pmd->run.num_c2s_rings +
962 			 pmd->run.num_s2c_rings));
963 
964 	memset(shm_name, 0, sizeof(char) * ETH_MEMIF_SHM_NAME_SIZE);
965 	snprintf(shm_name, ETH_MEMIF_SHM_NAME_SIZE, "memif_region_%d",
966 		 proc_private->regions_num);
967 
968 	r->fd = memfd_create(shm_name, MFD_ALLOW_SEALING);
969 	if (r->fd < 0) {
970 		MIF_LOG(ERR, "Failed to create shm file: %s.", strerror(errno));
971 		ret = -1;
972 		goto error;
973 	}
974 
975 	ret = fcntl(r->fd, F_ADD_SEALS, F_SEAL_SHRINK);
976 	if (ret < 0) {
977 		MIF_LOG(ERR, "Failed to add seals to shm file: %s.", strerror(errno));
978 		goto error;
979 	}
980 
981 	ret = ftruncate(r->fd, r->region_size);
982 	if (ret < 0) {
983 		MIF_LOG(ERR, "Failed to truncate shm file: %s.", strerror(errno));
984 		goto error;
985 	}
986 
987 	r->addr = mmap(NULL, r->region_size, PROT_READ |
988 		       PROT_WRITE, MAP_SHARED, r->fd, 0);
989 	if (r->addr == MAP_FAILED) {
990 		MIF_LOG(ERR, "Failed to mmap shm region: %s.", strerror(errno));
991 		ret = -1;
992 		goto error;
993 	}
994 
995 	proc_private->regions[proc_private->regions_num] = r;
996 	proc_private->regions_num++;
997 
998 	return ret;
999 
1000 error:
1001 	if (r->fd > 0)
1002 		close(r->fd);
1003 	r->fd = -1;
1004 
1005 	return ret;
1006 }
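/*
 * Worked size example for the region created above, assuming the defaults
 * from rte_eth_memif.h (log2_ring_size = 10, pkt_buffer_size = 2048) and a
 * single ring pair: the ring area is
 *   2 * (sizeof(memif_ring_t) + 1024 * sizeof(memif_desc_t))
 * bytes (sizeof(memif_desc_t) is 16, see the build-time assert in
 * rte_pmd_memif_probe()), and the buffer area adds another
 *   2 * 1024 * 2048 bytes = 4 MiB.
 */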
1007 
1008 static int
1009 memif_regions_init(struct rte_eth_dev *dev)
1010 {
1011 	struct pmd_internals *pmd = dev->data->dev_private;
1012 	int ret;
1013 
1014 	/*
1015 	 * Zero-copy exposes DPDK memory.
1016 	 * Each memseg list is represented by a memif region.
1017 	 * Zero-copy regions are indexed as memseg list idx + 1,
1018 	 * since region 0 is already reserved for descriptors.
1019 	 */
1020 	if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
1021 		/* create region idx 0 containing descriptors */
1022 		ret = memif_region_init_shm(dev, 0);
1023 		if (ret < 0)
1024 			return ret;
1025 		ret = rte_memseg_walk(memif_region_init_zc, (void *)dev->process_private);
1026 		if (ret < 0)
1027 			return ret;
1028 	} else {
1029 		/* create one memory region containing rings and buffers */
1030 		ret = memif_region_init_shm(dev, /* has buffers */ 1);
1031 		if (ret < 0)
1032 			return ret;
1033 	}
1034 
1035 	return 0;
1036 }
1037 
1038 static void
1039 memif_init_rings(struct rte_eth_dev *dev)
1040 {
1041 	struct pmd_internals *pmd = dev->data->dev_private;
1042 	struct pmd_process_private *proc_private = dev->process_private;
1043 	memif_ring_t *ring;
1044 	int i, j;
1045 	uint16_t slot;
1046 
1047 	for (i = 0; i < pmd->run.num_c2s_rings; i++) {
1048 		ring = memif_get_ring(pmd, proc_private, MEMIF_RING_C2S, i);
1049 		__atomic_store_n(&ring->head, 0, __ATOMIC_RELAXED);
1050 		__atomic_store_n(&ring->tail, 0, __ATOMIC_RELAXED);
1051 		ring->cookie = MEMIF_COOKIE;
1052 		ring->flags = 0;
1053 
1054 		if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY)
1055 			continue;
1056 
1057 		for (j = 0; j < (1 << pmd->run.log2_ring_size); j++) {
1058 			slot = i * (1 << pmd->run.log2_ring_size) + j;
1059 			ring->desc[j].region = 0;
1060 			ring->desc[j].offset =
1061 				proc_private->regions[0]->pkt_buffer_offset +
1062 				(uint32_t)(slot * pmd->run.pkt_buffer_size);
1063 			ring->desc[j].length = pmd->run.pkt_buffer_size;
1064 		}
1065 	}
1066 
1067 	for (i = 0; i < pmd->run.num_s2c_rings; i++) {
1068 		ring = memif_get_ring(pmd, proc_private, MEMIF_RING_S2C, i);
1069 		__atomic_store_n(&ring->head, 0, __ATOMIC_RELAXED);
1070 		__atomic_store_n(&ring->tail, 0, __ATOMIC_RELAXED);
1071 		ring->cookie = MEMIF_COOKIE;
1072 		ring->flags = 0;
1073 
1074 		if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY)
1075 			continue;
1076 
1077 		for (j = 0; j < (1 << pmd->run.log2_ring_size); j++) {
1078 			slot = (i + pmd->run.num_c2s_rings) *
1079 			    (1 << pmd->run.log2_ring_size) + j;
1080 			ring->desc[j].region = 0;
1081 			ring->desc[j].offset =
1082 				proc_private->regions[0]->pkt_buffer_offset +
1083 				(uint32_t)(slot * pmd->run.pkt_buffer_size);
1084 			ring->desc[j].length = pmd->run.pkt_buffer_size;
1085 		}
1086 	}
1087 }
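/*
 * Worked example for the descriptor offsets set above: with one C2S and one
 * S2C ring of 1024 slots and 2048-byte buffers, C2S desc[j] points at
 * pkt_buffer_offset + j * 2048 while S2C desc[j] points at
 * pkt_buffer_offset + (1024 + j) * 2048, so every slot gets its own
 * non-overlapping buffer inside region 0.
 */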
1088 
1089 /* called only by client */
1090 static int
1091 memif_init_queues(struct rte_eth_dev *dev)
1092 {
1093 	struct pmd_internals *pmd = dev->data->dev_private;
1094 	struct memif_queue *mq;
1095 	int i;
1096 
1097 	for (i = 0; i < pmd->run.num_c2s_rings; i++) {
1098 		mq = dev->data->tx_queues[i];
1099 		mq->log2_ring_size = pmd->run.log2_ring_size;
1100 		/* queues located only in region 0 */
1101 		mq->region = 0;
1102 		mq->ring_offset = memif_get_ring_offset(dev, mq, MEMIF_RING_C2S, i);
1103 		mq->last_head = 0;
1104 		mq->last_tail = 0;
1105 		if (rte_intr_fd_set(mq->intr_handle, eventfd(0, EFD_NONBLOCK)))
1106 			return -rte_errno;
1107 
1108 		if (rte_intr_fd_get(mq->intr_handle) < 0) {
1109 			MIF_LOG(WARNING,
1110 				"Failed to create eventfd for tx queue %d: %s.", i,
1111 				strerror(errno));
1112 		}
1113 		mq->buffers = NULL;
1114 		if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
1115 			mq->buffers = rte_zmalloc("bufs", sizeof(struct rte_mbuf *) *
1116 						  (1 << mq->log2_ring_size), 0);
1117 			if (mq->buffers == NULL)
1118 				return -ENOMEM;
1119 		}
1120 	}
1121 
1122 	for (i = 0; i < pmd->run.num_s2c_rings; i++) {
1123 		mq = dev->data->rx_queues[i];
1124 		mq->log2_ring_size = pmd->run.log2_ring_size;
1125 		/* queues located only in region 0 */
1126 		mq->region = 0;
1127 		mq->ring_offset = memif_get_ring_offset(dev, mq, MEMIF_RING_S2C, i);
1128 		mq->last_head = 0;
1129 		mq->last_tail = 0;
1130 		if (rte_intr_fd_set(mq->intr_handle, eventfd(0, EFD_NONBLOCK)))
1131 			return -rte_errno;
1132 		if (rte_intr_fd_get(mq->intr_handle) < 0) {
1133 			MIF_LOG(WARNING,
1134 				"Failed to create eventfd for rx queue %d: %s.", i,
1135 				strerror(errno));
1136 		}
1137 		mq->buffers = NULL;
1138 		if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
1139 			mq->buffers = rte_zmalloc("bufs", sizeof(struct rte_mbuf *) *
1140 						  (1 << mq->log2_ring_size), 0);
1141 			if (mq->buffers == NULL)
1142 				return -ENOMEM;
1143 		}
1144 	}
1145 	return 0;
1146 }
1147 
1148 int
1149 memif_init_regions_and_queues(struct rte_eth_dev *dev)
1150 {
1151 	int ret;
1152 
1153 	ret = memif_regions_init(dev);
1154 	if (ret < 0)
1155 		return ret;
1156 
1157 	memif_init_rings(dev);
1158 
1159 	ret = memif_init_queues(dev);
1160 	if (ret < 0)
1161 		return ret;
1162 
1163 	return 0;
1164 }
1165 
1166 int
1167 memif_connect(struct rte_eth_dev *dev)
1168 {
1169 	struct pmd_internals *pmd = dev->data->dev_private;
1170 	struct pmd_process_private *proc_private = dev->process_private;
1171 	struct memif_region *mr;
1172 	struct memif_queue *mq;
1173 	memif_ring_t *ring;
1174 	int i;
1175 
1176 	for (i = 0; i < proc_private->regions_num; i++) {
1177 		mr = proc_private->regions[i];
1178 		if (mr != NULL) {
1179 			if (mr->addr == NULL) {
1180 				if (mr->fd < 0)
1181 					return -1;
1182 				mr->addr = mmap(NULL, mr->region_size,
1183 						PROT_READ | PROT_WRITE,
1184 						MAP_SHARED, mr->fd, 0);
1185 				if (mr->addr == MAP_FAILED) {
1186 					MIF_LOG(ERR, "mmap failed: %s",
1187 						strerror(errno));
1188 					return -1;
1189 				}
1190 			}
1191 			if (i > 0 && (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY)) {
1192 				/* close memseg file */
1193 				close(mr->fd);
1194 				mr->fd = -1;
1195 			}
1196 		}
1197 	}
1198 
1199 	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
1200 		for (i = 0; i < pmd->run.num_c2s_rings; i++) {
1201 			mq = (pmd->role == MEMIF_ROLE_CLIENT) ?
1202 			    dev->data->tx_queues[i] : dev->data->rx_queues[i];
1203 			ring = memif_get_ring_from_queue(proc_private, mq);
1204 			if (ring == NULL || ring->cookie != MEMIF_COOKIE) {
1205 				MIF_LOG(ERR, "Wrong ring");
1206 				return -1;
1207 			}
1208 			__atomic_store_n(&ring->head, 0, __ATOMIC_RELAXED);
1209 			__atomic_store_n(&ring->tail, 0, __ATOMIC_RELAXED);
1210 			mq->last_head = 0;
1211 			mq->last_tail = 0;
1212 			/* enable polling mode */
1213 			if (pmd->role == MEMIF_ROLE_SERVER)
1214 				ring->flags = MEMIF_RING_FLAG_MASK_INT;
1215 		}
1216 		for (i = 0; i < pmd->run.num_s2c_rings; i++) {
1217 			mq = (pmd->role == MEMIF_ROLE_CLIENT) ?
1218 			    dev->data->rx_queues[i] : dev->data->tx_queues[i];
1219 			ring = memif_get_ring_from_queue(proc_private, mq);
1220 			if (ring == NULL || ring->cookie != MEMIF_COOKIE) {
1221 				MIF_LOG(ERR, "Wrong ring");
1222 				return -1;
1223 			}
1224 			__atomic_store_n(&ring->head, 0, __ATOMIC_RELAXED);
1225 			__atomic_store_n(&ring->tail, 0, __ATOMIC_RELAXED);
1226 			mq->last_head = 0;
1227 			mq->last_tail = 0;
1228 			/* enable polling mode */
1229 			if (pmd->role == MEMIF_ROLE_CLIENT)
1230 				ring->flags = MEMIF_RING_FLAG_MASK_INT;
1231 		}
1232 
1233 		pmd->flags &= ~ETH_MEMIF_FLAG_CONNECTING;
1234 		pmd->flags |= ETH_MEMIF_FLAG_CONNECTED;
1235 		dev->data->dev_link.link_status = RTE_ETH_LINK_UP;
1236 	}
1237 	MIF_LOG(INFO, "Connected.");
1238 	return 0;
1239 }
1240 
1241 static int
1242 memif_dev_start(struct rte_eth_dev *dev)
1243 {
1244 	struct pmd_internals *pmd = dev->data->dev_private;
1245 	int ret = 0;
1246 
1247 	switch (pmd->role) {
1248 	case MEMIF_ROLE_CLIENT:
1249 		ret = memif_connect_client(dev);
1250 		break;
1251 	case MEMIF_ROLE_SERVER:
1252 		ret = memif_connect_server(dev);
1253 		break;
1254 	default:
1255 		MIF_LOG(ERR, "Unknown role: %d.", pmd->role);
1256 		ret = -1;
1257 		break;
1258 	}
1259 
1260 	return ret;
1261 }
1262 
1263 static int
1264 memif_dev_stop(struct rte_eth_dev *dev)
1265 {
1266 	memif_disconnect(dev);
1267 	return 0;
1268 }
1269 
1270 static int
1271 memif_dev_close(struct rte_eth_dev *dev)
1272 {
1273 	struct pmd_internals *pmd = dev->data->dev_private;
1274 	int i;
1275 
1276 	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
1277 		memif_msg_enq_disconnect(pmd->cc, "Device closed", 0);
1278 
1279 		for (i = 0; i < dev->data->nb_rx_queues; i++)
1280 			(*dev->dev_ops->rx_queue_release)(dev, i);
1281 		for (i = 0; i < dev->data->nb_tx_queues; i++)
1282 			(*dev->dev_ops->tx_queue_release)(dev, i);
1283 
1284 		memif_socket_remove_device(dev);
1285 	}
1286 
1287 	rte_free(dev->process_private);
1288 
1289 	return 0;
1290 }
1291 
1292 static int
1293 memif_dev_configure(struct rte_eth_dev *dev)
1294 {
1295 	struct pmd_internals *pmd = dev->data->dev_private;
1296 
1297 	/*
1298 	 * CLIENT - TXQ
1299 	 * SERVER - RXQ
1300 	 */
1301 	pmd->cfg.num_c2s_rings = (pmd->role == MEMIF_ROLE_CLIENT) ?
1302 				  dev->data->nb_tx_queues : dev->data->nb_rx_queues;
1303 
1304 	/*
1305 	 * CLIENT - RXQ
1306 	 * SERVER - TXQ
1307 	 */
1308 	pmd->cfg.num_s2c_rings = (pmd->role == MEMIF_ROLE_CLIENT) ?
1309 				  dev->data->nb_rx_queues : dev->data->nb_tx_queues;
1310 
1311 	return 0;
1312 }
1313 
1314 static int
1315 memif_tx_queue_setup(struct rte_eth_dev *dev,
1316 		     uint16_t qid,
1317 		     uint16_t nb_tx_desc __rte_unused,
1318 		     unsigned int socket_id __rte_unused,
1319 		     const struct rte_eth_txconf *tx_conf __rte_unused)
1320 {
1321 	struct pmd_internals *pmd = dev->data->dev_private;
1322 	struct memif_queue *mq;
1323 
1324 	mq = rte_zmalloc("tx-queue", sizeof(struct memif_queue), 0);
1325 	if (mq == NULL) {
1326 		MIF_LOG(ERR, "Failed to allocate tx queue id: %u", qid);
1327 		return -ENOMEM;
1328 	}
1329 
1330 	/* Allocate interrupt instance */
1331 	mq->intr_handle = rte_intr_instance_alloc(RTE_INTR_INSTANCE_F_SHARED);
1332 	if (mq->intr_handle == NULL) {
1333 		MIF_LOG(ERR, "Failed to allocate intr handle");
1334 		return -ENOMEM;
1335 	}
1336 
1337 	mq->type =
1338 	    (pmd->role == MEMIF_ROLE_CLIENT) ? MEMIF_RING_C2S : MEMIF_RING_S2C;
1339 	mq->n_pkts = 0;
1340 	mq->n_bytes = 0;
1341 
1342 	if (rte_intr_fd_set(mq->intr_handle, -1))
1343 		return -rte_errno;
1344 
1345 	if (rte_intr_type_set(mq->intr_handle, RTE_INTR_HANDLE_EXT))
1346 		return -rte_errno;
1347 
1348 	mq->in_port = dev->data->port_id;
1349 	dev->data->tx_queues[qid] = mq;
1350 
1351 	return 0;
1352 }
1353 
1354 static int
1355 memif_rx_queue_setup(struct rte_eth_dev *dev,
1356 		     uint16_t qid,
1357 		     uint16_t nb_rx_desc __rte_unused,
1358 		     unsigned int socket_id __rte_unused,
1359 		     const struct rte_eth_rxconf *rx_conf __rte_unused,
1360 		     struct rte_mempool *mb_pool)
1361 {
1362 	struct pmd_internals *pmd = dev->data->dev_private;
1363 	struct memif_queue *mq;
1364 
1365 	mq = rte_zmalloc("rx-queue", sizeof(struct memif_queue), 0);
1366 	if (mq == NULL) {
1367 		MIF_LOG(ERR, "Failed to allocate rx queue id: %u", qid);
1368 		return -ENOMEM;
1369 	}
1370 
1371 	/* Allocate interrupt instance */
1372 	mq->intr_handle = rte_intr_instance_alloc(RTE_INTR_INSTANCE_F_SHARED);
1373 	if (mq->intr_handle == NULL) {
1374 		MIF_LOG(ERR, "Failed to allocate intr handle");
1375 		return -ENOMEM;
1376 	}
1377 
1378 	mq->type = (pmd->role == MEMIF_ROLE_CLIENT) ? MEMIF_RING_S2C : MEMIF_RING_C2S;
1379 	mq->n_pkts = 0;
1380 	mq->n_bytes = 0;
1381 
1382 	if (rte_intr_fd_set(mq->intr_handle, -1))
1383 		return -rte_errno;
1384 
1385 	if (rte_intr_type_set(mq->intr_handle, RTE_INTR_HANDLE_EXT))
1386 		return -rte_errno;
1387 
1388 	mq->mempool = mb_pool;
1389 	mq->in_port = dev->data->port_id;
1390 	dev->data->rx_queues[qid] = mq;
1391 
1392 	return 0;
1393 }
1394 
1395 static void
1396 memif_rx_queue_release(struct rte_eth_dev *dev, uint16_t qid)
1397 {
1398 	struct memif_queue *mq = dev->data->rx_queues[qid];
1399 
1400 	if (!mq)
1401 		return;
1402 
1403 	rte_intr_instance_free(mq->intr_handle);
1404 	rte_free(mq);
1405 }
1406 
1407 static void
1408 memif_tx_queue_release(struct rte_eth_dev *dev, uint16_t qid)
1409 {
1410 	struct memif_queue *mq = dev->data->tx_queues[qid];
1411 
1412 	if (!mq)
1413 		return;
1414 
1415 	rte_free(mq);
1416 }
1417 
1418 static int
1419 memif_link_update(struct rte_eth_dev *dev,
1420 		  int wait_to_complete __rte_unused)
1421 {
1422 	struct pmd_process_private *proc_private;
1423 
1424 	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
1425 		proc_private = dev->process_private;
1426 		if (dev->data->dev_link.link_status == RTE_ETH_LINK_UP &&
1427 				proc_private->regions_num == 0) {
1428 			memif_mp_request_regions(dev);
1429 		} else if (dev->data->dev_link.link_status == RTE_ETH_LINK_DOWN &&
1430 				proc_private->regions_num > 0) {
1431 			memif_free_regions(dev);
1432 		}
1433 	}
1434 	return 0;
1435 }
1436 
1437 static int
1438 memif_stats_get(struct rte_eth_dev *dev, struct rte_eth_stats *stats)
1439 {
1440 	struct pmd_internals *pmd = dev->data->dev_private;
1441 	struct memif_queue *mq;
1442 	int i;
1443 	uint8_t tmp, nq;
1444 
1445 	stats->ipackets = 0;
1446 	stats->ibytes = 0;
1447 	stats->opackets = 0;
1448 	stats->obytes = 0;
1449 
1450 	tmp = (pmd->role == MEMIF_ROLE_CLIENT) ? pmd->run.num_c2s_rings :
1451 	    pmd->run.num_s2c_rings;
1452 	nq = (tmp < RTE_ETHDEV_QUEUE_STAT_CNTRS) ? tmp :
1453 	    RTE_ETHDEV_QUEUE_STAT_CNTRS;
1454 
1455 	/* RX stats */
1456 	for (i = 0; i < nq; i++) {
1457 		mq = dev->data->rx_queues[i];
1458 		stats->q_ipackets[i] = mq->n_pkts;
1459 		stats->q_ibytes[i] = mq->n_bytes;
1460 		stats->ipackets += mq->n_pkts;
1461 		stats->ibytes += mq->n_bytes;
1462 	}
1463 
1464 	tmp = (pmd->role == MEMIF_ROLE_CLIENT) ? pmd->run.num_s2c_rings :
1465 	    pmd->run.num_c2s_rings;
1466 	nq = (tmp < RTE_ETHDEV_QUEUE_STAT_CNTRS) ? tmp :
1467 	    RTE_ETHDEV_QUEUE_STAT_CNTRS;
1468 
1469 	/* TX stats */
1470 	for (i = 0; i < nq; i++) {
1471 		mq = dev->data->tx_queues[i];
1472 		stats->q_opackets[i] = mq->n_pkts;
1473 		stats->q_obytes[i] = mq->n_bytes;
1474 		stats->opackets += mq->n_pkts;
1475 		stats->obytes += mq->n_bytes;
1476 	}
1477 	return 0;
1478 }
1479 
1480 static int
1481 memif_stats_reset(struct rte_eth_dev *dev)
1482 {
1483 	struct pmd_internals *pmd = dev->data->dev_private;
1484 	int i;
1485 	struct memif_queue *mq;
1486 
1487 	for (i = 0; i < pmd->run.num_c2s_rings; i++) {
1488 		mq = (pmd->role == MEMIF_ROLE_CLIENT) ? dev->data->tx_queues[i] :
1489 		    dev->data->rx_queues[i];
1490 		mq->n_pkts = 0;
1491 		mq->n_bytes = 0;
1492 	}
1493 	for (i = 0; i < pmd->run.num_s2c_rings; i++) {
1494 		mq = (pmd->role == MEMIF_ROLE_CLIENT) ? dev->data->rx_queues[i] :
1495 		    dev->data->tx_queues[i];
1496 		mq->n_pkts = 0;
1497 		mq->n_bytes = 0;
1498 	}
1499 
1500 	return 0;
1501 }
1502 
1503 static int
1504 memif_rx_queue_intr_enable(struct rte_eth_dev *dev __rte_unused,
1505 			   uint16_t qid __rte_unused)
1506 {
1507 	MIF_LOG(WARNING, "Interrupt mode not supported.");
1508 
1509 	return -1;
1510 }
1511 
1512 static int
1513 memif_rx_queue_intr_disable(struct rte_eth_dev *dev, uint16_t qid __rte_unused)
1514 {
1515 	struct pmd_internals *pmd __rte_unused = dev->data->dev_private;
1516 
1517 	return 0;
1518 }
1519 
1520 static const struct eth_dev_ops ops = {
1521 	.dev_start = memif_dev_start,
1522 	.dev_stop = memif_dev_stop,
1523 	.dev_close = memif_dev_close,
1524 	.dev_infos_get = memif_dev_info,
1525 	.dev_configure = memif_dev_configure,
1526 	.tx_queue_setup = memif_tx_queue_setup,
1527 	.rx_queue_setup = memif_rx_queue_setup,
1528 	.rx_queue_release = memif_rx_queue_release,
1529 	.tx_queue_release = memif_tx_queue_release,
1530 	.rx_queue_intr_enable = memif_rx_queue_intr_enable,
1531 	.rx_queue_intr_disable = memif_rx_queue_intr_disable,
1532 	.link_update = memif_link_update,
1533 	.stats_get = memif_stats_get,
1534 	.stats_reset = memif_stats_reset,
1535 };
1536 
1537 static int
1538 memif_create(struct rte_vdev_device *vdev, enum memif_role_t role,
1539 	     memif_interface_id_t id, uint32_t flags,
1540 	     const char *socket_filename,
1541 	     memif_log2_ring_size_t log2_ring_size,
1542 	     uint16_t pkt_buffer_size, const char *secret,
1543 	     struct rte_ether_addr *ether_addr)
1544 {
1545 	int ret = 0;
1546 	struct rte_eth_dev *eth_dev;
1547 	struct rte_eth_dev_data *data;
1548 	struct pmd_internals *pmd;
1549 	struct pmd_process_private *process_private;
1550 	const unsigned int numa_node = vdev->device.numa_node;
1551 	const char *name = rte_vdev_device_name(vdev);
1552 
1553 	eth_dev = rte_eth_vdev_allocate(vdev, sizeof(*pmd));
1554 	if (eth_dev == NULL) {
1555 		MIF_LOG(ERR, "%s: Unable to allocate device struct.", name);
1556 		return -1;
1557 	}
1558 
1559 	process_private = (struct pmd_process_private *)
1560 		rte_zmalloc(name, sizeof(struct pmd_process_private),
1561 			    RTE_CACHE_LINE_SIZE);
1562 
1563 	if (process_private == NULL) {
1564 		MIF_LOG(ERR, "Failed to alloc memory for process private");
1565 		return -1;
1566 	}
1567 	eth_dev->process_private = process_private;
1568 
1569 	pmd = eth_dev->data->dev_private;
1570 	memset(pmd, 0, sizeof(*pmd));
1571 
1572 	pmd->id = id;
1573 	pmd->flags = flags;
1574 	pmd->flags |= ETH_MEMIF_FLAG_DISABLED;
1575 	pmd->role = role;
1576 	/* Zero-copy flag is irrelevant to the server. */
1577 	if (pmd->role == MEMIF_ROLE_SERVER)
1578 		pmd->flags &= ~ETH_MEMIF_FLAG_ZERO_COPY;
1579 
1580 	ret = memif_socket_init(eth_dev, socket_filename);
1581 	if (ret < 0)
1582 		return ret;
1583 
1584 	memset(pmd->secret, 0, sizeof(char) * ETH_MEMIF_SECRET_SIZE);
1585 	if (secret != NULL)
1586 		strlcpy(pmd->secret, secret, sizeof(pmd->secret));
1587 
1588 	pmd->cfg.log2_ring_size = log2_ring_size;
1589 	/* set in .dev_configure() */
1590 	pmd->cfg.num_c2s_rings = 0;
1591 	pmd->cfg.num_s2c_rings = 0;
1592 
1593 	pmd->cfg.pkt_buffer_size = pkt_buffer_size;
1594 	rte_spinlock_init(&pmd->cc_lock);
1595 
1596 	data = eth_dev->data;
1597 	data->dev_private = pmd;
1598 	data->numa_node = numa_node;
1599 	data->dev_link = pmd_link;
1600 	data->mac_addrs = ether_addr;
1601 	data->promiscuous = 1;
1602 	data->dev_flags |= RTE_ETH_DEV_AUTOFILL_QUEUE_XSTATS;
1603 
1604 	eth_dev->dev_ops = &ops;
1605 	eth_dev->device = &vdev->device;
1606 	if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
1607 		eth_dev->rx_pkt_burst = eth_memif_rx_zc;
1608 		eth_dev->tx_pkt_burst = eth_memif_tx_zc;
1609 	} else {
1610 		eth_dev->rx_pkt_burst = eth_memif_rx;
1611 		eth_dev->tx_pkt_burst = eth_memif_tx;
1612 	}
1613 
1614 	rte_eth_dev_probing_finish(eth_dev);
1615 
1616 	return 0;
1617 }
1618 
1619 static int
1620 memif_set_role(const char *key __rte_unused, const char *value,
1621 	       void *extra_args)
1622 {
1623 	enum memif_role_t *role = (enum memif_role_t *)extra_args;
1624 
1625 	if (strstr(value, "server") != NULL) {
1626 		*role = MEMIF_ROLE_SERVER;
1627 	} else if (strstr(value, "client") != NULL) {
1628 		*role = MEMIF_ROLE_CLIENT;
1629 	} else if (strstr(value, "master") != NULL) {
1630 		MIF_LOG(NOTICE, "Role argument \"master\" is deprecated, use \"server\"");
1631 		*role = MEMIF_ROLE_SERVER;
1632 	} else if (strstr(value, "slave") != NULL) {
1633 		MIF_LOG(NOTICE, "Role argument \"slave\" is deprecated, use \"client\"");
1634 		*role = MEMIF_ROLE_CLIENT;
1635 	} else {
1636 		MIF_LOG(ERR, "Unknown role: %s.", value);
1637 		return -EINVAL;
1638 	}
1639 	return 0;
1640 }
1641 
1642 static int
1643 memif_set_zc(const char *key __rte_unused, const char *value, void *extra_args)
1644 {
1645 	uint32_t *flags = (uint32_t *)extra_args;
1646 
1647 	if (strstr(value, "yes") != NULL) {
1648 		if (!rte_mcfg_get_single_file_segments()) {
1649 			MIF_LOG(ERR, "Zero-copy doesn't support multi-file segments.");
1650 			return -ENOTSUP;
1651 		}
1652 		*flags |= ETH_MEMIF_FLAG_ZERO_COPY;
1653 	} else if (strstr(value, "no") != NULL) {
1654 		*flags &= ~ETH_MEMIF_FLAG_ZERO_COPY;
1655 	} else {
1656 		MIF_LOG(ERR, "Failed to parse zero-copy param: %s.", value);
1657 		return -EINVAL;
1658 	}
1659 	return 0;
1660 }
1661 
1662 static int
1663 memif_set_id(const char *key __rte_unused, const char *value, void *extra_args)
1664 {
1665 	memif_interface_id_t *id = (memif_interface_id_t *)extra_args;
1666 
1667 	/* even if parsing fails, 0 is a valid id */
1668 	*id = strtoul(value, NULL, 10);
1669 	return 0;
1670 }
1671 
1672 static int
1673 memif_set_bs(const char *key __rte_unused, const char *value, void *extra_args)
1674 {
1675 	unsigned long tmp;
1676 	uint16_t *pkt_buffer_size = (uint16_t *)extra_args;
1677 
1678 	tmp = strtoul(value, NULL, 10);
1679 	if (tmp == 0 || tmp > 0xFFFF) {
1680 		MIF_LOG(ERR, "Invalid buffer size: %s.", value);
1681 		return -EINVAL;
1682 	}
1683 	*pkt_buffer_size = tmp;
1684 	return 0;
1685 }
1686 
1687 static int
1688 memif_set_rs(const char *key __rte_unused, const char *value, void *extra_args)
1689 {
1690 	unsigned long tmp;
1691 	memif_log2_ring_size_t *log2_ring_size =
1692 	    (memif_log2_ring_size_t *)extra_args;
1693 
1694 	tmp = strtoul(value, NULL, 10);
1695 	if (tmp == 0 || tmp > ETH_MEMIF_MAX_LOG2_RING_SIZE) {
1696 		MIF_LOG(ERR, "Invalid ring size: %s (max %u).",
1697 			value, ETH_MEMIF_MAX_LOG2_RING_SIZE);
1698 		return -EINVAL;
1699 	}
1700 	*log2_ring_size = tmp;
1701 	return 0;
1702 }
1703 
1704 /* check if directory exists and if we have permission to read/write */
1705 static int
1706 memif_check_socket_filename(const char *filename)
1707 {
1708 	char *dir = NULL, *tmp;
1709 	uint32_t idx;
1710 	int ret = 0;
1711 
1712 	if (strlen(filename) >= MEMIF_SOCKET_UN_SIZE) {
1713 		MIF_LOG(ERR, "Unix socket address too long (max 108).");
1714 		return -1;
1715 	}
1716 
1717 	tmp = strrchr(filename, '/');
1718 	if (tmp != NULL) {
1719 		idx = tmp - filename;
1720 		dir = rte_zmalloc("memif_tmp", sizeof(char) * (idx + 1), 0);
1721 		if (dir == NULL) {
1722 			MIF_LOG(ERR, "Failed to allocate memory.");
1723 			return -1;
1724 		}
1725 		strlcpy(dir, filename, sizeof(char) * (idx + 1));
1726 	}
1727 
1728 	if (dir == NULL || (faccessat(-1, dir, F_OK | R_OK |
1729 					W_OK, AT_EACCESS) < 0)) {
1730 		MIF_LOG(ERR, "Invalid socket directory.");
1731 		ret = -EINVAL;
1732 	}
1733 
1734 	if (dir != NULL)
1735 		rte_free(dir);
1736 
1737 	return ret;
1738 }
1739 
1740 static int
1741 memif_set_socket_filename(const char *key __rte_unused, const char *value,
1742 			  void *extra_args)
1743 {
1744 	const char **socket_filename = (const char **)extra_args;
1745 
1746 	*socket_filename = value;
1747 	return 0;
1748 }
1749 
1750 static int
1751 memif_set_is_socket_abstract(const char *key __rte_unused, const char *value, void *extra_args)
1752 {
1753 	uint32_t *flags = (uint32_t *)extra_args;
1754 
1755 	if (strstr(value, "yes") != NULL) {
1756 		*flags |= ETH_MEMIF_FLAG_SOCKET_ABSTRACT;
1757 	} else if (strstr(value, "no") != NULL) {
1758 		*flags &= ~ETH_MEMIF_FLAG_SOCKET_ABSTRACT;
1759 	} else {
1760 		MIF_LOG(ERR, "Failed to parse socket-abstract param: %s.", value);
1761 		return -EINVAL;
1762 	}
1763 	return 0;
1764 }
1765 
1766 static int
1767 memif_set_mac(const char *key __rte_unused, const char *value, void *extra_args)
1768 {
1769 	struct rte_ether_addr *ether_addr = (struct rte_ether_addr *)extra_args;
1770 
1771 	if (rte_ether_unformat_addr(value, ether_addr) < 0)
1772 		MIF_LOG(WARNING, "Failed to parse mac '%s'.", value);
1773 	return 0;
1774 }
1775 
1776 static int
1777 memif_set_secret(const char *key __rte_unused, const char *value, void *extra_args)
1778 {
1779 	const char **secret = (const char **)extra_args;
1780 
1781 	*secret = value;
1782 	return 0;
1783 }
1784 
1785 static int
1786 rte_pmd_memif_probe(struct rte_vdev_device *vdev)
1787 {
1788 	RTE_BUILD_BUG_ON(sizeof(memif_msg_t) != 128);
1789 	RTE_BUILD_BUG_ON(sizeof(memif_desc_t) != 16);
1790 	int ret = 0;
1791 	struct rte_kvargs *kvlist;
1792 	const char *name = rte_vdev_device_name(vdev);
1793 	enum memif_role_t role = MEMIF_ROLE_CLIENT;
1794 	memif_interface_id_t id = 0;
1795 	uint16_t pkt_buffer_size = ETH_MEMIF_DEFAULT_PKT_BUFFER_SIZE;
1796 	memif_log2_ring_size_t log2_ring_size = ETH_MEMIF_DEFAULT_RING_SIZE;
1797 	const char *socket_filename = ETH_MEMIF_DEFAULT_SOCKET_FILENAME;
1798 	uint32_t flags = 0;
1799 	const char *secret = NULL;
1800 	struct rte_ether_addr *ether_addr = rte_zmalloc("",
1801 		sizeof(struct rte_ether_addr), 0);
1802 	struct rte_eth_dev *eth_dev;
1803 
1804 	rte_eth_random_addr(ether_addr->addr_bytes);
1805 
1806 	MIF_LOG(INFO, "Initialize MEMIF: %s.", name);
1807 
1808 	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
1809 		eth_dev = rte_eth_dev_attach_secondary(name);
1810 		if (!eth_dev) {
1811 			MIF_LOG(ERR, "Failed to probe %s", name);
1812 			return -1;
1813 		}
1814 
1815 		eth_dev->dev_ops = &ops;
1816 		eth_dev->device = &vdev->device;
1817 		eth_dev->rx_pkt_burst = eth_memif_rx;
1818 		eth_dev->tx_pkt_burst = eth_memif_tx;
1819 
1820 		if (!rte_eal_primary_proc_alive(NULL)) {
1821 			MIF_LOG(ERR, "Primary process is missing");
1822 			return -1;
1823 		}
1824 
1825 		eth_dev->process_private = (struct pmd_process_private *)
1826 			rte_zmalloc(name,
1827 				sizeof(struct pmd_process_private),
1828 				RTE_CACHE_LINE_SIZE);
1829 		if (eth_dev->process_private == NULL) {
1830 			MIF_LOG(ERR,
1831 				"Failed to alloc memory for process private");
1832 			return -1;
1833 		}
1834 
1835 		rte_eth_dev_probing_finish(eth_dev);
1836 
1837 		return 0;
1838 	}
1839 
1840 	ret = rte_mp_action_register(MEMIF_MP_SEND_REGION, memif_mp_send_region);
1841 	/*
1842 	 * The primary process can continue probing, but secondary processes
1843 	 * won't be able to get memory region information.
1844 	 */
1845 	if (ret < 0 && rte_errno != EEXIST)
1846 		MIF_LOG(WARNING, "Failed to register mp action callback: %s",
1847 			strerror(rte_errno));
1848 
1849 	/* use abstract address by default */
1850 	flags |= ETH_MEMIF_FLAG_SOCKET_ABSTRACT;
1851 
1852 	kvlist = rte_kvargs_parse(rte_vdev_device_args(vdev), valid_arguments);
1853 
1854 	/* parse parameters */
1855 	if (kvlist != NULL) {
1856 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_ROLE_ARG,
1857 					 &memif_set_role, &role);
1858 		if (ret < 0)
1859 			goto exit;
1860 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_ID_ARG,
1861 					 &memif_set_id, &id);
1862 		if (ret < 0)
1863 			goto exit;
1864 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_PKT_BUFFER_SIZE_ARG,
1865 					 &memif_set_bs, &pkt_buffer_size);
1866 		if (ret < 0)
1867 			goto exit;
1868 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_RING_SIZE_ARG,
1869 					 &memif_set_rs, &log2_ring_size);
1870 		if (ret < 0)
1871 			goto exit;
1872 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_SOCKET_ARG,
1873 					 &memif_set_socket_filename,
1874 					 (void *)(&socket_filename));
1875 		if (ret < 0)
1876 			goto exit;
1877 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_SOCKET_ABSTRACT_ARG,
1878 					 &memif_set_is_socket_abstract, &flags);
1879 		if (ret < 0)
1880 			goto exit;
1881 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_MAC_ARG,
1882 					 &memif_set_mac, ether_addr);
1883 		if (ret < 0)
1884 			goto exit;
1885 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_ZC_ARG,
1886 					 &memif_set_zc, &flags);
1887 		if (ret < 0)
1888 			goto exit;
1889 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_SECRET_ARG,
1890 					 &memif_set_secret, (void *)(&secret));
1891 		if (ret < 0)
1892 			goto exit;
1893 	}
1894 
1895 	if (!(flags & ETH_MEMIF_FLAG_SOCKET_ABSTRACT)) {
1896 		ret = memif_check_socket_filename(socket_filename);
1897 		if (ret < 0)
1898 			goto exit;
1899 	}
1900 
1901 	/* create interface */
1902 	ret = memif_create(vdev, role, id, flags, socket_filename,
1903 			   log2_ring_size, pkt_buffer_size, secret, ether_addr);
1904 
1905 exit:
1906 	if (kvlist != NULL)
1907 		rte_kvargs_free(kvlist);
1908 	return ret;
1909 }
1910 
1911 static int
1912 rte_pmd_memif_remove(struct rte_vdev_device *vdev)
1913 {
1914 	struct rte_eth_dev *eth_dev;
1915 
1916 	eth_dev = rte_eth_dev_allocated(rte_vdev_device_name(vdev));
1917 	if (eth_dev == NULL)
1918 		return 0;
1919 
1920 	return rte_eth_dev_close(eth_dev->data->port_id);
1921 }
1922 
1923 static struct rte_vdev_driver pmd_memif_drv = {
1924 	.probe = rte_pmd_memif_probe,
1925 	.remove = rte_pmd_memif_remove,
1926 };
1927 
1928 RTE_PMD_REGISTER_VDEV(net_memif, pmd_memif_drv);
1929 
1930 RTE_PMD_REGISTER_PARAM_STRING(net_memif,
1931 			      ETH_MEMIF_ID_ARG "=<int>"
1932 			      ETH_MEMIF_ROLE_ARG "=server|client"
1933 			      ETH_MEMIF_PKT_BUFFER_SIZE_ARG "=<int>"
1934 			      ETH_MEMIF_RING_SIZE_ARG "=<int>"
1935 			      ETH_MEMIF_SOCKET_ARG "=<string>"
1936 			      ETH_MEMIF_SOCKET_ABSTRACT_ARG "=yes|no"
1937 			      ETH_MEMIF_MAC_ARG "=xx:xx:xx:xx:xx:xx"
1938 			      ETH_MEMIF_ZC_ARG "=yes|no"
1939 			      ETH_MEMIF_SECRET_ARG "=<string>");
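/*
 * Example devargs built from the parameter string above (hypothetical
 * command lines; adjust cores and file prefixes to your setup):
 *
 *   server: dpdk-testpmd -l 0-1 --file-prefix=srv \
 *             --vdev=net_memif0,role=server,id=0,bsize=2048,rsize=10 -- -i
 *   client: dpdk-testpmd -l 2-3 --file-prefix=cli --single-file-segments \
 *             --vdev=net_memif0,role=client,id=0,zero-copy=yes -- -i
 *
 * Both ends must use the same "id" and socket address (the abstract socket
 * is the default); zero-copy is client-only and requires the EAL
 * single-file-segments mode checked in memif_set_zc().
 */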
1940 
1941 RTE_LOG_REGISTER_DEFAULT(memif_logtype, NOTICE);
1942