1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. All rights reserved. 5 * Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved. 6 * Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * * Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * * Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in 16 * the documentation and/or other materials provided with the 17 * distribution. 18 * * Neither the name of Intel Corporation nor the names of its 19 * contributors may be used to endorse or promote products derived 20 * from this software without specific prior written permission. 21 * 22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 25 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 26 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 27 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 28 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 29 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 30 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 31 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 32 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
33 */ 34 35 #include "spdk/stdinc.h" 36 37 #if defined(__FreeBSD__) 38 #include <sys/event.h> 39 #define SPDK_KEVENT 40 #else 41 #include <sys/epoll.h> 42 #define SPDK_EPOLL 43 #endif 44 45 #if defined(__linux__) 46 #include <linux/errqueue.h> 47 #endif 48 49 #include "spdk/env.h" 50 #include "spdk/log.h" 51 #include "spdk/pipe.h" 52 #include "spdk/sock.h" 53 #include "spdk/util.h" 54 #include "spdk_internal/sock.h" 55 #include "../sock_kernel.h" 56 57 #define MAX_TMPBUF 1024 58 #define PORTNUMLEN 32 59 60 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 61 #define SPDK_ZEROCOPY 62 #endif 63 64 struct spdk_posix_sock { 65 struct spdk_sock base; 66 int fd; 67 68 uint32_t sendmsg_idx; 69 70 struct spdk_pipe *recv_pipe; 71 void *recv_buf; 72 int recv_buf_sz; 73 bool pipe_has_data; 74 bool socket_has_data; 75 bool zcopy; 76 77 int placement_id; 78 79 TAILQ_ENTRY(spdk_posix_sock) link; 80 }; 81 82 TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock); 83 84 struct spdk_posix_sock_group_impl { 85 struct spdk_sock_group_impl base; 86 int fd; 87 struct spdk_has_data_list socks_with_data; 88 int placement_id; 89 }; 90 91 static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = { 92 .recv_buf_size = MIN_SO_RCVBUF_SIZE, 93 .send_buf_size = MIN_SO_SNDBUF_SIZE, 94 .enable_recv_pipe = true, 95 .enable_quickack = false, 96 .enable_placement_id = PLACEMENT_NONE, 97 .enable_zerocopy_send_server = true, 98 .enable_zerocopy_send_client = false 99 }; 100 101 static struct spdk_sock_map g_map = { 102 .entries = STAILQ_HEAD_INITIALIZER(g_map.entries), 103 .mtx = PTHREAD_MUTEX_INITIALIZER 104 }; 105 106 __attribute((destructor)) static void 107 posix_sock_map_cleanup(void) 108 { 109 spdk_sock_map_cleanup(&g_map); 110 } 111 112 #define __posix_sock(sock) (struct spdk_posix_sock *)sock 113 #define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group 114 115 static int 116 posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 117 char *caddr, 
int clen, uint16_t *cport) 118 { 119 struct spdk_posix_sock *sock = __posix_sock(_sock); 120 struct sockaddr_storage sa; 121 socklen_t salen; 122 int rc; 123 124 assert(sock != NULL); 125 126 memset(&sa, 0, sizeof sa); 127 salen = sizeof sa; 128 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 129 if (rc != 0) { 130 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 131 return -1; 132 } 133 134 switch (sa.ss_family) { 135 case AF_UNIX: 136 /* Acceptable connection types that don't have IPs */ 137 return 0; 138 case AF_INET: 139 case AF_INET6: 140 /* Code below will get IP addresses */ 141 break; 142 default: 143 /* Unsupported socket family */ 144 return -1; 145 } 146 147 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); 148 if (rc != 0) { 149 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 150 return -1; 151 } 152 153 if (sport) { 154 if (sa.ss_family == AF_INET) { 155 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 156 } else if (sa.ss_family == AF_INET6) { 157 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 158 } 159 } 160 161 memset(&sa, 0, sizeof sa); 162 salen = sizeof sa; 163 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); 164 if (rc != 0) { 165 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); 166 return -1; 167 } 168 169 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); 170 if (rc != 0) { 171 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 172 return -1; 173 } 174 175 if (cport) { 176 if (sa.ss_family == AF_INET) { 177 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 178 } else if (sa.ss_family == AF_INET6) { 179 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 180 } 181 } 182 183 return 0; 184 } 185 186 enum posix_sock_create_type { 187 SPDK_SOCK_CREATE_LISTEN, 188 SPDK_SOCK_CREATE_CONNECT, 189 }; 190 191 static int 192 posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz) 193 { 194 uint8_t *new_buf; 195 struct spdk_pipe *new_pipe; 196 struct iovec 
siov[2]; 197 struct iovec diov[2]; 198 int sbytes; 199 ssize_t bytes; 200 201 if (sock->recv_buf_sz == sz) { 202 return 0; 203 } 204 205 /* If the new size is 0, just free the pipe */ 206 if (sz == 0) { 207 spdk_pipe_destroy(sock->recv_pipe); 208 free(sock->recv_buf); 209 sock->recv_pipe = NULL; 210 sock->recv_buf = NULL; 211 return 0; 212 } else if (sz < MIN_SOCK_PIPE_SIZE) { 213 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 214 return -1; 215 } 216 217 /* Round up to next 64 byte multiple */ 218 new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t)); 219 if (!new_buf) { 220 SPDK_ERRLOG("socket recv buf allocation failed\n"); 221 return -ENOMEM; 222 } 223 224 new_pipe = spdk_pipe_create(new_buf, sz + 1); 225 if (new_pipe == NULL) { 226 SPDK_ERRLOG("socket pipe allocation failed\n"); 227 free(new_buf); 228 return -ENOMEM; 229 } 230 231 if (sock->recv_pipe != NULL) { 232 /* Pull all of the data out of the old pipe */ 233 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 234 if (sbytes > sz) { 235 /* Too much data to fit into the new pipe size */ 236 spdk_pipe_destroy(new_pipe); 237 free(new_buf); 238 return -EINVAL; 239 } 240 241 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 242 assert(sbytes == sz); 243 244 bytes = spdk_iovcpy(siov, 2, diov, 2); 245 spdk_pipe_writer_advance(new_pipe, bytes); 246 247 spdk_pipe_destroy(sock->recv_pipe); 248 free(sock->recv_buf); 249 } 250 251 sock->recv_buf_sz = sz; 252 sock->recv_buf = new_buf; 253 sock->recv_pipe = new_pipe; 254 255 return 0; 256 } 257 258 static int 259 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 260 { 261 struct spdk_posix_sock *sock = __posix_sock(_sock); 262 int rc; 263 264 assert(sock != NULL); 265 266 if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) { 267 rc = posix_sock_alloc_pipe(sock, sz); 268 if (rc) { 269 return rc; 270 } 271 } 272 273 /* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */ 274 if 
(sz < MIN_SO_RCVBUF_SIZE) { 275 sz = MIN_SO_RCVBUF_SIZE; 276 } 277 278 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 279 if (rc < 0) { 280 return rc; 281 } 282 283 return 0; 284 } 285 286 static int 287 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 288 { 289 struct spdk_posix_sock *sock = __posix_sock(_sock); 290 int rc; 291 292 assert(sock != NULL); 293 294 if (sz < MIN_SO_SNDBUF_SIZE) { 295 sz = MIN_SO_SNDBUF_SIZE; 296 } 297 298 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 299 if (rc < 0) { 300 return rc; 301 } 302 303 return 0; 304 } 305 306 static struct spdk_posix_sock * 307 posix_sock_alloc(int fd, bool enable_zero_copy) 308 { 309 struct spdk_posix_sock *sock; 310 #if defined(SPDK_ZEROCOPY) || defined(__linux__) 311 int flag; 312 int rc; 313 #endif 314 315 sock = calloc(1, sizeof(*sock)); 316 if (sock == NULL) { 317 SPDK_ERRLOG("sock allocation failed\n"); 318 return NULL; 319 } 320 321 sock->fd = fd; 322 323 #if defined(SPDK_ZEROCOPY) 324 flag = 1; 325 326 if (enable_zero_copy) { 327 /* Try to turn on zero copy sends */ 328 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)); 329 if (rc == 0) { 330 sock->zcopy = true; 331 } 332 } 333 #endif 334 335 #if defined(__linux__) 336 flag = 1; 337 338 if (g_spdk_posix_sock_impl_opts.enable_quickack) { 339 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag)); 340 if (rc != 0) { 341 SPDK_ERRLOG("quickack was failed to set\n"); 342 } 343 } 344 345 spdk_sock_get_placement_id(sock->fd, g_spdk_posix_sock_impl_opts.enable_placement_id, 346 &sock->placement_id); 347 348 if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) { 349 /* Save placement_id */ 350 spdk_sock_map_insert(&g_map, sock->placement_id, NULL); 351 } 352 #endif 353 354 return sock; 355 } 356 357 static struct spdk_sock * 358 posix_sock_create(const char *ip, int port, 359 enum posix_sock_create_type type, 360 struct spdk_sock_opts *opts) 361 { 362 
/*
 * Create a listening or connecting TCP socket for ip:port and wrap it in
 * an spdk_posix_sock.  Walks every getaddrinfo() result until one address
 * family succeeds.  The fd is made non-blocking before use.  Returns NULL
 * on failure.
 *
 * Fixes relative to the original:
 *  - the return value of fcntl(fd, F_GETFL) is now checked; previously a
 *    failure produced flag == -1 and F_SETFL was handed an all-ones mask.
 *  - SO_PRIORITY is set with sizeof(opts->priority) rather than the
 *    unrelated `sizeof val`.
 */
static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_posix_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc, sz;
	bool enable_zcopy_user_opts = true;
	bool enable_zcopy_impl_opts = true;

	assert(opts != NULL);

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		/* Strip the brackets from a literal IPv6 address. */
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		sz = g_spdk_posix_sock_impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		sz = g_spdk_posix_sock_impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
		if (rc) {
			/* Not fatal */
		}

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof(opts->priority));
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}
#endif

		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_server;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_client;
		}

		flag = fcntl(fd, F_GETFL);
		if (flag < 0 || fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	/* Only enable zero copy for non-loopback sockets. */
	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd);

	sock = posix_sock_alloc(fd, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}

/* spdk_net_impl listen callback. */
static struct spdk_sock *
posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}

/* spdk_net_impl connect callback. */
static struct spdk_sock *
posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}

/*
 * Accept one pending connection on a listening socket.  The new fd is
 * forced non-blocking, inherits SO_PRIORITY from the listener (the kernel
 * does not carry it over), and inherits the listener's zero-copy setting.
 * Returns NULL if accept() fails or setup of the new fd fails.
 */
static struct spdk_sock *
posix_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc, fd;
	struct spdk_posix_sock *new_sock;
	int flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if (flag < 0 || (!(flag & O_NONBLOCK) && fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	/* Inherit the zero copy feature from the listen socket */
	new_sock = posix_sock_alloc(fd, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}
*_sock) 595 { 596 struct spdk_posix_sock *sock = __posix_sock(_sock); 597 598 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 599 600 /* If the socket fails to close, the best choice is to 601 * leak the fd but continue to free the rest of the sock 602 * memory. */ 603 close(sock->fd); 604 605 spdk_pipe_destroy(sock->recv_pipe); 606 free(sock->recv_buf); 607 free(sock); 608 609 return 0; 610 } 611 612 #ifdef SPDK_ZEROCOPY 613 static int 614 _sock_check_zcopy(struct spdk_sock *sock) 615 { 616 struct spdk_posix_sock *psock = __posix_sock(sock); 617 struct msghdr msgh = {}; 618 uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)]; 619 ssize_t rc; 620 struct sock_extended_err *serr; 621 struct cmsghdr *cm; 622 uint32_t idx; 623 struct spdk_sock_request *req, *treq; 624 bool found; 625 626 msgh.msg_control = buf; 627 msgh.msg_controllen = sizeof(buf); 628 629 while (true) { 630 rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE); 631 632 if (rc < 0) { 633 if (errno == EWOULDBLOCK || errno == EAGAIN) { 634 return 0; 635 } 636 637 if (!TAILQ_EMPTY(&sock->pending_reqs)) { 638 SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n"); 639 } else { 640 SPDK_WARNLOG("Recvmsg yielded an error!\n"); 641 } 642 return 0; 643 } 644 645 cm = CMSG_FIRSTHDR(&msgh); 646 if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) { 647 SPDK_WARNLOG("Unexpected cmsg level or type!\n"); 648 return 0; 649 } 650 651 serr = (struct sock_extended_err *)CMSG_DATA(cm); 652 if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { 653 SPDK_WARNLOG("Unexpected extended error origin\n"); 654 return 0; 655 } 656 657 /* Most of the time, the pending_reqs array is in the exact 658 * order we need such that all of the requests to complete are 659 * in order, in the front. 
It is guaranteed that all requests 660 * belonging to the same sendmsg call are sequential, so once 661 * we encounter one match we can stop looping as soon as a 662 * non-match is found. 663 */ 664 for (idx = serr->ee_info; idx <= serr->ee_data; idx++) { 665 found = false; 666 TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) { 667 if (req->internal.offset == idx) { 668 found = true; 669 670 rc = spdk_sock_request_put(sock, req, 0); 671 if (rc < 0) { 672 return rc; 673 } 674 675 } else if (found) { 676 break; 677 } 678 } 679 } 680 } 681 682 return 0; 683 } 684 #endif 685 686 static int 687 _sock_flush(struct spdk_sock *sock) 688 { 689 struct spdk_posix_sock *psock = __posix_sock(sock); 690 struct msghdr msg = {}; 691 int flags; 692 struct iovec iovs[IOV_BATCH_SIZE]; 693 int iovcnt; 694 int retval; 695 struct spdk_sock_request *req; 696 int i; 697 ssize_t rc; 698 unsigned int offset; 699 size_t len; 700 701 /* Can't flush from within a callback or we end up with recursive calls */ 702 if (sock->cb_cnt > 0) { 703 return 0; 704 } 705 706 iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL); 707 708 if (iovcnt == 0) { 709 return 0; 710 } 711 712 /* Perform the vectored write */ 713 msg.msg_iov = iovs; 714 msg.msg_iovlen = iovcnt; 715 #ifdef SPDK_ZEROCOPY 716 if (psock->zcopy) { 717 flags = MSG_ZEROCOPY; 718 } else 719 #endif 720 { 721 flags = 0; 722 } 723 rc = sendmsg(psock->fd, &msg, flags); 724 if (rc <= 0) { 725 if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) { 726 return 0; 727 } 728 return rc; 729 } 730 731 /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the 732 * req->internal.offset, so sendmsg_idx should not be zero */ 733 if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) { 734 psock->sendmsg_idx = 1; 735 } else { 736 psock->sendmsg_idx++; 737 } 738 739 /* Consume the requests that were actually written */ 740 req = TAILQ_FIRST(&sock->queued_reqs); 741 while (req) { 742 offset = 
req->internal.offset; 743 744 for (i = 0; i < req->iovcnt; i++) { 745 /* Advance by the offset first */ 746 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 747 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 748 continue; 749 } 750 751 /* Calculate the remaining length of this element */ 752 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 753 754 if (len > (size_t)rc) { 755 /* This element was partially sent. */ 756 req->internal.offset += rc; 757 return 0; 758 } 759 760 offset = 0; 761 req->internal.offset += len; 762 rc -= len; 763 } 764 765 /* Handled a full request. */ 766 spdk_sock_request_pend(sock, req); 767 768 if (!psock->zcopy) { 769 /* The sendmsg syscall above isn't currently asynchronous, 770 * so it's already done. */ 771 retval = spdk_sock_request_put(sock, req, 0); 772 if (retval) { 773 break; 774 } 775 } else { 776 /* Re-use the offset field to hold the sendmsg call index. The 777 * index is 0 based, so subtract one here because we've already 778 * incremented above. 
*/ 779 req->internal.offset = psock->sendmsg_idx - 1; 780 } 781 782 if (rc == 0) { 783 break; 784 } 785 786 req = TAILQ_FIRST(&sock->queued_reqs); 787 } 788 789 return 0; 790 } 791 792 static int 793 posix_sock_flush(struct spdk_sock *sock) 794 { 795 #ifdef SPDK_ZEROCOPY 796 struct spdk_posix_sock *psock = __posix_sock(sock); 797 798 if (psock->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) { 799 _sock_check_zcopy(sock); 800 } 801 #endif 802 803 return _sock_flush(sock); 804 } 805 806 static ssize_t 807 posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt) 808 { 809 struct iovec siov[2]; 810 int sbytes; 811 ssize_t bytes; 812 struct spdk_posix_sock_group_impl *group; 813 814 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 815 if (sbytes < 0) { 816 errno = EINVAL; 817 return -1; 818 } else if (sbytes == 0) { 819 errno = EAGAIN; 820 return -1; 821 } 822 823 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 824 825 if (bytes == 0) { 826 /* The only way this happens is if diov is 0 length */ 827 errno = EINVAL; 828 return -1; 829 } 830 831 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 832 833 /* If we drained the pipe, mark it appropriately */ 834 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 835 assert(sock->pipe_has_data == true); 836 837 group = __posix_group_impl(sock->base.group_impl); 838 if (group && !sock->socket_has_data) { 839 TAILQ_REMOVE(&group->socks_with_data, sock, link); 840 } 841 842 sock->pipe_has_data = false; 843 } 844 845 return bytes; 846 } 847 848 static inline ssize_t 849 posix_sock_read(struct spdk_posix_sock *sock) 850 { 851 struct iovec iov[2]; 852 int bytes_avail, bytes_recvd; 853 struct spdk_posix_sock_group_impl *group; 854 855 bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 856 857 if (bytes_avail <= 0) { 858 return bytes_avail; 859 } 860 861 bytes_recvd = readv(sock->fd, iov, 2); 862 863 assert(sock->pipe_has_data == false); 
864 865 if (bytes_recvd <= 0) { 866 /* Errors count as draining the socket data */ 867 if (sock->base.group_impl && sock->socket_has_data) { 868 group = __posix_group_impl(sock->base.group_impl); 869 TAILQ_REMOVE(&group->socks_with_data, sock, link); 870 } 871 872 sock->socket_has_data = false; 873 874 return bytes_recvd; 875 } 876 877 spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd); 878 879 #if DEBUG 880 if (sock->base.group_impl) { 881 assert(sock->socket_has_data == true); 882 } 883 #endif 884 885 sock->pipe_has_data = true; 886 sock->socket_has_data = false; 887 888 return bytes_recvd; 889 } 890 891 static ssize_t 892 posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 893 { 894 struct spdk_posix_sock *sock = __posix_sock(_sock); 895 struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl); 896 int rc, i; 897 size_t len; 898 899 if (sock->recv_pipe == NULL) { 900 assert(sock->pipe_has_data == false); 901 if (group && sock->socket_has_data) { 902 sock->socket_has_data = false; 903 TAILQ_REMOVE(&group->socks_with_data, sock, link); 904 } 905 return readv(sock->fd, iov, iovcnt); 906 } 907 908 /* If the socket is not in a group, we must assume it always has 909 * data waiting for us because it is not epolled */ 910 if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) { 911 /* If the user is receiving a sufficiently large amount of data, 912 * receive directly to their buffers. */ 913 len = 0; 914 for (i = 0; i < iovcnt; i++) { 915 len += iov[i].iov_len; 916 } 917 918 if (len >= MIN_SOCK_PIPE_SIZE) { 919 /* TODO: Should this detect if kernel socket is drained? 
			 */
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = posix_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
}

/* Single-buffer receive: wrap the caller's buffer in a one-element iovec
 * and delegate to posix_sock_readv(). */
static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return posix_sock_readv(sock, iov, 1);
}

/* Synchronous vectored write. Queued asynchronous requests are flushed
 * first; if they cannot all be drained the call fails with EAGAIN rather
 * than writing this data out of order. */
static ssize_t
posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	/* In order to process a writev, we need to flush any asynchronous writes
	 * first. */
	rc = _sock_flush(_sock);
	if (rc < 0) {
		return rc;
	}

	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
		/* We weren't able to flush all requests */
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}

/* Queue an asynchronous write request. Once enough iovecs accumulate the
 * queue is flushed immediately (amortizing syscall cost); a failed flush
 * aborts all requests queued on the socket. */
static void
posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	int rc;

	spdk_sock_request_queue(sock, req);

	/* If there are a sufficient number queued, just flush them out immediately.
	 */
	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}
}

/* Set SO_RCVLOWAT (minimum byte count before the socket reports readable).
 * Returns 0 on success, -1 on failure (errno set by setsockopt). */
static int
posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

/* Return true if the socket's local address family is AF_INET6.
 * getsockname() failure is logged and reported as "not IPv6". */
static bool
posix_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

/* Return true if the socket's local address family is AF_INET.
 * getsockname() failure is logged and reported as "not IPv4". */
static bool
posix_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

/* Probe liveness with a non-destructive 1-byte MSG_PEEK recv():
 *   rc == 0                      -> peer performed an orderly shutdown
 *   rc < 0, EAGAIN/EWOULDBLOCK   -> connected, no data pending
 *   rc < 0, other errno          -> treated as disconnected
 *   rc > 0                       -> connected, data available */
static bool
posix_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

/* Look up the preferred poll group for this socket via its placement_id.
 * NOTE(review): group_impl is not initialized here; this assumes
 * spdk_sock_map_lookup() assigns it (NULL on miss) — confirm against the
 * map implementation in sock.c. */
static struct spdk_sock_group_impl *
posix_sock_group_impl_get_optimal(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_sock_group_impl *group_impl;

	if (sock->placement_id != -1) {
		spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl);
		return group_impl;
	}

	return NULL;
}

/* Create a poll group backed by an epoll (Linux) or kqueue (FreeBSD) fd.
 * Under PLACEMENT_CPU the group is registered in the global map keyed by
 * the current core. Returns NULL on fd-creation or allocation failure. */
static struct spdk_sock_group_impl *
posix_sock_group_impl_create(void)
{
	struct spdk_posix_sock_group_impl *group_impl;
	int fd;

#if defined(SPDK_EPOLL)
	fd = epoll_create1(0);
#elif defined(SPDK_KEVENT)
	fd = kqueue();
#endif
	if (fd == -1) {
		return NULL;
	}

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		close(fd);
		return NULL;
	}

	group_impl->fd = fd;
	TAILQ_INIT(&group_impl->socks_with_data);
	group_impl->placement_id = -1;

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
		group_impl->placement_id = spdk_env_get_current_core();
	}

	return &group_impl->base;
}

/* Tag the socket with SO_MARK = placement_id and record the group in the
 * global placement map. All failures are logged but non-fatal; the socket's
 * placement_id is only updated once both steps succeed. Compiled out when
 * SO_MARK is unavailable. */
static void
posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock,
		int placement_id)
{
#if defined(SO_MARK)
	int rc;

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_MARK,
			&placement_id, sizeof(placement_id));
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Error setting SO_MARK\n");
		return;
	}

	rc = spdk_sock_map_insert(&g_map, placement_id, &group->base);
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
		return;
	}

	sock->placement_id = placement_id;
#endif
}

/* PLACEMENT_MARK helper: lazily assign the group a free placement id, and
 * mark the given socket with it. The first time an id is assigned, every
 * socket already in the group is retroactively marked as well. */
static void
posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);

	if (group->placement_id == -1) {
		group->placement_id = spdk_sock_map_find_free(&g_map);

		/* If a free placement id is found, update existing sockets in this group */
		if (group->placement_id != -1) {
			struct spdk_sock *sock, *tmp;

			TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
				posix_sock_mark(group, __posix_sock(sock), group->placement_id);
			}
		}
	}

	if (group->placement_id != -1) {
		/*
		 * group placement id is already determined for this poll group.
		 * Mark socket with group's placement id.
		 */
		posix_sock_mark(group, __posix_sock(_sock), group->placement_id);
	}
}

/* Register the socket with the group's epoll/kqueue instance and update
 * placement bookkeeping. A socket arriving with data already buffered in
 * its recv pipe (e.g. migrated from another group) is put on the ready list
 * so that data is not stranded. Returns 0 on success, or the non-zero
 * epoll_ctl/kevent result on failure. */
static int
posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

#if defined(SPDK_EPOLL)
	struct epoll_event event;

	memset(&event, 0, sizeof(event));
	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
	event.events = EPOLLIN | EPOLLERR;
	event.data.ptr = sock;

	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
#elif defined(SPDK_KEVENT)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);

	/* Zero timeout: register the event without blocking. */
	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
#endif

	if (rc != 0) {
		return rc;
	}

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		sock->pipe_has_data = true;
		sock->socket_has_data = false;
		TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link);
	}

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
		posix_sock_update_mark(_group, _sock);
	} else if (sock->placement_id != -1) {
		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
			/* Do not treat this as an error. The system will continue running. */
		}
	}

	return rc;
}

/* Unregister the socket from the group: drop it from the ready list, release
 * its placement-map reference, detach it from epoll/kqueue, and abort any
 * requests still queued on it. */
static int
posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	if (sock->pipe_has_data || sock->socket_has_data) {
		TAILQ_REMOVE(&group->socks_with_data, sock, link);
		sock->pipe_has_data = false;
		sock->socket_has_data = false;
	}

	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

#if defined(SPDK_EPOLL)
	struct epoll_event event;

	/* Event parameter is ignored but some old kernel versions still require it.
	 */
	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
#elif defined(SPDK_KEVENT)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
	/* kqueue can report a per-event error in-line even when the call succeeds */
	if (rc == 0 && event.flags & EV_ERROR) {
		rc = -1;
		errno = event.data;
	}
#endif

	spdk_sock_abort_requests(_sock);

	return rc;
}

/* Poll the group: flush pending async writes, harvest kernel readiness
 * events into the socks_with_data list, then return up to max_events ready
 * sockets in socks. Returns the number of ready sockets, or -1 on
 * epoll_wait/kevent failure. */
static int
posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_sock *sock, *tmp;
	int num_events, i, rc;
	struct spdk_posix_sock *psock, *ptmp;
#if defined(SPDK_EPOLL)
	struct epoll_event events[MAX_EVENTS_PER_POLL];
#elif defined(SPDK_KEVENT)
	struct kevent events[MAX_EVENTS_PER_POLL];
	struct timespec ts = {0};
#endif

#ifdef SPDK_ZEROCOPY
	/* When all of the following conditions are met
	 * - non-blocking socket
	 * - zero copy is enabled
	 * - interrupts suppressed (i.e. busy polling)
	 * - the NIC tx queue is full at the time sendmsg() is called
	 * - epoll_wait determines there is an EPOLLIN event for the socket
	 * then we can get into a situation where data we've sent is queued
	 * up in the kernel network stack, but interrupts have been suppressed
	 * because other traffic is flowing so the kernel misses the signal
	 * to flush the software tx queue. If there wasn't incoming data
	 * pending on the socket, then epoll_wait would have been sufficient
	 * to kick off the send operation, but since there is a pending event
	 * epoll_wait does not trigger the necessary operation.
	 *
	 * We deal with this by checking for all of the above conditions and
	 * additionally looking for EPOLLIN events that were not consumed from
	 * the last poll loop. We take this to mean that the upper layer is
	 * unable to consume them because it is blocked waiting for resources
	 * to free up, and those resources are most likely freed in response
	 * to a pending asynchronous write completing.
	 *
	 * Additionally, sockets that have the same placement_id actually share
	 * an underlying hardware queue. That means polling one of them is
	 * equivalent to polling all of them. As a quick mechanism to avoid
	 * making extra poll() calls, stash the last placement_id during the loop
	 * and only poll if it's not the same. The overwhelmingly common case
	 * is that all sockets in this list have the same placement_id because
	 * SPDK is intentionally grouping sockets by that value, so even
	 * though this won't stop all extra calls to poll(), it's very fast
	 * and will catch all of them in practice.
	 */
	int last_placement_id = -1;

	TAILQ_FOREACH(psock, &group->socks_with_data, link) {
		if (psock->zcopy && psock->placement_id >= 0 &&
		    psock->placement_id != last_placement_id) {
			struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0};

			poll(&pfd, 1, 0);
			last_placement_id = psock->placement_id;
		}
	}
#endif

	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
	 * a completion callback could remove the sock from the
	 * group. */
	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}

	assert(max_events > 0);

	/* Non-blocking check for new kernel readiness events (timeout 0). */
#if defined(SPDK_EPOLL)
	num_events = epoll_wait(group->fd, events, max_events, 0);
#elif defined(SPDK_KEVENT)
	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
#endif

	if (num_events == -1) {
		return -1;
	} else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
		sock = TAILQ_FIRST(&_group->socks);
		psock = __posix_sock(sock);
		/* poll() is called here to busy poll the queue associated with
		 * first socket in list and potentially reap incoming data.
		 */
		if (sock->opts.priority) {
			struct pollfd pfd = {0, 0, 0};

			pfd.fd = psock->fd;
			pfd.events = POLLIN | POLLERR;
			poll(&pfd, 1, 0);
		}
	}

	/* Merge the kernel events into the socks_with_data list. */
	for (i = 0; i < num_events; i++) {
#if defined(SPDK_EPOLL)
		sock = events[i].data.ptr;
		psock = __posix_sock(sock);

#ifdef SPDK_ZEROCOPY
		if (events[i].events & EPOLLERR) {
			rc = _sock_check_zcopy(sock);
			/* If the socket was closed or removed from
			 * the group in response to a send ack, don't
			 * add it to the array here. */
			if (rc || sock->cb_fn == NULL) {
				continue;
			}
		}
#endif
		if ((events[i].events & EPOLLIN) == 0) {
			continue;
		}

#elif defined(SPDK_KEVENT)
		sock = events[i].udata;
		psock = __posix_sock(sock);
#endif

		/* If the socket is not already in the list, add it now */
		if (!psock->socket_has_data && !psock->pipe_has_data) {
			TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link);
		}

		psock->socket_has_data = true;
	}

	num_events = 0;

	/* Emit up to max_events ready sockets, pruning entries whose callback
	 * has been cleared (socket being torn down). */
	TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) {
		if (num_events == max_events) {
			break;
		}

		/* If the socket's cb_fn is NULL, just remove it from the
		 * list and do not add it to socks array */
		if (spdk_unlikely(psock->base.cb_fn == NULL)) {
			psock->socket_has_data = false;
			psock->pipe_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, psock, link);
			continue;
		}

		socks[num_events++] = &psock->base;
	}

	/* Cycle the has_data list so that each time we poll things aren't
	 * in the same order. Say we have 6 sockets in the list, named as follows:
	 * A B C D E F
	 * And all 6 sockets had epoll events, but max_events is only 3. That means
	 * psock currently points at D. We want to rearrange the list to the following:
	 * D E F A B C
	 *
	 * The variables below are named according to this example to make it easier to
	 * follow the swaps.
	 *
	 * NOTE: this reaches into the TAILQ's tqe_next/tqe_prev fields directly;
	 * the loop above guarantees psock (D) is not the list head when non-NULL,
	 * so pc is valid.
	 */
	if (psock != NULL) {
		struct spdk_posix_sock *pa, *pc, *pd, *pf;

		/* Capture pointers to the elements we need */
		pd = psock;
		pc = TAILQ_PREV(pd, spdk_has_data_list, link);
		pa = TAILQ_FIRST(&group->socks_with_data);
		pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list);

		/* Break the link between C and D */
		pc->link.tqe_next = NULL;

		/* Connect F to A */
		pf->link.tqe_next = pa;
		pa->link.tqe_prev = &pf->link.tqe_next;

		/* Fix up the list first/last pointers */
		group->socks_with_data.tqh_first = pd;
		group->socks_with_data.tqh_last = &pc->link.tqe_next;

		/* D is in front of the list, make tqe prev pointer point to the head of list */
		pd->link.tqe_prev = &group->socks_with_data.tqh_first;
	}

	return num_events;
}

/* Tear down the poll group: release the PLACEMENT_CPU map entry (if any),
 * close the epoll/kqueue fd, and free the group. Returns close()'s result. */
static int
posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	int rc;

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
	}

	rc = close(group->fd);
	free(group);
	return rc;
}

/* Copy the current impl options into opts, writing only the fields that fit
 * within *len (allows callers built against older/newer struct layouts).
 * On return *len holds the number of bytes actually meaningful. */
static int
posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
{
	if (!opts || !len) {
		errno = EINVAL;
		return -1;
	}
	memset(opts, 0, *len);

/* A field is copied only if it lies entirely within the caller's length. */
#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len

#define GET_FIELD(field) \
	if (FIELD_OK(field)) { \
		opts->field = g_spdk_posix_sock_impl_opts.field; \
	}

	GET_FIELD(recv_buf_size);
	GET_FIELD(send_buf_size);
	GET_FIELD(enable_recv_pipe);
	GET_FIELD(enable_zerocopy_send);
	GET_FIELD(enable_quickack);
	GET_FIELD(enable_placement_id);
	GET_FIELD(enable_zerocopy_send_server);
	GET_FIELD(enable_zerocopy_send_client);

#undef GET_FIELD
#undef FIELD_OK

	*len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts));
	return 0;
}

/* Update the global impl options from opts, reading only the fields that fit
 * within len (mirror of posix_sock_impl_get_opts). */
static int
posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
{
	if (!opts) {
		errno = EINVAL;
		return -1;
	}

/* A field is read only if it lies entirely within the caller's length. */
#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len

#define SET_FIELD(field) \
	if (FIELD_OK(field)) { \
		g_spdk_posix_sock_impl_opts.field = opts->field; \
	}

	SET_FIELD(recv_buf_size);
	SET_FIELD(send_buf_size);
	SET_FIELD(enable_recv_pipe);
	SET_FIELD(enable_zerocopy_send);
	SET_FIELD(enable_quickack);
	SET_FIELD(enable_placement_id);
	SET_FIELD(enable_zerocopy_send_server);
	SET_FIELD(enable_zerocopy_send_client);

#undef SET_FIELD
#undef FIELD_OK

	return 0;
}


/* Dispatch table wiring this module's functions into the generic
 * spdk_sock layer. */
static struct spdk_net_impl g_posix_net_impl = {
	.name = "posix",
	.getaddr = posix_sock_getaddr,
	.connect = posix_sock_connect,
	.listen = posix_sock_listen,
	.accept = posix_sock_accept,
	.close = posix_sock_close,
	.recv = posix_sock_recv,
	.readv = posix_sock_readv,
	.writev = posix_sock_writev,
	.writev_async = posix_sock_writev_async,
	.flush = posix_sock_flush,
	.set_recvlowat = posix_sock_set_recvlowat,
	.set_recvbuf = posix_sock_set_recvbuf,
	.set_sendbuf = posix_sock_set_sendbuf,
	.is_ipv6 = posix_sock_is_ipv6,
	.is_ipv4 = posix_sock_is_ipv4,
	.is_connected = posix_sock_is_connected,
	.group_impl_get_optimal = posix_sock_group_impl_get_optimal,
	.group_impl_create = posix_sock_group_impl_create,
	.group_impl_add_sock = posix_sock_group_impl_add_sock,
	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
	.group_impl_poll = posix_sock_group_impl_poll,
	.group_impl_close = posix_sock_group_impl_close,
	.get_opts = posix_sock_impl_get_opts,
	.set_opts = posix_sock_impl_set_opts,
};

SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);