xref: /spdk/module/sock/posix/posix.c (revision d73077b84a71985da1db1c9847ea7c042189bae2)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation. All rights reserved.
5  *   Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #if defined(__linux__)
37 #include <sys/epoll.h>
38 #include <linux/errqueue.h>
39 #elif defined(__FreeBSD__)
40 #include <sys/event.h>
41 #endif
42 
43 #include "spdk/log.h"
44 #include "spdk/pipe.h"
45 #include "spdk/sock.h"
46 #include "spdk/util.h"
47 #include "spdk/likely.h"
48 #include "spdk_internal/sock.h"
49 
/* Capacity of the scratch buffer that holds a bracket-stripped IP string. */
#define MAX_TMPBUF 1024
/* Capacity of the buffer that holds the port number rendered as text. */
#define PORTNUMLEN 32
/* Upper bound on the number of iovec entries handed to one sendmsg() call. */
#define IOV_BATCH_SIZE 64

/* Zero-copy send support is compiled in only when the system headers
 * provide both SO_ZEROCOPY and MSG_ZEROCOPY. */
#ifdef SO_ZEROCOPY
#ifdef MSG_ZEROCOPY
#define SPDK_ZEROCOPY
#endif
#endif
57 
58 struct spdk_posix_sock {
59 	struct spdk_sock	base;
60 	int			fd;
61 
62 	uint32_t		sendmsg_idx;
63 
64 	struct spdk_pipe	*recv_pipe;
65 	void			*recv_buf;
66 	int			recv_buf_sz;
67 	bool			pending_recv;
68 	bool			zcopy;
69 	int			so_priority;
70 
71 	TAILQ_ENTRY(spdk_posix_sock)	link;
72 };
73 
74 struct spdk_posix_sock_group_impl {
75 	struct spdk_sock_group_impl	base;
76 	int				fd;
77 	TAILQ_HEAD(, spdk_posix_sock)	pending_recv;
78 };
79 
80 static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
81 	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
82 	.send_buf_size = MIN_SO_SNDBUF_SIZE,
83 	.enable_recv_pipe = true,
84 	.enable_zerocopy_send = false,
85 	.enable_quickack = false,
86 	.enable_placement_id = false,
87 };
88 
/*
 * Render the IP address stored in a socket address as numeric text.
 *
 * sa   - address to convert; only AF_INET and AF_INET6 are handled.
 * host - destination buffer for the NUL-terminated string.
 * hlen - capacity of host in bytes.
 *
 * Returns 0 on success. Returns -1 when sa or host is NULL, when the
 * address family is not handled, or when inet_ntop() fails.
 */
static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const void *addr;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	if (sa->sa_family == AF_INET) {
		addr = &((struct sockaddr_in *)sa)->sin_addr;
	} else if (sa->sa_family == AF_INET6) {
		addr = &((struct sockaddr_in6 *)sa)->sin6_addr;
	} else {
		return -1;
	}

	return (inet_ntop(sa->sa_family, addr, host, hlen) != NULL) ? 0 : -1;
}
117 
/* Downcast helpers from the generic sock / group pointers to the POSIX
 * implementation types. Both the argument and the whole expansion are
 * parenthesized so that an expression argument (e.g. `p + 1`) is cast as
 * a unit and the result can be used safely inside larger expressions. */
#define __posix_sock(sock) ((struct spdk_posix_sock *)(sock))
#define __posix_group_impl(group) ((struct spdk_posix_sock_group_impl *)(group))
120 
121 static int
122 posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
123 		   char *caddr, int clen, uint16_t *cport)
124 {
125 	struct spdk_posix_sock *sock = __posix_sock(_sock);
126 	struct sockaddr_storage sa;
127 	socklen_t salen;
128 	int rc;
129 
130 	assert(sock != NULL);
131 
132 	memset(&sa, 0, sizeof sa);
133 	salen = sizeof sa;
134 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
135 	if (rc != 0) {
136 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
137 		return -1;
138 	}
139 
140 	switch (sa.ss_family) {
141 	case AF_UNIX:
142 		/* Acceptable connection types that don't have IPs */
143 		return 0;
144 	case AF_INET:
145 	case AF_INET6:
146 		/* Code below will get IP addresses */
147 		break;
148 	default:
149 		/* Unsupported socket family */
150 		return -1;
151 	}
152 
153 	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
154 	if (rc != 0) {
155 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
156 		return -1;
157 	}
158 
159 	if (sport) {
160 		if (sa.ss_family == AF_INET) {
161 			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
162 		} else if (sa.ss_family == AF_INET6) {
163 			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
164 		}
165 	}
166 
167 	memset(&sa, 0, sizeof sa);
168 	salen = sizeof sa;
169 	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
170 	if (rc != 0) {
171 		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
172 		return -1;
173 	}
174 
175 	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
176 	if (rc != 0) {
177 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
178 		return -1;
179 	}
180 
181 	if (cport) {
182 		if (sa.ss_family == AF_INET) {
183 			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
184 		} else if (sa.ss_family == AF_INET6) {
185 			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
186 		}
187 	}
188 
189 	return 0;
190 }
191 
/* Selects what posix_sock_create() does with the new descriptor. */
enum posix_sock_create_type {
	/* bind() and listen() on the address */
	SPDK_SOCK_CREATE_LISTEN = 0,
	/* connect() to the address */
	SPDK_SOCK_CREATE_CONNECT = 1,
};
196 
197 static int
198 posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
199 {
200 	uint8_t *new_buf;
201 	struct spdk_pipe *new_pipe;
202 	struct iovec siov[2];
203 	struct iovec diov[2];
204 	int sbytes;
205 	ssize_t bytes;
206 
207 	if (sock->recv_buf_sz == sz) {
208 		return 0;
209 	}
210 
211 	/* If the new size is 0, just free the pipe */
212 	if (sz == 0) {
213 		spdk_pipe_destroy(sock->recv_pipe);
214 		free(sock->recv_buf);
215 		sock->recv_pipe = NULL;
216 		sock->recv_buf = NULL;
217 		return 0;
218 	} else if (sz < MIN_SOCK_PIPE_SIZE) {
219 		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
220 		return -1;
221 	}
222 
223 	/* Round up to next 64 byte multiple */
224 	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
225 	if (!new_buf) {
226 		SPDK_ERRLOG("socket recv buf allocation failed\n");
227 		return -ENOMEM;
228 	}
229 
230 	new_pipe = spdk_pipe_create(new_buf, sz + 1);
231 	if (new_pipe == NULL) {
232 		SPDK_ERRLOG("socket pipe allocation failed\n");
233 		free(new_buf);
234 		return -ENOMEM;
235 	}
236 
237 	if (sock->recv_pipe != NULL) {
238 		/* Pull all of the data out of the old pipe */
239 		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
240 		if (sbytes > sz) {
241 			/* Too much data to fit into the new pipe size */
242 			spdk_pipe_destroy(new_pipe);
243 			free(new_buf);
244 			return -EINVAL;
245 		}
246 
247 		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
248 		assert(sbytes == sz);
249 
250 		bytes = spdk_iovcpy(siov, 2, diov, 2);
251 		spdk_pipe_writer_advance(new_pipe, bytes);
252 
253 		spdk_pipe_destroy(sock->recv_pipe);
254 		free(sock->recv_buf);
255 	}
256 
257 	sock->recv_buf_sz = sz;
258 	sock->recv_buf = new_buf;
259 	sock->recv_pipe = new_pipe;
260 
261 	return 0;
262 }
263 
264 static int
265 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
266 {
267 	struct spdk_posix_sock *sock = __posix_sock(_sock);
268 	int rc;
269 
270 	assert(sock != NULL);
271 
272 	if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) {
273 		rc = posix_sock_alloc_pipe(sock, sz);
274 		if (rc) {
275 			return rc;
276 		}
277 	}
278 
279 	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
280 	if (sz < MIN_SO_RCVBUF_SIZE) {
281 		sz = MIN_SO_RCVBUF_SIZE;
282 	}
283 
284 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
285 	if (rc < 0) {
286 		return rc;
287 	}
288 
289 	return 0;
290 }
291 
292 static int
293 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
294 {
295 	struct spdk_posix_sock *sock = __posix_sock(_sock);
296 	int rc;
297 
298 	assert(sock != NULL);
299 
300 	if (sz < MIN_SO_SNDBUF_SIZE) {
301 		sz = MIN_SO_SNDBUF_SIZE;
302 	}
303 
304 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
305 	if (rc < 0) {
306 		return rc;
307 	}
308 
309 	return 0;
310 }
311 
312 static struct spdk_posix_sock *
313 posix_sock_alloc(int fd, bool enable_zero_copy)
314 {
315 	struct spdk_posix_sock *sock;
316 #if defined(SPDK_ZEROCOPY) || defined(__linux__)
317 	int flag;
318 	int rc;
319 #endif
320 
321 	sock = calloc(1, sizeof(*sock));
322 	if (sock == NULL) {
323 		SPDK_ERRLOG("sock allocation failed\n");
324 		return NULL;
325 	}
326 
327 	sock->fd = fd;
328 
329 #if defined(SPDK_ZEROCOPY)
330 	flag = 1;
331 
332 	if (!enable_zero_copy || !g_spdk_posix_sock_impl_opts.enable_zerocopy_send) {
333 		return sock;
334 	}
335 
336 	/* Try to turn on zero copy sends */
337 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
338 	if (rc == 0) {
339 		sock->zcopy = true;
340 	}
341 #endif
342 
343 #if defined(__linux__)
344 	flag = 1;
345 
346 	if (g_spdk_posix_sock_impl_opts.enable_quickack) {
347 		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
348 		if (rc != 0) {
349 			SPDK_ERRLOG("quickack was failed to set\n");
350 		}
351 	}
352 #endif
353 
354 	return sock;
355 }
356 
357 static bool
358 sock_is_loopback(int fd)
359 {
360 	struct ifaddrs *addrs, *tmp;
361 	struct sockaddr_storage sa = {};
362 	socklen_t salen;
363 	struct ifreq ifr = {};
364 	char ip_addr[256], ip_addr_tmp[256];
365 	int rc;
366 	bool is_loopback = false;
367 
368 	salen = sizeof(sa);
369 	rc = getsockname(fd, (struct sockaddr *)&sa, &salen);
370 	if (rc != 0) {
371 		return is_loopback;
372 	}
373 
374 	memset(ip_addr, 0, sizeof(ip_addr));
375 	rc = get_addr_str((struct sockaddr *)&sa, ip_addr, sizeof(ip_addr));
376 	if (rc != 0) {
377 		return is_loopback;
378 	}
379 
380 	getifaddrs(&addrs);
381 	for (tmp = addrs; tmp != NULL; tmp = tmp->ifa_next) {
382 		if (tmp->ifa_addr && (tmp->ifa_flags & IFF_UP) &&
383 		    (tmp->ifa_addr->sa_family == sa.ss_family)) {
384 			memset(ip_addr_tmp, 0, sizeof(ip_addr_tmp));
385 			rc = get_addr_str(tmp->ifa_addr, ip_addr_tmp, sizeof(ip_addr_tmp));
386 			if (rc != 0) {
387 				continue;
388 			}
389 
390 			if (strncmp(ip_addr, ip_addr_tmp, sizeof(ip_addr)) == 0) {
391 				memcpy(ifr.ifr_name, tmp->ifa_name, sizeof(ifr.ifr_name));
392 				ioctl(fd, SIOCGIFFLAGS, &ifr);
393 				if (ifr.ifr_flags & IFF_LOOPBACK) {
394 					is_loopback = true;
395 				}
396 				goto end;
397 			}
398 		}
399 	}
400 
401 end:
402 	freeifaddrs(addrs);
403 	return is_loopback;
404 }
405 
/*
 * Create a listening or a connected TCP socket.
 *
 * ip   - numeric IPv4/IPv6 address; an IPv6 address may be wrapped in
 *        square brackets. NULL makes the function return NULL.
 * port - port number.
 * type - SPDK_SOCK_CREATE_LISTEN to bind()+listen(),
 *        SPDK_SOCK_CREATE_CONNECT to connect().
 * opts - socket options; must not be NULL. priority and zcopy are used.
 *
 * Every address returned by getaddrinfo() is tried in turn until one
 * yields a usable non-blocking descriptor.
 *
 * Returns the generic sock on success, NULL on any failure.
 */
static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_posix_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc, sz;
	bool enable_zero_copy;

	assert(opts != NULL);

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		/* Strip the brackets of an "[ipv6]" style address */
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		sz = g_spdk_posix_sock_impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		sz = g_spdk_posix_sock_impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		/* For every mandatory option below: when it cannot be set the
		 * descriptor is closed AND fd is reset to -1. Without the reset,
		 * a failure on the last address would leave the loop with fd
		 * still holding the closed descriptor's number, and it would be
		 * wrapped in a sock further down. */
		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority,
					sizeof(opts->priority));
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}
#endif

		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	/* Only enable zero copy for non-loopback sockets. */
	enable_zero_copy = opts->zcopy && !sock_is_loopback(fd);

	sock = posix_sock_alloc(fd, enable_zero_copy);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	/* opts was asserted and dereferenced above, so no NULL check here */
	sock->so_priority = opts->priority;

	return &sock->base;
}
574 
575 static struct spdk_sock *
576 posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
577 {
578 	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
579 }
580 
581 static struct spdk_sock *
582 posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
583 {
584 	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
585 }
586 
587 static struct spdk_sock *
588 posix_sock_accept(struct spdk_sock *_sock)
589 {
590 	struct spdk_posix_sock		*sock = __posix_sock(_sock);
591 	struct sockaddr_storage		sa;
592 	socklen_t			salen;
593 	int				rc, fd;
594 	struct spdk_posix_sock		*new_sock;
595 	int				flag;
596 
597 	memset(&sa, 0, sizeof(sa));
598 	salen = sizeof(sa);
599 
600 	assert(sock != NULL);
601 
602 	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
603 
604 	if (rc == -1) {
605 		return NULL;
606 	}
607 
608 	fd = rc;
609 
610 	flag = fcntl(fd, F_GETFL);
611 	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
612 		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
613 		close(fd);
614 		return NULL;
615 	}
616 
617 #if defined(SO_PRIORITY)
618 	/* The priority is not inherited, so call this function again */
619 	if (sock->base.opts.priority) {
620 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
621 		if (rc != 0) {
622 			close(fd);
623 			return NULL;
624 		}
625 	}
626 #endif
627 
628 	/* Inherit the zero copy feature from the listen socket */
629 	new_sock = posix_sock_alloc(fd, sock->zcopy);
630 	if (new_sock == NULL) {
631 		close(fd);
632 		return NULL;
633 	}
634 	new_sock->so_priority = sock->base.opts.priority;
635 
636 	return &new_sock->base;
637 }
638 
639 static int
640 posix_sock_close(struct spdk_sock *_sock)
641 {
642 	struct spdk_posix_sock *sock = __posix_sock(_sock);
643 
644 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
645 
646 	/* If the socket fails to close, the best choice is to
647 	 * leak the fd but continue to free the rest of the sock
648 	 * memory. */
649 	close(sock->fd);
650 
651 	spdk_pipe_destroy(sock->recv_pipe);
652 	free(sock->recv_buf);
653 	free(sock);
654 
655 	return 0;
656 }
657 
658 #ifdef SPDK_ZEROCOPY
659 static int
660 _sock_check_zcopy(struct spdk_sock *sock)
661 {
662 	struct spdk_posix_sock *psock = __posix_sock(sock);
663 	struct msghdr msgh = {};
664 	uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
665 	ssize_t rc;
666 	struct sock_extended_err *serr;
667 	struct cmsghdr *cm;
668 	uint32_t idx;
669 	struct spdk_sock_request *req, *treq;
670 	bool found;
671 
672 	msgh.msg_control = buf;
673 	msgh.msg_controllen = sizeof(buf);
674 
675 	while (true) {
676 		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);
677 
678 		if (rc < 0) {
679 			if (errno == EWOULDBLOCK || errno == EAGAIN) {
680 				return 0;
681 			}
682 
683 			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
684 				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
685 			} else {
686 				SPDK_WARNLOG("Recvmsg yielded an error!\n");
687 			}
688 			return 0;
689 		}
690 
691 		cm = CMSG_FIRSTHDR(&msgh);
692 		if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
693 			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
694 			return 0;
695 		}
696 
697 		serr = (struct sock_extended_err *)CMSG_DATA(cm);
698 		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
699 			SPDK_WARNLOG("Unexpected extended error origin\n");
700 			return 0;
701 		}
702 
703 		/* Most of the time, the pending_reqs array is in the exact
704 		 * order we need such that all of the requests to complete are
705 		 * in order, in the front. It is guaranteed that all requests
706 		 * belonging to the same sendmsg call are sequential, so once
707 		 * we encounter one match we can stop looping as soon as a
708 		 * non-match is found.
709 		 */
710 		for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
711 			found = false;
712 			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
713 				if (req->internal.offset == idx) {
714 					found = true;
715 
716 					rc = spdk_sock_request_put(sock, req, 0);
717 					if (rc < 0) {
718 						return rc;
719 					}
720 
721 				} else if (found) {
722 					break;
723 				}
724 			}
725 
726 		}
727 	}
728 
729 	return 0;
730 }
731 #endif
732 
733 static int
734 _sock_flush(struct spdk_sock *sock)
735 {
736 	struct spdk_posix_sock *psock = __posix_sock(sock);
737 	struct msghdr msg = {};
738 	int flags;
739 	struct iovec iovs[IOV_BATCH_SIZE];
740 	int iovcnt;
741 	int retval;
742 	struct spdk_sock_request *req;
743 	int i;
744 	ssize_t rc;
745 	unsigned int offset;
746 	size_t len;
747 
748 	/* Can't flush from within a callback or we end up with recursive calls */
749 	if (sock->cb_cnt > 0) {
750 		return 0;
751 	}
752 
753 	/* Gather an iov */
754 	iovcnt = 0;
755 	req = TAILQ_FIRST(&sock->queued_reqs);
756 	while (req) {
757 		offset = req->internal.offset;
758 
759 		for (i = 0; i < req->iovcnt; i++) {
760 			/* Consume any offset first */
761 			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
762 				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
763 				continue;
764 			}
765 
766 			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
767 			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
768 			iovcnt++;
769 
770 			offset = 0;
771 
772 			if (iovcnt >= IOV_BATCH_SIZE) {
773 				break;
774 			}
775 		}
776 
777 		if (iovcnt >= IOV_BATCH_SIZE) {
778 			break;
779 		}
780 
781 		req = TAILQ_NEXT(req, internal.link);
782 	}
783 
784 	if (iovcnt == 0) {
785 		return 0;
786 	}
787 
788 	/* Perform the vectored write */
789 	msg.msg_iov = iovs;
790 	msg.msg_iovlen = iovcnt;
791 #ifdef SPDK_ZEROCOPY
792 	if (psock->zcopy) {
793 		flags = MSG_ZEROCOPY;
794 	} else
795 #endif
796 	{
797 		flags = 0;
798 	}
799 	rc = sendmsg(psock->fd, &msg, flags);
800 	if (rc <= 0) {
801 		if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
802 			return 0;
803 		}
804 		return rc;
805 	}
806 
807 	/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
808 	 * req->internal.offset, so sendmsg_idx should not be zero  */
809 	if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
810 		psock->sendmsg_idx = 1;
811 	} else {
812 		psock->sendmsg_idx++;
813 	}
814 
815 	/* Consume the requests that were actually written */
816 	req = TAILQ_FIRST(&sock->queued_reqs);
817 	while (req) {
818 		offset = req->internal.offset;
819 
820 		for (i = 0; i < req->iovcnt; i++) {
821 			/* Advance by the offset first */
822 			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
823 				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
824 				continue;
825 			}
826 
827 			/* Calculate the remaining length of this element */
828 			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
829 
830 			if (len > (size_t)rc) {
831 				/* This element was partially sent. */
832 				req->internal.offset += rc;
833 				return 0;
834 			}
835 
836 			offset = 0;
837 			req->internal.offset += len;
838 			rc -= len;
839 		}
840 
841 		/* Handled a full request. */
842 		spdk_sock_request_pend(sock, req);
843 
844 		if (!psock->zcopy) {
845 			/* The sendmsg syscall above isn't currently asynchronous,
846 			* so it's already done. */
847 			retval = spdk_sock_request_put(sock, req, 0);
848 			if (retval) {
849 				break;
850 			}
851 		} else {
852 			/* Re-use the offset field to hold the sendmsg call index. The
853 			 * index is 0 based, so subtract one here because we've already
854 			 * incremented above. */
855 			req->internal.offset = psock->sendmsg_idx - 1;
856 		}
857 
858 		if (rc == 0) {
859 			break;
860 		}
861 
862 		req = TAILQ_FIRST(&sock->queued_reqs);
863 	}
864 
865 	return 0;
866 }
867 
/* Flush entry point: forwards to _sock_flush() and returns its result. */
static int
posix_sock_flush(struct spdk_sock *_sock)
{
	int rc;

	rc = _sock_flush(_sock);

	return rc;
}
873 
874 static ssize_t
875 posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
876 {
877 	struct iovec siov[2];
878 	int sbytes;
879 	ssize_t bytes;
880 	struct spdk_posix_sock_group_impl *group;
881 
882 	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
883 	if (sbytes < 0) {
884 		errno = EINVAL;
885 		return -1;
886 	} else if (sbytes == 0) {
887 		errno = EAGAIN;
888 		return -1;
889 	}
890 
891 	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
892 
893 	if (bytes == 0) {
894 		/* The only way this happens is if diov is 0 length */
895 		errno = EINVAL;
896 		return -1;
897 	}
898 
899 	spdk_pipe_reader_advance(sock->recv_pipe, bytes);
900 
901 	/* If we drained the pipe, take it off the level-triggered list */
902 	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
903 		group = __posix_group_impl(sock->base.group_impl);
904 		TAILQ_REMOVE(&group->pending_recv, sock, link);
905 		sock->pending_recv = false;
906 	}
907 
908 	return bytes;
909 }
910 
911 static inline ssize_t
912 posix_sock_read(struct spdk_posix_sock *sock)
913 {
914 	struct iovec iov[2];
915 	int bytes;
916 	struct spdk_posix_sock_group_impl *group;
917 
918 	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
919 
920 	if (bytes > 0) {
921 		bytes = readv(sock->fd, iov, 2);
922 		if (bytes > 0) {
923 			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
924 			if (sock->base.group_impl) {
925 				group = __posix_group_impl(sock->base.group_impl);
926 				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
927 				sock->pending_recv = true;
928 			}
929 		}
930 	}
931 
932 	return bytes;
933 }
934 
935 static ssize_t
936 posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
937 {
938 	struct spdk_posix_sock *sock = __posix_sock(_sock);
939 	int rc, i;
940 	size_t len;
941 
942 	if (sock->recv_pipe == NULL) {
943 		return readv(sock->fd, iov, iovcnt);
944 	}
945 
946 	len = 0;
947 	for (i = 0; i < iovcnt; i++) {
948 		len += iov[i].iov_len;
949 	}
950 
951 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
952 		/* If the user is receiving a sufficiently large amount of data,
953 		 * receive directly to their buffers. */
954 		if (len >= MIN_SOCK_PIPE_SIZE) {
955 			return readv(sock->fd, iov, iovcnt);
956 		}
957 
958 		/* Otherwise, do a big read into our pipe */
959 		rc = posix_sock_read(sock);
960 		if (rc <= 0) {
961 			return rc;
962 		}
963 	}
964 
965 	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
966 }
967 
/* Receive into a single buffer by wrapping it in a one-element iovec and
 * delegating to posix_sock_readv(). */
static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec single = {
		.iov_base = buf,
		.iov_len = len,
	};

	return posix_sock_readv(sock, &single, 1);
}
978 
979 static ssize_t
980 posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
981 {
982 	struct spdk_posix_sock *sock = __posix_sock(_sock);
983 	int rc;
984 
985 	/* In order to process a writev, we need to flush any asynchronous writes
986 	 * first. */
987 	rc = _sock_flush(_sock);
988 	if (rc < 0) {
989 		return rc;
990 	}
991 
992 	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
993 		/* We weren't able to flush all requests */
994 		errno = EAGAIN;
995 		return -1;
996 	}
997 
998 	return writev(sock->fd, iov, iovcnt);
999 }
1000 
1001 static void
1002 posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
1003 {
1004 	int rc;
1005 
1006 	spdk_sock_request_queue(sock, req);
1007 
1008 	/* If there are a sufficient number queued, just flush them out immediately. */
1009 	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
1010 		rc = _sock_flush(sock);
1011 		if (rc) {
1012 			spdk_sock_abort_requests(sock);
1013 		}
1014 	}
1015 }
1016 
1017 static int
1018 posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1019 {
1020 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1021 	int val;
1022 	int rc;
1023 
1024 	assert(sock != NULL);
1025 
1026 	val = nbytes;
1027 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1028 	if (rc != 0) {
1029 		return -1;
1030 	}
1031 	return 0;
1032 }
1033 
1034 static bool
1035 posix_sock_is_ipv6(struct spdk_sock *_sock)
1036 {
1037 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1038 	struct sockaddr_storage sa;
1039 	socklen_t salen;
1040 	int rc;
1041 
1042 	assert(sock != NULL);
1043 
1044 	memset(&sa, 0, sizeof sa);
1045 	salen = sizeof sa;
1046 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1047 	if (rc != 0) {
1048 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1049 		return false;
1050 	}
1051 
1052 	return (sa.ss_family == AF_INET6);
1053 }
1054 
1055 static bool
1056 posix_sock_is_ipv4(struct spdk_sock *_sock)
1057 {
1058 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1059 	struct sockaddr_storage sa;
1060 	socklen_t salen;
1061 	int rc;
1062 
1063 	assert(sock != NULL);
1064 
1065 	memset(&sa, 0, sizeof sa);
1066 	salen = sizeof sa;
1067 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1068 	if (rc != 0) {
1069 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1070 		return false;
1071 	}
1072 
1073 	return (sa.ss_family == AF_INET);
1074 }
1075 
1076 static bool
1077 posix_sock_is_connected(struct spdk_sock *_sock)
1078 {
1079 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1080 	uint8_t byte;
1081 	int rc;
1082 
1083 	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
1084 	if (rc == 0) {
1085 		return false;
1086 	}
1087 
1088 	if (rc < 0) {
1089 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1090 			return true;
1091 		}
1092 
1093 		return false;
1094 	}
1095 
1096 	return true;
1097 }
1098 
1099 static int
1100 posix_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
1101 {
1102 	int rc = -1;
1103 
1104 	if (!g_spdk_posix_sock_impl_opts.enable_placement_id) {
1105 		return rc;
1106 	}
1107 
1108 #if defined(SO_INCOMING_NAPI_ID)
1109 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1110 	socklen_t salen = sizeof(int);
1111 
1112 	rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen);
1113 	if (rc != 0) {
1114 		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
1115 	}
1116 
1117 #endif
1118 	return rc;
1119 }
1120 
1121 static struct spdk_sock_group_impl *
1122 posix_sock_group_impl_create(void)
1123 {
1124 	struct spdk_posix_sock_group_impl *group_impl;
1125 	int fd;
1126 
1127 #if defined(__linux__)
1128 	fd = epoll_create1(0);
1129 #elif defined(__FreeBSD__)
1130 	fd = kqueue();
1131 #endif
1132 	if (fd == -1) {
1133 		return NULL;
1134 	}
1135 
1136 	group_impl = calloc(1, sizeof(*group_impl));
1137 	if (group_impl == NULL) {
1138 		SPDK_ERRLOG("group_impl allocation failed\n");
1139 		close(fd);
1140 		return NULL;
1141 	}
1142 
1143 	group_impl->fd = fd;
1144 	TAILQ_INIT(&group_impl->pending_recv);
1145 
1146 	return &group_impl->base;
1147 }
1148 
1149 static int
1150 posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1151 {
1152 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1153 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1154 	int rc;
1155 
1156 #if defined(__linux__)
1157 	struct epoll_event event;
1158 
1159 	memset(&event, 0, sizeof(event));
1160 	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
1161 	event.events = EPOLLIN | EPOLLERR;
1162 	event.data.ptr = sock;
1163 
1164 	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
1165 #elif defined(__FreeBSD__)
1166 	struct kevent event;
1167 	struct timespec ts = {0};
1168 
1169 	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);
1170 
1171 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1172 #endif
1173 
1174 	/* switched from another polling group due to scheduling */
1175 	if (spdk_unlikely(sock->recv_pipe != NULL  &&
1176 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1177 		assert(sock->pending_recv == false);
1178 		sock->pending_recv = true;
1179 		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1180 	}
1181 
1182 	return rc;
1183 }
1184 
1185 static int
1186 posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1187 {
1188 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1189 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1190 	int rc;
1191 
1192 	if (sock->recv_pipe != NULL) {
1193 		if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) {
1194 			TAILQ_REMOVE(&group->pending_recv, sock, link);
1195 			sock->pending_recv = false;
1196 		}
1197 		assert(sock->pending_recv == false);
1198 	}
1199 
1200 #if defined(__linux__)
1201 	struct epoll_event event;
1202 
1203 	/* Event parameter is ignored but some old kernel version still require it. */
1204 	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
1205 #elif defined(__FreeBSD__)
1206 	struct kevent event;
1207 	struct timespec ts = {0};
1208 
1209 	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
1210 
1211 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1212 	if (rc == 0 && event.flags & EV_ERROR) {
1213 		rc = -1;
1214 		errno = event.data;
1215 	}
1216 #endif
1217 
1218 	spdk_sock_abort_requests(_sock);
1219 
1220 	return rc;
1221 }
1222 
1223 static int
1224 posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
1225 			   struct spdk_sock **socks)
1226 {
1227 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1228 	struct spdk_sock *sock, *tmp;
1229 	int num_events, i, rc;
1230 	struct spdk_posix_sock *psock, *ptmp;
1231 #if defined(__linux__)
1232 	struct epoll_event events[MAX_EVENTS_PER_POLL];
1233 #elif defined(__FreeBSD__)
1234 	struct kevent events[MAX_EVENTS_PER_POLL];
1235 	struct timespec ts = {0};
1236 #endif
1237 
1238 	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
1239 	 * a completion callback could remove the sock from the
1240 	 * group. */
1241 	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
1242 		rc = _sock_flush(sock);
1243 		if (rc) {
1244 			spdk_sock_abort_requests(sock);
1245 		}
1246 	}
1247 
1248 #if defined(__linux__)
1249 	num_events = epoll_wait(group->fd, events, max_events, 0);
1250 #elif defined(__FreeBSD__)
1251 	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
1252 #endif
1253 
1254 	if (num_events == -1) {
1255 		return -1;
1256 	} else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
1257 		uint8_t byte;
1258 
1259 		sock = TAILQ_FIRST(&_group->socks);
1260 		psock = __posix_sock(sock);
1261 		/* a recv is done here to busy poll the queue associated with
1262 		 * first socket in list and potentially reap incoming data.
1263 		 */
1264 		if (psock->so_priority) {
1265 			recv(psock->fd, &byte, 1, MSG_PEEK);
1266 		}
1267 	}
1268 
1269 	for (i = 0; i < num_events; i++) {
1270 #if defined(__linux__)
1271 		sock = events[i].data.ptr;
1272 		psock = __posix_sock(sock);
1273 
1274 #ifdef SPDK_ZEROCOPY
1275 		if (events[i].events & EPOLLERR) {
1276 			rc = _sock_check_zcopy(sock);
1277 			/* If the socket was closed or removed from
1278 			 * the group in response to a send ack, don't
1279 			 * add it to the array here. */
1280 			if (rc || sock->cb_fn == NULL) {
1281 				continue;
1282 			}
1283 		}
1284 #endif
1285 		if ((events[i].events & EPOLLIN) == 0) {
1286 			continue;
1287 		}
1288 
1289 #elif defined(__FreeBSD__)
1290 		sock = events[i].udata;
1291 		psock = __posix_sock(sock);
1292 #endif
1293 
1294 		/* If the socket does not already have recv pending, add it now */
1295 		if (!psock->pending_recv) {
1296 			psock->pending_recv = true;
1297 			TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
1298 		}
1299 	}
1300 
1301 	num_events = 0;
1302 
1303 	TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) {
1304 		if (num_events == max_events) {
1305 			break;
1306 		}
1307 
1308 		socks[num_events++] = &psock->base;
1309 	}
1310 
1311 	/* Cycle the pending_recv list so that each time we poll things aren't
1312 	 * in the same order. */
1313 	for (i = 0; i < num_events; i++) {
1314 		psock = __posix_sock(socks[i]);
1315 
1316 		TAILQ_REMOVE(&group->pending_recv, psock, link);
1317 
1318 		if (psock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(psock->recv_pipe) == 0) {
1319 			psock->pending_recv = false;
1320 		} else {
1321 			TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
1322 		}
1323 
1324 	}
1325 
1326 	return num_events;
1327 }
1328 
1329 static int
1330 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1331 {
1332 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1333 	int rc;
1334 
1335 	rc = close(group->fd);
1336 	free(group);
1337 	return rc;
1338 }
1339 
1340 static int
1341 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
1342 {
1343 	if (!opts || !len) {
1344 		errno = EINVAL;
1345 		return -1;
1346 	}
1347 	memset(opts, 0, *len);
1348 
1349 #define FIELD_OK(field) \
1350 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len
1351 
1352 #define GET_FIELD(field) \
1353 	if (FIELD_OK(field)) { \
1354 		opts->field = g_spdk_posix_sock_impl_opts.field; \
1355 	}
1356 
1357 	GET_FIELD(recv_buf_size);
1358 	GET_FIELD(send_buf_size);
1359 	GET_FIELD(enable_recv_pipe);
1360 	GET_FIELD(enable_zerocopy_send);
1361 	GET_FIELD(enable_quickack);
1362 	GET_FIELD(enable_placement_id);
1363 
1364 #undef GET_FIELD
1365 #undef FIELD_OK
1366 
1367 	*len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts));
1368 	return 0;
1369 }
1370 
1371 static int
1372 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
1373 {
1374 	if (!opts) {
1375 		errno = EINVAL;
1376 		return -1;
1377 	}
1378 
1379 #define FIELD_OK(field) \
1380 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len
1381 
1382 #define SET_FIELD(field) \
1383 	if (FIELD_OK(field)) { \
1384 		g_spdk_posix_sock_impl_opts.field = opts->field; \
1385 	}
1386 
1387 	SET_FIELD(recv_buf_size);
1388 	SET_FIELD(send_buf_size);
1389 	SET_FIELD(enable_recv_pipe);
1390 	SET_FIELD(enable_zerocopy_send);
1391 	SET_FIELD(enable_quickack);
1392 	SET_FIELD(enable_placement_id);
1393 
1394 #undef SET_FIELD
1395 #undef FIELD_OK
1396 
1397 	return 0;
1398 }
1399 
1400 
1401 static struct spdk_net_impl g_posix_net_impl = {
1402 	.name		= "posix",
1403 	.getaddr	= posix_sock_getaddr,
1404 	.connect	= posix_sock_connect,
1405 	.listen		= posix_sock_listen,
1406 	.accept		= posix_sock_accept,
1407 	.close		= posix_sock_close,
1408 	.recv		= posix_sock_recv,
1409 	.readv		= posix_sock_readv,
1410 	.writev		= posix_sock_writev,
1411 	.writev_async	= posix_sock_writev_async,
1412 	.flush		= posix_sock_flush,
1413 	.set_recvlowat	= posix_sock_set_recvlowat,
1414 	.set_recvbuf	= posix_sock_set_recvbuf,
1415 	.set_sendbuf	= posix_sock_set_sendbuf,
1416 	.is_ipv6	= posix_sock_is_ipv6,
1417 	.is_ipv4	= posix_sock_is_ipv4,
1418 	.is_connected	= posix_sock_is_connected,
1419 	.get_placement_id	= posix_sock_get_placement_id,
1420 	.group_impl_create	= posix_sock_group_impl_create,
1421 	.group_impl_add_sock	= posix_sock_group_impl_add_sock,
1422 	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
1423 	.group_impl_poll	= posix_sock_group_impl_poll,
1424 	.group_impl_close	= posix_sock_group_impl_close,
1425 	.get_opts	= posix_sock_impl_get_opts,
1426 	.set_opts	= posix_sock_impl_set_opts,
1427 };
1428 
1429 SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);
1430