xref: /spdk/module/sock/posix/posix.c (revision da2fd6651a9cd4732b0910d30291821e77f4d643)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation. All rights reserved.
5  *   Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #if defined(__FreeBSD__)
37 #include <sys/event.h>
38 #define SPDK_KEVENT
39 #else
40 #include <sys/epoll.h>
41 #define SPDK_EPOLL
42 #endif
43 
44 #if defined(__linux__)
45 #include <linux/errqueue.h>
46 #endif
47 
48 #include "spdk/env.h"
49 #include "spdk/log.h"
50 #include "spdk/pipe.h"
51 #include "spdk/sock.h"
52 #include "spdk/util.h"
53 #include "spdk_internal/sock.h"
54 
55 #define MAX_TMPBUF 1024
56 #define PORTNUMLEN 32
57 
58 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
59 #define SPDK_ZEROCOPY
60 #endif
61 
/* Posix socket implementation. Wraps a plain kernel socket fd and embeds
 * the generic spdk_sock as its first member so the __posix_sock() cast
 * is valid. */
struct spdk_posix_sock {
	struct spdk_sock	base;
	int			fd;	/* underlying kernel socket */

	/* Monotonic sendmsg() call counter. Used to match MSG_ZEROCOPY
	 * completion notifications back to pending requests (see
	 * _sock_flush / _sock_check_zcopy). Kept nonzero by _sock_flush. */
	uint32_t		sendmsg_idx;

	/* Optional userspace receive pipe used to batch small reads. */
	struct spdk_pipe	*recv_pipe;
	void			*recv_buf;	/* backing storage for recv_pipe */
	int			recv_buf_sz;	/* usable pipe size in bytes */
	bool			pipe_has_data;	/* unread bytes buffered in recv_pipe */
	bool			socket_has_data;	/* kernel socket has readable data */
	bool			zcopy;	/* MSG_ZEROCOPY sends enabled on this socket */

	int			placement_id;

	/* Entry in the owning group's socks_with_data list. */
	TAILQ_ENTRY(spdk_posix_sock)	link;
};
79 
TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock);

/* Poll group implementation: one epoll (Linux) or kqueue (FreeBSD) fd
 * plus the list of member sockets known to have readable data. */
struct spdk_posix_sock_group_impl {
	struct spdk_sock_group_impl	base;
	int				fd;	/* epoll/kqueue fd */
	struct spdk_has_data_list	socks_with_data;
	int				placement_id;	/* -1 until one is assigned */
};
88 
/* Module-wide tunables for the posix sock implementation.
 * Note zero copy sends are on by default for servers only; clients
 * default to off. */
static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_zerocopy_send = true,
	.enable_quickack = false,
	.enable_placement_id = PLACEMENT_NONE,
	.enable_zerocopy_send_server = true,
	.enable_zerocopy_send_client = false
};
99 
/* Global placement_id -> poll group map shared by all posix sockets;
 * protected by its own mutex. */
static struct spdk_sock_map g_map = {
	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
	.mtx = PTHREAD_MUTEX_INITIALIZER
};
104 
/* Runs automatically at process/library teardown (GCC destructor) to
 * release the entries held in g_map. */
__attribute((destructor)) static void
posix_sock_map_cleanup(void)
{
	spdk_sock_map_cleanup(&g_map);
}
110 
/* Render the IPv4/IPv6 address in 'sa' as a string into host[hlen].
 * Returns 0 on success, -1 on NULL arguments, an unsupported address
 * family, or inet_ntop() failure. */
static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const void *src;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	switch (sa->sa_family) {
	case AF_INET:
		src = &((struct sockaddr_in *)sa)->sin_addr;
		break;
	case AF_INET6:
		src = &((struct sockaddr_in6 *)sa)->sin6_addr;
		break;
	default:
		/* Not an IP socket. */
		return -1;
	}

	return inet_ntop(sa->sa_family, src, host, hlen) != NULL ? 0 : -1;
}
139 
/* Downcasts from the generic sock/group types; valid because 'base' is
 * the first member of the posix-specific structs. */
#define __posix_sock(sock) (struct spdk_posix_sock *)sock
#define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group
142 
143 static int
144 posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
145 		   char *caddr, int clen, uint16_t *cport)
146 {
147 	struct spdk_posix_sock *sock = __posix_sock(_sock);
148 	struct sockaddr_storage sa;
149 	socklen_t salen;
150 	int rc;
151 
152 	assert(sock != NULL);
153 
154 	memset(&sa, 0, sizeof sa);
155 	salen = sizeof sa;
156 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
157 	if (rc != 0) {
158 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
159 		return -1;
160 	}
161 
162 	switch (sa.ss_family) {
163 	case AF_UNIX:
164 		/* Acceptable connection types that don't have IPs */
165 		return 0;
166 	case AF_INET:
167 	case AF_INET6:
168 		/* Code below will get IP addresses */
169 		break;
170 	default:
171 		/* Unsupported socket family */
172 		return -1;
173 	}
174 
175 	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
176 	if (rc != 0) {
177 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
178 		return -1;
179 	}
180 
181 	if (sport) {
182 		if (sa.ss_family == AF_INET) {
183 			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
184 		} else if (sa.ss_family == AF_INET6) {
185 			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
186 		}
187 	}
188 
189 	memset(&sa, 0, sizeof sa);
190 	salen = sizeof sa;
191 	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
192 	if (rc != 0) {
193 		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
194 		return -1;
195 	}
196 
197 	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
198 	if (rc != 0) {
199 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
200 		return -1;
201 	}
202 
203 	if (cport) {
204 		if (sa.ss_family == AF_INET) {
205 			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
206 		} else if (sa.ss_family == AF_INET6) {
207 			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
208 		}
209 	}
210 
211 	return 0;
212 }
213 
/* Distinguishes server (bind + listen) from client (connect) setup in
 * posix_sock_create(). */
enum posix_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};
218 
/* Resize (create, grow/shrink, or destroy) the userspace receive pipe
 * to 'sz' bytes, migrating any already-buffered data into the new pipe.
 * Returns 0 on success; -1 if 0 < sz < MIN_SOCK_PIPE_SIZE; -ENOMEM on
 * allocation failure; -EINVAL if the buffered data won't fit in the new
 * size. On any failure the old pipe is left intact. */
static int
posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	/* sz + 1: the pipe needs one extra byte to distinguish full from empty. */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		/* Both source and destination may wrap around the ring, hence
		 * two iovecs on each side. */
		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}
285 
286 static int
287 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
288 {
289 	struct spdk_posix_sock *sock = __posix_sock(_sock);
290 	int rc;
291 
292 	assert(sock != NULL);
293 
294 	if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) {
295 		rc = posix_sock_alloc_pipe(sock, sz);
296 		if (rc) {
297 			return rc;
298 		}
299 	}
300 
301 	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
302 	if (sz < MIN_SO_RCVBUF_SIZE) {
303 		sz = MIN_SO_RCVBUF_SIZE;
304 	}
305 
306 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
307 	if (rc < 0) {
308 		return rc;
309 	}
310 
311 	return 0;
312 }
313 
314 static int
315 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
316 {
317 	struct spdk_posix_sock *sock = __posix_sock(_sock);
318 	int rc;
319 
320 	assert(sock != NULL);
321 
322 	if (sz < MIN_SO_SNDBUF_SIZE) {
323 		sz = MIN_SO_SNDBUF_SIZE;
324 	}
325 
326 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
327 	if (rc < 0) {
328 		return rc;
329 	}
330 
331 	return 0;
332 }
333 
/* Allocate a posix sock wrapper around an existing fd. Best-effort
 * enables MSG_ZEROCOPY sends (when requested and supported) and
 * TCP_QUICKACK (Linux), and records the socket's placement id.
 * Returns NULL only on allocation failure. */
static struct spdk_posix_sock *
posix_sock_alloc(int fd, bool enable_zero_copy)
{
	struct spdk_posix_sock *sock;
#if defined(SPDK_ZEROCOPY) || defined(__linux__)
	int flag;
	int rc;
#endif

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;

#if defined(SPDK_ZEROCOPY)
	flag = 1;

	if (enable_zero_copy) {
		/* Try to turn on zero copy sends */
		/* Best effort: zcopy stays false if the kernel refuses. */
		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
		if (rc == 0) {
			sock->zcopy = true;
		}
	}
#endif

#if defined(__linux__)
	flag = 1;

	if (g_spdk_posix_sock_impl_opts.enable_quickack) {
		/* Failure is logged but not fatal; quickack is only a latency
		 * optimization. */
		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
		if (rc != 0) {
			SPDK_ERRLOG("quickack was failed to set\n");
		}
	}

	spdk_sock_get_placement_id(sock->fd, g_spdk_posix_sock_impl_opts.enable_placement_id,
				   &sock->placement_id);

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
		/* Save placement_id */
		spdk_sock_map_insert(&g_map, sock->placement_id, NULL);
	}
#endif

	return sock;
}
384 
385 static bool
386 sock_is_loopback(int fd)
387 {
388 	struct ifaddrs *addrs, *tmp;
389 	struct sockaddr_storage sa = {};
390 	socklen_t salen;
391 	struct ifreq ifr = {};
392 	char ip_addr[256], ip_addr_tmp[256];
393 	int rc;
394 	bool is_loopback = false;
395 
396 	salen = sizeof(sa);
397 	rc = getsockname(fd, (struct sockaddr *)&sa, &salen);
398 	if (rc != 0) {
399 		return is_loopback;
400 	}
401 
402 	memset(ip_addr, 0, sizeof(ip_addr));
403 	rc = get_addr_str((struct sockaddr *)&sa, ip_addr, sizeof(ip_addr));
404 	if (rc != 0) {
405 		return is_loopback;
406 	}
407 
408 	getifaddrs(&addrs);
409 	for (tmp = addrs; tmp != NULL; tmp = tmp->ifa_next) {
410 		if (tmp->ifa_addr && (tmp->ifa_flags & IFF_UP) &&
411 		    (tmp->ifa_addr->sa_family == sa.ss_family)) {
412 			memset(ip_addr_tmp, 0, sizeof(ip_addr_tmp));
413 			rc = get_addr_str(tmp->ifa_addr, ip_addr_tmp, sizeof(ip_addr_tmp));
414 			if (rc != 0) {
415 				continue;
416 			}
417 
418 			if (strncmp(ip_addr, ip_addr_tmp, sizeof(ip_addr)) == 0) {
419 				memcpy(ifr.ifr_name, tmp->ifa_name, sizeof(ifr.ifr_name));
420 				ioctl(fd, SIOCGIFFLAGS, &ifr);
421 				if (ifr.ifr_flags & IFF_LOOPBACK) {
422 					is_loopback = true;
423 				}
424 				goto end;
425 			}
426 		}
427 	}
428 
429 end:
430 	freeifaddrs(addrs);
431 	return is_loopback;
432 }
433 
/* Create a listening or connecting TCP socket on ip:port.
 * Walks every getaddrinfo() result until one socket can be fully set
 * up; the winning fd is made non-blocking. Zero copy send eligibility
 * combines the per-call opts with the module-level impl opts and is
 * disabled for loopback connections. Returns NULL on failure. */
static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_posix_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc, sz;
	bool enable_zcopy_user_opts = true;
	bool enable_zcopy_impl_opts = true;

	assert(opts != NULL);

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		/* Strip brackets from a literal IPv6 address: "[::1]" -> "::1". */
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		/* Buffer-size setup failures are tolerated; the socket still
		 * works with kernel defaults. */
		sz = g_spdk_posix_sock_impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		sz = g_spdk_posix_sock_impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}
#endif

		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					/* Retry the same addrinfo entry with a fresh socket. */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_server &&
						 g_spdk_posix_sock_impl_opts.enable_zerocopy_send;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_client &&
						 g_spdk_posix_sock_impl_opts.enable_zerocopy_send;
		}

		/* All I/O paths in this module assume a non-blocking fd. */
		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	/* Only enable zero copy for non-loopback sockets. */
	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd);

	sock = posix_sock_alloc(fd, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}
608 
/* spdk_net_impl callback: create a listening socket on ip:port. */
static struct spdk_sock *
posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}
614 
/* spdk_net_impl callback: connect a client socket to ip:port. */
static struct spdk_sock *
posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}
620 
/* Accept one pending connection on a listening socket. The new fd is
 * made non-blocking, re-applies the listener's SO_PRIORITY (which is
 * not inherited across accept), inherits the listener's zero copy
 * setting, and is wrapped in a new spdk_posix_sock.
 * Returns NULL on failure (including no pending connection). */
static struct spdk_sock *
posix_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_posix_sock		*sock = __posix_sock(_sock);
	struct sockaddr_storage		sa;
	socklen_t			salen;
	int				rc, fd;
	struct spdk_posix_sock		*new_sock;
	int				flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	/* Ensure the accepted fd is non-blocking (it may already be). */
	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	/* Inherit the zero copy feature from the listen socket */
	new_sock = posix_sock_alloc(fd, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}
671 
672 static int
673 posix_sock_close(struct spdk_sock *_sock)
674 {
675 	struct spdk_posix_sock *sock = __posix_sock(_sock);
676 
677 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
678 
679 	/* If the socket fails to close, the best choice is to
680 	 * leak the fd but continue to free the rest of the sock
681 	 * memory. */
682 	close(sock->fd);
683 
684 	spdk_pipe_destroy(sock->recv_pipe);
685 	free(sock->recv_buf);
686 	free(sock);
687 
688 	return 0;
689 }
690 
#ifdef SPDK_ZEROCOPY
/* Drain the socket's error queue of MSG_ZEROCOPY completion
 * notifications and complete the pending requests they cover. Each
 * notification carries an inclusive range [ee_info, ee_data] of sendmsg
 * call indices, which are matched against req->internal.offset (the
 * field _sock_flush repurposes to store that index for zcopy sends).
 * Returns 0, or a negative value if completing a request fails. */
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	msgh.msg_control = buf;
	msgh.msg_controllen = sizeof(buf);

	while (true) {
		/* MSG_ERRQUEUE reads notifications without consuming normal data. */
		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				/* Error queue fully drained. */
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (req->internal.offset == idx) {
					found = true;

					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}

				} else if (found) {
					break;
				}
			}
		}
	}

	/* Unreachable: the loop above only exits via return. */
	return 0;
}
#endif
764 
/* Push queued write requests to the kernel with one vectored sendmsg().
 * Fully-written requests move to the pending list and, for non-zcopy
 * sockets, complete immediately; a partially-written request records
 * its progress in internal.offset. For zcopy sockets, internal.offset
 * is instead repurposed to hold the sendmsg call index so that
 * _sock_check_zcopy can match completions. Returns 0 on success or when
 * the socket would block; negative on a hard sendmsg error. */
static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc;
	unsigned int offset;
	size_t len;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather the unsent portions of queued requests into iovs. */
	iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL);

	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY;
	} else
#endif
	{
		flags = 0;
	}
	rc = sendmsg(psock->fd, &msg, flags);
	if (rc <= 0) {
		/* ENOBUFS with zcopy means the kernel ran out of notification
		 * space; treat it like EAGAIN and retry later. */
		if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
			return 0;
		}
		return rc;
	}

	/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
	 * req->internal.offset, so sendmsg_idx should not be zero  */
	if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
		psock->sendmsg_idx = 1;
	} else {
		psock->sendmsg_idx++;
	}

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(sock, req);

		if (!psock->zcopy) {
			/* The sendmsg syscall above isn't currently asynchronous,
			* so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return 0;
}
870 
/* Public flush entry point: reap any outstanding MSG_ZEROCOPY
 * completions first, then push queued requests to the kernel. */
static int
posix_sock_flush(struct spdk_sock *sock)
{
#ifdef SPDK_ZEROCOPY
	struct spdk_posix_sock *psock = __posix_sock(sock);

	if (psock->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) {
		_sock_check_zcopy(sock);
	}
#endif

	return _sock_flush(sock);
}
884 
/* Copy buffered data out of the receive pipe into diov. When the pipe
 * is drained (and the kernel socket has no data either), the sock is
 * removed from its group's ready list. Returns the number of bytes
 * copied, or -1 with errno set (EAGAIN if the pipe is empty, EINVAL for
 * a zero-length destination or pipe error). */
static ssize_t
posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_posix_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, mark it appropriately */
	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		assert(sock->pipe_has_data == true);

		group = __posix_group_impl(sock->base.group_impl);
		if (group && !sock->socket_has_data) {
			/* No data left anywhere; leave the ready list. */
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->pipe_has_data = false;
	}

	return bytes;
}
926 
/* Refill the receive pipe with a single readv() from the kernel socket.
 * On success, marks the pipe as holding data and the kernel socket as
 * drained. Returns the readv() result; <= 0 values (errors, EOF, or a
 * full pipe returning 0 space) are passed through. */
static inline ssize_t
posix_sock_read(struct spdk_posix_sock *sock)
{
	struct iovec iov[2];
	int bytes_avail, bytes_recvd;
	struct spdk_posix_sock_group_impl *group;

	bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes_avail <= 0) {
		/* No free space in the pipe; nothing to do. */
		return bytes_avail;
	}

	bytes_recvd = readv(sock->fd, iov, 2);

	assert(sock->pipe_has_data == false);

	if (bytes_recvd <= 0) {
		/* Errors count as draining the socket data */
		if (sock->base.group_impl && sock->socket_has_data) {
			group = __posix_group_impl(sock->base.group_impl);
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->socket_has_data = false;

		return bytes_recvd;
	}

	spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd);

#if DEBUG
	if (sock->base.group_impl) {
		assert(sock->socket_has_data == true);
	}
#endif

	sock->pipe_has_data = true;
	/* NOTE(review): this assumes one readv drained the kernel socket;
	 * any remaining backlog is picked up on the next poll. */
	sock->socket_has_data = false;

	return bytes_recvd;
}
969 
/* Vectored receive. Reads directly from the kernel socket when there is
 * no pipe or when the request is large; otherwise refills the pipe and
 * copies out of it. */
static ssize_t
posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		assert(sock->pipe_has_data == false);
		if (group && sock->socket_has_data) {
			/* Direct read will (at least partially) drain the socket. */
			sock->socket_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}
		return readv(sock->fd, iov, iovcnt);
	}

	/* If the socket is not in a group, we must assume it always has
	 * data waiting for us because it is not epolled */
	if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		len = 0;
		for (i = 0; i < iovcnt; i++) {
			len += iov[i].iov_len;
		}

		if (len >= MIN_SOCK_PIPE_SIZE) {
			/* TODO: Should this detect if kernel socket is drained? */
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = posix_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
}
1011 
/* Single-buffer receive: wraps buf/len in a one-element scatter list
 * and forwards to the readv path. */
static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec single = {
		.iov_base = buf,
		.iov_len = len,
	};

	return posix_sock_readv(sock, &single, 1);
}
1022 
1023 static ssize_t
1024 posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
1025 {
1026 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1027 	int rc;
1028 
1029 	/* In order to process a writev, we need to flush any asynchronous writes
1030 	 * first. */
1031 	rc = _sock_flush(_sock);
1032 	if (rc < 0) {
1033 		return rc;
1034 	}
1035 
1036 	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
1037 		/* We weren't able to flush all requests */
1038 		errno = EAGAIN;
1039 		return -1;
1040 	}
1041 
1042 	return writev(sock->fd, iov, iovcnt);
1043 }
1044 
1045 static void
1046 posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
1047 {
1048 	int rc;
1049 
1050 	spdk_sock_request_queue(sock, req);
1051 
1052 	/* If there are a sufficient number queued, just flush them out immediately. */
1053 	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
1054 		rc = _sock_flush(sock);
1055 		if (rc) {
1056 			spdk_sock_abort_requests(sock);
1057 		}
1058 	}
1059 }
1060 
1061 static int
1062 posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1063 {
1064 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1065 	int val;
1066 	int rc;
1067 
1068 	assert(sock != NULL);
1069 
1070 	val = nbytes;
1071 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1072 	if (rc != 0) {
1073 		return -1;
1074 	}
1075 	return 0;
1076 }
1077 
1078 static bool
1079 posix_sock_is_ipv6(struct spdk_sock *_sock)
1080 {
1081 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1082 	struct sockaddr_storage sa;
1083 	socklen_t salen;
1084 	int rc;
1085 
1086 	assert(sock != NULL);
1087 
1088 	memset(&sa, 0, sizeof sa);
1089 	salen = sizeof sa;
1090 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1091 	if (rc != 0) {
1092 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1093 		return false;
1094 	}
1095 
1096 	return (sa.ss_family == AF_INET6);
1097 }
1098 
1099 static bool
1100 posix_sock_is_ipv4(struct spdk_sock *_sock)
1101 {
1102 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1103 	struct sockaddr_storage sa;
1104 	socklen_t salen;
1105 	int rc;
1106 
1107 	assert(sock != NULL);
1108 
1109 	memset(&sa, 0, sizeof sa);
1110 	salen = sizeof sa;
1111 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1112 	if (rc != 0) {
1113 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1114 		return false;
1115 	}
1116 
1117 	return (sa.ss_family == AF_INET);
1118 }
1119 
1120 static bool
1121 posix_sock_is_connected(struct spdk_sock *_sock)
1122 {
1123 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1124 	uint8_t byte;
1125 	int rc;
1126 
1127 	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
1128 	if (rc == 0) {
1129 		return false;
1130 	}
1131 
1132 	if (rc < 0) {
1133 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1134 			return true;
1135 		}
1136 
1137 		return false;
1138 	}
1139 
1140 	return true;
1141 }
1142 
1143 static struct spdk_sock_group_impl *
1144 posix_sock_group_impl_get_optimal(struct spdk_sock *_sock)
1145 {
1146 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1147 	struct spdk_sock_group_impl *group_impl;
1148 
1149 	if (sock->placement_id != -1) {
1150 		spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl);
1151 		return group_impl;
1152 	}
1153 
1154 	return NULL;
1155 }
1156 
/* Create a poll group backed by epoll (Linux) or kqueue (FreeBSD).
 * Under PLACEMENT_CPU the group is registered in g_map under the core
 * it was created on. Returns NULL if the kernel event fd or the
 * allocation fails. */
static struct spdk_sock_group_impl *
posix_sock_group_impl_create(void)
{
	struct spdk_posix_sock_group_impl *group_impl;
	int fd;

#if defined(SPDK_EPOLL)
	fd = epoll_create1(0);
#elif defined(SPDK_KEVENT)
	fd = kqueue();
#endif
	if (fd == -1) {
		return NULL;
	}

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		close(fd);
		return NULL;
	}

	group_impl->fd = fd;
	TAILQ_INIT(&group_impl->socks_with_data);
	group_impl->placement_id = -1;

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		/* Tie this group to the core it was created on. */
		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
		group_impl->placement_id = spdk_env_get_current_core();
	}

	return &group_impl->base;
}
1190 
/*
 * Tag the socket with SO_MARK set to placement_id and record the
 * placement_id -> group association in the global map.  Failures are
 * logged but never fatal; sock->placement_id is updated only when both
 * steps succeed.  No-op when SO_MARK is unavailable on this platform.
 */
static void
posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock,
		int placement_id)
{
#if defined(SO_MARK)
	int rc;

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_MARK,
			&placement_id, sizeof(placement_id));
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Error setting SO_MARK\n");
		return;
	}

	rc = spdk_sock_map_insert(&g_map, placement_id, &group->base);
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
		return;
	}

	/* Remember the id so later lookups/releases can use it. */
	sock->placement_id = placement_id;
#endif
}
1216 
1217 static void
1218 posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1219 {
1220 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1221 
1222 	if (group->placement_id == -1) {
1223 		group->placement_id = spdk_sock_map_find_free(&g_map);
1224 
1225 		/* If a free placement id is found, update existing sockets in this group */
1226 		if (group->placement_id != -1) {
1227 			struct spdk_sock  *sock, *tmp;
1228 
1229 			TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
1230 				posix_sock_mark(group, __posix_sock(sock), group->placement_id);
1231 			}
1232 		}
1233 	}
1234 
1235 	if (group->placement_id != -1) {
1236 		/*
1237 		 * group placement id is already determined for this poll group.
1238 		 * Mark socket with group's placement id.
1239 		 */
1240 		posix_sock_mark(group, __posix_sock(_sock), group->placement_id);
1241 	}
1242 }
1243 
1244 static int
1245 posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1246 {
1247 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1248 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1249 	int rc;
1250 
1251 #if defined(SPDK_EPOLL)
1252 	struct epoll_event event;
1253 
1254 	memset(&event, 0, sizeof(event));
1255 	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
1256 	event.events = EPOLLIN | EPOLLERR;
1257 	event.data.ptr = sock;
1258 
1259 	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
1260 #elif defined(SPDK_KEVENT)
1261 	struct kevent event;
1262 	struct timespec ts = {0};
1263 
1264 	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);
1265 
1266 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1267 #endif
1268 
1269 	if (rc != 0) {
1270 		return rc;
1271 	}
1272 
1273 	/* switched from another polling group due to scheduling */
1274 	if (spdk_unlikely(sock->recv_pipe != NULL  &&
1275 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1276 		sock->pipe_has_data = true;
1277 		sock->socket_has_data = false;
1278 		TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link);
1279 	}
1280 
1281 	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
1282 		posix_sock_update_mark(_group, _sock);
1283 	} else if (sock->placement_id != -1) {
1284 		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
1285 		if (rc != 0) {
1286 			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
1287 			/* Do not treat this as an error. The system will continue running. */
1288 		}
1289 	}
1290 
1291 	return rc;
1292 }
1293 
/*
 * Remove a socket from the poll group: drop it from the ready list,
 * release its placement-map reference, deregister it from epoll/kqueue,
 * and abort any outstanding requests.  Returns the result of the kernel
 * deregistration call.
 */
static int
posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	/* Unlink from the ready list first so later polls cannot see a
	 * socket that is no longer part of this group. */
	if (sock->pipe_has_data || sock->socket_has_data) {
		TAILQ_REMOVE(&group->socks_with_data, sock, link);
		sock->pipe_has_data = false;
		sock->socket_has_data = false;
	}

	/* Drop this socket's reference on its placement-id map entry. */
	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

#if defined(SPDK_EPOLL)
	struct epoll_event event;

	/* Event parameter is ignored but some old kernel version still require it. */
	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
#elif defined(SPDK_KEVENT)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
	/* kevent() can succeed overall while reporting a per-event error via
	 * EV_ERROR; surface that as a failure with errno from event.data. */
	if (rc == 0 && event.flags & EV_ERROR) {
		rc = -1;
		errno = event.data;
	}
#endif

	/* Fail any queued requests now that the socket is out of the group. */
	spdk_sock_abort_requests(_sock);

	return rc;
}
1333 
/*
 * Poll the group: flush pending writes on every socket, harvest
 * epoll/kqueue readiness events into the socks_with_data list, and fill
 * the caller's socks array (up to max_events) from that list.  Returns
 * the number of sockets written to socks, or -1 on epoll/kqueue failure.
 */
static int
posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_sock *sock, *tmp;
	int num_events, i, rc;
	struct spdk_posix_sock *psock, *ptmp;
#if defined(SPDK_EPOLL)
	struct epoll_event events[MAX_EVENTS_PER_POLL];
#elif defined(SPDK_KEVENT)
	struct kevent events[MAX_EVENTS_PER_POLL];
	struct timespec ts = {0};
#endif

#ifdef SPDK_ZEROCOPY
	/* When all of the following conditions are met
	 * - non-blocking socket
	 * - zero copy is enabled
	 * - interrupts suppressed (i.e. busy polling)
	 * - the NIC tx queue is full at the time sendmsg() is called
	 * - epoll_wait determines there is an EPOLLIN event for the socket
	 * then we can get into a situation where data we've sent is queued
	 * up in the kernel network stack, but interrupts have been suppressed
	 * because other traffic is flowing so the kernel misses the signal
	 * to flush the software tx queue. If there wasn't incoming data
	 * pending on the socket, then epoll_wait would have been sufficient
	 * to kick off the send operation, but since there is a pending event
	 * epoll_wait does not trigger the necessary operation.
	 *
	 * We deal with this by checking for all of the above conditions and
	 * additionally looking for EPOLLIN events that were not consumed from
	 * the last poll loop. We take this to mean that the upper layer is
	 * unable to consume them because it is blocked waiting for resources
	 * to free up, and those resources are most likely freed in response
	 * to a pending asynchronous write completing.
	 *
	 * Additionally, sockets that have the same placement_id actually share
	 * an underlying hardware queue. That means polling one of them is
	 * equivalent to polling all of them. As a quick mechanism to avoid
	 * making extra poll() calls, stash the last placement_id during the loop
	 * and only poll if it's not the same. The overwhelmingly common case
	 * is that all sockets in this list have the same placement_id because
	 * SPDK is intentionally grouping sockets by that value, so even
	 * though this won't stop all extra calls to poll(), it's very fast
	 * and will catch all of them in practice.
	 */
	int last_placement_id = -1;

	TAILQ_FOREACH(psock, &group->socks_with_data, link) {
		if (psock->zcopy && psock->placement_id >= 0 &&
		    psock->placement_id != last_placement_id) {
			/* Zero-timeout poll(): only nudges the kernel, result ignored. */
			struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0};

			poll(&pfd, 1, 0);
			last_placement_id = psock->placement_id;
		}
	}
#endif

	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
	 * a completion callback could remove the sock from the
	 * group. */
	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
		rc = _sock_flush(sock);
		if (rc) {
			/* Flush failed; fail all queued requests on this sock. */
			spdk_sock_abort_requests(sock);
		}
	}

	assert(max_events > 0);

	/* Non-blocking harvest of kernel readiness events (timeout 0). */
#if defined(SPDK_EPOLL)
	num_events = epoll_wait(group->fd, events, max_events, 0);
#elif defined(SPDK_KEVENT)
	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
#endif

	if (num_events == -1) {
		return -1;
	} else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
		sock = TAILQ_FIRST(&_group->socks);
		psock = __posix_sock(sock);
		/* poll() is called here to busy poll the queue associated with
		 * first socket in list and potentially reap incoming data.
		 */
		if (sock->opts.priority) {
			struct pollfd pfd = {0, 0, 0};

			pfd.fd = psock->fd;
			pfd.events = POLLIN | POLLERR;
			poll(&pfd, 1, 0);
		}
	}

	/* Fold the kernel events into the group's socks_with_data list. */
	for (i = 0; i < num_events; i++) {
#if defined(SPDK_EPOLL)
		sock = events[i].data.ptr;
		psock = __posix_sock(sock);

#ifdef SPDK_ZEROCOPY
		if (events[i].events & EPOLLERR) {
			rc = _sock_check_zcopy(sock);
			/* If the socket was closed or removed from
			 * the group in response to a send ack, don't
			 * add it to the array here. */
			if (rc || sock->cb_fn == NULL) {
				continue;
			}
		}
#endif
		if ((events[i].events & EPOLLIN) == 0) {
			continue;
		}

#elif defined(SPDK_KEVENT)
		sock = events[i].udata;
		psock = __posix_sock(sock);
#endif

		/* If the socket is not already in the list, add it now */
		if (!psock->socket_has_data && !psock->pipe_has_data) {
			TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link);
		}

		psock->socket_has_data = true;
	}

	/* Second pass: hand out up to max_events sockets from the ready list. */
	num_events = 0;

	TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) {
		if (num_events == max_events) {
			break;
		}

		/* If the socket's cb_fn is NULL, just remove it from the
		 * list and do not add it to socks array */
		if (spdk_unlikely(psock->base.cb_fn == NULL)) {
			psock->socket_has_data = false;
			psock->pipe_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, psock, link);
			continue;
		}

		socks[num_events++] = &psock->base;
	}

	/* Cycle the has_data list so that each time we poll things aren't
	 * in the same order. Say we have 6 sockets in the list, named as follows:
	 * A B C D E F
	 * And all 6 sockets had epoll events, but max_events is only 3. That means
	 * psock currently points at D. We want to rearrange the list to the following:
	 * D E F A B C
	 *
	 * The variables below are named according to this example to make it easier to
	 * follow the swaps.
	 *
	 * NOTE: this manipulates the TAILQ's internal pointers directly; keep the
	 * statement order exactly as written.
	 */
	if (psock != NULL) {
		struct spdk_posix_sock *pa, *pc, *pd, *pf;

		/* Capture pointers to the elements we need */
		pd = psock;
		pc = TAILQ_PREV(pd, spdk_has_data_list, link);
		pa = TAILQ_FIRST(&group->socks_with_data);
		pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list);

		/* Break the link between C and D */
		pc->link.tqe_next = NULL;

		/* Connect F to A */
		pf->link.tqe_next = pa;
		pa->link.tqe_prev = &pf->link.tqe_next;

		/* Fix up the list first/last pointers */
		group->socks_with_data.tqh_first = pd;
		group->socks_with_data.tqh_last = &pc->link.tqe_next;

		/* D is in front of the list, make tqe prev pointer point to the head of list */
		pd->link.tqe_prev = &group->socks_with_data.tqh_first;
	}

	return num_events;
}
1517 
1518 static int
1519 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1520 {
1521 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1522 	int rc;
1523 
1524 	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1525 		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
1526 	}
1527 
1528 	rc = close(group->fd);
1529 	free(group);
1530 	return rc;
1531 }
1532 
1533 static int
1534 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
1535 {
1536 	if (!opts || !len) {
1537 		errno = EINVAL;
1538 		return -1;
1539 	}
1540 	memset(opts, 0, *len);
1541 
1542 #define FIELD_OK(field) \
1543 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len
1544 
1545 #define GET_FIELD(field) \
1546 	if (FIELD_OK(field)) { \
1547 		opts->field = g_spdk_posix_sock_impl_opts.field; \
1548 	}
1549 
1550 	GET_FIELD(recv_buf_size);
1551 	GET_FIELD(send_buf_size);
1552 	GET_FIELD(enable_recv_pipe);
1553 	GET_FIELD(enable_zerocopy_send);
1554 	GET_FIELD(enable_quickack);
1555 	GET_FIELD(enable_placement_id);
1556 	GET_FIELD(enable_zerocopy_send_server);
1557 	GET_FIELD(enable_zerocopy_send_client);
1558 
1559 #undef GET_FIELD
1560 #undef FIELD_OK
1561 
1562 	*len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts));
1563 	return 0;
1564 }
1565 
1566 static int
1567 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
1568 {
1569 	if (!opts) {
1570 		errno = EINVAL;
1571 		return -1;
1572 	}
1573 
1574 #define FIELD_OK(field) \
1575 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len
1576 
1577 #define SET_FIELD(field) \
1578 	if (FIELD_OK(field)) { \
1579 		g_spdk_posix_sock_impl_opts.field = opts->field; \
1580 	}
1581 
1582 	SET_FIELD(recv_buf_size);
1583 	SET_FIELD(send_buf_size);
1584 	SET_FIELD(enable_recv_pipe);
1585 	SET_FIELD(enable_zerocopy_send);
1586 	SET_FIELD(enable_quickack);
1587 	SET_FIELD(enable_placement_id);
1588 	SET_FIELD(enable_zerocopy_send_server);
1589 	SET_FIELD(enable_zerocopy_send_client);
1590 
1591 #undef SET_FIELD
1592 #undef FIELD_OK
1593 
1594 	return 0;
1595 }
1596 
1597 
/* Function table binding the POSIX socket implementation to the abstract
 * spdk_sock transport interface. Each member points at the corresponding
 * posix_sock_* function defined above. */
static struct spdk_net_impl g_posix_net_impl = {
	.name		= "posix",
	.getaddr	= posix_sock_getaddr,
	.connect	= posix_sock_connect,
	.listen		= posix_sock_listen,
	.accept		= posix_sock_accept,
	.close		= posix_sock_close,
	.recv		= posix_sock_recv,
	.readv		= posix_sock_readv,
	.writev		= posix_sock_writev,
	.writev_async	= posix_sock_writev_async,
	.flush		= posix_sock_flush,
	.set_recvlowat	= posix_sock_set_recvlowat,
	.set_recvbuf	= posix_sock_set_recvbuf,
	.set_sendbuf	= posix_sock_set_sendbuf,
	.is_ipv6	= posix_sock_is_ipv6,
	.is_ipv4	= posix_sock_is_ipv4,
	.is_connected	= posix_sock_is_connected,
	.group_impl_get_optimal	= posix_sock_group_impl_get_optimal,
	.group_impl_create	= posix_sock_group_impl_create,
	.group_impl_add_sock	= posix_sock_group_impl_add_sock,
	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
	.group_impl_poll	= posix_sock_group_impl_poll,
	.group_impl_close	= posix_sock_group_impl_close,
	.get_opts	= posix_sock_impl_get_opts,
	.set_opts	= posix_sock_impl_set_opts,
};

/* Register this implementation with the sock layer at the default priority. */
SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);
1627