xref: /spdk/module/sock/posix/posix.c (revision 1f4f4cc75a522f897856e980a0b35d3c8fac24ed)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #if defined(__linux__)
37 #include <sys/epoll.h>
38 #include <linux/errqueue.h>
39 #elif defined(__FreeBSD__)
40 #include <sys/event.h>
41 #endif
42 
43 #include "spdk/log.h"
44 #include "spdk/pipe.h"
45 #include "spdk/sock.h"
46 #include "spdk/util.h"
47 #include "spdk/likely.h"
48 #include "spdk_internal/sock.h"
49 
/* Scratch buffer size used when stripping brackets from IPv6 literals. */
#define MAX_TMPBUF 1024
/* Maximum length of the string form of a TCP port number. */
#define PORTNUMLEN 32
/* Minimum kernel receive buffer size requested for every socket. */
#define SO_RCVBUF_SIZE (2 * 1024 * 1024)
/* Minimum kernel send buffer size requested for every socket. */
#define SO_SNDBUF_SIZE (2 * 1024 * 1024)
/* Maximum number of iovecs submitted in a single sendmsg() call. */
#define IOV_BATCH_SIZE 64

/* Zero-copy sends require both the setsockopt flag and the sendmsg flag
 * to exist in the system headers. */
#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif
59 
/* Posix implementation of the abstract spdk_sock. */
struct spdk_posix_sock {
	struct spdk_sock	base;		/* generic part; must be first so the cast macros work */
	int			fd;		/* underlying socket file descriptor */

	uint32_t		sendmsg_idx;	/* count of sendmsg() calls; matched against zcopy completions */
	bool			zcopy;		/* true when SO_ZEROCOPY was successfully enabled */

	struct spdk_pipe	*recv_pipe;	/* optional user-space receive buffering pipe */
	void			*recv_buf;	/* backing storage for recv_pipe */
	int			recv_buf_sz;	/* requested capacity of recv_pipe, in bytes */
	bool			pending_recv;	/* true while linked on the group's pending_recv list */

	TAILQ_ENTRY(spdk_posix_sock)	link;	/* linkage for the group's pending_recv list */
};
74 
/* Posix poll group, backed by epoll on Linux or kqueue on FreeBSD. */
struct spdk_posix_sock_group_impl {
	struct spdk_sock_group_impl	base;		/* generic part; must be first so the cast macro works */
	int				fd;		/* epoll/kqueue descriptor */
	TAILQ_HEAD(, spdk_posix_sock)	pending_recv;	/* sockets believed to have data ready to receive */
};
80 
/*
 * Convert the IPv4 or IPv6 address in sa to its text form in host (at most
 * hlen bytes). Returns 0 on success, -1 on NULL arguments, unsupported
 * address family, or inet_ntop() failure.
 */
static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const void *src;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	switch (sa->sa_family) {
	case AF_INET:
		src = &((struct sockaddr_in *)sa)->sin_addr;
		break;
	case AF_INET6:
		src = &((struct sockaddr_in6 *)sa)->sin6_addr;
		break;
	default:
		return -1;
	}

	return inet_ntop(sa->sa_family, src, host, hlen) == NULL ? -1 : 0;
}
109 
/* Downcast helpers from the generic sock/group types to the posix variants. */
#define __posix_sock(sock) (struct spdk_posix_sock *)sock
#define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group
112 
/*
 * Fill in the local (saddr/slen/sport) and peer (caddr/clen/cport) address
 * strings and ports for a connected socket. sport and cport may be NULL.
 * AF_UNIX sockets return 0 without filling anything in, since they have no
 * IP addresses. Returns 0 on success, -1 on failure.
 */
static int
spdk_posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
			char *caddr, int clen, uint16_t *cport)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	/* Reuse the same storage for the peer address. */
	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}
183 
/* Selects whether spdk_posix_sock_create() should bind+listen or connect. */
enum spdk_posix_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};
188 
/*
 * Create, resize, or destroy the socket's user-space receive pipe so it can
 * hold sz bytes. Buffered data is migrated to the new pipe; if it doesn't
 * fit, -EINVAL is returned and the old pipe is left intact. sz == 0 destroys
 * the pipe. Returns 0 on success or a negative errno value.
 */
static int
spdk_posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	/* Already the requested size; nothing to do. */
	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	/* sz + 1 bytes of storage are handed to the pipe for sz bytes of data. */
	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		/* Copy the old contents into the new pipe, then retire the old one. */
		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}
252 
253 static int
254 spdk_posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
255 {
256 	struct spdk_posix_sock *sock = __posix_sock(_sock);
257 	int rc;
258 
259 	assert(sock != NULL);
260 
261 #ifndef __aarch64__
262 	/* On ARM systems, this buffering does not help. Skip it. */
263 	rc = spdk_posix_sock_alloc_pipe(sock, sz);
264 	if (rc) {
265 		return rc;
266 	}
267 #endif
268 
269 	/* Set kernel buffer size to be at least SO_RCVBUF_SIZE */
270 	if (sz < SO_RCVBUF_SIZE) {
271 		sz = SO_RCVBUF_SIZE;
272 	}
273 
274 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
275 	if (rc < 0) {
276 		return rc;
277 	}
278 
279 	return 0;
280 }
281 
282 static int
283 spdk_posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
284 {
285 	struct spdk_posix_sock *sock = __posix_sock(_sock);
286 	int rc;
287 
288 	assert(sock != NULL);
289 
290 	if (sz < SO_SNDBUF_SIZE) {
291 		sz = SO_SNDBUF_SIZE;
292 	}
293 
294 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
295 	if (rc < 0) {
296 		return rc;
297 	}
298 
299 	return 0;
300 }
301 
302 static struct spdk_posix_sock *
303 _spdk_posix_sock_alloc(int fd)
304 {
305 	struct spdk_posix_sock *sock;
306 #ifdef SPDK_ZEROCOPY
307 	int rc;
308 	int flag;
309 #endif
310 
311 	sock = calloc(1, sizeof(*sock));
312 	if (sock == NULL) {
313 		SPDK_ERRLOG("sock allocation failed\n");
314 		return NULL;
315 	}
316 
317 	sock->fd = fd;
318 
319 #ifdef SPDK_ZEROCOPY
320 	/* Try to turn on zero copy sends */
321 	flag = 1;
322 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
323 	if (rc == 0) {
324 		sock->zcopy = true;
325 	}
326 #endif
327 
328 	return sock;
329 }
330 
/*
 * Create a TCP socket, either listening on or connected to ip/port depending
 * on type. ip may be a bracketed IPv6 literal ("[::1]"). Each resolved
 * address is tried in turn; buffer sizes, SO_REUSEADDR, TCP_NODELAY,
 * optional SO_PRIORITY, and O_NONBLOCK are applied to the socket.
 * Returns the new generic sock on success, NULL on failure.
 *
 * Fix: getaddrinfo() reports failures via its return value (decoded with
 * gai_strerror()), not errno, so the error log now prints rc instead of
 * the unrelated errno value.
 */
static struct spdk_sock *
spdk_posix_sock_create(const char *ip, int port,
		       enum spdk_posix_sock_create_type type,
		       struct spdk_sock_opts *opts)
{
	struct spdk_posix_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc, sz;

	if (ip == NULL) {
		return NULL;
	}
	/* Strip the brackets from an IPv6 literal such as "[::1]". */
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		/* getaddrinfo() errors come from its return value, not errno. */
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		sz = SO_RCVBUF_SIZE;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		sz = SO_SNDBUF_SIZE;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}
#endif

		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	sock = _spdk_posix_sock_alloc(fd);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	/* Disable zero copy for client sockets until support is added */
	if (type == SPDK_SOCK_CREATE_CONNECT) {
		sock->zcopy = false;
	}

	return &sock->base;
}
495 
/* Create a listening TCP socket bound to ip/port. */
static struct spdk_sock *
spdk_posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return spdk_posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}
501 
/* Create a TCP socket connected to ip/port. */
static struct spdk_sock *
spdk_posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return spdk_posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}
507 
508 static struct spdk_sock *
509 spdk_posix_sock_accept(struct spdk_sock *_sock)
510 {
511 	struct spdk_posix_sock		*sock = __posix_sock(_sock);
512 	struct sockaddr_storage		sa;
513 	socklen_t			salen;
514 	int				rc, fd;
515 	struct spdk_posix_sock		*new_sock;
516 	int				flag;
517 
518 	memset(&sa, 0, sizeof(sa));
519 	salen = sizeof(sa);
520 
521 	assert(sock != NULL);
522 
523 	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
524 
525 	if (rc == -1) {
526 		return NULL;
527 	}
528 
529 	fd = rc;
530 
531 	flag = fcntl(fd, F_GETFL);
532 	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
533 		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
534 		close(fd);
535 		return NULL;
536 	}
537 
538 #if defined(SO_PRIORITY)
539 	/* The priority is not inherited, so call this function again */
540 	if (sock->base.opts.priority) {
541 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
542 		if (rc != 0) {
543 			close(fd);
544 			return NULL;
545 		}
546 	}
547 #endif
548 
549 	new_sock = _spdk_posix_sock_alloc(fd);
550 	if (new_sock == NULL) {
551 		close(fd);
552 		return NULL;
553 	}
554 
555 	return &new_sock->base;
556 }
557 
558 static int
559 spdk_posix_sock_close(struct spdk_sock *_sock)
560 {
561 	struct spdk_posix_sock *sock = __posix_sock(_sock);
562 
563 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
564 
565 	/* If the socket fails to close, the best choice is to
566 	 * leak the fd but continue to free the rest of the sock
567 	 * memory. */
568 	close(sock->fd);
569 
570 	spdk_pipe_destroy(sock->recv_pipe);
571 	free(sock->recv_buf);
572 	free(sock);
573 
574 	return 0;
575 }
576 
#ifdef SPDK_ZEROCOPY
/*
 * Drain MSG_ZEROCOPY completion notifications from the socket's error queue
 * and complete the matching pending requests. Each notification carries an
 * inclusive range [ee_info, ee_data] of sendmsg call indices; pending
 * requests whose internal.offset (repurposed as the sendmsg index by
 * _sock_flush) falls in that range are completed with status 0.
 * Returns 0 when the queue is drained, or a negative value propagated from
 * spdk_sock_request_put().
 */
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	msgh.msg_control = buf;
	msgh.msg_controllen = sizeof(buf);

	while (true) {
		/* MSG_ERRQUEUE reads notifications without consuming socket data. */
		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				/* Error queue fully drained. */
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (req->internal.offset == idx) {
					found = true;

					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}

				} else if (found) {
					break;
				}
			}

		}
	}

	return 0;
}
#endif
651 
/*
 * Flush queued asynchronous write requests. Gathers up to IOV_BATCH_SIZE
 * iovecs from the front of queued_reqs and submits them in one sendmsg()
 * (with MSG_ZEROCOPY when enabled). Fully-written requests are moved to
 * the pending list; non-zcopy requests are then completed immediately,
 * while zcopy requests wait for _sock_check_zcopy(). Partially-written
 * requests record their progress in internal.offset.
 * Returns 0 on success or would-block, negative on sendmsg() failure.
 */
static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc;
	unsigned int offset;
	size_t len;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = 0;
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Consume any offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
			iovcnt++;

			offset = 0;

			if (iovcnt >= IOV_BATCH_SIZE) {
				break;
			}
		}

		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		req = TAILQ_NEXT(req, internal.link);
	}

	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY;
	} else
#endif
	{
		flags = 0;
	}
	rc = sendmsg(psock->fd, &msg, flags);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	/* Each sendmsg() gets a new index; zcopy completions reference it. */
	psock->sendmsg_idx++;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(sock, req);

		if (!psock->zcopy) {
			/* The sendmsg syscall above isn't currently asynchronous,
			* so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return 0;
}
780 
/* Public entry point: flush queued asynchronous writes on the socket. */
static int
spdk_posix_sock_flush(struct spdk_sock *_sock)
{
	return _sock_flush(_sock);
}
786 
787 static ssize_t
788 spdk_posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
789 {
790 	struct iovec siov[2];
791 	int sbytes;
792 	ssize_t bytes;
793 	struct spdk_posix_sock_group_impl *group;
794 
795 	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
796 	if (sbytes < 0) {
797 		errno = EINVAL;
798 		return -1;
799 	} else if (sbytes == 0) {
800 		errno = EAGAIN;
801 		return -1;
802 	}
803 
804 	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
805 
806 	if (bytes == 0) {
807 		/* The only way this happens is if diov is 0 length */
808 		errno = EINVAL;
809 		return -1;
810 	}
811 
812 	spdk_pipe_reader_advance(sock->recv_pipe, bytes);
813 
814 	/* If we drained the pipe, take it off the level-triggered list */
815 	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
816 		group = __posix_group_impl(sock->base.group_impl);
817 		TAILQ_REMOVE(&group->pending_recv, sock, link);
818 		sock->pending_recv = false;
819 	}
820 
821 	return bytes;
822 }
823 
824 static inline ssize_t
825 _spdk_posix_sock_read(struct spdk_posix_sock *sock)
826 {
827 	struct iovec iov[2];
828 	int bytes;
829 	struct spdk_posix_sock_group_impl *group;
830 
831 	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
832 
833 	if (bytes > 0) {
834 		bytes = readv(sock->fd, iov, 2);
835 		if (bytes > 0) {
836 			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
837 			if (sock->base.group_impl) {
838 				group = __posix_group_impl(sock->base.group_impl);
839 				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
840 				sock->pending_recv = true;
841 			}
842 		}
843 	}
844 
845 	return bytes;
846 }
847 
848 static ssize_t
849 spdk_posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
850 {
851 	struct spdk_posix_sock *sock = __posix_sock(_sock);
852 	int rc, i;
853 	size_t len;
854 
855 	if (sock->recv_pipe == NULL) {
856 		return readv(sock->fd, iov, iovcnt);
857 	}
858 
859 	len = 0;
860 	for (i = 0; i < iovcnt; i++) {
861 		len += iov[i].iov_len;
862 	}
863 
864 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
865 		/* If the user is receiving a sufficiently large amount of data,
866 		 * receive directly to their buffers. */
867 		if (len >= 1024) {
868 			return readv(sock->fd, iov, iovcnt);
869 		}
870 
871 		/* Otherwise, do a big read into our pipe */
872 		rc = _spdk_posix_sock_read(sock);
873 		if (rc <= 0) {
874 			return rc;
875 		}
876 	}
877 
878 	return spdk_posix_sock_recv_from_pipe(sock, iov, iovcnt);
879 }
880 
/* Receive up to len bytes into buf; thin single-iovec wrapper over readv. */
static ssize_t
spdk_posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov = { .iov_base = buf, .iov_len = len };

	return spdk_posix_sock_readv(sock, &iov, 1);
}
891 
892 static ssize_t
893 spdk_posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
894 {
895 	struct spdk_posix_sock *sock = __posix_sock(_sock);
896 	int rc;
897 
898 	/* In order to process a writev, we need to flush any asynchronous writes
899 	 * first. */
900 	rc = _sock_flush(_sock);
901 	if (rc < 0) {
902 		return rc;
903 	}
904 
905 	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
906 		/* We weren't able to flush all requests */
907 		errno = EAGAIN;
908 		return -1;
909 	}
910 
911 	return writev(sock->fd, iov, iovcnt);
912 }
913 
914 static void
915 spdk_posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
916 {
917 	int rc;
918 
919 	spdk_sock_request_queue(sock, req);
920 
921 	/* If there are a sufficient number queued, just flush them out immediately. */
922 	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
923 		rc = _sock_flush(sock);
924 		if (rc) {
925 			spdk_sock_abort_requests(sock);
926 		}
927 	}
928 }
929 
930 static int
931 spdk_posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
932 {
933 	struct spdk_posix_sock *sock = __posix_sock(_sock);
934 	int val;
935 	int rc;
936 
937 	assert(sock != NULL);
938 
939 	val = nbytes;
940 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
941 	if (rc != 0) {
942 		return -1;
943 	}
944 	return 0;
945 }
946 
947 static bool
948 spdk_posix_sock_is_ipv6(struct spdk_sock *_sock)
949 {
950 	struct spdk_posix_sock *sock = __posix_sock(_sock);
951 	struct sockaddr_storage sa;
952 	socklen_t salen;
953 	int rc;
954 
955 	assert(sock != NULL);
956 
957 	memset(&sa, 0, sizeof sa);
958 	salen = sizeof sa;
959 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
960 	if (rc != 0) {
961 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
962 		return false;
963 	}
964 
965 	return (sa.ss_family == AF_INET6);
966 }
967 
968 static bool
969 spdk_posix_sock_is_ipv4(struct spdk_sock *_sock)
970 {
971 	struct spdk_posix_sock *sock = __posix_sock(_sock);
972 	struct sockaddr_storage sa;
973 	socklen_t salen;
974 	int rc;
975 
976 	assert(sock != NULL);
977 
978 	memset(&sa, 0, sizeof sa);
979 	salen = sizeof sa;
980 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
981 	if (rc != 0) {
982 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
983 		return false;
984 	}
985 
986 	return (sa.ss_family == AF_INET);
987 }
988 
989 static bool
990 spdk_posix_sock_is_connected(struct spdk_sock *_sock)
991 {
992 	struct spdk_posix_sock *sock = __posix_sock(_sock);
993 	uint8_t byte;
994 	int rc;
995 
996 	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
997 	if (rc == 0) {
998 		return false;
999 	}
1000 
1001 	if (rc < 0) {
1002 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1003 			return true;
1004 		}
1005 
1006 		return false;
1007 	}
1008 
1009 	return true;
1010 }
1011 
/*
 * Retrieve the socket's placement identifier via SO_INCOMING_NAPI_ID where
 * the platform supports it. Returns 0 on success, -1 otherwise (including
 * when the option is unavailable at compile time).
 */
static int
spdk_posix_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
{
	int rc = -1;

#if defined(SO_INCOMING_NAPI_ID)
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	socklen_t len = sizeof(int);

	rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &len);
	if (rc != 0) {
		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
	}

#endif
	return rc;
}
1029 
1030 static struct spdk_sock_group_impl *
1031 spdk_posix_sock_group_impl_create(void)
1032 {
1033 	struct spdk_posix_sock_group_impl *group_impl;
1034 	int fd;
1035 
1036 #if defined(__linux__)
1037 	fd = epoll_create1(0);
1038 #elif defined(__FreeBSD__)
1039 	fd = kqueue();
1040 #endif
1041 	if (fd == -1) {
1042 		return NULL;
1043 	}
1044 
1045 	group_impl = calloc(1, sizeof(*group_impl));
1046 	if (group_impl == NULL) {
1047 		SPDK_ERRLOG("group_impl allocation failed\n");
1048 		close(fd);
1049 		return NULL;
1050 	}
1051 
1052 	group_impl->fd = fd;
1053 	TAILQ_INIT(&group_impl->pending_recv);
1054 
1055 	return &group_impl->base;
1056 }
1057 
1058 static int
1059 spdk_posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1060 {
1061 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1062 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1063 	int rc;
1064 
1065 #if defined(__linux__)
1066 	struct epoll_event event;
1067 
1068 	memset(&event, 0, sizeof(event));
1069 	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
1070 	event.events = EPOLLIN | EPOLLERR;
1071 	event.data.ptr = sock;
1072 
1073 	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
1074 #elif defined(__FreeBSD__)
1075 	struct kevent event;
1076 	struct timespec ts = {0};
1077 
1078 	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);
1079 
1080 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1081 #endif
1082 
1083 	/* switched from another polling group due to scheduling */
1084 	if (spdk_unlikely(sock->recv_pipe != NULL  &&
1085 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1086 		assert(sock->pending_recv == false);
1087 		sock->pending_recv = true;
1088 		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1089 	}
1090 
1091 	return rc;
1092 }
1093 
1094 static int
1095 spdk_posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1096 {
1097 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1098 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1099 	int rc;
1100 
1101 	if (sock->recv_pipe != NULL) {
1102 		if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) {
1103 			TAILQ_REMOVE(&group->pending_recv, sock, link);
1104 			sock->pending_recv = false;
1105 		}
1106 		assert(sock->pending_recv == false);
1107 	}
1108 
1109 #if defined(__linux__)
1110 	struct epoll_event event;
1111 
1112 	/* Event parameter is ignored but some old kernel version still require it. */
1113 	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
1114 #elif defined(__FreeBSD__)
1115 	struct kevent event;
1116 	struct timespec ts = {0};
1117 
1118 	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
1119 
1120 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1121 	if (rc == 0 && event.flags & EV_ERROR) {
1122 		rc = -1;
1123 		errno = event.data;
1124 	}
1125 #endif
1126 
1127 	spdk_sock_abort_requests(_sock);
1128 
1129 	return rc;
1130 }
1131 
/*
 * Poll the group once. Queued writes on every member socket are flushed
 * first, then new epoll/kqueue readiness events are folded into the group's
 * level-triggered pending_recv list, and finally up to max_events sockets
 * are copied into socks. The pending_recv list is cycled so successive
 * polls don't always return sockets in the same order.
 * Returns the number of sockets written to socks, or -1 on poll failure.
 */
static int
spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
				struct spdk_sock **socks)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_sock *sock, *tmp;
	int num_events, i, rc;
	struct spdk_posix_sock *psock, *ptmp;
#if defined(__linux__)
	struct epoll_event events[MAX_EVENTS_PER_POLL];
#elif defined(__FreeBSD__)
	struct kevent events[MAX_EVENTS_PER_POLL];
	struct timespec ts = {0};
#endif

	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
	 * a completion callback could remove the sock from the
	 * group. */
	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}

	/* Non-blocking poll of the event mechanism (zero timeout). */
#if defined(__linux__)
	num_events = epoll_wait(group->fd, events, max_events, 0);
#elif defined(__FreeBSD__)
	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
#endif

	if (num_events == -1) {
		return -1;
	}

	for (i = 0; i < num_events; i++) {
#if defined(__linux__)
		sock = events[i].data.ptr;
		psock = __posix_sock(sock);

#ifdef SPDK_ZEROCOPY
		if (events[i].events & EPOLLERR) {
			rc = _sock_check_zcopy(sock);
			/* If the socket was closed or removed from
			 * the group in response to a send ack, don't
			 * add it to the array here. */
			if (rc || sock->cb_fn == NULL) {
				continue;
			}
		}
#endif
		if ((events[i].events & EPOLLIN) == 0) {
			continue;
		}

#elif defined(__FreeBSD__)
		sock = events[i].udata;
		psock = __posix_sock(sock);
#endif

		/* If the socket does not already have recv pending, add it now */
		if (!psock->pending_recv) {
			psock->pending_recv = true;
			TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
		}
	}

	/* Hand back up to max_events sockets from the front of the list. */
	num_events = 0;

	TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) {
		if (num_events == max_events) {
			break;
		}

		socks[num_events++] = &psock->base;
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. */
	for (i = 0; i < num_events; i++) {
		psock = __posix_sock(socks[i]);

		TAILQ_REMOVE(&group->pending_recv, psock, link);

		if (psock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(psock->recv_pipe) == 0) {
			psock->pending_recv = false;
		} else {
			TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
		}

	}

	return num_events;
}
1226 
1227 static int
1228 spdk_posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1229 {
1230 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1231 	int rc;
1232 
1233 	rc = close(group->fd);
1234 	free(group);
1235 	return rc;
1236 }
1237 
/* Function table wiring the generic spdk_sock API to the posix
 * implementations above. */
static struct spdk_net_impl g_posix_net_impl = {
	.name		= "posix",
	.getaddr	= spdk_posix_sock_getaddr,
	.connect	= spdk_posix_sock_connect,
	.listen		= spdk_posix_sock_listen,
	.accept		= spdk_posix_sock_accept,
	.close		= spdk_posix_sock_close,
	.recv		= spdk_posix_sock_recv,
	.readv		= spdk_posix_sock_readv,
	.writev		= spdk_posix_sock_writev,
	.writev_async	= spdk_posix_sock_writev_async,
	.flush		= spdk_posix_sock_flush,
	.set_recvlowat	= spdk_posix_sock_set_recvlowat,
	.set_recvbuf	= spdk_posix_sock_set_recvbuf,
	.set_sendbuf	= spdk_posix_sock_set_sendbuf,
	.is_ipv6	= spdk_posix_sock_is_ipv6,
	.is_ipv4	= spdk_posix_sock_is_ipv4,
	.is_connected	= spdk_posix_sock_is_connected,
	.get_placement_id	= spdk_posix_sock_get_placement_id,
	.group_impl_create	= spdk_posix_sock_group_impl_create,
	.group_impl_add_sock	= spdk_posix_sock_group_impl_add_sock,
	.group_impl_remove_sock = spdk_posix_sock_group_impl_remove_sock,
	.group_impl_poll	= spdk_posix_sock_group_impl_poll,
	.group_impl_close	= spdk_posix_sock_group_impl_close,
};

/* Register this implementation with the sock layer under the name "posix". */
SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);
1265