xref: /spdk/module/sock/posix/posix.c (revision 441431d22872ae4e05a1bf8b78e9aeff1eba1eb3)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation. All rights reserved.
5  *   Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #if defined(__FreeBSD__)
37 #include <sys/event.h>
38 #define SPDK_KEVENT
39 #else
40 #include <sys/epoll.h>
41 #define SPDK_EPOLL
42 #endif
43 
44 #if defined(__linux__)
45 #include <linux/errqueue.h>
46 #endif
47 
48 #include "spdk/env.h"
49 #include "spdk/log.h"
50 #include "spdk/pipe.h"
51 #include "spdk/sock.h"
52 #include "spdk/util.h"
53 #include "spdk_internal/sock.h"
54 
55 #define MAX_TMPBUF 1024
56 #define PORTNUMLEN 32
57 
58 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
59 #define SPDK_ZEROCOPY
60 #endif
61 
/* Per-connection state for the POSIX sock implementation. */
struct spdk_posix_sock {
	struct spdk_sock	base;
	int			fd;

	/* Count of sendmsg() calls issued on this socket; used to match
	 * MSG_ZEROCOPY completion notifications back to requests.  Wraps
	 * from UINT32_MAX to 1 so it is never 0 after the first send. */
	uint32_t		sendmsg_idx;

	/* Optional userspace receive pipe used to batch small socket reads. */
	struct spdk_pipe	*recv_pipe;
	void			*recv_buf;	/* backing buffer for recv_pipe */
	int			recv_buf_sz;	/* usable size of recv_pipe, in bytes */
	bool			pipe_has_data;	/* unread data buffered in recv_pipe */
	bool			socket_has_data; /* unread data pending in the kernel socket */
	bool			zcopy;		/* SO_ZEROCOPY successfully enabled on fd */

	/* Placement hint for poll-group affinity; -1 when unknown. */
	int			placement_id;

	/* Linkage on the owning group's socks_with_data list. */
	TAILQ_ENTRY(spdk_posix_sock)	link;
};
79 
TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock);

/* Poll group backed by an epoll (Linux) or kqueue (FreeBSD) descriptor. */
struct spdk_posix_sock_group_impl {
	struct spdk_sock_group_impl	base;
	int				fd;	/* epoll/kqueue fd */
	/* Sockets known to have data buffered in their pipe or in the kernel. */
	struct spdk_has_data_list	socks_with_data;
	/* Placement id claimed by this group; -1 until assigned. */
	int				placement_id;
};
88 
/* Module-wide tunables; may be overridden via spdk_sock_impl_set_opts(). */
static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_zerocopy_send = true,
	.enable_quickack = false,
	.enable_placement_id = PLACEMENT_NONE,
	/* Zero-copy defaults differ per role: on for servers, off for clients. */
	.enable_zerocopy_send_server = true,
	.enable_zerocopy_send_client = false
};

/* Global placement-id -> poll-group map shared by all posix sockets. */
static struct spdk_sock_map g_map = {
	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
	.mtx = PTHREAD_MUTEX_INITIALIZER
};
104 
105 __attribute((destructor)) static void
106 posix_sock_map_cleanup(void)
107 {
108 	spdk_sock_map_cleanup(&g_map);
109 }
110 
/* Render the IPv4/IPv6 address in sa as a string into host (capacity hlen).
 * Returns 0 on success, -1 on NULL arguments, unsupported address family,
 * or inet_ntop() failure. */
static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const void *addr = NULL;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	if (sa->sa_family == AF_INET) {
		addr = &((struct sockaddr_in *)sa)->sin_addr;
	} else if (sa->sa_family == AF_INET6) {
		addr = &((struct sockaddr_in6 *)sa)->sin6_addr;
	} else {
		/* Not an IP socket - nothing to format. */
		return -1;
	}

	return inet_ntop(sa->sa_family, addr, host, hlen) == NULL ? -1 : 0;
}
139 
/* Downcasts from the generic sock/group types to this implementation's types. */
#define __posix_sock(sock) (struct spdk_posix_sock *)sock
#define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group
142 
/* Report the local (saddr/sport) and peer (caddr/cport) addresses of a
 * connected socket.  sport/cport may be NULL when the caller does not need
 * the port.  AF_UNIX sockets succeed without filling in any address.
 * Returns 0 on success, -1 on failure. */
static int
posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	/* Local side first. */
	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	/* Then the peer side. */
	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}
213 
/* Selects whether posix_sock_create() produces a listening or a connected socket. */
enum posix_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};
218 
/* Resize (or create/destroy) the socket's userspace receive pipe to sz bytes.
 * sz == 0 frees the pipe entirely; 0 < sz < MIN_SOCK_PIPE_SIZE is rejected.
 * Any data already buffered in the old pipe is migrated to the new one; if it
 * does not fit, the resize fails with -EINVAL and the old pipe is kept intact.
 * Returns 0 on success or a negative errno. */
static int
posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		/* Already the requested size - nothing to do. */
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	/* sz + 1 because the pipe reserves one byte to distinguish full from empty. */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}
285 
286 static int
287 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
288 {
289 	struct spdk_posix_sock *sock = __posix_sock(_sock);
290 	int rc;
291 
292 	assert(sock != NULL);
293 
294 	if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) {
295 		rc = posix_sock_alloc_pipe(sock, sz);
296 		if (rc) {
297 			return rc;
298 		}
299 	}
300 
301 	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
302 	if (sz < MIN_SO_RCVBUF_SIZE) {
303 		sz = MIN_SO_RCVBUF_SIZE;
304 	}
305 
306 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
307 	if (rc < 0) {
308 		return rc;
309 	}
310 
311 	return 0;
312 }
313 
314 static int
315 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
316 {
317 	struct spdk_posix_sock *sock = __posix_sock(_sock);
318 	int rc;
319 
320 	assert(sock != NULL);
321 
322 	if (sz < MIN_SO_SNDBUF_SIZE) {
323 		sz = MIN_SO_SNDBUF_SIZE;
324 	}
325 
326 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
327 	if (rc < 0) {
328 		return rc;
329 	}
330 
331 	return 0;
332 }
333 
/* Allocate a spdk_posix_sock wrapping an existing fd.  Optionally enables
 * SO_ZEROCOPY (best effort - failure just leaves zcopy false), TCP_QUICKACK,
 * and registers the socket's placement id.  Returns NULL on allocation
 * failure; the caller retains ownership of fd in that case. */
static struct spdk_posix_sock *
posix_sock_alloc(int fd, bool enable_zero_copy)
{
	struct spdk_posix_sock *sock;
#if defined(SPDK_ZEROCOPY) || defined(__linux__)
	int flag;
	int rc;
#endif

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;

#if defined(SPDK_ZEROCOPY)
	flag = 1;

	if (enable_zero_copy) {
		/* Try to turn on zero copy sends */
		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
		if (rc == 0) {
			sock->zcopy = true;
		}
	}
#endif

#if defined(__linux__)
	flag = 1;

	if (g_spdk_posix_sock_impl_opts.enable_quickack) {
		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
		if (rc != 0) {
			/* Not fatal; quickack is only a latency optimization. */
			SPDK_ERRLOG("quickack was failed to set\n");
		}
	}

	spdk_sock_get_placement_id(sock->fd, g_spdk_posix_sock_impl_opts.enable_placement_id,
				   &sock->placement_id);

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
		/* Save placement_id */
		spdk_sock_map_insert(&g_map, sock->placement_id, NULL);
	}
#endif

	return sock;
}
384 
385 static bool
386 sock_is_loopback(int fd)
387 {
388 	struct ifaddrs *addrs, *tmp;
389 	struct sockaddr_storage sa = {};
390 	socklen_t salen;
391 	struct ifreq ifr = {};
392 	char ip_addr[256], ip_addr_tmp[256];
393 	int rc;
394 	bool is_loopback = false;
395 
396 	salen = sizeof(sa);
397 	rc = getsockname(fd, (struct sockaddr *)&sa, &salen);
398 	if (rc != 0) {
399 		return is_loopback;
400 	}
401 
402 	memset(ip_addr, 0, sizeof(ip_addr));
403 	rc = get_addr_str((struct sockaddr *)&sa, ip_addr, sizeof(ip_addr));
404 	if (rc != 0) {
405 		return is_loopback;
406 	}
407 
408 	getifaddrs(&addrs);
409 	for (tmp = addrs; tmp != NULL; tmp = tmp->ifa_next) {
410 		if (tmp->ifa_addr && (tmp->ifa_flags & IFF_UP) &&
411 		    (tmp->ifa_addr->sa_family == sa.ss_family)) {
412 			memset(ip_addr_tmp, 0, sizeof(ip_addr_tmp));
413 			rc = get_addr_str(tmp->ifa_addr, ip_addr_tmp, sizeof(ip_addr_tmp));
414 			if (rc != 0) {
415 				continue;
416 			}
417 
418 			if (strncmp(ip_addr, ip_addr_tmp, sizeof(ip_addr)) == 0) {
419 				memcpy(ifr.ifr_name, tmp->ifa_name, sizeof(ifr.ifr_name));
420 				ioctl(fd, SIOCGIFFLAGS, &ifr);
421 				if (ifr.ifr_flags & IFF_LOOPBACK) {
422 					is_loopback = true;
423 				}
424 				goto end;
425 			}
426 		}
427 	}
428 
429 end:
430 	freeifaddrs(addrs);
431 	return is_loopback;
432 }
433 
/* Create a posix socket.  Resolves ip:port (stripping surrounding brackets
 * from IPv6 literals), then for each returned address: creates a socket,
 * applies buffer-size / REUSEADDR / NODELAY / priority / V6ONLY options,
 * and either bind()+listen()s or connect()s depending on type.  The first
 * address that succeeds wins; the socket is made non-blocking.  Zero-copy
 * eligibility combines the per-role impl option, the caller's opts->zcopy,
 * and a loopback check.  Returns the new sock or NULL on failure. */
static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_posix_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc, sz;
	bool enable_zcopy_user_opts = true;
	bool enable_zcopy_impl_opts = true;

	assert(opts != NULL);

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		/* Strip the brackets from an "[addr]" IPv6 literal. */
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		sz = g_spdk_posix_sock_impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		sz = g_spdk_posix_sock_impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}
#endif

		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_server &&
						 g_spdk_posix_sock_impl_opts.enable_zerocopy_send;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_client &&
						 g_spdk_posix_sock_impl_opts.enable_zerocopy_send;
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	/* Only enable zero copy for non-loopback sockets. */
	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd);

	sock = posix_sock_alloc(fd, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}
604 
/* Create a listening socket bound to ip:port. */
static struct spdk_sock *
posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}
610 
/* Create a socket connected to ip:port. */
static struct spdk_sock *
posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}
616 
/* Accept one pending connection on a listening socket.  The new fd is made
 * non-blocking, the listener's SO_PRIORITY and zero-copy settings are
 * propagated to it, and a new spdk_posix_sock is allocated around it.
 * Returns NULL if no connection is pending or on any failure. */
static struct spdk_sock *
posix_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_posix_sock		*sock = __posix_sock(_sock);
	struct sockaddr_storage		sa;
	socklen_t			salen;
	int				rc, fd;
	struct spdk_posix_sock		*new_sock;
	int				flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	/* O_NONBLOCK is not inherited from the listener on all platforms. */
	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	/* Inherit the zero copy feature from the listen socket */
	new_sock = posix_sock_alloc(fd, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}
667 
/* Close the socket and release all associated memory.  Always returns 0;
 * the caller must have completed or aborted all pending requests first. */
static int
posix_sock_close(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);

	assert(TAILQ_EMPTY(&_sock->pending_reqs));

	/* If the socket fails to close, the best choice is to
	 * leak the fd but continue to free the rest of the sock
	 * memory. */
	close(sock->fd);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	free(sock);

	return 0;
}
686 
#ifdef SPDK_ZEROCOPY
/* Drain MSG_ZEROCOPY completion notifications from the socket's error queue
 * and complete the matching pending requests.  Each notification carries an
 * inclusive range [ee_info, ee_data] of sendmsg call indices (stored in
 * req->internal.offset by _sock_flush).  Returns 0, or a negative value if
 * completing a request fails. */
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	/* Large enough for one cmsghdr carrying a sock_extended_err payload. */
	uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	msgh.msg_control = buf;
	msgh.msg_controllen = sizeof(buf);

	/* Loop until the error queue is empty (recvmsg returns EAGAIN). */
	while (true) {
		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (req->internal.offset == idx) {
					found = true;

					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}

				} else if (found) {
					break;
				}
			}
		}
	}

	/* NOTE(review): unreachable - the while (true) loop above only exits
	 * via return statements. */
	return 0;
}
#endif
760 
/* Flush queued asynchronous write requests with a single vectored sendmsg().
 * Partially-sent requests have their internal.offset advanced; fully-sent
 * requests are either completed immediately (copy path) or moved to
 * pending_reqs tagged with the sendmsg index (zero-copy path, completed later
 * by _sock_check_zcopy).  Returns 0 on success/would-block, or the negative
 * sendmsg result on a real error. */
static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc;
	unsigned int offset;
	size_t len;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather up to IOV_BATCH_SIZE iovecs from the queued requests. */
	iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL);

	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY;
	} else
#endif
	{
		flags = 0;
	}
	rc = sendmsg(psock->fd, &msg, flags);
	if (rc <= 0) {
		/* ENOBUFS with zero-copy means the kernel ran out of optmem;
		 * treat it like would-block and retry later. */
		if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
			return 0;
		}
		return rc;
	}

	/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
	 * req->internal.offset, so sendmsg_idx should not be zero  */
	if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
		psock->sendmsg_idx = 1;
	} else {
		psock->sendmsg_idx++;
	}

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(sock, req);

		if (!psock->zcopy) {
			/* The sendmsg syscall above isn't currently asynchronous,
			* so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return 0;
}
866 
/* Public flush entry point: reap any outstanding zero-copy completions
 * first (so their requests free up), then flush queued writes. */
static int
posix_sock_flush(struct spdk_sock *sock)
{
#ifdef SPDK_ZEROCOPY
	struct spdk_posix_sock *psock = __posix_sock(sock);

	if (psock->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) {
		_sock_check_zcopy(sock);
	}
#endif

	return _sock_flush(sock);
}
880 
/* Copy buffered data from the socket's receive pipe into the caller's iovecs.
 * Returns the number of bytes copied, or -1 with errno set to EAGAIN when the
 * pipe is empty or EINVAL when diov has zero total length.  When the pipe is
 * drained, updates pipe_has_data and the group's socks_with_data list. */
static ssize_t
posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_posix_sock_group_impl *group;

	/* The pipe is circular, so readable data may span two iovecs. */
	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, mark it appropriately */
	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		assert(sock->pipe_has_data == true);

		/* Only drop off the group's list if the kernel socket is also dry. */
		group = __posix_group_impl(sock->base.group_impl);
		if (group && !sock->socket_has_data) {
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->pipe_has_data = false;
	}

	return bytes;
}
922 
/* Fill the socket's receive pipe with as much kernel socket data as fits.
 * Returns the readv() result (bytes read, 0 on EOF, negative on error) or
 * the available pipe space if it is <= 0.  Updates pipe_has_data /
 * socket_has_data and the group's socks_with_data list accordingly. */
static inline ssize_t
posix_sock_read(struct spdk_posix_sock *sock)
{
	struct iovec iov[2];
	int bytes_avail, bytes_recvd;
	struct spdk_posix_sock_group_impl *group;

	bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes_avail <= 0) {
		/* No room in the pipe (or pipe error) - nothing read. */
		return bytes_avail;
	}

	bytes_recvd = readv(sock->fd, iov, 2);

	/* Callers only invoke this when the pipe is empty. */
	assert(sock->pipe_has_data == false);

	if (bytes_recvd <= 0) {
		/* Errors count as draining the socket data */
		if (sock->base.group_impl && sock->socket_has_data) {
			group = __posix_group_impl(sock->base.group_impl);
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->socket_has_data = false;

		return bytes_recvd;
	}

	spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd);

	/* NOTE(review): depends on a DEBUG macro expanding to nonzero; with an
	 * undefined or zero DEBUG this assertion is compiled out. */
#if DEBUG
	if (sock->base.group_impl) {
		assert(sock->socket_has_data == true);
	}
#endif

	sock->pipe_has_data = true;
	if (bytes_recvd < bytes_avail) {
		/* We drained the kernel socket entirely. */
		sock->socket_has_data = false;
	}

	return bytes_recvd;
}
968 
/* Vectored receive.  Without a pipe this is a plain readv().  With a pipe,
 * large reads (>= MIN_SOCK_PIPE_SIZE) bypass it and go straight to the
 * kernel; small reads first refill the pipe with one big readv() and then
 * copy out of it.  Returns bytes read, 0 on EOF, or -1 with errno set. */
static ssize_t
posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		assert(sock->pipe_has_data == false);
		if (group && sock->socket_has_data) {
			/* Assume this read drains the kernel socket. */
			sock->socket_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}
		return readv(sock->fd, iov, iovcnt);
	}

	/* If the socket is not in a group, we must assume it always has
	 * data waiting for us because it is not epolled */
	if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		len = 0;
		for (i = 0; i < iovcnt; i++) {
			len += iov[i].iov_len;
		}

		if (len >= MIN_SOCK_PIPE_SIZE) {
			/* TODO: Should this detect if kernel socket is drained? */
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = posix_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
}
1010 
/* Receive into a flat buffer by wrapping it in a single-element iovec and
 * delegating to the vectored receive path. */
static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov = {
		.iov_base = buf,
		.iov_len = len,
	};

	return posix_sock_readv(sock, &iov, 1);
}
1021 
/* Synchronous vectored write.  Queued asynchronous requests are flushed
 * first to preserve write ordering; if they cannot all be flushed, fails
 * with EAGAIN rather than interleaving this data.  Returns the writev()
 * result or a negative value / -1 with errno on failure. */
static ssize_t
posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	/* In order to process a writev, we need to flush any asynchronous writes
	 * first. */
	rc = _sock_flush(_sock);
	if (rc < 0) {
		return rc;
	}

	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
		/* We weren't able to flush all requests */
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}
1043 
1044 static void
1045 posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
1046 {
1047 	int rc;
1048 
1049 	spdk_sock_request_queue(sock, req);
1050 
1051 	/* If there are a sufficient number queued, just flush them out immediately. */
1052 	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
1053 		rc = _sock_flush(sock);
1054 		if (rc) {
1055 			spdk_sock_abort_requests(sock);
1056 		}
1057 	}
1058 }
1059 
1060 static int
1061 posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1062 {
1063 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1064 	int val;
1065 	int rc;
1066 
1067 	assert(sock != NULL);
1068 
1069 	val = nbytes;
1070 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1071 	if (rc != 0) {
1072 		return -1;
1073 	}
1074 	return 0;
1075 }
1076 
1077 static bool
1078 posix_sock_is_ipv6(struct spdk_sock *_sock)
1079 {
1080 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1081 	struct sockaddr_storage sa;
1082 	socklen_t salen;
1083 	int rc;
1084 
1085 	assert(sock != NULL);
1086 
1087 	memset(&sa, 0, sizeof sa);
1088 	salen = sizeof sa;
1089 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1090 	if (rc != 0) {
1091 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1092 		return false;
1093 	}
1094 
1095 	return (sa.ss_family == AF_INET6);
1096 }
1097 
1098 static bool
1099 posix_sock_is_ipv4(struct spdk_sock *_sock)
1100 {
1101 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1102 	struct sockaddr_storage sa;
1103 	socklen_t salen;
1104 	int rc;
1105 
1106 	assert(sock != NULL);
1107 
1108 	memset(&sa, 0, sizeof sa);
1109 	salen = sizeof sa;
1110 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1111 	if (rc != 0) {
1112 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1113 		return false;
1114 	}
1115 
1116 	return (sa.ss_family == AF_INET);
1117 }
1118 
1119 static bool
1120 posix_sock_is_connected(struct spdk_sock *_sock)
1121 {
1122 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1123 	uint8_t byte;
1124 	int rc;
1125 
1126 	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
1127 	if (rc == 0) {
1128 		return false;
1129 	}
1130 
1131 	if (rc < 0) {
1132 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1133 			return true;
1134 		}
1135 
1136 		return false;
1137 	}
1138 
1139 	return true;
1140 }
1141 
1142 static struct spdk_sock_group_impl *
1143 posix_sock_group_impl_get_optimal(struct spdk_sock *_sock)
1144 {
1145 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1146 	struct spdk_sock_group_impl *group_impl;
1147 
1148 	if (sock->placement_id != -1) {
1149 		spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl);
1150 		return group_impl;
1151 	}
1152 
1153 	return NULL;
1154 }
1155 
/* Create a poll group backed by an epoll (Linux) or kqueue (FreeBSD) fd.
 * Under PLACEMENT_CPU the group immediately claims the current core's id in
 * the global map.  Returns the new group or NULL on failure. */
static struct spdk_sock_group_impl *
posix_sock_group_impl_create(void)
{
	struct spdk_posix_sock_group_impl *group_impl;
	int fd;

#if defined(SPDK_EPOLL)
	fd = epoll_create1(0);
#elif defined(SPDK_KEVENT)
	fd = kqueue();
#endif
	if (fd == -1) {
		return NULL;
	}

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		close(fd);
		return NULL;
	}

	group_impl->fd = fd;
	TAILQ_INIT(&group_impl->socks_with_data);
	group_impl->placement_id = -1;

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
		group_impl->placement_id = spdk_env_get_current_core();
	}

	return &group_impl->base;
}
1189 
/*
 * Tag the socket's outgoing traffic with placement_id via SO_MARK and
 * record the (placement_id -> group) association in the global map.
 * Both failures are intentionally non-fatal: the socket simply keeps
 * running without placement. No-op on platforms without SO_MARK.
 */
static void
posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock,
		int placement_id)
{
#if defined(SO_MARK)
	int rc;

	if (setsockopt(sock->fd, SOL_SOCKET, SO_MARK,
		       &placement_id, sizeof(placement_id)) != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Error setting SO_MARK\n");
		return;
	}

	rc = spdk_sock_map_insert(&g_map, placement_id, &group->base);
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
		return;
	}

	sock->placement_id = placement_id;
#endif
}
1215 
1216 static void
1217 posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1218 {
1219 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1220 
1221 	if (group->placement_id == -1) {
1222 		group->placement_id = spdk_sock_map_find_free(&g_map);
1223 
1224 		/* If a free placement id is found, update existing sockets in this group */
1225 		if (group->placement_id != -1) {
1226 			struct spdk_sock  *sock, *tmp;
1227 
1228 			TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
1229 				posix_sock_mark(group, __posix_sock(sock), group->placement_id);
1230 			}
1231 		}
1232 	}
1233 
1234 	if (group->placement_id != -1) {
1235 		/*
1236 		 * group placement id is already determined for this poll group.
1237 		 * Mark socket with group's placement id.
1238 		 */
1239 		posix_sock_mark(group, __posix_sock(_sock), group->placement_id);
1240 	}
1241 }
1242 
1243 static int
1244 posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1245 {
1246 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1247 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1248 	int rc;
1249 
1250 #if defined(SPDK_EPOLL)
1251 	struct epoll_event event;
1252 
1253 	memset(&event, 0, sizeof(event));
1254 	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
1255 	event.events = EPOLLIN | EPOLLERR;
1256 	event.data.ptr = sock;
1257 
1258 	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
1259 #elif defined(SPDK_KEVENT)
1260 	struct kevent event;
1261 	struct timespec ts = {0};
1262 
1263 	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);
1264 
1265 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1266 #endif
1267 
1268 	if (rc != 0) {
1269 		return rc;
1270 	}
1271 
1272 	/* switched from another polling group due to scheduling */
1273 	if (spdk_unlikely(sock->recv_pipe != NULL  &&
1274 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1275 		sock->pipe_has_data = true;
1276 		sock->socket_has_data = false;
1277 		TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link);
1278 	}
1279 
1280 	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
1281 		posix_sock_update_mark(_group, _sock);
1282 	} else if (sock->placement_id != -1) {
1283 		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
1284 		if (rc != 0) {
1285 			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
1286 			/* Do not treat this as an error. The system will continue running. */
1287 		}
1288 	}
1289 
1290 	return rc;
1291 }
1292 
1293 static int
1294 posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1295 {
1296 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1297 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1298 	int rc;
1299 
1300 	if (sock->pipe_has_data || sock->socket_has_data) {
1301 		TAILQ_REMOVE(&group->socks_with_data, sock, link);
1302 		sock->pipe_has_data = false;
1303 		sock->socket_has_data = false;
1304 	}
1305 
1306 	if (sock->placement_id != -1) {
1307 		spdk_sock_map_release(&g_map, sock->placement_id);
1308 	}
1309 
1310 #if defined(SPDK_EPOLL)
1311 	struct epoll_event event;
1312 
1313 	/* Event parameter is ignored but some old kernel version still require it. */
1314 	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
1315 #elif defined(SPDK_KEVENT)
1316 	struct kevent event;
1317 	struct timespec ts = {0};
1318 
1319 	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
1320 
1321 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1322 	if (rc == 0 && event.flags & EV_ERROR) {
1323 		rc = -1;
1324 		errno = event.data;
1325 	}
1326 #endif
1327 
1328 	spdk_sock_abort_requests(_sock);
1329 
1330 	return rc;
1331 }
1332 
/*
 * Poll the group once and return up to max_events sockets that have data
 * ready, via the caller-provided socks array.
 *
 * Phases:
 *   1. (zerocopy only) busy-poll hardware queues to kick stalled tx flushes.
 *   2. Flush queued writes on every socket in the group.
 *   3. Non-blocking epoll_wait()/kevent() to learn which sockets became
 *      readable; merge them into the group's socks_with_data list.
 *   4. Drain up to max_events entries from socks_with_data into socks,
 *      then rotate the list so later entries are not starved.
 *
 * Returns the number of sockets written to socks, or -1 on poll failure.
 */
static int
posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_sock *sock, *tmp;
	int num_events, i, rc;
	struct spdk_posix_sock *psock, *ptmp;
#if defined(SPDK_EPOLL)
	struct epoll_event events[MAX_EVENTS_PER_POLL];
#elif defined(SPDK_KEVENT)
	struct kevent events[MAX_EVENTS_PER_POLL];
	struct timespec ts = {0};
#endif

#ifdef SPDK_ZEROCOPY
	/* When all of the following conditions are met
	 * - non-blocking socket
	 * - zero copy is enabled
	 * - interrupts suppressed (i.e. busy polling)
	 * - the NIC tx queue is full at the time sendmsg() is called
	 * - epoll_wait determines there is an EPOLLIN event for the socket
	 * then we can get into a situation where data we've sent is queued
	 * up in the kernel network stack, but interrupts have been suppressed
	 * because other traffic is flowing so the kernel misses the signal
	 * to flush the software tx queue. If there wasn't incoming data
	 * pending on the socket, then epoll_wait would have been sufficient
	 * to kick off the send operation, but since there is a pending event
	 * epoll_wait does not trigger the necessary operation.
	 *
	 * We deal with this by checking for all of the above conditions and
	 * additionally looking for EPOLLIN events that were not consumed from
	 * the last poll loop. We take this to mean that the upper layer is
	 * unable to consume them because it is blocked waiting for resources
	 * to free up, and those resources are most likely freed in response
	 * to a pending asynchronous write completing.
	 *
	 * Additionally, sockets that have the same placement_id actually share
	 * an underlying hardware queue. That means polling one of them is
	 * equivalent to polling all of them. As a quick mechanism to avoid
	 * making extra poll() calls, stash the last placement_id during the loop
	 * and only poll if it's not the same. The overwhelmingly common case
	 * is that all sockets in this list have the same placement_id because
	 * SPDK is intentionally grouping sockets by that value, so even
	 * though this won't stop all extra calls to poll(), it's very fast
	 * and will catch all of them in practice.
	 */
	int last_placement_id = -1;

	TAILQ_FOREACH(psock, &group->socks_with_data, link) {
		if (psock->zcopy && psock->placement_id >= 0 &&
		    psock->placement_id != last_placement_id) {
			struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0};

			poll(&pfd, 1, 0);
			last_placement_id = psock->placement_id;
		}
	}
#endif

	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
	 * a completion callback could remove the sock from the
	 * group. */
	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}

	assert(max_events > 0);

	/* Non-blocking wait (zero timeout) for readiness events. */
#if defined(SPDK_EPOLL)
	num_events = epoll_wait(group->fd, events, max_events, 0);
#elif defined(SPDK_KEVENT)
	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
#endif

	if (num_events == -1) {
		return -1;
	} else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
		sock = TAILQ_FIRST(&_group->socks);
		psock = __posix_sock(sock);
		/* poll() is called here to busy poll the queue associated with
		 * first socket in list and potentially reap incoming data.
		 */
		if (sock->opts.priority) {
			struct pollfd pfd = {0, 0, 0};

			pfd.fd = psock->fd;
			pfd.events = POLLIN | POLLERR;
			poll(&pfd, 1, 0);
		}
	}

	for (i = 0; i < num_events; i++) {
#if defined(SPDK_EPOLL)
		sock = events[i].data.ptr;
		psock = __posix_sock(sock);

#ifdef SPDK_ZEROCOPY
		if (events[i].events & EPOLLERR) {
			rc = _sock_check_zcopy(sock);
			/* If the socket was closed or removed from
			 * the group in response to a send ack, don't
			 * add it to the array here. */
			if (rc || sock->cb_fn == NULL) {
				continue;
			}
		}
#endif
		if ((events[i].events & EPOLLIN) == 0) {
			continue;
		}

#elif defined(SPDK_KEVENT)
		sock = events[i].udata;
		psock = __posix_sock(sock);
#endif

		/* If the socket is not already in the list, add it now */
		if (!psock->socket_has_data && !psock->pipe_has_data) {
			TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link);
		}
		psock->socket_has_data = true;
	}

	/* Reuse num_events as the count of sockets handed back to the caller. */
	num_events = 0;

	TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) {
		if (num_events == max_events) {
			break;
		}

		/* If the socket's cb_fn is NULL, just remove it from the
		 * list and do not add it to socks array */
		if (spdk_unlikely(psock->base.cb_fn == NULL)) {
			psock->socket_has_data = false;
			psock->pipe_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, psock, link);
			continue;
		}

		socks[num_events++] = &psock->base;
	}

	/* Cycle the has_data list so that each time we poll things aren't
	 * in the same order. Say we have 6 sockets in the list, named as follows:
	 * A B C D E F
	 * And all 6 sockets had epoll events, but max_events is only 3. That means
	 * psock currently points at D. We want to rearrange the list to the following:
	 * D E F A B C
	 *
	 * The variables below are named according to this example to make it easier to
	 * follow the swaps.
	 */
	if (psock != NULL) {
		struct spdk_posix_sock *pa, *pc, *pd, *pf;

		/* Capture pointers to the elements we need */
		pd = psock;
		pc = TAILQ_PREV(pd, spdk_has_data_list, link);
		pa = TAILQ_FIRST(&group->socks_with_data);
		pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list);

		/* Break the link between C and D */
		pc->link.tqe_next = NULL;
		pd->link.tqe_prev = NULL;

		/* Connect F to A */
		pf->link.tqe_next = pa;
		pa->link.tqe_prev = &pf->link.tqe_next;

		/* Fix up the list first/last pointers */
		group->socks_with_data.tqh_first = pd;
		group->socks_with_data.tqh_last = &pc->link.tqe_next;
	}

	return num_events;
}
1513 
1514 static int
1515 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1516 {
1517 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1518 	int rc;
1519 
1520 	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1521 		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
1522 	}
1523 
1524 	rc = close(group->fd);
1525 	free(group);
1526 	return rc;
1527 }
1528 
1529 static int
1530 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
1531 {
1532 	if (!opts || !len) {
1533 		errno = EINVAL;
1534 		return -1;
1535 	}
1536 	memset(opts, 0, *len);
1537 
1538 #define FIELD_OK(field) \
1539 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len
1540 
1541 #define GET_FIELD(field) \
1542 	if (FIELD_OK(field)) { \
1543 		opts->field = g_spdk_posix_sock_impl_opts.field; \
1544 	}
1545 
1546 	GET_FIELD(recv_buf_size);
1547 	GET_FIELD(send_buf_size);
1548 	GET_FIELD(enable_recv_pipe);
1549 	GET_FIELD(enable_zerocopy_send);
1550 	GET_FIELD(enable_quickack);
1551 	GET_FIELD(enable_placement_id);
1552 	GET_FIELD(enable_zerocopy_send_server);
1553 	GET_FIELD(enable_zerocopy_send_client);
1554 
1555 #undef GET_FIELD
1556 #undef FIELD_OK
1557 
1558 	*len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts));
1559 	return 0;
1560 }
1561 
1562 static int
1563 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
1564 {
1565 	if (!opts) {
1566 		errno = EINVAL;
1567 		return -1;
1568 	}
1569 
1570 #define FIELD_OK(field) \
1571 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len
1572 
1573 #define SET_FIELD(field) \
1574 	if (FIELD_OK(field)) { \
1575 		g_spdk_posix_sock_impl_opts.field = opts->field; \
1576 	}
1577 
1578 	SET_FIELD(recv_buf_size);
1579 	SET_FIELD(send_buf_size);
1580 	SET_FIELD(enable_recv_pipe);
1581 	SET_FIELD(enable_zerocopy_send);
1582 	SET_FIELD(enable_quickack);
1583 	SET_FIELD(enable_placement_id);
1584 	SET_FIELD(enable_zerocopy_send_server);
1585 	SET_FIELD(enable_zerocopy_send_client);
1586 
1587 #undef SET_FIELD
1588 #undef FIELD_OK
1589 
1590 	return 0;
1591 }
1592 
1593 
/*
 * Vtable wiring the posix implementation into SPDK's pluggable sock layer.
 * Each member maps an abstract sock operation to the posix_* function
 * defined in this file.
 */
static struct spdk_net_impl g_posix_net_impl = {
	.name		= "posix",
	.getaddr	= posix_sock_getaddr,
	.connect	= posix_sock_connect,
	.listen		= posix_sock_listen,
	.accept		= posix_sock_accept,
	.close		= posix_sock_close,
	.recv		= posix_sock_recv,
	.readv		= posix_sock_readv,
	.writev		= posix_sock_writev,
	.writev_async	= posix_sock_writev_async,
	.flush		= posix_sock_flush,
	.set_recvlowat	= posix_sock_set_recvlowat,
	.set_recvbuf	= posix_sock_set_recvbuf,
	.set_sendbuf	= posix_sock_set_sendbuf,
	.is_ipv6	= posix_sock_is_ipv6,
	.is_ipv4	= posix_sock_is_ipv4,
	.is_connected	= posix_sock_is_connected,
	.group_impl_get_optimal	= posix_sock_group_impl_get_optimal,
	.group_impl_create	= posix_sock_group_impl_create,
	.group_impl_add_sock	= posix_sock_group_impl_add_sock,
	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
	.group_impl_poll	= posix_sock_group_impl_poll,
	.group_impl_close	= posix_sock_group_impl_close,
	.get_opts	= posix_sock_impl_get_opts,
	.set_opts	= posix_sock_impl_set_opts,
};

/* Register this transport with the sock layer under the name "posix". */
SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);
1623