/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/config.h"

#include <sys/epoll.h>
#include <liburing.h>

#include "spdk/barrier.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/string.h"
#include "spdk/util.h"

#include "spdk_internal/sock.h"
#include "spdk_internal/assert.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32
#define SO_RCVBUF_SIZE (2 * 1024 * 1024)
#define SO_SNDBUF_SIZE (2 * 1024 * 1024)
#define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
#define IOV_BATCH_SIZE 64

enum spdk_sock_task_type {
	SPDK_SOCK_TASK_POLLIN = 0,
	SPDK_SOCK_TASK_WRITE,
};

enum spdk_uring_sock_task_status {
	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
	SPDK_URING_SOCK_TASK_IN_PROCESS,
};

struct spdk_uring_task {
	enum spdk_uring_sock_task_status status;
	enum spdk_sock_task_type type;
	struct spdk_uring_sock *sock;
	struct msghdr msg;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iov_cnt;
	struct spdk_sock_request *last_req;
	STAILQ_ENTRY(spdk_uring_task) link;
};

struct spdk_uring_sock {
	struct spdk_sock base;
	int fd;
	struct spdk_uring_sock_group_impl *group;
	struct spdk_uring_task write_task;
	struct spdk_uring_task pollin_task;
	int outstanding_io;
	struct spdk_pipe *recv_pipe;
	void *recv_buf;
	int recv_buf_sz;
	bool pending_recv;
	TAILQ_ENTRY(spdk_uring_sock) link;
};

struct spdk_uring_sock_group_impl {
	struct spdk_sock_group_impl base;
	struct io_uring uring;
	uint32_t io_inflight;
	uint32_t io_queued;
	uint32_t io_avail;
	TAILQ_HEAD(, spdk_uring_sock) pending_recv;
};

#define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))
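/*
 * Render the address in 'sa' as a numeric host string into 'host'.
 * Returns 0 on success, -1 on bad arguments or an unsupported address
 * family.
 */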
static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const char *result = NULL;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	switch (sa->sa_family) {
	case AF_INET:
		result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
				   host, hlen);
		break;
	case AF_INET6:
		result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
				   host, hlen);
		break;
	default:
		break;
	}

	if (result != NULL) {
		return 0;
	} else {
		return -1;
	}
}

#define __uring_sock(sock) ((struct spdk_uring_sock *)sock)
#define __uring_group_impl(group) ((struct spdk_uring_sock_group_impl *)group)

static int
spdk_uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
			char *caddr, int clen, uint16_t *cport)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("get_addr_str() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("get_addr_str() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum spdk_uring_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};

static int
spdk_uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}
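/*
 * The recv pipe doubles as a user-space read-ahead buffer: its size tracks
 * the requested receive buffer size, while the kernel SO_RCVBUF value is
 * floored at SO_RCVBUF_SIZE independently of the pipe.
 */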
static int
spdk_uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

#ifndef __aarch64__
	/* On ARM systems, this buffering does not help. Skip it. */
	/* The size of the pipe is purely derived from benchmarks. It seems to work well. */
	rc = spdk_uring_sock_alloc_pipe(sock, sz);
	if (rc) {
		SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
		return rc;
	}
#endif

	if (sz < SO_RCVBUF_SIZE) {
		sz = SO_RCVBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static int
spdk_uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (sz < SO_SNDBUF_SIZE) {
		sz = SO_SNDBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static struct spdk_uring_sock *
_spdk_uring_sock_alloc(int fd)
{
	struct spdk_uring_sock *sock;

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;
	return sock;
}

static struct spdk_sock *
spdk_uring_sock_create(const char *ip, int port, enum spdk_uring_sock_create_type type)
{
	struct spdk_uring_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc;

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		/* getaddrinfo() reports errors via its return value, not errno */
		SPDK_ERRLOG("getaddrinfo() failed (rc=%d)\n", rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		val = SO_RCVBUF_SIZE;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		val = SO_SNDBUF_SIZE;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		/* Reset val; the buffer-size options above reused it */
		val = 1;
		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}

		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	sock = _spdk_uring_sock_alloc(fd);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}
static struct spdk_sock *
spdk_uring_sock_listen(const char *ip, int port)
{
	return spdk_uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN);
}

static struct spdk_sock *
spdk_uring_sock_connect(const char *ip, int port)
{
	return spdk_uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT);
}

static struct spdk_sock *
spdk_uring_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc, fd;
	struct spdk_uring_sock *new_sock;
	int flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

	new_sock = _spdk_uring_sock_alloc(fd);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}

static int
spdk_uring_sock_close(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	/* defer the socket close if there is outstanding I/O */
	if (sock->outstanding_io) {
		return 0;
	}

	assert(TAILQ_EMPTY(&_sock->pending_reqs));
	assert(sock->group == NULL);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	rc = close(sock->fd);
	if (rc == 0) {
		free(sock);
	}

	return rc;
}
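/*
 * Receive path: small reads are staged through a per-socket spdk_pipe.
 * While the pipe holds data, the socket sits on the group's pending_recv
 * list so group polls keep reporting it without another POLLIN
 * completion, emulating level-triggered readiness on top of io_uring's
 * one-shot poll.
 */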
static ssize_t
spdk_uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_uring_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __uring_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}

static inline ssize_t
_spdk_uring_sock_read(struct spdk_uring_sock *sock)
{
	struct iovec iov[2];
	int bytes;
	struct spdk_uring_sock_group_impl *group;

	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes > 0) {
		bytes = readv(sock->fd, iov, 2);
		if (bytes > 0) {
			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
			if (sock->base.group_impl) {
				group = __uring_group_impl(sock->base.group_impl);
				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				sock->pending_recv = true;
			}
		}
	}

	return bytes;
}

static ssize_t
spdk_uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		return readv(sock->fd, iov, iovcnt);
	}

	len = 0;
	for (i = 0; i < iovcnt; i++) {
		len += iov[i].iov_len;
	}

	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		if (len >= 1024) {
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = _spdk_uring_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return spdk_uring_sock_recv_from_pipe(sock, iov, iovcnt);
}
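/*
 * recv() is a single-iovec wrapper around readv(). The synchronous
 * writev() below refuses to run (EAGAIN) while an async uring write is
 * still in flight for this socket, so bytes cannot be reordered on the
 * wire.
 */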
static ssize_t
spdk_uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return spdk_uring_sock_readv(sock, iov, 1);
}

static ssize_t
spdk_uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}

static int
spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
		    struct spdk_sock_request **last_req)
{
	int iovcnt, i;
	struct spdk_sock_request *req;
	unsigned int offset;

	/* Gather an iov */
	iovcnt = index;
	if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
		goto end;
	}

	if (last_req != NULL && *last_req != NULL) {
		req = TAILQ_NEXT(*last_req, internal.link);
	} else {
		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Consume any offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
			iovcnt++;

			offset = 0;

			if (iovcnt >= IOV_BATCH_SIZE) {
				break;
			}
		}
		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		if (last_req != NULL) {
			*last_req = req;
		}
		req = TAILQ_NEXT(req, internal.link);
	}

end:
	return iovcnt;
}

static int
spdk_sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
{
	struct spdk_sock_request *req;
	int i, retval;
	unsigned int offset;
	size_t len;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&_sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(_sock, req);

		retval = spdk_sock_request_put(_sock, req, 0);
		if (retval) {
			return retval;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}
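/*
 * Async write path: spdk_sock_prep_reqs() flattens queued requests into
 * up to IOV_BATCH_SIZE iovecs, _sock_flush() submits them as a single
 * sendmsg SQE, and the completion is consumed in the group reap below.
 * The IN_PROCESS status guarantees at most one write SQE per socket at
 * a time.
 */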
static void
_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->write_task;
	uint32_t iovcnt;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req);
	if (!iovcnt) {
		return;
	}

	task->iov_cnt = iovcnt;
	assert(sock->group != NULL);
	task->msg.msg_iov = task->iovs;
	task->msg.msg_iovlen = task->iov_cnt;

	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_pollin(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->pollin_task;
	struct io_uring_sqe *sqe;

	/* Do not prepare a pollin event if one is already outstanding or
	 * data is still sitting in the receive pipe. */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static int
spdk_sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
			   struct spdk_sock **socks)
{
	int i, count, ret;
	struct io_uring_cqe *cqe;
	struct spdk_uring_sock *sock, *tmp;
	struct spdk_uring_task *task;
	int status;

	for (i = 0; i < max; i++) {
		ret = io_uring_peek_cqe(&group->uring, &cqe);
		if (ret != 0) {
			break;
		}

		if (cqe == NULL) {
			break;
		}

		task = (struct spdk_uring_task *)cqe->user_data;
		assert(task != NULL);
		sock = task->sock;
		assert(sock != NULL);
		assert(sock->group != NULL);
		assert(sock->group == group);
		sock->group->io_inflight--;
		sock->group->io_avail++;
		status = cqe->res;
		io_uring_cqe_seen(&group->uring, cqe);

		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;

		if (spdk_unlikely(status <= 0)) {
			if (status == -EAGAIN || status == -EWOULDBLOCK) {
				continue;
			}
		}

		switch (task->type) {
		case SPDK_SOCK_TASK_POLLIN:
			if ((status & POLLIN) == POLLIN) {
				if ((socks != NULL) && (sock->base.cb_fn != NULL)) {
					assert(sock->pending_recv == false);
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}
			} else {
				SPDK_UNREACHABLE();
			}
			break;
		case SPDK_SOCK_TASK_WRITE:
			assert(TAILQ_EMPTY(&sock->base.pending_reqs));
			task->last_req = NULL;
			task->iov_cnt = 0;
			spdk_sock_complete_reqs(&sock->base, status);

			/* The socket may have been removed from the group while
			 * this write was still outstanding. */
			if (spdk_unlikely(task->sock->outstanding_io > 0 &&
					  TAILQ_EMPTY(&sock->base.pending_reqs))) {
				if (--sock->outstanding_io == 0) {
					sock->group = NULL;
					/* Just for the sock close case */
					if (sock->base.flags.closed) {
						spdk_uring_sock_close(&sock->base);
					}
				}
			}

			break;
		default:
			SPDK_UNREACHABLE();
		}
	}

	count = 0;
	/* socks may be NULL when draining I/O at group close */
	if (socks != NULL) {
		TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
			if (count == max_read_events) {
				break;
			}

			socks[count++] = &sock->base;
		}
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. */
	for (i = 0; i < count; i++) {
		sock = __uring_sock(socks[i]);

		TAILQ_REMOVE(&group->pending_recv, sock, link);

		if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
			sock->pending_recv = false;
		} else {
			TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
		}
	}

	return count;
}
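/*
 * Synchronous flush for sockets that are not in a group: gather the
 * queued iovecs and push them with a single sendmsg(), treating EAGAIN
 * as "nothing flushed" rather than an error.
 */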
static int
_sock_flush_client(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {};
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL);
	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, 0);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	spdk_sock_complete_reqs(_sock, rc);

	return 0;
}

static void
spdk_uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	spdk_sock_request_queue(_sock, req);

	if (!sock->group) {
		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
			rc = _sock_flush_client(_sock);
			if (rc) {
				spdk_sock_abort_requests(_sock);
			}
		}
	}
}

static int
spdk_uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

static bool
spdk_uring_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

static bool
spdk_uring_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

static bool
spdk_uring_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}
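/*
 * The placement id is the NAPI id of the RX queue the socket's traffic
 * last arrived on (SO_INCOMING_NAPI_ID, when the kernel supports it),
 * which lets callers co-locate sockets that share an interrupt vector
 * on the same poll group.
 */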
static int
spdk_uring_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
{
	int rc = -1;

#if defined(SO_INCOMING_NAPI_ID)
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	socklen_t salen = sizeof(int);

	rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
	}

#endif
	return rc;
}

static struct spdk_sock_group_impl *
spdk_uring_sock_group_impl_create(void)
{
	struct spdk_uring_sock_group_impl *group_impl;

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		return NULL;
	}

	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;

	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
		SPDK_ERRLOG("uring I/O context setup failure\n");
		free(group_impl);
		return NULL;
	}

	TAILQ_INIT(&group_impl->pending_recv);

	return &group_impl->base;
}

static int
spdk_uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
				    struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	sock->group = group;
	sock->write_task.sock = sock;
	sock->write_task.type = SPDK_SOCK_TASK_WRITE;

	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	return 0;
}

static int
spdk_uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				       struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		sock->outstanding_io++;
	}

	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		sock->outstanding_io++;
	}

	/* The socket is on the pending_recv list iff pending_recv is set */
	if (sock->pending_recv) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	if (!sock->outstanding_io) {
		sock->group = NULL;
	}

	return 0;
}

static int
spdk_uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
				struct spdk_sock **socks)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int count, ret;
	int to_complete, to_submit;
	struct spdk_sock *_sock, *tmp;

	TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
		_sock_flush(_sock);
		_sock_prep_pollin(_sock);
	}

	to_submit = group->io_queued;

	/* Network I/O is never opened with O_DIRECT, so a plain
	 * io_uring_submit() is sufficient; it calls io_uring_enter()
	 * internally as needed. */
	if (to_submit > 0) {
		ret = io_uring_submit(&group->uring);
		if (ret < 0) {
			return 1;
		}
		group->io_queued = 0;
		group->io_inflight += to_submit;
		group->io_avail -= to_submit;
	}

	count = 0;
	to_complete = group->io_inflight;
	if (to_complete > 0) {
		count = spdk_sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;
}
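/*
 * Group teardown: keep polling until every inflight SQE has completed,
 * so no task still references the ring when it is destroyed.
 */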
static int
spdk_uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	/* try to reap all the active I/O */
	while (group->io_inflight) {
		spdk_uring_sock_group_impl_poll(_group, 32, NULL);
	}
	assert(group->io_inflight == 0);
	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);

	/* io_uring_queue_exit() unmaps the rings and closes the ring fd;
	 * closing it here as well would be a double close. */
	io_uring_queue_exit(&group->uring);

	free(group);
	return 0;
}

static int
spdk_uring_sock_flush(struct spdk_sock *_sock)
{
	return _sock_flush_client(_sock);
}

static struct spdk_net_impl g_uring_net_impl = {
	.name = "uring",
	.getaddr = spdk_uring_sock_getaddr,
	.connect = spdk_uring_sock_connect,
	.listen = spdk_uring_sock_listen,
	.accept = spdk_uring_sock_accept,
	.close = spdk_uring_sock_close,
	.recv = spdk_uring_sock_recv,
	.readv = spdk_uring_sock_readv,
	.writev = spdk_uring_sock_writev,
	.writev_async = spdk_uring_sock_writev_async,
	.flush = spdk_uring_sock_flush,
	.set_recvlowat = spdk_uring_sock_set_recvlowat,
	.set_recvbuf = spdk_uring_sock_set_recvbuf,
	.set_sendbuf = spdk_uring_sock_set_sendbuf,
	.is_ipv6 = spdk_uring_sock_is_ipv6,
	.is_ipv4 = spdk_uring_sock_is_ipv4,
	.is_connected = spdk_uring_sock_is_connected,
	.get_placement_id = spdk_uring_sock_get_placement_id,
	.group_impl_create = spdk_uring_sock_group_impl_create,
	.group_impl_add_sock = spdk_uring_sock_group_impl_add_sock,
	.group_impl_remove_sock = spdk_uring_sock_group_impl_remove_sock,
	.group_impl_poll = spdk_uring_sock_group_impl_poll,
	.group_impl_close = spdk_uring_sock_group_impl_close,
};

SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);
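/*
 * Usage sketch (hypothetical application code, not part of this module).
 * The exact spdk_sock_* signatures vary between SPDK releases, so treat
 * this as an illustration of the flow rather than a drop-in snippet:
 *
 *	struct spdk_sock_group *group = spdk_sock_group_create(NULL);
 *	struct spdk_sock *sock = spdk_sock_connect("127.0.0.1", 4420);
 *
 *	spdk_sock_group_add_sock(group, sock, read_cb, cb_arg);
 *	for (;;) {
 *		// Drives _sock_flush()/_sock_prep_pollin() and reaps CQEs;
 *		// read_cb fires for each socket with pending receive data.
 *		spdk_sock_group_poll(group);
 *	}
 */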