1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. All rights reserved. 5 * Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #if defined(__linux__) 37 #include <sys/epoll.h> 38 #include <linux/errqueue.h> 39 #elif defined(__FreeBSD__) 40 #include <sys/event.h> 41 #endif 42 43 #include "spdk/log.h" 44 #include "spdk/pipe.h" 45 #include "spdk/sock.h" 46 #include "spdk/util.h" 47 #include "spdk/likely.h" 48 #include "spdk_internal/sock.h" 49 50 #define MAX_TMPBUF 1024 51 #define PORTNUMLEN 32 52 #define IOV_BATCH_SIZE 64 53 54 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 55 #define SPDK_ZEROCOPY 56 #endif 57 58 struct spdk_posix_sock { 59 struct spdk_sock base; 60 int fd; 61 62 uint32_t sendmsg_idx; 63 64 struct spdk_pipe *recv_pipe; 65 void *recv_buf; 66 int recv_buf_sz; 67 bool pending_recv; 68 bool zcopy; 69 int so_priority; 70 71 TAILQ_ENTRY(spdk_posix_sock) link; 72 }; 73 74 struct spdk_posix_sock_group_impl { 75 struct spdk_sock_group_impl base; 76 int fd; 77 TAILQ_HEAD(, spdk_posix_sock) pending_recv; 78 }; 79 80 static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = { 81 .recv_buf_size = MIN_SO_RCVBUF_SIZE, 82 .send_buf_size = MIN_SO_SNDBUF_SIZE, 83 .enable_recv_pipe = true, 84 .enable_zerocopy_send = true, 85 .enable_quickack = false, 86 .enable_placement_id = false, 87 }; 88 89 static int 90 get_addr_str(struct sockaddr *sa, char *host, size_t hlen) 91 { 92 const char *result = NULL; 93 94 if (sa == NULL || host == NULL) { 95 return -1; 96 } 97 98 switch (sa->sa_family) { 99 case AF_INET: 100 result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr), 101 host, hlen); 102 break; 103 case AF_INET6: 104 result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr), 105 host, hlen); 106 break; 107 default: 108 break; 109 } 110 111 if (result != NULL) { 112 return 0; 113 } else { 114 return -1; 115 } 116 } 117 118 #define __posix_sock(sock) (struct spdk_posix_sock *)sock 119 #define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group 120 121 static int 122 posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 123 char *caddr, int clen, uint16_t *cport) 124 { 125 struct spdk_posix_sock *sock = __posix_sock(_sock); 126 struct sockaddr_storage sa; 127 socklen_t salen; 128 int rc; 129 130 assert(sock != NULL); 131 132 memset(&sa, 0, sizeof sa); 133 salen = sizeof sa; 134 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 135 if (rc != 0) { 136 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 137 return -1; 138 } 139 140 switch (sa.ss_family) { 141 case AF_UNIX: 142 /* Acceptable connection types that don't have IPs */ 143 return 0; 144 case AF_INET: 145 case AF_INET6: 146 /* Code below will get IP addresses */ 147 break; 148 default: 149 /* Unsupported socket family */ 150 return -1; 151 } 152 153 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); 154 if (rc != 0) { 155 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 156 return -1; 157 } 158 159 if (sport) { 160 if (sa.ss_family == AF_INET) { 161 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 162 } else if (sa.ss_family == AF_INET6) { 163 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 164 } 165 } 166 167 memset(&sa, 0, sizeof sa); 168 salen = sizeof sa; 169 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); 170 if (rc != 0) { 171 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); 172 return -1; 173 } 174 175 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); 176 if (rc != 0) { 177 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 178 return -1; 179 } 180 181 if (cport) { 182 if (sa.ss_family == AF_INET) { 183 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 184 } else if (sa.ss_family == AF_INET6) { 185 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 186 } 187 } 188 189 return 0; 190 } 191 192 enum posix_sock_create_type { 193 SPDK_SOCK_CREATE_LISTEN, 194 SPDK_SOCK_CREATE_CONNECT, 195 }; 196 197 static int 198 posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz) 199 { 200 uint8_t *new_buf; 201 struct spdk_pipe *new_pipe; 202 struct iovec siov[2]; 203 struct iovec diov[2]; 204 int sbytes; 205 ssize_t bytes; 206 207 if (sock->recv_buf_sz == sz) { 208 return 0; 209 } 210 211 /* If the new size is 0, just free the pipe */ 212 if (sz == 0) { 213 spdk_pipe_destroy(sock->recv_pipe); 214 free(sock->recv_buf); 215 sock->recv_pipe = NULL; 216 sock->recv_buf = NULL; 217 return 0; 218 } else if (sz < MIN_SOCK_PIPE_SIZE) { 219 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 220 return -1; 221 } 222 223 /* Round up to next 64 byte multiple */ 224 new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t)); 225 if (!new_buf) { 226 SPDK_ERRLOG("socket recv buf allocation failed\n"); 227 return -ENOMEM; 228 } 229 230 new_pipe = spdk_pipe_create(new_buf, sz + 1); 231 if (new_pipe == NULL) { 232 SPDK_ERRLOG("socket pipe allocation failed\n"); 233 free(new_buf); 234 return -ENOMEM; 235 } 236 237 if (sock->recv_pipe != NULL) { 238 /* Pull all of the data out of the old pipe */ 239 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 240 if (sbytes > sz) { 241 /* Too much data to fit into the new pipe size */ 242 spdk_pipe_destroy(new_pipe); 243 free(new_buf); 244 return -EINVAL; 245 } 246 247 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 248 assert(sbytes == sz); 249 250 bytes = spdk_iovcpy(siov, 2, diov, 2); 251 spdk_pipe_writer_advance(new_pipe, bytes); 252 253 spdk_pipe_destroy(sock->recv_pipe); 254 free(sock->recv_buf); 255 } 256 257 sock->recv_buf_sz = sz; 258 sock->recv_buf = new_buf; 259 sock->recv_pipe = new_pipe; 260 261 return 0; 262 } 263 264 static int 265 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 266 { 267 struct spdk_posix_sock *sock = __posix_sock(_sock); 268 int rc; 269 270 assert(sock != NULL); 271 272 if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) { 273 rc = posix_sock_alloc_pipe(sock, sz); 274 if (rc) { 275 return rc; 276 } 277 } 278 279 /* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */ 280 if (sz < MIN_SO_RCVBUF_SIZE) { 281 sz = MIN_SO_RCVBUF_SIZE; 282 } 283 284 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 285 if (rc < 0) { 286 return rc; 287 } 288 289 return 0; 290 } 291 292 static int 293 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 294 { 295 struct spdk_posix_sock *sock = __posix_sock(_sock); 296 int rc; 297 298 assert(sock != NULL); 299 300 if (sz < MIN_SO_SNDBUF_SIZE) { 301 sz = MIN_SO_SNDBUF_SIZE; 302 } 303 304 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 305 if (rc < 0) { 306 return rc; 307 } 308 309 return 0; 310 } 311 312 static struct spdk_posix_sock * 313 posix_sock_alloc(int fd, bool enable_zero_copy) 314 { 315 struct spdk_posix_sock *sock; 316 #if defined(SPDK_ZEROCOPY) || defined(__linux__) 317 int flag; 318 int rc; 319 #endif 320 321 sock = calloc(1, sizeof(*sock)); 322 if (sock == NULL) { 323 SPDK_ERRLOG("sock allocation failed\n"); 324 return NULL; 325 } 326 327 sock->fd = fd; 328 329 #if defined(SPDK_ZEROCOPY) 330 flag = 1; 331 332 if (!enable_zero_copy || !g_spdk_posix_sock_impl_opts.enable_zerocopy_send) { 333 return sock; 334 } 335 336 /* Try to turn on zero copy sends */ 337 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)); 338 if (rc == 0) { 339 sock->zcopy = true; 340 } 341 #endif 342 343 #if defined(__linux__) 344 flag = 1; 345 346 if (g_spdk_posix_sock_impl_opts.enable_quickack) { 347 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag)); 348 if (rc != 0) { 349 SPDK_ERRLOG("quickack was failed to set\n"); 350 } 351 } 352 #endif 353 354 return sock; 355 } 356 357 static bool 358 sock_is_loopback(int fd) 359 { 360 struct ifaddrs *addrs, *tmp; 361 struct sockaddr_storage sa = {}; 362 socklen_t salen; 363 struct ifreq ifr = {}; 364 char ip_addr[256], ip_addr_tmp[256]; 365 int rc; 366 bool is_loopback = false; 367 368 salen = sizeof(sa); 369 rc = getsockname(fd, (struct sockaddr *)&sa, &salen); 370 if (rc != 0) { 371 return is_loopback; 372 } 373 374 memset(ip_addr, 0, sizeof(ip_addr)); 375 rc = get_addr_str((struct sockaddr *)&sa, ip_addr, sizeof(ip_addr)); 376 if (rc != 0) { 377 return is_loopback; 378 } 379 380 getifaddrs(&addrs); 381 for (tmp = addrs; tmp != NULL; tmp = tmp->ifa_next) { 382 if (tmp->ifa_addr && (tmp->ifa_flags & IFF_UP) && 383 (tmp->ifa_addr->sa_family == sa.ss_family)) { 384 memset(ip_addr_tmp, 0, sizeof(ip_addr_tmp)); 385 rc = get_addr_str(tmp->ifa_addr, ip_addr_tmp, sizeof(ip_addr_tmp)); 386 if (rc != 0) { 387 continue; 388 } 389 390 if (strncmp(ip_addr, ip_addr_tmp, sizeof(ip_addr)) == 0) { 391 memcpy(ifr.ifr_name, tmp->ifa_name, sizeof(ifr.ifr_name)); 392 ioctl(fd, SIOCGIFFLAGS, &ifr); 393 if (ifr.ifr_flags & IFF_LOOPBACK) { 394 is_loopback = true; 395 } 396 goto end; 397 } 398 } 399 } 400 401 end: 402 freeifaddrs(addrs); 403 return is_loopback; 404 } 405 406 static struct spdk_sock * 407 posix_sock_create(const char *ip, int port, 408 enum posix_sock_create_type type, 409 struct spdk_sock_opts *opts) 410 { 411 struct spdk_posix_sock *sock; 412 char buf[MAX_TMPBUF]; 413 char portnum[PORTNUMLEN]; 414 char *p; 415 struct addrinfo hints, *res, *res0; 416 int fd, flag; 417 int val = 1; 418 int rc, sz; 419 bool enable_zero_copy = true; 420 421 assert(opts != NULL); 422 423 if (ip == NULL) { 424 return NULL; 425 } 426 if (ip[0] == '[') { 427 snprintf(buf, sizeof(buf), "%s", ip + 1); 428 p = strchr(buf, ']'); 429 if (p != NULL) { 430 *p = '\0'; 431 } 432 ip = (const char *) &buf[0]; 433 } 434 435 snprintf(portnum, sizeof portnum, "%d", port); 436 memset(&hints, 0, sizeof hints); 437 hints.ai_family = PF_UNSPEC; 438 hints.ai_socktype = SOCK_STREAM; 439 hints.ai_flags = AI_NUMERICSERV; 440 hints.ai_flags |= AI_PASSIVE; 441 hints.ai_flags |= AI_NUMERICHOST; 442 rc = getaddrinfo(ip, portnum, &hints, &res0); 443 if (rc != 0) { 444 SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc); 445 return NULL; 446 } 447 448 /* try listen */ 449 fd = -1; 450 for (res = res0; res != NULL; res = res->ai_next) { 451 retry: 452 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 453 if (fd < 0) { 454 /* error */ 455 continue; 456 } 457 458 sz = g_spdk_posix_sock_impl_opts.recv_buf_size; 459 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 460 if (rc) { 461 /* Not fatal */ 462 } 463 464 sz = g_spdk_posix_sock_impl_opts.send_buf_size; 465 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 466 if (rc) { 467 /* Not fatal */ 468 } 469 470 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 471 if (rc != 0) { 472 close(fd); 473 /* error */ 474 continue; 475 } 476 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 477 if (rc != 0) { 478 close(fd); 479 /* error */ 480 continue; 481 } 482 483 #if defined(SO_PRIORITY) 484 if (opts->priority) { 485 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); 486 if (rc != 0) { 487 close(fd); 488 /* error */ 489 continue; 490 } 491 } 492 #endif 493 494 if (res->ai_family == AF_INET6) { 495 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 496 if (rc != 0) { 497 close(fd); 498 /* error */ 499 continue; 500 } 501 } 502 503 if (type == SPDK_SOCK_CREATE_LISTEN) { 504 rc = bind(fd, res->ai_addr, res->ai_addrlen); 505 if (rc != 0) { 506 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 507 switch (errno) { 508 case EINTR: 509 /* interrupted? */ 510 close(fd); 511 goto retry; 512 case EADDRNOTAVAIL: 513 SPDK_ERRLOG("IP address %s not available. " 514 "Verify IP address in config file " 515 "and make sure setup script is " 516 "run before starting spdk app.\n", ip); 517 /* FALLTHROUGH */ 518 default: 519 /* try next family */ 520 close(fd); 521 fd = -1; 522 continue; 523 } 524 } 525 /* bind OK */ 526 rc = listen(fd, 512); 527 if (rc != 0) { 528 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 529 close(fd); 530 fd = -1; 531 break; 532 } 533 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 534 rc = connect(fd, res->ai_addr, res->ai_addrlen); 535 if (rc != 0) { 536 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 537 /* try next family */ 538 close(fd); 539 fd = -1; 540 continue; 541 } 542 } 543 544 flag = fcntl(fd, F_GETFL); 545 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 546 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 547 close(fd); 548 fd = -1; 549 break; 550 } 551 break; 552 } 553 freeaddrinfo(res0); 554 555 if (fd < 0) { 556 return NULL; 557 } 558 559 /* Only enable zero copy for non-loopback sockets. */ 560 enable_zero_copy = opts->zcopy && !sock_is_loopback(fd); 561 562 sock = posix_sock_alloc(fd, enable_zero_copy); 563 if (sock == NULL) { 564 SPDK_ERRLOG("sock allocation failed\n"); 565 close(fd); 566 return NULL; 567 } 568 569 if (opts != NULL) { 570 sock->so_priority = opts->priority; 571 } 572 return &sock->base; 573 } 574 575 static struct spdk_sock * 576 posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 577 { 578 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); 579 } 580 581 static struct spdk_sock * 582 posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 583 { 584 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); 585 } 586 587 static struct spdk_sock * 588 posix_sock_accept(struct spdk_sock *_sock) 589 { 590 struct spdk_posix_sock *sock = __posix_sock(_sock); 591 struct sockaddr_storage sa; 592 socklen_t salen; 593 int rc, fd; 594 struct spdk_posix_sock *new_sock; 595 int flag; 596 597 memset(&sa, 0, sizeof(sa)); 598 salen = sizeof(sa); 599 600 assert(sock != NULL); 601 602 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 603 604 if (rc == -1) { 605 return NULL; 606 } 607 608 fd = rc; 609 610 flag = fcntl(fd, F_GETFL); 611 if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) { 612 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 613 close(fd); 614 return NULL; 615 } 616 617 #if defined(SO_PRIORITY) 618 /* The priority is not inherited, so call this function again */ 619 if (sock->base.opts.priority) { 620 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); 621 if (rc != 0) { 622 close(fd); 623 return NULL; 624 } 625 } 626 #endif 627 628 /* Inherit the zero copy feature from the listen socket */ 629 new_sock = posix_sock_alloc(fd, sock->zcopy); 630 if (new_sock == NULL) { 631 close(fd); 632 return NULL; 633 } 634 new_sock->so_priority = sock->base.opts.priority; 635 636 return &new_sock->base; 637 } 638 639 static int 640 posix_sock_close(struct spdk_sock *_sock) 641 { 642 struct spdk_posix_sock *sock = __posix_sock(_sock); 643 644 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 645 646 /* If the socket fails to close, the best choice is to 647 * leak the fd but continue to free the rest of the sock 648 * memory. */ 649 close(sock->fd); 650 651 spdk_pipe_destroy(sock->recv_pipe); 652 free(sock->recv_buf); 653 free(sock); 654 655 return 0; 656 } 657 658 #ifdef SPDK_ZEROCOPY 659 static int 660 _sock_check_zcopy(struct spdk_sock *sock) 661 { 662 struct spdk_posix_sock *psock = __posix_sock(sock); 663 struct msghdr msgh = {}; 664 uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)]; 665 ssize_t rc; 666 struct sock_extended_err *serr; 667 struct cmsghdr *cm; 668 uint32_t idx; 669 struct spdk_sock_request *req, *treq; 670 bool found; 671 672 msgh.msg_control = buf; 673 msgh.msg_controllen = sizeof(buf); 674 675 while (true) { 676 rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE); 677 678 if (rc < 0) { 679 if (errno == EWOULDBLOCK || errno == EAGAIN) { 680 return 0; 681 } 682 683 if (!TAILQ_EMPTY(&sock->pending_reqs)) { 684 SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n"); 685 } else { 686 SPDK_WARNLOG("Recvmsg yielded an error!\n"); 687 } 688 return 0; 689 } 690 691 cm = CMSG_FIRSTHDR(&msgh); 692 if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) { 693 SPDK_WARNLOG("Unexpected cmsg level or type!\n"); 694 return 0; 695 } 696 697 serr = (struct sock_extended_err *)CMSG_DATA(cm); 698 if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { 699 SPDK_WARNLOG("Unexpected extended error origin\n"); 700 return 0; 701 } 702 703 /* Most of the time, the pending_reqs array is in the exact 704 * order we need such that all of the requests to complete are 705 * in order, in the front. It is guaranteed that all requests 706 * belonging to the same sendmsg call are sequential, so once 707 * we encounter one match we can stop looping as soon as a 708 * non-match is found. 709 */ 710 for (idx = serr->ee_info; idx <= serr->ee_data; idx++) { 711 found = false; 712 TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) { 713 if (req->internal.offset == idx) { 714 found = true; 715 716 rc = spdk_sock_request_put(sock, req, 0); 717 if (rc < 0) { 718 return rc; 719 } 720 721 } else if (found) { 722 break; 723 } 724 } 725 726 } 727 } 728 729 return 0; 730 } 731 #endif 732 733 static int 734 _sock_flush(struct spdk_sock *sock) 735 { 736 struct spdk_posix_sock *psock = __posix_sock(sock); 737 struct msghdr msg = {}; 738 int flags; 739 struct iovec iovs[IOV_BATCH_SIZE]; 740 int iovcnt; 741 int retval; 742 struct spdk_sock_request *req; 743 int i; 744 ssize_t rc; 745 unsigned int offset; 746 size_t len; 747 748 /* Can't flush from within a callback or we end up with recursive calls */ 749 if (sock->cb_cnt > 0) { 750 return 0; 751 } 752 753 /* Gather an iov */ 754 iovcnt = 0; 755 req = TAILQ_FIRST(&sock->queued_reqs); 756 while (req) { 757 offset = req->internal.offset; 758 759 for (i = 0; i < req->iovcnt; i++) { 760 /* Consume any offset first */ 761 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 762 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 763 continue; 764 } 765 766 iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset; 767 iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 768 iovcnt++; 769 770 offset = 0; 771 772 if (iovcnt >= IOV_BATCH_SIZE) { 773 break; 774 } 775 } 776 777 if (iovcnt >= IOV_BATCH_SIZE) { 778 break; 779 } 780 781 req = TAILQ_NEXT(req, internal.link); 782 } 783 784 if (iovcnt == 0) { 785 return 0; 786 } 787 788 /* Perform the vectored write */ 789 msg.msg_iov = iovs; 790 msg.msg_iovlen = iovcnt; 791 #ifdef SPDK_ZEROCOPY 792 if (psock->zcopy) { 793 flags = MSG_ZEROCOPY; 794 } else 795 #endif 796 { 797 flags = 0; 798 } 799 rc = sendmsg(psock->fd, &msg, flags); 800 if (rc <= 0) { 801 if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) { 802 return 0; 803 } 804 return rc; 805 } 806 807 /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the 808 * req->internal.offset, so sendmsg_idx should not be zero */ 809 if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) { 810 psock->sendmsg_idx = 1; 811 } else { 812 psock->sendmsg_idx++; 813 } 814 815 /* Consume the requests that were actually written */ 816 req = TAILQ_FIRST(&sock->queued_reqs); 817 while (req) { 818 offset = req->internal.offset; 819 820 for (i = 0; i < req->iovcnt; i++) { 821 /* Advance by the offset first */ 822 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 823 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 824 continue; 825 } 826 827 /* Calculate the remaining length of this element */ 828 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 829 830 if (len > (size_t)rc) { 831 /* This element was partially sent. */ 832 req->internal.offset += rc; 833 return 0; 834 } 835 836 offset = 0; 837 req->internal.offset += len; 838 rc -= len; 839 } 840 841 /* Handled a full request. */ 842 spdk_sock_request_pend(sock, req); 843 844 if (!psock->zcopy) { 845 /* The sendmsg syscall above isn't currently asynchronous, 846 * so it's already done. */ 847 retval = spdk_sock_request_put(sock, req, 0); 848 if (retval) { 849 break; 850 } 851 } else { 852 /* Re-use the offset field to hold the sendmsg call index. The 853 * index is 0 based, so subtract one here because we've already 854 * incremented above. */ 855 req->internal.offset = psock->sendmsg_idx - 1; 856 } 857 858 if (rc == 0) { 859 break; 860 } 861 862 req = TAILQ_FIRST(&sock->queued_reqs); 863 } 864 865 return 0; 866 } 867 868 static int 869 posix_sock_flush(struct spdk_sock *_sock) 870 { 871 return _sock_flush(_sock); 872 } 873 874 static ssize_t 875 posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt) 876 { 877 struct iovec siov[2]; 878 int sbytes; 879 ssize_t bytes; 880 struct spdk_posix_sock_group_impl *group; 881 882 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 883 if (sbytes < 0) { 884 errno = EINVAL; 885 return -1; 886 } else if (sbytes == 0) { 887 errno = EAGAIN; 888 return -1; 889 } 890 891 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 892 893 if (bytes == 0) { 894 /* The only way this happens is if diov is 0 length */ 895 errno = EINVAL; 896 return -1; 897 } 898 899 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 900 901 /* If we drained the pipe, take it off the level-triggered list */ 902 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 903 group = __posix_group_impl(sock->base.group_impl); 904 TAILQ_REMOVE(&group->pending_recv, sock, link); 905 sock->pending_recv = false; 906 } 907 908 return bytes; 909 } 910 911 static inline ssize_t 912 posix_sock_read(struct spdk_posix_sock *sock) 913 { 914 struct iovec iov[2]; 915 int bytes; 916 struct spdk_posix_sock_group_impl *group; 917 918 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 919 920 if (bytes > 0) { 921 bytes = readv(sock->fd, iov, 2); 922 if (bytes > 0) { 923 spdk_pipe_writer_advance(sock->recv_pipe, bytes); 924 if (sock->base.group_impl) { 925 group = __posix_group_impl(sock->base.group_impl); 926 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 927 sock->pending_recv = true; 928 } 929 } 930 } 931 932 return bytes; 933 } 934 935 static ssize_t 936 posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 937 { 938 struct spdk_posix_sock *sock = __posix_sock(_sock); 939 int rc, i; 940 size_t len; 941 942 if (sock->recv_pipe == NULL) { 943 return readv(sock->fd, iov, iovcnt); 944 } 945 946 len = 0; 947 for (i = 0; i < iovcnt; i++) { 948 len += iov[i].iov_len; 949 } 950 951 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 952 /* If the user is receiving a sufficiently large amount of data, 953 * receive directly to their buffers. */ 954 if (len >= MIN_SOCK_PIPE_SIZE) { 955 return readv(sock->fd, iov, iovcnt); 956 } 957 958 /* Otherwise, do a big read into our pipe */ 959 rc = posix_sock_read(sock); 960 if (rc <= 0) { 961 return rc; 962 } 963 } 964 965 return posix_sock_recv_from_pipe(sock, iov, iovcnt); 966 } 967 968 static ssize_t 969 posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 970 { 971 struct iovec iov[1]; 972 973 iov[0].iov_base = buf; 974 iov[0].iov_len = len; 975 976 return posix_sock_readv(sock, iov, 1); 977 } 978 979 static ssize_t 980 posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 981 { 982 struct spdk_posix_sock *sock = __posix_sock(_sock); 983 int rc; 984 985 /* In order to process a writev, we need to flush any asynchronous writes 986 * first. */ 987 rc = _sock_flush(_sock); 988 if (rc < 0) { 989 return rc; 990 } 991 992 if (!TAILQ_EMPTY(&_sock->queued_reqs)) { 993 /* We weren't able to flush all requests */ 994 errno = EAGAIN; 995 return -1; 996 } 997 998 return writev(sock->fd, iov, iovcnt); 999 } 1000 1001 static void 1002 posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) 1003 { 1004 int rc; 1005 1006 spdk_sock_request_queue(sock, req); 1007 1008 /* If there are a sufficient number queued, just flush them out immediately. */ 1009 if (sock->queued_iovcnt >= IOV_BATCH_SIZE) { 1010 rc = _sock_flush(sock); 1011 if (rc) { 1012 spdk_sock_abort_requests(sock); 1013 } 1014 } 1015 } 1016 1017 static int 1018 posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 1019 { 1020 struct spdk_posix_sock *sock = __posix_sock(_sock); 1021 int val; 1022 int rc; 1023 1024 assert(sock != NULL); 1025 1026 val = nbytes; 1027 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 1028 if (rc != 0) { 1029 return -1; 1030 } 1031 return 0; 1032 } 1033 1034 static bool 1035 posix_sock_is_ipv6(struct spdk_sock *_sock) 1036 { 1037 struct spdk_posix_sock *sock = __posix_sock(_sock); 1038 struct sockaddr_storage sa; 1039 socklen_t salen; 1040 int rc; 1041 1042 assert(sock != NULL); 1043 1044 memset(&sa, 0, sizeof sa); 1045 salen = sizeof sa; 1046 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1047 if (rc != 0) { 1048 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1049 return false; 1050 } 1051 1052 return (sa.ss_family == AF_INET6); 1053 } 1054 1055 static bool 1056 posix_sock_is_ipv4(struct spdk_sock *_sock) 1057 { 1058 struct spdk_posix_sock *sock = __posix_sock(_sock); 1059 struct sockaddr_storage sa; 1060 socklen_t salen; 1061 int rc; 1062 1063 assert(sock != NULL); 1064 1065 memset(&sa, 0, sizeof sa); 1066 salen = sizeof sa; 1067 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1068 if (rc != 0) { 1069 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1070 return false; 1071 } 1072 1073 return (sa.ss_family == AF_INET); 1074 } 1075 1076 static bool 1077 posix_sock_is_connected(struct spdk_sock *_sock) 1078 { 1079 struct spdk_posix_sock *sock = __posix_sock(_sock); 1080 uint8_t byte; 1081 int rc; 1082 1083 rc = recv(sock->fd, &byte, 1, MSG_PEEK); 1084 if (rc == 0) { 1085 return false; 1086 } 1087 1088 if (rc < 0) { 1089 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1090 return true; 1091 } 1092 1093 return false; 1094 } 1095 1096 return true; 1097 } 1098 1099 static int 1100 posix_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id) 1101 { 1102 int rc = -1; 1103 1104 if (!g_spdk_posix_sock_impl_opts.enable_placement_id) { 1105 return rc; 1106 } 1107 1108 #if defined(SO_INCOMING_NAPI_ID) 1109 struct spdk_posix_sock *sock = __posix_sock(_sock); 1110 socklen_t salen = sizeof(int); 1111 1112 rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen); 1113 if (rc != 0) { 1114 SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno); 1115 } 1116 1117 #endif 1118 return rc; 1119 } 1120 1121 static struct spdk_sock_group_impl * 1122 posix_sock_group_impl_create(void) 1123 { 1124 struct spdk_posix_sock_group_impl *group_impl; 1125 int fd; 1126 1127 #if defined(__linux__) 1128 fd = epoll_create1(0); 1129 #elif defined(__FreeBSD__) 1130 fd = kqueue(); 1131 #endif 1132 if (fd == -1) { 1133 return NULL; 1134 } 1135 1136 group_impl = calloc(1, sizeof(*group_impl)); 1137 if (group_impl == NULL) { 1138 SPDK_ERRLOG("group_impl allocation failed\n"); 1139 close(fd); 1140 return NULL; 1141 } 1142 1143 group_impl->fd = fd; 1144 TAILQ_INIT(&group_impl->pending_recv); 1145 1146 return &group_impl->base; 1147 } 1148 1149 static int 1150 posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1151 { 1152 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1153 struct spdk_posix_sock *sock = __posix_sock(_sock); 1154 int rc; 1155 1156 #if defined(__linux__) 1157 struct epoll_event event; 1158 1159 memset(&event, 0, sizeof(event)); 1160 /* EPOLLERR is always on even if we don't set it, but be explicit for clarity */ 1161 event.events = EPOLLIN | EPOLLERR; 1162 event.data.ptr = sock; 1163 1164 rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event); 1165 #elif defined(__FreeBSD__) 1166 struct kevent event; 1167 struct timespec ts = {0}; 1168 1169 EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock); 1170 1171 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1172 #endif 1173 1174 /* switched from another polling group due to scheduling */ 1175 if (spdk_unlikely(sock->recv_pipe != NULL && 1176 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1177 assert(sock->pending_recv == false); 1178 sock->pending_recv = true; 1179 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1180 } 1181 1182 return rc; 1183 } 1184 1185 static int 1186 posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1187 { 1188 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1189 struct spdk_posix_sock *sock = __posix_sock(_sock); 1190 int rc; 1191 1192 if (sock->recv_pipe != NULL) { 1193 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) { 1194 TAILQ_REMOVE(&group->pending_recv, sock, link); 1195 sock->pending_recv = false; 1196 } 1197 assert(sock->pending_recv == false); 1198 } 1199 1200 #if defined(__linux__) 1201 struct epoll_event event; 1202 1203 /* Event parameter is ignored but some old kernel version still require it. */ 1204 rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event); 1205 #elif defined(__FreeBSD__) 1206 struct kevent event; 1207 struct timespec ts = {0}; 1208 1209 EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 1210 1211 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1212 if (rc == 0 && event.flags & EV_ERROR) { 1213 rc = -1; 1214 errno = event.data; 1215 } 1216 #endif 1217 1218 spdk_sock_abort_requests(_sock); 1219 1220 return rc; 1221 } 1222 1223 static int 1224 posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 1225 struct spdk_sock **socks) 1226 { 1227 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1228 struct spdk_sock *sock, *tmp; 1229 int num_events, i, rc; 1230 struct spdk_posix_sock *psock, *ptmp; 1231 #if defined(__linux__) 1232 struct epoll_event events[MAX_EVENTS_PER_POLL]; 1233 #elif defined(__FreeBSD__) 1234 struct kevent events[MAX_EVENTS_PER_POLL]; 1235 struct timespec ts = {0}; 1236 #endif 1237 1238 /* This must be a TAILQ_FOREACH_SAFE because while flushing, 1239 * a completion callback could remove the sock from the 1240 * group. */ 1241 TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { 1242 rc = _sock_flush(sock); 1243 if (rc) { 1244 spdk_sock_abort_requests(sock); 1245 } 1246 } 1247 1248 #if defined(__linux__) 1249 num_events = epoll_wait(group->fd, events, max_events, 0); 1250 #elif defined(__FreeBSD__) 1251 num_events = kevent(group->fd, NULL, 0, events, max_events, &ts); 1252 #endif 1253 1254 if (num_events == -1) { 1255 return -1; 1256 } else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) { 1257 uint8_t byte; 1258 1259 sock = TAILQ_FIRST(&_group->socks); 1260 psock = __posix_sock(sock); 1261 /* a recv is done here to busy poll the queue associated with 1262 * first socket in list and potentially reap incoming data. 1263 */ 1264 if (psock->so_priority) { 1265 recv(psock->fd, &byte, 1, MSG_PEEK); 1266 } 1267 } 1268 1269 for (i = 0; i < num_events; i++) { 1270 #if defined(__linux__) 1271 sock = events[i].data.ptr; 1272 psock = __posix_sock(sock); 1273 1274 #ifdef SPDK_ZEROCOPY 1275 if (events[i].events & EPOLLERR) { 1276 rc = _sock_check_zcopy(sock); 1277 /* If the socket was closed or removed from 1278 * the group in response to a send ack, don't 1279 * add it to the array here. */ 1280 if (rc || sock->cb_fn == NULL) { 1281 continue; 1282 } 1283 } 1284 #endif 1285 if ((events[i].events & EPOLLIN) == 0) { 1286 continue; 1287 } 1288 1289 #elif defined(__FreeBSD__) 1290 sock = events[i].udata; 1291 psock = __posix_sock(sock); 1292 #endif 1293 1294 /* If the socket does not already have recv pending, add it now */ 1295 if (!psock->pending_recv) { 1296 psock->pending_recv = true; 1297 TAILQ_INSERT_TAIL(&group->pending_recv, psock, link); 1298 } 1299 } 1300 1301 num_events = 0; 1302 1303 TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) { 1304 if (num_events == max_events) { 1305 break; 1306 } 1307 1308 socks[num_events++] = &psock->base; 1309 } 1310 1311 /* Cycle the pending_recv list so that each time we poll things aren't 1312 * in the same order. */ 1313 for (i = 0; i < num_events; i++) { 1314 psock = __posix_sock(socks[i]); 1315 1316 TAILQ_REMOVE(&group->pending_recv, psock, link); 1317 1318 if (psock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(psock->recv_pipe) == 0) { 1319 psock->pending_recv = false; 1320 } else { 1321 TAILQ_INSERT_TAIL(&group->pending_recv, psock, link); 1322 } 1323 1324 } 1325 1326 return num_events; 1327 } 1328 1329 static int 1330 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group) 1331 { 1332 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1333 int rc; 1334 1335 rc = close(group->fd); 1336 free(group); 1337 return rc; 1338 } 1339 1340 static int 1341 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 1342 { 1343 if (!opts || !len) { 1344 errno = EINVAL; 1345 return -1; 1346 } 1347 1348 #define FIELD_OK(field) \ 1349 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len 1350 1351 #define GET_FIELD(field) \ 1352 if (FIELD_OK(field)) { \ 1353 opts->field = g_spdk_posix_sock_impl_opts.field; \ 1354 } 1355 1356 GET_FIELD(recv_buf_size); 1357 GET_FIELD(send_buf_size); 1358 GET_FIELD(enable_recv_pipe); 1359 GET_FIELD(enable_zerocopy_send); 1360 GET_FIELD(enable_quickack); 1361 GET_FIELD(enable_placement_id); 1362 1363 #undef GET_FIELD 1364 #undef FIELD_OK 1365 1366 *len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts)); 1367 return 0; 1368 } 1369 1370 static int 1371 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 1372 { 1373 if (!opts) { 1374 errno = EINVAL; 1375 return -1; 1376 } 1377 1378 #define FIELD_OK(field) \ 1379 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len 1380 1381 #define SET_FIELD(field) \ 1382 if (FIELD_OK(field)) { \ 1383 g_spdk_posix_sock_impl_opts.field = opts->field; \ 1384 } 1385 1386 SET_FIELD(recv_buf_size); 1387 SET_FIELD(send_buf_size); 1388 SET_FIELD(enable_recv_pipe); 1389 SET_FIELD(enable_zerocopy_send); 1390 SET_FIELD(enable_quickack); 1391 SET_FIELD(enable_placement_id); 1392 1393 #undef SET_FIELD 1394 #undef FIELD_OK 1395 1396 return 0; 1397 } 1398 1399 1400 static struct spdk_net_impl g_posix_net_impl = { 1401 .name = "posix", 1402 .getaddr = posix_sock_getaddr, 1403 .connect = posix_sock_connect, 1404 .listen = posix_sock_listen, 1405 .accept = posix_sock_accept, 1406 .close = posix_sock_close, 1407 .recv = posix_sock_recv, 1408 .readv = posix_sock_readv, 1409 .writev = posix_sock_writev, 1410 .writev_async = posix_sock_writev_async, 1411 .flush = posix_sock_flush, 1412 .set_recvlowat = posix_sock_set_recvlowat, 1413 .set_recvbuf = posix_sock_set_recvbuf, 1414 .set_sendbuf = posix_sock_set_sendbuf, 1415 .is_ipv6 = posix_sock_is_ipv6, 1416 .is_ipv4 = posix_sock_is_ipv4, 1417 .is_connected = posix_sock_is_connected, 1418 .get_placement_id = posix_sock_get_placement_id, 1419 .group_impl_create = posix_sock_group_impl_create, 1420 .group_impl_add_sock = posix_sock_group_impl_add_sock, 1421 .group_impl_remove_sock = posix_sock_group_impl_remove_sock, 1422 .group_impl_poll = posix_sock_group_impl_poll, 1423 .group_impl_close = posix_sock_group_impl_close, 1424 .get_opts = posix_sock_impl_get_opts, 1425 .set_opts = posix_sock_impl_set_opts, 1426 }; 1427 1428 SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY); 1429