1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #if defined(__linux__) 37 #include <sys/epoll.h> 38 #include <linux/errqueue.h> 39 #elif defined(__FreeBSD__) 40 #include <sys/event.h> 41 #endif 42 43 #include "spdk/log.h" 44 #include "spdk/pipe.h" 45 #include "spdk/sock.h" 46 #include "spdk/util.h" 47 #include "spdk_internal/sock.h" 48 49 #define MAX_TMPBUF 1024 50 #define PORTNUMLEN 32 51 #define SO_RCVBUF_SIZE (2 * 1024 * 1024) 52 #define SO_SNDBUF_SIZE (2 * 1024 * 1024) 53 #define IOV_BATCH_SIZE 64 54 55 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 56 #define SPDK_ZEROCOPY 57 #endif 58 59 struct spdk_posix_sock { 60 struct spdk_sock base; 61 int fd; 62 63 uint32_t sendmsg_idx; 64 bool zcopy; 65 66 struct spdk_pipe *recv_pipe; 67 void *recv_buf; 68 int recv_buf_sz; 69 bool pending_recv; 70 71 TAILQ_ENTRY(spdk_posix_sock) link; 72 }; 73 74 struct spdk_posix_sock_group_impl { 75 struct spdk_sock_group_impl base; 76 int fd; 77 TAILQ_HEAD(, spdk_posix_sock) pending_recv; 78 }; 79 80 static int 81 get_addr_str(struct sockaddr *sa, char *host, size_t hlen) 82 { 83 const char *result = NULL; 84 85 if (sa == NULL || host == NULL) { 86 return -1; 87 } 88 89 switch (sa->sa_family) { 90 case AF_INET: 91 result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr), 92 host, hlen); 93 break; 94 case AF_INET6: 95 result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr), 96 host, hlen); 97 break; 98 default: 99 break; 100 } 101 102 if (result != NULL) { 103 return 0; 104 } else { 105 return -1; 106 } 107 } 108 109 #define __posix_sock(sock) (struct spdk_posix_sock *)sock 110 #define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group 111 112 static int 113 spdk_posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 114 char *caddr, int clen, uint16_t *cport) 115 { 116 struct spdk_posix_sock *sock = __posix_sock(_sock); 117 struct sockaddr_storage sa; 118 socklen_t salen; 119 int rc; 120 121 assert(sock != NULL); 122 123 memset(&sa, 0, sizeof sa); 124 salen = sizeof sa; 125 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 126 if (rc != 0) { 127 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 128 return -1; 129 } 130 131 switch (sa.ss_family) { 132 case AF_UNIX: 133 /* Acceptable connection types that don't have IPs */ 134 return 0; 135 case AF_INET: 136 case AF_INET6: 137 /* Code below will get IP addresses */ 138 break; 139 default: 140 /* Unsupported socket family */ 141 return -1; 142 } 143 144 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); 145 if (rc != 0) { 146 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 147 return -1; 148 } 149 150 if (sport) { 151 if (sa.ss_family == AF_INET) { 152 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 153 } else if (sa.ss_family == AF_INET6) { 154 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 155 } 156 } 157 158 memset(&sa, 0, sizeof sa); 159 salen = sizeof sa; 160 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); 161 if (rc != 0) { 162 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); 163 return -1; 164 } 165 166 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); 167 if (rc != 0) { 168 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 169 return -1; 170 } 171 172 if (cport) { 173 if (sa.ss_family == AF_INET) { 174 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 175 } else if (sa.ss_family == AF_INET6) { 176 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 177 } 178 } 179 180 return 0; 181 } 182 183 enum spdk_posix_sock_create_type { 184 SPDK_SOCK_CREATE_LISTEN, 185 SPDK_SOCK_CREATE_CONNECT, 186 }; 187 188 static int 189 spdk_posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz) 190 { 191 uint8_t *new_buf; 192 struct spdk_pipe *new_pipe; 193 struct iovec siov[2]; 194 struct iovec diov[2]; 195 int sbytes; 196 ssize_t bytes; 197 198 if (sock->recv_buf_sz == sz) { 199 return 0; 200 } 201 202 /* If the new size is 0, just free the pipe */ 203 if (sz == 0) { 204 spdk_pipe_destroy(sock->recv_pipe); 205 free(sock->recv_buf); 206 sock->recv_pipe = NULL; 207 sock->recv_buf = NULL; 208 return 0; 209 } 210 211 /* Round up to next 64 byte multiple */ 212 new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t)); 213 if (!new_buf) { 214 SPDK_ERRLOG("socket recv buf allocation failed\n"); 215 return -ENOMEM; 216 } 217 218 new_pipe = spdk_pipe_create(new_buf, sz + 1); 219 if (new_pipe == NULL) { 220 SPDK_ERRLOG("socket pipe allocation failed\n"); 221 free(new_buf); 222 return -ENOMEM; 223 } 224 225 if (sock->recv_pipe != NULL) { 226 /* Pull all of the data out of the old pipe */ 227 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 228 if (sbytes > sz) { 229 /* Too much data to fit into the new pipe size */ 230 spdk_pipe_destroy(new_pipe); 231 free(new_buf); 232 return -EINVAL; 233 } 234 235 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 236 assert(sbytes == sz); 237 238 bytes = spdk_iovcpy(siov, 2, diov, 2); 239 spdk_pipe_writer_advance(new_pipe, bytes); 240 241 spdk_pipe_destroy(sock->recv_pipe); 242 free(sock->recv_buf); 243 } 244 245 sock->recv_buf_sz = sz; 246 sock->recv_buf = new_buf; 247 sock->recv_pipe = new_pipe; 248 249 return 0; 250 } 251 252 static int 253 spdk_posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 254 { 255 struct spdk_posix_sock *sock = __posix_sock(_sock); 256 int rc; 257 258 assert(sock != NULL); 259 260 if (sz < SO_RCVBUF_SIZE) { 261 sz = SO_RCVBUF_SIZE; 262 } 263 264 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 265 if (rc < 0) { 266 return rc; 267 } 268 269 return 0; 270 } 271 272 static int 273 spdk_posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 274 { 275 struct spdk_posix_sock *sock = __posix_sock(_sock); 276 int rc; 277 278 assert(sock != NULL); 279 280 if (sz < SO_SNDBUF_SIZE) { 281 sz = SO_SNDBUF_SIZE; 282 } 283 284 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 285 if (rc < 0) { 286 return rc; 287 } 288 289 return 0; 290 } 291 292 static struct spdk_posix_sock * 293 _spdk_posix_sock_alloc(int fd) 294 { 295 struct spdk_posix_sock *sock; 296 int rc; 297 #ifdef SPDK_ZEROCOPY 298 int flag; 299 #endif 300 301 sock = calloc(1, sizeof(*sock)); 302 if (sock == NULL) { 303 SPDK_ERRLOG("sock allocation failed\n"); 304 return NULL; 305 } 306 307 sock->fd = fd; 308 309 #ifndef __aarch64__ 310 /* On ARM systems, this buffering does not help. Skip it. */ 311 /* The size of the pipe is purely derived from benchmarks. It seems to work well. */ 312 rc = spdk_posix_sock_alloc_pipe(sock, 8192); 313 if (rc) { 314 SPDK_ERRLOG("unable to allocate sufficient recvbuf\n"); 315 free(sock); 316 return NULL; 317 } 318 #endif 319 320 #ifdef SPDK_ZEROCOPY 321 /* Try to turn on zero copy sends */ 322 flag = 1; 323 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)); 324 if (rc == 0) { 325 sock->zcopy = true; 326 } 327 #endif 328 329 return sock; 330 } 331 332 static struct spdk_sock * 333 spdk_posix_sock_create(const char *ip, int port, enum spdk_posix_sock_create_type type) 334 { 335 struct spdk_posix_sock *sock; 336 char buf[MAX_TMPBUF]; 337 char portnum[PORTNUMLEN]; 338 char *p; 339 struct addrinfo hints, *res, *res0; 340 int fd, flag; 341 int val = 1; 342 int rc, sz; 343 344 if (ip == NULL) { 345 return NULL; 346 } 347 if (ip[0] == '[') { 348 snprintf(buf, sizeof(buf), "%s", ip + 1); 349 p = strchr(buf, ']'); 350 if (p != NULL) { 351 *p = '\0'; 352 } 353 ip = (const char *) &buf[0]; 354 } 355 356 snprintf(portnum, sizeof portnum, "%d", port); 357 memset(&hints, 0, sizeof hints); 358 hints.ai_family = PF_UNSPEC; 359 hints.ai_socktype = SOCK_STREAM; 360 hints.ai_flags = AI_NUMERICSERV; 361 hints.ai_flags |= AI_PASSIVE; 362 hints.ai_flags |= AI_NUMERICHOST; 363 rc = getaddrinfo(ip, portnum, &hints, &res0); 364 if (rc != 0) { 365 SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno); 366 return NULL; 367 } 368 369 /* try listen */ 370 fd = -1; 371 for (res = res0; res != NULL; res = res->ai_next) { 372 retry: 373 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 374 if (fd < 0) { 375 /* error */ 376 continue; 377 } 378 379 sz = SO_RCVBUF_SIZE; 380 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 381 if (rc) { 382 /* Not fatal */ 383 } 384 385 sz = SO_SNDBUF_SIZE; 386 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 387 if (rc) { 388 /* Not fatal */ 389 } 390 391 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 392 if (rc != 0) { 393 close(fd); 394 /* error */ 395 continue; 396 } 397 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 398 if (rc != 0) { 399 close(fd); 400 /* error */ 401 continue; 402 } 403 404 if (res->ai_family == AF_INET6) { 405 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 406 if (rc != 0) { 407 close(fd); 408 /* error */ 409 continue; 410 } 411 } 412 413 if (type == SPDK_SOCK_CREATE_LISTEN) { 414 rc = bind(fd, res->ai_addr, res->ai_addrlen); 415 if (rc != 0) { 416 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 417 switch (errno) { 418 case EINTR: 419 /* interrupted? */ 420 close(fd); 421 goto retry; 422 case EADDRNOTAVAIL: 423 SPDK_ERRLOG("IP address %s not available. " 424 "Verify IP address in config file " 425 "and make sure setup script is " 426 "run before starting spdk app.\n", ip); 427 /* FALLTHROUGH */ 428 default: 429 /* try next family */ 430 close(fd); 431 fd = -1; 432 continue; 433 } 434 } 435 /* bind OK */ 436 rc = listen(fd, 512); 437 if (rc != 0) { 438 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 439 close(fd); 440 fd = -1; 441 break; 442 } 443 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 444 rc = connect(fd, res->ai_addr, res->ai_addrlen); 445 if (rc != 0) { 446 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 447 /* try next family */ 448 close(fd); 449 fd = -1; 450 continue; 451 } 452 } 453 454 flag = fcntl(fd, F_GETFL); 455 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 456 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 457 close(fd); 458 fd = -1; 459 break; 460 } 461 break; 462 } 463 freeaddrinfo(res0); 464 465 if (fd < 0) { 466 return NULL; 467 } 468 469 sock = _spdk_posix_sock_alloc(fd); 470 if (sock == NULL) { 471 SPDK_ERRLOG("sock allocation failed\n"); 472 close(fd); 473 return NULL; 474 } 475 476 /* Disable zero copy for client sockets until support is added */ 477 if (type == SPDK_SOCK_CREATE_CONNECT) { 478 sock->zcopy = false; 479 } 480 481 return &sock->base; 482 } 483 484 static struct spdk_sock * 485 spdk_posix_sock_listen(const char *ip, int port) 486 { 487 return spdk_posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN); 488 } 489 490 static struct spdk_sock * 491 spdk_posix_sock_connect(const char *ip, int port) 492 { 493 return spdk_posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT); 494 } 495 496 static struct spdk_sock * 497 spdk_posix_sock_accept(struct spdk_sock *_sock) 498 { 499 struct spdk_posix_sock *sock = __posix_sock(_sock); 500 struct sockaddr_storage sa; 501 socklen_t salen; 502 int rc, fd; 503 struct spdk_posix_sock *new_sock; 504 int flag; 505 506 memset(&sa, 0, sizeof(sa)); 507 salen = sizeof(sa); 508 509 assert(sock != NULL); 510 511 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 512 513 if (rc == -1) { 514 return NULL; 515 } 516 517 fd = rc; 518 519 flag = fcntl(fd, F_GETFL); 520 if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) { 521 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 522 close(fd); 523 return NULL; 524 } 525 526 new_sock = _spdk_posix_sock_alloc(fd); 527 if (new_sock == NULL) { 528 close(fd); 529 return NULL; 530 } 531 532 return &new_sock->base; 533 } 534 535 static int 536 spdk_posix_sock_close(struct spdk_sock *_sock) 537 { 538 struct spdk_posix_sock *sock = __posix_sock(_sock); 539 540 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 541 542 /* If the socket fails to close, the best choice is to 543 * leak the fd but continue to free the rest of the sock 544 * memory. */ 545 close(sock->fd); 546 547 spdk_pipe_destroy(sock->recv_pipe); 548 free(sock->recv_buf); 549 free(sock); 550 551 return 0; 552 } 553 554 #ifdef SPDK_ZEROCOPY 555 static int 556 _sock_check_zcopy(struct spdk_sock *sock) 557 { 558 struct spdk_posix_sock *psock = __posix_sock(sock); 559 struct msghdr msgh = {}; 560 uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)]; 561 ssize_t rc; 562 struct sock_extended_err *serr; 563 struct cmsghdr *cm; 564 uint32_t idx; 565 struct spdk_sock_request *req, *treq; 566 bool found; 567 568 msgh.msg_control = buf; 569 msgh.msg_controllen = sizeof(buf); 570 571 while (true) { 572 rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE); 573 574 if (rc < 0) { 575 if (errno == EWOULDBLOCK || errno == EAGAIN) { 576 return 0; 577 } 578 579 if (!TAILQ_EMPTY(&sock->pending_reqs)) { 580 SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n"); 581 } else { 582 SPDK_WARNLOG("Recvmsg yielded an error!\n"); 583 } 584 return 0; 585 } 586 587 cm = CMSG_FIRSTHDR(&msgh); 588 if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) { 589 SPDK_WARNLOG("Unexpected cmsg level or type!\n"); 590 return 0; 591 } 592 593 serr = (struct sock_extended_err *)CMSG_DATA(cm); 594 if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { 595 SPDK_WARNLOG("Unexpected extended error origin\n"); 596 return 0; 597 } 598 599 /* Most of the time, the pending_reqs array is in the exact 600 * order we need such that all of the requests to complete are 601 * in order, in the front. It is guaranteed that all requests 602 * belonging to the same sendmsg call are sequential, so once 603 * we encounter one match we can stop looping as soon as a 604 * non-match is found. 605 */ 606 for (idx = serr->ee_info; idx <= serr->ee_data; idx++) { 607 found = false; 608 TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) { 609 if (req->internal.offset == idx) { 610 found = true; 611 612 rc = spdk_sock_request_put(sock, req, 0); 613 if (rc < 0) { 614 return rc; 615 } 616 617 } else if (found) { 618 break; 619 } 620 } 621 622 } 623 } 624 625 return 0; 626 } 627 #endif 628 629 static int 630 _sock_flush(struct spdk_sock *sock) 631 { 632 struct spdk_posix_sock *psock = __posix_sock(sock); 633 struct msghdr msg = {}; 634 int flags; 635 struct iovec iovs[IOV_BATCH_SIZE]; 636 int iovcnt; 637 int retval; 638 struct spdk_sock_request *req; 639 int i; 640 ssize_t rc; 641 unsigned int offset; 642 size_t len; 643 644 /* Can't flush from within a callback or we end up with recursive calls */ 645 if (sock->cb_cnt > 0) { 646 return 0; 647 } 648 649 /* Gather an iov */ 650 iovcnt = 0; 651 req = TAILQ_FIRST(&sock->queued_reqs); 652 while (req) { 653 offset = req->internal.offset; 654 655 for (i = 0; i < req->iovcnt; i++) { 656 /* Consume any offset first */ 657 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 658 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 659 continue; 660 } 661 662 iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset; 663 iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 664 iovcnt++; 665 666 offset = 0; 667 668 if (iovcnt >= IOV_BATCH_SIZE) { 669 break; 670 } 671 } 672 673 if (iovcnt >= IOV_BATCH_SIZE) { 674 break; 675 } 676 677 req = TAILQ_NEXT(req, internal.link); 678 } 679 680 if (iovcnt == 0) { 681 return 0; 682 } 683 684 /* Perform the vectored write */ 685 msg.msg_iov = iovs; 686 msg.msg_iovlen = iovcnt; 687 #ifdef SPDK_ZEROCOPY 688 if (psock->zcopy) { 689 flags = MSG_ZEROCOPY; 690 } else 691 #endif 692 { 693 flags = 0; 694 } 695 rc = sendmsg(psock->fd, &msg, flags); 696 if (rc <= 0) { 697 if (errno == EAGAIN || errno == EWOULDBLOCK) { 698 return 0; 699 } 700 return rc; 701 } 702 703 psock->sendmsg_idx++; 704 705 /* Consume the requests that were actually written */ 706 req = TAILQ_FIRST(&sock->queued_reqs); 707 while (req) { 708 offset = req->internal.offset; 709 710 for (i = 0; i < req->iovcnt; i++) { 711 /* Advance by the offset first */ 712 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 713 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 714 continue; 715 } 716 717 /* Calculate the remaining length of this element */ 718 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 719 720 if (len > (size_t)rc) { 721 /* This element was partially sent. */ 722 req->internal.offset += rc; 723 return 0; 724 } 725 726 offset = 0; 727 req->internal.offset += len; 728 rc -= len; 729 } 730 731 /* Handled a full request. */ 732 spdk_sock_request_pend(sock, req); 733 734 if (!psock->zcopy) { 735 /* The sendmsg syscall above isn't currently asynchronous, 736 * so it's already done. */ 737 retval = spdk_sock_request_put(sock, req, 0); 738 if (retval) { 739 break; 740 } 741 } else { 742 /* Re-use the offset field to hold the sendmsg call index. The 743 * index is 0 based, so subtract one here because we've already 744 * incremented above. */ 745 req->internal.offset = psock->sendmsg_idx - 1; 746 } 747 748 if (rc == 0) { 749 break; 750 } 751 752 req = TAILQ_FIRST(&sock->queued_reqs); 753 } 754 755 return 0; 756 } 757 758 static int 759 spdk_posix_sock_flush(struct spdk_sock *_sock) 760 { 761 return _sock_flush(_sock); 762 } 763 764 static ssize_t 765 spdk_posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt) 766 { 767 struct iovec siov[2]; 768 int sbytes; 769 ssize_t bytes; 770 struct spdk_posix_sock_group_impl *group; 771 772 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 773 if (sbytes < 0) { 774 errno = EINVAL; 775 return -1; 776 } else if (sbytes == 0) { 777 errno = EAGAIN; 778 return -1; 779 } 780 781 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 782 783 if (bytes == 0) { 784 /* The only way this happens is if diov is 0 length */ 785 errno = EINVAL; 786 return -1; 787 } 788 789 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 790 791 /* If we drained the pipe, take it off the level-triggered list */ 792 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 793 group = __posix_group_impl(sock->base.group_impl); 794 TAILQ_REMOVE(&group->pending_recv, sock, link); 795 sock->pending_recv = false; 796 } 797 798 return bytes; 799 } 800 801 static inline ssize_t 802 _spdk_posix_sock_read(struct spdk_posix_sock *sock) 803 { 804 struct iovec iov[2]; 805 int bytes; 806 struct spdk_posix_sock_group_impl *group; 807 808 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 809 810 if (bytes > 0) { 811 bytes = readv(sock->fd, iov, 2); 812 if (bytes > 0) { 813 spdk_pipe_writer_advance(sock->recv_pipe, bytes); 814 if (sock->base.group_impl) { 815 group = __posix_group_impl(sock->base.group_impl); 816 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 817 sock->pending_recv = true; 818 } 819 } 820 } 821 822 return bytes; 823 } 824 825 static ssize_t 826 spdk_posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 827 { 828 struct spdk_posix_sock *sock = __posix_sock(_sock); 829 int rc, i; 830 size_t len; 831 832 if (sock->recv_pipe == NULL) { 833 return readv(sock->fd, iov, iovcnt); 834 } 835 836 len = 0; 837 for (i = 0; i < iovcnt; i++) { 838 len += iov[i].iov_len; 839 } 840 841 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 842 /* If the user is receiving a sufficiently large amount of data, 843 * receive directly to their buffers. */ 844 if (len >= 1024) { 845 return readv(sock->fd, iov, iovcnt); 846 } 847 848 /* Otherwise, do a big read into our pipe */ 849 rc = _spdk_posix_sock_read(sock); 850 if (rc <= 0) { 851 return rc; 852 } 853 } 854 855 return spdk_posix_sock_recv_from_pipe(sock, iov, iovcnt); 856 } 857 858 static ssize_t 859 spdk_posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 860 { 861 struct iovec iov[1]; 862 863 iov[0].iov_base = buf; 864 iov[0].iov_len = len; 865 866 return spdk_posix_sock_readv(sock, iov, 1); 867 } 868 869 static ssize_t 870 spdk_posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 871 { 872 struct spdk_posix_sock *sock = __posix_sock(_sock); 873 int rc; 874 875 /* In order to process a writev, we need to flush any asynchronous writes 876 * first. */ 877 rc = _sock_flush(_sock); 878 if (rc < 0) { 879 return rc; 880 } 881 882 if (!TAILQ_EMPTY(&_sock->queued_reqs)) { 883 /* We weren't able to flush all requests */ 884 errno = EAGAIN; 885 return -1; 886 } 887 888 return writev(sock->fd, iov, iovcnt); 889 } 890 891 static void 892 spdk_posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) 893 { 894 int rc; 895 896 spdk_sock_request_queue(sock, req); 897 898 /* If there are a sufficient number queued, just flush them out immediately. */ 899 if (sock->queued_iovcnt >= IOV_BATCH_SIZE) { 900 rc = _sock_flush(sock); 901 if (rc) { 902 spdk_sock_abort_requests(sock); 903 } 904 } 905 } 906 907 static int 908 spdk_posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 909 { 910 struct spdk_posix_sock *sock = __posix_sock(_sock); 911 int val; 912 int rc; 913 914 assert(sock != NULL); 915 916 val = nbytes; 917 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 918 if (rc != 0) { 919 return -1; 920 } 921 return 0; 922 } 923 924 static int 925 spdk_posix_sock_set_priority(struct spdk_sock *_sock, int priority) 926 { 927 int rc = 0; 928 929 #if defined(SO_PRIORITY) 930 struct spdk_posix_sock *sock = __posix_sock(_sock); 931 932 assert(sock != NULL); 933 934 rc = setsockopt(sock->fd, SOL_SOCKET, SO_PRIORITY, 935 &priority, sizeof(priority)); 936 #endif 937 return rc; 938 } 939 940 static bool 941 spdk_posix_sock_is_ipv6(struct spdk_sock *_sock) 942 { 943 struct spdk_posix_sock *sock = __posix_sock(_sock); 944 struct sockaddr_storage sa; 945 socklen_t salen; 946 int rc; 947 948 assert(sock != NULL); 949 950 memset(&sa, 0, sizeof sa); 951 salen = sizeof sa; 952 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 953 if (rc != 0) { 954 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 955 return false; 956 } 957 958 return (sa.ss_family == AF_INET6); 959 } 960 961 static bool 962 spdk_posix_sock_is_ipv4(struct spdk_sock *_sock) 963 { 964 struct spdk_posix_sock *sock = __posix_sock(_sock); 965 struct sockaddr_storage sa; 966 socklen_t salen; 967 int rc; 968 969 assert(sock != NULL); 970 971 memset(&sa, 0, sizeof sa); 972 salen = sizeof sa; 973 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 974 if (rc != 0) { 975 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 976 return false; 977 } 978 979 return (sa.ss_family == AF_INET); 980 } 981 982 static bool 983 spdk_posix_sock_is_connected(struct spdk_sock *_sock) 984 { 985 struct spdk_posix_sock *sock = __posix_sock(_sock); 986 uint8_t byte; 987 int rc; 988 989 rc = recv(sock->fd, &byte, 1, MSG_PEEK); 990 if (rc == 0) { 991 return false; 992 } 993 994 if (rc < 0) { 995 if (errno == EAGAIN || errno == EWOULDBLOCK) { 996 return true; 997 } 998 999 return false; 1000 } 1001 1002 return true; 1003 } 1004 1005 static int 1006 spdk_posix_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id) 1007 { 1008 int rc = -1; 1009 1010 #if defined(SO_INCOMING_NAPI_ID) 1011 struct spdk_posix_sock *sock = __posix_sock(_sock); 1012 socklen_t salen = sizeof(int); 1013 1014 rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen); 1015 if (rc != 0) { 1016 SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno); 1017 } 1018 1019 #endif 1020 return rc; 1021 } 1022 1023 static struct spdk_sock_group_impl * 1024 spdk_posix_sock_group_impl_create(void) 1025 { 1026 struct spdk_posix_sock_group_impl *group_impl; 1027 int fd; 1028 1029 #if defined(__linux__) 1030 fd = epoll_create1(0); 1031 #elif defined(__FreeBSD__) 1032 fd = kqueue(); 1033 #endif 1034 if (fd == -1) { 1035 return NULL; 1036 } 1037 1038 group_impl = calloc(1, sizeof(*group_impl)); 1039 if (group_impl == NULL) { 1040 SPDK_ERRLOG("group_impl allocation failed\n"); 1041 close(fd); 1042 return NULL; 1043 } 1044 1045 group_impl->fd = fd; 1046 TAILQ_INIT(&group_impl->pending_recv); 1047 1048 return &group_impl->base; 1049 } 1050 1051 static int 1052 spdk_posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1053 { 1054 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1055 struct spdk_posix_sock *sock = __posix_sock(_sock); 1056 int rc; 1057 1058 #if defined(__linux__) 1059 struct epoll_event event; 1060 1061 memset(&event, 0, sizeof(event)); 1062 /* EPOLLERR is always on even if we don't set it, but be explicit for clarity */ 1063 event.events = EPOLLIN | EPOLLERR; 1064 event.data.ptr = sock; 1065 1066 rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event); 1067 #elif defined(__FreeBSD__) 1068 struct kevent event; 1069 struct timespec ts = {0}; 1070 1071 EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock); 1072 1073 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1074 #endif 1075 return rc; 1076 } 1077 1078 static int 1079 spdk_posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1080 { 1081 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1082 struct spdk_posix_sock *sock = __posix_sock(_sock); 1083 int rc; 1084 1085 if (sock->recv_pipe != NULL) { 1086 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) { 1087 TAILQ_REMOVE(&group->pending_recv, sock, link); 1088 sock->pending_recv = false; 1089 } 1090 } 1091 1092 #if defined(__linux__) 1093 struct epoll_event event; 1094 1095 /* Event parameter is ignored but some old kernel version still require it. */ 1096 rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event); 1097 #elif defined(__FreeBSD__) 1098 struct kevent event; 1099 struct timespec ts = {0}; 1100 1101 EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 1102 1103 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1104 if (rc == 0 && event.flags & EV_ERROR) { 1105 rc = -1; 1106 errno = event.data; 1107 } 1108 #endif 1109 1110 spdk_sock_abort_requests(_sock); 1111 1112 return rc; 1113 } 1114 1115 static int 1116 spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 1117 struct spdk_sock **socks) 1118 { 1119 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1120 struct spdk_sock *sock, *tmp; 1121 int num_events, i, rc; 1122 struct spdk_posix_sock *psock, *ptmp; 1123 #if defined(__linux__) 1124 struct epoll_event events[MAX_EVENTS_PER_POLL]; 1125 #elif defined(__FreeBSD__) 1126 struct kevent events[MAX_EVENTS_PER_POLL]; 1127 struct timespec ts = {0}; 1128 #endif 1129 1130 /* This must be a TAILQ_FOREACH_SAFE because while flushing, 1131 * a completion callback could remove the sock from the 1132 * group. */ 1133 TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { 1134 rc = _sock_flush(sock); 1135 if (rc) { 1136 spdk_sock_abort_requests(sock); 1137 } 1138 } 1139 1140 #if defined(__linux__) 1141 num_events = epoll_wait(group->fd, events, max_events, 0); 1142 #elif defined(__FreeBSD__) 1143 num_events = kevent(group->fd, NULL, 0, events, max_events, &ts); 1144 #endif 1145 1146 if (num_events == -1) { 1147 return -1; 1148 } 1149 1150 for (i = 0; i < num_events; i++) { 1151 #if defined(__linux__) 1152 sock = events[i].data.ptr; 1153 psock = __posix_sock(sock); 1154 1155 #ifdef SPDK_ZEROCOPY 1156 if (events[i].events & EPOLLERR) { 1157 rc = _sock_check_zcopy(sock); 1158 /* If the socket was closed or removed from 1159 * the group in response to a send ack, don't 1160 * add it to the array here. */ 1161 if (rc || sock->cb_fn == NULL) { 1162 continue; 1163 } 1164 } 1165 #endif 1166 if ((events[i].events & EPOLLIN) == 0) { 1167 continue; 1168 } 1169 1170 #elif defined(__FreeBSD__) 1171 sock = events[i].udata; 1172 psock = __posix_sock(sock); 1173 #endif 1174 1175 /* If the socket does not already have recv pending, add it now */ 1176 if (!psock->pending_recv) { 1177 psock->pending_recv = true; 1178 TAILQ_INSERT_TAIL(&group->pending_recv, psock, link); 1179 } 1180 } 1181 1182 num_events = 0; 1183 1184 TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) { 1185 if (num_events == max_events) { 1186 break; 1187 } 1188 1189 socks[num_events++] = &psock->base; 1190 } 1191 1192 /* Cycle the pending_recv list so that each time we poll things aren't 1193 * in the same order. */ 1194 for (i = 0; i < num_events; i++) { 1195 psock = __posix_sock(socks[i]); 1196 1197 TAILQ_REMOVE(&group->pending_recv, psock, link); 1198 1199 if (psock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(psock->recv_pipe) == 0) { 1200 psock->pending_recv = false; 1201 } else { 1202 TAILQ_INSERT_TAIL(&group->pending_recv, psock, link); 1203 } 1204 1205 } 1206 1207 return num_events; 1208 } 1209 1210 static int 1211 spdk_posix_sock_group_impl_close(struct spdk_sock_group_impl *_group) 1212 { 1213 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1214 int rc; 1215 1216 rc = close(group->fd); 1217 free(group); 1218 return rc; 1219 } 1220 1221 static struct spdk_net_impl g_posix_net_impl = { 1222 .name = "posix", 1223 .getaddr = spdk_posix_sock_getaddr, 1224 .connect = spdk_posix_sock_connect, 1225 .listen = spdk_posix_sock_listen, 1226 .accept = spdk_posix_sock_accept, 1227 .close = spdk_posix_sock_close, 1228 .recv = spdk_posix_sock_recv, 1229 .readv = spdk_posix_sock_readv, 1230 .writev = spdk_posix_sock_writev, 1231 .writev_async = spdk_posix_sock_writev_async, 1232 .flush = spdk_posix_sock_flush, 1233 .set_recvlowat = spdk_posix_sock_set_recvlowat, 1234 .set_recvbuf = spdk_posix_sock_set_recvbuf, 1235 .set_sendbuf = spdk_posix_sock_set_sendbuf, 1236 .set_priority = spdk_posix_sock_set_priority, 1237 .is_ipv6 = spdk_posix_sock_is_ipv6, 1238 .is_ipv4 = spdk_posix_sock_is_ipv4, 1239 .is_connected = spdk_posix_sock_is_connected, 1240 .get_placement_id = spdk_posix_sock_get_placement_id, 1241 .group_impl_create = spdk_posix_sock_group_impl_create, 1242 .group_impl_add_sock = spdk_posix_sock_group_impl_add_sock, 1243 .group_impl_remove_sock = spdk_posix_sock_group_impl_remove_sock, 1244 .group_impl_poll = spdk_posix_sock_group_impl_poll, 1245 .group_impl_close = spdk_posix_sock_group_impl_close, 1246 }; 1247 1248 SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY); 1249