1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
32 */ 33 34 #include "spdk/stdinc.h" 35 #include "spdk/config.h" 36 37 #include <sys/epoll.h> 38 #include <liburing.h> 39 40 #include "spdk/barrier.h" 41 #include "spdk/env.h" 42 #include "spdk/log.h" 43 #include "spdk/pipe.h" 44 #include "spdk/sock.h" 45 #include "spdk/string.h" 46 #include "spdk/util.h" 47 48 #include "spdk_internal/sock.h" 49 #include "spdk_internal/assert.h" 50 51 #define MAX_TMPBUF 1024 52 #define PORTNUMLEN 32 53 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096 54 55 enum spdk_sock_task_type { 56 SPDK_SOCK_TASK_POLLIN = 0, 57 SPDK_SOCK_TASK_WRITE, 58 SPDK_SOCK_TASK_CANCEL, 59 }; 60 61 enum spdk_uring_sock_task_status { 62 SPDK_URING_SOCK_TASK_NOT_IN_USE = 0, 63 SPDK_URING_SOCK_TASK_IN_PROCESS, 64 }; 65 66 struct spdk_uring_task { 67 enum spdk_uring_sock_task_status status; 68 enum spdk_sock_task_type type; 69 struct spdk_uring_sock *sock; 70 struct msghdr msg; 71 struct iovec iovs[IOV_BATCH_SIZE]; 72 int iov_cnt; 73 struct spdk_sock_request *last_req; 74 STAILQ_ENTRY(spdk_uring_task) link; 75 }; 76 77 struct spdk_uring_sock { 78 struct spdk_sock base; 79 int fd; 80 struct spdk_uring_sock_group_impl *group; 81 struct spdk_uring_task write_task; 82 struct spdk_uring_task pollin_task; 83 struct spdk_uring_task cancel_task; 84 struct spdk_pipe *recv_pipe; 85 void *recv_buf; 86 int recv_buf_sz; 87 bool pending_recv; 88 int connection_status; 89 int placement_id; 90 TAILQ_ENTRY(spdk_uring_sock) link; 91 }; 92 93 struct spdk_uring_sock_group_impl { 94 struct spdk_sock_group_impl base; 95 struct io_uring uring; 96 uint32_t io_inflight; 97 uint32_t io_queued; 98 uint32_t io_avail; 99 TAILQ_HEAD(, spdk_uring_sock) pending_recv; 100 }; 101 102 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = { 103 .recv_buf_size = MIN_SO_RCVBUF_SIZE, 104 .send_buf_size = MIN_SO_SNDBUF_SIZE, 105 .enable_recv_pipe = true, 106 .enable_quickack = false, 107 .enable_placement_id = PLACEMENT_NONE, 108 }; 109 110 static struct spdk_sock_map g_map = { 111 .entries = 
STAILQ_HEAD_INITIALIZER(g_map.entries), 112 .mtx = PTHREAD_MUTEX_INITIALIZER 113 }; 114 115 __attribute((destructor)) static void 116 uring_sock_map_cleanup(void) 117 { 118 spdk_sock_map_cleanup(&g_map); 119 } 120 121 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request))) 122 123 static int 124 get_addr_str(struct sockaddr *sa, char *host, size_t hlen) 125 { 126 const char *result = NULL; 127 128 if (sa == NULL || host == NULL) { 129 return -1; 130 } 131 132 switch (sa->sa_family) { 133 case AF_INET: 134 result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr), 135 host, hlen); 136 break; 137 case AF_INET6: 138 result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr), 139 host, hlen); 140 break; 141 default: 142 break; 143 } 144 145 if (result != NULL) { 146 return 0; 147 } else { 148 return -1; 149 } 150 } 151 152 #define __uring_sock(sock) (struct spdk_uring_sock *)sock 153 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group 154 155 static int 156 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 157 char *caddr, int clen, uint16_t *cport) 158 { 159 struct spdk_uring_sock *sock = __uring_sock(_sock); 160 struct sockaddr_storage sa; 161 socklen_t salen; 162 int rc; 163 164 assert(sock != NULL); 165 166 memset(&sa, 0, sizeof sa); 167 salen = sizeof sa; 168 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 169 if (rc != 0) { 170 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 171 return -1; 172 } 173 174 switch (sa.ss_family) { 175 case AF_UNIX: 176 /* Acceptable connection types that don't have IPs */ 177 return 0; 178 case AF_INET: 179 case AF_INET6: 180 /* Code below will get IP addresses */ 181 break; 182 default: 183 /* Unsupported socket family */ 184 return -1; 185 } 186 187 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); 188 if (rc != 0) { 189 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", 
errno); 190 return -1; 191 } 192 193 if (sport) { 194 if (sa.ss_family == AF_INET) { 195 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 196 } else if (sa.ss_family == AF_INET6) { 197 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 198 } 199 } 200 201 memset(&sa, 0, sizeof sa); 202 salen = sizeof sa; 203 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); 204 if (rc != 0) { 205 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); 206 return -1; 207 } 208 209 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); 210 if (rc != 0) { 211 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 212 return -1; 213 } 214 215 if (cport) { 216 if (sa.ss_family == AF_INET) { 217 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 218 } else if (sa.ss_family == AF_INET6) { 219 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 220 } 221 } 222 223 return 0; 224 } 225 226 enum uring_sock_create_type { 227 SPDK_SOCK_CREATE_LISTEN, 228 SPDK_SOCK_CREATE_CONNECT, 229 }; 230 231 static int 232 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz) 233 { 234 uint8_t *new_buf; 235 struct spdk_pipe *new_pipe; 236 struct iovec siov[2]; 237 struct iovec diov[2]; 238 int sbytes; 239 ssize_t bytes; 240 241 if (sock->recv_buf_sz == sz) { 242 return 0; 243 } 244 245 /* If the new size is 0, just free the pipe */ 246 if (sz == 0) { 247 spdk_pipe_destroy(sock->recv_pipe); 248 free(sock->recv_buf); 249 sock->recv_pipe = NULL; 250 sock->recv_buf = NULL; 251 return 0; 252 } else if (sz < MIN_SOCK_PIPE_SIZE) { 253 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 254 return -1; 255 } 256 257 /* Round up to next 64 byte multiple */ 258 new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t)); 259 if (!new_buf) { 260 SPDK_ERRLOG("socket recv buf allocation failed\n"); 261 return -ENOMEM; 262 } 263 264 new_pipe = spdk_pipe_create(new_buf, sz + 1); 265 if (new_pipe == NULL) { 266 SPDK_ERRLOG("socket pipe allocation 
failed\n"); 267 free(new_buf); 268 return -ENOMEM; 269 } 270 271 if (sock->recv_pipe != NULL) { 272 /* Pull all of the data out of the old pipe */ 273 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 274 if (sbytes > sz) { 275 /* Too much data to fit into the new pipe size */ 276 spdk_pipe_destroy(new_pipe); 277 free(new_buf); 278 return -EINVAL; 279 } 280 281 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 282 assert(sbytes == sz); 283 284 bytes = spdk_iovcpy(siov, 2, diov, 2); 285 spdk_pipe_writer_advance(new_pipe, bytes); 286 287 spdk_pipe_destroy(sock->recv_pipe); 288 free(sock->recv_buf); 289 } 290 291 sock->recv_buf_sz = sz; 292 sock->recv_buf = new_buf; 293 sock->recv_pipe = new_pipe; 294 295 return 0; 296 } 297 298 static int 299 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 300 { 301 struct spdk_uring_sock *sock = __uring_sock(_sock); 302 int rc; 303 304 assert(sock != NULL); 305 306 if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) { 307 rc = uring_sock_alloc_pipe(sock, sz); 308 if (rc) { 309 SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock); 310 return rc; 311 } 312 } 313 314 if (sz < MIN_SO_RCVBUF_SIZE) { 315 sz = MIN_SO_RCVBUF_SIZE; 316 } 317 318 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 319 if (rc < 0) { 320 return rc; 321 } 322 323 return 0; 324 } 325 326 static int 327 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 328 { 329 struct spdk_uring_sock *sock = __uring_sock(_sock); 330 int rc; 331 332 assert(sock != NULL); 333 334 if (sz < MIN_SO_SNDBUF_SIZE) { 335 sz = MIN_SO_SNDBUF_SIZE; 336 } 337 338 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 339 if (rc < 0) { 340 return rc; 341 } 342 343 return 0; 344 } 345 346 static struct spdk_uring_sock * 347 uring_sock_alloc(int fd) 348 { 349 struct spdk_uring_sock *sock; 350 #if defined(__linux__) 351 int flag; 352 int rc; 353 #endif 354 355 sock = calloc(1, 
sizeof(*sock)); 356 if (sock == NULL) { 357 SPDK_ERRLOG("sock allocation failed\n"); 358 return NULL; 359 } 360 361 sock->fd = fd; 362 363 #if defined(__linux__) 364 flag = 1; 365 366 if (g_spdk_uring_sock_impl_opts.enable_quickack) { 367 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag)); 368 if (rc != 0) { 369 SPDK_ERRLOG("quickack was failed to set\n"); 370 } 371 } 372 373 spdk_sock_get_placement_id(sock->fd, g_spdk_uring_sock_impl_opts.enable_placement_id, 374 &sock->placement_id); 375 #endif 376 377 return sock; 378 } 379 380 static struct spdk_sock * 381 uring_sock_create(const char *ip, int port, 382 enum uring_sock_create_type type, 383 struct spdk_sock_opts *opts) 384 { 385 struct spdk_uring_sock *sock; 386 char buf[MAX_TMPBUF]; 387 char portnum[PORTNUMLEN]; 388 char *p; 389 struct addrinfo hints, *res, *res0; 390 int fd, flag; 391 int val = 1; 392 int rc; 393 394 if (ip == NULL) { 395 return NULL; 396 } 397 if (ip[0] == '[') { 398 snprintf(buf, sizeof(buf), "%s", ip + 1); 399 p = strchr(buf, ']'); 400 if (p != NULL) { 401 *p = '\0'; 402 } 403 ip = (const char *) &buf[0]; 404 } 405 406 snprintf(portnum, sizeof portnum, "%d", port); 407 memset(&hints, 0, sizeof hints); 408 hints.ai_family = PF_UNSPEC; 409 hints.ai_socktype = SOCK_STREAM; 410 hints.ai_flags = AI_NUMERICSERV; 411 hints.ai_flags |= AI_PASSIVE; 412 hints.ai_flags |= AI_NUMERICHOST; 413 rc = getaddrinfo(ip, portnum, &hints, &res0); 414 if (rc != 0) { 415 SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno); 416 return NULL; 417 } 418 419 /* try listen */ 420 fd = -1; 421 for (res = res0; res != NULL; res = res->ai_next) { 422 retry: 423 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 424 if (fd < 0) { 425 /* error */ 426 continue; 427 } 428 429 val = g_spdk_uring_sock_impl_opts.recv_buf_size; 430 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val); 431 if (rc) { 432 /* Not fatal */ 433 } 434 435 val = g_spdk_uring_sock_impl_opts.send_buf_size; 
436 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val); 437 if (rc) { 438 /* Not fatal */ 439 } 440 441 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 442 if (rc != 0) { 443 close(fd); 444 fd = -1; 445 /* error */ 446 continue; 447 } 448 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 449 if (rc != 0) { 450 close(fd); 451 fd = -1; 452 /* error */ 453 continue; 454 } 455 456 #if defined(SO_PRIORITY) 457 if (opts != NULL && opts->priority) { 458 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); 459 if (rc != 0) { 460 close(fd); 461 fd = -1; 462 /* error */ 463 continue; 464 } 465 } 466 #endif 467 if (res->ai_family == AF_INET6) { 468 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 469 if (rc != 0) { 470 close(fd); 471 fd = -1; 472 /* error */ 473 continue; 474 } 475 } 476 477 if (type == SPDK_SOCK_CREATE_LISTEN) { 478 rc = bind(fd, res->ai_addr, res->ai_addrlen); 479 if (rc != 0) { 480 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 481 switch (errno) { 482 case EINTR: 483 /* interrupted? */ 484 close(fd); 485 goto retry; 486 case EADDRNOTAVAIL: 487 SPDK_ERRLOG("IP address %s not available. 
" 488 "Verify IP address in config file " 489 "and make sure setup script is " 490 "run before starting spdk app.\n", ip); 491 /* FALLTHROUGH */ 492 default: 493 /* try next family */ 494 close(fd); 495 fd = -1; 496 continue; 497 } 498 } 499 /* bind OK */ 500 rc = listen(fd, 512); 501 if (rc != 0) { 502 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 503 close(fd); 504 fd = -1; 505 break; 506 } 507 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 508 rc = connect(fd, res->ai_addr, res->ai_addrlen); 509 if (rc != 0) { 510 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 511 /* try next family */ 512 close(fd); 513 fd = -1; 514 continue; 515 } 516 } 517 518 flag = fcntl(fd, F_GETFL); 519 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 520 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 521 close(fd); 522 fd = -1; 523 break; 524 } 525 break; 526 } 527 freeaddrinfo(res0); 528 529 if (fd < 0) { 530 return NULL; 531 } 532 533 sock = uring_sock_alloc(fd); 534 if (sock == NULL) { 535 SPDK_ERRLOG("sock allocation failed\n"); 536 close(fd); 537 return NULL; 538 } 539 540 return &sock->base; 541 } 542 543 static struct spdk_sock * 544 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 545 { 546 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); 547 } 548 549 static struct spdk_sock * 550 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 551 { 552 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); 553 } 554 555 static struct spdk_sock * 556 uring_sock_accept(struct spdk_sock *_sock) 557 { 558 struct spdk_uring_sock *sock = __uring_sock(_sock); 559 struct sockaddr_storage sa; 560 socklen_t salen; 561 int rc, fd; 562 struct spdk_uring_sock *new_sock; 563 int flag; 564 565 memset(&sa, 0, sizeof(sa)); 566 salen = sizeof(sa); 567 568 assert(sock != NULL); 569 570 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 571 572 if (rc == -1) { 573 return NULL; 
574 } 575 576 fd = rc; 577 578 flag = fcntl(fd, F_GETFL); 579 if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) { 580 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 581 close(fd); 582 return NULL; 583 } 584 585 #if defined(SO_PRIORITY) 586 /* The priority is not inherited, so call this function again */ 587 if (sock->base.opts.priority) { 588 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); 589 if (rc != 0) { 590 close(fd); 591 return NULL; 592 } 593 } 594 #endif 595 596 new_sock = uring_sock_alloc(fd); 597 if (new_sock == NULL) { 598 close(fd); 599 return NULL; 600 } 601 602 return &new_sock->base; 603 } 604 605 static int 606 uring_sock_close(struct spdk_sock *_sock) 607 { 608 struct spdk_uring_sock *sock = __uring_sock(_sock); 609 610 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 611 assert(sock->group == NULL); 612 613 /* If the socket fails to close, the best choice is to 614 * leak the fd but continue to free the rest of the sock 615 * memory. 
*/ 616 close(sock->fd); 617 618 spdk_pipe_destroy(sock->recv_pipe); 619 free(sock->recv_buf); 620 free(sock); 621 622 return 0; 623 } 624 625 static ssize_t 626 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt) 627 { 628 struct iovec siov[2]; 629 int sbytes; 630 ssize_t bytes; 631 struct spdk_uring_sock_group_impl *group; 632 633 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 634 if (sbytes < 0) { 635 errno = EINVAL; 636 return -1; 637 } else if (sbytes == 0) { 638 errno = EAGAIN; 639 return -1; 640 } 641 642 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 643 644 if (bytes == 0) { 645 /* The only way this happens is if diov is 0 length */ 646 errno = EINVAL; 647 return -1; 648 } 649 650 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 651 652 /* If we drained the pipe, take it off the level-triggered list */ 653 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 654 group = __uring_group_impl(sock->base.group_impl); 655 TAILQ_REMOVE(&group->pending_recv, sock, link); 656 sock->pending_recv = false; 657 } 658 659 return bytes; 660 } 661 662 static inline ssize_t 663 uring_sock_read(struct spdk_uring_sock *sock) 664 { 665 struct iovec iov[2]; 666 int bytes; 667 struct spdk_uring_sock_group_impl *group; 668 669 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 670 671 if (bytes > 0) { 672 bytes = readv(sock->fd, iov, 2); 673 if (bytes > 0) { 674 spdk_pipe_writer_advance(sock->recv_pipe, bytes); 675 if (sock->base.group_impl) { 676 group = __uring_group_impl(sock->base.group_impl); 677 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 678 sock->pending_recv = true; 679 } 680 } 681 } 682 683 return bytes; 684 } 685 686 static ssize_t 687 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 688 { 689 struct spdk_uring_sock *sock = __uring_sock(_sock); 690 int rc, i; 691 size_t len; 692 693 if (sock->recv_pipe == 
NULL) { 694 return readv(sock->fd, iov, iovcnt); 695 } 696 697 len = 0; 698 for (i = 0; i < iovcnt; i++) { 699 len += iov[i].iov_len; 700 } 701 702 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 703 /* If the user is receiving a sufficiently large amount of data, 704 * receive directly to their buffers. */ 705 if (len >= MIN_SOCK_PIPE_SIZE) { 706 return readv(sock->fd, iov, iovcnt); 707 } 708 709 /* Otherwise, do a big read into our pipe */ 710 rc = uring_sock_read(sock); 711 if (rc <= 0) { 712 return rc; 713 } 714 } 715 716 return uring_sock_recv_from_pipe(sock, iov, iovcnt); 717 } 718 719 static ssize_t 720 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 721 { 722 struct iovec iov[1]; 723 724 iov[0].iov_base = buf; 725 iov[0].iov_len = len; 726 727 return uring_sock_readv(sock, iov, 1); 728 } 729 730 static ssize_t 731 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 732 { 733 struct spdk_uring_sock *sock = __uring_sock(_sock); 734 735 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 736 errno = EAGAIN; 737 return -1; 738 } 739 740 return writev(sock->fd, iov, iovcnt); 741 } 742 743 static int 744 sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc) 745 { 746 struct spdk_sock_request *req; 747 int i, retval; 748 unsigned int offset; 749 size_t len; 750 751 /* Consume the requests that were actually written */ 752 req = TAILQ_FIRST(&_sock->queued_reqs); 753 while (req) { 754 offset = req->internal.offset; 755 756 for (i = 0; i < req->iovcnt; i++) { 757 /* Advance by the offset first */ 758 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 759 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 760 continue; 761 } 762 763 /* Calculate the remaining length of this element */ 764 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 765 766 if (len > (size_t)rc) { 767 /* This element was partially sent. 
*/ 768 req->internal.offset += rc; 769 return 0; 770 } 771 772 offset = 0; 773 req->internal.offset += len; 774 rc -= len; 775 } 776 777 /* Handled a full request. */ 778 spdk_sock_request_pend(_sock, req); 779 780 retval = spdk_sock_request_put(_sock, req, 0); 781 if (retval) { 782 return retval; 783 } 784 785 if (rc == 0) { 786 break; 787 } 788 789 req = TAILQ_FIRST(&_sock->queued_reqs); 790 } 791 792 return 0; 793 } 794 795 static void 796 _sock_flush(struct spdk_sock *_sock) 797 { 798 struct spdk_uring_sock *sock = __uring_sock(_sock); 799 struct spdk_uring_task *task = &sock->write_task; 800 uint32_t iovcnt; 801 struct io_uring_sqe *sqe; 802 803 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 804 return; 805 } 806 807 iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req); 808 if (!iovcnt) { 809 return; 810 } 811 812 task->iov_cnt = iovcnt; 813 assert(sock->group != NULL); 814 task->msg.msg_iov = task->iovs; 815 task->msg.msg_iovlen = task->iov_cnt; 816 817 sock->group->io_queued++; 818 819 sqe = io_uring_get_sqe(&sock->group->uring); 820 io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, 0); 821 io_uring_sqe_set_data(sqe, task); 822 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 823 } 824 825 static void 826 _sock_prep_pollin(struct spdk_sock *_sock) 827 { 828 struct spdk_uring_sock *sock = __uring_sock(_sock); 829 struct spdk_uring_task *task = &sock->pollin_task; 830 struct io_uring_sqe *sqe; 831 832 /* Do not prepare pollin event */ 833 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) { 834 return; 835 } 836 837 assert(sock->group != NULL); 838 sock->group->io_queued++; 839 840 sqe = io_uring_get_sqe(&sock->group->uring); 841 io_uring_prep_poll_add(sqe, sock->fd, POLLIN); 842 io_uring_sqe_set_data(sqe, task); 843 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 844 } 845 846 static void 847 _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data) 848 { 849 struct 
spdk_uring_sock *sock = __uring_sock(_sock); 850 struct spdk_uring_task *task = &sock->cancel_task; 851 struct io_uring_sqe *sqe; 852 853 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 854 return; 855 } 856 857 assert(sock->group != NULL); 858 sock->group->io_queued++; 859 860 sqe = io_uring_get_sqe(&sock->group->uring); 861 io_uring_prep_cancel(sqe, user_data, 0); 862 io_uring_sqe_set_data(sqe, task); 863 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 864 } 865 866 static int 867 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events, 868 struct spdk_sock **socks) 869 { 870 int i, count, ret; 871 struct io_uring_cqe *cqe; 872 struct spdk_uring_sock *sock, *tmp; 873 struct spdk_uring_task *task; 874 int status; 875 876 for (i = 0; i < max; i++) { 877 ret = io_uring_peek_cqe(&group->uring, &cqe); 878 if (ret != 0) { 879 break; 880 } 881 882 if (cqe == NULL) { 883 break; 884 } 885 886 task = (struct spdk_uring_task *)cqe->user_data; 887 assert(task != NULL); 888 sock = task->sock; 889 assert(sock != NULL); 890 assert(sock->group != NULL); 891 assert(sock->group == group); 892 sock->group->io_inflight--; 893 sock->group->io_avail++; 894 status = cqe->res; 895 io_uring_cqe_seen(&group->uring, cqe); 896 897 task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE; 898 899 if (spdk_unlikely(status <= 0)) { 900 if (status == -EAGAIN || status == -EWOULDBLOCK) { 901 continue; 902 } 903 } 904 905 switch (task->type) { 906 case SPDK_SOCK_TASK_POLLIN: 907 if ((status & POLLIN) == POLLIN) { 908 if (sock->base.cb_fn != NULL) { 909 assert(sock->pending_recv == false); 910 sock->pending_recv = true; 911 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 912 } 913 } 914 break; 915 case SPDK_SOCK_TASK_WRITE: 916 assert(TAILQ_EMPTY(&sock->base.pending_reqs)); 917 task->last_req = NULL; 918 task->iov_cnt = 0; 919 if (spdk_unlikely(status) < 0) { 920 sock->connection_status = status; 921 spdk_sock_abort_requests(&sock->base); 922 } else { 923 
sock_complete_reqs(&sock->base, status); 924 } 925 926 break; 927 case SPDK_SOCK_TASK_CANCEL: 928 /* Do nothing */ 929 break; 930 default: 931 SPDK_UNREACHABLE(); 932 } 933 } 934 935 if (!socks) { 936 return 0; 937 } 938 count = 0; 939 TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) { 940 if (count == max_read_events) { 941 break; 942 } 943 944 /* If the socket's cb_fn is NULL, just remove it from 945 * the list and do not add it to socks array */ 946 if (spdk_unlikely(sock->base.cb_fn == NULL)) { 947 sock->pending_recv = false; 948 TAILQ_REMOVE(&group->pending_recv, sock, link); 949 continue; 950 } 951 952 socks[count++] = &sock->base; 953 } 954 955 /* Cycle the pending_recv list so that each time we poll things aren't 956 * in the same order. */ 957 for (i = 0; i < count; i++) { 958 sock = __uring_sock(socks[i]); 959 960 TAILQ_REMOVE(&group->pending_recv, sock, link); 961 962 if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 963 sock->pending_recv = false; 964 } else { 965 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 966 } 967 } 968 969 return count; 970 } 971 972 static int 973 _sock_flush_client(struct spdk_sock *_sock) 974 { 975 struct spdk_uring_sock *sock = __uring_sock(_sock); 976 struct msghdr msg = {}; 977 struct iovec iovs[IOV_BATCH_SIZE]; 978 int iovcnt; 979 ssize_t rc; 980 981 /* Can't flush from within a callback or we end up with recursive calls */ 982 if (_sock->cb_cnt > 0) { 983 return 0; 984 } 985 986 /* Gather an iov */ 987 iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL); 988 if (iovcnt == 0) { 989 return 0; 990 } 991 992 /* Perform the vectored write */ 993 msg.msg_iov = iovs; 994 msg.msg_iovlen = iovcnt; 995 rc = sendmsg(sock->fd, &msg, 0); 996 if (rc <= 0) { 997 if (errno == EAGAIN || errno == EWOULDBLOCK) { 998 return 0; 999 } 1000 return rc; 1001 } 1002 1003 sock_complete_reqs(_sock, rc); 1004 1005 return 0; 1006 } 1007 1008 static void 1009 uring_sock_writev_async(struct spdk_sock 
*_sock, struct spdk_sock_request *req) 1010 { 1011 struct spdk_uring_sock *sock = __uring_sock(_sock); 1012 int rc; 1013 1014 if (spdk_unlikely(sock->connection_status)) { 1015 req->cb_fn(req->cb_arg, sock->connection_status); 1016 return; 1017 } 1018 1019 spdk_sock_request_queue(_sock, req); 1020 1021 if (!sock->group) { 1022 if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) { 1023 rc = _sock_flush_client(_sock); 1024 if (rc) { 1025 spdk_sock_abort_requests(_sock); 1026 } 1027 } 1028 } 1029 } 1030 1031 static int 1032 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 1033 { 1034 struct spdk_uring_sock *sock = __uring_sock(_sock); 1035 int val; 1036 int rc; 1037 1038 assert(sock != NULL); 1039 1040 val = nbytes; 1041 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 1042 if (rc != 0) { 1043 return -1; 1044 } 1045 return 0; 1046 } 1047 1048 static bool 1049 uring_sock_is_ipv6(struct spdk_sock *_sock) 1050 { 1051 struct spdk_uring_sock *sock = __uring_sock(_sock); 1052 struct sockaddr_storage sa; 1053 socklen_t salen; 1054 int rc; 1055 1056 assert(sock != NULL); 1057 1058 memset(&sa, 0, sizeof sa); 1059 salen = sizeof sa; 1060 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1061 if (rc != 0) { 1062 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1063 return false; 1064 } 1065 1066 return (sa.ss_family == AF_INET6); 1067 } 1068 1069 static bool 1070 uring_sock_is_ipv4(struct spdk_sock *_sock) 1071 { 1072 struct spdk_uring_sock *sock = __uring_sock(_sock); 1073 struct sockaddr_storage sa; 1074 socklen_t salen; 1075 int rc; 1076 1077 assert(sock != NULL); 1078 1079 memset(&sa, 0, sizeof sa); 1080 salen = sizeof sa; 1081 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1082 if (rc != 0) { 1083 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1084 return false; 1085 } 1086 1087 return (sa.ss_family == AF_INET); 1088 } 1089 1090 static bool 1091 uring_sock_is_connected(struct spdk_sock *_sock) 1092 { 1093 
struct spdk_uring_sock *sock = __uring_sock(_sock); 1094 uint8_t byte; 1095 int rc; 1096 1097 rc = recv(sock->fd, &byte, 1, MSG_PEEK); 1098 if (rc == 0) { 1099 return false; 1100 } 1101 1102 if (rc < 0) { 1103 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1104 return true; 1105 } 1106 1107 return false; 1108 } 1109 1110 return true; 1111 } 1112 1113 static struct spdk_sock_group_impl * 1114 uring_sock_group_impl_get_optimal(struct spdk_sock *_sock) 1115 { 1116 struct spdk_uring_sock *sock = __uring_sock(_sock); 1117 struct spdk_sock_group_impl *group; 1118 1119 if (sock->placement_id != -1) { 1120 spdk_sock_map_lookup(&g_map, sock->placement_id, &group); 1121 return group; 1122 } 1123 1124 return NULL; 1125 } 1126 1127 static struct spdk_sock_group_impl * 1128 uring_sock_group_impl_create(void) 1129 { 1130 struct spdk_uring_sock_group_impl *group_impl; 1131 1132 group_impl = calloc(1, sizeof(*group_impl)); 1133 if (group_impl == NULL) { 1134 SPDK_ERRLOG("group_impl allocation failed\n"); 1135 return NULL; 1136 } 1137 1138 group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH; 1139 1140 if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) { 1141 SPDK_ERRLOG("uring I/O context setup failure\n"); 1142 free(group_impl); 1143 return NULL; 1144 } 1145 1146 TAILQ_INIT(&group_impl->pending_recv); 1147 1148 if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1149 spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base); 1150 } 1151 1152 return &group_impl->base; 1153 } 1154 1155 static int 1156 uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, 1157 struct spdk_sock *_sock) 1158 { 1159 struct spdk_uring_sock *sock = __uring_sock(_sock); 1160 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1161 int rc; 1162 1163 sock->group = group; 1164 sock->write_task.sock = sock; 1165 sock->write_task.type = SPDK_SOCK_TASK_WRITE; 1166 1167 sock->pollin_task.sock = sock; 1168 
sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN; 1169 1170 sock->cancel_task.sock = sock; 1171 sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL; 1172 1173 /* switched from another polling group due to scheduling */ 1174 if (spdk_unlikely(sock->recv_pipe != NULL && 1175 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1176 assert(sock->pending_recv == false); 1177 sock->pending_recv = true; 1178 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1179 } 1180 1181 if (sock->placement_id != -1) { 1182 rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base); 1183 if (rc != 0) { 1184 SPDK_ERRLOG("Failed to insert sock group into map: %d", rc); 1185 /* Do not treat this as an error. The system will continue running. */ 1186 } 1187 } 1188 1189 return 0; 1190 } 1191 1192 static int 1193 uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 1194 struct spdk_sock **socks) 1195 { 1196 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1197 int count, ret; 1198 int to_complete, to_submit; 1199 struct spdk_sock *_sock, *tmp; 1200 struct spdk_uring_sock *sock; 1201 1202 if (spdk_likely(socks)) { 1203 TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) { 1204 sock = __uring_sock(_sock); 1205 if (spdk_unlikely(sock->connection_status)) { 1206 continue; 1207 } 1208 _sock_flush(_sock); 1209 _sock_prep_pollin(_sock); 1210 } 1211 } 1212 1213 to_submit = group->io_queued; 1214 1215 /* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */ 1216 if (to_submit > 0) { 1217 /* If there are I/O to submit, use io_uring_submit here. 1218 * It will automatically call io_uring_enter appropriately. 
*/ 1219 ret = io_uring_submit(&group->uring); 1220 if (ret < 0) { 1221 return 1; 1222 } 1223 group->io_queued = 0; 1224 group->io_inflight += to_submit; 1225 group->io_avail -= to_submit; 1226 } 1227 1228 count = 0; 1229 to_complete = group->io_inflight; 1230 if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) { 1231 count = sock_uring_group_reap(group, to_complete, max_events, socks); 1232 } 1233 1234 return count; 1235 } 1236 1237 static int 1238 uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, 1239 struct spdk_sock *_sock) 1240 { 1241 struct spdk_uring_sock *sock = __uring_sock(_sock); 1242 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1243 1244 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1245 _sock_prep_cancel_task(_sock, &sock->write_task); 1246 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so 1247 * currently can use a while loop here. */ 1248 while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1249 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1250 uring_sock_group_impl_poll(_group, 32, NULL); 1251 } 1252 } 1253 1254 if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1255 _sock_prep_cancel_task(_sock, &sock->pollin_task); 1256 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so 1257 * currently can use a while loop here. 
*/ 1258 while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1259 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1260 uring_sock_group_impl_poll(_group, 32, NULL); 1261 } 1262 } 1263 1264 if (sock->pending_recv) { 1265 TAILQ_REMOVE(&group->pending_recv, sock, link); 1266 sock->pending_recv = false; 1267 } 1268 assert(sock->pending_recv == false); 1269 1270 if (sock->placement_id != -1) { 1271 spdk_sock_map_release(&g_map, sock->placement_id); 1272 } 1273 1274 sock->group = NULL; 1275 return 0; 1276 } 1277 1278 static int 1279 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group) 1280 { 1281 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1282 1283 /* try to reap all the active I/O */ 1284 while (group->io_inflight) { 1285 uring_sock_group_impl_poll(_group, 32, NULL); 1286 } 1287 assert(group->io_inflight == 0); 1288 assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH); 1289 1290 io_uring_queue_exit(&group->uring); 1291 1292 if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1293 spdk_sock_map_release(&g_map, spdk_env_get_current_core()); 1294 } 1295 1296 free(group); 1297 return 0; 1298 } 1299 1300 static int 1301 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 1302 { 1303 if (!opts || !len) { 1304 errno = EINVAL; 1305 return -1; 1306 } 1307 memset(opts, 0, *len); 1308 1309 #define FIELD_OK(field) \ 1310 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len 1311 1312 #define GET_FIELD(field) \ 1313 if (FIELD_OK(field)) { \ 1314 opts->field = g_spdk_uring_sock_impl_opts.field; \ 1315 } 1316 1317 GET_FIELD(recv_buf_size); 1318 GET_FIELD(send_buf_size); 1319 GET_FIELD(enable_recv_pipe); 1320 GET_FIELD(enable_quickack); 1321 GET_FIELD(enable_placement_id); 1322 1323 #undef GET_FIELD 1324 #undef FIELD_OK 1325 1326 *len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts)); 1327 return 0; 1328 } 1329 1330 static int 1331 
uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 1332 { 1333 if (!opts) { 1334 errno = EINVAL; 1335 return -1; 1336 } 1337 1338 #define FIELD_OK(field) \ 1339 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len 1340 1341 #define SET_FIELD(field) \ 1342 if (FIELD_OK(field)) { \ 1343 g_spdk_uring_sock_impl_opts.field = opts->field; \ 1344 } 1345 1346 SET_FIELD(recv_buf_size); 1347 SET_FIELD(send_buf_size); 1348 SET_FIELD(enable_recv_pipe); 1349 SET_FIELD(enable_quickack); 1350 SET_FIELD(enable_placement_id); 1351 1352 #undef SET_FIELD 1353 #undef FIELD_OK 1354 1355 return 0; 1356 } 1357 1358 static int 1359 uring_sock_flush(struct spdk_sock *_sock) 1360 { 1361 struct spdk_uring_sock *sock = __uring_sock(_sock); 1362 1363 if (!sock->group) { 1364 return _sock_flush_client(_sock); 1365 } 1366 1367 return 0; 1368 } 1369 1370 static struct spdk_net_impl g_uring_net_impl = { 1371 .name = "uring", 1372 .getaddr = uring_sock_getaddr, 1373 .connect = uring_sock_connect, 1374 .listen = uring_sock_listen, 1375 .accept = uring_sock_accept, 1376 .close = uring_sock_close, 1377 .recv = uring_sock_recv, 1378 .readv = uring_sock_readv, 1379 .writev = uring_sock_writev, 1380 .writev_async = uring_sock_writev_async, 1381 .flush = uring_sock_flush, 1382 .set_recvlowat = uring_sock_set_recvlowat, 1383 .set_recvbuf = uring_sock_set_recvbuf, 1384 .set_sendbuf = uring_sock_set_sendbuf, 1385 .is_ipv6 = uring_sock_is_ipv6, 1386 .is_ipv4 = uring_sock_is_ipv4, 1387 .is_connected = uring_sock_is_connected, 1388 .group_impl_get_optimal = uring_sock_group_impl_get_optimal, 1389 .group_impl_create = uring_sock_group_impl_create, 1390 .group_impl_add_sock = uring_sock_group_impl_add_sock, 1391 .group_impl_remove_sock = uring_sock_group_impl_remove_sock, 1392 .group_impl_poll = uring_sock_group_impl_poll, 1393 .group_impl_close = uring_sock_group_impl_close, 1394 .get_opts = uring_sock_impl_get_opts, 1395 .set_opts = uring_sock_impl_set_opts, 1396 }; 

/* Register g_uring_net_impl under the name "uring" with priority DEFAULT_SOCK_PRIORITY + 1. */
SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);