/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation. All rights reserved.
 *   Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#if defined(__linux__)
#include <sys/epoll.h>
#include <linux/errqueue.h>
#elif defined(__FreeBSD__)
#include <sys/event.h>
#endif

#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/util.h"
#include "spdk/likely.h"
#include "spdk_internal/sock.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32
#define MIN_SO_RCVBUF_SIZE (2 * 1024 * 1024)
#define MIN_SO_SNDBUF_SIZE (2 * 1024 * 1024)
#define IOV_BATCH_SIZE 64

#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif

struct spdk_posix_sock {
	struct spdk_sock	base;
	int			fd;

	uint32_t		sendmsg_idx;
	bool			zcopy;

	struct spdk_pipe	*recv_pipe;
	void			*recv_buf;
	int			recv_buf_sz;
	bool			pending_recv;
	int			so_priority;

	TAILQ_ENTRY(spdk_posix_sock)	link;
};

struct spdk_posix_sock_group_impl {
	struct spdk_sock_group_impl	base;
	int				fd;
	TAILQ_HEAD(, spdk_posix_sock)	pending_recv;
};

static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE
};

static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const char *result = NULL;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	switch (sa->sa_family) {
	case AF_INET:
		result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
				   host, hlen);
		break;
	case AF_INET6:
		result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
				   host, hlen);
		break;
	default:
		break;
	}

	if (result != NULL) {
		return 0;
	} else {
		return -1;
	}
}

#define __posix_sock(sock) (struct spdk_posix_sock *)sock
#define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group
static int
posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("get_addr_str() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("get_addr_str() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum posix_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};

static int
posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	assert(sock != NULL);

	rc = posix_sock_alloc_pipe(sock, sz);
	if (rc) {
		return rc;
	}

	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
	if (sz < MIN_SO_RCVBUF_SIZE) {
		sz = MIN_SO_RCVBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static int
posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (sz < MIN_SO_SNDBUF_SIZE) {
		sz = MIN_SO_SNDBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static struct spdk_posix_sock *
posix_sock_alloc(int fd, bool enable_zero_copy)
{
	struct spdk_posix_sock *sock;
#ifdef SPDK_ZEROCOPY
	int rc;
	int flag;
#endif

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;

#ifdef SPDK_ZEROCOPY
	if (!enable_zero_copy) {
		return sock;
	}

	/* Try to turn on zero copy sends */
	flag = 1;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
	if (rc == 0) {
		sock->zcopy = true;
	}
#endif

	return sock;
}

static bool
sock_is_loopback(int fd)
{
	struct ifaddrs *addrs, *tmp;
	struct sockaddr_storage sa = {};
	socklen_t salen;
	struct ifreq ifr = {};
	char ip_addr[256], ip_addr_tmp[256];
	int rc;
	bool is_loopback = false;

	salen = sizeof(sa);
	rc = getsockname(fd, (struct sockaddr *)&sa, &salen);
	if (rc != 0) {
		return is_loopback;
	}

	memset(ip_addr, 0, sizeof(ip_addr));
	rc = get_addr_str((struct sockaddr *)&sa, ip_addr, sizeof(ip_addr));
	if (rc != 0) {
		return is_loopback;
	}

	getifaddrs(&addrs);
	for (tmp = addrs; tmp != NULL; tmp = tmp->ifa_next) {
		if (tmp->ifa_addr && (tmp->ifa_flags & IFF_UP) &&
		    (tmp->ifa_addr->sa_family == sa.ss_family)) {
			memset(ip_addr_tmp, 0, sizeof(ip_addr_tmp));
			rc = get_addr_str(tmp->ifa_addr, ip_addr_tmp, sizeof(ip_addr_tmp));
			if (rc != 0) {
				continue;
			}

			if (strncmp(ip_addr, ip_addr_tmp, sizeof(ip_addr)) == 0) {
				memcpy(ifr.ifr_name, tmp->ifa_name, sizeof(ifr.ifr_name));
				ioctl(fd, SIOCGIFFLAGS, &ifr);
				if (ifr.ifr_flags & IFF_LOOPBACK) {
					is_loopback = true;
				}
				goto end;
			}
		}
	}

end:
	freeifaddrs(addrs);
	return is_loopback;
}

static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_posix_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc, sz;
	bool enable_zero_copy = true;

	if (ip == NULL) {
		return NULL;
	}
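	/* An IPv6 literal may be passed in bracketed form, e.g. "[2001:db8::1]";
	 * strip the brackets here so that getaddrinfo() below (called with
	 * AI_NUMERICHOST) sees a bare numeric address. */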
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		sz = g_spdk_posix_sock_impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		sz = g_spdk_posix_sock_impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}
#endif

		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

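	/* MSG_ZEROCOPY only pays off when the kernel can actually avoid copying the
	 * payload; for loopback traffic the data is generally copied anyway, so the
	 * completion-notification overhead would be pure cost. Client (connect)
	 * sockets do not use it at all yet, as noted below. */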
	if (type == SPDK_SOCK_CREATE_LISTEN) {
		/* Only enable zero copy for non-loopback sockets. */
		enable_zero_copy = !sock_is_loopback(fd);
	} else if (type == SPDK_SOCK_CREATE_CONNECT) {
		/* Disable zero copy for client sockets until support is added */
		enable_zero_copy = false;
	}

	sock = posix_sock_alloc(fd, enable_zero_copy);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	if (opts != NULL) {
		sock->so_priority = opts->priority;
	}
	return &sock->base;
}

static struct spdk_sock *
posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}

static struct spdk_sock *
posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}

static struct spdk_sock *
posix_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc, fd;
	struct spdk_posix_sock *new_sock;
	int flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	/* Inherit the zero copy feature from the listen socket */
	new_sock = posix_sock_alloc(fd, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}
	new_sock->so_priority = sock->base.opts.priority;

	return &new_sock->base;
}

static int
posix_sock_close(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);

	assert(TAILQ_EMPTY(&_sock->pending_reqs));

	/* If the socket fails to close, the best choice is to
	 * leak the fd but continue to free the rest of the sock
	 * memory. */
	close(sock->fd);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	free(sock);

	return 0;
}

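/*
 * Reap MSG_ZEROCOPY send completions. Each zero-copy sendmsg() gets a sequence
 * number (sendmsg_idx), and the kernel reports finished sends on the socket
 * error queue as SO_EE_ORIGIN_ZEROCOPY messages covering the index range
 * [ee_info, ee_data]. Pending requests whose saved index falls in that range
 * are completed here.
 */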
#ifdef SPDK_ZEROCOPY
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	msgh.msg_control = buf;
	msgh.msg_controllen = sizeof(buf);

	while (true) {
		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (req->internal.offset == idx) {
					found = true;

					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}

				} else if (found) {
					break;
				}
			}

		}
	}

	return 0;
}
#endif

static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc;
	unsigned int offset;
	size_t len;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = 0;
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Consume any offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
			iovcnt++;

			offset = 0;

			if (iovcnt >= IOV_BATCH_SIZE) {
				break;
			}
		}

		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		req = TAILQ_NEXT(req, internal.link);
	}

	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
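	/* With MSG_ZEROCOPY the kernel may transmit directly from the request
	 * buffers, so they must remain untouched until a completion arrives on the
	 * error queue (see _sock_check_zcopy()). sendmsg_idx is the sequence number
	 * used to match those completions back to requests. */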
#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY;
	} else
#endif
	{
		flags = 0;
	}
	rc = sendmsg(psock->fd, &msg, flags);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	psock->sendmsg_idx++;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(sock, req);

		if (!psock->zcopy) {
			/* The sendmsg syscall above isn't currently asynchronous,
			 * so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return 0;
}

static int
posix_sock_flush(struct spdk_sock *_sock)
{
	return _sock_flush(_sock);
}

static ssize_t
posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_posix_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __posix_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}

static inline ssize_t
posix_sock_read(struct spdk_posix_sock *sock)
{
	struct iovec iov[2];
	int bytes;
	struct spdk_posix_sock_group_impl *group;

	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes > 0) {
		bytes = readv(sock->fd, iov, 2);
		if (bytes > 0) {
			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
			if (sock->base.group_impl) {
				group = __posix_group_impl(sock->base.group_impl);
				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				sock->pending_recv = true;
			}
		}
	}

	return bytes;
}

static ssize_t
posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc, i;
	size_t len;
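	/* When a recv_pipe is configured, small reads are served from data already
	 * buffered there and the pipe is refilled with one large readv() to reduce
	 * syscall count. Reads of at least MIN_SOCK_PIPE_SIZE bypass the pipe and go
	 * straight into the caller's buffers to avoid the extra copy. */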
	if (sock->recv_pipe == NULL) {
		return readv(sock->fd, iov, iovcnt);
	}

	len = 0;
	for (i = 0; i < iovcnt; i++) {
		len += iov[i].iov_len;
	}

	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		if (len >= MIN_SOCK_PIPE_SIZE) {
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = posix_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return posix_sock_readv(sock, iov, 1);
}

static ssize_t
posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	/* In order to process a writev, we need to flush any asynchronous writes
	 * first. */
	rc = _sock_flush(_sock);
	if (rc < 0) {
		return rc;
	}

	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
		/* We weren't able to flush all requests */
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}

static void
posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	int rc;

	spdk_sock_request_queue(sock, req);

	/* If there are a sufficient number queued, just flush them out immediately. */
	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}
}

static int
posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

static bool
posix_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

static bool
posix_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

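/*
 * Probe the connection with a one-byte MSG_PEEK recv(): a return of 0 means the
 * peer performed an orderly shutdown, while EAGAIN/EWOULDBLOCK on this
 * non-blocking socket means the connection is alive but has no data pending.
 */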
static bool
posix_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

static int
posix_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
{
	int rc = -1;

#if defined(SO_INCOMING_NAPI_ID)
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	socklen_t salen = sizeof(int);

	rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
	}

#endif
	return rc;
}

static struct spdk_sock_group_impl *
posix_sock_group_impl_create(void)
{
	struct spdk_posix_sock_group_impl *group_impl;
	int fd;

#if defined(__linux__)
	fd = epoll_create1(0);
#elif defined(__FreeBSD__)
	fd = kqueue();
#endif
	if (fd == -1) {
		return NULL;
	}

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		close(fd);
		return NULL;
	}

	group_impl->fd = fd;
	TAILQ_INIT(&group_impl->pending_recv);

	return &group_impl->base;
}

static int
posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

#if defined(__linux__)
	struct epoll_event event;

	memset(&event, 0, sizeof(event));
	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
	event.events = EPOLLIN | EPOLLERR;
	event.data.ptr = sock;

	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
#elif defined(__FreeBSD__)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
#endif

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		assert(sock->pending_recv == false);
		sock->pending_recv = true;
		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
	}

	return rc;
}

static int
posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	if (sock->recv_pipe != NULL) {
		if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) {
			TAILQ_REMOVE(&group->pending_recv, sock, link);
			sock->pending_recv = false;
		}
		assert(sock->pending_recv == false);
	}

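	/* Data still sitting in the recv_pipe keeps the socket on this group's
	 * pending_recv list, so it must be unlinked before the socket leaves the
	 * group. If the socket later joins another group,
	 * posix_sock_group_impl_add_sock() re-inserts it (the "switched from
	 * another polling group" check). */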
#if defined(__linux__)
	struct epoll_event event;

	/* Event parameter is ignored but some old kernel versions still require it. */
	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
#elif defined(__FreeBSD__)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
	if (rc == 0 && event.flags & EV_ERROR) {
		rc = -1;
		errno = event.data;
	}
#endif

	spdk_sock_abort_requests(_sock);

	return rc;
}

static int
posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_sock *sock, *tmp;
	int num_events, i, rc;
	struct spdk_posix_sock *psock, *ptmp;
#if defined(__linux__)
	struct epoll_event events[MAX_EVENTS_PER_POLL];
#elif defined(__FreeBSD__)
	struct kevent events[MAX_EVENTS_PER_POLL];
	struct timespec ts = {0};
#endif

	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
	 * a completion callback could remove the sock from the
	 * group. */
	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}

#if defined(__linux__)
	num_events = epoll_wait(group->fd, events, max_events, 0);
#elif defined(__FreeBSD__)
	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
#endif

	if (num_events == -1) {
		return -1;
	} else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
		uint8_t byte;

		sock = TAILQ_FIRST(&_group->socks);
		psock = __posix_sock(sock);
		/* A recv is done here to busy poll the queue associated with the
		 * first socket in the list and potentially reap incoming data.
		 */
		if (psock->so_priority) {
			recv(psock->fd, &byte, 1, MSG_PEEK);
		}
	}

	for (i = 0; i < num_events; i++) {
#if defined(__linux__)
		sock = events[i].data.ptr;
		psock = __posix_sock(sock);

#ifdef SPDK_ZEROCOPY
		if (events[i].events & EPOLLERR) {
			rc = _sock_check_zcopy(sock);
			/* If the socket was closed or removed from
			 * the group in response to a send ack, don't
			 * add it to the array here. */
			if (rc || sock->cb_fn == NULL) {
				continue;
			}
		}
#endif
		if ((events[i].events & EPOLLIN) == 0) {
			continue;
		}

#elif defined(__FreeBSD__)
		sock = events[i].udata;
		psock = __posix_sock(sock);
#endif

		/* If the socket does not already have recv pending, add it now */
		if (!psock->pending_recv) {
			psock->pending_recv = true;
			TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
		}
	}

	num_events = 0;

	TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) {
		if (num_events == max_events) {
			break;
		}

		socks[num_events++] = &psock->base;
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. */
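	/* Sockets that still hold data in their recv_pipe are re-queued at the tail,
	 * so sockets that were not returned this time move up and are picked first
	 * on the next poll. */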
	for (i = 0; i < num_events; i++) {
		psock = __posix_sock(socks[i]);

		TAILQ_REMOVE(&group->pending_recv, psock, link);

		if (psock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(psock->recv_pipe) == 0) {
			psock->pending_recv = false;
		} else {
			TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
		}

	}

	return num_events;
}

static int
posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	int rc;

	rc = close(group->fd);
	free(group);
	return rc;
}

static int
posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
{
	if (!opts || !len) {
		errno = EINVAL;
		return -1;
	}

#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len

	if (FIELD_OK(recv_buf_size)) {
		opts->recv_buf_size = g_spdk_posix_sock_impl_opts.recv_buf_size;
	}
	if (FIELD_OK(send_buf_size)) {
		opts->send_buf_size = g_spdk_posix_sock_impl_opts.send_buf_size;
	}

#undef FIELD_OK

	*len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts));
	return 0;
}

static int
posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
{
	if (!opts) {
		errno = EINVAL;
		return -1;
	}

#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len

	if (FIELD_OK(recv_buf_size)) {
		g_spdk_posix_sock_impl_opts.recv_buf_size = opts->recv_buf_size;
	}
	if (FIELD_OK(send_buf_size)) {
		g_spdk_posix_sock_impl_opts.send_buf_size = opts->send_buf_size;
	}

#undef FIELD_OK

	return 0;
}

static struct spdk_net_impl g_posix_net_impl = {
	.name			= "posix",
	.getaddr		= posix_sock_getaddr,
	.connect		= posix_sock_connect,
	.listen			= posix_sock_listen,
	.accept			= posix_sock_accept,
	.close			= posix_sock_close,
	.recv			= posix_sock_recv,
	.readv			= posix_sock_readv,
	.writev			= posix_sock_writev,
	.writev_async		= posix_sock_writev_async,
	.flush			= posix_sock_flush,
	.set_recvlowat		= posix_sock_set_recvlowat,
	.set_recvbuf		= posix_sock_set_recvbuf,
	.set_sendbuf		= posix_sock_set_sendbuf,
	.is_ipv6		= posix_sock_is_ipv6,
	.is_ipv4		= posix_sock_is_ipv4,
	.is_connected		= posix_sock_is_connected,
	.get_placement_id	= posix_sock_get_placement_id,
	.group_impl_create	= posix_sock_group_impl_create,
	.group_impl_add_sock	= posix_sock_group_impl_add_sock,
	.group_impl_remove_sock	= posix_sock_group_impl_remove_sock,
	.group_impl_poll	= posix_sock_group_impl_poll,
	.group_impl_close	= posix_sock_group_impl_close,
	.get_opts		= posix_sock_impl_get_opts,
	.set_opts		= posix_sock_impl_set_opts,
};

SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);