/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/config.h"

#include <sys/epoll.h>
#include <liburing.h>

#include "spdk/barrier.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/string.h"
#include "spdk/util.h"

#include "spdk_internal/sock.h"
#include "spdk_internal/assert.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32
#define SO_RCVBUF_SIZE (2 * 1024 * 1024)
#define SO_SNDBUF_SIZE (2 * 1024 * 1024)
#define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
#define IOV_BATCH_SIZE 64

enum spdk_sock_task_type {
	SPDK_SOCK_TASK_POLLIN = 0,
	SPDK_SOCK_TASK_WRITE,
	SPDK_SOCK_TASK_CANCEL,
};

enum spdk_uring_sock_task_status {
	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
	SPDK_URING_SOCK_TASK_IN_PROCESS,
};

struct spdk_uring_task {
	enum spdk_uring_sock_task_status	status;
	enum spdk_sock_task_type		type;
	struct spdk_uring_sock			*sock;
	struct msghdr				msg;
	struct iovec				iovs[IOV_BATCH_SIZE];
	int					iov_cnt;
	struct spdk_sock_request		*last_req;
	STAILQ_ENTRY(spdk_uring_task)		link;
};

struct spdk_uring_sock {
	struct spdk_sock			base;
	int					fd;
	struct spdk_uring_sock_group_impl	*group;
	struct spdk_uring_task			write_task;
	struct spdk_uring_task			pollin_task;
	struct spdk_uring_task			cancel_task;
	int					outstanding_io;
	struct spdk_pipe			*recv_pipe;
	void					*recv_buf;
	int					recv_buf_sz;
	bool					pending_recv;
	TAILQ_ENTRY(spdk_uring_sock)		link;
};

struct spdk_uring_sock_group_impl {
	struct spdk_sock_group_impl	base;
	struct io_uring			uring;
	uint32_t			io_inflight;
	uint32_t			io_queued;
	uint32_t			io_avail;
	TAILQ_HEAD(, spdk_uring_sock)	pending_recv;
};

#define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))

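/* Format the IPv4/IPv6 address in 'sa' as a numeric string into 'host'.
 * Returns 0 on success, or -1 on bad arguments or an unsupported address
 * family. */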
static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const char *result = NULL;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	switch (sa->sa_family) {
	case AF_INET:
		result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
				   host, hlen);
		break;
	case AF_INET6:
		result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
				   host, hlen);
		break;
	default:
		break;
	}

	if (result != NULL) {
		return 0;
	} else {
		return -1;
	}
}

#define __uring_sock(sock) ((struct spdk_uring_sock *)sock)
#define __uring_group_impl(group) ((struct spdk_uring_sock_group_impl *)group)

static int
uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("get_addr_str() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("get_addr_str() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum uring_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};

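/* (Re)size the socket's receive pipe to 'sz' bytes. Any data buffered in the
 * old pipe is copied into the new one, so shrinking is only legal when the
 * buffered data still fits; otherwise -EINVAL is returned. A size of 0 tears
 * the pipe down entirely. The backing buffer is one byte larger than
 * requested so the ring can hold a full 'sz' bytes. */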
static int
uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

#ifndef __aarch64__
	/* On ARM systems, this buffering does not help. Skip it. */
	/* The size of the pipe is purely derived from benchmarks. It seems to work well. */
	rc = uring_sock_alloc_pipe(sock, sz);
	if (rc) {
		SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
		return rc;
	}
#endif

	if (sz < SO_RCVBUF_SIZE) {
		sz = SO_RCVBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static int
uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (sz < SO_SNDBUF_SIZE) {
		sz = SO_SNDBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static struct spdk_uring_sock *
uring_sock_alloc(int fd)
{
	struct spdk_uring_sock *sock;

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;
	return sock;
}

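/* Common creation path for both listening and connecting sockets. Resolves
 * 'ip' and 'port' with getaddrinfo(), walks the returned address list, and
 * returns the first socket that can be fully configured (buffer sizes,
 * SO_REUSEADDR, TCP_NODELAY, optional SO_PRIORITY, non-blocking mode) and
 * then bound+listened or connected, depending on 'type'. */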
static struct spdk_sock *
uring_sock_create(const char *ip, int port,
		  enum uring_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_uring_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc;

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		/* getaddrinfo() reports errors through its return value, not errno */
		SPDK_ERRLOG("getaddrinfo() failed: %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		val = SO_RCVBUF_SIZE;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		val = SO_SNDBUF_SIZE;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		/* Reset val to 1 for the boolean options below; it still holds
		 * the send buffer size at this point. */
		val = 1;
		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}
#endif
		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	sock = uring_sock_alloc(fd);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}

static struct spdk_sock *
uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}

static struct spdk_sock *
uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}

static struct spdk_sock *
uring_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc, fd;
	struct spdk_uring_sock *new_sock;
	int flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	new_sock = uring_sock_alloc(fd);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}

static int
uring_sock_close(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	/* defer the socket close if there is outstanding I/O */
	if (sock->outstanding_io) {
		return 0;
	}

	assert(TAILQ_EMPTY(&_sock->pending_reqs));
	assert(sock->group == NULL);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	rc = close(sock->fd);
	if (rc == 0) {
		free(sock);
	}

	return rc;
}

static ssize_t
uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_uring_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __uring_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}

static inline ssize_t
uring_sock_read(struct spdk_uring_sock *sock)
{
	struct iovec iov[2];
	int bytes;
	struct spdk_uring_sock_group_impl *group;

	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes > 0) {
		bytes = readv(sock->fd, iov, 2);
		if (bytes > 0) {
			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
			if (sock->base.group_impl) {
				group = __uring_group_impl(sock->base.group_impl);
				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				sock->pending_recv = true;
			}
		}
	}

	return bytes;
}

static ssize_t
uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		return readv(sock->fd, iov, iovcnt);
	}

	len = 0;
	for (i = 0; i < iovcnt; i++) {
		len += iov[i].iov_len;
	}

	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		if (len >= 1024) {
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = uring_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return uring_sock_readv(sock, iov, 1);
}

static ssize_t
uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		/* An async write is already in flight; don't interleave a sync write */
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}

static int
sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
	       struct spdk_sock_request **last_req)
{
	int iovcnt, i;
	struct spdk_sock_request *req;
	unsigned int offset;

	/* Gather an iov */
	iovcnt = index;
	if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
		goto end;
	}

	if (last_req != NULL && *last_req != NULL) {
		req = TAILQ_NEXT(*last_req, internal.link);
	} else {
		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Consume any offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
			iovcnt++;

			offset = 0;

			if (iovcnt >= IOV_BATCH_SIZE) {
				break;
			}
		}
		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		if (last_req != NULL) {
			*last_req = req;
		}
		req = TAILQ_NEXT(req, internal.link);
	}

end:
	return iovcnt;
}

static int
sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
{
	struct spdk_sock_request *req;
	int i, retval;
	unsigned int offset;
	size_t len;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&_sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(_sock, req);

		retval = spdk_sock_request_put(_sock, req, 0);
		if (retval) {
			return retval;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}

static void
_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->write_task;
	uint32_t iovcnt;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	iovcnt = sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req);
	if (!iovcnt) {
		return;
	}

	task->iov_cnt = iovcnt;
	assert(sock->group != NULL);
	task->msg.msg_iov = task->iovs;
	task->msg.msg_iovlen = task->iov_cnt;

	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_sendmsg(sqe, sock->fd, &task->msg, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_pollin(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->pollin_task;
	struct io_uring_sqe *sqe;

	/* Do not prepare a pollin event if one is already in flight
	 * or if data is already pending in the receive pipe. */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->cancel_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_cancel(sqe, user_data, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static int
sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
		      struct spdk_sock **socks)
{
	int count, i, completed_cqe_num;
	struct io_uring_cqe *cqes[SPDK_SOCK_GROUP_QUEUE_DEPTH];
	struct spdk_uring_sock *sock, *tmp;
	struct spdk_uring_task *task;
	int status;

	max = spdk_min(max, SPDK_SOCK_GROUP_QUEUE_DEPTH);
	completed_cqe_num = io_uring_peek_batch_cqe(&group->uring, cqes, max);
	for (i = 0; i < completed_cqe_num; i++) {
		assert(cqes[i] != NULL);

		task = (struct spdk_uring_task *)cqes[i]->user_data;
		assert(task != NULL);
		sock = task->sock;
		assert(sock != NULL);
		assert(sock->group != NULL);
		assert(sock->group == group);
		sock->group->io_inflight--;
		sock->group->io_avail++;
		status = cqes[i]->res;
		io_uring_cqe_seen(&group->uring, cqes[i]);

		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;

		if (spdk_unlikely(status <= 0)) {
			if (status == -EAGAIN || status == -EWOULDBLOCK) {
				continue;
			}
		}

		switch (task->type) {
		case SPDK_SOCK_TASK_POLLIN:
			if ((status & POLLIN) == POLLIN) {
				if ((socks != NULL) && (sock->base.cb_fn != NULL)) {
					assert(sock->pending_recv == false);
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}
			} else {
				SPDK_UNREACHABLE();
			}
			break;
		case SPDK_SOCK_TASK_WRITE:
			assert(TAILQ_EMPTY(&sock->base.pending_reqs));
			task->last_req = NULL;
			task->iov_cnt = 0;
			sock_complete_reqs(&sock->base, status);

			/* For a socket that was removed from the group but still
			 * has outstanding I/O */
			if (spdk_unlikely(task->sock->outstanding_io > 0 &&
					  TAILQ_EMPTY(&sock->base.pending_reqs))) {
				if (--sock->outstanding_io == 0) {
					/* Just for the sock close case */
					if (sock->base.flags.closed) {
						uring_sock_close(&sock->base);
					}
				}
			}

			break;
		case SPDK_SOCK_TASK_CANCEL:
			if ((status == 0) && (sock->outstanding_io > 0)) {
				sock->outstanding_io--;
			}
			break;
		default:
			SPDK_UNREACHABLE();
		}
	}

	count = 0;
	if (socks == NULL) {
		/* Internal polls (e.g. while removing a sock or closing the group)
		 * pass no output array; there is nothing to harvest. */
		return 0;
	}

	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
		if (count == max_read_events) {
			break;
		}

		socks[count++] = &sock->base;
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. */
	for (i = 0; i < count; i++) {
		sock = __uring_sock(socks[i]);

		TAILQ_REMOVE(&group->pending_recv, sock, link);

		if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
			sock->pending_recv = false;
		} else {
			TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
		}
	}

	return count;
}

static int
_sock_flush_client(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {};
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = sock_prep_reqs(_sock, iovs, 0, NULL);
	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, 0);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	sock_complete_reqs(_sock, rc);

	return 0;
}

static void
uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	spdk_sock_request_queue(_sock, req);

	if (!sock->group) {
		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
			rc = _sock_flush_client(_sock);
			if (rc) {
				spdk_sock_abort_requests(_sock);
			}
		}
	}
}

static int
uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

static bool
uring_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

static bool
uring_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

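/* Probe connection liveness with a 1-byte MSG_PEEK recv: 0 means the peer
 * performed an orderly shutdown, EAGAIN/EWOULDBLOCK means the connection is
 * alive but idle, and any other error is treated as disconnected. */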
static bool
uring_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

static int
uring_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
{
	int rc = -1;

#if defined(SO_INCOMING_NAPI_ID)
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	socklen_t salen = sizeof(int);

	rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
	}

#endif
	return rc;
}

static struct spdk_sock_group_impl *
uring_sock_group_impl_create(void)
{
	struct spdk_uring_sock_group_impl *group_impl;

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		return NULL;
	}

	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;

	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
		SPDK_ERRLOG("uring I/O context setup failure\n");
		free(group_impl);
		return NULL;
	}

	TAILQ_INIT(&group_impl->pending_recv);

	return &group_impl->base;
}

static int
uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
			       struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	sock->group = group;
	sock->write_task.sock = sock;
	sock->write_task.type = SPDK_SOCK_TASK_WRITE;

	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	sock->cancel_task.sock = sock;
	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		assert(sock->pending_recv == false);
		sock->pending_recv = true;
		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
	}

	return 0;
}

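/* Poll the group: flush queued writes and arm poll-in SQEs for every member
 * socket, submit the whole batch to the kernel with a single
 * io_uring_submit() call, then reap completions and report up to
 * 'max_events' readable sockets. */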
static int
uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int count, ret;
	int to_complete, to_submit;
	struct spdk_sock *_sock, *tmp;

	TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
		_sock_flush(_sock);
		_sock_prep_pollin(_sock);
	}

	to_submit = group->io_queued;

	/* Network sockets cannot be opened with O_DIRECT, so no extra io_uring_enter
	 * call is needed beyond the submit below. */
	if (to_submit > 0) {
		/* If there are I/O to submit, use io_uring_submit here.
		 * It will automatically call io_uring_enter appropriately. */
		ret = io_uring_submit(&group->uring);
		if (ret < 0) {
			return 1;
		}
		group->io_queued = 0;
		group->io_inflight += to_submit;
		group->io_avail -= to_submit;
	}

	count = 0;
	to_complete = group->io_inflight;
	if (to_complete > 0) {
		count = sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;
}

static int
uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				  struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		/* For writes, we do not need to cancel; the task completes on its own */
		sock->outstanding_io++;
	}

	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		sock->outstanding_io++;
		_sock_prep_cancel_task(_sock, &sock->pollin_task);
	}

	/* Since spdk_sock_group_remove_sock() is not an asynchronous interface,
	 * we can busy-poll here until the in-flight pollin task is cancelled. */
	while (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		uring_sock_group_impl_poll(_group, 32, NULL);
	}

	if (sock->recv_pipe != NULL) {
		if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) {
			TAILQ_REMOVE(&group->pending_recv, sock, link);
			sock->pending_recv = false;
		}
		assert(sock->pending_recv == false);
	}

	sock->group = NULL;
	return 0;
}

static int
uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	/* try to reap all the active I/O */
	while (group->io_inflight) {
		uring_sock_group_impl_poll(_group, 32, NULL);
	}
	assert(group->io_inflight == 0);
	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);

	close(group->uring.ring_fd);
	io_uring_queue_exit(&group->uring);

	free(group);
	return 0;
}

static int
uring_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (!sock->group) {
		return _sock_flush_client(_sock);
	}

	return 0;
}

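/* Function table for the io_uring-based sock implementation, registered
 * below one step above DEFAULT_SOCK_PRIORITY, which is intended to make it
 * preferred over the default posix implementation when liburing is
 * available. */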
static struct spdk_net_impl g_uring_net_impl = {
	.name = "uring",
	.getaddr = uring_sock_getaddr,
	.connect = uring_sock_connect,
	.listen = uring_sock_listen,
	.accept = uring_sock_accept,
	.close = uring_sock_close,
	.recv = uring_sock_recv,
	.readv = uring_sock_readv,
	.writev = uring_sock_writev,
	.writev_async = uring_sock_writev_async,
	.flush = uring_sock_flush,
	.set_recvlowat = uring_sock_set_recvlowat,
	.set_recvbuf = uring_sock_set_recvbuf,
	.set_sendbuf = uring_sock_set_sendbuf,
	.is_ipv6 = uring_sock_is_ipv6,
	.is_ipv4 = uring_sock_is_ipv4,
	.is_connected = uring_sock_is_connected,
	.get_placement_id = uring_sock_get_placement_id,
	.group_impl_create = uring_sock_group_impl_create,
	.group_impl_add_sock = uring_sock_group_impl_add_sock,
	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
	.group_impl_poll = uring_sock_group_impl_poll,
	.group_impl_close = uring_sock_group_impl_close,
};

SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);