1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #if defined(__linux__) 37 #include <sys/epoll.h> 38 #include <linux/errqueue.h> 39 #elif defined(__FreeBSD__) 40 #include <sys/event.h> 41 #endif 42 43 #include "spdk/log.h" 44 #include "spdk/pipe.h" 45 #include "spdk/sock.h" 46 #include "spdk/util.h" 47 #include "spdk/likely.h" 48 #include "spdk_internal/sock.h" 49 50 #define MAX_TMPBUF 1024 51 #define PORTNUMLEN 32 52 #define SO_RCVBUF_SIZE (2 * 1024 * 1024) 53 #define SO_SNDBUF_SIZE (2 * 1024 * 1024) 54 #define IOV_BATCH_SIZE 64 55 56 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 57 #define SPDK_ZEROCOPY 58 #endif 59 60 struct spdk_posix_sock { 61 struct spdk_sock base; 62 int fd; 63 64 uint32_t sendmsg_idx; 65 bool zcopy; 66 67 struct spdk_pipe *recv_pipe; 68 void *recv_buf; 69 int recv_buf_sz; 70 bool pending_recv; 71 72 TAILQ_ENTRY(spdk_posix_sock) link; 73 }; 74 75 struct spdk_posix_sock_group_impl { 76 struct spdk_sock_group_impl base; 77 int fd; 78 TAILQ_HEAD(, spdk_posix_sock) pending_recv; 79 }; 80 81 static int 82 get_addr_str(struct sockaddr *sa, char *host, size_t hlen) 83 { 84 const char *result = NULL; 85 86 if (sa == NULL || host == NULL) { 87 return -1; 88 } 89 90 switch (sa->sa_family) { 91 case AF_INET: 92 result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr), 93 host, hlen); 94 break; 95 case AF_INET6: 96 result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr), 97 host, hlen); 98 break; 99 default: 100 break; 101 } 102 103 if (result != NULL) { 104 return 0; 105 } else { 106 return -1; 107 } 108 } 109 110 #define __posix_sock(sock) (struct spdk_posix_sock *)sock 111 #define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group 112 113 static int 114 spdk_posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 115 char *caddr, int clen, uint16_t *cport) 116 { 117 struct spdk_posix_sock *sock = __posix_sock(_sock); 118 struct sockaddr_storage sa; 119 socklen_t salen; 120 int rc; 121 122 assert(sock != NULL); 123 124 memset(&sa, 0, sizeof sa); 125 salen = sizeof sa; 126 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 127 if (rc != 0) { 128 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 129 return -1; 130 } 131 132 switch (sa.ss_family) { 133 case AF_UNIX: 134 /* Acceptable connection types that don't have IPs */ 135 return 0; 136 case AF_INET: 137 case AF_INET6: 138 /* Code below will get IP addresses */ 139 break; 140 default: 141 /* Unsupported socket family */ 142 return -1; 143 } 144 145 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); 146 if (rc != 0) { 147 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 148 return -1; 149 } 150 151 if (sport) { 152 if (sa.ss_family == AF_INET) { 153 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 154 } else if (sa.ss_family == AF_INET6) { 155 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 156 } 157 } 158 159 memset(&sa, 0, sizeof sa); 160 salen = sizeof sa; 161 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); 162 if (rc != 0) { 163 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); 164 return -1; 165 } 166 167 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); 168 if (rc != 0) { 169 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 170 return -1; 171 } 172 173 if (cport) { 174 if (sa.ss_family == AF_INET) { 175 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 176 } else if (sa.ss_family == AF_INET6) { 177 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 178 } 179 } 180 181 return 0; 182 } 183 184 enum spdk_posix_sock_create_type { 185 SPDK_SOCK_CREATE_LISTEN, 186 SPDK_SOCK_CREATE_CONNECT, 187 }; 188 189 static int 190 spdk_posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz) 191 { 192 uint8_t *new_buf; 193 struct spdk_pipe *new_pipe; 194 struct iovec siov[2]; 195 struct iovec diov[2]; 196 int sbytes; 197 ssize_t bytes; 198 199 if (sock->recv_buf_sz == sz) { 200 return 0; 201 } 202 203 /* If the new size is 0, just free the pipe */ 204 if (sz == 0) { 205 spdk_pipe_destroy(sock->recv_pipe); 206 free(sock->recv_buf); 207 sock->recv_pipe = NULL; 208 sock->recv_buf = NULL; 209 return 0; 210 } 211 212 /* Round up to next 64 byte multiple */ 213 new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t)); 214 if (!new_buf) { 215 SPDK_ERRLOG("socket recv buf allocation failed\n"); 216 return -ENOMEM; 217 } 218 219 new_pipe = spdk_pipe_create(new_buf, sz + 1); 220 if (new_pipe == NULL) { 221 SPDK_ERRLOG("socket pipe allocation failed\n"); 222 free(new_buf); 223 return -ENOMEM; 224 } 225 226 if (sock->recv_pipe != NULL) { 227 /* Pull all of the data out of the old pipe */ 228 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 229 if (sbytes > sz) { 230 /* Too much data to fit into the new pipe size */ 231 spdk_pipe_destroy(new_pipe); 232 free(new_buf); 233 return -EINVAL; 234 } 235 236 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 237 assert(sbytes == sz); 238 239 bytes = spdk_iovcpy(siov, 2, diov, 2); 240 spdk_pipe_writer_advance(new_pipe, bytes); 241 242 spdk_pipe_destroy(sock->recv_pipe); 243 free(sock->recv_buf); 244 } 245 246 sock->recv_buf_sz = sz; 247 sock->recv_buf = new_buf; 248 sock->recv_pipe = new_pipe; 249 250 return 0; 251 } 252 253 static int 254 spdk_posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 255 { 256 struct spdk_posix_sock *sock = __posix_sock(_sock); 257 int rc; 258 259 assert(sock != NULL); 260 261 #ifndef __aarch64__ 262 /* On ARM systems, this buffering does not help. Skip it. */ 263 rc = spdk_posix_sock_alloc_pipe(sock, sz); 264 if (rc) { 265 return rc; 266 } 267 #endif 268 269 /* Set kernel buffer size to be at least SO_RCVBUF_SIZE */ 270 if (sz < SO_RCVBUF_SIZE) { 271 sz = SO_RCVBUF_SIZE; 272 } 273 274 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 275 if (rc < 0) { 276 return rc; 277 } 278 279 return 0; 280 } 281 282 static int 283 spdk_posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 284 { 285 struct spdk_posix_sock *sock = __posix_sock(_sock); 286 int rc; 287 288 assert(sock != NULL); 289 290 if (sz < SO_SNDBUF_SIZE) { 291 sz = SO_SNDBUF_SIZE; 292 } 293 294 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 295 if (rc < 0) { 296 return rc; 297 } 298 299 return 0; 300 } 301 302 static struct spdk_posix_sock * 303 _spdk_posix_sock_alloc(int fd) 304 { 305 struct spdk_posix_sock *sock; 306 #ifdef SPDK_ZEROCOPY 307 int rc; 308 int flag; 309 #endif 310 311 sock = calloc(1, sizeof(*sock)); 312 if (sock == NULL) { 313 SPDK_ERRLOG("sock allocation failed\n"); 314 return NULL; 315 } 316 317 sock->fd = fd; 318 319 #ifdef SPDK_ZEROCOPY 320 /* Try to turn on zero copy sends */ 321 flag = 1; 322 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)); 323 if (rc == 0) { 324 sock->zcopy = true; 325 } 326 #endif 327 328 return sock; 329 } 330 331 static struct spdk_sock * 332 spdk_posix_sock_create(const char *ip, int port, enum spdk_posix_sock_create_type type) 333 { 334 struct spdk_posix_sock *sock; 335 char buf[MAX_TMPBUF]; 336 char portnum[PORTNUMLEN]; 337 char *p; 338 struct addrinfo hints, *res, *res0; 339 int fd, flag; 340 int val = 1; 341 int rc, sz; 342 343 if (ip == NULL) { 344 return NULL; 345 } 346 if (ip[0] == '[') { 347 snprintf(buf, sizeof(buf), "%s", ip + 1); 348 p = strchr(buf, ']'); 349 if (p != NULL) { 350 *p = '\0'; 351 } 352 ip = (const char *) &buf[0]; 353 } 354 355 snprintf(portnum, sizeof portnum, "%d", port); 356 memset(&hints, 0, sizeof hints); 357 hints.ai_family = PF_UNSPEC; 358 hints.ai_socktype = SOCK_STREAM; 359 hints.ai_flags = AI_NUMERICSERV; 360 hints.ai_flags |= AI_PASSIVE; 361 hints.ai_flags |= AI_NUMERICHOST; 362 rc = getaddrinfo(ip, portnum, &hints, &res0); 363 if (rc != 0) { 364 SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno); 365 return NULL; 366 } 367 368 /* try listen */ 369 fd = -1; 370 for (res = res0; res != NULL; res = res->ai_next) { 371 retry: 372 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 373 if (fd < 0) { 374 /* error */ 375 continue; 376 } 377 378 sz = SO_RCVBUF_SIZE; 379 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 380 if (rc) { 381 /* Not fatal */ 382 } 383 384 sz = SO_SNDBUF_SIZE; 385 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 386 if (rc) { 387 /* Not fatal */ 388 } 389 390 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 391 if (rc != 0) { 392 close(fd); 393 /* error */ 394 continue; 395 } 396 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 397 if (rc != 0) { 398 close(fd); 399 /* error */ 400 continue; 401 } 402 403 if (res->ai_family == AF_INET6) { 404 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 405 if (rc != 0) { 406 close(fd); 407 /* error */ 408 continue; 409 } 410 } 411 412 if (type == SPDK_SOCK_CREATE_LISTEN) { 413 rc = bind(fd, res->ai_addr, res->ai_addrlen); 414 if (rc != 0) { 415 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 416 switch (errno) { 417 case EINTR: 418 /* interrupted? */ 419 close(fd); 420 goto retry; 421 case EADDRNOTAVAIL: 422 SPDK_ERRLOG("IP address %s not available. " 423 "Verify IP address in config file " 424 "and make sure setup script is " 425 "run before starting spdk app.\n", ip); 426 /* FALLTHROUGH */ 427 default: 428 /* try next family */ 429 close(fd); 430 fd = -1; 431 continue; 432 } 433 } 434 /* bind OK */ 435 rc = listen(fd, 512); 436 if (rc != 0) { 437 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 438 close(fd); 439 fd = -1; 440 break; 441 } 442 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 443 rc = connect(fd, res->ai_addr, res->ai_addrlen); 444 if (rc != 0) { 445 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 446 /* try next family */ 447 close(fd); 448 fd = -1; 449 continue; 450 } 451 } 452 453 flag = fcntl(fd, F_GETFL); 454 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 455 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 456 close(fd); 457 fd = -1; 458 break; 459 } 460 break; 461 } 462 freeaddrinfo(res0); 463 464 if (fd < 0) { 465 return NULL; 466 } 467 468 sock = _spdk_posix_sock_alloc(fd); 469 if (sock == NULL) { 470 SPDK_ERRLOG("sock allocation failed\n"); 471 close(fd); 472 return NULL; 473 } 474 475 /* Disable zero copy for client sockets until support is added */ 476 if (type == SPDK_SOCK_CREATE_CONNECT) { 477 sock->zcopy = false; 478 } 479 480 return &sock->base; 481 } 482 483 static struct spdk_sock * 484 spdk_posix_sock_listen(const char *ip, int port) 485 { 486 return spdk_posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN); 487 } 488 489 static struct spdk_sock * 490 spdk_posix_sock_connect(const char *ip, int port) 491 { 492 return spdk_posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT); 493 } 494 495 static struct spdk_sock * 496 spdk_posix_sock_accept(struct spdk_sock *_sock) 497 { 498 struct spdk_posix_sock *sock = __posix_sock(_sock); 499 struct sockaddr_storage sa; 500 socklen_t salen; 501 int rc, fd; 502 struct spdk_posix_sock *new_sock; 503 int flag; 504 505 memset(&sa, 0, sizeof(sa)); 506 salen = sizeof(sa); 507 508 assert(sock != NULL); 509 510 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 511 512 if (rc == -1) { 513 return NULL; 514 } 515 516 fd = rc; 517 518 flag = fcntl(fd, F_GETFL); 519 if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) { 520 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 521 close(fd); 522 return NULL; 523 } 524 525 new_sock = _spdk_posix_sock_alloc(fd); 526 if (new_sock == NULL) { 527 close(fd); 528 return NULL; 529 } 530 531 return &new_sock->base; 532 } 533 534 static int 535 spdk_posix_sock_close(struct spdk_sock *_sock) 536 { 537 struct spdk_posix_sock *sock = __posix_sock(_sock); 538 539 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 540 541 /* If the socket fails to close, the best choice is to 542 * leak the fd but continue to free the rest of the sock 543 * memory. */ 544 close(sock->fd); 545 546 spdk_pipe_destroy(sock->recv_pipe); 547 free(sock->recv_buf); 548 free(sock); 549 550 return 0; 551 } 552 553 #ifdef SPDK_ZEROCOPY 554 static int 555 _sock_check_zcopy(struct spdk_sock *sock) 556 { 557 struct spdk_posix_sock *psock = __posix_sock(sock); 558 struct msghdr msgh = {}; 559 uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)]; 560 ssize_t rc; 561 struct sock_extended_err *serr; 562 struct cmsghdr *cm; 563 uint32_t idx; 564 struct spdk_sock_request *req, *treq; 565 bool found; 566 567 msgh.msg_control = buf; 568 msgh.msg_controllen = sizeof(buf); 569 570 while (true) { 571 rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE); 572 573 if (rc < 0) { 574 if (errno == EWOULDBLOCK || errno == EAGAIN) { 575 return 0; 576 } 577 578 if (!TAILQ_EMPTY(&sock->pending_reqs)) { 579 SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n"); 580 } else { 581 SPDK_WARNLOG("Recvmsg yielded an error!\n"); 582 } 583 return 0; 584 } 585 586 cm = CMSG_FIRSTHDR(&msgh); 587 if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) { 588 SPDK_WARNLOG("Unexpected cmsg level or type!\n"); 589 return 0; 590 } 591 592 serr = (struct sock_extended_err *)CMSG_DATA(cm); 593 if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { 594 SPDK_WARNLOG("Unexpected extended error origin\n"); 595 return 0; 596 } 597 598 /* Most of the time, the pending_reqs array is in the exact 599 * order we need such that all of the requests to complete are 600 * in order, in the front. It is guaranteed that all requests 601 * belonging to the same sendmsg call are sequential, so once 602 * we encounter one match we can stop looping as soon as a 603 * non-match is found. 604 */ 605 for (idx = serr->ee_info; idx <= serr->ee_data; idx++) { 606 found = false; 607 TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) { 608 if (req->internal.offset == idx) { 609 found = true; 610 611 rc = spdk_sock_request_put(sock, req, 0); 612 if (rc < 0) { 613 return rc; 614 } 615 616 } else if (found) { 617 break; 618 } 619 } 620 621 } 622 } 623 624 return 0; 625 } 626 #endif 627 628 static int 629 _sock_flush(struct spdk_sock *sock) 630 { 631 struct spdk_posix_sock *psock = __posix_sock(sock); 632 struct msghdr msg = {}; 633 int flags; 634 struct iovec iovs[IOV_BATCH_SIZE]; 635 int iovcnt; 636 int retval; 637 struct spdk_sock_request *req; 638 int i; 639 ssize_t rc; 640 unsigned int offset; 641 size_t len; 642 643 /* Can't flush from within a callback or we end up with recursive calls */ 644 if (sock->cb_cnt > 0) { 645 return 0; 646 } 647 648 /* Gather an iov */ 649 iovcnt = 0; 650 req = TAILQ_FIRST(&sock->queued_reqs); 651 while (req) { 652 offset = req->internal.offset; 653 654 for (i = 0; i < req->iovcnt; i++) { 655 /* Consume any offset first */ 656 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 657 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 658 continue; 659 } 660 661 iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset; 662 iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 663 iovcnt++; 664 665 offset = 0; 666 667 if (iovcnt >= IOV_BATCH_SIZE) { 668 break; 669 } 670 } 671 672 if (iovcnt >= IOV_BATCH_SIZE) { 673 break; 674 } 675 676 req = TAILQ_NEXT(req, internal.link); 677 } 678 679 if (iovcnt == 0) { 680 return 0; 681 } 682 683 /* Perform the vectored write */ 684 msg.msg_iov = iovs; 685 msg.msg_iovlen = iovcnt; 686 #ifdef SPDK_ZEROCOPY 687 if (psock->zcopy) { 688 flags = MSG_ZEROCOPY; 689 } else 690 #endif 691 { 692 flags = 0; 693 } 694 rc = sendmsg(psock->fd, &msg, flags); 695 if (rc <= 0) { 696 if (errno == EAGAIN || errno == EWOULDBLOCK) { 697 return 0; 698 } 699 return rc; 700 } 701 702 psock->sendmsg_idx++; 703 704 /* Consume the requests that were actually written */ 705 req = TAILQ_FIRST(&sock->queued_reqs); 706 while (req) { 707 offset = req->internal.offset; 708 709 for (i = 0; i < req->iovcnt; i++) { 710 /* Advance by the offset first */ 711 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 712 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 713 continue; 714 } 715 716 /* Calculate the remaining length of this element */ 717 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 718 719 if (len > (size_t)rc) { 720 /* This element was partially sent. */ 721 req->internal.offset += rc; 722 return 0; 723 } 724 725 offset = 0; 726 req->internal.offset += len; 727 rc -= len; 728 } 729 730 /* Handled a full request. */ 731 spdk_sock_request_pend(sock, req); 732 733 if (!psock->zcopy) { 734 /* The sendmsg syscall above isn't currently asynchronous, 735 * so it's already done. */ 736 retval = spdk_sock_request_put(sock, req, 0); 737 if (retval) { 738 break; 739 } 740 } else { 741 /* Re-use the offset field to hold the sendmsg call index. The 742 * index is 0 based, so subtract one here because we've already 743 * incremented above. */ 744 req->internal.offset = psock->sendmsg_idx - 1; 745 } 746 747 if (rc == 0) { 748 break; 749 } 750 751 req = TAILQ_FIRST(&sock->queued_reqs); 752 } 753 754 return 0; 755 } 756 757 static int 758 spdk_posix_sock_flush(struct spdk_sock *_sock) 759 { 760 return _sock_flush(_sock); 761 } 762 763 static ssize_t 764 spdk_posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt) 765 { 766 struct iovec siov[2]; 767 int sbytes; 768 ssize_t bytes; 769 struct spdk_posix_sock_group_impl *group; 770 771 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 772 if (sbytes < 0) { 773 errno = EINVAL; 774 return -1; 775 } else if (sbytes == 0) { 776 errno = EAGAIN; 777 return -1; 778 } 779 780 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 781 782 if (bytes == 0) { 783 /* The only way this happens is if diov is 0 length */ 784 errno = EINVAL; 785 return -1; 786 } 787 788 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 789 790 /* If we drained the pipe, take it off the level-triggered list */ 791 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 792 group = __posix_group_impl(sock->base.group_impl); 793 TAILQ_REMOVE(&group->pending_recv, sock, link); 794 sock->pending_recv = false; 795 } 796 797 return bytes; 798 } 799 800 static inline ssize_t 801 _spdk_posix_sock_read(struct spdk_posix_sock *sock) 802 { 803 struct iovec iov[2]; 804 int bytes; 805 struct spdk_posix_sock_group_impl *group; 806 807 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 808 809 if (bytes > 0) { 810 bytes = readv(sock->fd, iov, 2); 811 if (bytes > 0) { 812 spdk_pipe_writer_advance(sock->recv_pipe, bytes); 813 if (sock->base.group_impl) { 814 group = __posix_group_impl(sock->base.group_impl); 815 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 816 sock->pending_recv = true; 817 } 818 } 819 } 820 821 return bytes; 822 } 823 824 static ssize_t 825 spdk_posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 826 { 827 struct spdk_posix_sock *sock = __posix_sock(_sock); 828 int rc, i; 829 size_t len; 830 831 if (sock->recv_pipe == NULL) { 832 return readv(sock->fd, iov, iovcnt); 833 } 834 835 len = 0; 836 for (i = 0; i < iovcnt; i++) { 837 len += iov[i].iov_len; 838 } 839 840 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 841 /* If the user is receiving a sufficiently large amount of data, 842 * receive directly to their buffers. */ 843 if (len >= 1024) { 844 return readv(sock->fd, iov, iovcnt); 845 } 846 847 /* Otherwise, do a big read into our pipe */ 848 rc = _spdk_posix_sock_read(sock); 849 if (rc <= 0) { 850 return rc; 851 } 852 } 853 854 return spdk_posix_sock_recv_from_pipe(sock, iov, iovcnt); 855 } 856 857 static ssize_t 858 spdk_posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 859 { 860 struct iovec iov[1]; 861 862 iov[0].iov_base = buf; 863 iov[0].iov_len = len; 864 865 return spdk_posix_sock_readv(sock, iov, 1); 866 } 867 868 static ssize_t 869 spdk_posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 870 { 871 struct spdk_posix_sock *sock = __posix_sock(_sock); 872 int rc; 873 874 /* In order to process a writev, we need to flush any asynchronous writes 875 * first. */ 876 rc = _sock_flush(_sock); 877 if (rc < 0) { 878 return rc; 879 } 880 881 if (!TAILQ_EMPTY(&_sock->queued_reqs)) { 882 /* We weren't able to flush all requests */ 883 errno = EAGAIN; 884 return -1; 885 } 886 887 return writev(sock->fd, iov, iovcnt); 888 } 889 890 static void 891 spdk_posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) 892 { 893 int rc; 894 895 spdk_sock_request_queue(sock, req); 896 897 /* If there are a sufficient number queued, just flush them out immediately. */ 898 if (sock->queued_iovcnt >= IOV_BATCH_SIZE) { 899 rc = _sock_flush(sock); 900 if (rc) { 901 spdk_sock_abort_requests(sock); 902 } 903 } 904 } 905 906 static int 907 spdk_posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 908 { 909 struct spdk_posix_sock *sock = __posix_sock(_sock); 910 int val; 911 int rc; 912 913 assert(sock != NULL); 914 915 val = nbytes; 916 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 917 if (rc != 0) { 918 return -1; 919 } 920 return 0; 921 } 922 923 static bool 924 spdk_posix_sock_is_ipv6(struct spdk_sock *_sock) 925 { 926 struct spdk_posix_sock *sock = __posix_sock(_sock); 927 struct sockaddr_storage sa; 928 socklen_t salen; 929 int rc; 930 931 assert(sock != NULL); 932 933 memset(&sa, 0, sizeof sa); 934 salen = sizeof sa; 935 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 936 if (rc != 0) { 937 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 938 return false; 939 } 940 941 return (sa.ss_family == AF_INET6); 942 } 943 944 static bool 945 spdk_posix_sock_is_ipv4(struct spdk_sock *_sock) 946 { 947 struct spdk_posix_sock *sock = __posix_sock(_sock); 948 struct sockaddr_storage sa; 949 socklen_t salen; 950 int rc; 951 952 assert(sock != NULL); 953 954 memset(&sa, 0, sizeof sa); 955 salen = sizeof sa; 956 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 957 if (rc != 0) { 958 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 959 return false; 960 } 961 962 return (sa.ss_family == AF_INET); 963 } 964 965 static bool 966 spdk_posix_sock_is_connected(struct spdk_sock *_sock) 967 { 968 struct spdk_posix_sock *sock = __posix_sock(_sock); 969 uint8_t byte; 970 int rc; 971 972 rc = recv(sock->fd, &byte, 1, MSG_PEEK); 973 if (rc == 0) { 974 return false; 975 } 976 977 if (rc < 0) { 978 if (errno == EAGAIN || errno == EWOULDBLOCK) { 979 return true; 980 } 981 982 return false; 983 } 984 985 return true; 986 } 987 988 static int 989 spdk_posix_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id) 990 { 991 int rc = -1; 992 993 #if defined(SO_INCOMING_NAPI_ID) 994 struct spdk_posix_sock *sock = __posix_sock(_sock); 995 socklen_t salen = sizeof(int); 996 997 rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen); 998 if (rc != 0) { 999 SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno); 1000 } 1001 1002 #endif 1003 return rc; 1004 } 1005 1006 static struct spdk_sock_group_impl * 1007 spdk_posix_sock_group_impl_create(void) 1008 { 1009 struct spdk_posix_sock_group_impl *group_impl; 1010 int fd; 1011 1012 #if defined(__linux__) 1013 fd = epoll_create1(0); 1014 #elif defined(__FreeBSD__) 1015 fd = kqueue(); 1016 #endif 1017 if (fd == -1) { 1018 return NULL; 1019 } 1020 1021 group_impl = calloc(1, sizeof(*group_impl)); 1022 if (group_impl == NULL) { 1023 SPDK_ERRLOG("group_impl allocation failed\n"); 1024 close(fd); 1025 return NULL; 1026 } 1027 1028 group_impl->fd = fd; 1029 TAILQ_INIT(&group_impl->pending_recv); 1030 1031 return &group_impl->base; 1032 } 1033 1034 static int 1035 spdk_posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1036 { 1037 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1038 struct spdk_posix_sock *sock = __posix_sock(_sock); 1039 int rc; 1040 1041 #if defined(__linux__) 1042 struct epoll_event event; 1043 1044 memset(&event, 0, sizeof(event)); 1045 /* EPOLLERR is always on even if we don't set it, but be explicit for clarity */ 1046 event.events = EPOLLIN | EPOLLERR; 1047 event.data.ptr = sock; 1048 1049 rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event); 1050 #elif defined(__FreeBSD__) 1051 struct kevent event; 1052 struct timespec ts = {0}; 1053 1054 EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock); 1055 1056 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1057 #endif 1058 1059 /* switched from another polling group due to scheduling */ 1060 if (spdk_unlikely(sock->recv_pipe != NULL && 1061 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1062 assert(sock->pending_recv == false); 1063 sock->pending_recv = true; 1064 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1065 } 1066 1067 return rc; 1068 } 1069 1070 static int 1071 spdk_posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1072 { 1073 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1074 struct spdk_posix_sock *sock = __posix_sock(_sock); 1075 int rc; 1076 1077 if (sock->recv_pipe != NULL) { 1078 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) { 1079 TAILQ_REMOVE(&group->pending_recv, sock, link); 1080 sock->pending_recv = false; 1081 } 1082 assert(sock->pending_recv == false); 1083 } 1084 1085 #if defined(__linux__) 1086 struct epoll_event event; 1087 1088 /* Event parameter is ignored but some old kernel version still require it. */ 1089 rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event); 1090 #elif defined(__FreeBSD__) 1091 struct kevent event; 1092 struct timespec ts = {0}; 1093 1094 EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 1095 1096 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1097 if (rc == 0 && event.flags & EV_ERROR) { 1098 rc = -1; 1099 errno = event.data; 1100 } 1101 #endif 1102 1103 spdk_sock_abort_requests(_sock); 1104 1105 return rc; 1106 } 1107 1108 static int 1109 spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 1110 struct spdk_sock **socks) 1111 { 1112 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1113 struct spdk_sock *sock, *tmp; 1114 int num_events, i, rc; 1115 struct spdk_posix_sock *psock, *ptmp; 1116 #if defined(__linux__) 1117 struct epoll_event events[MAX_EVENTS_PER_POLL]; 1118 #elif defined(__FreeBSD__) 1119 struct kevent events[MAX_EVENTS_PER_POLL]; 1120 struct timespec ts = {0}; 1121 #endif 1122 1123 /* This must be a TAILQ_FOREACH_SAFE because while flushing, 1124 * a completion callback could remove the sock from the 1125 * group. */ 1126 TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { 1127 rc = _sock_flush(sock); 1128 if (rc) { 1129 spdk_sock_abort_requests(sock); 1130 } 1131 } 1132 1133 #if defined(__linux__) 1134 num_events = epoll_wait(group->fd, events, max_events, 0); 1135 #elif defined(__FreeBSD__) 1136 num_events = kevent(group->fd, NULL, 0, events, max_events, &ts); 1137 #endif 1138 1139 if (num_events == -1) { 1140 return -1; 1141 } 1142 1143 for (i = 0; i < num_events; i++) { 1144 #if defined(__linux__) 1145 sock = events[i].data.ptr; 1146 psock = __posix_sock(sock); 1147 1148 #ifdef SPDK_ZEROCOPY 1149 if (events[i].events & EPOLLERR) { 1150 rc = _sock_check_zcopy(sock); 1151 /* If the socket was closed or removed from 1152 * the group in response to a send ack, don't 1153 * add it to the array here. */ 1154 if (rc || sock->cb_fn == NULL) { 1155 continue; 1156 } 1157 } 1158 #endif 1159 if ((events[i].events & EPOLLIN) == 0) { 1160 continue; 1161 } 1162 1163 #elif defined(__FreeBSD__) 1164 sock = events[i].udata; 1165 psock = __posix_sock(sock); 1166 #endif 1167 1168 /* If the socket does not already have recv pending, add it now */ 1169 if (!psock->pending_recv) { 1170 psock->pending_recv = true; 1171 TAILQ_INSERT_TAIL(&group->pending_recv, psock, link); 1172 } 1173 } 1174 1175 num_events = 0; 1176 1177 TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) { 1178 if (num_events == max_events) { 1179 break; 1180 } 1181 1182 socks[num_events++] = &psock->base; 1183 } 1184 1185 /* Cycle the pending_recv list so that each time we poll things aren't 1186 * in the same order. */ 1187 for (i = 0; i < num_events; i++) { 1188 psock = __posix_sock(socks[i]); 1189 1190 TAILQ_REMOVE(&group->pending_recv, psock, link); 1191 1192 if (psock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(psock->recv_pipe) == 0) { 1193 psock->pending_recv = false; 1194 } else { 1195 TAILQ_INSERT_TAIL(&group->pending_recv, psock, link); 1196 } 1197 1198 } 1199 1200 return num_events; 1201 } 1202 1203 static int 1204 spdk_posix_sock_group_impl_close(struct spdk_sock_group_impl *_group) 1205 { 1206 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1207 int rc; 1208 1209 rc = close(group->fd); 1210 free(group); 1211 return rc; 1212 } 1213 1214 static struct spdk_net_impl g_posix_net_impl = { 1215 .name = "posix", 1216 .getaddr = spdk_posix_sock_getaddr, 1217 .connect = spdk_posix_sock_connect, 1218 .listen = spdk_posix_sock_listen, 1219 .accept = spdk_posix_sock_accept, 1220 .close = spdk_posix_sock_close, 1221 .recv = spdk_posix_sock_recv, 1222 .readv = spdk_posix_sock_readv, 1223 .writev = spdk_posix_sock_writev, 1224 .writev_async = spdk_posix_sock_writev_async, 1225 .flush = spdk_posix_sock_flush, 1226 .set_recvlowat = spdk_posix_sock_set_recvlowat, 1227 .set_recvbuf = spdk_posix_sock_set_recvbuf, 1228 .set_sendbuf = spdk_posix_sock_set_sendbuf, 1229 .is_ipv6 = spdk_posix_sock_is_ipv6, 1230 .is_ipv4 = spdk_posix_sock_is_ipv4, 1231 .is_connected = spdk_posix_sock_is_connected, 1232 .get_placement_id = spdk_posix_sock_get_placement_id, 1233 .group_impl_create = spdk_posix_sock_group_impl_create, 1234 .group_impl_add_sock = spdk_posix_sock_group_impl_add_sock, 1235 .group_impl_remove_sock = spdk_posix_sock_group_impl_remove_sock, 1236 .group_impl_poll = spdk_posix_sock_group_impl_poll, 1237 .group_impl_close = spdk_posix_sock_group_impl_close, 1238 }; 1239 1240 SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY); 1241