xref: /spdk/module/sock/posix/posix.c (revision de21d8f4e45b732c13ce5c7aa1872f73bffd38aa)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation. All rights reserved.
5  *   Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #if defined(__FreeBSD__)
37 #include <sys/event.h>
38 #define SPDK_KEVENT
39 #else
40 #include <sys/epoll.h>
41 #define SPDK_EPOLL
42 #endif
43 
44 #if defined(__linux__)
45 #include <linux/errqueue.h>
46 #endif
47 
48 #include "spdk/log.h"
49 #include "spdk/pipe.h"
50 #include "spdk/sock.h"
51 #include "spdk/util.h"
52 #include "spdk_internal/sock.h"
53 
54 #define MAX_TMPBUF 1024
55 #define PORTNUMLEN 32
56 
57 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
58 #define SPDK_ZEROCOPY
59 #endif
60 
/*
 * Per-socket state of the POSIX sock implementation.
 *
 * `base` is the first member, so the `struct spdk_sock *` handed in by the
 * generic layer can be cast straight to this type (see __posix_sock()).
 */
struct spdk_posix_sock {
	struct spdk_sock	base;
	/* File descriptor of the underlying socket */
	int			fd;

	/* Incremented after every successful sendmsg(); used to tag zero-copy requests */
	uint32_t		sendmsg_idx;

	/* Optional user-space receive pipe and the buffer backing it */
	struct spdk_pipe	*recv_pipe;
	void			*recv_buf;
	/* Pipe size as last requested through posix_sock_alloc_pipe() */
	int			recv_buf_sz;
	/* True while this socket is linked on its group's pending_recv list */
	bool			pending_recv;
	/* True when SO_ZEROCOPY was successfully enabled on fd */
	bool			zcopy;
	/* Priority copied from the socket options at create/accept time */
	int			so_priority;

	/* Linkage for spdk_posix_sock_group_impl.pending_recv */
	TAILQ_ENTRY(spdk_posix_sock)	link;
};
76 
/*
 * Poll-group state of the POSIX sock implementation: one epoll/kqueue
 * descriptor plus the list of member sockets flagged as pending_recv.
 */
struct spdk_posix_sock_group_impl {
	struct spdk_sock_group_impl	base;
	/* Descriptor returned by epoll_create1() or kqueue() */
	int				fd;
	/* Sockets whose pending_recv flag is set, linked through spdk_posix_sock.link */
	TAILQ_HEAD(, spdk_posix_sock)	pending_recv;
};
82 
/* Module-wide defaults consulted whenever a POSIX socket is created or tuned. */
static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
	/* Initial SO_RCVBUF / SO_SNDBUF values applied in posix_sock_create() */
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	/* Gates allocation of the user-space receive pipe in posix_sock_set_recvbuf() */
	.enable_recv_pipe = true,
	/* Gates the SO_ZEROCOPY attempt in posix_sock_alloc() */
	.enable_zerocopy_send = true,
	/* Gates the TCP_QUICKACK attempt in posix_sock_alloc() (Linux only) */
	.enable_quickack = false,
	/* 0: disabled, 1: SO_INCOMING_NAPI_ID, 2: SO_INCOMING_CPU (see posix_sock_get_placement_id()) */
	.enable_placement_id = 0,
};
91 
/*
 * Render the IPv4/IPv6 address held in sa as a numeric string in host.
 *
 * \param sa   Socket address; only AF_INET and AF_INET6 are supported.
 * \param host Destination buffer for the textual address.
 * \param hlen Size of host in bytes.
 *
 * \return 0 on success, -1 if an argument is NULL, the address family is
 * unsupported, or inet_ntop() fails (for example because host is too small).
 */
static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const void *raw_addr;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	if (sa->sa_family == AF_INET) {
		raw_addr = &((struct sockaddr_in *)sa)->sin_addr;
	} else if (sa->sa_family == AF_INET6) {
		raw_addr = &((struct sockaddr_in6 *)sa)->sin6_addr;
	} else {
		/* Any other family has no IP representation */
		return -1;
	}

	return (inet_ntop(sa->sa_family, raw_addr, host, hlen) != NULL) ? 0 : -1;
}
120 
/*
 * Downcast helpers from the generic sock / sock-group objects to the
 * POSIX-specific structures via a plain pointer cast. Both the argument and
 * the whole expansion are parenthesized so the macros compose safely with
 * arbitrary argument expressions and with postfix operators such as "->".
 */
#define __posix_sock(sock) ((struct spdk_posix_sock *)(sock))
#define __posix_group_impl(group) ((struct spdk_posix_sock_group_impl *)(group))
123 
124 static int
125 posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
126 		   char *caddr, int clen, uint16_t *cport)
127 {
128 	struct spdk_posix_sock *sock = __posix_sock(_sock);
129 	struct sockaddr_storage sa;
130 	socklen_t salen;
131 	int rc;
132 
133 	assert(sock != NULL);
134 
135 	memset(&sa, 0, sizeof sa);
136 	salen = sizeof sa;
137 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
138 	if (rc != 0) {
139 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
140 		return -1;
141 	}
142 
143 	switch (sa.ss_family) {
144 	case AF_UNIX:
145 		/* Acceptable connection types that don't have IPs */
146 		return 0;
147 	case AF_INET:
148 	case AF_INET6:
149 		/* Code below will get IP addresses */
150 		break;
151 	default:
152 		/* Unsupported socket family */
153 		return -1;
154 	}
155 
156 	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
157 	if (rc != 0) {
158 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
159 		return -1;
160 	}
161 
162 	if (sport) {
163 		if (sa.ss_family == AF_INET) {
164 			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
165 		} else if (sa.ss_family == AF_INET6) {
166 			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
167 		}
168 	}
169 
170 	memset(&sa, 0, sizeof sa);
171 	salen = sizeof sa;
172 	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
173 	if (rc != 0) {
174 		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
175 		return -1;
176 	}
177 
178 	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
179 	if (rc != 0) {
180 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
181 		return -1;
182 	}
183 
184 	if (cport) {
185 		if (sa.ss_family == AF_INET) {
186 			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
187 		} else if (sa.ss_family == AF_INET6) {
188 			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
189 		}
190 	}
191 
192 	return 0;
193 }
194 
/* Selects what posix_sock_create() does with the freshly created descriptor. */
enum posix_sock_create_type {
	/* bind() + listen() on the given address */
	SPDK_SOCK_CREATE_LISTEN,
	/* connect() to the given address */
	SPDK_SOCK_CREATE_CONNECT,
};
199 
200 static int
201 posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
202 {
203 	uint8_t *new_buf;
204 	struct spdk_pipe *new_pipe;
205 	struct iovec siov[2];
206 	struct iovec diov[2];
207 	int sbytes;
208 	ssize_t bytes;
209 
210 	if (sock->recv_buf_sz == sz) {
211 		return 0;
212 	}
213 
214 	/* If the new size is 0, just free the pipe */
215 	if (sz == 0) {
216 		spdk_pipe_destroy(sock->recv_pipe);
217 		free(sock->recv_buf);
218 		sock->recv_pipe = NULL;
219 		sock->recv_buf = NULL;
220 		return 0;
221 	} else if (sz < MIN_SOCK_PIPE_SIZE) {
222 		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
223 		return -1;
224 	}
225 
226 	/* Round up to next 64 byte multiple */
227 	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
228 	if (!new_buf) {
229 		SPDK_ERRLOG("socket recv buf allocation failed\n");
230 		return -ENOMEM;
231 	}
232 
233 	new_pipe = spdk_pipe_create(new_buf, sz + 1);
234 	if (new_pipe == NULL) {
235 		SPDK_ERRLOG("socket pipe allocation failed\n");
236 		free(new_buf);
237 		return -ENOMEM;
238 	}
239 
240 	if (sock->recv_pipe != NULL) {
241 		/* Pull all of the data out of the old pipe */
242 		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
243 		if (sbytes > sz) {
244 			/* Too much data to fit into the new pipe size */
245 			spdk_pipe_destroy(new_pipe);
246 			free(new_buf);
247 			return -EINVAL;
248 		}
249 
250 		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
251 		assert(sbytes == sz);
252 
253 		bytes = spdk_iovcpy(siov, 2, diov, 2);
254 		spdk_pipe_writer_advance(new_pipe, bytes);
255 
256 		spdk_pipe_destroy(sock->recv_pipe);
257 		free(sock->recv_buf);
258 	}
259 
260 	sock->recv_buf_sz = sz;
261 	sock->recv_buf = new_buf;
262 	sock->recv_pipe = new_pipe;
263 
264 	return 0;
265 }
266 
267 static int
268 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
269 {
270 	struct spdk_posix_sock *sock = __posix_sock(_sock);
271 	int rc;
272 
273 	assert(sock != NULL);
274 
275 	if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) {
276 		rc = posix_sock_alloc_pipe(sock, sz);
277 		if (rc) {
278 			return rc;
279 		}
280 	}
281 
282 	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
283 	if (sz < MIN_SO_RCVBUF_SIZE) {
284 		sz = MIN_SO_RCVBUF_SIZE;
285 	}
286 
287 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
288 	if (rc < 0) {
289 		return rc;
290 	}
291 
292 	return 0;
293 }
294 
295 static int
296 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
297 {
298 	struct spdk_posix_sock *sock = __posix_sock(_sock);
299 	int rc;
300 
301 	assert(sock != NULL);
302 
303 	if (sz < MIN_SO_SNDBUF_SIZE) {
304 		sz = MIN_SO_SNDBUF_SIZE;
305 	}
306 
307 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
308 	if (rc < 0) {
309 		return rc;
310 	}
311 
312 	return 0;
313 }
314 
315 static struct spdk_posix_sock *
316 posix_sock_alloc(int fd, bool enable_zero_copy)
317 {
318 	struct spdk_posix_sock *sock;
319 #if defined(SPDK_ZEROCOPY) || defined(__linux__)
320 	int flag;
321 	int rc;
322 #endif
323 
324 	sock = calloc(1, sizeof(*sock));
325 	if (sock == NULL) {
326 		SPDK_ERRLOG("sock allocation failed\n");
327 		return NULL;
328 	}
329 
330 	sock->fd = fd;
331 
332 #if defined(SPDK_ZEROCOPY)
333 	flag = 1;
334 
335 	if (enable_zero_copy && g_spdk_posix_sock_impl_opts.enable_zerocopy_send) {
336 		/* Try to turn on zero copy sends */
337 		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
338 		if (rc == 0) {
339 			sock->zcopy = true;
340 		}
341 	}
342 #endif
343 
344 #if defined(__linux__)
345 	flag = 1;
346 
347 	if (g_spdk_posix_sock_impl_opts.enable_quickack) {
348 		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
349 		if (rc != 0) {
350 			SPDK_ERRLOG("quickack was failed to set\n");
351 		}
352 	}
353 #endif
354 
355 	return sock;
356 }
357 
358 static bool
359 sock_is_loopback(int fd)
360 {
361 	struct ifaddrs *addrs, *tmp;
362 	struct sockaddr_storage sa = {};
363 	socklen_t salen;
364 	struct ifreq ifr = {};
365 	char ip_addr[256], ip_addr_tmp[256];
366 	int rc;
367 	bool is_loopback = false;
368 
369 	salen = sizeof(sa);
370 	rc = getsockname(fd, (struct sockaddr *)&sa, &salen);
371 	if (rc != 0) {
372 		return is_loopback;
373 	}
374 
375 	memset(ip_addr, 0, sizeof(ip_addr));
376 	rc = get_addr_str((struct sockaddr *)&sa, ip_addr, sizeof(ip_addr));
377 	if (rc != 0) {
378 		return is_loopback;
379 	}
380 
381 	getifaddrs(&addrs);
382 	for (tmp = addrs; tmp != NULL; tmp = tmp->ifa_next) {
383 		if (tmp->ifa_addr && (tmp->ifa_flags & IFF_UP) &&
384 		    (tmp->ifa_addr->sa_family == sa.ss_family)) {
385 			memset(ip_addr_tmp, 0, sizeof(ip_addr_tmp));
386 			rc = get_addr_str(tmp->ifa_addr, ip_addr_tmp, sizeof(ip_addr_tmp));
387 			if (rc != 0) {
388 				continue;
389 			}
390 
391 			if (strncmp(ip_addr, ip_addr_tmp, sizeof(ip_addr)) == 0) {
392 				memcpy(ifr.ifr_name, tmp->ifa_name, sizeof(ifr.ifr_name));
393 				ioctl(fd, SIOCGIFFLAGS, &ifr);
394 				if (ifr.ifr_flags & IFF_LOOPBACK) {
395 					is_loopback = true;
396 				}
397 				goto end;
398 			}
399 		}
400 	}
401 
402 end:
403 	freeifaddrs(addrs);
404 	return is_loopback;
405 }
406 
/*
 * Create a listening or connected TCP socket.
 *
 * \param ip   Numeric IPv4/IPv6 address; an IPv6 literal may be wrapped in
 *             square brackets. NULL yields NULL.
 * \param port TCP port number.
 * \param type SPDK_SOCK_CREATE_LISTEN (bind + listen) or
 *             SPDK_SOCK_CREATE_CONNECT (connect).
 * \param opts Socket options (priority, zcopy); must not be NULL.
 *
 * Every address returned by getaddrinfo() is tried in turn until one
 * succeeds. The resulting descriptor is switched to non-blocking mode.
 *
 * \return the generic socket handle, or NULL on any failure.
 */
static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_posix_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc, sz;
	bool enable_zero_copy;

	assert(opts != NULL);

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		/* Strip the brackets of a "[v6-address]" literal */
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/*
	 * Invariant for the loop below: whenever the loop is left or continued
	 * after a failure, fd is -1. A closed descriptor must never survive to
	 * the code after the loop.
	 */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			fd = -1;
			continue;
		}

		sz = g_spdk_posix_sock_impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		sz = g_spdk_posix_sock_impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}
#endif

		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? Retry the same address with a fresh socket */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		/* A failed F_GETFL must not be OR-ed into the flags passed to F_SETFL */
		flag = fcntl(fd, F_GETFL);
		if (flag < 0 || fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	/* Only enable zero copy for non-loopback sockets. */
	enable_zero_copy = opts->zcopy && !sock_is_loopback(fd);

	sock = posix_sock_alloc(fd, enable_zero_copy);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	sock->so_priority = opts->priority;

	return &sock->base;
}
575 
576 static struct spdk_sock *
577 posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
578 {
579 	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
580 }
581 
582 static struct spdk_sock *
583 posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
584 {
585 	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
586 }
587 
588 static struct spdk_sock *
589 posix_sock_accept(struct spdk_sock *_sock)
590 {
591 	struct spdk_posix_sock		*sock = __posix_sock(_sock);
592 	struct sockaddr_storage		sa;
593 	socklen_t			salen;
594 	int				rc, fd;
595 	struct spdk_posix_sock		*new_sock;
596 	int				flag;
597 
598 	memset(&sa, 0, sizeof(sa));
599 	salen = sizeof(sa);
600 
601 	assert(sock != NULL);
602 
603 	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
604 
605 	if (rc == -1) {
606 		return NULL;
607 	}
608 
609 	fd = rc;
610 
611 	flag = fcntl(fd, F_GETFL);
612 	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
613 		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
614 		close(fd);
615 		return NULL;
616 	}
617 
618 #if defined(SO_PRIORITY)
619 	/* The priority is not inherited, so call this function again */
620 	if (sock->base.opts.priority) {
621 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
622 		if (rc != 0) {
623 			close(fd);
624 			return NULL;
625 		}
626 	}
627 #endif
628 
629 	/* Inherit the zero copy feature from the listen socket */
630 	new_sock = posix_sock_alloc(fd, sock->zcopy);
631 	if (new_sock == NULL) {
632 		close(fd);
633 		return NULL;
634 	}
635 	new_sock->so_priority = sock->base.opts.priority;
636 
637 	return &new_sock->base;
638 }
639 
640 static int
641 posix_sock_close(struct spdk_sock *_sock)
642 {
643 	struct spdk_posix_sock *sock = __posix_sock(_sock);
644 
645 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
646 
647 	/* If the socket fails to close, the best choice is to
648 	 * leak the fd but continue to free the rest of the sock
649 	 * memory. */
650 	close(sock->fd);
651 
652 	spdk_pipe_destroy(sock->recv_pipe);
653 	free(sock->recv_buf);
654 	free(sock);
655 
656 	return 0;
657 }
658 
#ifdef SPDK_ZEROCOPY
/*
 * Reap zero-copy completion notifications from the socket error queue and
 * complete the matching pending requests.
 *
 * Each notification carries an inclusive range [ee_info, ee_data] of
 * sendmsg call indices; every pending request tagged with one of those
 * indices (see _sock_flush()) is completed with status 0.
 *
 * \return 0 when the error queue is drained or an unexpected message is
 * seen, or the negative value returned by spdk_sock_request_put().
 */
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->group_impl);
	struct msghdr msgh = {};
	uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	while (true) {
		/* recvmsg() overwrites msg_controllen with the length actually
		 * received, so re-arm the control buffer before every call. */
		msgh.msg_control = buf;
		msgh.msg_controllen = sizeof(buf);

		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		/* Completions arrive as IP_RECVERR on IPv4 sockets and as
		 * IPV6_RECVERR on IPv6 sockets; accept both. */
		cm = CMSG_FIRSTHDR(&msgh);
		if (!(cm &&
		      ((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
		       (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR)))) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (req->internal.offset == idx) {
					found = true;

					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}

				} else if (found) {
					break;
				}
			}

			/* If we reaped buffer reclaim notification and sock is not in pending_recv list yet,
			 * add it now. It allows to call socket callback and process completions */
			if (found && !psock->pending_recv && group) {
				psock->pending_recv = true;
				TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
			}
		}
	}

	return 0;
}
#endif
740 
/*
 * Try to send the queued asynchronous write requests with a single
 * sendmsg() call and account for what was actually written.
 *
 * Fully written requests are moved to the pending state; without zero copy
 * they are completed right away, with zero copy they are tagged with the
 * sendmsg call index and left for _sock_check_zcopy() to complete. A
 * partially written request stays queued with its offset advanced.
 *
 * \return 0 if nothing had to be done, the send would block, or data was
 * sent; otherwise the (non-positive) sendmsg() result.
 */
static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc;
	unsigned int offset;
	size_t len;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		return 0;
	}

	/* Presumably gathers the iovecs of the queued requests into iovs and
	 * returns how many were filled in - confirm against spdk_sock_prep_reqs() */
	iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL);

	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY;
	} else
#endif
	{
		flags = 0;
	}
	rc = sendmsg(psock->fd, &msg, flags);
	if (rc <= 0) {
		/* Would-block conditions (and ENOBUFS in zero-copy mode) are not
		 * reported as errors; the requests simply stay queued */
		if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
			return 0;
		}
		return rc;
	}

	/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
	 * req->internal.offset, so sendmsg_idx should not be zero  */
	if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
		psock->sendmsg_idx = 1;
	} else {
		psock->sendmsg_idx++;
	}

	/* Consume the requests that were actually written. From here on rc is
	 * the number of sent bytes not yet attributed to a request. */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		/* Bytes of this request already sent by earlier calls */
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(sock, req);

		if (!psock->zcopy) {
			/* The sendmsg syscall above isn't currently asynchronous,
			 * so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				/* Stop consuming; note the function still returns 0 below */
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			/* Every sent byte has been attributed */
			break;
		}

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return 0;
}
846 
/*
 * Flush queued writes. With zero copy active and requests still pending,
 * the completion notifications are reaped first. Returns the result of
 * _sock_flush().
 */
static int
posix_sock_flush(struct spdk_sock *sock)
{
#ifdef SPDK_ZEROCOPY
	struct spdk_posix_sock *zc_sock = __posix_sock(sock);

	if (zc_sock->zcopy) {
		if (!TAILQ_EMPTY(&sock->pending_reqs)) {
			_sock_check_zcopy(sock);
		}
	}
#endif

	return _sock_flush(sock);
}
860 
861 static ssize_t
862 posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
863 {
864 	struct iovec siov[2];
865 	int sbytes;
866 	ssize_t bytes;
867 	struct spdk_posix_sock_group_impl *group;
868 
869 	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
870 	if (sbytes < 0) {
871 		errno = EINVAL;
872 		return -1;
873 	} else if (sbytes == 0) {
874 		errno = EAGAIN;
875 		return -1;
876 	}
877 
878 	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
879 
880 	if (bytes == 0) {
881 		/* The only way this happens is if diov is 0 length */
882 		errno = EINVAL;
883 		return -1;
884 	}
885 
886 	spdk_pipe_reader_advance(sock->recv_pipe, bytes);
887 
888 	/* If we drained the pipe, take it off the pending_recv list. The socket may still have data buffered
889 	 * in the kernel to receive, but this will be handled on the next poll call when we get the same EPOLLIN
890 	 * event again. */
891 	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
892 		group = __posix_group_impl(sock->base.group_impl);
893 		TAILQ_REMOVE(&group->pending_recv, sock, link);
894 		sock->pending_recv = false;
895 	}
896 
897 	return bytes;
898 }
899 
900 static inline ssize_t
901 posix_sock_read(struct spdk_posix_sock *sock)
902 {
903 	struct iovec iov[2];
904 	int bytes;
905 	struct spdk_posix_sock_group_impl *group;
906 
907 	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
908 
909 	if (bytes > 0) {
910 		bytes = readv(sock->fd, iov, 2);
911 		if (bytes > 0) {
912 			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
913 
914 			/* For normal operation, this function is called in response to an EPOLLIN
915 			 * event, which already placed the socket onto the pending_recv list.
916 			 * But between polls the user may repeatedly call posix_sock_read
917 			 * and if they clear the pipe on one of those earlier calls, the
918 			 * socket will be removed from the pending_recv list. In that case,
919 			 * if we now found more data, put it back on.
920 			 * This essentially never happens in practice because the application
921 			 * will stop trying to receive and wait for the next EPOLLIN event, but
922 			 * for correctness let's handle it. */
923 			if (!sock->pending_recv && sock->base.group_impl) {
924 				group = __posix_group_impl(sock->base.group_impl);
925 				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
926 				sock->pending_recv = true;
927 			}
928 		}
929 	}
930 
931 	return bytes;
932 }
933 
934 static ssize_t
935 posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
936 {
937 	struct spdk_posix_sock *sock = __posix_sock(_sock);
938 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl);
939 	int rc, i;
940 	size_t len;
941 
942 	if (sock->recv_pipe == NULL) {
943 		if (group && sock->pending_recv) {
944 			sock->pending_recv = false;
945 			TAILQ_REMOVE(&group->pending_recv, sock, link);
946 		}
947 		return readv(sock->fd, iov, iovcnt);
948 	}
949 
950 	len = 0;
951 	for (i = 0; i < iovcnt; i++) {
952 		len += iov[i].iov_len;
953 	}
954 
955 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
956 		/* If the user is receiving a sufficiently large amount of data,
957 		 * receive directly to their buffers. */
958 		if (len >= MIN_SOCK_PIPE_SIZE) {
959 			if (group && sock->pending_recv) {
960 				sock->pending_recv = false;
961 				TAILQ_REMOVE(&group->pending_recv, sock, link);
962 			}
963 			return readv(sock->fd, iov, iovcnt);
964 		}
965 
966 		/* Otherwise, do a big read into our pipe */
967 		rc = posix_sock_read(sock);
968 		if (rc <= 0) {
969 			return rc;
970 		}
971 	}
972 
973 	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
974 }
975 
/* Receive up to len bytes into buf; a single-buffer front end to posix_sock_readv(). */
static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec single = {
		.iov_base = buf,
		.iov_len = len,
	};

	return posix_sock_readv(sock, &single, 1);
}
986 
987 static ssize_t
988 posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
989 {
990 	struct spdk_posix_sock *sock = __posix_sock(_sock);
991 	int rc;
992 
993 	/* In order to process a writev, we need to flush any asynchronous writes
994 	 * first. */
995 	rc = _sock_flush(_sock);
996 	if (rc < 0) {
997 		return rc;
998 	}
999 
1000 	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
1001 		/* We weren't able to flush all requests */
1002 		errno = EAGAIN;
1003 		return -1;
1004 	}
1005 
1006 	return writev(sock->fd, iov, iovcnt);
1007 }
1008 
1009 static void
1010 posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
1011 {
1012 	int rc;
1013 
1014 	spdk_sock_request_queue(sock, req);
1015 
1016 	/* If there are a sufficient number queued, just flush them out immediately. */
1017 	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
1018 		rc = _sock_flush(sock);
1019 		if (rc) {
1020 			spdk_sock_abort_requests(sock);
1021 		}
1022 	}
1023 }
1024 
1025 static int
1026 posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1027 {
1028 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1029 	int val;
1030 	int rc;
1031 
1032 	assert(sock != NULL);
1033 
1034 	val = nbytes;
1035 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1036 	if (rc != 0) {
1037 		return -1;
1038 	}
1039 	return 0;
1040 }
1041 
1042 static bool
1043 posix_sock_is_ipv6(struct spdk_sock *_sock)
1044 {
1045 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1046 	struct sockaddr_storage sa;
1047 	socklen_t salen;
1048 	int rc;
1049 
1050 	assert(sock != NULL);
1051 
1052 	memset(&sa, 0, sizeof sa);
1053 	salen = sizeof sa;
1054 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1055 	if (rc != 0) {
1056 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1057 		return false;
1058 	}
1059 
1060 	return (sa.ss_family == AF_INET6);
1061 }
1062 
1063 static bool
1064 posix_sock_is_ipv4(struct spdk_sock *_sock)
1065 {
1066 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1067 	struct sockaddr_storage sa;
1068 	socklen_t salen;
1069 	int rc;
1070 
1071 	assert(sock != NULL);
1072 
1073 	memset(&sa, 0, sizeof sa);
1074 	salen = sizeof sa;
1075 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1076 	if (rc != 0) {
1077 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1078 		return false;
1079 	}
1080 
1081 	return (sa.ss_family == AF_INET);
1082 }
1083 
1084 static bool
1085 posix_sock_is_connected(struct spdk_sock *_sock)
1086 {
1087 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1088 	uint8_t byte;
1089 	int rc;
1090 
1091 	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
1092 	if (rc == 0) {
1093 		return false;
1094 	}
1095 
1096 	if (rc < 0) {
1097 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1098 			return true;
1099 		}
1100 
1101 		return false;
1102 	}
1103 
1104 	return true;
1105 }
1106 
1107 static int
1108 posix_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
1109 {
1110 	int rc = -1;
1111 
1112 	if (!g_spdk_posix_sock_impl_opts.enable_placement_id) {
1113 		return rc;
1114 	}
1115 
1116 	if (g_spdk_posix_sock_impl_opts.enable_placement_id != 0) {
1117 		switch (g_spdk_posix_sock_impl_opts.enable_placement_id) {
1118 		case 1: {
1119 #if defined(SO_INCOMING_NAPI_ID)
1120 			struct spdk_posix_sock *sock = __posix_sock(_sock);
1121 			socklen_t len = sizeof(int);
1122 
1123 			rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &len);
1124 #endif
1125 			break;
1126 		}
1127 		case 2: {
1128 #if defined(SO_INCOMING_CPU)
1129 			struct spdk_posix_sock *sock = __posix_sock(_sock);
1130 			socklen_t len = sizeof(int);
1131 
1132 			rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_CPU, placement_id, &len);
1133 #endif
1134 			break;
1135 		}
1136 		default:
1137 			break;
1138 		}
1139 	}
1140 
1141 	if (rc != 0) {
1142 		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
1143 	}
1144 	return rc;
1145 }
1146 
1147 static struct spdk_sock_group_impl *
1148 posix_sock_group_impl_create(void)
1149 {
1150 	struct spdk_posix_sock_group_impl *group_impl;
1151 	int fd;
1152 
1153 #if defined(SPDK_EPOLL)
1154 	fd = epoll_create1(0);
1155 #elif defined(SPDK_KEVENT)
1156 	fd = kqueue();
1157 #endif
1158 	if (fd == -1) {
1159 		return NULL;
1160 	}
1161 
1162 	group_impl = calloc(1, sizeof(*group_impl));
1163 	if (group_impl == NULL) {
1164 		SPDK_ERRLOG("group_impl allocation failed\n");
1165 		close(fd);
1166 		return NULL;
1167 	}
1168 
1169 	group_impl->fd = fd;
1170 	TAILQ_INIT(&group_impl->pending_recv);
1171 
1172 	return &group_impl->base;
1173 }
1174 
1175 static int
1176 posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1177 {
1178 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1179 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1180 	int rc;
1181 
1182 #if defined(SPDK_EPOLL)
1183 	struct epoll_event event;
1184 
1185 	memset(&event, 0, sizeof(event));
1186 	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
1187 	event.events = EPOLLIN | EPOLLERR;
1188 	event.data.ptr = sock;
1189 
1190 	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
1191 #elif defined(SPDK_KEVENT)
1192 	struct kevent event;
1193 	struct timespec ts = {0};
1194 
1195 	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);
1196 
1197 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1198 #endif
1199 
1200 	/* switched from another polling group due to scheduling */
1201 	if (spdk_unlikely(sock->recv_pipe != NULL  &&
1202 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1203 		assert(sock->pending_recv == false);
1204 		sock->pending_recv = true;
1205 		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1206 	}
1207 
1208 	return rc;
1209 }
1210 
1211 static int
1212 posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1213 {
1214 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1215 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1216 	int rc;
1217 
1218 	if (sock->pending_recv) {
1219 		TAILQ_REMOVE(&group->pending_recv, sock, link);
1220 		sock->pending_recv = false;
1221 	}
1222 
1223 #if defined(SPDK_EPOLL)
1224 	struct epoll_event event;
1225 
1226 	/* Event parameter is ignored but some old kernel version still require it. */
1227 	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
1228 #elif defined(SPDK_KEVENT)
1229 	struct kevent event;
1230 	struct timespec ts = {0};
1231 
1232 	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
1233 
1234 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1235 	if (rc == 0 && event.flags & EV_ERROR) {
1236 		rc = -1;
1237 		errno = event.data;
1238 	}
1239 #endif
1240 
1241 	spdk_sock_abort_requests(_sock);
1242 
1243 	return rc;
1244 }
1245 
1246 static int
1247 posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
1248 			   struct spdk_sock **socks)
1249 {
1250 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1251 	struct spdk_sock *sock, *tmp;
1252 	int num_events, i, rc;
1253 	struct spdk_posix_sock *psock, *ptmp;
1254 #if defined(SPDK_EPOLL)
1255 	struct epoll_event events[MAX_EVENTS_PER_POLL];
1256 #elif defined(SPDK_KEVENT)
1257 	struct kevent events[MAX_EVENTS_PER_POLL];
1258 	struct timespec ts = {0};
1259 #endif
1260 
1261 	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
1262 	 * a completion callback could remove the sock from the
1263 	 * group. */
1264 	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
1265 		rc = _sock_flush(sock);
1266 		if (rc) {
1267 			spdk_sock_abort_requests(sock);
1268 		}
1269 	}
1270 
1271 #if defined(SPDK_EPOLL)
1272 	num_events = epoll_wait(group->fd, events, max_events, 0);
1273 #elif defined(SPDK_KEVENT)
1274 	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
1275 #endif
1276 
1277 	if (num_events == -1) {
1278 		return -1;
1279 	} else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
1280 		uint8_t byte;
1281 
1282 		sock = TAILQ_FIRST(&_group->socks);
1283 		psock = __posix_sock(sock);
1284 		/* a recv is done here to busy poll the queue associated with
1285 		 * first socket in list and potentially reap incoming data.
1286 		 */
1287 		if (psock->so_priority) {
1288 			recv(psock->fd, &byte, 1, MSG_PEEK);
1289 		}
1290 	}
1291 
1292 	for (i = 0; i < num_events; i++) {
1293 #if defined(SPDK_EPOLL)
1294 		sock = events[i].data.ptr;
1295 		psock = __posix_sock(sock);
1296 
1297 #ifdef SPDK_ZEROCOPY
1298 		if (events[i].events & EPOLLERR) {
1299 			rc = _sock_check_zcopy(sock);
1300 			/* If the socket was closed or removed from
1301 			 * the group in response to a send ack, don't
1302 			 * add it to the array here. */
1303 			if (rc || sock->cb_fn == NULL) {
1304 				continue;
1305 			}
1306 		}
1307 #endif
1308 		if ((events[i].events & EPOLLIN) == 0) {
1309 			continue;
1310 		}
1311 
1312 #elif defined(SPDK_KEVENT)
1313 		sock = events[i].udata;
1314 		psock = __posix_sock(sock);
1315 #endif
1316 
1317 		/* If the socket does not already have recv pending, add it now */
1318 		if (!psock->pending_recv) {
1319 			psock->pending_recv = true;
1320 			TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
1321 		}
1322 	}
1323 
1324 	num_events = 0;
1325 
1326 	TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) {
1327 		if (num_events == max_events) {
1328 			break;
1329 		}
1330 
1331 		/* If the socket's cb_fn is NULL, just remove it from the
1332 		 * list and do not add it to socks array */
1333 		if (spdk_unlikely(psock->base.cb_fn == NULL)) {
1334 			psock->pending_recv = false;
1335 			TAILQ_REMOVE(&group->pending_recv, psock, link);
1336 			continue;
1337 		}
1338 
1339 		socks[num_events++] = &psock->base;
1340 	}
1341 
1342 	/* Cycle the pending_recv list so that each time we poll things aren't
1343 	 * in the same order.
1344 	 * TODO: This could be done with a single operation because psock points
1345 	 * to the last node that needs to get cycled already. */
1346 	for (i = 0; i < num_events; i++) {
1347 		psock = __posix_sock(socks[i]);
1348 
1349 		TAILQ_REMOVE(&group->pending_recv, psock, link);
1350 		TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
1351 	}
1352 
1353 	return num_events;
1354 }
1355 
1356 static int
1357 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1358 {
1359 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1360 	int rc;
1361 
1362 	rc = close(group->fd);
1363 	free(group);
1364 	return rc;
1365 }
1366 
1367 static int
1368 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
1369 {
1370 	if (!opts || !len) {
1371 		errno = EINVAL;
1372 		return -1;
1373 	}
1374 	memset(opts, 0, *len);
1375 
1376 #define FIELD_OK(field) \
1377 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len
1378 
1379 #define GET_FIELD(field) \
1380 	if (FIELD_OK(field)) { \
1381 		opts->field = g_spdk_posix_sock_impl_opts.field; \
1382 	}
1383 
1384 	GET_FIELD(recv_buf_size);
1385 	GET_FIELD(send_buf_size);
1386 	GET_FIELD(enable_recv_pipe);
1387 	GET_FIELD(enable_zerocopy_send);
1388 	GET_FIELD(enable_quickack);
1389 	GET_FIELD(enable_placement_id);
1390 
1391 #undef GET_FIELD
1392 #undef FIELD_OK
1393 
1394 	*len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts));
1395 	return 0;
1396 }
1397 
1398 static int
1399 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
1400 {
1401 	if (!opts) {
1402 		errno = EINVAL;
1403 		return -1;
1404 	}
1405 
1406 #define FIELD_OK(field) \
1407 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len
1408 
1409 #define SET_FIELD(field) \
1410 	if (FIELD_OK(field)) { \
1411 		g_spdk_posix_sock_impl_opts.field = opts->field; \
1412 	}
1413 
1414 	SET_FIELD(recv_buf_size);
1415 	SET_FIELD(send_buf_size);
1416 	SET_FIELD(enable_recv_pipe);
1417 	SET_FIELD(enable_zerocopy_send);
1418 	SET_FIELD(enable_quickack);
1419 	SET_FIELD(enable_placement_id);
1420 
1421 #undef SET_FIELD
1422 #undef FIELD_OK
1423 
1424 	return 0;
1425 }
1426 
1427 
1428 static struct spdk_net_impl g_posix_net_impl = {
1429 	.name		= "posix",
1430 	.getaddr	= posix_sock_getaddr,
1431 	.connect	= posix_sock_connect,
1432 	.listen		= posix_sock_listen,
1433 	.accept		= posix_sock_accept,
1434 	.close		= posix_sock_close,
1435 	.recv		= posix_sock_recv,
1436 	.readv		= posix_sock_readv,
1437 	.writev		= posix_sock_writev,
1438 	.writev_async	= posix_sock_writev_async,
1439 	.flush		= posix_sock_flush,
1440 	.set_recvlowat	= posix_sock_set_recvlowat,
1441 	.set_recvbuf	= posix_sock_set_recvbuf,
1442 	.set_sendbuf	= posix_sock_set_sendbuf,
1443 	.is_ipv6	= posix_sock_is_ipv6,
1444 	.is_ipv4	= posix_sock_is_ipv4,
1445 	.is_connected	= posix_sock_is_connected,
1446 	.get_placement_id	= posix_sock_get_placement_id,
1447 	.group_impl_create	= posix_sock_group_impl_create,
1448 	.group_impl_add_sock	= posix_sock_group_impl_add_sock,
1449 	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
1450 	.group_impl_poll	= posix_sock_group_impl_poll,
1451 	.group_impl_close	= posix_sock_group_impl_close,
1452 	.get_opts	= posix_sock_impl_get_opts,
1453 	.set_opts	= posix_sock_impl_set_opts,
1454 };
1455 
1456 SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);
1457