/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation. All rights reserved.
 *   Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#if defined(__linux__)
#include <sys/epoll.h>
#include <linux/errqueue.h>
#elif defined(__FreeBSD__)
#include <sys/event.h>
#endif

#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/util.h"
#include "spdk/likely.h"
#include "spdk_internal/sock.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32
#define MIN_SO_RCVBUF_SIZE (2 * 1024 * 1024)
#define MIN_SO_SNDBUF_SIZE (2 * 1024 * 1024)
#define IOV_BATCH_SIZE 64

#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif

struct spdk_posix_sock {
	struct spdk_sock	base;
	int			fd;

	uint32_t		sendmsg_idx;
	bool			zcopy;

	struct spdk_pipe	*recv_pipe;
	void			*recv_buf;
	int			recv_buf_sz;
	bool			pending_recv;

	TAILQ_ENTRY(spdk_posix_sock)	link;
};

struct spdk_posix_sock_group_impl {
	struct spdk_sock_group_impl	base;
	int				fd;
	TAILQ_HEAD(, spdk_posix_sock)	pending_recv;
};

static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE
};

static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const char *result = NULL;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	switch (sa->sa_family) {
	case AF_INET:
		result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
				   host, hlen);
		break;
	case AF_INET6:
		result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
				   host, hlen);
		break;
	default:
		break;
	}

	if (result != NULL) {
		return 0;
	} else {
		return -1;
	}
}

#define __posix_sock(sock) (struct spdk_posix_sock *)sock
#define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group

static int
posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum posix_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};
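
/*
 * Resize the userspace receive pipe that buffers incoming data for small reads.
 * Any bytes still buffered in the old pipe are copied into the new one, and a
 * size of zero tears the pipe down entirely.
 */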
static int
posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be at least %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	assert(sock != NULL);

	rc = posix_sock_alloc_pipe(sock, sz);
	if (rc) {
		return rc;
	}

	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
	if (sz < MIN_SO_RCVBUF_SIZE) {
		sz = MIN_SO_RCVBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static int
posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (sz < MIN_SO_SNDBUF_SIZE) {
		sz = MIN_SO_SNDBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static struct spdk_posix_sock *
posix_sock_alloc(int fd, bool enable_zero_copy)
{
	struct spdk_posix_sock *sock;
#ifdef SPDK_ZEROCOPY
	int rc;
	int flag;
#endif

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;

#ifdef SPDK_ZEROCOPY
	if (!enable_zero_copy) {
		return sock;
	}

	/* Try to turn on zero copy sends */
	flag = 1;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
	if (rc == 0) {
		sock->zcopy = true;
	}
#endif

	return sock;
}
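
/*
 * Determine whether the socket's local address belongs to an interface flagged
 * IFF_LOOPBACK. Used to decide whether zero-copy sends should be enabled.
 */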
static bool
sock_is_loopback(int fd)
{
	struct ifaddrs *addrs, *tmp;
	struct sockaddr_storage sa = {};
	socklen_t salen;
	struct ifreq ifr = {};
	char ip_addr[256], ip_addr_tmp[256];
	int rc;
	bool is_loopback = false;

	salen = sizeof(sa);
	rc = getsockname(fd, (struct sockaddr *)&sa, &salen);
	if (rc != 0) {
		return is_loopback;
	}

	memset(ip_addr, 0, sizeof(ip_addr));
	rc = get_addr_str((struct sockaddr *)&sa, ip_addr, sizeof(ip_addr));
	if (rc != 0) {
		return is_loopback;
	}

	getifaddrs(&addrs);
	for (tmp = addrs; tmp != NULL; tmp = tmp->ifa_next) {
		if (tmp->ifa_addr && (tmp->ifa_flags & IFF_UP) &&
		    (tmp->ifa_addr->sa_family == sa.ss_family)) {
			memset(ip_addr_tmp, 0, sizeof(ip_addr_tmp));
			rc = get_addr_str(tmp->ifa_addr, ip_addr_tmp, sizeof(ip_addr_tmp));
			if (rc != 0) {
				continue;
			}

			if (strncmp(ip_addr, ip_addr_tmp, sizeof(ip_addr)) == 0) {
				memcpy(ifr.ifr_name, tmp->ifa_name, sizeof(ifr.ifr_name));
				ioctl(fd, SIOCGIFFLAGS, &ifr);
				if (ifr.ifr_flags & IFF_LOOPBACK) {
					is_loopback = true;
				}
				goto end;
			}
		}
	}

end:
	freeifaddrs(addrs);
	return is_loopback;
}

static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_posix_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc, sz;
	bool enable_zero_copy = true;

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		sz = g_spdk_posix_sock_impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		sz = g_spdk_posix_sock_impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}
#endif

		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	if (type == SPDK_SOCK_CREATE_LISTEN) {
		/* Only enable zero copy for non-loopback sockets. */
		enable_zero_copy = !sock_is_loopback(fd);
	} else if (type == SPDK_SOCK_CREATE_CONNECT) {
		/* Disable zero copy for client sockets until support is added */
		enable_zero_copy = false;
	}

	sock = posix_sock_alloc(fd, enable_zero_copy);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}

static struct spdk_sock *
posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}

static struct spdk_sock *
posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}

static struct spdk_sock *
posix_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc, fd;
	struct spdk_posix_sock *new_sock;
	int flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	/* Inherit the zero copy feature from the listen socket */
	new_sock = posix_sock_alloc(fd, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}

static int
posix_sock_close(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);

	assert(TAILQ_EMPTY(&_sock->pending_reqs));

	/* If the socket fails to close, the best choice is to
	 * leak the fd but continue to free the rest of the sock
	 * memory. */
	close(sock->fd);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	free(sock);

	return 0;
}
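
/*
 * Drain MSG_ZEROCOPY completion notifications from the socket error queue and
 * complete the pending requests whose sendmsg indices fall within the reported
 * [ee_info, ee_data] range.
 */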
#ifdef SPDK_ZEROCOPY
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	msgh.msg_control = buf;
	msgh.msg_controllen = sizeof(buf);

	while (true) {
		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (req->internal.offset == idx) {
					found = true;

					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}

				} else if (found) {
					break;
				}
			}
		}
	}

	return 0;
}
#endif
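
/*
 * Gather up to IOV_BATCH_SIZE iovecs from the queued requests and submit them
 * with a single sendmsg(). With zero-copy enabled, fully written requests are
 * only moved to the pending list here; they complete later, when the kernel
 * acknowledges the transfer via the error queue.
 */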
static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc;
	unsigned int offset;
	size_t len;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = 0;
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Consume any offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
			iovcnt++;

			offset = 0;

			if (iovcnt >= IOV_BATCH_SIZE) {
				break;
			}
		}

		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		req = TAILQ_NEXT(req, internal.link);
	}

	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY;
	} else
#endif
	{
		flags = 0;
	}
	rc = sendmsg(psock->fd, &msg, flags);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	psock->sendmsg_idx++;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(sock, req);

		if (!psock->zcopy) {
			/* The sendmsg syscall above isn't currently asynchronous,
			 * so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return 0;
}

static int
posix_sock_flush(struct spdk_sock *_sock)
{
	return _sock_flush(_sock);
}

static ssize_t
posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_posix_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __posix_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}

static inline ssize_t
posix_sock_read(struct spdk_posix_sock *sock)
{
	struct iovec iov[2];
	int bytes;
	struct spdk_posix_sock_group_impl *group;

	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes > 0) {
		bytes = readv(sock->fd, iov, 2);
		if (bytes > 0) {
			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
			if (sock->base.group_impl) {
				group = __posix_group_impl(sock->base.group_impl);
				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				sock->pending_recv = true;
			}
		}
	}

	return bytes;
}
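
/*
 * Sufficiently large reads go directly into the caller's buffers; smaller
 * reads are first staged through the receive pipe so the kernel is read in
 * bigger chunks.
 */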
static ssize_t
posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		return readv(sock->fd, iov, iovcnt);
	}

	len = 0;
	for (i = 0; i < iovcnt; i++) {
		len += iov[i].iov_len;
	}

	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		if (len >= MIN_SOCK_PIPE_SIZE) {
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = posix_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return posix_sock_readv(sock, iov, 1);
}

static ssize_t
posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	/* In order to process a writev, we need to flush any asynchronous writes
	 * first. */
	rc = _sock_flush(_sock);
	if (rc < 0) {
		return rc;
	}

	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
		/* We weren't able to flush all requests */
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}

static void
posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	int rc;

	spdk_sock_request_queue(sock, req);

	/* If there are a sufficient number queued, just flush them out immediately. */
	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}
}

static int
posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

static bool
posix_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

static bool
posix_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

static bool
posix_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}
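
/* Report the NAPI ID of the queue backing this socket, when the kernel exposes it. */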
static int
posix_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
{
	int rc = -1;

#if defined(SO_INCOMING_NAPI_ID)
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	socklen_t salen = sizeof(int);

	rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
	}

#endif
	return rc;
}

static struct spdk_sock_group_impl *
posix_sock_group_impl_create(void)
{
	struct spdk_posix_sock_group_impl *group_impl;
	int fd;

#if defined(__linux__)
	fd = epoll_create1(0);
#elif defined(__FreeBSD__)
	fd = kqueue();
#endif
	if (fd == -1) {
		return NULL;
	}

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		close(fd);
		return NULL;
	}

	group_impl->fd = fd;
	TAILQ_INIT(&group_impl->pending_recv);

	return &group_impl->base;
}

static int
posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

#if defined(__linux__)
	struct epoll_event event;

	memset(&event, 0, sizeof(event));
	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
	event.events = EPOLLIN | EPOLLERR;
	event.data.ptr = sock;

	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
#elif defined(__FreeBSD__)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
#endif

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		assert(sock->pending_recv == false);
		sock->pending_recv = true;
		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
	}

	return rc;
}
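
/*
 * Remove the socket from the group's epoll/kqueue set and from the
 * pending_recv list if necessary; any queued requests are aborted.
 */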
static int
posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	if (sock->recv_pipe != NULL) {
		if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) {
			TAILQ_REMOVE(&group->pending_recv, sock, link);
			sock->pending_recv = false;
		}
		assert(sock->pending_recv == false);
	}

#if defined(__linux__)
	struct epoll_event event;

	/* The event parameter is ignored, but some old kernel versions still require it. */
	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
#elif defined(__FreeBSD__)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
	if (rc == 0 && event.flags & EV_ERROR) {
		rc = -1;
		errno = event.data;
	}
#endif

	spdk_sock_abort_requests(_sock);

	return rc;
}
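
/*
 * Poll the group: flush each member socket's queued writes, harvest
 * epoll/kqueue events (on Linux, zero-copy acks arrive via EPOLLERR), and
 * return the sockets with data ready, including those that still have bytes
 * buffered in their receive pipes.
 */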
static int
posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_sock *sock, *tmp;
	int num_events, i, rc;
	struct spdk_posix_sock *psock, *ptmp;
#if defined(__linux__)
	struct epoll_event events[MAX_EVENTS_PER_POLL];
#elif defined(__FreeBSD__)
	struct kevent events[MAX_EVENTS_PER_POLL];
	struct timespec ts = {0};
#endif

	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
	 * a completion callback could remove the sock from the
	 * group. */
	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}

#if defined(__linux__)
	num_events = epoll_wait(group->fd, events, max_events, 0);
#elif defined(__FreeBSD__)
	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
#endif

	if (num_events == -1) {
		return -1;
	}

	for (i = 0; i < num_events; i++) {
#if defined(__linux__)
		sock = events[i].data.ptr;
		psock = __posix_sock(sock);

#ifdef SPDK_ZEROCOPY
		if (events[i].events & EPOLLERR) {
			rc = _sock_check_zcopy(sock);
			/* If the socket was closed or removed from
			 * the group in response to a send ack, don't
			 * add it to the array here. */
			if (rc || sock->cb_fn == NULL) {
				continue;
			}
		}
#endif
		if ((events[i].events & EPOLLIN) == 0) {
			continue;
		}

#elif defined(__FreeBSD__)
		sock = events[i].udata;
		psock = __posix_sock(sock);
#endif

		/* If the socket does not already have recv pending, add it now */
		if (!psock->pending_recv) {
			psock->pending_recv = true;
			TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
		}
	}

	num_events = 0;

	TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) {
		if (num_events == max_events) {
			break;
		}

		socks[num_events++] = &psock->base;
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. */
	for (i = 0; i < num_events; i++) {
		psock = __posix_sock(socks[i]);

		TAILQ_REMOVE(&group->pending_recv, psock, link);

		if (psock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(psock->recv_pipe) == 0) {
			psock->pending_recv = false;
		} else {
			TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
		}
	}

	return num_events;
}

static int
posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	int rc;

	rc = close(group->fd);
	free(group);
	return rc;
}

static int
posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
{
	if (!opts || !len) {
		errno = EINVAL;
		return -1;
	}

#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len

	if (FIELD_OK(recv_buf_size)) {
		opts->recv_buf_size = g_spdk_posix_sock_impl_opts.recv_buf_size;
	}
	if (FIELD_OK(send_buf_size)) {
		opts->send_buf_size = g_spdk_posix_sock_impl_opts.send_buf_size;
	}

#undef FIELD_OK

	*len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts));
	return 0;
}

static int
posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
{
	if (!opts) {
		errno = EINVAL;
		return -1;
	}

#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len

	if (FIELD_OK(recv_buf_size)) {
		g_spdk_posix_sock_impl_opts.recv_buf_size = opts->recv_buf_size;
	}
	if (FIELD_OK(send_buf_size)) {
		g_spdk_posix_sock_impl_opts.send_buf_size = opts->send_buf_size;
	}

#undef FIELD_OK

	return 0;
}

static struct spdk_net_impl g_posix_net_impl = {
	.name			= "posix",
	.getaddr		= posix_sock_getaddr,
	.connect		= posix_sock_connect,
	.listen			= posix_sock_listen,
	.accept			= posix_sock_accept,
	.close			= posix_sock_close,
	.recv			= posix_sock_recv,
	.readv			= posix_sock_readv,
	.writev			= posix_sock_writev,
	.writev_async		= posix_sock_writev_async,
	.flush			= posix_sock_flush,
	.set_recvlowat		= posix_sock_set_recvlowat,
	.set_recvbuf		= posix_sock_set_recvbuf,
	.set_sendbuf		= posix_sock_set_sendbuf,
	.is_ipv6		= posix_sock_is_ipv6,
	.is_ipv4		= posix_sock_is_ipv4,
	.is_connected		= posix_sock_is_connected,
	.get_placement_id	= posix_sock_get_placement_id,
	.group_impl_create	= posix_sock_group_impl_create,
	.group_impl_add_sock	= posix_sock_group_impl_add_sock,
	.group_impl_remove_sock	= posix_sock_group_impl_remove_sock,
	.group_impl_poll	= posix_sock_group_impl_poll,
	.group_impl_close	= posix_sock_group_impl_close,
	.get_opts		= posix_sock_impl_get_opts,
	.set_opts		= posix_sock_impl_set_opts,
};

SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);