1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. All rights reserved. 5 * Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #if defined(__linux__) 37 #include <sys/epoll.h> 38 #include <linux/errqueue.h> 39 #elif defined(__FreeBSD__) 40 #include <sys/event.h> 41 #endif 42 43 #include "spdk/log.h" 44 #include "spdk/pipe.h" 45 #include "spdk/sock.h" 46 #include "spdk/util.h" 47 #include "spdk_internal/sock.h" 48 49 #define MAX_TMPBUF 1024 50 #define PORTNUMLEN 32 51 52 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 53 #define SPDK_ZEROCOPY 54 #endif 55 56 struct spdk_posix_sock { 57 struct spdk_sock base; 58 int fd; 59 60 uint32_t sendmsg_idx; 61 62 struct spdk_pipe *recv_pipe; 63 void *recv_buf; 64 int recv_buf_sz; 65 bool pending_recv; 66 bool zcopy; 67 int so_priority; 68 69 TAILQ_ENTRY(spdk_posix_sock) link; 70 }; 71 72 struct spdk_posix_sock_group_impl { 73 struct spdk_sock_group_impl base; 74 int fd; 75 TAILQ_HEAD(, spdk_posix_sock) pending_recv; 76 }; 77 78 static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = { 79 .recv_buf_size = MIN_SO_RCVBUF_SIZE, 80 .send_buf_size = MIN_SO_SNDBUF_SIZE, 81 .enable_recv_pipe = true, 82 .enable_zerocopy_send = false, 83 .enable_quickack = false, 84 .enable_placement_id = false, 85 }; 86 87 static int 88 get_addr_str(struct sockaddr *sa, char *host, size_t hlen) 89 { 90 const char *result = NULL; 91 92 if (sa == NULL || host == NULL) { 93 return -1; 94 } 95 96 switch (sa->sa_family) { 97 case AF_INET: 98 result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr), 99 host, hlen); 100 break; 101 case AF_INET6: 102 result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr), 103 host, hlen); 104 break; 105 default: 106 break; 107 } 108 109 if (result != NULL) { 110 return 0; 111 } else { 112 return -1; 113 } 114 } 115 116 #define __posix_sock(sock) (struct spdk_posix_sock *)sock 117 #define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group 118 119 static int 120 posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 121 char *caddr, int clen, uint16_t *cport) 122 { 123 struct spdk_posix_sock *sock = __posix_sock(_sock); 124 struct sockaddr_storage sa; 125 socklen_t salen; 126 int rc; 127 128 assert(sock != NULL); 129 130 memset(&sa, 0, sizeof sa); 131 salen = sizeof sa; 132 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 133 if (rc != 0) { 134 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 135 return -1; 136 } 137 138 switch (sa.ss_family) { 139 case AF_UNIX: 140 /* Acceptable connection types that don't have IPs */ 141 return 0; 142 case AF_INET: 143 case AF_INET6: 144 /* Code below will get IP addresses */ 145 break; 146 default: 147 /* Unsupported socket family */ 148 return -1; 149 } 150 151 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); 152 if (rc != 0) { 153 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 154 return -1; 155 } 156 157 if (sport) { 158 if (sa.ss_family == AF_INET) { 159 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 160 } else if (sa.ss_family == AF_INET6) { 161 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 162 } 163 } 164 165 memset(&sa, 0, sizeof sa); 166 salen = sizeof sa; 167 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); 168 if (rc != 0) { 169 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); 170 return -1; 171 } 172 173 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); 174 if (rc != 0) { 175 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 176 return -1; 177 } 178 179 if (cport) { 180 if (sa.ss_family == AF_INET) { 181 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 182 } else if (sa.ss_family == AF_INET6) { 183 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 184 } 185 } 186 187 return 0; 188 } 189 190 enum posix_sock_create_type { 191 SPDK_SOCK_CREATE_LISTEN, 192 SPDK_SOCK_CREATE_CONNECT, 193 }; 194 195 static int 196 posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz) 197 { 198 uint8_t *new_buf; 199 struct spdk_pipe *new_pipe; 200 struct iovec siov[2]; 201 struct iovec diov[2]; 202 int sbytes; 203 ssize_t bytes; 204 205 if (sock->recv_buf_sz == sz) { 206 return 0; 207 } 208 209 /* If the new size is 0, just free the pipe */ 210 if (sz == 0) { 211 spdk_pipe_destroy(sock->recv_pipe); 212 free(sock->recv_buf); 213 sock->recv_pipe = NULL; 214 sock->recv_buf = NULL; 215 return 0; 216 } else if (sz < MIN_SOCK_PIPE_SIZE) { 217 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 218 return -1; 219 } 220 221 /* Round up to next 64 byte multiple */ 222 new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t)); 223 if (!new_buf) { 224 SPDK_ERRLOG("socket recv buf allocation failed\n"); 225 return -ENOMEM; 226 } 227 228 new_pipe = spdk_pipe_create(new_buf, sz + 1); 229 if (new_pipe == NULL) { 230 SPDK_ERRLOG("socket pipe allocation failed\n"); 231 free(new_buf); 232 return -ENOMEM; 233 } 234 235 if (sock->recv_pipe != NULL) { 236 /* Pull all of the data out of the old pipe */ 237 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 238 if (sbytes > sz) { 239 /* Too much data to fit into the new pipe size */ 240 spdk_pipe_destroy(new_pipe); 241 free(new_buf); 242 return -EINVAL; 243 } 244 245 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 246 assert(sbytes == sz); 247 248 bytes = spdk_iovcpy(siov, 2, diov, 2); 249 spdk_pipe_writer_advance(new_pipe, bytes); 250 251 spdk_pipe_destroy(sock->recv_pipe); 252 free(sock->recv_buf); 253 } 254 255 sock->recv_buf_sz = sz; 256 sock->recv_buf = new_buf; 257 sock->recv_pipe = new_pipe; 258 259 return 0; 260 } 261 262 static int 263 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 264 { 265 struct spdk_posix_sock *sock = __posix_sock(_sock); 266 int rc; 267 268 assert(sock != NULL); 269 270 if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) { 271 rc = posix_sock_alloc_pipe(sock, sz); 272 if (rc) { 273 return rc; 274 } 275 } 276 277 /* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */ 278 if (sz < MIN_SO_RCVBUF_SIZE) { 279 sz = MIN_SO_RCVBUF_SIZE; 280 } 281 282 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 283 if (rc < 0) { 284 return rc; 285 } 286 287 return 0; 288 } 289 290 static int 291 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 292 { 293 struct spdk_posix_sock *sock = __posix_sock(_sock); 294 int rc; 295 296 assert(sock != NULL); 297 298 if (sz < MIN_SO_SNDBUF_SIZE) { 299 sz = MIN_SO_SNDBUF_SIZE; 300 } 301 302 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 303 if (rc < 0) { 304 return rc; 305 } 306 307 return 0; 308 } 309 310 static struct spdk_posix_sock * 311 posix_sock_alloc(int fd, bool enable_zero_copy) 312 { 313 struct spdk_posix_sock *sock; 314 #if defined(SPDK_ZEROCOPY) || defined(__linux__) 315 int flag; 316 int rc; 317 #endif 318 319 sock = calloc(1, sizeof(*sock)); 320 if (sock == NULL) { 321 SPDK_ERRLOG("sock allocation failed\n"); 322 return NULL; 323 } 324 325 sock->fd = fd; 326 327 #if defined(SPDK_ZEROCOPY) 328 flag = 1; 329 330 if (!enable_zero_copy || !g_spdk_posix_sock_impl_opts.enable_zerocopy_send) { 331 return sock; 332 } 333 334 /* Try to turn on zero copy sends */ 335 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)); 336 if (rc == 0) { 337 sock->zcopy = true; 338 } 339 #endif 340 341 #if defined(__linux__) 342 flag = 1; 343 344 if (g_spdk_posix_sock_impl_opts.enable_quickack) { 345 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag)); 346 if (rc != 0) { 347 SPDK_ERRLOG("quickack was failed to set\n"); 348 } 349 } 350 #endif 351 352 return sock; 353 } 354 355 static bool 356 sock_is_loopback(int fd) 357 { 358 struct ifaddrs *addrs, *tmp; 359 struct sockaddr_storage sa = {}; 360 socklen_t salen; 361 struct ifreq ifr = {}; 362 char ip_addr[256], ip_addr_tmp[256]; 363 int rc; 364 bool is_loopback = false; 365 366 salen = sizeof(sa); 367 rc = getsockname(fd, (struct sockaddr *)&sa, &salen); 368 if (rc != 0) { 369 return is_loopback; 370 } 371 372 memset(ip_addr, 0, sizeof(ip_addr)); 373 rc = get_addr_str((struct sockaddr *)&sa, ip_addr, sizeof(ip_addr)); 374 if (rc != 0) { 375 return is_loopback; 376 } 377 378 getifaddrs(&addrs); 379 for (tmp = addrs; tmp != NULL; tmp = tmp->ifa_next) { 380 if (tmp->ifa_addr && (tmp->ifa_flags & IFF_UP) && 381 (tmp->ifa_addr->sa_family == sa.ss_family)) { 382 memset(ip_addr_tmp, 0, sizeof(ip_addr_tmp)); 383 rc = get_addr_str(tmp->ifa_addr, ip_addr_tmp, sizeof(ip_addr_tmp)); 384 if (rc != 0) { 385 continue; 386 } 387 388 if (strncmp(ip_addr, ip_addr_tmp, sizeof(ip_addr)) == 0) { 389 memcpy(ifr.ifr_name, tmp->ifa_name, sizeof(ifr.ifr_name)); 390 ioctl(fd, SIOCGIFFLAGS, &ifr); 391 if (ifr.ifr_flags & IFF_LOOPBACK) { 392 is_loopback = true; 393 } 394 goto end; 395 } 396 } 397 } 398 399 end: 400 freeifaddrs(addrs); 401 return is_loopback; 402 } 403 404 static struct spdk_sock * 405 posix_sock_create(const char *ip, int port, 406 enum posix_sock_create_type type, 407 struct spdk_sock_opts *opts) 408 { 409 struct spdk_posix_sock *sock; 410 char buf[MAX_TMPBUF]; 411 char portnum[PORTNUMLEN]; 412 char *p; 413 struct addrinfo hints, *res, *res0; 414 int fd, flag; 415 int val = 1; 416 int rc, sz; 417 bool enable_zero_copy = true; 418 419 assert(opts != NULL); 420 421 if (ip == NULL) { 422 return NULL; 423 } 424 if (ip[0] == '[') { 425 snprintf(buf, sizeof(buf), "%s", ip + 1); 426 p = strchr(buf, ']'); 427 if (p != NULL) { 428 *p = '\0'; 429 } 430 ip = (const char *) &buf[0]; 431 } 432 433 snprintf(portnum, sizeof portnum, "%d", port); 434 memset(&hints, 0, sizeof hints); 435 hints.ai_family = PF_UNSPEC; 436 hints.ai_socktype = SOCK_STREAM; 437 hints.ai_flags = AI_NUMERICSERV; 438 hints.ai_flags |= AI_PASSIVE; 439 hints.ai_flags |= AI_NUMERICHOST; 440 rc = getaddrinfo(ip, portnum, &hints, &res0); 441 if (rc != 0) { 442 SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc); 443 return NULL; 444 } 445 446 /* try listen */ 447 fd = -1; 448 for (res = res0; res != NULL; res = res->ai_next) { 449 retry: 450 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 451 if (fd < 0) { 452 /* error */ 453 continue; 454 } 455 456 sz = g_spdk_posix_sock_impl_opts.recv_buf_size; 457 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 458 if (rc) { 459 /* Not fatal */ 460 } 461 462 sz = g_spdk_posix_sock_impl_opts.send_buf_size; 463 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 464 if (rc) { 465 /* Not fatal */ 466 } 467 468 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 469 if (rc != 0) { 470 close(fd); 471 /* error */ 472 continue; 473 } 474 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 475 if (rc != 0) { 476 close(fd); 477 /* error */ 478 continue; 479 } 480 481 #if defined(SO_PRIORITY) 482 if (opts->priority) { 483 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); 484 if (rc != 0) { 485 close(fd); 486 /* error */ 487 continue; 488 } 489 } 490 #endif 491 492 if (res->ai_family == AF_INET6) { 493 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 494 if (rc != 0) { 495 close(fd); 496 /* error */ 497 continue; 498 } 499 } 500 501 if (type == SPDK_SOCK_CREATE_LISTEN) { 502 rc = bind(fd, res->ai_addr, res->ai_addrlen); 503 if (rc != 0) { 504 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 505 switch (errno) { 506 case EINTR: 507 /* interrupted? */ 508 close(fd); 509 goto retry; 510 case EADDRNOTAVAIL: 511 SPDK_ERRLOG("IP address %s not available. " 512 "Verify IP address in config file " 513 "and make sure setup script is " 514 "run before starting spdk app.\n", ip); 515 /* FALLTHROUGH */ 516 default: 517 /* try next family */ 518 close(fd); 519 fd = -1; 520 continue; 521 } 522 } 523 /* bind OK */ 524 rc = listen(fd, 512); 525 if (rc != 0) { 526 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 527 close(fd); 528 fd = -1; 529 break; 530 } 531 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 532 rc = connect(fd, res->ai_addr, res->ai_addrlen); 533 if (rc != 0) { 534 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 535 /* try next family */ 536 close(fd); 537 fd = -1; 538 continue; 539 } 540 } 541 542 flag = fcntl(fd, F_GETFL); 543 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 544 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 545 close(fd); 546 fd = -1; 547 break; 548 } 549 break; 550 } 551 freeaddrinfo(res0); 552 553 if (fd < 0) { 554 return NULL; 555 } 556 557 /* Only enable zero copy for non-loopback sockets. */ 558 enable_zero_copy = opts->zcopy && !sock_is_loopback(fd); 559 560 sock = posix_sock_alloc(fd, enable_zero_copy); 561 if (sock == NULL) { 562 SPDK_ERRLOG("sock allocation failed\n"); 563 close(fd); 564 return NULL; 565 } 566 567 if (opts != NULL) { 568 sock->so_priority = opts->priority; 569 } 570 return &sock->base; 571 } 572 573 static struct spdk_sock * 574 posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 575 { 576 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); 577 } 578 579 static struct spdk_sock * 580 posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 581 { 582 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); 583 } 584 585 static struct spdk_sock * 586 posix_sock_accept(struct spdk_sock *_sock) 587 { 588 struct spdk_posix_sock *sock = __posix_sock(_sock); 589 struct sockaddr_storage sa; 590 socklen_t salen; 591 int rc, fd; 592 struct spdk_posix_sock *new_sock; 593 int flag; 594 595 memset(&sa, 0, sizeof(sa)); 596 salen = sizeof(sa); 597 598 assert(sock != NULL); 599 600 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 601 602 if (rc == -1) { 603 return NULL; 604 } 605 606 fd = rc; 607 608 flag = fcntl(fd, F_GETFL); 609 if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) { 610 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 611 close(fd); 612 return NULL; 613 } 614 615 #if defined(SO_PRIORITY) 616 /* The priority is not inherited, so call this function again */ 617 if (sock->base.opts.priority) { 618 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); 619 if (rc != 0) { 620 close(fd); 621 return NULL; 622 } 623 } 624 #endif 625 626 /* Inherit the zero copy feature from the listen socket */ 627 new_sock = posix_sock_alloc(fd, sock->zcopy); 628 if (new_sock == NULL) { 629 close(fd); 630 return NULL; 631 } 632 new_sock->so_priority = sock->base.opts.priority; 633 634 return &new_sock->base; 635 } 636 637 static int 638 posix_sock_close(struct spdk_sock *_sock) 639 { 640 struct spdk_posix_sock *sock = __posix_sock(_sock); 641 642 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 643 644 /* If the socket fails to close, the best choice is to 645 * leak the fd but continue to free the rest of the sock 646 * memory. */ 647 close(sock->fd); 648 649 spdk_pipe_destroy(sock->recv_pipe); 650 free(sock->recv_buf); 651 free(sock); 652 653 return 0; 654 } 655 656 #ifdef SPDK_ZEROCOPY 657 static int 658 _sock_check_zcopy(struct spdk_sock *sock) 659 { 660 struct spdk_posix_sock *psock = __posix_sock(sock); 661 struct msghdr msgh = {}; 662 uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)]; 663 ssize_t rc; 664 struct sock_extended_err *serr; 665 struct cmsghdr *cm; 666 uint32_t idx; 667 struct spdk_sock_request *req, *treq; 668 bool found; 669 670 msgh.msg_control = buf; 671 msgh.msg_controllen = sizeof(buf); 672 673 while (true) { 674 rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE); 675 676 if (rc < 0) { 677 if (errno == EWOULDBLOCK || errno == EAGAIN) { 678 return 0; 679 } 680 681 if (!TAILQ_EMPTY(&sock->pending_reqs)) { 682 SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n"); 683 } else { 684 SPDK_WARNLOG("Recvmsg yielded an error!\n"); 685 } 686 return 0; 687 } 688 689 cm = CMSG_FIRSTHDR(&msgh); 690 if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) { 691 SPDK_WARNLOG("Unexpected cmsg level or type!\n"); 692 return 0; 693 } 694 695 serr = (struct sock_extended_err *)CMSG_DATA(cm); 696 if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { 697 SPDK_WARNLOG("Unexpected extended error origin\n"); 698 return 0; 699 } 700 701 /* Most of the time, the pending_reqs array is in the exact 702 * order we need such that all of the requests to complete are 703 * in order, in the front. It is guaranteed that all requests 704 * belonging to the same sendmsg call are sequential, so once 705 * we encounter one match we can stop looping as soon as a 706 * non-match is found. 707 */ 708 for (idx = serr->ee_info; idx <= serr->ee_data; idx++) { 709 found = false; 710 TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) { 711 if (req->internal.offset == idx) { 712 found = true; 713 714 rc = spdk_sock_request_put(sock, req, 0); 715 if (rc < 0) { 716 return rc; 717 } 718 719 } else if (found) { 720 break; 721 } 722 } 723 724 } 725 } 726 727 return 0; 728 } 729 #endif 730 731 static int 732 _sock_flush(struct spdk_sock *sock) 733 { 734 struct spdk_posix_sock *psock = __posix_sock(sock); 735 struct msghdr msg = {}; 736 int flags; 737 struct iovec iovs[IOV_BATCH_SIZE]; 738 int iovcnt; 739 int retval; 740 struct spdk_sock_request *req; 741 int i; 742 ssize_t rc; 743 unsigned int offset; 744 size_t len; 745 746 /* Can't flush from within a callback or we end up with recursive calls */ 747 if (sock->cb_cnt > 0) { 748 return 0; 749 } 750 751 iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL); 752 753 if (iovcnt == 0) { 754 return 0; 755 } 756 757 /* Perform the vectored write */ 758 msg.msg_iov = iovs; 759 msg.msg_iovlen = iovcnt; 760 #ifdef SPDK_ZEROCOPY 761 if (psock->zcopy) { 762 flags = MSG_ZEROCOPY; 763 } else 764 #endif 765 { 766 flags = 0; 767 } 768 rc = sendmsg(psock->fd, &msg, flags); 769 if (rc <= 0) { 770 if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) { 771 return 0; 772 } 773 return rc; 774 } 775 776 /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the 777 * req->internal.offset, so sendmsg_idx should not be zero */ 778 if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) { 779 psock->sendmsg_idx = 1; 780 } else { 781 psock->sendmsg_idx++; 782 } 783 784 /* Consume the requests that were actually written */ 785 req = TAILQ_FIRST(&sock->queued_reqs); 786 while (req) { 787 offset = req->internal.offset; 788 789 for (i = 0; i < req->iovcnt; i++) { 790 /* Advance by the offset first */ 791 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 792 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 793 continue; 794 } 795 796 /* Calculate the remaining length of this element */ 797 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 798 799 if (len > (size_t)rc) { 800 /* This element was partially sent. */ 801 req->internal.offset += rc; 802 return 0; 803 } 804 805 offset = 0; 806 req->internal.offset += len; 807 rc -= len; 808 } 809 810 /* Handled a full request. */ 811 spdk_sock_request_pend(sock, req); 812 813 if (!psock->zcopy) { 814 /* The sendmsg syscall above isn't currently asynchronous, 815 * so it's already done. */ 816 retval = spdk_sock_request_put(sock, req, 0); 817 if (retval) { 818 break; 819 } 820 } else { 821 /* Re-use the offset field to hold the sendmsg call index. The 822 * index is 0 based, so subtract one here because we've already 823 * incremented above. */ 824 req->internal.offset = psock->sendmsg_idx - 1; 825 } 826 827 if (rc == 0) { 828 break; 829 } 830 831 req = TAILQ_FIRST(&sock->queued_reqs); 832 } 833 834 return 0; 835 } 836 837 static int 838 posix_sock_flush(struct spdk_sock *_sock) 839 { 840 return _sock_flush(_sock); 841 } 842 843 static ssize_t 844 posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt) 845 { 846 struct iovec siov[2]; 847 int sbytes; 848 ssize_t bytes; 849 struct spdk_posix_sock_group_impl *group; 850 851 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 852 if (sbytes < 0) { 853 errno = EINVAL; 854 return -1; 855 } else if (sbytes == 0) { 856 errno = EAGAIN; 857 return -1; 858 } 859 860 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 861 862 if (bytes == 0) { 863 /* The only way this happens is if diov is 0 length */ 864 errno = EINVAL; 865 return -1; 866 } 867 868 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 869 870 /* If we drained the pipe, take it off the level-triggered list */ 871 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 872 group = __posix_group_impl(sock->base.group_impl); 873 TAILQ_REMOVE(&group->pending_recv, sock, link); 874 sock->pending_recv = false; 875 } 876 877 return bytes; 878 } 879 880 static inline ssize_t 881 posix_sock_read(struct spdk_posix_sock *sock) 882 { 883 struct iovec iov[2]; 884 int bytes; 885 struct spdk_posix_sock_group_impl *group; 886 887 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 888 889 if (bytes > 0) { 890 bytes = readv(sock->fd, iov, 2); 891 if (bytes > 0) { 892 spdk_pipe_writer_advance(sock->recv_pipe, bytes); 893 if (sock->base.group_impl) { 894 group = __posix_group_impl(sock->base.group_impl); 895 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 896 sock->pending_recv = true; 897 } 898 } 899 } 900 901 return bytes; 902 } 903 904 static ssize_t 905 posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 906 { 907 struct spdk_posix_sock *sock = __posix_sock(_sock); 908 int rc, i; 909 size_t len; 910 911 if (sock->recv_pipe == NULL) { 912 return readv(sock->fd, iov, iovcnt); 913 } 914 915 len = 0; 916 for (i = 0; i < iovcnt; i++) { 917 len += iov[i].iov_len; 918 } 919 920 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 921 /* If the user is receiving a sufficiently large amount of data, 922 * receive directly to their buffers. */ 923 if (len >= MIN_SOCK_PIPE_SIZE) { 924 return readv(sock->fd, iov, iovcnt); 925 } 926 927 /* Otherwise, do a big read into our pipe */ 928 rc = posix_sock_read(sock); 929 if (rc <= 0) { 930 return rc; 931 } 932 } 933 934 return posix_sock_recv_from_pipe(sock, iov, iovcnt); 935 } 936 937 static ssize_t 938 posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 939 { 940 struct iovec iov[1]; 941 942 iov[0].iov_base = buf; 943 iov[0].iov_len = len; 944 945 return posix_sock_readv(sock, iov, 1); 946 } 947 948 static ssize_t 949 posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 950 { 951 struct spdk_posix_sock *sock = __posix_sock(_sock); 952 int rc; 953 954 /* In order to process a writev, we need to flush any asynchronous writes 955 * first. */ 956 rc = _sock_flush(_sock); 957 if (rc < 0) { 958 return rc; 959 } 960 961 if (!TAILQ_EMPTY(&_sock->queued_reqs)) { 962 /* We weren't able to flush all requests */ 963 errno = EAGAIN; 964 return -1; 965 } 966 967 return writev(sock->fd, iov, iovcnt); 968 } 969 970 static void 971 posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) 972 { 973 int rc; 974 975 spdk_sock_request_queue(sock, req); 976 977 /* If there are a sufficient number queued, just flush them out immediately. */ 978 if (sock->queued_iovcnt >= IOV_BATCH_SIZE) { 979 rc = _sock_flush(sock); 980 if (rc) { 981 spdk_sock_abort_requests(sock); 982 } 983 } 984 } 985 986 static int 987 posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 988 { 989 struct spdk_posix_sock *sock = __posix_sock(_sock); 990 int val; 991 int rc; 992 993 assert(sock != NULL); 994 995 val = nbytes; 996 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 997 if (rc != 0) { 998 return -1; 999 } 1000 return 0; 1001 } 1002 1003 static bool 1004 posix_sock_is_ipv6(struct spdk_sock *_sock) 1005 { 1006 struct spdk_posix_sock *sock = __posix_sock(_sock); 1007 struct sockaddr_storage sa; 1008 socklen_t salen; 1009 int rc; 1010 1011 assert(sock != NULL); 1012 1013 memset(&sa, 0, sizeof sa); 1014 salen = sizeof sa; 1015 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1016 if (rc != 0) { 1017 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1018 return false; 1019 } 1020 1021 return (sa.ss_family == AF_INET6); 1022 } 1023 1024 static bool 1025 posix_sock_is_ipv4(struct spdk_sock *_sock) 1026 { 1027 struct spdk_posix_sock *sock = __posix_sock(_sock); 1028 struct sockaddr_storage sa; 1029 socklen_t salen; 1030 int rc; 1031 1032 assert(sock != NULL); 1033 1034 memset(&sa, 0, sizeof sa); 1035 salen = sizeof sa; 1036 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1037 if (rc != 0) { 1038 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1039 return false; 1040 } 1041 1042 return (sa.ss_family == AF_INET); 1043 } 1044 1045 static bool 1046 posix_sock_is_connected(struct spdk_sock *_sock) 1047 { 1048 struct spdk_posix_sock *sock = __posix_sock(_sock); 1049 uint8_t byte; 1050 int rc; 1051 1052 rc = recv(sock->fd, &byte, 1, MSG_PEEK); 1053 if (rc == 0) { 1054 return false; 1055 } 1056 1057 if (rc < 0) { 1058 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1059 return true; 1060 } 1061 1062 return false; 1063 } 1064 1065 return true; 1066 } 1067 1068 static int 1069 posix_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id) 1070 { 1071 int rc = -1; 1072 1073 if (!g_spdk_posix_sock_impl_opts.enable_placement_id) { 1074 return rc; 1075 } 1076 1077 #if defined(SO_INCOMING_NAPI_ID) 1078 struct spdk_posix_sock *sock = __posix_sock(_sock); 1079 socklen_t salen = sizeof(int); 1080 1081 rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen); 1082 if (rc != 0) { 1083 SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno); 1084 } 1085 1086 #endif 1087 return rc; 1088 } 1089 1090 static struct spdk_sock_group_impl * 1091 posix_sock_group_impl_create(void) 1092 { 1093 struct spdk_posix_sock_group_impl *group_impl; 1094 int fd; 1095 1096 #if defined(__linux__) 1097 fd = epoll_create1(0); 1098 #elif defined(__FreeBSD__) 1099 fd = kqueue(); 1100 #endif 1101 if (fd == -1) { 1102 return NULL; 1103 } 1104 1105 group_impl = calloc(1, sizeof(*group_impl)); 1106 if (group_impl == NULL) { 1107 SPDK_ERRLOG("group_impl allocation failed\n"); 1108 close(fd); 1109 return NULL; 1110 } 1111 1112 group_impl->fd = fd; 1113 TAILQ_INIT(&group_impl->pending_recv); 1114 1115 return &group_impl->base; 1116 } 1117 1118 static int 1119 posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1120 { 1121 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1122 struct spdk_posix_sock *sock = __posix_sock(_sock); 1123 int rc; 1124 1125 #if defined(__linux__) 1126 struct epoll_event event; 1127 1128 memset(&event, 0, sizeof(event)); 1129 /* EPOLLERR is always on even if we don't set it, but be explicit for clarity */ 1130 event.events = EPOLLIN | EPOLLERR; 1131 event.data.ptr = sock; 1132 1133 rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event); 1134 #elif defined(__FreeBSD__) 1135 struct kevent event; 1136 struct timespec ts = {0}; 1137 1138 EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock); 1139 1140 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1141 #endif 1142 1143 /* switched from another polling group due to scheduling */ 1144 if (spdk_unlikely(sock->recv_pipe != NULL && 1145 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1146 assert(sock->pending_recv == false); 1147 sock->pending_recv = true; 1148 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1149 } 1150 1151 return rc; 1152 } 1153 1154 static int 1155 posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1156 { 1157 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1158 struct spdk_posix_sock *sock = __posix_sock(_sock); 1159 int rc; 1160 1161 if (sock->recv_pipe != NULL) { 1162 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) { 1163 TAILQ_REMOVE(&group->pending_recv, sock, link); 1164 sock->pending_recv = false; 1165 } 1166 assert(sock->pending_recv == false); 1167 } 1168 1169 #if defined(__linux__) 1170 struct epoll_event event; 1171 1172 /* Event parameter is ignored but some old kernel version still require it. */ 1173 rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event); 1174 #elif defined(__FreeBSD__) 1175 struct kevent event; 1176 struct timespec ts = {0}; 1177 1178 EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 1179 1180 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1181 if (rc == 0 && event.flags & EV_ERROR) { 1182 rc = -1; 1183 errno = event.data; 1184 } 1185 #endif 1186 1187 spdk_sock_abort_requests(_sock); 1188 1189 return rc; 1190 } 1191 1192 static int 1193 posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 1194 struct spdk_sock **socks) 1195 { 1196 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1197 struct spdk_sock *sock, *tmp; 1198 int num_events, i, rc; 1199 struct spdk_posix_sock *psock, *ptmp; 1200 #if defined(__linux__) 1201 struct epoll_event events[MAX_EVENTS_PER_POLL]; 1202 #elif defined(__FreeBSD__) 1203 struct kevent events[MAX_EVENTS_PER_POLL]; 1204 struct timespec ts = {0}; 1205 #endif 1206 1207 /* This must be a TAILQ_FOREACH_SAFE because while flushing, 1208 * a completion callback could remove the sock from the 1209 * group. */ 1210 TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { 1211 rc = _sock_flush(sock); 1212 if (rc) { 1213 spdk_sock_abort_requests(sock); 1214 } 1215 } 1216 1217 #if defined(__linux__) 1218 num_events = epoll_wait(group->fd, events, max_events, 0); 1219 #elif defined(__FreeBSD__) 1220 num_events = kevent(group->fd, NULL, 0, events, max_events, &ts); 1221 #endif 1222 1223 if (num_events == -1) { 1224 return -1; 1225 } else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) { 1226 uint8_t byte; 1227 1228 sock = TAILQ_FIRST(&_group->socks); 1229 psock = __posix_sock(sock); 1230 /* a recv is done here to busy poll the queue associated with 1231 * first socket in list and potentially reap incoming data. 1232 */ 1233 if (psock->so_priority) { 1234 recv(psock->fd, &byte, 1, MSG_PEEK); 1235 } 1236 } 1237 1238 for (i = 0; i < num_events; i++) { 1239 #if defined(__linux__) 1240 sock = events[i].data.ptr; 1241 psock = __posix_sock(sock); 1242 1243 #ifdef SPDK_ZEROCOPY 1244 if (events[i].events & EPOLLERR) { 1245 rc = _sock_check_zcopy(sock); 1246 /* If the socket was closed or removed from 1247 * the group in response to a send ack, don't 1248 * add it to the array here. */ 1249 if (rc || sock->cb_fn == NULL) { 1250 continue; 1251 } 1252 } 1253 #endif 1254 if ((events[i].events & EPOLLIN) == 0) { 1255 continue; 1256 } 1257 1258 #elif defined(__FreeBSD__) 1259 sock = events[i].udata; 1260 psock = __posix_sock(sock); 1261 #endif 1262 1263 /* If the socket does not already have recv pending, add it now */ 1264 if (!psock->pending_recv) { 1265 psock->pending_recv = true; 1266 TAILQ_INSERT_TAIL(&group->pending_recv, psock, link); 1267 } 1268 } 1269 1270 num_events = 0; 1271 1272 TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) { 1273 if (num_events == max_events) { 1274 break; 1275 } 1276 1277 socks[num_events++] = &psock->base; 1278 } 1279 1280 /* Cycle the pending_recv list so that each time we poll things aren't 1281 * in the same order. */ 1282 for (i = 0; i < num_events; i++) { 1283 psock = __posix_sock(socks[i]); 1284 1285 TAILQ_REMOVE(&group->pending_recv, psock, link); 1286 1287 if (psock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(psock->recv_pipe) == 0) { 1288 psock->pending_recv = false; 1289 } else { 1290 TAILQ_INSERT_TAIL(&group->pending_recv, psock, link); 1291 } 1292 1293 } 1294 1295 return num_events; 1296 } 1297 1298 static int 1299 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group) 1300 { 1301 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1302 int rc; 1303 1304 rc = close(group->fd); 1305 free(group); 1306 return rc; 1307 } 1308 1309 static int 1310 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 1311 { 1312 if (!opts || !len) { 1313 errno = EINVAL; 1314 return -1; 1315 } 1316 memset(opts, 0, *len); 1317 1318 #define FIELD_OK(field) \ 1319 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len 1320 1321 #define GET_FIELD(field) \ 1322 if (FIELD_OK(field)) { \ 1323 opts->field = g_spdk_posix_sock_impl_opts.field; \ 1324 } 1325 1326 GET_FIELD(recv_buf_size); 1327 GET_FIELD(send_buf_size); 1328 GET_FIELD(enable_recv_pipe); 1329 GET_FIELD(enable_zerocopy_send); 1330 GET_FIELD(enable_quickack); 1331 GET_FIELD(enable_placement_id); 1332 1333 #undef GET_FIELD 1334 #undef FIELD_OK 1335 1336 *len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts)); 1337 return 0; 1338 } 1339 1340 static int 1341 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 1342 { 1343 if (!opts) { 1344 errno = EINVAL; 1345 return -1; 1346 } 1347 1348 #define FIELD_OK(field) \ 1349 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len 1350 1351 #define SET_FIELD(field) \ 1352 if (FIELD_OK(field)) { \ 1353 g_spdk_posix_sock_impl_opts.field = opts->field; \ 1354 } 1355 1356 SET_FIELD(recv_buf_size); 1357 SET_FIELD(send_buf_size); 1358 SET_FIELD(enable_recv_pipe); 1359 SET_FIELD(enable_zerocopy_send); 1360 SET_FIELD(enable_quickack); 1361 SET_FIELD(enable_placement_id); 1362 1363 #undef SET_FIELD 1364 #undef FIELD_OK 1365 1366 return 0; 1367 } 1368 1369 1370 static struct spdk_net_impl g_posix_net_impl = { 1371 .name = "posix", 1372 .getaddr = posix_sock_getaddr, 1373 .connect = posix_sock_connect, 1374 .listen = posix_sock_listen, 1375 .accept = posix_sock_accept, 1376 .close = posix_sock_close, 1377 .recv = posix_sock_recv, 1378 .readv = posix_sock_readv, 1379 .writev = posix_sock_writev, 1380 .writev_async = posix_sock_writev_async, 1381 .flush = posix_sock_flush, 1382 .set_recvlowat = posix_sock_set_recvlowat, 1383 .set_recvbuf = posix_sock_set_recvbuf, 1384 .set_sendbuf = posix_sock_set_sendbuf, 1385 .is_ipv6 = posix_sock_is_ipv6, 1386 .is_ipv4 = posix_sock_is_ipv4, 1387 .is_connected = posix_sock_is_connected, 1388 .get_placement_id = posix_sock_get_placement_id, 1389 .group_impl_create = posix_sock_group_impl_create, 1390 .group_impl_add_sock = posix_sock_group_impl_add_sock, 1391 .group_impl_remove_sock = posix_sock_group_impl_remove_sock, 1392 .group_impl_poll = posix_sock_group_impl_poll, 1393 .group_impl_close = posix_sock_group_impl_close, 1394 .get_opts = posix_sock_impl_get_opts, 1395 .set_opts = posix_sock_impl_set_opts, 1396 }; 1397 1398 SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY); 1399