/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/config.h"

#include <sys/epoll.h>
#include <liburing.h>

#include "spdk/barrier.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/string.h"
#include "spdk/util.h"

#include "spdk_internal/sock.h"
#include "spdk_internal/assert.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32
#define SO_RCVBUF_SIZE (2 * 1024 * 1024)
#define SO_SNDBUF_SIZE (2 * 1024 * 1024)
#define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
#define IOV_BATCH_SIZE 64

enum spdk_sock_task_type {
	SPDK_SOCK_TASK_POLLIN = 0,
	SPDK_SOCK_TASK_WRITE,
	SPDK_SOCK_TASK_CANCEL,
};

enum spdk_uring_sock_task_status {
	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
	SPDK_URING_SOCK_TASK_IN_PROCESS,
};

struct spdk_uring_task {
	enum spdk_uring_sock_task_status status;
	enum spdk_sock_task_type type;
	struct spdk_uring_sock *sock;
	struct msghdr msg;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iov_cnt;
	struct spdk_sock_request *last_req;
	STAILQ_ENTRY(spdk_uring_task) link;
};

struct spdk_uring_sock {
	struct spdk_sock base;
	int fd;
	struct spdk_uring_sock_group_impl *group;
	struct spdk_uring_task write_task;
	struct spdk_uring_task pollin_task;
	struct spdk_uring_task cancel_task;
	struct spdk_pipe *recv_pipe;
	void *recv_buf;
	int recv_buf_sz;
	bool pending_recv;
	int connection_status;
	TAILQ_ENTRY(spdk_uring_sock) link;
};

struct spdk_uring_sock_group_impl {
	struct spdk_sock_group_impl base;
	struct io_uring uring;
	uint32_t io_inflight;
	uint32_t io_queued;
	uint32_t io_avail;
	TAILQ_HEAD(, spdk_uring_sock) pending_recv;
};

#define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))
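/*
 * Render the IPv4/IPv6 address in @sa as a string into @host.
 * Returns 0 on success, or -1 on bad arguments, an unsupported address
 * family, or an inet_ntop() failure.
 */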
static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const char *result = NULL;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	switch (sa->sa_family) {
	case AF_INET:
		result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
				   host, hlen);
		break;
	case AF_INET6:
		result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
				   host, hlen);
		break;
	default:
		break;
	}

	if (result != NULL) {
		return 0;
	} else {
		return -1;
	}
}

#define __uring_sock(sock) (struct spdk_uring_sock *)sock
#define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group

static int
uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("get_addr_str() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("get_addr_str() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum uring_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};
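/*
 * (Re)size the socket's receive pipe to @sz bytes. A size of 0 frees the
 * pipe entirely; otherwise any data still buffered in the old pipe is
 * copied into the new one before the old pipe is destroyed.
 */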
static int
uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

#ifndef __aarch64__
	/* On ARM systems, this buffering does not help. Skip it. */
	/* The size of the pipe is purely derived from benchmarks. It seems to work well. */
	rc = uring_sock_alloc_pipe(sock, sz);
	if (rc) {
		SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
		return rc;
	}
#endif

	if (sz < SO_RCVBUF_SIZE) {
		sz = SO_RCVBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static int
uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (sz < SO_SNDBUF_SIZE) {
		sz = SO_SNDBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static struct spdk_uring_sock *
uring_sock_alloc(int fd)
{
	struct spdk_uring_sock *sock;

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;
	return sock;
}
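/*
 * Common creation path for both listen and connect sockets. Resolves
 * @ip/@port with getaddrinfo() and walks the results until one address
 * yields a usable non-blocking TCP socket.
 */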
static struct spdk_sock *
uring_sock_create(const char *ip, int port,
		  enum uring_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_uring_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc;

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		/* getaddrinfo() reports failures through its return value, not errno */
		SPDK_ERRLOG("getaddrinfo() failed: %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		val = SO_RCVBUF_SIZE;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		val = SO_SNDBUF_SIZE;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		/* Restore val to 1: the boolean options below must not reuse the
		 * buffer size left in it by the calls above. */
		val = 1;
		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}
#endif
		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	sock = uring_sock_alloc(fd);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}

static struct spdk_sock *
uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}

static struct spdk_sock *
uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}
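/*
 * Accept a pending connection on a listen socket and wrap the new fd in
 * a spdk_uring_sock. O_NONBLOCK is set explicitly (accept() does not
 * inherit file status flags), and SO_PRIORITY is reapplied because it is
 * not inherited either.
 */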
static struct spdk_sock *
uring_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc, fd;
	struct spdk_uring_sock *new_sock;
	int flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	new_sock = uring_sock_alloc(fd);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}

static int
uring_sock_close(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(TAILQ_EMPTY(&_sock->pending_reqs));
	assert(sock->group == NULL);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	rc = close(sock->fd);
	if (rc == 0) {
		free(sock);
	}

	return rc;
}

static ssize_t
uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_uring_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __uring_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}

static inline ssize_t
uring_sock_read(struct spdk_uring_sock *sock)
{
	struct iovec iov[2];
	int bytes;
	struct spdk_uring_sock_group_impl *group;

	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes > 0) {
		bytes = readv(sock->fd, iov, 2);
		if (bytes > 0) {
			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
			/* Guard against double insertion when the socket is
			 * already on the pending_recv list. */
			if (sock->base.group_impl && !sock->pending_recv) {
				group = __uring_group_impl(sock->base.group_impl);
				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				sock->pending_recv = true;
			}
		}
	}

	return bytes;
}
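/*
 * Reads prefer draining the pipe. When the pipe is empty, a sufficiently
 * large read goes straight to the caller's buffers, while a small read
 * first refills the pipe with one big readv() and then copies out of it.
 */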
static ssize_t
uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		return readv(sock->fd, iov, iovcnt);
	}

	len = 0;
	for (i = 0; i < iovcnt; i++) {
		len += iov[i].iov_len;
	}

	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		if (len >= MIN_SOCK_PIPE_SIZE) {
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = uring_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return uring_sock_readv(sock, iov, 1);
}

static ssize_t
uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}

static int
sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
	       struct spdk_sock_request **last_req)
{
	int iovcnt, i;
	struct spdk_sock_request *req;
	unsigned int offset;

	/* Gather an iov */
	iovcnt = index;
	if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
		goto end;
	}

	if (last_req != NULL && *last_req != NULL) {
		req = TAILQ_NEXT(*last_req, internal.link);
	} else {
		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Consume any offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
			iovcnt++;

			offset = 0;

			if (iovcnt >= IOV_BATCH_SIZE) {
				break;
			}
		}
		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		if (last_req != NULL) {
			*last_req = req;
		}
		req = TAILQ_NEXT(req, internal.link);
	}

end:
	return iovcnt;
}
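/*
 * Walk queued_reqs and retire the requests covered by the @rc bytes that
 * the kernel reported as written; a partially written request only has
 * its internal offset advanced.
 */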
static int
sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
{
	struct spdk_sock_request *req;
	int i, retval;
	unsigned int offset;
	size_t len;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&_sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(_sock, req);

		retval = spdk_sock_request_put(_sock, req, 0);
		if (retval) {
			return retval;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}

static void
_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->write_task;
	uint32_t iovcnt;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	iovcnt = sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req);
	if (!iovcnt) {
		return;
	}

	task->iov_cnt = iovcnt;
	assert(sock->group != NULL);
	task->msg.msg_iov = task->iovs;
	task->msg.msg_iovlen = task->iov_cnt;

	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_sendmsg(sqe, sock->fd, &task->msg, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_pollin(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->pollin_task;
	struct io_uring_sqe *sqe;

	/* Do not prepare pollin event */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->cancel_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_cancel(sqe, user_data, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}
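/*
 * Drain up to @max completions from the group's ring, dispatch each by
 * task type, and then collect up to @max_read_events readable sockets
 * from the level-triggered pending_recv list into @socks.
 */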
static int
sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
		      struct spdk_sock **socks)
{
	int i, count, ret;
	struct io_uring_cqe *cqe;
	struct spdk_uring_sock *sock, *tmp;
	struct spdk_uring_task *task;
	int status;

	for (i = 0; i < max; i++) {
		ret = io_uring_peek_cqe(&group->uring, &cqe);
		if (ret != 0) {
			break;
		}

		if (cqe == NULL) {
			break;
		}

		task = (struct spdk_uring_task *)cqe->user_data;
		assert(task != NULL);
		sock = task->sock;
		assert(sock != NULL);
		assert(sock->group != NULL);
		assert(sock->group == group);
		sock->group->io_inflight--;
		sock->group->io_avail++;
		status = cqe->res;
		io_uring_cqe_seen(&group->uring, cqe);

		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;

		if (spdk_unlikely(status <= 0)) {
			if (status == -EAGAIN || status == -EWOULDBLOCK) {
				continue;
			}
		}

		switch (task->type) {
		case SPDK_SOCK_TASK_POLLIN:
			if ((status & POLLIN) == POLLIN) {
				if (sock->base.cb_fn != NULL) {
					assert(sock->pending_recv == false);
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}
			}
			break;
		case SPDK_SOCK_TASK_WRITE:
			assert(TAILQ_EMPTY(&sock->base.pending_reqs));
			task->last_req = NULL;
			task->iov_cnt = 0;
			if (spdk_unlikely(status < 0)) {
				sock->connection_status = status;
				spdk_sock_abort_requests(&sock->base);
			} else {
				sock_complete_reqs(&sock->base, status);
			}

			break;
		case SPDK_SOCK_TASK_CANCEL:
			/* Do nothing */
			break;
		default:
			SPDK_UNREACHABLE();
		}
	}

	if (!socks) {
		return 0;
	}
	count = 0;
	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
		if (count == max_read_events) {
			break;
		}

		socks[count++] = &sock->base;
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. */
	for (i = 0; i < count; i++) {
		sock = __uring_sock(socks[i]);

		TAILQ_REMOVE(&group->pending_recv, sock, link);

		if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
			sock->pending_recv = false;
		} else {
			TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
		}
	}

	return count;
}

static int
_sock_flush_client(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {};
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = sock_prep_reqs(_sock, iovs, 0, NULL);
	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, 0);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	sock_complete_reqs(_sock, rc);

	return 0;
}

static void
uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	if (spdk_unlikely(sock->connection_status)) {
		req->cb_fn(req->cb_arg, sock->connection_status);
		return;
	}

	spdk_sock_request_queue(_sock, req);

	if (!sock->group) {
		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
			rc = _sock_flush_client(_sock);
			if (rc) {
				spdk_sock_abort_requests(_sock);
			}
		}
	}
}

static int
uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}
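/*
 * The helpers below classify a socket by the address family that
 * getsockname() reports for its bound local address.
 */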
static bool
uring_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

static bool
uring_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

static bool
uring_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	uint8_t byte;
	int rc;

	/* Peek one byte: a return of 0 means the peer closed the connection;
	 * EAGAIN/EWOULDBLOCK means the connection is alive but idle. */
	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

static int
uring_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
{
	int rc = -1;

#if defined(SO_INCOMING_NAPI_ID)
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	socklen_t salen = sizeof(int);

	rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
	}

#endif
	return rc;
}

static struct spdk_sock_group_impl *
uring_sock_group_impl_create(void)
{
	struct spdk_uring_sock_group_impl *group_impl;

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		return NULL;
	}

	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;

	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
		SPDK_ERRLOG("uring I/O context setup failure\n");
		free(group_impl);
		return NULL;
	}

	TAILQ_INIT(&group_impl->pending_recv);

	return &group_impl->base;
}

static int
uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
			       struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	sock->group = group;
	sock->write_task.sock = sock;
	sock->write_task.type = SPDK_SOCK_TASK_WRITE;

	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	sock->cancel_task.sock = sock;
	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		assert(sock->pending_recv == false);
		sock->pending_recv = true;
		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
	}

	return 0;
}
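/*
 * One poll iteration: flush pending writes and arm POLLIN for every
 * healthy socket, submit the batched SQEs with a single system call,
 * then reap completions.
 */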
static int
uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int count, ret;
	int to_complete, to_submit;
	struct spdk_sock *_sock, *tmp;
	struct spdk_uring_sock *sock;

	if (spdk_likely(socks)) {
		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
			sock = __uring_sock(_sock);
			if (spdk_unlikely(sock->connection_status)) {
				continue;
			}
			_sock_flush(_sock);
			_sock_prep_pollin(_sock);
		}
	}

	to_submit = group->io_queued;

	if (to_submit > 0) {
		/* If there are I/O queued, submit them here with io_uring_submit(),
		 * which calls io_uring_enter() as appropriate. Network sockets are
		 * never opened with O_DIRECT, so no special enter handling is needed. */
		ret = io_uring_submit(&group->uring);
		if (ret < 0) {
			return 1;
		}
		group->io_queued = 0;
		group->io_inflight += to_submit;
		group->io_avail -= to_submit;
	}

	count = 0;
	to_complete = group->io_inflight;
	if (to_complete > 0) {
		count = sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;
}

static int
uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				  struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->write_task);
		/* Since spdk_sock_group_remove_sock() is not an asynchronous
		 * interface, we can simply poll in a loop here until the
		 * cancellation completes. */
		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->pollin_task);
		while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->pending_recv) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}
	assert(sock->pending_recv == false);

	sock->group = NULL;
	return 0;
}

static int
uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	/* try to reap all the active I/O */
	while (group->io_inflight) {
		uring_sock_group_impl_poll(_group, 32, NULL);
	}
	assert(group->io_inflight == 0);
	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);

	io_uring_queue_exit(&group->uring);

	free(group);
	return 0;
}

static int
uring_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (!sock->group) {
		return _sock_flush_client(_sock);
	}

	return 0;
}
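/*
 * Function table registered with the generic sock layer. Registering at
 * DEFAULT_SOCK_PRIORITY + 1 places this implementation ahead of the
 * posix one when both are compiled in.
 *
 * A minimal usage sketch through the generic spdk_sock API (these calls
 * live in the sock layer, not in this file; the callback and context
 * names are placeholders):
 *
 *	struct spdk_sock *s = spdk_sock_connect("127.0.0.1", 4420);
 *	struct spdk_sock_group *g = spdk_sock_group_create(NULL);
 *	spdk_sock_group_add_sock(g, s, my_readable_cb, my_ctx);
 *	while (running) {
 *		spdk_sock_group_poll(g);	// drives uring_sock_group_impl_poll()
 *	}
 */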
static struct spdk_net_impl g_uring_net_impl = {
	.name = "uring",
	.getaddr = uring_sock_getaddr,
	.connect = uring_sock_connect,
	.listen = uring_sock_listen,
	.accept = uring_sock_accept,
	.close = uring_sock_close,
	.recv = uring_sock_recv,
	.readv = uring_sock_readv,
	.writev = uring_sock_writev,
	.writev_async = uring_sock_writev_async,
	.flush = uring_sock_flush,
	.set_recvlowat = uring_sock_set_recvlowat,
	.set_recvbuf = uring_sock_set_recvbuf,
	.set_sendbuf = uring_sock_set_sendbuf,
	.is_ipv6 = uring_sock_is_ipv6,
	.is_ipv4 = uring_sock_is_ipv4,
	.is_connected = uring_sock_is_connected,
	.get_placement_id = uring_sock_get_placement_id,
	.group_impl_create = uring_sock_group_impl_create,
	.group_impl_add_sock = uring_sock_group_impl_add_sock,
	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
	.group_impl_poll = uring_sock_group_impl_poll,
	.group_impl_close = uring_sock_group_impl_close,
};

SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);