/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/config.h"

#include <sys/epoll.h>
#include <liburing.h>

#include "spdk/barrier.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/string.h"
#include "spdk/util.h"

#include "spdk_internal/sock.h"
#include "spdk_internal/assert.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32
#define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
#define IOV_BATCH_SIZE 64

enum spdk_sock_task_type {
	SPDK_SOCK_TASK_POLLIN = 0,
	SPDK_SOCK_TASK_WRITE,
	SPDK_SOCK_TASK_CANCEL,
};

enum spdk_uring_sock_task_status {
	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
	SPDK_URING_SOCK_TASK_IN_PROCESS,
};

struct spdk_uring_task {
	enum spdk_uring_sock_task_status	status;
	enum spdk_sock_task_type		type;
	struct spdk_uring_sock			*sock;
	struct msghdr				msg;
	struct iovec				iovs[IOV_BATCH_SIZE];
	int					iov_cnt;
	struct spdk_sock_request		*last_req;
	STAILQ_ENTRY(spdk_uring_task)		link;
};

struct spdk_uring_sock {
	struct spdk_sock			base;
	int					fd;
	struct spdk_uring_sock_group_impl	*group;
	struct spdk_uring_task			write_task;
	struct spdk_uring_task			pollin_task;
	struct spdk_uring_task			cancel_task;
	struct spdk_pipe			*recv_pipe;
	void					*recv_buf;
	int					recv_buf_sz;
	bool					pending_recv;
	int					connection_status;
	TAILQ_ENTRY(spdk_uring_sock)		link;
};

struct spdk_uring_sock_group_impl {
	struct spdk_sock_group_impl	base;
	struct io_uring			uring;
	uint32_t			io_inflight;
	uint32_t			io_queued;
	uint32_t			io_avail;
	TAILQ_HEAD(, spdk_uring_sock)	pending_recv;
};

static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_quickack = false,
	.enable_placement_id = false,
};
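/*
 * Illustrative sketch only: the spdk_sock_impl_{get,set}_opts() wrappers live
 * in the generic sock layer (spdk/sock.h), not in this file, and dispatch to
 * the get_opts/set_opts callbacks registered at the bottom of this module.
 * An application would typically tune the defaults above before creating any
 * sockets, along these lines:
 *
 *	struct spdk_sock_impl_opts impl_opts = {};
 *	size_t sz = sizeof(impl_opts);
 *
 *	spdk_sock_impl_get_opts("uring", &impl_opts, &sz);
 *	impl_opts.enable_recv_pipe = false;
 *	spdk_sock_impl_set_opts("uring", &impl_opts, sz);
 */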
#define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))

static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const char *result = NULL;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	switch (sa->sa_family) {
	case AF_INET:
		result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
				   host, hlen);
		break;
	case AF_INET6:
		result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
				   host, hlen);
		break;
	default:
		break;
	}

	if (result != NULL) {
		return 0;
	} else {
		return -1;
	}
}

#define __uring_sock(sock) (struct spdk_uring_sock *)sock
#define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group

static int
uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("inet_ntop() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("inet_ntop() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum uring_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};
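/*
 * The receive pipe below sits between the kernel socket buffer and the
 * caller's reads: uring_sock_read() drains the socket into the pipe with one
 * large readv(), and subsequent small reads are then served out of the pipe
 * without additional syscalls. This helper (re)allocates that pipe, migrating
 * any buffered data when the socket's receive buffer size changes.
 */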
static int
uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) {
		rc = uring_sock_alloc_pipe(sock, sz);
		if (rc) {
			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
			return rc;
		}
	}

	if (sz < MIN_SO_RCVBUF_SIZE) {
		sz = MIN_SO_RCVBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static int
uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (sz < MIN_SO_SNDBUF_SIZE) {
		sz = MIN_SO_SNDBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static struct spdk_uring_sock *
uring_sock_alloc(int fd)
{
	struct spdk_uring_sock *sock;
#if defined(__linux__)
	int flag;
	int rc;
#endif

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;

#if defined(__linux__)
	flag = 1;

	if (g_spdk_uring_sock_impl_opts.enable_quickack) {
		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
		if (rc != 0) {
			SPDK_ERRLOG("Failed to set TCP_QUICKACK (errno=%d)\n", errno);
		}
	}
#endif
	return sock;
}
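/*
 * Common creation path for both the listen and connect cases. Note that
 * AI_NUMERICHOST is set in the hints, so `ip` must be a numeric IPv4 or IPv6
 * literal (a bracketed form such as "[::1]" is accepted); hostnames are not
 * resolved here.
 */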
static struct spdk_sock *
uring_sock_create(const char *ip, int port,
		  enum uring_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_uring_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc;

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		/* getaddrinfo() reports errors through its return value, not errno */
		SPDK_ERRLOG("getaddrinfo() failed: %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try each addrinfo result until one socket can be fully set up */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		val = g_spdk_uring_sock_impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		val = g_spdk_uring_sock_impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		/* Reset val: the boolean options below expect 1, not a buffer size */
		val = 1;
		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}
#endif
		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	sock = uring_sock_alloc(fd);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}

static struct spdk_sock *
uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}

static struct spdk_sock *
uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}
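/*
 * Accept a pending connection and configure it like a freshly created
 * socket: non-blocking mode, plus SO_PRIORITY when requested, since the
 * priority is not inherited from the listening socket.
 */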
static struct spdk_sock *
uring_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc, fd;
	struct spdk_uring_sock *new_sock;
	int flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	new_sock = uring_sock_alloc(fd);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}

static int
uring_sock_close(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(TAILQ_EMPTY(&_sock->pending_reqs));
	assert(sock->group == NULL);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	rc = close(sock->fd);
	if (rc == 0) {
		free(sock);
	}

	return rc;
}

static ssize_t
uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_uring_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __uring_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}

static inline ssize_t
uring_sock_read(struct spdk_uring_sock *sock)
{
	struct iovec iov[2];
	int bytes;
	struct spdk_uring_sock_group_impl *group;

	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes > 0) {
		bytes = readv(sock->fd, iov, 2);
		if (bytes > 0) {
			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
			/* Guard against double-insertion: the socket may already be
			 * on the level-triggered list from a POLLIN completion. */
			if (sock->base.group_impl && !sock->pending_recv) {
				group = __uring_group_impl(sock->base.group_impl);
				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				sock->pending_recv = true;
			}
		}
	}

	return bytes;
}
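/*
 * Read strategy: if a pipe exists and is empty, requests of at least
 * MIN_SOCK_PIPE_SIZE bytes bypass the pipe and readv() straight into the
 * caller's buffers, while smaller requests trigger one large read into the
 * pipe and are then satisfied from it.
 */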
static ssize_t
uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		return readv(sock->fd, iov, iovcnt);
	}

	len = 0;
	for (i = 0; i < iovcnt; i++) {
		len += iov[i].iov_len;
	}

	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		if (len >= MIN_SOCK_PIPE_SIZE) {
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = uring_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return uring_sock_readv(sock, iov, 1);
}

static ssize_t
uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		/* An async write is already outstanding; the caller must retry */
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}

static int
sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
	       struct spdk_sock_request **last_req)
{
	int iovcnt, i;
	struct spdk_sock_request *req;
	unsigned int offset;

	/* Gather an iov */
	iovcnt = index;
	if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
		goto end;
	}

	if (last_req != NULL && *last_req != NULL) {
		req = TAILQ_NEXT(*last_req, internal.link);
	} else {
		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Consume any offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
			iovcnt++;

			offset = 0;

			if (iovcnt >= IOV_BATCH_SIZE) {
				break;
			}
		}
		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		if (last_req != NULL) {
			*last_req = req;
		}
		req = TAILQ_NEXT(req, internal.link);
	}

end:
	return iovcnt;
}
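/*
 * Retire the bytes the kernel actually accepted. `rc` is the byte count
 * returned by the vectored send; a request completes only once all of its
 * iov elements have been consumed, and a partially sent element just
 * advances req->internal.offset so the next flush resumes mid-request.
 */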
static int
sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
{
	struct spdk_sock_request *req;
	int i, retval;
	unsigned int offset;
	size_t len;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&_sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(_sock, req);

		retval = spdk_sock_request_put(_sock, req, 0);
		if (retval) {
			return retval;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}

static void
_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->write_task;
	uint32_t iovcnt;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	iovcnt = sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req);
	if (!iovcnt) {
		return;
	}

	task->iov_cnt = iovcnt;
	assert(sock->group != NULL);
	task->msg.msg_iov = task->iovs;
	task->msg.msg_iovlen = task->iov_cnt;

	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_sendmsg(sqe, sock->fd, &task->msg, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_pollin(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->pollin_task;
	struct io_uring_sqe *sqe;

	/* Do not prepare pollin event */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->cancel_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_cancel(sqe, user_data, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}
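/*
 * Drain up to `max` completions from the ring. Each CQE carries the
 * spdk_uring_task it was submitted with: POLLIN completions move the socket
 * onto the level-triggered pending_recv list, and write completions either
 * retire queued requests or record a connection error and abort them.
 */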
static int
sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
		      struct spdk_sock **socks)
{
	int i, count, ret;
	struct io_uring_cqe *cqe;
	struct spdk_uring_sock *sock, *tmp;
	struct spdk_uring_task *task;
	int status;

	for (i = 0; i < max; i++) {
		ret = io_uring_peek_cqe(&group->uring, &cqe);
		if (ret != 0) {
			break;
		}

		if (cqe == NULL) {
			break;
		}

		task = (struct spdk_uring_task *)cqe->user_data;
		assert(task != NULL);
		sock = task->sock;
		assert(sock != NULL);
		assert(sock->group != NULL);
		assert(sock->group == group);
		sock->group->io_inflight--;
		sock->group->io_avail++;
		status = cqe->res;
		io_uring_cqe_seen(&group->uring, cqe);

		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;

		if (spdk_unlikely(status <= 0)) {
			if (status == -EAGAIN || status == -EWOULDBLOCK) {
				continue;
			}
		}

		switch (task->type) {
		case SPDK_SOCK_TASK_POLLIN:
			if ((status & POLLIN) == POLLIN) {
				if (sock->base.cb_fn != NULL) {
					assert(sock->pending_recv == false);
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}
			}
			break;
		case SPDK_SOCK_TASK_WRITE:
			assert(TAILQ_EMPTY(&sock->base.pending_reqs));
			task->last_req = NULL;
			task->iov_cnt = 0;
			if (spdk_unlikely(status < 0)) {
				sock->connection_status = status;
				spdk_sock_abort_requests(&sock->base);
			} else {
				sock_complete_reqs(&sock->base, status);
			}

			break;
		case SPDK_SOCK_TASK_CANCEL:
			/* Do nothing */
			break;
		default:
			SPDK_UNREACHABLE();
		}
	}

	if (!socks) {
		return 0;
	}
	count = 0;
	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
		if (count == max_read_events) {
			break;
		}

		socks[count++] = &sock->base;
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. */
	for (i = 0; i < count; i++) {
		sock = __uring_sock(socks[i]);

		TAILQ_REMOVE(&group->pending_recv, sock, link);

		if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
			sock->pending_recv = false;
		} else {
			TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
		}
	}

	return count;
}

static int
_sock_flush_client(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {};
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = sock_prep_reqs(_sock, iovs, 0, NULL);
	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, 0);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	sock_complete_reqs(_sock, rc);

	return 0;
}

static void
uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	if (spdk_unlikely(sock->connection_status)) {
		req->cb_fn(req->cb_arg, sock->connection_status);
		return;
	}

	spdk_sock_request_queue(_sock, req);

	if (!sock->group) {
		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
			rc = _sock_flush_client(_sock);
			if (rc) {
				spdk_sock_abort_requests(_sock);
			}
		}
	}
}

static int
uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

static bool
uring_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}
static bool
uring_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

static bool
uring_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

static int
uring_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
{
	int rc = -1;

	if (!g_spdk_uring_sock_impl_opts.enable_placement_id) {
		return rc;
	}

#if defined(SO_INCOMING_NAPI_ID)
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	socklen_t salen = sizeof(int);

	rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
	}

#endif
	return rc;
}

static struct spdk_sock_group_impl *
uring_sock_group_impl_create(void)
{
	struct spdk_uring_sock_group_impl *group_impl;

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		return NULL;
	}

	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;

	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
		SPDK_ERRLOG("uring I/O context setup failure\n");
		free(group_impl);
		return NULL;
	}

	TAILQ_INIT(&group_impl->pending_recv);

	return &group_impl->base;
}

static int
uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
			       struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	sock->group = group;
	sock->write_task.sock = sock;
	sock->write_task.type = SPDK_SOCK_TASK_WRITE;

	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	sock->cancel_task.sock = sock;
	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		assert(sock->pending_recv == false);
		sock->pending_recv = true;
		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
	}

	return 0;
}
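/*
 * One poll iteration: flush pending writes and arm POLLIN for every healthy
 * socket, push the batched SQEs to the kernel with a single io_uring_submit()
 * call, then reap completions and return the sockets that have data ready.
 */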
static int
uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int count, ret;
	int to_complete, to_submit;
	struct spdk_sock *_sock, *tmp;
	struct spdk_uring_sock *sock;

	if (spdk_likely(socks)) {
		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
			sock = __uring_sock(_sock);
			if (spdk_unlikely(sock->connection_status)) {
				continue;
			}
			_sock_flush(_sock);
			_sock_prep_pollin(_sock);
		}
	}

	to_submit = group->io_queued;

	/* Network I/O cannot use O_DIRECT, so we do not need a
	 * spdk_io_uring_enter() wrapper here */
	if (to_submit > 0) {
		/* If there are I/Os to submit, use io_uring_submit() here.
		 * It will automatically call io_uring_enter() appropriately. */
		ret = io_uring_submit(&group->uring);
		if (ret < 0) {
			return 1;
		}
		group->io_queued = 0;
		group->io_inflight += to_submit;
		group->io_avail -= to_submit;
	}

	count = 0;
	to_complete = group->io_inflight;
	if (to_complete > 0) {
		count = sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;
}

static int
uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				  struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->write_task);
		/* Since spdk_sock_group_remove_sock() is a synchronous interface,
		 * we can simply poll here until both tasks have completed. */
		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->pollin_task);
		/* Since spdk_sock_group_remove_sock() is a synchronous interface,
		 * we can simply poll here until both tasks have completed. */
		while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->pending_recv) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}
	assert(sock->pending_recv == false);

	sock->group = NULL;
	return 0;
}

static int
uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	/* try to reap all the active I/O */
	while (group->io_inflight) {
		uring_sock_group_impl_poll(_group, 32, NULL);
	}
	assert(group->io_inflight == 0);
	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);

	io_uring_queue_exit(&group->uring);

	free(group);
	return 0;
}

static int
uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
{
	if (!opts || !len) {
		errno = EINVAL;
		return -1;
	}

#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len

#define GET_FIELD(field) \
	if (FIELD_OK(field)) { \
		opts->field = g_spdk_uring_sock_impl_opts.field; \
	}

	GET_FIELD(recv_buf_size);
	GET_FIELD(send_buf_size);
	GET_FIELD(enable_recv_pipe);
	GET_FIELD(enable_quickack);
	GET_FIELD(enable_placement_id);

#undef GET_FIELD
#undef FIELD_OK

	*len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts));
	return 0;
}
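/*
 * As in uring_sock_impl_get_opts() above, FIELD_OK guards against callers
 * compiled against a shorter (older) spdk_sock_impl_opts: a field is copied
 * only when it lies entirely within the length the caller passed in.
 */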
static int
uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
{
	if (!opts) {
		errno = EINVAL;
		return -1;
	}

#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len

#define SET_FIELD(field) \
	if (FIELD_OK(field)) { \
		g_spdk_uring_sock_impl_opts.field = opts->field; \
	}

	SET_FIELD(recv_buf_size);
	SET_FIELD(send_buf_size);
	SET_FIELD(enable_recv_pipe);
	SET_FIELD(enable_quickack);
	SET_FIELD(enable_placement_id);

#undef SET_FIELD
#undef FIELD_OK

	return 0;
}

static int
uring_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (!sock->group) {
		return _sock_flush_client(_sock);
	}

	return 0;
}

static struct spdk_net_impl g_uring_net_impl = {
	.name			= "uring",
	.getaddr		= uring_sock_getaddr,
	.connect		= uring_sock_connect,
	.listen			= uring_sock_listen,
	.accept			= uring_sock_accept,
	.close			= uring_sock_close,
	.recv			= uring_sock_recv,
	.readv			= uring_sock_readv,
	.writev			= uring_sock_writev,
	.writev_async		= uring_sock_writev_async,
	.flush			= uring_sock_flush,
	.set_recvlowat		= uring_sock_set_recvlowat,
	.set_recvbuf		= uring_sock_set_recvbuf,
	.set_sendbuf		= uring_sock_set_sendbuf,
	.is_ipv6		= uring_sock_is_ipv6,
	.is_ipv4		= uring_sock_is_ipv4,
	.is_connected		= uring_sock_is_connected,
	.get_placement_id	= uring_sock_get_placement_id,
	.group_impl_create	= uring_sock_group_impl_create,
	.group_impl_add_sock	= uring_sock_group_impl_add_sock,
	.group_impl_remove_sock	= uring_sock_group_impl_remove_sock,
	.group_impl_poll	= uring_sock_group_impl_poll,
	.group_impl_close	= uring_sock_group_impl_close,
	.get_opts		= uring_sock_impl_get_opts,
	.set_opts		= uring_sock_impl_set_opts,
};

SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);