xref: /dpdk/drivers/net/memif/rte_eth_memif.c (revision a4fa02e06046d36c6a7340201571397d2f59a682)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright 2018-2019 Cisco Systems, Inc.  All rights reserved.
3  */
4 
5 #include <stdlib.h>
6 #include <fcntl.h>
7 #include <unistd.h>
8 #include <sys/types.h>
9 #include <sys/socket.h>
10 #include <sys/un.h>
11 #include <sys/ioctl.h>
12 #include <sys/mman.h>
13 #include <linux/if_ether.h>
14 #include <errno.h>
15 #include <sys/eventfd.h>
16 
17 #include <rte_version.h>
18 #include <rte_mbuf.h>
19 #include <rte_ether.h>
20 #include <ethdev_driver.h>
21 #include <ethdev_vdev.h>
22 #include <rte_malloc.h>
23 #include <rte_kvargs.h>
24 #include <bus_vdev_driver.h>
25 #include <rte_string_fns.h>
26 #include <rte_errno.h>
27 #include <rte_memory.h>
28 #include <rte_memzone.h>
29 #include <rte_eal_memconfig.h>
30 
31 #include "rte_eth_memif.h"
32 #include "memif_socket.h"
33 
34 #define ETH_MEMIF_ID_ARG		"id"
35 #define ETH_MEMIF_ROLE_ARG		"role"
36 #define ETH_MEMIF_PKT_BUFFER_SIZE_ARG	"bsize"
37 #define ETH_MEMIF_RING_SIZE_ARG		"rsize"
38 #define ETH_MEMIF_SOCKET_ARG		"socket"
39 #define ETH_MEMIF_SOCKET_ABSTRACT_ARG	"socket-abstract"
40 #define ETH_MEMIF_OWNER_UID_ARG		"owner-uid"
41 #define ETH_MEMIF_OWNER_GID_ARG		"owner-gid"
42 #define ETH_MEMIF_MAC_ARG		"mac"
43 #define ETH_MEMIF_ZC_ARG		"zero-copy"
44 #define ETH_MEMIF_SECRET_ARG		"secret"
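/*
 * Illustrative devargs string built from the options below (placeholder
 * values, not defaults):
 *   --vdev=net_memif0,role=server,id=0,bsize=2048,rsize=10,
 *          socket=/run/memif.sock,mac=02:ab:cd:ef:01:23,zero-copy=no
 */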
45 
46 static const char * const valid_arguments[] = {
47 	ETH_MEMIF_ID_ARG,
48 	ETH_MEMIF_ROLE_ARG,
49 	ETH_MEMIF_PKT_BUFFER_SIZE_ARG,
50 	ETH_MEMIF_RING_SIZE_ARG,
51 	ETH_MEMIF_SOCKET_ARG,
52 	ETH_MEMIF_SOCKET_ABSTRACT_ARG,
53 	ETH_MEMIF_OWNER_UID_ARG,
54 	ETH_MEMIF_OWNER_GID_ARG,
55 	ETH_MEMIF_MAC_ARG,
56 	ETH_MEMIF_ZC_ARG,
57 	ETH_MEMIF_SECRET_ARG,
58 	NULL
59 };
60 
61 static const struct rte_eth_link pmd_link = {
62 	.link_speed = RTE_ETH_SPEED_NUM_100G,
63 	.link_duplex = RTE_ETH_LINK_FULL_DUPLEX,
64 	.link_status = RTE_ETH_LINK_DOWN,
65 	.link_autoneg = RTE_ETH_LINK_AUTONEG
66 };
67 
68 #define MEMIF_MP_SEND_REGION		"memif_mp_send_region"
69 
70 
71 static int memif_region_init_zc(const struct rte_memseg_list *msl,
72 				const struct rte_memseg *ms, void *arg);
73 
74 const char *
75 memif_version(void)
76 {
77 	return ("memif-" RTE_STR(MEMIF_VERSION_MAJOR) "." RTE_STR(MEMIF_VERSION_MINOR));
78 }
79 
80 /* Message header to synchronize regions */
81 struct mp_region_msg {
82 	char port_name[RTE_DEV_NAME_MAX_LEN];
83 	memif_region_index_t idx;
84 	memif_region_size_t size;
85 };
86 
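/*
 * Primary-process handler for MEMIF_MP_SEND_REGION requests: look up the
 * port named in the request and reply with the size and file descriptor of
 * the requested region (an empty reply is sent if the region does not exist).
 */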
87 static int
88 memif_mp_send_region(const struct rte_mp_msg *msg, const void *peer)
89 {
90 	struct rte_eth_dev *dev;
91 	struct pmd_process_private *proc_private;
92 	const struct mp_region_msg *msg_param = (const struct mp_region_msg *)msg->param;
93 	struct rte_mp_msg reply;
94 	struct mp_region_msg *reply_param = (struct mp_region_msg *)reply.param;
95 
96 	/* Get requested port */
97 	dev = rte_eth_dev_get_by_name(msg_param->port_name);
98 	if (!dev) {
99 		MIF_LOG(ERR, "Failed to get port id for %s",
100 			msg_param->port_name);
101 		return -1;
102 	}
103 	proc_private = dev->process_private;
104 
105 	memset(&reply, 0, sizeof(reply));
106 	strlcpy(reply.name, msg->name, sizeof(reply.name));
107 	reply_param->idx = msg_param->idx;
108 	if (proc_private->regions[msg_param->idx] != NULL) {
109 		reply_param->size = proc_private->regions[msg_param->idx]->region_size;
110 		reply.fds[0] = proc_private->regions[msg_param->idx]->fd;
111 		reply.num_fds = 1;
112 	}
113 	reply.len_param = sizeof(*reply_param);
114 	if (rte_mp_reply(&reply, peer) < 0) {
115 		MIF_LOG(ERR, "Failed to reply to an add region request");
116 		return -1;
117 	}
118 
119 	return 0;
120 }
121 
122 /*
123  * Request regions
124  * Called by a secondary process when the port's link status goes up.
125  */
126 static int
127 memif_mp_request_regions(struct rte_eth_dev *dev)
128 {
129 	int ret, i;
130 	struct timespec timeout = {.tv_sec = 5, .tv_nsec = 0};
131 	struct rte_mp_msg msg, *reply;
132 	struct rte_mp_reply replies;
133 	struct mp_region_msg *msg_param = (struct mp_region_msg *)msg.param;
134 	struct mp_region_msg *reply_param;
135 	struct memif_region *r;
136 	struct pmd_process_private *proc_private = dev->process_private;
137 	struct pmd_internals *pmd = dev->data->dev_private;
138 	/* in case of zero-copy client, only request region 0 */
139 	uint16_t max_region_num = (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) ?
140 				   1 : ETH_MEMIF_MAX_REGION_NUM;
141 
142 	MIF_LOG(DEBUG, "Requesting memory regions");
143 
144 	for (i = 0; i < max_region_num; i++) {
145 		/* Prepare the message */
146 		memset(&msg, 0, sizeof(msg));
147 		strlcpy(msg.name, MEMIF_MP_SEND_REGION, sizeof(msg.name));
148 		strlcpy(msg_param->port_name, dev->data->name,
149 			sizeof(msg_param->port_name));
150 		msg_param->idx = i;
151 		msg.len_param = sizeof(*msg_param);
152 
153 		/* Send message */
154 		ret = rte_mp_request_sync(&msg, &replies, &timeout);
155 		if (ret < 0 || replies.nb_received != 1) {
156 			MIF_LOG(ERR, "Failed to send mp msg: %d",
157 				rte_errno);
158 			return -1;
159 		}
160 
161 		reply = &replies.msgs[0];
162 		reply_param = (struct mp_region_msg *)reply->param;
163 
164 		if (reply_param->size > 0) {
165 			r = rte_zmalloc("region", sizeof(struct memif_region), 0);
166 			if (r == NULL) {
167 				MIF_LOG(ERR, "Failed to alloc memif region.");
168 				free(reply);
169 				return -ENOMEM;
170 			}
171 			r->region_size = reply_param->size;
172 			if (reply->num_fds < 1) {
173 				MIF_LOG(ERR, "Missing file descriptor.");
174 				rte_free(r);
				free(reply);
175 				return -1;
176 			}
177 			r->fd = reply->fds[0];
178 			r->addr = NULL;
179 
180 			proc_private->regions[reply_param->idx] = r;
181 			proc_private->regions_num++;
182 		}
183 		free(reply);
184 	}
185 
186 	if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
187 		ret = rte_memseg_walk(memif_region_init_zc, (void *)proc_private);
188 		if (ret < 0)
189 			return ret;
190 	}
191 
192 	return memif_connect(dev);
193 }
194 
195 static int
196 memif_dev_info(struct rte_eth_dev *dev __rte_unused, struct rte_eth_dev_info *dev_info)
197 {
198 	dev_info->max_mac_addrs = 1;
199 	dev_info->max_rx_pktlen = RTE_ETHER_MAX_LEN;
200 	dev_info->max_rx_queues = ETH_MEMIF_MAX_NUM_Q_PAIRS;
201 	dev_info->max_tx_queues = ETH_MEMIF_MAX_NUM_Q_PAIRS;
202 	dev_info->min_rx_bufsize = 0;
203 	dev_info->tx_offload_capa = RTE_ETH_TX_OFFLOAD_MULTI_SEGS;
204 
205 	return 0;
206 }
207 
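/*
 * Region 0 layout: all C2S ring headers first, then all S2C ring headers;
 * each ring takes sizeof(memif_ring_t) plus 2^log2_ring_size descriptors.
 * In copy mode the packet buffers follow the rings at pkt_buffer_offset.
 */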
208 static memif_ring_t *
209 memif_get_ring(struct pmd_internals *pmd, struct pmd_process_private *proc_private,
210 	       memif_ring_type_t type, uint16_t ring_num)
211 {
212 	/* rings only in region 0 */
213 	void *p = proc_private->regions[0]->addr;
214 	int ring_size = sizeof(memif_ring_t) + sizeof(memif_desc_t) *
215 	    (1 << pmd->run.log2_ring_size);
216 
217 	p = (uint8_t *)p + (ring_num + type * pmd->run.num_c2s_rings) * ring_size;
218 
219 	return (memif_ring_t *)p;
220 }
221 
222 static memif_region_offset_t
223 memif_get_ring_offset(struct rte_eth_dev *dev, struct memif_queue *mq,
224 		      memif_ring_type_t type, uint16_t num)
225 {
226 	struct pmd_internals *pmd = dev->data->dev_private;
227 	struct pmd_process_private *proc_private = dev->process_private;
228 
229 	return ((uint8_t *)memif_get_ring(pmd, proc_private, type, num) -
230 		(uint8_t *)proc_private->regions[mq->region]->addr);
231 }
232 
233 static memif_ring_t *
234 memif_get_ring_from_queue(struct pmd_process_private *proc_private,
235 			  struct memif_queue *mq)
236 {
237 	struct memif_region *r;
238 
239 	r = proc_private->regions[mq->region];
240 	if (r == NULL)
241 		return NULL;
242 
243 	return (memif_ring_t *)((uint8_t *)r->addr + mq->ring_offset);
244 }
245 
246 static void *
247 memif_get_buffer(struct pmd_process_private *proc_private, memif_desc_t *d)
248 {
249 	return ((uint8_t *)proc_private->regions[d->region]->addr + d->offset);
250 }
251 
252 /* Free mbufs received by server */
253 static void
254 memif_free_stored_mbufs(struct pmd_process_private *proc_private, struct memif_queue *mq)
255 {
256 	uint16_t cur_tail;
257 	uint16_t mask = (1 << mq->log2_ring_size) - 1;
258 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
259 
260 	/* FIXME: improve performance */
261 	/* The ring->tail acts as a guard variable between Tx and Rx
262 	 * threads, so the load-acquire here pairs with the store-release
263 	 * in function eth_memif_rx for C2S queues.
264 	 */
265 	cur_tail = rte_atomic_load_explicit(&ring->tail, rte_memory_order_acquire);
266 	while (mq->last_tail != cur_tail) {
267 		RTE_MBUF_PREFETCH_TO_FREE(mq->buffers[(mq->last_tail + 1) & mask]);
268 		rte_pktmbuf_free_seg(mq->buffers[mq->last_tail & mask]);
269 		mq->last_tail++;
270 	}
271 }
272 
273 static int
274 memif_pktmbuf_chain(struct rte_mbuf *head, struct rte_mbuf *cur_tail,
275 		    struct rte_mbuf *tail)
276 {
277 	/* Check for number-of-segments-overflow */
278 	if (unlikely(head->nb_segs + tail->nb_segs > RTE_MBUF_MAX_NB_SEGS))
279 		return -EOVERFLOW;
280 
281 	/* Chain 'tail' onto the old tail */
282 	cur_tail->next = tail;
283 
284 	/* accumulate number of segments and total length. */
285 	head->nb_segs = (uint16_t)(head->nb_segs + tail->nb_segs);
286 
287 	tail->pkt_len = tail->data_len;
288 	head->pkt_len += tail->pkt_len;
289 
290 	return 0;
291 }
292 
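/*
 * RX burst for copy (non-zero-copy) mode, used by both roles. Descriptor
 * contents are copied into freshly allocated mbufs: a fast path allocates
 * MAX_PKT_BURST mbufs in bulk when one memif buffer fits into a single mbuf,
 * otherwise data is copied piecewise into chained mbufs. C2S rings release
 * consumed slots via ring->tail; S2C rings are refilled via ring->head.
 */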
293 static uint16_t
294 eth_memif_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
295 {
296 	struct memif_queue *mq = queue;
297 	struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
298 	struct pmd_process_private *proc_private =
299 		rte_eth_devices[mq->in_port].process_private;
300 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
301 	uint16_t cur_slot, last_slot, n_slots, ring_size, mask, s0;
302 	uint16_t pkts, rx_pkts, n_rx_pkts = 0;
303 	uint16_t mbuf_size = rte_pktmbuf_data_room_size(mq->mempool) -
304 		RTE_PKTMBUF_HEADROOM;
305 	uint16_t src_len, src_off, dst_len, dst_off, cp_len;
306 	memif_ring_type_t type = mq->type;
307 	memif_desc_t *d0;
308 	struct rte_mbuf *mbuf, *mbuf_head, *mbuf_tail;
309 	uint64_t b;
310 	ssize_t size __rte_unused;
311 	uint16_t head;
312 	int ret;
313 	struct rte_eth_link link;
314 
315 	if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
316 		return 0;
317 	if (unlikely(ring == NULL)) {
318 		/* Secondary process will attempt to request regions. */
319 		ret = rte_eth_link_get(mq->in_port, &link);
320 		if (ret < 0)
321 			MIF_LOG(ERR, "Failed to get port %u link info: %s",
322 				mq->in_port, rte_strerror(-ret));
323 		return 0;
324 	}
325 
326 	/* consume interrupt */
327 	if (((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) &&
328 	    (rte_intr_fd_get(mq->intr_handle) >= 0))
329 		size = read(rte_intr_fd_get(mq->intr_handle), &b,
330 			    sizeof(b));
331 
332 	ring_size = 1 << mq->log2_ring_size;
333 	mask = ring_size - 1;
334 
335 	if (type == MEMIF_RING_C2S) {
336 		cur_slot = mq->last_head;
337 		last_slot = rte_atomic_load_explicit(&ring->head, rte_memory_order_acquire);
338 	} else {
339 		cur_slot = mq->last_tail;
340 		last_slot = rte_atomic_load_explicit(&ring->tail, rte_memory_order_acquire);
341 	}
342 
343 	if (cur_slot == last_slot)
344 		goto refill;
345 	n_slots = last_slot - cur_slot;
346 
347 	if (likely(mbuf_size >= pmd->cfg.pkt_buffer_size)) {
348 		struct rte_mbuf *mbufs[MAX_PKT_BURST];
349 next_bulk:
350 		ret = rte_pktmbuf_alloc_bulk(mq->mempool, mbufs, MAX_PKT_BURST);
351 		if (unlikely(ret < 0))
352 			goto no_free_bufs;
353 
354 		rx_pkts = 0;
355 		pkts = nb_pkts < MAX_PKT_BURST ? nb_pkts : MAX_PKT_BURST;
356 		while (n_slots && rx_pkts < pkts) {
357 			mbuf_head = mbufs[rx_pkts];
358 			mbuf = mbuf_head;
359 
360 next_slot1:
361 			mbuf->port = mq->in_port;
362 			s0 = cur_slot & mask;
363 			d0 = &ring->desc[s0];
364 
365 			cp_len = d0->length;
366 
367 			rte_pktmbuf_data_len(mbuf) = cp_len;
368 			rte_pktmbuf_pkt_len(mbuf) = cp_len;
369 			if (mbuf != mbuf_head)
370 				rte_pktmbuf_pkt_len(mbuf_head) += cp_len;
371 
372 			rte_memcpy(rte_pktmbuf_mtod(mbuf, void *),
373 				(uint8_t *)memif_get_buffer(proc_private, d0), cp_len);
374 
375 			cur_slot++;
376 			n_slots--;
377 
378 			if (d0->flags & MEMIF_DESC_FLAG_NEXT) {
379 				mbuf_tail = mbuf;
380 				mbuf = rte_pktmbuf_alloc(mq->mempool);
381 				if (unlikely(mbuf == NULL)) {
382 					rte_pktmbuf_free_bulk(mbufs + rx_pkts,
383 							MAX_PKT_BURST - rx_pkts);
384 					goto no_free_bufs;
385 				}
386 				ret = memif_pktmbuf_chain(mbuf_head, mbuf_tail, mbuf);
387 				if (unlikely(ret < 0)) {
388 					MIF_LOG(ERR, "number-of-segments-overflow");
389 					rte_pktmbuf_free(mbuf);
390 					rte_pktmbuf_free_bulk(mbufs + rx_pkts,
391 							MAX_PKT_BURST - rx_pkts);
392 					goto no_free_bufs;
393 				}
394 				goto next_slot1;
395 			}
396 
397 			mq->n_bytes += rte_pktmbuf_pkt_len(mbuf_head);
398 			*bufs++ = mbuf_head;
399 			rx_pkts++;
400 			n_rx_pkts++;
401 		}
402 
403 		if (rx_pkts < MAX_PKT_BURST) {
404 			rte_pktmbuf_free_bulk(mbufs + rx_pkts, MAX_PKT_BURST - rx_pkts);
405 		} else {
406 			nb_pkts -= rx_pkts;
407 			if (nb_pkts)
408 				goto next_bulk;
409 		}
410 	} else {
411 		while (n_slots && n_rx_pkts < nb_pkts) {
412 			mbuf_head = rte_pktmbuf_alloc(mq->mempool);
413 			if (unlikely(mbuf_head == NULL))
414 				goto no_free_bufs;
415 			mbuf = mbuf_head;
416 			mbuf->port = mq->in_port;
417 
418 next_slot2:
419 			s0 = cur_slot & mask;
420 			d0 = &ring->desc[s0];
421 
422 			src_len = d0->length;
423 			dst_off = 0;
424 			src_off = 0;
425 
426 			do {
427 				dst_len = mbuf_size - dst_off;
428 				if (dst_len == 0) {
429 					dst_off = 0;
430 					dst_len = mbuf_size;
431 
432 					/* store pointer to tail */
433 					mbuf_tail = mbuf;
434 					mbuf = rte_pktmbuf_alloc(mq->mempool);
435 					if (unlikely(mbuf == NULL))
436 						goto no_free_bufs;
437 					mbuf->port = mq->in_port;
438 					ret = memif_pktmbuf_chain(mbuf_head, mbuf_tail, mbuf);
439 					if (unlikely(ret < 0)) {
440 						MIF_LOG(ERR, "number-of-segments-overflow");
441 						rte_pktmbuf_free(mbuf);
442 						goto no_free_bufs;
443 					}
444 				}
445 				cp_len = RTE_MIN(dst_len, src_len);
446 
447 				rte_pktmbuf_data_len(mbuf) += cp_len;
448 				rte_pktmbuf_pkt_len(mbuf) = rte_pktmbuf_data_len(mbuf);
449 				if (mbuf != mbuf_head)
450 					rte_pktmbuf_pkt_len(mbuf_head) += cp_len;
451 
452 				rte_memcpy(rte_pktmbuf_mtod_offset(mbuf, void *,
453 								   dst_off),
454 					(uint8_t *)memif_get_buffer(proc_private, d0) +
455 					src_off, cp_len);
456 
457 				src_off += cp_len;
458 				dst_off += cp_len;
459 				src_len -= cp_len;
460 			} while (src_len);
461 
462 			cur_slot++;
463 			n_slots--;
464 
465 			if (d0->flags & MEMIF_DESC_FLAG_NEXT)
466 				goto next_slot2;
467 
468 			mq->n_bytes += rte_pktmbuf_pkt_len(mbuf_head);
469 			*bufs++ = mbuf_head;
470 			n_rx_pkts++;
471 		}
472 	}
473 
474 no_free_bufs:
475 	if (type == MEMIF_RING_C2S) {
476 		rte_atomic_store_explicit(&ring->tail, cur_slot, rte_memory_order_release);
477 		mq->last_head = cur_slot;
478 	} else {
479 		mq->last_tail = cur_slot;
480 	}
481 
482 refill:
483 	if (type == MEMIF_RING_S2C) {
484 		/* ring->head is updated by the receiver and this function
485 		 * is called in the context of the receiver thread. The loads in
486 		 * the receiver do not need to synchronize with its own stores.
487 		 */
488 		head = rte_atomic_load_explicit(&ring->head, rte_memory_order_relaxed);
489 		n_slots = ring_size - head + mq->last_tail;
490 
491 		while (n_slots--) {
492 			s0 = head++ & mask;
493 			d0 = &ring->desc[s0];
494 			d0->length = pmd->run.pkt_buffer_size;
495 		}
496 		rte_atomic_store_explicit(&ring->head, head, rte_memory_order_release);
497 	}
498 
499 	mq->n_pkts += n_rx_pkts;
500 	return n_rx_pkts;
501 }
502 
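/*
 * RX burst for zero-copy mode (client only). Descriptors point directly into
 * mbufs that were lent to the server during a previous refill, so no copy is
 * needed: the stored mbufs are handed to the application and the ring is
 * refilled with newly allocated mbufs in batches of at least 32 slots.
 */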
503 static uint16_t
504 eth_memif_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
505 {
506 	struct memif_queue *mq = queue;
507 	struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
508 	struct pmd_process_private *proc_private =
509 		rte_eth_devices[mq->in_port].process_private;
510 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
511 	uint16_t cur_slot, last_slot, n_slots, ring_size, mask, s0, head;
512 	uint16_t n_rx_pkts = 0;
513 	memif_desc_t *d0;
514 	struct rte_mbuf *mbuf, *mbuf_tail;
515 	struct rte_mbuf *mbuf_head = NULL;
516 	int ret;
517 	struct rte_eth_link link;
518 
519 	if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
520 		return 0;
521 	if (unlikely(ring == NULL)) {
522 		/* Secondary process will attempt to request regions. */
523 		ret = rte_eth_link_get(mq->in_port, &link);
524 		if (ret < 0)
525 			MIF_LOG(ERR, "Failed to get port %u link info: %s",
526 				mq->in_port, rte_strerror(-ret));
527 		return 0;
528 	}
529 
530 	/* consume interrupt */
531 	if ((rte_intr_fd_get(mq->intr_handle) >= 0) &&
532 	    ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0)) {
533 		uint64_t b;
534 		ssize_t size __rte_unused;
535 		size = read(rte_intr_fd_get(mq->intr_handle), &b,
536 			    sizeof(b));
537 	}
538 
539 	ring_size = 1 << mq->log2_ring_size;
540 	mask = ring_size - 1;
541 
542 	cur_slot = mq->last_tail;
543 	/* The ring->tail acts as a guard variable between Tx and Rx
544 	 * threads, so the load-acquire here pairs with the store-release
545 	 * to synchronize it between threads.
546 	 */
547 	last_slot = rte_atomic_load_explicit(&ring->tail, rte_memory_order_acquire);
548 	if (cur_slot == last_slot)
549 		goto refill;
550 	n_slots = last_slot - cur_slot;
551 
552 	while (n_slots && n_rx_pkts < nb_pkts) {
553 		s0 = cur_slot & mask;
554 
555 		d0 = &ring->desc[s0];
556 		mbuf_head = mq->buffers[s0];
557 		mbuf = mbuf_head;
558 
559 next_slot:
560 		/* prefetch next descriptor */
561 		if (n_rx_pkts + 1 < nb_pkts)
562 			rte_prefetch0(&ring->desc[(cur_slot + 1) & mask]);
563 
564 		mbuf->port = mq->in_port;
565 		rte_pktmbuf_data_len(mbuf) = d0->length;
566 		rte_pktmbuf_pkt_len(mbuf) = rte_pktmbuf_data_len(mbuf);
567 
568 		mq->n_bytes += rte_pktmbuf_data_len(mbuf);
569 
570 		cur_slot++;
571 		n_slots--;
572 		if (d0->flags & MEMIF_DESC_FLAG_NEXT) {
573 			s0 = cur_slot & mask;
574 			d0 = &ring->desc[s0];
575 			mbuf_tail = mbuf;
576 			mbuf = mq->buffers[s0];
577 			ret = memif_pktmbuf_chain(mbuf_head, mbuf_tail, mbuf);
578 			if (unlikely(ret < 0)) {
579 				MIF_LOG(ERR, "number-of-segments-overflow");
580 				goto refill;
581 			}
582 			goto next_slot;
583 		}
584 
585 		*bufs++ = mbuf_head;
586 		n_rx_pkts++;
587 	}
588 
589 	mq->last_tail = cur_slot;
590 
591 /* Supply server with new buffers */
592 refill:
593 	/* ring->head is updated by the receiver and this function
594 	 * is called in the context of the receiver thread. The loads in
595 	 * the receiver do not need to synchronize with its own stores.
596 	 */
597 	head = rte_atomic_load_explicit(&ring->head, rte_memory_order_relaxed);
598 	n_slots = ring_size - head + mq->last_tail;
599 
600 	if (n_slots < 32)
601 		goto no_free_mbufs;
602 
603 	ret = rte_pktmbuf_alloc_bulk(mq->mempool, &mq->buffers[head & mask], n_slots);
604 	if (unlikely(ret < 0))
605 		goto no_free_mbufs;
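	/* mq->buffers is allocated at twice the ring size (see
	 * memif_init_queues), so the bulk allocation above always fills one
	 * contiguous span; copy any entries placed past the ring end back to
	 * the start of the array.
	 */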
606 	if (unlikely(n_slots > ring_size - (head & mask))) {
607 		rte_memcpy(mq->buffers, &mq->buffers[ring_size],
608 			(n_slots + (head & mask) - ring_size) * sizeof(struct rte_mbuf *));
609 	}
610 
611 	while (n_slots--) {
612 		s0 = head++ & mask;
613 		if (n_slots > 0)
614 			rte_prefetch0(mq->buffers[head & mask]);
615 		d0 = &ring->desc[s0];
616 		/* store buffer header */
617 		mbuf = mq->buffers[s0];
618 		/* populate descriptor */
619 		d0->length = rte_pktmbuf_data_room_size(mq->mempool) -
620 				RTE_PKTMBUF_HEADROOM;
621 		d0->region = 1;
622 		d0->offset = rte_pktmbuf_mtod(mbuf, uint8_t *) -
623 			(uint8_t *)proc_private->regions[d0->region]->addr;
624 	}
625 no_free_mbufs:
626 	/* The ring->head acts as a guard variable between Tx and Rx
627 	 * threads, so this store-release pairs with the load-acquire
628 	 * in function eth_memif_tx.
629 	 */
630 	rte_atomic_store_explicit(&ring->head, head, rte_memory_order_release);
631 
632 	mq->n_pkts += n_rx_pkts;
633 
634 	return n_rx_pkts;
635 }
636 
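/*
 * TX burst for copy (non-zero-copy) mode. If all packets come from one
 * mempool and a memif buffer can hold a full mbuf segment, every segment is
 * copied into exactly one descriptor and the mbufs are freed in bulk;
 * otherwise each packet is copied piecewise across as many descriptors as
 * needed. The ring pointer is then published and, unless interrupts are
 * masked, the peer is signalled through the queue's eventfd.
 */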
637 static uint16_t
638 eth_memif_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
639 {
640 	struct memif_queue *mq = queue;
641 	struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
642 	struct pmd_process_private *proc_private =
643 		rte_eth_devices[mq->in_port].process_private;
644 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
645 	uint16_t slot, saved_slot, n_free, ring_size, mask, n_tx_pkts = 0;
646 	uint16_t src_len, src_off, dst_len, dst_off, cp_len, nb_segs;
647 	memif_ring_type_t type = mq->type;
648 	memif_desc_t *d0;
649 	struct rte_mbuf *mbuf;
650 	struct rte_mbuf *mbuf_head;
651 	uint64_t a;
652 	ssize_t size;
653 	struct rte_eth_link link;
654 
655 	if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
656 		return 0;
657 	if (unlikely(ring == NULL)) {
658 		int ret;
659 
660 		/* Secondary process will attempt to request regions. */
661 		ret = rte_eth_link_get(mq->in_port, &link);
662 		if (ret < 0)
663 			MIF_LOG(ERR, "Failed to get port %u link info: %s",
664 				mq->in_port, rte_strerror(-ret));
665 		return 0;
666 	}
667 
668 	ring_size = 1 << mq->log2_ring_size;
669 	mask = ring_size - 1;
670 
671 	if (type == MEMIF_RING_C2S) {
672 		/* For C2S queues ring->head is updated by the sender and
673 		 * this function is called in the context of the sending thread.
674 		 * The loads in the sender do not need to synchronize with
675 		 * its own stores. Hence, the following load can be a
676 		 * relaxed load.
677 		 */
678 		slot = rte_atomic_load_explicit(&ring->head, rte_memory_order_relaxed);
679 		n_free = ring_size - slot +
680 				rte_atomic_load_explicit(&ring->tail, rte_memory_order_acquire);
681 	} else {
682 		/* For S2C queues ring->tail is updated by the sender and
683 		 * this function is called in the context of the sending thread.
684 		 * The loads in the sender do not need to synchronize with
685 		 * its own stores. Hence, the following load can be a
686 		 * relaxed load.
687 		 */
688 		slot = rte_atomic_load_explicit(&ring->tail, rte_memory_order_relaxed);
689 		n_free = rte_atomic_load_explicit(&ring->head, rte_memory_order_acquire) - slot;
690 	}
691 
692 	uint16_t i;
693 	struct rte_mbuf **buf_tmp = bufs;
694 	mbuf_head = *buf_tmp++;
695 	struct rte_mempool *mp = mbuf_head->pool;
696 
697 	for (i = 1; i < nb_pkts; i++) {
698 		mbuf_head = *buf_tmp++;
699 		if (mbuf_head->pool != mp)
700 			break;
701 	}
702 
703 	uint16_t mbuf_size = rte_pktmbuf_data_room_size(mp) - RTE_PKTMBUF_HEADROOM;
704 	if (i == nb_pkts && pmd->cfg.pkt_buffer_size >= mbuf_size) {
705 		buf_tmp = bufs;
706 		while (n_tx_pkts < nb_pkts && n_free) {
707 			mbuf_head = *bufs++;
708 			nb_segs = mbuf_head->nb_segs;
709 			mbuf = mbuf_head;
710 
711 			saved_slot = slot;
712 
713 next_in_chain1:
714 			d0 = &ring->desc[slot & mask];
715 			cp_len = rte_pktmbuf_data_len(mbuf);
716 
717 			rte_memcpy((uint8_t *)memif_get_buffer(proc_private, d0),
718 				rte_pktmbuf_mtod(mbuf, void *), cp_len);
719 
720 			d0->length = cp_len;
721 			mq->n_bytes += cp_len;
722 			slot++;
723 			n_free--;
724 
725 			if (--nb_segs > 0) {
726 				if (n_free) {
727 					d0->flags |= MEMIF_DESC_FLAG_NEXT;
728 					mbuf = mbuf->next;
729 					goto next_in_chain1;
730 				} else {
731 					slot = saved_slot;
732 					goto free_mbufs;
733 				}
734 			}
735 
736 			n_tx_pkts++;
737 		}
738 free_mbufs:
739 		rte_pktmbuf_free_bulk(buf_tmp, n_tx_pkts);
740 	} else {
741 		while (n_tx_pkts < nb_pkts && n_free) {
742 			mbuf_head = *bufs++;
743 			nb_segs = mbuf_head->nb_segs;
744 			mbuf = mbuf_head;
745 
746 			saved_slot = slot;
747 			d0 = &ring->desc[slot & mask];
748 			dst_off = 0;
749 			dst_len = (type == MEMIF_RING_C2S) ?
750 				pmd->run.pkt_buffer_size : d0->length;
751 
752 next_in_chain2:
753 			src_off = 0;
754 			src_len = rte_pktmbuf_data_len(mbuf);
755 
756 			while (src_len) {
757 				if (dst_len == 0) {
758 					if (n_free) {
759 						slot++;
760 						n_free--;
761 						d0->flags |= MEMIF_DESC_FLAG_NEXT;
762 						d0 = &ring->desc[slot & mask];
763 						dst_off = 0;
764 						dst_len = (type == MEMIF_RING_C2S) ?
765 						    pmd->run.pkt_buffer_size : d0->length;
766 						d0->flags = 0;
767 					} else {
768 						slot = saved_slot;
769 						goto no_free_slots;
770 					}
771 				}
772 				cp_len = RTE_MIN(dst_len, src_len);
773 
774 				rte_memcpy((uint8_t *)memif_get_buffer(proc_private,
775 								       d0) + dst_off,
776 					rte_pktmbuf_mtod_offset(mbuf, void *, src_off),
777 					cp_len);
778 
779 				mq->n_bytes += cp_len;
780 				src_off += cp_len;
781 				dst_off += cp_len;
782 				src_len -= cp_len;
783 				dst_len -= cp_len;
784 
785 				d0->length = dst_off;
786 			}
787 
788 			if (--nb_segs > 0) {
789 				mbuf = mbuf->next;
790 				goto next_in_chain2;
791 			}
792 
793 			n_tx_pkts++;
794 			slot++;
795 			n_free--;
796 			rte_pktmbuf_free(mbuf_head);
797 		}
798 	}
799 
800 no_free_slots:
801 	if (type == MEMIF_RING_C2S)
802 		rte_atomic_store_explicit(&ring->head, slot, rte_memory_order_release);
803 	else
804 		rte_atomic_store_explicit(&ring->tail, slot, rte_memory_order_release);
805 
806 	if (((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) &&
807 	    (rte_intr_fd_get(mq->intr_handle) >= 0)) {
808 		a = 1;
809 		size = write(rte_intr_fd_get(mq->intr_handle), &a,
810 			     sizeof(a));
811 		if (unlikely(size < 0)) {
812 			MIF_LOG(WARNING,
813 				"Failed to send interrupt. %s", strerror(errno));
814 		}
815 	}
816 
817 	mq->n_pkts += n_tx_pkts;
818 	return n_tx_pkts;
819 }
820 
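/*
 * Enqueue one (possibly multi-segment) mbuf on a zero-copy C2S ring. Each
 * segment's data area is published as a descriptor offset into region 1 and
 * the mbuf pointer is stored in mq->buffers so it can be freed once the
 * server returns the slot. Returns the number of slots used, or 0 if the
 * chain does not fit into the remaining free slots.
 */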
821 static int
822 memif_tx_one_zc(struct pmd_process_private *proc_private, struct memif_queue *mq,
823 		memif_ring_t *ring, struct rte_mbuf *mbuf, const uint16_t mask,
824 		uint16_t slot, uint16_t n_free)
825 {
826 	memif_desc_t *d0;
827 	uint16_t nb_segs = mbuf->nb_segs;
828 	int used_slots = 1;
829 
830 next_in_chain:
831 	/* store pointer to mbuf to free it later */
832 	mq->buffers[slot & mask] = mbuf;
833 	/* populate descriptor */
834 	d0 = &ring->desc[slot & mask];
835 	d0->length = rte_pktmbuf_data_len(mbuf);
836 	mq->n_bytes += rte_pktmbuf_data_len(mbuf);
837 	/* FIXME: get region index */
838 	d0->region = 1;
839 	d0->offset = rte_pktmbuf_mtod(mbuf, uint8_t *) -
840 		(uint8_t *)proc_private->regions[d0->region]->addr;
841 	d0->flags = 0;
842 
843 	/* check if buffer is chained */
844 	if (--nb_segs > 0) {
845 		if (n_free < 2)
846 			return 0;
847 		/* mark buffer as chained */
848 		d0->flags |= MEMIF_DESC_FLAG_NEXT;
849 		/* advance mbuf */
850 		mbuf = mbuf->next;
851 		/* update counters */
852 		used_slots++;
853 		slot++;
854 		n_free--;
855 		goto next_in_chain;
856 	}
857 	return used_slots;
858 }
859 
860 static uint16_t
861 eth_memif_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
862 {
863 	struct memif_queue *mq = queue;
864 	struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
865 	struct pmd_process_private *proc_private =
866 		rte_eth_devices[mq->in_port].process_private;
867 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
868 	uint16_t slot, n_free, ring_size, mask, n_tx_pkts = 0;
869 	struct rte_eth_link link;
870 
871 	if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
872 		return 0;
873 	if (unlikely(ring == NULL)) {
874 		int ret;
875 
876 		/* Secondary process will attempt to request regions. */
877 		ret = rte_eth_link_get(mq->in_port, &link);
878 		if (ret < 0)
879 			MIF_LOG(ERR, "Failed to get port %u link info: %s",
880 				mq->in_port, rte_strerror(-ret));
881 		return 0;
882 	}
883 
884 	ring_size = 1 << mq->log2_ring_size;
885 	mask = ring_size - 1;
886 
887 	/* free mbufs received by server */
888 	memif_free_stored_mbufs(proc_private, mq);
889 
890 	/* ring type always MEMIF_RING_C2S */
891 	/* For C2S queues ring->head is updated by the sender and
892 	 * this function is called in the context of the sending thread.
893 	 * The loads in the sender do not need to synchronize with
894 	 * its own stores. Hence, the following load can be a
895 	 * relaxed load.
896 	 */
897 	slot = rte_atomic_load_explicit(&ring->head, rte_memory_order_relaxed);
898 	n_free = ring_size - slot + mq->last_tail;
899 
900 	int used_slots;
901 
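	/* Process packets four at a time (with prefetch hints for upcoming
	 * mbufs), then handle the remainder one by one.
	 */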
902 	while (n_free && (n_tx_pkts < nb_pkts)) {
903 		while ((n_free > 4) && ((nb_pkts - n_tx_pkts) > 4)) {
904 			if ((nb_pkts - n_tx_pkts) > 8) {
905 				rte_prefetch0(*bufs + 4);
906 				rte_prefetch0(*bufs + 5);
907 				rte_prefetch0(*bufs + 6);
908 				rte_prefetch0(*bufs + 7);
909 			}
910 			used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
911 				mask, slot, n_free);
912 			if (unlikely(used_slots < 1))
913 				goto no_free_slots;
914 			n_tx_pkts++;
915 			slot += used_slots;
916 			n_free -= used_slots;
917 
918 			used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
919 				mask, slot, n_free);
920 			if (unlikely(used_slots < 1))
921 				goto no_free_slots;
922 			n_tx_pkts++;
923 			slot += used_slots;
924 			n_free -= used_slots;
925 
926 			used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
927 				mask, slot, n_free);
928 			if (unlikely(used_slots < 1))
929 				goto no_free_slots;
930 			n_tx_pkts++;
931 			slot += used_slots;
932 			n_free -= used_slots;
933 
934 			used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
935 				mask, slot, n_free);
936 			if (unlikely(used_slots < 1))
937 				goto no_free_slots;
938 			n_tx_pkts++;
939 			slot += used_slots;
940 			n_free -= used_slots;
941 		}
942 		used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
943 			mask, slot, n_free);
944 		if (unlikely(used_slots < 1))
945 			goto no_free_slots;
946 		n_tx_pkts++;
947 		slot += used_slots;
948 		n_free -= used_slots;
949 	}
950 
951 no_free_slots:
952 	/* ring type always MEMIF_RING_C2S */
953 	/* The ring->head acts as a guard variable between Tx and Rx
954 	 * threads, so this store-release pairs with the load-acquire
955 	 * in function eth_memif_rx for C2S rings.
956 	 */
957 	rte_atomic_store_explicit(&ring->head, slot, rte_memory_order_release);
958 
959 	/* Send interrupt, if enabled. */
960 	if (((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) &&
961 	    (rte_intr_fd_get(mq->intr_handle) >= 0)) {
962 		uint64_t a = 1;
963 
965 		ssize_t size = write(rte_intr_fd_get(mq->intr_handle),
966 				     &a, sizeof(a));
967 		if (unlikely(size < 0)) {
968 			MIF_LOG(WARNING,
969 				"Failed to send interrupt. %s", strerror(errno));
970 		}
971 	}
972 
973 	/* increment queue counters */
974 	mq->n_pkts += n_tx_pkts;
975 
976 	return n_tx_pkts;
977 }
978 
979 void
980 memif_free_regions(struct rte_eth_dev *dev)
981 {
982 	struct pmd_process_private *proc_private = dev->process_private;
983 	struct pmd_internals *pmd = dev->data->dev_private;
984 	int i;
985 	struct memif_region *r;
986 
987 	/* regions are allocated contiguously, so it's
988 	 * enough to loop until 'proc_private->regions_num'
989 	 */
990 	for (i = 0; i < proc_private->regions_num; i++) {
991 		r = proc_private->regions[i];
992 		if (r != NULL) {
993 			/* Zero-copy regions (idx > 0) map DPDK memseg memory:
			 * do not munmap them, just close the fd.
			 */
994 			if (i > 0 && (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY)) {
995 				r->addr = NULL;
996 				if (r->fd > 0)
997 					close(r->fd);
998 			}
999 			if (r->addr != NULL) {
1000 				munmap(r->addr, r->region_size);
1001 				if (r->fd > 0) {
1002 					close(r->fd);
1003 					r->fd = -1;
1004 				}
1005 			}
1006 			rte_free(r);
1007 			proc_private->regions[i] = NULL;
1008 		}
1009 	}
1010 	proc_private->regions_num = 0;
1011 }
1012 
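/*
 * rte_memseg_walk() callback used in zero-copy mode: each memseg list is
 * exposed as one additional memif region. Memsegs belonging to the list
 * already being tracked grow the current region; a new base VA starts the
 * next region using the memseg's file descriptor.
 */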
1013 static int
1014 memif_region_init_zc(const struct rte_memseg_list *msl, const struct rte_memseg *ms,
1015 		     void *arg)
1016 {
1017 	struct pmd_process_private *proc_private = (struct pmd_process_private *)arg;
1018 	struct memif_region *r;
1019 
1020 	if (proc_private->regions_num < 1) {
1021 		MIF_LOG(ERR, "Missing descriptor region");
1022 		return -1;
1023 	}
1024 
1025 	r = proc_private->regions[proc_private->regions_num - 1];
1026 
1027 	if (r->addr != msl->base_va)
1028 		r = proc_private->regions[++proc_private->regions_num - 1];
1029 
1030 	if (r == NULL) {
1031 		r = rte_zmalloc("region", sizeof(struct memif_region), 0);
1032 		if (r == NULL) {
1033 			MIF_LOG(ERR, "Failed to alloc memif region.");
1034 			return -ENOMEM;
1035 		}
1036 
1037 		r->addr = msl->base_va;
1038 		r->region_size = ms->len;
1039 		r->fd = rte_memseg_get_fd(ms);
1040 		if (r->fd < 0)
1041 			return -1;
1042 		r->pkt_buffer_offset = 0;
1043 
1044 		proc_private->regions[proc_private->regions_num - 1] = r;
1045 	} else {
1046 		r->region_size += ms->len;
1047 	}
1048 
1049 	return 0;
1050 }
1051 
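/*
 * Create one driver-owned shared-memory region: a memfd sealed against
 * shrinking, sized to hold all ring headers and descriptors and, when
 * has_buffers is set, the packet buffers as well. The region is mapped here
 * and its fd is later shared with the peer.
 */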
1052 static int
1053 memif_region_init_shm(struct rte_eth_dev *dev, uint8_t has_buffers)
1054 {
1055 	struct pmd_internals *pmd = dev->data->dev_private;
1056 	struct pmd_process_private *proc_private = dev->process_private;
1057 	char shm_name[ETH_MEMIF_SHM_NAME_SIZE];
1058 	int ret = 0;
1059 	struct memif_region *r;
1060 
1061 	if (proc_private->regions_num >= ETH_MEMIF_MAX_REGION_NUM) {
1062 		MIF_LOG(ERR, "Too many regions.");
1063 		return -1;
1064 	}
1065 
1066 	r = rte_zmalloc("region", sizeof(struct memif_region), 0);
1067 	if (r == NULL) {
1068 		MIF_LOG(ERR, "Failed to alloc memif region.");
1069 		return -ENOMEM;
1070 	}
1071 
1072 	/* calculate buffer offset */
1073 	r->pkt_buffer_offset = (pmd->run.num_c2s_rings + pmd->run.num_s2c_rings) *
1074 	    (sizeof(memif_ring_t) + sizeof(memif_desc_t) *
1075 	    (1 << pmd->run.log2_ring_size));
1076 
1077 	r->region_size = r->pkt_buffer_offset;
1078 	/* if region has buffers, add buffers size to region_size */
1079 	if (has_buffers == 1)
1080 		r->region_size += (uint32_t)(pmd->run.pkt_buffer_size *
1081 			(1 << pmd->run.log2_ring_size) *
1082 			(pmd->run.num_c2s_rings +
1083 			 pmd->run.num_s2c_rings));
1084 
1085 	memset(shm_name, 0, sizeof(char) * ETH_MEMIF_SHM_NAME_SIZE);
1086 	snprintf(shm_name, ETH_MEMIF_SHM_NAME_SIZE, "memif_region_%d",
1087 		 proc_private->regions_num);
1088 
1089 	r->fd = memfd_create(shm_name, MFD_ALLOW_SEALING);
1090 	if (r->fd < 0) {
1091 		MIF_LOG(ERR, "Failed to create shm file: %s.", strerror(errno));
1092 		ret = -1;
1093 		goto error;
1094 	}
1095 
1096 	ret = fcntl(r->fd, F_ADD_SEALS, F_SEAL_SHRINK);
1097 	if (ret < 0) {
1098 		MIF_LOG(ERR, "Failed to add seals to shm file: %s.", strerror(errno));
1099 		goto error;
1100 	}
1101 
1102 	ret = ftruncate(r->fd, r->region_size);
1103 	if (ret < 0) {
1104 		MIF_LOG(ERR, "Failed to truncate shm file: %s.", strerror(errno));
1105 		goto error;
1106 	}
1107 
1108 	r->addr = mmap(NULL, r->region_size, PROT_READ |
1109 		       PROT_WRITE, MAP_SHARED, r->fd, 0);
1110 	if (r->addr == MAP_FAILED) {
1111 		MIF_LOG(ERR, "Failed to mmap shm region: %s.", strerror(errno));
1112 		ret = -1;
1113 		goto error;
1114 	}
1115 
1116 	proc_private->regions[proc_private->regions_num] = r;
1117 	proc_private->regions_num++;
1118 
1119 	return ret;
1120 
1121 error:
1122 	if (r->fd > 0)
1123 		close(r->fd);
1124 	r->fd = -1;
1125 
1126 	return ret;
1127 }
1128 
1129 static int
1130 memif_regions_init(struct rte_eth_dev *dev)
1131 {
1132 	struct pmd_internals *pmd = dev->data->dev_private;
1133 	int ret;
1134 
1135 	/*
1136 	 * Zero-copy exposes DPDK memory to the peer.
1137 	 * Each memseg list is represented by a memif region.
1138 	 * Zero-copy regions are indexed from memseg list idx + 1,
1139 	 * since region 0 is already reserved for descriptors.
1140 	 */
1141 	if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
1142 		/* create region idx 0 containing descriptors */
1143 		ret = memif_region_init_shm(dev, 0);
1144 		if (ret < 0)
1145 			return ret;
1146 		ret = rte_memseg_walk(memif_region_init_zc, (void *)dev->process_private);
1147 		if (ret < 0)
1148 			return ret;
1149 	} else {
1150 		/* create one memory region containing rings and buffers */
1151 		ret = memif_region_init_shm(dev, /* has buffers */ 1);
1152 		if (ret < 0)
1153 			return ret;
1154 	}
1155 
1156 	return 0;
1157 }
1158 
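/*
 * Initialize every ring header in region 0 (head, tail, cookie, flags).
 * In copy mode each descriptor is also pointed at its fixed packet-buffer
 * slot behind pkt_buffer_offset; in zero-copy mode descriptors are filled
 * in at runtime instead.
 */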
1159 static void
1160 memif_init_rings(struct rte_eth_dev *dev)
1161 {
1162 	struct pmd_internals *pmd = dev->data->dev_private;
1163 	struct pmd_process_private *proc_private = dev->process_private;
1164 	memif_ring_t *ring;
1165 	int i, j;
1166 	uint16_t slot;
1167 
1168 	for (i = 0; i < pmd->run.num_c2s_rings; i++) {
1169 		ring = memif_get_ring(pmd, proc_private, MEMIF_RING_C2S, i);
1170 		rte_atomic_store_explicit(&ring->head, 0, rte_memory_order_relaxed);
1171 		rte_atomic_store_explicit(&ring->tail, 0, rte_memory_order_relaxed);
1172 		ring->cookie = MEMIF_COOKIE;
1173 		ring->flags = 0;
1174 
1175 		if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY)
1176 			continue;
1177 
1178 		for (j = 0; j < (1 << pmd->run.log2_ring_size); j++) {
1179 			slot = i * (1 << pmd->run.log2_ring_size) + j;
1180 			ring->desc[j].region = 0;
1181 			ring->desc[j].offset =
1182 				proc_private->regions[0]->pkt_buffer_offset +
1183 				(uint32_t)(slot * pmd->run.pkt_buffer_size);
1184 			ring->desc[j].length = pmd->run.pkt_buffer_size;
1185 		}
1186 	}
1187 
1188 	for (i = 0; i < pmd->run.num_s2c_rings; i++) {
1189 		ring = memif_get_ring(pmd, proc_private, MEMIF_RING_S2C, i);
1190 		rte_atomic_store_explicit(&ring->head, 0, rte_memory_order_relaxed);
1191 		rte_atomic_store_explicit(&ring->tail, 0, rte_memory_order_relaxed);
1192 		ring->cookie = MEMIF_COOKIE;
1193 		ring->flags = 0;
1194 
1195 		if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY)
1196 			continue;
1197 
1198 		for (j = 0; j < (1 << pmd->run.log2_ring_size); j++) {
1199 			slot = (i + pmd->run.num_c2s_rings) *
1200 			    (1 << pmd->run.log2_ring_size) + j;
1201 			ring->desc[j].region = 0;
1202 			ring->desc[j].offset =
1203 				proc_private->regions[0]->pkt_buffer_offset +
1204 				(uint32_t)(slot * pmd->run.pkt_buffer_size);
1205 			ring->desc[j].length = pmd->run.pkt_buffer_size;
1206 		}
1207 	}
1208 }
1209 
1210 /* called only by client */
1211 static int
1212 memif_init_queues(struct rte_eth_dev *dev)
1213 {
1214 	struct pmd_internals *pmd = dev->data->dev_private;
1215 	struct memif_queue *mq;
1216 	int i;
1217 
1218 	for (i = 0; i < pmd->run.num_c2s_rings; i++) {
1219 		mq = dev->data->tx_queues[i];
1220 		mq->log2_ring_size = pmd->run.log2_ring_size;
1221 		/* queues located only in region 0 */
1222 		mq->region = 0;
1223 		mq->ring_offset = memif_get_ring_offset(dev, mq, MEMIF_RING_C2S, i);
1224 		mq->last_head = 0;
1225 		mq->last_tail = 0;
1226 		if (rte_intr_fd_set(mq->intr_handle, eventfd(0, EFD_NONBLOCK)))
1227 			return -rte_errno;
1228 
1229 		if (rte_intr_fd_get(mq->intr_handle) < 0) {
1230 			MIF_LOG(WARNING,
1231 				"Failed to create eventfd for tx queue %d: %s.", i,
1232 				strerror(errno));
1233 		}
1234 		mq->buffers = NULL;
1235 		if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
1236 			mq->buffers = rte_zmalloc("bufs", sizeof(struct rte_mbuf *) *
1237 						  (1 << mq->log2_ring_size), 0);
1238 			if (mq->buffers == NULL)
1239 				return -ENOMEM;
1240 		}
1241 	}
1242 
1243 	for (i = 0; i < pmd->run.num_s2c_rings; i++) {
1244 		mq = dev->data->rx_queues[i];
1245 		mq->log2_ring_size = pmd->run.log2_ring_size;
1246 		/* queues located only in region 0 */
1247 		mq->region = 0;
1248 		mq->ring_offset = memif_get_ring_offset(dev, mq, MEMIF_RING_S2C, i);
1249 		mq->last_head = 0;
1250 		mq->last_tail = 0;
1251 		if (rte_intr_fd_set(mq->intr_handle, eventfd(0, EFD_NONBLOCK)))
1252 			return -rte_errno;
1253 		if (rte_intr_fd_get(mq->intr_handle) < 0) {
1254 			MIF_LOG(WARNING,
1255 				"Failed to create eventfd for rx queue %d: %s.", i,
1256 				strerror(errno));
1257 		}
1258 		mq->buffers = NULL;
1259 		if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
1260 			/*
1261 			 * Allocate 2x ring_size to reserve a contiguous array for
1262 			 * rte_pktmbuf_alloc_bulk (to store allocated mbufs).
1263 			 */
1264 			mq->buffers = rte_zmalloc("bufs", sizeof(struct rte_mbuf *) *
1265 						  (1 << (mq->log2_ring_size + 1)), 0);
1266 			if (mq->buffers == NULL)
1267 				return -ENOMEM;
1268 		}
1269 	}
1270 	return 0;
1271 }
1272 
1273 int
1274 memif_init_regions_and_queues(struct rte_eth_dev *dev)
1275 {
1276 	int ret;
1277 
1278 	ret = memif_regions_init(dev);
1279 	if (ret < 0)
1280 		return ret;
1281 
1282 	memif_init_rings(dev);
1283 
1284 	ret = memif_init_queues(dev);
1285 	if (ret < 0)
1286 		return ret;
1287 
1288 	return 0;
1289 }
1290 
1291 int
1292 memif_connect(struct rte_eth_dev *dev)
1293 {
1294 	struct pmd_internals *pmd = dev->data->dev_private;
1295 	struct pmd_process_private *proc_private = dev->process_private;
1296 	struct memif_region *mr;
1297 	struct memif_queue *mq;
1298 	memif_ring_t *ring;
1299 	int i;
1300 
1301 	for (i = 0; i < proc_private->regions_num; i++) {
1302 		mr = proc_private->regions[i];
1303 		if (mr != NULL) {
1304 			if (mr->addr == NULL) {
1305 				if (mr->fd < 0)
1306 					return -1;
1307 				mr->addr = mmap(NULL, mr->region_size,
1308 						PROT_READ | PROT_WRITE,
1309 						MAP_SHARED, mr->fd, 0);
1310 				if (mr->addr == MAP_FAILED) {
1311 					MIF_LOG(ERR, "mmap failed: %s",
1312 						strerror(errno));
1313 					return -1;
1314 				}
1315 			}
1316 			if (i > 0 && (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY)) {
1317 				/* close memseg file */
1318 				close(mr->fd);
1319 				mr->fd = -1;
1320 			}
1321 		}
1322 	}
1323 
1324 	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
1325 		for (i = 0; i < pmd->run.num_c2s_rings; i++) {
1326 			mq = (pmd->role == MEMIF_ROLE_CLIENT) ?
1327 			    dev->data->tx_queues[i] : dev->data->rx_queues[i];
1328 			ring = memif_get_ring_from_queue(proc_private, mq);
1329 			if (ring == NULL || ring->cookie != MEMIF_COOKIE) {
1330 				MIF_LOG(ERR, "Wrong ring");
1331 				return -1;
1332 			}
1333 			rte_atomic_store_explicit(&ring->head, 0, rte_memory_order_relaxed);
1334 			rte_atomic_store_explicit(&ring->tail, 0, rte_memory_order_relaxed);
1335 			mq->last_head = 0;
1336 			mq->last_tail = 0;
1337 			/* enable polling mode */
1338 			if (pmd->role == MEMIF_ROLE_SERVER)
1339 				ring->flags = MEMIF_RING_FLAG_MASK_INT;
1340 		}
1341 		for (i = 0; i < pmd->run.num_s2c_rings; i++) {
1342 			mq = (pmd->role == MEMIF_ROLE_CLIENT) ?
1343 			    dev->data->rx_queues[i] : dev->data->tx_queues[i];
1344 			ring = memif_get_ring_from_queue(proc_private, mq);
1345 			if (ring == NULL || ring->cookie != MEMIF_COOKIE) {
1346 				MIF_LOG(ERR, "Wrong ring");
1347 				return -1;
1348 			}
1349 			rte_atomic_store_explicit(&ring->head, 0, rte_memory_order_relaxed);
1350 			rte_atomic_store_explicit(&ring->tail, 0, rte_memory_order_relaxed);
1351 			mq->last_head = 0;
1352 			mq->last_tail = 0;
1353 			/* enable polling mode */
1354 			if (pmd->role == MEMIF_ROLE_CLIENT)
1355 				ring->flags = MEMIF_RING_FLAG_MASK_INT;
1356 		}
1357 
1358 		pmd->flags &= ~ETH_MEMIF_FLAG_CONNECTING;
1359 		pmd->flags |= ETH_MEMIF_FLAG_CONNECTED;
1360 		dev->data->dev_link.link_status = RTE_ETH_LINK_UP;
1361 	}
1362 	MIF_LOG(INFO, "Connected.");
1363 	return 0;
1364 }
1365 
1366 static int
1367 memif_dev_start(struct rte_eth_dev *dev)
1368 {
1369 	struct pmd_internals *pmd = dev->data->dev_private;
1370 	int ret = 0;
1371 	uint16_t i;
1372 
1373 	switch (pmd->role) {
1374 	case MEMIF_ROLE_CLIENT:
1375 		ret = memif_connect_client(dev);
1376 		break;
1377 	case MEMIF_ROLE_SERVER:
1378 		ret = memif_connect_server(dev);
1379 		break;
1380 	default:
1381 		MIF_LOG(ERR, "Unknown role: %d.", pmd->role);
1382 		ret = -1;
1383 		break;
1384 	}
1385 
1386 	if (ret == 0) {
1387 		for (i = 0; i < dev->data->nb_rx_queues; i++)
1388 			dev->data->rx_queue_state[i] = RTE_ETH_QUEUE_STATE_STARTED;
1389 		for (i = 0; i < dev->data->nb_tx_queues; i++)
1390 			dev->data->tx_queue_state[i] = RTE_ETH_QUEUE_STATE_STARTED;
1391 	}
1392 
1393 	return ret;
1394 }
1395 
1396 static int
1397 memif_dev_stop(struct rte_eth_dev *dev)
1398 {
1399 	uint16_t i;
1400 
1401 	memif_disconnect(dev);
1402 
1403 	for (i = 0; i < dev->data->nb_rx_queues; i++)
1404 		dev->data->rx_queue_state[i] = RTE_ETH_QUEUE_STATE_STOPPED;
1405 	for (i = 0; i < dev->data->nb_tx_queues; i++)
1406 		dev->data->tx_queue_state[i] = RTE_ETH_QUEUE_STATE_STOPPED;
1407 
1408 	return 0;
1409 }
1410 
1411 static int
1412 memif_dev_close(struct rte_eth_dev *dev)
1413 {
1414 	struct pmd_internals *pmd = dev->data->dev_private;
1415 	int i;
1416 
1417 	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
1418 		memif_msg_enq_disconnect(pmd->cc, "Device closed", 0);
1419 
1420 		for (i = 0; i < dev->data->nb_rx_queues; i++)
1421 			(*dev->dev_ops->rx_queue_release)(dev, i);
1422 		for (i = 0; i < dev->data->nb_tx_queues; i++)
1423 			(*dev->dev_ops->tx_queue_release)(dev, i);
1424 
1425 		memif_socket_remove_device(dev);
1426 	}
1427 
1428 	rte_free(dev->process_private);
1429 
1430 	return 0;
1431 }
1432 
1433 static int
1434 memif_dev_configure(struct rte_eth_dev *dev)
1435 {
1436 	struct pmd_internals *pmd = dev->data->dev_private;
1437 
1438 	/*
1439 	 * C2S rings are Tx queues on the client
1440 	 * and Rx queues on the server.
1441 	 */
1442 	pmd->cfg.num_c2s_rings = (pmd->role == MEMIF_ROLE_CLIENT) ?
1443 				  dev->data->nb_tx_queues : dev->data->nb_rx_queues;
1444 
1445 	/*
1446 	 * S2C rings are Rx queues on the client
1447 	 * and Tx queues on the server.
1448 	 */
1449 	pmd->cfg.num_s2c_rings = (pmd->role == MEMIF_ROLE_CLIENT) ?
1450 				  dev->data->nb_rx_queues : dev->data->nb_tx_queues;
1451 
1452 	return 0;
1453 }
1454 
1455 static int
1456 memif_tx_queue_setup(struct rte_eth_dev *dev,
1457 		     uint16_t qid,
1458 		     uint16_t nb_tx_desc __rte_unused,
1459 		     unsigned int socket_id __rte_unused,
1460 		     const struct rte_eth_txconf *tx_conf __rte_unused)
1461 {
1462 	struct pmd_internals *pmd = dev->data->dev_private;
1463 	struct memif_queue *mq;
1464 
1465 	mq = rte_zmalloc("tx-queue", sizeof(struct memif_queue), 0);
1466 	if (mq == NULL) {
1467 		MIF_LOG(ERR, "Failed to allocate tx queue id: %u", qid);
1468 		return -ENOMEM;
1469 	}
1470 
1471 	/* Allocate interrupt instance */
1472 	mq->intr_handle = rte_intr_instance_alloc(RTE_INTR_INSTANCE_F_SHARED);
1473 	if (mq->intr_handle == NULL) {
1474 		MIF_LOG(ERR, "Failed to allocate intr handle");
1475 		return -ENOMEM;
1476 	}
1477 
1478 	mq->type =
1479 	    (pmd->role == MEMIF_ROLE_CLIENT) ? MEMIF_RING_C2S : MEMIF_RING_S2C;
1480 	mq->n_pkts = 0;
1481 	mq->n_bytes = 0;
1482 
1483 	if (rte_intr_fd_set(mq->intr_handle, -1))
1484 		return -rte_errno;
1485 
1486 	if (rte_intr_type_set(mq->intr_handle, RTE_INTR_HANDLE_EXT))
1487 		return -rte_errno;
1488 
1489 	mq->in_port = dev->data->port_id;
1490 	dev->data->tx_queues[qid] = mq;
1491 
1492 	return 0;
1493 }
1494 
1495 static int
1496 memif_rx_queue_setup(struct rte_eth_dev *dev,
1497 		     uint16_t qid,
1498 		     uint16_t nb_rx_desc __rte_unused,
1499 		     unsigned int socket_id __rte_unused,
1500 		     const struct rte_eth_rxconf *rx_conf __rte_unused,
1501 		     struct rte_mempool *mb_pool)
1502 {
1503 	struct pmd_internals *pmd = dev->data->dev_private;
1504 	struct memif_queue *mq;
1505 
1506 	mq = rte_zmalloc("rx-queue", sizeof(struct memif_queue), 0);
1507 	if (mq == NULL) {
1508 		MIF_LOG(ERR, "Failed to allocate rx queue id: %u", qid);
1509 		return -ENOMEM;
1510 	}
1511 
1512 	/* Allocate interrupt instance */
1513 	mq->intr_handle = rte_intr_instance_alloc(RTE_INTR_INSTANCE_F_SHARED);
1514 	if (mq->intr_handle == NULL) {
1515 		MIF_LOG(ERR, "Failed to allocate intr handle");
1516 		return -ENOMEM;
1517 	}
1518 
1519 	mq->type = (pmd->role == MEMIF_ROLE_CLIENT) ? MEMIF_RING_S2C : MEMIF_RING_C2S;
1520 	mq->n_pkts = 0;
1521 	mq->n_bytes = 0;
1522 
1523 	if (rte_intr_fd_set(mq->intr_handle, -1))
1524 		return -rte_errno;
1525 
1526 	if (rte_intr_type_set(mq->intr_handle, RTE_INTR_HANDLE_EXT))
1527 		return -rte_errno;
1528 
1529 	mq->mempool = mb_pool;
1530 	mq->in_port = dev->data->port_id;
1531 	dev->data->rx_queues[qid] = mq;
1532 
1533 	return 0;
1534 }
1535 
1536 static void
1537 memif_rx_queue_release(struct rte_eth_dev *dev, uint16_t qid)
1538 {
1539 	struct memif_queue *mq = dev->data->rx_queues[qid];
1540 
1541 	if (!mq)
1542 		return;
1543 
1544 	rte_intr_instance_free(mq->intr_handle);
1545 	rte_free(mq);
1546 }
1547 
1548 static void
1549 memif_tx_queue_release(struct rte_eth_dev *dev, uint16_t qid)
1550 {
1551 	struct memif_queue *mq = dev->data->tx_queues[qid];
1552 
1553 	if (!mq)
1554 		return;
1555 
1556 	rte_free(mq);
1557 }
1558 
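/*
 * In a secondary process, link_update doubles as the point where the shared
 * regions are requested and mapped (link up) or released (link down) over
 * the multi-process channel. The primary updates dev->data->dev_link
 * elsewhere (memif_connect() sets it up), so nothing needs to be done here.
 */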
1559 static int
1560 memif_link_update(struct rte_eth_dev *dev,
1561 		  int wait_to_complete __rte_unused)
1562 {
1563 	struct pmd_process_private *proc_private;
1564 
1565 	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
1566 		proc_private = dev->process_private;
1567 		if (dev->data->dev_link.link_status == RTE_ETH_LINK_UP &&
1568 				proc_private->regions_num == 0) {
1569 			memif_mp_request_regions(dev);
1570 		} else if (dev->data->dev_link.link_status == RTE_ETH_LINK_DOWN &&
1571 				proc_private->regions_num > 0) {
1572 			memif_free_regions(dev);
1573 		}
1574 	}
1575 	return 0;
1576 }
1577 
1578 static int
1579 memif_stats_get(struct rte_eth_dev *dev, struct rte_eth_stats *stats)
1580 {
1581 	struct pmd_internals *pmd = dev->data->dev_private;
1582 	struct memif_queue *mq;
1583 	int i;
1584 	uint8_t tmp, nq;
1585 
1586 	stats->ipackets = 0;
1587 	stats->ibytes = 0;
1588 	stats->opackets = 0;
1589 	stats->obytes = 0;
1590 
1591 	tmp = (pmd->role == MEMIF_ROLE_CLIENT) ? pmd->run.num_s2c_rings :
1592 	    pmd->run.num_c2s_rings;
1593 	nq = (tmp < RTE_ETHDEV_QUEUE_STAT_CNTRS) ? tmp :
1594 	    RTE_ETHDEV_QUEUE_STAT_CNTRS;
1595 
1596 	/* RX stats */
1597 	for (i = 0; i < nq; i++) {
1598 		mq = dev->data->rx_queues[i];
1599 		stats->q_ipackets[i] = mq->n_pkts;
1600 		stats->q_ibytes[i] = mq->n_bytes;
1601 		stats->ipackets += mq->n_pkts;
1602 		stats->ibytes += mq->n_bytes;
1603 	}
1604 
1605 	tmp = (pmd->role == MEMIF_ROLE_CLIENT) ? pmd->run.num_c2s_rings :
1606 	    pmd->run.num_s2c_rings;
1607 	nq = (tmp < RTE_ETHDEV_QUEUE_STAT_CNTRS) ? tmp :
1608 	    RTE_ETHDEV_QUEUE_STAT_CNTRS;
1609 
1610 	/* TX stats */
1611 	for (i = 0; i < nq; i++) {
1612 		mq = dev->data->tx_queues[i];
1613 		stats->q_opackets[i] = mq->n_pkts;
1614 		stats->q_obytes[i] = mq->n_bytes;
1615 		stats->opackets += mq->n_pkts;
1616 		stats->obytes += mq->n_bytes;
1617 	}
1618 	return 0;
1619 }
1620 
1621 static int
1622 memif_stats_reset(struct rte_eth_dev *dev)
1623 {
1624 	struct pmd_internals *pmd = dev->data->dev_private;
1625 	int i;
1626 	struct memif_queue *mq;
1627 
1628 	for (i = 0; i < pmd->run.num_c2s_rings; i++) {
1629 		mq = (pmd->role == MEMIF_ROLE_CLIENT) ? dev->data->tx_queues[i] :
1630 		    dev->data->rx_queues[i];
1631 		mq->n_pkts = 0;
1632 		mq->n_bytes = 0;
1633 	}
1634 	for (i = 0; i < pmd->run.num_s2c_rings; i++) {
1635 		mq = (pmd->role == MEMIF_ROLE_CLIENT) ? dev->data->rx_queues[i] :
1636 		    dev->data->tx_queues[i];
1637 		mq->n_pkts = 0;
1638 		mq->n_bytes = 0;
1639 	}
1640 
1641 	return 0;
1642 }
1643 
1644 static const struct eth_dev_ops ops = {
1645 	.dev_start = memif_dev_start,
1646 	.dev_stop = memif_dev_stop,
1647 	.dev_close = memif_dev_close,
1648 	.dev_infos_get = memif_dev_info,
1649 	.dev_configure = memif_dev_configure,
1650 	.tx_queue_setup = memif_tx_queue_setup,
1651 	.rx_queue_setup = memif_rx_queue_setup,
1652 	.rx_queue_release = memif_rx_queue_release,
1653 	.tx_queue_release = memif_tx_queue_release,
1654 	.link_update = memif_link_update,
1655 	.stats_get = memif_stats_get,
1656 	.stats_reset = memif_stats_reset,
1657 };
1658 
1659 static int
1660 memif_create(struct rte_vdev_device *vdev, enum memif_role_t role,
1661 	     memif_interface_id_t id, uint32_t flags,
1662 	     const char *socket_filename, uid_t owner_uid, gid_t owner_gid,
1663 	     memif_log2_ring_size_t log2_ring_size,
1664 	     uint16_t pkt_buffer_size, const char *secret,
1665 	     struct rte_ether_addr *ether_addr)
1666 {
1667 	int ret = 0;
1668 	struct rte_eth_dev *eth_dev;
1669 	struct rte_eth_dev_data *data;
1670 	struct pmd_internals *pmd;
1671 	struct pmd_process_private *process_private;
1672 	const unsigned int numa_node = vdev->device.numa_node;
1673 	const char *name = rte_vdev_device_name(vdev);
1674 
1675 	eth_dev = rte_eth_vdev_allocate(vdev, sizeof(*pmd));
1676 	if (eth_dev == NULL) {
1677 		MIF_LOG(ERR, "%s: Unable to allocate device struct.", name);
1678 		return -1;
1679 	}
1680 
1681 	process_private = (struct pmd_process_private *)
1682 		rte_zmalloc(name, sizeof(struct pmd_process_private),
1683 			    RTE_CACHE_LINE_SIZE);
1684 
1685 	if (process_private == NULL) {
1686 		MIF_LOG(ERR, "Failed to alloc memory for process private");
1687 		return -1;
1688 	}
1689 	eth_dev->process_private = process_private;
1690 
1691 	pmd = eth_dev->data->dev_private;
1692 	memset(pmd, 0, sizeof(*pmd));
1693 
1694 	pmd->id = id;
1695 	pmd->flags = flags;
1696 	pmd->flags |= ETH_MEMIF_FLAG_DISABLED;
1697 	pmd->role = role;
1698 	/* The zero-copy flag is irrelevant to the server. */
1699 	if (pmd->role == MEMIF_ROLE_SERVER)
1700 		pmd->flags &= ~ETH_MEMIF_FLAG_ZERO_COPY;
1701 	pmd->owner_uid = owner_uid;
1702 	pmd->owner_gid = owner_gid;
1703 
1704 	ret = memif_socket_init(eth_dev, socket_filename);
1705 	if (ret < 0)
1706 		return ret;
1707 
1708 	memset(pmd->secret, 0, sizeof(char) * ETH_MEMIF_SECRET_SIZE);
1709 	if (secret != NULL)
1710 		strlcpy(pmd->secret, secret, sizeof(pmd->secret));
1711 
1712 	pmd->cfg.log2_ring_size = log2_ring_size;
1713 	/* set in .dev_configure() */
1714 	pmd->cfg.num_c2s_rings = 0;
1715 	pmd->cfg.num_s2c_rings = 0;
1716 
1717 	pmd->cfg.pkt_buffer_size = pkt_buffer_size;
1718 	rte_spinlock_init(&pmd->cc_lock);
1719 
1720 	data = eth_dev->data;
1721 	data->dev_private = pmd;
1722 	data->numa_node = numa_node;
1723 	data->dev_link = pmd_link;
1724 	data->mac_addrs = ether_addr;
1725 	data->promiscuous = 1;
1726 	data->dev_flags |= RTE_ETH_DEV_AUTOFILL_QUEUE_XSTATS;
1727 
1728 	eth_dev->dev_ops = &ops;
1729 	eth_dev->device = &vdev->device;
1730 	if (pmd->flags & ETH_MEMIF_FLAG_ZERO_COPY) {
1731 		eth_dev->rx_pkt_burst = eth_memif_rx_zc;
1732 		eth_dev->tx_pkt_burst = eth_memif_tx_zc;
1733 	} else {
1734 		eth_dev->rx_pkt_burst = eth_memif_rx;
1735 		eth_dev->tx_pkt_burst = eth_memif_tx;
1736 	}
1737 
1738 	rte_eth_dev_probing_finish(eth_dev);
1739 
1740 	return 0;
1741 }
1742 
1743 static int
1744 memif_set_role(const char *key __rte_unused, const char *value,
1745 	       void *extra_args)
1746 {
1747 	enum memif_role_t *role = (enum memif_role_t *)extra_args;
1748 
1749 	if (strstr(value, "server") != NULL) {
1750 		*role = MEMIF_ROLE_SERVER;
1751 	} else if (strstr(value, "client") != NULL) {
1752 		*role = MEMIF_ROLE_CLIENT;
1753 	} else if (strstr(value, "master") != NULL) {
1754 		MIF_LOG(NOTICE, "Role argument \"master\" is deprecated, use \"server\"");
1755 		*role = MEMIF_ROLE_SERVER;
1756 	} else if (strstr(value, "slave") != NULL) {
1757 		MIF_LOG(NOTICE, "Role argument \"slave\" is deprecated, use \"client\"");
1758 		*role = MEMIF_ROLE_CLIENT;
1759 	} else {
1760 		MIF_LOG(ERR, "Unknown role: %s.", value);
1761 		return -EINVAL;
1762 	}
1763 	return 0;
1764 }
1765 
1766 static int
1767 memif_set_zc(const char *key __rte_unused, const char *value, void *extra_args)
1768 {
1769 	uint32_t *flags = (uint32_t *)extra_args;
1770 
1771 	if (strstr(value, "yes") != NULL) {
1772 		if (!rte_mcfg_get_single_file_segments()) {
1773 			MIF_LOG(ERR, "Zero-copy doesn't support multi-file segments.");
1774 			return -ENOTSUP;
1775 		}
1776 		*flags |= ETH_MEMIF_FLAG_ZERO_COPY;
1777 	} else if (strstr(value, "no") != NULL) {
1778 		*flags &= ~ETH_MEMIF_FLAG_ZERO_COPY;
1779 	} else {
1780 		MIF_LOG(ERR, "Failed to parse zero-copy param: %s.", value);
1781 		return -EINVAL;
1782 	}
1783 	return 0;
1784 }
1785 
1786 static int
1787 memif_set_id(const char *key __rte_unused, const char *value, void *extra_args)
1788 {
1789 	memif_interface_id_t *id = (memif_interface_id_t *)extra_args;
1790 
1791 	/* even if parsing fails, 0 is a valid id */
1792 	*id = strtoul(value, NULL, 10);
1793 	return 0;
1794 }
1795 
1796 static int
1797 memif_set_bs(const char *key __rte_unused, const char *value, void *extra_args)
1798 {
1799 	unsigned long tmp;
1800 	uint16_t *pkt_buffer_size = (uint16_t *)extra_args;
1801 
1802 	tmp = strtoul(value, NULL, 10);
1803 	if (tmp == 0 || tmp > 0xFFFF) {
1804 		MIF_LOG(ERR, "Invalid buffer size: %s.", value);
1805 		return -EINVAL;
1806 	}
1807 	*pkt_buffer_size = tmp;
1808 	return 0;
1809 }
1810 
1811 static int
1812 memif_set_rs(const char *key __rte_unused, const char *value, void *extra_args)
1813 {
1814 	unsigned long tmp;
1815 	memif_log2_ring_size_t *log2_ring_size =
1816 	    (memif_log2_ring_size_t *)extra_args;
1817 
1818 	tmp = strtoul(value, NULL, 10);
1819 	if (tmp == 0 || tmp > ETH_MEMIF_MAX_LOG2_RING_SIZE) {
1820 		MIF_LOG(ERR, "Invalid ring size: %s (max %u).",
1821 			value, ETH_MEMIF_MAX_LOG2_RING_SIZE);
1822 		return -EINVAL;
1823 	}
1824 	*log2_ring_size = tmp;
1825 	return 0;
1826 }
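
/*
 * "rsize" is a log2 value: the ring is created with 2^rsize descriptors, so
 * e.g. rsize=11 yields 2048 slots per ring, and anything above
 * ETH_MEMIF_MAX_LOG2_RING_SIZE is rejected here.
 */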
1827 
1828 /* check if directory exists and if we have permission to read/write */
1829 static int
1830 memif_check_socket_filename(const char *filename)
1831 {
1832 	char *dir = NULL, *tmp;
1833 	uint32_t idx;
1834 	int ret = 0;
1835 
1836 	if (strlen(filename) >= MEMIF_SOCKET_UN_SIZE) {
1837 		MIF_LOG(ERR, "Unix socket address too long (max %u).", (unsigned int)(MEMIF_SOCKET_UN_SIZE - 1));
1838 		return -1;
1839 	}
1840 
1841 	tmp = strrchr(filename, '/');
1842 	if (tmp != NULL) {
1843 		idx = tmp - filename;
1844 		dir = rte_zmalloc("memif_tmp", idx + 1, 0);
1845 		if (dir == NULL) {
1846 			MIF_LOG(ERR, "Failed to allocate memory.");
1847 			return -1;
1848 		}
1849 		strlcpy(dir, filename, idx + 1);
1850 	}
1851 
1852 	if (dir == NULL || (faccessat(AT_FDCWD, dir, F_OK | R_OK |
1853 					W_OK, AT_EACCESS) < 0)) {
1854 		MIF_LOG(ERR, "Invalid socket directory.");
1855 		ret = -EINVAL;
1856 	}
1857 
1858 	rte_free(dir);
1859 
1860 	return ret;
1861 }
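
/*
 * Illustrative inputs for the check above: "/run/memif/memif.sock" passes
 * when /run/memif exists and is readable and writable by this process, while
 * a bare "memif.sock" (no directory component) is rejected because dir stays
 * NULL. The paths are examples only.
 */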
1862 
1863 static int
1864 memif_set_socket_filename(const char *key __rte_unused, const char *value,
1865 			  void *extra_args)
1866 {
1867 	const char **socket_filename = (const char **)extra_args;
1868 
1869 	*socket_filename = value;
1870 	return 0;
1871 }
1872 
1873 static int
1874 memif_set_is_socket_abstract(const char *key __rte_unused, const char *value, void *extra_args)
1875 {
1876 	uint32_t *flags = (uint32_t *)extra_args;
1877 
1878 	if (strstr(value, "yes") != NULL) {
1879 		*flags |= ETH_MEMIF_FLAG_SOCKET_ABSTRACT;
1880 	} else if (strstr(value, "no") != NULL) {
1881 		*flags &= ~ETH_MEMIF_FLAG_SOCKET_ABSTRACT;
1882 	} else {
1883 		MIF_LOG(ERR, "Failed to parse socket-abstract param: %s.", value);
1884 		return -EINVAL;
1885 	}
1886 	return 0;
1887 }
1888 
1889 static int
1890 memif_set_owner(const char *key, const char *value, void *extra_args)
1891 {
1892 	RTE_ASSERT(sizeof(uid_t) == sizeof(uint32_t));
1893 	RTE_ASSERT(sizeof(gid_t) == sizeof(uint32_t));
1894 
1895 	unsigned long val;
1896 	char *end = NULL;
1897 	uint32_t *id = (uint32_t *)extra_args;
1898 
1899 	val = strtoul(value, &end, 10);
1900 	if (*value == '\0' || *end != '\0') {
1901 		MIF_LOG(ERR, "Failed to parse %s: %s.", key, value);
1902 		return -EINVAL;
1903 	}
1904 	if (val >= UINT32_MAX) {
1905 		MIF_LOG(ERR, "Invalid %s: %s.", key, value);
1906 		return -ERANGE;
1907 	}
1908 
1909 	*id = val;
1910 	return 0;
1911 }
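
/*
 * Both owner-uid and owner-gid go through this parser; values must fit in
 * uint32_t, and UINT32_MAX is rejected because it doubles as the "not set"
 * default in rte_pmd_memif_probe(). Example devargs (values are made up):
 *	owner-uid=1000,owner-gid=1000
 */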
1912 
1913 static int
1914 memif_set_mac(const char *key __rte_unused, const char *value, void *extra_args)
1915 {
1916 	struct rte_ether_addr *ether_addr = (struct rte_ether_addr *)extra_args;
1917 
1918 	if (rte_ether_unformat_addr(value, ether_addr) < 0)
1919 		MIF_LOG(WARNING, "Failed to parse mac '%s'.", value);
1920 	return 0;
1921 }
1922 
1923 static int
1924 memif_set_secret(const char *key __rte_unused, const char *value, void *extra_args)
1925 {
1926 	const char **secret = (const char **)extra_args;
1927 
1928 	*secret = value;
1929 	return 0;
1930 }
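
/*
 * The secret is an optional shared token used by the memif control-channel
 * handshake; as described in the memif documentation, a connection is only
 * accepted when both endpoints supply the same value (or neither does).
 * Example: secret=abc123 on both the server and the client.
 */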
1931 
1932 static int
1933 rte_pmd_memif_probe(struct rte_vdev_device *vdev)
1934 {
1935 	RTE_BUILD_BUG_ON(sizeof(memif_msg_t) != 128);
1936 	RTE_BUILD_BUG_ON(sizeof(memif_desc_t) != 16);
1937 	int ret = 0;
1938 	struct rte_kvargs *kvlist;
1939 	const char *name = rte_vdev_device_name(vdev);
1940 	enum memif_role_t role = MEMIF_ROLE_CLIENT;
1941 	memif_interface_id_t id = 0;
1942 	uint16_t pkt_buffer_size = ETH_MEMIF_DEFAULT_PKT_BUFFER_SIZE;
1943 	memif_log2_ring_size_t log2_ring_size = ETH_MEMIF_DEFAULT_RING_SIZE;
1944 	const char *socket_filename = ETH_MEMIF_DEFAULT_SOCKET_FILENAME;
1945 	uid_t owner_uid = -1;
1946 	gid_t owner_gid = -1;
1947 	uint32_t flags = 0;
1948 	const char *secret = NULL;
1949 	struct rte_ether_addr *ether_addr = rte_zmalloc("",
1950 		sizeof(struct rte_ether_addr), 0);
1951 	struct rte_eth_dev *eth_dev;
1952 
	if (ether_addr == NULL) {
		MIF_LOG(ERR, "Failed to allocate memory for the MAC address");
		return -ENOMEM;
	}
1953 	rte_eth_random_addr(ether_addr->addr_bytes);
1954 
1955 	MIF_LOG(INFO, "Initializing memif: %s.", name);
1956 
1957 	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
1958 		eth_dev = rte_eth_dev_attach_secondary(name);
1959 		if (!eth_dev) {
1960 			MIF_LOG(ERR, "Failed to probe %s", name);
1961 			return -1;
1962 		}
1963 
1964 		eth_dev->dev_ops = &ops;
1965 		eth_dev->device = &vdev->device;
1966 		eth_dev->rx_pkt_burst = eth_memif_rx;
1967 		eth_dev->tx_pkt_burst = eth_memif_tx;
1968 
1969 		if (!rte_eal_primary_proc_alive(NULL)) {
1970 			MIF_LOG(ERR, "Primary process is missing");
1971 			return -1;
1972 		}
1973 
1974 		eth_dev->process_private = (struct pmd_process_private *)
1975 			rte_zmalloc(name,
1976 				sizeof(struct pmd_process_private),
1977 				RTE_CACHE_LINE_SIZE);
1978 		if (eth_dev->process_private == NULL) {
1979 			MIF_LOG(ERR,
1980 				"Failed to allocate memory for process private data");
1981 			return -1;
1982 		}
1983 
1984 		rte_eth_dev_probing_finish(eth_dev);
1985 
1986 		return 0;
1987 	}
1988 
1989 	ret = rte_mp_action_register(MEMIF_MP_SEND_REGION, memif_mp_send_region);
1990 	/*
1991 	 * The primary process can continue probing, but a secondary process
1992 	 * won't be able to get memory region information.
1993 	 */
1994 	if (ret < 0 && rte_errno != EEXIST)
1995 		MIF_LOG(WARNING, "Failed to register mp action callback: %s",
1996 			strerror(rte_errno));
1997 
1998 	/* use abstract address by default */
1999 	flags |= ETH_MEMIF_FLAG_SOCKET_ABSTRACT;
2000 
2001 	kvlist = rte_kvargs_parse(rte_vdev_device_args(vdev), valid_arguments);
2002 
2003 	/* parse parameters */
2004 	if (kvlist != NULL) {
2005 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_ROLE_ARG,
2006 					 &memif_set_role, &role);
2007 		if (ret < 0)
2008 			goto exit;
2009 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_ID_ARG,
2010 					 &memif_set_id, &id);
2011 		if (ret < 0)
2012 			goto exit;
2013 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_PKT_BUFFER_SIZE_ARG,
2014 					 &memif_set_bs, &pkt_buffer_size);
2015 		if (ret < 0)
2016 			goto exit;
2017 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_RING_SIZE_ARG,
2018 					 &memif_set_rs, &log2_ring_size);
2019 		if (ret < 0)
2020 			goto exit;
2021 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_SOCKET_ARG,
2022 					 &memif_set_socket_filename,
2023 					 (void *)(&socket_filename));
2024 		if (ret < 0)
2025 			goto exit;
2026 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_SOCKET_ABSTRACT_ARG,
2027 					 &memif_set_is_socket_abstract, &flags);
2028 		if (ret < 0)
2029 			goto exit;
2030 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_OWNER_UID_ARG,
2031 					 &memif_set_owner, &owner_uid);
2032 		if (ret < 0)
2033 			goto exit;
2034 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_OWNER_GID_ARG,
2035 					 &memif_set_owner, &owner_gid);
2036 		if (ret < 0)
2037 			goto exit;
2038 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_MAC_ARG,
2039 					 &memif_set_mac, ether_addr);
2040 		if (ret < 0)
2041 			goto exit;
2042 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_ZC_ARG,
2043 					 &memif_set_zc, &flags);
2044 		if (ret < 0)
2045 			goto exit;
2046 		ret = rte_kvargs_process(kvlist, ETH_MEMIF_SECRET_ARG,
2047 					 &memif_set_secret, (void *)(&secret));
2048 		if (ret < 0)
2049 			goto exit;
2050 	}
2051 
2052 	if (!(flags & ETH_MEMIF_FLAG_SOCKET_ABSTRACT)) {
2053 		ret = memif_check_socket_filename(socket_filename);
2054 		if (ret < 0)
2055 			goto exit;
2056 	}
2057 
2058 	/* create interface */
2059 	ret = memif_create(vdev, role, id, flags, socket_filename, owner_uid, owner_gid,
2060 			   log2_ring_size, pkt_buffer_size, secret, ether_addr);
2061 
2062 exit:
2063 	rte_kvargs_free(kvlist);
2064 	return ret;
2065 }
2066 
2067 static int
2068 rte_pmd_memif_remove(struct rte_vdev_device *vdev)
2069 {
2070 	struct rte_eth_dev *eth_dev;
2071 
2072 	eth_dev = rte_eth_dev_allocated(rte_vdev_device_name(vdev));
2073 	if (eth_dev == NULL)
2074 		return 0;
2075 
2076 	return rte_eth_dev_close(eth_dev->data->port_id);
2077 }
2078 
2079 static struct rte_vdev_driver pmd_memif_drv = {
2080 	.probe = rte_pmd_memif_probe,
2081 	.remove = rte_pmd_memif_remove,
2082 };
2083 
2084 RTE_PMD_REGISTER_VDEV(net_memif, pmd_memif_drv);
2085 
2086 RTE_PMD_REGISTER_PARAM_STRING(net_memif,
2087 			      ETH_MEMIF_ID_ARG "=<int>"
2088 			      ETH_MEMIF_ROLE_ARG "=server|client"
2089 			      ETH_MEMIF_PKT_BUFFER_SIZE_ARG "=<int>"
2090 			      ETH_MEMIF_RING_SIZE_ARG "=<int>"
2091 			      ETH_MEMIF_SOCKET_ARG "=<string>"
2092 			      ETH_MEMIF_SOCKET_ABSTRACT_ARG "=yes|no"
2093 			      ETH_MEMIF_OWNER_UID_ARG "=<int>"
2094 			      ETH_MEMIF_OWNER_GID_ARG "=<int>"
2095 			      ETH_MEMIF_MAC_ARG "=xx:xx:xx:xx:xx:xx"
2096 			      ETH_MEMIF_ZC_ARG "=yes|no"
2097 			      ETH_MEMIF_SECRET_ARG "=<string>");
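
/*
 * Illustrative end-to-end usage of the devargs registered above; device
 * names, core lists and file prefixes are examples only:
 *
 *	dpdk-testpmd -l 0-1 --file-prefix=srv --vdev=net_memif0,role=server,id=0,bsize=2048,rsize=11
 *	dpdk-testpmd -l 2-3 --file-prefix=cln --vdev=net_memif0,role=client,id=0,bsize=2048,rsize=11
 *
 * The same port can also be created at run time from application code, e.g.:
 *
 *	if (rte_vdev_init("net_memif0", "role=client,id=0") < 0)
 *		rte_exit(EXIT_FAILURE, "cannot create memif port\n");
 */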
2098 
2099 RTE_LOG_REGISTER_DEFAULT(memif_logtype, NOTICE);
2100