1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #if defined(__linux__) 37 #include <sys/epoll.h> 38 #include <linux/errqueue.h> 39 #elif defined(__FreeBSD__) 40 #include <sys/event.h> 41 #endif 42 43 #include "spdk/log.h" 44 #include "spdk/pipe.h" 45 #include "spdk/sock.h" 46 #include "spdk/util.h" 47 #include "spdk/likely.h" 48 #include "spdk_internal/sock.h" 49 50 #define MAX_TMPBUF 1024 51 #define PORTNUMLEN 32 52 #define SO_RCVBUF_SIZE (2 * 1024 * 1024) 53 #define SO_SNDBUF_SIZE (2 * 1024 * 1024) 54 #define IOV_BATCH_SIZE 64 55 56 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 57 #define SPDK_ZEROCOPY 58 #endif 59 60 struct spdk_posix_sock { 61 struct spdk_sock base; 62 int fd; 63 64 uint32_t sendmsg_idx; 65 bool zcopy; 66 67 struct spdk_pipe *recv_pipe; 68 void *recv_buf; 69 int recv_buf_sz; 70 bool pending_recv; 71 72 TAILQ_ENTRY(spdk_posix_sock) link; 73 }; 74 75 struct spdk_posix_sock_group_impl { 76 struct spdk_sock_group_impl base; 77 int fd; 78 TAILQ_HEAD(, spdk_posix_sock) pending_recv; 79 }; 80 81 static int 82 get_addr_str(struct sockaddr *sa, char *host, size_t hlen) 83 { 84 const char *result = NULL; 85 86 if (sa == NULL || host == NULL) { 87 return -1; 88 } 89 90 switch (sa->sa_family) { 91 case AF_INET: 92 result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr), 93 host, hlen); 94 break; 95 case AF_INET6: 96 result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr), 97 host, hlen); 98 break; 99 default: 100 break; 101 } 102 103 if (result != NULL) { 104 return 0; 105 } else { 106 return -1; 107 } 108 } 109 110 #define __posix_sock(sock) (struct spdk_posix_sock *)sock 111 #define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group 112 113 static int 114 spdk_posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 115 char *caddr, int clen, uint16_t *cport) 116 { 117 struct spdk_posix_sock *sock = __posix_sock(_sock); 118 struct sockaddr_storage sa; 119 socklen_t salen; 120 int rc; 121 122 assert(sock != NULL); 123 124 memset(&sa, 0, sizeof sa); 125 salen = sizeof sa; 126 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 127 if (rc != 0) { 128 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 129 return -1; 130 } 131 132 switch (sa.ss_family) { 133 case AF_UNIX: 134 /* Acceptable connection types that don't have IPs */ 135 return 0; 136 case AF_INET: 137 case AF_INET6: 138 /* Code below will get IP addresses */ 139 break; 140 default: 141 /* Unsupported socket family */ 142 return -1; 143 } 144 145 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); 146 if (rc != 0) { 147 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 148 return -1; 149 } 150 151 if (sport) { 152 if (sa.ss_family == AF_INET) { 153 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 154 } else if (sa.ss_family == AF_INET6) { 155 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 156 } 157 } 158 159 memset(&sa, 0, sizeof sa); 160 salen = sizeof sa; 161 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); 162 if (rc != 0) { 163 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); 164 return -1; 165 } 166 167 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); 168 if (rc != 0) { 169 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 170 return -1; 171 } 172 173 if (cport) { 174 if (sa.ss_family == AF_INET) { 175 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 176 } else if (sa.ss_family == AF_INET6) { 177 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 178 } 179 } 180 181 return 0; 182 } 183 184 enum spdk_posix_sock_create_type { 185 SPDK_SOCK_CREATE_LISTEN, 186 SPDK_SOCK_CREATE_CONNECT, 187 }; 188 189 static int 190 spdk_posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz) 191 { 192 uint8_t *new_buf; 193 struct spdk_pipe *new_pipe; 194 struct iovec siov[2]; 195 struct iovec diov[2]; 196 int sbytes; 197 ssize_t bytes; 198 199 if (sock->recv_buf_sz == sz) { 200 return 0; 201 } 202 203 /* If the new size is 0, just free the pipe */ 204 if (sz == 0) { 205 spdk_pipe_destroy(sock->recv_pipe); 206 free(sock->recv_buf); 207 sock->recv_pipe = NULL; 208 sock->recv_buf = NULL; 209 return 0; 210 } 211 212 /* Round up to next 64 byte multiple */ 213 new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t)); 214 if (!new_buf) { 215 SPDK_ERRLOG("socket recv buf allocation failed\n"); 216 return -ENOMEM; 217 } 218 219 new_pipe = spdk_pipe_create(new_buf, sz + 1); 220 if (new_pipe == NULL) { 221 SPDK_ERRLOG("socket pipe allocation failed\n"); 222 free(new_buf); 223 return -ENOMEM; 224 } 225 226 if (sock->recv_pipe != NULL) { 227 /* Pull all of the data out of the old pipe */ 228 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 229 if (sbytes > sz) { 230 /* Too much data to fit into the new pipe size */ 231 spdk_pipe_destroy(new_pipe); 232 free(new_buf); 233 return -EINVAL; 234 } 235 236 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 237 assert(sbytes == sz); 238 239 bytes = spdk_iovcpy(siov, 2, diov, 2); 240 spdk_pipe_writer_advance(new_pipe, bytes); 241 242 spdk_pipe_destroy(sock->recv_pipe); 243 free(sock->recv_buf); 244 } 245 246 sock->recv_buf_sz = sz; 247 sock->recv_buf = new_buf; 248 sock->recv_pipe = new_pipe; 249 250 return 0; 251 } 252 253 static int 254 spdk_posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 255 { 256 struct spdk_posix_sock *sock = __posix_sock(_sock); 257 int rc; 258 259 assert(sock != NULL); 260 261 #ifndef __aarch64__ 262 /* On ARM systems, this buffering does not help. Skip it. */ 263 rc = spdk_posix_sock_alloc_pipe(sock, sz); 264 if (rc) { 265 return rc; 266 } 267 #endif 268 269 /* Set kernel buffer size to be at least SO_RCVBUF_SIZE */ 270 if (sz < SO_RCVBUF_SIZE) { 271 sz = SO_RCVBUF_SIZE; 272 } 273 274 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 275 if (rc < 0) { 276 return rc; 277 } 278 279 return 0; 280 } 281 282 static int 283 spdk_posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 284 { 285 struct spdk_posix_sock *sock = __posix_sock(_sock); 286 int rc; 287 288 assert(sock != NULL); 289 290 if (sz < SO_SNDBUF_SIZE) { 291 sz = SO_SNDBUF_SIZE; 292 } 293 294 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 295 if (rc < 0) { 296 return rc; 297 } 298 299 return 0; 300 } 301 302 static struct spdk_posix_sock * 303 _spdk_posix_sock_alloc(int fd) 304 { 305 struct spdk_posix_sock *sock; 306 #ifdef SPDK_ZEROCOPY 307 int rc; 308 int flag; 309 #endif 310 311 sock = calloc(1, sizeof(*sock)); 312 if (sock == NULL) { 313 SPDK_ERRLOG("sock allocation failed\n"); 314 return NULL; 315 } 316 317 sock->fd = fd; 318 319 #ifdef SPDK_ZEROCOPY 320 /* Try to turn on zero copy sends */ 321 flag = 1; 322 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)); 323 if (rc == 0) { 324 sock->zcopy = true; 325 } 326 #endif 327 328 return sock; 329 } 330 331 static struct spdk_sock * 332 spdk_posix_sock_create(const char *ip, int port, 333 enum spdk_posix_sock_create_type type, 334 struct spdk_sock_opts *opts) 335 { 336 struct spdk_posix_sock *sock; 337 char buf[MAX_TMPBUF]; 338 char portnum[PORTNUMLEN]; 339 char *p; 340 struct addrinfo hints, *res, *res0; 341 int fd, flag; 342 int val = 1; 343 int rc, sz; 344 345 if (ip == NULL) { 346 return NULL; 347 } 348 if (ip[0] == '[') { 349 snprintf(buf, sizeof(buf), "%s", ip + 1); 350 p = strchr(buf, ']'); 351 if (p != NULL) { 352 *p = '\0'; 353 } 354 ip = (const char *) &buf[0]; 355 } 356 357 snprintf(portnum, sizeof portnum, "%d", port); 358 memset(&hints, 0, sizeof hints); 359 hints.ai_family = PF_UNSPEC; 360 hints.ai_socktype = SOCK_STREAM; 361 hints.ai_flags = AI_NUMERICSERV; 362 hints.ai_flags |= AI_PASSIVE; 363 hints.ai_flags |= AI_NUMERICHOST; 364 rc = getaddrinfo(ip, portnum, &hints, &res0); 365 if (rc != 0) { 366 SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno); 367 return NULL; 368 } 369 370 /* try listen */ 371 fd = -1; 372 for (res = res0; res != NULL; res = res->ai_next) { 373 retry: 374 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 375 if (fd < 0) { 376 /* error */ 377 continue; 378 } 379 380 sz = SO_RCVBUF_SIZE; 381 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 382 if (rc) { 383 /* Not fatal */ 384 } 385 386 sz = SO_SNDBUF_SIZE; 387 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 388 if (rc) { 389 /* Not fatal */ 390 } 391 392 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 393 if (rc != 0) { 394 close(fd); 395 /* error */ 396 continue; 397 } 398 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 399 if (rc != 0) { 400 close(fd); 401 /* error */ 402 continue; 403 } 404 405 #if defined(SO_PRIORITY) 406 if (opts != NULL && opts->priority) { 407 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); 408 if (rc != 0) { 409 close(fd); 410 /* error */ 411 continue; 412 } 413 } 414 #endif 415 416 if (res->ai_family == AF_INET6) { 417 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 418 if (rc != 0) { 419 close(fd); 420 /* error */ 421 continue; 422 } 423 } 424 425 if (type == SPDK_SOCK_CREATE_LISTEN) { 426 rc = bind(fd, res->ai_addr, res->ai_addrlen); 427 if (rc != 0) { 428 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 429 switch (errno) { 430 case EINTR: 431 /* interrupted? */ 432 close(fd); 433 goto retry; 434 case EADDRNOTAVAIL: 435 SPDK_ERRLOG("IP address %s not available. " 436 "Verify IP address in config file " 437 "and make sure setup script is " 438 "run before starting spdk app.\n", ip); 439 /* FALLTHROUGH */ 440 default: 441 /* try next family */ 442 close(fd); 443 fd = -1; 444 continue; 445 } 446 } 447 /* bind OK */ 448 rc = listen(fd, 512); 449 if (rc != 0) { 450 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 451 close(fd); 452 fd = -1; 453 break; 454 } 455 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 456 rc = connect(fd, res->ai_addr, res->ai_addrlen); 457 if (rc != 0) { 458 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 459 /* try next family */ 460 close(fd); 461 fd = -1; 462 continue; 463 } 464 } 465 466 flag = fcntl(fd, F_GETFL); 467 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 468 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 469 close(fd); 470 fd = -1; 471 break; 472 } 473 break; 474 } 475 freeaddrinfo(res0); 476 477 if (fd < 0) { 478 return NULL; 479 } 480 481 sock = _spdk_posix_sock_alloc(fd); 482 if (sock == NULL) { 483 SPDK_ERRLOG("sock allocation failed\n"); 484 close(fd); 485 return NULL; 486 } 487 488 /* Disable zero copy for client sockets until support is added */ 489 if (type == SPDK_SOCK_CREATE_CONNECT) { 490 sock->zcopy = false; 491 } 492 493 return &sock->base; 494 } 495 496 static struct spdk_sock * 497 spdk_posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 498 { 499 return spdk_posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); 500 } 501 502 static struct spdk_sock * 503 spdk_posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 504 { 505 return spdk_posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); 506 } 507 508 static struct spdk_sock * 509 spdk_posix_sock_accept(struct spdk_sock *_sock) 510 { 511 struct spdk_posix_sock *sock = __posix_sock(_sock); 512 struct sockaddr_storage sa; 513 socklen_t salen; 514 int rc, fd; 515 struct spdk_posix_sock *new_sock; 516 int flag; 517 518 memset(&sa, 0, sizeof(sa)); 519 salen = sizeof(sa); 520 521 assert(sock != NULL); 522 523 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 524 525 if (rc == -1) { 526 return NULL; 527 } 528 529 fd = rc; 530 531 flag = fcntl(fd, F_GETFL); 532 if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) { 533 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 534 close(fd); 535 return NULL; 536 } 537 538 #if defined(SO_PRIORITY) 539 /* The priority is not inherited, so call this function again */ 540 if (sock->base.opts.priority) { 541 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); 542 if (rc != 0) { 543 close(fd); 544 return NULL; 545 } 546 } 547 #endif 548 549 new_sock = _spdk_posix_sock_alloc(fd); 550 if (new_sock == NULL) { 551 close(fd); 552 return NULL; 553 } 554 555 return &new_sock->base; 556 } 557 558 static int 559 spdk_posix_sock_close(struct spdk_sock *_sock) 560 { 561 struct spdk_posix_sock *sock = __posix_sock(_sock); 562 563 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 564 565 /* If the socket fails to close, the best choice is to 566 * leak the fd but continue to free the rest of the sock 567 * memory. */ 568 close(sock->fd); 569 570 spdk_pipe_destroy(sock->recv_pipe); 571 free(sock->recv_buf); 572 free(sock); 573 574 return 0; 575 } 576 577 #ifdef SPDK_ZEROCOPY 578 static int 579 _sock_check_zcopy(struct spdk_sock *sock) 580 { 581 struct spdk_posix_sock *psock = __posix_sock(sock); 582 struct msghdr msgh = {}; 583 uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)]; 584 ssize_t rc; 585 struct sock_extended_err *serr; 586 struct cmsghdr *cm; 587 uint32_t idx; 588 struct spdk_sock_request *req, *treq; 589 bool found; 590 591 msgh.msg_control = buf; 592 msgh.msg_controllen = sizeof(buf); 593 594 while (true) { 595 rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE); 596 597 if (rc < 0) { 598 if (errno == EWOULDBLOCK || errno == EAGAIN) { 599 return 0; 600 } 601 602 if (!TAILQ_EMPTY(&sock->pending_reqs)) { 603 SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n"); 604 } else { 605 SPDK_WARNLOG("Recvmsg yielded an error!\n"); 606 } 607 return 0; 608 } 609 610 cm = CMSG_FIRSTHDR(&msgh); 611 if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) { 612 SPDK_WARNLOG("Unexpected cmsg level or type!\n"); 613 return 0; 614 } 615 616 serr = (struct sock_extended_err *)CMSG_DATA(cm); 617 if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { 618 SPDK_WARNLOG("Unexpected extended error origin\n"); 619 return 0; 620 } 621 622 /* Most of the time, the pending_reqs array is in the exact 623 * order we need such that all of the requests to complete are 624 * in order, in the front. It is guaranteed that all requests 625 * belonging to the same sendmsg call are sequential, so once 626 * we encounter one match we can stop looping as soon as a 627 * non-match is found. 628 */ 629 for (idx = serr->ee_info; idx <= serr->ee_data; idx++) { 630 found = false; 631 TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) { 632 if (req->internal.offset == idx) { 633 found = true; 634 635 rc = spdk_sock_request_put(sock, req, 0); 636 if (rc < 0) { 637 return rc; 638 } 639 640 } else if (found) { 641 break; 642 } 643 } 644 645 } 646 } 647 648 return 0; 649 } 650 #endif 651 652 static int 653 _sock_flush(struct spdk_sock *sock) 654 { 655 struct spdk_posix_sock *psock = __posix_sock(sock); 656 struct msghdr msg = {}; 657 int flags; 658 struct iovec iovs[IOV_BATCH_SIZE]; 659 int iovcnt; 660 int retval; 661 struct spdk_sock_request *req; 662 int i; 663 ssize_t rc; 664 unsigned int offset; 665 size_t len; 666 667 /* Can't flush from within a callback or we end up with recursive calls */ 668 if (sock->cb_cnt > 0) { 669 return 0; 670 } 671 672 /* Gather an iov */ 673 iovcnt = 0; 674 req = TAILQ_FIRST(&sock->queued_reqs); 675 while (req) { 676 offset = req->internal.offset; 677 678 for (i = 0; i < req->iovcnt; i++) { 679 /* Consume any offset first */ 680 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 681 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 682 continue; 683 } 684 685 iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset; 686 iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 687 iovcnt++; 688 689 offset = 0; 690 691 if (iovcnt >= IOV_BATCH_SIZE) { 692 break; 693 } 694 } 695 696 if (iovcnt >= IOV_BATCH_SIZE) { 697 break; 698 } 699 700 req = TAILQ_NEXT(req, internal.link); 701 } 702 703 if (iovcnt == 0) { 704 return 0; 705 } 706 707 /* Perform the vectored write */ 708 msg.msg_iov = iovs; 709 msg.msg_iovlen = iovcnt; 710 #ifdef SPDK_ZEROCOPY 711 if (psock->zcopy) { 712 flags = MSG_ZEROCOPY; 713 } else 714 #endif 715 { 716 flags = 0; 717 } 718 rc = sendmsg(psock->fd, &msg, flags); 719 if (rc <= 0) { 720 if (errno == EAGAIN || errno == EWOULDBLOCK) { 721 return 0; 722 } 723 return rc; 724 } 725 726 psock->sendmsg_idx++; 727 728 /* Consume the requests that were actually written */ 729 req = TAILQ_FIRST(&sock->queued_reqs); 730 while (req) { 731 offset = req->internal.offset; 732 733 for (i = 0; i < req->iovcnt; i++) { 734 /* Advance by the offset first */ 735 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 736 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 737 continue; 738 } 739 740 /* Calculate the remaining length of this element */ 741 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 742 743 if (len > (size_t)rc) { 744 /* This element was partially sent. */ 745 req->internal.offset += rc; 746 return 0; 747 } 748 749 offset = 0; 750 req->internal.offset += len; 751 rc -= len; 752 } 753 754 /* Handled a full request. */ 755 spdk_sock_request_pend(sock, req); 756 757 if (!psock->zcopy) { 758 /* The sendmsg syscall above isn't currently asynchronous, 759 * so it's already done. */ 760 retval = spdk_sock_request_put(sock, req, 0); 761 if (retval) { 762 break; 763 } 764 } else { 765 /* Re-use the offset field to hold the sendmsg call index. The 766 * index is 0 based, so subtract one here because we've already 767 * incremented above. */ 768 req->internal.offset = psock->sendmsg_idx - 1; 769 } 770 771 if (rc == 0) { 772 break; 773 } 774 775 req = TAILQ_FIRST(&sock->queued_reqs); 776 } 777 778 return 0; 779 } 780 781 static int 782 spdk_posix_sock_flush(struct spdk_sock *_sock) 783 { 784 return _sock_flush(_sock); 785 } 786 787 static ssize_t 788 spdk_posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt) 789 { 790 struct iovec siov[2]; 791 int sbytes; 792 ssize_t bytes; 793 struct spdk_posix_sock_group_impl *group; 794 795 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 796 if (sbytes < 0) { 797 errno = EINVAL; 798 return -1; 799 } else if (sbytes == 0) { 800 errno = EAGAIN; 801 return -1; 802 } 803 804 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 805 806 if (bytes == 0) { 807 /* The only way this happens is if diov is 0 length */ 808 errno = EINVAL; 809 return -1; 810 } 811 812 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 813 814 /* If we drained the pipe, take it off the level-triggered list */ 815 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 816 group = __posix_group_impl(sock->base.group_impl); 817 TAILQ_REMOVE(&group->pending_recv, sock, link); 818 sock->pending_recv = false; 819 } 820 821 return bytes; 822 } 823 824 static inline ssize_t 825 _spdk_posix_sock_read(struct spdk_posix_sock *sock) 826 { 827 struct iovec iov[2]; 828 int bytes; 829 struct spdk_posix_sock_group_impl *group; 830 831 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 832 833 if (bytes > 0) { 834 bytes = readv(sock->fd, iov, 2); 835 if (bytes > 0) { 836 spdk_pipe_writer_advance(sock->recv_pipe, bytes); 837 if (sock->base.group_impl) { 838 group = __posix_group_impl(sock->base.group_impl); 839 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 840 sock->pending_recv = true; 841 } 842 } 843 } 844 845 return bytes; 846 } 847 848 static ssize_t 849 spdk_posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 850 { 851 struct spdk_posix_sock *sock = __posix_sock(_sock); 852 int rc, i; 853 size_t len; 854 855 if (sock->recv_pipe == NULL) { 856 return readv(sock->fd, iov, iovcnt); 857 } 858 859 len = 0; 860 for (i = 0; i < iovcnt; i++) { 861 len += iov[i].iov_len; 862 } 863 864 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 865 /* If the user is receiving a sufficiently large amount of data, 866 * receive directly to their buffers. */ 867 if (len >= 1024) { 868 return readv(sock->fd, iov, iovcnt); 869 } 870 871 /* Otherwise, do a big read into our pipe */ 872 rc = _spdk_posix_sock_read(sock); 873 if (rc <= 0) { 874 return rc; 875 } 876 } 877 878 return spdk_posix_sock_recv_from_pipe(sock, iov, iovcnt); 879 } 880 881 static ssize_t 882 spdk_posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 883 { 884 struct iovec iov[1]; 885 886 iov[0].iov_base = buf; 887 iov[0].iov_len = len; 888 889 return spdk_posix_sock_readv(sock, iov, 1); 890 } 891 892 static ssize_t 893 spdk_posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 894 { 895 struct spdk_posix_sock *sock = __posix_sock(_sock); 896 int rc; 897 898 /* In order to process a writev, we need to flush any asynchronous writes 899 * first. */ 900 rc = _sock_flush(_sock); 901 if (rc < 0) { 902 return rc; 903 } 904 905 if (!TAILQ_EMPTY(&_sock->queued_reqs)) { 906 /* We weren't able to flush all requests */ 907 errno = EAGAIN; 908 return -1; 909 } 910 911 return writev(sock->fd, iov, iovcnt); 912 } 913 914 static void 915 spdk_posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) 916 { 917 int rc; 918 919 spdk_sock_request_queue(sock, req); 920 921 /* If there are a sufficient number queued, just flush them out immediately. */ 922 if (sock->queued_iovcnt >= IOV_BATCH_SIZE) { 923 rc = _sock_flush(sock); 924 if (rc) { 925 spdk_sock_abort_requests(sock); 926 } 927 } 928 } 929 930 static int 931 spdk_posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 932 { 933 struct spdk_posix_sock *sock = __posix_sock(_sock); 934 int val; 935 int rc; 936 937 assert(sock != NULL); 938 939 val = nbytes; 940 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 941 if (rc != 0) { 942 return -1; 943 } 944 return 0; 945 } 946 947 static bool 948 spdk_posix_sock_is_ipv6(struct spdk_sock *_sock) 949 { 950 struct spdk_posix_sock *sock = __posix_sock(_sock); 951 struct sockaddr_storage sa; 952 socklen_t salen; 953 int rc; 954 955 assert(sock != NULL); 956 957 memset(&sa, 0, sizeof sa); 958 salen = sizeof sa; 959 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 960 if (rc != 0) { 961 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 962 return false; 963 } 964 965 return (sa.ss_family == AF_INET6); 966 } 967 968 static bool 969 spdk_posix_sock_is_ipv4(struct spdk_sock *_sock) 970 { 971 struct spdk_posix_sock *sock = __posix_sock(_sock); 972 struct sockaddr_storage sa; 973 socklen_t salen; 974 int rc; 975 976 assert(sock != NULL); 977 978 memset(&sa, 0, sizeof sa); 979 salen = sizeof sa; 980 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 981 if (rc != 0) { 982 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 983 return false; 984 } 985 986 return (sa.ss_family == AF_INET); 987 } 988 989 static bool 990 spdk_posix_sock_is_connected(struct spdk_sock *_sock) 991 { 992 struct spdk_posix_sock *sock = __posix_sock(_sock); 993 uint8_t byte; 994 int rc; 995 996 rc = recv(sock->fd, &byte, 1, MSG_PEEK); 997 if (rc == 0) { 998 return false; 999 } 1000 1001 if (rc < 0) { 1002 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1003 return true; 1004 } 1005 1006 return false; 1007 } 1008 1009 return true; 1010 } 1011 1012 static int 1013 spdk_posix_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id) 1014 { 1015 int rc = -1; 1016 1017 #if defined(SO_INCOMING_NAPI_ID) 1018 struct spdk_posix_sock *sock = __posix_sock(_sock); 1019 socklen_t salen = sizeof(int); 1020 1021 rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen); 1022 if (rc != 0) { 1023 SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno); 1024 } 1025 1026 #endif 1027 return rc; 1028 } 1029 1030 static struct spdk_sock_group_impl * 1031 spdk_posix_sock_group_impl_create(void) 1032 { 1033 struct spdk_posix_sock_group_impl *group_impl; 1034 int fd; 1035 1036 #if defined(__linux__) 1037 fd = epoll_create1(0); 1038 #elif defined(__FreeBSD__) 1039 fd = kqueue(); 1040 #endif 1041 if (fd == -1) { 1042 return NULL; 1043 } 1044 1045 group_impl = calloc(1, sizeof(*group_impl)); 1046 if (group_impl == NULL) { 1047 SPDK_ERRLOG("group_impl allocation failed\n"); 1048 close(fd); 1049 return NULL; 1050 } 1051 1052 group_impl->fd = fd; 1053 TAILQ_INIT(&group_impl->pending_recv); 1054 1055 return &group_impl->base; 1056 } 1057 1058 static int 1059 spdk_posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1060 { 1061 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1062 struct spdk_posix_sock *sock = __posix_sock(_sock); 1063 int rc; 1064 1065 #if defined(__linux__) 1066 struct epoll_event event; 1067 1068 memset(&event, 0, sizeof(event)); 1069 /* EPOLLERR is always on even if we don't set it, but be explicit for clarity */ 1070 event.events = EPOLLIN | EPOLLERR; 1071 event.data.ptr = sock; 1072 1073 rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event); 1074 #elif defined(__FreeBSD__) 1075 struct kevent event; 1076 struct timespec ts = {0}; 1077 1078 EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock); 1079 1080 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1081 #endif 1082 1083 /* switched from another polling group due to scheduling */ 1084 if (spdk_unlikely(sock->recv_pipe != NULL && 1085 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1086 assert(sock->pending_recv == false); 1087 sock->pending_recv = true; 1088 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1089 } 1090 1091 return rc; 1092 } 1093 1094 static int 1095 spdk_posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1096 { 1097 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1098 struct spdk_posix_sock *sock = __posix_sock(_sock); 1099 int rc; 1100 1101 if (sock->recv_pipe != NULL) { 1102 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) { 1103 TAILQ_REMOVE(&group->pending_recv, sock, link); 1104 sock->pending_recv = false; 1105 } 1106 assert(sock->pending_recv == false); 1107 } 1108 1109 #if defined(__linux__) 1110 struct epoll_event event; 1111 1112 /* Event parameter is ignored but some old kernel version still require it. */ 1113 rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event); 1114 #elif defined(__FreeBSD__) 1115 struct kevent event; 1116 struct timespec ts = {0}; 1117 1118 EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 1119 1120 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1121 if (rc == 0 && event.flags & EV_ERROR) { 1122 rc = -1; 1123 errno = event.data; 1124 } 1125 #endif 1126 1127 spdk_sock_abort_requests(_sock); 1128 1129 return rc; 1130 } 1131 1132 static int 1133 spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 1134 struct spdk_sock **socks) 1135 { 1136 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1137 struct spdk_sock *sock, *tmp; 1138 int num_events, i, rc; 1139 struct spdk_posix_sock *psock, *ptmp; 1140 #if defined(__linux__) 1141 struct epoll_event events[MAX_EVENTS_PER_POLL]; 1142 #elif defined(__FreeBSD__) 1143 struct kevent events[MAX_EVENTS_PER_POLL]; 1144 struct timespec ts = {0}; 1145 #endif 1146 1147 /* This must be a TAILQ_FOREACH_SAFE because while flushing, 1148 * a completion callback could remove the sock from the 1149 * group. */ 1150 TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { 1151 rc = _sock_flush(sock); 1152 if (rc) { 1153 spdk_sock_abort_requests(sock); 1154 } 1155 } 1156 1157 #if defined(__linux__) 1158 num_events = epoll_wait(group->fd, events, max_events, 0); 1159 #elif defined(__FreeBSD__) 1160 num_events = kevent(group->fd, NULL, 0, events, max_events, &ts); 1161 #endif 1162 1163 if (num_events == -1) { 1164 return -1; 1165 } 1166 1167 for (i = 0; i < num_events; i++) { 1168 #if defined(__linux__) 1169 sock = events[i].data.ptr; 1170 psock = __posix_sock(sock); 1171 1172 #ifdef SPDK_ZEROCOPY 1173 if (events[i].events & EPOLLERR) { 1174 rc = _sock_check_zcopy(sock); 1175 /* If the socket was closed or removed from 1176 * the group in response to a send ack, don't 1177 * add it to the array here. */ 1178 if (rc || sock->cb_fn == NULL) { 1179 continue; 1180 } 1181 } 1182 #endif 1183 if ((events[i].events & EPOLLIN) == 0) { 1184 continue; 1185 } 1186 1187 #elif defined(__FreeBSD__) 1188 sock = events[i].udata; 1189 psock = __posix_sock(sock); 1190 #endif 1191 1192 /* If the socket does not already have recv pending, add it now */ 1193 if (!psock->pending_recv) { 1194 psock->pending_recv = true; 1195 TAILQ_INSERT_TAIL(&group->pending_recv, psock, link); 1196 } 1197 } 1198 1199 num_events = 0; 1200 1201 TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) { 1202 if (num_events == max_events) { 1203 break; 1204 } 1205 1206 socks[num_events++] = &psock->base; 1207 } 1208 1209 /* Cycle the pending_recv list so that each time we poll things aren't 1210 * in the same order. */ 1211 for (i = 0; i < num_events; i++) { 1212 psock = __posix_sock(socks[i]); 1213 1214 TAILQ_REMOVE(&group->pending_recv, psock, link); 1215 1216 if (psock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(psock->recv_pipe) == 0) { 1217 psock->pending_recv = false; 1218 } else { 1219 TAILQ_INSERT_TAIL(&group->pending_recv, psock, link); 1220 } 1221 1222 } 1223 1224 return num_events; 1225 } 1226 1227 static int 1228 spdk_posix_sock_group_impl_close(struct spdk_sock_group_impl *_group) 1229 { 1230 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1231 int rc; 1232 1233 rc = close(group->fd); 1234 free(group); 1235 return rc; 1236 } 1237 1238 static struct spdk_net_impl g_posix_net_impl = { 1239 .name = "posix", 1240 .getaddr = spdk_posix_sock_getaddr, 1241 .connect = spdk_posix_sock_connect, 1242 .listen = spdk_posix_sock_listen, 1243 .accept = spdk_posix_sock_accept, 1244 .close = spdk_posix_sock_close, 1245 .recv = spdk_posix_sock_recv, 1246 .readv = spdk_posix_sock_readv, 1247 .writev = spdk_posix_sock_writev, 1248 .writev_async = spdk_posix_sock_writev_async, 1249 .flush = spdk_posix_sock_flush, 1250 .set_recvlowat = spdk_posix_sock_set_recvlowat, 1251 .set_recvbuf = spdk_posix_sock_set_recvbuf, 1252 .set_sendbuf = spdk_posix_sock_set_sendbuf, 1253 .is_ipv6 = spdk_posix_sock_is_ipv6, 1254 .is_ipv4 = spdk_posix_sock_is_ipv4, 1255 .is_connected = spdk_posix_sock_is_connected, 1256 .get_placement_id = spdk_posix_sock_get_placement_id, 1257 .group_impl_create = spdk_posix_sock_group_impl_create, 1258 .group_impl_add_sock = spdk_posix_sock_group_impl_add_sock, 1259 .group_impl_remove_sock = spdk_posix_sock_group_impl_remove_sock, 1260 .group_impl_poll = spdk_posix_sock_group_impl_poll, 1261 .group_impl_close = spdk_posix_sock_group_impl_close, 1262 }; 1263 1264 SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY); 1265