1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright 2018-2019 Cisco Systems, Inc.  All rights reserved.
3  */
4 
5 #include <stdlib.h>
6 #include <fcntl.h>
7 #include <unistd.h>
8 #include <sys/types.h>
9 #include <sys/socket.h>
10 #include <sys/un.h>
11 #include <sys/ioctl.h>
12 #include <sys/mman.h>
13 #include <linux/if_ether.h>
14 #include <errno.h>
15 #include <sys/eventfd.h>
16 
17 #include <rte_version.h>
18 #include <rte_mbuf.h>
19 #include <rte_ether.h>
20 #include <ethdev_driver.h>
21 #include <ethdev_vdev.h>
22 #include <rte_malloc.h>
23 #include <rte_kvargs.h>
24 #include <rte_bus_vdev.h>
25 #include <rte_string_fns.h>
26 #include <rte_errno.h>
27 #include <rte_memory.h>
28 #include <rte_memzone.h>
29 #include <rte_eal_memconfig.h>
30 
31 #include "rte_eth_memif.h"
32 #include "memif_socket.h"
33 
34 #define ETH_MEMIF_ID_ARG		"id"
35 #define ETH_MEMIF_ROLE_ARG		"role"
36 #define ETH_MEMIF_PKT_BUFFER_SIZE_ARG	"bsize"
37 #define ETH_MEMIF_RING_SIZE_ARG		"rsize"
38 #define ETH_MEMIF_SOCKET_ARG		"socket"
39 #define ETH_MEMIF_SOCKET_ABSTRACT_ARG	"socket-abstract"
40 #define ETH_MEMIF_MAC_ARG		"mac"
41 #define ETH_MEMIF_ZC_ARG		"zero-copy"
42 #define ETH_MEMIF_SECRET_ARG		"secret"
43 
44 static const char * const valid_arguments[] = {
45 	ETH_MEMIF_ID_ARG,
46 	ETH_MEMIF_ROLE_ARG,
47 	ETH_MEMIF_PKT_BUFFER_SIZE_ARG,
48 	ETH_MEMIF_RING_SIZE_ARG,
49 	ETH_MEMIF_SOCKET_ARG,
50 	ETH_MEMIF_SOCKET_ABSTRACT_ARG,
51 	ETH_MEMIF_MAC_ARG,
52 	ETH_MEMIF_ZC_ARG,
53 	ETH_MEMIF_SECRET_ARG,
54 	NULL
55 };
56 
57 static const struct rte_eth_link pmd_link = {
58 	.link_speed = RTE_ETH_SPEED_NUM_10G,
59 	.link_duplex = RTE_ETH_LINK_FULL_DUPLEX,
60 	.link_status = RTE_ETH_LINK_DOWN,
61 	.link_autoneg = RTE_ETH_LINK_AUTONEG
62 };
63 
64 #define MEMIF_MP_SEND_REGION		"memif_mp_send_region"
65 
66 
67 static int memif_region_init_zc(const struct rte_memseg_list *msl,
68 				const struct rte_memseg *ms, void *arg);
69 
70 const char *
71 memif_version(void)
72 {
73 	return ("memif-" RTE_STR(MEMIF_VERSION_MAJOR) "." RTE_STR(MEMIF_VERSION_MINOR));
74 }
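/*
 * Illustrative note (not driver code): assuming, for example,
 * MEMIF_VERSION_MAJOR == 2 and MEMIF_VERSION_MINOR == 0 in the memif
 * headers, the call above returns the string "memif-2.0", which
 * identifies the protocol version to the peer during the handshake.
 */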
75 
76 /* Message header to synchronize regions */
77 struct mp_region_msg {
78 	char port_name[RTE_DEV_NAME_MAX_LEN];
79 	memif_region_index_t idx;
80 	memif_region_size_t size;
81 };
82 
83 static int
84 memif_mp_send_region(const struct rte_mp_msg *msg, const void *peer)
85 {
86 	struct rte_eth_dev *dev;
87 	struct pmd_process_private *proc_private;
88 	const struct mp_region_msg *msg_param = (const struct mp_region_msg *)msg->param;
89 	struct rte_mp_msg reply;
90 	struct mp_region_msg *reply_param = (struct mp_region_msg *)reply.param;
91 
92 	/* Get requested port */
93 	dev = rte_eth_dev_get_by_name(msg_param->port_name);
94 	if (!dev) {
95 		MIF_LOG(ERR, "Failed to get port id for %s",
96 			msg_param->port_name);
97 		return -1;
98 	}
99 	proc_private = dev->process_private;
100 
101 	memset(&reply, 0, sizeof(reply));
102 	strlcpy(reply.name, msg->name, sizeof(reply.name));
103 	reply_param->idx = msg_param->idx;
104 	if (proc_private->regions[msg_param->idx] != NULL) {
105 		reply_param->size = proc_private->regions[msg_param->idx]->region_size;
106 		reply.fds[0] = proc_private->regions[msg_param->idx]->fd;
107 		reply.num_fds = 1;
108 	}
109 	reply.len_param = sizeof(*reply_param);
110 	if (rte_mp_reply(&reply, peer) < 0) {
111 		MIF_LOG(ERR, "Failed to reply to an add region request");
112 		return -1;
113 	}
114 
115 	return 0;
116 }
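/*
 * Context for the handler above: it executes in the primary process.
 * rte_pmd_memif_probe() (below) registers it under the
 * MEMIF_MP_SEND_REGION action name, and it answers the
 * rte_mp_request_sync() calls issued by memif_mp_request_regions()
 * in secondary processes, passing the region file descriptor back
 * through reply.fds[0].
 */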
117 
118 /*
119  * Request regions
120  * Called by a secondary process when the port's link status goes up.
121  */
122 static int
123 memif_mp_request_regions(struct rte_eth_dev *dev)
124 {
125 	int ret, i;
126 	struct timespec timeout = {.tv_sec = 5, .tv_nsec = 0};
127 	struct rte_mp_msg msg, *reply;
128 	struct rte_mp_reply replies;
129 	struct mp_region_msg *msg_param = (struct mp_region_msg *)msg.param;
130 	struct mp_region_msg *reply_param;
131 	struct memif_region *r;
132 	struct pmd_process_private *proc_private = dev->process_private;
133 	struct pmd_internals *pmd = dev->data->dev_private;
134 	/* in case of zero-copy client, only request region 0 */
135 	uint16_t max_region_num = (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) ?
136 				   1 : ETH_MEMIF_MAX_REGION_NUM;
137 
138 	MIF_LOG(DEBUG, "Requesting memory regions");
139 
140 	for (i = 0; i < max_region_num; i++) {
141 		/* Prepare the message */
142 		memset(&msg, 0, sizeof(msg));
143 		strlcpy(msg.name, MEMIF_MP_SEND_REGION, sizeof(msg.name));
144 		strlcpy(msg_param->port_name, dev->data->name,
145 			sizeof(msg_param->port_name));
146 		msg_param->idx = i;
147 		msg.len_param = sizeof(*msg_param);
148 
149 		/* Send message */
150 		ret = rte_mp_request_sync(&msg, &replies, &timeout);
151 		if (ret < 0 || replies.nb_received != 1) {
152 			MIF_LOG(ERR, "Failed to send mp msg: %d",
153 				rte_errno);
154 			return -1;
155 		}
156 
157 		reply = &replies.msgs[0];
158 		reply_param = (struct mp_region_msg *)reply->param;
159 
160 		if (reply_param->size > 0) {
161 			r = rte_zmalloc("region", sizeof(struct memif_region), 0);
162 			if (r == NULL) {
163 				MIF_LOG(ERR, "Failed to alloc memif region.");
164 				free(reply);
165 				return -ENOMEM;
166 			}
167 			r->region_size = reply_param->size;
168 			if (reply->num_fds < 1) {
169 				MIF_LOG(ERR, "Missing file descriptor.");
170 				free(reply);
171 				return -1;
172 			}
173 			r->fd = reply->fds[0];
174 			r->addr = NULL;
175 
176 			proc_private->regions[reply_param->idx] = r;
177 			proc_private->regions_num++;
178 		}
179 		free(reply);
180 	}
181 
182 	if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
183 		ret = rte_memseg_walk(memif_region_init_zc, (void *)proc_private);
184 		if (ret < 0)
185 			return ret;
186 	}
187 
188 	return memif_connect(dev);
189 }
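/*
 * Trigger path, for reference: a secondary process reaches this
 * function from memif_link_update() (below) once the link is up and
 * no regions are mapped yet; the final memif_connect() call then
 * mmaps the regions received over the multi-process channel.
 */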
190 
191 static int
192 memif_dev_info(struct rte_eth_dev *dev __rte_unused, struct rte_eth_dev_info *dev_info)
193 {
194 	dev_info->max_mac_addrs = 1;
195 	dev_info->max_rx_pktlen = RTE_ETHER_MAX_LEN;
196 	dev_info->max_rx_queues = ETH_MEMIF_MAX_NUM_Q_PAIRS;
197 	dev_info->max_tx_queues = ETH_MEMIF_MAX_NUM_Q_PAIRS;
198 	dev_info->min_rx_bufsize = 0;
199 	dev_info->tx_offload_capa = RTE_ETH_TX_OFFLOAD_MULTI_SEGS;
200 
201 	return 0;
202 }
203 
204 static memif_ring_t *
205 memif_get_ring(struct pmd_internals *pmd, struct pmd_process_private *proc_private,
206 	       memif_ring_type_t type, uint16_t ring_num)
207 {
208 	/* rings only in region 0 */
209 	void *p = proc_private->regions[0]->addr;
210 	int ring_size = sizeof(memif_ring_t) + sizeof(memif_desc_t) *
211 	    (1 << pmd->run.log2_ring_size);
212 
213 	p = (uint8_t *)p + (ring_num + type * pmd->run.num_c2s_rings) * ring_size;
214 
215 	return (memif_ring_t *)p;
216 }
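/*
 * Layout sketch for the arithmetic above (illustrative numbers): all
 * rings live at the start of region 0, C2S rings first. Assuming
 * num_c2s_rings == 2, S2C ring 0 sits at offset 2 * ring_size, where
 *   ring_size = sizeof(memif_ring_t) +
 *               sizeof(memif_desc_t) * (1 << log2_ring_size),
 * because MEMIF_RING_C2S and MEMIF_RING_S2C act as 0 and 1 in the
 * index calculation.
 */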
217 
218 static memif_region_offset_t
219 memif_get_ring_offset(struct rte_eth_dev *dev, struct memif_queue *mq,
220 		      memif_ring_type_t type, uint16_t num)
221 {
222 	struct pmd_internals *pmd = dev->data->dev_private;
223 	struct pmd_process_private *proc_private = dev->process_private;
224 
225 	return ((uint8_t *)memif_get_ring(pmd, proc_private, type, num) -
226 		(uint8_t *)proc_private->regions[mq->region]->addr);
227 }
228 
229 static memif_ring_t *
230 memif_get_ring_from_queue(struct pmd_process_private *proc_private,
231 			  struct memif_queue *mq)
232 {
233 	struct memif_region *r;
234 
235 	r = proc_private->regions[mq->region];
236 	if (r == NULL)
237 		return NULL;
238 
239 	return (memif_ring_t *)((uint8_t *)r->addr + mq->ring_offset);
240 }
241 
242 static void *
243 memif_get_buffer(struct pmd_process_private *proc_private, memif_desc_t *d)
244 {
245 	return ((uint8_t *)proc_private->regions[d->region]->addr + d->offset);
246 }
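/*
 * Example of the translation above: a descriptor with region == 0 and
 * offset == pkt_buffer_offset + slot * pkt_buffer_size (as filled in
 * by memif_init_rings() below) resolves to a process-local pointer
 * into the shared mapping of region 0.
 */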
247 
248 /* Free mbufs received by server */
249 static void
250 memif_free_stored_mbufs(struct pmd_process_private *proc_private, struct memif_queue *mq)
251 {
252 	uint16_t cur_tail;
253 	uint16_t mask = (1 << mq->log2_ring_size) - 1;
254 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
255 
256 	/* FIXME: improve performance */
257 	/* The ring->tail acts as a guard variable between Tx and Rx
258 	 * threads, so this load-acquire pairs with the store-release
259 	 * in eth_memif_rx for C2S queues.
260 	 */
261 	cur_tail = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);
262 	while (mq->last_tail != cur_tail) {
263 		RTE_MBUF_PREFETCH_TO_FREE(mq->buffers[(mq->last_tail + 1) & mask]);
264 		/* Decrement refcnt and free mbuf. (current segment) */
265 		rte_mbuf_refcnt_update(mq->buffers[mq->last_tail & mask], -1);
266 		rte_pktmbuf_free_seg(mq->buffers[mq->last_tail & mask]);
267 		mq->last_tail++;
268 	}
269 }
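/*
 * Note: the refcnt decrement above balances the per-segment
 * rte_mbuf_refcnt_update(mbuf, 1) done in memif_tx_one_zc() below, so
 * a zero-copy Tx buffer is only returned to the mempool after the
 * server has moved ring->tail past it.
 */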
270 
271 static int
272 memif_pktmbuf_chain(struct rte_mbuf *head, struct rte_mbuf *cur_tail,
273 		    struct rte_mbuf *tail)
274 {
275 	/* Check for number-of-segments-overflow */
276 	if (unlikely(head->nb_segs + tail->nb_segs > RTE_MBUF_MAX_NB_SEGS))
277 		return -EOVERFLOW;
278 
279 	/* Chain 'tail' onto the old tail */
280 	cur_tail->next = tail;
281 
282 	/* accumulate number of segments and total length. */
283 	head->nb_segs = (uint16_t)(head->nb_segs + tail->nb_segs);
284 
285 	tail->pkt_len = tail->data_len;
286 	head->pkt_len += tail->pkt_len;
287 
288 	return 0;
289 }
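/*
 * Usage sketch: when a received packet spans several descriptors,
 * eth_memif_rx() below allocates an extra mbuf per overflow and calls
 * memif_pktmbuf_chain(head, old_tail, new_tail), which links the new
 * segment and accumulates head->nb_segs; head->pkt_len is accumulated
 * by the caller as data is copied.
 */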
290 
291 static uint16_t
292 eth_memif_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
293 {
294 	struct memif_queue *mq = queue;
295 	struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
296 	struct pmd_process_private *proc_private =
297 		rte_eth_devices[mq->in_port].process_private;
298 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
299 	uint16_t cur_slot, last_slot, n_slots, ring_size, mask, s0;
300 	uint16_t n_rx_pkts = 0;
301 	uint16_t mbuf_size = rte_pktmbuf_data_room_size(mq->mempool) -
302 		RTE_PKTMBUF_HEADROOM;
303 	uint16_t src_len, src_off, dst_len, dst_off, cp_len;
304 	memif_ring_type_t type = mq->type;
305 	memif_desc_t *d0;
306 	struct rte_mbuf *mbuf, *mbuf_head, *mbuf_tail;
307 	uint64_t b;
308 	ssize_t size __rte_unused;
309 	uint16_t head;
310 	int ret;
311 	struct rte_eth_link link;
312 
313 	if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
314 		return 0;
315 	if (unlikely(ring == NULL)) {
316 		/* Secondary process will attempt to request regions. */
317 		ret = rte_eth_link_get(mq->in_port, &link);
318 		if (ret < 0)
319 			MIF_LOG(ERR, "Failed to get port %u link info: %s",
320 				mq->in_port, rte_strerror(-ret));
321 		return 0;
322 	}
323 
324 	/* consume interrupt */
325 	if (((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) &&
326 	    (rte_intr_fd_get(mq->intr_handle) >= 0))
327 		size = read(rte_intr_fd_get(mq->intr_handle), &b,
328 			    sizeof(b));
329 
330 	ring_size = 1 << mq->log2_ring_size;
331 	mask = ring_size - 1;
332 
333 	if (type == MEMIF_RING_C2S) {
334 		cur_slot = mq->last_head;
335 		last_slot = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
336 	} else {
337 		cur_slot = mq->last_tail;
338 		last_slot = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);
339 	}
340 
341 	if (cur_slot == last_slot)
342 		goto refill;
343 	n_slots = last_slot - cur_slot;
344 
345 	while (n_slots && n_rx_pkts < nb_pkts) {
346 		mbuf_head = rte_pktmbuf_alloc(mq->mempool);
347 		if (unlikely(mbuf_head == NULL))
348 			goto no_free_bufs;
349 		mbuf = mbuf_head;
350 		mbuf->port = mq->in_port;
351 		dst_off = 0;
352 
353 next_slot:
354 		s0 = cur_slot & mask;
355 		d0 = &ring->desc[s0];
356 
357 		src_len = d0->length;
358 		src_off = 0;
359 
360 		do {
361 			dst_len = mbuf_size - dst_off;
362 			if (dst_len == 0) {
363 				dst_off = 0;
364 				dst_len = mbuf_size;
365 
366 				/* store pointer to tail */
367 				mbuf_tail = mbuf;
368 				mbuf = rte_pktmbuf_alloc(mq->mempool);
369 				if (unlikely(mbuf == NULL))
370 					goto no_free_bufs;
371 				mbuf->port = mq->in_port;
372 				ret = memif_pktmbuf_chain(mbuf_head, mbuf_tail, mbuf);
373 				if (unlikely(ret < 0)) {
374 					MIF_LOG(ERR, "number-of-segments-overflow");
375 					rte_pktmbuf_free(mbuf);
376 					goto no_free_bufs;
377 				}
378 			}
379 			cp_len = RTE_MIN(dst_len, src_len);
380 
381 			rte_pktmbuf_data_len(mbuf) += cp_len;
382 			rte_pktmbuf_pkt_len(mbuf) = rte_pktmbuf_data_len(mbuf);
383 			if (mbuf != mbuf_head)
384 				rte_pktmbuf_pkt_len(mbuf_head) += cp_len;
385 
386 			rte_memcpy(rte_pktmbuf_mtod_offset(mbuf, void *,
387 							   dst_off),
388 				(uint8_t *)memif_get_buffer(proc_private, d0) +
389 				src_off, cp_len);
390 
391 			src_off += cp_len;
392 			dst_off += cp_len;
393 			src_len -= cp_len;
394 		} while (src_len);
395 
396 		cur_slot++;
397 		n_slots--;
398 
399 		if (d0->flags & MEMIF_DESC_FLAG_NEXT)
400 			goto next_slot;
401 
402 		mq->n_bytes += rte_pktmbuf_pkt_len(mbuf_head);
403 		*bufs++ = mbuf_head;
404 		n_rx_pkts++;
405 	}
406 
407 no_free_bufs:
408 	if (type == MEMIF_RING_C2S) {
409 		__atomic_store_n(&ring->tail, cur_slot, __ATOMIC_RELEASE);
410 		mq->last_head = cur_slot;
411 	} else {
412 		mq->last_tail = cur_slot;
413 	}
414 
415 refill:
416 	if (type == MEMIF_RING_S2C) {
417 		/* ring->head is updated by the receiver and this function
418 		 * is called in the context of receiver thread. The loads in
419 		 * the receiver do not need to synchronize with its own stores.
420 		 */
421 		head = __atomic_load_n(&ring->head, __ATOMIC_RELAXED);
422 		n_slots = ring_size - head + mq->last_tail;
423 
424 		while (n_slots--) {
425 			s0 = head++ & mask;
426 			d0 = &ring->desc[s0];
427 			d0->length = pmd->run.pkt_buffer_size;
428 		}
429 		__atomic_store_n(&ring->head, head, __ATOMIC_RELEASE);
430 	}
431 
432 	mq->n_pkts += n_rx_pkts;
433 	return n_rx_pkts;
434 }
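/*
 * Minimal polling sketch (application side, not driver code), assuming
 * a configured and started memif port identified by port_id:
 *
 *	struct rte_mbuf *pkts[32];
 *	uint16_t i, n = rte_eth_rx_burst(port_id, 0, pkts, 32);
 *
 *	for (i = 0; i < n; i++)
 *		rte_pktmbuf_free(pkts[i]);
 */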
435 
436 static uint16_t
437 eth_memif_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
438 {
439 	struct memif_queue *mq = queue;
440 	struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
441 	struct pmd_process_private *proc_private =
442 		rte_eth_devices[mq->in_port].process_private;
443 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
444 	uint16_t cur_slot, last_slot, n_slots, ring_size, mask, s0, head;
445 	uint16_t n_rx_pkts = 0;
446 	memif_desc_t *d0;
447 	struct rte_mbuf *mbuf, *mbuf_tail;
448 	struct rte_mbuf *mbuf_head = NULL;
449 	int ret;
450 	struct rte_eth_link link;
451 
452 	if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
453 		return 0;
454 	if (unlikely(ring == NULL)) {
455 		/* Secondary process will attempt to request regions. */
456 		rte_eth_link_get(mq->in_port, &link);
457 		return 0;
458 	}
459 
460 	/* consume interrupt */
461 	if ((rte_intr_fd_get(mq->intr_handle) >= 0) &&
462 	    ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0)) {
463 		uint64_t b;
464 		ssize_t size __rte_unused;
465 		size = read(rte_intr_fd_get(mq->intr_handle), &b,
466 			    sizeof(b));
467 	}
468 
469 	ring_size = 1 << mq->log2_ring_size;
470 	mask = ring_size - 1;
471 
472 	cur_slot = mq->last_tail;
473 	/* The ring->tail acts as a guard variable between Tx and Rx
474 	 * threads, so this load-acquire pairs with the corresponding
475 	 * store-release to synchronize it between threads.
476 	 */
477 	last_slot = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);
478 	if (cur_slot == last_slot)
479 		goto refill;
480 	n_slots = last_slot - cur_slot;
481 
482 	while (n_slots && n_rx_pkts < nb_pkts) {
483 		s0 = cur_slot & mask;
484 
485 		d0 = &ring->desc[s0];
486 		mbuf_head = mq->buffers[s0];
487 		mbuf = mbuf_head;
488 
489 next_slot:
490 		/* prefetch next descriptor */
491 		if (n_rx_pkts + 1 < nb_pkts)
492 			rte_prefetch0(&ring->desc[(cur_slot + 1) & mask]);
493 
494 		mbuf->port = mq->in_port;
495 		rte_pktmbuf_data_len(mbuf) = d0->length;
496 		rte_pktmbuf_pkt_len(mbuf) = rte_pktmbuf_data_len(mbuf);
497 
498 		mq->n_bytes += rte_pktmbuf_data_len(mbuf);
499 
500 		cur_slot++;
501 		n_slots--;
502 		if (d0->flags & MEMIF_DESC_FLAG_NEXT) {
503 			s0 = cur_slot & mask;
504 			d0 = &ring->desc[s0];
505 			mbuf_tail = mbuf;
506 			mbuf = mq->buffers[s0];
507 			ret = memif_pktmbuf_chain(mbuf_head, mbuf_tail, mbuf);
508 			if (unlikely(ret < 0)) {
509 				MIF_LOG(ERR, "number-of-segments-overflow");
510 				goto refill;
511 			}
512 			goto next_slot;
513 		}
514 
515 		*bufs++ = mbuf_head;
516 		n_rx_pkts++;
517 	}
518 
519 	mq->last_tail = cur_slot;
520 
521 /* Supply server with new buffers */
522 refill:
523 	/* ring->head is updated by the receiver and this function
524 	 * is called in the context of receiver thread. The loads in
525 	 * the receiver do not need to synchronize with its own stores.
526 	 */
527 	head = __atomic_load_n(&ring->head, __ATOMIC_RELAXED);
528 	n_slots = ring_size - head + mq->last_tail;
529 
530 	if (n_slots < 32)
531 		goto no_free_mbufs;
532 
533 	ret = rte_pktmbuf_alloc_bulk(mq->mempool, &mq->buffers[head & mask], n_slots);
534 	if (unlikely(ret < 0))
535 		goto no_free_mbufs;
536 
537 	while (n_slots--) {
538 		s0 = head++ & mask;
539 		if (n_slots > 0)
540 			rte_prefetch0(mq->buffers[head & mask]);
541 		d0 = &ring->desc[s0];
542 		/* store buffer header */
543 		mbuf = mq->buffers[s0];
544 		/* populate descriptor */
545 		d0->length = rte_pktmbuf_data_room_size(mq->mempool) -
546 				RTE_PKTMBUF_HEADROOM;
547 		d0->region = 1;
548 		d0->offset = rte_pktmbuf_mtod(mbuf, uint8_t *) -
549 			(uint8_t *)proc_private->regions[d0->region]->addr;
550 	}
551 no_free_mbufs:
552 	/* The ring->head acts as a guard variable between Tx and Rx
553 	 * threads, so this store-release pairs with the load-acquire
554 	 * in eth_memif_tx.
555 	 */
556 	__atomic_store_n(&ring->head, head, __ATOMIC_RELEASE);
557 
558 	mq->n_pkts += n_rx_pkts;
559 
560 	return n_rx_pkts;
561 }
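/*
 * Notes on the refill above: refill is skipped when fewer than 32
 * slots are free, which reads as a batching threshold to amortize
 * rte_pktmbuf_alloc_bulk() (inferred, not documented intent).
 * d0->region is hard-coded to 1 because in zero-copy mode region 0
 * holds only descriptors and DPDK memory starts at region 1 (see
 * memif_regions_init() below).
 */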
562 
563 static uint16_t
564 eth_memif_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
565 {
566 	struct memif_queue *mq = queue;
567 	struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
568 	struct pmd_process_private *proc_private =
569 		rte_eth_devices[mq->in_port].process_private;
570 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
571 	uint16_t slot, saved_slot, n_free, ring_size, mask, n_tx_pkts = 0;
572 	uint16_t src_len, src_off, dst_len, dst_off, cp_len, nb_segs;
573 	memif_ring_type_t type = mq->type;
574 	memif_desc_t *d0;
575 	struct rte_mbuf *mbuf;
576 	struct rte_mbuf *mbuf_head;
577 	uint64_t a;
578 	ssize_t size;
579 	struct rte_eth_link link;
580 
581 	if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
582 		return 0;
583 	if (unlikely(ring == NULL)) {
584 		int ret;
585 
586 		/* Secondary process will attempt to request regions. */
587 		ret = rte_eth_link_get(mq->in_port, &link);
588 		if (ret < 0)
589 			MIF_LOG(ERR, "Failed to get port %u link info: %s",
590 				mq->in_port, rte_strerror(-ret));
591 		return 0;
592 	}
593 
594 	ring_size = 1 << mq->log2_ring_size;
595 	mask = ring_size - 1;
596 
597 	if (type == MEMIF_RING_C2S) {
598 		/* For C2S queues ring->head is updated by the sender and
599 		 * this function is called in the context of sending thread.
600 		 * The loads in the sender do not need to synchronize with
601 		 * its own stores. Hence, the following load can be a
602 		 * relaxed load.
603 		 */
604 		slot = __atomic_load_n(&ring->head, __ATOMIC_RELAXED);
605 		n_free = ring_size - slot +
606 				__atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);
607 	} else {
608 		/* For S2C queues ring->tail is updated by the sender and
609 		 * this function is called in the context of sending thread.
610 		 * The loads in the sender do not need to synchronize with
611 		 * its own stores. Hence, the following load can be a
612 		 * relaxed load.
613 		 */
614 		slot = __atomic_load_n(&ring->tail, __ATOMIC_RELAXED);
615 		n_free = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE) - slot;
616 	}
617 
618 	while (n_tx_pkts < nb_pkts && n_free) {
619 		mbuf_head = *bufs++;
620 		nb_segs = mbuf_head->nb_segs;
621 		mbuf = mbuf_head;
622 
623 		saved_slot = slot;
624 		d0 = &ring->desc[slot & mask];
625 		dst_off = 0;
626 		dst_len = (type == MEMIF_RING_C2S) ?
627 			pmd->run.pkt_buffer_size : d0->length;
628 
629 next_in_chain:
630 		src_off = 0;
631 		src_len = rte_pktmbuf_data_len(mbuf);
632 
633 		while (src_len) {
634 			if (dst_len == 0) {
635 				if (n_free) {
636 					slot++;
637 					n_free--;
638 					d0->flags |= MEMIF_DESC_FLAG_NEXT;
639 					d0 = &ring->desc[slot & mask];
640 					dst_off = 0;
641 					dst_len = (type == MEMIF_RING_C2S) ?
642 					    pmd->run.pkt_buffer_size : d0->length;
643 					d0->flags = 0;
644 				} else {
645 					slot = saved_slot;
646 					goto no_free_slots;
647 				}
648 			}
649 			cp_len = RTE_MIN(dst_len, src_len);
650 
651 			rte_memcpy((uint8_t *)memif_get_buffer(proc_private,
652 							       d0) + dst_off,
653 				rte_pktmbuf_mtod_offset(mbuf, void *, src_off),
654 				cp_len);
655 
656 			mq->n_bytes += cp_len;
657 			src_off += cp_len;
658 			dst_off += cp_len;
659 			src_len -= cp_len;
660 			dst_len -= cp_len;
661 
662 			d0->length = dst_off;
663 		}
664 
665 		if (--nb_segs > 0) {
666 			mbuf = mbuf->next;
667 			goto next_in_chain;
668 		}
669 
670 		n_tx_pkts++;
671 		slot++;
672 		n_free--;
673 		rte_pktmbuf_free(mbuf_head);
674 	}
675 
676 no_free_slots:
677 	if (type == MEMIF_RING_C2S)
678 		__atomic_store_n(&ring->head, slot, __ATOMIC_RELEASE);
679 	else
680 		__atomic_store_n(&ring->tail, slot, __ATOMIC_RELEASE);
681 
682 	if (((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) &&
683 	    (rte_intr_fd_get(mq->intr_handle) >= 0)) {
684 		a = 1;
685 		size = write(rte_intr_fd_get(mq->intr_handle), &a,
686 			     sizeof(a));
687 		if (unlikely(size < 0)) {
688 			MIF_LOG(WARNING,
689 				"Failed to send interrupt. %s", strerror(errno));
690 		}
691 	}
692 
693 	mq->n_pkts += n_tx_pkts;
694 	return n_tx_pkts;
695 }
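/*
 * Note on the doorbell above: the queue interrupt is a plain eventfd
 * (created in memif_init_queues() below), so signalling the peer is an
 * 8-byte write of the value 1, consumed by the matching read() in the
 * peer's "consume interrupt" block.
 */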
696 
697 
698 static int
699 memif_tx_one_zc(struct pmd_process_private *proc_private, struct memif_queue *mq,
700 		memif_ring_t *ring, struct rte_mbuf *mbuf, const uint16_t mask,
701 		uint16_t slot, uint16_t n_free)
702 {
703 	memif_desc_t *d0;
704 	uint16_t nb_segs = mbuf->nb_segs;
705 	int used_slots = 1;
706 
707 next_in_chain:
708 	/* store pointer to mbuf to free it later */
709 	mq->buffers[slot & mask] = mbuf;
710 	/* Increment refcnt to make sure the buffer is not freed before server
711 	 * receives it. (current segment)
712 	 */
713 	rte_mbuf_refcnt_update(mbuf, 1);
714 	/* populate descriptor */
715 	d0 = &ring->desc[slot & mask];
716 	d0->length = rte_pktmbuf_data_len(mbuf);
717 	mq->n_bytes += rte_pktmbuf_data_len(mbuf);
718 	/* FIXME: get region index */
719 	d0->region = 1;
720 	d0->offset = rte_pktmbuf_mtod(mbuf, uint8_t *) -
721 		(uint8_t *)proc_private->regions[d0->region]->addr;
722 	d0->flags = 0;
723 
724 	/* check if buffer is chained */
725 	if (--nb_segs > 0) {
726 		if (n_free < 2)
727 			return 0;
728 		/* mark buffer as chained */
729 		d0->flags |= MEMIF_DESC_FLAG_NEXT;
730 		/* advance mbuf */
731 		mbuf = mbuf->next;
732 		/* update counters */
733 		used_slots++;
734 		slot++;
735 		n_free--;
736 		goto next_in_chain;
737 	}
738 	return used_slots;
739 }
740 
741 static uint16_t
742 eth_memif_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
743 {
744 	struct memif_queue *mq = queue;
745 	struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
746 	struct pmd_process_private *proc_private =
747 		rte_eth_devices[mq->in_port].process_private;
748 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
749 	uint16_t slot, n_free, ring_size, mask, n_tx_pkts = 0;
750 	struct rte_eth_link link;
751 
752 	if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
753 		return 0;
754 	if (unlikely(ring == NULL)) {
755 		/* Secondary process will attempt to request regions. */
756 		rte_eth_link_get(mq->in_port, &link);
757 		return 0;
758 	}
759 
760 	ring_size = 1 << mq->log2_ring_size;
761 	mask = ring_size - 1;
762 
763 	/* free mbufs received by server */
764 	memif_free_stored_mbufs(proc_private, mq);
765 
766 	/* ring type always MEMIF_RING_C2S */
767 	/* For C2S queues ring->head is updated by the sender and
768 	 * this function is called in the context of sending thread.
769 	 * The loads in the sender do not need to synchronize with
770 	 * its own stores. Hence, the following load can be a
771 	 * relaxed load.
772 	 */
773 	slot = __atomic_load_n(&ring->head, __ATOMIC_RELAXED);
774 	n_free = ring_size - slot + mq->last_tail;
775 
776 	int used_slots;
777 
778 	while (n_free && (n_tx_pkts < nb_pkts)) {
779 		while ((n_free > 4) && ((nb_pkts - n_tx_pkts) > 4)) {
780 			if ((nb_pkts - n_tx_pkts) > 8) {
781 				rte_prefetch0(*(bufs + 4));
782 				rte_prefetch0(*(bufs + 5));
783 				rte_prefetch0(*(bufs + 6));
784 				rte_prefetch0(*(bufs + 7));
785 			}
786 			used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
787 				mask, slot, n_free);
788 			if (unlikely(used_slots < 1))
789 				goto no_free_slots;
790 			n_tx_pkts++;
791 			slot += used_slots;
792 			n_free -= used_slots;
793 
794 			used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
795 				mask, slot, n_free);
796 			if (unlikely(used_slots < 1))
797 				goto no_free_slots;
798 			n_tx_pkts++;
799 			slot += used_slots;
800 			n_free -= used_slots;
801 
802 			used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
803 				mask, slot, n_free);
804 			if (unlikely(used_slots < 1))
805 				goto no_free_slots;
806 			n_tx_pkts++;
807 			slot += used_slots;
808 			n_free -= used_slots;
809 
810 			used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
811 				mask, slot, n_free);
812 			if (unlikely(used_slots < 1))
813 				goto no_free_slots;
814 			n_tx_pkts++;
815 			slot += used_slots;
816 			n_free -= used_slots;
817 		}
818 		used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
819 			mask, slot, n_free);
820 		if (unlikely(used_slots < 1))
821 			goto no_free_slots;
822 		n_tx_pkts++;
823 		slot += used_slots;
824 		n_free -= used_slots;
825 	}
826 
827 no_free_slots:
828 	/* ring type always MEMIF_RING_C2S */
829 	/* The ring->head acts as a guard variable between Tx and Rx
830 	 * threads, so this store-release pairs with the load-acquire
831 	 * in eth_memif_rx for C2S rings.
832 	 */
833 	__atomic_store_n(&ring->head, slot, __ATOMIC_RELEASE);
834 
835 	/* Send interrupt, if enabled. */
836 	if ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) {
837 		uint64_t a = 1;
838 		if (rte_intr_fd_get(mq->intr_handle) < 0)
839 			return -1;
840 
841 		ssize_t size = write(rte_intr_fd_get(mq->intr_handle),
842 				     &a, sizeof(a));
843 		if (unlikely(size < 0)) {
844 			MIF_LOG(WARNING,
845 				"Failed to send interrupt. %s", strerror(errno));
846 		}
847 	}
848 
849 	/* increment queue counters */
850 	mq->n_pkts += n_tx_pkts;
851 
852 	return n_tx_pkts;
853 }
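/*
 * Structure note: the inner loop above is a manual 4x unroll, filling
 * descriptors for four packets per iteration and prefetching the next
 * four mbuf headers while at least eight packets remain; the trailing
 * single call handles the remainder. This is a reading of the code,
 * not documented intent.
 */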
854 
855 void
856 memif_free_regions(struct rte_eth_dev *dev)
857 {
858 	struct pmd_process_private *proc_private = dev->process_private;
859 	struct pmd_internals *pmd = dev->data->dev_private;
860 	int i;
861 	struct memif_region *r;
862 
863 	/* regions are allocated contiguously, so it's
864 	 * enough to loop until 'proc_private->regions_num'
865 	 */
866 	for (i = 0; i < proc_private->regions_num; i++) {
867 		r = proc_private->regions[i];
868 		if (r != NULL) {
869 			/* Zero-copy regions (i > 0) map EAL memsegs; don't munmap them */
870 			if (i > 0 && (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY)) {
871 				r->addr = NULL;
872 				if (r->fd > 0)
873 					close(r->fd);
874 			}
875 			if (r->addr != NULL) {
876 				munmap(r->addr, r->region_size);
877 				if (r->fd > 0) {
878 					close(r->fd);
879 					r->fd = -1;
880 				}
881 			}
882 			rte_free(r);
883 			proc_private->regions[i] = NULL;
884 		}
885 	}
886 	proc_private->regions_num = 0;
887 }
888 
889 static int
890 memif_region_init_zc(const struct rte_memseg_list *msl, const struct rte_memseg *ms,
891 		     void *arg)
892 {
893 	struct pmd_process_private *proc_private = (struct pmd_process_private *)arg;
894 	struct memif_region *r;
895 
896 	if (proc_private->regions_num < 1) {
897 		MIF_LOG(ERR, "Missing descriptor region");
898 		return -1;
899 	}
900 
901 	r = proc_private->regions[proc_private->regions_num - 1];
902 
903 	if (r->addr != msl->base_va)
904 		r = proc_private->regions[++proc_private->regions_num - 1];
905 
906 	if (r == NULL) {
907 		r = rte_zmalloc("region", sizeof(struct memif_region), 0);
908 		if (r == NULL) {
909 			MIF_LOG(ERR, "Failed to alloc memif region.");
910 			return -ENOMEM;
911 		}
912 
913 		r->addr = msl->base_va;
914 		r->region_size = ms->len;
915 		r->fd = rte_memseg_get_fd(ms);
916 		if (r->fd < 0)
917 			return -1;
918 		r->pkt_buffer_offset = 0;
919 
920 		proc_private->regions[proc_private->regions_num - 1] = r;
921 	} else {
922 		r->region_size += ms->len;
923 	}
924 
925 	return 0;
926 }
927 
928 static int
929 memif_region_init_shm(struct rte_eth_dev *dev, uint8_t has_buffers)
930 {
931 	struct pmd_internals *pmd = dev->data->dev_private;
932 	struct pmd_process_private *proc_private = dev->process_private;
933 	char shm_name[ETH_MEMIF_SHM_NAME_SIZE];
934 	int ret = 0;
935 	struct memif_region *r;
936 
937 	if (proc_private->regions_num >= ETH_MEMIF_MAX_REGION_NUM) {
938 		MIF_LOG(ERR, "Too many regions.");
939 		return -1;
940 	}
941 
942 	r = rte_zmalloc("region", sizeof(struct memif_region), 0);
943 	if (r == NULL) {
944 		MIF_LOG(ERR, "Failed to alloc memif region.");
945 		return -ENOMEM;
946 	}
947 
948 	/* calculate buffer offset */
949 	r->pkt_buffer_offset = (pmd->run.num_c2s_rings + pmd->run.num_s2c_rings) *
950 	    (sizeof(memif_ring_t) + sizeof(memif_desc_t) *
951 	    (1 << pmd->run.log2_ring_size));
952 
953 	r->region_size = r->pkt_buffer_offset;
954 	/* if region has buffers, add buffers size to region_size */
955 	if (has_buffers == 1)
956 		r->region_size += (uint32_t)(pmd->run.pkt_buffer_size *
957 			(1 << pmd->run.log2_ring_size) *
958 			(pmd->run.num_c2s_rings +
959 			 pmd->run.num_s2c_rings));
960 
961 	memset(shm_name, 0, sizeof(char) * ETH_MEMIF_SHM_NAME_SIZE);
962 	snprintf(shm_name, ETH_MEMIF_SHM_NAME_SIZE, "memif_region_%d",
963 		 proc_private->regions_num);
964 
965 	r->fd = memfd_create(shm_name, MFD_ALLOW_SEALING);
966 	if (r->fd < 0) {
967 		MIF_LOG(ERR, "Failed to create shm file: %s.", strerror(errno));
968 		ret = -1;
969 		goto error;
970 	}
971 
972 	ret = fcntl(r->fd, F_ADD_SEALS, F_SEAL_SHRINK);
973 	if (ret < 0) {
974 		MIF_LOG(ERR, "Failed to add seals to shm file: %s.", strerror(errno));
975 		goto error;
976 	}
977 
978 	ret = ftruncate(r->fd, r->region_size);
979 	if (ret < 0) {
980 		MIF_LOG(ERR, "Failed to truncate shm file: %s.", strerror(errno));
981 		goto error;
982 	}
983 
984 	r->addr = mmap(NULL, r->region_size, PROT_READ |
985 		       PROT_WRITE, MAP_SHARED, r->fd, 0);
986 	if (r->addr == MAP_FAILED) {
987 		MIF_LOG(ERR, "Failed to mmap shm region: %s.", strerror(errno));
988 		ret = -1;
989 		goto error;
990 	}
991 
992 	proc_private->regions[proc_private->regions_num] = r;
993 	proc_private->regions_num++;
994 
995 	return ret;
996 
997 error:
998 	if (r->fd > 0)
999 		close(r->fd);
1000 	r->fd = -1;
1001 
1002 	return ret;
1003 }
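/*
 * Worked sizing example (illustrative numbers): with one C2S and one
 * S2C ring, log2_ring_size == 10 and pkt_buffer_size == 2048,
 * pkt_buffer_offset is 2 * (sizeof(memif_ring_t) + 1024 * 16) bytes
 * (memif_desc_t is build-asserted to 16 bytes at the end of this
 * file), and with has_buffers set region_size grows by a further
 * 2 * 1024 * 2048 bytes, i.e. 4 MiB of packet buffers.
 */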
1004 
1005 static int
1006 memif_regions_init(struct rte_eth_dev *dev)
1007 {
1008 	struct pmd_internals *pmd = dev->data->dev_private;
1009 	int ret;
1010 
1011 	/*
1012 	 * Zero-copy exposes DPDK memory.
1013 	 * Each memseg list will be represented by a memif region.
1014 	 * Zero-copy region indexing: memseg list idx + 1,
1015 	 * as we already have region 0 reserved for descriptors.
1016 	 */
1017 	if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
1018 		/* create region idx 0 containing descriptors */
1019 		ret = memif_region_init_shm(dev, 0);
1020 		if (ret < 0)
1021 			return ret;
1022 		ret = rte_memseg_walk(memif_region_init_zc, (void *)dev->process_private);
1023 		if (ret < 0)
1024 			return ret;
1025 	} else {
1026 		/* create one memory region containing rings and buffers */
1027 		ret = memif_region_init_shm(dev, /* has buffers */ 1);
1028 		if (ret < 0)
1029 			return ret;
1030 	}
1031 
1032 	return 0;
1033 }
1034 
1035 static void
1036 memif_init_rings(struct rte_eth_dev *dev)
1037 {
1038 	struct pmd_internals *pmd = dev->data->dev_private;
1039 	struct pmd_process_private *proc_private = dev->process_private;
1040 	memif_ring_t *ring;
1041 	int i, j;
1042 	uint16_t slot;
1043 
1044 	for (i = 0; i < pmd->run.num_c2s_rings; i++) {
1045 		ring = memif_get_ring(pmd, proc_private, MEMIF_RING_C2S, i);
1046 		__atomic_store_n(&ring->head, 0, __ATOMIC_RELAXED);
1047 		__atomic_store_n(&ring->tail, 0, __ATOMIC_RELAXED);
1048 		ring->cookie = MEMIF_COOKIE;
1049 		ring->flags = 0;
1050 
1051 		if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY)
1052 			continue;
1053 
1054 		for (j = 0; j < (1 << pmd->run.log2_ring_size); j++) {
1055 			slot = i * (1 << pmd->run.log2_ring_size) + j;
1056 			ring->desc[j].region = 0;
1057 			ring->desc[j].offset =
1058 				proc_private->regions[0]->pkt_buffer_offset +
1059 				(uint32_t)(slot * pmd->run.pkt_buffer_size);
1060 			ring->desc[j].length = pmd->run.pkt_buffer_size;
1061 		}
1062 	}
1063 
1064 	for (i = 0; i < pmd->run.num_s2c_rings; i++) {
1065 		ring = memif_get_ring(pmd, proc_private, MEMIF_RING_S2C, i);
1066 		__atomic_store_n(&ring->head, 0, __ATOMIC_RELAXED);
1067 		__atomic_store_n(&ring->tail, 0, __ATOMIC_RELAXED);
1068 		ring->cookie = MEMIF_COOKIE;
1069 		ring->flags = 0;
1070 
1071 		if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY)
1072 			continue;
1073 
1074 		for (j = 0; j < (1 << pmd->run.log2_ring_size); j++) {
1075 			slot = (i + pmd->run.num_c2s_rings) *
1076 			    (1 << pmd->run.log2_ring_size) + j;
1077 			ring->desc[j].region = 0;
1078 			ring->desc[j].offset =
1079 				proc_private->regions[0]->pkt_buffer_offset +
1080 				(uint32_t)(slot * pmd->run.pkt_buffer_size);
1081 			ring->desc[j].length = pmd->run.pkt_buffer_size;
1082 		}
1083 	}
1084 }
1085 
1086 /* called only by client */
1087 static int
1088 memif_init_queues(struct rte_eth_dev *dev)
1089 {
1090 	struct pmd_internals *pmd = dev->data->dev_private;
1091 	struct memif_queue *mq;
1092 	int i;
1093 
1094 	for (i = 0; i < pmd->run.num_c2s_rings; i++) {
1095 		mq = dev->data->tx_queues[i];
1096 		mq->log2_ring_size = pmd->run.log2_ring_size;
1097 		/* queues located only in region 0 */
1098 		mq->region = 0;
1099 		mq->ring_offset = memif_get_ring_offset(dev, mq, MEMIF_RING_C2S, i);
1100 		mq->last_head = 0;
1101 		mq->last_tail = 0;
1102 		if (rte_intr_fd_set(mq->intr_handle, eventfd(0, EFD_NONBLOCK)))
1103 			return -rte_errno;
1104 
1105 		if (rte_intr_fd_get(mq->intr_handle) < 0) {
1106 			MIF_LOG(WARNING,
1107 				"Failed to create eventfd for tx queue %d: %s.", i,
1108 				strerror(errno));
1109 		}
1110 		mq->buffers = NULL;
1111 		if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
1112 			mq->buffers = rte_zmalloc("bufs", sizeof(struct rte_mbuf *) *
1113 						  (1 << mq->log2_ring_size), 0);
1114 			if (mq->buffers == NULL)
1115 				return -ENOMEM;
1116 		}
1117 	}
1118 
1119 	for (i = 0; i < pmd->run.num_s2c_rings; i++) {
1120 		mq = dev->data->rx_queues[i];
1121 		mq->log2_ring_size = pmd->run.log2_ring_size;
1122 		/* queues located only in region 0 */
1123 		mq->region = 0;
1124 		mq->ring_offset = memif_get_ring_offset(dev, mq, MEMIF_RING_S2C, i);
1125 		mq->last_head = 0;
1126 		mq->last_tail = 0;
1127 		if (rte_intr_fd_set(mq->intr_handle, eventfd(0, EFD_NONBLOCK)))
1128 			return -rte_errno;
1129 		if (rte_intr_fd_get(mq->intr_handle) < 0) {
1130 			MIF_LOG(WARNING,
1131 				"Failed to create eventfd for rx queue %d: %s.", i,
1132 				strerror(errno));
1133 		}
1134 		mq->buffers = NULL;
1135 		if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
1136 			mq->buffers = rte_zmalloc("bufs", sizeof(struct rte_mbuf *) *
1137 						  (1 << mq->log2_ring_size), 0);
1138 			if (mq->buffers == NULL)
1139 				return -ENOMEM;
1140 		}
1141 	}
1142 	return 0;
1143 }
1144 
1145 int
1146 memif_init_regions_and_queues(struct rte_eth_dev *dev)
1147 {
1148 	int ret;
1149 
1150 	ret = memif_regions_init(dev);
1151 	if (ret < 0)
1152 		return ret;
1153 
1154 	memif_init_rings(dev);
1155 
1156 	ret = memif_init_queues(dev);
1157 	if (ret < 0)
1158 		return ret;
1159 
1160 	return 0;
1161 }
1162 
1163 int
1164 memif_connect(struct rte_eth_dev *dev)
1165 {
1166 	struct pmd_internals *pmd = dev->data->dev_private;
1167 	struct pmd_process_private *proc_private = dev->process_private;
1168 	struct memif_region *mr;
1169 	struct memif_queue *mq;
1170 	memif_ring_t *ring;
1171 	int i;
1172 
1173 	for (i = 0; i < proc_private->regions_num; i++) {
1174 		mr = proc_private->regions[i];
1175 		if (mr != NULL) {
1176 			if (mr->addr == NULL) {
1177 				if (mr->fd < 0)
1178 					return -1;
1179 				mr->addr = mmap(NULL, mr->region_size,
1180 						PROT_READ | PROT_WRITE,
1181 						MAP_SHARED, mr->fd, 0);
1182 				if (mr->addr == MAP_FAILED) {
1183 					MIF_LOG(ERR, "mmap failed: %s",
1184 						strerror(errno));
1185 					return -1;
1186 				}
1187 			}
1188 			if (i > 0 && (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY)) {
1189 				/* close memseg file */
1190 				close(mr->fd);
1191 				mr->fd = -1;
1192 			}
1193 		}
1194 	}
1195 
1196 	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
1197 		for (i = 0; i < pmd->run.num_c2s_rings; i++) {
1198 			mq = (pmd->role == MEMIF_ROLE_CLIENT) ?
1199 			    dev->data->tx_queues[i] : dev->data->rx_queues[i];
1200 			ring = memif_get_ring_from_queue(proc_private, mq);
1201 			if (ring == NULL || ring->cookie != MEMIF_COOKIE) {
1202 				MIF_LOG(ERR, "Wrong ring");
1203 				return -1;
1204 			}
1205 			__atomic_store_n(&ring->head, 0, __ATOMIC_RELAXED);
1206 			__atomic_store_n(&ring->tail, 0, __ATOMIC_RELAXED);
1207 			mq->last_head = 0;
1208 			mq->last_tail = 0;
1209 			/* enable polling mode */
1210 			if (pmd->role == MEMIF_ROLE_SERVER)
1211 				ring->flags = MEMIF_RING_FLAG_MASK_INT;
1212 		}
1213 		for (i = 0; i < pmd->run.num_s2c_rings; i++) {
1214 			mq = (pmd->role == MEMIF_ROLE_CLIENT) ?
1215 			    dev->data->rx_queues[i] : dev->data->tx_queues[i];
1216 			ring = memif_get_ring_from_queue(proc_private, mq);
1217 			if (ring == NULL || ring->cookie != MEMIF_COOKIE) {
1218 				MIF_LOG(ERR, "Wrong ring");
1219 				return -1;
1220 			}
1221 			__atomic_store_n(&ring->head, 0, __ATOMIC_RELAXED);
1222 			__atomic_store_n(&ring->tail, 0, __ATOMIC_RELAXED);
1223 			mq->last_head = 0;
1224 			mq->last_tail = 0;
1225 			/* enable polling mode */
1226 			if (pmd->role == MEMIF_ROLE_CLIENT)
1227 				ring->flags = MEMIF_RING_FLAG_MASK_INT;
1228 		}
1229 
1230 		pmd->flags &= ~ETH_MEMIF_FLAG_CONNECTING;
1231 		pmd->flags |= ETH_MEMIF_FLAG_CONNECTED;
1232 		dev->data->dev_link.link_status = RTE_ETH_LINK_UP;
1233 	}
1234 	MIF_LOG(INFO, "Connected.");
1235 	return 0;
1236 }
1237 
1238 static int
1239 memif_dev_start(struct rte_eth_dev *dev)
1240 {
1241 	struct pmd_internals *pmd = dev->data->dev_private;
1242 	int ret = 0;
1243 
1244 	switch (pmd->role) {
1245 	case MEMIF_ROLE_CLIENT:
1246 		ret = memif_connect_client(dev);
1247 		break;
1248 	case MEMIF_ROLE_SERVER:
1249 		ret = memif_connect_server(dev);
1250 		break;
1251 	default:
1252 		MIF_LOG(ERR, "Unknown role: %d.", pmd->role);
1253 		ret = -1;
1254 		break;
1255 	}
1256 
1257 	return ret;
1258 }
1259 
1260 static int
1261 memif_dev_stop(struct rte_eth_dev *dev)
1262 {
1263 	memif_disconnect(dev);
1264 	return 0;
1265 }
1266 
1267 static int
1268 memif_dev_close(struct rte_eth_dev *dev)
1269 {
1270 	struct pmd_internals *pmd = dev->data->dev_private;
1271 	int i;
1272 
1273 	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
1274 		memif_msg_enq_disconnect(pmd->cc, "Device closed", 0);
1275 
1276 		for (i = 0; i < dev->data->nb_rx_queues; i++)
1277 			(*dev->dev_ops->rx_queue_release)(dev, i);
1278 		for (i = 0; i < dev->data->nb_tx_queues; i++)
1279 			(*dev->dev_ops->tx_queue_release)(dev, i);
1280 
1281 		memif_socket_remove_device(dev);
1282 	}
1283 
1284 	rte_free(dev->process_private);
1285 
1286 	return 0;
1287 }
1288 
1289 static int
1290 memif_dev_configure(struct rte_eth_dev *dev)
1291 {
1292 	struct pmd_internals *pmd = dev->data->dev_private;
1293 
1294 	/*
1295 	 * CLIENT - TXQ
1296 	 * SERVER - RXQ
1297 	 */
1298 	pmd->cfg.num_c2s_rings = (pmd->role == MEMIF_ROLE_CLIENT) ?
1299 				  dev->data->nb_tx_queues : dev->data->nb_rx_queues;
1300 
1301 	/*
1302 	 * CLIENT - RXQ
1303 	 * SERVER - TXQ
1304 	 */
1305 	pmd->cfg.num_s2c_rings = (pmd->role == MEMIF_ROLE_CLIENT) ?
1306 				  dev->data->nb_rx_queues : dev->data->nb_tx_queues;
1307 
1308 	return 0;
1309 }
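/*
 * Example of the mapping above: a CLIENT configured with 2 Tx queues
 * and 1 Rx queue gets num_c2s_rings == 2 and num_s2c_rings == 1; a
 * SERVER with the same ethdev configuration gets num_c2s_rings == 1
 * and num_s2c_rings == 2.
 */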
1310 
1311 static int
1312 memif_tx_queue_setup(struct rte_eth_dev *dev,
1313 		     uint16_t qid,
1314 		     uint16_t nb_tx_desc __rte_unused,
1315 		     unsigned int socket_id __rte_unused,
1316 		     const struct rte_eth_txconf *tx_conf __rte_unused)
1317 {
1318 	struct pmd_internals *pmd = dev->data->dev_private;
1319 	struct memif_queue *mq;
1320 
1321 	mq = rte_zmalloc("tx-queue", sizeof(struct memif_queue), 0);
1322 	if (mq == NULL) {
1323 		MIF_LOG(ERR, "Failed to allocate tx queue id: %u", qid);
1324 		return -ENOMEM;
1325 	}
1326 
1327 	/* Allocate interrupt instance */
1328 	mq->intr_handle = rte_intr_instance_alloc(RTE_INTR_INSTANCE_F_SHARED);
1329 	if (mq->intr_handle == NULL) {
1330 		MIF_LOG(ERR, "Failed to allocate intr handle");
1331 		return -ENOMEM;
1332 	}
1333 
1334 	mq->type =
1335 	    (pmd->role == MEMIF_ROLE_CLIENT) ? MEMIF_RING_C2S : MEMIF_RING_S2C;
1336 	mq->n_pkts = 0;
1337 	mq->n_bytes = 0;
1338 
1339 	if (rte_intr_fd_set(mq->intr_handle, -1))
1340 		return -rte_errno;
1341 
1342 	if (rte_intr_type_set(mq->intr_handle, RTE_INTR_HANDLE_EXT))
1343 		return -rte_errno;
1344 
1345 	mq->in_port = dev->data->port_id;
1346 	dev->data->tx_queues[qid] = mq;
1347 
1348 	return 0;
1349 }
1350 
1351 static int
1352 memif_rx_queue_setup(struct rte_eth_dev *dev,
1353 		     uint16_t qid,
1354 		     uint16_t nb_rx_desc __rte_unused,
1355 		     unsigned int socket_id __rte_unused,
1356 		     const struct rte_eth_rxconf *rx_conf __rte_unused,
1357 		     struct rte_mempool *mb_pool)
1358 {
1359 	struct pmd_internals *pmd = dev->data->dev_private;
1360 	struct memif_queue *mq;
1361 
1362 	mq = rte_zmalloc("rx-queue", sizeof(struct memif_queue), 0);
1363 	if (mq == NULL) {
1364 		MIF_LOG(ERR, "Failed to allocate rx queue id: %u", qid);
1365 		return -ENOMEM;
1366 	}
1367 
1368 	/* Allocate interrupt instance */
1369 	mq->intr_handle = rte_intr_instance_alloc(RTE_INTR_INSTANCE_F_SHARED);
1370 	if (mq->intr_handle == NULL) {
1371 		MIF_LOG(ERR, "Failed to allocate intr handle");
1372 		return -ENOMEM;
1373 	}
1374 
1375 	mq->type = (pmd->role == MEMIF_ROLE_CLIENT) ? MEMIF_RING_S2C : MEMIF_RING_C2S;
1376 	mq->n_pkts = 0;
1377 	mq->n_bytes = 0;
1378 
1379 	if (rte_intr_fd_set(mq->intr_handle, -1))
1380 		return -rte_errno;
1381 
1382 	if (rte_intr_type_set(mq->intr_handle, RTE_INTR_HANDLE_EXT))
1383 		return -rte_errno;
1384 
1385 	mq->mempool = mb_pool;
1386 	mq->in_port = dev->data->port_id;
1387 	dev->data->rx_queues[qid] = mq;
1388 
1389 	return 0;
1390 }
1391 
1392 static void
1393 memif_rx_queue_release(struct rte_eth_dev *dev, uint16_t qid)
1394 {
1395 	struct memif_queue *mq = dev->data->rx_queues[qid];
1396 
1397 	if (!mq)
1398 		return;
1399 
1400 	rte_intr_instance_free(mq->intr_handle);
1401 	rte_free(mq);
1402 }
1403 
1404 static void
1405 memif_tx_queue_release(struct rte_eth_dev *dev, uint16_t qid)
1406 {
1407 	struct memif_queue *mq = dev->data->tx_queues[qid];
1408 
1409 	if (!mq)
1410 		return;
1411 
1412 	rte_intr_instance_free(mq->intr_handle);
	rte_free(mq);
1413 }
1414 
1415 static int
1416 memif_link_update(struct rte_eth_dev *dev,
1417 		  int wait_to_complete __rte_unused)
1418 {
1419 	struct pmd_process_private *proc_private;
1420 
1421 	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
1422 		proc_private = dev->process_private;
1423 		if (dev->data->dev_link.link_status == RTE_ETH_LINK_UP &&
1424 				proc_private->regions_num == 0) {
1425 			memif_mp_request_regions(dev);
1426 		} else if (dev->data->dev_link.link_status == RTE_ETH_LINK_DOWN &&
1427 				proc_private->regions_num > 0) {
1428 			memif_free_regions(dev);
1429 		}
1430 	}
1431 	return 0;
1432 }
1433 
1434 static int
1435 memif_stats_get(struct rte_eth_dev *dev, struct rte_eth_stats *stats)
1436 {
1437 	struct pmd_internals *pmd = dev->data->dev_private;
1438 	struct memif_queue *mq;
1439 	int i;
1440 	uint8_t tmp, nq;
1441 
1442 	stats->ipackets = 0;
1443 	stats->ibytes = 0;
1444 	stats->opackets = 0;
1445 	stats->obytes = 0;
1446 
1447 	tmp = (pmd->role == MEMIF_ROLE_CLIENT) ? pmd->run.num_c2s_rings :
1448 	    pmd->run.num_s2c_rings;
1449 	nq = (tmp < RTE_ETHDEV_QUEUE_STAT_CNTRS) ? tmp :
1450 	    RTE_ETHDEV_QUEUE_STAT_CNTRS;
1451 
1452 	/* RX stats */
1453 	for (i = 0; i < nq; i++) {
1454 		mq = dev->data->rx_queues[i];
1455 		stats->q_ipackets[i] = mq->n_pkts;
1456 		stats->q_ibytes[i] = mq->n_bytes;
1457 		stats->ipackets += mq->n_pkts;
1458 		stats->ibytes += mq->n_bytes;
1459 	}
1460 
1461 	tmp = (pmd->role == MEMIF_ROLE_CLIENT) ? pmd->run.num_s2c_rings :
1462 	    pmd->run.num_c2s_rings;
1463 	nq = (tmp < RTE_ETHDEV_QUEUE_STAT_CNTRS) ? tmp :
1464 	    RTE_ETHDEV_QUEUE_STAT_CNTRS;
1465 
1466 	/* TX stats */
1467 	for (i = 0; i < nq; i++) {
1468 		mq = dev->data->tx_queues[i];
1469 		stats->q_opackets[i] = mq->n_pkts;
1470 		stats->q_obytes[i] = mq->n_bytes;
1471 		stats->opackets += mq->n_pkts;
1472 		stats->obytes += mq->n_bytes;
1473 	}
1474 	return 0;
1475 }
1476 
1477 static int
1478 memif_stats_reset(struct rte_eth_dev *dev)
1479 {
1480 	struct pmd_internals *pmd = dev->data->dev_private;
1481 	int i;
1482 	struct memif_queue *mq;
1483 
1484 	for (i = 0; i < pmd->run.num_c2s_rings; i++) {
1485 		mq = (pmd->role == MEMIF_ROLE_CLIENT) ? dev->data->tx_queues[i] :
1486 		    dev->data->rx_queues[i];
1487 		mq->n_pkts = 0;
1488 		mq->n_bytes = 0;
1489 	}
1490 	for (i = 0; i < pmd->run.num_s2c_rings; i++) {
1491 		mq = (pmd->role == MEMIF_ROLE_CLIENT) ? dev->data->rx_queues[i] :
1492 		    dev->data->tx_queues[i];
1493 		mq->n_pkts = 0;
1494 		mq->n_bytes = 0;
1495 	}
1496 
1497 	return 0;
1498 }
1499 
1500 static const struct eth_dev_ops ops = {
1501 	.dev_start = memif_dev_start,
1502 	.dev_stop = memif_dev_stop,
1503 	.dev_close = memif_dev_close,
1504 	.dev_infos_get = memif_dev_info,
1505 	.dev_configure = memif_dev_configure,
1506 	.tx_queue_setup = memif_tx_queue_setup,
1507 	.rx_queue_setup = memif_rx_queue_setup,
1508 	.rx_queue_release = memif_rx_queue_release,
1509 	.tx_queue_release = memif_tx_queue_release,
1510 	.link_update = memif_link_update,
1511 	.stats_get = memif_stats_get,
1512 	.stats_reset = memif_stats_reset,
1513 };
1514 
1515 static int
1516 memif_create(struct rte_vdev_device *vdev, enum memif_role_t role,
1517 	     memif_interface_id_t id, uint32_t flags,
1518 	     const char *socket_filename,
1519 	     memif_log2_ring_size_t log2_ring_size,
1520 	     uint16_t pkt_buffer_size, const char *secret,
1521 	     struct rte_ether_addr *ether_addr)
1522 {
1523 	int ret = 0;
1524 	struct rte_eth_dev *eth_dev;
1525 	struct rte_eth_dev_data *data;
1526 	struct pmd_internals *pmd;
1527 	struct pmd_process_private *process_private;
1528 	const unsigned int numa_node = vdev->device.numa_node;
1529 	const char *name = rte_vdev_device_name(vdev);
1530 
1531 	eth_dev = rte_eth_vdev_allocate(vdev, sizeof(*pmd));
1532 	if (eth_dev == NULL) {
1533 		MIF_LOG(ERR, "%s: Unable to allocate device struct.", name);
1534 		return -1;
1535 	}
1536 
1537 	process_private = (struct pmd_process_private *)
1538 		rte_zmalloc(name, sizeof(struct pmd_process_private),
1539 			    RTE_CACHE_LINE_SIZE);
1540 
1541 	if (process_private == NULL) {
1542 		MIF_LOG(ERR, "Failed to alloc memory for process private");
1543 		return -1;
1544 	}
1545 	eth_dev->process_private = process_private;
1546 
1547 	pmd = eth_dev->data->dev_private;
1548 	memset(pmd, 0, sizeof(*pmd));
1549 
1550 	pmd->id = id;
1551 	pmd->flags = flags;
1552 	pmd->flags |= ETH_MEMIF_FLAG_DISABLED;
1553 	pmd->role = role;
1554 	/* Zero-copy flag irrelevant to server. */
1555 	if (pmd->role == MEMIF_ROLE_SERVER)
1556 		pmd->flags &= ~ETH_MEMIF_FLAG_ZERO_COPY;
1557 
1558 	ret = memif_socket_init(eth_dev, socket_filename);
1559 	if (ret < 0)
1560 		return ret;
1561 
1562 	memset(pmd->secret, 0, sizeof(char) * ETH_MEMIF_SECRET_SIZE);
1563 	if (secret != NULL)
1564 		strlcpy(pmd->secret, secret, sizeof(pmd->secret));
1565 
1566 	pmd->cfg.log2_ring_size = log2_ring_size;
1567 	/* set in .dev_configure() */
1568 	pmd->cfg.num_c2s_rings = 0;
1569 	pmd->cfg.num_s2c_rings = 0;
1570 
1571 	pmd->cfg.pkt_buffer_size = pkt_buffer_size;
1572 	rte_spinlock_init(&pmd->cc_lock);
1573 
1574 	data = eth_dev->data;
1575 	data->dev_private = pmd;
1576 	data->numa_node = numa_node;
1577 	data->dev_link = pmd_link;
1578 	data->mac_addrs = ether_addr;
1579 	data->promiscuous = 1;
1580 	data->dev_flags |= RTE_ETH_DEV_AUTOFILL_QUEUE_XSTATS;
1581 
1582 	eth_dev->dev_ops = &ops;
1583 	eth_dev->device = &vdev->device;
1584 	if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
1585 		eth_dev->rx_pkt_burst = eth_memif_rx_zc;
1586 		eth_dev->tx_pkt_burst = eth_memif_tx_zc;
1587 	} else {
1588 		eth_dev->rx_pkt_burst = eth_memif_rx;
1589 		eth_dev->tx_pkt_burst = eth_memif_tx;
1590 	}
1591 
1592 	rte_eth_dev_probing_finish(eth_dev);
1593 
1594 	return 0;
1595 }
1596 
1597 static int
1598 memif_set_role(const char *key __rte_unused, const char *value,
1599 	       void *extra_args)
1600 {
1601 	enum memif_role_t *role = (enum memif_role_t *)extra_args;
1602 
1603 	if (strstr(value, "server") != NULL) {
1604 		*role = MEMIF_ROLE_SERVER;
1605 	} else if (strstr(value, "client") != NULL) {
1606 		*role = MEMIF_ROLE_CLIENT;
1607 	} else if (strstr(value, "master") != NULL) {
1608 		MIF_LOG(NOTICE, "Role argument \"master\" is deprecated, use \"server\"");
1609 		*role = MEMIF_ROLE_SERVER;
1610 	} else if (strstr(value, "slave") != NULL) {
1611 		MIF_LOG(NOTICE, "Role argument \"slave\" is deprecated, use \"client\"");
1612 		*role = MEMIF_ROLE_CLIENT;
1613 	} else {
1614 		MIF_LOG(ERR, "Unknown role: %s.", value);
1615 		return -EINVAL;
1616 	}
1617 	return 0;
1618 }
1619 
1620 static int
1621 memif_set_zc(const char *key __rte_unused, const char *value, void *extra_args)
1622 {
1623 	uint32_t *flags = (uint32_t *)extra_args;
1624 
1625 	if (strstr(value, "yes") != NULL) {
1626 		if (!rte_mcfg_get_single_file_segments()) {
1627 			MIF_LOG(ERR, "Zero-copy doesn't support multi-file segments.");
1628 			return -ENOTSUP;
1629 		}
1630 		*flags |= ETH_MEMIF_FLAG_ZERO_COPY;
1631 	} else if (strstr(value, "no") != NULL) {
1632 		*flags &= ~ETH_MEMIF_FLAG_ZERO_COPY;
1633 	} else {
1634 		MIF_LOG(ERR, "Failed to parse zero-copy param: %s.", value);
1635 		return -EINVAL;
1636 	}
1637 	return 0;
1638 }
1639 
1640 static int
1641 memif_set_id(const char *key __rte_unused, const char *value, void *extra_args)
1642 {
1643 	memif_interface_id_t *id = (memif_interface_id_t *)extra_args;
1644 
1645 	/* even if parsing fails, 0 is a valid id */
1646 	*id = strtoul(value, NULL, 10);
1647 	return 0;
1648 }
1649 
1650 static int
1651 memif_set_bs(const char *key __rte_unused, const char *value, void *extra_args)
1652 {
1653 	unsigned long tmp;
1654 	uint16_t *pkt_buffer_size = (uint16_t *)extra_args;
1655 
1656 	tmp = strtoul(value, NULL, 10);
1657 	if (tmp == 0 || tmp > 0xFFFF) {
1658 		MIF_LOG(ERR, "Invalid buffer size: %s.", value);
1659 		return -EINVAL;
1660 	}
1661 	*pkt_buffer_size = tmp;
1662 	return 0;
1663 }
1664 
1665 static int
1666 memif_set_rs(const char *key __rte_unused, const char *value, void *extra_args)
1667 {
1668 	unsigned long tmp;
1669 	memif_log2_ring_size_t *log2_ring_size =
1670 	    (memif_log2_ring_size_t *)extra_args;
1671 
1672 	tmp = strtoul(value, NULL, 10);
1673 	if (tmp == 0 || tmp > ETH_MEMIF_MAX_LOG2_RING_SIZE) {
1674 		MIF_LOG(ERR, "Invalid ring size: %s (max %u).",
1675 			value, ETH_MEMIF_MAX_LOG2_RING_SIZE);
1676 		return -EINVAL;
1677 	}
1678 	*log2_ring_size = tmp;
1679 	return 0;
1680 }
1681 
1682 /* check if directory exists and if we have permission to read/write */
1683 static int
1684 memif_check_socket_filename(const char *filename)
1685 {
1686 	char *dir = NULL, *tmp;
1687 	uint32_t idx;
1688 	int ret = 0;
1689 
1690 	if (strlen(filename) >= MEMIF_SOCKET_UN_SIZE) {
1691 		MIF_LOG(ERR, "Unix socket address too long (max 108).");
1692 		return -1;
1693 	}
1694 
1695 	tmp = strrchr(filename, '/');
1696 	if (tmp != NULL) {
1697 		idx = tmp - filename;
1698 		dir = rte_zmalloc("memif_tmp", sizeof(char) * (idx + 1), 0);
1699 		if (dir == NULL) {
1700 			MIF_LOG(ERR, "Failed to allocate memory.");
1701 			return -1;
1702 		}
1703 		strlcpy(dir, filename, sizeof(char) * (idx + 1));
1704 	}
1705 
1706 	if (dir == NULL || (faccessat(-1, dir, F_OK | R_OK |
1707 					W_OK, AT_EACCESS) < 0)) {
1708 		MIF_LOG(ERR, "Invalid socket directory.");
1709 		ret = -EINVAL;
1710 	}
1711 
1712 	rte_free(dir);
1713 
1714 	return ret;
1715 }
1716 
1717 static int
1718 memif_set_socket_filename(const char *key __rte_unused, const char *value,
1719 			  void *extra_args)
1720 {
1721 	const char **socket_filename = (const char **)extra_args;
1722 
1723 	*socket_filename = value;
1724 	return 0;
1725 }
1726 
1727 static int
1728 memif_set_is_socket_abstract(const char *key __rte_unused, const char *value, void *extra_args)
1729 {
1730 	uint32_t *flags = (uint32_t *)extra_args;
1731 
1732 	if (strstr(value, "yes") != NULL) {
1733 		*flags |= ETH_MEMIF_FLAG_SOCKET_ABSTRACT;
1734 	} else if (strstr(value, "no") != NULL) {
1735 		*flags &= ~ETH_MEMIF_FLAG_SOCKET_ABSTRACT;
1736 	} else {
1737 		MIF_LOG(ERR, "Failed to parse socket-abstract param: %s.", value);
1738 		return -EINVAL;
1739 	}
1740 	return 0;
1741 }
1742 
1743 static int
1744 memif_set_mac(const char *key __rte_unused, const char *value, void *extra_args)
1745 {
1746 	struct rte_ether_addr *ether_addr = (struct rte_ether_addr *)extra_args;
1747 
1748 	if (rte_ether_unformat_addr(value, ether_addr) < 0)
1749 		MIF_LOG(WARNING, "Failed to parse mac '%s'.", value);
1750 	return 0;
1751 }
1752 
1753 static int
1754 memif_set_secret(const char *key __rte_unused, const char *value, void *extra_args)
1755 {
1756 	const char **secret = (const char **)extra_args;
1757 
1758 	*secret = value;
1759 	return 0;
1760 }
1761 
1762 static int
1763 rte_pmd_memif_probe(struct rte_vdev_device *vdev)
1764 {
1765 	RTE_BUILD_BUG_ON(sizeof(memif_msg_t) != 128);
1766 	RTE_BUILD_BUG_ON(sizeof(memif_desc_t) != 16);
1767 	int ret = 0;
1768 	struct rte_kvargs *kvlist;
1769 	const char *name = rte_vdev_device_name(vdev);
1770 	enum memif_role_t role = MEMIF_ROLE_CLIENT;
1771 	memif_interface_id_t id = 0;
1772 	uint16_t pkt_buffer_size = ETH_MEMIF_DEFAULT_PKT_BUFFER_SIZE;
1773 	memif_log2_ring_size_t log2_ring_size = ETH_MEMIF_DEFAULT_RING_SIZE;
1774 	const char *socket_filename = ETH_MEMIF_DEFAULT_SOCKET_FILENAME;
1775 	uint32_t flags = 0;
1776 	const char *secret = NULL;
1777 	struct rte_ether_addr *ether_addr = rte_zmalloc("",
1778 		sizeof(struct rte_ether_addr), 0);
1779 	struct rte_eth_dev *eth_dev;
1780 
1781 	rte_eth_random_addr(ether_addr->addr_bytes);
1782 
1783 	MIF_LOG(INFO, "Initialize MEMIF: %s.", name);
1784 
1785 	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
1786 		eth_dev = rte_eth_dev_attach_secondary(name);
1787 		if (!eth_dev) {
1788 			MIF_LOG(ERR, "Failed to probe %s", name);
1789 			return -1;
1790 		}
1791 
1792 		eth_dev->dev_ops = &ops;
1793 		eth_dev->device = &vdev->device;
1794 		eth_dev->rx_pkt_burst = eth_memif_rx;
1795 		eth_dev->tx_pkt_burst = eth_memif_tx;
1796 
1797 		if (!rte_eal_primary_proc_alive(NULL)) {
1798 			MIF_LOG(ERR, "Primary process is missing");
1799 			return -1;
1800 		}
1801 
1802 		eth_dev->process_private = (struct pmd_process_private *)
1803 			rte_zmalloc(name,
1804 				sizeof(struct pmd_process_private),
1805 				RTE_CACHE_LINE_SIZE);
1806 		if (eth_dev->process_private == NULL) {
1807 			MIF_LOG(ERR,
1808 				"Failed to alloc memory for process private");
1809 			return -1;
1810 		}
1811 
1812 		rte_eth_dev_probing_finish(eth_dev);
1813 
1814 		return 0;
1815 	}
1816 
1817 	ret = rte_mp_action_register(MEMIF_MP_SEND_REGION, memif_mp_send_region);
1818 	/*
1819 	 * Primary process can continue probing, but secondary process won't
1820 	 * be able to get memory regions information
1821 	 */
1822 	if (ret < 0 && rte_errno != EEXIST)
1823 		MIF_LOG(WARNING, "Failed to register mp action callback: %s",
1824 			strerror(rte_errno));
1825 
1826 	/* use abstract address by default */
1827 	flags |= ETH_MEMIF_FLAG_SOCKET_ABSTRACT;
1828 
1829 	kvlist = rte_kvargs_parse(rte_vdev_device_args(vdev), valid_arguments);
1830 
1831 	/* parse parameters */
1832 	if (kvlist != NULL) {
1833 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_ROLE_ARG,
1834 					 &memif_set_role, &role);
1835 		if (ret < 0)
1836 			goto exit;
1837 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_ID_ARG,
1838 					 &memif_set_id, &id);
1839 		if (ret < 0)
1840 			goto exit;
1841 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_PKT_BUFFER_SIZE_ARG,
1842 					 &memif_set_bs, &pkt_buffer_size);
1843 		if (ret < 0)
1844 			goto exit;
1845 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_RING_SIZE_ARG,
1846 					 &memif_set_rs, &log2_ring_size);
1847 		if (ret < 0)
1848 			goto exit;
1849 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_SOCKET_ARG,
1850 					 &memif_set_socket_filename,
1851 					 (void *)(&socket_filename));
1852 		if (ret < 0)
1853 			goto exit;
1854 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_SOCKET_ABSTRACT_ARG,
1855 					 &memif_set_is_socket_abstract, &flags);
1856 		if (ret < 0)
1857 			goto exit;
1858 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_MAC_ARG,
1859 					 &memif_set_mac, ether_addr);
1860 		if (ret < 0)
1861 			goto exit;
1862 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_ZC_ARG,
1863 					 &memif_set_zc, &flags);
1864 		if (ret < 0)
1865 			goto exit;
1866 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_SECRET_ARG,
1867 					 &memif_set_secret, (void *)(&secret));
1868 		if (ret < 0)
1869 			goto exit;
1870 	}
1871 
1872 	if (!(flags & ETH_MEMIF_FLAG_SOCKET_ABSTRACT)) {
1873 		ret = memif_check_socket_filename(socket_filename);
1874 		if (ret < 0)
1875 			goto exit;
1876 	}
1877 
1878 	/* create interface */
1879 	ret = memif_create(vdev, role, id, flags, socket_filename,
1880 			   log2_ring_size, pkt_buffer_size, secret, ether_addr);
1881 
1882 exit:
1883 	rte_kvargs_free(kvlist);
1884 	return ret;
1885 }
1886 
1887 static int
1888 rte_pmd_memif_remove(struct rte_vdev_device *vdev)
1889 {
1890 	struct rte_eth_dev *eth_dev;
1891 
1892 	eth_dev = rte_eth_dev_allocated(rte_vdev_device_name(vdev));
1893 	if (eth_dev == NULL)
1894 		return 0;
1895 
1896 	return rte_eth_dev_close(eth_dev->data->port_id);
1897 }
1898 
1899 static struct rte_vdev_driver pmd_memif_drv = {
1900 	.probe = rte_pmd_memif_probe,
1901 	.remove = rte_pmd_memif_remove,
1902 };
1903 
1904 RTE_PMD_REGISTER_VDEV(net_memif, pmd_memif_drv);
1905 
1906 RTE_PMD_REGISTER_PARAM_STRING(net_memif,
1907 			      ETH_MEMIF_ID_ARG "=<int>"
1908 			      ETH_MEMIF_ROLE_ARG "=server|client"
1909 			      ETH_MEMIF_PKT_BUFFER_SIZE_ARG "=<int>"
1910 			      ETH_MEMIF_RING_SIZE_ARG "=<int>"
1911 			      ETH_MEMIF_SOCKET_ARG "=<string>"
1912 			      ETH_MEMIF_SOCKET_ABSTRACT_ARG "=yes|no"
1913 			      ETH_MEMIF_MAC_ARG "=xx:xx:xx:xx:xx:xx"
1914 			      ETH_MEMIF_ZC_ARG "=yes|no"
1915 			      ETH_MEMIF_SECRET_ARG "=<string>");
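/*
 * Example vdev strings built from the parameters above (illustrative;
 * names, id and socket path are arbitrary):
 *
 *   --vdev=net_memif0,role=server,id=0,socket=/run/memif.sock,socket-abstract=no
 *   --vdev=net_memif1,role=client,id=0,socket=/run/memif.sock,socket-abstract=no,zero-copy=yes
 */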
1916 
1917 RTE_LOG_REGISTER_DEFAULT(memif_logtype, NOTICE);
1918