1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. All rights reserved. 5 * Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
32 */ 33 34 #include "spdk/stdinc.h" 35 36 #if defined(__FreeBSD__) 37 #include <sys/event.h> 38 #define SPDK_KEVENT 39 #else 40 #include <sys/epoll.h> 41 #define SPDK_EPOLL 42 #endif 43 44 #if defined(__linux__) 45 #include <linux/errqueue.h> 46 #endif 47 48 #include "spdk/env.h" 49 #include "spdk/log.h" 50 #include "spdk/pipe.h" 51 #include "spdk/sock.h" 52 #include "spdk/util.h" 53 #include "spdk_internal/sock.h" 54 55 #define MAX_TMPBUF 1024 56 #define PORTNUMLEN 32 57 58 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 59 #define SPDK_ZEROCOPY 60 #endif 61 62 struct spdk_posix_sock { 63 struct spdk_sock base; 64 int fd; 65 66 uint32_t sendmsg_idx; 67 68 struct spdk_pipe *recv_pipe; 69 void *recv_buf; 70 int recv_buf_sz; 71 bool pipe_has_data; 72 bool socket_has_data; 73 bool zcopy; 74 75 int placement_id; 76 77 TAILQ_ENTRY(spdk_posix_sock) link; 78 }; 79 80 TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock); 81 82 struct spdk_posix_sock_group_impl { 83 struct spdk_sock_group_impl base; 84 int fd; 85 struct spdk_has_data_list socks_with_data; 86 int placement_id; 87 }; 88 89 static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = { 90 .recv_buf_size = MIN_SO_RCVBUF_SIZE, 91 .send_buf_size = MIN_SO_SNDBUF_SIZE, 92 .enable_recv_pipe = true, 93 .enable_zerocopy_send = true, 94 .enable_quickack = false, 95 .enable_placement_id = PLACEMENT_NONE, 96 .enable_zerocopy_send_server = true, 97 .enable_zerocopy_send_client = false 98 }; 99 100 static struct spdk_sock_map g_map = { 101 .entries = STAILQ_HEAD_INITIALIZER(g_map.entries), 102 .mtx = PTHREAD_MUTEX_INITIALIZER 103 }; 104 105 __attribute((destructor)) static void 106 posix_sock_map_cleanup(void) 107 { 108 spdk_sock_map_cleanup(&g_map); 109 } 110 111 static int 112 get_addr_str(struct sockaddr *sa, char *host, size_t hlen) 113 { 114 const char *result = NULL; 115 116 if (sa == NULL || host == NULL) { 117 return -1; 118 } 119 120 switch (sa->sa_family) { 121 case AF_INET: 122 result = inet_ntop(AF_INET, 
				   &(((struct sockaddr_in *)sa)->sin_addr),
				   host, hlen);
		break;
	case AF_INET6:
		result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
				   host, hlen);
		break;
	default:
		break;
	}

	if (result != NULL) {
		return 0;
	} else {
		return -1;
	}
}

#define __posix_sock(sock) (struct spdk_posix_sock *)sock
#define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group

/* Report the local (saddr/sport) and peer (caddr/cport) addresses of the
 * socket as numeric strings/ports.  AF_UNIX sockets succeed with no
 * addresses filled in; unsupported families fail with -1. */
static int
posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum posix_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};

/* (Re)size the socket's userspace receive pipe to "sz" bytes.
 * sz == 0 frees the pipe; 0 < sz < MIN_SOCK_PIPE_SIZE is rejected.
 * Data already buffered in the old pipe is migrated to the new one;
 * if it does not fit, the resize fails with -EINVAL and the old pipe
 * is left untouched. */
static int
posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	/* Already at the requested size - nothing to do. */
	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	/* The pipe consumes one extra byte to distinguish full from empty,
	 * hence sz + 1. */
	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

/* Resize the optional userspace receive pipe (if enabled by impl opts)
 * and the kernel SO_RCVBUF, clamped to at least MIN_SO_RCVBUF_SIZE. */
static int
posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) {
		rc = posix_sock_alloc_pipe(sock, sz);
		if (rc) {
			return rc;
		}
	}

	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
	if (sz < MIN_SO_RCVBUF_SIZE) {
		sz = MIN_SO_RCVBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

/* Set the kernel SO_SNDBUF, clamped to at least MIN_SO_SNDBUF_SIZE. */
static int
posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (sz < MIN_SO_SNDBUF_SIZE) {
		sz = MIN_SO_SNDBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

/* Wrap an already-open fd in a new spdk_posix_sock.  Optionally enables
 * SO_ZEROCOPY (best effort - zcopy stays false if setsockopt fails) and,
 * on Linux, TCP_QUICKACK and placement-id discovery.  Returns NULL on
 * allocation failure; the caller retains ownership of fd in that case. */
static struct spdk_posix_sock *
posix_sock_alloc(int fd, bool enable_zero_copy)
{
	struct spdk_posix_sock *sock;
#if defined(SPDK_ZEROCOPY) || defined(__linux__)
	int flag;
	int rc;
#endif

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;

#if defined(SPDK_ZEROCOPY)
	flag = 1;

	if (enable_zero_copy) {
		/* Try to turn on zero copy sends */
		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
		if (rc == 0) {
			sock->zcopy = true;
		}
	}
#endif

#if defined(__linux__)
	flag = 1;

	if (g_spdk_posix_sock_impl_opts.enable_quickack) {
		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK,
&flag, sizeof(flag)); 368 if (rc != 0) { 369 SPDK_ERRLOG("quickack was failed to set\n"); 370 } 371 } 372 373 spdk_sock_get_placement_id(sock->fd, g_spdk_posix_sock_impl_opts.enable_placement_id, 374 &sock->placement_id); 375 376 if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) { 377 /* Save placement_id */ 378 spdk_sock_map_insert(&g_map, sock->placement_id, NULL); 379 } 380 #endif 381 382 return sock; 383 } 384 385 static bool 386 sock_is_loopback(int fd) 387 { 388 struct ifaddrs *addrs, *tmp; 389 struct sockaddr_storage sa = {}; 390 socklen_t salen; 391 struct ifreq ifr = {}; 392 char ip_addr[256], ip_addr_tmp[256]; 393 int rc; 394 bool is_loopback = false; 395 396 salen = sizeof(sa); 397 rc = getsockname(fd, (struct sockaddr *)&sa, &salen); 398 if (rc != 0) { 399 return is_loopback; 400 } 401 402 memset(ip_addr, 0, sizeof(ip_addr)); 403 rc = get_addr_str((struct sockaddr *)&sa, ip_addr, sizeof(ip_addr)); 404 if (rc != 0) { 405 return is_loopback; 406 } 407 408 getifaddrs(&addrs); 409 for (tmp = addrs; tmp != NULL; tmp = tmp->ifa_next) { 410 if (tmp->ifa_addr && (tmp->ifa_flags & IFF_UP) && 411 (tmp->ifa_addr->sa_family == sa.ss_family)) { 412 memset(ip_addr_tmp, 0, sizeof(ip_addr_tmp)); 413 rc = get_addr_str(tmp->ifa_addr, ip_addr_tmp, sizeof(ip_addr_tmp)); 414 if (rc != 0) { 415 continue; 416 } 417 418 if (strncmp(ip_addr, ip_addr_tmp, sizeof(ip_addr)) == 0) { 419 memcpy(ifr.ifr_name, tmp->ifa_name, sizeof(ifr.ifr_name)); 420 ioctl(fd, SIOCGIFFLAGS, &ifr); 421 if (ifr.ifr_flags & IFF_LOOPBACK) { 422 is_loopback = true; 423 } 424 goto end; 425 } 426 } 427 } 428 429 end: 430 freeifaddrs(addrs); 431 return is_loopback; 432 } 433 434 static struct spdk_sock * 435 posix_sock_create(const char *ip, int port, 436 enum posix_sock_create_type type, 437 struct spdk_sock_opts *opts) 438 { 439 struct spdk_posix_sock *sock; 440 char buf[MAX_TMPBUF]; 441 char portnum[PORTNUMLEN]; 442 char *p; 443 struct addrinfo hints, *res, *res0; 444 int fd, flag; 
	int val = 1;
	int rc, sz;
	bool enable_zcopy_user_opts = true;
	bool enable_zcopy_impl_opts = true;

	assert(opts != NULL);

	if (ip == NULL) {
		return NULL;
	}
	/* Strip the brackets from an IPv6 literal like "[::1]". */
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		sz = g_spdk_posix_sock_impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		sz = g_spdk_posix_sock_impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}
#endif

		/* Restrict an AF_INET6 socket to IPv6 only, so a separate
		 * AF_INET socket can bind the same port. */
		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_server &&
						 g_spdk_posix_sock_impl_opts.enable_zerocopy_send;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_client &&
						 g_spdk_posix_sock_impl_opts.enable_zerocopy_send;
		}

		/* All sockets in this module are non-blocking. */
		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	/* Only enable zero copy for non-loopback sockets.
	 */
	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd);

	sock = posix_sock_alloc(fd, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}

/* Create a listening socket bound to ip:port. */
static struct spdk_sock *
posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}

/* Create a socket connected to ip:port. */
static struct spdk_sock *
posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}

/* Accept one pending connection on a listening socket.  The new fd is
 * made non-blocking, re-tagged with the listener's SO_PRIORITY (not
 * inherited by accept) and wrapped in a new spdk_posix_sock.
 * Returns NULL if nothing is pending or on any setup failure. */
static struct spdk_sock *
posix_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc, fd;
	struct spdk_posix_sock *new_sock;
	int flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	/* Inherit the zero copy feature from the listen socket */
	new_sock = posix_sock_alloc(fd, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}

/* Close the socket and free all per-socket resources. */
static int
posix_sock_close(struct spdk_sock
		 *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);

	assert(TAILQ_EMPTY(&_sock->pending_reqs));

	/* If the socket fails to close, the best choice is to
	 * leak the fd but continue to free the rest of the sock
	 * memory. */
	close(sock->fd);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	free(sock);

	return 0;
}

#ifdef SPDK_ZEROCOPY
/* Drain MSG_ZEROCOPY completion notifications from the socket's error
 * queue and complete the matching requests on the pending list.  Each
 * notification carries a [ee_info, ee_data] range of sendmsg indices
 * (stored in req->internal.offset by _sock_flush).  Returns 0, or a
 * negative value if completing a request fails. */
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	msgh.msg_control = buf;
	msgh.msg_controllen = sizeof(buf);

	while (true) {
		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			/* EAGAIN/EWOULDBLOCK: error queue drained - done. */
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front.
		 * It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (req->internal.offset == idx) {
					found = true;

					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}

				} else if (found) {
					break;
				}
			}
		}
	}

	return 0;
}
#endif

/* Write out as many queued async requests as the kernel will accept in
 * one vectored sendmsg().  Fully-written requests are moved to the
 * pending list (zcopy) or completed immediately (non-zcopy); a request
 * written only partially records its progress in internal.offset.
 * Returns 0 on success (including "nothing to do" and EAGAIN), or a
 * negative value on a hard send error. */
static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc;
	unsigned int offset;
	size_t len;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		return 0;
	}

	iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL);

	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY;
	} else
#endif
	{
		flags = 0;
	}
	rc = sendmsg(psock->fd, &msg, flags);
	if (rc <= 0) {
		/* ENOBUFS with zcopy means the kernel ran out of notification
		 * space - treat like EAGAIN and retry later. */
		if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
			return 0;
		}
		return rc;
	}

	/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
	 * req->internal.offset, so sendmsg_idx should not be zero  */
	if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
		psock->sendmsg_idx = 1;
	} else {
		psock->sendmsg_idx++;
	}

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset =
			req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(sock, req);

		if (!psock->zcopy) {
			/* The sendmsg syscall above isn't currently asynchronous,
			 * so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above.
			 */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return 0;
}

/* Public flush entry point: reap zero-copy completions first (if any
 * are outstanding), then push queued requests to the kernel. */
static int
posix_sock_flush(struct spdk_sock *sock)
{
#ifdef SPDK_ZEROCOPY
	struct spdk_posix_sock *psock = __posix_sock(sock);

	if (psock->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) {
		_sock_check_zcopy(sock);
	}
#endif

	return _sock_flush(sock);
}

/* Copy buffered data from the socket's receive pipe into diov.
 * Returns the number of bytes copied, or -1 with errno set (EAGAIN when
 * the pipe is empty, EINVAL on a zero-length diov).  Removes the socket
 * from its group's has-data list once the pipe is drained. */
static ssize_t
posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_posix_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, mark it appropriately */
	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		assert(sock->pipe_has_data == true);

		group = __posix_group_impl(sock->base.group_impl);
		if (group && !sock->socket_has_data) {
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->pipe_has_data = false;
	}

	return bytes;
}

/* Refill the (empty) receive pipe with one readv() from the kernel
 * socket.  Returns the readv() result; on success the socket's data is
 * considered moved from the kernel into the pipe. */
static inline ssize_t
posix_sock_read(struct spdk_posix_sock *sock)
{
	struct iovec iov[2];
	int bytes_avail, bytes_recvd;
	struct spdk_posix_sock_group_impl *group;

	bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes_avail <= 0) {
		return bytes_avail;
	}

	bytes_recvd = readv(sock->fd, iov, 2);

	assert(sock->pipe_has_data == false);
	if (bytes_recvd <= 0) {
		/* Errors count as draining the socket data */
		if (sock->base.group_impl && sock->socket_has_data) {
			group = __posix_group_impl(sock->base.group_impl);
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->socket_has_data = false;

		return bytes_recvd;
	}

	spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd);

#if DEBUG
	if (sock->base.group_impl) {
		assert(sock->socket_has_data == true);
	}
#endif

	/* Kernel data now lives in the pipe. */
	sock->pipe_has_data = true;
	sock->socket_has_data = false;

	return bytes_recvd;
}

/* Vectored receive.  Without a pipe this is a plain readv().  With a
 * pipe: large reads (>= MIN_SOCK_PIPE_SIZE) go straight to the user's
 * buffers, small reads first fill the pipe with one big readv() and
 * then copy out of it. */
static ssize_t
posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		assert(sock->pipe_has_data == false);
		if (group && sock->socket_has_data) {
			sock->socket_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}
		return readv(sock->fd, iov, iovcnt);
	}

	/* If the socket is not in a group, we must assume it always has
	 * data waiting for us because it is not epolled */
	if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		len = 0;
		for (i = 0; i < iovcnt; i++) {
			len += iov[i].iov_len;
		}

		if (len >= MIN_SOCK_PIPE_SIZE) {
			/* TODO: Should this detect if kernel socket is drained?
			 */
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = posix_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
}

/* Single-buffer recv() built on top of posix_sock_readv(). */
static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return posix_sock_readv(sock, iov, 1);
}

/* Synchronous vectored write.  Queued async requests must be flushed
 * first to preserve ordering; if they cannot all be flushed, fail with
 * EAGAIN rather than write out of order. */
static ssize_t
posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	/* In order to process a writev, we need to flush any asynchronous writes
	 * first. */
	rc = _sock_flush(_sock);
	if (rc < 0) {
		return rc;
	}

	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
		/* We weren't able to flush all requests */
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}

/* Queue an asynchronous write request; on flush failure all queued
 * requests are aborted. */
static void
posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	int rc;

	spdk_sock_request_queue(sock, req);

	/* If there are a sufficient number queued, just flush them out immediately.
	 */
	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}
}

/* Set SO_RCVLOWAT so poll/epoll only wakes once nbytes are available. */
static int
posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

/* True if the socket's local address family is AF_INET6. */
static bool
posix_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

/* True if the socket's local address family is AF_INET. */
static bool
posix_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

/* Probe connection liveness with a 1-byte MSG_PEEK recv: 0 means the
 * peer closed; EAGAIN/EWOULDBLOCK means connected but idle. */
static bool
posix_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

/* Look up the poll group previously associated with this socket's
 * placement id, or NULL if the socket has no placement id. */
static struct spdk_sock_group_impl *
posix_sock_group_impl_get_optimal(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_sock_group_impl *group_impl;

	if (sock->placement_id != -1) {
		spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl);
		return group_impl;
	}

	return NULL;
}

/* Create a poll group backed by epoll (Linux) or kqueue (FreeBSD). */
static struct spdk_sock_group_impl *
posix_sock_group_impl_create(void)
{
	struct spdk_posix_sock_group_impl *group_impl;
	int fd;

#if defined(SPDK_EPOLL)
	fd = epoll_create1(0);
#elif defined(SPDK_KEVENT)
	fd = kqueue();
#endif
	if (fd == -1) {
		return NULL;
	}

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		close(fd);
		return NULL;
	}

	group_impl->fd = fd;
	TAILQ_INIT(&group_impl->socks_with_data);
	group_impl->placement_id = -1;

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
		group_impl->placement_id = spdk_env_get_current_core();
	}

	return &group_impl->base;
}

/* Tag the socket with SO_MARK = placement_id and record the
 * placement_id -> group mapping.  Failures are logged but not fatal. */
static void
posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock,
		int placement_id)
{
#if defined(SO_MARK)
	int rc;

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_MARK,
			&placement_id, sizeof(placement_id));
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Error setting SO_MARK\n");
		return;
	}

	rc = spdk_sock_map_insert(&g_map, placement_id, &group->base);
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
		return;
	}

	sock->placement_id = placement_id;
#endif
}

static void
posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	/* Ensure the poll group has a placement id (allocating one on first
	 * use and back-filling existing member sockets), then mark _sock
	 * with it. */
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);

	if (group->placement_id == -1) {
		group->placement_id = spdk_sock_map_find_free(&g_map);

		/* If a free placement id is found, update existing sockets in this group */
		if (group->placement_id != -1) {
			struct spdk_sock *sock, *tmp;

			TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
				posix_sock_mark(group, __posix_sock(sock), group->placement_id);
			}
		}
	}

	if (group->placement_id != -1) {
		/*
		 * group placement id is already determined for this poll group.
		 * Mark socket with group's placement id.
		 */
		posix_sock_mark(group, __posix_sock(_sock), group->placement_id);
	}
}

/* Register a socket with the poll group's epoll/kqueue instance and
 * update placement-id bookkeeping.  Returns the epoll_ctl/kevent result
 * (0 on success). */
static int
posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

#if defined(SPDK_EPOLL)
	struct epoll_event event;

	memset(&event, 0, sizeof(event));
	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
	event.events = EPOLLIN | EPOLLERR;
	event.data.ptr = sock;

	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
#elif defined(SPDK_KEVENT)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
#endif

	if (rc != 0) {
		return rc;
	}

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		sock->pipe_has_data = true;
		sock->socket_has_data = false;
		TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link);
	}

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
		posix_sock_update_mark(_group, _sock);
	} else if (sock->placement_id != -1) {
		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
			/* Do not treat this as an error. The system will continue running. */
		}
	}

	return rc;
}

/* Unregister a socket from the poll group: drop it from the has-data
 * list, release its placement id, remove it from epoll/kqueue and abort
 * any outstanding async requests. */
static int
posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	if (sock->pipe_has_data || sock->socket_has_data) {
		TAILQ_REMOVE(&group->socks_with_data, sock, link);
		sock->pipe_has_data = false;
		sock->socket_has_data = false;
	}

	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

#if defined(SPDK_EPOLL)
	struct epoll_event event;

	/* Event parameter is ignored but some old kernel version still require it.
*/ 1315 rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event); 1316 #elif defined(SPDK_KEVENT) 1317 struct kevent event; 1318 struct timespec ts = {0}; 1319 1320 EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 1321 1322 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1323 if (rc == 0 && event.flags & EV_ERROR) { 1324 rc = -1; 1325 errno = event.data; 1326 } 1327 #endif 1328 1329 spdk_sock_abort_requests(_sock); 1330 1331 return rc; 1332 } 1333 1334 static int 1335 posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 1336 struct spdk_sock **socks) 1337 { 1338 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1339 struct spdk_sock *sock, *tmp; 1340 int num_events, i, rc; 1341 struct spdk_posix_sock *psock, *ptmp; 1342 #if defined(SPDK_EPOLL) 1343 struct epoll_event events[MAX_EVENTS_PER_POLL]; 1344 #elif defined(SPDK_KEVENT) 1345 struct kevent events[MAX_EVENTS_PER_POLL]; 1346 struct timespec ts = {0}; 1347 #endif 1348 1349 #ifdef SPDK_ZEROCOPY 1350 /* When all of the following conditions are met 1351 * - non-blocking socket 1352 * - zero copy is enabled 1353 * - interrupts suppressed (i.e. busy polling) 1354 * - the NIC tx queue is full at the time sendmsg() is called 1355 * - epoll_wait determines there is an EPOLLIN event for the socket 1356 * then we can get into a situation where data we've sent is queued 1357 * up in the kernel network stack, but interrupts have been suppressed 1358 * because other traffic is flowing so the kernel misses the signal 1359 * to flush the software tx queue. If there wasn't incoming data 1360 * pending on the socket, then epoll_wait would have been sufficient 1361 * to kick off the send operation, but since there is a pending event 1362 * epoll_wait does not trigger the necessary operation. 
1363 * 1364 * We deal with this by checking for all of the above conditions and 1365 * additionally looking for EPOLLIN events that were not consumed from 1366 * the last poll loop. We take this to mean that the upper layer is 1367 * unable to consume them because it is blocked waiting for resources 1368 * to free up, and those resources are most likely freed in response 1369 * to a pending asynchronous write completing. 1370 * 1371 * Additionally, sockets that have the same placement_id actually share 1372 * an underlying hardware queue. That means polling one of them is 1373 * equivalent to polling all of them. As a quick mechanism to avoid 1374 * making extra poll() calls, stash the last placement_id during the loop 1375 * and only poll if it's not the same. The overwhelmingly common case 1376 * is that all sockets in this list have the same placement_id because 1377 * SPDK is intentionally grouping sockets by that value, so even 1378 * though this won't stop all extra calls to poll(), it's very fast 1379 * and will catch all of them in practice. 1380 */ 1381 int last_placement_id = -1; 1382 1383 TAILQ_FOREACH(psock, &group->socks_with_data, link) { 1384 if (psock->zcopy && psock->placement_id >= 0 && 1385 psock->placement_id != last_placement_id) { 1386 struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0}; 1387 1388 poll(&pfd, 1, 0); 1389 last_placement_id = psock->placement_id; 1390 } 1391 } 1392 #endif 1393 1394 /* This must be a TAILQ_FOREACH_SAFE because while flushing, 1395 * a completion callback could remove the sock from the 1396 * group. 
*/ 1397 TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { 1398 rc = _sock_flush(sock); 1399 if (rc) { 1400 spdk_sock_abort_requests(sock); 1401 } 1402 } 1403 1404 assert(max_events > 0); 1405 1406 #if defined(SPDK_EPOLL) 1407 num_events = epoll_wait(group->fd, events, max_events, 0); 1408 #elif defined(SPDK_KEVENT) 1409 num_events = kevent(group->fd, NULL, 0, events, max_events, &ts); 1410 #endif 1411 1412 if (num_events == -1) { 1413 return -1; 1414 } else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) { 1415 sock = TAILQ_FIRST(&_group->socks); 1416 psock = __posix_sock(sock); 1417 /* poll() is called here to busy poll the queue associated with 1418 * first socket in list and potentially reap incoming data. 1419 */ 1420 if (sock->opts.priority) { 1421 struct pollfd pfd = {0, 0, 0}; 1422 1423 pfd.fd = psock->fd; 1424 pfd.events = POLLIN | POLLERR; 1425 poll(&pfd, 1, 0); 1426 } 1427 } 1428 1429 for (i = 0; i < num_events; i++) { 1430 #if defined(SPDK_EPOLL) 1431 sock = events[i].data.ptr; 1432 psock = __posix_sock(sock); 1433 1434 #ifdef SPDK_ZEROCOPY 1435 if (events[i].events & EPOLLERR) { 1436 rc = _sock_check_zcopy(sock); 1437 /* If the socket was closed or removed from 1438 * the group in response to a send ack, don't 1439 * add it to the array here. 
*/ 1440 if (rc || sock->cb_fn == NULL) { 1441 continue; 1442 } 1443 } 1444 #endif 1445 if ((events[i].events & EPOLLIN) == 0) { 1446 continue; 1447 } 1448 1449 #elif defined(SPDK_KEVENT) 1450 sock = events[i].udata; 1451 psock = __posix_sock(sock); 1452 #endif 1453 1454 /* If the socket is not already in the list, add it now */ 1455 if (!psock->socket_has_data && !psock->pipe_has_data) { 1456 TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link); 1457 } 1458 1459 psock->socket_has_data = true; 1460 } 1461 1462 num_events = 0; 1463 1464 TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) { 1465 if (num_events == max_events) { 1466 break; 1467 } 1468 1469 /* If the socket's cb_fn is NULL, just remove it from the 1470 * list and do not add it to socks array */ 1471 if (spdk_unlikely(psock->base.cb_fn == NULL)) { 1472 psock->socket_has_data = false; 1473 psock->pipe_has_data = false; 1474 TAILQ_REMOVE(&group->socks_with_data, psock, link); 1475 continue; 1476 } 1477 1478 socks[num_events++] = &psock->base; 1479 } 1480 1481 /* Cycle the has_data list so that each time we poll things aren't 1482 * in the same order. Say we have 6 sockets in the list, named as follows: 1483 * A B C D E F 1484 * And all 6 sockets had epoll events, but max_events is only 3. That means 1485 * psock currently points at D. We want to rearrange the list to the following: 1486 * D E F A B C 1487 * 1488 * The variables below are named according to this example to make it easier to 1489 * follow the swaps. 
1490 */ 1491 if (psock != NULL) { 1492 struct spdk_posix_sock *pa, *pc, *pd, *pf; 1493 1494 /* Capture pointers to the elements we need */ 1495 pd = psock; 1496 pc = TAILQ_PREV(pd, spdk_has_data_list, link); 1497 pa = TAILQ_FIRST(&group->socks_with_data); 1498 pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list); 1499 1500 /* Break the link between C and D */ 1501 pc->link.tqe_next = NULL; 1502 1503 /* Connect F to A */ 1504 pf->link.tqe_next = pa; 1505 pa->link.tqe_prev = &pf->link.tqe_next; 1506 1507 /* Fix up the list first/last pointers */ 1508 group->socks_with_data.tqh_first = pd; 1509 group->socks_with_data.tqh_last = &pc->link.tqe_next; 1510 1511 /* D is in front of the list, make tqe prev pointer point to the head of list */ 1512 pd->link.tqe_prev = &group->socks_with_data.tqh_first; 1513 } 1514 1515 return num_events; 1516 } 1517 1518 static int 1519 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group) 1520 { 1521 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1522 int rc; 1523 1524 if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1525 spdk_sock_map_release(&g_map, spdk_env_get_current_core()); 1526 } 1527 1528 rc = close(group->fd); 1529 free(group); 1530 return rc; 1531 } 1532 1533 static int 1534 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 1535 { 1536 if (!opts || !len) { 1537 errno = EINVAL; 1538 return -1; 1539 } 1540 memset(opts, 0, *len); 1541 1542 #define FIELD_OK(field) \ 1543 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len 1544 1545 #define GET_FIELD(field) \ 1546 if (FIELD_OK(field)) { \ 1547 opts->field = g_spdk_posix_sock_impl_opts.field; \ 1548 } 1549 1550 GET_FIELD(recv_buf_size); 1551 GET_FIELD(send_buf_size); 1552 GET_FIELD(enable_recv_pipe); 1553 GET_FIELD(enable_zerocopy_send); 1554 GET_FIELD(enable_quickack); 1555 GET_FIELD(enable_placement_id); 1556 GET_FIELD(enable_zerocopy_send_server); 1557 
GET_FIELD(enable_zerocopy_send_client); 1558 1559 #undef GET_FIELD 1560 #undef FIELD_OK 1561 1562 *len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts)); 1563 return 0; 1564 } 1565 1566 static int 1567 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 1568 { 1569 if (!opts) { 1570 errno = EINVAL; 1571 return -1; 1572 } 1573 1574 #define FIELD_OK(field) \ 1575 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len 1576 1577 #define SET_FIELD(field) \ 1578 if (FIELD_OK(field)) { \ 1579 g_spdk_posix_sock_impl_opts.field = opts->field; \ 1580 } 1581 1582 SET_FIELD(recv_buf_size); 1583 SET_FIELD(send_buf_size); 1584 SET_FIELD(enable_recv_pipe); 1585 SET_FIELD(enable_zerocopy_send); 1586 SET_FIELD(enable_quickack); 1587 SET_FIELD(enable_placement_id); 1588 SET_FIELD(enable_zerocopy_send_server); 1589 SET_FIELD(enable_zerocopy_send_client); 1590 1591 #undef SET_FIELD 1592 #undef FIELD_OK 1593 1594 return 0; 1595 } 1596 1597 1598 static struct spdk_net_impl g_posix_net_impl = { 1599 .name = "posix", 1600 .getaddr = posix_sock_getaddr, 1601 .connect = posix_sock_connect, 1602 .listen = posix_sock_listen, 1603 .accept = posix_sock_accept, 1604 .close = posix_sock_close, 1605 .recv = posix_sock_recv, 1606 .readv = posix_sock_readv, 1607 .writev = posix_sock_writev, 1608 .writev_async = posix_sock_writev_async, 1609 .flush = posix_sock_flush, 1610 .set_recvlowat = posix_sock_set_recvlowat, 1611 .set_recvbuf = posix_sock_set_recvbuf, 1612 .set_sendbuf = posix_sock_set_sendbuf, 1613 .is_ipv6 = posix_sock_is_ipv6, 1614 .is_ipv4 = posix_sock_is_ipv4, 1615 .is_connected = posix_sock_is_connected, 1616 .group_impl_get_optimal = posix_sock_group_impl_get_optimal, 1617 .group_impl_create = posix_sock_group_impl_create, 1618 .group_impl_add_sock = posix_sock_group_impl_add_sock, 1619 .group_impl_remove_sock = posix_sock_group_impl_remove_sock, 1620 .group_impl_poll = posix_sock_group_impl_poll, 1621 .group_impl_close = 
posix_sock_group_impl_close, 1622 .get_opts = posix_sock_impl_get_opts, 1623 .set_opts = posix_sock_impl_set_opts, 1624 }; 1625 1626 SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY); 1627