1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. All rights reserved. 5 * Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved. 6 * Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * * Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * * Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in 16 * the documentation and/or other materials provided with the 17 * distribution. 18 * * Neither the name of Intel Corporation nor the names of its 19 * contributors may be used to endorse or promote products derived 20 * from this software without specific prior written permission. 21 * 22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 25 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 26 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 27 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 28 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 29 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 30 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 31 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 32 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
*/

#include "spdk/stdinc.h"

/* Select the platform event mechanism: kqueue on FreeBSD, epoll elsewhere. */
#if defined(__FreeBSD__)
#include <sys/event.h>
#define SPDK_KEVENT
#else
#include <sys/epoll.h>
#define SPDK_EPOLL
#endif

#if defined(__linux__)
#include <linux/errqueue.h>
#endif

#include "spdk/env.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/util.h"
#include "spdk_internal/sock.h"
#include "../sock_kernel.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32

/* MSG_ZEROCOPY sends are only attempted when the kernel exposes both flags. */
#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif

/* POSIX implementation of the abstract spdk_sock. */
struct spdk_posix_sock {
	struct spdk_sock base;
	int fd;				/* underlying kernel socket fd */

	uint32_t sendmsg_idx;		/* counter of sendmsg() calls; value - 1 is stashed in
					 * req->internal.offset to match MSG_ZEROCOPY completions */

	struct spdk_pipe *recv_pipe;	/* optional userspace receive pipe (NULL when disabled) */
	void *recv_buf;			/* backing storage for recv_pipe */
	int recv_buf_sz;		/* usable size of recv_pipe in bytes */
	bool pipe_has_data;		/* recv_pipe currently holds unread data */
	bool socket_has_data;		/* kernel socket is believed to hold unread data */
	bool zcopy;			/* SO_ZEROCOPY was successfully enabled on this fd */

	int placement_id;		/* hardware queue / cpu placement hint, or -1 if unknown */

	TAILQ_ENTRY(spdk_posix_sock) link;	/* membership in group->socks_with_data */
};

TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock);

struct spdk_posix_sock_group_impl {
	struct spdk_sock_group_impl base;
	int fd;					/* epoll/kqueue fd for this poll group */
	struct spdk_has_data_list socks_with_data;	/* socks with pipe or kernel data pending */
	int placement_id;			/* group-wide placement id, or -1 */
};

/* Module-wide tunables, adjustable via the sock impl opts API. */
static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_quickack = false,
	.enable_placement_id = PLACEMENT_NONE,
	.enable_zerocopy_send_server = true,
	.enable_zerocopy_send_client = false
};

/* Global placement_id -> poll group map shared by all posix sockets. */
static struct spdk_sock_map g_map = {
	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
	.mtx = PTHREAD_MUTEX_INITIALIZER
};

__attribute((destructor)) static void
posix_sock_map_cleanup(void)
{
	spdk_sock_map_cleanup(&g_map);
}

#define __posix_sock(sock) (struct spdk_posix_sock *)sock
#define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group

/* Fill in local (saddr/sport) and peer (caddr/cport) address strings and ports.
 * AF_UNIX sockets succeed without producing addresses. Returns 0 on success,
 * -1 on failure (getsockname/getpeername/getnameinfo error or unsupported family). */
static int
posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	/* Repeat the same dance for the peer address. */
	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum posix_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};

/* (Re)size the userspace receive pipe to sz bytes, migrating any buffered data
 * from the old pipe. sz == 0 destroys the pipe. Returns 0 on success, -1 if
 * sz is below MIN_SOCK_PIPE_SIZE, -ENOMEM on allocation failure, or -EINVAL
 * if the old pipe holds more data than the new size can accommodate. */
static int
posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	/* The pipe needs one extra byte of storage to hold sz usable bytes. */
	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

/* Set the receive buffer size: resizes the userspace pipe (when enabled) and
 * then applies SO_RCVBUF, clamped up to MIN_SO_RCVBUF_SIZE. */
static int
posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) {
		rc = posix_sock_alloc_pipe(sock, sz);
		if (rc) {
			return rc;
		}
	}

	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
	if (sz < MIN_SO_RCVBUF_SIZE) {
		sz = MIN_SO_RCVBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

/* Set SO_SNDBUF, clamped up to MIN_SO_SNDBUF_SIZE. */
static int
posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (sz < MIN_SO_SNDBUF_SIZE) {
		sz = MIN_SO_SNDBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

/* Apply per-socket options after creation/accept: optionally enable zero-copy
 * sends, TCP_QUICKACK, and record the placement id. Best-effort: setsockopt
 * failures here are non-fatal. */
static void
posix_sock_init(struct spdk_posix_sock *sock, bool enable_zero_copy)
{
#if defined(SPDK_ZEROCOPY) || defined(__linux__)
	int flag;
	int rc;
#endif

#if defined(SPDK_ZEROCOPY)
	flag = 1;

	if (enable_zero_copy) {
		/* Try to turn on zero copy sends */
		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
		if (rc == 0) {
			sock->zcopy = true;
		}
	}
#endif

#if defined(__linux__)
	flag = 1;

	if (g_spdk_posix_sock_impl_opts.enable_quickack) {
		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
		if (rc != 0) {
			SPDK_ERRLOG("quickack was failed to set\n");
		}
	}

	spdk_sock_get_placement_id(sock->fd, g_spdk_posix_sock_impl_opts.enable_placement_id,
				   &sock->placement_id);

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
		/* Save placement_id */
		spdk_sock_map_insert(&g_map, sock->placement_id, NULL);
	}
#endif
}

/* Allocate and initialize a posix sock wrapper around an existing fd.
 * Caller retains responsibility for closing fd on failure. */
static struct spdk_posix_sock *
posix_sock_alloc(int fd, bool enable_zero_copy)
{
	struct spdk_posix_sock *sock;

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;
	posix_sock_init(sock, enable_zero_copy);

	return sock;
}

/* Create a socket fd for the given addrinfo and apply common options
 * (buffer sizes, SO_REUSEADDR, TCP_NODELAY, optional SO_PRIORITY,
 * IPV6_V6ONLY). Returns the fd, or -1 on error (fd already closed). */
static int
posix_fd_create(struct addrinfo *res, struct spdk_sock_opts *opts)
{
	int fd;
	int val = 1;
	int rc, sz;

	fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
	if (fd < 0) {
		/* error */
		return -1;
	}

	sz = g_spdk_posix_sock_impl_opts.recv_buf_size;
	rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc) {
		/* Not fatal */
	}

	sz = g_spdk_posix_sock_impl_opts.send_buf_size;
	rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc) {
		/* Not fatal */
	}

	rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
	if (rc != 0) {
		close(fd);
		/* error */
		return -1;
	}
	rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
	if (rc != 0) {
		close(fd);
		/* error */
		return -1;
	}

#if defined(SO_PRIORITY)
	if (opts->priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			return -1;
		}
	}
#endif

	if (res->ai_family == AF_INET6) {
		rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			return -1;
		}
	}
	return fd;
}

/* Common path for listen/connect. Resolves ip:port (numeric only; strips
 * optional [] around IPv6 literals), walks the addrinfo list until a socket
 * binds+listens or connects, makes the fd non-blocking, and wraps it in a
 * spdk_posix_sock. Zero-copy is enabled only if both the user opts and the
 * per-direction impl opts allow it, and never on loopback. */
static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_posix_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int rc;
	bool enable_zcopy_user_opts = true;
	bool enable_zcopy_impl_opts = true;

	assert(opts != NULL);

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		/* Strip the brackets from an [IPv6] literal. */
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = posix_fd_create(res, opts);
		if (fd < 0) {
			continue;
		}
		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_server;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_client;
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	/* Only enable zero copy for non-loopback sockets. */
	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd);

	sock = posix_sock_alloc(fd, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}

static struct spdk_sock *
posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}

static struct spdk_sock *
posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}

/* Accept a pending connection on a listen socket. The new fd is made
 * non-blocking, SO_PRIORITY is re-applied (it is not inherited), and the
 * zero-copy setting is inherited from the listen socket. */
static struct spdk_sock *
posix_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc, fd;
	struct spdk_posix_sock *new_sock;
	int flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	/* Inherit the zero copy feature from the listen socket */
	new_sock = posix_sock_alloc(fd, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}

/* Close the socket and free all associated memory (pipe, buffer, sock). */
static int
posix_sock_close(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);

	assert(TAILQ_EMPTY(&_sock->pending_reqs));

	/* If the socket fails to close, the best choice is to
	 * leak the fd but continue to free the rest of the sock
	 * memory. */
	close(sock->fd);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	free(sock);

	return 0;
}

#ifdef SPDK_ZEROCOPY
/* Drain MSG_ZEROCOPY completion notifications from the socket error queue
 * and complete the pending requests whose sendmsg index falls inside each
 * notification's [ee_info, ee_data] range. Returns 0, or a negative value
 * propagated from spdk_sock_request_put(). */
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	msgh.msg_control = buf;
	msgh.msg_controllen = sizeof(buf);

	while (true) {
		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				/* Error queue fully drained. */
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		/* Only IP_RECVERR/IPV6_RECVERR control messages are expected here. */
		cm = CMSG_FIRSTHDR(&msgh);
		if (!(cm &&
		      ((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
		       (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR)))) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (req->internal.offset == idx) {
					found = true;

					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}

				} else if (found) {
					break;
				}
			}
		}
	}

	return 0;
}
#endif

/* Flush queued async write requests with a single vectored sendmsg().
 * Partially-sent requests have their internal offset advanced; fully-sent
 * requests are moved to the pending list and, when zero-copy is off,
 * completed immediately. With zero-copy on, the sendmsg call index is stored
 * for later matching in _sock_check_zcopy(). Returns 0 on success or if
 * nothing could be sent right now; negative on hard sendmsg failure. */
static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc;
	unsigned int offset;
	size_t len;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		return 0;
	}

	iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL);

	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY;
	} else
#endif
	{
		flags = 0;
	}
	rc = sendmsg(psock->fd, &msg, flags);
	if (rc <= 0) {
		/* ENOBUFS with zcopy means the kernel ran out of optmem; retry later. */
		if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
			return 0;
		}
		return rc;
	}

	/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
	 * req->internal.offset, so sendmsg_idx should not be zero  */
	if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
		psock->sendmsg_idx = 1;
	} else {
		psock->sendmsg_idx++;
	}

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(sock, req);

		if (!psock->zcopy) {
			/* The sendmsg syscall above isn't currently asynchronous,
			* so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return 0;
}

/* Public flush entry point: reap any zero-copy completions first, then
 * attempt to flush queued writes. */
static int
posix_sock_flush(struct spdk_sock *sock)
{
#ifdef SPDK_ZEROCOPY
	struct spdk_posix_sock *psock = __posix_sock(sock);

	if (psock->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) {
		_sock_check_zcopy(sock);
	}
#endif

	return _sock_flush(sock);
}

/* Copy buffered data from the userspace pipe into diov. Returns bytes copied,
 * or -1 with errno = EAGAIN (pipe empty) / EINVAL. When the pipe is drained,
 * the sock may be removed from its group's socks_with_data list. */
static ssize_t
posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_posix_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, mark it appropriately */
	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		assert(sock->pipe_has_data == true);

		group = __posix_group_impl(sock->base.group_impl);
		if (group && !sock->socket_has_data) {
			/* No pipe data and no kernel data: nothing left to poll for. */
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->pipe_has_data = false;
	}

	return bytes;
}

/* Fill the userspace pipe with a readv() from the kernel socket and update
 * the pipe/socket data-availability flags. Returns bytes read, 0 on EOF/full
 * pipe, or negative on error. */
static inline ssize_t
posix_sock_read(struct spdk_posix_sock *sock)
{
	struct iovec iov[2];
	int bytes_avail, bytes_recvd;
	struct spdk_posix_sock_group_impl *group;

	bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes_avail <= 0) {
		return bytes_avail;
	}

	bytes_recvd = readv(sock->fd, iov, 2);

	assert(sock->pipe_has_data == false);

	if (bytes_recvd <= 0) {
		/* Errors count as draining the socket data */
		if (sock->base.group_impl && sock->socket_has_data) {
			group = __posix_group_impl(sock->base.group_impl);
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->socket_has_data = false;

		return bytes_recvd;
	}

	spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd);

#if DEBUG
	if (sock->base.group_impl) {
		assert(sock->socket_has_data == true);
	}
#endif

	sock->pipe_has_data = true;
	if (bytes_recvd < bytes_avail) {
		/* We drained the kernel socket entirely. */
		sock->socket_has_data = false;
	}

	return bytes_recvd;
}

/* Scatter-read into iov. Small reads are staged through the userspace pipe;
 * reads >= MIN_SOCK_PIPE_SIZE (or with no pipe configured) go straight to
 * the kernel socket. */
static ssize_t
posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		assert(sock->pipe_has_data == false);
		if (group && sock->socket_has_data) {
			sock->socket_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}
		return readv(sock->fd, iov, iovcnt);
	}

	/* If the socket is not in a group, we must assume it always has
	 * data waiting for us because it is not epolled */
	if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		len = 0;
		for (i = 0; i < iovcnt; i++) {
			len += iov[i].iov_len;
		}

		if (len >= MIN_SOCK_PIPE_SIZE) {
			/* TODO: Should this detect if kernel socket is drained? */
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = posix_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
}

/* Single-buffer convenience wrapper around posix_sock_readv(). */
static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return posix_sock_readv(sock, iov, 1);
}

/* Synchronous vectored write. Queued async requests must be flushed first to
 * preserve ordering; if they can't all be flushed, fail with EAGAIN. */
static ssize_t
posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	/* In order to process a writev, we need to flush any asynchronous writes
	 * first. */
	rc = _sock_flush(_sock);
	if (rc < 0) {
		return rc;
	}

	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
		/* We weren't able to flush all requests */
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}

/* Queue an asynchronous write request; opportunistically flush once enough
 * iovs have accumulated. On flush failure all queued requests are aborted. */
static void
posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	int rc;

	spdk_sock_request_queue(sock, req);

	/* If there are a sufficient number queued, just flush them out immediately. */
	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}
}

/* Set SO_RCVLOWAT to nbytes. Returns 0 on success, -1 on failure. */
static int
posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

/* True if the local address family is AF_INET6. */
static bool
posix_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

/* True if the local address family is AF_INET. */
static bool
posix_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

/* Probe connection liveness with a non-consuming MSG_PEEK recv. recv() == 0
 * means the peer performed an orderly shutdown; EAGAIN/EWOULDBLOCK on this
 * non-blocking socket means connected-but-idle. */
static bool
posix_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}
/* Return the poll group previously mapped to this socket's placement id,
 * or NULL when no placement id is known.
 * NOTE(review): group_impl is only assigned by spdk_sock_map_lookup() —
 * presumably it sets the out-param to NULL on a miss; verify in sock_map. */
static struct spdk_sock_group_impl *
posix_sock_group_impl_get_optimal(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_sock_group_impl *group_impl;

	if (sock->placement_id != -1) {
		spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl);
		return group_impl;
	}

	return NULL;
}

/* Create a poll group backed by an epoll (Linux) or kqueue (FreeBSD) fd.
 * With PLACEMENT_CPU, the group is registered in the global map under the
 * current core number. */
static struct spdk_sock_group_impl *
posix_sock_group_impl_create(void)
{
	struct spdk_posix_sock_group_impl *group_impl;
	int fd;

#if defined(SPDK_EPOLL)
	fd = epoll_create1(0);
#elif defined(SPDK_KEVENT)
	fd = kqueue();
#endif
	if (fd == -1) {
		return NULL;
	}

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		close(fd);
		return NULL;
	}

	group_impl->fd = fd;
	TAILQ_INIT(&group_impl->socks_with_data);
	group_impl->placement_id = -1;

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
		group_impl->placement_id = spdk_env_get_current_core();
	}

	return &group_impl->base;
}

/* Tag the socket with SO_MARK = placement_id and record the group under that
 * id in the global map. Both steps are best-effort; failures are logged but
 * non-fatal. No-op when SO_MARK is unavailable. */
static void
posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock,
		int placement_id)
{
#if defined(SO_MARK)
	int rc;

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_MARK,
			&placement_id, sizeof(placement_id));
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Error setting SO_MARK\n");
		return;
	}

	rc = spdk_sock_map_insert(&g_map, placement_id, &group->base);
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
		return;
	}

	sock->placement_id = placement_id;
#endif
}

/* Ensure the group has a placement id (allocating a free one on first use and
 * back-filling existing member sockets), then mark the newly added socket. */
static void
posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);

	if (group->placement_id == -1) {
		group->placement_id = spdk_sock_map_find_free(&g_map);

		/* If a free placement id is found, update existing sockets in this group */
		if (group->placement_id != -1) {
			struct spdk_sock *sock, *tmp;

			TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
				posix_sock_mark(group, __posix_sock(sock), group->placement_id);
			}
		}
	}

	if (group->placement_id != -1) {
		/*
		 * group placement id is already determined for this poll group.
		 * Mark socket with group's placement id.
		 */
		posix_sock_mark(group, __posix_sock(_sock), group->placement_id);
	}
}

/* Register the socket's fd with the group's epoll/kqueue instance, restore
 * pipe-data bookkeeping if the socket migrated between groups with buffered
 * data, and update placement bookkeeping. Returns the epoll_ctl/kevent rc. */
static int
posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

#if defined(SPDK_EPOLL)
	struct epoll_event event;

	memset(&event, 0, sizeof(event));
	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
	event.events = EPOLLIN | EPOLLERR;
	event.data.ptr = sock;

	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
#elif defined(SPDK_KEVENT)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
#endif

	if (rc != 0) {
		return rc;
	}

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		sock->pipe_has_data = true;
		sock->socket_has_data = false;
		TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link);
	}

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
		posix_sock_update_mark(_group, _sock);
	} else if (sock->placement_id != -1) {
		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
			/* Do not treat this as an error. The system will continue running. */
		}
	}

	return rc;
}

/* Deregister the socket from the group's epoll/kqueue fd, drop it from the
 * socks_with_data list, release its placement map entry, and abort any
 * outstanding async requests. Returns the epoll_ctl/kevent rc. */
static int
posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	if (sock->pipe_has_data || sock->socket_has_data) {
		TAILQ_REMOVE(&group->socks_with_data, sock, link);
		sock->pipe_has_data = false;
		sock->socket_has_data = false;
	}

	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

#if defined(SPDK_EPOLL)
	struct epoll_event event;

	/* Event parameter is ignored but some old kernel version still require it. */
	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
#elif defined(SPDK_KEVENT)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
	if (rc == 0 && event.flags & EV_ERROR) {
		rc = -1;
		errno = event.data;
	}
#endif

	spdk_sock_abort_requests(_sock);

	return rc;
}

static int
posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_sock *sock, *tmp;
	int num_events, i, rc;
	struct spdk_posix_sock *psock, *ptmp;
#if defined(SPDK_EPOLL)
	struct epoll_event events[MAX_EVENTS_PER_POLL];
#elif defined(SPDK_KEVENT)
	struct kevent events[MAX_EVENTS_PER_POLL];
	struct timespec ts = {0};
#endif

#ifdef SPDK_ZEROCOPY
	/* When all of the following conditions are met
	 * - non-blocking socket
	 * - zero copy is enabled
	 * - interrupts suppressed (i.e. busy polling)
	 * - the NIC tx queue is full at the time sendmsg() is called
	 * - epoll_wait determines there is an EPOLLIN event for the socket
	 * then we can get into a situation where data we've sent is queued
	 * up in the kernel network stack, but interrupts have been suppressed
	 * because other traffic is flowing so the kernel misses the signal
	 * to flush the software tx queue. If there wasn't incoming data
	 * pending on the socket, then epoll_wait would have been sufficient
	 * to kick off the send operation, but since there is a pending event
	 * epoll_wait does not trigger the necessary operation.
	 *
	 * We deal with this by checking for all of the above conditions and
	 * additionally looking for EPOLLIN events that were not consumed from
	 * the last poll loop. We take this to mean that the upper layer is
	 * unable to consume them because it is blocked waiting for resources
	 * to free up, and those resources are most likely freed in response
	 * to a pending asynchronous write completing.
	 *
	 * Additionally, sockets that have the same placement_id actually share
	 * an underlying hardware queue. That means polling one of them is
	 * equivalent to polling all of them. As a quick mechanism to avoid
	 * making extra poll() calls, stash the last placement_id during the loop
	 * and only poll if it's not the same. The overwhelmingly common case
	 * is that all sockets in this list have the same placement_id because
	 * SPDK is intentionally grouping sockets by that value, so even
	 * though this won't stop all extra calls to poll(), it's very fast
	 * and will catch all of them in practice.
	 */
	int last_placement_id = -1;

	TAILQ_FOREACH(psock, &group->socks_with_data, link) {
		if (psock->zcopy && psock->placement_id >= 0 &&
		    psock->placement_id != last_placement_id) {
			struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0};

			poll(&pfd, 1, 0);
			last_placement_id = psock->placement_id;
		}
	}
#endif

	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
	 * a completion callback could remove the sock from the
	 * group.
*/ 1337 TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { 1338 rc = _sock_flush(sock); 1339 if (rc) { 1340 spdk_sock_abort_requests(sock); 1341 } 1342 } 1343 1344 assert(max_events > 0); 1345 1346 #if defined(SPDK_EPOLL) 1347 num_events = epoll_wait(group->fd, events, max_events, 0); 1348 #elif defined(SPDK_KEVENT) 1349 num_events = kevent(group->fd, NULL, 0, events, max_events, &ts); 1350 #endif 1351 1352 if (num_events == -1) { 1353 return -1; 1354 } else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) { 1355 sock = TAILQ_FIRST(&_group->socks); 1356 psock = __posix_sock(sock); 1357 /* poll() is called here to busy poll the queue associated with 1358 * first socket in list and potentially reap incoming data. 1359 */ 1360 if (sock->opts.priority) { 1361 struct pollfd pfd = {0, 0, 0}; 1362 1363 pfd.fd = psock->fd; 1364 pfd.events = POLLIN | POLLERR; 1365 poll(&pfd, 1, 0); 1366 } 1367 } 1368 1369 for (i = 0; i < num_events; i++) { 1370 #if defined(SPDK_EPOLL) 1371 sock = events[i].data.ptr; 1372 psock = __posix_sock(sock); 1373 1374 #ifdef SPDK_ZEROCOPY 1375 if (events[i].events & EPOLLERR) { 1376 rc = _sock_check_zcopy(sock); 1377 /* If the socket was closed or removed from 1378 * the group in response to a send ack, don't 1379 * add it to the array here. 
*/ 1380 if (rc || sock->cb_fn == NULL) { 1381 continue; 1382 } 1383 } 1384 #endif 1385 if ((events[i].events & EPOLLIN) == 0) { 1386 continue; 1387 } 1388 1389 #elif defined(SPDK_KEVENT) 1390 sock = events[i].udata; 1391 psock = __posix_sock(sock); 1392 #endif 1393 1394 /* If the socket is not already in the list, add it now */ 1395 if (!psock->socket_has_data && !psock->pipe_has_data) { 1396 TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link); 1397 } 1398 psock->socket_has_data = true; 1399 } 1400 1401 num_events = 0; 1402 1403 TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) { 1404 if (num_events == max_events) { 1405 break; 1406 } 1407 1408 /* If the socket's cb_fn is NULL, just remove it from the 1409 * list and do not add it to socks array */ 1410 if (spdk_unlikely(psock->base.cb_fn == NULL)) { 1411 psock->socket_has_data = false; 1412 psock->pipe_has_data = false; 1413 TAILQ_REMOVE(&group->socks_with_data, psock, link); 1414 continue; 1415 } 1416 1417 socks[num_events++] = &psock->base; 1418 } 1419 1420 /* Cycle the has_data list so that each time we poll things aren't 1421 * in the same order. Say we have 6 sockets in the list, named as follows: 1422 * A B C D E F 1423 * And all 6 sockets had epoll events, but max_events is only 3. That means 1424 * psock currently points at D. We want to rearrange the list to the following: 1425 * D E F A B C 1426 * 1427 * The variables below are named according to this example to make it easier to 1428 * follow the swaps. 
1429 */ 1430 if (psock != NULL) { 1431 struct spdk_posix_sock *pa, *pc, *pd, *pf; 1432 1433 /* Capture pointers to the elements we need */ 1434 pd = psock; 1435 pc = TAILQ_PREV(pd, spdk_has_data_list, link); 1436 pa = TAILQ_FIRST(&group->socks_with_data); 1437 pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list); 1438 1439 /* Break the link between C and D */ 1440 pc->link.tqe_next = NULL; 1441 1442 /* Connect F to A */ 1443 pf->link.tqe_next = pa; 1444 pa->link.tqe_prev = &pf->link.tqe_next; 1445 1446 /* Fix up the list first/last pointers */ 1447 group->socks_with_data.tqh_first = pd; 1448 group->socks_with_data.tqh_last = &pc->link.tqe_next; 1449 1450 /* D is in front of the list, make tqe prev pointer point to the head of list */ 1451 pd->link.tqe_prev = &group->socks_with_data.tqh_first; 1452 } 1453 1454 return num_events; 1455 } 1456 1457 static int 1458 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group) 1459 { 1460 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1461 int rc; 1462 1463 if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1464 spdk_sock_map_release(&g_map, spdk_env_get_current_core()); 1465 } 1466 1467 rc = close(group->fd); 1468 free(group); 1469 return rc; 1470 } 1471 1472 static int 1473 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 1474 { 1475 if (!opts || !len) { 1476 errno = EINVAL; 1477 return -1; 1478 } 1479 memset(opts, 0, *len); 1480 1481 #define FIELD_OK(field) \ 1482 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len 1483 1484 #define GET_FIELD(field) \ 1485 if (FIELD_OK(field)) { \ 1486 opts->field = g_spdk_posix_sock_impl_opts.field; \ 1487 } 1488 1489 GET_FIELD(recv_buf_size); 1490 GET_FIELD(send_buf_size); 1491 GET_FIELD(enable_recv_pipe); 1492 GET_FIELD(enable_zerocopy_send); 1493 GET_FIELD(enable_quickack); 1494 GET_FIELD(enable_placement_id); 1495 GET_FIELD(enable_zerocopy_send_server); 1496 
GET_FIELD(enable_zerocopy_send_client); 1497 1498 #undef GET_FIELD 1499 #undef FIELD_OK 1500 1501 *len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts)); 1502 return 0; 1503 } 1504 1505 static int 1506 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 1507 { 1508 if (!opts) { 1509 errno = EINVAL; 1510 return -1; 1511 } 1512 1513 #define FIELD_OK(field) \ 1514 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len 1515 1516 #define SET_FIELD(field) \ 1517 if (FIELD_OK(field)) { \ 1518 g_spdk_posix_sock_impl_opts.field = opts->field; \ 1519 } 1520 1521 SET_FIELD(recv_buf_size); 1522 SET_FIELD(send_buf_size); 1523 SET_FIELD(enable_recv_pipe); 1524 SET_FIELD(enable_zerocopy_send); 1525 SET_FIELD(enable_quickack); 1526 SET_FIELD(enable_placement_id); 1527 SET_FIELD(enable_zerocopy_send_server); 1528 SET_FIELD(enable_zerocopy_send_client); 1529 1530 #undef SET_FIELD 1531 #undef FIELD_OK 1532 1533 return 0; 1534 } 1535 1536 1537 static struct spdk_net_impl g_posix_net_impl = { 1538 .name = "posix", 1539 .getaddr = posix_sock_getaddr, 1540 .connect = posix_sock_connect, 1541 .listen = posix_sock_listen, 1542 .accept = posix_sock_accept, 1543 .close = posix_sock_close, 1544 .recv = posix_sock_recv, 1545 .readv = posix_sock_readv, 1546 .writev = posix_sock_writev, 1547 .writev_async = posix_sock_writev_async, 1548 .flush = posix_sock_flush, 1549 .set_recvlowat = posix_sock_set_recvlowat, 1550 .set_recvbuf = posix_sock_set_recvbuf, 1551 .set_sendbuf = posix_sock_set_sendbuf, 1552 .is_ipv6 = posix_sock_is_ipv6, 1553 .is_ipv4 = posix_sock_is_ipv4, 1554 .is_connected = posix_sock_is_connected, 1555 .group_impl_get_optimal = posix_sock_group_impl_get_optimal, 1556 .group_impl_create = posix_sock_group_impl_create, 1557 .group_impl_add_sock = posix_sock_group_impl_add_sock, 1558 .group_impl_remove_sock = posix_sock_group_impl_remove_sock, 1559 .group_impl_poll = posix_sock_group_impl_poll, 1560 .group_impl_close = 
posix_sock_group_impl_close, 1561 .get_opts = posix_sock_impl_get_opts, 1562 .set_opts = posix_sock_impl_set_opts, 1563 }; 1564 1565 SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY); 1566