1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 #include "spdk/config.h" 36 37 #include <sys/epoll.h> 38 #include <liburing.h> 39 40 #include "spdk/barrier.h" 41 #include "spdk/log.h" 42 #include "spdk/pipe.h" 43 #include "spdk/sock.h" 44 #include "spdk/string.h" 45 #include "spdk/util.h" 46 47 #include "spdk_internal/sock.h" 48 #include "spdk_internal/assert.h" 49 50 #define MAX_TMPBUF 1024 51 #define PORTNUMLEN 32 52 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096 53 54 enum spdk_sock_task_type { 55 SPDK_SOCK_TASK_POLLIN = 0, 56 SPDK_SOCK_TASK_WRITE, 57 SPDK_SOCK_TASK_CANCEL, 58 }; 59 60 enum spdk_uring_sock_task_status { 61 SPDK_URING_SOCK_TASK_NOT_IN_USE = 0, 62 SPDK_URING_SOCK_TASK_IN_PROCESS, 63 }; 64 65 struct spdk_uring_task { 66 enum spdk_uring_sock_task_status status; 67 enum spdk_sock_task_type type; 68 struct spdk_uring_sock *sock; 69 struct msghdr msg; 70 struct iovec iovs[IOV_BATCH_SIZE]; 71 int iov_cnt; 72 struct spdk_sock_request *last_req; 73 STAILQ_ENTRY(spdk_uring_task) link; 74 }; 75 76 struct spdk_uring_sock { 77 struct spdk_sock base; 78 int fd; 79 struct spdk_uring_sock_group_impl *group; 80 struct spdk_uring_task write_task; 81 struct spdk_uring_task pollin_task; 82 struct spdk_uring_task cancel_task; 83 struct spdk_pipe *recv_pipe; 84 void *recv_buf; 85 int recv_buf_sz; 86 bool pending_recv; 87 int connection_status; 88 TAILQ_ENTRY(spdk_uring_sock) link; 89 }; 90 91 struct spdk_uring_sock_group_impl { 92 struct spdk_sock_group_impl base; 93 struct io_uring uring; 94 uint32_t io_inflight; 95 uint32_t io_queued; 96 uint32_t io_avail; 97 TAILQ_HEAD(, spdk_uring_sock) pending_recv; 98 }; 99 100 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = { 101 .recv_buf_size = MIN_SO_RCVBUF_SIZE, 102 .send_buf_size = MIN_SO_SNDBUF_SIZE, 103 .enable_recv_pipe = true, 104 .enable_quickack = false, 105 .enable_placement_id = false, 106 }; 107 108 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request))) 109 110 static int 111 get_addr_str(struct sockaddr *sa, char *host, size_t hlen) 112 { 113 const char *result = NULL; 114 115 if (sa == NULL || host == NULL) { 116 return -1; 117 } 118 119 switch (sa->sa_family) { 120 case AF_INET: 121 result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr), 122 host, hlen); 123 break; 124 case AF_INET6: 125 result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr), 126 host, hlen); 127 break; 128 default: 129 break; 130 } 131 132 if (result != NULL) { 133 return 0; 134 } else { 135 return -1; 136 } 137 } 138 139 #define __uring_sock(sock) (struct spdk_uring_sock *)sock 140 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group 141 142 static int 143 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 144 char *caddr, int clen, uint16_t *cport) 145 { 146 struct spdk_uring_sock *sock = __uring_sock(_sock); 147 struct sockaddr_storage sa; 148 socklen_t salen; 149 int rc; 150 151 assert(sock != NULL); 152 153 memset(&sa, 0, sizeof sa); 154 salen = sizeof sa; 155 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 156 if (rc != 0) { 157 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 158 return -1; 159 } 160 161 switch (sa.ss_family) { 162 case AF_UNIX: 163 /* Acceptable connection types that don't have IPs */ 164 return 0; 165 case AF_INET: 166 case AF_INET6: 167 /* Code below will get IP addresses */ 168 break; 169 default: 170 /* Unsupported socket family */ 171 return -1; 172 } 173 174 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); 175 if (rc != 0) { 176 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 177 return -1; 178 } 179 180 if (sport) { 181 if (sa.ss_family == AF_INET) { 182 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 183 } else if (sa.ss_family == AF_INET6) { 184 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 185 } 186 } 187 188 memset(&sa, 0, sizeof sa); 189 salen = sizeof sa; 190 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); 191 if (rc != 0) { 192 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); 193 return -1; 194 } 195 196 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); 197 if (rc != 0) { 198 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 199 return -1; 200 } 201 202 if (cport) { 203 if (sa.ss_family == AF_INET) { 204 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 205 } else if (sa.ss_family == AF_INET6) { 206 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 207 } 208 } 209 210 return 0; 211 } 212 213 enum uring_sock_create_type { 214 SPDK_SOCK_CREATE_LISTEN, 215 SPDK_SOCK_CREATE_CONNECT, 216 }; 217 218 static int 219 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz) 220 { 221 uint8_t *new_buf; 222 struct spdk_pipe *new_pipe; 223 struct iovec siov[2]; 224 struct iovec diov[2]; 225 int sbytes; 226 ssize_t bytes; 227 228 if (sock->recv_buf_sz == sz) { 229 return 0; 230 } 231 232 /* If the new size is 0, just free the pipe */ 233 if (sz == 0) { 234 spdk_pipe_destroy(sock->recv_pipe); 235 free(sock->recv_buf); 236 sock->recv_pipe = NULL; 237 sock->recv_buf = NULL; 238 return 0; 239 } else if (sz < MIN_SOCK_PIPE_SIZE) { 240 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 241 return -1; 242 } 243 244 /* Round up to next 64 byte multiple */ 245 new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t)); 246 if (!new_buf) { 247 SPDK_ERRLOG("socket recv buf allocation failed\n"); 248 return -ENOMEM; 249 } 250 251 new_pipe = spdk_pipe_create(new_buf, sz + 1); 252 if (new_pipe == NULL) { 253 SPDK_ERRLOG("socket pipe allocation failed\n"); 254 free(new_buf); 255 return -ENOMEM; 256 } 257 258 if (sock->recv_pipe != NULL) { 259 /* Pull all of the data out of the old pipe */ 260 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 261 if (sbytes > sz) { 262 /* Too much data to fit into the new pipe size */ 263 spdk_pipe_destroy(new_pipe); 264 free(new_buf); 265 return -EINVAL; 266 } 267 268 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 269 assert(sbytes == sz); 270 271 bytes = spdk_iovcpy(siov, 2, diov, 2); 272 spdk_pipe_writer_advance(new_pipe, bytes); 273 274 spdk_pipe_destroy(sock->recv_pipe); 275 free(sock->recv_buf); 276 } 277 278 sock->recv_buf_sz = sz; 279 sock->recv_buf = new_buf; 280 sock->recv_pipe = new_pipe; 281 282 return 0; 283 } 284 285 static int 286 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 287 { 288 struct spdk_uring_sock *sock = __uring_sock(_sock); 289 int rc; 290 291 assert(sock != NULL); 292 293 if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) { 294 rc = uring_sock_alloc_pipe(sock, sz); 295 if (rc) { 296 SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock); 297 return rc; 298 } 299 } 300 301 if (sz < MIN_SO_RCVBUF_SIZE) { 302 sz = MIN_SO_RCVBUF_SIZE; 303 } 304 305 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 306 if (rc < 0) { 307 return rc; 308 } 309 310 return 0; 311 } 312 313 static int 314 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 315 { 316 struct spdk_uring_sock *sock = __uring_sock(_sock); 317 int rc; 318 319 assert(sock != NULL); 320 321 if (sz < MIN_SO_SNDBUF_SIZE) { 322 sz = MIN_SO_SNDBUF_SIZE; 323 } 324 325 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 326 if (rc < 0) { 327 return rc; 328 } 329 330 return 0; 331 } 332 333 static struct spdk_uring_sock * 334 uring_sock_alloc(int fd) 335 { 336 struct spdk_uring_sock *sock; 337 #if defined(__linux__) 338 int flag; 339 int rc; 340 #endif 341 342 sock = calloc(1, sizeof(*sock)); 343 if (sock == NULL) { 344 SPDK_ERRLOG("sock allocation failed\n"); 345 return NULL; 346 } 347 348 sock->fd = fd; 349 350 #if defined(__linux__) 351 flag = 1; 352 353 if (g_spdk_uring_sock_impl_opts.enable_quickack) { 354 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag)); 355 if (rc != 0) { 356 SPDK_ERRLOG("quickack was failed to set\n"); 357 } 358 } 359 #endif 360 return sock; 361 } 362 363 static struct spdk_sock * 364 uring_sock_create(const char *ip, int port, 365 enum uring_sock_create_type type, 366 struct spdk_sock_opts *opts) 367 { 368 struct spdk_uring_sock *sock; 369 char buf[MAX_TMPBUF]; 370 char portnum[PORTNUMLEN]; 371 char *p; 372 struct addrinfo hints, *res, *res0; 373 int fd, flag; 374 int val = 1; 375 int rc; 376 377 if (ip == NULL) { 378 return NULL; 379 } 380 if (ip[0] == '[') { 381 snprintf(buf, sizeof(buf), "%s", ip + 1); 382 p = strchr(buf, ']'); 383 if (p != NULL) { 384 *p = '\0'; 385 } 386 ip = (const char *) &buf[0]; 387 } 388 389 snprintf(portnum, sizeof portnum, "%d", port); 390 memset(&hints, 0, sizeof hints); 391 hints.ai_family = PF_UNSPEC; 392 hints.ai_socktype = SOCK_STREAM; 393 hints.ai_flags = AI_NUMERICSERV; 394 hints.ai_flags |= AI_PASSIVE; 395 hints.ai_flags |= AI_NUMERICHOST; 396 rc = getaddrinfo(ip, portnum, &hints, &res0); 397 if (rc != 0) { 398 SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno); 399 return NULL; 400 } 401 402 /* try listen */ 403 fd = -1; 404 for (res = res0; res != NULL; res = res->ai_next) { 405 retry: 406 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 407 if (fd < 0) { 408 /* error */ 409 continue; 410 } 411 412 val = g_spdk_uring_sock_impl_opts.recv_buf_size; 413 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val); 414 if (rc) { 415 /* Not fatal */ 416 } 417 418 val = g_spdk_uring_sock_impl_opts.send_buf_size; 419 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val); 420 if (rc) { 421 /* Not fatal */ 422 } 423 424 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 425 if (rc != 0) { 426 close(fd); 427 /* error */ 428 continue; 429 } 430 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 431 if (rc != 0) { 432 close(fd); 433 /* error */ 434 continue; 435 } 436 437 #if defined(SO_PRIORITY) 438 if (opts != NULL && opts->priority) { 439 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); 440 if (rc != 0) { 441 close(fd); 442 /* error */ 443 continue; 444 } 445 } 446 #endif 447 if (res->ai_family == AF_INET6) { 448 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 449 if (rc != 0) { 450 close(fd); 451 /* error */ 452 continue; 453 } 454 } 455 456 if (type == SPDK_SOCK_CREATE_LISTEN) { 457 rc = bind(fd, res->ai_addr, res->ai_addrlen); 458 if (rc != 0) { 459 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 460 switch (errno) { 461 case EINTR: 462 /* interrupted? */ 463 close(fd); 464 goto retry; 465 case EADDRNOTAVAIL: 466 SPDK_ERRLOG("IP address %s not available. " 467 "Verify IP address in config file " 468 "and make sure setup script is " 469 "run before starting spdk app.\n", ip); 470 /* FALLTHROUGH */ 471 default: 472 /* try next family */ 473 close(fd); 474 fd = -1; 475 continue; 476 } 477 } 478 /* bind OK */ 479 rc = listen(fd, 512); 480 if (rc != 0) { 481 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 482 close(fd); 483 fd = -1; 484 break; 485 } 486 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 487 rc = connect(fd, res->ai_addr, res->ai_addrlen); 488 if (rc != 0) { 489 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 490 /* try next family */ 491 close(fd); 492 fd = -1; 493 continue; 494 } 495 } 496 497 flag = fcntl(fd, F_GETFL); 498 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 499 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 500 close(fd); 501 fd = -1; 502 break; 503 } 504 break; 505 } 506 freeaddrinfo(res0); 507 508 if (fd < 0) { 509 return NULL; 510 } 511 512 sock = uring_sock_alloc(fd); 513 if (sock == NULL) { 514 SPDK_ERRLOG("sock allocation failed\n"); 515 close(fd); 516 return NULL; 517 } 518 519 return &sock->base; 520 } 521 522 static struct spdk_sock * 523 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 524 { 525 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); 526 } 527 528 static struct spdk_sock * 529 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 530 { 531 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); 532 } 533 534 static struct spdk_sock * 535 uring_sock_accept(struct spdk_sock *_sock) 536 { 537 struct spdk_uring_sock *sock = __uring_sock(_sock); 538 struct sockaddr_storage sa; 539 socklen_t salen; 540 int rc, fd; 541 struct spdk_uring_sock *new_sock; 542 int flag; 543 544 memset(&sa, 0, sizeof(sa)); 545 salen = sizeof(sa); 546 547 assert(sock != NULL); 548 549 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 550 551 if (rc == -1) { 552 return NULL; 553 } 554 555 fd = rc; 556 557 flag = fcntl(fd, F_GETFL); 558 if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) { 559 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 560 close(fd); 561 return NULL; 562 } 563 564 #if defined(SO_PRIORITY) 565 /* The priority is not inherited, so call this function again */ 566 if (sock->base.opts.priority) { 567 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); 568 if (rc != 0) { 569 close(fd); 570 return NULL; 571 } 572 } 573 #endif 574 575 new_sock = uring_sock_alloc(fd); 576 if (new_sock == NULL) { 577 close(fd); 578 return NULL; 579 } 580 581 return &new_sock->base; 582 } 583 584 static int 585 uring_sock_close(struct spdk_sock *_sock) 586 { 587 struct spdk_uring_sock *sock = __uring_sock(_sock); 588 int rc; 589 590 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 591 assert(sock->group == NULL); 592 593 spdk_pipe_destroy(sock->recv_pipe); 594 free(sock->recv_buf); 595 rc = close(sock->fd); 596 if (rc == 0) { 597 free(sock); 598 } 599 600 return rc; 601 } 602 603 static ssize_t 604 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt) 605 { 606 struct iovec siov[2]; 607 int sbytes; 608 ssize_t bytes; 609 struct spdk_uring_sock_group_impl *group; 610 611 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 612 if (sbytes < 0) { 613 errno = EINVAL; 614 return -1; 615 } else if (sbytes == 0) { 616 errno = EAGAIN; 617 return -1; 618 } 619 620 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 621 622 if (bytes == 0) { 623 /* The only way this happens is if diov is 0 length */ 624 errno = EINVAL; 625 return -1; 626 } 627 628 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 629 630 /* If we drained the pipe, take it off the level-triggered list */ 631 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 632 group = __uring_group_impl(sock->base.group_impl); 633 TAILQ_REMOVE(&group->pending_recv, sock, link); 634 sock->pending_recv = false; 635 } 636 637 return bytes; 638 } 639 640 static inline ssize_t 641 uring_sock_read(struct spdk_uring_sock *sock) 642 { 643 struct iovec iov[2]; 644 int bytes; 645 struct spdk_uring_sock_group_impl *group; 646 647 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 648 649 if (bytes > 0) { 650 bytes = readv(sock->fd, iov, 2); 651 if (bytes > 0) { 652 spdk_pipe_writer_advance(sock->recv_pipe, bytes); 653 if (sock->base.group_impl) { 654 group = __uring_group_impl(sock->base.group_impl); 655 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 656 sock->pending_recv = true; 657 } 658 } 659 } 660 661 return bytes; 662 } 663 664 static ssize_t 665 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 666 { 667 struct spdk_uring_sock *sock = __uring_sock(_sock); 668 int rc, i; 669 size_t len; 670 671 if (sock->recv_pipe == NULL) { 672 return readv(sock->fd, iov, iovcnt); 673 } 674 675 len = 0; 676 for (i = 0; i < iovcnt; i++) { 677 len += iov[i].iov_len; 678 } 679 680 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 681 /* If the user is receiving a sufficiently large amount of data, 682 * receive directly to their buffers. */ 683 if (len >= MIN_SOCK_PIPE_SIZE) { 684 return readv(sock->fd, iov, iovcnt); 685 } 686 687 /* Otherwise, do a big read into our pipe */ 688 rc = uring_sock_read(sock); 689 if (rc <= 0) { 690 return rc; 691 } 692 } 693 694 return uring_sock_recv_from_pipe(sock, iov, iovcnt); 695 } 696 697 static ssize_t 698 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 699 { 700 struct iovec iov[1]; 701 702 iov[0].iov_base = buf; 703 iov[0].iov_len = len; 704 705 return uring_sock_readv(sock, iov, 1); 706 } 707 708 static ssize_t 709 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 710 { 711 struct spdk_uring_sock *sock = __uring_sock(_sock); 712 713 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 714 errno = EAGAIN; 715 return -1; 716 } 717 718 return writev(sock->fd, iov, iovcnt); 719 } 720 721 static int 722 sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc) 723 { 724 struct spdk_sock_request *req; 725 int i, retval; 726 unsigned int offset; 727 size_t len; 728 729 /* Consume the requests that were actually written */ 730 req = TAILQ_FIRST(&_sock->queued_reqs); 731 while (req) { 732 offset = req->internal.offset; 733 734 for (i = 0; i < req->iovcnt; i++) { 735 /* Advance by the offset first */ 736 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 737 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 738 continue; 739 } 740 741 /* Calculate the remaining length of this element */ 742 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 743 744 if (len > (size_t)rc) { 745 /* This element was partially sent. */ 746 req->internal.offset += rc; 747 return 0; 748 } 749 750 offset = 0; 751 req->internal.offset += len; 752 rc -= len; 753 } 754 755 /* Handled a full request. */ 756 spdk_sock_request_pend(_sock, req); 757 758 retval = spdk_sock_request_put(_sock, req, 0); 759 if (retval) { 760 return retval; 761 } 762 763 if (rc == 0) { 764 break; 765 } 766 767 req = TAILQ_FIRST(&_sock->queued_reqs); 768 } 769 770 return 0; 771 } 772 773 static void 774 _sock_flush(struct spdk_sock *_sock) 775 { 776 struct spdk_uring_sock *sock = __uring_sock(_sock); 777 struct spdk_uring_task *task = &sock->write_task; 778 uint32_t iovcnt; 779 struct io_uring_sqe *sqe; 780 781 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 782 return; 783 } 784 785 iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req); 786 if (!iovcnt) { 787 return; 788 } 789 790 task->iov_cnt = iovcnt; 791 assert(sock->group != NULL); 792 task->msg.msg_iov = task->iovs; 793 task->msg.msg_iovlen = task->iov_cnt; 794 795 sock->group->io_queued++; 796 797 sqe = io_uring_get_sqe(&sock->group->uring); 798 io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, 0); 799 io_uring_sqe_set_data(sqe, task); 800 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 801 } 802 803 static void 804 _sock_prep_pollin(struct spdk_sock *_sock) 805 { 806 struct spdk_uring_sock *sock = __uring_sock(_sock); 807 struct spdk_uring_task *task = &sock->pollin_task; 808 struct io_uring_sqe *sqe; 809 810 /* Do not prepare pollin event */ 811 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) { 812 return; 813 } 814 815 assert(sock->group != NULL); 816 sock->group->io_queued++; 817 818 sqe = io_uring_get_sqe(&sock->group->uring); 819 io_uring_prep_poll_add(sqe, sock->fd, POLLIN); 820 io_uring_sqe_set_data(sqe, task); 821 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 822 } 823 824 static void 825 _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data) 826 { 827 struct spdk_uring_sock *sock = __uring_sock(_sock); 828 struct spdk_uring_task *task = &sock->cancel_task; 829 struct io_uring_sqe *sqe; 830 831 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 832 return; 833 } 834 835 assert(sock->group != NULL); 836 sock->group->io_queued++; 837 838 sqe = io_uring_get_sqe(&sock->group->uring); 839 io_uring_prep_cancel(sqe, user_data, 0); 840 io_uring_sqe_set_data(sqe, task); 841 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 842 } 843 844 static int 845 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events, 846 struct spdk_sock **socks) 847 { 848 int i, count, ret; 849 struct io_uring_cqe *cqe; 850 struct spdk_uring_sock *sock, *tmp; 851 struct spdk_uring_task *task; 852 int status; 853 854 for (i = 0; i < max; i++) { 855 ret = io_uring_peek_cqe(&group->uring, &cqe); 856 if (ret != 0) { 857 break; 858 } 859 860 if (cqe == NULL) { 861 break; 862 } 863 864 task = (struct spdk_uring_task *)cqe->user_data; 865 assert(task != NULL); 866 sock = task->sock; 867 assert(sock != NULL); 868 assert(sock->group != NULL); 869 assert(sock->group == group); 870 sock->group->io_inflight--; 871 sock->group->io_avail++; 872 status = cqe->res; 873 io_uring_cqe_seen(&group->uring, cqe); 874 875 task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE; 876 877 if (spdk_unlikely(status <= 0)) { 878 if (status == -EAGAIN || status == -EWOULDBLOCK) { 879 continue; 880 } 881 } 882 883 switch (task->type) { 884 case SPDK_SOCK_TASK_POLLIN: 885 if ((status & POLLIN) == POLLIN) { 886 if (sock->base.cb_fn != NULL) { 887 assert(sock->pending_recv == false); 888 sock->pending_recv = true; 889 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 890 } 891 } 892 break; 893 case SPDK_SOCK_TASK_WRITE: 894 assert(TAILQ_EMPTY(&sock->base.pending_reqs)); 895 task->last_req = NULL; 896 task->iov_cnt = 0; 897 if (spdk_unlikely(status) < 0) { 898 sock->connection_status = status; 899 spdk_sock_abort_requests(&sock->base); 900 } else { 901 sock_complete_reqs(&sock->base, status); 902 } 903 904 break; 905 case SPDK_SOCK_TASK_CANCEL: 906 /* Do nothing */ 907 break; 908 default: 909 SPDK_UNREACHABLE(); 910 } 911 } 912 913 if (!socks) { 914 return 0; 915 } 916 count = 0; 917 TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) { 918 if (count == max_read_events) { 919 break; 920 } 921 922 socks[count++] = &sock->base; 923 } 924 925 /* Cycle the pending_recv list so that each time we poll things aren't 926 * in the same order. */ 927 for (i = 0; i < count; i++) { 928 sock = __uring_sock(socks[i]); 929 930 TAILQ_REMOVE(&group->pending_recv, sock, link); 931 932 if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 933 sock->pending_recv = false; 934 } else { 935 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 936 } 937 } 938 939 return count; 940 } 941 942 static int 943 _sock_flush_client(struct spdk_sock *_sock) 944 { 945 struct spdk_uring_sock *sock = __uring_sock(_sock); 946 struct msghdr msg = {}; 947 struct iovec iovs[IOV_BATCH_SIZE]; 948 int iovcnt; 949 ssize_t rc; 950 951 /* Can't flush from within a callback or we end up with recursive calls */ 952 if (_sock->cb_cnt > 0) { 953 return 0; 954 } 955 956 /* Gather an iov */ 957 iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL); 958 if (iovcnt == 0) { 959 return 0; 960 } 961 962 /* Perform the vectored write */ 963 msg.msg_iov = iovs; 964 msg.msg_iovlen = iovcnt; 965 rc = sendmsg(sock->fd, &msg, 0); 966 if (rc <= 0) { 967 if (errno == EAGAIN || errno == EWOULDBLOCK) { 968 return 0; 969 } 970 return rc; 971 } 972 973 sock_complete_reqs(_sock, rc); 974 975 return 0; 976 } 977 978 static void 979 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req) 980 { 981 struct spdk_uring_sock *sock = __uring_sock(_sock); 982 int rc; 983 984 if (spdk_unlikely(sock->connection_status)) { 985 req->cb_fn(req->cb_arg, sock->connection_status); 986 return; 987 } 988 989 spdk_sock_request_queue(_sock, req); 990 991 if (!sock->group) { 992 if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) { 993 rc = _sock_flush_client(_sock); 994 if (rc) { 995 spdk_sock_abort_requests(_sock); 996 } 997 } 998 } 999 } 1000 1001 static int 1002 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 1003 { 1004 struct spdk_uring_sock *sock = __uring_sock(_sock); 1005 int val; 1006 int rc; 1007 1008 assert(sock != NULL); 1009 1010 val = nbytes; 1011 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 1012 if (rc != 0) { 1013 return -1; 1014 } 1015 return 0; 1016 } 1017 1018 static bool 1019 uring_sock_is_ipv6(struct spdk_sock *_sock) 1020 { 1021 struct spdk_uring_sock *sock = __uring_sock(_sock); 1022 struct sockaddr_storage sa; 1023 socklen_t salen; 1024 int rc; 1025 1026 assert(sock != NULL); 1027 1028 memset(&sa, 0, sizeof sa); 1029 salen = sizeof sa; 1030 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1031 if (rc != 0) { 1032 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1033 return false; 1034 } 1035 1036 return (sa.ss_family == AF_INET6); 1037 } 1038 1039 static bool 1040 uring_sock_is_ipv4(struct spdk_sock *_sock) 1041 { 1042 struct spdk_uring_sock *sock = __uring_sock(_sock); 1043 struct sockaddr_storage sa; 1044 socklen_t salen; 1045 int rc; 1046 1047 assert(sock != NULL); 1048 1049 memset(&sa, 0, sizeof sa); 1050 salen = sizeof sa; 1051 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1052 if (rc != 0) { 1053 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1054 return false; 1055 } 1056 1057 return (sa.ss_family == AF_INET); 1058 } 1059 1060 static bool 1061 uring_sock_is_connected(struct spdk_sock *_sock) 1062 { 1063 struct spdk_uring_sock *sock = __uring_sock(_sock); 1064 uint8_t byte; 1065 int rc; 1066 1067 rc = recv(sock->fd, &byte, 1, MSG_PEEK); 1068 if (rc == 0) { 1069 return false; 1070 } 1071 1072 if (rc < 0) { 1073 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1074 return true; 1075 } 1076 1077 return false; 1078 } 1079 1080 return true; 1081 } 1082 1083 static int 1084 uring_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id) 1085 { 1086 int rc = -1; 1087 1088 if (!g_spdk_uring_sock_impl_opts.enable_placement_id) { 1089 return rc; 1090 } 1091 1092 #if defined(SO_INCOMING_NAPI_ID) 1093 struct spdk_uring_sock *sock = __uring_sock(_sock); 1094 socklen_t salen = sizeof(int); 1095 1096 rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen); 1097 if (rc != 0) { 1098 SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno); 1099 } 1100 1101 #endif 1102 return rc; 1103 } 1104 1105 static struct spdk_sock_group_impl * 1106 uring_sock_group_impl_create(void) 1107 { 1108 struct spdk_uring_sock_group_impl *group_impl; 1109 1110 group_impl = calloc(1, sizeof(*group_impl)); 1111 if (group_impl == NULL) { 1112 SPDK_ERRLOG("group_impl allocation failed\n"); 1113 return NULL; 1114 } 1115 1116 group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH; 1117 1118 if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) { 1119 SPDK_ERRLOG("uring I/O context setup failure\n"); 1120 free(group_impl); 1121 return NULL; 1122 } 1123 1124 TAILQ_INIT(&group_impl->pending_recv); 1125 1126 return &group_impl->base; 1127 } 1128 1129 static int 1130 uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, 1131 struct spdk_sock *_sock) 1132 { 1133 struct spdk_uring_sock *sock = __uring_sock(_sock); 1134 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1135 1136 sock->group = group; 1137 sock->write_task.sock = sock; 1138 sock->write_task.type = SPDK_SOCK_TASK_WRITE; 1139 1140 sock->pollin_task.sock = sock; 1141 sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN; 1142 1143 sock->cancel_task.sock = sock; 1144 sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL; 1145 1146 /* switched from another polling group due to scheduling */ 1147 if (spdk_unlikely(sock->recv_pipe != NULL && 1148 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1149 assert(sock->pending_recv == false); 1150 sock->pending_recv = true; 1151 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1152 } 1153 1154 return 0; 1155 } 1156 1157 static int 1158 uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 1159 struct spdk_sock **socks) 1160 { 1161 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1162 int count, ret; 1163 int to_complete, to_submit; 1164 struct spdk_sock *_sock, *tmp; 1165 struct spdk_uring_sock *sock; 1166 1167 if (spdk_likely(socks)) { 1168 TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) { 1169 sock = __uring_sock(_sock); 1170 if (spdk_unlikely(sock->connection_status)) { 1171 continue; 1172 } 1173 _sock_flush(_sock); 1174 _sock_prep_pollin(_sock); 1175 } 1176 } 1177 1178 to_submit = group->io_queued; 1179 1180 /* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */ 1181 if (to_submit > 0) { 1182 /* If there are I/O to submit, use io_uring_submit here. 1183 * It will automatically call io_uring_enter appropriately. */ 1184 ret = io_uring_submit(&group->uring); 1185 if (ret < 0) { 1186 return 1; 1187 } 1188 group->io_queued = 0; 1189 group->io_inflight += to_submit; 1190 group->io_avail -= to_submit; 1191 } 1192 1193 count = 0; 1194 to_complete = group->io_inflight; 1195 if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) { 1196 count = sock_uring_group_reap(group, to_complete, max_events, socks); 1197 } 1198 1199 return count; 1200 } 1201 1202 static int 1203 uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, 1204 struct spdk_sock *_sock) 1205 { 1206 struct spdk_uring_sock *sock = __uring_sock(_sock); 1207 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1208 1209 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1210 _sock_prep_cancel_task(_sock, &sock->write_task); 1211 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so 1212 * currently can use a while loop here. */ 1213 while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1214 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1215 uring_sock_group_impl_poll(_group, 32, NULL); 1216 } 1217 } 1218 1219 if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1220 _sock_prep_cancel_task(_sock, &sock->pollin_task); 1221 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so 1222 * currently can use a while loop here. */ 1223 while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1224 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1225 uring_sock_group_impl_poll(_group, 32, NULL); 1226 } 1227 } 1228 1229 if (sock->pending_recv) { 1230 TAILQ_REMOVE(&group->pending_recv, sock, link); 1231 sock->pending_recv = false; 1232 } 1233 assert(sock->pending_recv == false); 1234 1235 sock->group = NULL; 1236 return 0; 1237 } 1238 1239 static int 1240 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group) 1241 { 1242 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1243 1244 /* try to reap all the active I/O */ 1245 while (group->io_inflight) { 1246 uring_sock_group_impl_poll(_group, 32, NULL); 1247 } 1248 assert(group->io_inflight == 0); 1249 assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH); 1250 1251 io_uring_queue_exit(&group->uring); 1252 1253 free(group); 1254 return 0; 1255 } 1256 1257 static int 1258 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 1259 { 1260 if (!opts || !len) { 1261 errno = EINVAL; 1262 return -1; 1263 } 1264 memset(opts, 0, *len); 1265 1266 #define FIELD_OK(field) \ 1267 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len 1268 1269 #define GET_FIELD(field) \ 1270 if (FIELD_OK(field)) { \ 1271 opts->field = g_spdk_uring_sock_impl_opts.field; \ 1272 } 1273 1274 GET_FIELD(recv_buf_size); 1275 GET_FIELD(send_buf_size); 1276 GET_FIELD(enable_recv_pipe); 1277 GET_FIELD(enable_quickack); 1278 GET_FIELD(enable_placement_id); 1279 1280 #undef GET_FIELD 1281 #undef FIELD_OK 1282 1283 *len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts)); 1284 return 0; 1285 } 1286 1287 static int 1288 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 1289 { 1290 if (!opts) { 1291 errno = EINVAL; 1292 return -1; 1293 } 1294 1295 #define FIELD_OK(field) \ 1296 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len 1297 1298 #define SET_FIELD(field) \ 1299 if (FIELD_OK(field)) { \ 1300 g_spdk_uring_sock_impl_opts.field = opts->field; \ 1301 } 1302 1303 SET_FIELD(recv_buf_size); 1304 SET_FIELD(send_buf_size); 1305 SET_FIELD(enable_recv_pipe); 1306 SET_FIELD(enable_quickack); 1307 SET_FIELD(enable_placement_id); 1308 1309 #undef SET_FIELD 1310 #undef FIELD_OK 1311 1312 return 0; 1313 } 1314 1315 static int 1316 uring_sock_flush(struct spdk_sock *_sock) 1317 { 1318 struct spdk_uring_sock *sock = __uring_sock(_sock); 1319 1320 if (!sock->group) { 1321 return _sock_flush_client(_sock); 1322 } 1323 1324 return 0; 1325 } 1326 1327 static struct spdk_net_impl g_uring_net_impl = { 1328 .name = "uring", 1329 .getaddr = uring_sock_getaddr, 1330 .connect = uring_sock_connect, 1331 .listen = uring_sock_listen, 1332 .accept = uring_sock_accept, 1333 .close = uring_sock_close, 1334 .recv = uring_sock_recv, 1335 .readv = uring_sock_readv, 1336 .writev = uring_sock_writev, 1337 .writev_async = uring_sock_writev_async, 1338 .flush = uring_sock_flush, 1339 .set_recvlowat = uring_sock_set_recvlowat, 1340 .set_recvbuf = uring_sock_set_recvbuf, 1341 .set_sendbuf = uring_sock_set_sendbuf, 1342 .is_ipv6 = uring_sock_is_ipv6, 1343 .is_ipv4 = uring_sock_is_ipv4, 1344 .is_connected = uring_sock_is_connected, 1345 .get_placement_id = uring_sock_get_placement_id, 1346 .group_impl_create = uring_sock_group_impl_create, 1347 .group_impl_add_sock = uring_sock_group_impl_add_sock, 1348 .group_impl_remove_sock = uring_sock_group_impl_remove_sock, 1349 .group_impl_poll = uring_sock_group_impl_poll, 1350 .group_impl_close = uring_sock_group_impl_close, 1351 .get_opts = uring_sock_impl_get_opts, 1352 .set_opts = uring_sock_impl_set_opts, 1353 }; 1354 1355 SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1); 1356