xref: /spdk/module/sock/posix/posix.c (revision c4fafdb2158529f43d225fec5e3509bb0c5ba0c7)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation. All rights reserved.
5  *   Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #if defined(__FreeBSD__)
37 #include <sys/event.h>
38 #define SPDK_KEVENT
39 #else
40 #include <sys/epoll.h>
41 #define SPDK_EPOLL
42 #endif
43 
44 #if defined(__linux__)
45 #include <linux/errqueue.h>
46 #endif
47 
48 #include "spdk/env.h"
49 #include "spdk/log.h"
50 #include "spdk/pipe.h"
51 #include "spdk/sock.h"
52 #include "spdk/util.h"
53 #include "spdk_internal/sock.h"
54 
55 #define MAX_TMPBUF 1024
56 #define PORTNUMLEN 32
57 
58 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
59 #define SPDK_ZEROCOPY
60 #endif
61 
/* Per-connection state for the POSIX sock implementation. */
struct spdk_posix_sock {
	struct spdk_sock	base;
	int			fd;

	/* Count of sendmsg() calls issued on this socket; used to tag
	 * requests so MSG_ZEROCOPY completions can be matched back up
	 * (see _sock_check_zcopy). Never left at 0 after a send. */
	uint32_t		sendmsg_idx;

	/* Optional user-space receive pipe used to batch small reads
	 * into one large readv() from the kernel. */
	struct spdk_pipe	*recv_pipe;
	void			*recv_buf;	/* backing buffer for recv_pipe */
	int			recv_buf_sz;
	bool			pipe_has_data;		/* unread bytes buffered in recv_pipe */
	bool			socket_has_data;	/* kernel socket reported readable */
	bool			zcopy;			/* MSG_ZEROCOPY enabled on this fd */

	int			placement_id;

	/* Entry in the owning group's socks_with_data list. */
	TAILQ_ENTRY(spdk_posix_sock)	link;
};
79 
/* List of sockets believed to have data ready to read. */
TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock);

/* Per-poll-group state; fd is the epoll (Linux) or kqueue (FreeBSD)
 * instance backing the group. */
struct spdk_posix_sock_group_impl {
	struct spdk_sock_group_impl	base;
	int				fd;
	struct spdk_has_data_list	socks_with_data;
	int				placement_id;	/* -1 until assigned */
};
88 
/* Module-wide tunables; adjustable via the sock impl opts API. */
static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_zerocopy_send = true,
	.enable_quickack = false,
	.enable_placement_id = PLACEMENT_NONE,
	.enable_zerocopy_send_server = true,
	.enable_zerocopy_send_client = false
};

/* Global placement-id -> sock group map, shared by all groups. */
static struct spdk_sock_map g_map = {
	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
	.mtx = PTHREAD_MUTEX_INITIALIZER
};
104 
/* Free all entries in g_map at process exit. */
__attribute((destructor)) static void
posix_sock_map_cleanup(void)
{
	spdk_sock_map_cleanup(&g_map);
}
110 
/*
 * Render the IP address of an AF_INET/AF_INET6 sockaddr as a string
 * into host (capacity hlen).  Returns 0 on success, -1 on NULL input,
 * unsupported address family, or inet_ntop() failure.
 */
static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const void *addr;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	if (sa->sa_family == AF_INET) {
		addr = &((struct sockaddr_in *)sa)->sin_addr;
	} else if (sa->sa_family == AF_INET6) {
		addr = &((struct sockaddr_in6 *)sa)->sin6_addr;
	} else {
		return -1;
	}

	return inet_ntop(sa->sa_family, addr, host, hlen) == NULL ? -1 : 0;
}
139 
140 #define __posix_sock(sock) (struct spdk_posix_sock *)sock
141 #define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group
142 
143 static int
144 posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
145 		   char *caddr, int clen, uint16_t *cport)
146 {
147 	struct spdk_posix_sock *sock = __posix_sock(_sock);
148 	struct sockaddr_storage sa;
149 	socklen_t salen;
150 	int rc;
151 
152 	assert(sock != NULL);
153 
154 	memset(&sa, 0, sizeof sa);
155 	salen = sizeof sa;
156 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
157 	if (rc != 0) {
158 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
159 		return -1;
160 	}
161 
162 	switch (sa.ss_family) {
163 	case AF_UNIX:
164 		/* Acceptable connection types that don't have IPs */
165 		return 0;
166 	case AF_INET:
167 	case AF_INET6:
168 		/* Code below will get IP addresses */
169 		break;
170 	default:
171 		/* Unsupported socket family */
172 		return -1;
173 	}
174 
175 	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
176 	if (rc != 0) {
177 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
178 		return -1;
179 	}
180 
181 	if (sport) {
182 		if (sa.ss_family == AF_INET) {
183 			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
184 		} else if (sa.ss_family == AF_INET6) {
185 			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
186 		}
187 	}
188 
189 	memset(&sa, 0, sizeof sa);
190 	salen = sizeof sa;
191 	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
192 	if (rc != 0) {
193 		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
194 		return -1;
195 	}
196 
197 	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
198 	if (rc != 0) {
199 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
200 		return -1;
201 	}
202 
203 	if (cport) {
204 		if (sa.ss_family == AF_INET) {
205 			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
206 		} else if (sa.ss_family == AF_INET6) {
207 			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
208 		}
209 	}
210 
211 	return 0;
212 }
213 
/* Distinguishes the two flavors posix_sock_create() can produce:
 * a listening server socket or an outbound client connection. */
enum posix_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};
218 
/*
 * (Re)size the user-space receive pipe to sz bytes.
 *
 * sz == 0 frees the pipe entirely; 0 < sz < MIN_SOCK_PIPE_SIZE is
 * rejected.  Data already buffered in the old pipe is migrated into the
 * new one; if it does not fit, the resize fails with -EINVAL and the
 * old pipe is left intact.  Returns 0 on success, negative on failure.
 */
static int
posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	/* Already at the requested size - nothing to do. */
	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple.  The +1 byte is consumed by
	 * the pipe implementation to distinguish full from empty. */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		/* Copy buffered bytes across, then retire the old pipe. */
		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}
285 
286 static int
287 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
288 {
289 	struct spdk_posix_sock *sock = __posix_sock(_sock);
290 	int rc;
291 
292 	assert(sock != NULL);
293 
294 	if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) {
295 		rc = posix_sock_alloc_pipe(sock, sz);
296 		if (rc) {
297 			return rc;
298 		}
299 	}
300 
301 	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
302 	if (sz < MIN_SO_RCVBUF_SIZE) {
303 		sz = MIN_SO_RCVBUF_SIZE;
304 	}
305 
306 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
307 	if (rc < 0) {
308 		return rc;
309 	}
310 
311 	return 0;
312 }
313 
314 static int
315 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
316 {
317 	struct spdk_posix_sock *sock = __posix_sock(_sock);
318 	int rc;
319 
320 	assert(sock != NULL);
321 
322 	if (sz < MIN_SO_SNDBUF_SIZE) {
323 		sz = MIN_SO_SNDBUF_SIZE;
324 	}
325 
326 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
327 	if (rc < 0) {
328 		return rc;
329 	}
330 
331 	return 0;
332 }
333 
/*
 * Allocate a spdk_posix_sock wrapping an already-open fd.
 *
 * Best-effort configuration: when requested, tries to enable
 * MSG_ZEROCOPY sends (sock->zcopy reflects whether that succeeded);
 * on Linux additionally applies TCP_QUICKACK and placement-id
 * tracking according to the module opts.  Returns NULL only on
 * allocation failure - setsockopt failures are tolerated.
 */
static struct spdk_posix_sock *
posix_sock_alloc(int fd, bool enable_zero_copy)
{
	struct spdk_posix_sock *sock;
#if defined(SPDK_ZEROCOPY) || defined(__linux__)
	int flag;
	int rc;
#endif

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;

#if defined(SPDK_ZEROCOPY)
	flag = 1;

	if (enable_zero_copy) {
		/* Try to turn on zero copy sends */
		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
		if (rc == 0) {
			sock->zcopy = true;
		}
	}
#endif

#if defined(__linux__)
	flag = 1;

	if (g_spdk_posix_sock_impl_opts.enable_quickack) {
		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
		if (rc != 0) {
			/* Not fatal - quickack is an optimization only */
			SPDK_ERRLOG("quickack was failed to set\n");
		}
	}

	spdk_sock_get_placement_id(sock->fd, g_spdk_posix_sock_impl_opts.enable_placement_id,
				   &sock->placement_id);

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
		/* Save placement_id */
		spdk_sock_map_insert(&g_map, sock->placement_id, NULL);
	}
#endif

	return sock;
}
384 
385 static bool
386 sock_is_loopback(int fd)
387 {
388 	struct ifaddrs *addrs, *tmp;
389 	struct sockaddr_storage sa = {};
390 	socklen_t salen;
391 	struct ifreq ifr = {};
392 	char ip_addr[256], ip_addr_tmp[256];
393 	int rc;
394 	bool is_loopback = false;
395 
396 	salen = sizeof(sa);
397 	rc = getsockname(fd, (struct sockaddr *)&sa, &salen);
398 	if (rc != 0) {
399 		return is_loopback;
400 	}
401 
402 	memset(ip_addr, 0, sizeof(ip_addr));
403 	rc = get_addr_str((struct sockaddr *)&sa, ip_addr, sizeof(ip_addr));
404 	if (rc != 0) {
405 		return is_loopback;
406 	}
407 
408 	getifaddrs(&addrs);
409 	for (tmp = addrs; tmp != NULL; tmp = tmp->ifa_next) {
410 		if (tmp->ifa_addr && (tmp->ifa_flags & IFF_UP) &&
411 		    (tmp->ifa_addr->sa_family == sa.ss_family)) {
412 			memset(ip_addr_tmp, 0, sizeof(ip_addr_tmp));
413 			rc = get_addr_str(tmp->ifa_addr, ip_addr_tmp, sizeof(ip_addr_tmp));
414 			if (rc != 0) {
415 				continue;
416 			}
417 
418 			if (strncmp(ip_addr, ip_addr_tmp, sizeof(ip_addr)) == 0) {
419 				memcpy(ifr.ifr_name, tmp->ifa_name, sizeof(ifr.ifr_name));
420 				ioctl(fd, SIOCGIFFLAGS, &ifr);
421 				if (ifr.ifr_flags & IFF_LOOPBACK) {
422 					is_loopback = true;
423 				}
424 				goto end;
425 			}
426 		}
427 	}
428 
429 end:
430 	freeifaddrs(addrs);
431 	return is_loopback;
432 }
433 
/*
 * Create a listening or connecting TCP socket for the given ip/port.
 *
 * Resolves the address with getaddrinfo() and walks the results,
 * creating and configuring a socket for each candidate until one
 * binds+listens (LISTEN) or connects (CONNECT) successfully.  The fd
 * is set non-blocking, and zero-copy send eligibility is computed from
 * user opts, impl opts, and whether the peer is a loopback interface.
 * Returns the new spdk_sock, or NULL on failure.
 */
static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_posix_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc, sz;
	bool enable_zcopy_user_opts = true;
	bool enable_zcopy_impl_opts = true;

	assert(opts != NULL);

	if (ip == NULL) {
		return NULL;
	}
	/* Strip brackets from an RFC 2732 style "[IPv6]" literal. */
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		sz = g_spdk_posix_sock_impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		sz = g_spdk_posix_sock_impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}
#endif

		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_server &&
						 g_spdk_posix_sock_impl_opts.enable_zerocopy_send;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_client &&
						 g_spdk_posix_sock_impl_opts.enable_zerocopy_send;
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	/* Only enable zero copy for non-loopback sockets. */
	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd);

	sock = posix_sock_alloc(fd, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}
604 
/* Create a listening socket on ip:port (see posix_sock_create). */
static struct spdk_sock *
posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}
610 
/* Connect to ip:port and return the new socket (see posix_sock_create). */
static struct spdk_sock *
posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}
616 
/*
 * Accept one pending connection on a listening socket.  The new fd is
 * made non-blocking and inherits priority and zero-copy settings from
 * the listener.  Returns NULL if nothing is pending or on error.
 */
static struct spdk_sock *
posix_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_posix_sock		*sock = __posix_sock(_sock);
	struct sockaddr_storage		sa;
	socklen_t			salen;
	int				rc, fd;
	struct spdk_posix_sock		*new_sock;
	int				flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	/* Make the accepted fd non-blocking if it isn't already. */
	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	/* Inherit the zero copy feature from the listen socket */
	new_sock = posix_sock_alloc(fd, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}
667 
668 static int
669 posix_sock_close(struct spdk_sock *_sock)
670 {
671 	struct spdk_posix_sock *sock = __posix_sock(_sock);
672 
673 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
674 
675 	/* If the socket fails to close, the best choice is to
676 	 * leak the fd but continue to free the rest of the sock
677 	 * memory. */
678 	close(sock->fd);
679 
680 	spdk_pipe_destroy(sock->recv_pipe);
681 	free(sock->recv_buf);
682 	free(sock);
683 
684 	return 0;
685 }
686 
#ifdef SPDK_ZEROCOPY
/*
 * Reap MSG_ZEROCOPY send completions from the socket's error queue.
 *
 * The kernel reports completed zero-copy sends as sock_extended_err
 * records holding an inclusive range [ee_info, ee_data] of sendmsg
 * indices.  Each pending request whose stashed index (internal.offset,
 * set in _sock_flush) falls in that range is completed.  Loops until
 * the error queue is drained.  Returns 0, or a negative value if a
 * request completion callback fails.
 */
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	msgh.msg_control = buf;
	msgh.msg_controllen = sizeof(buf);

	while (true) {
		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				/* Error queue drained - normal exit. */
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (req->internal.offset == idx) {
					found = true;

					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}

				} else if (found) {
					break;
				}
			}
		}
	}

	return 0;
}
#endif
760 
/*
 * Flush queued write requests to the kernel with one vectored
 * sendmsg() (MSG_ZEROCOPY when enabled), then account the bytes
 * written against the queued requests in order.  Fully-written
 * requests are moved to the pending list (zcopy) or completed
 * immediately (copy path); a partially-written request records its
 * progress in internal.offset.  Returns 0 on success or if nothing
 * could be sent; negative on a fatal send error.
 */
static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc;
	unsigned int offset;
	size_t len;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather up to IOV_BATCH_SIZE iovecs from the queued requests. */
	iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL);

	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY;
	} else
#endif
	{
		flags = 0;
	}
	rc = sendmsg(psock->fd, &msg, flags);
	if (rc <= 0) {
		/* ENOBUFS with zcopy means the kernel ran out of optmem;
		 * treat it like EAGAIN and retry later. */
		if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
			return 0;
		}
		return rc;
	}

	/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
	 * req->internal.offset, so sendmsg_idx should not be zero  */
	if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
		psock->sendmsg_idx = 1;
	} else {
		psock->sendmsg_idx++;
	}

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(sock, req);

		if (!psock->zcopy) {
			/* The sendmsg syscall above isn't currently asynchronous,
			* so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return 0;
}
866 
/*
 * Public flush entry point: reap any outstanding zero-copy send
 * completions first, then push queued requests to the kernel.
 */
static int
posix_sock_flush(struct spdk_sock *sock)
{
#ifdef SPDK_ZEROCOPY
	struct spdk_posix_sock *psock = __posix_sock(sock);

	if (psock->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) {
		_sock_check_zcopy(sock);
	}
#endif

	return _sock_flush(sock);
}
880 
/*
 * Copy buffered data out of the user-space receive pipe into the
 * caller's iovecs.  Returns the byte count, or -1 with errno set
 * (EAGAIN when the pipe is empty, EINVAL on a zero-length diov).
 * When the pipe is fully drained, updates the group's
 * socks_with_data bookkeeping and clears pipe_has_data.
 */
static ssize_t
posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_posix_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, mark it appropriately */
	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		assert(sock->pipe_has_data == true);

		/* Only remove from the list if the kernel socket has no
		 * more data either - otherwise it must stay polled. */
		group = __posix_group_impl(sock->base.group_impl);
		if (group && !sock->socket_has_data) {
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->pipe_has_data = false;
	}

	return bytes;
}
922 
/*
 * Fill the user-space receive pipe with one readv() from the kernel
 * socket.  Returns the readv() result (<= 0 passes the error/EOF
 * straight through).  On success the pipe is marked as holding data
 * and the kernel socket as drained; on error the socket is removed
 * from the group's ready list.
 */
static inline ssize_t
posix_sock_read(struct spdk_posix_sock *sock)
{
	struct iovec iov[2];
	int bytes_avail, bytes_recvd;
	struct spdk_posix_sock_group_impl *group;

	bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes_avail <= 0) {
		/* Pipe is full (or broken) - nothing to do. */
		return bytes_avail;
	}

	bytes_recvd = readv(sock->fd, iov, 2);

	/* Caller only reads into the pipe when it is empty. */
	assert(sock->pipe_has_data == false);

	if (bytes_recvd <= 0) {
		/* Errors count as draining the socket data */
		if (sock->base.group_impl && sock->socket_has_data) {
			group = __posix_group_impl(sock->base.group_impl);
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->socket_has_data = false;

		return bytes_recvd;
	}

	spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd);

/* NOTE(review): '#if DEBUG' only fires when DEBUG is defined to a
 * nonzero value; '#ifdef DEBUG' may have been intended - confirm. */
#if DEBUG
	if (sock->base.group_impl) {
		assert(sock->socket_has_data == true);
	}
#endif

	/* The data now lives in the pipe; socket_has_data is cleared even
	 * though the kernel may still hold more - the poll loop re-arms it. */
	sock->pipe_has_data = true;
	sock->socket_has_data = false;

	return bytes_recvd;
}
965 
/*
 * Scatter-read from the socket.  Small reads are satisfied from (and
 * replenish) the user-space receive pipe; reads of at least
 * MIN_SOCK_PIPE_SIZE bytes bypass the pipe and go straight to the
 * kernel.  Returns bytes read, 0 on EOF, or -1 with errno set.
 */
static ssize_t
posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		/* No pipe configured - read directly, keeping the group's
		 * ready-list bookkeeping consistent. */
		assert(sock->pipe_has_data == false);
		if (group && sock->socket_has_data) {
			sock->socket_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}
		return readv(sock->fd, iov, iovcnt);
	}

	/* If the socket is not in a group, we must assume it always has
	 * data waiting for us because it is not epolled */
	if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		len = 0;
		for (i = 0; i < iovcnt; i++) {
			len += iov[i].iov_len;
		}

		if (len >= MIN_SOCK_PIPE_SIZE) {
			/* TODO: Should this detect if kernel socket is drained? */
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = posix_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
}
1007 
/* Single-buffer recv: wrap (buf, len) in an iovec and defer to readv. */
static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov = {
		.iov_base = buf,
		.iov_len = len,
	};

	return posix_sock_readv(sock, &iov, 1);
}
1018 
/*
 * Synchronous vectored write.  Any queued asynchronous requests must
 * drain first so ordering is preserved; if they can't, fail with
 * EAGAIN rather than interleave data.
 */
static ssize_t
posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	/* In order to process a writev, we need to flush any asynchronous writes
	 * first. */
	rc = _sock_flush(_sock);
	if (rc < 0) {
		return rc;
	}

	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
		/* We weren't able to flush all requests */
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}
1040 
/*
 * Queue an asynchronous write request.  Once enough iovecs have
 * accumulated to fill a sendmsg batch, flush eagerly; a flush failure
 * aborts all outstanding requests on the socket.
 */
static void
posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	int rc;

	spdk_sock_request_queue(sock, req);

	/* If there are a sufficient number queued, just flush them out immediately. */
	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}
}
1056 
1057 static int
1058 posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1059 {
1060 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1061 	int val;
1062 	int rc;
1063 
1064 	assert(sock != NULL);
1065 
1066 	val = nbytes;
1067 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1068 	if (rc != 0) {
1069 		return -1;
1070 	}
1071 	return 0;
1072 }
1073 
1074 static bool
1075 posix_sock_is_ipv6(struct spdk_sock *_sock)
1076 {
1077 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1078 	struct sockaddr_storage sa;
1079 	socklen_t salen;
1080 	int rc;
1081 
1082 	assert(sock != NULL);
1083 
1084 	memset(&sa, 0, sizeof sa);
1085 	salen = sizeof sa;
1086 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1087 	if (rc != 0) {
1088 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1089 		return false;
1090 	}
1091 
1092 	return (sa.ss_family == AF_INET6);
1093 }
1094 
1095 static bool
1096 posix_sock_is_ipv4(struct spdk_sock *_sock)
1097 {
1098 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1099 	struct sockaddr_storage sa;
1100 	socklen_t salen;
1101 	int rc;
1102 
1103 	assert(sock != NULL);
1104 
1105 	memset(&sa, 0, sizeof sa);
1106 	salen = sizeof sa;
1107 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1108 	if (rc != 0) {
1109 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1110 		return false;
1111 	}
1112 
1113 	return (sa.ss_family == AF_INET);
1114 }
1115 
1116 static bool
1117 posix_sock_is_connected(struct spdk_sock *_sock)
1118 {
1119 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1120 	uint8_t byte;
1121 	int rc;
1122 
1123 	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
1124 	if (rc == 0) {
1125 		return false;
1126 	}
1127 
1128 	if (rc < 0) {
1129 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1130 			return true;
1131 		}
1132 
1133 		return false;
1134 	}
1135 
1136 	return true;
1137 }
1138 
/*
 * Look up the poll group previously associated with this socket's
 * placement id, or NULL if the socket has no placement id or no group
 * has been mapped to it yet.
 */
static struct spdk_sock_group_impl *
posix_sock_group_impl_get_optimal(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_sock_group_impl *group_impl;

	if (sock->placement_id != -1) {
		spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl);
		return group_impl;
	}

	return NULL;
}
1152 
/*
 * Create a poll group backed by an epoll (Linux) or kqueue (FreeBSD)
 * instance.  With PLACEMENT_CPU, the group is registered in the global
 * map under the current core's id.  Returns NULL on failure.
 */
static struct spdk_sock_group_impl *
posix_sock_group_impl_create(void)
{
	struct spdk_posix_sock_group_impl *group_impl;
	int fd;

#if defined(SPDK_EPOLL)
	fd = epoll_create1(0);
#elif defined(SPDK_KEVENT)
	fd = kqueue();
#endif
	if (fd == -1) {
		return NULL;
	}

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		close(fd);
		return NULL;
	}

	group_impl->fd = fd;
	TAILQ_INIT(&group_impl->socks_with_data);
	group_impl->placement_id = -1;

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
		group_impl->placement_id = spdk_env_get_current_core();
	}

	return &group_impl->base;
}
1186 
/* Tag the socket's egress traffic with placement_id via SO_MARK and record
 * the (placement_id -> group) association in the global map. Both failures
 * are logged but non-fatal; sock->placement_id is only updated once both
 * steps succeed. No-op on platforms without SO_MARK. */
static void
posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock,
		int placement_id)
{
#if defined(SO_MARK)
	int err;

	err = setsockopt(sock->fd, SOL_SOCKET, SO_MARK,
			 &placement_id, sizeof(placement_id));
	if (err != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Error setting SO_MARK\n");
		return;
	}

	err = spdk_sock_map_insert(&g_map, placement_id, &group->base);
	if (err != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Failed to insert sock group into map: %d\n", err);
		return;
	}

	sock->placement_id = placement_id;
#endif
}
1212 
1213 static void
1214 posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1215 {
1216 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1217 
1218 	if (group->placement_id == -1) {
1219 		group->placement_id = spdk_sock_map_find_free(&g_map);
1220 
1221 		/* If a free placement id is found, update existing sockets in this group */
1222 		if (group->placement_id != -1) {
1223 			struct spdk_sock  *sock, *tmp;
1224 
1225 			TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
1226 				posix_sock_mark(group, __posix_sock(sock), group->placement_id);
1227 			}
1228 		}
1229 	}
1230 
1231 	if (group->placement_id != -1) {
1232 		/*
1233 		 * group placement id is already determined for this poll group.
1234 		 * Mark socket with group's placement id.
1235 		 */
1236 		posix_sock_mark(group, __posix_sock(_sock), group->placement_id);
1237 	}
1238 }
1239 
1240 static int
1241 posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1242 {
1243 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1244 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1245 	int rc;
1246 
1247 #if defined(SPDK_EPOLL)
1248 	struct epoll_event event;
1249 
1250 	memset(&event, 0, sizeof(event));
1251 	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
1252 	event.events = EPOLLIN | EPOLLERR;
1253 	event.data.ptr = sock;
1254 
1255 	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
1256 #elif defined(SPDK_KEVENT)
1257 	struct kevent event;
1258 	struct timespec ts = {0};
1259 
1260 	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);
1261 
1262 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1263 #endif
1264 
1265 	if (rc != 0) {
1266 		return rc;
1267 	}
1268 
1269 	/* switched from another polling group due to scheduling */
1270 	if (spdk_unlikely(sock->recv_pipe != NULL  &&
1271 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1272 		sock->pipe_has_data = true;
1273 		sock->socket_has_data = false;
1274 		TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link);
1275 	}
1276 
1277 	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
1278 		posix_sock_update_mark(_group, _sock);
1279 	} else if (sock->placement_id != -1) {
1280 		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
1281 		if (rc != 0) {
1282 			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
1283 			/* Do not treat this as an error. The system will continue running. */
1284 		}
1285 	}
1286 
1287 	return rc;
1288 }
1289 
/* Detach a socket from the group: drop it from the ready list, release its
 * placement-map reference, deregister it from epoll/kqueue, and fail any
 * outstanding write requests. Returns the epoll_ctl()/kevent() result. */
static int
posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	/* Unlink from the ready list first; the flags must be cleared so a
	 * later re-add to another group starts from a clean state. */
	if (sock->pipe_has_data || sock->socket_has_data) {
		TAILQ_REMOVE(&group->socks_with_data, sock, link);
		sock->pipe_has_data = false;
		sock->socket_has_data = false;
	}

	/* Drop the reference taken when the socket joined the placement map. */
	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

#if defined(SPDK_EPOLL)
	struct epoll_event event;

	/* Event parameter is ignored but some old kernel version still require it. */
	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
#elif defined(SPDK_KEVENT)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
	/* kevent() may succeed overall yet report a per-event failure via
	 * EV_ERROR, with the errno value stored in event.data. */
	if (rc == 0 && event.flags & EV_ERROR) {
		rc = -1;
		errno = event.data;
	}
#endif

	/* Queued send requests can no longer complete through this group, so
	 * abort them back to their callbacks now. */
	spdk_sock_abort_requests(_sock);

	return rc;
}
1329 
/* Poll the group: flush pending writes on every member socket, harvest
 * epoll/kqueue readiness events into the socks_with_data list, then return
 * up to max_events ready sockets through the socks array. The ready list is
 * rotated each call so sockets beyond max_events are not starved. Returns
 * the number of entries written to socks, or -1 on poll failure. */
static int
posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_sock *sock, *tmp;
	int num_events, i, rc;
	struct spdk_posix_sock *psock, *ptmp;
#if defined(SPDK_EPOLL)
	struct epoll_event events[MAX_EVENTS_PER_POLL];
#elif defined(SPDK_KEVENT)
	struct kevent events[MAX_EVENTS_PER_POLL];
	struct timespec ts = {0};
#endif

#ifdef SPDK_ZEROCOPY
	/* When all of the following conditions are met
	 * - non-blocking socket
	 * - zero copy is enabled
	 * - interrupts suppressed (i.e. busy polling)
	 * - the NIC tx queue is full at the time sendmsg() is called
	 * - epoll_wait determines there is an EPOLLIN event for the socket
	 * then we can get into a situation where data we've sent is queued
	 * up in the kernel network stack, but interrupts have been suppressed
	 * because other traffic is flowing so the kernel misses the signal
	 * to flush the software tx queue. If there wasn't incoming data
	 * pending on the socket, then epoll_wait would have been sufficient
	 * to kick off the send operation, but since there is a pending event
	 * epoll_wait does not trigger the necessary operation.
	 *
	 * We deal with this by checking for all of the above conditions and
	 * additionally looking for EPOLLIN events that were not consumed from
	 * the last poll loop. We take this to mean that the upper layer is
	 * unable to consume them because it is blocked waiting for resources
	 * to free up, and those resources are most likely freed in response
	 * to a pending asynchronous write completing.
	 *
	 * Additionally, sockets that have the same placement_id actually share
	 * an underlying hardware queue. That means polling one of them is
	 * equivalent to polling all of them. As a quick mechanism to avoid
	 * making extra poll() calls, stash the last placement_id during the loop
	 * and only poll if it's not the same. The overwhelmingly common case
	 * is that all sockets in this list have the same placement_id because
	 * SPDK is intentionally grouping sockets by that value, so even
	 * though this won't stop all extra calls to poll(), it's very fast
	 * and will catch all of them in practice.
	 */
	int last_placement_id = -1;

	TAILQ_FOREACH(psock, &group->socks_with_data, link) {
		if (psock->zcopy && psock->placement_id >= 0 &&
		    psock->placement_id != last_placement_id) {
			/* Zero-timeout poll: just kick the kernel to flush the
			 * queue; the result is intentionally ignored. */
			struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0};

			poll(&pfd, 1, 0);
			last_placement_id = psock->placement_id;
		}
	}
#endif

	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
	 * a completion callback could remove the sock from the
	 * group. */
	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}

	assert(max_events > 0);

	/* Non-blocking harvest of readiness events (timeout 0 for both APIs). */
#if defined(SPDK_EPOLL)
	num_events = epoll_wait(group->fd, events, max_events, 0);
#elif defined(SPDK_KEVENT)
	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
#endif

	if (num_events == -1) {
		return -1;
	} else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
		sock = TAILQ_FIRST(&_group->socks);
		psock = __posix_sock(sock);
		/* poll() is called here to busy poll the queue associated with
		 * first socket in list and potentially reap incoming data.
		 */
		if (sock->opts.priority) {
			struct pollfd pfd = {0, 0, 0};

			pfd.fd = psock->fd;
			pfd.events = POLLIN | POLLERR;
			poll(&pfd, 1, 0);
		}
	}

	/* Fold the kernel events into the group's socks_with_data list. */
	for (i = 0; i < num_events; i++) {
#if defined(SPDK_EPOLL)
		sock = events[i].data.ptr;
		psock = __posix_sock(sock);

#ifdef SPDK_ZEROCOPY
		if (events[i].events & EPOLLERR) {
			rc = _sock_check_zcopy(sock);
			/* If the socket was closed or removed from
			 * the group in response to a send ack, don't
			 * add it to the array here. */
			if (rc || sock->cb_fn == NULL) {
				continue;
			}
		}
#endif
		if ((events[i].events & EPOLLIN) == 0) {
			continue;
		}

#elif defined(SPDK_KEVENT)
		sock = events[i].udata;
		psock = __posix_sock(sock);
#endif

		/* If the socket is not already in the list, add it now */
		if (!psock->socket_has_data && !psock->pipe_has_data) {
			TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link);
		}

		psock->socket_has_data = true;
	}

	/* Re-use num_events as the count of sockets reported to the caller. */
	num_events = 0;

	TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) {
		if (num_events == max_events) {
			break;
		}

		/* If the socket's cb_fn is NULL, just remove it from the
		 * list and do not add it to socks array */
		if (spdk_unlikely(psock->base.cb_fn == NULL)) {
			psock->socket_has_data = false;
			psock->pipe_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, psock, link);
			continue;
		}

		socks[num_events++] = &psock->base;
	}

	/* Cycle the has_data list so that each time we poll things aren't
	 * in the same order. Say we have 6 sockets in the list, named as follows:
	 * A B C D E F
	 * And all 6 sockets had epoll events, but max_events is only 3. That means
	 * psock currently points at D. We want to rearrange the list to the following:
	 * D E F A B C
	 *
	 * The variables below are named according to this example to make it easier to
	 * follow the swaps.
	 */
	if (psock != NULL) {
		struct spdk_posix_sock *pa, *pc, *pd, *pf;

		/* Capture pointers to the elements we need */
		pd = psock;
		pc = TAILQ_PREV(pd, spdk_has_data_list, link);
		pa = TAILQ_FIRST(&group->socks_with_data);
		pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list);

		/* Break the link between C and D */
		pc->link.tqe_next = NULL;
		pd->link.tqe_prev = NULL;

		/* Connect F to A */
		pf->link.tqe_next = pa;
		pa->link.tqe_prev = &pf->link.tqe_next;

		/* Fix up the list first/last pointers */
		group->socks_with_data.tqh_first = pd;
		group->socks_with_data.tqh_last = &pc->link.tqe_next;
	}

	return num_events;
}
1511 
1512 static int
1513 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1514 {
1515 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1516 	int rc;
1517 
1518 	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1519 		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
1520 	}
1521 
1522 	rc = close(group->fd);
1523 	free(group);
1524 	return rc;
1525 }
1526 
1527 static int
1528 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
1529 {
1530 	if (!opts || !len) {
1531 		errno = EINVAL;
1532 		return -1;
1533 	}
1534 	memset(opts, 0, *len);
1535 
1536 #define FIELD_OK(field) \
1537 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len
1538 
1539 #define GET_FIELD(field) \
1540 	if (FIELD_OK(field)) { \
1541 		opts->field = g_spdk_posix_sock_impl_opts.field; \
1542 	}
1543 
1544 	GET_FIELD(recv_buf_size);
1545 	GET_FIELD(send_buf_size);
1546 	GET_FIELD(enable_recv_pipe);
1547 	GET_FIELD(enable_zerocopy_send);
1548 	GET_FIELD(enable_quickack);
1549 	GET_FIELD(enable_placement_id);
1550 	GET_FIELD(enable_zerocopy_send_server);
1551 	GET_FIELD(enable_zerocopy_send_client);
1552 
1553 #undef GET_FIELD
1554 #undef FIELD_OK
1555 
1556 	*len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts));
1557 	return 0;
1558 }
1559 
1560 static int
1561 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
1562 {
1563 	if (!opts) {
1564 		errno = EINVAL;
1565 		return -1;
1566 	}
1567 
1568 #define FIELD_OK(field) \
1569 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len
1570 
1571 #define SET_FIELD(field) \
1572 	if (FIELD_OK(field)) { \
1573 		g_spdk_posix_sock_impl_opts.field = opts->field; \
1574 	}
1575 
1576 	SET_FIELD(recv_buf_size);
1577 	SET_FIELD(send_buf_size);
1578 	SET_FIELD(enable_recv_pipe);
1579 	SET_FIELD(enable_zerocopy_send);
1580 	SET_FIELD(enable_quickack);
1581 	SET_FIELD(enable_placement_id);
1582 	SET_FIELD(enable_zerocopy_send_server);
1583 	SET_FIELD(enable_zerocopy_send_client);
1584 
1585 #undef SET_FIELD
1586 #undef FIELD_OK
1587 
1588 	return 0;
1589 }
1590 
1591 
/* Dispatch table wiring the generic spdk_sock API to the POSIX
 * implementations defined in this file. */
static struct spdk_net_impl g_posix_net_impl = {
	.name		= "posix",
	.getaddr	= posix_sock_getaddr,
	.connect	= posix_sock_connect,
	.listen		= posix_sock_listen,
	.accept		= posix_sock_accept,
	.close		= posix_sock_close,
	.recv		= posix_sock_recv,
	.readv		= posix_sock_readv,
	.writev		= posix_sock_writev,
	.writev_async	= posix_sock_writev_async,
	.flush		= posix_sock_flush,
	.set_recvlowat	= posix_sock_set_recvlowat,
	.set_recvbuf	= posix_sock_set_recvbuf,
	.set_sendbuf	= posix_sock_set_sendbuf,
	.is_ipv6	= posix_sock_is_ipv6,
	.is_ipv4	= posix_sock_is_ipv4,
	.is_connected	= posix_sock_is_connected,
	.group_impl_get_optimal	= posix_sock_group_impl_get_optimal,
	.group_impl_create	= posix_sock_group_impl_create,
	.group_impl_add_sock	= posix_sock_group_impl_add_sock,
	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
	.group_impl_poll	= posix_sock_group_impl_poll,
	.group_impl_close	= posix_sock_group_impl_close,
	.get_opts	= posix_sock_impl_get_opts,
	.set_opts	= posix_sock_impl_set_opts,
};

/* Make the "posix" transport selectable through the spdk_sock layer. */
SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);
1621