xref: /dpdk/drivers/net/memif/rte_eth_memif.c (revision 7917b0d38e92e8b9ec5a870415b791420e10f11a)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright 2018-2019 Cisco Systems, Inc.  All rights reserved.
3  */
4 
5 #include <stdlib.h>
6 #include <fcntl.h>
7 #include <unistd.h>
8 #include <sys/types.h>
9 #include <sys/socket.h>
10 #include <sys/un.h>
11 #include <sys/ioctl.h>
12 #include <sys/mman.h>
13 #include <linux/if_ether.h>
14 #include <errno.h>
15 #include <sys/eventfd.h>
16 
17 #include <rte_version.h>
18 #include <rte_mbuf.h>
19 #include <rte_ether.h>
20 #include <ethdev_driver.h>
21 #include <ethdev_vdev.h>
22 #include <rte_malloc.h>
23 #include <rte_kvargs.h>
24 #include <bus_vdev_driver.h>
25 #include <rte_string_fns.h>
26 #include <rte_errno.h>
27 #include <rte_memory.h>
28 #include <rte_memzone.h>
29 #include <rte_eal_memconfig.h>
30 
31 #include "rte_eth_memif.h"
32 #include "memif_socket.h"
33 
34 #define ETH_MEMIF_ID_ARG		"id"
35 #define ETH_MEMIF_ROLE_ARG		"role"
36 #define ETH_MEMIF_PKT_BUFFER_SIZE_ARG	"bsize"
37 #define ETH_MEMIF_RING_SIZE_ARG		"rsize"
38 #define ETH_MEMIF_SOCKET_ARG		"socket"
39 #define ETH_MEMIF_SOCKET_ABSTRACT_ARG	"socket-abstract"
40 #define ETH_MEMIF_OWNER_UID_ARG		"owner-uid"
41 #define ETH_MEMIF_OWNER_GID_ARG		"owner-gid"
42 #define ETH_MEMIF_MAC_ARG		"mac"
43 #define ETH_MEMIF_ZC_ARG		"zero-copy"
44 #define ETH_MEMIF_SECRET_ARG		"secret"
45 
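/*
 * Illustrative devargs string using the keys above (values below are
 * placeholders; any key may be omitted to fall back to its built-in
 * default), e.g. on the EAL command line:
 *
 *   --vdev=net_memif0,role=server,id=0,bsize=2048,rsize=10,
 *          socket=/run/memif.sock,mac=02:00:00:00:00:01,zero-copy=no
 */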
46 static const char * const valid_arguments[] = {
47 	ETH_MEMIF_ID_ARG,
48 	ETH_MEMIF_ROLE_ARG,
49 	ETH_MEMIF_PKT_BUFFER_SIZE_ARG,
50 	ETH_MEMIF_RING_SIZE_ARG,
51 	ETH_MEMIF_SOCKET_ARG,
52 	ETH_MEMIF_SOCKET_ABSTRACT_ARG,
53 	ETH_MEMIF_OWNER_UID_ARG,
54 	ETH_MEMIF_OWNER_GID_ARG,
55 	ETH_MEMIF_MAC_ARG,
56 	ETH_MEMIF_ZC_ARG,
57 	ETH_MEMIF_SECRET_ARG,
58 	NULL
59 };
60 
61 static const struct rte_eth_link pmd_link = {
62 	.link_speed = RTE_ETH_SPEED_NUM_100G,
63 	.link_duplex = RTE_ETH_LINK_FULL_DUPLEX,
64 	.link_status = RTE_ETH_LINK_DOWN,
65 	.link_autoneg = RTE_ETH_LINK_AUTONEG
66 };
67 
68 #define MEMIF_MP_SEND_REGION		"memif_mp_send_region"
69 
70 
71 static int memif_region_init_zc(const struct rte_memseg_list *msl,
72 				const struct rte_memseg *ms, void *arg);
73 
74 const char *
75 memif_version(void)
76 {
77 	return ("memif-" RTE_STR(MEMIF_VERSION_MAJOR) "." RTE_STR(MEMIF_VERSION_MINOR));
78 }
79 
80 /* Message header to synchronize regions */
81 struct mp_region_msg {
82 	char port_name[RTE_DEV_NAME_MAX_LEN];
83 	memif_region_index_t idx;
84 	memif_region_size_t size;
85 };
86 
87 static int
88 memif_mp_send_region(const struct rte_mp_msg *msg, const void *peer)
89 {
90 	struct rte_eth_dev *dev;
91 	struct pmd_process_private *proc_private;
92 	const struct mp_region_msg *msg_param = (const struct mp_region_msg *)msg->param;
93 	struct rte_mp_msg reply;
94 	struct mp_region_msg *reply_param = (struct mp_region_msg *)reply.param;
95 
96 	/* Get requested port */
97 	dev = rte_eth_dev_get_by_name(msg_param->port_name);
98 	if (!dev) {
99 		MIF_LOG(ERR, "Failed to get port id for %s",
100 			msg_param->port_name);
101 		return -1;
102 	}
103 	proc_private = dev->process_private;
104 
105 	memset(&reply, 0, sizeof(reply));
106 	strlcpy(reply.name, msg->name, sizeof(reply.name));
107 	reply_param->idx = msg_param->idx;
108 	if (proc_private->regions[msg_param->idx] != NULL) {
109 		reply_param->size = proc_private->regions[msg_param->idx]->region_size;
110 		reply.fds[0] = proc_private->regions[msg_param->idx]->fd;
111 		reply.num_fds = 1;
112 	}
113 	reply.len_param = sizeof(*reply_param);
114 	if (rte_mp_reply(&reply, peer) < 0) {
115 		MIF_LOG(ERR, "Failed to reply to an add region request");
116 		return -1;
117 	}
118 
119 	return 0;
120 }
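
/*
 * Primary/secondary region sharing: the secondary process sends one
 * MEMIF_MP_SEND_REGION request per region index (memif_mp_request_regions()
 * below) and the primary replies with the region size and a file
 * descriptor (memif_mp_send_region() above).  The rte_mp action handler
 * for MEMIF_MP_SEND_REGION is registered elsewhere in the driver, outside
 * this excerpt.
 */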
121 
122 /*
123  * Request regions
124  * Called by the secondary process when the port's link status goes up.
125  */
126 static int
127 memif_mp_request_regions(struct rte_eth_dev *dev)
128 {
129 	int ret, i;
130 	struct timespec timeout = {.tv_sec = 5, .tv_nsec = 0};
131 	struct rte_mp_msg msg, *reply;
132 	struct rte_mp_reply replies;
133 	struct mp_region_msg *msg_param = (struct mp_region_msg *)msg.param;
134 	struct mp_region_msg *reply_param;
135 	struct memif_region *r;
136 	struct pmd_process_private *proc_private = dev->process_private;
137 	struct pmd_internals *pmd = dev->data->dev_private;
138 	/* in case of zero-copy client, only request region 0 */
139 	uint16_t max_region_num = (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) ?
140 				   1 : ETH_MEMIF_MAX_REGION_NUM;
141 
142 	MIF_LOG(DEBUG, "Requesting memory regions");
143 
144 	for (i = 0; i < max_region_num; i++) {
145 		/* Prepare the message */
146 		memset(&msg, 0, sizeof(msg));
147 		strlcpy(msg.name, MEMIF_MP_SEND_REGION, sizeof(msg.name));
148 		strlcpy(msg_param->port_name, dev->data->name,
149 			sizeof(msg_param->port_name));
150 		msg_param->idx = i;
151 		msg.len_param = sizeof(*msg_param);
152 
153 		/* Send message */
154 		ret = rte_mp_request_sync(&msg, &replies, &timeout);
155 		if (ret < 0 || replies.nb_received != 1) {
156 			MIF_LOG(ERR, "Failed to send mp msg: %d",
157 				rte_errno);
158 			return -1;
159 		}
160 
161 		reply = &replies.msgs[0];
162 		reply_param = (struct mp_region_msg *)reply->param;
163 
164 		if (reply_param->size > 0) {
165 			r = rte_zmalloc("region", sizeof(struct memif_region), 0);
166 			if (r == NULL) {
167 				MIF_LOG(ERR, "Failed to alloc memif region.");
168 				free(reply);
169 				return -ENOMEM;
170 			}
171 			r->region_size = reply_param->size;
172 			if (reply->num_fds < 1) {
173 				MIF_LOG(ERR, "Missing file descriptor.");
174 				free(reply);
175 				return -1;
176 			}
177 			r->fd = reply->fds[0];
178 			r->addr = NULL;
179 
180 			proc_private->regions[reply_param->idx] = r;
181 			proc_private->regions_num++;
182 		}
183 		free(reply);
184 	}
185 
186 	if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
187 		ret = rte_memseg_walk(memif_region_init_zc, (void *)proc_private);
188 		if (ret < 0)
189 			return ret;
190 	}
191 
192 	return memif_connect(dev);
193 }
194 
195 static int
196 memif_dev_info(struct rte_eth_dev *dev __rte_unused, struct rte_eth_dev_info *dev_info)
197 {
198 	dev_info->max_mac_addrs = 1;
199 	dev_info->max_rx_pktlen = RTE_ETHER_MAX_LEN;
200 	dev_info->max_rx_queues = ETH_MEMIF_MAX_NUM_Q_PAIRS;
201 	dev_info->max_tx_queues = ETH_MEMIF_MAX_NUM_Q_PAIRS;
202 	dev_info->min_rx_bufsize = 0;
203 	dev_info->tx_offload_capa = RTE_ETH_TX_OFFLOAD_MULTI_SEGS;
204 
205 	return 0;
206 }
207 
208 static memif_ring_t *
209 memif_get_ring(struct pmd_internals *pmd, struct pmd_process_private *proc_private,
210 	       memif_ring_type_t type, uint16_t ring_num)
211 {
212 	/* rings only in region 0 */
213 	void *p = proc_private->regions[0]->addr;
214 	int ring_size = sizeof(memif_ring_t) + sizeof(memif_desc_t) *
215 	    (1 << pmd->run.log2_ring_size);
216 
217 	p = (uint8_t *)p + (ring_num + type * pmd->run.num_c2s_rings) * ring_size;
218 
219 	return (memif_ring_t *)p;
220 }
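
/*
 * Region 0 layout assumed by memif_get_ring() and memif_get_ring_offset():
 * all rings are packed back to back, C2S rings first, then S2C rings,
 * each ring being a memif_ring_t header followed by its descriptors:
 *
 *   ring_size            = sizeof(memif_ring_t) +
 *                          sizeof(memif_desc_t) * (1 << log2_ring_size)
 *   ring_offset(type, n) = (n + type * num_c2s_rings) * ring_size
 *
 * In non-zero-copy mode the packet buffers follow the rings, starting at
 * pkt_buffer_offset (see memif_region_init_shm() below).
 */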
221 
222 static memif_region_offset_t
223 memif_get_ring_offset(struct rte_eth_dev *dev, struct memif_queue *mq,
224 		      memif_ring_type_t type, uint16_t num)
225 {
226 	struct pmd_internals *pmd = dev->data->dev_private;
227 	struct pmd_process_private *proc_private = dev->process_private;
228 
229 	return ((uint8_t *)memif_get_ring(pmd, proc_private, type, num) -
230 		(uint8_t *)proc_private->regions[mq->region]->addr);
231 }
232 
233 static memif_ring_t *
234 memif_get_ring_from_queue(struct pmd_process_private *proc_private,
235 			  struct memif_queue *mq)
236 {
237 	struct memif_region *r;
238 
239 	r = proc_private->regions[mq->region];
240 	if (r == NULL)
241 		return NULL;
242 
243 	return (memif_ring_t *)((uint8_t *)r->addr + mq->ring_offset);
244 }
245 
246 static void *
247 memif_get_buffer(struct pmd_process_private *proc_private, memif_desc_t *d)
248 {
249 	return ((uint8_t *)proc_private->regions[d->region]->addr + d->offset);
250 }
251 
252 /* Free mbufs received by server */
253 static void
254 memif_free_stored_mbufs(struct pmd_process_private *proc_private, struct memif_queue *mq)
255 {
256 	uint16_t cur_tail;
257 	uint16_t mask = (1 << mq->log2_ring_size) - 1;
258 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
259 
260 	/* FIXME: improve performance */
261 	/* The ring->tail acts as a guard variable between Tx and Rx
262 	 * threads, so the load-acquire here pairs with the store-release
263 	 * in eth_memif_rx() for C2S queues.
264 	 */
265 	cur_tail = rte_atomic_load_explicit(&ring->tail, rte_memory_order_acquire);
266 	while (mq->last_tail != cur_tail) {
267 		RTE_MBUF_PREFETCH_TO_FREE(mq->buffers[(mq->last_tail + 1) & mask]);
268 		rte_pktmbuf_free_seg(mq->buffers[mq->last_tail & mask]);
269 		mq->last_tail++;
270 	}
271 }
272 
273 static int
274 memif_pktmbuf_chain(struct rte_mbuf *head, struct rte_mbuf *cur_tail,
275 		    struct rte_mbuf *tail)
276 {
277 	/* Check for number-of-segments-overflow */
278 	if (unlikely(head->nb_segs + tail->nb_segs > RTE_MBUF_MAX_NB_SEGS))
279 		return -EOVERFLOW;
280 
281 	/* Chain 'tail' onto the old tail */
282 	cur_tail->next = tail;
283 
284 	/* accumulate number of segments and total length. */
285 	head->nb_segs = (uint16_t)(head->nb_segs + tail->nb_segs);
286 
287 	tail->pkt_len = tail->data_len;
288 	head->pkt_len += tail->pkt_len;
289 
290 	return 0;
291 }
292 
293 static uint16_t
294 eth_memif_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
295 {
296 	struct memif_queue *mq = queue;
297 	struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
298 	struct pmd_process_private *proc_private =
299 		rte_eth_devices[mq->in_port].process_private;
300 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
301 	uint16_t cur_slot, last_slot, n_slots, ring_size, mask, s0;
302 	uint16_t pkts, rx_pkts, n_rx_pkts = 0;
303 	uint16_t mbuf_size = rte_pktmbuf_data_room_size(mq->mempool) -
304 		RTE_PKTMBUF_HEADROOM;
305 	uint16_t src_len, src_off, dst_len, dst_off, cp_len;
306 	memif_ring_type_t type = mq->type;
307 	memif_desc_t *d0;
308 	struct rte_mbuf *mbuf, *mbuf_head, *mbuf_tail;
309 	uint64_t b;
310 	ssize_t size __rte_unused;
311 	uint16_t head;
312 	int ret;
313 	struct rte_eth_link link;
314 
315 	if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
316 		return 0;
317 	if (unlikely(ring == NULL)) {
318 		/* Secondary process will attempt to request regions. */
319 		ret = rte_eth_link_get(mq->in_port, &link);
320 		if (ret < 0)
321 			MIF_LOG(ERR, "Failed to get port %u link info: %s",
322 				mq->in_port, rte_strerror(-ret));
323 		return 0;
324 	}
325 
326 	/* consume interrupt */
327 	if (((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) &&
328 	    (rte_intr_fd_get(mq->intr_handle) >= 0))
329 		size = read(rte_intr_fd_get(mq->intr_handle), &b,
330 			    sizeof(b));
331 
332 	ring_size = 1 << mq->log2_ring_size;
333 	mask = ring_size - 1;
334 
335 	if (type == MEMIF_RING_C2S) {
336 		cur_slot = mq->last_head;
337 		last_slot = rte_atomic_load_explicit(&ring->head, rte_memory_order_acquire);
338 	} else {
339 		cur_slot = mq->last_tail;
340 		last_slot = rte_atomic_load_explicit(&ring->tail, rte_memory_order_acquire);
341 	}
342 
343 	if (cur_slot == last_slot)
344 		goto refill;
345 	n_slots = last_slot - cur_slot;
346 
347 	if (likely(mbuf_size >= pmd->cfg.pkt_buffer_size)) {
348 		struct rte_mbuf *mbufs[MAX_PKT_BURST];
349 next_bulk:
350 		ret = rte_pktmbuf_alloc_bulk(mq->mempool, mbufs, MAX_PKT_BURST);
351 		if (unlikely(ret < 0))
352 			goto no_free_bufs;
353 
354 		rx_pkts = 0;
355 		pkts = nb_pkts < MAX_PKT_BURST ? nb_pkts : MAX_PKT_BURST;
356 		while (n_slots && rx_pkts < pkts) {
357 			mbuf_head = mbufs[rx_pkts];
358 			mbuf = mbuf_head;
359 
360 next_slot1:
361 			mbuf->port = mq->in_port;
362 			s0 = cur_slot & mask;
363 			d0 = &ring->desc[s0];
364 
365 			cp_len = d0->length;
366 
367 			rte_pktmbuf_data_len(mbuf) = cp_len;
368 			rte_pktmbuf_pkt_len(mbuf) = cp_len;
369 			if (mbuf != mbuf_head)
370 				rte_pktmbuf_pkt_len(mbuf_head) += cp_len;
371 
372 			rte_memcpy(rte_pktmbuf_mtod(mbuf, void *),
373 				(uint8_t *)memif_get_buffer(proc_private, d0), cp_len);
374 
375 			cur_slot++;
376 			n_slots--;
377 
378 			if (d0->flags & MEMIF_DESC_FLAG_NEXT) {
379 				mbuf_tail = mbuf;
380 				mbuf = rte_pktmbuf_alloc(mq->mempool);
381 				if (unlikely(mbuf == NULL)) {
382 					rte_pktmbuf_free_bulk(mbufs + rx_pkts,
383 							MAX_PKT_BURST - rx_pkts);
384 					goto no_free_bufs;
385 				}
386 				ret = memif_pktmbuf_chain(mbuf_head, mbuf_tail, mbuf);
387 				if (unlikely(ret < 0)) {
388 					MIF_LOG(ERR, "number-of-segments-overflow");
389 					rte_pktmbuf_free(mbuf);
390 					rte_pktmbuf_free_bulk(mbufs + rx_pkts,
391 							MAX_PKT_BURST - rx_pkts);
392 					goto no_free_bufs;
393 				}
394 				goto next_slot1;
395 			}
396 
397 			mq->n_bytes += rte_pktmbuf_pkt_len(mbuf_head);
398 			*bufs++ = mbuf_head;
399 			rx_pkts++;
400 			n_rx_pkts++;
401 		}
402 
403 		if (rx_pkts < MAX_PKT_BURST) {
404 			rte_pktmbuf_free_bulk(mbufs + rx_pkts, MAX_PKT_BURST - rx_pkts);
405 		} else {
406 			nb_pkts -= rx_pkts;
407 			if (nb_pkts)
408 				goto next_bulk;
409 		}
410 	} else {
411 		while (n_slots && n_rx_pkts < nb_pkts) {
412 			mbuf_head = rte_pktmbuf_alloc(mq->mempool);
413 			if (unlikely(mbuf_head == NULL))
414 				goto no_free_bufs;
415 			mbuf = mbuf_head;
416 			mbuf->port = mq->in_port;
417 
418 next_slot2:
419 			s0 = cur_slot & mask;
420 			d0 = &ring->desc[s0];
421 
422 			src_len = d0->length;
423 			dst_off = 0;
424 			src_off = 0;
425 
426 			do {
427 				dst_len = mbuf_size - dst_off;
428 				if (dst_len == 0) {
429 					dst_off = 0;
430 					dst_len = mbuf_size;
431 
432 					/* store pointer to tail */
433 					mbuf_tail = mbuf;
434 					mbuf = rte_pktmbuf_alloc(mq->mempool);
435 					if (unlikely(mbuf == NULL))
436 						goto no_free_bufs;
437 					mbuf->port = mq->in_port;
438 					ret = memif_pktmbuf_chain(mbuf_head, mbuf_tail, mbuf);
439 					if (unlikely(ret < 0)) {
440 						MIF_LOG(ERR, "number-of-segments-overflow");
441 						rte_pktmbuf_free(mbuf);
442 						goto no_free_bufs;
443 					}
444 				}
445 				cp_len = RTE_MIN(dst_len, src_len);
446 
447 				rte_pktmbuf_data_len(mbuf) += cp_len;
448 				rte_pktmbuf_pkt_len(mbuf) = rte_pktmbuf_data_len(mbuf);
449 				if (mbuf != mbuf_head)
450 					rte_pktmbuf_pkt_len(mbuf_head) += cp_len;
451 
452 				rte_memcpy(rte_pktmbuf_mtod_offset(mbuf, void *,
453 								   dst_off),
454 					(uint8_t *)memif_get_buffer(proc_private, d0) +
455 					src_off, cp_len);
456 
457 				src_off += cp_len;
458 				dst_off += cp_len;
459 				src_len -= cp_len;
460 			} while (src_len);
461 
462 			cur_slot++;
463 			n_slots--;
464 
465 			if (d0->flags & MEMIF_DESC_FLAG_NEXT)
466 				goto next_slot2;
467 
468 			mq->n_bytes += rte_pktmbuf_pkt_len(mbuf_head);
469 			*bufs++ = mbuf_head;
470 			n_rx_pkts++;
471 		}
472 	}
473 
474 no_free_bufs:
475 	if (type == MEMIF_RING_C2S) {
476 		rte_atomic_store_explicit(&ring->tail, cur_slot, rte_memory_order_release);
477 		mq->last_head = cur_slot;
478 	} else {
479 		mq->last_tail = cur_slot;
480 	}
481 
482 refill:
483 	if (type == MEMIF_RING_S2C) {
484 		/* ring->head is updated by the receiver and this function
485 		 * is called in the context of the receiver thread, so its
486 		 * loads do not need to synchronize with its own stores.
487 		 */
488 		head = rte_atomic_load_explicit(&ring->head, rte_memory_order_relaxed);
489 		n_slots = ring_size - head + mq->last_tail;
490 
491 		while (n_slots--) {
492 			s0 = head++ & mask;
493 			d0 = &ring->desc[s0];
494 			d0->length = pmd->run.pkt_buffer_size;
495 		}
496 		rte_atomic_store_explicit(&ring->head, head, rte_memory_order_release);
497 	}
498 
499 	mq->n_pkts += n_rx_pkts;
500 	return n_rx_pkts;
501 }
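
/*
 * Copy-mode RX summary: when one mbuf can hold a full memif buffer
 * (mbuf_size >= pkt_buffer_size), descriptor payloads are copied into
 * mbufs taken from a bulk-allocated batch, with extra mbufs chained for
 * descriptors flagged MEMIF_DESC_FLAG_NEXT; otherwise each descriptor is
 * copied piecewise into a chain of mbufs.  For C2S rings the consumed
 * slots are handed back to the producer with a store-release on
 * ring->tail; for S2C rings the refill loop re-arms the descriptors with
 * the default buffer length and republishes ring->head.
 */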
502 
503 static uint16_t
504 eth_memif_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
505 {
506 	struct memif_queue *mq = queue;
507 	struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
508 	struct pmd_process_private *proc_private =
509 		rte_eth_devices[mq->in_port].process_private;
510 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
511 	uint16_t cur_slot, last_slot, n_slots, ring_size, mask, s0, head;
512 	uint16_t n_rx_pkts = 0;
513 	memif_desc_t *d0;
514 	struct rte_mbuf *mbuf, *mbuf_tail;
515 	struct rte_mbuf *mbuf_head = NULL;
516 	int ret;
517 	struct rte_eth_link link;
518 
519 	if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
520 		return 0;
521 	if (unlikely(ring == NULL)) {
522 		/* Secondary process will attempt to request regions. */
523 		rte_eth_link_get(mq->in_port, &link);
524 		return 0;
525 	}
526 
527 	/* consume interrupt */
528 	if ((rte_intr_fd_get(mq->intr_handle) >= 0) &&
529 	    ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0)) {
530 		uint64_t b;
531 		ssize_t size __rte_unused;
532 		size = read(rte_intr_fd_get(mq->intr_handle), &b,
533 			    sizeof(b));
534 	}
535 
536 	ring_size = 1 << mq->log2_ring_size;
537 	mask = ring_size - 1;
538 
539 	cur_slot = mq->last_tail;
540 	/* The ring->tail acts as a guard variable between Tx and Rx
541 	 * threads, so the load-acquire here pairs with a store-release
542 	 * to synchronize it between threads.
543 	 */
544 	last_slot = rte_atomic_load_explicit(&ring->tail, rte_memory_order_acquire);
545 	if (cur_slot == last_slot)
546 		goto refill;
547 	n_slots = last_slot - cur_slot;
548 
549 	while (n_slots && n_rx_pkts < nb_pkts) {
550 		s0 = cur_slot & mask;
551 
552 		d0 = &ring->desc[s0];
553 		mbuf_head = mq->buffers[s0];
554 		mbuf = mbuf_head;
555 
556 next_slot:
557 		/* prefetch next descriptor */
558 		if (n_rx_pkts + 1 < nb_pkts)
559 			rte_prefetch0(&ring->desc[(cur_slot + 1) & mask]);
560 
561 		mbuf->port = mq->in_port;
562 		rte_pktmbuf_data_len(mbuf) = d0->length;
563 		rte_pktmbuf_pkt_len(mbuf) = rte_pktmbuf_data_len(mbuf);
564 
565 		mq->n_bytes += rte_pktmbuf_data_len(mbuf);
566 
567 		cur_slot++;
568 		n_slots--;
569 		if (d0->flags & MEMIF_DESC_FLAG_NEXT) {
570 			s0 = cur_slot & mask;
571 			d0 = &ring->desc[s0];
572 			mbuf_tail = mbuf;
573 			mbuf = mq->buffers[s0];
574 			ret = memif_pktmbuf_chain(mbuf_head, mbuf_tail, mbuf);
575 			if (unlikely(ret < 0)) {
576 				MIF_LOG(ERR, "number-of-segments-overflow");
577 				goto refill;
578 			}
579 			goto next_slot;
580 		}
581 
582 		*bufs++ = mbuf_head;
583 		n_rx_pkts++;
584 	}
585 
586 	mq->last_tail = cur_slot;
587 
588 /* Supply server with new buffers */
589 refill:
590 	/* ring->head is updated by the receiver and this function
591 	 * is called in the context of the receiver thread, so its
592 	 * loads do not need to synchronize with its own stores.
593 	 */
594 	head = rte_atomic_load_explicit(&ring->head, rte_memory_order_relaxed);
595 	n_slots = ring_size - head + mq->last_tail;
596 
597 	if (n_slots < 32)
598 		goto no_free_mbufs;
599 
600 	ret = rte_pktmbuf_alloc_bulk(mq->mempool, &mq->buffers[head & mask], n_slots);
601 	if (unlikely(ret < 0))
602 		goto no_free_mbufs;
603 	if (unlikely(n_slots > ring_size - (head & mask))) {
604 		rte_memcpy(mq->buffers, &mq->buffers[ring_size],
605 			(n_slots + (head & mask) - ring_size) * sizeof(struct rte_mbuf *));
606 	}
607 
608 	while (n_slots--) {
609 		s0 = head++ & mask;
610 		if (n_slots > 0)
611 			rte_prefetch0(mq->buffers[head & mask]);
612 		d0 = &ring->desc[s0];
613 		/* mbuf backing this slot (bulk-allocated just above) */
614 		mbuf = mq->buffers[s0];
615 		/* populate descriptor */
616 		d0->length = rte_pktmbuf_data_room_size(mq->mempool) -
617 				RTE_PKTMBUF_HEADROOM;
618 		d0->region = 1;
619 		d0->offset = rte_pktmbuf_mtod(mbuf, uint8_t *) -
620 			(uint8_t *)proc_private->regions[d0->region]->addr;
621 	}
622 no_free_mbufs:
623 	/* The ring->head acts as a guard variable between Tx and Rx
624 	 * threads, so the store-release here pairs with the load-acquire
625 	 * in eth_memif_tx().
626 	 */
627 	rte_atomic_store_explicit(&ring->head, head, rte_memory_order_release);
628 
629 	mq->n_pkts += n_rx_pkts;
630 
631 	return n_rx_pkts;
632 }
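
/*
 * Zero-copy RX summary: the mbufs returned to the application are the
 * very mbufs whose data rooms were advertised to the peer during an
 * earlier refill (tracked in mq->buffers), so no packet data is copied
 * here.  The refill step bulk-allocates fresh mbufs, points each
 * descriptor at the mbuf data room inside region 1 (the region index is
 * currently hard-coded) and publishes the new head with a store-release.
 * Refill is skipped while fewer than 32 slots are free.
 */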
633 
634 static uint16_t
635 eth_memif_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
636 {
637 	struct memif_queue *mq = queue;
638 	struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
639 	struct pmd_process_private *proc_private =
640 		rte_eth_devices[mq->in_port].process_private;
641 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
642 	uint16_t slot, saved_slot, n_free, ring_size, mask, n_tx_pkts = 0;
643 	uint16_t src_len, src_off, dst_len, dst_off, cp_len, nb_segs;
644 	memif_ring_type_t type = mq->type;
645 	memif_desc_t *d0;
646 	struct rte_mbuf *mbuf;
647 	struct rte_mbuf *mbuf_head;
648 	uint64_t a;
649 	ssize_t size;
650 	struct rte_eth_link link;
651 
652 	if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
653 		return 0;
654 	if (unlikely(ring == NULL)) {
655 		int ret;
656 
657 		/* Secondary process will attempt to request regions. */
658 		ret = rte_eth_link_get(mq->in_port, &link);
659 		if (ret < 0)
660 			MIF_LOG(ERR, "Failed to get port %u link info: %s",
661 				mq->in_port, rte_strerror(-ret));
662 		return 0;
663 	}
664 
665 	ring_size = 1 << mq->log2_ring_size;
666 	mask = ring_size - 1;
667 
668 	if (type == MEMIF_RING_C2S) {
669 		/* For C2S queues ring->head is updated by the sender and
670 		 * this function is called in the context of the sending thread.
671 		 * The loads in the sender do not need to synchronize with
672 		 * its own stores. Hence, the following load can be a
673 		 * relaxed load.
674 		 */
675 		slot = rte_atomic_load_explicit(&ring->head, rte_memory_order_relaxed);
676 		n_free = ring_size - slot +
677 				rte_atomic_load_explicit(&ring->tail, rte_memory_order_acquire);
678 	} else {
679 		/* For S2C queues ring->tail is updated by the sender and
680 		 * this function is called in the context of the sending thread.
681 		 * The loads in the sender do not need to synchronize with
682 		 * its own stores. Hence, the following load can be a
683 		 * relaxed load.
684 		 */
685 		slot = rte_atomic_load_explicit(&ring->tail, rte_memory_order_relaxed);
686 		n_free = rte_atomic_load_explicit(&ring->head, rte_memory_order_acquire) - slot;
687 	}
688 
689 	uint16_t i;
690 	struct rte_mbuf **buf_tmp = bufs;
691 	mbuf_head = *buf_tmp++;
692 	struct rte_mempool *mp = mbuf_head->pool;
693 
694 	for (i = 1; i < nb_pkts; i++) {
695 		mbuf_head = *buf_tmp++;
696 		if (mbuf_head->pool != mp)
697 			break;
698 	}
699 
700 	uint16_t mbuf_size = rte_pktmbuf_data_room_size(mp) - RTE_PKTMBUF_HEADROOM;
701 	if (i == nb_pkts && pmd->cfg.pkt_buffer_size >= mbuf_size) {
702 		buf_tmp = bufs;
703 		while (n_tx_pkts < nb_pkts && n_free) {
704 			mbuf_head = *bufs++;
705 			nb_segs = mbuf_head->nb_segs;
706 			mbuf = mbuf_head;
707 
708 			saved_slot = slot;
709 
710 next_in_chain1:
711 			d0 = &ring->desc[slot & mask];
712 			cp_len = rte_pktmbuf_data_len(mbuf);
713 
714 			rte_memcpy((uint8_t *)memif_get_buffer(proc_private, d0),
715 				rte_pktmbuf_mtod(mbuf, void *), cp_len);
716 
717 			d0->length = cp_len;
718 			mq->n_bytes += cp_len;
719 			slot++;
720 			n_free--;
721 
722 			if (--nb_segs > 0) {
723 				if (n_free) {
724 					d0->flags |= MEMIF_DESC_FLAG_NEXT;
725 					mbuf = mbuf->next;
726 					goto next_in_chain1;
727 				} else {
728 					slot = saved_slot;
729 					goto free_mbufs;
730 				}
731 			}
732 
733 			n_tx_pkts++;
734 		}
735 free_mbufs:
736 		rte_pktmbuf_free_bulk(buf_tmp, n_tx_pkts);
737 	} else {
738 		while (n_tx_pkts < nb_pkts && n_free) {
739 			mbuf_head = *bufs++;
740 			nb_segs = mbuf_head->nb_segs;
741 			mbuf = mbuf_head;
742 
743 			saved_slot = slot;
744 			d0 = &ring->desc[slot & mask];
745 			dst_off = 0;
746 			dst_len = (type == MEMIF_RING_C2S) ?
747 				pmd->run.pkt_buffer_size : d0->length;
748 
749 next_in_chain2:
750 			src_off = 0;
751 			src_len = rte_pktmbuf_data_len(mbuf);
752 
753 			while (src_len) {
754 				if (dst_len == 0) {
755 					if (n_free) {
756 						slot++;
757 						n_free--;
758 						d0->flags |= MEMIF_DESC_FLAG_NEXT;
759 						d0 = &ring->desc[slot & mask];
760 						dst_off = 0;
761 						dst_len = (type == MEMIF_RING_C2S) ?
762 						    pmd->run.pkt_buffer_size : d0->length;
763 						d0->flags = 0;
764 					} else {
765 						slot = saved_slot;
766 						goto no_free_slots;
767 					}
768 				}
769 				cp_len = RTE_MIN(dst_len, src_len);
770 
771 				rte_memcpy((uint8_t *)memif_get_buffer(proc_private,
772 								       d0) + dst_off,
773 					rte_pktmbuf_mtod_offset(mbuf, void *, src_off),
774 					cp_len);
775 
776 				mq->n_bytes += cp_len;
777 				src_off += cp_len;
778 				dst_off += cp_len;
779 				src_len -= cp_len;
780 				dst_len -= cp_len;
781 
782 				d0->length = dst_off;
783 			}
784 
785 			if (--nb_segs > 0) {
786 				mbuf = mbuf->next;
787 				goto next_in_chain2;
788 			}
789 
790 			n_tx_pkts++;
791 			slot++;
792 			n_free--;
793 			rte_pktmbuf_free(mbuf_head);
794 		}
795 	}
796 
797 no_free_slots:
798 	if (type == MEMIF_RING_C2S)
799 		rte_atomic_store_explicit(&ring->head, slot, rte_memory_order_release);
800 	else
801 		rte_atomic_store_explicit(&ring->tail, slot, rte_memory_order_release);
802 
803 	if (((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) &&
804 	    (rte_intr_fd_get(mq->intr_handle) >= 0)) {
805 		a = 1;
806 		size = write(rte_intr_fd_get(mq->intr_handle), &a,
807 			     sizeof(a));
808 		if (unlikely(size < 0)) {
809 			MIF_LOG(WARNING,
810 				"Failed to send interrupt. %s", strerror(errno));
811 		}
812 	}
813 
814 	mq->n_pkts += n_tx_pkts;
815 	return n_tx_pkts;
816 }
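
/*
 * Copy-mode TX summary: a fast path is taken when every mbuf in the burst
 * comes from the same mempool and the memif buffer is at least as large
 * as the mbuf data room; each mbuf segment then fills exactly one
 * descriptor.  Otherwise packets are copied piecewise, chaining
 * descriptors with MEMIF_DESC_FLAG_NEXT as needed.  The new producer
 * index is published with a store-release (head for C2S, tail for S2C)
 * and, unless the peer masked interrupts, an eventfd write signals it.
 */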
817 
818 static int
819 memif_tx_one_zc(struct pmd_process_private *proc_private, struct memif_queue *mq,
820 		memif_ring_t *ring, struct rte_mbuf *mbuf, const uint16_t mask,
821 		uint16_t slot, uint16_t n_free)
822 {
823 	memif_desc_t *d0;
824 	uint16_t nb_segs = mbuf->nb_segs;
825 	int used_slots = 1;
826 
827 next_in_chain:
828 	/* store pointer to mbuf to free it later */
829 	mq->buffers[slot & mask] = mbuf;
830 	/* populate descriptor */
831 	d0 = &ring->desc[slot & mask];
832 	d0->length = rte_pktmbuf_data_len(mbuf);
833 	mq->n_bytes += rte_pktmbuf_data_len(mbuf);
834 	/* FIXME: get region index */
835 	d0->region = 1;
836 	d0->offset = rte_pktmbuf_mtod(mbuf, uint8_t *) -
837 		(uint8_t *)proc_private->regions[d0->region]->addr;
838 	d0->flags = 0;
839 
840 	/* check if buffer is chained */
841 	if (--nb_segs > 0) {
842 		if (n_free < 2)
843 			return 0;
844 		/* mark buffer as chained */
845 		d0->flags |= MEMIF_DESC_FLAG_NEXT;
846 		/* advance mbuf */
847 		mbuf = mbuf->next;
848 		/* update counters */
849 		used_slots++;
850 		slot++;
851 		n_free--;
852 		goto next_in_chain;
853 	}
854 	return used_slots;
855 }
856 
857 static uint16_t
858 eth_memif_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
859 {
860 	struct memif_queue *mq = queue;
861 	struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
862 	struct pmd_process_private *proc_private =
863 		rte_eth_devices[mq->in_port].process_private;
864 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
865 	uint16_t slot, n_free, ring_size, mask, n_tx_pkts = 0;
866 	struct rte_eth_link link;
867 
868 	if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
869 		return 0;
870 	if (unlikely(ring == NULL)) {
871 		/* Secondary process will attempt to request regions. */
872 		rte_eth_link_get(mq->in_port, &link);
873 		return 0;
874 	}
875 
876 	ring_size = 1 << mq->log2_ring_size;
877 	mask = ring_size - 1;
878 
879 	/* free mbufs received by server */
880 	memif_free_stored_mbufs(proc_private, mq);
881 
882 	/* ring type always MEMIF_RING_C2S */
883 	/* For C2S queues ring->head is updated by the sender and
884 	 * this function is called in the context of the sending thread.
885 	 * The loads in the sender do not need to synchronize with
886 	 * its own stores. Hence, the following load can be a
887 	 * relaxed load.
888 	 */
889 	slot = rte_atomic_load_explicit(&ring->head, rte_memory_order_relaxed);
890 	n_free = ring_size - slot + mq->last_tail;
891 
892 	int used_slots;
893 
894 	while (n_free && (n_tx_pkts < nb_pkts)) {
895 		while ((n_free > 4) && ((nb_pkts - n_tx_pkts) > 4)) {
896 			if ((nb_pkts - n_tx_pkts) > 8) {
897 				rte_prefetch0(*bufs + 4);
898 				rte_prefetch0(*bufs + 5);
899 				rte_prefetch0(*bufs + 6);
900 				rte_prefetch0(*bufs + 7);
901 			}
902 			used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
903 				mask, slot, n_free);
904 			if (unlikely(used_slots < 1))
905 				goto no_free_slots;
906 			n_tx_pkts++;
907 			slot += used_slots;
908 			n_free -= used_slots;
909 
910 			used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
911 				mask, slot, n_free);
912 			if (unlikely(used_slots < 1))
913 				goto no_free_slots;
914 			n_tx_pkts++;
915 			slot += used_slots;
916 			n_free -= used_slots;
917 
918 			used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
919 				mask, slot, n_free);
920 			if (unlikely(used_slots < 1))
921 				goto no_free_slots;
922 			n_tx_pkts++;
923 			slot += used_slots;
924 			n_free -= used_slots;
925 
926 			used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
927 				mask, slot, n_free);
928 			if (unlikely(used_slots < 1))
929 				goto no_free_slots;
930 			n_tx_pkts++;
931 			slot += used_slots;
932 			n_free -= used_slots;
933 		}
934 		used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
935 			mask, slot, n_free);
936 		if (unlikely(used_slots < 1))
937 			goto no_free_slots;
938 		n_tx_pkts++;
939 		slot += used_slots;
940 		n_free -= used_slots;
941 	}
942 
943 no_free_slots:
944 	/* ring type always MEMIF_RING_C2S */
945 	/* The ring->head acts as a guard variable between Tx and Rx
946 	 * threads, so the store-release here pairs with the load-acquire
947 	 * in eth_memif_rx() for C2S rings.
948 	 */
949 	rte_atomic_store_explicit(&ring->head, slot, rte_memory_order_release);
950 
951 	/* Send interrupt, if enabled. */
952 	if ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) {
953 		uint64_t a = 1;
954 		if (rte_intr_fd_get(mq->intr_handle) < 0)
955 			return -1;
956 
957 		ssize_t size = write(rte_intr_fd_get(mq->intr_handle),
958 				     &a, sizeof(a));
959 		if (unlikely(size < 0)) {
960 			MIF_LOG(WARNING,
961 				"Failed to send interrupt. %s", strerror(errno));
962 		}
963 	}
964 
965 	/* increment queue counters */
966 	mq->n_pkts += n_tx_pkts;
967 
968 	return n_tx_pkts;
969 }
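
/*
 * Zero-copy TX summary: memif_tx_one_zc() stores the mbuf pointer in
 * mq->buffers and points the descriptor straight at the mbuf data room,
 * so no packet data is copied; the mbuf is freed later by
 * memif_free_stored_mbufs() once the peer returns the slot through
 * ring->tail.  The burst loop is manually unrolled four packets at a
 * time, with prefetching, to hide cache misses.
 */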
970 
971 void
972 memif_free_regions(struct rte_eth_dev *dev)
973 {
974 	struct pmd_process_private *proc_private = dev->process_private;
975 	struct pmd_internals *pmd = dev->data->dev_private;
976 	int i;
977 	struct memif_region *r;
978 
979 	/* regions are allocated contiguously, so it's
980 	 * enough to loop until 'proc_private->regions_num'
981 	 */
982 	for (i = 0; i < proc_private->regions_num; i++) {
983 		r = proc_private->regions[i];
984 		if (r != NULL) {
985 			/* Memseg-backed zero-copy region: don't munmap, just close the fd */
986 			if (i > 0 && (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY)) {
987 				r->addr = NULL;
988 				if (r->fd > 0)
989 					close(r->fd);
990 			}
991 			if (r->addr != NULL) {
992 				munmap(r->addr, r->region_size);
993 				if (r->fd > 0) {
994 					close(r->fd);
995 					r->fd = -1;
996 				}
997 			}
998 			rte_free(r);
999 			proc_private->regions[i] = NULL;
1000 		}
1001 	}
1002 	proc_private->regions_num = 0;
1003 }
1004 
1005 static int
1006 memif_region_init_zc(const struct rte_memseg_list *msl, const struct rte_memseg *ms,
1007 		     void *arg)
1008 {
1009 	struct pmd_process_private *proc_private = (struct pmd_process_private *)arg;
1010 	struct memif_region *r;
1011 
1012 	if (proc_private->regions_num < 1) {
1013 		MIF_LOG(ERR, "Missing descriptor region");
1014 		return -1;
1015 	}
1016 
1017 	r = proc_private->regions[proc_private->regions_num - 1];
1018 
1019 	if (r->addr != msl->base_va)
1020 		r = proc_private->regions[++proc_private->regions_num - 1];
1021 
1022 	if (r == NULL) {
1023 		r = rte_zmalloc("region", sizeof(struct memif_region), 0);
1024 		if (r == NULL) {
1025 			MIF_LOG(ERR, "Failed to alloc memif region.");
1026 			return -ENOMEM;
1027 		}
1028 
1029 		r->addr = msl->base_va;
1030 		r->region_size = ms->len;
1031 		r->fd = rte_memseg_get_fd(ms);
1032 		if (r->fd < 0)
1033 			return -1;
1034 		r->pkt_buffer_offset = 0;
1035 
1036 		proc_private->regions[proc_private->regions_num - 1] = r;
1037 	} else {
1038 		r->region_size += ms->len;
1039 	}
1040 
1041 	return 0;
1042 }
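
/*
 * memif_region_init_zc() is driven by rte_memseg_walk(): memsegs that
 * belong to the same memseg list are accumulated into one region, while
 * a memseg list with a new base VA opens the next region slot after
 * region 0 (which holds only rings and descriptors in zero-copy mode).
 * It runs in the primary during memif_regions_init() and in the
 * secondary from memif_mp_request_regions().
 */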
1043 
1044 static int
1045 memif_region_init_shm(struct rte_eth_dev *dev, uint8_t has_buffers)
1046 {
1047 	struct pmd_internals *pmd = dev->data->dev_private;
1048 	struct pmd_process_private *proc_private = dev->process_private;
1049 	char shm_name[ETH_MEMIF_SHM_NAME_SIZE];
1050 	int ret = 0;
1051 	struct memif_region *r;
1052 
1053 	if (proc_private->regions_num >= ETH_MEMIF_MAX_REGION_NUM) {
1054 		MIF_LOG(ERR, "Too many regions.");
1055 		return -1;
1056 	}
1057 
1058 	r = rte_zmalloc("region", sizeof(struct memif_region), 0);
1059 	if (r == NULL) {
1060 		MIF_LOG(ERR, "Failed to alloc memif region.");
1061 		return -ENOMEM;
1062 	}
1063 
1064 	/* calculate buffer offset */
1065 	r->pkt_buffer_offset = (pmd->run.num_c2s_rings + pmd->run.num_s2c_rings) *
1066 	    (sizeof(memif_ring_t) + sizeof(memif_desc_t) *
1067 	    (1 << pmd->run.log2_ring_size));
1068 
1069 	r->region_size = r->pkt_buffer_offset;
1070 	/* if region has buffers, add buffers size to region_size */
1071 	if (has_buffers == 1)
1072 		r->region_size += (uint32_t)(pmd->run.pkt_buffer_size *
1073 			(1 << pmd->run.log2_ring_size) *
1074 			(pmd->run.num_c2s_rings +
1075 			 pmd->run.num_s2c_rings));
1076 
1077 	memset(shm_name, 0, sizeof(char) * ETH_MEMIF_SHM_NAME_SIZE);
1078 	snprintf(shm_name, ETH_MEMIF_SHM_NAME_SIZE, "memif_region_%d",
1079 		 proc_private->regions_num);
1080 
1081 	r->fd = memfd_create(shm_name, MFD_ALLOW_SEALING);
1082 	if (r->fd < 0) {
1083 		MIF_LOG(ERR, "Failed to create shm file: %s.", strerror(errno));
1084 		ret = -1;
1085 		goto error;
1086 	}
1087 
1088 	ret = fcntl(r->fd, F_ADD_SEALS, F_SEAL_SHRINK);
1089 	if (ret < 0) {
1090 		MIF_LOG(ERR, "Failed to add seals to shm file: %s.", strerror(errno));
1091 		goto error;
1092 	}
1093 
1094 	ret = ftruncate(r->fd, r->region_size);
1095 	if (ret < 0) {
1096 		MIF_LOG(ERR, "Failed to truncate shm file: %s.", strerror(errno));
1097 		goto error;
1098 	}
1099 
1100 	r->addr = mmap(NULL, r->region_size, PROT_READ |
1101 		       PROT_WRITE, MAP_SHARED, r->fd, 0);
1102 	if (r->addr == MAP_FAILED) {
1103 		MIF_LOG(ERR, "Failed to mmap shm region: %s.", strerror(errno));
1104 		ret = -1;
1105 		goto error;
1106 	}
1107 
1108 	proc_private->regions[proc_private->regions_num] = r;
1109 	proc_private->regions_num++;
1110 
1111 	return ret;
1112 
1113 error:
1114 	if (r->fd > 0)
1115 		close(r->fd);
1116 	r->fd = -1;
1117 
1118 	return ret;
1119 }
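
/*
 * Rough region-size example (illustrative only, assuming the defaults of
 * log2_ring_size = 10 and pkt_buffer_size = 2048 defined in the header,
 * with one C2S and one S2C ring):
 *
 *   ring area   = 2 * (sizeof(memif_ring_t) + 1024 * sizeof(memif_desc_t))
 *               = 2 * (sizeof(memif_ring_t) + 16 KiB)
 *   buffer area = 2 * 1024 * 2048 B = 4 MiB      (only when has_buffers)
 *
 * The region is backed by a sealed memfd and mapped shared, so its fd can
 * later be handed to the peer over the control channel.
 */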
1120 
1121 static int
1122 memif_regions_init(struct rte_eth_dev *dev)
1123 {
1124 	struct pmd_internals *pmd = dev->data->dev_private;
1125 	int ret;
1126 
1127 	/*
1128 	 * Zero-copy exposes DPDK memory.
1129 	 * Each memseg list is represented by a memif region.
1130 	 * Zero-copy region indexing: memseg list idx + 1,
1131 	 * since region 0 is already reserved for descriptors.
1132 	 */
1133 	if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
1134 		/* create region idx 0 containing descriptors */
1135 		ret = memif_region_init_shm(dev, 0);
1136 		if (ret < 0)
1137 			return ret;
1138 		ret = rte_memseg_walk(memif_region_init_zc, (void *)dev->process_private);
1139 		if (ret < 0)
1140 			return ret;
1141 	} else {
1142 		/* create one memory region containing rings and buffers */
1143 		ret = memif_region_init_shm(dev, /* has buffers */ 1);
1144 		if (ret < 0)
1145 			return ret;
1146 	}
1147 
1148 	return 0;
1149 }
1150 
1151 static void
1152 memif_init_rings(struct rte_eth_dev *dev)
1153 {
1154 	struct pmd_internals *pmd = dev->data->dev_private;
1155 	struct pmd_process_private *proc_private = dev->process_private;
1156 	memif_ring_t *ring;
1157 	int i, j;
1158 	uint16_t slot;
1159 
1160 	for (i = 0; i < pmd->run.num_c2s_rings; i++) {
1161 		ring = memif_get_ring(pmd, proc_private, MEMIF_RING_C2S, i);
1162 		rte_atomic_store_explicit(&ring->head, 0, rte_memory_order_relaxed);
1163 		rte_atomic_store_explicit(&ring->tail, 0, rte_memory_order_relaxed);
1164 		ring->cookie = MEMIF_COOKIE;
1165 		ring->flags = 0;
1166 
1167 		if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY)
1168 			continue;
1169 
1170 		for (j = 0; j < (1 << pmd->run.log2_ring_size); j++) {
1171 			slot = i * (1 << pmd->run.log2_ring_size) + j;
1172 			ring->desc[j].region = 0;
1173 			ring->desc[j].offset =
1174 				proc_private->regions[0]->pkt_buffer_offset +
1175 				(uint32_t)(slot * pmd->run.pkt_buffer_size);
1176 			ring->desc[j].length = pmd->run.pkt_buffer_size;
1177 		}
1178 	}
1179 
1180 	for (i = 0; i < pmd->run.num_s2c_rings; i++) {
1181 		ring = memif_get_ring(pmd, proc_private, MEMIF_RING_S2C, i);
1182 		rte_atomic_store_explicit(&ring->head, 0, rte_memory_order_relaxed);
1183 		rte_atomic_store_explicit(&ring->tail, 0, rte_memory_order_relaxed);
1184 		ring->cookie = MEMIF_COOKIE;
1185 		ring->flags = 0;
1186 
1187 		if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY)
1188 			continue;
1189 
1190 		for (j = 0; j < (1 << pmd->run.log2_ring_size); j++) {
1191 			slot = (i + pmd->run.num_c2s_rings) *
1192 			    (1 << pmd->run.log2_ring_size) + j;
1193 			ring->desc[j].region = 0;
1194 			ring->desc[j].offset =
1195 				proc_private->regions[0]->pkt_buffer_offset +
1196 				(uint32_t)(slot * pmd->run.pkt_buffer_size);
1197 			ring->desc[j].length = pmd->run.pkt_buffer_size;
1198 		}
1199 	}
1200 }
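
/*
 * In non-zero-copy mode every descriptor is pre-pointed at a fixed packet
 * buffer inside region 0: with ring_entries = 1 << log2_ring_size, slot j
 * of ring r (C2S rings numbered first) uses offset
 * pkt_buffer_offset + (r * ring_entries + j) * pkt_buffer_size, so the
 * buffers never move after initialization.  In zero-copy mode the
 * descriptors are left empty here and filled with mbuf addresses at run
 * time.
 */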
1201 
1202 /* called only by client */
1203 static int
1204 memif_init_queues(struct rte_eth_dev *dev)
1205 {
1206 	struct pmd_internals *pmd = dev->data->dev_private;
1207 	struct memif_queue *mq;
1208 	int i;
1209 
1210 	for (i = 0; i < pmd->run.num_c2s_rings; i++) {
1211 		mq = dev->data->tx_queues[i];
1212 		mq->log2_ring_size = pmd->run.log2_ring_size;
1213 		/* queues located only in region 0 */
1214 		mq->region = 0;
1215 		mq->ring_offset = memif_get_ring_offset(dev, mq, MEMIF_RING_C2S, i);
1216 		mq->last_head = 0;
1217 		mq->last_tail = 0;
1218 		if (rte_intr_fd_set(mq->intr_handle, eventfd(0, EFD_NONBLOCK)))
1219 			return -rte_errno;
1220 
1221 		if (rte_intr_fd_get(mq->intr_handle) < 0) {
1222 			MIF_LOG(WARNING,
1223 				"Failed to create eventfd for tx queue %d: %s.", i,
1224 				strerror(errno));
1225 		}
1226 		mq->buffers = NULL;
1227 		if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
1228 			mq->buffers = rte_zmalloc("bufs", sizeof(struct rte_mbuf *) *
1229 						  (1 << mq->log2_ring_size), 0);
1230 			if (mq->buffers == NULL)
1231 				return -ENOMEM;
1232 		}
1233 	}
1234 
1235 	for (i = 0; i < pmd->run.num_s2c_rings; i++) {
1236 		mq = dev->data->rx_queues[i];
1237 		mq->log2_ring_size = pmd->run.log2_ring_size;
1238 		/* queues located only in region 0 */
1239 		mq->region = 0;
1240 		mq->ring_offset = memif_get_ring_offset(dev, mq, MEMIF_RING_S2C, i);
1241 		mq->last_head = 0;
1242 		mq->last_tail = 0;
1243 		if (rte_intr_fd_set(mq->intr_handle, eventfd(0, EFD_NONBLOCK)))
1244 			return -rte_errno;
1245 		if (rte_intr_fd_get(mq->intr_handle) < 0) {
1246 			MIF_LOG(WARNING,
1247 				"Failed to create eventfd for rx queue %d: %s.", i,
1248 				strerror(errno));
1249 		}
1250 		mq->buffers = NULL;
1251 		if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
1252 			/*
1253 			 * Allocate 2x ring_size to reserve a contiguous array for
1254 			 * rte_pktmbuf_alloc_bulk (to store allocated mbufs).
1255 			 */
1256 			mq->buffers = rte_zmalloc("bufs", sizeof(struct rte_mbuf *) *
1257 						  (1 << (mq->log2_ring_size + 1)), 0);
1258 			if (mq->buffers == NULL)
1259 				return -ENOMEM;
1260 		}
1261 	}
1262 	return 0;
1263 }
1264 
1265 int
1266 memif_init_regions_and_queues(struct rte_eth_dev *dev)
1267 {
1268 	int ret;
1269 
1270 	ret = memif_regions_init(dev);
1271 	if (ret < 0)
1272 		return ret;
1273 
1274 	memif_init_rings(dev);
1275 
1276 	ret = memif_init_queues(dev);
1277 	if (ret < 0)
1278 		return ret;
1279 
1280 	return 0;
1281 }
1282 
1283 int
1284 memif_connect(struct rte_eth_dev *dev)
1285 {
1286 	struct pmd_internals *pmd = dev->data->dev_private;
1287 	struct pmd_process_private *proc_private = dev->process_private;
1288 	struct memif_region *mr;
1289 	struct memif_queue *mq;
1290 	memif_ring_t *ring;
1291 	int i;
1292 
1293 	for (i = 0; i < proc_private->regions_num; i++) {
1294 		mr = proc_private->regions[i];
1295 		if (mr != NULL) {
1296 			if (mr->addr == NULL) {
1297 				if (mr->fd < 0)
1298 					return -1;
1299 				mr->addr = mmap(NULL, mr->region_size,
1300 						PROT_READ | PROT_WRITE,
1301 						MAP_SHARED, mr->fd, 0);
1302 				if (mr->addr == MAP_FAILED) {
1303 					MIF_LOG(ERR, "mmap failed: %s",
1304 						strerror(errno));
1305 					return -1;
1306 				}
1307 			}
1308 			if (i > 0 && (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY)) {
1309 				/* close memseg file */
1310 				close(mr->fd);
1311 				mr->fd = -1;
1312 			}
1313 		}
1314 	}
1315 
1316 	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
1317 		for (i = 0; i < pmd->run.num_c2s_rings; i++) {
1318 			mq = (pmd->role == MEMIF_ROLE_CLIENT) ?
1319 			    dev->data->tx_queues[i] : dev->data->rx_queues[i];
1320 			ring = memif_get_ring_from_queue(proc_private, mq);
1321 			if (ring == NULL || ring->cookie != MEMIF_COOKIE) {
1322 				MIF_LOG(ERR, "Wrong ring");
1323 				return -1;
1324 			}
1325 			rte_atomic_store_explicit(&ring->head, 0, rte_memory_order_relaxed);
1326 			rte_atomic_store_explicit(&ring->tail, 0, rte_memory_order_relaxed);
1327 			mq->last_head = 0;
1328 			mq->last_tail = 0;
1329 			/* enable polling mode */
1330 			if (pmd->role == MEMIF_ROLE_SERVER)
1331 				ring->flags = MEMIF_RING_FLAG_MASK_INT;
1332 		}
1333 		for (i = 0; i < pmd->run.num_s2c_rings; i++) {
1334 			mq = (pmd->role == MEMIF_ROLE_CLIENT) ?
1335 			    dev->data->rx_queues[i] : dev->data->tx_queues[i];
1336 			ring = memif_get_ring_from_queue(proc_private, mq);
1337 			if (ring == NULL || ring->cookie != MEMIF_COOKIE) {
1338 				MIF_LOG(ERR, "Wrong ring");
1339 				return -1;
1340 			}
1341 			rte_atomic_store_explicit(&ring->head, 0, rte_memory_order_relaxed);
1342 			rte_atomic_store_explicit(&ring->tail, 0, rte_memory_order_relaxed);
1343 			mq->last_head = 0;
1344 			mq->last_tail = 0;
1345 			/* enable polling mode */
1346 			if (pmd->role == MEMIF_ROLE_CLIENT)
1347 				ring->flags = MEMIF_RING_FLAG_MASK_INT;
1348 		}
1349 
1350 		pmd->flags &= ~ETH_MEMIF_FLAG_CONNECTING;
1351 		pmd->flags |= ETH_MEMIF_FLAG_CONNECTED;
1352 		dev->data->dev_link.link_status = RTE_ETH_LINK_UP;
1353 	}
1354 	MIF_LOG(INFO, "Connected.");
1355 	return 0;
1356 }
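
/*
 * memif_connect() is the final step of the handshake: regions received
 * only as file descriptors are mmap()ed (memseg-backed zero-copy regions
 * above index 0 just close their fd instead), and in the primary process
 * the ring indices and per-queue cursors are reset, interrupts are masked
 * on the rings this side receives from (RX is always polled), and the
 * link is marked up.
 */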
1357 
1358 static int
1359 memif_dev_start(struct rte_eth_dev *dev)
1360 {
1361 	struct pmd_internals *pmd = dev->data->dev_private;
1362 	int ret = 0;
1363 	uint16_t i;
1364 
1365 	switch (pmd->role) {
1366 	case MEMIF_ROLE_CLIENT:
1367 		ret = memif_connect_client(dev);
1368 		break;
1369 	case MEMIF_ROLE_SERVER:
1370 		ret = memif_connect_server(dev);
1371 		break;
1372 	default:
1373 		MIF_LOG(ERR, "Unknown role: %d.", pmd->role);
1374 		ret = -1;
1375 		break;
1376 	}
1377 
1378 	if (ret == 0) {
1379 		for (i = 0; i < dev->data->nb_rx_queues; i++)
1380 			dev->data->rx_queue_state[i] = RTE_ETH_QUEUE_STATE_STARTED;
1381 		for (i = 0; i < dev->data->nb_tx_queues; i++)
1382 			dev->data->tx_queue_state[i] = RTE_ETH_QUEUE_STATE_STARTED;
1383 	}
1384 
1385 	return ret;
1386 }
1387 
1388 static int
1389 memif_dev_stop(struct rte_eth_dev *dev)
1390 {
1391 	uint16_t i;
1392 
1393 	memif_disconnect(dev);
1394 
1395 	for (i = 0; i < dev->data->nb_rx_queues; i++)
1396 		dev->data->rx_queue_state[i] = RTE_ETH_QUEUE_STATE_STOPPED;
1397 	for (i = 0; i < dev->data->nb_tx_queues; i++)
1398 		dev->data->tx_queue_state[i] = RTE_ETH_QUEUE_STATE_STOPPED;
1399 
1400 	return 0;
1401 }
1402 
1403 static int
1404 memif_dev_close(struct rte_eth_dev *dev)
1405 {
1406 	struct pmd_internals *pmd = dev->data->dev_private;
1407 	int i;
1408 
1409 	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
1410 		memif_msg_enq_disconnect(pmd->cc, "Device closed", 0);
1411 
1412 		for (i = 0; i < dev->data->nb_rx_queues; i++)
1413 			(*dev->dev_ops->rx_queue_release)(dev, i);
1414 		for (i = 0; i < dev->data->nb_tx_queues; i++)
1415 			(*dev->dev_ops->tx_queue_release)(dev, i);
1416 
1417 		memif_socket_remove_device(dev);
1418 	}
1419 
1420 	rte_free(dev->process_private);
1421 
1422 	return 0;
1423 }
1424 
1425 static int
1426 memif_dev_configure(struct rte_eth_dev *dev)
1427 {
1428 	struct pmd_internals *pmd = dev->data->dev_private;
1429 
1430 	/*
1431 	 * CLIENT - TXQ
1432 	 * SERVER - RXQ
1433 	 */
1434 	pmd->cfg.num_c2s_rings = (pmd->role == MEMIF_ROLE_CLIENT) ?
1435 				  dev->data->nb_tx_queues : dev->data->nb_rx_queues;
1436 
1437 	/*
1438 	 * CLIENT - RXQ
1439 	 * SERVER - TXQ
1440 	 */
1441 	pmd->cfg.num_s2c_rings = (pmd->role == MEMIF_ROLE_CLIENT) ?
1442 				  dev->data->nb_rx_queues : dev->data->nb_tx_queues;
1443 
1444 	return 0;
1445 }
1446 
1447 static int
1448 memif_tx_queue_setup(struct rte_eth_dev *dev,
1449 		     uint16_t qid,
1450 		     uint16_t nb_tx_desc __rte_unused,
1451 		     unsigned int socket_id __rte_unused,
1452 		     const struct rte_eth_txconf *tx_conf __rte_unused)
1453 {
1454 	struct pmd_internals *pmd = dev->data->dev_private;
1455 	struct memif_queue *mq;
1456 
1457 	mq = rte_zmalloc("tx-queue", sizeof(struct memif_queue), 0);
1458 	if (mq == NULL) {
1459 		MIF_LOG(ERR, "Failed to allocate tx queue id: %u", qid);
1460 		return -ENOMEM;
1461 	}
1462 
1463 	/* Allocate interrupt instance */
1464 	mq->intr_handle = rte_intr_instance_alloc(RTE_INTR_INSTANCE_F_SHARED);
1465 	if (mq->intr_handle == NULL) {
1466 		MIF_LOG(ERR, "Failed to allocate intr handle");
1467 		return -ENOMEM;
1468 	}
1469 
1470 	mq->type =
1471 	    (pmd->role == MEMIF_ROLE_CLIENT) ? MEMIF_RING_C2S : MEMIF_RING_S2C;
1472 	mq->n_pkts = 0;
1473 	mq->n_bytes = 0;
1474 
1475 	if (rte_intr_fd_set(mq->intr_handle, -1))
1476 		return -rte_errno;
1477 
1478 	if (rte_intr_type_set(mq->intr_handle, RTE_INTR_HANDLE_EXT))
1479 		return -rte_errno;
1480 
1481 	mq->in_port = dev->data->port_id;
1482 	dev->data->tx_queues[qid] = mq;
1483 
1484 	return 0;
1485 }
1486 
1487 static int
1488 memif_rx_queue_setup(struct rte_eth_dev *dev,
1489 		     uint16_t qid,
1490 		     uint16_t nb_rx_desc __rte_unused,
1491 		     unsigned int socket_id __rte_unused,
1492 		     const struct rte_eth_rxconf *rx_conf __rte_unused,
1493 		     struct rte_mempool *mb_pool)
1494 {
1495 	struct pmd_internals *pmd = dev->data->dev_private;
1496 	struct memif_queue *mq;
1497 
1498 	mq = rte_zmalloc("rx-queue", sizeof(struct memif_queue), 0);
1499 	if (mq == NULL) {
1500 		MIF_LOG(ERR, "Failed to allocate rx queue id: %u", qid);
1501 		return -ENOMEM;
1502 	}
1503 
1504 	/* Allocate interrupt instance */
1505 	mq->intr_handle = rte_intr_instance_alloc(RTE_INTR_INSTANCE_F_SHARED);
1506 	if (mq->intr_handle == NULL) {
1507 		MIF_LOG(ERR, "Failed to allocate intr handle");
1508 		return -ENOMEM;
1509 	}
1510 
1511 	mq->type = (pmd->role == MEMIF_ROLE_CLIENT) ? MEMIF_RING_S2C : MEMIF_RING_C2S;
1512 	mq->n_pkts = 0;
1513 	mq->n_bytes = 0;
1514 
1515 	if (rte_intr_fd_set(mq->intr_handle, -1))
1516 		return -rte_errno;
1517 
1518 	if (rte_intr_type_set(mq->intr_handle, RTE_INTR_HANDLE_EXT))
1519 		return -rte_errno;
1520 
1521 	mq->mempool = mb_pool;
1522 	mq->in_port = dev->data->port_id;
1523 	dev->data->rx_queues[qid] = mq;
1524 
1525 	return 0;
1526 }
1527 
1528 static void
1529 memif_rx_queue_release(struct rte_eth_dev *dev, uint16_t qid)
1530 {
1531 	struct memif_queue *mq = dev->data->rx_queues[qid];
1532 
1533 	if (!mq)
1534 		return;
1535 
1536 	rte_intr_instance_free(mq->intr_handle);
1537 	rte_free(mq);
1538 }
1539 
1540 static void
1541 memif_tx_queue_release(struct rte_eth_dev *dev, uint16_t qid)
1542 {
1543 	struct memif_queue *mq = dev->data->tx_queues[qid];
1544 
1545 	if (!mq)
1546 		return;
1547 
1548 	rte_free(mq);
1549 }
1550 
1551 static int
1552 memif_link_update(struct rte_eth_dev *dev,
1553 		  int wait_to_complete __rte_unused)
1554 {
1555 	struct pmd_process_private *proc_private;
1556 
1557 	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
1558 		proc_private = dev->process_private;
1559 		if (dev->data->dev_link.link_status == RTE_ETH_LINK_UP &&
1560 				proc_private->regions_num == 0) {
1561 			memif_mp_request_regions(dev);
1562 		} else if (dev->data->dev_link.link_status == RTE_ETH_LINK_DOWN &&
1563 				proc_private->regions_num > 0) {
1564 			memif_free_regions(dev);
1565 		}
1566 	}
1567 	return 0;
1568 }
1569 
1570 static int
1571 memif_stats_get(struct rte_eth_dev *dev, struct rte_eth_stats *stats)
1572 {
1573 	struct pmd_internals *pmd = dev->data->dev_private;
1574 	struct memif_queue *mq;
1575 	int i;
1576 	uint8_t tmp, nq;
1577 
1578 	stats->ipackets = 0;
1579 	stats->ibytes = 0;
1580 	stats->opackets = 0;
1581 	stats->obytes = 0;
1582 
1583 	tmp = (pmd->role == MEMIF_ROLE_CLIENT) ? pmd->run.num_s2c_rings :
1584 	    pmd->run.num_c2s_rings;
1585 	nq = (tmp < RTE_ETHDEV_QUEUE_STAT_CNTRS) ? tmp :
1586 	    RTE_ETHDEV_QUEUE_STAT_CNTRS;
1587 
1588 	/* RX stats */
1589 	for (i = 0; i < nq; i++) {
1590 		mq = dev->data->rx_queues[i];
1591 		stats->q_ipackets[i] = mq->n_pkts;
1592 		stats->q_ibytes[i] = mq->n_bytes;
1593 		stats->ipackets += mq->n_pkts;
1594 		stats->ibytes += mq->n_bytes;
1595 	}
1596 
1597 	tmp = (pmd->role == MEMIF_ROLE_CLIENT) ? pmd->run.num_c2s_rings :
1598 	    pmd->run.num_s2c_rings;
1599 	nq = (tmp < RTE_ETHDEV_QUEUE_STAT_CNTRS) ? tmp :
1600 	    RTE_ETHDEV_QUEUE_STAT_CNTRS;
1601 
1602 	/* TX stats */
1603 	for (i = 0; i < nq; i++) {
1604 		mq = dev->data->tx_queues[i];
1605 		stats->q_opackets[i] = mq->n_pkts;
1606 		stats->q_obytes[i] = mq->n_bytes;
1607 		stats->opackets += mq->n_pkts;
1608 		stats->obytes += mq->n_bytes;
1609 	}
1610 	return 0;
1611 }
1612 
1613 static int
1614 memif_stats_reset(struct rte_eth_dev *dev)
1615 {
1616 	struct pmd_internals *pmd = dev->data->dev_private;
1617 	int i;
1618 	struct memif_queue *mq;
1619 
1620 	for (i = 0; i < pmd->run.num_c2s_rings; i++) {
1621 		mq = (pmd->role == MEMIF_ROLE_CLIENT) ? dev->data->tx_queues[i] :
1622 		    dev->data->rx_queues[i];
1623 		mq->n_pkts = 0;
1624 		mq->n_bytes = 0;
1625 	}
1626 	for (i = 0; i < pmd->run.num_s2c_rings; i++) {
1627 		mq = (pmd->role == MEMIF_ROLE_CLIENT) ? dev->data->rx_queues[i] :
1628 		    dev->data->tx_queues[i];
1629 		mq->n_pkts = 0;
1630 		mq->n_bytes = 0;
1631 	}
1632 
1633 	return 0;
1634 }
1635 
1636 static const struct eth_dev_ops ops = {
1637 	.dev_start = memif_dev_start,
1638 	.dev_stop = memif_dev_stop,
1639 	.dev_close = memif_dev_close,
1640 	.dev_infos_get = memif_dev_info,
1641 	.dev_configure = memif_dev_configure,
1642 	.tx_queue_setup = memif_tx_queue_setup,
1643 	.rx_queue_setup = memif_rx_queue_setup,
1644 	.rx_queue_release = memif_rx_queue_release,
1645 	.tx_queue_release = memif_tx_queue_release,
1646 	.link_update = memif_link_update,
1647 	.stats_get = memif_stats_get,
1648 	.stats_reset = memif_stats_reset,
1649 };
1650 
1651 static int
1652 memif_create(struct rte_vdev_device *vdev, enum memif_role_t role,
1653 	     memif_interface_id_t id, uint32_t flags,
1654 	     const char *socket_filename, uid_t owner_uid, gid_t owner_gid,
1655 	     memif_log2_ring_size_t log2_ring_size,
1656 	     uint16_t pkt_buffer_size, const char *secret,
1657 	     struct rte_ether_addr *ether_addr)
1658 {
1659 	int ret = 0;
1660 	struct rte_eth_dev *eth_dev;
1661 	struct rte_eth_dev_data *data;
1662 	struct pmd_internals *pmd;
1663 	struct pmd_process_private *process_private;
1664 	const unsigned int numa_node = vdev->device.numa_node;
1665 	const char *name = rte_vdev_device_name(vdev);
1666 
1667 	eth_dev = rte_eth_vdev_allocate(vdev, sizeof(*pmd));
1668 	if (eth_dev == NULL) {
1669 		MIF_LOG(ERR, "%s: Unable to allocate device struct.", name);
1670 		return -1;
1671 	}
1672 
1673 	process_private = (struct pmd_process_private *)
1674 		rte_zmalloc(name, sizeof(struct pmd_process_private),
1675 			    RTE_CACHE_LINE_SIZE);
1676 
1677 	if (process_private == NULL) {
1678 		MIF_LOG(ERR, "Failed to alloc memory for process private");
1679 		return -1;
1680 	}
1681 	eth_dev->process_private = process_private;
1682 
1683 	pmd = eth_dev->data->dev_private;
1684 	memset(pmd, 0, sizeof(*pmd));
1685 
1686 	pmd->id = id;
1687 	pmd->flags = flags;
1688 	pmd->flags |= ETH_MEMIF_FLAG_DISABLED;
1689 	pmd->role = role;
1690 	/* The zero-copy flag is irrelevant to the server. */
1691 	if (pmd->role == MEMIF_ROLE_SERVER)
1692 		pmd->flags &= ~ETH_MEMIF_FLAG_ZERO_COPY;
1693 	pmd->owner_uid = owner_uid;
1694 	pmd->owner_gid = owner_gid;
1695 
1696 	ret = memif_socket_init(eth_dev, socket_filename);
1697 	if (ret < 0)
1698 		return ret;
1699 
1700 	memset(pmd->secret, 0, sizeof(char) * ETH_MEMIF_SECRET_SIZE);
1701 	if (secret != NULL)
1702 		strlcpy(pmd->secret, secret, sizeof(pmd->secret));
1703 
1704 	pmd->cfg.log2_ring_size = log2_ring_size;
1705 	/* set in .dev_configure() */
1706 	pmd->cfg.num_c2s_rings = 0;
1707 	pmd->cfg.num_s2c_rings = 0;
1708 
1709 	pmd->cfg.pkt_buffer_size = pkt_buffer_size;
1710 	rte_spinlock_init(&pmd->cc_lock);
1711 
1712 	data = eth_dev->data;
1713 	data->dev_private = pmd;
1714 	data->numa_node = numa_node;
1715 	data->dev_link = pmd_link;
1716 	data->mac_addrs = ether_addr;
1717 	data->promiscuous = 1;
1718 	data->dev_flags |= RTE_ETH_DEV_AUTOFILL_QUEUE_XSTATS;
1719 
1720 	eth_dev->dev_ops = &ops;
1721 	eth_dev->device = &vdev->device;
1722 	if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
1723 		eth_dev->rx_pkt_burst = eth_memif_rx_zc;
1724 		eth_dev->tx_pkt_burst = eth_memif_tx_zc;
1725 	} else {
1726 		eth_dev->rx_pkt_burst = eth_memif_rx;
1727 		eth_dev->tx_pkt_burst = eth_memif_tx;
1728 	}
1729 
1730 	rte_eth_dev_probing_finish(eth_dev);
1731 
1732 	return 0;
1733 }
1734 
1735 static int
1736 memif_set_role(const char *key __rte_unused, const char *value,
1737 	       void *extra_args)
1738 {
1739 	enum memif_role_t *role = (enum memif_role_t *)extra_args;
1740 
1741 	if (strstr(value, "server") != NULL) {
1742 		*role = MEMIF_ROLE_SERVER;
1743 	} else if (strstr(value, "client") != NULL) {
1744 		*role = MEMIF_ROLE_CLIENT;
1745 	} else if (strstr(value, "master") != NULL) {
1746 		MIF_LOG(NOTICE, "Role argument \"master\" is deprecated, use \"server\"");
1747 		*role = MEMIF_ROLE_SERVER;
1748 	} else if (strstr(value, "slave") != NULL) {
1749 		MIF_LOG(NOTICE, "Role argument \"slave\" is deprecated, use \"client\"");
1750 		*role = MEMIF_ROLE_CLIENT;
1751 	} else {
1752 		MIF_LOG(ERR, "Unknown role: %s.", value);
1753 		return -EINVAL;
1754 	}
1755 	return 0;
1756 }
1757 
1758 static int
1759 memif_set_zc(const char *key __rte_unused, const char *value, void *extra_args)
1760 {
1761 	uint32_t *flags = (uint32_t *)extra_args;
1762 
1763 	if (strstr(value, "yes") != NULL) {
1764 		if (!rte_mcfg_get_single_file_segments()) {
1765 			MIF_LOG(ERR, "Zero-copy doesn't support multi-file segments.");
1766 			return -ENOTSUP;
1767 		}
1768 		*flags |= ETH_MEMIF_FLAG_ZERO_COPY;
1769 	} else if (strstr(value, "no") != NULL) {
1770 		*flags &= ~ETH_MEMIF_FLAG_ZERO_COPY;
1771 	} else {
1772 		MIF_LOG(ERR, "Failed to parse zero-copy param: %s.", value);
1773 		return -EINVAL;
1774 	}
1775 	return 0;
1776 }
1777 
1778 static int
1779 memif_set_id(const char *key __rte_unused, const char *value, void *extra_args)
1780 {
1781 	memif_interface_id_t *id = (memif_interface_id_t *)extra_args;
1782 
1783 	/* even if parsing fails, 0 is a valid id */
1784 	*id = strtoul(value, NULL, 10);
1785 	return 0;
1786 }
1787 
1788 static int
1789 memif_set_bs(const char *key __rte_unused, const char *value, void *extra_args)
1790 {
1791 	unsigned long tmp;
1792 	uint16_t *pkt_buffer_size = (uint16_t *)extra_args;
1793 
1794 	tmp = strtoul(value, NULL, 10);
1795 	if (tmp == 0 || tmp > 0xFFFF) {
1796 		MIF_LOG(ERR, "Invalid buffer size: %s.", value);
1797 		return -EINVAL;
1798 	}
1799 	*pkt_buffer_size = tmp;
1800 	return 0;
1801 }
1802 
1803 static int
1804 memif_set_rs(const char *key __rte_unused, const char *value, void *extra_args)
1805 {
1806 	unsigned long tmp;
1807 	memif_log2_ring_size_t *log2_ring_size =
1808 	    (memif_log2_ring_size_t *)extra_args;
1809 
1810 	tmp = strtoul(value, NULL, 10);
1811 	if (tmp == 0 || tmp > ETH_MEMIF_MAX_LOG2_RING_SIZE) {
1812 		MIF_LOG(ERR, "Invalid ring size: %s (max %u).",
1813 			value, ETH_MEMIF_MAX_LOG2_RING_SIZE);
1814 		return -EINVAL;
1815 	}
1816 	*log2_ring_size = tmp;
1817 	return 0;
1818 }
1819 
1820 /* check that the socket directory exists and that we can read/write it; only used for filesystem (non-abstract) socket paths */
1821 static int
1822 memif_check_socket_filename(const char *filename)
1823 {
1824 	char *dir = NULL, *tmp;
1825 	uint32_t idx;
1826 	int ret = 0;
1827 
1828 	if (strlen(filename) >= MEMIF_SOCKET_UN_SIZE) {
1829 		MIF_LOG(ERR, "Unix socket address too long (max %u).",
			(unsigned int)MEMIF_SOCKET_UN_SIZE - 1);
1830 		return -1;
1831 	}
1832 
1833 	tmp = strrchr(filename, '/');
1834 	if (tmp != NULL) {
1835 		idx = tmp - filename;
1836 		dir = rte_zmalloc("memif_tmp", sizeof(char) * (idx + 1), 0);
1837 		if (dir == NULL) {
1838 			MIF_LOG(ERR, "Failed to allocate memory.");
1839 			return -1;
1840 		}
1841 		strlcpy(dir, filename, sizeof(char) * (idx + 1));
1842 	}
1843 
1844 	if (dir == NULL || (faccessat(AT_FDCWD, dir, F_OK | R_OK |
1845 					W_OK, AT_EACCESS) < 0)) {
1846 		MIF_LOG(ERR, "Invalid socket directory.");
1847 		ret = -EINVAL;
1848 	}
1849 
1850 	rte_free(dir);
1851 
1852 	return ret;
1853 }
1854 
1855 static int
1856 memif_set_socket_filename(const char *key __rte_unused, const char *value,
1857 			  void *extra_args)
1858 {
1859 	const char **socket_filename = (const char **)extra_args;
1860 
1861 	*socket_filename = value;
1862 	return 0;
1863 }
1864 
1865 static int
1866 memif_set_is_socket_abstract(const char *key __rte_unused, const char *value, void *extra_args)
1867 {
1868 	uint32_t *flags = (uint32_t *)extra_args;
1869 
1870 	if (strstr(value, "yes") != NULL) {
1871 		*flags |= ETH_MEMIF_FLAG_SOCKET_ABSTRACT;
1872 	} else if (strstr(value, "no") != NULL) {
1873 		*flags &= ~ETH_MEMIF_FLAG_SOCKET_ABSTRACT;
1874 	} else {
1875 		MIF_LOG(ERR, "Failed to parse socket-abstract param: %s.", value);
1876 		return -EINVAL;
1877 	}
1878 	return 0;
1879 }
1880 
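/*
 * owner-uid/owner-gid are parsed as 32-bit ids; UINT32_MAX (i.e. (uid_t)-1)
 * is rejected because the probe function uses it as its "not set" default.
 */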
1881 static int
1882 memif_set_owner(const char *key, const char *value, void *extra_args)
1883 {
1884 	RTE_ASSERT(sizeof(uid_t) == sizeof(uint32_t));
1885 	RTE_ASSERT(sizeof(gid_t) == sizeof(uint32_t));
1886 
1887 	unsigned long val;
1888 	char *end = NULL;
1889 	uint32_t *id = (uint32_t *)extra_args;
1890 
1891 	val = strtoul(value, &end, 10);
1892 	if (*value == '\0' || *end != '\0') {
1893 		MIF_LOG(ERR, "Failed to parse %s: %s.", key, value);
1894 		return -EINVAL;
1895 	}
1896 	if (val >= UINT32_MAX) {
1897 		MIF_LOG(ERR, "Invalid %s: %s.", key, value);
1898 		return -ERANGE;
1899 	}
1900 
1901 	*id = val;
1902 	return 0;
1903 }
1904 
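/*
 * Parse the mac devarg; on failure only a warning is logged and the randomly
 * generated address assigned by the probe function is kept.
 */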
1905 static int
1906 memif_set_mac(const char *key __rte_unused, const char *value, void *extra_args)
1907 {
1908 	struct rte_ether_addr *ether_addr = (struct rte_ether_addr *)extra_args;
1909 
1910 	if (rte_ether_unformat_addr(value, ether_addr) < 0)
1911 		MIF_LOG(WARNING, "Failed to parse mac '%s'.", value);
1912 	return 0;
1913 }
1914 
1915 static int
1916 memif_set_secret(const char *key __rte_unused, const char *value, void *extra_args)
1917 {
1918 	const char **secret = (const char **)extra_args;
1919 
1920 	*secret = value;
1921 	return 0;
1922 }
1923 
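/*
 * Probe entry point. A secondary process only attaches to the existing port
 * and allocates its own process-private data; the primary process parses the
 * devargs and creates the interface via memif_create().
 */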
1924 static int
1925 rte_pmd_memif_probe(struct rte_vdev_device *vdev)
1926 {
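	/*
	 * memif_msg_t travels over the control socket and memif_desc_t lives in
	 * shared memory, so their sizes are effectively part of the memif
	 * protocol and must not change.
	 */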
1927 	RTE_BUILD_BUG_ON(sizeof(memif_msg_t) != 128);
1928 	RTE_BUILD_BUG_ON(sizeof(memif_desc_t) != 16);
1929 	int ret = 0;
1930 	struct rte_kvargs *kvlist;
1931 	const char *name = rte_vdev_device_name(vdev);
1932 	enum memif_role_t role = MEMIF_ROLE_CLIENT;
1933 	memif_interface_id_t id = 0;
1934 	uint16_t pkt_buffer_size = ETH_MEMIF_DEFAULT_PKT_BUFFER_SIZE;
1935 	memif_log2_ring_size_t log2_ring_size = ETH_MEMIF_DEFAULT_RING_SIZE;
1936 	const char *socket_filename = ETH_MEMIF_DEFAULT_SOCKET_FILENAME;
1937 	uid_t owner_uid = -1;
1938 	gid_t owner_gid = -1;
1939 	uint32_t flags = 0;
1940 	const char *secret = NULL;
1941 	struct rte_ether_addr *ether_addr = rte_zmalloc("",
1942 		sizeof(struct rte_ether_addr), 0);
1943 	struct rte_eth_dev *eth_dev;
1944 
	/* rte_zmalloc() may return NULL; bail out before the address is touched */
	if (ether_addr == NULL) {
		MIF_LOG(ERR, "Failed to allocate MAC address storage for %s.", name);
		return -1;
	}

1945 	rte_eth_random_addr(ether_addr->addr_bytes);
1946 
1947 	MIF_LOG(INFO, "Initialize MEMIF: %s.", name);
1948 
1949 	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
1950 		eth_dev = rte_eth_dev_attach_secondary(name);
1951 		if (!eth_dev) {
1952 			MIF_LOG(ERR, "Failed to probe %s", name);
1953 			return -1;
1954 		}
1955 
1956 		eth_dev->dev_ops = &ops;
1957 		eth_dev->device = &vdev->device;
1958 		eth_dev->rx_pkt_burst = eth_memif_rx;
1959 		eth_dev->tx_pkt_burst = eth_memif_tx;
1960 
1961 		if (!rte_eal_primary_proc_alive(NULL)) {
1962 			MIF_LOG(ERR, "Primary process is missing");
1963 			return -1;
1964 		}
1965 
1966 		eth_dev->process_private = (struct pmd_process_private *)
1967 			rte_zmalloc(name,
1968 				sizeof(struct pmd_process_private),
1969 				RTE_CACHE_LINE_SIZE);
1970 		if (eth_dev->process_private == NULL) {
1971 			MIF_LOG(ERR,
1972 				"Failed to alloc memory for process private");
1973 			return -1;
1974 		}
1975 
1976 		rte_eth_dev_probing_finish(eth_dev);
1977 
1978 		return 0;
1979 	}
1980 
1981 	ret = rte_mp_action_register(MEMIF_MP_SEND_REGION, memif_mp_send_region);
1982 	/*
1983 	 * The primary process can continue probing, but if registration failed,
1984 	 * secondary processes will not be able to request memory region information.
1985 	 */
1986 	if (ret < 0 && rte_errno != EEXIST)
1987 		MIF_LOG(WARNING, "Failed to register mp action callback: %s",
1988 			strerror(rte_errno));
1989 
1990 	/* use an abstract unix socket address by default */
1991 	flags |= ETH_MEMIF_FLAG_SOCKET_ABSTRACT;
1992 
1993 	kvlist = rte_kvargs_parse(rte_vdev_device_args(vdev), valid_arguments);
1994 
1995 	/* parse parameters */
1996 	if (kvlist != NULL) {
1997 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_ROLE_ARG,
1998 					 &memif_set_role, &role);
1999 		if (ret < 0)
2000 			goto exit;
2001 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_ID_ARG,
2002 					 &memif_set_id, &id);
2003 		if (ret < 0)
2004 			goto exit;
2005 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_PKT_BUFFER_SIZE_ARG,
2006 					 &memif_set_bs, &pkt_buffer_size);
2007 		if (ret < 0)
2008 			goto exit;
2009 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_RING_SIZE_ARG,
2010 					 &memif_set_rs, &log2_ring_size);
2011 		if (ret < 0)
2012 			goto exit;
2013 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_SOCKET_ARG,
2014 					 &memif_set_socket_filename,
2015 					 (void *)(&socket_filename));
2016 		if (ret < 0)
2017 			goto exit;
2018 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_SOCKET_ABSTRACT_ARG,
2019 					 &memif_set_is_socket_abstract, &flags);
2020 		if (ret < 0)
2021 			goto exit;
2022 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_OWNER_UID_ARG,
2023 					 &memif_set_owner, &owner_uid);
2024 		if (ret < 0)
2025 			goto exit;
2026 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_OWNER_GID_ARG,
2027 					 &memif_set_owner, &owner_gid);
2028 		if (ret < 0)
2029 			goto exit;
2030 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_MAC_ARG,
2031 					 &memif_set_mac, ether_addr);
2032 		if (ret < 0)
2033 			goto exit;
2034 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_ZC_ARG,
2035 					 &memif_set_zc, &flags);
2036 		if (ret < 0)
2037 			goto exit;
2038 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_SECRET_ARG,
2039 					 &memif_set_secret, (void *)(&secret));
2040 		if (ret < 0)
2041 			goto exit;
2042 	}
2043 
2044 	if (!(flags & ETH_MEMIF_FLAG_SOCKET_ABSTRACT)) {
2045 		ret = memif_check_socket_filename(socket_filename);
2046 		if (ret < 0)
2047 			goto exit;
2048 	}
2049 
2050 	/* create interface */
2051 	ret = memif_create(vdev, role, id, flags, socket_filename, owner_uid, owner_gid,
2052 			   log2_ring_size, pkt_buffer_size, secret, ether_addr);
2053 
2054 exit:
2055 	rte_kvargs_free(kvlist);
2056 	return ret;
2057 }
2058 
2059 static int
2060 rte_pmd_memif_remove(struct rte_vdev_device *vdev)
2061 {
2062 	struct rte_eth_dev *eth_dev;
2063 
2064 	eth_dev = rte_eth_dev_allocated(rte_vdev_device_name(vdev));
2065 	if (eth_dev == NULL)
2066 		return 0;
2067 
2068 	return rte_eth_dev_close(eth_dev->data->port_id);
2069 }
2070 
2071 static struct rte_vdev_driver pmd_memif_drv = {
2072 	.probe = rte_pmd_memif_probe,
2073 	.remove = rte_pmd_memif_remove,
2074 };
2075 
2076 RTE_PMD_REGISTER_VDEV(net_memif, pmd_memif_drv);
2077 
2078 RTE_PMD_REGISTER_PARAM_STRING(net_memif,
2079 			      ETH_MEMIF_ID_ARG "=<int>"
2080 			      ETH_MEMIF_ROLE_ARG "=server|client"
2081 			      ETH_MEMIF_PKT_BUFFER_SIZE_ARG "=<int>"
2082 			      ETH_MEMIF_RING_SIZE_ARG "=<int>"
2083 			      ETH_MEMIF_SOCKET_ARG "=<string>"
2084 			      ETH_MEMIF_SOCKET_ABSTRACT_ARG "=yes|no"
2085 			      ETH_MEMIF_OWNER_UID_ARG "=<int>"
2086 			      ETH_MEMIF_OWNER_GID_ARG "=<int>"
2087 			      ETH_MEMIF_MAC_ARG "=xx:xx:xx:xx:xx:xx"
2088 			      ETH_MEMIF_ZC_ARG "=yes|no"
2089 			      ETH_MEMIF_SECRET_ARG "=<string>");
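
/*
 * Example (illustrative) devargs matching the parameters registered above:
 *   --vdev=net_memif0,role=server,rsize=11,bsize=2048,socket=/run/memif.sock,socket-abstract=no
 */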
2090 
2091 RTE_LOG_REGISTER_DEFAULT(memif_logtype, NOTICE);
2092