xref: /spdk/module/sock/posix/posix.c (revision 78b696bca594269c6a5e0ac235e44e94fc69e4f8)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #if defined(__linux__)
37 #include <sys/epoll.h>
38 #include <linux/errqueue.h>
39 #elif defined(__FreeBSD__)
40 #include <sys/event.h>
41 #endif
42 
43 #include "spdk/log.h"
44 #include "spdk/pipe.h"
45 #include "spdk/sock.h"
46 #include "spdk/util.h"
47 #include "spdk_internal/sock.h"
48 
/* Tunables and fixed sizes used throughout the POSIX socket implementation. */
enum {
	/* Scratch buffer that holds an IPv6 literal with its '[' ']' stripped. */
	MAX_TMPBUF	= 1024,
	/* Room for a port number rendered as decimal text for getaddrinfo(). */
	PORTNUMLEN	= 32,
	/* Default / minimum kernel receive buffer requested via SO_RCVBUF (bytes). */
	SO_RCVBUF_SIZE	= 2 * 1024 * 1024,
	/* Default / minimum kernel send buffer requested via SO_SNDBUF (bytes). */
	SO_SNDBUF_SIZE	= 2 * 1024 * 1024,
	/* Maximum number of iovecs gathered into a single sendmsg() call. */
	IOV_BATCH_SIZE	= 64,
};

/* Zero-copy sends need both the socket option and the send flag. */
#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif
58 
/*
 * Per-socket state for the POSIX implementation.
 *
 * base must remain the first member: __posix_sock() converts a
 * struct spdk_sock pointer to this type with a plain cast.
 */
struct spdk_posix_sock {
	struct spdk_sock	base;
	int			fd;		/* underlying kernel socket descriptor */

	/* Number of successful sendmsg() calls made by _sock_flush(); with
	 * zero-copy enabled, requests are tagged with it and matched against
	 * the completion ranges in _sock_check_zcopy(). */
	uint32_t		sendmsg_idx;
	bool			zcopy;		/* SO_ZEROCOPY was enabled on fd */

	/* Optional user-space receive buffer; NULL when not in use. */
	struct spdk_pipe	*recv_pipe;
	void			*recv_buf;	/* backing storage for recv_pipe */
	int			recv_buf_sz;	/* usable capacity of recv_pipe in bytes */
	/* Intended to be true exactly while this socket is linked on its
	 * group's pending_recv list (through link below). */
	bool			pending_recv;

	TAILQ_ENTRY(spdk_posix_sock)	link;	/* entry in the group's pending_recv list */
};
73 
/*
 * Per-poll-group state for the POSIX implementation.
 *
 * base must remain the first member: __posix_group_impl() converts a
 * struct spdk_sock_group_impl pointer to this type with a plain cast.
 */
struct spdk_posix_sock_group_impl {
	struct spdk_sock_group_impl	base;
	int				fd;	/* epoll (Linux) or kqueue (FreeBSD) descriptor */
	/* Sockets to report as readable: either the kernel signalled data or
	 * bytes remain buffered in their recv_pipe. Rotated on every poll. */
	TAILQ_HEAD(, spdk_posix_sock)	pending_recv;
};
79 
/*
 * Render the IP address stored in sa as numeric text.
 *
 * sa   - socket address; only AF_INET and AF_INET6 are supported.
 * host - destination buffer for the NUL-terminated address string.
 * hlen - size of host in bytes.
 *
 * Returns 0 on success, or -1 if sa or host is NULL, the address family is
 * unsupported, or inet_ntop() fails (for example because host is too small).
 */
static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const void *addr;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	if (sa->sa_family == AF_INET) {
		addr = &((struct sockaddr_in *)sa)->sin_addr;
	} else if (sa->sa_family == AF_INET6) {
		addr = &((struct sockaddr_in6 *)sa)->sin6_addr;
	} else {
		return -1;
	}

	return (inet_ntop(sa->sa_family, addr, host, hlen) == NULL) ? -1 : 0;
}
108 
/*
 * Downcast helpers. Valid because struct spdk_posix_sock and
 * struct spdk_posix_sock_group_impl embed their generic counterpart as the
 * first member. Both the argument and the whole expansion are parenthesized
 * so that expressions such as __posix_sock(p)->fd or __posix_sock(p + 1)
 * parse as intended.
 * NOTE(review): identifiers starting with "__" are reserved for the
 * implementation; kept for compatibility with existing callers.
 */
#define __posix_sock(sock) ((struct spdk_posix_sock *)(sock))
#define __posix_group_impl(group) ((struct spdk_posix_sock_group_impl *)(group))
111 
112 static int
113 spdk_posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
114 			char *caddr, int clen, uint16_t *cport)
115 {
116 	struct spdk_posix_sock *sock = __posix_sock(_sock);
117 	struct sockaddr_storage sa;
118 	socklen_t salen;
119 	int rc;
120 
121 	assert(sock != NULL);
122 
123 	memset(&sa, 0, sizeof sa);
124 	salen = sizeof sa;
125 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
126 	if (rc != 0) {
127 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
128 		return -1;
129 	}
130 
131 	switch (sa.ss_family) {
132 	case AF_UNIX:
133 		/* Acceptable connection types that don't have IPs */
134 		return 0;
135 	case AF_INET:
136 	case AF_INET6:
137 		/* Code below will get IP addresses */
138 		break;
139 	default:
140 		/* Unsupported socket family */
141 		return -1;
142 	}
143 
144 	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
145 	if (rc != 0) {
146 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
147 		return -1;
148 	}
149 
150 	if (sport) {
151 		if (sa.ss_family == AF_INET) {
152 			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
153 		} else if (sa.ss_family == AF_INET6) {
154 			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
155 		}
156 	}
157 
158 	memset(&sa, 0, sizeof sa);
159 	salen = sizeof sa;
160 	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
161 	if (rc != 0) {
162 		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
163 		return -1;
164 	}
165 
166 	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
167 	if (rc != 0) {
168 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
169 		return -1;
170 	}
171 
172 	if (cport) {
173 		if (sa.ss_family == AF_INET) {
174 			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
175 		} else if (sa.ss_family == AF_INET6) {
176 			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
177 		}
178 	}
179 
180 	return 0;
181 }
182 
/* Selects what spdk_posix_sock_create() does with each candidate address. */
enum spdk_posix_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN = 0,	/* bind() and listen() */
	SPDK_SOCK_CREATE_CONNECT = 1,	/* connect() */
};
187 
188 static int
189 spdk_posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
190 {
191 	uint8_t *new_buf;
192 	struct spdk_pipe *new_pipe;
193 	struct iovec siov[2];
194 	struct iovec diov[2];
195 	int sbytes;
196 	ssize_t bytes;
197 
198 	if (sock->recv_buf_sz == sz) {
199 		return 0;
200 	}
201 
202 	/* If the new size is 0, just free the pipe */
203 	if (sz == 0) {
204 		spdk_pipe_destroy(sock->recv_pipe);
205 		free(sock->recv_buf);
206 		sock->recv_pipe = NULL;
207 		sock->recv_buf = NULL;
208 		return 0;
209 	}
210 
211 	/* Round up to next 64 byte multiple */
212 	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
213 	if (!new_buf) {
214 		SPDK_ERRLOG("socket recv buf allocation failed\n");
215 		return -ENOMEM;
216 	}
217 
218 	new_pipe = spdk_pipe_create(new_buf, sz + 1);
219 	if (new_pipe == NULL) {
220 		SPDK_ERRLOG("socket pipe allocation failed\n");
221 		free(new_buf);
222 		return -ENOMEM;
223 	}
224 
225 	if (sock->recv_pipe != NULL) {
226 		/* Pull all of the data out of the old pipe */
227 		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
228 		if (sbytes > sz) {
229 			/* Too much data to fit into the new pipe size */
230 			spdk_pipe_destroy(new_pipe);
231 			free(new_buf);
232 			return -EINVAL;
233 		}
234 
235 		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
236 		assert(sbytes == sz);
237 
238 		bytes = spdk_iovcpy(siov, 2, diov, 2);
239 		spdk_pipe_writer_advance(new_pipe, bytes);
240 
241 		spdk_pipe_destroy(sock->recv_pipe);
242 		free(sock->recv_buf);
243 	}
244 
245 	sock->recv_buf_sz = sz;
246 	sock->recv_buf = new_buf;
247 	sock->recv_pipe = new_pipe;
248 
249 	return 0;
250 }
251 
252 static int
253 spdk_posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
254 {
255 	struct spdk_posix_sock *sock = __posix_sock(_sock);
256 	int rc;
257 
258 	assert(sock != NULL);
259 
260 	if (sz < SO_RCVBUF_SIZE) {
261 		sz = SO_RCVBUF_SIZE;
262 	}
263 
264 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
265 	if (rc < 0) {
266 		return rc;
267 	}
268 
269 	return 0;
270 }
271 
272 static int
273 spdk_posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
274 {
275 	struct spdk_posix_sock *sock = __posix_sock(_sock);
276 	int rc;
277 
278 	assert(sock != NULL);
279 
280 	if (sz < SO_SNDBUF_SIZE) {
281 		sz = SO_SNDBUF_SIZE;
282 	}
283 
284 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
285 	if (rc < 0) {
286 		return rc;
287 	}
288 
289 	return 0;
290 }
291 
292 static struct spdk_posix_sock *
293 _spdk_posix_sock_alloc(int fd)
294 {
295 	struct spdk_posix_sock *sock;
296 	int rc;
297 #ifdef SPDK_ZEROCOPY
298 	int flag;
299 #endif
300 
301 	sock = calloc(1, sizeof(*sock));
302 	if (sock == NULL) {
303 		SPDK_ERRLOG("sock allocation failed\n");
304 		return NULL;
305 	}
306 
307 	sock->fd = fd;
308 
309 #ifndef __aarch64__
310 	/* On ARM systems, this buffering does not help. Skip it. */
311 	/* The size of the pipe is purely derived from benchmarks. It seems to work well. */
312 	rc = spdk_posix_sock_alloc_pipe(sock, 8192);
313 	if (rc) {
314 		SPDK_ERRLOG("unable to allocate sufficient recvbuf\n");
315 		free(sock);
316 		return NULL;
317 	}
318 #endif
319 
320 #ifdef SPDK_ZEROCOPY
321 	/* Try to turn on zero copy sends */
322 	flag = 1;
323 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
324 	if (rc == 0) {
325 		sock->zcopy = true;
326 	}
327 #endif
328 
329 	return sock;
330 }
331 
/*
 * Create a TCP socket that listens on, or is connected to, ip:port.
 *
 * ip must be a numeric IPv4 or IPv6 address. An IPv6 address may be wrapped
 * in square brackets ("[::1]"). Each address returned by getaddrinfo() is
 * tried in order:
 *   - LISTEN: bind() then listen().
 *   - CONNECT: a blocking connect().
 * The first candidate that succeeds is switched to non-blocking mode and
 * wrapped in a struct spdk_posix_sock. Zero-copy sends are disabled for
 * CONNECT sockets.
 *
 * Returns the new socket, or NULL on failure.
 */
static struct spdk_sock *
spdk_posix_sock_create(const char *ip, int port, enum spdk_posix_sock_create_type type)
{
	struct spdk_posix_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc, sz;

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		/* Strip the brackets from an "[ipv6]" literal. */
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		/* getaddrinfo() reports failures through its return value, not errno. */
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		/* Larger kernel buffers are best effort; failures are not fatal. */
		sz = SO_RCVBUF_SIZE;
		(void)setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));

		sz = SO_SNDBUF_SIZE;
		(void)setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));

		/* Every failure path below resets fd to -1 after closing it. Otherwise a
		 * failure on the last candidate would leave a closed descriptor number
		 * in fd, which would then be wrapped and returned as a valid socket. */
		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			continue;
		}

		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		/* F_GETFL returns -1 on failure; OR-ing O_NONBLOCK into that would
		 * request every flag bit. */
		flag = fcntl(fd, F_GETFL);
		if (flag < 0 || fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	sock = _spdk_posix_sock_alloc(fd);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	/* Disable zero copy for client sockets until support is added */
	if (type == SPDK_SOCK_CREATE_CONNECT) {
		sock->zcopy = false;
	}

	return &sock->base;
}
483 
484 static struct spdk_sock *
485 spdk_posix_sock_listen(const char *ip, int port)
486 {
487 	return spdk_posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN);
488 }
489 
490 static struct spdk_sock *
491 spdk_posix_sock_connect(const char *ip, int port)
492 {
493 	return spdk_posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT);
494 }
495 
496 static struct spdk_sock *
497 spdk_posix_sock_accept(struct spdk_sock *_sock)
498 {
499 	struct spdk_posix_sock		*sock = __posix_sock(_sock);
500 	struct sockaddr_storage		sa;
501 	socklen_t			salen;
502 	int				rc, fd;
503 	struct spdk_posix_sock		*new_sock;
504 	int				flag;
505 
506 	memset(&sa, 0, sizeof(sa));
507 	salen = sizeof(sa);
508 
509 	assert(sock != NULL);
510 
511 	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
512 
513 	if (rc == -1) {
514 		return NULL;
515 	}
516 
517 	fd = rc;
518 
519 	flag = fcntl(fd, F_GETFL);
520 	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
521 		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
522 		close(fd);
523 		return NULL;
524 	}
525 
526 	new_sock = _spdk_posix_sock_alloc(fd);
527 	if (new_sock == NULL) {
528 		close(fd);
529 		return NULL;
530 	}
531 
532 	return &new_sock->base;
533 }
534 
535 static int
536 spdk_posix_sock_close(struct spdk_sock *_sock)
537 {
538 	struct spdk_posix_sock *sock = __posix_sock(_sock);
539 
540 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
541 
542 	/* If the socket fails to close, the best choice is to
543 	 * leak the fd but continue to free the rest of the sock
544 	 * memory. */
545 	close(sock->fd);
546 
547 	spdk_pipe_destroy(sock->recv_pipe);
548 	free(sock->recv_buf);
549 	free(sock);
550 
551 	return 0;
552 }
553 
#ifdef SPDK_ZEROCOPY
/*
 * Drain MSG_ZEROCOPY completion notifications from the socket error queue and
 * complete the pending requests they cover.
 *
 * Each notification carries an inclusive range [ee_info, ee_data] of sendmsg
 * call indices. _sock_flush() stores that index in req->internal.offset once a
 * zero-copy request has been fully handed to the kernel.
 *
 * Returns 0 once the error queue is empty or an unexpected message is seen.
 * Otherwise it returns the negative result of spdk_sock_request_put(),
 * presumably because the socket was closed from a completion callback; the
 * socket must not be touched after that.
 */
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	/* Sized with CMSG_SPACE() (which includes alignment padding) and overlaid
	 * with a struct cmsghdr so that CMSG_FIRSTHDR() yields an aligned header. */
	union {
		uint8_t buf[CMSG_SPACE(sizeof(struct sock_extended_err))];
		struct cmsghdr align;
	} ctrl;
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	while (true) {
		/* recvmsg() overwrites msg_controllen with the length it actually
		 * received, so restore the full buffer size for every message. */
		msgh.msg_control = ctrl.buf;
		msgh.msg_controllen = sizeof(ctrl.buf);

		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		/* IPv4 sockets report through SOL_IP/IP_RECVERR, IPv6 sockets through
		 * SOL_IPV6/IPV6_RECVERR. */
		cm = CMSG_FIRSTHDR(&msgh);
		if (!(cm &&
		      ((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
		       (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR)))) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 *
		 * The 32-bit index can wrap, so [ee_info, ee_data] may straddle
		 * UINT32_MAX. Walk it with unsigned wrap-around and stop after
		 * ee_data; an "idx <= ee_data" test would never terminate for
		 * ee_data == UINT32_MAX and would skip wrapped ranges entirely.
		 */
		idx = serr->ee_info;
		while (true) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (req->internal.offset == idx) {
					found = true;

					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}

				} else if (found) {
					break;
				}
			}

			if (idx == serr->ee_data) {
				break;
			}
			idx++;	/* uint32_t arithmetic: UINT32_MAX wraps to 0 */
		}
	}

	return 0;
}
#endif
628 
/*
 * Send as much queued asynchronous write data as the kernel will accept.
 *
 * Gathering: up to IOV_BATCH_SIZE iovecs are collected from the front of
 * queued_reqs, skipping the bytes each request has already sent (tracked in
 * req->internal.offset). They go out in a single sendmsg() call, with
 * MSG_ZEROCOPY when zero-copy is enabled on the socket.
 *
 * Accounting: the bytes actually sent are charged against the queued requests
 * in order. A partially sent request only has its offset advanced. Each fully
 * sent request is passed to spdk_sock_request_pend(), then:
 *   - without zero-copy, it is completed immediately via
 *     spdk_sock_request_put();
 *   - with zero-copy, its offset field is reused to hold the index of this
 *     sendmsg() call, so that _sock_check_zcopy() can complete it later.
 *
 * Does nothing while a completion callback is running (sock->cb_cnt > 0).
 *
 * Returns 0 when data was sent, when nothing was queued, or when sendmsg()
 * would block. Otherwise returns the sendmsg() result (-1 on error).
 */
static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc;
	unsigned int offset;
	size_t len;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = 0;
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Consume any offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
			iovcnt++;

			offset = 0;

			/* The batch is full; the remaining iovecs (even of this
			 * request) wait for a later flush. */
			if (iovcnt >= IOV_BATCH_SIZE) {
				break;
			}
		}

		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		req = TAILQ_NEXT(req, internal.link);
	}

	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY;
	} else
#endif
	{
		flags = 0;
	}
	rc = sendmsg(psock->fd, &msg, flags);
	/* NOTE(review): sendmsg() sets errno only when it returns -1. For a 0
	 * return the errno test below reads a stale value. */
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	/* Count only successful calls. The zero-copy tag below is this counter
	 * minus one; presumably it mirrors the kernel's per-call zero-copy
	 * notification counter, which _sock_check_zcopy() matches against. */
	psock->sendmsg_idx++;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. Presumably this moves req from
		 * queued_reqs to pending_reqs: the loop re-reads the head of
		 * queued_reqs below, and _sock_check_zcopy() scans pending_reqs. */
		spdk_sock_request_pend(sock, req);

		if (!psock->zcopy) {
			/* The sendmsg syscall above isn't currently asynchronous,
			 * so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			/* Non-zero: stop touching the socket (presumably it was
			 * closed from the completion callback). */
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return 0;
}
757 
/*
 * net_impl .flush hook: push queued asynchronous writes to the kernel.
 * Returns whatever _sock_flush() returns (0, or a negative error).
 */
static int
spdk_posix_sock_flush(struct spdk_sock *_sock)
{
	int rc = _sock_flush(_sock);

	return rc;
}
763 
764 static ssize_t
765 spdk_posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
766 {
767 	struct iovec siov[2];
768 	int sbytes;
769 	ssize_t bytes;
770 	struct spdk_posix_sock_group_impl *group;
771 
772 	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
773 	if (sbytes < 0) {
774 		errno = EINVAL;
775 		return -1;
776 	} else if (sbytes == 0) {
777 		errno = EAGAIN;
778 		return -1;
779 	}
780 
781 	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
782 
783 	if (bytes == 0) {
784 		/* The only way this happens is if diov is 0 length */
785 		errno = EINVAL;
786 		return -1;
787 	}
788 
789 	spdk_pipe_reader_advance(sock->recv_pipe, bytes);
790 
791 	/* If we drained the pipe, take it off the level-triggered list */
792 	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
793 		group = __posix_group_impl(sock->base.group_impl);
794 		TAILQ_REMOVE(&group->pending_recv, sock, link);
795 		sock->pending_recv = false;
796 	}
797 
798 	return bytes;
799 }
800 
801 static inline ssize_t
802 _spdk_posix_sock_read(struct spdk_posix_sock *sock)
803 {
804 	struct iovec iov[2];
805 	int bytes;
806 	struct spdk_posix_sock_group_impl *group;
807 
808 	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
809 
810 	if (bytes > 0) {
811 		bytes = readv(sock->fd, iov, 2);
812 		if (bytes > 0) {
813 			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
814 			if (sock->base.group_impl) {
815 				group = __posix_group_impl(sock->base.group_impl);
816 				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
817 				sock->pending_recv = true;
818 			}
819 		}
820 	}
821 
822 	return bytes;
823 }
824 
825 static ssize_t
826 spdk_posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
827 {
828 	struct spdk_posix_sock *sock = __posix_sock(_sock);
829 	int rc, i;
830 	size_t len;
831 
832 	if (sock->recv_pipe == NULL) {
833 		return readv(sock->fd, iov, iovcnt);
834 	}
835 
836 	len = 0;
837 	for (i = 0; i < iovcnt; i++) {
838 		len += iov[i].iov_len;
839 	}
840 
841 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
842 		/* If the user is receiving a sufficiently large amount of data,
843 		 * receive directly to their buffers. */
844 		if (len >= 1024) {
845 			return readv(sock->fd, iov, iovcnt);
846 		}
847 
848 		/* Otherwise, do a big read into our pipe */
849 		rc = _spdk_posix_sock_read(sock);
850 		if (rc <= 0) {
851 			return rc;
852 		}
853 	}
854 
855 	return spdk_posix_sock_recv_from_pipe(sock, iov, iovcnt);
856 }
857 
/* Receive up to len bytes into buf; a one-element spdk_posix_sock_readv(). */
static ssize_t
spdk_posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec single = {
		.iov_base = buf,
		.iov_len = len,
	};

	return spdk_posix_sock_readv(sock, &single, 1);
}
868 
869 static ssize_t
870 spdk_posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
871 {
872 	struct spdk_posix_sock *sock = __posix_sock(_sock);
873 	int rc;
874 
875 	/* In order to process a writev, we need to flush any asynchronous writes
876 	 * first. */
877 	rc = _sock_flush(_sock);
878 	if (rc < 0) {
879 		return rc;
880 	}
881 
882 	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
883 		/* We weren't able to flush all requests */
884 		errno = EAGAIN;
885 		return -1;
886 	}
887 
888 	return writev(sock->fd, iov, iovcnt);
889 }
890 
891 static void
892 spdk_posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
893 {
894 	int rc;
895 
896 	spdk_sock_request_queue(sock, req);
897 
898 	/* If there are a sufficient number queued, just flush them out immediately. */
899 	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
900 		rc = _sock_flush(sock);
901 		if (rc) {
902 			spdk_sock_abort_requests(sock);
903 		}
904 	}
905 }
906 
907 static int
908 spdk_posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
909 {
910 	struct spdk_posix_sock *sock = __posix_sock(_sock);
911 	int val;
912 	int rc;
913 
914 	assert(sock != NULL);
915 
916 	val = nbytes;
917 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
918 	if (rc != 0) {
919 		return -1;
920 	}
921 	return 0;
922 }
923 
/*
 * Set the socket's SO_PRIORITY where the platform defines it.
 * Returns the setsockopt() result, or 0 when SO_PRIORITY is unavailable
 * (the request is silently ignored).
 */
static int
spdk_posix_sock_set_priority(struct spdk_sock *_sock, int priority)
{
#if defined(SO_PRIORITY)
	struct spdk_posix_sock *sock = __posix_sock(_sock);

	assert(sock != NULL);

	return setsockopt(sock->fd, SOL_SOCKET, SO_PRIORITY,
			  &priority, sizeof(priority));
#else
	return 0;
#endif
}
939 
940 static bool
941 spdk_posix_sock_is_ipv6(struct spdk_sock *_sock)
942 {
943 	struct spdk_posix_sock *sock = __posix_sock(_sock);
944 	struct sockaddr_storage sa;
945 	socklen_t salen;
946 	int rc;
947 
948 	assert(sock != NULL);
949 
950 	memset(&sa, 0, sizeof sa);
951 	salen = sizeof sa;
952 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
953 	if (rc != 0) {
954 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
955 		return false;
956 	}
957 
958 	return (sa.ss_family == AF_INET6);
959 }
960 
961 static bool
962 spdk_posix_sock_is_ipv4(struct spdk_sock *_sock)
963 {
964 	struct spdk_posix_sock *sock = __posix_sock(_sock);
965 	struct sockaddr_storage sa;
966 	socklen_t salen;
967 	int rc;
968 
969 	assert(sock != NULL);
970 
971 	memset(&sa, 0, sizeof sa);
972 	salen = sizeof sa;
973 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
974 	if (rc != 0) {
975 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
976 		return false;
977 	}
978 
979 	return (sa.ss_family == AF_INET);
980 }
981 
982 static bool
983 spdk_posix_sock_is_connected(struct spdk_sock *_sock)
984 {
985 	struct spdk_posix_sock *sock = __posix_sock(_sock);
986 	uint8_t byte;
987 	int rc;
988 
989 	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
990 	if (rc == 0) {
991 		return false;
992 	}
993 
994 	if (rc < 0) {
995 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
996 			return true;
997 		}
998 
999 		return false;
1000 	}
1001 
1002 	return true;
1003 }
1004 
/*
 * Report the NAPI ID of the queue that last received data for this socket
 * (SO_INCOMING_NAPI_ID) in *placement_id.
 * Returns the getsockopt() result, or -1 where the option is unavailable.
 */
static int
spdk_posix_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
{
#if defined(SO_INCOMING_NAPI_ID)
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	socklen_t optlen = sizeof(int);
	int rc;

	rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &optlen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
	}

	return rc;
#else
	return -1;
#endif
}
1022 
1023 static struct spdk_sock_group_impl *
1024 spdk_posix_sock_group_impl_create(void)
1025 {
1026 	struct spdk_posix_sock_group_impl *group_impl;
1027 	int fd;
1028 
1029 #if defined(__linux__)
1030 	fd = epoll_create1(0);
1031 #elif defined(__FreeBSD__)
1032 	fd = kqueue();
1033 #endif
1034 	if (fd == -1) {
1035 		return NULL;
1036 	}
1037 
1038 	group_impl = calloc(1, sizeof(*group_impl));
1039 	if (group_impl == NULL) {
1040 		SPDK_ERRLOG("group_impl allocation failed\n");
1041 		close(fd);
1042 		return NULL;
1043 	}
1044 
1045 	group_impl->fd = fd;
1046 	TAILQ_INIT(&group_impl->pending_recv);
1047 
1048 	return &group_impl->base;
1049 }
1050 
1051 static int
1052 spdk_posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1053 {
1054 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1055 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1056 	int rc;
1057 
1058 #if defined(__linux__)
1059 	struct epoll_event event;
1060 
1061 	memset(&event, 0, sizeof(event));
1062 	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
1063 	event.events = EPOLLIN | EPOLLERR;
1064 	event.data.ptr = sock;
1065 
1066 	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
1067 #elif defined(__FreeBSD__)
1068 	struct kevent event;
1069 	struct timespec ts = {0};
1070 
1071 	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);
1072 
1073 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1074 #endif
1075 	return rc;
1076 }
1077 
1078 static int
1079 spdk_posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1080 {
1081 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1082 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1083 	int rc;
1084 
1085 	if (sock->recv_pipe != NULL) {
1086 		if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) {
1087 			TAILQ_REMOVE(&group->pending_recv, sock, link);
1088 			sock->pending_recv = false;
1089 		}
1090 	}
1091 
1092 #if defined(__linux__)
1093 	struct epoll_event event;
1094 
1095 	/* Event parameter is ignored but some old kernel version still require it. */
1096 	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
1097 #elif defined(__FreeBSD__)
1098 	struct kevent event;
1099 	struct timespec ts = {0};
1100 
1101 	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
1102 
1103 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1104 	if (rc == 0 && event.flags & EV_ERROR) {
1105 		rc = -1;
1106 		errno = event.data;
1107 	}
1108 #endif
1109 
1110 	spdk_sock_abort_requests(_sock);
1111 
1112 	return rc;
1113 }
1114 
1115 static int
1116 spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
1117 				struct spdk_sock **socks)
1118 {
1119 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1120 	struct spdk_sock *sock, *tmp;
1121 	int num_events, i, rc;
1122 	struct spdk_posix_sock *psock, *ptmp;
1123 #if defined(__linux__)
1124 	struct epoll_event events[MAX_EVENTS_PER_POLL];
1125 #elif defined(__FreeBSD__)
1126 	struct kevent events[MAX_EVENTS_PER_POLL];
1127 	struct timespec ts = {0};
1128 #endif
1129 
1130 	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
1131 	 * a completion callback could remove the sock from the
1132 	 * group. */
1133 	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
1134 		rc = _sock_flush(sock);
1135 		if (rc) {
1136 			spdk_sock_abort_requests(sock);
1137 		}
1138 	}
1139 
1140 #if defined(__linux__)
1141 	num_events = epoll_wait(group->fd, events, max_events, 0);
1142 #elif defined(__FreeBSD__)
1143 	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
1144 #endif
1145 
1146 	if (num_events == -1) {
1147 		return -1;
1148 	}
1149 
1150 	for (i = 0; i < num_events; i++) {
1151 #if defined(__linux__)
1152 		sock = events[i].data.ptr;
1153 		psock = __posix_sock(sock);
1154 
1155 #ifdef SPDK_ZEROCOPY
1156 		if (events[i].events & EPOLLERR) {
1157 			rc = _sock_check_zcopy(sock);
1158 			/* If the socket was closed or removed from
1159 			 * the group in response to a send ack, don't
1160 			 * add it to the array here. */
1161 			if (rc || sock->cb_fn == NULL) {
1162 				continue;
1163 			}
1164 		}
1165 #endif
1166 		if ((events[i].events & EPOLLIN) == 0) {
1167 			continue;
1168 		}
1169 
1170 #elif defined(__FreeBSD__)
1171 		sock = events[i].udata;
1172 		psock = __posix_sock(sock);
1173 #endif
1174 
1175 		/* If the socket does not already have recv pending, add it now */
1176 		if (!psock->pending_recv) {
1177 			psock->pending_recv = true;
1178 			TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
1179 		}
1180 	}
1181 
1182 	num_events = 0;
1183 
1184 	TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) {
1185 		if (num_events == max_events) {
1186 			break;
1187 		}
1188 
1189 		socks[num_events++] = &psock->base;
1190 	}
1191 
1192 	/* Cycle the pending_recv list so that each time we poll things aren't
1193 	 * in the same order. */
1194 	for (i = 0; i < num_events; i++) {
1195 		psock = __posix_sock(socks[i]);
1196 
1197 		TAILQ_REMOVE(&group->pending_recv, psock, link);
1198 
1199 		if (psock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(psock->recv_pipe) == 0) {
1200 			psock->pending_recv = false;
1201 		} else {
1202 			TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
1203 		}
1204 
1205 	}
1206 
1207 	return num_events;
1208 }
1209 
1210 static int
1211 spdk_posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1212 {
1213 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1214 	int rc;
1215 
1216 	rc = close(group->fd);
1217 	free(group);
1218 	return rc;
1219 }
1220 
1221 static struct spdk_net_impl g_posix_net_impl = {
1222 	.name		= "posix",
1223 	.getaddr	= spdk_posix_sock_getaddr,
1224 	.connect	= spdk_posix_sock_connect,
1225 	.listen		= spdk_posix_sock_listen,
1226 	.accept		= spdk_posix_sock_accept,
1227 	.close		= spdk_posix_sock_close,
1228 	.recv		= spdk_posix_sock_recv,
1229 	.readv		= spdk_posix_sock_readv,
1230 	.writev		= spdk_posix_sock_writev,
1231 	.writev_async	= spdk_posix_sock_writev_async,
1232 	.flush		= spdk_posix_sock_flush,
1233 	.set_recvlowat	= spdk_posix_sock_set_recvlowat,
1234 	.set_recvbuf	= spdk_posix_sock_set_recvbuf,
1235 	.set_sendbuf	= spdk_posix_sock_set_sendbuf,
1236 	.set_priority	= spdk_posix_sock_set_priority,
1237 	.is_ipv6	= spdk_posix_sock_is_ipv6,
1238 	.is_ipv4	= spdk_posix_sock_is_ipv4,
1239 	.is_connected	= spdk_posix_sock_is_connected,
1240 	.get_placement_id	= spdk_posix_sock_get_placement_id,
1241 	.group_impl_create	= spdk_posix_sock_group_impl_create,
1242 	.group_impl_add_sock	= spdk_posix_sock_group_impl_add_sock,
1243 	.group_impl_remove_sock = spdk_posix_sock_group_impl_remove_sock,
1244 	.group_impl_poll	= spdk_posix_sock_group_impl_poll,
1245 	.group_impl_close	= spdk_posix_sock_group_impl_close,
1246 };
1247 
1248 SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);
1249