xref: /spdk/module/sock/posix/posix.c (revision 4e527910925f8d001001e96f1719d015a7a51a94)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation. All rights reserved.
5  *   Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved.
6  *   Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #if defined(__FreeBSD__)
38 #include <sys/event.h>
39 #define SPDK_KEVENT
40 #else
41 #include <sys/epoll.h>
42 #define SPDK_EPOLL
43 #endif
44 
45 #if defined(__linux__)
46 #include <linux/errqueue.h>
47 #endif
48 
49 #include "spdk/env.h"
50 #include "spdk/log.h"
51 #include "spdk/pipe.h"
52 #include "spdk/sock.h"
53 #include "spdk/util.h"
54 #include "spdk_internal/sock.h"
55 #include "../sock_kernel.h"
56 
57 #define MAX_TMPBUF 1024
58 #define PORTNUMLEN 32
59 
60 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
61 #define SPDK_ZEROCOPY
62 #endif
63 
/* Posix implementation of a generic spdk_sock. The base member must stay
 * first so the __posix_sock() cast macro is valid. */
struct spdk_posix_sock {
	struct spdk_sock	base;		/* generic sock state; must be first */
	int			fd;		/* underlying kernel socket descriptor */

	uint32_t		sendmsg_idx;	/* count of sendmsg() calls (1-based, wraps to 1);
						 * used to match zero-copy completions */

	struct spdk_pipe	*recv_pipe;	/* optional userspace receive staging pipe */
	void			*recv_buf;	/* backing storage for recv_pipe */
	int			recv_buf_sz;	/* usable size of recv_buf, in bytes */
	bool			pipe_has_data;	/* recv_pipe currently holds unread bytes */
	bool			socket_has_data;	/* kernel socket has unread bytes */
	bool			zcopy;		/* MSG_ZEROCOPY sends enabled on this socket */

	int			placement_id;	/* placement hint; code treats -1 as "none" */

	TAILQ_ENTRY(spdk_posix_sock)	link;	/* entry in group's socks_with_data list */
};

/* List of sockets believed to have readable data (buffered or in-kernel). */
TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock);
83 
/* Posix poll group: wraps an epoll (Linux) or kqueue (FreeBSD) instance. */
struct spdk_posix_sock_group_impl {
	struct spdk_sock_group_impl	base;	/* generic group state; must be first */
	int				fd;	/* epoll/kqueue descriptor */
	struct spdk_has_data_list	socks_with_data;	/* members with data ready to read */
	int				placement_id;	/* group placement id, -1 until assigned */
};
90 
/* Module-wide implementation options with their compiled-in defaults. */
static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_quickack = false,
	.enable_placement_id = PLACEMENT_NONE,
	.enable_zerocopy_send_server = true,
	.enable_zerocopy_send_client = false
};

/* Global placement-id -> poll-group map, guarded by its embedded mutex. */
static struct spdk_sock_map g_map = {
	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
	.mtx = PTHREAD_MUTEX_INITIALIZER
};
105 
106 __attribute((destructor)) static void
107 posix_sock_map_cleanup(void)
108 {
109 	spdk_sock_map_cleanup(&g_map);
110 }
111 
/* Downcasts from the generic sock/group objects to the posix implementations.
 * Parenthesize both the argument and the whole expansion so the macros compose
 * safely inside larger expressions (e.g. member access on the result). */
#define __posix_sock(sock) ((struct spdk_posix_sock *)(sock))
#define __posix_group_impl(group) ((struct spdk_posix_sock_group_impl *)(group))
114 
115 static int
116 posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
117 		   char *caddr, int clen, uint16_t *cport)
118 {
119 	struct spdk_posix_sock *sock = __posix_sock(_sock);
120 	struct sockaddr_storage sa;
121 	socklen_t salen;
122 	int rc;
123 
124 	assert(sock != NULL);
125 
126 	memset(&sa, 0, sizeof sa);
127 	salen = sizeof sa;
128 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
129 	if (rc != 0) {
130 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
131 		return -1;
132 	}
133 
134 	switch (sa.ss_family) {
135 	case AF_UNIX:
136 		/* Acceptable connection types that don't have IPs */
137 		return 0;
138 	case AF_INET:
139 	case AF_INET6:
140 		/* Code below will get IP addresses */
141 		break;
142 	default:
143 		/* Unsupported socket family */
144 		return -1;
145 	}
146 
147 	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
148 	if (rc != 0) {
149 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
150 		return -1;
151 	}
152 
153 	if (sport) {
154 		if (sa.ss_family == AF_INET) {
155 			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
156 		} else if (sa.ss_family == AF_INET6) {
157 			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
158 		}
159 	}
160 
161 	memset(&sa, 0, sizeof sa);
162 	salen = sizeof sa;
163 	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
164 	if (rc != 0) {
165 		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
166 		return -1;
167 	}
168 
169 	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
170 	if (rc != 0) {
171 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
172 		return -1;
173 	}
174 
175 	if (cport) {
176 		if (sa.ss_family == AF_INET) {
177 			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
178 		} else if (sa.ss_family == AF_INET6) {
179 			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
180 		}
181 	}
182 
183 	return 0;
184 }
185 
/* Distinguishes server-side (bind + listen) from client-side (connect)
 * socket creation in posix_sock_create(). */
enum posix_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};
190 
/* Resize (or create/destroy) the socket's userspace receive pipe to hold sz
 * bytes.  sz == 0 frees the pipe entirely; any data already buffered in the
 * old pipe is migrated into the new one, so shrinking below the amount of
 * buffered data fails with -EINVAL.  Returns 0 on success, negative errno
 * style codes or -1 on failure. */
static int
posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	/* Already the requested size; nothing to do. */
	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	/* sz + 1 because spdk_pipe reserves one byte to distinguish full from empty. */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		/* Copy the buffered bytes from the old pipe into the new one. */
		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}
257 
258 static int
259 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
260 {
261 	struct spdk_posix_sock *sock = __posix_sock(_sock);
262 	int rc;
263 
264 	assert(sock != NULL);
265 
266 	if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) {
267 		rc = posix_sock_alloc_pipe(sock, sz);
268 		if (rc) {
269 			return rc;
270 		}
271 	}
272 
273 	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
274 	if (sz < MIN_SO_RCVBUF_SIZE) {
275 		sz = MIN_SO_RCVBUF_SIZE;
276 	}
277 
278 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
279 	if (rc < 0) {
280 		return rc;
281 	}
282 
283 	return 0;
284 }
285 
286 static int
287 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
288 {
289 	struct spdk_posix_sock *sock = __posix_sock(_sock);
290 	int rc;
291 
292 	assert(sock != NULL);
293 
294 	if (sz < MIN_SO_SNDBUF_SIZE) {
295 		sz = MIN_SO_SNDBUF_SIZE;
296 	}
297 
298 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
299 	if (rc < 0) {
300 		return rc;
301 	}
302 
303 	return 0;
304 }
305 
306 static struct spdk_posix_sock *
307 posix_sock_alloc(int fd, bool enable_zero_copy)
308 {
309 	struct spdk_posix_sock *sock;
310 #if defined(SPDK_ZEROCOPY) || defined(__linux__)
311 	int flag;
312 	int rc;
313 #endif
314 
315 	sock = calloc(1, sizeof(*sock));
316 	if (sock == NULL) {
317 		SPDK_ERRLOG("sock allocation failed\n");
318 		return NULL;
319 	}
320 
321 	sock->fd = fd;
322 
323 #if defined(SPDK_ZEROCOPY)
324 	flag = 1;
325 
326 	if (enable_zero_copy) {
327 		/* Try to turn on zero copy sends */
328 		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
329 		if (rc == 0) {
330 			sock->zcopy = true;
331 		}
332 	}
333 #endif
334 
335 #if defined(__linux__)
336 	flag = 1;
337 
338 	if (g_spdk_posix_sock_impl_opts.enable_quickack) {
339 		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
340 		if (rc != 0) {
341 			SPDK_ERRLOG("quickack was failed to set\n");
342 		}
343 	}
344 
345 	spdk_sock_get_placement_id(sock->fd, g_spdk_posix_sock_impl_opts.enable_placement_id,
346 				   &sock->placement_id);
347 
348 	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
349 		/* Save placement_id */
350 		spdk_sock_map_insert(&g_map, sock->placement_id, NULL);
351 	}
352 #endif
353 
354 	return sock;
355 }
356 
/* Create a listening or connected TCP socket for ip:port.
 *
 * Walks every getaddrinfo() result until one address succeeds; each attempt
 * configures buffer sizes, SO_REUSEADDR, TCP_NODELAY and (optionally)
 * SO_PRIORITY before binding/connecting, and the final socket is made
 * non-blocking.  Zero-copy send support is enabled only when both the
 * caller's opts and the module-level option for this socket type allow it,
 * and the socket is not loopback.
 *
 * Returns the new sock, or NULL if no address could be used. */
static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_posix_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc, sz;
	bool enable_zcopy_user_opts = true;
	bool enable_zcopy_impl_opts = true;

	assert(opts != NULL);

	if (ip == NULL) {
		return NULL;
	}
	/* Strip surrounding brackets from a literal IPv6 address like "[::1]". */
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		sz = g_spdk_posix_sock_impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		sz = g_spdk_posix_sock_impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}
#endif

		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_server;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_client;
		}

		/* All further I/O on this socket is non-blocking. */
		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	/* Only enable zero copy for non-loopback sockets. */
	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd);

	sock = posix_sock_alloc(fd, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}
529 
/* Create a listening server socket bound to ip:port. */
static struct spdk_sock *
posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}
535 
/* Create a client socket connected to ip:port. */
static struct spdk_sock *
posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}
541 
/* Accept one pending connection on a listening sock.  The new descriptor is
 * made non-blocking, inherits the listener's priority (SO_PRIORITY is not
 * inherited by the kernel) and zero-copy setting.  Returns NULL if no
 * connection is pending or on any error. */
static struct spdk_sock *
posix_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_posix_sock		*sock = __posix_sock(_sock);
	struct sockaddr_storage		sa;
	socklen_t			salen;
	int				rc, fd;
	struct spdk_posix_sock		*new_sock;
	int				flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	/* Only issue the fcntl if the O_NONBLOCK flag isn't already set. */
	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	/* Inherit the zero copy feature from the listen socket */
	new_sock = posix_sock_alloc(fd, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}
592 
593 static int
594 posix_sock_close(struct spdk_sock *_sock)
595 {
596 	struct spdk_posix_sock *sock = __posix_sock(_sock);
597 
598 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
599 
600 	/* If the socket fails to close, the best choice is to
601 	 * leak the fd but continue to free the rest of the sock
602 	 * memory. */
603 	close(sock->fd);
604 
605 	spdk_pipe_destroy(sock->recv_pipe);
606 	free(sock->recv_buf);
607 	free(sock);
608 
609 	return 0;
610 }
611 
#ifdef SPDK_ZEROCOPY
/* Drain MSG_ZEROCOPY completion notifications from the socket's error queue
 * and complete the matching pending requests.  Each notification carries an
 * inclusive range [ee_info, ee_data] of sendmsg call indices; requests store
 * their sendmsg index in internal.offset (see _sock_flush).  Returns 0, or a
 * negative value if completing a request fails. */
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	msgh.msg_control = buf;
	msgh.msg_controllen = sizeof(buf);

	while (true) {
		/* MSG_ERRQUEUE reads completion records, not socket data. */
		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				/* No more notifications queued. */
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (req->internal.offset == idx) {
					found = true;

					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}

				} else if (found) {
					break;
				}
			}
		}
	}

	return 0;
}
#endif
685 
/* Flush as much of the queued request list as possible with one vectored
 * sendmsg().  Fully-sent requests move to the pending list (zcopy) or
 * complete immediately (non-zcopy); a partially-sent request records its
 * progress in internal.offset.  Returns 0 on success or when the socket
 * would block, negative on a real send error. */
static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc;
	unsigned int offset;
	size_t len;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		return 0;
	}

	iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL);

	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY;
	} else
#endif
	{
		flags = 0;
	}
	rc = sendmsg(psock->fd, &msg, flags);
	if (rc <= 0) {
		/* ENOBUFS with zcopy means the kernel ran out of optmem;
		 * treat it like a would-block and retry later. */
		if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
			return 0;
		}
		return rc;
	}

	/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
	 * req->internal.offset, so sendmsg_idx should not be zero  */
	if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
		psock->sendmsg_idx = 1;
	} else {
		psock->sendmsg_idx++;
	}

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(sock, req);

		if (!psock->zcopy) {
			/* The sendmsg syscall above isn't currently asynchronous,
			* so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return 0;
}
791 
/* Public flush entry point: reap any zero-copy completions first, then try
 * to push out queued requests. */
static int
posix_sock_flush(struct spdk_sock *sock)
{
#ifdef SPDK_ZEROCOPY
	struct spdk_posix_sock *psock = __posix_sock(sock);

	if (psock->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) {
		_sock_check_zcopy(sock);
	}
#endif

	return _sock_flush(sock);
}
805 
/* Copy buffered bytes from the socket's receive pipe into the caller's iovs.
 * Updates the has-data bookkeeping (and the group's socks_with_data list)
 * when the pipe is drained.  Returns bytes copied, or -1 with errno set
 * (EAGAIN when the pipe is empty, EINVAL on a zero-length destination). */
static ssize_t
posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_posix_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, mark it appropriately */
	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		assert(sock->pipe_has_data == true);

		/* Only remove from the list if the kernel side has no data
		 * either; otherwise the socket must stay pollable. */
		group = __posix_group_impl(sock->base.group_impl);
		if (group && !sock->socket_has_data) {
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->pipe_has_data = false;
	}

	return bytes;
}
847 
/* Do one large readv() from the kernel socket into the receive pipe.
 * On success marks the pipe (not the socket) as the holder of the data; on
 * error or EOF clears the socket's has-data state and removes it from the
 * group's ready list.  Returns the readv() result, or <= 0 if the pipe has
 * no free space. */
static inline ssize_t
posix_sock_read(struct spdk_posix_sock *sock)
{
	struct iovec iov[2];
	int bytes_avail, bytes_recvd;
	struct spdk_posix_sock_group_impl *group;

	bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes_avail <= 0) {
		return bytes_avail;
	}

	bytes_recvd = readv(sock->fd, iov, 2);

	assert(sock->pipe_has_data == false);

	if (bytes_recvd <= 0) {
		/* Errors count as draining the socket data */
		if (sock->base.group_impl && sock->socket_has_data) {
			group = __posix_group_impl(sock->base.group_impl);
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->socket_has_data = false;

		return bytes_recvd;
	}

	spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd);

#if DEBUG
	if (sock->base.group_impl) {
		assert(sock->socket_has_data == true);
	}
#endif

	/* The data moved from the kernel into the pipe; the socket stays on
	 * the group's ready list, now on behalf of the pipe. */
	sock->pipe_has_data = true;
	sock->socket_has_data = false;

	return bytes_recvd;
}
890 
/* Scatter-read into the caller's iovs.  Small reads are staged through the
 * receive pipe (when enabled) to batch kernel syscalls; reads of at least
 * MIN_SOCK_PIPE_SIZE bypass the pipe and go straight to the kernel.
 * Returns bytes read, 0 on EOF, or -1 with errno set. */
static ssize_t
posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		assert(sock->pipe_has_data == false);
		if (group && sock->socket_has_data) {
			/* We are reading the kernel data directly, so the
			 * socket is no longer known to be readable. */
			sock->socket_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}
		return readv(sock->fd, iov, iovcnt);
	}

	/* If the socket is not in a group, we must assume it always has
	 * data waiting for us because it is not epolled */
	if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		len = 0;
		for (i = 0; i < iovcnt; i++) {
			len += iov[i].iov_len;
		}

		if (len >= MIN_SOCK_PIPE_SIZE) {
			/* TODO: Should this detect if kernel socket is drained? */
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = posix_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
}
932 
/* Read up to len bytes into buf; a single-buffer recv is just a
 * one-element readv. */
static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov = {
		.iov_base = buf,
		.iov_len = len,
	};

	return posix_sock_readv(sock, &iov, 1);
}
943 
944 static ssize_t
945 posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
946 {
947 	struct spdk_posix_sock *sock = __posix_sock(_sock);
948 	int rc;
949 
950 	/* In order to process a writev, we need to flush any asynchronous writes
951 	 * first. */
952 	rc = _sock_flush(_sock);
953 	if (rc < 0) {
954 		return rc;
955 	}
956 
957 	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
958 		/* We weren't able to flush all requests */
959 		errno = EAGAIN;
960 		return -1;
961 	}
962 
963 	return writev(sock->fd, iov, iovcnt);
964 }
965 
966 static void
967 posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
968 {
969 	int rc;
970 
971 	spdk_sock_request_queue(sock, req);
972 
973 	/* If there are a sufficient number queued, just flush them out immediately. */
974 	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
975 		rc = _sock_flush(sock);
976 		if (rc) {
977 			spdk_sock_abort_requests(sock);
978 		}
979 	}
980 }
981 
982 static int
983 posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
984 {
985 	struct spdk_posix_sock *sock = __posix_sock(_sock);
986 	int val;
987 	int rc;
988 
989 	assert(sock != NULL);
990 
991 	val = nbytes;
992 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
993 	if (rc != 0) {
994 		return -1;
995 	}
996 	return 0;
997 }
998 
999 static bool
1000 posix_sock_is_ipv6(struct spdk_sock *_sock)
1001 {
1002 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1003 	struct sockaddr_storage sa;
1004 	socklen_t salen;
1005 	int rc;
1006 
1007 	assert(sock != NULL);
1008 
1009 	memset(&sa, 0, sizeof sa);
1010 	salen = sizeof sa;
1011 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1012 	if (rc != 0) {
1013 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1014 		return false;
1015 	}
1016 
1017 	return (sa.ss_family == AF_INET6);
1018 }
1019 
1020 static bool
1021 posix_sock_is_ipv4(struct spdk_sock *_sock)
1022 {
1023 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1024 	struct sockaddr_storage sa;
1025 	socklen_t salen;
1026 	int rc;
1027 
1028 	assert(sock != NULL);
1029 
1030 	memset(&sa, 0, sizeof sa);
1031 	salen = sizeof sa;
1032 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1033 	if (rc != 0) {
1034 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1035 		return false;
1036 	}
1037 
1038 	return (sa.ss_family == AF_INET);
1039 }
1040 
1041 static bool
1042 posix_sock_is_connected(struct spdk_sock *_sock)
1043 {
1044 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1045 	uint8_t byte;
1046 	int rc;
1047 
1048 	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
1049 	if (rc == 0) {
1050 		return false;
1051 	}
1052 
1053 	if (rc < 0) {
1054 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1055 			return true;
1056 		}
1057 
1058 		return false;
1059 	}
1060 
1061 	return true;
1062 }
1063 
1064 static struct spdk_sock_group_impl *
1065 posix_sock_group_impl_get_optimal(struct spdk_sock *_sock)
1066 {
1067 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1068 	struct spdk_sock_group_impl *group_impl;
1069 
1070 	if (sock->placement_id != -1) {
1071 		spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl);
1072 		return group_impl;
1073 	}
1074 
1075 	return NULL;
1076 }
1077 
/* Create a poll group backed by an epoll (Linux) or kqueue (FreeBSD)
 * instance.  Under PLACEMENT_CPU, the group is registered in the global map
 * under the current core's id.  Returns NULL on failure. */
static struct spdk_sock_group_impl *
posix_sock_group_impl_create(void)
{
	struct spdk_posix_sock_group_impl *group_impl;
	int fd;

#if defined(SPDK_EPOLL)
	fd = epoll_create1(0);
#elif defined(SPDK_KEVENT)
	fd = kqueue();
#endif
	if (fd == -1) {
		return NULL;
	}

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		close(fd);
		return NULL;
	}

	group_impl->fd = fd;
	TAILQ_INIT(&group_impl->socks_with_data);
	group_impl->placement_id = -1;

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
		group_impl->placement_id = spdk_env_get_current_core();
	}

	return &group_impl->base;
}
1111 
/* Tag a socket with an SO_MARK value and record the placement-id -> group
 * association in the global map.  No-op on platforms without SO_MARK; all
 * failures are logged but non-fatal. */
static void
posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock,
		int placement_id)
{
#if defined(SO_MARK)
	int rc;

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_MARK,
			&placement_id, sizeof(placement_id));
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Error setting SO_MARK\n");
		return;
	}

	rc = spdk_sock_map_insert(&g_map, placement_id, &group->base);
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
		return;
	}

	/* Only record the id on the sock once the map insert succeeded. */
	sock->placement_id = placement_id;
#endif
}
1137 
1138 static void
1139 posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1140 {
1141 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1142 
1143 	if (group->placement_id == -1) {
1144 		group->placement_id = spdk_sock_map_find_free(&g_map);
1145 
1146 		/* If a free placement id is found, update existing sockets in this group */
1147 		if (group->placement_id != -1) {
1148 			struct spdk_sock  *sock, *tmp;
1149 
1150 			TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
1151 				posix_sock_mark(group, __posix_sock(sock), group->placement_id);
1152 			}
1153 		}
1154 	}
1155 
1156 	if (group->placement_id != -1) {
1157 		/*
1158 		 * group placement id is already determined for this poll group.
1159 		 * Mark socket with group's placement id.
1160 		 */
1161 		posix_sock_mark(group, __posix_sock(_sock), group->placement_id);
1162 	}
1163 }
1164 
/* Register a socket with the group's epoll/kqueue instance and the placement
 * machinery.  If the socket migrated from another group with bytes still
 * buffered in its receive pipe, it is put straight onto the ready list so
 * that data is not stranded.  Returns the epoll_ctl/kevent result. */
static int
posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

#if defined(SPDK_EPOLL)
	struct epoll_event event;

	memset(&event, 0, sizeof(event));
	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
	event.events = EPOLLIN | EPOLLERR;
	event.data.ptr = sock;

	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
#elif defined(SPDK_KEVENT)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
#endif

	if (rc != 0) {
		return rc;
	}

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL  &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		sock->pipe_has_data = true;
		sock->socket_has_data = false;
		TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link);
	}

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
		posix_sock_update_mark(_group, _sock);
	} else if (sock->placement_id != -1) {
		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
			/* Do not treat this as an error. The system will continue running. */
		}
	}

	return rc;
}
1214 
/* Remove a socket from the poll group: drop it from the has-data list,
 * release its placement-map slot, deregister it from epoll/kqueue, and
 * abort any outstanding requests queued on it.
 *
 * Returns 0 on success or -1 (with errno set) if the kernel deregistration
 * fails.
 */
static int
posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	/* Unlink from socks_with_data first so the group's poll loop can no
	 * longer hand this socket out after removal. */
	if (sock->pipe_has_data || sock->socket_has_data) {
		TAILQ_REMOVE(&group->socks_with_data, sock, link);
		sock->pipe_has_data = false;
		sock->socket_has_data = false;
	}

	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

#if defined(SPDK_EPOLL)
	struct epoll_event event;

	/* Event parameter is ignored but some old kernel version still require it. */
	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
#elif defined(SPDK_KEVENT)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);

	/* Zero timeout: apply the change without blocking. */
	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
	/* kevent() can report a per-event failure via EV_ERROR with the error
	 * code stashed in event.data, even when the call itself returned 0. */
	if (rc == 0 && event.flags & EV_ERROR) {
		rc = -1;
		errno = event.data;
	}
#endif

	/* Fail any writes still queued on this socket so their callbacks fire
	 * before the socket leaves the group. */
	spdk_sock_abort_requests(_sock);

	return rc;
}
1254 
/* Poll the group: flush pending writes, harvest kernel readiness events into
 * the socks_with_data list, and return up to max_events ready sockets via the
 * socks array. Returns the number of sockets written to socks, or -1 on a
 * kernel polling error.
 */
static int
posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_sock *sock, *tmp;
	int num_events, i, rc;
	struct spdk_posix_sock *psock, *ptmp;
#if defined(SPDK_EPOLL)
	struct epoll_event events[MAX_EVENTS_PER_POLL];
#elif defined(SPDK_KEVENT)
	struct kevent events[MAX_EVENTS_PER_POLL];
	struct timespec ts = {0};
#endif

#ifdef SPDK_ZEROCOPY
	/* When all of the following conditions are met
	 * - non-blocking socket
	 * - zero copy is enabled
	 * - interrupts suppressed (i.e. busy polling)
	 * - the NIC tx queue is full at the time sendmsg() is called
	 * - epoll_wait determines there is an EPOLLIN event for the socket
	 * then we can get into a situation where data we've sent is queued
	 * up in the kernel network stack, but interrupts have been suppressed
	 * because other traffic is flowing so the kernel misses the signal
	 * to flush the software tx queue. If there wasn't incoming data
	 * pending on the socket, then epoll_wait would have been sufficient
	 * to kick off the send operation, but since there is a pending event
	 * epoll_wait does not trigger the necessary operation.
	 *
	 * We deal with this by checking for all of the above conditions and
	 * additionally looking for EPOLLIN events that were not consumed from
	 * the last poll loop. We take this to mean that the upper layer is
	 * unable to consume them because it is blocked waiting for resources
	 * to free up, and those resources are most likely freed in response
	 * to a pending asynchronous write completing.
	 *
	 * Additionally, sockets that have the same placement_id actually share
	 * an underlying hardware queue. That means polling one of them is
	 * equivalent to polling all of them. As a quick mechanism to avoid
	 * making extra poll() calls, stash the last placement_id during the loop
	 * and only poll if it's not the same. The overwhelmingly common case
	 * is that all sockets in this list have the same placement_id because
	 * SPDK is intentionally grouping sockets by that value, so even
	 * though this won't stop all extra calls to poll(), it's very fast
	 * and will catch all of them in practice.
	 */
	int last_placement_id = -1;

	TAILQ_FOREACH(psock, &group->socks_with_data, link) {
		if (psock->zcopy && psock->placement_id >= 0 &&
		    psock->placement_id != last_placement_id) {
			struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0};

			/* Zero timeout: a single non-blocking kick of the queue. */
			poll(&pfd, 1, 0);
			last_placement_id = psock->placement_id;
		}
	}
#endif

	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
	 * a completion callback could remove the sock from the
	 * group. */
	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}

	assert(max_events > 0);

#if defined(SPDK_EPOLL)
	num_events = epoll_wait(group->fd, events, max_events, 0);
#elif defined(SPDK_KEVENT)
	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
#endif

	if (num_events == -1) {
		return -1;
	} else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
		sock = TAILQ_FIRST(&_group->socks);
		psock = __posix_sock(sock);
		/* poll() is called here to busy poll the queue associated with
		 * first socket in list and potentially reap incoming data.
		 */
		if (sock->opts.priority) {
			struct pollfd pfd = {0, 0, 0};

			pfd.fd = psock->fd;
			pfd.events = POLLIN | POLLERR;
			poll(&pfd, 1, 0);
		}
	}

	/* Merge the kernel-reported events into the socks_with_data list. */
	for (i = 0; i < num_events; i++) {
#if defined(SPDK_EPOLL)
		sock = events[i].data.ptr;
		psock = __posix_sock(sock);

#ifdef SPDK_ZEROCOPY
		if (events[i].events & EPOLLERR) {
			rc = _sock_check_zcopy(sock);
			/* If the socket was closed or removed from
			 * the group in response to a send ack, don't
			 * add it to the array here. */
			if (rc || sock->cb_fn == NULL) {
				continue;
			}
		}
#endif
		if ((events[i].events & EPOLLIN) == 0) {
			continue;
		}

#elif defined(SPDK_KEVENT)
		sock = events[i].udata;
		psock = __posix_sock(sock);
#endif

		/* If the socket is not already in the list, add it now */
		if (!psock->socket_has_data && !psock->pipe_has_data) {
			TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link);
		}

		psock->socket_has_data = true;
	}

	num_events = 0;

	/* Hand out up to max_events sockets from the has-data list. SAFE form:
	 * entries with no callback are removed during iteration. */
	TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) {
		if (num_events == max_events) {
			break;
		}

		/* If the socket's cb_fn is NULL, just remove it from the
		 * list and do not add it to socks array */
		if (spdk_unlikely(psock->base.cb_fn == NULL)) {
			psock->socket_has_data = false;
			psock->pipe_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, psock, link);
			continue;
		}

		socks[num_events++] = &psock->base;
	}

	/* Cycle the has_data list so that each time we poll things aren't
	 * in the same order. Say we have 6 sockets in the list, named as follows:
	 * A B C D E F
	 * And all 6 sockets had epoll events, but max_events is only 3. That means
	 * psock currently points at D. We want to rearrange the list to the following:
	 * D E F A B C
	 *
	 * The variables below are named according to this example to make it easier to
	 * follow the swaps.
	 */
	if (psock != NULL) {
		struct spdk_posix_sock *pa, *pc, *pd, *pf;

		/* Capture pointers to the elements we need */
		pd = psock;
		pc = TAILQ_PREV(pd, spdk_has_data_list, link);
		pa = TAILQ_FIRST(&group->socks_with_data);
		pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list);

		/* Break the link between C and D */
		pc->link.tqe_next = NULL;

		/* Connect F to A */
		pf->link.tqe_next = pa;
		pa->link.tqe_prev = &pf->link.tqe_next;

		/* Fix up the list first/last pointers */
		group->socks_with_data.tqh_first = pd;
		group->socks_with_data.tqh_last = &pc->link.tqe_next;

		/* D is in front of the list, make tqe prev pointer point to the head of list */
		pd->link.tqe_prev = &group->socks_with_data.tqh_first;
	}

	return num_events;
}
1438 
1439 static int
1440 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1441 {
1442 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1443 	int rc;
1444 
1445 	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1446 		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
1447 	}
1448 
1449 	rc = close(group->fd);
1450 	free(group);
1451 	return rc;
1452 }
1453 
1454 static int
1455 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
1456 {
1457 	if (!opts || !len) {
1458 		errno = EINVAL;
1459 		return -1;
1460 	}
1461 	memset(opts, 0, *len);
1462 
1463 #define FIELD_OK(field) \
1464 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len
1465 
1466 #define GET_FIELD(field) \
1467 	if (FIELD_OK(field)) { \
1468 		opts->field = g_spdk_posix_sock_impl_opts.field; \
1469 	}
1470 
1471 	GET_FIELD(recv_buf_size);
1472 	GET_FIELD(send_buf_size);
1473 	GET_FIELD(enable_recv_pipe);
1474 	GET_FIELD(enable_zerocopy_send);
1475 	GET_FIELD(enable_quickack);
1476 	GET_FIELD(enable_placement_id);
1477 	GET_FIELD(enable_zerocopy_send_server);
1478 	GET_FIELD(enable_zerocopy_send_client);
1479 
1480 #undef GET_FIELD
1481 #undef FIELD_OK
1482 
1483 	*len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts));
1484 	return 0;
1485 }
1486 
1487 static int
1488 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
1489 {
1490 	if (!opts) {
1491 		errno = EINVAL;
1492 		return -1;
1493 	}
1494 
1495 #define FIELD_OK(field) \
1496 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len
1497 
1498 #define SET_FIELD(field) \
1499 	if (FIELD_OK(field)) { \
1500 		g_spdk_posix_sock_impl_opts.field = opts->field; \
1501 	}
1502 
1503 	SET_FIELD(recv_buf_size);
1504 	SET_FIELD(send_buf_size);
1505 	SET_FIELD(enable_recv_pipe);
1506 	SET_FIELD(enable_zerocopy_send);
1507 	SET_FIELD(enable_quickack);
1508 	SET_FIELD(enable_placement_id);
1509 	SET_FIELD(enable_zerocopy_send_server);
1510 	SET_FIELD(enable_zerocopy_send_client);
1511 
1512 #undef SET_FIELD
1513 #undef FIELD_OK
1514 
1515 	return 0;
1516 }
1517 
1518 
/* Function table binding this POSIX implementation to the generic spdk_sock
 * layer; registered below with the default priority. */
static struct spdk_net_impl g_posix_net_impl = {
	.name		= "posix",
	.getaddr	= posix_sock_getaddr,
	.connect	= posix_sock_connect,
	.listen		= posix_sock_listen,
	.accept		= posix_sock_accept,
	.close		= posix_sock_close,
	.recv		= posix_sock_recv,
	.readv		= posix_sock_readv,
	.writev		= posix_sock_writev,
	.writev_async	= posix_sock_writev_async,
	.flush		= posix_sock_flush,
	.set_recvlowat	= posix_sock_set_recvlowat,
	.set_recvbuf	= posix_sock_set_recvbuf,
	.set_sendbuf	= posix_sock_set_sendbuf,
	.is_ipv6	= posix_sock_is_ipv6,
	.is_ipv4	= posix_sock_is_ipv4,
	.is_connected	= posix_sock_is_connected,
	.group_impl_get_optimal	= posix_sock_group_impl_get_optimal,
	.group_impl_create	= posix_sock_group_impl_create,
	.group_impl_add_sock	= posix_sock_group_impl_add_sock,
	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
	.group_impl_poll	= posix_sock_group_impl_poll,
	.group_impl_close	= posix_sock_group_impl_close,
	.get_opts	= posix_sock_impl_get_opts,
	.set_opts	= posix_sock_impl_set_opts,
};

SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);
1548