/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation. All rights reserved.
 *   Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved.
 *   Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#if defined(__FreeBSD__)
#include <sys/event.h>
#define SPDK_KEVENT
#else
#include <sys/epoll.h>
#define SPDK_EPOLL
#endif

#if defined(__linux__)
#include <linux/errqueue.h>
#endif

#include "spdk/env.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/util.h"
#include "spdk_internal/sock.h"
#include "../sock_kernel.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32

#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif

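/* On Linux, MSG_ZEROCOPY allows sendmsg() to pin the caller's buffers and
 * transmit from them directly instead of copying them into kernel memory.
 * The kernel signals completion (i.e. that the buffers may be reused)
 * through the socket's error queue, which _sock_check_zcopy() below drains. */
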
struct spdk_posix_sock {
	struct spdk_sock	base;
	int			fd;

	uint32_t		sendmsg_idx;

	struct spdk_pipe	*recv_pipe;
	void			*recv_buf;
	int			recv_buf_sz;
	bool			pipe_has_data;
	bool			socket_has_data;
	bool			zcopy;

	int			placement_id;

	TAILQ_ENTRY(spdk_posix_sock)	link;
};

TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock);

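/* A poll group wraps an epoll (or, on FreeBSD, kqueue) instance. The
 * socks_with_data list tracks sockets known to still have data available,
 * either buffered in their userspace receive pipe, which the kernel event
 * mechanism cannot see, or already reported readable by the kernel. */
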
struct spdk_posix_sock_group_impl {
	struct spdk_sock_group_impl	base;
	int				fd;
	struct spdk_has_data_list	socks_with_data;
	int				placement_id;
};

static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_quickack = false,
	.enable_placement_id = PLACEMENT_NONE,
	.enable_zerocopy_send_server = true,
	.enable_zerocopy_send_client = false,
	.zerocopy_threshold = 0
};

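/* Global map from placement ids (CPU cores or SO_MARK values) to poll
 * groups, used to suggest an optimal group for a new connection. */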
static struct spdk_sock_map g_map = {
	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
	.mtx = PTHREAD_MUTEX_INITIALIZER
};

__attribute__((destructor)) static void
posix_sock_map_cleanup(void)
{
	spdk_sock_map_cleanup(&g_map);
}

#define __posix_sock(sock) (struct spdk_posix_sock *)sock
#define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group

static int
posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum posix_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};

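/* The receive pipe is a userspace staging buffer: for small reads it is
 * cheaper to refill the pipe with one large readv() from the kernel and
 * then serve many small user reads from it than to issue a syscall per
 * read. See posix_sock_readv() for the policy. */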
static int
posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be at least %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

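	/* Note: spdk_pipe is a circular buffer that reserves one byte
	 * internally to distinguish a full pipe from an empty one, so sz + 1
	 * bytes are handed over to obtain a usable capacity of exactly sz
	 * (see the assert on spdk_pipe_writer_get_buffer() below). */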
	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) {
		rc = posix_sock_alloc_pipe(sock, sz);
		if (rc) {
			return rc;
		}
	}

	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
	if (sz < MIN_SO_RCVBUF_SIZE) {
		sz = MIN_SO_RCVBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static int
posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (sz < MIN_SO_SNDBUF_SIZE) {
		sz = MIN_SO_SNDBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static void
posix_sock_init(struct spdk_posix_sock *sock, bool enable_zero_copy)
{
#if defined(SPDK_ZEROCOPY) || defined(__linux__)
	int flag;
	int rc;
#endif

#if defined(SPDK_ZEROCOPY)
	flag = 1;

	if (enable_zero_copy) {
		/* Try to turn on zero copy sends */
		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
		if (rc == 0) {
			sock->zcopy = true;
		}
	}
#endif

#if defined(__linux__)
	flag = 1;

	if (g_spdk_posix_sock_impl_opts.enable_quickack) {
		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
		if (rc != 0) {
			SPDK_ERRLOG("Failed to set TCP_QUICKACK\n");
		}
	}

	spdk_sock_get_placement_id(sock->fd, g_spdk_posix_sock_impl_opts.enable_placement_id,
				   &sock->placement_id);

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
		/* Save placement_id */
		spdk_sock_map_insert(&g_map, sock->placement_id, NULL);
	}
#endif
}

static struct spdk_posix_sock *
posix_sock_alloc(int fd, bool enable_zero_copy)
{
	struct spdk_posix_sock *sock;

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;
	posix_sock_init(sock, enable_zero_copy);

	return sock;
}

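/* Create a TCP socket for one resolved address and apply the configured
 * options: kernel buffer sizes, SO_REUSEADDR, TCP_NODELAY and, where
 * available, SO_PRIORITY, IPV6_V6ONLY and TCP_USER_TIMEOUT. Returns the
 * new fd, or -1 on error. */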
static int
posix_fd_create(struct addrinfo *res, struct spdk_sock_opts *opts)
{
	int fd;
	int val = 1;
	int rc, sz;
#if defined(__linux__)
	int to;
#endif

	fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
	if (fd < 0) {
		/* error */
		return -1;
	}

	sz = g_spdk_posix_sock_impl_opts.recv_buf_size;
	rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc) {
		/* Not fatal */
	}

	sz = g_spdk_posix_sock_impl_opts.send_buf_size;
	rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc) {
		/* Not fatal */
	}

	rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
	if (rc != 0) {
		close(fd);
		/* error */
		return -1;
	}
	rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
	if (rc != 0) {
		close(fd);
		/* error */
		return -1;
	}

#if defined(SO_PRIORITY)
	if (opts->priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			return -1;
		}
	}
#endif

	if (res->ai_family == AF_INET6) {
		rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			return -1;
		}
	}

	if (opts->ack_timeout) {
#if defined(__linux__)
		to = opts->ack_timeout;
		rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &to, sizeof(to));
		if (rc != 0) {
			close(fd);
			/* error */
			return -1;
		}
#else
		SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
#endif
	}

	return fd;
}

static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_posix_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int rc;
	bool enable_zcopy_user_opts = true;
	bool enable_zcopy_impl_opts = true;

	assert(opts != NULL);

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try each resolved address until bind/connect succeeds */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = posix_fd_create(res, opts);
		if (fd < 0) {
			continue;
		}
		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_server;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_client;
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	/* Only enable zero copy for non-loopback sockets. */
	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd);

	sock = posix_sock_alloc(fd, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}

static struct spdk_sock *
posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}

static struct spdk_sock *
posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}

static struct spdk_sock *
posix_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_posix_sock		*sock = __posix_sock(_sock);
	struct sockaddr_storage		sa;
	socklen_t			salen;
	int				rc, fd;
	struct spdk_posix_sock		*new_sock;
	int				flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited by the accepted socket, so set it again here */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	/* Inherit the zero copy feature from the listen socket */
	new_sock = posix_sock_alloc(fd, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}

static int
posix_sock_close(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);

	assert(TAILQ_EMPTY(&_sock->pending_reqs));

	/* If the socket fails to close, the best choice is to
	 * leak the fd but continue to free the rest of the sock
	 * memory. */
	close(sock->fd);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	free(sock);

	return 0;
}

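/* MSG_ZEROCOPY completions are delivered on the socket's error queue as
 * struct sock_extended_err records: [ee_info, ee_data] is an inclusive
 * range of sendmsg() call indices whose buffers the kernel has released. */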
#ifdef SPDK_ZEROCOPY
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	msgh.msg_control = buf;
	msgh.msg_controllen = sizeof(buf);

	while (true) {
		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		if (!(cm &&
		      ((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
		       (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR)))) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs list is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (!req->internal.is_zcopy) {
					/* This wasn't a zcopy request. It was just waiting in line to complete */
					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}
				} else if (req->internal.offset == idx) {
					found = true;
					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}
				} else if (found) {
					break;
				}
			}
		}
	}

	return 0;
}
#endif

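/* Write out the queued asynchronous requests with a single vectored
 * sendmsg(). Fully sent requests move to the pending list and, on the copy
 * path, complete immediately; zero-copy requests stay pending until the
 * kernel acknowledges them. A partially sent request records its progress
 * in req->internal.offset. */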
static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc;
	unsigned int offset;
	size_t len;
	bool is_zcopy = false;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		return 0;
	}

#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY | MSG_NOSIGNAL;
	} else
#endif
	{
		flags = MSG_NOSIGNAL;
	}

	iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL, &flags);
	if (iovcnt == 0) {
		return 0;
	}

#ifdef SPDK_ZEROCOPY
	is_zcopy = flags & MSG_ZEROCOPY;
#endif

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;

	rc = sendmsg(psock->fd, &msg, flags);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
			return 0;
		}
		return rc;
	}

	if (is_zcopy) {
		/* Handle the overflow case: req->internal.offset is set to
		 * psock->sendmsg_idx - 1 below, so sendmsg_idx must never be zero. */
		if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
			psock->sendmsg_idx = 1;
		} else {
			psock->sendmsg_idx++;
		}
	}

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
		req->internal.is_zcopy = is_zcopy;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(sock, req);

		if (!req->internal.is_zcopy && req == TAILQ_FIRST(&sock->pending_reqs)) {
			/* The sendmsg syscall above isn't currently asynchronous,
			 * so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return 0;
}

static int
posix_sock_flush(struct spdk_sock *sock)
{
#ifdef SPDK_ZEROCOPY
	struct spdk_posix_sock *psock = __posix_sock(sock);

	if (psock->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) {
		_sock_check_zcopy(sock);
	}
#endif

	return _sock_flush(sock);
}

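/* Copy as much buffered data as possible from the receive pipe into the
 * caller's iovecs. If this drains the pipe, the socket may also need to
 * be removed from its group's socks_with_data list. */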
static ssize_t
posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_posix_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, mark it appropriately */
	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		assert(sock->pipe_has_data == true);

		group = __posix_group_impl(sock->base.group_impl);
		if (group && !sock->socket_has_data) {
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->pipe_has_data = false;
	}

	return bytes;
}

static inline ssize_t
posix_sock_read(struct spdk_posix_sock *sock)
{
	struct iovec iov[2];
	int bytes_avail, bytes_recvd;
	struct spdk_posix_sock_group_impl *group;

	bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes_avail <= 0) {
		return bytes_avail;
	}

	bytes_recvd = readv(sock->fd, iov, 2);

	assert(sock->pipe_has_data == false);

	if (bytes_recvd <= 0) {
		/* Errors count as draining the socket data */
		if (sock->base.group_impl && sock->socket_has_data) {
			group = __posix_group_impl(sock->base.group_impl);
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->socket_has_data = false;

		return bytes_recvd;
	}

	spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd);

#if DEBUG
	if (sock->base.group_impl) {
		assert(sock->socket_has_data == true);
	}
#endif

	sock->pipe_has_data = true;
	if (bytes_recvd < bytes_avail) {
		/* We drained the kernel socket entirely. */
		sock->socket_has_data = false;
	}

	return bytes_recvd;
}

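/* Receive path policy: sufficiently large reads bypass the pipe and go
 * straight to readv() on the socket; small reads first refill the pipe
 * with one big read and are then served from the pipe. */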
static ssize_t
posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		assert(sock->pipe_has_data == false);
		if (group && sock->socket_has_data) {
			sock->socket_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}
		return readv(sock->fd, iov, iovcnt);
	}

	/* If the socket is not in a group, we must assume it always has
	 * data waiting for us because it is not epolled */
	if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		len = 0;
		for (i = 0; i < iovcnt; i++) {
			len += iov[i].iov_len;
		}

		if (len >= MIN_SOCK_PIPE_SIZE) {
			/* TODO: Should this detect if kernel socket is drained? */
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = posix_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return posix_sock_readv(sock, iov, 1);
}

static ssize_t
posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	/* In order to process a writev, we need to flush any asynchronous writes
	 * first. */
	rc = _sock_flush(_sock);
	if (rc < 0) {
		return rc;
	}

	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
		/* We weren't able to flush all requests */
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}

static void
posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	int rc;

	spdk_sock_request_queue(sock, req);

	/* If there are a sufficient number queued, just flush them out immediately. */
	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}
}

static int
posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

static bool
posix_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

static bool
posix_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

static bool
posix_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

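/* Suggest the poll group already mapped to this socket's placement id, so
 * that connections sharing an underlying hardware queue end up polled by
 * the same group. */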
static struct spdk_sock_group_impl *
posix_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_sock_group_impl *group_impl;

	if (sock->placement_id != -1) {
		spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl, hint);
		return group_impl;
	}

	return NULL;
}

static struct spdk_sock_group_impl *
posix_sock_group_impl_create(void)
{
	struct spdk_posix_sock_group_impl *group_impl;
	int fd;

#if defined(SPDK_EPOLL)
	fd = epoll_create1(0);
#elif defined(SPDK_KEVENT)
	fd = kqueue();
#endif
	if (fd == -1) {
		return NULL;
	}

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		close(fd);
		return NULL;
	}

	group_impl->fd = fd;
	TAILQ_INIT(&group_impl->socks_with_data);
	group_impl->placement_id = -1;

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
		group_impl->placement_id = spdk_env_get_current_core();
	}

	return &group_impl->base;
}

static void
posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock,
		int placement_id)
{
#if defined(SO_MARK)
	int rc;

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_MARK,
			&placement_id, sizeof(placement_id));
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Error setting SO_MARK\n");
		return;
	}

	rc = spdk_sock_map_insert(&g_map, placement_id, &group->base);
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
		return;
	}

	sock->placement_id = placement_id;
#endif
}

static void
posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);

	if (group->placement_id == -1) {
		group->placement_id = spdk_sock_map_find_free(&g_map);

		/* If a free placement id is found, update existing sockets in this group */
		if (group->placement_id != -1) {
			struct spdk_sock *sock, *tmp;

			TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
				posix_sock_mark(group, __posix_sock(sock), group->placement_id);
			}
		}
	}

	if (group->placement_id != -1) {
		/*
		 * The group placement id is already determined for this poll group.
		 * Mark the socket with the group's placement id.
		 */
		posix_sock_mark(group, __posix_sock(_sock), group->placement_id);
	}
}

static int
posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

#if defined(SPDK_EPOLL)
	struct epoll_event event;

	memset(&event, 0, sizeof(event));
	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
	event.events = EPOLLIN | EPOLLERR;
	event.data.ptr = sock;

	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
#elif defined(SPDK_KEVENT)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
#endif

	if (rc != 0) {
		return rc;
	}

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		sock->pipe_has_data = true;
		sock->socket_has_data = false;
		TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link);
	}

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
		posix_sock_update_mark(_group, _sock);
	} else if (sock->placement_id != -1) {
		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
			/* Do not treat this as an error. The system will continue running. */
		}
	}

	return rc;
}

static int
posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	if (sock->pipe_has_data || sock->socket_has_data) {
		TAILQ_REMOVE(&group->socks_with_data, sock, link);
		sock->pipe_has_data = false;
		sock->socket_has_data = false;
	}

	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

#if defined(SPDK_EPOLL)
	struct epoll_event event;

	/* The event parameter is ignored but some old kernel versions still require it. */
	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
#elif defined(SPDK_KEVENT)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
	if (rc == 0 && event.flags & EV_ERROR) {
		rc = -1;
		errno = event.data;
	}
#endif

	spdk_sock_abort_requests(_sock);

	return rc;
}

static int
posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_sock *sock, *tmp;
	int num_events, i, rc;
	struct spdk_posix_sock *psock, *ptmp;
#if defined(SPDK_EPOLL)
	struct epoll_event events[MAX_EVENTS_PER_POLL];
#elif defined(SPDK_KEVENT)
	struct kevent events[MAX_EVENTS_PER_POLL];
	struct timespec ts = {0};
#endif

#ifdef SPDK_ZEROCOPY
	/* When all of the following conditions are met
	 * - non-blocking socket
	 * - zero copy is enabled
	 * - interrupts suppressed (i.e. busy polling)
	 * - the NIC tx queue is full at the time sendmsg() is called
	 * - epoll_wait determines there is an EPOLLIN event for the socket
	 * then we can get into a situation where data we've sent is queued
	 * up in the kernel network stack, but interrupts have been suppressed
	 * because other traffic is flowing so the kernel misses the signal
	 * to flush the software tx queue. If there wasn't incoming data
	 * pending on the socket, then epoll_wait would have been sufficient
	 * to kick off the send operation, but since there is a pending event
	 * epoll_wait does not trigger the necessary operation.
	 *
	 * We deal with this by checking for all of the above conditions and
	 * additionally looking for EPOLLIN events that were not consumed from
	 * the last poll loop. We take this to mean that the upper layer is
	 * unable to consume them because it is blocked waiting for resources
	 * to free up, and those resources are most likely freed in response
	 * to a pending asynchronous write completing.
	 *
	 * Additionally, sockets that have the same placement_id actually share
	 * an underlying hardware queue. That means polling one of them is
	 * equivalent to polling all of them. As a quick mechanism to avoid
	 * making extra poll() calls, stash the last placement_id during the loop
	 * and only poll if it's not the same. The overwhelmingly common case
	 * is that all sockets in this list have the same placement_id because
	 * SPDK is intentionally grouping sockets by that value, so even
	 * though this won't stop all extra calls to poll(), it's very fast
	 * and will catch all of them in practice.
	 */
	int last_placement_id = -1;

	TAILQ_FOREACH(psock, &group->socks_with_data, link) {
		if (psock->zcopy && psock->placement_id >= 0 &&
		    psock->placement_id != last_placement_id) {
			struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0};

			poll(&pfd, 1, 0);
			last_placement_id = psock->placement_id;
		}
	}
#endif

	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
	 * a completion callback could remove the sock from the
	 * group. */
	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}

	assert(max_events > 0);

#if defined(SPDK_EPOLL)
	num_events = epoll_wait(group->fd, events, max_events, 0);
#elif defined(SPDK_KEVENT)
	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
#endif

	if (num_events == -1) {
		return -1;
	} else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
		sock = TAILQ_FIRST(&_group->socks);
		psock = __posix_sock(sock);
		/* poll() is called here to busy poll the queue associated with
		 * the first socket in the list and potentially reap incoming data.
		 */
		if (sock->opts.priority) {
			struct pollfd pfd = {0, 0, 0};

			pfd.fd = psock->fd;
			pfd.events = POLLIN | POLLERR;
			poll(&pfd, 1, 0);
		}
	}

	for (i = 0; i < num_events; i++) {
#if defined(SPDK_EPOLL)
		sock = events[i].data.ptr;
		psock = __posix_sock(sock);

#ifdef SPDK_ZEROCOPY
		if (events[i].events & EPOLLERR) {
			rc = _sock_check_zcopy(sock);
			/* If the socket was closed or removed from
			 * the group in response to a send ack, don't
			 * add it to the array here. */
			if (rc || sock->cb_fn == NULL) {
				continue;
			}
		}
#endif
		if ((events[i].events & EPOLLIN) == 0) {
			continue;
		}

#elif defined(SPDK_KEVENT)
		sock = events[i].udata;
		psock = __posix_sock(sock);
#endif

		/* If the socket is not already in the list, add it now */
		if (!psock->socket_has_data && !psock->pipe_has_data) {
			TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link);
		}
		psock->socket_has_data = true;
	}

	num_events = 0;

	TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) {
		if (num_events == max_events) {
			break;
		}

		/* If the socket's cb_fn is NULL, just remove it from the
		 * list and do not add it to the socks array */
		if (spdk_unlikely(psock->base.cb_fn == NULL)) {
			psock->socket_has_data = false;
			psock->pipe_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, psock, link);
			continue;
		}

		socks[num_events++] = &psock->base;
	}

	/* Cycle the has_data list so that each time we poll things aren't
	 * in the same order. Say we have 6 sockets in the list, named as follows:
	 * A B C D E F
	 * And all 6 sockets had epoll events, but max_events is only 3. That means
	 * psock currently points at D. We want to rearrange the list to the following:
	 * D E F A B C
	 *
	 * The variables below are named according to this example to make it easier to
	 * follow the swaps.
	 */
	if (psock != NULL) {
		struct spdk_posix_sock *pa, *pc, *pd, *pf;

		/* Capture pointers to the elements we need */
		pd = psock;
		pc = TAILQ_PREV(pd, spdk_has_data_list, link);
		pa = TAILQ_FIRST(&group->socks_with_data);
		pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list);

		/* Break the link between C and D */
		pc->link.tqe_next = NULL;

		/* Connect F to A */
		pf->link.tqe_next = pa;
		pa->link.tqe_prev = &pf->link.tqe_next;

		/* Fix up the list first/last pointers */
		group->socks_with_data.tqh_first = pd;
		group->socks_with_data.tqh_last = &pc->link.tqe_next;

		/* D is at the front of the list, so make its tqe_prev pointer point at the list head */
		pd->link.tqe_prev = &group->socks_with_data.tqh_first;
	}

	return num_events;
}

static int
posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	int rc;

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
	}

	rc = close(group->fd);
	free(group);
	return rc;
}

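/* The getters/setters below take an explicit length so that callers built
 * against an older, shorter struct spdk_sock_impl_opts keep working: only
 * fields that fit entirely within the provided length are copied. A typical
 * caller goes through the generic layer, e.g. (hedged usage sketch):
 *
 *   struct spdk_sock_impl_opts o;
 *   size_t len = sizeof(o);
 *   spdk_sock_impl_get_opts("posix", &o, &len);
 */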
static int
posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
{
	if (!opts || !len) {
		errno = EINVAL;
		return -1;
	}
	memset(opts, 0, *len);

#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len

#define GET_FIELD(field) \
	if (FIELD_OK(field)) { \
		opts->field = g_spdk_posix_sock_impl_opts.field; \
	}

	GET_FIELD(recv_buf_size);
	GET_FIELD(send_buf_size);
	GET_FIELD(enable_recv_pipe);
	GET_FIELD(enable_zerocopy_send);
	GET_FIELD(enable_quickack);
	GET_FIELD(enable_placement_id);
	GET_FIELD(enable_zerocopy_send_server);
	GET_FIELD(enable_zerocopy_send_client);
	GET_FIELD(zerocopy_threshold);

#undef GET_FIELD
#undef FIELD_OK

	*len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts));
	return 0;
}

static int
posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
{
	if (!opts) {
		errno = EINVAL;
		return -1;
	}

#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len

#define SET_FIELD(field) \
	if (FIELD_OK(field)) { \
		g_spdk_posix_sock_impl_opts.field = opts->field; \
	}

	SET_FIELD(recv_buf_size);
	SET_FIELD(send_buf_size);
	SET_FIELD(enable_recv_pipe);
	SET_FIELD(enable_zerocopy_send);
	SET_FIELD(enable_quickack);
	SET_FIELD(enable_placement_id);
	SET_FIELD(enable_zerocopy_send_server);
	SET_FIELD(enable_zerocopy_send_client);
	SET_FIELD(zerocopy_threshold);

#undef SET_FIELD
#undef FIELD_OK

	return 0;
}

static struct spdk_net_impl g_posix_net_impl = {
	.name		= "posix",
	.getaddr	= posix_sock_getaddr,
	.connect	= posix_sock_connect,
	.listen		= posix_sock_listen,
	.accept		= posix_sock_accept,
	.close		= posix_sock_close,
	.recv		= posix_sock_recv,
	.readv		= posix_sock_readv,
	.writev		= posix_sock_writev,
	.writev_async	= posix_sock_writev_async,
	.flush		= posix_sock_flush,
	.set_recvlowat	= posix_sock_set_recvlowat,
	.set_recvbuf	= posix_sock_set_recvbuf,
	.set_sendbuf	= posix_sock_set_sendbuf,
	.is_ipv6	= posix_sock_is_ipv6,
	.is_ipv4	= posix_sock_is_ipv4,
	.is_connected	= posix_sock_is_connected,
	.group_impl_get_optimal	= posix_sock_group_impl_get_optimal,
	.group_impl_create	= posix_sock_group_impl_create,
	.group_impl_add_sock	= posix_sock_group_impl_add_sock,
	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
	.group_impl_poll	= posix_sock_group_impl_poll,
	.group_impl_close	= posix_sock_group_impl_close,
	.get_opts	= posix_sock_impl_get_opts,
	.set_opts	= posix_sock_impl_set_opts,
};

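/* Register this implementation with the generic sock layer under the name
 * "posix". Applications can then select it explicitly, e.g. (hedged usage
 * sketch):
 *
 *   struct spdk_sock *s = spdk_sock_connect("127.0.0.1", 4420, "posix");
 *
 * or let the layer pick an implementation by the default priority. */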
SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);