xref: /spdk/module/sock/posix/posix.c (revision 6569a0ea0630b45bb83582ac2893cda584375dc4)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #if defined(__linux__)
37 #include <sys/epoll.h>
38 #include <linux/errqueue.h>
39 #elif defined(__FreeBSD__)
40 #include <sys/event.h>
41 #endif
42 
43 #include "spdk/log.h"
44 #include "spdk/pipe.h"
45 #include "spdk/sock.h"
46 #include "spdk/util.h"
47 #include "spdk/likely.h"
48 #include "spdk_internal/sock.h"
49 
/* Scratch buffer used to strip the brackets from an IPv6 literal in spdk_posix_sock_create(). */
#define MAX_TMPBUF 1024
/* Buffer size for the decimal port string handed to getaddrinfo(). */
#define PORTNUMLEN 32
/* Minimum kernel receive/send buffer sizes requested for every socket. */
#define SO_RCVBUF_SIZE (2 * 1024 * 1024)
#define SO_SNDBUF_SIZE (2 * 1024 * 1024)
/* Maximum number of iovecs gathered into a single sendmsg() call by _sock_flush(). */
#define IOV_BATCH_SIZE 64

/* Zero-copy sends are compiled in only when the platform headers provide both
 * the SO_ZEROCOPY socket option and the MSG_ZEROCOPY send flag. */
#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif

/* POSIX implementation of an SPDK socket. */
struct spdk_posix_sock {
	/* Generic part; kept first so the __posix_sock() cast is valid. */
	struct spdk_sock	base;
	/* Underlying OS socket descriptor. */
	int			fd;

	/* Number of successful sendmsg() calls; matched against MSG_ZEROCOPY completions. */
	uint32_t		sendmsg_idx;
	/* True when sends on this socket use MSG_ZEROCOPY. */
	bool			zcopy;

	/* Optional user-space receive buffer; NULL when reads go straight to the kernel. */
	struct spdk_pipe	*recv_pipe;
	/* Memory backing recv_pipe. */
	void			*recv_buf;
	/* Requested capacity of recv_pipe, in bytes. */
	int			recv_buf_sz;
	/* True while this socket is linked on its group's pending_recv list. */
	bool			pending_recv;

	/* Linkage for spdk_posix_sock_group_impl.pending_recv. */
	TAILQ_ENTRY(spdk_posix_sock)	link;
};

/* POSIX implementation of a socket polling group. */
struct spdk_posix_sock_group_impl {
	/* Generic part; kept first so the __posix_group_impl() cast is valid. */
	struct spdk_sock_group_impl	base;
	/* epoll descriptor on Linux, kqueue descriptor on FreeBSD. */
	int				fd;
	/* Sockets with readable data to report from the next poll (level-triggered). */
	TAILQ_HEAD(, spdk_posix_sock)	pending_recv;
};
80 
/*
 * Format the IP address held in an AF_INET or AF_INET6 socket address as text.
 *
 * sa:   socket address to format
 * host: destination buffer
 * hlen: size of host in bytes
 *
 * Returns 0 on success. Returns -1 if sa or host is NULL, if the family is
 * neither IPv4 nor IPv6, or if inet_ntop() fails (e.g. host is too small).
 */
static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const void *addr;
	int family;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	family = sa->sa_family;
	if (family == AF_INET) {
		addr = &((struct sockaddr_in *)sa)->sin_addr;
	} else if (family == AF_INET6) {
		addr = &((struct sockaddr_in6 *)sa)->sin6_addr;
	} else {
		/* Only IPv4 and IPv6 addresses have a printable form here. */
		return -1;
	}

	return (inet_ntop(family, addr, host, hlen) == NULL) ? -1 : 0;
}
108 }
109 
/*
 * Downcast the generic socket / group handles to this module's types. The
 * casts are valid because `base` is the first member of both structs.
 * The argument and the whole expansion are parenthesized so the macros are
 * safe inside larger expressions, e.g. __posix_sock(s)->fd.
 */
#define __posix_sock(sock) ((struct spdk_posix_sock *)(sock))
#define __posix_group_impl(group) ((struct spdk_posix_sock_group_impl *)(group))
112 
113 static int
114 spdk_posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
115 			char *caddr, int clen, uint16_t *cport)
116 {
117 	struct spdk_posix_sock *sock = __posix_sock(_sock);
118 	struct sockaddr_storage sa;
119 	socklen_t salen;
120 	int rc;
121 
122 	assert(sock != NULL);
123 
124 	memset(&sa, 0, sizeof sa);
125 	salen = sizeof sa;
126 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
127 	if (rc != 0) {
128 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
129 		return -1;
130 	}
131 
132 	switch (sa.ss_family) {
133 	case AF_UNIX:
134 		/* Acceptable connection types that don't have IPs */
135 		return 0;
136 	case AF_INET:
137 	case AF_INET6:
138 		/* Code below will get IP addresses */
139 		break;
140 	default:
141 		/* Unsupported socket family */
142 		return -1;
143 	}
144 
145 	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
146 	if (rc != 0) {
147 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
148 		return -1;
149 	}
150 
151 	if (sport) {
152 		if (sa.ss_family == AF_INET) {
153 			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
154 		} else if (sa.ss_family == AF_INET6) {
155 			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
156 		}
157 	}
158 
159 	memset(&sa, 0, sizeof sa);
160 	salen = sizeof sa;
161 	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
162 	if (rc != 0) {
163 		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
164 		return -1;
165 	}
166 
167 	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
168 	if (rc != 0) {
169 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
170 		return -1;
171 	}
172 
173 	if (cport) {
174 		if (sa.ss_family == AF_INET) {
175 			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
176 		} else if (sa.ss_family == AF_INET6) {
177 			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
178 		}
179 	}
180 
181 	return 0;
182 }
183 
/* What spdk_posix_sock_create() should do with the socket it opens. */
enum spdk_posix_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN = 0,	/* bind() and listen() on the address */
	SPDK_SOCK_CREATE_CONNECT = 1,	/* connect() to the address */
};
188 
189 static int
190 spdk_posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
191 {
192 	uint8_t *new_buf;
193 	struct spdk_pipe *new_pipe;
194 	struct iovec siov[2];
195 	struct iovec diov[2];
196 	int sbytes;
197 	ssize_t bytes;
198 
199 	if (sock->recv_buf_sz == sz) {
200 		return 0;
201 	}
202 
203 	/* If the new size is 0, just free the pipe */
204 	if (sz == 0) {
205 		spdk_pipe_destroy(sock->recv_pipe);
206 		free(sock->recv_buf);
207 		sock->recv_pipe = NULL;
208 		sock->recv_buf = NULL;
209 		return 0;
210 	}
211 
212 	/* Round up to next 64 byte multiple */
213 	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
214 	if (!new_buf) {
215 		SPDK_ERRLOG("socket recv buf allocation failed\n");
216 		return -ENOMEM;
217 	}
218 
219 	new_pipe = spdk_pipe_create(new_buf, sz + 1);
220 	if (new_pipe == NULL) {
221 		SPDK_ERRLOG("socket pipe allocation failed\n");
222 		free(new_buf);
223 		return -ENOMEM;
224 	}
225 
226 	if (sock->recv_pipe != NULL) {
227 		/* Pull all of the data out of the old pipe */
228 		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
229 		if (sbytes > sz) {
230 			/* Too much data to fit into the new pipe size */
231 			spdk_pipe_destroy(new_pipe);
232 			free(new_buf);
233 			return -EINVAL;
234 		}
235 
236 		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
237 		assert(sbytes == sz);
238 
239 		bytes = spdk_iovcpy(siov, 2, diov, 2);
240 		spdk_pipe_writer_advance(new_pipe, bytes);
241 
242 		spdk_pipe_destroy(sock->recv_pipe);
243 		free(sock->recv_buf);
244 	}
245 
246 	sock->recv_buf_sz = sz;
247 	sock->recv_buf = new_buf;
248 	sock->recv_pipe = new_pipe;
249 
250 	return 0;
251 }
252 
253 static int
254 spdk_posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
255 {
256 	struct spdk_posix_sock *sock = __posix_sock(_sock);
257 	int rc;
258 
259 	assert(sock != NULL);
260 
261 #ifndef __aarch64__
262 	/* On ARM systems, this buffering does not help. Skip it. */
263 	rc = spdk_posix_sock_alloc_pipe(sock, sz);
264 	if (rc) {
265 		return rc;
266 	}
267 #endif
268 
269 	/* Set kernel buffer size to be at least SO_RCVBUF_SIZE */
270 	if (sz < SO_RCVBUF_SIZE) {
271 		sz = SO_RCVBUF_SIZE;
272 	}
273 
274 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
275 	if (rc < 0) {
276 		return rc;
277 	}
278 
279 	return 0;
280 }
281 
282 static int
283 spdk_posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
284 {
285 	struct spdk_posix_sock *sock = __posix_sock(_sock);
286 	int rc;
287 
288 	assert(sock != NULL);
289 
290 	if (sz < SO_SNDBUF_SIZE) {
291 		sz = SO_SNDBUF_SIZE;
292 	}
293 
294 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
295 	if (rc < 0) {
296 		return rc;
297 	}
298 
299 	return 0;
300 }
301 
302 static struct spdk_posix_sock *
303 _spdk_posix_sock_alloc(int fd)
304 {
305 	struct spdk_posix_sock *sock;
306 #ifdef SPDK_ZEROCOPY
307 	int rc;
308 	int flag;
309 #endif
310 
311 	sock = calloc(1, sizeof(*sock));
312 	if (sock == NULL) {
313 		SPDK_ERRLOG("sock allocation failed\n");
314 		return NULL;
315 	}
316 
317 	sock->fd = fd;
318 
319 #ifdef SPDK_ZEROCOPY
320 	/* Try to turn on zero copy sends */
321 	flag = 1;
322 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
323 	if (rc == 0) {
324 		sock->zcopy = true;
325 	}
326 #endif
327 
328 	return sock;
329 }
330 
/*
 * Resolve ip:port and create a TCP socket that either listens on it
 * (SPDK_SOCK_CREATE_LISTEN) or is connected to it (SPDK_SOCK_CREATE_CONNECT).
 *
 * ip must be a numeric address (AI_NUMERICHOST). An IPv6 literal may be
 * wrapped in brackets, e.g. "[::1]". The addresses returned by getaddrinfo()
 * are tried in order until one works. The resulting socket is set to
 * non-blocking mode. Zero-copy is disabled for connecting sockets.
 *
 * Returns the new socket, or NULL on failure.
 */
static struct spdk_sock *
spdk_posix_sock_create(const char *ip, int port, enum spdk_posix_sock_create_type type)
{
	struct spdk_posix_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc, sz;
	int saved_errno;

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		/* Strip the brackets from an IPv6 literal such as "[::1]". */
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		/* getaddrinfo() reports failure through its EAI_* return value,
		 * not through errno. */
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* Try each resolved address in turn. Every path that closes fd resets it
	 * to -1, so a closed descriptor is never taken as success after the loop. */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		/* Buffer sizes are best-effort; failure here is not fatal. */
		sz = SO_RCVBUF_SIZE;
		(void)setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));

		sz = SO_SNDBUF_SIZE;
		(void)setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			continue;
		}

		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				/* Capture errno before logging, which may overwrite it. */
				saved_errno = errno;
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, saved_errno);
				switch (saved_errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (flag < 0 || fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	sock = _spdk_posix_sock_alloc(fd);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	/* Disable zero copy for client sockets until support is added */
	if (type == SPDK_SOCK_CREATE_CONNECT) {
		sock->zcopy = false;
	}

	return &sock->base;
}
482 
483 static struct spdk_sock *
484 spdk_posix_sock_listen(const char *ip, int port)
485 {
486 	return spdk_posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN);
487 }
488 
489 static struct spdk_sock *
490 spdk_posix_sock_connect(const char *ip, int port)
491 {
492 	return spdk_posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT);
493 }
494 
495 static struct spdk_sock *
496 spdk_posix_sock_accept(struct spdk_sock *_sock)
497 {
498 	struct spdk_posix_sock		*sock = __posix_sock(_sock);
499 	struct sockaddr_storage		sa;
500 	socklen_t			salen;
501 	int				rc, fd;
502 	struct spdk_posix_sock		*new_sock;
503 	int				flag;
504 
505 	memset(&sa, 0, sizeof(sa));
506 	salen = sizeof(sa);
507 
508 	assert(sock != NULL);
509 
510 	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
511 
512 	if (rc == -1) {
513 		return NULL;
514 	}
515 
516 	fd = rc;
517 
518 	flag = fcntl(fd, F_GETFL);
519 	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
520 		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
521 		close(fd);
522 		return NULL;
523 	}
524 
525 	new_sock = _spdk_posix_sock_alloc(fd);
526 	if (new_sock == NULL) {
527 		close(fd);
528 		return NULL;
529 	}
530 
531 	return &new_sock->base;
532 }
533 
534 static int
535 spdk_posix_sock_close(struct spdk_sock *_sock)
536 {
537 	struct spdk_posix_sock *sock = __posix_sock(_sock);
538 
539 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
540 
541 	/* If the socket fails to close, the best choice is to
542 	 * leak the fd but continue to free the rest of the sock
543 	 * memory. */
544 	close(sock->fd);
545 
546 	spdk_pipe_destroy(sock->recv_pipe);
547 	free(sock->recv_buf);
548 	free(sock);
549 
550 	return 0;
551 }
552 
#ifdef SPDK_ZEROCOPY
/*
 * Drain MSG_ZEROCOPY completion notifications from the socket's error queue.
 * For each notification, complete the pending requests whose stored
 * sendmsg index (req->internal.offset) falls in the reported range.
 *
 * Each notification covers the inclusive range [ee_info, ee_data] of sendmsg
 * call indices. The 32-bit counter may wrap, so the range is walked with
 * unsigned wraparound until ee_data is reached.
 *
 * Returns 0 when the queue is drained or an unexpected message is seen, or
 * the negative value returned by spdk_sock_request_put().
 */
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	/* The union gives the control buffer the alignment CMSG_FIRSTHDR() expects. */
	union {
		struct cmsghdr align;
		uint8_t buf[CMSG_SPACE(sizeof(struct sock_extended_err))];
	} control;
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	while (true) {
		/* recvmsg() overwrites msg_controllen and msg_flags, so reset them
		 * before every call. */
		msgh.msg_control = control.buf;
		msgh.msg_controllen = sizeof(control.buf);
		msgh.msg_flags = 0;

		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		/* IPv4 sockets report on SOL_IP/IP_RECVERR, IPv6 sockets on
		 * SOL_IPV6/IPV6_RECVERR. */
		cm = CMSG_FIRSTHDR(&msgh);
		if (!(cm &&
		      ((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
		       (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR)))) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		idx = serr->ee_info;
		while (true) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (req->internal.offset == idx) {
					found = true;

					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}

				} else if (found) {
					break;
				}
			}

			/* Stop on the last index rather than testing idx <= ee_data, which
			 * never terminates when ee_data == UINT32_MAX and skips ranges that
			 * wrap around zero. */
			if (idx == serr->ee_data) {
				break;
			}
			idx++;
		}
	}

	return 0;
}
#endif
627 
/*
 * Try to send the socket's queued asynchronous write requests with a single
 * sendmsg() call.
 *
 * Up to IOV_BATCH_SIZE iovecs are gathered from sock->queued_reqs, skipping
 * the bytes of each request already sent (tracked in req->internal.offset).
 * After a successful sendmsg():
 *  - a request that was only partly sent keeps its progress in
 *    internal.offset and stays queued;
 *  - each fully-sent request is passed to spdk_sock_request_pend(). Without
 *    zero-copy it is then completed at once with spdk_sock_request_put().
 *    With zero-copy, internal.offset is reused to hold the sendmsg index, and
 *    _sock_check_zcopy() completes the request when the kernel notification
 *    arrives.
 *
 * Returns 0 on success, when nothing is queued, when called from inside a
 * callback, or when the socket would block. Otherwise returns the sendmsg()
 * result.
 * NOTE(review): if sendmsg() returned 0, errno is not refreshed, so the
 * EAGAIN test below reads a stale errno — confirm this cannot happen.
 */
static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc;
	unsigned int offset;
	size_t len;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = 0;
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Consume any offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
			iovcnt++;

			offset = 0;

			if (iovcnt >= IOV_BATCH_SIZE) {
				break;
			}
		}

		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		req = TAILQ_NEXT(req, internal.link);
	}

	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY;
	} else
#endif
	{
		flags = 0;
	}
	rc = sendmsg(psock->fd, &msg, flags);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	/* Counts successful sendmsg() calls; zero-copy completions refer to
	 * these indices. */
	psock->sendmsg_idx++;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		/* Presumably moves req from queued_reqs to pending_reqs — the
		 * TAILQ_FIRST() at the bottom of this loop relies on req leaving
		 * queued_reqs; confirm in the generic sock layer. */
		spdk_sock_request_pend(sock, req);

		if (!psock->zcopy) {
			/* The sendmsg syscall above isn't currently asynchronous,
			* so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		/* All bytes written by this sendmsg() have been accounted for. */
		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return 0;
}
756 
/* Net-impl flush entry point; all of the work is done by _sock_flush(). */
static int
spdk_posix_sock_flush(struct spdk_sock *_sock)
{
	int rc = _sock_flush(_sock);

	return rc;
}
762 
763 static ssize_t
764 spdk_posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
765 {
766 	struct iovec siov[2];
767 	int sbytes;
768 	ssize_t bytes;
769 	struct spdk_posix_sock_group_impl *group;
770 
771 	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
772 	if (sbytes < 0) {
773 		errno = EINVAL;
774 		return -1;
775 	} else if (sbytes == 0) {
776 		errno = EAGAIN;
777 		return -1;
778 	}
779 
780 	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
781 
782 	if (bytes == 0) {
783 		/* The only way this happens is if diov is 0 length */
784 		errno = EINVAL;
785 		return -1;
786 	}
787 
788 	spdk_pipe_reader_advance(sock->recv_pipe, bytes);
789 
790 	/* If we drained the pipe, take it off the level-triggered list */
791 	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
792 		group = __posix_group_impl(sock->base.group_impl);
793 		TAILQ_REMOVE(&group->pending_recv, sock, link);
794 		sock->pending_recv = false;
795 	}
796 
797 	return bytes;
798 }
799 
800 static inline ssize_t
801 _spdk_posix_sock_read(struct spdk_posix_sock *sock)
802 {
803 	struct iovec iov[2];
804 	int bytes;
805 	struct spdk_posix_sock_group_impl *group;
806 
807 	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
808 
809 	if (bytes > 0) {
810 		bytes = readv(sock->fd, iov, 2);
811 		if (bytes > 0) {
812 			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
813 			if (sock->base.group_impl) {
814 				group = __posix_group_impl(sock->base.group_impl);
815 				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
816 				sock->pending_recv = true;
817 			}
818 		}
819 	}
820 
821 	return bytes;
822 }
823 
824 static ssize_t
825 spdk_posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
826 {
827 	struct spdk_posix_sock *sock = __posix_sock(_sock);
828 	int rc, i;
829 	size_t len;
830 
831 	if (sock->recv_pipe == NULL) {
832 		return readv(sock->fd, iov, iovcnt);
833 	}
834 
835 	len = 0;
836 	for (i = 0; i < iovcnt; i++) {
837 		len += iov[i].iov_len;
838 	}
839 
840 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
841 		/* If the user is receiving a sufficiently large amount of data,
842 		 * receive directly to their buffers. */
843 		if (len >= 1024) {
844 			return readv(sock->fd, iov, iovcnt);
845 		}
846 
847 		/* Otherwise, do a big read into our pipe */
848 		rc = _spdk_posix_sock_read(sock);
849 		if (rc <= 0) {
850 			return rc;
851 		}
852 	}
853 
854 	return spdk_posix_sock_recv_from_pipe(sock, iov, iovcnt);
855 }
856 
/* Read into one contiguous buffer by delegating to spdk_posix_sock_readv(). */
static ssize_t
spdk_posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec single = {
		.iov_base = buf,
		.iov_len = len,
	};

	return spdk_posix_sock_readv(sock, &single, 1);
}
867 
868 static ssize_t
869 spdk_posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
870 {
871 	struct spdk_posix_sock *sock = __posix_sock(_sock);
872 	int rc;
873 
874 	/* In order to process a writev, we need to flush any asynchronous writes
875 	 * first. */
876 	rc = _sock_flush(_sock);
877 	if (rc < 0) {
878 		return rc;
879 	}
880 
881 	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
882 		/* We weren't able to flush all requests */
883 		errno = EAGAIN;
884 		return -1;
885 	}
886 
887 	return writev(sock->fd, iov, iovcnt);
888 }
889 
890 static void
891 spdk_posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
892 {
893 	int rc;
894 
895 	spdk_sock_request_queue(sock, req);
896 
897 	/* If there are a sufficient number queued, just flush them out immediately. */
898 	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
899 		rc = _sock_flush(sock);
900 		if (rc) {
901 			spdk_sock_abort_requests(sock);
902 		}
903 	}
904 }
905 
906 static int
907 spdk_posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
908 {
909 	struct spdk_posix_sock *sock = __posix_sock(_sock);
910 	int val;
911 	int rc;
912 
913 	assert(sock != NULL);
914 
915 	val = nbytes;
916 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
917 	if (rc != 0) {
918 		return -1;
919 	}
920 	return 0;
921 }
922 
923 static bool
924 spdk_posix_sock_is_ipv6(struct spdk_sock *_sock)
925 {
926 	struct spdk_posix_sock *sock = __posix_sock(_sock);
927 	struct sockaddr_storage sa;
928 	socklen_t salen;
929 	int rc;
930 
931 	assert(sock != NULL);
932 
933 	memset(&sa, 0, sizeof sa);
934 	salen = sizeof sa;
935 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
936 	if (rc != 0) {
937 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
938 		return false;
939 	}
940 
941 	return (sa.ss_family == AF_INET6);
942 }
943 
944 static bool
945 spdk_posix_sock_is_ipv4(struct spdk_sock *_sock)
946 {
947 	struct spdk_posix_sock *sock = __posix_sock(_sock);
948 	struct sockaddr_storage sa;
949 	socklen_t salen;
950 	int rc;
951 
952 	assert(sock != NULL);
953 
954 	memset(&sa, 0, sizeof sa);
955 	salen = sizeof sa;
956 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
957 	if (rc != 0) {
958 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
959 		return false;
960 	}
961 
962 	return (sa.ss_family == AF_INET);
963 }
964 
965 static bool
966 spdk_posix_sock_is_connected(struct spdk_sock *_sock)
967 {
968 	struct spdk_posix_sock *sock = __posix_sock(_sock);
969 	uint8_t byte;
970 	int rc;
971 
972 	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
973 	if (rc == 0) {
974 		return false;
975 	}
976 
977 	if (rc < 0) {
978 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
979 			return true;
980 		}
981 
982 		return false;
983 	}
984 
985 	return true;
986 }
987 
/*
 * Store the socket's SO_INCOMING_NAPI_ID in *placement_id when the platform
 * defines that option.
 *
 * Returns the getsockopt() result, or -1 when the option is unavailable.
 */
static int
spdk_posix_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
{
#if defined(SO_INCOMING_NAPI_ID)
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	socklen_t optlen = sizeof(int);
	int rc;

	rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &optlen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
	}

	return rc;
#else
	(void)_sock;
	(void)placement_id;

	return -1;
#endif
}
1005 
1006 static struct spdk_sock_group_impl *
1007 spdk_posix_sock_group_impl_create(void)
1008 {
1009 	struct spdk_posix_sock_group_impl *group_impl;
1010 	int fd;
1011 
1012 #if defined(__linux__)
1013 	fd = epoll_create1(0);
1014 #elif defined(__FreeBSD__)
1015 	fd = kqueue();
1016 #endif
1017 	if (fd == -1) {
1018 		return NULL;
1019 	}
1020 
1021 	group_impl = calloc(1, sizeof(*group_impl));
1022 	if (group_impl == NULL) {
1023 		SPDK_ERRLOG("group_impl allocation failed\n");
1024 		close(fd);
1025 		return NULL;
1026 	}
1027 
1028 	group_impl->fd = fd;
1029 	TAILQ_INIT(&group_impl->pending_recv);
1030 
1031 	return &group_impl->base;
1032 }
1033 
1034 static int
1035 spdk_posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1036 {
1037 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1038 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1039 	int rc;
1040 
1041 #if defined(__linux__)
1042 	struct epoll_event event;
1043 
1044 	memset(&event, 0, sizeof(event));
1045 	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
1046 	event.events = EPOLLIN | EPOLLERR;
1047 	event.data.ptr = sock;
1048 
1049 	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
1050 #elif defined(__FreeBSD__)
1051 	struct kevent event;
1052 	struct timespec ts = {0};
1053 
1054 	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);
1055 
1056 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1057 #endif
1058 
1059 	/* switched from another polling group due to scheduling */
1060 	if (spdk_unlikely(sock->recv_pipe != NULL  &&
1061 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1062 		assert(sock->pending_recv == false);
1063 		sock->pending_recv = true;
1064 		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1065 	}
1066 
1067 	return rc;
1068 }
1069 
1070 static int
1071 spdk_posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1072 {
1073 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1074 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1075 	int rc;
1076 
1077 	if (sock->recv_pipe != NULL) {
1078 		if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) {
1079 			TAILQ_REMOVE(&group->pending_recv, sock, link);
1080 			sock->pending_recv = false;
1081 		}
1082 		assert(sock->pending_recv == false);
1083 	}
1084 
1085 #if defined(__linux__)
1086 	struct epoll_event event;
1087 
1088 	/* Event parameter is ignored but some old kernel version still require it. */
1089 	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
1090 #elif defined(__FreeBSD__)
1091 	struct kevent event;
1092 	struct timespec ts = {0};
1093 
1094 	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
1095 
1096 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1097 	if (rc == 0 && event.flags & EV_ERROR) {
1098 		rc = -1;
1099 		errno = event.data;
1100 	}
1101 #endif
1102 
1103 	spdk_sock_abort_requests(_sock);
1104 
1105 	return rc;
1106 }
1107 
/*
 * Poll the group once and report the sockets that have data to read.
 *
 *  1. Flush queued asynchronous writes on every socket in the group,
 *     aborting a socket's requests if its flush fails.
 *  2. Collect ready events without blocking (zero timeout).
 *  3. Link every socket reported readable onto pending_recv (if not already).
 *  4. Copy up to max_events sockets from pending_recv into socks[]. Each
 *     reported socket is moved to the back of the list if its pipe still
 *     holds data, or unlinked otherwise.
 *
 * Returns the number of entries written to socks[], or -1 if
 * epoll_wait()/kevent() failed.
 */
static int
spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
				struct spdk_sock **socks)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_sock *sock, *tmp;
	int num_events, i, rc;
	struct spdk_posix_sock *psock, *ptmp;
	/* NOTE(review): events[] has MAX_EVENTS_PER_POLL slots but is filled with
	 * up to max_events entries; presumably callers never pass a larger
	 * max_events — confirm. */
#if defined(__linux__)
	struct epoll_event events[MAX_EVENTS_PER_POLL];
#elif defined(__FreeBSD__)
	struct kevent events[MAX_EVENTS_PER_POLL];
	struct timespec ts = {0};
#endif

	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
	 * a completion callback could remove the sock from the
	 * group. */
	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}

	/* Zero timeout: never blocks. */
#if defined(__linux__)
	num_events = epoll_wait(group->fd, events, max_events, 0);
#elif defined(__FreeBSD__)
	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
#endif

	if (num_events == -1) {
		return -1;
	}

	for (i = 0; i < num_events; i++) {
#if defined(__linux__)
		sock = events[i].data.ptr;
		psock = __posix_sock(sock);

#ifdef SPDK_ZEROCOPY
		/* Zero-copy completion notifications arrive on the error queue. */
		if (events[i].events & EPOLLERR) {
			rc = _sock_check_zcopy(sock);
			/* If the socket was closed or removed from
			 * the group in response to a send ack, don't
			 * add it to the array here. */
			if (rc || sock->cb_fn == NULL) {
				continue;
			}
		}
#endif
		if ((events[i].events & EPOLLIN) == 0) {
			continue;
		}

#elif defined(__FreeBSD__)
		sock = events[i].udata;
		psock = __posix_sock(sock);
#endif

		/* If the socket does not already have recv pending, add it now */
		if (!psock->pending_recv) {
			psock->pending_recv = true;
			TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
		}
	}

	/* num_events is reused as the count of sockets reported to the caller. */
	num_events = 0;

	TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) {
		if (num_events == max_events) {
			break;
		}

		socks[num_events++] = &psock->base;
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. */
	for (i = 0; i < num_events; i++) {
		psock = __posix_sock(socks[i]);

		TAILQ_REMOVE(&group->pending_recv, psock, link);

		/* Keep a socket listed only while its pipe still holds data;
		 * pipe-less sockets are dropped and rely on the next event. */
		if (psock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(psock->recv_pipe) == 0) {
			psock->pending_recv = false;
		} else {
			TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
		}

	}

	return num_events;
}
1202 
1203 static int
1204 spdk_posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1205 {
1206 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1207 	int rc;
1208 
1209 	rc = close(group->fd);
1210 	free(group);
1211 	return rc;
1212 }
1213 
1214 static struct spdk_net_impl g_posix_net_impl = {
1215 	.name		= "posix",
1216 	.getaddr	= spdk_posix_sock_getaddr,
1217 	.connect	= spdk_posix_sock_connect,
1218 	.listen		= spdk_posix_sock_listen,
1219 	.accept		= spdk_posix_sock_accept,
1220 	.close		= spdk_posix_sock_close,
1221 	.recv		= spdk_posix_sock_recv,
1222 	.readv		= spdk_posix_sock_readv,
1223 	.writev		= spdk_posix_sock_writev,
1224 	.writev_async	= spdk_posix_sock_writev_async,
1225 	.flush		= spdk_posix_sock_flush,
1226 	.set_recvlowat	= spdk_posix_sock_set_recvlowat,
1227 	.set_recvbuf	= spdk_posix_sock_set_recvbuf,
1228 	.set_sendbuf	= spdk_posix_sock_set_sendbuf,
1229 	.is_ipv6	= spdk_posix_sock_is_ipv6,
1230 	.is_ipv4	= spdk_posix_sock_is_ipv4,
1231 	.is_connected	= spdk_posix_sock_is_connected,
1232 	.get_placement_id	= spdk_posix_sock_get_placement_id,
1233 	.group_impl_create	= spdk_posix_sock_group_impl_create,
1234 	.group_impl_add_sock	= spdk_posix_sock_group_impl_add_sock,
1235 	.group_impl_remove_sock = spdk_posix_sock_group_impl_remove_sock,
1236 	.group_impl_poll	= spdk_posix_sock_group_impl_poll,
1237 	.group_impl_close	= spdk_posix_sock_group_impl_close,
1238 };
1239 
1240 SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);
1241