/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation. All rights reserved.
 *   Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved.
 *   Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

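/*
 * POSIX/kernel TCP implementation of the abstract SPDK socket layer
 * (spdk_net_impl). Data moves through ordinary kernel sockets, optionally
 * staged in a per-socket user-space receive pipe, with MSG_ZEROCOPY-based
 * asynchronous sends on Linux and poll groups built on epoll (Linux) or
 * kqueue (FreeBSD).
 */
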
#include "spdk/stdinc.h"

#if defined(__FreeBSD__)
#include <sys/event.h>
#define SPDK_KEVENT
#else
#include <sys/epoll.h>
#define SPDK_EPOLL
#endif

#if defined(__linux__)
#include <linux/errqueue.h>
#endif

#include "spdk/env.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/util.h"
#include "spdk_internal/sock.h"
#include "../sock_kernel.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32

#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif

struct spdk_posix_sock {
    struct spdk_sock base;
    int fd;

    uint32_t sendmsg_idx;

    struct spdk_pipe *recv_pipe;
    void *recv_buf;
    int recv_buf_sz;
    bool pipe_has_data;
    bool socket_has_data;
    bool zcopy;

    int placement_id;

    TAILQ_ENTRY(spdk_posix_sock) link;
};

TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock);

struct spdk_posix_sock_group_impl {
    struct spdk_sock_group_impl base;
    int fd;
    struct spdk_has_data_list socks_with_data;
    int placement_id;
};

static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
    .recv_buf_size = MIN_SO_RCVBUF_SIZE,
    .send_buf_size = MIN_SO_SNDBUF_SIZE,
    .enable_recv_pipe = true,
    .enable_quickack = false,
    .enable_placement_id = PLACEMENT_NONE,
    .enable_zerocopy_send_server = true,
    .enable_zerocopy_send_client = false,
    .zerocopy_threshold = 0
};

static struct spdk_sock_map g_map = {
    .entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
    .mtx = PTHREAD_MUTEX_INITIALIZER
};

__attribute__((destructor)) static void
posix_sock_map_cleanup(void)
{
    spdk_sock_map_cleanup(&g_map);
}

#define __posix_sock(sock) (struct spdk_posix_sock *)sock
#define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group

static int
posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
                   char *caddr, int clen, uint16_t *cport)
{
    struct spdk_posix_sock *sock = __posix_sock(_sock);
    struct sockaddr_storage sa;
    socklen_t salen;
    int rc;

    assert(sock != NULL);

    memset(&sa, 0, sizeof sa);
    salen = sizeof sa;
    rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
    if (rc != 0) {
        SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
        return -1;
    }

    switch (sa.ss_family) {
    case AF_UNIX:
        /* Acceptable connection types that don't have IPs */
        return 0;
    case AF_INET:
    case AF_INET6:
        /* Code below will get IP addresses */
        break;
    default:
        /* Unsupported socket family */
        return -1;
    }

    rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
    if (rc != 0) {
        SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
        return -1;
    }

    if (sport) {
        if (sa.ss_family == AF_INET) {
            *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
        } else if (sa.ss_family == AF_INET6) {
            *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
        }
    }

    memset(&sa, 0, sizeof sa);
    salen = sizeof sa;
    rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
    if (rc != 0) {
        SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
        return -1;
    }

    rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
    if (rc != 0) {
        SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
        return -1;
    }

    if (cport) {
        if (sa.ss_family == AF_INET) {
            *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
        } else if (sa.ss_family == AF_INET6) {
            *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
        }
    }

    return 0;
}

enum posix_sock_create_type {
    SPDK_SOCK_CREATE_LISTEN,
    SPDK_SOCK_CREATE_CONNECT,
};

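/*
 * The receive pipe is a user-space staging ring in front of the kernel
 * socket: one large readv() refills it and subsequent small reads are
 * satisfied by memcpy, cutting down on syscalls. Reads of at least
 * MIN_SOCK_PIPE_SIZE bypass the pipe entirely (see posix_sock_readv()).
 * The backing buffer is sized sz + 1 because an spdk_pipe always keeps
 * one byte unused to distinguish a full ring from an empty one.
 */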
static int
posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
{
    uint8_t *new_buf;
    struct spdk_pipe *new_pipe;
    struct iovec siov[2];
    struct iovec diov[2];
    int sbytes;
    ssize_t bytes;

    if (sock->recv_buf_sz == sz) {
        return 0;
    }

    /* If the new size is 0, just free the pipe */
    if (sz == 0) {
        spdk_pipe_destroy(sock->recv_pipe);
        free(sock->recv_buf);
        sock->recv_pipe = NULL;
        sock->recv_buf = NULL;
        return 0;
    } else if (sz < MIN_SOCK_PIPE_SIZE) {
        SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
        return -1;
    }

    /* Round up to next 64 byte multiple */
    new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
    if (!new_buf) {
        SPDK_ERRLOG("socket recv buf allocation failed\n");
        return -ENOMEM;
    }

    new_pipe = spdk_pipe_create(new_buf, sz + 1);
    if (new_pipe == NULL) {
        SPDK_ERRLOG("socket pipe allocation failed\n");
        free(new_buf);
        return -ENOMEM;
    }

    if (sock->recv_pipe != NULL) {
        /* Pull all of the data out of the old pipe */
        sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
        if (sbytes > sz) {
            /* Too much data to fit into the new pipe size */
            spdk_pipe_destroy(new_pipe);
            free(new_buf);
            return -EINVAL;
        }

        sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
        assert(sbytes == sz);

        bytes = spdk_iovcpy(siov, 2, diov, 2);
        spdk_pipe_writer_advance(new_pipe, bytes);

        spdk_pipe_destroy(sock->recv_pipe);
        free(sock->recv_buf);
    }

    sock->recv_buf_sz = sz;
    sock->recv_buf = new_buf;
    sock->recv_pipe = new_pipe;

    return 0;
}

static int
posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
    struct spdk_posix_sock *sock = __posix_sock(_sock);
    int rc;

    assert(sock != NULL);

    if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) {
        rc = posix_sock_alloc_pipe(sock, sz);
        if (rc) {
            return rc;
        }
    }

    /* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
    if (sz < MIN_SO_RCVBUF_SIZE) {
        sz = MIN_SO_RCVBUF_SIZE;
    }

    rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
    if (rc < 0) {
        return rc;
    }

    return 0;
}

static int
posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
    struct spdk_posix_sock *sock = __posix_sock(_sock);
    int rc;

    assert(sock != NULL);

    if (sz < MIN_SO_SNDBUF_SIZE) {
        sz = MIN_SO_SNDBUF_SIZE;
    }

    rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
    if (rc < 0) {
        return rc;
    }

    return 0;
}

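/*
 * One-time per-socket setup: opt into MSG_ZEROCOPY sends where the kernel
 * supports them, optionally enable TCP_QUICKACK, and resolve the placement
 * id used to steer this socket toward a matching poll group.
 */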
static void
posix_sock_init(struct spdk_posix_sock *sock, bool enable_zero_copy)
{
#if defined(SPDK_ZEROCOPY) || defined(__linux__)
    int flag;
    int rc;
#endif

#if defined(SPDK_ZEROCOPY)
    flag = 1;

    if (enable_zero_copy) {
        /* Try to turn on zero copy sends */
        rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
        if (rc == 0) {
            sock->zcopy = true;
        }
    }
#endif

#if defined(__linux__)
    flag = 1;

    if (g_spdk_posix_sock_impl_opts.enable_quickack) {
        rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
        if (rc != 0) {
            SPDK_ERRLOG("Failed to set TCP_QUICKACK\n");
        }
    }

    spdk_sock_get_placement_id(sock->fd, g_spdk_posix_sock_impl_opts.enable_placement_id,
                               &sock->placement_id);

    if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
        /* Save placement_id */
        spdk_sock_map_insert(&g_map, sock->placement_id, NULL);
    }
#endif
}

static struct spdk_posix_sock *
posix_sock_alloc(int fd, bool enable_zero_copy)
{
    struct spdk_posix_sock *sock;

    sock = calloc(1, sizeof(*sock));
    if (sock == NULL) {
        SPDK_ERRLOG("sock allocation failed\n");
        return NULL;
    }

    sock->fd = fd;
    posix_sock_init(sock, enable_zero_copy);

    return sock;
}

static int
posix_fd_create(struct addrinfo *res, struct spdk_sock_opts *opts)
{
    int fd;
    int val = 1;
    int rc, sz;
#if defined(__linux__)
    int to;
#endif

    fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
    if (fd < 0) {
        /* error */
        return -1;
    }

    sz = g_spdk_posix_sock_impl_opts.recv_buf_size;
    rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
    if (rc) {
        /* Not fatal */
    }

    sz = g_spdk_posix_sock_impl_opts.send_buf_size;
    rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
    if (rc) {
        /* Not fatal */
    }

    rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
    if (rc != 0) {
        close(fd);
        /* error */
        return -1;
    }
    rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
    if (rc != 0) {
        close(fd);
        /* error */
        return -1;
    }

#if defined(SO_PRIORITY)
    if (opts->priority) {
        rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
        if (rc != 0) {
            close(fd);
            /* error */
            return -1;
        }
    }
#endif

    if (res->ai_family == AF_INET6) {
        rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
        if (rc != 0) {
            close(fd);
            /* error */
            return -1;
        }
    }

    if (opts->ack_timeout) {
#if defined(__linux__)
        to = opts->ack_timeout;
        rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &to, sizeof(to));
        if (rc != 0) {
            close(fd);
            /* error */
            return -1;
        }
#else
        SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
#endif
    }

    return fd;
}

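/*
 * Common path for listen() and connect(). Each addrinfo entry gets a fully
 * configured fd from posix_fd_create(); whether zero-copy sends end up
 * enabled is the AND of the caller's per-socket opts and the impl-level
 * server/client defaults, decided once an fd is successfully established.
 */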
static struct spdk_sock *
posix_sock_create(const char *ip, int port,
                  enum posix_sock_create_type type,
                  struct spdk_sock_opts *opts)
{
    struct spdk_posix_sock *sock;
    char buf[MAX_TMPBUF];
    char portnum[PORTNUMLEN];
    char *p;
    struct addrinfo hints, *res, *res0;
    int fd, flag;
    int rc;
    bool enable_zcopy_user_opts = true;
    bool enable_zcopy_impl_opts = true;

    assert(opts != NULL);

    if (ip == NULL) {
        return NULL;
    }
    if (ip[0] == '[') {
        snprintf(buf, sizeof(buf), "%s", ip + 1);
        p = strchr(buf, ']');
        if (p != NULL) {
            *p = '\0';
        }
        ip = (const char *) &buf[0];
    }

    snprintf(portnum, sizeof portnum, "%d", port);
    memset(&hints, 0, sizeof hints);
    hints.ai_family = PF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_NUMERICSERV;
    hints.ai_flags |= AI_PASSIVE;
    hints.ai_flags |= AI_NUMERICHOST;
    rc = getaddrinfo(ip, portnum, &hints, &res0);
    if (rc != 0) {
        SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
        return NULL;
    }

    /* try listen */
    fd = -1;
    for (res = res0; res != NULL; res = res->ai_next) {
retry:
        fd = posix_fd_create(res, opts);
        if (fd < 0) {
            continue;
        }
        if (type == SPDK_SOCK_CREATE_LISTEN) {
            rc = bind(fd, res->ai_addr, res->ai_addrlen);
            if (rc != 0) {
                SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
                switch (errno) {
                case EINTR:
                    /* interrupted? */
                    close(fd);
                    goto retry;
                case EADDRNOTAVAIL:
                    SPDK_ERRLOG("IP address %s not available. "
                                "Verify IP address in config file "
                                "and make sure setup script is "
                                "run before starting spdk app.\n", ip);
                /* FALLTHROUGH */
                default:
                    /* try next family */
                    close(fd);
                    fd = -1;
                    continue;
                }
            }
            /* bind OK */
            rc = listen(fd, 512);
            if (rc != 0) {
                SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
                close(fd);
                fd = -1;
                break;
            }
            enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_server;
        } else if (type == SPDK_SOCK_CREATE_CONNECT) {
            rc = connect(fd, res->ai_addr, res->ai_addrlen);
            if (rc != 0) {
                SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
                /* try next family */
                close(fd);
                fd = -1;
                continue;
            }
            enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_client;
        }

        flag = fcntl(fd, F_GETFL);
        if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
            SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
            close(fd);
            fd = -1;
            break;
        }
        break;
    }
    freeaddrinfo(res0);

    if (fd < 0) {
        return NULL;
    }

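    /* Zero-copy sends only pay off when the NIC can DMA directly from
     * application buffers; over loopback the kernel generally falls back
     * to copying, so the completion machinery would add overhead without
     * avoiding a copy (rationale per the kernel's MSG_ZEROCOPY
     * documentation). */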
    /* Only enable zero copy for non-loopback sockets. */
    enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd);

    sock = posix_sock_alloc(fd, enable_zcopy_user_opts && enable_zcopy_impl_opts);
    if (sock == NULL) {
        SPDK_ERRLOG("sock allocation failed\n");
        close(fd);
        return NULL;
    }

    return &sock->base;
}

static struct spdk_sock *
posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
    return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}

static struct spdk_sock *
posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
    return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}

static struct spdk_sock *
posix_sock_accept(struct spdk_sock *_sock)
{
    struct spdk_posix_sock *sock = __posix_sock(_sock);
    struct sockaddr_storage sa;
    socklen_t salen;
    int rc, fd;
    struct spdk_posix_sock *new_sock;
    int flag;

    memset(&sa, 0, sizeof(sa));
    salen = sizeof(sa);

    assert(sock != NULL);

    rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

    if (rc == -1) {
        return NULL;
    }

    fd = rc;

    flag = fcntl(fd, F_GETFL);
    if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
        SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
        close(fd);
        return NULL;
    }

#if defined(SO_PRIORITY)
    /* The priority is not inherited, so call this function again */
    if (sock->base.opts.priority) {
        rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
        if (rc != 0) {
            close(fd);
            return NULL;
        }
    }
#endif

    /* Inherit the zero copy feature from the listen socket */
    new_sock = posix_sock_alloc(fd, sock->zcopy);
    if (new_sock == NULL) {
        close(fd);
        return NULL;
    }

    return &new_sock->base;
}

static int
posix_sock_close(struct spdk_sock *_sock)
{
    struct spdk_posix_sock *sock = __posix_sock(_sock);

    assert(TAILQ_EMPTY(&_sock->pending_reqs));

    /* If the socket fails to close, the best choice is to
     * leak the fd but continue to free the rest of the sock
     * memory. */
    close(sock->fd);

    spdk_pipe_destroy(sock->recv_pipe);
    free(sock->recv_buf);
    free(sock);

    return 0;
}

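/*
 * Zero-copy send completions arrive on the socket's error queue as
 * SO_EE_ORIGIN_ZEROCOPY extended errors. The kernel numbers sendmsg()
 * calls sequentially and acknowledges them in ranges: each notification
 * covers the calls numbered [ee_info, ee_data]. Those numbers are matched
 * against the sendmsg index stashed in req->internal.offset to complete
 * the corresponding requests.
 */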
#ifdef SPDK_ZEROCOPY
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
    struct spdk_posix_sock *psock = __posix_sock(sock);
    struct msghdr msgh = {};
    uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
    ssize_t rc;
    struct sock_extended_err *serr;
    struct cmsghdr *cm;
    uint32_t idx;
    struct spdk_sock_request *req, *treq;
    bool found;

    msgh.msg_control = buf;
    msgh.msg_controllen = sizeof(buf);

    while (true) {
        rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

        if (rc < 0) {
            if (errno == EWOULDBLOCK || errno == EAGAIN) {
                return 0;
            }

            if (!TAILQ_EMPTY(&sock->pending_reqs)) {
                SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
            } else {
                SPDK_WARNLOG("Recvmsg yielded an error!\n");
            }
            return 0;
        }

        cm = CMSG_FIRSTHDR(&msgh);
        if (!(cm &&
              ((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
               (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR)))) {
            SPDK_WARNLOG("Unexpected cmsg level or type!\n");
            return 0;
        }

        serr = (struct sock_extended_err *)CMSG_DATA(cm);
        if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
            SPDK_WARNLOG("Unexpected extended error origin\n");
            return 0;
        }

        /* Most of the time, the pending_reqs array is in the exact
         * order we need such that all of the requests to complete are
         * in order, in the front. It is guaranteed that all requests
         * belonging to the same sendmsg call are sequential, so once
         * we encounter one match we can stop looping as soon as a
         * non-match is found.
         */
        for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
            found = false;
            TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
                if (!req->internal.is_zcopy) {
                    /* This wasn't a zcopy request. It was just
                     * waiting in line to complete. */
                    rc = spdk_sock_request_put(sock, req, 0);
                    if (rc < 0) {
                        return rc;
                    }
                } else if (req->internal.offset == idx) {
                    found = true;
                    rc = spdk_sock_request_put(sock, req, 0);
                    if (rc < 0) {
                        return rc;
                    }
                } else if (found) {
                    break;
                }
            }
        }
    }

    return 0;
}
#endif

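/*
 * Gather up to IOV_BATCH_SIZE iovecs from the queued requests and submit
 * them with a single sendmsg(). A short write leaves the remainder queued,
 * with req->internal.offset recording the progress so the next flush
 * resumes mid-request.
 */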
static int
_sock_flush(struct spdk_sock *sock)
{
    struct spdk_posix_sock *psock = __posix_sock(sock);
    struct msghdr msg = {};
    int flags;
    struct iovec iovs[IOV_BATCH_SIZE];
    int iovcnt;
    int retval;
    struct spdk_sock_request *req;
    int i;
    ssize_t rc;
    unsigned int offset;
    size_t len;
    bool is_zcopy = false;

    /* Can't flush from within a callback or we end up with recursive calls */
    if (sock->cb_cnt > 0) {
        return 0;
    }

#ifdef SPDK_ZEROCOPY
    if (psock->zcopy) {
        flags = MSG_ZEROCOPY | MSG_NOSIGNAL;
    } else
#endif
    {
        flags = MSG_NOSIGNAL;
    }

    iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL, &flags);
    if (iovcnt == 0) {
        return 0;
    }

#ifdef SPDK_ZEROCOPY
    is_zcopy = flags & MSG_ZEROCOPY;
#endif

    /* Perform the vectored write */
    msg.msg_iov = iovs;
    msg.msg_iovlen = iovcnt;

    rc = sendmsg(psock->fd, &msg, flags);
    if (rc <= 0) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
            return 0;
        }
        return rc;
    }

    if (is_zcopy) {
        /* Handle the overflow case: we use psock->sendmsg_idx - 1 as
         * req->internal.offset, so sendmsg_idx must never be 0. */
        if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
            psock->sendmsg_idx = 1;
        } else {
            psock->sendmsg_idx++;
        }
    }

    /* Consume the requests that were actually written */
    req = TAILQ_FIRST(&sock->queued_reqs);
    while (req) {
        offset = req->internal.offset;

        /* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
        req->internal.is_zcopy = is_zcopy;

        for (i = 0; i < req->iovcnt; i++) {
            /* Advance by the offset first */
            if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
                offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
                continue;
            }

            /* Calculate the remaining length of this element */
            len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

            if (len > (size_t)rc) {
                /* This element was partially sent. */
                req->internal.offset += rc;
                return 0;
            }

            offset = 0;
            req->internal.offset += len;
            rc -= len;
        }

        /* Handled a full request. */
        spdk_sock_request_pend(sock, req);

        if (!req->internal.is_zcopy && req == TAILQ_FIRST(&sock->pending_reqs)) {
            /* The sendmsg syscall above isn't currently asynchronous,
             * so it's already done. */
            retval = spdk_sock_request_put(sock, req, 0);
            if (retval) {
                break;
            }
        } else {
            /* Re-use the offset field to hold the sendmsg call index. The
             * index is 0 based, so subtract one here because we've already
             * incremented above. */
            req->internal.offset = psock->sendmsg_idx - 1;
        }

        if (rc == 0) {
            break;
        }

        req = TAILQ_FIRST(&sock->queued_reqs);
    }

    return 0;
}

static int
posix_sock_flush(struct spdk_sock *sock)
{
#ifdef SPDK_ZEROCOPY
    struct spdk_posix_sock *psock = __posix_sock(sock);

    if (psock->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) {
        _sock_check_zcopy(sock);
    }
#endif

    return _sock_flush(sock);
}

static ssize_t
posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
{
    struct iovec siov[2];
    int sbytes;
    ssize_t bytes;
    struct spdk_posix_sock_group_impl *group;

    sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
    if (sbytes < 0) {
        errno = EINVAL;
        return -1;
    } else if (sbytes == 0) {
        errno = EAGAIN;
        return -1;
    }

    bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

    if (bytes == 0) {
        /* The only way this happens is if diov is 0 length */
        errno = EINVAL;
        return -1;
    }

    spdk_pipe_reader_advance(sock->recv_pipe, bytes);

    /* If we drained the pipe, mark it appropriately */
    if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
        assert(sock->pipe_has_data == true);

        group = __posix_group_impl(sock->base.group_impl);
        if (group && !sock->socket_has_data) {
            TAILQ_REMOVE(&group->socks_with_data, sock, link);
        }

        sock->pipe_has_data = false;
    }

    return bytes;
}

static inline ssize_t
posix_sock_read(struct spdk_posix_sock *sock)
{
    struct iovec iov[2];
    int bytes_avail, bytes_recvd;
    struct spdk_posix_sock_group_impl *group;

    bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

    if (bytes_avail <= 0) {
        return bytes_avail;
    }

    bytes_recvd = readv(sock->fd, iov, 2);

    assert(sock->pipe_has_data == false);

    if (bytes_recvd <= 0) {
        /* Errors count as draining the socket data */
        if (sock->base.group_impl && sock->socket_has_data) {
            group = __posix_group_impl(sock->base.group_impl);
            TAILQ_REMOVE(&group->socks_with_data, sock, link);
        }

        sock->socket_has_data = false;

        return bytes_recvd;
    }

    spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd);

#if DEBUG
    if (sock->base.group_impl) {
        assert(sock->socket_has_data == true);
    }
#endif

    sock->pipe_has_data = true;
    if (bytes_recvd < bytes_avail) {
        /* We drained the kernel socket entirely. */
        sock->socket_has_data = false;
    }

    return bytes_recvd;
}

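/*
 * Receive strategy: data already staged in the pipe is returned first.
 * Otherwise, large requests (>= MIN_SOCK_PIPE_SIZE) go straight to the
 * kernel socket, while small ones trigger one big readv() into the pipe
 * and are then copied out of it.
 */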
static ssize_t
posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
    struct spdk_posix_sock *sock = __posix_sock(_sock);
    struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl);
    int rc, i;
    size_t len;

    if (sock->recv_pipe == NULL) {
        assert(sock->pipe_has_data == false);
        if (group && sock->socket_has_data) {
            sock->socket_has_data = false;
            TAILQ_REMOVE(&group->socks_with_data, sock, link);
        }
        return readv(sock->fd, iov, iovcnt);
    }

    /* If the socket is not in a group, we must assume it always has
     * data waiting for us because it is not epolled */
    if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) {
        /* If the user is receiving a sufficiently large amount of data,
         * receive directly to their buffers. */
        len = 0;
        for (i = 0; i < iovcnt; i++) {
            len += iov[i].iov_len;
        }

        if (len >= MIN_SOCK_PIPE_SIZE) {
            /* TODO: Should this detect if kernel socket is drained? */
            return readv(sock->fd, iov, iovcnt);
        }

        /* Otherwise, do a big read into our pipe */
        rc = posix_sock_read(sock);
        if (rc <= 0) {
            return rc;
        }
    }

    return posix_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
    struct iovec iov[1];

    iov[0].iov_base = buf;
    iov[0].iov_len = len;

    return posix_sock_readv(sock, iov, 1);
}

static ssize_t
posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
    struct spdk_posix_sock *sock = __posix_sock(_sock);
    int rc;

    /* In order to process a writev, we need to flush any asynchronous writes
     * first. */
    rc = _sock_flush(_sock);
    if (rc < 0) {
        return rc;
    }

    if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
        /* We weren't able to flush all requests */
        errno = EAGAIN;
        return -1;
    }

    return writev(sock->fd, iov, iovcnt);
}

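/*
 * Asynchronous writes are only queued here; they complete via the request
 * callback once _sock_flush() hands them to sendmsg() (and, for zero-copy
 * sends, once the kernel's errqueue acknowledgement arrives).
 */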
static void
posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
    int rc;

    spdk_sock_request_queue(sock, req);

    /* If there are a sufficient number queued, just flush them out immediately. */
    if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
        rc = _sock_flush(sock);
        if (rc) {
            spdk_sock_abort_requests(sock);
        }
    }
}

static int
posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
    struct spdk_posix_sock *sock = __posix_sock(_sock);
    int val;
    int rc;

    assert(sock != NULL);

    val = nbytes;
    rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
    if (rc != 0) {
        return -1;
    }
    return 0;
}

static bool
posix_sock_is_ipv6(struct spdk_sock *_sock)
{
    struct spdk_posix_sock *sock = __posix_sock(_sock);
    struct sockaddr_storage sa;
    socklen_t salen;
    int rc;

    assert(sock != NULL);

    memset(&sa, 0, sizeof sa);
    salen = sizeof sa;
    rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
    if (rc != 0) {
        SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
        return false;
    }

    return (sa.ss_family == AF_INET6);
}

static bool
posix_sock_is_ipv4(struct spdk_sock *_sock)
{
    struct spdk_posix_sock *sock = __posix_sock(_sock);
    struct sockaddr_storage sa;
    socklen_t salen;
    int rc;

    assert(sock != NULL);

    memset(&sa, 0, sizeof sa);
    salen = sizeof sa;
    rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
    if (rc != 0) {
        SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
        return false;
    }

    return (sa.ss_family == AF_INET);
}

static bool
posix_sock_is_connected(struct spdk_sock *_sock)
{
    struct spdk_posix_sock *sock = __posix_sock(_sock);
    uint8_t byte;
    int rc;

    rc = recv(sock->fd, &byte, 1, MSG_PEEK);
    if (rc == 0) {
        return false;
    }

    if (rc < 0) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return true;
        }

        return false;
    }

    return true;
}

static struct spdk_sock_group_impl *
posix_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
{
    struct spdk_posix_sock *sock = __posix_sock(_sock);
    struct spdk_sock_group_impl *group_impl;

    if (sock->placement_id != -1) {
        spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl, hint);
        return group_impl;
    }

    return NULL;
}

static struct spdk_sock_group_impl *
posix_sock_group_impl_create(void)
{
    struct spdk_posix_sock_group_impl *group_impl;
    int fd;

#if defined(SPDK_EPOLL)
    fd = epoll_create1(0);
#elif defined(SPDK_KEVENT)
    fd = kqueue();
#endif
    if (fd == -1) {
        return NULL;
    }

    group_impl = calloc(1, sizeof(*group_impl));
    if (group_impl == NULL) {
        SPDK_ERRLOG("group_impl allocation failed\n");
        close(fd);
        return NULL;
    }

    group_impl->fd = fd;
    TAILQ_INIT(&group_impl->socks_with_data);
    group_impl->placement_id = -1;

    if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
        spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
        group_impl->placement_id = spdk_env_get_current_core();
    }

    return &group_impl->base;
}

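/*
 * PLACEMENT_MARK mode: tag every socket in a poll group with the group's
 * placement id via SO_MARK and record the association in g_map, so that
 * posix_sock_group_impl_get_optimal() can route sockets carrying that mark
 * back to the group that owns it.
 */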
static void
posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock,
                int placement_id)
{
#if defined(SO_MARK)
    int rc;

    rc = setsockopt(sock->fd, SOL_SOCKET, SO_MARK,
                    &placement_id, sizeof(placement_id));
    if (rc != 0) {
        /* Not fatal */
        SPDK_ERRLOG("Error setting SO_MARK\n");
        return;
    }

    rc = spdk_sock_map_insert(&g_map, placement_id, &group->base);
    if (rc != 0) {
        /* Not fatal */
        SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
        return;
    }

    sock->placement_id = placement_id;
#endif
}

static void
posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
    struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);

    if (group->placement_id == -1) {
        group->placement_id = spdk_sock_map_find_free(&g_map);

        /* If a free placement id is found, update existing sockets in this group */
        if (group->placement_id != -1) {
            struct spdk_sock *sock, *tmp;

            TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
                posix_sock_mark(group, __posix_sock(sock), group->placement_id);
            }
        }
    }

    if (group->placement_id != -1) {
        /*
         * group placement id is already determined for this poll group.
         * Mark socket with group's placement id.
         */
        posix_sock_mark(group, __posix_sock(_sock), group->placement_id);
    }
}

static int
posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
    struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
    struct spdk_posix_sock *sock = __posix_sock(_sock);
    int rc;

#if defined(SPDK_EPOLL)
    struct epoll_event event;

    memset(&event, 0, sizeof(event));
    /* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
    event.events = EPOLLIN | EPOLLERR;
    event.data.ptr = sock;

    rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
#elif defined(SPDK_KEVENT)
    struct kevent event;
    struct timespec ts = {0};

    EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);

    rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
#endif

    if (rc != 0) {
        return rc;
    }

    /* switched from another polling group due to scheduling */
    if (spdk_unlikely(sock->recv_pipe != NULL &&
                      (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
        sock->pipe_has_data = true;
        sock->socket_has_data = false;
        TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link);
    }

    if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
        posix_sock_update_mark(_group, _sock);
    } else if (sock->placement_id != -1) {
        rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
        if (rc != 0) {
            SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
            /* Do not treat this as an error. The system will continue running. */
        }
    }

    return rc;
}

static int
posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
    struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
    struct spdk_posix_sock *sock = __posix_sock(_sock);
    int rc;

    if (sock->pipe_has_data || sock->socket_has_data) {
        TAILQ_REMOVE(&group->socks_with_data, sock, link);
        sock->pipe_has_data = false;
        sock->socket_has_data = false;
    }

    if (sock->placement_id != -1) {
        spdk_sock_map_release(&g_map, sock->placement_id);
    }

#if defined(SPDK_EPOLL)
    struct epoll_event event;

    /* The event parameter is ignored but some old kernel versions still require it. */
    rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
#elif defined(SPDK_KEVENT)
    struct kevent event;
    struct timespec ts = {0};

    EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);

    rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
    if (rc == 0 && event.flags & EV_ERROR) {
        rc = -1;
        errno = event.data;
    }
#endif

    spdk_sock_abort_requests(_sock);

    return rc;
}

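/*
 * One poll iteration: flush pending writes on every socket, harvest epoll /
 * kqueue readiness events into the socks_with_data list, then report up to
 * max_events entries from that list. The list outlives the syscall results
 * because staged pipe data can remain after the kernel socket is drained.
 */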
static int
posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
                           struct spdk_sock **socks)
{
    struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
    struct spdk_sock *sock, *tmp;
    int num_events, i, rc;
    struct spdk_posix_sock *psock, *ptmp;
#if defined(SPDK_EPOLL)
    struct epoll_event events[MAX_EVENTS_PER_POLL];
#elif defined(SPDK_KEVENT)
    struct kevent events[MAX_EVENTS_PER_POLL];
    struct timespec ts = {0};
#endif

#ifdef SPDK_ZEROCOPY
    /* When all of the following conditions are met
     * - non-blocking socket
     * - zero copy is enabled
     * - interrupts suppressed (i.e. busy polling)
     * - the NIC tx queue is full at the time sendmsg() is called
     * - epoll_wait determines there is an EPOLLIN event for the socket
     * then we can get into a situation where data we've sent is queued
     * up in the kernel network stack, but interrupts have been suppressed
     * because other traffic is flowing so the kernel misses the signal
     * to flush the software tx queue. If there wasn't incoming data
     * pending on the socket, then epoll_wait would have been sufficient
     * to kick off the send operation, but since there is a pending event
     * epoll_wait does not trigger the necessary operation.
     *
     * We deal with this by checking for all of the above conditions and
     * additionally looking for EPOLLIN events that were not consumed from
     * the last poll loop. We take this to mean that the upper layer is
     * unable to consume them because it is blocked waiting for resources
     * to free up, and those resources are most likely freed in response
     * to a pending asynchronous write completing.
     *
     * Additionally, sockets that have the same placement_id actually share
     * an underlying hardware queue. That means polling one of them is
     * equivalent to polling all of them. As a quick mechanism to avoid
     * making extra poll() calls, stash the last placement_id during the loop
     * and only poll if it's not the same. The overwhelmingly common case
     * is that all sockets in this list have the same placement_id because
     * SPDK is intentionally grouping sockets by that value, so even
     * though this won't stop all extra calls to poll(), it's very fast
     * and will catch all of them in practice.
     */
    int last_placement_id = -1;

    TAILQ_FOREACH(psock, &group->socks_with_data, link) {
        if (psock->zcopy && psock->placement_id >= 0 &&
            psock->placement_id != last_placement_id) {
            struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0};

            poll(&pfd, 1, 0);
            last_placement_id = psock->placement_id;
        }
    }
#endif

    /* This must be a TAILQ_FOREACH_SAFE because while flushing,
     * a completion callback could remove the sock from the
     * group. */
    TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
        rc = _sock_flush(sock);
        if (rc) {
            spdk_sock_abort_requests(sock);
        }
    }

    assert(max_events > 0);

#if defined(SPDK_EPOLL)
    num_events = epoll_wait(group->fd, events, max_events, 0);
#elif defined(SPDK_KEVENT)
    num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
#endif

    if (num_events == -1) {
        return -1;
    } else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
        sock = TAILQ_FIRST(&_group->socks);
        psock = __posix_sock(sock);
        /* poll() is called here to busy poll the queue associated with
         * first socket in list and potentially reap incoming data.
         */
        if (sock->opts.priority) {
            struct pollfd pfd = {0, 0, 0};

            pfd.fd = psock->fd;
            pfd.events = POLLIN | POLLERR;
            poll(&pfd, 1, 0);
        }
    }

    for (i = 0; i < num_events; i++) {
#if defined(SPDK_EPOLL)
        sock = events[i].data.ptr;
        psock = __posix_sock(sock);

#ifdef SPDK_ZEROCOPY
        if (events[i].events & EPOLLERR) {
            rc = _sock_check_zcopy(sock);
            /* If the socket was closed or removed from
             * the group in response to a send ack, don't
             * add it to the array here. */
            if (rc || sock->cb_fn == NULL) {
                continue;
            }
        }
#endif
        if ((events[i].events & EPOLLIN) == 0) {
            continue;
        }

#elif defined(SPDK_KEVENT)
        sock = events[i].udata;
        psock = __posix_sock(sock);
#endif

        /* If the socket is not already in the list, add it now */
        if (!psock->socket_has_data && !psock->pipe_has_data) {
            TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link);
        }
        psock->socket_has_data = true;
    }

    num_events = 0;

    TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) {
        if (num_events == max_events) {
            break;
        }

        /* If the socket's cb_fn is NULL, just remove it from the
         * list and do not add it to socks array */
        if (spdk_unlikely(psock->base.cb_fn == NULL)) {
            psock->socket_has_data = false;
            psock->pipe_has_data = false;
            TAILQ_REMOVE(&group->socks_with_data, psock, link);
            continue;
        }

        socks[num_events++] = &psock->base;
    }

    /* Cycle the has_data list so that each time we poll things aren't
     * in the same order. Say we have 6 sockets in the list, named as follows:
     * A B C D E F
     * And all 6 sockets had epoll events, but max_events is only 3. That means
     * psock currently points at D. We want to rearrange the list to the following:
     * D E F A B C
     *
     * The variables below are named according to this example to make it easier to
     * follow the swaps.
     */
    if (psock != NULL) {
        struct spdk_posix_sock *pa, *pc, *pd, *pf;

        /* Capture pointers to the elements we need */
        pd = psock;
        pc = TAILQ_PREV(pd, spdk_has_data_list, link);
        pa = TAILQ_FIRST(&group->socks_with_data);
        pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list);

        /* Break the link between C and D */
        pc->link.tqe_next = NULL;

        /* Connect F to A */
        pf->link.tqe_next = pa;
        pa->link.tqe_prev = &pf->link.tqe_next;

        /* Fix up the list first/last pointers */
        group->socks_with_data.tqh_first = pd;
        group->socks_with_data.tqh_last = &pc->link.tqe_next;

        /* D is in front of the list, make tqe prev pointer point to the head of list */
        pd->link.tqe_prev = &group->socks_with_data.tqh_first;
    }

    return num_events;
}

static int
posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
    struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
    int rc;

    if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
        spdk_sock_map_release(&g_map, spdk_env_get_current_core());
    }

    rc = close(group->fd);
    free(group);
    return rc;
}

static int
posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
{
    if (!opts || !len) {
        errno = EINVAL;
        return -1;
    }
    memset(opts, 0, *len);

#define FIELD_OK(field) \
    offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len

#define GET_FIELD(field) \
    if (FIELD_OK(field)) { \
        opts->field = g_spdk_posix_sock_impl_opts.field; \
    }

    GET_FIELD(recv_buf_size);
    GET_FIELD(send_buf_size);
    GET_FIELD(enable_recv_pipe);
    GET_FIELD(enable_zerocopy_send);
    GET_FIELD(enable_quickack);
    GET_FIELD(enable_placement_id);
    GET_FIELD(enable_zerocopy_send_server);
    GET_FIELD(enable_zerocopy_send_client);
    GET_FIELD(zerocopy_threshold);

#undef GET_FIELD
#undef FIELD_OK

    *len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts));
    return 0;
}

static int
posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
{
    if (!opts) {
        errno = EINVAL;
        return -1;
    }

#define FIELD_OK(field) \
    offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len

#define SET_FIELD(field) \
    if (FIELD_OK(field)) { \
        g_spdk_posix_sock_impl_opts.field = opts->field; \
    }

    SET_FIELD(recv_buf_size);
    SET_FIELD(send_buf_size);
    SET_FIELD(enable_recv_pipe);
    SET_FIELD(enable_zerocopy_send);
    SET_FIELD(enable_quickack);
    SET_FIELD(enable_placement_id);
    SET_FIELD(enable_zerocopy_send_server);
    SET_FIELD(enable_zerocopy_send_client);
    SET_FIELD(zerocopy_threshold);

#undef SET_FIELD
#undef FIELD_OK

    return 0;
}

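/*
 * For reference, a minimal sketch of how an application would select this
 * transport and tune it through the generic sock layer. The function names
 * are the public SPDK sock API; the address and option values are purely
 * illustrative:
 *
 *	struct spdk_sock_impl_opts impl_opts;
 *	size_t sz = sizeof(impl_opts);
 *	struct spdk_sock_opts opts;
 *	struct spdk_sock *sock;
 *
 *	spdk_sock_impl_get_opts("posix", &impl_opts, &sz);
 *	impl_opts.enable_recv_pipe = true;
 *	spdk_sock_impl_set_opts("posix", &impl_opts, sz);
 *
 *	opts.opts_size = sizeof(opts);
 *	spdk_sock_get_default_opts(&opts);
 *	sock = spdk_sock_connect_ext("127.0.0.1", 4420, "posix", &opts);
 */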
static struct spdk_net_impl g_posix_net_impl = {
    .name = "posix",
    .getaddr = posix_sock_getaddr,
    .connect = posix_sock_connect,
    .listen = posix_sock_listen,
    .accept = posix_sock_accept,
    .close = posix_sock_close,
    .recv = posix_sock_recv,
    .readv = posix_sock_readv,
    .writev = posix_sock_writev,
    .writev_async = posix_sock_writev_async,
    .flush = posix_sock_flush,
    .set_recvlowat = posix_sock_set_recvlowat,
    .set_recvbuf = posix_sock_set_recvbuf,
    .set_sendbuf = posix_sock_set_sendbuf,
    .is_ipv6 = posix_sock_is_ipv6,
    .is_ipv4 = posix_sock_is_ipv4,
    .is_connected = posix_sock_is_connected,
    .group_impl_get_optimal = posix_sock_group_impl_get_optimal,
    .group_impl_create = posix_sock_group_impl_create,
    .group_impl_add_sock = posix_sock_group_impl_add_sock,
    .group_impl_remove_sock = posix_sock_group_impl_remove_sock,
    .group_impl_poll = posix_sock_group_impl_poll,
    .group_impl_close = posix_sock_group_impl_close,
    .get_opts = posix_sock_impl_get_opts,
    .set_opts = posix_sock_impl_set_opts,
};

SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);